We are trying to download a file from an SFTP server and then delete it (via Airflow). We keep getting this error on one of the SFTP servers; the same code works fine on the other SFTP servers.
[Errno 13] The requested operation cannot be performed because there is a file transfer in progress
with ssh_hook_SFTP.get_conn() as ssh_client:
    sftp_client = ssh_client.open_sftp()
    bucket = storage_bucket_client.get_bucket(target_bucket)
    proxy_obj = sftp_client.file(sftp_file_path)
    blob = Blob(gcp_bucket_path, bucket)
    blob.upload_from_file(proxy_obj)
    log.info(f'Executing delete of {sftp_file_path}')
    sftp_client.remove(sftp_file_path)
Here is the stack trace:
Traceback (most recent call last):
  File "/home/airflow/gcs/plugins/operators/PcsSftpToGcpOperator.py", line 148, in execute
    self.SFTPAgent(storage_bucket_client, sftp_client, source_folder, file.filename, target_bucket, post_pull_action, rename_prefix, files_list)
  File "/home/airflow/gcs/plugins/operators/PcsSftpToGcpOperator.py", line 262, in SFTPAgent
    sftp_client.remove(sftp_path)
  File "/opt/python3.8/lib/python3.8/site-packages/paramiko/sftp_client.py", line 398, in remove
    self._request(CMD_REMOVE, path)
  File "/opt/python3.8/lib/python3.8/site-packages/paramiko/sftp_client.py", line 813, in _request
    return self._read_response(num)
  File "/opt/python3.8/lib/python3.8/site-packages/paramiko/sftp_client.py", line 865, in _read_response
    self._convert_status(msg)
  File "/opt/python3.8/lib/python3.8/site-packages/paramiko/sftp_client.py", line 896, in _convert_status
    raise IOError(errno.EACCES, text)
PermissionError: [Errno 13] The requested operation cannot be performed because there is a file transfer in progress.
Any help is appreciated.
I confirmed that we can download and delete the file manually from the SFTP server with the same credentials, so this is definitely not a permissions issue.
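I found the answer, in case someone has the same issue: we need to close the remote file before deleting it. The handle returned by sftp_client.file() was still open when remove() was called, and this server apparently treats an open handle as a transfer in progress.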
with ssh_hook_SFTP.get_conn() as ssh_client:
    sftp_client = ssh_client.open_sftp()
    bucket = storage_bucket_client.get_bucket(target_bucket)
    # Open the remote file before the try block so proxy_obj is always defined in finally.
    proxy_obj = sftp_client.file(sftp_file_path)
    try:
        blob = Blob(gcp_bucket_path, bucket)
        blob.upload_from_file(proxy_obj)
    finally:
        # Close the remote handle so the server no longer sees a transfer in progress.
        proxy_obj.close()
    log.info(f'Executing delete of {sftp_file_path}')
    sftp_client.remove(sftp_file_path)
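
If you want the close-before-delete step to be harder to forget, one option is to wrap the remote handle in contextlib.closing. Below is a minimal sketch of that idea, not the code from our operator. FakeSFTPClient and transfer_then_remove are hypothetical names made up for illustration; the fake only imitates the strict server's behaviour (remove() fails while a handle is open) so the snippet runs without a real connection. With a real paramiko client, the upload argument would be something like blob.upload_from_file.

```python
import io
from contextlib import closing


class FakeSFTPClient:
    """Hypothetical in-memory stand-in for an SFTP client (illustration only).

    It imitates the strict server from the question: remove() raises
    PermissionError while a handle to the same path is still open.
    """

    def __init__(self, files):
        self.files = dict(files)   # path -> bytes
        self.open_handles = set()  # paths that currently have an open handle

    def file(self, path):
        return _FakeHandle(self, path)

    def remove(self, path):
        if path in self.open_handles:
            raise PermissionError(13, "file transfer in progress")
        del self.files[path]


class _FakeHandle:
    """Hypothetical file handle that registers itself as open until close()."""

    def __init__(self, client, path):
        self._client = client
        self._path = path
        self._buf = io.BytesIO(client.files[path])
        client.open_handles.add(path)

    def read(self, size=-1):
        return self._buf.read(size)

    def close(self):
        self._client.open_handles.discard(self._path)


def transfer_then_remove(sftp_client, remote_path, upload):
    """Pass the open remote file to upload(), close it, then delete it."""
    # closing() calls handle.close() when the block exits, even if upload() raises.
    with closing(sftp_client.file(remote_path)) as handle:
        upload(handle)
    # Reached only if upload() succeeded; the handle is already closed here.
    sftp_client.remove(remote_path)


if __name__ == "__main__":
    client = FakeSFTPClient({"/in/a.csv": b"x,y\n"})
    received = []
    transfer_then_remove(client, "/in/a.csv", lambda f: received.append(f.read()))
    print(received, client.files)
```

A side effect of this shape is that a failed upload still closes the handle but skips the delete, so the source file stays on the server for a retry.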