Search code examples
pythonboto3aws-glueapache-iceberg

Creating an Iceberg Table on S3 Using PyIceberg and Glue Catalog


I am attempting to create an Iceberg Table on S3 using the Glue Catalog and the PyIceberg library. My goal is to define a schema, partitioning specifications, and then create a table using PyIceberg. However, despite multiple attempts, I haven't been able to achieve this successfully and keep encountering an error related to empty path components in metadata paths.

Here's a simplified version of the code I'm using:

import boto3
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import TimestampType, DoubleType, StringType, NestedField
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import YearTransform, MonthTransform, DayTransform

def create_iceberg_table():
    """Create an Iceberg table on S3 registered in the AWS Glue Catalog.

    Uses PyIceberg's Glue catalog implementation; credentials and region are
    resolved from the standard AWS credential chain (env vars, profile, role).

    Raises:
        pyiceberg exceptions if the Glue database does not exist or the
        table already exists.
    """
    # Replace with your S3 bucket and table names
    s3_bucket = "my-bucket-name"
    table_name = "my-table-name"
    database_name = "iceberg_catalog"

    # Define the table schema
    schema = Schema(
        NestedField(field_id=1, name="field1", field_type=DoubleType(), required=False),
        NestedField(field_id=2, name="field2", field_type=StringType(), required=False),
        # ... more fields ...
    )

    # Define the partitioning specification with transformations.
    # NOTE: partition field IDs must be unique; by Iceberg convention they
    # start at 1000 (source_id refers to the schema field being transformed).
    partition_spec = PartitionSpec(
        PartitionField(field_id=1000, source_id=3, transform=YearTransform(), name="year"),
        PartitionField(field_id=1001, source_id=3, transform=MonthTransform(), name="month"),
        # ... more partition fields ...
    )

    # load_catalog() takes catalog configuration properties, not a boto3
    # client -- PyIceberg constructs its own Glue/S3 clients internally.
    catalog = load_catalog("glue", **{"type": "glue"})

    # Create the Iceberg table in the Glue Catalog.
    # The location must NOT end with a trailing slash: PyIceberg builds
    # "{location}/metadata/<file>.json", and a trailing slash yields a
    # double slash, which pyarrow rejects with
    # "ArrowInvalid: Empty path component in path ...".
    catalog.create_table(
        identifier=f"{database_name}.{table_name}",
        schema=schema,
        partition_spec=partition_spec,
        location=f"s3://{s3_bucket}/{table_name}",
    )

    print("Iceberg table created successfully!")

if __name__ == "__main__":
    create_iceberg_table()

My understanding is that the PyIceberg library interacts with the Glue Catalog to manage metadata, schema, and partitions, but I seem to be missing a crucial step or misconfiguring something.

How can I properly generate an Iceberg Table on S3 using the Glue Catalog and PyIceberg?

Traceback:

Traceback (most recent call last):
  File "/home/workspaceuser/app/create_iceberg_tbl.py", line 72, in <module>
    create_iceberg_table()
  File "/home/workspaceuser/app/create_iceberg_tbl.py", line 62, in create_iceberg_table
    catalog.create_table(
  File "/home/workspaceuser/layers/paketo-buildpacks_cpython/cpython/lib/python3.8/site-packages/pyiceberg/catalog/glue.py", line 220, in create_table
    self._write_metadata(metadata, io, metadata_location)
  File "/home/workspaceuser/layers/paketo-buildpacks_cpython/cpython/lib/python3.8/site-packages/pyiceberg/catalog/__init__.py", line 544, in _write_metadata
    ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))
  File "/home/workspaceuser/layers/paketo-buildpacks_cpython/cpython/lib/python3.8/site-packages/pyiceberg/serializers.py", line 71, in table_metadata
    with output_file.create(overwrite=overwrite) as output_stream:
  File "/home/workspaceuser/layers/paketo-buildpacks_cpython/cpython/lib/python3.8/site-packages/pyiceberg/io/pyarrow.py", line 256, in create
    if not overwrite and self.exists() is True:
  File "/home/workspaceuser/layers/paketo-buildpacks_cpython/cpython/lib/python3.8/site-packages/pyiceberg/io/pyarrow.py", line 200, in exists
    self._file_info()  # raises FileNotFoundError if it does not exist
  File "/home/workspaceuser/layers/paketo-buildpacks_cpython/cpython/lib/python3.8/site-packages/pyiceberg/io/pyarrow.py", line 182, in _file_info
    file_info = self._filesystem.get_file_info(self._path)
  File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Empty path component in path ua-weather-data/hourly_forecasts//metadata/00000-232e3e60-1c1a-4eb8-959e-6940b563acd4.metadata.json

