大数据

Python 文件下载功能

import os
import requests
from urllib.parse import urlsplit


# Extension appended to a filename that has none, keyed by response MIME type.
_MIME_EXTENSIONS = {
    'image/png': '.png',
    'application/pdf': '.pdf',
    'application/msword': '.doc',
    'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
    'text/plain': '.txt',
}

# Name used when neither the response headers nor the URL yield a filename.
_DEFAULT_FILENAME = 'downloaded_file'


def _filename_from_content_disposition(header: str) -> str:
    """Return the plain ``filename`` parameter of a Content-Disposition value, or ''."""
    # Simple parameter scan: splitting on ';' ignores the rare case of a
    # quoted filename that itself contains a semicolon.
    for part in header.split(';'):
        key, sep, value = part.strip().partition('=')
        if sep and key.strip().lower() == 'filename':
            return value.strip().strip('"\'')
    return ''


def _safe_basename(name: str) -> str:
    """Drop any directory components from *name*; return '' if nothing usable remains."""
    # The name may come from the remote server, so it must never be able to
    # steer the write outside the target directory ("../x", "C:\\x", ...).
    base = os.path.basename(name.replace('\\', '/'))
    return '' if base in ('', '.', '..') else base


def download_file(
        url: str,
        save_path: "str | None",
        chunk_size: int = 8192,
        timeout: int = 10
) -> str:
    """
    Download a file over HTTP(S) and stream it to disk in chunks.

    The filename is taken from the ``Content-Disposition`` response header
    when present, otherwise from the last segment of the URL path, otherwise
    ``downloaded_file``. If that name has no extension, one is appended for
    the handful of MIME types listed in ``_MIME_EXTENSIONS``.

    :param url: address of the file to download
    :param save_path: where to save the file. An existing directory, or a
        string ending in a path separator, is treated as a directory and the
        detected filename is placed inside it; any other non-empty value is
        used as the full target path; an empty value or ``None`` saves under
        the detected filename relative to the current working directory
    :param chunk_size: size in bytes of each streamed chunk
    :param timeout: request timeout in seconds, passed to ``requests``
    :return: the path the file was written to
    :raises RuntimeError: if the HTTP request fails (including non-2xx
        status codes) or the file cannot be written; the original exception
        is chained as ``__cause__``

    Usage:
    >>> download_file("https://example.com/image.png", "./downloads/")  # doctest: +SKIP
    >>> download_file("https://example.com/doc.pdf", "/tmp/report.pdf")  # doctest: +SKIP
    """
    with requests.Session() as session:
        try:
            # Using the response as a context manager releases the streamed
            # connection even when an error interrupts the download.
            with session.get(url, stream=True, timeout=timeout) as response:
                response.raise_for_status()  # turn HTTP error statuses into exceptions

                # Prefer the server-declared name, then the URL path, then a default.
                header_name = _filename_from_content_disposition(
                    response.headers.get('Content-Disposition', '')
                )
                filename = (
                    _safe_basename(header_name)
                    or _safe_basename(urlsplit(url).path)
                    or _DEFAULT_FILENAME
                )

                # Only add an extension when the name has none of its own.
                if not os.path.splitext(filename)[1]:
                    content_type = (
                        response.headers.get('Content-Type', '')
                        .split(';')[0]
                        .strip()
                        .lower()
                    )
                    filename += _MIME_EXTENSIONS.get(content_type, '')

                # Resolve the final target path.
                if not save_path:
                    filepath = filename
                else:
                    # A trailing separator signals a directory that may not exist yet.
                    names_directory = (
                        isinstance(save_path, str)
                        and save_path.endswith(('/', os.sep))
                    )
                    if names_directory or os.path.isdir(save_path):
                        filepath = os.path.join(save_path, filename)
                    else:
                        filepath = save_path

                # A bare filename has no parent component, and os.makedirs('')
                # would raise, so only create the parent when there is one.
                parent_dir = os.path.dirname(filepath)
                if parent_dir:
                    os.makedirs(parent_dir, exist_ok=True)

                # Stream to disk so large files are never held fully in memory.
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=chunk_size):
                        if chunk:  # skip empty keep-alive chunks
                            f.write(chunk)

                return filepath

        # RequestException must be caught first: it is itself an IOError subclass.
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"下载失败: {str(e)}") from e
        except IOError as e:
            raise RuntimeError(f"文件写入失败: {str(e)}") from e