
Performing filtering in PySpark


I'm currently performing calculations on a database that contains information on how borrowers repay their loans. The dataset is huge, so I'm using PySpark, and I've run into a problem with more advanced filtering operations.

My dataframe looks like this:

Name    ID     ContractDate LoanSum Status
A       ID1    2022-10-10   10      Closed 
A       ID1    2022-10-15   13      Active
A       ID1    2022-10-30   20      Active
B       ID2    2022-11-05   30      Active
C       ID3    2022-12-10   40      Closed
C       ID3    2022-12-12   43      Active
C       ID3    2022-12-19   46      Active
D       ID4    2022-12-10   10      Closed
D       ID4    2022-12-12   30      Active

I have to create a dataframe that contains all loans issued to specific borrowers (grouped by ID) where the number of days between two loans with the same ID is less than 15 and the difference between the loan sums issued to that borrower is less than or equal to 3.

In other words, I have to obtain the following table (expected result):

Name    ID     ContractDate LoanSum Status
A       ID1    2022-10-10   10      Closed 
A       ID1    2022-10-15   13      Active
C       ID3    2022-12-10   40      Closed
C       ID3    2022-12-12   43      Active
C       ID3    2022-12-19   46      Active

Thank you in advance


Solution

  • Use lag to compare each row with the previous row for the same ID, then use lead so that both rows of every pair that meets the criteria are kept.

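    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as f
    
    spark = SparkSession.builder.getOrCreate()
    
    # Sample rows from the question (dates as ISO strings, which is an assumption)
    data = [
        ('A', 'ID1', '2022-10-10', 10, 'Closed'),
        ('A', 'ID1', '2022-10-15', 13, 'Active'),
        ('A', 'ID1', '2022-10-30', 20, 'Active'),
        ('B', 'ID2', '2022-11-05', 30, 'Active'),
        ('C', 'ID3', '2022-12-10', 40, 'Closed'),
        ('C', 'ID3', '2022-12-12', 43, 'Active'),
        ('C', 'ID3', '2022-12-19', 46, 'Active'),
        ('D', 'ID4', '2022-12-10', 10, 'Closed'),
        ('D', 'ID4', '2022-12-12', 30, 'Active'),
    ]
    df = spark.createDataFrame(data).toDF('Name','ID','ContractDate','LoanSum','Status')
    df.show()
    
    cols = df.columns
    # One window per borrower, ordered by contract date
    w = Window.partitionBy('ID').orderBy('ContractDate')
    
    # Target: this loan is close to the previous one; OR with lead() also keeps the earlier loan of the pair
    df.withColumn('PreviousContractDate', f.lag('ContractDate').over(w)) \
      .withColumn('PreviousLoanSum', f.lag('LoanSum').over(w)) \
      .withColumn('Target', f.expr('datediff(ContractDate, PreviousContractDate) < 15 and LoanSum - PreviousLoanSum <= 3')) \
      .withColumn('Target', f.col('Target') | f.lead('Target').over(w)) \
      .filter('Target == True') \
      .select(*cols) \
      .show()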
    
    +----+---+------------+-------+------+
    |Name| ID|ContractDate|LoanSum|Status|
    +----+---+------------+-------+------+
    |   A|ID1|  2022-10-10|     10|Closed|
    |   A|ID1|  2022-10-15|     13|Active|
    |   A|ID1|  2022-10-30|     20|Active|
    |   B|ID2|  2022-11-05|     30|Active|
    |   C|ID3|  2022-12-10|     40|Closed|
    |   C|ID3|  2022-12-12|     43|Active|
    |   C|ID3|  2022-12-19|     46|Active|
    |   D|ID4|  2022-12-10|     10|Closed|
    |   D|ID4|  2022-12-12|     30|Active|
    +----+---+------------+-------+------+
    
    +----+---+------------+-------+------+
    |Name| ID|ContractDate|LoanSum|Status|
    +----+---+------------+-------+------+
    |   A|ID1|  2022-10-10|     10|Closed|
    |   A|ID1|  2022-10-15|     13|Active|
    |   C|ID3|  2022-12-10|     40|Closed|
    |   C|ID3|  2022-12-12|     43|Active|
    |   C|ID3|  2022-12-19|     46|Active|
    +----+---+------------+-------+------+
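
To sanity-check the window logic on a small sample without a Spark session, the same rule can be sketched in plain Python. This is only an illustrative reference, not part of the answer above: the helper names `is_close` and `filter_loans` are made up here, and the rows are copied from the question. A loan is kept when it is close to the previous loan of the same ID (the lag step) or when the next loan is close to it (the lead step).

```python
from datetime import date
from itertools import groupby
from operator import itemgetter

# Sample rows from the question: (Name, ID, ContractDate, LoanSum, Status)
rows = [
    ("A", "ID1", date(2022, 10, 10), 10, "Closed"),
    ("A", "ID1", date(2022, 10, 15), 13, "Active"),
    ("A", "ID1", date(2022, 10, 30), 20, "Active"),
    ("B", "ID2", date(2022, 11, 5), 30, "Active"),
    ("C", "ID3", date(2022, 12, 10), 40, "Closed"),
    ("C", "ID3", date(2022, 12, 12), 43, "Active"),
    ("C", "ID3", date(2022, 12, 19), 46, "Active"),
    ("D", "ID4", date(2022, 12, 10), 10, "Closed"),
    ("D", "ID4", date(2022, 12, 12), 30, "Active"),
]


def is_close(prev, cur, max_days=15, max_diff=3):
    """Mirror of the 'Target' expression: compare a loan with the previous one."""
    return (cur[2] - prev[2]).days < max_days and cur[3] - prev[3] <= max_diff


def filter_loans(rows):
    kept = []
    # Sort by (ID, ContractDate): the equivalent of partitionBy('ID').orderBy('ContractDate')
    ordered = sorted(rows, key=itemgetter(1, 2))
    for _, group in groupby(ordered, key=itemgetter(1)):
        loans = list(group)
        # close_to_prev[i] is the lag-based flag; the first loan has no predecessor
        close_to_prev = [False] + [is_close(a, b) for a, b in zip(loans, loans[1:])]
        for i, loan in enumerate(loans):
            # The lead step: is the following loan flagged as close to this one?
            close_to_next = i + 1 < len(loans) and close_to_prev[i + 1]
            if close_to_prev[i] or close_to_next:
                kept.append(loan)
    return kept


result = filter_loans(rows)
for name, loan_id, contract_date, loan_sum, status in result:
    print(name, loan_id, contract_date.isoformat(), loan_sum, status)
```

On the sample data this keeps the two ID1 loans from 2022-10-10 and 2022-10-15 and all three ID3 loans, matching the expected table. Like the expression in the answer, the loan-sum check is signed, so a later loan that is smaller than the previous one always passes it; if the difference is meant to be symmetric, taking the absolute value of the difference would be one way to express that.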