Tags: google-cloud-platform, databricks, databricks-autoloader

Databricks autoloader works on compute cluster, but does not work within a task in workflows


I feel like I am going crazy with this. I have tested a data pipeline on my standard compute cluster. I am loading new files as a batch from a Google Cloud Storage bucket. Autoloader works exactly as expected from my notebook on my compute cluster. Then I simply used this notebook as the first task in a workflow running on a new job cluster. In order to test this pipeline as a workflow, I first removed all checkpoint files and directories before starting the run with this command:

dbutils.fs.rm(checkpoint_path, True)

For some reason, the code works perfectly when testing, but in workflows, I get "streaming stopped" and no data from autoloader. Here is my config for autoloader:

from pyspark.sql.functions import input_file_name

file_path = "gs://raw_zone_twitter"
table_name = "twitter_data_autoloader"
checkpoint_path = "/tmp/_checkpoint/twitter_checkpoint"

# Start clean so every test run ingests everything again
spark.sql(f"DROP TABLE IF EXISTS {table_name}")

# Auto Loader: pick up new text files from the bucket and append them to a table
query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
    .withColumn("filePath", input_file_name())
    .writeStream
    .option("checkpointLocation", checkpoint_path)
    .trigger(once=True)  # process whatever is available, then stop
    .toTable(table_name))

When running this as a workflow I see that the checkpoint directory is created, but there is no data inside.
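For reference, one way to check this is something like the sketch below, using the same checkpoint_path as above and the standard dbutils.fs.ls utility. An empty checkpoint, with no offsets or commits folders, suggests the stream never got as far as committing a batch.

# List what the stream has written under the checkpoint location
for entry in dbutils.fs.ls(checkpoint_path):
    print(entry.path, entry.size)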

The code I run when testing on my compute cluster and the code in my workflow task are exactly the same (same notebook), so I really have no idea why autoloader is not working within my workflow...

To confirm, I am using the same IAM service account for both my compute cluster and my job cluster.


Solution

  • I thought I would provide an answer to my own question, since I have found the source of the issue and it might be helpful to others. There is also a related, but not identical, discussion in this SO question:

    PySpark Wait to finish in notebook (Databricks)

    In my case, autoloader IS working on my job cluster. The difference between my compute cluster notebook and the job is that in my notebook I am running my code cell by cell. In a job, each cell is submitted to the cluster for processing in a distributed manner, and each subsequent cell runs after the previous one ends. The issue is that when the cell containing my autoloader code ends, the stream has in fact not finished. Therefore, any outputs/tables/variables created by my stream are not available in memory or storage when the downstream cells are executed.

    I solved this issue by enforcing that new cells cannot run until the stream has ended. Directly after the code above I added this call:

    query.awaitTermination()
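
    In sketch form, the end of the ingestion cell then looks like this (the count at the end is just a placeholder for whatever the downstream cells actually do with the table):

    # ... readStream / writeStream definition from above ...
    query.awaitTermination()  # block until the trigger-once run has committed and stopped

    # Downstream cells (or the next task) can now rely on the table being fully written
    spark.table(table_name).count()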