Tags: python, google-bigquery, parquet

Parquet file columns do not map correctly to the table schema


I would like to materialize a query from BigQuery with this function:

import datetime
from pathlib import Path

import pyarrow.parquet as pq

def events_ga4(self):
        EVENTS_TARGET = 'tests.events'
        # last_date = self.bq.get_last_push_date(EVENTS_TARGET, 'session_date')
        # if self.is_already_done_for_today(last_date):
        #     self.log(f'{EVENTS_TARGET} already done, nothing changed')
        #     return False
        self.log(f'start {EVENTS_TARGET}')
        # dates = get_data_ragne(last_date)
        dates = [(datetime.datetime.today() - datetime.timedelta(1)).date().strftime("%Y%m%d")]
        for date in dates:
            self.log(f'downloading {date}')
            query = f'''
                    select user_pseudo_id, parse_date('%Y%m%d', event_date) session_date,
                    event_timestamp, event_name, event_params, ecommerce.transaction_id
                    FROM `ga4.events_{date}`
                    where user_pseudo_id is not null
            '''
            self.log('query')
            data = self.bq.query(query)
            self.log('transforming to arrow')
            data_to_update = data.to_arrow()
            self.log('save to parquet file')
            Path('reports').mkdir(exist_ok=True)
            big_file = Path(f'reports/events_{date}.parquet')
            pq.write_table(data_to_update, big_file, compression=None)
            self.log('load')
            print(pq.read_schema(big_file))
            # self.get_table_schema('testy_ab_events.json')
            self.bq.load_from_parquet(EVENTS_TARGET, big_file, 'append')
            self.log('deleting big file')
            big_file.unlink()
            self.log(f'done {EVENTS_TARGET} for {date}')

Here is the function to load data to BQ:

from pathlib import Path

from google.cloud import bigquery

def load_from_parquet(self, table_id: str, path_to_file: Path, mode: str, schema: dict = None) -> str:
        with open(path_to_file, 'rb') as f:
            return f"{self._load_to_bq(table_id=table_id, file=f, mode=mode, file_type='parquet', schema=schema)}, many rows loaded."

    def _load_to_bq(self, table_id, file, mode, file_type='nd json', schema=None) -> str:
        # job config
        # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload
        source_formats = {
            'nd json': bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            'csv': bigquery.SourceFormat.CSV,
            'parquet': bigquery.SourceFormat.PARQUET
        }
        job_config = bigquery.LoadJobConfig(
            source_format=source_formats[file_type]
        )
        if schema:
            job_config.schema = self.format_schema(schema)
        else:
            job_config.autodetect = True
        if mode == 'append':
            job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
        elif mode == 'truncate':
            job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
        elif mode == 'empty':
            job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
        else:
            raise ValueError(f'{mode} is not a valid mode. Choose append, truncate or empty')
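
The snippet above is cut off before the load itself; presumably it ends with something like this sketch (a guess, assuming self.client is a google.cloud.bigquery.Client, since the actual ending is not shown in the question):

        load_job = self.client.load_table_from_file(file, table_id, job_config=job_config)
        load_job.result()  # block until the load job finishes
        return load_job.output_rows  # number of rows loaded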

Here is the JSON schema:

[{
        "name": "event_params",
        "type": "RECORD",
        "mode": "REPEATED",
        "fields": [{
                "name": "key",
                "type": "STRING",
                "mode": "NULLABLE"
            },
            {
                "name": "value",
                "type": "RECORD",
                "mode": "NULLABLE",
                "fields": [{
                        "name": "string_value",
                        "type": "STRING",
                        "mode": "NULLABLE"
                    },
                    {
                        "name": "int_value",
                        "type": "INTEGER",
                        "mode": "NULLABLE"
                    },
                    {
                        "name": "float_value",
                        "type": "FLOAT",
                        "mode": "NULLABLE"
                    },
                    {
                        "name": "double_value",
                        "type": "FLOAT",
                        "mode": "NULLABLE"
                    }
                ]
            }
        ]
    }
]
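
The format_schema helper referenced in _load_to_bq is not shown; a minimal sketch, assuming the JSON above follows BigQuery's REST field representation, could build the schema with SchemaField.from_api_repr, which recurses into nested RECORD fields:

from google.cloud import bigquery

def format_schema(self, schema: list) -> list:
    # from_api_repr handles nested "fields", so REPEATED RECORD columns
    # such as event_params are converted in a single pass
    return [bigquery.SchemaField.from_api_repr(field) for field in schema]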

and I'm still getting this error:

google.api_core.exceptions.BadRequest: 400 400 Provided Schema does not match Table analityka-269913:testy_ab.events. Cannot add fields (field: event_params.list)
errors[]:
{'reason': 'invalid', 'message': 'Provided Schema does not match Table analityka-269913:testy_ab.events. Cannot add fields (field: event_params.list)'}

I tried to read the Arrow table back with the read_from_parquet function in Python, but it took too much RAM. I would like to get this Parquet approach working rather than falling back to ND JSON :/
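
If the full read is only needed to inspect the file, pyarrow can stream it in batches instead of materializing the whole table at once; a sketch, with an arbitrary batch size and a hypothetical file name:

import pyarrow.parquet as pq

pf = pq.ParquetFile('reports/events_20230101.parquet')  # hypothetical path
print(pf.schema_arrow)  # the schema comes from the file footer, no row data is read
for batch in pf.iter_batches(batch_size=50_000):
    # each RecordBatch holds only batch_size rows at a time
    handle(batch)  # placeholder for whatever per-batch processing is needed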


Solution

  • It looks like the error is caused because the target table's schema reflects what one would expect when list inference is enabled, but this flag is not being set in the load job.
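  • Under that assumption, a likely fix is to enable list inference for the Parquet load via ParquetOptions (available in recent google-cloud-bigquery releases). A sketch of how the job config in _load_to_bq could be built:

    from google.cloud import bigquery

    parquet_options = bigquery.format_options.ParquetOptions()
    parquet_options.enable_list_inference = True  # treat Parquet LIST groups as REPEATED fields

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.PARQUET
    job_config.parquet_options = parquet_options
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND

    With list inference enabled, a repeated group such as event_params is loaded as a REPEATED RECORD rather than as a record wrapping a single "list" field, which is what the "Cannot add fields (field: event_params.list)" error refers to.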