Tags: sql-order-by, apache-pig

Apache Pig: Does ORDER BY with PARALLEL ensure consistent hashing/distribution?


If I load one dataset, order it on a specific key with a PARALLEL clause, and then store it, I can get multiple files, part-r-00000 through part-r-00XXX, depending on the number of reducers I specify in the PARALLEL clause.

If I then load a new dataset, say another day's worth of data, with some new keys, and some of the same keys, order it, and then store it, is there any way to guarantee that part-r-00000 from yesterday's data will contain the same keyspace as part-r-00000 from today's data?
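For concreteness, the workflow I'm describing looks roughly like this; the paths, field names, and reducer count are just placeholders:

```
-- Day 1: load, order on a key with 4 reducers, store -> part-r-00000 .. part-r-00003
day1 = LOAD '/data/events/day1' USING PigStorage('\t') AS (key:chararray, value:chararray);
day1_sorted = ORDER day1 BY key PARALLEL 4;
STORE day1_sorted INTO '/output/day1';

-- Day 2: the same script over the next day's data, with some overlapping keys
day2 = LOAD '/data/events/day2' USING PigStorage('\t') AS (key:chararray, value:chararray);
day2_sorted = ORDER day2 BY key PARALLEL 4;
STORE day2_sorted INTO '/output/day2';
```

The question is whether key X is guaranteed to land in the same part-r-NNNNN file under /output/day1 and /output/day2.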

Is there even a way to guarantee that all of the records for a given key will be contained in a single part file, or is it possible that a key could get split across two files, given enough records?

I guess the question is really about how the ordering function works in Pig: does it use a consistent hash-mod algorithm to distribute data, or does it sort the whole set and then divide it up?

The intent, or hope, would be that if the keyspace were consistently partitioned, it would be easy enough to perform rolling merges of data per part file. If it is not, I guess the question becomes: is there some operator or way of writing the Pig script that would enable that sort of consistent hashing?
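One workaround I can sketch (not something Pig's ORDER BY itself provides): if the sort key is, or can be mapped to, an integer, you can compute a bucket yourself with a modulo and SPLIT on it, so the key-to-file assignment is fixed regardless of each day's distribution. All of the paths and names below are made up, and a string key would need a hash UDF in place of the plain modulo:

```
events = LOAD '/data/events/today' AS (key:int, value:chararray);

-- Deterministic bucket: key mod 3, independent of the day's key distribution
bucketed = FOREACH events GENERATE key, value, (key % 3) AS bucket;

SPLIT bucketed INTO b0 IF bucket == 0, b1 IF bucket == 1, b2 IF bucket == 2;

-- Order and store each bucket separately; key X always lands in the same bucket
b0_sorted = ORDER b0 BY key;
b1_sorted = ORDER b1 BY key;
b2_sorted = ORDER b2 BY key;

STORE b0_sorted INTO '/output/bucket0';
STORE b1_sorted INTO '/output/bucket1';
STORE b2_sorted INTO '/output/bucket2';
```

The trade-off is that this gives order within each bucket rather than a single global order across all part files, and bucket sizes can be skewed if the keys are.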

Not sure if my question is very clear, but any help would be appreciated - having trouble figuring it out based on the docs. Thanks in advance!


Solution

  • Alrighty, attempting to answer my own question.

    It appears that Pig does NOT have a way of ensuring such a consistent distribution of results into files. This is partly based on the docs, partly based on how Hadoop works, and partly based on observation.

    When Pig does a partitioned order-by (e.g., using the PARALLEL clause to get more than one reducer), it seems to force an intermediate job between whatever comes before the order-by and the ordering itself. From what I can tell, Pig looks at 1-10% of the data (based on the number of mappers in the intermediate job being 1-10% of the number in the load step) and gets a sampled distribution of the keys you are attempting to sort on. (The EXPLAIN sketch at the end of this answer shows one way to see that extra job.)

    My guess/thinking is that Pig figures out the key distribution and then uses a custom partitioner from the mappers to the reducers. The partitioner assigns a range of keys to each reducer, so routing a record becomes a simple lexical comparison: "is this record's key greater than the end key of this reducer's range? If so, it belongs to a later reducer." (See the range-split sketch at the end of this answer for a rough illustration.)

    Of course, there are two factors to consider that basically mean Pig would not be consistent across different datasets, or even across re-runs of the same dataset. For one, since Pig is sampling data in the intermediate job, I imagine it's possible to get a different sample and thus a different key distribution. Also, consider two different datasets with widely varying key distributions: Pig would necessarily come up with a different partitioning, and thus if key X was in part-r-00111 one day, it would not necessarily end up there the next day.

    Hope this helps anyone else looking into this.

    EDIT

    I found a couple of resources from O'Reilly that seem to back up my hypothesis.

    One is about MapReduce patterns. It basically describes the standard total-order sort problem as being solvable by a two-pass approach: an "analyze" (sampling) phase followed by a final sort phase.

    The second is about Pig's order by specifically. It says (in case the link ever dies):

    As discussed earlier in “Group”, skew of the values in data is very common. This affects order just as it does group, causing some reducers to take significantly longer than others. To address this, Pig balances the output across reducers. It does this by first sampling the input of the order statement to get an estimate of the key distribution. Based on this sample, it then builds a partitioner that produces a balanced total order... An important side effect of the way Pig distributes records to minimize skew is that it breaks the MapReduce convention that all instances of a given key are sent to the same partition. If you have other processing that depends on this convention, do not use Pig’s order statement to sort data for it... Also, Pig adds an additional MapReduce job to your pipeline to do the sampling. Because this sampling is very lightweight (it reads only the first record of every block), it generally takes less than 5% of the total job time.
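As mentioned above, one way to see the extra sampling job for yourself is to EXPLAIN a script that contains a parallel order-by; the MapReduce plan should show more than one job for the single ORDER statement. A minimal sketch, with made-up paths and field names:

```
raw = LOAD '/data/events/today' AS (key:chararray, value:chararray);
sorted = ORDER raw BY key PARALLEL 8;

-- The MapReduce plan printed here should list a sampling job ahead of the actual sort job
EXPLAIN sorted;

STORE sorted INTO '/output/sorted';
```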
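And just to make the range-partitioning idea concrete: the effect of Pig's sampled partitioner is loosely like hand-splitting on key boundaries, except that Pig derives the boundaries from its sample instead of having them hard-coded. This is only an illustration, not what Pig actually executes, and the boundary values are arbitrary:

```
raw = LOAD '/data/events/today' AS (key:chararray, value:chararray);

-- Hand-picked boundaries standing in for the ones Pig derives from its sample;
-- each range plays the role of one reducer's slice of the keyspace
SPLIT raw INTO r0 IF key < 'h', r1 IF (key >= 'h' AND key < 'p'), r2 IF key >= 'p';
```

Because Pig recomputes those boundaries from a fresh sample on every run, the slices (and therefore the part files) can shift between datasets.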