5.1 顺序消息
5.1.1 顺序消息概念
from enum import Enum
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass
from abc import ABC, abstractmethod
import time
import threading
import hashlib
# 顺序类型枚举
class OrderType(Enum):
GLOBAL_ORDER = "GLOBAL_ORDER" # 全局顺序
PARTITION_ORDER = "PARTITION_ORDER" # 分区顺序
# 顺序消息状态
class OrderMessageStatus(Enum):
SUCCESS = "SUCCESS"
ROLLBACK = "ROLLBACK"
COMMIT = "COMMIT"
UNKNOWN = "UNKNOWN"
# 顺序消息
@dataclass
class OrderMessage:
topic: str
tags: str
keys: str
body: bytes
order_id: str # 顺序标识
order_type: OrderType = OrderType.PARTITION_ORDER
properties: Dict[str, str] = None
def __post_init__(self):
if self.properties is None:
self.properties = {}
# 设置顺序标识属性
self.properties["ORDER_ID"] = self.order_id
self.properties["ORDER_TYPE"] = self.order_type.value
def get_order_key(self) -> str:
"""获取顺序键"""
return self.order_id
# 顺序消息选择器
class OrderMessageQueueSelector:
"""顺序消息队列选择器"""
def select(self, queues: List['MessageQueue'], message: OrderMessage, arg: str) -> 'MessageQueue':
"""选择消息队列"""
if not queues:
raise ValueError("消息队列列表为空")
# 根据顺序ID计算队列索引
order_key = message.get_order_key()
hash_value = self._hash(order_key)
index = hash_value % len(queues)
return queues[index]
def _hash(self, key: str) -> int:
"""计算哈希值"""
return hash(key) & 0x7fffffff
# 顺序消息生产者
class OrderMessageProducer:
"""顺序消息生产者"""
def __init__(self, producer_config: 'ProducerConfig'):
self.config = producer_config
self.producer = None # 底层生产者
self.queue_selector = OrderMessageQueueSelector()
self.send_stats = {
"total_sent": 0,
"success_sent": 0,
"failed_sent": 0
}
def start(self):
"""启动顺序消息生产者"""
# 初始化底层生产者
from rocketmq_producer import DefaultMQProducer
self.producer = DefaultMQProducer(self.config)
self.producer.start()
print("顺序消息生产者启动成功")
def shutdown(self):
"""关闭顺序消息生产者"""
if self.producer:
self.producer.shutdown()
print("顺序消息生产者关闭成功")
def send_order_message(self, message: OrderMessage) -> 'SendResult':
"""发送顺序消息"""
try:
# 转换为普通消息
normal_message = self._convert_to_message(message)
# 使用顺序选择器发送
result = self.producer.send(
message=normal_message,
selector=self.queue_selector,
arg=message.order_id
)
self.send_stats["total_sent"] += 1
self.send_stats["success_sent"] += 1
return result
except Exception as e:
self.send_stats["total_sent"] += 1
self.send_stats["failed_sent"] += 1
raise Exception(f"发送顺序消息失败: {e}")
def send_order_message_async(self, message: OrderMessage,
callback: 'SendCallback'):
"""异步发送顺序消息"""
try:
normal_message = self._convert_to_message(message)
# 包装回调
wrapped_callback = OrderSendCallback(callback, self.send_stats)
self.producer.send_async(
message=normal_message,
selector=self.queue_selector,
arg=message.order_id,
callback=wrapped_callback
)
except Exception as e:
self.send_stats["total_sent"] += 1
self.send_stats["failed_sent"] += 1
callback.on_exception(e)
def _convert_to_message(self, order_message: OrderMessage) -> 'Message':
"""转换为普通消息"""
from rocketmq_producer import Message
message = Message(
topic=order_message.topic,
tags=order_message.tags,
keys=order_message.keys,
body=order_message.body
)
# 复制属性
for key, value in order_message.properties.items():
message.put_property(key, value)
return message
def get_send_stats(self) -> Dict[str, int]:
"""获取发送统计"""
return self.send_stats.copy()
# 顺序发送回调包装器
class OrderSendCallback:
"""顺序发送回调包装器"""
def __init__(self, callback: 'SendCallback', stats: Dict[str, int]):
self.callback = callback
self.stats = stats
def on_success(self, result: 'SendResult'):
"""发送成功回调"""
self.stats["total_sent"] += 1
self.stats["success_sent"] += 1
self.callback.on_success(result)
def on_exception(self, exception: Exception):
"""发送异常回调"""
self.stats["total_sent"] += 1
self.stats["failed_sent"] += 1
self.callback.on_exception(exception)
5.1.2 顺序消息消费
# 顺序消息消费者
class OrderMessageConsumer:
"""顺序消息消费者"""
def __init__(self, consumer_config: 'ConsumerConfig'):
self.config = consumer_config
# 强制设置为顺序消费模式
self.config.consume_mode = ConsumeMode.ORDERLY
self.consumer = None
self.order_listener: Optional['OrderMessageListener'] = None
self.consume_stats = {
"total_consumed": 0,
"success_consumed": 0,
"failed_consumed": 0
}
def start(self):
"""启动顺序消息消费者"""
if not self.order_listener:
raise Exception("未设置顺序消息监听器")
# 初始化底层消费者
from rocketmq_consumer import DefaultMQPushConsumer
self.consumer = DefaultMQPushConsumer(self.config)
# 注册顺序消息监听器
wrapped_listener = OrderMessageListenerWrapper(
self.order_listener, self.consume_stats
)
self.consumer.register_message_listener(wrapped_listener)
self.consumer.start()
print("顺序消息消费者启动成功")
def shutdown(self):
"""关闭顺序消息消费者"""
if self.consumer:
self.consumer.shutdown()
print("顺序消息消费者关闭成功")
def subscribe(self, topic: str, sub_expression: str = "*"):
"""订阅主题"""
if self.consumer:
self.consumer.subscribe(topic, sub_expression)
def register_order_listener(self, listener: 'OrderMessageListener'):
"""注册顺序消息监听器"""
self.order_listener = listener
def get_consume_stats(self) -> Dict[str, int]:
"""获取消费统计"""
return self.consume_stats.copy()
# 顺序消息监听器接口
class OrderMessageListener(ABC):
"""顺序消息监听器接口"""
@abstractmethod
def consume_order_message(self, messages: List['MessageExt'],
context: 'ConsumeOrderlyContext') -> 'ConsumeOrderlyResult':
"""消费顺序消息"""
pass
# 顺序消费上下文
@dataclass
class ConsumeOrderlyContext:
consumer_group: str
message_queue: 'MessageQueue'
auto_commit: bool = True
suspend_current_queue_time_millis: int = 1000
def set_auto_commit(self, auto_commit: bool):
"""设置自动提交"""
self.auto_commit = auto_commit
def set_suspend_current_queue_time_millis(self, time_millis: int):
"""设置暂停当前队列时间"""
self.suspend_current_queue_time_millis = time_millis
# 顺序消费结果
class ConsumeOrderlyResult(Enum):
SUCCESS = "SUCCESS"
SUSPEND_CURRENT_QUEUE_A_MOMENT = "SUSPEND_CURRENT_QUEUE_A_MOMENT"
# 顺序消息监听器包装器
class OrderMessageListenerWrapper:
"""顺序消息监听器包装器"""
def __init__(self, order_listener: OrderMessageListener,
stats: Dict[str, int]):
self.order_listener = order_listener
self.stats = stats
def consume_message(self, messages: List['MessageExt'],
context: 'ConsumeContext') -> 'ConsumeResult':
"""消费消息(适配器方法)"""
try:
# 转换上下文
order_context = ConsumeOrderlyContext(
consumer_group=context.consumer_group,
message_queue=context.message_queue
)
# 调用顺序监听器
result = self.order_listener.consume_order_message(messages, order_context)
# 更新统计
if result == ConsumeOrderlyResult.SUCCESS:
self.stats["total_consumed"] += len(messages)
self.stats["success_consumed"] += len(messages)
return ConsumeResult.SUCCESS
else:
self.stats["total_consumed"] += len(messages)
self.stats["failed_consumed"] += len(messages)
return ConsumeResult.RECONSUME_LATER
except Exception as e:
print(f"顺序消费异常: {e}")
self.stats["total_consumed"] += len(messages)
self.stats["failed_consumed"] += len(messages)
return ConsumeResult.RECONSUME_LATER
# 业务顺序消息监听器
class BusinessOrderMessageListener(OrderMessageListener):
"""业务顺序消息监听器"""
def __init__(self, business_handler: Callable[[List['MessageExt']], bool]):
self.business_handler = business_handler
self.processed_orders = {} # 已处理的订单
self.lock = threading.Lock()
def consume_order_message(self, messages: List['MessageExt'],
context: ConsumeOrderlyContext) -> ConsumeOrderlyResult:
"""消费顺序消息"""
try:
# 检查消息顺序
if not self._check_message_order(messages):
print("消息顺序检查失败,暂停队列")
context.set_suspend_current_queue_time_millis(5000)
return ConsumeOrderlyResult.SUSPEND_CURRENT_QUEUE_A_MOMENT
# 处理业务逻辑
success = self.business_handler(messages)
if success:
# 记录已处理的订单
self._record_processed_orders(messages)
return ConsumeOrderlyResult.SUCCESS
else:
print("业务处理失败,暂停队列")
return ConsumeOrderlyResult.SUSPEND_CURRENT_QUEUE_A_MOMENT
except Exception as e:
print(f"顺序消费异常: {e}")
return ConsumeOrderlyResult.SUSPEND_CURRENT_QUEUE_A_MOMENT
def _check_message_order(self, messages: List['MessageExt']) -> bool:
"""检查消息顺序"""
for msg in messages:
order_id = msg.get_property("ORDER_ID")
if not order_id:
continue
# 检查是否按顺序到达
with self.lock:
last_offset = self.processed_orders.get(order_id, -1)
if msg.queue_offset <= last_offset:
return False
return True
def _record_processed_orders(self, messages: List['MessageExt']):
"""记录已处理的订单"""
with self.lock:
for msg in messages:
order_id = msg.get_property("ORDER_ID")
if order_id:
self.processed_orders[order_id] = msg.queue_offset
5.2 事务消息
5.2.1 事务消息概念
# 事务消息状态
class LocalTransactionState(Enum):
COMMIT_MESSAGE = "COMMIT_MESSAGE" # 提交事务
ROLLBACK_MESSAGE = "ROLLBACK_MESSAGE" # 回滚事务
UNKNOWN = "UNKNOWN" # 未知状态
# 事务消息
@dataclass
class TransactionMessage:
topic: str
tags: str
keys: str
body: bytes
transaction_id: str # 事务ID
properties: Dict[str, str] = None
def __post_init__(self):
if self.properties is None:
self.properties = {}
# 设置事务标识
self.properties["TRANSACTION_ID"] = self.transaction_id
self.properties["TRAN_MSG"] = "true"
# 事务监听器接口
class TransactionListener(ABC):
"""事务监听器接口"""
@abstractmethod
def execute_local_transaction(self, message: TransactionMessage,
arg: object) -> LocalTransactionState:
"""执行本地事务"""
pass
@abstractmethod
def check_local_transaction(self, message: TransactionMessage) -> LocalTransactionState:
"""检查本地事务状态"""
pass
# 事务消息生产者
class TransactionMQProducer:
"""事务消息生产者"""
def __init__(self, producer_config: 'ProducerConfig'):
self.config = producer_config
self.producer = None
self.transaction_listener: Optional[TransactionListener] = None
self.transaction_check_executor = None
self.transaction_states = {} # 事务状态缓存
self.transaction_stats = {
"total_transactions": 0,
"committed_transactions": 0,
"rollback_transactions": 0,
"unknown_transactions": 0
}
self.lock = threading.Lock()
def start(self):
"""启动事务消息生产者"""
if not self.transaction_listener:
raise Exception("未设置事务监听器")
# 初始化底层生产者
from rocketmq_producer import DefaultMQProducer
self.producer = DefaultMQProducer(self.config)
# 启动事务检查线程池
from concurrent.futures import ThreadPoolExecutor
self.transaction_check_executor = ThreadPoolExecutor(
max_workers=5,
thread_name_prefix="TransactionCheck"
)
self.producer.start()
print("事务消息生产者启动成功")
def shutdown(self):
"""关闭事务消息生产者"""
if self.transaction_check_executor:
self.transaction_check_executor.shutdown(wait=True)
if self.producer:
self.producer.shutdown()
print("事务消息生产者关闭成功")
def set_transaction_listener(self, listener: TransactionListener):
"""设置事务监听器"""
self.transaction_listener = listener
def send_message_in_transaction(self, message: TransactionMessage,
arg: object = None) -> 'TransactionSendResult':
"""发送事务消息"""
try:
# 1. 发送半消息(Prepare消息)
prepare_result = self._send_prepare_message(message)
# 2. 执行本地事务
local_state = self.transaction_listener.execute_local_transaction(message, arg)
# 3. 根据本地事务结果提交或回滚
end_result = self._end_transaction(message, prepare_result, local_state)
# 4. 更新统计
self._update_transaction_stats(local_state)
return TransactionSendResult(
send_result=prepare_result,
local_transaction_state=local_state,
end_result=end_result
)
except Exception as e:
# 异常时回滚事务
if 'prepare_result' in locals():
self._end_transaction(message, prepare_result, LocalTransactionState.ROLLBACK_MESSAGE)
self._update_transaction_stats(LocalTransactionState.ROLLBACK_MESSAGE)
raise Exception(f"发送事务消息失败: {e}")
def _send_prepare_message(self, message: TransactionMessage) -> 'SendResult':
"""发送准备消息"""
# 转换为普通消息
normal_message = self._convert_to_message(message)
# 标记为准备消息
normal_message.put_property("PREPARE_MESSAGE", "true")
# 发送到Broker
result = self.producer.send(normal_message)
# 缓存事务状态
with self.lock:
self.transaction_states[message.transaction_id] = {
"message": message,
"prepare_result": result,
"state": LocalTransactionState.UNKNOWN,
"create_time": int(time.time() * 1000)
}
return result
def _end_transaction(self, message: TransactionMessage,
prepare_result: 'SendResult',
state: LocalTransactionState) -> bool:
"""结束事务"""
try:
# 发送事务结束请求到Broker
end_request = TransactionEndRequest(
transaction_id=message.transaction_id,
msg_id=prepare_result.msg_id,
transaction_state=state
)
# 这里应该发送到Broker
success = self._send_end_transaction_request(end_request)
# 更新缓存状态
with self.lock:
if message.transaction_id in self.transaction_states:
self.transaction_states[message.transaction_id]["state"] = state
return success
except Exception as e:
print(f"结束事务失败: {e}")
return False
def _send_end_transaction_request(self, request: 'TransactionEndRequest') -> bool:
"""发送事务结束请求"""
# 模拟发送到Broker
print(f"发送事务结束请求: {request.transaction_id}, 状态: {request.transaction_state}")
return True
def check_transaction_state(self, transaction_id: str) -> LocalTransactionState:
"""检查事务状态(Broker回查)"""
try:
with self.lock:
transaction_info = self.transaction_states.get(transaction_id)
if not transaction_info:
return LocalTransactionState.UNKNOWN
# 调用监听器检查本地事务状态
message = transaction_info["message"]
state = self.transaction_listener.check_local_transaction(message)
# 更新缓存状态
with self.lock:
transaction_info["state"] = state
return state
except Exception as e:
print(f"检查事务状态异常: {e}")
return LocalTransactionState.UNKNOWN
def _convert_to_message(self, transaction_message: TransactionMessage) -> 'Message':
"""转换为普通消息"""
from rocketmq_producer import Message
message = Message(
topic=transaction_message.topic,
tags=transaction_message.tags,
keys=transaction_message.keys,
body=transaction_message.body
)
# 复制属性
for key, value in transaction_message.properties.items():
message.put_property(key, value)
return message
def _update_transaction_stats(self, state: LocalTransactionState):
"""更新事务统计"""
with self.lock:
self.transaction_stats["total_transactions"] += 1
if state == LocalTransactionState.COMMIT_MESSAGE:
self.transaction_stats["committed_transactions"] += 1
elif state == LocalTransactionState.ROLLBACK_MESSAGE:
self.transaction_stats["rollback_transactions"] += 1
else:
self.transaction_stats["unknown_transactions"] += 1
def get_transaction_stats(self) -> Dict[str, int]:
"""获取事务统计"""
with self.lock:
return self.transaction_stats.copy()
def cleanup_expired_transactions(self, expire_time_ms: int = 300000):
"""清理过期事务"""
current_time = int(time.time() * 1000)
expired_transactions = []
with self.lock:
for transaction_id, info in self.transaction_states.items():
if current_time - info["create_time"] > expire_time_ms:
expired_transactions.append(transaction_id)
for transaction_id in expired_transactions:
del self.transaction_states[transaction_id]
print(f"清理过期事务: {len(expired_transactions)} 个")
# 事务发送结果
@dataclass
class TransactionSendResult:
send_result: 'SendResult'
local_transaction_state: LocalTransactionState
end_result: bool
# 事务结束请求
@dataclass
class TransactionEndRequest:
transaction_id: str
msg_id: str
transaction_state: LocalTransactionState
5.2.2 事务监听器实现
# 数据库事务监听器
class DatabaseTransactionListener(TransactionListener):
"""数据库事务监听器"""
def __init__(self, db_connection):
self.db_connection = db_connection
self.transaction_records = {} # 事务记录
self.lock = threading.Lock()
def execute_local_transaction(self, message: TransactionMessage,
arg: object) -> LocalTransactionState:
"""执行本地数据库事务"""
transaction_id = message.transaction_id
try:
# 开始数据库事务
self.db_connection.begin()
# 执行业务逻辑
success = self._execute_business_logic(message, arg)
if success:
# 提交数据库事务
self.db_connection.commit()
# 记录事务状态
with self.lock:
self.transaction_records[transaction_id] = {
"state": LocalTransactionState.COMMIT_MESSAGE,
"timestamp": int(time.time() * 1000),
"message": message
}
return LocalTransactionState.COMMIT_MESSAGE
else:
# 回滚数据库事务
self.db_connection.rollback()
with self.lock:
self.transaction_records[transaction_id] = {
"state": LocalTransactionState.ROLLBACK_MESSAGE,
"timestamp": int(time.time() * 1000),
"message": message
}
return LocalTransactionState.ROLLBACK_MESSAGE
except Exception as e:
print(f"执行本地事务异常: {e}")
# 回滚数据库事务
try:
self.db_connection.rollback()
except:
pass
with self.lock:
self.transaction_records[transaction_id] = {
"state": LocalTransactionState.ROLLBACK_MESSAGE,
"timestamp": int(time.time() * 1000),
"message": message,
"error": str(e)
}
return LocalTransactionState.ROLLBACK_MESSAGE
def check_local_transaction(self, message: TransactionMessage) -> LocalTransactionState:
"""检查本地事务状态"""
transaction_id = message.transaction_id
# 从缓存中查找
with self.lock:
record = self.transaction_records.get(transaction_id)
if record:
return record["state"]
# 从数据库中查找事务记录
try:
state = self._query_transaction_state_from_db(transaction_id)
return state
except Exception as e:
print(f"查询事务状态异常: {e}")
return LocalTransactionState.UNKNOWN
def _execute_business_logic(self, message: TransactionMessage, arg: object) -> bool:
"""执行业务逻辑"""
# 这里实现具体的业务逻辑
# 例如:更新订单状态、扣减库存等
try:
# 模拟业务操作
business_data = message.body.decode('utf-8')
print(f"执行业务逻辑: {business_data}")
# 执行SQL操作
cursor = self.db_connection.cursor()
cursor.execute(
"INSERT INTO transaction_log (transaction_id, message_data, create_time) VALUES (?, ?, ?)",
(message.transaction_id, business_data, int(time.time() * 1000))
)
return True
except Exception as e:
print(f"业务逻辑执行失败: {e}")
return False
def _query_transaction_state_from_db(self, transaction_id: str) -> LocalTransactionState:
"""从数据库查询事务状态"""
try:
cursor = self.db_connection.cursor()
cursor.execute(
"SELECT state FROM transaction_records WHERE transaction_id = ?",
(transaction_id,)
)
result = cursor.fetchone()
if result:
state_str = result[0]
return LocalTransactionState(state_str)
else:
return LocalTransactionState.UNKNOWN
except Exception as e:
print(f"查询数据库事务状态失败: {e}")
return LocalTransactionState.UNKNOWN
# 业务事务监听器
class BusinessTransactionListener(TransactionListener):
"""业务事务监听器"""
def __init__(self, business_executor: Callable[[TransactionMessage, object], bool]):
self.business_executor = business_executor
self.transaction_cache = {} # 事务缓存
self.lock = threading.Lock()
def execute_local_transaction(self, message: TransactionMessage,
arg: object) -> LocalTransactionState:
"""执行本地业务事务"""
transaction_id = message.transaction_id
try:
# 执行业务逻辑
success = self.business_executor(message, arg)
state = LocalTransactionState.COMMIT_MESSAGE if success else LocalTransactionState.ROLLBACK_MESSAGE
# 缓存事务状态
with self.lock:
self.transaction_cache[transaction_id] = {
"state": state,
"timestamp": int(time.time() * 1000),
"message": message
}
return state
except Exception as e:
print(f"执行业务事务异常: {e}")
with self.lock:
self.transaction_cache[transaction_id] = {
"state": LocalTransactionState.ROLLBACK_MESSAGE,
"timestamp": int(time.time() * 1000),
"message": message,
"error": str(e)
}
return LocalTransactionState.ROLLBACK_MESSAGE
def check_local_transaction(self, message: TransactionMessage) -> LocalTransactionState:
"""检查本地事务状态"""
transaction_id = message.transaction_id
with self.lock:
cache_record = self.transaction_cache.get(transaction_id)
if cache_record:
return cache_record["state"]
# 如果缓存中没有,返回未知状态
return LocalTransactionState.UNKNOWN
def get_transaction_records(self) -> Dict[str, Dict]:
"""获取事务记录"""
with self.lock:
return self.transaction_cache.copy()
def cleanup_expired_records(self, expire_time_ms: int = 600000):
"""清理过期记录"""
current_time = int(time.time() * 1000)
expired_keys = []
with self.lock:
for transaction_id, record in self.transaction_cache.items():
if current_time - record["timestamp"] > expire_time_ms:
expired_keys.append(transaction_id)
for key in expired_keys:
del self.transaction_cache[key]
print(f"清理过期事务记录: {len(expired_keys)} 个")
5.3 延时消息
5.3.1 延时消息实现
# 延时级别枚举
class DelayLevel(Enum):
LEVEL_1 = 1 # 1s
LEVEL_2 = 2 # 5s
LEVEL_3 = 3 # 10s
LEVEL_4 = 4 # 30s
LEVEL_5 = 5 # 1m
LEVEL_6 = 6 # 2m
LEVEL_7 = 7 # 3m
LEVEL_8 = 8 # 4m
LEVEL_9 = 9 # 5m
LEVEL_10 = 10 # 6m
LEVEL_11 = 11 # 7m
LEVEL_12 = 12 # 8m
LEVEL_13 = 13 # 9m
LEVEL_14 = 14 # 10m
LEVEL_15 = 15 # 20m
LEVEL_16 = 16 # 30m
LEVEL_17 = 17 # 1h
LEVEL_18 = 18 # 2h
# 延时消息
@dataclass
class DelayMessage:
topic: str
tags: str
keys: str
body: bytes
delay_level: DelayLevel
properties: Dict[str, str] = None
def __post_init__(self):
if self.properties is None:
self.properties = {}
# 设置延时级别
self.properties["DELAY"] = str(self.delay_level.value)
def get_delay_time_ms(self) -> int:
"""获取延时时间(毫秒)"""
delay_times = {
DelayLevel.LEVEL_1: 1000,
DelayLevel.LEVEL_2: 5000,
DelayLevel.LEVEL_3: 10000,
DelayLevel.LEVEL_4: 30000,
DelayLevel.LEVEL_5: 60000,
DelayLevel.LEVEL_6: 120000,
DelayLevel.LEVEL_7: 180000,
DelayLevel.LEVEL_8: 240000,
DelayLevel.LEVEL_9: 300000,
DelayLevel.LEVEL_10: 360000,
DelayLevel.LEVEL_11: 420000,
DelayLevel.LEVEL_12: 480000,
DelayLevel.LEVEL_13: 540000,
DelayLevel.LEVEL_14: 600000,
DelayLevel.LEVEL_15: 1200000,
DelayLevel.LEVEL_16: 1800000,
DelayLevel.LEVEL_17: 3600000,
DelayLevel.LEVEL_18: 7200000,
}
return delay_times.get(self.delay_level, 1000)
# 延时消息生产者
class DelayMessageProducer:
"""延时消息生产者"""
def __init__(self, producer_config: 'ProducerConfig'):
self.config = producer_config
self.producer = None
self.delay_stats = {
"total_sent": 0,
"success_sent": 0,
"failed_sent": 0,
"delay_levels": {level.value: 0 for level in DelayLevel}
}
def start(self):
"""启动延时消息生产者"""
from rocketmq_producer import DefaultMQProducer
self.producer = DefaultMQProducer(self.config)
self.producer.start()
print("延时消息生产者启动成功")
def shutdown(self):
"""关闭延时消息生产者"""
if self.producer:
self.producer.shutdown()
print("延时消息生产者关闭成功")
def send_delay_message(self, message: DelayMessage) -> 'SendResult':
"""发送延时消息"""
try:
# 转换为普通消息
normal_message = self._convert_to_message(message)
# 发送消息
result = self.producer.send(normal_message)
# 更新统计
self.delay_stats["total_sent"] += 1
self.delay_stats["success_sent"] += 1
self.delay_stats["delay_levels"][message.delay_level.value] += 1
print(f"发送延时消息成功: {result.msg_id}, 延时级别: {message.delay_level}")
return result
except Exception as e:
self.delay_stats["total_sent"] += 1
self.delay_stats["failed_sent"] += 1
raise Exception(f"发送延时消息失败: {e}")
def send_delay_message_async(self, message: DelayMessage,
callback: 'SendCallback'):
"""异步发送延时消息"""
try:
normal_message = self._convert_to_message(message)
# 包装回调
wrapped_callback = DelayMessageSendCallback(
callback, self.delay_stats, message.delay_level
)
self.producer.send_async(normal_message, wrapped_callback)
except Exception as e:
self.delay_stats["total_sent"] += 1
self.delay_stats["failed_sent"] += 1
callback.on_exception(e)
def send_delay_message_at_time(self, message: DelayMessage,
deliver_time: int) -> 'SendResult':
"""在指定时间发送延时消息"""
current_time = int(time.time() * 1000)
delay_time = deliver_time - current_time
if delay_time <= 0:
# 立即发送
normal_message = self._convert_to_message_without_delay(message)
return self.producer.send(normal_message)
# 计算最接近的延时级别
delay_level = self._calculate_delay_level(delay_time)
message.delay_level = delay_level
return self.send_delay_message(message)
def _convert_to_message(self, delay_message: DelayMessage) -> 'Message':
"""转换为普通消息"""
from rocketmq_producer import Message
message = Message(
topic=delay_message.topic,
tags=delay_message.tags,
keys=delay_message.keys,
body=delay_message.body
)
# 设置延时级别
message.put_property("DELAY", str(delay_message.delay_level.value))
# 复制其他属性
for key, value in delay_message.properties.items():
if key != "DELAY":
message.put_property(key, value)
return message
def _convert_to_message_without_delay(self, delay_message: DelayMessage) -> 'Message':
"""转换为普通消息(不设置延时)"""
from rocketmq_producer import Message
message = Message(
topic=delay_message.topic,
tags=delay_message.tags,
keys=delay_message.keys,
body=delay_message.body
)
# 复制属性(排除延时属性)
for key, value in delay_message.properties.items():
if key != "DELAY":
message.put_property(key, value)
return message
def _calculate_delay_level(self, delay_time_ms: int) -> DelayLevel:
"""计算最接近的延时级别"""
delay_times = [
(DelayLevel.LEVEL_1, 1000),
(DelayLevel.LEVEL_2, 5000),
(DelayLevel.LEVEL_3, 10000),
(DelayLevel.LEVEL_4, 30000),
(DelayLevel.LEVEL_5, 60000),
(DelayLevel.LEVEL_6, 120000),
(DelayLevel.LEVEL_7, 180000),
(DelayLevel.LEVEL_8, 240000),
(DelayLevel.LEVEL_9, 300000),
(DelayLevel.LEVEL_10, 360000),
(DelayLevel.LEVEL_11, 420000),
(DelayLevel.LEVEL_12, 480000),
(DelayLevel.LEVEL_13, 540000),
(DelayLevel.LEVEL_14, 600000),
(DelayLevel.LEVEL_15, 1200000),
(DelayLevel.LEVEL_16, 1800000),
(DelayLevel.LEVEL_17, 3600000),
(DelayLevel.LEVEL_18, 7200000),
]
# 找到最接近的延时级别
best_level = DelayLevel.LEVEL_1
min_diff = abs(delay_time_ms - 1000)
for level, time_ms in delay_times:
diff = abs(delay_time_ms - time_ms)
if diff < min_diff:
min_diff = diff
best_level = level
return best_level
def get_delay_stats(self) -> Dict:
"""获取延时消息统计"""
return self.delay_stats.copy()
# 延时消息发送回调
class DelayMessageSendCallback:
"""延时消息发送回调"""
def __init__(self, callback: 'SendCallback', stats: Dict, delay_level: DelayLevel):
self.callback = callback
self.stats = stats
self.delay_level = delay_level
def on_success(self, result: 'SendResult'):
"""发送成功回调"""
self.stats["total_sent"] += 1
self.stats["success_sent"] += 1
self.stats["delay_levels"][self.delay_level.value] += 1
self.callback.on_success(result)
def on_exception(self, exception: Exception):
"""发送异常回调"""
self.stats["total_sent"] += 1
self.stats["failed_sent"] += 1
self.callback.on_exception(exception)
5.3.2 延时消息使用示例
# 延时消息使用示例
class DelayMessageExample:
"""延时消息使用示例"""
def __init__(self):
# 配置生产者
self.producer_config = ProducerConfig(
producer_group="delay_producer_group",
name_server_addr="localhost:9876"
)
# 创建延时消息生产者
self.producer = DelayMessageProducer(self.producer_config)
def start_example(self):
"""启动示例"""
try:
# 启动生产者
self.producer.start()
# 发送不同延时级别的消息
self._send_various_delay_messages()
# 发送定时消息
self._send_scheduled_messages()
# 打印统计信息
self._print_statistics()
finally:
self.producer.shutdown()
def _send_various_delay_messages(self):
"""发送不同延时级别的消息"""
delay_levels = [
DelayLevel.LEVEL_1, # 1秒
DelayLevel.LEVEL_3, # 10秒
DelayLevel.LEVEL_5, # 1分钟
DelayLevel.LEVEL_15, # 20分钟
DelayLevel.LEVEL_17 # 1小时
]
for i, delay_level in enumerate(delay_levels):
message = DelayMessage(
topic="DelayTopic",
tags="DelayTag",
keys=f"delay_key_{i}",
body=f"延时消息内容 {i}, 延时级别: {delay_level.name}".encode('utf-8'),
delay_level=delay_level
)
try:
result = self.producer.send_delay_message(message)
print(f"发送延时消息成功: {result.msg_id}")
except Exception as e:
print(f"发送延时消息失败: {e}")
def _send_scheduled_messages(self):
"""发送定时消息"""
# 5分钟后发送
deliver_time = int(time.time() * 1000) + 5 * 60 * 1000
message = DelayMessage(
topic="DelayTopic",
tags="ScheduledTag",
keys="scheduled_key",
body="定时消息内容".encode('utf-8'),
delay_level=DelayLevel.LEVEL_1 # 会被重新计算
)
try:
result = self.producer.send_delay_message_at_time(message, deliver_time)
print(f"发送定时消息成功: {result.msg_id}")
except Exception as e:
print(f"发送定时消息失败: {e}")
def _print_statistics(self):
"""打印统计信息"""
stats = self.producer.get_delay_stats()
print("\n延时消息统计:")
print(f"总发送数: {stats['total_sent']}")
print(f"成功发送数: {stats['success_sent']}")
print(f"失败发送数: {stats['failed_sent']}")
print("\n各延时级别统计:")
for level, count in stats['delay_levels'].items():
if count > 0:
print(f" 级别 {level}: {count} 条")
# 延时消息消费者示例
class DelayMessageConsumerExample:
"""延时消息消费者示例"""
def __init__(self):
# 配置消费者
self.consumer_config = ConsumerConfig(
consumer_group="delay_consumer_group",
name_server_addr="localhost:9876"
)
# 创建消费者
from rocketmq_consumer import DefaultMQPushConsumer
self.consumer = DefaultMQPushConsumer(self.consumer_config)
def start_consume(self):
"""开始消费延时消息"""
try:
# 注册消息监听器
listener = DelayMessageListener()
self.consumer.register_message_listener(listener)
# 订阅主题
self.consumer.subscribe("DelayTopic", "*")
# 启动消费者
self.consumer.start()
print("延时消息消费者启动成功,等待消息...")
# 等待消息
while True:
time.sleep(10)
stats = listener.get_consume_stats()
print(f"消费统计: {stats}")
except KeyboardInterrupt:
print("收到中断信号,正在关闭消费者...")
finally:
self.consumer.shutdown()
# 延时消息监听器
class DelayMessageListener(MessageListenerConcurrently):
"""延时消息监听器"""
def __init__(self):
self.consume_stats = {
"total_consumed": 0,
"success_consumed": 0,
"failed_consumed": 0,
"delay_levels": {}
}
def consume_message(self, messages: List['MessageExt'],
context: 'ConsumeContext') -> 'ConsumeResult':
"""消费延时消息"""
for msg in messages:
try:
# 获取延时级别
delay_level = msg.get_property("DELAY")
# 计算实际延时时间
actual_delay = self._calculate_actual_delay(msg)
print(f"收到延时消息:")
print(f" 消息ID: {msg.msg_id}")
print(f" 延时级别: {delay_level}")
print(f" 实际延时: {actual_delay}ms")
print(f" 消息内容: {msg.body.decode('utf-8')}")
# 更新统计
self.consume_stats["total_consumed"] += 1
self.consume_stats["success_consumed"] += 1
if delay_level:
level_stats = self.consume_stats["delay_levels"]
level_stats[delay_level] = level_stats.get(delay_level, 0) + 1
except Exception as e:
print(f"处理延时消息异常: {e}")
self.consume_stats["total_consumed"] += 1
self.consume_stats["failed_consumed"] += 1
return ConsumeResult.RECONSUME_LATER
return ConsumeResult.SUCCESS
def _calculate_actual_delay(self, msg: 'MessageExt') -> int:
"""计算实际延时时间"""
try:
# 消息存储时间 - 消息产生时间
return msg.store_timestamp - msg.born_timestamp
except:
return 0
def get_consume_stats(self) -> Dict:
"""获取消费统计"""
return self.consume_stats.copy()
# 使用示例
if __name__ == "__main__":
# 发送延时消息示例
producer_example = DelayMessageExample()
producer_example.start_example()
# 消费延时消息示例
# consumer_example = DelayMessageConsumerExample()
# consumer_example.start_consume()
5.4 批量消息
5.4.1 批量消息实现
# 批量消息
@dataclass
class BatchMessage:
topic: str
messages: List['Message']
def __post_init__(self):
if not self.messages:
raise ValueError("批量消息不能为空")
# 验证所有消息的主题一致
for msg in self.messages:
if msg.topic != self.topic:
raise ValueError(f"消息主题不一致: {msg.topic} != {self.topic}")
def get_total_size(self) -> int:
"""获取批量消息总大小"""
total_size = 0
for msg in self.messages:
total_size += len(msg.body)
total_size += len(msg.topic.encode('utf-8'))
total_size += len(msg.tags.encode('utf-8')) if msg.tags else 0
total_size += len(msg.keys.encode('utf-8')) if msg.keys else 0
# 属性大小
for key, value in msg.properties.items():
total_size += len(key.encode('utf-8'))
total_size += len(value.encode('utf-8'))
return total_size
def get_message_count(self) -> int:
"""获取消息数量"""
return len(self.messages)
# 批量消息生产者
class BatchMessageProducer:
"""批量消息生产者"""
def __init__(self, producer_config: 'ProducerConfig'):
self.config = producer_config
self.producer = None
self.max_batch_size = 1024 * 1024 # 1MB
self.max_batch_count = 1000
self.batch_stats = {
"total_batches": 0,
"total_messages": 0,
"success_batches": 0,
"failed_batches": 0,
"avg_batch_size": 0
}
self.batch_sizes = []
def start(self):
"""启动批量消息生产者"""
from rocketmq_producer import DefaultMQProducer
self.producer = DefaultMQProducer(self.config)
self.producer.start()
print("批量消息生产者启动成功")
def shutdown(self):
"""关闭批量消息生产者"""
if self.producer:
self.producer.shutdown()
print("批量消息生产者关闭成功")
def send_batch(self, batch_message: BatchMessage) -> 'SendResult':
"""发送批量消息"""
try:
# 验证批量消息
self._validate_batch_message(batch_message)
# 如果批量过大,分割发送
if self._need_split_batch(batch_message):
return self._send_split_batch(batch_message)
# 发送单个批量
result = self._send_single_batch(batch_message)
# 更新统计
self._update_batch_stats(batch_message, True)
return result
except Exception as e:
self._update_batch_stats(batch_message, False)
raise Exception(f"发送批量消息失败: {e}")
def send_batch_async(self, batch_message: BatchMessage,
callback: 'BatchSendCallback'):
"""异步发送批量消息"""
try:
self._validate_batch_message(batch_message)
if self._need_split_batch(batch_message):
self._send_split_batch_async(batch_message, callback)
else:
self._send_single_batch_async(batch_message, callback)
except Exception as e:
self._update_batch_stats(batch_message, False)
callback.on_exception(e)
def _validate_batch_message(self, batch_message: BatchMessage):
"""验证批量消息"""
if not batch_message.messages:
raise ValueError("批量消息不能为空")
if len(batch_message.messages) > self.max_batch_count:
raise ValueError(f"批量消息数量超限: {len(batch_message.messages)} > {self.max_batch_count}")
# 检查消息是否包含延时或事务属性
for msg in batch_message.messages:
if msg.get_property("DELAY") or msg.get_property("TRAN_MSG"):
raise ValueError("批量消息不支持延时或事务消息")
def _need_split_batch(self, batch_message: BatchMessage) -> bool:
"""判断是否需要分割批量"""
return batch_message.get_total_size() > self.max_batch_size
def _send_single_batch(self, batch_message: BatchMessage) -> 'SendResult':
"""发送单个批量消息"""
# 将批量消息编码为单个消息
encoded_message = self._encode_batch_message(batch_message)
# 发送到Broker
result = self.producer.send(encoded_message)
print(f"发送批量消息成功: {result.msg_id}, 消息数量: {batch_message.get_message_count()}")
return result
def _send_single_batch_async(self, batch_message: BatchMessage,
callback: 'BatchSendCallback'):
"""异步发送单个批量消息"""
encoded_message = self._encode_batch_message(batch_message)
# 包装回调
wrapped_callback = BatchSendCallbackWrapper(
callback, self.batch_stats, batch_message
)
self.producer.send_async(encoded_message, wrapped_callback)
def _send_split_batch(self, batch_message: BatchMessage) -> 'SendResult':
"""分割发送批量消息"""
split_batches = self._split_batch_message(batch_message)
results = []
for split_batch in split_batches:
result = self._send_single_batch(split_batch)
results.append(result)
# 返回第一个结果(可以考虑返回聚合结果)
return results[0] if results else None
def _send_split_batch_async(self, batch_message: BatchMessage,
callback: 'BatchSendCallback'):
"""异步分割发送批量消息"""
split_batches = self._split_batch_message(batch_message)
# 创建聚合回调
aggregated_callback = AggregatedBatchSendCallback(
callback, len(split_batches)
)
for split_batch in split_batches:
self._send_single_batch_async(split_batch, aggregated_callback)
def _split_batch_message(self, batch_message: BatchMessage) -> List[BatchMessage]:
"""分割批量消息"""
split_batches = []
current_batch = []
current_size = 0
for msg in batch_message.messages:
msg_size = self._calculate_message_size(msg)
if (current_size + msg_size > self.max_batch_size or
len(current_batch) >= self.max_batch_count) and current_batch:
# 创建新的批量
split_batches.append(BatchMessage(
topic=batch_message.topic,
messages=current_batch.copy()
))
current_batch = []
current_size = 0
current_batch.append(msg)
current_size += msg_size
# 添加最后一个批量
if current_batch:
split_batches.append(BatchMessage(
topic=batch_message.topic,
messages=current_batch
))
return split_batches
def _encode_batch_message(self, batch_message: BatchMessage) -> 'Message':
"""编码批量消息"""
from rocketmq_producer import Message
import json
# 将批量消息序列化
batch_data = {
"topic": batch_message.topic,
"messages": []
}
for msg in batch_message.messages:
msg_data = {
"tags": msg.tags,
"keys": msg.keys,
"body": msg.body.decode('utf-8') if isinstance(msg.body, bytes) else msg.body,
"properties": msg.properties
}
batch_data["messages"].append(msg_data)
# 创建编码后的消息
encoded_message = Message(
topic=batch_message.topic,
tags="BATCH",
keys=f"batch_{int(time.time() * 1000)}",
body=json.dumps(batch_data).encode('utf-8')
)
# 设置批量标识
encoded_message.put_property("BATCH_MESSAGE", "true")
encoded_message.put_property("BATCH_SIZE", str(batch_message.get_message_count()))
return encoded_message
def _calculate_message_size(self, message: 'Message') -> int:
"""计算单个消息大小"""
size = len(message.body)
size += len(message.topic.encode('utf-8'))
size += len(message.tags.encode('utf-8')) if message.tags else 0
size += len(message.keys.encode('utf-8')) if message.keys else 0
for key, value in message.properties.items():
size += len(key.encode('utf-8'))
size += len(value.encode('utf-8'))
return size
def _update_batch_stats(self, batch_message: BatchMessage, success: bool):
"""更新批量统计"""
self.batch_stats["total_batches"] += 1
self.batch_stats["total_messages"] += batch_message.get_message_count()
if success:
self.batch_stats["success_batches"] += 1
else:
self.batch_stats["failed_batches"] += 1
# 记录批量大小
batch_size = batch_message.get_total_size()
self.batch_sizes.append(batch_size)
# 计算平均批量大小
if self.batch_sizes:
self.batch_stats["avg_batch_size"] = sum(self.batch_sizes) // len(self.batch_sizes)
def get_batch_stats(self) -> Dict:
"""获取批量统计"""
return self.batch_stats.copy()
def set_max_batch_size(self, max_size: int):
"""设置最大批量大小"""
self.max_batch_size = max_size
def set_max_batch_count(self, max_count: int):
"""设置最大批量数量"""
self.max_batch_count = max_count
# 批量发送回调接口
class BatchSendCallback(ABC):
"""批量发送回调接口"""
@abstractmethod
def on_success(self, result: 'SendResult', batch_message: BatchMessage):
"""批量发送成功回调"""
pass
@abstractmethod
def on_exception(self, exception: Exception):
"""批量发送异常回调"""
pass
# 批量发送回调包装器
class BatchSendCallbackWrapper:
"""批量发送回调包装器"""
def __init__(self, callback: BatchSendCallback, stats: Dict, batch_message: BatchMessage):
self.callback = callback
self.stats = stats
self.batch_message = batch_message
def on_success(self, result: 'SendResult'):
"""发送成功回调"""
# 更新统计
self.stats["total_batches"] += 1
self.stats["success_batches"] += 1
self.stats["total_messages"] += self.batch_message.get_message_count()
self.callback.on_success(result, self.batch_message)
def on_exception(self, exception: Exception):
"""发送异常回调"""
# 更新统计
self.stats["total_batches"] += 1
self.stats["failed_batches"] += 1
self.stats["total_messages"] += self.batch_message.get_message_count()
self.callback.on_exception(exception)
# 聚合批量发送回调
class AggregatedBatchSendCallback(BatchSendCallback):
"""聚合批量发送回调"""
def __init__(self, original_callback: BatchSendCallback, total_batches: int):
self.original_callback = original_callback
self.total_batches = total_batches
self.completed_batches = 0
self.success_results = []
self.exceptions = []
self.lock = threading.Lock()
def on_success(self, result: 'SendResult', batch_message: BatchMessage):
"""批量发送成功回调"""
with self.lock:
self.completed_batches += 1
self.success_results.append((result, batch_message))
if self.completed_batches == self.total_batches:
# 所有批量都完成了
if self.exceptions:
# 有异常发生
self.original_callback.on_exception(self.exceptions[0])
else:
# 全部成功
first_result, first_batch = self.success_results[0]
self.original_callback.on_success(first_result, first_batch)
def on_exception(self, exception: Exception):
"""批量发送异常回调"""
with self.lock:
self.completed_batches += 1
self.exceptions.append(exception)
if self.completed_batches == self.total_batches:
# 所有批量都完成了
self.original_callback.on_exception(self.exceptions[0])
### 5.4.2 批量消息消费
```python
# 批量消息消费者
class BatchMessageConsumer:
"""批量消息消费者"""
def __init__(self, consumer_config: 'ConsumerConfig'):
self.config = consumer_config
self.consumer = None
self.batch_listener: Optional['BatchMessageListener'] = None
self.consume_stats = {
"total_batches": 0,
"total_messages": 0,
"success_batches": 0,
"failed_batches": 0
}
def start(self):
"""启动批量消息消费者"""
if not self.batch_listener:
raise Exception("未设置批量消息监听器")
from rocketmq_consumer import DefaultMQPushConsumer
self.consumer = DefaultMQPushConsumer(self.config)
# 注册批量消息监听器
wrapped_listener = BatchMessageListenerWrapper(
self.batch_listener, self.consume_stats
)
self.consumer.register_message_listener(wrapped_listener)
self.consumer.start()
print("批量消息消费者启动成功")
def shutdown(self):
"""关闭批量消息消费者"""
if self.consumer:
self.consumer.shutdown()
print("批量消息消费者关闭成功")
def subscribe(self, topic: str, sub_expression: str = "*"):
"""订阅主题"""
if self.consumer:
self.consumer.subscribe(topic, sub_expression)
def register_batch_listener(self, listener: 'BatchMessageListener'):
"""注册批量消息监听器"""
self.batch_listener = listener
def get_consume_stats(self) -> Dict[str, int]:
"""获取消费统计"""
return self.consume_stats.copy()
# 批量消息监听器接口
class BatchMessageListener(ABC):
"""批量消息监听器接口"""
@abstractmethod
def consume_batch_message(self, batch_messages: List['MessageExt']) -> 'ConsumeResult':
"""消费批量消息"""
pass
# 批量消息监听器包装器
class BatchMessageListenerWrapper:
"""批量消息监听器包装器"""
def __init__(self, batch_listener: BatchMessageListener, stats: Dict[str, int]):
self.batch_listener = batch_listener
self.stats = stats
def consume_message(self, messages: List['MessageExt'],
context: 'ConsumeContext') -> 'ConsumeResult':
"""消费消息(适配器方法)"""
try:
# 检查是否为批量消息
batch_messages = []
individual_messages = []
for msg in messages:
if msg.get_property("BATCH_MESSAGE") == "true":
# 解码批量消息
decoded_messages = self._decode_batch_message(msg)
batch_messages.extend(decoded_messages)
else:
individual_messages.append(msg)
# 处理批量消息
if batch_messages:
result = self.batch_listener.consume_batch_message(batch_messages)
self._update_batch_stats(len(batch_messages), result == ConsumeResult.SUCCESS)
return result
# 处理单个消息
if individual_messages:
result = self.batch_listener.consume_batch_message(individual_messages)
self._update_individual_stats(len(individual_messages), result == ConsumeResult.SUCCESS)
return result
return ConsumeResult.SUCCESS
except Exception as e:
print(f"批量消费异常: {e}")
self._update_batch_stats(len(messages), False)
return ConsumeResult.RECONSUME_LATER
def _decode_batch_message(self, batch_msg: 'MessageExt') -> List['MessageExt']:
"""解码批量消息"""
import json
try:
# 解析批量消息数据
batch_data = json.loads(batch_msg.body.decode('utf-8'))
decoded_messages = []
for msg_data in batch_data["messages"]:
# 创建消息扩展对象
decoded_msg = MessageExt(
topic=batch_data["topic"],
tags=msg_data["tags"],
keys=msg_data["keys"],
body=msg_data["body"].encode('utf-8'),
msg_id=f"{batch_msg.msg_id}_{len(decoded_messages)}",
queue_id=batch_msg.queue_id,
queue_offset=batch_msg.queue_offset,
born_timestamp=batch_msg.born_timestamp,
store_timestamp=batch_msg.store_timestamp
)
# 设置属性
for key, value in msg_data["properties"].items():
decoded_msg.put_property(key, value)
decoded_messages.append(decoded_msg)
return decoded_messages
except Exception as e:
print(f"解码批量消息失败: {e}")
return []
def _update_batch_stats(self, message_count: int, success: bool):
"""更新批量统计"""
self.stats["total_batches"] += 1
self.stats["total_messages"] += message_count
if success:
self.stats["success_batches"] += 1
else:
self.stats["failed_batches"] += 1
def _update_individual_stats(self, message_count: int, success: bool):
"""更新单个消息统计"""
self.stats["total_messages"] += message_count
if success:
self.stats["success_batches"] += 1
else:
self.stats["failed_batches"] += 1
# 业务批量消息监听器
class BusinessBatchMessageListener(BatchMessageListener):
"""业务批量消息监听器"""
def __init__(self, batch_processor: Callable[[List['MessageExt']], bool]):
self.batch_processor = batch_processor
self.processed_count = 0
self.failed_count = 0
def consume_batch_message(self, batch_messages: List['MessageExt']) -> 'ConsumeResult':
"""消费批量消息"""
try:
print(f"收到批量消息: {len(batch_messages)} 条")
# 处理批量消息
success = self.batch_processor(batch_messages)
if success:
self.processed_count += len(batch_messages)
print(f"批量消息处理成功: {len(batch_messages)} 条")
return ConsumeResult.SUCCESS
else:
self.failed_count += len(batch_messages)
print(f"批量消息处理失败: {len(batch_messages)} 条")
return ConsumeResult.RECONSUME_LATER
except Exception as e:
print(f"批量消息处理异常: {e}")
self.failed_count += len(batch_messages)
return ConsumeResult.RECONSUME_LATER
def get_process_stats(self) -> Dict[str, int]:
"""获取处理统计"""
return {
"processed_count": self.processed_count,
"failed_count": self.failed_count,
"total_count": self.processed_count + self.failed_count
}
5.4.3 批量消息使用示例
# 批量消息使用示例
class BatchMessageExample:
"""批量消息使用示例"""
def __init__(self):
# 配置生产者
self.producer_config = ProducerConfig(
producer_group="batch_producer_group",
name_server_addr="localhost:9876"
)
# 配置消费者
self.consumer_config = ConsumerConfig(
consumer_group="batch_consumer_group",
name_server_addr="localhost:9876"
)
# 创建生产者和消费者
self.producer = BatchMessageProducer(self.producer_config)
self.consumer = BatchMessageConsumer(self.consumer_config)
def start_producer_example(self):
"""启动生产者示例"""
try:
self.producer.start()
# 发送批量消息
self._send_batch_messages()
# 发送大批量消息(会自动分割)
self._send_large_batch_messages()
# 异步发送批量消息
self._send_batch_messages_async()
# 打印统计信息
self._print_producer_stats()
finally:
self.producer.shutdown()
def start_consumer_example(self):
"""启动消费者示例"""
try:
# 注册批量消息监听器
listener = BusinessBatchMessageListener(self._process_batch_messages)
self.consumer.register_batch_listener(listener)
# 订阅主题
self.consumer.subscribe("BatchTopic", "*")
# 启动消费者
self.consumer.start()
print("批量消息消费者启动成功,等待消息...")
# 等待消息
while True:
time.sleep(10)
stats = self.consumer.get_consume_stats()
process_stats = listener.get_process_stats()
print(f"消费统计: {stats}")
print(f"处理统计: {process_stats}")
except KeyboardInterrupt:
print("收到中断信号,正在关闭消费者...")
finally:
self.consumer.shutdown()
def _send_batch_messages(self):
"""发送批量消息"""
from rocketmq_producer import Message
# 创建批量消息
messages = []
for i in range(10):
message = Message(
topic="BatchTopic",
tags="BatchTag",
keys=f"batch_key_{i}",
body=f"批量消息内容 {i}".encode('utf-8')
)
messages.append(message)
batch_message = BatchMessage(
topic="BatchTopic",
messages=messages
)
try:
result = self.producer.send_batch(batch_message)
print(f"发送批量消息成功: {result.msg_id}")
except Exception as e:
print(f"发送批量消息失败: {e}")
def _send_large_batch_messages(self):
"""发送大批量消息(会自动分割)"""
from rocketmq_producer import Message
# 创建大批量消息
messages = []
for i in range(1500): # 超过最大批量数量
message = Message(
topic="BatchTopic",
tags="LargeBatchTag",
keys=f"large_batch_key_{i}",
body=f"大批量消息内容 {i} - {'X' * 1000}".encode('utf-8') # 增加消息大小
)
messages.append(message)
batch_message = BatchMessage(
topic="BatchTopic",
messages=messages
)
try:
result = self.producer.send_batch(batch_message)
print(f"发送大批量消息成功: {result.msg_id}")
except Exception as e:
print(f"发送大批量消息失败: {e}")
def _send_batch_messages_async(self):
"""异步发送批量消息"""
from rocketmq_producer import Message
# 创建批量消息
messages = []
for i in range(5):
message = Message(
topic="BatchTopic",
tags="AsyncBatchTag",
keys=f"async_batch_key_{i}",
body=f"异步批量消息内容 {i}".encode('utf-8')
)
messages.append(message)
batch_message = BatchMessage(
topic="BatchTopic",
messages=messages
)
# 创建回调
callback = AsyncBatchSendCallback()
try:
self.producer.send_batch_async(batch_message, callback)
print("异步发送批量消息请求已提交")
# 等待回调完成
time.sleep(2)
except Exception as e:
print(f"异步发送批量消息失败: {e}")
def _process_batch_messages(self, messages: List['MessageExt']) -> bool:
"""处理批量消息"""
try:
print(f"\n处理批量消息: {len(messages)} 条")
for i, msg in enumerate(messages):
print(f" 消息 {i+1}:")
print(f" ID: {msg.msg_id}")
print(f" Tags: {msg.tags}")
print(f" Keys: {msg.keys}")
print(f" 内容: {msg.body.decode('utf-8')[:50]}...")
# 模拟批量处理
time.sleep(0.1 * len(messages))
print(f"批量消息处理完成: {len(messages)} 条")
return True
except Exception as e:
print(f"批量消息处理异常: {e}")
return False
def _print_producer_stats(self):
"""打印生产者统计信息"""
stats = self.producer.get_batch_stats()
print("\n批量消息生产者统计:")
print(f"总批量数: {stats['total_batches']}")
print(f"总消息数: {stats['total_messages']}")
print(f"成功批量数: {stats['success_batches']}")
print(f"失败批量数: {stats['failed_batches']}")
print(f"平均批量大小: {stats['avg_batch_size']} 字节")
# 异步批量发送回调
class AsyncBatchSendCallback(BatchSendCallback):
"""异步批量发送回调"""
def on_success(self, result: 'SendResult', batch_message: BatchMessage):
"""批量发送成功回调"""
print(f"异步批量发送成功: {result.msg_id}, 消息数量: {batch_message.get_message_count()}")
def on_exception(self, exception: Exception):
"""批量发送异常回调"""
print(f"异步批量发送失败: {exception}")
# 使用示例
if __name__ == "__main__":
example = BatchMessageExample()
# 发送批量消息示例
example.start_producer_example()
# 消费批量消息示例
# example.start_consumer_example()
5.5 消息过滤
5.5.1 标签过滤
# 标签过滤枚举
class FilterType(Enum):
"""过滤类型枚举"""
TAG = "TAG" # 标签过滤
SQL92 = "SQL92" # SQL92过滤
# 消息过滤器接口
class MessageFilter(ABC):
"""消息过滤器接口"""
@abstractmethod
def match(self, message: 'MessageExt') -> bool:
"""检查消息是否匹配过滤条件"""
pass
@abstractmethod
def get_filter_expression(self) -> str:
"""获取过滤表达式"""
pass
@abstractmethod
def get_filter_type(self) -> FilterType:
"""获取过滤类型"""
pass
# 标签过滤器
class TagFilter(MessageFilter):
"""标签过滤器"""
def __init__(self, tag_expression: str):
self.tag_expression = tag_expression
self.tags = self._parse_tag_expression(tag_expression)
def match(self, message: 'MessageExt') -> bool:
"""检查消息标签是否匹配"""
if not message.tags:
return "*" in self.tags
# 支持通配符
if "*" in self.tags:
return True
# 精确匹配
return message.tags in self.tags
def get_filter_expression(self) -> str:
"""获取过滤表达式"""
return self.tag_expression
def get_filter_type(self) -> FilterType:
"""获取过滤类型"""
return FilterType.TAG
def _parse_tag_expression(self, expression: str) -> Set[str]:
"""解析标签表达式"""
if not expression or expression == "*":
return {"*"}
# 支持多个标签,用 || 分隔
tags = set()
for tag in expression.split("||"):
tag = tag.strip()
if tag:
tags.add(tag)
return tags if tags else {"*"}
# SQL92过滤器
class SQL92Filter(MessageFilter):
"""SQL92过滤器"""
def __init__(self, sql_expression: str):
self.sql_expression = sql_expression
self.compiled_expression = self._compile_expression(sql_expression)
def match(self, message: 'MessageExt') -> bool:
"""检查消息是否匹配SQL表达式"""
try:
# 构建消息上下文
context = self._build_message_context(message)
# 执行SQL表达式
return self._evaluate_expression(self.compiled_expression, context)
except Exception as e:
print(f"SQL过滤器执行异常: {e}")
return False
def get_filter_expression(self) -> str:
"""获取过滤表达式"""
return self.sql_expression
def get_filter_type(self) -> FilterType:
"""获取过滤类型"""
return FilterType.SQL92
def _compile_expression(self, expression: str) -> Dict[str, Any]:
"""编译SQL表达式"""
# 简化的SQL解析器
import re
compiled = {
"original": expression,
"conditions": []
}
# 解析WHERE条件
where_match = re.search(r'WHERE\s+(.+)', expression, re.IGNORECASE)
if where_match:
where_clause = where_match.group(1)
# 解析条件(简化版本)
conditions = self._parse_where_clause(where_clause)
compiled["conditions"] = conditions
return compiled
def _parse_where_clause(self, where_clause: str) -> List[Dict[str, Any]]:
"""解析WHERE子句"""
import re
conditions = []
# 支持的操作符
operators = ['=', '!=', '>', '<', '>=', '<=', 'LIKE', 'IN']
# 简单的条件解析
for op in operators:
pattern = rf'(\w+)\s*{re.escape(op)}\s*([\'\"]?[^\s\'\"]+[\'\"]?)'
matches = re.findall(pattern, where_clause, re.IGNORECASE)
for match in matches:
field, value = match
conditions.append({
"field": field,
"operator": op,
"value": value.strip('\"\'')
})
return conditions
def _build_message_context(self, message: 'MessageExt') -> Dict[str, Any]:
"""构建消息上下文"""
context = {
"tags": message.tags or "",
"keys": message.keys or "",
"msgId": message.msg_id,
"topic": message.topic,
"queueId": message.queue_id,
"bornTimestamp": message.born_timestamp,
"storeTimestamp": message.store_timestamp
}
# 添加用户属性
if hasattr(message, 'properties') and message.properties:
for key, value in message.properties.items():
context[key] = value
return context
def _evaluate_expression(self, compiled_expr: Dict[str, Any],
context: Dict[str, Any]) -> bool:
"""评估表达式"""
conditions = compiled_expr.get("conditions", [])
if not conditions:
return True
# 简单的AND逻辑(实际应该支持更复杂的逻辑)
for condition in conditions:
if not self._evaluate_condition(condition, context):
return False
return True
def _evaluate_condition(self, condition: Dict[str, Any],
context: Dict[str, Any]) -> bool:
"""评估单个条件"""
field = condition["field"]
operator = condition["operator"]
expected_value = condition["value"]
actual_value = context.get(field, "")
try:
if operator == "=":
return str(actual_value) == str(expected_value)
elif operator == "!=":
return str(actual_value) != str(expected_value)
elif operator == ">":
return float(actual_value) > float(expected_value)
elif operator == "<":
return float(actual_value) < float(expected_value)
elif operator == ">=":
return float(actual_value) >= float(expected_value)
elif operator == "<=":
return float(actual_value) <= float(expected_value)
elif operator.upper() == "LIKE":
import re
pattern = expected_value.replace("%", ".*")
return bool(re.match(pattern, str(actual_value)))
elif operator.upper() == "IN":
values = expected_value.strip("()").split(",")
values = [v.strip().strip("'\"") for v in values]
return str(actual_value) in values
else:
return False
except (ValueError, TypeError):
return False
# 过滤消息生产者
class FilterMessageProducer:
"""支持过滤的消息生产者"""
def __init__(self, producer_config: 'ProducerConfig'):
self.config = producer_config
self.producer = None
self.send_stats = {
"total_sent": 0,
"success_sent": 0,
"failed_sent": 0
}
def start(self):
"""启动生产者"""
from rocketmq_producer import DefaultMQProducer
self.producer = DefaultMQProducer(self.config)
self.producer.start()
print("过滤消息生产者启动成功")
def shutdown(self):
"""关闭生产者"""
if self.producer:
self.producer.shutdown()
print("过滤消息生产者关闭成功")
def send_message_with_tags(self, topic: str, tags: str, keys: str,
body: str, properties: Dict[str, str] = None) -> 'SendResult':
"""发送带标签的消息"""
from rocketmq_producer import Message
message = Message(
topic=topic,
tags=tags,
keys=keys,
body=body.encode('utf-8')
)
# 设置用户属性
if properties:
for key, value in properties.items():
message.put_property(key, value)
try:
result = self.producer.send(message)
self.send_stats["total_sent"] += 1
self.send_stats["success_sent"] += 1
return result
except Exception as e:
self.send_stats["total_sent"] += 1
self.send_stats["failed_sent"] += 1
raise e
def send_messages_batch_with_different_tags(self, topic: str,
message_data: List[Dict[str, Any]]) -> List['SendResult']:
"""批量发送不同标签的消息"""
results = []
for data in message_data:
try:
result = self.send_message_with_tags(
topic=topic,
tags=data.get("tags", ""),
keys=data.get("keys", ""),
body=data.get("body", ""),
properties=data.get("properties", {})
)
results.append(result)
except Exception as e:
print(f"发送消息失败: {e}")
results.append(None)
return results
def get_send_stats(self) -> Dict[str, int]:
"""获取发送统计"""
return self.send_stats.copy()
# 过滤消息消费者
class FilterMessageConsumer:
"""支持过滤的消息消费者"""
def __init__(self, consumer_config: 'ConsumerConfig'):
self.config = consumer_config
self.consumer = None
self.message_filter: Optional[MessageFilter] = None
self.filter_stats = {
"total_received": 0,
"filtered_out": 0,
"processed": 0
}
def start(self):
"""启动消费者"""
from rocketmq_consumer import DefaultMQPushConsumer
self.consumer = DefaultMQPushConsumer(self.config)
# 注册过滤消息监听器
listener = FilterMessageListener(self.message_filter, self.filter_stats)
self.consumer.register_message_listener(listener)
self.consumer.start()
print("过滤消息消费者启动成功")
def shutdown(self):
"""关闭消费者"""
if self.consumer:
self.consumer.shutdown()
print("过滤消息消费者关闭成功")
def subscribe_with_tag_filter(self, topic: str, tag_expression: str):
"""使用标签过滤订阅主题"""
self.message_filter = TagFilter(tag_expression)
if self.consumer:
self.consumer.subscribe(topic, tag_expression)
def subscribe_with_sql_filter(self, topic: str, sql_expression: str):
"""使用SQL过滤订阅主题"""
self.message_filter = SQL92Filter(sql_expression)
if self.consumer:
# 注意:实际的RocketMQ需要在Broker端支持SQL过滤
self.consumer.subscribe(topic, "*") # 客户端过滤
def get_filter_stats(self) -> Dict[str, int]:
"""获取过滤统计"""
return self.filter_stats.copy()
# 过滤消息监听器
class FilterMessageListener:
"""过滤消息监听器"""
def __init__(self, message_filter: Optional[MessageFilter],
stats: Dict[str, int]):
self.message_filter = message_filter
self.stats = stats
def consume_message(self, messages: List['MessageExt'],
context: 'ConsumeContext') -> 'ConsumeResult':
"""消费消息"""
try:
filtered_messages = []
for message in messages:
self.stats["total_received"] += 1
# 应用过滤器
if self.message_filter and not self.message_filter.match(message):
self.stats["filtered_out"] += 1
continue
filtered_messages.append(message)
# 处理过滤后的消息
if filtered_messages:
result = self._process_filtered_messages(filtered_messages)
if result == ConsumeResult.SUCCESS:
self.stats["processed"] += len(filtered_messages)
return result
return ConsumeResult.SUCCESS
except Exception as e:
print(f"过滤消息处理异常: {e}")
return ConsumeResult.RECONSUME_LATER
def _process_filtered_messages(self, messages: List['MessageExt']) -> 'ConsumeResult':
"""处理过滤后的消息"""
try:
for message in messages:
print(f"处理过滤消息:")
print(f" ID: {message.msg_id}")
print(f" Topic: {message.topic}")
print(f" Tags: {message.tags}")
print(f" Keys: {message.keys}")
print(f" 内容: {message.body.decode('utf-8')[:100]}...")
# 打印用户属性
if hasattr(message, 'properties') and message.properties:
print(f" 属性: {message.properties}")
print()
return ConsumeResult.SUCCESS
except Exception as e:
print(f"处理过滤消息异常: {e}")
return ConsumeResult.RECONSUME_LATER
5.5.2 消息过滤使用示例
# 消息过滤使用示例
class MessageFilterExample:
"""消息过滤使用示例"""
def __init__(self):
# 配置生产者
self.producer_config = ProducerConfig(
producer_group="filter_producer_group",
name_server_addr="localhost:9876"
)
# 配置消费者
self.consumer_config = ConsumerConfig(
consumer_group="filter_consumer_group",
name_server_addr="localhost:9876"
)
# 创建生产者和消费者
self.producer = FilterMessageProducer(self.producer_config)
self.consumer = FilterMessageConsumer(self.consumer_config)
def start_producer_example(self):
"""启动生产者示例"""
try:
self.producer.start()
# 发送不同标签的消息
self._send_messages_with_different_tags()
# 发送带属性的消息
self._send_messages_with_properties()
# 打印发送统计
stats = self.producer.get_send_stats()
print(f"\n发送统计: {stats}")
finally:
self.producer.shutdown()
def start_tag_filter_consumer_example(self):
"""启动标签过滤消费者示例"""
try:
# 订阅特定标签的消息
self.consumer.subscribe_with_tag_filter("FilterTopic", "VIP || IMPORTANT")
# 启动消费者
self.consumer.start()
print("标签过滤消费者启动成功,等待消息...")
# 等待消息
while True:
time.sleep(10)
stats = self.consumer.get_filter_stats()
print(f"过滤统计: {stats}")
except KeyboardInterrupt:
print("收到中断信号,正在关闭消费者...")
finally:
self.consumer.shutdown()
def start_sql_filter_consumer_example(self):
"""启动SQL过滤消费者示例"""
try:
# 使用SQL表达式过滤消息
sql_expression = "WHERE level > 5 AND category = 'order'"
self.consumer.subscribe_with_sql_filter("FilterTopic", sql_expression)
# 启动消费者
self.consumer.start()
print("SQL过滤消费者启动成功,等待消息...")
# 等待消息
while True:
time.sleep(10)
stats = self.consumer.get_filter_stats()
print(f"过滤统计: {stats}")
except KeyboardInterrupt:
print("收到中断信号,正在关闭消费者...")
finally:
self.consumer.shutdown()
def _send_messages_with_different_tags(self):
"""发送不同标签的消息"""
message_data = [
{
"tags": "VIP",
"keys": "vip_user_001",
"body": "VIP用户订单消息",
"properties": {"level": "10", "category": "order"}
},
{
"tags": "NORMAL",
"keys": "normal_user_001",
"body": "普通用户订单消息",
"properties": {"level": "3", "category": "order"}
},
{
"tags": "IMPORTANT",
"keys": "important_001",
"body": "重要系统消息",
"properties": {"level": "8", "category": "system"}
},
{
"tags": "DEBUG",
"keys": "debug_001",
"body": "调试消息",
"properties": {"level": "1", "category": "debug"}
}
]
results = self.producer.send_messages_batch_with_different_tags(
"FilterTopic", message_data
)
for i, result in enumerate(results):
if result:
print(f"发送消息 {i+1} 成功: {result.msg_id}")
else:
print(f"发送消息 {i+1} 失败")
def _send_messages_with_properties(self):
"""发送带属性的消息"""
# 发送高级别订单消息
self.producer.send_message_with_tags(
topic="FilterTopic",
tags="ORDER",
keys="high_level_order",
body="高级别订单消息",
properties={
"level": "9",
"category": "order",
"priority": "high",
"amount": "1000.00"
}
)
# 发送低级别通知消息
self.producer.send_message_with_tags(
topic="FilterTopic",
tags="NOTIFICATION",
keys="low_level_notification",
body="低级别通知消息",
properties={
"level": "2",
"category": "notification",
"priority": "low"
}
)
# 过滤器测试
class FilterTest:
"""过滤器测试"""
def test_tag_filter(self):
"""测试标签过滤器"""
print("测试标签过滤器:")
# 创建测试消息
test_messages = [
self._create_test_message("VIP"),
self._create_test_message("NORMAL"),
self._create_test_message("IMPORTANT"),
self._create_test_message("DEBUG")
]
# 测试不同的标签过滤器
filters = [
TagFilter("VIP"),
TagFilter("VIP || IMPORTANT"),
TagFilter("*"),
TagFilter("PREMIUM")
]
for filter_obj in filters:
print(f"\n过滤表达式: {filter_obj.get_filter_expression()}")
for msg in test_messages:
match = filter_obj.match(msg)
print(f" 消息标签 '{msg.tags}': {'匹配' if match else '不匹配'}")
def test_sql_filter(self):
"""测试SQL过滤器"""
print("\n测试SQL过滤器:")
# 创建带属性的测试消息
test_messages = [
self._create_test_message_with_properties("ORDER", {"level": "8", "category": "order"}),
self._create_test_message_with_properties("ORDER", {"level": "3", "category": "order"}),
self._create_test_message_with_properties("NOTIFICATION", {"level": "6", "category": "notification"}),
self._create_test_message_with_properties("DEBUG", {"level": "1", "category": "debug"})
]
# 测试不同的SQL过滤器
sql_filters = [
"WHERE level > 5",
"WHERE level > 5 AND category = 'order'",
"WHERE category = 'notification'",
"WHERE level <= 3"
]
for sql_expr in sql_filters:
filter_obj = SQL92Filter(sql_expr)
print(f"\nSQL表达式: {sql_expr}")
for msg in test_messages:
match = filter_obj.match(msg)
props = getattr(msg, 'properties', {})
print(f" 消息 (level={props.get('level', 'N/A')}, category={props.get('category', 'N/A')}): {'匹配' if match else '不匹配'}")
def _create_test_message(self, tags: str) -> 'MessageExt':
"""创建测试消息"""
from rocketmq_consumer import MessageExt
return MessageExt(
topic="TestTopic",
tags=tags,
keys=f"test_key_{tags.lower()}",
body=f"测试消息内容 - {tags}".encode('utf-8'),
msg_id=f"test_msg_{tags.lower()}",
queue_id=0,
queue_offset=0,
born_timestamp=int(time.time() * 1000),
store_timestamp=int(time.time() * 1000)
)
def _create_test_message_with_properties(self, tags: str,
properties: Dict[str, str]) -> 'MessageExt':
"""创建带属性的测试消息"""
msg = self._create_test_message(tags)
msg.properties = properties
return msg
# 使用示例
if __name__ == "__main__":
# 测试过滤器
filter_test = FilterTest()
filter_test.test_tag_filter()
filter_test.test_sql_filter()
# 消息过滤示例
example = MessageFilterExample()
# 发送消息示例
example.start_producer_example()
# 标签过滤消费示例
# example.start_tag_filter_consumer_example()
# SQL过滤消费示例
# example.start_sql_filter_consumer_example()
5.6 本章总结
5.6.1 核心知识点
# RocketMQ高级特性总结
class RocketMQAdvancedFeaturesSummary:
"""RocketMQ高级特性总结"""
def __init__(self):
self.features = {
"顺序消息": {
"描述": "保证消息按照发送顺序被消费",
"类型": ["全局顺序", "分区顺序"],
"关键点": [
"使用MessageQueueSelector选择队列",
"同一队列内消息顺序消费",
"消费者需要顺序处理消息"
],
"适用场景": [
"订单状态变更",
"账户余额变动",
"库存变化记录"
]
},
"事务消息": {
"描述": "保证本地事务与消息发送的一致性",
"流程": [
"发送半消息",
"执行本地事务",
"提交或回滚事务消息"
],
"关键点": [
"实现TransactionListener接口",
"处理事务状态检查",
"确保幂等性"
],
"适用场景": [
"分布式事务",
"数据一致性保证",
"业务解耦"
]
},
"延时消息": {
"描述": "消息在指定时间后才能被消费",
"实现方式": [
"预定义延时级别",
"指定延时时间"
],
"关键点": [
"支持18个延时级别",
"最大延时2小时",
"延时精度秒级"
],
"适用场景": [
"订单超时取消",
"定时任务",
"延时通知"
]
},
"批量消息": {
"描述": "一次发送多条消息,提高吞吐量",
"优势": [
"减少网络开销",
"提高发送效率",
"降低系统负载"
],
"限制": [
"消息总大小不超过4MB",
"同一批次消息必须是同一Topic",
"不支持延时消息"
],
"适用场景": [
"日志收集",
"数据同步",
"批量通知"
]
},
"消息过滤": {
"描述": "根据条件过滤消息,减少无效消费",
"类型": [
"标签过滤(Tag Filter)",
"SQL92过滤(SQL Filter)"
],
"关键点": [
"支持多标签过滤",
"SQL表达式过滤",
"客户端和服务端过滤"
],
"适用场景": [
"消息分类处理",
"条件消费",
"减少网络传输"
]
}
}
def get_feature_summary(self, feature_name: str) -> Dict[str, Any]:
"""获取特性总结"""
return self.features.get(feature_name, {})
def get_all_features(self) -> List[str]:
"""获取所有特性列表"""
return list(self.features.keys())
def print_summary(self):
"""打印特性总结"""
print("RocketMQ高级特性总结:")
print("=" * 50)
for feature_name, feature_info in self.features.items():
print(f"\n{feature_name}:")
print(f" 描述: {feature_info['描述']}")
for key, value in feature_info.items():
if key != '描述':
print(f" {key}:")
if isinstance(value, list):
for item in value:
print(f" - {item}")
else:
print(f" {value}")
5.6.2 最佳实践
# RocketMQ高级特性最佳实践
class RocketMQAdvancedBestPractices:
"""RocketMQ高级特性最佳实践"""
def __init__(self):
self.best_practices = {
"顺序消息最佳实践": [
"合理选择分区键,避免热点队列",
"消费者处理失败时要谨慎重试",
"避免长时间阻塞消费线程",
"监控队列消费进度",
"考虑消费者扩容对顺序的影响"
],
"事务消息最佳实践": [
"本地事务要保证幂等性",
"事务状态检查要快速响应",
"合理设置事务超时时间",
"记录事务执行日志",
"处理事务回查逻辑"
],
"延时消息最佳实践": [
"选择合适的延时级别",
"避免大量相同时间的延时消息",
"考虑时钟偏移问题",
"监控延时消息堆积",
"设计延时消息的取消机制"
],
"批量消息最佳实践": [
"控制批量大小,避免超过限制",
"合理设置批量发送间隔",
"处理批量发送失败的重试",
"监控批量发送性能",
"考虑消息顺序要求"
],
"消息过滤最佳实践": [
"优先使用标签过滤",
"SQL过滤表达式要简洁",
"避免复杂的过滤逻辑",
"在生产者端设置合适的标签",
"监控过滤效果和性能"
],
"性能优化实践": [
"合理配置生产者和消费者参数",
"使用异步发送提高吞吐量",
"批量处理消息",
"避免频繁创建连接",
"监控关键性能指标"
],
"可靠性保证实践": [
"实现消息幂等处理",
"设计合理的重试机制",
"处理死信队列消息",
"监控消息堆积情况",
"建立消息追踪机制"
]
}
def get_best_practices(self, category: str) -> List[str]:
"""获取最佳实践"""
return self.best_practices.get(category, [])
def print_best_practices(self):
"""打印最佳实践"""
print("RocketMQ高级特性最佳实践:")
print("=" * 50)
for category, practices in self.best_practices.items():
print(f"\n{category}:")
for i, practice in enumerate(practices, 1):
print(f" {i}. {practice}")
# 性能监控和调优
class RocketMQPerformanceMonitor:
"""RocketMQ性能监控"""
def __init__(self):
self.metrics = {
"生产者指标": [
"发送TPS(每秒事务数)",
"发送延迟(RT)",
"发送成功率",
"发送失败率",
"重试次数"
],
"消费者指标": [
"消费TPS",
"消费延迟",
"消费成功率",
"消费失败率",
"消息堆积量"
],
"Broker指标": [
"消息存储量",
"磁盘使用率",
"内存使用率",
"网络IO",
"队列深度"
],
"系统指标": [
"CPU使用率",
"内存使用率",
"磁盘IO",
"网络带宽",
"连接数"
]
}
def get_monitoring_metrics(self) -> Dict[str, List[str]]:
"""获取监控指标"""
return self.metrics
def print_monitoring_guide(self):
"""打印监控指南"""
print("RocketMQ性能监控指南:")
print("=" * 50)
for category, metrics in self.metrics.items():
print(f"\n{category}:")
for metric in metrics:
print(f" - {metric}")
print("\n监控建议:")
print(" 1. 建立完善的监控体系")
print(" 2. 设置合理的告警阈值")
print(" 3. 定期分析性能趋势")
print(" 4. 建立性能基线")
print(" 5. 制定性能调优计划")
5.6.3 练习题
# 练习题
class RocketMQAdvancedExercises:
"""RocketMQ高级特性练习题"""
def __init__(self):
self.exercises = [
{
"题目": "实现一个电商订单处理系统",
"要求": [
"使用顺序消息保证订单状态变更的顺序",
"使用事务消息保证库存扣减和订单创建的一致性",
"使用延时消息实现订单超时自动取消",
"使用消息过滤实现不同类型订单的分类处理"
],
"提示": [
"以订单ID作为分区键保证顺序",
"在事务监听器中处理库存操作",
"设置合适的订单超时时间",
"使用标签区分订单类型"
]
},
{
"题目": "设计一个日志收集系统",
"要求": [
"使用批量消息提高日志发送效率",
"使用消息过滤实现不同级别日志的处理",
"实现日志消息的压缩和解压缩",
"监控日志发送和消费的性能指标"
],
"提示": [
"按时间或数量触发批量发送",
"使用日志级别作为过滤条件",
"在消息体中实现压缩逻辑",
"记录发送和消费的统计信息"
]
},
{
"题目": "实现一个分布式任务调度系统",
"要求": [
"使用延时消息实现定时任务",
"使用事务消息保证任务状态的一致性",
"使用消息过滤实现任务类型的分发",
"实现任务的重试和失败处理机制"
],
"提示": [
"根据执行时间计算延时级别",
"在事务中更新任务状态",
"使用任务类型作为标签",
"设计合理的重试策略"
]
},
{
"题目": "优化现有的消息系统性能",
"要求": [
"分析当前系统的性能瓶颈",
"使用批量消息优化发送性能",
"使用异步发送提高吞吐量",
"实现消息的监控和告警"
],
"提示": [
"监控发送和消费的延迟",
"合理设置批量大小和间隔",
"使用回调处理异步结果",
"建立完善的监控体系"
]
}
]
def get_exercise(self, index: int) -> Dict[str, Any]:
"""获取练习题"""
if 0 <= index < len(self.exercises):
return self.exercises[index]
return {}
def print_all_exercises(self):
"""打印所有练习题"""
print("RocketMQ高级特性练习题:")
print("=" * 50)
for i, exercise in enumerate(self.exercises, 1):
print(f"\n练习题 {i}: {exercise['题目']}")
print("要求:")
for req in exercise['要求']:
print(f" - {req}")
print("提示:")
for hint in exercise['提示']:
print(f" - {hint}")
print()
# 使用示例
if __name__ == "__main__":
# 打印特性总结
summary = RocketMQAdvancedFeaturesSummary()
summary.print_summary()
print("\n" + "=" * 80 + "\n")
# 打印最佳实践
best_practices = RocketMQAdvancedBestPractices()
best_practices.print_best_practices()
print("\n" + "=" * 80 + "\n")
# 打印监控指南
monitor = RocketMQPerformanceMonitor()
monitor.print_monitoring_guide()
print("\n" + "=" * 80 + "\n")
# 打印练习题
exercises = RocketMQAdvancedExercises()
exercises.print_all_exercises()
本章小结:
本章详细介绍了RocketMQ的五大高级特性:
- 顺序消息 - 保证消息的有序处理,适用于对顺序有严格要求的业务场景
- 事务消息 - 保证分布式事务的一致性,解决本地事务与消息发送的原子性问题
- 延时消息 - 实现消息的延时投递,支持定时任务和延时处理场景
- 批量消息 - 提高消息发送效率,减少网络开销,适用于高吞吐量场景
- 消息过滤 - 实现消息的精确投递,减少无效消费,提高系统效率
通过学习这些高级特性,你可以根据不同的业务需求选择合适的消息处理方式,构建更加高效、可靠的分布式消息系统。在实际应用中,要结合业务特点和性能要求,合理使用这些特性,并建立完善的监控和运维体系。