4.1 消费者基础

4.1.1 消费者核心概念

from enum import Enum
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass
from abc import ABC, abstractmethod
import time
import threading

# 消费者状态枚举
class ConsumerState(Enum):
    """Lifecycle states of a consumer instance."""
    CREATE_JUST = "CREATE_JUST"  # initial state; the consumers only allow start() from here
    RUNNING = "RUNNING"  # set once start() has completed
    SHUTDOWN_ALREADY = "SHUTDOWN_ALREADY"  # set once shutdown() has completed
    SERVICE_STATE_OK = "SERVICE_STATE_OK"  # NOTE(review): not referenced in the visible code; confirm intended use

# 消费模式枚举
class ConsumeMode(Enum):
    """Consumption modes; the push consumer picks its consume service from this."""
    CONCURRENTLY = "CONCURRENTLY"  # concurrent consumption
    ORDERLY = "ORDERLY"            # ordered consumption

# 消息模型枚举
class MessageModel(Enum):
    """Message models; the consumers pick their offset store from this."""
    BROADCASTING = "BROADCASTING"  # broadcast mode (offsets kept in a local file store)
    CLUSTERING = "CLUSTERING"      # cluster mode (offsets kept in a remote broker store)

# 消费起始位置枚举
class ConsumeFromWhere(Enum):
    """Options for the position at which consumption starts."""
    CONSUME_FROM_LAST_OFFSET = "CONSUME_FROM_LAST_OFFSET"
    CONSUME_FROM_FIRST_OFFSET = "CONSUME_FROM_FIRST_OFFSET"
    CONSUME_FROM_TIMESTAMP = "CONSUME_FROM_TIMESTAMP"  # presumably paired with ConsumerConfig.consume_timestamp; confirm

# 消费结果枚举
class ConsumeResult(Enum):
    """Outcome a listener reports for a batch of messages."""
    SUCCESS = "SUCCESS"  # batch handled; the concurrent service then advances the offset
    RECONSUME_LATER = "RECONSUME_LATER"  # batch should be retried later

# 拉取状态枚举
class PullStatus(Enum):
    """Status carried by a PullResult."""
    FOUND = "FOUND"  # messages were returned
    NO_NEW_MSG = "NO_NEW_MSG"  # nothing new in the queue
    NO_MATCHED_MSG = "NO_MATCHED_MSG"  # nothing matched the subscription
    OFFSET_ILLEGAL = "OFFSET_ILLEGAL"  # also used by the pull consumer for any unrecognised response code

# 消息扩展信息
@dataclass
class MessageExt:
    """A message together with its delivery metadata.

    ``properties`` defaults to ``None`` in the signature and is swapped for a
    fresh empty dict while the instance is initialised, so instances never
    share a property dict unless the caller passes one in.
    """
    topic: str
    tags: str
    keys: str
    body: bytes
    msg_id: str
    queue_id: int
    queue_offset: int
    born_timestamp: int
    store_timestamp: int
    reconsume_times: int = 0
    properties: Dict[str, str] = None

    def __post_init__(self):
        # Keep a caller-supplied dict as-is; only the None placeholder is replaced.
        self.properties = {} if self.properties is None else self.properties

    def get_property(self, key: str) -> Optional[str]:
        """Return the property stored under ``key``, or ``None`` when absent."""
        props = self.properties
        return props[key] if key in props else None

    def put_property(self, key: str, value: str):
        """Store ``value`` under ``key``, overwriting any previous value."""
        self.properties.update({key: value})

# 消费上下文
@dataclass
class ConsumeContext:
    """Context object passed to a listener alongside each batch of messages."""
    consumer_group: str  # group name of the consumer doing the consumption
    message_queue: 'MessageQueue'  # queue the batch was taken from
    ack_index: int = -1  # NOTE(review): not read anywhere in the visible code; confirm meaning
    delay_level_when_next_consume: int = 0  # NOTE(review): presumably a retry delay level; not read in the visible code
    
    def get_delay_level_when_next_consume(self) -> int:
        """Return the current ``delay_level_when_next_consume`` value."""
        return self.delay_level_when_next_consume
    
    def set_delay_level_when_next_consume(self, delay_level: int):
        """Overwrite ``delay_level_when_next_consume`` with ``delay_level``."""
        self.delay_level_when_next_consume = delay_level

# 拉取结果
@dataclass
class PullResult:
    """Result of one pull: a status, offset bookkeeping and any messages found.

    ``msg_found_list`` may be omitted; it then becomes a fresh empty list.
    """
    pull_status: PullStatus
    next_begin_offset: int
    min_offset: int
    max_offset: int
    msg_found_list: List[MessageExt] = None

    def __post_init__(self):
        # A caller-supplied list is kept untouched.
        if self.msg_found_list is not None:
            return
        self.msg_found_list = []

# 消费者配置
@dataclass
class ConsumerConfig:
    """Settings shared by the push and pull consumers.

    Only ``consumer_group`` and ``name_server_addr`` are required; every other
    field carries a default.
    """
    consumer_group: str
    name_server_addr: str
    consume_mode: ConsumeMode = ConsumeMode.CONCURRENTLY
    message_model: MessageModel = MessageModel.CLUSTERING
    consume_from_where: ConsumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
    consume_timestamp: str = "20231201080000"  # format: yyyyMMddHHmmss
    max_reconsume_times: int = 16
    consume_timeout: int = 15  # minutes
    consume_thread_min: int = 20
    consume_thread_max: int = 64
    adjust_thread_pool_nums_threshold: int = 100000
    pull_interval: int = 0  # milliseconds
    pull_batch_size: int = 32
    consume_message_batch_max_size: int = 1
    post_subscription_when_pull: bool = False
    unit_name: str = ""
    max_cached_message_count: int = 1000
    max_cached_message_size_in_mib: int = 512

    def validate(self) -> bool:
        """Check the configuration.

        Returns:
            ``True`` when both the group and the name-server address are set
            and the thread bounds satisfy ``0 < min <= max``; ``False``
            otherwise.
        """
        if not (self.consumer_group and self.name_server_addr):
            return False
        lower, upper = self.consume_thread_min, self.consume_thread_max
        # One chained comparison covers "both positive" and "min not above max".
        return 0 < lower <= upper

# 消费者异常类
class ConsumerException(Exception):
    """Root of the consumer exception hierarchy."""


