java, apache-kafka, apache-kafka-streams

How to perform "reverse lookup" on GlobalKTable?


Setup

I have worked with Kafka for some time, but I am rather new to the Kafka Streams API and to stream processing in general.

Imagine a Kafka Streams application that performs encoding and decoding using a lookup table provided by an additional Kafka topic. The business topics are all co-partitioned and have 5 partitions each. The lookup mapping stream is a change data stream: it is compacted and has only one partition.

+------------------------+        /-------\       +--------------------------+
| "Plain request" stream |   ->  | Encoder |  ->  | "Encoded request" stream |     -----+
+------------------------+        \-------/       +--------------------------+          |
                                                                                        |
                                      ^                                                 |
                                      |                                                 v
+-------------------------+
| "Lookup mapping" stream |  ->    (Cache)                               (Some other app)
+-------------------------+
                                      |                                                 |
                                      v                                                 |
                                                                                        |
+-------------------------+       /-------\       +---------------------------+         |
| "Plain response" stream |  <-  | Decoder |  <-  | "Encoded response" stream |     <---+
+-------------------------+       \-------/       +---------------------------+

Let's assume that the encoding is ASCII ('a' -> 97, 'b' -> 98, etc.), that the lookup mapping topic uses the letter as key, and that the payload contains both the letter and the code.
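To make the asymmetry concrete, here is the same situation in plain Java terms (class and method names are illustrative, not part of any API): the materialized table is keyed by the letter, so the forward direction is a simple key lookup while the reverse direction has no key to look up and must scan every entry.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LookupDemo {
    // The materialized table: letter -> code (the letter is the key, as on the topic)
    static final Map<Character, Integer> TABLE = new LinkedHashMap<>();
    static {
        TABLE.put('a', 97);
        TABLE.put('b', 98);
    }

    // Encoding: a plain key lookup, analogous to the GlobalKTable join
    static Integer encode(char plain) {
        return TABLE.get(plain);
    }

    // Decoding: the code is not the key, so we have to scan all entries
    static Character decode(int code) {
        for (Map.Entry<Character, Integer> e : TABLE.entrySet()) {
            if (e.getValue() == code) {
                return e.getKey();
            }
        }
        return null; // no mapping found
    }

    public static void main(String[] args) {
        System.out.println(encode('a')); // prints 97
        System.out.println(decode(98));  // prints b
    }
}
```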

My current draft uses a leftJoin on a GlobalKTable (which materializes the lookup mapping stream):

final var mappingTable = streamsBuilder.globalTable(LOOKUP_MAPPINGS, ...);

streamsBuilder.stream(PLAIN_REQUESTS, ...)
  .leftJoin(mappingTable, (key, value) -> value.getPlain(), EncoderHelper::encode)
  .to(ENCODED_REQUESTS, ...);

Problem

Encoding works as expected. But what about decoding?

The materialized GlobalKTable contains data like this:

key   value
'a'   ('a', 97)
'b'   ('b', 98)
...   ...

For encoding, the leftJoin operation can perform a lookup based on the plain value, which is the key of the table. But for decoding, the leftJoin operation would need to perform a lookup based on the encoded value (i.e. 97), which is not the key of the table.

I don't see how the decoding should be designed.

Alternatives / Ideas

Consume the topic via the Kafka Consumer API, store the data, and perform encoding / decoding "manually". This works but has some pitfalls (e.g. I need to make sure that all mappings have been read before stream processing starts). I don't like this solution very much as it feels like reinventing the wheel.

I realize that I could create another topic (outside my program) which uses the encoded value as the key and then replicate the solution above. But this implies two topics with strong coupling and feels generally wrong.

I also thought about repartitioning the topic in my program before materializing it into a GlobalKTable. Essentially the same concept as the one before but this time, the additional topic only needs to live within my program. This would still lead to duplicated mappings and therefore an increased memory footprint. I could live with that as there is only a small amount of mappings in my use case (~1000 or so). This still feels suboptimal (and I also don't think that it is an intended use case for the GlobalKTable).
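The re-keying idea could look roughly like the sketch below. Note that this is a sketch of my own idea, not a confirmed pattern: REVERSED_LOOKUP_MAPPINGS, the serdes, and the Mapping accessors (getCode(), getEncoded()) are hypothetical names, and the serde wiring is elided just like in the draft above.

```java
// Re-key the mappings by the encoded value into an internal topic...
streamsBuilder.stream(LOOKUP_MAPPINGS, ...)
    .map((letter, mapping) -> KeyValue.pair(mapping.getCode(), mapping))
    .to(REVERSED_LOOKUP_MAPPINGS, ...);

// ...and materialize a second GlobalKTable keyed by the code.
final var reverseTable = streamsBuilder.globalTable(REVERSED_LOOKUP_MAPPINGS, ...);

// Decoding then becomes the mirror image of the encoding join.
streamsBuilder.stream(ENCODED_RESPONSES, ...)
    .leftJoin(reverseTable, (key, value) -> value.getEncoded(), DecoderHelper::decode)
    .to(PLAIN_RESPONSES, ...);
```

One caveat with re-keying by a value: if a mapping for a letter ever changes its code, the compacted reversed topic keeps the stale record under the old code unless a tombstone is produced for it.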

I could do repartitioning and use a KTable. But this seems complex due to co-partitioning requirements and also feels wrong.

Question

I am looking for a hint on how such a bidirectional lookup system could/should be implemented in accordance with Kafka Streams API design principles. How would more experienced streams-people solve this?

NOT my question

Similar but not the same:

Data Enrichment using kafka streams, KStream-GlobalKtable Join: Imagine that I wanted to make a lookup from "john" to XDFER in their scenario.

KStream Join with GlobalKTable over non-key values: The plain value ('a') is not present in the encoded response stream. So it cannot be used as key.


Solution

  • Using the DSL, you provided your answer already: you would need two topics and two global tables.

    If you are willing to accept some performance hit, you could fall back to the Processor API and use a "global store" though (it's basically the same thing as a global table, but the API is more flexible). Instead of using join(), you implement a custom Processor (or Transformer, depending on which version of Kafka Streams you are using), and inside the process() function you can do a full table scan on the global store (instead of just a key lookup) to find the right entry. -- Given that you don't store a lot of data, you could even cache a reverse mapping in memory to avoid scanning the full store every time (of course, you then get a cache invalidation problem whenever the global store is updated...)
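    A rough sketch of the full-scan approach with the current Processor API, under a few assumptions: the global store was registered under the name "lookup-mappings-global-store" via addGlobalStore(), and Mapping with its getCode()/getPlain() accessors is the questioner's (hypothetical) value type.

    ```java
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class DecoderProcessor implements Processor<String, Integer, String, String> {

        private KeyValueStore<String, Mapping> globalStore;
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
            // Global store registered via StreamsBuilder#addGlobalStore
            this.globalStore = context.getStateStore("lookup-mappings-global-store");
        }

        @Override
        public void process(final Record<String, Integer> record) {
            // Full table scan: find the entry whose encoded value matches the record
            try (KeyValueIterator<String, Mapping> it = globalStore.all()) {
                while (it.hasNext()) {
                    final var entry = it.next();
                    if (entry.value.getCode() == record.value()) {
                        context.forward(record.withValue(entry.value.getPlain()));
                        return;
                    }
                }
            }
            // No mapping found: mimic left-join semantics and forward null
            context.forward(record.withValue(null));
        }
    }
    ```

    If the ~1000 mappings justify it, process() could instead consult an in-memory Map<Integer, String> built from the store, rebuilding it whenever a lookup misses -- accepting the invalidation caveat described above.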