
How would you go about combining threading.RLock with threading.Semaphore? Or does such a structure already exist?

In Python, there is a primitive for a reentrant lock, threading.RLock(), which allows the same thread to acquire the lock multiple times while blocking all other threads. There is also threading.Semaphore(N), which allows the semaphore to be acquired N times before blocking. How would one combine these two structures? I want up to N separate threads to be able to hold the lock at once, but I'd like each individual thread's hold to be a reentrant one.
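For reference, the two primitives' differing semantics can be demonstrated with a minimal stdlib-only sketch: the RLock counts nested acquires by its owning thread, while the Semaphore counts total acquisitions regardless of thread.

```python
import threading

# An RLock may be re-acquired by the thread that already holds it;
# each acquire must be matched by a release.
rlock = threading.RLock()
rlock.acquire()
rlock.acquire()  # nested acquire from the same thread does not block
rlock.release()
rlock.release()

# A Semaphore(2) permits two acquisitions before blocking,
# no matter which threads make them.
sem = threading.Semaphore(2)
assert sem.acquire(blocking=False)
assert sem.acquire(blocking=False)
assert not sem.acquire(blocking=False)  # third non-blocking acquire fails
sem.release()
sem.release()
```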

2 Answers


So I guess a reentrant semaphore does not exist in the standard library. Here is the implementation I came up with; happy to entertain comments.

import threading
import datetime
import logging

class ReentrantSemaphore(object):
  '''A counting semaphore which allows the holding thread to reenter.'''
  def __init__(self, value = 1):
    self.local = threading.local()
    self.sem = threading.Semaphore(value)

  def acquire(self):
    if not getattr(self.local, 'lock_level', 0):
      # We do not yet have the lock, acquire it.
      start = datetime.datetime.utcnow()
      self.sem.acquire()
      end = datetime.datetime.utcnow()
      if end - start > datetime.timedelta(seconds = 3):
        logging.info("Took %.1f sec to acquire.", (end - start).total_seconds())
      self.local.lock_time = end
      self.local.lock_level = 1
    else:
      # We already have the lock, just increment it due to the recursive call.
      self.local.lock_level += 1

  def release(self):
    if getattr(self.local, 'lock_level', 0) < 1:
      raise RuntimeError("Trying to release a semaphore that is not held.")

    self.local.lock_level -= 1
    if self.local.lock_level == 0:
      self.sem.release()

  __enter__ = acquire
  def __exit__(self, t, v, tb):
    self.release()
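A quick self-test of the class (condensed here, without the logging, so the snippet runs standalone): four threads each take the semaphore twice in a nested fashion, and with a capacity of 2 all four still complete because the inner acquire reenters rather than consuming a second slot.

```python
import threading

# Condensed copy of the ReentrantSemaphore above, for a runnable demo.
class ReentrantSemaphore(object):
  def __init__(self, value = 1):
    self.local = threading.local()
    self.sem = threading.Semaphore(value)

  def acquire(self):
    if not getattr(self.local, 'lock_level', 0):
      self.sem.acquire()
      self.local.lock_level = 1
    else:
      self.local.lock_level += 1

  def release(self):
    if getattr(self.local, 'lock_level', 0) < 1:
      raise RuntimeError("Trying to release a semaphore that is not held.")
    self.local.lock_level -= 1
    if self.local.lock_level == 0:
      self.sem.release()

  __enter__ = acquire
  def __exit__(self, t, v, tb):
    self.release()

sem = ReentrantSemaphore(2)
results = []

def worker(i):
  with sem:      # outer acquire consumes one of the two slots
    with sem:    # nested acquire from the same thread does not block
      results.append(i)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
  t.start()
for t in threads:
  t.join()

assert sorted(results) == [0, 1, 2, 3]  # all four workers finished
```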



Your implementation is already good: the threading.local() approach is simple and efficient, so with additional support for timeouts and non-blocking calls you would have a complete reentrant semaphore. However, I also wrote my own variant combining these two structures, which I called aiologic.RCapacityLimiter (I'm the creator of aiologic).

import time

from concurrent.futures import ThreadPoolExecutor

from aiologic import RCapacityLimiter

limiter = RCapacityLimiter(2)


def subfunc(i):
    with limiter:
        assert limiter.borrowed_tokens <= 2

        time.sleep(0.5)

        print(f"it works! (thread #{i})")


def func(i):
    with limiter:
        subfunc(i)


with ThreadPoolExecutor(4) as executor:
    for i in range(4):
        executor.submit(func, i)

In contrast to semaphores, reentrant capacity limiters give you more information about what is happening at runtime:

  • RCapacityLimiter.waiting is the number of threads that are waiting to acquire the limiter.
  • RCapacityLimiter.available_tokens is the number of available non-blocking calls.
  • RCapacityLimiter.borrowed_tokens is the number of threads that have acquired the limiter.
  • RCapacityLimiter.total_tokens is the maximum number of threads.
  • RCapacityLimiter.borrowers is a dictionary mapping each thread that holds the limiter to the number of times it has acquired it.

Along with this, you also get all the other features of the aiologic package, such as support for asynchronous libraries like asyncio.

