java, scala, mapreduce, scalding

What is the replacement for summing a list in Scala-Scalding?


I have the following code, in which I maintain a large List. What I do here is go over the data stream and create an inverted index. I use the Twitter Scalding API, and dataTypePipe is of type TypedPipe:

  lazy val cats = dataTypePipe.cross(cmsCats)
    .map(vf => (vf._1.itemId, vf._1.leafCats, vf._2))
    .flatMap {
      case (id, categorySet, cHhitters) => categorySet.map(cat => (
      ...
    }
    .filter(f => f._2.nonEmpty)
    .group.withReducers(4000)
    .sum
    .map {
      case ((token, bucket), ids) =>
        toIndexedRecord(ids, token, bucket)
    }

Due to a serialization issue, I convert the Scala list to a Java list and use Avro to write it:

  import scala.collection.JavaConverters._ // needed for .asJava

  def toIndexedRecord(ids: List[Long], token: String, bucket: Int): IndexRecord = {
    val javaList = ids.map(l => l: java.lang.Long).asJava // convert scala.Long to java.lang.Long
    new IndexRecord(token, bucket, javaList)
  }

But the issue is that keeping such a large amount of information in the list causes a Java heap problem. I believe the summing is also a contributor to this issue:

2013-08-25 16:41:09,709 WARN org.apache.hadoop.mapred.Child: Error running child
cascading.pipe.OperatorException: [_pipe_0*_pipe_1][com.twitter.scalding.GroupBuilder$$anonfun$1.apply(GroupBuilder.scala:189)] operator Every failed executing operation: MRMAggregator[decl:'value']
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:136)
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:39)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:49)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
    at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:90)
    at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:133)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:522)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1232)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
    at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
    at scala.collection.immutable.List.$colon$colon$colon(List.scala:127)
    at scala.collection.immutable.List.$plus$plus(List.scala:193)
    at com.twitter.algebird.ListMonoid.plus(Monoid.scala:86)
    at com.twitter.algebird.ListMonoid.plus(Monoid.scala:84)
    at com.twitter.scalding.KeyedList$$anonfun$sum$1.apply(TypedPipe.scala:264)
    at com.twitter.scalding.MRMAggregator.aggregate(Operations.scala:279)
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:128)
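
The ListMonoid.plus frames at the bottom of the trace show what .sum does when the grouped values are Lists: Algebird's Monoid for List is concatenation, so all the ids for a key get accumulated into a single in-memory list. A minimal illustration of that semantics (assuming Algebird is on the classpath; the values here are only for demonstration):

  import com.twitter.algebird.Monoid

  // Algebird's Monoid[List[T]] is list concatenation, so summing N
  // singleton lists for one key materializes one N-element list in memory.
  val listMonoid = implicitly[Monoid[List[Long]]]
  val merged = listMonoid.plus(List(1L, 2L), List(3L)) // List(1L, 2L, 3L)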

So my question is: what can I do to avoid this situation?


Solution

  • Try .forceToReducers before the .sum (see the sketch below). This OOM is happening on the map side, where we cache values during aggregation; forcing the sum onto the reducers avoids that caching, though it may not help in your case.

    If the lists are truly too large, however, there is very little that can be done.
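
For concreteness, a sketch of where .forceToReducers would slot into the pipeline from the question (the elided flatMap body is kept as in the original, and this assumes your Scalding version exposes forceToReducers on the grouped pipe):

  lazy val cats = dataTypePipe.cross(cmsCats)
    .map(vf => (vf._1.itemId, vf._1.leafCats, vf._2))
    .flatMap {
      case (id, categorySet, cHhitters) => categorySet.map(cat => (
      ...
    }
    .filter(f => f._2.nonEmpty)
    .group.withReducers(4000)
    .forceToReducers // skip map-side caching; aggregate only on the reducers
    .sum
    .map {
      case ((token, bucket), ids) =>
        toIndexedRecord(ids, token, bucket)
    }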