Search code examples
apache-kafkaapache-kafka-streams

KStream Join with GlobalKTable over non-key values


I am trying to join KStream with GlobalKTable, the join is not completely on keys.

GlobalKTable<String, Employee> employeesDetails = builder.globalTable("EMPLOYEE_TOPIC",..);
KStream<String,String> empIdOverLoginUserId = builder.stream("LOG_TOPIC", ….);

I want to join empIdOverLoginUserId with employeesDetails over empIdOverLoginUserId's value over employeesDetails's key.

Any clue?


Solution

  • Second parameter of KStream-GlobalKTable join is a KeyValueMapper to map KStream record (key-value) to the key of GlobalKTable you want to join, you can use this to use empIdOverLoginUserId's value as key when joining with GlobalKTable:

    empIdOverLoginUserId.join(
                    employeesDetails, 
                    (userKey, userValue) -> userValue, 
                    (userValue, employeesDetailValue) -> employeesDetailValue)