Search code examples
sessionhadoopamazon-web-servicesapache-pigelastic-map-reduce

Sessionized web logs, get previous and next domain


We have a large pile of web log data. We need to sessionize it, and also generate the previous domain, and next domain for each session. I am testing via an interactive job flow on AWS EMR.

Right now I'm able to get the data sessionized using this code here: http://goo.gl/L52Wf . It took a little work to get familiar with compiling and using a UDF, but I've made it that far.

Here is the header row and first line from the input file (tab delimited):

ID  Date    Rule code   Project UID respondent_uid  Type    Tab ID  URL domain  URL path    Duration    Exit cause  Details
11111111    2012-09-25T11:21:20.000Z    20120914_START_USTEST   20120914_TESTSITE_US_TR test6_EN_9  PAGE_VIEWED FF1348568479042 http://www.google.fr        11  OTHER   

This is a tuple from the SESSIONS relation (steps to get relation are shown below):

(2012-09-27 04:42:20.000,11999603,20120914_URL_ALL,20120914_TESTSITE_US_TR,2082810875_US_9,PAGE_VIEWED,CH17,http://hotmail.com,_news/2012/09/26/14113684,28,WINDOW_DEACTIVATED,,3019222a-5c4d-4767-a82e-2b4df5d9db6d)

This is roughly what I'm running right now to sessionize the test data:

-- Register the jar that provides the Sessionize UDF.
register s3://TestBucket/Sessionize.jar

-- Bind the UDF with the constructor argument '30m'
-- (presumably a 30-minute session timeout; confirm against the DataFu documentation).
define Sessionize datafu.pig.sessions.Sessionize('30m');

-- Load the log, declaring every column as chararray.
A = load 's3://TestBucket/party2.gz' USING PigStorage() as (id: chararray, data_date: chararray, rule_code: chararray, project_uid: chararray, respondent_uid: chararray, type: chararray, tab_id: chararray, url_domain: chararray, url_path: chararray, duration: chararray, exit_cause: chararray, details: chararray);

-- Swap the first two columns so that data_date comes before id.
B = foreach A generate $1, $0, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11;

-- Drop the header row, whose id column holds the literal 'ID'.
C = filter B by id neq 'ID';

-- Group the page views by respondent and domain.
VIEWS = group C by (respondent_uid, url_domain);

-- Within each group, order the views by date and pass them to Sessionize;
-- the flattened result carries one extra trailing field, session_id.
SESSIONS = foreach VIEWS { VISITS = order C by data_date; generate FLATTEN(Sessionize(VISITS)) as (data_date: chararray, id: chararray, rule_code: chararray, project_uid: chararray, respondent_uid: chararray, type: chararray, tab_id: chararray, url_domain: chararray, url_path: chararray, duration: chararray, exit_cause: chararray, details: chararray, session_id); }

(The step at B is to move the date to the first position. Step at C is to filter out the file header)

I'm lost as far as the right direction to go with this from here.

Can I iterate over my SESSIONS relation with foreach and get the next and previous domain from a Pig script? Would it be better to write a custom UDF and pass the SESSIONS relation to that? (Writing my own UDF would be an adventure!..)

Any advice would be greatly appreciated. Even a recommendation of what NOT to do would be just as helpful, so I don't waste time researching a junk approach. I'm quite new to Hadoop and Pig scripting, so this is definitely not one of my strong areas (yet...).


Solution

  • I wouldn't be at all surprised if someone could improve on the solution below; however, it works for my situation. I used the Sessionize UDF (mentioned in my question) as a reference for writing the UDF below.

    import java.io.IOException;
    import java.util.ArrayList;
    import org.apache.pig.Accumulator;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.DataType;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.logicalLayer.FrontendException;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    
    /**
     * Pig UDF that copies every tuple of an input bag and appends two chararray
     * fields to each copy: the domain of the tuple that came before it
     * ({@code previous_domain}) and the domain of the tuple that comes after it
     * ({@code next_domain}). The first tuple gets an empty string as its previous
     * domain and the last tuple gets an empty string as its next domain.
     *
     * <p>The bag is processed in the order in which it is iterated, so the caller
     * is responsible for ordering it (for example with a nested {@code ORDER})
     * before passing it in.
     *
     * <p>By default the domain is read from field position 2 of each tuple. A
     * different zero-based position can be supplied as a constructor argument,
     * e.g. {@code DEFINE PreviousNext PreviousNext('7');}.
     *
     * <p>The class implements {@link Accumulator}; state is carried across
     * {@link #accumulate(Tuple)} calls so that the previous/next values stay
     * correct when a single bag is delivered in several chunks.
     */
    public class PreviousNext extends EvalFunc<DataBag> implements Accumulator<DataBag>
    {

        /** Domain field position used when no constructor argument is given. */
        private static final int DEFAULT_DOMAIN_INDEX = 2;

        /** Zero-based position of the domain field inside each input tuple. */
        private final int domainIndex;

        /** Completed output tuples for the bag currently being processed. */
        private DataBag outputBag;

        /** Domain of the most recently seen tuple; empty before the first tuple. */
        private String previous;

        /**
         * Copy of the most recently seen tuple with its previous domain already
         * appended. It is held back until the following tuple (or the end of the
         * bag) reveals its next domain.
         */
        private Tuple pending;

        /** Creates the UDF reading the domain from field position 2. */
        public PreviousNext()
        {
            this(DEFAULT_DOMAIN_INDEX);
        }

        /**
         * Creates the UDF reading the domain from the given field position.
         *
         * @param domainIndex zero-based position of the domain field, as a decimal string
         * @throws NumberFormatException if the argument is not an integer
         * @throws IllegalArgumentException if the position is negative
         */
        public PreviousNext(String domainIndex)
        {
            this(Integer.parseInt(domainIndex.trim()));
        }

        private PreviousNext(int domainIndex)
        {
            if (domainIndex < 0)
            {
                throw new IllegalArgumentException("Domain field index must not be negative: " + domainIndex);
            }

            this.domainIndex = domainIndex;
            reset();
        }

        /**
         * Processes a whole bag in one call.
         *
         * @param input tuple whose first field is the bag to process
         * @return a new bag holding the extended tuples
         * @throws IOException if a field of the input cannot be read
         */
        @Override
        public DataBag exec(Tuple input) throws IOException 
        {
            try
            {
                accumulate(input);
                return getValue();
            }
            finally
            {
                //always start the next call from a clean state, even after a failure
                cleanup();
            }
        }

        /**
         * Consumes one chunk of the bag. May be called several times for the same
         * bag; the last tuple of a chunk is kept pending so that it can receive
         * the first domain of the following chunk as its next domain.
         *
         * @param input tuple whose first field is a bag holding the next chunk of tuples
         * @throws IOException if a field of the input cannot be read
         */
        @Override
        public void accumulate(Tuple input) throws IOException 
        {
            DataBag chunk = (DataBag)input.get(0);

            if (chunk == null)
            {
                //nothing to add for a null bag
                return;
            }

            for(Tuple t : chunk)
            {
                String domain = (String)t.get(domainIndex);

                if (pending != null)
                {
                    //the current domain is the "next domain" of the tuple before it
                    pending.append(domain);
                    outputBag.add(pending);
                }

                pending = TupleFactory.getInstance().newTuple(t.getAll());
                pending.append(previous);

                //current domain is previous for next iteration
                previous = domain;
            }
        }

        /** Discards all per-bag state so the instance can process another bag. */
        @Override
        public void cleanup() 
        {
            reset();
        }

        /**
         * Returns the tuples accumulated so far. The tuple that is still pending
         * is completed with an empty next domain first, because no further tuple
         * has been seen after it.
         *
         * @return the bag of extended tuples
         */
        @Override
        public DataBag getValue() 
        {
            if (pending != null)
            {
                //add empty string for "next domain" value of the last tuple
                pending.append("");
                outputBag.add(pending);
                pending = null;
            }

            return outputBag;
        }

        /**
         * Describes the output as a bag whose tuples have the input tuple's
         * fields followed by {@code previous_domain} and {@code next_domain},
         * both chararray.
         *
         * @param input schema of the UDF arguments; the first field must be a bag of tuples
         * @return the output schema
         * @throws RuntimeException if the input is not a bag of tuples with a known schema
         */
        @Override
        public Schema outputSchema(Schema input)
        {
            try 
            {
                Schema.FieldSchema inputFieldSchema = input.getField(0);

                if (inputFieldSchema.type != DataType.BAG)
                {
                    throw new RuntimeException("Expected a BAG as input");
                }

                Schema inputBagSchema = inputFieldSchema.schema;

                if (inputBagSchema == null)
                {
                    throw new RuntimeException("Expected input bag to have a schema");
                }

                if (inputBagSchema.getField(0).type != DataType.TUPLE)
                {
                    throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s", DataType.findTypeName(inputBagSchema.getField(0).type)));
                }

                Schema inputTupleSchema = inputBagSchema.getField(0).schema;

                if (inputTupleSchema == null)
                {
                    throw new RuntimeException("Expected input tuple to have a schema");
                }

                //clone so that the caller's schema is left untouched
                Schema outputTupleSchema = inputTupleSchema.clone();

                outputTupleSchema.add(new Schema.FieldSchema("previous_domain", DataType.CHARARRAY));

                outputTupleSchema.add(new Schema.FieldSchema("next_domain", DataType.CHARARRAY));

                return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),outputTupleSchema,DataType.BAG));
            }
            catch (CloneNotSupportedException e) 
            {
                throw new RuntimeException(e);
            }
            catch (FrontendException e) 
            {
                throw new RuntimeException(e);
            }
        }

        /** Puts the per-bag state back to its initial, empty condition. */
        private void reset()
        {
            this.outputBag = BagFactory.getInstance().newDefaultBag();
            this.previous = "";
            this.pending = null;
        }

    }