2.1 核心概念详解
2.1.1 消息模型
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
import json
import uuid
class MessageType(Enum):
    """Kinds of message the model supports (消息类型)."""

    NORMAL = "normal"            # plain message
    ORDERED = "ordered"          # ordered (FIFO) message
    TRANSACTION = "transaction"  # transactional message
    DELAY = "delay"              # delayed-delivery message
    BATCH = "batch"              # batch message


class MessageStatus(Enum):
    """Lifecycle state of a message (消息状态)."""

    PENDING = "pending"    # waiting to be sent
    SENT = "sent"          # sent
    CONSUMED = "consumed"  # consumed
    FAILED = "failed"      # sending failed
    RETRY = "retry"        # being retried
@dataclass
class MessageProperty:
    """One named message attribute with an optional human-readable note (消息属性)."""

    key: str                            # attribute name
    value: str                          # attribute value
    description: Optional[str] = None   # optional free-text explanation
@dataclass
class Message:
    """RocketMQ-style message model (RocketMQ消息模型).

    Only ``topic`` and ``body`` are required. When left as None,
    ``__post_init__`` fills ``msg_id`` with a random UUID4 string,
    ``born_timestamp`` with ``datetime.now()`` and ``properties`` with ``{}``.
    """

    # Basic attributes
    topic: str
    body: bytes
    tags: Optional[str] = None
    keys: Optional[str] = None
    # System attributes (msg_id and born_timestamp are auto-filled)
    msg_id: Optional[str] = None
    queue_id: Optional[int] = None
    queue_offset: Optional[int] = None
    born_timestamp: Optional[datetime] = None
    store_timestamp: Optional[datetime] = None
    # Extension attributes
    properties: Optional[Dict[str, str]] = None
    delay_time_level: int = 0  # 0 = no delay; 1-18 are keys of _DELAY_LEVELS
    retry_times: int = 0  # retry counter
    # Transaction
    transaction_id: Optional[str] = None

    # Delay level -> delay duration. This is a plain (unannotated) class
    # attribute, so dataclass does not turn it into a field. It is built once
    # instead of on every get_delay_time() call.
    _DELAY_LEVELS = {
        1: timedelta(seconds=1),
        2: timedelta(seconds=5),
        3: timedelta(seconds=10),
        4: timedelta(seconds=30),
        5: timedelta(minutes=1),
        6: timedelta(minutes=2),
        7: timedelta(minutes=3),
        8: timedelta(minutes=4),
        9: timedelta(minutes=5),
        10: timedelta(minutes=6),
        11: timedelta(minutes=7),
        12: timedelta(minutes=8),
        13: timedelta(minutes=9),
        14: timedelta(minutes=10),
        15: timedelta(minutes=20),
        16: timedelta(minutes=30),
        17: timedelta(hours=1),
        18: timedelta(hours=2),
    }

    def __post_init__(self):
        if self.msg_id is None:
            self.msg_id = str(uuid.uuid4())
        if self.born_timestamp is None:
            self.born_timestamp = datetime.now()
        if self.properties is None:
            self.properties = {}

    def add_property(self, key: str, value: str) -> None:
        """Set (or overwrite) a user property."""
        self.properties[key] = value

    def get_property(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """Return a user property, or ``default`` when it is absent."""
        return self.properties.get(key, default)

    def set_delay_time_level(self, level: int) -> None:
        """Set the delay level.

        Level -> delay:
        1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

        Raises:
            ValueError: if ``level`` is outside 1-18.
        """
        if 1 <= level <= 18:
            self.delay_time_level = level
        else:
            raise ValueError("延时级别必须在1-18之间")

    def get_delay_time(self) -> Optional[timedelta]:
        """Return the delay for the current level, or None when there is no delay."""
        return self._DELAY_LEVELS.get(self.delay_time_level)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a JSON-friendly dict.

        A bytes body is decoded as UTF-8, so a non-UTF-8 body raises
        UnicodeDecodeError. ``properties`` is copied, so mutating the result
        does not affect this message.
        """
        return {
            "msgId": self.msg_id,
            "topic": self.topic,
            "tags": self.tags,
            "keys": self.keys,
            "body": self.body.decode('utf-8') if isinstance(self.body, bytes) else self.body,
            "queueId": self.queue_id,
            "queueOffset": self.queue_offset,
            "bornTimestamp": self.born_timestamp.isoformat() if self.born_timestamp else None,
            "storeTimestamp": self.store_timestamp.isoformat() if self.store_timestamp else None,
            "properties": dict(self.properties or {}),
            "delayTimeLevel": self.delay_time_level,
            "retryTimes": self.retry_times,
            "transactionId": self.transaction_id
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Message':
        """Build a message from a dict in the ``to_dict`` format.

        ``topic`` is required (KeyError otherwise). A str body is encoded as
        UTF-8. Missing optional keys keep the constructor defaults. In
        particular, a missing ``msgId`` keeps the freshly generated id instead
        of becoming None.
        """
        body = data.get('body', '')
        if isinstance(body, str):
            body = body.encode('utf-8')
        msg = cls(
            topic=data['topic'],
            body=body,
            tags=data.get('tags'),
            keys=data.get('keys')
        )
        # Only override the generated id when the dict actually carries one.
        if data.get('msgId') is not None:
            msg.msg_id = data['msgId']
        msg.queue_id = data.get('queueId')
        msg.queue_offset = data.get('queueOffset')
        if data.get('bornTimestamp'):
            msg.born_timestamp = datetime.fromisoformat(data['bornTimestamp'])
        if data.get('storeTimestamp'):
            msg.store_timestamp = datetime.fromisoformat(data['storeTimestamp'])
        # Copy so that add_property() on the message never mutates the caller's
        # dict. A null value is treated like a missing key.
        msg.properties = dict(data.get('properties') or {})
        msg.delay_time_level = data.get('delayTimeLevel') or 0
        msg.retry_times = data.get('retryTimes') or 0
        msg.transaction_id = data.get('transactionId')
        return msg
class MessageBuilder:
    """Fluent builder for Message objects (消息构建器).

    Every setter returns ``self`` so calls can be chained; ``build()`` creates
    a new Message from the accumulated state.
    """

    # Valid delay levels; the same range Message.set_delay_time_level enforces.
    _MIN_DELAY_LEVEL = 1
    _MAX_DELAY_LEVEL = 18

    def __init__(self, topic: str):
        self.topic = topic
        self.body = b''
        self.tags = None
        self.keys = None
        self.properties = {}
        self.delay_time_level = 0  # 0 = no delay unless set_delay_time_level is called
        self.transaction_id = None

    def set_body(self, body: Any) -> 'MessageBuilder':
        """Set the message body.

        ``str`` is UTF-8 encoded. ``bytes``/``bytearray``/``memoryview`` are
        stored as ``bytes``. Anything else is serialized with ``json.dumps``
        and UTF-8 encoded (TypeError if it is not JSON-serializable).
        """
        if isinstance(body, str):
            self.body = body.encode('utf-8')
        elif isinstance(body, bytes):
            self.body = body
        elif isinstance(body, (bytearray, memoryview)):
            # Binary buffers would otherwise fall through to json.dumps and fail.
            self.body = bytes(body)
        else:
            self.body = json.dumps(body).encode('utf-8')
        return self

    def set_tags(self, tags: str) -> 'MessageBuilder':
        """Set the message tags."""
        self.tags = tags
        return self

    def set_keys(self, keys: str) -> 'MessageBuilder':
        """Set the message keys."""
        self.keys = keys
        return self

    def add_property(self, key: str, value: str) -> 'MessageBuilder':
        """Add (or overwrite) one user property."""
        self.properties[key] = value
        return self

    def set_delay_time_level(self, level: int) -> 'MessageBuilder':
        """Set the delay level.

        Raises:
            ValueError: if ``level`` is outside 1-18. This is the same rule as
                Message.set_delay_time_level; previously the builder accepted
                any value and could produce messages with an undefined delay.
        """
        if not self._MIN_DELAY_LEVEL <= level <= self._MAX_DELAY_LEVEL:
            raise ValueError("延时级别必须在1-18之间")
        self.delay_time_level = level
        return self

    def set_transaction_id(self, transaction_id: str) -> 'MessageBuilder':
        """Set the transaction id."""
        self.transaction_id = transaction_id
        return self

    def build(self) -> 'Message':
        """Create a Message from the current state.

        The properties dict is copied, so later builder changes do not leak
        into already-built messages.
        """
        return Message(
            topic=self.topic,
            body=self.body,
            tags=self.tags,
            keys=self.keys,
            properties=self.properties.copy(),
            delay_time_level=self.delay_time_level,
            transaction_id=self.transaction_id
        )
# Message factory (消息工厂)
class MessageFactory:
    """Shortcut constructors for each common message kind (消息工厂)."""

    @staticmethod
    def _prepared_builder(topic, body, tags, keys):
        """Return a builder with body, tags and keys already applied, in that order."""
        builder = MessageBuilder(topic)
        builder.set_body(body)
        builder.set_tags(tags)
        builder.set_keys(keys)
        return builder

    @staticmethod
    def create_normal_message(topic: str, body: Any, tags: str = None, keys: str = None) -> Message:
        """Create a plain message."""
        builder = MessageFactory._prepared_builder(topic, body, tags, keys)
        return builder.build()

    @staticmethod
    def create_delay_message(topic: str, body: Any, delay_level: int,
                             tags: str = None, keys: str = None) -> Message:
        """Create a message delivered after the delay mapped to ``delay_level``."""
        builder = MessageFactory._prepared_builder(topic, body, tags, keys)
        builder.set_delay_time_level(delay_level)
        return builder.build()

    @staticmethod
    def create_transaction_message(topic: str, body: Any, transaction_id: str,
                                   tags: str = None, keys: str = None) -> Message:
        """Create a message carrying ``transaction_id``."""
        builder = MessageFactory._prepared_builder(topic, body, tags, keys)
        builder.set_transaction_id(transaction_id)
        return builder.build()

    @staticmethod
    def create_ordered_message(topic: str, body: Any, order_key: str,
                               tags: str = None, keys: str = None) -> Message:
        """Create a message whose ORDER_KEY property holds ``order_key``."""
        builder = MessageFactory._prepared_builder(topic, body, tags, keys)
        builder.add_property("ORDER_KEY", order_key)
        return builder.build()
# Usage example (使用示例)
if __name__ == "__main__":
    # Build one message of each kind first, then report on them.
    normal_msg = MessageFactory.create_normal_message(
        topic="TestTopic",
        body={"orderId": "12345", "amount": 100.0},
        tags="order",
        keys="order_12345",
    )
    delay_msg = MessageFactory.create_delay_message(
        topic="DelayTopic",
        body="这是一个延时5秒的消息",
        delay_level=2,  # level 2 = 5 seconds
        tags="delay",
    )
    tx_msg = MessageFactory.create_transaction_message(
        topic="TransactionTopic",
        body="事务消息内容",
        transaction_id="tx_001",
        tags="transaction",
    )
    ordered_msg = MessageFactory.create_ordered_message(
        topic="OrderedTopic",
        body="顺序消息内容",
        order_key="user_001",
        tags="ordered",
    )

    print(f"普通消息: {normal_msg.msg_id}")
    print(f"延时消息: {delay_msg.msg_id}, 延时时间: {delay_msg.get_delay_time()}")
    print(f"事务消息: {tx_msg.msg_id}, 事务ID: {tx_msg.transaction_id}")
    print(f"顺序消息: {ordered_msg.msg_id}, 顺序键: {ordered_msg.get_property('ORDER_KEY')}")

    # Round-trip the normal message through its dict form.
    msg_dict = normal_msg.to_dict()
    print(f"\n消息字典: {json.dumps(msg_dict, indent=2, ensure_ascii=False)}")
    restored_msg = Message.from_dict(msg_dict)
    print(f"\n恢复的消息ID: {restored_msg.msg_id}")
2.1.2 Topic与Queue
from dataclasses import dataclass, field
from typing import List, Dict, Set, Optional
from enum import Enum
import threading
from datetime import datetime
class TopicType(Enum):
    """Category of a Topic (Topic类型)."""

    NORMAL = "normal"  # ordinary topic
    FIFO = "fifo"      # ordered topic
    DELAY = "delay"    # delayed-delivery topic
    RETRY = "retry"    # retry topic
    DLQ = "dlq"        # dead-letter topic


class QueueType(Enum):
    """Access mode of a queue (队列类型)."""

    READ_WRITE = "rw"  # readable and writable
    READ_ONLY = "r"    # readable only
    WRITE_ONLY = "w"   # writable only
    NONE = "none"      # neither readable nor writable


@dataclass
class QueueInfo:
    """State of one queue on one broker (队列信息)."""

    queue_id: int
    broker_name: str
    queue_type: QueueType = QueueType.READ_WRITE
    min_offset: int = 0
    max_offset: int = 0
    last_update_time: datetime = field(default_factory=datetime.now)

    def is_readable(self) -> bool:
        """True for READ_WRITE and READ_ONLY queues."""
        return self.queue_type in (QueueType.READ_WRITE, QueueType.READ_ONLY)

    def is_writable(self) -> bool:
        """True for READ_WRITE and WRITE_ONLY queues."""
        return self.queue_type in (QueueType.READ_WRITE, QueueType.WRITE_ONLY)

    def get_queue_size(self) -> int:
        """Distance between max and min offset, clamped at zero."""
        span = self.max_offset - self.min_offset
        return span if span > 0 else 0


@dataclass
class TopicConfig:
    """Topic settings (Topic配置)."""

    topic_name: str
    topic_type: TopicType = TopicType.NORMAL
    read_queue_nums: int = 4
    write_queue_nums: int = 4
    permission: int = 6  # permission bits: 4 = read, 2 = write, 6 = read + write
    topic_filter_type: str = "SINGLE_TAG"
    topic_sys_flag: int = 0
    order: bool = False

    def __post_init__(self):
        # A FIFO topic is always ordered, whatever ``order`` was passed.
        if self.topic_type is TopicType.FIFO:
            self.order = True


@dataclass
class TopicRouteInfo:
    """Queues and brokers that serve one Topic (Topic路由信息)."""

    topic_name: str
    queue_datas: List[QueueInfo] = field(default_factory=list)
    broker_datas: List[Dict] = field(default_factory=list)
    filter_server_table: Dict[str, List[str]] = field(default_factory=dict)

    def get_read_queues(self) -> List[QueueInfo]:
        """Queues that currently accept reads, in stored order."""
        return [queue for queue in self.queue_datas if queue.is_readable()]

    def get_write_queues(self) -> List[QueueInfo]:
        """Queues that currently accept writes, in stored order."""
        return [queue for queue in self.queue_datas if queue.is_writable()]

    def get_queue_by_id(self, queue_id: int, broker_name: str) -> Optional[QueueInfo]:
        """First queue matching both id and broker name, or None."""
        matches = (
            queue for queue in self.queue_datas
            if queue.queue_id == queue_id and queue.broker_name == broker_name
        )
        return next(matches, None)
class TopicManager:
    """In-memory Topic registry with simulated routing (Topic管理器).

    Keeps each TopicConfig together with a TopicRouteInfo describing its
    queues. Mutations, and reads that span both maps, hold ``self.lock``.

    Args:
        brokers: broker names every new topic gets queues on. Defaults to the
            demo pair ``("broker-a", "broker-b")``.
    """

    DEFAULT_BROKERS = ("broker-a", "broker-b")
    # Bits of TopicConfig.permission (4 = read, 2 = write, 6 = both).
    PERM_READ = 4
    PERM_WRITE = 2

    def __init__(self, brokers: Optional[List[str]] = None):
        self.topics: Dict[str, TopicConfig] = {}
        self.topic_routes: Dict[str, TopicRouteInfo] = {}
        self.brokers: List[str] = list(self.DEFAULT_BROKERS if brokers is None else brokers)
        self.lock = threading.RLock()

    @staticmethod
    def _queue_type(readable: bool, writable: bool) -> 'QueueType':
        """Map read/write capability flags to the matching QueueType."""
        if readable and writable:
            return QueueType.READ_WRITE
        if readable:
            return QueueType.READ_ONLY
        if writable:
            return QueueType.WRITE_ONLY
        return QueueType.NONE

    def _build_route(self, topic_config: 'TopicConfig') -> 'TopicRouteInfo':
        """Create route info with max(read, write) queues on every broker.

        Queue ``i`` is readable when ``i < read_queue_nums`` and the permission
        has the read bit. It is writable when ``i < write_queue_nums`` and the
        permission has the write bit. Previously only ``read_queue_nums``
        READ_WRITE queues were created, ignoring ``write_queue_nums`` and
        ``permission``. With default settings the result is unchanged.
        """
        route_info = TopicRouteInfo(topic_name=topic_config.topic_name)
        perm_read = bool(topic_config.permission & self.PERM_READ)
        perm_write = bool(topic_config.permission & self.PERM_WRITE)
        queue_count = max(topic_config.read_queue_nums, topic_config.write_queue_nums)
        for broker_name in self.brokers:
            for queue_id in range(queue_count):
                readable = perm_read and queue_id < topic_config.read_queue_nums
                writable = perm_write and queue_id < topic_config.write_queue_nums
                route_info.queue_datas.append(QueueInfo(
                    queue_id=queue_id,
                    broker_name=broker_name,
                    queue_type=self._queue_type(readable, writable),
                ))
        return route_info

    def create_topic(self, topic_config: TopicConfig) -> bool:
        """Register a topic and build its route; False if the name already exists."""
        with self.lock:
            if topic_config.topic_name in self.topics:
                return False
            self.topics[topic_config.topic_name] = topic_config
            self.topic_routes[topic_config.topic_name] = self._build_route(topic_config)
            return True

    def delete_topic(self, topic_name: str) -> bool:
        """Remove a topic and its route; False if it does not exist."""
        with self.lock:
            if topic_name not in self.topics:
                return False
            del self.topics[topic_name]
            self.topic_routes.pop(topic_name, None)
            return True

    def update_topic_config(self, topic_name: str, config: TopicConfig) -> bool:
        """Replace the stored config of an existing topic; False if unknown.

        NOTE(review): the route is not rebuilt, so queue counts in the route can
        diverge from the new config.
        """
        with self.lock:
            if topic_name not in self.topics:
                return False
            self.topics[topic_name] = config
            return True

    def get_topic_config(self, topic_name: str) -> Optional[TopicConfig]:
        """Return the config of ``topic_name`` or None."""
        return self.topics.get(topic_name)

    def get_topic_route_info(self, topic_name: str) -> Optional[TopicRouteInfo]:
        """Return the route info of ``topic_name`` or None."""
        return self.topic_routes.get(topic_name)

    def list_topics(self) -> List[str]:
        """Return all registered topic names."""
        with self.lock:
            return list(self.topics.keys())

    def get_topic_stats(self, topic_name: str) -> Dict[str, object]:
        """Summarize config and queue counts of a topic; ``{}`` if unknown.

        Reads both maps under the lock so config and route are consistent.
        ``brokers`` is sorted so the output is deterministic.
        """
        with self.lock:
            if topic_name not in self.topics:
                return {}
            config = self.topics[topic_name]
            route_info = self.topic_routes.get(topic_name)
            stats = {
                "topicName": topic_name,
                "topicType": config.topic_type.value,
                "readQueueNums": config.read_queue_nums,
                "writeQueueNums": config.write_queue_nums,
                "permission": config.permission,
                "order": config.order
            }
            if route_info is not None:
                queues = route_info.queue_datas
                stats.update({
                    "totalQueues": len(queues),
                    "readableQueues": len(route_info.get_read_queues()),
                    "writableQueues": len(route_info.get_write_queues()),
                    "totalMessages": sum(q.get_queue_size() for q in queues),
                    "brokers": sorted({q.broker_name for q in queues})
                })
            return stats
class QueueSelector:
    """Stateless queue-selection strategies (队列选择器).

    Every strategy raises ValueError("队列列表为空") on an empty queue list.
    """

    @staticmethod
    def select_queue_by_hash(queues: List['QueueInfo'], hash_key: str) -> 'QueueInfo':
        """Pick a queue from a stable hash of ``hash_key``.

        Uses CRC32 of the key's UTF-8 text. The built-in ``hash()`` of a str is
        salted per interpreter process (PYTHONHASHSEED), so the same key would
        land on different queues after a restart or in another producer. That
        breaks the "same key -> same queue" guarantee ordered messages rely on.
        Non-str keys are converted with ``str()``.
        """
        import zlib
        if not queues:
            raise ValueError("队列列表为空")
        index = zlib.crc32(str(hash_key).encode('utf-8')) % len(queues)
        return queues[index]

    @staticmethod
    def select_queue_by_round_robin(queues: List['QueueInfo'], counter: int) -> 'QueueInfo':
        """Pick ``queues[counter % len(queues)]``; the caller owns the counter."""
        if not queues:
            raise ValueError("队列列表为空")
        index = counter % len(queues)
        return queues[index]

    @staticmethod
    def select_queue_by_random(queues: List['QueueInfo']) -> 'QueueInfo':
        """Pick a uniformly random queue."""
        import random
        if not queues:
            raise ValueError("队列列表为空")
        return random.choice(queues)

    @staticmethod
    def select_queue_by_load(queues: List['QueueInfo']) -> 'QueueInfo':
        """Pick the queue with the fewest messages (first one on ties)."""
        if not queues:
            raise ValueError("队列列表为空")
        return min(queues, key=lambda q: q.get_queue_size())
# Usage example (使用示例)
if __name__ == "__main__":
    topic_manager = TopicManager()

    # Two topics: a 4-queue normal topic and a single-queue FIFO topic.
    normal_topic = TopicConfig(
        topic_name="OrderTopic",
        topic_type=TopicType.NORMAL,
        read_queue_nums=4,
        write_queue_nums=4,
    )
    fifo_topic = TopicConfig(
        topic_name="OrderedTopic",
        topic_type=TopicType.FIFO,
        read_queue_nums=1,
        write_queue_nums=1,
    )

    success = topic_manager.create_topic(normal_topic)
    print(f"创建普通Topic: {success}")
    success = topic_manager.create_topic(fifo_topic)
    print(f"创建顺序Topic: {success}")

    # Dump the statistics of the normal topic.
    stats = topic_manager.get_topic_stats("OrderTopic")
    print(f"\nTopic统计信息:")
    for key, value in stats.items():
        print(f" {key}: {value}")

    # Resolve the route, then try each selection strategy on the writable queues.
    route_info = topic_manager.get_topic_route_info("OrderTopic")
    write_queues = route_info.get_write_queues() if route_info else []
    if route_info:
        print(f"\n可写队列数量: {len(write_queues)}")
    if write_queues:
        hash_queue = QueueSelector.select_queue_by_hash(write_queues, "order_12345")
        print(f"哈希选择队列: {hash_queue.broker_name}-{hash_queue.queue_id}")
        rr_queue = QueueSelector.select_queue_by_round_robin(write_queues, 0)
        print(f"轮询选择队列: {rr_queue.broker_name}-{rr_queue.queue_id}")
        load_queue = QueueSelector.select_queue_by_load(write_queues)
        print(f"负载选择队列: {load_queue.broker_name}-{load_queue.queue_id}")

    topics = topic_manager.list_topics()
    print(f"\n所有Topic: {topics}")
2.1.3 Producer与Consumer
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from enum import Enum
import threading
import time
from datetime import datetime
class ProducerType(Enum):
    """Flavor of producer (生产者类型)."""

    DEFAULT = "default"          # default producer
    TRANSACTION = "transaction"  # transactional producer
    BATCH = "batch"              # batch producer


class ConsumerType(Enum):
    """Delivery style of a consumer (消费者类型)."""

    PUSH = "push"  # push mode
    PULL = "pull"  # pull mode


class ConsumeMode(Enum):
    """How a consumer group shares messages (消费模式)."""

    CLUSTERING = "clustering"      # clustering consumption
    BROADCASTING = "broadcasting"  # broadcasting consumption


class ConsumeResult(Enum):
    """Outcome reported by a message listener (消费结果)."""

    SUCCESS = "success"                  # consumed
    RECONSUME_LATER = "reconsume_later"  # consume again later
    SUSPEND = "suspend"                  # pause consumption


class SendResult(Enum):
    """Status of a send operation (发送结果)."""

    SEND_OK = "send_ok"                          # sent
    FLUSH_DISK_TIMEOUT = "flush_disk_timeout"    # disk flush timed out
    FLUSH_SLAVE_TIMEOUT = "flush_slave_timeout"  # sync to slave timed out
    SLAVE_NOT_AVAILABLE = "slave_not_available"  # slave unavailable


@dataclass
class SendResultInfo:
    """Details returned for one sent message (发送结果信息)."""

    send_status: SendResult
    msg_id: str
    message_queue: 'MessageQueue'  # forward reference: defined just below
    queue_offset: int
    transaction_id: Optional[str] = None
    region_id: Optional[str] = None
    trace_on: bool = True


@dataclass
class MessageQueue:
    """Address of one queue: topic + broker + queue id (消息队列).

    Equality and hashing use exactly those three fields, so instances can be
    used as dict keys or set members.
    """

    topic: str
    broker_name: str
    queue_id: int

    def _identity(self):
        """The (topic, broker_name, queue_id) triple used for eq/hash."""
        return (self.topic, self.broker_name, self.queue_id)

    def __str__(self) -> str:
        return f"MessageQueue[topic={self.topic}, brokerName={self.broker_name}, queueId={self.queue_id}]"

    def __eq__(self, other) -> bool:
        return isinstance(other, MessageQueue) and self._identity() == other._identity()

    def __hash__(self) -> int:
        return hash(self._identity())
@dataclass
class ConsumeContext:
    """Per-delivery information handed to a MessageListener (消费上下文)."""

    message: Message                # message being consumed
    message_queue: MessageQueue     # queue the message is attributed to
    consumer_group: str             # group of the consuming instance
    retry_times: int = 0            # retry count (defaults to 0)
    consume_start_time: datetime = field(default_factory=datetime.now)  # set when the context is built
class MessageQueueSelector(ABC):
    """Strategy interface for choosing a MessageQueue (消息队列选择器接口)."""

    @abstractmethod
    def select(self, queues: List[MessageQueue], message: Message, arg: Any) -> MessageQueue:
        """Return one element of ``queues`` for ``message``; ``arg`` is a caller-supplied hint."""
        ...
class HashQueueSelector(MessageQueueSelector):
    """Pick a queue from a stable hash of a sharding key (哈希队列选择器).

    The key is ``str(arg)`` when ``arg`` is given, otherwise the message keys,
    otherwise the message id.
    """

    def select(self, queues: List[MessageQueue], message: Message, arg: Any) -> MessageQueue:
        """Return the queue chosen by CRC32 of the key.

        CRC32 replaces the built-in ``hash()``, whose str hashes are salted per
        process (PYTHONHASHSEED). With ``hash()``, one key could map to
        different queues across producer instances or restarts, which breaks
        per-key ordering.

        Raises:
            ValueError: if ``queues`` is empty.
        """
        import zlib
        if not queues:
            raise ValueError("队列列表为空")
        hash_key = str(arg) if arg is not None else message.keys or message.msg_id
        index = zlib.crc32(str(hash_key).encode("utf-8")) % len(queues)
        return queues[index]
class RoundRobinQueueSelector(MessageQueueSelector):
    """Cycle through the queues, one step per call (轮询队列选择器).

    The shared counter is advanced under a lock so concurrent senders each get
    a distinct ticket.
    """

    def __init__(self):
        self.counter = 0
        self.lock = threading.Lock()

    def select(self, queues: List[MessageQueue], message: Message, arg: Any) -> MessageQueue:
        if not queues:
            raise ValueError("队列列表为空")
        with self.lock:
            ticket = self.counter
            self.counter = ticket + 1
        return queues[ticket % len(queues)]
class MessageListener(ABC):
    """Callback interface for consuming a batch of messages (消息监听器接口)."""

    @abstractmethod
    def consume_message(self, messages: List[Message], context: ConsumeContext) -> ConsumeResult:
        """Process ``messages`` and report the outcome as a ConsumeResult."""
        ...


class TransactionListener(ABC):
    """Callbacks that execute and check a local transaction for a message (事务监听器接口)."""

    @abstractmethod
    def execute_local_transaction(self, message: Message, arg: Any) -> 'LocalTransactionState':
        """Run the local transaction for ``message`` and return its state."""
        ...

    @abstractmethod
    def check_local_transaction(self, message: Message) -> 'LocalTransactionState':
        """Report the state of the local transaction for ``message``."""
        ...


class LocalTransactionState(Enum):
    """Result of a local transaction (本地事务状态)."""

    COMMIT_MESSAGE = "commit"      # commit
    ROLLBACK_MESSAGE = "rollback"  # roll back
    UNKNOWN = "unknown"            # not yet known
@dataclass
class ProducerConfig:
    """Producer settings (生产者配置)."""

    producer_group: str
    namesrv_addr: str
    instance_name: str = "DEFAULT"
    max_message_size: int = 4 * 1024 * 1024      # 4 MiB
    send_msg_timeout: int = 3000                 # milliseconds (3 s)
    compress_msg_body_over_howmuch: int = 4096   # body-size threshold for compression
    retry_times_when_send_failed: int = 2
    retry_times_when_send_async_failed: int = 2
    retry_another_broker_when_not_store_ok: bool = False
    create_topic_key: str = "TBW102"
    default_topic_queue_nums: int = 4
@dataclass
class ConsumerConfig:
    """Consumer settings (消费者配置)."""

    consumer_group: str
    namesrv_addr: str
    instance_name: str = "DEFAULT"
    consume_mode: ConsumeMode = ConsumeMode.CLUSTERING
    consume_from_where: str = "CONSUME_FROM_LAST_OFFSET"
    consume_thread_min: int = 20   # consume threads started by the push consumer
    consume_thread_max: int = 64
    consume_message_batch_max_size: int = 1
    pull_threshold_for_queue: int = 1000
    pull_interval: int = 0
    consume_timeout: int = 15      # minutes
    max_reconsume_times: int = 16
class DefaultMQProducer:
    """Simulated default message producer (默认消息生产者).

    Publish routes live in ``topic_publish_info`` (topic -> MessageQueue list).
    The network layer is simulated: ``send`` sleeps briefly and always reports
    SEND_OK.
    """

    # Topic loaded by _init_topic_route(). Its route is also the template for
    # topics that have no route of their own (see _find_topic_queues).
    DEFAULT_TOPIC = "DefaultTopic"

    def __init__(self, config: ProducerConfig):
        self.config = config
        self.started = False
        self.queue_selector = RoundRobinQueueSelector()
        self.topic_publish_info: Dict[str, List[MessageQueue]] = {}
        self.lock = threading.RLock()

    def start(self) -> None:
        """Start the producer and load routes; a second call is a no-op."""
        with self.lock:
            if self.started:
                return
            # Initialize connections and route info (simulated)
            self._init_topic_route()
            self.started = True
            print(f"Producer {self.config.producer_group} started")

    def shutdown(self) -> None:
        """Stop the producer; a no-op when it is not started."""
        with self.lock:
            if not self.started:
                return
            self.started = False
            print(f"Producer {self.config.producer_group} shutdown")

    def _init_topic_route(self) -> None:
        """Load the simulated route of DEFAULT_TOPIC (stands in for a NameServer query)."""
        self.topic_publish_info[self.DEFAULT_TOPIC] = [
            MessageQueue(self.DEFAULT_TOPIC, "broker-a", 0),
            MessageQueue(self.DEFAULT_TOPIC, "broker-a", 1),
            MessageQueue(self.DEFAULT_TOPIC, "broker-b", 0),
            MessageQueue(self.DEFAULT_TOPIC, "broker-b", 1)
        ]

    def _find_topic_queues(self, topic: str) -> List[MessageQueue]:
        """Return the publish queues of ``topic``, deriving them if needed.

        Like RocketMQ's topic auto-creation, a topic without a route gets
        queues on the default topic's brokers, limited to queue ids below
        ``config.default_topic_queue_nums``. The derived route is cached.
        Returns ``[]`` when neither route exists. Without this, every topic
        except DEFAULT_TOPIC failed, including the "OrderTopic" used by this
        module's own example.
        """
        with self.lock:
            queues = self.topic_publish_info.get(topic)
            if queues:
                return queues
            template = self.topic_publish_info.get(self.DEFAULT_TOPIC, [])
            derived = [
                MessageQueue(topic, q.broker_name, q.queue_id)
                for q in template
                if q.queue_id < self.config.default_topic_queue_nums
            ]
            if derived:
                self.topic_publish_info[topic] = derived
            return derived

    def send(self, message: Message, selector: MessageQueueSelector = None,
             selector_arg: Any = None, timeout: int = None) -> SendResultInfo:
        """Send ``message`` synchronously (simulated).

        Args:
            message: message to send.
            selector: optional queue selector; defaults to round robin.
            selector_arg: hint passed through to the selector.
            timeout: accepted for API compatibility. NOTE: the simulated send
                does not enforce it (nor ``config.send_msg_timeout``).

        Raises:
            RuntimeError: if the producer is not started or no route exists.
        """
        if not self.started:
            raise RuntimeError("Producer未启动")
        queues = self._find_topic_queues(message.topic)
        if not queues:
            raise RuntimeError(f"Topic {message.topic} 路由信息不存在")
        chooser = selector if selector else self.queue_selector
        selected_queue = chooser.select(queues, message, selector_arg)
        try:
            # Stand-in for the real network call.
            time.sleep(0.01)
            result = SendResultInfo(
                send_status=SendResult.SEND_OK,
                msg_id=message.msg_id,
                message_queue=selected_queue,
                queue_offset=int(time.time() * 1000)  # simulated queue offset
            )
            print(f"消息发送成功: {message.msg_id} -> {selected_queue}")
            return result
        except Exception as e:
            print(f"消息发送失败: {e}")
            raise

    def send_async(self, message: Message,
                   callback: Callable[[Optional[SendResultInfo], Optional[Exception]], None],
                   selector: MessageQueueSelector = None, selector_arg: Any = None) -> None:
        """Send on a daemon thread and report via ``callback(result, error)``.

        Exactly one of ``result``/``error`` is non-None. The callback runs
        outside the ``try``. Previously, an exception raised by the callback on
        success was caught and the callback was invoked a second time with that
        exception.
        """
        def async_send():
            try:
                result = self.send(message, selector, selector_arg)
            except Exception as e:
                callback(None, e)
            else:
                callback(result, None)

        thread = threading.Thread(target=async_send)
        thread.daemon = True
        thread.start()

    def send_oneway(self, message: Message, selector: MessageQueueSelector = None,
                    selector_arg: Any = None) -> None:
        """Fire-and-forget send on a daemon thread; failures are deliberately ignored."""
        def oneway_send():
            try:
                self.send(message, selector, selector_arg)
            except Exception:
                pass  # one-way semantics: the caller does not want the result

        thread = threading.Thread(target=oneway_send)
        thread.daemon = True
        thread.start()
class DefaultMQPushConsumer:
    """Simulated push-mode consumer (默认推模式消费者).

    ``start`` launches ``config.consume_thread_min`` daemon threads. Each one
    repeatedly calls ``_pull_messages`` (a stub that returns no messages) and
    hands every message to the registered MessageListener.
    """

    def __init__(self, config: ConsumerConfig):
        self.config = config
        self.started = False
        self.subscriptions: Dict[str, str] = {}  # topic -> tag expression
        self.message_listener: Optional[MessageListener] = None
        self.consume_threads: List[threading.Thread] = []
        self.running = False  # polled by the consume threads
        self.lock = threading.RLock()

    def subscribe(self, topic: str, sub_expression: str = "*") -> None:
        """Subscribe to ``topic`` with a tag expression ("*" = all tags)."""
        self.subscriptions[topic] = sub_expression
        print(f"订阅Topic: {topic}, 表达式: {sub_expression}")

    def register_message_listener(self, listener: MessageListener) -> None:
        """Register the listener that receives consumed messages."""
        self.message_listener = listener

    def start(self) -> None:
        """Start the consume threads; a second call is a no-op.

        Raises:
            RuntimeError: if no listener is registered or nothing is subscribed.
        """
        with self.lock:
            if self.started:
                return
            if not self.message_listener:
                raise RuntimeError("未注册消息监听器")
            if not self.subscriptions:
                raise RuntimeError("未订阅任何Topic")
            self.running = True
            self.started = True
            for i in range(self.config.consume_thread_min):
                thread = threading.Thread(target=self._consume_loop, name=f"ConsumeThread-{i}")
                thread.daemon = True
                thread.start()
                self.consume_threads.append(thread)
            print(f"Consumer {self.config.consumer_group} started")

    def shutdown(self) -> None:
        """Signal the consume threads to stop and wait up to 1 s for each."""
        with self.lock:
            if not self.started:
                return
            self.running = False
            self.started = False
            for thread in self.consume_threads:
                thread.join(timeout=1)
            self.consume_threads.clear()
            print(f"Consumer {self.config.consumer_group} shutdown")

    def _consume_loop(self) -> None:
        """Body of each consume thread: pull, dispatch, back off when idle or on error."""
        while self.running:
            try:
                messages = self._pull_messages()
                if messages:
                    for message in messages:
                        self._consume_message(message)
                else:
                    time.sleep(0.1)  # nothing pulled; brief pause
            except Exception as e:
                print(f"消费异常: {e}")
                time.sleep(1)

    def _pull_messages(self) -> List[Message]:
        """Pull messages from the broker (simulated: always empty)."""
        return []

    def _consume_message(self, message: Message) -> None:
        """Pass one message to the listener and log the outcome.

        The context now carries the message's own ``retry_times`` (it was
        always 0) and its ``queue_id`` when known. Message does not record a
        broker, so "broker-a" remains a placeholder.
        NOTE(review): RECONSUME_LATER is only logged; no retry is scheduled.
        """
        try:
            queue_id = message.queue_id if message.queue_id is not None else 0
            context = ConsumeContext(
                message=message,
                message_queue=MessageQueue(message.topic, "broker-a", queue_id),
                consumer_group=self.config.consumer_group,
                retry_times=message.retry_times
            )
            result = self.message_listener.consume_message([message], context)
            if result == ConsumeResult.SUCCESS:
                print(f"消息消费成功: {message.msg_id}")
            elif result == ConsumeResult.RECONSUME_LATER:
                print(f"消息稍后重新消费: {message.msg_id}")
            else:
                print(f"消息消费暂停: {message.msg_id}")
        except Exception as e:
            print(f"消费消息异常: {e}")
# Usage example (使用示例)
class OrderMessageListener(MessageListener):
    """Example listener that decodes and prints order messages (订单消息监听器).

    Processes every message in the batch. It returns RECONSUME_LATER as soon as
    one message fails, otherwise SUCCESS once the whole batch is done.
    """

    def consume_message(self, messages: List[Message], context: ConsumeContext) -> ConsumeResult:
        for message in messages:
            try:
                body = message.body.decode('utf-8')
                print(f"处理订单消息: {body}")
                time.sleep(0.1)  # simulate business processing
            except Exception as e:
                print(f"处理订单消息失败: {e}")
                return ConsumeResult.RECONSUME_LATER
        # Acknowledge only after the full batch. The previous version returned
        # SUCCESS inside the loop and silently skipped all but the first message.
        return ConsumeResult.SUCCESS
if __name__ == "__main__":
    # Configs and clients are created up front; none of this has side effects.
    producer_config = ProducerConfig(
        producer_group="OrderProducerGroup",
        namesrv_addr="localhost:9876",
    )
    consumer_config = ConsumerConfig(
        consumer_group="OrderConsumerGroup",
        namesrv_addr="localhost:9876",
    )
    producer = DefaultMQProducer(producer_config)
    consumer = DefaultMQPushConsumer(consumer_config)

    producer.start()
    consumer.subscribe("OrderTopic", "order")
    consumer.register_message_listener(OrderMessageListener())
    consumer.start()

    try:
        # Five synchronous sends, one second apart, then give consumers time.
        for i in range(5):
            message = MessageFactory.create_normal_message(
                topic="OrderTopic",
                body=f"订单消息 {i}",
                tags="order",
                keys=f"order_{i}",
            )
            result = producer.send(message)
            print(f"发送结果: {result.send_status}")
            time.sleep(1)
        time.sleep(5)
    finally:
        producer.shutdown()
        consumer.shutdown()
2.2 架构深入分析
2.2.1 NameServer架构
from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional, Tuple
from datetime import datetime, timedelta
import threading
import json
from enum import Enum
class RouteInfoType(Enum):
    """Kinds of routing information kept by the NameServer (路由信息类型)."""

    TOPIC_ROUTE = "topic_route"      # topic -> queue routing
    BROKER_LIVE = "broker_live"      # broker liveness
    CLUSTER_INFO = "cluster_info"    # cluster membership
    FILTER_SERVER = "filter_server"  # filter servers
@dataclass
class BrokerLiveInfo:
    """Last heartbeat of one broker address (Broker存活信息)."""

    last_update_timestamp: datetime  # time of the latest registration/heartbeat
    data_version: int
    channel: str                     # connection channel identifier
    ha_server_addr: str              # high-availability service address

    def is_alive(self, timeout_ms: int = 120000) -> bool:
        """True while less than ``timeout_ms`` milliseconds passed since the last update."""
        elapsed = datetime.now() - self.last_update_timestamp
        return elapsed < timedelta(milliseconds=timeout_ms)
@dataclass
class BrokerData:
    """Addresses of one named broker, keyed by broker id (Broker数据).

    Broker id 0 is the master; every other id is a slave.
    """

    cluster: str
    broker_name: str
    broker_addrs: Dict[int, str]  # brokerId -> address

    def get_master_addr(self) -> Optional[str]:
        """Address registered under broker id 0, or None."""
        return self.broker_addrs.get(0)

    def get_slave_addrs(self) -> Dict[int, str]:
        """All addresses except the master's, as a new dict."""
        slaves: Dict[int, str] = {}
        for broker_id, addr in self.broker_addrs.items():
            if broker_id == 0:
                continue
            slaves[broker_id] = addr
        return slaves


