I have written a Python script that downloads a single file over 32 connections where available.
The downloader works fine as long as I never pause it, but after pausing and resuming it won't stop downloading: the progress goes beyond 100%…
Like this:
```
[████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████] 120% completed, paused: False
Download mode: Multi-thread (press Space to pause/resume, press Escape to stop)
1798.08 MiB downloaded, 1489.83 MiB total, -308.25 MiB remaining, download speed: 22.73 MiB/s
Minimum speed: 0.00 MiB/s, average speed: 4.54 MiB/s, maximum speed: 75.00 MiB/s
Task started on 2021-08-09 16:57:03, 00:06:35 elapsed, ETA: -1:59:47
```
After the progress exceeds 100%, error messages like this start to appear:
```
Exception in thread Thread-78:
Traceback (most recent call last):
  File "C:\Program Files\Python39\lib\threading.py", line 973, in _bootstrap_inner
    self.run()
  File "C:\Program Files\Python39\lib\threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "D:\MyScript\downloader.py", line 70, in multidown
    mm[position: position+len(chunk)] = chunk
IndexError: mmap slice assignment is wrong size
```
(The above doesn’t include all of the error message)
I have encountered all sorts of errors after resuming, but the most important one is that the server often keeps sending extra bytes that belong to a previous request whose connection is already dead, and needless to say this breaks the whole download.
How should I implement pause and resume correctly?
I am thinking about multiprocessing: I assume sessions and connections are tied to the PID and port number, and so far a fresh run of the script has never received extra bytes from a previous run. So I guess that using a separate process (with its own PID and port) together with a new `requests.session()` and `{'connection': 'close'}` for each download should guarantee that no stale bytes from previous connections are received. I just don't know how to share variables between processes…
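(For reference, the usual way to share a dict like `self.progress` between processes is `multiprocessing.Manager`; this is only an illustrative sketch, all names in it are made up and it is not part of the script below.)

```python
# Illustrative sketch (not from the script below): sharing a progress dict
# between processes with multiprocessing.Manager. All names here are made up.
from multiprocessing import Manager, Process

def worker(progress, wid):
    # each downloader process can read/write the shared proxy dict directly
    progress[wid] = {'count': 1048576, 'completed': True}

if __name__ == '__main__':
    with Manager() as manager:
        progress = manager.dict()   # proxy object shared across processes
        procs = [Process(target=worker, args=(progress, i)) for i in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        result = all(progress[i]['completed'] for i in range(4))
    print(result)  # True
```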
The code:
downloader.py
```python
import json
import keyboard
import os
import re
import requests
import sys
import time
import validators
from collections import deque
from datetime import datetime, timedelta
from math import inf
from mmap import mmap
from pathlib import Path
from ping3 import ping
from reprint import output
from threading import Thread


def timestring(sec):
    sec = int(sec)
    m, s = divmod(sec, 60)
    h, m = divmod(m, 60)
    return f'{h:02d}:{m:02d}:{s:02d}'


class Downloader:
    def __init__(self):
        self.recent = deque([0] * 12, maxlen=12)
        self.recentspeeds = deque([0] * 200, maxlen=200)
        self.paused = False
        self.progress = dict()

    class Multidown:
        def __init__(self, obj, id):
            self.count = 0
            self.position = 0
            self.completed = False
            self.id = id
            self.parent = obj

        def multidown(self, url, start, end):
            interrupted = False
            s = requests.session()
            s.headers.update({'connection': 'close', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0'})
            r = s.get(url, headers={'range': 'bytes={0}-{1}'.format(start, end)}, stream=True)
            length = int(r.headers['content-length'])
            # retry until the server honours the range request for this segment
            while end - length + (self.id != self.parent.progress['connections'] - 1) != start or r.status_code != 206:
                r.close()
                s.close()
                del r
                del s
                time.sleep(0.02)
                s = requests.session()
                r = s.get(url, headers={'range': 'bytes={0}-{1}'.format(start, end)}, stream=True)
                length = int(r.headers['content-length'])
            self.position = start
            for chunk in r.iter_content(1048576):
                if self.parent.paused:
                    self.parent.mm.flush()
                    r.connection.close()
                    r.close()
                    s.close()
                    del r
                    del s
                    interrupted = True
                    break
                if chunk:
                    self.parent.mm[self.position: self.position + len(chunk)] = chunk
                    self.count += len(chunk)
                    self.position += len(chunk)
                    self.parent.progress[self.id]['count'] = self.count
                    self.parent.progress[self.id]['position'] = self.position
            if not interrupted:
                r.close()
                s.close()
            if self.count == self.parent.progress[self.id]['length']:
                self.completed = True
                self.parent.progress[self.id]['completed'] = True
                self.parent.mm.flush()

    class Singledown:
        def __init__(self):
            self.count = 0

        def singledown(self, url, path):
            with requests.get(url, stream=True) as r:
                with path.open('wb') as file:
                    for chunk in r.iter_content(1048576):
                        if chunk:
                            self.count += len(chunk)
                            file.write(chunk)

    def download(self, url, filepath, num_connections=32, overwrite=False):
        singlethread = False
        threads = []
        bcontinue = False
        filepath = filepath.replace('\\', '/')
        if (not re.match('^[a-zA-Z]:/(((?![<>:"/\\|?*]).)+((?<![ .])/)?)*$', filepath)
                or not Path(filepath[:3]).exists()):
            print('Invalid windows file path has been inputted, process will now stop.')
            return
        if not validators.url(url):
            print('Invalid url has been inputted, process will now stop.')
            return
        if url.lower().startswith('ftp://'):
            print("`requests` module doesn't support File Transfer Protocol, process will now stop.")
            return
        path = Path(filepath)
        if not path.exists():
            bcontinue = True
        else:
            if path.is_file():
                if overwrite:
                    bcontinue = True
                else:
                    while True:
                        answer = input(f'`{filepath}` already exists, do you want to overwrite it? \n(Yes, No):').lower()
                        if answer in ['y', 'yes', 'n', 'no']:
                            if answer.startswith('y'):
                                os.remove(filepath)
                                bcontinue = True
                            break
                        else:
                            print('Invalid input detected, retaking input.')
        if not bcontinue:
            print(f'Overwriting {filepath} has been aborted, process will now stop.')
            return
        bcontinue = False
        server = url.split('/')[2]
        ok = ping(server, timeout=2)
        if ok == False:  # ping3 returns False for a non-existent host, None on timeout
            print('The server of the inputted url is non-existent, process will now stop.')
            return
        if ok:
            bcontinue = True
        if not ok:
            print('Connection has timed out, will reattempt to ping server 5 times.')
            for i in range(5):
                print(f'Reattempting to ping server, retrying {i + 1} out of 5')
                ok = ping(server, timeout=2)
                if ok:
                    print(f'Connection successful on retry {i + 1}, process will now continue.')
                    bcontinue = True
                    break
                else:
                    print(f'Retry {i + 1} out of 5 timed out'
                          + (i != 4) * ', reattempting in 1 second.'
                          + (i == 4) * '.')
                    time.sleep(1)
        if not bcontinue:
            print('Failed to connect server, connection timed out, process will now stop')
            return
        bcontinue = False
        head = requests.head(url)
        if head.status_code == 200:
            bcontinue = True
        else:
            for i in range(5):
                print(f'Server response is invalid, retrying {i + 1} out of 5')
                head = requests.head(url)
                if head.status_code == 200:
                    print(f'Connection successful on retry {i + 1}, process will now continue.')
                    bcontinue = True
                    break
                else:
                    print(f'Retry {i + 1} out of 5 failed to access data'
                          + (i != 4) * ', reattempting in 1 second.'
                          + (i == 4) * '.')
                    time.sleep(1)
        if not bcontinue:
            print("Can't establish a connection with access to data, can't download target file, process will now stop.")
            return
        folder = '/'.join(filepath.split('/')[:-1])
        Path(folder).mkdir(parents=True, exist_ok=True)
        headers = head.headers
        total = headers.get('content-length')
        if not total:
            print(f'Cannot find the total length of the content of {url}, the file will be downloaded using a single thread.')
            started = datetime.now()
            print('Task started on %s.' % started.strftime('%Y-%m-%d %H:%M:%S'))
            sd = self.Singledown()
            th = Thread(target=sd.singledown, args=(url, path))
            threads.append(sd)
            th.start()
            total = inf
            singlethread = True
        else:
            total = int(total)
            if not headers.get('accept-ranges'):
                print('Server does not support the `range` parameter, the file will be downloaded using a single thread.')
                started = datetime.now()
                print('Task started on %s.' % started.strftime('%Y-%m-%d %H:%M:%S'))
                sd = self.Singledown()
                th = Thread(target=sd.singledown, args=(url, path))
                threads.append(sd)
                th.start()
                singlethread = True
            else:
                segment = total / num_connections
                started = datetime.now()
                lastpressed = started
                path.touch()
                # pre-size the file, then mmap it for random-access writes
                file = path.open('wb')
                file.seek(total - 1)
                file.write(b'\0')
                file.close()
                file = path.open(mode='r+b')
                self.mm = mmap(file.fileno(), 0)
                print('Task started on %s.' % started.strftime('%Y-%m-%d %H:%M:%S'))
                self.progress['total'] = total
                self.progress['connections'] = num_connections
                for i in range(num_connections):
                    md = self.Multidown(self, i)
                    start = int(segment * i)
                    end = int(segment * (i + 1)) - (i != num_connections - 1)
                    length = end - start + (i != num_connections - 1)
                    th = Thread(target=md.multidown, args=(url, start, end))
                    threads.append(md)
                    self.progress[i] = dict()
                    self.progress[i]['start'] = start
                    self.progress[i]['position'] = start
                    self.progress[i]['end'] = end
                    self.progress[i]['count'] = 0
                    self.progress[i]['length'] = length
                    self.progress[i]['completed'] = False
                    th.start()
                Path(filepath + '.progress.json').write_text(json.dumps(self.progress, indent=4))
        downloaded = 0
        totalMiB = total / 1048576
        speeds = []
        interval = 0.04
        with output(initial_len=5, interval=0) as dynamic_print:
            while True:
                Path(filepath + '.progress.json').write_text(json.dumps(self.progress, indent=4))
                status = sum([i.completed for i in threads])
                downloaded = sum(i.count for i in threads)
                self.recent.append(downloaded)
                done = int(100 * downloaded / total)
                doneMiB = downloaded / 1048576
                gt0 = len([i for i in self.recent if i])
                if not gt0:
                    speed = 0
                else:
                    recent = list(self.recent)[12 - gt0:]
                    if len(recent) == 1:
                        speed = recent[0] / 1048576 / interval
                    else:
                        diff = [b - a for a, b in zip(recent, recent[1:])]
                        speed = sum(diff) / len(diff) / 1048576 / interval
                speeds.append(speed)
                self.recentspeeds.append(speed)
                nzspeeds = [i for i in speeds if i]
                if nzspeeds:
                    minspeed = min(nzspeeds)
                else:
                    minspeed = 0
                maxspeed = max(speeds)
                now = datetime.now()
                elapsed = (now - started).total_seconds()
                meanspeed = downloaded / elapsed / 1048576
                remaining = totalMiB - doneMiB
                dynamic_print[0] = ('[{0}{1}] {2}'.format('\u2588' * done, '\u00b7' * (100 - done), str(done))
                                    + '% completed'
                                    + (not singlethread) * ', paused: {0}'.format(self.paused))
                dynamic_print[1] = ('Download mode: ' + singlethread * 'Single-thread'
                                    + (not singlethread) * 'Multi-thread (press Space to pause/resume, press Escape to stop)')
                dynamic_print[2] = '{0:.2f} MiB downloaded, {1:.2f} MiB total, {2:.2f} MiB remaining, download speed: {3:.2f} MiB/s'.format(doneMiB, totalMiB, remaining, speed)
                if speed and total != inf:
                    eta = timestring(remaining / speed)
                else:
                    eta = '99:59:59'
                dynamic_print[3] = 'Minimum speed: {0:.2f} MiB/s, average speed: {1:.2f} MiB/s, maximum speed: {2:.2f} MiB/s'.format(minspeed, meanspeed, maxspeed)
                dynamic_print[4] = 'Task started on {0}, {1} elapsed, ETA: {2}'.format(started.strftime('%Y-%m-%d %H:%M:%S'), timestring(elapsed), eta)
                if keyboard.is_pressed('space'):
                    if not singlethread:
                        pressed = datetime.now()
                        if (pressed - lastpressed).total_seconds() > 0.5:
                            lastpressed = pressed
                            if self.paused:
                                for i, md in enumerate(threads):
                                    if not md.completed:
                                        th = Thread(target=md.multidown, args=(url, self.progress[i]['position'], self.progress[i]['end']))
                                        th.start()
                            self.paused = not self.paused
                if keyboard.is_pressed('esc'):
                    if not singlethread:
                        ended = datetime.now()
                        self.paused = True
                        break
                if status == len(threads):
                    if not singlethread:
                        self.mm.close()
                    ended = datetime.now()
                    break
                time.sleep(interval)
        time_spent = (ended - started).total_seconds()
        meanspeed = total / time_spent / 1048576
        status = sum([i.completed for i in threads])
        if status == len(threads):
            print('Task completed on {0}, total time elapsed: {1}, average speed: {2:.2f} MiB/s'.format(ended.strftime('%Y-%m-%d %H:%M:%S'), timestring(time_spent), meanspeed))
        else:
            print('Task interrupted on {0}, total time elapsed: {1}, average speed: {2:.2f} MiB/s'.format(ended.strftime('%Y-%m-%d %H:%M:%S'), timestring(time_spent), meanspeed))


if __name__ == '__main__':
    d = Downloader()
    d.download(*sys.argv[1:])
```
For testing purposes, here is a dumbed-down version of the script, with all the checks removed but the same core functionality (sorry, it really takes this many lines to show the download information):
```python
import json
import os
import requests
import sys
import time
from collections import deque
from datetime import datetime, timedelta
from math import inf
from mmap import mmap
from pathlib import Path
from reprint import output
from threading import Thread


def timestring(sec):
    sec = int(sec)
    m, s = divmod(sec, 60)
    h, m = divmod(m, 60)
    return f'{h:02d}:{m:02d}:{s:02d}'


class Downloader:
    def __init__(self):
        self.recent = deque([0] * 12, maxlen=12)
        self.recentspeeds = deque([0] * 200, maxlen=200)
        self.paused = False
        self.progress = dict()
        self.UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0'

    class Multidown:
        def __init__(self, obj, id):
            self.count = 0
            self.position = 0
            self.completed = False
            self.id = id
            self.parent = obj
            self.UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0'

        def multidown(self, url, start, end):
            interrupted = False
            s = requests.session()
            s.headers.update({'connection': 'close', 'user-agent': self.UA})
            r = s.get(url, headers={'range': 'bytes={0}-{1}'.format(start, end)}, stream=True)
            length = int(r.headers['content-length'])
            while end - length + (self.id != self.parent.progress['connections'] - 1) != start or r.status_code != 206:
                r.close()
                s.close()
                del r
                del s
                time.sleep(0.02)
                s = requests.session()
                r = s.get(url, headers={'range': 'bytes={0}-{1}'.format(start, end)}, stream=True)
                length = int(r.headers['content-length'])
            self.position = start
            for chunk in r.iter_content(1048576):
                if self.parent.paused:
                    self.parent.mm.flush()
                    r.connection.close()
                    r.close()
                    s.close()
                    del r
                    del s
                    interrupted = True
                    break
                if chunk:
                    self.parent.mm[self.position: self.position + len(chunk)] = chunk
                    self.count += len(chunk)
                    self.position += len(chunk)
                    self.parent.progress[self.id]['count'] = self.count
                    self.parent.progress[self.id]['position'] = self.position
            if not interrupted:
                r.close()
                s.close()
            if self.count == self.parent.progress[self.id]['length']:
                self.completed = True
                self.parent.progress[self.id]['completed'] = True
                self.parent.mm.flush()

    def download(self, url, filepath, num_connections=32, overwrite=False):
        singlethread = False
        threads = []
        bcontinue = False
        filepath = filepath.replace('\\', '/')
        if Path(filepath).exists():
            os.remove(filepath)
        folder = '/'.join(filepath.split('/')[:-1])
        Path(folder).mkdir(parents=True, exist_ok=True)
        head = requests.head(url, headers={'user-agent': self.UA})
        path = Path(filepath)
        headers = head.headers
        total = headers.get('content-length')
        if total:
            total = int(total)
            if headers.get('accept-ranges'):
                segment = total / num_connections
                started = datetime.now()
                lastpressed = started
                path.touch()
                # pre-size the file, then mmap it for random-access writes
                file = path.open('wb')
                file.seek(total - 1)
                file.write(b'\0')
                file.close()
                file = path.open(mode='r+b')
                self.mm = mmap(file.fileno(), 0)
                print('Task started on %s.' % started.strftime('%Y-%m-%d %H:%M:%S'))
                self.progress['total'] = total
                self.progress['connections'] = num_connections
                for i in range(num_connections):
                    md = self.Multidown(self, i)
                    start = int(segment * i)
                    end = int(segment * (i + 1)) - (i != num_connections - 1)
                    length = end - start + (i != num_connections - 1)
                    th = Thread(target=md.multidown, args=(url, start, end))
                    threads.append(md)
                    self.progress[i] = dict()
                    self.progress[i]['start'] = start
                    self.progress[i]['position'] = start
                    self.progress[i]['end'] = end
                    self.progress[i]['count'] = 0
                    self.progress[i]['length'] = length
                    self.progress[i]['completed'] = False
                    th.start()
                Path(filepath + '.progress.json').write_text(json.dumps(self.progress, indent=4))
        downloaded = 0
        totalMiB = total / 1048576
        speeds = []
        interval = 0.04
        with output(initial_len=5, interval=0) as dynamic_print:
            while True:
                Path(filepath + '.progress.json').write_text(json.dumps(self.progress, indent=4))
                status = sum([i.completed for i in threads])
                downloaded = sum(i.count for i in threads)
                self.recent.append(downloaded)
                done = int(100 * downloaded / total)
                doneMiB = downloaded / 1048576
                gt0 = len([i for i in self.recent if i])
                if not gt0:
                    speed = 0
                else:
                    recent = list(self.recent)[12 - gt0:]
                    if len(recent) == 1:
                        speed = recent[0] / 1048576 / interval
                    else:
                        diff = [b - a for a, b in zip(recent, recent[1:])]
                        speed = sum(diff) / len(diff) / 1048576 / interval
                speeds.append(speed)
                self.recentspeeds.append(speed)
                nzspeeds = [i for i in speeds if i]
                if nzspeeds:
                    minspeed = min(nzspeeds)
                else:
                    minspeed = 0
                maxspeed = max(speeds)
                now = datetime.now()
                elapsed = (now - started).total_seconds()
                meanspeed = downloaded / elapsed / 1048576
                remaining = totalMiB - doneMiB
                dynamic_print[0] = ('[{0}{1}] {2}'.format('\u2588' * done, '\u00b7' * (100 - done), str(done))
                                    + '% completed'
                                    + (not singlethread) * ', paused: {0}'.format(self.paused))
                dynamic_print[1] = ('Download mode: ' + singlethread * 'Single-thread'
                                    + (not singlethread) * 'Multi-thread (press Space to pause/resume, press Escape to stop)')
                dynamic_print[2] = '{0:.2f} MiB downloaded, {1:.2f} MiB total, {2:.2f} MiB remaining, download speed: {3:.2f} MiB/s'.format(doneMiB, totalMiB, remaining, speed)
                if speed and total != inf:
                    eta = timestring(remaining / speed)
                else:
                    eta = '99:59:59'
                dynamic_print[3] = 'Minimum speed: {0:.2f} MiB/s, average speed: {1:.2f} MiB/s, maximum speed: {2:.2f} MiB/s'.format(minspeed, meanspeed, maxspeed)
                dynamic_print[4] = 'Task started on {0}, {1} elapsed, ETA: {2}'.format(started.strftime('%Y-%m-%d %H:%M:%S'), timestring(elapsed), eta)
                if PAUSE:
                    if not singlethread:
                        pressed = datetime.now()
                        if (pressed - lastpressed).total_seconds() > 0.5:
                            lastpressed = pressed
                            if self.paused:
                                for i, md in enumerate(threads):
                                    if not md.completed:
                                        th = Thread(target=md.multidown, args=(url, self.progress[i]['position'], self.progress[i]['end']))
                                        th.start()
                            self.paused = not self.paused
                if status == len(threads):
                    if not singlethread:
                        self.mm.close()
                    ended = datetime.now()
                    break
                time.sleep(interval)
        time_spent = (ended - started).total_seconds()
        meanspeed = total / time_spent / 1048576
        status = sum([i.completed for i in threads])
        if status == len(threads):
            print('Task completed on {0}, total time elapsed: {1}, average speed: {2:.2f} MiB/s'.format(ended.strftime('%Y-%m-%d %H:%M:%S'), timestring(time_spent), meanspeed))
        else:
            print('Task interrupted on {0}, total time elapsed: {1}, average speed: {2:.2f} MiB/s'.format(ended.strftime('%Y-%m-%d %H:%M:%S'), timestring(time_spent), meanspeed))


if __name__ == '__main__':
    import hashlib
    global PAUSE
    PAUSE = False
    chash = '5674E59283D95EFE8C88770515A9BBC80CBB77CB67602389FD91DEF26D26AED2'
    d = Downloader()
    if sys.argv[1] == '0':
        d.download('http://ipv4.download.thinkbroadband.com/1GB.zip', 'C:/test/1GB.zip')
    elif sys.argv[1] == '1':
        th1 = Thread(target=d.download, args=('http://ipv4.download.thinkbroadband.com/1GB.zip', 'C:/test/1GB.zip'))
        th1.start()

        def test():
            while th1.is_alive():
                global PAUSE
                PAUSE = not PAUSE
                time.sleep(10)

        th2 = Thread(target=test)
        th2.start()
        while th1.is_alive():
            pass
    sha256_hash = hashlib.sha256()
    with open('C:/test/1GB.zip', 'rb') as f:
        for byte_block in iter(lambda: f.read(1048576), b''):
            sha256_hash.update(byte_block)
    print(sha256_hash.hexdigest().lower() == chash.lower())
```
The URL isn't accessible without a VPN in my locale. Test `0` always prints `True` (provided the connection doesn't die mid-download), while test `1` sometimes prints `True`, sometimes prints `False`, and sometimes doesn't finish at all (the progress bar goes beyond 100%)…
How can my code be salvaged?
Answer
This might not be your only problem, but you have a race condition that can show up if you pause and resume quickly (where the definition of "quickly" varies greatly with your circumstances). Consider that you have 32 threads, each requesting a 1 MiB chunk; call them threads 0-31. They are sitting there downloading when you pause. The threads do not know that you paused until they receive a chunk of data, because they are sitting in blocking I/O. Depending on your connection speed and how many cores your machine has (threads will sometimes act in parallel when they don't need the GIL), this can take a lot longer than you expect. Then you unpause, and your code creates new threads 32-63 while some or all of threads 0-31 are still waiting for their next chunk. You set threads 32-63 in motion and then turn off your pause flag. Whichever of threads 0-31 didn't end then wake up, see that things aren't paused, and carry on. Now you have multiple threads accessing the same state variables:
```python
self.parent.mm[self.position: self.position + len(chunk)] = chunk
self.count += len(chunk)
self.position += len(chunk)
self.parent.progress[self.id]['count'] = self.count
self.parent.progress[self.id]['position'] = self.position
```
so if old thread 0 is downloading the same chunk as its replacement, thread 32, they both keep updating the same state, and each adds to `position` and `count` even though they are downloading overlapping parts of the file. You even reuse the objects the threads live inside, so that state can get thoroughly messed up:
```python
for i, md in enumerate(threads):
    if not md.completed:
        th = Thread(target=md.multidown, args=(url, self.progress[i]['position'], self.progress[i]['end']))
        th.start()
```
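The overshoot doesn't need real network timing to reproduce; it's enough that two threads are handed the same byte range. A toy sketch (fake 100-byte segment, no networking; names are illustrative):

```python
# Toy sketch of the double-count: two threads are given the SAME segment
# (as happens when a resumed thread overlaps a still-alive old thread),
# so the shared counter ends up at 200% of the segment size.
from threading import Thread

state = {'count': 0}

def fake_multidown(chunks):
    for chunk in chunks:
        state['count'] += len(chunk)   # both threads bump the same counter

segment = [b'x' * 10] * 10             # one 100-byte "download" segment

old = Thread(target=fake_multidown, args=(segment,))   # the pre-pause thread
new = Thread(target=fake_multidown, args=(segment,))   # the post-resume thread
old.start(); old.join()
new.start(); new.join()
print(state['count'])  # 200 -- "progress" is twice the 100-byte segment
```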
There might be other problems in your code, and it is a lot to sort through, so I suggest taking the time to refactor: eliminate duplicate code and organise things into more functions. I don't believe in crazy tiny functions, but you could use a few sub-functions such as `download_multi(download_state)` and `download_single`. I am fairly confident, however, that your current problem will be solved if you make sure the threads you have running actually end after you pause. To do that, you need to hold references to your threads somewhere:
```python
actual_threads = []
```
When you create your threads (both the first time and after you unpause; preferably this would live in a function that creates them and returns the list):
```python
th = Thread(target=md.multidown, args=(url, start, end))
threads.append(md)
actual_threads.append(th)
```
Then when you unpause:
```python
if self.paused:
    for th in actual_threads:
        th.join()          # wait for the old threads to really exit first
    actual_threads.clear()
    # ...then spawn the replacement threads as before...
self.paused = not self.paused
```
This way the threads do their work, exit when you pause, and are rebuilt on resume. `join` returns as soon as each thread breaks out of the blocking I/O call in `iter_content`, so the old threads are guaranteed dead before you make the new ones.
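As a minimal sketch of that pattern (a fake chunk loop stands in for `requests`/`iter_content`, and a `threading.Event` stands in for the pause flag; all names are illustrative):

```python
# Sketch of pause -> join -> respawn, with a fake chunk source standing in
# for the real download. The pause flag is a threading.Event here.
import threading
import time

pause = threading.Event()
progress = {'position': 0}
TOTAL = 100

def worker():
    while progress['position'] < TOTAL:
        if pause.is_set():              # noticed the pause: exit the thread
            return
        progress['position'] += 10      # "write one chunk" and record it
        time.sleep(0.005)

def spawn():
    t = threading.Thread(target=worker)
    t.start()
    return t

t = spawn()
time.sleep(0.02)
pause.set()                             # request the pause...
t.join()                                # ...and wait until the thread is DEAD
pause.clear()
t = spawn()                             # safe: no old thread left to race with
t.join()
print(progress['position'])  # 100 -- no double-counting past the total
```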
What I would do myself, however, is create a socket from each thread to the main process. When a pause is detected, each thread shuts down its request, saves any data already waiting in the OS buffer, and then goes into a blocking receive on its socket (there may be a way to use `select` with a socket and `requests` to break out of the blocking I/O in `r.iter_content` immediately, but I leave that for your research). When the program is unpaused, the main process sends a value telling each thread to restart. You'd want at least two signals the threads recognise, one for quitting gracefully and one for resuming; the codes can be single characters. When the value arrives, the thread unblocks and can restart the download using `requests` and its previous state as if nothing happened.
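Roughly, the control channel could look like this for one worker (`socket.socketpair` stands in for the per-thread sockets, the download loop is faked, and all names are illustrative, not a full design):

```python
# Sketch of the socket-control idea for ONE worker thread: the worker parks
# itself in a blocking recv() while paused; single-byte codes from the main
# thread ('r' = resume, 'q' = quit) unblock it. The download itself is faked.
import socket
import threading

def worker(ctrl):
    downloaded = 0
    paused_at = 3                       # pretend a pause arrives after 3 chunks
    while downloaded < 5:
        downloaded += 1                 # "download" one chunk
        if downloaded == paused_at:
            cmd = ctrl.recv(1)          # blocking receive: parked until signalled
            if cmd == b'q':             # graceful quit
                break
            # cmd == b'r': resume from the saved state as if nothing happened
    ctrl.sendall(bytes([downloaded]))   # report how far we got

main_end, worker_end = socket.socketpair()
t = threading.Thread(target=worker, args=(worker_end,))
t.start()
main_end.sendall(b'r')                  # resume code un-parks the worker
t.join()
final = main_end.recv(1)[0]
print(final)  # 5 -- the worker finished its segment after resuming
```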