4.1 预签名URL概述

4.1.1 预签名URL的概念和优势

预签名URL是一种临时的、带有签名的URL,允许客户端在不提供访问凭证的情况下访问MinIO中的对象。这种机制提供了安全、灵活的访问控制方式。

主要优势: - 安全性:无需暴露访问密钥 - 时效性:可设置过期时间 - 灵活性:支持不同的HTTP方法 - 便利性:可直接在浏览器中使用

”`python from minio import Minio from minio.error import S3Error from datetime import datetime, timedelta from urllib.parse import urlparse, parse_qs import requests import json from typing import Dict, List, Any, Optional, Union

class PresignedURLManager: “”“预签名URL管理器”“”

def __init__(self, client: Minio):
    self.client = client

def generate_download_url(self, bucket_name: str, object_name: str,
                         expires: timedelta = timedelta(hours=1),
                         response_headers: Dict[str, str] = None,
                         version_id: str = None) -> Dict[str, Any]:
    """生成下载预签名URL"""
    try:
        # 检查对象是否存在
        try:
            self.client.stat_object(bucket_name, object_name, version_id=version_id)
        except S3Error as e:
            if e.code == 'NoSuchKey':
                return {
                    'success': False,
                    'error': f"对象不存在: {bucket_name}/{object_name}",
                    'bucket_name': bucket_name,
                    'object_name': object_name
                }
            raise

        # 生成预签名URL
        presigned_url = self.client.presigned_get_object(
            bucket_name=bucket_name,
            object_name=object_name,
            expires=expires,
            response_headers=response_headers,
            version_id=version_id
        )

        # 计算过期时间
        expires_at = datetime.now() + expires

        return {
            'success': True,
            'message': f"下载URL生成成功: {bucket_name}/{object_name}",
            'bucket_name': bucket_name,
            'object_name': object_name,
            'version_id': version_id,
            'presigned_url': presigned_url,
            'expires_in_seconds': int(expires.total_seconds()),
            'expires_at': expires_at.isoformat(),
            'response_headers': response_headers,
            'method': 'GET',
            'generated_at': datetime.now().isoformat()
        }

    except S3Error as e:
        return {
            'success': False,
            'error': f"生成下载URL失败: {e}",
            'bucket_name': bucket_name,
            'object_name': object_name
        }

def generate_upload_url(self, bucket_name: str, object_name: str,
                       expires: timedelta = timedelta(hours=1),
                       content_type: str = None,
                       content_length_range: tuple = None) -> Dict[str, Any]:
    """生成上传预签名URL"""
    try:
        # 准备上传策略
        policy = {}
        if content_type:
            policy['content-type'] = content_type
        if content_length_range:
            policy['content-length-range'] = content_length_range

        # 生成预签名URL
        presigned_url = self.client.presigned_put_object(
            bucket_name=bucket_name,
            object_name=object_name,
            expires=expires
        )

        # 计算过期时间
        expires_at = datetime.now() + expires

        return {
            'success': True,
            'message': f"上传URL生成成功: {bucket_name}/{object_name}",
            'bucket_name': bucket_name,
            'object_name': object_name,
            'presigned_url': presigned_url,
            'expires_in_seconds': int(expires.total_seconds()),
            'expires_at': expires_at.isoformat(),
            'content_type': content_type,
            'content_length_range': content_length_range,
            'method': 'PUT',
            'generated_at': datetime.now().isoformat()
        }

    except S3Error as e:
        return {
            'success': False,
            'error': f"生成上传URL失败: {e}",
            'bucket_name': bucket_name,
            'object_name': object_name
        }

def generate_post_policy(self, bucket_name: str, object_name_prefix: str = "",
                        expires: timedelta = timedelta(hours=1),
                        content_length_range: tuple = None,
                        content_type_starts_with: str = None) -> Dict[str, Any]:
    """生成POST表单上传策略"""
    try:
        from minio.policy import PostPolicy

        # 创建POST策略
        policy = PostPolicy()
        policy.set_bucket_name(bucket_name)

        # 设置对象名称前缀
        if object_name_prefix:
            policy.set_key_startswith(object_name_prefix)

        # 设置过期时间
        policy.set_expires(datetime.now() + expires)

        # 设置内容长度范围
        if content_length_range:
            policy.set_content_length_range(content_length_range[0], content_length_range[1])

        # 设置内容类型
        if content_type_starts_with:
            policy.set_content_type_startswith(content_type_starts_with)

        # 生成POST表单数据
        form_data = self.client.presigned_post_policy(policy)

        return {
            'success': True,
            'message': f"POST策略生成成功: {bucket_name}",
            'bucket_name': bucket_name,
            'object_name_prefix': object_name_prefix,
            'form_data': form_data,
            'expires_in_seconds': int(expires.total_seconds()),
            'expires_at': (datetime.now() + expires).isoformat(),
            'content_length_range': content_length_range,
            'content_type_starts_with': content_type_starts_with,
            'method': 'POST',
            'generated_at': datetime.now().isoformat()
        }

    except Exception as e:
        return {
            'success': False,
            'error': f"生成POST策略失败: {e}",
            'bucket_name': bucket_name
        }

def validate_url(self, presigned_url: str) -> Dict[str, Any]:
    """验证预签名URL"""
    try:
        # 解析URL
        parsed_url = urlparse(presigned_url)
        query_params = parse_qs(parsed_url.query)

        # 提取签名信息
        signature = query_params.get('X-Amz-Signature', [None])[0]
        expires = query_params.get('X-Amz-Expires', [None])[0]
        date = query_params.get('X-Amz-Date', [None])[0]

        # 验证URL格式
        is_valid_format = all([signature, expires, date])

        # 检查是否过期(基于URL中的时间信息)
        is_expired = False
        if date and expires:
            try:
                # 解析日期和过期时间
                url_date = datetime.strptime(date, '%Y%m%dT%H%M%SZ')
                expires_seconds = int(expires)
                expiry_time = url_date + timedelta(seconds=expires_seconds)
                is_expired = datetime.now() > expiry_time
            except:
                pass

        return {
            'success': True,
            'presigned_url': presigned_url,
            'is_valid_format': is_valid_format,
            'is_expired': is_expired,
            'signature': signature,
            'expires_in_seconds': expires,
            'date': date,
            'host': parsed_url.netloc,
            'path': parsed_url.path,
            'validated_at': datetime.now().isoformat()
        }

    except Exception as e:
        return {
            'success': False,
            'error': f"验证URL失败: {e}",
            'presigned_url': presigned_url
        }

4.2 安全访问控制

4.2.1 访问策略管理

”`python import json from minio.policy import Policy, Statement, Effect, Principal, Action, Resource, Condition

