## 2.1 存储桶概述

### 2.1.1 什么是存储桶
存储桶(Bucket)是 MinIO 中的顶级容器,用于组织和管理对象。每个存储桶都有唯一的名称,并且可以包含无限数量的对象。
```python
from minio import Minio
from minio.error import S3Error
from datetime import datetime, timedelta
import json
from typing import List, Dict, Any, Optional
import re
class BucketManager:
    """Basic bucket operations (validate, create, list, delete) on a MinIO client.

    Methods that talk to the server catch ``S3Error`` and report the outcome
    as a dict carrying a ``'success'`` flag instead of raising.
    """

    def __init__(self, client: Minio):
        """Keep the ``Minio`` client used for every request."""
        self.client = client

    @staticmethod
    def validate_bucket_name(bucket_name: str) -> Dict[str, Any]:
        """Check *bucket_name* against the naming rules enforced here.

        Rules: 3-63 characters; only lowercase letters, digits, dots and
        hyphens, starting and ending with a letter or digit; not shaped like
        an IPv4 address; no consecutive dots. An ``xn--`` prefix only
        produces a warning.

        Returns:
            Dict with ``'valid'`` (bool), ``'errors'`` and ``'warnings'``
            (lists of messages) and the checked ``'bucket_name'``.
        """
        min_len, max_len = 3, 63
        errors: List[str] = []
        warnings: List[str] = []

        if not min_len <= len(bucket_name) <= max_len:
            errors.append(f"名称长度必须在{min_len}-{max_len}字符之间")

        # fullmatch rather than match + '$': '$' also matches just before a
        # trailing newline, which would let "name\n" through.
        if not re.fullmatch(r'[a-z0-9][a-z0-9.-]*[a-z0-9]', bucket_name):
            errors.append("名称只能包含小写字母、数字、点(.)和连字符(-)")

        if re.fullmatch(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', bucket_name):
            errors.append("名称不能是IP地址格式")

        if '..' in bucket_name:
            errors.append("名称不能包含连续的点(..)")

        if bucket_name.startswith('xn--'):
            warnings.append("避免使用'xn--'前缀")

        return {
            'valid': not errors,
            'errors': errors,
            'warnings': warnings,
            'bucket_name': bucket_name
        }

    def create_bucket(self, bucket_name: str, region: Optional[str] = None,
                      object_lock: bool = False) -> Dict[str, Any]:
        """Create a bucket after validating its name and checking it is new.

        Args:
            bucket_name: Name of the bucket to create.
            region: Passed to ``make_bucket`` as ``location``.
            object_lock: Passed to ``make_bucket`` as ``object_lock``.

        Returns:
            Result dict; on an invalid name it also carries the full
            ``'validation'`` report.
        """
        # Reject bad names locally, before any request is sent.
        validation = self.validate_bucket_name(bucket_name)
        if not validation['valid']:
            return {
                'success': False,
                'error': f"存储桶名称无效: {', '.join(validation['errors'])}",
                'validation': validation
            }

        try:
            if self.client.bucket_exists(bucket_name):
                return {
                    'success': False,
                    'error': f"存储桶 '{bucket_name}' 已存在",
                    'bucket_name': bucket_name
                }

            self.client.make_bucket(
                bucket_name=bucket_name,
                location=region,
                object_lock=object_lock
            )
            return {
                'success': True,
                'message': f"存储桶 '{bucket_name}' 创建成功",
                'bucket_name': bucket_name,
                'region': region,
                'object_lock': object_lock,
                'created_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"创建存储桶失败: {e}",
                'bucket_name': bucket_name
            }

    def _collect_details(self, bucket_name: str) -> Dict[str, Any]:
        """Gather the optional per-bucket fields used by ``list_buckets``."""
        details: Dict[str, Any] = {}
        try:
            # A failed policy lookup is reported as "no policy" (best effort).
            try:
                policy = self.client.get_bucket_policy(bucket_name)
            except Exception:
                details['has_policy'] = False
            else:
                details['has_policy'] = True
                details['policy_size'] = len(policy)

            # Walks every object in the bucket (no 1000-item cap), so this
            # can be slow on large buckets.
            object_count = 0
            total_size = 0
            for obj in self.client.list_objects(bucket_name, recursive=True):
                object_count += 1
                if obj.size:
                    total_size += obj.size
            details['object_count'] = object_count
            details['total_size'] = total_size
        except Exception as e:
            details['detail_error'] = str(e)
        return details

    def list_buckets(self, detailed: bool = False) -> Dict[str, Any]:
        """List all buckets visible to the client.

        Args:
            detailed: When true, also report ``has_policy``/``policy_size``
                and ``object_count``/``total_size`` for each bucket; a
                failure while collecting them is stored in ``detail_error``.

        Returns:
            Result dict with ``'total_buckets'`` and the ``'buckets'`` list.
        """
        try:
            bucket_list = []
            for bucket in self.client.list_buckets():
                created = bucket.creation_date
                bucket_info = {
                    'name': bucket.name,
                    'creation_date': created.isoformat() if created else None
                }
                if detailed:
                    bucket_info.update(self._collect_details(bucket.name))
                bucket_list.append(bucket_info)

            return {
                'success': True,
                'total_buckets': len(bucket_list),
                'buckets': bucket_list,
                'retrieved_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"列出存储桶失败: {e}"
            }

    def delete_bucket(self, bucket_name: str, force: bool = False) -> Dict[str, Any]:
        """Delete a bucket, optionally emptying it first.

        Args:
            bucket_name: Bucket to remove.
            force: When true, every object is removed before the bucket.

        Returns:
            Result dict; the failure variants carry an ``'error'`` message.
        """
        try:
            if not self.client.bucket_exists(bucket_name):
                return {
                    'success': False,
                    'error': f"存储桶 '{bucket_name}' 不存在",
                    'bucket_name': bucket_name
                }

            if force:
                try:
                    # Imported here so the non-force path needs nothing extra.
                    from minio.deleteobjects import DeleteObject

                    listing = self.client.list_objects(bucket_name, recursive=True)
                    object_names = [obj.object_name for obj in listing]

                    if object_names:
                        # remove_objects takes DeleteObject items and is lazy:
                        # nothing is deleted until the result is consumed.
                        errors = self.client.remove_objects(
                            bucket_name,
                            [DeleteObject(name) for name in object_names]
                        )
                        error_list = list(errors)
                        if error_list:
                            return {
                                'success': False,
                                'error': f"删除对象时出错: {error_list}",
                                'bucket_name': bucket_name
                            }
                        print(f"已删除 {len(object_names)} 个对象")
                except Exception as e:
                    return {
                        'success': False,
                        'error': f"清空存储桶失败: {e}",
                        'bucket_name': bucket_name
                    }

            self.client.remove_bucket(bucket_name)
            return {
                'success': True,
                'message': f"存储桶 '{bucket_name}' 删除成功",
                'bucket_name': bucket_name,
                'force_delete': force,
                'deleted_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"删除存储桶失败: {e}",
                'bucket_name': bucket_name
            }
```

