Search code examples
scalaapache-flink

Understanding Flink - Task Not Serializable


I am working on a Flink project and came across an issue that I managed to resolve with the help of answers from Stackoverflow. However, it is not clear to me why the proposed solutions actually work and I found information about the topic to be sparse. Consider the following code:

object DeCP {
  def main(args: Array[String]): Unit = {
    val params: ParameterTool = ParameterTool.fromArgs(args)

    // Get the execution environment and read the data
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val queryPoints: DataSet[Point] = readQueryPoints(env, params)
    val points: DataSet[Point] = readFeatureVector(env, params)

    // Process the query points
    queryPoints
      .map(new KNNRich)
      .withBroadcastSet(points, "pointsIn")
      .print
  }

  final class KNNRich extends RichMapFunction[Point, (Point, Vector[Point])]{
    private var pointsIn: Traversable[Point] = _

    override def open(parameters: Configuration): Unit =
      pointsIn = getRuntimeContext.getBroadcastVariable[Point]("pointsIn").asScala

    def map(queryPoint: Point): (Point, Vector[Point]) = {
      val dataSetIn = ExecutionEnvironment.getExecutionEnvironment
                                          .fromCollection(pointsIn.toVector)
      val cluster = new Cluster(dataSetIn, queryPoint)
      val knn = cluster.kNearestNeighbor(queryPoint, 3) // This call causes problems
      (queryPoint, knn.collect.toVector)
    }
  }
}

The Cluster class and companion object are defined as:

class Cluster(var points: DataSet[Point],
              var clusterLeader: Point) extends Serializable {
  private var queryPoint: Point = _

  def distance(p: Point): Point = {
    p.eucDist(queryPoint)
  }

  def kNearestNeighbor(queryPoint: Point, k: Int): DataSet[Point] = {
    this.queryPoint = queryPoint

    this.points.map{p => distance(p)} // Task not serializable
    this.points.map{p => p.eucDist(queryPoint)} // Works
    this.points.map{p => Cluster.staticDistance(queryPoint, p)} // Works
  }
}

object Cluster {
  def staticDistance(queryPoint: Point, p: Point): Point = {
    p.eucDist(queryPoint)
  }
}

The call to the distance method causes a task not serializable exception, but replacing the method call with the definition fixes the issue. Similarly, defining the exact same method as a member of the companion object allows the code to function properly.

Why does the first call not work, but the two other calls work? What happens if you have a more complicated execution flow on the class, that is not easily replaced as methods on the companion object?


Solution

  • By performing DataSet transformations you are just creating the logical plan of your pipeline. The pipeline is submitted to the cluster by calling execute/print/collect.

    When the pipeline is submitted to the cluster, each function such as your RichMapFunction is serialized, sent to the cluster, duplicated for each parallel instance, and executed independently. When you get a "Task not serializable" exception, this means that your RichMapFunction is referencing variables/objects outside of this class transitively. You should make sure that a function is an independent block.

    By calling points.map{} you are implicitly creating a MapFunction. But this MapFunction has a reference to the instance of Cluster thus is not independent. Flink tries to serialize Cluster as well but fails. If distance would be static (defined in the companion object), then Cluster does not need to be serialized as well.

    By the way, another problem of your example is that you are not using the DataSet API as intended. Usually, you should not create a pipeline within a running pipeline. This could also cause unintended side effects.