8

Is it possible to return execution to event loop from a function. And as soon Task will be completed return to the function and continue execution?

Im trying to use pytest-asyncio plugin

Example:

@pytest.mark.asyncio
async def test_async1(event_loop):
    print('start 1')
    res = event_loop.create_task(
        send_async_request("http://test.com", limit=1000))) # here I need to return execution to event loop and continue only after getting response from send_async_request function 
    print('end1',res)



@pytest.mark.asyncio
async def test_async2(event_loop):
    print('start 2')
    res = event_loop.create_task(
        send_async_request("http://test2", limit=1000))) # here I need to return execution to event loop and continue only after getting response from send_async_request function
    print('end2', res)

send_async_request- aiohttp:

@asyncio.coroutine
def send_async_request(url, method='GET'):
    with aiohttp.ClientSession() as session:
        resp = yield from session.get(url, timeout=60)
        if resp.status == 200:
            return resp.status, response
        else:
            return resp.status, False

1 Answer 1

7

When your tests are marked with pytest.mark.asyncio, they become coroutines so you can use the await syntax:

@pytest.mark.asyncio
async def test_sleep(event_loop):
    result = await asyncio.sleep(1, result=3, loop=event_loop)
    assert result == 3

EDIT: Another example, with multiple sleep operations:

@pytest.mark.asyncio
async def test_multiple_sleep(event_loop):
    tasks = [event_loop.create_task(asyncio.sleep(1, result=x))
             for x in range(10)]
    results = await asyncio.gather(*tasks)
    assert results == list(range(10))
Sign up to request clarification or add additional context in comments.

6 Comments

but when i'm using await my code became sync. I'm wanted to send request to both urls async and continue testing. If I will add await then there will be no difference between sync and async code . If I understand it right
For example if you add new function test_sleep2 and add sleep time = 6 , then total sleep time will be 7 seconds , instead 6 which I expected. I'm expecting it in case of http request - because in aiohttp - http request are non-blocking
@Arseniy I added an example running multiple tasks within a test coroutine.
Is it possible to run multiple tests this way? The better what I found so far -is to add gather to pytest-asyncio itself
@Arseniy Sure, every test runs with a different event_loop.
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.