3.1 对象基本操作
3.1.1 对象上传
”`python from minio import Minio from minio.error import S3Error from datetime import datetime, timedelta import io import os import json import hashlib import mimetypes from typing import Dict, List, Any, Optional, Union from pathlib import Path
class ObjectManager:
    """Upload local files and in-memory payloads to a MinIO bucket.

    Every public method returns a plain ``dict`` whose ``'success'`` key
    reports the outcome. An ``S3Error`` raised by the SDK is converted into a
    failure dict instead of propagating to the caller.
    """

    # Read size used while hashing a file: large enough to keep the number of
    # reads low, small enough that big files are never held in memory.
    _HASH_CHUNK_SIZE = 1024 * 1024

    def __init__(self, client: "Minio"):
        """Store the MinIO client used for all uploads."""
        self.client = client

    @staticmethod
    def _to_sdk_tags(tags):
        """Convert a plain ``dict`` of tags into the SDK's ``Tags`` type.

        Returns ``None`` when no tags were supplied. The SDK rejects plain
        dicts, so the conversion has to happen before the upload call.
        """
        if not tags:
            return None
        # Imported lazily, like the other SDK helpers in this file.
        from minio.commonconfig import Tags
        if isinstance(tags, Tags):
            return tags
        sdk_tags = Tags.new_object_tags()
        # Assign item by item: dict.update() would bypass Tags' validation.
        for key, value in tags.items():
            sdk_tags[key] = value
        return sdk_tags

    @staticmethod
    def _ascii_safe(value: str) -> str:
        """Percent-encode ``value`` when it contains non-ASCII characters.

        Metadata travels in HTTP headers and the SDK only accepts US-ASCII
        values, so a non-ASCII file name must be encoded before it is stored.
        """
        try:
            value.encode('ascii')
        except UnicodeEncodeError:
            from urllib.parse import quote
            return quote(value)
        return value

    def upload_file(self, bucket_name: str, object_name: str, file_path: str,
                    content_type: str = None, metadata: Dict[str, str] = None,
                    tags: Dict[str, str] = None, progress_callback=None) -> Dict[str, Any]:
        """Upload a local file and describe the result.

        Args:
            bucket_name: Target bucket.
            object_name: Key of the object to create.
            file_path: Path of the local file to upload.
            content_type: MIME type; guessed from the file name when omitted,
                falling back to ``application/octet-stream``.
            metadata: Extra user metadata. The dict is copied, never modified.
            tags: Object tags as a plain dict.
            progress_callback: Progress object handed to the SDK unchanged.

        Returns:
            A result dict. On success it carries the etag, version id, size,
            timing and the metadata that was actually sent.
        """
        try:
            # A directory passes os.path.exists() but can be neither hashed
            # nor uploaded, so require a regular file.
            if not os.path.isfile(file_path):
                return {
                    'success': False,
                    'error': f"文件不存在: {file_path}",
                    'file_path': file_path
                }

            file_size = os.path.getsize(file_path)

            if not content_type:
                content_type, _ = mimetypes.guess_type(file_path)
                if not content_type:
                    content_type = 'application/octet-stream'

            file_hash = self._calculate_file_hash(file_path)

            # Work on a copy so the caller's dict is left untouched.
            metadata = dict(metadata or {})
            metadata.update({
                'original-filename': self._ascii_safe(os.path.basename(file_path)),
                'upload-timestamp': datetime.now().isoformat(),
                'file-size': str(file_size),
                'file-hash': file_hash
            })

            start_time = datetime.now()
            result = self.client.fput_object(
                bucket_name=bucket_name,
                object_name=object_name,
                file_path=file_path,
                content_type=content_type,
                metadata=metadata,
                tags=self._to_sdk_tags(tags),
                progress=progress_callback
            )
            upload_time = (datetime.now() - start_time).total_seconds()

            return {
                'success': True,
                'message': f"文件上传成功: {file_path} -> {bucket_name}/{object_name}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'file_path': file_path,
                'file_size': file_size,
                'content_type': content_type,
                'etag': result.etag,
                'version_id': result.version_id,
                'upload_time_seconds': upload_time,
                # Despite the key name, the value is megabytes per second.
                'upload_speed_mbps': round((file_size / (1024 * 1024)) / upload_time, 2) if upload_time > 0 else 0,
                'metadata': metadata,
                'tags': tags,
                'uploaded_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"上传文件失败: {e}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'file_path': file_path
            }

    def upload_data(self, bucket_name: str, object_name: str, data: Union[bytes, str],
                    content_type: str = 'application/octet-stream',
                    metadata: Dict[str, str] = None,
                    tags: Dict[str, str] = None) -> Dict[str, Any]:
        """Upload an in-memory payload and describe the result.

        Args:
            bucket_name: Target bucket.
            object_name: Key of the object to create.
            data: Payload. A ``str`` is encoded as UTF-8; when the content
                type was left at its default it becomes
                ``text/plain; charset=utf-8``.
            content_type: MIME type of the payload.
            metadata: Extra user metadata. The dict is copied, never modified.
            tags: Object tags as a plain dict.

        Returns:
            A result dict. On success it carries the etag, version id, size
            and the metadata that was actually sent.
        """
        try:
            if isinstance(data, str):
                data = data.encode('utf-8')
                if content_type == 'application/octet-stream':
                    content_type = 'text/plain; charset=utf-8'

            data_size = len(data)
            data_stream = io.BytesIO(data)
            data_hash = hashlib.md5(data).hexdigest()

            # Work on a copy so the caller's dict is left untouched.
            metadata = dict(metadata or {})
            metadata.update({
                'upload-timestamp': datetime.now().isoformat(),
                'data-size': str(data_size),
                'data-hash': data_hash,
                'upload-type': 'data'
            })

            start_time = datetime.now()
            result = self.client.put_object(
                bucket_name=bucket_name,
                object_name=object_name,
                data=data_stream,
                length=data_size,
                content_type=content_type,
                metadata=metadata,
                tags=self._to_sdk_tags(tags)
            )
            upload_time = (datetime.now() - start_time).total_seconds()

            return {
                'success': True,
                'message': f"数据上传成功: {data_size} 字节 -> {bucket_name}/{object_name}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'data_size': data_size,
                'content_type': content_type,
                'etag': result.etag,
                'version_id': result.version_id,
                'upload_time_seconds': upload_time,
                'metadata': metadata,
                'tags': tags,
                'uploaded_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"上传数据失败: {e}",
                'bucket_name': bucket_name,
                'object_name': object_name
            }

    def _calculate_file_hash(self, file_path: str) -> str:
        """Return the hexadecimal MD5 digest of the file at ``file_path``.

        The file is read in chunks so its size does not affect memory use.
        """
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(self._HASH_CHUNK_SIZE), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
3.1.2 对象下载
class ObjectDownloader:
    """Download MinIO objects to disk or into memory.

    Every public method returns a plain ``dict`` whose ``'success'`` key
    reports the outcome. An ``S3Error`` raised by the SDK is converted into a
    failure dict instead of propagating to the caller.
    """

    def __init__(self, client: "Minio"):
        """Store the MinIO client used for all downloads."""
        self.client = client

    def download_file(self, bucket_name: str, object_name: str, file_path: str,
                      version_id: str = None, progress_callback=None) -> Dict[str, Any]:
        """Download an object to ``file_path`` and report what was written.

        Args:
            bucket_name: Source bucket.
            object_name: Key of the object to download.
            file_path: Local destination. Missing parent directories are
                created.
            version_id: Specific object version, or ``None`` for the latest.
            progress_callback: Progress object handed to the SDK unchanged.

        Returns:
            A result dict. On success it includes the size on disk, the size
            reported by the server and whether the two match.
        """
        try:
            # os.path.dirname() is '' for a bare file name, and
            # os.makedirs('') raises, so only create a directory if one is named.
            target_dir = os.path.dirname(file_path)
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)

            stat = self.client.stat_object(bucket_name, object_name, version_id=version_id)

            start_time = datetime.now()
            self.client.fget_object(
                bucket_name=bucket_name,
                object_name=object_name,
                file_path=file_path,
                version_id=version_id,
                progress=progress_callback
            )
            download_time = (datetime.now() - start_time).total_seconds()

            # Compare the bytes on disk with what the server reported.
            downloaded_size = os.path.getsize(file_path)

            return {
                'success': True,
                'message': f"文件下载成功: {bucket_name}/{object_name} -> {file_path}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'file_path': file_path,
                'file_size': downloaded_size,
                'expected_size': stat.size,
                'size_match': downloaded_size == stat.size,
                'content_type': stat.content_type,
                'etag': stat.etag,
                'version_id': version_id,
                'download_time_seconds': download_time,
                # Despite the key name, the value is megabytes per second.
                'download_speed_mbps': round((downloaded_size / (1024 * 1024)) / download_time, 2) if download_time > 0 else 0,
                'last_modified': stat.last_modified.isoformat() if stat.last_modified else None,
                'metadata': stat.metadata,
                'downloaded_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"下载文件失败: {e}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'file_path': file_path
            }

    def download_data(self, bucket_name: str, object_name: str,
                      version_id: str = None, offset: int = 0,
                      length: int = None) -> Dict[str, Any]:
        """Read an object, or a byte range of it, into memory.

        Args:
            bucket_name: Source bucket.
            object_name: Key of the object to read.
            version_id: Specific object version, or ``None`` for the latest.
            offset: First byte to read.
            length: Number of bytes to read; passed to the SDK unchanged.

        Returns:
            A result dict whose ``'data'`` entry holds the bytes read.
        """
        try:
            stat = self.client.stat_object(bucket_name, object_name, version_id=version_id)

            start_time = datetime.now()
            response = self.client.get_object(
                bucket_name=bucket_name,
                object_name=object_name,
                version_id=version_id,
                offset=offset,
                length=length
            )
            try:
                data = response.read()
            finally:
                # Always hand the connection back to the pool, even when the
                # read fails part-way through.
                response.close()
                response.release_conn()
            download_time = (datetime.now() - start_time).total_seconds()

            return {
                'success': True,
                'message': f"数据下载成功: {bucket_name}/{object_name}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'data': data,
                'data_size': len(data),
                'expected_size': stat.size,
                'content_type': stat.content_type,
                'etag': stat.etag,
                'version_id': version_id,
                'offset': offset,
                'length': length,
                'download_time_seconds': download_time,
                'last_modified': stat.last_modified.isoformat() if stat.last_modified else None,
                'metadata': stat.metadata,
                'downloaded_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"下载数据失败: {e}",
                'bucket_name': bucket_name,
                'object_name': object_name
            }

    def download_partial(self, bucket_name: str, object_name: str,
                         start_byte: int, end_byte: int,
                         version_id: str = None) -> Dict[str, Any]:
        """Download the inclusive byte range ``start_byte``..``end_byte``.

        Returns:
            The result of :meth:`download_data`, extended on success with
            ``'start_byte'``, ``'end_byte'`` and an HTTP-style ``'range'``
            string. An inverted or negative range yields a failure dict
            without contacting the server.
        """
        try:
            # An inverted range would produce a zero or negative length,
            # which the server would not interpret as the caller intended.
            if start_byte < 0 or end_byte < start_byte:
                return {
                    'success': False,
                    'error': f"部分下载失败: 无效的字节范围 {start_byte}-{end_byte}",
                    'bucket_name': bucket_name,
                    'object_name': object_name,
                    'start_byte': start_byte,
                    'end_byte': end_byte
                }

            # Both ends are inclusive, hence the +1.
            length = end_byte - start_byte + 1

            result = self.download_data(
                bucket_name=bucket_name,
                object_name=object_name,
                version_id=version_id,
                offset=start_byte,
                length=length
            )

            if result['success']:
                result['start_byte'] = start_byte
                result['end_byte'] = end_byte
                result['range'] = f"bytes={start_byte}-{end_byte}"

            return result
        except Exception as e:
            return {
                'success': False,
                'error': f"部分下载失败: {e}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'start_byte': start_byte,
                'end_byte': end_byte
            }
3.2 对象元数据管理
3.2.1 元数据操作
class ObjectMetadataManager:
    """Read and modify the metadata and tags of MinIO objects.

    Every public method returns a plain ``dict`` whose ``'success'`` key
    reports the outcome. An ``S3Error`` raised by the SDK is converted into a
    failure dict instead of propagating to the caller.
    """

    # Header prefix that marks user-defined metadata.
    _USER_META_PREFIX = 'x-amz-meta-'
    # Standard headers a client may set together with user metadata.
    _STANDARD_HEADERS = ('cache-control', 'content-disposition',
                         'content-encoding', 'content-language', 'content-type')

    def __init__(self, client: "Minio"):
        """Store the MinIO client used for all metadata calls."""
        self.client = client

    @classmethod
    def _normalize_key(cls, key) -> str:
        """Lower-case a metadata key and drop the user-metadata prefix."""
        lowered = str(key).lower()
        if lowered.startswith(cls._USER_META_PREFIX):
            return lowered[len(cls._USER_META_PREFIX):]
        return lowered

    @classmethod
    def _writable_metadata(cls, headers) -> Dict[str, Any]:
        """Reduce the headers of a stat call to those that may be written back.

        User metadata is kept with normalised keys and the standard content
        headers are kept as they are. Everything else (ETag, Date, Server,
        Content-Length, ...) describes the response, not the object, and is
        dropped so that it is not stored as user metadata.
        """
        writable = {}
        for key, value in (headers or {}).items():
            lowered = str(key).lower()
            if lowered.startswith(cls._USER_META_PREFIX):
                writable[cls._normalize_key(lowered)] = value
            elif lowered in cls._STANDARD_HEADERS:
                writable[lowered] = value
        return writable

    def get_object_info(self, bucket_name: str, object_name: str,
                        version_id: str = None) -> Dict[str, Any]:
        """Return size, timestamps, metadata and tags of an object.

        Tags are looked up on a best-effort basis: if that lookup fails the
        result still succeeds and ``'tags'`` is an empty dict.
        """
        try:
            stat = self.client.stat_object(bucket_name, object_name, version_id=version_id)

            try:
                tags = self.client.get_object_tags(bucket_name, object_name, version_id=version_id)
                # The SDK returns a dict-like Tags object, or None when the
                # object has no tags.
                tags_dict = dict(tags) if tags else {}
            except Exception:
                # Tags are optional detail; failing to read them must not
                # fail the whole lookup.
                tags_dict = {}

            return {
                'success': True,
                'bucket_name': bucket_name,
                'object_name': object_name,
                'version_id': version_id,
                'size': stat.size,
                'size_mb': round(stat.size / (1024 * 1024), 2) if stat.size else 0,
                'etag': stat.etag,
                'content_type': stat.content_type,
                'last_modified': stat.last_modified.isoformat() if stat.last_modified else None,
                # Copied into a plain dict so callers can change or serialise it.
                'metadata': dict(stat.metadata or {}),
                'tags': tags_dict,
                'storage_class': getattr(stat, 'storage_class', None),
                'expires': getattr(stat, 'expires', None),
                'cache_control': getattr(stat, 'cache_control', None),
                'content_disposition': getattr(stat, 'content_disposition', None),
                'content_encoding': getattr(stat, 'content_encoding', None),
                'content_language': getattr(stat, 'content_language', None),
                'retrieved_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"获取对象信息失败: {e}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'version_id': version_id
            }

    def update_metadata(self, bucket_name: str, object_name: str,
                        metadata: Dict[str, str], version_id: str = None) -> Dict[str, Any]:
        """Merge ``metadata`` into the object's existing user metadata.

        Object metadata cannot be edited in place, so the object is copied
        onto itself with the ``REPLACE`` metadata directive.

        Returns:
            A result dict with the headers seen before the change
            (``'old_metadata'``) and the metadata that was written
            (``'new_metadata'``).
        """
        try:
            current_info = self.get_object_info(bucket_name, object_name, version_id)
            if not current_info['success']:
                return current_info

            # Start from what may legitimately be written back, then let the
            # caller's values override it. Keys are normalised so that
            # 'Description' replaces an existing 'x-amz-meta-description'.
            new_metadata = self._writable_metadata(current_info['metadata'])
            new_metadata.update(
                {self._normalize_key(key): value for key, value in metadata.items()}
            )
            new_metadata['metadata-updated'] = datetime.now().isoformat()

            from minio.commonconfig import CopySource

            copy_source = CopySource(
                bucket_name=bucket_name,
                object_name=object_name,
                version_id=version_id
            )

            result = self.client.copy_object(
                bucket_name=bucket_name,
                object_name=object_name,
                source=copy_source,
                metadata=new_metadata,
                metadata_directive="REPLACE"
            )

            return {
                'success': True,
                'message': f"对象元数据更新成功: {bucket_name}/{object_name}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'version_id': version_id,
                'new_version_id': result.version_id,
                'etag': result.etag,
                'old_metadata': current_info['metadata'],
                'new_metadata': new_metadata,
                'updated_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"更新元数据失败: {e}",
                'bucket_name': bucket_name,
                'object_name': object_name
            }

    def set_object_tags(self, bucket_name: str, object_name: str,
                        tags: Dict[str, str], version_id: str = None) -> Dict[str, Any]:
        """Replace the tags of an object with ``tags``."""
        try:
            # The SDK expects its own Tags container, not a plain dict.
            from minio.commonconfig import Tags

            tagging = Tags.new_object_tags()
            # Assign item by item: dict.update() would bypass Tags' validation.
            for key, value in tags.items():
                tagging[key] = value

            self.client.set_object_tags(
                bucket_name=bucket_name,
                object_name=object_name,
                tags=tagging,
                version_id=version_id
            )

            return {
                'success': True,
                'message': f"对象标签设置成功: {bucket_name}/{object_name}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'version_id': version_id,
                'tags': tags,
                'tag_count': len(tags),
                'set_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"设置标签失败: {e}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'tags': tags
            }

    def delete_object_tags(self, bucket_name: str, object_name: str,
                           version_id: str = None) -> Dict[str, Any]:
        """Remove all tags from an object and report which ones were removed."""
        try:
            # Read the tags first so the result can list what was deleted.
            current_info = self.get_object_info(bucket_name, object_name, version_id)
            current_tags = current_info.get('tags', {}) if current_info['success'] else {}

            self.client.delete_object_tags(
                bucket_name=bucket_name,
                object_name=object_name,
                version_id=version_id
            )

            return {
                'success': True,
                'message': f"对象标签删除成功: {bucket_name}/{object_name}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'version_id': version_id,
                'deleted_tags': current_tags,
                'deleted_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"删除标签失败: {e}",
                'bucket_name': bucket_name,
                'object_name': object_name
            }
3.3 对象列表和搜索
3.3.1 对象列表操作
class ObjectLister:
    """List, search and filter the objects of a MinIO bucket.

    Every public method returns a plain ``dict`` whose ``'success'`` key
    reports the outcome; failures are described in ``'error'``.
    """

    def __init__(self, client: "Minio"):
        """Store the MinIO client used for all listings."""
        self.client = client

    @staticmethod
    def _as_aware(moment):
        """Return ``moment`` as a timezone-aware datetime.

        Naive values are interpreted as local time, which is how
        ``datetime.astimezone()`` treats them. ``None`` is passed through.
        """
        if moment is None or moment.tzinfo is not None:
            return moment
        return moment.astimezone()

    def list_objects(self, bucket_name: str, prefix: str = None,
                     recursive: bool = True, include_version: bool = False,
                     max_keys: int = 1000, start_after: str = None) -> Dict[str, Any]:
        """List objects of a bucket as plain dicts.

        Args:
            bucket_name: Bucket to list.
            prefix: Only return keys starting with this prefix.
            recursive: Descend into "sub-directories".
            include_version: Add version information to every entry.
            max_keys: Stop after this many entries have been collected.
            start_after: Start listing after this key.

        Returns:
            A result dict with the entries in ``'objects'`` and their
            combined size in ``'total_size'``.
        """
        try:
            objects = self.client.list_objects(
                bucket_name=bucket_name,
                prefix=prefix,
                recursive=recursive,
                include_version=include_version,
                start_after=start_after
            )

            object_list = []
            total_size = 0

            for obj in objects:
                object_info = {
                    'object_name': obj.object_name,
                    'size': obj.size,
                    'size_mb': round(obj.size / (1024 * 1024), 2) if obj.size else 0,
                    'etag': obj.etag,
                    'last_modified': obj.last_modified.isoformat() if obj.last_modified else None,
                    'content_type': getattr(obj, 'content_type', None),
                    'storage_class': getattr(obj, 'storage_class', None)
                }

                if include_version:
                    object_info.update({
                        'version_id': getattr(obj, 'version_id', None),
                        'is_latest': getattr(obj, 'is_latest', True),
                        'is_delete_marker': getattr(obj, 'is_delete_marker', False)
                    })

                object_list.append(object_info)
                if obj.size:
                    total_size += obj.size

                # The listing is a lazy iterator; stop pulling once the cap
                # is reached.
                if len(object_list) >= max_keys:
                    break

            return {
                'success': True,
                'bucket_name': bucket_name,
                'prefix': prefix,
                'recursive': recursive,
                'include_version': include_version,
                'total_objects': len(object_list),
                'total_size': total_size,
                'total_size_mb': round(total_size / (1024 * 1024), 2),
                'objects': object_list,
                'retrieved_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"列出对象失败: {e}",
                'bucket_name': bucket_name,
                'prefix': prefix
            }

    def search_objects(self, bucket_name: str, search_term: str,
                       search_in: List[str] = None, case_sensitive: bool = False,
                       max_results: int = 100) -> Dict[str, Any]:
        """Find objects whose name, content type or metadata contains a term.

        Args:
            bucket_name: Bucket to search (at most 10000 objects are listed).
            search_term: Substring to look for.
            search_in: Any of ``'object_name'``, ``'content_type'`` and
                ``'metadata'``; all three when omitted.
            case_sensitive: Compare without lower-casing.
            max_results: Stop after this many matches.

        Returns:
            A result dict with the matches in ``'matched_objects'``; each
            match carries a ``'match_details'`` list naming where it matched.
        """
        try:
            if not search_in:
                search_in = ['object_name', 'content_type', 'metadata']

            all_objects_result = self.list_objects(bucket_name, max_keys=10000)
            if not all_objects_result['success']:
                return all_objects_result

            matched_objects = []
            examined = 0
            search_term_processed = search_term if case_sensitive else search_term.lower()

            for obj in all_objects_result['objects']:
                examined += 1
                match_found = False
                match_details = []

                if 'object_name' in search_in:
                    obj_name = obj['object_name'] if case_sensitive else obj['object_name'].lower()
                    if search_term_processed in obj_name:
                        match_found = True
                        match_details.append('object_name')

                if 'content_type' in search_in and obj.get('content_type'):
                    content_type = obj['content_type'] if case_sensitive else obj['content_type'].lower()
                    if search_term_processed in content_type:
                        match_found = True
                        match_details.append('content_type')

                # Metadata needs one extra request per object, so it is only
                # fetched when nothing cheaper has matched yet.
                if 'metadata' in search_in and not match_found:
                    try:
                        metadata_result = ObjectMetadataManager(self.client).get_object_info(
                            bucket_name, obj['object_name']
                        )
                        if metadata_result['success']:
                            metadata = metadata_result['metadata']
                            for key, value in metadata.items():
                                search_text = f"{key}:{value}" if case_sensitive else f"{key}:{value}".lower()
                                if search_term_processed in search_text:
                                    match_found = True
                                    match_details.append(f'metadata.{key}')
                                    break
                    except Exception:
                        # One unreadable object must not abort the search;
                        # treat it as "no metadata match".
                        pass

                if match_found:
                    obj['match_details'] = match_details
                    matched_objects.append(obj)

                    if len(matched_objects) >= max_results:
                        break

            return {
                'success': True,
                'bucket_name': bucket_name,
                'search_term': search_term,
                'search_in': search_in,
                'case_sensitive': case_sensitive,
                # Number of objects actually inspected, which is lower than
                # the listing size when the search stopped at max_results.
                'total_searched': examined,
                'total_matched': len(matched_objects),
                'matched_objects': matched_objects,
                'searched_at': datetime.now().isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"搜索对象失败: {e}",
                'bucket_name': bucket_name,
                'search_term': search_term
            }

    def get_objects_by_size(self, bucket_name: str, min_size: int = None,
                            max_size: int = None, prefix: str = None) -> Dict[str, Any]:
        """Return objects whose size lies within ``min_size``..``max_size``.

        Both bounds are inclusive and optional. The result is sorted from
        largest to smallest; at most 10000 objects are listed.
        """
        try:
            all_objects_result = self.list_objects(bucket_name, prefix=prefix, max_keys=10000)
            if not all_objects_result['success']:
                return all_objects_result

            filtered_objects = [
                obj for obj in all_objects_result['objects']
                if (min_size is None or (obj['size'] or 0) >= min_size)
                and (max_size is None or (obj['size'] or 0) <= max_size)
            ]
            filtered_objects.sort(key=lambda x: x['size'] or 0, reverse=True)

            return {
                'success': True,
                'bucket_name': bucket_name,
                'prefix': prefix,
                'min_size': min_size,
                'max_size': max_size,
                'total_objects': len(filtered_objects),
                'total_size': sum(obj['size'] or 0 for obj in filtered_objects),
                'objects': filtered_objects,
                'filtered_at': datetime.now().isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"按大小筛选对象失败: {e}",
                'bucket_name': bucket_name
            }

    def get_objects_by_date(self, bucket_name: str, start_date: datetime = None,
                            end_date: datetime = None, prefix: str = None) -> Dict[str, Any]:
        """Return objects last modified within ``start_date``..``end_date``.

        Both bounds are inclusive and optional. Naive datetimes are accepted
        and interpreted as local time. The result is sorted from newest to
        oldest; at most 10000 objects are listed.
        """
        try:
            all_objects_result = self.list_objects(bucket_name, prefix=prefix, max_keys=10000)
            if not all_objects_result['success']:
                return all_objects_result

            # Object timestamps are timezone-aware, and comparing them with a
            # naive bound raises TypeError, so make both sides aware first.
            lower = self._as_aware(start_date)
            upper = self._as_aware(end_date)

            dated_objects = []
            for obj in all_objects_result['objects']:
                if not obj['last_modified']:
                    continue

                obj_date = self._as_aware(
                    datetime.fromisoformat(obj['last_modified'].replace('Z', '+00:00'))
                )

                if lower and obj_date < lower:
                    continue
                if upper and obj_date > upper:
                    continue

                dated_objects.append((obj_date, obj))

            # Sort on the parsed timestamp, not its string form, so entries
            # with different UTC offsets are still ordered correctly.
            dated_objects.sort(key=lambda pair: pair[0], reverse=True)
            filtered_objects = [obj for _, obj in dated_objects]

            return {
                'success': True,
                'bucket_name': bucket_name,
                'prefix': prefix,
                'start_date': start_date.isoformat() if start_date else None,
                'end_date': end_date.isoformat() if end_date else None,
                'total_objects': len(filtered_objects),
                'objects': filtered_objects,
                'filtered_at': datetime.now().isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': f"按日期筛选对象失败: {e}",
                'bucket_name': bucket_name
            }
3.4 对象复制和移动
3.4.1 对象复制操作
”`python from minio.commonconfig import CopySource
class ObjectCopyManager:
    """Copy, move and batch-copy objects between MinIO buckets.

    Every public method returns a plain ``dict`` describing the outcome;
    failures are reported through ``'success'`` and ``'error'``.
    """

    # Header prefix that marks user-defined metadata.
    _USER_META_PREFIX = 'x-amz-meta-'
    # Standard headers a client may set together with user metadata.
    _STANDARD_HEADERS = ('cache-control', 'content-disposition',
                         'content-encoding', 'content-language', 'content-type')

    def __init__(self, client: "Minio"):
        """Store the MinIO client used for all copy operations."""
        self.client = client

    @classmethod
    def _normalize_key(cls, key) -> str:
        """Lower-case a metadata key and drop the user-metadata prefix."""
        lowered = str(key).lower()
        if lowered.startswith(cls._USER_META_PREFIX):
            return lowered[len(cls._USER_META_PREFIX):]
        return lowered

    @classmethod
    def _writable_metadata(cls, headers) -> Dict[str, Any]:
        """Reduce the headers of a stat call to those that may be copied.

        User metadata is kept with normalised keys and the standard content
        headers are kept as they are. Response-only headers (ETag, Date,
        Server, Content-Length, ...) are dropped so they are not stored as
        user metadata on the copy.
        """
        writable = {}
        for key, value in (headers or {}).items():
            lowered = str(key).lower()
            if lowered.startswith(cls._USER_META_PREFIX):
                writable[cls._normalize_key(lowered)] = value
            elif lowered in cls._STANDARD_HEADERS:
                writable[lowered] = value
        return writable

    @staticmethod
    def _to_sdk_tags(tags):
        """Convert a plain ``dict`` of tags into the SDK's ``Tags`` type.

        Returns ``None`` when no tags were supplied.
        """
        if not tags:
            return None
        from minio.commonconfig import Tags
        if isinstance(tags, Tags):
            return tags
        sdk_tags = Tags.new_object_tags()
        # Assign item by item: dict.update() would bypass Tags' validation.
        for key, value in tags.items():
            sdk_tags[key] = value
        return sdk_tags

    def copy_object(self, source_bucket: str, source_object: str,
                    dest_bucket: str, dest_object: str,
                    source_version_id: str = None,
                    metadata: Dict[str, str] = None,
                    tags: Dict[str, str] = None,
                    preserve_metadata: bool = True) -> Dict[str, Any]:
        """Copy one object, optionally carrying its metadata along.

        Args:
            source_bucket: Bucket holding the source object.
            source_object: Key of the source object.
            dest_bucket: Bucket receiving the copy.
            dest_object: Key of the copy.
            source_version_id: Specific source version, or ``None``.
            metadata: Metadata for the copy; overrides source values.
            tags: Tags for the copy, as a plain dict.
            preserve_metadata: Start from the source's user metadata and add
                ``copied-from`` / ``copied-at`` markers.

        Returns:
            A result dict with the etag and version id of the copy.
        """
        try:
            copy_source = CopySource(
                bucket_name=source_bucket,
                object_name=source_object,
                version_id=source_version_id
            )

            source_info = ObjectMetadataManager(self.client).get_object_info(
                source_bucket, source_object, source_version_id
            )

            if preserve_metadata and source_info['success']:
                copy_metadata = self._writable_metadata(source_info['metadata'])
                if metadata:
                    copy_metadata.update(
                        {self._normalize_key(key): value for key, value in metadata.items()}
                    )
                copy_metadata['copied-from'] = f"{source_bucket}/{source_object}"
                copy_metadata['copied-at'] = datetime.now().isoformat()
            else:
                # Copy so the caller's dict is never shared with the result.
                copy_metadata = dict(metadata or {})

            copy_args = {
                'bucket_name': dest_bucket,
                'object_name': dest_object,
                'source': copy_source,
                'metadata': copy_metadata,
                'metadata_directive': "REPLACE" if copy_metadata else "COPY",
            }
            sdk_tags = self._to_sdk_tags(tags)
            if sdk_tags is not None:
                # Without the REPLACE directive the server keeps the source's
                # tags and ignores the ones sent with the request.
                copy_args['tags'] = sdk_tags
                copy_args['tagging_directive'] = "REPLACE"

            start_time = datetime.now()
            result = self.client.copy_object(**copy_args)
            copy_time = (datetime.now() - start_time).total_seconds()

            return {
                'success': True,
                'message': f"对象复制成功: {source_bucket}/{source_object} -> {dest_bucket}/{dest_object}",
                'source_bucket': source_bucket,
                'source_object': source_object,
                'source_version_id': source_version_id,
                'dest_bucket': dest_bucket,
                'dest_object': dest_object,
                'dest_version_id': result.version_id,
                'etag': result.etag,
                'copy_time_seconds': copy_time,
                'preserve_metadata': preserve_metadata,
                'metadata': copy_metadata,
                'tags': tags,
                'source_size': source_info.get('size', 0) if source_info['success'] else 0,
                'copied_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"复制对象失败: {e}",
                'source_bucket': source_bucket,
                'source_object': source_object,
                'dest_bucket': dest_bucket,
                'dest_object': dest_object
            }

    def move_object(self, source_bucket: str, source_object: str,
                    dest_bucket: str, dest_object: str,
                    source_version_id: str = None,
                    metadata: Dict[str, str] = None,
                    tags: Dict[str, str] = None) -> Dict[str, Any]:
        """Move an object by copying it and then deleting the source.

        Returns:
            The copy result extended with ``'operation'``: ``'move'`` when
            the source was deleted, ``'copy_only'`` (plus a ``'warning'``)
            when the copy worked but the delete did not. Moving an object
            onto itself is refused.
        """
        try:
            # Copy-then-delete onto the same key would delete the only copy.
            if source_bucket == dest_bucket and source_object == dest_object:
                return {
                    'success': False,
                    'error': "移动对象失败: 源对象和目标对象相同",
                    'source_bucket': source_bucket,
                    'source_object': source_object,
                    'dest_bucket': dest_bucket,
                    'dest_object': dest_object
                }

            copy_result = self.copy_object(
                source_bucket=source_bucket,
                source_object=source_object,
                dest_bucket=dest_bucket,
                dest_object=dest_object,
                source_version_id=source_version_id,
                metadata=metadata,
                tags=tags
            )

            if not copy_result['success']:
                return copy_result

            delete_result = ObjectDeleter(self.client).delete_object(
                bucket_name=source_bucket,
                object_name=source_object,
                version_id=source_version_id
            )

            if not delete_result['success']:
                # The data is safe at the destination; report the leftover
                # source instead of failing the whole operation.
                copy_result['warning'] = f"对象复制成功但删除源对象失败: {delete_result['error']}"
                copy_result['operation'] = 'copy_only'
                copy_result['source_deleted'] = False
            else:
                copy_result['operation'] = 'move'
                copy_result['source_deleted'] = True
                copy_result['message'] = f"对象移动成功: {source_bucket}/{source_object} -> {dest_bucket}/{dest_object}"

            return copy_result
        except Exception as e:
            return {
                'success': False,
                'error': f"移动对象失败: {e}",
                'source_bucket': source_bucket,
                'source_object': source_object,
                'dest_bucket': dest_bucket,
                'dest_object': dest_object
            }

    def batch_copy(self, operations: List[Dict[str, str]],
                   preserve_metadata: bool = True) -> Dict[str, Any]:
        """Run several copies and summarise how many succeeded.

        Args:
            operations: One dict per copy with the keys ``source_bucket``,
                ``source_object``, ``dest_bucket`` and ``dest_object``, and
                optionally ``source_version_id``, ``metadata`` and ``tags``.
            preserve_metadata: Passed to every :meth:`copy_object` call.

        Returns:
            A summary dict with per-operation results in ``'operations'`` and
            a ``'success_rate'`` percentage (0 for an empty batch).
        """
        results = {
            'total_operations': len(operations),
            'successful': 0,
            'failed': 0,
            'operations': [],
            'start_time': datetime.now().isoformat()
        }

        for i, op in enumerate(operations):
            try:
                copy_result = self.copy_object(
                    source_bucket=op['source_bucket'],
                    source_object=op['source_object'],
                    dest_bucket=op['dest_bucket'],
                    dest_object=op['dest_object'],
                    source_version_id=op.get('source_version_id'),
                    metadata=op.get('metadata'),
                    tags=op.get('tags'),
                    preserve_metadata=preserve_metadata
                )

                copy_result['operation_index'] = i
                results['operations'].append(copy_result)

                if copy_result['success']:
                    results['successful'] += 1
                else:
                    results['failed'] += 1
            except Exception as e:
                # A malformed entry (e.g. a missing key) fails on its own and
                # must not stop the rest of the batch.
                results['operations'].append({
                    'success': False,
                    'error': f"批量复制操作失败: {e}",
                    'operation_index': i,
                    'operation': op
                })
                results['failed'] += 1

        results['end_time'] = datetime.now().isoformat()
        # Guard against division by zero for an empty batch.
        total = results['total_operations']
        results['success_rate'] = (results['successful'] / total) * 100 if total else 0

        return results
