In Python, I'm trying to write local JSON data to my BigQuery table with Apache Beam, but I keep getting this error:
/opt/homebrew/lib/python3.11/site-packages/apache_beam/io/gcp/bigquery.py:2028: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
is_streaming_pipeline = p.options.view_as(StandardOptions).streaming
2.48.0: Pulling from apache/beam_java11_sdk
Digest: sha256:ab9e4fb16e4a3b8090309ceed1f22c0d7de64ee9f27d688e4a35145cabbfa179
Status: Image is up to date for apache/beam_java11_sdk:2.48.0
docker.io/apache/beam_java11_sdk:2.48.0
WARNING: The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested
d8aaf6813f9c5df568e0f7c97947e802950a0ce598796bcb59660109baa51e9f
Traceback (most recent call last):
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1418, in process
return self.do_fn_invoker.invoke_process(windowed_value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 624, in invoke_process
self.output_handler.handle_process_outputs(
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1582, in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1695, in _write_value_to_tag
self.main_receivers.receive(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/operations.py", line 239, in receive
self.update_counters_start(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/operations.py", line 198, in update_counters_start
self.opcounter.update_from(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/opcounters.py", line 213, in update_from
self.do_sample(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/opcounters.py", line 265, in do_sample
self.coder_impl.get_estimated_size_and_observables(windowed_value))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 1506, in get_estimated_size_and_observables
self._value_coder.get_estimated_size_and_observables(
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 209, in get_estimated_size_and_observables
return self.estimate_size(value, nested), []
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 1584, in estimate_size
value_size = self._value_coder.estimate_size(value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 248, in estimate_size
self.encode_to_stream(value, out, nested)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 1769, in encode_to_stream
component_coder.encode_to_stream(attr, out, True)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 1170, in encode_to_stream
self._elem_coder.encode_to_stream(elem, out, True)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 1769, in encode_to_stream
component_coder.encode_to_stream(attr, out, True)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 270, in encode_to_stream
return stream.write(self._encoder(value), nested)
^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 429, in encode
return value.encode('utf-8')
^^^^^^^^^^^^
AttributeError: 'int' object has no attribute 'encode'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/myaccount/projects/myproj/Outdoor Elements/filter.py", line 275, in <module>
main()
File "/Users/myaccount/projects/myproj/Outdoor Elements/filter.py", line 266, in main
beam_to_DB(output_json, "myproj-324103:viewable_datasets." + item, "/Users/myaccount/projects/myproj/Outdoor Elements/schema.json")
File "/Users/myaccount/projects/myproj/Outdoor Elements/filter.py", line 58, in beam_to_DB
pipeline.run().wait_until_finish()
^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/pipeline.py", line 577, in run
return self.runner.run_pipeline(self, self._options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/direct/direct_runner.py", line 129, in run_pipeline
return runner.run_pipeline(pipeline, options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api
return self.run_stages(stage_context, stages)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 455, in run_stages
bundle_results = self._execute_bundle(
^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 783, in _execute_bundle
self._run_bundle(
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1012, in _run_bundle
result, splits = bundle_manager.process_bundle(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1348, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
response = self.worker.do_instruction(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 667, in process_bundle
bundle_processor.process_bundle(instruction_id))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1061, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 231, in process_encoded
self.output(decoded_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/operations.py", line 528, in output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/operations.py", line 240, in receive
self.consumer.process(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/operations.py", line 908, in process
delayed_applications = self.dofn_runner.process(o)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1420, in process
self._reraise_augmented(exn)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1492, in _reraise_augmented
raise exn
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1418, in process
return self.do_fn_invoker.invoke_process(windowed_value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 624, in invoke_process
self.output_handler.handle_process_outputs(
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1582, in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1695, in _write_value_to_tag
self.main_receivers.receive(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/operations.py", line 240, in receive
self.consumer.process(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/operations.py", line 908, in process
delayed_applications = self.dofn_runner.process(o)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1420, in process
self._reraise_augmented(exn)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1492, in _reraise_augmented
raise exn
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1418, in process
return self.do_fn_invoker.invoke_process(windowed_value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 624, in invoke_process
self.output_handler.handle_process_outputs(
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1582, in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1695, in _write_value_to_tag
self.main_receivers.receive(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/operations.py", line 240, in receive
self.consumer.process(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/operations.py", line 908, in process
delayed_applications = self.dofn_runner.process(o)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1420, in process
self._reraise_augmented(exn)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1508, in _reraise_augmented
raise new_exn.with_traceback(tb)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1418, in process
return self.do_fn_invoker.invoke_process(windowed_value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 624, in invoke_process
self.output_handler.handle_process_outputs(
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1582, in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/common.py", line 1695, in _write_value_to_tag
self.main_receivers.receive(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/operations.py", line 239, in receive
self.update_counters_start(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/operations.py", line 198, in update_counters_start
self.opcounter.update_from(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/opcounters.py", line 213, in update_from
self.do_sample(windowed_value)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/runners/worker/opcounters.py", line 265, in do_sample
self.coder_impl.get_estimated_size_and_observables(windowed_value))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 1506, in get_estimated_size_and_observables
self._value_coder.get_estimated_size_and_observables(
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 209, in get_estimated_size_and_observables
return self.estimate_size(value, nested), []
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 1584, in estimate_size
value_size = self._value_coder.estimate_size(value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 248, in estimate_size
self.encode_to_stream(value, out, nested)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 1769, in encode_to_stream
component_coder.encode_to_stream(attr, out, True)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 1170, in encode_to_stream
self._elem_coder.encode_to_stream(elem, out, True)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 1769, in encode_to_stream
component_coder.encode_to_stream(attr, out, True)
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coder_impl.py", line 270, in encode_to_stream
return stream.write(self._encoder(value), nested)
^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 429, in encode
return value.encode('utf-8')
^^^^^^^^^^^^
AttributeError: 'int' object has no attribute 'encode' [while running 'WriteToBigQuery/Map(<lambda at bigquery.py:2157>)']
File "/opt/homebrew/lib/python3.11/site-packages/apache_beam/coders/coders.py", line 429, in encode
return value.encode('utf-8')
AttributeError: 'int' object has no attribute 'encode' [while running 'WriteToBigQuery/Map(<lambda at bigquery.py:2157>)']
My code:
import os
import glob
import json
import geopandas as gpd
import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
# Write `data` to the BigQuery table `db_table` through an Apache Beam pipeline.
# `schema` is either an already-loaded schema object or a string, in which case
# it is treated as the path of a JSON file and replaced by that file's parsed content.
# The function blocks until the pipeline has finished.
def beam_to_DB(data, db_table, schema):
# A string schema is a file path: load the JSON it points to.
if isinstance(schema, str):
with open(schema, 'r') as file:
schema = json.load(file)
# Create a pipeline.
pipeline = beam.Pipeline()
# NOTE(review): `[data]` makes the whole `data` object ONE PCollection element.
# If `data` is already a list of row dicts, this presumably should be
# `beam.Create(data)` so that each row becomes its own element - confirm.
pcollection = pipeline | beam.Create([data])
# Write data to BigQuery.
# NOTE(review): the loaded schema is wrapped as {"fields": schema}; this assumes
# schema.json holds a bare list of field definitions rather than an object that
# already has a top-level "fields" key - verify against the file.
# NOTE(review): 'BATCH_INSERT' is not one of the documented WriteToBigQuery.Method
# values (DEFAULT, STREAMING_INSERTS, FILE_LOADS, STORAGE_WRITE_API) - confirm
# which write method is actually intended.
# NOTE(review): the reported "'int' object has no attribute 'encode'" comes from a
# string coder receiving an int, so presumably a column declared STRING in the
# schema holds an integer value in `data` - check the data against the schema.
pcollection | beam.io.WriteToBigQuery(
db_table,
schema={"fields": schema},
method='BATCH_INSERT',
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
# Run the pipeline.
pipeline.run().wait_until_finish()
How can I determine what in my code is causing this error?
You can use logging to find errors in your code. By logging the elements as they pass through the pipeline, you can see which part of your code generates the error. Consider the following example.
test.json
{
"fields": [{
"name": "name", "type": "STRING", "mode": "NULLABLE"
}, {
"name": "age", "type": "INTEGER", "mode": "NULLABLE"
}]
}
test.py
import json
import logging

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions

# The root logger defaults to WARNING, which would hide the per-element
# logging.info() messages this example relies on.
logging.getLogger().setLevel(logging.INFO)

db_table = 'dataset.table'
data = [{'name': 'a', 'age': 46}, {'name': 'c', 'age': 24}]
table_schema = './test.json'

# With no explicit flags, PipelineOptions reads them from the command line
# (e.g. --project, --temp_location).
beam_options = PipelineOptions()


class logs(beam.DoFn):
    """Log every element that flows through the PCollection.

    Used as a side branch for debugging only; it emits nothing downstream.
    """

    def process(self, element):
        logging.info("Data is %s ", element)


def beam_to_DB(data, db_table, schemas):
    """Log each row of ``data`` and write the rows to a BigQuery table.

    Args:
        data: Iterable of row dictionaries; each item becomes one element
            of the PCollection.
        db_table: Destination table, passed as ``table`` to WriteToBigQuery.
        schemas: Table schema, or a string path to a JSON file containing
            it (the file content is loaded and used as the schema).

    Raises:
        Exception: Any error raised while the pipeline runs is logged with
            its traceback and then re-raised.
    """
    if isinstance(schemas, str):
        with open(schemas, 'r', encoding='utf-8') as file:
            schemas = json.load(file)

    pipeline = beam.Pipeline(options=beam_options)
    pcollection = pipeline | 'Pcollection create' >> beam.Create(data)
    # Side branch: print every element so a bad record can be spotted in
    # the logs right before the write fails.
    pcollection | 'logging_pcollection' >> beam.ParDo(logs())

    # This try only guards pipeline *construction* (argument validation).
    # NOTE(review): 'BATCH_INSERT' is not one of the documented
    # WriteToBigQuery.Method values (DEFAULT, STREAMING_INSERTS, FILE_LOADS,
    # STORAGE_WRITE_API) - confirm the intended write method.
    try:
        pcollection | 'Write to BigQuery' >> WriteToBigQuery(
            table=db_table,
            schema=schemas,
            method='BATCH_INSERT',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        )
    except Exception as e:
        logging.error(f"Exception in Write to BigQuery - {str(e)}")

    # Transforms execute only when the pipeline runs, so data/schema
    # mismatches surface here rather than in the block above. Log the full
    # traceback, then let the failure propagate as it did before.
    try:
        pipeline.run().wait_until_finish()
    except Exception:
        logging.exception("Exception while running the pipeline")
        raise


beam_to_DB(data, db_table, table_schema)
Graph view:
It sounds like the error you're getting is coming from either the JSON or the data you're passing. In the example above, I have provided the JSON and the data that I used to execute the code successfully, and I have also included the logging function. I hope this answer helps.