I am trying to deploy apache-airflow on my k8s cluster with the community Helm chart and the apache/airflow:2.1.2 image.
Airflow uses the KubernetesExecutor as its executor, and I am having some trouble managing logs.
I am using Elasticsearch for remote logging (Logstash + Filebeat for log aggregation).
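For context, the remote-logging settings behind this are roughly the following, set through the chart values as environment variables (the values here are illustrative, not copied verbatim from my deployment):

# illustrative values; the real ones are injected via the Helm chart
AIRFLOW__LOGGING__REMOTE_LOGGING: "True"
AIRFLOW__ELASTICSEARCH__HOST: "elasticsearch:9200"
AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: "True"
AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "True"
AIRFLOW__ELASTICSEARCH__LOG_ID_TEMPLATE: "{dag_id}-{task_id}-{execution_date}-{try_number}"

write_stdout + json_format make each task write JSON log lines to stdout, which is what the decode_json_fields processor in the filebeat.yaml below expects, and log_id_template has to produce the same log_id string that Logstash builds from the pod labels.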
I can see that the logs are arriving in my Elasticsearch cluster with all the important fields (log_id, offset), but when I try to view them in the Airflow web UI, the webserver container hits the error below:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/airflow/.local/lib/python3.6/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/www/auth.py", line 34, in decorated
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/www/decorators.py", line 60, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/www/views.py", line 1111, in get_logs_with_metadata
    logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/log/log_reader.py", line 57, in read_log_chunks
    logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py", line 228, in read
    log, metadata = self._read(task_instance, try_number_element, metadata)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 159, in _read
    logs_by_host = self._group_logs_by_host(logs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 128, in _group_logs_by_host
    grouped_logs[key].append(log)
TypeError: unhashable type: 'AttrDict'
What I want to happen: to be able to view the logs in my Airflow web UI.
Can anyone tell me what I did wrong?
Appendix:
This is my filebeat.yaml file, which reads the container logs (Filebeat is deployed as a DaemonSet in k8s using the official Helm chart):
filebeat.autodiscover:
  providers:
    - type: kubernetes
      templates:
        - condition:
            contains:
              kubernetes.labels.kubernetes_executor: "True"
            equals:
              kubernetes.namespace: "air"
          config:
            - type: docker
              containers.ids: '*'

processors:
  - decode_json_fields:
      fields: ["message"]
      target: ""
      overwrite_keys: true
  - rename:
      fields:
        - from: "log.offset"
          to: "offset"

output.logstash:
  hosts: ["logstash-logstash:5044"]
My Logstash config file, also deployed on k8s with the official Helm chart:
input {
  beats {
    port => 5044
  }
}
filter {
  mutate {
    add_field => {
      "log_id" => "%{[kubernetes][labels][dag_id]}-%{[kubernetes][labels][task_id]}-%{[kubernetes][labels][execution_date]}-%{[kubernetes][labels][try_number]}"
    }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "air-%{+YYYY.MM.dd_hh}"
    ssl => false
  }
}
It seems like the problem was the host field generated by Filebeat: Filebeat 7.x emits host as an object (host.name, host.os, ...), while the Airflow Elasticsearch task handler groups log lines by the host field, so the grouping key ends up as an unhashable AttrDict. Adding a drop_fields processor on host (or renaming host.name to host) fixes it, and the logs show up in the web UI.

New filebeat.yaml file:
filebeat.autodiscover:
  providers:
    - type: kubernetes
      templates:
        - condition:
            contains:
              kubernetes.labels.kubernetes_executor: "True"
            equals:
              kubernetes.namespace: "air"
          config:
            - type: docker
              containers.ids: '*'

processors:
  - decode_json_fields:
      fields: ["message"]
      target: ""
      overwrite_keys: true
  - drop_fields:
      fields:
        - host
  - rename:
      fields:
        - from: "log.offset"
          to: "offset"

output.logstash:
  hosts: ["logstash-logstash:5044"]
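For completeness, the rename-based alternative mentioned above would look roughly like this (an untested sketch that would replace the drop_fields processor; ignore_missing and fail_on_error are only there so Filebeat does not fail on events without a host.name):

processors:
  # sketch: flatten host.name into a plain string host field instead of dropping host
  - rename:
      fields:
        - from: "host.name"
          to: "host"
      ignore_missing: true
      fail_on_error: false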