3.5 对象删除操作
3.5.1 对象删除管理
class ObjectDeleter:
    """Delete single objects, batches of objects, or everything under a prefix.

    Every public method returns a plain ``dict`` whose ``'success'`` key
    reports the outcome; failures are described in ``'error'``.
    """

    # Upper bound on how many objects one prefix deletion lists and removes.
    _PREFIX_LIST_LIMIT = 10000

    def __init__(self, client: "Minio"):
        """Store the MinIO client used for all deletions."""
        self.client = client

    def delete_object(self, bucket_name: str, object_name: str,
                      version_id: str = None) -> Dict[str, Any]:
        """Delete one object (or one version of it).

        The object is inspected first so the result can report how many
        bytes were removed; ``'deleted_size'`` is 0 if that lookup fails.
        """
        try:
            object_info = ObjectMetadataManager(self.client).get_object_info(
                bucket_name, object_name, version_id
            )

            self.client.remove_object(
                bucket_name=bucket_name,
                object_name=object_name,
                version_id=version_id
            )

            return {
                'success': True,
                'message': f"对象删除成功: {bucket_name}/{object_name}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'version_id': version_id,
                'deleted_size': object_info.get('size', 0) if object_info['success'] else 0,
                'deleted_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"删除对象失败: {e}",
                'bucket_name': bucket_name,
                'object_name': object_name,
                'version_id': version_id
            }

    def delete_objects(self, bucket_name: str, object_names: List[str],
                       version_ids: List[str] = None) -> Dict[str, Any]:
        """Delete many objects with one batch request.

        Args:
            bucket_name: Bucket holding the objects.
            object_names: Keys to delete.
            version_ids: Optional version per key; must have the same length
                as ``object_names``.

        Returns:
            A result dict with success and failure counts. ``'success'`` is
            true only when no deletion failed.
        """
        try:
            if version_ids and len(object_names) != len(version_ids):
                return {
                    'success': False,
                    'error': "对象名称和版本ID数量不匹配",
                    'bucket_name': bucket_name
                }

            # remove_objects() takes DeleteObject entries that describe what
            # to delete; building them performs no request.
            from minio.deleteobjects import DeleteObject

            if version_ids:
                delete_object_list = [
                    DeleteObject(name, version_id)
                    for name, version_id in zip(object_names, version_ids)
                ]
            else:
                delete_object_list = [DeleteObject(name) for name in object_names]

            start_time = datetime.now()
            errors = self.client.remove_objects(bucket_name, delete_object_list)
            # The SDK returns a lazy iterator; consuming it is what actually
            # sends the delete requests.
            error_list = list(errors)
            delete_time = (datetime.now() - start_time).total_seconds()

            total_requested = len(object_names)
            failed_count = len(error_list)
            successful_count = total_requested - failed_count

            return {
                'success': failed_count == 0,
                'message': f"批量删除完成: {successful_count}/{total_requested} 成功",
                'bucket_name': bucket_name,
                'total_requested': total_requested,
                'successful_count': successful_count,
                'failed_count': failed_count,
                'errors': [str(error) for error in error_list],
                'delete_time_seconds': delete_time,
                'deleted_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"批量删除失败: {e}",
                'bucket_name': bucket_name,
                'object_count': len(object_names)
            }

    def delete_objects_by_prefix(self, bucket_name: str, prefix: str,
                                 recursive: bool = True,
                                 dry_run: bool = False) -> Dict[str, Any]:
        """Delete every object whose key starts with ``prefix``.

        Args:
            bucket_name: Bucket to clean up.
            prefix: Key prefix selecting the objects.
            recursive: Descend into "sub-directories".
            dry_run: Only report what would be deleted.

        Returns:
            A result dict. ``'truncated'`` is true when the listing hit its
            10000-object cap, meaning more matching objects may remain.
        """
        try:
            objects_result = ObjectLister(self.client).list_objects(
                bucket_name=bucket_name,
                prefix=prefix,
                recursive=recursive,
                max_keys=self._PREFIX_LIST_LIMIT
            )

            if not objects_result['success']:
                return objects_result

            object_names = [obj['object_name'] for obj in objects_result['objects']]

            if not object_names:
                return {
                    'success': True,
                    'message': f"没有找到匹配前缀 '{prefix}' 的对象",
                    'bucket_name': bucket_name,
                    'prefix': prefix,
                    'matched_objects': 0
                }

            # Hitting the cap means the listing may be incomplete; tell the
            # caller instead of implying the prefix is now empty.
            truncated = len(object_names) >= self._PREFIX_LIST_LIMIT

            if dry_run:
                return {
                    'success': True,
                    'message': f"试运行: 找到 {len(object_names)} 个匹配的对象",
                    'bucket_name': bucket_name,
                    'prefix': prefix,
                    'matched_objects': len(object_names),
                    'objects_to_delete': object_names,
                    'total_size': objects_result['total_size'],
                    'truncated': truncated,
                    'dry_run': True
                }

            delete_result = self.delete_objects(bucket_name, object_names)
            delete_result['prefix'] = prefix
            delete_result['matched_objects'] = len(object_names)
            delete_result['truncated'] = truncated

            return delete_result
        except Exception as e:
            return {
                'success': False,
                'error': f"按前缀删除失败: {e}",
                'bucket_name': bucket_name,
                'prefix': prefix
            }
