-1

For items in an Azure queue that my application cannot process, I want them in a dead letter queue. Ignoring the differences between a DLQ and a PMQ, we only want to be able to deal with two queues. Right now, when dequeue count exceeds my retry limit the SDK moves my messages from emp-details to a queue called emp-details-poison (which if not present, it creates). I want it to move the failed messages to emp-details-dead-letter. Is this possible at all?

I cannot find this in documentation. AI suggests using

 "extensions": {
   "queues": {
     "maxDequeueCount": 3, // Number of times to attempt processing before sending to poison/dead letter queue
/*OPTION1*/     "poisonQueueNameSuffix": "-dead-letter" // Name of the poison queue suffix
/*OPTION2*/     "poisonQueueName": "myqueue-poison"  // Name of the poison queue
   }
 }

Both poisonQueueNameSuffix and poisonQueueName are non existent in the host.json spec. The documentation suggests manually dequeuing messages to a custom queue - but it seems wasteful to do

 if (message.DequeueCount >= _maxRetryCount)
{
   _logger.LogWarning($"Message {message.MessageId} has reached max retry count ({_maxRetryCount}). Moving to custom poison queue.");
   await MoveToCustomDeadLetterQueueAsync(message);
   return; // Complete the function successfully to remove from original queue
}

An if check for every single message is going to chalk up significant delays.

How do I use a custom queue for "-poison" messages?

Edit: What I ended up doing:

Not being able to use custom DLQ or PMQ names seems very archaic. Regardless, below is roughly what I ended up doing for a QueueTrigger function. Used Polly to setup a retry count - maxRetryCount and move a message to the required queue when the count exceeds.

 var policyResult = await _retryPolicy
                                     .ExecuteAndCaptureAsync(async (context) =>
                                     {
                                         employeeDetailsUpdated = await _empService.UpdateEmpDetails(oid);            
                                     },
                                     new Context { 
                                         { 
                                             "MessageId", message.MessageId 
                                         } 
                                     });//new Context and policyResult in Polly are important parts of managing contextual information during retries.

 //policyResult.Outcome will be OutcomeType.Failure after the policy has exhausted all of its retry attempts (after reaching _maxRetryCount)
 if (policyResult.Outcome == OutcomeType.Failure)
 {
     //if 3 attempts to dequeue have failed, then move the message to dead letter queue. By default, azure moves failed messages to {queuename}-poison queue
     _logger.LogWarning("Message {MessageId} has reached max retry count. Moving to custom dead letter queue.", message.MessageId);
     await EmpDetailsUpdateDeadLetter(oid);// The original message will still be completed/removed from the queue
 }
3
  • You're correct that poisonQueueNameSuffix and poisonQueueName are not part of the official host.json spec for Azure Storage Queues. The built-in behavior automatically moves messages to <queue-name>-poison when they exceed maxDequeueCount, and there is no direct way to change that default queue name via configuration. Commented Mar 24 at 14:21
  • Azure Functions always puts failed messages in a *-poison queue. If you want them in a different queue, you have to move them yourself when they fail too many times. @happybuddha Commented Mar 25 at 3:10
  • Downvoter - care to explain how "This question does not show any research effort; it is unclear or not useful" ? Commented May 15 at 7:10

1 Answer 1

0

Use custom queue name for a poison queue

The messages are moving to a queue named emp-details-poison, but you wanted them to be moved to a custom dead-letter queue, such as emp-details-dead-letter.

I used Polly for retry logic and manually moved the failed messages to the custom dead-letter queue after the retry limit was exceeded.

QueueProcessor.cs:

using Azure.Storage.Queues;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.Retry;
using System;
using System.Threading.Tasks;
using AzureQueueProcessor.Services;
namespace AzureQueueProcessor
{
    public class QueueProcessor
    {
        private readonly IEmpService _empService;
        private readonly ILogger<QueueProcessor> _logger;
        private readonly QueueClient _mainQueue;
        private readonly QueueClient _deadLetterQueue;
        private readonly AsyncRetryPolicy _retryPolicy;
        public QueueProcessor(IEmpService empService, ILogger<QueueProcessor> logger)
        {
            _empService = empService;
            _logger = logger;
            string connectionString = "Your-Connection-String-Here";
            _mainQueue = new QueueClient(connectionString, "emptyqueue"); 
            _deadLetterQueue = new QueueClient(connectionString, "emptyqueue-deadletter"); 
            _mainQueue.CreateIfNotExists();
            _deadLetterQueue.CreateIfNotExists();
            _retryPolicy = Policy
                .Handle<Exception>()
                .WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                    (exception, timeSpan, retryCount, context) =>
                    {
                        _logger.LogWarning($"Retry {retryCount} for message {context["MessageId"]}. Exception: {exception.Message}");
                    });
        }
        public async Task StartProcessing()
        {
            _logger.LogInformation("QueueProcessor started. Listening for messages...");

            while (true)
            {
                var response = await _mainQueue.ReceiveMessageAsync();
                if (response.Value != null)
                {
                    string messageText = response.Value.MessageText;
                    string messageId = response.Value.MessageId;
                    _logger.LogInformation($"Processing message: {messageText}");
                    var policyResult = await _retryPolicy.ExecuteAndCaptureAsync(async (context) =>
                    {
                        string messageId = context["MessageId"].ToString();
                        bool success = await _empService.UpdateEmpDetailsAsync(messageText);
                        if (!success)
                        {
                            throw new Exception("Max retries reached. Moving to dead-letter queue.");
                        }
                    },
                    new Context { { "MessageId", messageId } }); 
                    if (policyResult.Outcome == OutcomeType.Failure)
                    {
                        _logger.LogWarning($"Message {messageId} failed after retries. Moving to dead-letter queue.");
                        await _deadLetterQueue.SendMessageAsync(messageText);                     
                    }
                    else
                    {
                        _logger.LogInformation($"Message {messageId} processed successfully.");
                    }
                    await _mainQueue.DeleteMessageAsync(response.Value.MessageId, response.Value.PopReceipt);
                }
                await Task.Delay(2000);             
            }
        }
    }
}

EmpService.cs:

using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;
namespace AzureQueueProcessor.Services
{
    public interface IEmpService
    {
        Task<bool> UpdateEmpDetailsAsync(string empId);
    }
    public class EmpService : IEmpService
    {
        private readonly ILogger<EmpService> _logger;
        public EmpService(ILogger<EmpService> logger)
        {
            _logger = logger;
        }
        public async Task<bool> UpdateEmpDetailsAsync(string empId)
        {
            _logger.LogInformation($"Updating employee details for {empId}");
            try
            {
                 throw new Exception("Simulated processing failure");
                _logger.LogInformation($"Successfully updated details for {empId}");
                return true;
            }
            catch (Exception ex)
            {
                _logger.LogError($"Error updating details for {empId}: {ex.Message}");
                return false; 
            }
            finally
            {
                await Task.CompletedTask;
            }
        }
    }
}

the queue processor listens for messages, processes them, retries them up to 3 times ,if processing fails it moves them to a custom dead-letter queue if they cannot be processed successfully.

Output: enter image description here

enter image description here

Sign up to request clarification or add additional context in comments.

1 Comment

@happybuddha Check if above provided solution works for you? Let me know if I can be helpful here anyway with further input?

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.