8.1 用户与权限管理

8.1.1 用户管理

import hashlib
import secrets
import logging
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Set
from datetime import datetime, timedelta

class UserRole(Enum):
    """用户角色"""
    ADMINISTRATOR = "administrator"  # 管理员
    MONITORING = "monitoring"        # 监控
    POLICYMAKER = "policymaker"      # 策略制定者
    MANAGEMENT = "management"        # 管理
    IMPERSONATOR = "impersonator"    # 模拟者
    NONE = "none"                    # 无角色

@dataclass
class UserInfo:
    """用户信息"""
    username: str
    password_hash: str
    tags: List[UserRole]
    created_at: datetime
    last_login: Optional[datetime] = None
    is_active: bool = True
    password_expires_at: Optional[datetime] = None
    failed_login_attempts: int = 0
    locked_until: Optional[datetime] = None

@dataclass
class Permission:
    """权限信息"""
    vhost: str
    configure: str  # 配置权限正则表达式
    write: str      # 写权限正则表达式
    read: str       # 读权限正则表达式

class UserManager:
    """用户管理器"""
    
    def __init__(self, management_api):
        self.management_api = management_api
        self.logger = logging.getLogger(__name__)
        
        # 用户存储(实际应该使用数据库)
        self.users: Dict[str, UserInfo] = {}
        self.user_permissions: Dict[str, List[Permission]] = {}
        
        # 安全配置
        self.password_policy = {
            'min_length': 8,
            'require_uppercase': True,
            'require_lowercase': True,
            'require_digits': True,
            'require_special_chars': True,
            'password_history_size': 5,
            'password_expiry_days': 90,
            'max_failed_attempts': 5,
            'lockout_duration_minutes': 30
        }
        
        self.password_history: Dict[str, List[str]] = {}
    
    def create_user(self, username: str, password: str, 
                   tags: List[UserRole] = None) -> bool:
        """创建用户"""
        try:
            if username in self.users:
                self.logger.error(f"用户已存在: {username}")
                return False
            
            # 验证密码策略
            if not self._validate_password(password):
                return False
            
            # 生成密码哈希
            password_hash = self._hash_password(password)
            
            # 设置密码过期时间
            password_expires_at = None
            if self.password_policy['password_expiry_days'] > 0:
                password_expires_at = datetime.now() + timedelta(
                    days=self.password_policy['password_expiry_days']
                )
            
            # 创建用户信息
            user_info = UserInfo(
                username=username,
                password_hash=password_hash,
                tags=tags or [UserRole.NONE],
                created_at=datetime.now(),
                password_expires_at=password_expires_at
            )
            
            self.users[username] = user_info
            self.user_permissions[username] = []
            self.password_history[username] = [password_hash]
            
            # 在RabbitMQ中创建用户
            self._create_rabbitmq_user(username, password, tags or [])
            
            self.logger.info(f"用户创建成功: {username}")
            return True
            
        except Exception as e:
            self.logger.error(f"创建用户失败: {e}")
            return False
    
    def _validate_password(self, password: str) -> bool:
        """验证密码策略"""
        policy = self.password_policy
        
        # 检查长度
        if len(password) < policy['min_length']:
            self.logger.error(f"密码长度不足,至少需要{policy['min_length']}个字符")
            return False
        
        # 检查大写字母
        if policy['require_uppercase'] and not any(c.isupper() for c in password):
            self.logger.error("密码必须包含大写字母")
            return False
        
        # 检查小写字母
        if policy['require_lowercase'] and not any(c.islower() for c in password):
            self.logger.error("密码必须包含小写字母")
            return False
        
        # 检查数字
        if policy['require_digits'] and not any(c.isdigit() for c in password):
            self.logger.error("密码必须包含数字")
            return False
        
        # 检查特殊字符
        if policy['require_special_chars']:
            special_chars = "!@#$%^&*()_+-=[]{}|;:,.<>?"
            if not any(c in special_chars for c in password):
                self.logger.error("密码必须包含特殊字符")
                return False
        
        return True
    
    def _hash_password(self, password: str) -> str:
        """生成密码哈希"""
        salt = secrets.token_hex(16)
        password_hash = hashlib.pbkdf2_hmac('sha256', 
                                          password.encode('utf-8'), 
                                          salt.encode('utf-8'), 
                                          100000)
        return salt + password_hash.hex()
    
    def _verify_password(self, password: str, password_hash: str) -> bool:
        """验证密码"""
        try:
            salt = password_hash[:32]
            stored_hash = password_hash[32:]
            
            computed_hash = hashlib.pbkdf2_hmac('sha256',
                                              password.encode('utf-8'),
                                              salt.encode('utf-8'),
                                              100000)
            
            return computed_hash.hex() == stored_hash
        except Exception:
            return False
    
    def _create_rabbitmq_user(self, username: str, password: str, tags: List[UserRole]):
        """在RabbitMQ中创建用户"""
        try:
            # 转换标签
            tag_strings = [tag.value for tag in tags if tag != UserRole.NONE]
            
            # 使用管理API创建用户
            user_data = {
                'password': password,
                'tags': ','.join(tag_strings)
            }
            
            self.management_api.create_user(username, user_data)
            
        except Exception as e:
            self.logger.error(f"在RabbitMQ中创建用户失败: {e}")
            raise
    
    def authenticate_user(self, username: str, password: str) -> bool:
        """用户认证"""
        try:
            if username not in self.users:
                self.logger.warning(f"用户不存在: {username}")
                return False
            
            user_info = self.users[username]
            
            # 检查用户是否被锁定
            if user_info.locked_until and datetime.now() < user_info.locked_until:
                self.logger.warning(f"用户被锁定: {username}")
                return False
            
            # 检查用户是否激活
            if not user_info.is_active:
                self.logger.warning(f"用户未激活: {username}")
                return False
            
            # 检查密码是否过期
            if (user_info.password_expires_at and 
                datetime.now() > user_info.password_expires_at):
                self.logger.warning(f"用户密码已过期: {username}")
                return False
            
            # 验证密码
            if self._verify_password(password, user_info.password_hash):
                # 认证成功,重置失败次数
                user_info.failed_login_attempts = 0
                user_info.last_login = datetime.now()
                user_info.locked_until = None
                
                self.logger.info(f"用户认证成功: {username}")
                return True
            else:
                # 认证失败,增加失败次数
                user_info.failed_login_attempts += 1
                
                # 检查是否需要锁定用户
                if (user_info.failed_login_attempts >= 
                    self.password_policy['max_failed_attempts']):
                    
                    lockout_duration = timedelta(
                        minutes=self.password_policy['lockout_duration_minutes']
                    )
                    user_info.locked_until = datetime.now() + lockout_duration
                    
                    self.logger.warning(f"用户因多次失败登录被锁定: {username}")
                
                self.logger.warning(f"用户认证失败: {username}")
                return False
                
        except Exception as e:
            self.logger.error(f"用户认证异常: {e}")
            return False
    
    def change_password(self, username: str, old_password: str, 
                       new_password: str) -> bool:
        """修改密码"""
        try:
            if username not in self.users:
                self.logger.error(f"用户不存在: {username}")
                return False
            
            user_info = self.users[username]
            
            # 验证旧密码
            if not self._verify_password(old_password, user_info.password_hash):
                self.logger.error("旧密码验证失败")
                return False
            
            # 验证新密码策略
            if not self._validate_password(new_password):
                return False
            
            # 检查密码历史
            new_password_hash = self._hash_password(new_password)
            history = self.password_history.get(username, [])
            
            for old_hash in history[-self.password_policy['password_history_size']:]:
                if self._verify_password(new_password, old_hash):
                    self.logger.error("新密码不能与历史密码相同")
                    return False
            
            # 更新密码
            user_info.password_hash = new_password_hash
            
            # 更新密码过期时间
            if self.password_policy['password_expiry_days'] > 0:
                user_info.password_expires_at = datetime.now() + timedelta(
                    days=self.password_policy['password_expiry_days']
                )
            
            # 更新密码历史
            history.append(new_password_hash)
            if len(history) > self.password_policy['password_history_size']:
                history.pop(0)
            
            # 在RabbitMQ中更新密码
            self._update_rabbitmq_password(username, new_password)
            
            self.logger.info(f"密码修改成功: {username}")
            return True
            
        except Exception as e:
            self.logger.error(f"修改密码失败: {e}")
            return False
    
    def _update_rabbitmq_password(self, username: str, new_password: str):
        """在RabbitMQ中更新密码"""
        try:
            user_data = {'password': new_password}
            self.management_api.update_user(username, user_data)
        except Exception as e:
            self.logger.error(f"在RabbitMQ中更新密码失败: {e}")
            raise
    
    def set_user_permissions(self, username: str, vhost: str, 
                           configure: str = "", write: str = "", 
                           read: str = "") -> bool:
        """设置用户权限"""
        try:
            if username not in self.users:
                self.logger.error(f"用户不存在: {username}")
                return False
            
            # 创建权限对象
            permission = Permission(
                vhost=vhost,
                configure=configure,
                write=write,
                read=read
            )
            
            # 更新本地权限
            if username not in self.user_permissions:
                self.user_permissions[username] = []
            
            # 移除同一vhost的旧权限
            self.user_permissions[username] = [
                p for p in self.user_permissions[username] if p.vhost != vhost
            ]
            
            # 添加新权限
            self.user_permissions[username].append(permission)
            
            # 在RabbitMQ中设置权限
            self._set_rabbitmq_permissions(username, vhost, configure, write, read)
            
            self.logger.info(f"用户权限设置成功: {username}@{vhost}")
            return True
            
        except Exception as e:
            self.logger.error(f"设置用户权限失败: {e}")
            return False
    
    def _set_rabbitmq_permissions(self, username: str, vhost: str,
                                configure: str, write: str, read: str):
        """在RabbitMQ中设置权限"""
        try:
            permission_data = {
                'configure': configure,
                'write': write,
                'read': read
            }
            
            self.management_api.set_permissions(vhost, username, permission_data)
            
        except Exception as e:
            self.logger.error(f"在RabbitMQ中设置权限失败: {e}")
            raise
    
    def get_user_permissions(self, username: str) -> List[Permission]:
        """获取用户权限"""
        return self.user_permissions.get(username, [])
    
    def delete_user(self, username: str) -> bool:
        """删除用户"""
        try:
            if username not in self.users:
                self.logger.error(f"用户不存在: {username}")
                return False
            
            # 删除本地用户信息
            del self.users[username]
            if username in self.user_permissions:
                del self.user_permissions[username]
            if username in self.password_history:
                del self.password_history[username]
            
            # 在RabbitMQ中删除用户
            self.management_api.delete_user(username)
            
            self.logger.info(f"用户删除成功: {username}")
            return True
            
        except Exception as e:
            self.logger.error(f"删除用户失败: {e}")
            return False
    
    def list_users(self) -> List[Dict[str, Any]]:
        """列出所有用户"""
        users = []
        for username, user_info in self.users.items():
            users.append({
                'username': username,
                'tags': [tag.value for tag in user_info.tags],
                'created_at': user_info.created_at.isoformat(),
                'last_login': user_info.last_login.isoformat() if user_info.last_login else None,
                'is_active': user_info.is_active,
                'password_expires_at': user_info.password_expires_at.isoformat() if user_info.password_expires_at else None,
                'failed_login_attempts': user_info.failed_login_attempts,
                'is_locked': user_info.locked_until and datetime.now() < user_info.locked_until
            })
        return users
    
    def unlock_user(self, username: str) -> bool:
        """解锁用户"""
        try:
            if username not in self.users:
                self.logger.error(f"用户不存在: {username}")
                return False
            
            user_info = self.users[username]
            user_info.locked_until = None
            user_info.failed_login_attempts = 0
            
            self.logger.info(f"用户解锁成功: {username}")
            return True
            
        except Exception as e:
            self.logger.error(f"解锁用户失败: {e}")
            return False
    
    def get_user_security_status(self, username: str) -> Dict[str, Any]:
        """获取用户安全状态"""
        if username not in self.users:
            return {}
        
        user_info = self.users[username]
        
        return {
            'username': username,
            'is_active': user_info.is_active,
            'password_expires_at': user_info.password_expires_at.isoformat() if user_info.password_expires_at else None,
            'password_expired': user_info.password_expires_at and datetime.now() > user_info.password_expires_at,
            'failed_login_attempts': user_info.failed_login_attempts,
            'is_locked': user_info.locked_until and datetime.now() < user_info.locked_until,
            'locked_until': user_info.locked_until.isoformat() if user_info.locked_until else None,
            'last_login': user_info.last_login.isoformat() if user_info.last_login else None,
            'created_at': user_info.created_at.isoformat(),
            'tags': [tag.value for tag in user_info.tags]
        }

