Tags: java, akka, akka-stream

How to split a stream into two flows using GraphDSL


I'm learning Akka Streams and, as an exercise, I'm attempting to use the GraphDSL to split data into two separate flows depending on the structure of the JSON received from a source.

Here is the complete code I've stitched together:

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.ClosedShape;
import akka.stream.FlowShape;
import akka.stream.SourceShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import objectmappertest.Request;
import java.util.Collections;
import java.util.concurrent.CompletionStage;

public class FilterObj {

    public static void main(String args[]){

        final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");

        final String json1  = "{\"datePurchased\":\"2022-02-03 21:32:017\"}";
        final String json2  = "{\"unknownField\":\"test\"}";

        final ObjectMapper mapper = new ObjectMapper();

        final Source<String, NotUsed> source1 = Source.repeat(json1).take(3);
        final Source<String, NotUsed> source2 = Source.repeat(json2).take(3);
        final Source<String, NotUsed> source3 = Source.repeat(json2).take(3);

        final Source<String, NotUsed> source =
                Source.combine(source1, source2 , Collections.singletonList(source3), Merge::create);

        final Flow<String, Request, NotUsed> flowOperation = Flow.of(String.class)
                .map ( input -> {
                    final JsonNode request = mapper.readTree(input);
                    if(request.has("datePurchased")){
                        return mapper.readValue(request.toString() , Request.class);
                    }
                    else {
                        return mapper.readValue(request.toString() , Request.class);
                    }

                }).log("error");

        final Sink<Request, CompletionStage<Done>> printSink = Sink.foreach(System.out::println);

        final RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
                GraphDSL.create(printSink, (builder, out) -> {
                    final SourceShape<String> sourceShape = builder.add(source);
                    final FlowShape<String, Request> flow1Shape = builder.add(flowOperation);

                    final UniformFanOutShape<String, String> broadcast =
                            builder.add(Broadcast.create(2));
                    builder.from(sourceShape)
                            .viaFanOut(broadcast)
                            .via(flow1Shape)
                            .to(out);

                    return ClosedShape.getInstance();
                })
        );
        graph.run(actorSystem);
        
    }
}

Request class:

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;
import java.util.Date;

@Builder
@ToString
@Jacksonized
public class Request
{
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
    private final Date datePurchased;
}

Running the above graph prints:

17:16:57.129 INFO  akka.event.slf4j.Slf4jLogger  [NativeMethodAccessorImpl.java:-2] - Slf4jLogger started
SLF4J: A number (1) of logging calls during the initialization phase have been intercepted and are
SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
SLF4J: See also http://www.slf4j.org/codes.html#replay
Exception in thread "main" java.lang.IllegalStateException: Illegal GraphDSL usage. Outlets [Broadcast.out1] were not returned in the resulting shape and not connected.
    at akka.stream.scaladsl.GraphDSL$Builder.result(Graph.scala:1690)
    at akka.stream.scaladsl.GraphApply.createGraph(GraphApply.scala:1144)
    at akka.stream.scaladsl.GraphApply.createGraph(GraphApply.scala:46)
    at akka.stream.scaladsl.GraphApply.createGraph$(GraphApply.scala:41)
    at akka.stream.scaladsl.GraphDSL$.createGraph(Graph.scala:1557)
    at akka.stream.javadsl.GraphCreate.create(GraphCreate.scala:26)
    at akka.stream.javadsl.GraphDSL.create(Graph.scala)
    at FilterObj.FilterObj.main(FilterObj.java:70)

To emulate two types of JSON payloads, I merge the sources into a single source using:

    final String json1  = "{\"datePurchased\":\"2022-02-03 21:32:017\"}";
    final String json2  = "{\"unknownField\":\"test\"}";

    final ObjectMapper mapper = new ObjectMapper();

    final Source<String, NotUsed> source1 = Source.repeat(json1).take(3);
    final Source<String, NotUsed> source2 = Source.repeat(json2).take(3);
    final Source<String, NotUsed> source3 = Source.repeat(json2).take(3);

    final Source<String, NotUsed> source =
            Source.combine(source1, source2 , Collections.singletonList(source3), Merge::create);

I'm unsure how to correctly incorporate the logic to split the data depending on whether datePurchased is contained in the JSON being parsed. Currently it's implemented as a placeholder in which both branches do the same thing:

                if(request.has("datePurchased")){
                    return mapper.readValue(request.toString() , Request.class);
                }
                else {
                    return mapper.readValue(request.toString() , Request.class);
                }

I think I need a UniformFanOutShape, but I'm unsure how to incorporate it into the GraphDSL.


Solution

  • If you want to split data, then you need Partition rather than Broadcast.

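    They are both fan-out shapes, but whereas Broadcast sends each incoming message to all of its outputs, Partition lets you use custom logic to decide which output a message should go to, which is what I think you're trying to do. This is also why your graph fails: Broadcast.create(2) has two outlets, and only one of them is connected, hence the "Outlets [Broadcast.out1] were not returned in the resulting shape and not connected" error.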

    The documentation is pretty good and includes code samples: https://doc.akka.io/docs/akka/current/stream/operators/Partition.html
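
As a rough illustration of the Partition approach, here is a minimal sketch rather than a drop-in fix. It assumes Akka 2.6 with the typed ActorSystem and Jackson on the classpath, as in the question. The class name PartitionSketch, the helper outletFor, and the two printing sinks are hypothetical names made up for this example, and it routes raw JsonNode values instead of deserializing into Request, to keep the routing logic separate from the mapping.

```java
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.ClosedShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Partition;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;

public class PartitionSketch {

    // Hypothetical helper: outlet 0 for payloads that contain datePurchased, outlet 1 otherwise.
    public static int outletFor(JsonNode node) {
        return node.has("datePurchased") ? 0 : 1;
    }

    public static void main(String[] args) {
        final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "partitionSketch");
        final ObjectMapper mapper = new ObjectMapper();

        // Parse each JSON string once, before the split.
        final Source<JsonNode, NotUsed> source = Source.from(Arrays.asList(
                        "{\"datePurchased\":\"2022-02-03 21:32:017\"}",
                        "{\"unknownField\":\"test\"}"))
                .map(json -> mapper.readTree(json));

        // One sink per branch; real code would put a Flow in front of each sink.
        final Sink<JsonNode, CompletionStage<Done>> datedSink =
                Sink.foreach(node -> System.out.println("with datePurchased: " + node));
        final Sink<JsonNode, CompletionStage<Done>> otherSink =
                Sink.foreach(node -> System.out.println("without datePurchased: " + node));

        final RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
                GraphDSL.create(datedSink, otherSink,
                        // Materialized value completes once both sinks have completed.
                        (dated, other) -> dated.thenCombine(other, (a, b) -> Done.getInstance()),
                        (builder, datedOut, otherOut) -> {
                            final UniformFanOutShape<JsonNode, JsonNode> partition =
                                    builder.add(Partition.create(
                                            JsonNode.class, 2, PartitionSketch::outletFor));

                            builder.from(builder.add(source)).viaFanOut(partition);
                            // Every outlet must be connected, otherwise the builder throws.
                            builder.from(partition.out(0)).to(datedOut);
                            builder.from(partition.out(1)).to(otherOut);
                            return ClosedShape.getInstance();
                        }));

        // Shut the actor system down when the stream finishes (or fails).
        graph.run(system).whenComplete((done, failure) -> system.terminate());
    }
}
```

Unlike Broadcast, each element leaves through exactly one outlet, chosen by the index the partitioner returns. The relative order of output between the two sinks is not guaranteed, since the branches run independently.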