In our pipeline we have a dataset that is updated with a snapshot transaction every day. We would like to export only the new rows each day, considering the entire history of the input dataset.
How can we achieve that? Can we use an incremental transform?
Yes, you can create an incremental transform to export only the new rows, even if your input dataset is built using a snapshot transaction. On each build, the incremental transform stores the rows it has already seen in a historical dataset and uses that history to identify which rows were already exported, so only the new ones are written to the export output.
Please find below a code example using a snapshot input dataset named ‘alerts’:
from transforms.api import transform, Input, Output, incremental
from pyspark.sql import types as T

# Schema used to read the historical output when no previous version exists yet
# (e.g. on the very first build).
schema = T.StructType([
    T.StructField("issue_id", T.StringType()),
    T.StructField("user", T.StringType()),
    T.StructField("type", T.StringType()),
    T.StructField("description", T.StringType()),
])


@incremental(semantic_version=1, snapshot_inputs=['alerts'])
@transform(
    alerts=Input("ri.foundry.main.dataset.1ce7ba64-796d-484c-b816-c40ef93e7550"),
    output_dataset_export=Output("ri.foundry.main.dataset.07d558e2-0e51-447d-b937-ca5ecf4586a4"),
    output_dataset_historical=Output("ri.foundry.main.dataset.7cecded4-e01a-4b81-8d73-ad8a769995de"),
)
def proposals(alerts, output_dataset_export, output_dataset_historical):
    alerts_df = alerts.dataframe()

    # Get the alerts previously exported (an empty dataframe with the given
    # schema on the first build, when the output has no previous version).
    alerts_previously_exported = output_dataset_historical.dataframe('previous', schema)

    # Identify the alerts which have never been exported.
    alert_to_export = alerts_df.join(alerts_previously_exported, 'issue_id', 'left_anti')

    # Break the lineage before replacing the outputs, since alert_to_export is
    # derived from the previous version of output_dataset_historical.
    alert_to_export = alert_to_export.localCheckpoint(eager=True)

    # Update the export dataset with only the new alerts.
    output_dataset_export.set_mode('replace')
    output_dataset_export.write_dataframe(alert_to_export)

    # Update the historical dataset with the current full snapshot of alerts.
    alerts_df = alerts_df.localCheckpoint(eager=True)
    output_dataset_historical.set_mode('replace')
    output_dataset_historical.write_dataframe(alerts_df)