Search code examples
pythonpandasamazon-s3boto3

Using multiple sets of credentials in to_parquet when transfering to s3 using pandas


I have a pipeline that does some data processing and then transfers parquet files to s3.

I wanted to push this data directly to s3 without saving it locally, so I thought the best way for that would just be to use an s3 URI in the to_parquet call, as follows:

@task(name='upload_parquet', retries=2, retry_delay_seconds=2)
def upload_to_s3(df: pd.DataFrame, bucket: str, key: str):
    bucket = bucket if not bucket.endswith('/') else bucket[:-1]

    access_key, secret_key = get_s3_credentials(bucket)
    os.environ['AWS_ACCESS_KEY_ID'] = access_key
    os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key.get_secret_value()

    df.to_parquet(f's3://{bucket}/{key}', engine='pyarrow', compression='snappy')

    del os.environ['AWS_ACCESS_KEY_ID']
    del os.environ['AWS_SECRET_ACCESS_KEY']

This works for the first bucket. But, as soon as the second bucket (and second set of credentials) is used, I receive an AccessDenied error.

My current guess is that boto3 might possibly cache the credentials at some level.

What's the recommended way of dealing with this situation? I wish to keep it as simple as possible.


Solution

  • In case anyone else stumbles upon this: it didn't seem like there was any way to affect this credentials-caching mechanism in boto. As a result, I used s3fs directly and saved the parquet file to a buffer in memory.

    This solution probably doesn't scale very well to very large amounts of data, but it worked for me:

    import s3fs
    from io import BytesIO
    
    df = ...
    s3 = s3fs.S3FileSystem(key=access_key, secret=secret_key)
    
    buffer = BytesIO()
    s3_path = f's3://{bucket}/{key}'
    df.to_parquet(buffer, engine='pyarrow', compression='snappy')
    
    with s3.open(s3_path, 'wb') as result_file:
        result_file.write(buffer.getvalue())
    

    It would be much better to try and stream the file in chunks, but I'm unsure if the s3fs API allows for that. buffer.getvalue() was good enough for my purposes.