I have a .NET application that uses MassTransit with Azure Service Bus.

Here is my situation. A consumer throws an exception, for example because the database is unavailable, so the message is moved to the MassTransit error queue. I have a consumer on the error queue too, but it fails as well because the database is still unavailable. Even with retry configured there, it doesn't recover, so the message ends up in my DLQ. What now? I actually don't even care that it fails. The problem is that the error-queue consumer never gets to flag the failure in the database, and because of that my whole application gets “stuck”.

I don't know how to handle this situation. Should I write a routine that checks my DLQ and processes those messages? Or does MassTransit already provide something for this?

Thanks a lot

  • You can handle this with Faults: masstransit.io/documentation/concepts/exceptions#faults Commented Apr 25 at 7:38
  • @bgajic If I'm not mistaken, the error queue is something different than the DLQ. It's just that I use the error queue and then when Commented Apr 25 at 7:42
  • You're correct, but you can use Faults to handle this scenario: "I actually don't even care that it fails. The problem is that it doesn't flag something in the error queue in the database, and because of it my whole application gets “stuck”." In any case, you can also customize the DLQ; have a look at the docs. You will probably get an answer from the author as well. Commented Apr 25 at 7:43
  • I'm dealing with the situation where the consumer of the fault (error queue) itself crashes, and the message is then moved to the DLQ. Commented Apr 25 at 13:21

1 Answer

You can create a separate MassTransit receive endpoint or a background service that listens to the DLQ directly and processes or logs those messages.

Program.cs

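using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading.Tasks;

public class Program
{
    public static async Task Main(string[] args)
    {
        await Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddMassTransit(x =>
                {
                    x.AddConsumer<MyMessageConsumer>();

                    x.UsingAzureServiceBus((context, cfg) =>
                    {
                        cfg.Host("your-connection-string", h =>
                        {
                            // Enum from the legacy Microsoft.Azure.ServiceBus package. On MassTransit versions
                            // built on Azure.Messaging.ServiceBus, use its ServiceBusTransportType enum instead.
                            h.TransportType = Microsoft.Azure.ServiceBus.TransportType.AmqpWebSockets;
                        });

                        // Creates a receive endpoint for each registered consumer.
                        cfg.ConfigureEndpoints(context);
                    });
                });

                // Runs the DLQ listener alongside the bus.
                services.AddHostedService<DeadLetterProcessorService>();
            })
            .Build()
            .RunAsync();
    }
}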

// Message contract.
public class MyMessage
{
    public string Value { get; set; }
}

public class MyMessageConsumer : IConsumer<MyMessage>
{
    public async Task Consume(ConsumeContext<MyMessage> context)
    {
        Console.WriteLine($"Received message: {context.Message.Value}");

        // Always throws, to simulate a dependency (such as the database) being unavailable.
        throw new Exception("Simulated database failure");
    }
}
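
As suggested in the comments, a Fault consumer is another way to react to a failing consumer. The following is only a minimal sketch, not part of the setup above: `MyMessageFaultConsumer` is a hypothetical name, and it assumes the `MyMessage` contract shown earlier. MassTransit publishes a `Fault<MyMessage>` when a consumer of `MyMessage` throws and any configured retries are exhausted.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical consumer (name chosen for illustration only).
// It receives the Fault<MyMessage> event that MassTransit publishes
// after MyMessageConsumer has thrown and retries are exhausted.
public class MyMessageFaultConsumer : IConsumer<Fault<MyMessage>>
{
    public Task Consume(ConsumeContext<Fault<MyMessage>> context)
    {
        // The original message and the captured exception details travel with the fault.
        var original = context.Message.Message;
        var reason = context.Message.Exceptions.FirstOrDefault()?.Message;

        Console.WriteLine($"[Fault] MyMessage '{original.Value}' failed: {reason}");

        // Assumption: this handler avoids the dependency that caused the failure
        // (for example the database), so it cannot fail for the same reason.
        return Task.CompletedTask;
    }
}
```

The consumer would be registered like any other, for example with `x.AddConsumer<MyMessageFaultConsumer>();` inside `AddMassTransit`. Keeping the handler independent of the database is a design choice for this scenario: if it needs the same unavailable resource, it fails in the same way as the original consumer.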

DLQ Listener Service

using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
using System.Threading.Tasks;

public class DeadLetterProcessorService : BackgroundService
{
    private readonly string connectionString = "your-connection-string";

    // Name of the main queue, without the "/$DeadLetterQueue" suffix;
    // the SubQueue option below selects the dead-letter sub-queue.
    private readonly string queueName = "your-queue-name";

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Dispose the client and receiver when the service stops.
        await using var client = new ServiceBusClient(connectionString);
        await using var receiver = client.CreateReceiver(
            queueName,
            new ServiceBusReceiverOptions
            {
                SubQueue = SubQueue.DeadLetter
            });

        while (!stoppingToken.IsCancellationRequested)
        {
            // Wait up to 5 seconds for a batch of at most 10 dead-lettered messages.
            var messages = await receiver.ReceiveMessagesAsync(
                maxMessages: 10,
                maxWaitTime: TimeSpan.FromSeconds(5),
                cancellationToken: stoppingToken);

            foreach (var message in messages)
            {
                try
                {
                    string body = message.Body.ToString();
                    Console.WriteLine($"[DLQ] Received dead-lettered message: {body}");

                    // Completing removes the message from the DLQ permanently.
                    await receiver.CompleteMessageAsync(message, stoppingToken);
                }
                catch (Exception ex)
                {
                    // The message stays in the DLQ and becomes available again once its lock expires.
                    Console.WriteLine($"[DLQ] Failed to process message: {ex.Message}");
                }
            }

            // Pause between polls.
            await Task.Delay(5000, stoppingToken);
        }
    }
}
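
If the dead-lettered messages should be processed again once the database is back, one option is to resubmit them to the main queue instead of only logging them. The following is a minimal sketch under that assumption, using the same Azure.Messaging.ServiceBus client as the listener; `DeadLetterResubmitter` and `ResubmitAsync` are hypothetical names introduced for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper: drains the dead-letter sub-queue of a queue
// and sends a copy of each message back to the main queue.
public static class DeadLetterResubmitter
{
    public static async Task<int> ResubmitAsync(
        string connectionString, string queueName, CancellationToken cancellationToken)
    {
        await using var client = new ServiceBusClient(connectionString);
        await using var receiver = client.CreateReceiver(
            queueName,
            new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });
        await using var sender = client.CreateSender(queueName);

        var resubmitted = 0;
        while (true)
        {
            var batch = await receiver.ReceiveMessagesAsync(
                maxMessages: 10,
                maxWaitTime: TimeSpan.FromSeconds(5),
                cancellationToken: cancellationToken);

            // An empty batch means nothing arrived within the wait time; stop draining.
            if (batch.Count == 0)
                break;

            foreach (var dead in batch)
            {
                Console.WriteLine($"[DLQ] Resubmitting {dead.MessageId} (reason: {dead.DeadLetterReason})");

                // Copy the received message into a new outgoing message.
                await sender.SendMessageAsync(new ServiceBusMessage(dead), cancellationToken);

                // Send and complete are not atomic: a crash between the two calls
                // can leave a duplicate, so consumers should tolerate redelivery.
                await receiver.CompleteMessageAsync(dead, cancellationToken);
                resubmitted++;
            }
        }

        return resubmitted;
    }
}
```

A call such as `await DeadLetterResubmitter.ResubmitAsync("your-connection-string", "your-queue-name", stoppingToken);` could be triggered manually or from the background service once the database is known to be reachable. Resubmitting while the database is still down would only send the messages back to the DLQ.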
