I am trying to set up a multiprocessing Python task on Windows 10 with Python 3.13. I have a "main.py" module containing the main entry point, an "orchestration.py" module with the worker initialization and task routines, and some other modules that are not relevant to the present issue. An MRE is shown below.
If I have an exception within main_worker() (active in MRE), it is properly propagated to main.py try-except and can be handled in the main process.
However, if I have an exception in worker_init() (by uncommenting the raise there and commenting out the raise in main_worker()), Python attempts to restart the process without propagating the exception. In the case of a persistent error, Python gets stuck in an infinite loop of process restarts.
How can I properly terminate the whole thing in such a case?
main.py
import os
import sys
import logging
import multiprocessing as mp
from multiprocessing import Pool
# Pick the import form that matches how this module was loaded.
if __package__:
    # Loaded as part of a package: use the explicit relative import.
    from .orchestration import worker_init, main_worker
else:
    # Executed as a plain script: make the sibling module importable by
    # putting this file's own directory at the front of the search path.
    sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
    from orchestration import worker_init, main_worker
# ---------------------------------------------------------------------------
# Main driver
# ---------------------------------------------------------------------------
def main():
    """Run the worker pool and stop at the first reported error.

    Starts a two-process ``Pool`` whose workers are set up by
    ``worker_init`` and feeds ``range(10)`` to ``main_worker`` as a single
    chunk. Each result is expected to be a ``(data, err)`` pair; a falsy
    result is treated as an error. On the first error the pool is
    terminated and iteration stops.

    Exceptions raised by ``main_worker`` surface while iterating over the
    results and are logged by the ``except`` clause below.

    NOTE(review): an exception raised inside the pool *initializer* does
    not reach this handler; the failed worker is replaced instead, so a
    persistent initializer failure is not handled here.
    """
    log = logging.getLogger()
    log.warning(f"Running MainProcess PID-{os.getpid()}")
    try:
        with Pool(processes=2, initializer=worker_init) as pool:
            results = pool.imap_unordered(main_worker, range(10), chunksize=10)
            for result in results:
                if result:
                    data, err = result
                    log.warning(f"Worker job completed. Returned data: {data}.")
                else:
                    log.warning("Worker job did not return any result.")
                    # Do not interpolate `err` here: it is unbound on the
                    # first iteration and stale (from an earlier result)
                    # on later ones.
                    err = mp.ProcessError("DUMMY error: worker returned no result")
                if err:
                    log.error(f"Worker job returned and error: {err}.")
                    # Stop outstanding work right away instead of waiting
                    # for the remaining tasks.
                    pool.terminate()
                    break
    except Exception as e:
        # Top-level boundary: report any failure coming out of the pool.
        log.error(f"Pool error: {e}.")
# Run the driver only when this file is executed as a script. The guard keeps
# main() from running again when the module is imported rather than executed
# (multiprocessing child processes may import the main module on start-up).
if __name__ == "__main__":
    main()
orchestration.py
import os
import sys
import logging
import multiprocessing as mp
def worker_init(worker_config=None):
    """Per-process initializer passed to ``multiprocessing.Pool``.

    Logs the PID of the process it runs in and returns ``None``.

    Args:
        worker_config: Optional configuration object; currently unused.
    """
    root_logger = logging.getLogger()
    root_logger.warning(f"Running worker_init PID-{os.getpid()}")
    # MRE toggle: uncomment to simulate a failing initializer.
    # raise mp.ProcessError(f"DUMMY PID-{os.getpid()}")
def main_worker(task_data=None):
    """Execute one job and return a ``(data, error)`` pair.

    In this MRE the function raises ``mp.ProcessError`` unconditionally,
    so the code after the ``raise`` is dormant until the ``raise`` is
    commented out. When active, that code logs the worker PID and returns
    ``(pid, None)`` on success or ``(None, exception)`` on failure.

    Args:
        task_data: The task item handed over by the pool; currently unused.

    Raises:
        multiprocessing.ProcessError: Always, while the MRE raise is active.
    """
    # MRE toggle: comment out to let the job body below run.
    raise mp.ProcessError(f"DUMMY PID-{os.getpid()}")
    try:
        logging.getLogger().warning(f"Running main_worker PID-{os.getpid()}")
        pid = os.getpid()
    except Exception as exc:
        return None, exc
    else:
        return pid, None