4.1 消费者基础
4.1.1 消费者核心概念
import threading
import time
from abc import ABC, abstractmethod
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Dict, List, Optional
# Consumer lifecycle states.
class ConsumerState(Enum):
    """Lifecycle state of a consumer instance.

    The consumer classes shown here go CREATE_JUST -> RUNNING on a successful
    ``start()``, RUNNING -> SHUTDOWN_ALREADY on ``shutdown()``, and fall back
    to CREATE_JUST when ``start()`` fails.
    """

    CREATE_JUST = "CREATE_JUST"            # constructed, not started yet
    RUNNING = "RUNNING"                    # start() completed
    SHUTDOWN_ALREADY = "SHUTDOWN_ALREADY"  # shutdown() completed
    SERVICE_STATE_OK = "SERVICE_STATE_OK"  # not used by the consumer classes shown here
# Consume mode.
class ConsumeMode(Enum):
    """How a push consumer hands messages to its listener."""

    CONCURRENTLY = "CONCURRENTLY"  # concurrent consumption (ConsumeMessageConcurrentlyService)
    ORDERLY = "ORDERLY"            # ordered consumption
# Message model.
class MessageModel(Enum):
    """Delivery model of a consumer group.

    The consumers shown here use LocalFileOffsetStore for BROADCASTING and
    RemoteBrokerOffsetStore for CLUSTERING.
    """

    BROADCASTING = "BROADCASTING"  # broadcast mode
    CLUSTERING = "CLUSTERING"      # cluster (load-balanced) mode
# Where consumption starts.
class ConsumeFromWhere(Enum):
    """Starting position for a consumer group.

    NOTE(review): the code that acts on this setting is not in this file;
    presumably it only applies when no committed offset exists — confirm.
    """

    CONSUME_FROM_LAST_OFFSET = "CONSUME_FROM_LAST_OFFSET"    # ConsumerConfig default
    CONSUME_FROM_FIRST_OFFSET = "CONSUME_FROM_FIRST_OFFSET"
    CONSUME_FROM_TIMESTAMP = "CONSUME_FROM_TIMESTAMP"        # see ConsumerConfig.consume_timestamp
# Consume result.
class ConsumeResult(Enum):
    """Outcome a listener reports for a whole batch."""

    SUCCESS = "SUCCESS"                  # push consumer advances the queue offset
    RECONSUME_LATER = "RECONSUME_LATER"  # push consumer sends the batch back for retry
# Pull status.
class PullStatus(Enum):
    """Status of a pull, derived from the broker response code by
    ``DefaultMQPullConsumer._parse_pull_response``."""

    FOUND = "FOUND"                    # response code 0: messages returned
    NO_NEW_MSG = "NO_NEW_MSG"          # response code 1
    NO_MATCHED_MSG = "NO_MATCHED_MSG"  # response code 2
    OFFSET_ILLEGAL = "OFFSET_ILLEGAL"  # any other response code
# Message as received by a consumer, with broker-side metadata.
@dataclass
class MessageExt:
    """A consumed message plus the metadata attached to it.

    ``properties`` defaults to a fresh per-instance dict. Passing
    ``properties=None`` explicitly is still accepted and normalised to ``{}``
    for backward compatibility.
    """

    topic: str
    tags: str
    keys: str
    body: bytes
    msg_id: str
    queue_id: int
    queue_offset: int         # position in the queue; the next offset is queue_offset + 1
    born_timestamp: int
    store_timestamp: int
    reconsume_times: int = 0  # incremented each time the message is sent back for retry
    properties: Dict[str, str] = field(default_factory=dict)

    def __post_init__(self):
        # Callers may still pass properties=None explicitly.
        if self.properties is None:
            self.properties = {}

    def get_property(self, key: str) -> Optional[str]:
        """Return the value of property *key*, or ``None`` if it is not set."""
        return self.properties.get(key)

    def put_property(self, key: str, value: str):
        """Set property *key* to *value*, replacing any previous value."""
        self.properties[key] = value
# Context passed to a listener with each batch.
@dataclass
class ConsumeContext:
    """Per-batch context handed to ``MessageListener.consume_message``."""

    consumer_group: str
    message_queue: 'MessageQueue'
    ack_index: int = -1
    delay_level_when_next_consume: int = 0

    def get_delay_level_when_next_consume(self) -> int:
        """Return ``delay_level_when_next_consume``."""
        level = self.delay_level_when_next_consume
        return level

    def set_delay_level_when_next_consume(self, delay_level: int):
        """Store *delay_level* in ``delay_level_when_next_consume``."""
        setattr(self, "delay_level_when_next_consume", delay_level)
# Result of a single pull.
@dataclass
class PullResult:
    """Outcome of one pull: status, offsets from the broker, and any messages.

    ``msg_found_list`` defaults to a fresh per-instance list. Passing ``None``
    explicitly is still normalised to ``[]`` for backward compatibility.
    """

    pull_status: PullStatus
    next_begin_offset: int  # offset to use for the next pull
    min_offset: int
    max_offset: int
    msg_found_list: List[MessageExt] = field(default_factory=list)

    def __post_init__(self):
        # Callers may still pass msg_found_list=None explicitly.
        if self.msg_found_list is None:
            self.msg_found_list = []