class AccessPolicyManager: “”“访问策略管理器”“”

def __init__(self, client: Minio):
    self.client = client

def create_read_only_policy(self, bucket_name: str, object_prefix: str = "*") -> str:
    """创建只读访问策略"""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{bucket_name}/{object_prefix}"]
            }
        ]
    }
    return json.dumps(policy, indent=2)

def create_read_write_policy(self, bucket_name: str, object_prefix: str = "*") -> str:
    """创建读写访问策略"""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [f"arn:aws:s3:::{bucket_name}/{object_prefix}"]
            },
            {
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket_name}"],
                "Condition": {
                    "StringLike": {
                        "s3:prefix": [object_prefix.rstrip('*')]
                    }
                }
            }
        ]
    }
    return json.dumps(policy, indent=2)

def create_time_limited_policy(self, bucket_name: str, object_prefix: str = "*",
                              start_time: datetime = None,
                              end_time: datetime = None) -> str:
    """创建时间限制访问策略"""
    if not start_time:
        start_time = datetime.now()
    if not end_time:
        end_time = start_time + timedelta(hours=24)

    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{bucket_name}/{object_prefix}"],
                "Condition": {
                    "DateGreaterThan": {
                        "aws:CurrentTime": start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
                    },
                    "DateLessThan": {
                        "aws:CurrentTime": end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
                    }
                }
            }
        ]
    }
    return json.dumps(policy, indent=2)

def create_ip_restricted_policy(self, bucket_name: str, allowed_ips: List[str],
                               object_prefix: str = "*") -> str:
    """创建IP限制访问策略"""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{bucket_name}/{object_prefix}"],
                "Condition": {
                    "IpAddress": {
                        "aws:SourceIp": allowed_ips
                    }
                }
            }
        ]
    }
    return json.dumps(policy, indent=2)

def set_bucket_policy(self, bucket_name: str, policy: str) -> Dict[str, Any]:
    """设置存储桶策略"""
    try:
        # 验证策略格式
        try:
            json.loads(policy)
        except json.JSONDecodeError as e:
            return {
                'success': False,
                'error': f"策略JSON格式无效: {e}",
                'bucket_name': bucket_name
            }

        # 设置策略
        self.client.set_bucket_policy(bucket_name, policy)

        return {
            'success': True,
            'message': f"存储桶策略设置成功: {bucket_name}",
            'bucket_name': bucket_name,
            'policy': policy,
            'set_at': datetime.now().isoformat()
        }

    except S3Error as e:
        return {
            'success': False,
            'error': f"设置存储桶策略失败: {e}",
            'bucket_name': bucket_name
        }

