Tags: python, airflow, speed-test

Apache Airflow giving Broken DAG error: cannot import __builtin__ for speedtest.py


This is a weird error I'm coming across. In my Python 3.7 environment, I have installed Airflow 2, speedtest-cli, and a few other things using pip, and I keep seeing this error pop up in the Airflow UI:

Broken DAG: [/env/app/airflow/dags/my_dag.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/speedtest.py", line 156, in <module>
    import __builtin__
ModuleNotFoundError: No module named '__builtin__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/speedtest.py", line 179, in <module>
    _py3_utf8_stdout = _Py3Utf8Output(sys.stdout)
  File "/usr/local/lib/python3.7/site-packages/speedtest.py", line 166, in __init__
    buf = FileIO(f.fileno(), 'w')
AttributeError: 'StreamLogWriter' object has no attribute 'fileno'

As a sanity check, I ran the following and saw no problems:

~# python airflow/dags/my_dag.py 
/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py:94 DeprecationWarning: provide_context is deprecated as of 2.0 and is no longer required

~# airflow dags list
dag_id | filepath  | owner | paused
=======+===========+=======+=======
my_dag | my_dag.py | rafay | False


~# airflow tasks list my_dag 
[2021-03-08 16:46:26,950] {dagbag.py:448} INFO - Filling up the DagBag from /env/app/airflow/dags
/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py:94 DeprecationWarning: provide_context is deprecated as of 2.0 and is no longer required
Start_backup
get_configs
get_targets
push_targets

So nothing is out of the ordinary, and testing each of the tasks does not cause problems either. Furthermore, running the speedtest-cli script independently, outside of Airflow, does not raise any errors. The script goes something like this:

import speedtest
from airflow.exceptions import AirflowException


def get_upload_speed():
    """
    Calculates the upload speed of the internet connection using the speedtest API.

    Returns:
        Upload speed in Mbps, rounded to two decimal places.
    """
    try:
        s = speedtest.Speedtest()
        upload = s.upload()
    except speedtest.SpeedtestException as e:
        raise AirflowException("Failed to check network bandwidth, make sure internet is available.\nException: {}".format(e))

    return round(upload / (1024 ** 2), 2)

I even went to the exact line of speedtest.py mentioned in the Broken DAG error, line 156; it seems fine and runs fine when I put it in the Python interpreter.

try:
    import __builtin__
except ImportError:
    import builtins
    from io import TextIOWrapper, FileIO

So, how do I diagnose this? It seems like a package import problem of some sort.

Edit: If it helps, here is my directory and import structure for my_dag.py:

- airflow
  - dags
    - tasks
      - get_configs.py
      - get_targets.py
      - push_targets.py  (speedtest is imported here)
    - my_dag.py

The import sequence of tasks in the DAG file is as follows:

from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from tasks.get_configs import get_configs
from tasks.get_targets import get_targets
from tasks.push_targets import push_targets

...
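For context, the callables above would typically be wired into PythonOperators roughly as shown below. This is only a hypothetical sketch of the elided DAG body: the dag_id and task_ids are taken from the CLI output above, but the scheduling parameters (start_date, schedule_interval, catchup), the days_ago helper, and the task wiring are assumptions, not the actual code.

from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

from tasks.get_configs import get_configs
from tasks.get_targets import get_targets
from tasks.push_targets import push_targets

# Hypothetical wiring of the imported callables; the real DAG body is elided above.
with DAG(
    dag_id="my_dag",
    start_date=days_ago(1),               # assumed scheduling parameters
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    start_backup = DummyOperator(task_id="Start_backup")
    get_configs_task = PythonOperator(task_id="get_configs", python_callable=get_configs)
    get_targets_task = PythonOperator(task_id="get_targets", python_callable=get_targets)
    push_targets_task = PythonOperator(task_id="push_targets", python_callable=push_targets)

    start_backup >> get_configs_task >> get_targets_task >> push_targets_task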

Solution

  • The Airflow StreamLogWriter (and other log-related facilities) does not implement the fileno method expected by "standard" Python I/O clients (confirmed by a TODO comment in the Airflow source). The same problem also occurs when enabling the faulthandler standard library module inside an Airflow task. A minimal reproduction is sketched after this answer.

    So what can you do at this point? Aside from opening an issue or sending a PR to Airflow, it is really case by case. In the speedtest-cli situation, it may be necessary to isolate the function calling fileno and try to "replace" it (e.g. by forking the library, changing the function if it can be isolated and injected, or perhaps choosing a configuration that does not exercise that part of the code); a workaround along those lines is also sketched below.

    In my particular case, there was no way to bypass the code, and a fork was the most straightforward approach.
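The failure can be reproduced outside of Airflow. The sketch below assumes the speedtest-cli version from the traceback above, whose module-level code wraps sys.stdout via FileIO(f.fileno(), 'w'); it swaps sys.stdout for a minimal object that, like StreamLogWriter, has no fileno method. The FakeLogWriter class is purely illustrative.

import sys


class FakeLogWriter:
    """Stand-in for Airflow's StreamLogWriter: writable, but no fileno()."""
    def write(self, message):
        pass

    def flush(self):
        pass


# Simulate DAG parsing, where Airflow has swapped the standard streams
# for StreamLogWriter-like objects.
sys.stdout = FakeLogWriter()
sys.stderr = FakeLogWriter()

try:
    # speedtest's module-level code calls sys.stdout.fileno(), which fails here.
    import speedtest  # noqa: F401
except AttributeError as exc:
    print("reproduced:", exc, file=sys.__stderr__)
finally:
    sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__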
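If forking is not an option, one way to apply the "isolate and replace" idea is to defer the import of speedtest to task-execution time and temporarily restore the interpreter's real streams around it, so the module-level fileno() call sees a genuine file object. This is only a sketch under those assumptions (the real_std_streams helper is hypothetical, not part of Airflow or speedtest-cli), and it is not the fork the answer ultimately used.

import sys
from contextlib import contextmanager

from airflow.exceptions import AirflowException


@contextmanager
def real_std_streams():
    """Temporarily restore the original stdout/stderr so modules that call
    sys.stdout.fileno() at import time can load."""
    saved = sys.stdout, sys.stderr
    sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__
    try:
        yield
    finally:
        sys.stdout, sys.stderr = saved


def get_upload_speed():
    """Return the upload speed in Mbps, importing speedtest lazily."""
    with real_std_streams():
        import speedtest  # the module-level fileno() call now sees a real stream

    try:
        s = speedtest.Speedtest()
        upload = s.upload()
    except speedtest.SpeedtestException as e:
        raise AirflowException(
            "Failed to check network bandwidth, make sure internet is "
            "available.\nException: {}".format(e)
        )
    return round(upload / (1024 ** 2), 2)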