Search code examples
spring-integrationreactive

Spring reactive file integration


I am trying to use spring-integration-file to poll a directory and create a reactive stream from files placed in this directory. This is working for the most part, but when I place a file but have no subscriber in place I get an exception. To demonstrate the problem I have written a small demo application:

import org.reactivestreams.Publisher;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.io.File;

@SpringBootApplication
@RestController
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public Publisher<Message<File>> reactiveSource() {
        return IntegrationFlows
                .from(Files.inboundAdapter(new File("."))
                                .patternFilter("*.csv"),
                        e -> e.poller(Pollers.fixedDelay(1000)))
                .channel("processFileChannel")
                .toReactivePublisher();
    }

    @GetMapping(value = "/files", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> files() {
        return Flux.from(reactiveSource())
                .map(message -> message.getPayload().getAbsolutePath());
    }
}

So if I now do a curl to localhost:8080/files and then place a csv file in the root directory of the project everything is fine, I see the path of the file as response to my curl. But when I don't do a curl and then place a file in the root directory I get the following exception:

java.lang.IllegalStateException: The [bean 'reactiveSource.channel#0'; defined in: 'com.example.demo.DemoApplication'; 
from source: 'bean method reactiveSource'] doesn't have subscribers to accept messages
        at org.springframework.util.Assert.state(Assert.java:97)
        at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:61)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
        ... 38 more

I thought one of the attributes of reactive streams was that when there was no subscriber the stream would not start due to the stream being lazy. But apparently this is not the case. Could someone explain what I would need to do to have the stream not start if there is no subscriber?


Solution

  • If you use one of the latest version, then you can use a FluxMessageChannel channel instead of that DirectChannel for the "processFileChannel". This way a SourcePollingChannel adapter will becomes reactive and indeed the source is not going to be polled until a subscription happens to that FluxMessageChannel.

    You then create a Flux in your files() API from this FluxMessageChannel - no need in the .toReactivePublisher().

    See more in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#source-polling-channel-adapter

    The point is that .toReactivePublisher() just makes an integration flow as a Publisher exactly at this point. Everything before this point is in regular, imperative way and works independently from the downstream logic.

    UPDATE

    Something like this:

    @Bean
    FluxMessageChannel filesChannel() {
        return new FluxMessageChannel();
    }
    
    @Bean
    public IntegrationFlow reactiveSource() {
            return IntegrationFlows
                    .from(Files.inboundAdapter(new File("."))
                                    .patternFilter("*.csv"),
                            e -> e.poller(Pollers.fixedDelay(1000)))
                    .channel(filesChannel())
                    .get();
        }
    
    @GetMapping(value = "/files", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> files() {
            return Flux.from(filesChannel())
                    .map(message -> ((File) message.getPayload()).getAbsolutePath());
        }