apache-kafka scalatest apache-kafka-streams embedded-kafka

Kafka Streams: mix-and-match PAPI and DSL KTable not co-partitioning


I have a mix-and-match Scala topology where the main worker is a PAPI processor, and other parts are connected through DSL.

EventsProcessor: 
INPUT: eventsTopic
OUTPUT: visitorsTopic (and others)

Data in all topics (including the original eventsTopic) is partitioned by a key, let's call it DoubleKey, that has two fields. Visitors are sent to visitorsTopic through a Sink:

.addSink(VISITOR_SINK_NAME, visitorTopicName,
    DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)

In the DSL, I create a KV KTable over this topic:

val visitorTable = builder.table(
  visitorTopicName,
  Consumed.`with`(DoubleKey.getKafkaSerde(), Visitor.getKafkaSerde()),
  Materialized.as(visitorStoreName))

which I later connect to the EventsProcessor:

topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)

Everything is co-partitioned (via DoubleKey). visitorSinkPartitioner performs a typical modulo operation:

Math.abs(partitionKey.hashCode % numPartitions)
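As a side note on that expression, the order of operations matters. The sketch below is plain Scala with hypothetical helper names (they are not part of the topology above). It compares taking the remainder first, as in the line above, with taking the absolute value first, which can return a negative partition for Int.MinValue.

```scala
object ModuloPartition {
  // Same shape as the expression in the question: remainder first, then abs.
  // For numPartitions > 0 the remainder lies strictly between -numPartitions
  // and numPartitions, so its absolute value is a valid partition index.
  def absOfRemainder(hash: Int, numPartitions: Int): Int =
    Math.abs(hash % numPartitions)

  // The tempting alternative: abs first, then the remainder.
  // Math.abs(Int.MinValue) is still Int.MinValue, so this can go negative.
  def remainderOfAbs(hash: Int, numPartitions: Int): Int =
    Math.abs(hash) % numPartitions
}
```

For example, ModuloPartition.absOfRemainder(Int.MinValue, 3) is 2, while ModuloPartition.remainderOfAbs(Int.MinValue, 3) is -2.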

In the PAPI processor EventsProcessor, I query this table to see whether a Visitor already exists.

However, my tests (using EmbeddedKafka, though that should not make a difference) behave differently depending on the partition count. With one partition, all is fine: the EventsProcessor checks the KTable for two events with the same DoubleKey, and on the second event (after some delay) it sees the existing Visitor in the store. With more than one partition, the EventsProcessor never sees the value in the store.

However, if I check the store via the API (iterating store.all()), the record is there. So I assume it must be going to a different partition.

Since the KTable should work on the data in its own partition, and everything is sent to the same partition (using explicit partitioners that call the same code), the KTable should receive that data on the same partition.

Are my assumptions correct? What could be happening?

Kafka Streams 1.0.0, Scala 2.12.4.

PS. Of course it would work if I created the store through the PAPI instead of StreamsBuilder.table() and did the puts from the PAPI processor, since that would definitely use the same partition where the code runs, but that's out of the question.


Solution

  • Yes, the assumptions were correct.

    In case it helps anyone:

    I had a problem when passing the Partitioner to the Scala EmbeddedKafka library. In one of the test suites it was not done correctly. Now, following the ever-healthy practice of refactoring, I use this method in all the suites for this topology:

    def getEmbeddedKafkaTestConfig(zkPort: Int, kafkaPort: Int): EmbeddedKafkaConfig = {
      // Make the test producer use the DoubleKey partitioner
      val producerProperties = Map(
        ProducerConfig.PARTITIONER_CLASS_CONFIG -> classOf[DoubleKeyPartitioner].getCanonicalName)
      EmbeddedKafkaConfig(
        kafkaPort = kafkaPort,
        zooKeeperPort = zkPort,
        customProducerProperties = producerProperties)
    }
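
For anyone wondering why the single-partition run passed while the multi-partition run failed: with one partition, every partitioning rule returns 0, so a producer that silently uses a different rule still lands on the right partition. The sketch below is plain Scala with no Kafka dependency. The DoubleKey case class, its field names, and fallbackPartition are hypothetical stand-ins. In particular, fallbackPartition is not Kafka's default partitioner (which hashes the serialized key bytes with murmur2), only a simplified hash that differs from the custom one.

```scala
// Hypothetical stand-in for the DoubleKey in the question; field names are assumptions.
final case class DoubleKey(first: String, second: String)

object PartitionSketch {
  // The modulo rule quoted in the question.
  def customPartition(key: DoubleKey, numPartitions: Int): Int =
    Math.abs(key.hashCode % numPartitions)

  // Stand-in for a producer that was NOT given the custom partitioner.
  // Simplified byte hash for illustration only; not Kafka's murmur2.
  def fallbackPartition(key: DoubleKey, numPartitions: Int): Int = {
    val bytes = s"${key.first}|${key.second}".getBytes("UTF-8")
    (java.util.Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions
  }

  // Keys for which the two rules pick different partitions.
  def mismatches(keys: Seq[DoubleKey], numPartitions: Int): Seq[DoubleKey] =
    keys.filter { k =>
      customPartition(k, numPartitions) != fallbackPartition(k, numPartitions)
    }
}
```

Calling PartitionSketch.mismatches(keys, 1) always returns an empty sequence, whatever the keys are. With a higher partition count the two rules can disagree on some keys, and those records end up in a store partition that the processor handling the original event never reads.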