AWS Glue. How to create a compound key for Job bookmarks?


I have a JDBC source (PostgreSQL) with a table that I want to fetch with Glue.

My table has columns:

id          (bigint)
name        (string)
updated_at  (timestamp)

I've set up the table in the Glue data catalog with a crawler, set up a job and enabled Job bookmarks.

When I run the job, it automatically identifies new rows by their new ids.

But I want to use a compound key: [ id + updated_at ].

It will allow me to detect all updates in the source table.

How can I do this?

AWS docs say that this feature is available (https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html):

For JDBC sources, the following rules apply:
   * For each table, AWS Glue uses one or more columns as bookmark keys to determine new and processed data. The bookmark keys combine to form a single compound key.
   * You can specify the columns to use as bookmark keys. If you don't specify bookmark keys, AWS Glue by default uses the primary key as the bookmark key, provided that it is sequentially increasing or decreasing (with no gaps).

Should I define the table manually (without crawlers)?

Thanks!


Solution

  • The AWS doc on using job bookmarks states that you can specify jobBookmarkKeys and jobBookmarkKeysSortOrder in your Glue job's script.

    You can specify jobBookmarkKeys and jobBookmarkKeysSortOrder in the following ways:

      * create_dynamic_frame.from_catalog: use additional_options.
      * create_dynamic_frame.from_options: use connection_options.

    For example, starting from the script that Glue generates for a JDBC source, you can add the following two options (this example uses a SQL Server connection):

    your_node = glueContext.create_dynamic_frame.from_options(
        connection_type="sqlserver",
        connection_options={
            "useConnectionProperties": "true",
            "dbtable": "your.table",
            "connectionName": "Your Connection Name",
            # Add the following two lines:
            "jobBookmarkKeys": ["updated_at", "id"],
            "jobBookmarkKeysSortOrder": "asc",
        },
        transformation_ctx="your_node",
    )
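
    For a table that is already in the Data Catalog, as in the question, the same two options go into additional_options instead. The following is a minimal sketch rather than a tested job: the function name read_with_compound_bookmark and its parameters are hypothetical, and glue_context is assumed to be the GlueContext object that the generated script already creates.

```python
def read_with_compound_bookmark(glue_context, database, table_name, ctx):
    """Read a catalog table using (updated_at, id) as the compound bookmark key.

    glue_context is assumed to be an awsglue.context.GlueContext instance;
    database, table_name and ctx are placeholders for your own values.
    """
    return glue_context.create_dynamic_frame.from_catalog(
        database=database,
        table_name=table_name,
        additional_options={
            # Column order matters: updated_at is compared first, then id.
            "jobBookmarkKeys": ["updated_at", "id"],
            "jobBookmarkKeysSortOrder": "asc",
        },
        # Bookmark state is tracked per transformation_ctx.
        transformation_ctx=ctx,
    )
```

    Inside the job script this would be called as, for example, read_with_compound_bookmark(glueContext, "your_database", "your_table", "your_node"), with job bookmarks enabled on the job itself.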
    

    The types of the two job bookmark options can be found in the AWS Glue docs.

    jobBookmarkKeys — Array of column names.

    jobBookmarkKeysSortOrder — String defining how to compare values based on sort order. Valid values: "asc", "desc".

    Note, for jobBookmarkKeys, the order of your column names matters.

    If you run this Glue job and search the executor logs in CloudWatch, you'll find the SQL query used to find the latest compound key. The order of your columns in jobBookmarkKeys defines the column order of the ORDER BY clause in that query.

    SELECT
        *
    FROM (
        select TOP 1
            updated_at,
            id
        from
            your.table
        order by
            updated_at DESC,
            id DESC)
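
    To see why the column order matters, the ORDER BY above can be mimicked in plain Python, where tuples are also compared column by column from left to right. This is only an illustration with made-up rows; latest_compound_key is a hypothetical helper, not part of Glue.

```python
from datetime import datetime

# Made-up rows standing in for your.table.
rows = [
    {"id": 1, "updated_at": datetime(2023, 1, 1, 14, 30)},
    {"id": 7, "updated_at": datetime(2023, 1, 1, 14, 15)},
    {"id": 3, "updated_at": datetime(2023, 1, 1, 14, 30)},
]


def latest_compound_key(rows, bookmark_keys):
    """Return the largest key tuple, comparing columns in the given order."""
    return max(tuple(row[col] for col in bookmark_keys) for row in rows)


# updated_at first: the latest timestamp wins, id only breaks ties.
latest_by_time = latest_compound_key(rows, ["updated_at", "id"])

# id first: the largest id wins, even though its timestamp is older.
latest_by_id = latest_compound_key(rows, ["id", "updated_at"])

print(latest_by_time)  # key of the row with id 3 (14:30)
print(latest_by_id)    # key of the row with id 7 (14:15)
```

    With ["updated_at", "id"] the bookmark advances to the most recently updated row, which is what detecting updates needs; with the columns swapped it would advance to the highest id instead.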
    

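    For a row to then be picked up in the incremental ingest, its compound key, compared column by column in the order you listed, must be greater than the previous bookmark's key and no greater than the latest key found above. In this example, a row qualifies if its updated_at is later than the previous bookmark's updated_at, or if its updated_at is equal and its id is greater; the columns do not each have to be greater on their own. Here's the SQL statement from the CloudWatch logs: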

    SELECT
        *
    FROM (
        SELECT
            *
        FROM
            your.table
        WHERE ((updated_at > '2023-01-01 14:15:00.0')
            or(updated_at = '2023-01-01 14:15:00.0'
                AND id > '1234'))
        and((updated_at < '2023-01-01 14:30:00.0')
        or(updated_at = '2023-01-01 14:30:00.0'
            AND id <= '5678')))
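
    The WHERE clause above compares the whole key from left to right, which Python tuple comparison happens to mirror. A small sketch, using made-up rows and a hypothetical helper in_bookmark_window, shows which rows would fall between the previous bookmark and the latest key:

```python
from datetime import datetime

# Bounds copied from the logged query above.
previous = (datetime(2023, 1, 1, 14, 15), 1234)  # key stored by the last run
latest = (datetime(2023, 1, 1, 14, 30), 5678)    # latest key found by this run


def in_bookmark_window(row, keys=("updated_at", "id")):
    """True if the row's compound key lies in the range (previous, latest]."""
    key = tuple(row[col] for col in keys)
    return previous < key <= latest


# Made-up rows for illustration only.
rows = [
    {"id": 1200, "updated_at": datetime(2023, 1, 1, 14, 20)},  # newer time, smaller id
    {"id": 1234, "updated_at": datetime(2023, 1, 1, 14, 15)},  # exactly the old bookmark
    {"id": 1300, "updated_at": datetime(2023, 1, 1, 14, 15)},  # same time, larger id
    {"id": 9000, "updated_at": datetime(2023, 1, 1, 14, 10)},  # larger id, older time
]

picked_ids = [row["id"] for row in rows if in_bookmark_window(row)]
print(picked_ids)  # [1200, 1300]
```

    Row 1200 is picked up even though its id is below 1234, and row 9000 is skipped even though its id is above it, because updated_at is compared first.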