Suppose I have a pyspark dataframe with a number of unique account values, each of which have a unique number of entries, like so:
+-------------_+--------+--------+
| account| col1| col2 | col3 |
+--------+-----+--------+--------+
| 325235 | 59| -6| 625.64|
| 325235 | 23| -282| 923.47|
| 325235 | 77|-1310.89| 3603.48|
| 245623 | 120| 1.53| 1985.63|
| 245623 | 106| -12| 1985.06|
| 658567 | 84| -12| 194.67|
I want to specify a batch size, and assign multiple accounts to the same batch based on the batch size. Lets suppose I choose batch size = 2, then the output should be the following:
+-------------_+--------+--------+--------------+
| account| col1| col2 | col3 | batch_number |
+--------+-----+--------+--------+--------------+
| 325235 | 59| -6| 625.64| 1|
| 325235 | 23| -282| 923.47| 1|
| 325235 | 77|-1310.89| 3603.48| 1|
| 245623 | 120| 1.53| 1985.63| 1|
| 245623 | 106| -12| 1985.06| 1|
| 658567 | 84| -12| 194.67| 2|
I can then do a groupby on the batch_number
column and have multiple accounts per batch. Here is my working code, but it is too slow since I am doing a toPandas().
# Get unique accounts in source data
accounts = [row.account for row in source_data.select("account").distinct().collect()]
# Find number of batches based. Last batch will have size = remainder
num_batches, remainder = divmod(len(accounts), batchsize)
# Create batch dataframe where a batch number is assigned to each account.
batches = [i for _ in range(batchsize) for i in range(1, int(num_batches) + 1)] + [num_batches + 1 for i in range(remainder)]
batch_df = pd.DataFrame({"account": accounts, "batch_number": batches}, columns=["account", "batch_number"]).set_index("account")
# Add a zero column for batch number to source data which will be populated
source_data = source_data.withColumn("batch_number", lit(0))
# Map batch numbers of accounts back into the source data
source_data_p = source_data.toPandas()
for ind in source_data_p.index:
source_data_p.at[ind, "batch_number"] = batch_df.at[source_data_p.at[ind, "account"], "batch_number"]
# Convert mapped pandas df back to spark df
batched_df = sqlcontext.createDataFrame(source_data_p)
I would ideally like to get rid of the toPandas() call, and do the mapping in pyspark. I have seen a few related posts, like this one: How to batch up items from a PySpark DataFrame, but this doesn't fit into the flow of my code, so I will have to re-write the whole project just to implement this.
From what I understand you can use an indexer using mllib
or any other way and then floor division:
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
n=2
idx = StringIndexer(inputCol="account",outputCol="batch_number")
(idx.fit(df).transform(df)
.withColumn("batch_number",F.floor(F.col("batch_number")/n)+1)).show()
+-------+----+--------+-------+------------+
|account|col1| col2| col3|batch_number|
+-------+----+--------+-------+------------+
| 325235| 59| -6.0| 625.64| 1|
| 325235| 23| -282.0| 923.47| 1|
| 325235| 77|-1310.89|3603.48| 1|
| 245623| 120| 1.53|1985.63| 1|
| 245623| 106| -12.0|1985.06| 1|
| 658567| 84| -12.0| 194.67| 2|
+-------+----+--------+-------+------------+