apache-spark, pyspark

Optimising Spark read and write performance


I have around 12K binary files, each about 100 MB in size and containing multiple compressed records of variable length. I am trying to find the most efficient way to read them, uncompress them, and then write them back in Parquet format. The cluster I have has 6 nodes with 4 cores each.

At the moment, with the pseudocode below, it takes around 8 hours to read all the files, and writing back to Parquet is very slow.

import os

def reader(file_name):
    keyMsgList = []
    with open(file_name, "rb") as f:
        while True:
            # each record starts with a 12-byte header: 8-byte key + 4-byte message length
            header = f.read(12)
            if not header:
                break
            keyBytes = header[0:8]
            msgLenBytes = header[8:12]

            # convert keyBytes & msgLenBytes to int (byte order depends on the file format)
            key = int.from_bytes(keyBytes, byteorder="big")
            msgLen = int.from_bytes(msgLenBytes, byteorder="big")

            message = f.read(msgLen)
            keyMsgList.append((key, decode(message)))  # decode() uncompresses the record
    return keyMsgList

base_dir = "/path/to/binary/files"
files = [os.path.join(base_dir, f) for f in os.listdir(base_dir)]  # full paths for the workers
rddFiles = sc.parallelize(files, 6000)
df = spark.createDataFrame(rddFiles.flatMap(reader), schema)
df.repartition(6000).write.mode("append").partitionBy("key").parquet("/directory")

The rationale behind choosing 6000 in sc.parallelize(files, 6000) is to create partitions of roughly 200 MB each, i.e. (12K files * 100 MB) / 200 MB. Given the sequential nature of the file content, which has to be read byte by byte, I am not sure whether the read can be optimised further. Similarly, when writing back to Parquet, the number in repartition(6000) is meant to distribute the data uniformly so that all executors can write in parallel. However, it turns out to be a very slow operation.
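
For reference, the arithmetic behind the 6000 (sizes are approximate):

total_mb = 12_000 * 100                           # ~12K files, ~100 MB each
target_partition_mb = 200
num_partitions = total_mb // target_partition_mb  # = 6000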

One solution is to increase the number of executors, which will improve the read performance, but I am not sure whether it will also improve the writes.

Looking for any suggestions on how I can improve the performance here.


Solution

  • Suggestion 1: use coalesce instead of repartition.

    See here. You identified the bottleneck of the repartition operation: it launches a full shuffle. With coalesce you won't do that; you will still end up with N partitions. They won't be as balanced as those you would get with repartition, but does that matter?

    I would recommend favouring coalesce over repartition.
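
    A minimal sketch of what that could look like, reusing df from the question (the coalesce target below is an illustrative guess, not a measured value):

    numPartitions = 24  # e.g. one partition per core (6 nodes * 4 cores); adjust after measuring
    (
        df.coalesce(numPartitions)   # narrows existing partitions without a full shuffle
          .write.mode("append")
          .partitionBy("key")
          .parquet("/directory")
    )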

  • Suggestion 2: 6000 partitions may not be optimal

    Your application runs on 6 nodes with 4 cores each, so 24 cores in total, yet you have 6000 partitions. That means around 250 partitions per core (not even counting what is given to your master), which is, in my opinion, too much.

    Since your partitions are small (around 200 MB), your master probably spends more time awaiting answers from the executors than executing the queries.

    I would recommend reducing the number of partitions.
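
    As a rough sketch, a common starting point is a small multiple of the total core count (the multiplier below is an assumption; each partition's decoded records also have to fit in executor memory):

    total_cores = 6 * 4                  # 6 worker nodes * 4 cores each
    num_partitions = total_cores * 3     # 2-4 tasks per core is a common rule of thumb
    rddFiles = sc.parallelize(files, num_partitions)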

  • Suggestion 3: can you use the DataFrame API?

    DataFrame API operations are generally faster and better than a hand-coded solution.

    Maybe have a look at pyspark.sql.functions to see if you can find something there (see here). I don't know if it's relevant since I have not seen your data, but that's a general recommendation I make from experience.
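
    As an illustration only, assuming Spark 3.0+ (where the binaryFile data source is available) and a hypothetical parse_records helper playing the role of reader in the question:

    # Load whole files through the DataFrame API instead of parallelizing file names.
    df_raw = spark.read.format("binaryFile").load("/path/to/binary/files")
    # df_raw has columns: path, modificationTime, length, content (the raw bytes).
    # The custom record decoding still happens in Python, applied to the bytes column.
    records = df_raw.select("content").rdd.flatMap(lambda row: parse_records(row.content))
    df = spark.createDataFrame(records, schema)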