# Consumer configuration.
@dataclass
class ConsumerConfig:
    """Settings shared by the push and pull consumers."""

    consumer_group: str
    name_server_addr: str
    consume_mode: ConsumeMode = ConsumeMode.CONCURRENTLY
    message_model: MessageModel = MessageModel.CLUSTERING
    consume_from_where: ConsumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
    consume_timestamp: str = "20231201080000"  # yyyyMMddHHmmss, checked for CONSUME_FROM_TIMESTAMP
    max_reconsume_times: int = 16  # beyond this, failed messages take the dead-letter path
    consume_timeout: int = 15      # minutes
    consume_thread_min: int = 20
    consume_thread_max: int = 64   # worker pool size of the concurrent consume service
    adjust_thread_pool_nums_threshold: int = 100000
    pull_interval: int = 0         # milliseconds
    pull_batch_size: int = 32
    consume_message_batch_max_size: int = 1
    post_subscription_when_pull: bool = False
    unit_name: str = ""
    max_cached_message_count: int = 1000
    max_cached_message_size_in_mib: int = 512

    def validate(self) -> bool:
        """Return ``True`` when the configuration is usable.

        Returns ``False`` (it never raises) unless all of these hold:

        * ``consumer_group`` and ``name_server_addr`` are non-empty;
        * both thread counts are positive and min <= max;
        * ``pull_batch_size``, ``consume_message_batch_max_size`` and
          ``consume_timeout`` are positive and ``pull_interval`` is not negative;
        * both cache limits are positive;
        * with CONSUME_FROM_TIMESTAMP, ``consume_timestamp`` parses as
          ``yyyyMMddHHmmss``.
        """
        from datetime import datetime

        if not self.consumer_group or not self.name_server_addr:
            return False
        if self.consume_thread_min <= 0 or self.consume_thread_max <= 0:
            return False
        if self.consume_thread_min > self.consume_thread_max:
            return False
        # Zero/negative values here make pulling or dispatching impossible.
        if self.pull_batch_size <= 0 or self.consume_message_batch_max_size <= 0:
            return False
        if self.pull_interval < 0 or self.consume_timeout <= 0:
            return False
        if self.max_cached_message_count <= 0 or self.max_cached_message_size_in_mib <= 0:
            return False
        if self.consume_from_where == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP:
            try:
                datetime.strptime(self.consume_timestamp, "%Y%m%d%H%M%S")
            except (TypeError, ValueError):
                return False
        return True
# Consumer exception hierarchy.
class ConsumerException(Exception):
    """Base class of the consumer errors defined below."""


class ConsumerStartException(ConsumerException):
    """A consumer could not be started."""


class ConsumerShutdownException(ConsumerException):
    """A consumer could not be shut down cleanly."""


class SubscriptionException(ConsumerException):
    """A subscription change was rejected."""


class ConsumeException(ConsumerException):
    """Consuming a message failed."""


class PullException(ConsumerException):
    """Pulling messages failed."""
4.1.2 消息监听器接口
# Message listener interface.
class MessageListener(ABC):
    """Base interface for objects that consume batches of messages."""

    @abstractmethod
    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Consume *messages* and report one outcome for the whole batch."""
        ...
# Concurrent message listener.
class MessageListenerConcurrently(MessageListener):
    """Listener for concurrent consumption; batches may run in parallel."""

    @abstractmethod
    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Consume *messages* concurrently and report the batch outcome."""
        ...
# Orderly message listener.
class MessageListenerOrderly(MessageListener):
    """Listener for ordered consumption."""

    @abstractmethod
    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Consume *messages* in order and report the batch outcome."""
        ...
# Example listener implementation.
class SimpleMessageListener(MessageListenerConcurrently):
    """Concurrent listener that delegates to a handler and counts outcomes.

    The concurrent consume service calls listeners from a thread pool, so the
    counters are updated under a lock.
    """

    def __init__(self, handler: Optional[Callable[[List[MessageExt]], ConsumeResult]] = None):
        """
        Args:
            handler: called with each batch; should return a ``ConsumeResult``.
                Defaults to a handler that prints every message and succeeds.
        """
        self.handler = handler or self._default_handler
        self.consume_count = 0
        self.success_count = 0
        self.failed_count = 0
        self._stats_lock = threading.Lock()

    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Run the handler on *messages* and record the outcome.

        A handler exception is logged and reported as RECONSUME_LATER. Any
        result other than SUCCESS counts the whole batch as failed.
        """
        batch_size = len(messages)
        with self._stats_lock:
            self.consume_count += batch_size
        try:
            result = self.handler(messages)
        except Exception as e:
            print(f"消费消息异常: {e}")
            result = ConsumeResult.RECONSUME_LATER
        with self._stats_lock:
            if result == ConsumeResult.SUCCESS:
                self.success_count += batch_size
            else:
                self.failed_count += batch_size
        return result

    def _default_handler(self, messages: List[MessageExt]) -> ConsumeResult:
        """Print every message and report success.

        The body is decoded with ``errors='replace'`` so a non-UTF-8 payload
        does not raise and get the batch retried forever.
        """
        for msg in messages:
            print(f"消费消息: Topic={msg.topic}, Tags={msg.tags}, Body={msg.body.decode('utf-8', errors='replace')}")
        return ConsumeResult.SUCCESS

    def get_statistics(self) -> Dict[str, int]:
        """Return a consistent snapshot of the consume counters."""
        with self._stats_lock:
            return {
                "consume_count": self.consume_count,
                "success_count": self.success_count,
                "failed_count": self.failed_count
            }