@dataclass
class QueueData:
    """Queue layout of one topic on one broker (队列数据)."""

    broker_name: str
    read_queue_nums: int
    write_queue_nums: int
    perm: int            # permission bits
    topic_syn_flag: int


@dataclass
class TopicRouteData:
    """Routing answer for one topic (Topic路由数据)."""

    order_topic_conf: Optional[str] = None
    queue_datas: List[QueueData] = field(default_factory=list)
    broker_datas: List[BrokerData] = field(default_factory=list)
    filter_server_table: Dict[str, List[str]] = field(default_factory=dict)

    def clone(self) -> 'TopicRouteData':
        """Shallow copy: new containers, but the same element objects."""
        return TopicRouteData(
            order_topic_conf=self.order_topic_conf,
            queue_datas=list(self.queue_datas),
            broker_datas=list(self.broker_datas),
            filter_server_table=dict(self.filter_server_table),
        )
class RouteInfoManager:
"""路由信息管理器"""
def __init__(self):
# 核心路由表
self.topic_queue_table: Dict[str, List[QueueData]] = {} # topic -> queues
self.broker_addr_table: Dict[str, BrokerData] = {} # brokerName -> BrokerData
self.cluster_addr_table: Dict[str, Set[str]] = {} # cluster -> brokerNames
self.broker_live_table: Dict[str, BrokerLiveInfo] = {} # brokerAddr -> BrokerLiveInfo
self.filter_server_table: Dict[str, List[str]] = {} # topic -> filterServers
# 读写锁
self.lock = threading.RWLock() if hasattr(threading, 'RWLock') else threading.RLock()
# 清理任务
self.cleanup_thread = None
self.running = False
def start(self) -> None:
"""启动路由信息管理器"""
self.running = True
self.cleanup_thread = threading.Thread(target=self._cleanup_offline_brokers, daemon=True)
self.cleanup_thread.start()
print("RouteInfoManager started")
def shutdown(self) -> None:
"""关闭路由信息管理器"""
self.running = False
if self.cleanup_thread:
self.cleanup_thread.join(timeout=1)
print("RouteInfoManager shutdown")
def register_broker(self, cluster_name: str, broker_addr: str, broker_name: str,
broker_id: int, ha_server_addr: str, topic_config_wrapper: Dict,
filter_server_list: List[str], channel: str) -> bool:
"""注册Broker"""
with self.lock:
# 更新集群信息
if cluster_name not in self.cluster_addr_table:
self.cluster_addr_table[cluster_name] = set()
self.cluster_addr_table[cluster_name].add(broker_name)
# 更新Broker地址信息
broker_data = self.broker_addr_table.get(broker_name)
if broker_data is None:
broker_data = BrokerData(
cluster=cluster_name,
broker_name=broker_name,
broker_addrs={}
)
self.broker_addr_table[broker_name] = broker_data
# 更新Broker地址映射
old_addr = broker_data.broker_addrs.get(broker_id)
broker_data.broker_addrs[broker_id] = broker_addr
# 更新Broker存活信息
self.broker_live_table[broker_addr] = BrokerLiveInfo(
last_update_timestamp=datetime.now(),
data_version=1,
channel=channel,
ha_server_addr=ha_server_addr
)
# 如果是Master Broker,更新Topic配置
if broker_id == 0 and topic_config_wrapper:
self._update_topic_config(broker_name, topic_config_wrapper)
# 更新过滤服务器信息
if filter_server_list:
for topic in topic_config_wrapper.keys():
self.filter_server_table[topic] = filter_server_list
# 清理旧地址的存活信息
if old_addr and old_addr != broker_addr and old_addr in self.broker_live_table:
del self.broker_live_table[old_addr]
print(f"Broker注册成功: {broker_name}[{broker_id}] @ {broker_addr}")
return True
def _update_topic_config(self, broker_name: str, topic_config_wrapper: Dict) -> None:
"""更新Topic配置"""
for topic_name, topic_config in topic_config_wrapper.items():
queue_data = QueueData(
broker_name=broker_name,
read_queue_nums=topic_config.get('readQueueNums', 4),
write_queue_nums=topic_config.get('writeQueueNums', 4),
perm=topic_config.get('perm', 6),
topic_syn_flag=topic_config.get('topicSynFlag', 0)
)
if topic_name not in self.topic_queue_table:
self.topic_queue_table[topic_name] = []
# 更新或添加队列数据
existing_queue = None
for i, qd in enumerate(self.topic_queue_table[topic_name]):
if qd.broker_name == broker_name:
existing_queue = i
break
if existing_queue is not None:
self.topic_queue_table[topic_name][existing_queue] = queue_data
else:
self.topic_queue_table[topic_name].append(queue_data)
def unregister_broker(self, cluster_name: str, broker_addr: str, broker_name: str,
broker_id: int) -> None:
"""注销Broker"""
with self.lock:
# 移除存活信息
if broker_addr in self.broker_live_table:
del self.broker_live_table[broker_addr]
# 移除Broker地址
broker_data = self.broker_addr_table.get(broker_name)
if broker_data and broker_id in broker_data.broker_addrs:
del broker_data.broker_addrs[broker_id]
# 如果没有其他Broker实例,移除整个BrokerData
if not broker_data.broker_addrs:
del self.broker_addr_table[broker_name]
# 从集群中移除
if cluster_name in self.cluster_addr_table:
self.cluster_addr_table[cluster_name].discard(broker_name)
if not self.cluster_addr_table[cluster_name]:
del self.cluster_addr_table[cluster_name]
# 移除相关的Topic队列信息
self._remove_topic_by_broker(broker_name)
print(f"Broker注销成功: {broker_name}[{broker_id}] @ {broker_addr}")
def _remove_topic_by_broker(self, broker_name: str) -> None:
"""移除指定Broker的Topic信息"""
topics_to_remove = []
for topic, queue_list in self.topic_queue_table.items():
# 移除该Broker的队列
self.topic_queue_table[topic] = [q for q in queue_list if q.broker_name != broker_name]
# 如果Topic没有队列了,标记删除
if not self.topic_queue_table[topic]:
topics_to_remove.append(topic)
# 删除空Topic
for topic in topics_to_remove:
del self.topic_queue_table[topic]
if topic in self.filter_server_table:
del self.filter_server_table[topic]
def pickup_topic_route_data(self, topic: str) -> Optional[TopicRouteData]:
"""获取Topic路由数据"""
with self.lock:
if topic not in self.topic_queue_table:
return None
route_data = TopicRouteData()
# 获取队列数据
route_data.queue_datas = self.topic_queue_table[topic].copy()
# 获取相关的Broker数据
broker_names = set(qd.broker_name for qd in route_data.queue_datas)
for broker_name in broker_names:
if broker_name in self.broker_addr_table:
route_data.broker_datas.append(self.broker_addr_table[broker_name])
# 获取过滤服务器信息
if topic in self.filter_server_table:
route_data.filter_server_table[topic] = self.filter_server_table[topic].copy()
return route_data
def get_all_cluster_info(self) -> Dict[str, any]:
"""获取所有集群信息"""
with self.lock:
cluster_info = {
"brokerAddrTable": {},
"clusterAddrTable": {}
}
# 转换Broker地址表
for broker_name, broker_data in self.broker_addr_table.items():
cluster_info["brokerAddrTable"][broker_name] = {
"cluster": broker_data.cluster,
"brokerName": broker_data.broker_name,
"brokerAddrs": broker_data.broker_addrs
}
# 转换集群地址表
for cluster, broker_names in self.cluster_addr_table.items():
cluster_info["clusterAddrTable"][cluster] = list(broker_names)
return cluster_info
def get_broker_live_info(self, broker_addr: str) -> Optional[BrokerLiveInfo]:
"""获取Broker存活信息"""
with self.lock:
return self.broker_live_table.get(broker_addr)
def get_all_topic_list(self) -> List[str]:
"""获取所有Topic列表"""
with self.lock:
return list(self.topic_queue_table.keys())
def _cleanup_offline_brokers(self) -> None:
    """Background loop that unregisters brokers whose liveness check fails.

    While ``self.running`` is truthy, each pass does the following under ``self.lock``:
      * collects the addresses in ``broker_live_table`` whose ``is_alive()`` is False;
      * resolves each address to ``(cluster, broker_name, broker_id)`` through
        ``broker_addr_table`` and calls ``unregister_broker``.

    An expired address that no broker in ``broker_addr_table`` references any
    more is dropped from ``broker_live_table`` directly. Otherwise it would be
    re-detected, and silently skipped, on every pass forever.

    After a pass the loop sleeps 30 seconds outside the lock. Any exception is
    printed and the loop retries after 10 seconds.
    """
    while self.running:
        try:
            with self.lock:
                offline_brokers = [
                    broker_addr
                    for broker_addr, live_info in self.broker_live_table.items()
                    if not live_info.is_alive()
                ]
                for broker_addr in offline_brokers:
                    # First (cluster, name, id) whose address table contains broker_addr.
                    owner = next(
                        (
                            (data.cluster, name, bid)
                            for name, data in self.broker_addr_table.items()
                            for bid, addr in data.broker_addrs.items()
                            if addr == broker_addr
                        ),
                        None,
                    )
                    if owner is None:
                        # Stale liveness record with no routing entry: drop it so
                        # it is not re-scanned on every pass.
                        self.broker_live_table.pop(broker_addr, None)
                        continue
                    cluster_name, broker_name, broker_id = owner
                    self.unregister_broker(cluster_name, broker_addr, broker_name, broker_id)
                    print(f"清理离线Broker: {broker_name}[{broker_id}] @ {broker_addr}")
            # Check every 30 seconds; sleep outside the lock so other threads can proceed.
            threading.Event().wait(30)
        except Exception as e:
            print(f"清理离线Broker异常: {e}")
            threading.Event().wait(10)
