5.1 顺序消息

5.1.1 顺序消息概念

from enum import Enum
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass
from abc import ABC, abstractmethod
import time
import threading
import hashlib

# 顺序类型枚举
class OrderType(Enum):
    GLOBAL_ORDER = "GLOBAL_ORDER"    # 全局顺序
    PARTITION_ORDER = "PARTITION_ORDER"  # 分区顺序

# 顺序消息状态
class OrderMessageStatus(Enum):
    SUCCESS = "SUCCESS"
    ROLLBACK = "ROLLBACK"
    COMMIT = "COMMIT"
    UNKNOWN = "UNKNOWN"

# 顺序消息
@dataclass
class OrderMessage:
    topic: str
    tags: str
    keys: str
    body: bytes
    order_id: str  # 顺序标识
    order_type: OrderType = OrderType.PARTITION_ORDER
    properties: Dict[str, str] = None
    
    def __post_init__(self):
        if self.properties is None:
            self.properties = {}
        
        # 设置顺序标识属性
        self.properties["ORDER_ID"] = self.order_id
        self.properties["ORDER_TYPE"] = self.order_type.value
    
    def get_order_key(self) -> str:
        """获取顺序键"""
        return self.order_id

# 顺序消息选择器
class OrderMessageQueueSelector:
    """顺序消息队列选择器"""
    
    def select(self, queues: List['MessageQueue'], message: OrderMessage, arg: str) -> 'MessageQueue':
        """选择消息队列"""
        if not queues:
            raise ValueError("消息队列列表为空")
        
        # 根据顺序ID计算队列索引
        order_key = message.get_order_key()
        hash_value = self._hash(order_key)
        index = hash_value % len(queues)
        
        return queues[index]
    
    def _hash(self, key: str) -> int:
        """计算哈希值"""
        return hash(key) & 0x7fffffff

# 顺序消息生产者
class OrderMessageProducer:
    """顺序消息生产者"""
    
    def __init__(self, producer_config: 'ProducerConfig'):
        self.config = producer_config
        self.producer = None  # 底层生产者
        self.queue_selector = OrderMessageQueueSelector()
        self.send_stats = {
            "total_sent": 0,
            "success_sent": 0,
            "failed_sent": 0
        }
    
    def start(self):
        """启动顺序消息生产者"""
        # 初始化底层生产者
        from rocketmq_producer import DefaultMQProducer
        self.producer = DefaultMQProducer(self.config)
        self.producer.start()
        print("顺序消息生产者启动成功")
    
    def shutdown(self):
        """关闭顺序消息生产者"""
        if self.producer:
            self.producer.shutdown()
        print("顺序消息生产者关闭成功")
    
    def send_order_message(self, message: OrderMessage) -> 'SendResult':
        """发送顺序消息"""
        try:
            # 转换为普通消息
            normal_message = self._convert_to_message(message)
            
            # 使用顺序选择器发送
            result = self.producer.send(
                message=normal_message,
                selector=self.queue_selector,
                arg=message.order_id
            )
            
            self.send_stats["total_sent"] += 1
            self.send_stats["success_sent"] += 1
            
            return result
            
        except Exception as e:
            self.send_stats["total_sent"] += 1
            self.send_stats["failed_sent"] += 1
            raise Exception(f"发送顺序消息失败: {e}")
    
    def send_order_message_async(self, message: OrderMessage, 
                               callback: 'SendCallback'):
        """异步发送顺序消息"""
        try:
            normal_message = self._convert_to_message(message)
            
            # 包装回调
            wrapped_callback = OrderSendCallback(callback, self.send_stats)
            
            self.producer.send_async(
                message=normal_message,
                selector=self.queue_selector,
                arg=message.order_id,
                callback=wrapped_callback
            )
            
        except Exception as e:
            self.send_stats["total_sent"] += 1
            self.send_stats["failed_sent"] += 1
            callback.on_exception(e)
    
    def _convert_to_message(self, order_message: OrderMessage) -> 'Message':
        """转换为普通消息"""
        from rocketmq_producer import Message
        
        message = Message(
            topic=order_message.topic,
            tags=order_message.tags,
            keys=order_message.keys,
            body=order_message.body
        )
        
        # 复制属性
        for key, value in order_message.properties.items():
            message.put_property(key, value)
        
        return message
    
    def get_send_stats(self) -> Dict[str, int]:
        """获取发送统计"""
        return self.send_stats.copy()

# 顺序发送回调包装器
class OrderSendCallback:
    """顺序发送回调包装器"""
    
    def __init__(self, callback: 'SendCallback', stats: Dict[str, int]):
        self.callback = callback
        self.stats = stats
    
    def on_success(self, result: 'SendResult'):
        """发送成功回调"""
        self.stats["total_sent"] += 1
        self.stats["success_sent"] += 1
        self.callback.on_success(result)
    
    def on_exception(self, exception: Exception):
        """发送异常回调"""
        self.stats["total_sent"] += 1
        self.stats["failed_sent"] += 1
        self.callback.on_exception(exception)

5.1.2 顺序消息消费

# 顺序消息消费者
class OrderMessageConsumer:
    """顺序消息消费者"""
    
    def __init__(self, consumer_config: 'ConsumerConfig'):
        self.config = consumer_config
        # 强制设置为顺序消费模式
        self.config.consume_mode = ConsumeMode.ORDERLY
        self.consumer = None
        self.order_listener: Optional['OrderMessageListener'] = None
        self.consume_stats = {
            "total_consumed": 0,
            "success_consumed": 0,
            "failed_consumed": 0
        }
    
    def start(self):
        """启动顺序消息消费者"""
        if not self.order_listener:
            raise Exception("未设置顺序消息监听器")
        
        # 初始化底层消费者
        from rocketmq_consumer import DefaultMQPushConsumer
        self.consumer = DefaultMQPushConsumer(self.config)
        
        # 注册顺序消息监听器
        wrapped_listener = OrderMessageListenerWrapper(
            self.order_listener, self.consume_stats
        )
        self.consumer.register_message_listener(wrapped_listener)
        
        self.consumer.start()
        print("顺序消息消费者启动成功")
    
    def shutdown(self):
        """关闭顺序消息消费者"""
        if self.consumer:
            self.consumer.shutdown()
        print("顺序消息消费者关闭成功")
    
    def subscribe(self, topic: str, sub_expression: str = "*"):
        """订阅主题"""
        if self.consumer:
            self.consumer.subscribe(topic, sub_expression)
    
    def register_order_listener(self, listener: 'OrderMessageListener'):
        """注册顺序消息监听器"""
        self.order_listener = listener
    
    def get_consume_stats(self) -> Dict[str, int]:
        """获取消费统计"""
        return self.consume_stats.copy()

# 顺序消息监听器接口
class OrderMessageListener(ABC):
    """顺序消息监听器接口"""
    
    @abstractmethod
    def consume_order_message(self, messages: List['MessageExt'], 
                            context: 'ConsumeOrderlyContext') -> 'ConsumeOrderlyResult':
        """消费顺序消息"""
        pass

# 顺序消费上下文
@dataclass
class ConsumeOrderlyContext:
    consumer_group: str
    message_queue: 'MessageQueue'
    auto_commit: bool = True
    suspend_current_queue_time_millis: int = 1000
    
    def set_auto_commit(self, auto_commit: bool):
        """设置自动提交"""
        self.auto_commit = auto_commit
    
    def set_suspend_current_queue_time_millis(self, time_millis: int):
        """设置暂停当前队列时间"""
        self.suspend_current_queue_time_millis = time_millis

# 顺序消费结果
class ConsumeOrderlyResult(Enum):
    SUCCESS = "SUCCESS"
    SUSPEND_CURRENT_QUEUE_A_MOMENT = "SUSPEND_CURRENT_QUEUE_A_MOMENT"

# 顺序消息监听器包装器
class OrderMessageListenerWrapper:
    """顺序消息监听器包装器"""
    
    def __init__(self, order_listener: OrderMessageListener, 
                 stats: Dict[str, int]):
        self.order_listener = order_listener
        self.stats = stats
    
    def consume_message(self, messages: List['MessageExt'], 
                      context: 'ConsumeContext') -> 'ConsumeResult':
        """消费消息(适配器方法)"""
        try:
            # 转换上下文
            order_context = ConsumeOrderlyContext(
                consumer_group=context.consumer_group,
                message_queue=context.message_queue
            )
            
            # 调用顺序监听器
            result = self.order_listener.consume_order_message(messages, order_context)
            
            # 更新统计
            if result == ConsumeOrderlyResult.SUCCESS:
                self.stats["total_consumed"] += len(messages)
                self.stats["success_consumed"] += len(messages)
                return ConsumeResult.SUCCESS
            else:
                self.stats["total_consumed"] += len(messages)
                self.stats["failed_consumed"] += len(messages)
                return ConsumeResult.RECONSUME_LATER
                
        except Exception as e:
            print(f"顺序消费异常: {e}")
            self.stats["total_consumed"] += len(messages)
            self.stats["failed_consumed"] += len(messages)
            return ConsumeResult.RECONSUME_LATER

# 业务顺序消息监听器
class BusinessOrderMessageListener(OrderMessageListener):
    """业务顺序消息监听器"""
    
    def __init__(self, business_handler: Callable[[List['MessageExt']], bool]):
        self.business_handler = business_handler
        self.processed_orders = {}  # 已处理的订单
        self.lock = threading.Lock()
    
    def consume_order_message(self, messages: List['MessageExt'], 
                            context: ConsumeOrderlyContext) -> ConsumeOrderlyResult:
        """消费顺序消息"""
        try:
            # 检查消息顺序
            if not self._check_message_order(messages):
                print("消息顺序检查失败,暂停队列")
                context.set_suspend_current_queue_time_millis(5000)
                return ConsumeOrderlyResult.SUSPEND_CURRENT_QUEUE_A_MOMENT
            
            # 处理业务逻辑
            success = self.business_handler(messages)
            
            if success:
                # 记录已处理的订单
                self._record_processed_orders(messages)
                return ConsumeOrderlyResult.SUCCESS
            else:
                print("业务处理失败,暂停队列")
                return ConsumeOrderlyResult.SUSPEND_CURRENT_QUEUE_A_MOMENT
                
        except Exception as e:
            print(f"顺序消费异常: {e}")
            return ConsumeOrderlyResult.SUSPEND_CURRENT_QUEUE_A_MOMENT
    
    def _check_message_order(self, messages: List['MessageExt']) -> bool:
        """检查消息顺序"""
        for msg in messages:
            order_id = msg.get_property("ORDER_ID")
            if not order_id:
                continue
            
            # 检查是否按顺序到达
            with self.lock:
                last_offset = self.processed_orders.get(order_id, -1)
                if msg.queue_offset <= last_offset:
                    return False
        
        return True
    
    def _record_processed_orders(self, messages: List['MessageExt']):
        """记录已处理的订单"""
        with self.lock:
            for msg in messages:
                order_id = msg.get_property("ORDER_ID")
                if order_id:
                    self.processed_orders[order_id] = msg.queue_offset

5.2 事务消息

5.2.1 事务消息概念

# 事务消息状态
class LocalTransactionState(Enum):
    COMMIT_MESSAGE = "COMMIT_MESSAGE"      # 提交事务
    ROLLBACK_MESSAGE = "ROLLBACK_MESSAGE"  # 回滚事务
    UNKNOWN = "UNKNOWN"                    # 未知状态

