Search code examples
sqlapache-sparkpysparkapache-spark-sqlhive

I need to calculate profit/loss for given stock data set, ensuring that the first bought items are sold first


Consider the following sample dataset.

Date symbol qty price per qty type
07 July 2022 REL2300PE 200 50 buy
07 July 2022 IDBI2300PE 200 50 sell
15 July 2022 REL2300PE 100 50 buy
15 July 2022 IDBI2300PE 20 50 buy
16 July 2022 REL2300PE 200 35 buy
30 July 2022 IDBI2300PE 60 50 sell
30 July 2022 REL2300PE 450 45 sell
30 July 2022 IDBI2300PE 200 25 sell

If we focus on the stock symbol 'REL2300PE', it is being sold in quantities of 450, resulting in a total sale value of $20,250. Now, let's calculate the cost price for the first 450 units of this stock. The cost price can be calculated by summing up the product of the quantity and price per quantity for each buying trade associated with this stock. In this case, the cost price for the first 450 units is calculated as follows: 200 units * $50 + 100 units * $50 + 150 units * $35 = $20,250. Since the sale value and the cost price for the first 450 units are the same ($20,250), the profit/loss for this stock should be 0.


Solution

  • You could follow this algorithm:

    • Calculate the cumulative sum for the sells and buys for the current row and all prev rows, and consider the sell qty with negative
    • Calculate the cumulative sum for the sales only for the current row and all following rows, so you can get the total sold items at the oldest buy qty
    • Get the difference between the first two and consider it as the actual consumed-qty
    • Group by symbol/product and calculate the profit/loss

    Input:

    from pyspark.sql import Window
    from pyspark.sql.functions import col, when, sum
    
    data = [\
        ('05 July 2022', 'IDBI2300PE', 500, 45, 'buy'),\
        ('07 July 2022', 'REL2300PE', 200, 50, 'buy'),\
        ('07 July 2022', 'IDBI2300PE', 200, 50, 'sell'),\
        ('15 July 2022', 'REL2300PE', 100, 50, 'buy'),\
        ('15 July 2022', 'IDBI2300PE', 20, 50, 'buy'),\
        ('16 July 2022', 'REL2300PE', 200, 35, 'buy'),\
        ('20 July 2022', 'REL2300PE', 200, 45, 'sell'),\
        ('30 July 2022', 'IDBI2300PE', 60, 50, 'sell'),\
        ('30 July 2022', 'REL2300PE', 250, 45, 'sell'),\
        ('31 July 2022', 'IDBI2300PE', 200, 25, 'sell')]
    
    df = spark.createDataFrame(data, ["Date", "symbol", "qty", "price", "type"])
    

    Calculate consumed qty:

    # Calculate cumulative sum for sells and buys for the current and all prev rows 
    cum_sum_wind = Window.partitionBy('symbol').orderBy('Date').rangeBetween(Window.unboundedPreceding, 0)
    df = df.withColumn('cum_sum', sum(when(col('type') == 'sell', -1 * col('qty'))\
                                        .otherwise(col('qty'))).over(cum_sum_wind))
    
    # Calculate cumulative sum for sells only for the current and all following rows
    sell_cum_sum_wind = Window.partitionBy('symbol').orderBy('Date').rangeBetween(0, Window.unboundedFollowing)
    df = df.withColumn('sell_cum_sum', sum(when(col('type') == 'sell', col('qty'))\
                                             .otherwise(0)).over(sell_cum_sum_wind))
    
    # Calculate the actual consumed qty
    df = df.withColumn('cons_qty', when(col('type') == 'sell', col('qty'))\ 
                                    .when(col('sell_cum_sum') > col('cum_sum'), col('qty'))\
                                    # If negative then nothing is consumed from this row
                                    .when((col('qty') - (col('cum_sum') - col('sell_cum_sum'))) < 0, 0)\
                                    .otherwise(col('qty') - (col('cum_sum') - col('sell_cum_sum'))))
    
    df.show()
    
    +------------+----------+---+-----+----+-------+------------+--------+
    |        Date|    symbol|qty|price|type|cum_sum|sell_cum_sum|cons_qty|
    +------------+----------+---+-----+----+-------+------------+--------+
    |05 July 2022|IDBI2300PE|500|   45| buy|    500|         460|     460|
    |07 July 2022|IDBI2300PE|200|   50|sell|    300|         460|     200|
    |15 July 2022|IDBI2300PE| 20|   50| buy|    320|         260|       0|
    |30 July 2022|IDBI2300PE| 60|   50|sell|    260|         260|      60|
    |31 July 2022|IDBI2300PE|200|   25|sell|     60|         200|     200|
    |07 July 2022| REL2300PE|200|   50| buy|    200|         450|     200|
    |15 July 2022| REL2300PE|100|   50| buy|    300|         450|     100|
    |16 July 2022| REL2300PE|200|   35| buy|    500|         450|     150|
    |20 July 2022| REL2300PE|200|   45|sell|    300|         450|     200|
    |30 July 2022| REL2300PE|250|   45|sell|     50|         250|     250|
    +------------+----------+---+-----+----+-------+------------+--------+
    

    Calculate global profit/loss per symbol:

    # Groupby symbol and calculate the profit/loss
    result = df.groupby('symbol')\
        .agg(
            sum(when(col('type') == 'buy', -1 * col('price') * col('cons_qty'))\
                    .otherwise(col('price') * col('cons_qty'))\
            ).alias("profit_loss"))
    result.show()
    
    +----------+-----------+
    |    symbol|profit_loss|
    +----------+-----------+
    |IDBI2300PE|      -2700|
    | REL2300PE|          0|
    +----------+-----------+