I am looking at window functions for a Spark DataFrame in .NET (C#).
I have a DataFrame df with columns Year, Month, Day, Hour, Minute, ID, Type and Value:
| 2021 | 3 | 4 | 8 | 9 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 10 | null | null | null |
| 2021 | 3 | 4 | 8 | 11 | null | null | null |
| 2021 | 3 | 4 | 8 | 12 | null | null | null |
| 2021 | 3 | 4 | 8 | 13 | 87 | Type1 | 0.0 |
| 2021 | 3 | 4 | 8 | 14 | 87 | Type1 | 0.0 |
I would like to fill the empty rows (nulls) with the values from the most recent non-null row, ordered by Year, Month, Day, Hour and Minute, as shown below:
| 2021 | 3 | 4 | 8 | 9 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 10 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 11 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 12 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 13 | 87 | Type1 | 0.0 |
| 2021 | 3 | 4 | 8 | 14 | 87 | Type1 | 0.0 |
So far I have found solutions that use Window and the lag function in Scala, but I am not sure how to do the same in C#. In Scala the window would be defined as something like:
val window = Window.orderBy("Year", "Month", "Day", "Hour", "Minute")
I would like to add a newValue column using:
var filledDataFrame = df.WithColumn("newValue", Functions.When(df["Value"].IsNull(), Functions.Lag(df["Value"], 1).Over(window)).Otherwise(df["Value"]));
How do I define a window in .NET for Spark and use the Lag function to forward-fill null values?
You are very close. To use Lag with a Window in .NET for Apache Spark, you need:
var spark = SparkSession.Builder().GetOrCreate();

// Sample data matching the question, with an explicit schema.
var df = spark.CreateDataFrame(new List<GenericRow>()
{
    new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
    new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
    new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
}, new StructType(new List<StructField>()
{
    new StructField("Year", new IntegerType()),
    new StructField("Month", new IntegerType()),
    new StructField("Day", new IntegerType()),
    new StructField("Hour", new IntegerType()),
    new StructField("Minute", new IntegerType()),
    new StructField("ID", new IntegerType()),
    new StructField("Type", new StringType()),
    new StructField("Value", new DoubleType()),
}));

// Order all rows chronologically.
var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");

// Where Value is null, take the Value from exactly one row earlier.
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(),
            Functions.Lag(df["Value"], 1).Over(window))
        .Otherwise(df["Value"]));

filledDataFrame.Show(1000, 10000);
This would result in:
+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute| ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021| 3| 4| 8| 9| 87|Type1|380.5| 380.5|
|2021| 3| 4| 8| 10|null| null| null| 380.5|
|2021| 3| 4| 8| 11|null| null| null| null|
|2021| 3| 4| 8| 12|null| null| null| null|
|2021| 3| 4| 8| 13| 87|Type1| 0.0| 0.0|
|2021| 3| 4| 8| 14| 87|Type1| 0.0| 0.0|
+----+-----+---+----+------+----+-----+-----+--------+
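Only the first null is filled because Lag(..., 1) reads the original Value exactly one row back, and for the rows at minutes 11 and 12 that earlier row is itself null. As a rough illustration, here is a plain C# sketch (no Spark involved) that mimics the two behaviours on the Value column. The class and method names are made up for this example and are not part of Microsoft.Spark:

```csharp
using System;
using System.Globalization;
using System.Linq;

public static class FillSemantics
{
    // Mimics When(v.IsNull(), Lag(v, 1)).Otherwise(v): a null is replaced by
    // the ORIGINAL value exactly one row back, which may itself be null.
    public static double?[] LagFill(double?[] values) =>
        values.Select((v, i) => v ?? (i > 0 ? values[i - 1] : null)).ToArray();

    // Mimics Last(v, ignoreNulls: true) over an ordered window:
    // carry the most recent non-null value forward.
    public static double?[] LastNonNullFill(double?[] values)
    {
        var result = new double?[values.Length];
        double? last = null;
        for (int i = 0; i < values.Length; i++)
        {
            if (values[i].HasValue) last = values[i];
            result[i] = last;
        }
        return result;
    }

    private static string Show(double? v) =>
        v.HasValue ? v.Value.ToString("0.0", CultureInfo.InvariantCulture) : "null";

    public static void Main()
    {
        double?[] value = { 380.5, null, null, null, 0.0, 0.0 };

        // prints: 380.5, 380.5, null, null, 0.0, 0.0
        Console.WriteLine(string.Join(", ", LagFill(value).Select(v => Show(v))));

        // prints: 380.5, 380.5, 380.5, 380.5, 0.0, 0.0
        Console.WriteLine(string.Join(", ", LastNonNullFill(value).Select(v => Show(v))));
    }
}
```

The first line of output matches the newValue column above; the second is the forward-fill the question asks for.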
But you probably want Last instead of Lag, because with ignoreNulls set to true it skips nulls and returns the most recent non-null value:
var spark = SparkSession.Builder().GetOrCreate();

// Same sample data and schema as before.
var df = spark.CreateDataFrame(new List<GenericRow>()
{
    new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
    new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
    new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
}, new StructType(new List<StructField>()
{
    new StructField("Year", new IntegerType()),
    new StructField("Month", new IntegerType()),
    new StructField("Day", new IntegerType()),
    new StructField("Hour", new IntegerType()),
    new StructField("Minute", new IntegerType()),
    new StructField("ID", new IntegerType()),
    new StructField("Type", new StringType()),
    new StructField("Value", new DoubleType()),
}));

// Order all rows chronologically.
var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");

// Where Value is null, take the last non-null Value seen so far
// (the second argument to Last is ignoreNulls).
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(),
            Functions.Last(df["Value"], true).Over(window))
        .Otherwise(df["Value"]));

filledDataFrame.Show(1000, 10000);
Which results in:
+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute| ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021| 3| 4| 8| 9| 87|Type1|380.5| 380.5|
|2021| 3| 4| 8| 10|null| null| null| 380.5|
|2021| 3| 4| 8| 11|null| null| null| 380.5|
|2021| 3| 4| 8| 12|null| null| null| 380.5|
|2021| 3| 4| 8| 13| 87|Type1| 0.0| 0.0|
|2021| 3| 4| 8| 14| 87|Type1| 0.0| 0.0|
+----+-----+---+----+------+----+-----+-----+--------+
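The expected output in the question also fills ID and Type, while the code above only adds newValue. A minimal sketch of one way to extend the same idea to several columns follows. ForwardFill and FillForward are hypothetical names for this example, and it assumes the ordering columns identify each row uniquely, as they do in the sample data:

```csharp
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;

public static class ForwardFill
{
    // Hypothetical helper: overwrites each named column with the most recent
    // non-null value up to and including the current row of the window.
    // Non-null rows keep their own value, so no When/Otherwise is needed.
    public static DataFrame FillForward(
        DataFrame df, WindowSpec window, params string[] columns)
    {
        var result = df;
        foreach (var name in columns)
        {
            result = result.WithColumn(
                name,
                Functions.Last(Functions.Col(name), true).Over(window));
        }
        return result;
    }
}
```

With the df and window defined above, it could be called as ForwardFill.FillForward(df, window, "ID", "Type", "Value"). Unlike the earlier code, this replaces the columns in place rather than adding newValue. Note also that a window with OrderBy but no PartitionBy makes Spark move all rows into a single partition, which is fine for a small sample but may be slow on large data.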
hope it helps!
ed
(the using statements needed to make this work)
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;