I am reading data from a Kafka topic with Kafka Streams, specifically using a GlobalKTable to populate my store. When a record is corrupt and cannot be parsed, I want to send it to a DLQ (dead-letter queue) topic. With a KStream I would normally branch the stream and produce the bad records to the DLQ, but from the GlobalKTable documentation it does not seem possible to route faulty messages elsewhere out of the box. I could log an exception and skip them, but how can I read from my Kafka topic with a GlobalKTable and still produce to a Kafka DLQ topic?

Here is a sample of how I set up my consumer to populate my store:

var myStoreBuilder = Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("MY_STORE_NAME"),
        Serdes.String(),
        Serdes.String()
);

myStreamsBuilder().addGlobalStore(
        myStoreBuilder,
        "my_topic",
        Consumed.with(Serdes.String(), Serdes.String()),
        new DataGlobalProcessorSupplier()
);
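
For illustration only, the per-record behaviour being asked for can be sketched without any Kafka dependency: try to parse the value, put it in the store on success, and hand the raw record to a DLQ sink on failure. Everything here is an assumption made for the sketch, not part of the Kafka Streams API and not what `DataGlobalProcessorSupplier` actually does: the `DlqRoutingSketch` class, the `parse` rule (integers only), the `Map` standing in for the store, and the `BiConsumer` standing in for whatever would write to the DLQ topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Kafka-free sketch: every name below is hypothetical.
class DlqRoutingSketch {

    // Stand-in for the global store (a key-value store of String to String in the real app).
    final Map<String, String> store = new HashMap<>();

    // Stand-in for whatever writes to the DLQ topic, e.g. a wrapper around a producer.
    final BiConsumer<String, String> dlqSink;

    DlqRoutingSketch(BiConsumer<String, String> dlqSink) {
        this.dlqSink = dlqSink;
    }

    // Hypothetical validation rule: a value is parseable only if it is an integer.
    static String parse(String rawValue) {
        return String.valueOf(Integer.parseInt(rawValue.trim()));
    }

    // Per-record logic: store parseable records, divert everything else to the DLQ sink.
    void process(String key, String rawValue) {
        try {
            store.put(key, parse(rawValue));
        } catch (RuntimeException e) {
            // The unparsed record is passed on unchanged so nothing is lost.
            dlqSink.accept(key, rawValue);
        }
    }

    public static void main(String[] args) {
        List<String> dlq = new ArrayList<>();
        DlqRoutingSketch sketch = new DlqRoutingSketch((k, v) -> dlq.add(k + "=" + v));
        sketch.process("a", "42");
        sketch.process("b", "not-a-number");
        System.out.println("store=" + sketch.store + " dlq=" + dlq);
    }
}
```

The open question is what should play the role of `dlqSink` when this logic runs inside the processor passed to `addGlobalStore`.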

Any tips and tricks are appreciated.
