Tags: scala, apache-spark, apache-spark-sql, emr

Spark dataframe transform in time window


I have two dataframes. [AllAccounts] contains audit rows for all accounts of all users:

UserId, AccountId, Balance, CreatedOn
1, acc1, 200.01, 2016-12-06T17:09:36.123-05:00
1, acc2, 189.00, 2016-12-06T17:09:38.123-05:00  
1, acc1, 700.01, 2016-12-07T17:09:36.123-05:00
1, acc2, 189.00, 2016-12-07T17:09:38.123-05:00
1, acc3, 010.01, 2016-12-07T17:09:39.123-05:00
1, acc1, 900.01, 2016-12-08T17:09:36.123-05:00

[ActiveAccounts] contains audit rows for only the active account (zero or one per user):

UserId, AccountId, CreatedOn
1, acc2, 2016-12-06T17:09:38.123-05:00
1, acc3, 2016-12-07T17:09:39.123-05:00

I want to transform these into a single dataframe of the format:

UserId, AccountId, Balance, CreatedOn, IsActive
1, acc1, 200.01, 2016-12-06T17:09:36.123-05:00, false
1, acc2, 189.00, 2016-12-06T17:09:38.123-05:00, true 
1, acc1, 700.01, 2016-12-07T17:09:36.123-05:00, false
1, acc2, 189.00, 2016-12-07T17:09:38.123-05:00, true
1, acc3, 010.01, 2016-12-07T17:09:39.123-05:00, true
1, acc1, 900.01, 2016-12-08T17:09:36.123-05:00, false

So based on the accounts in ActiveAccounts, I need to flag the rows in the first dataframe appropriately. In the example, acc2 for UserId 1 was marked active on 2016-12-06T17:09:38.123-05:00 and acc3 was marked active on 2016-12-07T17:09:39.123-05:00. So between those two timestamps acc2 is flagged true, and from 2016-12-07T17:09:39 onwards acc3 is flagged true.

What would be an efficient way to do this?


Solution

  • If I understand properly, the account (1, acc2) is active between its creation time and that of (1, acc3).

    We can do this in a few steps:

    • create a data frame with the start/end times for each account
    • join with AllAccounts
    • flag the rows of the resulting dataframe

    I haven't tested this, so there may be syntax mistakes.

    To accomplish the first task, we need to partition the dataframe by user and then look at the next creation time. This calls for a window function:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.lead

    val window = Window.partitionBy("UserId").orderBy("StartTime")
    val activeTimes = ActiveAccounts.withColumnRenamed("CreatedOn", "StartTime")
      .withColumn("EndTime", lead("StartTime", 1) over window)
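
    To make the `lead` step concrete, here is a plain-Scala sketch (no Spark session; the case class and value names are made up for illustration) of the same next-start pairing applied to the sample ActiveAccounts rows for user 1:

    ```scala
    // Each active-account row gets an EndTime equal to the next row's
    // StartTime (within the same user, ordered by StartTime);
    // the last row gets None, i.e. a null EndTime in the Spark version.
    case class ActiveRow(userId: Int, accountId: String, startTime: String)

    val actives = List(
      ActiveRow(1, "acc2", "2016-12-06T17:09:38.123-05:00"),
      ActiveRow(1, "acc3", "2016-12-07T17:09:39.123-05:00"))

    // Emulate lead("StartTime", 1) over (partitionBy UserId, orderBy StartTime)
    val sorted    = actives.sortBy(_.startTime)
    val endTimes  = sorted.drop(1).map(r => Some(r.startTime)) :+ None
    val intervals = sorted.zip(endTimes) // (row, Option[endTime]) pairs
    ```
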
    

    Note that the last EndTime for each user will be null. Now join:

    val withActive = AllAccounts.join(activeTimes, Seq("UserId", "AccountId"), "left")
    

    (This should be a left join when some accounts, like acc1 here, have no active times; their StartTime and EndTime will then be null.)

    Then you have to go through and flag the accounts as active:

    import org.apache.spark.sql.functions.{coalesce, lit}

    val withFlags = withActive.withColumn("IsActive",
      coalesce(
        $"CreatedOn" >= $"StartTime" &&
          ($"EndTime".isNull || $"CreatedOn" < $"EndTime"),
        lit(false)))  // rows with no active interval compare against null; coalesce flags them false
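
    As a cross-check of the whole pipeline, here is a plain-Scala sketch (hypothetical names, no Spark) that applies the same interval test to the sample AllAccounts rows and reproduces the expected IsActive column. ISO-8601 timestamps with a fixed, identical offset compare correctly as strings, which keeps the sketch short:

    ```scala
    case class Audit(userId: Int, accountId: String, balance: String, createdOn: String)

    val allAccounts = List(
      Audit(1, "acc1", "200.01", "2016-12-06T17:09:36.123-05:00"),
      Audit(1, "acc2", "189.00", "2016-12-06T17:09:38.123-05:00"),
      Audit(1, "acc1", "700.01", "2016-12-07T17:09:36.123-05:00"),
      Audit(1, "acc2", "189.00", "2016-12-07T17:09:38.123-05:00"),
      Audit(1, "acc3", "010.01", "2016-12-07T17:09:39.123-05:00"),
      Audit(1, "acc1", "900.01", "2016-12-08T17:09:36.123-05:00"))

    // (userId, accountId) -> (startTime, Option[endTime]), as built by the
    // window step above; None plays the role of the null last EndTime.
    val activeIntervals = Map(
      (1, "acc2") -> ("2016-12-06T17:09:38.123-05:00", Some("2016-12-07T17:09:39.123-05:00")),
      (1, "acc3") -> ("2016-12-07T17:09:39.123-05:00", None))

    // A row is active iff its account has an interval containing CreatedOn;
    // accounts absent from ActiveAccounts (acc1) are simply false.
    val flagged = allAccounts.map { a =>
      val isActive = activeIntervals.get((a.userId, a.accountId)).exists {
        case (start, end) => a.createdOn >= start && end.forall(a.createdOn < _)
      }
      (a.accountId, a.createdOn, isActive)
    }
    ```
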