Search code examples
javaakka

Difference between map and Flow


From reading a Google groups post from 2016 : “.map() is converted to a .via()”

src : https://groups.google.com/g/akka-user/c/EzHygZpcCHg

Are the following lines of code equivalent :

Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);

Are there scenarios when a map should be used instead of flow?

src :

RequestDTO :

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;

import java.util.Date;

@Getter
@Setter
@Builder
@ToString
@Jacksonized
public class RequestDto {
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
    private final Date datePurchased;
}

StreamManager (contains main method) :

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.concurrent.CompletionStage;

public class StreamManager {

    final static ObjectMapper mapper = new ObjectMapper();

    private static final Flow<String, RequestDto, NotUsed> mapToDtoFlow = Flow.of(String.class)
            .map(input -> mapper.readValue(input, RequestDto.class))
            .log("error");

    public static void main(String args[]) {

        final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
        final Sink<RequestDto, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);

        final String json = "{\"datePurchased\":\"2022-03-03 21:32:017\"}";

        Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
        Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);

    }

}

Solution

  • map is converted to a via, but it's not an exactly syntactically equivalent via as you'd get from Flow.of().map().

    The first would translate to a .via(Map(f)), where Map is a GraphStage which implements the map operation.

    In the second case, the mapToDtoFlow (ignoring the log) would itself be (in Scala notation) Flow[String].via(Map(f)) so you'd be adding another layer of via: .via(Flow[String].via(Map(f))).

    For all intents and purposes, they're the same (I suspect that the materializer, when it comes time to interpret the RunnableGraph you've built, will treat them identically).

    Taking the .log into account, mapToDtoFlow is equivalent (again in Scala):

    Flow[String]
      .via(Map(f))
      .via(Log(...))
    

    There are basically three levels of defining streams in Akka Streams, from highest level to lowest level:

    • the Java/Scala DSLs
    • the Java/Scala Graph DSLs
    • GraphStages

    The DSLs merely specify succinct ways of building GraphStages and the fundamental way to link GraphStages with Flow shape is through the via operation.