# Business message listener.
class BusinessMessageListener(MessageListenerConcurrently):
    """Concurrent listener that runs a per-message business handler.

    Messages whose ``reconsume_times`` has reached ``max_retry_times`` are
    logged and skipped. A falsy handler result or an exception makes the whole
    batch RECONSUME_LATER. Earlier messages in the same batch that already
    succeeded are then presumably redelivered too — handlers should be
    idempotent.
    """

    def __init__(self, business_handler: Callable[[MessageExt], bool], max_retry_times: int = 3):
        """
        Args:
            business_handler: returns truthy on success, falsy to request a retry.
            max_retry_times: reconsume count at which a message is dropped
                (default 3, the previously hard-coded limit).
        """
        self.business_handler = business_handler
        self.max_retry_times = max_retry_times
        # Kept for backward compatibility; this class never reads or writes it.
        self.retry_times = {}

    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Handle each message in order; stop at the first failure."""
        for msg in messages:
            try:
                # Drop messages that have already been retried too often.
                if msg.reconsume_times >= self.max_retry_times:
                    print(f"消息重试次数过多,丢弃消息: {msg.msg_id}")
                    continue
                success = self.business_handler(msg)
                if not success:
                    print(f"业务处理失败,消息将重试: {msg.msg_id}")
                    return ConsumeResult.RECONSUME_LATER
            except Exception as e:
                print(f"处理消息异常: {msg.msg_id}, 错误: {e}")
                return ConsumeResult.RECONSUME_LATER
        return ConsumeResult.SUCCESS
4.2 Push消费者
4.2.1 Push消费者实现
class DefaultMQPushConsumer:
    """Push-style consumer: pulls internally and feeds a registered listener.

    Typical use: construct with a ``ConsumerConfig``, call ``subscribe()`` and
    ``register_message_listener()``, then ``start()``; call ``shutdown()`` to stop.
    """

    def __init__(self, config: ConsumerConfig):
        self.config = config
        self.state = ConsumerState.CREATE_JUST
        self.subscription_table: Dict[str, str] = {}  # topic -> subscription expression
        self.message_listener: Optional[MessageListener] = None
        self.consume_service: Optional['ConsumeMessageService'] = None
        self.pull_service: Optional['PullMessageService'] = None
        self.rebalance_service: Optional['RebalanceService'] = None
        self.offset_store: Optional['OffsetStore'] = None
        self.client_factory: Optional['MQClientFactory'] = None
        self.consume_stats = {
            "total_consumed": 0,
            "success_consumed": 0,
            "failed_consumed": 0,
            "start_time": 0
        }
        # update_consume_stats() is called from consume worker threads.
        self._stats_lock = threading.Lock()

    def start(self):
        """Create and start all internal services, then register the consumer.

        Raises:
            ConsumerStartException: if the consumer is not in CREATE_JUST, the
                config is invalid, no listener is registered, or any step fails.
                On failure every service that had already started is shut down
                again and the state reverts to CREATE_JUST.
        """
        if self.state != ConsumerState.CREATE_JUST:
            raise ConsumerStartException(f"消费者状态错误: {self.state}")
        if not self.config.validate():
            raise ConsumerStartException("消费者配置无效")
        if not self.message_listener:
            raise ConsumerStartException("未设置消息监听器")
        started = []  # services whose start() succeeded, for rollback
        try:
            self._create_services()
            self.offset_store.load()
            for service in (self.consume_service, self.pull_service, self.rebalance_service):
                service.start()
                started.append(service)
            self.client_factory.register_consumer(self.config.consumer_group, self)
            self.state = ConsumerState.RUNNING
            self.consume_stats["start_time"] = int(time.time() * 1000)
            print(f"Push消费者启动成功: {self.config.consumer_group}")
        except Exception as e:
            # Roll back in reverse start order so no worker threads leak.
            self._run_all([lambda s=s: s.shutdown() for s in reversed(started)])
            self.state = ConsumerState.CREATE_JUST
            raise ConsumerStartException(f"启动消费者失败: {e}") from e

    def _create_services(self):
        """Instantiate the client factory, offset store and internal services."""
        self.client_factory = MQClientFactory(self.config.name_server_addr)
        # BROADCASTING uses a local offset store; CLUSTERING uses the broker one.
        if self.config.message_model == MessageModel.BROADCASTING:
            self.offset_store = LocalFileOffsetStore(self.config.consumer_group)
        else:
            self.offset_store = RemoteBrokerOffsetStore(self.config.consumer_group, self.client_factory)
        if self.config.consume_mode == ConsumeMode.CONCURRENTLY:
            self.consume_service = ConsumeMessageConcurrentlyService(self, self.message_listener)
        else:
            self.consume_service = ConsumeMessageOrderlyService(self, self.message_listener)
        self.pull_service = PullMessageService(self)
        self.rebalance_service = RebalanceService(self)

    @staticmethod
    def _run_all(actions):
        """Call every action in order even if some raise; return the first error or None."""
        first_error = None
        for action in actions:
            try:
                action()
            except Exception as e:
                if first_error is None:
                    first_error = e
        return first_error

    def shutdown(self):
        """Stop the consumer; does nothing unless it is RUNNING.

        Stops the rebalance, pull and consume services (in that order), then
        persists offsets and unregisters the consumer. Every step is attempted
        even if an earlier one fails, so offsets are still persisted.

        Raises:
            ConsumerShutdownException: wrapping the first failure. The state
                then stays RUNNING so ``shutdown()`` can be retried.
        """
        if self.state != ConsumerState.RUNNING:
            return
        steps = [
            lambda s=service: s.shutdown()
            for service in (self.rebalance_service, self.pull_service, self.consume_service)
            if service
        ]
        if self.offset_store:
            offset_store = self.offset_store
            steps.append(lambda: offset_store.persist_all())
        if self.client_factory:
            client_factory = self.client_factory
            steps.append(lambda: client_factory.unregister_consumer(self.config.consumer_group))
        first_error = self._run_all(steps)
        if first_error is not None:
            raise ConsumerShutdownException(f"关闭消费者失败: {first_error}") from first_error
        self.state = ConsumerState.SHUTDOWN_ALREADY
        print(f"Push消费者关闭成功: {self.config.consumer_group}")

    def subscribe(self, topic: str, sub_expression: str = "*"):
        """Subscribe to *topic* with *sub_expression*; not allowed while RUNNING."""
        if self.state == ConsumerState.RUNNING:
            raise SubscriptionException("消费者运行时不能修改订阅")
        self.subscription_table[topic] = sub_expression
        print(f"订阅主题: {topic}, 表达式: {sub_expression}")

    def unsubscribe(self, topic: str):
        """Remove the subscription for *topic* if present; not allowed while RUNNING."""
        if self.state == ConsumerState.RUNNING:
            raise SubscriptionException("消费者运行时不能修改订阅")
        if topic in self.subscription_table:
            del self.subscription_table[topic]
            print(f"取消订阅主题: {topic}")

    def register_message_listener(self, listener: MessageListener):
        """Set the listener that start() hands to the consume service."""
        self.message_listener = listener

    def get_subscription_table(self) -> Dict[str, str]:
        """Return a copy of the topic -> expression subscription table."""
        return self.subscription_table.copy()

    def update_consume_stats(self, success_count: int, failed_count: int):
        """Add one batch's outcome to the counters (thread-safe)."""
        with self._stats_lock:
            self.consume_stats["total_consumed"] += success_count + failed_count
            self.consume_stats["success_consumed"] += success_count
            self.consume_stats["failed_consumed"] += failed_count

    def get_consume_stats(self) -> Dict[str, int]:
        """Return a snapshot of the counters plus ``running_time_ms`` once started."""
        with self._stats_lock:
            stats = self.consume_stats.copy()
        if stats["start_time"] > 0:
            stats["running_time_ms"] = int(time.time() * 1000) - stats["start_time"]
        return stats
4.2.2 消费消息服务
class ConsumeMessageService(ABC):
    """Contract for the services that run listeners for a push consumer."""

    @abstractmethod
    def start(self):
        """Start the service."""

    @abstractmethod
    def shutdown(self):
        """Stop the service."""

    @abstractmethod
    def submit_consume_request(self, consume_request: 'ConsumeRequest'):
        """Queue *consume_request* for consumption."""