def get_bucket_policy(self, bucket_name: str) -> Dict[str, Any]:
    """获取存储桶策略"""
    try:
        policy = self.client.get_bucket_policy(bucket_name)

        # 解析策略
        try:
            policy_dict = json.loads(policy)
        except:
            policy_dict = None

        return {
            'success': True,
            'bucket_name': bucket_name,
            'policy': policy,
            'policy_dict': policy_dict,
            'retrieved_at': datetime.now().isoformat()
        }

    except S3Error as e:
        if e.code == 'NoSuchBucketPolicy':
            return {
                'success': True,
                'bucket_name': bucket_name,
                'policy': None,
                'message': "存储桶没有设置策略",
                'retrieved_at': datetime.now().isoformat()
            }
        return {
            'success': False,
            'error': f"获取存储桶策略失败: {e}",
            'bucket_name': bucket_name
        }

def delete_bucket_policy(self, bucket_name: str) -> Dict[str, Any]:
    """删除存储桶策略"""
    try:
        self.client.delete_bucket_policy(bucket_name)

        return {
            'success': True,
            'message': f"存储桶策略删除成功: {bucket_name}",
            'bucket_name': bucket_name,
            'deleted_at': datetime.now().isoformat()
        }

    except S3Error as e:
        return {
            'success': False,
            'error': f"删除存储桶策略失败: {e}",
            'bucket_name': bucket_name
        }

4.2.2 用户和组管理

