I would like to fill gaps in my DataFrame
using .NET for Spark.
The current DataFrame (`rawData`)
contains minute-level data between `reportFrom`
and `reportTo`:
// Report window covering the whole of 2021-03-04 (midnight to the following midnight).
var reportFrom = new DateTime(2021, 3, 4);
var reportTo = reportFrom.AddDays(1);
Some intervals are missing and I would like to fill them in with the last known value.
+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id | Type| Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021| 3| 4| 0| 0| 87| Power| 0.0|
|2021| 3| 4| 0| 1| 87| Power| 0.0|
|2021| 3| 4| 0| 2| 87| Power| 0.0|
...
|2021| 3| 4| 14| 2| 87| Power| 380.0|
|2021| 3| 4| 14| 3| 87| Power| 380.0|
|2021| 3| 4| 14| 4| 87| Power| 380.0|
|2021| 3| 4| 14| 5| 87| Power| 380.0|
|2021| 3| 4| 14| 7| 87| Power| 380.0|
...
|2021| 3| 4| 22| 7| 87| Power| 0.0|
The result I am expecting after the first step (inserting missing minutes) is:
+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id | Type| Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021| 3| 4| 0| 0| 87| Power| 0.0|
|2021| 3| 4| 0| 1| 87| Power| 0.0|
|2021| 3| 4| 0| 2| 87| Power| 0.0|
...
|2021| 3| 4| 14| 2| 87| Power| 380.0|
|2021| 3| 4| 14| 3| 87| Power| 380.0|
|2021| 3| 4| 14| 4| 87| Power| 380.0|
|2021| 3| 4| 14| 5| 87| Power| 380.0|
|2021| 3| 4| 14| 6| null| null| null|
|2021| 3| 4| 14| 7| 87| Power| 380.0|
|2021| 3| 4| 14| 8| null| null| null|
...
|2021| 3| 4| 23| 59| null| null| null|
So far, I have been creating a new DataFrame
with all the minutes and then performing a left outer join
on both DataFrames.
// Build a one-row-per-minute calendar for [reportFrom, reportTo) and left-join rawData onto it.
// A separate loop cursor is used so reportFrom itself is left unchanged.
int inc = 1;
List<DateTime> timeList = new List<DateTime>();
for (DateTime cursor = reportFrom; cursor < reportTo; cursor = cursor.AddMinutes(inc))
{
    timeList.Add(cursor);
}

// Schema of every calendar row; built once and reused for each CreateDataFrame call.
var timeSchema = new StructType(
    new List<StructField>()
    {
        new StructField("Year0", new IntegerType()),
        new StructField("Month0", new IntegerType()),
        new StructField("Day0", new IntegerType()),
        new StructField("Hour0", new IntegerType()),
        new StructField("Minute0", new IntegerType()),
    });

// Union needs an initial DataFrame to append to, so start from a dummy row with Year0 = -1
// and filter it out after the loop.
var dataToFill = spark.CreateDataFrame(
    new List<GenericRow> { new GenericRow(new object[] { -1, 0, 0, 0, 0 }) },
    timeSchema);

// NOTE: this creates one single-row DataFrame and one Union per minute (1440 for a full day),
// producing a very deep query plan - presumably where most of the time goes.
foreach (DateTime time in timeList)
{
    var dataToFillt = spark.CreateDataFrame(
        new List<GenericRow> { new GenericRow(new object[] { time.Year, time.Month, time.Day, time.Hour, time.Minute }) },
        timeSchema);
    dataToFill = dataToFill.Union(dataToFillt);
}
dataToFill = dataToFill.Filter("Year0 > 0");

// Left outer join keeps every calendar minute; minutes absent from rawData get null data columns.
// The join condition must reference the DataFrame actually being joined (rawData).
var toFillReportDataReq = dataToFill.Join(rawData,
    dataToFill["Year0"] == rawData["Year"] & dataToFill["Month0"] == rawData["Month"] & dataToFill["Day0"] == rawData["Day"]
    & dataToFill["Hour0"] == rawData["Hour"] & dataToFill["Minute0"] == rawData["Minute"], "left_outer");
A few rows of `toFillReportDataReq`
are shown below:
|2021| 3| 4| 22| 4| 87| Power| 0.0|
|2021| 3| 4| 22| 5| 87| Power| 0.0|
|2021| 3| 4| 22| 6| 87| Power| 0.0|
|2021| 3| 4| 22| 7| 87| Power| 0.0|
|2021| 3| 4| 22| 8| null| null| null|
|2021| 3| 4| 22| 9| null| null| null|
|2021| 3| 4| 22| 10| null| null| null|
|2021| 3| 4| 22| 11| null| null| null|
|2021| 3| 4| 22| 12| null| null| null|
|2021| 3| 4| 22| 13| null| null| null|
|2021| 3| 4| 22| 14| null| null| null|
Replacing the null values in the column `Value`
is already handled using a window
and the `last`
function.
The null values in the columns `Id`
and `Type`
are replaced with `var id = 87`
and `"Power"` respectively, using:
// Rows introduced by the outer join have null Id/Type; substitute the known constants and
// keep any existing non-null value. Both columns are taken from the joined DataFrame
// before it is reassigned.
var idColumn = toFillReportDataReq["Id"];
var typeColumn = toFillReportDataReq["Type"];
toFillReportDataReq = toFillReportDataReq
    .WithColumn("Id", Functions.When(idColumn.IsNull(), id).Otherwise(idColumn))
    .WithColumn("Type", Functions.When(typeColumn.IsNull(), "Power").Otherwise(typeColumn));
