Search code examples
javajava-8spring-cloud-streamspring-cloud-function

Spring Cloud Stream Function : Invoke Function<T,R> via REST call and output it to a KAFKA Topic


I have simple @Bean (Java 8 Functions) which are mapped to a destination topic (-out and -in).

@Bean
public Function<String, String> transform() {
    return payload -> payload.toUpperCase();
}

@Bean
public Consumer<String> receive() {
    return payload -> logger.info("Data received: " + payload);
}

.yml config:

spring:
  cloud:
    stream:
      function:
        definition: transform;receive
      bindings:
        transform-out-0:
          destination: myTopic
        receive-in-0:
          destination: myTopic

Now, I want to invoke the transform function via a REST call so that it's output goes to the destination topic (i.e. transform-out-0 mapped to myTopic) and is picked up by the consumer from this destination (receive-in-0 mapped to myTopic). Basically, every REST call should spawn a new instance of a KAFKA Producer and close it.

How can I achieve this please using spring-cloud-stream ?

Thanks

Angshuman


Solution

  • You should use StreamBridge instead of having that transform function. This is the new recommended approach for dynamic destinations in Spring Cloud Stream. Here is the basic idea:

    @Autowired
    private StreamBridge streamBridge;
    
    @RequestMapping
    public void delegateToSupplier(@RequestBody String body) {
        streamBridge.send("transform-out-0", body);
    }
    

    and then provide this property through configuration - spring.cloud.stream.source: transform

    Spring Cloud Stream will create an output binding called transform-out-0 for you. Each time the REST endpoint is called, through StreamBridge, you will send the data to the destination topic.

    For more info see this.