Type mismatch when following example from documentation in fs2

I am trying to build an application using FS2 (0.10.0). I've taken this example from the documentation:

import fs2._
// import fs2._

import fs2.async
// import fs2.async

import scala.concurrent.ExecutionContext
// import scala.concurrent.ExecutionContext

import cats.effect.{ Effect, IO }
// import cats.effect.{Effect, IO}

type Row = List[String]
// defined type alias Row

trait CSVHandle {
  def withRows(cb: Either[Throwable,Row] => Unit): Unit
}
// defined trait CSVHandle

def rows[F[_]](h: CSVHandle)(implicit F: Effect[F], ec: ExecutionContext): Stream[F,Row] =
  for {
    q <- Stream.eval(async.unboundedQueue[F,Either[Throwable,Row]])
    _ <- Stream.suspend { h.withRows { e => async.unsafeRunAsync(q.enqueue1(e))(_ => IO.unit) }; Stream.emit(()) }
    row <- q.dequeue.rethrow
  } yield row
// rows: [F[_]](h: CSVHandle)(implicit F: cats.effect.Effect[F], implicit ec: scala.concurrent.ExecutionContext)fs2.Stream[F,Row]

But it fails on compilation with:

type mismatch;
[error]  found   : fs2.Stream[F,Row]
[error]  required: fs2.Stream[fs2.Pure,?]
[error] Expanded types:
[error] found   : fs2.Stream[F,List[String]]
[error] required: fs2.Stream[fs2.Pure,?]
[error]       row <- q.dequeue.rethrow

I'm stuck and don't understand why this is happening. Any ideas?


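  • According to this Gitter conversation, the example in the documentation contains an error. Stream.suspend { ...; Stream.emit(()) } is inferred as a Stream[Pure,Unit], which is why the compiler then demands a Stream[fs2.Pure,?] on the next line. Registering the callback with Stream.eval and F.delay produces a Stream[F,Unit], so the types line up. Use this instead: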

     def rows[F[_]](h: CSVHandle)(implicit F: Effect[F], ec: ExecutionContext): Stream[F,Row] =
       for {
         q <- Stream.eval(async.unboundedQueue[F,Either[Throwable,Row]])
         _ <- Stream.eval { F.delay(h.withRows(e => async.unsafeRunAsync(q.enqueue1(e))(_ => IO.unit))) }
         row <- q.dequeue.rethrow
       } yield row