I'm building a data pipeline with Akka Streams and Akka HTTP. The use case is quite simple: receive a web request from a user, which triggers two things. First, create a session by calling a 3rd-party API; second, commit that session to some persistent storage. Once we have received the session, the pipeline proxies the original user request with the session data added.
I have started on the first branch of the pipeline, the session processing, but I'm wondering whether there is a more elegant way of unmarshalling the HTTP response from the 3rd-party API to a POJO. Currently I'm using Jackson.unmarshaller.unmarshal, which returns a CompletionStage<T> that I then have to unwrap into T. It's not very elegant, and I'm guessing that Akka HTTP has cleverer ways of doing this.
Here is the code I have right now:
    private final Source<Session, NotUsed> session =
        Source.fromCompletionStage(
                getHttp().singleRequest(getSessionRequest(), getMat())).
            map(r -> Jackson.unmarshaller(Session.class).unmarshal(r.entity(), getMat())).
            map(f -> f.toCompletableFuture().get()).
            alsoTo(storeSession);
Akka Streams offers mapAsync, a stage that handles asynchronous computation in your pipeline in a configurable, non-blocking way.
Your code should become something like:
    Source.fromCompletionStage(
            getHttp().singleRequest(getSessionRequest(), getMat())).
        mapAsync(4, r -> Jackson.unmarshaller(Session.class).unmarshal(r.entity(), getMat())).
        alsoTo(storeSession);
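The reason the manual unwrapping disappears is that map with a function returning a CompletionStage produces a stream of stages, whereas mapAsync emits the completed values. A rough plain-Java analogy, not Akka code, is the difference between thenApply and thenCompose on CompletionStage. In the sketch below, fetchBody and parse are hypothetical stand-ins for the HTTP call and the unmarshalling step.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class FlattenSketch {
    // Hypothetical stand-in for the HTTP call: yields a response body asynchronously.
    static CompletionStage<String> fetchBody() {
        return CompletableFuture.supplyAsync(() -> "42");
    }

    // Hypothetical stand-in for the asynchronous unmarshalling step.
    static CompletionStage<Integer> parse(String body) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(body));
    }

    // thenApply with an async function leaves a nested stage,
    // much like map(...) in the question leaves a CompletionStage per element.
    static CompletionStage<CompletionStage<Integer>> nested() {
        return fetchBody().thenApply(FlattenSketch::parse);
    }

    // thenCompose flattens the nesting without blocking,
    // which is the role mapAsync plays inside the stream.
    static CompletionStage<Integer> flattened() {
        return fetchBody().thenCompose(FlattenSketch::parse);
    }

    public static void main(String[] args) {
        // join() is used here only to print the result at the edge of the program.
        System.out.println(flattened().toCompletableFuture().join());
    }
}
```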
Note that:
- CompletableFuture.get is a blocking call. This can cause dreadful issues in your pipeline, since it ties up a thread that the stream needs to make progress.
- The Int parameter required by mapAsync (parallelism) lets you fine-tune how many async operations can run at the same time.
More info on mapAsync can be found in the docs.
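To make the first point concrete, here is a minimal plain-Java sketch (no Akka involved) contrasting a blocking get with a chained callback. The pending future is a hypothetical stand-in for a slow unmarshalling result, and the 50 ms timeout is an arbitrary value chosen only so the demonstration terminates.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingSketch {
    // Returns true if get() was still waiting when the timeout expired,
    // i.e. the calling thread was parked and could do nothing else.
    static boolean getBlocks(CompletableFuture<String> pending) {
        try {
            pending.get(50, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical slow response: nothing has completed it yet.
        CompletableFuture<String> pending = new CompletableFuture<>();

        // Chaining only registers a callback and returns at once.
        CompletableFuture<Integer> chained = pending.thenApply(String::length);
        System.out.println(chained.isDone());   // false

        // get() holds the calling thread until a value (or the timeout) arrives.
        System.out.println(getBlocks(pending)); // true

        // Once the value arrives, the chained callback runs and the result is available.
        pending.complete("session");
        System.out.println(chained.join());     // 7
    }
}
```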