I need to pass Airflow connection settings (AWS, Postgres) to a Docker container's environment variables.
I'm trying to do this with a custom operator and BaseHook:
from airflow.hooks.base_hook import BaseHook
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.decorators import apply_defaults

class S3ToPostgresDockerOperator(DockerOperator):
    @apply_defaults
    def __init__(self, aws_conn_id='aws_default', postgres_conn_id='postgres_default', **kwargs):
        super(S3ToPostgresDockerOperator, self).__init__(**kwargs)
        self.aws_conn = BaseHook.get_connection(aws_conn_id)
        self.pg_conn = BaseHook.get_connection(postgres_conn_id)
Is it possible to do something like that, or if not, how should I do it? In the end I want to be able to write:
java_unpack_csv = S3ToPostgresDockerOperator(
    ...
    environment={
        'AWS_ACCESS_KEY': '{{ ??? }}',
        'AWS_SECRET_KEY': '{{ ??? }}'
    }
)
You can build up the environment kwarg that gets passed to the DockerOperator constructor. For example:
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.hooks.base_hook import BaseHook
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.decorators import apply_defaults

class S3ToPostgresDockerOperator(DockerOperator):
    @apply_defaults
    def __init__(self, aws_conn_id='aws_default', postgres_conn_id='postgres_default', **kwargs):
        self.pg_conn = BaseHook.get_connection(postgres_conn_id)
        # AwsHook knows how to resolve the access/secret key from the connection
        # (a plain Connection object has no get_credentials() method).
        credentials = AwsHook(aws_conn_id=aws_conn_id).get_credentials()
        kwargs['environment'] = dict(
            kwargs.pop('environment', {}),  # keep any environment the caller already passed
            AWS_ACCESS_KEY=credentials.access_key,
            AWS_SECRET_KEY=credentials.secret_key,
            PG_DATABASE_URI=self.pg_conn.get_uri()
        )
        super(S3ToPostgresDockerOperator, self).__init__(**kwargs)
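With the credentials injected in the constructor, the DAG no longer has to template them by hand. A minimal usage sketch (the task_id, image name, and extra variable are made up for illustration):

java_unpack_csv = S3ToPostgresDockerOperator(
    task_id='java_unpack_csv',           # hypothetical task id
    image='my-etl-image:latest',         # hypothetical image
    aws_conn_id='aws_default',
    postgres_conn_id='postgres_default',
    environment={'EXTRA_VAR': 'value'}   # merged with the injected credentials
)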