Search code examples
t-sqlpysparkapache-spark-sqllaglead

Implementation of lag function by updating the same column


I've to update the lag value of barcode (offset=1) to barcode

case 
  when ( lag(barcode,1) over (order by barcode ) 
        and  Datediff(SS, eventdate,lag(next_eventdate,1) over (order by barcode)) < 3*3600 ) 
  THEN 1 
  ELSE 0 
END as FLAG 

I've implemented it on pyspark, but is giving me an error

from pyspark.sql.functions import col, unix_timestamp
timeDiff = unix_timestamp('eventdate', format="ss")- unix_timestamp(F.lag('next_eventdate', 1), format="ss")
ww= Window.orderBy("barcode") 
Tgt_df_tos = Tgt_df_7.withColumn('FLAG',F.when((F.lag('barcode', 1)) & ( timeDiff <= 10800),"1").otherwise('0'))   

Error I'm getting

AnalysisException: "cannot resolve '(lag(`barcode`, 1, NULL) AND ((unix_timestamp(`eventdate`, 'ss') - unix_timestamp(lag(`next_eventdate`, 1, NULL), 'ss')) <= CAST(10800 AS BIGINT)))' due to data type mismatch: differing types in '(lag(`barcode`, 1, NULL) AND ((unix_timestamp(`eventdate`, 'ss') - unix_timestamp(lag(`next_eventdate`, 1, NULL), 'ss')) <= CAST(10800 AS BIGINT)))' (int and boolean).

Solution

  • I'm not familiar with pyspark but it seems to me that the problem is in the CASE statement.

    CASE WHEN (
            LAG(barcode,1) OVER (ORDER BY barcode ) 
        AND
            DATEDIFF(SS, eventdate, LAG(next_eventdate, 1) OVER(ORDER BY barcode)) < 3*3600
    )
    

    There are two expressions: "LAG(barcode,1) OVER (ORDER BY barcode )" which evaluates to an INTEGER.

    "DATEDIFF(SS, eventdate, LAG(next_eventdate, 1) OVER(ORDER BY barcode)) < 3*3600" which evaluates to a boolean (because of the inequality).

    These expressions are combined with the AND operator typically used to combine two Boolean expressions. I believe that this is cause of the error.

    LAG(barcode,1) OVER (ORDER BY barcode ) evaluates to an INTEGER and not a Boolean.

    So the expression looks something like:

    CASE WHEN (324857 AND True) THEN 1 ELSE 0 END as FLAG
    
    AnalysisException: "cannot resolve .... (int and boolean).