Tags: scala, scalaz7, scalaz-stream

Processing multiple files in parallel with scalaz streams


I'm trying to use scalaz-stream to process multiple files simultaneously, applying a single function to the corresponding lines across all the files. For concreteness, suppose I have a function that takes a list of strings:

def f(lines: Seq[String]): Something = ???

And a couple of files, the first:

foo1
foo2
foo3

the second:

bar1
bar2
bar3

The result of the whole process should be:

List(
  f(Seq("foo1", "bar1")), 
  f(Seq("foo2", "bar2")), 
  f(Seq("foo3", "bar3"))
)

(or more likely written directly into some other file)

The number of files is not known beforehand, and the number of lines may vary between files, but I'm okay with padding (at runtime) the shorter files with a default value, or truncating the longer files.

So essentially, I need a way to combine a Seq[Process[Task, String]] (obtained via something like io.linesR) into a single Process[Task, Seq[String]].
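
For example, the per-file streams could be built roughly like this (a sketch; foo.txt and bar.txt are made-up names, and in practice the list of paths is only known at runtime):

import scalaz.concurrent.Task
import scalaz.stream._

// Hypothetical input files; one stream of lines per file.
val paths: Seq[String] = Seq("foo.txt", "bar.txt")
val lineStreams: Seq[Process[Task, String]] = paths.map(p => io.linesR(p))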

What would be the simplest way to achieve that?

Or, more generally, how do I combine n instances of Process[F, I] into a single Process[F, Seq[I]]?

I'm sure there's some standard combinator for this purpose, but I wasn't able to figure it out...

Thanks.


Solution

  • This combinator doesn't exist yet, but you could add it. I think it would be something like:

    // Zip n processes into one process of n-element sequences;
    // halts as soon as the shortest input halts.
    def zipN[F[_], A](xs: Seq[Process[F,A]]): Process[F,Seq[A]] =
      if (xs.isEmpty) Process.halt
      else xs.map(_ map (Vector(_))).reduceLeft(_.zipWith(_)(_ ++ _))
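
    A quick usage sketch, using the hypothetical lineStreams from the question (runLog collects the whole output into memory, so this assumes the files are reasonably small):

    // Zip the per-file line streams and apply f to each row of lines.
    val rows: Process[Task, Seq[String]] = zipN(lineStreams)
    val results = rows.map(f).runLog.run // IndexedSeq[Something]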
    

    You could also add zipAllN, which takes a value to pad the sequences with (and which uses zipAll under the hood), and alignN, which allows streams to 'drop out' of the output process when they are exhausted (so the output sequences may get shorter).
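
    As a rough sketch, zipAllN might look like the following, assuming scalaz-stream's tee.zipAll(padI, padI2) combinator; the integer bookkeeping ensures an exhausted side is padded with a vector of its own width:

    import scalaz.stream.{Process, tee}

    def zipAllN[F[_], A](pad: A)(xs: Seq[Process[F, A]]): Process[F, Seq[A]] =
      if (xs.isEmpty) Process.halt
      else xs.map(p => (p.map(Vector(_)), 1)).reduceLeft[(Process[F, Vector[A]], Int)] {
        case ((p1, n1), (p2, n2)) =>
          // Pad whichever side halts first with a vector of its own width.
          (p1.tee(p2)(tee.zipAll(Vector.fill(n1)(pad), Vector.fill(n2)(pad)))
             .map { case (l, r) => l ++ r }, n1 + n2)
      }._1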

    I would suggest implementing it as a 'balanced' reduce rather than a left or right reduce, since that keeps the nesting depth of the zipWith combinations logarithmic in the number of streams rather than linear, which is more efficient.
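
    For instance, a generic balanced reduce could look like this sketch, which you would substitute for reduceLeft in zipN above:

    // Split-in-half reduce; requires a non-empty sequence.
    // Nests the combinations to logarithmic rather than linear depth.
    def balancedReduce[A](xs: Seq[A])(op: (A, A) => A): A =
      if (xs.size <= 1) xs.head
      else {
        val (l, r) = xs.splitAt(xs.size / 2)
        op(balancedReduce(l)(op), balancedReduce(r)(op))
      }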

    Please do submit a pull request + tests if you end up implementing this for real!