class NameServerController:
    """NameServer controller.

    Wires together the route-info manager, the KV config manager and the broker
    housekeeping service. ``start`` and ``shutdown`` are idempotent and
    serialized by ``self.lock``.
    """

    def __init__(self, config: Dict[str, any]):
        self.config = config
        self.route_info_manager = RouteInfoManager()
        self.kvconfig_manager = KVConfigManager()
        self.broker_housekeeping_service = BrokerHousekeepingService(self.route_info_manager)
        self.started = False
        self.lock = threading.RLock()

    def start(self) -> None:
        """Start the route manager and housekeeping service (no-op if already started)."""
        with self.lock:
            if self.started:
                return
            self.route_info_manager.start()
            self.broker_housekeeping_service.start()
            self.started = True
            print(f"NameServer启动成功,监听端口: {self.config.get('listenPort', 9876)}")

    def shutdown(self) -> None:
        """Stop the route manager and housekeeping service (no-op if not started)."""
        with self.lock:
            if not self.started:
                return
            self.route_info_manager.shutdown()
            self.broker_housekeeping_service.shutdown()
            self.started = False
            print("NameServer关闭成功")

    def register_broker_all(self, cluster_name: str, broker_addr: str, broker_name: str,
                            broker_id: int, ha_server_addr: str, topic_config_wrapper: Dict,
                            filter_server_list: List[str], oneway: bool,
                            timeout_millis: int, channel: str) -> Dict[str, any]:
        """Register a broker and report its group's master address.

        Returns ``{"haServerAddr": ha_server_addr, "masterAddr": <addr or None>}``.
        ``masterAddr`` is ``None`` when no broker data exists for ``broker_name``.
        ``oneway`` and ``timeout_millis`` are accepted for interface compatibility
        but are not used here.
        """
        self.route_info_manager.register_broker(
            cluster_name, broker_addr, broker_name, broker_id,
            ha_server_addr, topic_config_wrapper, filter_server_list, channel
        )
        # Read broker_addr_table under the route manager's lock so a concurrent
        # register/unregister cannot change it mid-lookup.
        with self.route_info_manager.lock:
            broker_data = self.route_info_manager.broker_addr_table.get(broker_name)
            master_addr = broker_data.get_master_addr() if broker_data else None
        return {
            "haServerAddr": ha_server_addr,
            "masterAddr": master_addr
        }

    def get_route_info_by_topic(self, topic: str) -> Optional[Dict[str, any]]:
        """Return the route for ``topic`` as a plain dict, or ``None`` if unknown.

        ``brokerAddrs`` dicts are copied because the route data references the
        live broker entries of the routing table.
        """
        route_data = self.route_info_manager.pickup_topic_route_data(topic)
        if route_data is None:
            return None
        return {
            "orderTopicConf": route_data.order_topic_conf,
            "queueDatas": [
                {
                    "brokerName": qd.broker_name,
                    "readQueueNums": qd.read_queue_nums,
                    "writeQueueNums": qd.write_queue_nums,
                    "perm": qd.perm,
                    "topicSynFlag": qd.topic_syn_flag
                } for qd in route_data.queue_datas
            ],
            "brokerDatas": [
                {
                    "cluster": bd.cluster,
                    "brokerName": bd.broker_name,
                    "brokerAddrs": dict(bd.broker_addrs)
                } for bd in route_data.broker_datas
            ],
            "filterServerTable": route_data.filter_server_table
        }
class KVConfigManager:
    """Namespaced key/value configuration store.

    Entries live in ``config_table`` as ``namespace -> {key: value}``. A
    namespace disappears once its last key is deleted. All access goes through
    ``self.lock``.
    """

    def __init__(self):
        self.config_table: Dict[str, Dict[str, str]] = {}
        # The stdlib has no RWLock; use one only if some runtime provides it.
        rw_lock_cls = getattr(threading, 'RWLock', None)
        self.lock = rw_lock_cls() if rw_lock_cls is not None else threading.RLock()

    def put_kv_config(self, namespace: str, key: str, value: str) -> None:
        """Set ``key`` to ``value`` inside ``namespace``, creating the namespace if needed."""
        with self.lock:
            self.config_table.setdefault(namespace, {})[key] = value

    def get_kv_config(self, namespace: str, key: str) -> Optional[str]:
        """Return the value for ``key`` in ``namespace``, or ``None`` when absent."""
        with self.lock:
            entries = self.config_table.get(namespace)
            return None if entries is None else entries.get(key)

    def delete_kv_config(self, namespace: str, key: str) -> None:
        """Remove ``key`` from ``namespace``; drop the namespace once it is empty."""
        with self.lock:
            entries = self.config_table.get(namespace)
            if entries is None or key not in entries:
                return
            del entries[key]
            if not entries:
                del self.config_table[namespace]

    def get_kv_list_by_namespace(self, namespace: str) -> Dict[str, str]:
        """Return a copy of all key/value pairs in ``namespace`` (empty dict if unknown)."""
        with self.lock:
            return dict(self.config_table.get(namespace, {}))
class BrokerHousekeepingService:
    """Periodic broker-connection housekeeping.

    A daemon thread calls ``_check_broker_connections`` every 10 seconds, or
    after 5 seconds following an exception. Each run gets its own
    ``threading.Event`` as the stop signal, so ``shutdown()`` wakes the thread
    at once. Creating a fresh ``Event`` per sleep instead would make the sleep
    un-interruptible and leave ``join(timeout=1)`` returning with the thread
    still alive.
    """

    def __init__(self, route_info_manager: 'RouteInfoManager'):
        self.route_info_manager = route_info_manager
        self.running = False
        self.housekeeping_thread = None
        self._stop_event = threading.Event()

    def start(self) -> None:
        """Start the housekeeping thread; calling it again while running is a no-op."""
        if self.running:
            return
        # New event per run, so a thread from a previous run cannot be revived.
        self._stop_event = threading.Event()
        self.running = True
        self.housekeeping_thread = threading.Thread(target=self._housekeeping_loop, daemon=True)
        self.housekeeping_thread.start()
        print("BrokerHousekeepingService started")

    def shutdown(self) -> None:
        """Signal the thread to stop and wait up to 1 second for it to exit."""
        self.running = False
        self._stop_event.set()
        if self.housekeeping_thread:
            self.housekeeping_thread.join(timeout=1)
        print("BrokerHousekeepingService shutdown")

    def _housekeeping_loop(self) -> None:
        """Thread body: check connections until stopped."""
        stop_event = self._stop_event
        while self.running and not stop_event.is_set():
            try:
                # Check broker connection state
                self._check_broker_connections()
                pause = 10
            except Exception as e:
                print(f"BrokerHousekeeping异常: {e}")
                pause = 5
            # Event.wait returns True as soon as shutdown() sets the event.
            if stop_event.wait(pause):
                break

    def _check_broker_connections(self) -> None:
        """Check broker connection state (placeholder: does nothing in this demo)."""
        pass
# Usage example
if __name__ == "__main__":
    # NameServer configuration
    nameserver_config = dict(
        listenPort=9876,
        serverWorkerThreads=8,
        serverCallbackExecutorThreads=0,
        serverSelectorThreads=3,
        serverOnewaySemaphoreValue=256,
        serverAsyncSemaphoreValue=64,
        serverChannelMaxIdleTimeSeconds=120,
        serverSocketSndBufSize=65535,
        serverSocketRcvBufSize=65535,
        serverPooledByteBufAllocatorEnable=True,
        useEpollNativeSelector=False,
    )

    nameserver = NameServerController(nameserver_config)
    nameserver.start()
    try:
        # Two demo topics that differ only in their queue counts.
        topic_config = {
            topic_name: {
                "readQueueNums": queue_nums,
                "writeQueueNums": queue_nums,
                "perm": 6,
                "topicSynFlag": 0,
            }
            for topic_name, queue_nums in (("OrderTopic", 4), ("PaymentTopic", 2))
        }

        def register_demo_broker(broker_addr, broker_id, ha_server_addr, topics, channel):
            """Register one member of group ``broker-a`` in ``DefaultCluster``."""
            return nameserver.register_broker_all(
                cluster_name="DefaultCluster",
                broker_addr=broker_addr,
                broker_name="broker-a",
                broker_id=broker_id,
                ha_server_addr=ha_server_addr,
                topic_config_wrapper=topics,
                filter_server_list=[],
                oneway=False,
                timeout_millis=3000,
                channel=channel,
            )

        # Master (id 0) carries the topics; the slave (id 1) registers none.
        master_result = register_demo_broker(
            "192.168.1.100:10911", 0, "192.168.1.100:10912", topic_config, "channel_001"
        )
        print(f"Broker注册结果: {master_result}")
        register_demo_broker("192.168.1.101:10911", 1, "192.168.1.101:10912", {}, "channel_002")

        # Inspect the route for OrderTopic
        route_info = nameserver.get_route_info_by_topic("OrderTopic")
        if route_info:
            queue_datas = route_info['queueDatas']
            broker_datas = route_info['brokerDatas']
            print("\nOrderTopic路由信息:")
            print(f"队列数据: {len(queue_datas)}")
            print(f"Broker数据: {len(broker_datas)}")
            for qd in queue_datas:
                print(f" 队列: {qd['brokerName']} - 读:{qd['readQueueNums']} 写:{qd['writeQueueNums']}")
            for bd in broker_datas:
                print(f" Broker: {bd['brokerName']} - 集群:{bd['cluster']} 地址:{bd['brokerAddrs']}")

        # Cluster overview
        route_manager = nameserver.route_info_manager
        cluster_info = route_manager.get_all_cluster_info()
        print("\n集群信息:")
        print(f"集群数量: {len(cluster_info['clusterAddrTable'])}")
        print(f"Broker数量: {len(cluster_info['brokerAddrTable'])}")

        print(f"\n所有Topic: {route_manager.get_all_topic_list()}")

        # Give background threads a moment before shutting down.
        time.sleep(5)
    finally:
        nameserver.shutdown()
