I would like to materialize a query from BigQuery with this function:
import datetime
from pathlib import Path

import pyarrow.parquet as pq


def events_ga4(self):
    EVENTS_TARGET = 'tests.events'
    # last_date = self.bq.get_last_push_date(EVENTS_TARGET, 'session_date')
    # if self.is_already_done_for_today(last_date):
    #     self.log(f'{EVENTS_TARGET} already done, nothing changed')
    #     return False
    self.log(f'start {EVENTS_TARGET}')
    # dates = get_data_ragne(last_date)
    dates = [(datetime.datetime.today() - datetime.timedelta(1)).date().strftime("%Y%m%d")]
    for date in dates:
        self.log(f'downloading {date}')
        query = f'''
            select user_pseudo_id, parse_date('%Y%m%d', event_date) session_date,
                event_timestamp, event_name, event_params, ecommerce.transaction_id
            FROM `ga4.events_{date}`
            where user_pseudo_id is not null
        '''
        self.log('query')
        data = self.bq.query(query)
        self.log('transforming to arrow')
        data_to_update = data.to_arrow()
        self.log('save to parquet file')
        Path('reports').mkdir(exist_ok=True)
        big_file = Path(f'reports/events_{date}.parquet')
        pq.write_table(data_to_update, big_file, compression=None)
        self.log('load')
        print(pq.read_schema(big_file))
        # self.get_table_schema('testy_ab_events.json')
        self.bq.load_from_parquet(EVENTS_TARGET, big_file, 'append')
        self.log('deleting big file')
        big_file.unlink()
        self.log(f'done {EVENTS_TARGET} for {date}')
Here is the function that loads the data into BQ:
def load_from_parquet(self, table_id: str, path_to_csv: Path, mode: str, schema: dict = None) -> str:
    with open(path_to_csv, 'rb') as f:
        return f"{self._load_to_bq(table_id=table_id, file=f, mode=mode, file_type='parquet', schema=schema)}, many rows loaded."

def _load_to_bq(self, table_id, file, mode, file_type='nd json', schema=None) -> str:
    # job config
    # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload
    source_formats = {
        'nd json': bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        'csv': bigquery.SourceFormat.CSV,
        'parquet': bigquery.SourceFormat.PARQUET
    }
    job_config = bigquery.LoadJobConfig(
        source_format=source_formats[file_type]
    )
    if schema:
        job_config.schema = self.format_schema(schema)
    else:
        job_config.autodetect = True
    if mode == 'append':
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    elif mode == 'truncate':
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    elif mode == 'empty':
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
    else:
        raise ValueError(f'{mode} is not a valid mode. Choose append, truncate or empty')
    # ... (the call that actually submits the load job is omitted from this snippet)
Here is the JSON schema:
[
  {
    "name": "event_params",
    "type": "RECORD",
    "mode": "REPEATED",
    "fields": [
      {
        "name": "key",
        "type": "STRING",
        "mode": "NULLABLE"
      },
      {
        "name": "value",
        "type": "RECORD",
        "mode": "NULLABLE",
        "fields": [
          {
            "name": "string_value",
            "type": "STRING",
            "mode": "NULLABLE"
          },
          {
            "name": "int_value",
            "type": "INTEGER",
            "mode": "NULLABLE"
          },
          {
            "name": "float_value",
            "type": "FLOAT",
            "mode": "NULLABLE"
          },
          {
            "name": "double_value",
            "type": "FLOAT",
            "mode": "NULLABLE"
          }
        ]
      }
    ]
  }
]
and I'm still getting this error:
google.api_core.exceptions.BadRequest: 400 400 Provided Schema does not match Table analityka-269913:testy_ab.events. Cannot add fields (field: event_params.list)
errors[]:
{'reason': 'invalid', 'message': 'Provided Schema does not match Table analityka-269913:testy_ab.events. Cannot add fields (field: event_params.list)'}
I tried to read the Arrow table back with the Python function read_from_parquet, but it took too much RAM. I would like to get this Parquet approach to work rather than fall back to ND JSON :/
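(Side note: to at least inspect the file without blowing up RAM, I can stream it in record batches with pyarrow instead of reading the whole table, roughly like the sketch below with a placeholder path, but that does not solve the load-job schema problem.)

import pyarrow.parquet as pq

# read the Parquet file in chunks instead of one big Arrow table
pf = pq.ParquetFile('reports/events_YYYYMMDD.parquet')  # placeholder path
for batch in pf.iter_batches(batch_size=100_000):
    print(batch.num_rows, batch.schema.names)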
It looks like this error is being caused because the target table's schema reflects what one would expect when list inference is enabled, but this flag is not being set in the load job.
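If that is the case, you should be able to keep the Parquet path and just turn list inference on for the load job. Here is a minimal sketch of what that could look like in _load_to_bq when file_type is 'parquet', using bigquery.ParquetOptions from google-cloud-bigquery; the file and table names below are placeholders, reuse whatever events_ga4 passes in:

from google.cloud import bigquery

# Enable list inference so pyarrow's nested list layout
# (event_params.list.element) is loaded as a plain REPEATED RECORD,
# matching the existing table schema.
parquet_options = bigquery.ParquetOptions()
parquet_options.enable_list_inference = True

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
job_config.parquet_options = parquet_options

client = bigquery.Client()
with open('reports/events_YYYYMMDD.parquet', 'rb') as f:  # placeholder path
    job = client.load_table_from_file(f, 'tests.events', job_config=job_config)
job.result()

In your code that would mean setting job_config.parquet_options inside _load_to_bq whenever the source format is Parquet.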