Tags: spring-webflux, spring-cloud-stream, spring-cloud-function

Is there an integration of Spring Cloud Function WebFlux and Spring Cloud Stream with an HTTP source?


I am trying to integrate Spring Cloud Stream with Spring Cloud Function WebFlux.

Because the reactive support in Spring Cloud Stream is being deprecated in future releases, I am trying to use Spring Cloud Function instead: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.2.RELEASE/single/spring-cloud-stream.html#spring-cloud-stream-preface-notable-deprecations

The web support in Spring Cloud Function can expose a function as an HTTP endpoint, with paths as described in the documentation:

https://cloud.spring.io/spring-cloud-static/spring-cloud-function/1.0.0.RELEASE/single/spring-cloud-function.html

From the Spring Cloud Stream documentation, I can see that a source needs to be defined as a Supplier: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.2.RELEASE/single/spring-cloud-stream.html#_spring_cloud_function

However, my use case is to receive POST data on a reactive HTTP endpoint and ingest it into Kafka. Is there a way to achieve this with Spring Cloud Function web and Spring Cloud Stream?

The Spring Cloud Stream documentation gives this example of using Spring Cloud Function:

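import java.util.Date;
import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableBinding(Source.class)
public class SourceFromSupplier {
    public static void main(String[] args) {
        // Select the "date" bean as the function bound to the source output
        SpringApplication.run(SourceFromSupplier.class, "--spring.cloud.stream.function.definition=date");
    }
    @Bean
    public Supplier<Date> date() {
        return () -> new Date(12345L);
    }
}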

If I run this, I can see a date being published to Kafka every second, and calling the GET endpoint for the supplier (for example, localhost:8080/date) returns a date in the response. Is there a way to send the payload of a POST request to Kafka with Spring Cloud Function?


Solution

  • Your question helped uncover an issue: a lifecycle inconsistency between the auto-configurations provided by Spring Cloud Function and Spring Cloud Stream. As a result, the REST endpoint created by Spring Cloud Function cannot see the bindings, because the endpoint is created much earlier.

    We'll address the issue shortly. In the meantime, there is a workaround that requires you to access the output channel from the ApplicationContext (see below):

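    import java.util.function.Consumer;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;

    @SpringBootApplication
    @EnableBinding(Source.class)
    public class SimpleFunctionRabbitDemoApplication {

      public static void main(String[] args) throws Exception {
        SpringApplication.run(SimpleFunctionRabbitDemoApplication.class);
      }

      @Bean
      public Consumer<String> storeSync(ApplicationContext context) {
         return v -> {
            // Look up the output channel on each call, not at bean creation time
            MessageChannel channel = context.getBean(Source.OUTPUT, MessageChannel.class);
            channel.send(MessageBuilder.withPayload(v).build());
         };
      }
    }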
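
One way to read the workaround: the channel is resolved inside the lambda, at invocation time, so it only has to exist by the time the first request arrives and not when the function bean is created. The following is a minimal plain-JDK sketch of that ordering, not Spring code. `Registry`, `lazyForwarder`, and the bean name "output" are hypothetical stand-ins for the ApplicationContext, the `storeSync` bean, and the bound output channel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-JDK illustration only; none of these types come from Spring.
public class LazyLookupSketch {

    // Hypothetical stand-in for ApplicationContext: beans are registered over time.
    static class Registry {
        private final Map<String, Consumer<String>> beans = new HashMap<>();

        void register(String name, Consumer<String> bean) {
            beans.put(name, bean);
        }

        Consumer<String> getBean(String name) {
            Consumer<String> bean = beans.get(name);
            if (bean == null) {
                throw new IllegalStateException("No bean named '" + name + "'");
            }
            return bean;
        }
    }

    // Resolves the "channel" on every call instead of capturing it up front,
    // mirroring the context.getBean(...) call inside the storeSync lambda.
    static Consumer<String> lazyForwarder(Registry registry) {
        return payload -> registry.getBean("output").accept(payload);
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        List<String> sent = new ArrayList<>();

        // 1. The function is created before any channel is registered.
        Consumer<String> storeSync = lazyForwarder(registry);

        // 2. The "binding" is registered later.
        registry.register("output", sent::add);

        // 3. The lookup happens only now, so the late registration is visible.
        storeSync.accept("hello");
        System.out.println(sent); // prints [hello]
    }
}
```

To try the actual workaround, you would POST a payload to the function's endpoint. Assuming the same path convention as the localhost:8080/date example in the question, that would be localhost:8080/storeSync.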