## 2.2 存储桶策略管理

### 2.2.1 访问策略配置
```python
import json
from typing import Union
class BucketPolicyManager:
    """Build S3-style bucket policies and apply them through a MinIO client.

    Methods that talk to the server catch ``S3Error`` and return a dict with
    a ``'success'`` flag instead of raising.
    """

    def __init__(self, client: Minio):
        """Keep the ``Minio`` client used for every request."""
        self.client = client

    def create_policy(self, effect: str, principal: Union[str, Dict[str, Any]],
                      action: Union[str, List[str]],
                      resource: Union[str, List[str]],
                      condition: Optional[Dict] = None) -> Dict[str, Any]:
        """Build one policy statement.

        Args:
            effect: Value for ``Effect`` (for example ``"Allow"``).
            principal: Value for ``Principal``, used verbatim.
            action: One action or a list of actions; a single string is
                wrapped in a list.
            resource: One resource ARN or a list; a single string is wrapped
                in a list.
            condition: Optional ``Condition`` block; omitted when falsy.

        Returns:
            The statement as a dict.
        """
        statement = {
            "Effect": effect,
            "Principal": principal,
            "Action": [action] if isinstance(action, str) else action,
            "Resource": [resource] if isinstance(resource, str) else resource
        }
        if condition:
            statement["Condition"] = condition
        return statement

    def create_bucket_policy(self, statements: List[Dict[str, Any]]) -> str:
        """Wrap *statements* in a version ``2012-10-17`` policy and return JSON."""
        policy = {
            "Version": "2012-10-17",
            "Statement": statements
        }
        return json.dumps(policy, indent=2)

    def _apply(self, bucket_name: str, statements: List[Dict[str, Any]],
               message: str, **extra: Any) -> Dict[str, Any]:
        """Serialize *statements*, set them on the bucket and build the result.

        ``extra`` entries are placed between ``'bucket_name'`` and
        ``'policy'`` in the success dict.
        """
        try:
            policy_json = self.create_bucket_policy(statements)
            self.client.set_bucket_policy(bucket_name, policy_json)
        except S3Error as e:
            return {
                'success': False,
                'error': f"设置策略失败: {e}",
                'bucket_name': bucket_name
            }
        return {
            'success': True,
            'message': message,
            'bucket_name': bucket_name,
            **extra,
            'policy': policy_json
        }

    def set_public_read_policy(self, bucket_name: str, prefix: str = "*") -> Dict[str, Any]:
        """Allow anonymous ``s3:GetObject`` on ``<bucket>/<prefix>``.

        *prefix* is inserted into the resource ARN as given, so it must
        carry its own wildcard (for example ``"public/*"``).
        """
        statement = self.create_policy(
            effect="Allow",
            principal="*",
            action="s3:GetObject",
            resource=f"arn:aws:s3:::{bucket_name}/{prefix}"
        )
        return self._apply(
            bucket_name,
            [statement],
            f"存储桶 '{bucket_name}' 公共读取策略设置成功",
            prefix=prefix
        )

    def set_authenticated_read_write_policy(self, bucket_name: str,
                                            user_arn: Optional[str] = None) -> Dict[str, Any]:
        """Grant read, write/delete and list access on the bucket.

        Args:
            bucket_name: Target bucket.
            user_arn: Principal ARN to grant access to. When omitted the
                principal is ``{"AWS": "*"}``.
                NOTE(review): a wildcard principal is not limited to one
                user; confirm that this default is intended.

        Returns:
            Result dict including the applied ``'policy'`` JSON.
        """
        # A principal ARN must sit under the "AWS" key; a bare ARN string is
        # not a valid Principal element.
        principal = {"AWS": [user_arn]} if user_arn else {"AWS": "*"}
        object_arn = f"arn:aws:s3:::{bucket_name}/*"

        statements = [
            # Read objects.
            self.create_policy(
                effect="Allow",
                principal=principal,
                action=["s3:GetObject", "s3:GetObjectVersion"],
                resource=object_arn
            ),
            # Write and delete objects.
            self.create_policy(
                effect="Allow",
                principal=principal,
                action=["s3:PutObject", "s3:DeleteObject"],
                resource=object_arn
            ),
            # Listing applies to the bucket itself, not to its objects.
            self.create_policy(
                effect="Allow",
                principal=principal,
                action="s3:ListBucket",
                resource=f"arn:aws:s3:::{bucket_name}"
            )
        ]
        return self._apply(
            bucket_name,
            statements,
            f"存储桶 '{bucket_name}' 认证用户读写策略设置成功",
            user_arn=user_arn
        )

    def set_time_limited_policy(self, bucket_name: str,
                                expiry_hours: int = 24) -> Dict[str, Any]:
        """Allow anonymous ``s3:GetObject`` until *expiry_hours* from now.

        The cut-off is written into a ``DateLessThan`` condition on
        ``aws:CurrentTime``.
        """
        from datetime import timezone

        # The condition value ends in "Z", so the timestamp has to be UTC;
        # a naive local time would shift the cut-off by the UTC offset.
        expiry_time = datetime.now(timezone.utc) + timedelta(hours=expiry_hours)
        condition = {
            "DateLessThan": {
                "aws:CurrentTime": expiry_time.strftime("%Y-%m-%dT%H:%M:%SZ")
            }
        }
        statement = self.create_policy(
            effect="Allow",
            principal="*",
            action="s3:GetObject",
            resource=f"arn:aws:s3:::{bucket_name}/*",
            condition=condition
        )
        return self._apply(
            bucket_name,
            [statement],
            f"存储桶 '{bucket_name}' 时间限制策略设置成功",
            expiry_time=expiry_time.isoformat(),
            expiry_hours=expiry_hours
        )

    def get_bucket_policy(self, bucket_name: str) -> Dict[str, Any]:
        """Fetch and parse the bucket policy.

        Returns:
            On success the parsed ``'policy'``, raw ``'policy_json'`` and
            ``'statement_count'``. A bucket without a policy is reported as
            success with ``'policy': None``.
        """
        try:
            policy_json = self.client.get_bucket_policy(bucket_name)
            policy_dict = json.loads(policy_json)
            return {
                'success': True,
                'bucket_name': bucket_name,
                'policy': policy_dict,
                'policy_json': policy_json,
                'statement_count': len(policy_dict.get('Statement', []))
            }
        except S3Error as e:
            # Prefer the structured error code; keep the text match as a
            # fallback for errors that do not expose one.
            missing = (getattr(e, 'code', None) == "NoSuchBucketPolicy"
                       or "NoSuchBucketPolicy" in str(e))
            if missing:
                return {
                    'success': True,
                    'bucket_name': bucket_name,
                    'policy': None,
                    'message': "存储桶没有设置策略"
                }
            return {
                'success': False,
                'error': f"获取策略失败: {e}",
                'bucket_name': bucket_name
            }

    def delete_bucket_policy(self, bucket_name: str) -> Dict[str, Any]:
        """Remove the bucket policy."""
        try:
            self.client.delete_bucket_policy(bucket_name)
            return {
                'success': True,
                'message': f"存储桶 '{bucket_name}' 策略删除成功",
                'bucket_name': bucket_name,
                'deleted_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"删除策略失败: {e}",
                'bucket_name': bucket_name
            }
```

