
I am trying to manipulate the CSV file from https://www.kaggle.com/raymondsunartio/6000-nasdaq-stocks-historical-daily-prices using dask.dataframe. The original dataframe has the columns 'date', 'ticker', 'open', 'close', etc.

My goal is to create a new data frame with index 'date' and columns as the closing price of each unique ticker.

The following code does the trick, but it is quite slow, taking almost a minute for N = 6. I suspect that Dask ends up reading the CSV file multiple times in the for-loop, but I don't know how to go about making this faster. My initial guess is that using df.groupby('ticker') somewhere would help, but I am not familiar enough with pandas.

import dask.dataframe as dd
from functools import reduce

def load_and_fix_csv(path: str, N: int, tickers: list = None) -> dd.DataFrame:
    raw = dd.read_csv(path, parse_dates=["date"])
    if tickers is None:
        tickers = raw.ticker.unique().compute()[:N] # First N unique tickers
    dfs = []
    for tick in tickers:
        # Date and closing price for one ticker; rename 'close' to the ticker symbol so merged columns don't collide
        tmp = raw[raw.ticker == tick][["date", "close"]].rename(columns={"close": tick})
        dfs.append(tmp)
    df = reduce(lambda x, y: dd.merge(x, y, how="outer", on="date"), dfs) # Merge all dataframes on date
    df = df.set_index("date").compute()
    return df

Every kind of help is appreciated! Thank you.

1 Answer

I'm pretty sure you're right that Dask is going "back to the well" on each loop iteration; this is because Dask builds a graph of operations and defers computation until it is forced or necessary. One thing I like to do is cut the expensive read operation out of the graph with Client.persist:

import dask.dataframe as dd
from functools import reduce

from distributed import Client

client = Client()


def persist_load_and_fix_csv(path: str, N: int, tickers: list = None) -> dd.DataFrame:
    raw = dd.read_csv(path, parse_dates=["date"])

    # This "cuts the graph" prior operations (just the `read_csv` here)
    raw = client.persist(raw)
    if tickers is None:
        tickers = raw.ticker.unique().compute()[:N] # First N unique tickers
    dfs = []
    for tick in tickers:
        # Date and closing price for one ticker; rename 'close' to the ticker symbol so merged columns don't collide
        tmp = raw[raw.ticker == tick][["date", "close"]].rename(columns={"close": tick})
        dfs.append(tmp)
    df = reduce(lambda x, y: dd.merge(x, y, how="outer", on="date"), dfs) # Merge all dataframes on date
    df = df.set_index("date").compute()
    return df

In a Kaggle session I timed both functions with N=3, and persist_load_and_fix_csv(csv_path, N=3) cut the time in half. You'll also get better performance by only keeping the columns you end up using.
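For example, a minimal sketch of that inside the function above, selecting the columns right before the persist:

    raw = dd.read_csv(path, parse_dates=["date"])
    # Keep only the columns the rest of the function uses before persisting
    raw = raw[["date", "ticker", "close"]]
    raw = client.persist(raw)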

(Note: I've found that, at least for me and my code, if .compute() starts cropping up inside functions, it's a sign I should step back and reevaluate the code paths; I view it as a code smell.)
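Along those lines, here is a rough sketch of a loop-free alternative: read the file once, make 'ticker' a known categorical, and pivot the closing prices into one column per ticker. This assumes one row per (date, ticker) pair; the path is a placeholder and I haven't benchmarked it against this dataset:

import dask.dataframe as dd

raw = dd.read_csv(
    "historical_daily_prices.csv",  # placeholder path to the Kaggle CSV
    usecols=["date", "ticker", "close"],
    parse_dates=["date"],
)

# Dask's pivot_table needs the 'columns' argument to be a categorical with
# known categories, which categorize() provides
raw = raw.categorize(columns=["ticker"])

wide = raw.pivot_table(index="date", columns="ticker", values="close", aggfunc="mean")
df = wide.compute()  # pandas DataFrame: one closing-price column per ticker

This reads the CSV once and avoids both the per-ticker filtering and the chain of merges.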


2 Comments

Thank you, I will try this! By "keeping the columns I end up using", do you mean while reading the CSV file?
Sure, it could be done at that point. Basically, try not to lock up memory with data that you won't end up using; at the very least, drop the unused columns before the client.persist.
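For example (a sketch; usecols is simply passed through to pandas.read_csv, so only those columns are ever loaded):

raw = dd.read_csv(path, usecols=["date", "ticker", "close"], parse_dates=["date"])
raw = client.persist(raw)  # only these three columns are held in distributed memory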
