
FS2 Stream exception handling not working


I am having a problem with FS2 and exception handling. Given a Stream[IO, A], when I map over it with an f: A => B that can throw an exception, I want to obtain a Stream[IO, Either[Throwable, B]].

I tried the following, and it works as expected:

import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
  .attempt
x1.compile.toVector.unsafeRunSync().foreach(println)

It prints:

Right(1)
Right(4)
Left(java.lang.RuntimeException: I don't like 9s)

However, my problems start when I try to do anything with that Stream.

val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
  .attempt.map(identity)

x1.compile.toVector.unsafeRunSync().foreach(println)

This blows up with the exception and kills the application:

java.lang.RuntimeException: I don't like 9s
    at swaps.fm.A$A32$A$A32.$anonfun$x1$2(tmp2.sc:7)
    at scala.runtime.java8.JFunction1$mcII$sp.apply(tmp2.sc:8)
    ...

Even weirder, using take so that the Stream returns only the elements I know to be OK still blows up in the same way:

val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
  .attempt.take(2)

x1.compile.toVector.unsafeRunSync().foreach(println)

Can anybody clarify why this is happening? Is this a bug or (un)expected behaviour?

N.B. This behaviour is present in FS2 0.10.0-M7 and 0.10.0


Solution

  • It seems the problem is here:

    self
     .stage(depth.increment, defer, o => emit(f(o)), os => {
       var i = 0; while (i < os.size) { emit(f(os(i))); i += 1; }
    

    This is the code inside Segment.map. When you create the stream from a vector using:

    Stream.emits(Vector(1,2,3,4))
    

    fs2 will allocate a single segment. In the map code above, os.size is the size of the segment, so map always maps the entire segment. This means that even though you asked for take(2), we're effectively still mapping the whole segment, including the element that throws.

    We can prove this by changing the code a little:

    import cats.effect.IO

    def main(args: Array[String]): Unit = {
      val x1 = fs2.Stream
        .emits(Vector(1, 2, 3, 4))
        .segmentLimit(1) // split the data into segments of size one
        .covary[IO]
        .map { seg =>
          // each seg holds a single element here, so its sum is that element
          if (seg.sum.force.run > 3) throw new RuntimeException("I don't like 9s")
          else seg
        }
        .attempt
        .take(2)

      println(x1.compile.toVector.unsafeRunSync())
    }
    

    The important part here is the segmentLimit, which makes the stream split its data into segments of size one. When we run this code, we get:

    Vector(Right(Chunk(1)), Right(Chunk(2)))
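To make the segment-at-a-time behaviour described above easier to see without fs2 on the classpath, here is a toy model in plain Scala. It is only a sketch and not fs2's implementation: the names SegmentModel, rejectNine and takeMapped are made up for illustration, and a "segment" is modelled as a plain Vector[Int]. The one property it borrows from the Segment.map snippet is that a segment, once pulled, is mapped in full before take gets to look at it.

```scala
import scala.util.Try

object SegmentModel {
  // Stand-in for the throwing function from the question (hypothetical name).
  def rejectNine(i: Int): Int =
    if (i == 9) throw new RuntimeException("I don't like 9s") else i

  // Toy model: segments are pulled one at a time, but every pulled segment
  // is mapped as a whole before the first n results are kept.
  def takeMapped(segments: Vector[Vector[Int]], n: Int)(f: Int => Int): Vector[Int] = {
    val out = Vector.newBuilder[Int]
    var taken = 0
    val it = segments.iterator
    while (taken < n && it.hasNext) {
      val mapped = it.next().map(f) // the entire segment is mapped here
      val keep = mapped.take(n - taken)
      out ++= keep
      taken += keep.size
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val squares = Vector(1, 2, 3, 4).map(x => x * x)

    // One segment of four elements: f(9) runs even though only two results are wanted.
    println(Try(takeMapped(Vector(squares), 2)(rejectNine)).isFailure) // prints true

    // Segments of size one: the segment holding 9 is never pulled.
    println(takeMapped(squares.map(Vector(_)), 2)(rejectNine)) // prints Vector(1, 4)
  }
}
```

With a single segment the throwing element is evaluated before take(2) can stop anything, which mirrors the failure in the question. With one element per segment the model never reaches 9, which mirrors the segmentLimit(1) run.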
    

    Is this a bug or not? Not sure. I'd consult with the maintainers on the Gitter channel.
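Independently of how segments are evaluated, one way to get an Either[Throwable, B] per element, as the question asks, is to catch the exception inside the mapped function itself, so nothing is ever thrown while a segment is being mapped. The sketch below uses only the Scala standard library (Try.toEither needs Scala 2.12 or later) and demonstrates the idea on a Vector. The helper name safe is an assumption for illustration, not an fs2 API. Since Stream#map accepts an ordinary function, the lifted function should be usable there as well, though that is not tested here.

```scala
import scala.util.Try

object SafeMap {
  // Hypothetical helper: turns a throwing A => B into A => Either[Throwable, B].
  // Try only catches non-fatal exceptions; fatal ones still propagate.
  def safe[A, B](f: A => B): A => Either[Throwable, B] =
    a => Try(f(a)).toEither

  // Stand-in for the throwing function from the question (hypothetical name).
  def rejectNine(i: Int): Int =
    if (i == 9) throw new RuntimeException("I don't like 9s") else i

  def main(args: Array[String]): Unit = {
    val results = Vector(1, 2, 3, 4)
      .map(x => x * x)
      .map(safe(rejectNine)) // failures become Left values instead of escaping

    results.foreach(println)
  }
}
```

Note the difference from attempt: attempt ends the stream at the first failure, whereas catching inside the function keeps going, so the element after the failing one (16 here) still comes through as a Right.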