KafkaStreams multiple streams in same application


I'm trying to make a practical design decision based on convention and plausibility with KafkaStreams.

Let's say that I have two different events that I want to place into KTables. I have a producer sending these messages to a topic, and a KStream that is listening to that topic.

From what I can tell I cannot use conditional forwarding for messages using KafkaStreams, so if the stream is subscribed to many topics (one for each of the above messages, for example) I can only call stream.to on a single sink topic. Otherwise, I would have to do something like call foreach on the stream and send messages to the sink topics with a KafkaProducer.

The above suggests using a single stream. I thought I could set up multiple streams in the same app, each listening to a topic, mapping and forwarding to a table sink, but every time I try to create two instances of KafkaStreams, only the first one initialized subscribes to its topic - the other gets a warning from the client that its topic has no subscriptions.

Can I set up multiple streams in the same app? If so, are there any special requirements?

    class Stream(topic: String) {
      val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
      val streamsBuilder = new StreamsBuilder
      val topics = new util.ArrayList[String]
      topics.add(props.get("topic"))

      val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))

      def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
        builder.stream[String, String](
          topics,
          Consumed.`with`(String(), String())
        )
      }

      def init(): KafkaStreams = {
        val streams = new KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)

        streams.start()

        streams
      }
    }

    class Streams() {

      val eventStream = new Stream("first_event") //looking good!
      val eventStream2 = new Stream("second_event") // no subscribers
      // if I switch the order of these, eventStream2 is subscribed to and eventStream is dead in the water
      val streams: KafkaStreams = eventStream.init()
      val streams2: KafkaStreams = eventStream2.init()

    }

stream config

    val streamConfig: Properties = {
        val properties = new Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)
        properties
    }

I'd also welcome any suggested alternatives.


Solution

  • From what I can tell I cannot use conditional forwarding for messages

    Do you know about KStream#split() (KStream#branch() in older versions)? It is basically the same as conditional forwarding.
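
    As a rough illustration of that idea, here is a minimal sketch assuming Kafka Streams 2.8 or newer (where split() and Branched are available) and the kafka-streams dependency on the classpath. The class name, the source topic "events", and the "first:" value prefix are hypothetical; the sink topic names are borrowed from the example further down.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;

public class ConditionalForwarding {
    // Builds a topology that routes each record to a sink topic chosen by its value.
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical source topic; default serdes are assumed to be String serdes.
        KStream<String, String> events = builder.stream("events");

        events.split()
              // Records whose value starts with "first:" (a made-up convention) go to one sink.
              .branch((key, value) -> value != null && value.startsWith("first:"),
                      Branched.withConsumer(ks -> ks.to("table1-topic")))
              // Every other record goes to the second sink.
              .defaultBranch(Branched.withConsumer(ks -> ks.to("table2-topic")));

        return builder.build();
    }
}
```

    The returned Topology would be passed to a single KafkaStreams instance, so one application can feed both sink topics.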

    I thought I could set up multiple streams in the same app, each listening to a topic, mapping and forwarding to a table sink,

    This should work as follows:

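    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    // One builder (and therefore one topology) can read from several topics.
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream1 = builder.stream("topic1");
    KStream<String, String> stream2 = builder.stream("topic2");

    // Each stream writes to its own sink topic.
    stream1.to("table1-topic");
    stream2.to("table2-topic");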
    

    but every time I try to create two instances of KafkaStreams, only the first one initialized subscribes to its topic - the other gets a warning from the client that its topic has no subscriptions.

    Not sure. This should work. Maybe you can share your code?
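
    One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: the config in the question gives both KafkaStreams instances the same application.id. Instances that share an application.id join the same consumer group and are expected to run the same topology, so two different topologies under one id can interfere with each other's subscriptions. Below is a minimal sketch of building a separate config per topology. The class name, the helper, the id strings, and localhost:9092 are all hypothetical; the plain string keys are the values behind StreamsConfig.APPLICATION_ID_CONFIG and StreamsConfig.BOOTSTRAP_SERVERS_CONFIG.

```java
import java.util.Properties;

public class StreamConfigs {
    // Builds a Kafka Streams config with its own application.id.
    // "application.id" and "bootstrap.servers" are the string values of
    // StreamsConfig.APPLICATION_ID_CONFIG and StreamsConfig.BOOTSTRAP_SERVERS_CONFIG.
    static Properties streamConfig(String applicationId, String brokerHost) {
        Properties properties = new Properties();
        properties.put("application.id", applicationId);
        properties.put("bootstrap.servers", brokerHost);
        return properties;
    }

    public static void main(String[] args) {
        // Hypothetical ids and broker address: one config per distinct topology.
        Properties first = streamConfig("stream-application-first-event", "localhost:9092");
        Properties second = streamConfig("stream-application-second-event", "localhost:9092");
        System.out.println(first.getProperty("application.id"));
        System.out.println(second.getProperty("application.id"));
    }
}
```

    Each Properties object would then be passed to its own KafkaStreams constructor. If both streams are instead defined on a single StreamsBuilder, as in the example above, one application.id and one KafkaStreams instance are enough.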