java, akka, akka-stream

How to create a Flow.Publisher that starts on subscribe from an akka-stream Source


I have a Java interface to implement that looks like this:

public Flow.Publisher<Packet> getLivePublisher();

This method must return a Flow.Publisher that stays inactive until it is subscribed to and the subscriber calls Subscription.request(n).

So far, my implementation looks like

return Source
    .fromIterator(() -> new LivePacketIterator())
    .async("live-dispatcher")
    .runWith(JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), actorSystem);

Unfortunately, this seems to immediately start getting elements from my LivePacketIterator, even when no subscribers have subscribed to the returned Flow.Publisher.

I understand that a Source is just a sort of a template for a Subscribable source of objects (my understanding is that it's like a Factory of Publishers), and that it only converts to a concrete active source once it's materialized. So if I understand correctly, I need to somehow materialize my Source to get a Flow.Publisher. But I want it to be materialized in a way that it only starts running when it is subscribed to.

I've also tried using toMat():

return Source
                .fromIterator(() -> new LivePacketIterator(maximumPacketSize))
                .filter(OrderSnapshotPacket::isNotEmpty)
                .async(dbDispatcher)
                .toMat(JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), Keep.right())
                .???;

But I'm not sure what to do with the resulting RunnableGraph.

Am I understanding this correctly? Is there a way to do what I'm trying to do?


Solution

  • Unfortunately, this seems to immediately start getting elements from my LivePacketIterator, even when no subscribers have subscribed to the returned Flow.Publisher.

    What exactly did you observe that led you to this conclusion? I used a snippet very similar to yours:

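    Flow.Publisher<Integer> integerPublisher =
          Source.from(List.of(1, 2, 3, 4, 5))
                // Print each element as it passes through the stream
                .wireTap(System.out::println)
                .async()
                // Materialize the stream as a java.util.concurrent.Flow.Publisher
                .runWith(
                  JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT),
                  ActorSystem.create());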
    

    This will not start emitting items from the list until the publisher is subscribed to.
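
One way to check this is to subscribe with a probe that signals no demand until told to. The following is a minimal sketch that uses only the JDK. ProbeSubscriber is a hypothetical helper, not part of Akka, and the SubmissionPublisher in main is only a stand-in publisher so that the block runs on its own.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: a subscriber that requests elements only on demand. */
public class ProbeSubscriber<T> implements Flow.Subscriber<T> {
    private final List<T> received = new CopyOnWriteArrayList<>();
    private final CountDownLatch subscribed = new CountDownLatch(1);
    private final CountDownLatch finished = new CountDownLatch(1);
    private volatile Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;          // keep the subscription, signal no demand yet
        subscribed.countDown();
    }

    @Override
    public void onNext(T item) {
        received.add(item);
    }

    @Override
    public void onError(Throwable t) {
        finished.countDown();
    }

    @Override
    public void onComplete() {
        finished.countDown();
    }

    /** Waits for onSubscribe, then signals demand for n elements. */
    public void request(long n) throws InterruptedException {
        subscribed.await();
        subscription.request(n);
    }

    /** Returns true if onComplete or onError arrived within the timeout. */
    public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        return finished.await(timeout, unit);
    }

    /** Snapshot of the elements received so far. */
    public List<T> received() {
        return List.copyOf(received);
    }

    public static void main(String[] args) throws Exception {
        ProbeSubscriber<Integer> probe = new ProbeSubscriber<>();
        // Stand-in publisher (an assumption for this sketch, not the Akka one).
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(probe);
            List.of(1, 2, 3, 4, 5).forEach(publisher::submit);
            System.out.println("before request: " + probe.received());
            probe.request(5);
        }
        probe.awaitCompletion(5, TimeUnit.SECONDS);
        System.out.println("after request: " + probe.received());
    }
}
```

With the snippet above, the same probe could be attached with integerPublisher.subscribe(probe). The wireTap output can then be compared before and after probe.request(n) to see when elements actually start flowing.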

    I understand that a Source is just a sort of a template for a Subscribable source of objects (my understanding is that it's like a Factory of Publishers), and that it only converts to a concrete active source once it's materialized

    Kind of. All Flow.* interfaces are part of the Reactive Streams specification for the JVM. Akka Streams treats those interfaces as an SPI and doesn't use them directly in its API. Instead, it introduces its own abstractions such as Source, Flow and Sink. Akka Streams lets you convert a processing stream expressed in its API to the lower-level Flow.* interfaces, just as you did in your snippet. This is useful if you want to plug an Akka Streams pipeline into another Reactive Streams implementation such as RxJava or Project Reactor.

    So Source is Akka Streams' abstraction that is roughly equivalent to Flow.Publisher: a source of a potentially infinite number of values. You need to connect a Source to a Sink (possibly via a Flow) to get a RunnableGraph, which you can then run. Running it sets everything in motion, and in most cases it causes a chain of subscriptions so that elements start flowing through the stream.

    That is not the only possible outcome, though. With the JavaFlowSupport.Sink.asPublisher Sink, running the RunnableGraph converts the whole Akka Stream into an instance of Flow.Publisher. The semantics here are that the subscription is deferred until something calls subscribe on that instance, which is exactly what you're trying to achieve, if I understand correctly.
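
For the toMat variant from the question, the RunnableGraph only needs to be run to obtain its materialized value. The following is a minimal sketch, assuming Akka 2.6 or later (where run accepts an ActorSystem) and the akka-stream artifact on the classpath. LivePublisherSketch is a hypothetical class name, and a list of integers stands in for the question's packet iterator.

```java
import akka.actor.ActorSystem;
import akka.stream.javadsl.AsPublisher;
import akka.stream.javadsl.JavaFlowSupport;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Source;

import java.util.List;
import java.util.concurrent.Flow;

public class LivePublisherSketch {

    /** Builds the graph, runs it, and returns the materialized Flow.Publisher. */
    static Flow.Publisher<Integer> livePublisher(ActorSystem system) {
        // The blueprint: nothing runs until run(...) is called.
        RunnableGraph<Flow.Publisher<Integer>> graph =
            Source.from(List.of(1, 2, 3, 4, 5))
                .toMat(
                    JavaFlowSupport.Sink.<Integer>asPublisher(AsPublisher.WITHOUT_FANOUT),
                    Keep.right()); // keep the sink's materialized value

        // Running the graph materializes it and hands back the publisher.
        return graph.run(system);
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("sketch");
        try {
            Flow.Publisher<Integer> publisher = livePublisher(system);
            System.out.println("materialized: " + (publisher != null));
        } finally {
            system.terminate();
        }
    }
}
```

This is equivalent to the runWith form used earlier: runWith(sink, system) is shorthand for connecting the sink, keeping its materialized value, and running the graph.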