利用多线程加速下载文件(Python)
需要的库
tqdm
requests
retry
signal
multitasking
完整源码
# import time
import json
from tqdm import tqdm
from typing import List, Tuple
import os
import requests
from retry import retry
import signal
import multitasking
signal.signal(signal.SIGINT, multitasking.killall)
class Download:
def __init__(self):
"""
多线程下载文件
"""
self.__file_size = None # 文件大小/单位b
self.__file_url = None # 文件路径
self.__MB = pow(1024, 2) # 1MB == 1024^2B
self.savePath = "D:\\AppData\\Download" # 保存路径
self.__headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/99.0.4844.74 Safari/537.36 Edg/99.0.1150.46'
}
self.__split_max = 20 # 分块数
self.__file_min_mb = 10 # 每块最小内存
self.__retry_tries = 3 # 每次连接失败重试次数
@property
def split_max(self):
"""
最大线程数
:return:
"""
return self.__split_max
@split_max.setter
def split_max(self, value):
"""
设置最大线程数
:param value:
:return:
"""
if isinstance(value, int) or value.isdigit():
self.__split_max = int(value)
else:
raise ValueError("split_max value not is int")
def download(self, file_url: str, file_name: str):
"""
下载文件
:param file_url: 文件链接
:param file_name: 文件保存名
:return:
"""
# 如果保存目录不存在,创建
if not os.path.exists(self.savePath):
os.makedirs(self.savePath)
self.__file_url = file_url
# 获取文件大小/单位b
self.__file_size = self.__get_file_size()
self.__download(file_name=file_name)
def __download(self, file_name):
@retry(tries=self.__retry_tries)
@multitasking.task
def split_download(start, ending):
"""
分块下载
:param start: 开头
:param ending: 结尾
:return:
"""
# 拷贝,避开线程之间的影响
part_headers = self.__headers.copy()
# 分段下载的核心/设置下载的文件部分
part_headers['Range'] = 'bytes={start}-{ending}'.format(start=start, ending=ending)
# 发起请求并获取响应(流式)
response = session.get(self.__file_url, headers=part_headers, stream=True)
# 每次读取的流式响应大小/128b
data_size = 128
# 暂存已获取的数据
datas = []
for data in response.iter_content(chunk_size=data_size):
# 暂存获取的响应
datas.append(data)
# 更新进度条
download_tqdm.update(data_size)
file.seek(start)
for data in datas:
file.write(data)
# 释放已写入的资源
del datas
# session保持浏览器状态
session = requests.Session()
# 打开文件
file = open(os.path.join(self.savePath, file_name), "wb")
# 创建进度条
download_tqdm = tqdm(total=self.__file_size, desc=f'下载文件[{file_name}]')
# 开始分线下载
for part in self.__split_list():
split_download(part[0], part[1])
# 等待全部线程结束
multitasking.wait_for_tasks()
file.close()
download_tqdm.close()
def __split_list(self) -> List[Tuple[int, int]]:
"""
获取分块列表
:return:
"""
# 获取文件大小/单位mb
file_size = self.__file_size / self.__MB
# 判断文件大小是否小于最小分块内存
if file_size < self.__file_min_mb:
return [(0, self.__file_size)]
# 获取实际线程数
split_sum = round(file_size / self.__file_min_mb)
split_sum = self.__split_max if split_sum > self.__split_max else split_sum
# 获取每块下载的内存大小
split_size = self.__file_size // split_sum
# print("file_size:", self.__file_size)
# print("file_MB:", file_size)
# print("split_sum:", split_sum)
# print("split_size:", split_size)
# print("split_size_mb:", split_size / self.__MB)
# 分块列表
split_list = []
# 获取分块列表
for start in range(0, self.__file_size, split_size):
if start + 2 * split_size > self.__file_size:
ending = self.__file_size
split_list.append((start, ending))
break
else:
ending = start + split_size
split_list.append((start, ending))
return split_list
def __get_file_size(self):
"""
获取文件大小/单位b
:return:
"""
response = requests.head(self.__file_url)
file_size = response.headers.get('Content-Length')
# print(json.dumps(dict(response.headers), indent=4, ensure_ascii=False))
if file_size is None:
raise ValueError('该文件不支持多线程分段下载!')
return eval(file_size)
示例代码
if __name__ == '__main__':
# Anaconda3
# 文件下载路径
url = "https://repo.anaconda.com/archive/Anaconda3-2021.11-Windows-x86_64.exe"
# 文件保存名字
file_name = "Anaconda3-2021.11-Windows-x86_64.exe"
# 实例化
d = Download()
# 设置最大线程数
# d.split_max = 36
d.download(file_url=url, file_name=file_name)
注意事项
1、使用时更改文件下载路径和文件保存的名字
2、文件下载后默认保存在”D:\AppData\Download”下(可以自行更改)
3、下载速度和网速、线程数相关,小于10M的文件单线程下载(可以自行更改)
4、当网速不佳时,开启过多的线程反而会比正常下载更慢