This chapter walks through concrete, hands-on cases that show how MinIO is applied in different scenarios, including file management systems, image processing services, data backup systems, and log analysis systems.
10.1 File Management System
10.1.1 Enterprise File Management Platform
import os
import hashlib
import mimetypes
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from minio import Minio
from minio.commonconfig import CopySource
from minio.error import S3Error
import json
import logging
import io
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
@dataclass
class FileMetadata:
    """File metadata."""
    file_id: str
    filename: str
    size: int
    content_type: str
    upload_time: datetime
    uploader: str
    department: str
    tags: List[str]
    checksum: str
    version: int
    is_public: bool
    expiry_date: Optional[datetime] = None
    description: str = ""
class EnterpriseFileManager:
    """Enterprise file management system."""
    def __init__(self, endpoint: str, access_key: str, secret_key: str, secure: bool = False):
        self.client = Minio(endpoint, access_key, secret_key, secure=secure)
        self.bucket_name = "enterprise-files"
        self.metadata_bucket = "file-metadata"
        self.logger = logging.getLogger('EnterpriseFileManager')
        # Initialize buckets
        self._initialize_buckets()
        # Allowed file types
        self.allowed_types = {
            'documents': ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt'],
            'images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg'],
            'videos': ['.mp4', '.avi', '.mov', '.wmv', '.flv'],
            'archives': ['.zip', '.rar', '.7z', '.tar', '.gz']
        }
        # Per-department configuration
        self.departments = {
            'hr': {'quota': 10 * 1024 * 1024 * 1024, 'retention_days': 2555},       # 10 GB, 7 years
            'finance': {'quota': 50 * 1024 * 1024 * 1024, 'retention_days': 3650},  # 50 GB, 10 years
            'it': {'quota': 100 * 1024 * 1024 * 1024, 'retention_days': 1825},      # 100 GB, 5 years
            'marketing': {'quota': 20 * 1024 * 1024 * 1024, 'retention_days': 1095} # 20 GB, 3 years
        }
    def _initialize_buckets(self):
        """Initialize the storage buckets."""
        try:
            # Create the main file bucket
            if not self.client.bucket_exists(self.bucket_name):
                self.client.make_bucket(self.bucket_name)
                self.logger.info(f"Created bucket: {self.bucket_name}")
            # Create the metadata bucket
            if not self.client.bucket_exists(self.metadata_bucket):
                self.client.make_bucket(self.metadata_bucket)
                self.logger.info(f"Created metadata bucket: {self.metadata_bucket}")
        except Exception as e:
            self.logger.error(f"Failed to initialize buckets: {e}")
            raise
    def upload_file(self, file_path: str, uploader: str, department: str,
                    tags: List[str] = None, description: str = "",
                    is_public: bool = False, expiry_days: int = None) -> Dict[str, Any]:
        """Upload a file."""
        try:
            if not os.path.exists(file_path):
                return {'success': False, 'error': 'File does not exist'}
            # Validate the department
            if department not in self.departments:
                return {'success': False, 'error': 'Invalid department'}
            # Validate the file type
            file_ext = Path(file_path).suffix.lower()
            if not self._is_allowed_file_type(file_ext):
                return {'success': False, 'error': f'Unsupported file type: {file_ext}'}
            # Check the department quota
            quota_check = self._check_department_quota(department, os.path.getsize(file_path))
            if not quota_check['allowed']:
                return {'success': False, 'error': quota_check['message']}
            # Generate the file ID and object path
            file_id = self._generate_file_id(file_path)
            filename = os.path.basename(file_path)
            object_name = f"{department}/{datetime.now().strftime('%Y/%m')}/{file_id}_{filename}"
            # Compute the file checksum
            checksum = self._calculate_checksum(file_path)
            # Check for duplicates
            duplicate_check = self._check_duplicate_file(checksum, department)
            if duplicate_check['exists']:
                return {
                    'success': False,
                    'error': 'File already exists',
                    'existing_file': duplicate_check['file_info']
                }
            # Determine the content type
            content_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
            # Object metadata
            metadata = {
                'uploader': uploader,
                'department': department,
                'upload_time': datetime.now().isoformat(),
                'checksum': checksum,
                'tags': ','.join(tags or []),
                'description': description,
                'is_public': str(is_public)
            }
            if expiry_days:
                expiry_date = datetime.now() + timedelta(days=expiry_days)
                metadata['expiry_date'] = expiry_date.isoformat()
            # Perform the upload
            self.client.fput_object(
                self.bucket_name,
                object_name,
                file_path,
                content_type=content_type,
                metadata=metadata
            )
            # Build the metadata record
            file_metadata = FileMetadata(
                file_id=file_id,
                filename=filename,
                size=os.path.getsize(file_path),
                content_type=content_type,
                upload_time=datetime.now(),
                uploader=uploader,
                department=department,
                tags=tags or [],
                checksum=checksum,
                version=1,
                is_public=is_public,
                expiry_date=datetime.now() + timedelta(days=expiry_days) if expiry_days else None,
                description=description
            )
            # Persist the metadata
            self._save_file_metadata(file_metadata)
            # Generate an access URL for public files
            access_url = None
            if is_public:
                access_url = self.client.presigned_get_object(
                    self.bucket_name, object_name, expires=timedelta(days=7)
                )
            self.logger.info(f"File uploaded: {filename} -> {object_name}")
            return {
                'success': True,
                'file_id': file_id,
                'object_name': object_name,
                'access_url': access_url,
                'metadata': file_metadata.__dict__
            }
        except Exception as e:
            self.logger.error(f"File upload failed: {e}")
            return {'success': False, 'error': str(e)}
    def download_file(self, file_id: str, user: str, download_path: str = None) -> Dict[str, Any]:
        """Download a file."""
        try:
            # Fetch the file metadata
            metadata = self._get_file_metadata(file_id)
            if not metadata:
                return {'success': False, 'error': 'File does not exist'}
            # Check access permissions
            access_check = self._check_file_access(metadata, user)
            if not access_check['allowed']:
                return {'success': False, 'error': access_check['message']}
            # Build the object name
            object_name = self._get_object_name_by_metadata(metadata)
            # Resolve the download path
            if not download_path:
                download_path = os.path.join(os.getcwd(), 'downloads', metadata['filename'])
            # Ensure the download directory exists
            os.makedirs(os.path.dirname(download_path), exist_ok=True)
            # Download the file
            self.client.fget_object(self.bucket_name, object_name, download_path)
            # Verify integrity
            downloaded_checksum = self._calculate_checksum(download_path)
            if downloaded_checksum != metadata['checksum']:
                os.remove(download_path)
                return {'success': False, 'error': 'File integrity check failed'}
            # Record the access
            self._log_file_access(file_id, user, 'download')
            self.logger.info(f"File downloaded: {metadata['filename']} -> {download_path}")
            return {
                'success': True,
                'download_path': download_path,
                'metadata': metadata
            }
        except Exception as e:
            self.logger.error(f"File download failed: {e}")
            return {'success': False, 'error': str(e)}
    def search_files(self, query: Dict[str, Any], user: str) -> Dict[str, Any]:
        """Search files."""
        try:
            # Departments the user can access
            accessible_departments = self._get_user_departments(user)
            # Collect matching results
            search_results = []
            # Walk all metadata objects
            metadata_objects = self.client.list_objects(
                self.metadata_bucket, recursive=True
            )
            for obj in metadata_objects:
                try:
                    # Load the metadata
                    response = self.client.get_object(self.metadata_bucket, obj.object_name)
                    metadata = json.loads(response.read().decode('utf-8'))
                    # Enforce department access
                    if metadata['department'] not in accessible_departments:
                        continue
                    # Apply the search filters
                    if self._matches_search_criteria(metadata, query):
                        search_results.append(metadata)
                except Exception as e:
                    self.logger.warning(f"Failed to process metadata: {obj.object_name}, {e}")
                    continue
            # Sort the results
            sort_by = query.get('sort_by', 'upload_time')
            reverse = query.get('sort_order', 'desc') == 'desc'
            if sort_by in ['upload_time', 'size']:
                search_results.sort(
                    key=lambda x: x.get(sort_by, 0),
                    reverse=reverse
                )
            # Paginate
            page = query.get('page', 1)
            page_size = query.get('page_size', 20)
            start_idx = (page - 1) * page_size
            end_idx = start_idx + page_size
            paginated_results = search_results[start_idx:end_idx]
            return {
                'success': True,
                'results': paginated_results,
                'total_count': len(search_results),
                'page': page,
                'page_size': page_size,
                'total_pages': (len(search_results) + page_size - 1) // page_size
            }
        except Exception as e:
            self.logger.error(f"File search failed: {e}")
            return {'success': False, 'error': str(e)}
    def delete_file(self, file_id: str, user: str, reason: str = "") -> Dict[str, Any]:
        """Delete a file."""
        try:
            # Fetch the file metadata
            metadata = self._get_file_metadata(file_id)
            if not metadata:
                return {'success': False, 'error': 'File does not exist'}
            # Check delete permissions
            delete_check = self._check_delete_permission(metadata, user)
            if not delete_check['allowed']:
                return {'success': False, 'error': delete_check['message']}
            # Build the object name
            object_name = self._get_object_name_by_metadata(metadata)
            # Remove the file
            self.client.remove_object(self.bucket_name, object_name)
            # Remove the metadata
            metadata_object_name = f"{file_id}.json"
            self.client.remove_object(self.metadata_bucket, metadata_object_name)
            # Record the deletion
            self._log_file_access(file_id, user, 'delete', {'reason': reason})
            self.logger.info(f"File deleted: {metadata['filename']}")
            return {
                'success': True,
                'message': 'File deleted',
                'deleted_file': metadata
            }
        except Exception as e:
            self.logger.error(f"File deletion failed: {e}")
            return {'success': False, 'error': str(e)}
    def get_department_statistics(self, department: str) -> Dict[str, Any]:
        """Collect statistics for a department."""
        try:
            if department not in self.departments:
                return {'success': False, 'error': 'Invalid department'}
            stats = {
                'department': department,
                'total_files': 0,
                'total_size': 0,
                'file_types': {},
                'monthly_uploads': {},
                'top_uploaders': {},
                'quota_usage': {
                    'used': 0,
                    'total': self.departments[department]['quota'],
                    'percentage': 0
                }
            }
            # Walk the department's objects
            department_prefix = f"{department}/"
            objects = self.client.list_objects(
                self.bucket_name, prefix=department_prefix, recursive=True
            )
            for obj in objects:
                stats['total_files'] += 1
                stats['total_size'] += obj.size
                # Per-extension counts
                file_ext = Path(obj.object_name).suffix.lower()
                stats['file_types'][file_ext] = stats['file_types'].get(file_ext, 0) + 1
                # Pull metadata for the detailed statistics
                try:
                    file_id = obj.object_name.split('/')[-1].split('_')[0]
                    metadata = self._get_file_metadata(file_id)
                    if metadata:
                        # Monthly upload counts
                        upload_month = metadata['upload_time'][:7]  # YYYY-MM
                        stats['monthly_uploads'][upload_month] = stats['monthly_uploads'].get(upload_month, 0) + 1
                        # Uploader counts
                        uploader = metadata['uploader']
                        stats['top_uploaders'][uploader] = stats['top_uploaders'].get(uploader, 0) + 1
                except Exception as e:
                    self.logger.warning(f"Failed to fetch metadata: {obj.object_name}, {e}")
                    continue
            # Quota usage
            stats['quota_usage']['used'] = stats['total_size']
            if stats['quota_usage']['total'] > 0:
                stats['quota_usage']['percentage'] = (
                    stats['total_size'] / stats['quota_usage']['total'] * 100
                )
            return {
                'success': True,
                'statistics': stats
            }
        except Exception as e:
            self.logger.error(f"Failed to collect department statistics: {e}")
            return {'success': False, 'error': str(e)}
    def cleanup_expired_files(self) -> Dict[str, Any]:
        """Remove expired files."""
        try:
            cleanup_results = {
                'cleaned_files': [],
                'failed_cleanups': [],
                'total_cleaned': 0,
                'space_freed': 0
            }
            current_time = datetime.now()
            # Walk all metadata objects
            metadata_objects = self.client.list_objects(
                self.metadata_bucket, recursive=True
            )
            for obj in metadata_objects:
                try:
                    # Load the metadata
                    response = self.client.get_object(self.metadata_bucket, obj.object_name)
                    metadata = json.loads(response.read().decode('utf-8'))
                    # Check the explicit expiry date
                    if metadata.get('expiry_date'):
                        expiry_date = datetime.fromisoformat(metadata['expiry_date'])
                        if current_time > expiry_date:
                            # Delete the expired file
                            object_name = self._get_object_name_by_metadata(metadata)
                            # Record its size first
                            file_obj = self.client.stat_object(self.bucket_name, object_name)
                            file_size = file_obj.size
                            # Remove the file and its metadata
                            self.client.remove_object(self.bucket_name, object_name)
                            self.client.remove_object(self.metadata_bucket, obj.object_name)
                            cleanup_results['cleaned_files'].append({
                                'file_id': metadata['file_id'],
                                'filename': metadata['filename'],
                                'size': file_size,
                                'expiry_date': metadata['expiry_date']
                            })
                            cleanup_results['total_cleaned'] += 1
                            cleanup_results['space_freed'] += file_size
                            self.logger.info(f"Removed expired file: {metadata['filename']}")
                            # The object is gone; skip the retention check below
                            continue
                    # Enforce the department retention period
                    department = metadata.get('department')
                    if department in self.departments:
                        retention_days = self.departments[department]['retention_days']
                        upload_time = datetime.fromisoformat(metadata['upload_time'])
                        retention_deadline = upload_time + timedelta(days=retention_days)
                        if current_time > retention_deadline:
                            # Delete files past their retention period
                            object_name = self._get_object_name_by_metadata(metadata)
                            try:
                                file_obj = self.client.stat_object(self.bucket_name, object_name)
                                file_size = file_obj.size
                                self.client.remove_object(self.bucket_name, object_name)
                                self.client.remove_object(self.metadata_bucket, obj.object_name)
                                cleanup_results['cleaned_files'].append({
                                    'file_id': metadata['file_id'],
                                    'filename': metadata['filename'],
                                    'size': file_size,
                                    'reason': 'retention_expired'
                                })
                                cleanup_results['total_cleaned'] += 1
                                cleanup_results['space_freed'] += file_size
                                self.logger.info(f"Removed file past retention: {metadata['filename']}")
                            except Exception as e:
                                cleanup_results['failed_cleanups'].append({
                                    'file_id': metadata['file_id'],
                                    'error': str(e)
                                })
                except Exception as e:
                    self.logger.warning(f"Cleanup failed for object: {obj.object_name}, {e}")
                    cleanup_results['failed_cleanups'].append({
                        'object': obj.object_name,
                        'error': str(e)
                    })
                    continue
            self.logger.info(
                f"Cleanup finished: removed {cleanup_results['total_cleaned']} files, "
                f"freed {self._format_size(cleanup_results['space_freed'])}"
            )
            return {
                'success': True,
                'cleanup_results': cleanup_results
            }
        except Exception as e:
            self.logger.error(f"File cleanup failed: {e}")
            return {'success': False, 'error': str(e)}
    # Helper methods
    def _generate_file_id(self, file_path: str) -> str:
        """Generate a file ID."""
        timestamp = str(int(datetime.now().timestamp() * 1000))
        filename = os.path.basename(file_path)
        return hashlib.md5(f"{timestamp}_{filename}".encode()).hexdigest()[:16]
    def _calculate_checksum(self, file_path: str) -> str:
        """Compute the file checksum."""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    def _is_allowed_file_type(self, file_ext: str) -> bool:
        """Check whether the file type is allowed."""
        for category, extensions in self.allowed_types.items():
            if file_ext in extensions:
                return True
        return False
    def _check_department_quota(self, department: str, file_size: int) -> Dict[str, Any]:
        """Check the department quota."""
        try:
            dept_config = self.departments[department]
            quota = dept_config['quota']
            # Compute current usage
            current_usage = 0
            department_prefix = f"{department}/"
            objects = self.client.list_objects(
                self.bucket_name, prefix=department_prefix, recursive=True
            )
            for obj in objects:
                current_usage += obj.size
            if current_usage + file_size > quota:
                return {
                    'allowed': False,
                    'message': f'Department quota exceeded ({self._format_size(quota)})'
                }
            return {'allowed': True}
        except Exception as e:
            return {'allowed': False, 'message': f'Quota check failed: {e}'}
    def _check_duplicate_file(self, checksum: str, department: str) -> Dict[str, Any]:
        """Check for a duplicate file."""
        try:
            # Walk the metadata objects
            metadata_objects = self.client.list_objects(
                self.metadata_bucket, recursive=True
            )
            for obj in metadata_objects:
                try:
                    response = self.client.get_object(self.metadata_bucket, obj.object_name)
                    metadata = json.loads(response.read().decode('utf-8'))
                    if (metadata.get('checksum') == checksum and
                            metadata.get('department') == department):
                        return {
                            'exists': True,
                            'file_info': metadata
                        }
                except Exception:
                    continue
            return {'exists': False}
        except Exception as e:
            self.logger.warning(f"Duplicate check failed: {e}")
            return {'exists': False}
    def _save_file_metadata(self, metadata: FileMetadata):
        """Persist file metadata."""
        metadata_dict = {
            'file_id': metadata.file_id,
            'filename': metadata.filename,
            'size': metadata.size,
            'content_type': metadata.content_type,
            'upload_time': metadata.upload_time.isoformat(),
            'uploader': metadata.uploader,
            'department': metadata.department,
            'tags': metadata.tags,
            'checksum': metadata.checksum,
            'version': metadata.version,
            'is_public': metadata.is_public,
            'description': metadata.description
        }
        if metadata.expiry_date:
            metadata_dict['expiry_date'] = metadata.expiry_date.isoformat()
        metadata_json = json.dumps(metadata_dict, ensure_ascii=False, indent=2)
        metadata_object_name = f"{metadata.file_id}.json"
        data = metadata_json.encode('utf-8')
        self.client.put_object(
            self.metadata_bucket,
            metadata_object_name,
            data=io.BytesIO(data),  # put_object expects a stream, not raw bytes
            length=len(data),
            content_type='application/json'
        )
    def _get_file_metadata(self, file_id: str) -> Optional[Dict[str, Any]]:
        """Fetch file metadata."""
        try:
            metadata_object_name = f"{file_id}.json"
            response = self.client.get_object(self.metadata_bucket, metadata_object_name)
            return json.loads(response.read().decode('utf-8'))
        except Exception:
            return None
    def _check_file_access(self, metadata: Dict[str, Any], user: str) -> Dict[str, Any]:
        """Check file access permissions."""
        # Simplified permission check
        user_departments = self._get_user_departments(user)
        if metadata['department'] not in user_departments:
            return {'allowed': False, 'message': 'No permission to access this file'}
        # Check whether the file has expired
        if metadata.get('expiry_date'):
            expiry_date = datetime.fromisoformat(metadata['expiry_date'])
            if datetime.now() > expiry_date:
                return {'allowed': False, 'message': 'File has expired'}
        return {'allowed': True}
    def _get_user_departments(self, user: str) -> List[str]:
        """Departments a user may access (simplified)."""
        # In production this would come from the user-management system;
        # here we assume the username encodes the department.
        if 'admin' in user.lower():
            return list(self.departments.keys())
        for dept in self.departments.keys():
            if dept in user.lower():
                return [dept]
        return ['it']  # default to the IT department
    def _get_object_name_by_metadata(self, metadata: Dict[str, Any]) -> str:
        """Reconstruct the object name from metadata."""
        upload_time = datetime.fromisoformat(metadata['upload_time'])
        return f"{metadata['department']}/{upload_time.strftime('%Y/%m')}/{metadata['file_id']}_{metadata['filename']}"
    def _matches_search_criteria(self, metadata: Dict[str, Any], query: Dict[str, Any]) -> bool:
        """Check whether metadata matches the search criteria."""
        # Filename search
        if 'filename' in query:
            if query['filename'].lower() not in metadata['filename'].lower():
                return False
        # Tag search
        if 'tags' in query:
            file_tags = metadata.get('tags', [])
            if not any(tag in file_tags for tag in query['tags']):
                return False
        # Content-type search
        if 'content_type' in query:
            if query['content_type'] not in metadata['content_type']:
                return False
        # Uploader search
        if 'uploader' in query:
            if query['uploader'].lower() not in metadata['uploader'].lower():
                return False
        # Date-range search
        if 'date_from' in query or 'date_to' in query:
            upload_time = datetime.fromisoformat(metadata['upload_time'])
            if 'date_from' in query:
                date_from = datetime.fromisoformat(query['date_from'])
                if upload_time < date_from:
                    return False
            if 'date_to' in query:
                date_to = datetime.fromisoformat(query['date_to'])
                if upload_time > date_to:
                    return False
        # Size-range search
        if 'size_min' in query or 'size_max' in query:
            file_size = metadata['size']
            if 'size_min' in query and file_size < query['size_min']:
                return False
            if 'size_max' in query and file_size > query['size_max']:
                return False
        return True
    def _check_delete_permission(self, metadata: Dict[str, Any], user: str) -> Dict[str, Any]:
        """Check delete permissions."""
        # Simplified delete-permission check
        if metadata['uploader'] == user:
            return {'allowed': True}
        if 'admin' in user.lower():
            return {'allowed': True}
        return {'allowed': False, 'message': 'No permission to delete this file'}
    def _log_file_access(self, file_id: str, user: str, action: str, details: Dict[str, Any] = None):
        """Record a file-access log entry."""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'file_id': file_id,
            'user': user,
            'action': action,
            'details': details or {}
        }
        # In production this could be persisted to a database or log file
        self.logger.info(f"File access: {json.dumps(log_entry, ensure_ascii=False)}")
    def _format_size(self, size_bytes: int) -> str:
        """Format a byte count for display."""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size_bytes < 1024.0:
                return f"{size_bytes:.1f} {unit}"
            size_bytes /= 1024.0
        return f"{size_bytes:.1f} PB"
# Usage example
if __name__ == "__main__":
    # Create the file manager
    file_manager = EnterpriseFileManager(
        endpoint="localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        secure=False
    )
    # Upload a file
    upload_result = file_manager.upload_file(
        file_path="/path/to/document.pdf",
        uploader="john.doe",
        department="hr",
        tags=["contract", "important"],
        description="Employee contract document",
        is_public=False,
        expiry_days=2555  # 7 years
    )
    if upload_result['success']:
        file_id = upload_result['file_id']
        print(f"File uploaded, ID: {file_id}")
    # Search for files
    search_result = file_manager.search_files(
        query={
            'tags': ['contract'],
            'department': 'hr',
            'date_from': '2024-01-01T00:00:00',
            'page': 1,
            'page_size': 10
        },
        user="hr.manager"
    )
    if search_result['success']:
        print(f"Found {search_result['total_count']} files")
        for file_info in search_result['results']:
            print(f"- {file_info['filename']} ({file_info['uploader']})")
    # Department statistics
    stats_result = file_manager.get_department_statistics('hr')
    if stats_result['success']:
        stats = stats_result['statistics']
        print(f"\nHR department statistics:")
        print(f"Total files: {stats['total_files']}")
        print(f"Total size: {file_manager._format_size(stats['total_size'])}")
        print(f"Quota usage: {stats['quota_usage']['percentage']:.1f}%")
    # Clean up expired files
    cleanup_result = file_manager.cleanup_expired_files()
    if cleanup_result['success']:
        results = cleanup_result['cleanup_results']
        print(f"\nCleanup result: removed {results['total_cleaned']} files")
        print(f"Space freed: {file_manager._format_size(results['space_freed'])}")
10.1.2 File Version Management
class FileVersionManager:
    """File version manager."""
    def __init__(self, file_manager: EnterpriseFileManager):
        self.file_manager = file_manager
        self.client = file_manager.client
        self.versions_bucket = "file-versions"
        self.logger = logging.getLogger('FileVersionManager')
        # Initialize the versions bucket
        self._initialize_version_bucket()
    def _initialize_version_bucket(self):
        """Initialize the versions bucket."""
        try:
            if not self.client.bucket_exists(self.versions_bucket):
                self.client.make_bucket(self.versions_bucket)
                self.logger.info(f"Created versions bucket: {self.versions_bucket}")
        except Exception as e:
            self.logger.error(f"Failed to initialize versions bucket: {e}")
            raise
    def create_new_version(self, original_file_id: str, new_file_path: str,
                           user: str, change_notes: str = "") -> Dict[str, Any]:
        """Create a new version of a file."""
        try:
            # Fetch the original metadata
            original_metadata = self.file_manager._get_file_metadata(original_file_id)
            if not original_metadata:
                return {'success': False, 'error': 'Original file does not exist'}
            # Check modify permissions
            if not self._check_modify_permission(original_metadata, user):
                return {'success': False, 'error': 'No permission to modify this file'}
            # Back up the current version
            backup_result = self._backup_current_version(original_file_id)
            if not backup_result['success']:
                return backup_result
            # Bump the version number
            new_version = original_metadata['version'] + 1
            # Overwrite the current object
            object_name = self.file_manager._get_object_name_by_metadata(original_metadata)
            # Compute the new checksum
            new_checksum = self.file_manager._calculate_checksum(new_file_path)
            # Upload the new content
            content_type = mimetypes.guess_type(new_file_path)[0] or 'application/octet-stream'
            metadata = {
                'version': str(new_version),
                'previous_version': str(original_metadata['version']),
                'modifier': user,
                'modification_time': datetime.now().isoformat(),
                'change_notes': change_notes,
                'checksum': new_checksum
            }
            self.client.fput_object(
                self.file_manager.bucket_name,
                object_name,
                new_file_path,
                content_type=content_type,
                metadata=metadata
            )
            # Update the file metadata
            original_metadata['version'] = new_version
            original_metadata['checksum'] = new_checksum
            original_metadata['size'] = os.path.getsize(new_file_path)
            original_metadata['last_modified'] = datetime.now().isoformat()
            original_metadata['last_modifier'] = user
            # Persist the updated metadata
            self._update_file_metadata(original_file_id, original_metadata)
            # Record the version history
            version_record = {
                'file_id': original_file_id,
                'version': new_version,
                'modifier': user,
                'modification_time': datetime.now().isoformat(),
                'change_notes': change_notes,
                'checksum': new_checksum,
                'size': os.path.getsize(new_file_path),
                'backup_location': backup_result['backup_location']
            }
            self._save_version_record(version_record)
            self.logger.info(f"File version updated: {original_metadata['filename']} v{new_version}")
            return {
                'success': True,
                'new_version': new_version,
                'backup_location': backup_result['backup_location'],
                'version_record': version_record
            }
        except Exception as e:
            self.logger.error(f"Failed to create file version: {e}")
            return {'success': False, 'error': str(e)}
    def get_version_history(self, file_id: str) -> Dict[str, Any]:
        """Fetch the version history of a file."""
        try:
            version_history = []
            # Version records live under the file's prefix
            version_prefix = f"{file_id}/"
            version_objects = self.client.list_objects(
                self.versions_bucket, prefix=version_prefix, recursive=True
            )
            for obj in version_objects:
                if obj.object_name.endswith('.json'):
                    try:
                        response = self.client.get_object(self.versions_bucket, obj.object_name)
                        version_record = json.loads(response.read().decode('utf-8'))
                        version_history.append(version_record)
                    except Exception as e:
                        self.logger.warning(f"Failed to read version record: {obj.object_name}, {e}")
                        continue
            # Sort by version number
            version_history.sort(key=lambda x: x['version'], reverse=True)
            return {
                'success': True,
                'file_id': file_id,
                'version_history': version_history,
                'total_versions': len(version_history)
            }
        except Exception as e:
            self.logger.error(f"Failed to fetch version history: {e}")
            return {'success': False, 'error': str(e)}
    def restore_version(self, file_id: str, target_version: int, user: str) -> Dict[str, Any]:
        """Restore a file to a given version."""
        try:
            # Fetch the current metadata
            current_metadata = self.file_manager._get_file_metadata(file_id)
            if not current_metadata:
                return {'success': False, 'error': 'File does not exist'}
            # Check restore permissions
            if not self._check_modify_permission(current_metadata, user):
                return {'success': False, 'error': 'No permission to restore this file'}
            # Fetch the target version record
            target_version_record = self._get_version_record(file_id, target_version)
            if not target_version_record:
                return {'success': False, 'error': f'Version {target_version} does not exist'}
            # Back up the current version first
            backup_result = self._backup_current_version(file_id)
            if not backup_result['success']:
                return backup_result
            # Restore the target version
            backup_location = target_version_record['backup_location']
            current_object_name = self.file_manager._get_object_name_by_metadata(current_metadata)
            # Server-side copy of the backed-up version over the current object
            self.client.copy_object(
                self.file_manager.bucket_name,
                current_object_name,
                CopySource(self.versions_bucket, backup_location)
            )
            # Update the metadata
            new_version = current_metadata['version'] + 1
            current_metadata['version'] = new_version
            current_metadata['checksum'] = target_version_record['checksum']
            current_metadata['size'] = target_version_record['size']
            current_metadata['last_modified'] = datetime.now().isoformat()
            current_metadata['last_modifier'] = user
            self._update_file_metadata(file_id, current_metadata)
            # Record the restore operation
            restore_record = {
                'file_id': file_id,
                'version': new_version,
                'restored_from_version': target_version,
                'restorer': user,
                'restore_time': datetime.now().isoformat(),
                'change_notes': f'Restored to version {target_version}',
                'checksum': target_version_record['checksum'],
                'size': target_version_record['size'],
                'backup_location': backup_result['backup_location']
            }
            self._save_version_record(restore_record)
            self.logger.info(f"Version restored: {current_metadata['filename']} back to v{target_version}")
            return {
                'success': True,
                'new_version': new_version,
                'restored_from_version': target_version,
                'restore_record': restore_record
            }
        except Exception as e:
            self.logger.error(f"Version restore failed: {e}")
            return {'success': False, 'error': str(e)}
    def _backup_current_version(self, file_id: str) -> Dict[str, Any]:
        """Back up the current version."""
        try:
            # Fetch the current file info
            current_metadata = self.file_manager._get_file_metadata(file_id)
            if not current_metadata:
                return {'success': False, 'error': 'File does not exist'}
            current_object_name = self.file_manager._get_object_name_by_metadata(current_metadata)
            current_version = current_metadata['version']
            # Build the backup location
            backup_location = f"{file_id}/v{current_version}_{current_metadata['filename']}"
            # Server-side copy of the current object into the versions bucket
            self.client.copy_object(
                self.versions_bucket,
                backup_location,
                CopySource(self.file_manager.bucket_name, current_object_name)
            )
            return {
                'success': True,
                'backup_location': backup_location
            }
        except Exception as e:
            self.logger.error(f"Failed to back up current version: {e}")
            return {'success': False, 'error': str(e)}
    def _check_modify_permission(self, metadata: Dict[str, Any], user: str) -> bool:
        """Check modify permissions."""
        # Simplified permission check
        if metadata['uploader'] == user:
            return True
        if 'admin' in user.lower():
            return True
        # Department-level permission
        user_departments = self.file_manager._get_user_departments(user)
        if metadata['department'] in user_departments:
            return True
        return False
    def _update_file_metadata(self, file_id: str, metadata: Dict[str, Any]):
        """Update file metadata."""
        metadata_json = json.dumps(metadata, ensure_ascii=False, indent=2)
        metadata_object_name = f"{file_id}.json"
        data = metadata_json.encode('utf-8')
        self.client.put_object(
            self.file_manager.metadata_bucket,
            metadata_object_name,
            data=io.BytesIO(data),  # put_object expects a stream
            length=len(data),
            content_type='application/json'
        )
    def _save_version_record(self, version_record: Dict[str, Any]):
        """Persist a version record."""
        record_json = json.dumps(version_record, ensure_ascii=False, indent=2)
        record_object_name = f"{version_record['file_id']}/v{version_record['version']}.json"
        data = record_json.encode('utf-8')
        self.client.put_object(
            self.versions_bucket,
            record_object_name,
            data=io.BytesIO(data),  # put_object expects a stream
            length=len(data),
            content_type='application/json'
        )
    def _get_version_record(self, file_id: str, version: int) -> Optional[Dict[str, Any]]:
        """Fetch a version record."""
        try:
            record_object_name = f"{file_id}/v{version}.json"
            response = self.client.get_object(self.versions_bucket, record_object_name)
            return json.loads(response.read().decode('utf-8'))
        except Exception:
            return None
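A minimal usage sketch for the version manager, assuming the EnterpriseFileManager from 10.1.1 is connected to the same MinIO instance; the file ID, paths, and user names are placeholders:
# Usage sketch (file ID, paths, and users are placeholders)
version_manager = FileVersionManager(file_manager)
# Upload a revised copy of an existing document as a new version
result = version_manager.create_new_version(
    original_file_id="0123456789abcdef",
    new_file_path="/path/to/document_v2.pdf",
    user="john.doe",
    change_notes="Updated contract terms"
)
if result['success']:
    print(f"New version: v{result['new_version']}")
# Inspect the version history and roll back if needed
history = version_manager.get_version_history("0123456789abcdef")
if history['success'] and history['total_versions'] > 1:
    version_manager.restore_version("0123456789abcdef", target_version=1, user="john.doe")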
10.2 Image Processing Service
10.2.1 Smart Image Management System
from PIL import Image, ImageFilter, ImageEnhance
import io
class ImageProcessingService:
    """Image processing service."""
    def __init__(self, endpoint: str, access_key: str, secret_key: str, secure: bool = False):
        self.client = Minio(endpoint, access_key, secret_key, secure=secure)
        self.bucket_name = "image-storage"
        self.processed_bucket = "processed-images"
        self.thumbnails_bucket = "image-thumbnails"
        self.logger = logging.getLogger('ImageProcessingService')
        # Initialize buckets
        self._initialize_buckets()
        # Supported image formats
        self.supported_formats = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']
        # Thumbnail size presets
        self.thumbnail_sizes = {
            'small': (150, 150),
            'medium': (300, 300),
            'large': (600, 600)
        }
    def _initialize_buckets(self):
        """Initialize the storage buckets."""
        buckets = [self.bucket_name, self.processed_bucket, self.thumbnails_bucket]
        for bucket in buckets:
            try:
                if not self.client.bucket_exists(bucket):
                    self.client.make_bucket(bucket)
                    self.logger.info(f"Created bucket: {bucket}")
            except Exception as e:
                self.logger.error(f"Failed to create bucket {bucket}: {e}")
                raise
    def upload_and_process_image(self, image_path: str, user_id: str,
                                 auto_process: bool = True) -> Dict[str, Any]:
        """Upload and process an image."""
        try:
            # Validate the image file
            if not self._is_valid_image(image_path):
                return {'success': False, 'error': 'Unsupported image format'}
            # Generate an image ID
            image_id = self._generate_image_id(image_path, user_id)
            filename = os.path.basename(image_path)
            # Inspect the image
            image_info = self._analyze_image(image_path)
            # Upload the original
            original_object_name = f"original/{user_id}/{image_id}_{filename}"
            with open(image_path, 'rb') as file_data:
                self.client.put_object(
                    self.bucket_name,
                    original_object_name,
                    file_data,
                    length=os.path.getsize(image_path),
                    content_type=f"image/{image_info['format'].lower()}",
                    metadata={
                        'user_id': user_id,
                        'upload_time': datetime.now().isoformat(),
                        'width': str(image_info['width']),
                        'height': str(image_info['height']),
                        'format': image_info['format'],
                        'size': str(image_info['size'])
                    }
                )
            result = {
                'success': True,
                'image_id': image_id,
                'original_url': original_object_name,
                'image_info': image_info,
                'processed_versions': {},
                'thumbnails': {}
            }
            if auto_process:
                # Run the automatic processing pipeline
                processing_result = self._auto_process_image(image_path, image_id, user_id)
                result['processed_versions'] = processing_result.get('processed_versions', {})
                result['thumbnails'] = processing_result.get('thumbnails', {})
            self.logger.info(f"Image uploaded: {filename} -> {image_id}")
            return result
        except Exception as e:
            self.logger.error(f"Image upload/processing failed: {e}")
            return {'success': False, 'error': str(e)}
    def _auto_process_image(self, image_path: str, image_id: str, user_id: str) -> Dict[str, Any]:
        """Automatic processing pipeline."""
        result = {
            'processed_versions': {},
            'thumbnails': {}
        }
        try:
            # Thumbnails
            thumbnail_result = self.generate_thumbnails(image_path, image_id, user_id)
            if thumbnail_result['success']:
                result['thumbnails'] = thumbnail_result['thumbnails']
            # Optimized JPEG
            optimization_result = self.optimize_image(image_path, image_id, user_id)
            if optimization_result['success']:
                result['processed_versions']['optimized'] = optimization_result['optimized_url']
            # WebP conversion
            webp_result = self.convert_to_webp(image_path, image_id, user_id)
            if webp_result['success']:
                result['processed_versions']['webp'] = webp_result['webp_url']
            return result
        except Exception as e:
            self.logger.error(f"Automatic image processing failed: {e}")
            return result
    def generate_thumbnails(self, image_path: str, image_id: str, user_id: str) -> Dict[str, Any]:
        """Generate thumbnails."""
        try:
            thumbnails = {}
            with Image.open(image_path) as img:
                for size_name, (width, height) in self.thumbnail_sizes.items():
                    # Build the thumbnail
                    thumbnail = img.copy()
                    thumbnail.thumbnail((width, height), Image.Resampling.LANCZOS)
                    if thumbnail.mode not in ('RGB', 'L'):
                        thumbnail = thumbnail.convert('RGB')  # JPEG cannot store alpha/palette
                    # Render to an in-memory buffer
                    thumbnail_buffer = io.BytesIO()
                    thumbnail.save(thumbnail_buffer, format='JPEG', quality=85)
                    thumbnail_buffer.seek(0)
                    # Upload the thumbnail
                    thumbnail_object_name = f"{user_id}/{image_id}_{size_name}.jpg"
                    self.client.put_object(
                        self.thumbnails_bucket,
                        thumbnail_object_name,
                        thumbnail_buffer,
                        length=thumbnail_buffer.getbuffer().nbytes,
                        content_type='image/jpeg',
                        metadata={
                            'original_image_id': image_id,
                            'thumbnail_size': size_name,
                            'width': str(thumbnail.width),
                            'height': str(thumbnail.height)
                        }
                    )
                    thumbnails[size_name] = {
                        'url': thumbnail_object_name,
                        'width': thumbnail.width,
                        'height': thumbnail.height
                    }
            return {
                'success': True,
                'thumbnails': thumbnails
            }
        except Exception as e:
            self.logger.error(f"Thumbnail generation failed: {e}")
            return {'success': False, 'error': str(e)}
    def optimize_image(self, image_path: str, image_id: str, user_id: str,
                       quality: int = 85) -> Dict[str, Any]:
        """Optimize an image."""
        try:
            with Image.open(image_path) as img:
                # Convert to RGB if needed
                if img.mode in ('RGBA', 'LA', 'P'):
                    background = Image.new('RGB', img.size, (255, 255, 255))
                    if img.mode == 'P':
                        img = img.convert('RGBA')
                    # Use the alpha channel as the paste mask when one exists
                    background.paste(img, mask=img.split()[-1] if img.mode in ('RGBA', 'LA') else None)
                    img = background
                # Re-encode with optimization
                optimized_buffer = io.BytesIO()
                img.save(optimized_buffer, format='JPEG', quality=quality, optimize=True)
                optimized_buffer.seek(0)
                # Upload the optimized copy
                optimized_object_name = f"{user_id}/{image_id}_optimized.jpg"
                self.client.put_object(
                    self.processed_bucket,
                    optimized_object_name,
                    optimized_buffer,
                    length=optimized_buffer.getbuffer().nbytes,
                    content_type='image/jpeg',
                    metadata={
                        'original_image_id': image_id,
                        'processing_type': 'optimized',
                        'quality': str(quality),
                        'original_size': str(os.path.getsize(image_path)),
                        'optimized_size': str(optimized_buffer.getbuffer().nbytes)
                    }
                )
                return {
                    'success': True,
                    'optimized_url': optimized_object_name,
                    'original_size': os.path.getsize(image_path),
                    'optimized_size': optimized_buffer.getbuffer().nbytes,
                    'compression_ratio': optimized_buffer.getbuffer().nbytes / os.path.getsize(image_path)
                }
        except Exception as e:
            self.logger.error(f"Image optimization failed: {e}")
            return {'success': False, 'error': str(e)}
    def convert_to_webp(self, image_path: str, image_id: str, user_id: str) -> Dict[str, Any]:
        """Convert an image to WebP."""
        try:
            with Image.open(image_path) as img:
                # Re-encode as WebP
                webp_buffer = io.BytesIO()
                img.save(webp_buffer, format='WEBP', quality=85)
                webp_buffer.seek(0)
                # Upload the WebP copy
                webp_object_name = f"{user_id}/{image_id}.webp"
                self.client.put_object(
                    self.processed_bucket,
                    webp_object_name,
                    webp_buffer,
                    length=webp_buffer.getbuffer().nbytes,
                    content_type='image/webp',
                    metadata={
                        'original_image_id': image_id,
                        'processing_type': 'webp_conversion',
                        'original_size': str(os.path.getsize(image_path)),
                        'webp_size': str(webp_buffer.getbuffer().nbytes)
                    }
                )
                return {
                    'success': True,
                    'webp_url': webp_object_name,
                    'original_size': os.path.getsize(image_path),
                    'webp_size': webp_buffer.getbuffer().nbytes,
                    'compression_ratio': webp_buffer.getbuffer().nbytes / os.path.getsize(image_path)
                }
        except Exception as e:
            self.logger.error(f"WebP conversion failed: {e}")
            return {'success': False, 'error': str(e)}
    def apply_filters(self, image_id: str, user_id: str, filters: List[str]) -> Dict[str, Any]:
        """Apply image filters."""
        try:
            # Locate the original image
            original_object_name = self._get_original_image_path(image_id, user_id)
            if not original_object_name:
                return {'success': False, 'error': 'Original image does not exist'}
            # Download the original
            response = self.client.get_object(self.bucket_name, original_object_name)
            image_data = response.read()
            # Apply each requested filter
            filtered_images = {}
            with Image.open(io.BytesIO(image_data)) as img:
                for filter_name in filters:
                    filtered_img = self._apply_single_filter(img.copy(), filter_name)
                    if filtered_img:
                        if filtered_img.mode != 'RGB':
                            filtered_img = filtered_img.convert('RGB')  # JPEG cannot store alpha
                        # Render the filtered copy
                        filtered_buffer = io.BytesIO()
                        filtered_img.save(filtered_buffer, format='JPEG', quality=90)
                        filtered_buffer.seek(0)
                        # Upload it
                        filtered_object_name = f"{user_id}/{image_id}_{filter_name}.jpg"
                        self.client.put_object(
                            self.processed_bucket,
                            filtered_object_name,
                            filtered_buffer,
                            length=filtered_buffer.getbuffer().nbytes,
                            content_type='image/jpeg',
                            metadata={
                                'original_image_id': image_id,
                                'processing_type': 'filter',
                                'filter_name': filter_name
                            }
                        )
                        filtered_images[filter_name] = filtered_object_name
            return {
                'success': True,
                'filtered_images': filtered_images
            }
        except Exception as e:
            self.logger.error(f"Failed to apply filters: {e}")
            return {'success': False, 'error': str(e)}
    def _apply_single_filter(self, img: Image.Image, filter_name: str) -> Optional[Image.Image]:
        """Apply a single filter."""
        try:
            if filter_name == 'blur':
                return img.filter(ImageFilter.BLUR)
            elif filter_name == 'sharpen':
                return img.filter(ImageFilter.SHARPEN)
            elif filter_name == 'edge_enhance':
                return img.filter(ImageFilter.EDGE_ENHANCE)
            elif filter_name == 'emboss':
                return img.filter(ImageFilter.EMBOSS)
            elif filter_name == 'grayscale':
                return img.convert('L').convert('RGB')
            elif filter_name == 'sepia':
                return self._apply_sepia_filter(img)
            elif filter_name == 'vintage':
                return self._apply_vintage_filter(img)
            elif filter_name == 'brightness_up':
                enhancer = ImageEnhance.Brightness(img)
                return enhancer.enhance(1.3)
            elif filter_name == 'brightness_down':
                enhancer = ImageEnhance.Brightness(img)
                return enhancer.enhance(0.7)
            elif filter_name == 'contrast_up':
                enhancer = ImageEnhance.Contrast(img)
                return enhancer.enhance(1.3)
            elif filter_name == 'saturation_up':
                enhancer = ImageEnhance.Color(img)
                return enhancer.enhance(1.3)
            else:
                return None
        except Exception as e:
            self.logger.error(f"Failed to apply filter {filter_name}: {e}")
            return None
    def _apply_sepia_filter(self, img: Image.Image) -> Image.Image:
        """Apply a sepia filter (per-pixel loop; fine for a demo)."""
        img = img.convert('RGB')  # ensure 3-channel pixels before writing tuples back
        pixels = img.load()
        for i in range(img.width):
            for j in range(img.height):
                r, g, b = pixels[i, j][:3]
                tr = int(0.393 * r + 0.769 * g + 0.189 * b)
                tg = int(0.349 * r + 0.686 * g + 0.168 * b)
                tb = int(0.272 * r + 0.534 * g + 0.131 * b)
                pixels[i, j] = (min(255, tr), min(255, tg), min(255, tb))
        return img
    def _apply_vintage_filter(self, img: Image.Image) -> Image.Image:
        """Apply a vintage filter."""
        img = img.convert('RGB')
        # Reduce saturation
        enhancer = ImageEnhance.Color(img)
        img = enhancer.enhance(0.7)
        # Boost contrast
        enhancer = ImageEnhance.Contrast(img)
        img = enhancer.enhance(1.2)
        # Warm up the tones
        pixels = img.load()
        for i in range(img.width):
            for j in range(img.height):
                r, g, b = pixels[i, j][:3]
                r = min(255, int(r * 1.1))
                g = min(255, int(g * 1.05))
                b = min(255, int(b * 0.9))
                pixels[i, j] = (r, g, b)
        return img
    # Helper methods
    def _is_valid_image(self, image_path: str) -> bool:
        """Validate the image file extension."""
        try:
            file_ext = Path(image_path).suffix.lower()
            return file_ext in self.supported_formats
        except Exception:
            return False
    def _analyze_image(self, image_path: str) -> Dict[str, Any]:
        """Inspect basic image properties."""
        try:
            with Image.open(image_path) as img:
                return {
                    'width': img.width,
                    'height': img.height,
                    'format': img.format,
                    'mode': img.mode,
                    'size': os.path.getsize(image_path)
                }
        except Exception as e:
            self.logger.error(f"Failed to analyze image: {e}")
            return {}
    def _generate_image_id(self, image_path: str, user_id: str) -> str:
        """Generate an image ID."""
        timestamp = str(int(datetime.now().timestamp() * 1000))
        filename = os.path.basename(image_path)
        return hashlib.md5(f"{timestamp}_{user_id}_{filename}".encode()).hexdigest()[:16]
    def _get_original_image_path(self, image_id: str, user_id: str) -> Optional[str]:
        """Locate the original image object."""
        try:
            # Search by the original/{user}/{image_id}_ prefix
            prefix = f"original/{user_id}/{image_id}_"
            objects = self.client.list_objects(self.bucket_name, prefix=prefix)
            for obj in objects:
                return obj.object_name
            return None
        except Exception:
            return None
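A minimal usage sketch for the image service; the endpoint, credentials, path, and user ID are placeholders:
# Usage sketch (endpoint, credentials, path, and user ID are placeholders)
image_service = ImageProcessingService(
    endpoint="localhost:9000",
    access_key="minioadmin",
    secret_key="minioadmin"
)
upload_result = image_service.upload_and_process_image(
    image_path="/path/to/photo.jpg",
    user_id="user-42",
    auto_process=True  # also generates thumbnails, an optimized JPEG, and a WebP copy
)
if upload_result['success']:
    print(f"Image ID: {upload_result['image_id']}")
    print(f"Thumbnails: {list(upload_result['thumbnails'].keys())}")
    # Apply extra filters on demand
    image_service.apply_filters(upload_result['image_id'], "user-42", ["grayscale", "sepia"])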
10.3 Data Backup System
10.3.1 Automated Backup Management
import schedule
import time
import gzip
import io
import os
import json
import logging
import tempfile
import shutil
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from minio import Minio
class DataBackupSystem:
    """Data backup system."""
    def __init__(self, source_endpoint: str, source_access_key: str, source_secret_key: str,
                 backup_endpoint: str, backup_access_key: str, backup_secret_key: str,
                 secure: bool = False):
        # Source MinIO client
        self.source_client = Minio(source_endpoint, source_access_key, source_secret_key, secure=secure)
        # Backup MinIO client
        self.backup_client = Minio(backup_endpoint, backup_access_key, backup_secret_key, secure=secure)
        self.logger = logging.getLogger('DataBackupSystem')
        # Backup configuration
        self.backup_config = {
            'retention_days': 30,            # how long to keep backups
            'compression_enabled': True,     # compress backup copies
            'encryption_enabled': False,     # encrypt backup copies
            'max_concurrent_uploads': 5,     # maximum concurrent uploads
            'chunk_size': 64 * 1024 * 1024,  # 64 MB
            'verify_backup': True            # verify backup integrity
        }
        # Backup statistics
        self.backup_stats = {
            'total_backups': 0,
            'successful_backups': 0,
            'failed_backups': 0,
            'total_size_backed_up': 0,
            'last_backup_time': None
        }
    def create_backup_job(self, source_bucket: str, backup_bucket: str,
                          backup_type: str = 'incremental') -> Dict[str, Any]:
        """Create a backup job."""
        try:
            backup_id = self._generate_backup_id(source_bucket, backup_type)
            backup_timestamp = datetime.now()
            # Ensure the backup bucket exists
            if not self.backup_client.bucket_exists(backup_bucket):
                self.backup_client.make_bucket(backup_bucket)
                self.logger.info(f"Created backup bucket: {backup_bucket}")
            # Collect the source object list
            source_objects = self._get_source_objects(source_bucket, backup_type, backup_timestamp)
            if not source_objects:
                return {
                    'success': True,
                    'backup_id': backup_id,
                    'message': 'Nothing to back up',
                    'objects_count': 0
                }
            # Run the backup
            backup_result = self._execute_backup(
                source_bucket, backup_bucket, source_objects, backup_id, backup_type
            )
            # Update the statistics
            self._update_backup_stats(backup_result)
            # Persist the backup metadata
            backup_metadata = {
                'backup_id': backup_id,
                'source_bucket': source_bucket,
                'backup_bucket': backup_bucket,
                'backup_type': backup_type,
                'backup_timestamp': backup_timestamp.isoformat(),
                'objects_count': len(source_objects),
                'total_size': backup_result.get('total_size', 0),
                'status': 'completed' if backup_result['success'] else 'failed',
                'duration': backup_result.get('duration', 0),
                'failed_objects': backup_result.get('failed_objects', [])
            }
            self._save_backup_metadata(backup_bucket, backup_metadata)
            self.logger.info(
                f"Backup job finished: {backup_id}, {backup_result.get('successful_count', 0)} objects backed up"
            )
            return {
                'success': backup_result['success'],
                'backup_id': backup_id,
                'backup_metadata': backup_metadata,
                'backup_result': backup_result
            }
        except Exception as e:
            self.logger.error(f"Failed to create backup job: {e}")
            return {'success': False, 'error': str(e)}
    def _execute_backup(self, source_bucket: str, backup_bucket: str,
                        source_objects: List[str], backup_id: str, backup_type: str) -> Dict[str, Any]:
        """Run the backup."""
        start_time = time.time()
        successful_count = 0
        failed_objects = []
        total_size = 0
        try:
            with ThreadPoolExecutor(max_workers=self.backup_config['max_concurrent_uploads']) as executor:
                # Submit one task per object
                future_to_object = {
                    executor.submit(
                        self._backup_single_object,
                        source_bucket, backup_bucket, obj, backup_id, backup_type
                    ): obj for obj in source_objects
                }
                # Collect results as they finish
                for future in as_completed(future_to_object):
                    obj = future_to_object[future]
                    try:
                        result = future.result()
                        if result['success']:
                            successful_count += 1
                            total_size += result.get('size', 0)
                        else:
                            failed_objects.append({
                                'object': obj,
                                'error': result.get('error', 'Unknown error')
                            })
                    except Exception as e:
                        failed_objects.append({
                            'object': obj,
                            'error': str(e)
                        })
            duration = time.time() - start_time
            success_rate = successful_count / len(source_objects) if source_objects else 1.0
            return {
                'success': success_rate > 0.8,  # treat a success rate above 80% as overall success
                'successful_count': successful_count,
                'failed_count': len(failed_objects),
                'failed_objects': failed_objects,
                'total_size': total_size,
                'duration': duration,
                'success_rate': success_rate
            }
        except Exception as e:
            self.logger.error(f"Backup execution failed: {e}")
            return {
                'success': False,
                'error': str(e),
                'successful_count': successful_count,
                'failed_objects': failed_objects
            }
    def _backup_single_object(self, source_bucket: str, backup_bucket: str,
                              object_name: str, backup_id: str, backup_type: str) -> Dict[str, Any]:
        """Back up a single object."""
        source_object = None
        try:
            # Fetch the source object as a stream
            source_object = self.source_client.get_object(source_bucket, object_name)
            # Build the backup object name
            backup_object_name = f"{backup_type}/{backup_id}/{object_name}"
            # Compress if enabled
            if self.backup_config['compression_enabled']:
                backup_object_name += '.gz'
                with tempfile.NamedTemporaryFile() as temp_file:
                    # Spool the source data to disk
                    shutil.copyfileobj(source_object, temp_file)
                    temp_file.seek(0)
                    # Compress, then upload
                    with tempfile.NamedTemporaryFile() as compressed_file:
                        with gzip.open(compressed_file.name, 'wb') as gz_file:
                            shutil.copyfileobj(temp_file, gz_file)
                        compressed_file.seek(0)
                        file_size = os.path.getsize(compressed_file.name)
                        self.backup_client.put_object(
                            backup_bucket,
                            backup_object_name,
                            compressed_file,
                            file_size
                        )
            else:
                # Upload directly; get the size from stat_object instead of
                # poking at the response's private attributes
                file_size = self.source_client.stat_object(source_bucket, object_name).size
                self.backup_client.put_object(
                    backup_bucket,
                    backup_object_name,
                    source_object,
                    file_size
                )
            # Verify the backup
            if self.backup_config['verify_backup']:
                if not self._verify_backup(backup_bucket, backup_object_name):
                    return {'success': False, 'error': 'Backup verification failed'}
            return {
                'success': True,
                'size': file_size,
                'backup_object_name': backup_object_name
            }
        except Exception as e:
            self.logger.error(f"Failed to back up object {object_name}: {e}")
            return {'success': False, 'error': str(e)}
        finally:
            if source_object is not None:
                source_object.close()
    def restore_backup(self, backup_bucket: str, backup_id: str,
                       target_bucket: str, target_prefix: str = '') -> Dict[str, Any]:
        """Restore a backup."""
        try:
            # Fetch the backup metadata
            backup_metadata = self._get_backup_metadata(backup_bucket, backup_id)
            if not backup_metadata:
                return {'success': False, 'error': 'Backup metadata does not exist'}
            # Ensure the target bucket exists
            if not self.source_client.bucket_exists(target_bucket):
                self.source_client.make_bucket(target_bucket)
                self.logger.info(f"Created target bucket: {target_bucket}")
            # List the backup objects (recursive: the keys are nested)
            backup_prefix = f"{backup_metadata['backup_type']}/{backup_id}/"
            backup_objects = list(self.backup_client.list_objects(
                backup_bucket, prefix=backup_prefix, recursive=True
            ))
            if not backup_objects:
                return {'success': False, 'error': 'No backup objects found'}
            # Run the restore
            restore_result = self._execute_restore(
                backup_bucket, target_bucket, backup_objects, target_prefix, backup_metadata
            )
            self.logger.info(
                f"Restore job finished: {backup_id}, {restore_result.get('successful_count', 0)} objects restored"
            )
            return {
                'success': restore_result['success'],
                'backup_id': backup_id,
                'restore_result': restore_result
            }
        except Exception as e:
            self.logger.error(f"Restore failed: {e}")
            return {'success': False, 'error': str(e)}
    def schedule_backup(self, source_bucket: str, backup_bucket: str,
                        schedule_time: str, backup_type: str = 'incremental'):
        """Schedule a recurring backup job."""
        def backup_job():
            self.logger.info(f"Running scheduled backup: {source_bucket} -> {backup_bucket}")
            result = self.create_backup_job(source_bucket, backup_bucket, backup_type)
            if result['success']:
                self.logger.info(f"Scheduled backup succeeded: {result['backup_id']}")
            else:
                self.logger.error(f"Scheduled backup failed: {result.get('error', 'Unknown error')}")
        # Parse the schedule specification
        if schedule_time == 'daily':
            schedule.every().day.at("02:00").do(backup_job)
        elif schedule_time == 'weekly':
            schedule.every().week.do(backup_job)
        elif schedule_time == 'hourly':
            schedule.every().hour.do(backup_job)
        else:
            # Custom time of day, "HH:MM"
            schedule.every().day.at(schedule_time).do(backup_job)
        self.logger.info(f"Backup job scheduled: {schedule_time}")
    def start_scheduler(self):
        """Run the scheduler loop."""
        self.logger.info("Starting backup scheduler")
        while True:
            schedule.run_pending()
            time.sleep(60)  # poll once a minute
    def cleanup_old_backups(self, backup_bucket: str) -> Dict[str, Any]:
        """Remove expired backups."""
        try:
            cutoff_date = datetime.now() - timedelta(days=self.backup_config['retention_days'])
            deleted_count = 0
            total_size_freed = 0
            # Walk all backup metadata
            metadata_objects = list(self.backup_client.list_objects(
                backup_bucket, prefix='metadata/', recursive=True
            ))
            for metadata_obj in metadata_objects:
                try:
                    # Load the backup metadata
                    metadata_content = self.backup_client.get_object(
                        backup_bucket, metadata_obj.object_name
                    ).read()
                    backup_metadata = json.loads(metadata_content.decode('utf-8'))
                    backup_date = datetime.fromisoformat(backup_metadata['backup_timestamp'])
                    # Check whether this backup is past retention
                    if backup_date < cutoff_date:
                        # Delete the backup objects (recursive: the keys are nested)
                        backup_prefix = f"{backup_metadata['backup_type']}/{backup_metadata['backup_id']}/"
                        backup_objects = list(self.backup_client.list_objects(
                            backup_bucket, prefix=backup_prefix, recursive=True
                        ))
                        for backup_obj in backup_objects:
                            self.backup_client.remove_object(backup_bucket, backup_obj.object_name)
                            total_size_freed += backup_obj.size
                        # Delete the metadata
                        self.backup_client.remove_object(backup_bucket, metadata_obj.object_name)
                        deleted_count += 1
                        self.logger.info(f"Deleted expired backup: {backup_metadata['backup_id']}")
                except Exception as e:
                    self.logger.error(f"Failed to process backup metadata {metadata_obj.object_name}: {e}")
                    continue
            return {
                'success': True,
                'deleted_backups': deleted_count,
                'size_freed': total_size_freed,
                'cutoff_date': cutoff_date.isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to clean up old backups: {e}")
            return {'success': False, 'error': str(e)}
    # Helper methods
    def _generate_backup_id(self, source_bucket: str, backup_type: str) -> str:
        """Generate a backup ID."""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return f"{backup_type}_{source_bucket}_{timestamp}"
    def _get_source_objects(self, source_bucket: str, backup_type: str,
                            backup_timestamp: datetime) -> List[str]:
        """List the source objects that need backing up."""
        try:
            objects = list(self.source_client.list_objects(source_bucket, recursive=True))
            if backup_type == 'full':
                return [obj.object_name for obj in objects]
            elif backup_type == 'incremental':
                # Find the time of the previous backup
                last_backup_time = self._get_last_backup_time(source_bucket)
                if last_backup_time:
                    # obj.last_modified is timezone-aware (UTC), so the stored
                    # timestamp must be comparable (see the helper sketch below)
                    return [
                        obj.object_name for obj in objects
                        if obj.last_modified > last_backup_time
                    ]
                else:
                    # No previous backup: fall back to a full backup
                    return [obj.object_name for obj in objects]
            else:
                return []
        except Exception as e:
            self.logger.error(f"Failed to list source objects: {e}")
            return []
    def _save_backup_metadata(self, backup_bucket: str, backup_metadata: Dict[str, Any]):
        """Persist backup metadata."""
        try:
            metadata_key = f"metadata/{backup_metadata['backup_id']}.json"
            metadata_content = json.dumps(backup_metadata, indent=2, ensure_ascii=False)
            self.backup_client.put_object(
                backup_bucket,
                metadata_key,
                io.BytesIO(metadata_content.encode('utf-8')),
                len(metadata_content.encode('utf-8'))
            )
        except Exception as e:
            self.logger.error(f"Failed to save backup metadata: {e}")
    def _get_backup_metadata(self, backup_bucket: str, backup_id: str) -> Optional[Dict[str, Any]]:
        """Fetch backup metadata."""
        try:
            metadata_key = f"metadata/{backup_id}.json"
            metadata_content = self.backup_client.get_object(backup_bucket, metadata_key).read()
            return json.loads(metadata_content.decode('utf-8'))
        except Exception as e:
            self.logger.error(f"Failed to fetch backup metadata: {e}")
            return None
    def _verify_backup(self, backup_bucket: str, backup_object_name: str) -> bool:
        """Verify backup integrity."""
        try:
            # Simple check: the object must exist
            self.backup_client.stat_object(backup_bucket, backup_object_name)
            return True
        except Exception:
            return False
    def _update_backup_stats(self, backup_result: Dict[str, Any]):
        """Update the backup statistics."""
        self.backup_stats['total_backups'] += 1
        if backup_result['success']:
            self.backup_stats['successful_backups'] += 1
        else:
            self.backup_stats['failed_backups'] += 1
        self.backup_stats['total_size_backed_up'] += backup_result.get('total_size', 0)
        self.backup_stats['last_backup_time'] = datetime.now().isoformat()
    def get_backup_stats(self) -> Dict[str, Any]:
        """Return a copy of the backup statistics."""
        return self.backup_stats.copy()
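The class above calls two helpers that the original listing does not show: `_get_last_backup_time` and `_execute_restore`. The sketches below are one possible implementation, not the author's; they assume a single backup bucket recorded on the instance as `self.default_backup_bucket`, a name introduced here purely for illustration.
    def _get_last_backup_time(self, source_bucket: str) -> Optional[datetime]:
        """Sketch: timestamp of the most recent backup of source_bucket."""
        # self.default_backup_bucket is a hypothetical attribute; set it in __init__
        # (requires: from datetime import timezone)
        latest = None
        for obj in self.backup_client.list_objects(
                self.default_backup_bucket, prefix='metadata/', recursive=True):
            meta = json.loads(self.backup_client.get_object(
                self.default_backup_bucket, obj.object_name).read().decode('utf-8'))
            if meta.get('source_bucket') != source_bucket:
                continue
            # Stored timestamps are naive; attach UTC so they compare with
            # the timezone-aware obj.last_modified values MinIO returns
            ts = datetime.fromisoformat(meta['backup_timestamp']).replace(tzinfo=timezone.utc)
            if latest is None or ts > latest:
                latest = ts
        return latest
    def _execute_restore(self, backup_bucket: str, target_bucket: str,
                         backup_objects: list, target_prefix: str,
                         backup_metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Sketch: copy backup objects back, decompressing .gz copies."""
        prefix = f"{backup_metadata['backup_type']}/{backup_metadata['backup_id']}/"
        successful_count, failed_objects = 0, []
        for obj in backup_objects:
            try:
                data = self.backup_client.get_object(backup_bucket, obj.object_name).read()
                restored_name = obj.object_name[len(prefix):]
                if restored_name.endswith('.gz'):
                    data = gzip.decompress(data)
                    restored_name = restored_name[:-3]
                self.source_client.put_object(
                    target_bucket, target_prefix + restored_name,
                    io.BytesIO(data), len(data))
                successful_count += 1
            except Exception as e:
                failed_objects.append({'object': obj.object_name, 'error': str(e)})
        return {'success': not failed_objects, 'successful_count': successful_count,
                'failed_objects': failed_objects}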
10.3.2 Complete Application Example
# Usage example for the data backup system
def main():
    # Source and backup MinIO servers
    source_config = {
        'endpoint': 'localhost:9000',
        'access_key': 'minioadmin',
        'secret_key': 'minioadmin'
    }
    backup_config = {
        'endpoint': 'backup.example.com:9000',
        'access_key': 'backup_admin',
        'secret_key': 'backup_password'
    }
    # Create the backup system
    backup_system = DataBackupSystem(
        source_config['endpoint'], source_config['access_key'], source_config['secret_key'],
        backup_config['endpoint'], backup_config['access_key'], backup_config['secret_key']
    )
    # 1. Run a full backup
    print("Running a full backup...")
    full_backup_result = backup_system.create_backup_job(
        source_bucket='production-data',
        backup_bucket='backup-storage',
        backup_type='full'
    )
    if full_backup_result['success']:
        print(f"Full backup succeeded: {full_backup_result['backup_id']}")
        print(f"Objects backed up: {full_backup_result['backup_metadata']['objects_count']}")
        print(f"Backup size: {full_backup_result['backup_metadata']['total_size']} bytes")
    else:
        print(f"Full backup failed: {full_backup_result['error']}")
    # 2. Schedule incremental backups
    print("\nScheduling incremental backups...")
    backup_system.schedule_backup(
        source_bucket='production-data',
        backup_bucket='backup-storage',
        schedule_time='daily',  # run every day
        backup_type='incremental'
    )
    # 3. Simulate a restore
    if full_backup_result['success']:
        print("\nRestoring data...")
        restore_result = backup_system.restore_backup(
            backup_bucket='backup-storage',
            backup_id=full_backup_result['backup_id'],
            target_bucket='restored-data'
        )
        if restore_result['success']:
            print(f"Restore succeeded: {restore_result['backup_id']}")
        else:
            print(f"Restore failed: {restore_result['error']}")
    # 4. Clean up expired backups
    print("\nCleaning up expired backups...")
    cleanup_result = backup_system.cleanup_old_backups('backup-storage')
    if cleanup_result['success']:
        print(f"Cleanup finished: deleted {cleanup_result['deleted_backups']} expired backups")
        print(f"Space freed: {cleanup_result['size_freed']} bytes")
    else:
        print(f"Cleanup failed: {cleanup_result['error']}")
    # 5. Print backup statistics
    print("\nBackup statistics:")
    stats = backup_system.get_backup_stats()
    for key, value in stats.items():
        print(f"{key}: {value}")
if __name__ == '__main__':
    main()
10.4 Log Analysis System
10.4.1 Log Collection and Storage
import re
import os
import gzip
import json
import logging
import tempfile
import shutil
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from collections import Counter
import pandas as pd               # used by the chart helpers (not shown)
import matplotlib.pyplot as plt   # used by the chart helpers (not shown)
from io import BytesIO
from minio import Minio
class LogAnalysisSystem:
    """Log analysis system."""
    def __init__(self, endpoint: str, access_key: str, secret_key: str, secure: bool = False):
        self.client = Minio(endpoint, access_key, secret_key, secure=secure)
        self.logger = logging.getLogger('LogAnalysisSystem')
        # Log storage configuration
        self.log_config = {
            'bucket_name': 'log-storage',
            'retention_days': 90,
            'compression_enabled': True,
            'index_enabled': True,
            'real_time_analysis': True
        }
        # Log-line patterns
        self.log_patterns = {
            'apache': r'(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>\S+)" (?P<status>\d+) (?P<size>\S+)',
            'nginx': r'(?P<ip>\S+) - \S+ \[(?P<timestamp>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>\S+)" (?P<status>\d+) (?P<size>\d+)',
            'application': r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (?P<level>\w+) (?P<logger>\S+) - (?P<message>.*)',
            'json': None  # JSON-formatted logs
        }
        # Analysis result cache
        self.analysis_cache = {}
        # Ensure the bucket exists
        self._ensure_bucket_exists()
    def upload_log_file(self, log_file_path: str, log_type: str,
                        source_system: str, timestamp: Optional[datetime] = None) -> Dict[str, Any]:
        """Upload a log file."""
        try:
            if timestamp is None:
                timestamp = datetime.now()
            # Build the object name
            date_str = timestamp.strftime('%Y/%m/%d')
            filename = os.path.basename(log_file_path)
            object_name = f"{log_type}/{source_system}/{date_str}/{filename}"
            # Compress if enabled
            if self.log_config['compression_enabled'] and not filename.endswith('.gz'):
                object_name += '.gz'
                # Compress the file
                with open(log_file_path, 'rb') as f_in:
                    with tempfile.NamedTemporaryFile() as temp_file:
                        with gzip.open(temp_file.name, 'wb') as f_out:
                            shutil.copyfileobj(f_in, f_out)
                        temp_file.seek(0)
                        file_size = os.path.getsize(temp_file.name)
                        self.client.put_object(
                            self.log_config['bucket_name'],
                            object_name,
                            temp_file,
                            file_size
                        )
            else:
                # Upload directly
                file_size = os.path.getsize(log_file_path)
                with open(log_file_path, 'rb') as file_data:
                    self.client.put_object(
                        self.log_config['bucket_name'],
                        object_name,
                        file_data,
                        file_size
                    )
            # Build an index entry if enabled
            if self.log_config['index_enabled']:
                self._create_log_index(object_name, log_type, source_system, timestamp, file_size)
            self.logger.info(f"Log file uploaded: {object_name}")
            return {
                'success': True,
                'object_name': object_name,
                'file_size': file_size,
                'compressed': self.log_config['compression_enabled']
            }
        except Exception as e:
            self.logger.error(f"Failed to upload log file: {e}")
            return {'success': False, 'error': str(e)}
    def analyze_logs(self, log_type: str, source_system: str,
                     start_date: datetime, end_date: datetime,
                     analysis_type: str = 'summary') -> Dict[str, Any]:
        """Analyze logs."""
        try:
            # Find the matching log objects
            log_objects = self._get_log_objects(log_type, source_system, start_date, end_date)
            if not log_objects:
                return {'success': False, 'error': 'No matching log files found'}
            # Parse the logs
            parsed_logs = []
            for obj in log_objects:
                logs = self._parse_log_file(obj.object_name, log_type)
                parsed_logs.extend(logs)
            if not parsed_logs:
                return {'success': False, 'error': 'No valid log entries parsed'}
            # Run the requested analysis
            if analysis_type == 'summary':
                analysis_result = self._analyze_summary(parsed_logs)
            elif analysis_type == 'traffic':
                analysis_result = self._analyze_traffic(parsed_logs)
            elif analysis_type == 'errors':
                analysis_result = self._analyze_errors(parsed_logs)
            elif analysis_type == 'performance':
                analysis_result = self._analyze_performance(parsed_logs)
            elif analysis_type == 'security':
                analysis_result = self._analyze_security(parsed_logs)
            else:
                return {'success': False, 'error': f'Unsupported analysis type: {analysis_type}'}
            # Persist the analysis result
            result_object_name = self._save_analysis_result(
                analysis_result, log_type, source_system, analysis_type, start_date, end_date
            )
            return {
                'success': True,
                'analysis_result': analysis_result,
                'result_object_name': result_object_name,
                'logs_processed': len(parsed_logs),
                'files_processed': len(log_objects)
            }
        except Exception as e:
            self.logger.error(f"Log analysis failed: {e}")
            return {'success': False, 'error': str(e)}
    def generate_report(self, log_type: str, source_system: str,
                        start_date: datetime, end_date: datetime) -> Dict[str, Any]:
        """Generate a log report."""
        try:
            # Run every analysis type
            analyses = {}
            analysis_types = ['summary', 'traffic', 'errors', 'performance', 'security']
            for analysis_type in analysis_types:
                result = self.analyze_logs(
                    log_type, source_system, start_date, end_date, analysis_type
                )
                if result['success']:
                    analyses[analysis_type] = result['analysis_result']
            # Assemble the combined report
            report = {
                'report_id': self._generate_report_id(log_type, source_system, start_date, end_date),
                'log_type': log_type,
                'source_system': source_system,
                'period': {
                    'start_date': start_date.isoformat(),
                    'end_date': end_date.isoformat()
                },
                'generated_at': datetime.now().isoformat(),
                'analyses': analyses,
                'summary': self._generate_report_summary(analyses)
            }
            # Persist the report
            report_object_name = self._save_report(report)
            # Generate the charts
            charts = self._generate_charts(analyses)
            return {
                'success': True,
                'report': report,
                'report_object_name': report_object_name,
                'charts': charts
            }
        except Exception as e:
            self.logger.error(f"Failed to generate log report: {e}")
            return {'success': False, 'error': str(e)}
    def search_logs(self, query: str, log_type: str, source_system: str,
                    start_date: datetime, end_date: datetime,
                    limit: int = 1000) -> Dict[str, Any]:
        """Search logs."""
        try:
            # Find the matching log objects
            log_objects = self._get_log_objects(log_type, source_system, start_date, end_date)
            if not log_objects:
                return {'success': False, 'error': 'No matching log files found'}
            # Scan for matching entries
            matching_logs = []
            processed_count = 0
            for obj in log_objects:
                if len(matching_logs) >= limit:
                    break
                logs = self._search_in_log_file(obj.object_name, query, log_type)
                matching_logs.extend(logs[:limit - len(matching_logs)])
                processed_count += 1
            return {
                'success': True,
                'matching_logs': matching_logs,
                'total_matches': len(matching_logs),
                'files_searched': processed_count,
                'query': query
            }
        except Exception as e:
            self.logger.error(f"Log search failed: {e}")
            return {'success': False, 'error': str(e)}
    # Helper methods
    def _ensure_bucket_exists(self):
        """Ensure the log bucket exists."""
        try:
            if not self.client.bucket_exists(self.log_config['bucket_name']):
                self.client.make_bucket(self.log_config['bucket_name'])
                self.logger.info(f"Created log bucket: {self.log_config['bucket_name']}")
        except Exception as e:
            self.logger.error(f"Failed to create bucket: {e}")
    def _parse_log_file(self, object_name: str, log_type: str) -> List[Dict[str, Any]]:
        """Parse a log file."""
        log_object = None
        try:
            # Fetch the log object
            log_object = self.client.get_object(self.log_config['bucket_name'], object_name)
            # Handle compressed files
            if object_name.endswith('.gz'):
                content = gzip.decompress(log_object.read()).decode('utf-8')
            else:
                content = log_object.read().decode('utf-8')
            # Parse the entries
            parsed_logs = []
            pattern = self.log_patterns.get(log_type)
            for line in content.strip().split('\n'):
                if not line.strip():
                    continue
                if log_type == 'json':
                    try:
                        log_entry = json.loads(line)
                        parsed_logs.append(log_entry)
                    except json.JSONDecodeError:
                        continue
                elif pattern:
                    match = re.match(pattern, line)
                    if match:
                        log_entry = match.groupdict()
                        parsed_logs.append(log_entry)
                else:
                    # Plain-text logs
                    parsed_logs.append({'raw': line})
            return parsed_logs
        except Exception as e:
            self.logger.error(f"Failed to parse log file {object_name}: {e}")
            return []
        finally:
            if log_object is not None:
                log_object.close()
    def _analyze_summary(self, logs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Produce a summary analysis."""
        total_logs = len(logs)
        # Tally status codes, methods, and client IPs where present
        status_codes = Counter()
        methods = Counter()
        ips = Counter()
        for log in logs:
            if 'status' in log:
                status_codes[log['status']] += 1
            if 'method' in log:
                methods[log['method']] += 1
            if 'ip' in log:
                ips[log['ip']] += 1
        return {
            'total_logs': total_logs,
            'top_status_codes': dict(status_codes.most_common(10)),
            'top_methods': dict(methods.most_common(10)),
            'top_ips': dict(ips.most_common(10)),
            'unique_ips': len(ips)
        }
    def _generate_report_id(self, log_type: str, source_system: str,
                            start_date: datetime, end_date: datetime) -> str:
        """Generate a report ID."""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return f"report_{log_type}_{source_system}_{timestamp}"
10.5 Summary
These hands-on MinIO cases illustrate the central role object storage plays in modern application architectures:
10.5.1 Core Application Scenarios
Enterprise file management
- Unified file storage and management
- Version control and history tracking
- Permission management and security controls
- Automated file lifecycle management
Image processing services
- Smart image upload and storage
- Automatic thumbnail generation
- Conversion to and optimization of multiple formats
- A rich set of filters and effects
Data backup systems
- Automated backup scheduling
- Incremental and full backup strategies
- Data compression and encryption
- Backup verification and restore
Log analysis systems
- Large-scale log collection and storage
- Real-time log analysis and search
- Multi-dimensional statistics and reporting
- Visual chart generation
10.5.2 Technical Characteristics
High scalability
- Petabyte-scale data storage
- Horizontal scaling
- Distributed architecture
High performance
- Concurrent processing
- Multipart upload optimization
- Smart caching strategies
High reliability
- Data redundancy and backups
- Automatic failure recovery
- Integrity verification
Easy integration
- Standard S3-compatible API
- Rich SDK support
- Flexible deployment options
10.5.3 Best Practices
Bucket design
- Consistent naming conventions
- An appropriate prefix (partitioning) strategy
- Lifecycle management (see the sketch below)
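As a concrete example of lifecycle management, the sketch below expires objects under a `logs/` prefix after 90 days using the minio-py lifecycle API; the endpoint, credentials, bucket name, and prefix are placeholders:
from minio import Minio
from minio.commonconfig import ENABLED, Filter
from minio.lifecycleconfig import Expiration, LifecycleConfig, Rule

client = Minio("localhost:9000", "minioadmin", "minioadmin", secure=False)
# Expire everything under logs/ after 90 days
config = LifecycleConfig([
    Rule(
        ENABLED,
        rule_filter=Filter(prefix="logs/"),
        rule_id="expire-logs",
        expiration=Expiration(days=90),
    ),
])
client.set_bucket_lifecycle("log-storage", config)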
Performance optimization
- Concurrent uploads and downloads (see the multipart sketch below)
- Data compression
- Caching strategies
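For large objects, minio-py can stream data as a multipart upload when the total length is unknown; a minimal sketch, with placeholder bucket, object, and file names:
from minio import Minio

client = Minio("localhost:9000", "minioadmin", "minioadmin", secure=False)
# length=-1 streams the data as a multipart upload in 10 MiB parts
with open("/path/to/large-file.bin", "rb") as stream:
    client.put_object(
        "enterprise-files", "archives/large-file.bin",
        stream, length=-1, part_size=10 * 1024 * 1024,
    )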
Security management
- Access-control policies (see the bucket-policy sketch below)
- Data encryption
- Audit logging
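A common access-control pattern is a read-only anonymous policy on a public assets bucket; a minimal sketch using `set_bucket_policy`, where the bucket name is a placeholder:
import json
from minio import Minio

client = Minio("localhost:9000", "minioadmin", "minioadmin", secure=False)
# Allow anonymous read access to objects in the public-assets bucket
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"AWS": ["*"]},
        "Action": ["s3:GetObject"],
        "Resource": ["arn:aws:s3:::public-assets/*"],
    }],
}
client.set_bucket_policy("public-assets", json.dumps(policy))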
Operations and monitoring
- System monitoring (MinIO exposes Prometheus-compatible metrics; see below)
- Performance analysis
- Alerting
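MinIO serves Prometheus-compatible metrics, and `mc admin prometheus generate <alias>` emits a ready-made scrape configuration. The sketch below reads the cluster metrics endpoint directly, assuming the server runs with MINIO_PROMETHEUS_AUTH_TYPE=public (otherwise a bearer token is required); the endpoint is a placeholder:
import requests

# Assumes MINIO_PROMETHEUS_AUTH_TYPE=public on the server
resp = requests.get("http://localhost:9000/minio/v2/metrics/cluster", timeout=5)
for line in resp.text.splitlines():
    if line.startswith("minio_cluster_capacity"):  # sample the capacity metrics
        print(line)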
These hands-on cases show MinIO's capability and flexibility in enterprise applications. Whether the job is file management, image processing, data backup, or log analysis, MinIO provides a stable and efficient object storage foundation.
Coming up: the next chapter digs into MinIO's advanced features and enterprise deployment practices, including multi-tenancy, data governance, and compliance requirements.