pyspark · databricks · azure-databricks

Pyspark schema and dataframe interaction on optional fields


Good morning y'all, quick question about schemas and pyspark dataframes. Let's say I start with a schema and use that schema to load data into a dataframe from a file. So far, so good.

However, I noticed that when an optional field (so nullable=true on the schema) doesn't exist at all in my data, pyspark just doesn't include it in the schema (instead of setting its value to null), so my code seizes up when I do a select on that field later down the line.

So my question is: is there a way to have pyspark create every field in the schema and set it to null when it doesn't exist in the data, instead of just dropping it from the schema?


Solution

  • When reading CSV files with a specified schema, it is possible that the data in the files does not match the schema.

    - PERMISSIVE (default): inserts nulls for fields that could not be parsed correctly.
    - DROPMALFORMED: drops lines that contain fields that could not be parsed.
    - FAILFAST: aborts the read if any malformed data is found.

    To set the mode, use the mode option. The two non-default modes are sketched after the PERMISSIVE example below.

    Sample data and schema:

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    repair_request_schema = StructType([
        StructField("request_id", StringType(), nullable=True),
        StructField("customer_name", StringType(), nullable=True),
        StructField("repair_type", StringType(), nullable=True),
        StructField("repair_cost", IntegerType(), nullable=True),
    ])

    # Rows mirroring the contents of the CSV file read below.
    repair_request_data = [
        ("1", "John Doe", "Electrical", 200),
        ("2", "Jane Smith", "Plumbing", None),
    ]
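
    If you want to reproduce the example end to end, one way (a sketch only; the /FileStore/tables/repair_req_demo path is made up for illustration, and the original file was not necessarily produced this way) is to write the sample rows out with Spark itself:

    # Write the sample rows to DBFS; Spark produces a directory of part files,
    # not a single .csv file, so a read would point at this directory.
    sample_df = spark.createDataFrame(repair_request_data, repair_request_schema)
    (
        sample_df.coalesce(1)
        .write.mode("overwrite")
        .csv("/FileStore/tables/repair_req_demo")
    )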
    

    I tried the following as an example:

    read_repair_request_df = (
        spark.read.option("mode", "PERMISSIVE")
        .schema(repair_request_schema)
        .csv("/FileStore/tables/repair_req.csv")
    )
    read_repair_request_df.show()
    

    Results:

    +----------+-------------+-----------+-----------+
    |request_id|customer_name|repair_type|repair_cost|
    +----------+-------------+-----------+-----------+
    |         1|     John Doe| Electrical|        200|
    |         2|   Jane Smith|   Plumbing|       NULL|
    +----------+-------------+-----------+-----------+
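
    For comparison, here is a minimal sketch of the two non-default modes, reusing the same schema and path; what they actually do depends on whether the file contains malformed rows:

    # DROPMALFORMED: silently drops rows that do not parse against the schema.
    dropmalformed_df = (
        spark.read.option("mode", "DROPMALFORMED")
        .schema(repair_request_schema)
        .csv("/FileStore/tables/repair_req.csv")
    )

    # FAILFAST: raises an exception on the first malformed row when an action runs.
    failfast_df = (
        spark.read.option("mode", "FAILFAST")
        .schema(repair_request_schema)
        .csv("/FileStore/tables/repair_req.csv")
    )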
    

    The PERMISSIVE mode will handle missing fields by setting them to null.
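
    To show the exact case from the question, here is a sketch where the second row is physically missing the trailing repair_cost field (the local /tmp path and file contents are made up for illustration; on a real cluster the file would need to live on storage the executors can reach, such as DBFS):

    # Create a tiny CSV whose second row omits the repair_cost column entirely.
    with open("/tmp/repair_req_short_rows.csv", "w") as f:
        f.write("1,John Doe,Electrical,200\n")
        f.write("2,Jane Smith,Plumbing\n")

    short_rows_df = (
        spark.read.option("mode", "PERMISSIVE")
        .schema(repair_request_schema)
        .csv("file:///tmp/repair_req_short_rows.csv")
    )
    short_rows_df.show()  # repair_cost comes back as NULL for the second row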

    In PERMISSIVE mode it is possible to inspect the rows that could not be parsed correctly, using one of the following methods:

    - Provide a path via the badRecordsPath option to record corrupt records to a file.
    - Add a _corrupt_record column to the schema provided to the DataFrameReader to review corrupt records in the resulting DataFrame (see the sketch below).
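
    A sketch of the second option (StructType.add and the columnNameOfCorruptRecord option are standard Spark; the path is the one from the example above):

    from pyspark.sql.types import StructField, StringType

    # Append a _corrupt_record column so malformed rows surface in the DataFrame.
    schema_with_corrupt = repair_request_schema.add(
        StructField("_corrupt_record", StringType(), True)
    )

    corrupt_aware_df = (
        spark.read.option("mode", "PERMISSIVE")
        .option("columnNameOfCorruptRecord", "_corrupt_record")
        .schema(schema_with_corrupt)
        .csv("/FileStore/tables/repair_req.csv")
    )
    # Rows that failed to parse keep their raw text in _corrupt_record.
    corrupt_aware_df.filter("_corrupt_record IS NOT NULL").show(truncate=False)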

    Reference:

    Work with malformed CSV records

    Second approach: fill null values with a default value using the fillna method.

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    from pyspark.sql.functions import col

    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("gender", StringType(), True),
    ])

    data = [("Alice", 25, "F"), (None, None, "M"), ("Bob", 30, None)]
    df = spark.createDataFrame(data, schema)

    # Replace nulls per column with a default value.
    df = df.fillna({"name": "Unknown", "age": -1, "gender": "Unknown"})
    df.select(col("name"), col("age"), col("gender")).show()
    

    Results:

    +-------+---+-------+
    |   name|age| gender|
    +-------+---+-------+
    |  Alice| 25|      F|
    |Unknown| -1|      M|
    |    Bob| 30|Unknown|
    +-------+---+-------+