class ConsumeMessageConcurrentlyService(ConsumeMessageService):
    """Concurrent consume service used by DefaultMQPushConsumer.

    ``submit_consume_request`` only appends to ``consume_requests_queue``. Two
    background loops run on ``scheduled_executor``:

    * a dispatcher that moves queued requests onto ``consume_executor``, where
      the listener is invoked (``_consume_message``);
    * a janitor that, once a minute, hands requests that waited in the queue
      longer than ``config.consume_timeout`` minutes to ``_send_message_back``.
    """

    def __init__(self, consumer: DefaultMQPushConsumer, listener: MessageListenerConcurrently):
        self.consumer = consumer
        self.listener = listener
        self.consume_executor = None    # runs listener calls
        self.scheduled_executor = None  # runs the dispatcher and janitor loops
        self.running = False
        self.consume_requests_queue = []
        self.consume_requests_lock = threading.Lock()
        # Set by shutdown() so the background loops wake immediately instead of
        # finishing a sleep of up to 60 s.
        self._stop_event = threading.Event()

    def start(self):
        """Create the executors and launch the dispatcher and janitor loops."""
        from concurrent.futures import ThreadPoolExecutor

        self.consume_executor = ThreadPoolExecutor(
            max_workers=self.consumer.config.consume_thread_max,
            thread_name_prefix="ConsumeMessageThread"
        )
        self.scheduled_executor = ThreadPoolExecutor(
            max_workers=2,
            thread_name_prefix="ConsumeScheduledThread"
        )
        self._stop_event.clear()
        self.running = True
        self.scheduled_executor.submit(self._process_consume_requests)
        self.scheduled_executor.submit(self._clean_expired_messages)
        print("并发消费服务启动成功")

    def shutdown(self):
        """Stop the background loops, then wait for in-flight listener calls."""
        self.running = False
        self._stop_event.set()
        # Stop the dispatcher before the worker pool so nothing is submitted to
        # an executor that has already been shut down.
        if self.scheduled_executor:
            self.scheduled_executor.shutdown(wait=True)
        if self.consume_executor:
            self.consume_executor.shutdown(wait=True)
        print("并发消费服务关闭成功")

    def submit_consume_request(self, consume_request: 'ConsumeRequest'):
        """Append *consume_request* to the pending queue (thread-safe)."""
        with self.consume_requests_lock:
            self.consume_requests_queue.append(consume_request)

    def _process_consume_requests(self):
        """Dispatcher loop: submit the whole backlog, back off briefly when idle."""
        while self.running:
            try:
                # Take every pending request at once (O(1) swap instead of
                # pop(0) per item) and submit outside the lock.
                with self.consume_requests_lock:
                    pending = self.consume_requests_queue
                    self.consume_requests_queue = []
                for index, request in enumerate(pending):
                    try:
                        self.consume_executor.submit(self._consume_message, request)
                    except Exception:
                        # Put the unsubmitted requests back at the head of the queue.
                        with self.consume_requests_lock:
                            self.consume_requests_queue[:0] = pending[index:]
                        raise
                if not pending:
                    self._stop_event.wait(0.01)  # idle: avoid busy-spinning
            except Exception as e:
                print(f"处理消费请求异常: {e}")
                self._stop_event.wait(0.01)

    def _consume_message(self, request: 'ConsumeRequest'):
        """Run the listener on one request and apply the result.

        SUCCESS advances the queue offset. Any other result, or an exception,
        passes the batch to ``_send_message_back``.
        """
        try:
            context = ConsumeContext(
                consumer_group=self.consumer.config.consumer_group,
                message_queue=request.message_queue
            )
            result = self.listener.consume_message(request.messages, context)
            if result == ConsumeResult.SUCCESS:
                self._update_offset(request)
                self.consumer.update_consume_stats(len(request.messages), 0)
            else:
                self._send_message_back(request)
                self.consumer.update_consume_stats(0, len(request.messages))
        except Exception as e:
            print(f"消费消息异常: {e}")
            self._send_message_back(request)
            self.consumer.update_consume_stats(0, len(request.messages))

    def _update_offset(self, request: 'ConsumeRequest'):
        """Advance the queue offset past the last message of *request*."""
        if request.messages:
            last_msg = request.messages[-1]
            new_offset = last_msg.queue_offset + 1
            self.consumer.offset_store.update_offset(
                request.message_queue,
                new_offset,
                increment_only=True
            )

    def _send_message_back(self, request: 'ConsumeRequest'):
        """Bump ``reconsume_times`` on each message and route it for retry.

        Messages over ``config.max_reconsume_times`` are meant for the
        dead-letter queue. Both routes are still stubs that only log.
        """
        for msg in request.messages:
            try:
                msg.reconsume_times += 1
                if msg.reconsume_times > self.consumer.config.max_reconsume_times:
                    print(f"消息重试次数超限,发送到死信队列: {msg.msg_id}")
                    # TODO: actually send to the dead-letter queue.
                else:
                    print(f"消息消费失败,发送回Broker重试: {msg.msg_id}")
                    # TODO: actually send back to the broker.
            except Exception as e:
                print(f"发送消息回Broker异常: {e}")

    def _clean_expired_messages(self):
        """Janitor loop: once a minute, send back requests that queued too long.

        Expired requests go through ``_send_message_back`` (the same path as
        failed batches) instead of being silently discarded.
        """
        while self.running:
            try:
                timeout_ms = self.consumer.config.consume_timeout * 60 * 1000
                current_time = int(time.time() * 1000)
                with self.consume_requests_lock:
                    expired = [
                        req for req in self.consume_requests_queue
                        if current_time - req.submit_timestamp >= timeout_ms
                    ]
                    if expired:
                        self.consume_requests_queue = [
                            req for req in self.consume_requests_queue
                            if current_time - req.submit_timestamp < timeout_ms
                        ]
                for request in expired:
                    self._send_message_back(request)
                    self.consumer.update_consume_stats(0, len(request.messages))
            except Exception as e:
                print(f"清理过期消息异常: {e}")
            # Sleep a minute, but wake at once on shutdown (also after errors,
            # so a persistent failure does not spin).
            self._stop_event.wait(60)
# A batch of messages waiting to be consumed.
@dataclass
class ConsumeRequest:
    """One batch from a single queue, queued for the consume service."""

    messages: List[MessageExt]
    message_queue: 'MessageQueue'
    process_queue: 'ProcessQueue'
    submit_timestamp: int = 0  # epoch ms; 0 means "stamp with creation time"

    def __post_init__(self):
        # Keep an explicit timestamp; otherwise record when the request was built.
        if self.submit_timestamp != 0:
            return
        self.submit_timestamp = int(time.time() * 1000)
