python-3.x · google-bigquery · etl · google-cloud-dataflow · apache-beam

Dataflow (Apache Beam) can't write to BigQuery


I have a pipeline whose final step writes two records to BigQuery, and I can't figure out why nothing seems to get inserted. There are no errors, the table exists, and it already contains records, which is why I use the TRUNCATE/INSERT write disposition.

Can someone please help me figure out why it isn't working as I expected?

This is my pipeline:

p = beam.Pipeline(options=pipeline_options)

(p
    | 'Read Configuration Table ' >> beam.io.Read(beam.io.BigQuerySource(config['ENVIRONMENT']['configuration_table']))
    | 'Get Files from Server' >> beam.Map(import_file)
    | 'Upload files on Bucket' >> beam.Map(upload_file_on_bucket)
    | 'Set record update' >> beam.Map(set_last_step)
    | 'Update table' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table=config['ENVIRONMENT']['configuration_table'],
            write_disposition='WRITE_TRUNCATE',
            schema='folder:STRING, last_file:STRING')
)

with

def set_last_step(file_list):
    logging.info(msg='UPDATE CONFIGURATION TABLE - working on: ' + str(file_list))
    folder = ''

    if 'original' in file_list:
        if '1951' in file_list:
            folder = '1951'
        else:
            folder = '1952'
        dic = {'folder': folder, 'last_file': file_list['original']}
        logging.info(msg='UPDATE CONFIGURATION TABLE - no work done, reporting original record: ' + str(dic))
    else:
        folder = list(file_list.keys())[0]
        path = list(file_list.values())[0]
        dic = {'folder': folder, 'last_file': path}
        logging.info(msg='UPDATE CONFIGURATION TABLE - work done, reporting new record: ' + str(dic))

    purge(dir=os.path.join(HOME_PATH, 'download'), pattern=folder+"_")

    logging.info(msg='UPDATE CONFIGURATION TABLE - record to be updated: ' + str(dic))

    return dic
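The pipeline is then launched with the usual run-and-wait pattern (not shown in the snippets above), roughly:

result = p.run()            # submit the job to the runner configured in pipeline_options
result.wait_until_finish()  # block until the Dataflow job finishes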

The records that enter the WriteToBigQuery 'Update table' stage (i.e. the output of 'Set record update') are:

{'folder': '1952', 'last_file': '1952_2019120617.log.gz'}
{'folder': '1951', 'last_file': '1951_2019120617.log.gz'}

The debug output from Dataflow is:

2019-12-06 18:09:36 DEBUG    Creating or getting table <TableReference
 datasetId: 'MYDATASET'
 projectId: 'MYPROJECT'
 tableId: 'MYTABLE'> with schema {'fields': [{'name': 'folder', 'type': 'STRING', 'mode': 'NULLABLE'}, {'name': 'last_file', 'type': 'STRING', 'mode': 'NULLABLE'}]}.
2019-12-06 18:09:36 DEBUG    Created the table with id MYTABLE
2019-12-06 18:09:36 INFO     Created table MYPROJECT.MYDATASET.MYTABLE with schema <TableSchema
 fields: [<TableFieldSchema
 fields: []
 mode: 'NULLABLE'
 name: 'folder'
 type: 'STRING'>, <TableFieldSchema
 fields: []
 mode: 'NULLABLE'
 name: 'last_file'
 type: 'STRING'>]>. Result: <Table
 creationTime: 1575652176727
 etag: '0/GXOOeXPCmYsMfgGNxl2Q=='
 id: 'MYPROJECT:MYDATASET.MYTABLE'
 kind: 'bigquery#table'
 lastModifiedTime: 1575652176766
 location: 'EU'
 numBytes: 0
 numLongTermBytes: 0
 numRows: 0
 schema: <TableSchema
 fields: [<TableFieldSchema
 fields: []
 mode: 'NULLABLE'
 name: 'folder'
 type: 'STRING'>, <TableFieldSchema
 fields: []
 mode: 'NULLABLE'
 name: 'last_file'
 type: 'STRING'>]>
 selfLink: 'https://www.googleapis.com/bigquery/v2/projects/MYPROJECT/datasets/MYDATASET/tables/MYTABLE'
 tableReference: <TableReference
 datasetId: 'MYDATASET'
 projectId: 'MYPROJECT'
 tableId: 'MYTABLE'>
 type: 'TABLE'>.
2019-12-06 18:09:36 WARNING  Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.
2019-12-06 18:12:06 DEBUG    Attempting to flush to all destinations. Total buffered: 2
2019-12-06 18:12:06 DEBUG    Flushing data to MYPROJECT:MYDATASET.MYTABLE. Total 2 rows.
2019-12-06 18:12:07 DEBUG    Passed: True. Errors are []

Solution

  • The problem is that Dataflow writes to BigQuery using streaming inserts: the rows are not committed to the table immediately, but only after a buffering window, so you just have to wait a few minutes before they show up.
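  • While waiting, a quick way to confirm that the rows actually reached the table is to inspect its streaming buffer with the google-cloud-bigquery client. This is only a sanity check, and the project/dataset/table names below stand in for the redacted ones in the log:

from google.cloud import bigquery

client = bigquery.Client(project='MYPROJECT')
table = client.get_table('MYPROJECT.MYDATASET.MYTABLE')

# num_rows counts only committed rows; rows still sitting in the streaming buffer are excluded
print('committed rows:', table.num_rows)
# streaming_buffer is a StreamingBuffer object while rows are buffered, None once they are flushed
print('streaming buffer:', table.streaming_buffer)

  • If the delay itself is a problem, newer Beam SDK versions let WriteToBigQuery use batch load jobs instead of streaming inserts via the method parameter; with load jobs the rows are visible as soon as the job commits. A sketch of the changed write step, with everything else staying as in the original pipeline:

| 'Update table' >> beam.io.gcp.bigquery.WriteToBigQuery(
        table=config['ENVIRONMENT']['configuration_table'],
        write_disposition='WRITE_TRUNCATE',
        schema='folder:STRING, last_file:STRING',
        # switch from streaming inserts to BigQuery load jobs
        method=beam.io.WriteToBigQuery.Method.FILE_LOADS)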