We have a topology that reads from an input topic (binder x, broker address x), processes the records, and writes them to an output topic (binder y, broker address y) using Spring Cloud Stream Kafka Streams. The records are not written to the output topic. However, when I point both binders at the same broker address (both x or both y), the records are written to topic y. Do I have to use the same broker within a topology? I need to use different binders and brokers for the input and output topics. How can I solve this?
Error:
2021-06-17 12:17:21.483 WARN 20848 --- [read-1-producer] o.a.k.c.NetworkClient : [Producer clientId=inputTopic-32100000000000000000015-f0bd5423-e670-43e8-ab0b-84ec5505c2fd-StreamThread-1-producer] Error while fetching metadata with correlation id 182 : {inputTopic=UNKNOWN_TOPIC_OR_PARTITION}
Application.yml
spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            doob-output-topic-out:
              applicationId: doob-output-topic-out
              producer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
                topic:
                  properties:
                    retention.bytes: 300000000
                    segment.bytes: 300000000
            doob-input-topic-in:
              consumer:
                applicationId: doob-input-topic-in
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
                topic:
                  properties:
                    retention.bytes: 300000000
                    segment.bytes: 300000000
      binders:
        outputKafka:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: ${1kafka.brokers1}
                        autoCreateTopics: true
                        autoAddPartitions: true
                        minPartitionCount: 8
                        configuration:
                          commit.interval.ms: 1000
        inputKafka:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: ${2kafka.brokers2}
                        autoCreateTopics: true
                        autoAddPartitions: true
                        minPartitionCount: 8
                        configuration:
                          commit.interval.ms: 1000
                          max:
                            request:
                              size: 20000000
      bindings:
        doob-output-topic-out:
          destination: outputTopic
          binder: outputKafka
          producer:
            partition-count: 8
        doob-input-topic-in:
          destination: inputTopic
          binder: inputKafka
manage:
  storeName: trackList15
Source code:
@StreamListener(BASE_TOPIC_INPUT)
@SendTo(BASE_TOPIC_OUTPUT)
public KStream<String, BaseData> consumeTrackFromSynchronization(KStream<String, BaseData> baseDataStream) {
    return baseDataStream
            // Keep only records whose track type is FK.
            .filter((s, baseData) -> BaseUtil.getTrackType(baseData).equals(BaseTypeEnum.FK))
            .groupByKey()
            // Keep the latest value per key, materialized in the state store named by storeName.
            .reduce((baseData, s1) -> s1,
                    Materialized.<String, BaseData, KeyValueStore<Bytes, byte[]>>as(storeName)
                            .withKeySerde(Serdes.String())
                            .withValueSerde(baseDataSerde))
            .toStream()
            // Side effect per record; the records themselves continue unchanged to the output binding.
            .peek((s, baseData) -> baseServiceHelper.processBase(baseData, BaseTypeEnum.FK));
}
Within a single Kafka Streams processor, it is not possible to read from one cluster and write to another. However, within a single application (JVM), you can have multiple processors, each interacting with a single Kafka cluster.
See this thread for more details.
In your case, one workaround using Spring Cloud Stream is the following.
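A sketch of how such a setup might be configured, under stated assumptions and not necessarily the exact arrangement intended above: the Kafka Streams processor reads and writes on cluster x only, sending its results to an intermediate topic, and a second, hypothetical processor bound through the regular Kafka binder (type `kafka`, which assumes the `spring-cloud-stream-binder-kafka` dependency is on the classpath) copies that topic to `outputTopic` on cluster y. The binder names `streamsX`, `kafkaX` and `kafkaY`, the bindings `bridge-in` and `bridge-out`, the topic `intermediateTopic`, the group `bridge-group`, and the `${kafka.brokers.x}` / `${kafka.brokers.y}` placeholders are all illustrative and do not come from the original configuration.

```yaml
spring:
  cloud:
    stream:
      binders:
        # Kafka Streams binder: the whole topology stays on cluster x.
        streamsX:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: ${kafka.brokers.x}   # hypothetical placeholder
        # Regular Kafka binder reading from cluster x.
        kafkaX:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ${kafka.brokers.x}     # hypothetical placeholder
        # Regular Kafka binder writing to cluster y.
        kafkaY:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ${kafka.brokers.y}     # hypothetical placeholder
      bindings:
        # Existing Kafka Streams processor: input and output on the same cluster.
        doob-input-topic-in:
          destination: inputTopic
          binder: streamsX
        doob-output-topic-out:
          destination: intermediateTopic              # hypothetical topic on cluster x
          binder: streamsX
        # Hypothetical pass-through processor that moves records from x to y.
        bridge-in:
          destination: intermediateTopic
          binder: kafkaX
          group: bridge-group                         # hypothetical consumer group
        bridge-out:
          destination: outputTopic
          binder: kafkaY
```

With this layout each binder talks to exactly one cluster, so the Kafka Streams producer never looks up topic metadata on a broker that does not host the topology's topics. The bridge processor would be an ordinary message handler that forwards each record from `bridge-in` to `bridge-out` unchanged.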