5.1 消息持久化机制

5.1.1 持久化配置管理器

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Callable
import pika
import json
import time
import threading
import logging
from pathlib import Path

class PersistenceLevel(Enum):
    """持久化级别"""
    NONE = "none"              # 不持久化
    MEMORY = "memory"          # 内存持久化
    DISK = "disk"              # 磁盘持久化
    REPLICATED = "replicated"  # 复制持久化

class DeliveryMode(Enum):
    """投递模式"""
    NON_PERSISTENT = 1  # 非持久化
    PERSISTENT = 2      # 持久化

@dataclass
class PersistenceConfig:
    """持久化配置"""
    exchange_durable: bool = True
    queue_durable: bool = True
    message_persistent: bool = True
    delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT
    sync_writes: bool = True
    fsync_interval: int = 1000  # 毫秒
    backup_enabled: bool = False
    backup_path: Optional[str] = None

class PersistenceManager:
    """持久化管理器"""
    
    def __init__(self, channel, config: PersistenceConfig = None):
        self.channel = channel
        self.config = config or PersistenceConfig()
        self.logger = logging.getLogger(__name__)
        self.persistence_stats = {
            'messages_persisted': 0,
            'messages_lost': 0,
            'sync_operations': 0,
            'backup_operations': 0
        }
        self.lock = threading.Lock()
    
    def create_durable_exchange(self, exchange_name: str, exchange_type: str = 'direct',
                               arguments: Optional[Dict[str, Any]] = None) -> bool:
        """创建持久化交换机"""
        try:
            self.channel.exchange_declare(
                exchange=exchange_name,
                exchange_type=exchange_type,
                durable=self.config.exchange_durable,
                auto_delete=False,
                internal=False,
                arguments=arguments or {}
            )
            
            self.logger.info(f"✅ 持久化交换机创建成功: {exchange_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"❌ 持久化交换机创建失败: {e}")
            return False
    
    def create_durable_queue(self, queue_name: str, 
                            arguments: Optional[Dict[str, Any]] = None) -> bool:
        """创建持久化队列"""
        try:
            # 设置队列参数
            queue_args = arguments or {}
            
            # 如果启用了备份,设置备份队列
            if self.config.backup_enabled:
                queue_args['x-message-ttl'] = 86400000  # 24小时TTL
                queue_args['x-dead-letter-exchange'] = f"{queue_name}.backup"
            
            result = self.channel.queue_declare(
                queue=queue_name,
                durable=self.config.queue_durable,
                exclusive=False,
                auto_delete=False,
                arguments=queue_args
            )
            
            self.logger.info(f"✅ 持久化队列创建成功: {queue_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"❌ 持久化队列创建失败: {e}")
            return False
    
    def publish_persistent_message(self, exchange_name: str, routing_key: str,
                                  message: Dict[str, Any], 
                                  properties: Optional[pika.BasicProperties] = None) -> bool:
        """发布持久化消息"""
        try:
            # 设置消息属性
            if properties is None:
                properties = pika.BasicProperties()
            
            # 设置持久化模式
            if self.config.message_persistent:
                properties.delivery_mode = self.config.delivery_mode.value
            
            # 添加时间戳和消息ID
            properties.timestamp = int(time.time())
            properties.message_id = f"msg_{int(time.time() * 1000000)}"
            
            # 序列化消息
            body = json.dumps(message, ensure_ascii=False)
            
            # 发布消息
            self.channel.basic_publish(
                exchange=exchange_name,
                routing_key=routing_key,
                body=body.encode('utf-8'),
                properties=properties,
                mandatory=True  # 确保消息能够路由到队列
            )
            
            # 如果启用同步写入,等待确认
            if self.config.sync_writes:
                self.channel.confirm_delivery()
            
            with self.lock:
                self.persistence_stats['messages_persisted'] += 1
                if self.config.sync_writes:
                    self.persistence_stats['sync_operations'] += 1
            
            self.logger.info(f"✅ 持久化消息发布成功: {exchange_name}/{routing_key}")
            return True
            
        except Exception as e:
            with self.lock:
                self.persistence_stats['messages_lost'] += 1
            
            self.logger.error(f"❌ 持久化消息发布失败: {e}")
            return False
    
    def create_backup_queue(self, original_queue: str) -> bool:
        """创建备份队列"""
        if not self.config.backup_enabled:
            return False
        
        try:
            backup_queue = f"{original_queue}.backup"
            backup_exchange = f"{original_queue}.backup"
            
            # 创建备份交换机
            self.create_durable_exchange(backup_exchange, 'direct')
            
            # 创建备份队列
            self.create_durable_queue(backup_queue)
            
            # 绑定备份队列
            self.channel.queue_bind(
                exchange=backup_exchange,
                queue=backup_queue,
                routing_key=''
            )
            
            with self.lock:
                self.persistence_stats['backup_operations'] += 1
            
            self.logger.info(f"✅ 备份队列创建成功: {backup_queue}")
            return True
            
        except Exception as e:
            self.logger.error(f"❌ 备份队列创建失败: {e}")
            return False
    
    def get_persistence_stats(self) -> Dict[str, int]:
        """获取持久化统计"""
        with self.lock:
            return self.persistence_stats.copy()
    
    def reset_stats(self):
        """重置统计信息"""
        with self.lock:
            for key in self.persistence_stats:
                self.persistence_stats[key] = 0

5.1.2 消息确认机制

from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
import uuid

class ConfirmationMode(Enum):
    """确认模式"""
    NONE = "none"          # 无确认
    SIMPLE = "simple"      # 简单确认
    BATCH = "batch"        # 批量确认
    ASYNC = "async"        # 异步确认

@dataclass
class ConfirmationConfig:
    """确认配置"""
    mode: ConfirmationMode = ConfirmationMode.SIMPLE
    timeout: int = 5000  # 毫秒
    retry_count: int = 3
    batch_size: int = 100
    async_workers: int = 4

class ConfirmationCallback(ABC):
    """确认回调接口"""
    
    @abstractmethod
    def on_ack(self, delivery_tag: int, multiple: bool = False):
        """消息确认回调"""
        pass
    
    @abstractmethod
    def on_nack(self, delivery_tag: int, multiple: bool = False, requeue: bool = True):
        """消息拒绝回调"""
        pass
    
    @abstractmethod
    def on_return(self, channel, method, properties, body):
        """消息返回回调"""
        pass

class MessageConfirmationManager:
    """消息确认管理器"""
    
    def __init__(self, channel, config: ConfirmationConfig = None):
        self.channel = channel
        self.config = config or ConfirmationConfig()
        self.pending_confirmations = {}
        self.confirmation_callbacks = []
        self.lock = threading.Lock()
        self.executor = ThreadPoolExecutor(max_workers=self.config.async_workers)
        self.logger = logging.getLogger(__name__)
        
        # 设置确认模式
        self._setup_confirmation_mode()
    
    def _setup_confirmation_mode(self):
        """设置确认模式"""
        if self.config.mode != ConfirmationMode.NONE:
            # 启用发布确认
            self.channel.confirm_delivery()
            
            # 设置确认回调
            self.channel.add_on_ack_callback(self._on_delivery_confirmation)
            self.channel.add_on_nack_callback(self._on_delivery_rejection)
            self.channel.add_on_return_callback(self._on_message_return)
    
    def add_confirmation_callback(self, callback: ConfirmationCallback):
        """添加确认回调"""
        self.confirmation_callbacks.append(callback)
    
    def _on_delivery_confirmation(self, method_frame):
        """投递确认处理"""
        delivery_tag = method_frame.method.delivery_tag
        multiple = method_frame.method.multiple
        
        with self.lock:
            if multiple:
                # 批量确认
                confirmed_tags = [tag for tag in self.pending_confirmations.keys() 
                                if tag <= delivery_tag]
                for tag in confirmed_tags:
                    del self.pending_confirmations[tag]
            else:
                # 单个确认
                if delivery_tag in self.pending_confirmations:
                    del self.pending_confirmations[delivery_tag]
        
        # 调用回调
        for callback in self.confirmation_callbacks:
            try:
                callback.on_ack(delivery_tag, multiple)
            except Exception as e:
                self.logger.error(f"确认回调执行失败: {e}")
    
    def _on_delivery_rejection(self, method_frame):
        """投递拒绝处理"""
        delivery_tag = method_frame.method.delivery_tag
        multiple = method_frame.method.multiple
        
        with self.lock:
            if multiple:
                # 批量拒绝
                rejected_tags = [tag for tag in self.pending_confirmations.keys() 
                               if tag <= delivery_tag]
                for tag in rejected_tags:
                    if tag in self.pending_confirmations:
                        del self.pending_confirmations[tag]
            else:
                # 单个拒绝
                if delivery_tag in self.pending_confirmations:
                    del self.pending_confirmations[delivery_tag]
        
        # 调用回调
        for callback in self.confirmation_callbacks:
            try:
                callback.on_nack(delivery_tag, multiple)
            except Exception as e:
                self.logger.error(f"拒绝回调执行失败: {e}")
    
    def _on_message_return(self, channel, method, properties, body):
        """消息返回处理"""
        self.logger.warning(f"消息返回: {method.reply_text}")
        
        # 调用回调
        for callback in self.confirmation_callbacks:
            try:
                callback.on_return(channel, method, properties, body)
            except Exception as e:
                self.logger.error(f"返回回调执行失败: {e}")
    
    def publish_with_confirmation(self, exchange: str, routing_key: str, 
                                 body: bytes, properties: pika.BasicProperties = None) -> str:
        """发布消息并等待确认"""
        message_id = str(uuid.uuid4())
        
        # 记录待确认消息
        with self.lock:
            delivery_tag = self.channel._delivery_tag_iter.__next__()
            self.pending_confirmations[delivery_tag] = {
                'message_id': message_id,
                'timestamp': time.time(),
                'exchange': exchange,
                'routing_key': routing_key
            }
        
        try:
            # 发布消息
            self.channel.basic_publish(
                exchange=exchange,
                routing_key=routing_key,
                body=body,
                properties=properties,
                mandatory=True
            )
            
            if self.config.mode == ConfirmationMode.SIMPLE:
                # 同步等待确认
                if self.channel.confirm_delivery():
                    self.logger.info(f"✅ 消息确认成功: {message_id}")
                else:
                    self.logger.error(f"❌ 消息确认失败: {message_id}")
            
            return message_id
            
        except Exception as e:
            # 移除待确认记录
            with self.lock:
                if delivery_tag in self.pending_confirmations:
                    del self.pending_confirmations[delivery_tag]
            
            self.logger.error(f"❌ 消息发布失败: {e}")
            raise
    
    def get_pending_confirmations(self) -> Dict[int, Dict[str, Any]]:
        """获取待确认消息"""
        with self.lock:
            return self.pending_confirmations.copy()
    
    def cleanup_expired_confirmations(self, timeout_seconds: int = 30):
        """清理过期的确认记录"""
        current_time = time.time()
        expired_tags = []
        
        with self.lock:
            for tag, info in self.pending_confirmations.items():
                if current_time - info['timestamp'] > timeout_seconds:
                    expired_tags.append(tag)
            
            for tag in expired_tags:
                del self.pending_confirmations[tag]
        
        if expired_tags:
            self.logger.warning(f"清理过期确认记录: {len(expired_tags)} 条")

5.1.3 事务管理

class TransactionMode(Enum):
    """事务模式"""
    NONE = "none"          # 无事务
    LOCAL = "local"        # 本地事务
    DISTRIBUTED = "distributed"  # 分布式事务

@dataclass
class TransactionConfig:
    """事务配置"""
    mode: TransactionMode = TransactionMode.LOCAL
    timeout: int = 30000  # 毫秒
    auto_commit: bool = False
    isolation_level: str = "READ_COMMITTED"
    retry_count: int = 3

class TransactionManager:
    """事务管理器"""
    
    def __init__(self, channel, config: TransactionConfig = None):
        self.channel = channel
        self.config = config or TransactionConfig()
        self.transaction_active = False
        self.transaction_id = None
        self.operations = []
        self.lock = threading.Lock()
        self.logger = logging.getLogger(__name__)
    
    def begin_transaction(self) -> str:
        """开始事务"""
        with self.lock:
            if self.transaction_active:
                raise RuntimeError("事务已经开始")
            
            try:
                # 启用事务模式
                self.channel.tx_select()
                
                self.transaction_id = str(uuid.uuid4())
                self.transaction_active = True
                self.operations.clear()
                
                self.logger.info(f"✅ 事务开始: {self.transaction_id}")
                return self.transaction_id
                
            except Exception as e:
                self.logger.error(f"❌ 事务开始失败: {e}")
                raise
    
    def add_operation(self, operation_type: str, **kwargs):
        """添加事务操作"""
        if not self.transaction_active:
            raise RuntimeError("事务未开始")
        
        operation = {
            'type': operation_type,
            'timestamp': time.time(),
            'params': kwargs
        }
        
        self.operations.append(operation)
    
    def publish_in_transaction(self, exchange: str, routing_key: str, 
                              body: bytes, properties: pika.BasicProperties = None) -> bool:
        """在事务中发布消息"""
        if not self.transaction_active:
            raise RuntimeError("事务未开始")
        
        try:
            # 记录操作
            self.add_operation('publish', 
                             exchange=exchange, 
                             routing_key=routing_key, 
                             body=body, 
                             properties=properties)
            
            # 执行发布
            self.channel.basic_publish(
                exchange=exchange,
                routing_key=routing_key,
                body=body,
                properties=properties
            )
            
            self.logger.info(f"📝 事务中发布消息: {exchange}/{routing_key}")
            return True
            
        except Exception as e:
            self.logger.error(f"❌ 事务中发布消息失败: {e}")
            return False
    
    def commit_transaction(self) -> bool:
        """提交事务"""
        with self.lock:
            if not self.transaction_active:
                raise RuntimeError("事务未开始")
            
            try:
                # 提交事务
                self.channel.tx_commit()
                
                self.logger.info(f"✅ 事务提交成功: {self.transaction_id} ({len(self.operations)} 个操作)")
                
                # 重置状态
                self.transaction_active = False
                self.transaction_id = None
                self.operations.clear()
                
                return True
                
            except Exception as e:
                self.logger.error(f"❌ 事务提交失败: {e}")
                # 尝试回滚
                self.rollback_transaction()
                return False
    
    def rollback_transaction(self) -> bool:
        """回滚事务"""
        with self.lock:
            if not self.transaction_active:
                raise RuntimeError("事务未开始")
            
            try:
                # 回滚事务
                self.channel.tx_rollback()
                
                self.logger.info(f"🔄 事务回滚成功: {self.transaction_id}")
                
                # 重置状态
                self.transaction_active = False
                self.transaction_id = None
                self.operations.clear()
                
                return True
                
            except Exception as e:
                self.logger.error(f"❌ 事务回滚失败: {e}")
                return False
    
    def get_transaction_info(self) -> Dict[str, Any]:
        """获取事务信息"""
        with self.lock:
            return {
                'transaction_id': self.transaction_id,
                'active': self.transaction_active,
                'operations_count': len(self.operations),
                'operations': self.operations.copy()
            }
    
    def __enter__(self):
        """上下文管理器入口"""
        self.begin_transaction()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """上下文管理器出口"""
        if exc_type is None:
            # 没有异常,提交事务
            self.commit_transaction()
        else:
            # 有异常,回滚事务
            self.rollback_transaction()
        
        return False  # 不抑制异常

5.2 可靠性保证机制

5.2.1 消息重试机制

class RetryStrategy(Enum):
    """重试策略"""
    FIXED_DELAY = "fixed"      # 固定延迟
    EXPONENTIAL = "exponential"  # 指数退避
    LINEAR = "linear"          # 线性增长
    CUSTOM = "custom"          # 自定义

@dataclass
class RetryConfig:
    """重试配置"""
    strategy: RetryStrategy = RetryStrategy.EXPONENTIAL
    max_attempts: int = 3
    initial_delay: int = 1000  # 毫秒
    max_delay: int = 30000     # 毫秒
    multiplier: float = 2.0
    jitter: bool = True
    dead_letter_enabled: bool = True
    dead_letter_exchange: str = "dlx"
    dead_letter_routing_key: str = "failed"

class MessageRetryManager:
    """消息重试管理器"""
    
    def __init__(self, channel, config: RetryConfig = None):
        self.channel = channel
        self.config = config or RetryConfig()
        self.retry_stats = {
            'total_retries': 0,
            'successful_retries': 0,
            'failed_retries': 0,
            'dead_letters': 0
        }
        self.lock = threading.Lock()
        self.logger = logging.getLogger(__name__)
        
        # 设置死信交换机
        if self.config.dead_letter_enabled:
            self._setup_dead_letter_exchange()
    
    def _setup_dead_letter_exchange(self):
        """设置死信交换机"""
        try:
            # 创建死信交换机
            self.channel.exchange_declare(
                exchange=self.config.dead_letter_exchange,
                exchange_type='direct',
                durable=True
            )
            
            # 创建死信队列
            dlq_name = f"{self.config.dead_letter_exchange}.queue"
            self.channel.queue_declare(
                queue=dlq_name,
                durable=True
            )
            
            # 绑定死信队列
            self.channel.queue_bind(
                exchange=self.config.dead_letter_exchange,
                queue=dlq_name,
                routing_key=self.config.dead_letter_routing_key
            )
            
            self.logger.info(f"✅ 死信交换机设置完成: {self.config.dead_letter_exchange}")
            
        except Exception as e:
            self.logger.error(f"❌ 死信交换机设置失败: {e}")
    
    def create_retry_queue(self, original_queue: str, retry_delay: int) -> str:
        """创建重试队列"""
        retry_queue = f"{original_queue}.retry.{retry_delay}"
        
        try:
            # 设置重试队列参数
            arguments = {
                'x-message-ttl': retry_delay,  # 消息TTL
                'x-dead-letter-exchange': '',  # 死信交换机(默认交换机)
                'x-dead-letter-routing-key': original_queue  # 重新路由到原队列
            }
            
            self.channel.queue_declare(
                queue=retry_queue,
                durable=True,
                arguments=arguments
            )
            
            self.logger.info(f"✅ 重试队列创建成功: {retry_queue}")
            return retry_queue
            
        except Exception as e:
            self.logger.error(f"❌ 重试队列创建失败: {e}")
            raise
    
    def calculate_retry_delay(self, attempt: int) -> int:
        """计算重试延迟"""
        if self.config.strategy == RetryStrategy.FIXED_DELAY:
            delay = self.config.initial_delay
        elif self.config.strategy == RetryStrategy.EXPONENTIAL:
            delay = int(self.config.initial_delay * (self.config.multiplier ** (attempt - 1)))
        elif self.config.strategy == RetryStrategy.LINEAR:
            delay = self.config.initial_delay * attempt
        else:
            delay = self.config.initial_delay
        
        # 限制最大延迟
        delay = min(delay, self.config.max_delay)
        
        # 添加抖动
        if self.config.jitter:
            import random
            jitter_range = int(delay * 0.1)  # 10% 抖动
            delay += random.randint(-jitter_range, jitter_range)
        
        return max(delay, 100)  # 最小100ms
    
    def retry_message(self, original_queue: str, message_body: bytes, 
                     properties: pika.BasicProperties, attempt: int) -> bool:
        """重试消息"""
        if attempt > self.config.max_attempts:
            # 超过最大重试次数,发送到死信队列
            return self._send_to_dead_letter(message_body, properties, original_queue)
        
        try:
            # 计算重试延迟
            retry_delay = self.calculate_retry_delay(attempt)
            
            # 创建重试队列
            retry_queue = self.create_retry_queue(original_queue, retry_delay)
            
            # 更新消息属性
            if properties.headers is None:
                properties.headers = {}
            
            properties.headers['x-retry-count'] = attempt
            properties.headers['x-original-queue'] = original_queue
            properties.headers['x-retry-delay'] = retry_delay
            properties.headers['x-retry-timestamp'] = int(time.time())
            
            # 发送到重试队列
            self.channel.basic_publish(
                exchange='',
                routing_key=retry_queue,
                body=message_body,
                properties=properties
            )
            
            with self.lock:
                self.retry_stats['total_retries'] += 1
            
            self.logger.info(f"🔄 消息重试: 队列={original_queue}, 尝试={attempt}, 延迟={retry_delay}ms")
            return True
            
        except Exception as e:
            with self.lock:
                self.retry_stats['failed_retries'] += 1
            
            self.logger.error(f"❌ 消息重试失败: {e}")
            return False
    
    def _send_to_dead_letter(self, message_body: bytes, properties: pika.BasicProperties, 
                           original_queue: str) -> bool:
        """发送到死信队列"""
        if not self.config.dead_letter_enabled:
            return False
        
        try:
            # 更新消息属性
            if properties.headers is None:
                properties.headers = {}
            
            properties.headers['x-death-reason'] = 'max-retries-exceeded'
            properties.headers['x-original-queue'] = original_queue
            properties.headers['x-death-timestamp'] = int(time.time())
            
            # 发送到死信交换机
            self.channel.basic_publish(
                exchange=self.config.dead_letter_exchange,
                routing_key=self.config.dead_letter_routing_key,
                body=message_body,
                properties=properties
            )
            
            with self.lock:
                self.retry_stats['dead_letters'] += 1
            
            self.logger.warning(f"💀 消息发送到死信队列: {original_queue}")
            return True
            
        except Exception as e:
            self.logger.error(f"❌ 发送到死信队列失败: {e}")
            return False
    
    def get_retry_stats(self) -> Dict[str, int]:
        """获取重试统计"""
        with self.lock:
            return self.retry_stats.copy()
    
    def reset_stats(self):
        """重置统计信息"""
        with self.lock:
            for key in self.retry_stats:
                self.retry_stats[key] = 0

### 5.2.2 消息幂等性保证

```python
import hashlib
from typing import Set, Dict, Optional, Callable
from datetime import datetime, timedelta

class IdempotencyStrategy(Enum):
    """幂等性策略"""
    MESSAGE_ID = "message_id"      # 基于消息ID
    CONTENT_HASH = "content_hash"  # 基于内容哈希
    CUSTOM_KEY = "custom_key"      # 自定义键
    COMPOSITE = "composite"        # 组合策略

@dataclass
class IdempotencyConfig:
    """幂等性配置"""
    strategy: IdempotencyStrategy = IdempotencyStrategy.MESSAGE_ID
    ttl_seconds: int = 3600  # 幂等性记录TTL
    storage_type: str = "memory"  # memory, redis, database
    max_cache_size: int = 10000
    cleanup_interval: int = 300  # 清理间隔(秒)
    hash_algorithm: str = "sha256"

class IdempotencyManager:
    """幂等性管理器"""
    
    def __init__(self, config: IdempotencyConfig = None):
        self.config = config or IdempotencyConfig()
        self.processed_messages: Dict[str, datetime] = {}
        self.lock = threading.Lock()
        self.logger = logging.getLogger(__name__)
        
        # 启动清理线程
        self._start_cleanup_thread()
    
    def _start_cleanup_thread(self):
        """启动清理线程"""
        def cleanup_expired():
            while True:
                try:
                    self._cleanup_expired_records()
                    time.sleep(self.config.cleanup_interval)
                except Exception as e:
                    self.logger.error(f"清理过期记录失败: {e}")
        
        cleanup_thread = threading.Thread(target=cleanup_expired, daemon=True)
        cleanup_thread.start()
    
    def _cleanup_expired_records(self):
        """清理过期记录"""
        current_time = datetime.now()
        expired_keys = []
        
        with self.lock:
            for key, timestamp in self.processed_messages.items():
                if current_time - timestamp > timedelta(seconds=self.config.ttl_seconds):
                    expired_keys.append(key)
            
            for key in expired_keys:
                del self.processed_messages[key]
        
        if expired_keys:
            self.logger.info(f"清理过期幂等性记录: {len(expired_keys)} 条")
    
    def generate_idempotency_key(self, message_body: bytes, 
                                properties: pika.BasicProperties = None,
                                custom_key: str = None) -> str:
        """生成幂等性键"""
        if self.config.strategy == IdempotencyStrategy.MESSAGE_ID:
            if properties and properties.message_id:
                return f"msg_id:{properties.message_id}"
            else:
                # 如果没有消息ID,回退到内容哈希
                return self._generate_content_hash(message_body)
        
        elif self.config.strategy == IdempotencyStrategy.CONTENT_HASH:
            return self._generate_content_hash(message_body)
        
        elif self.config.strategy == IdempotencyStrategy.CUSTOM_KEY:
            if custom_key:
                return f"custom:{custom_key}"
            else:
                raise ValueError("自定义键策略需要提供custom_key参数")
        
        elif self.config.strategy == IdempotencyStrategy.COMPOSITE:
            # 组合策略:消息ID + 内容哈希
            msg_id = properties.message_id if properties and properties.message_id else "no_id"
            content_hash = self._generate_content_hash(message_body)
            return f"composite:{msg_id}:{content_hash}"
        
        else:
            raise ValueError(f"不支持的幂等性策略: {self.config.strategy}")
    
    def _generate_content_hash(self, message_body: bytes) -> str:
        """生成内容哈希"""
        if self.config.hash_algorithm == "md5":
            hash_obj = hashlib.md5()
        elif self.config.hash_algorithm == "sha1":
            hash_obj = hashlib.sha1()
        elif self.config.hash_algorithm == "sha256":
            hash_obj = hashlib.sha256()
        else:
            raise ValueError(f"不支持的哈希算法: {self.config.hash_algorithm}")
        
        hash_obj.update(message_body)
        return f"hash:{hash_obj.hexdigest()}"
    
    def is_duplicate(self, idempotency_key: str) -> bool:
        """检查是否为重复消息"""
        with self.lock:
            return idempotency_key in self.processed_messages
    
    def mark_as_processed(self, idempotency_key: str):
        """标记消息为已处理"""
        with self.lock:
            # 检查缓存大小限制
            if len(self.processed_messages) >= self.config.max_cache_size:
                # 移除最旧的记录
                oldest_key = min(self.processed_messages.keys(), 
                                key=lambda k: self.processed_messages[k])
                del self.processed_messages[oldest_key]
            
            self.processed_messages[idempotency_key] = datetime.now()
    
    def process_message_idempotent(self, message_body: bytes, 
                                  properties: pika.BasicProperties,
                                  processor: Callable[[bytes, pika.BasicProperties], bool],
                                  custom_key: str = None) -> bool:
        """幂等性消息处理"""
        # 生成幂等性键
        idempotency_key = self.generate_idempotency_key(message_body, properties, custom_key)
        
        # 检查是否为重复消息
        if self.is_duplicate(idempotency_key):
            self.logger.info(f"🔄 检测到重复消息,跳过处理: {idempotency_key}")
            return True  # 重复消息视为处理成功
        
        try:
            # 处理消息
            result = processor(message_body, properties)
            
            if result:
                # 处理成功,标记为已处理
                self.mark_as_processed(idempotency_key)
                self.logger.info(f"✅ 消息处理成功: {idempotency_key}")
            else:
                self.logger.error(f"❌ 消息处理失败: {idempotency_key}")
            
            return result
            
        except Exception as e:
            self.logger.error(f"❌ 消息处理异常: {idempotency_key}, 错误: {e}")
            return False
    
    def get_processed_count(self) -> int:
        """获取已处理消息数量"""
        with self.lock:
            return len(self.processed_messages)
    
    def clear_all_records(self):
        """清空所有记录"""
        with self.lock:
            self.processed_messages.clear()
        self.logger.info("清空所有幂等性记录")

### 5.2.3 高可用性配置

```python
class HAMode(Enum):
    """高可用模式"""
    NONE = "none"              # 无高可用
    MIRRORED = "mirrored"      # 镜像队列
    QUORUM = "quorum"          # 仲裁队列
    CLASSIC_HA = "classic_ha"  # 经典高可用

@dataclass
class HAConfig:
    """高可用配置"""
    mode: HAMode = HAMode.QUORUM
    replication_factor: int = 3
    sync_mode: str = "automatic"  # automatic, manual
    promotion_on_shutdown: str = "when-synced"  # when-synced, always
    promotion_on_failure: str = "when-synced"
    max_length: Optional[int] = None
    max_length_bytes: Optional[int] = None
    overflow: str = "drop-head"  # drop-head, reject-publish

class HighAvailabilityManager:
    """高可用管理器"""
    
    def __init__(self, channel, config: HAConfig = None):
        self.channel = channel
        self.config = config or HAConfig()
        self.logger = logging.getLogger(__name__)
    
    def create_ha_queue(self, queue_name: str, 
                       additional_args: Optional[Dict[str, Any]] = None) -> bool:
        """创建高可用队列"""
        try:
            # 基础参数
            arguments = additional_args or {}
            
            if self.config.mode == HAMode.MIRRORED:
                # 镜像队列配置
                arguments.update({
                    'x-ha-policy': 'exactly',
                    'x-ha-policy-params': self.config.replication_factor,
                    'x-ha-sync-mode': self.config.sync_mode,
                    'x-ha-promote-on-shutdown': self.config.promotion_on_shutdown,
                    'x-ha-promote-on-failure': self.config.promotion_on_failure
                })
            
            elif self.config.mode == HAMode.QUORUM:
                # 仲裁队列配置
                arguments.update({
                    'x-queue-type': 'quorum',
                    'x-quorum-initial-group-size': self.config.replication_factor
                })
                
                # 可选的长度限制
                if self.config.max_length:
                    arguments['x-max-length'] = self.config.max_length
                if self.config.max_length_bytes:
                    arguments['x-max-length-bytes'] = self.config.max_length_bytes
                if self.config.overflow:
                    arguments['x-overflow'] = self.config.overflow
            
            elif self.config.mode == HAMode.CLASSIC_HA:
                # 经典高可用配置
                arguments.update({
                    'x-ha-policy': 'all',  # 所有节点
                    'x-ha-sync-mode': 'automatic'
                })
            
            # 声明队列
            self.channel.queue_declare(
                queue=queue_name,
                durable=True,
                exclusive=False,
                auto_delete=False,
                arguments=arguments
            )
            
            self.logger.info(f"✅ 高可用队列创建成功: {queue_name} (模式: {self.config.mode.value})")
            return True
            
        except Exception as e:
            self.logger.error(f"❌ 高可用队列创建失败: {e}")
            return False
    
    def create_ha_exchange(self, exchange_name: str, exchange_type: str = 'direct',
                          additional_args: Optional[Dict[str, Any]] = None) -> bool:
        """创建高可用交换机"""
        try:
            arguments = additional_args or {}
            
            # 交换机本身是无状态的,但可以设置一些高可用相关的参数
            if self.config.mode in [HAMode.MIRRORED, HAMode.QUORUM]:
                arguments['x-ha-policy'] = 'all'
            
            self.channel.exchange_declare(
                exchange=exchange_name,
                exchange_type=exchange_type,
                durable=True,
                auto_delete=False,
                internal=False,
                arguments=arguments
            )
            
            self.logger.info(f"✅ 高可用交换机创建成功: {exchange_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"❌ 高可用交换机创建失败: {e}")
            return False
    
    def setup_ha_topology(self, topology_config: Dict[str, Any]) -> bool:
        """设置高可用拓扑"""
        try:
            # 创建交换机
            for exchange_config in topology_config.get('exchanges', []):
                self.create_ha_exchange(
                    exchange_name=exchange_config['name'],
                    exchange_type=exchange_config.get('type', 'direct'),
                    additional_args=exchange_config.get('arguments', {})
                )
            
            # 创建队列
            for queue_config in topology_config.get('queues', []):
                self.create_ha_queue(
                    queue_name=queue_config['name'],
                    additional_args=queue_config.get('arguments', {})
                )
            
            # 创建绑定
            for binding_config in topology_config.get('bindings', []):
                self.channel.queue_bind(
                    exchange=binding_config['exchange'],
                    queue=binding_config['queue'],
                    routing_key=binding_config.get('routing_key', '')
                )
            
            self.logger.info("✅ 高可用拓扑设置完成")
            return True
            
        except Exception as e:
            self.logger.error(f"❌ 高可用拓扑设置失败: {e}")
            return False
    
    def get_queue_info(self, queue_name: str) -> Optional[Dict[str, Any]]:
        """获取队列信息"""
        try:
            # 这里需要使用管理API来获取队列详细信息
            # 简化示例,实际应该调用RabbitMQ管理API
            method = self.channel.queue_declare(queue=queue_name, passive=True)
            
            return {
                'name': queue_name,
                'message_count': method.method.message_count,
                'consumer_count': method.method.consumer_count,
                'durable': True  # 假设是持久化队列
            }
            
        except Exception as e:
            self.logger.error(f"❌ 获取队列信息失败: {e}")
            return None

## 5.3 完整示例

### 5.3.1 可靠消息发布示例

```python
def reliable_message_publishing_example():
    """可靠消息发布示例"""
    import pika
    import json
    import time
    from datetime import datetime
    
    # 连接配置
    connection_params = pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=pika.PlainCredentials('guest', 'guest'),
        heartbeat=600,
        blocked_connection_timeout=300
    )
    
    try:
        # 建立连接
        connection = pika.BlockingConnection(connection_params)
        channel = connection.channel()
        
        print("🔗 RabbitMQ连接建立成功")
        
        # 1. 配置持久化管理器
        persistence_config = PersistenceConfig(
            exchange_durable=True,
            queue_durable=True,
            message_persistent=True,
            delivery_mode=DeliveryMode.PERSISTENT,
            sync_writes=True,
            backup_enabled=True
        )
        
        persistence_manager = PersistenceManager(channel, persistence_config)
        
        # 2. 配置确认管理器
        confirmation_config = ConfirmationConfig(
            mode=ConfirmationMode.SIMPLE,
            timeout=5000,
            retry_count=3
        )
        
        confirmation_manager = MessageConfirmationManager(channel, confirmation_config)
        
        # 3. 配置高可用管理器
        ha_config = HAConfig(
            mode=HAMode.QUORUM,
            replication_factor=3
        )
        
        ha_manager = HighAvailabilityManager(channel, ha_config)
        
        # 4. 设置拓扑
        exchange_name = "reliable.exchange"
        queue_name = "reliable.queue"
        routing_key = "reliable.message"
        
        # 创建高可用交换机和队列
        ha_manager.create_ha_exchange(exchange_name, 'direct')
        ha_manager.create_ha_queue(queue_name)
        
        # 绑定队列
        channel.queue_bind(
            exchange=exchange_name,
            queue=queue_name,
            routing_key=routing_key
        )
        
        print(f"📋 拓扑设置完成: {exchange_name} -> {queue_name}")
        
        # 5. 使用事务发布消息
        transaction_manager = TransactionManager(channel)
        
        with transaction_manager:
            for i in range(5):
                message = {
                    'id': f"msg_{i+1}",
                    'content': f"可靠消息内容 {i+1}",
                    'timestamp': datetime.now().isoformat(),
                    'priority': 'high' if i % 2 == 0 else 'normal'
                }
                
                # 设置消息属性
                properties = pika.BasicProperties(
                    delivery_mode=DeliveryMode.PERSISTENT.value,
                    message_id=message['id'],
                    timestamp=int(time.time()),
                    content_type='application/json',
                    headers={
                        'source': 'reliable_publisher',
                        'version': '1.0'
                    }
                )
                
                # 在事务中发布消息
                success = transaction_manager.publish_in_transaction(
                    exchange=exchange_name,
                    routing_key=routing_key,
                    body=json.dumps(message, ensure_ascii=False).encode('utf-8'),
                    properties=properties
                )
                
                if success:
                    print(f"📤 消息 {message['id']} 已加入事务")
                else:
                    print(f"❌ 消息 {message['id']} 加入事务失败")
        
        print("✅ 事务提交完成,所有消息已可靠发布")
        
        # 6. 获取统计信息
        persistence_stats = persistence_manager.get_persistence_stats()
        print(f"📊 持久化统计: {persistence_stats}")
        
        pending_confirmations = confirmation_manager.get_pending_confirmations()
        print(f"⏳ 待确认消息: {len(pending_confirmations)} 条")
        
    except Exception as e:
        print(f"❌ 可靠消息发布失败: {e}")
    
    finally:
        if 'connection' in locals() and connection.is_open:
            connection.close()
            print("🔌 连接已关闭")

### 5.3.2 可靠消息消费示例

```python
def reliable_message_consumption_example():
    """可靠消息消费示例"""
    import pika
    import json
    import time
    from datetime import datetime
    
    # 连接配置
    connection_params = pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=pika.PlainCredentials('guest', 'guest')
    )
    
    try:
        # 建立连接
        connection = pika.BlockingConnection(connection_params)
        channel = connection.channel()
        
        print("🔗 RabbitMQ连接建立成功")
        
        # 1. 配置重试管理器
        retry_config = RetryConfig(
            strategy=RetryStrategy.EXPONENTIAL,
            max_attempts=3,
            initial_delay=1000,
            max_delay=30000,
            dead_letter_enabled=True
        )
        
        retry_manager = MessageRetryManager(channel, retry_config)
        
        # 2. 配置幂等性管理器
        idempotency_config = IdempotencyConfig(
            strategy=IdempotencyStrategy.MESSAGE_ID,
            ttl_seconds=3600
        )
        
        idempotency_manager = IdempotencyManager(idempotency_config)
        
        # 3. 定义消息处理器
        def process_message(body: bytes, properties: pika.BasicProperties) -> bool:
            """处理消息"""
            try:
                # 解析消息
                message = json.loads(body.decode('utf-8'))
                
                print(f"📨 处理消息: {message['id']} - {message['content']}")
                
                # 模拟处理逻辑
                if message.get('priority') == 'high':
                    # 高优先级消息,快速处理
                    time.sleep(0.1)
                else:
                    # 普通消息,正常处理
                    time.sleep(0.5)
                
                # 模拟处理失败(10%概率)
                import random
                if random.random() < 0.1:
                    raise Exception("模拟处理失败")
                
                print(f"✅ 消息处理成功: {message['id']}")
                return True
                
            except Exception as e:
                print(f"❌ 消息处理失败: {e}")
                return False
        
        # 4. 定义消息回调
        def message_callback(channel, method, properties, body):
            """消息回调"""
            print(f"📥 收到消息: delivery_tag={method.delivery_tag}")
            
            # 获取重试次数
            retry_count = 0
            if properties.headers and 'x-retry-count' in properties.headers:
                retry_count = properties.headers['x-retry-count']
            
            # 幂等性处理
            success = idempotency_manager.process_message_idempotent(
                message_body=body,
                properties=properties,
                processor=process_message
            )
            
            if success:
                # 处理成功,确认消息
                channel.basic_ack(delivery_tag=method.delivery_tag)
                print(f"✅ 消息确认: delivery_tag={method.delivery_tag}")
            else:
                # 处理失败,尝试重试
                retry_success = retry_manager.retry_message(
                    original_queue=method.routing_key,
                    message_body=body,
                    properties=properties,
                    attempt=retry_count + 1
                )
                
                if retry_success:
                    # 重试成功,确认原消息
                    channel.basic_ack(delivery_tag=method.delivery_tag)
                    print(f"🔄 消息已重试: delivery_tag={method.delivery_tag}")
                else:
                    # 重试失败,拒绝消息
                    channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                    print(f"❌ 消息拒绝: delivery_tag={method.delivery_tag}")
        
        # 5. 设置消费者
        queue_name = "reliable.queue"
        
        # 设置QoS
        channel.basic_qos(prefetch_count=10)
        
        # 开始消费
        channel.basic_consume(
            queue=queue_name,
            on_message_callback=message_callback,
            auto_ack=False  # 手动确认
        )
        
        print(f"🎯 开始消费队列: {queue_name}")
        print("按 Ctrl+C 停止消费...")
        
        # 开始消费循环
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            print("\n⏹️ 停止消费")
            channel.stop_consuming()
        
        # 6. 显示统计信息
        retry_stats = retry_manager.get_retry_stats()
        print(f"📊 重试统计: {retry_stats}")
        
        processed_count = idempotency_manager.get_processed_count()
        print(f"🔄 已处理消息数: {processed_count}")
        
    except Exception as e:
        print(f"❌ 可靠消息消费失败: {e}")
    
    finally:
        if 'connection' in locals() and connection.is_open:
            connection.close()
            print("🔌 连接已关闭")

# 运行示例
if __name__ == "__main__":
    print("=== RabbitMQ 消息持久化与可靠性保证示例 ===")
    
    # 运行发布示例
    print("\n1. 可靠消息发布示例:")
    reliable_message_publishing_example()
    
    time.sleep(2)
    
    # 运行消费示例
    print("\n2. 可靠消息消费示例:")
    reliable_message_consumption_example()

5.4 本章总结

5.4.1 核心知识点

  1. 消息持久化机制

    • 交换机持久化:durable=True
    • 队列持久化:durable=True
    • 消息持久化:delivery_mode=2
    • 同步写入和备份策略
  2. 消息确认机制

    • 发布确认:confirm_delivery()
    • 消费确认:basic_ack(), basic_nack()
    • 批量确认和异步确认
    • 确认回调处理
  3. 事务管理

    • 本地事务:tx_select(), tx_commit(), tx_rollback()
    • 事务上下文管理
    • 事务操作记录和回滚
  4. 消息重试机制

    • 重试策略:固定延迟、指数退避、线性增长
    • 死信队列处理
    • 重试次数限制
    • 延迟重试队列
  5. 消息幂等性

    • 幂等性键生成策略
    • 重复消息检测
    • 处理记录管理
    • TTL和清理机制
  6. 高可用性配置

    • 镜像队列:x-ha-policy
    • 仲裁队列:x-queue-type=quorum
    • 复制因子配置
    • 故障转移策略

5.4.2 最佳实践

  1. 持久化策略

    • 根据业务重要性选择持久化级别
    • 平衡性能和可靠性需求
    • 定期备份重要数据
  2. 确认机制

    • 生产环境必须启用发布确认
    • 合理设置确认超时时间
    • 实现确认失败的重试逻辑
  3. 事务使用

    • 谨慎使用事务,注意性能影响
    • 事务范围尽可能小
    • 实现事务超时和回滚机制
  4. 重试策略

    • 设置合理的重试次数和延迟
    • 使用指数退避避免系统过载
    • 配置死信队列处理最终失败
  5. 幂等性设计

    • 选择合适的幂等性键策略
    • 设置合理的记录TTL
    • 定期清理过期记录
  6. 高可用部署

    • 使用仲裁队列替代镜像队列
    • 设置合适的复制因子
    • 监控集群健康状态

5.4.3 练习题

  1. 基础练习

    • 实现一个支持持久化的消息发布器
    • 创建带有确认机制的消息消费者
    • 配置死信队列处理失败消息
  2. 进阶练习

    • 实现基于内容哈希的幂等性管理器
    • 设计支持多种重试策略的重试管理器
    • 创建高可用的消息处理系统
  3. 综合练习

    • 构建完整的可靠消息传递系统
    • 实现消息的端到端可靠性保证
    • 设计消息处理的监控和告警机制
  4. 性能优化

    • 优化持久化操作的性能
    • 实现批量确认机制
    • 设计高吞吐量的可靠消息系统 “`