In my async code, i am calling a sync library i have no control over, and that library calls my sync callback, in which i want to call async code. I have references to the loop running everything (the async code and the sync library code).
I have found a solution, by tracking creating free-flying tasks in sync code, and tracking them in my calling async code until they complete.
But i would like to know if there is a "better" way to proceed
- i tried actively waiting for task completion (which blocks and cannot work : solution 2)
- i tried waiting for task from the loop (solution 1) which i thought could work but does not
I am feeling there could be a way to wait for a task completino by "delegating" to a known loop, but i do not know if it is possible, and if so, how. run_until_complete was the obvious hint, but requires that a loop is not running in the current thread.
Sample code (3.11)
import asyncio
# outside library I have no control over
i: int = 1_000
def sync_lib_call(user_cb):
global i
i += 1 # dummy data generation
user_cb(i)
# my code
queue = asyncio.Queue()
loop = None
enqueue_tasks = []
async def put_in_queue(value: int):
print(f"enqueue: {value}")
await queue.put(value)
def my_callback(value: int):
task = loop.create_task(put_in_queue(value))
######################################################################################
## SOLUTION 1 = DOES NOT WORK: RuntimeError: This event loop is already running
## NOTE: I could be OK "awaiting" the task is complete but i cannot,
## as the user callback signature cannot be modified in the library
# loop.run_until_complete(task)
######################################################################################
## SOLUTION 2 = DOES NOT WORK: completely blocks the event queue
# while not task.done():
# time.sleep(1)
######################################################################################
## SOLUTION 3 = WORKS ... BUT ADDS TRACKING TASK MANAGEMENT
# strongly reference task to prevent weak references and task garbage collection
# https://docs.python.org/3.11/library/asyncio-task.html#asyncio.create_task
enqueue_tasks.append(task)
async def work():
global loop, enqueue_tasks
loop = asyncio.get_running_loop()
sync_lib_call(my_callback) # get data from library
enqueue_tasks = [t for t in enqueue_tasks if not t.done()] # cleanup task references
await asyncio.sleep(1) # do other stuff
async def main():
while True:
print(f"qsize: {queue.qsize()}")
print(enqueue_tasks)
await work()
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
@Booboo mentions this similar question (thanks !) : Call async code inside sync code inside async code
In that question, the first advice is to use
asyncio.to_thread()which in my case will not work well enough because part of the async code i want to call from the callback may actually spawn new long running Task too, that should still be alive when the first sync would return. Soto_threadwould (in my case) only work when the sync function and callbacks do not create new async tasks.In the same other question, the second advice is to use
run_coroutine_threadsafe()to run a coroutine that would push to the async queue and wait for completion. Something akin to the following :
import asyncio
queue = asyncio.Queue()
async def store(item: int) -> None:
await queue.put(item)
def sync_callback(some_value: int) -> None:
loop = asyncio.get_running_loop()
future = asyncio.run_coroutine_threadsafe(store(some_value), loop)
future.result() # Wait for the callback to complete
So this second advice of the other question would work indeed, and is a good alternative to my solution 3, albeit in a different way regarding to the behaviour when the queue is full :
- this other solution would block the sync code until a spot is free in the queue
- whereas my solution 3 would queue async tasks until they could later be flushed to the queue
Finally it all depends on the desired behaviour when the queue is full : drop items, wait for free spot, add new async pushing tasks to eventually put to queue.
asyncio.Queue? You can synchronously callqueue.put_nowait()if you can guarantee that you're on the same thread as the event loop.