# 事务消息
@dataclass
class TransactionMessage:
    topic: str
    tags: str
    keys: str
    body: bytes
    transaction_id: str  # 事务ID
    properties: Dict[str, str] = None
    
    def __post_init__(self):
        if self.properties is None:
            self.properties = {}
        
        # 设置事务标识
        self.properties["TRANSACTION_ID"] = self.transaction_id
        self.properties["TRAN_MSG"] = "true"

# 事务监听器接口
class TransactionListener(ABC):
    """事务监听器接口"""
    
    @abstractmethod
    def execute_local_transaction(self, message: TransactionMessage, 
                                arg: object) -> LocalTransactionState:
        """执行本地事务"""
        pass
    
    @abstractmethod
    def check_local_transaction(self, message: TransactionMessage) -> LocalTransactionState:
        """检查本地事务状态"""
        pass

# 事务消息生产者
class TransactionMQProducer:
    """事务消息生产者"""
    
    def __init__(self, producer_config: 'ProducerConfig'):
        self.config = producer_config
        self.producer = None
        self.transaction_listener: Optional[TransactionListener] = None
        self.transaction_check_executor = None
        self.transaction_states = {}  # 事务状态缓存
        self.transaction_stats = {
            "total_transactions": 0,
            "committed_transactions": 0,
            "rollback_transactions": 0,
            "unknown_transactions": 0
        }
        self.lock = threading.Lock()
    
    def start(self):
        """启动事务消息生产者"""
        if not self.transaction_listener:
            raise Exception("未设置事务监听器")
        
        # 初始化底层生产者
        from rocketmq_producer import DefaultMQProducer
        self.producer = DefaultMQProducer(self.config)
        
        # 启动事务检查线程池
        from concurrent.futures import ThreadPoolExecutor
        self.transaction_check_executor = ThreadPoolExecutor(
            max_workers=5,
            thread_name_prefix="TransactionCheck"
        )
        
        self.producer.start()
        print("事务消息生产者启动成功")
    
    def shutdown(self):
        """关闭事务消息生产者"""
        if self.transaction_check_executor:
            self.transaction_check_executor.shutdown(wait=True)
        
        if self.producer:
            self.producer.shutdown()
        
        print("事务消息生产者关闭成功")
    
    def set_transaction_listener(self, listener: TransactionListener):
        """设置事务监听器"""
        self.transaction_listener = listener
    
    def send_message_in_transaction(self, message: TransactionMessage, 
                                  arg: object = None) -> 'TransactionSendResult':
        """发送事务消息"""
        try:
            # 1. 发送半消息(Prepare消息)
            prepare_result = self._send_prepare_message(message)
            
            # 2. 执行本地事务
            local_state = self.transaction_listener.execute_local_transaction(message, arg)
            
            # 3. 根据本地事务结果提交或回滚
            end_result = self._end_transaction(message, prepare_result, local_state)
            
            # 4. 更新统计
            self._update_transaction_stats(local_state)
            
            return TransactionSendResult(
                send_result=prepare_result,
                local_transaction_state=local_state,
                end_result=end_result
            )
            
        except Exception as e:
            # 异常时回滚事务
            if 'prepare_result' in locals():
                self._end_transaction(message, prepare_result, LocalTransactionState.ROLLBACK_MESSAGE)
            
            self._update_transaction_stats(LocalTransactionState.ROLLBACK_MESSAGE)
            raise Exception(f"发送事务消息失败: {e}")
    
    def _send_prepare_message(self, message: TransactionMessage) -> 'SendResult':
        """发送准备消息"""
        # 转换为普通消息
        normal_message = self._convert_to_message(message)
        
        # 标记为准备消息
        normal_message.put_property("PREPARE_MESSAGE", "true")
        
        # 发送到Broker
        result = self.producer.send(normal_message)
        
        # 缓存事务状态
        with self.lock:
            self.transaction_states[message.transaction_id] = {
                "message": message,
                "prepare_result": result,
                "state": LocalTransactionState.UNKNOWN,
                "create_time": int(time.time() * 1000)
            }
        
        return result
    
    def _end_transaction(self, message: TransactionMessage, 
                        prepare_result: 'SendResult', 
                        state: LocalTransactionState) -> bool:
        """结束事务"""
        try:
            # 发送事务结束请求到Broker
            end_request = TransactionEndRequest(
                transaction_id=message.transaction_id,
                msg_id=prepare_result.msg_id,
                transaction_state=state
            )
            
            # 这里应该发送到Broker
            success = self._send_end_transaction_request(end_request)
            
            # 更新缓存状态
            with self.lock:
                if message.transaction_id in self.transaction_states:
                    self.transaction_states[message.transaction_id]["state"] = state
            
            return success
            
        except Exception as e:
            print(f"结束事务失败: {e}")
            return False
    
    def _send_end_transaction_request(self, request: 'TransactionEndRequest') -> bool:
        """发送事务结束请求"""
        # 模拟发送到Broker
        print(f"发送事务结束请求: {request.transaction_id}, 状态: {request.transaction_state}")
        return True
    
    def check_transaction_state(self, transaction_id: str) -> LocalTransactionState:
        """检查事务状态(Broker回查)"""
        try:
            with self.lock:
                transaction_info = self.transaction_states.get(transaction_id)
            
            if not transaction_info:
                return LocalTransactionState.UNKNOWN
            
            # 调用监听器检查本地事务状态
            message = transaction_info["message"]
            state = self.transaction_listener.check_local_transaction(message)
            
            # 更新缓存状态
            with self.lock:
                transaction_info["state"] = state
            
            return state
            
        except Exception as e:
            print(f"检查事务状态异常: {e}")
            return LocalTransactionState.UNKNOWN
    
    def _convert_to_message(self, transaction_message: TransactionMessage) -> 'Message':
        """转换为普通消息"""
        from rocketmq_producer import Message
        
        message = Message(
            topic=transaction_message.topic,
            tags=transaction_message.tags,
            keys=transaction_message.keys,
            body=transaction_message.body
        )
        
        # 复制属性
        for key, value in transaction_message.properties.items():
            message.put_property(key, value)
        
        return message
    
    def _update_transaction_stats(self, state: LocalTransactionState):
        """更新事务统计"""
        with self.lock:
            self.transaction_stats["total_transactions"] += 1
            
            if state == LocalTransactionState.COMMIT_MESSAGE:
                self.transaction_stats["committed_transactions"] += 1
            elif state == LocalTransactionState.ROLLBACK_MESSAGE:
                self.transaction_stats["rollback_transactions"] += 1
            else:
                self.transaction_stats["unknown_transactions"] += 1
    
    def get_transaction_stats(self) -> Dict[str, int]:
        """获取事务统计"""
        with self.lock:
            return self.transaction_stats.copy()
    
    def cleanup_expired_transactions(self, expire_time_ms: int = 300000):
        """清理过期事务"""
        current_time = int(time.time() * 1000)
        expired_transactions = []
        
        with self.lock:
            for transaction_id, info in self.transaction_states.items():
                if current_time - info["create_time"] > expire_time_ms:
                    expired_transactions.append(transaction_id)
            
            for transaction_id in expired_transactions:
                del self.transaction_states[transaction_id]
        
        print(f"清理过期事务: {len(expired_transactions)} 个")

# 事务发送结果
@dataclass
class TransactionSendResult:
    send_result: 'SendResult'
    local_transaction_state: LocalTransactionState
    end_result: bool

# 事务结束请求
@dataclass
class TransactionEndRequest:
    transaction_id: str
    msg_id: str
    transaction_state: LocalTransactionState

5.2.2 事务监听器实现

# 数据库事务监听器
class DatabaseTransactionListener(TransactionListener):
    """数据库事务监听器"""
    
    def __init__(self, db_connection):
        self.db_connection = db_connection
        self.transaction_records = {}  # 事务记录
        self.lock = threading.Lock()
    
    def execute_local_transaction(self, message: TransactionMessage, 
                                arg: object) -> LocalTransactionState:
        """执行本地数据库事务"""
        transaction_id = message.transaction_id
        
        try:
            # 开始数据库事务
            self.db_connection.begin()
            
            # 执行业务逻辑
            success = self._execute_business_logic(message, arg)
            
            if success:
                # 提交数据库事务
                self.db_connection.commit()
                
                # 记录事务状态
                with self.lock:
                    self.transaction_records[transaction_id] = {
                        "state": LocalTransactionState.COMMIT_MESSAGE,
                        "timestamp": int(time.time() * 1000),
                        "message": message
                    }
                
                return LocalTransactionState.COMMIT_MESSAGE
            else:
                # 回滚数据库事务
                self.db_connection.rollback()
                
                with self.lock:
                    self.transaction_records[transaction_id] = {
                        "state": LocalTransactionState.ROLLBACK_MESSAGE,
                        "timestamp": int(time.time() * 1000),
                        "message": message
                    }
                
                return LocalTransactionState.ROLLBACK_MESSAGE
                
        except Exception as e:
            print(f"执行本地事务异常: {e}")
            
            # 回滚数据库事务
            try:
                self.db_connection.rollback()
            except:
                pass
            
            with self.lock:
                self.transaction_records[transaction_id] = {
                    "state": LocalTransactionState.ROLLBACK_MESSAGE,
                    "timestamp": int(time.time() * 1000),
                    "message": message,
                    "error": str(e)
                }
            
            return LocalTransactionState.ROLLBACK_MESSAGE
    
    def check_local_transaction(self, message: TransactionMessage) -> LocalTransactionState:
        """检查本地事务状态"""
        transaction_id = message.transaction_id
        
        # 从缓存中查找
        with self.lock:
            record = self.transaction_records.get(transaction_id)
        
        if record:
            return record["state"]
        
        # 从数据库中查找事务记录
        try:
            state = self._query_transaction_state_from_db(transaction_id)
            return state
        except Exception as e:
            print(f"查询事务状态异常: {e}")
            return LocalTransactionState.UNKNOWN
    
    def _execute_business_logic(self, message: TransactionMessage, arg: object) -> bool:
        """执行业务逻辑"""
        # 这里实现具体的业务逻辑
        # 例如:更新订单状态、扣减库存等
        
        try:
            # 模拟业务操作
            business_data = message.body.decode('utf-8')
            print(f"执行业务逻辑: {business_data}")
            
            # 执行SQL操作
            cursor = self.db_connection.cursor()
            cursor.execute(
                "INSERT INTO transaction_log (transaction_id, message_data, create_time) VALUES (?, ?, ?)",
                (message.transaction_id, business_data, int(time.time() * 1000))
            )
            
            return True
            
        except Exception as e:
            print(f"业务逻辑执行失败: {e}")
            return False
    
    def _query_transaction_state_from_db(self, transaction_id: str) -> LocalTransactionState:
        """从数据库查询事务状态"""
        try:
            cursor = self.db_connection.cursor()
            cursor.execute(
                "SELECT state FROM transaction_records WHERE transaction_id = ?",
                (transaction_id,)
            )
            
            result = cursor.fetchone()
            if result:
                state_str = result[0]
                return LocalTransactionState(state_str)
            else:
                return LocalTransactionState.UNKNOWN
                
        except Exception as e:
            print(f"查询数据库事务状态失败: {e}")
            return LocalTransactionState.UNKNOWN

