I have the following code (redacted and simplified):

import asyncio
from abc import ABC, abstractmethod

from aiolimiter import AsyncLimiter


class AbstractRateLimiter(ABC):
    @abstractmethod
    async def acquire(self, *args, **kwargs):
        pass

    @abstractmethod
    async def release(self):
        pass


class RateLimiterB(AbstractRateLimiter):
    def __init__(self, rpm: int = 0):
        # from aiolimiter import AsyncLimiter
        self.limiter = AsyncLimiter(rpm) if rpm > 0 else None

    async def acquire(self, *args, **kwargs):
        if self.limiter:
            await self.limiter.acquire()

    async def release(self):
        pass


class RateLimiterA(AbstractRateLimiter):
    def __init__(self, concurrency_limit: int = 0):
        self._semaphore = asyncio.Semaphore(concurrency_limit) if concurrency_limit > 0 else None

    async def acquire(self, *args, **kwargs):
        if self._semaphore:
            await self._semaphore.acquire()

    async def release(self):
        if self._semaphore:
            self._semaphore.release()

and the following code that uses these limiters:

async def execute_async(self, body: dict) -> ChatAdapterResponse:
    """
    Returns:
        ChatAdapterResponse: a ChatAdapterResponse from the model
    """
    # Either RateLimiterA(1) or RateLimiterB(2) can be used here; neither works.
    rate_limiter = RateLimiterA(1)
    if rate_limiter:
        await rate_limiter.acquire()
    try:
        # async_send_request is an async wrapper around requests
        response = await async_send_request(
            "SomeUrl", body, self._get_headers()
        )
        return response
    finally:
        if rate_limiter:
            await rate_limiter.release()

With unittest, the code works fine and waits properly:

# Base class is an assumption; the original snippet omitted it.
class TestRateLimiterA(unittest.IsolatedAsyncioTestCase):
    async def test_rate_limitA(self):
        for i in range(self.TEST_COUNT):
            start_time = asyncio.get_running_loop().time()
            res = await execute_async(
                SOME_DICT
            )
            end_time = asyncio.get_running_loop().time()
            print(
                f"Finished requests for {model} in {end_time - start_time} seconds"
            )
            # From the second iteration onwards, an rpm limit of 1 means
            # the request must take at least 60 seconds.
            if i > 0:
                self.assertTrue(end_time - start_time > 60)

    async def test_rate_limiter_B(self):
        start_time = asyncio.get_running_loop().time()
        tasks = [
            async_execute_on_model(
                body
            )
            for body in bodies  # array of 4 dicts
        ]
        results = await asyncio.gather(*tasks)
        end_time = asyncio.get_running_loop().time()
        print(f"Finished requests in {end_time - start_time} seconds")
        self.assertTrue(
            end_time - start_time > 5 * (len(bodies) - 1)
        )

However, when I switched to pytest, the awaits no longer seem to wait: the following code executes very quickly and does not respect the synchronization objects. Any idea why?

async def test_rate_limit_A(mock_send_request, one_rpm_rate_limiterA):
    for i in range(TEST_COUNT):
        start_time = asyncio.get_running_loop().time()
        res = await async_execute_on_model(
            SomeDict
        )
        end_time = asyncio.get_running_loop().time()
        print(
            f"Finished requests for {dict} in {end_time - start_time} seconds"
        )
        # From the second iteration onwards, an rpm limit of 1 means
        # the request must take at least 60 seconds.
        if i > 0:
            assert end_time - start_time > 60

async def test_rate_limitB(mock_async_requestB, one_rpm_rate_limiterB):
    
    start_time = asyncio.get_running_loop().time()
    tasks = [
        async_execute_on_model(
            body
        )
        for body in bodies
    ]
    results = await asyncio.gather(*tasks)
    end_time = asyncio.get_running_loop().time()
    print(f"Finished requests in {end_time - start_time} seconds")
    
    assert end_time - start_time > 5 * (len(bodies) - 1)
    for res in results:

Why is this happening, and what am I doing wrong with pytest-asyncio?

  • Does this answer your question? How to test async function using pytest? Commented Sep 30, 2023 at 8:41
  • The tests are passing. I read how to test async code. The issue is not an error; the synchronization objects are simply not respected and do not wait. Commented Sep 30, 2023 at 16:13
  • Please edit your question and make sure all code is syntactically correct. For example: execute_async is missing triple quotes before the docstring; the functions in your unittest code use await despite not being async; you divide one limiter class by another but division is not defined for types by default. Also, your pytest test functions are missing the @pytest.mark.asyncio decorator, so they won't be executed at all. Commented Oct 1, 2023 at 11:20
  • @danzel Putting aside the missing """, the code is redacted to make it simpler. I'm not dividing; I meant that it can be either one, and I have changed the comment. I use pytest in auto mode, and this code runs. I will try to set up a working cloud snippet. Commented Oct 2, 2023 at 8:35

1 Answer

You're instantiating a new rate limiter for each request in execute_async. Each limiter will allow its single request to be executed immediately.

You need to make sure the same limiter instance is used for all requests.
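The difference can be reproduced without any network calls. The following is a minimal sketch, not the asker's actual code: `ConcurrencyLimiter`, `fake_request`, `execute_with_new_limiter`, `execute_with_shared_limiter`, `timed` and `compare` are hypothetical names, and `fake_request` stands in for `async_send_request` by sleeping. It times the same batch of requests once with a limiter created per call and once with a single shared limiter.

```python
import asyncio


class ConcurrencyLimiter:
    """Hypothetical stand-in for RateLimiterA: wraps an asyncio.Semaphore."""

    def __init__(self, concurrency_limit: int) -> None:
        self._semaphore = asyncio.Semaphore(concurrency_limit)

    async def acquire(self) -> None:
        await self._semaphore.acquire()

    async def release(self) -> None:
        self._semaphore.release()


async def fake_request(delay: float) -> None:
    # Hypothetical replacement for async_send_request: just sleeps.
    await asyncio.sleep(delay)


async def execute_with_new_limiter(delay: float) -> None:
    # A fresh limiter per call: its semaphore is never contended,
    # so acquire() returns immediately every time.
    limiter = ConcurrencyLimiter(1)
    await limiter.acquire()
    try:
        await fake_request(delay)
    finally:
        await limiter.release()


async def execute_with_shared_limiter(limiter: ConcurrencyLimiter, delay: float) -> None:
    # The limiter is passed in, so every call contends for the same semaphore.
    await limiter.acquire()
    try:
        await fake_request(delay)
    finally:
        await limiter.release()


async def timed(coros) -> float:
    # Run the coroutines concurrently and return the elapsed loop time.
    loop = asyncio.get_running_loop()
    start = loop.time()
    await asyncio.gather(*coros)
    return loop.time() - start


async def compare(n: int = 4, delay: float = 0.05) -> tuple[float, float]:
    unshared = await timed([execute_with_new_limiter(delay) for _ in range(n)])
    shared_limiter = ConcurrencyLimiter(1)
    shared = await timed(
        [execute_with_shared_limiter(shared_limiter, delay) for _ in range(n)]
    )
    return unshared, shared


if __name__ == "__main__":
    unshared, shared = asyncio.run(compare())
    print(f"new limiter per call: {unshared:.2f}s, shared limiter: {shared:.2f}s")
```

With a limit of 1, the shared version should take roughly `n * delay` because the requests run one at a time, while the per-call version finishes in about one `delay`. In the question's code, the equivalent change would be to create the limiter once (for example in `__init__` or in a fixture) and have `execute_async` use that instance instead of constructing its own.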

1 Comment

Yes, I figured out the same thing while I was setting up an online Python IDE so that you wouldn't have to work through the code so hard. Thank you!
