_pickle.PicklingError when trying to read a table from BigQuery and save it as a dataframe using Airflow

What is the best way to read a BigQuery table from Airflow and save the result as a dataframe? I have a Python module that reads the table, and it runs fine locally:

def reading(ds, **kwargs):
    project_id = '{project-id}'
    credentials = service_account.Credentials.from_service_account_file('/usr/local/airflow/extras/credentials.json')
    bqclient = bq.Client(credentials=credentials, project=project_id)
    query_string = bqclient.query("""
        SELECT *
        FROM   `{project-id}.{schema-name}.{table-name}`""")

    dataframe = (

However, when I call the module from an Airflow DAG, it fails with this error:

_pickle.PicklingError: Pickling client objects is explicitly not supported. Clients have non-trivial state that is local and unpickleable.

Here is the full error stack:

*** Reading local file: /usr/local/airflow/logs/test_BQ_read/BQ_reading/2021-09-22T09:16:02.147166+00:00/1.log
[2021-09-22 09:16:05,401] {} INFO - Dependencies all met for <TaskInstance: test_BQ_read.BQ_reading 2021-09-22T09:16:02.147166+00:00 [queued]>
[2021-09-22 09:16:05,434] {} INFO - Dependencies all met for <TaskInstance: test_BQ_read.BQ_reading 2021-09-22T09:16:02.147166+00:00 [queued]>
[2021-09-22 09:16:05,434] {} INFO - 
[2021-09-22 09:16:05,434] {} INFO - Starting attempt 1 of 1
[2021-09-22 09:16:05,434] {} INFO - 
[2021-09-22 09:16:05,454] {} INFO - Executing <Task(PythonOperator): BQ_reading> on 2021-09-22T09:16:02.147166+00:00
[2021-09-22 09:16:05,469] {} INFO - Started process 37080 to run task
[2021-09-22 09:16:05,492] {} INFO - Running: ['airflow', 'tasks', 'run', 'test_BQ_read', 'BQ_reading', '2021-09-22T09:16:02.147166+00:00', '--job-id', '538', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/', '--cfg-path', '/tmp/tmp3m9gbm14', '--error-file', '/tmp/tmps5fthy88']
[2021-09-22 09:16:05,494] {} INFO - Job 538: Subtask BQ_reading
[2021-09-22 09:16:05,606] {} INFO - Running <TaskInstance: test_BQ_read.BQ_reading 2021-09-22T09:16:02.147166+00:00 [running]> on host ffce3e5ffe75
[2021-09-22 09:16:05,737] {} INFO - Exporting the following env vars:
[2021-09-22 09:16:06,165] {} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/", line 1158, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/", line 1332, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/", line 1362, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/", line 150, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/", line 43, in reading
    result = bqclient.query(query_string)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/", line 3223, in query
    query_job._begin(retry=retry, timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/", line 1138, in _begin
    super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/", line 468, in _begin
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/", line 819, in to_api_repr
    configuration = self._configuration.to_api_repr()
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job/", line 605, in to_api_repr
    resource = copy.deepcopy(self._properties)
  File "/usr/local/lib/python3.7/", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.7/", line 241, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.7/", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.7/", line 241, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.7/", line 180, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.7/", line 281, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.7/", line 150, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.7/", line 241, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.7/", line 169, in deepcopy
    rv = reductor(4)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/", line 167, in __getstate__
    "Clients have non-trivial state that is local and unpickleable.",
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
[2021-09-22 09:16:06,172] {} INFO - Marking task as FAILED. dag_id=test_BQ_read, task_id=BQ_reading, execution_date=20210922T091602, start_date=20210922T091605, end_date=20210922T091606
[2021-09-22 09:16:06,368] {} INFO - Task exited with return code 1

Any suggestions on how to read the table correctly?
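
A note on the traceback above: its deepest frames show `copy.deepcopy` walking two nested dicts, then an object, then that object's state, and finally calling `__getstate__` on a client. That pattern is what one would expect if the value handed to `bqclient.query(...)` on line 43 of the DAG file was itself a job object holding a client reference (for example the return value of an earlier `bqclient.query(...)` call, as `query_string` is in the module above) rather than a SQL string. The following is a minimal sketch of that mechanism using stand-in classes only; `FakeClient`, `FakeJob`, and `try_deepcopy` are hypothetical names and are not part of the BigQuery library.

```python
import copy
import pickle


class FakeClient:
    """Stand-in for a client that refuses to be pickled (not the real BigQuery class)."""

    def __getstate__(self):
        raise pickle.PicklingError("Pickling client objects is explicitly not supported.")


class FakeJob:
    """Stand-in for a job object that keeps a reference to its client."""

    def __init__(self, client):
        self._client = client


def try_deepcopy(value):
    """Return the exception class name raised by copy.deepcopy, or None on success."""
    try:
        # Mimics a nested properties dict that stores the query value.
        copy.deepcopy({"query": {"query": value}})
    except pickle.PicklingError as exc:
        return type(exc).__name__
    return None


client = FakeClient()
sql_outcome = try_deepcopy("SELECT 1")       # a plain string copies without trouble
job_outcome = try_deepcopy(FakeJob(client))  # copying the job reaches the client and fails
print(sql_outcome, job_outcome)
```

If this reading is right, the error is unrelated to Airflow pickling the task: `deepcopy` falls back to the pickle protocol for arbitrary objects, so any deep copy that reaches a client raises the same exception.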


  • This is how I got it working, by changing the Python module:

    from google.cloud import bigquery
    from google.oauth2 import service_account
    import google.auth
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow import models
    from airflow.models import Variable
    import pandas as pd
    from airflow.operators.python_operator import PythonOperator
    def reading(ds, **kwargs):
        credentials = service_account.Credentials.from_service_account_file('/usr/local/airflow/extras/credentials.json')
        project_id = '{project-id}'
        bqclient = bigquery.Client(credentials=credentials, project=project_id)
        # Download a table.
        table = bigquery.TableReference.from_string(
        rows = bqclient.list_rows(
        dataframe = rows.to_dataframe(
    with models.DAG(
    ) as dag:
        BQ_reading = PythonOperator(

    The pickle error probably had something to do with how the original code combined pandas and BigQuery.
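
If a SQL query is needed rather than a whole-table download, a query-based variant may also work, provided the SQL is passed to `query()` as a plain string exactly once. The sketch below is an assumption-laden illustration, not a tested DAG: `run_query_to_dataframe` is a hypothetical helper, the credentials path and the `{project-id}` placeholders are copied from the question, and returning only the row count is a choice made here because Airflow pushes a task's return value to XCom.

```python
def run_query_to_dataframe(client, sql):
    """Run `sql` on `client` and return the result as a pandas DataFrame."""
    if not isinstance(sql, str):
        # Guard against passing a job object (what client.query returns)
        # back into client.query by mistake.
        raise TypeError(f"sql must be a str, got {type(sql).__name__}")
    query_job = client.query(sql)            # submits the query
    return query_job.result().to_dataframe() # waits for it, then downloads the rows


def reading(ds=None, **kwargs):
    """Airflow callable; the client is created inside the task, not at module level."""
    # Imported here so this sketch can be loaded without the Google libraries.
    from google.cloud import bigquery
    from google.oauth2 import service_account

    credentials = service_account.Credentials.from_service_account_file(
        "/usr/local/airflow/extras/credentials.json"
    )
    client = bigquery.Client(credentials=credentials, project="{project-id}")
    sql = "SELECT * FROM `{project-id}.{schema-name}.{table-name}`"
    dataframe = run_query_to_dataframe(client, sql)
    # Return something small; the dataframe itself stays inside the task.
    return len(dataframe)
```

The `isinstance` check is there only to turn the mistake described by the traceback into an immediate, readable `TypeError`; it can be dropped once the calling code is known to pass a string.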