4.3 Pull消费者
4.3.1 Pull消费者实现
class DefaultMQPullConsumer:
    """Pull-style consumer: the application picks queues and offsets itself.

    Typical flow: ``start()``, ``fetch_subscribe_message_queues(topic)``, then
    one of the ``pull_*`` methods per queue, recording progress with
    ``update_consume_offset()``; finally ``shutdown()``.
    """

    def __init__(self, config: ConsumerConfig):
        self.config = config
        self.state = ConsumerState.CREATE_JUST
        self.client_factory: Optional['MQClientFactory'] = None
        self.offset_store: Optional['OffsetStore'] = None
        self.pull_stats = {
            "total_pulled": 0,    # messages returned by successful pulls
            "success_pulled": 0,  # same count as total_pulled
            "failed_pulled": 0,   # number of failed pull calls
            "start_time": 0       # epoch ms when start() succeeded
        }
        # pull_stats is also updated from pull_async() threads.
        self._stats_lock = threading.Lock()

    def start(self):
        """Create the client factory and offset store and register the consumer.

        Raises:
            ConsumerStartException: if not in CREATE_JUST, if the config is
                invalid, or if any step fails (the state reverts to CREATE_JUST).
        """
        if self.state != ConsumerState.CREATE_JUST:
            raise ConsumerStartException(f"消费者状态错误: {self.state}")
        if not self.config.validate():
            raise ConsumerStartException("消费者配置无效")
        try:
            self.client_factory = MQClientFactory(self.config.name_server_addr)
            # BROADCASTING uses a local offset store; CLUSTERING uses the broker one.
            if self.config.message_model == MessageModel.BROADCASTING:
                self.offset_store = LocalFileOffsetStore(self.config.consumer_group)
            else:
                self.offset_store = RemoteBrokerOffsetStore(self.config.consumer_group, self.client_factory)
            self.offset_store.load()
            self.client_factory.register_consumer(self.config.consumer_group, self)
            self.state = ConsumerState.RUNNING
            self.pull_stats["start_time"] = int(time.time() * 1000)
            print(f"Pull消费者启动成功: {self.config.consumer_group}")
        except Exception as e:
            self.state = ConsumerState.CREATE_JUST
            raise ConsumerStartException(f"启动Pull消费者失败: {e}") from e

    def shutdown(self):
        """Persist offsets and unregister; does nothing unless RUNNING.

        Raises:
            ConsumerShutdownException: if persisting or unregistering fails.
        """
        if self.state != ConsumerState.RUNNING:
            return
        try:
            if self.offset_store:
                self.offset_store.persist_all()
            if self.client_factory:
                self.client_factory.unregister_consumer(self.config.consumer_group)
            self.state = ConsumerState.SHUTDOWN_ALREADY
            print(f"Pull消费者关闭成功: {self.config.consumer_group}")
        except Exception as e:
            raise ConsumerShutdownException(f"关闭Pull消费者失败: {e}") from e

    def fetch_subscribe_message_queues(self, topic: str) -> List['MessageQueue']:
        """Return one MessageQueue per readable queue of *topic* on every broker.

        Raises:
            PullException: if the consumer is not running or the route lookup fails.
        """
        if self.state != ConsumerState.RUNNING:
            raise PullException("消费者未运行")
        try:
            route_data = self.client_factory.get_topic_route_info(topic)
            if not route_data or not route_data.queue_datas:
                return []
            return [
                MessageQueue(topic, queue_data.broker_name, i)
                for queue_data in route_data.queue_datas
                for i in range(queue_data.read_queue_nums)
            ]
        except Exception as e:
            raise PullException(f"获取消息队列失败: {e}") from e

    def pull_block_if_not_found(self, mq: 'MessageQueue', sub_expression: str,
                                offset: int, max_nums: int, timeout: int = 20000) -> PullResult:
        """Pull, letting the broker hold the request up to *timeout* ms."""
        return self._pull_sync_impl(mq, sub_expression, offset, max_nums, timeout, True)

    def pull_no_block(self, mq: 'MessageQueue', sub_expression: str,
                      offset: int, max_nums: int) -> PullResult:
        """Pull without suspending on the broker."""
        return self._pull_sync_impl(mq, sub_expression, offset, max_nums, 0, False)

    def pull_async(self, mq: 'MessageQueue', sub_expression: str, offset: int,
                   max_nums: int, callback: 'PullCallback', timeout: int = 20000):
        """Pull on a daemon thread and report through *callback*.

        ``callback.on_exception`` receives errors from the pull and from
        ``callback.on_success``. Statistics are recorded by ``_pull_sync_impl``
        only, so each pull is counted exactly once.

        Raises:
            PullException: immediately, if the consumer is not running.
        """
        if self.state != ConsumerState.RUNNING:
            raise PullException("消费者未运行")

        def async_pull():
            try:
                result = self._pull_sync_impl(mq, sub_expression, offset, max_nums, timeout, True)
                callback.on_success(result)
            except Exception as e:
                callback.on_exception(e)

        threading.Thread(target=async_pull, daemon=True).start()

    def _pull_sync_impl(self, mq: 'MessageQueue', sub_expression: str,
                        offset: int, max_nums: int, timeout: int, block: bool) -> PullResult:
        """Send one pull request to the broker owning *mq* and parse the reply.

        Raises:
            PullException: if the consumer is not running, *max_nums* is
                outside 1..32, the broker cannot be found, or the request fails.
        """
        if self.state != ConsumerState.RUNNING:
            raise PullException("消费者未运行")
        try:
            if max_nums <= 0 or max_nums > 32:
                raise PullException("拉取消息数量必须在1-32之间")
            broker_addr = self.client_factory.find_broker_address_in_subscribe(
                mq.broker_name, 0, False
            )
            if not broker_addr:
                raise PullException(f"找不到Broker地址: {mq.broker_name}")
            pull_request = PullMessageRequest(
                consumer_group=self.config.consumer_group,
                topic=mq.topic,
                queue_id=mq.queue_id,
                queue_offset=offset,
                max_msg_nums=max_nums,
                sys_flag=0,
                commit_offset=offset,
                suspend_timeout_millis=timeout if block else 0,
                subscription=sub_expression,
                sub_version=int(time.time() * 1000)
            )
            response = self.client_factory.send_pull_request(broker_addr, pull_request)
            result = self._parse_pull_response(response)
        except Exception as e:
            with self._stats_lock:
                self.pull_stats["failed_pulled"] += 1
            raise PullException(f"拉取消息失败: {e}") from e
        found = len(result.msg_found_list)
        with self._stats_lock:
            self.pull_stats["total_pulled"] += found
            self.pull_stats["success_pulled"] += found
        return result

    def _parse_pull_response(self, response: 'PullMessageResponse') -> PullResult:
        """Map a broker response to a PullResult.

        Codes: 0 -> FOUND (carries the messages), 1 (PULL_NOT_FOUND) ->
        NO_NEW_MSG, 2 (PULL_RETRY_IMMEDIATELY) -> NO_MATCHED_MSG, anything
        else -> OFFSET_ILLEGAL.
        """
        status_by_code = {
            0: PullStatus.FOUND,
            1: PullStatus.NO_NEW_MSG,
            2: PullStatus.NO_MATCHED_MSG,
        }
        status = status_by_code.get(response.code, PullStatus.OFFSET_ILLEGAL)
        messages = (response.messages or []) if status == PullStatus.FOUND else []
        return PullResult(
            pull_status=status,
            next_begin_offset=response.next_begin_offset,
            min_offset=response.min_offset,
            max_offset=response.max_offset,
            msg_found_list=messages
        )

    def update_consume_offset(self, mq: 'MessageQueue', offset: int):
        """Record *offset* for *mq* (may move backwards: increment_only=False)."""
        if self.offset_store:
            self.offset_store.update_offset(mq, offset, increment_only=False)

    def fetch_consume_offset(self, mq: 'MessageQueue', from_store: bool = True) -> int:
        """Return the stored offset for *mq*, or -1 when there is no offset store."""
        if self.offset_store:
            return self.offset_store.read_offset(mq, from_store)
        return -1

    def get_pull_stats(self) -> Dict[str, int]:
        """Return a snapshot of the pull counters plus ``running_time_ms`` once started."""
        with self._stats_lock:
            stats = self.pull_stats.copy()
        if stats["start_time"] > 0:
            stats["running_time_ms"] = int(time.time() * 1000) - stats["start_time"]
        return stats
