Tags: c#, dataframe, apache-spark, window-functions, .net-spark

Forward filling in .NET for Spark


I am looking at window functions for a Spark DataFrame in .NET (C#).

I have a DataFrame df with columns Year, Month, Day, Hour, Minute, ID, Type and Value:

| Year | Month | Day | Hour | Minute | ID   | Type  | Value |
|------|-------|-----|------|--------|------|-------|-------|
| 2021 | 3     | 4   | 8    | 9      | 87   | Type1 | 380.5 |
| 2021 | 3     | 4   | 8    | 10     | null | null  | null  |
| 2021 | 3     | 4   | 8    | 11     | null | null  | null  |
| 2021 | 3     | 4   | 8    | 12     | null | null  | null  |
| 2021 | 3     | 4   | 8    | 13     | 87   | Type1 | 0.0   |
| 2021 | 3     | 4   | 8    | 14     | 87   | Type1 | 0.0   |

I would like to fill the empty rows (nulls) with the values from the previous row, ordered by Year, Month, Day, Hour and Minute, as shown below:

| Year | Month | Day | Hour | Minute | ID | Type  | Value |
|------|-------|-----|------|--------|----|-------|-------|
| 2021 | 3     | 4   | 8    | 9      | 87 | Type1 | 380.5 |
| 2021 | 3     | 4   | 8    | 10     | 87 | Type1 | 380.5 |
| 2021 | 3     | 4   | 8    | 11     | 87 | Type1 | 380.5 |
| 2021 | 3     | 4   | 8    | 12     | 87 | Type1 | 380.5 |
| 2021 | 3     | 4   | 8    | 13     | 87 | Type1 | 0.0   |
| 2021 | 3     | 4   | 8    | 14     | 87 | Type1 | 0.0   |

So far I have found solutions using Window and the Lag function in Scala, but I am not sure how to do it in C#. In Scala the window would be defined as something like:

val window = Window.orderBy("Year", "Month", "Day", "Hour", "Minute")

I would like to add a newValue column using:

var filledDataFrame = df.WithColumn("newValue", Functions.When(df["Value"].IsNull(), Functions.Lag(df["Value"], 1).Over(window)).Otherwise(df["Value"]));

How do I define a window in .NET for Spark and use the Lag function to forward-fill null values?


Solution

  • You are very close. To use Lag and a Window with .NET for Apache Spark you would need:

    var spark = SparkSession.Builder().GetOrCreate();
    var df = spark.CreateDataFrame(new List<GenericRow>()
    {
        new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
        new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
        new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
        new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
        new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
        new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
    }, new StructType(new List<StructField>()
    {
        new StructField("Year", new IntegerType()),
        new StructField("Month", new IntegerType()),
        new StructField("Day", new IntegerType()),
        new StructField("Hour", new IntegerType()),
        new StructField("Minute", new IntegerType()),
        new StructField("ID", new IntegerType()),
        new StructField("Type", new StringType()),
        new StructField("Value", new DoubleType()),
    
    }));
    
    // Order rows chronologically by the timestamp columns.
    var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
    // Where Value is null, take the Value from the immediately preceding row; otherwise keep it.
    var filledDataFrame = df.WithColumn("newValue",
        Functions.When(df["Value"].IsNull(),
                Functions.Lag(df["Value"], 1).Over(window))
            .Otherwise(df["Value"]));
    
    filledDataFrame.Show(1000, 10000);
    

    This would result in:

    +----+-----+---+----+------+----+-----+-----+--------+
    |Year|Month|Day|Hour|Minute|  ID| Type|Value|newValue|
    +----+-----+---+----+------+----+-----+-----+--------+
    |2021|    3|  4|   8|     9|  87|Type1|380.5|   380.5|
    |2021|    3|  4|   8|    10|null| null| null|   380.5|
    |2021|    3|  4|   8|    11|null| null| null|    null|
    |2021|    3|  4|   8|    12|null| null| null|    null|
    |2021|    3|  4|   8|    13|  87|Type1|  0.0|     0.0|
    |2021|    3|  4|   8|    14|  87|Type1|  0.0|     0.0|
    +----+-----+---+----+------+----+-----+-----+--------+
    

    But Lag(1) only reads the raw Value of the row immediately before the current one, which is itself null inside a run of gaps, so only the first missing minute gets filled. You probably want Last with ignoreNulls set to true instead, since it skips over nulls:

    var spark = SparkSession.Builder().GetOrCreate();
    var df = spark.CreateDataFrame(new List<GenericRow>()
    {
        new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
        new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
        new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
        new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
        new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
        new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
    }, new StructType(new List<StructField>()
    {
        new StructField("Year", new IntegerType()),
        new StructField("Month", new IntegerType()),
        new StructField("Day", new IntegerType()),
        new StructField("Hour", new IntegerType()),
        new StructField("Minute", new IntegerType()),
        new StructField("ID", new IntegerType()),
        new StructField("Type", new StringType()),
        new StructField("Value", new DoubleType()),
    
    }));
    
    var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
    // Last(Value, ignoreNulls: true) returns the most recent non-null Value up to the
    // current row, so every row in a run of nulls is filled, not just the first one.
    var filledDataFrame = df.WithColumn("newValue",
        Functions.When(df["Value"].IsNull(),
                Functions.Last(df["Value"], true).Over(window))
            .Otherwise(df["Value"]));
    
    filledDataFrame.Show(1000, 10000);
    
    

    Which results in:

    +----+-----+---+----+------+----+-----+-----+--------+
    |Year|Month|Day|Hour|Minute|  ID| Type|Value|newValue|
    +----+-----+---+----+------+----+-----+-----+--------+
    |2021|    3|  4|   8|     9|  87|Type1|380.5|   380.5|
    |2021|    3|  4|   8|    10|null| null| null|   380.5|
    |2021|    3|  4|   8|    11|null| null| null|   380.5|
    |2021|    3|  4|   8|    12|null| null| null|   380.5|
    |2021|    3|  4|   8|    13|  87|Type1|  0.0|     0.0|
    |2021|    3|  4|   8|    14|  87|Type1|  0.0|     0.0|
    +----+-----+---+----+------+----+-----+-----+--------+
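
    This works because a window that only specifies an OrderBy uses Spark's default frame, which runs from the start of the partition up to the current row. If you would rather make that frame explicit, here is a sketch of the same window using the RowsBetween/UnboundedPreceding/CurrentRow members of Microsoft.Spark.Sql.Expressions.Window (assuming your Microsoft.Spark version exposes them):

    // Same ordering, but with the frame spelled out: every row from the start
    // of the data up to and including the current row.
    var explicitWindow = Window
        .OrderBy("Year", "Month", "Day", "Hour", "Minute")
        .RowsBetween(Window.UnboundedPreceding, Window.CurrentRow);

    var filledExplicit = df.WithColumn("newValue",
        Functions.When(df["Value"].IsNull(),
                Functions.Last(df["Value"], true).Over(explicitWindow))
            .Otherwise(df["Value"]));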
    
    
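    The desired output in the question also fills ID and Type, not just Value. The same pattern can be applied column by column; here is a minimal sketch reusing the df and window variables from above (FillForward is just a hypothetical helper name, not part of the Microsoft.Spark API):

    // Hypothetical helper: forward-fill a column with the last non-null value seen so far.
    Column FillForward(Column col) =>
        Functions.When(col.IsNull(), Functions.Last(col, true).Over(window))
            .Otherwise(col);

    var fullyFilled = df
        .WithColumn("ID", FillForward(df["ID"]))
        .WithColumn("Type", FillForward(df["Type"]))
        .WithColumn("Value", FillForward(df["Value"]));

    fullyFilled.Show(1000, 10000);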

    Hope it helps!

    ed

    (The using statements needed to make the code above work:)

    using System;
    using System.Collections.Generic;
    using Microsoft.Spark.Sql;
    using Microsoft.Spark.Sql.Expressions;
    using Microsoft.Spark.Sql.Types;