Tags: scala, apache-spark, parquet

Spark dump to parquet with column as array of structures


I need to load a CSV file that has a column containing an array of structures, and dump it to another location in Parquet format. My CSV file has two columns, A and B. The data type of column B is array<struct<x: bigint, y: bigint>>.

I tried to load the CSV file with the following schema:

val schemaB = ArrayType(StructType(Seq(StructField("x",LongType),StructField("y",LongType))))
val schema = new StructType().add("A",StringType).add("B",schemaB)
spark.read.option("sep", "\t").schema(schema).csv(<location>)

However, this didn't work. I got the below error:

org.apache.spark.sql.AnalysisException: CSV data source does not support array<struct<x:bigint,y:bigint>> data type.;

I even tried casting to the required type, but that didn't work.

This is an example of how the column B looks:

+--------------------------------------+
|B                                     |
+--------------------------------------+
|68222:102332,21215:1000,10982:70330,  |
|93302:13320,263721:902615,9382:100020,|
+--------------------------------------+

Solution

  • You can use the transform function if you are on a recent version of Spark, i.e. 2.4+.

    Read column B as a plain string first, split it by "," to get the list of pairs, then split each pair by ":" to get x and y:

    import org.apache.spark.sql.functions.expr
    import spark.implicits._

    val schema = new StructType().add("A", StringType).add("B", StringType)
    val df = spark.read.option("delimiter", "\t").schema(schema).csv("path to csv")
    // split on "," to get the pairs, then on ":" to name the x and y fields
    val splitExpr = expr("transform(split(B, ','), p -> (split(p, ':')[0] as x, split(p, ':')[1] as y))")

    val result = df.select($"A", splitExpr.cast("array<struct<x: long, y: long>>") as "B")
    

    Now you can save result in Parquet format. If you are using an older version of Spark, you need to write a UDF instead. Final schema:

    root
     |-- A: string (nullable = true)
     |-- B: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- x: long (nullable = true)
     |    |    |-- y: long (nullable = true)
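For older Spark versions (before 2.4, where transform is not available), the parsing that such a UDF would perform can be sketched in plain Scala as follows. The Point case class and parsePairs helper are names of my own choosing, not from the original answer:

```scala
// Struct element the UDF will produce; a case class gives the
// array<struct<x: long, y: long>> field names instead of _1/_2.
case class Point(x: Long, y: Long)

// Parse "x1:y1,x2:y2,..." into Point values. String.split already
// drops trailing empty tokens (the sample rows end in a comma), and
// the nonEmpty filter guards against any stray empty entries.
def parsePairs(s: String): Seq[Point] =
  s.split(",").toSeq.filter(_.nonEmpty).map { pair =>
    val Array(x, y) = pair.split(":")
    Point(x.toLong, y.toLong)
  }
```

Wrapping this with org.apache.spark.sql.functions.udf and applying it to column B should yield the same array-of-struct column, which can then be written out with result.write.parquet on the chosen output location.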