7.1 事务消息

7.1.1 事务消息原理

from dataclasses import dataclass
from typing import List, Dict, Optional, Callable
from enum import Enum
import time
import uuid
import threading
from abc import ABC, abstractmethod

class TransactionState(Enum):
    """Outcome of a local transaction as reported to the producer.

    The producer commits on COMMIT, rolls back on ROLLBACK, and treats any
    other value as "not decided yet" (the message stays pending for check-back).
    """
    UNKNOWN = "unknown"          # outcome not known yet; wait for check-back
    COMMIT = "commit"            # commit the transaction
    ROLLBACK = "rollback"        # roll back the transaction
    PREPARED = "prepared"        # initial state of a freshly stored half message

class MessageType(Enum):
    """Kind of message carried by a TransactionMessage."""
    NORMAL = "normal"            # ordinary message
    TRANSACTION = "transaction"  # transactional message (set once a half message is committed)
    HALF = "half"                # half message (stored, awaiting the local transaction outcome)

@dataclass
class TransactionMessage:
    """A message taking part in the transactional send flow.

    Instances are mutated in place by the producer as the transaction
    progresses (state, message_type, updated_time and retry_count change).
    """
    message_id: str                 # unique id of this message
    topic: str
    tag: str
    body: str
    transaction_id: str             # key under which the producer tracks the half message
    state: TransactionState         # current transaction state
    message_type: MessageType       # HALF until committed, then TRANSACTION
    properties: Dict[str, str]
    created_time: float             # epoch seconds (time.time()) at creation
    updated_time: float             # epoch seconds of the last state change or check-back
    retry_count: int = 0            # number of check-backs performed so far
    max_retry: int = 3              # check-backs allowed before the producer rolls back

class TransactionListener(ABC):
    """Callback interface the producer uses to run and re-check local transactions."""
    
    @abstractmethod
    def execute_local_transaction(self, message: TransactionMessage, args: Dict) -> TransactionState:
        """Run the local transaction for ``message``.

        Args:
            message: The half message that was just stored by the producer.
            args: Caller-supplied arguments for the local transaction.

        Returns:
            COMMIT or ROLLBACK to finish the transaction immediately; any other
            state leaves the message pending for a later check-back.
        """
        pass
    
    @abstractmethod
    def check_local_transaction(self, message: TransactionMessage) -> TransactionState:
        """Report the current outcome of the local transaction behind ``message``.

        Called by the producer's check-back logic for messages still pending.
        """
        pass

class TransactionProducer:
    """Simulated transactional-message producer.

    ``send_message_in_transaction`` stores a half message in ``half_messages``,
    runs the listener's local transaction, then commits or rolls back. Messages
    whose outcome is still unknown stay in ``half_messages`` and are re-checked
    by a background thread started by ``start``.

    ``half_messages`` is touched by both the caller's thread and the check
    thread, so every access to it goes through ``_lock``.
    """

    def __init__(self, producer_group: str, nameserver_addr: str):
        """Create a stopped producer.

        Args:
            producer_group: Name printed in the start/shutdown log lines.
            nameserver_addr: Stored as-is; not used elsewhere in this class.
        """
        self.producer_group = producer_group
        self.nameserver_addr = nameserver_addr
        self.transaction_listener: Optional[TransactionListener] = None
        self.half_messages: Dict[str, TransactionMessage] = {}
        self.check_thread_pool_size = 5
        self.check_interval = 60  # seconds a pending message may sit before check-back
        self.check_poll_interval = 30  # seconds between scans of the pending messages
        self._running = False
        self._check_thread = None
        self._lock = threading.Lock()  # guards half_messages
        self._stop_event = threading.Event()  # lets shutdown() interrupt the poll wait

    def set_transaction_listener(self, listener: TransactionListener):
        """Set the listener used for local transactions and check-backs."""
        self.transaction_listener = listener

    def start(self):
        """Start the background check-back thread.

        Calling ``start`` while the thread is already alive is a no-op, so a
        second call cannot spawn a duplicate checker.

        Raises:
            ValueError: If no transaction listener has been set.
        """
        if not self.transaction_listener:
            raise ValueError("事务监听器未设置")

        if self._check_thread is not None and self._check_thread.is_alive():
            return

        self._stop_event.clear()
        self._running = True
        self._check_thread = threading.Thread(target=self._check_transaction_status)
        self._check_thread.daemon = True
        self._check_thread.start()

        print(f"事务消息生产者已启动: {self.producer_group}")

    def shutdown(self):
        """Stop the check-back thread and wait for it to exit.

        The stop event wakes the thread immediately instead of waiting for the
        current poll interval to elapse.
        """
        self._running = False
        self._stop_event.set()
        if self._check_thread:
            self._check_thread.join()
        print(f"事务消息生产者已关闭: {self.producer_group}")

    def send_message_in_transaction(self, topic: str, tag: str, body: str, 
                                  local_transaction_args: Dict = None) -> TransactionMessage:
        """Send a transactional message.

        Stores a half message, executes the listener's local transaction, then
        commits, rolls back, or leaves the message pending for check-back.

        Args:
            topic: Message topic.
            tag: Message tag.
            body: Message body.
            local_transaction_args: Arguments passed to the listener; an empty
                dict is passed when omitted.

        Returns:
            The message object, reflecting the state reached by this call.

        Raises:
            ValueError: If no transaction listener has been set.
        """
        if not self.transaction_listener:
            raise ValueError("事务监听器未设置")

        # 1. Build and store the half message.
        now = time.time()
        message = TransactionMessage(
            message_id=str(uuid.uuid4()),
            topic=topic,
            tag=tag,
            body=body,
            transaction_id=str(uuid.uuid4()),
            state=TransactionState.PREPARED,
            message_type=MessageType.HALF,
            properties={},
            created_time=now,
            updated_time=now
        )

        with self._lock:
            self.half_messages[message.transaction_id] = message
        print(f"发送半消息: {message.message_id}, 事务ID: {message.transaction_id}")

        # 2. Execute the local transaction (outside the lock: it is user code).
        try:
            local_state = self.transaction_listener.execute_local_transaction(
                message, local_transaction_args or {}
            )

            # 3. Commit or roll back according to the local outcome.
            if local_state == TransactionState.COMMIT:
                self._commit_transaction(message)
            elif local_state == TransactionState.ROLLBACK:
                self._rollback_transaction(message)
            else:
                # Outcome unknown: keep the half message for the check thread.
                message.state = TransactionState.UNKNOWN
                message.updated_time = time.time()
                print(f"本地事务状态未知,等待回查: {message.transaction_id}")

        except Exception as e:
            # A failing local transaction is treated as a rollback.
            print(f"执行本地事务失败: {e}")
            self._rollback_transaction(message)

        return message

    def _commit_transaction(self, message: TransactionMessage):
        """Mark ``message`` committed and drop it from the pending half messages."""
        message.state = TransactionState.COMMIT
        message.message_type = MessageType.TRANSACTION
        message.updated_time = time.time()

        # Stand-in for delivering the real message to the broker.
        print(f"提交事务消息: {message.message_id}, 事务ID: {message.transaction_id}")

        with self._lock:
            self.half_messages.pop(message.transaction_id, None)

    def _rollback_transaction(self, message: TransactionMessage):
        """Mark ``message`` rolled back and drop it from the pending half messages."""
        message.state = TransactionState.ROLLBACK
        message.updated_time = time.time()

        print(f"回滚事务消息: {message.message_id}, 事务ID: {message.transaction_id}")

        with self._lock:
            self.half_messages.pop(message.transaction_id, None)

    def _check_transaction_status(self):
        """Background loop: check back every pending message older than ``check_interval``."""
        while self._running:
            try:
                current_time = time.time()

                # Snapshot under the lock so concurrent sends/commits cannot
                # change the dict while it is being iterated.
                with self._lock:
                    pending = list(self.half_messages.values())

                expired_messages = [
                    message for message in pending
                    if current_time - message.updated_time > self.check_interval
                ]

                for message in expired_messages:
                    self._check_local_transaction_status(message)

            except Exception as e:
                print(f"检查事务状态异常: {e}")

            # Wait outside the try so a failing scan cannot spin without pause;
            # wait() returns True as soon as shutdown() sets the event.
            if self._stop_event.wait(self.check_poll_interval):
                break

    def _check_local_transaction_status(self, message: TransactionMessage):
        """Ask the listener for the outcome of one pending message.

        Rolls the message back once ``max_retry`` check-backs have been used.
        Messages that are no longer pending are skipped.
        """
        try:
            # The message may have been resolved since the snapshot was taken.
            with self._lock:
                still_pending = message.transaction_id in self.half_messages
            if not still_pending:
                return

            if message.retry_count >= message.max_retry:
                print(f"事务回查超过最大重试次数,回滚事务: {message.transaction_id}")
                self._rollback_transaction(message)
                return

            message.retry_count += 1
            print(f"回查本地事务状态: {message.transaction_id}, 重试次数: {message.retry_count}")

            local_state = self.transaction_listener.check_local_transaction(message)

            if local_state == TransactionState.COMMIT:
                self._commit_transaction(message)
            elif local_state == TransactionState.ROLLBACK:
                self._rollback_transaction(message)
            else:
                # Still unknown: refresh the timestamp so the next check-back
                # happens only after another full check_interval.
                message.updated_time = time.time()
                print(f"本地事务状态仍然未知: {message.transaction_id}")

        except Exception as e:
            print(f"回查本地事务状态异常: {e}")
            message.updated_time = time.time()

# 示例事务监听器实现
class OrderTransactionListener(TransactionListener):
    """Demo listener whose local transaction is "create an order".

    Outcomes are remembered per transaction id in ``local_transactions`` so a
    later check-back can report them.
    """

    def __init__(self):
        # transaction_id -> True (order created) / False (order failed)
        self.local_transactions: Dict[str, bool] = {}

    def execute_local_transaction(self, message: TransactionMessage, args: Dict) -> TransactionState:
        """Create the order described by ``args`` and record whether it succeeded.

        Reads 'order_id', 'user_id' and 'amount' from ``args``. A positive
        amount commits; anything else, including any exception, rolls back.
        """
        try:
            order_id, user_id, amount = (
                args.get(field) for field in ('order_id', 'user_id', 'amount')
            )

            print(f"执行本地事务 - 创建订单: {order_id}, 用户: {user_id}, 金额: {amount}")

            # Simulated order creation: only a positive amount succeeds.
            succeeded = True if amount > 0 else False
            self.local_transactions[message.transaction_id] = succeeded

            if succeeded:
                print(f"订单创建成功: {order_id}")
                return TransactionState.COMMIT

            print(f"订单创建失败: {order_id}")
            return TransactionState.ROLLBACK

        except Exception as e:
            print(f"执行本地事务异常: {e}")
            self.local_transactions[message.transaction_id] = False
            return TransactionState.ROLLBACK

    def check_local_transaction(self, message: TransactionMessage) -> TransactionState:
        """Return the recorded outcome for ``message``, or UNKNOWN if none exists."""
        tx_id = message.transaction_id

        if tx_id not in self.local_transactions:
            print(f"未找到本地事务记录: {tx_id}")
            return TransactionState.UNKNOWN

        outcome = self.local_transactions[tx_id]
        print(f"回查本地事务状态: {tx_id}, 结果: {outcome}")
        if outcome:
            return TransactionState.COMMIT
        return TransactionState.ROLLBACK

# 使用示例
if __name__ == "__main__":
    # Wire the demo listener into a producer and start its check-back thread.
    listener = OrderTransactionListener()
    producer = TransactionProducer("order_producer_group", "localhost:9876")
    producer.set_transaction_listener(listener)
    producer.start()

    try:
        print("=== 发送事务消息示例 ===")

        # The first order has a positive amount (commits); the second has a
        # negative amount, which makes the local transaction fail (rolls back).
        demo_orders = [
            {'order_id': 'ORDER_001', 'user_id': 'USER_123', 'amount': 100.0},
            {'order_id': 'ORDER_002', 'user_id': 'USER_456', 'amount': -50.0},
        ]
        message1, message2 = (
            producer.send_message_in_transaction(
                topic="order_topic",
                tag="create_order",
                body="订单创建消息",
                local_transaction_args=order_args,
            )
            for order_args in demo_orders
        )

        # Give the background thread a moment before inspecting the state.
        time.sleep(5)

        print(f"\n当前半消息数量: {len(producer.half_messages)}")
        for tid, msg in producer.half_messages.items():
            print(f"  事务ID: {tid}, 状态: {msg.state.value}")

    finally:
        producer.shutdown()

7.1.2 事务消息最佳实践

from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
import json
import time

class TransactionPattern(Enum):
    """Distributed-transaction patterns catalogued by this module."""
    SAGA = "saga"                    # Saga pattern
    TCC = "tcc"                      # TCC (Try-Confirm-Cancel) pattern
    TWO_PHASE_COMMIT = "2pc"         # two-phase commit
    MESSAGE_QUEUE = "mq"             # message-queue based transaction

class CompensationAction(Enum):
    """Ways of reacting to a failed transaction step."""
    RETRY = "retry"                  # retry the operation
    COMPENSATE = "compensate"        # run a compensating operation
    MANUAL = "manual"                # hand over to manual handling
    IGNORE = "ignore"                # ignore the failure

@dataclass
class TransactionStep:
    """One step of a Saga: an action on a service plus its compensation."""
    step_id: str
    service_name: str
    action: str                  # name of the forward action
    compensation_action: str     # name of the action that undoes `action`
    timeout: int
    retry_count: int
    parameters: Dict
    dependencies: List[str]  # ids of the steps this step depends on

@dataclass
class TransactionSaga:
    """A Saga transaction: an ordered list of steps plus progress bookkeeping."""
    saga_id: str
    steps: List[TransactionStep]
    current_step: int            # index into `steps`
    status: str
    created_time: float          # epoch seconds (time.time())
    updated_time: float          # epoch seconds (time.time())
    context: Dict  # transaction context shared by the saga

class TransactionBestPractices:
    """Catalogue of transactional-messaging guidance.

    Holds three static data sets built at construction time (best practices,
    transaction patterns, compensation strategies), can print the first two,
    and can build a TransactionSaga from plain step dictionaries.
    """
    
    def __init__(self):
        self.practices = self._create_best_practices()
        self.patterns = self._create_transaction_patterns()
        self.compensation_strategies = self._create_compensation_strategies()
    
    def _create_best_practices(self) -> List[Dict]:
        """Return the best-practice entries.

        Each entry is a dict with the keys 'category', 'title', 'description',
        'implementation' (list of bullet points) and 'example' (code text).
        """
        return [
            {
                "category": "设计原则",
                "title": "幂等性设计",
                "description": "确保事务操作具有幂等性,避免重复执行产生副作用",
                "implementation": [
                    "使用唯一业务ID标识每个操作",
                    "在数据库中记录操作状态",
                    "实现幂等性检查逻辑",
                    "使用版本号或时间戳防止并发冲突"
                ],
                "example": """
# 幂等性实现示例
class IdempotentService:
    def __init__(self):
        self.operation_records = {}  # 操作记录
    
    def execute_with_idempotent(self, operation_id: str, operation_func, *args, **kwargs):
        # 检查是否已执行
        if operation_id in self.operation_records:
            return self.operation_records[operation_id]
        
        # 执行操作
        result = operation_func(*args, **kwargs)
        
        # 记录操作结果
        self.operation_records[operation_id] = result
        return result
"""
            },
            {
                "category": "设计原则",
                "title": "最终一致性",
                "description": "接受短期的数据不一致,通过异步处理达到最终一致性",
                "implementation": [
                    "设计合理的补偿机制",
                    "实现数据同步检查",
                    "提供数据修复工具",
                    "监控数据一致性状态"
                ],
                "example": """
# 最终一致性检查
class ConsistencyChecker:
    def check_data_consistency(self, transaction_id: str):
        # 检查各个服务的数据状态
        order_status = self.get_order_status(transaction_id)
        payment_status = self.get_payment_status(transaction_id)
        inventory_status = self.get_inventory_status(transaction_id)
        
        # 检查一致性
        if not self.is_consistent(order_status, payment_status, inventory_status):
            # 触发数据修复
            self.trigger_data_repair(transaction_id)
"""
            },
            {
                "category": "错误处理",
                "title": "超时处理策略",
                "description": "合理设置超时时间,实现超时后的处理策略",
                "implementation": [
                    "设置合理的超时时间",
                    "实现超时重试机制",
                    "提供超时补偿逻辑",
                    "记录超时事件用于分析"
                ],
                "example": """
# 超时处理示例
class TimeoutHandler:
    def __init__(self, default_timeout=30):
        self.default_timeout = default_timeout
    
    def execute_with_timeout(self, func, timeout=None, retry_count=3):
        timeout = timeout or self.default_timeout
        
        for attempt in range(retry_count):
            try:
                # 设置超时执行
                result = self.execute_with_deadline(func, timeout)
                return result
            except TimeoutError:
                if attempt == retry_count - 1:
                    # 最后一次重试失败,执行补偿
                    self.handle_timeout_compensation()
                    raise
                # 指数退避重试
                time.sleep(2 ** attempt)
"""
            },
            {
                "category": "性能优化",
                "title": "批量处理",
                "description": "对于大量事务消息,采用批量处理提升性能",
                "implementation": [
                    "实现消息批量发送",
                    "批量执行本地事务",
                    "批量提交或回滚",
                    "优化数据库批量操作"
                ],
                "example": """
# 批量事务处理
class BatchTransactionProcessor:
    def __init__(self, batch_size=100):
        self.batch_size = batch_size
        self.pending_transactions = []
    
    def add_transaction(self, transaction):
        self.pending_transactions.append(transaction)
        
        if len(self.pending_transactions) >= self.batch_size:
            self.process_batch()
    
    def process_batch(self):
        if not self.pending_transactions:
            return
        
        # 批量执行本地事务
        results = self.execute_batch_local_transactions(self.pending_transactions)
        
        # 批量提交或回滚
        self.batch_commit_or_rollback(results)
        
        self.pending_transactions.clear()
"""
            },
            {
                "category": "监控告警",
                "title": "事务监控",
                "description": "建立完善的事务监控和告警机制",
                "implementation": [
                    "监控事务成功率",
                    "监控事务执行时间",
                    "监控回查频率",
                    "设置异常告警"
                ],
                "example": """
# 事务监控
class TransactionMonitor:
    def __init__(self):
        self.metrics = {
            'total_transactions': 0,
            'successful_transactions': 0,
            'failed_transactions': 0,
            'timeout_transactions': 0,
            'check_back_count': 0
        }
    
    def record_transaction_result(self, result: str):
        self.metrics['total_transactions'] += 1
        self.metrics[f'{result}_transactions'] += 1
        
        # 检查告警条件
        self.check_alert_conditions()
    
    def check_alert_conditions(self):
        total = self.metrics['total_transactions']
        if total > 0:
            failure_rate = self.metrics['failed_transactions'] / total
            if failure_rate > 0.1:  # 失败率超过10%
                self.send_alert(f"事务失败率过高: {failure_rate:.2%}")
"""
            }
        ]
    
    def _create_transaction_patterns(self) -> Dict:
        """Return pattern descriptions keyed by TransactionPattern member.

        Covers SAGA, TCC and MESSAGE_QUEUE; each value has a name, a
        description, and lists of use cases, advantages and drawbacks.
        """
        return {
            TransactionPattern.SAGA: {
                "name": "Saga模式",
                "description": "将长事务分解为多个短事务,每个短事务都有对应的补偿操作",
                "适用场景": [
                    "跨多个微服务的长事务",
                    "对一致性要求不是特别严格的场景",
                    "需要高可用性的分布式系统"
                ],
                "优点": [
                    "避免长时间锁定资源",
                    "提高系统可用性",
                    "支持复杂的业务流程"
                ],
                "缺点": [
                    "实现复杂度较高",
                    "需要设计补偿逻辑",
                    "数据一致性较弱"
                ]
            },
            TransactionPattern.TCC: {
                "name": "TCC模式",
                "description": "Try-Confirm-Cancel三阶段提交模式",
                "适用场景": [
                    "对一致性要求较高的场景",
                    "资源预留类业务",
                    "金融交易系统"
                ],
                "优点": [
                    "强一致性保证",
                    "性能较好",
                    "支持嵌套事务"
                ],
                "缺点": [
                    "业务侵入性强",
                    "实现复杂",
                    "需要预留资源"
                ]
            },
            TransactionPattern.MESSAGE_QUEUE: {
                "name": "消息队列事务",
                "description": "基于消息队列的最终一致性事务",
                "适用场景": [
                    "异步处理场景",
                    "事件驱动架构",
                    "数据同步场景"
                ],
                "优点": [
                    "解耦性好",
                    "高可用性",
                    "支持异步处理"
                ],
                "缺点": [
                    "最终一致性",
                    "消息可能丢失",
                    "调试困难"
                ]
            }
        }
    
    def _create_compensation_strategies(self) -> Dict:
        """Return strategy descriptions keyed by CompensationAction member.

        Covers RETRY and COMPENSATE; each value has a name, a description,
        implementation notes and sample code text.
        """
        return {
            CompensationAction.RETRY: {
                "name": "重试策略",
                "description": "对失败的操作进行重试",
                "实现要点": [
                    "设置合理的重试次数",
                    "使用指数退避算法",
                    "避免重试风暴",
                    "记录重试日志"
                ],
                "示例代码": """
class RetryStrategy:
    def __init__(self, max_retries=3, base_delay=1):
        self.max_retries = max_retries
        self.base_delay = base_delay
    
    def execute_with_retry(self, func, *args, **kwargs):
        for attempt in range(self.max_retries + 1):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries:
                    raise e
                delay = self.base_delay * (2 ** attempt)
                time.sleep(delay)
"""
            },
            CompensationAction.COMPENSATE: {
                "name": "补偿策略",
                "description": "执行相反的操作来撤销已完成的操作",
                "实现要点": [
                    "设计幂等的补偿操作",
                    "保证补偿操作的可靠性",
                    "记录补偿执行状态",
                    "支持部分补偿"
                ],
                "示例代码": """
class CompensationManager:
    def __init__(self):
        self.compensation_actions = {}
    
    def register_compensation(self, action_id, compensation_func):
        self.compensation_actions[action_id] = compensation_func
    
    def execute_compensation(self, action_id, *args, **kwargs):
        if action_id in self.compensation_actions:
            return self.compensation_actions[action_id](*args, **kwargs)
        else:
            raise ValueError(f"未找到补偿操作: {action_id}")
"""
            }
        }
    
    def create_saga_transaction(self, saga_id: str, steps: List[Dict]) -> TransactionSaga:
        """Build a TransactionSaga from plain step dictionaries.

        Args:
            saga_id: Identifier stored on the saga.
            steps: One dict per step. 'step_id', 'service_name', 'action' and
                'compensation_action' are required; 'timeout' (default 30),
                'retry_count' (default 3), 'parameters' (default {}) and
                'dependencies' (default []) are optional.

        Returns:
            A saga in status 'CREATED', positioned at step 0, with an empty context.

        Raises:
            KeyError: If a step dict lacks one of the required keys.
        """
        built_steps = [
            TransactionStep(
                step_id=raw['step_id'],
                service_name=raw['service_name'],
                action=raw['action'],
                compensation_action=raw['compensation_action'],
                timeout=raw.get('timeout', 30),
                retry_count=raw.get('retry_count', 3),
                parameters=raw.get('parameters', {}),
                dependencies=raw.get('dependencies', []),
            )
            for raw in steps
        ]
        
        return TransactionSaga(
            saga_id=saga_id,
            steps=built_steps,
            current_step=0,
            status='CREATED',
            created_time=time.time(),
            updated_time=time.time(),
            context={},
        )
    
    def print_best_practices(self):
        """Print every best-practice entry: heading, description, bullets, example."""
        print("=== 事务消息最佳实践 ===")
        
        for item in self.practices:
            print(f"\n【{item['category']}】{item['title']}")
            print(f"描述: {item['description']}")
            
            print("\n实现要点:")
            for point in item['implementation']:
                print(f"  • {point}")
            
            # Entries without an example end after the bullet list.
            if 'example' not in item:
                continue
            print("\n示例代码:")
            print(item['example'])
    
    def print_transaction_patterns(self):
        """Print every transaction pattern with its use cases, advantages and drawbacks."""
        print("\n=== 分布式事务模式 ===")
        
        # (heading line, key in the pattern dict, bullet symbol)
        sections = (
            ("\n适用场景:", '适用场景', "•"),
            ("\n优点:", '优点', "✓"),
            ("\n缺点:", '缺点', "✗"),
        )
        
        for info in self.patterns.values():
            print(f"\n【{info['name']}】")
            print(f"描述: {info['description']}")
            
            for heading, key, bullet in sections:
                print(heading)
                for entry in info[key]:
                    print(f"  {bullet} {entry}")

# 使用示例
if __name__ == "__main__":
    practices = TransactionBestPractices()

    # Dump the two printable catalogues.
    practices.print_best_practices()
    practices.print_transaction_patterns()

    print("\n=== Saga事务示例 ===")

    # Three-step order saga; later steps list the earlier ones as dependencies.
    saga_steps = [
        {
            'step_id': 'create_order',
            'service_name': 'order_service',
            'action': 'create_order',
            'compensation_action': 'cancel_order',
            'timeout': 30,
            'parameters': {'user_id': '123', 'product_id': '456'}
        },
        {
            'step_id': 'reserve_inventory',
            'service_name': 'inventory_service',
            'action': 'reserve_inventory',
            'compensation_action': 'release_inventory',
            'timeout': 20,
            'dependencies': ['create_order'],
            'parameters': {'product_id': '456', 'quantity': 1}
        },
        {
            'step_id': 'process_payment',
            'service_name': 'payment_service',
            'action': 'process_payment',
            'compensation_action': 'refund_payment',
            'timeout': 60,
            'dependencies': ['create_order', 'reserve_inventory'],
            'parameters': {'amount': 100.0, 'user_id': '123'}
        }
    ]

    saga = practices.create_saga_transaction('SAGA_001', saga_steps)

    print(f"Saga事务已创建: {saga.saga_id}")
    print(f"步骤数量: {len(saga.steps)}")
    for step in saga.steps:
        print(f"  步骤: {step.step_id} -> {step.service_name}.{step.action}")
        # Steps without dependencies get no second line.
        if not step.dependencies:
            continue
        print(f"    依赖: {', '.join(step.dependencies)}")

7.2 延迟消息

7.2.1 延迟消息实现

from dataclasses import dataclass
from typing import List, Dict, Optional, Callable
from enum import Enum
import time
import heapq
import threading
import uuid
from abc import ABC, abstractmethod

class DelayLevel(Enum):
    """Fixed delay levels 1-18; each member's value is ``(level, description)``."""
    LEVEL_1 = (1, "1s")      # 1 second
    LEVEL_2 = (2, "5s")      # 5 seconds
    LEVEL_3 = (3, "10s")     # 10 seconds
    LEVEL_4 = (4, "30s")     # 30 seconds
    LEVEL_5 = (5, "1m")      # 1 minute
    LEVEL_6 = (6, "2m")      # 2 minutes
    LEVEL_7 = (7, "3m")      # 3 minutes
    LEVEL_8 = (8, "4m")      # 4 minutes
    LEVEL_9 = (9, "5m")      # 5 minutes
    LEVEL_10 = (10, "6m")    # 6 minutes
    LEVEL_11 = (11, "7m")    # 7 minutes
    LEVEL_12 = (12, "8m")    # 8 minutes
    LEVEL_13 = (13, "9m")    # 9 minutes
    LEVEL_14 = (14, "10m")   # 10 minutes
    LEVEL_15 = (15, "20m")   # 20 minutes
    LEVEL_16 = (16, "30m")   # 30 minutes
    LEVEL_17 = (17, "1h")    # 1 hour
    LEVEL_18 = (18, "2h")    # 2 hours

    def __init__(self, level: int, description: str):
        # Unpack the member's value tuple into named attributes.
        self.level, self.description = level, description

    @classmethod
    def get_delay_seconds(cls, level: int) -> int:
        """Return the delay in seconds for ``level``, or 0 for an unknown level."""
        # Position i (0-based) holds the delay of level i + 1.
        seconds_by_position = (
            1, 5, 10, 30,
            60, 120, 180, 240, 300,
            360, 420, 480, 540, 600,
            1200, 1800, 3600, 7200,
        )
        seconds_by_level = dict(enumerate(seconds_by_position, start=1))
        return seconds_by_level.get(level, 0)

@dataclass
class DelayMessage:
    """A message to be delivered at ``scheduled_time``.

    Instances order by ``scheduled_time`` so they can live in a heap.
    """
    message_id: str
    topic: str
    tag: str
    body: str
    delay_level: int
    delay_seconds: int
    scheduled_time: float  # epoch seconds at which the message becomes due
    created_time: float
    retry_count: int = 0
    max_retry: int = 3
    properties: Dict[str, str] = None

    def __post_init__(self):
        # Give every instance its own dict when none was supplied.
        self.properties = {} if self.properties is None else self.properties

    def __lt__(self, other):
        """Order by due time (earlier first); used by heap operations."""
        mine, theirs = self.scheduled_time, other.scheduled_time
        return mine < theirs

class DelayMessageListener(ABC):
    """Callback interface for consumers of delayed messages."""
    
    @abstractmethod
    def on_message(self, message: DelayMessage) -> bool:
        """Handle a delayed message that has become due.

        Returns:
            True when the message was handled; a falsy result makes the
            scheduler treat the delivery as failed.
        """
        pass

