Search code examples
avroapache-kafka-streamsspring-cloud-streamkafka-producer-apiconfluent-schema-registry

Spring Cloud Stream Kafka Consumer application doesn't allow adding Supplier


I am working on a Spring Cloud Stream Kafka application. I have added only consumers to consume messages from topics and deliver them to a third party using FIX protocol.

It is working fine till this point, but now the third party sends back the response and I would like to produce them to a new topic. When I added a Supplier in my existing code, it starts behaving weirdly. bootstrap.servers config changes from remoteHost broker to localhost and started giving below error:

[AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established> Broker may not be available.

error would come if trying to connect localhost as there isn't any Kafka setup.

Below is my application.yml file:

spring.cloud.stream.function.definition: amerData;emeaData;ackResponse  #added new ackResponse here
spring.cloud.stream.kafka.streams:
  binder:
    brokers: remoteHost:9092
    configuration:
      schema.registry.url: remoteHost:8081
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
  bindings:
    ackResponse-out-0:           #new addition
      producer.configuration:
        key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

spring.cloud.stream.bindings:
  amerData-in-0:
    destination: topic1
  emeaData-in-0:
    destination: topic2
  ackResponse-out-0:       #new addition
    destination: topic3

and tried possible options for Supplier -> Supplier<String> ackResponse() or Supplier<Message<String>> ackResponse() It only doesn't change remoteHost to localhost when I am doing Supplier<KStream<String,String>> ackResponse(), then bootstrap.servers show the configured remote one, but this isn't correct and I can't write the received response (mostly a string or json) like this to a Kafka topic.

I did configure my consumers as Consumer<KStream<String, AVROPOJO1>> amerData() and Consumer<KStream<String, AVROPOJO2>> emeaData() as per need & they work fine.

Am I missing or messing up something? Can't we have producer/consumer both in the same spring cloud stream application? Using Streambridge also couldn't solve this. Could someone help?


Solution

  • If you are adding a Supplier bean as you have done, it becomes a regular producer that is using the MessageChannel based Kafka binder. You need to add the regular Kafka binder in your project (spring-cloud-stream-binder-kafka). The bindings for that should be under spring.cloud.stream.kafka.bindings. I see that you have it defined above under spring.cloud.stream.kafka.streams.bindings. I wonder if that is the issue?