Trying to publish an object to Kafka using Spring Cloud Stream with the functional (Supplier) model


I am trying to publish an object to a Kafka topic using Spring Cloud Stream with the functional model. Here is the code I have so far.

Controller:

@PostMapping(path = "/publish")
public void publish(@RequestBody SampleObject obj) {
    service.publish(obj);
    log.info("Published Data {} successfully", obj.toString());
}

Service Class:

@Bean
public Supplier<Object> publish(SampleObject obj) {
    return () -> {
        log.info("posting data to kafka topic {}", obj);
        return obj;
    };
}

My requirement is to pass the request object from the controller to the service and have the service publish that object to a Kafka topic.

NOTE: I don't want to use the deprecated model.


Solution

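  • You can use StreamBridge. A Supplier bean is created once at startup and then polled by the framework, so it cannot take a per-request argument such as the request body. StreamBridge instead lets you send to an output binding on demand from any code, for example a controller method.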

    Ref: https://docs.spring.io/spring-cloud-stream/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream.html#_using_streambridge

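    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.http.HttpStatus;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseStatus;

    @SpringBootApplication
    @Controller
    public class WebSourceApplication {

        public static void main(String[] args) {
            // Declares a source named "toStream", which creates the output binding "toStream-out-0".
            SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.source=toStream");
        }

        @Autowired
        private StreamBridge streamBridge;

        @RequestMapping
        @ResponseStatus(HttpStatus.ACCEPTED)
        public void delegateToSupplier(@RequestBody String body) {
            System.out.println("Sending " + body);
            // Sends the request body to the output binding on demand.
            streamBridge.send("toStream-out-0", body);
        }
    }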
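
Applied to the question's controller/service split, the same idea might look like the sketch below: the service sends on each call instead of exposing a Supplier bean. To keep the example runnable without Spring on the classpath, the sender is modelled as a JDK `BiFunction` with the same shape as `StreamBridge.send(String, Object)`; in a real application you would presumably inject `StreamBridge` and pass `streamBridge::send`. The binding name `publish-out-0`, the `SampleObject` fields, and the class names are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Stand-in for the question's request payload; the fields are hypothetical.
final class SampleObject {
    final String id;
    final String text;

    SampleObject(String id, String text) {
        this.id = id;
        this.text = text;
    }
}

// Service that sends on demand instead of exposing a Supplier bean.
final class PublishService {
    // Hypothetical binding name; it would be mapped to a topic in configuration.
    static final String BINDING = "publish-out-0";

    // Same shape as StreamBridge's send(String bindingName, Object data),
    // so a Spring application could pass streamBridge::send here (assumption).
    private final BiFunction<String, Object, Boolean> sender;

    PublishService(BiFunction<String, Object, Boolean> sender) {
        this.sender = sender;
    }

    // Called once per request by the controller; returns the sender's result.
    boolean publish(SampleObject obj) {
        return sender.apply(BINDING, obj);
    }
}

final class PublishDemo {
    public static void main(String[] args) {
        // Fake sender that records payloads instead of talking to a broker.
        List<Object> sent = new ArrayList<>();
        PublishService service = new PublishService((binding, payload) -> sent.add(payload));

        boolean accepted = service.publish(new SampleObject("42", "hello"));
        System.out.println("accepted=" + accepted + ", sent=" + sent.size());
    }
}
```

The controller from the question can then keep calling `service.publish(obj)` unchanged. The binding would still need to be mapped to a Kafka topic, typically with a property such as `spring.cloud.stream.bindings.publish-out-0.destination` (the topic name is up to you).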