Search code examples
hadoophiveapache-pigcloudera-cdhhcatalog

Reading a non-string partitioned Hive table in Pig


I am trying to read data from a Hive table using Pig. Details follow:

  • Hive version 1.1
  • Pig 0.12
  • Hadoop 2.6.0
  • Cloudera Distribution 5.4.4

Hive table schema:

map <string, string>
yyyy int
mm int
dd int

Partitions are yyyy(int), mm(int), dd(int)

Pig code:

input_data = LOAD ‘dbname.tablename'
             USING org.apache.hive.hcatalog.pig.HCatLoader()
             ;

input_data_f = FILTER input_data BY yyyy == 2016 AND
                                      mm == 7 AND
                                      dd == 19
                                      ;

rmf input_data_dump;
STORE input_data_f INTO ‘input_data_dump';

Command used to run: pig -useHCatalog -f ./read_input.pig

I get the following error.

Error:
Pig Stack Trace
---------------
ERROR 2017: Internal error creating job configuration.

org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobCreationException: ERROR 2017: Internal error creating job configuration.
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.getJob(JobControlCompiler.java:873)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.compile(JobControlCompiler.java:298)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher.launchPig(MapReduceLauncher.java:190)
        at org.apache.pig.PigServer.launchPlan(PigServer.java:1334)
        at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1319)
        at org.apache.pig.PigServer.execute(PigServer.java:1309)
        at org.apache.pig.PigServer.executeBatch(PigServer.java:387)
        at org.apache.pig.PigServer.executeBatch(PigServer.java:365)
        at org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:140)
        at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:202)
        at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:173)
        at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:84)
        at org.apache.pig.Main.run(Main.java:478)
        at org.apache.pig.Main.main(Main.java:156)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.io.IOException: MetaException(message:Filtering is supported only on partition keys of type string)
        at org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:97)
        at org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:61)
        at org.apache.hive.hcatalog.pig.HCatLoader.setLocation(HCatLoader.java:125)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.getJob(JobControlCompiler.java:498)
        ... 19 more
Caused by: MetaException(message:Filtering is supported only on partition keys of type string)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partitions_by_filter_result$get_partitions_by_filter_resultStandardScheme.read(ThriftHiveMetastore.java)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partitions_by_filter_result$get_partitions_by_filter_resultStandardScheme.read(ThriftHiveMetastore.java)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partitions_by_filter_result.read(ThriftHiveMetastore.java)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partitions_by_filter(ThriftHiveMetastore.java:2132)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partitions_by_filter(ThriftHiveMetastore.java:2116)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsByFilter(HiveMetaStoreClient.java:1047)
        at org.apache.hive.hcatalog.mapreduce.InitializeInput.getInputJobInfo(InitializeInput.java:113)
        at org.apache.hive.hcatalog.mapreduce.InitializeInput.setInput(InitializeInput.java:86)
        at org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:95)
        ... 22 more

Looking on the web got me to https://issues.apache.org/jira/browse/HIVE-7164

Is setting hive.metastore.integral.jdo.pushdownto true in hive-site.xml the only solution? This is a corporate setup so am not sure if I can make changes to hive-site.xml and if I get the admin to make the change will there be any side effects?

Tried the following:

Attempt 1

set hive.metastore.integral.jdo.pushdown true;

input_data = LOAD ‘dbname.tablename'
             USING org.apache.hive.hcatalog.pig.HCatLoader()
             ;

input_data_f = FILTER input_data BY yyyy == 2016 AND
                                      mm == 7 AND
                                      dd == 19
                                      ;

STORE input_data_f INTO ‘input_data_dump';

I see this in the log:

org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier, PartitionFilterOptimizer]}

Attempt 2

set hive.metastore.integral.jdo.pushdown true;
set pig.exec.useOldPartitionFilterOptimizer true;

input_data = LOAD ‘dbname.tablename'
             USING org.apache.hive.hcatalog.pig.HCatLoader()
             ;

input_data_f = FILTER input_data BY yyyy == 2016;
input_data_f1 = FILTER input_data_f BY mm == 7;
input_data_f2 = FILTER input_data_f1 BY dd == 19;

STORE input_data_f2 INTO ‘input_data_dump';

I see this in the log:

org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier, NewPartitionFilterOptimizer]}

Attempt 3

set pig.exec.useOldPartitionFilterOptimizer true;

input_data = LOAD ‘dbname.tablename'
             USING org.apache.hive.hcatalog.pig.HCatLoader()
             ;

input_data_f = FILTER input_data BY yyyy == 2016;
input_data_f1 = FILTER input_data_f BY mm == 7;
input_data_f2 = FILTER input_data_f1 BY dd == 19;

STORE input_data_f2 INTO ‘input_data_dump';

I see this in the log:

org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier, NewPartitionFilterOptimizer]}

With the above attempts I still get the same error.

Appreciate the help.


Solution

  • Update:
    Partition filter does not pushed into loader in some cases:
    In Pig 0.12.0, Pig only pushes the first filter to the loader. You will get the same result, but there is a performance downgrade because of it. - To get around this, you should use one filter statement for all partition. Or you can specify: pig.exec.useOldPartitionFilterOptimizer=true see deails here - known issue of 0.12

    For pig script specific properties you can use one of these options:

    - The pig.properties file (add the directory that contains the pig.properties file to the classpath)
    - The -D command line option and a Pig property (pig -Dpig.tmpfilecompression=true)
    - The -P command line option and a properties file (pig -P mypig.properties)
    - The set command (set pig.exec.nocombiner true) directly in pig sctipt

    more details on properties here. . .

    Test: cast to type chararray

    $ hadoop version
    Hadoop 2.6.0-cdh5.7.0
    
    $ pig -version
    Apache Pig version 0.12.0-cdh5.7.0 (rexported) 
    
    $ cat pig_test1
    -- set hive.metastore.integral.jdo.pushdown true;
    input_data = LOAD 'cards.props'
                 USING org.apache.hive.hcatalog.pig.HCatLoader()
                 ;
    
    input_data_f = FILTER input_data BY (chararray)yyyy == '2106' AND
                                         (chararray)mm == '8' AND
                                          (chararray)dd == '4'
                                          ;
    dump input_data_f;
    

    2016-08-04 17:15:54,541 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
    ([1#test1],2106,8,4)
    ([2#test2],2106,8,4)
    ([3#test3],2106,8,4)
    

    hive> select * from props;
    OK
    {"1":"test1"}   2106    8   4
    {"2":"test2"}   2106    8   4
    {"3":"test3"}   2106    8   4