apache-pig, apache-datafu

DataFu BagGroup groups tuples from all bags instead of only the bag in the FOREACH scope. How to fix?


I am using DataFu's BagGroup to group the tuples inside a bag. The script is as follows:

pvlist_grp = GROUP pvlist by uid;
uid_vid_pv = FOREACH pvlist_grp {
                vids = FOREACH pvlist GENERATE date, vid;   
                GENERATE uid,
                vids as vid,
                BagGroup(pvlist.(date, uid, vid), pvlist.date) as grouped;
                };

The resulting schema is:

uid_vid_pv: {uid: chararray,vid: {(date: chararray,vid: chararray)},grouped: {(group: chararray,{(date: chararray,uid: chararray,vid: chararray)})}}

When I dump the first 10 records, vids correctly contains only the (date, vid) tuples for each uid. However, grouped contains records from other uids. For example:

(60,{(20160103,255),(20160103,255),(20160103,257),(20160103,255),(20160101,252)},{(20160103,{(20160103,21,18),(20160103,21,453),(20160103,21,452),(20160103,21,67),(20160103,21,18),(20160103,21,455),(20160103,21,43),(20160103,21,453),(20160103,21,16),(20160103,21,45),(20160103,21,18),(20160103,21,18),(20160103,21,67),(20160103,21,455),.............})})

The dumped result shows that the BagGroup output for uid 60 contains data from another uid (here, uid 21). It appears to group the bags from all uids together, but I want the grouping done per uid.

The ideal result would be:

(60,{(20160103,255),(20160103,255),(20160103,257),(20160103,255),(20160101,252)},{(20160103,{(20160103,255),(20160103,255),(20160103,257),(20160103,255)}),(20160101,{(20160101,252)})})

Any idea why this happens? I am using DataFu 1.2.0.

Update:

It looks like BagGroup keeps state in memory between calls. The result for the first uid is always correct, but each later result also includes the bags that were processed before it. For example, if the first record is uid 21, BagGroup returns the grouped results for uid 21 only. If the second record is uid 60, BagGroup then outputs the results for uids 21 and 60 together.


Solution

  • I had exactly the same problem. To solve it, I had to modify the BagGroup UDF (version 1.2.0). Calling groups.clear(); at the beginning of the exec method, before the grouping loop, resolves the issue:

    @SuppressWarnings("unchecked")
    @Override
    public DataBag exec(Tuple input) throws IOException {
        fieldNames = (List<String>)getInstanceProperties().get(FIELD_NAMES_PROPERTY);

        DataBag inputBag = (DataBag)input.get(0);

        // Added line: discard groups left over from the previous call to exec.
        groups.clear();

        for (Tuple tuple : inputBag) {
            Tuple key = extractKey(tuple);
            addGroup(key, tuple);
        }

        // ... rest of the method (building and returning the output bag) is unchanged
    }
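
To illustrate why clearing the map matters, here is a minimal plain-Java sketch with no Pig or DataFu dependency. It assumes the UDF instance is reused for every input row and that groups is an instance field. The names Grouper, group and BagGroupStateDemo are hypothetical stand-ins, not DataFu API, and the sample rows are taken from the dump in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a UDF instance that is reused across input rows.
class Grouper {
    // Instance field: survives from one call to the next, like `groups` in the UDF.
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    private final boolean clearFirst;

    Grouper(boolean clearFirst) {
        this.clearFirst = clearFirst;
    }

    // Groups {date, vid} rows by date; meant to be called once per uid's bag.
    Map<String, List<String>> group(List<String[]> bag) {
        if (clearFirst) {
            groups.clear(); // the fix: drop state left over from the previous call
        }
        for (String[] row : bag) {
            groups.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        // Return a copy so the result is not affected by later calls.
        Map<String, List<String>> copy = new LinkedHashMap<>();
        groups.forEach((k, v) -> copy.put(k, new ArrayList<>(v)));
        return copy;
    }
}

class BagGroupStateDemo {
    public static void main(String[] args) {
        List<String[]> uid21 = Arrays.asList(
                new String[]{"20160103", "18"}, new String[]{"20160103", "453"});
        List<String[]> uid60 = Arrays.asList(
                new String[]{"20160103", "255"}, new String[]{"20160101", "252"});

        Grouper leaky = new Grouper(false);
        leaky.group(uid21);
        // uid 60's result still carries uid 21's rows.
        System.out.println("without clear: " + leaky.group(uid60));

        Grouper fixed = new Grouper(true);
        fixed.group(uid21);
        // uid 60's result contains only uid 60's rows.
        System.out.println("with clear:    " + fixed.group(uid60));
    }
}
```

An alternative with the same effect would be to declare the map as a local variable inside the method, so that no state can outlive a single call. Whether that fits the real UDF depends on how its helper methods access groups.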