Tags: java, apache-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Is it possible to use multiple brokers in one Kafka Streams topology (Spring Cloud Stream)?


We have a topology that reads from an input topic (binder: x, broker address: x), processes the records, and writes them to an output topic (binder: y, broker address: y) using Spring Cloud Stream Kafka Streams. The records are never written to the output topic. However, when I set both binders to the same broker address (either x or y), the records are written to topic y. Do I have to use the same broker within a topology? I need different binders and brokers for the input and output topics. How can I solve this?

Error: 2021-06-17 12:17:21.483 WARN 20848 --- [read-1-producer] o.a.k.c.NetworkClient : [Producer clientId=inputTopic-32100000000000000000015-f0bd5423-e670-43e8-ab0b-84ec5505c2fd-StreamThread-1-producer] Error while fetching metadata with correlation id 182 : {inputTopic=UNKNOWN_TOPIC_OR_PARTITION}

Application.yml

spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            doob-output-topic-out:
              applicationId: doob-output-topic-out
              producer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
                topic:
                  properties:
                    retention.bytes: 300000000
                    segment.bytes: 300000000
            doob-input-topic-in:
              consumer:
                applicationId: doob-input-topic-in
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
                topic:
                  properties:
                    retention.bytes: 300000000
                    segment.bytes: 300000000
      binders:
        outputKafka:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: ${kafka.brokers1}
                        autoCreateTopics: true
                        autoAddPartitions: true
                        minPartitionCount: 8
                        configuration:
                          commit.interval.ms: 1000
        inputKafka:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: ${kafka.brokers2}
                        autoCreateTopics: true
                        autoAddPartitions: true
                        minPartitionCount: 8
                        configuration:
                          commit.interval.ms: 1000
                          max.request.size: 20000000
      bindings:
        doob-output-topic-out:
          destination: outputTopic
          binder: outputKafka
          producer:
            partition-count: 8
        doob-input-topic-in:
          destination: inputTopic
          binder: inputKafka

manage:
  storeName: trackList15

Source Code :

    @StreamListener(BASE_TOPIC_INPUT)
    @SendTo(BASE_TOPIC_OUTPUT)
    public KStream<String, BaseData> consumeTrackFromSynchronization(KStream<String, BaseData> baseDataStream) {
        return baseDataStream
                .filter((s, baseData) -> BaseUtil.getTrackType(baseData).equals(BaseTypeEnum.FK))
                .groupByKey()
                .reduce((baseData, s1) -> s1,
                        Materialized.<String, BaseData, KeyValueStore<Bytes, byte[]>>as(storeName)
                                .withKeySerde(Serdes.String())
                                .withValueSerde(baseDataSerde))
                .toStream()
                .peek((s, baseData) -> baseServiceHelper.processBase(baseData, BaseTypeEnum.FK));
    }

Solution

  • Within a single Kafka Streams processor, it is not possible to read from one cluster and write to another. However, within a single application (JVM), you can have multiple processors, each of which interacts with a single Kafka cluster.

    See this thread for more details.

    In your case, one workaround using Spring Cloud Stream is the following:

    1. Have your Kafka Streams processor consume from and produce to the same cluster.
    2. Then, write another simple processor with the regular message-channel-based Kafka binder (not the Kafka Streams binder). In this model, you can apply the multi-binder pattern you have above: the input receives from the topic that the Kafka Streams processor wrote to, and the output goes to a topic on the other cluster. This processor simply becomes a passthrough that moves data from cluster 1 to cluster 2, as sketched below.
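
    As a rough sketch of step 2 (illustrative only: the bean name bridgeToSecondCluster, the binder names cluster1Kafka/cluster2Kafka, the remote topic name, and the broker placeholders are assumptions, not taken from the question), a functional-style passthrough with the regular Kafka binder could look like this:

    import java.util.function.Function;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class ClusterBridgeConfiguration {

        // Passthrough: consume the raw bytes written by the Kafka Streams processor
        // on cluster 1 and republish them unchanged to a topic on cluster 2.
        // Working on byte[] avoids any payload conversion in the bridge.
        @Bean
        public Function<byte[], byte[]> bridgeToSecondCluster() {
            return payload -> payload;
        }
    }

    And the corresponding bindings in application.yml, reusing the multi-binder pattern from the question but with the regular Kafka binder (type: kafka):

    spring:
      cloud:
        stream:
          bindings:
            bridgeToSecondCluster-in-0:
              destination: outputTopic          # topic written by the Kafka Streams processor on cluster 1
              binder: cluster1Kafka
            bridgeToSecondCluster-out-0:
              destination: outputTopicRemote    # topic on cluster 2 (name is an assumption)
              binder: cluster2Kafka
          binders:
            cluster1Kafka:
              type: kafka
              environment:
                spring:
                  cloud:
                    stream:
                      kafka:
                        binder:
                          brokers: ${kafka.brokers1}
            cluster2Kafka:
              type: kafka
              environment:
                spring:
                  cloud:
                    stream:
                      kafka:
                        binder:
                          brokers: ${kafka.brokers2}

    If the application defines more than one functional bean, the bridge would also need to be listed in spring.cloud.function.definition.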