8.2 虚拟主机与资源隔离

8.2.1 虚拟主机管理

@dataclass
class VHostInfo:
    """虚拟主机信息"""
    name: str
    description: str
    created_at: datetime
    is_active: bool = True
    max_connections: Optional[int] = None
    max_queues: Optional[int] = None
    max_exchanges: Optional[int] = None
    tags: List[str] = None

@dataclass
class ResourceQuota:
    """资源配额"""
    max_connections: Optional[int] = None
    max_queues: Optional[int] = None
    max_exchanges: Optional[int] = None
    max_bindings: Optional[int] = None
    max_channels_per_connection: Optional[int] = None
    max_message_size: Optional[int] = None  # 字节
    max_queue_length: Optional[int] = None
    max_memory_usage: Optional[int] = None  # 字节

class VHostManager:
    """虚拟主机管理器"""
    
    def __init__(self, management_api):
        self.management_api = management_api
        self.logger = logging.getLogger(__name__)
        
        # 虚拟主机存储
        self.vhosts: Dict[str, VHostInfo] = {}
        self.vhost_quotas: Dict[str, ResourceQuota] = {}
        self.vhost_usage: Dict[str, Dict[str, int]] = {}
    
    def create_vhost(self, name: str, description: str = "", 
                    quota: ResourceQuota = None, tags: List[str] = None) -> bool:
        """创建虚拟主机"""
        try:
            if name in self.vhosts:
                self.logger.error(f"虚拟主机已存在: {name}")
                return False
            
            # 创建虚拟主机信息
            vhost_info = VHostInfo(
                name=name,
                description=description,
                created_at=datetime.now(),
                tags=tags or []
            )
            
            self.vhosts[name] = vhost_info
            
            # 设置资源配额
            if quota:
                self.vhost_quotas[name] = quota
            
            # 初始化使用统计
            self.vhost_usage[name] = {
                'connections': 0,
                'queues': 0,
                'exchanges': 0,
                'bindings': 0,
                'channels': 0,
                'memory_usage': 0
            }
            
            # 在RabbitMQ中创建虚拟主机
            self._create_rabbitmq_vhost(name, description, tags or [])
            
            self.logger.info(f"虚拟主机创建成功: {name}")
            return True
            
        except Exception as e:
            self.logger.error(f"创建虚拟主机失败: {e}")
            return False
    
    def _create_rabbitmq_vhost(self, name: str, description: str, tags: List[str]):
        """在RabbitMQ中创建虚拟主机"""
        try:
            vhost_data = {
                'description': description,
                'tags': ','.join(tags)
            }
            
            self.management_api.create_vhost(name, vhost_data)
            
        except Exception as e:
            self.logger.error(f"在RabbitMQ中创建虚拟主机失败: {e}")
            raise
    
    def delete_vhost(self, name: str) -> bool:
        """删除虚拟主机"""
        try:
            if name not in self.vhosts:
                self.logger.error(f"虚拟主机不存在: {name}")
                return False
            
            # 删除本地信息
            del self.vhosts[name]
            if name in self.vhost_quotas:
                del self.vhost_quotas[name]
            if name in self.vhost_usage:
                del self.vhost_usage[name]
            
            # 在RabbitMQ中删除虚拟主机
            self.management_api.delete_vhost(name)
            
            self.logger.info(f"虚拟主机删除成功: {name}")
            return True
            
        except Exception as e:
            self.logger.error(f"删除虚拟主机失败: {e}")
            return False
    
    def set_vhost_quota(self, vhost_name: str, quota: ResourceQuota) -> bool:
        """设置虚拟主机资源配额"""
        try:
            if vhost_name not in self.vhosts:
                self.logger.error(f"虚拟主机不存在: {vhost_name}")
                return False
            
            self.vhost_quotas[vhost_name] = quota
            
            # 在RabbitMQ中设置配额(如果支持)
            self._set_rabbitmq_quota(vhost_name, quota)
            
            self.logger.info(f"虚拟主机配额设置成功: {vhost_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"设置虚拟主机配额失败: {e}")
            return False
    
    def _set_rabbitmq_quota(self, vhost_name: str, quota: ResourceQuota):
        """在RabbitMQ中设置配额"""
        try:
            # RabbitMQ的配额设置通常通过策略实现
            if quota.max_queue_length:
                policy_data = {
                    'pattern': '.*',
                    'definition': {
                        'max-length': quota.max_queue_length
                    },
                    'priority': 1,
                    'apply-to': 'queues'
                }
                
                self.management_api.create_policy(
                    vhost_name, 'max-length-policy', policy_data
                )
            
            if quota.max_message_size:
                policy_data = {
                    'pattern': '.*',
                    'definition': {
                        'max-length-bytes': quota.max_message_size
                    },
                    'priority': 1,
                    'apply-to': 'queues'
                }
                
                self.management_api.create_policy(
                    vhost_name, 'max-size-policy', policy_data
                )
                
        except Exception as e:
            self.logger.error(f"在RabbitMQ中设置配额失败: {e}")
    
    def check_resource_quota(self, vhost_name: str, resource_type: str, 
                           increment: int = 1) -> bool:
        """检查资源配额"""
        try:
            if vhost_name not in self.vhost_quotas:
                return True  # 没有配额限制
            
            quota = self.vhost_quotas[vhost_name]
            usage = self.vhost_usage.get(vhost_name, {})
            
            current_usage = usage.get(resource_type, 0)
            
            # 检查各种资源类型的配额
            if resource_type == 'connections' and quota.max_connections:
                return current_usage + increment <= quota.max_connections
            
            elif resource_type == 'queues' and quota.max_queues:
                return current_usage + increment <= quota.max_queues
            
            elif resource_type == 'exchanges' and quota.max_exchanges:
                return current_usage + increment <= quota.max_exchanges
            
            elif resource_type == 'bindings' and quota.max_bindings:
                return current_usage + increment <= quota.max_bindings
            
            elif resource_type == 'memory_usage' and quota.max_memory_usage:
                return current_usage + increment <= quota.max_memory_usage
            
            return True
            
        except Exception as e:
            self.logger.error(f"检查资源配额失败: {e}")
            return False
    
    def update_resource_usage(self, vhost_name: str, resource_type: str, 
                            change: int):
        """更新资源使用量"""
        try:
            if vhost_name not in self.vhost_usage:
                self.vhost_usage[vhost_name] = {
                    'connections': 0,
                    'queues': 0,
                    'exchanges': 0,
                    'bindings': 0,
                    'channels': 0,
                    'memory_usage': 0
                }
            
            current = self.vhost_usage[vhost_name].get(resource_type, 0)
            self.vhost_usage[vhost_name][resource_type] = max(0, current + change)
            
        except Exception as e:
            self.logger.error(f"更新资源使用量失败: {e}")
    
    def get_vhost_info(self, vhost_name: str) -> Optional[Dict[str, Any]]:
        """获取虚拟主机信息"""
        if vhost_name not in self.vhosts:
            return None
        
        vhost_info = self.vhosts[vhost_name]
        quota = self.vhost_quotas.get(vhost_name)
        usage = self.vhost_usage.get(vhost_name, {})
        
        return {
            'name': vhost_info.name,
            'description': vhost_info.description,
            'created_at': vhost_info.created_at.isoformat(),
            'is_active': vhost_info.is_active,
            'tags': vhost_info.tags,
            'quota': {
                'max_connections': quota.max_connections if quota else None,
                'max_queues': quota.max_queues if quota else None,
                'max_exchanges': quota.max_exchanges if quota else None,
                'max_bindings': quota.max_bindings if quota else None,
                'max_channels_per_connection': quota.max_channels_per_connection if quota else None,
                'max_message_size': quota.max_message_size if quota else None,
                'max_queue_length': quota.max_queue_length if quota else None,
                'max_memory_usage': quota.max_memory_usage if quota else None
            } if quota else None,
            'usage': usage
        }
    
    def list_vhosts(self) -> List[Dict[str, Any]]:
        """列出所有虚拟主机"""
        vhosts = []
        for vhost_name in self.vhosts:
            vhost_info = self.get_vhost_info(vhost_name)
            if vhost_info:
                vhosts.append(vhost_info)
        return vhosts
    
    def get_resource_utilization(self, vhost_name: str) -> Dict[str, float]:
        """获取资源利用率"""
        if vhost_name not in self.vhosts:
            return {}
        
        quota = self.vhost_quotas.get(vhost_name)
        usage = self.vhost_usage.get(vhost_name, {})
        
        if not quota:
            return {}
        
        utilization = {}
        
        if quota.max_connections and quota.max_connections > 0:
            utilization['connections'] = usage.get('connections', 0) / quota.max_connections
        
        if quota.max_queues and quota.max_queues > 0:
            utilization['queues'] = usage.get('queues', 0) / quota.max_queues
        
        if quota.max_exchanges and quota.max_exchanges > 0:
            utilization['exchanges'] = usage.get('exchanges', 0) / quota.max_exchanges
        
        if quota.max_bindings and quota.max_bindings > 0:
            utilization['bindings'] = usage.get('bindings', 0) / quota.max_bindings
        
        if quota.max_memory_usage and quota.max_memory_usage > 0:
            utilization['memory'] = usage.get('memory_usage', 0) / quota.max_memory_usage
        
        return utilization

8.3 SSL/TLS加密配置

8.3.1 SSL证书管理

import ssl
import socket
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from pathlib import Path

@dataclass
class SSLConfig:
    """SSL配置"""
    cert_file: str
    key_file: str
    ca_file: Optional[str] = None
    verify_mode: str = "CERT_REQUIRED"  # CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
    ciphers: Optional[str] = None
    protocol: str = "TLSv1_2"  # TLSv1_2, TLSv1_3
    check_hostname: bool = True

@dataclass
class CertificateInfo:
    """证书信息"""
    subject: str
    issuer: str
    serial_number: str
    not_before: datetime
    not_after: datetime
    fingerprint: str
    is_ca: bool
    key_usage: List[str]
    san_dns: List[str]  # Subject Alternative Names
    san_ip: List[str]

class SSLManager:
    """SSL管理器"""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.certificates: Dict[str, CertificateInfo] = {}
    
    def generate_self_signed_cert(self, hostname: str, 
                                 cert_file: str, key_file: str,
                                 validity_days: int = 365) -> bool:
        """生成自签名证书"""
        try:
            # 生成私钥
            private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048
            )
            
            # 创建证书主题
            subject = issuer = x509.Name([
                x509.NameAttribute(NameOID.COUNTRY_NAME, "CN"),
                x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Beijing"),
                x509.NameAttribute(NameOID.LOCALITY_NAME, "Beijing"),
                x509.NameAttribute(NameOID.ORGANIZATION_NAME, "RabbitMQ"),
                x509.NameAttribute(NameOID.COMMON_NAME, hostname),
            ])
            
            # 创建证书
            cert = x509.CertificateBuilder().subject_name(
                subject
            ).issuer_name(
                issuer
            ).public_key(
                private_key.public_key()
            ).serial_number(
                x509.random_serial_number()
            ).not_valid_before(
                datetime.now()
            ).not_valid_after(
                datetime.now() + timedelta(days=validity_days)
            ).add_extension(
                x509.SubjectAlternativeName([
                    x509.DNSName(hostname),
                    x509.DNSName("localhost"),
                    x509.IPAddress(socket.inet_aton("127.0.0.1"))
                ]),
                critical=False,
            ).add_extension(
                x509.KeyUsage(
                    digital_signature=True,
                    key_encipherment=True,
                    key_agreement=False,
                    key_cert_sign=False,
                    crl_sign=False,
                    content_commitment=False,
                    data_encipherment=False,
                    encipher_only=False,
                    decipher_only=False
                ),
                critical=True,
            ).add_extension(
                x509.ExtendedKeyUsage([
                    x509.oid.ExtendedKeyUsageOID.SERVER_AUTH,
                    x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH
                ]),
                critical=True,
            ).sign(private_key, hashes.SHA256())
            
            # 保存证书
            with open(cert_file, "wb") as f:
                f.write(cert.public_bytes(serialization.Encoding.PEM))
            
            # 保存私钥
            with open(key_file, "wb") as f:
                f.write(private_key.private_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PrivateFormat.PKCS8,
                    encryption_algorithm=serialization.NoEncryption()
                ))
            
            # 设置文件权限
            Path(cert_file).chmod(0o644)
            Path(key_file).chmod(0o600)
            
            self.logger.info(f"自签名证书生成成功: {cert_file}")
            return True
            
        except Exception as e:
            self.logger.error(f"生成自签名证书失败: {e}")
            return False
    
    def load_certificate(self, cert_file: str) -> Optional[CertificateInfo]:
        """加载证书信息"""
        try:
            with open(cert_file, 'rb') as f:
                cert_data = f.read()
            
            cert = x509.load_pem_x509_certificate(cert_data)
            
            # 提取证书信息
            subject = cert.subject.rfc4514_string()
            issuer = cert.issuer.rfc4514_string()
            serial_number = str(cert.serial_number)
            not_before = cert.not_valid_before
            not_after = cert.not_valid_after
            
            # 计算指纹
            fingerprint = cert.fingerprint(hashes.SHA256()).hex()
            
            # 检查是否为CA证书
            is_ca = False
            try:
                basic_constraints = cert.extensions.get_extension_for_oid(
                    x509.oid.ExtensionOID.BASIC_CONSTRAINTS
                ).value
                is_ca = basic_constraints.ca
            except x509.ExtensionNotFound:
                pass
            
            # 获取密钥用途
            key_usage = []
            try:
                ku = cert.extensions.get_extension_for_oid(
                    x509.oid.ExtensionOID.KEY_USAGE
                ).value
                
                if ku.digital_signature:
                    key_usage.append("digital_signature")
                if ku.key_encipherment:
                    key_usage.append("key_encipherment")
                if ku.key_cert_sign:
                    key_usage.append("key_cert_sign")
                if ku.crl_sign:
                    key_usage.append("crl_sign")
            except x509.ExtensionNotFound:
                pass
            
            # 获取SAN
            san_dns = []
            san_ip = []
            try:
                san = cert.extensions.get_extension_for_oid(
                    x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME
                ).value
                
                for name in san:
                    if isinstance(name, x509.DNSName):
                        san_dns.append(name.value)
                    elif isinstance(name, x509.IPAddress):
                        san_ip.append(str(name.ip_address))
            except x509.ExtensionNotFound:
                pass
            
            cert_info = CertificateInfo(
                subject=subject,
                issuer=issuer,
                serial_number=serial_number,
                not_before=not_before,
                not_after=not_after,
                fingerprint=fingerprint,
                is_ca=is_ca,
                key_usage=key_usage,
                san_dns=san_dns,
                san_ip=san_ip
            )
            
            self.certificates[cert_file] = cert_info
            return cert_info
            
        except Exception as e:
            self.logger.error(f"加载证书失败: {e}")
            return None
    
    def validate_certificate(self, cert_file: str, 
                           ca_file: Optional[str] = None) -> bool:
        """验证证书"""
        try:
            cert_info = self.load_certificate(cert_file)
            if not cert_info:
                return False
            
            # 检查证书是否过期
            now = datetime.now()
            if now < cert_info.not_before:
                self.logger.error(f"证书尚未生效: {cert_file}")
                return False
            
            if now > cert_info.not_after:
                self.logger.error(f"证书已过期: {cert_file}")
                return False
            
            # 如果提供了CA文件,验证证书链
            if ca_file:
                if not self._verify_certificate_chain(cert_file, ca_file):
                    return False
            
            self.logger.info(f"证书验证通过: {cert_file}")
            return True
            
        except Exception as e:
            self.logger.error(f"验证证书失败: {e}")
            return False
    
    def _verify_certificate_chain(self, cert_file: str, ca_file: str) -> bool:
        """验证证书链"""
        try:
            # 加载证书
            with open(cert_file, 'rb') as f:
                cert_data = f.read()
            cert = x509.load_pem_x509_certificate(cert_data)
            
            # 加载CA证书
            with open(ca_file, 'rb') as f:
                ca_data = f.read()
            ca_cert = x509.load_pem_x509_certificate(ca_data)
            
            # 验证签名(简化实现)
            # 实际应该使用更完整的证书链验证
            if cert.issuer == ca_cert.subject:
                self.logger.info("证书链验证通过")
                return True
            else:
                self.logger.error("证书链验证失败")
                return False
                
        except Exception as e:
            self.logger.error(f"验证证书链失败: {e}")
            return False
    
    def create_ssl_context(self, ssl_config: SSLConfig) -> ssl.SSLContext:
        """创建SSL上下文"""
        try:
            # 选择协议版本
            if ssl_config.protocol == "TLSv1_3":
                context = ssl.SSLContext(ssl.PROTOCOL_TLS)
                context.minimum_version = ssl.TLSVersion.TLSv1_3
            else:
                context = ssl.SSLContext(ssl.PROTOCOL_TLS)
                context.minimum_version = ssl.TLSVersion.TLSv1_2
            
            # 设置验证模式
            if ssl_config.verify_mode == "CERT_NONE":
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
            elif ssl_config.verify_mode == "CERT_OPTIONAL":
                context.check_hostname = ssl_config.check_hostname
                context.verify_mode = ssl.CERT_OPTIONAL
            else:  # CERT_REQUIRED
                context.check_hostname = ssl_config.check_hostname
                context.verify_mode = ssl.CERT_REQUIRED
            
            # 加载证书和私钥
            context.load_cert_chain(ssl_config.cert_file, ssl_config.key_file)
            
            # 加载CA证书
            if ssl_config.ca_file:
                context.load_verify_locations(ssl_config.ca_file)
            
            # 设置密码套件
            if ssl_config.ciphers:
                context.set_ciphers(ssl_config.ciphers)
            
            self.logger.info("SSL上下文创建成功")
            return context
            
        except Exception as e:
            self.logger.error(f"创建SSL上下文失败: {e}")
            raise
    
    def get_certificate_expiry_days(self, cert_file: str) -> Optional[int]:
        """获取证书剩余有效天数"""
        try:
            cert_info = self.load_certificate(cert_file)
            if not cert_info:
                return None
            
            now = datetime.now()
            if now > cert_info.not_after:
                return 0  # 已过期
            
            delta = cert_info.not_after - now
            return delta.days
            
        except Exception as e:
            self.logger.error(f"获取证书有效期失败: {e}")
            return None
    
    def list_certificates(self) -> List[Dict[str, Any]]:
        """列出所有证书信息"""
        certificates = []
        for cert_file, cert_info in self.certificates.items():
            expiry_days = self.get_certificate_expiry_days(cert_file)
            
            certificates.append({
                'file': cert_file,
                'subject': cert_info.subject,
                'issuer': cert_info.issuer,
                'serial_number': cert_info.serial_number,
                'not_before': cert_info.not_before.isoformat(),
                'not_after': cert_info.not_after.isoformat(),
                'fingerprint': cert_info.fingerprint,
                'is_ca': cert_info.is_ca,
                'key_usage': cert_info.key_usage,
                'san_dns': cert_info.san_dns,
                'san_ip': cert_info.san_ip,
                'expiry_days': expiry_days,
                'is_expired': expiry_days is not None and expiry_days <= 0
            })
        
        return certificates

