Tags: java, sql, apache-spark, hashmap

Why is the data not getting added to the map, while the same function can be used to print the dataset to the console? What am I doing wrong here?


dataset = dataset.withColumn("Probability", callUDF("checkProb", col("Confirmed"), col("Population")));

Map<String, Double> probability = new HashMap<>();
ArrayList<String> a = new ArrayList<>();
dataset = dataset.limit(35);
dataset.show(36);
dataset.foreach((ForeachFunction<Row>) row -> a.add(row.getAs("State").toString()));
                    
System.out.println(a.size());

The size prints 0 no matter what I do. I have tried an ArrayList and a Map, but neither works.


Solution

  • Spark distributes the workload across different executors. When a task is shipped, the driver serializes the closure, so each executor receives its own copy of every captured local variable. That copy is independent of the original: if an executor alters it, the variable in the driver process stays unchanged. foreach runs on the executors, and each executor therefore gets its own copy of a. You can see this if you print the identityHashCode of the ArrayList:

    ArrayList<String> a = new ArrayList<>();
    dataset = dataset.limit(35);
    dataset.show(36);
    System.out.println("a in the driver process: " + System.identityHashCode(a));
    dataset.foreach((ForeachFunction<Row>) row -> {
        // runs on the executors, against a deserialized copy of a
        a.add(row.getAs("State").toString());
        System.out.println("a on an executor " + System.identityHashCode(a));
    });
    System.out.println("back in the driver process: " + System.identityHashCode(a));
    System.out.println("back in the driver process: " + System.identityHashCode(a));
    

    prints

    a in the driver process: 1859780907
    a on an executor 229101481
    a on an executor 2105534525
    a on an executor 1982276971
    back in the driver process: 1859780907
    

    Therefore, the ArrayList on which you call size() is never altered.
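
    If the goal is simply to get the column values into a driver-side collection, the usual approach is to collect them rather than mutate a captured variable. A minimal sketch, assuming dataset has a string column named State as in the question:

    import java.util.List;
    import org.apache.spark.sql.Encoders;

    // collectAsList() runs the job on the executors and ships the results
    // back to the driver, so the list lives entirely in the driver process.
    List<String> states = dataset.limit(35)
            .select("State")
            .as(Encoders.STRING())
            .collectAsList();
    System.out.println(states.size()); // now prints the expected count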

    By the way: it is bad practice to mutate the driver's local variables on the executors, as this can cause problems (and not only performance ones). Consider using broadcast variables and accumulators instead.
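
    For example, a CollectionAccumulator lets the executors report values that Spark merges back into the driver's instance. A minimal sketch, assuming spark is the active SparkSession and the State column from the question:

    import org.apache.spark.api.java.function.ForeachFunction;
    import org.apache.spark.sql.Row;
    import org.apache.spark.util.CollectionAccumulator;

    // Registered with the SparkContext: each task adds to its own copy, and
    // Spark merges those copies into this instance when the action finishes.
    CollectionAccumulator<String> stateAcc =
            spark.sparkContext().collectionAccumulator("states");
    dataset.foreach((ForeachFunction<Row>) row ->
            stateAcc.add(row.getAs("State").toString()));
    System.out.println(stateAcc.value().size()); // read the result only on the driver

    Since foreach is an action, Spark applies each task's accumulator update only once, even if a task is retried.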