I am trying to use spring-integration-file to poll a directory and create a reactive stream from the files placed in it. This works for the most part, but when a file arrives while no subscriber is attached, I get an exception. To demonstrate the problem I have written a small demo application:
import org.reactivestreams.Publisher;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.io.File;

@SpringBootApplication
@RestController
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public Publisher<Message<File>> reactiveSource() {
        return IntegrationFlows
                .from(Files.inboundAdapter(new File("."))
                                .patternFilter("*.csv"),
                        e -> e.poller(Pollers.fixedDelay(1000)))
                .channel("processFileChannel")
                .toReactivePublisher();
    }

    @GetMapping(value = "/files", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> files() {
        return Flux.from(reactiveSource())
                .map(message -> message.getPayload().getAbsolutePath());
    }
}
If I now curl localhost:8080/files and then place a CSV file in the root directory of the project, everything is fine: I see the path of the file in the response to my curl. But when I place a file in the root directory without an active curl request, I get the following exception:
java.lang.IllegalStateException: The [bean 'reactiveSource.channel#0'; defined in: 'com.example.demo.DemoApplication';
from source: 'bean method reactiveSource'] doesn't have subscribers to accept messages
at org.springframework.util.Assert.state(Assert.java:97)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:61)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
... 38 more
I thought one of the attributes of reactive streams was that when there was no subscriber the stream would not start due to the stream being lazy. But apparently this is not the case. Could someone explain what I would need to do to have the stream not start if there is no subscriber?
If you use one of the latest versions, you can use a FluxMessageChannel instead of the DirectChannel for "processFileChannel". This way the SourcePollingChannelAdapter becomes reactive, and the source is indeed not polled until a subscription happens on that FluxMessageChannel.
You then create a Flux in your files() API from this FluxMessageChannel; there is no need for .toReactivePublisher().
See more in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#source-polling-channel-adapter
The point is that .toReactivePublisher() only turns the integration flow into a Publisher at exactly that point. Everything before that point runs in the regular, imperative way and works independently of the downstream logic.
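To make the difference concrete, here is a minimal plain-JDK sketch of the two wirings. It is only a model, not Spring Integration code: the names PollingModelSketch, SubscriberRequiredChannel, eagerPoll and LazySource are hypothetical and exist only for this illustration. The first half mimics a poller that pushes into a channel regardless of subscribers; the second half mimics a source that is read only when someone subscribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-JDK model of the two wirings. Every name here is hypothetical;
// none of these types are Spring Integration or Reactor APIs.
class PollingModelSketch {

    /** Channel that rejects a message when nobody is subscribed, like the one in the stack trace. */
    static final class SubscriberRequiredChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void send(String message) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("channel doesn't have subscribers to accept messages");
            }
            subscribers.forEach(s -> s.accept(message));
        }
    }

    /** Imperative wiring: the poller reads the source and pushes, whether or not anyone listens. */
    static String eagerPoll(Supplier<String> source, SubscriberRequiredChannel channel) {
        try {
            channel.send(source.get());
            return "delivered";
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    /** Subscription-driven wiring: the source is read only once a subscriber shows up. */
    static final class LazySource {
        private final Supplier<String> source;
        int polls = 0; // how many times the source has been read

        LazySource(Supplier<String> source) {
            this.source = source;
        }

        void subscribe(Consumer<String> subscriber) {
            polls++;
            subscriber.accept(source.get());
        }
    }

    public static void main(String[] args) {
        SubscriberRequiredChannel channel = new SubscriberRequiredChannel();
        // No subscriber yet: the eager poll fails.
        System.out.println(eagerPoll(() -> "a.csv", channel));
        channel.subscribe(m -> System.out.println("received " + m));
        // With a subscriber attached the same poll is delivered.
        System.out.println(eagerPoll(() -> "a.csv", channel));

        LazySource lazy = new LazySource(() -> "b.csv");
        System.out.println("polls before subscribe: " + lazy.polls);
        lazy.subscribe(m -> System.out.println("received " + m));
        System.out.println("polls after subscribe: " + lazy.polls);
    }
}
```

In this model the eager poll fails simply because it runs on its own schedule, which is roughly what happens upstream of .toReactivePublisher(). The lazy variant never touches the source until subscribe is called, which is the behaviour the FluxMessageChannel approach is meant to give you.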
UPDATE
Something like this:
@Bean
FluxMessageChannel filesChannel() {
    return new FluxMessageChannel();
}

@Bean
public IntegrationFlow reactiveSource() {
    return IntegrationFlows
            .from(Files.inboundAdapter(new File("."))
                            .patternFilter("*.csv"),
                    e -> e.poller(Pollers.fixedDelay(1000)))
            .channel(filesChannel())
            .get();
}

@GetMapping(value = "/files", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> files() {
    return Flux.from(filesChannel())
            .map(message -> ((File) message.getPayload()).getAbsolutePath());
}