”`python class UserAccessManager: “”“用户访问管理器”“”

def __init__(self, admin_client: Minio):
    self.admin_client = admin_client
    self.users = {}  # 模拟用户数据库
    self.groups = {}  # 模拟组数据库

def create_user_credentials(self, username: str, permissions: List[str],
                           expires: timedelta = timedelta(days=30)) -> Dict[str, Any]:
    """创建用户凭证"""
    try:
        import secrets
        import string

        # 生成访问密钥
        access_key = f"user_{username}_{secrets.token_hex(8)}"
        secret_key = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(32))

        # 计算过期时间
        expires_at = datetime.now() + expires

        # 存储用户信息
        user_info = {
            'username': username,
            'access_key': access_key,
            'secret_key': secret_key,
            'permissions': permissions,
            'created_at': datetime.now().isoformat(),
            'expires_at': expires_at.isoformat(),
            'is_active': True
        }

        self.users[username] = user_info

        return {
            'success': True,
            'message': f"用户凭证创建成功: {username}",
            'username': username,
            'access_key': access_key,
            'secret_key': secret_key,
            'permissions': permissions,
            'expires_at': expires_at.isoformat(),
            'created_at': datetime.now().isoformat()
        }

    except Exception as e:
        return {
            'success': False,
            'error': f"创建用户凭证失败: {e}",
            'username': username
        }

def create_temporary_access(self, bucket_name: str, object_prefix: str,
                           permissions: List[str],
                           expires: timedelta = timedelta(hours=1)) -> Dict[str, Any]:
    """创建临时访问凭证"""
    try:
        # 生成临时凭证
        temp_credentials = self.create_user_credentials(
            username=f"temp_{secrets.token_hex(4)}",
            permissions=permissions,
            expires=expires
        )

        if not temp_credentials['success']:
            return temp_credentials

        # 创建临时客户端
        temp_client = Minio(
            endpoint=self.admin_client._base_url.netloc,
            access_key=temp_credentials['access_key'],
            secret_key=temp_credentials['secret_key'],
            secure=self.admin_client._base_url.scheme == 'https'
        )

        # 生成预签名URL(如果需要)
        presigned_urls = {}
        if 'read' in permissions:
            url_manager = PresignedURLManager(temp_client)
            download_url = url_manager.generate_download_url(
                bucket_name=bucket_name,
                object_name=f"{object_prefix}*",
                expires=expires
            )
            if download_url['success']:
                presigned_urls['download'] = download_url['presigned_url']

        if 'write' in permissions:
            url_manager = PresignedURLManager(temp_client)
            upload_url = url_manager.generate_upload_url(
                bucket_name=bucket_name,
                object_name=f"{object_prefix}temp_upload",
                expires=expires
            )
            if upload_url['success']:
                presigned_urls['upload'] = upload_url['presigned_url']

        return {
            'success': True,
            'message': f"临时访问凭证创建成功",
            'bucket_name': bucket_name,
            'object_prefix': object_prefix,
            'permissions': permissions,
            'credentials': {
                'access_key': temp_credentials['access_key'],
                'secret_key': temp_credentials['secret_key']
            },
            'presigned_urls': presigned_urls,
            'expires_at': temp_credentials['expires_at'],
            'created_at': datetime.now().isoformat()
        }

    except Exception as e:
        return {
            'success': False,
            'error': f"创建临时访问失败: {e}",
            'bucket_name': bucket_name
        }

def validate_user_access(self, username: str, bucket_name: str,
                        object_name: str, action: str) -> Dict[str, Any]:
    """验证用户访问权限"""
    try:
        # 检查用户是否存在
        if username not in self.users:
            return {
                'success': False,
                'error': f"用户不存在: {username}",
                'username': username,
                'access_granted': False
            }

        user_info = self.users[username]

        # 检查用户是否激活
        if not user_info['is_active']:
            return {
                'success': False,
                'error': f"用户已被禁用: {username}",
                'username': username,
                'access_granted': False
            }

        # 检查是否过期
        expires_at = datetime.fromisoformat(user_info['expires_at'])
        if datetime.now() > expires_at:
            return {
                'success': False,
                'error': f"用户凭证已过期: {username}",
                'username': username,
                'expires_at': user_info['expires_at'],
                'access_granted': False
            }

        # 检查权限
        permissions = user_info['permissions']
        action_permission_map = {
            'read': ['read', 'read_write', 'admin'],
            'write': ['write', 'read_write', 'admin'],
            'delete': ['delete', 'admin'],
            'admin': ['admin']
        }

        required_permissions = action_permission_map.get(action, [])
        has_permission = any(perm in permissions for perm in required_permissions)

        return {
            'success': True,
            'username': username,
            'bucket_name': bucket_name,
            'object_name': object_name,
            'action': action,
            'access_granted': has_permission,
            'user_permissions': permissions,
            'required_permissions': required_permissions,
            'validated_at': datetime.now().isoformat()
        }

    except Exception as e:
        return {
            'success': False,
            'error': f"验证用户访问失败: {e}",
            'username': username
        }

def revoke_user_access(self, username: str) -> Dict[str, Any]:
    """撤销用户访问"""
    try:
        if username not in self.users:
            return {
                'success': False,
                'error': f"用户不存在: {username}",
                'username': username
            }

        # 禁用用户
        self.users[username]['is_active'] = False
        self.users[username]['revoked_at'] = datetime.now().isoformat()

        return {
            'success': True,
            'message': f"用户访问已撤销: {username}",
            'username': username,
            'revoked_at': datetime.now().isoformat()
        }

    except Exception as e:
        return {
            'success': False,
            'error': f"撤销用户访问失败: {e}",
            'username': username
        }

4.3 安全最佳实践

4.3.1 安全配置检查器

”`python class SecurityAuditor: “”“安全审计器”“”

def __init__(self, client: Minio):
    self.client = client
    self.policy_manager = AccessPolicyManager(client)

