I have a Pig script that groups records and counts distinct customers, as below:
by_customer = GROUP customer BY (start_date, spc);
cust_cnt = FOREACH by_customer {
    cust = DISTINCT customer.cid;
    GENERATE FLATTEN(group), COUNT(cust);
};
The problem is that the last reducer hangs or fails because of memory issues. I can see that the data is distributed very unevenly across the reducers. Is there a way to distribute the output of the GROUP BY so that each reducer gets only one grouped bag?
I fixed this with a workaround that computes the distinct count without using the DISTINCT keyword:
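-- Group on cid as well, so each customer yields exactly one group per (start_date, spc).
by_customer = GROUP customer BY (cid, start_date, spc);
dist_customer = FOREACH by_customer GENERATE group.start_date AS start_date, group.spc AS spc, 1 AS cst_cnt;
-- Regroup without cid and add up the 1s to get the number of distinct customers.
cust = GROUP dist_customer BY (start_date, spc);
cust_cnt = FOREACH cust GENERATE FLATTEN(group), SUM(dist_customer.cst_cnt);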
This worked perfectly. I'm not sure why the DISTINCT version does not work.
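One plausible explanation is sketched below as a small Python simulation. It is not Pig, and the sample rows and function names are made up for illustration. In the first script every row for a (start_date, spc) key ends up in a single bag, duplicates included, and the nested DISTINCT has to process that whole bag. In the workaround the first grouping also includes cid, so the rows are spread over many more keys, and the second grouping only receives one row per distinct customer. Treat this as a hypothesis rather than a confirmed diagnosis.

```python
from collections import defaultdict

# Hypothetical sample rows: (cid, start_date, spc). One key is deliberately "hot".
rows = [
    ("c1", "2015-01-01", "A"),
    ("c1", "2015-01-01", "A"),
    ("c2", "2015-01-01", "A"),
    ("c3", "2015-01-01", "A"),
    ("c3", "2015-01-01", "A"),
    ("c1", "2015-01-02", "B"),
]


def nested_distinct_count(rows):
    """Mimics GROUP BY (start_date, spc) followed by a nested DISTINCT and COUNT."""
    bags = defaultdict(list)
    for cid, start_date, spc in rows:
        # Every row for the key, duplicates included, lands in the same bag.
        bags[(start_date, spc)].append(cid)
    counts = {key: len(set(bag)) for key, bag in bags.items()}
    largest_bag = max(len(bag) for bag in bags.values())
    return counts, largest_bag


def two_stage_count(rows):
    """Mimics GROUP BY (cid, start_date, spc), then GROUP BY (start_date, spc) and SUM."""
    stage1 = defaultdict(list)
    for cid, start_date, spc in rows:
        stage1[(cid, start_date, spc)].append(1)
    stage2 = defaultdict(list)
    for _cid, start_date, spc in stage1:
        # One row per distinct customer reaches the second grouping.
        stage2[(start_date, spc)].append(1)
    counts = {key: sum(bag) for key, bag in stage2.items()}
    largest_bag = max(
        max(len(bag) for bag in stage1.values()),
        max(len(bag) for bag in stage2.values()),
    )
    return counts, largest_bag


nested_counts, nested_largest = nested_distinct_count(rows)
staged_counts, staged_largest = two_stage_count(rows)
print(nested_counts == staged_counts)   # both approaches give the same counts
print(nested_largest, staged_largest)   # size of the largest bag in each approach
```

On this toy input both approaches return the same counts, while the largest bag any single key has to hold is smaller in the two-stage version. How much that matters on a real cluster depends on the data and on the Pig version in use.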
Thanks for the assistance.