Solution

  • I came across this post on LinkedIn that had an example of how to accomplish this - thanks dipankar mazumdar!!!

    Removed the boto3 library, instantiated the glue catalog with the proper syntax, and created a properly formed catalog.create_table command.

    Here is the adjusted working code:

    from pyiceberg.catalog import load_catalog
    from pyiceberg.table import Table
    from pyiceberg.schema import Schema
    from pyiceberg.types import DoubleType, StringType, TimestampType, NestedField
    from pyiceberg.partitioning import PartitionSpec, PartitionField
    from pyiceberg.transforms import YearTransform, MonthTransform, DayTransform
    from pyiceberg.table.sorting import SortOrder, SortField
    from pyiceberg.transforms import IdentityTransform
    
    def create_iceberg_table():
        """Create the `iceberg_catalog.hourly_forecasts` Iceberg table on S3
        via the AWS Glue Catalog using PyIceberg.

        Defines the forecast schema, a year/month/day partitioning on
        `validTimeLocal`, and an ascending sort order on `validTimeUtc`.
        AWS credentials/region come from the standard credential chain.
        """
        # Glue Catalog database name and table storage location on S3
        glue_database_name = "iceberg_catalog"
        glue_catalog_uri = "s3://ua-weather-data/catalog"  # Replace with your S3 location

        # Instantiate the Glue catalog. load_catalog() takes configuration
        # properties (not a boto3 client); "type": "glue" selects GlueCatalog.
        catalog = load_catalog("glue", **{"type": "glue"})

        # Define the Iceberg schema
        schema = Schema(
            NestedField(field_id=1, name="cloudCover", field_type=DoubleType(), required=False),
            NestedField(field_id=2, name="dayOfWeek", field_type=StringType(), required=False),
            NestedField(field_id=3, name="dayOrNight", field_type=StringType(), required=False),
            NestedField(field_id=4, name="expirationTimeUtc", field_type=TimestampType(), required=False),
            NestedField(field_id=5, name="iconCode", field_type=DoubleType(), required=False),
            NestedField(field_id=6, name="iconCodeExtend", field_type=DoubleType(), required=False),
            NestedField(field_id=7, name="precipChance", field_type=DoubleType(), required=False),
            NestedField(field_id=8, name="precipType", field_type=StringType(), required=False),
            NestedField(field_id=9, name="pressureMeanSeaLevel", field_type=DoubleType(), required=False),
            NestedField(field_id=10, name="qpf", field_type=DoubleType(), required=False),
            NestedField(field_id=11, name="qpfSnow", field_type=DoubleType(), required=False),
            NestedField(field_id=12, name="relativeHumidity", field_type=DoubleType(), required=False),
            NestedField(field_id=13, name="temperature", field_type=DoubleType(), required=False),
            NestedField(field_id=14, name="temperatureFeelsLike", field_type=DoubleType(), required=False),
            NestedField(field_id=15, name="temperatureHeatIndex", field_type=DoubleType(), required=False),
            NestedField(field_id=16, name="temperatureWindChill", field_type=DoubleType(), required=False),
            NestedField(field_id=17, name="uvDescription", field_type=StringType(), required=False),
            NestedField(field_id=18, name="uvIndex", field_type=DoubleType(), required=False),
            NestedField(field_id=19, name="validTimeLocal", field_type=TimestampType(), required=True),
            NestedField(field_id=20, name="validTimeUtc", field_type=DoubleType(), required=False),
            NestedField(field_id=21, name="visibility", field_type=DoubleType(), required=False),
            NestedField(field_id=22, name="windDirection", field_type=DoubleType(), required=False),
            NestedField(field_id=23, name="windDirectionCardinal", field_type=StringType(), required=False),
            NestedField(field_id=24, name="windGust", field_type=DoubleType(), required=False),
            NestedField(field_id=25, name="windSpeed", field_type=DoubleType(), required=False),
            NestedField(field_id=26, name="wxPhraseLong", field_type=StringType(), required=False),
            NestedField(field_id=27, name="wxPhraseShort", field_type=StringType(), required=False),
            NestedField(field_id=28, name="wxSeverity", field_type=DoubleType(), required=False),
            NestedField(field_id=29, name="data_origin", field_type=StringType(), required=True)
        )

        # Partition by year, month, and day derived from validTimeLocal
        # (source_id=19). Each PartitionField needs its own field_id.
        partition_spec = PartitionSpec(
            PartitionField(field_id=19, source_id=19, transform=YearTransform(), name="validTimeLocal_year"),
            PartitionField(field_id=19, source_id=19, transform=MonthTransform(), name="validTimeLocal_month"),
            PartitionField(field_id=19, source_id=19, transform=DayTransform(), name="validTimeLocal_day")
        )

        # Sort rows by validTimeUtc (field_id=20), ascending by default
        sort_order = SortOrder(SortField(source_id=20, transform=IdentityTransform()))

        # Create the Iceberg table; identifier is "<database>.<table>" and
        # location must not end with a trailing slash.
        table_name = f"{glue_database_name}.hourly_forecasts"
        catalog.create_table(
            identifier=table_name,
            location=glue_catalog_uri,
            schema=schema,
            partition_spec=partition_spec,
            sort_order=sort_order
        )

        print("Iceberg table created using AWS Glue Catalog.")

    if __name__ == "__main__":
        create_iceberg_table()