Search code examples
scalascalaz-stream

How can I speed up scalaz-stream text processing?


How can I speed up the following scalaz-stream code? Currently it takes about 5 minutes to process 70MB of text, so I am probably doing something quite wrong, since a plain scala equivalent would take a few seconds.

(follow-up to another question)

  val converter2: Task[Unit] = {
    val docSep = "~~~"
    io.linesR("myInput.txt")
      .flatMap(line => { val words = line.split(" ");
          if (words.length==0 || words(0)!=docSep) Process(line)
          else Process(docSep, words.tail.mkString(" ")) })
      .split(_ == docSep)
      .filter(_ != Vector())
      .map(lines => lines.head + ": " + lines.tail.mkString(" "))
      .intersperse("\n")
      .pipe(text.utf8Encode)
      .to(io.fileChunkW("correctButSlowOutput.txt"))
      .run
  }

Solution

  • The following is based on @user1763729 's suggestion of chunking. It feels clunky though, and it's just as slow as the original version.

      val converter: Task[Unit] = {
        val docSep = "~~~"
        io.linesR("myInput.txt")
          .intersperse("\n") // handle empty documents (chunkBy has to switch from true to false)
          .zipWithPrevious // chunkBy cuts only *after* the predicate turns false
          .chunkBy{ 
            case (Some(prev), line) => { val words = line.split(" "); words.length == 0 || words(0) != docSep } 
            case (None, line) => true }
          .map(_.map(_._1.getOrElse(""))) // get previous element
          .map(_.filter(!Set("", "\n").contains(_)))
          .map(lines => lines.head.split(" ").tail.mkString(" ") + ": " + lines.tail.mkString(" "))
          .intersperse("\n")
          .pipe(text.utf8Encode)
          .to(io.fileChunkW("stillSlowOutput.txt"))
          .run
      }
    

    EDIT:

    Actually, doing the following (just reading the file, no writing or processing) already takes 1.5 minutes, so I guess there's not much hope to speed this up.

      val converter: Task[Unit] = {
        io.linesR("myInput.txt")
          .pipe(text.utf8Encode)
          .run
      }