## 5.1 多部分上传概述

### 5.1.1 多部分上传的概念和优势
多部分上传(Multipart Upload)是一种将大文件分割成多个较小的分片分别上传、最后在服务端合并为完整对象的技术,特别适用于大文件上传场景。
主要优势:

- 并行上传: 多个分片可以同时上传,提高速度
- 断点续传: 网络中断后可以从断点继续上传
- 内存效率: 不需要将整个文件加载到内存
- 错误恢复: 单个分片失败不影响其他分片
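在动手实现之前值得一提: minio-py 的高层 API `put_object` / `fput_object` 在对象超过分片大小时会自动执行多部分上传,多数场景下直接使用即可。下面是一个最简示意(端点、凭证、桶名与文件路径均为示例):

```python
from minio import Minio

client = Minio("localhost:9000", access_key="minioadmin",
               secret_key="minioadmin", secure=False)

# part_size 指定分片大小,超过该大小的对象会自动走多部分上传
client.fput_object("demo", "big.bin", "/path/to/big.bin",
                   part_size=64 * 1024 * 1024)
```

当需要细粒度地控制分片、并行度、进度与断点续传时,则可以像下面这样自行管理多部分上传:

```python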
from minio import Minio
from minio.error import S3Error
from minio.datatypes import Part  # 组装 complete 请求所需的分片对象
from datetime import datetime, timedelta
import os
import io
import hashlib
import threading
import time
from typing import Dict, List, Any, Optional, Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
class MultipartUploadManager:
"""多部分上传管理器"""
def __init__(self, client: Minio, chunk_size: int = 64 * 1024 * 1024):
self.client = client
self.chunk_size = chunk_size # 默认64MB
self.active_uploads = {} # 活跃的上传任务
self.upload_progress = {} # 上传进度
def calculate_file_parts(self, file_size: int) -> Dict[str, Any]:
"""计算文件分片信息"""
try:
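            # S3 协议约束: 除最后一个分片外,每个分片不小于 5 MiB,
            # 且总分片数不超过 10000;选择 chunk_size 时需兼顾两者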
# 计算分片数量
total_parts = (file_size + self.chunk_size - 1) // self.chunk_size
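            # 上式为向上取整: 例如 200MB 文件按 64MB 分片 -> (200+63)//64 = 4 片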
# 生成分片信息
parts_info = []
for part_number in range(1, total_parts + 1):
start_offset = (part_number - 1) * self.chunk_size
end_offset = min(start_offset + self.chunk_size, file_size)
part_size = end_offset - start_offset
parts_info.append({
'part_number': part_number,
'start_offset': start_offset,
'end_offset': end_offset,
'part_size': part_size
})
return {
'success': True,
'file_size': file_size,
'chunk_size': self.chunk_size,
'total_parts': total_parts,
'parts_info': parts_info,
                'estimated_time_minutes': total_parts * 0.5  # 粗略估算: 假设每片约30秒
}
except Exception as e:
return {
'success': False,
'error': f"计算文件分片失败: {e}",
'file_size': file_size
}
def initiate_multipart_upload(self, bucket_name: str, object_name: str,
content_type: str = None,
metadata: Dict[str, str] = None) -> Dict[str, Any]:
"""初始化多部分上传"""
try:
# 检查存储桶是否存在
if not self.client.bucket_exists(bucket_name):
return {
'success': False,
'error': f"存储桶不存在: {bucket_name}",
'bucket_name': bucket_name
}
            # 初始化多部分上传
            # 注意: _create_multipart_upload 是 minio-py 的内部方法,参数签名
            # 随版本变化;生产环境建议优先使用 put_object/fput_object 等公开 API
            headers = {'Content-Type': content_type or 'application/octet-stream'}
            # 自定义元数据按 S3 约定折叠为 x-amz-meta-* 请求头
            for meta_key, meta_value in (metadata or {}).items():
                headers[f'x-amz-meta-{meta_key}'] = meta_value
            upload_id = self.client._create_multipart_upload(
                bucket_name, object_name, headers
            )
# 记录上传任务
upload_key = f"{bucket_name}/{object_name}/{upload_id}"
self.active_uploads[upload_key] = {
'bucket_name': bucket_name,
'object_name': object_name,
'upload_id': upload_id,
'content_type': content_type,
'metadata': metadata,
'initiated_at': datetime.now().isoformat(),
'parts': {},
'status': 'initiated'
}
return {
'success': True,
'message': f"多部分上传初始化成功: {bucket_name}/{object_name}",
'bucket_name': bucket_name,
'object_name': object_name,
'upload_id': upload_id,
'upload_key': upload_key,
'initiated_at': datetime.now().isoformat()
}
except S3Error as e:
return {
'success': False,
'error': f"初始化多部分上传失败: {e}",
'bucket_name': bucket_name,
'object_name': object_name
}
def upload_part(self, upload_key: str, part_number: int,
data: bytes, progress_callback: Callable = None) -> Dict[str, Any]:
"""上传单个分片"""
try:
if upload_key not in self.active_uploads:
return {
'success': False,
'error': '上传任务不存在',
'upload_key': upload_key
}
upload_info = self.active_uploads[upload_key]
# 计算数据哈希
data_hash = hashlib.md5(data).hexdigest()
            # 上传分片
            # 注意: _upload_part 是 minio-py 的内部方法,此处按 7.x 的参数
            # 顺序传入原始字节;不同版本可能要求不同的数据类型
            start_time = time.time()
            etag = self.client._upload_part(
                upload_info['bucket_name'],
                upload_info['object_name'],
                data,
                {},
                upload_info['upload_id'],
                part_number
            )
            upload_time = time.time() - start_time
            # 防止极小分片导致除零
            upload_speed = (len(data) / upload_time / 1024 / 1024) if upload_time > 0 else 0.0  # MB/s
# 记录分片信息
part_info = {
'part_number': part_number,
'etag': etag,
'size': len(data),
'md5_hash': data_hash,
'upload_time_seconds': upload_time,
'upload_speed_mbps': upload_speed,
'uploaded_at': datetime.now().isoformat()
}
upload_info['parts'][part_number] = part_info
# 调用进度回调
if progress_callback:
progress_callback(part_number, len(data), upload_speed)
return {
'success': True,
'message': f"分片上传成功: Part {part_number}",
'upload_key': upload_key,
'part_number': part_number,
'etag': etag,
'size': len(data),
'upload_speed_mbps': upload_speed,
'uploaded_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f"上传分片失败: {e}",
'upload_key': upload_key,
'part_number': part_number
}
def complete_multipart_upload(self, upload_key: str) -> Dict[str, Any]:
"""完成多部分上传"""
try:
if upload_key not in self.active_uploads:
return {
'success': False,
'error': '上传任务不存在',
'upload_key': upload_key
}
upload_info = self.active_uploads[upload_key]
# 检查所有分片是否都已上传
if not upload_info['parts']:
return {
'success': False,
'error': '没有已上传的分片',
'upload_key': upload_key
}
            # 准备分片列表(按分片号升序,minio-py 要求 Part 对象)
            parts = [
                Part(part_number, upload_info['parts'][part_number]['etag'])
                for part_number in sorted(upload_info['parts'].keys())
            ]
            # 完成多部分上传
            # 注意: _complete_multipart_upload 同为内部方法,
            # 返回的是带 etag 属性的结果对象而非字典
            result = self.client._complete_multipart_upload(
                upload_info['bucket_name'],
                upload_info['object_name'],
                upload_info['upload_id'],
                parts
            )
# 计算总体统计
total_size = sum(part['size'] for part in upload_info['parts'].values())
total_time = sum(part['upload_time_seconds'] for part in upload_info['parts'].values())
average_speed = (total_size / total_time / 1024 / 1024) if total_time > 0 else 0
# 更新上传状态
upload_info['status'] = 'completed'
upload_info['completed_at'] = datetime.now().isoformat()
            upload_info['etag'] = getattr(result, 'etag', None)
# 移除活跃上传记录
completed_upload = self.active_uploads.pop(upload_key)
return {
'success': True,
'message': f"多部分上传完成: {upload_info['bucket_name']}/{upload_info['object_name']}",
'bucket_name': upload_info['bucket_name'],
'object_name': upload_info['object_name'],
'upload_id': upload_info['upload_id'],
                'etag': getattr(result, 'etag', None),
'total_parts': len(upload_info['parts']),
'total_size_mb': total_size / 1024 / 1024,
'total_time_seconds': total_time,
'average_speed_mbps': average_speed,
'completed_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f"完成多部分上传失败: {e}",
'upload_key': upload_key
}
def abort_multipart_upload(self, upload_key: str) -> Dict[str, Any]:
"""中止多部分上传"""
try:
if upload_key not in self.active_uploads:
return {
'success': False,
'error': '上传任务不存在',
'upload_key': upload_key
}
upload_info = self.active_uploads[upload_key]
            # 中止多部分上传,释放服务端已缓存的分片(同为 minio-py 内部方法)
            self.client._abort_multipart_upload(
                upload_info['bucket_name'],
                upload_info['object_name'],
                upload_info['upload_id']
            )
# 移除上传记录
aborted_upload = self.active_uploads.pop(upload_key)
return {
'success': True,
'message': f"多部分上传已中止: {upload_info['bucket_name']}/{upload_info['object_name']}",
'bucket_name': upload_info['bucket_name'],
'object_name': upload_info['object_name'],
'upload_id': upload_info['upload_id'],
'aborted_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f"中止多部分上传失败: {e}",
'upload_key': upload_key
}
def list_active_uploads(self, bucket_name: str = None) -> Dict[str, Any]:
"""列出活跃的多部分上传"""
try:
active_uploads = []
for upload_key, upload_info in self.active_uploads.items():
if bucket_name is None or upload_info['bucket_name'] == bucket_name:
# 计算上传进度
total_parts = len(upload_info['parts'])
total_size = sum(part['size'] for part in upload_info['parts'].values())
active_uploads.append({
'upload_key': upload_key,
'bucket_name': upload_info['bucket_name'],
'object_name': upload_info['object_name'],
'upload_id': upload_info['upload_id'],
'status': upload_info['status'],
'initiated_at': upload_info['initiated_at'],
'total_parts_uploaded': total_parts,
'total_size_mb': total_size / 1024 / 1024,
'content_type': upload_info.get('content_type'),
'metadata': upload_info.get('metadata', {})
})
return {
'success': True,
'bucket_name': bucket_name,
'total_active_uploads': len(active_uploads),
'active_uploads': active_uploads,
'listed_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f"列出活跃上传失败: {e}",
'bucket_name': bucket_name
}
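
# 使用示意(桶名与对象名均为示例):
# manager = MultipartUploadManager(client, chunk_size=16 * 1024 * 1024)
# plan = manager.calculate_file_parts(100 * 1024 * 1024)  # 100MB -> 7 个分片
# init = manager.initiate_multipart_upload("demo", "file.bin")
# 随后按 plan['parts_info'] 逐片调用 upload_part(),最后 complete_multipart_upload()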
```

## 5.2 大文件上传器

### 5.2.1 并行上传实现
```python
class LargeFileUploader:
"""大文件上传器"""
def __init__(self, client: Minio, chunk_size: int = 64 * 1024 * 1024,
max_workers: int = 4):
self.client = client
self.chunk_size = chunk_size
self.max_workers = max_workers
self.multipart_manager = MultipartUploadManager(client, chunk_size)
def upload_large_file(self, file_path: str, bucket_name: str, object_name: str,
content_type: str = None, metadata: Dict[str, str] = None,
progress_callback: Callable = None) -> Dict[str, Any]:
"""上传大文件"""
try:
# 检查文件是否存在
if not os.path.exists(file_path):
return {
'success': False,
'error': f"文件不存在: {file_path}",
'file_path': file_path
}
# 获取文件信息
file_size = os.path.getsize(file_path)
file_name = os.path.basename(file_path)
print(f"开始上传大文件: {file_name} ({file_size / 1024 / 1024:.2f} MB)")
# 计算分片信息
parts_info = self.multipart_manager.calculate_file_parts(file_size)
if not parts_info['success']:
return parts_info
print(f"文件将分为 {parts_info['total_parts']} 个分片上传")
# 初始化多部分上传
init_result = self.multipart_manager.initiate_multipart_upload(
bucket_name=bucket_name,
object_name=object_name,
content_type=content_type,
metadata=metadata
)
if not init_result['success']:
return init_result
upload_key = init_result['upload_key']
try:
# 并行上传分片
upload_results = self._upload_parts_parallel(
file_path=file_path,
upload_key=upload_key,
parts_info=parts_info['parts_info'],
progress_callback=progress_callback
)
# 检查上传结果
failed_parts = [result for result in upload_results if not result['success']]
if failed_parts:
# 中止上传
self.multipart_manager.abort_multipart_upload(upload_key)
return {
'success': False,
'error': f"部分分片上传失败: {len(failed_parts)} 个",
'failed_parts': failed_parts[:5], # 只显示前5个失败的分片
'file_path': file_path
}
# 完成多部分上传
complete_result = self.multipart_manager.complete_multipart_upload(upload_key)
                if complete_result['success']:
                    print(f"✅ 大文件上传完成: {file_name}")
                    print(f"   总大小: {complete_result['total_size_mb']:.2f} MB")
                    print(f"   总时间: {complete_result['total_time_seconds']:.2f} 秒")
                    print(f"   平均速度: {complete_result['average_speed_mbps']:.2f} MB/s")
                # 无论成功与否都返回完成结果,避免失败时隐式返回 None
                return complete_result
            except Exception:
                # 发生错误时中止上传,避免残留分片占用存储空间
                self.multipart_manager.abort_multipart_upload(upload_key)
                raise
except Exception as e:
return {
'success': False,
'error': f"上传大文件失败: {e}",
'file_path': file_path
}
def _upload_parts_parallel(self, file_path: str, upload_key: str,
parts_info: List[Dict], progress_callback: Callable = None) -> List[Dict]:
"""并行上传分片"""
        results = []
        completed_parts = 0
        total_parts = len(parts_info)
        progress_lock = threading.Lock()  # 回调会在多个工作线程中并发执行
        def part_progress_callback(part_number: int, size: int, speed: float):
            nonlocal completed_parts
            with progress_lock:
                completed_parts += 1
                done = completed_parts
            if progress_callback:
                progress_callback(done, total_parts, size, speed)
# 使用线程池并行上传
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有上传任务
future_to_part = {}
with open(file_path, 'rb') as file:
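                # 注意: 此循环会把所有分片依次读入内存并立即提交,文件极大时
                # 可考虑用信号量限制在途分片数量,降低峰值内存占用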
for part_info in parts_info:
# 读取分片数据
file.seek(part_info['start_offset'])
part_data = file.read(part_info['part_size'])
# 提交上传任务
future = executor.submit(
self.multipart_manager.upload_part,
upload_key,
part_info['part_number'],
part_data,
part_progress_callback
)
future_to_part[future] = part_info['part_number']
# 收集结果
for future in as_completed(future_to_part):
part_number = future_to_part[future]
try:
result = future.result()
results.append(result)
if result['success']:
print(f"分片 {part_number} 上传完成 ({result['upload_speed_mbps']:.2f} MB/s)")
else:
print(f"分片 {part_number} 上传失败: {result['error']}")
except Exception as e:
results.append({
'success': False,
'error': f"分片 {part_number} 上传异常: {e}",
'part_number': part_number
})
return results
def upload_large_stream(self, data_stream: io.IOBase, bucket_name: str, object_name: str,
total_size: int, content_type: str = None,
metadata: Dict[str, str] = None,
progress_callback: Callable = None) -> Dict[str, Any]:
"""上传大数据流"""
try:
print(f"开始上传大数据流: {object_name} ({total_size / 1024 / 1024:.2f} MB)")
# 计算分片信息
parts_info = self.multipart_manager.calculate_file_parts(total_size)
if not parts_info['success']:
return parts_info
# 初始化多部分上传
init_result = self.multipart_manager.initiate_multipart_upload(
bucket_name=bucket_name,
object_name=object_name,
content_type=content_type,
metadata=metadata
)
if not init_result['success']:
return init_result
upload_key = init_result['upload_key']
try:
# 顺序上传分片(流不支持并行)
upload_results = []
for i, part_info in enumerate(parts_info['parts_info']):
# 读取分片数据
part_data = data_stream.read(part_info['part_size'])
if len(part_data) == 0:
break
# 上传分片
result = self.multipart_manager.upload_part(
upload_key=upload_key,
part_number=part_info['part_number'],
data=part_data,
progress_callback=lambda pn, size, speed: progress_callback(
i + 1, len(parts_info['parts_info']), size, speed
) if progress_callback else None
)
upload_results.append(result)
if not result['success']:
print(f"分片 {part_info['part_number']} 上传失败: {result['error']}")
break
print(f"分片 {part_info['part_number']} 上传完成 ({result['upload_speed_mbps']:.2f} MB/s)")
# 检查上传结果
failed_parts = [result for result in upload_results if not result['success']]
if failed_parts:
self.multipart_manager.abort_multipart_upload(upload_key)
return {
'success': False,
'error': f"部分分片上传失败: {len(failed_parts)} 个",
'failed_parts': failed_parts[:5]
}
# 完成多部分上传
complete_result = self.multipart_manager.complete_multipart_upload(upload_key)
                if complete_result['success']:
                    print(f"✅ 大数据流上传完成: {object_name}")
                # 无论成功与否都返回完成结果,避免失败时隐式返回 None
                return complete_result
            except Exception:
                self.multipart_manager.abort_multipart_upload(upload_key)
                raise
except Exception as e:
return {
'success': False,
'error': f"上传大数据流失败: {e}",
'object_name': object_name
}
def resume_upload(self, file_path: str, bucket_name: str, object_name: str,
upload_id: str, progress_callback: Callable = None) -> Dict[str, Any]:
"""恢复中断的上传"""
try:
# 重建上传键
upload_key = f"{bucket_name}/{object_name}/{upload_id}"
# 检查文件
if not os.path.exists(file_path):
return {
'success': False,
'error': f"文件不存在: {file_path}",
'file_path': file_path
}
file_size = os.path.getsize(file_path)
            # 获取已上传的分片
            # 注意: _list_parts 为 minio-py 内部方法,不同版本的返回结构不同,
            # 此处假设返回可迭代的 Part 对象
            try:
                uploaded_parts = list(self.client._list_parts(
                    bucket_name=bucket_name,
                    object_name=object_name,
                    upload_id=upload_id
                ))
            except Exception:
                return {
                    'success': False,
                    'error': '无法获取已上传的分片信息,上传可能已过期',
                    'upload_id': upload_id
                }
# 重建上传信息
self.multipart_manager.active_uploads[upload_key] = {
'bucket_name': bucket_name,
'object_name': object_name,
'upload_id': upload_id,
'status': 'resumed',
'parts': {}
}
            # 记录已上传的分片(按 Part 对象属性访问)
            for part in uploaded_parts:
                self.multipart_manager.active_uploads[upload_key]['parts'][part.part_number] = {
                    'part_number': part.part_number,
                    'etag': part.etag,
                    'size': part.size,
                    'uploaded_at': part.last_modified.isoformat() if part.last_modified else None
                }
print(f"恢复上传: {len(uploaded_parts)} 个分片已完成")
# 计算需要上传的分片
parts_info = self.multipart_manager.calculate_file_parts(file_size)
            uploaded_part_numbers = {part.part_number for part in uploaded_parts}
remaining_parts = [
part for part in parts_info['parts_info']
if part['part_number'] not in uploaded_part_numbers
]
if not remaining_parts:
# 所有分片都已上传,直接完成
return self.multipart_manager.complete_multipart_upload(upload_key)
print(f"需要上传剩余 {len(remaining_parts)} 个分片")
# 上传剩余分片
upload_results = self._upload_parts_parallel(
file_path=file_path,
upload_key=upload_key,
parts_info=remaining_parts,
progress_callback=progress_callback
)
# 检查结果并完成上传
failed_parts = [result for result in upload_results if not result['success']]
if failed_parts:
return {
'success': False,
'error': f"恢复上传失败: {len(failed_parts)} 个分片上传失败",
'failed_parts': failed_parts[:5]
}
return self.multipart_manager.complete_multipart_upload(upload_key)
except Exception as e:
return {
'success': False,
'error': f"恢复上传失败: {e}",
'file_path': file_path,
'upload_id': upload_id
}
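
# 使用示意(桶名与路径均为示例):
# uploader = LargeFileUploader(client, chunk_size=32 * 1024 * 1024, max_workers=8)
# result = uploader.upload_large_file("/data/video.mp4", "demo", "video.mp4")
# 若上传中断,记录下 upload_id 后可调用 uploader.resume_upload(...) 续传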
```

## 5.3 下载管理器

### 5.3.1 分片下载和断点续传
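分片下载的核心是 HTTP Range 请求: minio-py 的 `get_object` 支持 `offset` 和 `length` 参数,底层即转换为 Range 头。一个最简示意(假设 client 已按前文方式创建,桶名与对象名为示例):

```python
# 只读取对象开头 1MB 的内容
response = client.get_object("demo", "big.bin", offset=0, length=1024 * 1024)
try:
    chunk = response.read()
finally:
    response.close()
    response.release_conn()
```

在此基础上,下面的下载器将对象按固定大小切片并行拉取,再按序合并写入目标文件:

```python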
```python
class LargeFileDownloader:
"""大文件下载器"""
def __init__(self, client: Minio, chunk_size: int = 64 * 1024 * 1024,
max_workers: int = 4):
self.client = client
self.chunk_size = chunk_size
self.max_workers = max_workers
def get_object_info(self, bucket_name: str, object_name: str) -> Dict[str, Any]:
"""获取对象信息"""
try:
stat = self.client.stat_object(bucket_name, object_name)
return {
'success': True,
'bucket_name': bucket_name,
'object_name': object_name,
'size': stat.size,
'etag': stat.etag,
'content_type': stat.content_type,
'last_modified': stat.last_modified.isoformat(),
'metadata': stat.metadata
}
except S3Error as e:
return {
'success': False,
'error': f"获取对象信息失败: {e}",
'bucket_name': bucket_name,
'object_name': object_name
}
def download_large_file(self, bucket_name: str, object_name: str, file_path: str,
progress_callback: Callable = None,
resume: bool = True) -> Dict[str, Any]:
"""下载大文件"""
try:
# 获取对象信息
obj_info = self.get_object_info(bucket_name, object_name)
if not obj_info['success']:
return obj_info
total_size = obj_info['size']
print(f"开始下载大文件: {object_name} ({total_size / 1024 / 1024:.2f} MB)")
# 检查是否支持断点续传
downloaded_size = 0
if resume and os.path.exists(file_path):
downloaded_size = os.path.getsize(file_path)
if downloaded_size >= total_size:
return {
'success': True,
'message': '文件已完整下载',
'file_path': file_path,
'size': total_size
}
print(f"检测到部分下载文件,从 {downloaded_size / 1024 / 1024:.2f} MB 处继续")
            # 计算下载分片(从已下载的偏移量继续)
            parts_info = self._calculate_download_parts(downloaded_size, total_size)
print(f"文件将分为 {len(parts_info)} 个分片下载")
# 并行下载分片
start_time = time.time()
download_results = self._download_parts_parallel(
bucket_name=bucket_name,
object_name=object_name,
file_path=file_path,
parts_info=parts_info,
progress_callback=progress_callback
)
download_time = time.time() - start_time
# 检查下载结果
failed_parts = [result for result in download_results if not result['success']]
if failed_parts:
return {
'success': False,
'error': f"部分分片下载失败: {len(failed_parts)} 个",
'failed_parts': failed_parts[:5],
'file_path': file_path
}
# 验证文件完整性
final_size = os.path.getsize(file_path)
if final_size != total_size:
return {
'success': False,
'error': f"文件大小不匹配: 期望 {total_size}, 实际 {final_size}",
'file_path': file_path
}
download_speed = total_size / download_time / 1024 / 1024
print(f"✅ 大文件下载完成: {os.path.basename(file_path)}")
print(f" 总大小: {total_size / 1024 / 1024:.2f} MB")
print(f" 总时间: {download_time:.2f} 秒")
print(f" 平均速度: {download_speed:.2f} MB/s")
return {
'success': True,
'message': f"大文件下载完成: {object_name}",
'bucket_name': bucket_name,
'object_name': object_name,
'file_path': file_path,
'total_size_mb': total_size / 1024 / 1024,
'download_time_seconds': download_time,
'average_speed_mbps': download_speed,
'downloaded_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f"下载大文件失败: {e}",
'bucket_name': bucket_name,
'object_name': object_name,
'file_path': file_path
}
def _calculate_download_parts(self, start_offset: int, total_size: int) -> List[Dict]:
"""计算下载分片"""
parts_info = []
current_offset = start_offset
part_number = 1
while current_offset < total_size:
end_offset = min(current_offset + self.chunk_size, total_size)
part_size = end_offset - current_offset
parts_info.append({
'part_number': part_number,
'start_offset': current_offset,
'end_offset': end_offset - 1, # HTTP Range 是包含的
'part_size': part_size
})
current_offset = end_offset
part_number += 1
return parts_info
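    # 例: 已下载 100MB、总大小 200MB、chunk=64MB 时,生成两个分片:
    # [100MB, 164MB) 与 [164MB, 200MB)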
def _download_parts_parallel(self, bucket_name: str, object_name: str,
file_path: str, parts_info: List[Dict],
progress_callback: Callable = None) -> List[Dict]:
"""并行下载分片"""
        results = []
        completed_parts = 0
        total_parts = len(parts_info)
        # 暂存各分片数据(保存在内存中),全部成功后再按序合并写入目标文件
        temp_files = {}
        progress_lock = threading.Lock()  # 回调会在多个工作线程中并发执行
        def part_progress_callback(part_number: int, size: int, speed: float):
            nonlocal completed_parts
            with progress_lock:
                completed_parts += 1
                done = completed_parts
            if progress_callback:
                progress_callback(done, total_parts, size, speed)
try:
# 使用线程池并行下载
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有下载任务
future_to_part = {}
for part_info in parts_info:
future = executor.submit(
self._download_part,
bucket_name,
object_name,
part_info,
part_progress_callback
)
future_to_part[future] = part_info['part_number']
# 收集结果
for future in as_completed(future_to_part):
part_number = future_to_part[future]
try:
result = future.result()
results.append(result)
if result['success']:
temp_files[part_number] = result['temp_data']
print(f"分片 {part_number} 下载完成 ({result['download_speed_mbps']:.2f} MB/s)")
else:
print(f"分片 {part_number} 下载失败: {result['error']}")
except Exception as e:
results.append({
'success': False,
'error': f"分片 {part_number} 下载异常: {e}",
'part_number': part_number
})
# 合并分片到目标文件
if all(result['success'] for result in results):
self._merge_parts_to_file(file_path, parts_info, temp_files)
return results
finally:
# 清理临时数据
temp_files.clear()
def _download_part(self, bucket_name: str, object_name: str,
part_info: Dict, progress_callback: Callable = None) -> Dict[str, Any]:
"""下载单个分片"""
try:
start_time = time.time()
# 使用HTTP Range请求下载分片
response = self.client.get_object(
bucket_name=bucket_name,
object_name=object_name,
offset=part_info['start_offset'],
length=part_info['part_size']
)
            # 读取数据并释放底层连接
            data = response.read()
            response.close()
            response.release_conn()
            download_time = time.time() - start_time
            # 防止极小分片导致除零
            download_speed = (len(data) / download_time / 1024 / 1024) if download_time > 0 else 0.0  # MB/s
# 调用进度回调
if progress_callback:
progress_callback(part_info['part_number'], len(data), download_speed)
return {
'success': True,
'part_number': part_info['part_number'],
'size': len(data),
'download_speed_mbps': download_speed,
'temp_data': data,
'downloaded_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f"下载分片失败: {e}",
'part_number': part_info['part_number']
}
def _merge_parts_to_file(self, file_path: str, parts_info: List[Dict],
temp_files: Dict[int, bytes]):
"""合并分片到文件"""
        # 确保目录存在(目标路径可能只是文件名,不含目录部分)
        target_dir = os.path.dirname(file_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
# 按分片顺序写入文件
with open(file_path, 'ab') as f:
for part_info in sorted(parts_info, key=lambda x: x['part_number']):
part_number = part_info['part_number']
if part_number in temp_files:
f.write(temp_files[part_number])
```

## 5.4 传输监控和管理

### 5.4.1 传输进度监控
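监控器对速度的估算采用两个口径: 瞬时速度 = 本次更新新增字节数 / 距上次更新的时间间隔;平均速度 = 累计传输字节数 / 总耗时;剩余时间按 剩余字节数 / 平均速度 估算。实现如下: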
```python
class TransferMonitor:
"""传输监控器"""
def __init__(self):
self.active_transfers = {}
self.transfer_history = []
self.lock = threading.Lock()
def start_transfer(self, transfer_id: str, transfer_type: str,
bucket_name: str, object_name: str,
total_size: int) -> Dict[str, Any]:
"""开始传输监控"""
with self.lock:
transfer_info = {
'transfer_id': transfer_id,
'transfer_type': transfer_type, # 'upload' or 'download'
'bucket_name': bucket_name,
'object_name': object_name,
'total_size': total_size,
'transferred_size': 0,
'progress_percentage': 0.0,
'start_time': time.time(),
'last_update_time': time.time(),
'current_speed_mbps': 0.0,
'average_speed_mbps': 0.0,
'estimated_remaining_seconds': 0,
'status': 'active',
'parts_completed': 0,
'total_parts': 0,
'error_count': 0,
'retry_count': 0
}
self.active_transfers[transfer_id] = transfer_info
return {
'success': True,
'transfer_id': transfer_id,
'message': f"开始监控传输: {transfer_type} {bucket_name}/{object_name}"
}
def update_progress(self, transfer_id: str, transferred_size: int,
parts_completed: int = None, total_parts: int = None) -> Dict[str, Any]:
"""更新传输进度"""
with self.lock:
if transfer_id not in self.active_transfers:
return {
'success': False,
'error': '传输不存在',
'transfer_id': transfer_id
}
transfer_info = self.active_transfers[transfer_id]
current_time = time.time()
# 更新基本信息
old_transferred_size = transfer_info['transferred_size']
transfer_info['transferred_size'] = transferred_size
            transfer_info['progress_percentage'] = (
                (transferred_size / transfer_info['total_size']) * 100
                if transfer_info['total_size'] > 0 else 100.0  # 空对象视为已完成
            )
# 更新分片信息
if parts_completed is not None:
transfer_info['parts_completed'] = parts_completed
if total_parts is not None:
transfer_info['total_parts'] = total_parts
# 计算速度
time_diff = current_time - transfer_info['last_update_time']
if time_diff > 0:
size_diff = transferred_size - old_transferred_size
transfer_info['current_speed_mbps'] = (size_diff / time_diff) / 1024 / 1024
# 计算平均速度
total_time = current_time - transfer_info['start_time']
if total_time > 0:
transfer_info['average_speed_mbps'] = (transferred_size / total_time) / 1024 / 1024
# 估算剩余时间
remaining_size = transfer_info['total_size'] - transferred_size
if transfer_info['average_speed_mbps'] > 0:
transfer_info['estimated_remaining_seconds'] = remaining_size / (transfer_info['average_speed_mbps'] * 1024 * 1024)
transfer_info['last_update_time'] = current_time
return {
'success': True,
'transfer_id': transfer_id,
'progress_percentage': transfer_info['progress_percentage'],
'current_speed_mbps': transfer_info['current_speed_mbps'],
'average_speed_mbps': transfer_info['average_speed_mbps'],
'estimated_remaining_seconds': transfer_info['estimated_remaining_seconds']
}
def complete_transfer(self, transfer_id: str, success: bool = True,
error_message: str = None) -> Dict[str, Any]:
"""完成传输"""
with self.lock:
if transfer_id not in self.active_transfers:
return {
'success': False,
'error': '传输不存在',
'transfer_id': transfer_id
}
transfer_info = self.active_transfers.pop(transfer_id)
# 更新状态
transfer_info['status'] = 'completed' if success else 'failed'
transfer_info['end_time'] = time.time()
transfer_info['total_time'] = transfer_info['end_time'] - transfer_info['start_time']
if error_message:
transfer_info['error_message'] = error_message
# 添加到历史记录
self.transfer_history.append(transfer_info)
# 保持历史记录数量限制
if len(self.transfer_history) > 100:
self.transfer_history = self.transfer_history[-100:]
return {
'success': True,
'transfer_id': transfer_id,
'status': transfer_info['status'],
'total_time': transfer_info['total_time'],
'average_speed_mbps': transfer_info['average_speed_mbps']
}
def get_transfer_status(self, transfer_id: str) -> Dict[str, Any]:
"""获取传输状态"""
with self.lock:
# 检查活跃传输
if transfer_id in self.active_transfers:
transfer_info = self.active_transfers[transfer_id].copy()
transfer_info['is_active'] = True
return {
'success': True,
'transfer_info': transfer_info
}
# 检查历史记录
for transfer_info in reversed(self.transfer_history):
if transfer_info['transfer_id'] == transfer_id:
transfer_info = transfer_info.copy()
transfer_info['is_active'] = False
return {
'success': True,
'transfer_info': transfer_info
}
return {
'success': False,
'error': '传输不存在',
'transfer_id': transfer_id
}
def list_active_transfers(self) -> Dict[str, Any]:
"""列出活跃传输"""
with self.lock:
active_transfers = []
for transfer_id, transfer_info in self.active_transfers.items():
active_transfers.append({
'transfer_id': transfer_id,
'transfer_type': transfer_info['transfer_type'],
'bucket_name': transfer_info['bucket_name'],
'object_name': transfer_info['object_name'],
'progress_percentage': transfer_info['progress_percentage'],
'current_speed_mbps': transfer_info['current_speed_mbps'],
'estimated_remaining_seconds': transfer_info['estimated_remaining_seconds'],
'parts_completed': transfer_info['parts_completed'],
'total_parts': transfer_info['total_parts']
})
return {
'success': True,
'total_active_transfers': len(active_transfers),
'active_transfers': active_transfers,
'listed_at': datetime.now().isoformat()
}
def get_transfer_statistics(self) -> Dict[str, Any]:
"""获取传输统计"""
with self.lock:
# 统计活跃传输
active_count = len(self.active_transfers)
active_upload_count = sum(1 for t in self.active_transfers.values() if t['transfer_type'] == 'upload')
active_download_count = active_count - active_upload_count
# 统计历史传输
total_completed = len(self.transfer_history)
successful_transfers = sum(1 for t in self.transfer_history if t['status'] == 'completed')
failed_transfers = total_completed - successful_transfers
# 计算平均速度
if successful_transfers > 0:
avg_speed = sum(t['average_speed_mbps'] for t in self.transfer_history if t['status'] == 'completed') / successful_transfers
else:
avg_speed = 0
# 计算总传输量
total_bytes_transferred = sum(t['transferred_size'] for t in self.transfer_history)
return {
'success': True,
'active_transfers': {
'total': active_count,
'uploads': active_upload_count,
'downloads': active_download_count
},
'completed_transfers': {
'total': total_completed,
'successful': successful_transfers,
'failed': failed_transfers,
'success_rate': (successful_transfers / total_completed * 100) if total_completed > 0 else 0
},
'performance': {
'average_speed_mbps': avg_speed,
'total_bytes_transferred': total_bytes_transferred,
'total_mb_transferred': total_bytes_transferred / 1024 / 1024
},
'generated_at': datetime.now().isoformat()
}
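
# 使用示意: 监控器独立于上传/下载逻辑,也可单独驱动
# monitor = TransferMonitor()
# monitor.start_transfer("t1", "upload", "demo", "big.bin", total_size=1024 ** 3)
# monitor.update_progress("t1", transferred_size=256 * 1024 ** 2)
# monitor.complete_transfer("t1", success=True)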
```

## 5.5 实际应用示例

### 5.5.1 大文件传输系统
```python
class LargeFileTransferSystem:
"""大文件传输系统"""
def __init__(self, client: Minio):
self.client = client
self.uploader = LargeFileUploader(client)
self.downloader = LargeFileDownloader(client)
self.monitor = TransferMonitor()
def upload_with_monitoring(self, file_path: str, bucket_name: str, object_name: str,
content_type: str = None, metadata: Dict[str, str] = None) -> Dict[str, Any]:
"""带监控的文件上传"""
        # 先生成传输ID(在 try 之外),确保异常处理分支中也能引用
        import uuid
        transfer_id = str(uuid.uuid4())
        try:
# 获取文件大小
file_size = os.path.getsize(file_path)
# 开始监控
self.monitor.start_transfer(
transfer_id=transfer_id,
transfer_type='upload',
bucket_name=bucket_name,
object_name=object_name,
total_size=file_size
)
# 定义进度回调
def progress_callback(completed_parts: int, total_parts: int, part_size: int, speed: float):
# 估算已传输大小
estimated_transferred = completed_parts * self.uploader.chunk_size
estimated_transferred = min(estimated_transferred, file_size)
self.monitor.update_progress(
transfer_id=transfer_id,
transferred_size=estimated_transferred,
parts_completed=completed_parts,
total_parts=total_parts
)
print(f"上传进度: {completed_parts}/{total_parts} 分片 ({speed:.2f} MB/s)")
# 执行上传
result = self.uploader.upload_large_file(
file_path=file_path,
bucket_name=bucket_name,
object_name=object_name,
content_type=content_type,
metadata=metadata,
progress_callback=progress_callback
)
# 完成监控
self.monitor.complete_transfer(
transfer_id=transfer_id,
success=result['success'],
error_message=result.get('error')
)
# 添加传输ID到结果
result['transfer_id'] = transfer_id
return result
except Exception as e:
# 传输失败
self.monitor.complete_transfer(
transfer_id=transfer_id,
success=False,
error_message=str(e)
)
return {
'success': False,
'error': f"上传失败: {e}",
'transfer_id': transfer_id
}
def download_with_monitoring(self, bucket_name: str, object_name: str, file_path: str,
resume: bool = True) -> Dict[str, Any]:
"""带监控的文件下载"""
        # 先生成传输ID(在 try 之外),确保异常处理分支中也能引用
        import uuid
        transfer_id = str(uuid.uuid4())
        try:
# 获取对象信息
obj_info = self.downloader.get_object_info(bucket_name, object_name)
if not obj_info['success']:
return obj_info
total_size = obj_info['size']
# 开始监控
self.monitor.start_transfer(
transfer_id=transfer_id,
transfer_type='download',
bucket_name=bucket_name,
object_name=object_name,
total_size=total_size
)
# 定义进度回调
def progress_callback(completed_parts: int, total_parts: int, part_size: int, speed: float):
# 估算已传输大小
estimated_transferred = completed_parts * self.downloader.chunk_size
estimated_transferred = min(estimated_transferred, total_size)
self.monitor.update_progress(
transfer_id=transfer_id,
transferred_size=estimated_transferred,
parts_completed=completed_parts,
total_parts=total_parts
)
print(f"下载进度: {completed_parts}/{total_parts} 分片 ({speed:.2f} MB/s)")
# 执行下载
result = self.downloader.download_large_file(
bucket_name=bucket_name,
object_name=object_name,
file_path=file_path,
progress_callback=progress_callback,
resume=resume
)
# 完成监控
self.monitor.complete_transfer(
transfer_id=transfer_id,
success=result['success'],
error_message=result.get('error')
)
# 添加传输ID到结果
result['transfer_id'] = transfer_id
return result
except Exception as e:
# 传输失败
self.monitor.complete_transfer(
transfer_id=transfer_id,
success=False,
error_message=str(e)
)
return {
'success': False,
'error': f"下载失败: {e}",
'transfer_id': transfer_id
}
def get_system_status(self) -> Dict[str, Any]:
"""获取系统状态"""
# 获取传输统计
stats = self.monitor.get_transfer_statistics()
# 获取活跃传输
active_transfers = self.monitor.list_active_transfers()
# 获取多部分上传状态
multipart_uploads = self.uploader.multipart_manager.list_active_uploads()
return {
'success': True,
'system_status': {
'transfer_statistics': stats,
'active_transfers': active_transfers,
'multipart_uploads': multipart_uploads,
'uploader_config': {
'chunk_size_mb': self.uploader.chunk_size / 1024 / 1024,
'max_workers': self.uploader.max_workers
},
'downloader_config': {
'chunk_size_mb': self.downloader.chunk_size / 1024 / 1024,
'max_workers': self.downloader.max_workers
}
},
'generated_at': datetime.now().isoformat()
}
# 使用示例
def large_file_example():
"""大文件处理示例"""
print("=== MinIO大文件处理示例 ===")
try:
# 连接MinIO
client = Minio(
endpoint="localhost:9000",
access_key="minioadmin",
secret_key="minioadmin",
secure=False
)
bucket_name = "large-files"
# 创建存储桶
if not client.bucket_exists(bucket_name):
client.make_bucket(bucket_name)
print(f"✅ 创建存储桶: {bucket_name}")
# 创建传输系统
transfer_system = LargeFileTransferSystem(client)
# 1. 创建大文件用于测试
print("\n1. 创建测试大文件...")
test_file_path = "test_large_file.bin"
file_size_mb = 200 # 200MB测试文件
with open(test_file_path, 'wb') as f:
# 写入随机数据
chunk_size = 1024 * 1024 # 1MB chunks
for i in range(file_size_mb):
data = os.urandom(chunk_size)
f.write(data)
print(f"✅ 创建测试文件: {test_file_path} ({file_size_mb} MB)")
# 2. 上传大文件
print("\n2. 上传大文件...")
object_name = "test-large-file.bin"
upload_result = transfer_system.upload_with_monitoring(
file_path=test_file_path,
bucket_name=bucket_name,
object_name=object_name,
content_type="application/octet-stream",
metadata={"source": "test", "size_mb": str(file_size_mb)}
)
if upload_result['success']:
print(f"✅ 大文件上传成功")
print(f" 传输ID: {upload_result['transfer_id']}")
print(f" 平均速度: {upload_result['average_speed_mbps']:.2f} MB/s")
else:
print(f"❌ 上传失败: {upload_result['error']}")
return
# 3. 下载大文件
print("\n3. 下载大文件...")
download_path = "downloaded_large_file.bin"
download_result = transfer_system.download_with_monitoring(
bucket_name=bucket_name,
object_name=object_name,
file_path=download_path,
resume=True
)
if download_result['success']:
print(f"✅ 大文件下载成功")
print(f" 传输ID: {download_result['transfer_id']}")
print(f" 平均速度: {download_result['average_speed_mbps']:.2f} MB/s")
else:
print(f"❌ 下载失败: {download_result['error']}")
# 4. 验证文件完整性
print("\n4. 验证文件完整性...")
import hashlib
def calculate_file_hash(file_path):
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
original_hash = calculate_file_hash(test_file_path)
downloaded_hash = calculate_file_hash(download_path)
if original_hash == downloaded_hash:
print("✅ 文件完整性验证通过")
else:
print("❌ 文件完整性验证失败")
# 5. 获取系统状态
print("\n5. 系统状态...")
status = transfer_system.get_system_status()
if status['success']:
stats = status['system_status']['transfer_statistics']
print(f"传输统计:")
print(f" - 已完成传输: {stats['completed_transfers']['total']}")
print(f" - 成功率: {stats['completed_transfers']['success_rate']:.1f}%")
print(f" - 平均速度: {stats['performance']['average_speed_mbps']:.2f} MB/s")
print(f" - 总传输量: {stats['performance']['total_mb_transferred']:.2f} MB")
# 6. 清理测试文件
print("\n6. 清理测试文件...")
try:
os.remove(test_file_path)
os.remove(download_path)
print("✅ 测试文件清理完成")
        except OSError:
print("⚠️ 清理测试文件时出现问题")
print("\n=== 大文件处理示例完成 ===")
except Exception as e:
print(f"❌ 示例执行失败: {e}")
if __name__ == "__main__":
large_file_example()
```

## 5.6 总结
本章详细介绍了MinIO的多部分上传和大文件处理技术,主要内容包括:
### 5.6.1 核心功能
**多部分上传管理**
- 上传任务初始化和管理
- 分片上传和ETag管理
- 上传完成和中止操作
- 活跃上传列表和状态跟踪
**大文件上传器**
- 并行分片上传
- 断点续传支持
- 数据流上传
- 上传进度监控
**大文件下载器**
- 分片并行下载
- 断点续传下载
- 文件完整性验证
- 下载进度跟踪
**传输监控系统**
- 实时进度监控
- 传输速度计算
- 传输历史记录
- 系统性能统计
### 5.6.2 技术特点
- 高性能: 并行传输提高速度
- 可靠性: 断点续传和错误恢复
- 监控性: 实时进度和性能监控
- 可扩展: 支持自定义分片大小和并发数
- 易用性: 简单的API接口
### 5.6.3 最佳实践
- 分片大小选择: 根据网络条件和文件大小调整
- 并发控制: 避免过多并发导致资源竞争
- 错误处理: 实现重试机制和错误恢复
- 进度监控: 提供用户友好的进度反馈
- 资源管理: 及时清理临时文件和内存
下一章我们将介绍MinIO的事件通知和Webhook集成,学习如何实现实时的对象操作监控和自动化处理。