I am trying to SFTP a file to a remote server in chunks using threads and the Python paramiko library. The script opens a local file and SFTPs the chunks to the remote server in separate threads.
I am basically following this solution, which uses the same approach to download a large file over SFTP, but I would like to send large files instead: Downloading solution
However, in write_chunks(), on the line for chunk in infile.readv(chunks):, I'm getting this error:
AttributeError: '_io.BufferedReader' object has no attribute 'readv'
Could anybody assist with this error, please? I thought that infile is a file descriptor, so I don't understand why it is an _io.BufferedReader object.
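For reference, the downloading answer calls readv() on the remote handle returned by sftp_client.open(), which (as far as I understand) is a paramiko SFTPFile and does have a readv() method, whereas the built-in open() gives me an _io.BufferedReader. If that is the cause, I assume I would have to read the (offset, length) chunks from the local file manually with seek()/read(), something like the sketch below, but I'm not sure whether that is the intended approach:

# rough sketch of what I think the local equivalent of SFTPFile.readv() would be
with open(local_file, "rb") as infile:
    for offset, length in chunks:
        infile.seek(offset)          # jump to the start of this chunk
        data = infile.read(length)   # read exactly this chunk
        outfile.write(data)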
import threading, os, time, paramiko
import time, paramiko

MAX_RETRIES = 10

ftp_server = "server.com"
port = 22
remote_file = "/home/filecopy.bin"
local_file = "/home/file.bin"
ssh_conn = sftp_client = None
username = "none"
password = "none"

#you could make the number of threads relative to file size
NUM_THREADS = 2
MAX_RETRIES = 10


def make_filepart_path(file_path, part_number):
    """creates filepart path from filepath"""
    return "%s.filepart.%s" % (file_path, part_number+1)


def write_chunks(chunks, tnum, remote_file_part, username, password, ftp_server, max_retries):
    ssh_conn = sftp_client = None
    for retry in range(max_retries):
        try:
            ssh_conn = paramiko.Transport((ftp_server, port))
            ssh_conn.connect(username=username, password=password)
            sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
            with sftp_client.open(remote_file_part, "wb") as outfile:
                with open(local_file, "rb") as infile:
                    for chunk in infile.readv(chunks):
                        outfile.write(chunk)
            break
        except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
            retry += 1
            print("%s %s Thread %s - > retrying %s..." % (type(x), x, tnum, retry))
            time.sleep(abs(retry) * 10)
        finally:
            if hasattr(sftp_client, "close") and callable(sftp_client.close):
                sftp_client.close()
            if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
                ssh_conn.close()


start_time = time.time()

for retry in range(MAX_RETRIES):
    try:
        ssh_conn = paramiko.Transport((ftp_server, port))
        ssh_conn.connect(username=username, password=password)
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
        # connect to get the file's size in order to calculate chunks
        #filesize = sftp_client.stat(remote_file).st_size
        filesize = os.stat(local_file).st_size
        sftp_client.close()
        ssh_conn.close()
        chunksize = pow(2, 12)
        chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
        thread_chunk_size = (len(chunks) // NUM_THREADS) + 1
        # break the chunks into sub lists to hand off to threads
        thread_chunks = [chunks[i:i+thread_chunk_size] for i in range(0, len(chunks) - 1, thread_chunk_size)]
        threads = []
        fileparts = []
        for thread_num in range(len(thread_chunks)):
            remote_file_part = make_filepart_path(remote_file, thread_num)
            args = (thread_chunks[thread_num], thread_num, remote_file_part, username, password, ftp_server, MAX_RETRIES)
            threads.append(threading.Thread(target=write_chunks, args=args))
            fileparts.append(remote_file_part)
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        # join file parts into one file, remove fileparts
        with sftp_client.open(remote_file_part, "wb") as outfile:
            for filepart in fileparts:
                with open(filepart, "rb") as infile:
                    outfile.write(infile.read())
                os.remove(filepart)
        break
    except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
        retry += 1
        print("%s %s - > retrying %s..." % (type(x), x, retry))
        time.sleep(abs(retry) * 10)
    finally:
        if hasattr(sftp_client, "close") and callable(sftp_client.close):
            sftp_client.close()
        if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
            ssh_conn.close()
print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))
Stack trace:
Exception in thread Thread-4:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "simpleNNInference.py", line 210, in write_chunks
for chunk in infile.readv(chunks):
AttributeError: '_io.BufferedReader' object has no attribute 'readv'
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "simpleNNInference.py", line 210, in write_chunks
for chunk in infile.readv(chunks):
AttributeError: '_io.BufferedReader' object has no attribute 'readv'
For an example of how to do a parallel multipart upload of one large file, see the following code.
Note that most SFTP servers (including OpenSSH until the fairly recent version 9.0) do not allow merging files remotely, so you have to resort to a shell command for that.
import os
import threading
import paramiko

sftp_server = "example.com"
username = "username"
password = "password"

local_path = "/local/path/file.dat"
remote_path = "/remote/path/file.dat"

threads_count = 4

size = os.path.getsize(local_path)
part_size = int(size / threads_count)


def open_ssh():
    ssh = paramiko.SSHClient()
    ssh.connect(sftp_server, username=username, password=password)
    return ssh


def upload_part(num, offset, part_size, remote_path_part):
    print(f"Running thread {num}")
    try:
        ssh = open_ssh()
        sftp = ssh.open_sftp()
        with open(local_path, "rb") as fl:
            fl.seek(offset)
            with sftp.open(remote_path_part, "wb") as fr:
                # pipeline the writes so each 32 kB request is not acknowledged individually
                fr.set_pipelined(True)
                size = 0
                while size < part_size:
                    s = 32768
                    if size + s > part_size:
                        s = part_size - size
                    data = fl.read(s)
                    fr.write(data)
                    size += len(data)
                    if len(data) == 0:
                        break
    except (paramiko.ssh_exception.SSHException) as x:
        print(f"Thread {num} failed: {x}")
    print(f"Thread {num} done")


print("Starting")
offset = 0
threads = []
part_filenames = []
for num in range(threads_count):
    if num == threads_count - 1:
        part_size = size - offset
    remote_path_part = f"{remote_path}.{num}"
    args = (num, offset, part_size, remote_path_part)
    print(f"Starting thread {num} offset {offset} size {part_size} " +
          f"part name {remote_path_part}")
    thread = threading.Thread(target=upload_part, args=args)
    threads.append(thread)
    part_filenames.append(remote_path_part)
    thread.start()
    print(f"Started thread {num}")
    offset += part_size

for num in range(len(threads)):
    print(f"Waiting for thread {num}")
    threads[num].join()

print("All thread done")

# concatenate the uploaded parts on the server and remove them
parts_list = " ".join(part_filenames)
merge_command = \
    f"rm \"{remote_path}\" 2> /dev/null ; " + \
    f"for i in {parts_list} ; do cat \"$i\" >> {remote_path} && " + \
    "rm \"$i\" || break ; done"
print(f"Merge command: {merge_command}")

ssh = open_ssh()
stdin, stdout, stderr = ssh.exec_command(merge_command)
print(stdout.read().decode("utf-8"))
print(stderr.read().decode("utf-8"))
I'm not sure how much that is backed by the SFTP specification, but many SFTP servers, including OpenSSH, allow writing to the same file from multiple connections in parallel. So you can do without merging the files altogether, by uploading directly to the respective parts of the target file:
import os
import threading
import paramiko

sftp_server = "example.com"
port = 22
username = "username"
password = "password"

local_path = "/local/path/file.dat"
remote_path = "/remote/path/file.dat"

threads_count = 4

size = os.path.getsize(local_path)
part_size = int(size / threads_count)

lock = threading.Lock()
created = False


def upload_part(num, offset, part_size):
    print(f"Running thread {num}")
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(sftp_server, port=port, username=username, password=password)
        sftp = ssh.open_sftp()

        with open(local_path, "rb") as fl:
            fl.seek(offset)

            # the first thread to get the lock creates the target file ("w"),
            # the others open the existing file for update ("r+")
            with lock:
                global created
                m = "r+" if created else "w"
                created = True
                fr = sftp.open(remote_path, m)

            with fr:
                fr.seek(offset)
                # pipeline the writes so each 32 kB request is not acknowledged individually
                fr.set_pipelined(True)
                size = 0
                while size < part_size:
                    s = 32768
                    if size + s > part_size:
                        s = part_size - size
                    data = fl.read(s)
                    fr.write(data)
                    size += len(data)
                    if len(data) == 0:
                        break
    except (paramiko.ssh_exception.SSHException) as x:
        print(f"Thread {num} failed: {x}")
    print(f"Thread {num} done")


print("Starting")
offset = 0
threads = []
for num in range(threads_count):
    if num == threads_count - 1:
        part_size = size - offset
    args = (num, offset, part_size)
    print(f"Starting thread {num} offset {offset} size {part_size}")
    thread = threading.Thread(target=upload_part, args=args)
    threads.append(thread)
    thread.start()
    print(f"Started thread {num}")
    offset += part_size

for num in range(len(threads)):
    print(f"Waiting for thread {num}")
    threads[num].join()

print("All thread done")