Tags: python, pandas, pyspark, group-by, batch-processing

PySpark: create batch number column based on account


Suppose I have a PySpark dataframe with a number of unique account values, each of which has a varying number of rows, like so:

+--------+-----+--------+--------+
| account| col1|    col2|    col3|
+--------+-----+--------+--------+
| 325235 |   59|      -6|  625.64|
| 325235 |   23|    -282|  923.47|
| 325235 |   77|-1310.89| 3603.48|
| 245623 |  120|    1.53| 1985.63|
| 245623 |  106|     -12| 1985.06|
| 658567 |   84|     -12|  194.67|
+--------+-----+--------+--------+

I want to specify a batch size and assign accounts to batches so that each batch contains that many accounts (the last batch may be smaller). Let's suppose I choose batch size = 2; then the output should be the following:

+--------+-----+--------+--------+--------------+
| account| col1|    col2|    col3| batch_number |
+--------+-----+--------+--------+--------------+
| 325235 |   59|      -6|  625.64|             1|
| 325235 |   23|    -282|  923.47|             1|
| 325235 |   77|-1310.89| 3603.48|             1|
| 245623 |  120|    1.53| 1985.63|             1|
| 245623 |  106|     -12| 1985.06|             1|
| 658567 |   84|     -12|  194.67|             2|
+--------+-----+--------+--------+--------------+

I can then do a groupBy on the batch_number column and process multiple accounts per batch. Here is my working code, but it is too slow because of the toPandas() call.

import pandas as pd
from pyspark.sql.functions import lit

# Get unique accounts in source data
accounts = [row.account for row in source_data.select("account").distinct().collect()]

# Find the number of full batches; the last (partial) batch will have size = remainder
num_batches, remainder = divmod(len(accounts), batchsize)

# Create a batch dataframe where a batch number is assigned to each account
# (contiguous groups of `batchsize` accounts share a batch number)
batches = [i for i in range(1, num_batches + 1) for _ in range(batchsize)] + [num_batches + 1 for _ in range(remainder)]
batch_df = pd.DataFrame({"account": accounts, "batch_number": batches}, columns=["account", "batch_number"]).set_index("account")

# Add a zero column for batch number to source data, which will be populated below
source_data = source_data.withColumn("batch_number", lit(0))

# Map batch numbers of accounts back into the source data
source_data_p = source_data.toPandas()
for ind in source_data_p.index:
    source_data_p.at[ind, "batch_number"] = batch_df.at[source_data_p.at[ind, "account"], "batch_number"]

# Convert the mapped pandas df back to a spark df
batched_df = sqlcontext.createDataFrame(source_data_p)

I would ideally like to get rid of the toPandas() call and do the mapping in PySpark. I have seen a few related posts, like this one: How to batch up items from a PySpark DataFrame, but that approach doesn't fit into the flow of my code, so I would have to rewrite the whole project just to implement it.
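For reference, one way to keep this flow but do the mapping in Spark would be to build the small account-to-batch table as a Spark dataframe and join it onto source_data instead of looping over a pandas copy. This is only a minimal sketch, reusing the accounts, batches, and sqlcontext names from the code above (and skipping the lit(0) placeholder column, which the join makes unnecessary):

from pyspark.sql.functions import broadcast

# Sketch: small account -> batch_number lookup table as a Spark dataframe
mapping_df = sqlcontext.createDataFrame(list(zip(accounts, batches)), ["account", "batch_number"])

# Broadcast the small lookup table and join it onto the source data,
# replacing the row-by-row pandas loop
batched_df = source_data.join(broadcast(mapping_df), on="account", how="left")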


Solution

  • From what I understand, you can use an indexer (for example StringIndexer from pyspark.ml, or any other way of giving each account a consecutive index) and then use floor division:

    import pyspark.sql.functions as F
    from pyspark.ml.feature import StringIndexer


    n = 2  # batch size (number of accounts per batch)

    # StringIndexer gives each distinct account a consecutive index (0, 1, 2, ...);
    # floor division by n then puts every n accounts into the same batch.
    idx = StringIndexer(inputCol="account", outputCol="batch_number")

    (idx.fit(df).transform(df)
        .withColumn("batch_number", F.floor(F.col("batch_number") / n) + 1)).show()
    

    +-------+----+--------+-------+------------+
    |account|col1|    col2|   col3|batch_number|
    +-------+----+--------+-------+------------+
    | 325235|  59|    -6.0| 625.64|           1|
    | 325235|  23|  -282.0| 923.47|           1|
    | 325235|  77|-1310.89|3603.48|           1|
    | 245623| 120|    1.53|1985.63|           1|
    | 245623| 106|   -12.0|1985.06|           1|
    | 658567|  84|   -12.0| 194.67|           2|
    +-------+----+--------+-------+------------+
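
    StringIndexer orders accounts by descending frequency by default, but any consecutive per-account index works here. If you would rather not pull in pyspark.ml, here is a sketch of the same idea using dense_rank over a window, assuming the data is small enough that an unpartitioned window (Spark will warn that it pulls all rows into a single partition) is acceptable:

    import pyspark.sql.functions as F
    from pyspark.sql import Window

    n = 2  # batch size

    # Rank distinct accounts 1, 2, 3, ..., shift to 0-based, and floor-divide into batches of n
    w = Window.orderBy("account")
    batched_df = (df
        .withColumn("acct_idx", F.dense_rank().over(w) - 1)
        .withColumn("batch_number", F.floor(F.col("acct_idx") / n) + 1)
        .drop("acct_idx"))
    batched_df.show()

    Either way, the result can be consumed with batched_df.groupBy("batch_number") without converting to pandas.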