I'm trying to run tasks concurrently in my program and to limit the number of processes running at the same time to 10.

from multiprocessing import Process
from threading import BoundedSemaphore

semaphore = BoundedSemaphore(10)
for x in xrange(100000):
  semaphore.acquire(blocking=True)
  print 'new'
  p = Process(target=f, args=(x,))
  p.start()

def f(x):
  ...  # do some work
  semaphore.release()
  print 'done'

The first 10 processes are launched and they finish correctly (I see 10 "new" and 10 "done" on the console), and then nothing happens. I never see another "new"; the program just hangs (and Ctrl-C doesn't work either). What's wrong?

  • Why not just use multiprocessing.Pool? Commented Mar 23, 2014 at 20:26
  • What makes you think threading.BoundedSemaphore's release will be seen across process boundaries? Commented Mar 23, 2014 at 20:29
  • @Thanatos semaphore is a global object and acquire is supposed to be blocking, as the docs say; am I missing something? Commented Mar 23, 2014 at 20:32
  • @g.d.d.c Noted. I tried it and I'm pretty happy with it, but I'm still interested to know why this doesn't work. Commented Mar 23, 2014 at 20:35
  • possible duplicate of BoundedSemaphore hangs in threads on KeyboardInterrupt Commented Mar 23, 2014 at 20:36

2 Answers

Your problem is the use of threading.BoundedSemaphore across process boundaries:

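import multiprocessing
import threading
import time

# A threading semaphore lives only in the memory of the process that owns it.
semaphore = threading.BoundedSemaphore(10)


def f(x):
  # Runs in the child process and releases the child's own copy.
  semaphore.release()
  print('done')


if __name__ == '__main__':
  semaphore.acquire(blocking=True)
  print('new')
  print(semaphore._value)  # 9 in the parent after one acquire
  p = multiprocessing.Process(target=f, args=(100,))
  p.start()
  time.sleep(3)
  print(semaphore._value)  # still 9: the child's release never reached the parent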

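When you create a new process, the child gets a copy of the parent process's memory (this is what happens with the fork start method). The child therefore releases its own copy of the semaphore, and the semaphore in the parent is untouched. In your loop, that means the parent's counter reaches zero after ten acquires and the eleventh acquire blocks forever. (Typically, processes are isolated from each other: it takes some extra work to communicate across processes; this is what multiprocessing is for.)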

Threads, by contrast, share one memory space and belong to the same process, so a release in one thread is seen by all the others.
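For comparison, here is a minimal sketch of the same acquire/release pattern with a thread instead of a process. The names worker and demo are made up for illustration, and _value is a private CPython attribute that is read here only to show the counter.

```python
import threading

semaphore = threading.BoundedSemaphore(10)


def worker():
    # Runs in a thread of the same process, so this is the very same
    # semaphore object that the main thread acquired.
    semaphore.release()


def demo():
    semaphore.acquire()
    before = semaphore._value  # private attribute, read only for illustration
    t = threading.Thread(target=worker)
    t.start()
    t.join()
    after = semaphore._value
    return before, after


if __name__ == '__main__':
    print(demo())  # (9, 10): the thread's release is visible to the main thread
```

Here the counter goes back up after the worker finishes, which is exactly what does not happen when the worker is a separate process.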

multiprocessing.BoundedSemaphore is probably what you want. (If you replace threading.BoundedSemaphore with it in the script at the top of this answer, and replace semaphore._value with semaphore.get_value(), you'll see the second printed value change, because the child's release now reaches the parent.)
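As a rough sketch of how the throttling loop from the question might look with multiprocessing.BoundedSemaphore: the run function, its parameters, and the placeholder work in f are assumptions for illustration, not part of the original code. The semaphore is passed to the child explicitly so that the example does not rely on a global being inherited.

```python
import multiprocessing


def f(x, semaphore):
    try:
        pass  # do some work with x (placeholder)
    finally:
        # Always free the slot, even if the work raises.
        semaphore.release()


def run(n_tasks=30, limit=10, ctx=multiprocessing):
    # ctx may be the multiprocessing module itself or a context object
    # from multiprocessing.get_context(); both expose the same API.
    semaphore = ctx.BoundedSemaphore(limit)
    procs = []
    for x in range(n_tasks):
        semaphore.acquire()  # blocks while `limit` children hold a slot
        p = ctx.Process(target=f, args=(x, semaphore))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
    return [p.exitcode for p in procs]


if __name__ == '__main__':
    print(run())
```

Because the release happens in a finally block, a failing task still frees its slot and the parent loop does not stall.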

Your bounded semaphore is not shared properly between the various processes which are being spawned; you might want to switch to using multiprocessing.BoundedSemaphore. See the answers to this question for some more details.

1 Comment

I saw that question, but it seemed different at first. Now, with the answer, I see that its author also used threading instead of multiprocessing, which is why they asked it.
