Tags: scala, type-level-computation, fs2

How to convert `fs2.Stream[IO, T]` to `Iterator[T]` in Scala


I need to implement next and hasNext in the skeleton below while preserving laziness:

new Iterator[T] {
  val stream: fs2.Stream[IO, T] = ...

  // Iterator declares hasNext without parentheses, so the override must match
  def hasNext: Boolean = ???
  def next(): T = ???

}

But I cannot figure out how on earth to do this from an fs2.Stream. None of the methods on a Stream (or on the "compiled" thing) seem to help here.

If this is simply impossible to do in a reasonable amount of code, then that itself is a satisfactory answer and we will just rip out fs2.Stream from the codebase - just want to check first!


Solution

  • fs2.Stream, while similar in concept to Iterator, cannot be converted to one while preserving laziness. I'll try to elaborate on why...

    Both represent a pull-based series of items, but the way in which they represent that series and implement the laziness differs too much.

    As you already know, Iterator represents its pull in terms of the next() and hasNext methods, both of which are synchronous and blocking. To consume the iterator and return a value, you can directly call those methods e.g. in a loop, or use one of its many convenience methods.

    fs2.Stream supports two capabilities that make it incompatible with that interface:

    • A cats.effect.Resource can be included in the construction of a Stream. For example, you could construct an fs2.Stream[IO, Byte] representing the contents of a file. When that stream is consumed, even if you abort early or do some strange flatMap, the underlying Resource is honored and your file handle is guaranteed to be closed. If you tried the same thing with an Iterator, the "abort early" case would pose problems: you would need something like Iterator[Byte] with Closeable and rely on the caller to call .close(), or some other pattern.
    • Evaluation of "effects". In this context, effects are types like IO or Future, where the process of obtaining the value may perform some possibly-asynchronous action, and may perform side-effects. Asynchrony poses a problem when trying to force the process into a synchronous interface, since it forces you to block your current thread to wait for the asynchronous answer, which can cause deadlocks if you aren't careful. Libraries like cats-effect strongly discourage you from calling methods like unsafeRunSync.
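
To illustrate the resource guarantee from the first bullet, here is a minimal sketch, assuming fs2 3.x and cats-effect 3. The "resource" is simulated with a log held in a Ref (the names program and log are made up for this example); Stream.bracket registers a finalizer, and the stream is stopped early with take(2).

```scala
import cats.effect.{IO, Ref}
import fs2.Stream

object ResourceSafetySketch {
  // Returns the elements that were pulled plus the recorded acquire/release events.
  val program: IO[(List[Int], Vector[String])] =
    for {
      // Hypothetical stand-in for a real resource such as a file handle.
      log <- Ref.of[IO, Vector[String]](Vector.empty)
      stream = Stream
        .bracket(log.update(_ :+ "open"))(_ => log.update(_ :+ "close"))
        .flatMap(_ => Stream.emits(1 to 1000).covary[IO])
      // Abort early: only two of the 1000 elements are requested.
      firstTwo <- stream.take(2).compile.toList
      events <- log.get
    } yield (firstTwo, events)
}
```

Running program should yield the first two elements together with both an "open" and a "close" event, even though the stream was never fully consumed. A plain Iterator has no equivalent hook for "the consumer stopped pulling".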

    fs2.Stream does have a special case that rules out resources and effects: the Pure type, which you can use in place of IO. That gets you access to Stream.PureOps, but its methods only consume the whole stream by building a collection, so the laziness you want to preserve would be lost.
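
As a small sketch of the pure case, assuming fs2 3.x: a Stream[Pure, Int] can be turned into a List without any IO, but the whole stream is evaluated before an iterator is available. The object and value names are only for illustration.

```scala
import fs2.{Pure, Stream}

object PureStreamSketch {
  // No effects and no resources: the effect type is Pure.
  val numbers: Stream[Pure, Int] = Stream(1, 2, 3).map(_ * 2)

  // toList is available on pure streams; it evaluates the entire stream eagerly.
  val all: List[Int] = numbers.toList

  // This iterator walks an already-built List, so nothing about it is lazy.
  val asIterator: Iterator[Int] = all.iterator
}
```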

    Side note: you can convert an Iterator to a Stream.
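
A minimal sketch of that direction, assuming a recent fs2 3.x release where Stream.fromIterator takes a chunk size (older versions had a different signature). The iterator here is an infinite one made up for the example, to show that the stream still pulls from it lazily.

```scala
import cats.effect.IO
import fs2.Stream

object IteratorToStreamSketch {
  // Hypothetical source: an infinite iterator of natural numbers.
  def naturals: Iterator[Int] = Iterator.from(0)

  // The stream reads from the iterator in chunks of up to 64 elements.
  val stream: Stream[IO, Int] = Stream.fromIterator[IO](naturals, 64)

  // Only a prefix is requested, so the infinite iterator is never exhausted.
  val firstFive: IO[List[Int]] = stream.take(5).compile.toList
}
```

If the iterator blocks on I/O when advanced, Stream.fromBlockingIterator is probably the better fit.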

    The only way to "convert" a Stream to an Iterator is to consume it to some collection type via e.g. .compile.toList, which would get you an IO[List[T]], then .map(_.iterator) that to get an IO[Iterator[T]]. But ultimately that doesn't fit what you're asking for since it forces you to consume the stream to a buffer, breaking laziness.
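
The buffering conversion described above could look like this, assuming fs2 3.x with cats-effect 3. The helper name toBufferedIterator is made up for this sketch.

```scala
import cats.effect.IO
import fs2.Stream

object BufferedIteratorSketch {
  // Runs the whole stream into a List first, then exposes that List as an Iterator.
  // The Iterator only becomes available after every element has been produced.
  def toBufferedIterator[T](stream: Stream[IO, T]): IO[Iterator[T]] =
    stream.compile.toList.map(_.iterator)
}
```

Note that the result is still wrapped in IO, and that an infinite stream would never produce an iterator at all.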

    @Dima mentioned the "XY Problem", which was poorly received because they didn't (initially) elaborate on the incompatibility, but they're right. It would help to know why you're trying to convert a Stream to an Iterator, in case some other approach would serve your overall goal instead.