I am trying to filter out records which do not have the expected number of fields in a row; below is my code.
expected number of fields in each row = 3, delimiter = ","
input.csv
emp_id,emp_name,salary
1,"siva
Prasad",100
2,pavan,200,extra
3,prem,300
4,john
Expected output dataframes:
Correct_input_data_frame:
emp_id,emp_name,salary
1,"siva Prasad",100
3,prem,300
wrong_file.csv (this one is written out as a file):
emp_id,emp_name,salary,no_of_fields
2,pavan,200,extra,4 fields in row 3 expected
4,john,2 fields in row 3 expected
I tried this; it seems to read the file correctly, but the len() function does not work on rows:
input_df = (spark.read
.option("multiline", "true")
.option("quote", '"')
.option("header", "true")
.option("escape", "\\")
.option("escape", '"')
.csv('input.csv')
)
# this does not work: len() is a Python builtin, not a Spark column expression
correct = input_df.filter(len(row) == 3)
wrong_data = input_df.filter(len(row) != 3)
You should specify the schema; then you can use the columnNameOfCorruptRecord option.
I've implemented it using Scala, but the Python implementation should be similar.
val df = spark.read
// the expected columns, plus one extra column that collects any malformed row as raw text
.schema("emp_id Long, emp_name String, salary Long, corrupted_record String")
.option("columnNameOfCorruptRecord", "corrupted_record")
.option("multiline", "true")
.option("ignoreLeadingWhiteSpace", false)
.option("quote", "\"")
.option("header", "true")
.option("escape", "\\")
.csv("input.csv")
df.show()
The result is:
+------+------------+------+-----------------+
|emp_id| emp_name|salary| corrupted_record|
+------+------------+------+-----------------+
| 1|siva\nPrasad| 100| null|
| 2| pavan| 200|2,pavan,200,extra|
| 3| prem| 300| null|
| 4| john| null| 4,john|
+------+------------+------+-----------------+
Now, it is pretty straightforward to filter correct and wrong data:
import org.apache.spark.sql.functions.col

val correctDF = df.filter(col("corrupted_record").isNull)
val wrongDF = df.filter(col("corrupted_record").isNotNull)
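Since the question is in Python, here is a minimal PySpark sketch of the same read (assuming the same input.csv; the cache() works around Spark's restriction on queries that reference only the internal corrupt-record column of a raw CSV scan, e.g. a count() on the filtered frame):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

input_df = (spark.read
    # the expected columns, plus one column that captures any malformed row as raw text
    .schema("emp_id LONG, emp_name STRING, salary LONG, corrupted_record STRING")
    .option("columnNameOfCorruptRecord", "corrupted_record")
    .option("multiline", "true")
    .option("ignoreLeadingWhiteSpace", "false")
    .option("quote", '"')
    .option("header", "true")
    .option("escape", "\\")
    .csv("input.csv")
)
# Spark disallows some queries that reference only the corrupt-record column
# on an un-cached CSV scan, so cache before filtering on it.
input_df.cache()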
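The filters translate directly, and the no_of_fields column from the expected wrong_file.csv can be derived from the raw corrupted record. The comma split below is a sketch that works for this data because the malformed rows contain no quoted commas; a quote-aware field count would be needed in general. Also note that, as usual with Spark, wrong_file is written as a directory of part files rather than a single CSV:

from pyspark.sql import functions as F

correct_df = input_df.filter(F.col("corrupted_record").isNull()).drop("corrupted_record")
wrong_df = input_df.filter(F.col("corrupted_record").isNotNull())

# count the fields in each malformed row and label it as in the question
wrong_out = wrong_df.withColumn(
    "no_of_fields",
    F.concat(
        F.size(F.split("corrupted_record", ",")).cast("string"),
        F.lit(" fields in row 3 expected"),
    ),
)
(wrong_out
    .select("corrupted_record", "no_of_fields")
    .write.option("header", "true")
    .csv("wrong_file"))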