Tags: apache-kafka, streaming, kafka-consumer-api, in-memory-database, apache-kafka-streams

Kafka validate messages in stateful processing


I have an application where multiple users can send REST operations to modify the state of shared objects. When an object is modified, several actions follow (DB update, audit, logging...).

Not all operations are valid: for example, you cannot modify an object after it has been deleted.

Using Kafka I was thinking about the following architecture:

  1. REST operations are queued in a Kafka topic.
  2. Operations on the same object go to the same partition, so all of an object's operations are in sequence and processed by a single consumer.
  3. Consumers listen to a partition and validate each operation using an in-memory database.
  4. If the operation is valid, it is sent to a "Valid operation topic"; otherwise it is sent to an "Invalid operation topic".
  5. Other consumers (DB, log, audit) listen to the "Valid operation topic".
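Steps 3 and 4 boil down to pure routing logic. A minimal sketch of what I have in mind, with a plain in-memory map standing in for the database (all names here — `OpType`, `ObjectState`, the topic names — are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class OperationRouter {
    // Hypothetical operation kinds on a shared object.
    enum OpType { CREATE, MODIFY, DELETE }

    // The validator only needs a few bytes per object, not the full 10 MB object.
    enum ObjectState { ABSENT, ALIVE, DELETED }

    private final Map<String, ObjectState> state = new HashMap<>();

    /**
     * Decides the destination topic for an operation on objectId,
     * updating the tracked state when the operation is accepted.
     */
    public String route(String objectId, OpType op) {
        ObjectState current = state.getOrDefault(objectId, ObjectState.ABSENT);
        switch (op) {
            case CREATE:
                if (current != ObjectState.ABSENT) return "invalid-operations";
                state.put(objectId, ObjectState.ALIVE);
                return "valid-operations";
            case MODIFY:
                // e.g. rejecting a modify after a delete
                if (current != ObjectState.ALIVE) return "invalid-operations";
                return "valid-operations";
            case DELETE:
                if (current != ObjectState.ALIVE) return "invalid-operations";
                state.put(objectId, ObjectState.DELETED);
                return "valid-operations";
        }
        return "invalid-operations";
    }
}
```

The open question is what backs the `state` map in production.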

I am not very sure about point 3. I don't like the idea of keeping the state of all my objects. (I have billions of objects, and even if an object can be 10 MB in size, what I need to store to validate its state is just a few KB...)

However, is this a common pattern? If not, how can you verify the validity of such operations?

Also, what would you use as an in-memory database? It surely has to be highly available, fault-tolerant, and support transactional reads and writes.


Solution

  • I believe this is a very valid pattern; it is essentially a variation on an event-sourced CQRS pattern.

    For example, Lagom implements its CQRS persistence in a very similar fashion (although based on a completely different toolset).

    A few points:

    • you are right about the need for sequential operations: since each state mutation must be based on the result of the previous one, there must be a strict order of execution. This is very often the case, so we want to scale horizontally as much as possible: each object's sequence of operations runs in order, but in parallel with many other sequences. In your case there is one such sequence per shared object.
    • Relying on Kafka partitioning by key is a good way to achieve that (assuming you set max.in.flight.requests.per.connection to 1 or enable the idempotent producer; with the default value of 5, retries can reorder messages). Here again Lagom takes a similar approach by making its persistent entities distributed and single-threaded. I'm not saying Lagom is better, I'm just reassuring you that this approach is used by others :)

    • a key aspect of your pattern is the transformation of a Command into an Event: in that jargon, a command is a request to change state and may be rejected for various reasons, whereas an event describes a state update that has already happened and is irrefutable from the point of view of those who receive it: an event always tells the truth. The process you describe is a controller sitting at the boundary between the two: it is responsible for turning commands into events.

    • In that sense, the "Valid operation topic" you mention would be an event-sourced description of your process's state updates. Since it is all backed by Kafka, it is arbitrarily partitionable and thus scalable, which is awesome :)
    • Don't worry about the size of the state of all your objects; it has to sit somewhere somehow. Since this controller transforms commands into events, it becomes the primary source of truth for each object and is responsible for storing it: the controller holds the primary storage for your events, so you must provision space for it. You can use Kafka Streams's key-value stores: they are local to each processing instance, but if you make them persistent they have no problem handling data much bigger than the available RAM. Behind the scenes, data is spilled to disk thanks to RocksDB, and even further behind the scenes it is all event-sourced to a Kafka changelog topic, so your state store is replicated and will be transparently re-created on another machine if necessary.
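    The command/event split and the changelog-based restore described above can be sketched in plain Java, with a list standing in for the changelog topic (the class names, the "DELETE" action string, and the single deleted-flag state are all illustrative simplifications, not Lagom's or Kafka's API):

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    public class CommandController {
        // A command is a *request* to change state; it may be rejected.
        record Command(String objectId, String action) {}
        // An event describes a change that *did* happen; it is never rejected downstream.
        record Event(String objectId, String action) {}

        // The few bytes of per-object state needed for validation.
        private final Map<String, Boolean> deleted = new HashMap<>();
        // Stand-in for the changelog topic backing the state store.
        private final List<Event> changelog = new ArrayList<>();

        /** Validates a command; emits an event only if the command is accepted. */
        public Optional<Event> handle(Command cmd) {
            if (Boolean.TRUE.equals(deleted.get(cmd.objectId()))) {
                return Optional.empty(); // reject: object was already deleted
            }
            if (cmd.action().equals("DELETE")) {
                deleted.put(cmd.objectId(), true);
            }
            Event event = new Event(cmd.objectId(), cmd.action());
            changelog.add(event); // state can be rebuilt by replaying these
            return Optional.of(event);
        }

        /** Rebuilds the validation state by replaying the changelog, as a restore would. */
        public static Map<String, Boolean> restore(List<Event> log) {
            Map<String, Boolean> restored = new HashMap<>();
            for (Event e : log) {
                if (e.action().equals("DELETE")) restored.put(e.objectId(), true);
            }
            return restored;
        }
    }
    ```

    Downstream consumers (DB, log, audit) only ever see the emitted events, which is what makes them irrefutable.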
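    As for the per-object ordering guarantee: Kafka's default partitioner hashes the record key (using murmur2) so that every record with the same key lands on the same partition. A simplified stand-in shows the idea (this is not Kafka's actual hash, just the modulo-over-partitions scheme):

    ```java
    public class KeyPartitioning {
        /**
         * Simplified stand-in for Kafka's keyed partitioner: same key,
         * same partition, hence a strict per-object ordering.
         * (The real partitioner uses murmur2 rather than String.hashCode.)
         */
        static int partitionFor(String key, int numPartitions) {
            // Mask the sign bit so the result is always a valid partition index.
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }
    ```

    Because the mapping is deterministic, all operations for one shared object form a single totally ordered sub-stream, while different objects are processed in parallel across partitions.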

    I hope this helps you finalise your design :)