I am using @RetryableTopic to enable the Kafka retry mechanism in a consumer. This is the sample code I tried:

@RetryableTopic(
        attempts = "3", 
        backoff = @Backoff(delay = 2000, multiplier = 2.0),
        autoCreateTopics = "false",
        retryTopicSuffix = ".retry",
        dltTopicSuffix = ".dlq"
)
@KafkaListener(
        groupId = "${group.listener.id}",
        topics = {"${topic.name}"},
        concurrency = "${concurrency}",
        properties = {
                "max.poll.interval.ms=${topic.properties.max.poll.interval.ms}",
                "max.poll.records=${topic.properties.max.poll.records}"
        }
)
public void consume() {
    // do something
}

I have created the retry topics and DLT according to the naming convention.

Original Topic: my.original.topic

Retry Topics: my.original.topic.retry-0, my.original.topic.retry-1

DLT: my.original.topic.dlq
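
For reference, the naming above can be reproduced with a small standalone sketch. It is not Spring Kafka code: `RetryTopicNames`, `expectedTopics` and `retryDelaysMs` are hypothetical helpers, and the sketch assumes index-based suffixes (as seen in the topic names in this question) and that `attempts` counts the first delivery on the main topic, so `attempts = "3"` yields two retry topics.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Kafka: derives the topic names that
// would need to exist up front when autoCreateTopics = "false".
final class RetryTopicNames {

    // Assumes index-based suffixes: <main><retrySuffix>-<index>, then <main><dltSuffix>.
    static List<String> expectedTopics(String mainTopic, int attempts,
                                       String retrySuffix, String dltSuffix) {
        List<String> topics = new ArrayList<>();
        // The first attempt runs on the main topic, leaving attempts - 1 retry topics.
        for (int i = 0; i < attempts - 1; i++) {
            topics.add(mainTopic + retrySuffix + "-" + i);
        }
        topics.add(mainTopic + dltSuffix);
        return topics;
    }

    // Assumes a plain exponential backoff: delay, delay * multiplier, ...
    static List<Long> retryDelaysMs(int attempts, long initialDelayMs, double multiplier) {
        List<Long> delays = new ArrayList<>();
        double delay = initialDelayMs;
        for (int i = 0; i < attempts - 1; i++) {
            delays.add((long) delay);
            delay *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values taken from the @RetryableTopic annotation in the question.
        System.out.println(expectedTopics("my.original.topic", 3, ".retry", ".dlq"));
        System.out.println(retryDelaysMs(3, 2000L, 2.0));
    }
}
```

With the values from the annotation, this lists the same two retry topics and the DLT shown above, so the names themselves match what the framework tries to publish to.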

But it gives the following error. It says the retry topics are not present, even though they have already been created and are working. (To verify this, I manually published to the retry topics and the DLT, and it worked.)

org.apache.kafka.common.errors.TimeoutException: Topic my.original.topic.retry-0 not present in metadata after 60000 ms. 

ERROR 7824  [ntainer#0-0-C-1] o.s.k.l.DeadLetterPublishingRecoverer    : Dead-letter publication to my.original.topic.retry-0 failed for: omy.original.topic-2@133

org.springframework.kafka.KafkaException: Send failed
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:835) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:792) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:595) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:690) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:599) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:565) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:534) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:228) ~[spring-kafka-3.3.0.jar:3.3.0]

What am I missing here?

  • Maybe this is your problem: autoCreateTopics = "false". Try setting it to true. Commented Aug 26 at 15:46
  • @JorgeCampos The problem is that in my case I need to use topics created manually by another party. Commented Aug 26 at 15:51
  • Then the message is clear: the topic doesn't exist. Did you make sure the topics are available to the application in the environment you are running in? Commented Aug 26 at 16:46
  • @JorgeCampos I was able to find the root cause. The producer was not pointing to the correct bootstrap server. I had to configure spring.kafka.producer.bootstrap-servers explicitly; spring.kafka.properties.bootstrap.servers did not work. Anyway, thank you very much for your input and support. Commented Aug 29 at 3:27
  • That's great! Since you were using the wrong property, here is a useful link to keep at hand: docs.spring.io/spring-boot/appendix/application-properties/… It lists all the available Spring Boot properties. Commented Aug 29 at 15:07
