Tags: apache-kafka, spring-cloud, avro, apache-kafka-streams, spring-cloud-stream

How to ignore some kinds of messages in a Kafka Streams Application that reads and writes different event types from the same topic


Let's suppose a Spring Cloud Stream application creates a KStream from an order topic. It is interested in OrderCreated {"id":x, "productId": y, "customerId": z} events. Once one arrives, it processes it and generates an output event OrderShipped {"id":x, "productId": y, "customerName": w, "customerAddress": z} to the same order topic.

The problem I am facing is that, since it reads and writes from/to the same topic, the Kafka Streams application tries to process its own writes, which doesn't make sense.

How could I prevent this application from processing the events it generates?

UPDATE: As Artem Bilan and sobychako point out, I had already considered using KStream.filter(), but there are some details that leave me unsure how to deal with this:

Right now the KStream application looks like this:

interface ShippingKStreamProcessor {
    ...
    @Input("order")
    fun order(): KStream<Int, OrderCreated>

    @Output("output")
    fun output(): KStream<Int, OrderShipped>
}

KStream configuration:

    @StreamListener
    @SendTo("output")
    fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> {
        ...
    }

Both order and output bindings point at the order topic as destination.
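For reference, a minimal sketch of how those bindings might look in application.yml (standard Spring Cloud Stream properties; the binding names come from the interface above):

spring:
  cloud:
    stream:
      bindings:
        order:            # input binding from the interface above
          destination: order
        output:           # output binding, pointing at the same topic
          destination: order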

OrderCreated class:

data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) {
    constructor() : this(null, null, null)
}

OrderShipped class:

data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) {
    constructor() : this(null, null, null, null)
}

I am using JSON as the message format, so the messages look like this:

  • INPUT - OrderCreated: {"id":1, "productId": 7,"customerId": 20}
  • OUTPUT - OrderShipped: {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}

I'm looking for the best approach to filter out unwanted messages considering this:

If I just use KStream.filter() right now, then when {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"} arrives, my KStream<Int, OrderCreated> would unmarshal the OrderShipped event as an OrderCreated object with some null fields: OrderCreated(id=1, productId=7, customerId=null). Checking for null fields doesn't sound robust.
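To illustrate (a minimal, hypothetical sketch using plain Jackson, assuming unknown JSON properties are ignored the way a lenient message converter would):

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper

fun main() {
    // An OrderShipped payload arriving on a KStream<Int, OrderCreated>...
    val json = """{"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}"""
    val mapper = ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    // ...silently becomes an OrderCreated with a null customerId.
    println(mapper.readValue(json, OrderCreated::class.java))
    // prints: OrderCreated(id=1, productId=7, customerId=null)
}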

A possible solution could be to add another field, eventType = OrderCreated|OrderShipped, to every message/class that uses the topic. Even then, I would end up with an OrderCreated object (remember KStream<Int, OrderCreated>) carrying the attribute eventType=OrderShipped. This looks like an ugly workaround. Any idea to improve it?
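If the discriminator route were taken, one common way to express it is Jackson's polymorphic type handling; this is only an illustrative sketch (the OrderEvent base class is not part of the original model, and both data classes would have to extend it):

import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo

// Jackson writes/reads an "eventType" property and picks the concrete subtype
// from it, so the stream could be typed KStream<Int, OrderEvent> and filtered
// with `filter { _, event -> event is OrderCreated }` instead of null checks.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "eventType")
@JsonSubTypes(
    JsonSubTypes.Type(value = OrderCreated::class, name = "OrderCreated"),
    JsonSubTypes.Type(value = OrderShipped::class, name = "OrderShipped")
)
abstract class OrderEvent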

Is there another, more automatic way to deal with this? For instance, would another kind of serialization (Avro?) prevent messages from being processed when they don't comply with the expected schema (OrderCreated)? Supporting multiple schemas (event types) in the same topic this way seems to be good practice according to this article: https://www.confluent.io/blog/put-several-event-types-kafka-topic/ However, it's not clear how to unmarshal/deserialize the different types.


Solution

  • You could use Kafka's record headers to store the type of the record. See KIP-82. You can set the headers in ProducerRecord.

    The processing would be as follows:

    1. Read a stream of type KStream<Integer, Bytes> with value serde Serdes.BytesSerde from the topic.
    2. Use KStream#transformValues() to filter and create the objects. More specifically, within transformValues() you can access the ProcessorContext, which exposes the record headers carrying the record's type. Then:

      • If the type is OrderShipped, return null.
      • Otherwise, create an OrderCreated object from the Bytes object and return it (see the sketch below).
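
    A minimal sketch of those two steps in Kotlin (the "eventType" header name and the surrounding helper are assumptions for illustration; ProcessorContext#headers() requires Kafka 2.0+):

    import com.fasterxml.jackson.databind.ObjectMapper
    import org.apache.kafka.common.utils.Bytes
    import org.apache.kafka.streams.kstream.KStream
    import org.apache.kafka.streams.kstream.ValueTransformer
    import org.apache.kafka.streams.kstream.ValueTransformerSupplier
    import org.apache.kafka.streams.processor.ProcessorContext

    // Drops records whose "eventType" header says OrderShipped and
    // deserializes the remaining ones into OrderCreated objects.
    fun onlyOrderCreated(input: KStream<Int, Bytes>): KStream<Int, OrderCreated> {
        val mapper = ObjectMapper()
        return input
            .transformValues(ValueTransformerSupplier {
                object : ValueTransformer<Bytes, OrderCreated?> {
                    private lateinit var context: ProcessorContext

                    override fun init(context: ProcessorContext) {
                        this.context = context
                    }

                    override fun transform(value: Bytes): OrderCreated? {
                        val type = context.headers().lastHeader("eventType")
                            ?.let { String(it.value()) }
                        // The application's own OrderShipped writes are skipped here.
                        return if (type == "OrderShipped") null
                        else mapper.readValue(value.get(), OrderCreated::class.java)
                    }

                    override fun close() {}
                }
            })
            .filter { _, event -> event != null }
            .mapValues { event -> event!! }
    }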

    For a solution with Avro, you might want to have a look at the following docs.