
Kafka streams: groupByKey and reduce not triggering action exactly once when error occurs in stream


I have a simple Kafka Streams scenario where I do a groupByKey, then a reduce, and then an action. The source topic can contain duplicate events, hence the groupByKey and reduce. The action can fail, and in that case I need the streams app to reprocess the event. In the example below I always throw an error to demonstrate the point.

It is very important that the action happens exactly once: at least once, and never more than once.

The problem is that when the streams app reprocesses the event, the reduce function is called, and because it returns null, the action is not called again. Since only one event was produced to the source topic TOPIC_NAME, I would expect the reduce to have no earlier value to combine it with and to skip straight to the mapValues.

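```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Grouped

// TOPIC_NAME and EventSerde() are defined elsewhere in the application
val topologyBuilder = StreamsBuilder()

topologyBuilder.stream(
    TOPIC_NAME,
    Consumed.with(Serdes.String(), EventSerde())
)
    // collapse duplicate events for the same key
    .groupByKey(Grouped.with(Serdes.String(), EventSerde()))
    .reduce { _, _ ->
        println("reduce hit")
        null
    }
    // the action; it always fails here to demonstrate the problem
    .mapValues { _, v ->
        println("Id: ${v.correlationId}")
        throw Exception("simulate error")
    }
```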
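To make the reduce behaviour concrete, here is a simplified, Kafka-free model of it. It assumes the semantics the output below suggests: the reducer is only called when the key already has a stored value, the first value for a key is stored without calling it, and a null result deletes the key. ReduceModel and demo are hypothetical names for illustration, not part of the Kafka Streams API.

```kotlin
// Simplified model of groupByKey().reduce() semantics.
// ReduceModel is a hypothetical name, not a Kafka Streams class.
class ReduceModel<K : Any, V : Any>(private val reducer: (V, V) -> V?) {
    // Stands in for the persistent state store behind the KTable
    private val store = mutableMapOf<K, V>()

    /** Applies one input record and returns the new table value (null = deleted). */
    fun update(key: K, value: V): V? {
        val old = store[key]
        // The reducer is skipped for the first value seen for a key
        val updated = if (old == null) value else reducer(old, value)
        if (updated == null) {
            // A null result removes the key instead of storing a value
            store.remove(key)
        } else {
            store[key] = updated
        }
        return updated
    }
}

// Feeds the same record twice, like the reprocessing in the question
fun demo(): List<String> {
    val calls = mutableListOf<String>()
    val table = ReduceModel<String, String> { _, _ -> calls += "reduce hit"; null }
    calls += "first: ${table.update("k", "evt-1")}"
    calls += "second: ${table.update("k", "evt-1")}"
    return calls
}
```

demo() returns the list [first: evt-1, reduce hit, second: null]: once a value is stored for the key, a repeated record goes through the reducer, and its null result deletes the entry rather than producing a value for the next step.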

To reproduce the issue, I run the streams app twice. This is the output:

First run

Id: 90e6aefb-8763-4861-8d82-1304a6b5654e
11:10:52.320 [test-app-dcea4eb1-a58f-4a30-905f-46dad446b31e-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [test-app-dcea4eb1-a58f-4a30-905f-46dad446b31e] All stream threads have died. The instance will be in error state and should be closed.

Second run

reduce hit

As you can see, mapValues is not called on the second run, even though the error on the first run caused the streams app to reprocess the same event.

Is it possible to have a streams app reprocess an event through a reduce step as if it had never seen the event before? Or is there a better approach to what I'm doing?


Solution

  • I was missing a property setting for the streams app.

    props["processing.guarantee"] = "exactly_once"
    

    With this setting, the state-store updates made while processing the event are committed in the same transaction as the consumer offsets, so they are rolled back if an exception is thrown and the streams app crashes.

    The problem was that the streams app picked up the event again to reprocess it, but the state of the reduce step had already been persisted. Enabling exactly_once ensures that the reducer state is rolled back as well.

    It now reprocesses the event as if it had never seen it before.
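The difference between the two guarantees can be sketched as a small, Kafka-free model. It is a simplification under stated assumptions: Guarantee, ReduceThenAct and replayAfterFailure are hypothetical names, not Kafka Streams API. At-least-once writes the reduced value straight into the store, so the value survives the crash, as the question's second run suggests. Exactly-once works on a copy that is only committed, together with the input offset, once processing succeeds. The real library achieves this by aborting a Kafka transaction and restoring state from the changelog, not by copying a map. The same record is delivered twice and the action fails only on its first call.

```kotlin
// Hypothetical model contrasting the two processing guarantees.
enum class Guarantee { AT_LEAST_ONCE, EXACTLY_ONCE }

class ReduceThenAct(
    private val guarantee: Guarantee,
    private val reducer: (String, String) -> String?,
    private val action: (String) -> Unit
) {
    // The state a restarted instance would see after a crash
    private val committedStore = mutableMapOf<String, String>()

    fun process(key: String, value: String) {
        // EXACTLY_ONCE: write to a scratch copy, committed only on success.
        // AT_LEAST_ONCE: write straight to the store, so updates outlive a crash.
        val store =
            if (guarantee == Guarantee.EXACTLY_ONCE) committedStore.toMutableMap() else committedStore
        val old = store[key]
        val updated = if (old == null) value else reducer(old, value)
        if (updated == null) {
            store.remove(key)
        } else {
            store[key] = updated
            action(updated) // may throw, i.e. the app crashes before committing
        }
        // Reached only if nothing threw: commit the scratch copy
        if (store !== committedStore) {
            committedStore.clear()
            committedStore.putAll(store)
        }
    }
}

/** Delivers the same record twice; the action fails only on its first call. */
fun replayAfterFailure(guarantee: Guarantee): List<String> {
    val log = mutableListOf<String>()
    var actionCalls = 0
    val app = ReduceThenAct(
        guarantee,
        reducer = { _, _ -> log += "reduce hit"; null },
        action = { v ->
            actionCalls++
            log += "action $v"
            if (actionCalls == 1) throw IllegalStateException("simulate error")
        }
    )
    runCatching { app.process("k", "evt-1") } // first run: the action throws
    runCatching { app.process("k", "evt-1") } // offset never committed, record is re-read
    return log
}
```

Under AT_LEAST_ONCE the log is [action evt-1, reduce hit], matching the question's two runs; under EXACTLY_ONCE it is [action evt-1, action evt-1], so the action is retried. In a real application the switch is only the processing.guarantee property. On Kafka Streams 3.0 and later, exactly_once is deprecated in favour of exactly_once_v2 (StreamsConfig.EXACTLY_ONCE_V2), which requires brokers on version 2.5 or newer. The transaction only covers what Kafka manages (consumed offsets, state-store changelogs and output topics): if the action has an external side effect, such as an HTTP call or a database write, and fails after that effect has happened, the effect can still be repeated on replay, as the two "action evt-1" entries in the model show.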