def audit_bucket_security(self, bucket_name: str) -> Dict[str, Any]:
    """审计存储桶安全配置"""
    audit_results = {
        'bucket_name': bucket_name,
        'audit_timestamp': datetime.now().isoformat(),
        'security_score': 0,
        'max_score': 100,
        'checks': [],
        'recommendations': []
    }

    try:
        # 检查1: 存储桶是否存在
        if not self.client.bucket_exists(bucket_name):
            audit_results['checks'].append({
                'check': 'bucket_exists',
                'status': 'fail',
                'message': '存储桶不存在',
                'score': 0
            })
            return audit_results

        audit_results['checks'].append({
            'check': 'bucket_exists',
            'status': 'pass',
            'message': '存储桶存在',
            'score': 10
        })
        audit_results['security_score'] += 10

        # 检查2: 存储桶策略
        policy_result = self.policy_manager.get_bucket_policy(bucket_name)
        if policy_result['success'] and policy_result['policy']:
            audit_results['checks'].append({
                'check': 'bucket_policy',
                'status': 'pass',
                'message': '存储桶已设置访问策略',
                'score': 20
            })
            audit_results['security_score'] += 20

            # 分析策略内容
            try:
                policy_dict = json.loads(policy_result['policy'])
                statements = policy_dict.get('Statement', [])

                # 检查是否有过于宽松的策略
                has_public_read = False
                has_public_write = False

                for statement in statements:
                    principal = statement.get('Principal', {})
                    actions = statement.get('Action', [])

                    if principal == '*' or principal.get('AWS') == '*':
                        if 's3:GetObject' in actions:
                            has_public_read = True
                        if any(action in actions for action in ['s3:PutObject', 's3:DeleteObject']):
                            has_public_write = True

                if has_public_write:
                    audit_results['checks'].append({
                        'check': 'public_write_access',
                        'status': 'warning',
                        'message': '存储桶允许公共写入访问',
                        'score': -10
                    })
                    audit_results['security_score'] -= 10
                    audit_results['recommendations'].append(
                        '限制公共写入访问,使用预签名URL或特定用户权限'
                    )

                if has_public_read:
                    audit_results['checks'].append({
                        'check': 'public_read_access',
                        'status': 'info',
                        'message': '存储桶允许公共读取访问',
                        'score': 0
                    })
                    audit_results['recommendations'].append(
                        '确认公共读取访问是否必要,考虑使用预签名URL'
                    )

            except:
                audit_results['checks'].append({
                    'check': 'policy_analysis',
                    'status': 'warning',
                    'message': '无法解析存储桶策略',
                    'score': -5
                })
                audit_results['security_score'] -= 5
        else:
            audit_results['checks'].append({
                'check': 'bucket_policy',
                'status': 'warning',
                'message': '存储桶未设置访问策略',
                'score': 0
            })
            audit_results['recommendations'].append(
                '设置适当的存储桶访问策略以控制访问权限'
            )

        # 检查3: 版本控制
        try:
            versioning = self.client.get_bucket_versioning(bucket_name)
            if versioning and versioning.status == 'Enabled':
                audit_results['checks'].append({
                    'check': 'versioning',
                    'status': 'pass',
                    'message': '版本控制已启用',
                    'score': 15
                })
                audit_results['security_score'] += 15
            else:
                audit_results['checks'].append({
                    'check': 'versioning',
                    'status': 'warning',
                    'message': '版本控制未启用',
                    'score': 0
                })
                audit_results['recommendations'].append(
                    '启用版本控制以防止意外删除和修改'
                )
        except:
            audit_results['checks'].append({
                'check': 'versioning',
                'status': 'unknown',
                'message': '无法检查版本控制状态',
                'score': 0
            })

        # 检查4: 加密配置
        try:
            encryption = self.client.get_bucket_encryption(bucket_name)
            if encryption:
                audit_results['checks'].append({
                    'check': 'encryption',
                    'status': 'pass',
                    'message': '存储桶已启用加密',
                    'score': 20
                })
                audit_results['security_score'] += 20
            else:
                audit_results['checks'].append({
                    'check': 'encryption',
                    'status': 'warning',
                    'message': '存储桶未启用加密',
                    'score': 0
                })
                audit_results['recommendations'].append(
                    '启用服务器端加密以保护数据安全'
                )
        except:
            audit_results['checks'].append({
                'check': 'encryption',
                'status': 'unknown',
                'message': '无法检查加密配置',
                'score': 0
            })

        # 检查5: 对象数量和大小
        try:
            from ..object_operations import ObjectLister
            lister = ObjectLister(self.client)
            objects_result = lister.list_objects(bucket_name, max_keys=1000)

            if objects_result['success']:
                object_count = objects_result['total_objects']
                total_size_mb = objects_result['total_size_mb']

                audit_results['checks'].append({
                    'check': 'object_inventory',
                    'status': 'info',
                    'message': f'存储桶包含 {object_count} 个对象,总大小 {total_size_mb} MB',
                    'score': 5
                })
                audit_results['security_score'] += 5

                # 检查是否有敏感文件名
                sensitive_patterns = ['password', 'secret', 'key', 'token', 'credential']
                sensitive_objects = []

                for obj in objects_result['objects']:
                    obj_name_lower = obj['object_name'].lower()
                    for pattern in sensitive_patterns:
                        if pattern in obj_name_lower:
                            sensitive_objects.append(obj['object_name'])
                            break

                if sensitive_objects:
                    audit_results['checks'].append({
                        'check': 'sensitive_object_names',
                        'status': 'warning',
                        'message': f'发现 {len(sensitive_objects)} 个可能包含敏感信息的对象名称',
                        'score': -5,
                        'details': sensitive_objects[:5]  # 只显示前5个
                    })
                    audit_results['security_score'] -= 5
                    audit_results['recommendations'].append(
                        '避免在对象名称中包含敏感信息,使用通用的命名规则'
                    )
        except:
            pass

        # 计算最终安全评级
        if audit_results['security_score'] >= 80:
            audit_results['security_level'] = 'excellent'
        elif audit_results['security_score'] >= 60:
            audit_results['security_level'] = 'good'
        elif audit_results['security_score'] >= 40:
            audit_results['security_level'] = 'fair'
        else:
            audit_results['security_level'] = 'poor'

        return audit_results

    except Exception as e:
        audit_results['error'] = f"安全审计失败: {e}"
        return audit_results

