Tags: scala, akka, rpc, apache-spark, finagle

Scala distributed execution of function objects


Given the following function objects,

val f : Int => Double = (i:Int) => i + 0.1

val g1 : Double => Double = (x:Double) => x*10

val g2 : Double => Double = (x:Double) => x/10

val h : (Double,Double) => Double = (x:Double,y:Double) => x+y

and, for instance, 3 remote servers or nodes (IP xxx.xxx.xxx.1, IP 2 and IP 3), how can the execution of this program be distributed,

val fx = f(1)
val g1x = g1( fx )
val g2x = g2( fx )
val res = h ( g1x, g2x )

so that

  • fx is computed on IP 1,
  • g1x is computed on IP 2,
  • g2x is computed on IP 3,
  • res is computed on IP 1

Can Scala's Akka or Apache Spark provide a simple approach to this?

Update

  • Finagle, an RPC (Remote Procedure Call) library, as suggested by @pkinsky, may be a feasible choice (see the sketch after this list).
  • Consider load-balancing policies as the mechanism for selecting the node that executes each call, at least a simple "any free node" policy.
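
Independent of the RPC layer, the dataflow itself can be expressed with plain Scala Futures. The sketch below is only an illustration of the plan: remoteCall is a hypothetical placeholder (not a Finagle or Akka API) standing in for whatever mechanism ships a call to a node; the composition makes g1 and g2 run concurrently once fx is available.

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._
    import ExecutionContext.Implicits.global

    // Hypothetical helper: a real version would send `body` to `node` over the
    // chosen RPC layer (Finagle, Akka remoting, ...). Here it just runs locally.
    def remoteCall[A](node: String)(body: => A): Future[A] =
      Future { body }

    val fxF = remoteCall("xxx.xxx.xxx.1")(f(1))
    val resF = fxF.flatMap { fx =>
      val g1xF = remoteCall("xxx.xxx.xxx.2")(g1(fx)) // runs concurrently with g2
      val g2xF = remoteCall("xxx.xxx.xxx.3")(g2(fx))
      // h runs back on the caller; wrap it in another remoteCall if res must be computed on IP 1.
      for { g1x <- g1xF; g2x <- g2xF } yield h(g1x, g2x)
    }
    val res = Await.result(resF, 30.seconds)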

Solution

  • I can speak for Apache Spark. It can do what you are looking for with the code below, but it is not designed for this kind of parallel computation. It is designed for parallel computation where you also have a large amount of data distributed across many machines. So the solution looks a bit silly: we distribute a single integer across a single machine, for example (for f(1)).

    Also, Spark is designed to run the same computation on all the data. So running g1() and g2() in parallel goes a bit against the design. (It's possible, but not elegant, as you see.)

    // Distribute the input (1) across 1 machine.
    val rdd1 = sc.parallelize(Seq(1), numSlices = 1)
    // Run f() on the input, collect the results and take the first (and only) result.
    val fx = rdd1.map(f(_)).collect.head
    // The next stage's input will be (1, fx), (2, fx) distributed across 2 machines.
    val rdd2 = sc.parallelize(Seq((1, fx), (2, fx)), numSlices = 2)
    // Run g1() on one machine, g2() on the other.
    val gxs = rdd2.map {
      case (1, x) => g1(x)
      case (2, x) => g2(x)
    }.collect
    val g1x = gxs(0)
    val g2x = gxs(1)
    // Same deal for h() as for f(). The input is (g1x, g2x), distributed to 1 machine.
    val rdd3 = sc.parallelize(Seq((g1x, g2x)), numSlices = 1)
    val res = rdd3.map { case (g1x, g2x) => h(g1x, g2x) }.collect.head
    

    You can see that Spark code is based around the concept of RDDs. An RDD is like an array, except it's partitioned across multiple machines. sc.parallelize() creates such a parallel collection from a local collection. For example rdd2 in the above code will be created from the local collection Seq((1, fx), (2, fx)) and split across two machines. One machine will have Seq((1, fx)), the other will have Seq((2, fx)).
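
    If you want to check the split yourself, glom() groups each partition of an RDD into a local array, so (assuming the rdd2 defined above) you can print what each machine holds:

    // Each printed line corresponds to one partition of rdd2.
    rdd2.glom().collect().foreach(part => println(part.mkString(", ")))
    // Number of partitions, which should be 2 here.
    println(rdd2.partitions.length)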

    Next we do a transformation on the RDD. map is a common transformation that creates a new RDD of the same length by applying a function to each element. (Same as Scala's map.) The map we run on rdd2 will replace (1, x) with g1(x) and (2, x) with g2(x). So on one machine it will cause g1() to run, while on the other g2() will run.
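
    For comparison, here is the same pattern match on an ordinary local collection; the logic is identical, only the distribution is missing:

    // Local equivalent of the rdd2.map above -- same result, no cluster involved.
    val localGxs = Seq((1, fx), (2, fx)).map {
      case (1, x) => g1(x)
      case (2, x) => g2(x)
    }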

    Transformations run lazily, only when you want to access the results. The methods that access the results are called actions. The most straightforward example is collect, which downloads the contents of the entire RDD from the cluster to the local machine. (It is exactly the opposite of sc.parallelize().)
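
    A quick way to convince yourself of this laziness (again assuming the rdd2 from above) is to add a side effect to the transformation and note that nothing happens until an action runs:

    // Nothing is computed here -- map only records the transformation.
    val lazyRdd = rdd2.map { case (_, x) => println("running on an executor"); x * 2 }
    // The work happens only when an action is called. (The println runs on the
    // executors, so on a real cluster its output appears in the executor logs.)
    val forced = lazyRdd.collect()
    val n = lazyRdd.count()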

    You can try all of this yourself: download Spark, start bin/spark-shell, and paste your function definitions and the above code into the shell.
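
    As a final sanity check after pasting everything, you can compare the distributed result with the plain local composition; both apply exactly the same operations, so they should be equal:

    // Recompute res locally and compare with the value collected from the cluster.
    val localFx = f(1)
    val localRes = h(g1(localFx), g2(localFx))
    assert(res == localRes)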