Tags: scala, apache-spark, aggregate-functions, user-defined-functions, kryo

How can you use a nested Map as a buffer in a Spark Aggregator?


I am trying to implement a Scala Spark Aggregator with a Map containing non-primitive types (for example, Map[String, Set[String]]) as its buffer. It seems I can use kryo or ExpressionEncoder to encode a collection of primitives (for example, Set[String]), but when I embed that collection inside a Map, Spark can't seem to find an encoder.

How do I create an encoder for such nested types?

I have tried things like the following:

def bufferEncoder: Encoder[Map[String, Set[String]]] = Encoders.kryo[Map[String, Set[String]]]

and

def bufferEncoder: Encoder[Map[String, Set[String]]] = implicitly(ExpressionEncoder[Map[String, Set[String]]])

For another Aggregator I wrote, I used

  def bufferEncoder: Encoder[Set[String]] = Encoders.kryo[Set[String]]

which worked.

But when I try the first two options I get this error:

java.lang.UnsupportedOperationException: No Encoder found for java.util.Map[String,Array[String]]

UPDATE:

I am adding two code samples, as simple as I can make them. The only difference is that one uses a Set within the Map (shown first below) and the other uses an Array. The Array version compiles and runs (the results aren't important), but the Set version throws the exception described above. Please note that in both examples I am using a mutable Scala Map and Set.

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.mutable.{Map, Set} // mutable collections, as noted above

class MapSetTest extends Aggregator[String, Map[String, Set[String]], Int] with Serializable {

  override def zero = Map[String, Set[String]]()

  override def reduce(buffer: Map[String, Set[String]], newItem: String) = {
    buffer.put(newItem, Set[String]() + newItem)
    buffer
  }

  override def merge(b1: Map[String, Set[String]], b2: Map[String, Set[String]]) = {
    b1
  }

  override def finish(reduction: Map[String, Set[String]]): Int = {
    reduction.size
  }

  def bufferEncoder = implicitly[Encoder[Map[String, Set[String]]]]

  def outputEncoder = Encoders.scalaInt
}

vs.

class MapArrayTest extends Aggregator[String, Map[String, Array[String]], Int] with Serializable {

  override def zero = Map[String, Array[String]]()

  override def reduce(buffer: Map[String, Array[String]], newItem: String) = {
    buffer.put(newItem, Array[String](newItem))
    buffer
  }

  override def merge(b1: Map[String, Array[String]], b2: Map[String, Array[String]]) = {
    b1
  }

  override def finish(reduction: Map[String, Array[String]]): Int = {
    reduction.size
  }

  def bufferEncoder = implicitly[Encoder[Map[String, Array[String]]]]

  def outputEncoder = Encoders.scalaInt
}
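
For context, I drive each aggregator roughly like this (a minimal sketch; spark and input are placeholders for my real SparkSession and Dataset[String], not code shown above):

import org.apache.spark.sql.{Dataset, SparkSession}

val spark: SparkSession = ???    // existing session
val input: Dataset[String] = ??? // existing Dataset[String]

val agg = new MapArrayTest()     // or new MapSetTest(), which is the failing case
val result: Int = input.select(agg.toColumn).head() // finish() returns the buffer's size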


Solution

  • Whilst, per my comment, I wouldn't recommend Kryo, this code:

    def bufferEncoder: Encoder[Map[String, Set[String]]] = implicitly(ExpressionEncoder[Map[String, Set[String]]])
    

    is not going to work on its own.

    1. Don't use ExpressionEncoder; use only Encoder (ExpressionEncoder is an implementation detail).
    2. You must import sparkSession.implicits._ to get the correct encoder derivation (using Spark's built-in encoders). Those implicits are bound to the Encoder type, not to ExpressionEncoder.
    3. Consider materialising encoders only when you need them, and pass implicits around if you need to; this makes it easier to swap which encoder derivation you use (e.g. if you use frameless). See the sketch at the end of this answer.

    Unfortunately, point 2 isn't directly called out in the getting-started page.

    import sparkSession.implicits._
    val enc = implicitly[Encoder[Map[String,Set[String]]]]
    

    will work. This, however, as you've found out, won't:

    import sparkSession.implicits._
    val enc = implicitly[ExpressionEncoder[Map[String,Set[String]]]]
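
    Putting the three points together, here is a minimal sketch (the names MapSetAgg, spark and the sample data are mine, not from the question) of an Aggregator that receives its buffer encoder as an implicit parameter, so the call site decides which derivation supplies it:

    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
    import org.apache.spark.sql.expressions.Aggregator

    // Point 3: take the buffer encoder implicitly so the caller decides
    // whether it comes from spark.implicits._, frameless, etc.
    class MapSetAgg(implicit bufEnc: Encoder[Map[String, Set[String]]])
        extends Aggregator[String, Map[String, Set[String]], Int] with Serializable {

      override def zero: Map[String, Set[String]] = Map.empty

      override def reduce(buffer: Map[String, Set[String]], newItem: String) =
        buffer + (newItem -> Set(newItem))

      override def merge(b1: Map[String, Set[String]], b2: Map[String, Set[String]]) =
        b1 ++ b2

      override def finish(reduction: Map[String, Set[String]]): Int = reduction.size

      // Point 1: the member is typed as Encoder, never ExpressionEncoder
      override def bufferEncoder: Encoder[Map[String, Set[String]]] = bufEnc
      override def outputEncoder: Encoder[Int] = Encoders.scalaInt
    }

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._ // Point 2: brings Encoder[Map[String, Set[String]]] into scope

    val agg = new MapSetAgg() // the implicit encoder is resolved here
    val size = Seq("a", "b", "a").toDS().select(agg.toColumn).head()

    Swapping to frameless (or any other derivation) then only means changing what is in scope at the call site, not the Aggregator itself.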