I am trying to read a lot of parquet files from my S3 bucket. The goal is to write some code to read these data, apply some logic on it using pandas/dask then upload them back to S3.
Each parquet file contains ~130 columns and 1 row, and some of the files may have slight variations in schema. The files' paths vary based on which grouping category they belong to, let's say group1, group2, and group3, as follows: s3://path/to/my/files/{group_N}/filename1.parquet.gz.
In earlier stages of my code I generate (with list_objects_v2 and NextContinuationToken) a dictionary holding the group_name as key and the list of all parquet paths under that group as value (e.g. {str: List[str]}). Nonetheless, depending on the method I used to read the parquet files, this mapping has also been just group_name to dir_path (e.g. {str: str}).
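For reference, a minimal sketch of what that listing step might look like with a boto3 paginator; the bucket name, prefix layout, and function name here are hypothetical, not my actual code:

import boto3
from typing import Dict, List

def list_parquet_keys(bucket: str, groups: List[str]) -> Dict[str, List[str]]:
    # Hypothetical sketch: collect all object keys under each group prefix.
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    keys_by_group = {}
    for group in groups:
        prefix = f"path/to/my/files/{group}/"
        keys = []
        # The paginator follows NextContinuationToken internally.
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            keys.extend(obj["Key"] for obj in page.get("Contents", []))
        keys_by_group[group] = keys
    return keys_by_group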
I have tried several solutions (Dask, PyArrow, Boto3's get_object with a BytesIO buffer, PySpark, AWS Wrangler), and the ones that were working took between 2 hours and (an estimated) 1 day to run.
The best-performing solution has been Boto3's get_object with a BytesIO buffer:
def read_parquet_objects(self, objects_dict: dict) -> dict:
    df_holder = {}
    for key in objects_dict.keys():
        logging.info("Start reading parquets for: " + key)
        df_list = list()
        for i in objects_dict[key]:
            # Download each object and parse it from an in-memory buffer
            obj = self.s3_client.client.get_object(Bucket=my_bucket, Key=i)
            bytes_ = BytesIO(obj["Body"].read())
            df_list.append(pd.read_parquet(bytes_))
        # One concatenated DataFrame per group
        pandas_df = pd.concat(df_list, axis=0, ignore_index=True)
        df_holder[key] = pandas_df
    return df_holder
This has been taking roughly 2 hours to run.
Then I tried Dask, PyArrow, and AWS Wrangler, which were taking far too long. Based on what they had completed before I stopped the process, they would have taken 1+ days to run. Here's the code:
#AWS WRANGLER
def read_parquet_objects(self, objects_dict: dict) -> dict:
    df_holder = {}
    for key in objects_dict.keys():
        print("Start reading parquet for: " + key)
        # Read all parquet paths of the group in one call, with threads enabled
        df_holder[key] = wr.s3.read_parquet(path=objects_dict[key], use_threads=True)
    return df_holder
# DASK
def read_parquet_objects(self, objects_dict: dict) -> dict:
    for key in objects_dict.keys():
        df_list = list()
        for i in objects_dict[key]:
            # Lazily open each parquet file as a Dask DataFrame
            df_list.append(dd.read_parquet(path=i, gather_statistics=False))
        df = dd.concat(df_list, axis=0)
        # Materialize the whole group as a single pandas DataFrame
        pandas_df = df.compute()
        objects_dict[key] = pandas_df
    return objects_dict
# PYARROW
def read_parquet_objects(self, objects_dict: dict) -> dict:
    for key in objects_dict.keys():
        df_list = list()
        for i in objects_dict[key]:
            # Read each file through the S3 filesystem handle
            dataset = pq.ParquetDataset(i, filesystem=FILESYSTEM)
            table = dataset.read()
            single_df = table.to_pandas()
            df_list.append(single_df)
        pandas_df = pd.concat(df_list, axis=0)
        objects_dict[key] = pandas_df
    return objects_dict
These three methods were by far the slowest, reading ~1 parquet per second (where AWS Wrangler was fastest with ~2 per second).
Then I tried Spark, using PySpark.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=com.amazonaws:aws-java-sdk-bundle:1.11.271,org.apache.hadoop:hadoop-aws:3.1.2 pyspark-shell"
sparkConf = SparkConf()
sparkConf.setAppName("spark")
sparkConf.set("spark.executor.instances", "4")
sparkConf.set("spark.executor.cores", "4")
sparkConf.set("spark.driver.memory", "512m")
sparkConf.set("spark.executor.memory", "512m")
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
hadoop_conf.set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
hadoop_conf.set("fs.s3a.session.token", os.environ['AWS_SESSION_TOKEN'])
hadoop_conf.set("fs.s3a.experimental.input.fadvise", "sequential")
PROTOCOL = 's3a://'
def read_parquet_objects(self, objects_dict: dict):
    for key in objects_dict.keys():
        df = spark.read.option("mergeSchema", "true").parquet(objects_dict[key])
        df.show()
    return df
objects_dict[key] in this case contains the key of my parquet files (as if the base path without file name). When I run the code, it once got stuck on Stage 0 ([Stage 0:> (0 + 8) / 8]) and then, after leaving it up for the night, the morning after I woke up to this error message: HTTP ERROR 500 java.lang.OutOfMemoryError: GC overhead limit exceeded. Now I am getting a different error, com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool, which I am really not sure how to interpret.
Now the question: is there any better, faster (< 30 sec) way of reading these parquet files so that I can transform them with pandas/dask? Am I using a completely wrong approach? Should I maybe use some in-house AWS services (Lambda, Step Functions, Athena) to get better results? I am actually surprised at how slow the read_parquet functions are in this case.
TL;DR: It appears all your code is single-threaded, which means you are spending a lot of time blocked on IO.
Some options:
Last, if you have control over the data I would recommend against using parquet as a format: there is a lot of overhead per column for such a small number of rows per file. Avro might be a good alternative, or if possible try to consolidate multiple rows per file.
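To illustrate the single-threaded-IO point, here is a minimal sketch (the bucket name, key list, and worker count are assumptions, not code from the question) of downloading and parsing the objects concurrently with a thread pool instead of one at a time:

from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
import boto3
import pandas as pd

s3 = boto3.client("s3")  # boto3 clients are safe to share across threads

def fetch_one(bucket: str, key: str) -> pd.DataFrame:
    # Each worker downloads one small object and parses it independently
    obj = s3.get_object(Bucket=bucket, Key=key)
    return pd.read_parquet(BytesIO(obj["Body"].read()))

def read_group(bucket: str, keys: list) -> pd.DataFrame:
    # Overlap the per-object network round trips instead of waiting on them serially
    with ThreadPoolExecutor(max_workers=32) as pool:
        frames = list(pool.map(lambda k: fetch_one(bucket, k), keys))
    return pd.concat(frames, axis=0, ignore_index=True)

Since each file is tiny, almost all of the 2 hours in the get_object loop is round-trip latency, so overlapping those requests is where the speedup would come from.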