2.1 存储桶概述

2.1.1 什么是存储桶

存储桶(Bucket)是MinIO中的顶级容器,用于组织和管理对象。每个存储桶都有唯一的名称,并且可以包含无限数量的对象。

”`python from minio import Minio from minio.error import S3Error from datetime import datetime, timedelta import json from typing import List, Dict, Any, Optional import re

class BucketManager:
    """Create, list and delete MinIO buckets.

    Every operation reports its outcome as a plain ``dict`` with a
    ``success`` flag; ``S3Error`` raised by the client is converted into a
    failure dict rather than propagated.
    """

    def __init__(self, client: "Minio"):
        """Keep the MinIO client that all bucket calls are issued through."""
        self.client = client

    @staticmethod
    def validate_bucket_name(bucket_name: str) -> Dict[str, Any]:
        """Check a bucket name against the naming rules encoded below.

        Args:
            bucket_name: Candidate bucket name.

        Returns:
            A dict with ``valid`` (True when no rule was violated), the
            ``errors`` and ``warnings`` message lists, and ``bucket_name``.
        """
        rules = {
            'length': (3, 63),
            'pattern': r'^[a-z0-9][a-z0-9.-]*[a-z0-9]$',
            'no_ip': r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$',
            'no_consecutive_dots': r'\.\.'
        }

        errors = []
        warnings = []

        # Length bounds.
        min_len, max_len = rules['length']
        if not (min_len <= len(bucket_name) <= max_len):
            errors.append(f"名称长度必须在{min_len}-{max_len}字符之间")

        # Allowed characters. fullmatch is required here: with re.match the
        # trailing `$` also matches just before a final newline, so a name
        # such as "abc\n" would be accepted.
        if not re.fullmatch(rules['pattern'], bucket_name):
            errors.append("名称只能包含小写字母、数字、点(.)和连字符(-)")

        # Names shaped like an IPv4 address are rejected.
        if re.fullmatch(rules['no_ip'], bucket_name):
            errors.append("名称不能是IP地址格式")

        # Two adjacent dots are rejected.
        if re.search(rules['no_consecutive_dots'], bucket_name):
            errors.append("名称不能包含连续的点(..)")

        # The 'xn--' prefix only produces a warning, not an error.
        if bucket_name.startswith('xn--'):
            warnings.append("避免使用'xn--'前缀")

        return {
            'valid': not errors,
            'errors': errors,
            'warnings': warnings,
            'bucket_name': bucket_name
        }

    def create_bucket(self, bucket_name: str, region: Optional[str] = None,
                      object_lock: bool = False) -> Dict[str, Any]:
        """Create a bucket after validating its name and checking for clashes.

        Args:
            bucket_name: Name of the bucket to create.
            region: Passed to the client as ``location``.
            object_lock: Passed to the client as ``object_lock``.

        Returns:
            A result dict; on failure it carries ``error`` (and
            ``validation`` when the name itself was rejected).
        """
        try:
            # Reject invalid names before talking to the server.
            validation = self.validate_bucket_name(bucket_name)
            if not validation['valid']:
                return {
                    'success': False,
                    'error': f"存储桶名称无效: {', '.join(validation['errors'])}",
                    'validation': validation
                }

            # Refuse to create a bucket that already exists.
            if self.client.bucket_exists(bucket_name):
                return {
                    'success': False,
                    'error': f"存储桶 '{bucket_name}' 已存在",
                    'bucket_name': bucket_name
                }

            self.client.make_bucket(
                bucket_name=bucket_name,
                location=region,
                object_lock=object_lock
            )

            return {
                'success': True,
                'message': f"存储桶 '{bucket_name}' 创建成功",
                'bucket_name': bucket_name,
                'region': region,
                'object_lock': object_lock,
                'created_at': datetime.now().isoformat()
            }

        except S3Error as e:
            return {
                'success': False,
                'error': f"创建存储桶失败: {e}",
                'bucket_name': bucket_name
            }

    def list_buckets(self, detailed: bool = False) -> Dict[str, Any]:
        """List all buckets, optionally with policy and object statistics.

        Args:
            detailed: When True, each entry also gets ``has_policy`` (plus
                ``policy_size`` when a policy exists), ``object_count`` and
                ``total_size``. This walks every object of every bucket.

        Returns:
            A result dict with ``total_buckets`` and ``buckets``; a failure
            while collecting details for one bucket is stored in that
            entry's ``detail_error`` and does not abort the listing.
        """
        try:
            bucket_list = []

            for bucket in self.client.list_buckets():
                created = bucket.creation_date
                bucket_info = {
                    'name': bucket.name,
                    'creation_date': created.isoformat() if created else None
                }

                if detailed:
                    try:
                        # A bucket without a policy makes the client raise;
                        # only that client error means "no policy".
                        try:
                            policy = self.client.get_bucket_policy(bucket.name)
                            bucket_info['has_policy'] = True
                            bucket_info['policy_size'] = len(policy)
                        except S3Error:
                            bucket_info['has_policy'] = False

                        # Count and size in a single pass so the listing is
                        # never held in memory as a whole.
                        object_count = 0
                        total_size = 0
                        for obj in self.client.list_objects(bucket.name, recursive=True):
                            object_count += 1
                            total_size += obj.size or 0
                        bucket_info['object_count'] = object_count
                        bucket_info['total_size'] = total_size

                    except Exception as e:
                        bucket_info['detail_error'] = str(e)

                bucket_list.append(bucket_info)

            return {
                'success': True,
                'total_buckets': len(bucket_list),
                'buckets': bucket_list,
                'retrieved_at': datetime.now().isoformat()
            }

        except S3Error as e:
            return {
                'success': False,
                'error': f"列出存储桶失败: {e}"
            }

    def delete_bucket(self, bucket_name: str, force: bool = False) -> Dict[str, Any]:
        """Delete a bucket, optionally emptying it first.

        Args:
            bucket_name: Bucket to delete.
            force: When True, every listed object is removed before the
                bucket itself is deleted.

        Returns:
            A result dict; failures while emptying the bucket are reported
            without attempting the bucket deletion.
        """
        try:
            if not self.client.bucket_exists(bucket_name):
                return {
                    'success': False,
                    'error': f"存储桶 '{bucket_name}' 不存在",
                    'bucket_name': bucket_name
                }

            if force:
                try:
                    # Imported here so the method does not depend on the
                    # module-level import list.
                    from minio.deleteobject import DeleteObject

                    object_names = [
                        obj.object_name
                        for obj in self.client.list_objects(bucket_name, recursive=True)
                    ]

                    if object_names:
                        # remove_objects takes DeleteObject entries; the
                        # result is drained with list() so every reported
                        # deletion error is collected.
                        error_list = list(self.client.remove_objects(
                            bucket_name,
                            [DeleteObject(name) for name in object_names]
                        ))

                        if error_list:
                            return {
                                'success': False,
                                'error': f"删除对象时出错: {error_list}",
                                'bucket_name': bucket_name
                            }

                        print(f"已删除 {len(object_names)} 个对象")

                except Exception as e:
                    return {
                        'success': False,
                        'error': f"清空存储桶失败: {e}",
                        'bucket_name': bucket_name
                    }

            self.client.remove_bucket(bucket_name)

            return {
                'success': True,
                'message': f"存储桶 '{bucket_name}' 删除成功",
                'bucket_name': bucket_name,
                'force_delete': force,
                'deleted_at': datetime.now().isoformat()
            }

        except S3Error as e:
            return {
                'success': False,
                'error': f"删除存储桶失败: {e}",
                'bucket_name': bucket_name
            }

2.2 存储桶策略管理

2.2.1 访问策略配置

”`python import json from typing import Union