# 业务事务监听器
class BusinessTransactionListener(TransactionListener):
    """业务事务监听器"""
    
    def __init__(self, business_executor: Callable[[TransactionMessage, object], bool]):
        self.business_executor = business_executor
        self.transaction_cache = {}  # 事务缓存
        self.lock = threading.Lock()
    
    def execute_local_transaction(self, message: TransactionMessage, 
                                arg: object) -> LocalTransactionState:
        """执行本地业务事务"""
        transaction_id = message.transaction_id
        
        try:
            # 执行业务逻辑
            success = self.business_executor(message, arg)
            
            state = LocalTransactionState.COMMIT_MESSAGE if success else LocalTransactionState.ROLLBACK_MESSAGE
            
            # 缓存事务状态
            with self.lock:
                self.transaction_cache[transaction_id] = {
                    "state": state,
                    "timestamp": int(time.time() * 1000),
                    "message": message
                }
            
            return state
            
        except Exception as e:
            print(f"执行业务事务异常: {e}")
            
            with self.lock:
                self.transaction_cache[transaction_id] = {
                    "state": LocalTransactionState.ROLLBACK_MESSAGE,
                    "timestamp": int(time.time() * 1000),
                    "message": message,
                    "error": str(e)
                }
            
            return LocalTransactionState.ROLLBACK_MESSAGE
    
    def check_local_transaction(self, message: TransactionMessage) -> LocalTransactionState:
        """检查本地事务状态"""
        transaction_id = message.transaction_id
        
        with self.lock:
            cache_record = self.transaction_cache.get(transaction_id)
        
        if cache_record:
            return cache_record["state"]
        
        # 如果缓存中没有,返回未知状态
        return LocalTransactionState.UNKNOWN
    
    def get_transaction_records(self) -> Dict[str, Dict]:
        """获取事务记录"""
        with self.lock:
            return self.transaction_cache.copy()
    
    def cleanup_expired_records(self, expire_time_ms: int = 600000):
        """清理过期记录"""
        current_time = int(time.time() * 1000)
        expired_keys = []
        
        with self.lock:
            for transaction_id, record in self.transaction_cache.items():
                if current_time - record["timestamp"] > expire_time_ms:
                    expired_keys.append(transaction_id)
            
            for key in expired_keys:
                del self.transaction_cache[key]
        
        print(f"清理过期事务记录: {len(expired_keys)} 个")

5.3 延时消息

5.3.1 延时消息实现

# 延时级别枚举
class DelayLevel(Enum):
    LEVEL_1 = 1   # 1s
    LEVEL_2 = 2   # 5s
    LEVEL_3 = 3   # 10s
    LEVEL_4 = 4   # 30s
    LEVEL_5 = 5   # 1m
    LEVEL_6 = 6   # 2m
    LEVEL_7 = 7   # 3m
    LEVEL_8 = 8   # 4m
    LEVEL_9 = 9   # 5m
    LEVEL_10 = 10 # 6m
    LEVEL_11 = 11 # 7m
    LEVEL_12 = 12 # 8m
    LEVEL_13 = 13 # 9m
    LEVEL_14 = 14 # 10m
    LEVEL_15 = 15 # 20m
    LEVEL_16 = 16 # 30m
    LEVEL_17 = 17 # 1h
    LEVEL_18 = 18 # 2h

# 延时消息
@dataclass
class DelayMessage:
    topic: str
    tags: str
    keys: str
    body: bytes
    delay_level: DelayLevel
    properties: Dict[str, str] = None
    
    def __post_init__(self):
        if self.properties is None:
            self.properties = {}
        
        # 设置延时级别
        self.properties["DELAY"] = str(self.delay_level.value)
    
    def get_delay_time_ms(self) -> int:
        """获取延时时间(毫秒)"""
        delay_times = {
            DelayLevel.LEVEL_1: 1000,
            DelayLevel.LEVEL_2: 5000,
            DelayLevel.LEVEL_3: 10000,
            DelayLevel.LEVEL_4: 30000,
            DelayLevel.LEVEL_5: 60000,
            DelayLevel.LEVEL_6: 120000,
            DelayLevel.LEVEL_7: 180000,
            DelayLevel.LEVEL_8: 240000,
            DelayLevel.LEVEL_9: 300000,
            DelayLevel.LEVEL_10: 360000,
            DelayLevel.LEVEL_11: 420000,
            DelayLevel.LEVEL_12: 480000,
            DelayLevel.LEVEL_13: 540000,
            DelayLevel.LEVEL_14: 600000,
            DelayLevel.LEVEL_15: 1200000,
            DelayLevel.LEVEL_16: 1800000,
            DelayLevel.LEVEL_17: 3600000,
            DelayLevel.LEVEL_18: 7200000,
        }
        return delay_times.get(self.delay_level, 1000)

# 延时消息生产者
class DelayMessageProducer:
    """延时消息生产者"""
    
    def __init__(self, producer_config: 'ProducerConfig'):
        self.config = producer_config
        self.producer = None
        self.delay_stats = {
            "total_sent": 0,
            "success_sent": 0,
            "failed_sent": 0,
            "delay_levels": {level.value: 0 for level in DelayLevel}
        }
    
    def start(self):
        """启动延时消息生产者"""
        from rocketmq_producer import DefaultMQProducer
        self.producer = DefaultMQProducer(self.config)
        self.producer.start()
        print("延时消息生产者启动成功")
    
    def shutdown(self):
        """关闭延时消息生产者"""
        if self.producer:
            self.producer.shutdown()
        print("延时消息生产者关闭成功")
    
    def send_delay_message(self, message: DelayMessage) -> 'SendResult':
        """发送延时消息"""
        try:
            # 转换为普通消息
            normal_message = self._convert_to_message(message)
            
            # 发送消息
            result = self.producer.send(normal_message)
            
            # 更新统计
            self.delay_stats["total_sent"] += 1
            self.delay_stats["success_sent"] += 1
            self.delay_stats["delay_levels"][message.delay_level.value] += 1
            
            print(f"发送延时消息成功: {result.msg_id}, 延时级别: {message.delay_level}")
            return result
            
        except Exception as e:
            self.delay_stats["total_sent"] += 1
            self.delay_stats["failed_sent"] += 1
            raise Exception(f"发送延时消息失败: {e}")
    
    def send_delay_message_async(self, message: DelayMessage, 
                               callback: 'SendCallback'):
        """异步发送延时消息"""
        try:
            normal_message = self._convert_to_message(message)
            
            # 包装回调
            wrapped_callback = DelayMessageSendCallback(
                callback, self.delay_stats, message.delay_level
            )
            
            self.producer.send_async(normal_message, wrapped_callback)
            
        except Exception as e:
            self.delay_stats["total_sent"] += 1
            self.delay_stats["failed_sent"] += 1
            callback.on_exception(e)
    
    def send_delay_message_at_time(self, message: DelayMessage, 
                                 deliver_time: int) -> 'SendResult':
        """在指定时间发送延时消息"""
        current_time = int(time.time() * 1000)
        delay_time = deliver_time - current_time
        
        if delay_time <= 0:
            # 立即发送
            normal_message = self._convert_to_message_without_delay(message)
            return self.producer.send(normal_message)
        
        # 计算最接近的延时级别
        delay_level = self._calculate_delay_level(delay_time)
        message.delay_level = delay_level
        
        return self.send_delay_message(message)
    
    def _convert_to_message(self, delay_message: DelayMessage) -> 'Message':
        """转换为普通消息"""
        from rocketmq_producer import Message
        
        message = Message(
            topic=delay_message.topic,
            tags=delay_message.tags,
            keys=delay_message.keys,
            body=delay_message.body
        )
        
        # 设置延时级别
        message.put_property("DELAY", str(delay_message.delay_level.value))
        
        # 复制其他属性
        for key, value in delay_message.properties.items():
            if key != "DELAY":
                message.put_property(key, value)
        
        return message
    
    def _convert_to_message_without_delay(self, delay_message: DelayMessage) -> 'Message':
        """转换为普通消息(不设置延时)"""
        from rocketmq_producer import Message
        
        message = Message(
            topic=delay_message.topic,
            tags=delay_message.tags,
            keys=delay_message.keys,
            body=delay_message.body
        )
        
        # 复制属性(排除延时属性)
        for key, value in delay_message.properties.items():
            if key != "DELAY":
                message.put_property(key, value)
        
        return message
    
    def _calculate_delay_level(self, delay_time_ms: int) -> DelayLevel:
        """计算最接近的延时级别"""
        delay_times = [
            (DelayLevel.LEVEL_1, 1000),
            (DelayLevel.LEVEL_2, 5000),
            (DelayLevel.LEVEL_3, 10000),
            (DelayLevel.LEVEL_4, 30000),
            (DelayLevel.LEVEL_5, 60000),
            (DelayLevel.LEVEL_6, 120000),
            (DelayLevel.LEVEL_7, 180000),
            (DelayLevel.LEVEL_8, 240000),
            (DelayLevel.LEVEL_9, 300000),
            (DelayLevel.LEVEL_10, 360000),
            (DelayLevel.LEVEL_11, 420000),
            (DelayLevel.LEVEL_12, 480000),
            (DelayLevel.LEVEL_13, 540000),
            (DelayLevel.LEVEL_14, 600000),
            (DelayLevel.LEVEL_15, 1200000),
            (DelayLevel.LEVEL_16, 1800000),
            (DelayLevel.LEVEL_17, 3600000),
            (DelayLevel.LEVEL_18, 7200000),
        ]
        
        # 找到最接近的延时级别
        best_level = DelayLevel.LEVEL_1
        min_diff = abs(delay_time_ms - 1000)
        
        for level, time_ms in delay_times:
            diff = abs(delay_time_ms - time_ms)
            if diff < min_diff:
                min_diff = diff
                best_level = level
        
        return best_level
    
    def get_delay_stats(self) -> Dict:
        """获取延时消息统计"""
        return self.delay_stats.copy()

# 延时消息发送回调
class DelayMessageSendCallback:
    """延时消息发送回调"""
    
    def __init__(self, callback: 'SendCallback', stats: Dict, delay_level: DelayLevel):
        self.callback = callback
        self.stats = stats
        self.delay_level = delay_level
    
    def on_success(self, result: 'SendResult'):
        """发送成功回调"""
        self.stats["total_sent"] += 1
        self.stats["success_sent"] += 1
        self.stats["delay_levels"][self.delay_level.value] += 1
        self.callback.on_success(result)
    
    def on_exception(self, exception: Exception):
        """发送异常回调"""
        self.stats["total_sent"] += 1
        self.stats["failed_sent"] += 1
        self.callback.on_exception(exception)

5.3.2 延时消息使用示例

