I have around 12K binary files, each about 100 MB in size, containing multiple compressed records of variable length. I am trying to find the most efficient way to read them, uncompress the records, and write them back in Parquet format. The cluster I have has 6 nodes with 4 cores each.
At the moment, with the pseudocode below, it takes around 8 hours to read all the files, and writing back to Parquet is very slow.
import os

def reader(file_name):
    keyMsgList = []
    with open(file_name, "rb") as f:
        while True:
            header = f.read(12)              # fixed header: 8-byte key + 4-byte message length
            if not header:
                break
            keyBytes = header[0:8]
            msgLenBytes = header[8:12]
            # convert keyBytes & msgLenBytes to int (byte order assumed big-endian)
            key = int.from_bytes(keyBytes, byteorder="big")
            msgLen = int.from_bytes(msgLenBytes, byteorder="big")
            message = f.read(msgLen)
            keyMsgList.append((key, decode(message)))   # decode(): user-defined decompression
    return keyMsgList

files = os.listdir("/path/to/binary/files")             # note: reader() expects full paths
rddFiles = sc.parallelize(files, 6000)
df = spark.createDataFrame(rddFiles.flatMap(reader), schema)
df.repartition(6000).write.mode("append").partitionBy("key").parquet("/directory")
The rationale behind choosing 6000 in sc.parallelize(files, 6000) is to create partitions of roughly 200 MB each, i.e. (12K files * 100 MB) / 200 MB. Given the sequential nature of the file content, each file has to be read byte by byte, so I am not sure whether the read can be optimised further.
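For reference, the sizing arithmetic behind that figure, written out in code (all numbers are taken from above):

num_files = 12_000            # ~12K binary files
file_size_mb = 100            # each file is ~100 MB
target_partition_mb = 200     # desired partition size
num_partitions = num_files * file_size_mb // target_partition_mb   # = 6000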
Similarly, when writing back to Parquet, the number in repartition(6000) is meant to distribute the data uniformly so that all executors can write in parallel. However, it turns out to be a very slow operation.
One solution is to increase the number of executors, which would improve the read performance, but I am not sure whether it would also improve the writes.
Looking for any suggestions on how I can improve the performance here.
Instead of repartition, use coalesce. See here. You identified the bottleneck in the repartition operation: it launches a full shuffle. With coalesce you won't do that. You will still end up with N partitions; they won't be as balanced as those you would get with repartition, but does that matter? I would recommend you to favor coalesce rather than repartition.
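A minimal sketch of what that looks like on the write path (the target of 24 partitions is an assumption, one per core on a 6-node x 4-core cluster; tune it to your data volume):

df.coalesce(24) \
  .write \
  .mode("append") \
  .partitionBy("key") \
  .parquet("/directory")

coalesce only merges existing partitions, so no full shuffle is triggered; the trade-off is that the resulting partitions may be unevenly sized.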
Your application runs on 6 nodes with 4 cores each. You have 6000 partitions, which means around 250 partitions per core (not even counting what is given to your master). That is, in my opinion, too many.
Since your partitions are small (around 200 MB), your master probably spends more time awaiting answers from the executors than executing the queries.
I would recommend you to reduce the number of partitions.
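As a rough sketch, assuming the common rule of thumb of roughly 2-4 partitions per available core (the multiplier of 3 below is only an illustration):

num_cores = 6 * 4                                            # 6 nodes * 4 cores
rddFiles = sc.parallelize(files, numSlices=3 * num_cores)    # 72 partitions instead of 6000

Here numSlices only controls how the list of file names is split across tasks; each task then reads several whole files sequentially.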
DataFrame API operations are generally faster and better than a hand-coded solution.
Maybe have a look at pyspark.sql.functions to see if you can find something there (see here). I don't know if it's relevant since I have not seen your data, but that is a general recommendation I make from experience.
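Purely as an illustration (the column names and record layout are assumptions, since the actual data is not shown): if the decoded message were, say, a delimited string, built-in column functions could replace per-record Python parsing and keep the work inside the JVM:

from pyspark.sql import functions as F

# Hypothetical: assume the DataFrame has columns "key" and "message",
# and that "message" is a comma-separated string after decoding.
parsed = (df
          .withColumn("fields", F.split(F.col("message"), ","))
          .withColumn("first_field", F.col("fields").getItem(0)))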