Tags: apache-spark, pyspark, azure-databricks, databricks-unity-catalog

Photon ran out of memory while executing this query. Photon failed to reserve 349.4 MiB for hash table var-len key data


I am trying to run the code below with Delta Live Tables.

import dlt
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType


@dlt.view
def data():
    # Stream from the source Delta table, skipping change commits,
    # and rename the column to match the target schema.
    return (
        spark.readStream
        .option("skipChangeCommits", "true")
        .format("delta")
        .table("table")
        .withColumnRenamed("col", "col_name")
    )


schema = StructType([
    StructField("col_name", StringType(), True),
])

dlt.create_streaming_table(
    name="table",
    spark_conf={},
    table_properties={"quality": "bronze"},
    partition_cols=["col_name"],  # the column is renamed to col_name in the view
    schema=schema,
)

dlt.apply_changes(
    target="table",
    source="data",
    keys=["userId"],
    sequence_by=col("col_name"),
    stored_as_scd_type=1,
)

While running this, I get the following error:

Caused by: org.apache.spark.memory.SparkOutOfMemoryError: Photon ran out of memory while executing this query. Photon failed to reserve 349.6 MiB for hash table var-len key data, in ParquetDictionaryEncoder, in FileWriterNode(id=34323, output_schema=[]), in task.


Solution

  • Increase the memory allocation per executor and reduce the total number of executors, so that each executor has more memory available.

    You can do this by setting the spark.databricks.delta.photon.buffer.maxMemory configuration property to a higher value. Add the following line before the DLT table definitions:

    spark.conf.set("spark.databricks.delta.photon.buffer.maxMemory", "8g")
    

    I have tried the following:

    spark.conf.set("spark.databricks.delta.properties.defaults.spark.driver.memory", "8g")
    spark.conf.set("spark.databricks.delta.properties.defaults.spark.executor.memory", "8g")
    spark.conf.set("spark.databricks.delta.photon.buffer.maxMemory", "8g")
    

    Results:

    # Read the values back to confirm they were stored in the session conf.
    driver_memory = spark.conf.get("spark.databricks.delta.properties.defaults.spark.driver.memory")
    executor_memory = spark.conf.get("spark.databricks.delta.properties.defaults.spark.executor.memory")
    photon_memory = spark.conf.get("spark.databricks.delta.photon.buffer.maxMemory")
    print("Driver Memory:", driver_memory)
    print("Executor Memory:", executor_memory)
    print("Photon Buffer Max Memory:", photon_memory)
    
    Driver Memory: 8g
    Executor Memory: 8g
    Photon Buffer Max Memory: 8g
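
    Note that spark.conf.get only echoes back what was stored in the session conf; it does not prove the executors were actually launched with more memory. A quick cross-check, as a minimal sketch assuming a notebook attached to the same cluster (the "not set" fallback string is illustrative):

    # What the executors were actually launched with (fixed at cluster start).
    print(spark.sparkContext.getConf().get("spark.executor.memory", "not set"))
    # Total cores across all executors; a rough indicator of cluster width.
    print(spark.sparkContext.defaultParallelism)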
    

    Reference: SO link