2.2.2 Broker架构
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable, Any, Tuple
from enum import Enum
import threading
import time
from datetime import datetime, timedelta
import os
import json
class BrokerRole(Enum):
    """Broker role; the demo config stores one of these under ``brokerRole``."""
    ASYNC_MASTER = "ASYNC_MASTER"  # asynchronous master
    SYNC_MASTER = "SYNC_MASTER"  # synchronous master
    SLAVE = "SLAVE"  # slave
class FlushDiskType(Enum):
    """Disk-flush strategy; ``CommitLog.put_message`` chooses its flush path from this value."""
    SYNC_FLUSH = "SYNC_FLUSH"  # synchronous flush
    ASYNC_FLUSH = "ASYNC_FLUSH"  # asynchronous flush (MessageStoreConfig default)
class MessageStoreConfig:
    """Message store configuration.

    The CommitLog, ConsumeQueue and Index directories are all derived from
    ``store_path_root_dir``, so relocating the store only requires passing a
    different root. With the default root the paths are identical to the
    previous hard-coded values.
    """

    def __init__(self, store_path_root_dir: str = "/tmp/rocketmq/store"):
        # Storage paths
        self.store_path_root_dir = store_path_root_dir  # store root directory
        self.store_path_commit_log = os.path.join(store_path_root_dir, "commitlog")  # CommitLog directory
        self.store_path_consume_queue = os.path.join(store_path_root_dir, "consumequeue")  # ConsumeQueue directory
        self.store_path_index = os.path.join(store_path_root_dir, "index")  # Index directory
        # File sizes (bytes)
        self.mapped_file_size_commit_log = 1024 * 1024 * 1024  # 1 GiB per CommitLog file
        self.mapped_file_size_consume_queue = 300000 * 20  # 6,000,000 bytes (~6 MB)
        # Flushing
        self.flush_disk_type = FlushDiskType.ASYNC_FLUSH
        self.flush_interval_commit_log = 500  # milliseconds
        self.flush_interval_consume_queue = 1000  # milliseconds
        # Cleanup
        self.file_reserved_time = 72  # hours
        self.delete_when = "04"  # deletion time point
        self.disk_max_used_space_ratio = 75  # maximum disk usage ratio (percent)
        # Miscellaneous
        self.max_message_size = 4 * 1024 * 1024  # 4 MiB
        self.check_crc_on_recover = True
        self.flush_commit_log_least_pages = 4
        self.commit_interval_commit_log = 200
@dataclass
class CommitLogData:
    """A chunk of CommitLog data (not referenced by the code in this section)."""
    offset: int  # position of the chunk in the CommitLog
    size: int  # length of ``data`` in bytes — presumably; not enforced
    data: bytes  # raw bytes
    timestamp: datetime = field(default_factory=datetime.now)  # defaults to the creation time (local, naive)
@dataclass
class ConsumeQueueData:
    """One ConsumeQueue entry: where a message lives in the CommitLog."""
    offset: int  # CommitLog physical offset, passed to CommitLog.get_message
    size: int  # serialized message size in bytes
    tag_hash_code: int  # hash(message.tags), or 0 when the message has no tags
class CommitLog:
    """Append-only CommitLog split into fixed-capacity in-memory "mapped files".

    Each mapped file is a dict covering the logical offset range
    ``[start_offset, start_offset + file_size)``. A message never straddles two
    files: when the current file lacks room, a new file is started and the tail
    of the previous one is left unused.

    File buffers grow as data is appended rather than being pre-allocated at
    ``file_size`` bytes (1 GiB with the default config), so memory use tracks
    the data actually written.
    """

    def __init__(self, config: 'MessageStoreConfig'):
        self.config = config
        self.mapped_files: List[Dict] = []  # mapped files in ascending offset order
        self.wrote_position = 0  # logical offset just past the last written byte
        self.committed_position = 0  # not advanced by this class
        self.flushed_position = 0  # advanced by _sync_flush
        self.lock = threading.RLock()
        # Create the storage directory
        os.makedirs(config.store_path_commit_log, exist_ok=True)

    def put_message(self, message: 'Message') -> Tuple[bool, int]:
        """Append ``message`` and return ``(success, physical_offset)``.

        Returns ``(False, -1)`` without writing anything in two cases:
          * the serialized message is larger than ``config.max_message_size``;
          * it could not fit even into an empty mapped file.

        Such a message used to overflow the file buffer and drive
        ``remaining_space`` negative.
        """
        with self.lock:
            message_bytes = self._serialize_message(message)
            message_size = len(message_bytes)
            size_limit = min(self.config.max_message_size, self.config.mapped_file_size_commit_log)
            if message_size > size_limit:
                return False, -1
            # Roll over to a new file when the current one cannot hold the message.
            current_file = self._get_last_mapped_file()
            if current_file is None or current_file['remaining_space'] < message_size:
                current_file = self._create_mapped_file()
            write_pos = current_file['wrote_position']
            offset = current_file['start_offset'] + write_pos
            # Writes are strictly sequential, so the buffer length equals
            # write_pos and this slice assignment appends the message.
            current_file['data'][write_pos:write_pos + message_size] = message_bytes
            current_file['wrote_position'] += message_size
            current_file['remaining_space'] -= message_size
            self.wrote_position = offset + message_size
            # Flush according to the configured strategy.
            if self.config.flush_disk_type == FlushDiskType.ASYNC_FLUSH:
                self._async_flush()
            else:
                self._sync_flush()
            return True, offset

    def _serialize_message(self, message: 'Message') -> bytes:
        """Serialize ``message.to_dict()`` as UTF-8 JSON (simplified format)."""
        message_dict = message.to_dict()
        message_json = json.dumps(message_dict, ensure_ascii=False)
        return message_json.encode('utf-8')

    def _get_last_mapped_file(self) -> Optional[Dict]:
        """Return the newest mapped file, or ``None`` if none exists yet."""
        return self.mapped_files[-1] if self.mapped_files else None

    def _create_mapped_file(self) -> Dict:
        """Append and return a new, empty mapped file following the last one.

        ``file_size`` and ``remaining_space`` track the logical capacity; the
        ``data`` buffer starts empty and grows as messages are written.
        """
        file_index = len(self.mapped_files)
        file_size = self.config.mapped_file_size_commit_log
        start_offset = file_index * file_size
        mapped_file = {
            'file_name': f"commitlog_{file_index:020d}",
            'start_offset': start_offset,
            'file_size': file_size,
            'wrote_position': 0,
            'committed_position': 0,
            'flushed_position': 0,
            'remaining_space': file_size,
            'data': bytearray()
        }
        self.mapped_files.append(mapped_file)
        return mapped_file

    def _async_flush(self) -> None:
        """Asynchronous flush (placeholder: does nothing)."""
        pass

    def _sync_flush(self) -> None:
        """Synchronous flush (simulated: marks everything written as flushed)."""
        self.flushed_position = self.wrote_position

    def get_message(self, offset: int, size: int) -> Optional[bytes]:
        """Return ``size`` bytes starting at physical ``offset``.

        Returns ``None`` if no mapped file covers ``offset`` or the requested
        range extends past what has been written to that file.
        """
        with self.lock:
            for mapped_file in self.mapped_files:
                start = mapped_file['start_offset']
                if start <= offset < start + mapped_file['file_size']:
                    relative_offset = offset - start
                    if relative_offset + size <= mapped_file['wrote_position']:
                        return bytes(mapped_file['data'][relative_offset:relative_offset + size])
            return None
class ConsumeQueue:
    """Logical queue for one ``(topic, queue_id)``.

    Holds an ordered list of ConsumeQueue entries, each pointing at a message in
    the CommitLog. A logical offset is simply an index into that list.
    """

    def __init__(self, topic: str, queue_id: int, config: MessageStoreConfig):
        self.topic = topic
        self.queue_id = queue_id
        self.config = config
        self.consume_queue_data: List[ConsumeQueueData] = []
        self.max_offset = 0  # number of entries appended so far
        self.min_offset = 0
        self.lock = threading.RLock()
        # One directory per topic/queue under the ConsumeQueue root.
        queue_dir = os.path.join(config.store_path_consume_queue, topic, str(queue_id))
        self.store_path = queue_dir
        os.makedirs(queue_dir, exist_ok=True)

    def put_message_position_info(self, offset: int, size: int, tag_hash_code: int) -> None:
        """Append an entry for a message at CommitLog ``offset`` of ``size`` bytes."""
        entry = ConsumeQueueData(offset=offset, size=size, tag_hash_code=tag_hash_code)
        with self.lock:
            self.consume_queue_data.append(entry)
            self.max_offset += 1

    def get_index_buffer(self, start_index: int) -> List[ConsumeQueueData]:
        """Return the entries from ``start_index`` onward; empty list if out of range."""
        with self.lock:
            in_range = 0 <= start_index < len(self.consume_queue_data)
            return self.consume_queue_data[start_index:] if in_range else []

    def get_earliest_unit(self) -> Optional[ConsumeQueueData]:
        """Return the first entry, or ``None`` when the queue is empty."""
        with self.lock:
            if not self.consume_queue_data:
                return None
            return self.consume_queue_data[0]

    def get_latest_unit(self) -> Optional[ConsumeQueueData]:
        """Return the last entry, or ``None`` when the queue is empty."""
        with self.lock:
            if not self.consume_queue_data:
                return None
            return self.consume_queue_data[-1]
class IndexFile:
    """In-memory message-key index: key -> list of (physical offset, timestamp) records."""

    def __init__(self, config: 'MessageStoreConfig'):
        self.config = config
        self.hash_slot_count = 5000000  # hash slot count (not used by the methods below)
        self.index_count_per_slot = 4  # indexes per slot (not used by the methods below)
        self.index_table: Dict[str, List[Dict]] = {}  # key -> [index_info]
        self.lock = threading.RLock()
        # Create the storage directory
        os.makedirs(config.store_path_index, exist_ok=True)

    def put_key(self, key: str, phy_offset: int, store_timestamp: int) -> bool:
        """Record that ``key`` maps to ``phy_offset`` at ``store_timestamp``; always True."""
        with self.lock:
            self.index_table.setdefault(key, []).append({
                'key': key,
                'phy_offset': phy_offset,
                'store_timestamp': store_timestamp,
                'create_time': datetime.now()
            })
            return True

    def select_phy_offset(self, key: str, max_num: int, begin_timestamp: int,
                          end_timestamp: int) -> List[int]:
        """Return up to ``max_num`` offsets for ``key`` with timestamps in the inclusive range.

        Offsets come back in insertion order. A non-positive ``max_num`` returns
        an empty list; previously one match could still be returned.
        """
        if max_num <= 0:
            return []
        with self.lock:
            result = []
            for index_info in self.index_table.get(key, ()):
                if begin_timestamp <= index_info['store_timestamp'] <= end_timestamp:
                    result.append(index_info['phy_offset'])
                    if len(result) >= max_num:
                        break
            return result
class DefaultMessageStore:
    """Default message store combining a CommitLog, ConsumeQueues and a key index.

    ``put_message`` does three things:
      * appends the message to the CommitLog;
      * records its position in the ConsumeQueue of ``(topic, queue_id)``,
        using queue 0 when ``message.queue_id`` is unset;
      * records it in the key index if the message has keys.

    ``get_message`` walks a ConsumeQueue from a logical offset and reads the
    referenced bytes back from the CommitLog.
    """

    def __init__(self, config: MessageStoreConfig, broker_config: Dict):
        self.config = config
        self.broker_config = broker_config
        self.running = False
        # Core components
        self.commit_log = CommitLog(config)
        self.consume_queue_table: Dict[str, Dict[int, ConsumeQueue]] = {}  # topic -> {queueId -> ConsumeQueue}
        self.index_service = IndexFile(config)
        # Statistics (times in milliseconds)
        self.store_stats = {
            'put_message_entire_time_max': 0,
            'put_message_times_total': 0,
            'put_message_size_total': 0,
            'get_message_entire_time_max': 0,
            'get_message_times_total': 0
        }
        self.lock = threading.RLock()

    def start(self) -> None:
        """Start the store (recover, then start background services); idempotent."""
        with self.lock:
            if self.running:
                return
            # Recover data
            self._recover()
            # Start background services
            self._start_background_services()
            self.running = True
            print("MessageStore started")

    def shutdown(self) -> None:
        """Mark the store as stopped; idempotent."""
        with self.lock:
            if not self.running:
                return
            self.running = False
            print("MessageStore shutdown")

    def put_message(self, message: Message) -> 'PutMessageResult':
        """Store ``message``; return a PutMessageResult with its CommitLog offset.

        The CommitLog append and the ConsumeQueue/index dispatch run under
        ``self.lock`` as one unit. Without that, two concurrent puts to the same
        queue could:
          * record ConsumeQueue entries out of CommitLog order;
          * race in the ConsumeQueue get-or-create and lose entries.
        """
        start_time = time.time()
        try:
            with self.lock:
                # Append to the CommitLog
                success, offset = self.commit_log.put_message(message)
                if not success:
                    return PutMessageResult(PutMessageStatus.PUT_MESSAGE_FAILED, None, offset)
                # Build the ConsumeQueue entry
                message_size = len(self.commit_log._serialize_message(message))
                self._build_consume_queue(message, offset, message_size)
                # Build the key index
                if message.keys:
                    self._build_index(message, offset)
                # Update statistics
                elapsed_time = (time.time() - start_time) * 1000
                stats = self.store_stats
                stats['put_message_entire_time_max'] = max(
                    stats['put_message_entire_time_max'], elapsed_time
                )
                stats['put_message_times_total'] += 1
                stats['put_message_size_total'] += len(message.body)
            return PutMessageResult(PutMessageStatus.PUT_MESSAGE_OK, message, offset)
        except Exception as e:
            print(f"存储消息异常: {e}")
            return PutMessageResult(PutMessageStatus.PUT_MESSAGE_FAILED, message, 0)

    def _build_consume_queue(self, message: Message, offset: int, size: int) -> None:
        """Append a ConsumeQueue entry for ``message``, creating the queue on first use."""
        topic = message.topic
        queue_id = message.queue_id or 0
        # Get or create the ConsumeQueue
        queues = self.consume_queue_table.setdefault(topic, {})
        if queue_id not in queues:
            queues[queue_id] = ConsumeQueue(topic, queue_id, self.config)
        consume_queue = queues[queue_id]
        # Tag hash code. NOTE(review): str hash() is randomized per process, so
        # this value is only meaningful within one interpreter run.
        tag_hash_code = hash(message.tags) if message.tags else 0
        consume_queue.put_message_position_info(offset, size, tag_hash_code)

    def _build_index(self, message: Message, offset: int) -> None:
        """Index each space-separated, non-blank key of ``message`` at ``offset``."""
        if message.keys:
            keys = message.keys.split(" ")
            for key in keys:
                if key.strip():
                    self.index_service.put_key(
                        key.strip(),
                        offset,
                        int(message.born_timestamp.timestamp() * 1000)
                    )

    def get_message(self, group: str, topic: str, queue_id: int, offset: int,
                    max_msg_nums: int, message_filter) -> 'GetMessageResult':
        """Pull up to ``max_msg_nums`` messages from ``(topic, queue_id)`` starting at ``offset``.

        ``next_begin_offset`` in the result advances past every ConsumeQueue
        entry examined, including entries whose bytes cannot be read or
        decoded. Previously only decoded messages advanced it, so any skipped
        entry caused later messages to be re-delivered on the next pull, or
        stalled the consumer at a corrupt entry. ``group`` and
        ``message_filter`` are not used.
        """
        start_time = time.time()
        try:
            consume_queue = self.consume_queue_table.get(topic, {}).get(queue_id)
            if consume_queue is None:
                return GetMessageResult(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, 0, 0, 0, [])

            messages = []
            next_offset = offset
            for cq_data in consume_queue.get_index_buffer(offset):
                if len(messages) >= max_msg_nums:
                    break
                next_offset += 1
                # Read the message from the CommitLog
                message_bytes = self.commit_log.get_message(cq_data.offset, cq_data.size)
                if not message_bytes:
                    continue
                try:
                    message = Message.from_dict(json.loads(message_bytes.decode('utf-8')))
                except Exception as e:
                    print(f"反序列化消息失败: {e}")
                    continue
                messages.append(message)

            # Update statistics
            elapsed_time = (time.time() - start_time) * 1000
            with self.lock:
                self.store_stats['get_message_entire_time_max'] = max(
                    self.store_stats['get_message_entire_time_max'], elapsed_time
                )
                self.store_stats['get_message_times_total'] += 1

            status = GetMessageStatus.FOUND if messages else GetMessageStatus.NO_MESSAGE_IN_QUEUE
            return GetMessageResult(status, next_offset,
                                    consume_queue.min_offset, consume_queue.max_offset, messages)
        except Exception as e:
            print(f"获取消息异常: {e}")
            return GetMessageResult(GetMessageStatus.MESSAGE_WAS_REMOVING, offset, 0, 0, [])

    def _recover(self) -> None:
        """Recover data (placeholder: only prints a message)."""
        print("数据恢复完成")

    def _start_background_services(self) -> None:
        """Start flush/cleanup/statistics services (placeholder: does nothing)."""
        pass

    def get_store_stats(self) -> Dict[str, any]:
        """Return a shallow copy of the store statistics."""
        with self.lock:
            return self.store_stats.copy()
class PutMessageStatus(Enum):
    """Outcome of storing a message."""
    PUT_MESSAGE_OK = "PUT_MESSAGE_OK"  # message stored (DefaultMessageStore.put_message success path)
    PUT_MESSAGE_FAILED = "PUT_MESSAGE_FAILED"  # CommitLog failure, exception, or broker shutting down
    FLUSH_DISK_TIMEOUT = "FLUSH_DISK_TIMEOUT"  # not produced by the code in this section
    FLUSH_SLAVE_TIMEOUT = "FLUSH_SLAVE_TIMEOUT"  # not produced by the code in this section
    SLAVE_NOT_AVAILABLE = "SLAVE_NOT_AVAILABLE"  # not produced by the code in this section
class GetMessageStatus(Enum):
    """Outcome of pulling messages from the store."""
    FOUND = "FOUND"  # at least one message returned
    NO_MATCHED_MESSAGE = "NO_MATCHED_MESSAGE"  # not produced by the code in this section
    MESSAGE_WAS_REMOVING = "MESSAGE_WAS_REMOVING"  # returned on exception or while the broker shuts down
    OFFSET_FOUND_NULL = "OFFSET_FOUND_NULL"  # not produced by the code in this section
    OFFSET_OVERFLOW_BADLY = "OFFSET_OVERFLOW_BADLY"  # not produced by the code in this section
    OFFSET_OVERFLOW_ONE = "OFFSET_OVERFLOW_ONE"  # not produced by the code in this section
    OFFSET_TOO_SMALL = "OFFSET_TOO_SMALL"  # not produced by the code in this section
    NO_MATCHED_LOGIC_QUEUE = "NO_MATCHED_LOGIC_QUEUE"  # topic/queue_id has no ConsumeQueue
    NO_MESSAGE_IN_QUEUE = "NO_MESSAGE_IN_QUEUE"  # queue exists but no message was returned
