Your implementation is already good: the `threading.local()` approach is simple and efficient, and with additional support for timeouts and non-blocking calls you would have a complete reentrant semaphore. However, I also wrote my own variant combining these two structures, which I called `aiologic.RCapacityLimiter` (I'm the creator of aiologic).
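For reference, here is a minimal sketch of what such a "complete" reentrant semaphore could look like using only the standard library. The class name `ReentrantSemaphore` and its structure are hypothetical, not part of any library; it just illustrates the `threading.local()` idea extended with timeout and non-blocking support. The aiologic example follows below.

```python
import threading

class ReentrantSemaphore:
    """Hypothetical sketch: a semaphore that the holding thread can
    re-acquire without blocking. Per-thread hold counts live in
    threading.local(), so only the first acquire per thread consumes
    a token from the underlying semaphore."""

    def __init__(self, value=1):
        self._semaphore = threading.Semaphore(value)
        self._local = threading.local()

    def acquire(self, blocking=True, timeout=None):
        count = getattr(self._local, "count", 0)
        if count == 0:
            # First acquire by this thread: take a real token,
            # honoring the blocking/timeout arguments.
            if not self._semaphore.acquire(blocking, timeout):
                return False
        self._local.count = count + 1
        return True

    def release(self):
        count = getattr(self._local, "count", 0)
        if count == 0:
            raise RuntimeError("release() without a matching acquire()")
        self._local.count = count - 1
        if self._local.count == 0:
            # Last release by this thread: return the token.
            self._semaphore.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *exc):
        self.release()
```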
```python
import time
from concurrent.futures import ThreadPoolExecutor

from aiologic import RCapacityLimiter

limiter = RCapacityLimiter(2)

def subfunc(i):
    with limiter:
        assert limiter.borrowed_tokens <= 2
        time.sleep(0.5)
        print(f"it works! (thread #{i})")

def func(i):
    with limiter:
        subfunc(i)

with ThreadPoolExecutor(4) as executor:
    for i in range(4):
        executor.submit(func, i)
```
In contrast to semaphores, reentrant capacity limiters give you more information about what is happening at runtime:
- `RCapacityLimiter.waiting` is the number of threads waiting to acquire the limiter.
- `RCapacityLimiter.available_tokens` is the number of acquisitions that can still succeed without blocking.
- `RCapacityLimiter.borrowed_tokens` is the number of threads that have acquired the limiter.
- `RCapacityLimiter.total_tokens` is the maximum number of threads that can hold the limiter at once.
- `RCapacityLimiter.borrowers` is a dictionary mapping each thread that holds the limiter to the number of times it has acquired it.
Along with this, you also get all the other features of the aiologic package, such as support for asynchronous libraries like asyncio.