I am working on a Spring Cloud Stream Kafka application. So far it only has consumers, which read messages from topics and deliver them to a third party over the FIX protocol.
This works fine, but now the third party sends back a response, and I would like to produce it to a new topic. As soon as I add a Supplier to the existing code, the application starts behaving strangely: the bootstrap.servers
config changes from the remoteHost broker to localhost, and I get the error below:
[AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
This error is expected when connecting to localhost, since there is no Kafka broker running there.
Below is my application.yml file:
spring.cloud.stream.function.definition: amerData;emeaData;ackResponse # added new ackResponse here
spring.cloud.stream.kafka.streams:
  binder:
    brokers: remoteHost:9092
    configuration:
      schema.registry.url: remoteHost:8081
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
  bindings:
    ackResponse-out-0: # new addition
      producer.configuration:
        key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.bindings:
  amerData-in-0:
    destination: topic1
  emeaData-in-0:
    destination: topic2
  ackResponse-out-0: # new addition
    destination: topic3
I tried the possible signatures for the Supplier: Supplier<String> ackResponse() and Supplier<Message<String>> ackResponse().
The only variant that does not flip remoteHost to localhost is Supplier<KStream<String, String>> ackResponse(); with that, bootstrap.servers shows the configured remote broker. But this isn't correct either, because I can't write the received response (mostly a string or JSON) to a Kafka topic that way.
My consumers are configured as Consumer<KStream<String, AVROPOJO1>> amerData() and Consumer<KStream<String, AVROPOJO2>> emeaData(), as needed, and they work fine.
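For reference, the function beans look roughly like this (the lambda bodies are trimmed; deliverViaFix and nextResponse stand in for the real FIX handling):

import java.util.function.Consumer;
import java.util.function.Supplier;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Configuration
public class StreamConfig {

    @Bean
    public Consumer<KStream<String, AVROPOJO1>> amerData() {
        // Consume topic1 and forward each record to the third party over FIX.
        // AVROPOJO1/AVROPOJO2 are our generated Avro classes.
        return stream -> stream.foreach((key, value) -> deliverViaFix(value));
    }

    @Bean
    public Consumer<KStream<String, AVROPOJO2>> emeaData() {
        // Consume topic2 and forward each record to the third party over FIX.
        return stream -> stream.foreach((key, value) -> deliverViaFix(value));
    }

    @Bean
    public Supplier<Message<String>> ackResponse() {
        // The new producer for the third party's response; adding this bean
        // is what flips bootstrap.servers to localhost.
        return () -> MessageBuilder.withPayload(nextResponse()).build();
    }

    private void deliverViaFix(Object payload) { /* FIX delivery omitted */ }

    private String nextResponse() { /* response retrieval omitted */ return ""; }
}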
Am I missing or messing something up? Can't we have both producers and consumers in the same Spring Cloud Stream application? Using StreamBridge
didn't solve this either. Could someone help?
If you are adding a Supplier bean as you have done, it becomes a regular producer that uses the MessageChannel-based Kafka binder. You need to add the regular Kafka binder to your project (spring-cloud-stream-binder-kafka). The bindings for that binder should be under spring.cloud.stream.kafka.bindings, but I see that you have them defined above under spring.cloud.stream.kafka.streams.bindings. I wonder if that is the issue?
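That would also explain the localhost error: the MessageChannel binder does not read spring.cloud.stream.kafka.streams.binder.brokers, so without a broker configuration of its own it falls back to the default localhost:9092. A sketch of what the regular binder's section could look like, assuming the serializer properties are simply carried over from your Kafka Streams section:

spring.cloud.stream.kafka:
  binder:
    brokers: remoteHost:9092  # read by the MessageChannel binder; defaults to localhost without it
  bindings:
    ackResponse-out-0:
      producer:
        configuration:
          schema.registry.url: remoteHost:8081
          key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
          value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

Once spring-cloud-stream-binder-kafka is on the classpath and the brokers are configured as above, StreamBridge should work as well, since it goes through the same MessageChannel binder. A minimal sketch, assuming a hypothetical AckResponsePublisher component that you call when the FIX response arrives:

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

@Component
public class AckResponsePublisher {

    private final StreamBridge streamBridge;

    public AckResponsePublisher(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    // Publishes the third party's response to whatever destination
    // ackResponse-out-0 is bound to (topic3 in your configuration).
    public void publish(String fixResponse) {
        streamBridge.send("ackResponse-out-0", fixResponse);
    }
}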