Tags: python, dataframe, apache-spark, pyspark, txt

How can I separate rows where the number of fields does not match the expected count, using Spark?


I am trying to filter out records that do not have the expected number of fields per row. Below is my code.

no_of_fields_in_each_row = 3, delimiter = ","

input.csv

    emp_id,emp_name,salary
    1,"siva 
    Prasad",100
    2,pavan,200,extra
    3,prem,300
    4,john

Expected output dataframes:

Correct_input_data_frame

emp_id,emp_name,salary
1,"siva Prasad",100
3,prem,300

wrong_file.csv (this one should be written out as a file):

emp_id,emp_name,salary,no_of_fields
2,pavan,200,extra,4 fields in row 3 fields expected
4,john,2 fields in row 3 fields expected

I tried this; it seems to read the file, but the len() function does not work on rows.

input_df = (spark.read
      .option("multiline", "true")
      .option("quote", '"')
      .option("header", "true")
      .option("escape", "\\")
      .option("escape", '"')
      .csv('input.csv')
)

correct = input_df.filter(len(row) == 3)
wrong_data = input_df.filter(len(row) != 3)

Solution

  • You should specify the schema; then you can use the columnNameOfCorruptRecord option.

    I've implemented it using Scala, but the Python implementation should be similar.

    // Declare the expected columns plus an extra string column
    // that collects any row Spark fails to parse.
    val df = spark.read
        .schema("emp_id Long, emp_name String, salary Long, corrupted_record String")
        .option("columnNameOfCorruptRecord", "corrupted_record")
        .option("multiline", "true")
        .option("ignoreLeadingWhiteSpace", false)
        .option("quote", "\"")
        .option("header", "true")
        .option("escape", "\\")
        .csv("input.csv")
    
    df.show()
    

    The result is:

    +------+------------+------+-----------------+
    |emp_id|    emp_name|salary| corrupted_record|
    +------+------------+------+-----------------+
    |     1|siva\nPrasad|   100|             null|
    |     2|       pavan|   200|2,pavan,200,extra|
    |     3|        prem|   300|             null|
    |     4|        john|  null|           4,john|
    +------+------------+------+-----------------+
    

    Now, it is pretty straightforward to filter correct and wrong data:

    import org.apache.spark.sql.functions.col

    val correctDF = df.filter(col("corrupted_record").isNull)
    val wrongDF = df.filter(col("corrupted_record").isNotNull)
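
    For reference, a PySpark sketch of the same approach might look like the block below (untested; the schema string and file names mirror the example above, and the extra no_of_fields column plus the naive comma-split count are assumptions based on the expected wrong_file.csv in the question). Note that Spark writes CSV output as a directory of part files rather than a single file:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, size, split

    spark = SparkSession.builder.getOrCreate()

    # Same idea as the Scala version: declare the schema plus an extra
    # string column that collects any row Spark cannot parse.
    df = (spark.read
          .schema("emp_id LONG, emp_name STRING, salary LONG, corrupted_record STRING")
          .option("columnNameOfCorruptRecord", "corrupted_record")
          .option("multiline", "true")
          .option("ignoreLeadingWhiteSpace", "false")
          .option("quote", '"')
          .option("header", "true")
          .option("escape", "\\")
          .csv("input.csv"))

    # Rows that parsed cleanly vs. rows that did not
    correct_df = df.filter(col("corrupted_record").isNull()).drop("corrupted_record")
    wrong_df = df.filter(col("corrupted_record").isNotNull())

    # Rough field count per bad row (naive comma split; quoted commas would be miscounted)
    wrong_df = wrong_df.withColumn("no_of_fields", size(split(col("corrupted_record"), ",")))

    # Spark writes a directory named wrong_file.csv containing part files
    (wrong_df
        .select("corrupted_record", "no_of_fields")
        .write.option("header", "true")
        .csv("wrong_file.csv"))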