java, apache-spark, user-defined-functions, apache-spark-dataset, accumulator

Spark Accumulator value not incrementing


I have been working with Spark Datasets recently. I have a scenario where I need to generate a row number for each row and store it in a column named "Ids". The row number starts from 1, 2, 3... and increments based on the number of rows in the dataset (in my case there are 10000-20000 records).

Consider, I have a dataset 'empDataset' with values:

name , dept , project
---------------------
Tina, Finance , abc
Leena, Finance , abc
Joe, Marketing , xyz

Now, for the above dataset, I want to add a column 'Ids' with values incrementing from 1, 2, 3 and so on.

The expected output is:

name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 3

I also want to store this output in another dataset and use it further for different transformations.

I need help solving this problem.

My code snippet:

LongAccumulator accValue = spark.sparkContext().longAccumulator();
long rowNumber = 1;

spark.udf().register("randomNumberGenerator", new UDF1<String, Long>() {

    @Override
    public Long call(String namCol) throws Exception {
        accValue.add(rowNumber);
        System.out.println("inside " + accValue.value());
        return accValue.value();
    }
}, DataTypes.LongType);

Dataset<Row> empDatasetWithIds = empDataset.withColumn("Ids",
        callUDF("randomNumberGenerator", col("name")));

Dataset<Row> filterDept = empDatasetWithIds.filter(...here filtering with dept...)

The output I am getting for empDatasetWithIds is incorrect:

name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 1

The above code works fine when run in local mode, but in cluster mode the values do not increment.

I also went through the links below: https://community.hortonworks.com/questions/36888/spark-java-accumulator-not-incrementing.html and "Spark Java Accumulator not incrementing".

Spark accumulators require an action to trigger the job. In my scenario, I am further performing a filter transformation on the dataset. How can I solve this problem?


Solution

  • Accumulators are variables used to accumulate data across the executors and send it back to the driver. If you read an accumulator's value from an executor, the behavior is not defined (AFAIK); you would probably get whatever has been accumulated for the local partition so far. The whole point of Spark is parallel computation, so when you use an accumulator the data is accumulated per partition in a separate accumulator, and these partial results are then merged and sent back to the driver (map-reduce paradigm). You therefore cannot use an accumulator to share information between executors; that is simply not what it is meant for.
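
    As an illustration, here is a minimal sketch of the intended usage pattern, reusing the spark and empDataset names from the question: the executors only ever call add, and the value is read on the driver after an action has completed (the ForeachFunction cast just disambiguates the Java overload of foreach).

    LongAccumulator rowCounter = spark.sparkContext().longAccumulator("rowCounter");

    // Executors only call add(); they never read the accumulator's value.
    empDataset.foreach((ForeachFunction<Row>) row -> rowCounter.add(1L));

    // foreach is an action, so the per-partition counts have been merged back
    // to the driver and it is safe to read the value here.
    System.out.println("total rows = " + rowCounter.value());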

    What you can do, however, is either use zipWithIndex from the RDD API if you need consecutive indices, or monotonically_increasing_id from the Spark SQL API if you just need increasing indices. The former triggers a small Spark job while the latter is almost free (no Spark job).

    Option 1 (increasing but not necessarily consecutive indices)

    Dataset<Row> indexedData = yourDataframe.withColumn("id", functions.monotonically_increasing_id());
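
    Note that the ids produced this way are unique and increasing but not consecutive: the implementation packs the partition id into the upper bits of each value, so you will typically see gaps between partitions.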
    

    Option 2 (consecutive and increasing indices)

    // schema() returns an immutable StructType, so capture the result of add()
    StructType schema = yourDataframe.schema()
            .add(new StructField("id", DataTypes.LongType, false, Metadata.empty()));

    JavaRDD<Row> rdd = yourDataframe.toJavaRDD().zipWithIndex()
        .map(x -> {
             // copy the existing row values into a mutable list and append the index
             List<Object> row = new ArrayList<>(JavaConverters.asJavaCollection(x._1().toSeq()));
             row.add(x._2());
             return RowFactory.create(row.toArray());
        });
    Dataset<Row> indexedData = spark.createDataFrame(rdd, schema);
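
    From here you can keep working with indexedData for the subsequent transformations mentioned in the question. As a small sketch, filtering on dept (the column name and the "Finance" value are taken from the sample data above):

    Dataset<Row> filterDept = indexedData.filter(functions.col("dept").equalTo("Finance"));
    filterDept.show();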