I am writing a Flink transformer and I have a custom object Histogram
with the following attributes:
    case class Histogram(
        nRows: Int,
        nCols: Int,
        min: Int,
        step: Double,
        private val countMatrix: Array[ArrayBuffer[Double]],
        private val cutMatrixL1: Array[ArrayBuffer[Double]],
        val distribMatrixL1: Array[ArrayBuffer[Map[Int, Double]]],
        private val distribMatrixL2: Array[ArrayBuffer[Map[Int, Double]]],
        private val cutMatrixL2: ArrayBuffer[ArrayBuffer[Double]])
      extends Serializable {
      ???
    }
This is my FitOperation:
    implicit val fitOp = new FitOperation[PIDiscretizerTransformer, LabeledVector] {
      override def fit(
          instance: PIDiscretizerTransformer,
          fitParameters: ParameterMap,
          input: DataSet[LabeledVector]): Unit = {
        // get params...
        val metric = input.map { x ⇒
          // (instance, histogram, totalCount)
          (x, Histogram(nAttrs, l1InitialBins, min, instance.step), 1)
        }.reduce { (m1, m2) ⇒
          // Update Layer 1
          val updatedL1 = updateL1(m1._1, m1._2, instance.step, initialElems, alpha, m1._3)
          // Update Layer 2 if necessary
          val updatedL2 = if (m1._3 % l2updateExamples == 0) {
            updateL2(m1._1, updatedL1)
          } else updatedL1
          (m2._1, updatedL2, m1._3 + 1)
        }.map(_._2)
        // instance.metricsOption = Some(metric)
      }
    }
This works well, but if I uncomment the last line, instance.metricsOption = Some(metric), I get a java.io.NotSerializableException: org.apache.flink.api.scala.DataSet.
How could I find which object in my class Histogram is causing the problem? As far as I know, ArrayBuffer is serializable, and so is Map. I did find this SO question, "Map can not be serializable in scala?", which says .mapValues is not serializable, but I am not using .mapValues anywhere.
The problem is that you are referring to instance.step inside your MapFunction. instance is of type PIDiscretizerTransformer, which cannot be serialized. Thus, you need to compute step outside of the MapFunction and pass the value into the function. Then your program should be serializable.
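
To illustrate the capture issue without a Flink dependency, here is a minimal sketch using plain Java serialization. FakeDataSet and FakeTransformer are hypothetical stand-ins for DataSet and PIDiscretizerTransformer, not real Flink classes, and the sketch assumes the transformer ends up holding a non-serializable value once metricsOption is assigned. A closure that reads instance.step drags the whole instance along, whereas a closure that reads a local copy of step captures only a Double.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable handle such as a DataSet.
class FakeDataSet

// Hypothetical stand-in for PIDiscretizerTransformer.
class FakeTransformer(val step: Double) extends Serializable {
  var metricsOption: Option[FakeDataSet] = None
}

object ClosureCaptureSketch {
  // Returns true if plain Java serialization of `obj` succeeds.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Returns (capturing closure serializable?, local-copy closure serializable?).
  def check(): (Boolean, Boolean) = {
    val instance = new FakeTransformer(0.5)
    instance.metricsOption = Some(new FakeDataSet)

    // Reads instance.step, so the closure captures `instance` and,
    // through it, the non-serializable FakeDataSet.
    val capturing: Double => Double = x => x * instance.step

    // Copy the value out first; this closure captures only a Double.
    val step = instance.step
    val safe: Double => Double = x => x * step

    (isSerializable(capturing), isSerializable(safe))
  }

  def main(args: Array[String]): Unit =
    println(check())
}
```

Applied to the fit method in the question, the same idea would be to write val step = instance.step before input.map and use step inside both the map and reduce functions.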