apache-kafka, apache-kafka-streams, debezium

How to de-normalize data in Kafka?


I have a MySQL database with ~20 tables. The data is normalized.

Considering this example:

book -> book_authors <- authors

we want to stream the book info as one document, e.g.:

{"book_id": 3, "title": "Red", "authors": [{"id": 3, "name": "Mary"}, {"id": 4, "name": "John"}]}

One case where this becomes a serious problem: if an author's name changes, we have to re-generate all of their books. I'm using Debezium to post the change log of each table to Kafka.
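
For context, the change event Debezium posts for such an author rename looks roughly like the following (simplified: the source metadata and the optional schema/payload envelope are elided, and the old name is made up for illustration):

    {
      "before": { "id": 3, "name": "Marie" },
      "after":  { "id": 3, "name": "Mary" },
      "op": "u"
    }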

I am unable to find an elegant solution for denormalizing the data, e.g. for sinking it into Elasticsearch, MongoDB, etc.

I identified two solutions, but both seem to fail:

  1. De-normalize the data into a new MySQL table at the source, and use Debezium to stream only this new table. This may not be feasible, and it would take a lot of effort to change the code of the source system.
  2. Join the streams in Kafka, though I didn't manage to make it work. Kafka Streams seems to allow table joins only on the record key, not on a non-primary-key field (see the sketch below). This seems to be a common limitation with N-to-N relations.
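
For reference, a minimal Kafka Streams sketch of the second attempt; the Debezium topic names follow the usual server.database.table pattern but are assumptions, and values are kept as raw JSON strings to avoid custom serdes. It compiles, but the comments mark exactly where the approach breaks down:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class BookAuthorJoinAttempt {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            // Change-log topics as published by Debezium (hypothetical names).
            KTable<String, String> books =
                    builder.table("dbserver.mydb.book");
            KTable<String, String> bookAuthors =
                    builder.table("dbserver.mydb.book_authors");

            // KTable-KTable joins match records on the key only. 'books' is
            // keyed by book_id, while 'book_authors' is keyed by its own
            // composite primary key, so this join never pairs the rows we
            // actually want -- the non-primary-key join the question needs:
            KTable<String, String> joined =
                    books.join(bookAuthors, (book, link) -> book + " " + link);

            // Naively re-keying book_authors by book_id is not enough either:
            // a KTable keeps one value per key, so the N-to-N link rows for a
            // book would collapse to a single record.
        }
    }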

Has anyone found a solution for denormalizing data and publishing it into a Kafka stream? This seems to be a common problem, and I couldn't find any solution yet.


Solution

  • Try publishing the changes from Debezium to the topics book, book_authors and authors in their raw form; this creates three disjoint streams.

    Create a simple consumer application that subscribes to all three topics. Upon receiving a message on any of them, it queries the database to obtain the latest snapshot of the referenced entities, merges the data together, and publishes the denormalised version onto a new merged_book_authors topic. Downstream consumers can read directly from the merged topic; a minimal sketch of such a consumer follows below.

    A minor variation of the above: rather than querying the database for each Debezium change, which may be slow, build a materialised view using a fast key-value or document store such as Redis (also sketched below). This is a little more work, but will (1) improve the throughput of the overall pipeline and (2) take the load off the system-of-record database.
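
    A minimal sketch of the consumer-merge approach, assuming the schema implied by the question (book(id, title), authors(id, name), book_authors(book_id, author_id)); the connection strings and topic names are placeholders, extractBookIds is a hypothetical helper, and parsing of the Debezium payload is elided:

        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.PreparedStatement;
        import java.sql.ResultSet;
        import java.sql.SQLException;
        import java.time.Duration;
        import java.util.Arrays;
        import java.util.Collections;
        import java.util.List;
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerRecord;

        public class BookMerger {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092");
                props.put("group.id", "book-merger");
                props.put("key.deserializer",
                        "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer",
                        "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("key.serializer",
                        "org.apache.kafka.common.serialization.StringSerializer");
                props.put("value.serializer",
                        "org.apache.kafka.common.serialization.StringSerializer");

                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                     KafkaProducer<String, String> producer = new KafkaProducer<>(props);
                     Connection db = DriverManager.getConnection(
                             "jdbc:mysql://localhost/mydb", "user", "secret")) {

                    consumer.subscribe(Arrays.asList("dbserver.mydb.book",
                            "dbserver.mydb.book_authors", "dbserver.mydb.authors"));

                    while (true) {
                        for (ConsumerRecord<String, String> rec :
                                consumer.poll(Duration.ofMillis(500))) {
                            // Re-publish the merged document for every book
                            // that the change event touches.
                            for (long bookId : extractBookIds(rec)) {
                                producer.send(new ProducerRecord<>(
                                        "merged_book_authors",
                                        String.valueOf(bookId),
                                        snapshotBook(db, bookId)));
                            }
                        }
                    }
                }
            }

            // Placeholder: pull the affected book id(s) out of the Debezium
            // change event; an authors change needs a book_authors lookup.
            static List<Long> extractBookIds(ConsumerRecord<String, String> rec) {
                return Collections.emptyList(); // JSON parsing elided
            }

            // Latest snapshot of one book with all of its authors, rendered
            // as the denormalised JSON document from the question.
            static String snapshotBook(Connection db, long bookId) throws SQLException {
                String title = null;
                StringBuilder authors = new StringBuilder();
                try (PreparedStatement ps = db.prepareStatement(
                        "SELECT b.title, a.id, a.name FROM book b "
                        + "JOIN book_authors ba ON ba.book_id = b.id "
                        + "JOIN authors a ON a.id = ba.author_id WHERE b.id = ?")) {
                    ps.setLong(1, bookId);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            title = rs.getString(1);
                            if (authors.length() > 0) authors.append(", ");
                            authors.append("{\"id\": ").append(rs.getLong(2))
                                   .append(", \"name\": \"")
                                   .append(rs.getString(3)).append("\"}");
                        }
                    }
                }
                return "{\"book_id\": " + bookId + ", \"title\": \"" + title
                        + "\", \"authors\": [" + authors + "]}";
            }
        }

    Keying the merged topic by book_id also means a downstream sink can treat each message as an upsert for that book.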
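
    And a sketch of the materialised-view variation, using the Jedis client as one possible choice; the key names are made up, and the Debezium consumption loop from the previous sketch is assumed around it:

        import redis.clients.jedis.Jedis;

        public class MaterialisedView {
            public static void main(String[] args) {
                try (Jedis redis = new Jedis("localhost", 6379)) {
                    // On every Debezium event, upsert the latest row image
                    // into the cache: one hash per table, keyed by primary key.
                    redis.hset("book", "3", "{\"id\": 3, \"title\": \"Red\"}");
                    redis.hset("authors", "3", "{\"id\": 3, \"name\": \"Mary\"}");
                    redis.hset("authors", "4", "{\"id\": 4, \"name\": \"John\"}");
                    // Link-table rows become a set of author ids per book.
                    redis.sadd("book_authors:3", "3", "4");

                    // Rebuilding the merged document for book 3 now reads from
                    // the cache instead of hitting the MySQL system of record.
                    String book = redis.hget("book", "3");
                    for (String authorId : redis.smembers("book_authors:3")) {
                        String author = redis.hget("authors", authorId);
                        // ...merge 'book' and 'author' into the denormalised
                        // JSON and publish it to merged_book_authors...
                    }
                }
            }
        }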