I am trying to integrate Spring Cloud Stream with Spring Cloud Function (WebFlux).
Since the Spring Cloud Stream reactive module is being deprecated in future releases, I am trying to use Spring Cloud Function instead: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.2.RELEASE/single/spring-cloud-stream.html#spring-cloud-stream-preface-notable-deprecations
Spring Cloud Function web can expose a function as an HTTP endpoint, with paths as described in the docs.
From the Spring Cloud Stream docs I can see that a source needs to be defined as a Supplier: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.2.RELEASE/single/spring-cloud-stream.html#_spring_cloud_function
But my use case is to receive POST data on a reactive HTTP endpoint and ingest it into Kafka. Is there any way to achieve this with Spring Cloud Function web and Spring Cloud Stream?
From the docs for Spring Cloud Function with Spring Cloud Stream:
    @SpringBootApplication
    @EnableBinding(Source.class)
    public static class SourceFromSupplier {
        public static void main(String[] args) {
            SpringApplication.run(SourceFromSupplier.class, "--spring.cloud.stream.function.definition=date");
        }

        @Bean
        public Supplier<Date> date() {
            return () -> new Date(12345L);
        }
    }
If I run this, I can see a date being inserted into Kafka every second, and calling the GET endpoint for the supplier, e.g. localhost:8080/date, returns a date response. Is there any way to inject the payload of a POST request into Kafka with Spring Cloud Function?
Your question helped uncover an issue: a lifecycle inconsistency between the auto-configurations provided by Spring Cloud Function and Spring Cloud Stream. It manifests in that the REST endpoint created by Spring Cloud Function cannot see the bindings, because the endpoint is created much earlier than they are.
We'll address the issue shortly. Meanwhile, there is a workaround, which requires you to access the output channel from the ApplicationContext (see below):
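    import java.util.function.Consumer;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;

    @SpringBootApplication
    @EnableBinding(Source.class)
    public class SimpleFunctionRabbitDemoApplication {
        public static void main(String[] args) throws Exception {
            SpringApplication.run(SimpleFunctionRabbitDemoApplication.class);
        }

        @Bean
        public Consumer<String> storeSync(ApplicationContext context) {
            return v -> {
                // Look up the output channel on each call, once the binding exists.
                MessageChannel channel = context.getBean(Source.OUTPUT, MessageChannel.class);
                // Wrap the incoming payload in a message and send it to the bound destination.
                channel.send(MessageBuilder.withPayload(v).build());
            };
        }
    }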
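As far as I can tell, the workaround holds up because the channel lookup sits inside the lambda body, so it runs when the function is invoked and not when the function bean is created. The following is a minimal plain-Java sketch of that idea only, with no Spring involved. Registry, getChannel and the "output" name are hypothetical stand-ins for the ApplicationContext and the Source.OUTPUT channel, not Spring APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class DeferredLookupSketch {

    // Hypothetical stand-in for the ApplicationContext: maps names to channels.
    public static final class Registry {
        private final Map<String, Consumer<String>> channels = new HashMap<>();

        public void register(String name, Consumer<String> channel) {
            channels.put(name, channel);
        }

        public Consumer<String> getChannel(String name) {
            Consumer<String> channel = channels.get(name);
            if (channel == null) {
                throw new IllegalStateException("No channel named '" + name + "'");
            }
            return channel;
        }
    }

    // Same shape as storeSync above: the lookup is inside the lambda body,
    // so it happens on every call, not when the function is created.
    public static Consumer<String> storeSync(Registry registry) {
        return payload -> registry.getChannel("output").accept(payload);
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        List<String> sent = new ArrayList<>();

        // The function is created first, before any channel exists.
        Consumer<String> function = storeSync(registry);

        // The "binding" is registered afterwards, mimicking the late lifecycle.
        registry.register("output", sent::add);

        // The call succeeds because the lookup is deferred until now.
        function.accept("hello");
        System.out.println(sent); // prints [hello]
    }
}
```

In the real application the function would presumably be reachable over HTTP at a path matching the bean name (for example a POST to localhost:8080/storeSync, by analogy with /date in the question), but that path is an assumption and worth checking against your setup.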