
Class not found when applying udf in Apache Zeppelin


LATER EDIT: It seems the problem is related to the Apache Zeppelin interpreter. I'm using Apache Zeppelin 0.6.0 on Spark 1.6.0. When running the same code in spark-shell (2.0.0) there were no issues.

This might be a bit too specific, but maybe it helps others who get similar errors with UDFs.

What I want is to create a column in a Spark DataFrame based on another column in that DataFrame and a Seq of strings: create a column "urban" holding 1 if the value in column "location" is in the sequence "cities", and 0 otherwise.

I tried solving it in several different ways and always get the same error. The final version is based on these posts: Use of Seq.contains(String) and Create new column with udf. This is what I have now:

val cities = Seq("london", "paris")
df.filter(lower($"location") isin (cities : _*)).count()

Long = 5485947

So I have records with those 2 locations.

import org.apache.spark.sql.functions._
val urbanFlag: (String => Int) = (arg: String) => {if (cities.contains(arg)) 1 else 0}
val urbf = udf(urbanFlag)
df.withColumn("urban", urbf(lower($"location"))).show(100)

When I run this, the job fails with "Job aborted due to stage failure" and the following error:

java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$$$725d9ae18728ec9520b65ad133e3b55$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1

...followed by a huge stack trace. I'd guess it has something to do with the anonymous function, but what?


Solution

  • Maybe there's an issue with the way you're defining the UDF? This works for me:

    import org.apache.spark.sql.functions._
    
    val data = sqlContext.read.json(sc.parallelize(Seq("{'location' : 'london'}", "{'location': 'tokyo'}")))
    
    val cities = Seq("london", "paris")
    val urbf = udf { city: String => if (cities.contains(city)) 1 else 0 }
    
    data.select($"location", urbf($"location")).show
    
    +--------+-------------+
    |location|UDF(location)|
    +--------+-------------+
    |  london|            1|
    |   tokyo|            0|
    +--------+-------------+
    

    Note that I'm defining the UDF directly, i.e. without an intermediate function value.
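
    If the UDF still fails inside Zeppelin, one alternative worth trying is to skip the UDF entirely and build the flag from built-in column functions. This is only a sketch, not a confirmed fix for the interpreter problem: the idea is that with `when`/`isin` there is no user-defined anonymous function for the executors to load, which may sidestep the ClassNotFoundException. It assumes the same `sc` and `sqlContext` that the notebook provides; the sample data and the name `flagged` are made up for illustration.

    ```scala
    import org.apache.spark.sql.functions.{col, lower, when}

    // Hypothetical sample data, built the same way as in the answer above
    val data = sqlContext.read.json(sc.parallelize(Seq(
      """{"location": "London"}""",
      """{"location": "tokyo"}"""
    )))

    val cities = Seq("london", "paris")

    // 1 if the lower-cased location is one of the cities, 0 otherwise.
    // Only built-in expressions are used, so no UDF closure is involved.
    val flagged = data.withColumn(
      "urban",
      when(lower(col("location")).isin(cities: _*), 1).otherwise(0)
    )

    flagged.show()
    ```

    This reuses the `isin` test from the question's `filter` call, so it should flag the same rows that the count was based on.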