Tags: hbase, apache-kafka, kafka-consumer-api, kafka-producer-api, hbase-client

Hbase Table.batch takes 300 secs to insert 800,000 entries into table


I am reading a JSON file of about 30 MB and processing it to create column families and key values. I then create a Put object, insert the row key and values into it, build a list of such Put objects, and call Table.batch(), passing this list. I call batch() whenever my ArrayList reaches a size of 50,000, then clear the list and build the next batch. However, processing the whole file, which eventually has 800,000 entries, takes 300 seconds. I also tried table.put() but it was even slower. I am using HBase 1.1 and getting the JSON from Kafka. Any suggestions to improve performance are appreciated. I checked the SO forums but did not find much help. I will share code if you want to have a look at it.

Regards

Raghavendra

public static void processData(String jsonData)
{
    if (jsonData == null || jsonData.isEmpty())
    {
        System.out.println("JSON data is null or empty. Nothing to process");
        return;
    }

    long startTime = System.currentTimeMillis();

    Table table = null;
    try
    {
        table = HBaseConfigUtil.getInstance().getConnection().getTable(TableName.valueOf("MYTABLE"));
    }
    catch (IOException e1)
    {
        System.out.println("Failed to get table reference: " + e1);
        return;
    }

    Put processData = null;
    List<Put> bulkData = new ArrayList<Put>();

    try
    {

        //Read the json and generate the model into a class    
        //ProcessExecutions is List<ProcessExecution>
        ProcessExecutions peData = JsonToColumnData.gson.fromJson(jsonData, ProcessExecutions.class);

        if (peData != null)
        {
            //Read the data and pass it to Hbase
            for (ProcessExecution pe : peData.processExecutions)
            {
                //Class Header stores some header information
                Header headerData = pe.getHeader();   

                String rowKey = headerData.getRowKey();
                processData = new Put(Bytes.toBytes(rowKey));
                processData.addColumn(Bytes.toBytes("Data"),
                                Bytes.toBytes("Time"),
                                Bytes.toBytes("value"));

                //Add to list
                bulkData.add(processData);            
                if (bulkData.size() >= 50000) //hardcoded for demo
                {
                    long tmpTime = System.currentTimeMillis();
                    //batch() expects a results array the same size as the action list
                    Object[] results = new Object[bulkData.size()];
                    table.batch(bulkData, results);
                    bulkData.clear();
                    System.gc();
                }
            } //end for
            //Complete the remaining write operation
            if (bulkData.size() > 0)
            {
                Object[] results = new Object[bulkData.size()];
                table.batch(bulkData, results);
                bulkData.clear();
                //Try to free memory
                System.gc();
            }
        } //end if (peData != null)
    }
    catch (Exception e)
    {
        System.out.println(e);
        e.printStackTrace();
    }
    finally
    {
        try
        {
            table.close();
        }
        catch (IOException e)
        {
            System.out.println("Error closing table " + e);
            e.printStackTrace();
        }
    }

}


//This function is added here to show the connection
 /*public Connection getConnection()
{

    try
    {
        if (this.connection == null)
        {
            ExecutorService executor = Executors.newFixedThreadPool(HBaseConfigUtil.THREADCOUNT);
            this.connection = ConnectionFactory.createConnection(this.getHBaseConfiguration(), executor);
        }
    }
    catch (IOException e)
    {
        e.printStackTrace();
        System.out.println("Error in getting connection " + e.getMessage());
    }

    return this.connection;
}*/

Solution

  • I had a similar case where I needed to parse 5 GB of JSON and insert it into an HBase table. You can try the approach below, which proved very fast for batches of 100,000 records in my case.

    public void addMultipleRecordsAtaShot(final ArrayList<Put> puts, final String tableName) throws Exception {
        HTable table = null;
        try {
            table = new HTable(HBaseConnection.getHBaseConfiguration(), getTable(tableName));
            table.put(puts);
            LOG.info("INSERT record[s] " + puts.size() + " to table " + tableName + " OK.");
        } catch (final Throwable e) {
            e.printStackTrace();
        } finally {
            LOG.info("Processed ---> " + puts.size());
            puts.clear();
            if (table != null) {
                table.close();
            }
        }
    }
    

    For more details on increasing the client write buffer size, see my answer in a different context; also refer to the Table API documentation: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Table.html
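
    As a rough illustration of the buffer-size idea, here is a minimal sketch of writing through a BufferedMutator with an enlarged client-side write buffer, assuming an HBase 1.1+ client and an existing Connection; the table name "MYTABLE" and the 8 MB buffer size are placeholders to tune for your own payload, not values from the question.

        import java.util.List;

        import org.apache.hadoop.hbase.TableName;
        import org.apache.hadoop.hbase.client.BufferedMutator;
        import org.apache.hadoop.hbase.client.BufferedMutatorParams;
        import org.apache.hadoop.hbase.client.Connection;
        import org.apache.hadoop.hbase.client.Put;

        public class BufferedWriteExample {

            //Writes the given Puts through a BufferedMutator, which buffers mutations
            //client-side and sends them to the region servers in large batches.
            public static void writeBuffered(Connection connection, List<Put> puts) throws Exception {
                BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("MYTABLE"))
                        .writeBufferSize(8 * 1024 * 1024); //8 MB buffer; placeholder value

                try (BufferedMutator mutator = connection.getBufferedMutator(params)) {
                    mutator.mutate(puts); //queued in the client-side buffer
                    mutator.flush();      //push any remaining buffered Puts to the server
                }
            }
        }

    With this pattern the caller keeps feeding Puts and lets the client decide when to ship them, instead of hand-rolling 50,000-element batches and calling Table.batch() manually.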