class ConsumerStartException(ConsumerException):
    """Raised when a consumer cannot be started."""


class ConsumerShutdownException(ConsumerException):
    """Raised when a consumer cannot be shut down."""


class SubscriptionException(ConsumerException):
    """Raised for subscription errors."""


class ConsumeException(ConsumerException):
    """Raised for errors while consuming messages."""


class PullException(ConsumerException):
    """Raised for errors while pulling messages."""

4.1.2 消息监听器接口

# 消息监听器接口
class MessageListener(ABC):
    """Base contract implemented by every message listener."""

    @abstractmethod
    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Consume a batch of messages.

        Args:
            messages: The batch to consume.
            context: Context describing where the batch came from.

        Returns:
            The outcome of consuming the batch.
        """

# 并发消息监听器
class MessageListenerConcurrently(MessageListener):
    """Listener contract for concurrent consumption."""

    @abstractmethod
    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Consume a batch of messages concurrently.

        Args:
            messages: The batch to consume.
            context: Context describing where the batch came from.

        Returns:
            The outcome of consuming the batch.
        """

# 顺序消息监听器
class MessageListenerOrderly(MessageListener):
    """Listener contract for ordered consumption."""

    @abstractmethod
    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Consume a batch of messages in order.

        Args:
            messages: The batch to consume.
            context: Context describing where the batch came from.

        Returns:
            The outcome of consuming the batch.
        """

# 示例消息监听器实现
class SimpleMessageListener(MessageListenerConcurrently):
    """Simple listener that delegates each batch to a handler and keeps counters.

    Attributes:
        handler: Callable invoked with the batch; defaults to a handler that
            prints every message.
        consume_count: Number of messages handed to the listener.
        success_count: Number of messages in batches reported SUCCESS.
        failed_count: Number of messages in batches that failed or raised.
    """

    def __init__(self, handler: Optional[Callable[[List[MessageExt]], ConsumeResult]] = None):
        """Create the listener.

        Args:
            handler: Batch handler returning a ``ConsumeResult``. When omitted
                (or falsy) the built-in printing handler is used.
        """
        self.handler = handler or self._default_handler
        self.consume_count = 0
        self.success_count = 0
        self.failed_count = 0

    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Run the handler on ``messages`` and update the counters.

        Args:
            messages: The batch to consume.
            context: Consumption context (not used by this listener).

        Returns:
            Whatever the handler returned, or ``RECONSUME_LATER`` when the
            handler raised.
        """
        batch_size = len(messages)
        self.consume_count += batch_size

        # Only the handler call is guarded, so bookkeeping errors are not
        # mistaken for consumption failures.
        try:
            result = self.handler(messages)
        except Exception as e:
            print(f"消费消息异常: {e}")
            self.failed_count += batch_size
            return ConsumeResult.RECONSUME_LATER

        if result == ConsumeResult.SUCCESS:
            self.success_count += batch_size
        else:
            self.failed_count += batch_size
        return result

    def _default_handler(self, messages: List[MessageExt]) -> ConsumeResult:
        """Print every message and report success.

        The body is decoded with ``errors='replace'``: this handler only logs,
        and a strict decode would raise on non-UTF-8 payloads and turn every
        delivery of such a message into ``RECONSUME_LATER``.
        """
        for msg in messages:
            print(f"消费消息: Topic={msg.topic}, Tags={msg.tags}, Body={msg.body.decode('utf-8', errors='replace')}")
        return ConsumeResult.SUCCESS

    def get_statistics(self) -> Dict[str, int]:
        """Return a snapshot of the three counters as a dict."""
        return {
            "consume_count": self.consume_count,
            "success_count": self.success_count,
            "failed_count": self.failed_count,
        }

