I would like to use both the Processor API and the DSL in a single Kafka Streams application. In addition, how can I build and run multiple topologies in a single application (for example, one using the Processor API and the other using the DSL)?
You can easily mix the DSL and the Processor API.
As I understand it, you would like to build a processing graph using both methods. For the DSL part, call StreamsBuilder::stream; for the Processor API part, call StreamsBuilder::build() to get the Topology and then use its methods to add sources, processors, sinks, etc.
The source code would look something like this:
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input1").to("output1");
Topology topology = builder.build();
topology.addSource("inputNode","input2");
topology.addProcessor("processor1", InputProcessor::new, "inputNode");
topology.addSink("sink1", "output2", "processor1");
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
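For completeness, here is a fuller sketch of the same idea with the pieces the snippet above leaves out. It assumes Kafka Streams 2.7 or later (the typed `org.apache.kafka.streams.processor.api` package). The body of `InputProcessor`, the application id, the broker address, and the String serdes are my assumptions for illustration, not something taken from the question.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

final class MixedTopologyExample {

    // Hypothetical processor: forwards every record unchanged.
    static class InputProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            context.forward(record);
        }
    }

    static Topology buildTopology() {
        // DSL part: input1 -> output1
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input1").to("output1");

        // Processor API part, added to the same Topology: input2 -> processor1 -> output2
        Topology topology = builder.build();
        ProcessorSupplier<String, String, String, String> supplier = InputProcessor::new;
        topology.addSource("inputNode", "input2");
        topology.addProcessor("processor1", supplier, "inputNode");
        topology.addSink("sink1", "output2", "processor1");
        return topology;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mixed-topology-example"); // assumed id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // assumed broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KafkaStreams streams = new KafkaStreams(buildTopology(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

Printing `buildTopology().describe()` is a quick way to check the wiring without a running broker. Since the DSL branch and the Processor API branch share no topics or nodes, they should show up as two separate sub-topologies.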
EDIT1:
You can also build two topologies with the DSL that run in parallel and listen to different topics. As @Matthias J. Sax mentioned, Processor API logic can be embedded in the DSL with KStream::transform(...), KStream::transformValues(...), and KStream::process(...). The code would look something like this:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input1 = builder.<String, String>stream("input1").transform(SampleTransformer1::new);
KStream<String, String> input2 = builder.<String, String>stream("input2").transform(SampleTransformer2::new);
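As far as I know, newer releases deprecate `transform(...)` in favour of `process(...)`, which returns a `KStream` there. Below is a rough equivalent of the snippet above, assuming Kafka Streams 3.3 or later. `UpperCaseProcessor` and the output topic names are hypothetical stand-ins for `SampleTransformer1` / `SampleTransformer2` and whatever you do downstream.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

final class DslWithProcessExample {

    // Hypothetical processor: upper-cases each value and forwards the record.
    static class UpperCaseProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            String value = record.value();
            context.forward(record.withValue(value == null ? null : value.toUpperCase()));
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        ProcessorSupplier<String, String, String, String> supplier = UpperCaseProcessor::new;

        // Two independent DSL pipelines, each with a Processor API step in the middle.
        KStream<String, String> input1 = builder.<String, String>stream("input1").process(supplier);
        input1.to("output1"); // assumed output topic

        KStream<String, String> input2 = builder.<String, String>stream("input2").process(supplier);
        input2.to("output2"); // assumed output topic

        return builder.build();
    }
}
```

The supplier hands out a fresh processor instance on every `get()` call, so reusing the same supplier for both streams should be fine here.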