## 2.3 存储桶通知配置

### 2.3.1 事件通知设置
```python
from minio.notificationconfig import (
    NotificationConfig,
    QueueConfig,
    TopicConfig,
    CloudFuncConfig,
    FilterRule,
    PrefixFilterRule,
    SuffixFilterRule,
)
from minio.commonconfig import Filter
class BucketNotificationManager:
    """Build and apply bucket event-notification settings on a MinIO client.

    Methods that talk to the server catch ``S3Error`` and return a dict with
    a ``'success'`` flag instead of raising.
    """

    def __init__(self, client: Minio):
        """Keep the ``Minio`` client used for every request."""
        self.client = client

    @staticmethod
    def _filter_kwargs(prefix_filter: Optional[str],
                       suffix_filter: Optional[str]) -> Dict[str, Any]:
        """Translate optional key filters into config constructor keywords."""
        kwargs: Dict[str, Any] = {}
        if prefix_filter:
            kwargs['prefix_filter_rule'] = PrefixFilterRule(prefix_filter)
        if suffix_filter:
            kwargs['suffix_filter_rule'] = SuffixFilterRule(suffix_filter)
        return kwargs

    def create_queue_notification(self, queue_arn: str, events: List[str],
                                  prefix_filter: Optional[str] = None,
                                  suffix_filter: Optional[str] = None) -> QueueConfig:
        """Build a queue notification config.

        Args:
            queue_arn: ARN of the target queue.
            events: Event names that trigger the notification.
            prefix_filter: Optional object-key prefix filter.
            suffix_filter: Optional object-key suffix filter.
        """
        # The target ARN is passed positionally; the filters go in as
        # prefix_filter_rule / suffix_filter_rule.
        return QueueConfig(
            queue_arn,
            events,
            **self._filter_kwargs(prefix_filter, suffix_filter)
        )

    def create_topic_notification(self, topic_arn: str, events: List[str],
                                  prefix_filter: Optional[str] = None,
                                  suffix_filter: Optional[str] = None) -> TopicConfig:
        """Build a topic notification config.

        Args:
            topic_arn: ARN of the target topic.
            events: Event names that trigger the notification.
            prefix_filter: Optional object-key prefix filter.
            suffix_filter: Optional object-key suffix filter.
        """
        return TopicConfig(
            topic_arn,
            events,
            **self._filter_kwargs(prefix_filter, suffix_filter)
        )

    def set_bucket_notification(self, bucket_name: str,
                                queue_configs: Optional[List[QueueConfig]] = None,
                                topic_configs: Optional[List[TopicConfig]] = None,
                                cloud_func_configs: Optional[List[CloudFuncConfig]] = None
                                ) -> Dict[str, Any]:
        """Apply the given notification configs to the bucket.

        Each list defaults to empty. The result reports how many configs of
        each kind were submitted.
        """
        queues = queue_configs or []
        topics = topic_configs or []
        cloud_funcs = cloud_func_configs or []
        try:
            notification_config = NotificationConfig(
                queue_config_list=queues,
                topic_config_list=topics,
                cloud_func_config_list=cloud_funcs
            )
            self.client.set_bucket_notification(bucket_name, notification_config)
            return {
                'success': True,
                'message': f"存储桶 '{bucket_name}' 通知配置设置成功",
                'bucket_name': bucket_name,
                'queue_configs': len(queues),
                'topic_configs': len(topics),
                'cloud_func_configs': len(cloud_funcs),
                'configured_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"设置通知配置失败: {e}",
                'bucket_name': bucket_name
            }

    def get_bucket_notification(self, bucket_name: str) -> Dict[str, Any]:
        """Fetch the bucket's notification config and count its entries.

        The raw config object is returned under ``'notification_config'``.
        """
        try:
            notification_config = self.client.get_bucket_notification(bucket_name)
            return {
                'success': True,
                'bucket_name': bucket_name,
                'queue_configs': len(notification_config.queue_config_list),
                'topic_configs': len(notification_config.topic_config_list),
                'cloud_func_configs': len(notification_config.cloud_func_config_list),
                'notification_config': notification_config
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"获取通知配置失败: {e}",
                'bucket_name': bucket_name
            }

    def delete_bucket_notification(self, bucket_name: str) -> Dict[str, Any]:
        """Remove all notifications by applying an empty config."""
        try:
            self.client.set_bucket_notification(bucket_name, NotificationConfig())
            return {
                'success': True,
                'message': f"存储桶 '{bucket_name}' 通知配置删除成功",
                'bucket_name': bucket_name,
                'deleted_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"删除通知配置失败: {e}",
                'bucket_name': bucket_name
            }
```

