
Airflow custom operator variables


I need to pass Airflow connection settings (AWS, Postgres) to a Docker container as environment variables.
I'm trying to do this with a custom operator and BaseHook.

# Airflow 1.x import paths
from airflow.hooks.base_hook import BaseHook
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.decorators import apply_defaults

class S3ToPostgresDockerOperator(DockerOperator):
    @apply_defaults
    def __init__(self, aws_conn_id='aws_default', postgres_conn_id='postgres_default', **kwargs):
        super(S3ToPostgresDockerOperator, self).__init__(**kwargs)
        self.aws_conn = BaseHook.get_connection(aws_conn_id)
        self.pg_conn = BaseHook.get_connection(postgres_conn_id)

Is it possible to do something like this, or if not, how should I do it?

    java_unpack_csv = S3ToPostgresDockerOperator(
        ...
        environment={
            'AWS_ACCESS_KEY': '{{ ??? }}',
            'AWS_SECRET_KEY': '{{ ??? }}'
        }
    )

Solution

  • You can build up the environment kwarg inside your custom operator before passing it to the DockerOperator constructor.

    For example,

    # Airflow 1.x import paths; AwsHook turns the Airflow connection into
    # boto3-style credentials (a plain Connection object has no get_credentials()).
    from airflow.contrib.hooks.aws_hook import AwsHook
    from airflow.hooks.base_hook import BaseHook
    from airflow.operators.docker_operator import DockerOperator
    from airflow.utils.decorators import apply_defaults

    class S3ToPostgresDockerOperator(DockerOperator):
        @apply_defaults
        def __init__(self, aws_conn_id='aws_default', postgres_conn_id='postgres_default', **kwargs):
            self.aws_conn = BaseHook.get_connection(aws_conn_id)
            self.pg_conn = BaseHook.get_connection(postgres_conn_id)

            # Merge the credentials into whatever environment the caller passed;
            # the keyword values here take precedence over duplicate keys.
            credentials = AwsHook(aws_conn_id).get_credentials()
            kwargs['environment'] = dict(
                kwargs.pop('environment', {}),
                AWS_ACCESS_KEY=credentials.access_key,
                AWS_SECRET_KEY=credentials.secret_key,
                PG_DATABASE_URI=self.pg_conn.get_uri()
            )
            super(S3ToPostgresDockerOperator, self).__init__(**kwargs)
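
    With the credentials merged inside the operator, the task from the question no longer needs the '{{ ??? }}' placeholders. A minimal usage sketch, where the task_id, image name, and the extra CSV_PREFIX variable are illustrative, not from the original post:

    java_unpack_csv = S3ToPostgresDockerOperator(
        task_id='java_unpack_csv',              # illustrative task id
        image='my-etl-image:latest',            # illustrative image name
        aws_conn_id='aws_default',
        postgres_conn_id='postgres_default',
        # Any extra variables are preserved; the AWS/Postgres keys are added on top.
        environment={'CSV_PREFIX': 'incoming/'}
    )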