python pyspark filtering

Advanced filtering in PySpark


I'm currently running some calculations on a large database that records how various borrowers repay their loans. On the technical side, I'm using PySpark and have run into a problem with more advanced filtering operations.

For example, my dataframe looks like this:

Name    ID     ContractDate LoanSum Status
Boris   ID3    2022-10-10   10      Closed 
Boris   ID3    2022-10-15   10      Active
Boris   ID3    2022-11-22   15      Active
John    ID1    2022-11-05   30      Active
Martin  ID6    2022-12-10   40      Closed
Martin  ID6    2022-12-12   40      Active
Martin  ID6    2022-07-11   40      Active

I have to create a dataframe that contains all loans issued by an organization to specific borrowers (grouped by ID) where the number of days between two loans (assigned to the same ID) is no more than 5 and the LoanSum is the same.

In other words, I have to obtain the following table:

Name    ID     ContractDate LoanSum Status
Boris   ID3    2022-10-10   10      Closed 
Boris   ID3    2022-10-15   10      Active
Martin  ID6    2022-12-10   40      Closed
Martin  ID6    2022-12-12   40      Active

What should I do in order to run this filtering?

Thank you in advance


Solution

  • Code

    from pyspark.sql import Window
    from pyspark.sql import functions as F
    
    # Create a window spec: one partition per borrower and loan sum, ordered by date
    w = Window.partitionBy('Name', 'ID', 'LoanSum').orderBy('ContractDate')
    
    # Calculate backward (x) and forward (y) differences in days:
    # x compares each row with the previous loan, y with the next one
    x = F.datediff('ContractDate', F.lag('ContractDate').over(w))
    y = F.datediff('ContractDate', F.lead('ContractDate').over(w))
    
    # Boolean condition to filter the rows where the number 
    # of days between two loans is less than or equal to 5 days
    mask = (F.abs(x) <= 5) | (F.abs(y) <= 5)
    
    # Make sure to convert ContractDate to date type
    result = df.withColumn('ContractDate', F.to_date('ContractDate'))
    
    # Filter the rows using the boolean mask
    result = result.select('*', mask.alias('mask')).filter('mask').drop('mask')
    

    Result

    +------+---+------------+-------+------+
    |  Name| ID|ContractDate|LoanSum|Status|
    +------+---+------------+-------+------+
    | Boris|ID3|  2022-10-10|     10|Closed|
    | Boris|ID3|  2022-10-15|     10|Active|
    |Martin|ID6|  2022-12-10|     40|Closed|
    |Martin|ID6|  2022-12-12|     40|Active|
    +------+---+------------+-------+------+
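
To sanity-check the window logic without a Spark session, the same rule can be sketched in plain Python. This is only an illustrative cross-check, not part of the answer above: the function name `close_loans` and its `max_gap_days` parameter are made up for this sketch, and the rows are copied by hand from the question. Sorting by the group key and then by date plays the role of `partitionBy(...).orderBy(...)`, and looking at the previous and next row in each group mirrors `lag` and `lead`.

```python
from datetime import date
from itertools import groupby

# Sample rows copied from the question:
# (Name, ID, ContractDate, LoanSum, Status)
rows = [
    ("Boris", "ID3", "2022-10-10", 10, "Closed"),
    ("Boris", "ID3", "2022-10-15", 10, "Active"),
    ("Boris", "ID3", "2022-11-22", 15, "Active"),
    ("John", "ID1", "2022-11-05", 30, "Active"),
    ("Martin", "ID6", "2022-12-10", 40, "Closed"),
    ("Martin", "ID6", "2022-12-12", 40, "Active"),
    ("Martin", "ID6", "2022-07-11", 40, "Active"),
]


def close_loans(rows, max_gap_days=5):
    """Keep rows whose previous or next loan in the same
    (Name, ID, LoanSum) group is at most max_gap_days away."""

    def group_key(row):
        return (row[0], row[1], row[3])

    def issued(row):
        return date.fromisoformat(row[2])

    # Sort by group key, then by date (the partitionBy/orderBy step)
    ordered = sorted(rows, key=lambda r: (group_key(r), issued(r)))

    kept = []
    for _, group in groupby(ordered, key=group_key):
        group = list(group)
        for i, row in enumerate(group):
            # Gap to the previous loan in the group (the lag side)
            near_prev = i > 0 and (
                (issued(row) - issued(group[i - 1])).days <= max_gap_days
            )
            # Gap to the next loan in the group (the lead side)
            near_next = i + 1 < len(group) and (
                (issued(group[i + 1]) - issued(row)).days <= max_gap_days
            )
            if near_prev or near_next:
                kept.append(row)
    return kept


result = close_loans(rows)
for row in result:
    print(row)
```

Both directions are checked because a backward-only comparison would keep the second loan of each close pair but drop the first one, which has no earlier neighbour in its group.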