java, apache-kafka, apache-kafka-streams, spring-cloud-stream-binder-kafka

How to wait for future inside Kafka Stream map()?


I am implementing a Spring Boot application in Java, using Spring Cloud Stream with the Kafka Streams binder.

I need to perform a blocking operation inside the KStream map method, like so:

public Consumer<KStream<?, ?>> sink() {
    return input -> input
        .mapValues(value -> methodReturningCompletableFuture(value).get())
        .foreach((key, value) -> otherMethod(key, value));
}

CompletableFuture.get() throws checked exceptions (InterruptedException, ExecutionException).

How to handle these exceptions so that the chained method doesn't get executed and the Kafka message is not acknowledged? I cannot afford message loss, sending it to a dead letter topic is not an option.

Is there a better way of blocking inside map()?


Solution

  • You can try the branching feature in Kafka Streams to control whether the chained methods execute. Here is some pseudo-code you can use as a starting point and adapt to your particular use case.

    final Map<String, ? extends KStream<?, String>> branches =
        input.split(Named.as("split-"))
             .branch((k, v) -> {
                 try {
                     methodReturningCompletableFuture(v).get();
                     return true;
                 }
                 catch (Exception e) {
                     return false;
                 }
             }, Branched.as("good-records"))
             .defaultBranch();

    // Branch names in the resulting map are prefixed with the name given to split().
    final KStream<?, String> kStream = branches.get("split-good-records");

    kStream.foreach((key, value) -> otherMethod(key, value));
    

    The idea here is that you send only the records that didn't throw an exception to the named branch good-records; everything else goes into the default branch, which is simply ignored in this pseudo-code. Then you invoke the additional chained methods (as the foreach call shows) only for those "good" records.

    This does not solve the problem of not acknowledging the message after an exception is thrown, which is more challenging. However, I am curious about that use case: when an exception happens and you handle it, why don't you want to ack the message? The requirements seem a bit rigid without using a DLT. Ideally, you would introduce some retries and, once the retries are exhausted, send the record to a DLT, which lets the Kafka Streams consumer acknowledge the message. The application then moves on to the next offset.
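    To make the retry idea concrete, here is a minimal sketch of a retrying wrapper around the blocking call. Everything here (the getWithRetries helper, the attempt count, the per-attempt timeout) is a hypothetical illustration, not part of Kafka Streams or Spring Cloud Stream:

    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;

    public class RetryingGet {

        // Hypothetical helper: blocks on the future produced by the supplier,
        // retrying up to maxAttempts times before rethrowing the last failure.
        // Each attempt gets its own future and its own 5-second timeout.
        static <T> T getWithRetries(Supplier<CompletableFuture<T>> supplier,
                                    int maxAttempts) throws Exception {
            Exception last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return supplier.get().get(5, TimeUnit.SECONDS);
                } catch (Exception e) {
                    last = e; // remember the failure and try again
                }
            }
            throw last; // all attempts exhausted; caller can route to a DLT here
        }

        public static void main(String[] args) throws Exception {
            // A future that completes immediately: the first attempt succeeds.
            String ok = getWithRetries(
                    () -> CompletableFuture.completedFuture("done"), 3);
            System.out.println(ok); // prints "done"
        }
    }
    ```

    You would call such a helper from inside the branch predicate (or a mapValues), and only after it throws would the record fall through to the default branch or a DLT.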

    Assuming that methodReturningCompletableFuture() returns a CompletableFuture, the call methodReturningCompletableFuture(value).get() blocks until the future completes; only the timed overload get(timeout, unit) gives up after a timeout. Therefore, that is already a reasonable way to wait inside the KStream map operation. I don't think anything else is necessary to make it wait.
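    For reference, here is a small stand-alone sketch of that waiting behavior, with no Kafka involved: get() with no arguments blocks until the future completes, while the timed overload throws a TimeoutException if the future is not done in time:

    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class FutureWait {
        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newSingleThreadExecutor();

            // get() with no arguments blocks the caller until the future completes.
            CompletableFuture<String> fast =
                    CompletableFuture.supplyAsync(() -> "ready", pool);
            System.out.println(fast.get()); // prints "ready"

            // get(timeout, unit) gives up after the timeout with a TimeoutException.
            CompletableFuture<String> slow = new CompletableFuture<>(); // never completed
            try {
                slow.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                System.out.println("timed out"); // prints "timed out"
            }
            pool.shutdown();
        }
    }
    ```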