6

I am trying to load numpy files asynchronously in a Pool:

self.pool = Pool(2, maxtasksperchild = 1)
...
nextPackage = self.pool.apply_async(loadPackages, (...))
for fi in np.arange(len(files)):
    packages = nextPackage.get(timeout=30)
    # preload the next package asynchronously. It will be available
    # by the time it is required.
    nextPackage = self.pool.apply_async(loadPackages, (...))

The method "loadPackages":

def loadPackages(... (2 strings & 2 ints) ...):
    print("This isn't printed!")
    packages = {
        "TRUE": np.load(gzip.GzipFile(path1, "r")),
        "FALSE": np.load(gzip.GzipFile(path2, "r"))
    }
    return packages

Before even the first "package" is loaded, the following error occurs:

Exception in thread Thread-8:
Traceback (most recent call last):
  File "C:\Users\roman\Anaconda3\envs\tsc1\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\Users\roman\Anaconda3\envs\tsc1\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\roman\Anaconda3\envs\tsc1\lib\multiprocessing\pool.py", line 463, in _handle_results
    task = get()
  File "C:\Users\roman\Anaconda3\envs\tsc1\lib\multiprocessing\connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "C:\Users\roman\Anaconda3\envs\tsc1\lib\multiprocessing\connection.py", line 318, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "C:\Users\roman\Anaconda3\envs\tsc1\lib\multiprocessing\connection.py", line 337, in _get_more_data
    assert left > 0
AssertionError

I monitor the resources closely: memory is not an issue, as I still have plenty left when the error occurs. The unzipped files are just plain multidimensional numpy arrays. Individually, both parts work: a Pool with a simpler method runs fine, and loading the files directly works. Only the combination fails. (All this happens in a custom keras generator. I doubt this helps, but who knows.) Python 3.5.

What could the cause of this issue be? How can this error be interpreted?

Thank you for your help!

1
  • I have the same problem, also with plenty of RAM to use left. Did you find a solution for the issue? Commented Feb 12, 2018 at 22:41

2 Answers

10

There is a bug in Python's C core code that prevents responses bigger than 2 GB from being returned correctly to the main process. You need to either split the data into smaller chunks, as suggested in the other answer, or not use multiprocessing for this function.
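A minimal sketch of the chunking option, assuming the large result is a NumPy array that can be split along its first axis. The helper name `split_for_transfer` and the 256 MiB budget are hypothetical choices, not part of multiprocessing or NumPy. Each chunk would still have to be returned as its own result (for example, one task per chunk), so that no single message comes close to 2 GB.

```python
import numpy as np

# Hypothetical per-message budget, chosen to stay far below the 2 GB limit.
MAX_CHUNK_BYTES = 256 * 1024 * 1024


def split_for_transfer(array, max_bytes=MAX_CHUNK_BYTES):
    """Split `array` along axis 0 into views of at most `max_bytes` each.

    A single row larger than `max_bytes` is still returned as one chunk.
    """
    if array.ndim == 0 or array.shape[0] == 0:
        return [array]
    row_bytes = array.nbytes // array.shape[0]
    # Number of whole rows that fit into the budget (at least one).
    rows_per_chunk = max(1, max_bytes // max(1, row_bytes))
    return [
        array[start:start + rows_per_chunk]
        for start in range(0, array.shape[0], rows_per_chunk)
    ]
```

On the receiving side, the pieces can be put back together with `np.concatenate(chunks, axis=0)` once all of them have arrived.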

I reported this bug to the Python bug tracker (https://bugs.python.org/issue34563) and created a PR (https://github.com/python/cpython/pull/9027) to fix it, but it will probably take a while for the fix to be released. (UPDATE: the fix is present in Python 3.8.0+.)

If you are interested, you can find more details on what causes the bug in the bug description linked above.
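For the other option, not using multiprocessing, one possible sketch is to preload the next item in a background thread instead of a worker process. Threads share memory with the caller, so the result is never pickled or sent through a pipe. The name `prefetch` and its parameters are hypothetical, and how much the loading really overlaps with other work depends on how much of it releases the GIL, so treat this as an assumption to measure rather than a guaranteed speed-up.

```python
from concurrent.futures import ThreadPoolExecutor


def prefetch(load, items, timeout=30):
    """Yield load(item) for each item, loading the next one in a background thread."""
    items = list(items)
    if not items:
        return
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(load, items[0])
        for upcoming in items[1:]:
            # Wait for the current item, then start loading the next one
            # before handing the current result to the caller.
            result = future.result(timeout=timeout)
            future = executor.submit(load, upcoming)
            yield result
        yield future.result(timeout=timeout)
```

With a loader function that takes one argument describing a package, this could be used as `for packages in prefetch(loader, package_specs): ...`, where `loader` and `package_specs` are placeholders for your own function and inputs.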

5 Comments

Out of interest, I am seeing this error at python 3.7.7 (via an anaconda install environment). I'm afraid I can't say what the cause is but multiprocessing is definitely involved and it seems likely to me that the response from a thread is large.
Yeah, unfortunately, it looks like the older versions were not updated after the backport, so the fix is only available starting with 3.8. I will update the answer.
@Alex Although this link (bugs.python.org/issue34563) says that the bug has been fixed, I'm getting the same error on Windows 10 with Python 3.8 and 3.9 using Anaconda 4.11.0. I'm pretty sure it's the same error, since the program I'm running doesn't throw any errors when a smaller dataset is used.
I am still experiencing the issue on python 3.10
@tim654321 same
2

I think I've found a workaround: retrieving the data in small chunks. In my case it was a list of lists.

I had:

for i in range(0, NUMBER_OF_THREADS):
    print('MAIN: Getting data from process ' + str(i) + ' proxy...')
    X_train.extend(ListasX[i]._getvalue())
    Y_train.extend(ListasY[i]._getvalue())
    ListasX[i] = None
    ListasY[i] = None
    gc.collect()

Changed to:

CHUNK_SIZE = 1024
for i in range(0, NUMBER_OF_THREADS):
    print('MAIN: Getting data from process ' + str(i) + ' proxy...')
    for k in range(0, len(ListasX[i]), CHUNK_SIZE):
        X_train.extend(ListasX[i][k:k+CHUNK_SIZE])
        Y_train.extend(ListasY[i][k:k+CHUNK_SIZE])
    ListasX[i] = None
    ListasY[i] = None
    gc.collect()

And now it seems to work, possibly by serializing less data at a time. So maybe if you can segment your data into smaller portions you can overcome the issue. Good luck!
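The same idea can be written as a small reusable helper. This is only a sketch: `iter_chunks` and `collect` are hypothetical names, and it assumes the objects support `len()` and slicing, as the list proxies in the code above do.

```python
def iter_chunks(sequence, chunk_size=1024):
    """Yield successive slices of `sequence`, each at most `chunk_size` items long."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(sequence), chunk_size):
        yield sequence[start:start + chunk_size]


def collect(sequences, chunk_size=1024):
    """Merge several sequences into one list, copying one chunk at a time."""
    merged = []
    for sequence in sequences:
        for chunk in iter_chunks(sequence, chunk_size):
            merged.extend(chunk)
    return merged
```

With the names from the code above, this would be used as `X_train = collect(ListasX, CHUNK_SIZE)`, so that each slice request transfers only a small part of the data.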

3 Comments

Thank you for your answer, Francisco! No, unfortunately, I'm still lacking an explanation of what exactly happened here. I've also worked around it with a different architecture. The assert left > 0 comes from copying a pipe's content into a buffer: "If the function succeeds, the return value is nonzero". And with GetLastError one could maybe try to get a meaningful error message... Yet for the time being, I have found a way around this issue. Forgive me for not marking this answer as an answer though, since it is just a workaround.
@Doidel Of course, it was just a modest attempt to help :]
Experiencing the same above on Python 3.9. Struggling to find any resources or solutions; is this error rare?
