
Drop rows in Spark which don't follow a schema


Currently, the schema for my table is:

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- aisle_id: string (nullable = true)
 |-- department_id: string (nullable = true)

I want to apply the schema below to the table above and delete all rows that do not conform to it:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val productsSchema = StructType(Seq(
    StructField("product_id", IntegerType, nullable = true),
    StructField("product_name", StringType, nullable = true),
    StructField("aisle_id", IntegerType, nullable = true),
    StructField("department_id", IntegerType, nullable = true)
  ))

Solution

  • Load the data with the "mode" option set to "DROPMALFORMED", which silently drops records that do not match the supplied schema — for example, rows whose aisle_id or department_id cannot be parsed as an integer. (The "header" option only applies to CSV sources, so it is omitted here.)

    spark.read.format("json")
      .option("mode", "DROPMALFORMED")
      .schema(productsSchema)
      .load("sample.json")
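
  • If the data is already loaded under the original string-typed schema, an alternative is to cast the offending columns and filter out the rows where the cast fails. This is a sketch assuming the loaded DataFrame is named products; casting a non-numeric string to IntegerType yields null, so filtering on the cast result removes the non-conforming rows:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.IntegerType

    // Cast the string columns to integers; unparseable values become null.
    val cleaned = products
      .withColumn("aisle_id", col("aisle_id").cast(IntegerType))
      .withColumn("department_id", col("department_id").cast(IntegerType))
      // Keep only rows where both casts succeeded.
      .filter(col("aisle_id").isNotNull && col("department_id").isNotNull)

    Note that this also drops rows where the original value was already null; if nulls are allowed (the target schema has nullable = true), add a condition on the original columns to keep them.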