Tags: apache-spark, dataframe, apache-spark-sql, sql-timestamp

Getting weekly and daily averages of timestamp data


I currently have data in a Spark DataFrame formatted as follows:

Timestamp    Number
.........    ......
M-D-Y        3
M-D-Y        4900

The timestamp data is in no way uniform or consistent (i.e., one value might be for March 1, 2015 and the next value in the table for September 1, 2015; there can also be multiple entries per date).

So I want to do two things:

  1. Calculate the number of entries per week. I essentially want a new table with one row per week, containing the number of rows whose timestamp falls within that week. If multiple years are present, I would ideally like to average the values across the years to get a single value per week.
  2. Average the Number column for each week. So for every week of the year, I would have a value that represents the average of the Number column (0 if there is no entry within that week).

Solution

  • Parsing dates is relatively easy using the built-in functions, by combining unix_timestamp with a simple type cast:

    sqlContext.sql(
      "SELECT CAST(UNIX_TIMESTAMP('March 1, 2015', 'MMM d, yyyy') AS TIMESTAMP)"
    ).show(false)
    
    // +---------------------+
    // |_c0                  |
    // +---------------------+
    // |2015-03-01 00:00:00.0|
    // +---------------------+
    

    With the DataFrame DSL, the equivalent code would be something like this:

    import org.apache.spark.sql.functions.unix_timestamp
    import sqlContext.implicits._  // provides the $"..." column syntax and toDF on local collections
    
    unix_timestamp($"date", "MMM d, yyyy").cast("timestamp")
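
    For instance, applied to a DataFrame with a string column called date, the same expression can be materialized with withColumn. A minimal sketch (the input data here is made up for illustration):

    // Hypothetical input with a single string column "date"
    val raw = Seq("March 1, 2015", "September 1, 2015").toDF("date")
    
    val parsed = raw.withColumn(
      "date", unix_timestamp($"date", "MMM d, yyyy").cast("timestamp")
    )
    // "date" is now a proper timestamp column instead of a string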
    

    To fill in missing entries you can use different tricks. The simplest approach is to reuse the same parsing logic as above. First, let's create a few helpers:

    // Standard Gregorian leap-year check
    def leap(year: Int) = {
      ((year % 4 == 0) && (year % 100 != 0)) || (year % 400 == 0)
    }
    
    // "2015 1" .. "2015 52" - year followed by week number
    def weeksForYear(year: Int) = (1 to 52).map(w => s"$year $w")
    
    // "2015 1" .. "2015 365" (366 in a leap year) - year followed by day of year
    def daysForYear(year: Int) = (1 to { if (leap(year)) 366 else 365 }).map(
      d => s"$year $d"
    )
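
    As a quick check of what these helpers produce:

    weeksForYear(2015).take(3)  // "2015 1", "2015 2", "2015 3"
    daysForYear(2016).size      // 366, since 2016 is a leap year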
    

    Next, some example reference data (here for weeks; the same approach for days is sketched just below):

    import org.apache.spark.sql.functions.{year, weekofyear}
    
    val exprs = Seq(year($"date").alias("year"), weekofyear($"date").alias("week"))
    
    val weeks2015 = Seq(2015)
      .flatMap(weeksForYear _)
      .map(Tuple1.apply)
      .toDF("date")
      .withColumn("date", unix_timestamp($"date", "yyyy w").cast("timestamp"))
      .select(exprs: _*)
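
    The day-level reference data mentioned above can be built the same way, using dayofyear and the day-of-year format pattern D. A rough sketch (the names exprsDaily and days2015 are just illustrative):

    import org.apache.spark.sql.functions.{dayofyear, year}
    
    val exprsDaily = Seq(year($"date").alias("year"), dayofyear($"date").alias("day"))
    
    val days2015 = Seq(2015)
      .flatMap(daysForYear _)
      .map(Tuple1.apply)
      .toDF("date")
      .withColumn("date", unix_timestamp($"date", "yyyy D").cast("timestamp"))
      .select(exprsDaily: _*)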
    

    Finally you can transform the original data:

    val df = Seq(
      ("March 1, 2015", 3),
      ("September 1, 2015", 4900)
    ).toDF("Timestamp", "Number")
    
    val dfParsed = df
      .withColumn("date", unix_timestamp($"timestamp", "MMM d, yyyy").cast("timestamp"))
      .select(exprs :+ $"Number": _*)
    

    merge and aggregate:

    import org.apache.spark.sql.functions.{avg, count}
    
    weeks2015.join(dfParsed, Seq("year", "week"), "left")
      .groupBy($"year", $"week")
      .agg(count($"Number"), avg($"Number"))
      .na.fill(0)
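
    If reference data for multiple years is generated, you can average across the years (point 1 of the question) by aggregating once more on week alone. A rough sketch (the value name weeklyPerYear and the column aliases are illustrative, not part of the original code):

    val weeklyPerYear = weeks2015.join(dfParsed, Seq("year", "week"), "left")
      .groupBy($"year", $"week")
      .agg(count($"Number").alias("cnt"), avg($"Number").alias("avg_number"))
      .na.fill(0)
    
    // One row per week of the year, averaged over all years in the reference data
    weeklyPerYear
      .groupBy($"week")
      .agg(avg($"cnt").alias("avg_count"), avg($"avg_number").alias("avg_per_week"))
      .orderBy($"week")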