To manage a long-running task with Spring Cloud Stream 3.1.1 and the Kafka binder, we need to use a Pollable Consumer and handle the consumption manually in a separate thread, so that Kafka does not trigger a rebalance. To do that, we have defined a new annotation to manage the Pollable Consumer. The issue with this approach is that, because the work has to be handled in a separate thread, any exception that is thrown won't end up in the errorChannel and, eventually, the DLQ.
private final ExecutorService executor = Executors.newFixedThreadPool(1);

private volatile boolean paused = false;

@Around(value = "@annotation(pollableConsumer) && args(dataCapsule,..)")
public void handleMessage(ProceedingJoinPoint joinPoint,
        PollableConsumer pollableConsumer, Object dataCapsule) {

    if (dataCapsule instanceof Message) {
        Message<?> message = (Message<?>) dataCapsule;
        AcknowledgmentCallback callback = StaticMessageHeaderAccessor
                .getAcknowledgmentCallback(message);
        callback.noAutoAck();

        if (!paused) {
            // The separate thread is not busy with a previous message, so process this message:
            Runnable runnable = () -> {
                try {
                    paused = true;
                    // Call method to process this Kafka message
                    joinPoint.proceed();
                    callback.acknowledge(Status.ACCEPT);
                } catch (Throwable e) {
                    callback.acknowledge(Status.REJECT);
                    throw new PollableConsumerException(e);
                } finally {
                    paused = false;
                }
            };
            executor.submit(runnable);
        } else {
            // The separate thread is busy with a previous message, so re-queue this message for later:
            callback.acknowledge(Status.REQUEUE);
        }
    }
}
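The PollableConsumer annotation itself is not shown in the snippet above; it is assumed to be a simple marker annotation for the aspect to match, along these lines:

// Assumed minimal declaration of the marker annotation targeted by the aspect above.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PollableConsumer {
}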
We could create a different output channel and publish the message to it in case of an exception, but it feels like we are trying to implement something that may not be necessary.
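That alternative would look roughly like the following sketch, assuming StreamBridge is available in our setup and using an illustrative binding name (processing-errors-out-0), neither of which is part of our current configuration:

// Hypothetical sketch of the "separate output channel" alternative:
// inject StreamBridge and forward the failed message from the catch block of the runnable.
@Autowired
private StreamBridge streamBridge;

private void publishFailure(Message<?> message) {
    // "processing-errors-out-0" is an assumed binding name, not part of the original code
    streamBridge.send("processing-errors-out-0", message);
}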
Update 1
We’ve added these beans:
@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, byte[]> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(
            org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "http://localhost:9092");
    configProps.put(
            org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
    configProps.put(
            org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            KafkaAvroSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topicErr() {
    return TopicBuilder.name("ERR").partitions(1).replicas(1).build();
}

@Bean
public SeekToCurrentErrorHandler eh(KafkaOperations<String, byte[]> template) {
    return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(
            template,
            (cr, e) -> new TopicPartition("ERR", 1)),
            new FixedBackOff(0L, 1L));
}
And enable-dlq is not set under spring.cloud.stream.kafka.bindings.channel-name.consumer.
But we still can't see any messages being produced to the ERR topic, even for exceptions thrown by the main thread.
If enable-dlq is set to true, exceptions on the main thread are published to the default DLQ topic and, as expected, the ones on the child thread are ignored.
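For reference, the binder-managed DLQ we are referring to would be enabled with properties along these lines, where channel-name stands for the actual binding name (dlq-name is optional; by default the binder derives the DLQ topic name):

spring.cloud.stream.kafka.bindings.channel-name.consumer.enable-dlq=true
spring.cloud.stream.kafka.bindings.channel-name.consumer.dlq-name=ERR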
Update 2
The example Gary provided seems to be working in general. Although we needed to make some modifications, since we use the deprecated StreamListener approach instead of Functions, there are a few issues that we could not sort out for our case:
1. It seems the DLQ topic name has to be channel_name+.DLT, as we could not figure out how a different name like dlq could be used.
2. We are using a single dlq topic for all consumers, which does not seem to be how the Spring Kafka default DLT expects it to be set up.
3. We would like to retry failures first (as with max.attempts) and only then kick in the DLQ part; how can that be done here?
4. Your example uses this.endpoint.changeState("polled", State.PAUSED) and this.endpoint.changeState("polled", State.RESUMED). Why do we need to do that in conjunction with pause, requeue, etc.? What is the side effect of not doing that?

Your observation is correct; the error handling is bound to the thread.
You could use a DeadLetterPublishingRecoverer directly in your code to make publishing to the DLQ a little easier (instead of an output channel). That way, you'll get the enhanced headers with exception information etc.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
EDIT
Here is an example; I am pausing the binding to prevent any new deliveries while the "job" is being run rather than requeuing the delivery, as you are doing.
@SpringBootApplication
@EnableScheduling
public class So67296258Application {

    public static void main(String[] args) {
        SpringApplication.run(So67296258Application.class, args);
    }

    @Bean
    TaskExecutor exec() {
        return new ThreadPoolTaskExecutor();
    }

    @Bean
    DeadLetterPublishingRecoverer recoverer(KafkaOperations<Object, Object> template) {
        return new DeadLetterPublishingRecoverer(template);
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("polled.DLT").partitions(1).replicas(1).build();
    }

    @Bean
    MessageSourceCustomizer<KafkaMessageSource<?, ?>> customizer() {
        return (source, dest, group) -> source.setRawMessageHeader(true);
    }

}
@Component
class Handler {

    private static final Logger LOG = LoggerFactory.getLogger(Handler.class);

    private final PollableMessageSource source;

    private final TaskExecutor exec;

    private final BindingsEndpoint endpoint;

    private final DeadLetterPublishingRecoverer recoverer;

    Handler(PollableMessageSource source, TaskExecutor exec, BindingsEndpoint endpoint,
            DeadLetterPublishingRecoverer recoverer) {

        this.source = source;
        this.exec = exec;
        this.endpoint = endpoint;
        this.recoverer = recoverer;
    }

    @Scheduled(fixedDelay = 5_000)
    public void process() {
        LOG.info("Polling");
        boolean polled = this.source.poll(msg -> {
            LOG.info("Pausing Binding");
            this.endpoint.changeState("polled", State.PAUSED);
            AcknowledgmentCallback callback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(msg);
            callback.noAutoAck();
            // LOG.info(msg.toString());
            this.exec.execute(() -> {
                try {
                    runJob(msg);
                }
                catch (Exception e) {
                    this.recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), e);
                }
                finally {
                    callback.acknowledge();
                    this.endpoint.changeState("polled", State.RESUMED);
                    LOG.info("Resumed Binding");
                }
            });
        });
        LOG.info("" + polled);
    }

    private void runJob(Message<?> msg) throws InterruptedException {
        LOG.info("Running job");
        Thread.sleep(30_000);
        throw new RuntimeException("fail");
    }

}
spring.cloud.stream.pollable-source=polled
spring.cloud.stream.bindings.polled-in-0.destination=polled
spring.cloud.stream.bindings.polled-in-0.group=polled
EDIT2
Answers to the supplemental questions:
1, 2: See the Spring for Apache Kafka documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
The DLPR has an alternate constructor enabling you to specify a destination resolver. The default just appends .DLT and uses the same partition. The javadocs specify how the destination partition can be specified:
/**
* Create an instance with the provided template and destination resolving function,
* that receives the failed consumer record and the exception and returns a
* {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
* 0, no partition is set when publishing to the topic.
* @param template the {@link KafkaOperations} to use for publishing.
* @param destinationResolver the resolving function.
*/
When null, the KafkaProducer selects the partition.
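So, for example, a single shared dlq topic could be targeted with a resolver along these lines (a sketch; the negative partition lets the producer choose the partition, as described in the javadoc above):

@Bean
DeadLetterPublishingRecoverer recoverer(KafkaOperations<Object, Object> template) {
    // Route every failed record to the shared "dlq" topic;
    // partition < 0 means no partition is set and the KafkaProducer picks one.
    return new DeadLetterPublishingRecoverer(template,
            (record, exception) -> new TopicPartition("dlq", -1));
}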
3. Use a RetryTemplate with appropriate retry and backoff policies; then
retryTemplate.execute(context -> { ... }, context -> { ... });
The second argument is a RecoveryCallback, called when retries are exhausted.
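A sketch of how that could slot into the process() method of the example above (the retry settings and the processMessage call are assumptions, not part of the original answer):

// Retry the job a few times on the worker thread; when retries are exhausted,
// the RecoveryCallback publishes the failed record to the DLQ.
RetryTemplate retryTemplate = RetryTemplate.builder()
        .maxAttempts(3)
        .fixedBackoff(2_000)
        .build();

this.exec.execute(() -> {
    try {
        retryTemplate.execute(context -> {
            processMessage(msg); // hypothetical processing method
            return null;
        }, context -> {
            // Retries exhausted: publish to the DLQ with the last exception
            this.recoverer.accept(
                    msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class),
                    (Exception) context.getLastThrowable());
            return null;
        });
    }
    finally {
        callback.acknowledge();
        this.endpoint.changeState("polled", State.RESUMED);
    }
});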
4. When the binding is paused, we continue to poll() until we resume the consumer. This allows us to keep the consumer alive by polling it, but without the overhead of retrieving and resetting the offset.