
In Foundry, how can we output only new rows from a snapshot input dataset with incremental?


In our pipeline we have a dataset that is updated with a snapshot transaction every day. We would like to export only the rows that are new each day, taking into account the entire history of the input dataset.


How can we achieve that? Can we use an incremental transform?


Solution

  • Yes, you can create an incremental transform that exports only new rows, even if your input dataset is built with snapshot transactions. On each build, the transform stores the full current snapshot in a historical dataset and uses it to identify the rows that were already exported, so only the new rows are written to the export output.


    Below is a code example with the snapshot input dataset `alerts`:

    from transforms.api import transform, Input, Output, incremental
    from pyspark.sql import types as T
    
    schema = T.StructType([
            T.StructField("issue_id", T.StringType()),
            T.StructField("user", T.StringType()),
            T.StructField("type", T.StringType()),
            T.StructField("description", T.StringType())])
    
    
    @incremental(semantic_version=1, snapshot_inputs=['alerts'])
    @transform(
        alerts=Input("ri.foundry.main.dataset.1ce7ba64-796d-484c-b816-c40ef93e7550"),
        output_dataset_export=Output("ri.foundry.main.dataset.07d558e2-0e51-447d-b937-ca5ecf4586a4"),
        output_dataset_historical=Output("ri.foundry.main.dataset.7cecded4-e01a-4b81-8d73-ad8a769995de"),
    )
    def proposals(alerts, output_dataset_export, output_dataset_historical):
    
        alerts_df = alerts.dataframe()
    
        # Get the alerts previously exported
        alerts_previously_exported = output_dataset_historical.dataframe('previous', schema)
    
        # Identify the alerts which have never been exported
        alert_to_export = alerts_df.join(alerts_previously_exported, 'issue_id', 'left_anti')
    
        # Checkpoint before writing, since the historical output is also read
        # in this transform. Note: localCheckpoint returns a new DataFrame,
        # so the result must be assigned for the lineage break to take effect
        alert_to_export = alert_to_export.localCheckpoint(eager=True)
        output_dataset_export.set_mode('replace')
        output_dataset_export.write_dataframe(alert_to_export)
    
        # Replace the historical dataset with the full current snapshot
        alerts_df = alerts_df.localCheckpoint(eager=True)
        output_dataset_historical.set_mode('replace')
        output_dataset_historical.write_dataframe(alerts_df)
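    Outside of Foundry, the core logic of the left-anti join above can be illustrated with plain Python. This is a minimal sketch (the rows and the `rows_to_export` helper are invented for illustration, not part of any Foundry API): the historical key set plays the role of `output_dataset_historical`, and only rows whose `issue_id` has never been seen are exported.

    ```python
    def rows_to_export(snapshot, historical_ids, key="issue_id"):
        """Keep only rows whose key is absent from the historical set,
        i.e. the equivalent of the left-anti join on 'issue_id'."""
        return [row for row in snapshot if row[key] not in historical_ids]

    # Day 1: the historical dataset is empty, so every row is exported.
    day1 = [{"issue_id": "A", "user": "u1"}, {"issue_id": "B", "user": "u2"}]
    exported_day1 = rows_to_export(day1, set())
    historical = {row["issue_id"] for row in day1}  # snapshot saved as history

    # Day 2: the snapshot repeats all of history plus one new row.
    day2 = day1 + [{"issue_id": "C", "user": "u3"}]
    exported_day2 = rows_to_export(day2, historical)
    print([row["issue_id"] for row in exported_day2])  # ['C']
    ```

    Each run replaces the historical dataset with the full snapshot, so the next run's anti-join always compares against everything exported so far.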