Tags: performance, hadoop, hdfs, apache-pig

How do I force PigStorage to output a few large files instead of thousands of tiny files?


I'm using these lines in my pig script:

set default_parallel 20;
requests = LOAD '/user/me/todayslogs.gz' USING customParser;
intermediate_results = < some-processing ... >
some_data = FOREACH intermediate_results GENERATE day, request_id, result;
STORE some_data INTO '/user/me/output_data' USING PigStorage(',');

'/user/me/todayslogs.gz' contains thousands of gzipped files, each of size 200 MB.

When the script completes, '/user/me/output_data' contains thousands of tiny (<1 KB) files on HDFS.

I need to read the files in '/user/me/output_data' from another Pig script for further processing, and the sheer number of tiny files hurts performance there. It is even worse when the files output by some_data are gzipped.

Here's the output from the MapReduceLauncher.

2013-11-04 12:38:11,961 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases campaign_join,detailed_data,detailed_requests,fields_to_retain,grouped_by_reqid,impressions_and_clicks,minimal_data,ids_cleaned,request_id,requests,requests_only,requests_typed,xids_from_request
2013-11-04 12:38:11,961 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: requests[30,11],campaign_join[35,16],null[-1,-1],null[-1,-1],detailed_requests[37,20],detailed_data[39,16],null[-1,-1],minimal_data[49,15],null[-1,-1],ids_cleaned[62,18],grouped_by_reqid[65,21] C:  R: null[-1,-1],xids_from_request[66,21],impressions_and_clicks[69,26],fields_to_retain[70,20],requests_only[67,17],request_id[68,18],requests_typed[73,17]

How do I force PigStorage to write the output into fewer output files?


Solution

  • This is happening because your job is map-only. There is no need for a reduce phase in the processing you do, so each mapper writes its records to its own file, and you end up with one file per mapper. If you have thousands of input files, you get thousands of output files.

    This goes away when you use an ORDER BY because that triggers a reduce phase, at which point the default parallelism of 20 comes into play.

    If you want to avoid this behavior, you have to force a reduce phase somehow. Since you're already doing a JOIN, you could simply stop doing it USING 'replicated', so that it runs as a regular reduce-side join (see the comparison sketched below). Alternatively, if you weren't doing a join at all, you could force a reduce phase with a do-nothing GROUP BY, like so:

    reduced = FOREACH (GROUP some_data BY RANDOM()) GENERATE FLATTEN(some_data);
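
    Putting that together with the script from the question, a minimal sketch of the store step might look like this. The alias names are the ones from the question, and the PARALLEL value of 20 simply mirrors the default_parallel already set; adjust it to the number of output files you want:

    some_data = FOREACH intermediate_results GENERATE day, request_id, result;

    -- Do-nothing GROUP BY: the random key spreads records evenly across reducers,
    -- and FLATTEN unpacks the grouped bags back into the original records.
    grouped = GROUP some_data BY RANDOM() PARALLEL 20;
    reduced = FOREACH grouped GENERATE FLATTEN(some_data);

    -- Each reducer writes one part file, so roughly 20 files land in the output
    -- directory instead of one file per mapper.
    STORE reduced INTO '/user/me/output_data' USING PigStorage(',');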
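
    If you would rather rely on the JOIN you already have, the difference is simply whether the join runs map-side or reduce-side. The relation names below (big_logs, small_lookup) are placeholders for illustration, not aliases from your script:

    -- Fragment-replicate (map-side) join: no reduce phase, so the output stays
    -- at one file per mapper.
    joined_map_side = JOIN big_logs BY request_id, small_lookup BY request_id USING 'replicated';

    -- Regular reduce-side join: the shuffle pushes data through reducers, so
    -- default_parallel (or an explicit PARALLEL clause) sets the output file count.
    joined_reduce_side = JOIN big_logs BY request_id, small_lookup BY request_id PARALLEL 20;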