8.4 访问控制与审计

8.4.1 访问控制列表

class AccessLevel(Enum):
    """访问级别"""
    NONE = "none"
    READ = "read"
    WRITE = "write"
    CONFIGURE = "configure"
    ADMIN = "admin"

@dataclass
class AccessRule:
    """访问规则"""
    resource_type: str  # queue, exchange, binding, vhost
    resource_pattern: str  # 正则表达式
    access_level: AccessLevel
    conditions: Dict[str, Any] = None  # 额外条件

@dataclass
class AccessPolicy:
    """访问策略"""
    name: str
    description: str
    rules: List[AccessRule]
    priority: int = 0
    is_active: bool = True
    created_at: datetime = None
    updated_at: datetime = None

class AccessControlManager:
    """访问控制管理器"""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.policies: Dict[str, AccessPolicy] = {}
        self.user_policies: Dict[str, List[str]] = {}  # 用户 -> 策略列表
        self.role_policies: Dict[str, List[str]] = {}  # 角色 -> 策略列表
    
    def create_policy(self, policy: AccessPolicy) -> bool:
        """创建访问策略"""
        try:
            if policy.name in self.policies:
                self.logger.error(f"策略已存在: {policy.name}")
                return False
            
            policy.created_at = datetime.now()
            policy.updated_at = datetime.now()
            
            self.policies[policy.name] = policy
            
            self.logger.info(f"访问策略创建成功: {policy.name}")
            return True
            
        except Exception as e:
            self.logger.error(f"创建访问策略失败: {e}")
            return False
    
    def assign_policy_to_user(self, username: str, policy_name: str) -> bool:
        """为用户分配策略"""
        try:
            if policy_name not in self.policies:
                self.logger.error(f"策略不存在: {policy_name}")
                return False
            
            if username not in self.user_policies:
                self.user_policies[username] = []
            
            if policy_name not in self.user_policies[username]:
                self.user_policies[username].append(policy_name)
            
            self.logger.info(f"策略分配成功: {username} -> {policy_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"分配策略失败: {e}")
            return False
    
    def check_access(self, username: str, resource_type: str, 
                    resource_name: str, action: str, 
                    user_roles: List[str] = None) -> bool:
        """检查访问权限"""
        try:
            # 获取用户的所有策略
            user_policy_names = self.user_policies.get(username, [])
            
            # 获取角色的所有策略
            role_policy_names = []
            if user_roles:
                for role in user_roles:
                    role_policy_names.extend(self.role_policies.get(role, []))
            
            # 合并所有策略
            all_policy_names = set(user_policy_names + role_policy_names)
            
            # 按优先级排序策略
            policies = []
            for policy_name in all_policy_names:
                if policy_name in self.policies:
                    policy = self.policies[policy_name]
                    if policy.is_active:
                        policies.append(policy)
            
            policies.sort(key=lambda p: p.priority, reverse=True)
            
            # 检查每个策略的规则
            for policy in policies:
                for rule in policy.rules:
                    if self._match_rule(rule, resource_type, resource_name, action):
                        if rule.access_level == AccessLevel.ADMIN:
                            return True
                        elif action == "read" and rule.access_level in [AccessLevel.READ, AccessLevel.WRITE, AccessLevel.CONFIGURE]:
                            return True
                        elif action == "write" and rule.access_level in [AccessLevel.WRITE, AccessLevel.CONFIGURE]:
                            return True
                        elif action == "configure" and rule.access_level == AccessLevel.CONFIGURE:
                            return True
                        elif rule.access_level == AccessLevel.NONE:
                            return False
            
            # 默认拒绝访问
            return False
            
        except Exception as e:
            self.logger.error(f"检查访问权限失败: {e}")
            return False
    
    def _match_rule(self, rule: AccessRule, resource_type: str, 
                   resource_name: str, action: str) -> bool:
        """匹配访问规则"""
        try:
            import re
            
            # 检查资源类型
            if rule.resource_type != "*" and rule.resource_type != resource_type:
                return False
            
            # 检查资源名称模式
            if not re.match(rule.resource_pattern, resource_name):
                return False
            
            # 检查额外条件
            if rule.conditions:
                # 这里可以添加更复杂的条件检查
                pass
            
            return True
            
        except Exception as e:
            self.logger.error(f"匹配访问规则失败: {e}")
            return False

8.4.2 审计日志

class AuditEventType(Enum):
    """审计事件类型"""
    USER_LOGIN = "user_login"
    USER_LOGOUT = "user_logout"
    USER_CREATE = "user_create"
    USER_DELETE = "user_delete"
    USER_UPDATE = "user_update"
    PERMISSION_GRANT = "permission_grant"
    PERMISSION_REVOKE = "permission_revoke"
    QUEUE_CREATE = "queue_create"
    QUEUE_DELETE = "queue_delete"
    EXCHANGE_CREATE = "exchange_create"
    EXCHANGE_DELETE = "exchange_delete"
    MESSAGE_PUBLISH = "message_publish"
    MESSAGE_CONSUME = "message_consume"
    CONNECTION_OPEN = "connection_open"
    CONNECTION_CLOSE = "connection_close"
    POLICY_CREATE = "policy_create"
    POLICY_UPDATE = "policy_update"
    POLICY_DELETE = "policy_delete"
    ACCESS_DENIED = "access_denied"

@dataclass
class AuditEvent:
    """审计事件"""
    event_id: str
    event_type: AuditEventType
    timestamp: datetime
    username: str
    source_ip: str
    resource_type: Optional[str] = None
    resource_name: Optional[str] = None
    action: Optional[str] = None
    result: str = "success"  # success, failure, error
    details: Dict[str, Any] = None
    session_id: Optional[str] = None

class AuditLogger:
    """审计日志记录器"""
    
    def __init__(self, log_file: str = "rabbitmq_audit.log"):
        self.log_file = log_file
        self.logger = logging.getLogger(__name__)
        
        # 配置审计日志格式
        self.audit_logger = logging.getLogger("audit")
        handler = logging.FileHandler(log_file)
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.audit_logger.addHandler(handler)
        self.audit_logger.setLevel(logging.INFO)
        
        # 内存中的事件存储(用于查询)
        self.events: List[AuditEvent] = []
        self.max_events = 10000  # 最大事件数量
    
    def log_event(self, event: AuditEvent):
        """记录审计事件"""
        try:
            # 生成事件ID
            if not event.event_id:
                event.event_id = secrets.token_hex(16)
            
            # 设置时间戳
            if not event.timestamp:
                event.timestamp = datetime.now()
            
            # 记录到文件
            log_message = self._format_event(event)
            self.audit_logger.info(log_message)
            
            # 存储到内存
            self.events.append(event)
            
            # 限制内存中的事件数量
            if len(self.events) > self.max_events:
                self.events.pop(0)
            
        except Exception as e:
            self.logger.error(f"记录审计事件失败: {e}")
    
    def _format_event(self, event: AuditEvent) -> str:
        """格式化审计事件"""
        parts = [
            f"EVENT_ID={event.event_id}",
            f"TYPE={event.event_type.value}",
            f"USER={event.username}",
            f"SOURCE_IP={event.source_ip}",
            f"RESULT={event.result}"
        ]
        
        if event.resource_type:
            parts.append(f"RESOURCE_TYPE={event.resource_type}")
        
        if event.resource_name:
            parts.append(f"RESOURCE_NAME={event.resource_name}")
        
        if event.action:
            parts.append(f"ACTION={event.action}")
        
        if event.session_id:
            parts.append(f"SESSION_ID={event.session_id}")
        
        if event.details:
            import json
            parts.append(f"DETAILS={json.dumps(event.details)}")
        
        return " | ".join(parts)
    
    def query_events(self, 
                    start_time: Optional[datetime] = None,
                    end_time: Optional[datetime] = None,
                    event_type: Optional[AuditEventType] = None,
                    username: Optional[str] = None,
                    source_ip: Optional[str] = None,
                    result: Optional[str] = None,
                    limit: int = 100) -> List[AuditEvent]:
        """查询审计事件"""
        try:
            filtered_events = []
            
            for event in self.events:
                # 时间范围过滤
                if start_time and event.timestamp < start_time:
                    continue
                if end_time and event.timestamp > end_time:
                    continue
                
                # 事件类型过滤
                if event_type and event.event_type != event_type:
                    continue
                
                # 用户过滤
                if username and event.username != username:
                    continue
                
                # 源IP过滤
                if source_ip and event.source_ip != source_ip:
                    continue
                
                # 结果过滤
                if result and event.result != result:
                    continue
                
                filtered_events.append(event)
            
            # 按时间倒序排序
            filtered_events.sort(key=lambda e: e.timestamp, reverse=True)
            
            # 限制结果数量
            return filtered_events[:limit]
            
        except Exception as e:
            self.logger.error(f"查询审计事件失败: {e}")
            return []
    
    def get_security_summary(self, hours: int = 24) -> Dict[str, Any]:
        """获取安全摘要"""
        try:
            start_time = datetime.now() - timedelta(hours=hours)
            recent_events = self.query_events(start_time=start_time)
            
            # 统计各类事件
            event_counts = {}
            failed_logins = 0
            access_denied = 0
            successful_logins = 0
            
            for event in recent_events:
                event_type = event.event_type.value
                event_counts[event_type] = event_counts.get(event_type, 0) + 1
                
                if event.event_type == AuditEventType.USER_LOGIN:
                    if event.result == "success":
                        successful_logins += 1
                    else:
                        failed_logins += 1
                
                elif event.event_type == AuditEventType.ACCESS_DENIED:
                    access_denied += 1
            
            # 统计活跃用户
            active_users = set()
            for event in recent_events:
                if event.event_type in [AuditEventType.USER_LOGIN, 
                                      AuditEventType.MESSAGE_PUBLISH,
                                      AuditEventType.MESSAGE_CONSUME]:
                    active_users.add(event.username)
            
            # 统计源IP
            source_ips = {}
            for event in recent_events:
                ip = event.source_ip
                source_ips[ip] = source_ips.get(ip, 0) + 1
            
            return {
                'period_hours': hours,
                'total_events': len(recent_events),
                'event_counts': event_counts,
                'security_metrics': {
                    'successful_logins': successful_logins,
                    'failed_logins': failed_logins,
                    'access_denied': access_denied,
                    'active_users': len(active_users),
                    'unique_source_ips': len(source_ips)
                },
                'top_source_ips': sorted(source_ips.items(), 
                                       key=lambda x: x[1], reverse=True)[:10],
                'active_users_list': list(active_users)
            }
            
        except Exception as e:
            self.logger.error(f"获取安全摘要失败: {e}")
            return {}
    
    def export_events(self, file_path: str, 
                     start_time: Optional[datetime] = None,
                     end_time: Optional[datetime] = None) -> bool:
        """导出审计事件"""
        try:
            events = self.query_events(start_time=start_time, end_time=end_time, limit=None)
            
            import csv
            with open(file_path, 'w', newline='', encoding='utf-8') as csvfile:
                fieldnames = ['event_id', 'event_type', 'timestamp', 'username', 
                            'source_ip', 'resource_type', 'resource_name', 
                            'action', 'result', 'session_id', 'details']
                
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()
                
                for event in events:
                    row = {
                        'event_id': event.event_id,
                        'event_type': event.event_type.value,
                        'timestamp': event.timestamp.isoformat(),
                        'username': event.username,
                        'source_ip': event.source_ip,
                        'resource_type': event.resource_type or '',
                        'resource_name': event.resource_name or '',
                        'action': event.action or '',
                        'result': event.result,
                        'session_id': event.session_id or '',
                        'details': str(event.details) if event.details else ''
                    }
                    writer.writerow(row)
            
            self.logger.info(f"审计事件导出成功: {file_path}")
            return True
            
        except Exception as e:
            self.logger.error(f"导出审计事件失败: {e}")
            return False

## 8.5 完整的安全配置示例

### 8.5.1 安全管理器集成

```python
class SecurityManager:
    """安全管理器"""
    
    def __init__(self, management_api):
        self.management_api = management_api
        self.logger = logging.getLogger(__name__)
        
        # 初始化各个管理器
        self.user_manager = UserManager(management_api)
        self.vhost_manager = VHostManager(management_api)
        self.ssl_manager = SSLManager()
        self.access_control = AccessControlManager()
        self.audit_logger = AuditLogger()
        
        # 安全配置
        self.security_config = {
            'enable_ssl': True,
            'require_client_cert': False,
            'session_timeout': 3600,  # 秒
            'max_concurrent_sessions': 10,
            'enable_audit_logging': True,
            'password_complexity_check': True,
            'account_lockout_enabled': True
        }
        
        # 活跃会话管理
        self.active_sessions: Dict[str, Dict[str, Any]] = {}
    
    def initialize_security(self, ssl_config: Optional[SSLConfig] = None) -> bool:
        """初始化安全配置"""
        try:
            self.logger.info("开始初始化安全配置")
            
            # 创建默认管理员用户
            if not self._create_default_admin():
                return False
            
            # 配置SSL
            if self.security_config['enable_ssl'] and ssl_config:
                if not self._configure_ssl(ssl_config):
                    return False
            
            # 创建默认访问策略
            if not self._create_default_policies():
                return False
            
            # 创建默认虚拟主机
            if not self._create_default_vhosts():
                return False
            
            self.logger.info("安全配置初始化完成")
            return True
            
        except Exception as e:
            self.logger.error(f"初始化安全配置失败: {e}")
            return False
    
    def _create_default_admin(self) -> bool:
        """创建默认管理员用户"""
        try:
            admin_password = secrets.token_urlsafe(16)
            
            success = self.user_manager.create_user(
                username="admin",
                password=admin_password,
                tags=[UserRole.ADMINISTRATOR]
            )
            
            if success:
                # 设置管理员权限
                self.user_manager.set_user_permissions(
                    username="admin",
                    vhost="/",
                    configure=".*",
                    write=".*",
                    read=".*"
                )
                
                self.logger.info(f"默认管理员创建成功,密码: {admin_password}")
                
                # 记录审计事件
                self.audit_logger.log_event(AuditEvent(
                    event_type=AuditEventType.USER_CREATE,
                    username="system",
                    source_ip="127.0.0.1",
                    resource_name="admin",
                    details={'created_by': 'system_initialization'}
                ))
            
            return success
            
        except Exception as e:
            self.logger.error(f"创建默认管理员失败: {e}")
            return False
    
    def _configure_ssl(self, ssl_config: SSLConfig) -> bool:
        """配置SSL"""
        try:
            # 验证证书
            if not self.ssl_manager.validate_certificate(ssl_config.cert_file, ssl_config.ca_file):
                return False
            
            # 创建SSL上下文
            ssl_context = self.ssl_manager.create_ssl_context(ssl_config)
            
            self.logger.info("SSL配置成功")
            return True
            
        except Exception as e:
            self.logger.error(f"配置SSL失败: {e}")
            return False
    
    def _create_default_policies(self) -> bool:
        """创建默认访问策略"""
        try:
            # 管理员策略
            admin_policy = AccessPolicy(
                name="admin_policy",
                description="管理员完全访问权限",
                rules=[
                    AccessRule(
                        resource_type="*",
                        resource_pattern=".*",
                        access_level=AccessLevel.ADMIN
                    )
                ],
                priority=100
            )
            
            # 只读策略
            readonly_policy = AccessPolicy(
                name="readonly_policy",
                description="只读访问权限",
                rules=[
                    AccessRule(
                        resource_type="*",
                        resource_pattern=".*",
                        access_level=AccessLevel.READ
                    )
                ],
                priority=10
            )
            
            # 应用策略
            policies_created = (
                self.access_control.create_policy(admin_policy) and
                self.access_control.create_policy(readonly_policy)
            )
            
            if policies_created:
                # 为管理员分配策略
                self.access_control.assign_policy_to_user("admin", "admin_policy")
            
            return policies_created
            
        except Exception as e:
            self.logger.error(f"创建默认策略失败: {e}")
            return False
    
    def _create_default_vhosts(self) -> bool:
        """创建默认虚拟主机"""
        try:
            # 创建生产环境虚拟主机
            prod_quota = ResourceQuota(
                max_connections=1000,
                max_queues=500,
                max_exchanges=100,
                max_memory_usage=1024*1024*1024  # 1GB
            )
            
            success = self.vhost_manager.create_vhost(
                name="production",
                description="生产环境虚拟主机",
                quota=prod_quota,
                tags=["production", "critical"]
            )
            
            return success
            
        except Exception as e:
            self.logger.error(f"创建默认虚拟主机失败: {e}")
            return False
    
    def authenticate_and_authorize(self, username: str, password: str, 
                                 source_ip: str, action: str = "login") -> Dict[str, Any]:
        """认证和授权"""
        try:
            # 用户认证
            auth_result = self.user_manager.authenticate_user(username, password)
            
            if auth_result:
                # 创建会话
                session_id = self._create_session(username, source_ip)
                
                # 记录成功登录
                self.audit_logger.log_event(AuditEvent(
                    event_type=AuditEventType.USER_LOGIN,
                    username=username,
                    source_ip=source_ip,
                    result="success",
                    session_id=session_id
                ))
                
                return {
                    'success': True,
                    'session_id': session_id,
                    'message': '认证成功'
                }
            else:
                # 记录失败登录
                self.audit_logger.log_event(AuditEvent(
                    event_type=AuditEventType.USER_LOGIN,
                    username=username,
                    source_ip=source_ip,
                    result="failure",
                    details={'reason': 'invalid_credentials'}
                ))
                
                return {
                    'success': False,
                    'message': '认证失败'
                }
                
        except Exception as e:
            self.logger.error(f"认证和授权失败: {e}")
            return {
                'success': False,
                'message': '系统错误'
            }
    
    def _create_session(self, username: str, source_ip: str) -> str:
        """创建用户会话"""
        session_id = secrets.token_hex(32)
        
        # 检查并清理过期会话
        self._cleanup_expired_sessions()
        
        # 检查用户的并发会话数
        user_sessions = [s for s in self.active_sessions.values() 
                        if s['username'] == username]
        
        if len(user_sessions) >= self.security_config['max_concurrent_sessions']:
            # 移除最旧的会话
            oldest_session = min(user_sessions, key=lambda s: s['created_at'])
            del self.active_sessions[oldest_session['session_id']]
        
        # 创建新会话
        self.active_sessions[session_id] = {
            'session_id': session_id,
            'username': username,
            'source_ip': source_ip,
            'created_at': datetime.now(),
            'last_activity': datetime.now(),
            'is_active': True
        }
        
        return session_id
    
    def _cleanup_expired_sessions(self):
        """清理过期会话"""
        try:
            timeout = timedelta(seconds=self.security_config['session_timeout'])
            now = datetime.now()
            
            expired_sessions = []
            for session_id, session in self.active_sessions.items():
                if now - session['last_activity'] > timeout:
                    expired_sessions.append(session_id)
            
            for session_id in expired_sessions:
                session = self.active_sessions[session_id]
                
                # 记录会话过期
                self.audit_logger.log_event(AuditEvent(
                    event_type=AuditEventType.USER_LOGOUT,
                    username=session['username'],
                    source_ip=session['source_ip'],
                    session_id=session_id,
                    details={'reason': 'session_timeout'}
                ))
                
                del self.active_sessions[session_id]
                
        except Exception as e:
            self.logger.error(f"清理过期会话失败: {e}")
    
    def check_permission(self, session_id: str, resource_type: str, 
                        resource_name: str, action: str) -> bool:
        """检查权限"""
        try:
            # 验证会话
            if session_id not in self.active_sessions:
                return False
            
            session = self.active_sessions[session_id]
            
            # 检查会话是否过期
            timeout = timedelta(seconds=self.security_config['session_timeout'])
            if datetime.now() - session['last_activity'] > timeout:
                return False
            
            # 更新最后活动时间
            session['last_activity'] = datetime.now()
            
            username = session['username']
            
            # 获取用户角色
            user_info = self.user_manager.users.get(username)
            user_roles = [role.value for role in user_info.tags] if user_info else []
            
            # 检查访问权限
            has_permission = self.access_control.check_access(
                username, resource_type, resource_name, action, user_roles
            )
            
            if not has_permission:
                # 记录访问被拒绝
                self.audit_logger.log_event(AuditEvent(
                    event_type=AuditEventType.ACCESS_DENIED,
                    username=username,
                    source_ip=session['source_ip'],
                    resource_type=resource_type,
                    resource_name=resource_name,
                    action=action,
                    session_id=session_id
                ))
            
            return has_permission
            
        except Exception as e:
            self.logger.error(f"检查权限失败: {e}")
            return False
    
    def get_security_status(self) -> Dict[str, Any]:
        """获取安全状态"""
        try:
            # 获取用户统计
            users = self.user_manager.list_users()
            active_users = len([u for u in users if u['is_active']])
            locked_users = len([u for u in users if u['is_locked']])
            
            # 获取会话统计
            self._cleanup_expired_sessions()
            active_sessions = len(self.active_sessions)
            
            # 获取证书状态
            certificates = self.ssl_manager.list_certificates()
            expiring_certs = len([c for c in certificates 
                                if c['expiry_days'] is not None and c['expiry_days'] <= 30])
            
            # 获取安全摘要
            security_summary = self.audit_logger.get_security_summary(24)
            
            return {
                'users': {
                    'total': len(users),
                    'active': active_users,
                    'locked': locked_users
                },
                'sessions': {
                    'active': active_sessions,
                    'max_allowed': self.security_config['max_concurrent_sessions']
                },
                'certificates': {
                    'total': len(certificates),
                    'expiring_soon': expiring_certs
                },
                'audit_summary': security_summary,
                'security_config': self.security_config
            }
            
        except Exception as e:
             self.logger.error(f"获取安全状态失败: {e}")
             return {}

### 8.5.2 完整的安全配置示例

```python
def security_configuration_example():
    """完整的安全配置示例"""
    
    # 模拟管理API
    class MockManagementAPI:
        def create_user(self, username, user_data):
            print(f"创建用户: {username}")
        
        def update_user(self, username, user_data):
            print(f"更新用户: {username}")
        
        def delete_user(self, username):
            print(f"删除用户: {username}")
        
        def set_permissions(self, vhost, username, permission_data):
            print(f"设置权限: {username}@{vhost}")
        
        def create_vhost(self, name, vhost_data):
            print(f"创建虚拟主机: {name}")
        
        def delete_vhost(self, name):
            print(f"删除虚拟主机: {name}")
        
        def create_policy(self, vhost, name, policy_data):
            print(f"创建策略: {name}@{vhost}")
    
    # 初始化安全管理器
    management_api = MockManagementAPI()
    security_manager = SecurityManager(management_api)
    
    print("=== RabbitMQ 安全配置示例 ===")
    
    # 1. 生成SSL证书
    print("\n1. 生成SSL证书")
    ssl_manager = security_manager.ssl_manager
    
    cert_success = ssl_manager.generate_self_signed_cert(
        hostname="rabbitmq.example.com",
        cert_file="/etc/rabbitmq/ssl/server.crt",
        key_file="/etc/rabbitmq/ssl/server.key",
        validity_days=365
    )
    
    if cert_success:
        print("✓ SSL证书生成成功")
        
        # 加载并验证证书
        cert_info = ssl_manager.load_certificate("/etc/rabbitmq/ssl/server.crt")
        if cert_info:
            print(f"  证书主题: {cert_info.subject}")
            print(f"  有效期至: {cert_info.not_after}")
            print(f"  指纹: {cert_info.fingerprint[:16]}...")
    
    # 2. 初始化安全配置
    print("\n2. 初始化安全配置")
    
    ssl_config = SSLConfig(
        cert_file="/etc/rabbitmq/ssl/server.crt",
        key_file="/etc/rabbitmq/ssl/server.key",
        verify_mode="CERT_REQUIRED",
        protocol="TLSv1_2"
    )
    
    init_success = security_manager.initialize_security(ssl_config)
    if init_success:
        print("✓ 安全配置初始化成功")
    
    # 3. 创建用户和角色
    print("\n3. 创建用户和角色")
    
    # 创建开发者用户
    dev_user_created = security_manager.user_manager.create_user(
        username="developer",
        password="DevPass123!",
        tags=[UserRole.NONE]
    )
    
    if dev_user_created:
        print("✓ 开发者用户创建成功")
        
        # 设置开发环境权限
        security_manager.user_manager.set_user_permissions(
            username="developer",
            vhost="development",
            configure="dev\..*",
            write="dev\..*",
            read="dev\..*"
        )
    
    # 创建监控用户
    monitor_user_created = security_manager.user_manager.create_user(
        username="monitor",
        password="MonitorPass123!",
        tags=[UserRole.MONITORING]
    )
    
    if monitor_user_created:
        print("✓ 监控用户创建成功")
        
        # 设置只读权限
        security_manager.user_manager.set_user_permissions(
            username="monitor",
            vhost="/",
            configure="",
            write="",
            read=".*"
        )
    
    # 4. 创建虚拟主机和资源配额
    print("\n4. 创建虚拟主机和资源配额")
    
    # 开发环境
    dev_quota = ResourceQuota(
        max_connections=100,
        max_queues=50,
        max_exchanges=20,
        max_memory_usage=512*1024*1024  # 512MB
    )
    
    dev_vhost_created = security_manager.vhost_manager.create_vhost(
        name="development",
        description="开发环境虚拟主机",
        quota=dev_quota,
        tags=["development", "non-critical"]
    )
    
    if dev_vhost_created:
        print("✓ 开发环境虚拟主机创建成功")
    
    # 测试环境
    test_quota = ResourceQuota(
        max_connections=200,
        max_queues=100,
        max_exchanges=50,
        max_memory_usage=1024*1024*1024  # 1GB
    )
    
    test_vhost_created = security_manager.vhost_manager.create_vhost(
        name="testing",
        description="测试环境虚拟主机",
        quota=test_quota,
        tags=["testing", "staging"]
    )
    
    if test_vhost_created:
        print("✓ 测试环境虚拟主机创建成功")
    
    # 5. 创建访问策略
    print("\n5. 创建访问策略")
    
    # 开发者策略
    dev_policy = AccessPolicy(
        name="developer_policy",
        description="开发者访问策略",
        rules=[
            AccessRule(
                resource_type="queue",
                resource_pattern="dev\..*",
                access_level=AccessLevel.WRITE
            ),
            AccessRule(
                resource_type="exchange",
                resource_pattern="dev\..*",
                access_level=AccessLevel.CONFIGURE
            )
        ],
        priority=50
    )
    
    dev_policy_created = security_manager.access_control.create_policy(dev_policy)
    if dev_policy_created:
        print("✓ 开发者策略创建成功")
        
        # 分配策略给开发者
        security_manager.access_control.assign_policy_to_user("developer", "developer_policy")
    
    # 监控策略
    monitor_policy = AccessPolicy(
        name="monitor_policy",
        description="监控访问策略",
        rules=[
            AccessRule(
                resource_type="*",
                resource_pattern=".*",
                access_level=AccessLevel.READ
            )
        ],
        priority=20
    )
    
    monitor_policy_created = security_manager.access_control.create_policy(monitor_policy)
    if monitor_policy_created:
        print("✓ 监控策略创建成功")
        
        # 分配策略给监控用户
        security_manager.access_control.assign_policy_to_user("monitor", "monitor_policy")
    
    # 6. 模拟用户认证和权限检查
    print("\n6. 用户认证和权限检查")
    
    # 开发者登录
    dev_auth = security_manager.authenticate_and_authorize(
        username="developer",
        password="DevPass123!",
        source_ip="192.168.1.100"
    )
    
    if dev_auth['success']:
        print("✓ 开发者认证成功")
        session_id = dev_auth['session_id']
        
        # 检查队列创建权限
        can_create_queue = security_manager.check_permission(
            session_id=session_id,
            resource_type="queue",
            resource_name="dev.test.queue",
            action="configure"
        )
        
        print(f"  开发队列创建权限: {'✓' if can_create_queue else '✗'}")
        
        # 检查生产队列访问权限(应该被拒绝)
        can_access_prod = security_manager.check_permission(
            session_id=session_id,
            resource_type="queue",
            resource_name="prod.critical.queue",
            action="write"
        )
        
        print(f"  生产队列访问权限: {'✓' if can_access_prod else '✗ (正确拒绝)'}")
    
    # 监控用户登录
    monitor_auth = security_manager.authenticate_and_authorize(
        username="monitor",
        password="MonitorPass123!",
        source_ip="192.168.1.200"
    )
    
    if monitor_auth['success']:
        print("✓ 监控用户认证成功")
        session_id = monitor_auth['session_id']
        
        # 检查读取权限
        can_read = security_manager.check_permission(
            session_id=session_id,
            resource_type="queue",
            resource_name="any.queue",
            action="read"
        )
        
        print(f"  队列读取权限: {'✓' if can_read else '✗'}")
        
        # 检查写入权限(应该被拒绝)
        can_write = security_manager.check_permission(
            session_id=session_id,
            resource_type="queue",
            resource_name="any.queue",
            action="write"
        )
        
        print(f"  队列写入权限: {'✓' if can_write else '✗ (正确拒绝)'}")
    
    # 7. 获取安全状态报告
    print("\n7. 安全状态报告")
    
    security_status = security_manager.get_security_status()
    
    print(f"  用户总数: {security_status['users']['total']}")
    print(f"  活跃用户: {security_status['users']['active']}")
    print(f"  锁定用户: {security_status['users']['locked']}")
    print(f"  活跃会话: {security_status['sessions']['active']}")
    print(f"  最大会话数: {security_status['sessions']['max_allowed']}")
    
    # 8. 审计日志查询
    print("\n8. 审计日志查询")
    
    # 查询最近的登录事件
    recent_logins = security_manager.audit_logger.query_events(
        event_type=AuditEventType.USER_LOGIN,
        limit=5
    )
    
    print(f"  最近登录事件: {len(recent_logins)} 条")
    for event in recent_logins:
        print(f"    {event.timestamp.strftime('%H:%M:%S')} - {event.username} from {event.source_ip} ({event.result})")
    
    # 查询访问被拒绝事件
    denied_events = security_manager.audit_logger.query_events(
        event_type=AuditEventType.ACCESS_DENIED,
        limit=3
    )
    
    print(f"  访问拒绝事件: {len(denied_events)} 条")
    for event in denied_events:
        print(f"    {event.timestamp.strftime('%H:%M:%S')} - {event.username} 访问 {event.resource_name} 被拒绝")
    
    # 9. 导出审计日志
    print("\n9. 导出审计日志")
    
    export_success = security_manager.audit_logger.export_events(
        file_path="rabbitmq_audit_export.csv",
        start_time=datetime.now() - timedelta(hours=24)
    )
    
    if export_success:
        print("✓ 审计日志导出成功: rabbitmq_audit_export.csv")
    
    print("\n=== 安全配置示例完成 ===")

if __name__ == "__main__":
    security_configuration_example()

8.6 本章总结

8.6.1 核心知识点

  1. 用户与权限管理

    • 用户角色和权限体系
    • 密码策略和安全验证
    • 用户认证和会话管理
    • 权限分配和访问控制
  2. 虚拟主机与资源隔离

    • 虚拟主机的创建和管理
    • 资源配额和限制
    • 多租户环境的隔离
    • 资源使用监控
  3. SSL/TLS加密配置

    • SSL证书的生成和管理
    • 加密通信配置
    • 证书验证和链信任
    • 安全协议选择
  4. 访问控制与审计

    • 基于角色的访问控制(RBAC)
    • 访问策略和规则
    • 审计日志记录
    • 安全事件监控

8.6.2 最佳实践

  1. 密码安全

    • 实施强密码策略
    • 定期更换密码
    • 启用账户锁定机制
    • 监控登录失败
  2. 网络安全

    • 启用SSL/TLS加密
    • 使用有效的证书
    • 限制网络访问
    • 配置防火墙规则
  3. 权限管理

    • 遵循最小权限原则
    • 定期审查用户权限
    • 使用角色分离
    • 实施访问控制策略
  4. 审计监控

    • 启用全面的审计日志
    • 定期分析安全事件
    • 设置安全告警
    • 保留足够的日志历史

8.6.3 练习题

  1. 基础练习

    • 创建具有不同角色的用户
    • 配置虚拟主机和资源配额
    • 生成和配置SSL证书
    • 设置基本的访问控制策略
  2. 进阶练习

    • 实现多租户环境的完整隔离
    • 配置复杂的权限继承体系
    • 集成外部认证系统
    • 实现自动化的安全监控
  3. 实战项目

    • 设计企业级的安全架构
    • 实现合规性审计系统
    • 构建安全事件响应机制
    • 开发安全配置自动化工具

通过本章的学习,你应该能够: - 理解RabbitMQ的安全模型和机制 - 配置完整的用户权限管理系统 - 实施SSL/TLS加密和证书管理 - 建立有效的访问控制和审计体系 - 应用安全最佳实践保护RabbitMQ环境

下一章我们将学习RabbitMQ的故障排除与运维管理,包括常见问题诊断、性能调优和运维自动化等内容。