hadoop, apache-pig

How to create missing records within date-time range in pig latin


I have input records of the form

2013-07-09T19:17Z,f1,f2
2013-07-09T03:17Z,f1,f2
2013-07-09T21:17Z,f1,f2
2013-07-09T16:17Z,f1,f2
2013-07-09T16:14Z,f1,f2
2013-07-09T16:16Z,f1,f2
2013-07-09T01:17Z,f1,f2
2013-07-09T16:18Z,f1,f2

These represent timestamps and events. I wrote these by hand, so they are out of order, but the actual data should be sorted by time.

I would like to generate a set of records to feed into a graph-plotting function that needs a continuous time series. I would like to fill in the missing values: for example, if there are entries for "2013-07-09T19:17Z" and "2013-07-09T19:19Z", I would like to generate an entry for "2013-07-09T19:18Z" with a predefined value.

My thoughts on doing this:

  1. Use MIN and MAX to find the start and end timestamps in the series
  2. Write a UDF which takes the min and max and returns a relation with the missing timestamps
  3. Join the above two relations

I cannot get my head around how to implement this in Pig, though. I would appreciate any help.

Thanks!


Solution

  • Generate another file using a script (outside Pig) with all timestamps between MIN and MAX, including MIN and MAX. Load this as a second dataset. Here is a sample that I built from your dataset. Please note that I filled in only a few of the gaps, not all of them.

    2013-07-09T01:17Z,d1,d2
    2013-07-09T01:18Z,d1,d2
    2013-07-09T03:17Z,d1,d2
    2013-07-09T16:14Z,d1,d2
    2013-07-09T16:15Z,d1,d2
    2013-07-09T16:16Z,d1,d2
    2013-07-09T16:17Z,d1,d2
    2013-07-09T16:18Z,d1,d2
    2013-07-09T19:17Z,d1,d2
    2013-07-09T21:17Z,d1,d2
    

    Do a COGROUP on the original dataset and the generated dataset above. Use a nested FOREACH ... GENERATE to write the output dataset: if the first dataset has no records for a given timestamp, use the values from the second dataset; otherwise use the values from the first. Here is the code I used on these two datasets.

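    -- Original events, and the gap-free set of default records
    Org_Set = LOAD 'pigMissingData/timeSeries' USING PigStorage(',') AS (timeStamp, fl1, fl2);
    Default_set = LOAD 'pigMissingData/timeSeriesFull' USING PigStorage(',') AS (timeStamp, fl1, fl2);
    -- One group per timestamp, holding a bag of matching records from each input
    coGrouped = COGROUP Org_Set BY timeStamp, Default_set BY timeStamp;
    
    Filled_Data_set = FOREACH coGrouped {
        -- Number of original records for this timestamp (0 means a gap)
        x = COUNT(Org_Set);
        -- Fall back to the default values only when the original has none
        y = (x == 0 ? (Default_set.fl1, Default_set.fl2) : (Org_Set.fl1, Org_Set.fl2));
        GENERATE FLATTEN(group), FLATTEN(y.$0), FLATTEN(y.$1);
    };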
    

    If you need further clarification or help, let me know.
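
The first step of the solution, generating a file with every timestamp between MIN and MAX, could be done with a small script along the following lines. This is only a sketch in Python, not the script used for the sample above. It assumes one-minute granularity, the `YYYY-MM-DDTHH:MMZ` format from the question, and the placeholder values `d1,d2` from the sample. The function names and the file paths passed to `write_default_file` are hypothetical.

```python
from datetime import datetime, timedelta

# Assumed format, matching timestamps such as 2013-07-09T19:17Z
TS_FORMAT = "%Y-%m-%dT%H:%MZ"


def full_minute_range(timestamps, defaults=("d1", "d2")):
    """Return one CSV line per minute from the earliest to the latest
    timestamp, inclusive, each carrying the default field values."""
    parsed = [datetime.strptime(ts, TS_FORMAT) for ts in timestamps]
    current, last = min(parsed), max(parsed)
    lines = []
    while current <= last:
        fields = (current.strftime(TS_FORMAT),) + tuple(defaults)
        lines.append(",".join(fields))
        current += timedelta(minutes=1)
    return lines


def write_default_file(input_path, output_path):
    """Read the original CSV and write the gap-free default file.
    Both paths are hypothetical and refer to the local file system."""
    with open(input_path) as src:
        # The timestamp is the first comma-separated field of each record
        timestamps = [line.split(",")[0] for line in src if line.strip()]
    with open(output_path, "w") as dst:
        dst.write("\n".join(full_minute_range(timestamps)) + "\n")
```

The input does not need to be sorted, since the script only uses the earliest and latest timestamps. The generated file would still have to be placed wherever Pig loads 'pigMissingData/timeSeriesFull' from.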