Search code examples
csvapache-sparkpysparkapache-spark-sql

Read a csv file in pyspark while enforcing schema but also ignoring extra columns at the end


I'm trying to read a pipe delimited file in pyspark. My requirements are:

-> Read the file only if all the columns are in the given order in the schema. If the order changes, or if a particular column is missing, ignore the file. I did this using the options header='True',enforceSchema=False

-> Now, the requirement is that while doing so, incase there are extra columns in the original file at the end, then ignore those columns but read the file. The current code will raise an exception because while enforcing schema, the number of columns in file and in the schema should also match.

This is the code which I'm using to read the file. df = spark.read.schema(schema).options(delimiter='|',header='True',enforceSchema=False).csv(file)

How do I achieve this requirement?

Example:

Suppose schema is A,B,C,D

If file columns are A|B|C|D, read the file and continue. (This part is working)

If file columns are A|C|B|D , then ignore the file/raise error. (This part is working)

If file columns are A|B|C|D|E , then also read the file and continue. Do not raise error here. (This part is what I'm trying to achieve)


Solution

  • try this :

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType
    
    spark = SparkSession.builder.appName("ReadCSV").getOrCreate()
    
    schema_str = "A,B,C,D"
    schema = StructType.fromDDL(schema_str)
    
    def file_matches_schema(filepath, schema_str, delimiter):
        header = spark.read.options(delimiter=delimiter, header='True', maxRows=1).csv(filepath).columns
        return ','.join(header).startswith(schema_str)
    
    file = "/path/to/your/file.csv"
    delimiter = '|'
    
    if file_matches_schema(file, schema_str, delimiter):
        df = spark.read.schema(schema).options(delimiter=delimiter, header='True').csv(file)
    else:
        print("File columns do not match the desired schema order or have missing columns!")