
I'm using KafkaSink as the sink in my Flink application, and I need to send stringified JSONs to different Kafka topics based on some key-value pairs (for example, some JSONs go to topic1, others go to another topic, topic2, and so on). But I couldn't find any way in the documentation to choose the Kafka topic based on the incoming data stream. Can someone please help me with this?

NOTE: I'm using Flink version 1.14.3

    DataStream<String> data = .....
    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers(parameter.get("bootstrap.servers"))
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(parameter.get("kafka.output.topic"))  // hardwired to a single topic
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build()
            )
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    data.sinkTo(sink);

2 Answers


I haven't tried this, but I believe that rather than using setTopic to hardwire the sink to a specific topic, you can instead implement the serialize method on a custom KafkaRecordSerializationSchema so that each ProducerRecord it returns specifies the topic it should be written to.

Another option would be to create a separate sink object for every topic, and then use a ProcessFunction that fans out to a set of side outputs, each connected to the appropriate sink, as sketched below.
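To illustrate that second option, here's a minimal, untested sketch. The class name TopicRouter, the routing rule (a substring check on the JSON payload), and the topic1Sink / topic2Sink variables are all hypothetical; each sink would be a KafkaSink built with setTopic for its own topic, like in the question.

    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class TopicRouter extends ProcessFunction<String, String> {

        // Records tagged with this side output go to topic2's sink;
        // everything else stays on the main output and goes to topic1's sink.
        public static final OutputTag<String> TOPIC2 = new OutputTag<String>("topic2") {};

        @Override
        public void processElement(String json, Context ctx, Collector<String> out) {
            // Hypothetical routing rule -- replace with your own key-value check.
            if (json.contains("\"target\":\"topic2\"")) {
                ctx.output(TOPIC2, json);
            } else {
                out.collect(json);
            }
        }
    }

Wiring it up, with one sink per topic:

    SingleOutputStreamOperator<String> routed = data.process(new TopicRouter());
    routed.sinkTo(topic1Sink);
    routed.getSideOutput(TopicRouter.TOPIC2).sinkTo(topic2Sink);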


1 Comment

Thanks a lot. Writing a custom KafkaRecordSerializationSchema resolved the issue.

I was able to sink output to multiple Kafka topics by implementing KafkaRecordSerializationSchema with a custom serialize method, as suggested by @DavidAnderson. The code snippet is below.

    import java.io.UnsupportedEncodingException;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.SerializationException;

    public class CustomSchema implements KafkaRecordSerializationSchema<Tuple2<String, String>> {

        private final String encoding = StandardCharsets.UTF_8.name();

        @Override
        public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> input, KafkaSinkContext kafkaSinkContext, Long timestamp) {
            String topic = input.f0; // the first field of the tuple names the target topic
            String data = input.f1;  // the second field carries the payload
            try {
                byte[] value = data == null ? null : data.getBytes(this.encoding);
                return new ProducerRecord<>(topic, value);
            } catch (UnsupportedEncodingException e) {
                throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
            }
        }
    }

And I configured the Kafka sink to use this via the setRecordSerializer method.
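For completeness, the wiring looks roughly like this, assuming the upstream has already been mapped to Tuple2<String, String> pairs of (topic, payload):

    DataStream<Tuple2<String, String>> routedData = .....
    KafkaSink<Tuple2<String, String>> sink = KafkaSink.<Tuple2<String, String>>builder()
            .setBootstrapServers(parameter.get("bootstrap.servers"))
            .setRecordSerializer(new CustomSchema()) // picks the topic per record
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    routedData.sinkTo(sink);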

2 Comments

How about just using .setTopicSelector(input -> input.f0)?
Maybe we can do that too.
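If your connector version provides setTopicSelector on the serialization schema builder, that is indeed more compact than a hand-written schema. A sketch under that assumption (check that the method exists in your Flink/Kafka connector release):

    KafkaSink<Tuple2<String, String>> sink = KafkaSink.<Tuple2<String, String>>builder()
            .setBootstrapServers(parameter.get("bootstrap.servers"))
            .setRecordSerializer(KafkaRecordSerializationSchema.<Tuple2<String, String>>builder()
                    .setTopicSelector(input -> input.f0) // first field names the topic
                    .setValueSerializationSchema(input -> input.f1.getBytes(StandardCharsets.UTF_8))
                    .build())
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();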
