
I am learning multiprocessing in Python and am trying to incorporate a worker pool to manage downloads. I have narrowed my queue issue down to something with OOP, but I don't know what it is. The following is my unit test to verify functionality. The functional code works as expected, even with simulated delayed jobs. However, the same logic translated into OOP does not exhibit the same results.

Question:

Why does the OOP version get nothing out of the message queue for the process pool?

Problem:

Methods that incorporate multiprocessing pool queues in OOP do not behave the same as their functional counterparts.

Working Functional Code:

import multiprocessing

def worker(name, que):
    que.put("%d is done" % name)
    
if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue(maxsize=10)
    
    pool.apply_async(worker, (33, q))
    pool.apply_async(worker, (40, q))
    pool.apply_async(worker, (27, q))
    
    while True:
        try:
            print(q.get(False))
        except:
            pass

Output:

27 is done
33 is done
40 is done

Not Working OOP Code:

import multiprocessing

class Workers:
    def __init__(self):
        self.pool = multiprocessing.Pool(processes=3)
        self.m = multiprocessing.Manager()
        self.q = self.m.Queue()
        
        self.pool.apply_async(self.worker, (33, self.q))
        self.pool.apply_async(self.worker, (40, self.q))
        self.pool.apply_async(self.worker, (27, self.q))

        while True:
            try:
                print(self.q.get(False))
            except:
                pass

    def worker(self, name, que):
        que.put("%d is done" % name)


if __name__ == "__main__":
    w = Workers()

Output:

(nothing is printed; the loop spins forever and the queue stays empty)

2 Answers


When you want a pool worker to execute a task, the callable for the task and all its arguments have to be pickled in the master and unpickled in the worker process.

When the callable is self.worker, pickling the method requires pickling self, and that requires pickling self.pool. But you can't pickle a multiprocessing pool. So the background thread responsible for sending tasks to workers hits an exception when trying to submit the task.
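A minimal sketch makes the failure visible without a pool of tasks at all (the Holder class and its names here are made up for illustration). Pickling a bound method pickles the instance it is bound to, and the instance's pool attribute refuses:

import multiprocessing
import pickle

class Holder:
    def __init__(self):
        self.pool = multiprocessing.Pool(processes=1)

    def method(self):
        pass

if __name__ == "__main__":
    h = Holder()
    # Pickling the bound method pickles `h`, and pickling `h` pickles
    # `h.pool`; Pool.__reduce__ raises NotImplementedError.
    pickle.dumps(h.method)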

You can see the exception if you try to actually retrieve a task result. When a task submitted with apply_async fails with an exception, the AsyncResult's get method will rethrow that exception:

import multiprocessing

class Workers:
    def __init__(self):
        self.pool = multiprocessing.Pool(processes=3)
        self.m = multiprocessing.Manager()
        self.q = self.m.Queue()
        
        res = self.pool.apply_async(self.worker, (33, self.q))
        print(res.get())

    def worker(self, name, que):
        que.put("%d is done" % name)


if __name__ == "__main__":
    w = Workers()

Output:

Traceback (most recent call last):
  File "/home/.anon-afac205e9bbd49ae8b289a78/asdf.py", line 17, in <module>
    w = Workers()
  File "/home/.anon-afac205e9bbd49ae8b289a78/asdf.py", line 10, in __init__
    print(res.get())
          ~~~~~~~^^
  File "/usr/local/lib/python3.13/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/usr/local/lib/python3.13/multiprocessing/pool.py", line 540, in _handle_tasks
    put(task)
    ~~~^^^^^^
  File "/usr/local/lib/python3.13/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
                     ~~~~~~~~~~~~~~~~~~~~~^^^^^
  File "/usr/local/lib/python3.13/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
    ~~~~~~~~~~~~~~~~~~~~~~~^^^^^
  File "/usr/local/lib/python3.13/multiprocessing/pool.py", line 643, in __reduce__
    raise NotImplementedError(
          'pool objects cannot be passed between processes or pickled'
          )

Comments

Ok. This clarifies a lot and shows how much I have to learn. My immediate thought is to use pipes instead, but that seems impulsive. Deeper inspection reveals that my approach to using pools is wrong and I should just parent-child manage spawned processes. My next question: how is message passing with pools in OOP intended to be done, or is it not intended at all, since a pool is an organized collection of "quiet" workers?
@TomSmith: If you want to do any message passing more complex than "parent to arbitrary worker: compute this thing please" and "worker to parent: here's the result", you probably shouldn't use a pool.
That said, while using a queue here is a little weird (it'd make more sense to use return values to communicate results, and to use the pool's own mechanisms for waiting for jobs to complete), the message passing design isn't what broke your code. What broke things was trying to pickle a method, which ended up trying to pickle the entire object along with it. Trying to use methods as tasks is very easy to get wrong.
How would even simple message passing work with a pool in OOP? You clearly described the complex case above, but how would message passing otherwise be done with pools? What am I missing?
@TomSmith: "Compute this thing please" messages are sent by submitting tasks, with apply_async, imap, imap_unordered, or whatever other method makes sense for your use case. "Here's the result" messages are sent by returning the result.
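To make that concrete, here is a minimal sketch of the pattern the comments describe: no queue at all, tasks in via apply_async and results out via return values (the worker here is illustrative):

import multiprocessing

def worker(name):
    return "%d is done" % name

if __name__ == "__main__":
    with multiprocessing.Pool(processes=3) as pool:
        # "Compute this thing please": submit the tasks
        results = [pool.apply_async(worker, (n,)) for n in (33, 40, 27)]
        # "Here's the result": each get() blocks until that task returns
        for res in results:
            print(res.get())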
1. Your while True loop will run forever.
2. When using multiprocessing, the function you pass to another process must be picklable. A bound method is pickled by pickling the instance it is bound to, and here that instance holds the unpicklable pool.

import multiprocessing
import queue  # needed for queue.Empty below


# Make worker a standalone function
def worker(name, que): 
    que.put("%d is done" % name)

class Workers:
    def __init__(self):
        self.pool = multiprocessing.Pool(processes=3)
        self.m = multiprocessing.Manager()
        self.q = self.m.Queue()
        
        self.pool.apply_async(worker, (33, self.q))  # Use the standalone function
        self.pool.apply_async(worker, (40, self.q))
        self.pool.apply_async(worker, (27, self.q))

        # Get results with a timeout to avoid infinite busy loop
        results = 0
        while results < 3:
            try:
                print(self.q.get(timeout=1))  # Add timeout
                results += 1
            except queue.Empty:  # a managed queue's get(timeout=...) raises queue.Empty
                print("Waiting for results...")

if __name__ == "__main__":
    w = Workers()

If you do want worker to live on the class, make it a static method so that pickling it does not drag the instance along:

@staticmethod
def worker(name, que):
    que.put("%d is done" % name)
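
For completeness, here is an untested sketch of how that static-method variant could slot into the original class. A static method is pickled by its qualified name, so neither the instance nor its pool is ever pickled:

import multiprocessing

class Workers:
    @staticmethod
    def worker(name, que):
        que.put("%d is done" % name)

    def __init__(self):
        self.pool = multiprocessing.Pool(processes=3)
        self.m = multiprocessing.Manager()
        self.q = self.m.Queue()

        results = [self.pool.apply_async(Workers.worker, (n, self.q))
                   for n in (33, 40, 27)]
        for res in results:
            res.wait()  # each worker puts its message before its task returns
        while not self.q.empty():
            print(self.q.get())

if __name__ == "__main__":
    w = Workers()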
