Search code examples
spring-batchspring-cloud-streamspring-integration-dslspring-cloud-functionspring-batch-integration

How to use the functional style on my RemoteChunkingManagerStepBuilderFactory?


I am implementing Spring Batch-Integration RemoteChunking. https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html#remote-chunking

I've come across the deprecation of @Input and the documentation says we have to use the functional style.

How can I use the Consumer (which is used on the spring cloud stream) in my Spring Batch Integration Flows?

package pt.bayonne.sensei.RemoteChunking.manager;

import org.aspectj.bridge.Message;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.function.Consumer;
import java.util.function.Supplier;

@Profile("!worker")
@Configuration
public class FunctionalBinders {


    @Bean
    public Sinks.Many<Object> sink() {
        return Sinks.many()
                .replay()
                .latest();
    }


    @Bean
    public Supplier<Flux<Object>> clientRequests() {
        return () -> sink()
                .asFlux()
                .cache();
    }
    
    @Bean
    public Consumer<Message<?>> onClientReplies(){
        return message -> {
            //do your stuff
        };
    }
}

My JobConfiguration

public TaskletStep dispatchStep(){
       return this.remoteChunkingManagerStepBuilderFactory.get("dispatch-step")
                .chunk(10)
                .reader(reader())
                .outputChannel((message, timeout) -> sink.tryEmitNext(message).isSuccess())
                .inputChannel(replies()) //how to use the functional style here?
                .build();

}

I know that it needs a PollableChannel but my question is how to use the functional style on my RemoteChunkingManagerStepBuilderFactory?

Any example would be very appreciated. Thanks a lot.


Solution

  • .outputChannel((message, timeout) -> sink.tryEmitNext(message).isSuccess()) .inputChannel(replies()) //how to use the functional style here?

    The method org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilder#outputChannel(MessageChannel) accepts a org.springframework.messaging.MessageChannel which is a functional interface (See how it is annotated with @FunctionalInterface). Hence you can use a lambda to define the output channel in the builder.

    However, org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilder#inputChannel(PollableChannel) accepts a org.springframework.messaging.PollableChannel which is not a functional interface. That's why it is not possible to use a lambda in here.