
I am trying to use multiprocessing for a task that is very slow when done as a single process. As you can see in the code below, each process is supposed to return some results (return_dict). I initially tested this code with a 10K-row dataset (stored in the docs.txt file, about 70 MB), and the code ran as expected. However, when I ran the script on the full dataset (approx. 5.6 GB), I got an AssertionError, shown at the bottom of my question. I wonder if anyone knows what may have caused it and how I might avoid it. Thanks.

from multiprocessing import Process, Manager
import os, io, numpy
from gensim.models.doc2vec import Doc2Vec

def worker(i, data, return_dict):
    model = Doc2Vec.load("D:\\Project1\\doc2vec_model_DM_20180814.model")
    results = numpy.zeros((len(data), model.vector_size))
    for id, doc in enumerate(data):
        results[id,:] = model.infer_vector(doc, alpha = 0.01, steps = 100)
    return_dict[i] = results

if __name__ == '__main__':
    import time
    a  = time.time()
    path = "D:\\Project1\\docs.txt"    # <<=== data stored in this file
    data = []
    manager = Manager()
    jobs = []
    return_dict = manager.dict()

    with io.open(path, "r+", encoding = "utf-8") as datafile:
        for id, row in enumerate(datafile):
            row = row.strip().split('\t')[0].split()
            data.append(row)

    step = numpy.floor(len(data)/20)
    intervals = numpy.arange(0, len(data), step = int(step)).tolist()
    intervals.append(len(data))

    for i in range(len(intervals) - 1):
        p = Process(target=worker, args=(i, data[intervals[i]:intervals[i+1]], return_dict))
        jobs.append(p)
        p.start()
    for proc in jobs:
        proc.join()

    results = numpy.zeros((len(data), 1000))
    start = 0
    end = 0
    for _, result in return_dict.items():    #<<===Where error happens
        end = end + result.shape[0]
        results[start:end,:] = result[:,:]
        start = end

    print(time.time() - a)

Error message:

Traceback (most recent call last):
  File "D:\Project1\multiprocessing_test.py", line 43, in <module>
    for _, result in return_dict.items():
  File "<string>", line 2, in items
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\managers.py", line 757, in _callmethod
    kind, result = conn.recv()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 318, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 337, in _get_more_data
    assert left > 0
AssertionError
  • How about your memory usage during the whole process? The memory usage should be between 2*5.6 GB and 3*5.6 GB including virtual memory. Commented Aug 17, 2018 at 14:00
  • Memory might be an issue? The return_dict contains 20 elements; each element is a numpy array of approximately 425k rows and 1000 columns of float values. My computer has 256 GB of RAM, and during the actual multiprocessing phase it had 50% of the RAM free. I wasn't at my computer when the error happened, so I don't know what the memory usage was like then. Commented Aug 17, 2018 at 14:18
  • Well, as you have such large RAM, I guess it won't be a memory issue. Basically, it looks like your child processes don't really transport the whole data to your main process. It's weird, as I think return_dict[i] = results should be a blocking operation. I cannot debug your code, so I cannot figure out the root cause. An alternative solution is concurrent.futures.ProcessPoolExecutor, as it allows you to get the return value of the target function: future = executor.submit(args) and result = future.result() (see the sketch after these comments). Commented Aug 17, 2018 at 14:37
  • Possible duplicate of Python multiprocessing apply_async "assert left > 0" AssertionError Commented Sep 2, 2018 at 21:03
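
A minimal sketch of the ProcessPoolExecutor alternative mentioned in the comments above, assuming data has already been loaded as in the question; the worker body mirrors the original worker(), while the function name infer_chunk, the chunk size, and the worker count are placeholders rather than part of the original code:

from concurrent.futures import ProcessPoolExecutor
import numpy
from gensim.models.doc2vec import Doc2Vec

def infer_chunk(docs):
    # each worker loads the model itself, as in the original worker()
    model = Doc2Vec.load("D:\\Project1\\doc2vec_model_DM_20180814.model")
    vectors = numpy.zeros((len(docs), model.vector_size))
    for idx, doc in enumerate(docs):
        vectors[idx, :] = model.infer_vector(doc, alpha=0.01, steps=100)
    return vectors

if __name__ == '__main__':
    # `data` is assumed to be the list of tokenized documents built in the question
    chunks = [data[i:i + 10000] for i in range(0, len(data), 10000)]
    with ProcessPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(infer_chunk, chunk) for chunk in chunks]
        # future.result() returns each chunk's array; vstack keeps submission order
        results = numpy.vstack([future.result() for future in futures])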

2 Answers


I guess you are using all your available memory. dict.items() creates a copy of your dict filled with all your items and uses a lot of memory. Better to just use dict.iteritems() to iterate over your results.

Edit: Sorry, I didn't notice the python-3 tag at first. In Python 3, dict.items() doesn't return a copy anymore and should be fine to use.

The relevant code in multiprocessing's connection.py is:

left = _winapi.PeekNamedPipe(self._handle)[1]
assert left > 0

Are you using Windows? Then I guess it is some Windows-related problem; it seems that PeekNamedPipe returns 0.


1 Comment

Yes, you can just use items(). Python 3's items() is Python 2's iteritems().

This was my case and solution. Hope this can help! I have a function to process, named func:

from functools import partial  # needed for partial()
# pool is a multiprocessing.Pool created elsewhere
partial_func = partial(func, a=params1, b=params2)
for i, _ in enumerate(pool.imap(partial_func, [1])):
    pass

The root cause was that params1 and params2, which I pass to partial_func, were too large.
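
If the arguments really are too large to send with every task, one common workaround is to load or build the heavy objects once per worker process via the Pool initializer instead of pickling them into functools.partial. This is only a minimal sketch under that assumption, not the exact fix used above; init_worker and the stand-in array are hypothetical:

from multiprocessing import Pool
import numpy

_big_params = None  # filled in once per worker by init_worker()

def init_worker():
    # load or build the large object once per worker process,
    # instead of pickling it into every task via functools.partial
    global _big_params
    _big_params = numpy.ones((1000, 1000))  # stand-in for the real large parameters

def func(i):
    # uses the per-process global rather than a large pickled argument
    return float(_big_params[i % 1000].sum())

if __name__ == '__main__':
    with Pool(processes=4, initializer=init_worker) as pool:
        for result in pool.imap(func, range(8)):
            print(result)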
