We have a large pile of web log data that we need to sessionize, also generating the previous domain and next domain for each session. I'm testing via an interactive job flow on AWS EMR.
Right now I'm able to get the data sessionized using the code here: http://goo.gl/L52Wf. It took a little work to get familiar with compiling and using a UDF, but I've made it that far.
Here is the header row and first line of data from the input file (tab-delimited):
ID Date Rule code Project UID respondent_uid Type Tab ID URL domain URL path Duration Exit cause Details
11111111 2012-09-25T11:21:20.000Z 20120914_START_USTEST 20120914_TESTSITE_US_TR test6_EN_9 PAGE_VIEWED FF1348568479042 http://www.google.fr 11 OTHER
This is a tuple from the SESSIONS relation (the steps to produce that relation are shown below); note the session id that Sessionize appends as the last field:
(2012-09-27 04:42:20.000,11999603,20120914_URL_ALL,20120914_TESTSITE_US_TR,2082810875_US_9,PAGE_VIEWED,CH17,http://hotmail.com,_news/2012/09/26/14113684,28,WINDOW_DEACTIVATED,,3019222a-5c4d-4767-a82e-2b4df5d9db6d)
This is roughly what I'm running right now to sessionize the test data:
register s3://TestBucket/Sessionize.jar
define Sessionize datafu.pig.sessions.Sessionize('30m');
A = load 's3://TestBucket/party2.gz' USING PigStorage() as (
    id: chararray, data_date: chararray, rule_code: chararray,
    project_uid: chararray, respondent_uid: chararray, type: chararray,
    tab_id: chararray, url_domain: chararray, url_path: chararray,
    duration: chararray, exit_cause: chararray, details: chararray);
B = foreach A generate $1, $0, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11;
C = filter B by id neq 'ID';
VIEWS = group C by (respondent_uid, url_domain);
SESSIONS = foreach VIEWS {
    VISITS = order C by data_date;
    generate FLATTEN(Sessionize(VISITS)) as (
        data_date: chararray, id: chararray, rule_code: chararray,
        project_uid: chararray, respondent_uid: chararray, type: chararray,
        tab_id: chararray, url_domain: chararray, url_path: chararray,
        duration: chararray, exit_cause: chararray, details: chararray,
        session_id);
}
(Step B moves the date to the first position, since the Sessionize UDF expects the timestamp as the first field of each tuple; step C filters out the file header.)
I'm lost as far as the right direction to go with this from here.
Can I iterate over my SESSIONS relation with foreach and get the next and previous domain from the Pig script, or would it be better to write a custom UDF and pass the SESSIONS relation to that? (Writing my own UDF would be an adventure!)
Any advice would be greatly appreciated. Even a recommendation of what NOT to do might be just as helpful, so I don't waste time researching a junk approach. I'm quite new to Hadoop and Pig, so this is definitely not one of my strong areas (yet..).
I wouldn't be at all surprised if someone could improve on the solution below; however, it works for my situation. I used the Sessionize UDF (mentioned in my question) as a reference for writing the UDF below.
import java.io.IOException;
import java.util.ArrayList;

import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class PreviousNext extends EvalFunc<DataBag> implements Accumulator<DataBag>
{
    private DataBag outputBag;
    private String previous;
    private String next;

    public PreviousNext()
    {
        cleanup();
    }

    @Override
    public DataBag exec(Tuple input) throws IOException
    {
        accumulate(input);
        DataBag result = getValue();
        cleanup();
        return result;
    }

    // Note: when Pig drives this through the Accumulator interface it may
    // deliver the bag in several accumulate() calls; previous/next values
    // will not carry across those chunk boundaries.
    @Override
    public void accumulate(Tuple input) throws IOException
    {
        ArrayList<String> domains = new ArrayList<String>();
        DataBag d = (DataBag) input.get(0);

        // Put all domains into an ArrayList to allow for accessing
        // specific indexes; the domain sits at index 2 of each input tuple.
        for (Tuple t : d)
        {
            domains.add((String) t.get(2));
        }

        // Add an empty string to serve as the "next domain" value for the
        // last iteration.
        domains.add("");

        int i = 0;
        previous = "";
        for (Tuple t : d)
        {
            next = domains.get(i + 1);

            // Rebuild the tuple with previous/next domain appended.
            Tuple t_new = TupleFactory.getInstance().newTuple(t.getAll());
            t_new.append(previous);
            t_new.append(next);
            outputBag.add(t_new);

            // The current domain becomes "previous" for the next iteration.
            previous = domains.get(i);
            i++;
        }
    }

    @Override
    public void cleanup()
    {
        this.outputBag = BagFactory.getInstance().newDefaultBag();
    }

    @Override
    public DataBag getValue()
    {
        return outputBag;
    }

    @Override
    public Schema outputSchema(Schema input)
    {
        try
        {
            Schema.FieldSchema inputFieldSchema = input.getField(0);
            if (inputFieldSchema.type != DataType.BAG)
            {
                throw new RuntimeException("Expected a BAG as input");
            }

            Schema inputBagSchema = inputFieldSchema.schema;
            if (inputBagSchema.getField(0).type != DataType.TUPLE)
            {
                throw new RuntimeException(String.format(
                    "Expected input bag to contain a TUPLE, but instead found %s",
                    DataType.findTypeName(inputBagSchema.getField(0).type)));
            }

            // Output schema is the input tuple schema plus the two new fields.
            Schema inputTupleSchema = inputBagSchema.getField(0).schema;
            Schema outputTupleSchema = inputTupleSchema.clone();
            outputTupleSchema.add(new Schema.FieldSchema("previous_domain", DataType.CHARARRAY));
            outputTupleSchema.add(new Schema.FieldSchema("next_domain", DataType.CHARARRAY));

            return new Schema(new Schema.FieldSchema(
                getSchemaName(this.getClass().getName().toLowerCase(), input),
                outputTupleSchema,
                DataType.BAG));
        }
        catch (CloneNotSupportedException e)
        {
            throw new RuntimeException(e);
        }
        catch (FrontendException e)
        {
            throw new RuntimeException(e);
        }
    }
}
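In case it's useful to anyone else, here is a minimal sketch of how the UDF gets wired up from Pig. The jar name and the projection are illustrative rather than exact; the two details that matter are that the UDF reads each tuple's domain with t.get(2), so url_domain has to be the third field, and that the bag passed in must be ordered by date so previous/next come out in visit order:

register s3://TestBucket/PreviousNext.jar

-- put the date first for ordering and the domain at index 2,
-- because the UDF reads it with t.get(2)
D = foreach SESSIONS generate data_date, session_id, url_domain;

-- one bag per session, ordered by date, with previous/next appended
BY_SESSION = group D by session_id;
RESULT = foreach BY_SESSION {
    ORDERED = order D by data_date;
    generate FLATTEN(PreviousNext(ORDERED)) as (data_date, session_id, url_domain, previous_domain, next_domain);
}

Since the class isn't in a package, it can be invoked as PreviousNext(...) directly after the register; a packaged class would need a define, the same way Sessionize is defined in my question.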