amazon-web-services aws-glue

Transform - Custom code: works with 2 filters, but how do I handle 3+ filters?


In AWS Glue Studio, I created a "Transform - Custom code" node to filter records based on (eventually) many columns. It should keep only the records in which at least one column fails validation, so the output can be reviewed to find the invalid values.

The following code works, but it filters on only two columns:

def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
    df = dfc.select(list(dfc.keys())[0]).toDF()
    from pyspark.sql import functions as sf

    # Rule 1: rows whose Cardholder_ID is below 3911539932589, flagged as invalid
    dfFirstRule = df.filter((df['Cardholder_ID'] < 3911539932589))
    dfFirstRule = dfFirstRule.withColumn('invalid_cardholder_id', sf.lit('true'))

    # Rule 2: rows whose Patient_Effective_Date is below 20230101, flagged as invalid
    dfSecondRule = df.filter((df['Patient_Effective_Date'] < 20230101))
    dfSecondRule = dfSecondRule.withColumn('invalid_patient_effective_date', sf.lit('true'))

    # Stack both results; each side's missing flag column is filled with null
    outputDf = dfFirstRule.unionByName(dfSecondRule, allowMissingColumns=True)

    output = DynamicFrame.fromDF(outputDf, glueContext, "output")
    return (DynamicFrameCollection({"out0": output}, glueContext))
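To make the behavior of this two-rule version concrete, here is a small plain-Python model of it. Lists of dicts stand in for DataFrames, and `filter_and_flag` and `union_by_name` are hypothetical helpers that mimic `filter` + `withColumn` and `unionByName(..., allowMissingColumns=True)`; they are not Glue or Spark APIs, and the sample rows are made up.

```python
# Plain-Python model of the filter-then-union approach (not Spark code)
rows = [
    {'Cardholder_ID': 1, 'Patient_Effective_Date': 20220101},              # fails both rules
    {'Cardholder_ID': 1, 'Patient_Effective_Date': 20240101},              # fails rule 1 only
    {'Cardholder_ID': 3911539932590, 'Patient_Effective_Date': 20240101},  # passes both rules
]

def filter_and_flag(rows, column, limit, flag):
    """Mimics df.filter(df[column] < limit).withColumn(flag, sf.lit('true'))."""
    return [dict(row, **{flag: 'true'}) for row in rows if row[column] < limit]

def union_by_name(*frames):
    """Mimics unionByName(..., allowMissingColumns=True): missing columns become None."""
    columns = []
    for frame in frames:
        for row in frame:
            for key in row:
                if key not in columns:
                    columns.append(key)
    return [{c: row.get(c) for c in columns} for frame in frames for row in frame]

first = filter_and_flag(rows, 'Cardholder_ID', 3911539932589, 'invalid_cardholder_id')
second = filter_and_flag(rows, 'Patient_Effective_Date', 20230101, 'invalid_patient_effective_date')
result = union_by_name(first, second)

for row in result:
    print(row)
```

In this model, a record that fails both rules comes out twice, once per rule, with only one flag set on each copy. In Spark, each branch of the union also typically reads the input again (unless it is cached), so both effects grow with the number of rules.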

I want to add a third rule now, and many more later. How can I restructure this so it handles any number of rules more elegantly and efficiently?


Solution

  • Got it working by defining the rules as a list and looping over them:

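    def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
        # DynamicFrame and DynamicFrameCollection come from the script Glue Studio generates
        import operator
        from pyspark.sql import functions as sf

        # Look up each rule's 'operator' string, so rules are not limited to '<'
        ops = {'<': operator.lt, '<=': operator.le, '>': operator.gt,
               '>=': operator.ge, '==': operator.eq, '!=': operator.ne}

        # Each rule describes the condition that marks a row as INVALID
        rules = [
            {'column': 'Cardholder_ID', 'operator': '<', 'value': 3911539932589, 'errors': 'invalid_cardholder_id'},
            {'column': 'Patient_Effective_Date', 'operator': '<', 'value': 20230101, 'errors': 'invalid_patient_effective_date'},
            {'column': 'Patient_Term_Reason', 'operator': '<', 'value': 1, 'errors': 'invalid_patient_term_reason'},
            {'column': 'Patient_Term_Date', 'operator': '<', 'value': 20230101, 'errors': 'invalid_patient_term_date'},
            # more rules here
            ]
        df = dfc.select(list(dfc.keys())[0]).toDF()
        outputDf = None
        for rule in rules:
            # Keep the rows that fail this rule and flag them in the rule's error column
            condition = ops[rule['operator']](df[rule['column']], rule['value'])
            filteredDf = df.filter(condition).withColumn(rule['errors'], sf.lit('true'))
            if outputDf is None:
                outputDf = filteredDf
            else:
                # allowMissingColumns (Spark 3.1+) sets the other rules' flag columns to null
                outputDf = outputDf.unionByName(filteredDf, allowMissingColumns=True)
        output = DynamicFrame.fromDF(outputDf, glueContext, "output")
        return (DynamicFrameCollection({"out0": output}, glueContext))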
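This loop still filters the input once per rule and unions the results, so a record that fails several rules appears as several rows, each with a single flag. If one output row per invalid record is what you want (an assumption), a possible single-pass alternative adds one flag column per rule with `sf.when` and then keeps the rows where any rule fired. The helper names `rule_condition`, `any_violation` and `flag_invalid_rows` are hypothetical, and the rule dicts use the same keys as the answer's `rules` list.

```python
import operator
from functools import reduce

# Maps a rule's 'operator' string to a comparison function
OPS = {'<': operator.lt, '<=': operator.le, '>': operator.gt,
       '>=': operator.ge, '==': operator.eq, '!=': operator.ne}

def rule_condition(value, rule):
    """True where `value` violates `rule`.

    Works on plain Python values and on PySpark Columns, because Column
    overloads the comparison operators and returns a Column expression.
    """
    return OPS[rule['operator']](value, rule['value'])

def any_violation(conditions):
    """OR the per-rule conditions together ('|' works for bools and Columns)."""
    return reduce(operator.or_, conditions)

def flag_invalid_rows(df, rules):
    """Single pass over a PySpark DataFrame: one flag column per rule,
    then keep only the rows that fail at least one rule."""
    if not rules:
        raise ValueError("at least one rule is required")
    # Imported here so the helpers above can be used without Spark installed
    from pyspark.sql import functions as sf
    conditions = []
    for rule in rules:
        cond = rule_condition(sf.col(rule['column']), rule)
        conditions.append(cond)
        # 'true' where the rule fails, null otherwise (same flags as the union version)
        df = df.withColumn(rule['errors'], sf.when(cond, sf.lit('true')))
    return df.filter(any_violation(conditions))
```

Inside `MyTransform`, the loop and union could then be replaced with `outputDf = flag_invalid_rows(df, rules)`, for example by nesting the helpers inside the function. As with the original filters, a comparison against a null column value evaluates to null, so nulls are not flagged by that rule. Because `rule_condition` and `any_violation` only rely on ordinary operators, the rule table can also be checked against plain Python values without starting Spark.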