# 延时消息使用示例
class DelayMessageExample:
    """延时消息使用示例"""
    
    def __init__(self):
        # 配置生产者
        self.producer_config = ProducerConfig(
            producer_group="delay_producer_group",
            name_server_addr="localhost:9876"
        )
        
        # 创建延时消息生产者
        self.producer = DelayMessageProducer(self.producer_config)
    
    def start_example(self):
        """启动示例"""
        try:
            # 启动生产者
            self.producer.start()
            
            # 发送不同延时级别的消息
            self._send_various_delay_messages()
            
            # 发送定时消息
            self._send_scheduled_messages()
            
            # 打印统计信息
            self._print_statistics()
            
        finally:
            self.producer.shutdown()
    
    def _send_various_delay_messages(self):
        """发送不同延时级别的消息"""
        delay_levels = [
            DelayLevel.LEVEL_1,   # 1秒
            DelayLevel.LEVEL_3,   # 10秒
            DelayLevel.LEVEL_5,   # 1分钟
            DelayLevel.LEVEL_15,  # 20分钟
            DelayLevel.LEVEL_17   # 1小时
        ]
        
        for i, delay_level in enumerate(delay_levels):
            message = DelayMessage(
                topic="DelayTopic",
                tags="DelayTag",
                keys=f"delay_key_{i}",
                body=f"延时消息内容 {i}, 延时级别: {delay_level.name}".encode('utf-8'),
                delay_level=delay_level
            )
            
            try:
                result = self.producer.send_delay_message(message)
                print(f"发送延时消息成功: {result.msg_id}")
            except Exception as e:
                print(f"发送延时消息失败: {e}")
    
    def _send_scheduled_messages(self):
        """发送定时消息"""
        # 5分钟后发送
        deliver_time = int(time.time() * 1000) + 5 * 60 * 1000
        
        message = DelayMessage(
            topic="DelayTopic",
            tags="ScheduledTag",
            keys="scheduled_key",
            body="定时消息内容".encode('utf-8'),
            delay_level=DelayLevel.LEVEL_1  # 会被重新计算
        )
        
        try:
            result = self.producer.send_delay_message_at_time(message, deliver_time)
            print(f"发送定时消息成功: {result.msg_id}")
        except Exception as e:
            print(f"发送定时消息失败: {e}")
    
    def _print_statistics(self):
        """打印统计信息"""
        stats = self.producer.get_delay_stats()
        print("\n延时消息统计:")
        print(f"总发送数: {stats['total_sent']}")
        print(f"成功发送数: {stats['success_sent']}")
        print(f"失败发送数: {stats['failed_sent']}")
        print("\n各延时级别统计:")
        for level, count in stats['delay_levels'].items():
            if count > 0:
                print(f"  级别 {level}: {count} 条")

# 延时消息消费者示例
class DelayMessageConsumerExample:
    """延时消息消费者示例"""
    
    def __init__(self):
        # 配置消费者
        self.consumer_config = ConsumerConfig(
            consumer_group="delay_consumer_group",
            name_server_addr="localhost:9876"
        )
        
        # 创建消费者
        from rocketmq_consumer import DefaultMQPushConsumer
        self.consumer = DefaultMQPushConsumer(self.consumer_config)
    
    def start_consume(self):
        """开始消费延时消息"""
        try:
            # 注册消息监听器
            listener = DelayMessageListener()
            self.consumer.register_message_listener(listener)
            
            # 订阅主题
            self.consumer.subscribe("DelayTopic", "*")
            
            # 启动消费者
            self.consumer.start()
            
            print("延时消息消费者启动成功,等待消息...")
            
            # 等待消息
            while True:
                time.sleep(10)
                stats = listener.get_consume_stats()
                print(f"消费统计: {stats}")
                
        except KeyboardInterrupt:
            print("收到中断信号,正在关闭消费者...")
        finally:
            self.consumer.shutdown()

# 延时消息监听器
class DelayMessageListener(MessageListenerConcurrently):
    """延时消息监听器"""
    
    def __init__(self):
        self.consume_stats = {
            "total_consumed": 0,
            "success_consumed": 0,
            "failed_consumed": 0,
            "delay_levels": {}
        }
    
    def consume_message(self, messages: List['MessageExt'], 
                      context: 'ConsumeContext') -> 'ConsumeResult':
        """消费延时消息"""
        for msg in messages:
            try:
                # 获取延时级别
                delay_level = msg.get_property("DELAY")
                
                # 计算实际延时时间
                actual_delay = self._calculate_actual_delay(msg)
                
                print(f"收到延时消息:")
                print(f"  消息ID: {msg.msg_id}")
                print(f"  延时级别: {delay_level}")
                print(f"  实际延时: {actual_delay}ms")
                print(f"  消息内容: {msg.body.decode('utf-8')}")
                
                # 更新统计
                self.consume_stats["total_consumed"] += 1
                self.consume_stats["success_consumed"] += 1
                
                if delay_level:
                    level_stats = self.consume_stats["delay_levels"]
                    level_stats[delay_level] = level_stats.get(delay_level, 0) + 1
                
            except Exception as e:
                print(f"处理延时消息异常: {e}")
                self.consume_stats["total_consumed"] += 1
                self.consume_stats["failed_consumed"] += 1
                return ConsumeResult.RECONSUME_LATER
        
        return ConsumeResult.SUCCESS
    
    def _calculate_actual_delay(self, msg: 'MessageExt') -> int:
        """计算实际延时时间"""
        try:
            # 消息存储时间 - 消息产生时间
            return msg.store_timestamp - msg.born_timestamp
        except:
            return 0
    
    def get_consume_stats(self) -> Dict:
        """获取消费统计"""
        return self.consume_stats.copy()

# 使用示例
if __name__ == "__main__":
    # 发送延时消息示例
    producer_example = DelayMessageExample()
    producer_example.start_example()
    
    # 消费延时消息示例
    # consumer_example = DelayMessageConsumerExample()
    # consumer_example.start_consume()

5.4 批量消息

5.4.1 批量消息实现

# 批量消息
@dataclass
class BatchMessage:
    topic: str
    messages: List['Message']
    
    def __post_init__(self):
        if not self.messages:
            raise ValueError("批量消息不能为空")
        
        # 验证所有消息的主题一致
        for msg in self.messages:
            if msg.topic != self.topic:
                raise ValueError(f"消息主题不一致: {msg.topic} != {self.topic}")
    
    def get_total_size(self) -> int:
        """获取批量消息总大小"""
        total_size = 0
        for msg in self.messages:
            total_size += len(msg.body)
            total_size += len(msg.topic.encode('utf-8'))
            total_size += len(msg.tags.encode('utf-8')) if msg.tags else 0
            total_size += len(msg.keys.encode('utf-8')) if msg.keys else 0
            
            # 属性大小
            for key, value in msg.properties.items():
                total_size += len(key.encode('utf-8'))
                total_size += len(value.encode('utf-8'))
        
        return total_size
    
    def get_message_count(self) -> int:
        """获取消息数量"""
        return len(self.messages)

# 批量消息生产者
class BatchMessageProducer:
    """批量消息生产者"""
    
    def __init__(self, producer_config: 'ProducerConfig'):
        self.config = producer_config
        self.producer = None
        self.max_batch_size = 1024 * 1024  # 1MB
        self.max_batch_count = 1000
        self.batch_stats = {
            "total_batches": 0,
            "total_messages": 0,
            "success_batches": 0,
            "failed_batches": 0,
            "avg_batch_size": 0
        }
        self.batch_sizes = []
    
    def start(self):
        """启动批量消息生产者"""
        from rocketmq_producer import DefaultMQProducer
        self.producer = DefaultMQProducer(self.config)
        self.producer.start()
        print("批量消息生产者启动成功")
    
    def shutdown(self):
        """关闭批量消息生产者"""
        if self.producer:
            self.producer.shutdown()
        print("批量消息生产者关闭成功")
    
    def send_batch(self, batch_message: BatchMessage) -> 'SendResult':
        """发送批量消息"""
        try:
            # 验证批量消息
            self._validate_batch_message(batch_message)
            
            # 如果批量过大,分割发送
            if self._need_split_batch(batch_message):
                return self._send_split_batch(batch_message)
            
            # 发送单个批量
            result = self._send_single_batch(batch_message)
            
            # 更新统计
            self._update_batch_stats(batch_message, True)
            
            return result
            
        except Exception as e:
            self._update_batch_stats(batch_message, False)
            raise Exception(f"发送批量消息失败: {e}")
    
    def send_batch_async(self, batch_message: BatchMessage, 
                        callback: 'BatchSendCallback'):
        """异步发送批量消息"""
        try:
            self._validate_batch_message(batch_message)
            
            if self._need_split_batch(batch_message):
                self._send_split_batch_async(batch_message, callback)
            else:
                self._send_single_batch_async(batch_message, callback)
                
        except Exception as e:
            self._update_batch_stats(batch_message, False)
            callback.on_exception(e)
    
    def _validate_batch_message(self, batch_message: BatchMessage):
        """验证批量消息"""
        if not batch_message.messages:
            raise ValueError("批量消息不能为空")
        
        if len(batch_message.messages) > self.max_batch_count:
            raise ValueError(f"批量消息数量超限: {len(batch_message.messages)} > {self.max_batch_count}")
        
        # 检查消息是否包含延时或事务属性
        for msg in batch_message.messages:
            if msg.get_property("DELAY") or msg.get_property("TRAN_MSG"):
                raise ValueError("批量消息不支持延时或事务消息")
    
    def _need_split_batch(self, batch_message: BatchMessage) -> bool:
        """判断是否需要分割批量"""
        return batch_message.get_total_size() > self.max_batch_size
    
    def _send_single_batch(self, batch_message: BatchMessage) -> 'SendResult':
        """发送单个批量消息"""
        # 将批量消息编码为单个消息
        encoded_message = self._encode_batch_message(batch_message)
        
        # 发送到Broker
        result = self.producer.send(encoded_message)
        
        print(f"发送批量消息成功: {result.msg_id}, 消息数量: {batch_message.get_message_count()}")
        return result
    
    def _send_single_batch_async(self, batch_message: BatchMessage, 
                               callback: 'BatchSendCallback'):
        """异步发送单个批量消息"""
        encoded_message = self._encode_batch_message(batch_message)
        
        # 包装回调
        wrapped_callback = BatchSendCallbackWrapper(
            callback, self.batch_stats, batch_message
        )
        
        self.producer.send_async(encoded_message, wrapped_callback)
    
    def _send_split_batch(self, batch_message: BatchMessage) -> 'SendResult':
        """分割发送批量消息"""
        split_batches = self._split_batch_message(batch_message)
        results = []
        
        for split_batch in split_batches:
            result = self._send_single_batch(split_batch)
            results.append(result)
        
        # 返回第一个结果(可以考虑返回聚合结果)
        return results[0] if results else None
    
    def _send_split_batch_async(self, batch_message: BatchMessage, 
                              callback: 'BatchSendCallback'):
        """异步分割发送批量消息"""
        split_batches = self._split_batch_message(batch_message)
        
        # 创建聚合回调
        aggregated_callback = AggregatedBatchSendCallback(
            callback, len(split_batches)
        )
        
        for split_batch in split_batches:
            self._send_single_batch_async(split_batch, aggregated_callback)
    
    def _split_batch_message(self, batch_message: BatchMessage) -> List[BatchMessage]:
        """分割批量消息"""
        split_batches = []
        current_batch = []
        current_size = 0
        
        for msg in batch_message.messages:
            msg_size = self._calculate_message_size(msg)
            
            if (current_size + msg_size > self.max_batch_size or 
                len(current_batch) >= self.max_batch_count) and current_batch:
                # 创建新的批量
                split_batches.append(BatchMessage(
                    topic=batch_message.topic,
                    messages=current_batch.copy()
                ))
                current_batch = []
                current_size = 0
            
            current_batch.append(msg)
            current_size += msg_size
        
        # 添加最后一个批量
        if current_batch:
            split_batches.append(BatchMessage(
                topic=batch_message.topic,
                messages=current_batch
            ))
        
        return split_batches
    
    def _encode_batch_message(self, batch_message: BatchMessage) -> 'Message':
        """编码批量消息"""
        from rocketmq_producer import Message
        import json
        
        # 将批量消息序列化
        batch_data = {
            "topic": batch_message.topic,
            "messages": []
        }
        
        for msg in batch_message.messages:
            msg_data = {
                "tags": msg.tags,
                "keys": msg.keys,
                "body": msg.body.decode('utf-8') if isinstance(msg.body, bytes) else msg.body,
                "properties": msg.properties
            }
            batch_data["messages"].append(msg_data)
        
        # 创建编码后的消息
        encoded_message = Message(
            topic=batch_message.topic,
            tags="BATCH",
            keys=f"batch_{int(time.time() * 1000)}",
            body=json.dumps(batch_data).encode('utf-8')
        )
        
        # 设置批量标识
        encoded_message.put_property("BATCH_MESSAGE", "true")
        encoded_message.put_property("BATCH_SIZE", str(batch_message.get_message_count()))
        
        return encoded_message
    
    def _calculate_message_size(self, message: 'Message') -> int:
        """计算单个消息大小"""
        size = len(message.body)
        size += len(message.topic.encode('utf-8'))
        size += len(message.tags.encode('utf-8')) if message.tags else 0
        size += len(message.keys.encode('utf-8')) if message.keys else 0
        
        for key, value in message.properties.items():
            size += len(key.encode('utf-8'))
            size += len(value.encode('utf-8'))
        
        return size
    
    def _update_batch_stats(self, batch_message: BatchMessage, success: bool):
        """更新批量统计"""
        self.batch_stats["total_batches"] += 1
        self.batch_stats["total_messages"] += batch_message.get_message_count()
        
        if success:
            self.batch_stats["success_batches"] += 1
        else:
            self.batch_stats["failed_batches"] += 1
        
        # 记录批量大小
        batch_size = batch_message.get_total_size()
        self.batch_sizes.append(batch_size)
        
        # 计算平均批量大小
        if self.batch_sizes:
            self.batch_stats["avg_batch_size"] = sum(self.batch_sizes) // len(self.batch_sizes)
    
    def get_batch_stats(self) -> Dict:
        """获取批量统计"""
        return self.batch_stats.copy()
    
    def set_max_batch_size(self, max_size: int):
        """设置最大批量大小"""
        self.max_batch_size = max_size
    
    def set_max_batch_count(self, max_count: int):
        """设置最大批量数量"""
        self.max_batch_count = max_count

