apache-spark, hbase, google-cloud-dataproc, google-cloud-bigtable

Dependency Issues for Cloud DataProc + Spark + Cloud Bigtable with Java


I need to create an application that runs on Cloud DataProc and uses Spark to process large volumes of Bigtable writes, scans, and deletes in a massively parallel fashion. This could be in Java (or Python if that is feasible).

I am trying to write the minimum code in Eclipse that achieves the basic functionality of getting an RDD out of a Bigtable table, either by using bulkPut/bulkDelete/bulkGet or by using newAPIHadoopRDD() or something similar.

I have seen multiple posts on SO and elsewhere on how this can be done, and on the various challenges of connecting the Bigtable API, the HBase API, and Spark. Some of those posts are quite outdated by now (a couple of years old, so they may no longer be relevant). So far I haven't managed to get anything working, mostly due to various dependency clashes or inconsistencies. No matter what combination of dependencies and versions I try in my pom.xml, I get either a ClassNotFoundException or a NoSuchMethodError when I try to run things.

Could I get some advice on which combinations of Spark, HBase, and Bigtable dependency versions and packages are known to work together? My pom.xml currently looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>FFSpark5</groupId>
  <artifactId>FFSpark5</artifactId>
  <version>0.0.1-SNAPSHOT</version>


    <properties>
        <bigtable.version>1.0.0</bigtable.version>
        <hbase.version>1.3.1</hbase.version>
        <hbase-shaded.version>2.0.0-alpha2</hbase-shaded.version>
        <hbase-spark.version>2.0.0-alpha4</hbase-spark.version>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <spark.version>1.6.2</spark.version>
        <spark-streaming.version>1.6.2</spark-streaming.version>
        <scala.version>2.11.0</scala.version>
        <scalatest.version>3.0.3</scalatest.version>
        <bigtable.projectID>my_gcp_project_id</bigtable.projectID>
        <bigtable.instanceID>my_bigtable_instance_name</bigtable.instanceID>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark-streaming.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>${hbase-spark.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.cloud.bigtable</groupId>
            <artifactId>bigtable-hbase-1.x-hadoop</artifactId>
            <version>${bigtable.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-shaded-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-shaded-client</artifactId>
            <version>${hbase-shaded.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-shaded-server</artifactId>
            <version>${hbase-shaded.version}</version>
        </dependency>

    </dependencies>

    <build>
    <outputDirectory>target/classes</outputDirectory>

    <resources>
        <resource>
            <directory>src/main/resources</directory>
            <filtering>true</filtering>
        </resource>
    </resources>

    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
                <mainClass>FFSpark5</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

I realize there is probably a lot that is wrong in this version of the pom.xml, but I have played around with many combinations of dependencies and versions and couldn't get any of them to actually work. This latest one seems to get the furthest, judging by the log output, but it still breaks. Here's the latest stack trace:

18/03/12 15:37:17 INFO BigtableSession: Bigtable options: BigtableOptions{dataHost=bigtable.googleapis.com, tableAdminHost=bigtableadmin.googleapis.com, instanceAdminHost=bigtableadmin.googleapis.com .... (lots of other options here)}.
18/03/12 15:37:17 INFO RefreshingOAuth2CredentialsInterceptor: Refreshing the OAuth token
18/03/12 15:37:19 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 210.6 KB, free 210.6 KB)
18/03/12 15:37:19 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.9 KB, free 230.5 KB)
18/03/12 15:37:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58982 (size: 19.9 KB, free: 457.9 MB)
18/03/12 15:37:19 INFO SparkContext: Created broadcast 0 from broadcast at HBaseContext.scala:73
18/03/12 15:37:19 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 160.0 B, free 230.6 KB)
18/03/12 15:37:19 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 120.0 B, free 230.7 KB)
18/03/12 15:37:19 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:58982 (size: 120.0 B, free: 457.9 MB)
18/03/12 15:37:19 INFO SparkContext: Created broadcast 1 from broadcast at HBaseContext.scala:74
Direct test done
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Scan.getMvccReadPoint()J
    at org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor.getMvccReadPoint(PackagePrivateFieldAccessor.java:39)
    at org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:1088)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString(TableMapReduceUtil.java:601)
    at FFSpark5.main(FFSpark5.java:64)

And below is my basic code. The idea is to do 3 tests:

Test 1: simply tries to access Bigtable directly via the simple Bigtable API, just to make sure there are no basic issues such as authentication. This works fine.

Test 2: tries to get an RDD via newAPIHadoopRDD(). This fails.

Test 3: tries to do a bulkPut(). This fails.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.Connection;

class PutFunction implements Function<String, Put> {

    private static final long serialVersionUID = 1L;


    public Put call(String v) throws Exception {
      String[] cells = v.split(",");
      Put put = new Put(Bytes.toBytes(cells[0]));

      put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
              Bytes.toBytes(cells[3]));
      return put;
    }
  }


public class FFSpark5
{
    public static void main(String args[]) throws IOException
    {
        SparkConf conf = new SparkConf().setAppName("SparkTest").setMaster("local");        
        JavaSparkContext sc = new JavaSparkContext(conf);       
        Configuration hBaseConf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(hBaseConf);
        JavaHBaseContext hbaseContext = new JavaHBaseContext(sc, hBaseConf);

        // test 1: simple direct Bigtable access
        connection.getTable(TableName.valueOf("FFTransFlat".getBytes()))
            .put(new Put("abc".getBytes())
            .addColumn("CI".getBytes(), "I".getBytes(), "val".getBytes()));
        System.out.println("Direct test done");


        // Test 2: newAPIHadoopRDD() 
        Scan scan1 = new Scan();
        scan1.setCaching(500);
        scan1.setCacheBlocks(false);

        hBaseConf.set(TableInputFormat.INPUT_TABLE, "FFTransFlat");
        hBaseConf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan1));

        JavaPairRDD<ImmutableBytesWritable, Result> source = sc
                .newAPIHadoopRDD(hBaseConf, TableInputFormat.class,
                        ImmutableBytesWritable.class, Result.class);

        System.out.println(source.count());

        // Test 3: bulkPut()

        List<String> list = new ArrayList<String>(5);
        list.add("1,CI,a,1");
        list.add("2,CI,a,2");
        list.add("3,CI,a,3");

        JavaRDD<String> rdd = sc.parallelize(list);

        byte[] tableName = "FFTransFlat".getBytes();
        hbaseContext.bulkPut(rdd,
                TableName.valueOf(tableName),
                new PutFunction());

        System.out.println(source.count());


        connection.close();
    }

}

I see from the DataProc website that Spark 2.2.0 and 1.6.2 are supported. I had issues with 2.2.0, so I'm using 1.6.2.

Can I get some advice on the following: what is the right combination of dependencies and versions to use (specifically to work with Cloud Bigtable, rather than with an HBase cluster)?

Is the recommended way to achieve parallelization to use newAPIHadoopRDD(), or something like bulkRead()/bulkDelete()/etc.? Or is there another preferred, performant way to do MPP with DataProc/Bigtable?
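To clarify what I mean by the bulk route: something along the lines of the sketch below, modelled on the JavaHBaseContext examples that ship with hbase-spark (untested on my side; the table name, row keys, and batch size are just placeholders):

import java.util.Arrays;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

class DeleteFunction implements Function<String, Delete> {

    private static final long serialVersionUID = 1L;

    // Treat each input string as a row key and delete the whole row.
    public Delete call(String rowKey) throws Exception {
        return new Delete(Bytes.toBytes(rowKey));
    }
}

class BulkDeleteSketch {

    static void bulkDeleteExample(JavaSparkContext sc, JavaHBaseContext hbaseContext) {
        // Row keys to delete; in the real job this RDD would come from a scan or a file.
        JavaRDD<String> keysToDelete = sc.parallelize(Arrays.asList("1", "2", "3"));

        // Same pattern as bulkPut(), with an extra batch-size argument (4 here is arbitrary).
        hbaseContext.bulkDelete(keysToDelete,
                TableName.valueOf("FFTransFlat"),
                new DeleteFunction(), 4);
    }
}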

Apologies for the lengthy post -- this is our first DataProc attempt.

*** UPDATE:

I managed to get something working after updating the Bigtable dependency to bigtable-hbase-2.x-hadoop and the HBase versions to 2.0.0-alpha2. At least bulkPut seems to be working at this stage. Now I will work on cleaning the unneeded entries out of the dependencies.
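For reference, the dependency combination that got me this far looks roughly like the following (still being trimmed down, so treat it as a working sketch rather than a final pom.xml; hbase-spark is left at the 2.0.0-alpha4 from my original pom):

    <properties>
        <bigtable.version>1.0.0</bigtable.version>
        <hbase.version>2.0.0-alpha2</hbase.version>
        <hbase-spark.version>2.0.0-alpha4</hbase-spark.version>
    </properties>

    <dependencies>
        <!-- Bigtable client that speaks the HBase 2.x API -->
        <dependency>
            <groupId>com.google.cloud.bigtable</groupId>
            <artifactId>bigtable-hbase-2.x-hadoop</artifactId>
            <version>${bigtable.version}</version>
        </dependency>
        <!-- HBase client, server, and Spark integration on the 2.0.0-alpha line -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>${hbase-spark.version}</version>
        </dependency>
    </dependencies>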


Solution

  • Here is a full working example of Cloud Bigtable with Hortonworks' SHC, which is based on HBase 1.x. We will work on creating a similar example with HBase 2.x's new Spark integration, built on the Cloud Bigtable artifacts designed to work with the new HBase 2.x APIs (tracking issue link).
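For a flavour of what the SHC route looks like from Java, reading a table back as a DataFrame is roughly the following (a sketch only: the catalog JSON, table, column family, and qualifier names are made up for illustration, and it assumes a Spark 2.x SparkSession plus the SHC data source on the classpath):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ShcReadSketch {
    public static void main(String[] args) {
        // SHC maps an HBase/Bigtable table onto a DataFrame via a JSON catalog.
        // Table, column family, and qualifier names below are placeholders.
        String catalog = "{"
                + "\"table\":{\"namespace\":\"default\", \"name\":\"FFTransFlat\"},"
                + "\"rowkey\":\"key\","
                + "\"columns\":{"
                + "\"rowkey\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"},"
                + "\"value\":{\"cf\":\"CI\", \"col\":\"I\", \"type\":\"string\"}"
                + "}}";

        SparkSession spark = SparkSession.builder().appName("ShcReadSketch").getOrCreate();

        // "catalog" is the option key that SHC exposes as HBaseTableCatalog.tableCatalog.
        Dataset<Row> df = spark.read()
                .option("catalog", catalog)
                .format("org.apache.spark.sql.execution.datasources.hbase")
                .load();

        df.show();
    }
}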