Tags: java, azure, apache-spark, databricks, azure-databricks

How to check if a class is installed on Databricks through a Spark Listener?


Using a Spark Listener on Databricks, I am trying to check whether a given class is installed. However, because of the way Databricks installs packages, the Listener cannot see packages installed after the cluster has started.

In a Java Spark Listener, is there a better way to detect that a class is available when its package was installed via Databricks' Libraries API / UI?

Summary

  • Using a SparkListener installed via a cluster-scoped init script on Databricks.
  • Using ClassLoader in the Listener to check if a given class is installed.
  • On Apache Spark
    • Works on Apache Spark if the Listener is installed via --packages or --jars.
    • Fails on Apache Spark if the Listener is installed via --conf spark.driver.extraClassPath and the desired libraries were installed via --packages or --jars.
  • On (Azure) Databricks
    • Works on Databricks if the library already exists in the /databricks/jars directory, which is the $CLASSPATH directory.
    • Fails on Databricks if the library is installed via the Libraries API / UI (jars installed this way seem to end up at /local_disk0/tmp).

Spark Listener Details

With Apache Spark, I can install a Spark Listener via --packages plus --conf spark.extraListeners=listener.MyListener and use a ClassLoader inside the Listener to check for any class installed through --jars, --packages, or on the class path. The listener that checks whether a class exists looks like this:

import org.apache.spark.scheduler.SparkListenerJobStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyListener extends org.apache.spark.scheduler.SparkListener {

  private static final Logger log = LoggerFactory.getLogger("MyLogger");

  @Override
  public void onJobStart(SparkListenerJobStart jobStart) {
    // A class bundled with Spark itself -- expected to always be found.
    try {
      log.info("Trying LogicalRelation");
      MyListener.class.getClassLoader().loadClass(
          "org.apache.spark.sql.execution.datasources.LogicalRelation");
      log.info("Got logical relation");
    } catch (ClassNotFoundException e) {
      log.info("Couldn't find LogicalRelation");
    }

    // A class from a separately installed library (Apache Iceberg).
    try {
      log.info("Trying org.apache.iceberg.catalog.Catalog");
      MyListener.class.getClassLoader().loadClass("org.apache.iceberg.catalog.Catalog");
      log.info("Got org.apache.iceberg.catalog.Catalog!!!!");
    } catch (ClassNotFoundException e) {
      log.info("Could not get org.apache.iceberg.catalog.Catalog");
    }

    // A class from a separately installed library (Kusto connector).
    try {
      log.info("Trying Kusto DefaultSource");
      MyListener.class.getClassLoader().loadClass("com.microsoft.kusto.spark.datasource.DefaultSource");
      log.info("Got Kusto DefaultSource!!!!");
    } catch (ClassNotFoundException e) {
      log.info("Could not get Kusto DefaultSource");
    }
  }
}
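
For reference, the "works on Apache Spark" case above corresponds to a spark-submit invocation along these lines; the jar path, package coordinates, application jar, and main class below are illustrative placeholders, not part of the original setup:

spark-submit \
  --jars /path/to/listener.jar \
  --packages com.microsoft.azure.kusto:kusto-spark_3.0_2.12:3.0.0 \
  --conf spark.extraListeners=listener.MyListener \
  --class com.example.MyApp \
  /path/to/my-application.jar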

On Databricks, the listener is installed via an init script that looks like:

cp -f /dbfs/databricks/custom/listener.jar /mnt/driver-daemon/jars || { echo "Error"; exit 1;}

cat << 'EOF' > /databricks/driver/conf/customer-listener.conf
[driver] {
  "spark.extraListeners" = "listener.MyListener"
}
EOF

This installation approach is similar to the one used by other publicly available listeners.

Attempted to use URLClassLoader

It seems that the Scala ClassLoader doesn't play nicely with a Java ClassLoader. I attempted to add a URLClassLoader, as suggested in another SO post on setting a different class loader, but the ClassNotFoundException persists.

This code, however, does successfully find my test classes when run in a Databricks interactive notebook:

// Try loading the Kusto class through a URLClassLoader pointed at the
// directory where Databricks places jars installed via the Libraries API,
// using the current class loader as the parent.
URLClassLoader ucl;
try {
  log.info("URL Class Loader Attempt V3");
  File file = new File("/local_disk0/tmp/");
  URL classUrl = file.toURI().toURL();
  URL[] urls = new URL[] { classUrl };
  System.out.println(java.util.Arrays.toString(urls));
  ucl = new URLClassLoader(urls, getClass().getClassLoader());
  ucl.loadClass("com.microsoft.kusto.spark.datasource.DefaultSource");
  try {
    ucl.close();
  } catch (IOException e) {
    log.error("Failed to close url classloader");
  }
  log.info("GOT KustoLIBRARY with URL Class Loader!");
} catch (ClassNotFoundException e) {
  // Still hitting this one
  log.info("Could not get Kusto Library with URLClassLoader");
} catch (MalformedURLException e) {
  log.info("The URL was malformed");
}

Databricks Library Details

With Databricks, the majority of users use the Libraries feature, which installs jars AFTER Spark has started and allows users to easily install a jar via the Databricks UI or through an API.
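
For illustration, installing a jar on a running cluster through that API is roughly a call like the following; the workspace instance, token, and cluster ID are placeholders, and the jar path matches the one in the logs below:

curl -X POST "https://<databricks-instance>/api/2.0/libraries/install" \
  -H "Authorization: Bearer <personal-access-token>" \
  -d '{
        "cluster_id": "<cluster-id>",
        "libraries": [
          { "jar": "dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar" }
        ]
      }'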

When using the above listener, the ClassLoader consistently raises a ClassNotFoundException for packages installed via the Libraries API.

In the Databricks driver logs, I can see the desired jar being installed:

22/07/14 13:32:34 INFO DriverCorral: [Thread 123] AttachLibraries - candidate libraries: List(JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE)
22/07/14 13:32:34 INFO DriverCorral: [Thread 123] AttachLibraries - new libraries to install (including resolved dependencies): List(JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE))
22/07/14 13:32:37 INFO SharedDriverContext: [Thread 123] attachLibrariesToSpark JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE)
22/07/14 13:32:37 INFO LibraryDownloadManager: Downloading a library that was not in the cache: JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE)
22/07/14 13:32:37 INFO LibraryDownloadManager: Attempt 1: wait until library JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE) is downloaded
22/07/14 13:32:37 INFO LibraryDownloadManager: Downloaded library JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE) as local file /local_disk0/tmp/addedFile2043314239110388521kusto_spark_3_0_2_12_3_0_0-6add9.jar in 39 milliseconds
22/07/14 13:32:37 INFO SharedDriverContext: Successfully saved library JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE) to local file /local_disk0/tmp/addedFile2043314239110388521kusto_spark_3_0_2_12_3_0_0-6add9.jar
22/07/14 13:32:37 INFO SharedDriverContext: Successfully attached library dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar to Spark
22/07/14 13:32:37 INFO LibraryState: [Thread 123] Successfully attached library dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar

If I were to install the desired jar/package and all of its dependencies into the /databricks/jars folder, the Spark Listener could successfully detect that the packages are installed (confirmed by a Databricks employee on SO). However, this is not common practice given the Databricks Libraries feature.
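
As a rough sketch, that workaround amounts to extending the cluster-scoped init script so the library jars are copied onto the default class path before Spark starts; the source path and jar name below are illustrative:

cp -f /dbfs/databricks/custom/kusto-spark_3.0_2.12-3.0.0.jar /databricks/jars/ || { echo "Error"; exit 1;}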

So it all seems to boil down to: how do I get the main ClassLoader on a Databricks interactive or job cluster to recognize libraries installed via the Spark application context (as done by the Libraries API / UI)?

Thank you for any insights!


Solution

  • Using Thread.currentThread().getContextClassLoader().loadClass("<class_name>") instead of MyListener.class.getClassLoader().loadClass("<class_name>") appears to work as required in this case.

    The Apache Spark implementation also uses Thread.currentThread().getContextClassLoader.

    Stack Overflow posts on the difference between the two approaches, as well as articles on the different types of class loaders in Java, are helpful for further background.
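
    A minimal sketch of the listener with that change, reusing the class names and logging style from the question (whether a given class resolves still depends on which libraries are actually attached to the cluster):

import org.apache.spark.scheduler.SparkListenerJobStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyListener extends org.apache.spark.scheduler.SparkListener {

  private static final Logger log = LoggerFactory.getLogger("MyLogger");

  @Override
  public void onJobStart(SparkListenerJobStart jobStart) {
    // The thread's context class loader also sees jars attached after
    // startup, e.g. via the Databricks Libraries API / UI.
    checkClass("org.apache.iceberg.catalog.Catalog");
    checkClass("com.microsoft.kusto.spark.datasource.DefaultSource");
  }

  private void checkClass(String className) {
    try {
      Thread.currentThread().getContextClassLoader().loadClass(className);
      log.info("Found {}", className);
    } catch (ClassNotFoundException e) {
      log.info("Could not find {}", className);
    }
  }
}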

    Hope this helps!