Search code examples
azure-data-factoryazure-data-lakeazure-synapse

how to join synapse data lake tables with condition defined in column


I'm having trouble figuring out how to do this join. I have two synapse data lake tables, e.g.:

EquipmentReading

equipment_id,time_utc,temperature
6,2022-05-20T02:16,70
6,2022-05-20T02:17,80
6,2022-05-20T02:18,90
AlertDefinition

id,condition,value,description
1,>=,90,the temperature is too high
2,<=,70,the temperature is too low

that I want to join to create a third data lake table, e.g.:

Incident

alert_id,equipment_id,time_utc
2,6,2022-05-20T02:16
1,6,2022-05-20T02:18

The join needs to be conditional in the sense that the 'temperature' column in the EquipmentReading table should be compared using the 'condition' and 'value' fields in the AlertDefinition table to create the rows in the Incidents table. I'm looking at tutorials like data flow joins, but really unsure how to get off the ground.

I suppose I could do the join in pyspark code like this:

alert_definition_sdf = spark.sql("SELECT * FROM db1.AlertDefinition ORDER BY id ASC")
equipment_reading_sdf = spark.sql("SELECT * FROM db1.EquipmentReading ORDER BY time_utc ASC")

readings = equipment_reading_sdf.select('equipment_id', 'temperature', 'time_utc').collect()
alert_definitions = alert_definition_sdf.select('id', 'condition', 'value').collect()

incidents = []
for reading in readings:
    for alert_definition in alert_definitions:
        eval_string = str(reading['temperature']) + alert_definition['condition'] + str(alert_definition['value'])
        if (eval(eval_string)):
            alert = {
                "equipment_id": reading['equipment_id'],
                "alert_id": alert_definition['id'],
                "time_utc": reading['time_utc']   
            }
            incidents.append(alert)

incident_table_sdf = spark.createDataFrame(incidents)
incident_table_sdf.write.format('csv').option('header',True).mode('overwrite').saveAsTable("db1.Incident")

but that seems less that ideal, especially since it uses the 'collect' method which is only intended for small datasets. I'm trying to get this to work as a data flow join.


Solution

  • Neither using collect() nor using nested loops is ideal when dealing with large amounts of data. Since the values of conditional operator is a column in the AlertDefinition table, there isn’t another option but to use collect to get the conditional operator and the value. There is a better way to achieve this instead of using multiple collect() and nested loops.

    Create temporary view for both dataframes using createOrReplaceTempView(). Get the values of 'condition' and 'value' columns using collect(). Using for loop, create an SQL query. Use spark.sql() to execute this query to get the desired results. Try using the following code.

    equipment_reading_sdf.createOrReplaceTempView('reading')
    alert_definition_sdf.createOrReplaceTempView('alert')
    
    list_of_conditions = alert_definition_sdf.select('condition', 'value').collect()
    query = "select a.id,r.equipment_id,r.time_utc from reading r, alert a where "
    for i in list_of_conditions:
        condition = "(a.condition='"+ i['condition'] +"' and r.temperature"+ i['condition']+str(i['value']) +")"
        query = query + condition +" or "
    
    solution_df = spark.sql(query[:-3])  #slicing to remove an extra 'or'
    

    Using dataflow joins might not possible because the condition you want to apply is present as 'column values' in AlertDefiniton table (not as columns) and each row from this table is representing each condition. You need to build a query after extracting the values of these columns (value and condition column) as they are necessary to apply the condition and the method specified above might be a better way to do it.

    Output image: https://i.sstatic.net/4jnli.png

    Another Approach:

    We are using collect() to get the values of condition and value columns from AlertDefinition table. But in this scenario, no matter how many records are present in this table, it is only possible that there can only be 6 distinct conditional operator values in condition column (>=, <=, =, <, >, <>).

    So, we can write an SQL query directly including all the possible conditional operators manually irrespective of the values number of records in AlertDefinition. This way there is no need to use collect(). The following approach might work for the sample table data given (add all the remaining 4 conditional operators for larger table data).

    equipment_reading_sdf.createOrReplaceTempView('reading')
    alert_definition_sdf.createOrReplaceTempView('alert')
    
    solution_df = spark.sql("""select a.id,r.equipment_id,r.time_utc from reading r, alert a where
        (a.condition = '>=' and r.temperature>=a.value) or
        (a.condition = '<=' and r.temperature<=a.value)
    """)