Tags: apache-spark-sql, palantir-foundry, foundry-code-repositories

Pipeline generation - passing in simple data structures like lists/arrays


For a code repository project in Palantir Foundry, I am struggling with re-using some of my transformation logic.

It seems almost trivial, but is there a way to send an Input to a Transform that is not a dataset/dataframe reference?

In my case I want to pass in strings or lists/arrays.

This is my code:

from pyspark.sql import functions as F
from transforms.api import Transform, Input, Output


def my_computation(result, customFilter, scope, my_categories, my_mappings):
    scope_df = scope.dataframe()
    my_categories_df = my_categories.dataframe()
    my_mappings_df = my_mappings.dataframe()
 
    filtered_cat_df = (
        my_categories_df
        .filter(F.col('CAT_NAME').isin(customFilter))
    )
 
    # ... more logic
 
 
def generateTransforms(config):
    transforms = []
 
    for key, value in config.items():
        o = {}
        for outKey, outValue in value['outputs'].items():
            o[outKey] = Output(outValue)
 
        i = {}
        for inpKey, inpValue in value['inputs'].items():
            i[inpKey] = Input(inpValue)
 
        i['customFilter'] = Input(value['my_custom_filter'])
 
        transforms.append(Transform(my_computation, inputs=i, outputs=o))
 
    return transforms
 
 
config = {
    "transform_one": {
        "my_custom_filter": {
            "foo",
            "bar"
        },
        "inputs": {
            "scope": "/my-project/input/scope",
            "my_categories": "/my-project/input/my_categories",
            "my_mappings": "/my-project/input/my_mappings"
        },
        "outputs": {
            "result": "/my-project/output/result"
        }
    }
}
 
TRANSFORMS = generateTransforms(config)

The concrete question is: how can I send in the values from my_custom_filter into customFilter in the transformation function my_computation?

If I run the code above, I get the error "TypeError: unhashable type: 'set'".
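The message itself is standard Python: a `set` is mutable and therefore unhashable, so any code that tries to hash one fails with this error. A minimal reproduction with no Foundry code involved (which part of `Input` actually hashes its argument is not shown in the question, so treat that link as an assumption):

```python
# A set literal, like the value of "my_custom_filter" in the config above
custom_filter = {"foo", "bar"}

error_message = None
try:
    hash(custom_filter)  # sets are mutable, so Python refuses to hash them
except TypeError as exc:
    error_message = str(exc)

print(error_message)  # the message names the unhashable type 'set'

# An immutable frozenset is hashable; equal contents give equal hashes
frozen = frozenset(custom_filter)
print(hash(frozen) == hash(frozenset(["bar", "foo"])))  # True
```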


Solution

  • This looks like a Python issue. Could you point out which line is causing the error?

    Reading through your code, I would guess it's this line:

    i['customFilter'] = Input(value['my_custom_filter'])
    

    The Python logic is wrong: `Input` expects a dataset reference (a path or RID string), not a collection of values. If we unpack your code, you're effectively making this call:

    i['customFilter'] = Input({"foo", "bar"})
    

    Edit, to answer the comment on how to create a Python transform that captures a variable in a closure:

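    from pyspark.sql import functions as F
    from transforms.api import transform

    def create_transform(inputs, outputs, my_other_var):
        # inputs/outputs map compute's parameter names to Input(...)/Output(...),
        # e.g. {"input_foo": Input("/path/foo"), "input_bar": Input("/path/bar")}
        @transform(**inputs, **outputs)
        def compute(input_foo, input_bar, output_foobar):
            df = input_foo.dataframe()
            # my_other_var is captured by the closure, not passed as an Input
            df = df.withColumn("mycol", F.lit(my_other_var))
            output_foobar.write_dataframe(df)

        return compute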
    

    Now you can call it like this, passing the extra value as the third argument:

    transforms.append(create_transform(inputs, outputs, "foobar"))
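Applied to the question's config, the idea is to hand `value['my_custom_filter']` to a factory function such as `create_transform` instead of wrapping it in `Input`. Below is a Foundry-free sketch of that mechanism under stated assumptions: plain lists of dicts stand in for DataFrames, and `make_compute` is a hypothetical stand-in for `create_transform`. It also shows why the factory matters: a function defined directly inside the loop would capture the loop variable itself, not its value at that iteration.

```python
def make_compute(custom_filter):
    """Hypothetical stand-in for create_transform: returns a function
    that remembers custom_filter through a closure."""
    allowed = frozenset(custom_filter)

    def compute(rows):
        # rows stands in for a DataFrame; keep rows whose CAT_NAME is allowed
        return [row for row in rows if row["CAT_NAME"] in allowed]

    return compute


config = {
    "transform_one": {"my_custom_filter": ["foo", "bar"]},
    "transform_two": {"my_custom_filter": ["baz"]},
}

# One compute function per config entry, each with its own filter
computes = {key: make_compute(value["my_custom_filter"]) for key, value in config.items()}

rows = [{"CAT_NAME": "foo"}, {"CAT_NAME": "baz"}, {"CAT_NAME": "qux"}]
print(computes["transform_one"](rows))  # [{'CAT_NAME': 'foo'}]
print(computes["transform_two"](rows))  # [{'CAT_NAME': 'baz'}]

# Pitfall: a closure defined directly in the loop looks up `value` when it
# runs, so every function sees the filter from the last iteration
late_bound = {}
for key, value in config.items():
    late_bound[key] = lambda rows: [r for r in rows if r["CAT_NAME"] in value["my_custom_filter"]]

print(late_bound["transform_one"](rows))  # [{'CAT_NAME': 'baz'}]
```

Because Foundry only runs the compute function later, at build time, the same late-binding behavior would apply if the decorated function were defined inline in the `for` loop, which is one reason to keep it inside a factory.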