The following definition results in a memory leak:
    def enumIterator1[E, F[_]: Monad](x: => Iterator[E]) : EnumeratorT[E, F] =
      new EnumeratorT[E, F] {
        def apply[A] = (s: StepT[E, F, A]) => {
          def go(xs: Iterator[E]): IterateeT[E, F, A] =
            if(xs.isEmpty) s.pointI
            else {
              val next = xs.next
              s mapCont { k =>
                k(Iteratee.elInput(next)) >>== enumIterator1[E, F](xs).apply[A]
              }
            }
          go(x)
        }
      }
The leak can be observed with the following test:
    (Iteratee.fold[Array[Byte], IO, Long](0L)(_+_.length)
      &= enumIterator1(
        Iterator.continually(
          Array.fill(1 << 16)(0.toByte)).take(1 << 16))
    ).run.unsafePerformIO
However, a minor change (moving the xs.next call) stops the leak:
    def enumIterator1[E, F[_]: Monad](x: => Iterator[E]) : EnumeratorT[E, F] =
      new EnumeratorT[E, F] {
        def apply[A] = (s: StepT[E, F, A]) => {
          def go(xs: Iterator[E]): IterateeT[E, F, A] =
            if(xs.isEmpty) s.pointI
            else {
              // val next = xs.next (moved down)
              s mapCont { k =>
                val next = xs.next
                k(Iteratee.elInput(next)) >>== enumIterator1[E, F](xs).apply[A]
              }
            }
          go(x)
        }
      }
Why?
I have a vague notion that the explanation has to do with the reference pattern of the closures, but I can't come up with a specific reason for this behavior. I'm trying to track down a different memory leak, and I suspect (hope?) that understanding this leak may help to identify the cause of that one.
The problem is that the anonymous function passed to mapCont closes over next. In turn, this is closed over by the by-name argument we pass to enumIterator1, which is closed over by the new EnumeratorT created by enumIterator1, which is closed over by the anonymous function in apply, which is finally closed over by the anonymous function passed to mapCont for the next iteration.
So, by a chain of closures, each enumerator closes over its predecessor. This would probably happen whether next was captured or not, so you'd have a minor memory leak either way. However, you end up capturing next in one of these closures, which means that every value generated by your iterator stays in memory until the whole process is complete (and these values take up a lot of memory).
By moving next inside the anonymous function passed to mapCont, next is no longer captured in our chain of closures, so the main memory leak disappears (although your closures still close over each other, which may be a concern).
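To make the capture difference concrete, here is a minimal sketch that does not use Scalaz at all. The names CountingSource, eager and deferred are hypothetical and exist only for this illustration; they are not part of the original code. The eager version pulls the element before building the closure, so the closure holds on to it. The deferred version captures only the iterator and pulls the element when it is finally called.

```scala
object ClosureCaptureSketch {
  // Hypothetical iterator that counts how many elements it has handed out.
  final class CountingSource(n: Int) extends Iterator[Array[Byte]] {
    var produced = 0
    def hasNext: Boolean = produced < n
    def next(): Array[Byte] = { produced += 1; new Array[Byte](16) }
  }

  // Mirrors the leaking version: the element is pulled first, then captured
  // by the returned closure, so it stays reachable as long as the closure is.
  def eager(xs: Iterator[Array[Byte]]): () => Int = {
    val next = xs.next()
    () => next.length
  }

  // Mirrors the fixed version: the closure captures only the iterator;
  // the element exists only while the closure body is running.
  def deferred(xs: Iterator[Array[Byte]]): () => Int =
    () => { val next = xs.next(); next.length }

  def main(args: Array[String]): Unit = {
    val a = new CountingSource(3)
    val f = eager(a)
    println(a.produced) // 1: an element is already held by f

    val b = new CountingSource(3)
    val g = deferred(b)
    println(b.produced) // 0: nothing pulled yet
    g()
    println(b.produced) // 1: pulled only when g ran
  }
}
```

In the enumerator from the question the same thing happens once per step, and because the closures are chained together, every eagerly captured element stays reachable until the whole chain is released.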
The best way to fix this is probably to simplify it. As Brian Kernighan famously said:
Everyone knows that debugging is twice as hard as writing a program in the first place. So if you're as clever as you can be when you write it, how will you ever debug it?
I'm not certain I fully understand the code, but I suspect the following is equivalent:
    def enumIterator1[E, F[_]: Monad](x: => Iterator[E]) : EnumeratorT[E, F] =
      new EnumeratorT[E, F] {
        def apply[A] = {
          val xs = x
          def innerApply(s: StepT[E, F, A]): IterateeT[E, F, A] = {
            if(xs.isEmpty) s.pointI
            else {
              val next = xs.next
              s mapCont { cont => // renamed k to cont, as the function, rather than the variable, is k
                cont(Iteratee.elInput(next)) >>== innerApply
              }
            }
          }
          innerApply
        }
      }
You might also benefit from making things more explicit. For example, what if, rather than having an anonymous EnumeratorT that implicitly closes over anything it needs within its scope, you defined a named class at top-level scope and passed in everything it needs explicitly?
I used -XX:+HeapDumpOnOutOfMemoryError, VisualVM, and javap to find the cause of the issue. They should be everything you need.
I think I'm starting to grok what the code's supposed to do, and I've updated my code accordingly. I think the problem was the use of enumIterator1[E, F](xs).apply[A]. The code was creating a new EnumeratorT just to get at its apply method, but creating a by-name variable and closing over everything-and-its-dog in the process. Since the value of xs doesn't change from one recursion to the next, we create an innerApply method which closes over the val xs, and reuse innerApply at every step.
I was curious, so I had a look around in the Scalaz source to see how they solve this problem. Here's some code with a similar bent from Scalaz itself:
    def enumIterator[E, F[_]](x: => Iterator[E])(implicit MO: MonadPartialOrder[F, IO]) : EnumeratorT[E, F] =
      new EnumeratorT[E, F] {
        import MO._ // Remove this line, and you can copy and paste it into your code
        def apply[A] = {
          def go(xs: Iterator[E])(s: StepT[E, F, A]): IterateeT[E, F, A] =
            if(xs.isEmpty) s.pointI
            else {
              s mapCont { k =>
                val next = xs.next
                k(elInput(next)) >>== go(xs)
              }
            }
          go(x)
        }
      }
They use currying, rather than closure, to capture xs, but it's still an "inner apply" approach.
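As a rough, Scalaz-free illustration of that currying pattern, the sketch below uses a hypothetical sumFrom method with two parameter lists. Supplying only the first list fixes the iterator and yields a plain function of the remaining argument, which is the role go(xs) plays in the Scalaz code. This is only an analogy under those assumptions, not the library's implementation.

```scala
object CurriedStepSketch {
  // The first parameter list fixes the iterator; the second takes the running total.
  def sumFrom(xs: Iterator[Int])(acc: Long): Long =
    if (xs.isEmpty) acc
    else {
      val next = xs.next() // pulled inside the step, never stored in a closure
      sumFrom(xs)(acc + next)
    }

  def main(args: Array[String]): Unit = {
    val it = Iterator(1, 2, 3)
    // Partial application: the resulting function value holds only `it`.
    val step: Long => Long = sumFrom(it)
    println(step(0L)) // 6
  }
}
```

Note that the iterator is mutable state shared by every call, so a second call to step sees an exhausted iterator and simply returns its argument.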