
How to aggregate on a column while grouping by several columns values using CoPPer?


I have a dataset with the current stock for some products:

+--------------+-------+
| Product      | Stock |
+--------------+-------+
| chocolate    |   300 |
| coal         |    70 |
| orange juice |   400 |
+--------------+-------+

and, in another dataset, the sales of each product for the current month and the next month over previous years:

+--------------+------+-------+-------+
| Product      | Year | Month | Sales |
+--------------+------+-------+-------+
| chocolate    | 2017 |    05 |    55 |
| chocolate    | 2017 |    04 |   250 |
| chocolate    | 2016 |    05 |    70 |
| chocolate    | 2016 |    04 |   200 |
+--------------+------+-------+-------+
| coal         | 2017 |    05 |    40 |
| coal         | 2017 |    04 |    30 |
| coal         | 2016 |    05 |    50 |
| coal         | 2016 |    04 |    20 |
+--------------+------+-------+-------+
| orange juice | 2017 |    05 |   400 |
| orange juice | 2017 |    04 |   350 |
| orange juice | 2016 |    05 |   400 |
| orange juice | 2016 |    04 |   300 |
+--------------+------+-------+-------+

I want to compute the stock that I will need to order for the next month, by computing the expected sales over the current month and the next month, using the following formula:

ExpectedSales = max(salesMaxCurrentMonth) + max(salesMaxNextMonth)

The orders will then be

Orders = ExpectedSales * (1 + margin) - Stock

Where margin is, for example, 10%.
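To make the formula concrete, here is a minimal sketch in plain Java (not CoPPer) that applies it to the sample rows above. The class name OrderEstimate, the Sale helper type and the method names are hypothetical and exist only for this illustration. It assumes a margin of 10% and, as in the code further down, floors negative orders at 0.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OrderEstimate {

    /** One row of the sales dataset (hypothetical helper type, not a CoPPer class). */
    static final class Sale {
        final String product;
        final int year;
        final int month;
        final double sales;

        Sale(String product, int year, int month, double sales) {
            this.product = product;
            this.year = year;
            this.month = month;
            this.sales = sales;
        }
    }

    /** Sample rows copied from the sales table in the question. */
    static List<Sale> sampleSales() {
        return List.of(
            new Sale("chocolate", 2017, 5, 55), new Sale("chocolate", 2017, 4, 250),
            new Sale("chocolate", 2016, 5, 70), new Sale("chocolate", 2016, 4, 200),
            new Sale("coal", 2017, 5, 40), new Sale("coal", 2017, 4, 30),
            new Sale("coal", 2016, 5, 50), new Sale("coal", 2016, 4, 20),
            new Sale("orange juice", 2017, 5, 400), new Sale("orange juice", 2017, 4, 350),
            new Sale("orange juice", 2016, 5, 400), new Sale("orange juice", 2016, 4, 300));
    }

    /** Current stock copied from the stock table in the question. */
    static Map<String, Double> sampleStock() {
        return Map.of("chocolate", 300.0, "coal", 70.0, "orange juice", 400.0);
    }

    /** Orders per product: ExpectedSales * (1 + margin) - Stock, floored at 0. */
    static Map<String, Double> orders(List<Sale> sales, Map<String, Double> stock, double margin) {
        // Step 1: group by (Product, Month) and keep the maximum Sales across years.
        Map<String, Map<Integer, Double>> maxPerMonth = new TreeMap<>();
        for (Sale s : sales) {
            maxPerMonth
                .computeIfAbsent(s.product, k -> new TreeMap<>())
                .merge(s.month, s.sales, Double::max);
        }

        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, Map<Integer, Double>> e : maxPerMonth.entrySet()) {
            // Step 2: sum the per-month maxima to get the expected sales of the period.
            double expectedSales = 0.0;
            for (double monthlyMax : e.getValue().values()) {
                expectedSales += monthlyMax;
            }
            // Step 3: apply the margin, subtract the stock, and never order a negative amount.
            double order = expectedSales * (1 + margin) - stock.getOrDefault(e.getKey(), 0.0);
            result.put(e.getKey(), Math.max(order, 0.0));
        }
        return result;
    }

    public static void main(String[] args) {
        orders(sampleSales(), sampleStock(), 0.10)
            .forEach((product, order) -> System.out.println(product + " -> " + order));
    }
}
```

With these inputs the expected sales are 320 for chocolate (250 + 70), 80 for coal (30 + 50) and 750 for orange juice (350 + 400), so the orders come out at roughly 52, 18 and 425, up to floating-point rounding.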

I tried to group by several columns using GroupBy, as in the following, but it seems to aggregate by Stock instead of Product:

salesDataset
    .groupBy(Columns.col("Month"), Columns.col("Product"))
    .agg(Columns.max("Sales").as("SalesMaxPerMonth"))
    .agg(Columns.sum("SalesMaxPerMonth").as("SalesPeriod"))
    .withColumn(
        "SalesExpected",
        Columns.col("SalesPeriod").multiply(Columns.literal(1 + margin)))
    .withColumn(
        "Orders",
        Columns.col("SalesExpected").minus(Columns.col("Stock")))
    .withColumn(
        "Orders",
        Columns.col("Orders").map((Double a) -> a >= 0 ? a : 0))
    .doNotAggregateAbove()
    .toCellSet()
    .show();

Solution

  • Your aggregation logic is correct. What you can change is how you build the CellSet: toCellSet can also take a map that describes the location of the query which generates it.

    salesDataset
        .groupBy(Columns.col("Month"), Columns.col("Product"))
        .agg(Columns.max("Sales").as("SalesMaxPerMonth"))
        .agg(Columns.sum("SalesMaxPerMonth").as("SalesPeriod"))
        .withColumn(
            "SalesExpected",
            Columns.col("SalesPeriod").multiply(Columns.literal(1 + margin)))
        .withColumn("Orders", Columns.col("SalesExpected").minus(Columns.col("Stock")))
        .withColumn("Orders", Columns.col("Orders").map((Double a) -> a >= 0 ? a : 0))
        .doNotAggregateAbove()
        .toCellSet(
            Empty.<String, Object>map()
            .put("Product", null)
            .put("Stock", null))
        .show();
    

    Where null in a location represents the wildcard *.