
In my async code, I am calling a sync library I have no control over, and that library calls my sync callback, in which I want to call async code. I have a reference to the loop running everything (the async code and the sync library code).

I have found a solution: creating free-flying tasks in the sync code, and tracking them in my calling async code until they complete.

But I would like to know if there is a "better" way to proceed:

  • I tried actively waiting for task completion (which blocks the loop and cannot work: solution 2)
  • I tried waiting for the task from the loop (solution 1), which I thought could work but does not

I feel there could be a way to wait for a task's completion by "delegating" to a known loop, but I do not know if it is possible, and if so, how. run_until_complete was the obvious hint, but it requires that no loop is running in the current thread.
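
To illustrate that last point, here is a minimal sketch (the names noop and sync_callback are made up for the illustration) showing that run_until_complete refuses to run when called from a callback that already executes on the running loop's thread:

```python
import asyncio

async def noop() -> str:
    return "done"

def sync_callback(loop: asyncio.AbstractEventLoop) -> str:
    # Called synchronously from async code, so `loop` is already running
    # in this very thread.
    coro = noop()
    try:
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine was never scheduled; avoid a warning
        return str(exc)
    return "no error"

async def main() -> str:
    return sync_callback(asyncio.get_running_loop())

message = asyncio.run(main())
print(message)
```

The loop checks whether it is already running before scheduling anything, so the coroutine never starts.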

Sample code (Python 3.11):

import asyncio

# outside library I have no control over
i: int = 1_000
def sync_lib_call(user_cb):
    global i
    i += 1  # dummy data generation
    user_cb(i)

# my code
queue = asyncio.Queue()
loop = None
enqueue_tasks = []

async def put_in_queue(value: int):
    print(f"enqueue: {value}")
    await queue.put(value)

def my_callback(value: int):
    task = loop.create_task(put_in_queue(value))
    ######################################################################################
    ## SOLUTION 1 = DOES NOT WORK: RuntimeError: This event loop is already running
    ## NOTE: I could be OK "awaiting" the task is complete but i cannot,
    ## as the user callback signature cannot be modified in the library
    # loop.run_until_complete(task)
    ######################################################################################
    ## SOLUTION 2 = DOES NOT WORK: completely blocks the event queue
    # while not task.done():
    #     time.sleep(1)
    ######################################################################################
    ## SOLUTION 3 = WORKS ... BUT ADDS TRACKING TASK MANAGEMENT
    # strongly reference task to prevent weak references and task garbage collection
    # https://docs.python.org/3.11/library/asyncio-task.html#asyncio.create_task
    enqueue_tasks.append(task)

async def work():
    global loop, enqueue_tasks
    loop = asyncio.get_running_loop()
    sync_lib_call(my_callback) # get data from library
    enqueue_tasks = [t for t in enqueue_tasks if not t.done()] # cleanup task references
    await asyncio.sleep(1) # do other stuff

async def main():
    while True:
        print(f"qsize: {queue.qsize()}")
        print(enqueue_tasks)
        await work()

try:
    asyncio.run(main())
except KeyboardInterrupt:
    pass
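
As a possible variation on solution 3, the task bookkeeping can follow the pattern shown in the asyncio documentation linked in the code above: keep the tasks in a set and let each task remove itself when done, instead of filtering a list in work(). This is only a sketch; background_tasks and the extra loop/queue parameters of my_callback are names introduced for the example, not part of the original code.

```python
import asyncio

background_tasks: set = set()  # strong references keep the tasks alive

async def put_in_queue(queue: asyncio.Queue, value: int) -> None:
    await queue.put(value)

def my_callback(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue, value: int) -> None:
    task = loop.create_task(put_in_queue(queue, value))
    background_tasks.add(task)
    # Each task drops its own reference once it has completed.
    task.add_done_callback(background_tasks.discard)

async def main() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    for value in (1001, 1002, 1003):
        my_callback(loop, queue, value)  # stands in for the library calling back
    # Only for the demo: wait for the pending tasks before inspecting state.
    await asyncio.gather(*list(background_tasks))
    return queue.qsize(), len(background_tasks)

result = asyncio.run(main())
print(result)
```

This removes the explicit cleanup pass, but the tasks still buffer items beyond the queue size when the queue is bounded and full.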

@Booboo mentions this similar question (thanks!): Call async code inside sync code inside async code

  1. In that question, the first advice is to use asyncio.to_thread(), which in my case will not work well enough, because part of the async code I want to call from the callback may itself spawn new long-running tasks that should still be alive when the first sync call returns. So to_thread would (in my case) only work when the sync function and callbacks do not create new async tasks.

  2. In the same question, the second advice is to use run_coroutine_threadsafe() to run a coroutine that pushes to the async queue, and to wait for its completion. Something akin to the following:

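import asyncio

queue = asyncio.Queue()
loop = None  # set once from async code: loop = asyncio.get_running_loop()

async def store(item: int) -> None:
    await queue.put(item)

def sync_callback(some_value: int) -> None:
    # Must run in a thread other than the loop's thread, otherwise
    # future.result() blocks the loop and the coroutine never runs.
    future = asyncio.run_coroutine_threadsafe(store(some_value), loop)
    future.result()  # Wait for the coroutine to complete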

This second advice would indeed work, provided the sync library call runs in a thread other than the event loop's (otherwise future.result() blocks the loop). It is a good alternative to my solution 3, although it behaves differently when the queue is full:

  • this other solution blocks the sync code until a spot is free in the queue
  • whereas my solution 3 accumulates async tasks until they can later be flushed to the queue

In the end, it all depends on the desired behaviour when the queue is full: drop items, wait for a free spot, or add new async tasks that will eventually put to the queue.
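
A minimal sketch of the "wait for a free spot" behaviour, assuming the library call can be moved to a worker thread with asyncio.to_thread() (which, as noted in point 1, is not always acceptable). Here sync_lib_call is a hypothetical stand-in for the library, and the queue is deliberately bounded to one item so the callback has to wait:

```python
import asyncio

async def store(queue: asyncio.Queue, item: int) -> None:
    await queue.put(item)

def sync_lib_call(user_cb) -> None:
    # Hypothetical stand-in for the library: calls back three times.
    for i in range(3):
        user_cb(i)

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    loop = asyncio.get_running_loop()  # captured on the loop's own thread

    def sync_callback(value: int) -> None:
        # Runs in the worker thread: blocks that thread (not the loop)
        # until the item has actually been put in the queue.
        asyncio.run_coroutine_threadsafe(store(queue, value), loop).result()

    worker = asyncio.create_task(asyncio.to_thread(sync_lib_call, sync_callback))
    items = [await queue.get() for _ in range(3)]
    await worker
    return items

received = asyncio.run(main())
print(received)
```

The library thread is throttled by the consumer: each callback returns only once its item has fit into the queue.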

  • What was the initial goal? Why doesn't an asyncio future satisfy your requirement? Commented Jun 22 at 13:15
  • Is the task literally just putting something into an asyncio.Queue? You can synchronously call queue.put_nowait() if you can guarantee that you're on the same thread as the event loop. Commented Jun 22 at 13:18
  • @DavidMaze thanks, I accepted your comment and posted a solution. Commented Jun 22 at 15:13
  • @bruno futures were satisfying, and working. But I have since realized that using futures to compensate for an overflowing queue was not a good idea, so I switched gears on that point, as posted in the accepted answer. Commented Jun 22 at 15:14
  • @Booboo that other answer was great, and I could use one half of it, so I'll amend my question to mention it. Thanks. Commented Jun 22 at 19:32

1 Answer

Thanks to @DavidMaze, I simply used queue.put_nowait(), as I run on the same thread as the event loop.

In hindsight:

  • I was using free-flying tasks to handle the case when the queue was full (the queue size was a user-provided setting) without losing samples.
  • Doing this kind of overflow buffering via tasks (beyond the requested queue size) defeated the purpose of the user-provided queue size.

So with put_nowait() I now handle the QueueFull exception by logging/counting/dropping, which makes more sense when the user has requested a bounded queue.

try:
    self._receive_queue.put_nowait(message)
except asyncio.QueueFull:
    logging.warning(f"Receive message queue is full, dropping message {message}")
    self._dropped_messages_because_receive_queue_was_full += 1
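
For context, a self-contained sketch of the same drop-on-full pattern. The Receiver class, its attribute names and the demo values are made up for the illustration; only put_nowait() and asyncio.QueueFull come from the snippet above.

```python
import asyncio
import logging

class Receiver:
    def __init__(self, maxsize: int) -> None:
        self.queue = asyncio.Queue(maxsize=maxsize)  # bounded by the user setting
        self.dropped = 0

    def on_message(self, message: int) -> None:
        # Sync callback; must be called on the event loop's thread,
        # because asyncio.Queue is not thread-safe.
        try:
            self.queue.put_nowait(message)
        except asyncio.QueueFull:
            logging.warning("Receive queue is full, dropping message %s", message)
            self.dropped += 1

async def demo() -> tuple:
    receiver = Receiver(maxsize=2)
    for message in range(5):  # five messages into a queue of size two
        receiver.on_message(message)
    return receiver.queue.qsize(), receiver.dropped

result = asyncio.run(demo())
print(result)
```

The callback never blocks and never creates tasks, so the queue size requested by the user is the only buffering in play.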