Let's assume a practical case:
We have a customer service that publishes CustomerCreated/CustomerUpdated events to a customer Kafka topic. A shipping service listens to an order topic. When an OrderCreated event is read by the shipping service, it will need access to the customer address. Instead of making a REST call to the customer service, the shipping service will already have the user information available locally. It is kept in a KTable/GlobalKTable with persistent storage.
My questions are about how we should implement this: we want this system to be resilient and scalable, so there will be more than one instance of the customer and shipping services, which also means more than one partition for the customer and order topics.
We could run into a scenario like this: an OrderCreated(orderId=1, userId=7, ...) event is read by the shipping service, but if it uses a KTable to keep and access the local user information, userId=7 may not be there, because the partition that handles that userId could have been assigned to another shipping service instance.
Offhand, this problem could be solved by using a GlobalKTable so that all shipping service instances have access to the whole range of customers. Is this (GlobalKTable) the recommended approach to implement that pattern?
Is it a problem to replicate the whole customer dataset in every shipping service instance when the number of customers is very large?
Can/should this case be implemented using a KTable in some way?
You can solve this problem with either a GlobalKTable or a KTable. The former data structure is replicated, so the whole table is available on every node (and uses up more storage). The latter is partitioned, so the data is spread across the various nodes. This has the side effect that, as you say, the partition that handles the userId may not also handle the corresponding customer. You solve this problem by repartitioning one of the streams so they are co-partitioned.
So in your example you need to enrich Order events with Customer information in the Shipping Service. You can either:
a) Use a GlobalKTable of Customer information and join to that on each node (a sketch of this option follows below), or
b) Use a KTable of Customer information and perform the same operation, but before doing the enrichment you must rekey the order stream using the selectKey() operator to ensure the data is co-partitioned (i.e. the same keys will be on the same node); a sketch of this rekey-then-join pattern appears after the Inventory example further down. You also need the same number of partitions in the Customer and Order topics.
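For illustration, here is a minimal sketch of option (a) in the Kafka Streams DSL. The topic names ("customer", "order", "order-enriched"), the Order/Customer/EnrichedOrder types and the Serde parameters are assumptions made for the example, not something given in the question:

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ShippingTopologyGlobal {

    // Hypothetical domain types, for illustration only.
    public record Order(String orderId, String customerId) {}
    public record Customer(String customerId, String address) {}
    public record EnrichedOrder(Order order, String shippingAddress) {}

    // Option (a): join the order stream against a fully replicated GlobalKTable.
    public static void build(StreamsBuilder builder,
                             Serde<Order> orderSerde,
                             Serde<Customer> customerSerde,
                             Serde<EnrichedOrder> enrichedSerde) {
        // Every shipping instance materialises the whole customer topic locally,
        // so no co-partitioning with the order topic is required.
        GlobalKTable<String, Customer> customers =
                builder.globalTable("customer", Consumed.with(Serdes.String(), customerSerde));

        KStream<String, Order> orders =
                builder.stream("order", Consumed.with(Serdes.String(), orderSerde));

        orders.join(
                        customers,
                        (orderKey, order) -> order.customerId(),               // table key to look up
                        (order, customer) -> new EnrichedOrder(order, customer.address()))
              .to("order-enriched", Produced.with(Serdes.String(), enrichedSerde));
    }
}
```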
The Inventory Service example in the Confluent Microservices Examples does something similar: it rekeys the stream of orders so they are partitioned by productId, then joins to a KTable of inventory (also keyed by productId).
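And here is a comparable sketch of option (b), in the same spirit as that Inventory example but staying with the customer/order domain: the order stream is rekeyed by customerId via selectKey() before the KTable join so both sides are co-partitioned. The same caveats apply: topic names, types and Serdes are illustrative assumptions.

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class ShippingTopologyPartitioned {

    // Same hypothetical domain types as in the previous sketch.
    public record Order(String orderId, String customerId) {}
    public record Customer(String customerId, String address) {}
    public record EnrichedOrder(Order order, String shippingAddress) {}

    // Option (b): partitioned KTable plus an explicit rekey of the order stream.
    public static void build(StreamsBuilder builder,
                             Serde<Order> orderSerde,
                             Serde<Customer> customerSerde,
                             Serde<EnrichedOrder> enrichedSerde) {
        // The customer topic becomes a partitioned KTable keyed by customerId;
        // each shipping instance holds only the partitions assigned to it.
        KTable<String, Customer> customers =
                builder.table("customer", Consumed.with(Serdes.String(), customerSerde));

        // Orders arrive keyed by orderId, so rekey them by customerId first.
        // selectKey() marks the stream for repartitioning: before the join, Kafka
        // Streams writes it to an internal topic and reads it back so that matching
        // keys land on the same instance. As noted above, the customer and order
        // topics must have the same number of partitions.
        KStream<String, Order> ordersByCustomer =
                builder.stream("order", Consumed.with(Serdes.String(), orderSerde))
                       .selectKey((orderKey, order) -> order.customerId());

        ordersByCustomer.join(
                        customers,
                        (order, customer) -> new EnrichedOrder(order, customer.address()),
                        Joined.with(Serdes.String(), orderSerde, customerSerde))
                .to("order-enriched", Produced.with(Serdes.String(), enrichedSerde));
    }
}
```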
Regarding your individual questions:
Is GlobalKTable the recommended approach to implement that pattern?
Both work. The GlobalKTable has a longer worst-case reload time if your service loses its storage for whatever reason. The KTable will have slightly greater latency, as the data has to be repartitioned, which means writing it out to Kafka and reading it back again.
Is it a problem to replicate the whole customer dataset in every shipping service instance when the number of customers is very large?
The main difference is the aforementioned worst-case reload time. Technically, a GlobalKTable and a KTable also have slightly different semantics (the GlobalKTable is loaded fully on startup, while the KTable is loaded incrementally based on event time), but that's not strictly relevant to this problem.
Can/should this case be implemented using a KTable in some way?
See above.
See also: Microservice Examples, Quick start, Blog Post.