Search code examples
scalascalazenumeratorloopsscalaz7

Scalaz 7 Iteratee to process large zip file (OutOfMemoryError)


I'm trying to use the scalaz iteratee package to process a large zip file in constant space. I have a long-running process I need to perform on each file in the zip file. Those processes can (and should) be run in parallel.

I created an EnumeratorT that inflates each ZipEntry into a File object. The signature looks like:

def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]

I want to attach an IterateeT that will perform the long-running process on each file. I basically end up with something like:

type IOE[A] = IoExceptionOr[A]

def action(f:File):IO[List[Promise[IOE[File]]]] = (
  consume[Promise[IOE[File]], IO, List] %=
  map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
  Promise { Thread.sleep(5000); iof }

When I try to run it:

action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get

I get a java.lang.OutOfMemoryError: Java heap space message. That makes sense to me, since it's trying to build up a massive list in memory of all these IO and Promise objects.

A few questions:

  • Does anyone have any ideas on how to avoid this? It feels like I'm approaching the problem incorrectly, because I really only care about the longRunningProcess for its side-effects.
  • Is the Enumerator approach here the wrong approach?

I'm pretty much out of ideas, so anything will help.

Thanks!

Update #1

Here is the stack trace:

[error] java.lang.OutOfMemoryError: Java heap space
[error]         at scalaz.Free.flatMap(Free.scala:46)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)

I am currently taking the advice of nadavwr to ensure everything is acting like I think it is. I will report back any updates.

Update #2

Using ideas from both the answers below, I found a decent solution. As huynhjl suggested (and I verified using nadavwr's suggestion of analyzing the heap dump), consume was causing every inflated ZipEntry to be held in memory, which is why the process was running out of memory. I changed consume to foldM and updated the long-running process to just return a Promise[IOE[Unit]] instead of a reference to the file. That way I have a collection of all IoExceptions at the end. Here is the working solution:

def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
  foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
  map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
  Promise { Thread.sleep(5000); iof.map(println) }

This solution inflates each entry while asynchronously uploading them. At the end, I have a huge list of fulfilled Promise objects that contain any errors. I still not fully convinced this is the correct use of an Iteratee, but I do now have several reusable, composeable pieces that I can use in other pieces of our system (this is a very common pattern for us).

Thanks for all your help!


Solution

  • Don't use consume. See my other recent answer: How to use IO with Scalaz7 Iteratees without overflowing the stack?

    foldM may be a better choice.

    Also try to map the file to something else (like a success return code) to see if that allows the JVM to garbage collect the inflated zip entries.