3.6 综合文件管理示例
3.6.1 完整的文件管理器
class ComprehensiveFileManager:
    """Facade that combines the specialised managers for directory-level work.

    The individual managers stay available as attributes (``uploader``,
    ``downloader``, ``metadata_manager``, ``lister``, ``copy_manager`` and
    ``deleter``) for single-object operations.
    """

    def __init__(self, client: "Minio"):
        """Create one instance of every specialised manager for ``client``."""
        self.client = client
        self.uploader = ObjectManager(client)
        self.downloader = ObjectDownloader(client)
        self.metadata_manager = ObjectMetadataManager(client)
        self.lister = ObjectLister(client)
        self.copy_manager = ObjectCopyManager(client)
        self.deleter = ObjectDeleter(client)

    @staticmethod
    def _relative_name(object_name: str, remote_prefix: str) -> str:
        """Return the part of ``object_name`` to recreate below the local dir.

        When the prefix names a "directory" (``data`` matching
        ``data/a.txt``) that directory is stripped. When the prefix merely
        ends inside a name (``data`` matching ``database/c.txt``) only the
        prefix's own parent directories are stripped, so no characters of the
        name are lost.
        """
        if not remote_prefix:
            return object_name
        base = remote_prefix.rstrip('/')
        if base and object_name.startswith(base + '/'):
            return object_name[len(base) + 1:]
        return object_name[base.rfind('/') + 1:]

    @staticmethod
    def _is_within(root_dir: str, candidate: str) -> bool:
        """Tell whether ``candidate`` resolves to a path inside ``root_dir``."""
        root = os.path.abspath(root_dir)
        target = os.path.abspath(candidate)
        try:
            return os.path.commonpath([root, target]) == root
        except ValueError:
            # Raised e.g. for paths on different drives: not inside.
            return False

    def upload_directory(self, bucket_name: str, local_dir: str,
                         remote_prefix: str = "",
                         include_patterns: List[str] = None,
                         exclude_patterns: List[str] = None) -> Dict[str, Any]:
        """Upload every file below ``local_dir``, keeping the directory layout.

        Args:
            bucket_name: Target bucket.
            local_dir: Directory to walk recursively.
            remote_prefix: Prefix put in front of every object name.
            include_patterns: ``fnmatch`` patterns; when given, only matching
                relative paths are uploaded.
            exclude_patterns: ``fnmatch`` patterns of relative paths to skip.

        Returns:
            A summary dict with one entry per attempted upload in
            ``'uploads'``. ``'success'`` is true only if no upload failed.
        """
        import fnmatch

        results = {
            'local_dir': local_dir,
            'bucket_name': bucket_name,
            'remote_prefix': remote_prefix,
            'total_files': 0,
            'successful': 0,
            'failed': 0,
            'total_size': 0,
            'uploads': [],
            'start_time': datetime.now().isoformat()
        }

        try:
            local_path = Path(local_dir)
            # A plain file is not a directory to walk either.
            if not local_path.is_dir():
                return {
                    'success': False,
                    'error': f"本地目录不存在: {local_dir}"
                }

            for file_path in local_path.rglob('*'):
                if not file_path.is_file():
                    continue

                # Object names always use forward slashes, also on Windows.
                file_name = str(file_path.relative_to(local_path)).replace('\\', '/')

                if include_patterns and not any(
                        fnmatch.fnmatch(file_name, pattern) for pattern in include_patterns):
                    continue
                if exclude_patterns and any(
                        fnmatch.fnmatch(file_name, pattern) for pattern in exclude_patterns):
                    continue

                remote_object = f"{remote_prefix.rstrip('/')}/{file_name}" if remote_prefix else file_name

                upload_result = self.uploader.upload_file(
                    bucket_name=bucket_name,
                    object_name=remote_object,
                    file_path=str(file_path)
                )

                results['uploads'].append(upload_result)
                results['total_files'] += 1

                if upload_result['success']:
                    results['successful'] += 1
                    results['total_size'] += upload_result['file_size']
                else:
                    results['failed'] += 1

            results['end_time'] = datetime.now().isoformat()
            results['success'] = results['failed'] == 0
            results['success_rate'] = (results['successful'] / results['total_files']) * 100 if results['total_files'] > 0 else 0
        except Exception as e:
            results['success'] = False
            results['error'] = f"上传目录失败: {e}"

        return results

    def download_directory(self, bucket_name: str, remote_prefix: str,
                           local_dir: str, create_dirs: bool = True) -> Dict[str, Any]:
        """Download every object below ``remote_prefix`` into ``local_dir``.

        Object names come from the server and are therefore not trusted: a
        name that would resolve outside ``local_dir`` (for example through
        ``..`` segments) is recorded as a failed download and not written.
        Directory-marker objects (names ending in ``/``) are skipped.

        NOTE: the listing uses the lister's default limit, so very large
        prefixes are only downloaded up to that limit.

        Returns:
            A summary dict with one entry per attempted download in
            ``'downloads'``. ``'success'`` is true only if none failed.
        """
        results = {
            'bucket_name': bucket_name,
            'remote_prefix': remote_prefix,
            'local_dir': local_dir,
            'total_objects': 0,
            'successful': 0,
            'failed': 0,
            'total_size': 0,
            'downloads': [],
            'start_time': datetime.now().isoformat()
        }

        try:
            objects_result = self.lister.list_objects(
                bucket_name=bucket_name,
                prefix=remote_prefix,
                recursive=True
            )

            if not objects_result['success']:
                return objects_result

            if create_dirs:
                os.makedirs(local_dir, exist_ok=True)

            for obj in objects_result['objects']:
                object_name = obj['object_name']
                relative_path = self._relative_name(object_name, remote_prefix)

                # Nothing to write for an empty name or a directory marker.
                if not relative_path or relative_path.endswith('/'):
                    continue

                local_file_path = os.path.join(local_dir, relative_path)

                if not self._is_within(local_dir, local_file_path):
                    results['downloads'].append({
                        'success': False,
                        'error': f"下载文件失败: 对象名称超出目标目录: {object_name}",
                        'bucket_name': bucket_name,
                        'object_name': object_name,
                        'file_path': local_file_path
                    })
                    results['total_objects'] += 1
                    results['failed'] += 1
                    continue

                if create_dirs:
                    os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

                download_result = self.downloader.download_file(
                    bucket_name=bucket_name,
                    object_name=object_name,
                    file_path=local_file_path
                )

                results['downloads'].append(download_result)
                results['total_objects'] += 1

                if download_result['success']:
                    results['successful'] += 1
                    results['total_size'] += download_result['file_size']
                else:
                    results['failed'] += 1

            results['end_time'] = datetime.now().isoformat()
            results['success'] = results['failed'] == 0
            results['success_rate'] = (results['successful'] / results['total_objects']) * 100 if results['total_objects'] > 0 else 0
        except Exception as e:
            results['success'] = False
            results['error'] = f"下载目录失败: {e}"

        return results

    def sync_directory(self, bucket_name: str, local_dir: str,
                       remote_prefix: str = "", direction: str = "upload",
                       delete_extra: bool = False) -> Dict[str, Any]:
        """Copy a whole directory in one direction.

        Args:
            bucket_name: Bucket taking part in the sync.
            local_dir: Local directory taking part in the sync.
            remote_prefix: Object-name prefix on the remote side.
            direction: ``"upload"`` (local to remote) or ``"download"``
                (remote to local).
            delete_extra: Accepted and echoed in the result, but not acted
                on: nothing is deleted on either side. When it is set, the
                result carries a ``'warning'`` saying so.

        Returns:
            A result dict embedding the upload or download summary under
            ``'upload_result'`` / ``'download_result'``.
        """
        results = {
            'direction': direction,
            'local_dir': local_dir,
            'bucket_name': bucket_name,
            'remote_prefix': remote_prefix,
            'delete_extra': delete_extra,
            'operations': [],
            'start_time': datetime.now().isoformat()
        }

        if delete_extra:
            # Make the no-op visible instead of silently ignoring the flag.
            results['warning'] = "delete_extra 尚未实现,未删除任何多余的文件或对象"

        try:
            if direction == "upload":
                upload_result = self.upload_directory(
                    bucket_name=bucket_name,
                    local_dir=local_dir,
                    remote_prefix=remote_prefix
                )
                results['upload_result'] = upload_result
                results['success'] = upload_result['success']
            elif direction == "download":
                download_result = self.download_directory(
                    bucket_name=bucket_name,
                    remote_prefix=remote_prefix,
                    local_dir=local_dir
                )
                results['download_result'] = download_result
                results['success'] = download_result['success']
            else:
                results['success'] = False
                results['error'] = f"不支持的同步方向: {direction}"

            results['end_time'] = datetime.now().isoformat()
        except Exception as e:
            results['success'] = False
            results['error'] = f"目录同步失败: {e}"

        return results