# Callback interface for asynchronous pulls.
class PullCallback(ABC):
    """Receives the outcome of ``DefaultMQPullConsumer.pull_async``."""

    @abstractmethod
    def on_success(self, pull_result: PullResult):
        """Called with the result of a completed pull."""

    @abstractmethod
    def on_exception(self, exception: Exception):
        """Called when the pull, or ``on_success``, raised *exception*."""
# Pull request sent to the broker.
@dataclass
class PullMessageRequest:
    """Wire-level parameters of one pull request."""

    consumer_group: str
    topic: str
    queue_id: int
    queue_offset: int            # first offset to read
    max_msg_nums: int            # batch size; the pull consumer allows 1..32
    sys_flag: int
    commit_offset: int           # the pull consumer sends the same value as queue_offset
    suspend_timeout_millis: int  # 0 for non-blocking pulls
    subscription: str            # subscription expression, e.g. "*"
    sub_version: int             # the pull consumer sets this to the build time in epoch ms
# Pull response returned by the broker.
@dataclass
class PullMessageResponse:
    """Raw broker reply to a PullMessageRequest.

    ``DefaultMQPullConsumer._parse_pull_response`` maps ``code``: 0 -> FOUND,
    1 -> NO_NEW_MSG, 2 -> NO_MATCHED_MSG, anything else -> OFFSET_ILLEGAL.
    ``messages`` defaults to a fresh per-instance list. Passing ``None``
    explicitly is still normalised to ``[]``.
    """

    code: int
    next_begin_offset: int
    min_offset: int
    max_offset: int
    messages: List[MessageExt] = field(default_factory=list)

    def __post_init__(self):
        # Callers may still pass messages=None explicitly.
        if self.messages is None:
            self.messages = []
4.3.2 Pull消费者使用示例
# Pull consumer usage example.
class PullConsumerExample:
    """Demo: one thread per queue of "TestTopic", each running a pull loop."""

    def __init__(self):
        self.config = ConsumerConfig(
            consumer_group="pull_consumer_group",
            name_server_addr="localhost:9876",
            message_model=MessageModel.CLUSTERING
        )
        self.consumer = DefaultMQPullConsumer(self.config)
        # Tells the per-queue worker threads to exit.
        self._stop_event = threading.Event()

    def start_consume(self):
        """Start the consumer and print pull stats every 10 s until Ctrl+C."""
        try:
            self.consumer.start()
            topic = "TestTopic"
            message_queues = self.consumer.fetch_subscribe_message_queues(topic)
            print(f"获取到 {len(message_queues)} 个消息队列")
            for mq in message_queues:
                threading.Thread(
                    target=self._consume_from_queue,
                    args=(mq,),
                    daemon=True
                ).start()
            while True:
                time.sleep(10)
                stats = self.consumer.get_pull_stats()
                print(f"拉取统计: {stats}")
        except KeyboardInterrupt:
            print("收到中断信号,正在关闭消费者...")
        finally:
            # Stop the workers first so they don't keep pulling from a
            # consumer that is shutting down.
            self._stop_event.set()
            self.consumer.shutdown()

    def _consume_from_queue(self, mq: 'MessageQueue'):
        """Pull loop for one queue, starting at its stored offset (or 0)."""
        print(f"开始消费队列: {mq}")
        offset = self.consumer.fetch_consume_offset(mq, from_store=True)
        if offset < 0:
            offset = 0
        while not self._stop_event.is_set():
            try:
                pull_result = self.consumer.pull_block_if_not_found(
                    mq=mq,
                    sub_expression="*",
                    offset=offset,
                    max_nums=32,
                    timeout=20000
                )
                status = pull_result.pull_status
                if status == PullStatus.FOUND:
                    for msg in pull_result.msg_found_list:
                        self._process_message(msg)
                    offset = pull_result.next_begin_offset
                    self.consumer.update_consume_offset(mq, offset)
                elif status == PullStatus.NO_NEW_MSG:
                    print(f"队列 {mq} 暂无新消息")
                    self._stop_event.wait(1)
                elif status == PullStatus.NO_MATCHED_MSG:
                    offset = pull_result.next_begin_offset
                else:
                    # OFFSET_ILLEGAL: retrying the same offset can never succeed,
                    # so jump to the offset returned with the result and store it.
                    print(f"拉取消息失败: {pull_result.pull_status}")
                    offset = pull_result.next_begin_offset
                    self.consumer.update_consume_offset(mq, offset)
                    self._stop_event.wait(1)
            except Exception as e:
                print(f"消费队列 {mq} 异常: {e}")
                self._stop_event.wait(5)

    def _process_message(self, msg: MessageExt):
        """Print one message; business logic would go here.

        NOTE(review): failures are only logged and the batch offset still
        advances, so a failing message is effectively skipped.
        """
        try:
            # Lenient decode: a non-UTF-8 body is printed with replacement chars.
            print(f"处理消息: Topic={msg.topic}, Tags={msg.tags}, Body={msg.body.decode('utf-8', errors='replace')}")
            # Business logic goes here.
        except Exception as e:
            print(f"处理消息异常: {msg.msg_id}, 错误: {e}")
