How to elegantly implement the pipeline pattern using Scala


I'm looking to build a pipeline pattern in Scala. Once I've written the pipeline objects, I'd like to be able to connect them together like this:

Pipeline1 :: Pipeline2 :: Pipeline3 ...

I've experimented with a few ideas so far; some work and some don't, but none of them completely gets rid of the boilerplate. The following is the closest I've come.

First, I define the abstract classes Pipeline and Source:

// I is the input type and O is the output type of the pipeline
abstract class Pipeline[I, +O](p: Pipeline[_, _ <: I]) {

  val source = p
  val name: String
  def produce(): O
  def stats():String
}
abstract class Source[+T] extends Pipeline[AnyRef, T](null)

Next, I create two pipelines and try to link them together:

// this creates a random integer
class RandomInteger extends Source[Int] {
  override val name = "randInt"

  def produce() = {
    math.round(math.random().toFloat * 10) // scala.Math no longer exists; scala.math replaces it
  }

  def stats()="this pipeline is stateless"
}

// multiply it by ten
class TimesTen(p: Pipeline[_, Int]) extends Pipeline[Int, Int](p) {
  private var count = 0 // this is a simple state of the pipeline
  override val name = "Times"
  def produce() = {
    val i = source.produce()
    count += 1 // updating the state
    i * 10
  }
  def stats() = "this pipeline has been called for " + count + " times"
}

object TimesTen {
  // this code achieves the desired connection using ::
  // but this has to be repeated in each pipeline subclass. 
  // how to remove or abstract away this boilerplate code? 
  def ::(that: Pipeline[_, Int]) = new TimesTen(that)
}
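
For readers unfamiliar with the companion-object trick: in Scala, an operator whose name ends in a colon is right-associative and is invoked on its right operand, so `new RandomInteger() :: TimesTen` means `TimesTen.::(new RandomInteger())`. The sketch below shows this with a hypothetical `Tag` object working on strings. One consequence worth checking against the goal of `Pipeline1 :: Pipeline2 :: Pipeline3`: a longer chain such as `a :: Tag :: Tag` parses as `a :: (Tag :: Tag)` and does not compile, so extra stages need explicit parentheses.

```scala
// Hypothetical Tag object, used only to illustrate how Scala desugars `::`.
object Tag {
  // A method whose name ends in ':' is called on the operand to its RIGHT.
  def ::(s: String): String = s + ".tagged"
}

object RightAssocDemo {
  def main(args: Array[String]): Unit = {
    val sugared = "randInt" :: Tag       // desugars to Tag.::("randInt")
    val explicit = Tag.::("randInt")     // the same call, written out
    println(sugared == explicit)         // true
    // "randInt" :: Tag :: Tag would parse as "randInt" :: (Tag :: Tag)
    // and fail to compile, because Tag is not a String. Parenthesize instead:
    println(("randInt" :: Tag) :: Tag)   // randInt.tagged.tagged
  }
}
```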

This is the main object, where the two pipelines are linked:

object Pipeline {
  def main(args: Array[String]): Unit = {
    val p = new RandomInteger() :: TimesTen
    println(p.source)
    for (i <- 0 to 10)
      println(p.produce())
    println(p.stats())
  }
}

This code works, but I would have to repeat the code in the TimesTen companion object for every pipeline class I write, which is clearly undesirable. Is there a better way to do this? Reflection might work, but I've heard that relying on it is usually a sign of bad design, and I'm not sure how well Scala supports reflection anyway.
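
One possible way to shrink the repeated companion code without reflection is sketched below. It assumes that every stage's constructor takes exactly one upstream `Pipeline`. The `::` operator moves into a reusable base class (the name `Connector` is hypothetical), and each companion object extends it by passing in a constructor function, so a new stage needs only a one-line companion. `Constant` (a deterministic stand-in for `RandomInteger`) and `PlusOne` are also hypothetical.

```scala
// Pipeline and Source as in the question.
abstract class Pipeline[I, +O](p: Pipeline[_, _ <: I]) {
  val source = p
  val name: String
  def produce(): O
  def stats(): String
}
abstract class Source[+T] extends Pipeline[AnyRef, T](null)

// Hypothetical reusable helper: defines `::` once, parameterized by a
// function that builds the downstream stage from its upstream stage.
class Connector[I, O](make: Pipeline[_, I] => Pipeline[I, O]) {
  def ::(upstream: Pipeline[_, I]): Pipeline[I, O] = make(upstream)
}

// Hypothetical deterministic source, so results are predictable.
class Constant(n: Int) extends Source[Int] {
  val name = "const"
  def produce(): Int = n
  def stats() = "stateless"
}

class TimesTen(p: Pipeline[_, Int]) extends Pipeline[Int, Int](p) {
  private var count = 0 // state stays encapsulated in the stage
  val name = "times"
  def produce(): Int = { count += 1; source.produce() * 10 }
  def stats() = "called for " + count + " times"
}
// The companion shrinks to one line instead of its own `::` method.
object TimesTen extends Connector[Int, Int](p => new TimesTen(p))

// Hypothetical second stage reusing the same helper.
class PlusOne(p: Pipeline[_, Int]) extends Pipeline[Int, Int](p) {
  val name = "plusOne"
  def produce(): Int = source.produce() + 1
  def stats() = "stateless"
}
object PlusOne extends Connector[Int, Int](p => new PlusOne(p))

object ConnectorDemo {
  def main(args: Array[String]): Unit = {
    // `::` is right-associative, so the first link is parenthesized.
    val p = (new Constant(4) :: TimesTen) :: PlusOne
    println(p.produce())      // 41
    println(p.source.stats()) // called for 1 times
  }
}
```

The constructor is still named once per stage, but the operator itself lives in a single place.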

Thank you for your time.

Update: I designed this toy problem to keep it easy to understand. In general, and as my application requires, each pipeline object has its own state, which ideally stays encapsulated within the object rather than being exposed to the other pipelines. I have modified the code above to reflect this. I'd prefer an object-based solution. I'm still experimenting and will post an update if I find one.

Update 2: After some thought, I think a pipeline is really just a generalized function that holds some internal state and can compose a Function0 with a Function1. In Scala, however, Function0 has no compose() or andThen() method.
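
Because `Function0` has no `andThen`, one workaround is to add the missing composition yourself. The sketch below uses a hypothetical implicit class `Function0Ops` (not part of the standard library) that turns a `() => A` followed by an `A => B` into a new `() => B`. It only illustrates the composition idea described above; it is not a full pipeline design.

```scala
object Function0Compose {
  // Hypothetical extension: scala.Function0 defines no andThen of its own.
  implicit class Function0Ops[A](f: () => A) {
    // Run f, then feed its result to g; nothing runs until the result is called.
    def andThen[B](g: A => B): () => B = () => g(f())
  }
}

object Function0Demo {
  import Function0Compose._

  def main(args: Array[String]): Unit = {
    var calls = 0                             // simple state updated by the second stage
    val source: () => Int = () => 4           // fixed stand-in for a random source
    val timesTen: Int => Int = { x => calls += 1; x * 10 }
    val pipeline: () => Int = source andThen timesTen
    println(pipeline())                       // 40
    println(pipeline())                       // 40
    println(calls)                            // 2
  }
}
```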


Solution

  • Here is an object-based solution using andThen. The idea is to turn every stage into a Function1 by giving sources the input type Unit. Connecting two Pipelines creates a new Pipeline that composes their two functions, and each Pipeline can still keep its own internal state.

    A further simplification would be to use apply() instead of produce(). This is left as an exercise for the reader.

    abstract class Pipeline[-I, +O] {
    
      val name: String
      def produce : I => O
      def stats(): String
    
      def ->[X](seg:Pipeline[_ >: O, X]):Pipeline[I, X] = {
        val func = this.produce
        val outerName = this.name
        new Pipeline[I, X] {
          val name = outerName + "." + seg.name
          def produce = func andThen seg.produce 
          def stats() = seg.stats()
        }
      }
    }
    
    abstract class Source[+T] extends Pipeline[Unit, T] {
    }
    
    class RandomInteger extends Source[Int] {
      override val name = "randInt"
      def produce: Unit => Int = (_: Unit) => math.round(math.random().toFloat * 10)
      def stats() = "stateless"
    }
    
    class TimesTen() extends Pipeline[Int, Int] {
      private var count = 0
      override val name = "times"
      def produce : Int => Int = (x:Int) => {    
        count += 1
        x * 10
      }
      def stats() = "called for " + count + " times"
    }
    
    
    object Main {
      def main(args: Array[String]): Unit = {
        val p = new RandomInteger() -> new TimesTen() 
    
        for (i <- 0 to 10)
          println(p.produce(())) // pass the Unit input explicitly
        println(p.name)    // print "randInt.times"
        println(p.stats()) // print "called for 11 times"
      }
    }
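
The apply() simplification mentioned above might look like the following. This is one possible take, not the answerer's own code. Here `Pipeline` extends `I => O` directly, so each stage is called like a function and `->` composes stages by applying one after the other. `Constant` and `PlusOne` are hypothetical stages. Because `->` does not end in a colon, it is left-associative, so three stages chain without parentheses.

```scala
// Variant of the answer's Pipeline in which each stage *is* a function.
abstract class Pipeline[-I, +O] extends (I => O) {
  val name: String
  def stats(): String

  // Composition: the resulting stage runs this stage, then `next`.
  def ->[X](next: Pipeline[O, X]): Pipeline[I, X] = {
    val first = this
    new Pipeline[I, X] {
      val name = first.name + "." + next.name
      def apply(in: I): X = next(first(in))
      def stats() = next.stats()
    }
  }
}

abstract class Source[+T] extends Pipeline[Unit, T]

// Hypothetical deterministic source, so the output is predictable.
class Constant(n: Int) extends Source[Int] {
  val name = "const"
  def apply(u: Unit): Int = n
  def stats() = "stateless"
}

class TimesTen extends Pipeline[Int, Int] {
  private var count = 0 // per-stage state stays encapsulated
  val name = "times"
  def apply(x: Int): Int = { count += 1; x * 10 }
  def stats() = "called for " + count + " times"
}

// Hypothetical third stage.
class PlusOne extends Pipeline[Int, Int] {
  val name = "plusOne"
  def apply(x: Int): Int = x + 1
  def stats() = "stateless"
}

object ApplyDemo {
  def main(args: Array[String]): Unit = {
    val times = new TimesTen
    val p = new Constant(4) -> times -> new PlusOne
    for (_ <- 0 to 10) println(p(()))   // prints 41 eleven times
    println(p.name)                     // const.times.plusOne
    println(times.stats())              // called for 11 times
  }
}
```

One trade-off of extending Function1: stages also inherit its andThen and compose, but those return plain functions without name or stats(), so `->` remains the operator to use when building pipelines.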