Consider the following sample dataset.
Date | symbol | qty | price per qty | type |
---|---|---|---|---|
07 July 2022 | REL2300PE | 200 | 50 | buy |
07 July 2022 | IDBI2300PE | 200 | 50 | sell |
15 July 2022 | REL2300PE | 100 | 50 | buy |
15 July 2022 | IDBI2300PE | 20 | 50 | buy |
16 July 2022 | REL2300PE | 200 | 35 | buy |
30 July 2022 | IDBI2300PE | 60 | 50 | sell |
30 July 2022 | REL2300PE | 450 | 45 | sell |
30 July 2022 | IDBI2300PE | 200 | 25 | sell |
Take the stock symbol 'REL2300PE'. It is sold once, 450 units at $45 each, for a total sale value of $20,250. The cost price of those 450 units is taken from the buy trades for this symbol in chronological order (first in, first out): 200 units * $50 + 100 units * $50 + 150 units * $35 = $20,250. Only 150 of the 200 units bought on 16 July are needed to reach 450. Since the sale value and the cost price of the first 450 units are both $20,250, the profit/loss for this stock should be 0.
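The arithmetic above can be sketched in plain Python. The helper name `fifo_cost` and the `(qty, price)` tuple layout are assumptions made for illustration; they are not part of the dataset or of any library.

```python
def fifo_cost(buys, units_sold):
    """Cost of the first `units_sold` units, consuming buys oldest first.

    `buys` is a chronological list of (qty, price) tuples (hypothetical layout).
    """
    remaining = units_sold
    cost = 0
    for qty, price in buys:
        if remaining == 0:
            break
        used = min(qty, remaining)  # take only what is still needed from this buy
        cost += used * price
        remaining -= used
    if remaining > 0:
        raise ValueError("more units sold than bought")
    return cost


# REL2300PE buys from the sample dataset, oldest first
rel_buys = [(200, 50), (100, 50), (200, 35)]
cost = fifo_cost(rel_buys, 450)
sale_value = 450 * 45
print(cost, sale_value, sale_value - cost)  # 20250 20250 0
```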
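You could follow this algorithm: first work out how much of each trade is actually consumed, then aggregate the profit/loss per symbol.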
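Input (a modified version of the sample data, with an opening IDBI2300PE buy added and the REL2300PE sale split into two trades):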
from pyspark.sql import Window
from pyspark.sql.functions import col, when, sum
data = [
    ('05 July 2022', 'IDBI2300PE', 500, 45, 'buy'),
    ('07 July 2022', 'REL2300PE', 200, 50, 'buy'),
    ('07 July 2022', 'IDBI2300PE', 200, 50, 'sell'),
    ('15 July 2022', 'REL2300PE', 100, 50, 'buy'),
    ('15 July 2022', 'IDBI2300PE', 20, 50, 'buy'),
    ('16 July 2022', 'REL2300PE', 200, 35, 'buy'),
    ('20 July 2022', 'REL2300PE', 200, 45, 'sell'),
    ('30 July 2022', 'IDBI2300PE', 60, 50, 'sell'),
    ('30 July 2022', 'REL2300PE', 250, 45, 'sell'),
    ('31 July 2022', 'IDBI2300PE', 200, 25, 'sell'),
]
df = spark.createDataFrame(data, ["Date", "symbol", "qty", "price", "type"])
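One thing to watch: `Date` is stored as a string such as '07 July 2022', so any ordering on it compares text rather than calendar dates. That happens to work for this input because every date is zero-padded and falls in the same month. The sketch below uses only the standard library to show the difference; the list of dates is made up for illustration, and '01 August 2022' does not appear in the data. In Spark, the analogous step would presumably be to parse the column with `to_date` and a matching pattern before ordering on it, which the code here does not do.

```python
from datetime import datetime

# Illustrative dates only; '01 August 2022' is not in the sample data.
dates = ['30 July 2022', '01 August 2022', '07 July 2022']

# Plain string sort: '01 August 2022' comes first because '0' < '3'.
lexical = sorted(dates)

# Parse as day / full month name / year, then sort chronologically.
# Note: %B depends on the locale; English month names are assumed here.
chronological = sorted(dates, key=lambda d: datetime.strptime(d, '%d %B %Y'))

print(lexical)        # ['01 August 2022', '07 July 2022', '30 July 2022']
print(chronological)  # ['07 July 2022', '30 July 2022', '01 August 2022']
```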
Calculate consumed qty:
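# Running net position per symbol: buys add, sells subtract
# (current row and all previous rows).
# Note: 'Date' is a string, so this ordering is lexical; it is only safe here
# because all dates are zero-padded and fall in the same month.
cum_sum_wind = Window.partitionBy('symbol').orderBy('Date').rangeBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn('cum_sum', sum(when(col('type') == 'sell', -1 * col('qty'))
                                  .otherwise(col('qty'))).over(cum_sum_wind))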
# Calculate cumulative sum of sells only, over the current row and all following rows
sell_cum_sum_wind = Window.partitionBy('symbol').orderBy('Date').rangeBetween(Window.currentRow, Window.unboundedFollowing)
df = df.withColumn('sell_cum_sum', sum(when(col('type') == 'sell', col('qty'))
                                       .otherwise(0)).over(sell_cum_sum_wind))
# Calculate the actual consumed qty
df = df.withColumn('cons_qty',
    # Sells are always counted in full
    when(col('type') == 'sell', col('qty'))
    # A buy is consumed in full while the remaining sells exceed the running position
    .when(col('sell_cum_sum') > col('cum_sum'), col('qty'))
    # If negative then nothing is consumed from this row
    .when((col('qty') - (col('cum_sum') - col('sell_cum_sum'))) < 0, 0)
    # Otherwise only part of this buy is consumed
    .otherwise(col('qty') - (col('cum_sum') - col('sell_cum_sum'))))
df.show()
+------------+----------+---+-----+----+-------+------------+--------+
|        Date|    symbol|qty|price|type|cum_sum|sell_cum_sum|cons_qty|
+------------+----------+---+-----+----+-------+------------+--------+
|05 July 2022|IDBI2300PE|500|   45| buy|    500|         460|     460|
|07 July 2022|IDBI2300PE|200|   50|sell|    300|         460|     200|
|15 July 2022|IDBI2300PE| 20|   50| buy|    320|         260|       0|
|30 July 2022|IDBI2300PE| 60|   50|sell|    260|         260|      60|
|31 July 2022|IDBI2300PE|200|   25|sell|     60|         200|     200|
|07 July 2022| REL2300PE|200|   50| buy|    200|         450|     200|
|15 July 2022| REL2300PE|100|   50| buy|    300|         450|     100|
|16 July 2022| REL2300PE|200|   35| buy|    500|         450|     150|
|20 July 2022| REL2300PE|200|   45|sell|    300|         450|     200|
|30 July 2022| REL2300PE|250|   45|sell|     50|         250|     250|
+------------+----------+---+-----+----+-------+------------+--------+
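To see how the three columns interact, the same rule can be replayed for a single symbol in plain Python. This is only a sketch mirroring the `cum_sum`, `sell_cum_sum` and `cons_qty` expressions above; the function name `consumed_quantities` and the `(qty, type)` tuple layout are assumptions for illustration, and it assumes one trade per date per symbol, as in this input.

```python
def consumed_quantities(trades):
    """Replay the cons_qty rule for one symbol.

    `trades` is a chronological list of (qty, type) tuples (hypothetical layout).
    """
    result = []
    running = 0  # plays the role of cum_sum
    for i, (qty, typ) in enumerate(trades):
        running += -qty if typ == 'sell' else qty
        # plays the role of sell_cum_sum: sells on this row and all later rows
        sells_ahead = sum(q for q, t in trades[i:] if t == 'sell')
        if typ == 'sell':
            cons = qty
        elif sells_ahead > running:
            cons = qty
        else:
            # partial consumption, clamped at zero
            cons = max(qty - (running - sells_ahead), 0)
        result.append(cons)
    return result


rel = [(200, 'buy'), (100, 'buy'), (200, 'buy'), (200, 'sell'), (250, 'sell')]
idbi = [(500, 'buy'), (200, 'sell'), (20, 'buy'), (60, 'sell'), (200, 'sell')]
print(consumed_quantities(rel))   # [200, 100, 150, 200, 250]
print(consumed_quantities(idbi))  # [460, 200, 0, 60, 200]
```

Both lists match the `cons_qty` column in the table above.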
Calculate global profit/loss per symbol:
# Group by symbol and calculate the profit/loss:
# consumed buys count as cost (negative), sells as proceeds (positive)
result = df.groupby('symbol').agg(
    sum(when(col('type') == 'buy', -1 * col('price') * col('cons_qty'))
        .otherwise(col('price') * col('cons_qty'))
    ).alias("profit_loss"))
result.show()
+----------+-----------+
|    symbol|profit_loss|
+----------+-----------+
|IDBI2300PE|      -2700|
| REL2300PE|          0|
+----------+-----------+
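As a cross-check, the same figures can be reproduced without Spark by matching each sell against the oldest open buy lots. This is a different technique from the window-based one above (explicit lot matching with a queue), and it agreeing on this input is not a proof that the two are equivalent in general. The function name `fifo_realized_pnl` and the `(qty, price, type)` layout are assumptions for illustration.

```python
from collections import deque


def fifo_realized_pnl(trades):
    """Realized profit/loss for one symbol using explicit FIFO lot matching.

    `trades` is a chronological list of (qty, price, type) tuples (hypothetical layout).
    """
    open_lots = deque()  # each entry is [qty_left, buy_price], oldest first
    pnl = 0
    for qty, price, typ in trades:
        if typ == 'buy':
            open_lots.append([qty, price])
            continue
        to_sell = qty
        while to_sell > 0:
            if not open_lots:
                raise ValueError("sell exceeds the quantity bought so far")
            lot = open_lots[0]
            used = min(lot[0], to_sell)
            pnl += used * (price - lot[1])  # sale price minus cost of this lot
            lot[0] -= used
            to_sell -= used
            if lot[0] == 0:
                open_lots.popleft()
    return pnl


idbi = [(500, 45, 'buy'), (200, 50, 'sell'), (20, 50, 'buy'),
        (60, 50, 'sell'), (200, 25, 'sell')]
rel = [(200, 50, 'buy'), (100, 50, 'buy'), (200, 35, 'buy'),
       (200, 45, 'sell'), (250, 45, 'sell')]
print(fifo_realized_pnl(idbi))  # -2700
print(fifo_realized_pnl(rel))   # 0
```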