I'm learning Akka Streams, and as an example I'm attempting to split data into two separate flows, depending on the structure of the JSON received from a source, using the GraphDSL.
Here is the complete code I've stitched together:
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.ClosedShape;
import akka.stream.FlowShape;
import akka.stream.SourceShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import objectmappertest.Request;
import java.util.Collections;
import java.util.concurrent.CompletionStage;
public class FilterObj {
    public static void main(String args[]){
        final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
        final String json1 = "{\"datePurchased\":\"2022-02-03 21:32:017\"}";
        final String json2 = "{\"unknownField\":\"test\"}";
        final ObjectMapper mapper = new ObjectMapper();
        final Source<String, NotUsed> source1 = Source.repeat(json1).take(3);
        final Source<String, NotUsed> source2 = Source.repeat(json2).take(3);
        final Source<String, NotUsed> source3 = Source.repeat(json2).take(3);
        final Source<String, NotUsed> source =
                Source.combine(source1, source2 , Collections.singletonList(source3), Merge::create);
        final Flow<String, Request, NotUsed> flowOperation = Flow.of(String.class)
                .map ( input -> {
                    final JsonNode request = mapper.readTree(input);
                    if(request.has("datePurchased")){
                        return mapper.readValue(request.toString() , Request.class);
                    }
                    else {
                        return mapper.readValue(request.toString() , Request.class);
                    }
                }).log("error");
        final Sink<Request, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);
        final RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
                GraphDSL.create(printSink, (builder, out) -> {
                    final SourceShape<String> sourceShape = builder.add(source);
                    final FlowShape<String, Request> flow1Shape = builder.add(flowOperation);
                    final UniformFanOutShape<String, String> broadcast =
                            builder.add(Broadcast.create(2));
                    builder.from(sourceShape)
                            .viaFanOut(broadcast)
                            .via(flow1Shape)
                            .to(out);
                    return ClosedShape.getInstance();
                })
        );
        graph.run(actorSystem);
    }
}
Request class:
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;
import java.util.Date;
@Builder
@ToString
@Jacksonized
public class Request
{
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
    private final Date datePurchased;
}
Running the above graph prints:
17:16:57.129 INFO akka.event.slf4j.Slf4jLogger [NativeMethodAccessorImpl.java:-2] - Slf4jLogger started
SLF4J: A number (1) of logging calls during the initialization phase have been intercepted and are
SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
SLF4J: See also http://www.slf4j.org/codes.html#replay
Exception in thread "main" java.lang.IllegalStateException: Illegal GraphDSL usage. Outlets [Broadcast.out1] were not returned in the resulting shape and not connected.
at akka.stream.scaladsl.GraphDSL$Builder.result(Graph.scala:1690)
at akka.stream.scaladsl.GraphApply.createGraph(GraphApply.scala:1144)
at akka.stream.scaladsl.GraphApply.createGraph(GraphApply.scala:46)
at akka.stream.scaladsl.GraphApply.createGraph$(GraphApply.scala:41)
at akka.stream.scaladsl.GraphDSL$.createGraph(Graph.scala:1557)
at akka.stream.javadsl.GraphCreate.create(GraphCreate.scala:26)
at akka.stream.javadsl.GraphDSL.create(Graph.scala)
at FilterObj.FilterObj.main(FilterObj.java:70)
To emulate two types of JSON payloads, I merge several sources into a single source:
final String json1 = "{\"datePurchased\":\"2022-02-03 21:32:017\"}";
final String json2 = "{\"unknownField\":\"test\"}";
final ObjectMapper mapper = new ObjectMapper();
final Source<String, NotUsed> source1 = Source.repeat(json1).take(3);
final Source<String, NotUsed> source2 = Source.repeat(json2).take(3);
final Source<String, NotUsed> source3 = Source.repeat(json2).take(3);
final Source<String, NotUsed> source =
Source.combine(source1, source2 , Collections.singletonList(source3), Merge::create);
I'm unsure how to correctly incorporate the logic that splits the data depending on whether datePurchased is contained in the JSON being parsed. Currently it's implemented as:
if(request.has("datePurchased")){
    return mapper.readValue(request.toString() , Request.class);
}
else {
    return mapper.readValue(request.toString() , Request.class);
}
I think I need a UniformFanOutShape, but I'm unsure how to incorporate it into the GraphDSL.
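If you want to split the data, then you need Partition rather than Broadcast.
They are both fan-out shapes, but whereas Broadcast sends each incoming message to all of its outputs, Partition lets you use custom logic to decide which output a message should go to, which is what I think you're trying to do.
Whichever fan-out shape you use, every one of its outlets has to be connected to something. The exception in your output is thrown because Broadcast.out1 is created but never wired to anything.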
The documentation is pretty good with code samples: https://doc.akka.io/docs/akka/current/stream/operators/Partition.html
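To make the difference concrete, here is a minimal plain-Java sketch of the two fan-out behaviours. It is only a model and deliberately does not use the Akka API: `FanOutSketch`, `broadcast`, `partition` and `route` are hypothetical names made up for illustration, and `route` uses a crude substring check as a stand-in for the `request.has("datePurchased")` test in the question. In the real graph, a function like `route` is the kind of logic you would hand to Partition as its partitioner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;

public class FanOutSketch {

    // Broadcast-like behaviour: every element is copied to every output.
    static <T> List<List<T>> broadcast(List<T> input, int outputs) {
        List<List<T>> outs = new ArrayList<>();
        for (int i = 0; i < outputs; i++) {
            outs.add(new ArrayList<>(input));
        }
        return outs;
    }

    // Partition-like behaviour: the partitioner picks exactly one output
    // index for each element, so every element ends up in a single output.
    static <T> List<List<T>> partition(List<T> input, int outputs,
                                       ToIntFunction<T> partitioner) {
        List<List<T>> outs = new ArrayList<>();
        for (int i = 0; i < outputs; i++) {
            outs.add(new ArrayList<>());
        }
        for (T element : input) {
            outs.get(partitioner.applyAsInt(element)).add(element);
        }
        return outs;
    }

    // Hypothetical routing rule (assumption): output 0 for payloads that
    // mention "datePurchased", output 1 for everything else. A real
    // implementation would parse the JSON instead of searching the string.
    static int route(String json) {
        return json.contains("\"datePurchased\"") ? 0 : 1;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "{\"datePurchased\":\"2022-02-03 21:32:017\"}",
                "{\"unknownField\":\"test\"}",
                "{\"unknownField\":\"test\"}");

        List<List<String>> copies = broadcast(input, 2);
        List<List<String>> split = partition(input, 2, FanOutSketch::route);

        System.out.println("broadcast out0: " + copies.get(0));
        System.out.println("broadcast out1: " + copies.get(1));
        System.out.println("partition out0: " + split.get(0));
        System.out.println("partition out1: " + split.get(1));
    }
}
```

With the broadcast model both outputs receive all three payloads, while the partition model sends the datePurchased payload to output 0 and the other two to output 1. Either way, each output then needs its own downstream flow and sink in the graph.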