
Unexpected Behavior using WHEN | OTHERWISE


We have developed a streaming process that uses many other Delta tables to enrich the final data product.

Let's call the Delta table where the data is inserted FinalDataProduct; semiLayout is a DataFrame containing all the tables used to enrich the final table, and it is merged into FinalDataProduct; stagingTable is a Delta table that is joined in and provides the SHIFTS (the column is named 'SHIFTS' and holds the values we need, which can be many).

This stagingTable is joined to the semiLayout DataFrame on one common key (for example, semiLayout has a key with value = 21451, so stagingTable has the same value), using a left join.

There are times when stagingTable does not have a value that can be joined to the semiLayout DataFrame, which ends up leaving the column 'SHIFTS' empty or null.

After the join, in the final operations before the merge, we apply a withColumn with the following logic: .withColumn("SHIFTS", when(col("SHIFTS").isNull(), lit(1)).otherwise(col("SHIFTS"))).

In summary, we ensure that the column SHIFTS cannot be a NULL value.

However, we have had a few cases where it is a NULL value.

What I have tried so far:

  • Validated the business logic and the code, but everything is OK; we never replace the value or do anything else with that column, just the join and the withColumn.

  • The values are INTEGER and there is no data type mismatch.

  • I have tried running the code in batch, and the values are filled without problem.

  • I have tested the same data source in real time against a different Delta table (basically a clone of the original target Delta table FinalDataProduct) with the same information, using the same code in the same environment, and the values were filled.

  • I have validated the versions to see if they were updated, but they weren't.

  • I have validated that the staging table is not the issue.

It has only happened for a few records; for example, out of 1 million records, 60 do not have the value, but if I re-run the code in batch, or if I delete the checkpoints on the clone table, the value gets filled.

Does anyone have experience with a behavior like that? Could it be that the condition .withColumn("SHIFTS", when(col("SHIFTS").isNull(), lit(1).otherwise(col("SHIFTS"))) is not detecting the null type and is interpreting it as something else? If so, why does it only happen in the streaming run?

If anyone has experienced something similar, I would appreciate your help or suggestions. Even if you have not run into the same situation but have an idea or approach, feel free to share it.


Solution

  • Apparently, the code you submitted is not exact.
    You wrote this:
    .withColumn("SHIFTS", when(col("SHIFTS").isNull(), lit(1).otherwise(col("SHIFTS")))
    Note that the .otherwise is attached to lit(1) inside the when(...), instead of to the result of when(...). It should probably be this:
    .withColumn("SHIFTS", when(col("SHIFTS").isNull(), lit(1)).otherwise(col("SHIFTS")))

    You can achieve the same with: withColumn("SHIFTS", coalesce(col("SHIFTS"), lit(1)))