I'm trying to pass a list of strings to the source_objects field of the GoogleCloudStorageToBigQueryOperator, but with the code below I get the following error:

string indices must be integers, not unicode
The thing I want to do: read the fileName entry from the dag_run conf in a first task, then pass it to the source_objects of the load task.

Things I thought of: pushing the file name as an XCom from a PythonOperator and pulling it when building the GoogleCloudStorageToBigQueryOperator.

Things I don't know: how to get the return value of the get_file_name task through an XCom at the DAG scope, and how to call the xcom_pull function at the DAG scope without having to provide a context. It seemed to me that task instances do not need to provide a context.

Also, it seems that some of the operator's fields use a feature called template_fields. What's the mechanism behind the template fields? Isn't it just for PythonOperator and BashOperator?

And a last one: why does PythonOperator not return a TaskInstance?
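For context, inside a task's python_callable I know I can pull an XCom through the provided context, something like this (just a sketch, with a hypothetical print_file_name callable and provide_context=True):

def print_file_name(ds, **kwargs):
    # kwargs['ti'] is the running TaskInstance; it can pull the
    # value returned by get_file_name as an XCom.
    print(kwargs['ti'].xcom_pull(task_ids='get_file_name'))

But I don't see how to do the equivalent at the DAG scope, where there is no context to get a task instance from.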
Here's my code:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:

    def get_file_name_from_conf(ds, **kwargs):
        # Read the file name from the dag_run conf.
        fileName = kwargs['dag_run'].conf['fileName']
        return [fileName]

    get_file_name = PythonOperator(
        task_id='get_file_name',
        provide_context=True,
        python_callable=get_file_name_from_conf)

    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='bq_load',
        bucket='src_bucket',
        #source_objects=['data.csv'],
        # This is where I try to pull the XCom; this line triggers the error.
        source_objects=get_file_name.xcom_pull(context='', task_ids='get_file_name'),
        destination_project_dataset_table='project:dataset.table',
        write_disposition='WRITE_EMPTY')

    bq_load.set_upstream(get_file_name)
I'm kind of new to Python and Airflow. I guess these kinds of things are supposed to be trivial, so I'm sure there's something I misunderstood here.
After many tests, I came up with this solution. Thanks to tobi6, whose comment pointed me in the right direction: I had to use the template_fields feature.

When I tried to return a list with a single string, I got concatenation errors: as far as I can tell, a templated field is rendered as a string, so my list came through as its string representation rather than a real list. So I had to return a single string in my XCom and surround the templated xcom_pull call with brackets to make the result a list.
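For illustration, the failing variant looked roughly like this (reconstructed, same task ids as below):

# In get_file_name_from_conf:
return [fileName]

# In the operator:
source_objects="{{ task_instance.xcom_pull(task_ids='get_file_name') }}",

The rendered template is a string, so source_objects received something like "['data.csv']" instead of an actual list.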
Here's the final code:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:

    def get_file_name_from_conf(ds, **kwargs):
        # Return a single string: the template below wraps it in a list.
        return kwargs['dag_run'].conf['fileName']

    get_file_name = PythonOperator(
        task_id='get_file_name',
        provide_context=True,
        python_callable=get_file_name_from_conf)

    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='bq_load',
        bucket='src_bucket',
        # source_objects is a templated field, so the xcom_pull below is
        # rendered at run time with the task's context.
        source_objects=["{{ task_instance.xcom_pull(task_ids='get_file_name') }}"],
        destination_project_dataset_table='project:dataset.table',
        write_disposition='WRITE_APPEND')

    bq_load.set_upstream(get_file_name)
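As for the template fields question: as far as I understand it now, each operator declares a template_fields attribute, and Airflow renders those attributes as Jinja templates with the task's context just before execute() runs. That's why xcom_pull works there without me providing a context myself, and it's not limited to PythonOperator and BashOperator: source_objects is simply listed in the template_fields of GoogleCloudStorageToBigQueryOperator. A minimal sketch of a custom operator using the same mechanism (the operator name and message field are made up for illustration):

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class MyEchoOperator(BaseOperator):
    # Attributes listed here are rendered as Jinja templates
    # before execute() is called.
    template_fields = ('message',)

    @apply_defaults
    def __init__(self, message, *args, **kwargs):
        super(MyEchoOperator, self).__init__(*args, **kwargs)
        self.message = message

    def execute(self, context):
        # By this point self.message has been rendered, e.g.
        # "{{ ds }}" has become the execution date string.
        self.log.info(self.message)

So MyEchoOperator(task_id='echo', message="{{ ds }}", dag=dag) would log the execution date at run time.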