python, amazon-web-services, amazon-s3, parquet

Fastest way to read 100k+ small (40kb) parquets in S3 for data manipulation in Python. All methods used so far have been extremely slow (2+ hours)


I am trying to read a lot of parquet files from my S3 bucket. The goal is to write some code that reads this data, applies some logic to it using pandas/dask, and then uploads the results back to S3.

Each parquet contains ~130 columns and 1 row, and some of the files might have slight variations in schema. Each file's path varies based on which grouping category it belongs to, say group1, group2, and group3, as follows: s3://path/to/my/files/{group_N}/filename1.parquet.gz.

In an earlier stage of my code I generate (via list_objects_v2 with NextContinuationToken) a dictionary whose keys are the group names and whose values are the lists of all parquet paths under that group (e.g. {str: List[str]}). Depending on the method used to read the parquet files, this has also been just group_name to dir_path (e.g. {str: str}).
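
For context, that listing step looks roughly like this (a simplified sketch; the bucket name, prefix, and helper name are placeholders):

    import boto3
    from collections import defaultdict
    from typing import Dict, List

    def list_parquet_keys(bucket: str, prefix: str) -> Dict[str, List[str]]:
        s3 = boto3.client("s3")
        # The paginator handles NextContinuationToken for us.
        paginator = s3.get_paginator("list_objects_v2")
        keys_by_group: Dict[str, List[str]] = defaultdict(list)
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                key = obj["Key"]  # e.g. path/to/my/files/group1/filename1.parquet.gz
                group = key[len(prefix):].split("/", 1)[0]  # first segment after the prefix
                keys_by_group[group].append(key)
        return dict(keys_by_group)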

I have tried several solutions (Dask, PyArrow, Boto3's get_object and a BytesIO buffer, PySpark, AWS Wrangler), and the ones that did work took between 2 hours and (an estimated) 1 day to run.

The best running solution has been Boto3's get_object and BytesIO buffer:

    # requires: import logging; import pandas as pd; from io import BytesIO
    def read_parquet_objects(self, objects_dict: dict) -> dict:
        df_holder = {}
        for key in objects_dict.keys():
            logging.info("Start reading parquets for: " + key)
            df_list = list()
            for i in objects_dict[key]:
                # one sequential GET request per file
                obj = self.s3_client.client.get_object(Bucket=my_bucket, Key=i)
                bytes_ = BytesIO(obj["Body"].read())
                df_list.append(pd.read_parquet(bytes_))

            pandas_df = pd.concat(df_list, axis=0, ignore_index=True)
            df_holder[key] = pandas_df

        return df_holder

This has been taking roughly 2 hours to run.

Then I tried Dask, PyArrow, and AWS Wrangler, which were taking far too long to run. Based on what they had completed before I stopped the process, they would have taken 1+ days. Here's the code:

    # AWS WRANGLER
    def read_parquet_objects(self, objects_dict: dict) -> dict:
        df_holder = {}
        for key in objects_dict.keys():
            print("Start reading parquet for: " + key)
            # read_parquet accepts the whole list of paths for the group at once
            df_holder[key] = wr.s3.read_parquet(path=objects_dict[key], use_threads=True)
        return df_holder
    # DASK
    def read_parquet_objects(self, objects_dict: dict) -> dict:
        for key in objects_dict.keys():
            df_list = list()
            for i in objects_dict[key]:
                df_list.append(dd.read_parquet(path=i, gather_statistics=False))
            df = dd.concat(df_list, axis=0)
            pandas_df = df.compute()
            objects_dict[key] = pandas_df
        return objects_dict
    # PYARROW
    def read_parquet_objects(self, objects_dict: dict) -> dict:
        for key in objects_dict.keys():
            df_list = list()
            for i in objects_dict[key]:
                dataset = pq.ParquetDataset(i, filesystem=FILESYSTEM)
                table = dataset.read()
                single_df = table.to_pandas()
                df_list.append(single_df)
            pandas_df = pd.concat(df_list, axis=0)
            objects_dict[key] = pandas_df
        return objects_dict

These three methods were by far the slowest, reading roughly 1 parquet file per second (AWS Wrangler being the fastest of the three at ~2 per second).

Then I tried Spark using PySpark.

    os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=com.amazonaws:aws-java-sdk-bundle:1.11.271,org.apache.hadoop:hadoop-aws:3.1.2 pyspark-shell"
    sparkConf = SparkConf()
    sparkConf.setAppName("spark")
    sparkConf.set("spark.executor.instances", "4")
    sparkConf.set("spark.executor.cores", "4")
    sparkConf.set("spark.driver.memory", "512m")
    sparkConf.set("spark.executor.memory", "512m")
    spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    hadoop_conf.set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
    hadoop_conf.set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
    hadoop_conf.set("fs.s3a.session.token", os.environ['AWS_SESSION_TOKEN'])
    hadoop_conf.set("fs.s3a.experimental.input.fadvise", "sequential")
    PROTOCOL = 's3a://'

    def read_parquet_objects(self, objects_dict: dict):
        for key in objects_dict.keys():
            # mergeSchema reconciles the slight schema differences between files
            df = spark.read.option("mergeSchema", "true").parquet(objects_dict[key])
            df.show()  # forces the read; only the last group's DataFrame is returned
        return df

objects_dict[key] in this case contains the common prefix of my parquet files (the base path without the file name). When I ran the code, it once got stuck on Stage 0 ([Stage 0:> (0 + 8) / 8]) and, after leaving it running overnight, I woke up to the error message HTTP ERROR 500 java.lang.OutOfMemoryError: GC overhead limit exceeded. Now I am getting a different error, com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool, which I am really not sure how to interpret.

Now the question: is there any better, faster (< 30 sec) way of reading these parquet files so that I can then transform them with pandas/dask? Am I using a completely wrong approach? Should I maybe use some managed AWS services (Lambda, Step Functions, Athena) to get better results? I am honestly surprised at how slow the read_parquet functions are in this case.


Solution

  • TL;DR: It appears all your code is single-threaded, which means you are spending a lot of time blocked on I/O.

    Some options:

    • Since you already have a list of files, try creating a single pyarrow dataset over the entire list instead of passing one file at a time; datasets use multiple threads by default (see the first sketch below).
    • Do the threading yourself: have a thread pool that you submit paths to, and have each task read the data and append it to a list (see the second sketch below).

    Last, if you have control over the data, I would recommend against using parquet as a format: there is a lot of per-column overhead for such a small number of rows per file. Avro might be a good alternative, or, if possible, try to consolidate multiple rows per file (sketched below).
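
    A rough sketch of the first option, assuming the {group: [keys]} dict from the question; the bucket name, region, and helper name are placeholders:

        import pandas as pd
        import pyarrow.dataset as ds
        from pyarrow import fs

        # Credentials come from the environment; the region is an assumption.
        s3 = fs.S3FileSystem(region="us-east-1")

        def read_group(keys: list) -> pd.DataFrame:
            # One dataset over the whole list of files: pyarrow scans the
            # fragments with multiple threads instead of one GET at a time.
            # pyarrow's S3FileSystem expects "bucket/key" paths without "s3://".
            dataset = ds.dataset(
                ["my-bucket/" + k for k in keys],  # "my-bucket" is a placeholder
                format="parquet",
                filesystem=s3,
            )
            # If the per-file schemas drift, pass an explicit unified schema= here.
            return dataset.to_table(use_threads=True).to_pandas()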
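
    And a sketch of the second option, reusing the boto3 get_object approach from the question; the bucket name and pool size are placeholders to tune:

        from concurrent.futures import ThreadPoolExecutor
        from io import BytesIO

        import boto3
        import pandas as pd

        s3_client = boto3.client("s3")  # boto3 clients can be shared across threads

        def fetch_one(key: str) -> pd.DataFrame:
            obj = s3_client.get_object(Bucket="my-bucket", Key=key)  # "my-bucket" is a placeholder
            return pd.read_parquet(BytesIO(obj["Body"].read()))

        def read_group(keys: list) -> pd.DataFrame:
            # Each download is mostly waiting on the network, so a fairly large
            # pool keeps many requests in flight at once.
            with ThreadPoolExecutor(max_workers=32) as pool:
                frames = list(pool.map(fetch_one, keys))
            return pd.concat(frames, ignore_index=True)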
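
    If you do consolidate, it can be as simple as rewriting each group as one multi-row file once it has been read. This uses awswrangler as in the question; read_group is the helper sketched above and the output path is a placeholder:

        import awswrangler as wr

        for group, keys in objects_dict.items():
            df = read_group(keys)  # any of the readers above
            # One parquet file with many rows per group instead of N one-row files.
            wr.s3.to_parquet(df=df, path=f"s3://my-bucket/consolidated/{group}.parquet")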