
Pig data cleaning based on date


I have 2 datasets as shown below:
1. ID and location

{ID, beginning year, ending year, location}. 

sample:

(1001, 2010, 2012, CA)
(1001, 2013, 2015, WA)
(1002, 2009, 2015, AZ)
(1003, 2014, 2015, FL)

2. ID and connection

{ID1, ID2, connection creation date}

sample:

(1001, 1002, 2013)
(1001, 1003, 2014)

I want to count the number of connections by location pair and year. I assume that once a connection is created, it never expires. The results I am looking for are below:

{Location 1, Location2, year, number of connections}

In the example above, it should be:

(WA, AZ, 2013, 1)
(WA, AZ, 2014, 1)
(WA, AZ, 2015, 1)
(WA, FL, 2014, 1)
(WA, FL, 2015, 1)

Does anyone know how to accomplish this in Apache Pig?


Solution

  • As mentioned in your comment, we will at some point need to move to yearly information. To minimize the impact of the resulting data-size blow-up, that step should sit as far down the Pig script as possible. The first thing we need to do is the following data translation:

    {ID1, ID2, connection creation date} -> {Location1, Location2, start_year, end_year}
    

    This can be achieved with the following Pig script statements:

    locationData = LOAD 'path1' USING PigStorage('\t') AS (ID:chararray, beginning_year:long, ending_year:long, location:chararray);
    connectionData = LOAD 'path2' USING PigStorage('\t') AS (ID1:chararray, ID2:chararray, connection_year:long);
    
    partialJoin = JOIN connectionData BY ID1, locationData BY ID;
    partialExtracted = FOREACH partialJoin GENERATE
                               ID2,
                               connection_year,
                               location AS location1,
                               (beginning_year > connection_year ? beginning_year : connection_year) AS start_year,
                               ending_year AS end_year;
    
    fullJoin = JOIN partialExtracted BY ID2, locationData BY ID;
    fullExtracted = FOREACH fullJoin GENERATE
                               location1,
                               location AS location2,
                               (beginning_year > start_year ? beginning_year : start_year) AS start_year,
                               (ending_year < end_year ? ending_year : end_year ) AS end_year;
    
    -- keep only pairs whose location histories actually overlap in time
    fullFiltered = FILTER fullExtracted BY (end_year >= start_year);
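
    To see what this does on the sample data: the connection (1001, 1002, 2013) joins against both of 1001's rows. The CA row (2010-2012) yields start_year = 2013 and end_year = 2012 and is dropped by the filter, while the WA row (2013-2015) combined with 1002's AZ row (2009-2015) survives as (WA, AZ, 2013, 2015). Likewise, the connection (1001, 1003, 2014) reduces to (WA, FL, 2014, 2015).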
    

    We are now ready to explode the data to get yearly information. Essentially, the following data translation needs to happen:

    {Location1, Location2, start_year, end_year} -> {Location1, Location2, year}
    e.g.
    WA, AZ, 2013, 2015
    ->
    WA, AZ, 2013
    WA, AZ, 2014
    WA, AZ, 2015
    

    Here a UDF is unavoidable. We need a UDF that takes a start year and an end year and returns a bag containing the range of years in between; you should be able to follow an online tutorial to write one (a sketch is included after the snippet below). Let's say this UDF is called getYearRange(). Your script will look as follows:

    fullExploded = FOREACH fullFiltered GENERATE
                           location1, location2,
                           FLATTEN(getYearRange(start_year, end_year)) AS year;
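
    For reference, here is a minimal sketch of what such a UDF could look like in Java. The class name, package-free layout, and null handling are assumptions; only the getYearRange name and its (start_year, end_year) arguments come from the script above.

    import java.io.IOException;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Hypothetical implementation of getYearRange: takes (start_year, end_year)
    // and returns a bag with one single-field tuple per year in the range.
    public class GetYearRange extends EvalFunc<DataBag> {
        private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
        private static final BagFactory BAG_FACTORY = BagFactory.getInstance();

        @Override
        public DataBag exec(Tuple input) throws IOException {
            if (input == null || input.size() < 2
                    || input.get(0) == null || input.get(1) == null) {
                return null;
            }
            long startYear = (Long) input.get(0);
            long endYear = (Long) input.get(1);

            DataBag years = BAG_FACTORY.newDefaultBag();
            for (long year = startYear; year <= endYear; year++) {
                Tuple t = TUPLE_FACTORY.newTuple(1);
                t.set(0, year);
                years.add(t);
            }
            return years;
        }
    }

    Compile the class into a jar, then REGISTER the jar and DEFINE getYearRange with the fully qualified class name at the top of the Pig script so the FOREACH above can call it.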
    

    All that remains is a GROUP BY to get your final counts:

    fullGrouped = GROUP fullExploded BY (location1, location2, year);
    finalOutput = FOREACH fullGrouped GENERATE 
                  FLATTEN(group) AS (location1, location2, year),
                  COUNT(fullExploded) AS count;
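
    On the sample data, finalOutput should reproduce the five tuples listed in the question, from (WA, AZ, 2013, 1) through (WA, FL, 2015, 1). A STORE statement using PigStorage, mirroring the LOAD statements above, will write the result out.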
    

    The above describes the data flow. You might need to add additional steps to take care of edge cases and ensure data sanity.