kafka-consumer-api apache-kafka-streams

Kafka Streams: Multiple topologies in a single application


I would like to use both the Processor API and the DSL in a single Kafka Streams application. In addition, how can I build and run multiple topologies in a single application (for example, one using the Processor API and another using the DSL)?


Solution

  • You can easily mix the DSL and the Processor API. As I understand it, you would like to build a single processing graph using both approaches. For the DSL part, call StreamsBuilder::stream; for the Processor API part, call StreamsBuilder::build() to get the Topology and then call its methods to add sources, processors, sinks, etc.

    The source code would look something like this:

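    // DSL part: copy records from "input1" to "output1".
    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("input1").to("output1");

    // Processor API part: extend the Topology produced by the DSL.
    // InputProcessor is your own Processor implementation.
    Topology topology = builder.build();
    topology.addSource("inputNode", "input2");
    topology.addProcessor("processor1", InputProcessor::new, "inputNode");
    topology.addSink("sink1", "output2", "processor1");

    // props is a java.util.Properties with at least application.id and bootstrap.servers set.
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();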
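
The snippet above refers to an InputProcessor class that the answer does not show. The following is a minimal sketch of what such a class might look like, assuming the typed Processor API in org.apache.kafka.streams.processor.api (so kafka-streams 2.7 or later on the classpath). The upper-casing logic is purely illustrative and is not taken from the original answer.

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical processor: the name matches the snippet above, the body is an assumption.
class InputProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        // Keep the context so process() can forward records downstream.
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        String value = record.value();
        // Upper-case non-null values; null values pass through unchanged.
        String transformed = (value == null) ? null : value.toUpperCase();
        // Forward to all child nodes (here that would be the "sink1" node).
        context.forward(record.withValue(transformed));
    }
}
```

With a class of this shape, InputProcessor::new can be passed to Topology::addProcessor as a ProcessorSupplier, and topology.describe() can be printed to check how the DSL part and the Processor API part were wired together.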
    

    EDIT1:

    You can also build two topologies with the DSL that run in parallel and listen to different topics. As @Matthias J. Sax mentioned, Processor API code can be plugged into the DSL with KStream::transform(...), KStream::transformValues(...), and KStream::process(...). The code would look something like this:

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> input1 = builder.<String, String>stream("input1").transform(SampleTransformer1::new);
    KStream<String, String> input2 = builder.<String, String>stream("input2").transform(SampleTransformer2::new);