You can create a separate MassTransit receive endpoint or background service that listens to the DLQ directly and processes or logs those messages.
Program.cs
{
services.AddMassTransit(x =>
{
x.AddConsumer<MyMessageConsumer>();
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host("your-connection-string", h =>
{
h.TransportType = Microsoft.Azure.ServiceBus.TransportType.AmqpWebSockets;
});
cfg.ConfigureEndpoints(context);
});
});
services.AddHostedService<DeadLetterProcessorService>();
})
.Build()
.RunAsync();
}
public class MyMessage
{
public string Value { get; set; }
}
public class MyMessageConsumer : IConsumer<MyMessage>
{
public async Task Consume(ConsumeContext<MyMessage> context)
{
Console.WriteLine($"Received message: {context.Message.Value}");
throw new Exception("Simulated database failure");
}
}
DLQ Listener Service
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Hosting;
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
public class DeadLetterProcessorService : BackgroundService
{
private readonly string connectionString = "your-connection-string";
private readonly string queueName = "your-queue-name"; // No "-$DeadLetterQueue" here
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var client = new ServiceBusClient(connectionString);
var receiver = client.CreateReceiver(
queueName,
new ServiceBusReceiverOptions
{
SubQueue = SubQueue.DeadLetter
});
while (!stoppingToken.IsCancellationRequested)
{
var messages = await receiver.ReceiveMessagesAsync(maxMessages: 10, TimeSpan.FromSeconds(5), stoppingToken);
foreach (var message in messages)
{
try
{
string body = message.Body.ToString();
Console.WriteLine($"[DLQ] Received dead-lettered message: {body}");
await receiver.CompleteMessageAsync(message, stoppingToken);
}
catch (Exception ex)
{
Console.WriteLine($"[DLQ] Failed to process message: {ex.Message}");
}
}
await Task.Delay(5000, stoppingToken);
}
}
}