def generate_security_report(self, bucket_names: List[str]) -> Dict[str, Any]:
    """生成安全报告"""
    report = {
        'report_timestamp': datetime.now().isoformat(),
        'total_buckets': len(bucket_names),
        'bucket_audits': [],
        'summary': {
            'excellent': 0,
            'good': 0,
            'fair': 0,
            'poor': 0
        },
        'overall_recommendations': []
    }

    for bucket_name in bucket_names:
        audit_result = self.audit_bucket_security(bucket_name)
        report['bucket_audits'].append(audit_result)

        # 统计安全等级
        security_level = audit_result.get('security_level', 'unknown')
        if security_level in report['summary']:
            report['summary'][security_level] += 1

    # 生成总体建议
    if report['summary']['poor'] > 0:
        report['overall_recommendations'].append(
            f"{report['summary']['poor']} 个存储桶安全配置较差,需要立即改进"
        )

    if report['summary']['fair'] > 0:
        report['overall_recommendations'].append(
            f"{report['summary']['fair']} 个存储桶安全配置一般,建议优化"
        )

    # 计算总体安全评分
    total_score = sum(
        audit['security_score'] for audit in report['bucket_audits']
        if 'security_score' in audit
    )
    max_total_score = len(bucket_names) * 100
    report['overall_security_score'] = (total_score / max_total_score * 100) if max_total_score > 0 else 0

    return report

4.4 实际应用示例

4.4.1 安全文件分享系统

