What is the best way to read a BigQuery table from Airflow and save the result as a dataframe? I have a Python module that reads the table, and running it locally fetches the table as expected:
def reading(ds, **kwargs):
    project_id = '{project-id}'
    credentials = service_account.Credentials.from_service_account_file('/usr/local/airflow/extras/credentials.json')
    bqclient = bq.Client(credentials=credentials, project=project_id)
    query_string = bqclient.query("""
        SELECT *
        FROM `{project-id}.{schema-name}.{table-name}`""")
    dataframe = (
        bqclient.query(query_string)
        .result()
        .to_dataframe(
            create_bqstorage_client=True,
        )
    )
    print(dataframe.head())
However, when I try to call the module from an Airflow DAG, it fails with this error:
_pickle.PicklingError: Pickling client objects is explicitly not supported. Clients have non-trivial state that is local and unpickleable.
Here is the full error stack:
*** Reading local file: /usr/local/airflow/logs/test_BQ_read/BQ_reading/2021-09-22T09:16:02.147166+00:00/1.log
[2021-09-22 09:16:05,401] {taskinstance.py:897} INFO - Dependencies all met for <TaskInstance: test_BQ_read.BQ_reading 2021-09-22T09:16:02.147166+00:00 [queued]>
[2021-09-22 09:16:05,434] {taskinstance.py:897} INFO - Dependencies all met for <TaskInstance: test_BQ_read.BQ_reading 2021-09-22T09:16:02.147166+00:00 [queued]>
[2021-09-22 09:16:05,434] {taskinstance.py:1088} INFO -
--------------------------------------------------------------------------------
[2021-09-22 09:16:05,434] {taskinstance.py:1089} INFO - Starting attempt 1 of 1
[2021-09-22 09:16:05,434] {taskinstance.py:1090} INFO -
--------------------------------------------------------------------------------
[2021-09-22 09:16:05,454] {taskinstance.py:1108} INFO - Executing <Task(PythonOperator): BQ_reading> on 2021-09-22T09:16:02.147166+00:00
[2021-09-22 09:16:05,469] {standard_task_runner.py:52} INFO - Started process 37080 to run task
[2021-09-22 09:16:05,492] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'test_BQ_read', 'BQ_reading', '2021-09-22T09:16:02.147166+00:00', '--job-id', '538', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/test_BQ_read.py', '--cfg-path', '/tmp/tmp3m9gbm14', '--error-file', '/tmp/tmps5fthy88']
[2021-09-22 09:16:05,494] {standard_task_runner.py:77} INFO - Job 538: Subtask BQ_reading
[2021-09-22 09:16:05,606] {logging_mixin.py:104} INFO - Running <TaskInstance: test_BQ_read.BQ_reading 2021-09-22T09:16:02.147166+00:00 [running]> on host ffce3e5ffe75
[2021-09-22 09:16:05,737] {taskinstance.py:1303} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_BQ_read
AIRFLOW_CTX_TASK_ID=BQ_reading
AIRFLOW_CTX_EXECUTION_DATE=2021-09-22T09:16:02.147166+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-09-22T09:16:02.147166+00:00
[2021-09-22 09:16:06,165] {taskinstance.py:1502} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1362, in _execute_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/test_BQ_read.py", line 43, in reading
result = bqclient.query(query_string)
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 3223, in query
query_job._begin(retry=retry, timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/query.py", line 1138, in _begin
super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/base.py", line 468, in _begin
data=self.to_api_repr(),
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/query.py", line 819, in to_api_repr
configuration = self._configuration.to_api_repr()
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/query.py", line 605, in to_api_repr
resource = copy.deepcopy(self._properties)
File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.7/copy.py", line 241, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.7/copy.py", line 241, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.7/copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.7/copy.py", line 281, in _reconstruct
state = deepcopy(state, memo)
File "/usr/local/lib/python3.7/copy.py", line 150, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.7/copy.py", line 241, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.7/copy.py", line 169, in deepcopy
rv = reductor(4)
File "/usr/local/lib/python3.7/site-packages/google/cloud/client.py", line 167, in __getstate__
"Clients have non-trivial state that is local and unpickleable.",
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
[2021-09-22 09:16:06,172] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=test_BQ_read, task_id=BQ_reading, execution_date=20210922T091602, start_date=20210922T091605, end_date=20210922T091606
[2021-09-22 09:16:06,368] {local_task_job.py:153} INFO - Task exited with return code 1
Any suggestions on how to read the table correctly?
This is how it worked for me after making changes to the Python module:
from google.cloud import bigquery
from google.oauth2 import service_account
import google.auth
from datetime import datetime, timedelta
from airflow import DAG
from airflow import models
from airflow.models import Variable
import pandas as pd
from airflow.operators.python_operator import PythonOperator
def reading(ds, **kwargs):
    credentials = service_account.Credentials.from_service_account_file('/usr/local/airflow/extras/credentials.json')
    project_id = '{project-id}'
    bqclient = bigquery.Client(credentials=credentials, project=project_id)
    # Download a table.
    table = bigquery.TableReference.from_string(
        "{project-id}.{schema_name}.{table_name}"
    )
    rows = bqclient.list_rows(
        table,
    )
    dataframe = rows.to_dataframe(
        create_bqstorage_client=True,
    )
    print(dataframe.head())
with models.DAG(
    'test_BQ_read',
    schedule_interval=None,
    start_date=datetime(2021, 9, 22),
    tags=["example"],
    catchup=False
) as dag:
    BQ_reading = PythonOperator(
        task_id='BQ_reading',
        python_callable=reading,
    )
The pickle error does not seem to come from pandas itself. In the original code, the first bqclient.query(...) call already returns a QueryJob, and passing that job object back into bqclient.query() makes the library deep-copy job properties that hold a reference to the client, which is explicitly unpicklable. Passing a plain SQL string (or using list_rows as above) avoids that.
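If you would rather keep the query-based approach instead of list_rows, here is a minimal sketch of what I think should work, assuming the same placeholder project/dataset/table and credentials path as above (the function name reading_via_query is just illustrative):

from google.cloud import bigquery
from google.oauth2 import service_account

def reading_via_query(ds, **kwargs):
    credentials = service_account.Credentials.from_service_account_file(
        '/usr/local/airflow/extras/credentials.json'
    )
    bqclient = bigquery.Client(credentials=credentials, project='{project-id}')

    # Pass the SQL string itself to query(); the returned QueryJob stays local
    # to this task, so nothing holding a client reference ever gets pickled.
    query_string = """
        SELECT *
        FROM `{project-id}.{schema-name}.{table-name}`
    """
    dataframe = (
        bqclient.query(query_string)
        .result()
        .to_dataframe(create_bqstorage_client=True)
    )
    print(dataframe.head())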