# Asynchronous pull callback implementation.
class AsyncPullCallback(PullCallback):
    """PullCallback that processes each batch and then issues the next pull.

    One instance drives one pull chain for one queue. The chain ends once the
    consumer is no longer RUNNING. Pull or processing errors restart it from
    the stored offset after a 1 s pause.
    """

    def __init__(self, consumer: DefaultMQPullConsumer, mq: 'MessageQueue'):
        self.consumer = consumer
        self.mq = mq
        self.processed_count = 0  # messages processed by this callback so far

    def on_success(self, pull_result: PullResult):
        """Process a FOUND batch, store its offset, then pull the next batch."""
        try:
            if pull_result.pull_status == PullStatus.FOUND:
                for msg in pull_result.msg_found_list:
                    self._process_message(msg)
                    self.processed_count += 1
                self.consumer.update_consume_offset(self.mq, pull_result.next_begin_offset)
                print(f"异步处理 {len(pull_result.msg_found_list)} 条消息")
        except Exception as e:
            # Don't let a processing error end the chain silently: the new
            # offset was not stored, so resume from the stored one.
            print(f"处理拉取结果异常: {e}")
            self.on_exception(e)
            return
        self._continue_pull(pull_result.next_begin_offset)

    def on_exception(self, exception: Exception):
        """Log *exception*, wait 1 s, and resume from the stored offset."""
        print(f"异步拉取异常: {exception}")
        if self.consumer.state != ConsumerState.RUNNING:
            return  # consumer stopped: end the chain
        time.sleep(1)
        offset = self.consumer.fetch_consume_offset(self.mq, from_store=True)
        self._continue_pull(offset if offset >= 0 else 0)

    def _process_message(self, msg: MessageExt):
        """Handle one message (placeholder: only logs its id)."""
        print(f"异步处理消息: {msg.msg_id}")
        # Business logic goes here.

    def _continue_pull(self, offset: int):
        """Issue the next asynchronous pull from *offset*, unless the consumer stopped."""
        if self.consumer.state != ConsumerState.RUNNING:
            return
        try:
            self.consumer.pull_async(
                mq=self.mq,
                sub_expression="*",
                offset=offset,
                max_nums=32,
                callback=self,
                timeout=20000
            )
        except PullException as e:
            # pull_async raises this when the consumer stopped after our check.
            print(f"异步拉取异常: {e}")
# Usage example: run the pull-consumer demo when executed as a script.
if __name__ == "__main__":
    PullConsumerExample().start_consume()
4.4 消费者最佳实践
4.4.1 消费幂等性
class IdempotentMessageProcessor:
    """Thread-safe, in-memory record of recently processed message ids.

    Entries expire after ``expire_ms`` milliseconds. The cache never holds more
    than ``cache_size`` ids: when it overflows, expired entries are dropped
    first, then the oldest ones. Entries live only in this process.
    """

    def __init__(self, cache_size: int = 10000, expire_ms: int = 3600000):
        """
        Args:
            cache_size: maximum number of remembered message ids.
            expire_ms: age after which an entry may be evicted (default 1 hour).
        """
        # msg_id -> epoch ms when marked. mark_processed() keeps insertion order
        # equal to timestamp order, so the oldest entry is always first.
        self.processed_messages = {}
        self.cache_size = cache_size
        self.expire_ms = expire_ms
        self.lock = threading.Lock()

    def is_processed(self, msg_id: str) -> bool:
        """Return ``True`` if *msg_id* is currently remembered as processed."""
        with self.lock:
            return msg_id in self.processed_messages

    def mark_processed(self, msg_id: str):
        """Remember *msg_id* as processed now, evicting old entries if over capacity."""
        now = int(time.time() * 1000)
        with self.lock:
            # Re-insert so a re-marked id moves to the (newest) end.
            self.processed_messages.pop(msg_id, None)
            self.processed_messages[msg_id] = now
            if len(self.processed_messages) > self.cache_size:
                self._cleanup_cache()

    def _cleanup_cache(self):
        """Evict from the oldest end while entries are expired or the cache is too big.

        Caller must hold ``self.lock``. Each entry is evicted at most once, so
        the amortised cost per insert is O(1). Previously only expired entries
        were removed, so the cache could grow without bound.
        """
        current_time = int(time.time() * 1000)
        cache = self.processed_messages
        while cache:
            oldest_id = next(iter(cache))
            expired = current_time - cache[oldest_id] > self.expire_ms
            if not expired and len(cache) <= self.cache_size:
                break
            del cache[oldest_id]
