I hooked up an event handler to a RabbitMQ message publisher, and I am trying to insert the messages into a DB table as they are published. When multiple messages are published simultaneously, however, the database query fails: a System.NotSupportedException is thrown saying that a second operation started before the previous asynchronous operation completed.

public class RmqMessageHandler
{

    public void Register()
    {
        EventingBasicConsumer consumer = new EventingBasicConsumer(RmqChannel);
        // Subscribe the handler method defined below.
        consumer.Received += EventingBasicConsuner_Received;
        RmqChannel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);

    }

    private async void EventingBasicConsuner_Received(object sender, BasicDeliverEventArgs message)
    {
        var body = message.Body;
        var encodedmessage = Encoding.UTF8.GetString(body.Span);
        var transaction = await DbRepo.Query<AccTransaction>().Where(X => X.Id == encodedmessage.Id).ToListAsync();
        if (transaction.Count == 0)
        {
            DbRepo.StageNewItem(transaction);                
        }
    }
}

I tried wrapping the DbContext access in a lock to avoid the concurrency issue, like this:

lock(locker)
{
    var transaction = DbRepo.Query<AccTransaction>().Where(X => X.Id == encodedmessage.Id).ToList();
    if (transaction.Count == 0)
    {
        DbRepo.StageNewItem(transaction);                
    }
}

But that also fails, throwing either a System.Data.Entity.Core.EntityException saying "The underlying provider failed on Open" or a System.InvalidOperationException saying that the previous connection was not closed.

How can I overcome this issue?

  • Maybe it helps if you make DbRepo a local variable which you create newly at every call of EventingBasicConsuner_Received Commented Feb 20, 2024 at 12:06
  • Are you disposing the context somewhere? Check the number of connections in your connection pool; if it keeps increasing indefinitely, the connections are probably not being closed. Commented Feb 20, 2024 at 12:09
  • @Luty Yeah, the previous connection is not being closed (since the read and/or write operation has not completed) before a new one is established (since a new message has arrived and it also needs to be written to the DB). Commented Feb 20, 2024 at 12:35
  • @SomeBody I don't understand why the locking approach is not working here. Commented Feb 20, 2024 at 12:48
  • Your DbRepo needs to create a service scope for each operation and use a different DbContext instance. Commented Feb 20, 2024 at 13:06
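
The suggestion in the comments (a fresh repository/DbContext per message rather than one shared instance) could look roughly like the sketch below. It is only an illustration under assumptions: `IAccTransactionRepo`, `InMemoryRepo`, `HandleAsync` and the `Func<IAccTransactionRepo>` factory are hypothetical names that do not appear in the question, the in-memory store stands in for the real database, and the message body is assumed to be just the transaction id as UTF-8 text.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;

// Hypothetical abstraction standing in for the question's DbRepo.
public interface IAccTransactionRepo : IDisposable
{
    Task<bool> ExistsAsync(string id);
    Task AddAsync(string id);
}

// In-memory stand-in so the sketch runs without a database.
// A real implementation would own its own DbContext and dispose it in Dispose().
public sealed class InMemoryRepo : IAccTransactionRepo
{
    private readonly ConcurrentDictionary<string, bool> _store;

    public InMemoryRepo(ConcurrentDictionary<string, bool> store) => _store = store;

    public Task<bool> ExistsAsync(string id) => Task.FromResult(_store.ContainsKey(id));

    public Task AddAsync(string id)
    {
        _store.TryAdd(id, true);
        return Task.CompletedTask;
    }

    public void Dispose() { }
}

public sealed class RmqMessageHandler
{
    // Returns a NEW repository on every call, so no two messages
    // ever share a context or connection (hypothetical factory).
    private readonly Func<IAccTransactionRepo> _repoFactory;

    public RmqMessageHandler(Func<IAccTransactionRepo> repoFactory) => _repoFactory = repoFactory;

    // Intended to be called from the consumer's Received handler with message.Body.
    public async Task HandleAsync(ReadOnlyMemory<byte> body)
    {
        // Assumption: the body is the transaction id encoded as UTF-8.
        string id = Encoding.UTF8.GetString(body.ToArray());

        // One repository per message, disposed when the message is done.
        using (IAccTransactionRepo repo = _repoFactory())
        {
            if (!await repo.ExistsAsync(id))
            {
                await repo.AddAsync(id);
            }
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var store = new ConcurrentDictionary<string, bool>();
        var handler = new RmqMessageHandler(() => new InMemoryRepo(store));

        // Simulate several messages arriving at once, one of them a duplicate.
        await Task.WhenAll(
            handler.HandleAsync(Encoding.UTF8.GetBytes("tx-1")),
            handler.HandleAsync(Encoding.UTF8.GetBytes("tx-2")),
            handler.HandleAsync(Encoding.UTF8.GetBytes("tx-1")));

        Console.WriteLine(store.Count); // prints 2
    }
}
```

Against a real database, the check-then-insert step can still race when two messages with the same id arrive together, so a unique constraint on the id column would probably still be needed. Since the question's handler is `async void`, any exception thrown inside it would also need to be caught in the handler itself.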
