I am writing a Python script to load data from Pub/Sub into BigQuery using the Storage Write API's streaming method with the default stream. I am trying to adapt https://github.com/googleapis/python-bigquery-storage/blob/main/samples/snippets/append_rows_proto2.py to my needs, but I am running into an error.
As per the Google documentation, I have converted my data into protocol buffer format for the Python client.
However, I keep getting the following error every time I run my program:
(venv) {{MY_COMPUTER}} {{FOLDER_NAME}} % python3 default_Stream.py
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): metadata.google.internal.:80
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): metadata.google.internal.:80
DEBUG:google.cloud.logging_v2.handlers.transports.background_thread:Background thread started.
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): metadata.google.internal.:80
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): metadata.google.internal.:80
Traceback (most recent call last):
  File "default_Stream.py", line 116, in <module>
    append_rows_default("{{GCLOUD_PROJECT_NAME}}", "{{BIGQUERY_DATASET_NAME}}", "{{BIGQUERY_TABLE}}")
  File "default_Stream.py", line 95, in append_rows_default
    response_future_1 = append_rows_stream.send(request)
  File "{{VIRTUAL_ENVIRONMENT_PATH}}/venv/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1/writer.py", line 234, in send
    return self._open(request)
  File "{{VIRTUAL_ENVIRONMENT_PATH}}/venv/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1/writer.py", line 207, in _open
    raise request_exception
google.api_core.exceptions.Unknown: None There was a problem opening the stream. Try turning on DEBUG level logs to see the error.
Waiting up to 5 seconds.
Sent all pending logs.
Here is my script:
# [START bigquerystorage_append_rows_default]
"""
This code sample demonstrates how to write records
using the low-level generated client for Python.
"""
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import google.cloud.logging
#from google.cloud import logging
# If you update the debezium_record.proto protocol buffer definition, run:
#
#   protoc --python_out=. debezium_record.proto
#
# from the directory containing it to regenerate the debezium_record_pb2.py module.
import debezium_record_pb2
def create_row_data(id: int, name: str, role: int, joining_date: int, last_updated: int, is_deleted: bool):
    row = debezium_record_pb2.DebeziumRecord()
    row.column1 = id
    row.column2 = name
    row.column3 = role
    row.column4 = joining_date
    row.column5 = last_updated
    row.column6 = is_deleted
    return row.SerializeToString()
def append_rows_default(project_id: str, dataset_id: str, table_id: str):
    """Create a write stream, write some sample data, and commit the stream."""
    client = google.cloud.logging.Client()
    logging.basicConfig(level=logging.DEBUG)
    client.setup_logging()
    #logging.getLogger().setLevel(logging.INFO)

    write_client = bigquery_storage_v1.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)
    stream_name = f'{parent}/_default'
    write_stream = types.WriteStream()
    #write_stream.type_ = types.WriteStream.Type.PENDING
    # write_stream = write_client.create_write_stream(
    #     parent=parent, write_stream=write_stream
    # )
    #stream_name = write_stream.name

    # Create a template with fields needed for the first request.
    request_template = types.AppendRowsRequest()

    # The initial request must contain the stream name.
    request_template.write_stream = stream_name

    # So that BigQuery knows how to parse the serialized_rows, generate a
    # protocol buffer representation of your message descriptor.
    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    debezium_record_pb2.DebeziumRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data

    # Some stream types support an unbounded number of requests. Construct an
    # AppendRowsStream to send an arbitrary number of requests to a stream.
    append_rows_stream = writer.AppendRowsStream(write_client, request_template)

    # Create a batch of row data by appending proto2 serialized bytes to the
    # serialized_rows repeated field.
    proto_rows = types.ProtoRows()
    proto_rows.serialized_rows.append(create_row_data(8, "E", 13, 1643673600000, 1654556118813, False))
    #proto_rows.serialized_rows.append(create_row_data(2, "Bob"))

    # Set an offset to allow resuming this stream if the connection breaks.
    # Keep track of which requests the server has acknowledged and resume the
    # stream at the first non-acknowledged message. If the server has already
    # processed a message with that offset, it will return an ALREADY_EXISTS
    # error, which can be safely ignored.
    #
    # The first request must always have an offset of 0.
    request = types.AppendRowsRequest()
    request.offset = 0
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    logging.basicConfig(level=logging.DEBUG)
    response_future_1 = append_rows_stream.send(request)
    logging.basicConfig(level=logging.DEBUG)
    print(response_future_1.result())
    #print(response_future_2.result())

    # Shutdown background threads and close the streaming connection.
    append_rows_stream.close()

    # No new records can be written to the stream after this method has been called.
    write_client.finalize_write_stream(name=write_stream.name)

    # Commit the stream you created earlier.
    batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
    batch_commit_write_streams_request.parent = parent
    batch_commit_write_streams_request.write_streams = [write_stream.name]
    write_client.batch_commit_write_streams(batch_commit_write_streams_request)

    print(f"Writes to stream: '{write_stream.name}' have been committed.")
if __name__ == "__main__":
    append_rows_default("{{GCLOUD_PROJECT_NAME}}", "{{BIGQUERY_DATASET_NAME}}", "{{BIGQUERY_TABLE}}")
# [END bigquerystorage_append_rows_default]
This is my proto file (debezium_record.proto, from which debezium_record_pb2.py is generated):
syntax = "proto3";
// The message cannot contain fields which are not present in the table.
message DebeziumRecord {
  uint32 column1 = 1;
  string column2 = 2;
  uint32 column3 = 3;
  uint64 column4 = 4;
  uint64 column5 = 5;
  bool column6 = 6;
}
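For reference, the field names and proto types that the generated debezium_record_pb2 module advertises in the writer schema can be printed with a small diagnostic sketch like this (not part of default_Stream.py), which makes them easy to compare against the table definition below:

from google.protobuf import descriptor_pb2

import debezium_record_pb2

# Copy the message descriptor, exactly as default_Stream.py does when it
# builds the ProtoSchema, and print each field's name and proto type.
proto_descriptor = descriptor_pb2.DescriptorProto()
debezium_record_pb2.DebeziumRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
for field in proto_descriptor.field:
    print(field.name, descriptor_pb2.FieldDescriptorProto.Type.Name(field.type))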
This is the definition of my BigQuery table:
CREATE TABLE `{{GCLOUD_PROJECT_NAME}}.{{BIGQUERY_DATASET_NAME}}.{{BIGQUERY_TABLE}}`
(
  column1 INT64 NOT NULL,
  column2 STRING,
  column3 INT64,
  column4 INT64 NOT NULL,
  column5 INT64,
  column6 BOOL
);
I have been stuck on this error and cannot proceed further. Any pointers would be really appreciated.
Thanks
From another team member of the posting user:
We had to fix our logging output in order to see what the error actually was.
We changed this portion of default_Stream.py:
if __name__ == "__main__":
    append_rows_default("{{GCLOUD_PROJECT_NAME}}", "{{BIGQUERY_DATASET_NAME}}", "{{BIGQUERY_TABLE}}")
# [END bigquerystorage_append_rows_default]
to
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[
            #logging.FileHandler("debug.log"),
            logging.StreamHandler()
        ]
    )
    append_rows_default("{{GCLOUD_PROJECT_NAME}}", "{{BIGQUERY_DATASET_NAME}}", "{{BIGQUERY_TABLE}}")
# [END bigquerystorage_append_rows_default]
Then we ran python3 default_Stream.py --log=DEBUG
Once we were actually getting the error message logged to standard output, we saw that the error was:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.INVALID_ARGUMENT
details = "The proto field mismatched with BigQuery field at DebeziumRecord.column4, the proto field type uint64, BigQuery field type INTEGER Entity: projects/{{GCLOUD_PROJECT_NAME}}/datasets/{{BIGQUERY_DATASET_NAME}}/tables/{{BIGQUERY_TABLE}}/_default"
debug_error_string = "{"created":"@1656037879.048726680","description":"Error received from peer ipv4:142.251.6.95:443","file":"src/core/lib/surface/call.cc","file_line":966,"grpc_message":"The proto field mismatched with BigQuery field at DebeziumRecord.column4, the proto field type uint64, BigQuery field type INTEGER Entity: projects/{{GCLOUD_PROJECT_NAME}}/datasets/{{BIGQUERY_DATASET_NAME}}/tables/{{BIGQUERY_TABLE}}/_default","grpc_status":3}"
>
Per https://cloud.google.com/bigquery/docs/write-api#data_type_conversions, to fix that error we corrected the data types of column4 and column5 to be int64 instead of uint64.
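With that change, the corrected debezium_record.proto looks like the sketch below (only column4 and column5 change; debezium_record_pb2.py then has to be regenerated with protoc before re-running the script):

syntax = "proto3";

// The message cannot contain fields which are not present in the table.
message DebeziumRecord {
  uint32 column1 = 1;
  string column2 = 2;
  uint32 column3 = 3;
  int64 column4 = 4;
  int64 column5 = 5;
  bool column6 = 6;
}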
There are still additional errors/issues with default_Stream.py that we are working through, but this was the answer to this question.