Tags: java, apache-kafka, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Use StreamBridge or the Kafka Streams to() method in Spring Cloud Stream


I am using the spring-cloud-stream project with Kafka Streams, and I am new to it. Since the functional programming model is recommended, I have defined a function as below:

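import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StreamConfiguration {

    // Consumes the input stream; there is no single output binding because each branch has its own destination.
    @Bean
    public Consumer<KStream<String, PaymentRequest>> orderProcess() {
        return stream ->
                stream.mapValues(requestBuilderValueMapper, Named.as("ExtendedRequestBuilder"))
                        .peek(paymentRequestDateValidatorAction, Named.as("DateValidator"))
                        // First predicate catches rejections; the second is a catch-all for everything else.
                        .branch(Named.as("RejectionCheck"), rejectionCheckPredicate, (key, value) -> true)
                        ...
    }
}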
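
To make the branching above concrete: branch sends each record to the first predicate it matches, so a trailing (key, value) -> true acts as a catch-all, and a record that matches nothing is dropped. The following is only a plain-Java model of that first-match routing; it does not use the Kafka Streams API, and the class name, topic names, and sample values are hypothetical, chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Plain-Java model of first-match branching; NOT the Kafka Streams API.
class BranchRoutingSketch {

    // Routes each record key to the destination of the first predicate it satisfies.
    // Records that satisfy no predicate are dropped, mirroring KStream#branch.
    static Map<String, List<String>> route(
            Map<String, Integer> records,
            LinkedHashMap<String, BiPredicate<String, Integer>> branches) {
        Map<String, List<String>> routed = new LinkedHashMap<>();
        for (String destination : branches.keySet()) {
            routed.put(destination, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> record : records.entrySet()) {
            for (Map.Entry<String, BiPredicate<String, Integer>> branch : branches.entrySet()) {
                if (branch.getValue().test(record.getKey(), record.getValue())) {
                    routed.get(branch.getKey()).add(record.getKey());
                    break; // first match wins; later predicates are not evaluated
                }
            }
        }
        return routed;
    }

    public static void main(String[] args) {
        // Hypothetical records: key is an order id, value is an amount.
        Map<String, Integer> records = new LinkedHashMap<>();
        records.put("order-1", -5);
        records.put("order-2", 40);

        // Hypothetical destinations, declared in evaluation order.
        LinkedHashMap<String, BiPredicate<String, Integer>> branches = new LinkedHashMap<>();
        branches.put("payments-rejected", (key, value) -> value < 0); // rejection check
        branches.put("payments-accepted", (key, value) -> true);      // catch-all

        System.out.println(route(records, branches));
    }
}
```

In the real topology, each element of the array returned by branch would then be sent to its own destination, which is why a single output binding does not fit.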

The point is that the stream has multiple branches and each branch has its own destination, so it is not possible to use a Function and configure a single output with orderProcess-out-0: destination: name. I have noticed that I can use the Kafka Streams to(destination) method, as below:

stream.mapValues(requestBuilderValueMapper, Named.as("ExtendedRequestBuilder"))
        .peek(paymentRequestDateValidatorAction, Named.as("DateValidator"))
        ...
        .to("destination");

or use the StreamBridge like this:

stream.mapValues(requestBuilderValueMapper, Named.as("ExtendedRequestBuilder"))
        .peek(paymentRequestDateValidatorAction, Named.as("DateValidator"))
        ...
        .peek((k, v) -> streamBridge.send("bindName-out-0", v));

Which one is the correct way to handle this scenario?

Do both methods preserve the Kafka Streams exactly_once_beta transactional mode, or do they break it?

The difference I noticed is that with the first method the destination topic's partition count is inferred from the Kafka broker's server.properties, whereas the latter makes Spring create the topic with the partitions configured in application.yml.
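
For reference, the application.yml settings referred to here might look like the fragment below. This is a sketch, not the actual configuration: the binding name bindName-out-0 comes from the StreamBridge snippet above, while the topic name and the partition count are hypothetical values.

```yaml
spring:
  cloud:
    stream:
      bindings:
        bindName-out-0:
          destination: payments-rejected   # hypothetical topic name
          producer:
            partition-count: 6             # hypothetical value
```

With to("destination"), no binding is involved, so these properties are not consulted for that topic.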


Solution

  • Yes. With the second approach (StreamBridge), Spring provisions the topic from the binding properties.

    exactly_once_beta is supported by the second approach, as long as the broker is 2.5 or higher and spring-kafka is 2.6.x (or 2.5.x with the container's EOSMode set to BETA).

    Otherwise, the two approaches are functionally the same.