# 批量发送回调接口
class BatchSendCallback(ABC):
    """批量发送回调接口"""
    
    @abstractmethod
    def on_success(self, result: 'SendResult', batch_message: BatchMessage):
        """批量发送成功回调"""
        pass
    
    @abstractmethod
    def on_exception(self, exception: Exception):
        """批量发送异常回调"""
        pass

# 批量发送回调包装器
class BatchSendCallbackWrapper:
    """批量发送回调包装器"""
    
    def __init__(self, callback: BatchSendCallback, stats: Dict, batch_message: BatchMessage):
        self.callback = callback
        self.stats = stats
        self.batch_message = batch_message
    
    def on_success(self, result: 'SendResult'):
        """发送成功回调"""
        # 更新统计
        self.stats["total_batches"] += 1
        self.stats["success_batches"] += 1
        self.stats["total_messages"] += self.batch_message.get_message_count()
        
        self.callback.on_success(result, self.batch_message)
    
    def on_exception(self, exception: Exception):
        """发送异常回调"""
        # 更新统计
        self.stats["total_batches"] += 1
        self.stats["failed_batches"] += 1
        self.stats["total_messages"] += self.batch_message.get_message_count()
        
        self.callback.on_exception(exception)

# 聚合批量发送回调
class AggregatedBatchSendCallback(BatchSendCallback):
    """聚合批量发送回调"""
    
    def __init__(self, original_callback: BatchSendCallback, total_batches: int):
        self.original_callback = original_callback
        self.total_batches = total_batches
        self.completed_batches = 0
        self.success_results = []
        self.exceptions = []
        self.lock = threading.Lock()
    
    def on_success(self, result: 'SendResult', batch_message: BatchMessage):
        """批量发送成功回调"""
        with self.lock:
            self.completed_batches += 1
            self.success_results.append((result, batch_message))
            
            if self.completed_batches == self.total_batches:
                # 所有批量都完成了
                if self.exceptions:
                    # 有异常发生
                    self.original_callback.on_exception(self.exceptions[0])
                else:
                    # 全部成功
                    first_result, first_batch = self.success_results[0]
                    self.original_callback.on_success(first_result, first_batch)
    
    def on_exception(self, exception: Exception):
        """批量发送异常回调"""
        with self.lock:
            self.completed_batches += 1
            self.exceptions.append(exception)
            
            if self.completed_batches == self.total_batches:
                 # 所有批量都完成了
                 self.original_callback.on_exception(self.exceptions[0])

### 5.4.2 批量消息消费

