Tags: scala, serialization, apache-flink, serializable

Identify which object is not serializable in Apache-Flink


I am writing a Flink transformer and I have a custom object Histogram with the following attributes:

case class Histogram(
  nRows: Int,
  nCols: Int,
  min: Int,
  step: Double,
  private val countMatrix: Array[ArrayBuffer[Double]],
  private val cutMatrixL1: Array[ArrayBuffer[Double]],
  val distribMatrixL1: Array[ArrayBuffer[Map[Int, Double]]],
  private val distribMatrixL2: Array[ArrayBuffer[Map[Int, Double]]],
  private val cutMatrixL2: ArrayBuffer[ArrayBuffer[Double]])
  extends Serializable {
    ???
}

This is my FitOperation:

implicit val fitOp = new FitOperation[PIDiscretizerTransformer, LabeledVector] {
    override def fit(
                      instance: PIDiscretizerTransformer,
                      fitParameters: ParameterMap,
                      input: DataSet[LabeledVector]): Unit = {

      // get params...

      val metric = input.map { x ⇒
        // (instance, histogram, totalCount)
        (x, Histogram(nAttrs, l1InitialBins, min, instance.step), 1)
      }.reduce { (m1, m2) ⇒
        // Update Layer 1
        val updatedL1 = updateL1(m1._1, m1._2, instance.step, initialElems, alpha, m1._3)

        // Update Layer 2 if necessary
        val updatedL2 = if (m1._3 % l2updateExamples == 0) {
          updateL2(m1._1, updatedL1)
        } else updatedL1

        (m2._1, updatedL2, m1._3 + 1)
      }.map(_._2)

      //      instance.metricsOption = Some(metric)
    }
  }

This works well, but if I uncomment the last line, instance.metricsOption = Some(metric), I get a java.io.NotSerializableException: org.apache.flink.api.scala.DataSet.

How can I find out which object in my Histogram class is causing the problem? As far as I know, ArrayBuffer is serializable, and so is Map. However, I found this SO question:

Map can not be serializable in scala?

which says that the result of .mapValues is not serializable, but I am not using .mapValues anywhere.


Solution

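  • The problem is not in Histogram. You are referring to instance.step inside your MapFunction (and again inside the reduce function), so those closures capture the whole instance. instance is of type PIDiscretizerTransformer, which cannot be serialized; once instance.metricsOption holds a DataSet, serializing the closure tries to serialize that DataSet too, which is consistent with the exception naming org.apache.flink.api.scala.DataSet. Thus, you need to read step into a local value outside the functions (for example val step = instance.step) and use that value inside them. Then your program should be serializable.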
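
The capture effect can be reproduced without Flink. The following is only a minimal sketch using plain Java serialization: Transformer is a hypothetical stand-in for PIDiscretizerTransformer, and ClosureCaptureDemo, isSerializable, capturing and extracted are illustrative names that do not come from the question or from Flink.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for PIDiscretizerTransformer: deliberately NOT Serializable.
class Transformer(val step: Double)

object ClosureCaptureDemo {
  // Returns true if Java serialization of `obj` succeeds.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  // Reads `instance.step` inside the function body, so the closure
  // captures the whole `instance` object.
  def capturing(instance: Transformer): Double => Double =
    x => x * instance.step

  // Copies the field into a local val first, so only a Double is captured.
  def extracted(instance: Transformer): Double => Double = {
    val step = instance.step
    x => x * step
  }

  def main(args: Array[String]): Unit = {
    val t = new Transformer(0.5)
    println(isSerializable(capturing(t)))  // closure that holds the transformer
    println(isSerializable(extracted(t)))  // closure that holds only the Double
  }
}
```

On a recent Scala version I would expect the first check to fail and the second to succeed. In the fit method above, the same change would mean declaring val step = instance.step before input.map and using step in both the map and the reduce function. As for finding the offending object in general, starting the JVM with -Dsun.io.serialization.extendedDebugInfo=true makes NotSerializableException list the chain of fields that led to the non-serializable object, which may help in cases like this one.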