Tags: mapreduce, boto3, hadoop-streaming, common-crawl

Processing many WARC archives from CommonCrawl using Hadoop Streaming and MapReduce


I am working on a project in which I need to download crawl data (from CommonCrawl) for specific URLs from an S3 bucket and then process that data.

Currently I have a MapReduce job (Python via Hadoop Streaming) which gets the correct S3 file paths for a list of URLs. I am then trying to use a second MapReduce job to process this output by downloading the data from the commoncrawl S3 bucket. In the mapper I use boto3 to download the gzip contents for a specific URL from the commoncrawl S3 bucket and then output some information about the gzip contents (word counts, content length, URLs linked to, etc.). The reducer then goes through this output to produce the final word count, URL list, etc.

The output file from the first MapReduce job is only about 6 MB in size (but will be larger once we scale to the full dataset). When I run the second MapReduce job, this file is divided into only two splits. Normally this would not be a problem for such a small file, but the mapper code I described above (fetching S3 data, emitting mapped output, etc.) takes a while to run for each URL. Since the file yields only two splits, only 2 mappers are run. I need to increase the number of splits so that the mapping can be done faster.

I have tried setting "mapreduce.input.fileinputformat.split.maxsize" and "mapreduce.input.fileinputformat.split.minsize" for the MapReduce job, but it doesn't change the number of splits taking place.

Here is some of the code from the mapper:

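import gzip
import io

import boto3
from botocore import UNSIGNED
from botocore.config import Config

# offset, length and filename are parsed from the mapper's input line.
# The client sends unsigned (anonymous) requests.
s3 = boto3.client('s3', 'us-west-2', config=Config(signature_version=UNSIGNED))
offset_end = offset + length - 1  # HTTP byte ranges are inclusive at both ends

gz_file = s3.get_object(Bucket='commoncrawl', Key=filename, Range='bytes=%s-%s' % (offset, offset_end))[
        'Body'].read()

fileobj = io.BytesIO(gz_file)

with gzip.open(fileobj, 'rb') as file:
    [do stuff]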
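The range arithmetic in the snippet above is easy to get wrong by one byte, so here is a minimal sketch of it in isolation. The helper name byte_range_header is hypothetical (it is not part of boto3); it only builds the string passed as Range, relying on the fact that HTTP byte ranges include both the first and the last offset.

```python
def byte_range_header(offset, length):
    """Return a Range header value covering `length` bytes starting at `offset`."""
    if offset < 0 or length <= 0:
        raise ValueError("offset must be >= 0 and length must be > 0")
    # The end offset is inclusive, so a record of `length` bytes
    # ends at offset + length - 1.
    offset_end = offset + length - 1
    return 'bytes=%d-%d' % (offset, offset_end)


# A 250-byte record starting at byte 1000 spans bytes 1000 through 1249.
print(byte_range_header(1000, 250))
```

The result would be passed as the Range argument of get_object, exactly as in the mapper code.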

I also manually split the input file into multiple files of at most 100 lines each. This had the desired effect of giving me more mappers, but then I began encountering a ConnectionError from the s3.get_object() call:

Traceback (most recent call last):
  File "dmapper.py", line 103, in <module>
    commoncrawl_reader(base_url, full_url, offset, length, warc_file)
  File "dmapper.py", line 14, in commoncrawl_reader
    gz_file = s3.get_object(Bucket='commoncrawl', Key=filename, Range='bytes=%s-%s' % (offset, offset_end))[
  File "/usr/lib/python3.6/site-packages/botocore/client.py", line 314, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/lib/python3.6/site-packages/botocore/client.py", line 599, in _make_api_call
    operation_model, request_dict)
  File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 148, in make_request
    return self._send_request(request_dict, operation_model)
  File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 177, in _send_request
    success_response, exception):
  File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 273, in _needs_retry
    caught_exception=caught_exception, request_dict=request_dict)
  File "/usr/lib/python3.6/site-packages/botocore/hooks.py", line 227, in emit
    return self._emit(event_name, kwargs)
  File "/usr/lib/python3.6/site-packages/botocore/hooks.py", line 210, in _emit
    response = handler(**kwargs)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 183, in __call__
    if self._checker(attempts, response, caught_exception):
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 251, in __call__
    caught_exception)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 277, in _should_retry
    return self._checker(attempt_number, response, caught_exception)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 317, in __call__
    caught_exception)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 223, in __call__
    attempt_number, caught_exception)
  File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 359, in _check_caught_exception
    raise caught_exception
  File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 222, in _get_response
    proxies=self.proxies, timeout=self.timeout)
  File "/usr/lib/python3.6/site-packages/botocore/vendored/requests/sessions.py", line 573, in send
    r = adapter.send(request, **kwargs)
  File "/usr/lib/python3.6/site-packages/botocore/vendored/requests/adapters.py", line 415, in send
    raise ConnectionError(err, request=request)
botocore.vendored.requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
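A reset like the one in this traceback is often transient, and one common way to tolerate it is to retry the request with exponential backoff. The following is only a sketch under assumptions: call_with_retries and its parameters are hypothetical names, not part of boto3 or botocore, and retry_on defaults to Python's built-in ConnectionError purely as a placeholder. The exception shown above is botocore's vendored requests class, which would have to be passed in explicitly.

```python
import random
import time


def call_with_retries(fetch, max_attempts=5, base_delay=0.5,
                      retry_on=(ConnectionError,), sleep=time.sleep):
    """Call fetch() until it succeeds or max_attempts attempts have failed."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except retry_on:
            if attempt == max_attempts:
                # Out of attempts: let the last error propagate.
                raise
            # Exponential backoff (base, 2x base, 4x base, ...) plus random
            # jitter so that many mappers do not retry in lockstep.
            delay = base_delay * (2 ** (attempt - 1))
            sleep(delay + random.uniform(0, delay))


# Hypothetical usage inside the mapper (names taken from the code above):
# gz_file = call_with_retries(
#     lambda: s3.get_object(Bucket='commoncrawl', Key=filename,
#                           Range='bytes=%s-%s' % (offset, offset_end))['Body'].read())
```

Retrying does not remove the underlying cause, so it would complement rather than replace any change to where or how the job is run.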

I am currently running this with only a handful of URLs, but I will need to do it with several thousand (each with many subdirectories) once I get it working.

I am not certain where to start with fixing this. I feel it is highly likely that there is a better approach than what I am trying. The fact that the mapper takes so long for each URL seems like a strong indication that I am approaching this wrong. I should also mention that the mapper and the reducer both run correctly if run directly as a pipe command:

"cat short_url_list.txt | python mapper.py | sort | python reducer.py" -> Produces desired output, but would take too long to run on the entire list of URLs.

Any guidance would be greatly appreciated.


Solution

  • The MapReduce API provides the NLineInputFormat. The property "mapreduce.input.lineinputformat.linespermap" lets you control the maximum number of lines (here, WARC records) passed to each mapper. It works with mrjob, cf. Ilya's WARC indexer.

    Regarding the S3 connection error: it's better to run the job in the us-east-1 AWS region where the data is located.
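As a rough illustration of the NLineInputFormat suggestion, the sketch below assembles a Hadoop Streaming command line in Python. The function name build_streaming_command, the jar location, and the input and output paths are all assumptions made for the example; the property name is the one quoted above, and org.apache.hadoop.mapred.lib.NLineInputFormat is the old-API class that Streaming's -inputformat option expects. Shipping the scripts to the cluster (for example with -files) is left out.

```python
import shlex


def build_streaming_command(streaming_jar, input_path, output_path,
                            mapper, reducer, lines_per_map=100):
    """Build the argument list for a Streaming job that uses NLineInputFormat."""
    return [
        'hadoop', 'jar', streaming_jar,
        # Generic -D options have to come before the streaming options.
        '-D', 'mapreduce.input.lineinputformat.linespermap=%d' % lines_per_map,
        '-inputformat', 'org.apache.hadoop.mapred.lib.NLineInputFormat',
        '-input', input_path,
        '-output', output_path,
        '-mapper', mapper,
        '-reducer', reducer,
    ]


# Placeholder jar name and paths; adjust to the actual cluster layout.
cmd = build_streaming_command('hadoop-streaming.jar', 'urls/part-00000',
                              'out', 'python mapper.py', 'python reducer.py')
print(' '.join(shlex.quote(arg) for arg in cmd))
```

With 100 lines per mapper, an input of N lines would be divided into roughly N / 100 map tasks regardless of the file's size in bytes. One thing worth checking on the cluster: with an input format other than the default text format, Streaming may hand the mapper each line prefixed by its byte-offset key and a tab, in which case the mapper would need to drop that first field.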