
KStream - KTable Join not triggering


I have two topics (actually more, but I'm keeping it simple here) that I join using the Streams DSL; once joined, the data is published downstream.

I am creating a KTable on top of topic 1 and storing it in a named state store. The key for topic 1 looks like this:

{  sourceCode:"WXYZ",
   platformCode:"ABCD",
   transactionIdentifier:"012345:01:55555:12345000:1"
}

I can see the data in the changelog topic as expected.

There is a KStream on top of topic 2. The key for topic 2 looks like this:

{  sourceCode:"WXYZ",
   platformCode:"ABCD",
   transactionIdentifier:"012345:01:55555:12345000:1",
   lineIdentifier:"1"
}

I re-key and aggregate the data from topic 2 and store the result in another named state store, because there is a one-to-many relationship between records in topic 1 and topic 2. After re-keying, the topic 2 key looks the same as the topic 1 key. I can see both the re-keyed data in the repartition topic and the aggregated data in the changelog topic, as expected. However, the join is never triggered.
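For reference, the topology described above can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the poster's code: the topic names (`topic1`, `topic2`, `joined-output`), the store names, and the pipe-separated String key standing in for the Avro key are all hypothetical, and String serdes replace the Avro serdes to keep it self-contained. The comment on the join states the key constraint: a KTable-KTable join only fires when both sides of a key are processed by the same task, so topic 1 and the internal repartition topic created for topic 2 must be co-partitioned.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

final class HeaderLineJoinTopology {

    // Hypothetical key encoding: "sourceCode|platformCode|transactionIdentifier|lineIdentifier".
    // Dropping the last segment turns a line key into its transaction key.
    static String toTransactionKey(String lineKey) {
        int cut = lineKey.lastIndexOf('|');
        return cut < 0 ? lineKey : lineKey.substring(0, cut);
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Topic 1: one record per transaction, materialized as a table in a named store.
        KTable<String, String> transactions = builder.table(
                "topic1",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("transaction-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));

        // Topic 2: many lines per transaction. selectKey marks the stream for
        // repartitioning; groupByKey then routes it through an internal repartition topic.
        KTable<String, String> linesByTransaction = builder
                .stream("topic2", Consumed.with(Serdes.String(), Serdes.String()))
                .selectKey((lineKey, line) -> toTransactionKey(lineKey))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .aggregate(
                        () -> "",
                        (txKey, line, agg) -> agg.isEmpty() ? line : agg + "," + line,
                        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("lines-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.String()));

        // KTable-KTable join: only fires when both sides of a key land in the same task,
        // i.e. topic1 and the topic2 repartition topic must be co-partitioned.
        transactions
                .join(linesByTransaction, (tx, lines) -> tx + " | lines=[" + lines + "]")
                .toStream()
                .to("joined-output", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```

Printing `describe()` is a quick way to see the repartition boundary: the re-keyed topic 2 data and the topic 1 table only meet in the second sub-topology.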

Other key details:


  1. Data in all topics is in Avro format.
  2. I am using Java/Spring Boot.
  3. I've left commit.interval.ms and cache.max.bytes.buffering at their default settings.
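With the defaults (`commit.interval.ms` = 30000 without exactly-once, `cache.max.bytes.buffering` = 10485760), join and aggregation results can be held in the record cache until the next commit, so output may appear late; that delays results but would not stop a join from firing altogether. A minimal sketch of debugging overrides that rule this out, assuming plain `java.util.Properties` is passed to the Streams configuration (in Spring Boot the same keys can go under `spring.kafka.streams.properties.*`). The class name and the value 100 are illustrative; on Kafka 3.4+ `cache.max.bytes.buffering` is deprecated in favor of `statestore.cache.max.bytes`.

```java
import java.util.Properties;

final class DebugStreamsOverrides {

    // Hypothetical debugging overrides; the keys are standard Kafka Streams config names.
    static Properties debugOverrides() {
        Properties props = new Properties();
        // Commit (and flush the record caches) every 100 ms instead of the default 30 s.
        props.setProperty("commit.interval.ms", "100");
        // Disable the record cache so every join/aggregate update is forwarded immediately
        // instead of being de-duplicated until the next commit.
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        debugOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```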

Any pointers to what I could be doing wrong here?

Edit 1: I looked at the partitions, and it looks like records with the same key ended up on partition 14 in one topic and partition 20 in the other. I also found a similar question.

Edit 2: The producer for topic 1 and topic 2 is a Go application. The Streams restore consumer has the following config:

partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]

The Streams consumer has the following config:

partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]


Solution

  • I am posting the answer below in case it helps others running into the same issue. As pointed out in the comments on the linked question, the problem was caused by the producer application.

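    The producer application is written in Go, and its partitioner hashes keys differently from the Java client, which is what I use to join the data with the Streams DSL. As a result, records with the same key were written to different partitions of topic 1 and of the Streams repartition topic for topic 2 (see Edit 1), so they were processed by different stream tasks and the join never saw both sides.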
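To make the mismatch concrete: the Java client's default partitioner (and Kafka Streams' default for repartition topics) picks `toPositive(murmur2(serializedKey)) % numPartitions`. The sketch below ports that murmur2 hash and compares it with a different hash, 32-bit FNV-1a, used here only as a stand-in for "a Go client that hashes differently" (Sarama's default hash partitioner is FNV-1a based, for example, and librdkafka-based clients default to CRC32; which Go client the producer uses isn't stated). The key string and the partition count of 24 are hypothetical. Note too that the real keys are Avro-serialized, so the bytes being hashed include the schema ID; a Go producer that serializes the key differently could disagree with Java even if the hash function matched.

```java
import java.nio.charset.StandardCharsets;

final class PartitionerComparison {

    // Port of the murmur2 hash used by the Java client's default partitioner.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1 to 3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
                break;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Java client / Kafka Streams: toPositive(murmur2(keyBytes)) % numPartitions.
    static int javaPartition(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // 32-bit FNV-1a: an illustrative stand-in for a differently hashing Go producer.
    static int fnv1a32(byte[] data) {
        int h = 0x811c9dc5;
        for (byte b : data) {
            h ^= (b & 0xff);
            h *= 0x01000193;
        }
        return h;
    }

    static int fnvPartition(byte[] keyBytes, int numPartitions) {
        return (fnv1a32(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Hypothetical key bytes; the real keys are Avro-serialized.
        byte[] key = "WXYZ|ABCD|012345:01:55555:12345000:1".getBytes(StandardCharsets.UTF_8);
        int partitions = 24;
        System.out.println("murmur2 (Java) partition: " + javaPartition(key, partitions));
        System.out.println("FNV-1a partition:         " + fnvPartition(key, partitions));
    }
}
```

Running this over many keys shows the two schemes agree only by coincidence, which is exactly what breaks co-partitioning between a Go-produced topic and a Streams repartition topic.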

    Earlier, I read the KTable directly from the source topic, so it kept the source topic's partitioning:

    @Bean
    public KTable<MyKey, MyValue> myKTable(StreamsBuilder streamsBuilder) {
        return streamsBuilder.table(inputTopic1, Materialized.as(transactionStore));
    }
    

    I rewrote the code as follows to get the desired result:

    @Bean
    public KTable<MyKey, MyValue> myKTable(StreamsBuilder streamsBuilder) {

        SpecificAvroSerde<MyKey> keySpecificAvroSerde = myKeySpecificAvroSerde();
        SpecificAvroSerde<MyValue> valueSpecificAvroSerde = mySpecificAvroSerde();

        // Read topic 1 as a stream and write it back out through the Java producer, so that
        // "dummyTopic" is partitioned with the Java client's key hashing (the same hashing
        // Kafka Streams uses for the topic 2 repartition topic).
        streamsBuilder.stream(inputTopic1, Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde))
                .selectKey((key, value) -> new MyKey(key.get1(), key.get2(), key.get3()))
                .to("dummyTopic", Produced.with(keySpecificAvroSerde, valueSpecificAvroSerde));

        // Build the KTable from the re-partitioned topic, materialized in a named store.
        return streamsBuilder.table("dummyTopic",
                Materialized.<MyKey, MyValue, KeyValueStore<Bytes, byte[]>>as("myStateStore")
                        .withKeySerde(keySpecificAvroSerde)
                        .withValueSerde(valueSpecificAvroSerde));
    }
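On Kafka Streams 2.6 or newer, the manually managed "dummyTopic" could likely be replaced by an internal topic that Streams creates itself, using `KStream#repartition` followed by `KStream#toTable`. A minimal sketch, assuming String serdes in place of the Avro serdes and the hypothetical names `topic1` and `topic1-rekeyed` (the store name `myStateStore` is reused from above). Records are written to the internal topic with Streams' default partitioning, the same scheme as the topic 2 repartition topic; `Repartitioned.withNumberOfPartitions` can pin the partition count if it needs to match the other side explicitly.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.state.KeyValueStore;

final class RepartitionedTableSketch {

    // Re-partition topic 1 through a Streams-managed internal topic, then read it as a table.
    static KTable<String, String> transactionsTable(StreamsBuilder builder) {
        return builder
                .stream("topic1", Consumed.with(Serdes.String(), Serdes.String()))
                // Written with Streams' default (murmur2-based) partitioning, matching
                // the internal repartition topic created for topic 2.
                .repartition(Repartitioned.<String, String>as("topic1-rekeyed")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()))
                // Materialize the co-partitioned stream as a KTable in a named store.
                .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("myStateStore")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        transactionsTable(builder);
        Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}
```

The trade-off versus an explicit topic is that Streams owns the internal topic's name (prefixed with the application id) and lifecycle, so nothing has to be created or cleaned up by hand.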