Tags: java, apache-kafka-streams, apache-kafka-connect, confluent-platform

Kafka Streams KTable foreign key join not working as expected


I'm trying to implement a simple foreign key join in Kafka Streams, similar to what many articles describe (this one, for example: https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/).

When I try to join the user id (primary key of user table) with the foreign key user_id in the account_balance table to produce an AccountRecord object, I get the following error: [-StreamThread-1] ignJoinSubscriptionSendProcessorSupplier : Skipping record due to null foreign key.

The ultimate goal is to deliver AccountRecord objects to a topic each time any field in either table is updated. The problem is that when I simply print the user table and the account table separately, the foreign keys and all other fields are fully populated. I can't see what's wrong or why this error occurs. Here is a snippet of my code:

    public void start_test(){
        StreamsBuilder builder = new StreamsBuilder();

        KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.User()));
        KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.AccountBalance()));

        final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
                userTable,
                AccountBalance::getUserId,
                (account, user) -> new AccountRecord(user.getFirstName(), account.getBalance())
        );

        // print the table
        accountRecordTable
                .toStream()
                .print(Printed.toSysOut());

        KafkaStreams stream = new KafkaStreams(builder.build(), properties);
        stream.start();
    }

Any guidance will be helpful. I didn't include the custom serde code or the object shapes, but they are very simple. Please let me know if you need additional clarification.

Thanks


Solution

  • Do your messages contain a record key? A KTable is an abstraction of a changelog stream in which each record represents an update, and the key identifies which entry is being updated. The record key is therefore very important when working with KTables. For example:

    AccountBalance<Key=10,Value={accountBalanceId=10,userId=777,balance=10}>
    User<Key=777, Value={firstName="Panchito"}>
    

    Another observation concerns your key Serde: why use a custom Serde if the key is defined as a Long?

    KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.User()));
    
    KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.AccountBalance()));
    

    Maybe your key deserializer is returning the key as null. Log what your custom Serde produces to check. You should also pass a Materialized to the join method, because the join creates a new value type and Kafka Streams does not know how to serialize it unless you supply a Serde.

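    final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
            userTable,
            AccountBalance::getUserId,
            (account, user) -> new AccountRecord(user.getFirstName(), account.getBalance()),
            // The value Serde must match the join result type (AccountRecord).
            // CustomSerdes.AccountRecord() is assumed to exist alongside the other custom Serdes.
            Materialized.with(Serdes.Long(), CustomSerdes.AccountRecord())
    );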
    

    Consider using JsonSerde or Avro instead of writing your own custom Serdes.
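
To make the role of the foreign key concrete, here is a minimal plain-Java sketch of how a foreign-key inner join treats each left-side record. It is only a model built on maps, not Kafka Streams code, and the class ForeignKeyJoinSketch and its nested types are hypothetical stand-ins for the classes in the question. As far as I know, the "Skipping record due to null foreign key" message is logged when the foreign key extractor (AccountBalance::getUserId in the question) returns null, so the sketch skips such records in the same way.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class ForeignKeyJoinSketch {

    // Hypothetical stand-ins for the question's value classes.
    static final class User {
        final String firstName;
        User(String firstName) { this.firstName = firstName; }
    }

    static final class AccountBalance {
        final Long userId;   // foreign key; null if the value Serde did not populate it
        final long balance;
        AccountBalance(Long userId, long balance) {
            this.userId = userId;
            this.balance = balance;
        }
    }

    static final class AccountRecord {
        final String firstName;
        final long balance;
        AccountRecord(String firstName, long balance) {
            this.firstName = firstName;
            this.balance = balance;
        }
    }

    // Simplified model of a table-table foreign-key inner join.
    // The result is keyed by the LEFT table's primary key.
    static <K, V, KO, VO, VR> Map<K, VR> join(
            Map<K, V> left,
            Map<KO, VO> right,
            Function<V, KO> foreignKeyExtractor,
            BiFunction<V, VO, VR> joiner) {
        Map<K, VR> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : left.entrySet()) {
            KO foreignKey = foreignKeyExtractor.apply(entry.getValue());
            if (foreignKey == null) {
                // Same situation as the log message in the question.
                System.out.println("Skipping record due to null foreign key. key=" + entry.getKey());
                continue;
            }
            VO match = right.get(foreignKey);
            if (match != null) {
                result.put(entry.getKey(), joiner.apply(entry.getValue(), match));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, User> users = new LinkedHashMap<>();
        users.put(777L, new User("Panchito"));

        Map<Long, AccountBalance> accounts = new LinkedHashMap<>();
        accounts.put(10L, new AccountBalance(777L, 10L));
        accounts.put(11L, new AccountBalance(null, 5L)); // userId missing after deserialization

        Map<Long, AccountRecord> joined = join(accounts, users, a -> a.userId,
                (a, u) -> new AccountRecord(u.firstName, a.balance));
        joined.forEach((k, r) -> System.out.println(k + " -> " + r.firstName + ", " + r.balance));
    }
}
```

If the real application behaves like the second account above, it is worth checking that the value Serde for AccountBalance actually fills in userId, in addition to checking the record keys.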
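
The advice to log what the key deserializer produces can be tried out without a running cluster. The sketch below is plain Java and assumes that a Long key is stored as 8 big-endian bytes, which is my understanding of what Serdes.Long() writes. The class LongKeyCheck and its methods are hypothetical names used only for illustration. It reports a missing key or a key of the wrong length (for example, one written as text by another producer) instead of silently returning null.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongKeyCheck {

    // Encodes a long as 8 big-endian bytes (assumed layout of a Long key).
    static byte[] encode(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // Decodes a raw key and logs why decoding failed, if it did.
    static Long decodeAndLog(byte[] rawKey) {
        if (rawKey == null) {
            System.out.println("key is null: the record was produced without a key");
            return null;
        }
        if (rawKey.length != Long.BYTES) {
            System.out.println("key has " + rawKey.length + " bytes instead of 8; as text: "
                    + new String(rawKey, StandardCharsets.UTF_8));
            return null;
        }
        long key = ByteBuffer.wrap(rawKey).getLong();
        System.out.println("decoded key = " + key);
        return key;
    }

    public static void main(String[] args) {
        decodeAndLog(encode(777L));                               // well-formed Long key
        decodeAndLog("777".getBytes(StandardCharsets.UTF_8));     // key written as text
        decodeAndLog(null);                                       // record without a key
    }
}
```

The same kind of logging can be added inside a custom key deserializer to see the raw bytes that arrive from the topic.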