Tags: scala, apache-spark, serialization, map-function

Iterate over a Spark DataFrame and store each row's values in variables of another class


I want to iterate over a Spark DataFrame and store the values of each row in a class's data members (global variables).

Code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{
  StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

object Main {
  class Input_Class {
    var name: String = ""
    var age: String = ""
    var gender: String = ""

    // Copy the three columns of the Row into the class fields.
    def setter(src: Row): Unit = {
      val row = src.toSeq
      name = row(0).toString
      age = row(1).toString
      gender = row(2).toString
    }
  }
  class Manager {
    var inputObj = new Input_Class()
    def inputSetter(src: Row): Unit = {
      inputObj.setter(src)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("App").config("spark.master", "local").getOrCreate()
    val df = spark.read.csv("data.csv")
    var ManagerObj = new Manager()
    df.rdd.map(row => {
      ManagerObj.inputSetter(row)
    })
    spark.stop()
  }
}

I am not sure whether this code is correct. Am I using the map operator wrongly? The error says the task is not serializable. I am a newbie and don't have much experience with Spark, so if there is a better or another way of achieving this, please recommend it.

This is the error that I get:

20/06/03 17:44:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
org.apache.spark.SparkException: Task not serializable                          
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
  at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:371)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.map(RDD.scala:370)
  at Main$.main(Untitled-2.scala:57)
  ... 51 elided
Caused by: java.io.NotSerializableException: Main$Manager
Serialization stack:
        - object not serializable (class: Main$Manager, value: Main$Manager@108f206f)
        - field (class: scala.runtime.ObjectRef, name: elem, type: class java.lang.Object)
        - object (class scala.runtime.ObjectRef, Main$Manager@108f206f)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class Main$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic Main$.$anonfun$main$1$adapted:(Lscala/runtime/ObjectRef;Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class Main$$$Lambda$2851/2090377899, Main$$$Lambda$2851/2090377899@7722c8e6)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 60 more

Thanks!


Solution

  • You are using a Manager instance inside the closure passed to map, so Spark has to serialize it before shipping the task to the executors. Extend Serializable in Manager:

     class Manager extends Serializable {
       var inputObj = new Input_Class()
       def inputSetter(src: Row): Unit = {
         inputObj.setter(src)
       }
     }
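
Two further points may help. First, Manager holds an Input_Class field, and Java serialization walks the whole object graph, so Input_Class must extend Serializable as well. Second, map is a lazy transformation that runs on the executors: even once serialization succeeds, inputSetter mutates copies of ManagerObj on the executors, so the driver-side object is never updated (and without an action the map never runs at all). If the goal is simply to load row values into a driver-side object, collecting the rows first avoids both problems. Below is a minimal sketch of that approach; the data.csv file name and the three-column (name, age, gender) layout are carried over from the question:

import org.apache.spark.sql.{Row, SparkSession}

object Main {
  // Serializable on both classes, so they can still be shipped to
  // executors if they are ever captured in a closure.
  class Input_Class extends Serializable {
    var name: String = ""
    var age: String = ""
    var gender: String = ""

    def setter(src: Row): Unit = {
      name = src.getString(0)
      age = src.getString(1)
      gender = src.getString(2)
    }
  }

  class Manager extends Serializable {
    var inputObj = new Input_Class()
    def inputSetter(src: Row): Unit = inputObj.setter(src)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("App").master("local").getOrCreate()
    val df = spark.read.csv("data.csv")

    val managerObj = new Manager()
    // collect() is an action: it runs the job and brings the rows back to
    // the driver, so inputSetter mutates the driver-side object. As in the
    // original code, each row overwrites the previous values.
    df.collect().foreach(row => managerObj.inputSetter(row))

    println(managerObj.inputObj.name)
    spark.stop()
  }
}

As with the original code, only the last row's values survive the loop; if every row is needed, mapping each Row into a case class and working with a Dataset is the more idiomatic route.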