class DelayMessageScheduler:
    """In-memory scheduler that hands due DelayMessage objects to topic listeners.

    Messages are kept both in a heap ordered by due time (``scheduled_messages``)
    and in per-level lists (``delay_queues``). A background thread started by
    ``start`` pops due messages and delivers them; failed deliveries are
    rescheduled with a growing delay until ``max_retry`` is exceeded.

    ``_lock`` guards ``scheduled_messages`` and ``delay_queues``; listener
    callbacks always run outside the lock.
    """

    def __init__(self):
        self.delay_queues: Dict[int, List[DelayMessage]] = {}  # delay level -> pending messages
        self.scheduled_messages = []  # heap of messages ordered by scheduled_time
        self.listeners: Dict[str, DelayMessageListener] = {}  # topic -> listener
        self._running = False
        self._scheduler_thread = None
        self._lock = threading.Lock()
        self._stop_event = threading.Event()  # lets shutdown() interrupt the poll wait

    def start(self):
        """Start the scheduling thread; a no-op while it is already alive."""
        if self._scheduler_thread is not None and self._scheduler_thread.is_alive():
            return

        self._stop_event.clear()
        self._running = True
        self._scheduler_thread = threading.Thread(target=self._schedule_loop)
        self._scheduler_thread.daemon = True
        self._scheduler_thread.start()
        print("延迟消息调度器已启动")

    def shutdown(self):
        """Stop the scheduling thread and wait for it to exit."""
        self._running = False
        self._stop_event.set()
        if self._scheduler_thread:
            self._scheduler_thread.join()
        print("延迟消息调度器已关闭")

    def register_listener(self, topic: str, listener: DelayMessageListener):
        """Register ``listener`` for ``topic``, replacing any previous one."""
        self.listeners[topic] = listener
        print(f"已注册主题监听器: {topic}")

    def schedule_message(self, topic: str, tag: str, body: str, 
                        delay_level: int = None, delay_seconds: int = None) -> DelayMessage:
        """Schedule a message for delayed delivery.

        Args:
            topic: Topic whose listener will receive the message.
            tag: Message tag.
            body: Message body.
            delay_level: Delay level; takes precedence over ``delay_seconds``.
                A level unknown to ``DelayLevel.get_delay_seconds`` yields a
                delay of 0 seconds.
            delay_seconds: Exact delay in seconds, used when ``delay_level`` is
                None. The message is filed under the closest delay level but
                keeps this exact delay.

        Returns:
            The scheduled message.

        Raises:
            ValueError: If neither ``delay_level`` nor ``delay_seconds`` is given.
        """
        if delay_level is not None:
            actual_delay_seconds = DelayLevel.get_delay_seconds(delay_level)
        elif delay_seconds is not None:
            actual_delay_seconds = delay_seconds
            delay_level = self._find_closest_delay_level(delay_seconds)
        else:
            raise ValueError("必须指定delay_level或delay_seconds")

        now = time.time()
        message = DelayMessage(
            message_id=str(uuid.uuid4()),
            topic=topic,
            tag=tag,
            body=body,
            delay_level=delay_level,
            delay_seconds=actual_delay_seconds,
            scheduled_time=now + actual_delay_seconds,
            created_time=now
        )

        with self._lock:
            self.delay_queues.setdefault(delay_level, []).append(message)
            heapq.heappush(self.scheduled_messages, message)

        print(f"延迟消息已调度: {message.message_id}, 延迟: {actual_delay_seconds}秒")
        return message

    def _find_closest_delay_level(self, delay_seconds: int) -> int:
        """Return the level (1-18) whose delay is closest to ``delay_seconds``.

        Ties go to the lower level, because ``min`` keeps the first minimum.
        """
        return min(
            range(1, 19),
            key=lambda level: abs(DelayLevel.get_delay_seconds(level) - delay_seconds),
        )

    def _schedule_loop(self):
        """Background loop: once per second, deliver every message that is due."""
        while self._running:
            try:
                current_time = time.time()
                messages_to_deliver = []

                with self._lock:
                    heap = self.scheduled_messages
                    while heap and heap[0].scheduled_time <= current_time:
                        messages_to_deliver.append(heapq.heappop(heap))

                # Deliver outside the lock so listeners cannot block producers.
                for message in messages_to_deliver:
                    self._deliver_message(message)

            except Exception as e:
                print(f"调度循环异常: {e}")

            # Wait outside the try so a failing pass cannot spin without pause;
            # wait() returns True as soon as shutdown() sets the event.
            if self._stop_event.wait(1):
                break

    def _deliver_message(self, message: DelayMessage):
        """Deliver one message to its topic listener, or route it to the failure path."""
        try:
            listener = self.listeners.get(message.topic)
            if listener:
                success = listener.on_message(message)
                if success:
                    print(f"延迟消息投递成功: {message.message_id}")
                    self._remove_from_delay_queue(message)
                else:
                    print(f"延迟消息处理失败: {message.message_id}")
                    self._handle_delivery_failure(message)
            else:
                print(f"未找到主题监听器: {message.topic}")
                self._handle_delivery_failure(message)

        except Exception as e:
            print(f"投递消息异常: {e}")
            self._handle_delivery_failure(message)

    def _handle_delivery_failure(self, message: DelayMessage):
        """Reschedule a failed message with a growing delay, or drop it after ``max_retry``."""
        message.retry_count += 1

        if message.retry_count <= message.max_retry:
            # Delay doubles per retry (120s, 240s, ...), capped at one hour.
            retry_delay = min(60 * (2 ** message.retry_count), 3600)
            message.scheduled_time = time.time() + retry_delay

            with self._lock:
                heapq.heappush(self.scheduled_messages, message)

            print(f"延迟消息重新调度: {message.message_id}, 重试次数: {message.retry_count}")
        else:
            print(f"延迟消息超过最大重试次数,丢弃: {message.message_id}")
            self._remove_from_delay_queue(message)

    def _remove_from_delay_queue(self, message: DelayMessage):
        """Remove ``message`` from its per-level queue, if it is still there.

        Must not be called while holding ``_lock`` (the lock is not reentrant).
        """
        with self._lock:
            queue = self.delay_queues.get(message.delay_level)
            if queue is None:
                return
            try:
                queue.remove(message)
            except ValueError:
                pass  # already removed

    def get_queue_status(self) -> Dict:
        """Return a snapshot of the heap size and the per-level queue sizes."""
        with self._lock:
            status = {
                'total_scheduled': len(self.scheduled_messages),
                'delay_queues': {}
            }

            for level, messages in self.delay_queues.items():
                status['delay_queues'][level] = {
                    'count': len(messages),
                    # NOTE(review): despite the key name, this is the level's
                    # delay in seconds, not DelayLevel's textual description.
                    # Left unchanged so existing readers keep working.
                    'description': DelayLevel.get_delay_seconds(level)
                }

        return status

class DelayMessageProducer:
    """Producer facade that forwards delayed and scheduled messages to a scheduler."""

    def __init__(self, scheduler: DelayMessageScheduler):
        """Store the scheduler that will receive every message.

        Args:
            scheduler: Object exposing ``schedule_message(topic, tag, body,
                delay_level, delay_seconds)``.
        """
        self.scheduler = scheduler

    def send_delay_message(self, topic: str, tag: str, body: str,
                          delay_level: int = None, delay_seconds: int = None) -> DelayMessage:
        """Send a message that should be delivered after a delay.

        Args:
            topic: Destination topic.
            tag: Message tag.
            body: Message payload.
            delay_level: Predefined delay level, or ``None`` when unused.
            delay_seconds: Explicit delay in seconds, or ``None`` when unused.

        Returns:
            Whatever ``scheduler.schedule_message`` returns for this message.
        """
        return self.scheduler.schedule_message(topic, tag, body, delay_level, delay_seconds)

    def send_scheduled_message(self, topic: str, tag: str, body: str,
                             scheduled_time: float) -> DelayMessage:
        """Send a message that should be delivered at an absolute time.

        Args:
            topic: Destination topic.
            tag: Message tag.
            body: Message payload.
            scheduled_time: Target delivery time as a Unix timestamp
                (seconds, same clock as ``time.time()``).

        Returns:
            Whatever ``send_delay_message`` returns for the computed delay.
        """
        import math  # local import: only this method needs it

        # Round the remaining time UP to whole seconds. Truncating (as int()
        # does) shaves up to a second off, so the message would be due before
        # the requested instant. Times in the past collapse to a zero delay.
        delay_seconds = max(0, math.ceil(scheduled_time - time.time()))
        return self.send_delay_message(topic, tag, body, delay_seconds=delay_seconds)

# 示例延迟消息监听器
class OrderTimeoutListener(DelayMessageListener):
    """Demo listener that reacts to order-timeout messages.

    Order ids that have been handled are remembered in
    ``self.processed_orders`` so a repeated message for the same order is
    acknowledged without doing the work again.
    """

    def __init__(self):
        self.processed_orders = set()

    def on_message(self, message: DelayMessage) -> bool:
        """Handle one order-timeout message.

        Args:
            message: Message whose ``body`` is a JSON object containing
                ``order_id``.

        Returns:
            ``True`` when the message was handled (or was a repeat),
            ``False`` when any exception occurred while handling it.
        """
        try:
            import json

            order_id = json.loads(message.body).get('order_id')

            # Repeat delivery for an order that was already handled.
            if order_id in self.processed_orders:
                print(f"订单已处理,忽略超时消息: {order_id}")
                return True

            order_status = self._get_order_status(order_id)

            if order_status != 'UNPAID':
                print(f"订单状态已变更,无需超时处理: {order_id}, 状态: {order_status}")
            else:
                # Only orders that are still unpaid are timed out.
                self._handle_order_timeout(order_id)
                print(f"订单超时处理完成: {order_id}")

            self.processed_orders.add(order_id)
            return True

        except Exception as e:
            print(f"处理订单超时消息异常: {e}")
            return False

    def _get_order_status(self, order_id: str) -> str:
        """Return a simulated order status, picked at random.

        ``order_id`` is not consulted; the result is one of ``'UNPAID'``,
        ``'PAID'`` or ``'CANCELLED'``.
        """
        import random
        return random.choice(['UNPAID', 'PAID', 'CANCELLED'])

    def _handle_order_timeout(self, order_id: str):
        """Simulate the timeout handling by printing the steps taken."""
        print(f"执行订单超时处理: {order_id}")
        print(f"  1. 取消订单: {order_id}")
        print("  2. 释放库存")
        print("  3. 发送通知")

# 使用示例
if __name__ == "__main__":
    # Wire together the scheduler, the producer facade and the demo listener,
    # then start the scheduler.
    scheduler = DelayMessageScheduler()
    producer = DelayMessageProducer(scheduler)
    order_listener = OrderTimeoutListener()
    scheduler.register_listener("order_timeout", order_listener)
    scheduler.start()

    try:
        print("=== 延迟消息示例 ===")

        import json

        def _order_body(order_id, user_id, amount):
            """Serialize a demo order payload stamped with the current time."""
            return json.dumps({
                'order_id': order_id,
                'user_id': user_id,
                'amount': amount,
                'created_time': time.time()
            })

        # Two messages using predefined delay levels. The original notes say
        # level 2 is 5 s and level 4 is 30 s -- confirm against DelayLevel.
        message1 = producer.send_delay_message(
            topic="order_timeout",
            tag="timeout_check",
            body=_order_body('ORDER_001', 'USER_123', 100.0),
            delay_level=2
        )
        message2 = producer.send_delay_message(
            topic="order_timeout",
            tag="timeout_check",
            body=_order_body('ORDER_002', 'USER_456', 200.0),
            delay_level=4
        )

        # One message with an explicit delay of 15 seconds.
        message3 = producer.send_delay_message(
            topic="order_timeout",
            tag="timeout_check",
            body=_order_body('ORDER_003', 'USER_789', 300.0),
            delay_seconds=15
        )

        # One message targeted at an absolute time, 25 seconds from now.
        scheduled_time = time.time() + 25
        message4 = producer.send_scheduled_message(
            topic="order_timeout",
            tag="scheduled_check",
            body=_order_body('ORDER_004', 'USER_999', 400.0),
            scheduled_time=scheduled_time
        )

        print("\n已发送4条延迟消息")

        # Poll the queue status six times, ten seconds apart.
        for check_no in range(1, 7):
            time.sleep(10)
            status = scheduler.get_queue_status()
            print(f"\n第{check_no}次检查 - 队列状态:")
            print(f"  总调度消息数: {status['total_scheduled']}")
            for level, info in status['delay_queues'].items():
                if info['count'] > 0:
                    print(f"  延迟级别{level}: {info['count']}条消息")

    finally:
        # Always stop the scheduler, even if the demo is interrupted.
        scheduler.shutdown()

7.2.2 延迟消息应用场景

from dataclasses import dataclass
from typing import List, Dict, Optional, Callable
from enum import Enum
import time
import json
import uuid
from datetime import datetime, timedelta

class ScenarioType(Enum):
    """Kinds of use cases for delayed messages."""
    ORDER_TIMEOUT = "order_timeout"          # order timeout
    PAYMENT_REMINDER = "payment_reminder"    # payment reminder
    TASK_SCHEDULING = "task_scheduling"      # task scheduling
    DATA_CLEANUP = "data_cleanup"            # data cleanup
    RETRY_MECHANISM = "retry_mechanism"      # retry mechanism
    NOTIFICATION = "notification"            # notifications / alerts

@dataclass
class DelayScenario:
    """Description of one delayed-message use case."""
    scenario_type: ScenarioType      # which kind of scenario this entry describes
    name: str                        # short display name
    description: str                 # explanation of the scenario
    typical_delay: str               # free-text description of the usual delay
    use_cases: List[str]             # concrete examples of where it applies
    implementation_tips: List[str]   # advice for implementing it
    example_code: str                # sample source code, kept as plain text

