Tags: python, dask, delta-lake, minio

Connecting to Delta Lake hosted on MinIO from Dask


I'm trying to connect to a Delta Lake table that is stored on MinIO rather than S3. I can do this directly with the deltalake Python package as follows:

from deltalake import DeltaTable

storage_options = {
    "AWS_ENDPOINT_URL": "http://localhost:9000",
    "AWS_REGION": "local",
    "AWS_ACCESS_KEY_ID": access_key,
    "AWS_SECRET_ACCESS_KEY": secret_key,
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    "AWS_ALLOW_HTTP": "true"
}

dt = DeltaTable("s3a://my_bucket/data", storage_options=storage_options)
df = dt.to_pandas()

However, I want to read into a Dask dataframe instead, so I'm trying to use dask-deltatable. As it uses deltalake under the hood, I assumed the following would work:

import dask_deltatable

ddf = dask_deltatable.read_deltalake("s3a://my_bucket/data", storage_options=storage_options)

However, it still seems to be trying to connect to AWS:

OSError                                   Traceback (most recent call last)
Cell In[3], line 1
----> 1 ddf = dask_deltatable.read_deltalake("s3a://my_bucket/data", storage_options=storage_options)

File ~/.local/lib/python3.10/site-packages/dask_deltatable/core.py:285, in read_deltalake(path, catalog, database_name, table_name, version, columns, storage_options, datetime, delta_storage_options, **kwargs)
    282         raise ValueError("Please Provide Delta Table path")
    284     delta_storage_options = utils.maybe_set_aws_credentials(path, delta_storage_options)  # type: ignore
--> 285     resultdf = _read_from_filesystem(
    286         path=path,
    287         version=version,
    288         columns=columns,
    289         storage_options=storage_options,
    290         datetime=datetime,
    291         delta_storage_options=delta_storage_options,
    292         **kwargs,
    293     )
    294 return resultdf

File ~/.local/lib/python3.10/site-packages/dask_deltatable/core.py:102, in _read_from_filesystem(path, version, columns, datetime, storage_options, delta_storage_options, **kwargs)
     99 delta_storage_options = utils.maybe_set_aws_credentials(path, delta_storage_options)  # type: ignore
    101 fs, fs_token, _ = get_fs_token_paths(path, storage_options=storage_options)
--> 102 dt = DeltaTable(
    103     table_uri=path, version=version, storage_options=delta_storage_options
    104 )
    105 if datetime is not None:
    106     dt.load_as_version(datetime)

File ~/.local/lib/python3.10/site-packages/deltalake/table.py:297, in DeltaTable.__init__(self, table_uri, version, storage_options, without_files, log_buffer_size)
    277 """
    278 Create the Delta Table from a path with an optional version.
    279 Multiple StorageBackends are currently supported: AWS S3, Azure Data Lake Storage Gen2, Google Cloud Storage (GCS) and local URI.
   (...)
    294 
    295 """
    296 self._storage_options = storage_options
--> 297 self._table = RawDeltaTable(
    298     str(table_uri),
    299     version=version,
    300     storage_options=storage_options,
    301     without_files=without_files,
    302     log_buffer_size=log_buffer_size,
    303 )

OSError: Generic S3 error: Error after 10 retries in 13.6945151s, max_retries:10, retry_timeout:180s, source:error sending request for url (http://169.254.169.254/latest/api/token)

Has anyone successfully managed to read from Delta Lake on MinIO into a Dask dataframe, and if so, how?


Solution

  • Firstly: a reasonable workaround would be to set those values as environment variables, so that they should be picked up by any S3 framework in operation (see the first sketch below).

    In the docstring, we have:

    storage_options : dict, default None
        Key/value pairs to be passed on to the fsspec backend, if any.

    delta_storage_options : dict, default None
        Key/value pairs to be passed on to the delta-rs filesystem, if any.

    I suspect that here, the second set of options is exactly what you already have.

    For fsspec, see the s3fs documentation showing how to set the endpoint, region and key/secret. The remaining values would be part of client_kwargs, and you'll need to look in the botocore docs for those - but I suspect they are not needed. A sketch of how the two sets of options might be split follows.
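
    As a rough sketch of the first suggestion (assuming access_key and secret_key are the same placeholders as in your snippet, and that the library versions you have honour these variables), the values can be exported before reading:

    import os

    # Export the same endpoint/credentials as environment variables so that both
    # delta-rs (via object_store) and any boto/fsspec-based code can pick them up.
    os.environ["AWS_ENDPOINT_URL"] = "http://localhost:9000"
    os.environ["AWS_REGION"] = "local"
    os.environ["AWS_ACCESS_KEY_ID"] = access_key
    os.environ["AWS_SECRET_ACCESS_KEY"] = secret_key
    os.environ["AWS_ALLOW_HTTP"] = "true"
    os.environ["AWS_S3_ALLOW_UNSAFE_RENAME"] = "true"

    import dask_deltatable

    ddf = dask_deltatable.read_deltalake("s3a://my_bucket/data")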
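
    And a sketch of the second suggestion: pass the delta-rs keys through delta_storage_options and s3fs-style keys through storage_options. The s3fs key names (key, secret, client_kwargs) are standard s3fs options; whether the region entries are needed at all for MinIO is doubtful:

    import dask_deltatable

    # Options for delta-rs - the same dict that already works with DeltaTable directly
    delta_storage_options = {
        "AWS_ENDPOINT_URL": "http://localhost:9000",
        "AWS_REGION": "local",
        "AWS_ACCESS_KEY_ID": access_key,
        "AWS_SECRET_ACCESS_KEY": secret_key,
        "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
        "AWS_ALLOW_HTTP": "true",
    }

    # Options for fsspec/s3fs - note the different key names; settings destined for
    # the underlying botocore client go inside client_kwargs
    storage_options = {
        "key": access_key,
        "secret": secret_key,
        "client_kwargs": {
            "endpoint_url": "http://localhost:9000",
            "region_name": "local",
        },
    }

    ddf = dask_deltatable.read_deltalake(
        "s3a://my_bucket/data",
        storage_options=storage_options,
        delta_storage_options=delta_storage_options,
    )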