# 业务消息监听器
class BusinessMessageListener(MessageListenerConcurrently):
    """Listener that runs a per-message business handler.

    Messages whose ``reconsume_times`` has reached ``max_retry_times`` are
    dropped (skipped and treated as consumed) instead of being processed.
    """

    def __init__(self, business_handler: Callable[[MessageExt], bool], max_retry_times: int = 3):
        """Create the listener.

        Args:
            business_handler: Called once per message; a falsy return marks
                the message as failed.
            max_retry_times: Messages already retried this many times are
                dropped. Defaults to 3, the limit that was previously
                hard-coded.
        """
        self.business_handler = business_handler
        self.max_retry_times = max_retry_times
        # Not used by the visible code; kept so existing attribute access still works.
        self.retry_times = {}

    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Process each message with the business handler.

        Args:
            messages: The batch to consume.
            context: Consumption context (not used by this listener).

        Returns:
            ``RECONSUME_LATER`` as soon as one message fails or raises (the
            remaining messages of the batch are not processed); ``SUCCESS``
            when every message was processed or dropped.
        """
        for msg in messages:
            try:
                # Give up on messages that have already been retried too often.
                if msg.reconsume_times >= self.max_retry_times:
                    print(f"消息重试次数过多,丢弃消息: {msg.msg_id}")
                    continue

                success = self.business_handler(msg)

                if not success:
                    print(f"业务处理失败,消息将重试: {msg.msg_id}")
                    return ConsumeResult.RECONSUME_LATER

            except Exception as e:
                print(f"处理消息异常: {msg.msg_id}, 错误: {e}")
                return ConsumeResult.RECONSUME_LATER

        return ConsumeResult.SUCCESS

4.2 Push消费者

4.2.1 Push消费者实现

class DefaultMQPushConsumer:
    """Default push consumer.

    Owns the offset store and the consume, pull and rebalance services,
    creates them in ``start()`` and stops them in ``shutdown()``.
    Subscriptions and the listener must be set before ``start()``.
    """

    def __init__(self, config: ConsumerConfig):
        """Create a consumer in the ``CREATE_JUST`` state.

        Args:
            config: Consumer configuration; it is validated in ``start()``.
        """
        self.config = config
        self.state = ConsumerState.CREATE_JUST
        self.subscription_table: Dict[str, str] = {}
        self.message_listener: Optional[MessageListener] = None
        self.consume_service: Optional['ConsumeMessageService'] = None
        self.pull_service: Optional['PullMessageService'] = None
        self.rebalance_service: Optional['RebalanceService'] = None
        self.offset_store: Optional['OffsetStore'] = None
        self.client_factory: Optional['MQClientFactory'] = None
        self.consume_stats = {
            "total_consumed": 0,
            "success_consumed": 0,
            "failed_consumed": 0,
            "start_time": 0
        }
        # update_consume_stats() is called from consume worker threads, so the
        # read-modify-write on the counters is serialised.
        self._stats_lock = threading.Lock()

    def start(self):
        """Start the consumer and all of its services.

        Raises:
            ConsumerStartException: If the consumer is not in ``CREATE_JUST``,
                the configuration is invalid, no listener is registered, or
                any initialisation step fails. In the last case the services
                that were already started are stopped again and the state is
                reset to ``CREATE_JUST``.
        """
        if self.state != ConsumerState.CREATE_JUST:
            raise ConsumerStartException(f"消费者状态错误: {self.state}")

        if not self.config.validate():
            raise ConsumerStartException("消费者配置无效")

        if not self.message_listener:
            raise ConsumerStartException("未设置消息监听器")

        try:
            # Client factory
            self.client_factory = MQClientFactory(self.config.name_server_addr)

            # Offset store: a local file store for broadcasting, a remote broker store otherwise
            if self.config.message_model == MessageModel.BROADCASTING:
                self.offset_store = LocalFileOffsetStore(self.config.consumer_group)
            else:
                self.offset_store = RemoteBrokerOffsetStore(self.config.consumer_group, self.client_factory)

            # Consume service matching the configured consume mode
            if self.config.consume_mode == ConsumeMode.CONCURRENTLY:
                self.consume_service = ConsumeMessageConcurrentlyService(self, self.message_listener)
            else:
                self.consume_service = ConsumeMessageOrderlyService(self, self.message_listener)

            self.pull_service = PullMessageService(self)
            self.rebalance_service = RebalanceService(self)

            # Start everything
            self.offset_store.load()
            self.consume_service.start()
            self.pull_service.start()
            self.rebalance_service.start()

            # Register with the client factory
            self.client_factory.register_consumer(self.config.consumer_group, self)

            self.state = ConsumerState.RUNNING
            self.consume_stats["start_time"] = int(time.time() * 1000)

            print(f"Push消费者启动成功: {self.config.consumer_group}")

        except Exception as e:
            # A half-started consumer must not leave service threads behind;
            # a later start() would otherwise stack new ones on top.
            self._stop_services_quietly()
            self.state = ConsumerState.CREATE_JUST
            raise ConsumerStartException(f"启动消费者失败: {e}") from e

    def _stop_services_quietly(self):
        """Best-effort shutdown of any created services after a failed start."""
        for attr in ("rebalance_service", "pull_service", "consume_service"):
            service = getattr(self, attr)
            if service is None:
                continue
            try:
                service.shutdown()
            except Exception as cleanup_error:
                # Cleanup problems must not mask the original start failure.
                print(f"清理服务异常: {cleanup_error}")
            setattr(self, attr, None)

    def shutdown(self):
        """Stop all services, persist offsets and unregister the consumer.

        Does nothing unless the consumer is ``RUNNING``.

        Raises:
            ConsumerShutdownException: If any shutdown step fails.
        """
        if self.state != ConsumerState.RUNNING:
            return

        try:
            # Stop services in reverse start order
            if self.rebalance_service:
                self.rebalance_service.shutdown()

            if self.pull_service:
                self.pull_service.shutdown()

            if self.consume_service:
                self.consume_service.shutdown()

            if self.offset_store:
                self.offset_store.persist_all()

            # Unregister from the client factory
            if self.client_factory:
                self.client_factory.unregister_consumer(self.config.consumer_group)

            self.state = ConsumerState.SHUTDOWN_ALREADY
            print(f"Push消费者关闭成功: {self.config.consumer_group}")

        except Exception as e:
            raise ConsumerShutdownException(f"关闭消费者失败: {e}") from e

    def subscribe(self, topic: str, sub_expression: str = "*"):
        """Subscribe to ``topic`` with the given expression.

        Raises:
            SubscriptionException: If the consumer is already running.
        """
        if self.state == ConsumerState.RUNNING:
            raise SubscriptionException("消费者运行时不能修改订阅")

        self.subscription_table[topic] = sub_expression
        print(f"订阅主题: {topic}, 表达式: {sub_expression}")

    def unsubscribe(self, topic: str):
        """Remove the subscription for ``topic`` if there is one.

        Raises:
            SubscriptionException: If the consumer is already running.
        """
        if self.state == ConsumerState.RUNNING:
            raise SubscriptionException("消费者运行时不能修改订阅")

        if topic in self.subscription_table:
            del self.subscription_table[topic]
            print(f"取消订阅主题: {topic}")

    def register_message_listener(self, listener: MessageListener):
        """Set the listener that will receive consumed messages."""
        self.message_listener = listener

    def get_subscription_table(self) -> Dict[str, str]:
        """Return a copy of the topic -> expression subscription table."""
        return self.subscription_table.copy()

    def update_consume_stats(self, success_count: int, failed_count: int):
        """Add the given success and failure counts to the running totals."""
        with self._stats_lock:
            self.consume_stats["total_consumed"] += success_count + failed_count
            self.consume_stats["success_consumed"] += success_count
            self.consume_stats["failed_consumed"] += failed_count

    def get_consume_stats(self) -> Dict[str, int]:
        """Return a snapshot of the statistics.

        ``running_time_ms`` is included once the consumer has been started.
        """
        with self._stats_lock:
            stats = self.consume_stats.copy()
        if stats["start_time"] > 0:
            stats["running_time_ms"] = int(time.time() * 1000) - stats["start_time"]
        return stats

4.2.2 消费消息服务

class ConsumeMessageService(ABC):
    """Contract implemented by every consume-message service."""

    @abstractmethod
    def start(self):
        """Start the service."""

    @abstractmethod
    def shutdown(self):
        """Shut the service down."""

    @abstractmethod
    def submit_consume_request(self, consume_request: 'ConsumeRequest'):
        """Hand a consume request over to the service."""

class ConsumeMessageConcurrentlyService(ConsumeMessageService):
    """Consume service that dispatches requests to a worker thread pool.

    Two background loops run on a small scheduling pool: a dispatcher that
    moves queued requests onto the worker pool, and a cleaner that drops
    requests that have waited in the queue for more than five minutes.
    """

    def __init__(self, consumer: DefaultMQPushConsumer, listener: MessageListenerConcurrently):
        """Create the service (no threads are started until ``start()``).

        Args:
            consumer: Owning consumer; supplies config, offset store and stats.
            listener: Listener invoked for every consume request.
        """
        self.consumer = consumer
        self.listener = listener
        self.consume_executor = None
        self.scheduled_executor = None
        self.running = False
        self.consume_requests_queue = []
        self.consume_requests_lock = threading.Lock()
        # Set on shutdown so the background loops wake from their waits at
        # once instead of sleeping out the full interval.
        self._stop_event = threading.Event()

    def start(self):
        """Create the thread pools and launch the dispatcher and cleaner loops."""
        from concurrent.futures import ThreadPoolExecutor

        self.consume_executor = ThreadPoolExecutor(
            max_workers=self.consumer.config.consume_thread_max,
            thread_name_prefix="ConsumeMessageThread"
        )

        self.scheduled_executor = ThreadPoolExecutor(
            max_workers=2,
            thread_name_prefix="ConsumeScheduledThread"
        )

        self._stop_event.clear()
        self.running = True

        # Dispatcher loop
        self.scheduled_executor.submit(self._process_consume_requests)

        # Expired-request cleaner loop
        self.scheduled_executor.submit(self._clean_expired_messages)

        print("并发消费服务启动成功")

    def shutdown(self):
        """Stop the background loops, then wait for in-flight consumption."""
        self.running = False
        self._stop_event.set()

        # Stop the dispatcher first so nothing is submitted to a worker pool
        # that has already been shut down.
        if self.scheduled_executor:
            self.scheduled_executor.shutdown(wait=True)

        if self.consume_executor:
            self.consume_executor.shutdown(wait=True)

        print("并发消费服务关闭成功")

    def submit_consume_request(self, consume_request: 'ConsumeRequest'):
        """Append a consume request to the pending queue."""
        with self.consume_requests_lock:
            self.consume_requests_queue.append(consume_request)

    def _process_consume_requests(self):
        """Dispatcher loop: move at most one queued request per tick onto the worker pool."""
        while self.running:
            try:
                with self.consume_requests_lock:
                    if self.consume_requests_queue:
                        request = self.consume_requests_queue.pop(0)
                        self.consume_executor.submit(self._consume_message, request)
            except Exception as e:
                print(f"处理消费请求异常: {e}")

            # Short pause to avoid burning CPU; also taken after an error so
            # a persistent failure cannot turn into a hot loop.
            self._stop_event.wait(0.01)

    def _consume_message(self, request: 'ConsumeRequest'):
        """Run the listener on one request and act on its result.

        On SUCCESS the offset is advanced; on any other result, or on an
        exception, the messages are sent back for retry. Statistics on the
        owning consumer are updated either way.
        """
        try:
            context = ConsumeContext(
                consumer_group=self.consumer.config.consumer_group,
                message_queue=request.message_queue
            )

            result = self.listener.consume_message(request.messages, context)

            if result == ConsumeResult.SUCCESS:
                self._update_offset(request)
                self.consumer.update_consume_stats(len(request.messages), 0)
            else:
                self._send_message_back(request)
                self.consumer.update_consume_stats(0, len(request.messages))

        except Exception as e:
            print(f"消费消息异常: {e}")
            self._send_message_back(request)
            self.consumer.update_consume_stats(0, len(request.messages))

    def _update_offset(self, request: 'ConsumeRequest'):
        """Advance the stored offset to one past the last message of the request."""
        if request.messages:
            last_msg = request.messages[-1]
            new_offset = last_msg.queue_offset + 1
            self.consumer.offset_store.update_offset(
                request.message_queue,
                new_offset,
                increment_only=True
            )

    def _send_message_back(self, request: 'ConsumeRequest'):
        """Bump each message's retry counter and log where it would be routed.

        The actual send-back and dead-letter delivery are not implemented here.
        """
        for msg in request.messages:
            try:
                msg.reconsume_times += 1

                if msg.reconsume_times > self.consumer.config.max_reconsume_times:
                    print(f"消息重试次数超限,发送到死信队列: {msg.msg_id}")
                    # Placeholder: deliver to the dead-letter queue here.
                else:
                    print(f"消息消费失败,发送回Broker重试: {msg.msg_id}")
                    # Placeholder: send the message back to the broker here.

            except Exception as e:
                print(f"发送消息回Broker异常: {e}")

    def _clean_expired_messages(self):
        """Cleaner loop: once a minute drop requests queued for five minutes or more."""
        while self.running:
            try:
                current_time = int(time.time() * 1000)

                with self.consume_requests_lock:
                    self.consume_requests_queue = [
                        req for req in self.consume_requests_queue
                        if current_time - req.submit_timestamp < 300000  # 5 minutes
                    ]
            except Exception as e:
                print(f"清理过期消息异常: {e}")

            # Interruptible one-minute pause: returns as soon as shutdown()
            # sets the event, so shutdown does not block for up to 60 seconds.
            self._stop_event.wait(60)

# 消费请求
@dataclass
class ConsumeRequest:
    """A batch of messages queued for consumption.

    ``submit_timestamp`` is in milliseconds; when left at 0 it is stamped with
    the current time as the request is created.
    """
    messages: List[MessageExt]
    message_queue: 'MessageQueue'
    process_queue: 'ProcessQueue'
    submit_timestamp: int = 0

    def __post_init__(self):
        # An explicit non-zero timestamp from the caller is kept.
        if self.submit_timestamp != 0:
            return
        self.submit_timestamp = int(time.time() * 1000)

4.3 Pull消费者

4.3.1 Pull消费者实现

class DefaultMQPullConsumer:
    """Default pull consumer.

    The caller drives consumption: it asks for the queues of a topic, pulls
    from a queue at an offset (blocking, non-blocking or asynchronously) and
    records the consumed offset itself.
    """

    def __init__(self, config: ConsumerConfig):
        """Create a consumer in the ``CREATE_JUST`` state.

        Args:
            config: Consumer configuration; it is validated in ``start()``.
        """
        self.config = config
        self.state = ConsumerState.CREATE_JUST
        self.client_factory: Optional['MQClientFactory'] = None
        self.offset_store: Optional['OffsetStore'] = None
        self.pull_stats = {
            "total_pulled": 0,
            "success_pulled": 0,
            "failed_pulled": 0,
            "start_time": 0
        }
        # Pulls run on caller threads and on pull_async worker threads, so the
        # counter updates are serialised.
        self._stats_lock = threading.Lock()

    def start(self):
        """Start the consumer: create the client factory and offset store, then register.

        Raises:
            ConsumerStartException: If the consumer is not in ``CREATE_JUST``,
                the configuration is invalid, or any step fails.
        """
        if self.state != ConsumerState.CREATE_JUST:
            raise ConsumerStartException(f"消费者状态错误: {self.state}")

        if not self.config.validate():
            raise ConsumerStartException("消费者配置无效")

        try:
            # Client factory
            self.client_factory = MQClientFactory(self.config.name_server_addr)

            # Offset store: a local file store for broadcasting, a remote broker store otherwise
            if self.config.message_model == MessageModel.BROADCASTING:
                self.offset_store = LocalFileOffsetStore(self.config.consumer_group)
            else:
                self.offset_store = RemoteBrokerOffsetStore(self.config.consumer_group, self.client_factory)

            self.offset_store.load()

            # Register with the client factory
            self.client_factory.register_consumer(self.config.consumer_group, self)

            self.state = ConsumerState.RUNNING
            self.pull_stats["start_time"] = int(time.time() * 1000)

            print(f"Pull消费者启动成功: {self.config.consumer_group}")

        except Exception as e:
            self.state = ConsumerState.CREATE_JUST
            raise ConsumerStartException(f"启动Pull消费者失败: {e}") from e

    def shutdown(self):
        """Persist offsets and unregister; does nothing unless ``RUNNING``.

        Raises:
            ConsumerShutdownException: If any shutdown step fails.
        """
        if self.state != ConsumerState.RUNNING:
            return

        try:
            if self.offset_store:
                self.offset_store.persist_all()

            if self.client_factory:
                self.client_factory.unregister_consumer(self.config.consumer_group)

            self.state = ConsumerState.SHUTDOWN_ALREADY
            print(f"Pull消费者关闭成功: {self.config.consumer_group}")

        except Exception as e:
            raise ConsumerShutdownException(f"关闭Pull消费者失败: {e}") from e

    def fetch_subscribe_message_queues(self, topic: str) -> List['MessageQueue']:
        """Return one ``MessageQueue`` per read queue of ``topic``.

        Returns:
            The queues built from the topic's route data, or an empty list
            when there is no route data.

        Raises:
            PullException: If the consumer is not running or the lookup fails.
        """
        if self.state != ConsumerState.RUNNING:
            raise PullException("消费者未运行")

        try:
            route_data = self.client_factory.get_topic_route_info(topic)

            if not route_data or not route_data.queue_datas:
                return []

            return [
                MessageQueue(topic, queue_data.broker_name, queue_id)
                for queue_data in route_data.queue_datas
                for queue_id in range(queue_data.read_queue_nums)
            ]

        except Exception as e:
            raise PullException(f"获取消息队列失败: {e}") from e

    def pull_block_if_not_found(self, mq: 'MessageQueue', sub_expression: str,
                               offset: int, max_nums: int, timeout: int = 20000) -> PullResult:
        """Pull messages, passing ``timeout`` (ms) as the suspend timeout of the request."""
        return self._pull_sync_impl(mq, sub_expression, offset, max_nums, timeout, True)

    def pull_no_block(self, mq: 'MessageQueue', sub_expression: str,
                     offset: int, max_nums: int) -> PullResult:
        """Pull messages with a suspend timeout of 0."""
        return self._pull_sync_impl(mq, sub_expression, offset, max_nums, 0, False)

    def pull_async(self, mq: 'MessageQueue', sub_expression: str, offset: int,
                  max_nums: int, callback: 'PullCallback', timeout: int = 20000):
        """Pull on a daemon thread and report the outcome through ``callback``.

        ``callback.on_success`` receives the result; ``callback.on_exception``
        receives any exception raised by the pull or by ``on_success``.

        Raises:
            PullException: If the consumer is not running.
        """
        if self.state != ConsumerState.RUNNING:
            raise PullException("消费者未运行")

        def async_pull():
            # Statistics are recorded once, inside _pull_sync_impl; counting
            # again here would double both the success and failure numbers.
            try:
                result = self._pull_sync_impl(mq, sub_expression, offset, max_nums, timeout, True)
                callback.on_success(result)
            except Exception as e:
                callback.on_exception(e)

        threading.Thread(target=async_pull, daemon=True).start()

    def _pull_sync_impl(self, mq: 'MessageQueue', sub_expression: str,
                       offset: int, max_nums: int, timeout: int, block: bool) -> PullResult:
        """Send one pull request and return the parsed result.

        Args:
            mq: Queue to pull from.
            sub_expression: Subscription expression.
            offset: Queue offset to start from.
            max_nums: Number of messages to request; must be 1..32.
            timeout: Suspend timeout in milliseconds, used only when ``block``.
            block: Whether the request carries the suspend timeout.

        Raises:
            PullException: If the consumer is not running, the arguments are
                invalid, no broker address is found, or the request fails.
        """
        if self.state != ConsumerState.RUNNING:
            raise PullException("消费者未运行")

        try:
            if max_nums <= 0 or max_nums > 32:
                raise PullException("拉取消息数量必须在1-32之间")

            broker_addr = self.client_factory.find_broker_address_in_subscribe(
                mq.broker_name, 0, False
            )

            if not broker_addr:
                raise PullException(f"找不到Broker地址: {mq.broker_name}")

            pull_request = PullMessageRequest(
                consumer_group=self.config.consumer_group,
                topic=mq.topic,
                queue_id=mq.queue_id,
                queue_offset=offset,
                max_msg_nums=max_nums,
                sys_flag=0,
                commit_offset=offset,
                suspend_timeout_millis=timeout if block else 0,
                subscription=sub_expression,
                sub_version=int(time.time() * 1000)
            )

            response = self.client_factory.send_pull_request(broker_addr, pull_request)
            result = self._parse_pull_response(response)

            pulled = len(result.msg_found_list)
            with self._stats_lock:
                self.pull_stats["total_pulled"] += pulled
                self.pull_stats["success_pulled"] += pulled

            return result

        except Exception as e:
            with self._stats_lock:
                self.pull_stats["failed_pulled"] += 1
            raise PullException(f"拉取消息失败: {e}") from e

    def _parse_pull_response(self, response: 'PullMessageResponse') -> PullResult:
        """Translate a broker response into a ``PullResult``.

        Codes 0, 1 and 2 map to FOUND, NO_NEW_MSG and NO_MATCHED_MSG; any
        other code maps to OFFSET_ILLEGAL. Messages are attached only for
        FOUND.
        """
        status_by_code = {
            0: PullStatus.FOUND,           # SUCCESS
            1: PullStatus.NO_NEW_MSG,      # PULL_NOT_FOUND
            2: PullStatus.NO_MATCHED_MSG,  # PULL_RETRY_IMMEDIATELY
        }
        status = status_by_code.get(response.code, PullStatus.OFFSET_ILLEGAL)
        # None lets PullResult substitute its own empty list.
        found = (response.messages or []) if status == PullStatus.FOUND else None
        return PullResult(
            pull_status=status,
            next_begin_offset=response.next_begin_offset,
            min_offset=response.min_offset,
            max_offset=response.max_offset,
            msg_found_list=found
        )

    def update_consume_offset(self, mq: 'MessageQueue', offset: int):
        """Record ``offset`` for ``mq`` in the offset store, if one exists."""
        if self.offset_store:
            self.offset_store.update_offset(mq, offset, increment_only=False)

    def fetch_consume_offset(self, mq: 'MessageQueue', from_store: bool = True) -> int:
        """Return the stored offset for ``mq``, or -1 when there is no offset store."""
        if self.offset_store:
            return self.offset_store.read_offset(mq, from_store)
        return -1

    def get_pull_stats(self) -> Dict[str, int]:
        """Return a snapshot of the statistics.

        ``running_time_ms`` is included once the consumer has been started.
        """
        with self._stats_lock:
            stats = self.pull_stats.copy()
        if stats["start_time"] > 0:
            stats["running_time_ms"] = int(time.time() * 1000) - stats["start_time"]
        return stats

# 拉取回调接口
class PullCallback(ABC):
    """Callback contract for asynchronous pulls."""

    @abstractmethod
    def on_success(self, pull_result: PullResult):
        """Handle the result of a pull that completed."""

    @abstractmethod
    def on_exception(self, exception: Exception):
        """Handle the exception raised by a pull that failed."""

# 拉取消息请求
@dataclass
class PullMessageRequest:
    """Parameters of a single pull request sent to a broker."""
    consumer_group: str  # consumer group issuing the pull
    topic: str  # topic of the target queue
    queue_id: int  # id of the target queue
    queue_offset: int  # offset to start pulling from
    max_msg_nums: int  # maximum number of messages requested
    sys_flag: int  # NOTE(review): the visible caller always passes 0; confirm flag semantics
    commit_offset: int  # the visible caller passes the same value as queue_offset
    suspend_timeout_millis: int  # the visible caller passes its timeout when blocking, 0 otherwise
    subscription: str  # subscription expression, e.g. "*"
    sub_version: int  # the visible caller passes the current time in milliseconds

# 拉取消息响应
@dataclass
class PullMessageResponse:
    """Response to a pull request: a result code, offsets and any messages.

    ``messages`` may be omitted; it then becomes a fresh empty list.
    """
    code: int
    next_begin_offset: int
    min_offset: int
    max_offset: int
    messages: List[MessageExt] = None

    def __post_init__(self):
        # A caller-supplied list is kept untouched.
        if self.messages is not None:
            return
        self.messages = []

4.3.2 Pull消费者使用示例

# Pull消费者使用示例
class PullConsumerExample:
    """Usage example: one pulling thread per queue of a topic."""

    def __init__(self):
        # Build the configuration first, then the consumer that uses it.
        example_config = ConsumerConfig(
            consumer_group="pull_consumer_group",
            name_server_addr="localhost:9876",
            message_model=MessageModel.CLUSTERING
        )
        self.config = example_config
        self.consumer = DefaultMQPullConsumer(example_config)

    def start_consume(self):
        """Start the consumer, launch a daemon worker per queue, then print stats every 10 s.

        Runs until interrupted; the consumer is always shut down on exit.
        """
        try:
            self.consumer.start()

            topic = "TestTopic"
            message_queues = self.consumer.fetch_subscribe_message_queues(topic)

            print(f"获取到 {len(message_queues)} 个消息队列")

            # One daemon worker per queue
            workers = [
                threading.Thread(target=self._consume_from_queue, args=(mq,), daemon=True)
                for mq in message_queues
            ]
            for worker in workers:
                worker.start()

            # The main thread only reports statistics
            while True:
                time.sleep(10)
                snapshot = self.consumer.get_pull_stats()
                print(f"拉取统计: {snapshot}")

        except KeyboardInterrupt:
            print("收到中断信号,正在关闭消费者...")
        finally:
            self.consumer.shutdown()

    def _consume_from_queue(self, mq: 'MessageQueue'):
        """Pull from ``mq`` forever, starting at the stored offset (or 0 if negative)."""
        print(f"开始消费队列: {mq}")

        next_offset = max(self.consumer.fetch_consume_offset(mq, from_store=True), 0)

        while True:
            try:
                pull_result = self.consumer.pull_block_if_not_found(
                    mq=mq,
                    sub_expression="*",
                    offset=next_offset,
                    max_nums=32,
                    timeout=20000
                )
                status = pull_result.pull_status

                if status == PullStatus.FOUND:
                    for msg in pull_result.msg_found_list:
                        self._process_message(msg)

                    # Advance and record the offset after handling the batch
                    next_offset = pull_result.next_begin_offset
                    self.consumer.update_consume_offset(mq, next_offset)
                    continue

                if status == PullStatus.NO_MATCHED_MSG:
                    # Nothing matched: skip ahead and pull again immediately
                    next_offset = pull_result.next_begin_offset
                    continue

                # The remaining statuses log and back off for a second
                if status == PullStatus.NO_NEW_MSG:
                    print(f"队列 {mq} 暂无新消息")
                else:
                    print(f"拉取消息失败: {pull_result.pull_status}")
                time.sleep(1)

            except Exception as e:
                print(f"消费队列 {mq} 异常: {e}")
                time.sleep(5)

    def _process_message(self, msg: MessageExt):
        """Print the message; errors are logged and not propagated."""
        try:
            print(f"处理消息: Topic={msg.topic}, Tags={msg.tags}, Body={msg.body.decode('utf-8')}")

            # Business logic goes here.

        except Exception as e:
            print(f"处理消息异常: {msg.msg_id}, 错误: {e}")

# 异步拉取回调实现
class AsyncPullCallback(PullCallback):
    """Callback that processes pulled messages and keeps the async pull chain going.

    Attributes:
        consumer: Pull consumer used to issue follow-up pulls and store offsets.
        mq: Queue this callback pulls from.
        processed_count: Number of messages processed so far.
    """

    def __init__(self, consumer: DefaultMQPullConsumer, mq: 'MessageQueue'):
        self.consumer = consumer
        self.mq = mq
        self.processed_count = 0

    def on_success(self, pull_result: PullResult):
        """Process a FOUND batch, record the offset, then request the next batch.

        Errors raised here are logged and not propagated.
        """
        try:
            if pull_result.pull_status == PullStatus.FOUND:
                for msg in pull_result.msg_found_list:
                    self._process_message(msg)
                    self.processed_count += 1

                self.consumer.update_consume_offset(self.mq, pull_result.next_begin_offset)

                print(f"异步处理 {len(pull_result.msg_found_list)} 条消息")

            # Whatever the status, continue from the offset the result suggests
            self._continue_pull(pull_result.next_begin_offset)

        except Exception as e:
            print(f"处理拉取结果异常: {e}")

    def on_exception(self, exception: Exception):
        """Log the failure, wait a second, then retry from the stored offset.

        Errors raised by the retry itself (for example because the consumer
        is no longer running) are logged instead of escaping into the worker
        thread, matching how ``on_success`` handles them.
        """
        print(f"异步拉取异常: {exception}")

        # Back off before retrying
        time.sleep(1)
        try:
            offset = self.consumer.fetch_consume_offset(self.mq, from_store=True)
            self._continue_pull(offset if offset >= 0 else 0)
        except Exception as e:
            print(f"重试拉取异常: {e}")

    def _process_message(self, msg: MessageExt):
        """Handle one message (placeholder for business logic)."""
        print(f"异步处理消息: {msg.msg_id}")

    def _continue_pull(self, offset: int):
        """Issue the next asynchronous pull starting at ``offset``."""
        self.consumer.pull_async(
            mq=self.mq,
            sub_expression="*",
            offset=offset,
            max_nums=32,
            callback=self,
            timeout=20000
        )

# 使用示例
if __name__ == "__main__":
    # Script entry point: run the pull-consumer example until interrupted.
    example = PullConsumerExample()
    example.start_consume()

4.4 消费者最佳实践

4.4.1 消费幂等性

class IdempotentMessageProcessor:
    """In-memory, lock-protected record of message ids already processed.

    Entries map ``msg_id`` to the millisecond timestamp at which they were
    marked. The cache is bounded: whenever it grows beyond ``cache_size``,
    expired entries are dropped first and, if that is not enough, the
    oldest remaining entries are evicted until the bound holds again.

    Args:
        cache_size: Maximum number of message ids kept in the cache.
        expire_ms: Age in milliseconds after which an entry counts as
            expired during cleanup (default: one hour).
    """

    def __init__(self, cache_size: int = 10000, expire_ms: int = 3600000):
        # msg_id -> mark time in ms; kept in oldest-first insertion order.
        self.processed_messages = {}
        self.cache_size = cache_size
        self.expire_ms = expire_ms
        self.lock = threading.Lock()

    def is_processed(self, msg_id: str) -> bool:
        """Return True if ``msg_id`` is currently recorded as processed."""
        with self.lock:
            return msg_id in self.processed_messages

    def mark_processed(self, msg_id: str):
        """Record ``msg_id`` as processed now and trim the cache if it overflows."""
        with self.lock:
            # Remove before re-inserting so a re-marked id moves to the end;
            # dict order then always matches mark-time order, which the
            # oldest-first eviction in _cleanup_cache relies on.
            self.processed_messages.pop(msg_id, None)
            self.processed_messages[msg_id] = int(time.time() * 1000)

            if len(self.processed_messages) > self.cache_size:
                self._cleanup_cache()

    def _cleanup_cache(self):
        """Drop expired entries, then evict oldest ones until within ``cache_size``.

        Must be called with ``self.lock`` held.
        """
        current_time = int(time.time() * 1000)
        expired_keys = [
            msg_id for msg_id, timestamp in self.processed_messages.items()
            if current_time - timestamp > self.expire_ms
        ]

        for key in expired_keys:
            del self.processed_messages[key]

        # Expiry alone cannot bound the cache under sustained traffic (every
        # entry may be fresh), so enforce the size limit by evicting from the
        # front of the dict, i.e. the least recently marked ids.
        while self.processed_messages and len(self.processed_messages) > self.cache_size:
            oldest = next(iter(self.processed_messages))
            del self.processed_messages[oldest]

class IdempotentMessageListener(MessageListenerConcurrently):
    """Concurrent listener that skips messages already recorded as processed.

    The business callable is invoked once per unseen message and must return
    a truthy value on success; only then is the message id recorded.
    """

    def __init__(self, business_processor: Callable[[MessageExt], bool]):
        self.idempotent_processor = IdempotentMessageProcessor()
        self.business_processor = business_processor

    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Consume a batch, stopping at the first failure.

        Returns ``ConsumeResult.RECONSUME_LATER`` as soon as one message
        fails or raises, otherwise ``ConsumeResult.SUCCESS``.
        """
        for msg in messages:
            try:
                handled = self._consume_one(msg)
            except Exception as e:
                print(f"处理消息异常: {msg.msg_id}, 错误: {e}")
                return ConsumeResult.RECONSUME_LATER

            if not handled:
                return ConsumeResult.RECONSUME_LATER

        return ConsumeResult.SUCCESS

    def _consume_one(self, msg: MessageExt) -> bool:
        """Process one message; return False only when the business call fails."""
        # Already-seen messages count as handled without re-running business logic.
        if self.idempotent_processor.is_processed(msg.msg_id):
            print(f"消息已处理,跳过: {msg.msg_id}")
            return True

        if self.business_processor(msg):
            # Record the id only after the business logic reported success.
            self.idempotent_processor.mark_processed(msg.msg_id)
            return True

        print(f"业务处理失败: {msg.msg_id}")
        return False

