I hooked up an Eventhandler to RabbitMQ message publisher and I am trying to insert those messages into a DB table as and when they are published. But when multiple messages are published simultaneously I am facing issue to access the database while querying. A System.NotSupportedException is thrown saying that a second operation has started before the previous asynchronous operation is completed.
public class RmqMessageHandler
{
public void Register()
{
EventingBasicConsumer consumer = new EventingBasicConsumer(RmqChannel);
consumer.Received += EventingBasicConsumer.Received;
RmqChannel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
}
private async void EventingBasicConsuner_Received(object sender, BasicDeliverEventArgs message)
{
var body = message.Body;
var encodedmessage = Encoding.UTF8.GetString(body.Span);
var transaction = await DbRepo.Query<AccTransaction>().Where(X => X.Id == encodedmessage.Id).ToListAsync();
if(transaction.count == 0)
{
DbRepo.StageNewItem(transaction);
}
}
}
I have tried using lock to lock the dbcontext in order to avoid concurrency issue like below.
lock(locker)
{
var transaction = DbRepo.Query<AccTransaction>().Where(X => X.Id == encodedmessage.Id).ToList();
if(transaction.count == 0)
{
DbRepo.StageNewItem(transaction);
}
}
but that is also throwing System.Data.Entity.Core.EntityException saying The underlying provider failed on open or System.InvalidOperationException saying that the previous connection was not closed.
How can I overcome this issue..
DbRepoa local variable which you create newly at every call ofEventingBasicConsuner_ReceivedDbReponeeds to create a service scope for each operation and use a differentDbContextinstance