This method returns the results I want, but it is very time consuming (inefficient).
My question is: is there a more efficient way to create a
DataFrame
containing all the minutes in the specified interval? Thanks!
This is the approach I would take:
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;
namespace StackOverflow
{
    /// <summary>
    /// Demonstrates filling gaps in minute-level data with .NET for Spark: generate one row per
    /// minute with <c>spark.Range</c>, left-join the sparse data onto that grid, then forward-fill
    /// missing values with <c>Last(..., ignoreNulls: true)</c> over an ordered window.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var spark = SparkSession.Builder().GetOrCreate();

            // The minute grid below is built from Unix seconds computed as if reportFrom were UTC,
            // so Spark must render and split timestamps in UTC as well. With any other session
            // time zone, FromUnixTime/Hour/... would shift every generated minute by that zone's
            // offset and the join against the wall-clock Year..Minute columns would not line up.
            spark.Conf().Set("spark.sql.session.timeZone", "UTC");

            // Sample data set - we will fill in the missing minutes.
            var df = spark.CreateDataFrame(new List<GenericRow>()
            {
                new GenericRow(new object[] {2021, 3, 4, 8, 3, 87, "Type1", 380.5}),
                new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 20, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 25, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 35, 87, "Type1", 0.0}),
                new GenericRow(new object[] {2021, 3, 4, 8, 45, 87, "Type1", 0.0})
            }, new StructType(new List<StructField>()
            {
                new StructField("Year", new IntegerType()),
                new StructField("Month", new IntegerType()),
                new StructField("Day", new IntegerType()),
                new StructField("Hour", new IntegerType()),
                new StructField("Minute", new IntegerType()),
                new StructField("ID", new IntegerType()),
                new StructField("Type", new StringType()),
                new StructField("Value", new DoubleType()),
            }));

            // Report window: reportFrom is inclusive, reportTo is exclusive.
            var reportFrom = new DateTime(2021, 3, 4, 7, 0, 0);
            var reportTo = new DateTime(2021, 3, 4, 9, 0, 0);

            // A DateTime can't be passed to Spark directly, so convert the start to Unix epoch
            // seconds. DateTime subtraction ignores Kind, so reportFrom's wall-clock value is
            // treated as UTC - matching the session time zone set above.
            var unixFromTime = (long)(reportFrom - new DateTime(1970, 1, 1, 0, 0, 0, 0)).TotalSeconds;

            // One row per minute in [reportFrom, reportTo).
            var minutesToCreate = (long)reportTo.Subtract(reportFrom).TotalMinutes;

            // spark.Range yields a single column "id" holding 0 .. minutesToCreate - 1.
            var everyMinute = spark.Range(minutesToCreate);

            // Minute index -> epoch seconds. Multiply by 60, otherwise we would add seconds, not minutes.
            everyMinute = everyMinute.WithColumn("Time",
                Functions.Lit(unixFromTime)
                    .Plus(Functions.Col("id").Multiply(Functions.Lit(60))));

            // Epoch seconds -> timestamp; keep only the timestamp and drop the intermediate columns.
            everyMinute = everyMinute.WithColumn("Date",
                Functions.ToTimestamp(Functions.FromUnixTime(Functions.Col("Time")))).Select("Date");

            // Split the timestamp into the same key columns the data set uses.
            everyMinute = everyMinute
                .WithColumn("Year", Functions.Year(Functions.Col("Date")))
                .WithColumn("Month", Functions.Month(Functions.Col("Date")))
                .WithColumn("Day", Functions.DayOfMonth(Functions.Col("Date")))
                .WithColumn("Hour", Functions.Hour(Functions.Col("Date")))
                .WithColumn("Minute", Functions.Minute(Functions.Col("Date")));

            // Left outer join keeps every generated minute; minutes absent from df get null data columns.
            var dfAllData = everyMinute.Join(df,
                new List<string>() { "Year", "Month", "Day", "Hour", "Minute" }, "left_outer");

            // Forward-fill: Last(ignoreNulls: true) over all rows up to and including the current
            // one returns a non-null Value unchanged and replaces a null Value with the most recent
            // non-null one. The explicit ROWS frame keeps this true even if two rows share the same
            // minute (the default RANGE frame would also include later peer rows).
            // NOTE: there is no PartitionBy, so Spark moves all rows into a single partition.
            // Apply the same expression to ID and Type if they should be forward-filled as well.
            var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute")
                .RowsBetween(Window.UnboundedPreceding, Window.CurrentRow);
            var filledDataFrame = dfAllData.WithColumn("newValue",
                Functions.Last(dfAllData["Value"], true).Over(window));

            // DataFrame row order is not guaranteed, so sort explicitly before displaying.
            filledDataFrame
                .OrderBy("Year", "Month", "Day", "Hour", "Minute")
                .Show(1000, 10000);
        }
    }
}
ed