I am trying to write a transform function which will consume an input from one topic and produce two outputs to topics Left and Right. Moreover, I need this to take place in a transaction, so that if the application fails to produce a message to Right, it rolls back its attempt to Left. Furthermore, Kafka should re-attempt delivery to the transform's consumer in the event of failure, so that the application has multiple chances to recover from transient errors. However, if the error is chronic, I need the application to give up after some number of attempts (let's say 3), at which point the message should be delivered to a DLQ.
I understand how to achieve bounded retry, DLQ, and transactions using the imperative model. That is, I can use Function<IN, OUT> and get 90% of the way to a solution. However, to the best of my understanding, Function<IN, Tuple2<OUT1, OUT2>> is not a supported signature at the moment, and I have to use the reactive programming model to deliver to multiple topics. Furthermore, automatic DLQ is not, as I understand it, part of the framework for reactive consumers. It also does not look like transactions are managed the same way, and consumer offset commits appear to be automatic instead of conditional on the success or failure of the stream processing.
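For reference, the multi-output shape that does appear to be accepted is a Tuple of reactive streams, along the lines of this sketch (just a sketch: it assumes the numberToJson-out-0/out-1 bindings, reuses my toJson helper, and has none of the retry/DLQ/transaction behavior I need):

@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> numberToJson() {
  return flux -> {
    // Share the input so each output binding sees every element.
    var shared = flux.publish().autoConnect(2);
    return Tuples.of(
        shared.map(n -> toJson(n, "left")),   // -> numberToJson-out-0
        shared.map(n -> toJson(n, "right"))); // -> numberToJson-out-1
  };
}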
Can someone advise how I can write a consumer which:

- consumes from a single input topic,
- produces to two output topics within one Kafka transaction (so a failure to publish either message rolls back both),
- is retried a bounded number of times (say, 3) when processing fails, and
- delivers the message to a DLQ once those attempts are exhausted?

This does not need to be reactive to answer my question.
The following is what I have tried so far, adapted from my initial single-output imperative function that met the other requirements. It is intended to consume integers one at a time from the input topic, create an audit_log database record within a transaction (if that fails, roll everything back and try again), transform the number into two JSON payloads labeled "left" and "right," then publish the JSON to outputs 0 and 1 of the signature. I was trying to use the third output (out-2) for the DLQ, since errors from a reactive stream are not automatically DLQ'd as with the imperative model. This is exploratory code just to learn the ropes, so there are random chances to throw RuntimeExceptions in order to exercise failure scenarios.
With this code, when an exception is thrown, the current message is lost without retry. If the application publishes one of the two messages and the second is interrupted by an exception, the first is nonetheless committed (no transaction rollback seems to take place). Finally, because all errors are dropped, they never reach my code that tries to deliver them to a DLQ.
@Bean
@Transactional
public Function<Flux<Message<Integer>>, Tuple3<Flux<String>, Flux<String>, Flux<Object>>>
    numberToJson(AuditLogRepository repository) {
  var random = new Random();
  // I tried to create two sinks, one for each output topic. This works fine.
  var left = Sinks.many().unicast().<Integer>onBackpressureBuffer();
  var right = Sinks.many().unicast().<Integer>onBackpressureBuffer();
  // I tried to use this sink for the DLQ destination. I was hoping to manually shuffle UE's to the DLQ.
  var dlq = Sinks.many().unicast().onBackpressureBuffer();
  return flux -> {
    // Do a database operation (in a transaction).
    var persistent =
        flux.map(
                message -> {
                  var n = message.getPayload();
                  repository.createIfNotExists("Transformed n=" + n);
                  if (random.nextDouble() < 0.3) {
                    LOGGER.error("Transformer failure on n=" + n);
                    throw new RuntimeException("Transformer failure on n=" + n);
                  }
                  return message;
                })
            .doOnNext(
                message -> {
                  var n = message.getPayload();
                  left.tryEmitNext(n).orThrow();
                  // If only one side fails to publish, we want the txn to roll back.
                  if (random.nextDouble() < 0.1) {
                    LOGGER.error("Failed to publish right-side JSON: n=" + n);
                    throw new RuntimeException("Failed to publish right-side JSON: n=" + n);
                  }
                  right.tryEmitNext(n).orThrow();
                })
            .retry(3) // Make 3 attempts overall to process and publish. If that fails, continue
                      // to the DLQ.
            .onErrorContinue(
                (error, message) -> {
                  dlq.tryEmitNext(message).orThrowWithCause(error);
                })
            .retry() // If DLQ fails and this flux crashes, always restart it. The failed message
                     // will be redelivered.
            .publish()
            .autoConnect(3);
    // Split the "persistent" flux into three, which map to separate kafka topics and DLQ.
    return Tuples.of(
        left.asFlux()
            .doOnSubscribe(_sub -> persistent.subscribe())
            .map(n -> toJson(n, "left"))
            .retry(),
        right.asFlux()
            .doOnSubscribe(_sub -> persistent.subscribe())
            .map(n -> toJson(n, "right"))
            .retry(),
        dlq.asFlux().doOnSubscribe(_sub -> persistent.subscribe()).retry());
  };
}
Finally, here are the relevant bits of my application.yml. I have some configuration left over from the imperative attempt, such as the Kafka DLQ setup. I've omitted the publishers and consumers which are earlier or later in the stream, because I know those are working fine.
spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: 'tx-'
            producer:
              configuration:
                retries: 3
                acks: all
        bindings:
          numberToJson-in-0:
            consumer:
              enableDlq: true
          numberToJson-out-0:
            producer:
              topic.properties:
                retention.bytes: 10000
      bindings:
        numberToJson-in-0:
          destination: tx-number
          group: numberToJson
          consumer:
            # Not sure how this interacts with the flux retries, if at all.
            maxAttempts: 2
            properties:
              isolation.level: read_committed
        numberToJson-out-0:
          destination: tx-json-left
          producer:
            partitionCount: 3
        numberToJson-out-1:
          destination: tx-json-right
        numberToJson-out-2:
          # Manually wiring the function's 3rd output to a DLQ.
          destination: error.tx-number.numberToJson
      function:
        definition: numberToJson
[EDIT] This is the imperative consumer I had tried to equip with a Tuple2 signature, which otherwise works:
@Bean
@Transactional
public Function<Integer, Tuple2<String, String>> numberToJson(
    AuditLogRepository repository) {
  var random = new Random();
  return n -> {
    LOGGER.info("Transforming n=" + n);
    var left = "{ \"n\": \"" + n + "\", \"side\": \"left\" }";
    var right = "{ \"n\": \"" + n + "\", \"side\": \"right\" }";
    repository.createIfNotExists("Transformed n=" + n);
    if (random.nextDouble() < 0.3) {
      LOGGER.error("Transformer failure on n=" + n);
      throw new RuntimeException("Transformer failure on n=" + n);
    }
    return Tuples.of(left, right);
  };
}
But this is met with the following exception at runtime:
Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.UnsupportedOperationException: At the moment only Tuple-based function are supporting multiple arguments
Thank you!
Use the StreamBridge and add a configured AfterRollbackProcessor.
The following example includes consumers on left, right, and input.DLT.
@SpringBootApplication(proxyBeanMethods = false)
public class So68928091Application {

    @Autowired
    StreamBridge bridge;

    public static void main(String[] args) {
        SpringApplication.run(So68928091Application.class, args);
    }

    @Bean
    Consumer<String> input() {
        return str -> {
            System.out.println(str);
            // Both sends run in the consumer-initiated Kafka transaction;
            // the exception below rolls them both back.
            this.bridge.send("left", str.toUpperCase());
            this.bridge.send("right", str.toLowerCase());
            if (str.equals("Fail")) {
                throw new RuntimeException("test");
            }
        };
    }

    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
        return (container, dest, group) -> {
            // Reuse the binder's transactional producer factory so dead-letter publishing
            // is transactional as well.
            ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                    MessageChannel.class)).getTransactionalProducerFactory();
            KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
            DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
            container.setAfterRollbackProcessor(rollbackProcessor);
        };
    }

    // Redeliver a failed record twice more, 2 seconds apart (3 attempts in all); after that,
    // publish it to input.DLT and commit its offset (commitRecovered = true).
    DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
        return new DefaultAfterRollbackProcessor<>(
                new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("input.DLT").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicLeft() {
        return TopicBuilder.name("left").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicRight() {
        return TopicBuilder.name("right").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "left", topics = "left")
    public void listenLeft(String in) {
        System.out.println("left:" + in);
    }

    @KafkaListener(id = "right", topics = "right")
    public void listenRight(String in) {
        System.out.println("right:" + in);
    }

    @KafkaListener(id = "dlt", topics = "input.DLT")
    public void listenDlt(String in) {
        System.out.println("dlt:" + in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            System.in.read();
            template.send("input", "Fail".getBytes());
            template.send("input", "Good".getBytes());
        };
    }

}
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
#For @KafkaListeners
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.isolation-level=read-committed
Fail
...
Fail
...
Fail
...
Good
dlt:Fail
right:good
left:GOOD
Remove all DLT settings in the binding, and set maxAttempts to 1 to disable retries there.
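Against the configuration in the question, that would look roughly like this (a sketch only; it keeps your destination, group, and isolation level, while the enableDlq setting and the numberToJson-out-2 DLQ binding are removed entirely):

spring:
  cloud:
    stream:
      bindings:
        numberToJson-in-0:
          destination: tx-number
          group: numberToJson
          consumer:
            # Let the AfterRollbackProcessor own retries and dead-lettering.
            maxAttempts: 1
            properties:
              isolation.level: read_committed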