Good morning y'all, quick question about schemas and pyspark dataframes. Let's say I start with a schema and use that schema to load data into a dataframe from a file. So far, so good.
However, I noticed that when an optional field (so nullable=true on the schema) doesn't exist at all in my data, pyspark just doesn't include it in the schema (instead of setting its value to null), so my code seizes up when I do a select on that field later down the line.
So my question, is there a way to have pyspark create every element in the schema and set them to null when they don't exist instead of just removing them from the schema?
When reading CSV files with a specified schema, it is possible that the data in the files does not match the schema.
PERMISSIVE (default): nulls are inserted for fields that could not be parsed correctly DROPMALFORMED: drops lines that contain fields that could not be parsed FAILFAST: aborts the reading if any malformed data is found To set the mode, use the mode option.
Sample data and schema:
repair_request_schema = StructType([
StructField("request_id", StringType(), nullable=True),
StructField("customer_name", StringType(), nullable=True),
StructField("repair_type", StringType(), nullable=True),
StructField("repair_cost", IntegerType(), nullable=True),
])
repair_request_data = [
("1", "John Doe", "Electrical", 200),
("2", "Jane Smith", "Plumbing", None)
]
I have tried the below as an example:
read_repair_request_df = (
spark.read.option("mode", "PERMISSIVE")
.schema(repair_request_schema)
.csv("/FileStore/tables/repair_req.csv")
)
read_repair_request_df.show()
Results:
+----------+-------------+-----------+-----------+
|request_id|customer_name|repair_type|repair_cost|
+----------+-------------+-----------+-----------+
| 1| John Doe| Electrical| 200|
| 2| Jane Smith| Plumbing| NULL|
+----------+-------------+-----------+-----------+
The PERMISSIVE
mode will handle missing fields by setting them to null.
In the PERMISSIVE
mode it is possible to inspect the rows that could not be parsed correctly using one of the following methods:
You can provide a custom path to the option badRecordsPath
to record corrupt records to a file.
You can add the column _corrupt_record
to the schema provided to the DataFrameReader to review corrupt records in the resultant DataFrame.
Reference:
Work with malformed CSV records
Second Approach: fill null values with a default value using fillna method.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("gender", StringType(), True)
])
data = [("Alice", 25, "F"), (None, None, "M"), ("Bob", 30, None)]
df = spark.createDataFrame(data, schema)
df = df.fillna({"name": "Unknown", "age": -1, "gender": "Unknown"})
df.select(col("name"), col("age"), col("gender")).show()
Results:
+-------+---+-------+
| name|age| gender|
+-------+---+-------+
| Alice| 25| F|
|Unknown| -1| M|
| Bob| 30|Unknown|
+-------+---+-------+