0

While using python multiprocessing pool how many jobs are submitted?

How is it decided? Can we control it somehow? Like at most 10 jobs in the queue at most to reduce memory usage.

Assume that I have the backbone code written below: For each chrom and simulation I read the data as pandas dataframe.

(I thought that reading data before submiting the job would be better, to reduce I/O bound in the worker process)

Then I send the pandas dataframe to each worker to process it.

But it seems that lots of jobs are submitted than the number of jobs finalized and this is resulting in memory error.

numofProcesses = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=numofProcesses)
jobs=[]


all_result1={}
all_result2={}

def accumulate(result):
 result1=result[0]
 result2=result[1]
 accumulate(resulst1,all_result1)
 accumulate(resulst2,all_result2)
 print('ACCUMULATE')

for each chr:
 for each sim:
     chrBased_simBased_df= readData(chr,sim)
     jobs.append(pool.apply_async(func, args=(chrBased_simBased_df,too,many,),callback=accumulate))
     print('Submitted job:%d' %(len(jobs)))

pool.close()
pool.join()

Is there a way to get rid of it?

1 Answer 1

2

Both multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor do not allow to limit the amount of tasks you submit to the workers.

Nevertheless, this a very trivial extension you can build yourself by using a Semaphore.

You can check an example in this gist. It uses the concurrent.futures module but it should be trivial to port it to multiprocessing.Pool as well.

from threading import BoundedSemaphore
from concurrent.futures import ProcessPoolExecutor


class MaxQueuePool:
    """This Class wraps a concurrent.futures.Executor
    limiting the size of its task queue.
    If `max_queue_size` tasks are submitted, the next call to submit will block
    until a previously submitted one is completed.
    """
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        """Submits a new task to the pool, blocks if Pool queue is full."""
        self.pool_queue.acquire()

        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(self.pool_queue_callback)

        return future

    def pool_queue_callback(self, _):
        """Called once task is done, releases one queue slot."""
        self.pool_queue.release()


if __name__ == '__main__':
    pool = MaxQueuePool(ProcessPoolExecutor, 8)
    f = pool.submit(print, "Hello World!")
    f.result()
Sign up to request clarification or add additional context in comments.

1 Comment

Thanks for providing a solution for concurrent.futures. Unfortunately, I do not know how to do the same thing for multiprocessing.pool.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.