I have to access an FTP server that does not use UTF-8 encoding, so when Airflow tries to connect to it, the connection fails with a UnicodeDecodeError.
I was able to reproduce the problem with the underlying ftplib module that Airflow uses, as follows:
from ftplib import FTP
ftp = FTP('myserver', user='xxxx', passwd='yyyy', encoding='utf-8')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.11/ftplib.py", line 121, in __init__
    self.connect(host)
  File "/usr/lib/python3.11/ftplib.py", line 162, in connect
    self.welcome = self.getresp()
                   ^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/ftplib.py", line 244, in getresp
    resp = self.getmultiline()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/ftplib.py", line 230, in getmultiline
    line = self.getline()
           ^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/ftplib.py", line 212, in getline
    line = self.file.readline(self.maxline + 1)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen codecs>", line 322, in decode
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe1 in position 60: invalid continuation byte
With latin-1, on the other hand, the connection works without any problem:
ftp = FTP('myserver', user='xxxx', passwd='yyyyy', encoding='latin-1')
print(ftp.welcome)
220-Microsoft FTP Service
220 FTP XXXXX, utilizado pelos usuários do orgão e Editoras.
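The byte named in the traceback explains the difference: 0xe1 is "á" in Latin-1, but in UTF-8 it can only start a three-byte sequence, so the plain ASCII byte that follows it is rejected as an "invalid continuation byte". A minimal reproduction without a server, assuming the banner contains a Latin-1 encoded word such as "usuários":

```python
# Bytes as a Latin-1 server would send them.
raw = "usuários".encode("latin-1")  # b'usu\xe1rios'

# Latin-1 maps every byte 0x00-0xFF to a character, so this always succeeds.
decoded = raw.decode("latin-1")

# UTF-8 reads 0xe1 as the lead byte of a 3-byte sequence; the next byte (b"r")
# is not a continuation byte (0x80-0xBF), so decoding fails.
try:
    raw.decode("utf-8")
    error_reason = None
except UnicodeDecodeError as exc:
    error_reason = exc.reason

print(raw, decoded, error_reason)
```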
However, I don't see any option to change the encoding when using an Airflow operator or sensor. Setting {"encoding": "latin-1"} in the connection's extras is ignored.
Looking at the FTPHook source (https://github.com/apache/airflow/blob/6ec97dc6491c3f7d7cee3da2e6d2acb4e7bddba3/airflow/providers/ftp/hooks/ftp.py#L62), there is indeed nothing that sets the encoding when the hook creates its ftplib.FTP connection.
For your own project, you could subclass the FTPHook and FTPFileTransmitOperator, for example:
import ftplib

from airflow.compat.functools import cached_property
from airflow.providers.ftp.hooks.ftp import FTPHook
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator


class FTPHookWithEncoding(FTPHook):
    """FTPHook that passes the "encoding" connection extra to ftplib.FTP."""

    def get_conn(self) -> ftplib.FTP:
        if self.conn is None:
            params = self.get_connection(self.ftp_conn_id)
            pasv = params.extra_dejson.get("passive", True)
            # Required in this example: set e.g. {"encoding": "latin-1"} in the connection extras.
            encoding = params.extra_dejson.get("encoding")
            self.conn = ftplib.FTP(params.host, params.login, params.password, encoding=encoding)
            self.conn.set_pasv(pasv)
        return self.conn


class FTPFileTransmitOperatorWithEncoding(FTPFileTransmitOperator):
    """FTPFileTransmitOperator that uses FTPHookWithEncoding instead of FTPHook."""

    @cached_property
    def hook(self) -> FTPHookWithEncoding:
        return FTPHookWithEncoding(ftp_conn_id=self.ftp_conn_id)
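If you would rather keep connections without an "encoding" extra working, one option is to fall back to ftplib's own default of "utf-8" and validate the name before connecting. A sketch of that logic outside Airflow, where build_ftp_client is a hypothetical helper and the extras dict stands in for params.extra_dejson:

```python
import codecs
import ftplib


def build_ftp_client(extras: dict) -> ftplib.FTP:
    """Hypothetical helper: create an unconnected FTP client using the
    encoding from a connection-extras dict, defaulting to UTF-8."""
    encoding = extras.get("encoding", "utf-8")
    # codecs.lookup raises LookupError for unknown encoding names, so a typo
    # in the extras fails before any network I/O happens.
    codecs.lookup(encoding)
    # No host is passed, so the constructor does not connect yet; the caller
    # would then call ftp.connect(host) and ftp.login(user, passwd).
    return ftplib.FTP(encoding=encoding)


ftp = build_ftp_client({"encoding": "latin-1"})
print(ftp.encoding)
```

Inside get_conn, the equivalent change would be params.extra_dejson.get("encoding", "utf-8"), which makes the extra optional instead of required. This relies on the encoding keyword of ftplib.FTP, available since Python 3.9.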
Since the code above subclasses Airflow's FTPHook and FTPFileTransmitOperator, you're reusing all the methods from those classes. The additions in this code are:
1. Reading the encoding from the connection extras: encoding = params.extra_dejson.get("encoding"). Note that this example makes it a required extras field; adjust to your needs.
2. Passing it to the FTP client: ftplib.FTP(params.host, params.login, params.password, encoding=encoding).
3. Returning FTPHookWithEncoding from the operator's hook property.
You might need to adjust this for the specific version of Airflow that you're running.
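The hook only sees the encoding if it is stored in the connection's extras. One way to define such a connection, assuming Airflow 2.3 or later (which accepts JSON-serialized connections in AIRFLOW_CONN_* environment variables) and a hypothetical connection id my_ftp, is to generate the variable's value like this:

```python
import json

# Hypothetical connection id "my_ftp": Airflow would read it from AIRFLOW_CONN_MY_FTP.
conn = {
    "conn_type": "ftp",
    "host": "myserver",
    "login": "xxxx",
    "password": "yyyy",
    # Read by FTPHookWithEncoding through params.extra_dejson.get("encoding").
    "extra": {"encoding": "latin-1"},
}
env_value = json.dumps(conn)
print(f"export AIRFLOW_CONN_MY_FTP='{env_value}'")
```

The operator would then be created with ftp_conn_id="my_ftp". Entering {"encoding": "latin-1"} in the Extra field of a connection created in the UI should have the same effect.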
In your DAG, you'll then need to replace the original operator with your custom one. For example:
from my_package.operators import FTPFileTransmitOperatorWithEncoding
FTPFileTransmitOperatorWithEncoding(task_id="...", ...)