Duplicate columns in Spark Dataframe


I have a 10 GB CSV file on a Hadoop cluster that contains duplicate column names. I want to analyse it in SparkR, so I use the spark-csv package to parse it into a DataFrame:

  df <- read.df(
    sqlContext,
    FILE_PATH,
    source = "com.databricks.spark.csv",
    header = "true",
    mode = "DROPMALFORMED"
  )

But since df has two columns named Email, selecting that column fails with an error:

select(df, 'Email')

15/11/19 15:41:58 ERROR RBackendHandler: select on 1422 failed
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : 
  org.apache.spark.sql.AnalysisException: Reference 'Email' is ambiguous, could be: Email#350, Email#361.;
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:278)
...

I want to keep the first occurrence of the Email column and drop the later one. How can I do that?


Solution

  • The best way would be to change the column name upstream ;)

    However, it seems that is not possible, so there are a couple of options:

    1. If the column names differ only in case ("email" vs "Email"), you can turn on case sensitivity:

           sql(sqlContext, "set spark.sql.caseSensitive=true")
      
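    2. If the column names are exactly the same, you will need to specify the schema manually, giving each duplicate a distinct name. Keep header = "true" so that the header row is skipped rather than read as data; the column names then come from the schema: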

      customSchema <- structType(
        structField("year", "integer"),
        structField("make", "string"),
        structField("model", "string"),
        structField("comment", "string"),
        structField("blank", "string")
      )

      df <- read.df(
        sqlContext,
        "cars.csv",
        source = "com.databricks.spark.csv",
        header = "true",
        schema = customSchema
      )
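
Applied to the Email case from the question, the second option might look roughly like the sketch below. It is only an illustration and has not been run against a cluster. The header vector in the usage note, the helper names dedupe_header and read_first_occurrences, and the choice to read every column as "string" are all assumptions, not something from the original post. The idea is to rename later duplicates with base R's make.unique(), build the schema from the renamed list, and then select only the first occurrence of each name.

```r
# Rename later duplicates so every column name is unique.
# make.unique() is base R; the first occurrence keeps its name.
dedupe_header <- function(header_names) {
  make.unique(header_names, sep = "_")
}

# Hypothetical helper: read the CSV with a schema built from the de-duplicated
# names, then keep only the first occurrence of each original name.
# Assumes SparkR and spark-csv are available and that every column can be
# read as "string" (cast afterwards if other types are needed).
read_first_occurrences <- function(sqlContext, path, header_names) {
  unique_names <- dedupe_header(header_names)
  fields <- lapply(unique_names, function(n) SparkR::structField(n, "string"))
  customSchema <- do.call(SparkR::structType, fields)

  df <- SparkR::read.df(
    sqlContext,
    path,
    source = "com.databricks.spark.csv",
    header = "true",          # skip the header row; names come from the schema
    mode = "DROPMALFORMED",
    schema = customSchema
  )

  # Names of the first occurrences only (later duplicates are dropped).
  keep <- unique_names[!duplicated(header_names)]
  SparkR::select(df, as.list(keep))
}
```

With a hypothetical header such as c("Name", "Email", "Phone", "Email"), dedupe_header() would give "Name", "Email", "Phone", "Email_1", and read_first_occurrences(sqlContext, FILE_PATH, header) would return a DataFrame in which select(df, "Email") is no longer ambiguous.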