I am trying to get the basics of multiprocessing in Python. I have a quite complex routine that takes a large array (ca. 1 GB) and a double as inputs and returns a double. The large array is not modified by the routine. My objective is to run this routine over the same large array while varying the double parameter (assume ca. 20 possible values) in order to find an optimal value. So far I managed to compile the routine with the @jit decorator, obtaining a consistent speed improvement. What I would like is to parallelize the call of this function over each trial value of the double parameter, 1) without recompiling the function in each process and 2) sharing the large data array among the processes.
Below is a code snippet that mimics this case (I replaced the target function with another one for illustrative purposes) and also reproduces the issues I am encountering:
import numpy as np
from numba import jit
import multiprocessing as mp
import time
import os

def test_func(arg_vec, power):
    m = arg_vec.shape[0]
    int_power = int(power)
    out = 0
    for i in range(m):
        for j in range(int_power):
            out += arg_vec[i]
    return (out / m)

def worker(shared_array, power, result_queue):
    result = test_func(shared_array, power)
    result_queue.put(result)

@jit(nopython=True)
def test_func_jitted(arg_vec, power):
    m = arg_vec.shape[0]
    int_power = int(power)
    out = 0
    for i in range(m):
        for j in range(int_power):
            out += arg_vec[i]
    return (out / m)

def worker_jitted(shared_array, power, result_queue):
    result = test_func_jitted(shared_array, power)
    result_queue.put(result)
if __name__ == "__main__":
    # create large np.rand array
    dim = int(1.0e7)
    np.random.seed(0)
    rand_arg = np.random.rand(dim)
    n_trials = 20
    rand_powers = np.random.randint(1, high=5, size=n_trials)
    res_all = {}
    res = np.zeros((n_trials,2))

    # 0: looping
    for i in range(n_trials):
        ts = time.time()
        res[i,0] = test_func(rand_arg, rand_powers[i])
        te = time.time()
        res[i,1] = te - ts
    res_all[0] = res.copy()
    print("0 (simple loop): Total Exec time = {t}, first exec time = {first}, last exec time {last} ".format(t=res[:,1].sum(), first = res[0,1], last = res[-1,1]))

    res = np.zeros((n_trials,2))
    # 1: looping on jit
    for i in range(n_trials):
        ts = time.time()
        res[i,0] = test_func_jitted(rand_arg, rand_powers[i])
        te = time.time()
        res[i,1] = te - ts
    res_all[1] = res.copy()
    print("1 (simple loop on jitted function): Total Exec time = {t}, first exec time = {first}, last exec time {last} ".format(t=res[:,1].sum(), first = res[0,1], last = res[-1,1]))
    key = 1
    print("Coherence check on test case {key}: {res}".format(key = key, res = (res_all[key][:,0]== res_all[0][:,0]).all()))

    res = np.zeros((n_trials,2))
    # 2: multiprocess on plain function, no memory sharing
    cpu = os.cpu_count()
    processes = []
    ts = time.time()
    result_queue = mp.Queue()
    for i in range(n_trials):
        p = mp.Process(target = worker, args = (rand_arg, rand_powers[i], result_queue))
        processes.append(p)
        p.start()
    for i,_ in enumerate(processes):
        res[i,0] = (result_queue.get())
    for p in processes:
        p.join()
    te = time.time()
    res[:,1] = (te-ts) / n_trials
    res_all[2] = res.copy()
    print("2 (multiprocessing on simple function, no memory sharing): Total Exec time = {t}".format(t=te-ts))
    key = 2
    print("Coherence check on test case {key}: {res}".format(key = key, res = (res_all[key][:,0]== res_all[0][:,0]).all()))

    res = np.zeros((n_trials,2))
    # 3: multiprocess on jitted function, no memory sharing
    cpu = os.cpu_count()
    processes = []
    ts = time.time()
    result_queue = mp.Queue()
    for i in range(n_trials):
        p = mp.Process(target = worker_jitted, args = (rand_arg, rand_powers[i], result_queue))
        processes.append(p)
        p.start()
    for i,_ in enumerate(processes):
        res[i,0] = (result_queue.get())
    for p in processes:
        p.join()
    te = time.time()
    res[:,1] = (te-ts) / n_trials
    res_all[3] = res.copy()
    print("3 (multiprocessing on jitted function, no memory sharing): Total Exec time = {t}".format(t=te-ts))
    key = 3
    print("Coherence check on test case {key}: {res}".format(key = key, res = (res_all[key][:,0]== res_all[0][:,0]).all()))

    res = np.zeros((n_trials,2))
    # 4: multiprocess on plain function, memory sharing
    cpu = os.cpu_count()
    processes = []
    ts = time.time()
    shared_array = mp.Array('d', rand_arg, lock=False)  # the array is never written to --> lock=False
    result_queue = mp.Queue()
    for i in range(n_trials):
        p = mp.Process(target = worker, args = (shared_array, rand_powers[i], result_queue))
        processes.append(p)
        p.start()
    for i,_ in enumerate(processes):
        res[i,0] = (result_queue.get())
    for p in processes:
        p.join()
    te = time.time()
    res[:,1] = (te-ts) / n_trials
    res_all[4] = res.copy()
    print("4 (multiprocessing on simple function, memory sharing): Total Exec time = {t}".format(t=te-ts))
    key = 4
    print("Coherence check on test case {key}: {res}".format(key = key, res = (res_all[key][:,0]== res_all[0][:,0]).all()))
The above code compares five cases:
- 0: looping over the plain function (no multiprocessing, no jit)
- 1: looping over the jitted function (no multiprocessing)
- 2: multiprocessing on the plain function, no shared memory
- 3: multiprocessing on the jitted function, no shared memory
- 4: multiprocessing on the plain function, with shared memory

The issues so far:
- case 1) OK: the output is coherent with 0), it is quicker than 0), and the first call is slower than the last because of the compilation overhead
- case 2) KO: the output is not coherent with 0), although it is quicker than 0)
- case 3) OK: the output is coherent with 0) and quicker than 0)
- case 4) KO: the code hangs

This is the output that is produced:
0 (simple loop): Total Exec time = 81.29447627067566, first exec time = 4.21647834777832, last exec time 3.0243942737579346
1 (simple loop on jitted function): Total Exec time = 0.8534014225006104, first exec time = 0.35982656478881836, last exec time 0.020049095153808594
Coherence check on test case 1: True
2 (multiprocessing on simple function, no memory sharing): Total Exec time = 22.989959001541138
Coherence check on test case 2: False
3 (multiprocessing on jitted function, no memory sharing): Total Exec time = 9.260219097137451
Coherence check on test case 3: True
I cannot understand why 2 fails and why 4 hangs. Any help on this?
EDIT
After @Paul's and @ken's comments I understood the reason for the failure in 2 and managed to make 4 work by modifying the above to:
import ctypes

shared_array = mp.Array('d', rand_arg, lock=False)
nparray = np.frombuffer(shared_array, dtype=ctypes.c_double)
result_queue = mp.Queue()
for i in range(n_trials):
    p = mp.Process(target = worker, args = (nparray, rand_powers[i], result_queue))
    processes.append(p)
    p.start()
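For completeness, an equivalent variant (a minimal sketch of my own, not the exact code above) is to pass the raw shared array to the children and rebuild the NumPy view inside each worker; worker_shared below is a hypothetical helper, not part of the original snippet:

import numpy as np
import ctypes

def worker_shared(shared_array, power, result_queue):
    # build a NumPy view over the shared buffer inside the child process;
    # np.frombuffer creates a view, not a copy of the large array
    arr = np.frombuffer(shared_array, dtype=ctypes.c_double)
    result_queue.put(test_func(arr, power))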
Following @ken's suggestion, I added a test case relying on numba's parallel feature, importing prange from numba; this is the function:
@jit(nopython=True, parallel=True)
def loop_func_jitted(arg_vec, powers):
    m = powers.shape[0]
    out_res = np.empty_like(powers)
    for i in prange(m):
        out_res[i] = test_func_jitted(arg_vec, powers[i])
    return out_res
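As a usage sketch (my assumption on how the parallel version is called; the cast to float keeps out_res = np.empty_like(powers) a float array, so the double results are not truncated to integers):

res_parallel = loop_func_jitted(rand_arg, rand_powers.astype(np.float64))
print((res_parallel == res_all[0][:, 0]).all())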
Indeed it works nicely and does not mix multiprocessing and numba, as hinted by @Jerome. Considering this last example, I would like to understand a bit better what happens when using numba parallel:
- Is arg_vec copied to each process? This is what I want to avoid the most, because the argument is huge and the time spent copying it m times across the processes would be significant (if the copies even fit in the RAM of my machine).
- Is test_func_jitted recompiled in each process? This would be less painful in my case, I guess, because the compilation overhead is most likely much smaller than the execution time of a single call.
Comments:
- multiprocessing.Array has no shape property, so all workers crash before producing a result, but the main process keeps waiting for results to be added to the queue, so it hangs. By the way, why don't you use numba's parallelization feature? It should be much easier and faster than the other options since you already made your routine a jit function.
- The for j in range(int_power) loop is nearly useless: you can just do a multiplication. Compilers apply smart optimizations, so they might replace the loop with a simple FMA operation (much faster). This is especially true with the fast-math flag (which is not provided here).
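A minimal sketch of the simplification suggested in the last comment (my own rewrite, not code from the question): the nested loops collapse to a single multiplication and should return the same value up to floating-point rounding:

@jit(nopython=True)
def test_func_jitted_mult(arg_vec, power):
    # sum(arg_vec) * int(power) / m is algebraically the same as the
    # double loop in test_func_jitted, without the O(m * power) work
    int_power = int(power)
    return arg_vec.sum() * int_power / arg_vec.shape[0]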