I trying to getting data from FTP server's txt file by GCP Composer Tasks.
So i imported and used ftplib package in code.
like this.
ftp = FTP()
ftp.connect(host=HOST,port=PORT, timeout=600)
ftp.login(user=USER,passwd=PSWD)
ftp.set_pasv(True)
ftp.sendcmd('TYPE A')
conn = ftp.transfercmd(F"RETR {PATH}")
fp = conn.makefile('rb')
but. this (conn = ftp.transfercmd(F"RETR {PATH}")
) code made TimeoutError: [Errno 110] Connection timed out
Error.
ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
result = task_copy.execute(context=context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/airflow/gcs/dags/FTP_ZIPCODE_to_BQ_DAG.py", line 91, in replace_BQ_table
conn = ftp.transfercmd(F"RETR {PATH}")
File "/opt/python3.8/lib/python3.8/ftplib.py", line 389, in transfercmd
return self.ntransfercmd(cmd, rest)[0]
File "/opt/python3.8/lib/python3.8/ftplib.py", line 350, in ntransfercmd
conn = socket.create_connection((host, port), self.timeout,
File "/opt/python3.8/lib/python3.8/socket.py", line 808, in create_connection
raise err
File "/opt/python3.8/lib/python3.8/socket.py", line 796, in create_connection
sock.connect(sa)
TimeoutError: [Errno 110] Connection timed out
Did you know reason????
and... this is my gcp composer evironments.
One thing that can be happening is that the connection is not working. In order to get the connection working, you need to set up Cloud NAT together with Cloud Composer to give the workers public internet access, as described in this documentation.