0

We have a message that can run for up to 1 hour. It needs to be processed only once. We are getting issues where the message is becoming available to other processors however when we attempt to complete the message.

Here is the setup in service bus:

service bus

We setup the service bus in the following way:

var options = new ServiceBusProcessorOptions
{
    AutoCompleteMessages = false,

    MaxConcurrentCalls = 1,
    PrefetchCount = 0,
    ReceiveMode = ServiceBusReceiveMode.PeekLock,
    MaxAutoLockRenewalDuration = TimeSpan.FromHours(2),

};

We then handle the message in the following way:

private async Task HandleMessageAsync(ProcessMessageEventArgs processMessageEventArgs)
{
    try
    {
        var rawMessageBody = Encoding.UTF8.GetString(processMessageEventArgs.Message.Body);
        _logger.LogInformation("Received message {MessageId} with body {MessageBody}",
            processMessageEventArgs.Message.MessageId, rawMessageBody);

        var repoRequest = JsonConvert.DeserializeObject<TMessage>(rawMessageBody);
        if (repoRequest != null)
        {
            await ProcessMessage(repoRequest, processMessageEventArgs.Message.MessageId,
                processMessageEventArgs.Message.ApplicationProperties,
                processMessageEventArgs.CancellationToken);
        }
        else
        {
            _logger.LogError(
                "Unable to deserialize to message contract {ContractName} for message {MessageBody}",
                typeof(TMessage), rawMessageBody);
        }

        _logger.LogInformation("Message {MessageId} processed", processMessageEventArgs.Message.MessageId);

        await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message);
    }
    catch (Exception ex)
    {
        await processMessageEventArgs.AbandonMessageAsync(processMessageEventArgs.Message);
        _logger.LogError(ex, "Unable to handle message");
    }
}

However we are getting constant ServiceBusReceiver.RenewMessageLock exceptions along the way which is fine as the message keeps processing. However at the end of the message when we manually call await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message); it is failing with

Azure.Messaging.ServiceBus.ServiceBusException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. For more information please see https://aka.ms/ServiceBusExceptions . Reference:xxxxxxx, TrackingId:xxxxx, SystemTracker:gi::G9:4521499:amqps://xxxxxx.servicebus.windows.net/-eb5b7cf4;25:30:31:source(address:/xxxxxx,filter:[]), bi::in-connection1648(G9-96965)::session1654::link417153, Timestamp:2025-02-05T11:37:31 (MessageLockLost). For troubleshooting information, see https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot. at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.DisposeMessageAsync(Guid lockToken, Outcome outcome, DispositionStatus disposition, TimeSpan timeout, IDictionary2 propertiesToModify, String deadLetterReason, String deadLetterDescription) at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteInternalAsync(Guid lockToken, TimeSpan timeout) at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.<>c.<<CompleteAsync>b__47_0>d.MoveNext() --- End of stack trace from previous location --- at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.<>c__221.<b__22_0>d.MoveNext() --- End of stack trace from previous location --- at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose) at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose) at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken) at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteAsync(Guid lockToken, CancellationToken cancellationToken) at Azure.Messaging.ServiceBus.ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)

What should we be doing differently. I know that even though MaxAutoLockRenewalDuration is set to 2 hours, Azure doesn't guarantee renewal under all conditions. But how should we be handling this?

