I have a .NET Windows Service project deployed to an Azure App Service. My goal is to receive 100,000 messages from an Azure Service Bus queue and process these messages in a message handler. However, after processing approximately 800 messages, the service stops receiving new messages and becomes idle. How can I ensure that the Azure Service Bus continues to receive and process all 100,000 messages without stopping?

public void ReceiveMessages(string queueName, string namespaceConnectionString, string appInsightInstrumentationKey, string domainUrl, SecretClient client)
{
  var clientOptions = new ServiceBusClientOptions()
  {
      TransportType = ServiceBusTransportType.AmqpWebSockets,
  };
  var serviceBusClient = new ServiceBusClient(namespaceConnectionString, clientOptions);
  var adminClient = new ServiceBusAdministrationClient(namespaceConnectionString);
  var processorOptions = new ServiceBusProcessorOptions
  {
      AutoCompleteMessages = false,
      MaxAutoLockRenewalDuration = TimeSpan.FromHours(1)
  };
  var serviceBusProcessor = serviceBusClient.CreateProcessor(queueName, new ServiceBusProcessorOptions());
  _serviceBusProcessor = serviceBusProcessor;

  var handlerParams = new MessageHandlerParameter
  {
      AppInsightInstrumentationKey = appInsightInstrumentationKey,
      DomainUrl = domainUrl,
      Client = client
  };
  try
  {
      // Add handler to process messages;
      serviceBusProcessor.ProcessMessageAsync += (args) => MessageHandlerAsync(args, handlerParams);

      // Add handler to process any errors;
      serviceBusProcessor.ProcessErrorAsync += ErrorHandlerAsync;

      serviceBusProcessor.StartProcessingAsync();

      Console.WriteLine("Wait sometime for processing to start");
      Task.Delay(Timeout.InfiniteTimeSpan);
  }
  catch (Exception ex)
  {
  }
  finally
  {
      _ = serviceBusProcessor.DisposeAsync();
      _ = serviceBusClient.DisposeAsync();
  }

}

I expect the service to receive all 100k messages without stopping.

  • Can you share your code in the question? Commented Jan 8 at 9:31
  • Set PrefetchCount and MaxConcurrentCalls properly, and check if your App Service and Service Bus settings can handle the load. Commented Jan 8 at 9:33
  • I set these options: var processorOptions = new ServiceBusProcessorOptions { AutoCompleteMessages = false, MaxAutoLockRenewalDuration = TimeSpan.FromHours(1) }; I didn't set PrefetchCount or MaxConcurrentCalls because I strictly receive one message at a time. I set a flag in the messages, and when that flag is true I send the email accordingly. I think when I set PrefetchCount and MaxConcurrentCalls it invoked my handler. I will not proceed to the next message until the previous message has been processed. Commented Jan 8 at 10:03
  • Set MaxConcurrentCalls to 1 and PrefetchCount to 0 to ensure strict one-at-a-time message processing, and handle scaling properly based on your App Service and Service Bus limits. Commented Jan 8 at 10:12
  • 1. Could you share the callbacks' code? 2. You're calling asynchronous methods without awaiting them. Commented Jan 8 at 14:01

1 Answer

Azure Service Bus queue does not receive all messages from the queue when the project is deployed on App Service

Make sure you are not sending more than 100 messages per transaction, as mentioned in the documentation. I also agree with @Sean Feldman and @Jesse Squire.
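The point raised in the comments about unawaited calls can be sketched as follows. In the question's code, StartProcessingAsync() and Task.Delay(...) are not awaited, so the finally block starts disposing the processor and client right away, and the configured processorOptions are never passed to CreateProcessor. Either of these may contribute to processing stopping early. This is a minimal sketch under assumptions, not the asker's actual service: the class name QueueReceiver, the method name ReceiveMessagesAsync, the messageHandler parameter, and the stoppingToken used to keep the method alive are all hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class QueueReceiver
{
    // Hypothetical helper: keeps the processor running until stoppingToken is cancelled.
    public static async Task ReceiveMessagesAsync(
        string queueName,
        string namespaceConnectionString,
        Func<ProcessMessageEventArgs, Task> messageHandler,
        CancellationToken stoppingToken)
    {
        var clientOptions = new ServiceBusClientOptions
        {
            TransportType = ServiceBusTransportType.AmqpWebSockets
        };
        var processorOptions = new ServiceBusProcessorOptions
        {
            AutoCompleteMessages = false,
            MaxAutoLockRenewalDuration = TimeSpan.FromHours(1)
        };

        // await using disposes both objects only after this method has finished.
        await using var client = new ServiceBusClient(namespaceConnectionString, clientOptions);
        // Pass the configured options; the question's code passed a new default instance.
        await using var processor = client.CreateProcessor(queueName, processorOptions);

        processor.ProcessMessageAsync += messageHandler;
        processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine($"Error from {args.ErrorSource}: {args.Exception.Message}");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync(stoppingToken);
        try
        {
            // Awaited, so execution stays here until cancellation is requested.
            await Task.Delay(Timeout.InfiniteTimeSpan, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected when the host asks the service to shut down.
        }
        finally
        {
            await processor.StopProcessingAsync();
        }
    }
}
```

The caller would await ReceiveMessagesAsync and cancel the token on shutdown. Because AutoCompleteMessages is false, the handler passed in is expected to call CompleteMessageAsync or AbandonMessageAsync itself.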

The code below works for me when handling large numbers of messages:

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;


class Program
{
    private static ServiceBusProcessor _serviceBusProcessor;

    static async Task Main(string[] args)
    {
        string ser_q_nme = "rithq";
        string rithcon = "Endpoint=sb://demoser898.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=1zJgPECrithwikhk=";
        await Rith_Get_Msg(ser_q_nme, rithcon);
    }

    public static async Task Rith_Get_Msg(string ser_q_nme, string rithcon)
    {
        var rithcl = new ServiceBusClientOptions
        {
            TransportType = ServiceBusTransportType.AmqpWebSockets
        };
        var risbc = new ServiceBusClient(rithcon, rithcl);
        var ripro = new ServiceBusProcessorOptions
        {
            AutoCompleteMessages = false,
            MaxAutoLockRenewalDuration = TimeSpan.FromHours(1),
            MaxConcurrentCalls = 50
        };
        var ri_serbuspro = risbc.CreateProcessor(ser_q_nme, ripro);
        _serviceBusProcessor = ri_serbuspro;
        try
        {
            ri_serbuspro.ProcessMessageAsync += async (args) =>
            {
                await MessageHandlerAsync(args);
            };

            ri_serbuspro.ProcessErrorAsync += ErrorHandlerAsync;

            await ri_serbuspro.StartProcessingAsync();
            Console.WriteLine("Hello Rithwik, received messages are being processed. To stop processing, press Enter");
            Console.ReadLine();

            await ri_serbuspro.StopProcessingAsync();
        }
        catch (Exception cho)
        {
            Console.WriteLine($"Exception: {cho.Message}");
        }
        finally
        {
            await ri_serbuspro.DisposeAsync();
            await risbc.DisposeAsync();
        }
    }
    private static async Task MessageHandlerAsync(ProcessMessageEventArgs args)
    {
        try
        {
            string ri_bdy = args.Message.Body.ToString();
            Console.WriteLine($"Received message: {ri_bdy}");
            await args.CompleteMessageAsync(args.Message);
        }
        catch (Exception cho)
        {
            Console.WriteLine($"Processing error: {cho.Message}");

            await args.AbandonMessageAsync(args.Message);
        }
    }
    private static Task ErrorHandlerAsync(ProcessErrorEventArgs rith)
    {
        Console.WriteLine($"Hello Rithwik, An error: {rith.Exception.Message}, Error Entity Path: {rith.EntityPath}");
        return Task.CompletedTask;
    }
}

Output:

Messages are received. [image]
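The answer's processor options set MaxConcurrentCalls = 50, which allows up to 50 handler invocations at once. The question's comments ask for strictly one message at a time, and a commenter suggests MaxConcurrentCalls = 1 with PrefetchCount = 0 for that. A minimal sketch of those options follows; the class name SequentialProcessing and the method name CreateOptions are hypothetical, and the lock renewal duration is copied from the question rather than being a recommendation.

```csharp
using System;
using Azure.Messaging.ServiceBus;

public static class SequentialProcessing
{
    // Hypothetical helper; the values follow the suggestion in the question's comments.
    public static ServiceBusProcessorOptions CreateOptions() => new ServiceBusProcessorOptions
    {
        AutoCompleteMessages = false,                        // the handler settles each message itself
        MaxConcurrentCalls = 1,                              // one handler invocation at a time
        PrefetchCount = 0,                                   // do not buffer messages ahead of the handler
        MaxAutoLockRenewalDuration = TimeSpan.FromHours(1)   // value taken from the question
    };
}
```

These options would be passed when creating the processor, for example risbc.CreateProcessor(ser_q_nme, SequentialProcessing.CreateOptions()) in the code above.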
