Tags: elasticsearch, logging, logstash, airflow, filebeat

Airflow web UI can't retrieve logs from Elasticsearch


I am trying to deploy Apache Airflow on my k8s cluster with the community Helm chart and the apache/airflow:2.1.2 image.

Airflow uses the KubernetesExecutor as its executor, and I am having some trouble managing logs.

I am using Elasticsearch for remote logging (Logstash + Filebeat for log aggregation).
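
Remote logging itself is enabled through airflow.cfg (or the matching AIRFLOW__ELASTICSEARCH__* environment variables). A minimal sketch of the relevant settings, with illustrative values:

    [logging]
    remote_logging = True

    [elasticsearch]
    # illustrative endpoint; in my cluster this points at the ES service
    host = elasticsearch:9200
    # default template; the log_id field on each ES document must match this
    log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
    end_of_log_mark = end_of_log
    # emit task logs as JSON on stdout so Filebeat can pick them up
    write_stdout = True
    json_format = True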

I can see that logs are arriving in my Elasticsearch cluster with all the important fields (log_id, offset), but when I try to view them in the Airflow web UI, the webserver container hits the error below:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/airflow/.local/lib/python3.6/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/.local/lib/python3.6/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/www/auth.py", line 34, in decorated
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/www/decorators.py", line 60, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/www/views.py", line 1111, in get_logs_with_metadata
    logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/log/log_reader.py", line 57, in read_log_chunks
    logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/log/file_task_handler.py", line 228, in read
    log, metadata = self._read(task_instance, try_number_element, metadata)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 159, in _read
    logs_by_host = self._group_logs_by_host(logs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 128, in _group_logs_by_host
    grouped_logs[key].append(log)
TypeError: unhashable type: 'AttrDict'
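
The failing frame groups log documents by their host field, using it as a dictionary key. A minimal Python sketch of what I assume is going on (a rough stand-in for es_task_handler._group_logs_by_host, not the actual implementation): if Filebeat ships host as a structured object rather than a plain string, the key is a dict-like AttrDict, and hashing it fails:

    from collections import defaultdict

    def group_logs_by_host(logs):
        # Rough stand-in for es_task_handler._group_logs_by_host
        grouped_logs = defaultdict(list)
        for log in logs:
            key = log.get("host", "default_host")
            grouped_logs[key].append(log)  # the key must be hashable
        return grouped_logs

    # A plain string host groups fine:
    group_logs_by_host([{"host": "worker-1", "message": "ok"}])

    # A structured host object -- what Filebeat ships, surfaced as an
    # AttrDict by elasticsearch-dsl -- reproduces the traceback:
    try:
        group_logs_by_host([{"host": {"name": "worker-1"}, "message": "boom"}])
    except TypeError as err:
        print(err)  # unhashable type: 'dict'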

What I want to happen: be able to view logs in the Airflow web UI.

Can anyone tell me what I did wrong?

Appendix:

This is my filebeat.yaml file, which reads the container logs (Filebeat is deployed as a DaemonSet on k8s using the official Helm chart):

      filebeat.autodiscover:
        providers:
          - type: kubernetes
            templates:
              - condition:
                  contains:
                    kubernetes.labels.kubernetes_executor: "True"
                  equals:
                    kubernetes.namespace: "air"
                config:
                  - type: docker
                    containers.ids: '*'
      processors:
        - decode_json_fields:
            fields: ["message"]
            target: ""
            overwrite_keys: true
        - rename:
            fields:
              - from: "log.offset"
                to: "offset"
      output.logstash:
        hosts: ["logstash-logstash:5044"]

My Logstash config file, also deployed on k8s with the official Helm chart:

    input {
      beats {
        port => 5044
      }
    }
    filter {
      mutate {
        add_field => {
          "log_id" => "%{[kubernetes][labels][dag_id]}-%{[kubernetes][labels][task_id]}-%{[kubernetes][labels][execution_date]}-%{[kubernetes][labels][try_number]}"
        }
      }
    }
    output {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "air-%{+YYYY.MM.dd_hh}"
        ssl => false
      }
    }
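
The add_field above has to produce exactly the string Airflow renders from its log_id_template, since the web UI queries Elasticsearch by an exact log_id match and pages through the hits ordered by offset (which is why Filebeat renames log.offset to a top-level offset). A quick Python illustration of the correspondence, with placeholder label values:

    # Default template from airflow.cfg; the label values below are
    # hypothetical placeholders, not actual pod labels.
    log_id_template = "{dag_id}-{task_id}-{execution_date}-{try_number}"
    print(log_id_template.format(
        dag_id="example_dag",
        task_id="example_task",
        execution_date="<execution-date-label>",
        try_number="1",
    ))
    # example_dag-example_task-<execution-date-label>-1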

Solution

  • It seems the problem was with the host field generated by Filebeat.

    Just add a drop_fields processor on host, or rename host.name to host (a sketch of that rename variant follows the file below), and the logs show up in the web UI.

    New filebeat.yaml file:

    filebeat.autodiscover:
      providers:
        - type: kubernetes
          templates:
            - condition:
                contains:
                  kubernetes.labels.kubernetes_executor: "True"
                equals:
                  kubernetes.namespace: "air"
              config:
                - type: docker
                  containers.ids: '*'
    processors:
      - decode_json_fields:
          fields: ["message"]
          target: ""
          overwrite_keys: true
      - drop_fields:
          fields:
            - host
      - rename:
          fields:
            - from: "log.offset"
              to: "offset"
    output.logstash:
      hosts: ["logstash-logstash:5044"]