Tags: sql, apache-spark, apache-spark-sql

Apache Spark inserts quotation marks into the first column


My Setup

I'm using the following components:

  • spark-core_2.10
  • spark-sql_2.10

My Problem

This is basically my code:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Assuming a local SparkSession; its creation was omitted in the original snippet.
SparkSession spark = SparkSession.builder()
    .master("local[*]")
    .getOrCreate();

Dataset<Row> rowsSource = spark.read()
    .option("header", "true")
    .option("delimiter", ";")
    .csv("source.csv");

Dataset<Row> rowsTarget = spark.read()
    .option("header", "true")
    .option("delimiter", ";")
    .csv("target.csv");

rowsSource.createOrReplaceTempView("source");
rowsTarget.createOrReplaceTempView("target");

// Find all Ids that exist in source but not in target
// (anti-join expressed as LEFT OUTER JOIN + IS NULL).
Dataset<Row> result = spark.sql("SELECT source.Id FROM source" +
                                " LEFT OUTER JOIN target USING(Id)" +
                                " WHERE target.Id IS NULL");

result.show();

Here's some test data:

source.csv:

"Id";"Status"
"1";"ERROR"
"2";"OK"

target.csv:

"Id";"Status"
"2";"OK"

I expect the SQL statement to find exactly one Id, namely "1".

But when I run it, an exception occurs on the line where the SQL statement is executed:

2017-03-21 17:00:09,693 INFO  [main] com.materna.mobility.smart.selenium.Aaaa: starting
Exception in thread "main" org.apache.spark.sql.AnalysisException: USING column `Detail` cannot be resolved on the left side of the join. The left-side columns: ["Detail", Detailp, Detaild, Detailb, Amount - 2016 48 +0100/1, Amount - 2016 49 +0100/1, Amount - 2016 50 +0100/1, Amount - 2016 51 +0100/1, Amount - 2016 52 +0100/1];
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90$$anonfun$apply$56.apply(Analyzer.scala:1977)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90$$anonfun$apply$56.apply(Analyzer.scala:1977)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90.apply(Analyzer.scala:1976)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90.apply(Analyzer.scala:1975)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$commonNaturalJoinProcessing(Analyzer.scala:1975)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$31.applyOrElse(Analyzer.scala:1961)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$31.applyOrElse(Analyzer.scala:1958)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:58)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:58)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:1958)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:1957)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:62)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
    at MyClass.main(MyClass.java:48)

If I insert an additional semicolon (;) before the Id, everything works as expected. Here is an example of that:

;"Id";"Status"

I think Spark then parses three columns, but since the first one is not valid, it gets ignored.
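
One way to see what Spark actually parsed (not part of the original post, just a diagnostic sketch) is to print the column names; printSchema() and columns() are standard Dataset methods:

// Inspect the schema Spark inferred from the CSV header.
rowsSource.printSchema();

// Print each column name between markers so stray BOM/quote
// characters in the first name become visible.
for (String name : rowsSource.columns()) {
    System.out.println(">" + name + "<");
}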


Solution

The Problem

My CSV files had a BOM (byte order mark) included, which I only just found out.

The byte order mark (BOM) is the Unicode character U+FEFF, whose appearance as a magic number at the start of a text stream can signal several things to a program consuming the text.

After doing some more searching I found this issue: https://github.com/databricks/spark-csv/issues/142

Apparently this has been a known issue since 2015.

The fix

The easiest fix is simply to remove the BOM from the file, for example with a small utility like the one sketched below.
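
This is only a minimal sketch (the class name is an assumption, and the file names are taken from the example above); it drops the three UTF-8 BOM bytes EF BB BF if they are present:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

public class BomStripper {

    // Removes a leading UTF-8 BOM (0xEF 0xBB 0xBF) from the given file, in place.
    static void stripBom(Path file) throws IOException {
        byte[] bytes = Files.readAllBytes(file);
        if (bytes.length >= 3
                && (bytes[0] & 0xFF) == 0xEF
                && (bytes[1] & 0xFF) == 0xBB
                && (bytes[2] & 0xFF) == 0xBF) {
            Files.write(file, Arrays.copyOfRange(bytes, 3, bytes.length));
        }
    }

    public static void main(String[] args) throws IOException {
        stripBom(Paths.get("source.csv")); // file names from the example above
        stripBom(Paths.get("target.csv"));
    }
}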

Another fix I found (see the question above) is to add an additional semicolon in front of your first column name. Spark then apparently parses one more column, but the first one is invalid and gets ignored. HOWEVER: I strongly advise against this workaround, as it might break once the issue is fixed; the solution above is far more reliable.

A visualization

Wikipedia states that with UTF-8 (which I used) I should expect the following bytes (in hex) at the front of my file: "EF BB BF".

Here you can see what I expected the CSV files to look like (since I didn't yet know they had a BOM), compared to how they actually look.

Since I lack the reputation I cannot post images of the content, but here you go:
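
(The images are not available here; the following hex view of the header line, reconstructed from the sample data and the BOM bytes above, shows the same thing.)

Expected first line:

22 49 64 22 3B 22 53 74 61 74 75 73 22            "Id";"Status"

Actual first line (note the three BOM bytes in front):

EF BB BF 22 49 64 22 3B 22 53 74 61 74 75 73 22   <BOM>"Id";"Status"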