Tags: apache-kafka, ksqldb, cdc

Using ksqlDB to implement CDC using multiple event types in a single topic?


I have an Apache Kafka topic containing numerous record types.

For example:

  1. UserCreated
  2. UserUpdated
  3. UserDeleted
  4. AnotherRecordType
  5. ...

I wish to implement CDC on the three User* record types listed above, so that I end up with an up-to-date KTable containing all user information.

How can I do this in ksqlDB? Since, as far as I know, Debezium and other CDC connectors also source their data from a single topic, I know it should at least be possible.

I've been reading through the Confluent docs for a while now, but I can't seem to find anything quite pertinent to my use case (CDC using an existing topic). If there is anything I've overlooked, I would greatly appreciate a link to the relevant documentation as well.

I assume that, at the very least, the records must have the same key for ksqlDB to be able to match them. So my questions boil down to:

  1. How would I tell ksqlDB which records are inserts, which are updates, and which are deletes?
  2. Is the key matching a hard requirement, or are there other join/match predicates that we can use?

One possibility I can think of is essentially what CDC already does: treat each incoming record as a new entry, so that the KTable behaves like a slowly changing dimension, then group on the key and select the entry with e.g. the latest timestamp.

So, is something like the following:

CREATE TABLE users AS
    SELECT user_id,
           latest_by_offset(name) AS name,
           latest_by_offset(email) AS email,
           CASE WHEN record.key = 'UserDeleted' THEN TRUE ELSE FALSE END AS is_deleted,
           latest_by_offset(timestamp) AS timestamp,
           ...
    FROM users_stream
    GROUP BY user_id
    EMIT CHANGES;

possible (using e.g. ROWKEY for record.key)? If not, how does e.g. Debezium do it?


Solution

  • The general pattern is to not have different schema types; just a single User schema. The first record for any unique key (user_id, for example) is then an insert. Afterwards, any non-null value for the same key is an update (generally requiring all fields to be part of the value, effectively performing a "replace" operation on the table row). Deletes are produced by sending a null value for an existing key (a tombstone event).
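
    For illustration, a minimal sketch of that pattern in ksqlDB, assuming a single-schema topic named 'users' keyed by user_id with JSON values (the names and format here are placeholders, not taken from the question):

        -- Table registered directly over a single-schema topic: the latest non-null
        -- value per key wins, and a null value (tombstone) deletes the row.
        CREATE TABLE users_table (
            user_id STRING PRIMARY KEY,
            name    STRING,
            email   STRING
        ) WITH (
            KAFKA_TOPIC  = 'users',
            VALUE_FORMAT = 'JSON'
        );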

    If you have multiple schemas, it might be better to create a new stream that nulls out the delete events, unifies the creates and updates into a common schema containing the information you care about, and filters out the event types you want to ignore.
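
    As a rough sketch of that approach, assuming the multi-event topic is named 'user-events', is keyed by user_id, and carries an event_type discriminator field (all of these names are assumptions about your schema):

        -- Source stream over the existing multi-event topic.
        CREATE STREAM user_events (
            user_id    STRING KEY,
            event_type STRING,
            name       STRING,
            email      STRING
        ) WITH (
            KAFKA_TOPIC  = 'user-events',
            VALUE_FORMAT = 'JSON'
        );

        -- Keep only the User* events, projected onto one common schema.
        CREATE STREAM user_changes AS
            SELECT user_id, event_type, name, email
            FROM user_events
            WHERE event_type = 'UserCreated'
               OR event_type = 'UserUpdated'
               OR event_type = 'UserDeleted'
            EMIT CHANGES;

        -- Materialize the latest state per user. Aggregations can't emit tombstones,
        -- so a delete shows up as the last event type instead of removing the row.
        CREATE TABLE user_state AS
            SELECT user_id,
                   LATEST_BY_OFFSET(name)       AS name,
                   LATEST_BY_OFFSET(email)      AS email,
                   LATEST_BY_OFFSET(event_type) AS last_event_type
            FROM user_changes
            GROUP BY user_id
            EMIT CHANGES;

    Downstream queries can then treat rows whose last_event_type is 'UserDeleted' as deleted, or filter them out entirely.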

    how does e.g. Debezium do it?

    For consuming data from Debezium topics, you can use the ExtractNewRecordState single message transform to "extract the new record state" from Debezium's change-event envelope. Debezium itself doesn't create any tables for you, though.
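
    As a sketch, one way to wire that up from ksqlDB is with a source connector definition; the Postgres connector class, connection settings, and table name below are purely illustrative, and other required connection properties are omitted:

        -- The 'unwrap' transform strips Debezium's change-event envelope so downstream
        -- consumers see plain row values; keeping tombstones lets deletes propagate.
        CREATE SOURCE CONNECTOR users_cdc WITH (
            'connector.class'        = 'io.debezium.connector.postgresql.PostgresConnector',
            'database.hostname'      = 'db',
            'database.user'          = 'debezium',
            'database.password'      = 'secret',
            'database.dbname'        = 'app',
            'table.include.list'     = 'public.users',
            'transforms'             = 'unwrap',
            'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
            'transforms.unwrap.drop.tombstones' = 'false'
        );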