class IdempotentMessageListener(MessageListenerConcurrently):
    """Concurrent listener that skips messages whose msg_id was already handled.

    Deduplication uses an in-memory IdempotentMessageProcessor. The check and
    the mark are separate calls, so two threads handling the same msg_id at the
    same moment may both process it.
    """

    def __init__(self, business_processor: Callable[[MessageExt], bool]):
        self.business_processor = business_processor
        self.idempotent_processor = IdempotentMessageProcessor()

    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Process unseen messages in order; the first failure retries the batch."""
        for msg in messages:
            try:
                if self.idempotent_processor.is_processed(msg.msg_id):
                    print(f"消息已处理,跳过: {msg.msg_id}")
                    continue
                if not self.business_processor(msg):
                    print(f"业务处理失败: {msg.msg_id}")
                    return ConsumeResult.RECONSUME_LATER
                # Only successful messages are remembered, so failures get retried.
                self.idempotent_processor.mark_processed(msg.msg_id)
            except Exception as e:
                print(f"处理消息异常: {msg.msg_id}, 错误: {e}")
                return ConsumeResult.RECONSUME_LATER
        return ConsumeResult.SUCCESS
4.4.2 消费限流
class RateLimitedMessageListener(MessageListenerConcurrently):
    """Concurrent listener that caps throughput with a token bucket.

    The bucket holds at most ``max_rate`` tokens and refills at ``max_rate``
    tokens per second; each message costs one token. A throttled batch waits
    up to ``max_wait_seconds`` for tokens before giving up with
    RECONSUME_LATER. Giving up sends the batch back for retry, which raises
    its reconsume count, so waiting briefly is preferred.
    """

    def __init__(self, business_processor: Callable[[MessageExt], bool],
                 max_rate: int = 100, max_wait_seconds: float = 1.0):
        """
        Args:
            business_processor: returns truthy on success.
            max_rate: messages per second; also the bucket capacity.
            max_wait_seconds: how long a throttled batch waits for tokens.
        """
        self.business_processor = business_processor
        self.max_rate = max_rate
        self.max_wait_seconds = max_wait_seconds
        self.tokens = max_rate
        # Monotonic clock, so wall-clock adjustments don't distort refills.
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Wait for tokens, then process each message; the first failure retries the batch."""
        deadline = time.monotonic() + self.max_wait_seconds
        while not self._acquire_tokens(len(messages)):
            if time.monotonic() >= deadline:
                print(f"触发限流,延迟消费 {len(messages)} 条消息")
                return ConsumeResult.RECONSUME_LATER
            time.sleep(0.01)
        for msg in messages:
            try:
                success = self.business_processor(msg)
                if not success:
                    return ConsumeResult.RECONSUME_LATER
            except Exception as e:
                print(f"处理消息异常: {msg.msg_id}, 错误: {e}")
                return ConsumeResult.RECONSUME_LATER
        return ConsumeResult.SUCCESS

    def _acquire_tokens(self, count: int) -> bool:
        """Refill the bucket, then take *count* tokens if allowed; return success.

        A batch larger than the bucket could never be satisfied, so it is let
        through once the bucket is full. The excess becomes debt (negative
        tokens) that later batches pay off, keeping the long-run rate.
        """
        with self.lock:
            now = time.monotonic()
            if now > self.last_refill:
                elapsed = now - self.last_refill
                self.tokens = min(self.max_rate, self.tokens + elapsed * self.max_rate)
                self.last_refill = now
            needed = min(count, self.max_rate) if self.max_rate > 0 else count
            if self.tokens >= needed:
                self.tokens -= count
                return True
            return False
4.4.3 消费监控
class MonitoringMessageListener(MessageListenerConcurrently):
    """Concurrent listener that records consumption metrics.

    ``avg_process_time`` is the mean elapsed time, in seconds, per *batch* over
    the last 100 batches. ``last_consume_time`` is in epoch milliseconds.
    """

    def __init__(self, business_processor: Callable[[MessageExt], bool]):
        self.business_processor = business_processor
        self.metrics = {
            "total_consumed": 0,
            "success_consumed": 0,
            "failed_consumed": 0,
            "avg_process_time": 0,
            "last_consume_time": 0
        }
        # Durations of the most recent 100 batches; deque drops the oldest in O(1).
        self.process_times = deque(maxlen=100)
        self.lock = threading.Lock()

    def consume_message(self, messages: List[MessageExt], context: ConsumeContext) -> ConsumeResult:
        """Process each message; any failure marks the whole batch as failed."""
        # perf_counter is monotonic, so durations are unaffected by clock changes.
        start_time = time.perf_counter()
        try:
            for msg in messages:
                success = self.business_processor(msg)
                if not success:
                    self._update_metrics(len(messages), 0, time.perf_counter() - start_time)
                    return ConsumeResult.RECONSUME_LATER
            self._update_metrics(len(messages), len(messages), time.perf_counter() - start_time)
            return ConsumeResult.SUCCESS
        except Exception as e:
            print(f"消费消息异常: {e}")
            self._update_metrics(len(messages), 0, time.perf_counter() - start_time)
            return ConsumeResult.RECONSUME_LATER

    def _update_metrics(self, total: int, success: int, process_time: float):
        """Add one batch's counts and duration (seconds) to the metrics."""
        with self.lock:
            self.metrics["total_consumed"] += total
            self.metrics["success_consumed"] += success
            self.metrics["failed_consumed"] += (total - success)
            self.metrics["last_consume_time"] = int(time.time() * 1000)
            self.process_times.append(process_time)
            self.metrics["avg_process_time"] = sum(self.process_times) / len(self.process_times)

    def get_metrics(self) -> Dict[str, float]:
        """Return a snapshot copy of the metrics."""
        with self.lock:
            return self.metrics.copy()

    def print_metrics(self):
        """Print the current metrics."""
        metrics = self.get_metrics()
        print("消费监控指标:")
        print(f" 总消费数: {metrics['total_consumed']}")
        print(f" 成功消费数: {metrics['success_consumed']}")
        print(f" 失败消费数: {metrics['failed_consumed']}")
        print(f" 平均处理时间: {metrics['avg_process_time']:.3f}s")
        print(f" 最后消费时间: {metrics['last_consume_time']}")
4.5 本章总结
4.5.1 核心知识点
消费者类型:
- Push消费者:客户端内部通过长轮询拉取消息并回调监听器,对使用者表现为服务端主动推送
- Pull消费者:客户端主动拉取消息
消费模式:
- 并发消费:多线程并发处理消息
- 顺序消费:同一消息队列内的消息按顺序逐条处理(不同队列之间仍可并行)
消息模型:
- 集群模式:消费者组内负载均衡
- 广播模式:每个消费者都消费所有消息
消费最佳实践:
- 幂等性处理
- 限流控制
- 监控告警
4.5.2 最佳实践
- 选择合适的消费者类型
- 实现消费幂等性
- 合理设置消费参数
- 添加监控和告警
- 处理消费异常
4.5.3 练习题
- 实现一个支持批量消费的消息监听器
- 设计一个消费者负载均衡算法
- 实现消费者的健康检查机制
- 开发一个消费延迟监控工具
- 设计消费者的故障恢复策略