import os
import requests
from urllib.parse import urlsplit


def download_file(
    url: str,
    save_path: str,
    chunk_size: int = 8192,
    timeout: int = 10
) -> str:
"""
通用文件下载函数
:param url: 文件下载地址
:param save_path: 保存路径(可以是目录或完整路径)
:param chunk_size: 下载分块大小(字节)
:param timeout: 请求超时时间(秒)
:return: 最终保存的完整文件路径
Usage:
>>> download_file("https://example.com/image.png", "./downloads")
>>> download_file("https://example.com/doc.pdf", "/tmp/report.pdf")
"""
    # Use a session so the underlying connection can be reused
    with requests.Session() as session:
        try:
            response = session.get(url, stream=True, timeout=timeout)
            response.raise_for_status()  # Raise for 4xx/5xx status codes

            # Determine the file name, preferring the Content-Disposition header
            filename = None
            content_disposition = response.headers.get('Content-Disposition', '')
            if 'filename=' in content_disposition:
                filename = content_disposition.split('filename=')[-1].split(';')[0].strip('"\' ')
            # Fall back to the last segment of the URL path if the header has no name
            if not filename:
                path = urlsplit(url).path
                filename = os.path.basename(path) or 'downloaded_file'

            # Add a file extension based on the Content-Type if the name has none
            mime_map = {
                'image/png': '.png',
                'application/pdf': '.pdf',
                'application/msword': '.doc',
                'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
                'text/plain': '.txt',
            }
            content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
            ext = mime_map.get(content_type, os.path.splitext(filename)[1])
            if not os.path.splitext(filename)[1]:
                filename += ext
            # Build the full save path: treat save_path as a directory if it exists,
            # otherwise as an explicit file path
            if save_path:
                if os.path.isdir(save_path):
                    filepath = os.path.join(save_path, filename)
                else:
                    filepath = save_path
            else:
                filepath = filename

            # Make sure the target directory exists (skip when saving to the current directory)
            dirpath = os.path.dirname(filepath)
            if dirpath:
                os.makedirs(dirpath, exist_ok=True)
            # Stream the response body to disk in chunks to handle large files
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:  # Skip keep-alive chunks
                        f.write(chunk)
            return filepath
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"Download failed: {e}") from e
        except OSError as e:
            raise RuntimeError(f"Failed to write file: {e}") from e
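

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): the URL below is a placeholder,
    # not a real downloadable file; substitute any reachable file URL before running.
    # Note that save_path is treated as a directory only if it already exists;
    # otherwise it is used as the target file path.
    try:
        saved = download_file("https://example.com/doc.pdf", "./downloads")
        print(f"Saved to: {saved}")
    except RuntimeError as err:
        print(f"Error: {err}")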