Tags: java, spring-kafka, spring-aop, spring-cloud-stream, spring-cloud-stream-binder-kafka

Java - Exceptions ignored while processing on separate thread within annotation


We have an annotation which allows us to consume Kafka messages using a Polled Consumer. It's designed for long-running jobs so that one thread is processing the message, while the other remains available for polling to prevent Kafka from thinking our service has failed and rebalancing the consumers.

We're using Spring AOP.

Class:

@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {

  private final ExecutorService executor = Executors.newFixedThreadPool(1);

  private volatile boolean paused = false;

  @Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
  public void receiveMessage(ProceedingJoinPoint joinPoint,
      PollableStreamListener pollableStreamListener, Object dataCapsule) {
    if (dataCapsule instanceof Message) {
      Message<?> message = (Message<?>) dataCapsule;
      AcknowledgmentCallback callback = StaticMessageHeaderAccessor
          .getAcknowledgmentCallback(message);
      callback.noAutoAck();

      if (!paused) {
        // The separate thread is not busy with a previous message, so process this message:
        Runnable runnable = () -> {
          try {
            paused = true;

            // Call method to process this Kafka message
            joinPoint.proceed();

            callback.acknowledge(Status.ACCEPT);
          } catch (Throwable e) {
            callback.acknowledge(Status.REJECT);
            throw new PollableStreamListenerException(e);
          } finally {
            paused = false;
          }
        };

        executor.submit(runnable);
      } else {
        // The separate thread is busy with a previous message, so re-queue this message for later:
        callback.acknowledge(Status.REQUEUE);
      }
    }
  }

}

We only want to process one message at a time, so the paused flag is used to decide whether the message should be processed now, or re-queued for later.

The Runnable is used to process the message (using joinPoint.proceed()), then acknowledge the message back to Kafka. This is done in a separate thread via the executor.

I've found that when an exception is thrown inside the runnable, it is caught by the catch block, but the new PollableStreamListenerException that we then throw is not propagated, so the failure never ends up in the DLQ.

I believe this is because the execution is happening on another thread: the main thread has moved on (it does not wait for the second thread to finish processing the message), so it can no longer propagate the failure to the DLQ. However, I might be mistaken, since I'm not very familiar with how multi-threading works.
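To illustrate the behaviour described above, here is a minimal, Spring-free sketch. The class name `SubmitSwallowsDemo` and its `demo()` method are made up for illustration. It relies only on the documented behaviour of `ExecutorService.submit`: an exception thrown by the task is stored in the returned `Future`, and the submitting thread only sees it if it calls `get()`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitSwallowsDemo {

  // Submits a failing task and reports how the failure becomes visible.
  static String demo() {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    try {
      Runnable task = () -> {
        throw new IllegalStateException("processing failed");
      };

      // submit() returns normally: the exception is captured in the Future
      // and never reaches the submitting thread on its own.
      Future<?> future = executor.submit(task);

      try {
        // Only get() surfaces the failure, and it blocks until the task ends.
        future.get();
        return "no exception";
      } catch (ExecutionException e) {
        return "get() rethrew: " + e.getCause().getMessage();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return "interrupted";
      }
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

If the `get()` call is dropped, nothing in this program ever observes the `IllegalStateException`, which matches the symptom of the failure never reaching the DLQ.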

I've tried changing executor.submit(runnable) to executor.submit(runnable).get(), which solves the issue. However, it blocks the main thread until the other thread has finished executing, which means the main thread is no longer available to poll for new messages. In effect, our Kafka consumer is no longer a pollable consumer, which defeats the whole purpose of the annotation.

Does anyone know whether it's possible to keep the main thread running and polling for messages while still having exceptions thrown inside the runnable propagated to the DLQ?

Thanks in advance for your help.


To provide a bit more context, we use the annotation as follows:

  @PollableStreamListener
  public void submitDeletion(Message<?> received) {
    // Process message
  }

The submitDeletion method is called whenever a new message is received on the pollable message source. We check for new messages using a @Scheduled method:

  @Scheduled(fixedDelayString = "${app.pollable-consumer.time-interval}")
  public void pollForDeletionRequest() {
    log.trace("Polling for new messages");
    cleanupInput.poll(cleanupSubmissionService::submitDeletion);
  }

Update: attempting to use CompletableFuture

As per the comment made by @kriegaex, I've attempted to use a CompletableFuture. I simplified my example so that it is more of a POC.

@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {

  private final ExecutorService executor = Executors.newFixedThreadPool(1);

  private volatile boolean paused = false;

  @Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
  public void receiveMessage(ProceedingJoinPoint joinPoint,
      PollableStreamListener pollableStreamListener, Object dataCapsule) {
    if (dataCapsule instanceof Message) {
      Message<?> message = (Message<?>) dataCapsule;
      AcknowledgmentCallback callback = StaticMessageHeaderAccessor
          .getAcknowledgmentCallback(message);
      callback.noAutoAck();

      if (!paused) {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
          log.info("Start execution logging");
          try {
            Thread.sleep(10000);
          } catch (Exception e) {
            log.error("Error while sleeping", e);
          }
          log.info("End execution logging");
          throw new RuntimeException("Throwing exception to force handle statement");
        }, executor).handle((s, t) -> {
          log.info("Inside handle block:");

          if (t != null) {
            log.info(t.toString());
            throw new RuntimeException(t);
          }
          return null;
        });

        try {
          completableFuture.join();
        } catch (Exception e) {
          log.error("Error while doing join()", e);
        }

        callback.acknowledge(Status.ACCEPT);
      } else {
        // The separate thread is busy with a previous message, so re-queue this message for later:
        callback.acknowledge(Status.REQUEUE);
      }
    }
  }

}

I run the function inside the CompletableFuture using my ExecutorService instance. It throws an exception, which is handled inside the .handle() block.
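As a Spring-free sketch of what happens to an exception rethrown from handle(): the class name `HandleRethrowDemo` and its helper methods are made up for illustration. The exception does not travel to the thread that built the chain. It completes the future returned by handle() exceptionally, and becomes visible only when something calls join() or get() on that future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class HandleRethrowDemo {

  // Walks the cause chain down to the original exception.
  private static Throwable rootCause(Throwable t) {
    while (t.getCause() != null) {
      t = t.getCause();
    }
    return t;
  }

  static String demo() {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    try {
      CompletableFuture<Void> future = CompletableFuture
          .<Void>supplyAsync(() -> {
            throw new IllegalStateException("processing failed");
          }, executor)
          .handle((result, error) -> {
            if (error != null) {
              // Rethrowing here only marks the returned future as failed.
              throw new RuntimeException(error);
            }
            return result;
          });

      // Building the chain raised nothing on this thread. The failure is
      // observable only by asking the future for its result:
      try {
        future.join();
        return "completed normally";
      } catch (CompletionException e) {
        return "join() threw " + e.getClass().getSimpleName()
            + ", root cause: " + rootCause(e).getMessage();
      }
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```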

My first test did not include the join() call. In that version, the function ran on another thread without blocking the main thread from polling, but the exception thrown inside handle() was not propagated to the DLQ. The missing lines were:

        try {
          completableFuture.join();
        } catch (Exception e) {
          log.error("Error while doing join()", e);
        }

Then I added those lines of code in, and found that it starts to block execution of the main thread while waiting for the completableFuture to finish running.

In short, the behaviour of using CompletableFuture seems to be the same as what I found for the initial Runnable for my purposes.
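One way to avoid choosing between blocking on join() and losing the exception is to react to the failure in a callback instead of on the calling thread. The sketch below is Spring-free, and the names `NonBlockingFailureDemo`, `processAsync` and `onFailure` are hypothetical. `onFailure` stands in for whatever should happen with a failed message. The exception still does not propagate back to the poller, so the framework's own DLQ handling would not be triggered by it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class NonBlockingFailureDemo {

  // Starts the task on the executor and returns immediately.
  // Failures are handed to onFailure once the task has finished.
  static CompletableFuture<Void> processAsync(
      Runnable task, ExecutorService executor, Consumer<Throwable> onFailure) {
    return CompletableFuture.runAsync(task, executor)
        .whenComplete((ignored, error) -> {
          if (error != null) {
            // Async stages wrap the original exception in CompletionException.
            Throwable cause = (error instanceof CompletionException
                && error.getCause() != null) ? error.getCause() : error;
            onFailure.accept(cause);
          }
        });
  }

  private static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  static String demo() {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    CountDownLatch release = new CountDownLatch(1);
    CountDownLatch handled = new CountDownLatch(1);
    AtomicReference<String> seen = new AtomicReference<>("none");
    try {
      CompletableFuture<Void> future = processAsync(() -> {
        awaitQuietly(release); // simulate a long-running job
        throw new IllegalStateException("processing failed");
      }, executor, t -> {
        seen.set(t.getMessage());
        handled.countDown();
      });

      // The caller gets control back while the job is still running.
      boolean returnedEarly = !future.isDone();

      release.countDown();
      handled.await(5, TimeUnit.SECONDS); // only so the demo can report
      return "returnedEarly=" + returnedEarly + ", handled=" + seen.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In the aspect, the callback would be the place to call callback.acknowledge(Status.REJECT) and do any error routing, since it is the only code that sees the failure without blocking the polling thread.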


Solution

  • As a last resort, I ended up handling the error inside the Runnable by publishing the error manually to the DLQ, rather than relying upon Spring Cloud Stream to handle this for me. The end result works, although it is obviously not an ideal solution.
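
A rough, Spring-free sketch of that approach follows. Everything in it is illustrative rather than the code actually used: `ManualDlqSketch`, `DeadLetterPublisher` and `tryProcess` are hypothetical names, and the publisher stands in for whatever sends to the DLQ topic in a real setup (for example a KafkaTemplate or StreamBridge). It also swaps the volatile flag for an `AtomicBoolean` with `compareAndSet`, an assumption made here so that the busy check and the flag update happen in one step.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ManualDlqSketch {

  /** Hypothetical stand-in for the component that writes to the DLQ. */
  interface DeadLetterPublisher {
    void publish(Object payload, Throwable cause);
  }

  private final ExecutorService executor = Executors.newFixedThreadPool(1);
  private final AtomicBoolean busy = new AtomicBoolean(false);
  private final DeadLetterPublisher deadLetters;

  ManualDlqSketch(DeadLetterPublisher deadLetters) {
    this.deadLetters = deadLetters;
  }

  /** Returns false when the worker is busy, so the caller can re-queue. */
  boolean tryProcess(Object payload, Runnable listener) {
    if (!busy.compareAndSet(false, true)) {
      return false;
    }
    executor.submit(() -> {
      try {
        listener.run();
      } catch (Throwable e) {
        // Nothing upstream will ever see this exception,
        // so the failed payload is routed to the DLQ by hand.
        deadLetters.publish(payload, e);
      } finally {
        busy.set(false);
      }
    });
    return true;
  }

  void shutdownAndWait() {
    executor.shutdown();
    try {
      executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  static List<String> demo() {
    List<String> published = Collections.synchronizedList(new ArrayList<>());
    ManualDlqSketch sketch = new ManualDlqSketch(
        (payload, cause) -> published.add(payload + " -> " + cause.getMessage()));
    sketch.tryProcess("msg-1", () -> {
      throw new IllegalStateException("deletion failed");
    });
    sketch.shutdownAndWait();
    return published;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In the real aspect, the catch block would presumably also acknowledge the message (for example with Status.ACCEPT once the DLQ publish has succeeded) so that it is not redelivered. Which status fits depends on the binder configuration.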