I am reading data from a Kafka topic using Kafka Streams and more specifically GlobalKTable to populate my store. In the case of corrupt data that can not successfully be parsed I wish to throw a message to a DLQ topic. When using a KStream I would traditionally use branching and produce DLQ messages, but after reading documentation for GlobalKTables it does not seem like it's possible out of the box to throw away faulty messages. I could ignore them and log an exception, but how would I read from my Kafka topic using GlobalKTables and produce to a Kafka DLQ topic?
Here is a sample of how I would set up my consumer to populate my store
var myStoreBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("MY_STORE_NAME"),
Serdes.String(),
Serdes.String()
);
myStreamsBuilder().addGlobalStore(
myStoreBuilder,
"my_topic",
Consumed.with(Serdes.String(), Serdes.String()),
new DataGlobalProcessorSupplier()
);
Any tips and tricks are appreciated.