java spring-integration spring-cloud-function

Using Spring Integration executorChannel with Spring Cloud Function


I'm using Spring Cloud Function to process data from Kafka with Flux. By default it processes data on the consumer thread (the thread on which the message is consumed). I want to use a thread pool for parallel data processing and throttling, and Spring Integration has a great implementation for this called ExecutorChannel (https://docs.spring.io/spring-integration/api/org/springframework/integration/channel/ExecutorChannel.html).

Function implementation example:

public static class FN1 implements Function<Flux<String>, Flux<String>> {
  @Override
  public Flux<String> apply(Flux<String> data) {
    return data
      .map(f -> doSomething());  // placeholder for the per-element processing
  }
}

However, I found no simple way to connect functions implemented this way through an ExecutorChannel.

Maybe there is a way to define the inputChannel type?

Update: read the comments under Oleg's answer. They are very useful.


Solution

  • Do you mean something like this?

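    import java.util.concurrent.Executors;
    import java.util.function.Function;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.channel.ExecutorChannel;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;
    import org.springframework.messaging.support.GenericMessage;

    @SpringBootApplication
    public class SampleFunctionAppApplication {

        public static void main(String[] args) throws Exception {

            ApplicationContext context = SpringApplication.run(SampleFunctionAppApplication.class, args);
            // Print whatever the flow emits on the "output" channel.
            SubscribableChannel output = context.getBean("output", SubscribableChannel.class);
            output.subscribe(System.out::println);

            // Messages sent here are dispatched to the flow on a pool thread.
            MessageChannel channel = context.getBean("executorChannel", MessageChannel.class);
            channel.send(new GenericMessage<>("hello"));
        }

        @Bean
        public IntegrationFlow flow() {
            return IntegrationFlows
                    .from("executorChannel")
                    .transform(echo())
                    .channel("output")
                    .get();
        }

        @Bean
        public ExecutorChannel executorChannel() {
            return new ExecutorChannel(Executors.newCachedThreadPool());
        }

        // The function under test; it returns its input unchanged.
        public Function<String, String> echo() {
            return v -> v;
        }
    }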
    

    What do you mean by "define inputChannel type"?
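
For context on what the ExecutorChannel above changes: it dispatches each message to its handler on a thread from the supplied Executor, so the sending thread is not the one doing the work. The sketch below illustrates that hand-off with plain JDK classes only. It is not Spring Integration's implementation, and `ExecutorHandoffSketch` and `processOnPool` are hypothetical names chosen for illustration. A fixed-size pool is assumed here because it also caps concurrency, which is one simple way to get the throttling the question asks about.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ExecutorHandoffSketch {

    // Hypothetical helper: runs fn for every input on the given pool
    // (never on the calling thread) and returns the results in input order.
    public static <I, O> List<O> processOnPool(List<I> inputs,
                                               Function<I, O> fn,
                                               ExecutorService pool) {
        // Submit all work first so the items can run in parallel.
        List<CompletableFuture<O>> futures = inputs.stream()
                .map(in -> CompletableFuture.supplyAsync(() -> fn.apply(in), pool))
                .collect(Collectors.toList());
        // Then wait for each result; join() preserves the submission order.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumption: 4 threads; the pool size is the concurrency limit.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<String> out = processOnPool(
                    List.of("a", "b", "c"),
                    v -> v + " handled on " + Thread.currentThread().getName(),
                    pool);
            out.forEach(System.out::println);
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that `Executors.newCachedThreadPool()`, as used in the answer, creates threads on demand, so it adds parallelism but does not by itself limit concurrency; a bounded pool is one way to get that limit.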