Search code examples
apache-kafkaapache-kafka-streams

Kafka streams topic merge message consumption order


I'm building a kafka streams application where several topics are merged into one.

The application seems to want to read all messages from the first topic, and only then proceed reading from the next one. I would like my application to switch reading from another topic after consuming 100 records.

Is it possible with kafka streams?

Edit: streams are merged using KStream::merge and are statefully transformed:

kstream1.merge(kstream2)
    .merge(kstream3)
    ...
    .merge(kstreamN)
    .transform(transformer, stateStore)
    .to(output)

The application is single-threaded and all topics have one partition.


Solution

  • Not sure what program you wrote, but the simplest way would be stream1.merge(stream2). For this case, data should be processed interleaved between both base on timestamp order.

    Using a custom timestamp extractor, you could maybe interleave between both in 100 records increments.