Tags: apache-spark, user-defined-functions, broadcast

Spark freezes indefinitely when trying to access a broadcast variable inside a UDF


public Dataset<Row> myfunc(SparkSession spark, Dataset<Row> dfa, Dataset<Row> dfb){
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

    Broadcast<Dataset<Row>> excRateBrdCast = jsc.broadcast(dfa); // very small local test Dataset: 5 rows, 4 cols
    log.info(" ##### " + excRateBrdCast.value().count()); // works
    spark.udf().register("someudf", new UDF4<Date, String, String, Double, Double>(){
        @Override
        public Double call(Date cola, String colb, String colc, Double original){
            Dataset<Row> excBrdcastRecv = excRateBrdCast.value();
            Double newRate = original; // If I set a breakpoint here and evaluate excBrdcastRecv.count(), it freezes: no error, but no result
            if(!colc.equals("SOME")){
                Dataset<Row> ds6 = excBrdcastRecv.filter(row -> {
                    boolean cond1 = row.getAs("cola").toString().equals(cola.toString());
                    boolean cond2 = row.getAs("colb").toString().equals(colb);
                    return cond1 && cond2;
                });
                Double val9 = ds6.first().getAs("colc"); // Spark in local mode freezes here. No error, it just doesn't proceed
                newRate = newRate*val9;
            }
            return newRate;
        }
    }, DataTypes.DoubleType);

    Dataset<Row> newDs = dfb.withColumn("addedColumn",
            callUDF("someudf", col("cola"), col("colb"), col("colc"), col("cold")));

    return newDs;
}

A few pointers:

  1. If I remove the access to excRateBrdCast.value() and return hard-coded values, it works fine.
  2. Using Spark 2.11 with Java.
  3. All datasets are very small local test datasets, so size isn't an issue.
  4. I'm not getting any error; processing just gets stuck when trying to access the broadcast variable. If I set a breakpoint at the line Double newRate = original; and evaluate excBrdcastRecv.count(), it freezes with no error and no result, the same as when an action is called.
  5. The log gets stuck at:
       INFO DAGScheduler - Submitting 1 missing tasks from ResultStage 46 (MapPartitionsRDD[267] at first at PositionsControls.java:178) (first 15 tasks are for partitions Vector(0))
       INFO TaskSchedulerImpl - Adding task set 46.0 with 1 tasks
  6. Running in local mode

Solution

  • Well, there was a classic error in the code above. The variable excRateBrdCast is broadcast, and then a new UDF4 is registered. Spark actually executes that UDF across several executor machines. On those machines, Spark won't be able to see excRateBrdCast, so it halts forever waiting for excRateBrdCast.value() to arrive. So somehow we need to pass excRateBrdCast to the UDF. The fact that the two appear one after the other in the code block is deceptive.

    So what you need to do is move the UDF into a separate class, and don't initialize it inline as an anonymous class. Define a parameterized constructor in the UDF class that takes the broadcast variable excRateBrdCast, and pass it in when you create the UDF.

    Then it will be able to see the broadcast variable.
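A minimal sketch of that approach, in Java like the question. The names RateUdf, RateJob, withRate and key are hypothetical, chosen for illustration. The broadcast handle is passed through the constructor of a named class instead of being captured by an anonymous one. The sketch also makes one deliberate change from the question's code: it broadcasts a plain Map built on the driver from dfa.collectAsList() rather than the Dataset itself, because Spark does not support running Dataset or RDD actions such as first() or count() from code that runs inside a task, such as a UDF body; those are meant to be invoked from the driver. It further assumes that colc in dfa holds a Double, that (cola, colb) identifies a single lookup row, and that an unmatched row should keep its original value (the question's first() would throw instead).

```java
import java.sql.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF4;
import org.apache.spark.sql.types.DataTypes;

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

// Named UDF class (hypothetical name): the broadcast handle arrives through the
// constructor and is serialized together with the UDF when Spark ships it to tasks.
class RateUdf implements UDF4<Date, String, String, Double, Double> {
    private final Broadcast<Map<String, Double>> rates;

    RateUdf(Broadcast<Map<String, Double>> rates) {
        this.rates = rates;
    }

    // Hypothetical lookup key; uses toString() like the question's filter did.
    static String key(Object cola, Object colb) {
        return cola + "|" + colb;
    }

    @Override
    public Double call(Date cola, String colb, String colc, Double original) {
        if (original == null || "SOME".equals(colc)) {
            return original;
        }
        // Plain HashMap lookup inside the task: no nested Spark job is started.
        Double rate = rates.value().get(key(cola, colb));
        // Assumption: rows without a matching rate keep their original value.
        return rate == null ? original : original * rate;
    }
}

class RateJob {
    static Dataset<Row> withRate(SparkSession spark, Dataset<Row> dfa, Dataset<Row> dfb) {
        // Collect the small lookup table on the driver into a serializable Map.
        Map<String, Double> lookup = new HashMap<>();
        for (Row r : dfa.collectAsList()) {
            // putIfAbsent keeps the first matching row, mirroring first().
            lookup.putIfAbsent(RateUdf.key(r.getAs("cola"), r.getAs("colb")), r.getAs("colc"));
        }
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        Broadcast<Map<String, Double>> brd = jsc.broadcast(lookup);

        // Pass the broadcast into the UDF through its constructor.
        spark.udf().register("someudf", new RateUdf(brd), DataTypes.DoubleType);
        return dfb.withColumn("addedColumn",
                callUDF("someudf", col("cola"), col("colb"), col("colc"), col("cold")));
    }
}
```

On the driver, RateJob.withRate(spark, dfa, dfb) would take the place of myfunc. Because the UDF only reads a small Map, the lookup is a constant-time get per row instead of a filter over a Dataset for every row.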