
Designing a referentially transparent function for reading from a channel


I'm trying to adhere to pure-FP style and want to design a referentially transparent function.

I have a java.nio.channels.SeekableByteChannel, which is my data source. As soon as I open a file and get the SeekableByteChannel instance, I need to read the first lines of the file and use them to determine the seek position.

So I created the following function:

object AdjustChannelAndGetStream {

  def apply(ch: SeekableByteChannel)
           (firstChunksToOffset: List[Array[Byte]] => Long): fs2.Stream[Id, Array[Byte]] = {
    // depending on the first bytes read from the file,
    // get the number of bytes read before
    val offset: Long = ???
    val newChannel = ch.position(offset)
    // finally wrap newChannel into fs2.Stream
    ???
  }
}

The thing is, the function looks ugly. It does not suspend side effects, which makes it difficult to test (mocking SeekableByteChannel).

I'm inclined to wrap SeekableByteChannel in IO[SeekableByteChannel] (Scalaz or Cats, it does not matter), but I don't see how that helps: we would still need the same mock of SeekableByteChannel, only now wrapped in IO.

Can you help me to design this function in pure-FP style (or at least to make it not so ugly)?


Solution

  • When you need to wrap impure code, in my experience it is usually not going to be pretty. What we gain is a single point that deals with the "messy stuff", and a nice abstraction from there on.

    What we want is a stream whose effect type is IO. Instead of Stream[Id, SeekableByteChannel], we're really in Stream[IO, SeekableByteChannel], because repositioning the channel is a side effect and therefore belongs in the IO context:

    import java.nio.channels.SeekableByteChannel
    import cats.effect.IO
    
    object AdjustChannelAndGetStream {
        def apply(ch: SeekableByteChannel)(
            firstChunksToOffset: List[Array[Byte]] => Long)
          : fs2.Stream[IO, SeekableByteChannel] = {
          fs2.Stream.eval {
            IO {
              val offset: Long = ??? // derived from the first bytes via firstChunksToOffset
              ch.position(offset)
            }
          }
        }
    }
    

    This way, we suspend the side effect, which is what makes a side-effecting computation referentially transparent (RT), and we can apply further transformations to the stream from this point on.
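To make the idea more concrete, here is a minimal sketch of how the `???` could be filled in. It assumes cats-effect 3 (for `IO.blocking`) and fs2 3. The `readChunk` helper and the `headerSize` and `chunkSize` parameters are hypothetical names introduced only for this illustration, and the sketch emits the remaining bytes as a `Stream[IO, Byte]` rather than the channel itself. Treat it as one possible shape under those assumptions, not as the definitive implementation.

```scala
import java.nio.ByteBuffer
import java.nio.channels.SeekableByteChannel
import cats.effect.IO
import fs2.{Chunk, Stream}

object AdjustChannelAndGetStream {

  // Hypothetical helper: reads up to `size` bytes from the channel's current
  // position. Returns None once the end of the channel has been reached.
  private def readChunk(ch: SeekableByteChannel, size: Int): IO[Option[Array[Byte]]] =
    IO.blocking {
      val buf = ByteBuffer.allocate(size)
      val n   = ch.read(buf)
      if (n < 0) None else Some(buf.array().take(n))
    }

  // `headerSize` and `chunkSize` are assumptions, not part of the original signature.
  def apply(ch: SeekableByteChannel, headerSize: Int, chunkSize: Int)(
      firstChunksToOffset: List[Array[Byte]] => Long
  ): Stream[IO, Byte] = {
    // Read the first bytes, compute the offset and reposition the channel.
    // Nothing runs here: the whole step is only described as an IO value.
    val adjust: IO[Unit] =
      for {
        header <- readChunk(ch, headerSize)
        offset  = firstChunksToOffset(header.toList)
        _      <- IO.blocking(ch.position(offset))
      } yield ()

    // After repositioning, keep reading chunks until the channel is exhausted.
    Stream.eval(adjust) >>
      Stream.unfoldChunkEval(()) { _ =>
        readChunk(ch, chunkSize).map(_.map(bytes => (Chunk.array(bytes), ())))
      }
  }
}
```

On the testing concern: since nothing executes until the stream is compiled and run, a test does not necessarily need a mock. One option is to open a real channel on a temporary file with `Files.newByteChannel`, run the stream, and assert on the bytes it produces.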