
I am using a Kafka @KafkaListener with @Transactional("kafkaTransactionManager"). My flow is read/process/commit using a transactional consumer.

    public void listen(List<ConsumerRecord<String, String>> records) {

        // do some database activities

        // send the data and wait for the result, to capture the offset and store it in the DB
        final SendResult<String, String> result = kafkaTemplate.send(producerRecord).get();
        long offset = result.getRecordMetadata().offset();
        int partition = result.getRecordMetadata().partition();
        log.info("offset={}, partition={}", offset, partition);
    }

When anything fails before or during the send, the transaction is rolled back and the consumer re-reads from the same offset — that part works as expected.

Issue - Say the data has been published, and then, when the consumer is about to commit the offset, I get org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets. In this case, will the consumer read the same offset again (I think so, since the transaction did NOT complete successfully)?

If yes - won't this cause a duplicate, since the message was already sent?
If no - then how will the message be removed from the target topic (where the producer sent it)?

I am using acks = -1 and idempotence = true.
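For reference, those producer settings map to the following standard kafka-clients properties (the transactional.id value below is just a placeholder — transactions additionally require one to be set):

```properties
# Producer settings required for the idempotent, transactional producer
acks=all                       # -1 and all are equivalent
enable.idempotence=true
transactional.id=my-tx-id      # placeholder; must be set to use transactions
```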

1 Answer


First of all, make sure that acks is -1 (all) and idempotence is true, because otherwise you will get a ConfigException saying that acks must be all in order to use the idempotent producer.

The consumer in the listener container will read the same message again: when the transaction is rolled back on an exception, the consumer position is reset by the recordAfterRollback method of the ListenerConsumer.

Committing the offset (via the producer's sendOffsetsToTransaction) and sending the message via the KafkaTemplate are done in the same Kafka transaction, with the same producer instance.
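Under the hood this is Kafka's read-process-write pattern: the consumed offsets are committed through the producer, inside the producer's transaction. One detail worth illustrating is that the offset handed to sendOffsetsToTransaction for each partition must be the last consumed offset + 1 (the position of the next record to read). A minimal sketch of that offset bookkeeping, using plain maps instead of Kafka's TopicPartition/OffsetAndMetadata types so it runs standalone:

```java
import java.util.HashMap;
import java.util.Map;

public class TransactionalOffsets {

    // Given the last consumed offset per partition, compute the offsets to pass
    // to sendOffsetsToTransaction: always last consumed + 1 (the next position).
    static Map<String, Long> nextOffsetsToCommit(Map<String, Long> lastConsumed) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, Long> e : lastConsumed.entrySet()) {
            toCommit.put(e.getKey(), e.getValue() + 1);
        }
        return toCommit;
    }

    public static void main(String[] args) {
        Map<String, Long> lastConsumed = Map.of("source-topic-0", 41L, "source-topic-1", 7L);
        // In the real flow, these offsets go inside the producer transaction:
        //   producer.beginTransaction();
        //   producer.send(record);
        //   producer.sendOffsetsToTransaction(offsets, consumerGroupId);
        //   producer.commitTransaction();
        System.out.println(nextOffsetsToCommit(lastConsumed));
    }
}
```

If the transaction aborts, neither the sent message nor the offset commit becomes visible, which is why the consumer re-reads from the old position.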

If you're worried about duplicates in the target topic (which may occur), set isolation.level=read_committed for the consumers of that topic; this makes them read only committed messages.
You can read about Kafka transactions and how they work here.
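That consumer setting is a standard kafka-clients property (the default is read_uncommitted, which is why aborted messages are otherwise visible):

```properties
# Consumers of the target topic: skip records from aborted/uncommitted transactions
isolation.level=read_committed
```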

Also, since you're inserting something into a database, read how to synchronize JDBC and Kafka transactions here, because KafkaTransactionManager can't cover this alone.
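One common shape for that synchronization in Spring Kafka (a sketch, not a drop-in config; the bean name "transactionManager" is an assumption) is nested transactions: the listener container is configured with the KafkaTransactionManager, and the listener method carries @Transactional pointing at the JDBC/JPA transaction manager. The container starts the Kafka transaction, the annotation starts the DB transaction inside it, and on success the DB transaction commits just before the Kafka one:

```java
// Sketch: nested DB + Kafka transactions (bean names are placeholders).
// The listener container is configured with the KafkaTransactionManager elsewhere.
@KafkaListener(topics = "source-topic")
@Transactional("transactionManager") // the JDBC/JPA transaction manager bean
public void listen(List<ConsumerRecord<String, String>> records) {
    // DB work and kafkaTemplate.send(...) here run inside both transactions.
    // The DB commit happens first, the Kafka commit after - so a Kafka commit
    // failure can still leave the DB committed; keep the DB writes idempotent.
}
```

Note the ordering caveat in the comment: this is "best effort" synchronization, not a true two-phase commit, so duplicates must still be tolerated somewhere.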
