I have multiple .csv files in a folder. Each .csv file contains trading data for a list of stocks. I want to take a specific section of data (in this case, the 'BABA' ticker) from each .csv and then combine multiple days' worth of sections. Because of the global interpreter lock, this takes ~15 minutes using a standard for loop over 150 .csv files.

GOAL: speed up the For Loop using multiprocessing

PROBLEM: when using multiprocessing I receive the error: AttributeError: Can't pickle local object 'main.<locals>.compile'. Full traceback of the error at bottom.

The code below is slow but works, using a for loop:

def main():

    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool

    path = '/Users/DataFiles' #multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []

    def compile(file):
        df = pd.read_csv(file)
        df = df.loc[df['UnderlyingSymbol'] == 'BABA'] #select a certain section of data for ticker from .csv file

#=== make some changes to dataframe ===

        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
        df['DMarket'] = market_delta().round(4)

#=== append dataframes to list ===

        stock_list.append(df)

#=== using FOR LOOP ===

    for file in file_list:
        compile(file)

#=== combine dataframes from list into one dataframe and export ===

    stock_list_merged = pd.concat(stock_list)
    stock_list_merged.to_csv('scholes expanded.csv', index = False)

if __name__ == '__main__':
    main()

When I change the for loop to the multiprocessing code below, I receive the error.

def main():

    import pandas as pd
    import glob
    import numpy as np
    import multiprocessing
    from multiprocessing import Pool

    path = '/Users/DataFiles' #multiple .csv files located here
    file_list = glob.glob(path + '/*.csv')
    stock_list = []

    def compile(file):
        df = pd.read_csv(file)
        df = df.loc[df['UnderlyingSymbol'] == 'BABA'] #select a certain section of data for ticker from .csv file

#=== make some changes to dataframe ===

        def market_delta():
            return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
        df['DMarket'] = market_delta().round(4)

#=== append dataframes to list ===

        stock_list.append(df)

#=== using MULTIPROCESSING ===

    pool = Pool(processes = (multiprocessing.cpu_count()-1))
    results = pool.map(compile, file_list)
    pool.close()
    pool.join()
    results_df = pd.concat(results)

#=== combine dataframes from list into one dataframe and export ===

    stock_list_merged = pd.concat(stock_list)
    stock_list_merged.to_csv('scholes expanded.csv', index = False)

if __name__ == '__main__':
    main()

Error Traceback:

    Traceback (most recent call last):
      File "/Users/andrewbochat/Desktop/OptionsData/practice multiprocessing.py", line 54, in <module>
        main()
      File "/Users/andrewbochat/Desktop/OptionsData/practice multiprocessing.py", line 41, in main
        results = pool.map(compile, file_list)
      File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 364, in map
        return self._map_async(func, iterable, mapstar, chunksize).get()
      File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 771, in get
        raise self._value
      File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/pool.py", line 537, in _handle_tasks
        put(task)
      File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/connection.py", line 211, in send
        self._send_bytes(_ForkingPickler.dumps(obj))
      File "/Users/andrewbochat/opt/anaconda3/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
        cls(buf, protocol).dump(obj)
    AttributeError: Can't pickle local object 'main.<locals>.compile'

  • (1) Format the code correctly. (2) Show the full traceback of the error as properly formatted text (formatted as code) in the question. (3) Show the variant of the code which really causes the error. Commented Mar 8, 2023 at 21:29
  • OK Corrections made! Commented Mar 8, 2023 at 22:42
  • The function that should be called in the worker processes of the pool (here: compile) must be available on module level and can't be a function nested in another function. Commented Mar 9, 2023 at 5:10
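A minimal sketch of the constraint that last comment describes: Pool.map pickles its target function by qualified name, so only module-level functions can be sent to worker processes, while a function nested inside another function cannot:

```python
from multiprocessing import Pool

def square(x):
    """Module-level: picklable by name, so Pool.map can send it to workers."""
    return x * x

def main():
    def cube(x):
        """Nested inside main: NOT picklable."""
        return x ** 3

    with Pool(2) as pool:
        print(pool.map(square, [1, 2, 3]))   # [1, 4, 9]
        # pool.map(cube, [1, 2, 3])          # would raise AttributeError:
        #   Can't pickle local object 'main.<locals>.cube'

if __name__ == '__main__':
    main()
```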

1 Answer

I'm not sure if you actually have your imports and other functions nested inside def main(), or if that's a code-formatting issue.

You need to format your code to something like:

import glob
import pandas as pd
import numpy as np
from   multiprocessing import Pool

def compile(file):
    df = pd.read_csv(file)
    df = df.loc[df['UnderlyingSymbol'] == 'BABA']  # keep only the BABA rows

    def market_delta():
        return np.sum(df['Delta'] * df['OpenInterest']) / np.sum(df['OpenInterest'])
    df['DMarket'] = market_delta().round(4)

    return df  # return the frame to the parent instead of appending to a shared list

def main():
    path = '/Users/DataFiles'
    file_list = glob.glob(path + '/*.csv')

    with Pool() as pool:
        results_df = pd.concat(pool.map(compile, file_list))

    print(results_df)
    results_df.to_csv('scholes expanded.csv', index = False)

if __name__ == '__main__': 
    main()
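If you prefer the standard-library concurrent.futures API, the same pattern works with ProcessPoolExecutor. A sketch under the same assumptions about the CSV layout, with a hypothetical load_ticker function playing the role of compile above:

```python
from concurrent.futures import ProcessPoolExecutor

import pandas as pd

def load_ticker(file):
    """Read one day's CSV and keep only the BABA rows (same role as compile)."""
    df = pd.read_csv(file)
    return df.loc[df['UnderlyingSymbol'] == 'BABA']

def combine(file_list):
    """Filter each file in a worker process and concatenate the results."""
    with ProcessPoolExecutor() as executor:
        return pd.concat(executor.map(load_ticker, file_list))
```

combine(glob.glob(path + '/*.csv')).to_csv('scholes expanded.csv', index=False) would then replace the Pool block; note that load_ticker must still live at module level, for the same pickling reason.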

1 Comment

Thank you so much!! It worked great! Thank you thank you! I would upvote but im too new to the site :)
