Tags: python, mapreduce, boto, elastic-map-reduce, mrjob

How can I use S3 object names as inputs to an MRJob mapper, but not the S3 objects themselves?


I'm missing something obvious about Yelp's mrjob library. Setting up an MRJob class is almost trivially easy, and running it over a file or stdin is just as easy. But how can I change the job's input from a file (local or on S3) to, say, the keys in an S3 bucket?

Something like this: suppose I want to count all objects in my S3 bucket whose names start with the string 'foo':

import re

from mrjob.job import MRJob

class MRCountS3Objects(MRJob):

    def mapper(self, _, botoS3Key):
        if re.match('^foo', botoS3Key.name):
            yield 'foo', 1

    def reducer(self, name, occurrences):
        yield name, sum(occurrences)

if __name__ == '__main__':
    MRCountS3Objects.run()

It's a highly contrived example, but you probably get my drift. How can I tell mrjob to operate over a stream of S3 objects while ignoring their contents? I saw the S3Filesystem.get_s3_keys() method, which gives me exactly the stream I need, but I'm not sure where to go from there.
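Because the mapper only ever inspects each key's name, the counting logic can be checked without S3 or mrjob at all. The following is a minimal in-memory sketch of the map, shuffle, and reduce phases; the helper names (`mapper`, `reducer`, `run_locally`) and the sample key names are hypothetical, and `str.startswith('foo')` stands in for the equivalent `re.match('^foo', ...)`.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple


def mapper(key_name: str) -> Iterator[Tuple[str, int]]:
    # Same test as the job's mapper: only the key *name* is inspected.
    if key_name.startswith('foo'):
        yield 'foo', 1


def reducer(name: str, occurrences: Iterable[int]) -> Tuple[str, int]:
    return name, sum(occurrences)


def run_locally(key_names: Iterable[str]) -> List[Tuple[str, int]]:
    """Simulate map -> shuffle (group by output key) -> reduce in memory."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for key_name in key_names:
        for k, v in mapper(key_name):
            groups[k].append(v)
    return [reducer(k, vs) for k, vs in sorted(groups.items())]


if __name__ == '__main__':
    # Hypothetical key names standing in for a real bucket listing.
    names = ['foo/a.txt', 'foo/b.txt', 'bar/c.txt', 'food.csv']
    print(run_locally(names))  # [('foo', 3)]
```

Note that 'food.csv' is counted too, since a prefix match on 'foo' does not stop at a path separator.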


Solution

  • I figured out at least one way to do this. An MRJob instance has a stdin attribute that can be assigned any iterable of input lines, and you can then run the job programmatically. This code, for example, should work over my-bucket's key names:

    from mrjob.job import MRJob
    from mrjob.emr import EMRJobRunner
    
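    class MRS3KeyProcessor(MRJob):
        """Counts key names that start with 'foo'; each input line is one name."""
    
        def mapper(self, _, key_name):
            if key_name.startswith('foo'):
                yield 'foo', 1
    
        def reducer(self, name, occurrences):
            yield name, sum(occurrences)
    
    def s3_name_generator(s3_uri):
        """Generator that yields boto.s3.Key names as newline-terminated lines.
        """
        # Could also use raw boto here.
        emr = EMRJobRunner()
        # get_s3_keys() expects an S3 URI such as 's3://my-bucket/'.
        key_stream = emr.fs.get_s3_keys(s3_uri)
        for key in key_stream:
            # The job reads its input as lines, so each name needs its own
            # newline (and must be bytes on Python 3).
            yield (key.name + '\n').encode('utf-8')
    
    def main():
        # The '-' argument signifies that we use stdin.
        mr_job = MRS3KeyProcessor(['--runner', 'inline', '-'])
        stdin = s3_name_generator('s3://my-bucket/')
        mr_job.stdin = stdin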
        results = []
        with mr_job.make_runner() as runner:
            runner.run()
            for line in runner.stream_output():
                key, value = mr_job.parse_output_line(line)
                results.append((key, value))
        print(results)
    
    if __name__ == '__main__':
        main()
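The "raw boto" route mentioned in the code comment can be sketched as follows, assuming boto3 rather than the older boto-based `get_s3_keys()` API: it lists key names with the `list_objects_v2` paginator and turns them into newline-terminated UTF-8 lines. The function names `s3_key_name_lines` and `as_stdin` are hypothetical, and the client is passed in rather than created inside, so the listing can be exercised without AWS credentials.

```python
import io
from typing import Iterable, Iterator


def s3_key_name_lines(s3_client, bucket: str, prefix: str = '') -> Iterator[bytes]:
    """Yield each key name in `bucket` as one UTF-8, newline-terminated line.

    Assumes `s3_client` is a boto3 S3 client, e.g. boto3.client('s3').
    The list_objects_v2 paginator follows continuation tokens, so buckets
    with more than 1,000 keys are listed completely.
    """
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is absent from a page when no keys match.
        for obj in page.get('Contents', []):
            yield (obj['Key'] + '\n').encode('utf-8')


def as_stdin(lines: Iterable[bytes]) -> io.BytesIO:
    """Collect the lines into a binary file object usable as a job's stdin."""
    return io.BytesIO(b''.join(lines))
```

With real credentials this might be wired up as `mr_job.sandbox(stdin=as_stdin(s3_key_name_lines(boto3.client('s3'), 'my-bucket')))` instead of assigning `mr_job.stdin` directly; `sandbox()` is the hook mrjob's testing docs use to supply a job's stdin, but check it against your mrjob version. Passing `prefix='foo'` would let S3 do the filtering, though the mapper above would still need its own check if the input came from elsewhere. `BytesIO` holds the whole listing in memory, so for very large buckets writing the lines to a local file and passing that path instead of `-` is likely the lighter choice.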