@dataclass
class PutMessageResult:
    """Result of storing a message."""
    put_message_status: PutMessageStatus  # outcome of the put
    message: Optional[Message]  # the message; DefaultMessageStore passes None when the CommitLog write fails
    phy_offset: int  # CommitLog physical offset (0 on exception or when the broker is shutting down)
    auto_recall: bool = False  # not read or set by the code in this section
@dataclass
class GetMessageResult:
    """Result of pulling messages from the store."""
    status: GetMessageStatus  # outcome of the pull
    next_begin_offset: int  # ConsumeQueue logical offset to request on the next pull
    min_offset: int  # ConsumeQueue.min_offset at pull time (0 when no queue matched or on error)
    max_offset: int  # ConsumeQueue.max_offset at pull time (0 when no queue matched or on error)
    message_list: List[Message]  # messages returned by this pull
    suggest_pulling_from_slave: bool = False  # not set by the code in this section
class BrokerController:
    """Broker controller: owns the message store, managers and (simulated) network services."""

    def __init__(self, broker_config: Dict, netty_server_config: Dict,
                 netty_client_config: Dict, message_store_config: MessageStoreConfig):
        self.broker_config = broker_config
        self.netty_server_config = netty_server_config
        self.netty_client_config = netty_client_config
        self.message_store_config = message_store_config
        # Core components
        self.message_store = DefaultMessageStore(message_store_config, broker_config)
        self.topic_config_manager = TopicConfigManager(self)
        self.consumer_offset_manager = ConsumerOffsetManager(self)
        self.subscription_group_manager = SubscriptionGroupManager(self)
        self.producer_manager = ProducerManager()
        self.consumer_manager = ConsumerManager(self)
        # Network components (not created in this demo)
        self.remotingServer = None  # remoting server
        self.fastRemotingServer = None  # fast remoting server
        # State / executors
        self.broker_stats = BrokerStats()
        self.send_message_executor = None
        self.pull_message_executor = None
        self.admin_broker_executor = None
        # Synchronization components
        self.slave_synchronize = SlaveSynchronize(self)
        self.broker_outer_api = BrokerOuterAPI(netty_client_config)
        self.shutdown_hook = False  # True once shutdown() has run
        self.lock = threading.RLock()

    def start(self) -> bool:
        """Start all components in order; return True on success.

        Any exception is printed and reported as ``False``. Components already
        started are not rolled back.
        """
        try:
            # Message store
            self.message_store.start()
            # Network services
            self._start_remoting_server()
            # Managers
            self.topic_config_manager.start()
            self.consumer_offset_manager.start()
            self.subscription_group_manager.start()
            # Statistics
            self.broker_stats.start()
            # Master->slave sync; it was constructed but previously never started.
            # SlaveSynchronize only spawns its thread when brokerId != 0.
            self.slave_synchronize.start()
            # Register with the NameServer
            self._register_broker_all(True, False, True)
            # Scheduled tasks
            self._start_scheduled_tasks()
            print(f"Broker启动成功: {self.broker_config['brokerName']}")
            return True
        except Exception as e:
            print(f"Broker启动失败: {e}")
            return False

    def shutdown(self) -> None:
        """Unregister from the NameServer and stop every component; idempotent."""
        with self.lock:
            if self.shutdown_hook:
                return
            self.shutdown_hook = True
            # Unregister from the NameServer
            self._unregister_broker_all()
            # Stop slave sync
            self.slave_synchronize.shutdown()
            # Stop network services
            if self.remotingServer:
                self.remotingServer.shutdown()
            if self.fastRemotingServer:
                self.fastRemotingServer.shutdown()
            # Stop the message store
            self.message_store.shutdown()
            # Stop managers
            self.topic_config_manager.shutdown()
            self.consumer_offset_manager.shutdown()
            self.subscription_group_manager.shutdown()
            print(f"Broker关闭成功: {self.broker_config['brokerName']}")

    def _start_remoting_server(self) -> None:
        """Start the remoting servers (simulated: only prints the ports)."""
        listen_port = self.netty_server_config.get('listenPort', 10911)
        print(f"RemotingServer启动,监听端口: {listen_port}")
        fast_listen_port = self.netty_server_config.get('fastListenPort', 10909)
        if fast_listen_port > 0:
            print(f"FastRemotingServer启动,监听端口: {fast_listen_port}")

    def _register_broker_all(self, check_order_config: bool, oneway: bool,
                             force_register: bool) -> None:
        """Register this broker with all NameServers (simulated: builds the wrapper and prints)."""
        topic_config_wrapper = self.topic_config_manager.build_topic_config_serialize_wrapper()
        # A real implementation would send topic_config_wrapper to every NameServer.
        print(f"向NameServer注册Broker: {self.broker_config['brokerName']}")

    def _unregister_broker_all(self) -> None:
        """Unregister this broker from all NameServers (simulated)."""
        print(f"从NameServer注销Broker: {self.broker_config['brokerName']}")

    def _start_scheduled_tasks(self) -> None:
        """Start periodic tasks (simulated: re-registration, cleanup and stats are not implemented)."""
        print("定时任务启动")

    def put_message(self, message: Message) -> PutMessageResult:
        """Store ``message``, auto-creating its topic if unknown.

        Put statistics are only incremented for successful writes.
        """
        # Reject writes once shutdown has begun
        if self.shutdown_hook:
            return PutMessageResult(PutMessageStatus.PUT_MESSAGE_FAILED, message, 0)
        # Auto-create the topic with 4 queues, perm 6 and sysFlag 0
        if self.topic_config_manager.select_topic_config(message.topic) is None:
            self.topic_config_manager.create_topic_in_send_message_method(
                message.topic, 4, 6, 0
            )
        result = self.message_store.put_message(message)
        if result.put_message_status == PutMessageStatus.PUT_MESSAGE_OK:
            self.broker_stats.inc_put_nums()
            self.broker_stats.inc_put_size(len(message.body))
        return result

    def get_message(self, group: str, topic: str, queue_id: int, offset: int,
                    max_msg_nums: int, message_filter) -> GetMessageResult:
        """Pull messages from the store; refused with MESSAGE_WAS_REMOVING after shutdown."""
        if self.shutdown_hook:
            return GetMessageResult(GetMessageStatus.MESSAGE_WAS_REMOVING, offset, 0, 0, [])
        result = self.message_store.get_message(group, topic, queue_id, offset,
                                                max_msg_nums, message_filter)
        self.broker_stats.inc_get_nums()
        return result
class TopicConfigManager:
    """Topic configuration manager: per-topic config dicts keyed by topic name."""

    def __init__(self, broker_controller):
        self.broker_controller = broker_controller
        self.topic_config_table: Dict[str, Dict] = {}
        self.lock = threading.RLock()
        # System topics
        self._init_system_topic()

    @staticmethod
    def _build_topic_config(topic: str, queue_nums: int, perm: int, topic_sys_flag: int) -> Dict:
        """Return a SINGLE_TAG, non-ordered topic config with equal read/write queue counts."""
        return {
            "topicName": topic,
            "readQueueNums": queue_nums,
            "writeQueueNums": queue_nums,
            "perm": perm,
            "topicSysFlag": topic_sys_flag,
            "topicFilterType": "SINGLE_TAG",
            "order": False
        }

    def _init_system_topic(self) -> None:
        """Register the built-in topics: TBW102 and one named after the broker."""
        # Auto-create topic
        self.topic_config_table["TBW102"] = self._build_topic_config("TBW102", 8, 7, 0)
        # The broker's own topic
        broker_name = self.broker_controller.broker_config.get('brokerName', 'DefaultBroker')
        self.topic_config_table[broker_name] = self._build_topic_config(broker_name, 1, 7, 0)

    def start(self) -> None:
        """Start the manager (prints only)."""
        print("TopicConfigManager started")

    def shutdown(self) -> None:
        """Stop the manager (prints only)."""
        print("TopicConfigManager shutdown")

    def select_topic_config(self, topic: str) -> Optional[Dict]:
        """Return the config for ``topic``, or ``None`` if unknown."""
        with self.lock:
            return self.topic_config_table.get(topic)

    def create_topic_in_send_message_method(self, topic: str, default_topic_queue_nums: int,
                                            perm: int, topic_sys_flag: int) -> Dict:
        """Create ``topic`` if it does not exist yet and return its config.

        Callers typically check ``select_topic_config`` and then call this
        outside the lock. To keep that race harmless, an existing config is
        returned unchanged instead of being overwritten.
        """
        with self.lock:
            existing = self.topic_config_table.get(topic)
            if existing is not None:
                return existing
            topic_config = self._build_topic_config(topic, default_topic_queue_nums, perm, topic_sys_flag)
            self.topic_config_table[topic] = topic_config
            print(f"自动创建Topic: {topic}")
            return topic_config

    def build_topic_config_serialize_wrapper(self) -> Dict:
        """Return a detached snapshot of all topic configs plus a data-version stamp.

        Each per-topic dict is copied, so the snapshot cannot be used to mutate
        the live table.
        """
        with self.lock:
            return {
                "topicConfigTable": {name: dict(cfg) for name, cfg in self.topic_config_table.items()},
                "dataVersion": {
                    "timestamp": int(time.time() * 1000),
                    "counter": 1
                }
            }
class ConsumerOffsetManager:
    """Stores consumer progress as ``"group@topic" -> {queue_id: offset}``."""

    def __init__(self, broker_controller):
        self.broker_controller = broker_controller
        self.offset_table: Dict[str, Dict[int, int]] = {}  # group@topic -> {queueId -> offset}
        self.lock = threading.RLock()

    def start(self) -> None:
        """Start the manager (prints only)."""
        print("ConsumerOffsetManager started")

    def shutdown(self) -> None:
        """Stop the manager (prints only)."""
        print("ConsumerOffsetManager shutdown")

    def commit_offset(self, client_host: str, group: str, topic: str,
                      queue_id: int, offset: int) -> None:
        """Record ``offset`` for ``group`` on ``topic``/``queue_id``; ``client_host`` is unused."""
        table_key = f"{group}@{topic}"
        with self.lock:
            self.offset_table.setdefault(table_key, {})[queue_id] = offset
            print(f"提交偏移量: {table_key} queueId={queue_id} offset={offset}")

    def query_offset(self, group: str, topic: str, queue_id: int) -> int:
        """Return the committed offset, or -1 if none was committed."""
        table_key = f"{group}@{topic}"
        with self.lock:
            return self.offset_table.get(table_key, {}).get(queue_id, -1)
class SubscriptionGroupManager:
    """Holds subscription-group configs keyed by group name; seeded with DEFAULT_CONSUMER."""

    def __init__(self, broker_controller):
        self.broker_controller = broker_controller
        self.subscription_group_table: Dict[str, Dict] = {}
        self.lock = threading.RLock()
        self._init_default_subscription_group()

    def _init_default_subscription_group(self) -> None:
        """Install the built-in DEFAULT_CONSUMER group config."""
        self.subscription_group_table["DEFAULT_CONSUMER"] = dict(
            groupName="DEFAULT_CONSUMER",
            consumeEnable=True,
            consumeFromMinEnable=True,
            consumeBroadcastEnable=True,
            retryQueueNums=1,
            retryMaxTimes=16,
            brokerId=0,
            whichBrokerWhenConsumeSlowly=1,
        )

    def start(self) -> None:
        """Start the manager (prints only)."""
        print("SubscriptionGroupManager started")

    def shutdown(self) -> None:
        """Stop the manager (prints only)."""
        print("SubscriptionGroupManager shutdown")

    def find_subscription_group_config(self, group: str) -> Optional[Dict]:
        """Return the live config dict for ``group``, or ``None`` if unknown."""
        with self.lock:
            group_config = self.subscription_group_table.get(group)
        return group_config
class ProducerManager:
    """Tracks producer clients as ``group -> {clientId -> client info}``."""

    def __init__(self):
        self.group_channel_table: Dict[str, Dict] = {}  # group -> {channel -> client_info}
        self.lock = threading.RLock()

    def register_producer(self, group: str, client_channel_info: Dict) -> None:
        """Add or replace the producer identified by ``clientId`` (default ``'unknown'``)."""
        with self.lock:
            members = self.group_channel_table.setdefault(group, {})
            client_id = client_channel_info.get('clientId', 'unknown')
            members[client_id] = client_channel_info
            print(f"注册生产者: group={group} client={client_id}")

    def unregister_producer(self, group: str, client_channel_info: Dict) -> None:
        """Remove the producer; drop the group once it has no producers left."""
        with self.lock:
            members = self.group_channel_table.get(group)
            if members is None:
                return
            client_id = client_channel_info.get('clientId', 'unknown')
            if client_id in members:
                del members[client_id]
                print(f"注销生产者: group={group} client={client_id}")
            if not members:
                del self.group_channel_table[group]
class ConsumerManager:
    """Tracks consumer clients as ``group -> {clientId -> consumer info}``."""

    def __init__(self, broker_controller):
        self.broker_controller = broker_controller
        self.group_channel_table: Dict[str, Dict] = {}  # group -> {channel -> client_info}
        self.lock = threading.RLock()

    def register_consumer(self, group: str, client_channel_info: Dict,
                          consume_type: str, message_model: str,
                          consume_from_where: str, subscription_data_set: List[Dict]) -> bool:
        """Add or replace a consumer entry, stamped with the current time in ms; always True."""
        with self.lock:
            members = self.group_channel_table.setdefault(group, {})
            client_id = client_channel_info.get('clientId', 'unknown')
            members[client_id] = dict(
                clientChannelInfo=client_channel_info,
                consumeType=consume_type,
                messageModel=message_model,
                consumeFromWhere=consume_from_where,
                subscriptionDataSet=subscription_data_set,
                lastUpdateTimestamp=int(time.time() * 1000),
            )
            print(f"注册消费者: group={group} client={client_id} type={consume_type}")
            return True

    def unregister_consumer(self, group: str, client_channel_info: Dict) -> None:
        """Remove the consumer; drop the group once it has no consumers left."""
        with self.lock:
            members = self.group_channel_table.get(group)
            if members is None:
                return
            client_id = client_channel_info.get('clientId', 'unknown')
            if client_id in members:
                del members[client_id]
                print(f"注销消费者: group={group} client={client_id}")
            if not members:
                del self.group_channel_table[group]
class BrokerStats:
    """Thread-safe broker counters (puts, put bytes, gets, ...)."""

    def __init__(self):
        # All counters start at zero.
        self.put_nums, self.put_size = 0, 0
        self.get_nums, self.get_size = 0, 0
        self.get_found_nums, self.get_miss_nums = 0, 0
        self.put_message_distribute_time = [0 for _ in range(13)]  # distribution across time buckets
        self.lock = threading.RLock()

    def start(self) -> None:
        """Start the stats service (prints only)."""
        print("BrokerStats started")

    def _increment(self, counter_name: str, delta: int = 1) -> None:
        """Add ``delta`` to the named counter under the lock."""
        with self.lock:
            setattr(self, counter_name, getattr(self, counter_name) + delta)

    def inc_put_nums(self) -> None:
        """Count one put."""
        self._increment('put_nums')

    def inc_put_size(self, size: int) -> None:
        """Add ``size`` bytes to the put total."""
        self._increment('put_size', size)

    def inc_get_nums(self) -> None:
        """Count one get."""
        self._increment('get_nums')

    def get_stats_info(self) -> Dict[str, any]:
        """Return a snapshot of the counters keyed by camelCase names."""
        label_to_attr = (
            ('putNums', 'put_nums'),
            ('putSize', 'put_size'),
            ('getNums', 'get_nums'),
            ('getSize', 'get_size'),
            ('getFoundNums', 'get_found_nums'),
            ('getMissNums', 'get_miss_nums'),
        )
        with self.lock:
            return {label: getattr(self, attr) for label, attr in label_to_attr}
class SlaveSynchronize:
    """Periodic master-to-slave synchronization, active only on slave brokers.

    ``start`` launches a daemon thread only when ``broker_config['brokerId']``
    is non-zero. That thread calls ``_sync_from_master`` every 5 seconds, or
    after 10 seconds following an exception.

    Each run gets its own ``threading.Event`` as the stop signal, so
    ``shutdown`` wakes the thread at once. Creating a fresh ``Event`` per sleep
    instead would make the sleep un-interruptible and leave ``join(timeout=1)``
    returning with the thread still alive.
    """

    def __init__(self, broker_controller):
        self.broker_controller = broker_controller
        self.master_addr = None
        self.sync_thread = None
        self.running = False
        self._stop_event = threading.Event()

    def start(self) -> None:
        """Start the sync thread for slave brokers; no-op while already running."""
        if self.running:
            return
        if self.broker_controller.broker_config.get('brokerId', 0) != 0:
            # New event per run, so a thread from a previous run cannot be revived.
            self._stop_event = threading.Event()
            self.running = True
            self.sync_thread = threading.Thread(target=self._sync_loop, daemon=True)
            self.sync_thread.start()
        print("SlaveSynchronize started")

    def shutdown(self) -> None:
        """Signal the thread to stop and wait up to 1 second for it to exit."""
        self.running = False
        self._stop_event.set()
        if self.sync_thread:
            self.sync_thread.join(timeout=1)
        print("SlaveSynchronize shutdown")

    def _sync_loop(self) -> None:
        """Thread body: sync from the master until stopped."""
        stop_event = self._stop_event
        while self.running and not stop_event.is_set():
            try:
                self._sync_from_master()
                pause = 5
            except Exception as e:
                print(f"同步异常: {e}")
                pause = 10
            # Event.wait returns True as soon as shutdown() sets the event.
            if stop_event.wait(pause):
                break

    def _sync_from_master(self) -> None:
        """Sync data from the master (placeholder: does nothing)."""
        pass
class BrokerOuterAPI:
    """Broker-side client for NameServer calls (simulated: no network I/O)."""

    def __init__(self, netty_client_config: Dict):
        self.netty_client_config = netty_client_config
        self.name_server_addr_list: List[str] = []

    def update_name_server_addr_list(self, addrs: List[str]) -> None:
        """Replace the NameServer address list with a copy of ``addrs``."""
        addr_snapshot = addrs.copy()
        self.name_server_addr_list = addr_snapshot
        print(f"更新NameServer地址列表: {addrs}")

    def register_broker_all(self, cluster_name: str, broker_addr: str, broker_name: str,
                            broker_id: int, ha_server_addr: str, topic_config_wrapper: Dict,
                            filter_server_list: List[str], oneway: bool, timeout_millis: int) -> Dict:
        """Pretend to register with every NameServer; always reports success."""
        print(f"向NameServer注册Broker: {broker_name}")
        outcome = {"success": True}
        return outcome
# Usage example: start a broker, put one message, read it back, print stats.
if __name__ == "__main__":
    # Broker configuration
    broker_config = {
        "brokerClusterName": "DefaultCluster",
        "brokerName": "broker-a",
        "brokerId": 0,
        "deleteWhen": "04",
        "fileReservedTime": 48,
        "brokerRole": BrokerRole.ASYNC_MASTER,
        "flushDiskType": FlushDiskType.ASYNC_FLUSH
    }
    # Network (server side) configuration
    netty_server_config = {
        "listenPort": 10911,
        "serverWorkerThreads": 8,
        "serverCallbackExecutorThreads": 0,
        "serverSelectorThreads": 3,
        "serverOnewaySemaphoreValue": 256,
        "serverAsyncSemaphoreValue": 64,
        "serverChannelMaxIdleTimeSeconds": 120
    }
    # Network (client side) configuration
    netty_client_config = {
        "clientWorkerThreads": 4,
        "clientCallbackExecutorThreads": 0,
        "clientOnewaySemaphoreValue": 65535,
        "clientAsyncSemaphoreValue": 65535,
        "connectTimeoutMillis": 3000,
        "channelNotActiveInterval": 60000
    }
    # Message store configuration
    message_store_config = MessageStoreConfig()
    # Create the broker controller
    broker_controller = BrokerController(
        broker_config, netty_server_config,
        netty_client_config, message_store_config
    )
    # Start the broker; the finally clause guarantees shutdown once started.
    if broker_controller.start():
        try:
            # Simulate sending a message
            message = Message(
                topic="TestTopic",
                body="Hello RocketMQ".encode('utf-8'),
                tags="TagA",
                keys="OrderID001"
            )
            result = broker_controller.put_message(message)
            print(f"发送消息结果: {result.put_message_status}")
            # Simulate consuming messages.
            # NOTE(review): positional args presumably are
            # (group, topic, queue_id, offset, max_nums, filter) — confirm
            # against BrokerController.get_message.
            get_result = broker_controller.get_message(
                "TestGroup", "TestTopic", 0, 0, 32, None
            )
            print(f"消费消息结果: {get_result.status} 消息数量: {len(get_result.message_list)}")
            # Broker-level statistics
            stats = broker_controller.broker_stats.get_stats_info()
            print(f"Broker统计信息: {stats}")
            # Store-level statistics
            store_stats = broker_controller.message_store.get_store_stats()
            print(f"存储统计信息: {store_stats}")
            # Keep the broker running for a while
            time.sleep(10)
        finally:
            broker_controller.shutdown()
    else:
        print("Broker启动失败")