## 2.4 存储桶版本控制

### 2.4.1 版本控制管理
```python
from minio.versioningconfig import VersioningConfig, ENABLED, SUSPENDED
class BucketVersioningManager:
    """Enable, suspend and inspect bucket versioning on a MinIO client.

    Methods catch ``S3Error`` and return a dict with a ``'success'`` flag
    instead of raising.
    """

    def __init__(self, client: Minio):
        """Keep the ``Minio`` client used for every request."""
        self.client = client

    def enable_versioning(self, bucket_name: str) -> Dict[str, Any]:
        """Set the bucket's versioning status to ``ENABLED``."""
        try:
            self.client.set_bucket_versioning(bucket_name, VersioningConfig(ENABLED))
            return {
                'success': True,
                'message': f"存储桶 '{bucket_name}' 版本控制已启用",
                'bucket_name': bucket_name,
                'status': 'Enabled',
                'enabled_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"启用版本控制失败: {e}",
                'bucket_name': bucket_name
            }

    def suspend_versioning(self, bucket_name: str) -> Dict[str, Any]:
        """Set the bucket's versioning status to ``SUSPENDED``."""
        try:
            self.client.set_bucket_versioning(bucket_name, VersioningConfig(SUSPENDED))
            return {
                'success': True,
                'message': f"存储桶 '{bucket_name}' 版本控制已暂停",
                'bucket_name': bucket_name,
                'status': 'Suspended',
                'suspended_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"暂停版本控制失败: {e}",
                'bucket_name': bucket_name
            }

    def get_versioning_status(self, bucket_name: str) -> Dict[str, Any]:
        """Report the bucket's versioning ``status`` and ``mfa_delete`` setting.

        ``mfa_delete`` is ``None`` when the returned config has no such
        attribute.
        """
        try:
            versioning_config = self.client.get_bucket_versioning(bucket_name)
            return {
                'success': True,
                'bucket_name': bucket_name,
                'status': versioning_config.status,
                'mfa_delete': getattr(versioning_config, 'mfa_delete', None),
                'retrieved_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"获取版本控制状态失败: {e}",
                'bucket_name': bucket_name
            }

    def list_object_versions(self, bucket_name: str, prefix: Optional[str] = None,
                             max_keys: int = 1000) -> Dict[str, Any]:
        """List object versions under *prefix*, returning at most *max_keys*.

        Args:
            bucket_name: Bucket to list.
            prefix: Optional object-key prefix.
            max_keys: Upper bound on returned entries; zero or a negative
                value yields an empty list.

        Returns:
            Result dict with ``'versions'`` (one dict per version) and
            ``'total_versions'``.
        """
        try:
            versions = self.client.list_objects(
                bucket_name=bucket_name,
                prefix=prefix,
                recursive=True,
                include_version=True
            )

            version_list: List[Dict[str, Any]] = []
            for version in versions:
                # Test the limit before appending so max_keys=0 returns
                # nothing instead of one entry.
                if len(version_list) >= max_keys:
                    break
                modified = version.last_modified
                version_list.append({
                    'object_name': version.object_name,
                    'version_id': getattr(version, 'version_id', None),
                    'is_latest': getattr(version, 'is_latest', True),
                    'size': version.size,
                    'etag': version.etag,
                    'last_modified': modified.isoformat() if modified else None,
                    'storage_class': getattr(version, 'storage_class', None)
                })

            return {
                'success': True,
                'bucket_name': bucket_name,
                'prefix': prefix,
                'total_versions': len(version_list),
                'versions': version_list,
                'retrieved_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"列出对象版本失败: {e}",
                'bucket_name': bucket_name
            }
```

