scala, apache-kafka, akka, akka-stream, akka-kafka

Using reactive-kafka to conditionally process messages


I've been trying to use reactive-kafka, and I'm having a problem with conditional processing for which I haven't found a satisfying answer.

Basically, I'm trying to consume one Kafka topic that contains a huge number of messages (around 10 billion a day), process only the few of them (a few thousand a day) that match some property, and then push the processed version of each message to another topic. I'm struggling to do that properly.

My first attempt was something like:

// Simplified: settings construction and error handling omitted;
// process(msg) is assumed to build the outgoing ProducerRecord.
Consumer.committableSource(consumerSettings, Subscriptions.topics("source-topic"))
    .filter(isProcessable)
    .map(msg => ProducerMessage.Message(process(msg), msg.committableOffset))
    .via(Producer.flow(producerSettings))
    .mapAsync(1)(_.message.passThrough.commitScaladsl())
    .runWith(Sink.ignore)

The problem with this approach is that I only commit offsets for messages that I am able to process, which is obviously not cool: if I have to stop and restart my program, I have to re-read a bunch of useless messages, and since there are so many of them, I can't afford to do it that way.

I then tried to use the GraphDSL, doing something along the lines of:

in ~> broadcast ~> isProcessable    ~> process ~> producer ~> merge ~> commit
      broadcast ~> isNotProcessable                         ~> merge
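
In actual code, that shape would look roughly like this (isProcessable, process and the settings values are placeholders for my real ones):

    import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
    import akka.kafka.scaladsl.{Consumer, Producer}
    import akka.kafka.{ProducerMessage, Subscriptions}
    import akka.stream.ClosedShape
    import akka.stream.scaladsl._

    type Msg = CommittableMessage[String, String]

    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val in        = Consumer.committableSource(consumerSettings, Subscriptions.topics("source-topic"))
      val broadcast = b.add(Broadcast[Msg](2))
      val merge     = b.add(Merge[CommittableOffset](2))

      // Processable messages are produced to the target topic; their
      // offsets come back out of the producer via the pass-through slot.
      val processed = Flow[Msg]
        .filter(isProcessable)
        .map(msg => ProducerMessage.Message(process(msg), msg.committableOffset))
        .via(Producer.flow(producerSettings))
        .map(_.message.passThrough)

      // Everything else skips straight to the commit.
      val skipped = Flow[Msg].filterNot(isProcessable).map(_.committableOffset)

      val commit = Flow[CommittableOffset]
        .mapAsync(1)(_.commitScaladsl())
        .to(Sink.ignore)

      in ~> broadcast ~> processed ~> merge ~> commit
            broadcast ~> skipped   ~> merge
      ClosedShape
    })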

This solution is obviously not good either: messages that I can't process go through the second branch of the graph and get committed before the processable messages have actually been pushed to their destination, which is in a way worse than the first attempt because it doesn't even guarantee at-least-once delivery.

Does anybody have an idea how I could solve this problem?


Solution

  • A way I have used to solve a similar problem before is to exploit sequence numbers to guarantee ordering.

    For instance, you could build a flow like the one you describe, minus the commit step:

    in ~> broadcast ~> isProcessable    ~> process ~> producer ~> merge ~> out
          broadcast ~> isNotProcessable                         ~> merge
    

    And then wrap it into an order-preserving flow like this one (taken from a library we developed at my company): OrderPreservingFlow. The resulting flow could then be sent to the committer sink.
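
    In other words, something like this (illustrative; orderedFlow stands for the branching flow above wrapped so that it emits committable offsets back in the original consumption order):

        // Illustrative: orderedFlow is the branching flow above, wrapped so
        // that offsets come out in consumption order and can be committed
        // sequentially.
        Consumer.committableSource(consumerSettings, Subscriptions.topics("source-topic"))
          .via(orderedFlow)
          .mapAsync(1)(_.commitScaladsl())
          .runWith(Sink.ignore)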

    If your processing stage guarantees ordering, you can be even more efficient and avoid any buffering by embedding the logic directly in your graph:

    in ~> injectSeqNr ~> broadcast ~> isProcessable    ~> process ~> producer ~> mergeNextSeqNr ~> commit
                         broadcast ~> isNotProcessable                        ~> mergeNextSeqNr
    

    Here mergeNextSeqNr is just a modified merge stage where, if an element is available on one input and its sequence number is the expected one, you emit it immediately; otherwise you wait for data to become available on the other input.
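
    For illustration (a sketch, not our production code), such a stage could be written as a custom GraphStage along these lines, assuming each element carries a sequence number that a seqNr function can extract, and that sequence numbers are contiguous across the two branches:

        import akka.stream.{Attributes, Inlet, Outlet, UniformFanInShape}
        import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

        // A two-input merge that emits elements strictly in sequence-number
        // order, buffering at most one element per input.
        final class MergeNextSeqNr[T](seqNr: T => Long, first: Long = 0L)
            extends GraphStage[UniformFanInShape[T, T]] {

          val in0: Inlet[T]  = Inlet("MergeNextSeqNr.in0")
          val in1: Inlet[T]  = Inlet("MergeNextSeqNr.in1")
          val out: Outlet[T] = Outlet("MergeNextSeqNr.out")
          override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in0, in1)

          override def createLogic(attrs: Attributes): GraphStageLogic =
            new GraphStageLogic(shape) {
              private var expected = first
              private val inlets   = Array(in0, in1)
              private val buffered = Array.fill[Option[T]](2)(None)

              // Push the buffered element carrying the expected sequence
              // number, if any, then ask that input for its next element.
              private def tryEmit(): Unit = {
                if (isAvailable(out)) {
                  var i = 0
                  while (i < 2) {
                    buffered(i) match {
                      case Some(elem) if seqNr(elem) == expected =>
                        push(out, elem)
                        expected += 1
                        buffered(i) = None
                        if (!isClosed(inlets(i))) pull(inlets(i))
                        i = 2 // at most one push per downstream pull
                      case _ =>
                        i += 1
                    }
                  }
                }
                // Complete once both inputs are exhausted and nothing is buffered.
                if (buffered.forall(_.isEmpty) && inlets.forall(isClosed)) completeStage()
              }

              inlets.zipWithIndex.foreach { case (in, i) =>
                setHandler(in, new InHandler {
                  override def onPush(): Unit = { buffered(i) = Some(grab(in)); tryEmit() }
                  override def onUpstreamFinish(): Unit = tryEmit()
                })
              }

              setHandler(out, new OutHandler {
                override def onPull(): Unit = {
                  // Request an element from each input we are not already waiting on.
                  inlets.foreach(in => if (!isClosed(in) && !hasBeenPulled(in)) pull(in))
                  tryEmit()
                }
              })
            }
        }

    Because each branch preserves order, the globally next element is always the head of one of the two branches, so one buffered element per input is enough.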

    The end result should be exactly the same as with the flow-wrapping approach above, but embedding the logic may make it easier to adapt to your needs.
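
    To make the embedded variant concrete: injectSeqNr can be as simple as zipWithIndex, with the sequence number carried through the producer in the pass-through slot (again a sketch; names and types are illustrative):

        // Illustrative wiring, reusing MergeNextSeqNr from above.
        final case class Sequenced[T](nr: Long, elem: T)
        type Msg = CommittableMessage[String, String]

        // Pair every incoming message with a monotonically increasing number.
        val injectSeqNr = Flow[Msg].zipWithIndex.map { case (m, nr) => Sequenced(nr, m) }

        val processable = Flow[Sequenced[Msg]]
          .filter(s => isProcessable(s.elem))
          .map(s => ProducerMessage.Message(process(s.elem), Sequenced(s.nr, s.elem.committableOffset)))
          .via(Producer.flow(producerSettings))
          .map(_.message.passThrough) // the sequence number survives the producer

        val notProcessable = Flow[Sequenced[Msg]]
          .filterNot(s => isProcessable(s.elem))
          .map(s => Sequenced(s.nr, s.elem.committableOffset))

        val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
          import GraphDSL.Implicits._
          val bcast = b.add(Broadcast[Sequenced[Msg]](2))
          val merge = b.add(new MergeNextSeqNr[Sequenced[CommittableOffset]](_.nr))
          val commit = Flow[Sequenced[CommittableOffset]]
            .mapAsync(1)(_.elem.commitScaladsl())
            .to(Sink.ignore)

          Consumer.committableSource(consumerSettings, Subscriptions.topics("source-topic"))
            .via(injectSeqNr) ~> bcast ~> processable    ~> merge.in(0)
                                 bcast ~> notProcessable ~> merge.in(1)
                                                            merge.out ~> commit
          ClosedShape
        })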