data-ingestion dagster

How can I do an incremental load based on record ID in Dagster?


I am trying to consume an HTTP API in my Dagster code. The API provides a log of "changes" which contain an incrementing ID. It supports an optional parameter fromUpdateId, which lets you only fetch updates that have a higher ID than some value.

This should let me do incremental loads, by looking at the highest update ID I have seen so far, and providing this as the parameter.

How can I accomplish this in Dagster? I am thinking it should be possible to write the highest ID as metadata when I materialize the asset. The metadata would then be available the next time the asset is materialized.


Solution

  • I am thinking it should be possible to write the highest ID as metadata when I materialize the asset. The metadata would then be available the next time the asset is materialized.

    That sounds like the right approach to me.

    Here's some Python code that implements that approach:

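    from dagster import asset, Output
    
    
    @asset
    def asset1(context):
        # Look up the most recent materialization event recorded for this asset.
        asset_key = context.asset_key_for_output()
        latest_materialization_event = context.instance.get_latest_materialization_events(
            [asset_key]
        ).get(asset_key)
    
        if latest_materialization_event:
            # Read the cursor that the previous run stored as metadata.
            materialization = (
                latest_materialization_event.dagster_event.event_specific_data.materialization
            )
            metadata = {entry.label: entry.entry_data for entry in materialization.metadata_entries}
            cursor = metadata["cursor"].value
        else:
            # First materialization: there is no previous cursor yet.
            cursor = 0
    
        # Placeholder: fetch the changes after `cursor` here, return them as the value,
        # and store the highest update ID received as the new cursor.
        return Output(value=..., metadata={"cursor": cursor + 1})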
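
The asset above leaves the actual fetch as a placeholder and simply increments the cursor. As a rough sketch of the two pieces that would fill that gap, the helpers below build the request parameters from the stored cursor and compute the new cursor from a batch of changes. The helper names and the "id" field on each change record are assumptions for illustration; only the fromUpdateId parameter comes from the question.

```python
from typing import Any, Dict, Iterable, Mapping


def build_params(cursor: int) -> Dict[str, int]:
    """Query parameters for the changes endpoint (hypothetical helper).

    On the first run (cursor == 0) the optional parameter is omitted,
    so the full change log is requested.
    """
    return {"fromUpdateId": cursor} if cursor > 0 else {}


def next_cursor(previous_cursor: int, changes: Iterable[Mapping[str, Any]]) -> int:
    """Highest update ID seen so far (hypothetical helper).

    "id" is an assumed field name for the incrementing ID on each change.
    If the batch is empty, the previous cursor is kept unchanged.
    """
    return max([previous_cursor, *(change["id"] for change in changes)])


# Made-up sample batch, standing in for the parsed API response.
sample_changes = [{"id": 41}, {"id": 42}]
print(build_params(40))                 # {'fromUpdateId': 40}
print(next_cursor(40, sample_changes))  # 42
```

Inside the asset, the parameters from build_params(cursor) would go to whatever HTTP client you use, and the asset would return something like Output(value=changes, metadata={"cursor": next_cursor(cursor, changes)}) instead of cursor + 1. Keeping the previous cursor when no changes arrive means an empty run does not reset the load.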