I am using @RetryableTopic to enable Kafka's retry mechanism in a consumer. This is the sample code I have tried:
@RetryableTopic(
    attempts = "3",
    backoff = @Backoff(delay = 2000, multiplier = 2.0),
    autoCreateTopics = "false",
    retryTopicSuffix = ".retry",
    dltTopicSuffix = ".dlq"
)
@KafkaListener(
    groupId = "${group.listener.id}",
    topics = {"${topic.name}"},
    concurrency = "${concurrency}",
    properties = {
        "max.poll.interval.ms=${topic.properties.max.poll.interval.ms}",
        "max.poll.records=${topic.properties.max.poll.records}"
    }
)
public void consume() {
    // do something
}
I have created the retry topics and DLT according to the naming convention.
Original Topic: my.original.topic
Retry Topics: my.original.topic.retry-0, my.original.topic.retry-1
DLT: my.original.topic.dlq
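For reference, the names above can be derived from the annotation values. This is a minimal sketch in plain Java, assuming index-based retry suffixes (`-0`, `-1`) as in the list above; `RetryTopicNames` and its methods are hypothetical helpers, not part of Spring Kafka, and the delays are the nominal exponential values with no cap applied.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Kafka: it only mirrors the naming
// convention listed above so the expected topic names can be checked.
class RetryTopicNames {

    // The first delivery happens on the main topic, so attempts = 3
    // leaves 2 retries, i.e. retry topics with indexes 0 and 1.
    static List<String> retryTopics(String mainTopic, String retrySuffix, int attempts) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < attempts - 1; i++) {
            names.add(mainTopic + retrySuffix + "-" + i);
        }
        return names;
    }

    // The dead-letter topic is the main topic name plus the DLT suffix.
    static String dltTopic(String mainTopic, String dltSuffix) {
        return mainTopic + dltSuffix;
    }

    // Nominal exponential delay before retry i: delay * multiplier^i (no cap applied).
    static long delayMs(long delay, double multiplier, int retryIndex) {
        return Math.round(delay * Math.pow(multiplier, retryIndex));
    }

    public static void main(String[] args) {
        String topic = "my.original.topic";
        List<String> retries = retryTopics(topic, ".retry", 3);
        for (int i = 0; i < retries.size(); i++) {
            System.out.println(retries.get(i) + " after " + delayMs(2000, 2.0, i) + " ms");
        }
        System.out.println(dltTopic(topic, ".dlq"));
    }
}
```

With `attempts = "3"`, `delay = 2000` and `multiplier = 2.0`, this gives two retry topics with nominal delays of 2000 ms and 4000 ms, followed by the DLT.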
But it gives the following error, which says the retry topics are not present. They are already created and functioning, though. (To verify this, I manually published to the retry topics and the DLT, and it worked.)
org.apache.kafka.common.errors.TimeoutException: Topic my.original.topic.retry-0 not present in metadata after 60000 ms.
ERROR 7824 [ntainer#0-0-C-1] o.s.k.l.DeadLetterPublishingRecoverer : Dead-letter publication to my.original.topic.retry-0 failed for: omy.original.topic-2@133
org.springframework.kafka.KafkaException: Send failed
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:835) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:792) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:595) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:690) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:599) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:565) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:534) ~[spring-kafka-3.3.0.jar:3.3.0]
at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:228) ~[spring-kafka-3.3.0.jar:3.3.0]
What am I missing here?
You have autoCreateTopics = "false". Set it to "true" so that the framework creates the retry topics and the DLT itself.
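A sketch of the listener with that one attribute changed, assuming spring-kafka 3.3.x (the version in the stack trace) and a `KafkaAdmin` bean in the context, which Spring Boot normally auto-configures. The class name `MyTopicConsumer` and the `String message` parameter are placeholders and not taken from the question; the `concurrency` and `properties` attributes are left out for brevity.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
public class MyTopicConsumer { // placeholder class name

    @RetryableTopic(
        attempts = "3",
        backoff = @Backoff(delay = 2000, multiplier = 2.0),
        autoCreateTopics = "true", // the only change from the question
        retryTopicSuffix = ".retry",
        dltTopicSuffix = ".dlq"
    )
    @KafkaListener(
        groupId = "${group.listener.id}",
        topics = {"${topic.name}"}
    )
    public void consume(String message) { // placeholder payload type
        // do something
    }
}
```

If the topics have to stay manually managed, it may be worth comparing the names the application logs at startup with the ones that exist on the broker before changing anything else.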