
Logging and ignoring exception from Task in scalaz-streams


Let's take an example from some scalaz-stream docs, but with a theoretical twist.

import scalaz.stream._
import scalaz.concurrent.Task

val converter: Task[Unit] =
  io.linesR("testdata/fahrenheit.txt")
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .to(io.fileChunkW("testdata/celsius.txt"))
    .run

// at the end of the universe...
val u: Unit = converter.run

In this case the file might well contain a string that is not a valid double, and the line.toDouble call will throw a NumberFormatException. Suppose that, when this happens, we want to log the error and ignore it so that the rest of the stream is still processed. What's the idiomatic way of doing it? I've seen some examples, but they usually collectFrom the stream.
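
To make the failure concrete, here is a minimal standalone sketch that uses only the Scala standard library (no scalaz-stream). The object name, the sample lines, and the Double-based fahrenheitToCelsius are assumptions for illustration; the point is only that toDouble is what throws on non-numeric input.

```scala
import scala.util.{Failure, Success, Try}

object ParseFailureDemo {
  // Sample lines are made up for illustration; the second one is not a number.
  val lines: List[String] = List("32.0", "not-a-number", "212.0")

  // Pure arithmetic; this function itself never throws.
  def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * 5.0 / 9.0

  // toDouble throws NumberFormatException on bad input; Try captures it as a value.
  val results: List[Try[Double]] =
    lines.map(l => Try(fahrenheitToCelsius(l.toDouble)))

  def main(args: Array[String]): Unit =
    results.foreach {
      case Success(c) => println(s"ok: $c")
      case Failure(e) => println(s"failed: ${e.getClass.getSimpleName}")
    }
}
```

Without the Try, the first bad line would raise the exception and no later line would be converted.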


Solution

  • You can do it with scalaz.\/ and a few additional processing steps: capture the exception as a left value, log it, and drop it from the stream.

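      import scalaz.{\/, -\/, \/-}
      import scalaz.concurrent.Task
      import scalaz.stream._

      // Parse and convert one line; a NumberFormatException ends up on the left side.
      def fahrenheitToCelsius(line: String): Throwable \/ String = 
         \/.fromTryCatchNonFatal {
            val fahrenheit = line.toDouble
            val celsius = (fahrenheit - 32.0) * 5.0 / 9.0
            celsius.toString
         }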
    
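      // Defined only for Some; used with collect to drop the Nones.
      def collectSome[T]: PartialFunction[Option[T], T] = {
        case Some(v) => v
      }
    
      // Logs a failure and maps it to None; passes a success through as Some.
      def logThrowable[T]: PartialFunction[Throwable \/ T, Option[T]] = {
        case -\/(err) => 
          err.printStackTrace()
          None
        case \/-(v) => Some(v)
      }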
    
      val converter: Task[Unit] =
        io.linesR("testdata/fahrenheit.txt")
          .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
          .map(fahrenheitToCelsius)
          .map(logThrowable)
          .collect(collectSome)
          .intersperse("\n")
          .pipe(text.utf8Encode)
          .to(io.fileChunkW("testdata/celsius.txt"))
          .run
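
The capture, log, and drop steps above can be checked without files or scalaz-stream. The following is a minimal sketch, assuming Scala 2.12 or later, with the standard library's Either standing in for scalaz.\/ and a List standing in for the Process. SkipBadLines, parse, logAndDrop, convert, and the sample input are hypothetical names chosen for illustration, not part of any library.

```scala
import scala.util.Try

object SkipBadLines {
  // Right holds the converted value, Left the exception (mirrors Throwable \/ String).
  def parse(line: String): Either[Throwable, String] =
    Try(((line.toDouble - 32.0) * 5.0 / 9.0).toString).toEither

  // Log a failure to stderr and drop it; keep successful values.
  def logAndDrop[T](e: Either[Throwable, T]): Option[T] = e match {
    case Left(err) =>
      Console.err.println(s"skipping line: $err")
      None
    case Right(v) => Some(v)
  }

  // Same shape as the stream pipeline: filter, convert, log failures, keep successes.
  def convert(lines: List[String]): List[String] =
    lines
      .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
      .map(parse)
      .flatMap(e => logAndDrop(e).toList)

  def main(args: Array[String]): Unit =
    println(convert(List("// header", "32.0", "oops", "", "212.0")).mkString("\n"))
}
```

The bad line is reported on stderr and the remaining lines are still converted, which is the behaviour the stream version aims for.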