Tags: hadoop, amazon-web-services, amazon-s3, elastic-map-reduce, emr

Concatenate S3 files to read in EMR


I have an S3 bucket with log files that I want to concatenate, then use as an input to an EMR job. The log files are in paths like: bucket-name/[date]/product/out/[hour]/[minute-based-file]. I'd like to take all the minute logs in all the hour directories in all the date directories, and concatenate them into one file. I want to use that file as an input to an EMR job. The original log files need to be preserved, and the new combined log file will probably be written to a different S3 bucket.

I tried using hadoop fs -getmerge on the EMR master node via SSH, but got this error:

This file system object (file:///) does not support access to the request path 's3://target-bucket-name/merged.log'
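
From the error, it looks like getmerge can only write to a local destination, so presumably I'd have to merge each chunk to the master's local disk and then hadoop fs -put the result back up to S3, roughly like this (paths here are just illustrative):

    hadoop fs -getmerge s3://bucket-name/2012-09-01/product/out/00/ ./00.log
    hadoop fs -put ./00.log s3://target-bucket-name/merged/00.log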

The source S3 bucket has some other files in it, so I don't want to include all of its files. The wildcard match looks like this: s3n://bucket-name/*/product/out/*/log.*.
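
(To sanity-check which files that pattern matches, hadoop fs -ls should accept the same glob, quoted so the local shell doesn't try to expand it; this assumes the Hadoop build on the master supports globbing over s3n paths.)

    hadoop fs -ls 's3n://bucket-name/*/product/out/*/log.*'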

The purpose is to avoid feeding EMR tens or hundreds of thousands of small (10 KB-3 MB) input files, and instead give it one large file that it can split more efficiently.


Solution

  • I ended up just writing a script that wraps some Hadoop filesystem commands to do this.

    #!/usr/bin/env ruby
    
    require 'date'
    
    # Merge minute-based log files into daily log files
    # Usage: Run on EMR master (e.g. SSH to master then `ruby ~/merge-historical-logs.rb [FROM [TO]]`)
    
    SOURCE_BUCKET_NAME      = 's3-logs-bucket'
    DESTINATION_BUCKET_NAME = 's3-merged-logs-bucket'
    
    # Optional date inputs
    min_date = if ARGV[0]
      min_date_args = ARGV[0].split('-').map {|item| item.to_i}
      Date.new(*min_date_args)
    else
      Date.new(2012, 9, 1)
    end
    
    max_date = if ARGV[1]
      max_date_args = ARGV[1].split('-').map {|item| item.to_i}
      Date.new(*max_date_args)
    else
      Date.today
    end
    
    # Setup directories
    hdfs_logs_dir = '/mnt/tmp/logs'
    local_tmp_dir = './_tmp_merges'
    
    puts "Cleaning up filesystem"
    system "hadoop fs -rmr #{hdfs_logs_dir}"
    system "rm -rf #{local_tmp_dir}*"
    
    puts "Making HDFS directories"
    system "hadoop fs -mkdir #{hdfs_logs_dir}"
    
    # We will progress backwards, from max to min
    date = max_date
    while date >= min_date
      # Format date pieces
      year  = date.year
      month = "%02d" % date.month
      day   = "%02d" % date.day
    
      # Make a directory in HDFS to store this day's hourly logs
      today_hours_dir = "#{hdfs_logs_dir}/#{year}-#{month}-#{day}"
      puts "Making today's hourly directory"
      system "hadoop fs -mkdir #{today_hours_dir}"
    
      # Break the day's hours into a few chunks
      # This seems to avoid some problems when we run lots of getmerge commands in parallel
      [*(0..23)].each_slice(8).to_a.each do |hour_chunk|
        hour_chunk.each do |_hour|
          hour = "%02d" % _hour
    
          # Setup args to merge minute logs into hour logs
          source_file = "s3://#{SOURCE_BUCKET_NAME}/#{year}-#{month}-#{day}/product/out/#{hour}/"
          output_file = "#{local_tmp_dir}/#{hour}.log"
    
          # Launch each hour's getmerge in the background
          full_command = "hadoop fs -getmerge #{source_file} #{output_file}"
          puts "Forking: #{full_command}"
          fork { system full_command }
        end
    
        # Wait for this batch of getmerge commands to finish
        Process.waitall
      end
    
      # Delete the local temp files Hadoop created
      puts "Removing temp files"
      system "rm #{local_tmp_dir}/.*.crc"
    
      # Move local hourly logs to hdfs to free up local space
      puts "Moving local logs to HDFS"
      system "hadoop fs -put #{local_tmp_dir}/* #{today_hours_dir}"
    
      puts "Removing local logs"
      system "rm -rf #{local_tmp_dir}"
    
      # Merge the day's hourly logs into a single daily log file
      daily_log_file_name = "#{year}-#{month}-#{day}.log"
      daily_log_file_path = "#{local_tmp_dir}_day/#{daily_log_file_name}"
      puts "Merging hourly logs into daily log"
      system "hadoop fs -getmerge #{today_hours_dir}/ #{daily_log_file_path}"
    
      # Write the daily log file to another s3 bucket
      puts "Writing daily log to s3"
      system "hadoop fs -put #{daily_log_file_path} s3://#{DESTINATION_BUCKET_DIR}/daily-merged-logs/#{daily_log_file_name}"
    
      # Remove daily log locally
      puts "Removing local daily logs"
      system "rm -rf #{local_tmp_dir}_day"
    
      # Remove the hourly logs from HDFS
      puts "Removing HDFS hourly logs"
      system "hadoop fs -rmr #{today_hours_dir}"
    
      # Go back in time
      date -= 1
    end
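
    With no arguments the script uses the defaults in the code (2012-09-01 through today); dates are parsed as year-month-day, so for example (assuming it's saved as ~/merge-historical-logs.rb, per the usage comment):

        ruby ~/merge-historical-logs.rb 2012-09-01 2012-09-30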