Tags: pyspark, filter, data-manipulation

Advanced Filtering Operations in PySpark


I'm currently doing calculations in PySpark on a DataFrame that contains information on how borrowers repay their loans.

I'm new to PySpark and decided to ask for help with a complex filtering operation.

My dataframe is:

ID     ContractDate Loansum Debt MaturityDate Bank
ID1    2024-06-01   100     10   2024-06-18   A       
ID1    2024-06-05   50      20   2024-06-17   B       
ID1    2024-06-10   50      20   2024-06-15   C
ID1    2024-06-10   90      70   2024-06-30   D       
ID1    2024-06-15   50      50                R
ID1    2024-07-15   100     90                D

ID2    2024-08-01   70      20   2024-09-01   A       
ID2    2024-08-08   50      10   2024-08-26   B            
ID2    2024-08-20   32      32                R

ID3    2024-09-01   60      10   2024-09-24   A       
ID3    2024-09-03   40      10   2024-09-23   B            
ID3    2024-09-22   22      22                R

ID4    2024-10-03   40      10   2024-10-23   B

ID5    2024-11-01   70      20   2024-11-21   A       
ID5    2024-11-08   50      10   2024-11-22   B            
ID5    2024-11-20   40      40                R

My goal is to filter rows based on multiple criteria.

For instance, for each unique ID, I want to keep the rows only if:

  • a loan issued before the bank "R" loan is kept only if it is repaid (MaturityDate) no more than 5 days after the bank "R" loan is issued (ContractDate), and
  • the Loansum of the bank "R" loan is equal to, or no more than 10% higher than, the sum of the debt (Debt) of the preceding loans that satisfy the first condition.

Expected result:

ID     ContractDate Loansum Debt MaturityDate Bank Marker
ID1    2024-06-01   100     10   2024-06-18   A    Previous   
ID1    2024-06-05   50      20   2024-06-17   B    Previous   
ID1    2024-06-10   50      20   2024-06-15   C    Previous
ID1    2024-06-15   50      50                R    Last

ID3    2024-09-01   60      10   2024-09-24   A    Previous      
ID3    2024-09-03   40      10   2024-09-23   B    Previous           
ID3    2024-09-22   22      22                R    Last
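
(To spell out the logic on the sample data: for ID1 the "R" loan is issued on 2024-06-15; loans A, B and C mature within 5 days of that date, while loan D matures 15 days later and is dropped; 1.1 × (10 + 20 + 20) = 55 ≥ 50, the "R" Loansum, so ID1 qualifies. ID5 fails the second condition because 1.1 × (20 + 10) = 33 < 40; ID2 fails the first condition because none of its earlier loans matures within 5 days of 2024-08-20; ID4 has no "R" loan at all.)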

Any help is highly appreciated!


Solution

  • You can do this with window operations, but for this particular problem I feel that joins are more intuitive (and we can take advantage of the fact that R occurs only once per ID); a window-based sketch is included at the end of this answer.
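
    Throughout, F refers to pyspark.sql.functions, and sparkDF holds the sample data. For anyone who wants to run the snippets, here is a minimal setup sketch (parsing the dates into DateType is my choice; your actual schema may differ):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    rows = [
        ('ID1', '2024-06-01', 100, 10, '2024-06-18', 'A'),
        ('ID1', '2024-06-05', 50, 20, '2024-06-17', 'B'),
        ('ID1', '2024-06-10', 50, 20, '2024-06-15', 'C'),
        ('ID1', '2024-06-10', 90, 70, '2024-06-30', 'D'),
        ('ID1', '2024-06-15', 50, 50, None, 'R'),
        ('ID1', '2024-07-15', 100, 90, None, 'D'),
        ('ID2', '2024-08-01', 70, 20, '2024-09-01', 'A'),
        ('ID2', '2024-08-08', 50, 10, '2024-08-26', 'B'),
        ('ID2', '2024-08-20', 32, 32, None, 'R'),
        ('ID3', '2024-09-01', 60, 10, '2024-09-24', 'A'),
        ('ID3', '2024-09-03', 40, 10, '2024-09-23', 'B'),
        ('ID3', '2024-09-22', 22, 22, None, 'R'),
        ('ID4', '2024-10-03', 40, 10, '2024-10-23', 'B'),
        ('ID5', '2024-11-01', 70, 20, '2024-11-21', 'A'),
        ('ID5', '2024-11-08', 50, 10, '2024-11-22', 'B'),
        ('ID5', '2024-11-20', 40, 40, None, 'R'),
    ]
    # schema inference yields strings for the dates and longs for the amounts,
    # so convert the date columns explicitly
    sparkDF = spark.createDataFrame(
        rows, ['ID', 'ContractDate', 'Loansum', 'Debt', 'MaturityDate', 'Bank']
    ).withColumn(
        'ContractDate', F.to_date('ContractDate')
    ).withColumn(
        'MaturityDate', F.to_date('MaturityDate')
    )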

    First we get the contract date of the bank R loan for each ID, then join that back to the original DataFrame and create the Marker column:

    # contract date of the R loan per ID (since R occurs once per ID,
    # max() simply picks that single date)
    maxDateDF = sparkDF.filter(
        F.col('Bank') == 'R'
    ).groupby('ID').agg(
        F.max('ContractDate').alias('RContractDate')
    )

    sparkDF = sparkDF.join(
        maxDateDF, on=['ID']
    ).filter(
        # keep only loans issued on or before the R loan
        (F.col('ContractDate') <= F.col('RContractDate'))
    ).withColumn(
        'Marker', F.when(F.col('Bank') == 'R', 'Last').otherwise('Previous')
    )
    
    +---+------------+-------+----+------------+----+-------------+--------+
    | ID|ContractDate|Loansum|Debt|MaturityDate|Bank|RContractDate|  Marker|
    +---+------------+-------+----+------------+----+-------------+--------+
    |ID1|  2024-06-01|    100|  10|  2024-06-18|   A|   2024-06-15|Previous|
    |ID1|  2024-06-05|     50|  20|  2024-06-17|   B|   2024-06-15|Previous|
    |ID1|  2024-06-10|     50|  20|  2024-06-15|   C|   2024-06-15|Previous|
    |ID1|  2024-06-10|     90|  70|  2024-06-30|   D|   2024-06-15|Previous|
    |ID1|  2024-06-15|     50|  50|        NULL|   R|   2024-06-15|    Last|
    |ID2|  2024-08-01|     70|  20|  2024-09-01|   A|   2024-08-20|Previous|
    |ID2|  2024-08-08|     50|  10|  2024-08-26|   B|   2024-08-20|Previous|
    |ID2|  2024-08-20|     32|  32|        NULL|   R|   2024-08-20|    Last|
    |ID3|  2024-09-01|     60|  10|  2024-09-24|   A|   2024-09-22|Previous|
    |ID3|  2024-09-03|     40|  10|  2024-09-23|   B|   2024-09-22|Previous|
    |ID3|  2024-09-22|     22|  22|        NULL|   R|   2024-09-22|    Last|
    |ID5|  2024-11-01|     70|  20|  2024-11-21|   A|   2024-11-20|Previous|
    |ID5|  2024-11-08|     50|  10|  2024-11-22|   B|   2024-11-20|Previous|
    |ID5|  2024-11-20|     40|  40|        NULL|   R|   2024-11-20|    Last|
    +---+------------+-------+----+------------+----+-------------+--------+
    

    Notice that the above sparkDF doesn't satisfy conditions (1) or (2) yet, but it is a precursor, since we only want the records issued on or before each ID's R contract date. (This is also why ID4, which has no R loan, drops out of the inner join.)

    We start with condition (2) by computing, for each ID, the maximum allowed R Loansum: 1.1 times the sum of Debt over the preceding loans that satisfy condition (1):

    sparkDFcond = sparkDF.filter(
        # condition (1): repaid within 5 days of the R contract date
        # (the R rows drop out here since their MaturityDate is null)
        (F.datediff('MaturityDate', 'RContractDate') <= 5)
    ).groupby(
        'ID'
    ).agg(
        # largest R Loansum still allowed by condition (2)
        (1.10 * F.sum('Debt')).alias('MaxTotalDebt')
    )
    
    +---+-----------------+
    | ID|     MaxTotalDebt|
    +---+-----------------+
    |ID1|55.00000000000001|
    |ID3|             22.0|
    |ID5|             33.0|
    +---+-----------------+
    

    Note that ID2 has already dropped out: neither of its earlier loans matures within 5 days of its R contract date (they take 6 and 12 days), so it contributes no rows to the aggregation. By joining this back to the earlier sparkDF, we can figure out which IDs meet condition (2):

    validID = sparkDF.join(
        sparkDFcond, on=['ID']
    ).filter(
        # condition (2): the R Loansum must not exceed the allowed maximum
        (F.col('Bank') == 'R') &
        (F.col('Loansum') <= F.col('MaxTotalDebt'))
    ).select('ID')
    

    With the sample data this keeps ID1 (50 ≤ 55) and ID3 (22 ≤ 22), while ID5 drops out (40 > 33). We then make sure condition (1) is met by filtering out the records that don't qualify, and join with the valid IDs from condition (2):

    validDF = sparkDF.filter(
        # keep rows repaid within 5 days of the R contract date; the explicit
        # Bank == 'R' clause keeps the R rows themselves, whose MaturityDate
        # is null and would otherwise be filtered out
        (F.datediff('MaturityDate', 'RContractDate') <= 5) | (F.col('Bank') == 'R')
    ).join(validID, on=['ID'])
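
    To display it in the shape shown below (the explicit orderBy is my addition; Spark doesn't guarantee row order otherwise):

    validDF.orderBy('ID', 'ContractDate').show()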
    

    Final result:

    +---+------------+-------+----+------------+----+-------------+--------+
    | ID|ContractDate|Loansum|Debt|MaturityDate|Bank|RContractDate|  Marker|
    +---+------------+-------+----+------------+----+-------------+--------+
    |ID1|  2024-06-01|    100|  10|  2024-06-18|   A|   2024-06-15|Previous|
    |ID1|  2024-06-05|     50|  20|  2024-06-17|   B|   2024-06-15|Previous|
    |ID1|  2024-06-10|     50|  20|  2024-06-15|   C|   2024-06-15|Previous|
    |ID1|  2024-06-15|     50|  50|        NULL|   R|   2024-06-15|    Last|
    |ID3|  2024-09-01|     60|  10|  2024-09-24|   A|   2024-09-22|Previous|
    |ID3|  2024-09-03|     40|  10|  2024-09-23|   B|   2024-09-22|Previous|
    |ID3|  2024-09-22|     22|  22|        NULL|   R|   2024-09-22|    Last|
    +---+------------+-------+----+------------+----+-------------+--------+
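
    For completeness, here is a rough sketch of the window-based alternative for the precursor step mentioned at the start. Treat it as an untested variant of the join approach; it likewise assumes at most one R loan per ID:

    from pyspark.sql import Window

    w = Window.partitionBy('ID')

    # broadcast the R contract date to every row of the same ID;
    # F.when() without otherwise() yields null for non-R rows, and
    # F.max() ignores those nulls
    sparkDF = sparkDF.withColumn(
        'RContractDate',
        F.max(F.when(F.col('Bank') == 'R', F.col('ContractDate'))).over(w)
    ).filter(
        F.col('ContractDate') <= F.col('RContractDate')
    ).withColumn(
        'Marker', F.when(F.col('Bank') == 'R', 'Last').otherwise('Previous')
    )

    The condition checks then proceed exactly as above.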