I'm trying to have a simple foreign key join in Kafka Streams similar to many articles (like this for one: https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/).
When I try to join the user id (primary key of the user table) with the foreign key user_id in the account_balance table to produce an AccountRecord object, I get the following error:
[-StreamThread-1] ignJoinSubscriptionSendProcessorSupplier : Skipping record due to null foreign key.
The goal is ultimately to deliver the AccountRecord objects to a topic each time any field in either table updates. The problem is that when I simply print the user table and the account table separately, the foreign keys and all fields are fully populated. I can't see what's wrong or why this error occurs. Here is a snippet of my code:
public void start_test(){
StreamsBuilder builder = new StreamsBuilder();
KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.User()));
KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.AccountBalance()));
final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
userTable,
AccountBalance::getUserId,
(account, user) -> new AccountRecord(user.getFirstName(), account.getBalance())
);
// print the table
accountRecordTable
.toStream()
.print(Printed.toSysOut());
KafkaStreams stream = new KafkaStreams(builder.build(), properties);
stream.start();
}
Any guidance will be helpful. I didn't include the custom serde code or the object shapes, but they are very simple. Please let me know if you need additional clarification.
Thanks
Do your messages have a record key? A KTable is an abstraction of a changelog stream, where each data record represents an update. The record key identifies which row is being updated, so the key is essential when working with KTables. For example:
AccountBalance<Key=10,Value={accountBalanceId=10,userId=777,balance=10}>
User<Key=777, Value={firstName="Panchito"}>
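To make the role of the two keys concrete, here is a rough plain-Java model of what a foreign-key join does with records like the ones above. It is only a sketch and not the Kafka Streams API: the tables are ordinary maps, and the class, the value shapes, and the join method are all made up for illustration. The point is that a row whose extracted foreign key is null never reaches the joiner, which is roughly what the "Skipping record due to null foreign key" message is reporting.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a foreign-key table join; this is NOT the Kafka Streams API.
class ForeignKeyJoinSketch {

    // Hypothetical value shapes mirroring the example records above.
    static final class User {
        final String firstName;
        User(String firstName) { this.firstName = firstName; }
    }

    static final class AccountBalance {
        final Long userId;   // foreign key into the user table; may be null
        final long balance;
        AccountBalance(Long userId, long balance) {
            this.userId = userId;
            this.balance = balance;
        }
    }

    // Each table is modeled as a map from primary key to the latest value for that key.
    static Map<Long, String> join(Map<Long, AccountBalance> accounts, Map<Long, User> users) {
        Map<Long, String> joined = new HashMap<>();
        for (Map.Entry<Long, AccountBalance> row : accounts.entrySet()) {
            Long foreignKey = row.getValue().userId;
            if (foreignKey == null) {
                // Analogous to "Skipping record due to null foreign key".
                continue;
            }
            User user = users.get(foreignKey);
            if (user != null) {
                joined.put(row.getKey(), user.firstName + ":" + row.getValue().balance);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Long, User> users = new HashMap<>();
        users.put(777L, new User("Panchito"));

        Map<Long, AccountBalance> accounts = new HashMap<>();
        accounts.put(10L, new AccountBalance(777L, 10));  // joins with user 777
        accounts.put(11L, new AccountBalance(null, 25));  // skipped: null foreign key

        System.out.println(join(accounts, users));
    }
}
```

If the real AccountBalance value deserializes with a null userId (for example because a field name does not match), the join would skip it in the same way even though the record itself looks populated on the topic.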
Another observation concerns your key Serde: why are you using a custom Serde if the key is declared as Long?
KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.User()));
KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.AccountBalance()));
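One thing worth checking before switching: as far as I know, Serdes.Long() expects the key to be exactly 8 bytes in big-endian order, so a key that the producer wrote as text (such as "777") will not decode as a Long. The sketch below models that layout in plain Java with ByteBuffer. It is a stand-in, not the Kafka serde itself, and the class and method names are invented for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Plain-Java stand-in for an 8-byte big-endian Long key codec; NOT the Kafka serde itself.
class LongKeyBytesSketch {

    // Writes the key as 8 bytes (ByteBuffer defaults to big-endian order).
    static byte[] encode(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // Returns null for a missing key and rejects anything that is not exactly 8 bytes.
    static Long decode(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes but got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        // A key written as a binary long round-trips.
        System.out.println(decode(encode(777L)));

        // A key written as text is only 3 bytes long and is rejected.
        byte[] textKey = "777".getBytes(StandardCharsets.UTF_8);
        try {
            decode(textKey);
        } catch (IllegalArgumentException e) {
            System.out.println("text key rejected: " + e.getMessage());
        }
    }
}
```

So if the topics were produced with string or JSON keys, the key Serde has to match that format rather than the Long type you would like to end up with.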
Maybe your key deserializer is returning the key as null. Check what your custom Serde produces by logging its output. You should also pass a Materialized to the join, because the join creates a new object type and Kafka Streams does not know how to serialize it on its own:
final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
userTable,
AccountBalance::getUserId,
(account, user) -> new AccountRecord(user.getFirstName(), account.getBalance()),
Materialized.with(Serdes.Long(), CustomSerdes.AccountRecord()) // value serde must match the join result type; CustomSerdes.AccountRecord() is assumed to exist
);
Consider using JsonSerde or Avro instead of writing your own custom Serdes.