
Apache Beam KafkaIO - Write to Multiple Topics

Currently, I'm working on an Apache Beam pipeline that consumes data from three different Kafka topics. After some processing, I create three types of objects from the data read from those topics. Finally, I need to publish those three objects to three different Kafka topics. It is possible to read from multiple topics using KafkaIO's withTopics method, but I did not find a KafkaIO feature for writing to multiple topics.

I would appreciate advice on the best way to do this, ideally with some code examples.


  • You can do this by applying three separate KafkaIO sinks to a PCollection, for example:

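        // Requires: KafkaIO, TestPipeline, Create, PCollection (Beam), StringSerializer (Kafka), java.util.Arrays, org.junit.Rule/Test
        @Rule public final transient TestPipeline pipeline = TestPipeline.create();

        @Test
        public void kafkaIOSinksTest() {
            PCollection<String> inputCollection = pipeline.apply(Create.of(Arrays.asList("Object 1", "Object 2")));
            // "localhost:9092" and "topic1".."topic3" are hypothetical placeholders; use your broker and topics.
            // values() adapts the KV-based sink so it accepts a PCollection of values only (no keys).
            inputCollection.apply("WriteTopic1", KafkaIO.<Void, String>write().withBootstrapServers("localhost:9092")
                    .withTopic("topic1").withValueSerializer(StringSerializer.class).values());
            inputCollection.apply("WriteTopic2", KafkaIO.<Void, String>write().withBootstrapServers("localhost:9092")
                    .withTopic("topic2").withValueSerializer(StringSerializer.class).values());
            inputCollection.apply("WriteTopic3", KafkaIO.<Void, String>write().withBootstrapServers("localhost:9092")
                    .withTopic("topic3").withValueSerializer(StringSerializer.class).values());
            pipeline.run().waitUntilFinish();
        }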

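    In this example, the same PCollection is written to three different topics through three separate sinks. To publish three different object types, apply each sink to the PCollection that holds the corresponding type instead.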
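The answer writes the same PCollection to all three topics, whereas in the question each topic receives one object type, so the pipeline must pick a destination per object. The plain-Java sketch here (no Beam dependency) illustrates only that routing decision. The classes TypeA, TypeB, TypeC and the topic names topic-a, topic-b, topic-c are hypothetical placeholders, not part of the original code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopicRouting {
    // Hypothetical stand-ins for the three object types the pipeline builds.
    static final class TypeA { final String id; TypeA(String id) { this.id = id; } }
    static final class TypeB { final String id; TypeB(String id) { this.id = id; } }
    static final class TypeC { final String id; TypeC(String id) { this.id = id; } }

    // Hypothetical topic names: one output topic per object type.
    static String topicFor(Object output) {
        if (output instanceof TypeA) return "topic-a";
        if (output instanceof TypeB) return "topic-b";
        if (output instanceof TypeC) return "topic-c";
        throw new IllegalArgumentException("No topic for " + output.getClass().getName());
    }

    // Groups outputs by destination topic, keeping the order in which topics first appear.
    // In Beam, each group would correspond to one PCollection with its own KafkaIO sink.
    static Map<String, List<Object>> route(List<?> outputs) {
        Map<String, List<Object>> byTopic = new LinkedHashMap<>();
        for (Object output : outputs) {
            byTopic.computeIfAbsent(topicFor(output), topic -> new ArrayList<>()).add(output);
        }
        return byTopic;
    }

    public static void main(String[] args) {
        List<Object> outputs = List.of(new TypeA("a1"), new TypeB("b1"), new TypeA("a2"), new TypeC("c1"));
        // Prints one line per topic with the number of objects routed to it.
        route(outputs).forEach((topic, items) -> System.out.println(topic + " -> " + items.size()));
    }
}
```

Running main prints "topic-a -> 2", "topic-b -> 1" and "topic-c -> 1". In a Beam pipeline, the same decision could drive a `Partition.of(3, ...)` split, with one `KafkaIO.write().withTopic(...)` sink per resulting PCollection. Depending on your Beam version, KafkaIO may also provide `writeRecords()`, which accepts `ProducerRecord`s that carry their own topic; check the KafkaIO Javadoc for your version before relying on it.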