apache-kafka, apache-kafka-streams

metadataForKey in Kafka Streams returns wrong information when multiple instances of the application are connected to the same group


I'm implementing a mechanism that serves metadata either by querying the local state store or, when the key is hosted elsewhere, by querying a remote Kafka Streams instance.

I'm using Scala and the kafka-streams-scala library, version 2.4.1.
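In essence, the routing logic looks like the sketch below. This is a minimal sketch, assuming the store name from this question; lookup, thisHost and fetchRemote (a hypothetical HTTP helper for calling the other instance) are names of mine, not part of any API.

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.{HostInfo, QueryableStoreTypes, StreamsMetadata}

val TEST_REQUEST_STORE = "test-request-store"

// Hypothetical HTTP call to the instance that actually hosts the key.
def fetchRemote(host: HostInfo, key: String): Option[String] = ???

def lookup(streams: KafkaStreams, thisHost: HostInfo, key: String): Option[String] = {
  // Ask Streams which instance hosts this key (computed from the serialized key).
  val meta: StreamsMetadata =
    streams.metadataForKey(TEST_REQUEST_STORE, key, Serdes.String().serializer())
  if (meta.hostInfo() == thisHost) {
    // The key is supposed to live here: read it from the local store.
    val store = streams.store(TEST_REQUEST_STORE, QueryableStoreTypes.keyValueStore[String, String])
    Option(store.get(key))
  } else {
    // The key lives on another instance: forward the request to it.
    fetchRemote(meta.hostInfo(), key)
  }
}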

I'll try to give a small, simple example of what I'm doing.

  1. I run a Kafka cluster with one test topic that has 2 partitions.
  2. I also run one Kafka Streams instance, which implements the local/remote metadata lookup mechanism mentioned above; as long as no other instances are connected to the same group, it holds all the partitions.
  3. I push some records into the test topic (the producer setup is sketched right after the snippet):
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "1", "01"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "2", "02"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "3", "03"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "4", "04"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "5", "15"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "6", "16"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "7", "17"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "8", "18"));
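For reference, this producer is a plain Java producer created with StringSerializer (this detail matters later). A minimal sketch of the equivalent setup, written in Scala here for consistency; the broker address is an assumption:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
// StringSerializer for both keys and values, matching the snippet above.
val kafkaProducer =
  new KafkaProducer[String, String](producerProps, new StringSerializer(), new StringSerializer())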
  4. I run a second Kafka Streams instance that connects to the same group. I see the rebalancing and partition-reassignment process, and as I understand it the two applications should then share the partitions between them: for example, Kafka Streams application 1 works with partition 0 and Kafka Streams application 2 works with partition 1, or vice versa. (Each instance advertises its endpoint via application.server; see the sketch below.)
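For metadataForKey to report a host and port at all, each instance has to set application.server. A minimal sketch of the Streams setup I assume here, with ports matching the output shown further below; the broker address and the topology wiring are assumptions:

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}

val topology: Topology = ??? // builds test-request-store from test-topic (not shown)

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app")           // both instances use the same id (group)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // assumed broker address
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8898") // "myhostname:18898" on the second instance
val streams = new KafkaStreams(topology, props)
streams.start()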

Next, to verify that Kafka Streams behaves the way I described in step 4, I run the following code:

val it: KeyValueIterator[String, String] =
  streams.store(TEST_REQUEST_STORE, QueryableStoreTypes.keyValueStore[String, String]).all()

while (it.hasNext) {
  val keyValue: KeyValue[String, String] = it.next()
  println(keyValue)
}
it.close() // the iterator must be closed to release the underlying store resources

Very cool, I see what I expect: the Kafka Streams instance I run on localhost holds partition 1 after rebalancing and partition reassignment.

KeyValue(5, 15)
KeyValue(6, 16)
KeyValue(7, 17)
KeyValue(8, 18)

But when I run this small piece of code, I see output that is completely unexpected from my point of view.

val stringSerializer = Serdes.String.serializer() // from org.apache.kafka.streams.scala.Serdes
println(streams.metadataForKey(TEST_REQUEST_STORE, "1", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "2", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "3", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "4", stringSerializer))
println()
println(streams.metadataForKey(TEST_REQUEST_STORE, "5", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "6", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "7", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "8", stringSerializer))
println()

This prints:
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

As I understand it, I should expect something like this:

StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}

StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

Solution

  • First of all, I want to point out that metadataForKey returns metadata even if the store contains no records for the key, and in that case the host it reports for the key appears to be arbitrary.

    I realized that the problem was not related to the version at all, but to the serializers.

    I pushed records into the topic from Java using StringSerializer, and from Scala I tried to query the metadata using Serdes.String.serializer(); this gave me seemingly random results that did not match reality.

    I then built another way of pushing data into the topic from Scala, using a GenericPrimitiveSerde[String] key serializer and passing the same serializer to metadataForKey, and to my surprise this time it worked as expected.

    So, for those who use metadataForKey: pay attention to the key serializer, and make sure the serializer you pass matches the one used to produce the records. metadataForKey serializes the key and runs it through the default partitioner to decide which instance hosts it, so its answer can only match reality if the records were produced with the same key bytes and default partitioning.
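    A minimal sketch of that takeaway, using kafka-streams-scala's Serdes.String on both sides; the exact serde matters less than using the same one for producing and for metadataForKey (the broker address is an assumption; streams and TEST_REQUEST_STORE are as above):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.streams.scala.Serdes

val keySerde = Serdes.String // one serde, shared by the producer and the metadata lookup

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
val producer =
  new KafkaProducer[String, String](props, keySerde.serializer(), Serdes.String.serializer())

// No explicit partition: let the default partitioner place the record from the
// serialized key, which is exactly what metadataForKey computes on the query side.
producer.send(new ProducerRecord("test-topic", "5", "15"))

// Query with the very same serializer:
println(streams.metadataForKey(TEST_REQUEST_STORE, "5", keySerde.serializer()))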