Tags: spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka, dlq

Using SendToDlqAndContinue in a Function without using a Processor (spring-cloud-stream-binder-kafka-streams)


I'm trying to send a message to a DLQ when the message I'm processing in my Function doesn't satisfy some business rules.

For some context, here is the configuration in my application.yaml:

function:
  definition: filterConsumption|EnrichConsumption;
#Functions and topic binding
stream:
  bindings:
    filterConsumptionEnrichConsumption-in-0:
      destination: input
    filterConsumptionEnrichConsumption-out-0:
      destination: output
  kafka:
    streams:
      bindings:
        filterConsumptionEnrichConsumption-in-0:
          consumer:
            enable-dlq: true
            dlqName: input_dlq
            application-id: input-application-id
        filterConsumptionEnrichConsumption-out-1:
          consumer:
            enable-dlq: false
            application-id: output-application-id
      binder:
        #Kafka Streams binder config
        replicationFactor: ${KAFKA_STREAM_REPLICATION_FACTOR:1}
        brokers: ${BOOTSTRAP_SERVERS_CONFIG:localhost:9092}
        deserialization-exception-handler: sendToDlq

And my function looks like this:

@Bean("EnrichConsumption")
public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>> EnrichConsumption() {

    return input ->
            input.filter((key, consumptions) -> !getSomething(consumptions).orElse("").isBlank())
                    .merge(
                            // filter consumptions missing a tradingName
                            input.filter((key, consumptions) -> getSomething(consumptions).orElse("").isBlank())
                                    // enrich those consumptions with the missing tradingName
                                    .mapValues(this::setSomething)
                    );

}

During setSomething, some exceptions can occur due to business rules.
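For context, setSomething and DlqException are not shown above; roughly, they look like this (the business logic is elided and the details here are only illustrative):

// Illustrative sketch only - these types are not shown in full in this post.
// DlqException is assumed to be an unchecked exception raised by business rules.
public class DlqException extends RuntimeException {
    public DlqException(String message) {
        super(message);
    }
}

// setSomething enriches the record and throws DlqException
// when a business rule is violated.
private ConsumptionSchema setSomething(ConsumptionSchema value) {
    boolean ruleViolated = false; // placeholder for the real business-rule check
    if (ruleViolated) {
        throw new DlqException("Business rule violated for consumption");
    }
    return value; // real enrichment elided
}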

I tried two things. First, using StreamBridge, but I keep getting the following error:

Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

I configured the StreamBridge like this:

private final StreamBridge streamBridge;

@Bean("EnrichConsumption")
public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>> EnrichConsumption() {
    return input ->
        input.filter((key, consumptions) ->
                !getSomething(consumptions).orElse("").isBlank())
            .merge(
                // filter consumptions missing a tradingName
                input.filter((key, consumptions) ->
                        getSomething(consumptions).orElse("").isBlank())
                    // enrich those consumptions with the missing tradingName
                    .mapValues((ConsumptionSchema value) -> {
                        try {
                            return setSomething(value);
                        } catch (DlqException e) {
                            Message<ConsumptionSchema> mess = MessageBuilder.withPayload(value).build();
                            streamBridge.send("filterConsumptionEnrichConsumption-in-1", mess);
                            return null;
                        }
                    })

            );
}

I also tried SendToDlqAndContinue, using a Processor like this:

@Autowired
private final SendToDlqAndContinue dlqHandler;

@Bean("EnrichConsumption")
public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>> EnrichConsumption() {

    return input ->
        input.process(() -> new Processor<String, ConsumptionSchema, String, ConsumptionSchema>() {

            ProcessorContextImpl context;

            @Override
            public void init(ProcessorContext context) {
                this.context = (ProcessorContextImpl) context;
            }

            @Override
            public void process(Record<String, ConsumptionSchema> processInput) {
                input.filter((key, consumptions) ->
                        !getSomething(consumptions).orElse("").isBlank())
                    .merge(
                        // filter consumptions missing a tradingName
                        input.filter((key, consumptions) ->
                                getSomething(consumptions).orElse("").isBlank())
                            // enrich those consumptions with the missing tradingName
                            .mapValues((ConsumptionSchema value) -> {
                                try {
                                    return setSomething(value);
                                } catch (DlqException e) {
                                    log.error("Exception during handling of consumption message : {}, message : {}",
                                        processInput.key(), e.getMessage());
                                    dlqHandler.sendToDlq(
                                        new ConsumerRecord<>(
                                            context.topic(),
                                            context.partition(),
                                            context.offset(),
                                            processInput.key(),
                                            processInput.value()), e);
                                    return null;
                                }
                            })

                    );
            }
        });
}

In this case, and I don't understand why, the process method doesn't seem to be called.

Could anyone help me make it work using either SendToDlqAndContinue (preferred solution) or StreamBridge?

EDIT:

Using the same application.yaml as in the first part, I tried the DltAwareProcessor:

@Configuration
@Data
@Slf4j
public class BillableConsumptionFilterStreaming {

    @Bean("filterConsumption")
    public Function<KStream<String, ConsumptionSchema>,
            KStream<String, ConsumptionSchema>> filterConsumption(DltSenderContext dltSenderContext) {
        return input ->
            input.process(() ->
                new DltAwareProcessor<>(
                    (BiFunction<String, ConsumptionSchema, KeyValue<String, ConsumptionSchema>>) (s, c) -> {
                        throw new RuntimeException("Exception that won't kill stream");
                    }, "input_dlq", dltSenderContext));
    }
}

Using breakpoints, I can see that the DltAwareProcessor is correctly called, up to this line: streamBridge.send(this.dltDestination, r.value());

No exception whatsoever is thrown, but I get the following logs:

Using kafka topic for outbound: input_dlq
Node -1 disconnected.
Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

In my application, BOOTSTRAP_SERVERS_CONFIG is overridden with the address of our Kafka brokers, and when there is no exception, messages are correctly routed to the output topic. So maybe I'm missing some configuration in my application.yaml to configure the broker for StreamBridge.


Solution

  • The SendToDlqAndContinue is built explicitly for deserialization exception handling purposes. You cannot use it for runtime error handling the way you are trying to. We recently added (in the 4.1.0-SNAPSHOT line) a new feature that may help with this use case. Please see the following issues for more details: https://github.com/spring-cloud/spring-cloud-stream/issues/2779 https://github.com/spring-cloud/spring-cloud-stream/issues/2802

    Here are the docs for this feature: https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-streams-binder/error-handling.html#runtime-error-handling (a sketch of this pattern, applied to your function, follows at the end of this answer).

    See if that works for your use case. If you find any room for improvement in this feature, please comment on the issue (2802) above, and we can still make changes, as we will do the 4.1.0-RC1 release later this month.

    When using DltAwareProcessor, you must include the regular Kafka binder in addition to the Kafka Streams binder. By regular, I mean the message channel-based binder (spring-cloud-stream-binder-kafka), since StreamBridge requires it. You also need to set the proper configuration on this binder - e.g., if you are using a non-default server/port, you need to set it via spring.cloud.stream.kafka.binder.brokers (see the configuration sketch below).
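    To illustrate, here is a minimal sketch of that pattern applied to the EnrichConsumption function from your question. It reuses your ConsumptionSchema and setSomething, and keeps the DltSenderContext name from the 4.1.0-SNAPSHOT used in your EDIT; treat it as a sketch, not a drop-in implementation:

    @Bean("EnrichConsumption")
    public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>>
            enrichConsumption(DltSenderContext dltSenderContext) {
        return input -> input.process(() -> new DltAwareProcessor<>(
                (BiFunction<String, ConsumptionSchema, KeyValue<String, ConsumptionSchema>>) (key, value) -> {
                    // Any exception thrown by the business logic causes DltAwareProcessor
                    // to publish the failed record to the DLT instead of killing the stream.
                    return KeyValue.pair(key, setSomething(value));
                },
                "input_dlq",         // DLT destination
                dltSenderContext));  // supplies the StreamBridge used for DLT publishing
    }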
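    On the configuration side, a sketch of the missing piece, assuming the same environment variables as in your application.yaml (full property path shown for clarity): the message channel-based binder reads its broker list from spring.cloud.stream.kafka.binder.brokers, which is separate from the spring.cloud.stream.kafka.streams.binder.brokers you already set:

    spring:
      cloud:
        stream:
          kafka:
            binder:
              # brokers for the message channel-based binder used by StreamBridge;
              # without this it falls back to localhost:9092
              brokers: ${BOOTSTRAP_SERVERS_CONFIG:localhost:9092}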