class BucketPolicyManager:
    """Build, apply, read and delete bucket access policies.

    Policy documents are assembled as dicts and serialized to JSON before
    being handed to the client. Client ``S3Error`` exceptions are turned
    into failure dicts.
    """

    def __init__(self, client: "Minio"):
        """Keep the MinIO client that policy calls are issued through."""
        self.client = client

    def create_policy(self, effect: str, principal: str, action: Union[str, List[str]],
                      resource: Union[str, List[str]],
                      condition: Optional[Dict] = None) -> Dict[str, Any]:
        """Build one policy statement.

        Args:
            effect: Value for the statement's ``Effect``.
            principal: Value for ``Principal``; stored as given.
            action: One action or a list of actions.
            resource: One resource or a list of resources.
            condition: Optional ``Condition`` mapping; omitted when falsy.

        Returns:
            The statement dict, with ``Action`` and ``Resource`` always
            normalized to lists.
        """
        statement = {
            "Effect": effect,
            "Principal": principal,
            "Action": [action] if isinstance(action, str) else action,
            "Resource": [resource] if isinstance(resource, str) else resource
        }

        if condition:
            statement["Condition"] = condition

        return statement

    def create_bucket_policy(self, statements: List[Dict[str, Any]]) -> str:
        """Wrap statements in a versioned policy document and return its JSON."""
        return json.dumps({"Version": "2012-10-17", "Statement": statements}, indent=2)

    def _apply_policy(self, bucket_name: str, statements: List[Dict[str, Any]],
                      message: str, extra: Dict[str, Any]) -> Dict[str, Any]:
        """Serialize *statements*, apply them to the bucket and build the result.

        ``extra`` holds the setter-specific keys that are placed between
        ``bucket_name`` and ``policy`` in the success dict.
        """
        try:
            policy_json = self.create_bucket_policy(statements)
            self.client.set_bucket_policy(bucket_name, policy_json)
        except S3Error as e:
            return {
                'success': False,
                'error': f"设置策略失败: {e}",
                'bucket_name': bucket_name
            }

        return {
            'success': True,
            'message': message,
            'bucket_name': bucket_name,
            **extra,
            'policy': policy_json
        }

    def set_public_read_policy(self, bucket_name: str, prefix: str = "*") -> Dict[str, Any]:
        """Allow anyone to ``s3:GetObject`` under ``<bucket>/<prefix>``.

        Args:
            bucket_name: Target bucket.
            prefix: Key pattern appended to the bucket ARN; ``"*"`` covers
                every object.

        Returns:
            A result dict including the applied ``policy`` JSON.
        """
        statement = self.create_policy(
            effect="Allow",
            principal="*",
            action="s3:GetObject",
            resource=f"arn:aws:s3:::{bucket_name}/{prefix}"
        )
        return self._apply_policy(
            bucket_name,
            [statement],
            f"存储桶 '{bucket_name}' 公共读取策略设置成功",
            {'prefix': prefix}
        )

    def set_authenticated_read_write_policy(self, bucket_name: str,
                                            user_arn: Optional[str] = None) -> Dict[str, Any]:
        """Grant read, write/delete and list permissions on a bucket.

        Args:
            bucket_name: Target bucket.
            user_arn: ARN of the principal to grant access to. When omitted
                the principal is ``{"AWS": "*"}``.

        Returns:
            A result dict including ``user_arn`` and the applied ``policy``.
        """
        # A principal other than "*" has to be given as an {"AWS": ...}
        # mapping; a bare ARN string is not a valid Principal element.
        # NOTE(review): without user_arn the principal is {"AWS": "*"}, which
        # S3-style policies treat as every caller, not only authenticated
        # users - confirm this default is intended.
        principal = {"AWS": [user_arn]} if user_arn else {"AWS": "*"}
        object_arn = f"arn:aws:s3:::{bucket_name}/*"

        statements = [
            # Read access to objects.
            self.create_policy(
                effect="Allow",
                principal=principal,
                action=["s3:GetObject", "s3:GetObjectVersion"],
                resource=object_arn
            ),
            # Write and delete access to objects.
            self.create_policy(
                effect="Allow",
                principal=principal,
                action=["s3:PutObject", "s3:DeleteObject"],
                resource=object_arn
            ),
            # Listing applies to the bucket itself, not to its objects.
            self.create_policy(
                effect="Allow",
                principal=principal,
                action="s3:ListBucket",
                resource=f"arn:aws:s3:::{bucket_name}"
            )
        ]

        return self._apply_policy(
            bucket_name,
            statements,
            f"存储桶 '{bucket_name}' 认证用户读写策略设置成功",
            {'user_arn': user_arn}
        )

    def set_time_limited_policy(self, bucket_name: str,
                                expiry_hours: int = 24) -> Dict[str, Any]:
        """Allow public ``s3:GetObject`` until ``expiry_hours`` from now.

        Args:
            bucket_name: Target bucket.
            expiry_hours: Lifetime of the grant in hours.

        Returns:
            A result dict including ``expiry_time`` (ISO 8601, UTC),
            ``expiry_hours`` and the applied ``policy``.
        """
        # Imported here so the method does not depend on the module-level
        # import list.
        from datetime import timezone

        # The condition value carries a "Z" (UTC) suffix, so the deadline
        # must be computed in UTC rather than in naive local time.
        expiry_time = datetime.now(timezone.utc) + timedelta(hours=expiry_hours)

        condition = {
            "DateLessThan": {
                "aws:CurrentTime": expiry_time.strftime("%Y-%m-%dT%H:%M:%SZ")
            }
        }

        statement = self.create_policy(
            effect="Allow",
            principal="*",
            action="s3:GetObject",
            resource=f"arn:aws:s3:::{bucket_name}/*",
            condition=condition
        )

        return self._apply_policy(
            bucket_name,
            [statement],
            f"存储桶 '{bucket_name}' 时间限制策略设置成功",
            {
                'expiry_time': expiry_time.isoformat(),
                'expiry_hours': expiry_hours
            }
        )

    def get_bucket_policy(self, bucket_name: str) -> Dict[str, Any]:
        """Fetch and parse the bucket's policy.

        Returns:
            On success, the parsed ``policy``, raw ``policy_json`` and
            ``statement_count``. A client error mentioning
            ``NoSuchBucketPolicy`` is reported as success with
            ``policy`` set to None.
        """
        try:
            policy_json = self.client.get_bucket_policy(bucket_name)
            policy_dict = json.loads(policy_json)

            return {
                'success': True,
                'bucket_name': bucket_name,
                'policy': policy_dict,
                'policy_json': policy_json,
                'statement_count': len(policy_dict.get('Statement', []))
            }

        except S3Error as e:
            # A missing policy is an expected state, not a failure.
            if "NoSuchBucketPolicy" in str(e):
                return {
                    'success': True,
                    'bucket_name': bucket_name,
                    'policy': None,
                    'message': "存储桶没有设置策略"
                }

            return {
                'success': False,
                'error': f"获取策略失败: {e}",
                'bucket_name': bucket_name
            }

    def delete_bucket_policy(self, bucket_name: str) -> Dict[str, Any]:
        """Remove the bucket's policy and report the outcome."""
        try:
            self.client.delete_bucket_policy(bucket_name)
        except S3Error as e:
            return {
                'success': False,
                'error': f"删除策略失败: {e}",
                'bucket_name': bucket_name
            }

        return {
            'success': True,
            'message': f"存储桶 '{bucket_name}' 策略删除成功",
            'bucket_name': bucket_name,
            'deleted_at': datetime.now().isoformat()
        }

