java, google-bigquery, google-cloud-dataflow, apache-beam, google-api-java-client

java.lang.IllegalStateException: getTransportChannel() called when needsExecutor() is true while using the BigQuery client library in Dataflow


Due to some criteria, I need to write some entries to a specific table using the BigQuery client library in my Dataflow job. I can query a table with the client library without any issue, but when I try to write into the table I get the following error:

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project <project name>: An exception occured while executing the Java class. java.lang.IllegalStateException: getTransportChannel() called when needsExecutor() is true -> [Help 1]

Below is the full error output:

[org.##.addg.gcp.##.main()] INFO org.apache.beam.runners.dataflow.DataflowRunner - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 220 files. Enable logging at DEBUG level to see which files will be staged.
[WARNING]
java.lang.RuntimeException: java.lang.IllegalStateException: getTransportChannel() called when needsExecutor() is true
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryWriteClient (BigQueryServicesImpl.java:1262)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$800 (BigQueryServicesImpl.java:134)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init> (BigQueryServicesImpl.java:523)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init> (BigQueryServicesImpl.java:451)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService (BigQueryServicesImpl.java:168)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.validate (BigQueryIO.java:985)
    at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform (Pipeline.java:662)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:581)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:240)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:214)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:469)
    at org.apache.beam.sdk.Pipeline.validate (Pipeline.java:598)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.##.addg.gcp.##.main (##.java:125)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
    at java.lang.Thread.run (Thread.java:829)
Caused by: java.lang.IllegalStateException: getTransportChannel() called when needsExecutor() is true
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel (InstantiatingGrpcChannelProvider.java:204)
    at com.google.api.gax.rpc.ClientContext.create (ClientContext.java:205)
    at com.google.cloud.bigquery.storage.v1beta2.stub.GrpcBigQueryWriteStub.create (GrpcBigQueryWriteStub.java:138)
    at com.google.cloud.bigquery.storage.v1beta2.stub.BigQueryWriteStubSettings.createStub (BigQueryWriteStubSettings.java:145)
    at com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.<init> (BigQueryWriteClient.java:128)
    at com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient.create (BigQueryWriteClient.java:109)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.newBigQueryWriteClient (BigQueryServicesImpl.java:1257)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.access$800 (BigQueryServicesImpl.java:134)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init> (BigQueryServicesImpl.java:523)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.<init> (BigQueryServicesImpl.java:451)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.getDatasetService (BigQueryServicesImpl.java:168)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead.validate (BigQueryIO.java:985)
    at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform (Pipeline.java:662)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:581)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:240)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:214)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:469)
    at org.apache.beam.sdk.Pipeline.validate (Pipeline.java:598)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.##.addg.gcp.##.main (##.java:125)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
    at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  14.009 s
[INFO] Finished at: 2021-11-17T10:55:59Z

Line 125 of the main class just has the pipeline.run(options).waitUntilFinish() statement. Here's the code I'm running:

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.base.Charsets;

import org.json.JSONObject;

public class ## {

    public static void main(String[] args) throws IOException {
        
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(DataflowPipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        
        PCollection<TableRow> orders = pipeline
                .apply(BigQueryIO.readTableRows().fromQuery("query").usingStandardSql());
        
        PCollection<TableRow> customers = pipeline
                .apply(BigQueryIO.readTableRows().fromQuery("query").usingStandardSql());
        
        PCollection<Long> count = customers.apply(Count.globally());
        PCollectionView<Long> countView = count.apply(View.asSingleton());
        
        PCollection<Void> c = count.apply(ParDo.of(new MakeCountEntryFn("processed_records", countView)).withSideInput("count", countView));
        
        PCollection<TableRow> result = customers.apply(new Join(orders, "CustomerID"));
        
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        
        TableId tableId = TableId.of("jointest", "abc");
        WriteChannelConfiguration writeChannelConfiguration =
                WriteChannelConfiguration.newBuilder(tableId).setFormatOptions(FormatOptions.json()).build();
        
        TableDataWriteChannel writer = bigquery.writer(writeChannelConfiguration);
        
        JSONObject jsonData = new JSONObject();
        jsonData.put("run_id", "test");
        jsonData.put("job_status", "running");
        
        String data = String.valueOf(jsonData);
        try {
            writer.write(ByteBuffer.wrap(data.getBytes(Charsets.UTF_8)));
        } finally {
            writer.close();
        }
        
        pipeline.run(options).waitUntilFinish();

    }
    
    public static void writeToAbcTable(JSONObject jsonObject, String dataset, String table) throws IOException {
        
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        
        TableId tableId = TableId.of(dataset, table);
        WriteChannelConfiguration writeChannelConfiguration =
                WriteChannelConfiguration.newBuilder(tableId).setFormatOptions(FormatOptions.json()).build();
        
        TableDataWriteChannel writer = bigquery.writer(writeChannelConfiguration);
        
        // Write the caller-supplied JSON payload; no local JSONObject is needed here.
        String data = String.valueOf(jsonObject);
        try {
            writer.write(ByteBuffer.wrap(data.getBytes(Charsets.UTF_8)));
        } finally {
            writer.close();
        }
    }
    
    public static class MakeCountEntryFn extends DoFn<Long, Void>{
        
        private String countColumn;
        private PCollectionView<Long> countView;

        public MakeCountEntryFn(String countColumn, PCollectionView<Long> countView) {
            this.countColumn = countColumn;
            this.countView = countView;
        }
        
        @ProcessElement
        public void processElement(ProcessContext c) {
            // The side input holds the same global count as the element.
            Long count = c.sideInput(countView);
            JSONObject obj = new JSONObject();
            obj.put(countColumn, String.valueOf(count));
            try {
                writeToAbcTable(obj, "test", "##");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

I'm not sure what I need to change to get this code to work without that issue; any help would be appreciated. Thanks!


Solution

  • The failure occurs while validating the configuration of the BigQueryIO connector, before the job runs. The line that fails is:

    DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
    

    Here, bqOptions are determined by the pipeline options you provide, which in this case come in as args. However, I would not expect this to cause such a failure.
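
    For illustration, those options are the standard Dataflow flags passed on the command line, with placeholder values, e.g. something like:

        --runner=DataflowRunner --project=<your-project> --region=<region> --tempLocation=gs://<your-bucket>/temp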

    My best guess is that this is a dependency conflict, where the versions of the gax, gRPC, and BigQuery Storage client libraries are not aligned. To rule this out, you can use the GCP Libraries BOM (https://mvnrepository.com/artifact/com.google.cloud/libraries-bom), which Beam also uses to align its GCP dependencies.
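
    For example, you can import the BOM in the dependencyManagement section of your pom.xml, after which the individual GCP artifacts no longer need explicit versions. A minimal sketch (the BOM version below is illustrative; use the latest available release):

        <dependencyManagement>
          <dependencies>
            <dependency>
              <groupId>com.google.cloud</groupId>
              <artifactId>libraries-bom</artifactId>
              <version>24.0.0</version>
              <type>pom</type>
              <scope>import</scope>
            </dependency>
          </dependencies>
        </dependencyManagement>

        <dependencies>
          <!-- The version is now managed by the BOM above. -->
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-bigquery</artifactId>
          </dependency>
        </dependencies>

    After realigning the versions, rebuild and rerun the job; if the error persists, mvn dependency:tree can show which transitive dependency still pins a mismatched gax or gRPC version.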