From reading a Google groups post from 2016 : “.map() is converted to a .via()”
src : https://groups.google.com/g/akka-user/c/EzHygZpcCHg
Are the following lines of code equivalent :
Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);
Are there scenarios when a map should be used instead of flow?
src :
RequestDTO :
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;
import java.util.Date;
@Getter
@Setter
@Builder
@ToString
@Jacksonized
public class RequestDto {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
private final Date datePurchased;
}
StreamManager (contains main method) :
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.concurrent.CompletionStage;
public class StreamManager {
final static ObjectMapper mapper = new ObjectMapper();
private static final Flow<String, RequestDto, NotUsed> mapToDtoFlow = Flow.of(String.class)
.map(input -> mapper.readValue(input, RequestDto.class))
.log("error");
public static void main(String args[]) {
final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
final Sink<RequestDto, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);
final String json = "{\"datePurchased\":\"2022-03-03 21:32:017\"}";
Source.repeat(json).take(3).via(mapToDtoFlow).to(printSink).run(actorSystem);
Source.repeat(json).take(3).map(x -> mapper.readValue(x, RequestDto.class)).to(printSink).run(actorSystem);
}
}
map
is converted to a via
, but it's not an exactly syntactically equivalent via
as you'd get from Flow.of().map()
.
The first would translate to a .via(Map(f))
, where Map
is a GraphStage
which implements the map operation.
In the second case, the mapToDtoFlow
(ignoring the log
) would itself be (in Scala notation) Flow[String].via(Map(f))
so you'd be adding another layer of via
: .via(Flow[String].via(Map(f)))
.
For all intents and purposes, they're the same (I suspect that the materializer, when it comes time to interpret the RunnableGraph
you've built, will treat them identically).
Taking the .log
into account, mapToDtoFlow
is equivalent (again in Scala):
Flow[String]
.via(Map(f))
.via(Log(...))
There are basically three levels of defining streams in Akka Streams, from highest level to lowest level:
GraphStage
sThe DSLs merely specify succinct ways of building GraphStage
s and the fundamental way to link GraphStage
s with Flow
shape is through the via
operation.