2.3 存储桶通知配置

2.3.1 事件通知设置

”`python from minio.notificationconfig import ( NotificationConfig, QueueConfig, TopicConfig, CloudFuncConfig ) from minio.commonconfig import Filter, FilterRule

class BucketNotificationManager:
    """Build and apply bucket event-notification configurations.

    Client ``S3Error`` exceptions are turned into failure dicts.
    """

    def __init__(self, client: "Minio"):
        """Keep the MinIO client that notification calls are issued through."""
        self.client = client

    @staticmethod
    def _filter_rules(prefix_filter: Optional[str],
                      suffix_filter: Optional[str]) -> Dict[str, Any]:
        """Translate optional prefix/suffix strings into config keyword arguments.

        An empty or missing filter yields ``None`` for that rule.
        """
        # Imported here so the method does not depend on the module-level
        # import list.
        from minio.notificationconfig import PrefixFilterRule, SuffixFilterRule

        return {
            'prefix_filter_rule': PrefixFilterRule(prefix_filter) if prefix_filter else None,
            'suffix_filter_rule': SuffixFilterRule(suffix_filter) if suffix_filter else None,
        }

    def create_queue_notification(self, queue_arn: str, events: List[str],
                                  prefix_filter: Optional[str] = None,
                                  suffix_filter: Optional[str] = None) -> "QueueConfig":
        """Build a queue notification configuration.

        Args:
            queue_arn: ARN of the target queue.
            events: Event names that trigger the notification.
            prefix_filter: Only keys starting with this value match.
            suffix_filter: Only keys ending with this value match.

        Returns:
            The ``QueueConfig`` to pass to ``set_bucket_notification``.
        """
        # The config classes take the target ARN and the events positionally
        # and the key filters as prefix/suffix rule objects.
        return QueueConfig(
            queue_arn,
            events,
            **self._filter_rules(prefix_filter, suffix_filter)
        )

    def create_topic_notification(self, topic_arn: str, events: List[str],
                                  prefix_filter: Optional[str] = None,
                                  suffix_filter: Optional[str] = None) -> "TopicConfig":
        """Build a topic notification configuration.

        Args:
            topic_arn: ARN of the target topic.
            events: Event names that trigger the notification.
            prefix_filter: Only keys starting with this value match.
            suffix_filter: Only keys ending with this value match.

        Returns:
            The ``TopicConfig`` to pass to ``set_bucket_notification``.
        """
        return TopicConfig(
            topic_arn,
            events,
            **self._filter_rules(prefix_filter, suffix_filter)
        )

    def set_bucket_notification(self, bucket_name: str,
                                queue_configs: Optional[List["QueueConfig"]] = None,
                                topic_configs: Optional[List["TopicConfig"]] = None,
                                cloud_func_configs: Optional[List["CloudFuncConfig"]] = None
                                ) -> Dict[str, Any]:
        """Replace the bucket's notification configuration.

        Args:
            bucket_name: Target bucket.
            queue_configs: Queue configurations; None means none.
            topic_configs: Topic configurations; None means none.
            cloud_func_configs: Cloud-function configurations; None means none.

        Returns:
            A result dict with the number of configurations of each kind.
        """
        queues = queue_configs or []
        topics = topic_configs or []
        cloud_funcs = cloud_func_configs or []

        try:
            self.client.set_bucket_notification(
                bucket_name,
                NotificationConfig(
                    queue_config_list=queues,
                    topic_config_list=topics,
                    cloud_func_config_list=cloud_funcs
                )
            )
        except S3Error as e:
            return {
                'success': False,
                'error': f"设置通知配置失败: {e}",
                'bucket_name': bucket_name
            }

        return {
            'success': True,
            'message': f"存储桶 '{bucket_name}' 通知配置设置成功",
            'bucket_name': bucket_name,
            'queue_configs': len(queues),
            'topic_configs': len(topics),
            'cloud_func_configs': len(cloud_funcs),
            'configured_at': datetime.now().isoformat()
        }

    def get_bucket_notification(self, bucket_name: str) -> Dict[str, Any]:
        """Fetch the bucket's notification configuration.

        Returns:
            A result dict with per-kind counts and the raw
            ``notification_config`` object returned by the client.
        """
        try:
            notification_config = self.client.get_bucket_notification(bucket_name)
        except S3Error as e:
            return {
                'success': False,
                'error': f"获取通知配置失败: {e}",
                'bucket_name': bucket_name
            }

        return {
            'success': True,
            'bucket_name': bucket_name,
            'queue_configs': len(notification_config.queue_config_list),
            'topic_configs': len(notification_config.topic_config_list),
            'cloud_func_configs': len(notification_config.cloud_func_config_list),
            'notification_config': notification_config
        }

    def delete_bucket_notification(self, bucket_name: str) -> Dict[str, Any]:
        """Clear all notifications by applying an empty configuration."""
        try:
            self.client.set_bucket_notification(bucket_name, NotificationConfig())
        except S3Error as e:
            return {
                'success': False,
                'error': f"删除通知配置失败: {e}",
                'bucket_name': bucket_name
            }

        return {
            'success': True,
            'message': f"存储桶 '{bucket_name}' 通知配置删除成功",
            'bucket_name': bucket_name,
            'deleted_at': datetime.now().isoformat()
        }

2.4 存储桶版本控制

2.4.1 版本控制管理

”`python from minio.versioningconfig import VersioningConfig, ENABLED, SUSPENDED

