Tags: apache-kafka, apache-kafka-streams, event-driven, event-driven-design

Should Kafka event-carried state transfer systems be implemented using a GlobalKTable for local queries?


Event-carried state transfer removes the need to make remote calls to query information from other services.

Let's assume a practical case:

  1. We have a customer service that publishes CustomerCreated/CustomerUpdated events to a customer Kafka topic.

  2. A shipping service listens to an order topic.

  3. When an OrderCreated event is read by the shipping service, it needs access to the customer's address. Instead of making a REST call to the customer service, the shipping service already has the customer information available locally, kept in a KTable/GlobalKTable backed by persistent storage (see the sketch after this list).
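For concreteness, a minimal sketch of that local materialization, assuming an illustrative topic name ("customer", keyed by customerId), a Customer POJO and a customerSerde that are not part of the original description:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    StreamsBuilder builder = new StreamsBuilder();

    // Materialize the customer topic into a named, RocksDB-backed local store so the
    // shipping service can look up customer data without a remote call.
    // Topic name, Customer POJO and customerSerde are illustrative assumptions.
    GlobalKTable<String, Customer> customers = builder.globalTable(
            "customer",
            Consumed.with(Serdes.String(), customerSerde),
            Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as("customer-store"));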

My questions are about how we should implement this: we want the system to be resilient and scalable, so there will be more than one instance of the customer and shipping services, which means there will also be more than one partition for the customer and order topics.

We could hit scenarios like this: an OrderCreated(orderId=1, userId=7, ...) event is read by a shipping service instance, but if that instance uses a KTable to keep and access the local user information, userId=7 may not be there, because the partition that holds that userId could have been assigned to another shipping service instance.

Offhand, this problem could be solved by using a GlobalKTable so that every shipping service instance has access to the whole set of customers.

  1. Is this (GlobalKTable) the recommended approach to implement that pattern?

  2. Is it a problem to replicate the whole customer dataset in every shipping service instance when the number of customers is very large?

  3. Can this/should this case be implemented using KTable in some way?


Solution

  • You can solve this problem with either a GlobalKTable (GKTable) or a KTable. The former is replicated, so the whole table is available on every node (and uses more storage). The latter is partitioned, so the data is spread across the nodes. This has the side effect that, as you say, the partition that handles a given userId may not also handle the corresponding customer. You solve that by repartitioning one of the streams so that they are co-partitioned.

    So in your example you need to enrich Order events with Customer information in the Shipping Service. You can either: (a) use a GlobalKTable of Customer information and join to it on each node, or (b) use a KTable of Customer information and perform the same operation, but rekey the order stream with the selectKey() operator before the enrichment so that the data is co-partitioned (i.e. the same keys end up on the same node). For option (b) the Customer and Order topics must also have the same number of partitions. Both options are sketched below.
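    A hedged sketch of both options, assuming illustrative topic names ("customer" keyed by customerId, "order"), POJOs (Order, Customer, EnrichedOrder) and serdes that are not part of the original question; it is not taken from any of the linked examples:

        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.kstream.Consumed;
        import org.apache.kafka.streams.kstream.GlobalKTable;
        import org.apache.kafka.streams.kstream.Joined;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.KTable;

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Order> orders =
                builder.stream("order", Consumed.with(Serdes.String(), orderSerde));

        // Option (a): GlobalKTable join. The full customer table is replicated to every
        // instance, so the order stream is not repartitioned; a KeyValueMapper extracts
        // the customer id from each order to look up the matching customer.
        GlobalKTable<String, Customer> customersGlobal =
                builder.globalTable("customer", Consumed.with(Serdes.String(), customerSerde));

        KStream<String, EnrichedOrder> enrichedViaGlobal = orders.join(
                customersGlobal,
                (orderKey, order) -> order.getCustomerId(),
                (order, customer) -> new EnrichedOrder(order, customer.getAddress()));

        // Option (b): KTable join. selectKey() rekeys the order stream by customerId,
        // which makes Kafka Streams repartition it through an internal topic so orders
        // and customers with the same key end up on the same instance. The customer and
        // order topics must have the same number of partitions.
        KTable<String, Customer> customersByKey =
                builder.table("customer", Consumed.with(Serdes.String(), customerSerde));

        KStream<String, EnrichedOrder> enrichedViaKTable = orders
                .selectKey((orderKey, order) -> order.getCustomerId())
                .join(customersByKey,
                      (order, customer) -> new EnrichedOrder(order, customer.getAddress()),
                      Joined.with(Serdes.String(), orderSerde, customerSerde));

    Either variant yields a stream of orders enriched with the customer's address, which the shipping service can then process locally.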

    The Inventory Service example in the Confluent Microservices Examples does something similar: it rekeys the stream of orders so that it is partitioned by productId, then joins it to a KTable of inventory (also keyed by productId).

    Regarding your individual questions:

    1. Is GlobalKTable the recommended approach to implement that pattern? Both work. The GlobalKTable has a longer worst-case reload time if your service loses its local storage for whatever reason, since the whole table has to be rebuilt. The KTable has slightly higher latency, because the data has to be repartitioned, which means writing it out to Kafka and reading it back again.

    2. Is it a problem to replicate the whole customer dataset in every shipping service instance when the number of customers is very large? The main cost is the aforementioned worst-case reload time (plus the extra storage). Technically, GlobalKTables and KTables also have slightly different semantics: a GlobalKTable is loaded fully on startup, whereas a KTable is loaded incrementally based on event time, but that is not strictly relevant to this problem.

    3. Can this/should this case be implemented using KTable in some way? See above.

    See also: Microservice Examples, Quick start, Blog Post.