python, google-cloud-storage, apache-iceberg

Pyiceberg catalog in GCS: I cannot use pyiceberg with Google Cloud Storage


I want to use the pyiceberg library with Google Cloud Storage.

I have a catalog created in Google Cloud Storage using PySpark, and I want to read those tables from there.

I see this documentation for creating a catalog object for GCS, but I really don't understand how to connect to it or how to create a config object for Google Cloud.

I tried:

catalog = load_catalog(
    uri="gs://catalog",
    type="gcsfs"
)

but I get an error:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[4], line 1
----> 1 catalog = load_catalog(
      2     name="gcsfs", 
      
File ~/opt/anaconda3/envs/pyceberg/lib/python3.11/site-packages/pyiceberg/catalog/__init__.py:212, in load_catalog(name, **properties)
    210 catalog_type = None
    211 if provided_catalog_type and isinstance(provided_catalog_type, str):
--> 212     catalog_type = CatalogType[provided_catalog_type.upper()]
    213 elif not provided_catalog_type:
    214     catalog_type = infer_catalog_type(name, conf)

File ~/opt/anaconda3/envs/pyceberg/lib/python3.11/enum.py:792, in EnumType.__getitem__(cls, name)
    788 def __getitem__(cls, name):
    789     """
    790     Return the member matching `name`.
    791     """
--> 792     return cls._member_map_[name]

KeyError: 'GCSFS'

I installed the package pyiceberg[gcsfs].

I see this in the PyIceberg GitHub repository:

AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
    CatalogType.REST: load_rest,
    CatalogType.HIVE: load_hive,
    CatalogType.GLUE: load_glue,
    CatalogType.DYNAMODB: load_dynamodb,
    CatalogType.SQL: load_sql,
}


Solution

  • PyIceberg is a Python library for working with Iceberg tables.

    First, get an OAuth2 token using the service account file. I'm running this in Colab, so I needed to do it this way; you could do this differently if running in a container.

    import google.auth
    from google.auth.transport.requests import Request
    from pyiceberg import catalog
    
    
    def get_access_token(service_account_file, scopes):
      """
      Retrieves an access token from Google Cloud Platform using service account credentials.
    
      Args:
          service_account_file: Path to the service account JSON key file.
          scopes: List of OAuth scopes required for your application.
    
      Returns:
          The refreshed Credentials object; its .token and .expiry
          attributes are used when loading the catalog below.
      """
    
      # The second value returned is the project ID, which is not needed here.
      credentials, _ = google.auth.load_credentials_from_file(
          service_account_file, scopes=scopes)
    
      request = Request()
      credentials.refresh(request)  # fetch (or refresh) the access token
      return credentials
    
    # Example usage
    service_account_file = "/path-to-service-account-file.json"  # Replace with your path
    scopes = ["https://www.googleapis.com/auth/cloud-platform"]  # Adjust scopes as needed
    
    access_token = get_access_token(service_account_file, scopes)
    
    

    Next, the catalog is loaded, using the OAuth2 credentials retrieved with our service account key.

    I have redacted the datetime_to_unix_ms function to focus on the main task.
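
    If you need it, that helper only has to convert the credential's expiry datetime into Unix epoch milliseconds, which is what the gcs.oauth2.token-expires-at property expects. A minimal sketch, assuming the expiry is the naive UTC datetime that google-auth returns, could be:

    from datetime import datetime, timezone

    def datetime_to_unix_ms(dt: datetime) -> int:
        # google-auth reports credentials.expiry as a naive datetime in UTC,
        # so attach UTC explicitly before converting to epoch milliseconds.
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return int(dt.timestamp() * 1000)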

    Since you are just beginning, I suggest keeping your implementation light by using a database for your registry.

    If you already have an EMR cluster up, you should look into using Hive metastore instead.

    For this example, we will use an SQLite database for our central registry. You can replace this with any of the SQL databases supported by the SQLAlchemy library.
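
    For instance, if you later swap SQLite for PostgreSQL, the registry URI would take the usual SQLAlchemy form (the connection details below are placeholders):

    # Hypothetical PostgreSQL registry URI -- user, password, host and database are placeholders
    REGISTRY_DATABASE_URI = "postgresql+psycopg2://user:password@localhost:5432/iceberg_catalog"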

    REGISTRY_DATABASE_URI = "sqlite:///catalog.db" # replace this with your database URI
    
    catalog_inst = catalog.load_catalog(
        "default",
        **{
            "uri": REGISTRY_DATABASE_URI, 
            "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
            "gcs.oauth2.token-expires-at": datetime_to_unix_ms(access_token.expiry),
            "gcs.project-id": "project-id", # replace with your gcp project id
            "gcs.oauth2.token": access_token.token,
            "gcs.default-bucket-location": "gs://bucket/", # replace with your gcs bucket
            "warehouse": "gs://bucket/" # replace with your gcs bucket
        }
    )
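
    Once loaded, you can sanity-check that the catalog is reachable; these calls only query the registry database, and on a fresh registry they will print nothing yet:

    # Quick sanity check: list namespaces and the tables registered under each
    for ns in catalog_inst.list_namespaces():
        print(ns, catalog_inst.list_tables(ns))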
    

    Finally, we create an example table with some data using PyArrow:

    import pyarrow as pa
    from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError
    
    # Create the namespace if it does not already exist
    try:
        catalog_inst.create_namespace("default") # Replace this with your namespace
    except NamespaceAlreadyExistsError:
        pass
    
    # Define the schema for the book table
    schema = pa.schema([
        ('title', pa.string())
    ])
    
    # Drop any previous copy of the example table, then recreate it
    try:
        catalog_inst.drop_table("default.books") # Replace this with your table
    except NoSuchTableError:
        pass
    table = catalog_inst.create_table("default.books", schema=schema)
    
    # Create some sample data
    titles = ["The Lord of the Rings", "Pride and Prejudice", "Moby Dick"]
    
    # Create Arrow arrays from the data
    title_array = pa.array(titles, type=pa.string())
    table_data = pa.Table.from_arrays([title_array], names=schema.names)
    
    table.append(table_data)
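
    Since the original question was about reading tables that already exist in the catalog, note that once the catalog is loaded you can read any registered table back; the identifier below is the example table created above:

    # Load the table from the catalog and scan it back into an Arrow table
    books = catalog_inst.load_table("default.books")
    print(books.scan().to_arrow())  # or .to_pandas() if pandas is installed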