import asyncio
import queue

qq = queue.Queue()
qq.put('hi')

class MyApp():

    def __init__(self, q):
        self._queue = q

    def _process_item(self, item):
        print(f'Processing this item: {item}')

    def get_item(self):
        try:
            item = self._queue.get_nowait()
            self._process_item(item)
        except queue.Empty:
            pass

    async def listen_for_orders(self):  
        '''
        Asynchronously check the orders queue for new incoming orders
        '''
        while True:
            self.get_item()
            await asyncio.sleep(0)      

a = MyApp(qq)

loop = asyncio.get_event_loop()

loop.run_until_complete(a.listen_for_orders())

Using Python 3.6.

I'm trying to write an event handler that constantly listens for messages in the queue, and processes them (prints them in this case). But it must be asynchronous - I need to be able to run it in a terminal (IPython) and manually feed things to the queue (at least initially, for testing).

This code does not work - it blocks forever.

How do I make this run forever but return control after each iteration of the while loop?

Thanks.

Side note: to make the event loop work with IPython (version 7.2), I'm using this code from the ib_insync library, which is also the library I use for the real-world problem behind the example above.

  • Having a while True is bad in general. Instead, when you add something to the queue (using a method), call another method that takes an element from the queue, processes it, and then checks the queue again for more elements; if there aren't any, it terminates. It is also easier to use threads for this kind of thing. Commented Mar 13, 2019 at 21:48
  • I'm not clear on what you're trying to do. Your method does appear to be synchronous, though of course loop.run_until_complete() will block. I don't see where messages are being put into the queue except at the top. You mention feeding messages in manually... what does that mean? Commented Mar 13, 2019 at 21:57
  • @NikolasStevenson-Molnar I have another Thread that polls an external network resource for data (I/O intensive) and dumps the incoming messages into this thread. This is the handler to receive the incoming data. Commented Mar 13, 2019 at 22:51
  • In that case, it seems like you're using asyncio in the wrong place. It's awesome for network I/O, but doesn't seem to be doing any good in your use here (at least from what's shown in your code example); you've basically constructed an async routine that essentially acts like a synchronous one. Commented Mar 13, 2019 at 23:34

2 Answers

You need to make your queue an asyncio.Queue, and add things to the queue in a thread-safe manner. For example:

import asyncio

qq = asyncio.Queue()

class MyApp():
    def __init__(self, q):
        self._queue = q

    def _process_item(self, item):
        print(f'Processing this item: {item}')

    async def get_item(self):
        item = await self._queue.get()
        self._process_item(item)

    async def listen_for_orders(self):  
        '''
        Asynchronously check the orders queue for new incoming orders
        '''
        while True:
            await self.get_item()

a = MyApp(qq)

loop = asyncio.get_event_loop()

loop.run_until_complete(a.listen_for_orders())

Your other thread must put stuff in the queue like this:

loop.call_soon_threadsafe(qq.put_nowait, <item>)

call_soon_threadsafe will ensure correct locking, and also that the event loop is woken up when a new queue item is ready.
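As a rough, self-contained sketch of that hand-off (not the asker's real code): a worker thread pushes items through loop.call_soon_threadsafe while a coroutine awaits them. The names producer, consume, and run_demo are made up for illustration, and the None "stop" sentinel is an assumption added only so the demo terminates.

```python
import asyncio
import threading


def producer(loop, q, items):
    """Runs in a worker thread; hands each item over to the event loop's thread."""
    for item in items:
        loop.call_soon_threadsafe(q.put_nowait, item)
    # Hypothetical "stop" sentinel so the demo can finish.
    loop.call_soon_threadsafe(q.put_nowait, None)


async def consume(items):
    q = asyncio.Queue()
    loop = asyncio.get_event_loop()  # inside a coroutine this is the running loop
    worker = threading.Thread(target=producer, args=(loop, q, items))
    worker.start()
    received = []
    while True:
        item = await q.get()  # suspends without blocking the event loop
        if item is None:
            break
        received.append(item)
    worker.join()  # the worker has already scheduled its last item by now
    return received


def run_demo(items):
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(consume(items))
    finally:
        loop.close()
```

Calling run_demo(['a', 'b']) should return the items in the order the thread submitted them, since callbacks scheduled with call_soon_threadsafe run in FIFO order.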

3 Comments

This is good, but when I call loop.run_until_complete(a.listen_for_orders()) it blocks me from inputting more commands into the python console. I need it to not block
@JoshD Then run run_until_complete in a background thread, and read from the console in the main thread. The code should work regardless of which thread the event loop is run in, as long as you use call_soon_threadsafe to add stuff to the queue.
Thanks. It's not exactly solving my problem though, I suspect I have something wrong with my overall architecture if I'm combining threads and async. I see from your other answers that you have a great grasp of async, would you mind if I contacted you offline for assistance with this? Are you available? Thanks.
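The background-thread suggestion from the comments could look roughly like the sketch below. It is only an illustration under assumptions: BackgroundListener, its methods, and the _STOP sentinel are hypothetical names, and the "processing" step just records items in a list. The event loop runs in its own thread, so the main thread (for example an interactive console) stays free to call put().

```python
import asyncio
import threading

_STOP = object()  # hypothetical sentinel that tells the listener to exit


class BackgroundListener:
    def __init__(self):
        self.processed = []
        self._loop = asyncio.new_event_loop()
        self._queue = None
        self._ready = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # Everything loop-related is set up in the thread that runs the loop.
        asyncio.set_event_loop(self._loop)
        self._queue = asyncio.Queue()
        self._ready.set()
        self._loop.run_until_complete(self._listen())
        self._loop.close()

    async def _listen(self):
        while True:
            item = await self._queue.get()
            if item is _STOP:
                break
            self.processed.append(item)  # stand-in for real processing

    def start(self):
        self._thread.start()
        self._ready.wait()  # make sure the queue exists before anyone calls put()

    def put(self, item):
        # Safe to call from any thread, including the console's main thread.
        self._loop.call_soon_threadsafe(self._queue.put_nowait, item)

    def stop(self):
        self.put(_STOP)
        self._thread.join()
```

In a console you would call start() once, then put(...) whenever you like; control returns immediately after each call, and stop() waits for the listener to drain the queue.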

This is not an async queue; you need to use asyncio.Queue instead:

qq = queue.Queue()

asyncio is built around an event loop. Calling run_until_complete transfers control to the loop, which keeps running until your coroutine completes. Since listen_for_orders loops forever, that never happens:

loop.run_until_complete(a.listen_for_orders())

You commented:

I have another Thread that polls an external network resource for data (I/O intensive) and dumps the incoming messages into this thread.

Write that code async - so you'd have:

import asyncio

# get_item_from_network() and process_item() are placeholders for your own code
async def run():
    while True:
        item = await get_item_from_network()
        process_item(item)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

If you don't want to do that, you can instead step through the loop one iteration at a time, though this is not something I'd recommend.

import asyncio


def run_once(loop):
    loop.call_soon(loop.stop)
    loop.run_forever()


loop = asyncio.get_event_loop()

for x in range(100):
    print(x)
    run_once(loop)

Then you schedule your async function on the loop (for example with loop.create_task). Each time you call run_once, the loop checks your asyncio.Queue and passes control to your listen_for_orders function if the queue has an item in it.
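To make the stepping idea concrete, here is a small sketch that combines run_once with an asyncio.Queue. The demo function and its seen list are invented for illustration, and stepping the loop three times is an arbitrary choice rather than a required number.

```python
import asyncio


def run_once(loop):
    """Run the callbacks that are currently ready, then hand control back."""
    loop.call_soon(loop.stop)
    loop.run_forever()


def demo():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)  # needed on older Pythons, where Queue binds to the current loop
    try:
        q = asyncio.Queue()
        seen = []

        async def listen_for_orders():
            while True:
                seen.append(await q.get())

        task = loop.create_task(listen_for_orders())
        run_once(loop)        # lets the listener start and wait on the queue

        q.put_nowait('hi')    # same thread as the loop, so a plain put_nowait is fine
        for _ in range(3):    # step the loop a few times; control returns after each call
            run_once(loop)

        task.cancel()         # tidy up the never-ending listener
        run_once(loop)
        return seen
    finally:
        loop.close()
        asyncio.set_event_loop(None)
```

Between run_once calls the caller has full control, which is what the question asks for, but nothing is processed unless something keeps calling run_once.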

1 Comment

Thanks, but my get_item_from_network() is not awaitable - it's a call to the Azure Storage Queue Python SDK to get messages from the queue, and I believe it's built on top of requests, which is blocking. That's why I run the code that gets those messages in a separate thread and transfer them to a queue.Queue
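One possible way around a blocking call like that, offered as an alternative technique rather than something either answer proposed, is loop.run_in_executor, which runs the blocking function in a thread pool and gives back something you can await. In this sketch blocking_fetch is a hypothetical stand-in for the SDK call (it just sleeps), and run and run_demo are illustrative names.

```python
import asyncio
import time


def blocking_fetch(n):
    """Hypothetical stand-in for a blocking SDK call (not the real Azure API)."""
    time.sleep(0.01)
    return f'message-{n}'


async def run(count):
    loop = asyncio.get_event_loop()  # inside a coroutine this is the running loop
    results = []
    for n in range(count):
        # The blocking call runs in the default thread pool; the event loop
        # stays free to run other coroutines while this one waits.
        item = await loop.run_in_executor(None, blocking_fetch, n)
        results.append(item)
    return results


def run_demo(count):
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(run(count))
    finally:
        loop.close()
```

This keeps the thread, but hides it behind an awaitable, so no hand-rolled queue.Queue bridge is needed.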