class DelayMessageScenarios:
    """Catalogue of typical delayed-message use cases with sample code.

    The catalogue is built once in the constructor and stored in
    ``self.scenarios``, keyed by ``ScenarioType``. Entries exist for order
    timeout, payment reminder, task scheduling and retry; looking up any
    other scenario type yields ``None``.
    """

    def __init__(self):
        self.scenarios = self._create_scenarios()

    def _create_scenarios(self) -> Dict[ScenarioType, DelayScenario]:
        """Build the scenario catalogue.

        Returns:
            A dict mapping each described ``ScenarioType`` to its
            ``DelayScenario``, in the order the entries are declared below.
        """
        order_timeout = DelayScenario(
            scenario_type=ScenarioType.ORDER_TIMEOUT,
            name="订单超时处理",
            description="在订单创建后设定超时时间,如果用户未在规定时间内完成支付,自动取消订单",
            typical_delay="15分钟-30分钟",
            use_cases=[
                "电商订单支付超时取消",
                "预约服务超时释放",
                "购物车商品超时清理",
                "临时锁定资源超时释放"
            ],
            implementation_tips=[
                "设置合理的超时时间,平衡用户体验和资源利用",
                "在取消订单前检查订单当前状态",
                "提供订单延期功能",
                "记录超时原因用于分析优化"
            ],
            example_code="""
# 订单超时处理示例
class OrderTimeoutHandler:
    def __init__(self, producer: DelayMessageProducer):
        self.producer = producer
    
    def create_order_with_timeout(self, order_data: Dict, timeout_minutes: int = 30):
        # 创建订单
        order_id = self.create_order(order_data)
        
        # 发送延迟消息检查订单状态
        timeout_message = {
            'order_id': order_id,
            'action': 'check_timeout',
            'created_time': time.time()
        }
        
        self.producer.send_delay_message(
            topic='order_timeout',
            tag='timeout_check',
            body=json.dumps(timeout_message),
            delay_seconds=timeout_minutes * 60
        )
        
        return order_id
    
    def handle_order_timeout(self, order_id: str):
        order_status = self.get_order_status(order_id)
        
        if order_status == 'UNPAID':
            # 取消订单
            self.cancel_order(order_id)
            # 释放库存
            self.release_inventory(order_id)
            # 发送通知
            self.send_timeout_notification(order_id)
"""
        )

        payment_reminder = DelayScenario(
            scenario_type=ScenarioType.PAYMENT_REMINDER,
            name="支付提醒",
            description="在用户创建订单后,分阶段发送支付提醒,提高支付转化率",
            typical_delay="5分钟、15分钟、30分钟",
            use_cases=[
                "电商支付提醒",
                "会员续费提醒",
                "账单支付提醒",
                "服务费用催缴"
            ],
            implementation_tips=[
                "设计多级提醒策略,避免过度打扰",
                "个性化提醒内容和渠道",
                "提供一键支付链接",
                "统计提醒效果优化策略"
            ],
            example_code="""
# 支付提醒示例
class PaymentReminderService:
    def __init__(self, producer: DelayMessageProducer):
        self.producer = producer
        self.reminder_levels = [
            {'delay_minutes': 5, 'message': '您的订单还有5分钟到期,请尽快完成支付'},
            {'delay_minutes': 15, 'message': '您的订单即将到期,点击立即支付'},
            {'delay_minutes': 25, 'message': '最后提醒:您的订单5分钟后将自动取消'}
        ]
    
    def schedule_payment_reminders(self, order_id: str, user_id: str):
        for level, reminder in enumerate(self.reminder_levels):
            reminder_data = {
                'order_id': order_id,
                'user_id': user_id,
                'level': level + 1,
                'message': reminder['message'],
                'action': 'send_reminder'
            }
            
            self.producer.send_delay_message(
                topic='payment_reminder',
                tag=f'level_{level + 1}',
                body=json.dumps(reminder_data),
                delay_seconds=reminder['delay_minutes'] * 60
            )
    
    def send_reminder(self, reminder_data: Dict):
        # 检查订单是否已支付
        if self.is_order_paid(reminder_data['order_id']):
            return  # 已支付,不发送提醒
        
        # 发送提醒通知
        self.send_notification(
            user_id=reminder_data['user_id'],
            message=reminder_data['message'],
            order_id=reminder_data['order_id']
        )
"""
        )

        task_scheduling = DelayScenario(
            scenario_type=ScenarioType.TASK_SCHEDULING,
            name="任务调度",
            description="延迟执行特定任务,如定时报表生成、数据同步、系统维护等",
            typical_delay="几小时到几天",
            use_cases=[
                "定时报表生成",
                "数据备份任务",
                "系统健康检查",
                "缓存预热任务",
                "批量数据处理"
            ],
            implementation_tips=[
                "考虑任务的幂等性",
                "设计任务失败重试机制",
                "监控任务执行状态",
                "支持任务取消和重新调度"
            ],
            example_code="""
# 任务调度示例
class TaskScheduler:
    def __init__(self, producer: DelayMessageProducer):
        self.producer = producer
    
    def schedule_daily_report(self, report_type: str, target_date: str):
        # 计算执行时间(每天凌晨2点)
        from datetime import datetime, timedelta
        target_datetime = datetime.strptime(f"{target_date} 02:00:00", "%Y-%m-%d %H:%M:%S")
        delay_seconds = int((target_datetime - datetime.now()).total_seconds())
        
        if delay_seconds > 0:
            task_data = {
                'task_type': 'daily_report',
                'report_type': report_type,
                'target_date': target_date,
                'scheduled_time': target_datetime.isoformat()
            }
            
            self.producer.send_delay_message(
                topic='task_scheduler',
                tag='daily_report',
                body=json.dumps(task_data),
                delay_seconds=delay_seconds
            )
    
    def schedule_data_cleanup(self, cleanup_type: str, delay_days: int = 7):
        cleanup_data = {
            'task_type': 'data_cleanup',
            'cleanup_type': cleanup_type,
            'delay_days': delay_days,
            'scheduled_time': (datetime.now() + timedelta(days=delay_days)).isoformat()
        }
        
        self.producer.send_delay_message(
            topic='task_scheduler',
            tag='data_cleanup',
            body=json.dumps(cleanup_data),
            delay_seconds=delay_days * 24 * 3600
        )
"""
        )

        retry_mechanism = DelayScenario(
            scenario_type=ScenarioType.RETRY_MECHANISM,
            name="重试机制",
            description="对失败的操作进行延迟重试,实现指数退避或固定间隔重试",
            typical_delay="1秒、5秒、30秒、2分钟、10分钟",
            use_cases=[
                "API调用失败重试",
                "数据同步失败重试",
                "消息发送失败重试",
                "文件上传失败重试"
            ],
            implementation_tips=[
                "使用指数退避算法避免重试风暴",
                "设置最大重试次数",
                "记录重试历史用于问题分析",
                "对不同类型的错误采用不同重试策略"
            ],
            example_code="""
# 重试机制示例
class RetryManager:
    def __init__(self, producer: DelayMessageProducer):
        self.producer = producer
        self.max_retries = 5
        self.base_delay = 1  # 基础延迟秒数
    
    def execute_with_retry(self, task_data: Dict, retry_count: int = 0):
        try:
            # 执行任务
            result = self.execute_task(task_data)
            return result
        except Exception as e:
            if retry_count < self.max_retries:
                # 计算下次重试延迟(指数退避)
                delay_seconds = self.base_delay * (2 ** retry_count)
                
                retry_data = task_data.copy()
                retry_data.update({
                    'retry_count': retry_count + 1,
                    'last_error': str(e),
                    'next_retry_time': (datetime.now() + timedelta(seconds=delay_seconds)).isoformat()
                })
                
                # 发送延迟重试消息
                self.producer.send_delay_message(
                    topic='retry_tasks',
                    tag='retry_execution',
                    body=json.dumps(retry_data),
                    delay_seconds=delay_seconds
                )
                
                print(f"任务执行失败,{delay_seconds}秒后重试(第{retry_count + 1}次)")
            else:
                print(f"任务执行失败,已达到最大重试次数: {e}")
                # 发送到死信队列或人工处理
                self.handle_final_failure(task_data, e)
"""
        )

        # Each entry is keyed by its own scenario type, in declaration order.
        entries = (order_timeout, payment_reminder, task_scheduling, retry_mechanism)
        return {entry.scenario_type: entry for entry in entries}

    def get_scenario(self, scenario_type: ScenarioType) -> Optional[DelayScenario]:
        """Look up one scenario.

        Args:
            scenario_type: The scenario type to look up.

        Returns:
            The matching ``DelayScenario``, or ``None`` when the catalogue has
            no entry for that type.
        """
        return self.scenarios.get(scenario_type)

    def print_scenarios(self):
        """Print every catalogued scenario: description, use cases, tips and sample code."""
        print("=== 延迟消息应用场景 ===")

        for scenario in self.scenarios.values():
            print(f"\n【{scenario.name}】")
            print(f"描述: {scenario.description}")
            print(f"典型延迟: {scenario.typical_delay}")

            print("\n应用场景:")
            for use_case in scenario.use_cases:
                print(f"  • {use_case}")

            print("\n实现要点:")
            for tip in scenario.implementation_tips:
                print(f"  ✓ {tip}")

            print("\n示例代码:")
            print(scenario.example_code)

    def create_scenario_demo(self, scenario_type: ScenarioType) -> Dict:
        """Create sample data illustrating one scenario.

        Args:
            scenario_type: The scenario to build demo data for.

        Returns:
            An empty dict when the scenario is not catalogued. Otherwise a dict
            with the common keys ``scenario_name``, ``demo_id``,
            ``created_time`` and ``status``, followed by scenario-specific keys
            for order timeout, payment reminder and task scheduling (other
            catalogued scenarios get the common keys only).
        """
        scenario = self.get_scenario(scenario_type)
        if not scenario:
            return {}

        demo_data = {
            'scenario_name': scenario.name,
            'demo_id': str(uuid.uuid4()),
            'created_time': datetime.now().isoformat(),
            'status': 'created'
        }

        # Scenario-specific fields are appended after the common ones.
        if scenario_type == ScenarioType.ORDER_TIMEOUT:
            extras = {
                'order_id': f'ORDER_{int(time.time())}',
                'user_id': 'USER_DEMO',
                'amount': 99.99,
                'timeout_minutes': 30
            }
        elif scenario_type == ScenarioType.PAYMENT_REMINDER:
            extras = {
                'order_id': f'ORDER_{int(time.time())}',
                'user_id': 'USER_DEMO',
                'reminder_levels': 3,
                'intervals': [5, 15, 25]  # minutes
            }
        elif scenario_type == ScenarioType.TASK_SCHEDULING:
            extras = {
                'task_type': 'daily_report',
                'target_date': (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d'),
                'execution_time': '02:00:00'
            }
        else:
            extras = {}

        demo_data.update(extras)
        return demo_data

# 使用示例
if __name__ == "__main__":
    scenarios = DelayMessageScenarios()

    # Dump the whole catalogue first.
    scenarios.print_scenarios()

    # Then print demo data for every scenario type that has a catalogue entry.
    print("\n=== 场景演示 ===")

    for scenario_type in ScenarioType:
        demo = scenarios.create_scenario_demo(scenario_type)
        if not demo:
            continue

        print(f"\n{demo['scenario_name']}演示:")
        for key, value in demo.items():
            if key == 'scenario_name':
                continue  # already shown in the heading line
            print(f"  {key}: {value}")

7.3 批量消息

7.3.1 批量消息处理

from dataclasses import dataclass
from typing import List, Dict, Optional, Callable, Iterator
from enum import Enum
import time
import uuid
import threading
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

class BatchMode(Enum):
    """How a batch is triggered."""
    SIZE_BASED = "size_based"        # batch by size
    TIME_BASED = "time_based"        # batch by elapsed time
    MIXED = "mixed"                  # combination of both

class BatchStatus(Enum):
    """Lifecycle state of a message batch."""
    PENDING = "pending"              # waiting to be processed
    PROCESSING = "processing"        # being processed
    COMPLETED = "completed"          # finished successfully
    FAILED = "failed"                # failed
    PARTIAL_SUCCESS = "partial_success"  # some messages succeeded, some failed

@dataclass
class BatchMessage:
    """A single message waiting to be processed as part of a batch."""
    message_id: str              # unique id of the message
    topic: str                   # topic used to pick the handler
    tag: str                     # message tag
    body: str                    # message payload
    properties: Dict[str, str]   # extra key/value metadata; None is accepted and replaced by {}
    created_time: float          # creation timestamp (time.time() value)
    
    def __post_init__(self):
        # Normalize a missing properties mapping to an empty dict so later
        # code can iterate over it unconditionally.
        if self.properties is None:
            self.properties = {}

@dataclass
class MessageBatch:
    """A group of messages processed together, plus its processing outcome."""
    batch_id: str                  # unique id of the batch
    messages: List[BatchMessage]   # messages contained in the batch
    batch_size: int                # number of messages in the batch
    total_size_bytes: int          # payload size of the batch in bytes
    created_time: float            # creation timestamp (time.time() value)
    status: BatchStatus            # current lifecycle state
    processed_count: int = 0       # messages handled successfully
    failed_count: int = 0          # messages that failed
    error_messages: List[str] = None  # collected error texts; None becomes [] in __post_init__
    
    def __post_init__(self):
        # A mutable default cannot be declared directly on a dataclass field,
        # so the empty list is created per instance here.
        if self.error_messages is None:
            self.error_messages = []

@dataclass
class BatchConfig:
    """Tuning parameters for batch processing."""
    max_batch_size: int = 1000           # maximum number of messages per batch
    max_batch_bytes: int = 4 * 1024 * 1024  # maximum batch payload in bytes (4 MB)
    batch_timeout_ms: int = 1000         # batch timeout in milliseconds
    max_retry_count: int = 3             # maximum number of retries
    thread_pool_size: int = 10           # number of worker threads
    enable_compression: bool = True       # whether compression is enabled
    compression_threshold: int = 1024     # compression threshold (presumably bytes -- TODO confirm)

class BatchMessageProcessor:
    """Collects messages and hands them to per-topic handlers in batches.

    A background thread polls the pending queue and cuts a batch when any of
    these holds: the queue holds at least ``max_batch_size`` messages; messages
    are waiting and ``batch_timeout_ms`` has passed since the last batch was
    cut; or the queued payload has reached ``max_batch_bytes``. Batches are
    processed on a thread pool and then kept in ``completed_batches`` so that
    ``get_statistics`` can report on them.

    Shared collections (``pending_messages``, ``processing_batches``,
    ``completed_batches``) are guarded by a non-reentrant lock, so helpers
    that take the lock must never be called while it is already held.
    """

    # Pause between two polls of the pending queue, in seconds.
    _POLL_INTERVAL_SECONDS = 0.1
    # When the completed history grows past the limit it is cut back to the
    # most recent ``_COMPLETED_HISTORY_KEEP`` batches.
    _COMPLETED_HISTORY_LIMIT = 1000
    _COMPLETED_HISTORY_KEEP = 500

    def __init__(self, config: BatchConfig = None):
        """Create a stopped processor.

        Args:
            config: Batch tuning parameters; a default ``BatchConfig`` is used
                when omitted.
        """
        self.config = config or BatchConfig()
        self.pending_messages: List[BatchMessage] = []
        self.processing_batches: Dict[str, MessageBatch] = {}
        self.completed_batches: List[MessageBatch] = []
        self.message_handlers: Dict[str, Callable] = {}  # topic -> handler
        self._lock = threading.Lock()
        self._running = False
        self._batch_thread = None
        self._executor = None

    def start(self):
        """Start the polling thread and the worker pool.

        Calling ``start`` while the processor is already running does nothing,
        so a second polling thread is never spawned.
        """
        if self._running:
            return

        self._running = True
        self._executor = ThreadPoolExecutor(max_workers=self.config.thread_pool_size)
        self._batch_thread = threading.Thread(target=self._batch_processing_loop)
        self._batch_thread.daemon = True
        self._batch_thread.start()
        print("批量消息处理器已启动")

    def shutdown(self):
        """Stop polling, flush what is still queued and wait for all batches.

        Messages that were accepted but not yet batched are submitted for
        processing before the worker pool is closed, so they are not silently
        lost. The call blocks until every submitted batch has finished.
        """
        self._running = False
        if self._batch_thread:
            self._batch_thread.join()
            self._batch_thread = None

        if self._executor:
            # The polling thread is gone, so drain the queue from here.
            batch = self._create_batch()
            while batch is not None:
                self._submit_batch_for_processing(batch)
                batch = self._create_batch()

            self._executor.shutdown(wait=True)
            self._executor = None
        print("批量消息处理器已关闭")

    def register_handler(self, topic: str, handler: Callable[[List[BatchMessage]], bool]):
        """Register the handler for one topic.

        Args:
            topic: Topic whose messages the handler receives.
            handler: Callable taking the list of messages of that topic and
                returning ``True`` on success, ``False`` on failure. A later
                registration for the same topic replaces the earlier one.
        """
        self.message_handlers[topic] = handler
        print(f"已注册主题处理器: {topic}")

    def add_message(self, topic: str, tag: str, body: str, properties: Dict[str, str] = None) -> str:
        """Queue a single message.

        Args:
            topic: Topic of the message.
            tag: Message tag.
            body: Message payload.
            properties: Optional metadata; an empty dict is used when omitted.

        Returns:
            The generated message id.
        """
        message = BatchMessage(
            message_id=str(uuid.uuid4()),
            topic=topic,
            tag=tag,
            body=body,
            properties=properties or {},
            created_time=time.time()
        )

        with self._lock:
            self.pending_messages.append(message)

        return message.message_id

    def add_messages(self, messages_data: List[Dict]) -> List[str]:
        """Queue several messages at once.

        All messages are built before any is queued, so a malformed entry
        leaves the queue untouched instead of half-filled.

        Args:
            messages_data: Dicts with the required keys ``'topic'`` and
                ``'body'`` and the optional keys ``'tag'`` and
                ``'properties'``.

        Returns:
            The generated message ids, in input order.

        Raises:
            KeyError: If an entry lacks ``'topic'`` or ``'body'``.
        """
        new_messages = [
            BatchMessage(
                message_id=str(uuid.uuid4()),
                topic=msg_data['topic'],
                tag=msg_data.get('tag', ''),
                body=msg_data['body'],
                properties=msg_data.get('properties', {}),
                created_time=time.time()
            )
            for msg_data in messages_data
        ]

        with self._lock:
            self.pending_messages.extend(new_messages)

        return [message.message_id for message in new_messages]

    def _batch_processing_loop(self):
        """Poll the pending queue and cut batches until the processor stops."""
        last_batch_time = time.time()

        while self._running:
            try:
                current_time = time.time()

                with self._lock:
                    pending = len(self.pending_messages)
                    elapsed_ms = (current_time - last_batch_time) * 1000
                    should_create_batch = (
                        pending >= self.config.max_batch_size
                        or (pending > 0 and elapsed_ms >= self.config.batch_timeout_ms)
                        or self._calculate_batch_size() >= self.config.max_batch_bytes
                    )

                # _create_batch takes the lock itself, so it must run only
                # after the lock above has been released.
                if should_create_batch:
                    batch = self._create_batch()
                    if batch:
                        self._submit_batch_for_processing(batch)
                        last_batch_time = current_time

            except Exception as e:
                print(f"批量处理循环异常: {e}")

            # Sleep on every iteration, including failed ones, so a persistent
            # error cannot turn the loop into a busy spin.
            time.sleep(self._POLL_INTERVAL_SECONDS)

    @staticmethod
    def _message_size(message: BatchMessage) -> int:
        """Return the UTF-8 size of a message's body plus its properties."""
        size = len(message.body.encode('utf-8'))
        # str() keeps the estimate working if a property is not a string.
        size += sum(len(str(k).encode('utf-8')) + len(str(v).encode('utf-8'))
                    for k, v in message.properties.items())
        return size

    def _calculate_batch_size(self) -> int:
        """Return the total size in bytes of all pending messages.

        Does not take the lock; the polling loop calls it while holding it.
        """
        return sum(self._message_size(message) for message in self.pending_messages)

    def _create_batch(self) -> Optional[MessageBatch]:
        """Move messages from the head of the pending queue into a new batch.

        The batch takes messages in arrival order until either
        ``max_batch_size`` messages are collected or adding the next message
        would push the batch past ``max_batch_bytes``. The first message is
        always taken, so one oversized message cannot block the queue.

        Returns:
            The new batch (also registered in ``processing_batches``), or
            ``None`` when nothing could be taken.
        """
        with self._lock:
            if not self.pending_messages:
                return None

            take = 0
            total_size = 0
            for message in self.pending_messages:
                if take >= self.config.max_batch_size:
                    break
                size = self._message_size(message)
                if take and total_size + size > self.config.max_batch_bytes:
                    break
                total_size += size
                take += 1

            if take == 0:
                return None

            batch_messages = self.pending_messages[:take]
            self.pending_messages = self.pending_messages[take:]

            batch = MessageBatch(
                batch_id=str(uuid.uuid4()),
                messages=batch_messages,
                batch_size=len(batch_messages),
                total_size_bytes=total_size,
                created_time=time.time(),
                status=BatchStatus.PENDING
            )

            self.processing_batches[batch.batch_id] = batch
            return batch

    def _submit_batch_for_processing(self, batch: MessageBatch):
        """Run ``batch`` on the worker pool and record its outcome when done."""
        batch.status = BatchStatus.PROCESSING
        future = self._executor.submit(self._process_batch, batch)

        def batch_completed(fut):
            # Only a failure of the processing itself is routed to the error
            # path; keeping the completion call outside the try block means a
            # batch can never be recorded twice.
            try:
                result = fut.result()
            except Exception as e:
                self._handle_batch_error(batch, e)
            else:
                self._handle_batch_completion(batch, result)

        future.add_done_callback(batch_completed)

    def _process_batch(self, batch: MessageBatch) -> bool:
        """Dispatch the messages of ``batch`` to their topic handlers.

        Messages are grouped by topic and each group is passed to its handler
        in one call. A group counts as failed when no handler is registered,
        the handler returns a falsy value, or the handler raises.

        Returns:
            ``True`` when no message failed, ``False`` otherwise. The success
            and failure counts are stored on the batch.
        """
        print(f"开始处理批次: {batch.batch_id}, 消息数量: {batch.batch_size}")

        topic_groups = self._group_messages_by_topic(batch.messages)

        success_count = 0
        failed_count = 0

        for topic, messages in topic_groups.items():
            try:
                handler = self.message_handlers.get(topic)
                if not handler:
                    failed_count += len(messages)
                    batch.error_messages.append(f"未找到主题 {topic} 的处理器")
                    print(f"未找到主题 {topic} 的处理器")
                elif handler(messages):
                    success_count += len(messages)
                    print(f"主题 {topic} 处理成功: {len(messages)} 条消息")
                else:
                    failed_count += len(messages)
                    batch.error_messages.append(f"主题 {topic} 处理失败")
                    print(f"主题 {topic} 处理失败: {len(messages)} 条消息")

            except Exception as e:
                failed_count += len(messages)
                error_msg = f"处理主题 {topic} 异常: {str(e)}"
                batch.error_messages.append(error_msg)
                print(error_msg)

        batch.processed_count = success_count
        batch.failed_count = failed_count

        return failed_count == 0

    def _group_messages_by_topic(self, messages: List[BatchMessage]) -> Dict[str, List[BatchMessage]]:
        """Group messages by topic, keeping arrival order inside each group."""
        groups: Dict[str, List[BatchMessage]] = {}
        for message in messages:
            groups.setdefault(message.topic, []).append(message)
        return groups

    def _finalize_batch(self, batch: MessageBatch):
        """Move ``batch`` from the in-flight map to the bounded completed history."""
        with self._lock:
            self.processing_batches.pop(batch.batch_id, None)
            self.completed_batches.append(batch)

            if len(self.completed_batches) > self._COMPLETED_HISTORY_LIMIT:
                self.completed_batches = self.completed_batches[-self._COMPLETED_HISTORY_KEEP:]

    def _handle_batch_completion(self, batch: MessageBatch, success: bool):
        """Set the final status of a processed batch and archive it.

        Args:
            batch: The batch that finished processing.
            success: Result of ``_process_batch``; when false the status is
                ``PARTIAL_SUCCESS`` if at least one message succeeded and
                ``FAILED`` otherwise.
        """
        if success:
            batch.status = BatchStatus.COMPLETED
            print(f"批次处理完成: {batch.batch_id}")
        elif batch.processed_count > 0:
            batch.status = BatchStatus.PARTIAL_SUCCESS
            print(f"批次部分成功: {batch.batch_id}, 成功: {batch.processed_count}, 失败: {batch.failed_count}")
        else:
            batch.status = BatchStatus.FAILED
            print(f"批次处理失败: {batch.batch_id}")

        self._finalize_batch(batch)

    def _handle_batch_error(self, batch: MessageBatch, error: Exception):
        """Mark a batch whose processing raised as failed and archive it.

        Every message not already counted as processed is counted as failed,
        so the statistics still account for the whole batch.
        """
        batch.status = BatchStatus.FAILED
        batch.failed_count = batch.batch_size - batch.processed_count
        batch.error_messages.append(f"批次处理异常: {str(error)}")
        print(f"批次处理异常: {batch.batch_id}, 错误: {error}")

        self._finalize_batch(batch)

    def get_statistics(self) -> Dict:
        """Return counters describing the current state of the processor.

        Returns:
            A dict with the keys ``pending_messages``, ``processing_batches``,
            ``completed_batches``, ``total_processed_messages``,
            ``total_failed_messages``, ``successful_batches``,
            ``failed_batches``, ``partial_success_batches`` and
            ``success_rate`` (processed / (processed + failed), or 0 when no
            message has been counted yet). Totals cover only the batches still
            held in the completed history.
        """
        with self._lock:
            pending_count = len(self.pending_messages)
            processing_count = len(self.processing_batches)
            completed_count = len(self.completed_batches)

            total_processed = 0
            total_failed = 0
            successful_batches = 0
            failed_batches = 0
            partial_success_batches = 0

            for batch in self.completed_batches:
                total_processed += batch.processed_count
                total_failed += batch.failed_count

                if batch.status == BatchStatus.COMPLETED:
                    successful_batches += 1
                elif batch.status == BatchStatus.FAILED:
                    failed_batches += 1
                elif batch.status == BatchStatus.PARTIAL_SUCCESS:
                    partial_success_batches += 1

        total_counted = total_processed + total_failed
        return {
            'pending_messages': pending_count,
            'processing_batches': processing_count,
            'completed_batches': completed_count,
            'total_processed_messages': total_processed,
            'total_failed_messages': total_failed,
            'successful_batches': successful_batches,
            'failed_batches': failed_batches,
            'partial_success_batches': partial_success_batches,
            'success_rate': total_processed / total_counted if total_counted > 0 else 0
        }

# 示例消息处理器
class OrderMessageHandler:
    """Demo handler for batches of order messages.

    Ids of handled orders are kept in ``self.processed_orders`` so an order
    that shows up again is skipped.
    """

    def __init__(self):
        self.processed_orders = set()

    def handle_order_messages(self, messages: List[BatchMessage]) -> bool:
        """Process every order in the batch once.

        Args:
            messages: Messages whose ``body`` is a JSON object with an
                ``order_id`` key.

        Returns:
            ``True`` when the whole batch went through, ``False`` as soon as
            any exception occurs (orders handled before the failure stay
            recorded as processed).
        """
        try:
            print(f"处理订单消息批次,数量: {len(messages)}")

            for message in messages:
                order_data = json.loads(message.body)
                order_id = order_data.get('order_id')

                if order_id in self.processed_orders:
                    print(f"  订单已处理,跳过: {order_id}")
                    continue

                self._process_order(order_data)
                self.processed_orders.add(order_id)
                print(f"  处理订单: {order_id}")

        except Exception as e:
            print(f"处理订单消息异常: {e}")
            return False

        return True

    def _process_order(self, order_data: Dict):
        """Simulate handling one order by pausing for 10 ms."""
        time.sleep(0.01)

class NotificationMessageHandler:
    """Demo handler that sends a batch of notifications in one go."""

    def handle_notification_messages(self, messages: List[BatchMessage]) -> bool:
        """Decode every message body and send the notifications together.

        Args:
            messages: Messages whose ``body`` is a JSON document.

        Returns:
            ``True`` when all bodies were decoded and the batch was sent,
            ``False`` when any exception occurred.
        """
        try:
            print(f"处理通知消息批次,数量: {len(messages)}")

            # Decode everything first so a bad body prevents a partial send.
            notification_data = [json.loads(message.body) for message in messages]

            self._send_batch_notifications(notification_data)

            return True

        except Exception as e:
            print(f"处理通知消息异常: {e}")
            return False

    def _send_batch_notifications(self, notifications: List[Dict]):
        """Simulate a bulk send by printing the count and pausing for 50 ms."""
        print(f"  批量发送通知: {len(notifications)} 条")
        time.sleep(0.05)

# 使用示例
if __name__ == "__main__":
    # Demo configuration: batches of up to 100 messages / 1 MB, cut at least
    # every 2 seconds, processed by 5 worker threads.
    config = BatchConfig(
        max_batch_size=100,
        max_batch_bytes=1024 * 1024,
        batch_timeout_ms=2000,
        thread_pool_size=5
    )
    processor = BatchMessageProcessor(config)

    # One handler per topic.
    order_handler = OrderMessageHandler()
    notification_handler = NotificationMessageHandler()
    processor.register_handler('order_topic', order_handler.handle_order_messages)
    processor.register_handler('notification_topic', notification_handler.handle_notification_messages)

    processor.start()

    try:
        print("=== 批量消息处理示例 ===")

        # 150 order messages, queued with a single bulk call.
        order_messages = [
            {
                'topic': 'order_topic',
                'tag': 'create_order',
                'body': json.dumps({
                    'order_id': f'ORDER_{i:04d}',
                    'user_id': f'USER_{i % 10}',
                    'amount': 100.0 + i,
                    'created_time': time.time()
                }),
                'properties': {'source': 'batch_demo'}
            }
            for i in range(150)
        ]
        order_ids = processor.add_messages(order_messages)
        print(f"已添加 {len(order_ids)} 条订单消息")

        # 80 notification messages, queued one at a time.
        for i in range(80):
            notification_body = json.dumps({
                'user_id': f'USER_{i % 20}',
                'message': f'您有新的订单更新 {i}',
                'type': 'order_update'
            })
            processor.add_message(
                topic='notification_topic',
                tag='user_notification',
                body=notification_body,
                properties={'priority': 'normal'}
            )

        print("已添加 80 条通知消息")

        # Report statistics every 2 seconds, at most 10 times, stopping early
        # once nothing is pending or in flight.
        for round_no in range(1, 11):
            time.sleep(2)
            stats = processor.get_statistics()
            print(f"\n第{round_no}次统计:")
            print(f"  待处理消息: {stats['pending_messages']}")
            print(f"  处理中批次: {stats['processing_batches']}")
            print(f"  已完成批次: {stats['completed_batches']}")
            print(f"  处理成功消息: {stats['total_processed_messages']}")
            print(f"  处理失败消息: {stats['total_failed_messages']}")
            print(f"  成功率: {stats['success_rate']:.2%}")

            idle = stats['pending_messages'] == 0 and stats['processing_batches'] == 0
            if idle:
                print("\n所有消息处理完成!")
                break

    finally:
        # Always stop the processor, even if the demo is interrupted.
        processor.shutdown()

7.3.2 批量消息优化策略

from dataclasses import dataclass
from typing import List, Dict, Optional, Callable, Any
from enum import Enum
import time
import threading
import json
import gzip
import pickle
from collections import defaultdict
import hashlib

class CompressionType(Enum):
    """Compression algorithm identifiers selectable in BatchOptimizationConfig."""
    NONE = "none"      # payloads are left as-is
    GZIP = "gzip"      # handled with the gzip module by MessageCompressor
    LZ4 = "lz4"        # no dedicated branch in MessageCompressor; bytes pass through unchanged
    SNAPPY = "snappy"  # no dedicated branch in MessageCompressor; bytes pass through unchanged

class PartitionStrategy(Enum):
    """Strategies MessagePartitioner can use to choose a partition."""
    ROUND_ROBIN = "round_robin"      # cycle through the partitions in order
    HASH_KEY = "hash_key"            # hash of "topic:tag" modulo the partition count
    STICKY = "sticky"                # stay on one partition for a run of messages, then advance
    CUSTOM = "custom"                # no implementation here; MessagePartitioner falls back to partition 0

@dataclass
class BatchOptimizationConfig:
    """Settings consumed by OptimizedBatchProcessor and the helpers it builds."""
    # Compression settings
    compression_type: CompressionType = CompressionType.GZIP
    compression_level: int = 6  # forwarded to MessageCompressor (gzip compresslevel)
    compression_threshold: int = 1024  # minimum encoded body size, in bytes, before compressing

    # Partitioning settings
    partition_strategy: PartitionStrategy = PartitionStrategy.HASH_KEY
    partition_count: int = 16  # used as the modulus when picking a partition

    # Cache settings
    enable_message_cache: bool = True  # when False the processor builds no MessageCache
    cache_size: int = 10000  # passed to MessageCache as max_size
    cache_ttl_seconds: int = 300  # passed to MessageCache as ttl_seconds

    # Performance settings
    # NOTE(review): the three fields below are not referenced by the processor
    # code in this section; presumably reserved for async flushing - confirm.
    enable_async_processing: bool = True
    async_buffer_size: int = 1000
    flush_interval_ms: int = 100

    # Duplicate detection
    enable_deduplication: bool = True  # when False the processor builds no MessageDeduplicator
    dedup_window_seconds: int = 3600  # passed to MessageDeduplicator as window_seconds

class MessageCompressor:
    """Compress and decompress message payloads for one configured algorithm.

    Only GZIP is actually implemented. NONE, LZ4 and SNAPPY pass the bytes
    through unchanged in both directions, so ``should_compress`` reports
    ``False`` for them; otherwise callers would label untouched payloads as
    compressed and skew their compression statistics.
    """

    def __init__(self, compression_type: CompressionType, level: int = 6):
        """Store the algorithm and its level.

        Args:
            compression_type: Algorithm to apply.
            level: Compression level, forwarded to gzip as ``compresslevel``.
        """
        self.compression_type = compression_type
        self.level = level

    def _is_implemented(self) -> bool:
        """Return True when the configured algorithm really transforms the data."""
        return self.compression_type == CompressionType.GZIP

    def compress(self, data: bytes) -> bytes:
        """Return ``data`` compressed, or unchanged for unimplemented algorithms."""
        if self.compression_type == CompressionType.GZIP:
            return gzip.compress(data, compresslevel=self.level)
        # NONE, and algorithms without an implementation yet, are pass-through.
        return data

    def decompress(self, data: bytes) -> bytes:
        """Return ``data`` decompressed, or unchanged for unimplemented algorithms."""
        if self.compression_type == CompressionType.GZIP:
            return gzip.decompress(data)
        return data

    def should_compress(self, data: bytes, threshold: int) -> bool:
        """Tell whether ``data`` is worth compressing.

        Args:
            data: Encoded payload.
            threshold: Minimum payload size in bytes.

        Returns:
            True only when the payload reaches ``threshold`` and the configured
            algorithm is one that ``compress`` actually applies.
        """
        return self._is_implemented() and len(data) >= threshold

class MessagePartitioner:
    """Assign messages to partition indexes according to a PartitionStrategy."""

    def __init__(self, strategy: PartitionStrategy, partition_count: int):
        self.strategy = strategy
        self.partition_count = partition_count
        # State for ROUND_ROBIN.
        self._round_robin_counter = 0
        # State for STICKY: current partition, messages sent to it so far, and
        # how many messages it receives before moving on.
        self._sticky_partition = 0
        self._sticky_batch_count = 0
        self._sticky_threshold = 100

    def get_partition(self, message: BatchMessage) -> int:
        """Return the partition index for ``message``.

        Strategies without an implementation (e.g. CUSTOM) map to partition 0.
        """
        strategy = self.strategy
        if strategy == PartitionStrategy.HASH_KEY:
            return self._hash_partition(message)
        if strategy == PartitionStrategy.ROUND_ROBIN:
            return self._round_robin_partition()
        if strategy == PartitionStrategy.STICKY:
            return self._sticky_partition_strategy()
        return 0

    def _round_robin_partition(self) -> int:
        """Return the next partition in rotation and advance the counter."""
        slot = self._round_robin_counter % self.partition_count
        self._round_robin_counter += 1
        return slot

    def _hash_partition(self, message: BatchMessage) -> int:
        """Map the message's "topic:tag" key onto a partition via its MD5 digest."""
        key_bytes = f"{message.topic}:{message.tag}".encode()
        digest_hex = hashlib.md5(key_bytes).hexdigest()
        return int(digest_hex, 16) % self.partition_count

    def _sticky_partition_strategy(self) -> int:
        """Return the current sticky partition, rotating once its quota is used up."""
        if self._sticky_batch_count < self._sticky_threshold:
            self._sticky_batch_count += 1
            return self._sticky_partition

        # Quota reached: move to the next partition; this call is its first message.
        self._sticky_partition = (self._sticky_partition + 1) % self.partition_count
        self._sticky_batch_count = 1
        return self._sticky_partition

class MessageCache:
    """Lock-protected bounded cache with a sliding TTL.

    Every successful ``get`` and every ``put`` refreshes the entry's access
    time. When a *new* key arrives at capacity, the entry with the oldest
    access time is evicted.
    """

    def __init__(self, max_size: int, ttl_seconds: int):
        """
        Args:
            max_size: Number of entries at which inserting a new key evicts one.
            ttl_seconds: Seconds since last access after which an entry expires.
        """
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self._cache: Dict[str, Any] = {}
        self._access_times: Dict[str, float] = {}
        self._lock = threading.Lock()

    def get(self, key: str) -> Optional[Any]:
        """Return the live value for ``key``, or None if missing or expired.

        An expired entry is removed as a side effect.
        """
        with self._lock:
            if key not in self._cache:
                return None

            now = time.time()
            if now - self._access_times[key] < self.ttl_seconds:
                self._access_times[key] = now
                return self._cache[key]

            # Expired: drop it.
            del self._cache[key]
            del self._access_times[key]
            return None

    def put(self, key: str, value: Any):
        """Store ``value`` under ``key`` and refresh its access time."""
        with self._lock:
            # Overwriting an existing key does not grow the cache, so it must
            # not push out another entry; only a new key at capacity evicts.
            if key not in self._cache and len(self._cache) >= self.max_size:
                self._evict_oldest()

            self._cache[key] = value
            self._access_times[key] = time.time()

    def _evict_oldest(self):
        """Remove the entry with the oldest access time (caller holds the lock)."""
        if not self._access_times:
            return

        oldest_key = min(self._access_times, key=self._access_times.__getitem__)
        del self._cache[oldest_key]
        del self._access_times[oldest_key]

    def clear_expired(self):
        """Remove every entry whose last access is at least ``ttl_seconds`` old."""
        with self._lock:
            current_time = time.time()
            expired_keys = [
                key for key, access_time in self._access_times.items()
                if current_time - access_time >= self.ttl_seconds
            ]

            for key in expired_keys:
                del self._cache[key]
                del self._access_times[key]

class MessageDeduplicator:
    """Detect messages whose content was already seen within a time window.

    Entries are stamped with ``time.monotonic()`` and are never refreshed, so
    the internal dict stays ordered from oldest to newest. Expiry therefore
    only has to inspect the oldest entries instead of scanning the whole
    window on every call.
    """

    def __init__(self, window_seconds: int):
        """
        Args:
            window_seconds: How long a message's content is remembered.
        """
        self.window_seconds = window_seconds
        # content id -> monotonic timestamp of first sighting (insertion-ordered)
        self._seen_messages: Dict[str, float] = {}
        self._lock = threading.Lock()

    def is_duplicate(self, message: "BatchMessage") -> bool:
        """Return True if identical content was recorded within the window.

        A message that is not a duplicate is recorded as a side effect.
        """
        message_id = self._generate_message_id(message)
        # Monotonic time keeps timestamps non-decreasing even if the wall
        # clock is adjusted, which the early-exit cleanup relies on.
        current_time = time.monotonic()

        with self._lock:
            self._cleanup_expired(current_time)

            if message_id in self._seen_messages:
                return True

            self._seen_messages[message_id] = current_time
            return False

    def _generate_message_id(self, message: "BatchMessage") -> str:
        """Return the SHA-256 hex digest of "topic:tag:body"."""
        content = f"{message.topic}:{message.tag}:{message.body}"
        return hashlib.sha256(content.encode()).hexdigest()

    def _cleanup_expired(self, current_time: float):
        """Drop entries at least ``window_seconds`` old (caller holds the lock).

        Args:
            current_time: "Now" on the same clock used for the stored timestamps.
        """
        expired_ids = []
        for msg_id, timestamp in self._seen_messages.items():
            if current_time - timestamp < self.window_seconds:
                # Oldest-first order: everything after this entry is newer.
                break
            expired_ids.append(msg_id)

        for msg_id in expired_ids:
            del self._seen_messages[msg_id]

class OptimizedBatchProcessor:
    """Run messages through dedup, cache, compression and partition buffering.

    The caller's message object is never modified: when a body is compressed,
    a shallow copy carrying the compressed body and extra properties is what
    gets buffered. This keeps content-based deduplication working when the
    same object is submitted again.

    The ``stats`` counters are updated without a lock.
    """

    def __init__(self, config: "BatchOptimizationConfig"):
        """Build the helper components described by ``config``."""
        self.config = config
        self.compressor = MessageCompressor(config.compression_type, config.compression_level)
        self.partitioner = MessagePartitioner(config.partition_strategy, config.partition_count)

        # Optional components; None when disabled in the config.
        self.cache = MessageCache(config.cache_size, config.cache_ttl_seconds) if config.enable_message_cache else None
        self.deduplicator = MessageDeduplicator(config.dedup_window_seconds) if config.enable_deduplication else None

        # Messages waiting to be collected, grouped by partition index.
        self.partition_buffers: Dict[int, List["BatchMessage"]] = defaultdict(list)
        self._buffer_lock = threading.Lock()

        # Running counters; 'compression_ratio' is the mean compressed/original size.
        self.stats = {
            'total_messages': 0,
            'compressed_messages': 0,
            'duplicate_messages': 0,
            'cache_hits': 0,
            'cache_misses': 0,
            'compression_ratio': 0.0
        }

    def process_message(self, message: "BatchMessage") -> bool:
        """Process one message and buffer it under its partition.

        Returns:
            False when the message was rejected as a duplicate; True when it
            was buffered or answered from the cache.
        """
        import copy  # local import: this section's import block does not provide it

        self.stats['total_messages'] += 1

        # Duplicate check
        if self.deduplicator and self.deduplicator.is_duplicate(message):
            self.stats['duplicate_messages'] += 1
            return False

        # Cache check
        cache_key = None
        if self.cache:
            cache_key = f"{message.topic}:{message.tag}:{hashlib.md5(message.body.encode()).hexdigest()}"
            if self.cache.get(cache_key):
                self.stats['cache_hits'] += 1
                return True
            self.stats['cache_misses'] += 1

        # Compression: skip empty bodies (nothing to gain, and the ratio would
        # divide by zero) and bodies already flagged as compressed.
        outgoing = message
        raw_body = message.body.encode()
        already_compressed = (message.properties or {}).get('compressed') == 'true'
        if (raw_body and not already_compressed
                and self.compressor.should_compress(raw_body, self.config.compression_threshold)):
            compressed_body = self.compressor.compress(raw_body)

            # Buffer a copy so the caller's object keeps its original body.
            outgoing = copy.copy(message)
            outgoing.properties = dict(message.properties or {})
            # latin-1 maps each byte to one code point, so the compressed bytes
            # survive the str round trip; readers must encode with latin-1 too.
            outgoing.body = compressed_body.decode('latin-1')
            outgoing.properties['compressed'] = 'true'
            outgoing.properties['original_size'] = str(len(raw_body))

            self._record_compression(len(raw_body), len(compressed_body))

        # Partition assignment
        partition = self.partitioner.get_partition(message)

        with self._buffer_lock:
            self.partition_buffers[partition].append(outgoing)

        # Remember that this content has been handled.
        if self.cache:
            self.cache.put(cache_key, True)

        return True

    def _record_compression(self, original_size: int, compressed_size: int):
        """Fold one compression result into the running mean ratio."""
        self.stats['compressed_messages'] += 1
        count = self.stats['compressed_messages']
        ratio = compressed_size / original_size
        self.stats['compression_ratio'] = (
            (self.stats['compression_ratio'] * (count - 1) + ratio) / count
        )

    def get_partition_batches(self) -> Dict[int, List["BatchMessage"]]:
        """Return the buffered messages per partition and empty the buffers."""
        with self._buffer_lock:
            result = dict(self.partition_buffers)
            self.partition_buffers.clear()
            return result

    def get_statistics(self) -> Dict:
        """Return a copy of the counters plus derived rates.

        ``cache_hit_rate`` is present only when the cache is enabled;
        ``duplicate_rate`` and ``compression_rate`` only once at least one
        message has been processed.
        """
        stats = self.stats.copy()

        if self.cache:
            cache_total = stats['cache_hits'] + stats['cache_misses']
            stats['cache_hit_rate'] = stats['cache_hits'] / cache_total if cache_total > 0 else 0

        if stats['total_messages'] > 0:
            stats['duplicate_rate'] = stats['duplicate_messages'] / stats['total_messages']
            stats['compression_rate'] = stats['compressed_messages'] / stats['total_messages']

        return stats

    def cleanup(self):
        """Drop expired cache entries, if a cache is configured."""
        if self.cache:
            self.cache.clear_expired()

# Usage example
if __name__ == "__main__":
    # Build the optimisation settings for the demo run.
    config = BatchOptimizationConfig(
        compression_type=CompressionType.GZIP,
        compression_threshold=512,
        partition_strategy=PartitionStrategy.HASH_KEY,
        partition_count=8,
        enable_message_cache=True,
        cache_size=5000,
        enable_deduplication=True,
    )

    # Create the processor under test.
    processor = OptimizedBatchProcessor(config)

    print("=== 批量消息优化示例 ===")

    # Feed 1000 generated messages; every 100th one is submitted twice to
    # exercise duplicate handling.
    for index in range(1000):
        long_body = f"这是一条测试消息,内容比较长,用于测试压缩效果。消息编号: {index}" * 10
        message = BatchMessage(
            message_id=f"msg_{index}",
            topic=f"topic_{index % 5}",
            tag=f"tag_{index % 3}",
            body=long_body,
            properties={},
            created_time=time.time()
        )

        submissions = 2 if index % 100 == 0 else 1
        for _ in range(submissions):
            processor.process_message(message)

    # Show how the buffered messages are spread over partitions.
    partition_batches = processor.get_partition_batches()
    print("\n分区分布:")
    for partition_id, buffered in partition_batches.items():
        print(f"  分区 {partition_id}: {len(buffered)} 条消息")

    # Print the collected statistics.
    stats = processor.get_statistics()
    report_lines = [
        f"  总消息数: {stats['total_messages']}",
        f"  压缩消息数: {stats['compressed_messages']}",
        f"  重复消息数: {stats['duplicate_messages']}",
        f"  缓存命中数: {stats['cache_hits']}",
        f"  缓存未命中数: {stats['cache_misses']}",
        f"  压缩率: {stats.get('compression_rate', 0):.2%}",
        f"  重复率: {stats.get('duplicate_rate', 0):.2%}",
        f"  缓存命中率: {stats.get('cache_hit_rate', 0):.2%}",
        f"  平均压缩比: {stats['compression_ratio']:.2f}",
    ]
    print("\n处理统计:")
    for line in report_lines:
        print(line)

7.4 顺序消息

7.4.1 顺序消息实现

from dataclasses import dataclass
from typing import List, Dict, Optional, Callable, Any
from enum import Enum
import time
import threading
import queue
import uuid
import json
from collections import defaultdict, OrderedDict

class OrderType(Enum):
    """Ordering scope selectable in OrderConfig."""
    GLOBAL = "global"        # global ordering
    PARTITION = "partition"  # ordering within a partition
    FIFO = "fifo"           # first in, first out

class MessageStatus(Enum):
    """Lifecycle states of an OrderedMessage inside an OrderedMessageQueue."""
    PENDING = "pending"      # waiting to be handed to a consumer
    PROCESSING = "processing"  # handed out, completion not yet reported
    COMPLETED = "completed"  # processed successfully
    FAILED = "failed"        # processing reported failure
    SKIPPED = "skipped"      # passed over after exhausting its retries

@dataclass
class OrderedMessage:
    """A message that must be consumed in sequence relative to its order key."""
    message_id: str
    topic: str
    tag: str
    body: str
    order_key: str          # messages sharing this key are ordered against each other
    sequence_id: int        # position within the order key's sequence
    partition_id: int       # partition the message was assigned to
    properties: Dict[str, str]
    created_time: float
    status: MessageStatus = MessageStatus.PENDING
    retry_count: int = 0

    def __post_init__(self):
        # Callers may pass None for properties; normalise it to an empty dict.
        self.properties = {} if self.properties is None else self.properties

@dataclass
class OrderConfig:
    """Settings shared by the ordered-message producer, queue and consumer."""
    # NOTE(review): order_type, retry_delay_ms, timeout_ms, enable_strict_order
    # and batch_size are not read by the queue/consumer code in this section;
    # confirm where they are meant to take effect.
    order_type: OrderType = OrderType.PARTITION
    max_retry_count: int = 3            # failures allowed before a message stays FAILED
    retry_delay_ms: int = 1000
    timeout_ms: int = 30000
    enable_strict_order: bool = True    # strict ordering
    enable_message_skip: bool = False   # allow skipping failed messages
    batch_size: int = 100
    consumer_thread_count: int = 1      # number of consumer threads the consumer starts

class OrderedMessageQueue:
    """Queue for one order key that releases messages by ascending sequence id.

    ``processing_sequence`` is the sequence id most recently handed out. The
    next message is released only when it is PENDING *and* the previously
    released one is no longer PROCESSING, so several consumer threads polling
    the same queue cannot work on two messages of one key at the same time.
    A consumer must therefore always report back through ``complete_message``;
    until it does, the queue hands out nothing further.

    A message that fails with retries left is put back to PENDING and is
    re-delivered first. Once its retries are used up it stays FAILED and the
    queue moves on to the next sequence id.
    """

    def __init__(self, order_key: str, config: OrderConfig):
        self.order_key = order_key
        self.config = config
        self.messages: OrderedDict[int, OrderedMessage] = OrderedDict()
        self.current_sequence = 0      # highest sequence id added so far
        self.processing_sequence = 0   # sequence id most recently handed out
        self._lock = threading.Lock()
        self._condition = threading.Condition(self._lock)

    def add_message(self, message: OrderedMessage) -> bool:
        """Add ``message``; return False if its sequence id is already queued."""
        with self._lock:
            if message.sequence_id in self.messages:
                return False  # duplicate sequence id

            self.messages[message.sequence_id] = message
            self.current_sequence = max(self.current_sequence, message.sequence_id)
            self._condition.notify_all()
            return True

    def get_next_message(self, timeout_ms: Optional[int] = None) -> Optional[OrderedMessage]:
        """Hand out the next message in sequence, waiting for it if necessary.

        Args:
            timeout_ms: Upper bound on the total wait. ``None`` waits
                indefinitely; ``0`` checks once without blocking.

        Returns:
            The message (marked PROCESSING, or SKIPPED when it was passed over
            via ``enable_message_skip``), or None when the timeout elapsed.
        """
        # A single deadline bounds the whole call; wake-ups caused by unrelated
        # notifications must not restart the timeout.
        deadline = None if timeout_ms is None else time.monotonic() + timeout_ms / 1000.0

        with self._condition:
            while True:
                message = self._claim_next()
                if message is not None:
                    return message

                if deadline is None:
                    self._condition.wait()
                    continue

                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return None  # timed out
                self._condition.wait(remaining)

    def _claim_next(self) -> Optional[OrderedMessage]:
        """Return the next releasable message, or None (caller holds the lock)."""
        in_flight = self.messages.get(self.processing_sequence)
        if in_flight is not None and in_flight.status == MessageStatus.PROCESSING:
            # The previous message has not been completed yet; releasing its
            # successor now would break per-key ordering.
            return None

        next_seq = self.processing_sequence + 1
        candidate = self.messages.get(next_seq)
        if candidate is not None and candidate.status == MessageStatus.PENDING:
            candidate.status = MessageStatus.PROCESSING
            self.processing_sequence = next_seq
            return candidate

        if self.config.enable_message_skip:
            return self._find_skippable_message()
        return None

    def _find_skippable_message(self) -> Optional[OrderedMessage]:
        """Skip past the next message if it is FAILED with no retries left.

        The message is marked SKIPPED, ``processing_sequence`` advances to it,
        and it is returned. Caller holds the lock.
        """
        next_seq = self.processing_sequence + 1

        message = self.messages.get(next_seq)
        if (message is not None
                and message.status == MessageStatus.FAILED
                and message.retry_count >= self.config.max_retry_count):
            message.status = MessageStatus.SKIPPED
            self.processing_sequence = next_seq
            return message

        return None

    def complete_message(self, sequence_id: int, success: bool):
        """Record the outcome of processing the message with ``sequence_id``.

        Unknown sequence ids are ignored.
        """
        with self._lock:
            message = self.messages.get(sequence_id)
            if message is None:
                return

            if success:
                message.status = MessageStatus.COMPLETED
                self._cleanup_completed_messages()
            else:
                message.retry_count += 1
                if message.retry_count < self.config.max_retry_count:
                    # Retry: make it the next message to be handed out again.
                    message.status = MessageStatus.PENDING
                    self.processing_sequence = sequence_id - 1
                else:
                    message.status = MessageStatus.FAILED

            self._condition.notify_all()

    def _cleanup_completed_messages(self):
        """Delete the leading run of COMPLETED messages (caller holds the lock).

        The scan follows insertion order and stops at the first message that
        is not COMPLETED.
        """
        to_remove = []
        for seq_id, message in self.messages.items():
            if message.status != MessageStatus.COMPLETED:
                break
            to_remove.append(seq_id)

        for seq_id in to_remove:
            del self.messages[seq_id]

    def get_queue_info(self) -> Dict:
        """Return a snapshot of counters and sequence positions for this queue."""
        with self._lock:
            pending_count = processing_count = failed_count = 0
            for msg in self.messages.values():
                if msg.status == MessageStatus.PENDING:
                    pending_count += 1
                elif msg.status == MessageStatus.PROCESSING:
                    processing_count += 1
                elif msg.status == MessageStatus.FAILED:
                    failed_count += 1

            return {
                'order_key': self.order_key,
                'total_messages': len(self.messages),
                'pending_messages': pending_count,
                'processing_messages': processing_count,
                'failed_messages': failed_count,
                'current_sequence': self.current_sequence,
                'processing_sequence': self.processing_sequence
            }

class OrderedMessageProducer:
    """Build OrderedMessage objects with per-key sequence ids and a partition.

    The partition is derived from a CRC-32 of the order key, so a given key
    maps to the same partition in every process. The built-in ``hash()`` is
    salted per interpreter run for strings and cannot provide that.
    """

    def __init__(self, config: OrderConfig, partition_count: int = 16):
        """
        Args:
            config: Ordering configuration.
            partition_count: Number of partitions order keys are spread over.

        Raises:
            ValueError: If ``partition_count`` is less than 1.
        """
        if partition_count < 1:
            raise ValueError("partition_count must be at least 1")
        self.config = config
        self.partition_count = partition_count
        # Last sequence id issued per order key.
        self.sequence_generators: Dict[str, int] = defaultdict(int)
        self._lock = threading.Lock()

    def _partition_for(self, order_key: str) -> int:
        """Return the stable partition index for ``order_key``."""
        import zlib  # local import: this section's import block does not provide it

        return zlib.crc32(order_key.encode("utf-8")) % self.partition_count

    def send_ordered_message(self, topic: str, tag: str, body: str,
                           order_key: str, properties: Optional[Dict[str, str]] = None) -> OrderedMessage:
        """Create the next message in ``order_key``'s sequence.

        Sequence ids start at 1 for each order key and increase by one per call.

        Returns:
            The newly built message.
        """
        with self._lock:
            # Issue the next sequence id for this key.
            self.sequence_generators[order_key] += 1
            sequence_id = self.sequence_generators[order_key]

        message = OrderedMessage(
            message_id=str(uuid.uuid4()),
            topic=topic,
            tag=tag,
            body=body,
            order_key=order_key,
            sequence_id=sequence_id,
            partition_id=self._partition_for(order_key),
            properties=properties or {},
            created_time=time.time()
        )

        print(f"发送顺序消息: {order_key}#{sequence_id}")
        return message

    def send_batch_ordered_messages(self, messages_data: List[Dict]) -> List[OrderedMessage]:
        """Create one message per dict in ``messages_data``, in the given order.

        Each dict needs 'topic', 'body' and 'order_key'; 'tag' defaults to ''
        and 'properties' is optional.
        """
        return [
            self.send_ordered_message(
                topic=msg_data['topic'],
                tag=msg_data.get('tag', ''),
                body=msg_data['body'],
                order_key=msg_data['order_key'],
                properties=msg_data.get('properties')
            )
            for msg_data in messages_data
        ]

class OrderedMessageConsumer:
    """Poll per-order-key queues from worker threads and dispatch to handlers."""

    def __init__(self, config: OrderConfig):
        self.config = config
        self.message_queues: Dict[str, OrderedMessageQueue] = {}   # order key -> queue
        self.message_handlers: Dict[str, Callable] = {}            # topic -> handler
        self._running = False
        self._consumer_threads: List[threading.Thread] = []
        self._lock = threading.Lock()

    def register_handler(self, topic: str, handler: Callable[[OrderedMessage], bool]):
        """Register ``handler`` for ``topic``; it returns True on success."""
        self.message_handlers[topic] = handler
        print(f"已注册顺序消息处理器: {topic}")

    def add_message(self, message: OrderedMessage):
        """Route ``message`` to its order key's queue, creating the queue on first use."""
        with self._lock:
            if message.order_key not in self.message_queues:
                self.message_queues[message.order_key] = OrderedMessageQueue(
                    message.order_key, self.config
                )
            target_queue = self.message_queues[message.order_key]
            target_queue.add_message(message)

    def start(self):
        """Start ``config.consumer_thread_count`` daemon worker threads."""
        self._running = True

        for index in range(self.config.consumer_thread_count):
            worker = threading.Thread(
                target=self._consume_messages, args=(index,), daemon=True
            )
            worker.start()
            self._consumer_threads.append(worker)

        print(f"顺序消息消费者已启动,线程数: {len(self._consumer_threads)}")

    def shutdown(self):
        """Signal the workers to stop and wait for each of them to finish."""
        self._running = False

        for worker in self._consumer_threads:
            worker.join()

        print("顺序消息消费者已关闭")

    def _consume_messages(self, thread_id: int):
        """Worker loop: visit every queue in turn until the consumer stops."""
        print(f"消费线程 {thread_id} 已启动")

        while self._running:
            try:
                handled_any = False

                # Snapshot the keys so queues can be added while we iterate.
                with self._lock:
                    known_keys = list(self.message_queues)

                for order_key in known_keys:
                    if not self._running:
                        break

                    key_queue = self.message_queues.get(order_key)
                    if not key_queue:
                        continue

                    # Wait briefly for this key's next message.
                    message = key_queue.get_next_message(timeout_ms=100)
                    if not message:
                        continue

                    outcome = self._process_message(message)
                    key_queue.complete_message(message.sequence_id, outcome)
                    handled_any = True

                if not handled_any:
                    time.sleep(0.01)  # brief pause before the next round

            except Exception as e:
                print(f"消费线程 {thread_id} 异常: {e}")
                time.sleep(1)

    def _process_message(self, message: OrderedMessage) -> bool:
        """Run the topic's handler; return False if none exists or it raises."""
        try:
            print(f"处理顺序消息: {message.order_key}#{message.sequence_id}")

            handler = self.message_handlers.get(message.topic)
            if not handler:
                print(f"未找到主题 {message.topic} 的处理器")
                return False
            return handler(message)

        except Exception as e:
            print(f"处理消息异常: {e}")
            return False

    def get_consumer_statistics(self) -> Dict:
        """Return queue count, worker count and each queue's info snapshot."""
        with self._lock:
            total_queues = len(self.message_queues)
            queue_stats = [
                key_queue.get_queue_info()
                for key_queue in self.message_queues.values()
            ]

        return {
            'total_queues': total_queues,
            'consumer_threads': len(self._consumer_threads),
            'queue_statistics': queue_stats
        }

# Example handler
class OrderMessageHandler:
    """Demo handler that records each order message it processes."""

    def __init__(self):
        # One record per successfully handled message, in handling order.
        self.processed_orders = []

    def handle_order_message(self, message: OrderedMessage) -> bool:
        """Parse the JSON body, simulate work and record the result.

        Returns:
            True when the message was recorded, False if anything raised.
        """
        try:
            payload = json.loads(message.body)
            order_id = payload.get('order_id')

            print(f"  处理订单: {order_id}, 顺序: {message.order_key}#{message.sequence_id}")

            # Simulated processing time.
            time.sleep(0.1)

            record = {
                'order_id': order_id,
                'order_key': message.order_key,
                'sequence_id': message.sequence_id,
                'processed_time': time.time()
            }
            self.processed_orders.append(record)
        except Exception as e:
            print(f"处理订单消息失败: {e}")
            return False
        else:
            return True

# Usage example
if __name__ == "__main__":
    # Configuration: two consumer threads working on partition-ordered messages.
    config = OrderConfig(
        order_type=OrderType.PARTITION,
        max_retry_count=3,
        enable_strict_order=True,
        consumer_thread_count=2
    )

    # Producer and consumer share the configuration.
    producer = OrderedMessageProducer(config)
    consumer = OrderedMessageConsumer(config)

    # Route 'order_topic' messages to the demo handler.
    order_handler = OrderMessageHandler()
    consumer.register_handler('order_topic', order_handler.handle_order_message)

    # Start consuming before anything is produced.
    consumer.start()

    try:
        print("=== 顺序消息示例 ===")

        # Produce 30 messages spread round-robin over three order keys.
        order_keys = ['user_001', 'user_002', 'user_003']

        for index in range(30):
            order_key = order_keys[index % len(order_keys)]
            payload = {
                'order_id': f'ORDER_{index:03d}',
                'user_id': order_key,
                'action': 'update_status',
                'status': f'status_{index % 5}',
                'timestamp': time.time()
            }

            message = producer.send_ordered_message(
                topic='order_topic',
                tag='order_update',
                body=json.dumps(payload),
                order_key=order_key
            )

            # Hand the message straight to the consumer.
            consumer.add_message(message)

            time.sleep(0.05)  # simulated gap between sends

        # Give the consumer time to drain the queues.
        time.sleep(5)

        # Print consumer-level statistics.
        stats = consumer.get_consumer_statistics()
        print("\n消费者统计:")
        print(f"  总队列数: {stats['total_queues']}")
        print(f"  消费线程数: {stats['consumer_threads']}")

        print("\n队列详情:")
        detail_fields = (
            ("总消息", 'total_messages'),
            ("待处理", 'pending_messages'),
            ("处理中", 'processing_messages'),
            ("失败", 'failed_messages'),
            ("当前序列", 'current_sequence'),
            ("处理序列", 'processing_sequence'),
        )
        for queue_stat in stats['queue_statistics']:
            print(f"  队列 {queue_stat['order_key']}:")
            for label, field_name in detail_fields:
                print(f"    {label}: {queue_stat[field_name]}")

        print(f"\n已处理订单数: {len(order_handler.processed_orders)}")

        # Check that each key's messages were handled in sequence order.
        print("\n顺序验证:")
        for order_key in order_keys:
            handled = sorted(
                (rec for rec in order_handler.processed_orders if rec['order_key'] == order_key),
                key=lambda rec: rec['processed_time']
            )

            sequences = [rec['sequence_id'] for rec in handled]
            mark = '✓' if sequences == sorted(sequences) else '✗'
            print(f"  {order_key}: {mark} 顺序正确 {sequences}")

    finally:
        consumer.shutdown()

7.4.2 顺序消息应用场景

from dataclasses import dataclass
from typing import List, Dict, Optional, Any
from enum import Enum
import time
import json
import uuid

class OrderScenarioType(Enum):
    """Identifiers of the ordered-messaging scenarios catalogued below."""
    ORDER_PROCESSING = "order_processing"    # order processing
    PAYMENT_FLOW = "payment_flow"            # payment flow
    INVENTORY_UPDATE = "inventory_update"    # inventory updates
    USER_OPERATION = "user_operation"        # user operations
    DATA_SYNC = "data_sync"                  # data synchronisation

@dataclass
class OrderScenario:
    """Description of one use case for ordered messages."""
    scenario_type: OrderScenarioType
    name: str                      # display name, printed as the section title
    description: str
    order_key_strategy: str        # name of the field used as the order key
    example_messages: List[Dict]   # sample payloads, listed in their intended order
    business_rules: List[str]      # ordering rules the scenario must respect

class OrderedMessageScenarios:
    """Catalogue of typical use cases for ordered messages."""

    def __init__(self):
        self.scenarios = self._create_scenarios()

    def _create_scenarios(self) -> Dict[OrderScenarioType, OrderScenario]:
        """Build the catalogue, keyed by each scenario's own type."""
        order_processing = OrderScenario(
            scenario_type=OrderScenarioType.ORDER_PROCESSING,
            name="订单处理流程",
            description="确保同一订单的状态变更按正确顺序处理",
            order_key_strategy="order_id",
            example_messages=[
                {"action": "create", "order_id": "ORDER_001", "status": "created"},
                {"action": "pay", "order_id": "ORDER_001", "status": "paid"},
                {"action": "ship", "order_id": "ORDER_001", "status": "shipped"},
                {"action": "deliver", "order_id": "ORDER_001", "status": "delivered"}
            ],
            business_rules=[
                "订单必须先创建才能支付",
                "订单必须先支付才能发货",
                "订单必须先发货才能确认收货",
                "状态变更不能跳跃或回退"
            ]
        )

        payment_flow = OrderScenario(
            scenario_type=OrderScenarioType.PAYMENT_FLOW,
            name="支付流程处理",
            description="确保支付相关操作的顺序性和一致性",
            order_key_strategy="payment_id",
            example_messages=[
                {"action": "initiate", "payment_id": "PAY_001", "amount": 100.0},
                {"action": "authorize", "payment_id": "PAY_001", "auth_code": "AUTH123"},
                {"action": "capture", "payment_id": "PAY_001", "captured_amount": 100.0},
                {"action": "settle", "payment_id": "PAY_001", "settlement_id": "SETTLE123"}
            ],
            business_rules=[
                "支付必须先发起才能授权",
                "支付必须先授权才能扣款",
                "支付必须先扣款才能结算",
                "每个步骤都需要前置条件验证"
            ]
        )

        inventory_update = OrderScenario(
            scenario_type=OrderScenarioType.INVENTORY_UPDATE,
            name="库存更新同步",
            description="确保同一商品的库存变更按时间顺序处理",
            order_key_strategy="product_id",
            example_messages=[
                {"action": "restock", "product_id": "PROD_001", "quantity": 100},
                {"action": "reserve", "product_id": "PROD_001", "quantity": 10},
                {"action": "sell", "product_id": "PROD_001", "quantity": 5},
                {"action": "return", "product_id": "PROD_001", "quantity": 2}
            ],
            business_rules=[
                "库存变更必须按时间顺序处理",
                "不能出现负库存",
                "预留和销售操作需要检查可用库存",
                "退货操作需要增加可用库存"
            ]
        )

        user_operation = OrderScenario(
            scenario_type=OrderScenarioType.USER_OPERATION,
            name="用户操作序列",
            description="确保同一用户的操作按顺序执行",
            order_key_strategy="user_id",
            example_messages=[
                {"action": "login", "user_id": "USER_001", "session_id": "SESSION123"},
                {"action": "browse", "user_id": "USER_001", "page": "product_list"},
                {"action": "add_cart", "user_id": "USER_001", "product_id": "PROD_001"},
                {"action": "checkout", "user_id": "USER_001", "order_id": "ORDER_001"}
            ],
            business_rules=[
                "用户必须先登录才能操作",
                "购物车操作需要有效会话",
                "结账操作需要验证购物车内容",
                "操作序列需要保持会话一致性"
            ]
        )

        data_sync = OrderScenario(
            scenario_type=OrderScenarioType.DATA_SYNC,
            name="数据同步处理",
            description="确保数据变更在多个系统间按顺序同步",
            order_key_strategy="entity_id",
            example_messages=[
                {"action": "create", "entity_id": "ENTITY_001", "data": {"name": "Test"}},
                {"action": "update", "entity_id": "ENTITY_001", "data": {"name": "Updated"}},
                {"action": "relate", "entity_id": "ENTITY_001", "related_id": "REL_001"},
                {"action": "delete", "entity_id": "ENTITY_001", "soft_delete": True}
            ],
            business_rules=[
                "实体必须先创建才能更新",
                "实体必须存在才能建立关联",
                "删除操作需要检查关联关系",
                "数据变更需要保持引用完整性"
            ]
        )

        catalogue = [
            order_processing,
            payment_flow,
            inventory_update,
            user_operation,
            data_sync,
        ]
        return {entry.scenario_type: entry for entry in catalogue}

    def get_scenario(self, scenario_type: OrderScenarioType) -> OrderScenario:
        """Return the scenario for ``scenario_type``, or None if it is unknown."""
        return self.scenarios.get(scenario_type)

    def list_scenarios(self) -> List[OrderScenario]:
        """Return all scenarios in catalogue order."""
        return list(self.scenarios.values())

    def demonstrate_scenario(self, scenario_type: OrderScenarioType):
        """Print the scenario's description, rules and sample messages."""
        scenario = self.get_scenario(scenario_type)
        if not scenario:
            print(f"未找到场景: {scenario_type}")
            return

        print(f"\n=== {scenario.name} ===")
        print(f"描述: {scenario.description}")
        print(f"顺序键策略: {scenario.order_key_strategy}")

        print("\n业务规则:")
        for number, rule in enumerate(scenario.business_rules, start=1):
            print(f"  {number}. {rule}")

        print("\n消息示例:")
        for number, sample in enumerate(scenario.example_messages, start=1):
            rendered = json.dumps(sample, ensure_ascii=False)
            print(f"  {number}. {rendered}")

# 具体场景实现示例
class OrderProcessingHandler:
    """Handler for the order-processing scenario.

    Keeps the latest known state of every order in ``self.orders`` and only
    accepts messages whose target status is a permitted successor of the
    order's current status (see ``self.valid_transitions``).
    """

    def __init__(self):
        # order_id -> dict with 'status', 'action', 'updated_time', 'sequence_id'
        self.orders = {}
        # current status -> statuses that may follow it; None means "no order yet"
        self.valid_transitions = {
            None: ['created'],
            'created': ['paid', 'cancelled'],
            'paid': ['shipped', 'refunded'],
            'shipped': ['delivered', 'returned'],
            'delivered': ['completed'],
            'cancelled': [],
            'refunded': [],
            'returned': ['refunded'],
            'completed': []
        }

    def handle_order_message(self, message: OrderedMessage) -> bool:
        """Apply one order message.

        ``message.body`` must be a JSON object with the keys ``order_id``,
        ``action`` and ``status``. Returns ``True`` when the transition was
        accepted and stored, ``False`` when it was rejected or when the
        message could not be processed (the error is printed, not raised).
        """
        try:
            payload = json.loads(message.body)
            order_id, action, target = (
                payload['order_id'],
                payload['action'],
                payload['status'],
            )

            # Status currently on record (None for an order not seen before)
            previous = self.orders.get(order_id, {}).get('status')

            if not self._is_valid_transition(previous, target):
                print(f"  ❌ 无效状态转换: {previous} -> {target}")
                return False

            # Create the record on first sight, then overwrite the tracked fields
            record = self.orders.setdefault(order_id, {})
            record.update({
                'status': target,
                'action': action,
                'updated_time': time.time(),
                'sequence_id': message.sequence_id
            })

            print(f"  ✅ 订单 {order_id}: {action} -> {target}")
            return True

        except Exception as exc:
            print(f"  ❌ 处理订单消息失败: {exc}")
            return False

    def _is_valid_transition(self, current_status: str, new_status: str) -> bool:
        """Tell whether ``new_status`` may follow ``current_status``.

        Statuses absent from ``valid_transitions`` have no successors.
        """
        return new_status in self.valid_transitions.get(current_status, [])

    def get_order_status(self, order_id: str) -> Dict:
        """Return the stored record for ``order_id`` or an empty dict."""
        return self.orders.get(order_id, {})

    def get_all_orders(self) -> Dict:
        """Return a shallow copy of the order_id -> record mapping."""
        return dict(self.orders)

class PaymentFlowHandler:
    """Handler for the payment-flow scenario.

    Each message carries an ``action`` that maps to a target status (see
    ``ACTION_TO_STATUS``). A message is accepted only when that status is a
    permitted successor of the payment's current status according to
    ``self.payment_states``; a payment that has not been seen before must
    start with ``INITIAL_STATUS``.
    """

    # Status every new payment has to enter first.
    INITIAL_STATUS = 'initiated'

    # action -> status it produces. 'retry' and 'complete' exist because the
    # transition table references 'retry_initiated' and 'completed'; without
    # an action producing them a failed payment could never be retried and no
    # payment could ever reach 'completed'.
    ACTION_TO_STATUS = {
        'initiate': 'initiated',
        'authorize': 'authorized',
        'capture': 'captured',
        'settle': 'settled',
        'void': 'voided',
        'refund': 'refunded',
        'fail': 'failed',
        'retry': 'retry_initiated',
        'complete': 'completed'
    }

    def __init__(self):
        # payment_id -> dict with 'status', 'action', 'data', 'updated_time', 'sequence_id'
        self.payments = {}
        # current status -> statuses that may follow it
        self.payment_states = {
            'initiated': ['authorized', 'failed'],
            'authorized': ['captured', 'voided'],
            'captured': ['settled', 'refunded'],
            'settled': ['completed'],
            'voided': [],
            'failed': ['retry_initiated'],
            'refunded': ['completed'],
            'retry_initiated': ['authorized', 'failed'],
            'completed': []
        }

    def handle_payment_message(self, message: OrderedMessage) -> bool:
        """Apply one payment message.

        ``message.body`` must be a JSON object with at least ``payment_id``
        and ``action``. Returns ``True`` when the transition was accepted and
        stored, ``False`` for an unknown action, a disallowed transition, or
        a message that could not be processed (the error is printed, not
        raised).
        """
        try:
            payment_data = json.loads(message.body)
            payment_id = payment_data['payment_id']
            action = payment_data['action']

            # Derive the target status from the action
            new_status = self.ACTION_TO_STATUS.get(action)
            if not new_status:
                print(f"  ❌ 未知支付动作: {action}")
                return False

            # Status currently on record (None for a payment not seen before)
            current_status = self.payments.get(payment_id, {}).get('status')

            if not self._is_valid_transition(current_status, new_status):
                print(f"  ❌ 无效支付状态转换: {current_status} -> {new_status}")
                return False

            # Create the record on first sight, then overwrite the tracked fields
            self.payments.setdefault(payment_id, {}).update({
                'status': new_status,
                'action': action,
                'data': payment_data,
                'updated_time': time.time(),
                'sequence_id': message.sequence_id
            })

            print(f"  ✅ 支付 {payment_id}: {action} -> {new_status}")
            return True

        except Exception as e:
            # Best-effort handler: malformed messages are reported and rejected
            print(f"  ❌ 处理支付消息失败: {e}")
            return False

    def _is_valid_transition(self, current_status: Optional[str], new_status: str) -> bool:
        """Tell whether ``new_status`` may follow ``current_status``.

        With no current status the only acceptable target is
        ``INITIAL_STATUS``; previously any status was accepted as the first
        message, which let a payment skip straight to e.g. 'settled'.
        """
        if current_status is None:
            return new_status == self.INITIAL_STATUS
        return new_status in self.payment_states.get(current_status, [])

    def get_payment_status(self, payment_id: str) -> Dict:
        """Return the stored record for ``payment_id`` or an empty dict."""
        return self.payments.get(payment_id, {})

# 使用示例
if __name__ == "__main__":
    # Scenario catalogue
    scenarios = OrderedMessageScenarios()

    print("=== 顺序消息应用场景演示 ===")

    # Walk through every scenario type
    for scenario_type in OrderScenarioType:
        scenarios.demonstrate_scenario(scenario_type)

    # Concrete end-to-end test of two scenarios
    print("\n\n=== 订单处理场景测试 ===")

    # Shared configuration plus producer / consumer / handlers
    config = OrderConfig(order_type=OrderType.PARTITION, enable_strict_order=True)

    producer = OrderedMessageProducer(config)
    consumer = OrderedMessageConsumer(config)
    order_handler = OrderProcessingHandler()
    payment_handler = PaymentFlowHandler()

    # One handler per topic
    consumer.register_handler('order_topic', order_handler.handle_order_message)
    consumer.register_handler('payment_topic', payment_handler.handle_payment_message)

    consumer.start()

    try:
        # Order life cycle: create -> pay -> ship -> deliver
        order_messages = [
            {'action': 'create', 'order_id': 'ORDER_001', 'status': 'created'},
            {'action': 'pay', 'order_id': 'ORDER_001', 'status': 'paid'},
            {'action': 'ship', 'order_id': 'ORDER_001', 'status': 'shipped'},
            {'action': 'deliver', 'order_id': 'ORDER_001', 'status': 'delivered'}
        ]

        # Payment life cycle: initiate -> authorize -> capture -> settle
        payment_messages = [
            {'action': 'initiate', 'payment_id': 'PAY_001', 'amount': 100.0},
            {'action': 'authorize', 'payment_id': 'PAY_001', 'auth_code': 'AUTH123'},
            {'action': 'capture', 'payment_id': 'PAY_001', 'captured_amount': 100.0},
            {'action': 'settle', 'payment_id': 'PAY_001', 'settlement_id': 'SETTLE123'}
        ]

        # (banner, topic, tag, payload field used as order key, payloads)
        batches = (
            ("\n发送订单消息:", 'order_topic', 'order_update', 'order_id', order_messages),
            ("\n发送支付消息:", 'payment_topic', 'payment_update', 'payment_id', payment_messages),
        )

        for banner, topic, tag, key_field, payloads in batches:
            print(banner)
            for msg_data in payloads:
                message = producer.send_ordered_message(
                    topic=topic,
                    tag=tag,
                    body=json.dumps(msg_data),
                    order_key=msg_data[key_field]
                )
                consumer.add_message(message)
                time.sleep(0.1)

        # Give the consumer time to drain its queue
        time.sleep(2)

        # Report what the handlers recorded
        print("\n订单处理结果:")
        for order_id, order_info in order_handler.get_all_orders().items():
            print(f"  {order_id}: {order_info}")

        print("\n支付处理结果:")
        for payment_id in ['PAY_001']:
            payment_info = payment_handler.get_payment_status(payment_id)
            if payment_info:
                print(f"  {payment_id}: {payment_info}")

    finally:
        consumer.shutdown()

7.5 过滤消息

7.5.1 消息过滤实现

import json
import re
import threading
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Optional, Callable, Any, Union

class FilterType(Enum):
    """Kinds of filter a rule can ask for."""
    TAG = "tag"                    # tag filtering
    SQL = "sql"                    # SQL-expression filtering
    PROPERTY = "property"          # property filtering
    CONTENT = "content"            # content filtering
    CUSTOM = "custom"              # custom filtering

class FilterOperator(Enum):
    """Comparison operators usable in a filter condition."""
    EQUALS = "="
    NOT_EQUALS = "!="
    GREATER_THAN = ">"
    LESS_THAN = "<"
    GREATER_EQUAL = ">="
    LESS_EQUAL = "<="
    CONTAINS = "CONTAINS"
    NOT_CONTAINS = "NOT_CONTAINS"
    STARTS_WITH = "STARTS_WITH"
    ENDS_WITH = "ENDS_WITH"
    REGEX = "REGEX"
    IN = "IN"
    NOT_IN = "NOT_IN"
    IS_NULL = "IS_NULL"
    IS_NOT_NULL = "IS_NOT_NULL"

@dataclass
class FilterCondition:
    """A single filter condition: field, operator and comparison value."""
    field: str                     # field name
    operator: FilterOperator       # operator
    value: Any                     # value to compare against
    case_sensitive: bool = True    # whether comparison is case-sensitive

@dataclass
class FilterRule:
    """A filter rule: a typed, prioritised group of conditions."""
    rule_id: str
    name: str
    filter_type: FilterType
    conditions: List[FilterCondition]
    logic_operator: str = "AND"    # AND or OR
    enabled: bool = True
    priority: int = 0              # priority; a larger number means higher priority
    description: str = ""

@dataclass
class FilterableMessage:
    """A message whose fields can be looked up by name for filtering."""
    message_id: str
    topic: str
    tag: str
    body: str
    properties: Dict[str, str]
    created_time: float

    def get_field_value(self, field: str) -> Any:
        """Resolve ``field`` to a value on this message.

        ``topic``, ``tag``, ``body`` and ``created_time`` return the matching
        attribute. ``properties.<name>`` reads ``<name>`` from the
        ``properties`` dict (``None`` when absent). Any other name is looked
        up as an attribute and yields ``None`` when it does not exist.
        """
        if field in ("topic", "tag", "body", "created_time"):
            return getattr(self, field)

        prefix = "properties."
        if field.startswith(prefix):
            # Everything after the prefix is the property key
            return self.properties.get(field[len(prefix):])

        return getattr(self, field, None)

class MessageFilter(ABC):
    """Abstract base class shared by all message filters."""

    @abstractmethod
    def match(self, message: FilterableMessage) -> bool:
        """Report whether ``message`` satisfies this filter's condition."""

class TagFilter(MessageFilter):
    """Filter that accepts messages whose tag belongs to a given collection."""

    def __init__(self, tags: List[str]):
        # Kept as a set: duplicates collapse and membership tests are hash-based
        self.tags = {*tags}

    def match(self, message: FilterableMessage) -> bool:
        """Return ``True`` when the message's tag is one of the accepted tags."""
        candidate = message.tag
        return candidate in self.tags

class PropertyFilter(MessageFilter):
    """Filter that evaluates a list of field conditions against a message.

    Every condition's field is resolved through
    ``message.get_field_value(condition.field)``; the per-condition results
    are combined with ``logic_operator`` ("AND" or "OR", case-insensitive).
    Any other combinator never matches.
    """

    def __init__(self, conditions: List[FilterCondition], logic_operator: str = "AND"):
        self.conditions = conditions
        self.logic_operator = logic_operator.upper()

    def match(self, message: FilterableMessage) -> bool:
        """Return whether ``message`` satisfies the combined conditions."""
        results = [
            self._evaluate_condition(message.get_field_value(condition.field), condition)
            for condition in self.conditions
        ]

        if self.logic_operator == "AND":
            return all(results)
        if self.logic_operator == "OR":
            return any(results)
        return False

    @staticmethod
    def _coerce_for_ordering(field_value: Any, expected: Any) -> Any:
        """Turn a numeric string into a number when the threshold is numeric.

        Message properties are declared as ``Dict[str, str]``, so a rule such
        as ``amount > 1000`` would otherwise compare ``"1500"`` with ``1000``
        and fail with ``TypeError``. A ``ValueError`` from a non-numeric
        string propagates to the caller, which treats it as "no match".
        """
        threshold_is_number = isinstance(expected, (int, float)) and not isinstance(expected, bool)
        if threshold_is_number and isinstance(field_value, str):
            try:
                return int(field_value)
            except ValueError:
                return float(field_value)
        return field_value

    def _evaluate_condition(self, field_value: Any, condition: FilterCondition) -> bool:
        """Evaluate one condition against an already-resolved field value.

        Evaluation is best-effort: any error raised while comparing (for
        example incomparable types or an invalid regular expression) makes
        the condition evaluate to ``False`` instead of propagating.
        """
        op = condition.operator
        try:
            expected = condition.value

            if op == FilterOperator.IS_NULL:
                return field_value is None
            if op == FilterOperator.IS_NOT_NULL:
                return field_value is not None

            if op == FilterOperator.REGEX:
                # A missing field has no text to search. The pattern is used
                # verbatim: lower-casing it would corrupt escapes like \S or \D.
                if field_value is None:
                    return False
                flags = 0 if condition.case_sensitive else re.IGNORECASE
                return bool(re.search(str(expected), str(field_value), flags))

            # Case-insensitive comparison only applies to string/string pairs
            if isinstance(field_value, str) and isinstance(expected, str) and not condition.case_sensitive:
                field_value = field_value.lower()
                expected = expected.lower()

            if op == FilterOperator.EQUALS:
                return field_value == expected
            if op == FilterOperator.NOT_EQUALS:
                return field_value != expected

            ordering_operators = (
                FilterOperator.GREATER_THAN,
                FilterOperator.LESS_THAN,
                FilterOperator.GREATER_EQUAL,
                FilterOperator.LESS_EQUAL,
            )
            if op in ordering_operators:
                actual = self._coerce_for_ordering(field_value, expected)
                if op == FilterOperator.GREATER_THAN:
                    return actual > expected
                if op == FilterOperator.LESS_THAN:
                    return actual < expected
                if op == FilterOperator.GREATER_EQUAL:
                    return actual >= expected
                return actual <= expected

            text_operators = (
                FilterOperator.CONTAINS,
                FilterOperator.NOT_CONTAINS,
                FilterOperator.STARTS_WITH,
                FilterOperator.ENDS_WITH,
            )
            if op in text_operators:
                # str(None) is "None"; without this guard a missing field
                # would "contain" fragments such as "on" or start with "No".
                if field_value is None:
                    return op == FilterOperator.NOT_CONTAINS
                text, fragment = str(field_value), str(expected)
                if op == FilterOperator.CONTAINS:
                    return fragment in text
                if op == FilterOperator.NOT_CONTAINS:
                    return fragment not in text
                if op == FilterOperator.STARTS_WITH:
                    return text.startswith(fragment)
                return text.endswith(fragment)

            containers = (list, tuple, set, frozenset)
            if op == FilterOperator.IN:
                return isinstance(expected, containers) and field_value in expected
            if op == FilterOperator.NOT_IN:
                return not isinstance(expected, containers) or field_value not in expected

            return False

        except Exception:
            # Deliberate best-effort: a condition that cannot be evaluated
            # simply does not match.
            return False

class SQLFilter(MessageFilter):
    """Filter driven by a simplified SQL ``WHERE``-style expression.

    Supported syntax: the fields ``tag``, ``topic``, ``body`` and
    ``properties.<name>``; the keywords ``AND`` / ``OR`` / ``NOT``
    (case-insensitive); the comparison operators ``=``, ``==``, ``!=``,
    ``<>``, ``<``, ``<=``, ``>``, ``>=``; quoted string literals; and the
    helper functions ``len``, ``str``, ``int`` and ``float``.
    Example: ``tag = 'order' AND properties.user_id = '123'``.

    NOTE(security): the translated expression is executed with ``eval``.
    Emptying ``__builtins__`` is NOT a sandbox (attribute access on
    ``message`` is still possible), so expressions must only come from
    trusted configuration, never from untrusted input.
    """

    # One-pass tokenizer. String literals are matched first so their content
    # is never rewritten; the look-behind keeps names that follow a dot
    # (e.g. ``message.tag``) untouched.
    _TOKEN_PATTERN = re.compile(
        r"""
        (?P<string>'(?:[^'\\]|\\.)*'|"(?:[^"\\]|\\.)*")
      | (?P<prop>(?<![\w.])properties\.(?P<prop_name>\w+))
      | (?P<op><>|<=|>=|!=|==|=)
      | (?P<word>(?<![\w.])[A-Za-z_]\w*)
        """,
        re.VERBOSE,
    )
    _KEYWORDS = {'AND': ' and ', 'OR': ' or ', 'NOT': ' not '}
    _FIELDS = {'tag': 'message.tag', 'topic': 'message.topic', 'body': 'message.body'}
    _OPERATORS = {'=': '==', '<>': '!='}

    def __init__(self, sql_expression: str):
        self.sql_expression = sql_expression
        self.compiled_expression = self._compile_expression(sql_expression)

    @classmethod
    def _translate(cls, expression: str) -> str:
        """Rewrite the SQL-style expression as a Python expression string."""

        def replace(token):
            text = token.group(0)
            if token.group('string') is not None:
                return text
            if token.group('prop') is not None:
                name = token.group('prop_name')
                return f'message.properties.get("{name}")'
            if token.group('op') is not None:
                # SQL equality / inequality spellings -> Python spellings
                return cls._OPERATORS.get(text, text)
            keyword = cls._KEYWORDS.get(text.upper())
            if keyword is not None:
                return keyword
            return cls._FIELDS.get(text, text)

        return cls._TOKEN_PATTERN.sub(replace, expression)

    def _compile_expression(self, expression: str) -> Callable:
        """Compile ``expression`` into a ``message -> bool`` callable.

        An expression that cannot be compiled yields a callable that never
        matches, so constructing the filter does not raise for bad syntax.
        """
        # compile() in 'eval' mode rejects leading whitespace, which the
        # keyword rewriting can introduce (e.g. a leading ``NOT``).
        python_expr = self._translate(expression).strip()
        try:
            code = compile(python_expr, "<sql-filter>", "eval")
        except (SyntaxError, ValueError):
            code = None

        def evaluate(message):
            if code is None:
                return False
            try:
                # Restricted (not secure) evaluation namespace
                safe_dict = {
                    'message': message,
                    '__builtins__': {},
                    'len': len,
                    'str': str,
                    'int': int,
                    'float': float
                }
                return bool(eval(code, safe_dict))
            except Exception:
                # Best-effort: an expression that fails at runtime does not match
                return False

        return evaluate

    def match(self, message: FilterableMessage) -> bool:
        """Return whether ``message`` satisfies the SQL-style expression."""
        return self.compiled_expression(message)

class ContentFilter(MessageFilter):
    """Filter that searches the message body for keywords."""

    def __init__(self, keywords: List[str], match_all: bool = False, case_sensitive: bool = True):
        self.keywords = keywords
        self.match_all = match_all
        self.case_sensitive = case_sensitive

    def match(self, message: FilterableMessage) -> bool:
        """Return whether the body contains the keywords.

        With ``match_all`` every keyword must occur in the body; otherwise a
        single occurrence is enough. When ``case_sensitive`` is false both
        the body and the keywords are lower-cased before comparing.
        """
        haystack = message.body
        needles = self.keywords
        if not self.case_sensitive:
            haystack = haystack.lower()
            needles = [needle.lower() for needle in needles]

        quantifier = all if self.match_all else any
        return quantifier(needle in haystack for needle in needles)

class CustomFilter(MessageFilter):
    """Filter that delegates the decision to a caller-supplied function."""

    def __init__(self, filter_function: Callable[[FilterableMessage], bool]):
        self.filter_function = filter_function

    def match(self, message: FilterableMessage) -> bool:
        """Return whatever the wrapped function returns for ``message``."""
        predicate = self.filter_function
        verdict = predicate(message)
        return verdict

class MessageFilterEngine:
    """Registry of filter rules that matches messages against them.

    Rules are added as :class:`FilterRule` objects; for each rule a concrete
    filter is built and stored under the rule id. ``filter_message`` tries
    the enabled rules from highest to lowest priority and stops at the first
    match. Rule storage and the statistics counters are guarded by one lock.
    """

    def __init__(self):
        self.filters: Dict[str, MessageFilter] = {}
        self.filter_rules: Dict[str, FilterRule] = {}
        self._lock = threading.Lock()
        self.statistics = {
            'total_messages': 0,
            'filtered_messages': 0,
            'filter_hits': defaultdict(int)
        }

    def add_filter_rule(self, rule: FilterRule) -> bool:
        """Build a filter for ``rule`` and register both under ``rule.rule_id``.

        Returns ``True`` on success and ``False`` when no filter could be
        built or building it raised (the error is printed, not re-raised).
        """
        try:
            with self._lock:
                # Build the concrete filter for this rule type
                filter_obj = self._create_filter(rule)
                if filter_obj:
                    self.filters[rule.rule_id] = filter_obj
                    self.filter_rules[rule.rule_id] = rule
                    print(f"已添加过滤规则: {rule.name}")
                    return True
                return False
        except Exception as e:
            print(f"添加过滤规则失败: {e}")
            return False

    def _create_filter(self, rule: FilterRule) -> Optional[MessageFilter]:
        """Create the filter object matching ``rule.filter_type``.

        Returns ``None`` when the rule type is unsupported or the rule has no
        usable conditions.
        """
        if rule.filter_type == FilterType.TAG:
            # Collect the tags from "tag = <value>" conditions
            tags = [
                condition.value
                for condition in rule.conditions
                if condition.field == "tag" and condition.operator == FilterOperator.EQUALS
            ]
            return TagFilter(tags) if tags else None

        if rule.filter_type == FilterType.PROPERTY:
            return PropertyFilter(rule.conditions, rule.logic_operator)

        if rule.filter_type == FilterType.SQL:
            # The first condition's value is taken as the SQL expression
            if rule.conditions:
                return SQLFilter(rule.conditions[0].value)
            return None

        if rule.filter_type == FilterType.CONTENT:
            body_conditions = [c for c in rule.conditions if c.field == "body"]
            if not body_conditions:
                return None

            only_contains = all(
                c.operator == FilterOperator.CONTAINS for c in body_conditions
            )
            uniform_case = len({bool(c.case_sensitive) for c in body_conditions}) == 1
            if only_contains and uniform_case:
                # Plain keyword search: honour the conditions' case sensitivity
                return ContentFilter(
                    [c.value for c in body_conditions],
                    match_all=rule.logic_operator.upper() == "AND",
                    case_sensitive=bool(body_conditions[0].case_sensitive),
                )
            # Other operators (e.g. REGEX) or mixed case settings cannot be
            # expressed as a keyword list; treating a regex as a literal
            # keyword would never match, so evaluate per condition instead.
            return PropertyFilter(body_conditions, rule.logic_operator)

        return None

    def remove_filter_rule(self, rule_id: str) -> bool:
        """Remove the rule and its filter; return whether it existed."""
        with self._lock:
            if rule_id in self.filters:
                del self.filters[rule_id]
                self.filter_rules.pop(rule_id, None)
                print(f"已移除过滤规则: {rule_id}")
                return True
            return False

    def filter_message(self, message: FilterableMessage) -> bool:
        """Return ``True`` when any enabled rule matches ``message``.

        Rules are tried in descending priority order and the first match
        wins; counters in ``statistics`` are updated accordingly.
        """
        with self._lock:
            self.statistics['total_messages'] += 1

            # Snapshot the enabled rules, highest priority first, together
            # with their filters so matching can run without holding the lock
            sorted_rules = sorted(
                [(rule_id, rule) for rule_id, rule in self.filter_rules.items() if rule.enabled],
                key=lambda x: x[1].priority,
                reverse=True
            )
            candidates = [(rule_id, self.filters.get(rule_id)) for rule_id, _ in sorted_rules]

        for rule_id, filter_obj in candidates:
            if filter_obj is not None and filter_obj.match(message):
                with self._lock:
                    self.statistics['filtered_messages'] += 1
                    self.statistics['filter_hits'][rule_id] += 1
                return True

        return False

    def get_filter_statistics(self) -> Dict:
        """Return a snapshot of the counters plus the derived hit rate.

        ``filter_hits`` is copied, so reading or modifying the returned
        mapping never touches the engine's own counters.
        """
        with self._lock:
            stats = dict(self.statistics)
            stats['filter_hits'] = defaultdict(int, self.statistics['filter_hits'])

        stats['filter_hit_rate'] = (
            stats['filtered_messages'] / stats['total_messages']
            if stats['total_messages'] > 0 else 0
        )
        return stats

    def list_filter_rules(self) -> List[FilterRule]:
        """Return a list of all registered rules."""
        with self._lock:
            return list(self.filter_rules.values())

# 使用示例
if __name__ == "__main__":
    # Create the filter engine
    filter_engine = MessageFilterEngine()
    
    print("=== 消息过滤示例 ===")
    
    # Tag rule: matches messages tagged "order"
    tag_rule = FilterRule(
        rule_id="tag_filter_001",
        name="订单标签过滤",
        filter_type=FilterType.TAG,
        conditions=[
            FilterCondition("tag", FilterOperator.EQUALS, "order")
        ],
        priority=1
    )
    filter_engine.add_filter_rule(tag_rule)
    
    # Property rule: VIP users with an amount above 1000
    property_rule = FilterRule(
        rule_id="property_filter_001",
        name="VIP用户过滤",
        filter_type=FilterType.PROPERTY,
        conditions=[
            FilterCondition("properties.user_level", FilterOperator.EQUALS, "VIP"),
            FilterCondition("properties.amount", FilterOperator.GREATER_THAN, 1000)
        ],
        logic_operator="AND",
        priority=2
    )
    filter_engine.add_filter_rule(property_rule)
    
    # Content rule: body mentions either urgency keyword
    content_rule = FilterRule(
        rule_id="content_filter_001",
        name="紧急消息过滤",
        filter_type=FilterType.CONTENT,
        conditions=[
            FilterCondition("body", FilterOperator.CONTAINS, "紧急"),
            FilterCondition("body", FilterOperator.CONTAINS, "urgent")
        ],
        logic_operator="OR",
        priority=3
    )
    filter_engine.add_filter_rule(content_rule)
    
    # Sample messages
    test_messages = [
        FilterableMessage(
            message_id="msg_001",
            topic="order_topic",
            tag="order",
            body="普通订单消息",
            properties={"user_id": "123", "user_level": "NORMAL"},
            created_time=time.time()
        ),
        FilterableMessage(
            message_id="msg_002",
            topic="order_topic",
            tag="order",
            body="VIP用户大额订单",
            properties={"user_id": "456", "user_level": "VIP", "amount": "1500"},
            created_time=time.time()
        ),
        FilterableMessage(
            message_id="msg_003",
            topic="notification_topic",
            tag="alert",
            body="紧急系统维护通知",
            properties={"priority": "high"},
            created_time=time.time()
        ),
        FilterableMessage(
            message_id="msg_004",
            topic="user_topic",
            tag="login",
            body="用户登录消息",
            properties={"user_id": "789"},
            created_time=time.time()
        )
    ]
    
    # Run every sample message through the engine
    print("\n过滤结果:")
    for message in test_messages:
        is_matched = filter_engine.filter_message(message)
        status = "✅ 匹配" if is_matched else "❌ 不匹配"
        print(f"  {message.message_id}: {status} - {message.tag} - {message.body[:20]}...")
    
    # Overall statistics
    stats = filter_engine.get_filter_statistics()
    print(f"\n过滤统计:")
    print(f"  总消息数: {stats['total_messages']}")
    print(f"  匹配消息数: {stats['filtered_messages']}")
    print(f"  匹配率: {stats['filter_hit_rate']:.2%}")
    
    # Per-rule hit counts; fall back to the rule id if the rule is gone
    print(f"\n规则命中统计:")
    for rule_id, hits in stats['filter_hits'].items():
        rule = filter_engine.filter_rules.get(rule_id)
        rule_name = rule.name if rule else rule_id
        # This line was over-indented by one space, which made the whole
        # snippet fail to parse with an IndentationError.
        print(f"  {rule_name}: {hits} 次")

7.5.2 过滤消息应用场景

from dataclasses import dataclass
from typing import List, Dict, Optional, Any, Set
from enum import Enum
import json
import time
from collections import defaultdict

class FilterScenarioType(Enum):
    """Kinds of filtering scenario."""
    USER_SUBSCRIPTION = "user_subscription"      # user-subscription filtering
    BUSINESS_ROUTING = "business_routing"        # business-routing filtering
    PRIORITY_FILTERING = "priority_filtering"    # priority filtering
    CONTENT_CLASSIFICATION = "content_classification"  # content-classification filtering
    GEOGRAPHIC_FILTERING = "geographic_filtering"  # geographic-location filtering

@dataclass
class FilterScenario:
    """A filtering scenario: its rules plus descriptive use cases and benefits."""
    scenario_type: FilterScenarioType
    name: str
    description: str
    filter_rules: List[FilterRule]
    use_cases: List[str]
    benefits: List[str]

class MessageFilterScenarios:
    """Catalogue of example message-filtering scenarios."""

    def __init__(self):
        self.scenarios = self._create_scenarios()

    def _create_scenarios(self) -> Dict[FilterScenarioType, FilterScenario]:
        """Build the catalogue, keyed by each scenario's own ``scenario_type``."""
        catalogue = [
            # Filtering by user subscription preferences
            FilterScenario(
                scenario_type=FilterScenarioType.USER_SUBSCRIPTION,
                name="用户订阅过滤",
                description="根据用户订阅偏好过滤消息",
                filter_rules=[
                    FilterRule(
                        rule_id="user_interest_filter",
                        name="用户兴趣过滤",
                        filter_type=FilterType.PROPERTY,
                        conditions=[
                            FilterCondition("properties.category", FilterOperator.IN, ["electronics", "books"]),
                            FilterCondition("properties.user_id", FilterOperator.EQUALS, "USER_123")
                        ],
                        logic_operator="AND"
                    ),
                    FilterRule(
                        rule_id="subscription_level_filter",
                        name="订阅级别过滤",
                        filter_type=FilterType.PROPERTY,
                        conditions=[
                            FilterCondition("properties.subscription_level", FilterOperator.IN, ["premium", "vip"])
                        ]
                    )
                ],
                use_cases=[
                    "个性化推荐系统",
                    "新闻订阅服务",
                    "商品促销通知",
                    "内容分发网络"
                ],
                benefits=[
                    "减少无关消息推送",
                    "提高用户体验",
                    "降低网络带宽消耗",
                    "提升消息处理效率"
                ]
            ),

            # Routing messages to different processors by business rules
            FilterScenario(
                scenario_type=FilterScenarioType.BUSINESS_ROUTING,
                name="业务路由过滤",
                description="根据业务规则将消息路由到不同处理器",
                filter_rules=[
                    FilterRule(
                        rule_id="order_type_filter",
                        name="订单类型过滤",
                        filter_type=FilterType.PROPERTY,
                        conditions=[
                            FilterCondition("properties.order_type", FilterOperator.EQUALS, "urgent")
                        ]
                    ),
                    FilterRule(
                        rule_id="amount_threshold_filter",
                        name="金额阈值过滤",
                        filter_type=FilterType.PROPERTY,
                        conditions=[
                            FilterCondition("properties.amount", FilterOperator.GREATER_THAN, 10000)
                        ]
                    ),
                    FilterRule(
                        rule_id="region_filter",
                        name="地区过滤",
                        filter_type=FilterType.PROPERTY,
                        conditions=[
                            FilterCondition("properties.region", FilterOperator.IN, ["north", "south"])
                        ]
                    )
                ],
                use_cases=[
                    "订单处理分流",
                    "支付风控路由",
                    "客服工单分配",
                    "物流配送优化"
                ],
                benefits=[
                    "提高处理效率",
                    "实现负载均衡",
                    "支持业务隔离",
                    "便于系统扩展"
                ]
            ),

            # Filtering by message priority
            FilterScenario(
                scenario_type=FilterScenarioType.PRIORITY_FILTERING,
                name="优先级过滤",
                description="根据消息优先级进行过滤和处理",
                filter_rules=[
                    FilterRule(
                        rule_id="high_priority_filter",
                        name="高优先级过滤",
                        filter_type=FilterType.PROPERTY,
                        conditions=[
                            FilterCondition("properties.priority", FilterOperator.IN, ["high", "critical"])
                        ]
                    ),
                    FilterRule(
                        rule_id="emergency_filter",
                        name="紧急消息过滤",
                        filter_type=FilterType.CONTENT,
                        conditions=[
                            FilterCondition("body", FilterOperator.CONTAINS, "emergency"),
                            FilterCondition("body", FilterOperator.CONTAINS, "urgent")
                        ],
                        logic_operator="OR"
                    )
                ],
                use_cases=[
                    "系统告警处理",
                    "紧急事件响应",
                    "VIP客户服务",
                    "实时监控报警"
                ],
                benefits=[
                    "确保重要消息优先处理",
                    "提高系统响应速度",
                    "降低业务风险",
                    "改善服务质量"
                ]
            ),

            # Classifying messages by content features
            FilterScenario(
                scenario_type=FilterScenarioType.CONTENT_CLASSIFICATION,
                name="内容分类过滤",
                description="根据内容特征对消息进行分类过滤",
                filter_rules=[
                    FilterRule(
                        rule_id="spam_filter",
                        name="垃圾消息过滤",
                        filter_type=FilterType.CONTENT,
                        conditions=[
                            FilterCondition("body", FilterOperator.CONTAINS, "spam"),
                            FilterCondition("body", FilterOperator.CONTAINS, "advertisement")
                        ],
                        logic_operator="OR"
                    ),
                    FilterRule(
                        rule_id="sensitive_content_filter",
                        name="敏感内容过滤",
                        filter_type=FilterType.CONTENT,
                        conditions=[
                            FilterCondition("body", FilterOperator.REGEX, r"\b(password|secret|confidential)\b")
                        ]
                    )
                ],
                use_cases=[
                    "内容审核系统",
                    "垃圾邮件过滤",
                    "敏感信息检测",
                    "内容推荐引擎"
                ],
                benefits=[
                    "提高内容质量",
                    "保护用户隐私",
                    "降低安全风险",
                    "优化用户体验"
                ]
            ),

            # Filtering by geographic information
            FilterScenario(
                scenario_type=FilterScenarioType.GEOGRAPHIC_FILTERING,
                name="地理位置过滤",
                description="根据地理位置信息过滤消息",
                filter_rules=[
                    FilterRule(
                        rule_id="city_filter",
                        name="城市过滤",
                        filter_type=FilterType.PROPERTY,
                        conditions=[
                            FilterCondition("properties.city", FilterOperator.IN, ["beijing", "shanghai", "guangzhou"])
                        ]
                    ),
                    FilterRule(
                        rule_id="timezone_filter",
                        name="时区过滤",
                        filter_type=FilterType.PROPERTY,
                        conditions=[
                            FilterCondition("properties.timezone", FilterOperator.EQUALS, "Asia/Shanghai")
                        ]
                    )
                ],
                use_cases=[
                    "本地化服务推送",
                    "区域性促销活动",
                    "天气预报服务",
                    "交通信息推送"
                ],
                benefits=[
                    "提供精准服务",
                    "降低无效推送",
                    "提高转化率",
                    "优化资源利用"
                ]
            ),
        ]
        return {entry.scenario_type: entry for entry in catalogue}

    def get_scenario(self, scenario_type: FilterScenarioType) -> FilterScenario:
        """Look up one scenario; ``None`` when the type is not registered."""
        catalogue = self.scenarios
        return catalogue.get(scenario_type)

    def list_scenarios(self) -> List[FilterScenario]:
        """Return a new list with every registered scenario."""
        return [entry for entry in self.scenarios.values()]

    def demonstrate_scenario(self, scenario_type: FilterScenarioType):
        """Print a scenario's description, use cases, benefits and rules.

        Prints a "not found" line instead when ``get_scenario`` returns a
        falsy value.
        """
        scenario = self.get_scenario(scenario_type)
        if scenario:
            print(f"\n=== {scenario.name} ===")
            print(f"描述: {scenario.description}")

            print(f"\n应用场景:")
            for position, use_case in enumerate(scenario.use_cases, start=1):
                print(f"  {position}. {use_case}")

            print(f"\n业务收益:")
            for position, benefit in enumerate(scenario.benefits, start=1):
                print(f"  {position}. {benefit}")

            print(f"\n过滤规则:")
            for position, rule in enumerate(scenario.filter_rules, start=1):
                print(f"  {position}. {rule.name}: {rule.description}")
        else:
            print(f"未找到场景: {scenario_type}")

# 具体场景实现示例
class UserSubscriptionFilter:
    """Per-user subscription filter.

    Stores one subscription record per user (subscribed categories plus a
    subscription level) and, each time a subscription is added, registers a
    matching ``FilterRule`` with an internal ``MessageFilterEngine``.
    """

    def __init__(self):
        self.filter_engine = MessageFilterEngine()
        # user_id -> {'categories': set, 'subscription_level': str, 'created_time': float}
        self.user_subscriptions = {}

    def add_user_subscription(self, user_id: str, categories: List[str],
                            subscription_level: str = "basic"):
        """Record a subscription for ``user_id`` and register its filter rule.

        An existing record for the same user is overwritten. The rule passed
        to the engine requires both the ``user_id`` property to equal
        ``user_id`` and the ``category`` property to be in ``categories``.
        """
        # Store the record first so it exists even if rule creation fails.
        self.user_subscriptions[user_id] = {
            'categories': set(categories),
            'subscription_level': subscription_level,
            'created_time': time.time()
        }

        # Rule dedicated to this user: user id AND category must both match.
        self.filter_engine.add_filter_rule(
            FilterRule(
                rule_id=f"user_{user_id}_filter",
                name=f"用户{user_id}订阅过滤",
                filter_type=FilterType.PROPERTY,
                conditions=[
                    FilterCondition("properties.user_id", FilterOperator.EQUALS, user_id),
                    FilterCondition("properties.category", FilterOperator.IN, categories)
                ],
                logic_operator="AND"
            )
        )
        print(f"已为用户 {user_id} 添加订阅过滤: {categories}")

    def filter_for_user(self, user_id: str, message: FilterableMessage) -> bool:
        """Decide whether ``message`` should be delivered to ``user_id``.

        Returns ``False`` when the user has no subscription, when the
        message's ``category`` property is not among the subscribed
        categories, or when the user's level ranks below the message's
        ``required_level`` (missing means ``'basic'``). Level names outside
        basic/premium/vip rank as 0, i.e. the same as ``'basic'``.
        """
        record = self.user_subscriptions.get(user_id)
        if not record:
            return False

        # Category gate.
        if message.properties.get('category') not in record['categories']:
            return False

        # Level gate: the user's rank must reach the message's required rank.
        ranks = {'basic': 0, 'premium': 1, 'vip': 2}
        required_rank = ranks.get(message.properties.get('required_level', 'basic'), 0)
        held_rank = ranks.get(record['subscription_level'], 0)
        return held_rank >= required_rank

    def get_user_messages(self, user_id: str, messages: List[FilterableMessage]) -> List[FilterableMessage]:
        """Return the messages that ``filter_for_user`` accepts, in input order."""
        return [candidate for candidate in messages
                if self.filter_for_user(user_id, candidate)]

class BusinessRoutingFilter:
    """Dispatches messages to named processor callables via prioritized rules."""

    def __init__(self):
        self.processors = {}     # processor name -> callable taking the message
        self.routing_rules = {}  # rule name -> dict(conditions, processor, priority, hit_count)

    def register_processor(self, processor_name: str, processor_func):
        """Store ``processor_func`` under ``processor_name``, replacing any previous one."""
        self.processors[processor_name] = processor_func
        print(f"已注册处理器: {processor_name}")

    def add_routing_rule(self, rule_name: str, conditions: List[FilterCondition],
                        processor_name: str, priority: int = 0):
        """Create or replace the rule ``rule_name`` with a zeroed hit counter."""
        rule_record = {
            'conditions': conditions,
            'processor': processor_name,
            'priority': priority,
            'hit_count': 0
        }
        self.routing_rules[rule_name] = rule_record
        print(f"已添加路由规则: {rule_name} -> {processor_name}")

    def route_message(self, message: FilterableMessage) -> Optional[str]:
        """Run the first applicable processor for ``message``.

        Rules are tried from highest to lowest priority. A matching rule
        always has its hit counter incremented; if its processor name is not
        registered, the search continues with the next rule.

        Returns:
            The name of the processor that was invoked, or ``None`` when no
            rule led to a registered processor.
        """
        # Highest priority first; ties keep insertion order (stable sort).
        by_priority = sorted(
            self.routing_rules.values(),
            key=lambda info: info['priority'],
            reverse=True
        )

        for info in by_priority:
            if not self._match_conditions(message, info['conditions']):
                continue

            info['hit_count'] += 1
            chosen_name = info['processor']

            try:
                handler = self.processors[chosen_name]
            except KeyError:
                # Rule matched but nothing is registered under that name.
                continue

            handler(message)
            return chosen_name

        return None

    def _match_conditions(self, message: FilterableMessage, conditions: List[FilterCondition]) -> bool:
        """Evaluate ``conditions`` against ``message`` through an "AND" ``PropertyFilter``."""
        return PropertyFilter(conditions, "AND").match(message)

    def get_routing_statistics(self) -> Dict:
        """Return ``{rule_name: {'processor', 'hit_count', 'priority'}}`` for every rule."""
        return {
            rule_name: {
                'processor': info['processor'],
                'hit_count': info['hit_count'],
                'priority': info['priority']
            }
            for rule_name, info in self.routing_rules.items()
        }

# 使用示例
if __name__ == "__main__":
    # ---- Part 1: print every predefined filtering scenario ----
    scenarios = MessageFilterScenarios()

    print("=== 消息过滤应用场景演示 ===")

    for scenario_type in FilterScenarioType:
        scenarios.demonstrate_scenario(scenario_type)

    # ---- Part 2: per-user subscription filtering ----
    print("\n\n=== 用户订阅过滤示例 ===")

    user_filter = UserSubscriptionFilter()

    user_filter.add_user_subscription(
        user_id="user_001",
        categories=["electronics", "books"],
        subscription_level="premium",
    )
    user_filter.add_user_subscription(
        user_id="user_002",
        categories=["clothing", "sports"],
        subscription_level="basic",
    )

    # (message_id, body, properties) per promotion; topic and tag are shared.
    promotion_specs = [
        ("msg_001", "电子产品促销活动",
         {"user_id": "user_001", "category": "electronics", "required_level": "basic"}),
        ("msg_002", "VIP专享图书优惠",
         {"user_id": "user_001", "category": "books", "required_level": "vip"}),
        ("msg_003", "运动装备特价",
         {"user_id": "user_002", "category": "sports", "required_level": "basic"}),
    ]
    test_messages = [
        FilterableMessage(
            message_id=spec_id,
            topic="product_topic",
            tag="promotion",
            body=spec_body,
            properties=spec_properties,
            created_time=time.time(),
        )
        for spec_id, spec_body, spec_properties in promotion_specs
    ]

    # Show what each user receives after filtering.
    for user_id in ("user_001", "user_002"):
        user_messages = user_filter.get_user_messages(user_id, test_messages)
        print(f"\n用户 {user_id} 的消息:")
        for msg in user_messages:
            print(f"  - {msg.message_id}: {msg.body}")

    # ---- Part 3: business routing ----
    print("\n\n=== 业务路由过滤示例 ===")

    routing_filter = BusinessRoutingFilter()

    def urgent_processor(message):
        """Demo processor for urgent orders."""
        print(f"  🚨 紧急处理器处理: {message.message_id}")

    def high_value_processor(message):
        """Demo processor for high-value orders."""
        print(f"  💰 高价值处理器处理: {message.message_id}")

    def normal_processor(message):
        """Demo processor for ordinary orders."""
        print(f"  📝 普通处理器处理: {message.message_id}")

    for processor_key, processor_fn in (
        ("urgent", urgent_processor),
        ("high_value", high_value_processor),
        ("normal", normal_processor),
    ):
        routing_filter.register_processor(processor_key, processor_fn)

    # Rules are listed from highest to lowest priority.
    routing_filter.add_routing_rule(
        rule_name="urgent_orders",
        conditions=[FilterCondition("properties.order_type", FilterOperator.EQUALS, "urgent")],
        processor_name="urgent",
        priority=3,
    )

    routing_filter.add_routing_rule(
        rule_name="high_value_orders",
        conditions=[FilterCondition("properties.amount", FilterOperator.GREATER_THAN, 10000)],
        processor_name="high_value",
        priority=2,
    )

    routing_filter.add_routing_rule(
        rule_name="normal_orders",
        conditions=[FilterCondition("tag", FilterOperator.EQUALS, "order")],
        processor_name="normal",
        priority=1,
    )

    # (message_id, body, properties) per order; topic and tag are shared.
    order_specs = [
        ("order_001", "紧急订单", {"order_type": "urgent", "amount": "5000"}),
        ("order_002", "大额订单", {"order_type": "normal", "amount": "15000"}),
        ("order_003", "普通订单", {"order_type": "normal", "amount": "500"}),
    ]
    routing_messages = [
        FilterableMessage(
            message_id=spec_id,
            topic="order_topic",
            tag="order",
            body=spec_body,
            properties=spec_properties,
            created_time=time.time(),
        )
        for spec_id, spec_body, spec_properties in order_specs
    ]

    print("\n路由结果:")
    for message in routing_messages:
        # Processors print their own line; only report unrouted messages here.
        if not routing_filter.route_message(message):
            print(f"  ❌ 消息 {message.message_id} 未找到匹配的处理器")

    # Per-rule hit counters.
    stats = routing_filter.get_routing_statistics()
    print("\n路由统计:")
    for rule_name, stat in stats.items():
        print(f"  {rule_name}: {stat['hit_count']} 次命中 -> {stat['processor']}")

7.6 消息路由

7.6.1 消息路由实现

from dataclasses import dataclass
from typing import List, Dict, Optional, Callable, Any, Set, Tuple
from enum import Enum
import json
import time
import threading
import hashlib
from abc import ABC, abstractmethod
from collections import defaultdict

class RouteType(Enum):
    """Kinds of routing a rule can be labelled with."""
    # Direct routing
    DIRECT = "direct"
    # Topic-based routing
    TOPIC = "topic"
    # Broadcast (fan-out) routing
    FANOUT = "fanout"
    # Header-based routing
    HEADER = "header"
    # Consistent-hash routing
    CONSISTENT_HASH = "consistent_hash"
    # Weight-based routing
    WEIGHTED = "weighted"
    # Condition-based routing
    CONDITIONAL = "conditional"

class RouteStrategy(Enum):
    """Strategies for picking one target out of a rule's target list."""
    # Round robin
    ROUND_ROBIN = "round_robin"
    # Random choice
    RANDOM = "random"
    # Fewest current connections
    LEAST_CONNECTIONS = "least_connections"
    # Weighted round robin
    WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
    # Hash of a routing key
    HASH = "hash"
    # Highest priority
    PRIORITY = "priority"

@dataclass
class RouteTarget:
    """A destination that messages can be routed to."""
    target_id: str                          # identifier used in statistics and release_connection
    name: str                               # display name
    endpoint: str                           # address of the destination
    weight: int = 1                         # share used by weighted selection
    priority: int = 0                       # larger value wins in priority selection
    max_connections: int = 100              # capacity limit
    current_connections: int = 0            # connections currently counted against the target
    enabled: bool = True                    # disabled targets are never selected
    health_check_url: Optional[str] = None  # optional health-check address
    metadata: Dict[str, Any] = None         # None is replaced by a fresh dict after init

    def __post_init__(self):
        # Give each instance its own dict rather than sharing a mutable default.
        if self.metadata is not None:
            return
        self.metadata = {}

@dataclass
class RouteRule:
    """A routing rule: match conditions plus the targets to choose from."""
    rule_id: str                       # unique key of the rule
    name: str                          # display name
    route_type: RouteType              # descriptive label; not read by matches()
    strategy: RouteStrategy            # how one target is picked from ``targets``
    conditions: List[FilterCondition]  # all must hold; empty means "match everything"
    targets: List[RouteTarget]         # candidate destinations
    enabled: bool = True               # disabled rules never match
    priority: int = 0                  # ordering hint for callers
    description: str = ""              # free-form description

    def matches(self, message: FilterableMessage) -> bool:
        """Tell whether ``message`` satisfies this rule.

        A disabled rule never matches, an enabled rule without conditions
        always matches, and otherwise the conditions are evaluated through an
        "AND" ``PropertyFilter``.
        """
        if self.enabled and self.conditions:
            return PropertyFilter(self.conditions, "AND").match(message)

        # Either disabled (False) or enabled with no conditions (True).
        return bool(self.enabled)

@dataclass
class RoutingResult:
    """Outcome of one routing attempt."""
    message_id: str                      # id of the routed message
    rule_id: str                         # rule that handled it
    target_id: str                       # selected target
    route_time: float                    # elapsed time reported by the caller
    success: bool                        # whether a target was selected
    error_message: Optional[str] = None  # failure reason, if any

class MessageRouter(ABC):
    """Abstract base for target-selection strategies."""

    @abstractmethod
    def route(self, message: FilterableMessage, targets: List[RouteTarget]) -> Optional[RouteTarget]:
        """Pick one of ``targets`` for ``message``; subclasses must override.

        The base implementation selects nothing and returns ``None``.
        """
        return None

class RoundRobinRouter(MessageRouter):
    """Router that cycles through the enabled targets in order."""

    def __init__(self):
        self._lock = threading.Lock()
        self.current_index = 0  # count of selections made so far

    def route(self, message: FilterableMessage, targets: List[RouteTarget]) -> Optional[RouteTarget]:
        """Return the next enabled target in rotation, or ``None`` if there is none.

        The rotation counter is shared across all calls on this router, and
        it indexes into the enabled subset of whatever list is passed in.
        """
        if not targets:
            return None

        # Only enabled targets take part in the rotation.
        candidates = [candidate for candidate in targets if candidate.enabled]
        if not candidates:
            return None

        with self._lock:
            slot = self.current_index % len(candidates)
            self.current_index += 1
            return candidates[slot]

class WeightedRouter(MessageRouter):
    """Router that selects targets in proportion to their ``weight``."""

    def __init__(self):
        self._lock = threading.Lock()
        self.current_weights = {}  # target_id -> running weight carried between calls

    def route(self, message: FilterableMessage, targets: List[RouteTarget]) -> Optional[RouteTarget]:
        """Return the enabled target with the largest running weight.

        On each call every enabled target's running weight grows by its own
        ``weight``; the winner then has the sum of all enabled weights
        subtracted. If that sum is zero the first enabled target is returned.
        ``None`` is returned when no target is enabled.
        """
        if not targets:
            return None

        candidates = [candidate for candidate in targets if candidate.enabled]
        if not candidates:
            return None

        with self._lock:
            running = self.current_weights

            # Targets seen for the first time start from zero.
            for candidate in candidates:
                running.setdefault(candidate.target_id, 0)

            weight_sum = sum(candidate.weight for candidate in candidates)
            if weight_sum == 0:
                return candidates[0]

            # Everyone earns their own weight this round.
            for candidate in candidates:
                running[candidate.target_id] += candidate.weight

            # Highest running weight wins.
            winner = max(candidates, key=lambda candidate: running[candidate.target_id])

            # The winner pays back the whole round.
            running[winner.target_id] -= weight_sum

            return winner

class HashRouter(MessageRouter):
    """Router that maps a message's hash key onto one of the enabled targets."""

    def __init__(self, hash_key_func: Callable[[FilterableMessage], str] = None):
        # A missing (or falsy) key function selects the built-in default.
        if hash_key_func:
            self.hash_key_func = hash_key_func
        else:
            self.hash_key_func = self._default_hash_key

    def _default_hash_key(self, message: FilterableMessage) -> str:
        """Use the ``routing_key`` property, or the message id when it is absent."""
        return message.properties.get('routing_key', message.message_id)

    def route(self, message: FilterableMessage, targets: List[RouteTarget]) -> Optional[RouteTarget]:
        """Return the enabled target at index ``md5(key) mod len(enabled targets)``.

        Returns ``None`` when there is no enabled target. The chosen index
        depends on how many targets are currently enabled.
        """
        if not targets:
            return None

        candidates = [candidate for candidate in targets if candidate.enabled]
        if not candidates:
            return None

        key_text = self.hash_key_func(message)
        digest_hex = hashlib.md5(key_text.encode()).hexdigest()
        return candidates[int(digest_hex, 16) % len(candidates)]

class PriorityRouter(MessageRouter):
    """Router that always picks the enabled target with the highest priority."""

    def route(self, message: FilterableMessage, targets: List[RouteTarget]) -> Optional[RouteTarget]:
        """Return the enabled target with the largest ``priority``.

        Among targets with equal priority the one listed first wins.
        Returns ``None`` when there is no enabled target.
        """
        if not targets:
            return None

        candidates = [candidate for candidate in targets if candidate.enabled]
        if not candidates:
            return None

        # max() keeps the first maximal element, the same element a stable
        # descending sort would place first.
        return max(candidates, key=lambda candidate: candidate.priority)

class LeastConnectionsRouter(MessageRouter):
    """Router that prefers the target with the fewest current connections."""

    def route(self, message: FilterableMessage, targets: List[RouteTarget]) -> Optional[RouteTarget]:
        """Return the least-loaded target that still has spare capacity.

        A target qualifies when it is enabled and ``current_connections`` is
        below ``max_connections``. Returns ``None`` when none qualifies.
        """
        if not targets:
            return None

        with_capacity = [
            candidate for candidate in targets
            if candidate.enabled and candidate.current_connections < candidate.max_connections
        ]
        if not with_capacity:
            return None

        # Ties go to the target listed first.
        return min(with_capacity, key=lambda candidate: candidate.current_connections)

class MessageRoutingEngine:
    """Rule-based message routing engine.

    Holds ``RouteRule`` objects keyed by ``rule_id`` and one router instance
    per supported ``RouteStrategy``. ``route_message`` tries the rules from
    highest to lowest priority and records every outcome in
    ``routing_history`` and ``statistics``.

    All reads and writes of ``rules``, ``routing_history``, ``statistics``
    and target connection counters go through ``self._lock``.

    Note: ``routing_history`` is only ever appended to, so it grows with
    every routed message.
    """

    def __init__(self):
        self.rules: Dict[str, RouteRule] = {}
        # RouteStrategy.RANDOM has no router here; rules using it are skipped
        # by route_message.
        self.routers: Dict[RouteStrategy, MessageRouter] = {
            RouteStrategy.ROUND_ROBIN: RoundRobinRouter(),
            RouteStrategy.WEIGHTED_ROUND_ROBIN: WeightedRouter(),
            RouteStrategy.HASH: HashRouter(),
            RouteStrategy.PRIORITY: PriorityRouter(),
            RouteStrategy.LEAST_CONNECTIONS: LeastConnectionsRouter()
        }
        self.routing_history: List[RoutingResult] = []
        self.statistics = defaultdict(int)
        # Non-reentrant: helpers that take it must not be called while it is held.
        self._lock = threading.Lock()

    def add_route_rule(self, rule: RouteRule) -> bool:
        """Register ``rule`` under ``rule.rule_id``, replacing any rule with that id.

        Returns:
            ``True`` on success, ``False`` if storing the rule raised (the
            error is printed).
        """
        try:
            with self._lock:
                self.rules[rule.rule_id] = rule
                print(f"已添加路由规则: {rule.name}")
                return True
        except Exception as e:
            print(f"添加路由规则失败: {e}")
            return False

    def remove_route_rule(self, rule_id: str) -> bool:
        """Delete the rule ``rule_id``; return whether it existed."""
        with self._lock:
            if rule_id in self.rules:
                del self.rules[rule_id]
                print(f"已移除路由规则: {rule_id}")
                return True
            return False

    def _finish(self, result: RoutingResult, *counters: str,
                target: Optional[RouteTarget] = None) -> RoutingResult:
        """Record ``result`` and bump ``counters`` under the lock.

        When ``target`` is given, its connection counter is incremented in
        the same critical section.
        """
        with self._lock:
            if target is not None:
                target.current_connections += 1
            self.routing_history.append(result)
            for counter in counters:
                self.statistics[counter] += 1
        return result

    def route_message(self, message: FilterableMessage) -> Optional[RoutingResult]:
        """Route ``message`` through the first rule that yields a target.

        Rules are tried from highest to lowest priority. A matching rule is
        skipped when no router is registered for its strategy or when the
        router selects no target. On success the target's
        ``current_connections`` is incremented; callers are expected to call
        ``release_connection`` afterwards.

        Returns:
            A ``RoutingResult``: successful when a target was selected;
            otherwise failed, carrying either the text of an exception
            raised while routing or ``"No matching route rule found"``.
        """
        start_time = time.time()

        # Snapshot the rule order under the lock; matching runs outside it.
        with self._lock:
            sorted_rules = sorted(
                self.rules.items(),
                key=lambda item: item[1].priority,
                reverse=True
            )

        for rule_id, rule in sorted_rules:
            if not rule.matches(message):
                continue

            try:
                router = self.routers.get(rule.strategy)
                if not router:
                    continue

                target = router.route(message, rule.targets)
                if not target:
                    continue

                result = RoutingResult(
                    message_id=message.message_id,
                    rule_id=rule_id,
                    target_id=target.target_id,
                    route_time=time.time() - start_time,
                    success=True
                )
                return self._finish(
                    result,
                    f"rule_{rule_id}_hits",
                    f"target_{target.target_id}_hits",
                    target=target
                )

            except Exception as e:
                # The first failing rule ends routing for this message.
                result = RoutingResult(
                    message_id=message.message_id,
                    rule_id=rule_id,
                    target_id="",
                    route_time=time.time() - start_time,
                    success=False,
                    error_message=str(e)
                )
                return self._finish(result, "routing_failures")

        # No rule produced a target.
        result = RoutingResult(
            message_id=message.message_id,
            rule_id="",
            target_id="",
            route_time=time.time() - start_time,
            success=False,
            error_message="No matching route rule found"
        )
        return self._finish(result, "no_route_found")

    def release_connection(self, target_id: str):
        """Release one connection on the target(s) identified by ``target_id``.

        For each rule, the first target with that id and a positive counter
        is decremented. A ``RouteTarget`` object shared by several rules is
        decremented only once per call.
        """
        with self._lock:
            released = set()  # id() of target objects already decremented
            for rule in self.rules.values():
                for target in rule.targets:
                    if target.target_id == target_id and target.current_connections > 0:
                        if id(target) not in released:
                            target.current_connections -= 1
                            released.add(id(target))
                        break

    def get_routing_statistics(self) -> Dict:
        """Return the counters plus aggregate figures.

        Besides the raw counters the result holds ``total_routes`` and, when
        any routing was recorded, ``success_rate``. ``avg_route_time`` is
        present only when at least one routing succeeded.
        """
        # Copy under the lock, compute outside it.
        with self._lock:
            stats = dict(self.statistics)
            history = list(self.routing_history)

        stats['total_routes'] = len(history)

        if history:
            successful_routes = [r for r in history if r.success]
            stats['success_rate'] = len(successful_routes) / len(history)

            if successful_routes:
                avg_route_time = sum(r.route_time for r in successful_routes) / len(successful_routes)
                stats['avg_route_time'] = avg_route_time

        return stats

    def get_target_health(self) -> Dict[str, Dict]:
        """Return load information per target id across all rules.

        When several rules contain targets with the same id, the entry from
        the rule visited last wins. ``utilization`` is 0 when
        ``max_connections`` is not positive.
        """
        health_status = {}

        with self._lock:
            for rule in self.rules.values():
                for target in rule.targets:
                    health_status[target.target_id] = {
                        'name': target.name,
                        'enabled': target.enabled,
                        'current_connections': target.current_connections,
                        'max_connections': target.max_connections,
                        'utilization': target.current_connections / target.max_connections if target.max_connections > 0 else 0,
                        'weight': target.weight,
                        'priority': target.priority
                    }

        return health_status

    def list_route_rules(self) -> List[RouteRule]:
        """Return a new list holding all registered rules."""
        with self._lock:
            return list(self.rules.values())

# 使用示例
if __name__ == "__main__":
    # Demo: register four rules over a shared target list, route four
    # messages, then print statistics and target load.
    routing_engine = MessageRoutingEngine()

    print("=== 消息路由示例 ===")

    # Targets shared by every rule below.
    targets = [
        RouteTarget(
            target_id="processor_1",
            name="处理器1",
            endpoint="http://localhost:8001",
            weight=3,
            priority=1,
            max_connections=50
        ),
        RouteTarget(
            target_id="processor_2",
            name="处理器2",
            endpoint="http://localhost:8002",
            weight=2,
            priority=2,
            max_connections=30
        ),
        RouteTarget(
            target_id="processor_3",
            name="处理器3",
            endpoint="http://localhost:8003",
            weight=1,
            priority=3,
            max_connections=20
        )
    ]

    # 1. High-priority messages: pick the target with the highest priority.
    high_priority_rule = RouteRule(
        rule_id="high_priority_route",
        name="高优先级消息路由",
        route_type=RouteType.CONDITIONAL,
        strategy=RouteStrategy.PRIORITY,
        conditions=[
            FilterCondition("properties.priority", FilterOperator.EQUALS, "high")
        ],
        targets=targets,
        priority=3
    )
    routing_engine.add_route_rule(high_priority_rule)

    # 2. User messages: weighted round robin.
    user_weighted_rule = RouteRule(
        rule_id="user_weighted_route",
        name="用户消息加权路由",
        route_type=RouteType.WEIGHTED,
        strategy=RouteStrategy.WEIGHTED_ROUND_ROBIN,
        conditions=[
            FilterCondition("tag", FilterOperator.EQUALS, "user_message")
        ],
        targets=targets,
        priority=2
    )
    routing_engine.add_route_rule(user_weighted_rule)

    # 3. Order messages: hash of the routing key.
    order_hash_rule = RouteRule(
        rule_id="order_hash_route",
        name="订单消息哈希路由",
        route_type=RouteType.CONSISTENT_HASH,
        strategy=RouteStrategy.HASH,
        conditions=[
            FilterCondition("tag", FilterOperator.EQUALS, "order")
        ],
        targets=targets,
        priority=1
    )
    routing_engine.add_route_rule(order_hash_rule)

    # 4. Fallback: round robin for everything else.
    default_rule = RouteRule(
        rule_id="default_route",
        name="默认轮询路由",
        route_type=RouteType.DIRECT,
        strategy=RouteStrategy.ROUND_ROBIN,
        conditions=[],  # no conditions: matches every message
        targets=targets,
        priority=0
    )
    routing_engine.add_route_rule(default_rule)

    # One message per rule.
    test_messages = [
        FilterableMessage(
            message_id="msg_001",
            topic="system_topic",
            tag="alert",
            body="高优先级系统告警",
            properties={"priority": "high", "routing_key": "system_alert"},
            created_time=time.time()
        ),
        FilterableMessage(
            message_id="msg_002",
            topic="user_topic",
            tag="user_message",
            body="用户消息",
            properties={"user_id": "123", "routing_key": "user_123"},
            created_time=time.time()
        ),
        FilterableMessage(
            message_id="msg_003",
            topic="order_topic",
            tag="order",
            body="订单消息",
            properties={"order_id": "ORDER_001", "routing_key": "ORDER_001"},
            created_time=time.time()
        ),
        FilterableMessage(
            message_id="msg_004",
            topic="notification_topic",
            tag="notification",
            body="通知消息",
            properties={"type": "email"},
            created_time=time.time()
        )
    ]

    # Route each message and report the outcome.
    print("\n路由结果:")
    for message in test_messages:
        result = routing_engine.route_message(message)
        if result and result.success:
            print(f"  ✅ {message.message_id} -> {result.target_id} (规则: {result.rule_id}, 耗时: {result.route_time:.3f}s)")
        else:
            error_msg = result.error_message if result else "未知错误"
            print(f"  ❌ {message.message_id} 路由失败: {error_msg}")

    # Aggregate statistics.
    stats = routing_engine.get_routing_statistics()
    print(f"\n路由统计:")
    print(f"  总路由数: {stats.get('total_routes', 0)}")
    print(f"  成功率: {stats.get('success_rate', 0):.2%}")
    print(f"  平均路由时间: {stats.get('avg_route_time', 0):.3f}s")

    # Per-target load.
    health = routing_engine.get_target_health()
    print(f"\n目标健康状态:")
    for target_id, status in health.items():
        print(f"  {status['name']}: 连接数 {status['current_connections']}/{status['max_connections']} "
              f"(利用率: {status['utilization']:.1%}, 权重: {status['weight']}, 优先级: {status['priority']})")

    # Release one connection per target. This loop must sit at the same
    # indentation as the rest of the block; the extra leading space it had
    # before made the whole file fail to parse.
    for target in targets:
        routing_engine.release_connection(target.target_id)

7.6.2 消息路由应用场景

from dataclasses import dataclass
from typing import List, Dict, Optional, Any, Set
from enum import Enum
import json
import time
from collections import defaultdict

class RoutingScenarioType(Enum):
    """Identifiers of the routing scenarios described in this section."""
    # Microservice routing
    MICROSERVICE_ROUTING = "microservice_routing"
    # Load balancing
    LOAD_BALANCING = "load_balancing"
    # Geographic routing
    GEOGRAPHIC_ROUTING = "geographic_routing"
    # Canary deployment
    CANARY_DEPLOYMENT = "canary_deployment"
    # Failover routing
    FAILOVER_ROUTING = "failover_routing"
    # Content-based routing
    CONTENT_BASED_ROUTING = "content_based_routing"

@dataclass
class RoutingScenario:
    """Description of one routing scenario together with its example rules."""
    scenario_type: RoutingScenarioType  # which scenario this record describes
    name: str                           # display name
    description: str                    # short explanation
    route_rules: List[RouteRule]        # rules illustrating the scenario
    use_cases: List[str]                # typical situations where it applies
    benefits: List[str]                 # advantages
    challenges: List[str]               # difficulties to be aware of

class MessageRoutingScenarios:
    """Catalogue of example message-routing scenarios.

    Constructing an instance builds one ``RoutingScenario`` per
    ``RoutingScenarioType`` and stores them in ``self.scenarios``, keyed by
    scenario type.
    """

    def __init__(self):
        self.scenarios = self._create_scenarios()

    def _create_scenarios(self) -> Dict[RoutingScenarioType, RoutingScenario]:
        """Build every scenario and index each one by its ``scenario_type``.

        Returns:
            A dict whose iteration order follows the order of ``builders``.
        """
        builders = (
            self._microservice_scenario,
            self._load_balancing_scenario,
            self._geographic_scenario,
            self._canary_scenario,
            self._failover_scenario,
            self._content_based_scenario,
        )
        scenarios = (build() for build in builders)
        # Keying by the scenario's own type keeps key and value from drifting apart.
        return {scenario.scenario_type: scenario for scenario in scenarios}

    @staticmethod
    def _microservice_scenario() -> RoutingScenario:
        """Scenario: pick a microservice based on ``properties.service_type``."""
        user_service = RouteTarget("user_service", "用户服务", "http://user-service:8080", weight=2, priority=1)
        order_service = RouteTarget("order_service", "订单服务", "http://order-service:8080", weight=3, priority=1)
        payment_service = RouteTarget("payment_service", "支付服务", "http://payment-service:8080", weight=1, priority=2)

        return RoutingScenario(
            scenario_type=RoutingScenarioType.MICROSERVICE_ROUTING,
            name="微服务路由",
            description="根据消息类型将请求路由到不同的微服务",
            route_rules=[
                RouteRule(
                    rule_id="user_service_route",
                    name="用户服务路由",
                    route_type=RouteType.CONDITIONAL,
                    strategy=RouteStrategy.ROUND_ROBIN,
                    conditions=[
                        FilterCondition("properties.service_type", FilterOperator.EQUALS, "user")
                    ],
                    targets=[user_service],
                    priority=3
                ),
                RouteRule(
                    rule_id="order_service_route",
                    name="订单服务路由",
                    route_type=RouteType.CONDITIONAL,
                    strategy=RouteStrategy.HASH,
                    conditions=[
                        FilterCondition("properties.service_type", FilterOperator.EQUALS, "order")
                    ],
                    targets=[order_service],
                    priority=3
                ),
                RouteRule(
                    rule_id="payment_service_route",
                    name="支付服务路由",
                    route_type=RouteType.CONDITIONAL,
                    strategy=RouteStrategy.PRIORITY,
                    conditions=[
                        FilterCondition("properties.service_type", FilterOperator.EQUALS, "payment")
                    ],
                    targets=[payment_service],
                    priority=3
                )
            ],
            use_cases=[
                "分布式系统架构",
                "服务解耦",
                "API网关路由",
                "事件驱动架构"
            ],
            benefits=[
                "提高系统可扩展性",
                "实现服务隔离",
                "支持独立部署",
                "便于维护和升级"
            ],
            challenges=[
                "服务发现复杂性",
                "网络延迟增加",
                "分布式事务处理",
                "监控和调试困难"
            ]
        )

    @staticmethod
    def _load_balancing_scenario() -> RoutingScenario:
        """Scenario: spread load over several instances of one service."""
        # Both rules deliberately share the same target objects.
        load_balance_targets = [
            RouteTarget("server_1", "服务器1", "http://server1:8080", weight=3, max_connections=100),
            RouteTarget("server_2", "服务器2", "http://server2:8080", weight=2, max_connections=80),
            RouteTarget("server_3", "服务器3", "http://server3:8080", weight=1, max_connections=60)
        ]

        return RoutingScenario(
            scenario_type=RoutingScenarioType.LOAD_BALANCING,
            name="负载均衡路由",
            description="在多个服务实例之间分配负载",
            route_rules=[
                RouteRule(
                    rule_id="weighted_load_balance",
                    name="加权负载均衡",
                    route_type=RouteType.WEIGHTED,
                    strategy=RouteStrategy.WEIGHTED_ROUND_ROBIN,
                    conditions=[],
                    targets=load_balance_targets,
                    priority=1
                ),
                RouteRule(
                    rule_id="least_connections_balance",
                    name="最少连接负载均衡",
                    route_type=RouteType.CONDITIONAL,
                    strategy=RouteStrategy.LEAST_CONNECTIONS,
                    conditions=[
                        FilterCondition("properties.balance_type", FilterOperator.EQUALS, "least_conn")
                    ],
                    targets=load_balance_targets,
                    priority=2
                )
            ],
            use_cases=[
                "高并发Web应用",
                "API服务集群",
                "数据库连接池",
                "缓存服务集群"
            ],
            benefits=[
                "提高系统吞吐量",
                "避免单点故障",
                "优化资源利用",
                "提升用户体验"
            ],
            challenges=[
                "会话保持问题",
                "健康检查机制",
                "动态权重调整",
                "故障检测和恢复"
            ]
        )

    @staticmethod
    def _geographic_scenario() -> RoutingScenario:
        """Scenario: pick a data centre based on ``properties.region``."""
        beijing_dc = RouteTarget("beijing_dc", "北京数据中心", "http://beijing.example.com", priority=1)
        shanghai_dc = RouteTarget("shanghai_dc", "上海数据中心", "http://shanghai.example.com", priority=2)
        guangzhou_dc = RouteTarget("guangzhou_dc", "广州数据中心", "http://guangzhou.example.com", priority=3)

        return RoutingScenario(
            scenario_type=RoutingScenarioType.GEOGRAPHIC_ROUTING,
            name="地理位置路由",
            description="根据用户地理位置路由到最近的数据中心",
            route_rules=[
                RouteRule(
                    rule_id="beijing_region_route",
                    name="北京地区路由",
                    route_type=RouteType.CONDITIONAL,
                    strategy=RouteStrategy.PRIORITY,
                    conditions=[
                        FilterCondition("properties.region", FilterOperator.IN, ["beijing", "tianjin", "hebei"])
                    ],
                    targets=[beijing_dc],
                    priority=3
                ),
                RouteRule(
                    rule_id="shanghai_region_route",
                    name="上海地区路由",
                    route_type=RouteType.CONDITIONAL,
                    strategy=RouteStrategy.PRIORITY,
                    conditions=[
                        FilterCondition("properties.region", FilterOperator.IN, ["shanghai", "jiangsu", "zhejiang"])
                    ],
                    targets=[shanghai_dc],
                    priority=3
                ),
                RouteRule(
                    rule_id="guangzhou_region_route",
                    name="广州地区路由",
                    route_type=RouteType.CONDITIONAL,
                    strategy=RouteStrategy.PRIORITY,
                    conditions=[
                        FilterCondition("properties.region", FilterOperator.IN, ["guangdong", "guangxi", "hainan"])
                    ],
                    targets=[guangzhou_dc],
                    priority=3
                )
            ],
            use_cases=[
                "全球化应用部署",
                "CDN内容分发",
                "多地域容灾",
                "就近服务访问"
            ],
            benefits=[
                "降低网络延迟",
                "提高访问速度",
                "符合数据合规要求",
                "优化用户体验"
            ],
            challenges=[
                "地理位置识别准确性",
                "跨地域数据同步",
                "网络质量差异",
                "法律法规限制"
            ]
        )

    @staticmethod
    def _canary_scenario() -> RoutingScenario:
        """Scenario: send a small share of traffic to a new version."""
        stable_version = RouteTarget("stable_version", "稳定版本", "http://stable.example.com", weight=9)
        canary_version = RouteTarget("canary_version", "金丝雀版本", "http://canary.example.com", weight=1)
        canary_targets = [stable_version, canary_version]

        return RoutingScenario(
            scenario_type=RoutingScenarioType.CANARY_DEPLOYMENT,
            name="金丝雀部署路由",
            description="将少量流量路由到新版本进行测试",
            route_rules=[
                RouteRule(
                    rule_id="canary_deployment_route",
                    name="金丝雀部署路由",
                    route_type=RouteType.WEIGHTED,
                    strategy=RouteStrategy.WEIGHTED_ROUND_ROBIN,
                    conditions=[],
                    targets=canary_targets,
                    priority=1
                ),
                RouteRule(
                    rule_id="beta_user_route",
                    name="Beta用户路由",
                    route_type=RouteType.CONDITIONAL,
                    strategy=RouteStrategy.PRIORITY,
                    conditions=[
                        FilterCondition("properties.user_type", FilterOperator.EQUALS, "beta")
                    ],
                    targets=[canary_version],  # beta users go to the canary version only
                    priority=2
                )
            ],
            use_cases=[
                "新功能灰度发布",
                "A/B测试",
                "风险控制部署",
                "性能验证"
            ],
            benefits=[
                "降低发布风险",
                "快速问题发现",
                "用户体验保障",
                "渐进式升级"
            ],
            challenges=[
                "流量分配策略",
                "版本兼容性",
                "监控指标设计",
                "回滚机制"
            ]
        )

    @staticmethod
    def _failover_scenario() -> RoutingScenario:
        """Scenario: primary target with a lower-priority backup target."""
        return RoutingScenario(
            scenario_type=RoutingScenarioType.FAILOVER_ROUTING,
            name="故障转移路由",
            description="当主服务不可用时自动切换到备用服务",
            route_rules=[
                RouteRule(
                    rule_id="primary_service_route",
                    name="主服务路由",
                    route_type=RouteType.CONDITIONAL,
                    strategy=RouteStrategy.PRIORITY,
                    conditions=[
                        FilterCondition("properties.failover_enabled", FilterOperator.EQUALS, "true")
                    ],
                    targets=[
                        RouteTarget("primary_service", "主服务", "http://primary.example.com", priority=1),
                        RouteTarget("backup_service", "备用服务", "http://backup.example.com", priority=2)
                    ],
                    priority=1
                )
            ],
            use_cases=[
                "高可用系统设计",
                "灾难恢复",
                "服务降级",
                "多活架构"
            ],
            benefits=[
                "提高系统可用性",
                "减少服务中断时间",
                "自动故障恢复",
                "业务连续性保障"
            ],
            challenges=[
                "故障检测机制",
                "数据一致性",
                "切换时间控制",
                "状态同步问题"
            ]
        )

    @staticmethod
    def _content_based_scenario() -> RoutingScenario:
        """Scenario: pick a processor based on ``properties.content_type``."""
        return RoutingScenario(
            scenario_type=RoutingScenarioType.CONTENT_BASED_ROUTING,
            name="基于内容的路由",
            description="根据消息内容特征进行智能路由",
            route_rules=[
                RouteRule(
                    rule_id="image_processing_route",
                    name="图片处理路由",
                    route_type=RouteType.CONDITIONAL,
                    strategy=RouteStrategy.ROUND_ROBIN,
                    conditions=[
                        FilterCondition("properties.content_type", FilterOperator.EQUALS, "image")
                    ],
                    targets=[
                        RouteTarget("image_processor", "图片处理器", "http://image-processor:8080")
                    ],
                    priority=3
                ),
                RouteRule(
                    rule_id="text_processing_route",
                    name="文本处理路由",
                    route_type=RouteType.CONDITIONAL,
                    strategy=RouteStrategy.HASH,
                    conditions=[
                        FilterCondition("properties.content_type", FilterOperator.EQUALS, "text")
                    ],
                    targets=[
                        RouteTarget("text_processor", "文本处理器", "http://text-processor:8080")
                    ],
                    priority=3
                ),
                RouteRule(
                    rule_id="video_processing_route",
                    name="视频处理路由",
                    route_type=RouteType.CONDITIONAL,
                    strategy=RouteStrategy.LEAST_CONNECTIONS,
                    conditions=[
                        FilterCondition("properties.content_type", FilterOperator.EQUALS, "video")
                    ],
                    targets=[
                        RouteTarget("video_processor", "视频处理器", "http://video-processor:8080", max_connections=10)
                    ],
                    priority=3
                )
            ],
            use_cases=[
                "多媒体处理系统",
                "内容管理平台",
                "智能分析服务",
                "工作流引擎"
            ],
            benefits=[
                "专业化处理",
                "资源优化配置",
                "处理效率提升",
                "系统模块化"
            ],
            challenges=[
                "内容识别准确性",
                "处理能力匹配",
                "资源调度复杂性",
                "性能监控"
            ]
        )

    def get_scenario(self, scenario_type: RoutingScenarioType) -> Optional[RoutingScenario]:
        """Return the scenario registered for ``scenario_type``, or ``None`` if absent."""
        return self.scenarios.get(scenario_type)

    def list_scenarios(self) -> List[RoutingScenario]:
        """Return all scenarios as a new list, in registration order."""
        return list(self.scenarios.values())

    def demonstrate_scenario(self, scenario_type: RoutingScenarioType):
        """Print a human-readable overview of one scenario to stdout.

        Prints a "not found" line and returns when no scenario is registered
        for ``scenario_type``.
        """
        scenario = self.get_scenario(scenario_type)
        if not scenario:
            print(f"未找到场景: {scenario_type}")
            return

        print(f"\n=== {scenario.name} ===")
        print(f"描述: {scenario.description}")

        # Each section is a heading followed by a 1-based numbered list.
        sections = (
            ("\n应用场景:", scenario.use_cases),
            ("\n业务收益:", scenario.benefits),
            ("\n挑战与难点:", scenario.challenges),
        )
        for heading, entries in sections:
            print(heading)
            for i, entry in enumerate(entries, 1):
                print(f"  {i}. {entry}")

        print(f"\n路由规则数量: {len(scenario.route_rules)}")

# Advanced routing strategy implementations
class AdvancedRoutingStrategies:
    """Factory helpers for the advanced router implementations."""

    @staticmethod
    def create_circuit_breaker_router(failure_threshold: int = 5,
                                      recovery_timeout: int = 60) -> 'CircuitBreakerRouter':
        """Build a ``CircuitBreakerRouter`` with the given threshold and timeout."""
        router = CircuitBreakerRouter(
            failure_threshold=failure_threshold,
            recovery_timeout=recovery_timeout,
        )
        return router

    @staticmethod
    def create_adaptive_router(performance_window: int = 300) -> 'AdaptiveRouter':
        """Build an ``AdaptiveRouter`` with the given performance window."""
        router = AdaptiveRouter(performance_window=performance_window)
        return router

    @staticmethod
    def create_sticky_session_router(session_timeout: int = 1800) -> 'StickySessionRouter':
        """Build a ``StickySessionRouter`` with the given session timeout."""
        router = StickySessionRouter(session_timeout=session_timeout)
        return router

class CircuitBreakerRouter(MessageRouter):
    """Router that skips targets whose circuit is open.

    Per-target state is one of ``""`` (never recorded, treated like closed),
    ``"closed"``, ``"open"`` or ``"half_open"``. A circuit opens once the
    failure count reaches ``failure_threshold`` and becomes half-open on the
    first ``route`` call made more than ``recovery_timeout`` seconds after the
    last recorded failure.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_counts = defaultdict(int)
        self.last_failure_times = defaultdict(float)
        self.circuit_states = defaultdict(str)  # open, closed, half_open
        self._lock = threading.Lock()

    def _admits(self, target_id: str, now: float) -> bool:
        """Tell whether ``target_id`` may receive traffic at time ``now``.

        Moves an open circuit to half-open when the recovery timeout has
        elapsed. Must be called with ``self._lock`` held.
        """
        state = self.circuit_states[target_id]
        if state != "open":
            return state in ("closed", "half_open", "")

        cooled_down = now - self.last_failure_times[target_id] > self.recovery_timeout
        if cooled_down:
            self.circuit_states[target_id] = "half_open"
        return cooled_down

    def route(self, message: FilterableMessage, targets: List[RouteTarget]) -> Optional[RouteTarget]:
        """Return the first enabled target whose circuit admits traffic.

        Every target is examined (not just up to the first hit) so that each
        open circuit past its recovery timeout is moved to half-open.

        Returns:
            The first admitted target in input order, or ``None`` when
            ``targets`` is empty or nothing is admitted.
        """
        if not targets:
            return None

        now = time.time()
        with self._lock:
            admitted = [
                candidate for candidate in targets
                if candidate.enabled and self._admits(candidate.target_id, now)
            ]

        return admitted[0] if admitted else None

    def record_success(self, target_id: str):
        """Reset the failure count of ``target_id`` and close its circuit."""
        with self._lock:
            self.circuit_states[target_id] = "closed"
            self.failure_counts[target_id] = 0

    def record_failure(self, target_id: str):
        """Count one failure for ``target_id``; open its circuit at the threshold."""
        with self._lock:
            failures = self.failure_counts[target_id] + 1
            self.failure_counts[target_id] = failures
            self.last_failure_times[target_id] = time.time()

            if failures >= self.failure_threshold:
                self.circuit_states[target_id] = "open"

class AdaptiveRouter(MessageRouter):
    """Router that prefers the target with the best recently observed performance.

    Samples reported through ``record_performance`` are kept per target for
    ``performance_window`` seconds. Each enabled target is scored as
    ``success_rate / (avg_response_time + epsilon)`` and the highest score
    wins; ties go to the earliest target in the input list.
    """

    # Values assumed for a target that has no samples inside the window.
    _DEFAULT_RESPONSE_TIME = 1.0
    _DEFAULT_SUCCESS_RATE = 0.5
    # Added to the average response time so the division cannot hit zero.
    _SCORE_EPSILON = 0.001

    def __init__(self, performance_window: int = 300):
        self.performance_window = performance_window
        self.response_times = defaultdict(list)   # target_id -> [(timestamp, response_time)]
        self.success_rates = defaultdict(list)    # target_id -> [(timestamp, 1.0 or 0.0)]
        self._lock = threading.Lock()

    def route(self, message: FilterableMessage, targets: List[RouteTarget]) -> Optional[RouteTarget]:
        """Return the enabled target with the highest performance score.

        Returns:
            The best-scoring enabled target, or ``None`` when ``targets`` is
            empty or contains no enabled target.
        """
        if not targets:
            return None

        available_targets = [t for t in targets if t.enabled]
        if not available_targets:
            return None

        target_scores = {}
        current_time = time.time()

        with self._lock:
            for target in available_targets:
                target_id = target.target_id

                # Drop samples that fell out of the window before scoring.
                self._cleanup_old_data(target_id, current_time)

                avg_response_time = self._calculate_avg_response_time(target_id)
                success_rate = self._calculate_success_rate(target_id)

                # Lower response time and higher success rate both raise the score.
                target_scores[target_id] = success_rate / (avg_response_time + self._SCORE_EPSILON)

        # Every available target has a score, so no fallback branch is needed;
        # max() returns the first of equally scored targets.
        return max(available_targets, key=lambda t: target_scores[t.target_id])

    def _cleanup_old_data(self, target_id: str, current_time: float):
        """Discard samples of ``target_id`` older than the performance window.

        Must be called with ``self._lock`` held.
        """
        cutoff_time = current_time - self.performance_window

        self.response_times[target_id] = [
            (timestamp, response_time) for timestamp, response_time in self.response_times[target_id]
            if timestamp > cutoff_time
        ]

        self.success_rates[target_id] = [
            (timestamp, success) for timestamp, success in self.success_rates[target_id]
            if timestamp > cutoff_time
        ]

    def _calculate_avg_response_time(self, target_id: str) -> float:
        """Return the mean stored response time, or the default when there are no samples."""
        response_times = [rt for _, rt in self.response_times[target_id]]
        if not response_times:
            return self._DEFAULT_RESPONSE_TIME
        return sum(response_times) / len(response_times)

    def _calculate_success_rate(self, target_id: str) -> float:
        """Return the fraction of stored samples that succeeded, or the default when there are none."""
        success_data = [success for _, success in self.success_rates[target_id]]
        if not success_data:
            return self._DEFAULT_SUCCESS_RATE
        return sum(success_data) / len(success_data)

    def record_performance(self, target_id: str, response_time: float, success: bool):
        """Store one sample (response time and outcome) for ``target_id``.

        Expired samples are pruned here as well, so the history of a target
        that is reported on but never passed to ``route`` stays bounded by
        the performance window.
        """
        current_time = time.time()
        with self._lock:
            self._cleanup_old_data(target_id, current_time)
            self.response_times[target_id].append((current_time, response_time))
            self.success_rates[target_id].append((current_time, 1.0 if success else 0.0))

class StickySessionRouter(MessageRouter):
    """Router that keeps sending a session to the target it was first given.

    The session is identified by ``message.properties['session_id']``. A
    mapping expires once it has not been used for more than
    ``session_timeout`` seconds.
    """

    def __init__(self, session_timeout: int = 1800):
        self.session_timeout = session_timeout
        self.session_mappings = {}  # session_id -> target_id
        self.session_timestamps = {}  # session_id -> timestamp
        self._lock = threading.Lock()

    def route(self, message: FilterableMessage, targets: List[RouteTarget]) -> Optional[RouteTarget]:
        """Return the target pinned to the message's session, pinning one if needed.

        Messages without a session id, and sessions without a usable pin, get
        the first enabled target in input order.

        Returns:
            The chosen target, or ``None`` when ``targets`` is empty or has no
            enabled entry.
        """
        if not targets:
            return None

        available_targets = [t for t in targets if t.enabled]
        if not available_targets:
            return None

        first_available = available_targets[0]

        session_id = message.properties.get('session_id')
        if not session_id:
            # No session to pin: just hand out the first enabled target.
            return first_available

        now = time.time()

        with self._lock:
            # Expire stale pins before consulting the table.
            self._cleanup_expired_sessions(now)

            if session_id in self.session_mappings:
                pinned_id = self.session_mappings[session_id]
                pinned = next((t for t in available_targets if t.target_id == pinned_id), None)
                if pinned:
                    # Using the pin refreshes its timestamp.
                    self.session_timestamps[session_id] = now
                    return pinned

                # The pinned target is no longer usable; forget the pin.
                del self.session_mappings[session_id]
                del self.session_timestamps[session_id]

            # Pin the session to the first enabled target.
            self.session_mappings[session_id] = first_available.target_id
            self.session_timestamps[session_id] = now

            return first_available

    def _cleanup_expired_sessions(self, current_time: float):
        """Remove every session idle for longer than ``session_timeout``.

        Must be called with ``self._lock`` held.
        """
        # Iterate over a snapshot because entries are deleted while scanning.
        for sid, last_seen in list(self.session_timestamps.items()):
            if current_time - last_seen > self.session_timeout:
                del self.session_mappings[sid]
                del self.session_timestamps[sid]

# Usage example: prints every scenario, then exercises the three advanced routers.
if __name__ == "__main__":
    # Create the scenario catalogue
    scenarios = MessageRoutingScenarios()
    
    print("=== 消息路由应用场景演示 ===")
    
    # Demonstrate every scenario
    for scenario_type in RoutingScenarioType:
        scenarios.demonstrate_scenario(scenario_type)
    
    print("\n\n=== 高级路由策略示例 ===")
    
    # Circuit-breaker router test
    print("\n1. 熔断器路由测试")
    circuit_breaker = CircuitBreakerRouter(failure_threshold=3, recovery_timeout=10)
    
    test_targets = [
        RouteTarget("service_1", "服务1", "http://service1:8080"),
        RouteTarget("service_2", "服务2", "http://service2:8080")
    ]
    
    test_message = FilterableMessage(
        message_id="test_001",
        topic="test_topic",
        tag="test",
        body="测试消息",
        properties={},
        created_time=time.time()
    )
    
    # Simulate routing with failures
    for i in range(5):
        target = circuit_breaker.route(test_message, test_targets)
        if target:
            print(f"  路由到: {target.name}")
            # Report a failure on the first 3 iterations; with failure_threshold=3
            # the third failure opens the circuit of the target being reported on.
            if i < 3:
                circuit_breaker.record_failure(target.target_id)
                print(f"    记录失败")
            else:
                circuit_breaker.record_success(target.target_id)
                print(f"    记录成功")
        else:
            print(f"  无可用目标")
    
    # Adaptive router test
    print("\n2. 自适应路由测试")
    adaptive_router = AdaptiveRouter(performance_window=60)
    
    # Feed in simulated performance samples (response time, success flag)
    adaptive_router.record_performance("service_1", 0.1, True)
    adaptive_router.record_performance("service_1", 0.15, True)
    adaptive_router.record_performance("service_2", 0.3, False)
    adaptive_router.record_performance("service_2", 0.25, True)
    
    target = adaptive_router.route(test_message, test_targets)
    if target:
        print(f"  自适应路由选择: {target.name}")
    
    # Sticky-session router test
    print("\n3. 会话保持路由测试")
    sticky_router = StickySessionRouter(session_timeout=300)
    
    session_message = FilterableMessage(
        message_id="session_001",
        topic="session_topic",
        tag="session",
        body="会话消息",
        properties={"session_id": "user_123_session"},
        created_time=time.time()
    )
    
    # Route the same session several times
    for i in range(3):
        target = sticky_router.route(session_message, test_targets)
        if target:
            print(f"  会话路由 {i+1}: {target.name}")

7.7 本章总结

7.7.1 核心知识点

from dataclasses import dataclass
from typing import List, Dict, Any
from enum import Enum

class FeatureCategory(Enum):
    """Categories of advanced features covered by the chapter summary."""
    TRANSACTION = "transaction"        # transactional messages
    DELAY = "delay"                   # delayed messages
    BATCH = "batch"                   # batch messages
    ORDER = "order"                   # ordered messages
    FILTER = "filter"                 # message filtering
    ROUTING = "routing"               # message routing

@dataclass
class FeatureSummary:
    """Summary record for one advanced feature category."""
    category: FeatureCategory           # category this summary belongs to
    name: str                           # feature name
    description: str                    # short description of the feature
    key_concepts: List[str]             # key concepts
    implementation_points: List[str]    # implementation points
    use_cases: List[str]                # use cases
    best_practices: List[str]           # best practices
    common_pitfalls: List[str]          # common pitfalls

class RocketMQAdvancedFeaturesSummary:
    """RocketMQ高级特性总结"""
    
    def __init__(self):
        """Build the three lookup tables once, at construction time."""
        # FeatureCategory -> FeatureSummary
        self.feature_summaries = self._create_summaries()
        # pattern name -> dict with description/scenario/implementation/benefits
        self.integration_patterns = self._create_integration_patterns()
        # optimization aspect -> list of recommendations
        self.performance_considerations = self._create_performance_considerations()
    
    def _create_summaries(self) -> Dict[FeatureCategory, FeatureSummary]:
        """Build the per-feature summary table.

        Returns:
            A dict with one FeatureSummary per FeatureCategory member, in
            the order transaction, delay, batch, order, filter, routing.
        """
        return {
            # --- Transactional messages ---
            FeatureCategory.TRANSACTION: FeatureSummary(
                category=FeatureCategory.TRANSACTION,
                name="事务消息",
                description="保证消息发送和本地事务的最终一致性",
                key_concepts=[
                    "半消息机制",
                    "事务状态检查",
                    "最终一致性",
                    "补偿机制",
                    "幂等性设计"
                ],
                implementation_points=[
                    "实现TransactionListener接口",
                    "处理executeLocalTransaction方法",
                    "实现checkLocalTransaction回查逻辑",
                    "合理设置事务超时时间",
                    "确保本地事务幂等性"
                ],
                use_cases=[
                    "订单支付场景",
                    "库存扣减",
                    "账户转账",
                    "分布式事务",
                    "数据一致性保障"
                ],
                best_practices=[
                    "本地事务要快速执行",
                    "回查逻辑要幂等",
                    "合理设置重试次数",
                    "监控事务消息状态",
                    "设计补偿机制"
                ],
                common_pitfalls=[
                    "本地事务执行时间过长",
                    "回查逻辑不幂等",
                    "忽略异常处理",
                    "事务状态不明确",
                    "缺少监控告警"
                ]
            ),

            # --- Delayed messages ---
            FeatureCategory.DELAY: FeatureSummary(
                category=FeatureCategory.DELAY,
                name="延迟消息",
                description="支持消息在指定时间后才能被消费",
                key_concepts=[
                    "延迟级别",
                    "定时投递",
                    "延迟队列",
                    "时间轮算法",
                    "精确延迟"
                ],
                implementation_points=[
                    "设置delayTimeLevel属性",
                    "使用MessageBuilder构建延迟消息",
                    "处理延迟消息的消费逻辑",
                    "监控延迟消息队列",
                    "合理设置延迟时间"
                ],
                use_cases=[
                    "订单超时处理",
                    "定时任务调度",
                    "支付超时提醒",
                    "缓存预热",
                    "重试机制"
                ],
                best_practices=[
                    "选择合适的延迟级别",
                    "避免大量长延迟消息",
                    "监控延迟消息积压",
                    "设计幂等消费逻辑",
                    "考虑时钟偏移影响"
                ],
                common_pitfalls=[
                    "延迟时间设置不当",
                    "大量延迟消息导致内存压力",
                    "忽略延迟精度限制",
                    "延迟消息丢失处理",
                    "时区问题"
                ]
            ),

            # --- Batch messages ---
            FeatureCategory.BATCH: FeatureSummary(
                category=FeatureCategory.BATCH,
                name="批量消息",
                description="支持批量发送和处理消息以提高吞吐量",
                key_concepts=[
                    "批量发送",
                    "批量大小控制",
                    "消息压缩",
                    "批量消费",
                    "性能优化"
                ],
                implementation_points=[
                    "使用sendBatch方法",
                    "控制批次大小和消息数量",
                    "实现消息分割逻辑",
                    "处理批量发送异常",
                    "优化序列化性能"
                ],
                use_cases=[
                    "日志收集",
                    "数据同步",
                    "批量通知",
                    "报表生成",
                    "ETL处理"
                ],
                best_practices=[
                    "合理控制批次大小",
                    "启用消息压缩",
                    "异步批量处理",
                    "监控批量处理性能",
                    "设计重试机制"
                ],
                common_pitfalls=[
                    "批次过大导致超时",
                    "批量消息部分失败处理",
                    "内存使用过多",
                    "忽略消息顺序",
                    "缺少错误处理"
                ]
            ),

            # --- Ordered messages ---
            FeatureCategory.ORDER: FeatureSummary(
                category=FeatureCategory.ORDER,
                name="顺序消息",
                description="保证消息按照发送顺序被消费",
                key_concepts=[
                    "全局顺序",
                    "分区顺序",
                    "队列选择",
                    "顺序消费",
                    "消费锁定"
                ],
                implementation_points=[
                    "实现MessageQueueSelector",
                    "使用MessageListenerOrderly",
                    "处理消费失败重试",
                    "避免消费阻塞",
                    "监控消费进度"
                ],
                use_cases=[
                    "订单状态变更",
                    "账户余额变动",
                    "库存变化记录",
                    "用户行为轨迹",
                    "数据库binlog同步"
                ],
                best_practices=[
                    "合理选择分区键",
                    "避免热点队列",
                    "快速处理消息",
                    "设计幂等逻辑",
                    "监控消费延迟"
                ],
                common_pitfalls=[
                    "消费逻辑过于复杂",
                    "分区键选择不当",
                    "消费阻塞影响性能",
                    "重试机制破坏顺序",
                    "队列负载不均"
                ]
            ),

            # --- Message filtering ---
            FeatureCategory.FILTER: FeatureSummary(
                category=FeatureCategory.FILTER,
                name="消息过滤",
                description="根据条件过滤消息,减少无效消费",
                key_concepts=[
                    "Tag过滤",
                    "SQL过滤",
                    "属性过滤",
                    "内容过滤",
                    "过滤表达式"
                ],
                implementation_points=[
                    "设置消息Tag和属性",
                    "编写SQL过滤表达式",
                    "实现自定义过滤器",
                    "优化过滤性能",
                    "监控过滤效果"
                ],
                use_cases=[
                    "用户订阅过滤",
                    "地域消息分发",
                    "优先级消息处理",
                    "内容分类",
                    "业务路由"
                ],
                best_practices=[
                    "选择合适的过滤方式",
                    "优化过滤表达式",
                    "避免复杂过滤逻辑",
                    "监控过滤命中率",
                    "设计过滤规则管理"
                ],
                common_pitfalls=[
                    "过滤表达式过于复杂",
                    "过滤条件频繁变更",
                    "忽略过滤性能影响",
                    "过滤规则冲突",
                    "缺少过滤监控"
                ]
            ),

            # --- Message routing ---
            FeatureCategory.ROUTING: FeatureSummary(
                category=FeatureCategory.ROUTING,
                name="消息路由",
                description="根据规则将消息路由到不同的处理器",
                key_concepts=[
                    "路由规则",
                    "负载均衡",
                    "故障转移",
                    "路由策略",
                    "动态路由"
                ],
                implementation_points=[
                    "定义路由规则",
                    "实现路由策略",
                    "处理路由失败",
                    "监控路由性能",
                    "支持动态配置"
                ],
                use_cases=[
                    "微服务路由",
                    "多机房部署",
                    "灰度发布",
                    "负载分发",
                    "容灾切换"
                ],
                best_practices=[
                    "设计清晰的路由规则",
                    "实现健康检查",
                    "支持路由回退",
                    "监控路由状态",
                    "优化路由性能"
                ],
                common_pitfalls=[
                    "路由规则过于复杂",
                    "缺少故障处理",
                    "路由性能瓶颈",
                    "配置管理混乱",
                    "监控不完善"
                ]
            )
        }
    
    def _create_integration_patterns(self) -> Dict[str, Dict[str, Any]]:
        """创建集成模式"""
        return {
            "事务+延迟": {
                "description": "事务消息结合延迟消息实现复杂业务流程",
                "scenario": "订单支付超时自动取消",
                "implementation": [
                    "发送事务消息创建订单",
                    "同时发送延迟消息设置超时",
                    "支付成功时取消延迟消息",
                    "超时消息触发订单取消事务"
                ],
                "benefits": ["业务流程自动化", "减少人工干预", "提高系统可靠性"]
            },
            
            "批量+过滤": {
                "description": "批量消息结合过滤实现高效数据处理",
                "scenario": "日志数据批量收集和分类处理",
                "implementation": [
                    "批量收集日志消息",
                    "根据日志级别过滤",
                    "不同级别路由到不同处理器",
                    "批量写入存储系统"
                ],
                "benefits": ["提高处理效率", "减少网络开销", "优化存储性能"]
            },
            
            "顺序+路由": {
                "description": "顺序消息结合路由实现有序业务处理",
                "scenario": "用户操作序列的有序处理和分发",
                "implementation": [
                    "按用户ID保证消息顺序",
                    "根据操作类型路由到不同服务",
                    "维护用户状态一致性",
                    "处理异常和重试"
                ],
                "benefits": ["保证业务一致性", "支持复杂业务流程", "提高系统可扩展性"]
            },
            
            "过滤+路由": {
                "description": "消息过滤结合路由实现智能消息分发",
                "scenario": "多租户系统的消息隔离和分发",
                "implementation": [
                    "根据租户ID过滤消息",
                    "按业务类型进一步过滤",
                    "路由到租户专属处理器",
                    "实现资源隔离"
                ],
                "benefits": ["实现多租户隔离", "优化资源利用", "提高安全性"]
            }
        }
    
    def _create_performance_considerations(self) -> Dict[str, List[str]]:
        """创建性能考虑因素"""
        return {
            "吞吐量优化": [
                "使用批量消息减少网络开销",
                "启用消息压缩节省带宽",
                "合理设置生产者和消费者线程数",
                "优化序列化和反序列化性能",
                "使用异步发送提高并发"
            ],
            
            "延迟优化": [
                "减少消息过滤的复杂度",
                "优化路由规则匹配算法",
                "使用本地缓存减少远程调用",
                "合理设置网络超时参数",
                "避免阻塞式消费处理"
            ],
            
            "内存优化": [
                "控制批量消息的大小",
                "及时释放消息对象",
                "合理设置消费者拉取数量",
                "避免大量延迟消息积压",
                "优化消息缓存策略"
            ],
            
            "可靠性保障": [
                "实现完善的重试机制",
                "设计熔断和降级策略",
                "监控关键性能指标",
                "建立告警和通知机制",
                "定期进行容灾演练"
            ]
        }
    
    def get_feature_summary(self, category: FeatureCategory) -> FeatureSummary:
        """获取特性总结"""
        return self.feature_summaries.get(category)
    
    def print_comprehensive_summary(self):
        """打印综合总结"""
        print("=== RocketMQ高级特性综合总结 ===")
        
        # 特性总结
        print("\n📋 特性总结:")
        for category, summary in self.feature_summaries.items():
            print(f"\n{summary.name}:")
            print(f"  描述: {summary.description}")
            print(f"  核心概念: {', '.join(summary.key_concepts[:3])}...")
            print(f"  主要应用: {', '.join(summary.use_cases[:2])}...")
        
        # 集成模式
        print("\n🔗 集成模式:")
        for pattern_name, pattern_info in self.integration_patterns.items():
            print(f"\n{pattern_name}:")
            print(f"  场景: {pattern_info['scenario']}")
            print(f"  收益: {', '.join(pattern_info['benefits'])}")
        
        # 性能考虑
        print("\n⚡ 性能优化:")
        for aspect, considerations in self.performance_considerations.items():
            print(f"\n{aspect}:")
            for consideration in considerations[:3]:
                print(f"  • {consideration}")
    
    def generate_feature_comparison(self) -> Dict[str, Dict[str, str]]:
        """生成特性对比"""
        comparison = {}
        
        for category, summary in self.feature_summaries.items():
            comparison[summary.name] = {
                "复杂度": self._assess_complexity(category),
                "性能影响": self._assess_performance_impact(category),
                "适用场景": self._assess_use_case_breadth(category),
                "实施难度": self._assess_implementation_difficulty(category)
            }
        
        return comparison
    
    def _assess_complexity(self, category: FeatureCategory) -> str:
        """评估复杂度"""
        complexity_map = {
            FeatureCategory.TRANSACTION: "高",
            FeatureCategory.DELAY: "中",
            FeatureCategory.BATCH: "低",
            FeatureCategory.ORDER: "中",
            FeatureCategory.FILTER: "中",
            FeatureCategory.ROUTING: "高"
        }
        return complexity_map.get(category, "中")
    
    def _assess_performance_impact(self, category: FeatureCategory) -> str:
        """评估性能影响"""
        impact_map = {
            FeatureCategory.TRANSACTION: "中",
            FeatureCategory.DELAY: "低",
            FeatureCategory.BATCH: "正面",
            FeatureCategory.ORDER: "中",
            FeatureCategory.FILTER: "低",
            FeatureCategory.ROUTING: "中"
        }
        return impact_map.get(category, "中")
    
    def _assess_use_case_breadth(self, category: FeatureCategory) -> str:
        """评估适用场景广度"""
        breadth_map = {
            FeatureCategory.TRANSACTION: "中",
            FeatureCategory.DELAY: "广",
            FeatureCategory.BATCH: "广",
            FeatureCategory.ORDER: "中",
            FeatureCategory.FILTER: "广",
            FeatureCategory.ROUTING: "广"
        }
        return breadth_map.get(category, "中")
    
    def _assess_implementation_difficulty(self, category: FeatureCategory) -> str:
        """评估实施难度"""
        difficulty_map = {
            FeatureCategory.TRANSACTION: "高",
            FeatureCategory.DELAY: "低",
            FeatureCategory.BATCH: "低",
            FeatureCategory.ORDER: "中",
            FeatureCategory.FILTER: "中",
            FeatureCategory.ROUTING: "高"
        }
        return difficulty_map.get(category, "中")

# Usage example
if __name__ == "__main__":
    # Build the summary tables
    summary = RocketMQAdvancedFeaturesSummary()

    # Print the condensed overview
    summary.print_comprehensive_summary()

    # Print the feature comparison as a fixed-width table
    comparison = summary.generate_feature_comparison()
    print("\n\n📊 特性对比表:")
    print(f"{'特性':<12} {'复杂度':<8} {'性能影响':<10} {'适用场景':<10} {'实施难度':<8}")
    print("-" * 60)

    for feature_name, metrics in comparison.items():
        print(f"{feature_name:<12} {metrics['复杂度']:<8} {metrics['性能影响']:<10} {metrics['适用场景']:<10} {metrics['实施难度']:<8}")

    # Print the full details of every feature
    print("\n\n📖 详细特性说明:")
    for category in FeatureCategory:
        feature_summary = summary.get_feature_summary(category)
        if feature_summary:
            print(f"\n=== {feature_summary.name} ===")
            print(f"描述: {feature_summary.description}")

            print("\n核心概念:")
            for concept in feature_summary.key_concepts:
                print(f"  • {concept}")

            print("\n最佳实践:")
            for practice in feature_summary.best_practices:
                print(f"  ✅ {practice}")

            # The original indented this loop one column too deep, which
            # made the whole snippet fail with an IndentationError.
            print("\n常见陷阱:")
            for pitfall in feature_summary.common_pitfalls:
                print(f"  ⚠️ {pitfall}")

7.7.2 学习建议

from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum

class LearningLevel(Enum):
    """Learner proficiency levels, from lowest to highest."""
    BEGINNER = "beginner"        # beginner
    INTERMEDIATE = "intermediate" # intermediate
    ADVANCED = "advanced"        # advanced
    EXPERT = "expert"            # expert

@dataclass
class LearningPath:
    """Study plan for one proficiency level."""
    level: LearningLevel               # level this plan targets
    title: str                         # heading printed for the plan
    description: str                   # one-line description
    prerequisites: List[str]           # knowledge expected before starting
    learning_objectives: List[str]     # goals of this plan
    recommended_features: List[str]    # features/topics to focus on
    practice_projects: List[str]       # suggested hands-on projects
    estimated_time: str                # free-form duration, e.g. "2-3周"
    next_steps: List[str]              # what to study afterwards

class RocketMQLearningGuide:
    """RocketMQ学习指南"""
    
    def __init__(self):
        """Build all guide tables once, at construction time."""
        # LearningLevel -> LearningPath
        self.learning_paths = self._create_learning_paths()
        # project phase -> list of common mistakes
        self.common_mistakes = self._create_common_mistakes()
        # practice area -> list of recommendations
        self.best_practices = self._create_best_practices()
        # resource kind -> list of resource names
        self.resources = self._create_resources()
    
    def _create_learning_paths(self) -> Dict[LearningLevel, LearningPath]:
        """Build the study plan table.

        Returns:
            A dict with one LearningPath per LearningLevel member, in the
            order beginner, intermediate, advanced, expert.
        """
        return {
            # --- Beginner ---
            LearningLevel.BEGINNER: LearningPath(
                level=LearningLevel.BEGINNER,
                title="RocketMQ入门学习",
                description="掌握RocketMQ基础概念和简单应用",
                prerequisites=[
                    "Java基础知识",
                    "消息队列基本概念",
                    "分布式系统基础"
                ],
                learning_objectives=[
                    "理解消息队列的作用和价值",
                    "掌握RocketMQ基本架构",
                    "学会基本的生产者和消费者开发",
                    "了解Topic和Queue概念",
                    "掌握消息的发送和接收"
                ],
                recommended_features=[
                    "普通消息发送和接收",
                    "基本的Tag过滤",
                    "简单的批量消息"
                ],
                practice_projects=[
                    "简单的聊天系统",
                    "订单通知系统",
                    "日志收集系统"
                ],
                estimated_time="2-3周",
                next_steps=[
                    "学习消息过滤机制",
                    "了解消息持久化",
                    "掌握消费模式"
                ]
            ),

            # --- Intermediate ---
            LearningLevel.INTERMEDIATE: LearningPath(
                level=LearningLevel.INTERMEDIATE,
                title="RocketMQ进阶应用",
                description="掌握RocketMQ高级特性和实际应用",
                prerequisites=[
                    "RocketMQ基础知识",
                    "Spring Boot开发经验",
                    "微服务架构理解"
                ],
                learning_objectives=[
                    "掌握顺序消息的使用场景",
                    "理解延迟消息的实现原理",
                    "学会消息过滤的高级用法",
                    "掌握批量消息优化技巧",
                    "了解消息路由机制"
                ],
                recommended_features=[
                    "顺序消息",
                    "延迟消息",
                    "SQL过滤",
                    "批量消息优化",
                    "基本路由规则"
                ],
                practice_projects=[
                    "电商订单处理系统",
                    "用户行为分析系统",
                    "定时任务调度系统",
                    "多租户消息系统"
                ],
                estimated_time="4-6周",
                next_steps=[
                    "学习事务消息",
                    "掌握高级路由策略",
                    "了解性能优化"
                ]
            ),

            # --- Advanced ---
            LearningLevel.ADVANCED: LearningPath(
                level=LearningLevel.ADVANCED,
                title="RocketMQ高级特性",
                description="掌握RocketMQ复杂场景和高级特性",
                prerequisites=[
                    "RocketMQ进阶知识",
                    "分布式事务理解",
                    "高并发系统设计经验"
                ],
                learning_objectives=[
                    "掌握事务消息的设计和实现",
                    "理解消息路由的高级策略",
                    "学会性能调优和监控",
                    "掌握故障处理和容灾设计",
                    "了解集群部署和运维"
                ],
                recommended_features=[
                    "事务消息",
                    "高级消息路由",
                    "性能优化",
                    "监控和告警",
                    "集群管理"
                ],
                practice_projects=[
                    "分布式事务系统",
                    "高并发消息处理平台",
                    "多机房消息同步系统",
                    "智能消息路由系统"
                ],
                estimated_time="6-8周",
                next_steps=[
                    "深入源码学习",
                    "参与开源贡献",
                    "架构设计实践"
                ]
            ),

            # --- Expert ---
            LearningLevel.EXPERT: LearningPath(
                level=LearningLevel.EXPERT,
                title="RocketMQ专家级应用",
                description="成为RocketMQ领域专家,能够解决复杂问题",
                prerequisites=[
                    "RocketMQ高级特性掌握",
                    "大规模系统设计经验",
                    "深度技术理解能力"
                ],
                learning_objectives=[
                    "深入理解RocketMQ内部机制",
                    "能够进行架构设计和优化",
                    "掌握复杂问题的诊断和解决",
                    "具备技术选型和决策能力",
                    "能够指导团队和培训他人"
                ],
                recommended_features=[
                    "源码深度分析",
                    "自定义扩展开发",
                    "性能极致优化",
                    "复杂场景设计",
                    "技术架构决策"
                ],
                practice_projects=[
                    "企业级消息中台",
                    "跨云消息同步系统",
                    "消息队列性能测试平台",
                    "RocketMQ扩展组件开发"
                ],
                estimated_time="持续学习",
                next_steps=[
                    "技术布道和分享",
                    "开源项目维护",
                    "技术标准制定"
                ]
            )
        }
    
    def _create_common_mistakes(self) -> Dict[str, List[str]]:
        """创建常见错误"""
        return {
            "设计阶段": [
                "没有合理规划Topic和Queue数量",
                "忽略消息顺序性需求分析",
                "过度设计复杂的路由规则",
                "没有考虑消息幂等性",
                "忽略性能和容量规划"
            ],
            
            "开发阶段": [
                "事务消息回查逻辑不幂等",
                "批量消息大小控制不当",
                "过滤表达式过于复杂",
                "异常处理不完善",
                "缺少必要的日志和监控"
            ],
            
            "部署阶段": [
                "集群配置不合理",
                "网络和防火墙配置错误",
                "资源分配不足",
                "缺少监控和告警",
                "没有制定运维流程"
            ],
            
            "运维阶段": [
                "忽略性能监控指标",
                "缺少容灾和备份策略",
                "版本升级计划不当",
                "问题排查能力不足",
                "缺少性能调优经验"
            ]
        }
    
    def _create_best_practices(self) -> Dict[str, List[str]]:
        """创建最佳实践"""
        return {
            "架构设计": [
                "根据业务特点选择合适的消息模式",
                "合理设计Topic和Queue的数量和分布",
                "考虑消息的生命周期和存储策略",
                "设计完善的监控和告警体系",
                "制定清晰的容灾和恢复策略"
            ],
            
            "开发实践": [
                "始终考虑消息的幂等性设计",
                "合理使用批量操作提高性能",
                "实现完善的异常处理和重试机制",
                "添加详细的日志和监控埋点",
                "编写充分的单元测试和集成测试"
            ],
            
            "性能优化": [
                "根据业务场景调整生产者和消费者参数",
                "合理使用消息压缩和批量发送",
                "优化消息序列化和反序列化",
                "监控和调整JVM参数",
                "定期进行性能测试和容量规划"
            ],
            
            "运维管理": [
                "建立完善的监控指标体系",
                "制定标准的运维操作流程",
                "定期进行备份和恢复演练",
                "建立问题排查和处理流程",
                "持续进行性能优化和调整"
            ]
        }
    
    def _create_resources(self) -> Dict[str, List[str]]:
        """创建学习资源"""
        return {
            "官方文档": [
                "RocketMQ官方文档",
                "Apache RocketMQ GitHub",
                "RocketMQ用户指南",
                "RocketMQ最佳实践",
                "RocketMQ FAQ"
            ],
            
            "技术博客": [
                "阿里云RocketMQ技术博客",
                "Apache RocketMQ社区博客",
                "各大技术平台RocketMQ文章",
                "开发者个人技术博客",
                "企业技术团队分享"
            ],
            
            "视频教程": [
                "RocketMQ官方视频教程",
                "在线教育平台课程",
                "技术会议演讲视频",
                "开发者分享视频",
                "企业内训视频"
            ],
            
            "实践项目": [
                "GitHub开源项目",
                "企业级案例研究",
                "技术竞赛项目",
                "个人练习项目",
                "社区贡献项目"
            ],
            
            "社区交流": [
                "RocketMQ官方社区",
                "技术论坛和问答平台",
                "微信群和QQ群",
                "技术会议和聚会",
                "开源贡献和代码审查"
            ]
        }
    
    def get_learning_path(self, level: LearningLevel) -> LearningPath:
        """获取学习路径"""
        return self.learning_paths.get(level)
    
    def recommend_learning_path(self, current_skills: List[str]) -> LearningLevel:
        """推荐学习路径"""
        skill_score = 0
        
        # 基础技能评分
        basic_skills = ["java", "消息队列", "分布式"]
        skill_score += sum(1 for skill in basic_skills if any(s in skill.lower() for s in current_skills))
        
        # RocketMQ技能评分
        rocketmq_skills = ["rocketmq", "生产者", "消费者", "topic"]
        skill_score += sum(2 for skill in rocketmq_skills if any(s in skill.lower() for s in current_skills)) 
        
        # 高级技能评分
        advanced_skills = ["事务消息", "路由", "性能优化", "集群"]
        skill_score += sum(3 for skill in advanced_skills if any(s in skill.lower() for s in current_skills))
        
        # 根据评分推荐级别
        if skill_score >= 15:
            return LearningLevel.EXPERT
        elif skill_score >= 10:
            return LearningLevel.ADVANCED
        elif skill_score >= 5:
            return LearningLevel.INTERMEDIATE
        else:
            return LearningLevel.BEGINNER
    
    def print_learning_guide(self, level: LearningLevel = None):
        """打印学习指南"""
        if level:
            path = self.get_learning_path(level)
            if path:
                self._print_single_path(path)
        else:
            print("=== RocketMQ完整学习指南 ===")
            for level in LearningLevel:
                path = self.get_learning_path(level)
                if path:
                    self._print_single_path(path)
                    print("\n" + "="*60 + "\n")
    
    def _print_single_path(self, path: LearningPath):
        """打印单个学习路径"""
        print(f"📚 {path.title} ({path.level.value.upper()})")
        print(f"描述: {path.description}")
        print(f"预计时间: {path.estimated_time}")
        
        print(f"\n📋 前置条件:")
        for prereq in path.prerequisites:
            print(f"  • {prereq}")
        
        print(f"\n🎯 学习目标:")
        for objective in path.learning_objectives:
            print(f"  • {objective}")
        
        print(f"\n🔧 推荐特性:")
        for feature in path.recommended_features:
            print(f"  • {feature}")
        
        print(f"\n💼 实践项目:")
        for project in path.practice_projects:
            print(f"  • {project}")
        
        print(f"\n➡️ 下一步:")
        for step in path.next_steps:
            print(f"  • {step}")
    
    def print_common_mistakes(self):
        """打印常见错误"""
        print("⚠️ 常见错误和陷阱:")
        for phase, mistakes in self.common_mistakes.items():
            print(f"\n{phase}:")
            for mistake in mistakes:
                print(f"  ❌ {mistake}")
    
    def print_best_practices(self):
        """打印最佳实践"""
        print("✅ 最佳实践建议:")
        for category, practices in self.best_practices.items():
            print(f"\n{category}:")
            for practice in practices:
                print(f"  ✅ {practice}")
    
    def print_resources(self):
        """打印学习资源"""
        print("📖 学习资源推荐:")
        for category, resources in self.resources.items():
            print(f"\n{category}:")
            for resource in resources:
                print(f"  📎 {resource}")

# Usage example
if __name__ == "__main__":
    guide = RocketMQLearningGuide()
    separator = "\n" + "="*60

    # Assess the skills and recommend a level
    current_skills = ["Java", "Spring Boot", "消息队列基础", "RocketMQ基础"]
    recommended_level = guide.recommend_learning_path(current_skills)
    print(f"根据您的技能背景,推荐学习级别: {recommended_level.value.upper()}")

    # Print the recommended study plan
    print(separator)
    guide.print_learning_guide(recommended_level)

    # Then the common mistakes, best practices and resources, each
    # preceded by a separator line
    for report in (
        guide.print_common_mistakes,
        guide.print_best_practices,
        guide.print_resources,
    ):
        print(separator)
        report()

7.7.3 下一步学习方向

本章介绍了RocketMQ的六大高级特性,为了更好地掌握和应用这些特性,建议按照以下方向继续深入学习:

1. 深入理解原理

  • 存储机制: 学习RocketMQ的消息存储原理,包括CommitLog、ConsumeQueue等
  • 网络通信: 了解Netty在RocketMQ中的应用和通信协议
  • 负载均衡: 深入研究消费者负载均衡算法和队列分配策略
  • 高可用机制: 学习主从复制、故障转移等高可用实现

2. 性能优化实践

  • 参数调优: 掌握关键配置参数的调优方法
  • JVM优化: 学习针对RocketMQ的JVM参数优化
  • 网络优化: 了解网络层面的性能优化技巧
  • 存储优化: 掌握磁盘I/O和存储的优化方法

3. 运维和监控

  • 集群部署: 学习生产环境的集群部署和配置
  • 监控体系: 建立完善的监控指标和告警机制
  • 故障排查: 掌握常见问题的诊断和解决方法
  • 容量规划: 学习容量评估和扩容策略

4. 企业级应用

  • 微服务集成: 在微服务架构中合理使用RocketMQ
  • 多机房部署: 学习跨机房的消息同步和容灾
  • 安全加固: 了解消息队列的安全防护措施
  • 合规要求: 满足企业级的合规和审计要求

5. 技术扩展

  • 源码学习: 深入阅读RocketMQ源码,理解实现细节
  • 自定义扩展: 开发自定义的插件和扩展组件
  • 技术选型: 学会在不同场景下选择合适的消息队列
  • 架构设计: 能够设计复杂的消息驱动架构

6. 社区参与

  • 开源贡献: 参与RocketMQ开源项目的贡献
  • 技术分享: 在技术社区分享使用经验和最佳实践
  • 问题反馈: 积极反馈使用中遇到的问题和建议
  • 标准制定: 参与相关技术标准的制定和推广

通过系统性的学习和实践,您将能够熟练掌握RocketMQ的高级特性,并在实际项目中发挥其强大的功能。记住,技术的掌握需要理论学习与实践相结合,建议在学习过程中多动手实践,多思考问题,多与社区交流。


本章小结

本章详细介绍了RocketMQ的六大高级特性:事务消息、延迟消息、批量消息、顺序消息、消息过滤和消息路由。每个特性都包含了原理解析、实现方式、应用场景和最佳实践。这些高级特性使RocketMQ能够满足复杂业务场景的需求,是构建高可靠、高性能消息系统的重要工具。

掌握这些高级特性,将帮助您:

  • 设计更加可靠的分布式系统
  • 处理复杂的业务流程和数据一致性问题
  • 优化系统性能和资源利用率
  • 构建智能化的消息处理平台

在下一章中,我们将学习RocketMQ的集群部署和运维管理,了解如何在生产环境中稳定运行RocketMQ集群。