Tags: c#, dataframe, apache-spark, .net-spark

Time-efficient gap filling of a DataFrame using .NET for Spark


I would like to fill gaps in my DataFrame using .NET for Spark.

The current DataFrame (rawData) contains data at one-minute intervals between reportFrom and reportTo:

DateTime reportFrom = new DateTime(2021, 3, 4, 0, 0, 0);
DateTime reportTo = new DateTime(2021, 3, 5, 0, 0, 0);

Some minutes are missing, and I would like to fill them in with the last known value.

+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id                |                Type|             Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021|    3|  4|   0|     0|                87|               Power|               0.0|
|2021|    3|  4|   0|     1|                87|               Power|               0.0|
|2021|    3|  4|   0|     2|                87|               Power|               0.0|
...
|2021|    3|  4|  14|     2|                87|               Power|             380.0|
|2021|    3|  4|  14|     3|                87|               Power|             380.0|
|2021|    3|  4|  14|     4|                87|               Power|             380.0|
|2021|    3|  4|  14|     5|                87|               Power|             380.0|
|2021|    3|  4|  14|     7|                87|               Power|             380.0|
...
|2021|    3|  4|  22|     7|                87|               Power|               0.0|

The result I am expecting after the first step (inserting missing minutes) is:

+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id                |                Type|             Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021|    3|  4|   0|     0|                87|               Power|               0.0|
|2021|    3|  4|   0|     1|                87|               Power|               0.0|
|2021|    3|  4|   0|     2|                87|               Power|               0.0|
...
|2021|    3|  4|  14|     2|                87|               Power|             380.0|
|2021|    3|  4|  14|     3|                87|               Power|             380.0|
|2021|    3|  4|  14|     4|                87|               Power|             380.0|
|2021|    3|  4|  14|     5|                87|               Power|             380.0|
|2021|    3|  4|  14|     6|              null|                null|              null|
|2021|    3|  4|  14|     7|                87|               Power|             380.0|
|2021|    3|  4|  14|     8|              null|                null|              null|
...
|2021|    3|  4|  23|    59|              null|                null|              null|               

So far, I have been creating a new DataFrame with all the minutes and then performing a left outer join between the two DataFrames:

int inc = 1;
List<DateTime> timeList = new List<DateTime>();
while (reportFrom < reportTo)
{
    timeList.Add(reportFrom);
    reportFrom = reportFrom.AddMinutes(inc);
}    

// seed row (Year0 = -1) so there is a DataFrame to Union into; it is removed by the Filter below
var toFillTime0 = new List<object> { -1, 0, 0, 0, 0 };

var dataToFill = spark.CreateDataFrame(
    new List<GenericRow> { new GenericRow(toFillTime0.ToArray()) },
    new StructType(                     //schema
    new List<StructField>()
    {
            new StructField("Year0", new IntegerType()),
            new StructField("Month0", new IntegerType()),
            new StructField("Day0", new IntegerType()),
            new StructField("Hour0", new IntegerType()),
            new StructField("Minute0", new IntegerType()),
    }));

foreach (DateTime time in timeList)
{

    var toFillTime = new List<object> { time.Year, time.Month, time.Day, time.Hour, time.Minute };

    var dataToFillt = spark.CreateDataFrame(
        new List<GenericRow> { new GenericRow(toFillTime.ToArray()) },
        new StructType(                     //schema
        new List<StructField>()
        {
            new StructField("Year0", new IntegerType()),
            new StructField("Month0", new IntegerType()),
            new StructField("Day0", new IntegerType()),
            new StructField("Hour0", new IntegerType()),
            new StructField("Minute0", new IntegerType()),
        }));

    dataToFill = dataToFill.Union(dataToFillt);

}

dataToFill = dataToFill.Filter("Year0 > 0");    

var toFillReportDataReq = dataToFill.Join(rawData,
                dataToFill["Year0"] == rawData["Year"] & dataToFill["Month0"] == rawData["Month"] & dataToFill["Day0"] == rawData["Day"]
                & dataToFill["Hour0"] == rawData["Hour"] & dataToFill["Minute0"] == rawData["Minute"], "left_outer");
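The slow part of the loop above is most likely that it creates one single-row DataFrame per minute and chains about 1,440 Union calls, so the query plan grows with every iteration. A minimal sketch, assuming the same Year0 to Minute0 schema: build all the rows in plain C# first and call spark.CreateDataFrame only once. MinuteRows.Between is a hypothetical helper name, not part of .NET for Spark.

```csharp
using System;
using System.Collections.Generic;

static class MinuteRows
{
    // Hypothetical helper: returns one { Year, Month, Day, Hour, Minute } row for every
    // minute in the half-open range [from, to), like the while-loop in the question,
    // but without mutating the caller's start time.
    public static List<object[]> Between(DateTime from, DateTime to)
    {
        var rows = new List<object[]>();
        for (DateTime t = from; t < to; t = t.AddMinutes(1))
        {
            rows.Add(new object[] { t.Year, t.Month, t.Day, t.Hour, t.Minute });
        }
        return rows;
    }
}
```

Each object[] can then be wrapped in a GenericRow and the whole sequence passed to spark.CreateDataFrame(MinuteRows.Between(reportFrom, reportTo).Select(r => new GenericRow(r)), schema) (with using System.Linq and the Year0 to Minute0 StructType from the question as schema). This removes the need for both the seed row and the Filter("Year0 > 0") step.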

A few rows of toFillReportDataReq are shown below:

|2021|    3|  4|  22|     4|                87|               Power|               0.0|
|2021|    3|  4|  22|     5|                87|               Power|               0.0|
|2021|    3|  4|  22|     6|                87|               Power|               0.0|
|2021|    3|  4|  22|     7|                87|               Power|               0.0|
|2021|    3|  4|  22|     8|              null|                null|              null|
|2021|    3|  4|  22|     9|              null|                null|              null|
|2021|    3|  4|  22|    10|              null|                null|              null|
|2021|    3|  4|  22|    11|              null|                null|              null|
|2021|    3|  4|  22|    12|              null|                null|              null|
|2021|    3|  4|  22|    13|              null|                null|              null|
|2021|    3|  4|  22|    14|              null|                null|              null|

Replacing the null values in the Value column is already handled with a window and the Last function.

The null values in the Id and Type columns are replaced with id (var id = 87) and "Power", respectively, using:

toFillReportDataReq = toFillReportDataReq.WithColumn("Id", Functions.When(toFillReportDataReq["Id"].IsNull(), id)
   .Otherwise(toFillReportDataReq["Id"]))
   .WithColumn("Type", Functions.When(toFillReportDataReq["Type"].IsNull(), "Power")
    .Otherwise(toFillReportDataReq["Type"]));
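For the third question, a minimal sketch: Functions.Coalesce returns its first non-null argument, so it can stand in for each When/IsNull/Otherwise pair. FillIdAndType is a hypothetical helper name. If every row should carry the same Id and Type anyway, overwriting the column with Functions.Lit(id) alone would also work.

```csharp
using Microsoft.Spark.Sql;

static class IdTypeFill
{
    // Hypothetical helper: keeps existing Id/Type values and substitutes the given
    // constants only where the column is null (Coalesce picks the first non-null value).
    public static DataFrame FillIdAndType(DataFrame df, int id, string type) =>
        df.WithColumn("Id", Functions.Coalesce(Functions.Col("Id"), Functions.Lit(id)))
          .WithColumn("Type", Functions.Coalesce(Functions.Col("Type"), Functions.Lit(type)));
}
```

With the question's names this would be toFillReportDataReq = IdTypeFill.FillIdAndType(toFillReportDataReq, id, "Power").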

Overall, this approach returns the results I want, but it is very time-consuming.

My questions are the following:

  • Is there a more efficient way to create a new DataFrame containing all the minutes in the specified interval?
  • Is there any way to avoid the Join in this method?
  • What is the best way to set all the values in the Id column to id and in the Type column to "Power"?

Thanks!


Solution

  • This is the approach I would take:

    1. Build a DataFrame that has a row for every minute you want to represent (I used spark.Range to project a row for every minute I need)
    2. For each id produced by Range, add that many minutes to the start time
    3. Join the dates to your original DataFrame using a left_outer join so you don't lose any rows
    4. Then use Last to fill in any gaps; note that if you start with a null, newValue will be null until you get some data
    using System;
    using System.Collections.Generic;
    using Microsoft.Spark.Sql;
    using Microsoft.Spark.Sql.Expressions;
    using Microsoft.Spark.Sql.Types;
    
    namespace StackOverflow
    {
        class Program
        {
            static void Main(string[] args)
            {
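                //FromUnixTime below renders epoch seconds in the session time zone, so pin it to UTC;
                //otherwise every generated minute is shifted by the local UTC offset
                var spark = SparkSession.Builder()
                    .Config("spark.sql.session.timeZone", "UTC")
                    .GetOrCreate();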
                
                //Sample data set - we will fill in the missing minutes
                var df = spark.CreateDataFrame(new List<GenericRow>()
                {
                    new GenericRow(new object[] {2021, 3, 4, 8, 3, 87, "Type1", 380.5}),
                    new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
                    new GenericRow(new object[] {2021, 3, 4, 8, 20, null, null, null}),
                    new GenericRow(new object[] {2021, 3, 4, 8, 25, null, null, null}),
                    new GenericRow(new object[] {2021, 3, 4, 8, 35, 87, "Type1", 0.0}),
                    new GenericRow(new object[] {2021, 3, 4, 8, 45, 87, "Type1", 0.0})
                }, new StructType(new List<StructField>()
                {
                    new StructField("Year", new IntegerType()),
                    new StructField("Month", new IntegerType()),
                    new StructField("Day", new IntegerType()),
                    new StructField("Hour", new IntegerType()),
                    new StructField("Minute", new IntegerType()),
                    new StructField("ID", new IntegerType()),
                    new StructField("Type", new StringType()),
                    new StructField("Value", new DoubleType()),
                }));
                
                //start and end time
                var reportFrom = new DateTime(2021, 3, 4, 7, 0, 0);
                var reportTo = new DateTime(2021, 3, 4, 9, 0, 0);
                
                //convert start time to unix epoch as we can't pass a DateTime to spark (yet!)
                var unixFromTime = (reportFrom - new DateTime(1970, 1, 1, 0, 0, 0, 0)).TotalSeconds;
                
                //how many total rows do we need?
                var minutesToCreate = reportTo.Subtract(reportFrom).TotalMinutes;
                
                //create a dataframe with 1 row for every minute we need
                var everyMinute = spark.Range((long) minutesToCreate);
                
                //Add the reportFrom unix epoch
                everyMinute = everyMinute.WithColumn("BaseTime", Functions.Lit(unixFromTime));
                
                //add Id * 60 seconds to the start epoch (Id is the 0-based row number from Range); without the * 60 we would add seconds, not minutes
                everyMinute = everyMinute.WithColumn("Time",
                    Functions.Lit(unixFromTime)
                        .Plus(Functions.Col("Id").Cast("Int").Multiply(Functions.Lit(60))));
                
                //convert the unix epoch to an actual timestamp and drop all the intermediate columns
                everyMinute = everyMinute.WithColumn("Date",
                    Functions.ToTimestamp(Functions.FromUnixTime(Functions.Col("Time")))).Select("Date");
                    
                //convert timestamp into individual columns
    
                everyMinute = everyMinute.WithColumn("Year", Functions.Year(Functions.Col("Date")));
                everyMinute = everyMinute.WithColumn("Month", Functions.Month(Functions.Col("Date")));
                everyMinute = everyMinute.WithColumn("Day", Functions.DayOfMonth(Functions.Col("Date")));
                everyMinute = everyMinute.WithColumn("Hour", Functions.Hour(Functions.Col("Date")));
                everyMinute = everyMinute.WithColumn("Minute", Functions.Minute(Functions.Col("Date")));
    
                //left outer join so every generated minute is kept, even where df has no matching row
                var dfAllData = everyMinute.Join(df, new List<string>() {"Year", "Month", "Day", "Hour", "Minute"}, "left_outer");
                
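                //fill gaps with the last non-null Value seen so far (ignoreNulls: true);
                //note that a window without PartitionBy moves all rows to a single partition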
                var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
                var filledDataFrame = dfAllData.WithColumn("newValue",
                    Functions.When(dfAllData["Value"].IsNull(),
                            Functions.Last(dfAllData["Value"], true).Over(window))
                        .Otherwise(dfAllData["Value"]));
    
                filledDataFrame.Show(1000, 10000);
            }
        }
    }
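A possible alternative for the grid step, sketched under the assumption of Spark 2.4 or later (where the SQL sequence function accepts timestamps with an interval step): generate the minutes with sequence() and explode() in a single query. Because the timestamp literals are parsed and split into fields in the same session time zone, this also sidesteps the epoch conversion. SequenceMinuteGrid.Build is a hypothetical helper name, and the range is assumed not to cross a daylight-saving change in the session time zone.

```csharp
using System;
using System.Globalization;
using Microsoft.Spark.Sql;

static class SequenceMinuteGrid
{
    // Hypothetical helper: one row per minute in [from, to) with
    // Year/Month/Day/Hour/Minute columns, built by Spark SQL's sequence() + explode().
    public static DataFrame Build(SparkSession spark, DateTime from, DateTime to)
    {
        // InvariantCulture keeps ':' as the time separator regardless of the machine's locale
        const string fmt = "yyyy-MM-dd HH:mm:ss";
        string start = from.ToString(fmt, CultureInfo.InvariantCulture);
        // sequence() includes its stop value, so stop one minute before 'to'
        string stop = to.AddMinutes(-1).ToString(fmt, CultureInfo.InvariantCulture);

        DataFrame grid = spark.Sql(
            $"SELECT explode(sequence(to_timestamp('{start}'), to_timestamp('{stop}'), interval 1 minute)) AS ts");

        // split each timestamp into the join columns and drop the helper column
        return grid
            .WithColumn("Year", Functions.Year(Functions.Col("ts")))
            .WithColumn("Month", Functions.Month(Functions.Col("ts")))
            .WithColumn("Day", Functions.DayOfMonth(Functions.Col("ts")))
            .WithColumn("Hour", Functions.Hour(Functions.Col("ts")))
            .WithColumn("Minute", Functions.Minute(Functions.Col("ts")))
            .Drop("ts");
    }
}
```

The result has the same Year, Month, Day, Hour and Minute join columns as everyMinute in the answer, so it could feed the same left_outer Join and Last window.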
    