## 2.5 存储桶加密配置

### 2.5.1 服务端加密设置
```python
from minio.sseconfig import SSEConfig, Rule
from minio.sse import SseS3, SseKMS
class BucketEncryptionManager:
    """Manage a bucket's default server-side encryption on a MinIO client.

    Methods catch ``S3Error`` and return a dict with a ``'success'`` flag
    instead of raising.
    """

    # Maps the rule's algorithm string to the label reported under 'type'.
    _TYPE_LABELS = {"AES256": "SseS3", "aws:kms": "SseKMS"}

    def __init__(self, client: Minio):
        """Keep the ``Minio`` client used for every request."""
        self.client = client

    def set_default_encryption(self, bucket_name: str,
                               encryption_type: str = "AES256",
                               kms_key_id: Optional[str] = None) -> Dict[str, Any]:
        """Set the bucket's default encryption rule.

        Args:
            bucket_name: Target bucket.
            encryption_type: ``"AES256"`` or ``"KMS"`` (case-insensitive).
            kms_key_id: Key id, required when *encryption_type* is ``"KMS"``.

        Returns:
            Result dict; unsupported types and a missing KMS key are
            reported as failures without contacting the server.
        """
        kind = encryption_type.upper()
        if kind not in ("AES256", "KMS"):
            return {
                'success': False,
                'error': f"不支持的加密类型: {encryption_type}",
                'bucket_name': bucket_name
            }
        if kind == "KMS" and not kms_key_id:
            return {
                'success': False,
                'error': "KMS加密需要提供密钥ID",
                'bucket_name': bucket_name
            }

        try:
            # Bucket-level rules come from the Rule factory methods, not
            # from per-request Sse* objects.
            if kind == "KMS":
                rule = Rule.new_sse_kms_rule(kms_key_id)
            else:
                rule = Rule.new_sse_s3_rule()
            self.client.set_bucket_encryption(bucket_name, SSEConfig(rule))
            return {
                'success': True,
                'message': f"存储桶 '{bucket_name}' 默认加密设置成功",
                'bucket_name': bucket_name,
                'encryption_type': encryption_type,
                'kms_key_id': kms_key_id,
                'configured_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"设置加密配置失败: {e}",
                'bucket_name': bucket_name
            }

    def _not_configured(self, bucket_name: str) -> Dict[str, Any]:
        """Result used when the bucket has no encryption configuration."""
        return {
            'success': True,
            'bucket_name': bucket_name,
            'encryption': None,
            'message': "存储桶没有设置加密配置"
        }

    def get_bucket_encryption(self, bucket_name: str) -> Dict[str, Any]:
        """Fetch the bucket's default encryption settings.

        Returns:
            On success ``'encryption'`` holds ``algorithm``, ``kms_key_id``
            and ``type``, or ``None`` when nothing is configured. The raw
            config is returned under ``'sse_config'``.
        """
        try:
            sse_config = self.client.get_bucket_encryption(bucket_name)
        except S3Error as e:
            missing_code = "ServerSideEncryptionConfigurationNotFoundError"
            if getattr(e, 'code', None) == missing_code or missing_code in str(e):
                return self._not_configured(bucket_name)
            return {
                'success': False,
                'error': f"获取加密配置失败: {e}",
                'bucket_name': bucket_name
            }

        # The client may also signal "nothing configured" by returning None.
        if sse_config is None:
            return self._not_configured(bucket_name)

        rule = sse_config.rule
        algorithm = getattr(rule, 'sse_algorithm', None)
        encryption_info = {
            'algorithm': algorithm,
            'kms_key_id': getattr(rule, 'kms_master_key_id', None),
            'type': self._TYPE_LABELS.get(algorithm, algorithm)
        }
        return {
            'success': True,
            'bucket_name': bucket_name,
            'encryption': encryption_info,
            'sse_config': sse_config,
            'retrieved_at': datetime.now().isoformat()
        }

    def delete_bucket_encryption(self, bucket_name: str) -> Dict[str, Any]:
        """Remove the bucket's default encryption configuration."""
        try:
            self.client.delete_bucket_encryption(bucket_name)
            return {
                'success': True,
                'message': f"存储桶 '{bucket_name}' 加密配置删除成功",
                'bucket_name': bucket_name,
                'deleted_at': datetime.now().isoformat()
            }
        except S3Error as e:
            return {
                'success': False,
                'error': f"删除加密配置失败: {e}",
                'bucket_name': bucket_name
            }
```

