
Using kafka-streams with custom partitioner


I want to join a KStream with a KTable. They have different keys, but both topics are co-partitioned using a custom partitioner. However, the join does not produce any results.

The KStream has the following structure:
- key: House - Group
- value: User

The KTable has the following structure:
- key: User - Group
- value: Address

To make sure that records in both topics are processed in insertion order, I'm using a custom Partitioner that partitions both topics on the Group part of each key.
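The group-based partitioning described above could look roughly like the following. In Kafka this logic would live in the partition(...) method of a class implementing org.apache.kafka.clients.producer.Partitioner; this sketch keeps only the pure partition calculation so it runs without a broker. The key classes, function names and the choice of String.hashCode() are assumptions for illustration, not the asker's actual code.

```kotlin
// Hypothetical key types mirroring the question's HouseGroup and UserGroup keys.
data class HouseGroup(val house: String, val group: String)
data class UserGroup(val user: String, val group: String)

// Extract the group from either key type; the rest of the key is ignored.
fun groupOf(key: Any): String = when (key) {
    is HouseGroup -> key.group
    is UserGroup -> key.group
    else -> throw IllegalArgumentException("Unsupported key type: ${key.javaClass.name}")
}

// Map a key to a partition in [0, numPartitions) using only its group.
// Clearing the sign bit keeps the result non-negative even if hashCode() is negative.
fun partitionForKey(key: Any, numPartitions: Int): Int {
    require(numPartitions > 0) { "numPartitions must be positive" }
    return (groupOf(key).hashCode() and 0x7fffffff) % numPartitions
}
```

This only lines up the two topics if both have the same number of partitions: with 6 partitions in one topic and 8 in the other, the same group can map to different partition numbers.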

I want to end up with a stream of the following structure:
- key: House - Group
- value: User - Address

For this I'm doing the following:

val streamsBuilder = streamBuilderHolder.streamsBuilder
val houseToUser = streamsBuilder.stream<HouseGroup, User>("houseToUser")
val userToAddress = streamsBuilder.table<UserGroup, Address>("userToAddress")
val result: KStream<HouseGroup, UserWithAddress> = houseToUser
        .map { k: HouseGroup, v: User ->
            val newKey = UserGroup(v, k.group)
            val newVal = UserHouse(v, k.house)
            KeyValue(newKey, newVal)
        }
        .join(userToAddress) { v1: UserHouse, v2: Address ->
            UserHouseWithAddress(v1, v2)
        }
        .map { k: UserGroup, v: UserHouseWithAddress ->
            val newKey = HouseGroup(v.house, k.group)
            val newVal = UserWithAddress(k.user, v.address)
            KeyValue(newKey, newVal)
        }

I expected the join to find matches, but it produced no output.

I guess the obvious solution is to join with a GlobalKTable and drop the custom partitioner. However, I still don't understand why the approach above does not work.


Solution

  • I think the missing matches are caused by the use of different partitioners.

    Your input topics are written with your CustomPartitioner, whereas Kafka Streams by default uses org.apache.kafka.clients.producer.internals.DefaultPartitioner.

    In your code you call KStream::map just before KStream::join. Because map can change the key, Kafka Streams repartitions the stream before the join: the mapped records are written back to Kafka, to an internal topic named $AppName-KSTREAM-MAP-000000000X-repartition. To assign those records to partitions, Kafka Streams uses the configured partitioner (property ProducerConfig.PARTITIONER_CLASS_CONFIG), which is the default one unless you override it. In summary, records with the same key can land in one partition of the repartition topic and in a differently numbered partition of the KTable's topic, so the task that performs the join for that partition never sees both sides of the match.

    In your case, the solution is to set your custom partitioner in the properties of your Kafka Streams application: props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.CustomPartitioner").

    For debugging, you can inspect the repartition topic ($AppName-KSTREAM-MAP-000000000X-repartition): records whose keys match those in the input topic may sit in a partition with a different number.

    See the Kafka Streams documentation on the co-partitioning requirements for joins.
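To see why records with the same key can end up in differently numbered partitions, the sketch below compares two partitioners on the same UserGroup keys: a group-based one like the question's, and a key-hash one modelled on the murmur2 hashing that Kafka's default producer partitioner applies to serialized key bytes. The string serializer, the key class and the function names are assumptions for illustration; a real serde produces different bytes, but the disagreement between the two strategies is the point.

```kotlin
// Hypothetical key type and serializer; the question's real serde is unknown,
// so the exact bytes (and therefore partitions) will differ in practice.
data class UserGroup(val user: String, val group: String)

fun serialize(key: UserGroup): ByteArray =
    "${key.user}|${key.group}".toByteArray(Charsets.UTF_8)

// Kotlin port of the murmur2 hash from Kafka's client Utils.murmur2.
fun murmur2(data: ByteArray): Int {
    val length = data.size
    val m = 0x5bd1e995
    val r = 24
    var h = 0x9747b28c.toInt() xor length

    // Mix the input four bytes at a time (little-endian).
    for (i in 0 until length / 4) {
        val i4 = i * 4
        var k = (data[i4].toInt() and 0xff) +
            ((data[i4 + 1].toInt() and 0xff) shl 8) +
            ((data[i4 + 2].toInt() and 0xff) shl 16) +
            ((data[i4 + 3].toInt() and 0xff) shl 24)
        k *= m
        k = k xor (k ushr r)
        k *= m
        h *= m
        h = h xor k
    }

    // Mix in the last 0-3 bytes (mirrors the fall-through switch in the Java original).
    val tail = length and 3.inv()
    val rem = length % 4
    if (rem >= 3) h = h xor ((data[tail + 2].toInt() and 0xff) shl 16)
    if (rem >= 2) h = h xor ((data[tail + 1].toInt() and 0xff) shl 8)
    if (rem >= 1) {
        h = h xor (data[tail].toInt() and 0xff)
        h *= m
    }

    h = h xor (h ushr 13)
    h *= m
    h = h xor (h ushr 15)
    return h
}

// Default-style choice: hash of the serialized key, sign bit cleared, modulo partition count.
fun keyHashPartition(key: UserGroup, numPartitions: Int): Int =
    (murmur2(serialize(key)) and 0x7fffffff) % numPartitions

// Group-based choice like the question's custom partitioner (hash choice is an assumption).
fun groupPartition(key: UserGroup, numPartitions: Int): Int =
    (key.group.hashCode() and 0x7fffffff) % numPartitions

// Keys for which the two strategies pick different partition numbers.
fun disagreements(keys: List<UserGroup>, numPartitions: Int): List<UserGroup> =
    keys.filter { keyHashPartition(it, numPartitions) != groupPartition(it, numPartitions) }
```

Following the answer's explanation, the KTable topic was written with the group-based strategy while the repartition topic follows the key-hash strategy, so every key returned by disagreements(...) is routed to a join task that does not hold its matching table row.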
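The suggested fix could be wired up roughly as follows. This is a configuration sketch: the application id and bootstrap servers are placeholders, com.example.CustomPartitioner stands for your own Partitioner class, and it assumes, as the answer does, that Kafka Streams leaves partition selection for repartition records to the partitioner configured for its producers.

```kotlin
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.streams.StreamsConfig
import java.util.Properties

// Builds the Kafka Streams configuration; id and broker address are placeholders.
fun streamsProperties(): Properties = Properties().apply {
    put(StreamsConfig.APPLICATION_ID_CONFIG, "house-address-app")   // hypothetical application id
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // assumed broker address
    // Intended to make Kafka Streams' internal producers use the same partitioner
    // as the producers that wrote the input topics (the fix suggested above).
    put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.CustomPartitioner")
}
```

With this setting the partitioner is also called for records going to the internal repartition topic, whose key in the question is UserGroup, so it must be able to extract the group from that key type as well.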