
When using FTP in Apache Airflow, how do I specify the encoding?


I have to access an FTP server that does not use UTF-8 encoding, so when Airflow tries to connect to it, it crashes.

I was able to reproduce the problem using the underlying ftplib module that Airflow uses, as follows:

ftp = FTP('myserver', user='xxxx', passwd='yyyy', encoding='utf-8')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.11/ftplib.py", line 121, in __init__
    self.connect(host)
  File "/usr/lib/python3.11/ftplib.py", line 162, in connect
    self.welcome = self.getresp()
                   ^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/ftplib.py", line 244, in getresp
    resp = self.getmultiline()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/ftplib.py", line 230, in getmultiline
    line = self.getline()
           ^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/ftplib.py", line 212, in getline
    line = self.file.readline(self.maxline + 1)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen codecs>", line 322, in decode
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe1 in position 60: invalid continuation byte

With latin-1 instead, there is no problem:

ftp = FTP('myserver', user='xxxx', passwd='yyyyy', encoding='latin-1')
print(ftp.welcome)
220-Microsoft FTP Service
220 FTP XXXXX, utilizado pelos usuários do orgão e Editoras.
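Why latin-1 works: the byte 0xe1 from the traceback is "á" in Latin-1 (likely the "á" in "usuários" in the banner), while in UTF-8 it starts a three-byte sequence, so the ASCII letter that follows is rejected as an invalid continuation byte. A small pure-Python reproduction, assuming the server sends its banner in Latin-1:

```python
# Encode part of the banner the way a Latin-1 server would send it
raw = "utilizado pelos usuários".encode("latin-1")

# Latin-1 maps every byte 0x00-0xFF to exactly one character, so decoding always succeeds
print(raw.decode("latin-1"))  # utilizado pelos usuários

try:
    raw.decode("utf-8")
except UnicodeDecodeError as exc:
    # 0xe1 opens a 3-byte UTF-8 sequence, but the next byte ("r") is not a continuation byte
    print(exc.reason)  # invalid continuation byte
```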

But I don't see any option to change the encoding when using an Airflow FTP operator or sensor. Adding {"encoding": "latin-1"} to the connection's extras is ignored.


Solution

  • Looking at the FTPHook source (https://github.com/apache/airflow/blob/6ec97dc6491c3f7d7cee3da2e6d2acb4e7bddba3/airflow/providers/ftp/hooks/ftp.py#L62), there's indeed nothing that sets the encoding, so ftplib falls back to its default of UTF-8 (the default since Python 3.9).

    For your own project, you could subclass the FTPHook and FTPFileTransmitOperator, for example:

    import ftplib
    from functools import cached_property  # stdlib since Python 3.8; airflow.compat.functools only re-exported it
    
    from airflow.providers.ftp.hooks.ftp import FTPHook
    from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator
    
    
    class FTPHookWithEncoding(FTPHook):
        def get_conn(self) -> ftplib.FTP:
            if self.conn is None:
                params = self.get_connection(self.ftp_conn_id)
                pasv = params.extra_dejson.get("passive", True)
                # Read the encoding from the connection extras, e.g. {"encoding": "latin-1"}
                encoding = params.extra_dejson.get("encoding")
                # ftplib's encoding argument is keyword-only (Python 3.9+)
                self.conn = ftplib.FTP(params.host, params.login, params.password, encoding=encoding)
                self.conn.set_pasv(pasv)
    
            return self.conn
    
    
    class FTPFileTransmitOperatorWithEncoding(FTPFileTransmitOperator):
        @cached_property
        def hook(self) -> FTPHookWithEncoding:
            # Build the encoding-aware hook instead of the stock FTPHook
            return FTPHookWithEncoding(ftp_conn_id=self.ftp_conn_id)
    

    Because the code above subclasses Airflow's FTPHook and FTPFileTransmitOperator, it reuses all of their other methods. The only additions are:

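    1. Fetch the encoding from the extras with encoding = params.extra_dejson.get("encoding"). Note that this example effectively makes encoding a required extras field: if it's missing, encoding is None and ftplib fails with a TypeError as soon as it sends a command. Adjust to your needs, for example with .get("encoding", "utf-8") to fall back to ftplib's default.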
    2. Set the encoding with ftplib.FTP(params.host, params.login, params.password, encoding=encoding).
    3. Override the operator's hook property so that it creates and returns the custom FTPHookWithEncoding.
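The ftplib side of steps 1 and 2 can be checked on its own, without Airflow or a live server. This is a minimal sketch: make_ftp_client is a hypothetical helper that mirrors get_conn, parses the Extra JSON itself, falls back to UTF-8 when no encoding is set (a choice made here, not something Airflow does), and creates the client without a host so nothing connects yet. It assumes Python 3.9+.

```python
import ftplib
import json


def make_ftp_client(extra_json: str) -> ftplib.FTP:
    """Hypothetical helper: build an unconnected ftplib.FTP from a connection's Extra JSON."""
    extras = json.loads(extra_json)
    # No host is passed, so the constructor does not open a connection yet
    client = ftplib.FTP(encoding=extras.get("encoding", "utf-8"))
    # Same default as the hook: passive mode unless the extras say otherwise
    client.set_pasv(extras.get("passive", True))
    return client


client = make_ftp_client('{"encoding": "latin-1"}')
print(client.encoding)  # latin-1
# Later, against the real server:
# client.connect("myserver")
# client.login("xxxx", "yyyy")
```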

    You might need to adjust this for the specific version of Airflow that you're running.

    In your DAG, use the custom operator in place of FTPFileTransmitOperator. For example:

    from my_package.operators import FTPFileTransmitOperatorWithEncoding
    
    FTPFileTransmitOperatorWithEncoding(task_id="...", ...)
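The custom hook only sees the encoding if the connection's extras contain it. One way to define such a connection is an AIRFLOW_CONN_<CONN_ID> environment variable; the JSON value format below assumes Airflow 2.3 or later, and the connection ID ftp_latin1 is hypothetical. In a real deployment you would normally set this variable in the scheduler and worker environment rather than from Python; setting it via os.environ only affects the current process, which is mainly useful for local tests.

```python
import json
import os

# Hypothetical connection ID; the task would then pass ftp_conn_id="ftp_latin1"
conn_id = "ftp_latin1"

# "extra" is what FTPHookWithEncoding reads through params.extra_dejson
conn = {
    "conn_type": "ftp",
    "host": "myserver",
    "login": "xxxx",
    "password": "yyyy",
    "extra": {"passive": True, "encoding": "latin-1"},
}

# Airflow resolves connections from variables named AIRFLOW_CONN_ plus the upper-cased conn ID
os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"] = json.dumps(conn)
print(os.environ["AIRFLOW_CONN_FTP_LATIN1"])
```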