python, apache-spark, pyspark, inference

Empty string in mixed column nullifies a row when loaded using Spark


Consider the following JSON:

{"col1": "yoyo", "col2": 1.5}
{"col1": "",     "col2": 6}
{"col1": "456",  "col2": ""}
{"col1": 444,    "col2": 12}
{"col1": null,   "col2": 1.7}
{"col1": 3.14,   "col2": null}

which I load using (Py)Spark as follows:

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.read.json("my.json")
df.show()

which yields:

+----+----+
|col1|col2|
+----+----+
|yoyo| 1.5|
|    | 6.0|
|null|null|  <---===***
| 444|12.0|
|null| 1.7|
|3.14|null|
+----+----+

I'm having a hard time understanding why the 3rd row is nullified. It seems that the reason is that the only string value in the second column is the empty string "", and this somehow causes the nullification. Note that col1 also contains an empty string in the 2nd row, but that row is not nullified.

This is very confusing and unexpected behaviour to me, and I was not able to find any hints about it in the documentation.

  • Is this behaviour expected? Why is it happening this way?
  • I would expect the 3rd row to hold the string "456" for col1 and the empty string "" for col2. How can I achieve this behaviour (which feels much more natural to me)?

Solution

  • It's not possible to mix different data types in a single column when using Spark.

    When reading the JSON file, Spark tries to infer the data type of each column (see the note at the bottom for more details). Here, Spark believes col1 to be of string type and col2 to be double. This can be confirmed by reading the JSON file and calling printSchema on the dataframe.
    This means that the data is then parsed based on these inferred data types. Spark will therefore try to parse "" as a double, which obviously fails. (For the second row in col1 it works, since col1 is inferred to be of string type and "" is therefore a valid input.)
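
    For example (a minimal sketch; the commented output reflects the string/double schema described above):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").getOrCreate()
    df = spark.read.json("my.json")
    df.printSchema()
    # root
    #  |-- col1: string (nullable = true)
    #  |-- col2: double (nullable = true)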

    When using spark.read.json it's possible to set different modes. From the documentation we have:

    mode -
    allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, PERMISSIVE.

    • PERMISSIVE: when it meets a corrupted record, puts the malformed string into a field configured by columnNameOfCorruptRecord, and sets other fields to null. To keep corrupt records, an user can set a string type field named columnNameOfCorruptRecord in an user-defined schema. If a schema does not have the field, it drops corrupt records during parsing. When inferring a schema, it implicitly adds a columnNameOfCorruptRecord field in an output schema.
    • DROPMALFORMED: ignores the whole corrupted records.
    • FAILFAST: throws an exception when it meets corrupted records.

    From the above, we can see that PERMISSIVE mode is used by default and that if a corrupted record is encountered, all fields are set to null. This is what happens in this case. To confirm, it's possible to set mode to FAILFAST,

    spark.read.json("my.json", mode='FAILFAST')
    

    which would give an exception.
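
    Alternatively, as the quoted documentation mentions, the corrupt records themselves can be kept in PERMISSIVE mode by adding a string field with the name configured by columnNameOfCorruptRecord (by default _corrupt_record) to a user-defined schema. A sketch of what that could look like (not part of the original answer):

    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    # Explicit schema with an extra string field for the default corrupt-record column
    schema = StructType([
        StructField("col1", StringType(), True),
        StructField("col2", DoubleType(), True),
        StructField("_corrupt_record", StringType(), True),
    ])
    # Records that cannot be parsed against the schema are kept as raw text in _corrupt_record
    df = spark.read.json("my.json", schema=schema, mode="PERMISSIVE")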

    This could be solved by not inferring the data types and reading everything as strings instead.

    spark.read.json("my.json", primitivesAsString='true')
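
    With primitivesAsString set, both columns are read as strings, so the empty string in col2 survives and the 3rd row keeps "456" and "". A sketch of checking this (the commented schema follows from reading everything as a string):

    df = spark.read.json("my.json", primitivesAsString='true')
    df.printSchema()
    # root
    #  |-- col1: string (nullable = true)
    #  |-- col2: string (nullable = true)

    # Select the row whose col2 is the empty string ("456", "")
    df.filter(df.col2 == "").show()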
    

    Note: The schema inference for json is a bit different compared to other sources such as csv and txt, see here. For json files, both "" and null have special handling to deal with json generators that do not differentiate between the two. For csv files, a column containing an empty string "" would still cause the whole column to be inferred as string, but this is not the case for json.

    As a side note, replacing "" with e.g. "5" in col2 will make the inferred column type be string.
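
    A quick way to see this (a sketch using an in-memory JSON dataset instead of the file; the "5" literal stands in for the replaced empty string):

    data = [
        '{"col1": "yoyo", "col2": 1.5}',
        '{"col1": "456",  "col2": "5"}',
    ]
    df2 = spark.read.json(spark.sparkContext.parallelize(data))
    df2.printSchema()
    # col2 mixes a double (1.5) and a string ("5"), so it is inferred as string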