”`python class SecureFileSharing: “”“安全文件分享系统”“”

def __init__(self, client: Minio):
    self.client = client
    self.url_manager = PresignedURLManager(client)
    self.policy_manager = AccessPolicyManager(client)
    self.user_manager = UserAccessManager(client)
    self.auditor = SecurityAuditor(client)
    self.shares = {}  # 分享记录

def create_secure_share(self, bucket_name: str, object_name: str,
                       share_type: str = "download",
                       expires: timedelta = timedelta(hours=24),
                       password: str = None,
                       max_downloads: int = None,
                       allowed_ips: List[str] = None) -> Dict[str, Any]:
    """创建安全分享链接"""
    try:
        import secrets

        # 生成分享ID
        share_id = secrets.token_urlsafe(16)

        # 生成预签名URL
        if share_type == "download":
            url_result = self.url_manager.generate_download_url(
                bucket_name=bucket_name,
                object_name=object_name,
                expires=expires
            )
        elif share_type == "upload":
            url_result = self.url_manager.generate_upload_url(
                bucket_name=bucket_name,
                object_name=object_name,
                expires=expires
            )
        else:
            return {
                'success': False,
                'error': f"不支持的分享类型: {share_type}"
            }

        if not url_result['success']:
            return url_result

        # 创建分享记录
        share_info = {
            'share_id': share_id,
            'bucket_name': bucket_name,
            'object_name': object_name,
            'share_type': share_type,
            'presigned_url': url_result['presigned_url'],
            'expires_at': url_result['expires_at'],
            'password': password,
            'max_downloads': max_downloads,
            'download_count': 0,
            'allowed_ips': allowed_ips or [],
            'created_at': datetime.now().isoformat(),
            'is_active': True,
            'access_log': []
        }

        self.shares[share_id] = share_info

        # 生成安全分享URL
        secure_url = f"https://your-app.com/share/{share_id}"

        return {
            'success': True,
            'message': f"安全分享创建成功: {bucket_name}/{object_name}",
            'share_id': share_id,
            'secure_url': secure_url,
            'presigned_url': url_result['presigned_url'],
            'share_type': share_type,
            'expires_at': url_result['expires_at'],
            'has_password': password is not None,
            'max_downloads': max_downloads,
            'allowed_ips': allowed_ips,
            'created_at': datetime.now().isoformat()
        }

    except Exception as e:
        return {
            'success': False,
            'error': f"创建安全分享失败: {e}",
            'bucket_name': bucket_name,
            'object_name': object_name
        }

def access_share(self, share_id: str, password: str = None,
                client_ip: str = None) -> Dict[str, Any]:
    """访问分享链接"""
    try:
        # 检查分享是否存在
        if share_id not in self.shares:
            return {
                'success': False,
                'error': '分享链接不存在或已过期',
                'share_id': share_id
            }

        share_info = self.shares[share_id]

        # 检查分享是否激活
        if not share_info['is_active']:
            return {
                'success': False,
                'error': '分享链接已被禁用',
                'share_id': share_id
            }

        # 检查是否过期
        expires_at = datetime.fromisoformat(share_info['expires_at'])
        if datetime.now() > expires_at:
            share_info['is_active'] = False
            return {
                'success': False,
                'error': '分享链接已过期',
                'share_id': share_id,
                'expires_at': share_info['expires_at']
            }

        # 检查密码
        if share_info['password'] and password != share_info['password']:
            # 记录访问失败
            share_info['access_log'].append({
                'timestamp': datetime.now().isoformat(),
                'client_ip': client_ip,
                'status': 'password_failed',
                'message': '密码错误'
            })
            return {
                'success': False,
                'error': '密码错误',
                'share_id': share_id
            }

        # 检查IP限制
        if share_info['allowed_ips'] and client_ip not in share_info['allowed_ips']:
            share_info['access_log'].append({
                'timestamp': datetime.now().isoformat(),
                'client_ip': client_ip,
                'status': 'ip_blocked',
                'message': 'IP地址不在允许列表中'
            })
            return {
                'success': False,
                'error': 'IP地址不在允许列表中',
                'share_id': share_id
            }

        # 检查下载次数限制
        if (share_info['max_downloads'] and 
            share_info['download_count'] >= share_info['max_downloads']):
            share_info['is_active'] = False
            return {
                'success': False,
                'error': '已达到最大下载次数限制',
                'share_id': share_id,
                'max_downloads': share_info['max_downloads']
            }

        # 记录成功访问
        share_info['access_log'].append({
            'timestamp': datetime.now().isoformat(),
            'client_ip': client_ip,
            'status': 'success',
            'message': '访问成功'
        })

        # 增加下载计数
        if share_info['share_type'] == 'download':
            share_info['download_count'] += 1

        return {
            'success': True,
            'message': '分享访问成功',
            'share_id': share_id,
            'presigned_url': share_info['presigned_url'],
            'bucket_name': share_info['bucket_name'],
            'object_name': share_info['object_name'],
            'share_type': share_info['share_type'],
            'download_count': share_info['download_count'],
            'max_downloads': share_info['max_downloads'],
            'accessed_at': datetime.now().isoformat()
        }

    except Exception as e:
        return {
            'success': False,
            'error': f"访问分享失败: {e}",
            'share_id': share_id
        }

def revoke_share(self, share_id: str) -> Dict[str, Any]:
    """撤销分享"""
    try:
        if share_id not in self.shares:
            return {
                'success': False,
                'error': '分享不存在',
                'share_id': share_id
            }

        self.shares[share_id]['is_active'] = False
        self.shares[share_id]['revoked_at'] = datetime.now().isoformat()

        return {
            'success': True,
            'message': '分享已撤销',
            'share_id': share_id,
            'revoked_at': datetime.now().isoformat()
        }

    except Exception as e:
        return {
            'success': False,
            'error': f"撤销分享失败: {e}",
            'share_id': share_id
        }

def get_share_analytics(self, share_id: str) -> Dict[str, Any]:
    """获取分享分析数据"""
    try:
        if share_id not in self.shares:
            return {
                'success': False,
                'error': '分享不存在',
                'share_id': share_id
            }

        share_info = self.shares[share_id]
        access_log = share_info['access_log']

        # 统计访问数据
        total_accesses = len(access_log)
        successful_accesses = len([log for log in access_log if log['status'] == 'success'])
        failed_accesses = total_accesses - successful_accesses

        # 统计IP访问
        ip_stats = {}
        for log in access_log:
            ip = log.get('client_ip', 'unknown')
            if ip not in ip_stats:
                ip_stats[ip] = {'total': 0, 'success': 0, 'failed': 0}
            ip_stats[ip]['total'] += 1
            if log['status'] == 'success':
                ip_stats[ip]['success'] += 1
            else:
                ip_stats[ip]['failed'] += 1

        return {
            'success': True,
            'share_id': share_id,
            'bucket_name': share_info['bucket_name'],
            'object_name': share_info['object_name'],
            'share_type': share_info['share_type'],
            'created_at': share_info['created_at'],
            'expires_at': share_info['expires_at'],
            'is_active': share_info['is_active'],
            'statistics': {
                'total_accesses': total_accesses,
                'successful_accesses': successful_accesses,
                'failed_accesses': failed_accesses,
                'success_rate': (successful_accesses / total_accesses * 100) if total_accesses > 0 else 0,
                'download_count': share_info['download_count'],
                'max_downloads': share_info['max_downloads'],
                'ip_statistics': ip_stats
            },
            'access_log': access_log,
            'analyzed_at': datetime.now().isoformat()
        }

    except Exception as e:
        return {
            'success': False,
            'error': f"获取分享分析失败: {e}",
            'share_id': share_id
        }

使用示例

def security_example(): “”“安全访问示例”“”

print("=== MinIO安全访问示例 ===")

try:
    # 连接MinIO
    client = Minio(
        endpoint="localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        secure=False
    )

    bucket_name = "security-demo"

    # 创建存储桶
    if not client.bucket_exists(bucket_name):
        client.make_bucket(bucket_name)
        print(f"✅ 创建存储桶: {bucket_name}")

    # 创建安全文件分享系统
    sharing_system = SecureFileSharing(client)

    # 1. 上传测试文件
    print("\n1. 上传测试文件...")
    test_content = "这是一个需要安全分享的文件\n" + datetime.now().isoformat()
    client.put_object(
        bucket_name=bucket_name,
        object_name="secure/test_document.txt",
        data=io.BytesIO(test_content.encode()),
        length=len(test_content.encode()),
        content_type="text/plain"
    )

    # 2. 创建安全分享
    print("\n2. 创建安全分享...")
    share_result = sharing_system.create_secure_share(
        bucket_name=bucket_name,
        object_name="secure/test_document.txt",
        share_type="download",
        expires=timedelta(hours=1),
        password="secure123",
        max_downloads=5,
        allowed_ips=["127.0.0.1", "192.168.1.100"]
    )

    if share_result['success']:
        print(f"分享ID: {share_result['share_id']}")
        print(f"安全URL: {share_result['secure_url']}")
        print(f"过期时间: {share_result['expires_at']}")

    # 3. 模拟访问分享
    print("\n3. 模拟访问分享...")

    # 错误密码访问
    access_result = sharing_system.access_share(
        share_id=share_result['share_id'],
        password="wrong_password",
        client_ip="127.0.0.1"
    )
    print(f"错误密码访问: {access_result['success']}")

    # 正确密码访问
    access_result = sharing_system.access_share(
        share_id=share_result['share_id'],
        password="secure123",
        client_ip="127.0.0.1"
    )
    print(f"正确密码访问: {access_result['success']}")

    # 4. 安全审计
    print("\n4. 执行安全审计...")
    auditor = SecurityAuditor(client)
    audit_result = auditor.audit_bucket_security(bucket_name)

    print(f"安全评分: {audit_result['security_score']}/100")
    print(f"安全等级: {audit_result['security_level']}")
    print(f"检查项目: {len(audit_result['checks'])}")
    print(f"建议数量: {len(audit_result['recommendations'])}")

    # 5. 获取分享分析
    print("\n5. 获取分享分析...")
    analytics_result = sharing_system.get_share_analytics(share_result['share_id'])

    if analytics_result['success']:
        stats = analytics_result['statistics']
        print(f"总访问次数: {stats['total_accesses']}")
        print(f"成功访问: {stats['successful_accesses']}")
        print(f"失败访问: {stats['failed_accesses']}")
        print(f"成功率: {stats['success_rate']:.1f}%")

    print("\n✅ 安全访问示例完成!")

except Exception as e:
    print(f"❌ 示例执行失败: {e}")

if name == “main”: security_example()

4.5 总结

本章详细介绍了MinIO的预签名URL和安全访问控制:

4.5.1 核心功能

  1. 预签名URL:下载、上传、POST策略生成
  2. 访问策略:只读、读写、时间限制、IP限制策略
  3. 用户管理:临时凭证、权限验证、访问撤销
  4. 安全审计:配置检查、安全评分、报告生成
  5. 安全分享:密码保护、访问限制、分析统计

4.5.2 安全最佳实践

  • 设置合理的URL过期时间
  • 使用最小权限原则
  • 定期审计安全配置
  • 监控访问日志
  • 实施多层安全防护

4.5.3 下一步学习

  • 多部分上传和大文件处理
  • 对象生命周期管理
  • 跨区域复制和灾备
  • 性能优化和监控

下一章将介绍MinIO的多部分上传和大文件处理技术。