I am using a Kafka @KafkaListener together with @Transactional("kafkaTransactionManager"). My flow is a read/process/commit cycle using a transactional consumer:
public void listen(List<ConsumerRecord<String, String>> records) throws Exception {
    // do some database activities

    // Send the data and block on the future so I can capture the
    // offset/partition of the published record and store them in the DB.
    final SendResult<String, String> result = kafkaTemplate.send(producerRecord).get();
    long offset = result.getRecordMetadata().offset();
    int partition = result.getRecordMetadata().partition();
    log.info("offset={}, partition={}", offset, partition);
}
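For reference, the transaction wiring behind this listener is roughly as follows. This is a simplified sketch: the bean name matches the @Transactional qualifier above, but the producer factory details may differ in my actual setup.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class KafkaTxConfig {

    // Bean name matches the @Transactional("kafkaTransactionManager") qualifier.
    // The ProducerFactory must have a transactional.id configured so that
    // kafkaTemplate.send() and the offset commit join the same Kafka transaction.
    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager(
            ProducerFactory<String, String> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }
}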
If anything fails before or during the send, the transaction is rolled back and the consumer re-reads from the same offset; that part works as expected (the happy-path flow).
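I believe the re-read happens because the listener container re-seeks the partitions after a rollback. A sketch of my container factory, assuming Spring Kafka's default after-rollback behavior:

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.transaction.KafkaTransactionManager;

// In the same @Configuration class as the transaction manager above.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory,
        KafkaTransactionManager<String, String> kafkaTransactionManager) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true); // the listener receives List<ConsumerRecord>
    // The container starts the transaction; on rollback, the
    // DefaultAfterRollbackProcessor re-seeks so the records are redelivered.
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>());
    return factory;
}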
Issue: say the data has been published, and then, when the consumer is about to commit the offset, I get

org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets

In this case, will the consumer read the same offset again? (I think so, since the transaction did NOT complete successfully.)

If yes: won't this cause a duplicate, since the message has already been sent?

If no: how will the message be removed from the target topic (where the producer sent it)?
I am using acks = -1 and enable.idempotence = true.
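For completeness, here is a sketch of the client properties as I understand them; the transactional.id value is illustrative. My understanding is that downstream consumers skip records from aborted transactions only when reading with isolation.level=read_committed, which is part of why I'm asking how an already-published message would be handled.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

class TxClientProps {

    static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // equivalent to acks = -1
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-producer-1"); // illustrative
        return props;
    }

    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        // Only read_committed consumers skip records from aborted transactions;
        // the default read_uncommitted would still see them.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return props;
    }
}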