Tags: java, apache-kafka, apache-kafka-streams, ksqldb

Joining and enriching Kafka topic with in memory data (Dictionary, Hashmap, Dataframe)?


Let us say we have the following:

  1. A Kafka topic X which represents events happening on an entity X.
  2. Entity X contains a foreign key to another entity Y.

I want to enrich this topic X with data from outside Kafka (i.e. from a CSV file that contains all the entities Y).

The solution I have now is as follows:

  1. Load the CSV into memory in a dictionary-like structure so that key-based lookups are very fast.
  2. Start consuming from topic X, enrich the records in memory, and write the enriched records to a new Kafka topic.
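The loading and lookup half of those two steps can be sketched as plain Java, independent of any Kafka client code. The CSV column layout (`id,name`) and the class/method names below are assumptions for illustration, not part of the original setup:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CsvEnricher {

    // Step 1: parse CSV lines of the (assumed) form "id,name" into a
    // HashMap keyed by id, so each lookup during consumption is O(1).
    static Map<String, String> loadCsv(Iterable<String> lines) {
        Map<String, String> byId = new HashMap<>();
        for (String line : lines) {
            String[] cols = line.split(",", 2);
            byId.put(cols[0], cols[1]);
        }
        return byId;
    }

    // Step 2 (lookup part): resolve an X event's foreign key to the
    // matching Y attribute, or null if the key is unknown.
    static String enrich(String foreignKey, Map<String, String> yById) {
        return yById.get(foreignKey);
    }

    public static void main(String[] args) {
        Map<String, String> y = loadCsv(List.of("y1,Alice", "y2,Bob"));
        System.out.println(enrich("y1", y)); // prints Alice
    }
}
```

In the real consumer loop, `enrich` would be called once per polled record before producing to the output topic.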

I am still evaluating whether Kafka Streams or ksqlDB can do the same for me.

My question: is there an efficient way to do this with the Kafka Streams library or ksqlDB without losing performance?


Solution

  • Sure, you can do something like this:

    final Map<String, String> m = new HashMap<>(); // populated from the CSV
    builder.stream(topic).mapValues(v -> m.get(v)).to(out);
    

    But Kafka Streams is ideally going to run distributed across multiple instances, and your CSV would therefore need to be synced to every machine.

    Rather than building a map yourself, use a KeyValueStore (this can also be in-memory, but the RocksDB default is more fault tolerant) via a KTable: use the Kafka Connect SpoolDir connector to load the CSV into a topic, build a table from that topic, and then join the two topics.
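    That table-plus-join approach could look roughly like the sketch below. The topic names (`entity-y`, `topic-x`, `topic-x-enriched`) are placeholders, `entity-y` is assumed to be the topic the SpoolDir connector fills from the CSV, and topic X is assumed to already be keyed by the foreign key to Y (otherwise a `selectKey` step would be needed first):

    ```java
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    public class EnrichTopology {

        public static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();

            // The Y entities, materialized as a KTable backed by a
            // (RocksDB) KeyValueStore rather than a hand-rolled HashMap.
            KTable<String, String> yTable = builder.table("entity-y");

            // The X events, keyed by the foreign key to Y.
            KStream<String, String> xStream = builder.stream("topic-x");

            // Stream-table join: each X event is looked up against the
            // latest Y value for its key, then written to the output topic.
            xStream.join(yTable, (xValue, yValue) -> xValue + "," + yValue)
                   .to("topic-x-enriched");

            return builder.build();
        }
    }
    ```

    The `ValueJoiner` here just concatenates the two values; in practice it would merge the X event with the Y attributes into whatever output record format you need.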