Search code examples
azureazure-storageparquetazure-synapse

Error while reading Parquet files from Azure Synapse


I'm currently reading Parquet files stored in an Azure Storage Gen2 account. These Parquet files are continuously loaded and updated from an application where I preprocess and aggregate the data.

The issue I'm encountering is that when I launch a pipeline from Azure Synapse, it encounters an error when trying to read the Parquet files that are currently in use (i.e., being written to at that moment).

To work around this, I've attempted copying the Parquet files to another directory to create a snapshot of the data. It seems that the copying process is successful, and the rest of the pipeline runs perfectly. However, the problem arises when I copy the original Parquet files, as many of them end up corrupted.

Do you have any suggestions or ideas on how I can prevent the corruption of these Parquet files or read them directly without encountering errors?


Solution

  • I have tried the below approach in Azure synapse spark notebook.

    For example I have used a Dedicated Staging Area in Azure Storage account first write the new Parquet files. Once the new files are completely written. then move or copy them to the destination directory(GOLD)

    Staging Data: The Staging data is having 3 records. later I have performed INSERT,UPDATE,DELETE

    enter image description here

    • I am reading the Parquet files from the staging directory and creating a Delta table
    • As the Delta tables provide features like ACID transactions, schema evolution, and time travel queries, which can be valuable for data consistency and data quality in your pipeline.

    The below is the code:

    from pyspark.sql import SparkSession
    from delta import DeltaTable
    from pyspark.sql.functions import col, when
    spark = SparkSession.builder \
        .appName("ReadParquetAndWriteDelta") \
        .config("spark.hadoop.fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") \   .config("spark.hadoop.fs.azure.account.auth.type.adlsgen2dileep.dfs.core.windows.net", "SharedKey") \ .config("spark.hadoop.fs.azure.account.auth.key.adlsgen2dileep.dfs.core.windows.net", "Ga6fbmUJVhYqPQck17fKrOHu9qKMEmq2tFbJF5NQHzKHA8Ynlo5OylWV/6Pu6rJtEvHoKIGhTWeQ+AStXpsorA==") \
        .getOrCreate()
    adls_account_name = "adlsgen2dileep"
    container_name = "folder02"
    staging_directory = "staging"  # Staging directory where the Parquet files are located
    gold_directory = "gold"  # Destination directory for the Delta table
    parquet_directory = "emp.parquet"
    gold_table_name = "employee_gold"
    staging_path = f"abfss://{container_name}@{adls_account_name}.dfs.core.windows.net/{staging_directory}/{parquet_directory}"
    staging_df = spark.read.parquet(staging_path)
    delta_table_path = f"abfss://{container_name}@{adls_account_name}.dfs.core.windows.net/{gold_directory}/{gold_table_name}"
    delta_table = DeltaTable.forPath(spark, delta_table_path)
    **The merge operation**
    delta_table.alias("gold_table").merge(
        source=staging_df.alias("staging"),
        condition="gold_table.Name = staging.Name"
    ).whenMatchedUpdate(
        set={
            "Age": when(col("staging.Age").isNotNull(), col("staging.Age")).otherwise(col("gold_table.Age"))
        }
    ).whenNotMatchedInsert(
        values={
            "Name": col("staging.Name"),
            "Age": col("staging.Age")
        }
    ).execute()
    **Identifying records in the Delta table that do not exist in the staging data and mark them for deletion**
    delta_table.toDF().createOrReplaceTempView("delta_table")
    staging_df.createOrReplaceTempView("staging_df")
    deleted_records = spark.sql("""
        SELECT delta_table.*
        FROM delta_table
        LEFT JOIN staging_df ON delta_table.Name = staging_df.Name
        WHERE staging_df.Name IS NULL
    """)
    **Delete records from the Delta table that are identified for deletion**
    delta_table.alias("gold_table").merge(
        source=deleted_records.alias("staging"),
        condition="gold_table.Name = staging.Name"
    ).whenMatchedDelete().execute()
    #Delta DataFrame without the deleted records
    result_df = delta_table.toDF()
    result_df.show()
    

    enter image description here

    Using a MERGE operation in your Delta table for handling updates and upserts (inserts when not matched) efficiently. This allows you to maintain the state of your data by merging new or updated records into the existing data.

    from pyspark.sql import SparkSession
    from delta import DeltaTable
    spark = SparkSession.builder \
        .appName("ReadParquetAndWriteDelta") \
        .config("spark.hadoop.fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") \  .config("spark.hadoop.fs.azure.account.auth.type.adlsgen2dileep.dfs.core.windows.net", "SharedKey") \  .config("spark.hadoop.fs.azure.account.auth.key.adlsgen2dileep.dfs.core.windows.net", "Ga6fbmUJVhYqPQck17fKrOHu9qKMEmq2tFbJF5NQHzKHA8Ynlo5OylWV/6Pu6rJtEvHoKIGhTWeQ+AStXpsorA==") \
        .getOrCreate()
    adls_account_name = "adlsgen2dileep"
    container_name = "folder02"
    gold_directory = "gold"
    gold_table_name = "employee_gold"
    delta_table_path = f"abfss://{container_name}@{adls_account_name}.dfs.core.windows.net/{gold_directory}/{gold_table_name}"
    delta_table = DeltaTable.forPath(spark, delta_table_path)
    #history of changes (inserts, updates, deletes)
    history_df = delta_table.history().select(
        "version",
        "operation",
        "operationParameters",
        "timestamp",
        "userId",
        "userName"
    )
    display(history_df)
    

    enter image description here

    Above code provides the history of changes (inserts, updates, deletes) made to the Delta table and stores it in a DataFrame called history_df. The delta_table.history() method is used to fetch this information.