I'm using Spring Cloud Function to process data from Kafka as a Flux. By default, the data is processed on the consumer thread (the thread on which the message is consumed). I want to add a thread pool for parallel data processing and throttling, and Spring Integration already has a good implementation of this called ExecutorChannel (https://docs.spring.io/spring-integration/api/org/springframework/integration/channel/ExecutorChannel.html).
Function implementation example:
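import java.util.function.Function;
import reactor.core.publisher.Flux;

public static class FN1 implements Function<Flux<String>, Flux<String>> {
    @Override
    public Flux<String> apply(Flux<String> data) {
        // doSomething is a placeholder for the per-element processing
        return data
                .map(f -> doSomething(f));
    }
}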
So far I have found no simple way to connect functions implemented this way through an ExecutorChannel.
Maybe there is a way to define inputChannel type?
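For reference, the thread pool plus throttling behaviour I'm after can be sketched in plain Java, independent of Spring. This is only an illustration under assumptions: `ThrottledProcessor`, `processAll`, and the pool and queue sizes are hypothetical, and `String::toUpperCase` stands in for `doSomething`. The bounded queue combined with `CallerRunsPolicy` is what throttles the producer: when the queue is full, the submitting thread runs the task itself instead of enqueueing more work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ThrottledProcessor {

    // Hypothetical helper: applies fn to every item on a fixed-size pool.
    // Results are returned in input order because futures are read in order.
    public static <T, R> List<R> processAll(List<T> items, Function<T, R> fn,
                                            int threads, int queueCapacity) {
        ExecutorService pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),       // bounded backlog
                new ThreadPoolExecutor.CallerRunsPolicy());    // throttle the submitter
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (T item : items) {
                Callable<R> task = () -> fn.apply(item);
                futures.add(pool.submit(task));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("processing failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> out = processAll(Arrays.asList("a", "b", "c"), String::toUpperCase, 2, 1);
        System.out.println(out); // prints [A, B, C]
    }
}
```

What I'm looking for is the equivalent of this hand-off placed in front of a `Function<Flux<String>, Flux<String>>`, without writing the executor plumbing myself.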
UPD: Read the comments under Oleg's answer; they are very useful.
You mean something like this?
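import java.util.concurrent.Executors;
import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
public class SampleFunctoinAppApplication {

    public static void main(String[] args) throws Exception {
        ApplicationContext context = SpringApplication.run(SampleFunctoinAppApplication.class, args);
        // Print whatever arrives on the "output" channel
        SubscribableChannel output = context.getBean("output", SubscribableChannel.class);
        output.subscribe(System.out::println);
        // Send a message into the flow through the executor-backed channel
        MessageChannel channel = context.getBean("executorChannel", MessageChannel.class);
        channel.send(new GenericMessage<String>("hello"));
    }

    // executorChannel -> echo function -> output
    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows
                .from("executorChannel")
                .transform(echo())
                .channel("output")
                .get();
    }

    // Messages sent to this channel are handled on a thread from the pool
    @Bean
    public ExecutorChannel executorChannel() {
        return new ExecutorChannel(Executors.newCachedThreadPool());
    }

    public Function<String, String> echo() {
        return v -> v;
    }
}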
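To make the thread hand-off concrete, here is a conceptual sketch in plain Java of what an executor-backed channel does: `send` returns to the caller, and the handler runs on a thread owned by the executor. `ExecutorHandoffSketch`, `SimpleExecutorChannel`, and `handlerThreadFor` are hypothetical names for illustration; this is not Spring Integration's implementation, and it leaves out everything else the real `ExecutorChannel` does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ExecutorHandoffSketch {

    // Hypothetical stand-in for an executor-backed channel with one handler.
    static final class SimpleExecutorChannel<T> {
        private final Executor executor;
        private final Consumer<T> handler;

        SimpleExecutorChannel(Executor executor, Consumer<T> handler) {
            this.executor = executor;
            this.handler = handler;
        }

        // Hands the payload to the executor; the handler runs on a pool thread.
        void send(T payload) {
            executor.execute(() -> handler.accept(payload));
        }
    }

    // Sends one payload and returns the name of the thread that handled it.
    public static String handlerThreadFor(String payload) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            CountDownLatch done = new CountDownLatch(1);
            String[] threadName = new String[1];
            SimpleExecutorChannel<String> channel = new SimpleExecutorChannel<>(pool, p -> {
                threadName[0] = Thread.currentThread().getName();
                done.countDown();
            });
            channel.send(payload);
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handler did not run in time");
            }
            return threadName[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for handler", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sender:  " + Thread.currentThread().getName());
        System.out.println("handler: " + handlerThreadFor("hello"));
    }
}
```

Running `main` should show two different thread names, which is the same effect the `executorChannel` bean has on the `echo()` transformer in the flow above.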
What do you mean by "define inputChannel type"?