37

I have an ASP.NET Core application where I would like to consume RabbitMQ messages.

I have successfully set up the publishers and consumers in command line applications, but I'm not sure how to set it up properly in a web application.

I was thinking of initializing it in Startup.cs, but of course it dies once startup is complete.

How do I initialize the consumer the right way from a web app?

6
  • 2
    Are you sure ASP.NET is the right place to host the RabbitMQ consumer ? Can you have command line app consuming RabbitMQ messages, and upon receiving them post to ASP.NET ? Commented Apr 25, 2017 at 11:37
  • I'm not sure, but it would be the most convenient because we don't currently have a regime to deploy and run other types of apps. I suppose a Windows Service would do the trick, but if there is a safe and sound way to do it from our web app that would be great Commented Apr 25, 2017 at 11:44
  • It could also be worth mentioning that the web app in question already does related background jobs with Hangfire, so it feels like a logical place to place it Commented Apr 25, 2017 at 11:47
  • 3
    Hangfire is not about long lived objects. Huge overhead. Commented Apr 25, 2017 at 12:27
  • 2
    I see many problems with using a request/response host (a web server) for hosting a long-lived eventing consumer. Keeping a stable number of consumers across recycles, IIS shutting down the process etc adds extra complexity and web servers are simply not designed for this use case. I see that hangfire also supports hosting in Windows Services, so that way you might be able to get the benefit of hangfire and an appropriate host. Commented Apr 26, 2017 at 5:06

5 Answers

42

Use the Singleton pattern for a consumer/listener to preserve it while the application is running. Use the IApplicationLifetime interface to start/stop the consumer on the application start/stop.

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<RabbitListener>();
    }


    public void Configure(IApplicationBuilder app)
    {
        app.UseRabbitListener();
    }
}

public static class ApplicationBuilderExtensions
{
    // The simplest way to hold a single long-lived object; for illustration only.
    private static RabbitListener _listener { get; set; }

    public static IApplicationBuilder UseRabbitListener(this IApplicationBuilder app)
    {
        _listener = app.ApplicationServices.GetService<RabbitListener>();

        var lifetime = app.ApplicationServices.GetService<IApplicationLifetime>();

        lifetime.ApplicationStarted.Register(OnStarted);

        //press Ctrl+C to reproduce if your app runs in Kestrel as a console app
        lifetime.ApplicationStopping.Register(OnStopping);

        return app;
    }

    private static void OnStarted()
    {
        _listener.Register();
    }

    private static void OnStopping()
    {
        _listener.Deregister();    
    }
}
  • You should take care of where your app is hosted. For example, IIS can recycle and stop your code from running.
  • This pattern can be extended to a pool of listeners.
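As a rough illustration of the pool idea, here is a minimal sketch. IListener and ListenerPool are hypothetical names that do not appear in the answer; the sketch assumes RabbitListener would implement IListener by exposing the same Register/Deregister methods.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical abstraction over the Register/Deregister pair used above.
public interface IListener
{
    void Register();
    void Deregister();
}

// Hypothetical pool: holds a fixed number of listeners and starts/stops them together.
public sealed class ListenerPool
{
    private readonly List<IListener> _listeners = new List<IListener>();

    public ListenerPool(int size, Func<IListener> createListener)
    {
        if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));
        if (createListener == null) throw new ArgumentNullException(nameof(createListener));

        for (var i = 0; i < size; i++)
        {
            _listeners.Add(createListener());
        }
    }

    public int Count => _listeners.Count;

    // Call from the ApplicationStarted callback.
    public void RegisterAll()
    {
        foreach (var listener in _listeners)
        {
            listener.Register();
        }
    }

    // Call from the ApplicationStopping callback.
    public void DeregisterAll()
    {
        foreach (var listener in _listeners)
        {
            listener.Deregister();
        }
    }
}
```

Under these assumptions, the pool would be registered as the singleton instead of a single RabbitListener, and OnStarted/OnStopping would call RegisterAll/DeregisterAll.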

10 Comments

What is RabbitListener here?
@Prageeth It is just the code that takes messages from the queue. Your own implementation will depend on the system requirements and the queue definition. You can find a lot of examples on the web; one of them is github.com/plwestaxiom/RabbitMQ/blob/master/RabbitMQ_Tutorials/…
The consumer.Received += (model, ea) => handler is called only once, at application startup, and then stops listening. I don't know if I'm missing anything, Llya.
I was missing the line Console.ReadLine(); once I add it, it works like a charm.
With Console.ReadLine() there is no problem, but when I remove Console.ReadLine() the consumer executes just once. I use IHostedService instead, but that makes me run two console apps at the same time: one for the ASP.NET Core MVC 2 project and one for the RabbitMQ client.
17

This is my listener:

public class RabbitListener
{
    ConnectionFactory factory { get; set; }
    IConnection connection { get; set; }
    IModel channel { get; set; }

    public void Register()
    {
        channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
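            // ea.Body is a byte[] in RabbitMQ.Client 5.x; from 6.0 on it is ReadOnlyMemory<byte>, so use ea.Body.ToArray() there.
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            // Placeholder statement (handy for a breakpoint); process the message here.
            int m = 0;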
        };
        channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
    }

    public void Deregister()
    {
        this.connection.Close();
    }

    public RabbitListener()
    {
        this.factory = new ConnectionFactory() { HostName = "localhost" };
        this.connection = factory.CreateConnection();
        this.channel = connection.CreateModel();


    }
}

Comments

9

Another option is Hosted Services.

You can create a hosted service and have it call a method that registers the RabbitMQ listener.

public interface IConsumerService
{
    Task ReadMessgaes();
}

public class ConsumerService : IConsumerService, IDisposable
{
    private readonly IModel _model;
    private readonly IConnection _connection;
    public ConsumerService(IRabbitMqService rabbitMqService)
    {
        _connection = rabbitMqService.CreateChannel();
        _model = _connection.CreateModel();
        _model.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false);
        _model.ExchangeDeclare("your.exchange.name", ExchangeType.Fanout, durable: true, autoDelete: false);
        _model.QueueBind(_queueName, "your.exchange.name", string.Empty);
    }
    const string _queueName = "your.queue.name";
    public async Task ReadMessgaes()
    {
        var consumer = new AsyncEventingBasicConsumer(_model);
        consumer.Received += async (ch, ea) =>
        {
            var body = ea.Body.ToArray();
            var text = System.Text.Encoding.UTF8.GetString(body);
            Console.WriteLine(text);
            await Task.CompletedTask;
            _model.BasicAck(ea.DeliveryTag, false);
        };
        _model.BasicConsume(_queueName, false, consumer);
        await Task.CompletedTask;
    }

    public void Dispose()
    {
        if (_model.IsOpen)
            _model.Close();
        if (_connection.IsOpen)
            _connection.Close();
    }
}

RabbitMqService:

public interface IRabbitMqService
{
    IConnection CreateChannel();
}

public class RabbitMqService : IRabbitMqService
{
    private readonly RabbitMqConfiguration _configuration;
    public RabbitMqService(IOptions<RabbitMqConfiguration> options)
    {
        _configuration = options.Value;
    }
    public IConnection CreateChannel()
    {
        // Despite the method name, this returns a connection; the caller creates the channel from it.
        var factory = new ConnectionFactory()
        {
            UserName = _configuration.Username,
            Password = _configuration.Password,
            HostName = _configuration.HostName
        };
        // Required for AsyncEventingBasicConsumer in RabbitMQ.Client 5.x/6.x.
        factory.DispatchConsumersAsync = true;
        return factory.CreateConnection();
    }
}

And finally, create a hosted service that calls the ReadMessgaes method (spelled as declared in the interface above) to register the consumer:

public class ConsumerHostedService : BackgroundService
{
    private readonly IConsumerService _consumerService;

    public ConsumerHostedService(IConsumerService consumerService)
    {
        _consumerService = consumerService;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await _consumerService.ReadMessgaes();
    }
}

Register services:

services.AddSingleton<IRabbitMqService, RabbitMqService>();
services.AddSingleton<IConsumerService, ConsumerService>();
services.AddHostedService<ConsumerHostedService>();

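In this case, when the application stops, your consumer stops automatically: the container disposes the singleton ConsumerService on shutdown, and its Dispose method closes the channel and the connection.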

Additional information:

appsettings.json:

{
  "RabbitMqConfiguration": {
    "HostName": "localhost",
    "Username": "guest",
    "Password": "guest"
  }
}

RabbitMqConfiguration:

public class RabbitMqConfiguration
{
    public string HostName { get; set; }
    public string Username { get; set; }
    public string Password { get; set; }
}
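For IOptions<RabbitMqConfiguration> to be populated, the "RabbitMqConfiguration" section of appsettings.json has to be bound to the class. A minimal sketch of that binding follows; the RabbitMqRegistration class and the AddRabbitMqOptions method are hypothetical helper names, and RabbitMqConfiguration is repeated only so the sketch compiles on its own.

```csharp
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

// Same shape as the configuration class above, repeated so the sketch is self-contained.
public class RabbitMqConfiguration
{
    public string HostName { get; set; }
    public string Username { get; set; }
    public string Password { get; set; }
}

// Hypothetical helper; the name is an assumption, not part of the answer.
public static class RabbitMqRegistration
{
    public static IServiceCollection AddRabbitMqOptions(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        // Binds the "RabbitMqConfiguration" section to the options class,
        // so that IOptions<RabbitMqConfiguration> can be injected.
        services.Configure<RabbitMqConfiguration>(
            configuration.GetSection("RabbitMqConfiguration"));
        return services;
    }
}
```

In Program.cs this would be called as builder.Services.AddRabbitMqOptions(builder.Configuration); before the three service registrations shown earlier.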


6 Comments

The code is not working. It is showing a DI error.
@sina_Islam Share your error message.
@sina_Islam I don't know how you implemented RabbitMQ. For more information you can check this link
Thanks, brother, it is working. Actually, I am implementing it in nopCommerce as a plugin to consume the order-placed event, and I had missed some configuration; that is why it was not working properly. Your code is perfect and works as expected.
Awesome! For anyone following this code example, don't forget to link the configuration to the configuration class in Program.cs: builder.Services.Configure<RabbitMqConfiguration>(builder.Configuration.GetSection("RabbitMqConfiguration"));
1

Late 2025 update

This builds on Farhad Zamani's answer. Some SDK classes have been removed and some methods are now async-only, so this might save somebody some time.

The code below is valid for:

  • .NET 9
  • RabbitMQ.Client 7.1.1

RabbitMqService.cs

using RabbitMQ.Client;

public interface IRabbitMqService
{
    Task<IConnection> CreateConnectionAsync();
}

public class RabbitMqService : IRabbitMqService
{
    private readonly RabbitMqConfiguration _configuration;
    public RabbitMqService(IOptions<RabbitMqConfiguration> options)
    {
        _configuration = options.Value;
    }

    public async Task<IConnection> CreateConnectionAsync()
    {
        var connectionFactory = new ConnectionFactory()
        {
            UserName = _configuration.Username,
            Password = _configuration.Password,
            HostName = _configuration.HostName, // in my case just the IP address, without http(s)://, slashes, or port
            Port = _rmqPort, // placeholder, not defined in this snippet; a non-default port has to be set separately
            VirtualHost = "/", // has to be defined, at least in my case
        };
        connectionFactory.ConsumerDispatchConcurrency = 1;
        var connection = await connectionFactory.CreateConnectionAsync();
        return connection;
    }
}

ConsumerService.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public interface IConsumerService
{
    Task ReadMessgaes();
}

public class ConsumerService : IConsumerService, IDisposable
{
    private readonly IConnection _connection;
    private readonly IChannel _channel;
    private readonly string _queueName = "your.queue.name";

    public ConsumerService(IRabbitMqService rabbitMqService)
    {
        _connection = rabbitMqService.CreateConnectionAsync().Result;
        _channel = _connection.CreateChannelAsync().Result;
    }

    public async Task ReadMessgaes()
    {
        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.ReceivedAsync += async (ch, ea) =>
        {
            // that might not be text, so watch what you take from ea.Body
            var message = System.Text.Encoding.Default.GetString(ea.Body.ToArray());

            // ...do whatever you need with payload

            await _channel.BasicAckAsync(ea.DeliveryTag, false); // or BasicNackAsync if you cannot process message now and want to try one more time
        };
        // this consumer tag identifies the subscription
        // when it has to be cancelled
        string consumerTag = await _channel.BasicConsumeAsync(_queueName, false, consumer);
    }

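    public void Dispose()
    {
        // Close gracefully first, then release resources synchronously.
        // (An un-awaited DisposeAsync() would be fire-and-forget.)
        _channel.CloseAsync().Wait();
        _connection.CloseAsync().Wait();
        _channel.Dispose();
        _connection.Dispose();
    }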
}

The rest is the same as in the referenced answer.
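The constructor above blocks on .Result. One possible alternative, sketched here under assumptions, is to do the asynchronous setup inside the hosted service itself. AsyncConsumerHostedService is a hypothetical class name, the queue name is the same placeholder as above, and the sketch assumes RabbitMQ.Client 7.1.1 and a ConnectionFactory registered in DI; it uses only the client calls that already appear in this answer.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical variant, not part of the answer above.
public class AsyncConsumerHostedService : BackgroundService
{
    private const string QueueName = "your.queue.name"; // placeholder
    private readonly ConnectionFactory _factory;

    public AsyncConsumerHostedService(ConnectionFactory factory)
    {
        _factory = factory;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Awaiting here avoids blocking on .Result in a constructor.
        var connection = await _factory.CreateConnectionAsync();
        var channel = await connection.CreateChannelAsync();
        try
        {
            var consumer = new AsyncEventingBasicConsumer(channel);
            consumer.ReceivedAsync += async (sender, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                // ...process the message here
                await channel.BasicAckAsync(ea.DeliveryTag, false);
            };
            await channel.BasicConsumeAsync(QueueName, false, consumer);

            // Keep the service alive until the host signals shutdown.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected when the host is stopping.
        }
        finally
        {
            await channel.CloseAsync();
            await connection.CloseAsync();
            await channel.DisposeAsync();
            await connection.DisposeAsync();
        }
    }
}
```

With this shape, the registration would be something like services.AddSingleton(new ConnectionFactory { HostName = "localhost" }); followed by services.AddHostedService<AsyncConsumerHostedService>();, and the separate ConsumerService would no longer be needed.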

Comments

0

One of the best ways I have found is to use a BackgroundService:

public class TempConsumer : BackgroundService
{
    private readonly ConnectionFactory _factory;
    private IConnection _connection;
    private IModel _channel;

    public TempConsumer()
    {
        _factory = new ConnectionFactory()
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "password",
            VirtualHost = "/",
        };
        _connection = _factory.CreateConnection() ;
        _channel = _connection.CreateModel();
        _channel.QueueDeclare(queue: "queue",
                                durable: false,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);
    }
  
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();
        
        var consumer = new EventingBasicConsumer(_channel);

        consumer.Shutdown += OnConsumerShutdown;
        consumer.Registered += OnConsumerRegistered;
        consumer.Unregistered += OnConsumerUnregistered;
        consumer.ConsumerCancelled += OnConsumerConsumerCancelled;


        consumer.Received += (model, ea) =>
        {
            Console.WriteLine("Received");
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());
            _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            Console.WriteLine(message);
        };


        _channel.BasicConsume(queue: "queue",
                             autoAck: false,
                             consumer: consumer);

        return Task.CompletedTask;
    }

    private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
    private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
    private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }
}

Then register the consumer as a hosted service: services.AddHostedService<TempConsumer>();

Comments
