Tags: python, pyspark, palantir-foundry, foundry-code-repositories, pyspark-schema

Check if rows are already present in a PySpark dataset


My input and output datasets have the same schema. When this script runs, I want to first create a new dataset using the filter_data function and store the result in a DataFrame variable called "alerts". Then I want to check whether "alerts" contains any rows that are not already present in my existing output.

If there are no new rows, I want to return the existing output as it is (without any changes) so that the output dataset is not updated and the health checks on the output dataset are not triggered.

However, if "alerts" contains new data that is not already present in the output dataset, I want to return the "alerts" dataset.

from transforms.api import configure, incremental, transform_df, Input, Output
from numpy import mean, std
from datetime import timedelta, datetime
from pyspark.sql import functions as F

@configure(profile=[
    "DRIVER_MEMORY_MEDIUM"
    ])
@incremental(snapshot_inputs=['violations'])
@transform_df(
    Output("/Spring/Snowplow/data/derived/output_dataset"),
    violations=Input("xxx"),
)
def compute(violations, output):

    alerts = filter_data(violations, output)
    alerts_df = alerts.subtract(output)
    
    if alerts_df.count() > 0:
        return alerts
    else:
        return output



def filter_data(violations, output):

    todays_date = F.current_date()
    violations = violations.filter(F.col("failure_date") == todays_date)

    columns_to_keep = [
        "id",
        "value_1",
        "value_2",
        "date",
        "timestamp",
    ]
    return violations.select(*columns_to_keep)

This is what I am trying, but I currently get this error:

TypeError: compute() missing 1 required positional argument: 'output'

Solution

  • I see a few separate issues in your code that may be causing problems.

    First, you can use either of the following:

    @transform_df(
        Output("/Spring/Snowplow/data/derived/output_dataset"),
        violations=Input("xxx"),
    )
    

    or

    @transform(
        output=Output("/Spring/Snowplow/data/derived/output_dataset"),
        violations=Input("xxx"),
    )
    

    But you can't use what you currently have, which is:

    @transform_df(
        output=Output("/Spring/Snowplow/data/derived/output_dataset"),
        violations=Input("xxx"),
    )
    

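    The difference is in what each decorator passes to your function. transform_df takes the Output as its first positional argument and calls compute with only the inputs (as DataFrames), then writes whatever DataFrame compute returns. An output parameter in the signature is therefore never filled, which is what the TypeError is reporting. transform takes its outputs as keyword arguments and passes both inputs and outputs to compute as objects, which you read with .dataframe() and write with .write_dataframe().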
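
    To make the calling convention concrete, here is a plain-Python simulation. It is not Foundry code: call_like_transform_df and call_like_transform are hypothetical stand-ins that only mimic which arguments each decorator hands to compute, and the string values stand in for real datasets.

```python
# Plain-Python simulation of the two calling conventions (not Foundry code).

def compute(violations, output):
    """Same signature as the compute function in the question."""
    return violations, output


def call_like_transform_df(fn, inputs):
    # Hypothetical stand-in: transform_df passes only the declared inputs.
    # The Output is handled by the framework, which writes fn's return value.
    return fn(**inputs)


def call_like_transform(fn, inputs, outputs):
    # Hypothetical stand-in: transform passes inputs and outputs,
    # each matched to a parameter by its keyword name.
    return fn(**inputs, **outputs)


# With the transform_df convention, nothing is ever bound to `output`.
try:
    call_like_transform_df(compute, {"violations": "violations_df"})
except TypeError as err:
    error_message = str(err)

# With the transform convention, `output` is supplied by keyword.
result = call_like_transform(
    compute,
    {"violations": "violations_input"},
    {"output": "output_handle"},
)
```

    The first call fails with the same kind of TypeError as in the question, because no argument is supplied for output. The second call succeeds because the output is passed under the keyword name used in the decorator.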

    I would recommend using transform, as it is used in this example in the docs. What you call "output" in your code is called "processed" in that example. The example reads the data from the previous transaction of the output dataset and uses it in the current transaction.

    Second, in order to accomplish what you're trying to do here, you should leverage the ability to abort transactions. This example in the docs seems pretty similar to what you're trying to accomplish. This will "allow a job to successfully complete if the output dataset is unchanged (where no new data is written to the dataset)."
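
    A minimal sketch of how the two suggestions could fit together, under some assumptions: write_or_abort is a hypothetical helper name, output.abort() is assumed to be the call the abort-transactions example uses, and output.dataframe("previous", ...) is assumed to be how the previous output is read, as with "processed" in the docs example. The helper is duck-typed so it can be unit tested without Foundry; the wiring is left in comments because it only runs inside a Foundry repository.

```python
def write_or_abort(alerts, previous, output):
    """Write `alerts` only if it has rows that are missing from `previous`.

    alerts, previous: PySpark DataFrames with the same schema.
    output: the output object Foundry passes to a @transform function.
    Returns True if data was written, False if the transaction was aborted.
    """
    # DataFrame.subtract keeps the distinct rows of alerts absent from previous.
    new_rows = alerts.subtract(previous)

    # limit(1) keeps the emptiness check from counting every new row.
    if new_rows.limit(1).count() > 0:
        output.write_dataframe(alerts)
        return True

    # ASSUMPTION: abort() is the method shown in the docs' abort example.
    output.abort()
    return False


# Hypothetical wiring inside a Foundry repository (runs only on Foundry):
#
# from transforms.api import transform, incremental, Input, Output
#
# @incremental(snapshot_inputs=["violations"])
# @transform(
#     output=Output("/Spring/Snowplow/data/derived/output_dataset"),
#     violations=Input("xxx"),
# )
# def compute(violations, output):
#     # filter_data as in the question, minus its unused output parameter.
#     alerts = filter_data(violations.dataframe())
#     # ASSUMPTION: "previous" reads the output as of its last transaction.
#     previous = output.dataframe("previous", alerts.schema)
#     write_or_abort(alerts, previous, output)
```

    Keeping the decision in a small helper means the abort-or-write logic can be checked locally with stand-in objects, while the decorated compute function stays a thin wrapper.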

    Lastly, filter_data never uses its output argument, so that parameter can be dropped. Also, the code you shared passes in schema_violations, which isn't defined anywhere; based on your snippet it should probably be just violations.
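
    For illustration, filter_data without the unused parameter might look like the sketch below. One deliberate change is an assumption on my part: the filter is written as a SQL expression string, which DataFrame.filter also accepts, instead of F.col("failure_date") == F.current_date(), so the snippet needs no imports. COLUMNS_TO_KEEP is a hypothetical constant name for the list from the question.

```python
# Columns copied from the question's columns_to_keep list.
COLUMNS_TO_KEEP = ["id", "value_1", "value_2", "date", "timestamp"]


def filter_data(violations):
    """Keep today's violations and only the columns the output needs."""
    # SQL-expression form of F.col("failure_date") == F.current_date().
    todays_violations = violations.filter("failure_date = current_date()")
    return todays_violations.select(*COLUMNS_TO_KEEP)
```

    The caller then passes a single DataFrame, for example filter_data(violations).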