
Upsert in Apache Phoenix is extremely slow: how can I improve write performance?


I am doing a POC where I am using Phoenix for single writes; each write updates the database immediately, so I cannot batch the updates. I am getting about 1 transaction per second (TPS). I have a 3-node EMR cluster and am using HBase with S3 as the backend.

I tried the tuning parameters I found online and wrote a multithreaded app, but performance is still extremely slow.

I have written a threaded program to batch upsert data into Phoenix. I am using Phoenix because of its secondary index capability. Write performance is extremely slow.

The EXPLAIN output for a count query looks like the following:

0: jdbc:phoenix:localhost:2181:/hbase> EXPLAIN select count(1) from VBQL_PHOENIX_TRANSCRIPT5
. . . . . . . . . . . . . . . . . . .> ;
+---------------------------------------------------------------------------------------------------------------+-----------------+------+
| PLAN                                                                                                          | EST_BYTES_READ  | EST_ |
+---------------------------------------------------------------------------------------------------------------+-----------------+------+
| CLIENT 100-CHUNK 6838539 ROWS 314572800 BYTES PARALLEL 100-WAY FULL SCAN OVER VBQL_PHOENIX_TRANSCRIPT_INDEX5  | 314572800       | 6838 |
| SERVER FILTER BY FIRST KEY ONLY                                                                               | 314572800       | 6838 |
| SERVER AGGREGATE INTO SINGLE ROW                                                                              | 314572800       | 6838 |
+---------------------------------------------------------------------------------------------------------------+-----------------+------+

The problem is that it is very hard to scale. I tried adding more nodes to the HBase cluster and more threads to the client program, but throughput does not scale beyond 6K per minute, which is very slow. Any help is greatly appreciated.

Tuning settings used in HBase:

 <property> 
      <name>index.writer.threads.max</name> 
      <value>30</value> 
 </property> 
 <property> 
      <name>index.builder.threads.max</name> 
      <value>30</value> 
 </property> 
 <property> 
      <name>phoenix.query.threadPoolSize</name> 
      <value>256</value> 
 </property> 
 <property> 
      <name>index.builder.threads.keepalivetime</name> 
      <value>90000</value> 
 </property> 
<property> 
      <name>phoenix.query.timeoutMs</name> 
      <value>90000</value> 
 </property>
<property> 
      <name>phoenix.default.update.cache.frequency</name> 
      <value>300000</value> 
 </property>

I created the table as follows:

CREATE TABLE IF NOT EXISTS VBQL_PHOENIX_TRANSCRIPT (
    PK VARCHAR NOT NULL PRIMARY KEY,
    IMMUTABLES.VBMETAJSON VARCHAR,
    IMMUTABLES.ACCOUNTID VARCHAR,
    IMMUTABLES.DATECREATED VARCHAR,
    IMMUTABLES.DATEFINISHED VARCHAR,
    IMMUTABLES.MEDIAID VARCHAR,
    IMMUTABLES.JOBID VARCHAR,
    IMMUTABLES.STATUS VARCHAR,
    UPDATABLE.METADATA VARCHAR,
    CATEGORIES.C_ACOUNTID_CATEGORYNAME VARCHAR,
    COMPUTED.ADDITIONALMETRICS VARCHAR
) SALT_BUCKETS = 100;

with secondary index like this:

CREATE INDEX  VBQL_PHOENIX_TRANSCRIPT_INDEX5  ON  VBQL_PHOENIX_TRANSCRIPT5 (IMMUTABLES.MEDIAID) ;


Sample Upsert
UPSERT INTO VBQL_PHOENIX_TRANSCRIPT2  ( PK , IMMUTABLES.ACCOUNTID , IMMUTABLES.DATECREATED , IMMUTABLES.DATEFINISHED ,
IMMUTABLES.MEDIAID , IMMUTABLES.JOBID , IMMUTABLES.STATUS  ) 
VALUES ('5DAD32BA-9656-41F3-BD38-BBF890B85CD62018-05-18T18:09:38.60700005D681A95C-8CDA-47B2-93BE-C165B1DEC7D8', 'AAAAAAAAAAAAAAA5DAD32BA-9656', 
'2018-04-18T18:09:38.607+0000', '2018-05-18T18:09:38.607+0000','5D681A95C-8CDA-47B2-93BE-C165B1DEC7D8', 'JOB123', 'FINISHED');


HBase is installed on the EMR cluster, and the table is created there using the CREATE TABLE commands above.

The EMR cluster is a 4-node m4.4xlarge cluster (32 vCore, 64 GiB memory, EBS-only storage, EBS storage: 32 GiB).

The client is a Java program running on EC2 (m4.10xlarge: 40 vCPU, 160 GiB RAM, EBS-only storage, 10 Gbps network, 4,000 Mbps EBS bandwidth). It is a multithreaded program that creates an atomic connection to HBase and performs inserts.


The client hbase-site.xml looks like this:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
     /**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-->
<configuration>
  <property>
  <name>hbase.regionserver.wal.codec</name>
  <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>

  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>

  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>10.16.129.55</value>
  </property>

  <property>
    <name>hbase.rootdir</name>
    <value>s3://dev-mock-transcription/</value>
  </property>

  <property>
    <name>dfs.support.append</name>
    <value>true</value>
  </property>

  <property>
    <name>hbase.rest.port</name>
    <value>8070</value>
  </property>


  <property>
    <name>hbase.replication</name>
    <value>false</value>
  </property>

  <property>
    <name>hbase.balancer.tablesOnMaster</name>
    <value>hbase:meta</value>
  </property>

  <property>
    <name>hbase.bucketcache.size</name>
    <value>8192</value>
  </property>

  <property>
    <name>hbase.master.balancer.uselocality</name>
    <value>false</value>
  </property>

  <property>
    <name>hbase.master.startup.retainassign</name>
    <value>false</value>
  </property>

  <property>
    <name>hbase.wal.dir</name>
    <value>hdfs://10.16.129.55:8020/user/hbase/WAL</value>
  </property>

  <property>
    <name>hbase.bulkload.retries.retryOnIOException</name>
    <value>true</value>
  </property>

  <property>
    <name>hbase.bucketcache.ioengine</name>
    <value>files:/mnt1/hbase/bucketcache</value>
  </property>

   <property>
      <name>hbase.rpc.timeout</name>
      <value>1800000</value>
    </property>


  <property>
      <name>phoenix.query.timeoutMs</name>
      <value>18000000</value>
    </property>
     <property>
      <name>phbase.regionserver.lease.period</name>
      <value>18000000</value>
    </property>

      <property>
      <name>hbase.client.scanner.caching</name>
      <value>180000</value>
    </property>

      <property>
      <name>phbase.client.scanner.timeout.period</name>
      <value>18000000</value>
    </property>

 <property>
      <name>index.writer.threads.max</name>
      <value>30</value>
 </property>
 <property>
      <name>index.builder.threads.max</name>
      <value>30</value>
 </property>
 <property>
      <name>phoenix.query.threadPoolSize</name>
      <value>256</value>
 </property>
 <property>
      <name>index.builder.threads.keepalivetime</name>
      <value>90000</value>
 </property>
<property>
      <name>phoenix.query.timeoutMs</name>
      <value>90000</value>
 </property>


</configuration>



hbase-env.sh looks like this:

[ec2-user@ip-10-16-129-55 conf]$ cat hbase-env.sh 
#
#/**
# * Licensed to the Apache Software Foundation (ASF) under one
# * or more contributor license agreements.  See the NOTICE file
# * distributed with this work for additional information
# * regarding copyright ownership.  The ASF licenses this file
# * to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# * with the License.  You may obtain a copy of the License at
# *
# *     http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# */

# Set environment variables here.

# This script sets variables multiple times over the course of starting an hbase process,
# so try to keep things idempotent unless you want to take an even deeper look
# into the startup scripts (bin/hbase, etc.)

# The java implementation to use.  Java 1.7+ required.
# export JAVA_HOME=/usr/java/jdk1.6.0/

# Extra Java CLASSPATH elements.  Optional.
export HBASE_CLASSPATH=/etc/hadoop/conf

# The maximum amount of heap to use. Default is left to JVM default.
# export HBASE_HEAPSIZE=1G
export HBASE_HEAPSIZE=1024

# Uncomment below if you intend to use off heap cache. For example, to allocate 8G of 
# offheap, set the value to "8G".
# export HBASE_OFFHEAPSIZE=1G

# Extra Java runtime options.
# Below are what we set by default.  May only work with SUN JVM.
# For more on why as well as other possible settings,
# see http://wiki.apache.org/hadoop/PerformanceTuning
export HBASE_OPTS="$HBASE_OPTS -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -Dsun.net.inetaddr.ttl=60 -Dnetworkaddress.cache.ttl=60"


# Uncomment one of the below three options to enable java garbage collection logging for the server-side processes.

# This enables basic gc logging to the .out file.
# export SERVER_GC_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps"

# This enables basic gc logging to its own file.
# If FILE-PATH is not replaced, the log file(.gc) would still be generated in the HBASE_LOG_DIR .
# export SERVER_GC_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:<FILE-PATH>"

# This enables basic GC logging to its own file with automatic log rolling. Only applies to jdk 1.6.0_34+ and 1.7.0_2+.
# If FILE-PATH is not replaced, the log file(.gc) would still be generated in the HBASE_LOG_DIR .
# export SERVER_GC_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:<FILE-PATH> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=1 -XX:GCLogFileSize=512M"

# Uncomment one of the below three options to enable java garbage collection logging for the client processes.

# This enables basic gc logging to the .out file.
# export CLIENT_GC_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps"

# This enables basic gc logging to its own file.
# If FILE-PATH is not replaced, the log file(.gc) would still be generated in the HBASE_LOG_DIR .
# export CLIENT_GC_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:<FILE-PATH>"

# This enables basic GC logging to its own file with automatic log rolling. Only applies to jdk 1.6.0_34+ and 1.7.0_2+.
# If FILE-PATH is not replaced, the log file(.gc) would still be generated in the HBASE_LOG_DIR .
# export CLIENT_GC_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:<FILE-PATH> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=1 -XX:GCLogFileSize=512M"

# See the package documentation for org.apache.hadoop.hbase.io.hfile for other configurations
# needed setting up off-heap block caching. 

# Uncomment and adjust to enable JMX exporting
# See jmxremote.password and jmxremote.access in $JRE_HOME/lib/management to configure remote password access.
# More details at: http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html
# NOTE: HBase provides an alternative JMX implementation to fix the random ports issue, please see JMX
# section in HBase Reference Guide for instructions.

# export HBASE_JMX_BASE="-Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false"
# export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS $HBASE_JMX_BASE -Dcom.sun.management.jmxremote.port=10101"
# export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS $HBASE_JMX_BASE -Dcom.sun.management.jmxremote.port=10102"
# export HBASE_THRIFT_OPTS="$HBASE_THRIFT_OPTS $HBASE_JMX_BASE -Dcom.sun.management.jmxremote.port=10103"
# export HBASE_ZOOKEEPER_OPTS="$HBASE_ZOOKEEPER_OPTS $HBASE_JMX_BASE -Dcom.sun.management.jmxremote.port=10104"
# export HBASE_REST_OPTS="$HBASE_REST_OPTS $HBASE_JMX_BASE -Dcom.sun.management.jmxremote.port=10105"

# File naming hosts on which HRegionServers will run.  $HBASE_HOME/conf/regionservers by default.
# export HBASE_REGIONSERVERS=${HBASE_HOME}/conf/regionservers

# Uncomment and adjust to keep all the Region Server pages mapped to be memory resident
#HBASE_REGIONSERVER_MLOCK=true
#HBASE_REGIONSERVER_UID="hbase"

# File naming hosts on which backup HMaster will run.  $HBASE_HOME/conf/backup-masters by default.
# export HBASE_BACKUP_MASTERS=${HBASE_HOME}/conf/backup-masters

# Extra ssh options.  Empty by default.
# export HBASE_SSH_OPTS="-o ConnectTimeout=1 -o SendEnv=HBASE_CONF_DIR"

# Where log files are stored.  $HBASE_HOME/logs by default.
# export HBASE_LOG_DIR=${HBASE_HOME}/logs

# Enable remote JDWP debugging of major HBase processes. Meant for Core Developers 
# export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8070"
# export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8071"
# export HBASE_THRIFT_OPTS="$HBASE_THRIFT_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8072"
# export HBASE_ZOOKEEPER_OPTS="$HBASE_ZOOKEEPER_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8073"

# A string representing this instance of hbase. $USER by default.
# export HBASE_IDENT_STRING=$USER

# The scheduling priority for daemon processes.  See 'man nice'.
# export HBASE_NICENESS=10

# The directory where pid files are stored. /tmp by default.
# export HBASE_PID_DIR=/var/hadoop/pids

# Seconds to sleep between slave commands.  Unset by default.  This
# can be useful in large clusters, where, e.g., slave rsyncs can
# otherwise arrive faster than the master can service them.
# export HBASE_SLAVE_SLEEP=0.1

# Tell HBase whether it should manage it's own instance of Zookeeper or not.
# export HBASE_MANAGES_ZK=true

# The default log rolling policy is RFA, where the log file is rolled as per the size defined for the 
# RFA appender. Please refer to the log4j.properties file to see more details on this appender.
# In case one needs to do log rolling on a date change, one should set the environment property
# HBASE_ROOT_LOGGER to "<DESIRED_LOG LEVEL>,DRFA".
# For example:
# HBASE_ROOT_LOGGER=INFO,DRFA
# The reason for changing default to RFA is to avoid the boundary case of filling out disk space as 
# DRFA doesn't put any cap on the log size. Please refer to HBase-5655 for more context.

export HBASE_MANAGES_ZK=false
export HBASE_DAEMON_DEFAULT_ROOT_LOGGER=INFO,DRFA
export HBASE_DAEMON_DEFAULT_SECURITY_LOGGER=INFO,DRFAS
export HBASE_CLASSPATH=${HBASE_CLASSPATH}${HBASE_CLASSPATH:+:}$(ls -1 /usr/lib/phoenix/phoenix-*-HBase-*-server.jar)

The server-side hbase-site.xml looks like this:
<configuration>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>

  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>ip-10-16-129-55.ec2.internal</value>
  </property>

  <property>
    <name>hbase.rootdir</name>
    <value>s3://dev-mock-transcription/</value>
  </property>

  <property>
    <name>dfs.support.append</name>
    <value>true</value>
  </property>

  <property>
    <name>hbase.rest.port</name>
    <value>8070</value>
  </property>

  <property>
    <name>hbase.replication</name>
    <value>false</value>
  </property>

  <property>
    <name>hbase.balancer.tablesOnMaster</name>
    <value>hbase:meta</value>
  </property>

  <property>
    <name>hbase.bucketcache.size</name>
    <value>8192</value>
  </property>

  <property>
    <name>hbase.master.balancer.uselocality</name>
    <value>false</value>
  </property>

  <property>
    <name>hbase.master.startup.retainassign</name>
    <value>false</value>
  </property>

  <property>
    <name>hbase.wal.dir</name>
    <value>hdfs://ip-10-16-129-55.ec2.internal:8020/user/hbase/WAL</value>
  </property>

  <property>
    <name>hbase.bulkload.retries.retryOnIOException</name>
    <value>true</value>
  </property>

  <property>
    <name>hbase.bucketcache.ioengine</name>
    <value>files:/mnt1/hbase/bucketcache</value>
  </property>



<property>
      <name>hbase.rpc.timeout</name>
      <value>180000</value>
    </property> 
  <property>
      <name>index.writer.threads.max</name>
      <value>30</value>
 </property>
 <property>
      <name>index.builder.threads.max</name>
      <value>30</value>
 </property>
 </configuration>

Solution

    1. For large writes I use a Spark job with phoenix-spark, and I am able to reach 16k rows upserted per second with 4 executors.

    2. It is normal for your count query to be very slow because, as your explain plan shows, it is doing a full scan, meaning it has to read every row to get the actual count. Phoenix is very powerful when querying data by its primary key, or at least by the leading part of it. The more of the leading part of the primary key you supply, the faster the query.

    ```

    0: jdbc:phoenix:> explain select count(1) from TEST_TABLE;
    +------------------------------------------------------------------------------+
    |                                     PLAN                                     |
    +------------------------------------------------------------------------------+
    | CLIENT 1-CHUNK 0 ROWS 0 BYTES PARALLEL 1-WAY FULL SCAN OVER TEST_TABLE       |
    |     SERVER FILTER BY FIRST KEY ONLY                                          |
    |     SERVER AGGREGATE INTO SINGLE ROW                                         |
    +------------------------------------------------------------------------------+
    3 rows selected (0.039 seconds)
    0: jdbc:phoenix:> select count(1) from TEST_TABLE;
    1 row selected (0.555 seconds)
    
    0: jdbc:phoenix:> explain select * from TEST_TABLE where PK like 'toto';
    +---------------------------------------------------------------------------------------+
    |                                         PLAN                                          |
    +---------------------------------------------------------------------------------------+
    | CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN POINT LOOKUP ON 1 KEY OVER TEST_TABLE  |
    +---------------------------------------------------------------------------------------+
    1 row selected (0.047 seconds)
    0: jdbc:phoenix:> select * from TEST_TABLE where PK like 'to%';
    2 rows selected (0.142 seconds)
    
    0: jdbc:phoenix:> explain select * from TEST_TABLE where PK = 'toto';
    +---------------------------------------------------------------------------------------+
    |                                         PLAN                                          |
    +---------------------------------------------------------------------------------------+
    | CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN POINT LOOKUP ON 1 KEY OVER TEST_TABLE       |
    +---------------------------------------------------------------------------------------+
    1 row selected (0.019 seconds)
    0: jdbc:phoenix:> select * from TEST_TABLE where PK = 'toto';
    1 row selected (0.05 seconds)
    0: jdbc:phoenix:>
    

    ```
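
If the writes have to stay on a plain JDBC client rather than Spark, and the workload allows grouping at least some rows together, a commonly suggested pattern is to turn auto-commit off and commit every N rows. With auto-commit off, Phoenix buffers upserts on the client and sends them to HBase on `commit()`, instead of making a round trip per row. The following is only a minimal sketch under stated assumptions: the class name, the batch size of 1000, the five-column subset, and taking the JDBC URL from the command line are all illustrative choices, not values from the original post.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class BatchedUpsertSketch {
    // Hypothetical batch size; tune it for your row width and cluster.
    static final int BATCH_SIZE = 1000;

    // Column subset chosen for illustration; table name as in the CREATE TABLE above.
    static final String UPSERT_SQL =
        "UPSERT INTO VBQL_PHOENIX_TRANSCRIPT "
        + "(PK, IMMUTABLES.ACCOUNTID, IMMUTABLES.MEDIAID, IMMUTABLES.JOBID, IMMUTABLES.STATUS) "
        + "VALUES (?, ?, ?, ?, ?)";

    /** Commits needed for rowCount rows: one per full batch, plus one for any remainder. */
    static int commitCount(int rowCount, int batchSize) {
        if (rowCount < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("rowCount must be >= 0 and batchSize > 0");
        }
        return (rowCount + batchSize - 1) / batchSize;
    }

    /** Upserts all rows (five String values each), committing every batchSize rows. */
    static int upsertAll(Connection conn, List<String[]> rows, int batchSize) throws SQLException {
        conn.setAutoCommit(false); // buffer mutations client-side until commit()
        int commits = 0;
        try (PreparedStatement ps = conn.prepareStatement(UPSERT_SQL)) {
            int pending = 0;
            for (String[] row : rows) {
                for (int i = 0; i < row.length; i++) {
                    ps.setString(i + 1, row[i]); // JDBC parameters are 1-based
                }
                ps.executeUpdate();
                if (++pending == batchSize) {
                    conn.commit(); // flush one batch to HBase
                    commits++;
                    pending = 0;
                }
            }
            if (pending > 0) {
                conn.commit(); // flush the final partial batch
                commits++;
            }
        }
        return commits;
    }

    public static void main(String[] args) throws SQLException {
        if (args.length == 0) {
            System.out.println("usage: BatchedUpsertSketch <phoenix-jdbc-url>");
            return;
        }
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"pk-1", "account-1", "media-1", "JOB123", "FINISHED"});
        try (Connection conn = DriverManager.getConnection(args[0])) {
            int commits = upsertAll(conn, rows, BATCH_SIZE);
            System.out.println("commits issued: " + commits);
        }
    }
}
```

Each client thread would hold its own connection and call `upsertAll` on its share of the rows. Whether this helps in the setup described in the question depends on how many rows can be grouped per commit; with one row per commit it degenerates to the same per-row round trip.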