Tags: scala, apache-spark, apache-spark-sql, spark-csv

Read CSV with last column as array of values (the values are inside parentheses and separated by commas) in Spark


I have a CSV file whose last column is wrapped in parentheses, with its values separated by commas; the number of values in that column varies from row to row. When I read the file as a DataFrame and assign column names (code below), I get Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match. My CSV file looks like this:

a1,b1,true,2017-05-16T07:00:41.0000000,2.5,(c1,d1,e1)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2,d2,e2,f2,g2)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2,d2)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2,d2,e2)
a2,b2,true,2017-05-26T07:00:42.0000000,0.5,(c2,d2,e2,k2,f2)

What I finally want is a schema like this:

root
 |-- MId: string (nullable = true)
 |-- PId: string (nullable = true)
 |-- IsTeacher: boolean (nullable = true)
 |-- STime: timestamp (nullable = true)
 |-- TotalMinutes: double (nullable = true)
 |-- SomeArrayHeader: array<string> (nullable = true)
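
For reference, here is the schema I am after written as an explicit StructType (I assume STime is meant to be a timestamp; as far as I know the CSV reader cannot parse an ArrayType column directly, which is exactly my problem):

import org.apache.spark.sql.types._

// Target schema; names and types mirror the printSchema output above.
val targetSchema = StructType(Seq(
  StructField("MId", StringType, nullable = true),
  StructField("PId", StringType, nullable = true),
  StructField("IsTeacher", BooleanType, nullable = true),
  StructField("STime", TimestampType, nullable = true),
  StructField("TotalMinutes", DoubleType, nullable = true),
  StructField("SomeArrayHeader", ArrayType(StringType), nullable = true)
))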

This is the code I have written so far:

val infoDF =
  sqlContext.read.format("csv")
    .option("header", "false")
    .load(inputPath)
    .toDF(
      "MId",
      "PId",
      "IsTeacher",
      "STime",
      "TotalMinutes",
      "SomeArrayHeader")

I thought of reading the file without giving column names and then casting the columns after the fifth one into an array type, but then I ran into problems with the parentheses. Is there a way, while reading, to tell Spark that the fields inside the parentheses are actually one field of array type? Roughly, I imagine something like the sketch below (reading each line as plain text and splitting it manually), but it feels brittle.
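
This is only a guess at what that manual parse might look like; pattern and manualDF are placeholder names, it assumes a SparkSession named spark, and it assumes the first five fields never contain commas:

import org.apache.spark.sql.functions.{regexp_extract, split}
import spark.implicits._

// Read raw lines, capture the five fixed fields and the parenthesised
// tail with a regex, then split the tail on commas into an array.
val pattern = """^([^,]*),([^,]*),([^,]*),([^,]*),([^,]*),\((.*)\)$"""
val manualDF = spark.read.text(inputPath).select(
  regexp_extract($"value", pattern, 1).as("MId"),
  regexp_extract($"value", pattern, 2).as("PId"),
  regexp_extract($"value", pattern, 3).as("IsTeacher"),
  regexp_extract($"value", pattern, 4).as("STime"),
  regexp_extract($"value", pattern, 5).as("TotalMinutes"),
  split(regexp_extract($"value", pattern, 6), ",").as("SomeArrayHeader")
)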


Solution

  • OK, this solution is tactical and specific to your case, but the following worked for me:

      import org.apache.spark.sql.functions.{regexp_replace, split}
      import spark.implicits._ // for the 'arr symbol syntax

      // With "(" as the quote character, the parser consumes the opening
      // parenthesis and keeps the rest of the line as one last field.
      val df = spark.read
        .option("quote", "(")
        .csv("in/staff.csv")
        .toDF("MId", "PId", "IsTeacher", "STime", "TotalMinutes", "arr")
      df.show()

      // Drop the trailing ")" and split on commas to get an array column.
      val df2 = df.withColumn("arr", split(regexp_replace('arr, "[)]", ""), ","))
      df2.printSchema()
      df2.show()
    

    Output:

    +---+---+---------+--------------------+------------+---------------+
    |MId|PId|IsTeacher|               STime|TotalMinutes|            arr|
    +---+---+---------+--------------------+------------+---------------+
    | a1| b1|     true|2017-05-16T07:00:...|         2.5|      c1,d1,e1)|
    | a2| b2|     true|2017-05-26T07:00:...|         0.5|c2,d2,e2,f2,g2)|
    | a2| b2|     true|2017-05-26T07:00:...|         0.5|            c2)|
    | a2| b2|     true|2017-05-26T07:00:...|         0.5|         c2,d2)|
    | a2| b2|     true|2017-05-26T07:00:...|         0.5|      c2,d2,e2)|
    | a2| b2|     true|2017-05-26T07:00:...|         0.5|c2,d2,e2,k2,f2)|
    +---+---+---------+--------------------+------------+---------------+
    
    root
     |-- MId: string (nullable = true)
     |-- PId: string (nullable = true)
     |-- IsTeacher: string (nullable = true)
     |-- STime: string (nullable = true)
     |-- TotalMinutes: string (nullable = true)
     |-- arr: array (nullable = true)
     |    |-- element: string (containsNull = true)
    
    +---+---+---------+--------------------+------------+--------------------+
    |MId|PId|IsTeacher|               STime|TotalMinutes|                 arr|
    +---+---+---------+--------------------+------------+--------------------+
    | a1| b1|     true|2017-05-16T07:00:...|         2.5|        [c1, d1, e1]|
    | a2| b2|     true|2017-05-26T07:00:...|         0.5|[c2, d2, e2, f2, g2]|
    | a2| b2|     true|2017-05-26T07:00:...|         0.5|                [c2]|
    | a2| b2|     true|2017-05-26T07:00:...|         0.5|            [c2, d2]|
    | a2| b2|     true|2017-05-26T07:00:...|         0.5|        [c2, d2, e2]|
    | a2| b2|     true|2017-05-26T07:00:...|         0.5|[c2, d2, e2, k2, f2]|
    +---+---+---------+--------------------+------------+--------------------+
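
    One caveat: as the printSchema above shows, every column except arr is still a string. A possible follow-up (a sketch, not tested against your data) is to rename arr and cast the rest to the types you asked for; the seven-digit fractional seconds in STime may need an explicit to_timestamp pattern if a plain cast yields null:

      import org.apache.spark.sql.functions.col

      // Rename the array column and cast the remaining strings.
      val typedDF = df2
        .withColumnRenamed("arr", "SomeArrayHeader")
        .withColumn("IsTeacher", col("IsTeacher").cast("boolean"))
        .withColumn("STime", col("STime").cast("timestamp"))
        .withColumn("TotalMinutes", col("TotalMinutes").cast("double"))
      typedDF.printSchema()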