This is a weird error I'm coming across. In my Python 3.7 environment I have installed Airflow 2, speedtest-cli and a few other things using pip, and I keep seeing this error pop up in the Airflow UI:
```
Broken DAG: [/env/app/airflow/dags/my_dag.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/speedtest.py", line 156, in <module>
    import __builtin__
ModuleNotFoundError: No module named '__builtin__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/speedtest.py", line 179, in <module>
    _py3_utf8_stdout = _Py3Utf8Output(sys.stdout)
  File "/usr/local/lib/python3.7/site-packages/speedtest.py", line 166, in __init__
    buf = FileIO(f.fileno(), 'w')
AttributeError: 'StreamLogWriter' object has no attribute 'fileno'
```
As a sanity check I ran the following and saw no problems:
```
~# python airflow/dags/my_dag.py
/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py:94 DeprecationWarning: provide_context is deprecated as of 2.0 and is no longer required

~# airflow dags list
dag_id | filepath  | owner | paused
=======+===========+=======+=======
my_dag | my_dag.py | rafay | False

~# airflow tasks list my_dag
[2021-03-08 16:46:26,950] {dagbag.py:448} INFO - Filling up the DagBag from /env/app/airflow/dags
/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py:94 DeprecationWarning: provide_context is deprecated as of 2.0 and is no longer required
Start_backup
get_configs
get_targets
push_targets
```
So nothing out of the ordinary, and testing each of the tasks does not cause problems either. Running the speedtest-cli script independently, outside of Airflow, also raises no errors. The script goes something like this:
```python
import speedtest
from airflow.exceptions import AirflowException


def get_upload_speed():
    """
    Calculates the upload speed of the internet using the speedtest API.

    Returns:
        Upload speed in Mbps
    """
    try:
        s = speedtest.Speedtest()
        upload = s.upload()
    except speedtest.SpeedtestException as e:
        raise AirflowException("Failed to check network bandwidth make sure internet is available.\nException: {}".format(e))
    return round(upload / (1024 ** 2), 2)
```
I even went to the exact line of speedtest.py mentioned in the Broken DAG error (line 156); it looks fine and runs fine when I put it in the Python interpreter:
```python
try:
    import __builtin__
except ImportError:
    import builtins
from io import TextIOWrapper, FileIO
```
So, how do I diagnose this? It seems like a package import problem of some sort.
Edit: If it helps, here is my directory and import structure for my_dag.py:
```
- airflow
  - dags
    - tasks
      - get_configs.py
      - get_targets.py
      - push_targets.py  (speedtest is imported here)
    - my_dag.py
```
The import sequence of tasks in the DAG file is as follows:
```python
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

from tasks.get_configs import get_configs
from tasks.get_targets import get_targets
from tasks.push_targets import push_targets
...
```
The Airflow StreamLogWriter (and other log-related facilities) does not implement the fileno method expected by "standard" Python I/O clients (confirmed by a TODO comment in the Airflow source). The same problem also shows up when enabling the faulthandler standard library module in an Airflow task.
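To see why the DAG breaks even though the script runs fine on its own, here is a minimal sketch that reproduces the failure outside of Airflow. FakeLogWriter is just a stand-in for illustration, not Airflow's actual class:

```python
import sys
from io import FileIO


class FakeLogWriter:
    """Stand-in for Airflow's StreamLogWriter: writes go elsewhere, no fileno()."""

    def write(self, message):
        pass  # a real StreamLogWriter forwards this to a logger


# Airflow swaps sys.stdout for such an object while it processes DAG files and tasks.
sys.stdout = FakeLogWriter()

# speedtest.py does roughly this at import time (around lines 166/179) to build
# a UTF-8 wrapper over stdout:
buf = FileIO(sys.stdout.fileno(), 'w')
# -> AttributeError: 'FakeLogWriter' object has no attribute 'fileno'
```

So the failure has nothing to do with your DAG code; it is triggered purely by importing speedtest under Airflow's redirected stdout.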
So what to do at this point? Aside from opening an issue or sending a PR to Airflow, it is really case by case. In the speedtest-cli situation, it may be necessary to isolate the code that calls fileno and try to "replace" it (e.g. by forking the library, swapping out the function if it can be isolated and injected, or choosing a configuration that does not exercise that part of the code). One possible shim is sketched below.
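As a minimal sketch (assuming sys.__stdout__ and sys.__stderr__ are still real file objects in the process that imports the DAG, which is not guaranteed in every Airflow deployment), you can restore the original streams just for the import, since the fileno() call happens at module import time. The helper name import_speedtest_with_real_streams is only illustrative:

```python
import sys


def import_speedtest_with_real_streams():
    """Import speedtest while sys.stdout/sys.stderr point at real files.

    speedtest.py calls fileno() on sys.stdout at import time, so the import
    must happen while a stream with a real file descriptor is in place.
    """
    saved_out, saved_err = sys.stdout, sys.stderr
    sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__
    try:
        import speedtest
    finally:
        sys.stdout, sys.stderr = saved_out, saved_err
    return speedtest


speedtest = import_speedtest_with_real_streams()
```

If the original descriptors are not available (sys.__stdout__ can be None in daemonized processes), this kind of shim will not help.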
In my particular case, there was no way to bypass the code, and a fork was the most straightforward method.