I have a service that reads events from an SQS queue and processes the messages it finds. Two threads poll the queue concurrently, and each one checks for messages once per second. This has increased our AWS costs considerably. To reduce them, I want to apply a dynamic delay between reads of the queue. This is the behaviour I want to implement in my method:

If the queue has no messages, increase the delay before the next read by one second. For example, if the service reads the queue and finds nothing, the delay goes from 1 second to 2 seconds, and it keeps growing this way until it reaches 60 seconds. When a message is found, the delay is reset to 1 second and the cycle starts again, from 1 second up to 60.

Basically, I need to know whether the queue has messages and, based on that check, increase the delay before the next read.
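The rule described above can be sketched in plain Java, independent of Mutiny and SQS. This is only an illustration under stated assumptions: `BackoffDelay`, `onEmptyPoll` and `onMessagesFound` are hypothetical names, the step and cap (1 second and 60 seconds) come from the description above, and an `AtomicLong` is used on the assumption that both polling threads share one delay value.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of Mutiny or the AWS SDK.
final class BackoffDelay {
    private final long stepMillis; // increment, and also the value the delay resets to
    private final long maxMillis;  // upper bound for the delay
    private final AtomicLong millis;

    BackoffDelay(long stepMillis, long maxMillis) {
        this.stepMillis = stepMillis;
        this.maxMillis = maxMillis;
        this.millis = new AtomicLong(stepMillis);
    }

    /** Empty poll: add one step, never exceeding the cap. Returns the new delay. */
    long onEmptyPoll() {
        return millis.updateAndGet(d -> Math.min(d + stepMillis, maxMillis));
    }

    /** Messages found: go back to the initial delay. Returns the new delay. */
    long onMessagesFound() {
        millis.set(stepMillis);
        return stepMillis;
    }

    long currentMillis() {
        return millis.get();
    }

    public static void main(String[] args) {
        BackoffDelay delay = new BackoffDelay(1_000, 60_000);
        System.out.println(delay.onEmptyPoll());     // 2000
        System.out.println(delay.onEmptyPoll());     // 3000
        System.out.println(delay.onMessagesFound()); // 1000
    }
}
```

Keeping the rule in a small object like this means the polling code only has to report "empty" or "not empty" and then read the current delay before the next iteration.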

This is the method I have created to read the messages from the queue.

private void createPollingStream() {
        Multi.createBy().repeating()
                .supplier(
                        () -> sqsMessagePoller.pollUDMUsages() // Read messages from the queue. If there are none, I want to increase the delay.
                                .runSubscriptionOn(Infrastructure.getDefaultExecutor())
                                .onItem().transformToUniAndMerge(this::processMessagesQueue) // If the poll returned messages, call another method to process them.
                                .onFailure().invoke(failure ->
                                        Log.errorf("Error processing the messages: %s", failure))
                                .subscribe()
                                .with(succ -> Log.info("Current iteration of processing message complete"),
                                        failure -> Log.error("Failed to process message in flow", failure)
                                )
                ).withDelay(Duration.ofMillis(delay)) // Initial delay between reads of the queue. I guess it should be dynamic here.
                .indefinitely()
                .subscribe().with(x -> Log.info("Current iteration of processing message complete"),
                        failure -> Log.error("Failed to poll messages in flow", failure));
    }

1 Answer

Your code is not going to work: once the pipeline has been created, it is not going to re-evaluate itself, so the delay is not going to change the way you want.
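One way to read this point, shown in plain Java rather than Mutiny: an expression such as `Duration.ofMillis(delay)` passed as a method argument is evaluated once, when the call is made, whereas a lambda re-reads the value every time it is invoked. The sketch below assumes that is what "not going to re-evaluate itself" refers to; `EagerVsDeferredDelay` and `readAfterUpdate` are hypothetical names used only for illustration.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical demo class; it does not use Mutiny.
final class EagerVsDeferredDelay {

    /** Returns {eagerMillis, deferredMillis} as observed after the delay value changes. */
    static long[] readAfterUpdate(long initialMillis, long updatedMillis) {
        AtomicLong delayMillis = new AtomicLong(initialMillis);

        // Evaluated right now: this Duration never sees later updates.
        Duration eager = Duration.ofMillis(delayMillis.get());

        // Evaluated on each get(): this always sees the latest value.
        Supplier<Duration> deferred = () -> Duration.ofMillis(delayMillis.get());

        delayMillis.set(updatedMillis);
        return new long[] { eager.toMillis(), deferred.get().toMillis() };
    }

    public static void main(String[] args) {
        long[] result = readAfterUpdate(1_000, 5_000);
        // prints "eager=1000 ms, deferred=5000 ms"
        System.out.println("eager=" + result[0] + " ms, deferred=" + result[1] + " ms");
    }
}
```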

However, you can introduce a dynamic delay by performing the delay inside .call().

Can you try something like this:

public class QueuePollingService {

private final AtomicLong delay = new AtomicLong(1000);
private static final long MAX_DELAY = 60000; 
private static final long INITIAL_DELAY = 1000; 

private void startPolling() {
    Multi.createBy().repeating()
            .supplier(() -> pollQueue()
                    .call(() -> Uni.createFrom().nullItem().onItem().delayItBy(Duration.ofMillis(delay.get()))) 
            )
            .indefinitely()
            .subscribe().with(
                    x -> Log.info("Polling iteration complete"),
                    failure -> Log.error("Polling stopped due to failure", failure)
            );
}

private Uni<Void> pollQueue() {
    return sqsMessagePoller.pollUDMUsages()
            .toUni() // Convert Multi<List<Message>> to Uni<List<Message>>
            .onItem().ifNull().continueWith(List.of()) // Ensure it’s never null
            .runSubscriptionOn(Infrastructure.getDefaultExecutor()) // Ensure execution
            .onItem().invoke(messages -> {
                if (messages.isEmpty()) {
                    delay.updateAndGet(d -> Math.min(d * 2, MAX_DELAY)); // Increase delay
                    Log.infof("No messages found, increasing delay to %d ms", delay.get());
                } else {
                    delay.set(INITIAL_DELAY); // Reset delay on message arrival
                    Log.infof("Messages found (%d), resetting delay to 1 second.", messages.size());
                }
            })
            .onItem().transformToUniAndMerge(this::processMessagesQueue)
            .onFailure().invoke(failure ->
                    Log.errorf("Error processing messages: %s", failure))
            .replaceWithVoid();
}

}

Note: I’m on my phone, so I had ChatGPT generate this code for me.

@Allanh, I’ve updated the answer. Can you try it again?

3 Comments

Thanks for replying. I tried to replicate the code you sent, but it didn't work. 1. The first delay seems to be Mutiny's default, because whatever initial value I use is not taken into account. 2. When there are no messages, the lambda in .onItem().invoke(messages -> { is never entered; the entire .onItem().invoke(messages -> { block is skipped. If you can give me another suggestion, that would be great. One more thing: the processMessagesQueue method returns a Multi<List>>
Thanks for editing the code. I tried it and the result is the same: the onItem().invoke( block is being skipped. I also noticed that whatever INITIAL_DELAY I set is never taken into account. I tested with 10000 (10 seconds) and the delay does not work; I think the method is being run about 100 times in one second.
I tried Multi.createBy().repeating().uni(() instead of the supplier, and the delay works. The issue is that if there are errors, execution stops, even though I have .onFailure. When I use the supplier, the delay is not taken into account no matter what I try.
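For comparison, the two properties discussed in these comments (a delay that is re-read on every iteration, and a loop that survives a failed poll) can be sketched without Mutiny, using `java.util.concurrent` only. This is a swapped-in technique, not a fix for the Mutiny pipeline above. All names are hypothetical: `SelfReschedulingPoller`, `runOnce` and `scheduleNext` are made up for this sketch, the `Supplier` stands in for the SQS read, and the `Consumer` stands in for message processing. Treating a failed poll like an empty one is also an assumption, not something stated in the thread.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch; it uses neither Mutiny nor the AWS SDK.
final class SelfReschedulingPoller {
    private final Supplier<List<String>> poller; // stands in for the SQS read
    private final Consumer<String> handler;      // stands in for message processing
    private final long stepMillis;
    private final long maxMillis;
    private long delayMillis;                    // only touched by the scheduler thread after start

    SelfReschedulingPoller(Supplier<List<String>> poller, Consumer<String> handler,
                           long stepMillis, long maxMillis) {
        this.poller = poller;
        this.handler = handler;
        this.stepMillis = stepMillis;
        this.maxMillis = maxMillis;
        this.delayMillis = stepMillis;
    }

    /** Runs one poll and returns the delay to wait before the next one. */
    long runOnce() {
        try {
            List<String> messages = poller.get();
            if (messages.isEmpty()) {
                delayMillis = Math.min(delayMillis + stepMillis, maxMillis);
            } else {
                messages.forEach(handler);
                delayMillis = stepMillis; // reset once messages arrive
            }
        } catch (RuntimeException e) {
            // Assumption: a failed poll backs off like an empty one, and the loop keeps going.
            delayMillis = Math.min(delayMillis + stepMillis, maxMillis);
        }
        return delayMillis;
    }

    /** Schedules the next poll using the current delay, then reschedules itself. */
    void scheduleNext(ScheduledExecutorService scheduler) {
        scheduler.schedule(() -> {
            runOnce();
            if (!scheduler.isShutdown()) {
                scheduleNext(scheduler);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Short delays so the demo finishes quickly; the thread uses 1 s and 60 s.
        SelfReschedulingPoller poller =
                new SelfReschedulingPoller(() -> List.of(), m -> { }, 10, 50);
        poller.scheduleNext(scheduler);
        Thread.sleep(200);
        scheduler.shutdownNow();
    }
}
```

Because each iteration schedules the next one only after it has finished, the delay is read fresh every time, and an exception in one poll cannot terminate the loop.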
