I am trying to SFTP a file to a remote server in chunks using threads and the Python paramiko library.
The script opens a local file and SFTPs chunks of it to the remote server in different threads.
I am basically following this solution, which uses the same approach to download a large file over SFTP; I would like to send large files instead: Downloading solution
However, in write_chunks(), on the line for chunk in infile.readv(chunks):, I am getting this error:
AttributeError: '_io.BufferedReader' object has no attribute 'readv'
Could anybody assist with this error, please? I thought that infile was a file descriptor; I don't understand why it is an _io.BufferedReader object.
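readv() is a method of Paramiko's SFTPFile objects (the linked download solution presumably calls it on a remote file opened through the SFTP client), whereas the built-in open() used here returns a regular _io.BufferedReader, which has no such method; hence the AttributeError. A minimal sketch of the equivalent behaviour for a local file, reading each (offset, length) pair with seek()/read(), could look like this (the helper name read_local_chunks is purely illustrative):

def read_local_chunks(path, chunks):
    # chunks is a list of (offset, length) tuples, the same shape SFTPFile.readv() expects
    with open(path, "rb") as f:
        for offset, length in chunks:
            f.seek(offset)
            yield f.read(length)

# hypothetical usage inside write_chunks(), replacing the failing infile.readv(chunks) call:
# for chunk in read_local_chunks(local_file, chunks):
#     outfile.write(chunk)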
import threading, os, time, paramiko
import time, paramiko

MAX_RETRIES = 10

ftp_server = "server.com"
port = 22
remote_file = "/home/filecopy.bin"
local_file = "/home/file.bin"
ssh_conn = sftp_client = None
username = "none"
password = "none"

#you could make the number of threads relative to file size
NUM_THREADS = 2
MAX_RETRIES = 10

def make_filepart_path(file_path, part_number):
    """creates filepart path from filepath"""
    return "%s.filepart.%s" % (file_path, part_number+1)

def write_chunks(chunks, tnum, remote_file_part, username, password, ftp_server, max_retries):
    ssh_conn = sftp_client = None
    for retry in range(max_retries):
        try:
            ssh_conn = paramiko.Transport((ftp_server, port))
            ssh_conn.connect(username=username, password=password)
            sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
            with sftp_client.open(remote_file_part, "wb") as outfile:
                with open(local_file, "rb") as infile:
                    for chunk in infile.readv(chunks):
                        outfile.write(chunk)
            break
        except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
            retry += 1
            print("%s %s Thread %s - > retrying %s..." % (type(x), x, tnum, retry))
            time.sleep(abs(retry) * 10)
        finally:
            if hasattr(sftp_client, "close") and callable(sftp_client.close):
                sftp_client.close()
            if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
                ssh_conn.close()


start_time = time.time()

for retry in range(MAX_RETRIES):
    try:
        ssh_conn = paramiko.Transport((ftp_server, port))
        ssh_conn.connect(username=username, password=password)
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)

        # connect to get the file's size in order to calculate chunks
        #filesize = sftp_client.stat(remote_file).st_size
        filesize = os.stat(local_file).st_size
        sftp_client.close()
        ssh_conn.close()

        chunksize = pow(2, 12)
        chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
        thread_chunk_size = (len(chunks) // NUM_THREADS) + 1

        # break the chunks into sub lists to hand off to threads
        thread_chunks = [chunks[i:i+thread_chunk_size] for i in range(0, len(chunks) - 1, thread_chunk_size)]

        threads = []
        fileparts = []
        for thread_num in range(len(thread_chunks)):
            remote_file_part = make_filepart_path(remote_file, thread_num)
            args = (thread_chunks[thread_num], thread_num, remote_file_part, username, password, ftp_server, MAX_RETRIES)
            threads.append(threading.Thread(target=write_chunks, args=args))
            fileparts.append(remote_file_part)

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        # join file parts into one file, remove fileparts
        with sftp_client.open(remote_file_part, "wb") as outfile:
            for filepart in fileparts:
                with open(filepart, "rb") as infile:
                    outfile.write(infile.read())
                os.remove(filepart)
        break
    except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
        retry += 1
        print("%s %s - > retrying %s..." % (type(x), x, retry))
        time.sleep(abs(retry) * 10)
    finally:
        if hasattr(sftp_client, "close") and callable(sftp_client.close):
            sftp_client.close()
        if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
            ssh_conn.close()

print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))
Stack trace:
Exception in thread Thread-4:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "simpleNNInference.py", line 210, in write_chunks
    for chunk in infile.readv(chunks):
AttributeError: '_io.BufferedReader' object has no attribute 'readv'

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "simpleNNInference.py", line 210, in write_chunks
    for chunk in infile.readv(chunks):
AttributeError: '_io.BufferedReader' object has no attribute 'readv'
Answer
For an example of how to do a parallel multipart upload of one large file, see the following example.
Note that most SFTP servers (including OpenSSH) do not allow merging files remotely, so you have to resort to a shell command for that.
import os
import threading
import paramiko

sftp_server = "example.com"
username = "username"
password = "password"

local_path = "/local/path/file.dat"
remote_path = "/remote/path/file.dat"

threads_count = 4

size = os.path.getsize(local_path)
part_size = int(size / threads_count)

def open_ssh():
    # host-key handling is omitted here; load known hosts or set a host key policy as needed
    ssh = paramiko.SSHClient()
    ssh.connect(sftp_server, username=username, password=password)
    return ssh

def upload_part(num, offset, part_size, remote_path_part):
    print(f"Running thread {num}")
    try:
        ssh = open_ssh()
        sftp = ssh.open_sftp()
        with open(local_path, "rb") as fl:
            fl.seek(offset)
            with sftp.open(remote_path_part, "wb") as fr:
                fr.set_pipelined(True)
                size = 0
                while size < part_size:
                    s = 32768
                    if size + s > part_size:
                        s = part_size - size
                    data = fl.read(s)
                    fr.write(data)
                    size += len(data)
                    if len(data) == 0:
                        break
    except (paramiko.ssh_exception.SSHException) as x:
        print(f"Thread {num} failed: {x}")
    print(f"Thread {num} done")

print("Starting")
offset = 0
threads = []
part_filenames = []
for num in range(threads_count):
    if num == threads_count - 1:
        part_size = size - offset
    remote_path_part = f"{remote_path}.{num}"
    args = (num, offset, part_size, remote_path_part)
    print(f"Starting thread {num} offset {offset} size {part_size} " +
          f"part name {remote_path_part}")
    thread = threading.Thread(target=upload_part, args=args)
    threads.append(thread)
    part_filenames.append(remote_path_part)
    thread.start()
    print(f"Started thread {num}")
    offset += part_size

for num in range(len(threads)):
    print(f"Waiting for thread {num}")
    threads[num].join()

print("All threads done")

parts_list = " ".join(part_filenames)
merge_command = (
    f"rm \"{remote_path}\" 2> /dev/null ; " +
    f"for i in {parts_list} ; do cat \"$i\" >> {remote_path} && " +
    "rm \"$i\" || break ; done")
print(f"Merge command: {merge_command}")

ssh = open_ssh()
stdin, stdout, stderr = ssh.exec_command(merge_command)
print(stdout.read().decode("utf-8"))
print(stderr.read().decode("utf-8"))
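As an added sanity check (not part of the example above), you can compare the size of the merged remote file against the local file over the same connection:

# assumes the ssh connection from the merge step above is still open
sftp = ssh.open_sftp()
remote_size = sftp.stat(remote_path).st_size
print(f"local size {size}, remote size {remote_size}, match: {remote_size == size}")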
I'm not sure how much of this is backed by the SFTP specification, but many SFTP servers, including OpenSSH, allow writing to the same file from multiple connections in parallel. So you can even do without merging the files, by uploading directly to the respective parts of the target file:
import os
import threading
import paramiko

sftp_server = "example.com"
username = "username"
password = "password"

local_path = "/local/path/file.dat"
remote_path = "/remote/path/file.dat"

threads_count = 4

size = os.path.getsize(local_path)
part_size = int(size / threads_count)

lock = threading.Lock()
created = False

def upload_part(num, offset, part_size):
    print(f"Running thread {num}")
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(sftp_server, username=username, password=password)
        sftp = ssh.open_sftp()
        with open(local_path, "rb") as fl:
            fl.seek(offset)
            with lock:
                global created
                # only the first thread creates (and truncates) the target file;
                # the others open the existing file for in-place writing
                m = "r+" if created else "w"
                created = True
                fr = sftp.open(remote_path, m)
            with fr:
                fr.seek(offset)
                fr.set_pipelined(True)
                size = 0
                while size < part_size:
                    s = 32768
                    if size + s > part_size:
                        s = part_size - size
                    data = fl.read(s)
                    fr.write(data)
                    size += len(data)
                    if len(data) == 0:
                        break
    except (paramiko.ssh_exception.SSHException) as x:
        print(f"Thread {num} failed: {x}")
    print(f"Thread {num} done")

print("Starting")
offset = 0
threads = []
for num in range(threads_count):
    if num == threads_count - 1:
        part_size = size - offset
    args = (num, offset, part_size)
    print(f"Starting thread {num} offset {offset} size {part_size}")
    thread = threading.Thread(target=upload_part, args=args)
    threads.append(thread)
    thread.start()
    print(f"Started thread {num}")
    offset += part_size

for num in range(len(threads)):
    print(f"Waiting for thread {num}")
    threads[num].join()

print("All threads done")
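A note on the design of this second variant: the lock and the created flag ensure that only the first thread to reach sftp.open() creates and truncates the target file with mode "w", while every later thread opens the already existing file with "r+" and seeks to its own offset, so no thread wipes out another thread's part. fr.set_pipelined(True) lets Paramiko keep issuing write requests without waiting for the server's reply to each one, which is what keeps the per-thread loop fast on high-latency links.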