Let's take an example from some scalaz-stream docs, but with a theoretical twist.
import scalaz.stream._
import scalaz.concurrent.Task
val converter: Task[Unit] =
  io.linesR("testdata/fahrenheit.txt")
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .to(io.fileChunkW("testdata/celsius.txt"))
    .run
// at the end of the universe...
val u: Unit = converter.run
In this case the file might very well contain a string that is not a valid double, and the line.toDouble call inside the map step will throw a NumberFormatException. Let's say that in this case we want to log the error and skip the offending line, so that the rest of the stream is still processed. What's the idiomatic way of doing it? I've seen some examples, but they usually collectFrom the stream.
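To make the failure mode concrete, here is a minimal sketch that uses plain Scala collections instead of scalaz-stream. The sample input and the names sampleLines and allOrNothing are made up for illustration; the point is only that a single unparsable line fails the whole conversion.

```scala
import scala.util.Try

// Hypothetical sample input; the second entry is not a valid double.
val sampleLines = List("32", "abc", "212")

// Parsing every line in one go: the first bad line throws
// NumberFormatException, so no results survive at all.
val allOrNothing: Try[List[Double]] = Try(sampleLines.map(_.toDouble))
```

Here allOrNothing is a Failure wrapping the NumberFormatException, even though two of the three lines were fine.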
You can do it with scalaz.\/ and a couple of additional processing steps:
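import scalaz.{\/, -\/, \/-}

// Parse and convert one line; any non-fatal exception ends up on the left side.
def fahrenheitToCelsius(line: String): Throwable \/ String =
  \/.fromTryCatchNonFatal {
    val fahrenheit = line.toDouble
    val celsius = (fahrenheit - 32) * 5.0 / 9.0
    celsius.toString
  }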
def collectSome[T]: PartialFunction[Option[T], T] = {
  case Some(v) => v
}
def logThrowable[T]: PartialFunction[Throwable \/ T, Option[T]] = {
  case -\/(err) =>
    err.printStackTrace()
    None
  case \/-(v) => Some(v)
}
val converter: Task[Unit] =
  io.linesR("testdata/fahrenheit.txt")
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(fahrenheitToCelsius)
    .map(logThrowable)
    .collect(collectSome)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .to(io.fileChunkW("testdata/celsius.txt"))
    .run
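If you want to try the parse, log and skip idea without setting up scalaz-stream, here is a rough sketch of the same pattern on a plain List, using scala.util.Try in place of \/. The names toCelsius, logged and convertAll and the sample input are hypothetical and exist only for this illustration. Errors are collected into a buffer so that the "logging" is observable; real code would call a logger instead.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

// Hypothetical helper: same job as the answer's function, but returning Try.
def toCelsius(line: String): Try[String] =
  Try(((line.trim.toDouble - 32) * 5 / 9).toString)

// Stand-in for a logger: remembers the message of every failed line.
val logged = ListBuffer.empty[String]

// Convert each line on its own; failures are recorded and dropped,
// successes are kept in their original order.
def convertAll(lines: List[String]): List[String] =
  lines.map(toCelsius).flatMap {
    case Success(celsius) => List(celsius)
    case Failure(err) =>
      logged += err.getMessage
      Nil
  }

val result = convertAll(List("32", "abc", "212"))
// result == List("0.0", "100.0"), and logged holds one entry for "abc"
```

The shape is the same as in the stream version: each element is wrapped in a success-or-failure value first, and a later step decides what to do with the failures, so one bad line never aborts the rest.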