I have a Kafka Streams app that reads from a Kafka topic and aggregates the records on three fields of the original value, in 5-minute windows. On the output side, I need to translate the aggregated message (key = complex object, value = the aggregate) into a message where the key is one of the fields of the complex object and the value is the complex object plus the aggregate in a new field. That output is then persisted to an Iceberg table via the Iceberg Kafka connector. The translation is required because the Iceberg connector can only persist the value object, not the key.
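To make the translation step concrete, here is a minimal plain-Java sketch of the re-keying. Everything in it is hypothetical: the field names (`tenant`, `device`, `metric`), the `GroupKey` and `Flattened` types, the choice of `tenant` as the new key, and carrying the window start in the value are all assumptions for illustration, not my actual schema. In the real topology this logic would sit inside a `map` step after the aggregation.

```java
import java.util.Map;

class RekeyDemo {
    // Hypothetical three-field grouping key (the "complex object").
    record GroupKey(String tenant, String device, String metric) {}

    // Hypothetical output value: the key fields plus the aggregate in a new field.
    // windowStartMs is an assumption, added so each row identifies its window.
    record Flattened(String tenant, String device, String metric,
                     long windowStartMs, double aggregate) {}

    // Builds the output record: new key = one field of the complex key,
    // new value = all key fields plus the aggregate.
    static Map.Entry<String, Flattened> rekey(GroupKey key, long windowStartMs, double aggregate) {
        Flattened value = new Flattened(
                key.tenant(), key.device(), key.metric(), windowStartMs, aggregate);
        return Map.entry(key.tenant(), value);
    }

    public static void main(String[] args) {
        System.out.println(rekey(new GroupKey("t1", "d1", "m1"), 0L, 42.0));
    }
}
```

Because the key is copied into the value, nothing is lost when the connector discards the key.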

When the data hits the Iceberg connector, I'm seeing duplicate entries, e.g. 4 or 6 values for the same time window. I've ruled out late messages as the cause: the same inconsistent output shows up in test environments, so I'm fairly certain it's a bug in my code. The duplication happens before the data reaches Iceberg.

I'm new to Kafka Streams. My understanding was that RocksDB is used as an intermediate store and that several state stores are managed in additional Kafka topics, with the result that the final message isn't written to the destination topic until the window has elapsed (and the grace period has elapsed too).

What could cause duplicate messages to be output?

1 Answer

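With Kafka Streams windowed aggregations, the default is to emit intermediate results as the aggregate is updated, not to wait until the window closes and emit a single final one. That is why you see several records for the same window. For Kafka Streams to emit only the final result, set the emit strategy to EmitStrategy.onWindowClose() (available since Kafka Streams 3.3), like this: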

builder.stream(topic, ....)
        .groupByKey(....)
        .windowedBy(TimeWindows.ofSizeAndNoGrace(Duration.ofMinutes(5)))
        .emitStrategy(EmitStrategy.onWindowClose())
        .aggregate(....)
        .....

You set the emit strategy on the TimeWindowedKStream object returned by KStream.groupByX().windowedBy(..).
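To illustrate the difference between the two behaviors, here is a small plain-Java simulation. It is not the Kafka Streams API: the class and method names are made up, the aggregate is a simple count, and it emits on every single update, as if record caching were disabled (with caching on, the real number of intermediate results depends on the cache size and commit interval). It assumes 5-minute tumbling windows with no grace period.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class EmitStrategyDemo {
    static final long WINDOW_MS = 5 * 60 * 1000L;

    // One emitted result: the window it belongs to and the count at emit time.
    record Emitted(long windowStartMs, long count) {}

    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // Emits the updated count after every input record (intermediate results).
    static List<Emitted> emitOnEveryUpdate(long[] timestamps) {
        Map<Long, Long> counts = new TreeMap<>();
        List<Emitted> out = new ArrayList<>();
        for (long ts : timestamps) {
            long start = windowStart(ts);
            long updated = counts.merge(start, 1L, Long::sum);
            out.add(new Emitted(start, updated));
        }
        return out;
    }

    // Emits one result per window, once stream time reaches the window end.
    static List<Emitted> emitOnWindowClose(long[] timestamps) {
        Map<Long, Long> open = new TreeMap<>();
        List<Emitted> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // Stream time only advances when a record with a newer timestamp arrives.
            streamTime = Math.max(streamTime, ts);
            long start = windowStart(ts);
            if (start + WINDOW_MS <= streamTime) {
                continue; // window already closed (no grace): drop the late record
            }
            open.merge(start, 1L, Long::sum);
            // Flush every window whose end has been reached by stream time.
            Iterator<Map.Entry<Long, Long>> it = open.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> e = it.next();
                if (e.getKey() + WINDOW_MS <= streamTime) {
                    out.add(new Emitted(e.getKey(), e.getValue()));
                    it.remove();
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 60_000L, 120_000L, 300_000L};
        System.out.println(emitOnEveryUpdate(ts));
        System.out.println(emitOnWindowClose(ts));
    }
}
```

With those four timestamps, the first method produces three results for the first window (counts 1, 2, 3) plus one for the second, while the second method produces a single result for the first window with count 3. Note that the second window is never emitted in this run, because no later record arrives to advance stream time past its end.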

HTH and let me know how it goes.
