Tags: hadoop, amazon-web-services, apache-pig, elastic-map-reduce

Pig group by and average function


I have weather data that looks like this:

STN--- WBAN   YEARMODA    TEMP       DEWP      SLP        STP       VISIB      WDSP     MXSPD   GUST    MAX     MIN   PRCP   SNDP   FRSHTT
030050 99999  19291029    46.7  4    42.0  4   990.9  4  9999.9  0   10.9  4   13.0  4   13.0  999.9    46.9*   44.1  99.99  999.9  010000
030050 99999  19291030    43.5  4    33.5  4  1015.4  4  9999.9  0   12.4  4   14.3  4   18.1  999.9    46.9    42.1   0.00I 999.9  000000
030050 99999  19291031    43.7  4    37.3  4  1026.8  4  9999.9  0   12.4  4    4.5  4    8.9  999.9    46.9*   37.9   0.00I 999.9  000000
030050 99999  19291101    49.2  4    45.5  4  1019.9  4  9999.9  0    6.2  4    8.2  4   13.0  999.9    51.1*   46.0  99.99  999.9  010000
030050 99999  19291102    47.0  4    44.5  4  1013.6  4  9999.9  0    7.8  4    6.2  4    8.9  999.9    51.1    44.1   0.00I 999.9  000000
030050 99999  19291103    44.0  4    36.0  4  1009.2  4  9999.9  0   10.9  4    8.0  4    8.9  999.9    50.0    42.1   0.00I 999.9  000000

I want to get the average TEMP for each month, in this case months 10 and 11.

First I load the data using:

RAW_LOGS = LOAD 'data' as (line:chararray);

Then I separate the data into different variables using a regex:

LOGS_BASE = FOREACH RAW_LOGS GENERATE 
    FLATTEN( 
       REGEX_EXTRACT_ALL(line, '^(\\d+)\\s+(\\d+)\\s+(\\d{4})(\\d{2})(\\d{2})\\s+(\\d+\\.\\d).*$')  
    ) 
    as (
      STN: int, 
      WBAN: int, 
      YEAR: int, 
      MONTH: int,
      DAY: int,
      TEMP: float
  );

Next I get rid of the top tuple which previously contained the header data:

no_nulls = FILTER LOGS_BASE BY STN is not null;

Then I group the data by STN, WBAN, YEAR, and MONTH:

grouped = group no_nulls by STN..MONTH;

Finally, I try to compute the average and run into an error:

C = FOREACH grouped GENERATE AVG(LOGS_BASE.TEMP);

ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1045:
<line 17, column 29> Could not infer the matching function for org.apache.pig.builtin.AVG as    multiple or none of them fit. Please use an explicit cast.

I think the problem may be in my regex: it seems to return TEMP as a string even though I declare it as a float. I could be wrong, though.

EDIT: I changed C to:

C = FOREACH grouped GENERATE AVG(no_nulls.TEMP);

and now I get this error:

HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt      Features
1.0.3   0.9.2-amzn      hadoop  2013-04-20 19:55:25     2013-04-20 19:57:21     GROUP_BY,FILTER

Failed!

Failed Jobs:
JobId   Alias   Feature Message Outputs
job_201304201942_0001   C,LOGS_BASE,RAW_LOGS,grouped,no_nulls   GROUP_BY,COMBINER       Message: Job failed! Error - # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201304201942_0001_m_000000 hdfs://10.254.106.85:9000/tmp/temp413183623/tmp1677272203,

The log has a bit more info:

org.apache.pig.backend.executionengine.ExecException: ERROR 2106: Error while computing average in Initial
    at org.apache.pig.builtin.FloatAvg$Initial.exec(FloatAvg.java:99)
    at org.apache.pig.builtin.FloatAvg$Initial.exec(FloatAvg.java:75)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:216)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:253)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:334)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:332)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:284)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:290)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:256)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:267)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:262)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:771)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Float
    at org.apache.pig.builtin.FloatAvg$Initial.exec(FloatAvg.java:86)
    ... 19 more

Pig Stack Trace
---------------
ERROR 2997: Unable to recreate exception from backed error: org.apache.pig.backend.executionengine.ExecException: ERROR 2106: Error while computing average in Initial

org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1066: Unable to open iterator for alias C. Backend error : Unable to recreate exception from backed error: org.apache.pig.backend.executionengine.ExecException: ERROR 2106: Error while computing average in Initial
    at org.apache.pig.PigServer.openIterator(PigServer.java:890)
    at org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:679)
    at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:303)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:189)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:165)
    at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:69)
    at org.apache.pig.Main.run(Main.java:500)
    at org.apache.pig.Main.main(Main.java:114)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:187)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 2997: Unable to recreate exception from backed error: org.apache.pig.backend.executionengine.ExecException: ERROR 2106: Error while computing average in Initial
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher.getErrorMessages(Launcher.java:221)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher.getStats(Launcher.java:151)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher.launchPig(MapReduceLauncher.java:354)
    at org.apache.pig.PigServer.launchPlan(PigServer.java:1313)
    at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1298)
    at org.apache.pig.PigServer.storeEx(PigServer.java:995)
    at org.apache.pig.PigServer.store(PigServer.java:962)
    at org.apache.pig.PigServer.openIterator(PigServer.java:875)

Solution

  • It turns out that TEMP was being treated as a String instead of a Float. Even though the AS clause declared the TEMP column as a float, Pig was still reading it in as a chararray. I applied the code used here and got it to work. The fix was a single line: putting the cast (tuple(int,int,int,int,int,float)) right before my REGEX_EXTRACT_ALL call. The regex below also accepts an optional leading minus sign on the temperature. Here's what that code looks like:

    LOGS_BASE = FOREACH RAW_LOGS GENERATE
        FLATTEN(
            (tuple(int,int,int,int,int,float))
            REGEX_EXTRACT_ALL(line, '^(\\d+)\\s+(\\d+)\\s+(\\d{4})(\\d{2})(\\d{2})\\s+(-?\\d+\\.\\d).*$')
        )
        as (
            STN: int,
            WBAN: int,
            YEAR: int,
            MONTH: int,
            DAY: int,
            TEMP: float
        );
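
For completeness, here is a minimal sketch of how the rest of the script might look once LOGS_BASE is built with the explicit cast. It is not necessarily the exact script I ended up running. The alias monthly_avg and the field name avg_temp are made up for illustration. The point to notice is that after a GROUP, the bag inside each record is named after the relation that was grouped (no_nulls), not after the relation it was derived from (LOGS_BASE), which is why the first version of C could not be resolved.

```pig
-- Assumes LOGS_BASE was produced with the explicit tuple cast shown above.

-- Drop the header row: it does not match the regex, so its fields are null.
no_nulls = FILTER LOGS_BASE BY STN IS NOT NULL;

-- One group per station and month.
grouped = GROUP no_nulls BY (STN, WBAN, YEAR, MONTH);

-- Each record of `grouped` has two fields: the key tuple `group`
-- and a bag named `no_nulls` holding the matching rows.
-- monthly_avg and avg_temp are hypothetical names.
monthly_avg = FOREACH grouped GENERATE
    FLATTEN(group) AS (STN, WBAN, YEAR, MONTH),
    AVG(no_nulls.TEMP) AS avg_temp;

DUMP monthly_avg;
```

Running DESCRIBE grouped; before the final FOREACH is a quick way to confirm the bag name and the field types. For the six sample rows above, averaging by hand gives roughly 44.6 for month 10 and 46.7 for month 11, so the output should be close to those values.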