Tags: mapreduce, hive, hbase, impala, apache-phoenix

What is the fastest way to extract data from HBase?


I have around 5 TB of data distributed across 30 different tables in HBase. Based on two specific columns in each table, YEAR and Country, I have to create 5K different text files. I have integrated Hive and HBase for this purpose, but extraction through Hive takes a very long time. I have to finish this within 10 hours. I am seeking your ideas on how to achieve that, and I have some questions regarding this.

  1. Is Hive-HBase integration a good approach?
  2. Would extracting data from HBase using MapReduce be a good idea?
  3. I cannot use Apache Phoenix because it is not released with our HBase distribution.
  4. Impala also uses a lot of memory, and my cluster is not configured for that.

public int run(String[] args) throws Exception {
    int result = 0;
    if (hbaseConf == null)
        hbaseConf = getHbaseConfiguration();

    Job job = new Job(hbaseConf);
    job.setJarByClass(HBaseToFileDriver.class);
    job.setJobName("Importing Data from HBase to File:::" + args[0]);

    Scan scan = new Scan();
    scan.setCaching(5000); // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false); // don't set to true for MR jobs
    scan.addFamily(Bytes.toBytes("cf"));

    TableMapReduceUtil.initTableMapperJob(args[0], scan, MyMapper.class, null, null, job);
    // No reducers. Just write straight to output files.
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Result.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    boolean b = job.waitForCompletion(true);
    if (!b) {
        throw new IOException("error with job!");
    }
    return result;
}

My data in HBase looks like this:

���U"9����|Japan|2012   48433172245     1001371402      FundamentalSeries       NULL    NULL    139     238474518       1.65494205533344        Price2SFCFLPsr  NULL   False            3011645 1000190205      False   True    I       Japan   2012
C��t�I�\���7|ThirdPartyPrivate|2009     48934711562     1001371402      FundamentalSeries       NULL    NULL    9       5631268 21.2315827835749        STCA_PoP       NULL     False           3011645 1000193170      False   True    I       ThirdPartyPrivate       2009
�����^Z4Ga�|Japan|2013  48433158708     1001371402      FundamentalSeries       NULL    NULL    507     160531379       1.1248E10       STAX_TTM        500186  False  3011646  1000193168      False   False   I       Japan   2013
G\�=�HO�S�|Japan|2008   48433173983     1001371402      FundamentalSeries       NULL    NULL    153     1961706488      0.500256556630127       RIBEIT_TTM     NULL     False           3011646 1000193016      False   False   I       Japan   2008
�G��G�i0�]|Japan|2012   48433336633     1001371402      FundamentalSeries       NULL    NULL    894     3112047463      14.3904580667924        Ev2SEBIT_Avg5  NULL     False           3011645 1000190030      False   True    I       Japan   2012
���r����/8|Japan|2015   48433251137     1001371402      FundamentalSeries       NULL    NULL    200     2907364871      -46.9431625157866       SNOPA_YoY      NULL     False           3011646 1000423629      False   False   I       Japan   2015
�)H�<�����t|Japan|2008  48433139729     1001371402      FundamentalSeries       NULL    NULL    1170    2604636883      0.267980759053007       PPE2ANOA        NULL   False            3011646 1001262486      False   False   I       Japan   2008
'H�&�g���|Japan|2005    48433195827     1001371402      FundamentalSeries       NULL    NULL    147     450289107       0.540110660915134       Ev2SEBIT        NULL   False            3011645 1000190028      False   True    I       Japan   2005
c�\��17ɟ�|Japan|2013    48433160145     1001371402      FundamentalSeries       NULL    NULL    885     2010667500      -19.6553084635268       SAMI_TTM_YoY    NULL   False            3011646 1000190297      False   False   I       Japan   2013
j���}��||Japan|2010     48433159175     1001371402      FundamentalSeries       NULL    NULL    214     420693538       -17.3468681844827       SCOR_YoY        NULL   False            3011646 1000192789      False   False   I       Japan   2010

Solution

  • Option 1 : Please note that Hive-HBase integration and querying Hive will also use MapReduce behind the scenes...

    But you don't have fine-grained control over the MapReduce jobs executed by Hive.

    Option 3 : You have already ruled out Phoenix yourself, as you mentioned.

    Option 4 : Impala is faster, but you have certain limitations (memory), so it is ruled out as well.

    Option 2 : From my experience with HBase, I would suggest extracting the data from HBase using MapReduce, i.e. your Option 2, which gives you more granular control over the execution of the job.

    But even with this approach you have to fine-tune your job, for example:

    scan.setCaching(500);
    scan.setCacheBlocks(false); 
    
    • Most importantly, design your rowkey to avoid hotspotting and use efficient filters (like FuzzyRowFilter, for instance; see here) to ensure fast access; see the sketch after this list.
    • Try to avoid column value filters as much as possible so that a full table scan does not happen.
    • Note that the number of regions of a table equals the number of mappers launched for that particular job, so pre-split the tables over some range (for example 0-9) so that all your rows fall into these definite regions (of course a region can later split further, but this is one way of ensuring a small number of regions so that every mapper gets a sufficient number of records to process).
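
    To make the filter idea concrete, here is a minimal sketch of building such a scan with FuzzyRowFilter. It assumes a rowkey layout of a fixed-length hashed prefix followed by |Country|Year, as your sample rows suggest; the 10-byte prefix length, the class name and the column family "cf" are assumptions you would have to adapt (and if the hashed prefix is not fixed-length, FuzzyRowFilter will not line up at all).

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.Pair;

    public class FuzzyScanBuilder {

        // Builds a Scan that matches only rows whose key ends with "|<country>|<year>",
        // treating the hashed prefix as "don't care" bytes.
        public static Scan buildScan(String country, String year) {
            int hashLength = 10; // hypothetical fixed length of the hashed prefix

            byte[] suffix = Bytes.toBytes("|" + country + "|" + year);
            byte[] fuzzyKey = new byte[hashLength + suffix.length];
            System.arraycopy(suffix, 0, fuzzyKey, hashLength, suffix.length);

            // Mask semantics: 1 = position may be any byte (the hashed prefix),
            //                 0 = position must match exactly (the |Country|Year suffix).
            byte[] mask = new byte[fuzzyKey.length];
            Arrays.fill(mask, 0, hashLength, (byte) 1);
            Arrays.fill(mask, hashLength, mask.length, (byte) 0);

            List<Pair<byte[], byte[]>> fuzzyKeys =
                    Arrays.asList(new Pair<byte[], byte[]>(fuzzyKey, mask));

            Scan scan = new Scan();
            scan.setCaching(500);
            scan.setCacheBlocks(false);
            scan.addFamily(Bytes.toBytes("cf"));
            scan.setFilter(new FuzzyRowFilter(fuzzyKeys));
            return scan;
        }
    }

    You would pass the returned Scan to TableMapReduceUtil.initTableMapperJob exactly as in your driver.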

    If I understood correctly, you want to generate multiple sequence files; please see the usage pattern of MultipleOutputs below.

    Usage pattern for job submission:

    Job job = new Job();

    FileInputFormat.setInputPath(job, inDir);
    FileOutputFormat.setOutputPath(job, outDir);

    job.setMapperClass(MOMap.class);
    job.setReducerClass(MOReduce.class);
    ...

    // Defines additional single text based output 'text' for the job
    MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
        LongWritable.class, Text.class);

    // Defines additional sequence-file based output 'sequence' for the job
    MultipleOutputs.addNamedOutput(job, "seq",
        SequenceFileOutputFormat.class,
        LongWritable.class, Text.class);
    ...

    job.waitForCompletion(true);
    ...

    When used in conjunction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat, MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and MultipleSequenceFileOutputFormat from the old Hadoop API, i.e. output can be written from the Reducer to more than one location.
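
    To tie this back to your driver, the mapper could route each row to a country/year-specific file via MultipleOutputs. The sketch below is only illustrative: the qualifier names "country" and "year" in family "cf" are made up, and Result#toString is just a placeholder for however you want to format the text line; your driver would also need matching output key/value classes and, ideally, LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class) so that no empty default part files are created.

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    public class MyMapper extends TableMapper<NullWritable, Text> {

        private MultipleOutputs<NullWritable, Text> multipleOutputs;

        @Override
        protected void setup(Context context) {
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
                throws IOException, InterruptedException {
            // Hypothetical qualifier names; replace with the real ones in your "cf" family.
            String country = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("country")));
            String year = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("year")));

            // Route the row to a per-country/year base path, e.g. <outDir>/Japan-2012/part-m-00000.
            String baseOutputPath = country + "-" + year + "/part";
            multipleOutputs.write(NullWritable.get(), new Text(result.toString()), baseOutputPath);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }
    }

    Treat this as a sketch rather than a drop-in replacement; with 30 tables you would run one such job per table (or loop over the tables in the driver) and point each at its own output directory.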