Tags: python, google-cloud-platform, google-cloud-dataflow, apache-beam

Dataflow - add JSON file to BigQuery


I'm doing a POC with GCP Dataflow, adding some JSON objects to BigQuery.

import apache_beam as beam
import apache_beam.io.gcp.bigquery as b_query

p1 = beam.Pipeline()
trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

freq = (
        p1
        | beam.Create(["{\"vendor_id\":33,\"trip_id\": 1000474,\"trip_distance\": 2.3999996185302734,\"fare_amount\": 42.13,\"store_and_fwd_flag\": \"Y\"}"])
        | beam.Map(print)
        | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(table='trips2', dataset='my_poc',
                                                                custom_gcs_temp_location='gs://XXXX-stage'
                                                                                         '-xxxx/temp',
                                                                schema=trips_schema, project='xxxxxxxx-xxx-2',
                                                                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, )
)
p1.run()

Now when I run this code, I get the following error:

RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_957_6db0f0222c18bf6ef55dfb301cf9b7b2_2e2519daf47e48888bd08fc7661da2e6 failed. Error Result: <ErrorProto
 location: 'gs://xxxx-stage-xxx/temp/bq_load/4b49a738d79b47ba8b188f04d10bf8f0/xxxx-dev-x.my_poc.trips2/258cab64-9d98-4fbc-8c16-087bdd0ea93c'
 message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://xxxx-stage-xxx/temp/bq_load/4b49a738d79b47ba8b188f04d10bf8f0/xxxxx-dev-x.my_poc.trips2/258cab64-9d98-4fbc-8c16-087bdd0ea93c'
 reason: 'invalid'> [while running 'Write Record to BigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/ParDo(TriggerLoadJobs)']

The file in the staging bucket referred to in the error contains null.

Please help.


Solution

  • If you want to insert a JSON file into a BigQuery table with Beam and Dataflow, you have to read the JSON file in NEWLINE_DELIMITED_JSON format (one complete JSON object per line), for example:

    Your code with a mocked input. Note that chaining beam.Map(print) replaces every element with None (print returns None), which is why the file staged for the load job contained null:

    import json
    from typing import Dict, List

    import apache_beam as beam
    import apache_beam.io.gcp.bigquery as b_query


    def map_to_dict(input: str) -> Dict:
        # Print the raw record for debugging, then parse it into a dict.
        # Do not chain beam.Map(print) before the BigQuery write: print
        # returns None, so the staged load file would only contain null.
        print(input)
        return json.loads(input)


    p1 = beam.Pipeline()
    trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

    inputs: List[str] = [
        '{"vendor_id": 33, "trip_id": 1000474, "trip_distance": 2.3999996185302734, "fare_amount": 42.13, "store_and_fwd_flag": "Y"}',
        '{"vendor_id": 34, "trip_id": 1000475, "trip_distance": 2.3999996185302734, "fare_amount": 42.13, "store_and_fwd_flag": "Y"}'
    ]

    freq = (
            p1
            | beam.Create(inputs)
            | beam.Map(map_to_dict)
            | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(
                table='trips2',
                dataset='my_poc',
                custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
                schema=trips_schema,
                project='xxxxxxxx-xxx-2',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )
    p1.run()
    

    Example with a JSON file read from GCS:

    input.json:

    {"vendor_id":33,"trip_id": 1000474,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}
    {"vendor_id":34,"trip_id": 1000475,"trip_distance": 2.3999996185302734,"fare_amount": 42.13,"store_and_fwd_flag": "Y"}
    
    import json
    from typing import Dict

    import apache_beam as beam
    import apache_beam.io.gcp.bigquery as b_query
    from apache_beam.io import ReadFromText


    def map_to_dict(input: str) -> Dict:
        # Each element is one line of the NEWLINE_DELIMITED_JSON file.
        print(input)
        return json.loads(input)


    p1 = beam.Pipeline()
    trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

    freq = (
            p1
            | ReadFromText("gs://your-bucket/object/input.json")
            | beam.Map(map_to_dict)
            | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(
                table='trips2',
                dataset='my_poc',
                custom_gcs_temp_location='gs://XXXX-stage-xxxx/temp',
                schema=trips_schema,
                project='xxxxxxxx-xxx-2',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )
    p1.run()
    
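    Both snippets above run locally with the DirectRunner by default. If you want to run them on Dataflow, a minimal sketch is to pass standard pipeline options when building the pipeline; the project, region and bucket values below are placeholders to replace with your own:

    from apache_beam.options.pipeline_options import PipelineOptions

    # Placeholder values: use your own project, region and staging bucket.
    options = PipelineOptions(
        runner='DataflowRunner',
        project='xxxxxxxx-xxx-2',
        region='us-central1',
        temp_location='gs://XXXX-stage-xxxx/temp',
    )

    p1 = beam.Pipeline(options=options)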

    The following conditions need to be respected:

    • You need to transform each string element into a Dict (for example with json.loads) before writing the PCollection to BigQuery.
    • The JSON objects must match the schema of the BigQuery table exactly. You can ignore extra fields with the ignore_unknown_columns parameter of the WriteToBigQuery output connector, as shown in the sketch below.
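    As an illustration of that last point, here is a minimal sketch. The extra surcharge field is made up for the example, and the streaming inserts method is selected because, as far as I know, ignore_unknown_columns only takes effect for method=STREAMING_INSERTS:

    import apache_beam as beam
    import apache_beam.io.gcp.bigquery as b_query

    trips_schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'

    # "surcharge" is not part of the table schema; with ignore_unknown_columns=True
    # it is dropped instead of making the insert fail.
    record = {"vendor_id": 33, "trip_id": 1000474, "trip_distance": 2.4,
              "fare_amount": "42.13", "store_and_fwd_flag": "Y", "surcharge": 0.5}

    with beam.Pipeline() as p:
        (
            p
            | beam.Create([record])
            | 'Write Record to BigQuery' >> b_query.WriteToBigQuery(
                table='trips2',
                dataset='my_poc',
                project='xxxxxxxx-xxx-2',
                schema=trips_schema,
                method=b_query.WriteToBigQuery.Method.STREAMING_INSERTS,
                ignore_unknown_columns=True,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        )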