I am implementing Spring Batch-Integration RemoteChunking. https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html#remote-chunking
I've come across the deprecation of @Input and the documentation says we have to use the functional style.
How can I use the Consumer (which is used on the spring cloud stream) in my Spring Batch Integration Flows?
package pt.bayonne.sensei.RemoteChunking.manager;
import org.aspectj.bridge.Message;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.function.Consumer;
import java.util.function.Supplier;
@Profile("!worker")
@Configuration
public class FunctionalBinders {
@Bean
public Sinks.Many<Object> sink() {
return Sinks.many()
.replay()
.latest();
}
@Bean
public Supplier<Flux<Object>> clientRequests() {
return () -> sink()
.asFlux()
.cache();
}
@Bean
public Consumer<Message<?>> onClientReplies(){
return message -> {
//do your stuff
};
}
}
My JobConfiguration
public TaskletStep dispatchStep(){
return this.remoteChunkingManagerStepBuilderFactory.get("dispatch-step")
.chunk(10)
.reader(reader())
.outputChannel((message, timeout) -> sink.tryEmitNext(message).isSuccess())
.inputChannel(replies()) //how to use the functional style here?
.build();
}
I know that it needs a PollableChannel but my question is how to use the functional style on my RemoteChunkingManagerStepBuilderFactory?
Any example would be very appreciated. Thanks a lot.
.outputChannel((message, timeout) -> sink.tryEmitNext(message).isSuccess()) .inputChannel(replies()) //how to use the functional style here?
The method org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilder#outputChannel(MessageChannel)
accepts a org.springframework.messaging.MessageChannel
which is a functional interface (See how it is annotated with @FunctionalInterface
). Hence you can use a lambda to define the output channel in the builder.
However, org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilder#inputChannel(PollableChannel)
accepts a org.springframework.messaging.PollableChannel
which is not a functional interface. That's why it is not possible to use a lambda in here.