I'm trying to run a number of tasks in a multiprocessing pool, but I keep running into errors and can't work out why. A simplified version of the code is below:

from multiprocessing import Pool

class Object_1():

    def add_good_spd_column(self, arg1, arg2, arg3):

        def calculate_correlations(arg1, arg2, arg3):
            return {'a': 1}

        # Submit the tasks to the pool
        processes = {}
        pool = Pool(processes=6)
        for i in range(0, 10):
            processes[i] = pool.apply_async(calculate_correlations,
                                            args=(arg1, arg2, arg3,))

        # Collect the results
        correlations = {}
        for i in range(0, 10):
            correlations[i] = processes[i].get()

This returns the following error:

Traceback (most recent call last):
  File "./02_results.py", line 116, in <module>
    correlations[0] = processes[0].get()
  File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 608, in get
    raise self._value
  File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 385, in _handle_tasks
    put(task)
  File "/opt/anaconda3/lib/python3.5/multiprocessing/connection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "/opt/anaconda3/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'SCADA.add_good_spd_column.<locals>.calculate_correlations'

When I call processes[0].successful(), I get the following error:

Traceback (most recent call last):
  File "./02_results.py", line 116, in <module>
    print(processes[0].successful())
  File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 595, in successful
    assert self.ready()
AssertionError

Is this because the process isn't actually finished before the .get() is called? The function being evaluated just returns a dictionary which should definitely be pickle-able...

Cheers,

  • It's hard to answer this, because your simplified code works fine. It looks like the problem is that the real calculate_correlations function is not pickleable. Where is it really defined? It seems like it's not actually a top-level function, like your example shows. Commented Feb 23, 2018 at 15:00
  • The error indicates it's probably a function nested in another function (add_good_spd_column), which pickle doesn't support. Commented Feb 23, 2018 at 15:07
  • Yes, it's all nested inside a class structure - will update question! Commented Feb 23, 2018 at 15:08
  • "Is this because the process isn't actually finished before the .get() is called?" No. By default, .get() blocks until the result is available (from the docs). Commented Feb 23, 2018 at 15:23

1 Answer

The error occurs because pickle does not support functions nested inside another function, and multiprocessing.Pool has to pickle the function you pass to apply_async in order to run it in a worker process. Move the function to the top level of the module, or make it an instance method of the class. Keep in mind that if you make it an instance method, the instance itself must also be picklable.
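As a rough sketch of the first option, the function from the question can be moved to module level so that it is pickled by reference. The class and method names follow the question; the argument values 1, 2, 3 are placeholders, and the `if __name__ == "__main__":` guard is an addition that is needed on platforms where worker processes re-import the main module.

```python
from multiprocessing import Pool


def calculate_correlations(arg1, arg2, arg3):
    # Module-level function: pickle can refer to it by its qualified name.
    # The body is the placeholder from the question.
    return {'a': 1}


class Object_1:

    def add_good_spd_column(self, arg1, arg2, arg3):
        with Pool(processes=6) as pool:
            # Submit all tasks first so they can run concurrently.
            processes = {
                i: pool.apply_async(calculate_correlations,
                                    args=(arg1, arg2, arg3))
                for i in range(10)
            }
            # .get() blocks until each result is available.
            return {i: result.get() for i, result in processes.items()}


if __name__ == "__main__":
    # Placeholder arguments, not values from the original code.
    correlations = Object_1().add_good_spd_column(1, 2, 3)
    print(correlations[0])
```

Only the function reference and its arguments are sent to the workers here, so the Object_1 instance itself never has to be pickled.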

And yes, the assertion error when calling successful() occurs because you're calling it before a result is ready. From the docs:

successful()

Return whether the call completed without raising an exception. Will raise AssertionError if the result is not ready.
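One way to avoid that error is to call wait() (or check ready()) before successful(). The snippet below is a minimal sketch; slow_square and check_result_state are hypothetical names used only for illustration. Note that from Python 3.7 onward, successful() raises ValueError rather than AssertionError when the result is not ready.

```python
import time
from multiprocessing import Pool


def slow_square(x):
    # Hypothetical task that takes a moment to finish.
    time.sleep(0.5)
    return x * x


def check_result_state():
    with Pool(processes=2) as pool:
        result = pool.apply_async(slow_square, args=(3,))
        # Right after submission the task has usually not finished yet,
        # so calling result.successful() here would likely raise.
        was_ready = result.ready()
        result.wait()              # block until the task has completed
        ok = result.successful()   # safe to call now
        value = result.get()
    return was_ready, ok, value


if __name__ == "__main__":
    print(check_result_state())
```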
