
Implement a simple architecture using Akka Graphs


I'm attempting to set up a simple graph structure that processes data by invoking REST services, forwards the result of each service to an intermediary processing unit, and then forwards the combined result. Here is the high-level architecture:

[Image: high-level architecture diagram, not reproduced]

Can this be defined using Akka Streams graphs? I read https://doc.akka.io/docs/akka/current/stream/stream-graphs.html, but I don't understand how to implement even this simple architecture.

I've tried to write custom code that executes functions within a graph:

package com.graph

class RestG {

  def flow(in: String): String = in + "extra"

}

object RestG {

  case class Flow(in: String) {

    def out : String = in+"out"
  }

  def main(args: Array[String]): Unit = {

    List(new RestG().flow("test") , new RestG().flow("test2")).foreach(println)

  }

}

I'm unsure how to pass data between the functions. I think I should be using Akka Graphs, but how do I implement the architecture above?


Solution

    Here's how I would approach the problem. First, some types:

    type Data = Int
    type RestService1Response = String
    type RestService2Response = String
    type DisplayedResult = Boolean
    

    Then stub functions to asynchronously call the external services:

    import scala.concurrent.Future

    def callRestService1(data: Data): Future[RestService1Response] = ???
    def callRestService2(data: Data): Future[RestService2Response] = ???
    def resultCombiner(resp1: RestService1Response, resp2: RestService2Response): DisplayedResult = ???
    
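To try the pipeline locally before the real services exist, the `???` stubs could be filled with simulated asynchronous calls. This is a minimal sketch under stated assumptions: the object name `SimulatedServices`, the response strings, the sleep durations and the combiner logic are all hypothetical, and `Thread.sleep` only stands in for network latency (blocking inside a `Future` like this is fine for a demo but not for production code).

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical simulated services; no real HTTP requests are made.
object SimulatedServices {
  type Data = Int
  type RestService1Response = String
  type RestService2Response = String
  type DisplayedResult = Boolean

  // Stand-in for REST service 1: completes after roughly 50 ms on the global ExecutionContext.
  def callRestService1(data: Data): Future[RestService1Response] = Future {
    Thread.sleep(50) // simulated latency (blocking, demo only)
    s"service1:$data"
  }

  // Stand-in for REST service 2, with a different simulated latency.
  def callRestService2(data: Data): Future[RestService2Response] = Future {
    Thread.sleep(20) // simulated latency (blocking, demo only)
    s"service2:$data"
  }

  // Hypothetical combiner: the result is "displayable" when both responses are non-empty.
  def resultCombiner(resp1: RestService1Response, resp2: RestService2Response): DisplayedResult =
    resp1.nonEmpty && resp2.nonEmpty
}
```

Because the two calls finish at different times, these stubs also make it easy to check that the responses are still paired correctly downstream.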

    Now for the Akka Streams part (I'm leaving out setting up an ActorSystem, etc.):

    import akka.Done
    import akka.stream.FlowShape
    import akka.stream.scaladsl._
    
    type SourceMatVal = Any
    val dataSource: Source[Data, SourceMatVal] = ???
    
    def restServiceFlow[Response](callF: Data => Future[Response], maxInflight: Int) = Flow[Data].mapAsync(maxInflight)(callF)
    
    // NB: since we're fanning out, there's no reason to have different maxInflights here...
    val service1 = restServiceFlow(callRestService1, 4)
    val service2 = restServiceFlow(callRestService2, 4)
    
    val downstream = Flow[(RestService1Response, RestService2Response)]
      .map((resultCombiner _).tupled)
    
    val splitAndCombine = GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
    
      val fanOut = b.add(Broadcast[Data](2))
      val fanIn = b.add(Zip[RestService1Response, RestService2Response]())
    
      fanOut.out(0).via(service1) ~> fanIn.in0
      fanOut.out(1).via(service2) ~> fanIn.in1
    
      FlowShape(fanOut.in, fanIn.out)
    }
    
    // This future will complete with a `Done` if/when the stream completes
    val future: Future[Done] = dataSource
      .via(splitAndCombine)
      .via(downstream)
      .runForeach { displayableData =>
        ??? // Display the data
      }
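Putting the pieces together, a self-contained version of the same Broadcast/Zip graph could look like the sketch below. It assumes Akka 2.6.x (`akka-stream`) is on the classpath, where an implicit `ActorSystem` in scope supplies the `Materializer`; the object name `SplitAndCombineDemo`, the `Source(1 to 5)` input and the stub services and combiner are hypothetical placeholders, and `Sink.seq` replaces `runForeach` only so the results can be inspected.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

// Assumes Akka 2.6.x; all names below are illustrative.
object SplitAndCombineDemo {
  type Data = Int
  type RestService1Response = String
  type RestService2Response = String
  type DisplayedResult = Boolean

  // Hypothetical stand-ins for the two REST calls (no real HTTP requests).
  def callRestService1(data: Data): Future[RestService1Response] = Future.successful(s"s1-$data")
  def callRestService2(data: Data): Future[RestService2Response] = Future.successful(s"s2-$data")

  // Hypothetical combiner: true when both responses refer to the same input element.
  def resultCombiner(r1: RestService1Response, r2: RestService2Response): DisplayedResult =
    r1.drop(3) == r2.drop(3)

  // mapAsync runs up to maxInflight calls concurrently but emits results in input order.
  def restServiceFlow[Response](callF: Data => Future[Response], maxInflight: Int): Flow[Data, Response, NotUsed] =
    Flow[Data].mapAsync(maxInflight)(callF)

  val service1 = restServiceFlow(callRestService1, 4)
  val service2 = restServiceFlow(callRestService2, 4)

  // Broadcast each element to both services, then Zip the two responses into a pair.
  val splitAndCombine: Flow[Data, (RestService1Response, RestService2Response), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val fanOut = b.add(Broadcast[Data](2))
      val fanIn = b.add(Zip[RestService1Response, RestService2Response]())
      fanOut.out(0) ~> service1 ~> fanIn.in0
      fanOut.out(1) ~> service2 ~> fanIn.in1
      FlowShape(fanOut.in, fanIn.out)
    })

  def run(): Seq[DisplayedResult] = {
    // In Akka 2.6, an implicit ActorSystem in scope provides the Materializer.
    implicit val system: ActorSystem = ActorSystem("split-and-combine")
    try {
      val results = Source(1 to 5)
        .via(splitAndCombine)
        .map((resultCombiner _).tupled)
        .runWith(Sink.seq)
      Await.result(results, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Because `mapAsync` preserves input order on each branch, `Zip` pairs the two responses that belong to the same input element, so every combined result here should be `true`.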
    

    It's possible to do all the wiring within the Graph DSL, but I generally prefer to keep my graph stages as simple as possible and only use them to the extent that the standard methods on Source/Flow/Sink can't do what I want.
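For comparison, doing all of the wiring inside the Graph DSL could be sketched as one closed graph, as below. This assumes Akka 2.6.x; the object name `AllInGraphDsl`, the input range and the stub services are hypothetical. Passing `Sink.seq` to `GraphDSL.create` makes the sink's `Future` the materialized value of the resulting `RunnableGraph`, and the block returns `ClosedShape` because no ports are left open.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

// Assumes Akka 2.6.x; all names below are illustrative.
object AllInGraphDsl {
  // Hypothetical stand-ins for the REST calls and the combiner.
  def callRestService1(data: Int): Future[String] = Future.successful(s"s1-$data")
  def callRestService2(data: Int): Future[String] = Future.successful(s"s2-$data")
  def resultCombiner(r1: String, r2: String): Boolean = r1.drop(3) == r2.drop(3)

  // Source, fan-out, both services, fan-in, combiner and sink wired in a single graph.
  val graph: RunnableGraph[Future[immutable.Seq[Boolean]]] =
    RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Boolean]) { implicit b => sink =>
      import GraphDSL.Implicits._
      val fanOut = b.add(Broadcast[Int](2))
      val fanIn = b.add(Zip[String, String]())
      val combine = b.add(Flow[(String, String)].map((resultCombiner _).tupled))

      Source(1 to 3) ~> fanOut
      fanOut.out(0) ~> Flow[Int].mapAsync(4)(callRestService1) ~> fanIn.in0
      fanOut.out(1) ~> Flow[Int].mapAsync(4)(callRestService2) ~> fanIn.in1
      fanIn.out ~> combine ~> sink
      ClosedShape
    })

  def run(): Seq[Boolean] = {
    // The implicit ActorSystem supplies the Materializer needed by graph.run().
    implicit val system: ActorSystem = ActorSystem("all-in-graph-dsl")
    try Await.result(graph.run(), 5.seconds)
    finally system.terminate()
  }
}
```

The closed graph works, but every stage has to be added and connected by hand; the mixed style above keeps only the fan-out/fan-in in the DSL and uses ordinary `Source`/`Flow` methods for the rest.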