class BucketVersioningManager:
    """Enable, suspend and inspect bucket versioning.

    Client ``S3Error`` exceptions are turned into failure dicts.
    """

    def __init__(self, client: "Minio"):
        """Keep the MinIO client that versioning calls are issued through."""
        self.client = client

    def enable_versioning(self, bucket_name: str) -> Dict[str, Any]:
        """Switch versioning on for the bucket and report the outcome."""
        try:
            self.client.set_bucket_versioning(bucket_name, VersioningConfig(ENABLED))
        except S3Error as e:
            return {
                'success': False,
                'error': f"启用版本控制失败: {e}",
                'bucket_name': bucket_name
            }

        return {
            'success': True,
            'message': f"存储桶 '{bucket_name}' 版本控制已启用",
            'bucket_name': bucket_name,
            'status': 'Enabled',
            'enabled_at': datetime.now().isoformat()
        }

    def suspend_versioning(self, bucket_name: str) -> Dict[str, Any]:
        """Suspend versioning for the bucket and report the outcome."""
        try:
            self.client.set_bucket_versioning(bucket_name, VersioningConfig(SUSPENDED))
        except S3Error as e:
            return {
                'success': False,
                'error': f"暂停版本控制失败: {e}",
                'bucket_name': bucket_name
            }

        return {
            'success': True,
            'message': f"存储桶 '{bucket_name}' 版本控制已暂停",
            'bucket_name': bucket_name,
            'status': 'Suspended',
            'suspended_at': datetime.now().isoformat()
        }

    def get_versioning_status(self, bucket_name: str) -> Dict[str, Any]:
        """Read the bucket's versioning status.

        Returns:
            A result dict with ``status`` and ``mfa_delete`` (None when the
            returned configuration has no such attribute).
        """
        try:
            versioning_config = self.client.get_bucket_versioning(bucket_name)
        except S3Error as e:
            return {
                'success': False,
                'error': f"获取版本控制状态失败: {e}",
                'bucket_name': bucket_name
            }

        return {
            'success': True,
            'bucket_name': bucket_name,
            'status': versioning_config.status,
            'mfa_delete': getattr(versioning_config, 'mfa_delete', None),
            'retrieved_at': datetime.now().isoformat()
        }

    def list_object_versions(self, bucket_name: str, prefix: Optional[str] = None,
                             max_keys: int = 1000) -> Dict[str, Any]:
        """List object versions under an optional prefix.

        Args:
            bucket_name: Bucket to list.
            prefix: Only keys starting with this value are listed.
            max_keys: Upper bound on returned entries; zero or a negative
                value yields an empty list.

        Returns:
            A result dict with ``total_versions`` and ``versions``.
        """
        try:
            versions = self.client.list_objects(
                bucket_name=bucket_name,
                prefix=prefix,
                recursive=True,
                include_version=True
            )

            version_list = []
            # Guarding the loop keeps a non-positive limit from returning
            # one entry, which a check placed only after the append would.
            if max_keys > 0:
                for version in versions:
                    modified = version.last_modified
                    version_list.append({
                        'object_name': version.object_name,
                        'version_id': getattr(version, 'version_id', None),
                        'is_latest': getattr(version, 'is_latest', True),
                        'size': version.size,
                        'etag': version.etag,
                        'last_modified': modified.isoformat() if modified else None,
                        'storage_class': getattr(version, 'storage_class', None)
                    })

                    # Stop pulling from the listing once the cap is reached.
                    if len(version_list) >= max_keys:
                        break

            return {
                'success': True,
                'bucket_name': bucket_name,
                'prefix': prefix,
                'total_versions': len(version_list),
                'versions': version_list,
                'retrieved_at': datetime.now().isoformat()
            }

        except S3Error as e:
            return {
                'success': False,
                'error': f"列出对象版本失败: {e}",
                'bucket_name': bucket_name
            }

