Search code examples
hbase · bigtable · google-cloud-bigtable

Hbase vs Google Bigtable: scan for large number of rows


I am trying to do a Scan with a start and end row on Bigtable. The number of rows between the start and end keys is around 100K. I want to fetch them in batches, which I was able to do in HBase by using setCaching(500).

In Bigtable, it seems setCaching is ignored and it tries to get the entire resultset in 1 RPC. How can it be achieved similar to HBase?

I am using Java driver bigtable-hbase-1.1 and version 1.0.0-pre3

Bigtable configuration:

// Base Hadoop configuration; Bigtable-specific keys are layered on top below.
Configuration conf = new Configuration();
// Disable client-side throttling of the buffered mutator (write path).
conf.set("google.bigtable.buffered.mutator.throttling.enable", "false");
// Raise RPC deadlines to 25 minutes; presumably to avoid DEADLINE_EXCEEDED on
// long scans -- NOTE(review): confirm these values are intentional, they are
// far above the client defaults.
conf.set("google.bigtable.rpc.timeout.ms", "1500000");
conf.set("google.bigtable.grpc.read.partial.row.timeout.ms","1500000");
conf.set("google.bigtable.long.rpc.timeout.ms", "1500000");
// Do not retry RPCs that failed with DEADLINE_EXCEEDED.
conf.set("google.bigtable.grpc.retry.deadlineexceeded.enable", "false");
// Write-path tuning: max concurrent bulk RPCs and max keys per bulk request.
// These affect the BufferedMutator only, not scans.
conf.set("google.bigtable.buffered.mutator.max.inflight.rpcs", "500");
conf.set("google.bigtable.bulk.max.row.key.count", "500");

// Attach project/instance and open the connection using the tuned config.
Configuration conff =  BigtableConfiguration.configure(conf,projectID,instanceID);
connection = BigtableConfiguration.connect(conff);

Scanner configuration:

byte[] start = "prefix".getbytes() ;
byte[] end =  Bytes.add("prefix".getbytes(),(byte))0xff);
Scan scan = new Scan(start, end);

Expected number of rows to come out is of the order of 100Ks.


Solution

  • You don’t have to worry about batching when reading rows. The Bigtable responses get streamed and are backpressure aware. We rely on GRPC to buffer chunks of the stream as well. Here is a link to an introduction about GRPC streaming: https://grpc.io/docs/guides/concepts.html#server-streaming-rpc

    Would you mind trying out this sample code and letting me know if it works (i.e. no deadline-exceeded errors)? If the sample code works, please modify it to scan your own data and make sure that it still works. If something doesn't, please let me know.

    pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>com.google.cloud.example</groupId>
      <artifactId>row-write-read-example</artifactId>
      <version>1.0-SNAPSHOT</version>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
          <scope>test</scope>
        </dependency>
        <!-- Bigtable client exposing the HBase 1.x API; same 1.0.0-pre3
             version the question reports using. -->
        <dependency>
          <groupId>com.google.cloud.bigtable</groupId>
          <artifactId>bigtable-hbase-1.x</artifactId>
          <version>1.0.0-pre3</version>
        </dependency>
      </dependencies>
    
      <build>
        <plugins>
          <!-- The sample uses Java 8 language features (underscored literals
               such as 2_000_000 are fine on 7+, try-with-resources on 7+). -->
          <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.2</version>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </project>
    

    java:

    import com.google.cloud.bigtable.hbase.BigtableConfiguration;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    
    /**
     * Demonstrates that a Cloud Bigtable scan streams its results without any
     * client-side batching configuration: writes 2 million rows through a
     * {@link BufferedMutator}, then reads them all back with a single
     * {@link Scan}.
     *
     * <p>Fill in the three placeholder constants before running. The table
     * named by {@code TABLE_ID} must not already exist; it is created at the
     * start of the run and dropped in a {@code finally} block afterwards.
     */
    public class WriteReadTest {
      private static final String PROJECT_ID = "<YOUR_PROJECT_ID>";
      private static final String INSTANCE_ID = "<YOUR_INSTANCE_ID>";
      private static final String TABLE_ID = "<YOUR_NONEXISTENT_TABLE>";
      private static final String FAMILY = "cf";

      private static final TableName TABLE_NAME = TableName.valueOf(TABLE_ID);

      /**
       * Creates the table, populates it, scans it back, and always cleans the
       * table up again -- even if the write or read phase throws.
       *
       * @throws IOException on any Bigtable RPC failure
       */
      public static void main(String[] args) throws IOException {
        try(Connection connection = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID);
            Admin admin = connection.getAdmin()) {

          // Setup: a single column family is enough for this demo.
          admin.createTable(
              new HTableDescriptor(TABLE_NAME)
                  .addFamily(new HColumnDescriptor(FAMILY))
          );

          try {
            // Write the rows
            populateTable(connection, 2_000_000);

            // Read the rows
            readFullTable(connection);
          } finally {
            // HBase semantics require disabling a table before deleting it.
            admin.disableTable(TABLE_NAME);
            admin.deleteTable(TABLE_NAME);
          }

        }
      }

      /**
       * Writes {@code rowCount} rows via a BufferedMutator. Keys look like
       * "NN-0000000042": a zero-padded bucket prefix (i % 100) spreads writes
       * across the keyspace, followed by the zero-padded row index.
       *
       * @param connection open Bigtable connection
       * @param rowCount   number of rows to write
       * @throws IOException if a flush or mutation fails
       */
      private static void populateTable(Connection connection, int rowCount) throws IOException {
        long startTime = System.currentTimeMillis();
        int buckets = 100;
        int maxWidth = Integer.toString(buckets).length();

        try(BufferedMutator bufferedMutator = connection.getBufferedMutator(TABLE_NAME)) {
          for (int i = 0; i < rowCount; i++) {
            String prefix = String.format("%0" + maxWidth + "d", i % buckets);
            String key = prefix + "-" + String.format("%010d", i);
            String value = "value-" + key;

            // Use an explicit charset: bare getBytes() depends on the
            // platform default encoding (pre-Java-18) and is not portable.
            Put put = new Put(key.getBytes(StandardCharsets.UTF_8))
                .addColumn(
                    FAMILY.getBytes(StandardCharsets.UTF_8),
                    HConstants.EMPTY_BYTE_ARRAY,
                    value.getBytes(StandardCharsets.UTF_8)
                );

            bufferedMutator.mutate(put);
          }
        }
        // try-with-resources closed (and therefore flushed) the mutator here.

        long endTime = System.currentTimeMillis();
        System.out.printf("Populated table in %d secs, writing %d rows\n", (endTime - startTime) / 1000, rowCount);
      }

      /**
       * Scans the whole table with a single Scan ("0" to "z" covers every key
       * populateTable wrote, since all keys start with a digit) and counts the
       * rows. No caching/batching knobs are set: the client streams results.
       *
       * @param connection open Bigtable connection
       * @throws IOException if the scan fails
       */
      private static void readFullTable(Connection connection) throws IOException {
        long startTime = System.currentTimeMillis();

        int count = 0;
        try(Table table = connection.getTable(TABLE_NAME);
            ResultScanner scanner = table.getScanner(
                new Scan("0".getBytes(StandardCharsets.UTF_8), "z".getBytes(StandardCharsets.UTF_8)))) {

          for(Result row = scanner.next(); row != null; row = scanner.next()) {
            count++;
          }
        }

        long endTime = System.currentTimeMillis();

        System.out.printf("Scanned table in %d secs, reading %d rows\n", (endTime - startTime) / 1000, count);
      }
    }