I am having a problem with FS2 and exception handling. What I want is that, given a Stream[IO,A], when I map on it using an f: A => B that can throw an exception, I obtain a Stream[IO,Either[Throwable,B]].
I tried the following, and it works as expected:
import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt
x1.compile.toVector.unsafeRunSync().foreach(println)
It prints:
Right(1)
Right(4)
Left(java.lang.RuntimeException: I don't like 9s)
However, my problems start when I try to do anything with that Stream.
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt.map(identity)
x1.compile.toVector.unsafeRunSync().foreach(println)
This blows up with the exception and kills the application:
java.lang.RuntimeException: I don't like 9s
at swaps.fm.A$A32$A$A32.$anonfun$x1$2(tmp2.sc:7)
at scala.runtime.java8.JFunction1$mcII$sp.apply(tmp2.sc:8)
...
Even weirder, using take to have the Stream return only the elements that I know to be OK still blows up in the same way:
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt.take(2)
x1.compile.toVector.unsafeRunSync().foreach(println)
Can anybody clarify why this is happening? Is this a bug or (un)expected behaviour?
N.B. This behaviour is present in FS2 0.10.0-M7 and 0.10.0
It seems the problem is here:
self
.stage(depth.increment, defer, o => emit(f(o)), os => {
var i = 0; while (i < os.size) { emit(f(os(i))); i += 1; }
This is the code inside Segment.map. When you allocate the vector using:
Stream.emits(Vector(1,2,3,4))
fs2 will allocate a single segment. Looking at the code of map above, os.size represents the size of the segment, meaning map will always map the entire segment. This means that even though you asked to take(2), we're effectively still mapping the entire segment, including the element that throws.
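To make the behaviour described above concrete, here is a toy model in plain Scala. It does not use fs2 at all: SegmentModel, f and run are hypothetical names, and Iterator.grouped merely stands in for the way a stream is split into segments. It is a sketch of the idea (each chunk is mapped in full before take sees any element), not of the actual fs2 internals.

```scala
import scala.util.Try

object SegmentModel {
  // Hypothetical stand-in for the function from the question: rejects 9.
  def f(i: Int): Int =
    if (i == 9) throw new RuntimeException("I don't like 9s") else i

  // Toy model, NOT fs2 code: the input is split into chunks of `chunkSize`,
  // every chunk is mapped strictly (all of its elements at once), and only
  // then are the first `n` results taken. Try plays the role of `attempt`.
  def run(input: Vector[Int], chunkSize: Int, n: Int): Either[Throwable, Vector[Int]] =
    Try {
      input
        .grouped(chunkSize)                               // lazy iterator of chunks
        .flatMap(chunk => chunk.map(x => x * x).map(f))   // whole chunk mapped eagerly
        .take(n)
        .toVector
    }.toEither

  def main(args: Array[String]): Unit = {
    // One chunk of four: 9 is computed even though only two elements are wanted.
    println(run(Vector(1, 2, 3, 4), chunkSize = 4, n = 2))
    // Chunks of one: the chunk containing 3 is never mapped.
    println(run(Vector(1, 2, 3, 4), chunkSize = 1, n = 2))
  }
}
```

With a chunk size of 4 the whole run fails, while with a chunk size of 1 the first two results come back untouched, which mirrors the difference that the chunking makes in the real stream.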
We can prove this by changing the code a little:
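import cats.effect.IO

def main(args: Array[String]): Unit = {
  val x1 = fs2.Stream
    .emits(Vector(1, 2, 3, 4))
    .segmentLimit(1) // re-chunk into segments of at most one element
    .covary[IO]
    .map { seg =>
      // each seg now holds a single element, so only 4 would trigger this
      if (seg.sum.force.run > 3) throw new RuntimeException("I don't like 9s")
      else seg
    }
    .attempt
    .take(2)
  println(x1.compile.toVector.unsafeRunSync())
}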
The important part here is segmentLimit, which makes the stream chunk the data flowing through it into segments of size one. When we run this code, we get:
Vector(Right(Chunk(1)), Right(Chunk(2)))
Is this a bug or not? Not sure. I'd consult with the maintainers on the Gitter channel.
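Independently of how segments are mapped, one way to get an Either per element is to catch the exception inside the mapped function itself, so nothing ever escapes from map. The sketch below uses only the standard library (scala.util.Try, with toEither from Scala 2.12 onwards) on a plain Vector; PerElementAttempt and safe are hypothetical names. The assumption is that the same wrapped function could be passed to the stream's map in place of the throwing one; I have not verified this against the fs2 versions mentioned in the question.

```scala
import scala.util.Try

object PerElementAttempt {
  // Hypothetical stand-in for the function from the question: rejects 9.
  def f(i: Int): Int =
    if (i == 9) throw new RuntimeException("I don't like 9s") else i

  // Wraps a throwing function so that each call yields an Either.
  // Note: Try only captures non-fatal exceptions.
  def safe[A, B](g: A => B): A => Either[Throwable, B] =
    a => Try(g(a)).toEither

  // Plain-collection version of the pipeline from the question.
  val results: Vector[Either[Throwable, Int]] =
    Vector(1, 2, 3, 4).map(x => x * x).map(safe(f))

  def main(args: Array[String]): Unit =
    results.foreach(println)
}
```

Unlike attempt, which ends the stream at the first failure, this keeps going after the failing element, so 16 still comes through as a Right. Whether that is desirable depends on the use case.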