Search code examples
javaapache-kafkaapache-kafka-streamsstream-processing

How to forward records to multiple Kafka Stream child Processors at once?


In Kafka Stream API, is it possible to forward more than one record at once to different child processors ? For an example, let say we have a parent processor called Processor-Parent and two child processors, Child-1, Child-2.

When Processor-Parent receives a record to process, I would like to do the following.

new_record = create_new_record(current_record)
context.forward(new_record, To(Child-1))
context.forward(old_record, To(Child-2))

Is this a good practice to forward records like this ?


Solution

  • It depends on your requirenments:

    • If your logic is straight forward you can even use Kafka Streams DSL.

    • If it is a little more complex and you need Procesor API, but you want pass same records to two Processors you can do it like @Sameer Killamsetty mentioned.

    builder = new TopologyBuilder();
        builder.addSource(SOURCE, kafkaTopic)
    .addProcessor("child1", () -> new child1(), SOURCE)
    .addProcessor("child2", () -> new child2(), SOURCE);
    
    • If it is more complex and depends on some logic in Processor you want to pass message to different Processor node you can do that.
    builder = new TopologyBuilder();
        builder.addSource(SOURCE, kafkaTopic)
    .addProcessor("InputProcessor", () -> new InputProcessor(), SOURCE)
    .addProcessor("child1", () -> new child1(), "InputProcessor")
    .addProcessor("child2", () -> new child2(), "InputProcessor");
    
    public class InputProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            try {
                context().forward(key, Integer.parseInt(value), To.child("child1"));
                context().forward(key, value, To.child("child2"));
            }
            catch (NumberFormatException nfe) {
                context().forward(key, value, To.child("child2"));
            }
        }
    }