Search code examples
pythonairflowsftpairflow-2.xpysftp

"The requested operation cannot be performed because there is a file transfer in progress" error when doing multiple SFTP operation


We try to download the file from sftp and then delete the file (via Airflow), we get keeping this error on one of the sftp servers, it's working fine on other sftp servers.

[Errno 13] The requested operation cannot be performed because there is a file transfer in progress

        with ssh_hook_SFTP.get_conn() as ssh_client:
            sftp_client = ssh_client.open_sftp()

            bucket = storage_bucket_client.get_bucket(target_bucket)
            proxy_obj = sftp_client.file(sftp_file_path)
            blob = Blob(gcp_bucket_path, bucket)
            blob.upload_from_file(proxy_obj)

            log.info(f'Executing delete of {sftp_file_path}')
            sftp_client.remove(sftp_file_path)

here is the stacktrace

Traceback (most recent call last):
  File "/home/airflow/gcs/plugins/operators/PcsSftpToGcpOperator.py", line 148, in execute
    self.SFTPAgent(storage_bucket_client, sftp_client, source_folder, file.filename, target_bucket, post_pull_action, rename_prefix, files_list)
  File "/home/airflow/gcs/plugins/operators/PcsSftpToGcpOperator.py", line 262, in SFTPAgent
    sftp_client.remove(sftp_path)
  File "/opt/python3.8/lib/python3.8/site-packages/paramiko/sftp_client.py", line 398, in remove
    self._request(CMD_REMOVE, path)
  File "/opt/python3.8/lib/python3.8/site-packages/paramiko/sftp_client.py", line 813, in _request
    return self._read_response(num)
  File "/opt/python3.8/lib/python3.8/site-packages/paramiko/sftp_client.py", line 865, in _read_response
    self._convert_status(msg)
  File "/opt/python3.8/lib/python3.8/site-packages/paramiko/sftp_client.py", line 896, in _convert_status
    raise IOError(errno.EACCES, text)
PermissionError: [Errno 13] The requested operation cannot be performed because there is a file transfer in progress.

Any help appreciated

I confirmed we can download and delete the file manually from sftp with same credentials, so this is not permission issue for sure


Solution

  • I found the answer in case someone has same issue, we need to close the file

            with ssh_hook_SFTP.get_conn() as ssh_client:
                sftp_client = ssh_client.open_sftp()
    
                bucket = storage_bucket_client.get_bucket(target_bucket)
                try:
                  proxy_obj = sftp_client.file(sftp_file_path)
                  blob = Blob(gcp_bucket_path, bucket)
                  blob.upload_from_file(proxy_obj)
                finally:
                  proxy_obj.close()
    
                log.info(f'Executing delete of {sftp_file_path}')
                sftp_client.remove(sftp_file_path)