I'm having an issue with sorting output in Parquet. I'm loading data from another Parquet file whose contents are in completely random order, and it's pretty big (thousands of rows, which is important here). I'm getting info about phone users and desktop users, counting their queries, and getting the total number of queries.
I'm looking for a table like this (sorted by `total`):
| query  | desktop_count | phone_count | total |
|--------|---------------|-------------|-------|
| query1 | 123           | 321         | 444   |
| query2 | 23            | 32          | 55    |
| query3 | 12            | 21          | 33    |
The problem is that whenever I use any kind of aggregation, the data splits into parts, and repartition(1) joins them back into one, but not sorted. Is there any way of joining, say, 20 parts of a Parquet dataset into one, but sorted? If any more information is needed, please ask.
Code (I tried adding some more repartition calls):

```python
from pyspark.sql.functions import col

def computeMSQueries(self):
    pq = self.pyspark.read.parquet(*self.pySearchPaths)

    desktop_df = pq.filter("hwType == 'DESKTOP'").groupby("query").count().repartition(1).withColumnRenamed('count', 'desktop_count')
    phone_df = pq.filter("hwType == 'PHONE'").groupby("query").count().repartition(1).withColumnRenamed('count', 'phone_count')

    res = desktop_df.join(phone_df, on=["query"], how='fullouter').fillna(0).withColumn("total", col("desktop_count") + col("phone_count")).orderBy(col('total').desc())
    return res
```

```python
self.computeMSQueries().repartition(1).write.parquet(outputDir)
```
When joining, you should avoid using repartition(), as it is very costly compared to coalesce(): coalesce() avoids moving data across the cluster wherever it can.
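Here is a minimal standalone sketch of that difference (not your class; it assumes a local SparkSession named `spark` and a made-up DataFrame):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(1_000_000).repartition(20)      # start with 20 partitions
print(df.rdd.getNumPartitions())                 # 20

# coalesce(1) merges the 20 existing partitions locally, no full shuffle
print(df.coalesce(1).rdd.getNumPartitions())     # 1
# repartition(1) also ends up with 1 partition, but by shuffling every row
print(df.repartition(1).rdd.getNumPartitions())  # 1
```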
Another difference is that repartition() can increase or decrease the number of partitions, whereas coalesce() can only decrease it. And because coalesce(1) merges the existing partitions instead of shuffling every row, it preserves the ordering produced by orderBy(), while repartition(1) shuffles the data and loses it.
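A quick illustration of the "decrease only" rule, reusing `spark` from the sketch above:

```python
df = spark.range(100).repartition(4)
print(df.coalesce(8).rdd.getNumPartitions())     # still 4 - coalesce cannot grow
print(df.repartition(8).rdd.getNumPartitions())  # 8 - repartition can
```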
Moreover, coalesce() uses the existing partitions to minimize the amount of data that is moved, while repartition() creates new partitions and does a full shuffle. As a consequence, coalesce() can produce partitions of quite different sizes, whereas repartition() results in roughly equal-sized partitions.
So you can use the code below; simply replacing repartition with coalesce will do the trick:
```python
from pyspark.sql.functions import col

def computeMSQueries(self):
    pq = self.pyspark.read.parquet(*self.pySearchPaths)

    desktop_df = pq.filter("hwType == 'DESKTOP'").groupby("query").count().coalesce(1).withColumnRenamed('count', 'desktop_count')
    phone_df = pq.filter("hwType == 'PHONE'").groupby("query").count().coalesce(1).withColumnRenamed('count', 'phone_count')

    res = desktop_df.join(phone_df, on=["query"], how='fullouter').fillna(0).withColumn("total", col("desktop_count") + col("phone_count")).orderBy(col('total').desc())
    return res
```

```python
self.computeMSQueries().coalesce(1).write.parquet(outputDir)
```
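One caveat: write.parquet(outputDir) creates a directory containing a single part-*.parquet file, not a single file named outputDir. To convince yourself the result is sorted, you can read it back (a sketch; `spark` stands in for whatever session your self.pyspark wraps):

```python
# With a single part file, rows are read back in the order they were written,
# so the first rows should show the largest totals.
spark.read.parquet(outputDir).show(5)
```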