Because of a project requirement, I need to write some entries to a specific table using the BigQuery client library inside my Dataflow job. I can query a table with the BigQuery client library without any issue, but when I try to write to the table I get the following error:
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project <project name>: An exception occured while executing the Java class. java.lang.IllegalStateException: getTransportChannel() called when needsExecutor() is true -> [Help 1]
Below is the full error:
[org.##.addg.gcp.##.main()] INFO org.apache.beam.runners.dataflow.DataflowRunner - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 220 files. Enable logging at DEBUG level to see which files will be staged.
[WARNING]
java.lang.RuntimeException: java.lang.IllegalStateException: getTransportChannel() called when needsExecutor() is true
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryWriteClient (BigQueryServicesImpl.java:1262)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$800 (BigQueryServicesImpl.java:134)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init> (BigQueryServicesImpl.java:523)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init> (BigQueryServicesImpl.java:451)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService (BigQueryServicesImpl.java:168)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.validate (BigQueryIO.java:985)
at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform (Pipeline.java:662)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:581)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:240)
at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:214)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:469)
at org.apache.beam.sdk.Pipeline.validate (Pipeline.java:598)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.##.addg.gcp.##.main (##.java:125)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
at java.lang.Thread.run (Thread.java:829)
Caused by: java.lang.IllegalStateException: getTransportChannel() called when needsExecutor() is true
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel (InstantiatingGrpcChannelProvider.java:204)
at com.google.api.gax.rpc.ClientContext.create (ClientContext.java:205)
at com.google.cloud.bigquery.storage.v1beta2.stub.GrpcBigQueryWriteStub.create (GrpcBigQueryWriteStub.java:138)
at com.google.cloud.bigquery.storage.v1beta2.stub.BigQueryWriteStubSettings.createStub (BigQueryWriteStubSettings.java:145)
at com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.<init> (BigQueryWriteClient.java:128)
at com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.create (BigQueryWriteClient.java:109)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryWriteClient (BigQueryServicesImpl.java:1257)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$800 (BigQueryServicesImpl.java:134)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init> (BigQueryServicesImpl.java:523)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init> (BigQueryServicesImpl.java:451)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService (BigQueryServicesImpl.java:168)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.validate (BigQueryIO.java:985)
at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform (Pipeline.java:662)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:581)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:240)
at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:214)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:469)
at org.apache.beam.sdk.Pipeline.validate (Pipeline.java:598)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.##.addg.gcp.##.main (##.java:125)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14.009 s
[INFO] Finished at: 2021-11-17T10:55:59Z
Line 125 of the main class contains only the pipeline.run(options).waitUntilFinish() statement. Here's the code that I'm running:
public class ## {
    public static void main(String[] args) throws IOException {
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(DataflowPipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);

        // Read both inputs from BigQuery.
        PCollection<TableRow> orders = pipeline
                .apply(BigQueryIO.readTableRows().fromQuery("query").usingStandardSql());
        PCollection<TableRow> customers = pipeline
                .apply(BigQueryIO.readTableRows().fromQuery("query").usingStandardSql());

        // Count the customers and record the count through the BigQuery client library.
        PCollection<Long> count = customers.apply(Count.globally());
        PCollectionView<Long> countView = count.apply(View.asSingleton());
        PCollection<Void> c = count.apply(ParDo.of(new MakeCountEntryFn("processed_records", countView)).withSideInput("count", countView));
        PCollection<TableRow> result = customers.apply(new Join(orders, "CustomerID"));

        // This write runs on the launching machine while the pipeline is being built,
        // not on the Dataflow workers.
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        TableId tableId = TableId.of("jointest", "abc");
        WriteChannelConfiguration writeChannelConfiguration =
                WriteChannelConfiguration.newBuilder(tableId).setFormatOptions(FormatOptions.json()).build();
        TableDataWriteChannel writer = bigquery.writer(writeChannelConfiguration);
        JSONObject jsonData = new JSONObject();
        jsonData.put("run_id", "test");
        jsonData.put("job_status", "running");
        String data = String.valueOf(jsonData);
        try {
            writer.write(ByteBuffer.wrap(data.getBytes(Charsets.UTF_8)));
        } finally {
            writer.close();
        }

        pipeline.run(options).waitUntilFinish();
    }

    // Writes a single JSON object to the given table through a load-job write channel.
    public static void writeToAbcTable(JSONObject jsonObject, String dataset, String table) throws IOException {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        TableId tableId = TableId.of(dataset, table);
        WriteChannelConfiguration writeChannelConfiguration =
                WriteChannelConfiguration.newBuilder(tableId).setFormatOptions(FormatOptions.json()).build();
        TableDataWriteChannel writer = bigquery.writer(writeChannelConfiguration);
        // Only the caller's object is written; the unused local test object was removed.
        String data = String.valueOf(jsonObject);
        try {
            writer.write(ByteBuffer.wrap(data.getBytes(Charsets.UTF_8)));
        } finally {
            writer.close();
        }
    }

    public static class MakeCountEntryFn extends DoFn<Long, Void> {
        private final String countColumn;
        private final PCollectionView<Long> countView;

        public MakeCountEntryFn(String countColumn, PCollectionView<Long> countView) {
            this.countColumn = countColumn;
            this.countView = countView;
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            // The side input is read here but the element value is what gets written.
            Long i = c.sideInput(countView);
            JSONObject obj = new JSONObject();
            obj.put(countColumn, String.valueOf(c.element()));
            try {
                writeToAbcTable(obj, "test", "##");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
I'm not sure what I need to change to get this code to work without that error. Any help would be appreciated. Thanks!
The failure occurs while the configuration of the BigQueryIO connector is being validated, before the job runs. The stack trace passes through BigQueryIO$TypedRead.validate, so it is the read transform that is being validated, not your client-library write. The line that fails is
DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
Here, bqOptions is determined by the pipeline options you provide, which in this case come in as args. However, I would not expect this to cause such a failure.
My best guess is that this is a dependency conflict, where the versions of the gax, gRPC, and BigQuery Storage client libraries are not aligned. To rule this out, you can use the GCP Libraries BOM (https://mvnrepository.com/artifact/com.google.cloud/libraries-bom), which Beam also uses to align its GCP dependencies.
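One way to check for such a conflict before changing the build is to print which JAR each of the classes in the stack trace is actually loaded from. The following is a minimal sketch, not part of Beam or the Google client libraries: the class name ClasspathProbe and the helper describe are made up for illustration, and the list of class names is simply copied from the stack trace in the question.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical helper: prints which JAR (or directory) each named class is loaded from. */
final class ClasspathProbe {

    /** Returns a one-line description of where the given class was loaded from. */
    public static String describe(String className) {
        try {
            // initialize=false: locate the class without running its static initializers.
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no code source.
            URL location = (source == null) ? null : source.getLocation();
            return className + " -> " + (location == null ? "(bootstrap / unknown)" : location);
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> not found (" + e.getClass().getSimpleName() + ")";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace in the question.
        String[] names = {
            "com.google.api.gax.grpc.InstantiatingGrpcChannelProvider",
            "com.google.api.gax.rpc.ClientContext",
            "com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient",
            "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl",
        };
        for (String name : names) {
            System.out.println(describe(name));
        }
    }
}
```

Run it on the same classpath as the pipeline, for example with mvn compile exec:java -Dexec.mainClass=ClasspathProbe. If the gax and gax-grpc JAR file names show different versions, or the BigQuery Storage JAR is not the version your Beam release was built against, that would support the dependency-conflict theory. mvn dependency:tree shows where each version comes from.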