2.3 消息流转过程
2.3.1 消息发送流程
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from enum import Enum
import time
import threading
import hashlib
import random
class SendStatus(Enum):
    """Status of a send attempt as reported in ``SendResult.send_status``."""
    SEND_OK = "SEND_OK"
    FLUSH_DISK_TIMEOUT = "FLUSH_DISK_TIMEOUT"
    FLUSH_SLAVE_TIMEOUT = "FLUSH_SLAVE_TIMEOUT"
    SLAVE_NOT_AVAILABLE = "SLAVE_NOT_AVAILABLE"
class CommunicationMode(Enum):
    """How a producer hands a message to the broker."""
    SYNC = "SYNC"  # synchronous: wait for the send result
    ASYNC = "ASYNC"  # asynchronous: result delivered via SendCallback
    ONEWAY = "ONEWAY"  # one-way: fire and forget, no result awaited
@dataclass
class SendResult:
    """Outcome of a send returned by ``MessageSendFlow.send_message``."""
    send_status: SendStatus
    msg_id: str
    message_queue: 'MessageQueue'  # queue the message was routed to
    # Offset reported for the message; the async/oneway paths in this file
    # return 0 as a placeholder before the real result is known.
    queue_offset: int
    transaction_id: Optional[str] = None
    offset_msg_id: Optional[str] = None
    region_id: Optional[str] = None
    trace_on: bool = True
@dataclass
class MessageQueue:
    """Identifies one queue of a topic on a specific broker.

    Equality and hashing are based on the ``(topic, broker_name, queue_id)``
    triple, so instances can be used as dict keys (e.g. offset tables).
    """
    topic: str
    broker_name: str
    queue_id: int

    def __hash__(self):
        # Hash the identifying triple directly. Joining with "@" made
        # ("a@b", "c") and ("a", "b@c") collide.
        return hash((self.topic, self.broker_name, self.queue_id))

    def __eq__(self, other):
        # NotImplemented (not False) lets Python try the reflected comparison;
        # comparing against an unrelated type still evaluates to False.
        if not isinstance(other, MessageQueue):
            return NotImplemented
        return ((self.topic, self.broker_name, self.queue_id)
                == (other.topic, other.broker_name, other.queue_id))
class MessageSendFlow:
    """Producer-side send pipeline: validate, route, pick a queue, send.

    Route data obtained from the NameServer is cached per topic in
    ``topic_publish_info_table``. The network transport is simulated.
    """

    # Key of the master broker inside a route's ``brokerAddrs`` mapping.
    _MASTER_BROKER_ID = '0'

    def __init__(self, producer, nameserver_client):
        self.producer = producer
        self.nameserver_client = nameserver_client
        self.topic_publish_info_table: Dict[str, 'TopicPublishInfo'] = {}
        self.lock = threading.RLock()

    def send_message(self, message: Message, communication_mode: CommunicationMode,
                     send_callback=None, timeout: int = 3000) -> SendResult:
        """Send *message* using *communication_mode*.

        Args:
            message: the message to send; validated before routing.
            communication_mode: SYNC, ASYNC or ONEWAY.
            send_callback: object with ``on_success``/``on_exception``, used by ASYNC.
            timeout: presumably milliseconds; forwarded to the send paths,
                which do not enforce it yet.

        Returns:
            The SendResult (a placeholder result for ASYNC/ONEWAY).

        Raises:
            ValueError: invalid message or unsupported communication mode.
            Exception: no route, no queue, or no broker address for the topic.
        """
        # 1. Validate the message
        self._validate_message(message)
        # 2. Resolve the topic's publish (routing) info
        topic_publish_info = self._try_to_find_topic_publish_info(message.topic)
        if topic_publish_info is None or not topic_publish_info.ok:
            raise Exception(f"No route info of this topic: {message.topic}")
        # 3. Choose a message queue
        message_queue = self._select_one_message_queue(topic_publish_info, None)
        # 4. Send to the broker that owns the queue
        return self._send_message_to_broker(message, message_queue, communication_mode,
                                            send_callback, timeout)

    def _validate_message(self, message: Message) -> None:
        """Reject messages with no topic, an empty body, a body over 4MB, or a bad topic name."""
        if not message.topic:
            raise ValueError("Topic不能为空")
        if not message.body:
            raise ValueError("消息体不能为空")
        if len(message.body) > 4 * 1024 * 1024:  # 4MB
            raise ValueError("消息体大小不能超过4MB")
        if not self._is_valid_topic_name(message.topic):
            raise ValueError(f"Topic名称不规范: {message.topic}")

    def _is_valid_topic_name(self, topic: str) -> bool:
        """Return True if *topic* is at most 127 chars and has no reserved characters."""
        if len(topic) > 127:
            return False
        invalid_chars = (' ', '\t', '\r', '\n', '#', '*', '?', '>', '<', '|', '&')
        return not any(char in topic for char in invalid_chars)

    def _try_to_find_topic_publish_info(self, topic: str) -> Optional['TopicPublishInfo']:
        """Return cached publish info for *topic*, refreshing it from the NameServer if unusable."""
        with self.lock:
            topic_publish_info = self.topic_publish_info_table.get(topic)
            if topic_publish_info is None or not topic_publish_info.ok:
                topic_route_data = self.nameserver_client.get_topic_route_info_from_name_server(topic)
                if topic_route_data:
                    topic_publish_info = self._topic_route_data_to_topic_publish_info(topic, topic_route_data)
                    self.topic_publish_info_table[topic] = topic_publish_info
            return topic_publish_info

    def _topic_route_data_to_topic_publish_info(self, topic: str, route_data: Dict) -> 'TopicPublishInfo':
        """Build publish info from NameServer route data.

        One writable queue is created per ``writeQueueNums`` of every broker
        that has a master address. The round-robin start index is randomized.
        """
        info = TopicPublishInfo()
        info.topic_route_data = route_data
        info.order_topic = route_data.get('orderTopicConf', '')
        queue_datas = route_data.get('queueDatas', [])
        broker_datas = route_data.get('brokerDatas', [])
        # brokerName -> {brokerId: address}
        broker_addr_table = {}
        for broker_data in broker_datas:
            broker_addr_table[broker_data['brokerName']] = broker_data['brokerAddrs']
        message_queue_list = []
        for queue_data in queue_datas:
            broker_name = queue_data['brokerName']
            if broker_name in broker_addr_table:
                broker_addrs = broker_addr_table[broker_name]
                # Only brokers with a master can accept writes.
                if self._MASTER_BROKER_ID in broker_addrs:
                    for i in range(queue_data['writeQueueNums']):
                        message_queue_list.append(MessageQueue(topic, broker_name, i))
        info.message_queue_list = message_queue_list
        info.send_which_queue = random.randint(0, len(message_queue_list) - 1) if message_queue_list else 0
        info.have_topic_router_info = True
        return info

    def _select_one_message_queue(self, topic_publish_info: 'TopicPublishInfo',
                                  last_broker_name: Optional[str]) -> MessageQueue:
        """Pick a queue: plain round-robin, or avoid *last_broker_name* when given."""
        if last_broker_name is None:
            return self._select_one_message_queue_by_round_robin(topic_publish_info)
        return self._select_one_message_queue_by_fault_strategy(topic_publish_info, last_broker_name)

    def _select_one_message_queue_by_round_robin(self, topic_publish_info: 'TopicPublishInfo') -> MessageQueue:
        """Return the next queue in round-robin order."""
        message_queue_list = topic_publish_info.message_queue_list
        if not message_queue_list:
            raise Exception("没有可用的消息队列")
        index = topic_publish_info.send_which_queue % len(message_queue_list)
        topic_publish_info.send_which_queue += 1
        return message_queue_list[index]

    def _select_one_message_queue_by_fault_strategy(self, topic_publish_info: 'TopicPublishInfo',
                                                    last_broker_name: str) -> MessageQueue:
        """Return the next round-robin queue on a broker other than *last_broker_name*.

        If every queue lives on that broker, fall back to plain round-robin so
        load still spreads across its queues.
        """
        message_queue_list = topic_publish_info.message_queue_list
        if not message_queue_list:
            raise Exception("没有可用的消息队列")
        for _ in range(len(message_queue_list)):
            index = topic_publish_info.send_which_queue % len(message_queue_list)
            topic_publish_info.send_which_queue += 1
            mq = message_queue_list[index]
            if mq.broker_name != last_broker_name:
                return mq
        return self._select_one_message_queue_by_round_robin(topic_publish_info)

    def _send_message_to_broker(self, message: Message, message_queue: MessageQueue,
                                communication_mode: CommunicationMode, send_callback,
                                timeout: int) -> SendResult:
        """Stamp the message and dispatch it with the requested communication mode."""
        broker_addr = self._find_broker_address_in_publish(message_queue.broker_name)
        if not broker_addr:
            raise Exception(f"找不到Broker地址: {message_queue.broker_name}")
        # Use the id the message already carries, so SendResult.msg_id and
        # message.msg_id identify the same message. Generate one only if missing.
        msg_id = getattr(message, 'msg_id', None) or self._generate_message_id()
        message.msg_id = msg_id
        message.queue_id = message_queue.queue_id
        message.born_timestamp = datetime.now()
        if communication_mode == CommunicationMode.SYNC:
            return self._send_message_sync(message, message_queue, broker_addr, timeout, msg_id)
        elif communication_mode == CommunicationMode.ASYNC:
            return self._send_message_async(message, message_queue, broker_addr, timeout,
                                            send_callback, msg_id)
        elif communication_mode == CommunicationMode.ONEWAY:
            self._send_message_oneway(message, message_queue, broker_addr, timeout, msg_id)
            return SendResult(SendStatus.SEND_OK, msg_id, message_queue, 0)
        else:
            raise ValueError(f"不支持的通信模式: {communication_mode}")

    def _find_broker_address_in_publish(self, broker_name: str) -> Optional[str]:
        """Return the master address of *broker_name* from cached route data, or None.

        None makes the caller raise "broker address not found" instead of
        silently sending to an unrelated address.
        """
        with self.lock:
            publish_infos = list(self.topic_publish_info_table.values())
        for info in publish_infos:
            route_data = info.topic_route_data or {}
            for broker_data in route_data.get('brokerDatas', []):
                if broker_data.get('brokerName') != broker_name:
                    continue
                addr = broker_data.get('brokerAddrs', {}).get(self._MASTER_BROKER_ID)
                if addr:
                    return addr
        return None

    def _generate_message_id(self) -> str:
        """Generate a fallback id: millisecond timestamp followed by 4 random digits."""
        timestamp = int(time.time() * 1000)
        random_num = random.randint(1000, 9999)
        return f"{timestamp}{random_num}"

    def _send_message_sync(self, message: Message, message_queue: MessageQueue,
                           broker_addr: str, timeout: int, msg_id: str) -> SendResult:
        """Simulated synchronous send; always succeeds with a random offset."""
        print(f"同步发送消息到 {broker_addr}: {message.topic}")
        time.sleep(0.01)  # simulated network latency
        return SendResult(
            send_status=SendStatus.SEND_OK,
            msg_id=msg_id,
            message_queue=message_queue,
            queue_offset=random.randint(1000, 9999)
        )

    def _send_message_async(self, message: Message, message_queue: MessageQueue,
                            broker_addr: str, timeout: int, send_callback, msg_id: str) -> SendResult:
        """Simulated asynchronous send.

        The real result is delivered to *send_callback* from a daemon thread.
        The returned SendResult is a placeholder with offset 0.
        """
        print(f"异步发送消息到 {broker_addr}: {message.topic}")

        def async_send():
            try:
                time.sleep(0.01)  # simulated network latency
                result = SendResult(
                    send_status=SendStatus.SEND_OK,
                    msg_id=msg_id,
                    message_queue=message_queue,
                    queue_offset=random.randint(1000, 9999)
                )
                if send_callback:
                    send_callback.on_success(result)
            except Exception as e:
                if send_callback:
                    send_callback.on_exception(e)

        threading.Thread(target=async_send, daemon=True).start()
        return SendResult(
            send_status=SendStatus.SEND_OK,
            msg_id=msg_id,
            message_queue=message_queue,
            queue_offset=0
        )

    def _send_message_oneway(self, message: Message, message_queue: MessageQueue,
                             broker_addr: str, timeout: int, msg_id: str) -> None:
        """Simulated one-way send on a daemon thread; no result is awaited."""
        print(f"单向发送消息到 {broker_addr}: {message.topic}")

        def oneway_send():
            time.sleep(0.01)  # simulated network latency
            print(f"单向消息发送完成: {msg_id}")

        threading.Thread(target=oneway_send, daemon=True).start()
class TopicPublishInfo:
    """Publish-side routing state cached for a single topic."""

    def __init__(self):
        self.order_topic: str = ""
        self.have_topic_router_info: bool = False
        self.message_queue_list: List[MessageQueue] = []
        self.send_which_queue: int = 0
        self.topic_route_data: Optional[Dict] = None

    @property
    def ok(self) -> bool:
        """True when at least one writable message queue is known."""
        queues = self.message_queue_list
        if queues is None:
            return False
        return len(queues) > 0
class SendCallback:
    """Callback interface for asynchronous sends.

    MessageSendFlow invokes these hooks from a background thread. The default
    implementations do nothing; subclasses override the ones they need.
    """
    def on_success(self, send_result: SendResult) -> None:
        """Called with the SendResult once the send succeeded."""
        pass
    def on_exception(self, exception: Exception) -> None:
        """Called with the exception raised while sending."""
        pass
class NameServerClient:
    """Minimal NameServer client that serves a canned route table for any topic."""

    def __init__(self, nameserver_addr_list: List[str]):
        self.nameserver_addr_list = nameserver_addr_list
        self.current_index = 0

    def get_topic_route_info_from_name_server(self, topic: str) -> Optional[Dict]:
        """Return simulated route data: one broker (master + slave) with 4 queues."""
        print(f"从NameServer获取Topic路由信息: {topic}")
        queue_data = {
            "brokerName": "broker-a",
            "readQueueNums": 4,
            "writeQueueNums": 4,
            "perm": 6,
            "topicSynFlag": 0,
        }
        broker_addrs = {"0": "192.168.1.100:10911", "1": "192.168.1.101:10911"}
        broker_data = {
            "cluster": "DefaultCluster",
            "brokerName": "broker-a",
            "brokerAddrs": broker_addrs,
        }
        route = {
            "orderTopicConf": "",
            "queueDatas": [queue_data],
            "brokerDatas": [broker_data],
            "filterServerTable": {},
        }
        return route
# Usage example: send one message with each communication mode.
if __name__ == "__main__":
    # Create the NameServer client
    nameserver_client = NameServerClient(["192.168.1.100:9876"])
    # Create the send flow (MessageSendFlow stores but never uses the producer argument)
    message_send_flow = MessageSendFlow(None, nameserver_client)
    # Build the message
    message = Message(
        topic="TestTopic",
        body="Hello RocketMQ".encode('utf-8'),
        tags="TagA",
        keys="OrderID001"
    )
    # Synchronous send
    try:
        result = message_send_flow.send_message(message, CommunicationMode.SYNC)
        print(f"同步发送结果: {result.send_status} msgId={result.msg_id}")
    except Exception as e:
        print(f"发送失败: {e}")

    # Asynchronous send
    class MySendCallback(SendCallback):
        """Print the async outcome and set ``done`` so the demo can wait for it."""

        def __init__(self):
            self.done = threading.Event()

        def on_success(self, send_result: SendResult) -> None:
            print(f"异步发送成功: {send_result.send_status} msgId={send_result.msg_id}")
            self.done.set()

        def on_exception(self, exception: Exception) -> None:
            print(f"异步发送失败: {exception}")
            self.done.set()

    callback = MySendCallback()
    async_submitted = False
    try:
        result = message_send_flow.send_message(message, CommunicationMode.ASYNC, callback)
        print(f"异步发送提交成功: {result.msg_id}")
        async_submitted = True
    except Exception as e:
        print(f"异步发送提交失败: {e}")
    # One-way send
    try:
        result = message_send_flow.send_message(message, CommunicationMode.ONEWAY)
        print(f"单向发送提交成功: {result.msg_id}")
    except Exception as e:
        print(f"单向发送提交失败: {e}")
    # The async callback runs on a daemon thread, which is killed when the
    # interpreter exits. Wait (bounded) so its result is actually reported.
    if async_submitted and not callback.done.wait(timeout=3):
        print("等待异步发送回调超时")
2.3.2 消息消费流程
from dataclasses import dataclass
from typing import Dict, List, Optional, Set
from enum import Enum
import time
import threading
import queue
class ConsumeFromWhere(Enum):
    """Where a consumer starts reading when it has no stored offset."""
    CONSUME_FROM_LAST_OFFSET = "CONSUME_FROM_LAST_OFFSET"  # start from the last offset
    CONSUME_FROM_FIRST_OFFSET = "CONSUME_FROM_FIRST_OFFSET"  # start from the first offset
    CONSUME_FROM_TIMESTAMP = "CONSUME_FROM_TIMESTAMP"  # start from a given timestamp
class ConsumeType(Enum):
    """Consumption style."""
    CONSUME_ACTIVELY = "CONSUME_ACTIVELY"  # active consumption (pull)
    CONSUME_PASSIVELY = "CONSUME_PASSIVELY"  # passive consumption (push)
class MessageModel(Enum):
    """Delivery model of a consumer group.

    MessageConsumeFlow stores offsets locally for BROADCASTING and on the
    broker for CLUSTERING.
    """
    BROADCASTING = "BROADCASTING"  # broadcasting mode
    CLUSTERING = "CLUSTERING"  # clustering mode
class ConsumeStatus(Enum):
    """Result a consume handler reports for a message."""
    CONSUME_SUCCESS = "CONSUME_SUCCESS"  # consumed successfully
    RECONSUME_LATER = "RECONSUME_LATER"  # ask for redelivery later
class PullStatus(Enum):
    """Status of a pull request."""
    FOUND = "FOUND"  # messages were found
    NO_NEW_MSG = "NO_NEW_MSG"  # no new messages
    NO_MATCHED_MSG = "NO_MATCHED_MSG"  # no messages matched the filter
    OFFSET_ILLEGAL = "OFFSET_ILLEGAL"  # requested offset is invalid
@dataclass
class PullResult:
    """Result of one pull against a message queue."""
    pull_status: PullStatus
    next_begin_offset: int  # offset the next pull for this queue should start from
    min_offset: int
    max_offset: int
    msg_found_list: List[Message]  # messages returned by this pull
@dataclass
class ConsumeMessageContext:
    """Context describing one consume-message invocation for a queue."""
    consumer_group: str
    message_queue: MessageQueue
    ack_index: int = -1
    consume_start_timestamp: int = 0
    success: bool = True
    status: str = ""
    mq_trace_context: Optional[Dict] = None
    props: Optional[Dict[str, str]] = None  # replaced by {} in __post_init__
    def __post_init__(self):
        # Create the dict per instance rather than using a shared mutable
        # default, so instances never share one props mapping.
        if self.props is None:
            self.props = {}
