I'm wondering whether ordering a dataframe before partitioning makes a difference in computation speed or resource consumption. To be specific, I have parquet files saved in Databricks and I want to filter on two columns, but the second one is too granular to partition on. If I order the dataset first, I see that the second column is ordered within each partition, but does that actually matter to Databricks? Will Databricks recognize that the records are sorted on the second column and speed up the read?
I want to look up movies by their year, and sometimes also by their name. However, the movie name has too high a cardinality to be a partition column itself, so I didn't include it in the partitionBy.
df.orderBy("year","movie_name").write.partitionBy("year").csv("dbfs:/FileStore/movies")
Is the above query any better than this one?
df.write.partitionBy("year").csv("dbfs:/FileStore/movies")
Or what's the best method for partitioning in such cases? Year and name will certainly be the two most used columns in the dataset.
You won't gain anything by ordering before writing a parquet file, because the information about the ordering is not persisted in the file itself.
You can verify this by using the explain() method of a dataframe. Let's do it ourselves:
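from pyspark.sql.functions import col  # needed for the filters further down

# `spark` is the SparkSession provided by the shell or notebook
df = spark.createDataFrame(
    [
        ("niceMovie2", 2020),
        ("niceMovie1", 2020),
        ("niceMovie3", 2021)
    ],
    ["Title", "Year"]
)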
>>> df.show()
+----------+----+
| Title|Year|
+----------+----+
|niceMovie2|2020|
|niceMovie1|2020|
|niceMovie3|2021|
+----------+----+
Let's write the dataframe to a parquet file without ordering and then reread it, filtering for a movie by Year and Title:
df.write.partitionBy("Year").parquet("movies.parquet")
df2 = spark \
.read \
.parquet("movies.parquet") \
.filter(col("Year") == 2020) \
.filter(col("Title") == "niceMovie2")
>>> df2.explain()
== Physical Plan ==
*(1) Filter (isnotnull(Title#4) AND (Title#4 = niceMovie2))
+- *(1) ColumnarToRow
   +- FileScan parquet [Title#4,Year#5] Batched: true, DataFilters: [isnotnull(Title#4), (Title#4 = niceMovie2)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/somewhere/movies.parquet], PartitionFilters: [isnotnull(Year#5), (Year#5 = 2020)], PushedFilters: [IsNotNull(Title), EqualTo(Title,niceMovie2)], ReadSchema: struct<Title:string>
As you can see in the physical plan, the file is read and the partitioning is used to efficiently skip unused years (see the PartitionFilters: [isnotnull(Year#5), (Year#5 = 2020)] bit).
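To make the idea of partition pruning concrete, here is a minimal pure-Python sketch of the mechanism, not Spark's actual implementation. It mimics the `Year=<value>` directory layout that `partitionBy("Year")` produces and shows that a filter on the partition column decides which directories are opened, while the `Title` filter still has to scan the rows inside them. The helper names `write_partitioned` and `read_filtered` and the `part-0.txt` file name are made up for this illustration.

```python
import tempfile
from pathlib import Path


def write_partitioned(rows, root, partition_col):
    """Append each row's Title to <root>/<partition_col>=<value>/part-0.txt."""
    for row in rows:
        part_dir = Path(root) / f"{partition_col}={row[partition_col]}"
        part_dir.mkdir(parents=True, exist_ok=True)
        with open(part_dir / "part-0.txt", "a") as fh:
            fh.write(row["Title"] + "\n")


def read_filtered(root, partition_col, partition_value, title):
    """Open only the matching partition directory, then scan its rows for the title."""
    part_dir = Path(root) / f"{partition_col}={partition_value}"
    files_opened = 0
    matches = []
    for path in sorted(part_dir.glob("part-*.txt")):
        files_opened += 1
        # The partition filter has already done its work; this is a plain row scan.
        for line in path.read_text().splitlines():
            if line == title:
                matches.append(line)
    return matches, files_opened


rows = [
    {"Title": "niceMovie2", "Year": 2020},
    {"Title": "niceMovie1", "Year": 2020},
    {"Title": "niceMovie3", "Year": 2021},
]

with tempfile.TemporaryDirectory() as root:
    write_partitioned(rows, root, "Year")
    partitions = sorted(p.name for p in Path(root).iterdir())
    matches, files_opened = read_filtered(root, "Year", 2020, "niceMovie2")

print(partitions)             # one directory per distinct Year
print(matches, files_opened)  # the Year=2021 directory is never opened
```

In this toy layout the order of the rows inside a partition file changes nothing about which files get opened, which mirrors what the PartitionFilters entry in the plan expresses.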
Now let's do the same, but order the dataframe by Title before writing it:
df.orderBy("Title").write.partitionBy("Year").parquet("moviesOrdered.parquet")
df3 = spark \
.read \
.parquet("moviesOrdered.parquet") \
.filter(col("Year") == 2020) \
.filter(col("Title") == "niceMovie2")
>>> df3.explain()
== Physical Plan ==
*(1) Filter (isnotnull(Title#0) AND (Title#0 = niceMovie2))
+- *(1) ColumnarToRow
   +- FileScan parquet [Title#0,Year#1] Batched: true, DataFilters: [isnotnull(Title#0), (Title#0 = niceMovie2)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/somewhere/moviesOrdered.parquet], PartitionFilters: [isnotnull(Year#1), (Year#1 = 2020)], PushedFilters: [IsNotNull(Title), EqualTo(Title,niceMovie2)], ReadSchema: struct<Title:string>
You see that in this case, the physical plan is exactly the same, apart from the expression IDs and the file location. So there is no difference in performance.
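If you would rather not compare the two plans by eye, a rough way to check them is to strip the parts that are expected to differ (the expression IDs such as `#4` and the file location) and compare what is left. The sketch below does this on the two plan texts printed above, pasted in as plain strings; `normalize_plan` is a hypothetical helper written for this answer, not part of PySpark.

```python
import re


def normalize_plan(plan: str) -> str:
    """Replace expression IDs and the file location with placeholders."""
    plan = re.sub(r"#\d+", "#N", plan)
    plan = re.sub(r"Location: \w+\([^)]*\)\[[^\]]*\]", "Location: <path>", plan)
    return plan


# Plan of the read from the unordered write, copied from the output above.
plan_unordered = (
    "*(1) Filter (isnotnull(Title#4) AND (Title#4 = niceMovie2))\n"
    "+- *(1) ColumnarToRow\n"
    "+- FileScan parquet [Title#4,Year#5] Batched: true, "
    "DataFilters: [isnotnull(Title#4), (Title#4 = niceMovie2)], Format: Parquet, "
    "Location: InMemoryFileIndex(1 paths)[file:/somewhere/movies.parquet], "
    "PartitionFilters: [isnotnull(Year#5), (Year#5 = 2020)], "
    "PushedFilters: [IsNotNull(Title), EqualTo(Title,niceMovie2)], "
    "ReadSchema: struct<Title:string>"
)

# Plan of the read from the ordered write, copied from the output above.
plan_ordered = (
    "*(1) Filter (isnotnull(Title#0) AND (Title#0 = niceMovie2))\n"
    "+- *(1) ColumnarToRow\n"
    "+- FileScan parquet [Title#0,Year#1] Batched: true, "
    "DataFilters: [isnotnull(Title#0), (Title#0 = niceMovie2)], Format: Parquet, "
    "Location: InMemoryFileIndex(1 paths)[file:/home/somewhere/moviesOrdered.parquet], "
    "PartitionFilters: [isnotnull(Year#1), (Year#1 = 2020)], "
    "PushedFilters: [IsNotNull(Title), EqualTo(Title,niceMovie2)], "
    "ReadSchema: struct<Title:string>"
)

same_plan = normalize_plan(plan_unordered) == normalize_plan(plan_ordered)
print(same_plan)
```

Replacing every ID with the same placeholder is a simplification that is good enough for this small plan; it would hide a difference where two plans use the same operators on different columns.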