I'm doing a POC with GCP Dataflow to add some JSON objects to BigQuery.
import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query

p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

freq = (
    p1
    | beam.Create(['{"vendor_id": 33, "trip_id": 1000474, "trip_distance": 2.3999996185302734, "fare_amount": 42.13, "store_and_fwd_flag": "Y"}'])
    | beam.Map(print)
    | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(
        table='trips2',
        dataset='my_poc',
        custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
        schema=trips_schema,
        project='xxxxxxxx-xxx-2',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    )
)
p1.run()
Now when I run this code, I get the following error:
RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_957_6db0f0222c18bf6ef55dfb301cf9b7b2_2e2519daf47e48888bd08fc7661da2e6 failed. Error Result: <ErrorProto
location: 'gs://xxxx-stage-xxx/temp/bq_load/4b49a738d79b47ba8b188f04d10bf8f0/xxxx-dev-x.my_poc.trips2/258cab64-9d98-4fbc-8c16-087bdd0ea93c'
message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://xxxx-stage-xxx/temp/bq_load/4b49a738d79b47ba8b188f04d10bf8f0/xxxxx-dev-x.my_poc.trips2/258cab64-9d98-4fbc-8c16-087bdd0ea93c'
reason: 'invalid'> [while running 'Write Record to BigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/ParDo(TriggerLoadJobs)']
The file in the staging bucket referenced in the error contains null.
Please help.
If you want to insert JSON into a BigQuery table with Beam and Dataflow, the input must be in NEWLINE_DELIMITED_JSON format and each line has to be parsed into a Python dict before it reaches WriteToBigQuery: the connector expects dicts matching the table schema, not raw JSON strings. Also, beam.Map(print) replaces every element with None (print returns None), which is why the file in your staging bucket contains null; the print step must pass the element through.

Your code with a mock as input:
import json
from typing import Dict, List

import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query


def map_to_dict(input: str) -> Dict:
    # Parse one NEWLINE_DELIMITED_JSON line into the dict expected by WriteToBigQuery.
    return json.loads(input)


p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

inputs: List[str] = [
    '{"vendor_id": 33, "trip_id": 1000474, "trip_distance": 2.3999996185302734, "fare_amount": 42.13, "store_and_fwd_flag": "Y"}',
    '{"vendor_id": 34, "trip_id": 1000475, "trip_distance": 2.3999996185302734, "fare_amount": 42.13, "store_and_fwd_flag": "Y"}'
]

freq = (
    p1
    | beam.Create(inputs)
    | beam.Map(lambda line: print(line) or line)  # debug: print each line and pass it through
    | beam.Map(map_to_dict)
    | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(
        table='trips2',
        dataset='my_poc',
        custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
        schema=trips_schema,
        project='xxxxxxxx-xxx-2',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    )
)
p1.run()
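As written, beam.Pipeline() runs with the local DirectRunner; to submit the same pipeline to Dataflow you can pass standard pipeline options instead. A minimal sketch, where project, region and temp_location are placeholders to adapt to your setup:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values: project, region and temp_location must match your environment.
options = PipelineOptions(
    runner='DataflowRunner',
    project='xxxxxxxx-xxx-2',
    region='us-central1',
    temp_location='gs://XXXX-stage-xxxx/temp',
)
p1 = beam.Pipeline(options=options)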
Example with a JSON file read from GCS:

input.json:
{"vendor_id":33,"trip_id": 1000474,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}
{"vendor_id":34,"trip_id": 1000475,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}
import json
from typing import Dict

import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query
from apache_beam.io import ReadFromText


def map_to_dict(input: str) -> Dict:
    # Parse one NEWLINE_DELIMITED_JSON line into the dict expected by WriteToBigQuery.
    return json.loads(input)


p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

freq = (
    p1
    | ReadFromText("gs://your-bucket/object/input.json")
    | beam.Map(lambda line: print(line) or line)  # debug: print each line and pass it through
    | beam.Map(map_to_dict)
    | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(
        table='trips2',
        dataset='my_poc',
        custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
        schema=trips_schema,
        project='xxxxxxxx-xxx-2',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    )
)
p1.run()
The following condition needs to be respected: the dicts in the PCollection written to BigQuery must match exactly the schema of the BigQuery table. You can ignore extra fields with the ignore_unknown_columns parameter of the WriteToBigQuery output connector.
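A minimal sketch of that option, reusing b_query, trips_schema and beam from the examples above (same placeholder project, dataset and bucket); note that depending on your Beam SDK version ignore_unknown_columns may only apply to some write methods, so check the WriteToBigQuery docs for your version:

sink = b_query.WriteToBigQuery(
    table='trips2',
    dataset='my_poc',
    project='xxxxxxxx-xxx-2',
    custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
    schema=trips_schema,
    ignore_unknown_columns=True,  # dict keys not present in the schema are dropped instead of failing the row
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
)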