2.5 存储桶加密配置

2.5.1 服务端加密设置

”`python from minio.sseconfig import SSEConfig, Rule from minio.sse import SseS3, SseKms

class BucketEncryptionManager:
    """Configure, read and remove a bucket's default server-side encryption.

    Client ``S3Error`` exceptions are turned into failure dicts.
    """

    def __init__(self, client: "Minio"):
        """Keep the MinIO client that encryption calls are issued through."""
        self.client = client

    def set_default_encryption(self, bucket_name: str,
                               encryption_type: str = "AES256",
                               kms_key_id: Optional[str] = None) -> Dict[str, Any]:
        """Set the bucket's default encryption rule.

        Args:
            bucket_name: Target bucket.
            encryption_type: ``"AES256"`` or ``"KMS"`` (case-insensitive).
            kms_key_id: Key id, required for ``"KMS"``.

        Returns:
            A result dict; an unsupported type or a missing KMS key id is
            reported as a failure without calling the server.
        """
        try:
            kind = encryption_type.upper()

            # Bucket-level defaults are expressed as an sseconfig Rule built
            # through its factory methods, not as per-request Sse objects.
            if kind == "AES256":
                rule = Rule.new_sse_s3_rule()
            elif kind == "KMS":
                if not kms_key_id:
                    return {
                        'success': False,
                        'error': "KMS加密需要提供密钥ID",
                        'bucket_name': bucket_name
                    }
                rule = Rule.new_sse_kms_rule(kms_key_id)
            else:
                return {
                    'success': False,
                    'error': f"不支持的加密类型: {encryption_type}",
                    'bucket_name': bucket_name
                }

            self.client.set_bucket_encryption(bucket_name, SSEConfig(rule))

            return {
                'success': True,
                'message': f"存储桶 '{bucket_name}' 默认加密设置成功",
                'bucket_name': bucket_name,
                'encryption_type': encryption_type,
                'kms_key_id': kms_key_id,
                'configured_at': datetime.now().isoformat()
            }

        except S3Error as e:
            return {
                'success': False,
                'error': f"设置加密配置失败: {e}",
                'bucket_name': bucket_name
            }

    def get_bucket_encryption(self, bucket_name: str) -> Dict[str, Any]:
        """Read the bucket's default encryption rule.

        Returns:
            On success, ``encryption`` holds ``algorithm``, ``kms_key_id``
            and ``type`` (``"AES256"`` or ``"KMS"``, mirroring the values
            accepted by ``set_default_encryption``; any other algorithm is
            passed through unchanged). When no configuration exists,
            ``encryption`` is None.
        """
        not_configured = {
            'success': True,
            'bucket_name': bucket_name,
            'encryption': None,
            'message': "存储桶没有设置加密配置"
        }

        try:
            sse_config = self.client.get_bucket_encryption(bucket_name)

            # A client that signals "not configured" by returning None
            # instead of raising is handled here.
            if sse_config is None:
                return not_configured

            rule = sse_config.rule
            algorithm = getattr(rule, 'sse_algorithm', None)

            encryption_info = {
                'algorithm': algorithm,
                'kms_key_id': getattr(rule, 'kms_master_key_id', None),
                'type': {"aws:kms": "KMS", "AES256": "AES256"}.get(algorithm, algorithm)
            }

            return {
                'success': True,
                'bucket_name': bucket_name,
                'encryption': encryption_info,
                'sse_config': sse_config,
                'retrieved_at': datetime.now().isoformat()
            }

        except S3Error as e:
            # A missing configuration is an expected state, not a failure.
            if "ServerSideEncryptionConfigurationNotFoundError" in str(e):
                return not_configured

            return {
                'success': False,
                'error': f"获取加密配置失败: {e}",
                'bucket_name': bucket_name
            }

    def delete_bucket_encryption(self, bucket_name: str) -> Dict[str, Any]:
        """Remove the bucket's default encryption configuration."""
        try:
            self.client.delete_bucket_encryption(bucket_name)
        except S3Error as e:
            return {
                'success': False,
                'error': f"删除加密配置失败: {e}",
                'bucket_name': bucket_name
            }

        return {
            'success': True,
            'message': f"存储桶 '{bucket_name}' 加密配置删除成功",
            'bucket_name': bucket_name,
            'deleted_at': datetime.now().isoformat()
        }

2.6 综合管理示例

2.6.1 完整的存储桶管理类

class ComprehensiveBucketManager:
    """Facade that combines the individual bucket managers.

    It provisions a bucket from a configuration dict and collects a
    one-stop summary of an existing bucket.
    """

    def __init__(self, client: "Minio"):
        """Create one manager of each kind, all sharing *client*."""
        self.client = client
        self.bucket_manager = BucketManager(client)
        self.policy_manager = BucketPolicyManager(client)
        self.notification_manager = BucketNotificationManager(client)
        self.versioning_manager = BucketVersioningManager(client)
        self.encryption_manager = BucketEncryptionManager(client)

    def create_production_bucket(self, bucket_name: str,
                                 config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a bucket and apply the settings described by *config*.

        Args:
            bucket_name: Name of the bucket to create.
            config: Settings read by key: ``region``, ``object_lock``,
                ``enable_versioning``, ``encryption`` (``type``,
                ``kms_key_id``), ``policy`` (``type`` plus ``prefix`` /
                ``user_arn`` / ``expiry_hours``) and ``notifications``
                (``queues`` / ``topics`` lists of dicts with ``arn``,
                ``events`` and optional ``prefix`` / ``suffix``).

        Returns:
            A dict with ``operations`` (``(step, result)`` pairs in the
            order they ran), ``errors`` (every failure message) and
            ``success``, which is False as soon as any step failed. A
            failed bucket creation stops the remaining steps; later
            failures do not.
        """
        results = {
            'bucket_name': bucket_name,
            'operations': [],
            'success': True,
            'errors': []
        }

        try:
            # 1. Create the bucket; nothing else can proceed without it.
            create_result = self.bucket_manager.create_bucket(
                bucket_name=bucket_name,
                region=config.get('region'),
                object_lock=config.get('object_lock', False)
            )
            results['operations'].append(('create_bucket', create_result))

            if not create_result['success']:
                results['success'] = False
                results['errors'].append(create_result['error'])
                return results

            # 2. Versioning.
            if config.get('enable_versioning', False):
                versioning_result = self.versioning_manager.enable_versioning(bucket_name)
                results['operations'].append(('enable_versioning', versioning_result))

                if not versioning_result['success']:
                    results['errors'].append(versioning_result['error'])

            # 3. Default encryption.
            encryption_config = config.get('encryption')
            if encryption_config:
                encryption_result = self.encryption_manager.set_default_encryption(
                    bucket_name=bucket_name,
                    encryption_type=encryption_config.get('type', 'AES256'),
                    kms_key_id=encryption_config.get('kms_key_id')
                )
                results['operations'].append(('set_encryption', encryption_result))

                if not encryption_result['success']:
                    results['errors'].append(encryption_result['error'])

            # 4. Access policy, selected by its 'type'.
            policy_config = config.get('policy')
            if policy_config:
                policy_type = policy_config.get('type')

                if policy_type == 'public_read':
                    policy_result = self.policy_manager.set_public_read_policy(
                        bucket_name=bucket_name,
                        prefix=policy_config.get('prefix', '*')
                    )
                elif policy_type == 'authenticated_rw':
                    policy_result = self.policy_manager.set_authenticated_read_write_policy(
                        bucket_name=bucket_name,
                        user_arn=policy_config.get('user_arn')
                    )
                elif policy_type == 'time_limited':
                    policy_result = self.policy_manager.set_time_limited_policy(
                        bucket_name=bucket_name,
                        expiry_hours=policy_config.get('expiry_hours', 24)
                    )
                else:
                    policy_result = {'success': False, 'error': f"未知策略类型: {policy_type}"}

                results['operations'].append(('set_policy', policy_result))

                if not policy_result['success']:
                    results['errors'].append(policy_result['error'])

            # 5. Event notifications.
            notification_config = config.get('notifications')
            if notification_config:
                queue_configs = [
                    self.notification_manager.create_queue_notification(
                        queue_arn=queue_config['arn'],
                        events=queue_config['events'],
                        prefix_filter=queue_config.get('prefix'),
                        suffix_filter=queue_config.get('suffix')
                    )
                    for queue_config in notification_config.get('queues', [])
                ]
                topic_configs = [
                    self.notification_manager.create_topic_notification(
                        topic_arn=topic_config['arn'],
                        events=topic_config['events'],
                        prefix_filter=topic_config.get('prefix'),
                        suffix_filter=topic_config.get('suffix')
                    )
                    for topic_config in notification_config.get('topics', [])
                ]

                # Only touch the bucket when there is something to apply.
                if queue_configs or topic_configs:
                    notification_result = self.notification_manager.set_bucket_notification(
                        bucket_name=bucket_name,
                        queue_configs=queue_configs,
                        topic_configs=topic_configs
                    )
                    results['operations'].append(('set_notifications', notification_result))

                    if not notification_result['success']:
                        results['errors'].append(notification_result['error'])

            # Final status: any collected error marks the run as failed.
            if results['errors']:
                results['success'] = False
                results['message'] = f"存储桶创建完成,但有 {len(results['errors'])} 个错误"
            else:
                results['message'] = f"生产环境存储桶 '{bucket_name}' 创建成功"

            results['created_at'] = datetime.now().isoformat()

        except Exception as e:
            failure = f"创建生产环境存储桶失败: {e}"
            results['success'] = False
            results['error'] = failure
            # Also record it in 'errors' so callers that only inspect that
            # list see unexpected failures too.
            results['errors'].append(failure)

        return results

    def get_bucket_summary(self, bucket_name: str) -> Dict[str, Any]:
        """Collect basic info, policy, versioning, encryption, notification
        and object statistics for one bucket.

        Returns:
            A summary dict. ``exists`` stays False and ``error`` is set when
            the bucket is missing; ``error`` is also set when an unexpected
            exception interrupts the collection.
        """
        summary = {
            'bucket_name': bucket_name,
            'exists': False,
            'basic_info': {},
            'policy': {},
            'versioning': {},
            'encryption': {},
            'notifications': {},
            'objects': {},
            'retrieved_at': datetime.now().isoformat()
        }

        try:
            if not self.client.bucket_exists(bucket_name):
                summary['error'] = f"存储桶 '{bucket_name}' 不存在"
                return summary

            summary['exists'] = True

            # Basic info comes from the detailed bucket listing.
            # NOTE(review): detailed=True gathers statistics for every
            # bucket, not just this one - consider a cheaper lookup.
            buckets_result = self.bucket_manager.list_buckets(detailed=True)
            if buckets_result['success']:
                match = next(
                    (entry for entry in buckets_result['buckets'] if entry['name'] == bucket_name),
                    None
                )
                if match is not None:
                    summary['basic_info'] = match

            summary['policy'] = self.policy_manager.get_bucket_policy(bucket_name)
            summary['versioning'] = self.versioning_manager.get_versioning_status(bucket_name)
            summary['encryption'] = self.encryption_manager.get_bucket_encryption(bucket_name)
            summary['notifications'] = self.notification_manager.get_bucket_notification(bucket_name)

            # Object statistics are best-effort: a failure here is stored
            # in the 'objects' section instead of failing the summary.
            try:
                objects = list(self.client.list_objects(bucket_name, recursive=True))
                total_size = sum(obj.size for obj in objects if obj.size)

                summary['objects'] = {
                    'count': len(objects),
                    'total_size': total_size,
                    'total_size_mb': round(total_size / (1024 * 1024), 2) if total_size else 0
                }
            except Exception as e:
                summary['objects'] = {'error': str(e)}

        except Exception as e:
            summary['error'] = f"获取存储桶摘要失败: {e}"

        return summary

使用示例

def bucket_management_example():
    """Run an end-to-end demo: provision a bucket, then print its summary.

    Any exception raised along the way is caught and printed rather than
    propagated.
    """
    print("=== MinIO存储桶管理示例 ===")

    try:
        # Connect to a local server over plain HTTP.
        # NOTE(review): these look like default development credentials -
        # do not reuse them outside a local sandbox.
        client = Minio(
            endpoint="localhost:9000",
            access_key="minioadmin",
            secret_key="minioadmin",
            secure=False
        )

        manager = ComprehensiveBucketManager(client)

        # Settings consumed by create_production_bucket.
        production_config = {
            'region': 'us-east-1',
            'enable_versioning': True,
            'object_lock': False,
            'encryption': {
                'type': 'AES256'
            },
            'policy': {
                'type': 'authenticated_rw'
            }
        }

        # Step 1: create the bucket and apply the configuration.
        print("\n1. 创建生产环境存储桶...")
        bucket_name = "production-data-bucket"
        create_result = manager.create_production_bucket(bucket_name, production_config)

        print(f"创建结果: {create_result['success']}")
        if create_result['errors']:
            print(f"错误: {create_result['errors']}")

        # Step 2: print the summary; default=str stringifies values that
        # json cannot serialize natively.
        print("\n2. 获取存储桶摘要...")
        summary = manager.get_bucket_summary(bucket_name)
        print(json.dumps(summary, indent=2, ensure_ascii=False, default=str))

        print("\n✅ 存储桶管理示例完成!")

    except Exception as e:
        print(f"❌ 示例执行失败: {e}")


if __name__ == "__main__":
    bucket_management_example()

2.7 总结

本章详细介绍了MinIO存储桶的管理功能:

2.7.1 核心功能

  1. 存储桶基本操作:创建、删除、列表、验证
  2. 访问策略管理:公共访问、认证访问、时间限制
  3. 事件通知配置:队列通知、主题通知、过滤规则
  4. 版本控制管理:启用、暂停、版本列表
  5. 加密配置:服务端加密、KMS集成

2.7.2 最佳实践

  • 遵循存储桶命名规范
  • 合理设置访问策略
  • 启用版本控制保护数据
  • 配置适当的加密策略
  • 设置必要的事件通知

2.7.3 下一步学习

  • 对象操作和元数据管理
  • 预签名URL和临时访问
  • 多部分上传和大文件处理
  • 生命周期管理和自动化

下一章将介绍MinIO的对象操作和文件管理功能。