5
  • Two hours is not an insignificant amount of time. Lock renewal from the client-side is definitely not a guaranteed operation. The question is do you get occasional lock lost exceptions or all the time. If it's all the time, I'd check with the service team (github.com/Azure/azure-service-bus/issues) to see if the duration you're choosing is supported. Based on the answer, plan accordingly. You could complete the message by adding a tracking record in the DB and issuing a "refresh status" message in the future until long-running tasks is completed but in involves a lot of custom code. Commented Feb 5 at 19:30
  • I think that it's better to utilize the Inbox pattern. Take the message, save it locally, complete the message. Then later process it in any way you like. I don't think that Service Bus was made for your scenario. Commented Feb 5 at 21:04
  • Azure does not guarantee automatic renewal, so you can call RenewMessageLockAsync periodically while processing the message. If this is a rare issue use explicit lock renewal or if this happens all the time: Use a tracking database or the Inbox pattern as @SeanFeldman and @VladDX said. Commented Feb 6 at 4:20
  • Thanks so much for your answers. Yes the renewal message exception happens all the time. Usually however the message doesn't reprocess once we call complete message. But as you said we likely need to make a note of the messageid and then track it to completion or just throw to reprocess the message if required. Little confused though - what is servicebus meant for if not jobs? What is a better practice? Commented Feb 6 at 8:38
  • @SureshChikkam: "Azure does not guarantee automatic renewal" is an unfounded statement. Lock renewal is entirely a client-side operation - whether automatic or manual. The only difference is whether the processor calls RenewMessageLockAsync or an app does. When a lock is lost in a scenario like this, it generally is because the service state is lost, not that renewal was not attempted. Commented Feb 6 at 19:15

1 Answer 1

0

Auto-lock renewal is not reliable for long-running tasks. manually renew the lock.

As @SeanFeldman, @VladDX said,

  • Store the message in a database or blob storage.

  • Acknowledge the message immediately process it separately in an independent worker.

Instead of Auto-lock renewal session-based messages provides an exclusive lock that lasts as long as the session remains active.

Azure Service Bus locks the entire session as long as you keep the session open, the message won’t be available to other consumers.

Same processor continues handling the session’s messages until it’s closed.

Enable "session" while creating the queue in service bus.

enter image description here

  • Consume all messages in the queue using your existing regular queue processor before enabling sessions.

  • Once the queue is empty, enable sessions and start sending messages with SessionId.

Updated the Sender to Including SessionId:

var message = new ServiceBusMessage(Encoding.UTF8.GetBytes("Your message body"))
{
    SessionId = "MySession-" + Guid.NewGuid().ToString() // Each message belongs to a session
};
await sender.SendMessageAsync(message);

Use a Session-Based Processor:

var processor = client.CreateSessionProcessor(queueName, new ServiceBusSessionProcessorOptions
{
    AutoCompleteMessages = false,
    MaxConcurrentSessions = 1, // Ensures only one processor at a time
    MaxConcurrentCallsPerSession = 1, // Ensures messages in a session are processed sequentially
    SessionIdleTimeout = TimeSpan.FromMinutes(5), // Releases session lock if idle
    PrefetchCount = 0
});

// Message handler for session-based processing
processor.ProcessMessageAsync += async args =>
{
    try
    {
        var messageBody = Encoding.UTF8.GetString(args.Message.Body);
        _logger.LogInformation("Processing message {MessageId} in session {SessionId}", args.Message.MessageId, args.SessionId);

        // Simulate long processing time (e.g., 1 hour)
        await Task.Delay(TimeSpan.FromMinutes(60));

        // Complete the message after processing
        await args.CompleteMessageAsync(args.Message);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error processing message {MessageId}");
        await args.AbandonMessageAsync(args.Message);
    }
};

// Error handler
processor.ProcessErrorAsync += args =>
{
    _logger.LogError(args.Exception, "Error in Service Bus session processor");
    return Task.CompletedTask;
};

// Start the processor
await processor.StartProcessingAsync();
Sign up to request clarification or add additional context in comments.

2 Comments

This is inaccurate. Messages with Session are still subject to 5 minutes lease and the will suffer from the same issue if processing takes 2h.
@Suresh Chikkam: I'd challenge the accuracy of the statement "auto-lock renewal is not appropriate for long-running tasks." So long as the max duration is set correctly, there's no difference between automatic renewal and manual. The failure point for scenarios such as this is generally the service state. Long-running tasks risk having a service node migrate, crash/recover or encounter an idle timeout, which invalidates the lock.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.