I have 1000 queues with specific names, and I want to process these queues with one broker. Is that possible?

The queue names are stored in a MySQL database, so I would fetch them and run the broker for each one. Of course, it should run asynchronously and should be able to pass a queued item to an idle broker. Is this possible, or should I create 1000 files, each acting as a broker for one specific queue name?

Update: this is a picture of my queues. The queues should run in parallel, not serially. The users are the producers, and the worker is the consumer that runs the send_message() method.

enter image description here

  • it should run asynchronously - which part should be asynchronous? Why should it be asynchronous? Do you know what asynchronous is? Commented Jun 20, 2017 at 14:54
  • Actually, it's a little bit confusing for me. I updated the question to demonstrate my purpose. Please have another look. Commented Jun 21, 2017 at 7:59

1 Answer

I can show you how to do it with the enqueue library. I must warn you that there is no way to consume messages asynchronously in one process. You can, however, run a few processes, each of which serves a set of queues. The queues could be divided into groups by importance.

Install the AMQP transport and consumption library:

composer require enqueue/amqp-ext enqueue/enqueue

Create a consumption script. I assume that you have already fetched an array of queue names from the DB and stored it in the $queueNames variable. The example binds the same processor to all queues, but you can of course set different ones.

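<?php

// Load Composer's autoloader (assumes this script sits next to the vendor/ directory).
require_once __DIR__.'/vendor/autoload.php';
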
use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;

// here's the list of queue names which you fetched from DB
$queueNames = ['foo_queue', 'bar_queue', 'baz_queue'];

$factory = new AmqpConnectionFactory('amqp://');

$context = $factory->createContext();

// create queues at RabbitMQ side, you can remove it if you do not need it
foreach ($queueNames as $queueName) {
    $queue = $context->createQueue($queueName);
    $queue->addFlag(AMQP_DURABLE);

    $context->declareQueue($queue);
}

$consumer = new QueueConsumer($context);

foreach ($queueNames as $queueName) {
    $consumer->bind($queueName, function(PsrMessage $psrMessage) use ($queueName) {
        echo 'Consume the message from queue: '.$queueName.PHP_EOL;

        // your processing logic.

        return PsrProcessor::ACK;
    });
}

$consumer->consume();
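
The suggestion to run a few processes that each serve a set of queues could be sketched roughly as follows. This is only an illustration, not part of the enqueue library: the helper queuesForWorker() and the idea of giving each process a worker index and a worker count are assumptions made for the example. Each process would then bind only its own slice of $queueNames in the consumption script above.

```php
<?php

/**
 * Hypothetical helper (not part of enqueue): returns the queue names that
 * worker number $workerIndex (0-based) out of $workerCount should serve.
 * Names are dealt out like cards, so group sizes differ by at most one.
 */
function queuesForWorker(array $queueNames, int $workerIndex, int $workerCount): array
{
    $assigned = [];

    foreach (array_values($queueNames) as $position => $queueName) {
        if ($position % $workerCount === $workerIndex) {
            $assigned[] = $queueName;
        }
    }

    return $assigned;
}

// Example names only; in practice this would be the list fetched from the DB.
$queueNames = ['foo_queue', 'bar_queue', 'baz_queue', 'qux_queue', 'quux_queue'];
$workerCount = 2;

// Show which queues each worker process would bind to.
for ($workerIndex = 0; $workerIndex < $workerCount; $workerIndex++) {
    $mine = queuesForWorker($queueNames, $workerIndex, $workerCount);
    echo 'Worker '.$workerIndex.': '.implode(', ', $mine).PHP_EOL;
}
```

Dealing the names out by position keeps the groups evenly sized. Grouping by queue importance, as mentioned above, would need a different rule, for example an extra column in the DB that assigns each queue to a group.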

More in doc

13 Comments

So if there are some items in foo_queue and baz_queue, will the baz items be processed after the foo items are done? Or do they have the same priority and get processed together by multiple worker processes?
And another thing: if a worker is working on a queue item, will another worker fetch the same item and work on it, or will it fetch another item?
The queue consumer switches between the queues all the time. If you have some messages in the foo queue and the bar queue, it will process them in this order: foo, bar, foo, bar and so on.
There is no way to get the message that is currently being processed. It is RabbitMQ's responsibility to control such things.
So if I have 1000 queues, will the second item in the first queue be processed only after the first items of all 999 other queues have been processed?