加载中...

利用多线程加速下载文件(Python)


利用多线程加速下载文件(Python)

需要的库

tqdm
requests
retry
signal
multitasking

完整源码

# import time
import json
from tqdm import tqdm
from typing import List, Tuple
import os
import requests
from retry import retry
import signal
import multitasking

signal.signal(signal.SIGINT, multitasking.killall)


class Download:
    def __init__(self):
        """
        多线程下载文件
        """
        self.__file_size = None     # 文件大小/单位b
        self.__file_url = None  # 文件路径
        self.__MB = pow(1024, 2)  # 1MB == 1024^2B
        self.savePath = "D:\\AppData\\Download"  # 保存路径
        self.__headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/99.0.4844.74 Safari/537.36 Edg/99.0.1150.46'
        }
        self.__split_max = 20  # 分块数
        self.__file_min_mb = 10  # 每块最小内存
        self.__retry_tries = 3  # 每次连接失败重试次数

    @property
    def split_max(self):
        """
        最大线程数
        :return:
        """
        return self.__split_max

    @split_max.setter
    def split_max(self, value):
        """
        设置最大线程数
        :param value:
        :return:
        """
        if isinstance(value, int) or value.isdigit():
            self.__split_max = int(value)
        else:
            raise ValueError("split_max value not is int")

    def download(self, file_url: str, file_name: str):
        """
        下载文件
        :param file_url: 文件链接
        :param file_name: 文件保存名
        :return:
        """
        # 如果保存目录不存在,创建
        if not os.path.exists(self.savePath):
            os.makedirs(self.savePath)
        self.__file_url = file_url
        # 获取文件大小/单位b
        self.__file_size = self.__get_file_size()
        self.__download(file_name=file_name)

    def __download(self, file_name):
        @retry(tries=self.__retry_tries)
        @multitasking.task
        def split_download(start, ending):
            """
            分块下载
            :param start: 开头
            :param ending: 结尾
            :return:
            """
            # 拷贝,避开线程之间的影响
            part_headers = self.__headers.copy()
            # 分段下载的核心/设置下载的文件部分
            part_headers['Range'] = 'bytes={start}-{ending}'.format(start=start, ending=ending)
            # 发起请求并获取响应(流式)
            response = session.get(self.__file_url, headers=part_headers, stream=True)
            # 每次读取的流式响应大小/128b
            data_size = 128
            # 暂存已获取的数据
            datas = []
            for data in response.iter_content(chunk_size=data_size):
                # 暂存获取的响应
                datas.append(data)
                # 更新进度条
                download_tqdm.update(data_size)
            file.seek(start)
            for data in datas:
                file.write(data)
            # 释放已写入的资源
            del datas

        # session保持浏览器状态
        session = requests.Session()
        # 打开文件
        file = open(os.path.join(self.savePath, file_name), "wb")
        # 创建进度条
        download_tqdm = tqdm(total=self.__file_size, desc=f'下载文件[{file_name}]')
        # 开始分线下载
        for part in self.__split_list():
            split_download(part[0], part[1])
        # 等待全部线程结束
        multitasking.wait_for_tasks()
        file.close()
        download_tqdm.close()

    def __split_list(self) -> List[Tuple[int, int]]:
        """
        获取分块列表
        :return:
        """
        # 获取文件大小/单位mb
        file_size = self.__file_size / self.__MB
        # 判断文件大小是否小于最小分块内存
        if file_size < self.__file_min_mb:
            return [(0, self.__file_size)]
        # 获取实际线程数
        split_sum = round(file_size / self.__file_min_mb)
        split_sum = self.__split_max if split_sum > self.__split_max else split_sum
        # 获取每块下载的内存大小
        split_size = self.__file_size // split_sum

        # print("file_size:", self.__file_size)
        # print("file_MB:", file_size)
        # print("split_sum:", split_sum)
        # print("split_size:", split_size)
        # print("split_size_mb:", split_size / self.__MB)

        # 分块列表
        split_list = []
        # 获取分块列表
        for start in range(0, self.__file_size, split_size):
            if start + 2 * split_size > self.__file_size:
                ending = self.__file_size
                split_list.append((start, ending))
                break
            else:
                ending = start + split_size
                split_list.append((start, ending))
        return split_list

    def __get_file_size(self):
        """
        获取文件大小/单位b
        :return:
        """
        response = requests.head(self.__file_url)
        file_size = response.headers.get('Content-Length')
        # print(json.dumps(dict(response.headers), indent=4, ensure_ascii=False))
        if file_size is None:
            raise ValueError('该文件不支持多线程分段下载!')
        return eval(file_size)

示例代码

if __name__ == '__main__':
    # Anaconda3
    # 文件下载路径
    url = "https://repo.anaconda.com/archive/Anaconda3-2021.11-Windows-x86_64.exe"
    # 文件保存名字
    file_name = "Anaconda3-2021.11-Windows-x86_64.exe"
    # 实例化
    d = Download()
    # 设置最大线程数
    # d.split_max = 36
    d.download(file_url=url, file_name=file_name)

注意事项

1、使用时更改文件下载路径和文件保存的名字

2、文件下载后默认保存在”D:\AppData\Download”下(可以自行更改)

3、下载速度和网速、线程数相关,小于10M的文件单线程下载(可以自行更改)

4、当网速不佳时,开启过多的线程反而会比正常下载更慢


文章作者: 知蝉
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 知蝉 !
评论
  目录