4.4.2 消费限流

class RateLimitedMessageListener(MessageListenerConcurrently):
    """Concurrent listener that throttles consumption with a token bucket.

    The bucket holds at most ``max_rate`` tokens and refills at ``max_rate``
    tokens per second. One token is spent per message; a batch that cannot
    obtain its tokens is returned as ``RECONSUME_LATER`` after a short pause.

    Args:
        business_processor: Callable invoked per message; must return a
            truthy value on success.
        max_rate: Maximum number of messages processed per second.
    """

    def __init__(self, business_processor: Callable[[MessageExt], bool], 
                 max_rate: int = 100):
        self.business_processor = business_processor
        self.max_rate = max_rate  # bucket capacity and refill rate (tokens/second)
        self.tokens = max_rate    # the bucket starts full
        # Monotonic clock: a wall-clock adjustment must not stall or burst
        # the refill computation.
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Consume a batch if the token bucket allows it.

        Returns ``ConsumeResult.RECONSUME_LATER`` when the batch is throttled
        or when any message fails or raises; otherwise
        ``ConsumeResult.SUCCESS``.
        """
        # Check the token bucket before touching any message.
        if not self._acquire_tokens(len(messages)):
            print(f"触发限流,延迟消费 {len(messages)} 条消息")
            # Brief pause so a throttled batch is not handed back immediately.
            time.sleep(0.1)
            return ConsumeResult.RECONSUME_LATER

        # Process the messages; stop at the first failure.
        for msg in messages:
            try:
                success = self.business_processor(msg)
                if not success:
                    return ConsumeResult.RECONSUME_LATER
            except Exception as e:
                print(f"处理消息异常: {msg.msg_id}, 错误: {e}")
                return ConsumeResult.RECONSUME_LATER

        return ConsumeResult.SUCCESS

    def _acquire_tokens(self, count: int) -> bool:
        """Try to take ``count`` tokens; return True if the batch may proceed.

        A batch larger than the bucket capacity is admitted once the bucket
        is full and leaves the balance negative, so the following batches
        wait until the debt is refilled. Without this, such a batch could
        never be admitted because the balance is capped at ``max_rate``.
        """
        if count <= 0:
            # An empty batch needs no tokens.
            return True

        with self.lock:
            now = time.monotonic()

            # Refill according to the time elapsed since the last refill.
            elapsed = now - self.last_refill
            if elapsed > 0:
                self.tokens = min(self.max_rate, self.tokens + elapsed * self.max_rate)
                self.last_refill = now

            # Oversized batches only need a full bucket, not ``count`` tokens.
            required = count
            if 0 < self.max_rate < count:
                required = self.max_rate

            if self.tokens >= required:
                # May go negative for an oversized batch (debt).
                self.tokens -= count
                return True

            return False

4.4.3 消费监控

class MonitoringMessageListener(MessageListenerConcurrently):
    """Concurrent listener that records consumption metrics per batch.

    Counters are updated once per ``consume_message`` call; a batch is
    counted as entirely successful or entirely failed. The average
    processing time is taken over the most recent 100 batches.
    """

    def __init__(self, business_processor: Callable[[MessageExt], bool]):
        self.business_processor = business_processor
        self.lock = threading.Lock()
        # Per-batch durations in seconds, most recent last.
        self.process_times = []
        self.metrics = dict.fromkeys(
            (
                "total_consumed",
                "success_consumed",
                "failed_consumed",
                "avg_process_time",
                "last_consume_time",
            ),
            0,
        )

    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Run the business callable over the batch and record the outcome.

        Processing stops at the first message whose callable returns a falsy
        value or raises; the whole batch is then recorded as failed and
        ``ConsumeResult.RECONSUME_LATER`` is returned.
        """
        began = time.time()

        try:
            # all() stops at the first falsy result, like an early return.
            batch_ok = all(self.business_processor(msg) for msg in messages)
            succeeded = len(messages) if batch_ok else 0
            self._update_metrics(len(messages), succeeded, time.time() - began)
            return ConsumeResult.SUCCESS if batch_ok else ConsumeResult.RECONSUME_LATER

        except Exception as e:
            print(f"消费消息异常: {e}")
            self._update_metrics(len(messages), 0, time.time() - began)
            return ConsumeResult.RECONSUME_LATER

    def _update_metrics(self, total: int, success: int, process_time: float):
        """Fold one batch result into the counters and the rolling average."""
        with self.lock:
            stats = self.metrics
            stats["total_consumed"] += total
            stats["success_consumed"] += success
            stats["failed_consumed"] += (total - success)
            stats["last_consume_time"] = int(time.time() * 1000)

            # Keep only the latest 100 batch durations for the average.
            window = self.process_times
            window.append(process_time)
            if len(window) > 100:
                del window[0]

            stats["avg_process_time"] = sum(window) / len(window)

    def get_metrics(self) -> Dict[str, float]:
        """Return a snapshot copy of the current metrics."""
        with self.lock:
            return dict(self.metrics)

    def print_metrics(self):
        """Print the current metrics to standard output."""
        snapshot = self.get_metrics()
        print("消费监控指标:")
        print(f"  总消费数: {snapshot['total_consumed']}")
        print(f"  成功消费数: {snapshot['success_consumed']}")
        print(f"  失败消费数: {snapshot['failed_consumed']}")
        print(f"  平均处理时间: {snapshot['avg_process_time']:.3f}s")
        print(f"  最后消费时间: {snapshot['last_consume_time']}")

4.5 本章总结

4.5.1 核心知识点

  1. 消费者类型

    • Push消费者:服务端主动推送消息
    • Pull消费者:客户端主动拉取消息
  2. 消费模式

    • 并发消费:多线程并发处理消息
    • 顺序消费:单线程顺序处理消息
  3. 消息模型

    • 集群模式:消费者组内负载均衡
    • 广播模式:每个消费者都消费所有消息
  4. 消费最佳实践

    • 幂等性处理
    • 限流控制
    • 监控告警

4.5.2 最佳实践

  1. 选择合适的消费者类型
  2. 实现消费幂等性
  3. 合理设置消费参数
  4. 添加监控和告警
  5. 处理消费异常

4.5.3 练习题

  1. 实现一个支持批量消费的消息监听器
  2. 设计一个消费者负载均衡算法
  3. 实现消费者的健康检查机制
  4. 开发一个消费延迟监控工具
  5. 设计消费者的故障恢复策略