
Akka Streams GraphStage


Using GraphStage is recommended in Akka Streams, but I could not find any documentation on calling the getStageActor() method from Java (all of the documentation I have found uses Scala).

How can I convert the following code to Java?

lazy val self: StageActor = getStageActor(onMessage)

and

private def onMessage(x: (ActorRef, Any)): Unit =
{
  x match {
    case (_, msg: String) =>
      log.info("received msg, queueing: {} ", msg)
      messages = messages.enqueue(msg)
      pump()
  }
}

Solution

  • According to the getStageActor method documentation, it accepts a value of type

    scala.Function1<scala.Tuple2<ActorRef,java.lang.Object>, scala.runtime.BoxedUnit>
    

    which in Scala looks like

    ((ActorRef, Any)) => Unit
    

    In Java this type would be semantically equivalent (using the Function interface) to

    Function<Tuple<ActorRef, Object>, Void>
    

    where Tuple<A, B> is a class which contains two values of types A and B.

    Therefore, to call the getStageActor method, you need to create a value of the aforementioned type. You can do it directly by constructing an instance of a class extending AbstractFunction1:

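    import akka.actor.ActorRef;
    import scala.Tuple2;
    import scala.runtime.AbstractFunction1;
    import scala.runtime.BoxedUnit;
    
    // Anonymous subclass of AbstractFunction1 acting as the message callback
    getStageActor(new AbstractFunction1<Tuple2<ActorRef, Object>, BoxedUnit>() {
        @Override
        public BoxedUnit apply(Tuple2<ActorRef, Object> args) {
            // BoxedUnit.UNIT is how Scala's Unit value is returned from Java
            return BoxedUnit.UNIT;
        }
    });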
    

    If you use Java 8, there are syntactically nicer ways to do it using lambda expressions.

    If you use Scala 2.12+, then scala.Function1 is a functional interface, and you can use lambda expressions directly:

    getStageActor((Tuple2<ActorRef, Object> args) -> BoxedUnit.UNIT);
    

    If you use an older version of Scala, then due to the way traits are compiled, Function1 is not a functional interface, and you will need to use the scala-java8-compat library. With it, the code looks like

    import static scala.compat.java8.JFunction.*;
    
    getStageActor(func((Tuple2<ActorRef, Object> args) -> BoxedUnit.UNIT));
    

    Then, to implement the logic of the function, you can access elements of the tuple using the _1() and _2() methods:

    (Tuple2<ActorRef, Object> args) -> {
        Object msg = args._2();
        if (msg instanceof String) {
            log.info("received msg, queueing: {} ", msg);
            messages = messages.enqueue((String) msg);
            pump();
        }
        return BoxedUnit.UNIT;
    }
    

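    This is a close translation of the logic that you wanted to convert. One difference: the Scala match throws a MatchError for a message that is not a String, whereas this version silently ignores it.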
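To try out the shape of the callback without Akka or the Scala library on the classpath, the "semantically equivalent" type mentioned above can be sketched in plain Java. This is only an illustration under stated assumptions: `Tuple`, `StageActorSketch`, and `queued()` are hypothetical names, the sender is typed as `Object` instead of `akka.actor.ActorRef`, and an `ArrayDeque` stands in for the stage's message queue. It is not how the real stage is wired up.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

public class StageActorSketch {

    // Hypothetical stand-in for scala.Tuple2: an immutable pair
    // exposing the same _1() / _2() accessors.
    static final class Tuple<A, B> {
        private final A first;
        private final B second;

        Tuple(A first, B second) {
            this.first = first;
            this.second = second;
        }

        A _1() { return first; }
        B _2() { return second; }
    }

    // Stand-in for the stage's queue of pending messages.
    private final Queue<String> messages = new ArrayDeque<>();

    // Same shape as the callback passed to getStageActor: it receives a
    // (sender, message) pair and returns nothing useful.
    // Assumption: the sender is typed as Object to avoid an Akka dependency.
    final Function<Tuple<Object, Object>, Void> onMessage = args -> {
        Object msg = args._2();
        if (msg instanceof String) {
            messages.add((String) msg);
        }
        // Void has no instances, so null plays the role of BoxedUnit.UNIT here.
        return null;
    };

    int queued() {
        return messages.size();
    }

    public static void main(String[] args) {
        StageActorSketch stage = new StageActorSketch();
        stage.onMessage.apply(new Tuple<>("sender", "hello"));
        stage.onMessage.apply(new Tuple<>("sender", 42)); // not a String, ignored
        System.out.println(stage.queued()); // prints 1
    }
}
```

In the real stage the `Function<..., Void>` would be replaced by `scala.Function1<Tuple2<ActorRef, Object>, BoxedUnit>` and the `return null` by `return BoxedUnit.UNIT`, as in the snippets above.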