使用示例
def file_management_example():
    """Walk through upload, inspect, download, search, copy and list.

    Expects a MinIO server on ``localhost:9000`` with the default
    ``minioadmin`` credentials. The two temporary files the demo creates in
    the working directory are removed again even when a step fails.
    """
    print("=== MinIO文件管理示例 ===")

    test_file = "test_upload.txt"
    downloaded_file = "downloaded_test_file.txt"

    try:
        client = Minio(
            endpoint="localhost:9000",
            access_key="minioadmin",
            secret_key="minioadmin",
            secure=False
        )

        file_manager = ComprehensiveFileManager(client)
        bucket_name = "file-management-demo"

        if not client.bucket_exists(bucket_name):
            client.make_bucket(bucket_name)
            print(f"✅ 创建存储桶: {bucket_name}")

        # 1. Upload a single file
        print("\n1. 上传单个文件...")

        with open(test_file, 'w', encoding='utf-8') as f:
            f.write("这是一个测试文件\n包含中文内容\n上传时间: " + datetime.now().isoformat())

        upload_result = file_manager.uploader.upload_file(
            bucket_name=bucket_name,
            object_name="uploads/test_file.txt",
            file_path=test_file,
            # Metadata travels in HTTP headers and the SDK only accepts
            # US-ASCII values, so the description is kept ASCII.
            metadata={'description': 'test file', 'category': 'demo'},
            tags={'type': 'test', 'env': 'demo'}
        )
        print(f"上传结果: {upload_result['success']}")

        # 2. Inspect the uploaded object
        print("\n2. 获取对象信息...")
        info_result = file_manager.metadata_manager.get_object_info(
            bucket_name, "uploads/test_file.txt"
        )
        if info_result['success']:
            print(f"文件大小: {info_result['size']} 字节")
            print(f"内容类型: {info_result['content_type']}")
            print(f"元数据: {info_result['metadata']}")
            print(f"标签: {info_result['tags']}")

        # 3. Download it again
        print("\n3. 下载文件...")
        download_result = file_manager.downloader.download_file(
            bucket_name=bucket_name,
            object_name="uploads/test_file.txt",
            file_path=downloaded_file
        )
        print(f"下载结果: {download_result['success']}")

        # 4. Search by name and metadata
        print("\n4. 搜索对象...")
        search_result = file_manager.lister.search_objects(
            bucket_name=bucket_name,
            search_term="test",
            search_in=['object_name', 'metadata']
        )
        # A failed search has no 'total_matched' key.
        print(f"搜索到 {search_result.get('total_matched', 0)} 个匹配对象")

        # 5. Copy the object
        print("\n5. 复制对象...")
        copy_result = file_manager.copy_manager.copy_object(
            source_bucket=bucket_name,
            source_object="uploads/test_file.txt",
            dest_bucket=bucket_name,
            dest_object="backups/test_file_backup.txt"
        )
        print(f"复制结果: {copy_result['success']}")

        # 6. List everything in the bucket
        print("\n6. 列出所有对象...")
        list_result = file_manager.lister.list_objects(bucket_name)
        # A failed listing has neither 'total_objects' nor 'objects'.
        print(f"存储桶中共有 {list_result.get('total_objects', 0)} 个对象")
        for obj in list_result.get('objects', []):
            print(f"  - {obj['object_name']} ({obj['size_mb']} MB)")

        print("\n✅ 文件管理示例完成!")

    except Exception as e:
        print(f"❌ 示例执行失败: {e}")
    finally:
        # Remove the temporary files whether or not the demo succeeded; a
        # file that was never created is simply skipped.
        for leftover in (test_file, downloaded_file):
            if os.path.exists(leftover):
                os.remove(leftover)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    file_management_example()
3.7 总结
本章详细介绍了MinIO的对象操作和文件管理功能:
3.7.1 核心功能
- 对象基本操作:上传、下载、元数据管理
- 对象列表和搜索:列表、筛选、搜索功能
- 对象复制和移动:单个和批量操作
- 对象删除管理:安全删除和批量删除
- 目录同步:上传、下载整个目录结构
3.7.2 最佳实践
- 合理设置对象元数据和标签
- 使用进度回调监控大文件传输
- 实施适当的错误处理和重试机制
- 利用批量操作提高效率
- 定期清理不需要的对象
3.7.3 下一步学习
- 预签名URL和临时访问
- 多部分上传和断点续传
- 对象生命周期管理
- 跨区域复制和同步
下一章将介绍MinIO的预签名URL和安全访问控制。