```python
# 批量消息消费者
class BatchMessageConsumer:
    """批量消息消费者"""
    
    def __init__(self, consumer_config: 'ConsumerConfig'):
        self.config = consumer_config
        self.consumer = None
        self.batch_listener: Optional['BatchMessageListener'] = None
        self.consume_stats = {
            "total_batches": 0,
            "total_messages": 0,
            "success_batches": 0,
            "failed_batches": 0
        }
    
    def start(self):
        """启动批量消息消费者"""
        if not self.batch_listener:
            raise Exception("未设置批量消息监听器")
        
        from rocketmq_consumer import DefaultMQPushConsumer
        self.consumer = DefaultMQPushConsumer(self.config)
        
        # 注册批量消息监听器
        wrapped_listener = BatchMessageListenerWrapper(
            self.batch_listener, self.consume_stats
        )
        self.consumer.register_message_listener(wrapped_listener)
        
        self.consumer.start()
        print("批量消息消费者启动成功")
    
    def shutdown(self):
        """关闭批量消息消费者"""
        if self.consumer:
            self.consumer.shutdown()
        print("批量消息消费者关闭成功")
    
    def subscribe(self, topic: str, sub_expression: str = "*"):
        """订阅主题"""
        if self.consumer:
            self.consumer.subscribe(topic, sub_expression)
    
    def register_batch_listener(self, listener: 'BatchMessageListener'):
        """注册批量消息监听器"""
        self.batch_listener = listener
    
    def get_consume_stats(self) -> Dict[str, int]:
        """获取消费统计"""
        return self.consume_stats.copy()

# 批量消息监听器接口
class BatchMessageListener(ABC):
    """批量消息监听器接口"""
    
    @abstractmethod
    def consume_batch_message(self, batch_messages: List['MessageExt']) -> 'ConsumeResult':
        """消费批量消息"""
        pass

# 批量消息监听器包装器
class BatchMessageListenerWrapper:
    """批量消息监听器包装器"""
    
    def __init__(self, batch_listener: BatchMessageListener, stats: Dict[str, int]):
        self.batch_listener = batch_listener
        self.stats = stats
    
    def consume_message(self, messages: List['MessageExt'], 
                      context: 'ConsumeContext') -> 'ConsumeResult':
        """消费消息(适配器方法)"""
        try:
            # 检查是否为批量消息
            batch_messages = []
            individual_messages = []
            
            for msg in messages:
                if msg.get_property("BATCH_MESSAGE") == "true":
                    # 解码批量消息
                    decoded_messages = self._decode_batch_message(msg)
                    batch_messages.extend(decoded_messages)
                else:
                    individual_messages.append(msg)
            
            # 处理批量消息
            if batch_messages:
                result = self.batch_listener.consume_batch_message(batch_messages)
                self._update_batch_stats(len(batch_messages), result == ConsumeResult.SUCCESS)
                return result
            
            # 处理单个消息
            if individual_messages:
                result = self.batch_listener.consume_batch_message(individual_messages)
                self._update_individual_stats(len(individual_messages), result == ConsumeResult.SUCCESS)
                return result
            
            return ConsumeResult.SUCCESS
            
        except Exception as e:
            print(f"批量消费异常: {e}")
            self._update_batch_stats(len(messages), False)
            return ConsumeResult.RECONSUME_LATER
    
    def _decode_batch_message(self, batch_msg: 'MessageExt') -> List['MessageExt']:
        """解码批量消息"""
        import json
        
        try:
            # 解析批量消息数据
            batch_data = json.loads(batch_msg.body.decode('utf-8'))
            decoded_messages = []
            
            for msg_data in batch_data["messages"]:
                # 创建消息扩展对象
                decoded_msg = MessageExt(
                    topic=batch_data["topic"],
                    tags=msg_data["tags"],
                    keys=msg_data["keys"],
                    body=msg_data["body"].encode('utf-8'),
                    msg_id=f"{batch_msg.msg_id}_{len(decoded_messages)}",
                    queue_id=batch_msg.queue_id,
                    queue_offset=batch_msg.queue_offset,
                    born_timestamp=batch_msg.born_timestamp,
                    store_timestamp=batch_msg.store_timestamp
                )
                
                # 设置属性
                for key, value in msg_data["properties"].items():
                    decoded_msg.put_property(key, value)
                
                decoded_messages.append(decoded_msg)
            
            return decoded_messages
            
        except Exception as e:
            print(f"解码批量消息失败: {e}")
            return []
    
    def _update_batch_stats(self, message_count: int, success: bool):
        """更新批量统计"""
        self.stats["total_batches"] += 1
        self.stats["total_messages"] += message_count
        
        if success:
            self.stats["success_batches"] += 1
        else:
            self.stats["failed_batches"] += 1
    
    def _update_individual_stats(self, message_count: int, success: bool):
        """更新单个消息统计"""
        self.stats["total_messages"] += message_count
        
        if success:
            self.stats["success_batches"] += 1
        else:
            self.stats["failed_batches"] += 1

# 业务批量消息监听器
class BusinessBatchMessageListener(BatchMessageListener):
    """业务批量消息监听器"""
    
    def __init__(self, batch_processor: Callable[[List['MessageExt']], bool]):
        self.batch_processor = batch_processor
        self.processed_count = 0
        self.failed_count = 0
    
    def consume_batch_message(self, batch_messages: List['MessageExt']) -> 'ConsumeResult':
        """消费批量消息"""
        try:
            print(f"收到批量消息: {len(batch_messages)} 条")
            
            # 处理批量消息
            success = self.batch_processor(batch_messages)
            
            if success:
                self.processed_count += len(batch_messages)
                print(f"批量消息处理成功: {len(batch_messages)} 条")
                return ConsumeResult.SUCCESS
            else:
                self.failed_count += len(batch_messages)
                print(f"批量消息处理失败: {len(batch_messages)} 条")
                return ConsumeResult.RECONSUME_LATER
                
        except Exception as e:
            print(f"批量消息处理异常: {e}")
            self.failed_count += len(batch_messages)
            return ConsumeResult.RECONSUME_LATER
    
    def get_process_stats(self) -> Dict[str, int]:
        """获取处理统计"""
        return {
            "processed_count": self.processed_count,
            "failed_count": self.failed_count,
            "total_count": self.processed_count + self.failed_count
        }

5.4.3 批量消息使用示例

# 批量消息使用示例
class BatchMessageExample:
    """批量消息使用示例"""
    
    def __init__(self):
        # 配置生产者
        self.producer_config = ProducerConfig(
            producer_group="batch_producer_group",
            name_server_addr="localhost:9876"
        )
        
        # 配置消费者
        self.consumer_config = ConsumerConfig(
            consumer_group="batch_consumer_group",
            name_server_addr="localhost:9876"
        )
        
        # 创建生产者和消费者
        self.producer = BatchMessageProducer(self.producer_config)
        self.consumer = BatchMessageConsumer(self.consumer_config)
    
    def start_producer_example(self):
        """启动生产者示例"""
        try:
            self.producer.start()
            
            # 发送批量消息
            self._send_batch_messages()
            
            # 发送大批量消息(会自动分割)
            self._send_large_batch_messages()
            
            # 异步发送批量消息
            self._send_batch_messages_async()
            
            # 打印统计信息
            self._print_producer_stats()
            
        finally:
            self.producer.shutdown()
    
    def start_consumer_example(self):
        """启动消费者示例"""
        try:
            # 注册批量消息监听器
            listener = BusinessBatchMessageListener(self._process_batch_messages)
            self.consumer.register_batch_listener(listener)
            
            # 订阅主题
            self.consumer.subscribe("BatchTopic", "*")
            
            # 启动消费者
            self.consumer.start()
            
            print("批量消息消费者启动成功,等待消息...")
            
            # 等待消息
            while True:
                time.sleep(10)
                stats = self.consumer.get_consume_stats()
                process_stats = listener.get_process_stats()
                print(f"消费统计: {stats}")
                print(f"处理统计: {process_stats}")
                
        except KeyboardInterrupt:
            print("收到中断信号,正在关闭消费者...")
        finally:
            self.consumer.shutdown()
    
    def _send_batch_messages(self):
        """发送批量消息"""
        from rocketmq_producer import Message
        
        # 创建批量消息
        messages = []
        for i in range(10):
            message = Message(
                topic="BatchTopic",
                tags="BatchTag",
                keys=f"batch_key_{i}",
                body=f"批量消息内容 {i}".encode('utf-8')
            )
            messages.append(message)
        
        batch_message = BatchMessage(
            topic="BatchTopic",
            messages=messages
        )
        
        try:
            result = self.producer.send_batch(batch_message)
            print(f"发送批量消息成功: {result.msg_id}")
        except Exception as e:
            print(f"发送批量消息失败: {e}")
    
    def _send_large_batch_messages(self):
        """发送大批量消息(会自动分割)"""
        from rocketmq_producer import Message
        
        # 创建大批量消息
        messages = []
        for i in range(1500):  # 超过最大批量数量
            message = Message(
                topic="BatchTopic",
                tags="LargeBatchTag",
                keys=f"large_batch_key_{i}",
                body=f"大批量消息内容 {i} - {'X' * 1000}".encode('utf-8')  # 增加消息大小
            )
            messages.append(message)
        
        batch_message = BatchMessage(
            topic="BatchTopic",
            messages=messages
        )
        
        try:
            result = self.producer.send_batch(batch_message)
            print(f"发送大批量消息成功: {result.msg_id}")
        except Exception as e:
            print(f"发送大批量消息失败: {e}")
    
    def _send_batch_messages_async(self):
        """异步发送批量消息"""
        from rocketmq_producer import Message
        
        # 创建批量消息
        messages = []
        for i in range(5):
            message = Message(
                topic="BatchTopic",
                tags="AsyncBatchTag",
                keys=f"async_batch_key_{i}",
                body=f"异步批量消息内容 {i}".encode('utf-8')
            )
            messages.append(message)
        
        batch_message = BatchMessage(
            topic="BatchTopic",
            messages=messages
        )
        
        # 创建回调
        callback = AsyncBatchSendCallback()
        
        try:
            self.producer.send_batch_async(batch_message, callback)
            print("异步发送批量消息请求已提交")
            
            # 等待回调完成
            time.sleep(2)
            
        except Exception as e:
            print(f"异步发送批量消息失败: {e}")
    
    def _process_batch_messages(self, messages: List['MessageExt']) -> bool:
        """处理批量消息"""
        try:
            print(f"\n处理批量消息: {len(messages)} 条")
            
            for i, msg in enumerate(messages):
                print(f"  消息 {i+1}:")
                print(f"    ID: {msg.msg_id}")
                print(f"    Tags: {msg.tags}")
                print(f"    Keys: {msg.keys}")
                print(f"    内容: {msg.body.decode('utf-8')[:50]}...")
            
            # 模拟批量处理
            time.sleep(0.1 * len(messages))
            
            print(f"批量消息处理完成: {len(messages)} 条")
            return True
            
        except Exception as e:
            print(f"批量消息处理异常: {e}")
            return False
    
    def _print_producer_stats(self):
        """打印生产者统计信息"""
        stats = self.producer.get_batch_stats()
        print("\n批量消息生产者统计:")
        print(f"总批量数: {stats['total_batches']}")
        print(f"总消息数: {stats['total_messages']}")
        print(f"成功批量数: {stats['success_batches']}")
        print(f"失败批量数: {stats['failed_batches']}")
        print(f"平均批量大小: {stats['avg_batch_size']} 字节")

# 异步批量发送回调
class AsyncBatchSendCallback(BatchSendCallback):
    """异步批量发送回调"""
    
    def on_success(self, result: 'SendResult', batch_message: BatchMessage):
        """批量发送成功回调"""
        print(f"异步批量发送成功: {result.msg_id}, 消息数量: {batch_message.get_message_count()}")
    
    def on_exception(self, exception: Exception):
        """批量发送异常回调"""
        print(f"异步批量发送失败: {exception}")

# 使用示例
if __name__ == "__main__":
    example = BatchMessageExample()
    
    # 发送批量消息示例
    example.start_producer_example()
    
    # 消费批量消息示例
     # example.start_consumer_example()

5.5 消息过滤

5.5.1 标签过滤

# 标签过滤枚举
class FilterType(Enum):
    """过滤类型枚举"""
    TAG = "TAG"          # 标签过滤
    SQL92 = "SQL92"      # SQL92过滤
    
# 消息过滤器接口
class MessageFilter(ABC):
    """消息过滤器接口"""
    
    @abstractmethod
    def match(self, message: 'MessageExt') -> bool:
        """检查消息是否匹配过滤条件"""
        pass
    
    @abstractmethod
    def get_filter_expression(self) -> str:
        """获取过滤表达式"""
        pass
    
    @abstractmethod
    def get_filter_type(self) -> FilterType:
        """获取过滤类型"""
        pass

# 标签过滤器
class TagFilter(MessageFilter):
    """标签过滤器"""
    
    def __init__(self, tag_expression: str):
        self.tag_expression = tag_expression
        self.tags = self._parse_tag_expression(tag_expression)
    
    def match(self, message: 'MessageExt') -> bool:
        """检查消息标签是否匹配"""
        if not message.tags:
            return "*" in self.tags
        
        # 支持通配符
        if "*" in self.tags:
            return True
        
        # 精确匹配
        return message.tags in self.tags
    
    def get_filter_expression(self) -> str:
        """获取过滤表达式"""
        return self.tag_expression
    
    def get_filter_type(self) -> FilterType:
        """获取过滤类型"""
        return FilterType.TAG
    
    def _parse_tag_expression(self, expression: str) -> Set[str]:
        """解析标签表达式"""
        if not expression or expression == "*":
            return {"*"}
        
        # 支持多个标签,用 || 分隔
        tags = set()
        for tag in expression.split("||"):
            tag = tag.strip()
            if tag:
                tags.add(tag)
        
        return tags if tags else {"*"}

# SQL92过滤器
class SQL92Filter(MessageFilter):
    """SQL92过滤器"""
    
    def __init__(self, sql_expression: str):
        self.sql_expression = sql_expression
        self.compiled_expression = self._compile_expression(sql_expression)
    
    def match(self, message: 'MessageExt') -> bool:
        """检查消息是否匹配SQL表达式"""
        try:
            # 构建消息上下文
            context = self._build_message_context(message)
            
            # 执行SQL表达式
            return self._evaluate_expression(self.compiled_expression, context)
            
        except Exception as e:
            print(f"SQL过滤器执行异常: {e}")
            return False
    
    def get_filter_expression(self) -> str:
        """获取过滤表达式"""
        return self.sql_expression
    
    def get_filter_type(self) -> FilterType:
        """获取过滤类型"""
        return FilterType.SQL92
    
    def _compile_expression(self, expression: str) -> Dict[str, Any]:
        """编译SQL表达式"""
        # 简化的SQL解析器
        import re
        
        compiled = {
            "original": expression,
            "conditions": []
        }
        
        # 解析WHERE条件
        where_match = re.search(r'WHERE\s+(.+)', expression, re.IGNORECASE)
        if where_match:
            where_clause = where_match.group(1)
            
            # 解析条件(简化版本)
            conditions = self._parse_where_clause(where_clause)
            compiled["conditions"] = conditions
        
        return compiled
    
    def _parse_where_clause(self, where_clause: str) -> List[Dict[str, Any]]:
        """解析WHERE子句"""
        import re
        
        conditions = []
        
        # 支持的操作符
        operators = ['=', '!=', '>', '<', '>=', '<=', 'LIKE', 'IN']
        
        # 简单的条件解析
        for op in operators:
            pattern = rf'(\w+)\s*{re.escape(op)}\s*([\'\"]?[^\s\'\"]+[\'\"]?)'
            matches = re.findall(pattern, where_clause, re.IGNORECASE)
            
            for match in matches:
                field, value = match
                conditions.append({
                    "field": field,
                    "operator": op,
                    "value": value.strip('\"\'')
                })
        
        return conditions
    
    def _build_message_context(self, message: 'MessageExt') -> Dict[str, Any]:
        """构建消息上下文"""
        context = {
            "tags": message.tags or "",
            "keys": message.keys or "",
            "msgId": message.msg_id,
            "topic": message.topic,
            "queueId": message.queue_id,
            "bornTimestamp": message.born_timestamp,
            "storeTimestamp": message.store_timestamp
        }
        
        # 添加用户属性
        if hasattr(message, 'properties') and message.properties:
            for key, value in message.properties.items():
                context[key] = value
        
        return context
    
    def _evaluate_expression(self, compiled_expr: Dict[str, Any], 
                           context: Dict[str, Any]) -> bool:
        """评估表达式"""
        conditions = compiled_expr.get("conditions", [])
        
        if not conditions:
            return True
        
        # 简单的AND逻辑(实际应该支持更复杂的逻辑)
        for condition in conditions:
            if not self._evaluate_condition(condition, context):
                return False
        
        return True
    
    def _evaluate_condition(self, condition: Dict[str, Any], 
                          context: Dict[str, Any]) -> bool:
        """评估单个条件"""
        field = condition["field"]
        operator = condition["operator"]
        expected_value = condition["value"]
        
        actual_value = context.get(field, "")
        
        try:
            if operator == "=":
                return str(actual_value) == str(expected_value)
            elif operator == "!=":
                return str(actual_value) != str(expected_value)
            elif operator == ">":
                return float(actual_value) > float(expected_value)
            elif operator == "<":
                return float(actual_value) < float(expected_value)
            elif operator == ">=":
                return float(actual_value) >= float(expected_value)
            elif operator == "<=":
                return float(actual_value) <= float(expected_value)
            elif operator.upper() == "LIKE":
                import re
                pattern = expected_value.replace("%", ".*")
                return bool(re.match(pattern, str(actual_value)))
            elif operator.upper() == "IN":
                values = expected_value.strip("()").split(",")
                values = [v.strip().strip("'\"") for v in values]
                return str(actual_value) in values
            else:
                return False
                
        except (ValueError, TypeError):
            return False

# 过滤消息生产者
class FilterMessageProducer:
    """支持过滤的消息生产者"""
    
    def __init__(self, producer_config: 'ProducerConfig'):
        self.config = producer_config
        self.producer = None
        self.send_stats = {
            "total_sent": 0,
            "success_sent": 0,
            "failed_sent": 0
        }
    
    def start(self):
        """启动生产者"""
        from rocketmq_producer import DefaultMQProducer
        self.producer = DefaultMQProducer(self.config)
        self.producer.start()
        print("过滤消息生产者启动成功")
    
    def shutdown(self):
        """关闭生产者"""
        if self.producer:
            self.producer.shutdown()
        print("过滤消息生产者关闭成功")
    
    def send_message_with_tags(self, topic: str, tags: str, keys: str, 
                              body: str, properties: Dict[str, str] = None) -> 'SendResult':
        """发送带标签的消息"""
        from rocketmq_producer import Message
        
        message = Message(
            topic=topic,
            tags=tags,
            keys=keys,
            body=body.encode('utf-8')
        )
        
        # 设置用户属性
        if properties:
            for key, value in properties.items():
                message.put_property(key, value)
        
        try:
            result = self.producer.send(message)
            self.send_stats["total_sent"] += 1
            self.send_stats["success_sent"] += 1
            return result
        except Exception as e:
            self.send_stats["total_sent"] += 1
            self.send_stats["failed_sent"] += 1
            raise e
    
    def send_messages_batch_with_different_tags(self, topic: str, 
                                              message_data: List[Dict[str, Any]]) -> List['SendResult']:
        """批量发送不同标签的消息"""
        results = []
        
        for data in message_data:
            try:
                result = self.send_message_with_tags(
                    topic=topic,
                    tags=data.get("tags", ""),
                    keys=data.get("keys", ""),
                    body=data.get("body", ""),
                    properties=data.get("properties", {})
                )
                results.append(result)
            except Exception as e:
                print(f"发送消息失败: {e}")
                results.append(None)
        
        return results
    
    def get_send_stats(self) -> Dict[str, int]:
        """获取发送统计"""
        return self.send_stats.copy()

# 过滤消息消费者
class FilterMessageConsumer:
    """支持过滤的消息消费者"""
    
    def __init__(self, consumer_config: 'ConsumerConfig'):
        self.config = consumer_config
        self.consumer = None
        self.message_filter: Optional[MessageFilter] = None
        self.filter_stats = {
            "total_received": 0,
            "filtered_out": 0,
            "processed": 0
        }
    
    def start(self):
        """启动消费者"""
        from rocketmq_consumer import DefaultMQPushConsumer
        self.consumer = DefaultMQPushConsumer(self.config)
        
        # 注册过滤消息监听器
        listener = FilterMessageListener(self.message_filter, self.filter_stats)
        self.consumer.register_message_listener(listener)
        
        self.consumer.start()
        print("过滤消息消费者启动成功")
    
    def shutdown(self):
        """关闭消费者"""
        if self.consumer:
            self.consumer.shutdown()
        print("过滤消息消费者关闭成功")
    
    def subscribe_with_tag_filter(self, topic: str, tag_expression: str):
        """使用标签过滤订阅主题"""
        self.message_filter = TagFilter(tag_expression)
        if self.consumer:
            self.consumer.subscribe(topic, tag_expression)
    
    def subscribe_with_sql_filter(self, topic: str, sql_expression: str):
        """使用SQL过滤订阅主题"""
        self.message_filter = SQL92Filter(sql_expression)
        if self.consumer:
            # 注意:实际的RocketMQ需要在Broker端支持SQL过滤
            self.consumer.subscribe(topic, "*")  # 客户端过滤
    
    def get_filter_stats(self) -> Dict[str, int]:
        """获取过滤统计"""
        return self.filter_stats.copy()

# 过滤消息监听器
class FilterMessageListener:
    """过滤消息监听器"""
    
    def __init__(self, message_filter: Optional[MessageFilter], 
                 stats: Dict[str, int]):
        self.message_filter = message_filter
        self.stats = stats
    
    def consume_message(self, messages: List['MessageExt'], 
                      context: 'ConsumeContext') -> 'ConsumeResult':
        """消费消息"""
        try:
            filtered_messages = []
            
            for message in messages:
                self.stats["total_received"] += 1
                
                # 应用过滤器
                if self.message_filter and not self.message_filter.match(message):
                    self.stats["filtered_out"] += 1
                    continue
                
                filtered_messages.append(message)
            
            # 处理过滤后的消息
            if filtered_messages:
                result = self._process_filtered_messages(filtered_messages)
                if result == ConsumeResult.SUCCESS:
                    self.stats["processed"] += len(filtered_messages)
                return result
            
            return ConsumeResult.SUCCESS
            
        except Exception as e:
            print(f"过滤消息处理异常: {e}")
            return ConsumeResult.RECONSUME_LATER
    
    def _process_filtered_messages(self, messages: List['MessageExt']) -> 'ConsumeResult':
        """处理过滤后的消息"""
        try:
            for message in messages:
                print(f"处理过滤消息:")
                print(f"  ID: {message.msg_id}")
                print(f"  Topic: {message.topic}")
                print(f"  Tags: {message.tags}")
                print(f"  Keys: {message.keys}")
                print(f"  内容: {message.body.decode('utf-8')[:100]}...")
                
                # 打印用户属性
                if hasattr(message, 'properties') and message.properties:
                    print(f"  属性: {message.properties}")
                
                print()
            
            return ConsumeResult.SUCCESS
            
        except Exception as e:
            print(f"处理过滤消息异常: {e}")
            return ConsumeResult.RECONSUME_LATER

5.5.2 消息过滤使用示例

# 消息过滤使用示例
class MessageFilterExample:
    """消息过滤使用示例"""
    
    def __init__(self):
        # 配置生产者
        self.producer_config = ProducerConfig(
            producer_group="filter_producer_group",
            name_server_addr="localhost:9876"
        )
        
        # 配置消费者
        self.consumer_config = ConsumerConfig(
            consumer_group="filter_consumer_group",
            name_server_addr="localhost:9876"
        )
        
        # 创建生产者和消费者
        self.producer = FilterMessageProducer(self.producer_config)
        self.consumer = FilterMessageConsumer(self.consumer_config)
    
    def start_producer_example(self):
        """启动生产者示例"""
        try:
            self.producer.start()
            
            # 发送不同标签的消息
            self._send_messages_with_different_tags()
            
            # 发送带属性的消息
            self._send_messages_with_properties()
            
            # 打印发送统计
            stats = self.producer.get_send_stats()
            print(f"\n发送统计: {stats}")
            
        finally:
            self.producer.shutdown()
    
    def start_tag_filter_consumer_example(self):
        """启动标签过滤消费者示例"""
        try:
            # 订阅特定标签的消息
            self.consumer.subscribe_with_tag_filter("FilterTopic", "VIP || IMPORTANT")
            
            # 启动消费者
            self.consumer.start()
            
            print("标签过滤消费者启动成功,等待消息...")
            
            # 等待消息
            while True:
                time.sleep(10)
                stats = self.consumer.get_filter_stats()
                print(f"过滤统计: {stats}")
                
        except KeyboardInterrupt:
            print("收到中断信号,正在关闭消费者...")
        finally:
            self.consumer.shutdown()
    
    def start_sql_filter_consumer_example(self):
        """启动SQL过滤消费者示例"""
        try:
            # 使用SQL表达式过滤消息
            sql_expression = "WHERE level > 5 AND category = 'order'"
            self.consumer.subscribe_with_sql_filter("FilterTopic", sql_expression)
            
            # 启动消费者
            self.consumer.start()
            
            print("SQL过滤消费者启动成功,等待消息...")
            
            # 等待消息
            while True:
                time.sleep(10)
                stats = self.consumer.get_filter_stats()
                print(f"过滤统计: {stats}")
                
        except KeyboardInterrupt:
            print("收到中断信号,正在关闭消费者...")
        finally:
            self.consumer.shutdown()
    
    def _send_messages_with_different_tags(self):
        """发送不同标签的消息"""
        message_data = [
            {
                "tags": "VIP",
                "keys": "vip_user_001",
                "body": "VIP用户订单消息",
                "properties": {"level": "10", "category": "order"}
            },
            {
                "tags": "NORMAL",
                "keys": "normal_user_001",
                "body": "普通用户订单消息",
                "properties": {"level": "3", "category": "order"}
            },
            {
                "tags": "IMPORTANT",
                "keys": "important_001",
                "body": "重要系统消息",
                "properties": {"level": "8", "category": "system"}
            },
            {
                "tags": "DEBUG",
                "keys": "debug_001",
                "body": "调试消息",
                "properties": {"level": "1", "category": "debug"}
            }
        ]
        
        results = self.producer.send_messages_batch_with_different_tags(
            "FilterTopic", message_data
        )
        
        for i, result in enumerate(results):
            if result:
                print(f"发送消息 {i+1} 成功: {result.msg_id}")
            else:
                print(f"发送消息 {i+1} 失败")
    
    def _send_messages_with_properties(self):
        """发送带属性的消息"""
        # 发送高级别订单消息
        self.producer.send_message_with_tags(
            topic="FilterTopic",
            tags="ORDER",
            keys="high_level_order",
            body="高级别订单消息",
            properties={
                "level": "9",
                "category": "order",
                "priority": "high",
                "amount": "1000.00"
            }
        )
        
        # 发送低级别通知消息
        self.producer.send_message_with_tags(
            topic="FilterTopic",
            tags="NOTIFICATION",
            keys="low_level_notification",
            body="低级别通知消息",
            properties={
                "level": "2",
                "category": "notification",
                "priority": "low"
            }
        )

# 过滤器测试
class FilterTest:
    """过滤器测试"""
    
    def test_tag_filter(self):
        """测试标签过滤器"""
        print("测试标签过滤器:")
        
        # 创建测试消息
        test_messages = [
            self._create_test_message("VIP"),
            self._create_test_message("NORMAL"),
            self._create_test_message("IMPORTANT"),
            self._create_test_message("DEBUG")
        ]
        
        # 测试不同的标签过滤器
        filters = [
            TagFilter("VIP"),
            TagFilter("VIP || IMPORTANT"),
            TagFilter("*"),
            TagFilter("PREMIUM")
        ]
        
        for filter_obj in filters:
            print(f"\n过滤表达式: {filter_obj.get_filter_expression()}")
            for msg in test_messages:
                match = filter_obj.match(msg)
                print(f"  消息标签 '{msg.tags}': {'匹配' if match else '不匹配'}")
    
    def test_sql_filter(self):
        """测试SQL过滤器"""
        print("\n测试SQL过滤器:")
        
        # 创建带属性的测试消息
        test_messages = [
            self._create_test_message_with_properties("ORDER", {"level": "8", "category": "order"}),
            self._create_test_message_with_properties("ORDER", {"level": "3", "category": "order"}),
            self._create_test_message_with_properties("NOTIFICATION", {"level": "6", "category": "notification"}),
            self._create_test_message_with_properties("DEBUG", {"level": "1", "category": "debug"})
        ]
        
        # 测试不同的SQL过滤器
        sql_filters = [
            "WHERE level > 5",
            "WHERE level > 5 AND category = 'order'",
            "WHERE category = 'notification'",
            "WHERE level <= 3"
        ]
        
        for sql_expr in sql_filters:
            filter_obj = SQL92Filter(sql_expr)
            print(f"\nSQL表达式: {sql_expr}")
            
            for msg in test_messages:
                match = filter_obj.match(msg)
                props = getattr(msg, 'properties', {})
                print(f"  消息 (level={props.get('level', 'N/A')}, category={props.get('category', 'N/A')}): {'匹配' if match else '不匹配'}")
    
    def _create_test_message(self, tags: str) -> 'MessageExt':
        """创建测试消息"""
        from rocketmq_consumer import MessageExt
        
        return MessageExt(
            topic="TestTopic",
            tags=tags,
            keys=f"test_key_{tags.lower()}",
            body=f"测试消息内容 - {tags}".encode('utf-8'),
            msg_id=f"test_msg_{tags.lower()}",
            queue_id=0,
            queue_offset=0,
            born_timestamp=int(time.time() * 1000),
            store_timestamp=int(time.time() * 1000)
        )
    
    def _create_test_message_with_properties(self, tags: str, 
                                           properties: Dict[str, str]) -> 'MessageExt':
        """创建带属性的测试消息"""
        msg = self._create_test_message(tags)
        msg.properties = properties
        return msg

# 使用示例
if __name__ == "__main__":
    # 测试过滤器
    filter_test = FilterTest()
    filter_test.test_tag_filter()
    filter_test.test_sql_filter()
    
    # 消息过滤示例
    example = MessageFilterExample()
    
    # 发送消息示例
    example.start_producer_example()
    
    # 标签过滤消费示例
    # example.start_tag_filter_consumer_example()
    
    # SQL过滤消费示例
     # example.start_sql_filter_consumer_example()

5.6 本章总结

5.6.1 核心知识点

# RocketMQ高级特性总结
class RocketMQAdvancedFeaturesSummary:
    """RocketMQ高级特性总结"""
    
    def __init__(self):
        self.features = {
            "顺序消息": {
                "描述": "保证消息按照发送顺序被消费",
                "类型": ["全局顺序", "分区顺序"],
                "关键点": [
                    "使用MessageQueueSelector选择队列",
                    "同一队列内消息顺序消费",
                    "消费者需要顺序处理消息"
                ],
                "适用场景": [
                    "订单状态变更",
                    "账户余额变动",
                    "库存变化记录"
                ]
            },
            "事务消息": {
                "描述": "保证本地事务与消息发送的一致性",
                "流程": [
                    "发送半消息",
                    "执行本地事务",
                    "提交或回滚事务消息"
                ],
                "关键点": [
                    "实现TransactionListener接口",
                    "处理事务状态检查",
                    "确保幂等性"
                ],
                "适用场景": [
                    "分布式事务",
                    "数据一致性保证",
                    "业务解耦"
                ]
            },
            "延时消息": {
                "描述": "消息在指定时间后才能被消费",
                "实现方式": [
                    "预定义延时级别",
                    "指定延时时间"
                ],
                "关键点": [
                    "支持18个延时级别",
                    "最大延时2小时",
                    "延时精度秒级"
                ],
                "适用场景": [
                    "订单超时取消",
                    "定时任务",
                    "延时通知"
                ]
            },
            "批量消息": {
                "描述": "一次发送多条消息,提高吞吐量",
                "优势": [
                    "减少网络开销",
                    "提高发送效率",
                    "降低系统负载"
                ],
                "限制": [
                    "消息总大小不超过4MB",
                    "同一批次消息必须是同一Topic",
                    "不支持延时消息"
                ],
                "适用场景": [
                    "日志收集",
                    "数据同步",
                    "批量通知"
                ]
            },
            "消息过滤": {
                "描述": "根据条件过滤消息,减少无效消费",
                "类型": [
                    "标签过滤(Tag Filter)",
                    "SQL92过滤(SQL Filter)"
                ],
                "关键点": [
                    "支持多标签过滤",
                    "SQL表达式过滤",
                    "客户端和服务端过滤"
                ],
                "适用场景": [
                    "消息分类处理",
                    "条件消费",
                    "减少网络传输"
                ]
            }
        }
    
    def get_feature_summary(self, feature_name: str) -> Dict[str, Any]:
        """获取特性总结"""
        return self.features.get(feature_name, {})
    
    def get_all_features(self) -> List[str]:
        """获取所有特性列表"""
        return list(self.features.keys())
    
    def print_summary(self):
        """打印特性总结"""
        print("RocketMQ高级特性总结:")
        print("=" * 50)
        
        for feature_name, feature_info in self.features.items():
            print(f"\n{feature_name}:")
            print(f"  描述: {feature_info['描述']}")
            
            for key, value in feature_info.items():
                if key != '描述':
                    print(f"  {key}:")
                    if isinstance(value, list):
                        for item in value:
                            print(f"    - {item}")
                    else:
                        print(f"    {value}")

5.6.2 最佳实践

# RocketMQ高级特性最佳实践
class RocketMQAdvancedBestPractices:
    """RocketMQ高级特性最佳实践"""
    
    def __init__(self):
        self.best_practices = {
            "顺序消息最佳实践": [
                "合理选择分区键,避免热点队列",
                "消费者处理失败时要谨慎重试",
                "避免长时间阻塞消费线程",
                "监控队列消费进度",
                "考虑消费者扩容对顺序的影响"
            ],
            "事务消息最佳实践": [
                "本地事务要保证幂等性",
                "事务状态检查要快速响应",
                "合理设置事务超时时间",
                "记录事务执行日志",
                "处理事务回查逻辑"
            ],
            "延时消息最佳实践": [
                "选择合适的延时级别",
                "避免大量相同时间的延时消息",
                "考虑时钟偏移问题",
                "监控延时消息堆积",
                "设计延时消息的取消机制"
            ],
            "批量消息最佳实践": [
                "控制批量大小,避免超过限制",
                "合理设置批量发送间隔",
                "处理批量发送失败的重试",
                "监控批量发送性能",
                "考虑消息顺序要求"
            ],
            "消息过滤最佳实践": [
                "优先使用标签过滤",
                "SQL过滤表达式要简洁",
                "避免复杂的过滤逻辑",
                "在生产者端设置合适的标签",
                "监控过滤效果和性能"
            ],
            "性能优化实践": [
                "合理配置生产者和消费者参数",
                "使用异步发送提高吞吐量",
                "批量处理消息",
                "避免频繁创建连接",
                "监控关键性能指标"
            ],
            "可靠性保证实践": [
                "实现消息幂等处理",
                "设计合理的重试机制",
                "处理死信队列消息",
                "监控消息堆积情况",
                "建立消息追踪机制"
            ]
        }
    
    def get_best_practices(self, category: str) -> List[str]:
        """获取最佳实践"""
        return self.best_practices.get(category, [])
    
    def print_best_practices(self):
        """打印最佳实践"""
        print("RocketMQ高级特性最佳实践:")
        print("=" * 50)
        
        for category, practices in self.best_practices.items():
            print(f"\n{category}:")
            for i, practice in enumerate(practices, 1):
                print(f"  {i}. {practice}")

# 性能监控和调优
class RocketMQPerformanceMonitor:
    """RocketMQ性能监控"""
    
    def __init__(self):
        self.metrics = {
            "生产者指标": [
                "发送TPS(每秒事务数)",
                "发送延迟(RT)",
                "发送成功率",
                "发送失败率",
                "重试次数"
            ],
            "消费者指标": [
                "消费TPS",
                "消费延迟",
                "消费成功率",
                "消费失败率",
                "消息堆积量"
            ],
            "Broker指标": [
                "消息存储量",
                "磁盘使用率",
                "内存使用率",
                "网络IO",
                "队列深度"
            ],
            "系统指标": [
                "CPU使用率",
                "内存使用率",
                "磁盘IO",
                "网络带宽",
                "连接数"
            ]
        }
    
    def get_monitoring_metrics(self) -> Dict[str, List[str]]:
        """获取监控指标"""
        return self.metrics
    
    def print_monitoring_guide(self):
        """打印监控指南"""
        print("RocketMQ性能监控指南:")
        print("=" * 50)
        
        for category, metrics in self.metrics.items():
            print(f"\n{category}:")
            for metric in metrics:
                print(f"  - {metric}")
        
        print("\n监控建议:")
        print("  1. 建立完善的监控体系")
        print("  2. 设置合理的告警阈值")
        print("  3. 定期分析性能趋势")
        print("  4. 建立性能基线")
        print("  5. 制定性能调优计划")

5.6.3 练习题

# 练习题
class RocketMQAdvancedExercises:
    """RocketMQ高级特性练习题"""
    
    def __init__(self):
        self.exercises = [
            {
                "题目": "实现一个电商订单处理系统",
                "要求": [
                    "使用顺序消息保证订单状态变更的顺序",
                    "使用事务消息保证库存扣减和订单创建的一致性",
                    "使用延时消息实现订单超时自动取消",
                    "使用消息过滤实现不同类型订单的分类处理"
                ],
                "提示": [
                    "以订单ID作为分区键保证顺序",
                    "在事务监听器中处理库存操作",
                    "设置合适的订单超时时间",
                    "使用标签区分订单类型"
                ]
            },
            {
                "题目": "设计一个日志收集系统",
                "要求": [
                    "使用批量消息提高日志发送效率",
                    "使用消息过滤实现不同级别日志的处理",
                    "实现日志消息的压缩和解压缩",
                    "监控日志发送和消费的性能指标"
                ],
                "提示": [
                    "按时间或数量触发批量发送",
                    "使用日志级别作为过滤条件",
                    "在消息体中实现压缩逻辑",
                    "记录发送和消费的统计信息"
                ]
            },
            {
                "题目": "实现一个分布式任务调度系统",
                "要求": [
                    "使用延时消息实现定时任务",
                    "使用事务消息保证任务状态的一致性",
                    "使用消息过滤实现任务类型的分发",
                    "实现任务的重试和失败处理机制"
                ],
                "提示": [
                    "根据执行时间计算延时级别",
                    "在事务中更新任务状态",
                    "使用任务类型作为标签",
                    "设计合理的重试策略"
                ]
            },
            {
                "题目": "优化现有的消息系统性能",
                "要求": [
                    "分析当前系统的性能瓶颈",
                    "使用批量消息优化发送性能",
                    "使用异步发送提高吞吐量",
                    "实现消息的监控和告警"
                ],
                "提示": [
                    "监控发送和消费的延迟",
                    "合理设置批量大小和间隔",
                    "使用回调处理异步结果",
                    "建立完善的监控体系"
                ]
            }
        ]
    
    def get_exercise(self, index: int) -> Dict[str, Any]:
        """获取练习题"""
        if 0 <= index < len(self.exercises):
            return self.exercises[index]
        return {}
    
    def print_all_exercises(self):
        """打印所有练习题"""
        print("RocketMQ高级特性练习题:")
        print("=" * 50)
        
        for i, exercise in enumerate(self.exercises, 1):
            print(f"\n练习题 {i}: {exercise['题目']}")
            
            print("要求:")
            for req in exercise['要求']:
                print(f"  - {req}")
            
            print("提示:")
            for hint in exercise['提示']:
                print(f"  - {hint}")
            
            print()

# 使用示例
if __name__ == "__main__":
    # 打印特性总结
    summary = RocketMQAdvancedFeaturesSummary()
    summary.print_summary()
    
    print("\n" + "=" * 80 + "\n")
    
    # 打印最佳实践
    best_practices = RocketMQAdvancedBestPractices()
    best_practices.print_best_practices()
    
    print("\n" + "=" * 80 + "\n")
    
    # 打印监控指南
    monitor = RocketMQPerformanceMonitor()
    monitor.print_monitoring_guide()
    
    print("\n" + "=" * 80 + "\n")
    
    # 打印练习题
    exercises = RocketMQAdvancedExercises()
    exercises.print_all_exercises()

本章小结:

本章详细介绍了RocketMQ的五大高级特性:

  1. 顺序消息 - 保证消息的有序处理,适用于对顺序有严格要求的业务场景
  2. 事务消息 - 保证分布式事务的一致性,解决本地事务与消息发送的原子性问题
  3. 延时消息 - 实现消息的延时投递,支持定时任务和延时处理场景
  4. 批量消息 - 提高消息发送效率,减少网络开销,适用于高吞吐量场景
  5. 消息过滤 - 实现消息的精确投递,减少无效消费,提高系统效率

通过学习这些高级特性,你可以根据不同的业务需求选择合适的消息处理方式,构建更加高效、可靠的分布式消息系统。在实际应用中,要结合业务特点和性能要求,合理使用这些特性,并建立完善的监控和运维体系。