public Dataset<Row> myfunc(SparkSession spark, Dataset<Row> dfa, Dataset<Row> dfb) {
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
    Broadcast<Dataset<Row>> excRateBrdCast = jsc.broadcast(dfa); // very small local test DS: 5 rows, 4 cols
    log.info(" ##### " + excRateBrdCast.value().count()); // works
    spark.udf().register("someudf", new UDF4<Date, String, String, Double, Double>() {
        @Override
        public Double call(Date cola, String colb, String colc, Double original) {
            Dataset<Row> excBrdcastRecv = excRateBrdCast.value();
            Double newRate = original; // If I set a debugger here and call excBrdcastRecv.count(), it freezes: no error, but no result
            if (!colc.equals("SOME")) {
                Dataset<Row> ds6 = excBrdcastRecv.filter(row -> {
                    boolean cond1 = row.getAs("cola").toString().equals(cola.toString());
                    boolean cond2 = row.getAs("colb").toString().equals(colb);
                    return cond1 && cond2;
                });
                Double val9 = ds6.first().getAs("colc"); // Spark in local mode freezes here: no error, it just doesn't proceed
                newRate = newRate * val9;
            }
            return newRate;
        }
    }, DataTypes.DoubleType);
    Dataset<Row> newDs = dfb.withColumn(
        "addedColumn", callUDF("someudf", col("cola"), col("colb"), col("colc"), col("cold")));
    return newDs;
}
A few pointers:
If I set a debugger at the line Double newRate = original; and call excBrdcastRecv.count(), it freezes: no error, but no result either. The same happens when the action is called. The log shows:
INFO DAGScheduler - Submitting 1 missing tasks from ResultStage 46 (MapPartitionsRDD[267] at first at PositionsControls.java:178) (first 15 tasks are for partitions Vector(0))
INFO TaskSchedulerImpl - Adding task set 46.0 with 1 tasks
Well, there was a classic error in the above code. The variable excRateBrdCast is broadcast, and then the new UDF4 is registered. Spark actually executes that UDF on several executor machines. On those machines, Spark won't be able to see excRateBrdCast, so it will wait forever for excRateBrdCast.value() to arrive. So somehow we need to pass excRateBrdCast to the UDF. The fact that the two appear one after the other in the same code block is deceptive.
So what you need to do is move the UDF into a separate class instead of initializing it inline. Define a parameterized constructor in that class which takes the broadcast variable excRateBrdCast, and pass it in when you create the UDF. Then the UDF will be able to see the broadcast variable.
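
The constructor-injection shape described above can be sketched without Spark, to show that data handed to a top-level serializable class travels with it when the object is shipped elsewhere. Everything here is a hypothetical stand-in, not Spark API: RateLookupFunction plays the role of the UDF class, a plain HashMap keyed by "cola|colb" plays the role of the broadcast value, and SerializationRoundTrip.roundTrip only imitates shipping the object from the driver to an executor. Returning the original value when no rate matches is also an assumption; the code in the question would instead fail on first().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the UDF: a top-level, serializable class that
// receives its lookup data through the constructor instead of capturing it.
class RateLookupFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    // Stand-in for the broadcast value: "cola|colb" -> rate.
    private final HashMap<String, Double> rates;

    RateLookupFunction(Map<String, Double> rates) {
        this.rates = new HashMap<>(rates);
    }

    // Mirrors the shape of call(cola, colb, colc, original) in the question.
    Double call(String cola, String colb, String colc, Double original) {
        if ("SOME".equals(colc)) {
            return original;
        }
        Double rate = rates.get(cola + "|" + colb);
        // Assumption: leave the value unchanged when no matching rate exists.
        return rate == null ? original : original * rate;
    }
}

class SerializationRoundTrip {
    // Serializes and deserializes an object in memory, as a rough imitation
    // of a function object being sent to another JVM.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

In the real job, the constructor would presumably take the Broadcast handle itself, as the answer describes, and the class would implement UDF4; the map-based lookup is used here only so the sketch runs on its own.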