class MessageConsumeFlow:
    """Consumer-side flow that wires offset storage, pulling, consuming and rebalancing.

    ``subscribe()`` registers topics. ``start()`` creates the services; the
    rebalance service then assigns queues, which the pull service fetches and
    the consume service processes.
    """

    def __init__(self, consumer_group: str, nameserver_client, message_model: MessageModel):
        self.consumer_group = consumer_group
        self.nameserver_client = nameserver_client
        self.message_model = message_model
        # Subscriptions keyed by topic
        self.subscription_inner: Dict[str, 'SubscriptionData'] = {}
        # Queues assigned to this consumer and their local message buffers
        self.process_queue_table: Dict[MessageQueue, 'ProcessQueue'] = {}
        # Offset storage (local file for broadcasting, broker for clustering)
        self.offset_store: 'OffsetStore' = None
        # Consume service
        self.consume_message_service: 'ConsumeMessageService' = None
        # Pull service
        self.pull_message_service: 'PullMessageService' = None
        # Rebalance service
        self.rebalance_service: 'RebalanceService' = None
        self.lock = threading.RLock()
        self.running = False

    def start(self) -> None:
        """Create and start all services. Does nothing if already running."""
        with self.lock:
            if self.running:
                return
            # Broadcasting consumers keep offsets locally; clustering consumers on the broker.
            if self.message_model == MessageModel.BROADCASTING:
                self.offset_store = LocalFileOffsetStore(self.consumer_group)
            else:
                self.offset_store = RemoteBrokerOffsetStore(self.consumer_group, self.nameserver_client)
            self.offset_store.load()
            self.consume_message_service = ConsumeMessageConcurrentlyService(self)
            self.consume_message_service.start()
            self.pull_message_service = PullMessageService(self)
            self.pull_message_service.start()
            self.rebalance_service = RebalanceService(self)
            self.rebalance_service.start()
            self.running = True
            print(f"消费者启动成功: {self.consumer_group}")

    def shutdown(self) -> None:
        """Stop all services, then persist offsets of the assigned queues."""
        with self.lock:
            if not self.running:
                return
            self.running = False
            if self.rebalance_service:
                self.rebalance_service.shutdown()
            if self.pull_message_service:
                self.pull_message_service.shutdown()
            if self.consume_message_service:
                self.consume_message_service.shutdown()
            if self.offset_store:
                self.offset_store.persist_all(set(self.process_queue_table.keys()))
            print(f"消费者关闭成功: {self.consumer_group}")

    def subscribe(self, topic: str, sub_expression: str) -> None:
        """Subscribe to *topic* with a tag expression such as ``"TagA || TagB"`` or ``"*"``."""
        with self.lock:
            subscription_data = SubscriptionData(
                topic=topic,
                sub_string=sub_expression,
                tags_set=self._parse_tags(sub_expression),
                class_filter_mode=False,
                code_set=set(),
                sub_version=int(time.time() * 1000)
            )
            self.subscription_inner[topic] = subscription_data
            print(f"订阅Topic: {topic} 表达式: {sub_expression}")

    def _parse_tags(self, sub_expression: str) -> Set[str]:
        """Parse ``"TagA || TagB"`` into ``{"TagA", "TagB"}``.

        ``"*"`` (ignoring surrounding whitespace), an empty or blank
        expression, and ``None`` all mean "all tags" and yield an empty set.
        """
        if not sub_expression or sub_expression.strip() in ("", "*"):
            return set()
        return {tag.strip() for tag in sub_expression.split("||") if tag.strip()}

    def pull_message(self, message_queue: MessageQueue, sub_expression: str,
                     offset: int, max_nums: int) -> PullResult:
        """Simulated pull of up to *max_nums* messages (capped at 5) from *offset*.

        Offsets below 1000 yield messages; from 1000 on, NO_NEW_MSG is returned.
        """
        broker_addr = self._find_broker_address_in_subscribe(message_queue.broker_name, False)
        if not broker_addr:
            raise Exception(f"找不到Broker地址: {message_queue.broker_name}")
        print(f"从 {broker_addr} 拉取消息: {message_queue.topic} queueId={message_queue.queue_id} offset={offset}")
        time.sleep(0.01)  # simulated network latency
        if offset < 1000:  # simulated: messages available
            messages = []
            for i in range(min(max_nums, 5)):  # return at most 5 messages
                msg = Message(
                    topic=message_queue.topic,
                    body=f"Message {offset + i}".encode('utf-8'),
                    tags="TagA",
                    keys=f"Key{offset + i}"
                )
                msg.queue_id = message_queue.queue_id
                msg.queue_offset = offset + i
                messages.append(msg)
            return PullResult(
                pull_status=PullStatus.FOUND,
                next_begin_offset=offset + len(messages),
                min_offset=0,
                max_offset=2000,
                msg_found_list=messages
            )
        return PullResult(
            pull_status=PullStatus.NO_NEW_MSG,
            next_begin_offset=offset,
            min_offset=0,
            max_offset=2000,
            msg_found_list=[]
        )

    def _find_broker_address_in_subscribe(self, broker_name: str, only_this_broker: bool) -> Optional[str]:
        """Return the broker address to pull from (simulated: a fixed address)."""
        # TODO: resolve from route data instead of returning a fixed address.
        return "192.168.1.100:10911"

    def consume_message_directly(self, message: Message, broker_name: str) -> ConsumeStatus:
        """Simulated message handler; always reports CONSUME_SUCCESS."""
        print(f"直接消费消息: {message.topic} msgId={getattr(message, 'msg_id', 'unknown')}")
        time.sleep(0.001)  # simulated processing time
        return ConsumeStatus.CONSUME_SUCCESS
@dataclass
class SubscriptionData:
    """A consumer's subscription to one topic."""
    topic: str
    sub_string: str  # raw expression passed to subscribe(), e.g. "TagA || TagB" or "*"
    tags_set: Set[str]  # tags parsed from sub_string; empty for "*"
    class_filter_mode: bool
    code_set: Set[int]
    sub_version: int  # subscribe() sets this to the current time in milliseconds
class ProcessQueue:
    """Local buffer of pulled-but-not-yet-consumed messages for one queue.

    Messages are keyed by queue offset. ``msg_count`` and ``msg_size`` track
    the number of buffered messages and the sum of their body lengths.
    """

    def __init__(self):
        self.msg_tree_map: Dict[int, 'Message'] = {}  # offset -> message
        self.msg_count = 0
        self.msg_size = 0
        self.consuming_msg_orderly_tree_map: Dict[int, 'Message'] = {}
        self.lock = threading.RLock()
        self.last_pull_timestamp = int(time.time() * 1000)
        self.last_consume_timestamp = int(time.time() * 1000)
        self.dropped = False

    def put_message(self, messages: List['Message']) -> bool:
        """Buffer *messages* by offset. Always returns True.

        A message whose offset is already buffered replaces the old entry
        without being counted twice. A missing/None offset is stored under 0.
        """
        with self.lock:
            for msg in messages:
                offset = getattr(msg, 'queue_offset', None)
                if offset is None:
                    offset = 0
                previous = self.msg_tree_map.get(offset)
                self.msg_tree_map[offset] = msg
                if previous is None:
                    self.msg_count += 1
                else:
                    # Re-delivered offset: swap the byte contribution, keep the count.
                    self.msg_size -= len(previous.body)
                self.msg_size += len(msg.body)
            return True

    def take_messages(self, batch_size: int) -> List['Message']:
        """Remove and return up to *batch_size* messages, lowest offsets first."""
        with self.lock:
            taken = []
            for offset in sorted(self.msg_tree_map)[:max(batch_size, 0)]:
                msg = self.msg_tree_map.pop(offset)
                self.msg_count -= 1
                self.msg_size -= len(msg.body)
                taken.append(msg)
            return taken

    def is_dropped(self) -> bool:
        """Whether this queue was dropped (e.g. reassigned by rebalance)."""
        return self.dropped

    def set_dropped(self, dropped: bool) -> None:
        """Set the dropped flag."""
        self.dropped = dropped
class OffsetStore:
    """Interface for storing consume offsets per message queue.

    The base implementations are no-ops; ``read_offset`` returns -1, which
    callers treat as "no stored offset".
    """
    def load(self) -> None:
        """Load persisted offsets."""
        pass
    def update_offset(self, mq: MessageQueue, offset: int, increase_only: bool) -> None:
        """Record *offset* for *mq*; with *increase_only*, subclasses never move it backwards."""
        pass
    def read_offset(self, mq: MessageQueue, read_type: str) -> int:
        """Return the stored offset for *mq*, or -1 if none (e.g. read_type "READ_FROM_STORE")."""
        return -1
    def persist_all(self, mqs: Set[MessageQueue]) -> None:
        """Persist offsets of the given queues."""
        pass
class LocalFileOffsetStore(OffsetStore):
    """Offset store used for broadcasting consumers, intended to be file-backed.

    Offsets currently live only in memory; load() and persist_all() are stubs.
    """

    def __init__(self, consumer_group: str):
        self.consumer_group = consumer_group
        self.offset_table: Dict[MessageQueue, int] = {}
        self.lock = threading.RLock()

    def load(self) -> None:
        """Load offsets from the local file (not implemented yet)."""
        print(f"从本地文件加载偏移量: {self.consumer_group}")
        # TODO: read offsets from the local file.

    def update_offset(self, mq: MessageQueue, offset: int, increase_only: bool) -> None:
        """Store *offset* for *mq*; with *increase_only*, ignore non-increasing values."""
        with self.lock:
            stale = increase_only and offset <= self.offset_table.get(mq, -1)
            if not stale:
                self.offset_table[mq] = offset

    def read_offset(self, mq: MessageQueue, read_type: str) -> int:
        """Return the in-memory offset for *mq*, or -1 if unknown (read_type is ignored)."""
        with self.lock:
            stored = self.offset_table.get(mq, -1)
        return stored

    def persist_all(self, mqs: Set[MessageQueue]) -> None:
        """Write offsets to the local file (not implemented yet)."""
        print(f"持久化偏移量到本地文件: {self.consumer_group}")
        # TODO: write offsets to the local file.
class RemoteBrokerOffsetStore(OffsetStore):
    """Offset store used for clustering consumers, intended to be broker-backed.

    Offsets currently live only in memory; load() and persist_all() are stubs.
    """

    def __init__(self, consumer_group: str, nameserver_client):
        self.consumer_group = consumer_group
        self.nameserver_client = nameserver_client
        self.offset_table: Dict[MessageQueue, int] = {}
        self.lock = threading.RLock()

    def load(self) -> None:
        """Load offsets from the broker (not implemented yet)."""
        print(f"从Broker加载偏移量: {self.consumer_group}")
        # TODO: fetch offsets from the broker.

    def update_offset(self, mq: MessageQueue, offset: int, increase_only: bool) -> None:
        """Store *offset* for *mq*; with *increase_only*, ignore non-increasing values."""
        with self.lock:
            stale = increase_only and offset <= self.offset_table.get(mq, -1)
            if not stale:
                self.offset_table[mq] = offset

    def read_offset(self, mq: MessageQueue, read_type: str) -> int:
        """Return the in-memory offset for *mq*, or -1 if unknown (read_type is ignored)."""
        with self.lock:
            stored = self.offset_table.get(mq, -1)
        return stored

    def persist_all(self, mqs: Set[MessageQueue]) -> None:
        """Commit offsets to the broker (not implemented yet)."""
        print(f"持久化偏移量到Broker: {self.consumer_group}")
        # TODO: commit offsets to the broker.
class ConsumeMessageService:
    """Interface for services that run the consume handler on pulled messages."""
    def start(self) -> None:
        """Start the service."""
        pass
    def shutdown(self) -> None:
        """Stop the service."""
        pass
    def submit_consume_request(self, messages: List[Message], process_queue: ProcessQueue,
                               message_queue: MessageQueue, dispatch_to_consume: bool) -> None:
        """Submit a batch of *messages* pulled from *message_queue* for consumption."""
        pass
class ConsumeMessageConcurrentlyService(ConsumeMessageService):
    """Consumes each submitted batch on its own daemon thread."""

    def __init__(self, consumer):
        self.consumer = consumer
        self.consume_executor = None
        self.running = False

    def start(self) -> None:
        """Mark the service as running; requests are ignored otherwise."""
        self.running = True
        print("ConsumeMessageConcurrentlyService started")

    def shutdown(self) -> None:
        """Stop accepting new consume requests."""
        self.running = False
        print("ConsumeMessageConcurrentlyService shutdown")

    def submit_consume_request(self, messages: List[Message], process_queue: ProcessQueue,
                               message_queue: MessageQueue, dispatch_to_consume: bool) -> None:
        """Consume *messages* asynchronously; ignored once the service is stopped."""
        if not self.running:
            return
        consume_request = ConsumeRequest(messages, process_queue, message_queue)
        threading.Thread(target=self._consume_message, args=(consume_request,), daemon=True).start()

    def _consume_message(self, consume_request: 'ConsumeRequest') -> None:
        """Run the handler for every message, then advance the queue's offset.

        Nothing is consumed or committed for a process queue that rebalance
        has dropped (RocketMQ skips dropped queues the same way).
        """
        if consume_request.process_queue.is_dropped():
            return
        try:
            for message in consume_request.messages:
                status = self.consumer.consume_message_directly(message, consume_request.message_queue.broker_name)
                print(f"消费消息: {message.topic} status={status}")
            # NOTE(review): RECONSUME_LATER is not handled; the offset advances
            # regardless. Consumed messages are also never removed from the
            # ProcessQueue they were buffered in.
            # The queue may have been dropped while the handler ran; committing
            # its offset now could clobber progress recorded by its new owner.
            if consume_request.messages and not consume_request.process_queue.is_dropped():
                last_message = consume_request.messages[-1]
                offset = getattr(last_message, 'queue_offset', 0) + 1
                self.consumer.offset_store.update_offset(consume_request.message_queue, offset, True)
        except Exception as e:
            print(f"消费消息异常: {e}")
@dataclass
class ConsumeRequest:
    """A batch of pulled messages handed to the consume service."""
    messages: List[Message]
    process_queue: ProcessQueue  # local buffer the batch was placed in
    message_queue: MessageQueue  # queue the batch was pulled from
class PullMessageService:
    """Executes queued pull requests one at a time on a background thread.

    After each pull the request is re-queued: immediately when messages were
    found, otherwise after a delay (1s no new messages, 3s other statuses,
    5s after an error).
    """

    def __init__(self, consumer):
        self.consumer = consumer
        self.pull_request_queue = queue.Queue()
        self.running = False
        self.pull_thread = None

    def start(self) -> None:
        """Start the pull loop on a daemon thread."""
        self.running = True
        worker = threading.Thread(target=self._pull_message_loop, daemon=True)
        self.pull_thread = worker
        worker.start()
        print("PullMessageService started")

    def shutdown(self) -> None:
        """Stop the loop and wait up to 1s for the thread to exit."""
        self.running = False
        worker = self.pull_thread
        if worker:
            worker.join(timeout=1)
        print("PullMessageService shutdown")

    def execute_pull_request_immediately(self, pull_request: 'PullRequest') -> None:
        """Enqueue *pull_request* for the pull loop."""
        try:
            self.pull_request_queue.put(pull_request, timeout=1)
        except queue.Full:
            print("拉取请求队列已满")

    def _execute_pull_request_later(self, pull_request: 'PullRequest', delay: float) -> None:
        """Re-enqueue *pull_request* after *delay* seconds via a timer thread."""
        threading.Timer(delay, self.execute_pull_request_immediately, args=(pull_request,)).start()

    def _pull_message_loop(self) -> None:
        """Take pull requests from the queue and execute them until stopped."""
        while self.running:
            try:
                pending = self.pull_request_queue.get(timeout=1)
                self._pull_message(pending)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"拉取消息异常: {e}")

    def _pull_message(self, pull_request: 'PullRequest') -> None:
        """Pull for one request; on failure retry it after 5 seconds."""
        if pull_request.process_queue.is_dropped():
            return
        subscription = self.consumer.subscription_inner.get(pull_request.message_queue.topic)
        if subscription is None:
            return
        try:
            pull_result = self.consumer.pull_message(
                pull_request.message_queue,
                subscription.sub_string,
                pull_request.next_offset,
                32  # maximum number of messages per pull
            )
            self._process_pull_result(pull_request, pull_result)
        except Exception as e:
            print(f"拉取消息失败: {e}")
            self._execute_pull_request_later(pull_request, 5.0)

    def _process_pull_result(self, pull_request: 'PullRequest', pull_result: PullResult) -> None:
        """Dispatch found messages and schedule the next pull for this request."""
        status = pull_result.pull_status
        if status == PullStatus.FOUND:
            found = pull_result.msg_found_list
            pull_request.process_queue.put_message(found)
            self.consumer.consume_message_service.submit_consume_request(
                found,
                pull_request.process_queue,
                pull_request.message_queue,
                True
            )
            pull_request.next_offset = pull_result.next_begin_offset
            self.execute_pull_request_immediately(pull_request)
            return
        if status == PullStatus.NO_NEW_MSG:
            pull_request.next_offset = pull_result.next_begin_offset
            self._execute_pull_request_later(pull_request, 1.0)
            return
        self._execute_pull_request_later(pull_request, 3.0)
@dataclass
class PullRequest:
    """A standing request to keep pulling one message queue for a consumer group."""
    consumer_group: str
    message_queue: MessageQueue
    process_queue: ProcessQueue  # buffer that pulled messages are put into
    next_offset: int  # queue offset the next pull starts from
    locked_first: bool = False
