4,016 questions
Best practices
0
votes
0
replies
27
views
Throw message on DLT/DLQ from GlobalKTable using Kafka Streams
I am reading data from a Kafka topic using Kafka Streams and more specifically GlobalKTable to populate my store. In the case of corrupt data that can not successfully be parsed I wish to throw a ...
0
votes
0
answers
73
views
Kafka Streams main consumer fetch rate stays low after GlobalKTable (RocksDB) restore
I’m running a Kafka Streams application (tested with versions 3.9.1 and 4.1.0) that uses a GlobalKTable backed by RocksDB. There are multiple instances of the application, each with 4 stream threads.
...
0
votes
1
answer
32
views
Can the statestore be shared across two processors?
Here is a simple topology:
stream1
.merge(stream2)
.merge(stream3)
.to("topic1)
stream2 = builder.stream("topic1")
stream2.
process(() -> new Processor<>() { process() { Read ...
0
votes
1
answer
48
views
Wrong partitions are assigned to KStream threads after topic selectKey / repartition
I need to consume two topics A with 40 partitions and B with 10 partitions and keep some info in a shared persistent state store with social security number SSN of type string as its key and a custom ...
-2
votes
1
answer
35
views
Linear processing a kafka topic based on a constraint
I have a kafka topic producing product for multiple companies
{"company": "c1", "productIds": ["p1", "p2", "p3"]}
{"company": &...
0
votes
1
answer
52
views
Does Kafka Streams StreamTask process records from multiple co-partitioned topics sequentially or in parallel?
I have read the explanation written here that one StreamTask is handeling all messages from co-partitioned topics: How do co-partitioning ensure that partition from 2 different topics end up assigned ...
0
votes
1
answer
46
views
How can I emit the delete records to the output topic, and make that key tombstoned and removed from KTable internal state?
I'm working on a Change Data Capture (CDC) application using Kafka Streams, and I have the following setup:
A KStream is converted to a KTable (let's call it kt1).
I perform a left join between kt1 ...
0
votes
0
answers
23
views
Kafka Consumer unable to read messages because of partitions
I have written sample Kafka Producer application without mentioning partition which sends message to Consumer application.
My Consumer application is written using Kafka Streams as it is using state ...
4
votes
0
answers
168
views
decrease response time in Kafka streams
I have a project with kafka streams to create one minute candle on price for stock. My topology code is :
List<String> inputTopics = new ArrayList<>();
inputTopics.add(tradeTopic);
...
0
votes
1
answer
35
views
How does Kafka Streams' KTable.groupBy + aggregate guarantee correct order of the re-partitioned messages?
If I have a kafka input topic with multiple partitions and then in Kafka Streams I use kStream.map to change the key of each record and write that to an output topic, I will face the problem, that ...
0
votes
1
answer
29
views
How do I access the actual messages comprising a kstream sliding window aggregation?
I have a stream of <K,V> messages. When emitting any satisfied sliding windows, I want to know the list that the window matched.
I want to avoid taking on the accumulation job myself within my ....
1
vote
1
answer
121
views
Kafka stream message processing semantics
I am designing a Kafka stream app and want to know few details in order to design my failover strategy. I tried reading Kafka stream doc and existing stackoverflow posts, but was unable to find the ...
2
votes
1
answer
132
views
Kafka Streams default value for session.timeout.ms?
We are using Kafka Streams and Karpenter with normal Deployment in order to manage the pods for a service that we have.
After Karpenter decides to kill the pod, it brings a new Pod up, and we are ...
0
votes
1
answer
32
views
default max.compaction.lag.ms for changelog
I use below:
windowedBy(TimeWindows.of(Duration.ofHours(6)))
.aggregate(aggregator, aggregator,
Materialized.as("my-agg"))
Changelog was created with below configs
cleanup....
0
votes
1
answer
54
views
Kafka Streams KTable-KTable foreign key join emits null even if right side is empty
What is the semantics for a Kafka Streams (3.7.1) KTable-KTable foreign key join, where the extracted foreign key has never matched against the primary key in the right-side ktable?
In this example ...
0
votes
1
answer
29
views
In Helidon SE 4.1.6 , how to send data to a specific partition using kafka producer
I want to use Helidon SE 4.1.6 and producer the data to a specific partition of Apache Kafka using producer.
Detail :
I have gone through the https://helidon.io/docs/latest/se/reactive-messaging#...
1
vote
1
answer
59
views
Is there any side effect of setting repartition.purge.interval.ms config with too high value in order to prevent purge?
We are using Kafka Streams in our application to process events.
In order to join operation we use both repartition and selectKey methods of Kafka Streams and this cause to create an internal ...
1
vote
0
answers
85
views
Error retrieving state store during graceful shutdown
Context and analysis
Our Spring Boot application relies on information stored in Kafka to answer REST requests. It does so by retrieving information from a global state store via the ...
2
votes
1
answer
44
views
Kafka streams partition assigner not using all available clients
We have a Java application which uses Streams library to process data. The streams application does not join data from multiple topics and processes each message received independently. The ...
0
votes
0
answers
22
views
SessionWindows in Kafka
I'm reading a Confluent blog about Windowing in Kafka Strams:
https://www.confluent.io/blog/windowing-in-kafka-streams/
and I found this under the Session Windows:
If your inactivity window is too ...
1
vote
1
answer
76
views
Seeing duplicate records created in a streams app's output
I have a Kafka Streams app that takes input from a Kafka Topic, aggregates it on three fields in the original's value in 5 minute windows. On the output side of this, I need to translate the ...
0
votes
0
answers
65
views
kafka-streams does not add the right kafka-clients dependency
org.apache.kafka:kafka-streams:jar:3.9.0 is suposed to use org.apache.kafka:kafka-clients:jar:3.9.0, but when I run mvn dependency:tree, I get
[INFO] +- org.apache.kafka:kafka-streams:jar:3.9.0:...
1
vote
1
answer
192
views
Multiple topologies in Spring Kafka streams
What's the easiest or best way to create multiple topologies in a Spring Boot Kafka streams application?
Is it possible to use the same default StreamBuilder bean? Or if I need to create a new ...
0
votes
0
answers
71
views
TraceId not printing in kafka-stream application
I have kafka stream application which is running in springboot-2.x, kafka-streams-2.5.1 and spring-cloud-sleuth (log tracing) and I’m using KafkaStreamsTracing for print traceId and spanId. Which is ...
0
votes
1
answer
31
views
App producing 2 messages for only 1 input
I'm trying to debug a problem in our production Kafka Streams app. The (simplified) topology looks something like this
builder.stream("input").groupByKey().reduce(
(agg, val) -> "...