I'm following the example here for writing to Cloud Bigtable from Spark Streaming: https://github.com/GoogleCloudPlatform/cloud-bigtable-examples/tree/master/scala/spark-streaming
In my case, I'm consuming from Kafka, doing some transformations, and then writing the results to my Bigtable instance; a sketch of the pipeline follows the stack trace below. Initially, using all the dependency versions from that example, I was getting UNAUTHENTICATED errors (from token-refresh timeouts) when trying to do anything against Bigtable beyond connecting:
Refreshing the OAuth token
Retrying failed call. Failure #1, got: Status{code=UNAUTHENTICATED, description=Unexpected failure get auth token, cause=java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.getHeader(RefreshingOAuth2CredentialsInterceptor.java:290)
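For context, the job has the same shape as that example: build a Hadoop Configuration pointed at Bigtable, map each Kafka record to a Put, and write via saveAsNewAPIHadoopDataset(). A rough, hypothetical sketch is below; the project, instance, topic, table, and column-family names are placeholders, and the single Put per record stands in for my actual transformations.

import com.google.cloud.bigtable.hbase.BigtableConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("kafka-to-bigtable"), Seconds(10))

// Placeholder Kafka settings; assumes non-null String keys and values.
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "bigtable-writer")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("my-topic"), kafkaParams))

// Hadoop Configuration pointed at Bigtable through the HBase-compatible client.
val conf = BigtableConfiguration.configure("my-project", "my-instance")
conf.set(TableOutputFormat.OUTPUT_TABLE, "my-table")
val job = Job.getInstance(conf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

stream.foreachRDD { rdd =>
  // Map each record to a (row key, Put) pair and write the batch to Bigtable.
  rdd.map { record =>
    val put = new Put(Bytes.toBytes(record.key))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"), Bytes.toBytes(record.value))
    (new ImmutableBytesWritable(Bytes.toBytes(record.key)), put)
  }.saveAsNewAPIHadoopDataset(job.getConfiguration)
}

ssc.start()
ssc.awaitTermination()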
I then bumped the bigtable-hbase-1.x-hadoop dependency to something more recent (1.9.0), which gets through authentication for the table admin work, but I then hit another UNAUTHENTICATED error when actually making the saveAsNewAPIHadoopDataset() call:
Retrying failed call. Failure #1, got: Status{code=UNAUTHENTICATED, description=Request had invalid authentication credentials. Expected OAuth 2 access token, login cookie or other valid authentication credential.
See https://developers.google.com/identity/sign-in/web/devconsole-project., cause=null} on channel 34.
Trailers: Metadata(www-authenticate=Bearer realm="https://accounts.google.com/",bigtable-channel-id=34)
I've found that removing the conf.set(BigtableOptionsFactory.BIGTABLE_HOST_KEY, BigtableOptions.BIGTABLE_BATCH_DATA_HOST_DEFAULT) call from the setBatchConfigOptions() method (shown just after the log below) lets the call authenticate against the default host, and it processes several Kafka messages, but the job then stalls and eventually throws a No route to host error:
2019-07-25 17:29:12 INFO JobScheduler:54 - Added jobs for time 1564093750000 ms
2019-07-25 17:29:21 INFO JobScheduler:54 - Added jobs for time 1564093760000 ms
2019-07-25 17:29:31 INFO JobScheduler:54 - Added jobs for time 1564093770000 ms
2019-07-25 17:29:36 WARN OperationAccountant:116 - No operations completed within the last 30 seconds. There are still 1 operations in progress.
2019-07-25 17:29:36 WARN OperationAccountant:116 - No operations completed within the last 30 seconds. There are still 1 operations in progress.
2019-07-25 17:29:36 WARN OperationAccountant:116 - No operations completed within the last 30 seconds. There are still 1 operations in progress.
2019-07-25 17:29:36 WARN OperationAccountant:116 - No operations completed within the last 30 seconds. There are still 1 operations in progress.
2019-07-25 17:29:36 WARN OperationAccountant:116 - No operations completed within the last 30 seconds. There are still 1 operations in progress.
2019-07-25 17:29:36 WARN OperationAccountant:116 - No operations completed within the last 30 seconds. There are still 1 operations in progress.
2019-07-25 17:29:36 WARN OperationAccountant:116 - No operations completed within the last 30 seconds. There are still 1 operations in progress.
2019-07-25 17:29:36 WARN OperationAccountant:116 - No operations completed within the last 30 seconds. There are still 1 operations in progress.
2019-07-25 17:29:38 WARN AbstractRetryingOperation:130 - Retrying failed call.
Failure #1, got: Status{code=UNAVAILABLE, description=io exception, cause=com.google.bigtable.repackaged.io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: No route to host: batch-bigtable.googleapis.com/2607:f8b0:400f:801:0:0:0:200a:443
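For reference, the override I removed is this line from the example's setBatchConfigOptions() method (the method sets several other batch options, which I kept):

import com.google.cloud.bigtable.config.BigtableOptions
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory

// Points the data channel at the batch endpoint (batch-bigtable.googleapis.com).
// Removing this line let the job authenticate against the default data host.
conf.set(BigtableOptionsFactory.BIGTABLE_HOST_KEY,
  BigtableOptions.BIGTABLE_BATCH_DATA_HOST_DEFAULT)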
I'm assuming this is an issue with dependency versions, since that example is fairly old, but I can't find any newer examples of writing to Bigtable from Spark Streaming. I also haven't had any luck finding version combinations that work with bigtable-hbase-2.x-hadoop.
Current POM:
<properties>
  <scala.version>2.11.0</scala.version>
  <spark.version>2.3.3</spark.version>
  <hbase.version>1.3.1</hbase.version>
  <bigtable.version>1.9.0</bigtable.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.7.1</version>
  </dependency>
  <dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>26.0-jre</version>
  </dependency>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-logging</artifactId>
    <version>1.74.0</version>
    <exclusions>
      <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
      </exclusion>
      <exclusion>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>com.google.cloud.bigtable</groupId>
    <artifactId>bigtable-hbase-2.x-hadoop</artifactId>
    <version>${bigtable.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
  </dependency>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>0.95.0-alpha</version>
  </dependency>
</dependencies>
The authentication issues with batch mode were a known issue in the Bigtable API; the recently released 1.12.0 addressed them. The NoRouteToHostException was isolated to running locally, and turned out to be a corporate firewall issue that was resolved by setting -Dhttps.proxyHost and -Dhttps.proxyPort.
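For anyone hitting the same local symptom, the equivalent of those JVM flags can also be set programmatically before the client opens its channels (the proxy host and port below are placeholders):

// Equivalent of -Dhttps.proxyHost / -Dhttps.proxyPort for local runs;
// substitute your proxy's actual host and port.
System.setProperty("https.proxyHost", "proxy.example.com")
System.setProperty("https.proxyPort", "3128")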