Search code examples
pythonapache-sparkpysparkapache-spark-sqlorc

Pyspark - Merge multiple ORC schemas


I have 2 different directories with an ORC file under them. These 2 files have different schemas. Upon reading both directories into the same DataFrame, the final schema depends on the order of the paths.

Consider the following code to replicate this:

data = [
    (1, "player1", "google.com", True),
    (2, "player1", "youtube.com", True),
    (3, "player2", "facebook.com", True),
    (4, "player2", "record.pt", True),
    (5, "player2", "yahoo.com", True),
    (6, "player3", "facebook.com", False),
    (7, "player3", "record.pt", True),
    (8, "player3", "yahoo.com", True),
    (9, "player4", "", True),
    (10, "player4", "record.pt", True),
    (11, "player4", "abola.pt", True),
    (12, "player4", None, True)
]

data2 = [
    (13, "player1", True),
    (14, "player2", True),
    (15, "player3", True),
    (16, "player4", True),
    (17, "player3", True),
    (18, "player3", True),
]

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame(data, ["id", "splayer", "website", "bool"])
df2 = spark.createDataFrame(data2, ["id", "splayer", "bool"])

df1.coalesce(1).write.orc('temporary/bla=1', mode='overwrite')
df2.coalesce(1).write.orc('temporary/bla=2', mode='overwrite')

df = spark.read.option("mergeSchema", "true").option("basePath", "temporary").orc(['temporary/bla=2', 'temporary/bla=1'])

df.show()

This yields the output:

+---+-------+-----+---+                                                         
| id|splayer| bool|bla|
+---+-------+-----+---+
|  1|player1| true|  1|
|  2|player1| true|  1|
|  3|player2| true|  1|
|  4|player2| true|  1|
|  5|player2| true|  1|
|  6|player3|false|  1|
|  7|player3| true|  1|
|  8|player3| true|  1|
|  9|player4| true|  1|
| 10|player4| true|  1|
| 11|player4| true|  1|
| 12|player4| true|  1|
| 13|player1| true|  2|
| 14|player2| true|  2|
| 15|player3| true|  2|
| 16|player4| true|  2|
| 17|player3| true|  2|
| 18|player3| true|  2|
+---+-------+-----+---+

If I change the order of the directories, the following output will generated:

+---+-------+------------+-----+---+                                            
| id|splayer|     website| bool|bla|
+---+-------+------------+-----+---+
|  1|player1|  google.com| true|  1|
|  2|player1| youtube.com| true|  1|
|  3|player2|facebook.com| true|  1|
|  4|player2|   record.pt| true|  1|
|  5|player2|   yahoo.com| true|  1|
|  6|player3|facebook.com|false|  1|
|  7|player3|   record.pt| true|  1|
|  8|player3|   yahoo.com| true|  1|
|  9|player4|            | true|  1|
| 10|player4|   record.pt| true|  1|
| 11|player4|    abola.pt| true|  1|
| 12|player4|        null| true|  1|
| 13|player1|        null| true|  2|
| 14|player2|        null| true|  2|
| 15|player3|        null| true|  2|
| 16|player4|        null| true|  2|
| 17|player3|        null| true|  2|
| 18|player3|        null| true|  2|
+---+-------+------------+-----+---+

When I researched this problem, I found several posts stating that option("mergeSchema", "true") would be a solution. In fact, there was an pull request for this.

Is there a solution for this or is it still an open issue?

I am using (Py)Spark 2.4.3 and Python 3.6.8.

Thank you in advance!

UPDATE:

The PR mentioned above will only be available for Spark 3.0.0. Thank you for the information @Shaido.


Solution

  • I'm encountering this same issue as a result of schema evolution from some vendor data. I've been trying a few different ideas since the ORC mergeSchema option isn't available prior to Spark 3.0 and we are running 2.3 My first thought was to create an empty dataframe with my complete schema including any new columns and save that as an ORC file to a directory that is alphabetically first in order. For example, if my data was partitioned by load_date then I would have folders such as load_date=00000000, load_date=20200501, load_date=20200601, etc. I would then put my empty dataframe with full schema in the 00000000 partition. This worked but it isn't all that clean and I'm not confident that there doesn't exist an edge case where the ORC reader wouldn't somehow choose a different ORC file to base the schema off of. So then I thought of just providing a schema with all the cols I need to the ORC reader and that works.

    schema = StructType([StructField('state', StringType(), True), StructField('new_col_middle', StringType(), True), StructField('abbr', StringType(), False), StructField('population', IntegerType(), False), StructField('new_col2', StringType(), False)])
    df = spark.read.schema(schema).orc('/data/sandbox/orc_schema_evolution/')
    

    in the orc_schema_evolution folder in HDFS we have the partitioned load_date folders where some ORC files have the schema ('state', 'population') and others have a schema of ('state','population','abbr'). Notice that i was even able to rearrange the order of existing columns with this method.