amazon-web-services, apache-pig, amazon-emr

AWS's own example of submitting a Pig job does not work due to an issue with piggybank.jar


I have been trying to test out submitting Pig jobs on AWS EMR following Amazon's guide. I made the change to the Pig script to ensure that it can find piggybank.jar, as instructed by Amazon. When I run the script I get an ERROR 1070 indicating that one of the functions available in piggybank cannot be resolved. Any ideas on what is going wrong?

Key part of the error:

2018-03-15 21:47:08,258 ERROR org.apache.pig.PigServer (main): exception during parsing: Error during parsing. Could not resolve org.apache.pig.piggybank.evaluation.string.EXTRACT using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
Failed to parse: Pig script failed to parse: <file s3://cis442f-data/pigons3/do-reports4.pig, line 26, column 6> Failed to generate logical plan. Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve org.apache.pig.piggybank.evaluation.string.EXTRACT using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]

The first part of the script is shown below. Line 26 referred to in the error is the line containing "EXTRACT(":

register file:/usr/lib/pig/lib/piggybank.jar;
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT;
DEFINE FORMAT org.apache.pig.piggybank.evaluation.string.FORMAT;
DEFINE REPLACE org.apache.pig.piggybank.evaluation.string.REPLACE;
DEFINE DATE_TIME org.apache.pig.piggybank.evaluation.datetime.DATE_TIME;
DEFINE FORMAT_DT org.apache.pig.piggybank.evaluation.datetime.FORMAT_DT;


--
-- import logs and break into tuples
--
raw_logs =
  -- load the weblogs into a sequence of one element tuples
  LOAD '$INPUT' USING TextLoader AS (line:chararray);

logs_base =
  -- for each weblog string convert the weblog string into a
  -- structure with named fields
  FOREACH
    raw_logs
  GENERATE
    FLATTEN (
      EXTRACT(
        line,
        '^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"'
      )
    )
    AS (
      remoteAddr: chararray, remoteLogname: chararray, user: chararray, time: chararray,
      request: chararray, status: int, bytes_string: chararray, referrer: chararray,
      browser: chararray
    )
  ;

Solution

  • The original script from Amazon would not work because it relied on an older version of piggybank. Here is an updated version that does not need piggybank at all.
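
    The key change is that the piggybank EXTRACT UDF is replaced by REGEX_EXTRACT_ALL, which ships as a builtin with Pig itself, so the REGISTER and DEFINE lines can be dropped entirely. A minimal sketch of the substitution, reusing the raw_logs relation from the full script below (the regex and output field names here are just placeholders):

        -- no REGISTER or DEFINE needed: REGEX_EXTRACT_ALL is a Pig builtin
        parts =
          FOREACH raw_logs
          GENERATE
            FLATTEN(REGEX_EXTRACT_ALL(line, '^(\\S+) (\\S+)'))
            AS (first_field: chararray, second_field: chararray);

    The full updated script: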

    --
    -- import logs and break into tuples
    --
    
    raw_logs =
      -- load the weblogs into a sequence of one element tuples
      LOAD '$INPUT' USING TextLoader AS (line:chararray);
    
    logs_base =
      -- for each weblog string convert the weblog string into a
      -- structure with named fields
      FOREACH
        raw_logs
      GENERATE
        FLATTEN (
          REGEX_EXTRACT_ALL(
            line,
            '^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"'
          )
        )
        AS (
          remoteAddr: chararray, remoteLogname: chararray, user: chararray, time: chararray,
          request: chararray, status: int, bytes_string: chararray, referrer: chararray,
          browser: chararray
        )
      ;
    
    logs =
      -- convert from string values to typed values such as date_time and integers
      FOREACH 
        logs_base 
      GENERATE 
        *, 
        ToDate(time, 'dd/MMM/yyyy:HH:mm:ss Z', 'UTC') as dtime,
        (int)REPLACE(bytes_string, '-', '0')          as bytes
      ;
    
    
    --
    -- determine total number of requests and bytes served by UTC hour of day
    -- aggregating as a typical day across the total time of the logs
    --
    by_hour_count =
      -- group logs by their hour of day, counting the number of logs in that hour
      -- and the sum of the bytes of rows for that hour
      FOREACH 
        (GROUP logs BY GetHour(dtime))
      GENERATE 
        $0, 
        COUNT($1) AS num_requests, 
        SUM($1.bytes) AS num_bytes
      ;
    
    STORE by_hour_count INTO '$OUTPUT/total_requests_bytes_per_hour';
    
    
    
    --
    -- top 50 X.X.X.* blocks
    --
    by_ip_count =
      -- group weblog entries by the ip address from the remote address field
      -- and count the number of entries for each address block as well as
      -- the sum of the bytes
      FOREACH
        (GROUP logs BY (chararray)REGEX_EXTRACT(remoteAddr, '(\\d+\\.\\d+\\.\\d+)', 1))
      GENERATE
        $0,
        COUNT($1) AS num_requests,
        SUM($1.bytes) AS num_bytes
      ;
    
    
    
    by_ip_count_sorted =
      -- order address blocks by the number of requests they make
      ORDER by_ip_count BY num_requests DESC;

    by_ip_count_limited =
      -- take the top 50 address blocks
      LIMIT by_ip_count_sorted 50;

    STORE by_ip_count_limited INTO '$OUTPUT/top_50_ips';
    
    
    --
    -- top 50 external referrers
    --
    by_referrer_count =
      -- group by the referrer URL and count the number of requests
      FOREACH
        (GROUP logs BY (chararray)REGEX_EXTRACT(referrer, '(http:\\/\\/[a-z0-9\\.-]+)', 1))
      GENERATE
        FLATTEN($0),
        COUNT($1) AS num_requests
      ;
    
    by_referrer_count_filtered =
      -- exclude matches for example.org
      FILTER by_referrer_count BY NOT $0 matches '.*example\\.org';
    
    by_referrer_count_sorted =
      -- order referrers by the number of requests
      ORDER by_referrer_count_filtered BY $1 DESC;
    
    by_referrer_count_limited =
      -- take the top 50 results
      LIMIT by_referrer_count_sorted 50;
    
    STORE by_referrer_count_limited INTO '$OUTPUT/top_50_external_referrers';
    
    
    --
    -- top search terms coming from bing or google
    --
    google_and_bing_urls =
      -- find referrer fields that match either bing or google
      FILTER
        (FOREACH logs GENERATE referrer)
      BY
        referrer matches '.*bing.*'
      OR
        referrer matches '.*google.*'
      ;
    
    search_terms =
      -- extract from each referrer url the search phrases
      FOREACH
        google_and_bing_urls
      GENERATE
        FLATTEN(REGEX_EXTRACT_ALL(referrer, '.*[&\\?]q=([^&]+).*')) as (term:chararray)
      ;
    
    search_terms_filtered =
      -- reject urls that contained no search terms
      FILTER search_terms BY NOT $0 IS NULL;
    
    search_terms_count =
      -- for each search phrase count the number of weblogs entries that contained it
      FOREACH
        (GROUP search_terms_filtered BY $0)
      GENERATE
        $0,
        COUNT($1) AS num
      ;
    
    search_terms_count_sorted =
      -- order the results
      ORDER search_terms_count BY num DESC;
    
    search_terms_count_limited =
      -- take the top 50 results
      LIMIT search_terms_count_sorted 50;
    
    STORE search_terms_count_limited INTO '$OUTPUT/top_50_search_terms_from_bing_google';
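
    The script still expects the $INPUT and $OUTPUT parameters from Amazon's example, so they have to be supplied when the job is submitted. As a rough sketch, running it by hand on the cluster's master node would look like this (the bucket and script names are placeholders):

        pig -param INPUT=s3://your-bucket/weblogs \
            -param OUTPUT=s3://your-bucket/report-output \
            -f do-reports.pig

    When the script is submitted as an EMR Pig step instead, the same parameter arguments go in the step's argument list.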