0

I am trying to setup a multiprocessing Python task on Windows 10, Python 3.13. I have "main.py" module, containing the main entry, "orchestration.py" module with worker initilization and task routines, as well as some other modules not relevanet to present issue. An MRE is shown below.

If I have an exception within main_worker() (active in MRE), it is properly propagated to main.py try-except and can be handled in the main process.

However, if I have an exception in worker_init() (by uncommenting the raise and commenting out the raise in main_worker()), Python attempts to restart the process without exception propagation. In case of a persistent error, Python gets stuck in an infinite loop of process restrating.

How can I properly terminate the whole thing in such a case?

main.py

import os
import sys
import logging
import multiprocessing as mp
from multiprocessing import Pool


if __package__ is None or __package__ == "":
    sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
    from orchestration import worker_init, main_worker
else:
    from .orchestration import worker_init, main_worker


# ---------------------------------------------------------------------------
# Main driver
# ---------------------------------------------------------------------------
def main():
    logging.getLogger().warning(f"Running MainProcess PID-{os.getpid()}")
    try:
        with Pool(processes=2, initializer=worker_init) as pool:
            for i, result in enumerate(pool.imap_unordered(main_worker, range(10), chunksize=10)):
                if result:
                    data, err = result
                    logging.getLogger().warning(f"Worker job completed. Returned data: {data}.")
                else:
                    logging.getLogger().warning(f"Worker job did not return any result.")
                    err = mp.ProcessError(f"DUMMY error {err}")
                if err:
                    logging.getLogger().error(f"Worker job returned and error: {err}.")
                    pool.terminate()
                    break
    except Exception as e:
        logging.getLogger().error(f"Pool error: {e}.")


if __name__ == "__main__":
    main()

orchestration.py

import os
import sys
import logging
import multiprocessing as mp


def worker_init(worker_config=None):
    """Initializer for multiprocessing.Pool workers (per process)."""
    logging.getLogger().warning(f"Running worker_init PID-{os.getpid()}")
    # raise mp.ProcessError(f"DUMMY PID-{os.getpid()}")
    return


def main_worker(task_data=None):
    """Execute one job and return (path, meta, error)."""
    raise mp.ProcessError(f"DUMMY PID-{os.getpid()}")
    try:
        logging.getLogger().warning(f"Running main_worker PID-{os.getpid()}")
        data = os.getpid()
    except Exception as e:
        return None, e

    return data, None

1 Answer 1

1

It appears that it is essential to let initilization routine in

with Pool(processes=2, initializer=worker_init) as pool:

complete normally. To propagate the error properly, the code inside worker_init() (or just potential error sources) needs to be wrapped in try-except without reraising the exception. The except block code must store the error in a module-level variable and exit normally. Then the main_worker() routine must first check if the error is set, and if so, reraise it:

import os
import sys
import logging
import multiprocessing as mp


_init_exception: Exception = None


def worker_init(worker_config=None):
    """Initializer for multiprocessing.Pool workers (per process)."""
    logging.getLogger().warning(f"Running worker_init PID-{os.getpid()}")
    try:
        raise mp.ProcessError(f"worker_init - DUMMY PID-{os.getpid()}")
    except Exception as e:
        global _init_exception
        _init_exception = e

    return


def main_worker(task_data=None):
    """Execute one job and return (path, meta, error)."""
    global _init_exception
    if _init_exception:
        raise _init_exception

    try:
        logging.getLogger().warning(f"Running main_worker PID-{os.getpid()}")
        data = os.getpid()
    except Exception as e:
        return None, e

    return data, None
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.