I have been working with Spark Datasets recently, and I have a scenario where I have to generate a row number for each row and store it in a column named "Ids". The row number starts from 1, 2, 3... and increments based on the number of rows in the dataset (in my case there are 10,000-20,000 records).
Consider a dataset 'empDataset' with the following values:
name , dept , project
---------------------
Tina, Finance , abc
Leena, Finance , abc
Joe, Marketing , xyz
Now, for the above dataset, I want to add a column 'Ids' with values incrementing from 1, 2, 3... and so on.
The expected output is this:
name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 3
I also want to store this output in another dataset and use it further for different transformations.
Need help to solve this problem statement!
My code snippet:
LongAccumulator accValue = spark.sparkContext().longAccumulator();
long rowNumber = 1;
spark.udf().register("randomNumberGenerator", new UDF1<String, Long>() {
    @Override
    public Long call(String namCol) throws Exception {
        accValue.add(rowNumber);
        System.out.println("inside " + accValue.value());
        return accValue.value();
    }
}, DataTypes.LongType);

Dataset<Row> empDatasetWithIds = empDataset.withColumn("Ids",
        callUDF("randomNumberGenerator", col("name")));
Dataset<Row> filterDept = empDatasetWithIds.filter(...here filtering with dept...)
The output I am getting for empDatasetWithIds is incorrect:
name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 1
The above code works fine when run in local mode, but in cluster mode the values do not increment.
I also went through the link below: https://community.hortonworks.com/questions/36888/spark-java-accumulator-not-incrementing.html (Spark Java Accumulator not incrementing).
Spark accumulators require an action to trigger the job. In my scenario, I am performing a further filter transformation on the dataset; how can I solve this problem? Need help.
Accumulators are variables that are used to accumulate data across the executors and send it back to the driver. If you read an accumulator's value from an executor, the behavior is not defined (AFAIK); you would probably just get what has been accumulated for the local partition so far. Indeed, the goal of Spark is to make parallel computations: when using an accumulator, the data is accumulated for each partition in a separate accumulator, and these are then merged and sent back to the driver (map-reduce paradigm). So you cannot use an accumulator to share information between the executors; that is simply not what it is meant for.
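For contrast, here is a minimal sketch of how an accumulator is normally used (assuming the spark session and empDataset from your question): executors only add to it, and its value is read on the driver once an action has completed.

import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.util.LongAccumulator;

LongAccumulator financeRows = spark.sparkContext().longAccumulator("financeRows");

// Executors only add to the accumulator; they never rely on reading a global value from it.
empDataset.foreach((ForeachFunction<Row>) row -> {
    if ("Finance".equals(row.getAs("dept"))) {
        financeRows.add(1);
    }
});

// foreach is an action, so the per-partition counts have been merged by this point.
// The driver is the only place where reading the value is well defined.
System.out.println("Finance rows: " + financeRows.value());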
What you can do, however, is either use zipWithIndex from the RDD API if you need consecutive indices, or monotonically_increasing_id from the SparkSQL API if you just need increasing indices. The former triggers a small Spark job, while the latter is almost free (no Spark job).
Option 1 (increasing but not necessarily consecutive indices)
yourDataframe.withColumn("id", functions.monotonically_increasing_id());
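Note that, by design, monotonically_increasing_id builds each id from the partition index and the row position within that partition, so the values are unique and increasing but can jump between partitions. As a rough sketch on your data (the exact ids depend on how the rows are partitioned):

Dataset<Row> withIds = empDataset.withColumn("Ids", functions.monotonically_increasing_id());
withIds.show();
// Possible output if Joe's row lands in a second partition:
// +-----+---------+-------+----------+
// | name|     dept|project|       Ids|
// +-----+---------+-------+----------+
// | Tina|  Finance|    abc|         0|
// |Leena|  Finance|    abc|         1|
// |  Joe|Marketing|    xyz|8589934592|
// +-----+---------+-------+----------+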
Option 2 (consecutive and increasing indices)
// Build the target schema: the original columns plus a non-nullable "id" column.
StructType schema = yourDataframe.schema().add("id", DataTypes.LongType, false);

// zipWithIndex assigns consecutive 0-based indices and triggers a small Spark job.
JavaRDD<Row> rdd = yourDataframe.toJavaRDD().zipWithIndex()
        .map(x -> {
            Row originalRow = x._1();
            Object[] values = new Object[originalRow.size() + 1];
            for (int i = 0; i < originalRow.size(); i++) {
                values[i] = originalRow.get(i);
            }
            values[originalRow.size()] = x._2(); // append the index as the last column
            return RowFactory.create(values);
        });

Dataset<Row> indexedData = spark.createDataFrame(rdd, schema);
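One thing to watch: zipWithIndex numbers rows starting at 0, so if you want the 1-based ids from your expected output, append x._2() + 1 instead. The indexed dataset can then be reused for further transformations, for example the dept filter from your question (caching it avoids re-running the indexing job on every reuse):

indexedData.cache(); // optional: avoid recomputing the zipWithIndex job on reuse

Dataset<Row> filterDept = indexedData.filter(functions.col("dept").equalTo("Finance"));
filterDept.show();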