Tags: python, google-bigquery, protocol-buffers, google-api-python-client

google.api_core.exceptions.Unknown: None There was a problem opening the stream. Try turning on DEBUG level logs to see the error


I am working on a Python script to load data from Pub/Sub to BigQuery using the Storage Write API's streaming method with the default stream. I am trying to adapt https://github.com/googleapis/python-bigquery-storage/blob/main/samples/snippets/append_rows_proto2.py to my needs, but I am running into an error.

As per the Google documentation, I have converted my data to protocol buffer format for the Python client.

However, I keep getting this error while trying to run my program:

(venv) {{MY_COMPUTER}} {{FOLDER_NAME}} % python3 default_Stream.py
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): metadata.google.internal.:80
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): metadata.google.internal.:80
DEBUG:google.cloud.logging_v2.handlers.transports.background_thread:Background thread started.
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): metadata.google.internal.:80
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): metadata.google.internal.:80
Traceback (most recent call last):
    File "default_Stream.py", line 116, in <module>
        append_rows_default("{{GCLOUD_PROJECT_NAME}}", "{{BIGQUERY_DATASET_NAME}}", "{{BIGQUERY_TABLE}}")
    File "default_Stream.py", line 95, in append_rows_default
        response_future_1 = append_rows_stream.send(request)
    File "{{VIRTUAL_ENVIRONMENT_PATH}}/venv/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1/writer.py", line 234, in send
        return self._open(request)
    File "{{VIRTUAL_ENVIRONMENT_PATH}}/venv/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1/writer.py", line 207, in _open
        raise request_exception
google.api_core.exceptions.Unknown: None There was a problem opening the stream. Try turning on DEBUG level logs to see the error.
Waiting up to 5 seconds.
Sent all pending logs.

Here is my script:

# [START bigquerystorage_append_rows_default]
"""
This code sample demonstrates how to write records
using the low-level generated client for Python.
"""

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import google.cloud.logging
#from google.cloud import logging
# If you update the debezium_record.proto protocol buffer definition, run:
#
#   protoc --python_out=. debezium_record.proto
#
# from the directory containing the .proto file to generate the debezium_record_pb2.py module.
import debezium_record_pb2


def create_row_data(id: int, name: str, role: int, joining_date: int, last_updated: int, is_deleted: bool):
    # Populate the DebeziumRecord message fields and serialize to bytes.
    row = debezium_record_pb2.DebeziumRecord()
    row.column1 = id
    row.column2 = name
    row.column3 = role
    row.column4 = joining_date
    row.column5 = last_updated
    row.column6 = is_deleted
    return row.SerializeToString()


def append_rows_default(project_id: str, dataset_id: str, table_id: str):

    """Create a write stream, write some sample data, and commit the stream."""

    client = google.cloud.logging.Client()
    logging.basicConfig(level=logging.DEBUG)
    client.setup_logging()
    #logging.getLogger().setLevel(logging.INFO)

    write_client = bigquery_storage_v1.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)
    stream_name = f'{parent}/_default'
    write_stream = types.WriteStream()
        
    #write_stream.type_ = types.WriteStream.Type.PENDING
    # write_stream = write_client.create_write_stream(
    #     parent=parent, write_stream=write_stream
    # )
    #stream_name = write_stream.name
    

    # Create a template with fields needed for the first request.
    request_template = types.AppendRowsRequest()

    # The initial request must contain the stream name.
    request_template.write_stream = stream_name

    # So that BigQuery knows how to parse the serialized_rows, generate a
    # protocol buffer representation of your message descriptor.
    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    debezium_record_pb2.DebeziumRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data

    # Some stream types support an unbounded number of requests. Construct an
    # AppendRowsStream to send an arbitrary number of requests to a stream.
    append_rows_stream = writer.AppendRowsStream(write_client, request_template)

    # Create a batch of row data by appending proto2 serialized bytes to the
    # serialized_rows repeated field.
    proto_rows = types.ProtoRows()
    proto_rows.serialized_rows.append(create_row_data(8, "E", 13, 1643673600000, 1654556118813, False))
    #proto_rows.serialized_rows.append(create_row_data(2, "Bob"))

    # Set an offset to allow resuming this stream if the connection breaks.
    # Keep track of which requests the server has acknowledged and resume the
    # stream at the first non-acknowledged message. If the server has already
    # processed a message with that offset, it will return an ALREADY_EXISTS
    # error, which can be safely ignored.
    #
    # The first request must always have an offset of 0.
    request = types.AppendRowsRequest()
    request.offset = 0
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    logging.basicConfig(level=logging.DEBUG)
    response_future_1 = append_rows_stream.send(request)
    logging.basicConfig(level=logging.DEBUG)

    print(response_future_1.result())
    #print(response_future_2.result())

    # Shutdown background threads and close the streaming connection.
    append_rows_stream.close()

    # No new records can be written to the stream after this method has been called.
    write_client.finalize_write_stream(name=write_stream.name)

    # Commit the stream you created earlier.
    batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
    batch_commit_write_streams_request.parent = parent
    batch_commit_write_streams_request.write_streams = [write_stream.name]
    write_client.batch_commit_write_streams(batch_commit_write_streams_request)

    print(f"Writes to stream: '{write_stream.name}' have been committed.")

if __name__ == "__main__":
    append_rows_default("{{GCLOUD_PROJECT_NAME}}", "{{BIGQUERY_DATASET_NAME}}", "{{BIGQUERY_TABLE}}")

# [END bigquerystorage_append_rows_default]

This is my proto file (debezium_record.proto):

syntax = "proto3";
// cannot contain fields which are not present in the table.
message DebeziumRecord {
    uint32 column1 = 1;
    string column2 = 2;
    uint32 column3 = 3;
    uint64 column4 = 4;
    uint64 column5 = 5;
    bool column6 = 6;
}

This is the definition of my BigQuery table:

CREATE TABLE `{{GCLOUD_PROJECT_NAME}}.{{BIGQUERY_DATASET_NAME}}.{{BIGQUERY_TABLE}}`
(
  column1 INT64 NOT NULL,
  column2 STRING,
  column3 INT64,
  column4 INT64 NOT NULL,
  column5 INT64,
  column6 BOOL
);

I have been stuck on this error and cannot proceed further. Any pointers would be really appreciated.

Thanks


Solution

  • From another team member of the posting user:

    We had to fix our logging output in order to see what the error actually was.

    We changed this portion of default_Stream.py:

    if __name__ == "__main__":
        append_rows_default("{{GCLOUD_PROJECT_NAME}}", "{{BIGQUERY_DATASET_NAME}}", "{{BIGQUERY_TABLE}}")
    
    # [END bigquerystorage_append_rows_default]
    

    to

    if __name__ == "__main__":
        logging.basicConfig(
            level=logging.DEBUG,
            format="%(asctime)s [%(levelname)s] %(message)s",
            handlers=[
                #logging.FileHandler("debug.log"),
                logging.StreamHandler()
            ]
        )
    
        append_rows_default("{{GCLOUD_PROJECT_NAME}}", "{{BIGQUERY_DATASET_NAME}}", "{{BIGQUERY_TABLE}}")
    
    # [END bigquerystorage_append_rows_default]
    

    Then we ran python3 default_Stream.py --log=DEBUG

    Once we were actually getting the error message logged to standard output, we saw that the error was:

    grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
            status = StatusCode.INVALID_ARGUMENT
            details = "The proto field mismatched with BigQuery field at DebeziumRecord.column4, the proto field type uint64, BigQuery field type INTEGER Entity: projects/{{GCLOUD_PROJECT_NAME}}/datasets/{{BIGQUERY_DATASET_NAME}}/tables/{{BIGQUERY_TABLE}}/_default"
            debug_error_string = "{"created":"@1656037879.048726680","description":"Error received from peer ipv4:142.251.6.95:443","file":"src/core/lib/surface/call.cc","file_line":966,"grpc_message":"The proto field mismatched with BigQuery field at DebeziumRecord.column4, the proto field type uint64, BigQuery field type INTEGER Entity: projects/{{GCLOUD_PROJECT_NAME}}/datasets/{{BIGQUERY_DATASET_NAME}}/tables/{{BIGQUERY_TABLE}}/_default","grpc_status":3}"
    >
    

    To fix that error, we corrected the data types of column4 and column5 to be int64 instead of uint64, per https://cloud.google.com/bigquery/docs/write-api#data_type_conversions.
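
    A minimal sketch of that change, assuming the proto definition lives in a file named debezium_record.proto and only those two fields change (debezium_record_pb2.py then needs to be regenerated with protoc):

    syntax = "proto3";
    // cannot contain fields which are not present in the table.
    message DebeziumRecord {
        uint32 column1 = 1;
        string column2 = 2;
        uint32 column3 = 3;
        int64 column4 = 4;  // was uint64; the error shows uint64 does not map to BigQuery INT64
        int64 column5 = 5;  // was uint64
        bool column6 = 6;
    }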


    There are still additional errors/issues with default_Stream.py that we are working through, but this was the answer to this question.