reactive-programming spring-webflux project-reactor reactive-streams

Emitting from Flux<String> until one of several conditions is met?


I am given an InputStream representing a resource (most probably a .txt file), and my assignment is to parse the lines of that file.

I have to make my program as modifiable as possible, meaning that today I'm reading from a file, but tomorrow data could arrive over a TCP stream from the other side of the world. No assumptions should be made about the speed at which new lines are arriving.

With that in mind, I decided to try an asynchronous/reactive approach (if someone could point me to a good resource explaining the difference, that would be great).

I added in pom.xml:

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
        <version>2.4.4</version>
    </dependency>

Also, I have to read/parse the file until 100 lines have been processed or until 30 seconds have passed since processing started. Whichever condition is met first, processing should stop.

So far, I have:

    String absolutePath = "C:\\Users\\User\\Desktop\\input.json"; 
    InputStream is = new FileInputStream(absolutePath);
    
    
    Flux<String> stringFlux = Flux.using(
            () -> new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)).lines().limit(100),
            Flux::fromStream,
            Stream::close
    ).take(100);
    
    stringFlux.subscribe(
            s -> System.out.println(s + " " + new Date()),
            error -> System.out.println(error),
            () -> System.out.println("complete Channel"));

I deliberately left in both .limit(100) and .take(100) since I'm not sure which is better (maybe both are wrong?). Also, I don't know how to set up the time constraint.


Solution

  • I have to read/parse the file until 100 lines have been processed

    You've outlined the two sensible options here: you either limit the underlying stream, or you limit the flux itself. Usually I advocate applying the limit to the source if at all possible (which would be the stream in this case), but if you want to be able to swap out that source easily while keeping the limit in place, then applying take(100) on the outer flux might be the better option.

    It's up to you really, there's no hard and fast rule as to what's better here - pick one or the other.

  • ...or until 30 seconds have been passed since I started with processing.

    There's another overload of take() that accepts a Duration. Use that as well, so your flux ends with .take(100).take(Duration.ofSeconds(30)); whichever limit is reached first completes the flux.
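To see both limits in action, here is a minimal sketch, assuming reactor-core is on the classpath (it comes in with the webflux starter). The class name TakeUntilLimit, the helper limited(), and the two in-memory sources are made up for illustration; they stand in for the file-backed flux from the question.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class TakeUntilLimit {

    // Hypothetical helper: caps any line source at maxLines items or maxTime,
    // whichever is reached first. The time window starts at subscription.
    static Flux<String> limited(Flux<String> lines, long maxLines, Duration maxTime) {
        return lines.take(maxLines).take(maxTime);
    }

    public static void main(String[] args) {
        // Fast source: 1000 lines are available immediately, so the count limit wins.
        Flux<String> fast = Flux.range(1, 1000).map(i -> "line " + i);
        List<String> byCount = limited(fast, 100, Duration.ofSeconds(30)).collectList().block();
        System.out.println("fast source emitted " + byCount.size() + " lines");

        // Slow source: one line every 200 ms, so a short time limit wins.
        // (1 second is used instead of 30 only to keep the demo quick.)
        Flux<String> slow = Flux.interval(Duration.ofMillis(200)).map(i -> "line " + i);
        List<String> byTime = limited(slow, 100, Duration.ofSeconds(1)).collectList().block();
        System.out.println("slow source emitted " + byTime.size() + " lines");
    }
}
```

Because the limits sit on the outer flux, the same limited() call should keep working if the file-backed source is later replaced by something like a TCP-backed one.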

    (Not wholly relevant to the question, but as a general note: Files.lines() is a more succinct way of reading a file than using a BufferedReader these days.)
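As a rough illustration of that note, the sketch below reads at most 100 lines with Files.lines() and applies the limit on the stream itself. It uses only the JDK; the class name FirstLines, the helper firstLines(), and the temporary sample file are assumptions made for the example. Note that Files.lines() needs a Path, so it only helps when the data really is a file rather than an arbitrary InputStream.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class FirstLines {

    // Hypothetical helper: reads at most maxLines lines and closes the file afterwards.
    static List<String> firstLines(Path file, long maxLines) throws IOException {
        try (Stream<String> lines = Files.lines(file, StandardCharsets.UTF_8)) {
            return lines.limit(maxLines).collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Sample data for the demo: a temporary file holding 250 lines.
        Path file = Files.createTempFile("input", ".txt");
        List<String> sample = IntStream.rangeClosed(1, 250)
                .mapToObj(i -> "line " + i)
                .collect(Collectors.toList());
        Files.write(file, sample, StandardCharsets.UTF_8);

        List<String> head = firstLines(file, 100);
        System.out.println(head.size() + " lines read, last one: " + head.get(head.size() - 1));
        Files.delete(file);
    }
}
```

The same Files.lines(path) stream could be handed to Flux.using in place of the BufferedReader-based supplier from the question.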