
I'm trying to think of how to rewrite some code asynchronously. I have to download ~7500 datasets from an API and write them to .csv files. Here is a reproducible example (assuming you have a free API key for Alpha Vantage):

from alpha_vantage.timeseries import TimeSeries
import pandas as pd
import numpy as np
api_key = ""

def get_ts(symbol):
    
    ts = TimeSeries(key=api_key, output_format='pandas')
    data, meta_data = ts.get_daily_adjusted(symbol=symbol, outputsize='full')
    fname = "./data_dump/{}_data.csv".format(symbol)
    data.to_csv(fname)

symbols = ['AAPL', 'GOOG', 'TSLA', 'MSFT']

for s in symbols:
    get_ts(s)

The people who made the alpha_vantage API wrote an article on using it with asyncio here, but I'm not sure whether I should split this into two functions, one for pulling the data and one for writing the CSV, like they do.

I haven't used asyncio before, so any pointers would be greatly appreciated - just looking to get my download time under 3 hours if possible!

Edit: The other caveat is that I'm helping a researcher with this, so we are using Jupyter notebooks - see the caveat about running asyncio in notebooks here.
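For context, the pattern I've been considering would keep the blocking download function as-is and fan it out with asyncio.to_thread (Python 3.9+). This is just a sketch: fetch_one is a placeholder standing in for get_ts, and the concurrency limit is a guess.

```python
import asyncio

# Placeholder standing in for get_ts(symbol): a blocking download + CSV write.
def fetch_one(symbol):
    return f"{symbol}_data.csv"  # pretend we wrote this file

async def fetch_all(symbols, limit=8):
    # A semaphore caps how many blocking downloads run at once,
    # which helps stay under API rate limits.
    sem = asyncio.Semaphore(limit)

    async def worker(symbol):
        async with sem:
            # to_thread runs the blocking call in a worker thread
            # without blocking the event loop.
            return await asyncio.to_thread(fetch_one, symbol)

    # gather preserves the order of the input symbols.
    return await asyncio.gather(*(worker(s) for s in symbols))

# In a script: results = asyncio.run(fetch_all(["AAPL", "GOOG"]))
# In a Jupyter cell, a loop is already running, so you can simply:
#     results = await fetch_all(["AAPL", "GOOG"])
```

The Jupyter note matters here: calling asyncio.run inside a notebook cell raises an error because a loop is already running, but awaiting the coroutine directly works.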

1 Answer


Without changing your function get_ts, it might look like this:

import multiprocessing

# PROCESSES = multiprocessing.cpu_count()
PROCESSES = 4  # number of parallel processes
CHUNKS = 6  # number of symbols each process handles per sublist

# 7.5k symbols
TICKERS = ["BCDA", "WBAI", "NM", "ZKIN", "TNXP", "FLY", "MYSZ", "GASX", "SAVA", "GCE",
           "XNET", "SRAX", "SINO", "LPCN", "XYF", "SNSS", "DRAD", "WLFC", "OILD", "JFIN",
           "TAOP", "PIC", "DIVC", "MKGI", "CCNC", "AEI", "ZCMD", "YVR", "OCG", "IMTE",
           "AZRX", "LIZI", "ORSN", "ASPU", "SHLL", "INOD", "NEXI", "INR", "SLN", "RHE-PA",
           "MAX", "ARRY", "BDGE", "TOTA", "PFMT", "AMRH", "IDN", "OIS", "RMG", "IMV",
           "CHFS", "SUMR", "NRG", "ULBR", "SJI", "HOML", "AMJL", "RUBY", "KBLMU", "ELP"]

# split TICKERS into sublists of CHUNKS symbols each
TICKERS = [TICKERS[i:i + CHUNKS] for i in range(0, len(TICKERS), CHUNKS)]


def download_data(pool_id, symbols):
    for symbol in symbols:
        print("[{:02}]: {}".format(pool_id, symbol))
        # do stuff here
        # get_ts(symbol)


if __name__ == "__main__":
    with multiprocessing.Pool(PROCESSES) as pool:
        pool.starmap(download_data, enumerate(TICKERS, start=1))
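To see what the slicing comprehension above produces, here is a small standalone check (the symbol names are just placeholders):

```python
CHUNKS = 3  # each sublist holds up to 3 symbols
tickers = ["AAPL", "GOOG", "TSLA", "MSFT", "AMZN", "NFLX", "META"]

# Same pattern as in the answer: step through the list CHUNKS at a time,
# slicing off one sublist per step. The last sublist may be shorter.
sublists = [tickers[i:i + CHUNKS] for i in range(0, len(tickers), CHUNKS)]

print(sublists)
# [['AAPL', 'GOOG', 'TSLA'], ['MSFT', 'AMZN', 'NFLX'], ['META']]
```

enumerate(TICKERS, start=1) then pairs each sublist with a 1-based pool_id, which is why starmap is used: each pair unpacks into the two arguments of download_data.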

Similar question here.

In this example, I split the list of tickers into sublists so that each process retrieves data for several symbols, which limits the overhead of creating and destroying processes.
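Since the work is I/O-bound (network plus disk writes), a thread pool is another low-friction option that avoids manual chunking entirely: the executor hands symbols to workers as they free up. A minimal sketch, with a placeholder in place of the real get_ts:

```python
from concurrent.futures import ThreadPoolExecutor

# Placeholder for get_ts(symbol); the real call would download
# the time series and write it to a CSV, returning nothing.
def get_ts(symbol):
    return "./data_dump/{}_data.csv".format(symbol)

symbols = ["AAPL", "GOOG", "TSLA", "MSFT"]

# map distributes symbols across threads and returns results
# in the same order as the input list.
with ThreadPoolExecutor(max_workers=8) as pool:
    paths = list(pool.map(get_ts, symbols))

print(paths)
```

Threads sidestep the pickling and process-startup costs of multiprocessing, which is usually fine here because the workers spend most of their time waiting on the network, not holding the GIL.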


2 Comments

thanks for this, it looks helpful and less confusing (to me) than using asyncio! i'll try it out and mark as accepted if it works well
worked great, thanks so much this is awesome!
