Search code examples
spring-reactor

Why does ConnectableFlux.connect() block?


I'm new to Spring Reactor. I've been trying to understand how the ConnectableFlux class works. I've read the docs and seen examples posted online but am stuck on an issue.

Can someone tell me why the connect() method is blocking? I don't see anything in the documentation that says it should block..especially since it returns a Disposable for later use. Given my example code below, I never get past the connect() method.

I'm trying to basically simulate the old style Listener interface paradigm I've used many times in the past. I want to learn how to recreate a Service class & Listener architecture using Reactive streams. Where I have a simple Service class and it has a method called "addUpdateListener(Listener l)" and then when my service class "doStuff()" method it triggers some events to be passed to any listeners.

I should say that I will be writing an API for others to use, so when I say Service class I don't mean the @Service in Spring terms. It will be a plain java singleton class.

I'm just using Spring Reactor for the Reactive Streams. I was also looking at RxJava.. but wanted to see if Spring Reactor Core would work.

I was starting with a test class below just to understand the library syntax and then got stuck on the blocking issue.

I think what I'm looking for is described here: Multiple Subscribers

UPDATE: Running my code through a debugger, the code inside ConnectableFlux connect method, never returns. It hangs on the internal connect method and never returns from that method.

reactor.core.publisher.ConnectableFlux

public final Disposable connect() {
        Disposable[] out = new Disposable[]{null};
        this.connect((r) -> {
            out[0] = r;
        });
        return out[0];
    }

Any help would be great!

Here is my maven pom as well

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SpringReactorTest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>classworlds:classworlds</exclude>
                                    <exclude>junit:junit</exclude>
                                    <exclude>jmock:*</exclude>
                                    <exclude>*:xml-apis</exclude>
                                    <exclude>org.apache.maven:lib:tests</exclude>
                                    <exclude>log4j:log4j:jar:</exclude>
                                </excludes>
                            </artifactSet>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

import java.util.concurrent.TimeUnit;

import static java.time.Duration.ofSeconds;

/**
 * Testing ConnectableFlux
 */
public class Main {

    private final static Logger LOG = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws InterruptedException {
        Main m = new Main();

        // Get the connectable
        ConnectableFlux<Object> flux = m.fluxPrintTime();

        // Subscribe some listeners
        // Tried using a new thread for the subscribers, but the connect call still blocks
        LOG.info("Subscribing");
        Disposable disposable = flux.subscribe(e -> LOG.info("Fast 1 - {}", e));
        Disposable disposable2 = flux.subscribe(e -> LOG.info("Fast 2 - {}", e));

        LOG.info("Connecting...");
        Disposable connect = flux.connect();// WHY does this block??
        LOG.info("Connected..");

        // Sleep 5 seconds
        TimeUnit.SECONDS.sleep(5);

        // Cleanup - Remove listeners
        LOG.info("Disposing");
        connect.dispose();
        disposable.dispose();
        disposable2.dispose();
        LOG.info("Disposed called");
    }

    // Just create a test flux
    public ConnectableFlux<Object> fluxPrintTime() {
        return Flux.create(fluxSink -> {
            while (true) {
                fluxSink.next(System.currentTimeMillis());
            }
        }).doOnSubscribe(ignore -> LOG.info("Connecting to source"))
                .sample(ofSeconds(2))
                .publish();
    }
}

Running the above code gives the following output.. it just prints the time in milliseconds until I Ctrl-C the process..

09:36:21.463 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
09:36:21.478 [main] INFO Main - Subscribing
09:36:21.481 [main] INFO Main - Connecting...
09:36:21.490 [main] INFO Main - Connecting to source
09:36:23.492 [parallel-1] INFO Main - Fast 1 - 1589808983492
09:36:23.493 [parallel-1] INFO Main - Fast 2 - 1589808983492
09:36:25.493 [parallel-1] INFO Main - Fast 1 - 1589808985493
09:36:25.493 [parallel-1] INFO Main - Fast 2 - 1589808985493
09:36:27.490 [parallel-1] INFO Main - Fast 1 - 1589808987490
09:36:27.490 [parallel-1] INFO Main - Fast 2 - 1589808987490
09:36:29.493 [parallel-1] INFO Main - Fast 1 - 1589808989493
...

Solution

  • I received an answer from the Spring Reactor team and I'm just posting it here in case anyone else runs into this...

    The crux of the issue is that you're entering an infinite loop in Flux.create. The moment the flux gets subscribed, it will enter the loop and never exit it, producing data as fast as the CPU can. With Flux.create you should at least have a call to sink.complete() at some point.

    I suggest to experiment with eg. Flux.interval as a source for your regular ticks, it will get rid of that extraneous complexity of Flux.create, which puts you in charge of lower level concepts of Reactive Streams (the onNext/onComplete/onError signals, that you'll need to learn about, but maybe not just right now 😄 ).

    As a side note, I would take into consideration that emulating a listener-based API with Reactor (or RxJava) is not doing justice to what reactive programming can do. It is a constrained use case that will probably drive your focus and expectations away from the real benefits of reactive programming

    From a higher perspective:

    The broad idea of ConnectableFlux#connect() is that you have a "transient" source that you want to share between multiple subscribers, but it gets triggered the moment someone subscribes to it. So in order not to miss any event, you turn the source into a ConnectableFlux, perform some set up (subscribe several subscribers) and manually trigger the source (by calling connect()). It is not blocking, and returns a Disposable` that represents the upstream connection (in case you also want to manually cancel/dispose the whole subscription).

    PS: Bismuth is now clearly outdated, prefer using the latest Dysprosium release train