I want to migrate data tables from an AWS database to BigQuery. I have a specific table named sampletable, which includes the columns id, user_id and log. Log is a JSON field that contains a dictionary consisting of keys and their respective values.
'reason': {
'id': 5,
'name': 'Sample name',
'contact': {
'number': 123,
'address': None
}
},
'subreason': {
'id': 80,
'name': 'Sample name',
'is_active': True,
'created_at': '2022-07-18T18:33:28.911Z',
'deleted_at': None,
'complaint_id': 5,
},
This is the function that loads the data from the table to BigQuery:
def load_data(table_id, data):
    """Append the rows of ``data`` to the BigQuery table ``table_id``.

    The load job declares three columns: ``id`` (INT64), ``user_id`` (INT64)
    and ``log`` (JSON), and uses the ``WRITE_APPEND`` write disposition.

    Args:
        table_id: Destination table identifier passed to the BigQuery client.
        data: Object handed to ``load_table_from_dataframe``; presumably a
            pandas DataFrame (its ``shape[0]`` is reported as the record
            count) -- confirm against the caller.

    On any exception the error is printed, the module-level ``conn`` is
    closed and the program exits via ``sys.exit()``.
    """
    print("load_data::Writing records to table", table_id)

    # Column layout declared for the destination table.
    table_schema = [
        bigquery.SchemaField("id", "INT64"),
        bigquery.SchemaField("user_id", "INT64"),
        bigquery.SchemaField("log", "JSON"),
    ]
    load_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",
        schema=table_schema,
    )

    try:
        started_at = time.time()
        load_job = client.load_table_from_dataframe(
            data, table_id, job_config=load_config
        )
        # Block until the load job has finished (raises if it failed).
        load_job.result()
        finished_at = time.time()
        row_count = data.shape[0]
        print(
            "load_data::Time taken for writing " + str(row_count) + " records: ",
            finished_at - started_at,
            "s",
        )
    except Exception as error:
        print("load_data::exception", error)
        print("load_data::Could not establish connection with Google BigQuery. Terminating program")
        # Release the source connection before terminating.
        conn.close()
        sys.exit()
However, an exception arises. The exception is that "exception cannot mix list and non-list, non-null values".
I tried changing the schema in this way:
# Nested RECORD schema mirroring the keys of the `log` object:
# log -> reason (with nested contact) and subreason.
schema=[
    bigquery.SchemaField("id", "INT64"),
    bigquery.SchemaField("user_id", "INT64"),
    bigquery.SchemaField(
        "log",
        "RECORD",
        fields=[
            bigquery.SchemaField(
                "reason",
                "RECORD",
                fields=[
                    bigquery.SchemaField("id", "INT64"),
                    bigquery.SchemaField("name", "STRING"),
                    bigquery.SchemaField(
                        "contact",
                        "RECORD",
                        fields=[
                            bigquery.SchemaField("number", "STRING"),
                            bigquery.SchemaField("address", "STRING"),
                        ],
                    ),
                ],
            ),
            bigquery.SchemaField(
                "subreason",
                "RECORD",
                fields=[
                    bigquery.SchemaField("id", "INT64"),
                    bigquery.SchemaField("name", "STRING"),
                    bigquery.SchemaField("is_active", "BOOLEAN"),
                    bigquery.SchemaField("created_at", "TIMESTAMP"),
                    bigquery.SchemaField("deleted_at", "TIMESTAMP"),
                    bigquery.SchemaField("complaint_id", "INT64"),
                ],
            ),
        ],
    ),
],
However, I get the exception "with type dict: was expecting tuple of (key, value) pair". Can anyone guide me on this issue, as I am new to migrating tables that contain JSON columns? What is the proper way to modify the schema so that it accepts the JSON columns for migration?
You can consider the approach below.
In this approach, you load the data into BigQuery as the JSON
data type. However, the JSON file needs a manual adjustment first, since BigQuery accepts newline-delimited JSON for data ingestion. See the sample updated JSON file below.
{"log":{"reason":{"contact":{"address": null,"number": 123},"id": 5,"name": "Sample name"},"subreason": {"complaint_id": 5,"created_at": "2022-07-18T18:33:28.911Z","deleted_at": "None","id": 80,"is_active": true,"name": "Sample name"}}}
Notice that I wrapped the JSON under a single key named "log"
and also collapsed it onto one line to satisfy the newline-delimited JSON format.
Below is the Python code I used to ingest the data:
# Placeholder destination table in "project.dataset.table" form; replace with your own.
table_id = "your-project.your-dataset.your-table"
# Placeholder path to the newline-delimited JSON file to load; replace with your own.
file_path = "/path/of/your_json_file.json"
def load_table_file(file_path, table_id):
    """Load a newline-delimited JSON file into a BigQuery table.

    The destination table is declared with a single ``log`` column of type
    JSON. ``write_disposition`` is not set, so the library default applies.

    Args:
        file_path: Path of the local newline-delimited JSON file to upload.
        table_id: Identifier of the destination table.

    Returns:
        The table object fetched with ``client.get_table`` after the load
        job has completed.
    """
    # [START bigquery_load_from_file]
    from google.cloud import bigquery

    bq_client = bigquery.Client()

    # Single JSON-typed column that holds each row's whole "log" object.
    json_schema = [
        bigquery.SchemaField("log", "JSON"),
    ]
    load_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        autodetect=True,
        schema=json_schema,
    )

    # The file is opened in binary mode and handed to the client for upload.
    with open(file_path, "rb") as ndjson_file:
        load_job = bq_client.load_table_from_file(
            ndjson_file, table_id, job_config=load_config
        )

    # Wait for the load job to complete.
    load_job.result()

    # Fetch the table (an API request) to report its size.
    loaded_table = bq_client.get_table(table_id)
    summary = "Loaded {} rows and {} columns to {}".format(
        loaded_table.num_rows, len(loaded_table.schema), table_id
    )
    print(summary)
    # [END bigquery_load_from_file]
    return loaded_table
# Run the load with the file path and table ID defined above.
load_table_file(file_path, table_id)
Output: