3

I have a program with one main thread where I spawn a second thread that uses asyncio. Are there any tools provided to synchronize these two threads? If everything was asyncio, I could do it with its synchronization primitives, eg:

import asyncio

async def taskA(lst, evt):
    print(f'Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    await evt.wait()
    print('Retrieved:', lst.pop())

lst = []
evt = asyncio.Event()
asyncio.get_event_loop().run_until_complete(asyncio.gather(
    taskA(lst, evt),
    taskB(lst, evt),
))

However, this does not work with multiple threads. If I just use a threading.Event then it will block the asyncio thread. I figured out I could defer the wait to an executor:

import asyncio
import threading

def taskA(lst, evt):
    print(f'Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    await asyncio.get_event_loop().run_in_executor(None, evt.wait)
    print('Retrieved:', lst.pop())

def targetA(lst, evt):
    taskA(lst, evt)

def targetB(lst, evt):
    asyncio.set_event_loop(asyncio.new_event_loop())
    asyncio.get_event_loop().run_until_complete(taskB(lst, evt))

lst = []
evt = threading.Event()
threadA = threading.Thread(target=targetA, args=(lst, evt))
threadB = threading.Thread(target=targetB, args=(lst, evt))
threadA.start()
threadB.start()
threadA.join()
threadB.join()

However, having an executor thread only to wait for a mutex seems unnatural. Is this the way this is supposed to be done? Or is there any other way to wait for synchronization between OS threads asynchronously?

7
  • 1
    Await an asyncio.Event in taskB, and set it from taskA using loop.call_soon_threadsafe. Commented Nov 5, 2018 at 16:29
  • @user4815162342 That's a reasonable option. It feels a bit like I'm kind of inverting the logic, and ideally I'd like the main thread not to have to deal with the asyncio loop of the other thread directly, but yes this may work for my case I think. Commented Nov 5, 2018 at 16:33
  • @user4815162342 I solved my problem using your suggestion. Since I wanted to have "function semantics" (process this data in the main thread and return some result), I used a queue.Queue to send futures from the asyncio thread to the main thread and call_soon_threadsafe to set their results from the main thread. Feel free to post it as an answer to accept. Commented Nov 5, 2018 at 17:22
  • Using futures for this purpose is definitely the way to go - ideally taskB would create two futures, one concurrent.futures and one asyncio, then connect them, send the concurrent one to taskA, and await the asyncio one. run_in_executor implements fairly generic future chaining that could be reused for this, but I'm not sure if any of that is public. Commented Nov 5, 2018 at 19:33
  • I've now posted the answer with the original comment, but also an alternative approach which should remove the need for an explicit queue. Commented Nov 6, 2018 at 6:48

2 Answers 2

2

A simple way to synchronize an asyncio coroutine with an event coming from another thread is to await an asyncio.Event in taskB, and set it from taskA using loop.call_soon_threadsafe.

To be able to pass values and exceptions between the two, you can use futures; however then you are inventing much of run_in_executor. If the only job of taskA is to take tasks off a queue, you might as well make a single-worker "pool" and use it as your worker thread. Then you can use run_in_executor as intended:

worker = concurrent.futures.ThreadPoolExecutor(max_workers=1)

async def taskB(lst):
    loop = asyncio.get_event_loop()
    # or result = await ..., if taskA has a useful return value
    # This will also propagate exceptions raised by taskA
    await loop.run_in_executor(worker, taskA, lst)
    print('Retrieved:', lst.pop())

The semantics are the same as in your version with an explicit queue - the queue is still there, it's just inside the ThreadPoolExecutor.

Sign up to request clarification or add additional context in comments.

2 Comments

Thanks, this is a good idea too and it's surely cleaner. In my case however I would probably have to write my own executor... The context for this is a Python script for Unreal Engine, which has a main thread that "ticks" objects on every frame, and I want to have an asyncio thread that sometimes sends stuff to be executed on the next UE "tick" (because some stuff needs to be done on the main thread). Anyway I got to solve it with your suggestions and the idea of this solution is good too.
@jdehesa Fair enough. Thanks for the interesting question!
0

The aiologic package that I created provides a wide set of synchronization primitives that don't use executor threads. You can use events:

event = aiologic.Event()
Timer(1, event.set).start()

print("before")
await event
print("after")

Or semaphores:

semaphore = aiologic.Semaphore(0)
Timer(1, semaphore.green_release).start()

print("before")
await semaphore.async_acquire()
print("after")

Or even queues, if you need them:

queue = aiologic.Queue()
Timer(1, queue.green_put, [42]).start()

print("before")
item = await queue.async_get()
print("after")

So it allows you to synchronize threads and event loops as easily as if you were using the threading module. None of its primitives blocks the event loop at any point in time, and you can even stop thinking that the event loop even exists, because there is no binding to the event loop.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.