Tags: python, palantir-foundry, foundry-code-repositories

In Foundry, how do I read the entire current output dataset, which has a snapshot transaction type?


I'm not sure the title is accurate, but basically I want to read the whole output dataset.

I uploaded the output dataset to Foundry as a Parquet file (as an "Append" transaction). My input data is computed by a Python transform in Code Workbook (its transaction details show SNAPSHOT).

Now I want to read that uploaded output dataset, compare it with input_df, and replace the whole output dataset with the new data produced by that comparison. My output dataset has 123 rows and my input dataset has 86 rows. As a test, I read the "previous" output_df, unioned it with input_df, and built the code repository, but output_df ended up with 86 rows (exactly the same data as input_df). I think output_df was read as an empty dataset. How can I read the entire data (all 123 rows) of output_df?

```python
from transforms.api import transform, incremental, Input, Output, configure
from pyspark.sql import types as T
from pyspark.sql import functions as F

schema = T.StructType(
    [
        T.StructField("wellname", T.StringType()),
        T.StructField("start_date", T.TimestampType()),
        T.StructField("woe_limit_psia", T.DoubleType()),
        T.StructField("end_date", T.TimestampType()),
    ]
)


@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@incremental(
    snapshot_inputs=["input_df"]
)
@transform(
    input_df=Input(
        "ri.foundry.lava-catalog.dataset.974e09fa-fba6-4867-a7b1-f860ab7a7046"
    ),
    output_df=Output(
        "ri.foundry.lava-catalog.dataset.95804158-098a-4f0c-afea-64abd10fbc1f"
    )
)
def incremental_filter(input_df, output_df):
    df_current = input_df.dataframe("added")

    # Align the input columns with the output schema
    columns_in_order = [field.name for field in schema.fields]
    df_current = df_current.select(columns_in_order)

    # Union with the output as of the previous build (here it came back empty)
    df_union = df_current.unionByName(output_df.dataframe(mode="previous", schema=schema))

    # localCheckpoint returns a new DataFrame, so keep the result; this
    # materializes the data read from the output before it is overwritten
    df_union = df_union.localCheckpoint(eager=True)

    # Replace the whole output instead of appending to it
    output_df.set_mode("replace")

    # Write the output dataframe
    output_df.write_dataframe(df_union)
```
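The row count alone shows that the output side of the union was empty. Below is a minimal pure-Python sketch of `unionByName` semantics (columns matched by name, duplicates kept, like SQL `UNION ALL`). The helper `union_by_name` and the placeholder rows are hypothetical stand-ins for the Spark DataFrames; only the counts (86 input rows, 123 output rows) come from the question.

```python
# Hypothetical helper mimicking Spark's DataFrame.unionByName on lists of dicts.
def union_by_name(left, right, columns):
    """Match columns by name (not position) and keep duplicates,
    so the result always has len(left) + len(right) rows."""
    return [{c: row[c] for c in columns} for row in left + right]


COLUMNS = ["wellname", "start_date", "woe_limit_psia", "end_date"]

# Placeholder rows (hypothetical data): 86 input rows, 123 existing output rows.
# The output rows list their keys in a different order to show name matching.
input_rows = [
    {"wellname": f"in_{i}", "start_date": None, "woe_limit_psia": 0.0, "end_date": None}
    for i in range(86)
]
output_rows = [
    {"end_date": None, "woe_limit_psia": 0.0, "start_date": None, "wellname": f"out_{i}"}
    for i in range(123)
]

# Output read as empty (what the "previous" read returned here): 86 + 0 rows.
print(len(union_by_name(input_rows, [], COLUMNS)))           # 86
# Full existing output read: 86 + 123 rows.
print(len(union_by_name(input_rows, output_rows, COLUMNS)))  # 209
```

So a successful read of the uploaded data would have produced 209 rows, not 86.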



Solution

  • output_df.dataframe(mode="current", schema=schema) should work. Omitting the mode should also work, since it simply reads whatever is currently on the output.

    More information here: https://www.palantir.com/docs/foundry/transforms-python/incremental-reference/#incrementaltransforminput
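Once the full output can be read, the "comparison" step can take many forms. As one hypothetical example, assuming a row is identified by (`wellname`, `start_date`) (an assumption; the question does not say), the sketch below keeps every existing output row, adds the input rows whose key is not already present, and returns the table that would then be written in `replace` mode. It uses plain Python lists of dicts in place of Spark DataFrames; in PySpark the same idea is typically a `join(..., how="left_anti")` of the input against the current output, followed by `unionByName`.

```python
from datetime import datetime

# Hypothetical key: which columns identify "the same" row is an assumption.
KEY = ("wellname", "start_date")


def merge_for_replace(existing_rows, input_rows, key=KEY):
    """Return existing rows plus input rows whose key is not yet present.

    Mirrors a left_anti join (input vs. existing output) followed by a union;
    on a key conflict the existing row wins.
    """
    seen = {tuple(row[k] for k in key) for row in existing_rows}
    new_rows = [row for row in input_rows if tuple(row[k] for k in key) not in seen]
    return existing_rows + new_rows


# Hypothetical sample data in the question's schema.
existing = [
    {"wellname": "A", "start_date": datetime(2023, 1, 1), "woe_limit_psia": 100.0, "end_date": None},
    {"wellname": "B", "start_date": datetime(2023, 1, 1), "woe_limit_psia": 120.0, "end_date": None},
]
incoming = [
    {"wellname": "B", "start_date": datetime(2023, 1, 1), "woe_limit_psia": 125.0, "end_date": None},
    {"wellname": "C", "start_date": datetime(2023, 2, 1), "woe_limit_psia": 90.0, "end_date": None},
]

merged = merge_for_replace(existing, incoming)
print([row["wellname"] for row in merged])  # ['A', 'B', 'C']
```

To let the input win on conflicts instead, swap the argument order.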