Search code examples
scalazscalaz-stream

side-effects for `wye` combinators when using `halt` from scalaz-stream


filter (which uses halt inside) terminates other branch even if it has some side-effects:

scala> val p = Process("1","2", "3")
scala> val p1 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
scala> val p2 = p.filter(_ => false).map(_ + "p2").observe(io.stdOutLines)
scala> (p1 yip p2).run.run
1p1

scala> val p2 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
scala> (p1 yip p2).run.run
1p1
1p2
2p1
2p2
3p1
3p2

Seems logical as there is no value to be returned to yip after that filter. But what about side-effects, specified with observe?

My current solution is to use flatMap to specify default value:

scala> val p1 = p.map(_ + "p1").flatMap(x => Process.emit(x).observe(io.stdOutLines))

scala> val p2 = p.map(_ + "p2").flatMap(x => Process.emit(""))

scala> (p1 yip p2).run.run
1p1
2p1
3p1

But maybe there is a way to use filter?

P.S. merge combinator executes side-effects for other branch (as it doesn't require value to be returned), but it doesn't wait for other branch if one halts (even if it has side-effects).


Solution

  • Actually it should be just something like that:

    in.map(emit).flatMap{ p =>
      val p1 = p.map(_ + "p1").filter(_ => true).observe(out)
      val p2 = p.map(_ + "p2").filter(_ => false).observe(out)
      p1 merge p2
    }.run.run
    

    It makes all side effects being in order as filter can't get more than one value (produced by emit)