I have an event loop running in a separate thread from the main thread, and I would like to add a background task to this event loop from the main thread using the asyncio.create_task function.

I am able to achieve this using the asyncio.run_coroutine_threadsafe function, but that ends up returning a concurrent.futures._base.Future object as opposed to the asyncio.Task object that the asyncio.create_task function returns.

I would really like to have a Task object returned as I'm making use of the "name" attribute of the Task object in my application logic. Is there any way to implement this?
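
To make the problem concrete, here is a minimal sketch of what I am seeing. The names loop and work are placeholders for illustration, not my real application code:

```python
import asyncio
import concurrent.futures
import threading

# Hypothetical setup: an event loop running forever in a daemon thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def work():
    await asyncio.sleep(0.1)
    return "done"

# Called from the main thread; this hands back a concurrent.futures.Future.
future = asyncio.run_coroutine_threadsafe(work(), loop)

print(isinstance(future, concurrent.futures.Future))  # True
print(isinstance(future, asyncio.Task))               # False
print(hasattr(future, "get_name"))                    # False: no task name here
print(future.result(timeout=5))                       # done
```

The returned future gives me the result, but nothing I can name or look up by name.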

  • Wouldn't you just call asyncio.run() on the task? Commented May 29 at 7:40
  • @UlrichEckhardt unfortunately asyncio.run() is a blocking function and I need to have the tasks run in the background on a separate thread without blocking the main thread Commented May 29 at 19:41
  • Well, then run asyncio.run() on a separate thread. Happy you found a different solution though! Commented May 30 at 8:33

3 Answers

2
  1. We first create a daemon thread in which a new event loop runs.
  2. Function submit runs an arbitrary coroutine in the event loop/thread created in step 1. If argument return_result is True (the default), submit waits for the coroutine to complete and returns its result. Otherwise, submit returns a concurrent.futures.Future, and the caller can get the coroutine's result by calling the future's result method. This function is used by the functions described next.
  3. Function create_task creates a task in the daemon thread and returns that task.
  4. The returned task can be awaited and its result returned by calling await_task with the return_result argument set to True (the default). This blocks the main thread until the result is available. Alternatively, await_task can be called with return_result=False, in which case a Future is returned whose result can be obtained later.
import asyncio
import threading

_event_loop = asyncio.new_event_loop()
threading.Thread(target=_event_loop.run_forever, name="Async Runner", daemon=True).start()

def submit(coro, return_result=True):
    """Run a coroutine in the "other thread". If return_result is True,
    we return the result. Otherwise we return the future."""
    future = asyncio.run_coroutine_threadsafe(coro, _event_loop)
    return future.result() if return_result else future

def create_task(coro):
    """Create a task in the other thread and return the task."""
    async def task_creator():
        return asyncio.create_task(coro)

    return submit(task_creator())

def await_task(task, return_result=True):
    """Submit to the other thread a coroutine that awaits the passed
    task. If return_result is True, we wait for the task to complete
    and return the result. Otherwise, we return to the caller a future
    whose result can be obtained when the caller wants."""
    async def waiter():
        return await task

    return submit(waiter(), return_result=return_result)


if __name__ == '__main__':
    async def some_coro():
        await asyncio.sleep(1)
        return 'Done'

    async def main():
        task = create_task(some_coro())

        # We choose to have a future returned instead
        # of the actual result from executing the task:
        future = await_task(task, return_result=False)
        ... # Do other work
        print(future.result())

    asyncio.run(main())

Prints:

Done
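
Since the question relies on the task's name, here is a minimal sketch of how the name could be set at creation time. The helper create_named_task and the name "BackgroundTask" are assumptions for illustration and are not part of the code above; the only API relied on is the name parameter of asyncio.create_task.

```python
import asyncio
import threading

_event_loop = asyncio.new_event_loop()
threading.Thread(target=_event_loop.run_forever, name="Async Runner", daemon=True).start()

def create_named_task(coro, name=None):
    """Hypothetical helper: create a named task on the background loop and return it."""
    async def task_creator():
        # This runs inside the background loop, so create_task() targets that loop.
        return asyncio.create_task(coro, name=name)

    # Block briefly until the background loop has created the task.
    return asyncio.run_coroutine_threadsafe(task_creator(), _event_loop).result()

async def some_coro():
    await asyncio.sleep(0.1)
    return "Done"

task = create_named_task(some_coro(), name="BackgroundTask")
print(type(task).__name__, task.get_name())  # Task BackgroundTask
```

If name is left as None, asyncio assigns its own default name to the task.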

Update

Here is another implementation where waiting for the submitted task to complete does not block the main thread by using asyncio.to_thread. Now all functions are async:

import asyncio
import threading

_event_loop = asyncio.new_event_loop()
threading.Thread(target=_event_loop.run_forever, name="Async Runner", daemon=True).start()

async def submit(coro):
    """Run a coroutine in the "other thread" and await its result."""
    def submitter():
        future = asyncio.run_coroutine_threadsafe(coro, _event_loop)
        return future.result()

    return await asyncio.to_thread(submitter)

async def create_task(coro):
    """Create a task in the other thread and return the task."""
    async def task_creator():
        return asyncio.create_task(coro)

    return await submit(task_creator())

async def await_task(task):
    """Await the task running in the other thread and return its result."""
    async def waiter():
        return await task

    return await submit(waiter())

if __name__ == '__main__':
    async def some_coro():
        await asyncio.sleep(1)
        return 'Done'

    async def main():
        task = await create_task(some_coro())
        ... # Do other work
        print(await await_task(task))

    asyncio.run(main())
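
As a possible variation (a different technique from asyncio.to_thread, not what the code above does), the concurrent future could be wrapped with asyncio.wrap_future so that no worker thread is tied up while waiting. This is only a sketch: the inner helpers task_creator and waiter and the task name are illustrative assumptions.

```python
import asyncio
import threading

_event_loop = asyncio.new_event_loop()
threading.Thread(target=_event_loop.run_forever, name="Async Runner", daemon=True).start()

async def submit(coro):
    """Run coro on the background loop and await its result from the caller's loop."""
    future = asyncio.run_coroutine_threadsafe(coro, _event_loop)
    # wrap_future() turns the concurrent.futures.Future into an awaitable
    # asyncio.Future that belongs to the caller's running loop.
    return await asyncio.wrap_future(future)

async def some_coro():
    await asyncio.sleep(0.1)
    return "Done"

async def main():
    async def task_creator():
        # Executes on the background loop, so the task is created there.
        return asyncio.create_task(some_coro(), name="BackgroundTask")

    task = await submit(task_creator())

    async def waiter():
        # Also executes on the background loop, where the task lives.
        return await task

    return task.get_name(), await submit(waiter())

print(asyncio.run(main()))  # ('BackgroundTask', 'Done')
```

The trade-off is mostly one of taste: asyncio.to_thread occupies a pool thread per pending wait, while wrap_future relies on asyncio's own cross-loop result hand-off.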

1 Comment

I have updated the answer with a new version that uses asyncio.to_thread so that the main thread is never blocked waiting for a submitted coroutine to complete.
2

It can be done in two steps. The first call returns an auxiliary concurrent future (future1 in the program) that quickly gives you the task reference and the real future (future2) to wait on.

Note: Python 3.12+ is required (for the loop_factory argument of asyncio.run); otherwise you have to change the way the loop is passed between threads.

import asyncio
from threading import Thread

stop = asyncio.Event()

async def async_main():
    await stop.wait()

def async_start(loop):
    asyncio.run(async_main(), loop_factory=lambda: loop)

async def async_stop():
    stop.set()

async def test_task(n):
    for i in range(n):
        print(i)
        await asyncio.sleep(0.2)
    return "test result"

async def new_task(coro):
    task = asyncio.create_task(coro)
    async def waiter():
        return await task
    return (
        task,
        asyncio.run_coroutine_threadsafe(
            waiter(), asyncio.get_running_loop()))

def main():
    loop = asyncio.new_event_loop()
    def submit(coro):
        return asyncio.run_coroutine_threadsafe(coro, loop)

    thread = Thread(target=async_start, args=(loop,))
    thread.start()
    future1 = submit(new_task(test_task(5)))
    task, future2 = future1.result()
    print(f"{task.get_name()=}")
    result = future2.result()
    print(f"{result=}")
    submit(async_stop())
    thread.join()

if __name__ == "__main__":
    main()
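
One caveat when holding a Task in the main thread: asyncio objects are generally not thread-safe, so calls that change the task (such as cancel) are best scheduled on the loop's own thread. The following is a minimal sketch of that idea using loop.call_soon_threadsafe; the names long_running, make_task and outcome_of are hypothetical and not part of the program above.

```python
import asyncio
import threading

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def long_running():
    await asyncio.sleep(60)

async def make_task():
    return asyncio.create_task(long_running(), name="BackgroundTask")

async def outcome_of(task):
    # Await inside the loop so the cancellation is observed there.
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return "finished"

task = asyncio.run_coroutine_threadsafe(make_task(), loop).result()

# Reading the name is a simple lookup; the mutating cancel() call is
# handed to the loop's thread instead of being invoked directly here.
loop.call_soon_threadsafe(task.cancel)

outcome = asyncio.run_coroutine_threadsafe(outcome_of(task), loop).result(timeout=5)
print(task.get_name(), outcome)  # BackgroundTask cancelled
```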

1 Comment

I think I'm leaning towards @Booboo's implementation, but this also works really well!!
1

You can wrap the coroutine so that it returns a task. The first future then returns the task (note: this task is bound to the loop it was created in and cannot be run in a different loop). You can then use a second asyncio.run_coroutine_threadsafe call to await the task on the original loop and get the desired result.

A sample program that demonstrates this:

import asyncio
import threading

def start_event_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def simpleTask():
    await asyncio.sleep(1)
    print("Task done")
    return True

async def createTask():
    return asyncio.create_task(simpleTask(), name="BackgroundTask")

async def runTask(coro):
    return await coro

loop = asyncio.new_event_loop() # the common event loop 
thread = threading.Thread(target=start_event_loop, args=(loop,), daemon=True)
thread.start()

future = asyncio.run_coroutine_threadsafe(createTask(), loop)

task = future.result()

print(f"Got Task Type: {type(task)}, Name: {task.get_name()}")

future_1 = asyncio.run_coroutine_threadsafe(runTask(task), loop)

task2 = future_1.result()

print(f"Got: {task2}")

Output:

Got Task Type: <class '_asyncio.Task'>, Name: BackgroundTask
Task done
Got: True

2 Comments

Great idea to wrap the create_task function within a coroutine and run it through the run_coroutine_threadsafe function, but I'm not sure that the second run_coroutine_threadsafe call is necessary, since the task from the initial future was already created on the thread with the running event loop.
Never mind, I see that the second run_coroutine_threadsafe call is mainly there to allow awaiting the task from a non-async function on the main thread. Thanks!!
