In Kafka Stream API, is it possible to forward more than one record at once to different child processors ? For an example, let say we have a parent processor called Processor-Parent and two child processors, Child-1, Child-2.
When Processor-Parent receives a record to process, I would like to do the following.
new_record = create_new_record(current_record)
context.forward(new_record, To(Child-1))
context.forward(old_record, To(Child-2))
Is this a good practice to forward records like this ?
It depends on your requirenments:
If your logic is straight forward you can even use Kafka Streams DSL.
If it is a little more complex and you need Procesor API, but you want pass same records to two Processors you can do it like @Sameer Killamsetty mentioned.
builder = new TopologyBuilder();
builder.addSource(SOURCE, kafkaTopic)
.addProcessor("child1", () -> new child1(), SOURCE)
.addProcessor("child2", () -> new child2(), SOURCE);
builder = new TopologyBuilder();
builder.addSource(SOURCE, kafkaTopic)
.addProcessor("InputProcessor", () -> new InputProcessor(), SOURCE)
.addProcessor("child1", () -> new child1(), "InputProcessor")
.addProcessor("child2", () -> new child2(), "InputProcessor");
public class InputProcessor extends AbstractProcessor<String, String> {
@Override
public void process(String key, String value) {
try {
context().forward(key, Integer.parseInt(value), To.child("child1"));
context().forward(key, value, To.child("child2"));
}
catch (NumberFormatException nfe) {
context().forward(key, value, To.child("child2"));
}
}
}