## 2.6 综合管理示例

### 2.6.1 完整的存储桶管理类

```python
class ComprehensiveBucketManager:
    """Facade combining the bucket, policy, notification, versioning and
    encryption managers behind one client."""

    def __init__(self, client: Minio):
        """Create one manager of each kind, all sharing *client*."""
        self.client = client
        self.bucket_manager = BucketManager(client)
        self.policy_manager = BucketPolicyManager(client)
        self.notification_manager = BucketNotificationManager(client)
        self.versioning_manager = BucketVersioningManager(client)
        self.encryption_manager = BucketEncryptionManager(client)

    def _apply_policy(self, bucket_name: str,
                      policy_config: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch on ``policy_config['type']`` to the matching policy setter."""
        policy_type = policy_config.get('type')
        if policy_type == 'public_read':
            return self.policy_manager.set_public_read_policy(
                bucket_name=bucket_name,
                prefix=policy_config.get('prefix', '*')
            )
        if policy_type == 'authenticated_rw':
            return self.policy_manager.set_authenticated_read_write_policy(
                bucket_name=bucket_name,
                user_arn=policy_config.get('user_arn')
            )
        if policy_type == 'time_limited':
            return self.policy_manager.set_time_limited_policy(
                bucket_name=bucket_name,
                expiry_hours=policy_config.get('expiry_hours', 24)
            )
        return {'success': False, 'error': f"未知策略类型: {policy_type}"}

    def _apply_notifications(self, bucket_name: str,
                             notification_config: Dict[str, Any]
                             ) -> Optional[Dict[str, Any]]:
        """Build queue/topic configs and apply them.

        Returns ``None`` when no config was requested.
        """
        queue_configs = [
            self.notification_manager.create_queue_notification(
                queue_arn=item['arn'],
                events=item['events'],
                prefix_filter=item.get('prefix'),
                suffix_filter=item.get('suffix')
            )
            for item in notification_config.get('queues', [])
        ]
        topic_configs = [
            self.notification_manager.create_topic_notification(
                topic_arn=item['arn'],
                events=item['events'],
                prefix_filter=item.get('prefix'),
                suffix_filter=item.get('suffix')
            )
            for item in notification_config.get('topics', [])
        ]
        if not (queue_configs or topic_configs):
            return None
        return self.notification_manager.set_bucket_notification(
            bucket_name=bucket_name,
            queue_configs=queue_configs,
            topic_configs=topic_configs
        )

    def create_production_bucket(self, bucket_name: str,
                                 config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a bucket and apply the optional settings named in *config*.

        Recognized keys: ``region``, ``object_lock``, ``enable_versioning``,
        ``encryption`` (``type``, ``kms_key_id``), ``policy`` (``type`` plus
        its options) and ``notifications`` (``queues``, ``topics``).

        A failed bucket creation stops immediately; later steps all run and
        their failures are collected.

        Returns:
            Dict with ``'operations'`` (list of ``(step, result)``),
            ``'errors'`` (every failure message) and the overall
            ``'success'`` flag.
        """
        results: Dict[str, Any] = {
            'bucket_name': bucket_name,
            'operations': [],
            'success': True,
            'errors': []
        }

        def record(step: str, outcome: Dict[str, Any]) -> bool:
            """Log a step result, collect its error, and report success."""
            results['operations'].append((step, outcome))
            if not outcome['success']:
                results['errors'].append(outcome['error'])
            return outcome['success']

        try:
            # 1. The bucket itself; nothing else can run without it.
            created = self.bucket_manager.create_bucket(
                bucket_name=bucket_name,
                region=config.get('region'),
                object_lock=config.get('object_lock', False)
            )
            if not record('create_bucket', created):
                results['success'] = False
                return results

            # 2. Versioning.
            if config.get('enable_versioning', False):
                record('enable_versioning',
                       self.versioning_manager.enable_versioning(bucket_name))

            # 3. Default encryption.
            encryption_config = config.get('encryption')
            if encryption_config:
                record('set_encryption',
                       self.encryption_manager.set_default_encryption(
                           bucket_name=bucket_name,
                           encryption_type=encryption_config.get('type', 'AES256'),
                           kms_key_id=encryption_config.get('kms_key_id')
                       ))

            # 4. Access policy.
            policy_config = config.get('policy')
            if policy_config:
                record('set_policy', self._apply_policy(bucket_name, policy_config))

            # 5. Event notifications.
            notification_config = config.get('notifications')
            if notification_config:
                outcome = self._apply_notifications(bucket_name, notification_config)
                if outcome is not None:
                    record('set_notifications', outcome)

            if results['errors']:
                results['success'] = False
                results['message'] = f"存储桶创建完成,但有 {len(results['errors'])} 个错误"
            else:
                results['message'] = f"生产环境存储桶 '{bucket_name}' 创建成功"
            results['created_at'] = datetime.now().isoformat()

        except Exception as e:
            results['success'] = False
            results['error'] = f"创建生产环境存储桶失败: {e}"
            # Mirror the failure into 'errors' so callers reading only that
            # list still see it.
            results['errors'].append(results['error'])

        return results

    def get_bucket_summary(self, bucket_name: str) -> Dict[str, Any]:
        """Collect policy, versioning, encryption, notification and object
        statistics for one bucket.

        Returns:
            Summary dict. ``'exists'`` is false and ``'error'`` is set when
            the bucket is missing; ``'error'`` is also set when collection
            fails unexpectedly.
        """
        summary: Dict[str, Any] = {
            'bucket_name': bucket_name,
            'exists': False,
            'basic_info': {},
            'policy': {},
            'versioning': {},
            'encryption': {},
            'notifications': {},
            'objects': {},
            'retrieved_at': datetime.now().isoformat()
        }

        try:
            if not self.client.bucket_exists(bucket_name):
                summary['error'] = f"存储桶 '{bucket_name}' 不存在"
                return summary
            summary['exists'] = True

            summary['policy'] = self.policy_manager.get_bucket_policy(bucket_name)
            summary['versioning'] = self.versioning_manager.get_versioning_status(bucket_name)
            summary['encryption'] = self.encryption_manager.get_bucket_encryption(bucket_name)
            summary['notifications'] = self.notification_manager.get_bucket_notification(bucket_name)

            # Object statistics: a single pass over this bucket only.
            try:
                objects = list(self.client.list_objects(bucket_name, recursive=True))
                total_size = sum(obj.size for obj in objects if obj.size)
                summary['objects'] = {
                    'count': len(objects),
                    'total_size': total_size,
                    'total_size_mb': round(total_size / (1024 * 1024), 2) if total_size else 0
                }
            except Exception as e:
                summary['objects'] = {'error': str(e)}

            # Basic info comes from the plain listing; the detail fields are
            # filled from data gathered above, because a detailed listing
            # would scan the objects of every bucket.
            listing = self.bucket_manager.list_buckets(detailed=False)
            if listing['success']:
                for bucket in listing['buckets']:
                    if bucket['name'] != bucket_name:
                        continue
                    basic_info = dict(bucket)
                    policy_json = summary['policy'].get('policy_json')
                    basic_info['has_policy'] = policy_json is not None
                    if policy_json is not None:
                        basic_info['policy_size'] = len(policy_json)
                    if 'error' in summary['objects']:
                        basic_info['detail_error'] = summary['objects']['error']
                    else:
                        basic_info['object_count'] = summary['objects']['count']
                        basic_info['total_size'] = summary['objects']['total_size']
                    summary['basic_info'] = basic_info
                    break

        except Exception as e:
            summary['error'] = f"获取存储桶摘要失败: {e}"

        return summary
# 使用示例
def bucket_management_example(endpoint: str = "localhost:9000",
                              access_key: str = "minioadmin",
                              secret_key: str = "minioadmin",
                              secure: bool = False) -> None:
    """Run the bucket-management walkthrough against a MinIO server.

    Creates a bucket named ``production-data-bucket`` with versioning,
    AES256 encryption and the ``authenticated_rw`` policy, then prints its
    summary. Any exception is caught and printed rather than raised.

    Args:
        endpoint: ``host:port`` of the MinIO server.
        access_key: Access key; the default is only suitable for a local
            test server.
        secret_key: Secret key; same caveat as *access_key*.
        secure: Passed to ``Minio`` as ``secure``.
    """
    print("=== MinIO存储桶管理示例 ===")

    try:
        client = Minio(
            endpoint=endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure
        )
        manager = ComprehensiveBucketManager(client)

        production_config = {
            'region': 'us-east-1',
            'enable_versioning': True,
            'object_lock': False,
            'encryption': {
                'type': 'AES256'
            },
            'policy': {
                'type': 'authenticated_rw'
            }
        }

        print("\n1. 创建生产环境存储桶...")
        bucket_name = "production-data-bucket"
        create_result = manager.create_production_bucket(bucket_name, production_config)
        print(f"创建结果: {create_result['success']}")
        if create_result['errors']:
            print(f"错误: {create_result['errors']}")

        print("\n2. 获取存储桶摘要...")
        summary = manager.get_bucket_summary(bucket_name)
        # default=str: the summary embeds SDK config objects that json
        # cannot serialize on its own.
        print(json.dumps(summary, indent=2, ensure_ascii=False, default=str))

        print("\n✅ 存储桶管理示例完成!")

    except Exception as e:
        print(f"❌ 示例执行失败: {e}")
# Run the walkthrough only when this file is executed as a script.
if __name__ == "__main__":
    bucket_management_example()
```

## 2.7 总结
本章详细介绍了MinIO存储桶的管理功能:
### 2.7.1 核心功能
- 存储桶基本操作:创建、删除、列表、验证
- 访问策略管理:公共访问、认证访问、时间限制
- 事件通知配置:队列通知、主题通知、过滤规则
- 版本控制管理:启用、暂停、版本列表
- 加密配置:服务端加密、KMS集成
### 2.7.2 最佳实践
- 遵循存储桶命名规范
- 合理设置访问策略
- 启用版本控制保护数据
- 配置适当的加密策略
- 设置必要的事件通知
### 2.7.3 下一步学习
- 对象操作和元数据管理
- 预签名URL和临时访问
- 多部分上传和大文件处理
- 生命周期管理和自动化
下一章将介绍MinIO的对象操作和文件管理功能。