Search code examples
scalascalazloopstransducer-machinesscalaz-stream

Using Scalaz Stream for parsing task (replacing Scalaz Iteratees)


Introduction

I use Scalaz 7's iteratees in a number of projects, primarily for processing large-ish files. I'd like to start switching to Scalaz streams, which are designed to replace the iteratee package (which frankly is missing a lot of pieces and is kind of a pain to use).

Streams are based on machines (another variation on the iteratee idea), which have also been implemented in Haskell. I've used the Haskell machines library a bit, but the relationship between machines and streams isn't completely obvious (to me, at least), and the documentation for the streams library is still a little sparse.

This question is about a simple parsing task that I'd like to see implemented using streams instead of iteratees. I'll answer the question myself if nobody else beats me to it, but I'm sure I'm not the only one who's making (or at least considering) this transition, and since I need to work through this exercise anyway, I figured I might as well do it in public.

Task

Supposed I've got a file containing sentences that have been tokenized and tagged with parts of speech:

no UH
, ,
it PRP
was VBD
n't RB
monday NNP
. .

the DT
equity NN
market NN
was VBD
illiquid JJ
. .

There's one token per line, words and parts of speech are separated by a single space, and blank lines represent sentence boundaries. I want to parse this file and return a list of sentences, which we might as well represent as lists of tuples of strings:

List((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.))
List((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.)

As usual, we want to fail gracefully if we hit invalid input or file reading exceptions, we don't want to have to worry about closing resources manually, etc.

An iteratee solution

First for some general file reading stuff (that really ought to be part of the iteratee package, which currently doesn't provide anything remotely this high-level):

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect.IO
import iteratee.{ Iteratee => I, _ }

type ErrorOr[A] = EitherT[IO, Throwable, A]

def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B](
  EitherT(action.catchLeft).map(I.sdone(_, I.emptyInput))
)

def enumBuffered(r: => BufferedReader) = new EnumeratorT[String, ErrorOr] {
  lazy val reader = r
  def apply[A] = (s: StepT[String, ErrorOr, A]) => s.mapCont(k =>
    tryIO(IO(Option(reader.readLine))).flatMap {
      case None       => s.pointI
      case Some(line) => k(I.elInput(line)) >>== apply[A]
    }
  )
}

def enumFile(f: File) = new EnumeratorT[String, ErrorOr] {
  def apply[A] = (s: StepT[String, ErrorOr, A]) => tryIO(
    IO(new BufferedReader(new FileReader(f)))
  ).flatMap(reader => I.iterateeT[String, ErrorOr, A](
    EitherT(
      enumBuffered(reader).apply(s).value.run.ensuring(IO(reader.close()))
    )
  ))
}

And then our sentence reader:

def sentence: IterateeT[String, ErrorOr, List[(String, String)]] = {
  import I._

  def loop(acc: List[(String, String)])(s: Input[String]):
    IterateeT[String, ErrorOr, List[(String, String)]] = s(
    el = _.trim.split(" ") match {
      case Array(form, pos) => cont(loop(acc :+ (form, pos)))
      case Array("")        => cont(done(acc, _))
      case pieces           =>
        val throwable: Throwable = new Exception(
          "Invalid line: %s!".format(pieces.mkString(" "))
        )

        val error: ErrorOr[List[(String, String)]] = EitherT.left(
          throwable.point[IO]
        )

        IterateeT.IterateeTMonadTrans[String].liftM(error)
    },
    empty = cont(loop(acc)),
    eof = done(acc, eofInput)
  )
  cont(loop(Nil))
}

And finally our parsing action:

val action =
  I.consume[List[(String, String)], ErrorOr, List] %=
  sentence.sequenceI &=
  enumFile(new File("example.txt"))

We can demonstrate that it works:

scala> action.run.run.unsafePerformIO().foreach(_.foreach(println))
List((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.))
List((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.))

And we're done.

What I want

More or less the same program implemented using Scalaz streams instead of iteratees.


Solution

  • A scalaz-stream solution:

    import scalaz.std.vector._
    import scalaz.syntax.traverse._
    import scalaz.std.string._
    
    val action = linesR("example.txt").map(_.trim).
      splitOn("").flatMap(_.traverseU { s => s.split(" ") match {
        case Array(form, pos) => emit(form -> pos)
        case _ => fail(new Exception(s"Invalid input $s"))
      }})
    

    We can demonstrate that it works:

    scala> action.collect.attempt.run.foreach(_.foreach(println))
    Vector((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.))
    Vector((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.))
    

    And we're done.

    The traverseU function is a common Scalaz combinator. In this case it's being used to traverse, in the Process monad, the sentence Vector generated by splitOn. It's equivalent to map followed by sequence.