class RebalanceService:
    """Periodically assigns each subscribed topic's queues to this consumer.

    Every 20 seconds (10 after an error) it recomputes this consumer's share
    of the queues, stops pulling queues it lost and starts pulling new ones.
    """

    def __init__(self, consumer):
        self.consumer = consumer
        self.running = False
        self.rebalance_thread = None
        # This consumer's id within its group. It must appear in the id list
        # used for allocation, otherwise no queue is ever assigned to it.
        self.client_id: str = getattr(consumer, 'client_id', None) or "consumer1"
        # Set by shutdown() to interrupt the pause between rebalance rounds.
        self._stop_event = threading.Event()

    def start(self) -> None:
        """Start the rebalance loop on a daemon thread."""
        self._stop_event.clear()
        self.running = True
        self.rebalance_thread = threading.Thread(target=self._rebalance_loop, daemon=True)
        self.rebalance_thread.start()
        print("RebalanceService started")

    def shutdown(self) -> None:
        """Stop the loop and wait up to 1s for its thread to exit."""
        self.running = False
        self._stop_event.set()
        if self.rebalance_thread:
            self.rebalance_thread.join(timeout=1)
        print("RebalanceService shutdown")

    def _rebalance_loop(self) -> None:
        """Rebalance every 20 seconds; back off to 10 seconds after an error."""
        while self.running:
            try:
                self._do_rebalance()
                pause = 20
            except Exception as e:
                print(f"负载均衡异常: {e}")
                pause = 10
            # Returns True as soon as shutdown() sets the event.
            if self._stop_event.wait(pause):
                break

    def _do_rebalance(self) -> None:
        """Rebalance every subscribed topic."""
        print(f"执行负载均衡: {self.consumer.consumer_group}")
        # Snapshot the keys: subscribe() may add topics from another thread.
        for topic in list(self.consumer.subscription_inner):
            self._rebalance_by_topic(topic)

    def _rebalance_by_topic(self, topic: str) -> None:
        """Recompute this consumer's queues for *topic* and apply the change."""
        message_queues = self._get_topic_subscribe_info(topic)
        if not message_queues:
            return
        consumer_ids = self._find_consumer_id_list(topic, self.consumer.consumer_group)
        if not consumer_ids:
            return
        allocated_queues = self._allocate_message_queue_strategy(message_queues, consumer_ids, self.client_id)
        self._update_process_queue_table_in_rebalance(topic, allocated_queues)

    def _get_topic_subscribe_info(self, topic: str) -> List['MessageQueue']:
        """Return the topic's queues (simulated: 4 queues on broker-a)."""
        return [
            MessageQueue(topic, "broker-a", 0),
            MessageQueue(topic, "broker-a", 1),
            MessageQueue(topic, "broker-a", 2),
            MessageQueue(topic, "broker-a", 3)
        ]

    def _find_consumer_id_list(self, topic: str, consumer_group: str) -> List[str]:
        """Return the ids of the group's consumers (simulated; always includes this consumer)."""
        consumer_ids = ["consumer1", "consumer2"]
        if self.client_id not in consumer_ids:
            consumer_ids.append(self.client_id)
        return consumer_ids

    def _allocate_message_queue_strategy(self, message_queues: List['MessageQueue'],
                                         consumer_ids: List[str], current_consumer_id: str) -> List['MessageQueue']:
        """Assign queues round-robin over the consumers; return *current_consumer_id*'s share.

        Both lists are sorted first so every consumer of the group computes the
        same assignment; otherwise queues could be claimed twice or not at all.
        """
        if current_consumer_id not in consumer_ids:
            return []
        ordered_ids = sorted(consumer_ids)
        ordered_queues = sorted(message_queues, key=lambda mq: (mq.topic, mq.broker_name, mq.queue_id))
        index = ordered_ids.index(current_consumer_id)
        return ordered_queues[index::len(ordered_ids)]

    def _update_process_queue_table_in_rebalance(self, topic: str, allocated_queues: List['MessageQueue']) -> None:
        """Drop queues of *topic* no longer assigned here and start pulling new ones."""
        allocated_set = set(allocated_queues)
        queues_to_remove = [mq for mq in list(self.consumer.process_queue_table)
                            if mq.topic == topic and mq not in allocated_set]
        for mq in queues_to_remove:
            process_queue = self.consumer.process_queue_table.pop(mq)
            process_queue.set_dropped(True)
            print(f"移除队列: {mq.topic} queueId={mq.queue_id}")
        for mq in allocated_queues:
            if mq not in self.consumer.process_queue_table:
                process_queue = ProcessQueue()
                self.consumer.process_queue_table[mq] = process_queue
                next_offset = self._compute_pull_from_where(mq)
                pull_request = PullRequest(
                    consumer_group=self.consumer.consumer_group,
                    message_queue=mq,
                    process_queue=process_queue,
                    next_offset=next_offset
                )
                self.consumer.pull_message_service.execute_pull_request_immediately(pull_request)
                print(f"添加队列: {mq.topic} queueId={mq.queue_id} offset={next_offset}")

    def _compute_pull_from_where(self, mq: 'MessageQueue') -> int:
        """Return the stored offset for *mq*, or 0 when none is stored."""
        offset = self.consumer.offset_store.read_offset(mq, "READ_FROM_STORE")
        if offset >= 0:
            return offset
        # TODO: honor ConsumeFromWhere; currently always starts from 0.
        return 0
# Usage example: run a clustering consumer for 30 seconds.
if __name__ == "__main__":
    # Create the NameServer client
    nameserver_client = NameServerClient(["192.168.1.100:9876"])
    # Create a clustering-mode consumer
    consumer = MessageConsumeFlow(
        consumer_group="test_consumer_group",
        nameserver_client=nameserver_client,
        message_model=MessageModel.CLUSTERING
    )
    # Subscribe to TestTopic with the tag expression "TagA || TagB"
    consumer.subscribe("TestTopic", "TagA || TagB")
    # Start the consumer
    try:
        consumer.start()
        print("消费者启动成功,开始消费消息...")
        # Let the background services run for a while
        time.sleep(30)
    except Exception as e:
        print(f"消费者运行异常: {e}")
    finally:
        # Shut the consumer down (also runs if start() raised)
        consumer.shutdown()
        print("消费者已关闭")
2.4 架构总结
2.4.1 整体架构图
from dataclasses import dataclass
from typing import Dict, List
from enum import Enum
class ComponentType(Enum):
    """Kinds of RocketMQ components; values are display names."""
    NAMESERVER = "NameServer"
    BROKER = "Broker"
    PRODUCER = "Producer"
    CONSUMER = "Consumer"
class ComponentStatus(Enum):
    """Runtime status of a component."""
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    ERROR = "ERROR"
@dataclass
class ComponentInfo:
    """Description of one deployed component instance."""
    name: str
    component_type: ComponentType  # counted per type in the summary report
    address: str
    status: ComponentStatus
    properties: Dict[str, str]
class RocketMQArchitectureOverview:
"""RocketMQ架构概览"""
def __init__(self):
self.components: List[ComponentInfo] = []
self.connections: List[Dict[str, str]] = []
def add_component(self, component: ComponentInfo) -> None:
"""添加组件"""
self.components.append(component)
def add_connection(self, from_component: str, to_component: str,
connection_type: str, description: str) -> None:
"""添加连接关系"""
self.connections.append({
"from": from_component,
"to": to_component,
"type": connection_type,
"description": description
})
def generate_architecture_diagram(self) -> str:
"""生成架构图"""
diagram = """
┌─────────────────────────────────────────────────────────────────┐
│ RocketMQ 整体架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ │
│ │ │ │ │ │ │ │
│ │ 路由注册 │ │ 路由注册 │ │ 路由注册 │ │
│ │ 路由发现 │ │ 路由发现 │ │ 路由发现 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ │ │
│ ┌──────────────────────────┼──────────────────────────┐ │
│ │ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ │ Broker1 │ │ Broker2 │ │ Broker3 │ │
│ │ │ │ │ │ │ │ │
│ │ │ Master │ │ Master │ │ Slave │ │
│ │ │ CommitLog │ │ CommitLog │ │ CommitLog │ │
│ │ │ ConsumeQueue│ │ ConsumeQueue│ │ ConsumeQueue│ │
│ │ │ IndexFile │ │ IndexFile │ │ IndexFile │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │ │
│ └─────────┼───────────────────┼───────────────────┼──────────┘
│ │ │ │ │
│ ┌──────────┼───────────────────┼───────────────────┼──────────┐
│ │ │ │ │ │
│ │ ┌───────▼────┐ ┌───────▼────┐ ┌───────▼────┐ │
│ │ │ Producer1 │ │ Producer2 │ │ Producer3 │ │
│ │ │ │ │ │ │ │ │
│ │ │ 消息发送 │ │ 消息发送 │ │ 消息发送 │ │
│ │ │ 负载均衡 │ │ 负载均衡 │ │ 负载均衡 │ │
│ │ └────────────┘ └────────────┘ └────────────┘ │
│ │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ │ Consumer1 │ │ Consumer2 │ │ Consumer3 │ │
│ │ │ │ │ │ │ │ │
│ │ │ 消息拉取 │ │ 消息拉取 │ │ 消息拉取 │ │
│ │ │ 负载均衡 │ │ 负载均衡 │ │ 负载均衡 │ │
│ │ │ 偏移量管理 │ │ 偏移量管理 │ │ 偏移量管理 │ │
│ │ └────────────┘ └────────────┘ └────────────┘ │
│ └─────────────────────────────────────────────────────────────┘
└─────────────────────────────────────────────────────────────────┘
"""
return diagram
def get_component_responsibilities(self) -> Dict[ComponentType, List[str]]:
    """Describe what each kind of RocketMQ component is responsible for.

    Returns:
        A newly built dict keyed by ``ComponentType`` with insertion order
        NAMESERVER, BROKER, PRODUCER, CONSUMER. Each value is a fresh list of
        duty descriptions, rebuilt on every call.
    """
    nameserver_duties = [
        "维护Broker路由信息",
        "提供路由发现服务",
        "接受Broker注册",
        "处理客户端路由查询",
        "集群状态管理",
    ]
    broker_duties = [
        "消息存储和管理",
        "消息队列管理",
        "消费者偏移量管理",
        "消息过滤",
        "主从同步",
        "消息查询和统计",
    ]
    producer_duties = [
        "消息发送",
        "路由选择",
        "负载均衡",
        "故障转移",
        "事务消息处理",
    ]
    consumer_duties = [
        "消息消费",
        "偏移量管理",
        "负载均衡",
        "消息过滤",
        "消费进度跟踪",
    ]
    return {
        ComponentType.NAMESERVER: nameserver_duties,
        ComponentType.BROKER: broker_duties,
        ComponentType.PRODUCER: producer_duties,
        ComponentType.CONSUMER: consumer_duties,
    }
def get_data_flow_description(self) -> List[str]:
    """Describe, step by step, how routing data and messages flow.

    Returns:
        A new list of ten strings numbered "1. " through "10. ", covering
        Broker registration with the NameServer, producing, storing,
        consuming and the Broker heartbeat.
    """
    steps = (
        "Broker启动时向NameServer注册自己的信息",
        "Producer启动时从NameServer获取Broker路由信息",
        "Producer根据路由信息选择合适的Broker发送消息",
        "Broker接收消息并存储到CommitLog",
        "Broker构建ConsumeQueue索引",
        "Consumer启动时从NameServer获取Broker路由信息",
        "Consumer向Broker发起消息拉取请求",
        "Broker根据ConsumeQueue返回消息给Consumer",
        "Consumer处理消息并更新消费偏移量",
        "Broker定期向NameServer发送心跳保持注册信息",
    )
    # Number the steps here so the list text and its numbering never drift.
    return [f"{index}. {step}" for index, step in enumerate(steps, start=1)]
def generate_summary_report(self) -> str:
    """Build a Markdown-style summary report of the architecture.

    The report has four sections:
        1. 组件概览 - number of instances per component type, listed in the
           order each type first appears in ``self.components``.
        2. 组件职责 - duties returned by ``get_component_responsibilities()``.
        3. 数据流程 - steps returned by ``get_data_flow_description()``.
        4. 架构特点 - a fixed list of architectural characteristics.

    Returns:
        The report text; every line is newline-terminated.
    """
    from collections import Counter

    # Collect fragments and join once instead of repeated ``str +=``.
    parts = ["\n=== RocketMQ 架构总结报告 ===\n\n"]

    # Section 1: component overview. Counter is a dict subclass, so it keeps
    # first-seen order, matching the order components were added.
    parts.append("## 1. 组件概览\n")
    type_counts = Counter(
        component.component_type for component in self.components
    )
    parts.extend(
        f"- {comp_type.value}: {count} 个实例\n"
        for comp_type, count in type_counts.items()
    )

    # Section 2: responsibilities of each component type.
    parts.append("\n## 2. 组件职责\n")
    for comp_type, duties in self.get_component_responsibilities().items():
        parts.append(f"\n### {comp_type.value}\n")
        parts.extend(f"- {duty}\n" for duty in duties)

    # Section 3: end-to-end data flow.
    parts.append("\n## 3. 数据流程\n")
    parts.extend(f"{flow}\n" for flow in self.get_data_flow_description())

    # Section 4: fixed architectural characteristics.
    parts.append("\n## 4. 架构特点\n")
    parts.extend([
        "- **高可用性**: NameServer集群无状态,Broker支持主从模式\n",
        "- **高性能**: 基于文件系统的存储,顺序写入,零拷贝技术\n",
        "- **可扩展性**: 支持水平扩展,动态增减Broker节点\n",
        "- **消息可靠性**: 支持同步/异步刷盘,主从同步\n",
        "- **负载均衡**: Producer和Consumer都支持负载均衡\n",
        "- **消息过滤**: 支持Tag和SQL92过滤\n",
    ])
    return "".join(parts)
# Usage example: register a small cluster, wire up the connections between
# its components, then print the diagram and the summary report.
if __name__ == "__main__":
    architecture = RocketMQArchitectureOverview()

    # (name, component type, address, properties); every component is RUNNING.
    component_specs = [
        ("nameserver-1", ComponentType.NAMESERVER, "192.168.1.100:9876",
         {"cluster": "DefaultCluster"}),
        ("nameserver-2", ComponentType.NAMESERVER, "192.168.1.101:9876",
         {"cluster": "DefaultCluster"}),
        ("broker-a-master", ComponentType.BROKER, "192.168.1.100:10911",
         {"cluster": "DefaultCluster", "role": "ASYNC_MASTER"}),
        ("broker-a-slave", ComponentType.BROKER, "192.168.1.101:10911",
         {"cluster": "DefaultCluster", "role": "SLAVE"}),
        ("producer-1", ComponentType.PRODUCER, "192.168.1.200:0",
         {"group": "test_producer_group"}),
        ("consumer-1", ComponentType.CONSUMER, "192.168.1.201:0",
         {"group": "test_consumer_group"}),
    ]
    for comp_name, comp_type, comp_address, comp_props in component_specs:
        architecture.add_component(ComponentInfo(
            name=comp_name,
            component_type=comp_type,
            address=comp_address,
            status=ComponentStatus.RUNNING,
            properties=comp_props,
        ))

    # (from, to, relationship label, description)
    connection_specs = [
        ("broker-a-master", "nameserver-1", "注册", "Broker向NameServer注册路由信息"),
        ("producer-1", "nameserver-1", "查询", "Producer查询路由信息"),
        ("producer-1", "broker-a-master", "发送", "Producer发送消息到Broker"),
        ("consumer-1", "broker-a-master", "拉取", "Consumer从Broker拉取消息"),
    ]
    for connection in connection_specs:
        architecture.add_connection(*connection)

    print(architecture.generate_architecture_diagram())
    print(architecture.generate_summary_report())
2.4.2 关键设计原则
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
class DesignPrinciple(Enum):
    """Core design principles of RocketMQ; each value is the Chinese label."""
    HIGH_AVAILABILITY = "高可用性"  # high availability
    HIGH_PERFORMANCE = "高性能"  # high performance
    SCALABILITY = "可扩展性"  # scalability
    RELIABILITY = "可靠性"  # reliability
    SIMPLICITY = "简单性"  # simplicity
@dataclass
class PrincipleImplementation:
    """How one design principle is realised and what it brings."""
    principle: DesignPrinciple  # the principle being described
    description: str  # one-sentence goal of the principle
    implementations: List[str]  # concrete mechanisms that realise it
    benefits: List[str]  # resulting benefits
class RocketMQDesignPrinciples:
    """Catalogue of RocketMQ design principles with text/matrix renderers."""

    def __init__(self):
        # One entry per DesignPrinciple, in enum declaration order.
        self.principles = self._initialize_principles()

    def _initialize_principles(self) -> List[PrincipleImplementation]:
        """Build the static list of principle descriptions."""
        return [
            PrincipleImplementation(
                principle=DesignPrinciple.HIGH_AVAILABILITY,
                description="确保系统在部分组件故障时仍能正常工作",
                implementations=[
                    "NameServer集群无状态设计,任意节点故障不影响整体服务",
                    "Broker主从架构,主节点故障时从节点可以接管",
                    "Producer和Consumer支持故障转移",
                    "消息持久化存储,防止数据丢失"
                ],
                benefits=[
                    "系统可用性达到99.9%以上",
                    "单点故障不会导致整个系统不可用",
                    "快速故障恢复能力"
                ]
            ),
            PrincipleImplementation(
                principle=DesignPrinciple.HIGH_PERFORMANCE,
                description="提供高吞吐量和低延迟的消息处理能力",
                implementations=[
                    "基于文件系统的顺序写入存储",
                    "零拷贝技术减少数据复制开销",
                    "批量消息处理",
                    "异步处理机制",
                    "内存映射文件(mmap)技术"
                ],
                benefits=[
                    # RocketMQ's commonly cited single-node throughput is on
                    # the order of 100k TPS, not 10k.
                    "单机支持十万级TPS",
                    "毫秒级消息延迟",
                    "高效的磁盘利用率"
                ]
            ),
            PrincipleImplementation(
                principle=DesignPrinciple.SCALABILITY,
                description="支持系统水平扩展以应对业务增长",
                implementations=[
                    "无状态的NameServer设计",
                    "Broker集群支持动态扩容",
                    "Topic分区机制",
                    "消费者组负载均衡",
                    "生产者负载均衡"
                ],
                benefits=[
                    "支持线性扩展",
                    "动态调整系统容量",
                    "负载均匀分布"
                ]
            ),
            PrincipleImplementation(
                principle=DesignPrinciple.RELIABILITY,
                description="确保消息的可靠传输和处理",
                implementations=[
                    "消息持久化存储",
                    "同步/异步刷盘机制",
                    "主从同步复制",
                    "消息重试机制",
                    "死信队列处理",
                    "事务消息支持"
                ],
                benefits=[
                    "消息零丢失",
                    "至少一次投递保证",
                    "事务一致性保证"
                ]
            ),
            PrincipleImplementation(
                principle=DesignPrinciple.SIMPLICITY,
                description="保持架构和操作的简单性",
                implementations=[
                    "清晰的组件职责划分",
                    "简单的部署和配置",
                    "统一的管理接口",
                    "丰富的监控和运维工具"
                ],
                benefits=[
                    "降低学习和使用成本",
                    "减少运维复杂度",
                    "提高开发效率"
                ]
            )
        ]

    def get_principle_summary(self) -> str:
        """Render every principle as a Markdown-style section.

        Each section lists the description, all implementations and all
        benefits, and is followed by a 50-character dashed separator.
        """
        separator = "\n" + "-" * 50 + "\n\n"
        # Collect fragments and join once instead of repeated ``str +=``.
        parts = ["\n=== RocketMQ 设计原则总结 ===\n\n"]
        for item in self.principles:
            parts.append(f"## {item.principle.value}\n")
            parts.append(f"{item.description}\n\n")
            parts.append("### 实现方式:\n")
            parts.extend(f"- {impl}\n" for impl in item.implementations)
            parts.append("\n### 带来的好处:\n")
            parts.extend(f"- {benefit}\n" for benefit in item.benefits)
            parts.append(separator)
        return "".join(parts)

    def get_principle_matrix(self, max_implementations: int = 3) -> Dict[str, Dict[str, str]]:
        """Summarise each principle as a row of a comparison matrix.

        Args:
            max_implementations: How many implementations (from the front of
                each list) to include under "关键实现". Defaults to 3; a value
                larger than the list includes all of them.

        Returns:
            Dict keyed by the principle's label; each value maps "描述",
            "关键实现" and "主要收益" to "; "-joined text.
        """
        return {
            item.principle.value: {
                "描述": item.description,
                "关键实现": "; ".join(item.implementations[:max_implementations]),
                "主要收益": "; ".join(item.benefits),
            }
            for item in self.principles
        }
# Usage example: print the full textual summary, then one entry per principle
# from the comparison matrix.
if __name__ == "__main__":
    design_principles = RocketMQDesignPrinciples()
    print(design_principles.get_principle_summary())

    principle_rows = design_principles.get_principle_matrix()
    print("\n=== 设计原则矩阵 ===\n")
    for label, row in principle_rows.items():
        print(f"**{label}**")
        for field_name, field_text in row.items():
            print(f" {field_name}: {field_text}")
        print()
2.5 本章总结
本章深入介绍了RocketMQ的核心概念与架构设计，主要内容包括：
核心知识点
消息模型
- 消息的基本结构和属性
- 不同类型消息的特点和使用场景
- 消息的生命周期管理
Topic与Queue
- Topic的概念和管理
- Queue的分配和负载均衡
- 路由信息的维护和查询
Producer与Consumer
- 生产者的消息发送机制
- 消费者的消息消费模式
- 负载均衡和故障转移
NameServer架构
- 路由信息管理
- 集群协调和服务发现
- 无状态设计的优势
Broker架构
- 消息存储机制
- 主从复制和高可用
- 性能优化技术
消息流转过程
- 消息发送的完整流程
- 消息消费的详细步骤
- 偏移量管理和负载均衡
最佳实践
架构设计
- 合理规划NameServer集群规模
- 根据业务需求配置Broker主从关系
- 设计合适的Topic和Queue数量
性能优化
- 利用批量发送提高吞吐量
- 合理配置刷盘策略
- 优化消费者并发度
可靠性保证
- 配置合适的消息重试策略
- 实现消息幂等性处理
- 监控系统健康状态
练习题
- 解释RocketMQ中NameServer的作用和工作原理
- 描述消息从Producer发送到Consumer消费的完整流程
- 分析RocketMQ如何实现高可用性和高性能
- 设计一个电商订单系统的消息队列架构
- 比较两种消费模式（集群消费与广播消费）的适用场景
通过本章的学习，你应该已经对RocketMQ的核心架构有了深入的理解，为后续的实际应用打下了坚实的基础。