Search code examples
google-bigquery, orchestration, luigi

output for append job in BigQuery using Luigi Orchestrator


I have a Bigquery task which only aims to append a daily temp table (Table-xxxx-xx-xx) to an existing table (PersistingTable).

I am not sure how to handle the output(self) method. Indeed, I cannot simply return PersistingTable as a luigi.contrib.bigquery.BigQueryTarget, since it already exists before the process starts. Has anyone run into this question before?


Solution

  • I could not find an answer anywhere else so I will give my solution even though this is a very old question.

    I created a new class that inherits from luigi.contrib.bigquery.BigQueryLoadTask

    class BigQueryLoadIncremental(luigi.contrib.bigquery.BigQueryLoadTask):
        """
        A BigQueryLoadTask that APPENDS to an already-existing table.

        Because the destination table exists before the task runs, its
        BigQueryTarget alone cannot signal completion. A second target -- a
        "write log" file on GCS, written only after a successful load job --
        serves as the completion marker.

        output() must return TWO targets:
            [0] luigi.contrib.bigquery.BigQueryTarget  (destination table)
            [1] luigi.contrib.gcs.GCSTarget            (GCS write-log marker)
        Everything else is left unchanged from BigQueryLoadTask.
        """
        # NOTE(review): the original docstring was indented deeper than the
        # methods, which is an IndentationError; indentation normalized here.

        def exists(self):
            """True once the GCS write-log marker has been written by run()."""
            # Fixed: the original called the *unbound* GCSClient.exists(path),
            # which passed the path string as `self` and could never work.
            # Delegate to the target, which carries its own configured client.
            return self.output()[1].exists()

        @property
        def write_disposition(self):
            """
            Set to WRITE_APPEND as this subclass only makes sense for this
            """
            return luigi.contrib.bigquery.WriteDisposition.WRITE_APPEND

        def run(self):
            """Submit the BigQuery load job, then write the GCS success marker."""
            output = self.output()[0]
            gcs_output = self.output()[1]
            assert isinstance(output,
                              luigi.contrib.bigquery.BigQueryTarget), 'Output[0] must be a BigQueryTarget, not %s' % (
            output)
            assert isinstance(gcs_output,
                              luigi.contrib.gcs.GCSTarget), 'Output[1] must be a Cloud Storage Target, not %s' % (
                gcs_output)

            bq_client = output.client

            # Load jobs can only read from Cloud Storage.
            source_uris = self.source_uris()
            assert all(x.startswith('gs://') for x in source_uris)

            load_config = {
                'destinationTable': {
                    'projectId': output.table.project_id,
                    'datasetId': output.table.dataset_id,
                    'tableId': output.table.table_id,
                },
                'encoding': self.encoding,
                'sourceFormat': self.source_format,
                'writeDisposition': self.write_disposition,
                'sourceUris': source_uris,
                'maxBadRecords': self.max_bad_records,
                'ignoreUnknownValues': self.ignore_unknown_values,
            }

            # CSV-only options.
            if self.source_format == luigi.contrib.bigquery.SourceFormat.CSV:
                load_config['fieldDelimiter'] = self.field_delimiter
                load_config['skipLeadingRows'] = self.skip_leading_rows
                load_config['allowJaggedRows'] = self.allow_jagged_rows
                load_config['allowQuotedNewlines'] = self.allow_quoted_new_lines

            if self.schema:
                load_config['schema'] = {'fields': self.schema}

            job = {
                'projectId': output.table.project_id,
                'configuration': {'load': load_config},
            }

            # Probe: write and immediately remove the GCS marker so a
            # permissions/path problem surfaces BEFORE the expensive load job.
            gcs_output.fs.put_string(
                'test write for task {} (this file should have been removed immediately)'.format(self.task_id),
                gcs_output.path)
            gcs_output.fs.remove(gcs_output.path)

            bq_client.run_job(output.table.project_id, job, dataset=output.table.dataset)

            # Success marker -- from now on exists()/complete() report done.
            gcs_output.fs.put_string(
                'success! The following BigQuery Job went through without errors: {}'.format(self.task_id), gcs_output.path)
    

    It uses a second output (which might violate Luigi's atomicity principle) on Google Cloud Storage. Example usage:

    class LeadsToBigQuery(BigQueryLoadIncremental):
        """Example usage: replace the ... placeholders with real values.

        output() must return the pair of targets BigQueryLoadIncremental
        expects: [0] the BigQueryTarget for the destination table and
        [1] the GCSTarget used as the write-log completion marker.
        """
        # NOTE(review): default=datetime.date.today() is evaluated once at
        # import time, not per run -- confirm this is the intended behavior.
        date = luigi.DateParameter(default=datetime.date.today())


        def output(self):
            # [0] destination table; [1] GCS write-log marker.
            # create_gcs_target is presumably a project helper returning a
            # luigi.contrib.gcs.GCSTarget -- verify against its definition.
            return luigi.contrib.bigquery.BigQueryTarget(project_id=...,
                                                         dataset_id=...,
                                                         table_id=...), \
                   create_gcs_target(...)