2

The following python script uses read temperature from Ruuvi tag. In the synchronous Ruuvi callback we want to call a method that is async (send_message_to_output). The following code will on the second time it's called raise an exception

RuntimeError: Event loop is closed

How can I get handle_data to work multiple times?

import asyncio
from azure.iot.device.aio import IoTHubModuleClient
from ruuvitag_sensor.ruuvi import RuuviTagSensor

async def main():
    device_client = IoTHubModuleClient.create_from_edge_environment()
    await device_client.connect()

    def handle_data(found_data):
        asyncio.get_event_loop().run_until_complete(device_client.send_message_to_output("some data", "ruuvi"))

    while True:
        RuuviTagSensor.get_datas(handle_data)
        time.sleep(5)

    await device_client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
3
  • What about create_task for each of the send_message_to_output calls instead of run_until_complete. Use the latter instead of the asyncio.run and pass the loop to the main as param? Commented Apr 1, 2020 at 6:52
  • Could you elaborate? Commented Apr 1, 2020 at 6:53
  • I will answer with two solutions. Commented Apr 1, 2020 at 6:57

1 Answer 1

3

According to your exception it seems that the loop is closed for some reason. I think it is due to the run_until_complete at the handle_data function that cause a reaction that closes the loop.

Therefor I would suggest to try the following:

import asyncio
from azure.iot.device.aio import IoTHubModuleClient
from ruuvitag_sensor.ruuvi import RuuviTagSensor

async def main(main_loop):
    tasks = list()
    device_client = IoTHubModuleClient.create_from_edge_environment()
    await device_client.connect()

    def handle_data(found_data):
        nonlocal main_loop
        nonlocal tasks
        tasks.append(main_loop.create_task(device_client.send_message_to_output("some data", "ruuvi")))

    while True:
        RuuviTagSensor.get_datas(handle_data)
        # We need to wait async in order to let the tasks run
        await asyncio.sleep(5)

    # This is just an insurance that all the tasks (messages to output) completed
    await asyncio.wait(tasks, timeout=5)
    await device_client.disconnect()

if __name__ == "__main__":
    # Creating and closing the loop here
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop)
    loop.close()

Alternative (more complex) solution can be using a function that read from a queue and call the send_message_to_output function:

import asyncio
from azure.iot.device.aio import IoTHubModuleClient
from ruuvitag_sensor.ruuvi import RuuviTagSensor

async def main(main_loop):
    q = asyncio.Queue()
    stopping = asyncio.Event()

    device_client = IoTHubModuleClient.create_from_edge_environment()
    await device_client.connect()

    async def send_msg():
        nonlocal q
        nonlocal stopping
        nonlocal device_client
        while not stopping.is_set():
            msg, sender = await q.get()
            if msg is None and sender is None:
                break
            await device_client.send_message_to_output(msg, sender)

    def handle_data(found_data):
        nonlocal q
        nonlocal stopping
        if stopping.is_set():
            return
        q.put_nowait(("some data", "ruuvi"))

    while True:
        RuuviTagSensor.get_datas(handle_data)
        await asyncio.sleep(5)

    send_msg_task = main_loop.create_task(send_msg())

    await q.put((None, None))
    await stopping.set()
    await send_msg_task
    await device_client.disconnect()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop)
    loop.close()

The idea here was to separate the handle_data from the send_msg. This way I manage to make the send_msg an async function that now does not need to create loop or a Task

Sign up to request clarification or add additional context in comments.

2 Comments

Thanks a lot! So the task will start automatically after it has been created and appended to the list? We don't have to await the tasks? This regarding the first example.
I added tasks = [task for task in tasks if task._state != 'FINISHED'] after get_datas call to clean tasks from the list

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.