Airflow unexpected argument 'mounts'


I'm trying to set up an Airflow ETL pipeline that extracts images from a .bag file. I want to run the extraction inside Docker, so I'm using DockerOperator. The Docker image is pulled from a private GitLab registry, and the script I want to run is a Python script inside the container. The .bag file is on my external SSD, so I'm trying to mount that drive into the container. Is there something wrong with my code, or is this a different kind of problem?

Error:

[2021-09-16 10:39:17,010] {docker.py:246} INFO - Starting docker container from image registry.gitlab.com/url/of/gitlab:a24a3f05
[2021-09-16 10:39:17,010] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 343, in execute
    return self._run_image()
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 265, in _run_image
    return self._run_image_with_mounts(self.mounts, add_tmp_variable=False)
  File "/home/filip/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 287, in _run_image_with_mounts
    privileged=self.privileged,
  File "/usr/lib/python3/dist-packages/docker/api/container.py", line 607, in create_host_config
    return HostConfig(*args, **kwargs)
TypeError: __init__() got an unexpected keyword argument 'mounts'
[2021-09-16 10:39:17,014] {taskinstance.py:1512} INFO - Marking task as FAILED. dag_id=ETL-test, task_id=docker_extract, execution_date=20210916T083912, start_date=20210916T083915, end_date=20210916T083917
[2021-09-16 10:39:17,062] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-09-16 10:39:17,085] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check

This is my code:

from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from airflow.operators.dummy import DummyOperator
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount
from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator is the deprecated Airflow 1.x path

# Bind-mount the external SSD from the host into the container.
ssd_dir = Mount(source='/media/filip/external-ssd', target='/external-ssd', type='bind')

dag = DAG(
    'ETL-test',
    # description is a DAG argument, not an operator default.
    description='Extract data from bag, simple test',
    default_args={
        'owner': 'admin',
        'depends_on_past': False,
        'start_date': datetime(2021, 9, 13),
    },
)

start_dag = DummyOperator(
    task_id='start_dag',
    dag=dag,
)

extract = DockerOperator(
    api_version='auto',
    task_id='docker_extract',
    image='registry.gitlab.com/url/of/gitlab:a24a3f05',
    container_name='extract-test',
    mounts=[ssd_dir],
    auto_remove=True,
    force_pull=False,
    mount_tmp_dir=False,
    command=(
        'python3 rgb_image_extraction.py '
        '--bagfile /external-ssd/2021-09-01-13-17-10.bag '
        '--output_dir /external-ssd/airflow '
        '--camera_topic /kirby1/vm0/stereo/left/color/image_rect '
        '--every_n_img 20 --timestamp_as_name'
    ),
    docker_conn_id='gitlab_registry',
    dag=dag,
)

test = BashOperator(
    task_id='print_hello',
    bash_command='echo "hello world"',
    dag=dag,
)

start_dag >> extract >> test

Solution

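  • I think you have an old docker Python library installed. The traceback points that way: Airflow is loaded from /home/filip/.local/lib/python3.6/site-packages, but docker is loaded from /usr/lib/python3/dist-packages, which typically holds distribution packages, and its HostConfig does not accept the mounts argument that the Docker provider passes. If you want to make sure Airflow 2.1.0 works, always install it with the constraints mechanism described in https://airflow.apache.org/docs/apache-airflow/stable/installation.html; otherwise you risk ending up with outdated dependencies.

    For example, if you use Python 3.6, the right constraints for Airflow 2.1.3 are https://raw.githubusercontent.com/apache/airflow/constraints-2.1.3/constraints-3.6.txt, and there the docker Python library is 5.0.0. I bet you have a much older version.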
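
To confirm the diagnosis before reinstalling, something like the following could be run with the same interpreter that Airflow uses. It is only a sketch: the helper names (accepts_keyword, describe_docker_sdk, constraints_url) are made up for illustration, and the Airflow version passed to constraints_url is an assumption that should match whatever is actually installed.

```python
import inspect


def accepts_keyword(func, name):
    """Return True if `func` declares a parameter `name` or takes **kwargs."""
    params = inspect.signature(func).parameters
    if name in params:
        return True
    return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())


def describe_docker_sdk():
    """Report which docker SDK this interpreter imports, or None if absent."""
    try:
        import docker
        from docker.types import HostConfig
    except ImportError:
        return None
    return {
        "version": getattr(docker, "__version__", "unknown"),
        # A path under /usr/lib/python3/dist-packages suggests a system package.
        "path": docker.__file__,
        # The Docker provider passes mounts=...; old SDKs reject it.
        "supports_mounts": accepts_keyword(HostConfig.__init__, "mounts"),
    }


def constraints_url(airflow_version, python_version):
    """Build the constraints file URL for the given Airflow and Python versions."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


if __name__ == "__main__":
    print(describe_docker_sdk())
    # Versions below are taken from the example in the answer (assumption).
    print(constraints_url("2.1.3", "3.6"))
```

If supports_mounts comes back False, reinstalling Airflow and the Docker provider with pip's --constraint option pointing at the printed URL should pull in the pinned docker version instead of the older one.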