1.1 RocketMQ概述
1.1.1 什么是RocketMQ
Apache RocketMQ是一个开源的分布式消息中间件,由阿里巴巴开发并贡献给Apache软件基金会。它是一个低延迟、高可靠、可伸缩、易于使用的消息中间件,专为大规模分布式系统设计。
核心特性:
- 高性能:单机支持十万级QPS,集群支持千万级QPS
- 高可用:支持多Master多Slave架构,99.95%的高可用性
- 高可靠:消息零丢失,支持同步刷盘、异步刷盘
- 低延迟:毫秒级延迟
- 海量消息堆积:支持万亿级消息堆积,不影响性能
- 顺序消息:支持全局顺序消息和分区顺序消息
- 事务消息:支持分布式事务消息
- 定时消息:支持定时/延时消息(4.x 开源版本仅支持固定的延迟级别,任意时间精度的定时消息自 5.0 版本起支持)
- 消息过滤:支持Tag和SQL92过滤
- 回溯消费:支持按时间回溯消费
1.1.2 RocketMQ架构组件
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime
class ComponentType(Enum):
    """Kinds of RocketMQ components: NameServer, Broker, producer, consumer, admin tool."""
    NAME_SERVER = "name_server"
    BROKER = "broker"
    PRODUCER = "producer"
    CONSUMER = "consumer"
    ADMIN_TOOL = "admin_tool"
class BrokerRole(Enum):
    """Replication role a broker plays inside its broker group."""
    ASYNC_MASTER = "async_master"  # master with asynchronous replication
    SYNC_MASTER = "sync_master"  # master with synchronous double-write
    SLAVE = "slave"  # slave
class FlushDiskType(Enum):
    """Disk flush mode."""
    SYNC_FLUSH = "sync_flush"  # synchronous flush
    ASYNC_FLUSH = "async_flush"  # asynchronous flush
@dataclass
class NameServerInfo:
    """Record describing one registered NameServer instance."""

    host: str
    port: int
    status: str
    start_time: datetime

    def get_address(self) -> str:
        """Return the endpoint formatted as ``host:port``."""
        return "{}:{}".format(self.host, self.port)
@dataclass
class BrokerInfo:
    """Record describing one registered broker instance."""

    broker_id: int
    broker_name: str
    cluster_name: str
    host: str
    port: int
    role: BrokerRole
    flush_disk_type: FlushDiskType
    status: str
    start_time: datetime

    def get_address(self) -> str:
        """Return the endpoint formatted as ``host:port``."""
        return "{}:{}".format(self.host, self.port)

    def is_master(self) -> bool:
        """Return True when the role is either of the two master roles."""
        return (
            self.role == BrokerRole.ASYNC_MASTER
            or self.role == BrokerRole.SYNC_MASTER
        )

    def is_slave(self) -> bool:
        """Return True when the role is ``BrokerRole.SLAVE``."""
        return BrokerRole.SLAVE == self.role
class RocketMQArchitecture:
    """In-memory registry of the NameServers and brokers in a deployment.

    This is a bookkeeping model only: it stores the records that are added
    to it and answers queries over them.
    """

    def __init__(self):
        self.name_servers: List[NameServerInfo] = []
        self.brokers: List[BrokerInfo] = []
        # cluster_name -> distinct broker names registered in that cluster
        self.clusters: Dict[str, List[str]] = {}

    def add_name_server(self, host: str, port: int = 9876) -> NameServerInfo:
        """Register a NameServer and return the created record.

        The record is stored with status ``"running"`` and the current local
        time as its start time.
        """
        name_server = NameServerInfo(
            host=host,
            port=port,
            status="running",
            start_time=datetime.now(),
        )
        self.name_servers.append(name_server)
        return name_server

    def add_broker(self, broker_id: int, broker_name: str, cluster_name: str,
                   host: str, port: int = 10911,
                   role: BrokerRole = BrokerRole.ASYNC_MASTER,
                   flush_disk_type: FlushDiskType = FlushDiskType.ASYNC_FLUSH) -> BrokerInfo:
        """Register a broker and return the created record.

        The broker is appended to ``self.brokers`` and its name is added to
        the cluster index once per cluster, so a master and a slave sharing
        one broker name produce a single index entry.
        """
        broker = BrokerInfo(
            broker_id=broker_id,
            broker_name=broker_name,
            cluster_name=cluster_name,
            host=host,
            port=port,
            role=role,
            flush_disk_type=flush_disk_type,
            status="running",
            start_time=datetime.now(),
        )
        self.brokers.append(broker)

        # Keep the cluster index in sync with the broker list.
        cluster_members = self.clusters.setdefault(cluster_name, [])
        if broker_name not in cluster_members:
            cluster_members.append(broker_name)
        return broker

    def get_name_server_list(self) -> str:
        """Return all NameServer addresses joined with ``;``."""
        return ";".join(ns.get_address() for ns in self.name_servers)

    def get_brokers_by_cluster(self, cluster_name: str) -> List[BrokerInfo]:
        """Return the brokers whose ``cluster_name`` equals the argument."""
        return [broker for broker in self.brokers if broker.cluster_name == cluster_name]

    def get_master_brokers(self) -> List[BrokerInfo]:
        """Return every broker for which ``is_master()`` is true."""
        return [broker for broker in self.brokers if broker.is_master()]

    def get_slave_brokers(self) -> List[BrokerInfo]:
        """Return every broker for which ``is_slave()`` is true."""
        return [broker for broker in self.brokers if broker.is_slave()]

    def get_architecture_summary(self) -> Dict[str, Dict[str, object]]:
        """Return counts and addresses summarising the registered topology.

        The result has three sections: ``name_servers`` (count, addresses),
        ``brokers`` (total, master and slave counts) and ``clusters`` (count
        plus, per cluster, the number of distinct broker names).
        """
        return {
            "name_servers": {
                "count": len(self.name_servers),
                "addresses": [ns.get_address() for ns in self.name_servers],
            },
            "brokers": {
                "total_count": len(self.brokers),
                "master_count": len(self.get_master_brokers()),
                "slave_count": len(self.get_slave_brokers()),
            },
            "clusters": {
                "count": len(self.clusters),
                "details": {cluster: len(names) for cluster, names in self.clusters.items()},
            },
        }
# Architecture example
def create_sample_architecture():
    """Build a demo topology: two NameServers and two master/slave broker groups."""
    arch = RocketMQArchitecture()

    # NameServer cluster
    for ns_host in ("192.168.1.10", "192.168.1.11"):
        arch.add_name_server(ns_host, 9876)

    # (broker name, master host, slave host) for each broker group
    replica_groups = (
        ("broker-a", "192.168.1.20", "192.168.1.21"),
        ("broker-b", "192.168.1.22", "192.168.1.23"),
    )
    for group_name, master_host, slave_host in replica_groups:
        arch.add_broker(0, group_name, "DefaultCluster", master_host, 10911,
                        BrokerRole.SYNC_MASTER, FlushDiskType.SYNC_FLUSH)
        arch.add_broker(1, group_name, "DefaultCluster", slave_host, 10911,
                        BrokerRole.SLAVE, FlushDiskType.ASYNC_FLUSH)
    return arch
if __name__ == "__main__":
    # Build the sample topology and print its summary line by line.
    architecture = create_sample_architecture()
    summary = architecture.get_architecture_summary()

    ns_section = summary['name_servers']
    broker_section = summary['brokers']
    cluster_section = summary['clusters']

    report_lines = [
        "RocketMQ架构摘要:",
        f"NameServer数量: {ns_section['count']}",
        f"NameServer地址: {', '.join(ns_section['addresses'])}",
        f"Broker总数: {broker_section['total_count']}",
        f"Master Broker数量: {broker_section['master_count']}",
        f"Slave Broker数量: {broker_section['slave_count']}",
        f"集群数量: {cluster_section['count']}",
        f"集群详情: {cluster_section['details']}",
    ]
    for report_line in report_lines:
        print(report_line)
1.1.3 RocketMQ与其他消息队列对比
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
class MessageQueueType(Enum):
    """Message queue products covered by the comparison."""
    ROCKETMQ = "RocketMQ"
    KAFKA = "Kafka"
    RABBITMQ = "RabbitMQ"
    ACTIVEMQ = "ActiveMQ"
    PULSAR = "Pulsar"
@dataclass
class PerformanceMetrics:
    """Performance figures for one message queue."""
    throughput_qps: int  # throughput (QPS)
    latency_ms: float  # latency (milliseconds)
    availability: float  # availability (%)
class FeatureSupport(Enum):
    """Rating scale for how well a feature is supported (values are the displayed labels)."""
    EXCELLENT = "优秀"
    GOOD = "良好"
    FAIR = "一般"
    POOR = "较差"
    NOT_SUPPORTED = "不支持"
@dataclass
class MessageQueueComparison:
    """One row of the message queue comparison: performance plus per-feature ratings."""
    name: str
    performance: PerformanceMetrics
    ordered_message: FeatureSupport
    transaction_message: FeatureSupport
    delayed_message: FeatureSupport
    message_filter: FeatureSupport
    message_replay: FeatureSupport
    cluster_support: FeatureSupport
    ops_complexity: FeatureSupport  # operational complexity (lower is better)
    ecosystem: FeatureSupport
class MessageQueueComparator:
    """Holds a fixed comparison table of message queues and renders or queries it."""

    def __init__(self):
        self.comparisons = self._initialize_comparisons()

    def _initialize_comparisons(self) -> Dict[MessageQueueType, MessageQueueComparison]:
        """Return the built-in comparison data, keyed by message queue type."""
        return {
            MessageQueueType.ROCKETMQ: MessageQueueComparison(
                name="Apache RocketMQ",
                performance=PerformanceMetrics(throughput_qps=100000, latency_ms=1.0, availability=99.95),
                ordered_message=FeatureSupport.EXCELLENT,
                transaction_message=FeatureSupport.EXCELLENT,
                delayed_message=FeatureSupport.EXCELLENT,
                message_filter=FeatureSupport.EXCELLENT,
                message_replay=FeatureSupport.EXCELLENT,
                cluster_support=FeatureSupport.EXCELLENT,
                ops_complexity=FeatureSupport.GOOD,
                ecosystem=FeatureSupport.GOOD,
            ),
            MessageQueueType.KAFKA: MessageQueueComparison(
                name="Apache Kafka",
                performance=PerformanceMetrics(throughput_qps=1000000, latency_ms=5.0, availability=99.9),
                ordered_message=FeatureSupport.GOOD,
                transaction_message=FeatureSupport.GOOD,
                delayed_message=FeatureSupport.POOR,
                message_filter=FeatureSupport.FAIR,
                message_replay=FeatureSupport.EXCELLENT,
                cluster_support=FeatureSupport.EXCELLENT,
                ops_complexity=FeatureSupport.FAIR,
                ecosystem=FeatureSupport.EXCELLENT,
            ),
            MessageQueueType.RABBITMQ: MessageQueueComparison(
                name="RabbitMQ",
                performance=PerformanceMetrics(throughput_qps=20000, latency_ms=2.0, availability=99.9),
                ordered_message=FeatureSupport.GOOD,
                transaction_message=FeatureSupport.GOOD,
                delayed_message=FeatureSupport.EXCELLENT,
                message_filter=FeatureSupport.EXCELLENT,
                message_replay=FeatureSupport.FAIR,
                cluster_support=FeatureSupport.GOOD,
                ops_complexity=FeatureSupport.EXCELLENT,
                ecosystem=FeatureSupport.EXCELLENT,
            ),
            MessageQueueType.ACTIVEMQ: MessageQueueComparison(
                name="Apache ActiveMQ",
                performance=PerformanceMetrics(throughput_qps=10000, latency_ms=10.0, availability=99.5),
                ordered_message=FeatureSupport.GOOD,
                transaction_message=FeatureSupport.EXCELLENT,
                delayed_message=FeatureSupport.GOOD,
                message_filter=FeatureSupport.GOOD,
                message_replay=FeatureSupport.FAIR,
                cluster_support=FeatureSupport.GOOD,
                ops_complexity=FeatureSupport.GOOD,
                ecosystem=FeatureSupport.GOOD,
            ),
            MessageQueueType.PULSAR: MessageQueueComparison(
                name="Apache Pulsar",
                performance=PerformanceMetrics(throughput_qps=300000, latency_ms=3.0, availability=99.95),
                ordered_message=FeatureSupport.EXCELLENT,
                transaction_message=FeatureSupport.GOOD,
                delayed_message=FeatureSupport.EXCELLENT,
                message_filter=FeatureSupport.GOOD,
                message_replay=FeatureSupport.EXCELLENT,
                cluster_support=FeatureSupport.EXCELLENT,
                ops_complexity=FeatureSupport.FAIR,
                ecosystem=FeatureSupport.FAIR,
            ),
        }

    def get_comparison_table(self) -> List[Dict[str, object]]:
        """Return one dict per message queue, keyed by the displayed column titles.

        Throughput is pre-formatted with thousands separators; feature ratings
        are given as their label strings.
        """
        return [
            {
                "消息队列": comparison.name,
                "吞吐量(QPS)": f"{comparison.performance.throughput_qps:,}",
                "延迟(ms)": comparison.performance.latency_ms,
                "可用性(%)": comparison.performance.availability,
                "顺序消息": comparison.ordered_message.value,
                "事务消息": comparison.transaction_message.value,
                "延时消息": comparison.delayed_message.value,
                "消息过滤": comparison.message_filter.value,
                "消息回溯": comparison.message_replay.value,
                "集群支持": comparison.cluster_support.value,
                "运维复杂度": comparison.ops_complexity.value,
                "生态系统": comparison.ecosystem.value,
            }
            for comparison in self.comparisons.values()
        ]

    def get_recommendation(self, requirements: Dict[str, str]) -> MessageQueueType:
        """Recommend a message queue using a simplified, ordered rule list.

        Rules are checked in priority order: ultra-high throughput selects
        Kafka, a required transaction feature selects RocketMQ, high ease of
        use selects RabbitMQ. Everything else, including strict ordering,
        falls through to RocketMQ.
        """
        if requirements.get("throughput") == "ultra_high":
            return MessageQueueType.KAFKA
        if requirements.get("transaction") == "required":
            return MessageQueueType.ROCKETMQ
        if requirements.get("ease_of_use") == "high":
            return MessageQueueType.RABBITMQ
        # Strict ordering and the default case both resolve to RocketMQ.
        return MessageQueueType.ROCKETMQ

    def print_comparison_table(self):
        """Print the comparison table with every cell centred in a 12-character column."""
        table = self.get_comparison_table()
        print("\n" + "=" * 120)
        print("消息队列对比表")
        print("=" * 120)
        if not table:
            # Nothing to derive headers from; avoid indexing an empty list.
            print("=" * 120)
            return

        # Header row
        headers = list(table[0].keys())
        header_line = "|".join(f"{header:^12}" for header in headers)
        print(header_line)
        print("-" * len(header_line))

        # Data rows
        for row in table:
            print("|".join(f"{str(value):^12}" for value in row.values()))
        print("=" * 120)
# Usage example
if __name__ == "__main__":
    comparator = MessageQueueComparator()

    # Render the built-in comparison table.
    comparator.print_comparison_table()

    # Ask for a recommendation for a sample set of requirements.
    requirements = dict(
        throughput="high",
        transaction="required",
        ordered_message="strict",
    )
    recommendation = comparator.get_recommendation(requirements)
    print("\n根据您的需求,推荐使用: {}".format(recommendation.value))
1.2 RocketMQ安装与配置
1.2.1 系统要求
硬件要求:
- CPU: 2核心以上
- 内存: 4GB以上(生产环境建议8GB以上)
- 磁盘: 50GB以上可用空间
- 网络: 千兆网卡
软件要求:
- 操作系统: Linux/Windows/macOS
- JDK: 1.8或以上版本
- Maven: 3.2.x或以上版本(源码编译时需要)
1.2.2 下载与安装
import os
import subprocess
import platform
from pathlib import Path
from typing import Optional
class RocketMQInstaller:
    """Download, unpack and prepare a RocketMQ binary release under one directory.

    Each step prints progress messages and reports success as a bool instead
    of raising, so ``install()`` can stop at the first failing step.
    """

    def __init__(self, install_dir: str = "/opt/rocketmq"):
        self.install_dir = Path(install_dir)
        self.version = "4.9.4"
        self.download_url = f"https://archive.apache.org/dist/rocketmq/{self.version}/rocketmq-all-{self.version}-bin-release.zip"
        self.system = platform.system().lower()

    def _archive_path(self) -> Path:
        """Return where the release zip is stored inside the install directory."""
        return self.install_dir / f"rocketmq-all-{self.version}-bin-release.zip"

    def _fetch(self, destination: Path) -> None:
        """Download the release to *destination*, trying wget first and then curl.

        Raises the last downloader error when both tools fail or are missing.
        """
        downloaders = (
            ["wget", "-O", str(destination), self.download_url],
            # -f makes curl exit non-zero on HTTP errors instead of saving the
            # error page as if it were the archive.
            ["curl", "-f", "-L", "-o", str(destination), self.download_url],
        )
        last_error = None
        for cmd in downloaders:
            try:
                subprocess.run(cmd, check=True)
                return
            except (subprocess.CalledProcessError, FileNotFoundError) as exc:
                # FileNotFoundError means the tool itself is not installed.
                last_error = exc
        raise last_error

    def check_java_version(self) -> bool:
        """Return True when ``java -version`` runs successfully.

        The version text is only printed, not compared against a minimum.
        """
        try:
            result = subprocess.run(["java", "-version"],
                                    capture_output=True, text=True)
        except FileNotFoundError:
            print("未找到Java,请先安装JDK 1.8或以上版本")
            return False

        if result.returncode != 0:
            print("未找到Java,请先安装JDK 1.8或以上版本")
            return False

        # The version banner is read from stderr; show its third token when
        # present, otherwise the whole banner.
        tokens = result.stderr.split()
        shown = tokens[2] if len(tokens) > 2 else result.stderr.strip()
        print(f"Java版本信息: {shown}")
        return True

    def download_rocketmq(self) -> bool:
        """Download the release zip unless it is already present.

        Returns False on unsupported platforms (after printing manual
        instructions) and on any download failure.
        """
        try:
            print(f"开始下载RocketMQ {self.version}...")
            self.install_dir.mkdir(parents=True, exist_ok=True)

            download_file = self._archive_path()
            if download_file.exists():
                print(f"文件已存在: {download_file}")
                return True

            if self.system not in ("linux", "darwin"):
                print("请手动下载RocketMQ安装包")
                print(f"下载地址: {self.download_url}")
                print(f"保存到: {download_file}")
                return False

            # Download to a side file and rename on success, so an aborted
            # transfer is never mistaken for a complete archive on the next run.
            partial_file = download_file.with_name(download_file.name + ".part")
            try:
                self._fetch(partial_file)
                partial_file.replace(download_file)
            finally:
                if partial_file.exists():
                    partial_file.unlink()

            print("下载完成")
            return True
        except Exception as e:  # step boundary: report and signal failure
            print(f"下载失败: {e}")
            return False

    def extract_rocketmq(self) -> bool:
        """Unpack the release zip and point ``<install_dir>/current`` at it.

        The symlink is only created on non-Windows systems.
        """
        try:
            print("开始解压RocketMQ...")
            zip_file = self._archive_path()
            if not zip_file.exists():
                print(f"安装包不存在: {zip_file}")
                return False

            if self.system in ("linux", "darwin"):
                # -o overwrites without prompting so a re-run cannot block on stdin.
                cmd = ["unzip", "-q", "-o", str(zip_file), "-d", str(self.install_dir)]
                subprocess.run(cmd, check=True)
            else:
                import zipfile
                with zipfile.ZipFile(zip_file, 'r') as zip_ref:
                    zip_ref.extractall(self.install_dir)

            # Refresh the "current" symlink.
            extracted_dir = self.install_dir / f"rocketmq-all-{self.version}-bin-release"
            symlink = self.install_dir / "current"
            # exists() is False for a dangling symlink, so test is_symlink() too.
            if symlink.is_symlink() or symlink.exists():
                symlink.unlink()
            if self.system != "windows":
                symlink.symlink_to(extracted_dir)

            print("解压完成")
            return True
        except Exception as e:  # step boundary: report and signal failure
            print(f"解压失败: {e}")
            return False

    def configure_environment(self) -> bool:
        """Write ``rocketmq_env.sh`` with ROCKETMQ_HOME, PATH and JAVA_OPT exports.

        ROCKETMQ_HOME is set to ``<install_dir>/current``.
        NOTE(review): that link is not created on Windows by
        ``extract_rocketmq`` - confirm whether Windows needs a different home.
        """
        try:
            print("配置环境变量...")
            rocketmq_home = self.install_dir / "current"

            env_config = (
                "\n"
                "# RocketMQ Environment Variables\n"
                f"export ROCKETMQ_HOME={rocketmq_home}\n"
                "export PATH=$PATH:$ROCKETMQ_HOME/bin\n"
                "# RocketMQ JVM Options\n"
                'export JAVA_OPT="-server -Xms1g -Xmx1g -Xmn512m"\n'
            )

            config_file = self.install_dir / "rocketmq_env.sh"
            with open(config_file, 'w', encoding="utf-8") as f:
                f.write(env_config)

            print(f"环境变量配置已保存到: {config_file}")
            print("请执行以下命令加载环境变量:")
            print(f"source {config_file}")
            return True
        except Exception as e:  # step boundary: report and signal failure
            print(f"配置环境变量失败: {e}")
            return False

    def install(self) -> bool:
        """Run all installation steps in order, stopping at the first failure."""
        print("开始安装RocketMQ...")

        steps = (
            self.check_java_version,
            self.download_rocketmq,
            self.extract_rocketmq,
            self.configure_environment,
        )
        for step in steps:
            if not step():
                return False

        print("\nRocketMQ安装完成!")
        print("\n后续步骤:")
        print("1. 加载环境变量")
        print("2. 启动NameServer")
        print("3. 启动Broker")
        print("4. 验证安装")
        return True
# Installation example: run the full install into /opt/rocketmq.
if __name__ == "__main__":
    installer = RocketMQInstaller("/opt/rocketmq")
    installer.install()
1.2.3 配置文件详解
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from pathlib import Path
import configparser
@dataclass
class NameServerConfig:
    """Settings for one NameServer, convertible to properties-file entries."""

    listen_port: int = 9876
    server_worker_threads: int = 8
    server_callback_executor_threads: int = 0
    server_selector_threads: int = 3
    server_oneway_semaphore_value: int = 256
    server_async_semaphore_value: int = 64
    kvconfig_path: str = "/tmp/kvconfig.json"
    config_store_path: str = "/tmp/nacos"

    def to_properties(self) -> Dict[str, str]:
        """Return the settings as property-name -> string-value pairs."""
        numeric_settings = {
            "listenPort": self.listen_port,
            "serverWorkerThreads": self.server_worker_threads,
            "serverCallbackExecutorThreads": self.server_callback_executor_threads,
            "serverSelectorThreads": self.server_selector_threads,
            "serverOnewaySemaphoreValue": self.server_oneway_semaphore_value,
            "serverAsyncSemaphoreValue": self.server_async_semaphore_value,
        }
        properties = {name: str(number) for name, number in numeric_settings.items()}
        # Path settings are passed through unchanged.
        properties["kvConfigPath"] = self.kvconfig_path
        properties["configStorePath"] = self.config_store_path
        return properties
@dataclass
class BrokerConfig:
    """Settings for one broker, convertible to properties-file entries."""

    # Basic settings
    broker_cluster_name: str = "DefaultCluster"
    broker_name: str = "broker-a"
    broker_id: int = 0
    delete_when: str = "04"
    file_reserved_time: int = 72
    broker_role: str = "ASYNC_MASTER"
    flush_disk_type: str = "ASYNC_FLUSH"
    # Network settings
    listen_port: int = 10911
    namesrv_addr: str = "localhost:9876"
    broker_ip1: Optional[str] = None
    broker_ip2: Optional[str] = None
    # Storage settings
    store_path_root_dir: str = "/tmp/rocketmq/store"
    store_path_commit_log: str = "/tmp/rocketmq/store/commitlog"
    store_path_consume_queue: str = "/tmp/rocketmq/store/consumequeue"
    store_path_index: str = "/tmp/rocketmq/store/index"
    # Thread pool settings
    send_message_thread_pool_nums: int = 1
    pull_message_thread_pool_nums: int = 16
    admin_broker_thread_pool_nums: int = 16
    client_manage_thread_pool_nums: int = 32
    # Message settings
    max_message_size: int = 4194304  # 4MB
    map_file_size_commit_log: int = 1073741824  # 1GB
    map_file_size_consume_queue: int = 6000000
    # High-availability settings
    ha_listen_port: int = 10912
    ha_send_heartbeat_interval: int = 5000
    ha_housekeeping_interval: int = 20000

    def to_properties(self) -> Dict[str, str]:
        """Return the settings as property-name -> string-value pairs.

        ``brokerIP1`` and ``brokerIP2`` are included only when they are set.
        """
        basic = {
            "brokerClusterName": self.broker_cluster_name,
            "brokerName": self.broker_name,
            "brokerId": str(self.broker_id),
            "deleteWhen": self.delete_when,
            "fileReservedTime": str(self.file_reserved_time),
            "brokerRole": self.broker_role,
            "flushDiskType": self.flush_disk_type,
        }
        network = {
            "listenPort": str(self.listen_port),
            "namesrvAddr": self.namesrv_addr,
        }
        storage = {
            "storePathRootDir": self.store_path_root_dir,
            "storePathCommitLog": self.store_path_commit_log,
            "storePathConsumeQueue": self.store_path_consume_queue,
            "storePathIndex": self.store_path_index,
        }
        tuning = {
            "sendMessageThreadPoolNums": str(self.send_message_thread_pool_nums),
            "pullMessageThreadPoolNums": str(self.pull_message_thread_pool_nums),
            "adminBrokerThreadPoolNums": str(self.admin_broker_thread_pool_nums),
            "clientManageThreadPoolNums": str(self.client_manage_thread_pool_nums),
            "maxMessageSize": str(self.max_message_size),
            "mapFileSizeCommitLog": str(self.map_file_size_commit_log),
            "mapFileSizeConsumeQueue": str(self.map_file_size_consume_queue),
        }
        high_availability = {
            "haListenPort": str(self.ha_listen_port),
            "haSendHeartbeatInterval": str(self.ha_send_heartbeat_interval),
            "haHousekeepingInterval": str(self.ha_housekeeping_interval),
        }
        # Merging in this order keeps the key order callers see.
        merged = {**basic, **network, **storage, **tuning, **high_availability}

        # Optional addresses go last.
        optional_addresses = (
            ("brokerIP1", self.broker_ip1),
            ("brokerIP2", self.broker_ip2),
        )
        for prop_name, address in optional_addresses:
            if address:
                merged[prop_name] = address
        return merged
class RocketMQConfigManager:
    """Writes NameServer and broker ``.properties`` files into one directory."""

    def __init__(self, config_dir: str):
        """Remember *config_dir* and create it (with parents) if missing."""
        self.config_dir = Path(config_dir)
        self.config_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _generated_at() -> str:
        """Return the current local time as text for file header comments."""
        # Imported here so the class works when this module is imported;
        # the module's imports do not bring datetime into scope.
        from datetime import datetime
        return str(datetime.now())

    @staticmethod
    def _section_lines(title: str, keys, props: Dict[str, str]) -> List[str]:
        """Return the comment header plus ``key=value`` lines for keys present in *props*."""
        lines = [f"# {title}\n"]
        lines.extend(f"{key}={props[key]}\n" for key in keys if key in props)
        return lines

    def generate_nameserver_config(self, config: NameServerConfig,
                                   filename: str = "namesrv.properties") -> Path:
        """Write *config* as a properties file and return the file path."""
        config_file = self.config_dir / filename
        with open(config_file, 'w', encoding="utf-8") as f:
            f.write("# NameServer Configuration\n")
            f.write(f"# Generated at {self._generated_at()}\n\n")
            for key, value in config.to_properties().items():
                f.write(f"{key}={value}\n")
        return config_file

    def generate_broker_config(self, config: BrokerConfig,
                               filename: str = "broker.properties") -> Path:
        """Write *config* as a properties file grouped into sections.

        Sections are basic, network and storage; every remaining property is
        written alphabetically under "Other Configuration".
        """
        config_file = self.config_dir / filename
        props = config.to_properties()

        basic_keys = ["brokerClusterName", "brokerName", "brokerId", "brokerRole", "flushDiskType"]
        network_keys = ["listenPort", "namesrvAddr", "brokerIP1", "brokerIP2"]
        storage_keys = ["storePathRootDir", "storePathCommitLog", "storePathConsumeQueue", "storePathIndex"]
        other_keys = sorted(set(props) - set(basic_keys + network_keys + storage_keys))

        with open(config_file, 'w', encoding="utf-8") as f:
            f.write("# Broker Configuration\n")
            f.write(f"# Generated at {self._generated_at()}\n\n")
            for title, keys in (
                ("Basic Configuration", basic_keys),
                ("Network Configuration", network_keys),
                ("Storage Configuration", storage_keys),
            ):
                f.writelines(self._section_lines(title, keys, props))
                f.write("\n")
            # The last section is not followed by a blank line.
            f.writelines(self._section_lines("Other Configuration", other_keys, props))
        return config_file

    def generate_cluster_configs(self, cluster_name: str = "DefaultCluster",
                                 nameserver_hosts: Optional[List[str]] = None,
                                 broker_configs: Optional[List[Dict]] = None) -> Dict[str, Path]:
        """Generate one file per NameServer and per broker for a cluster.

        Args:
            cluster_name: value written as the cluster name of every broker.
            nameserver_hosts: NameServer hosts; defaults to ``["localhost"]``.
                Each host is combined with port 9876 to form the brokers'
                NameServer address list.
            broker_configs: dicts with ``broker_name``, ``broker_id`` and
                ``role`` keys; defaults to two master/slave groups.

        Returns:
            Mapping from a logical name (``nameserver_N`` or
            ``broker_<name>_<id>``) to the generated file path.
        """
        if nameserver_hosts is None:
            nameserver_hosts = ["localhost"]
        if broker_configs is None:
            broker_configs = [
                {"broker_name": "broker-a", "broker_id": 0, "role": "ASYNC_MASTER"},
                {"broker_name": "broker-a", "broker_id": 1, "role": "SLAVE"},
                {"broker_name": "broker-b", "broker_id": 0, "role": "ASYNC_MASTER"},
                {"broker_name": "broker-b", "broker_id": 1, "role": "SLAVE"},
            ]

        config_files = {}

        # One default NameServer file per host, numbered from 1.
        for index in range(1, len(nameserver_hosts) + 1):
            config_files[f"nameserver_{index}"] = self.generate_nameserver_config(
                NameServerConfig(), f"namesrv-{index}.properties"
            )

        # Every broker gets the full NameServer address list.
        namesrv_addr = ";".join(f"{host}:9876" for host in nameserver_hosts)
        for broker_cfg in broker_configs:
            name = broker_cfg["broker_name"]
            broker_id = broker_cfg["broker_id"]
            broker_config = BrokerConfig(
                broker_cluster_name=cluster_name,
                broker_name=name,
                broker_id=broker_id,
                broker_role=broker_cfg["role"],
                namesrv_addr=namesrv_addr,
            )
            config_files[f"broker_{name}_{broker_id}"] = self.generate_broker_config(
                broker_config, f"broker-{name}-{broker_id}.properties"
            )
        return config_files
# Configuration generation example
if __name__ == "__main__":
    from datetime import datetime

    config_manager = RocketMQConfigManager("/opt/rocketmq/conf")

    # Two broker groups, each with a synchronous master (id 0) and a slave (id 1).
    replica_layout = [
        {"broker_name": group, "broker_id": member_id, "role": member_role}
        for group in ("broker-a", "broker-b")
        for member_id, member_role in ((0, "SYNC_MASTER"), (1, "SLAVE"))
    ]

    config_files = config_manager.generate_cluster_configs(
        cluster_name="ProductionCluster",
        nameserver_hosts=["192.168.1.10", "192.168.1.11"],
        broker_configs=replica_layout,
    )

    print("生成的配置文件:")
    for label in config_files:
        print(f"{label}: {config_files[label]}")
1.3 启动与验证
1.3.1 启动脚本
import subprocess
import time
import os
import signal
from pathlib import Path
from typing import List, Dict, Optional
class RocketMQService:
    """Starts, stops and inspects local NameServer and broker processes.

    Started processes are tracked in ``self.processes`` by service name.
    NameServers are always stored under ``"nameserver"``, brokers under the
    ``broker_name`` passed to ``start_broker``.
    """

    def __init__(self, rocketmq_home: str):
        self.rocketmq_home = Path(rocketmq_home)
        self.bin_dir = self.rocketmq_home / "bin"
        self.processes: Dict[str, subprocess.Popen] = {}
        # Exported process-wide; child processes inherit it.
        os.environ["ROCKETMQ_HOME"] = str(self.rocketmq_home)

    def _spawn(self, script: str, config_file: Optional[str],
               jvm_options: Optional[str], default_jvm_options: str) -> subprocess.Popen:
        """Launch ``bin/<script>`` (``.cmd`` on Windows) and return the process.

        ``JAVA_OPT`` in the child environment is set to *jvm_options*, or to
        *default_jvm_options* when none are given.
        """
        if os.name == 'nt':  # Windows
            cmd = [str(self.bin_dir / f"{script}.cmd")]
        else:  # Linux/macOS
            cmd = ["sh", str(self.bin_dir / script)]
        if config_file:
            cmd.extend(["-c", config_file])

        env = os.environ.copy()
        env["JAVA_OPT"] = jvm_options if jvm_options else default_jvm_options

        # stdout is never read by this class; a pipe nobody drains can fill up
        # and block the child, so discard it. stderr stays piped because it
        # supplies the message shown when startup fails.
        return subprocess.Popen(
            cmd,
            env=env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
        )

    @staticmethod
    def _startup_error(process: subprocess.Popen, wait_seconds: float) -> Optional[str]:
        """Wait *wait_seconds*; return None if still running, else the captured stderr."""
        time.sleep(wait_seconds)
        if process.poll() is None:
            return None
        _, stderr = process.communicate()
        return stderr

    def start_nameserver(self, config_file: Optional[str] = None,
                         jvm_options: Optional[str] = None) -> bool:
        """Start a NameServer; return True if it is still running after 3 seconds."""
        try:
            print("启动NameServer...")
            process = self._spawn("mqnamesrv", config_file, jvm_options,
                                  "-server -Xms1g -Xmx1g -Xmn512m")
            self.processes["nameserver"] = process

            stderr = self._startup_error(process, 3)
            if stderr is None:
                print("NameServer启动成功")
                return True
            print(f"NameServer启动失败: {stderr}")
            return False
        except Exception as e:  # boundary: report and signal failure
            print(f"启动NameServer失败: {e}")
            return False

    def start_broker(self, config_file: Optional[str] = None,
                     jvm_options: Optional[str] = None,
                     broker_name: str = "broker") -> bool:
        """Start a broker; return True if it is still running after 5 seconds.

        The process is tracked under *broker_name*.
        """
        try:
            print(f"启动Broker ({broker_name})...")
            process = self._spawn("mqbroker", config_file, jvm_options,
                                  "-server -Xms2g -Xmx2g -Xmn1g")
            self.processes[broker_name] = process

            stderr = self._startup_error(process, 5)
            if stderr is None:
                print(f"Broker ({broker_name}) 启动成功")
                return True
            print(f"Broker ({broker_name}) 启动失败: {stderr}")
            return False
        except Exception as e:  # boundary: report and signal failure
            print(f"启动Broker失败: {e}")
            return False

    def stop_service(self, service_name: str) -> bool:
        """Stop one tracked service and forget it.

        A running process is asked to terminate and killed if it has not
        exited within 10 seconds. Unknown or already-exited services count
        as success.
        """
        try:
            process = self.processes.get(service_name)
            if process is None:
                print(f"服务 {service_name} 未运行")
                return True

            if process.poll() is not None:
                print(f"服务 {service_name} 已经停止")
                del self.processes[service_name]
                return True

            print(f"停止服务 {service_name}...")
            # Popen.terminate() sends SIGTERM on POSIX and terminates the
            # process on Windows.
            process.terminate()
            try:
                process.wait(timeout=10)
                print(f"服务 {service_name} 已停止")
            except subprocess.TimeoutExpired:
                print(f"强制终止服务 {service_name}")
                process.kill()
                process.wait()
            del self.processes[service_name]
            return True
        except Exception as e:  # boundary: report and signal failure
            print(f"停止服务失败: {e}")
            return False

    def stop_all(self) -> bool:
        """Stop every tracked service, brokers first and the NameServer last.

        Returns False if any individual stop failed.
        """
        success = True
        # Snapshot the names first: stop_service mutates self.processes.
        broker_services = [name for name in self.processes if name != "nameserver"]
        for service_name in broker_services:
            if not self.stop_service(service_name):
                success = False
        if "nameserver" in self.processes:
            if not self.stop_service("nameserver"):
                success = False
        return success

    def get_service_status(self) -> Dict[str, str]:
        """Return ``"running"`` or ``"stopped"`` for every tracked service."""
        return {
            service_name: "running" if process.poll() is None else "stopped"
            for service_name, process in self.processes.items()
        }

    def check_ports(self) -> Dict[str, bool]:
        """Return whether something accepts TCP connections on the default ports.

        Probes localhost ports 9876, 10911 and 10912; keys have the form
        ``"<service>(<port>)"``.
        """
        import socket
        ports = {
            "nameserver": 9876,
            "broker": 10911,
            "broker_ha": 10912,
        }
        port_status = {}
        for service, port in ports.items():
            label = f"{service}({port})"
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    # Bound the probe so it cannot hang the status check.
                    sock.settimeout(1.0)
                    port_status[label] = sock.connect_ex(('localhost', port)) == 0
            except OSError:
                port_status[label] = False
        return port_status
# Startup script example
class RocketMQClusterManager:
    """Starts and stops a whole cluster using generated configuration files."""

    def __init__(self, rocketmq_home: str, config_dir: str):
        self.service = RocketMQService(rocketmq_home)
        self.config_dir = Path(config_dir)

    def start_cluster(self, cluster_config: Dict) -> bool:
        """Start every NameServer and then every broker listed in *cluster_config*.

        Config files are looked up in ``config_dir`` as
        ``namesrv-<n>.properties`` (numbered from 1) and
        ``broker-<broker_name>-<broker_id>.properties``. Returns False as
        soon as one service fails to start; services already started are
        left running.

        NOTE(review): the call below does not pass a per-instance name for
        NameServers - confirm how RocketMQService tracks more than one.
        """
        try:
            print("启动RocketMQ集群...")

            # Only the number of NameServer entries matters here.
            nameserver_count = len(cluster_config.get("nameservers", []))
            for index in range(1, nameserver_count + 1):
                config_file = self.config_dir / f"namesrv-{index}.properties"
                if not self.service.start_nameserver(str(config_file)):
                    return False
                time.sleep(2)

            # Give the NameServers time to finish starting.
            time.sleep(5)

            for broker_config in cluster_config.get("brokers", []):
                broker_name = f"{broker_config['broker_name']}-{broker_config['broker_id']}"
                config_file = self.config_dir / f"broker-{broker_name}.properties"
                if not self.service.start_broker(str(config_file), broker_name=broker_name):
                    return False
                time.sleep(3)

            print("RocketMQ集群启动完成")
            return True
        except Exception as e:  # boundary: report and signal failure
            print(f"启动集群失败: {e}")
            return False

    def stop_cluster(self) -> bool:
        """Stop every service started through this manager."""
        print("停止RocketMQ集群...")
        return self.service.stop_all()

    def get_cluster_status(self) -> Dict[str, Dict[str, object]]:
        """Return the per-service process status and the port probe results."""
        return {
            "services": self.service.get_service_status(),
            "ports": self.service.check_ports(),
        }
# Usage example
if __name__ == "__main__":
    cluster_manager = RocketMQClusterManager(
        rocketmq_home="/opt/rocketmq/current",
        config_dir="/opt/rocketmq/conf",
    )

    # One NameServer plus a master/slave pair for broker-a.
    cluster_config = {
        "nameservers": [{"host": "localhost", "port": 9876}],
        "brokers": [
            {"broker_name": "broker-a", "broker_id": member_id}
            for member_id in (0, 1)
        ],
    }

    try:
        if cluster_manager.start_cluster(cluster_config):
            print("\n集群启动成功!")

            # Show process and port status.
            status = cluster_manager.get_cluster_status()
            print("\n服务状态:")
            for service_label in status["services"]:
                print(f" {service_label}: {status['services'][service_label]}")
            print("\n端口状态:")
            for port_label, is_open in status["ports"].items():
                activity = '活跃' if is_open else '未活跃'
                print(f" {port_label}: {activity}")

            # Keep the cluster up until the user presses Enter.
            input("\n按回车键停止集群...")
            cluster_manager.stop_cluster()
            print("集群已停止")
    except KeyboardInterrupt:
        print("\n收到中断信号,停止集群...")
        cluster_manager.stop_cluster()
1.3.2 验证安装
import subprocess
import json
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class VerificationResult:
    """Outcome of one verification check."""
    test_name: str
    success: bool
    message: str
    details: Optional[Dict] = None
class RocketMQVerifier:
"""RocketMQ安装验证器"""
def __init__(self, rocketmq_home: str, namesrv_addr: str = "localhost:9876"):
self.rocketmq_home = rocketmq_home
self.namesrv_addr = namesrv_addr
self.admin_tool = f"{rocketmq_home}/bin/mqadmin"
def verify_nameserver(self) -> VerificationResult:
"""验证NameServer"""
try:
# 检查NameServer状态
cmd = ["sh", self.admin_tool, "namesrvStatus", "-n", self.namesrv_addr]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
return VerificationResult(
test_name="NameServer连接测试",
success=True,
message="NameServer运行正常",
details={"output": result.stdout}
)
else:
return VerificationResult(
test_name="NameServer连接测试",
success=False,
message=f"NameServer连接失败: {result.stderr}"
)
except Exception as e:
return VerificationResult(
test_name="NameServer连接测试",
success=False,
message=f"测试异常: {e}"
)
def verify_broker(self) -> VerificationResult:
"""验证Broker"""
try:
# 检查Broker状态
cmd = ["sh", self.admin_tool, "clusterList", "-n", self.namesrv_addr]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
output = result.stdout
if "BrokerName" in output and "TotalQueues" in output:
return VerificationResult(
test_name="Broker状态测试",
success=True,
message="Broker运行正常",
details={"cluster_info": output}
)
else:
return VerificationResult(
test_name="Broker状态测试",
success=False,
message="未发现活跃的Broker"
)
else:
return VerificationResult(
test_name="Broker状态测试",
success=False,
message=f"获取Broker状态失败: {result.stderr}"
)
except Exception as e:
return VerificationResult(
test_name="Broker状态测试",
success=False,
message=f"测试异常: {e}"
)
def verify_topic_operations(self) -> VerificationResult:
"""验证Topic操作"""
test_topic = "VerificationTestTopic"
try:
# 创建测试Topic
create_cmd = [
"sh", self.admin_tool, "updateTopic",
"-n", self.namesrv_addr,
"-t", test_topic,
"-c", "DefaultCluster"
]
create_result = subprocess.run(create_cmd, capture_output=True, text=True, timeout=10)
if create_result.returncode != 0:
return VerificationResult(
test_name="Topic操作测试",
success=False,
message=f"创建Topic失败: {create_result.stderr}"
)
# 查询Topic
query_cmd = [
"sh", self.admin_tool, "topicStatus",
"-n", self.namesrv_addr,
"-t", test_topic
]
query_result = subprocess.run(query_cmd, capture_output=True, text=True, timeout=10)
if query_result.returncode == 0:
# 删除测试Topic
delete_cmd = [
"sh", self.admin_tool, "deleteTopic",
"-n", self.namesrv_addr,
"-t", test_topic,
"-c", "DefaultCluster"
]
subprocess.run(delete_cmd, capture_output=True, text=True, timeout=10)
return VerificationResult(
test_name="Topic操作测试",
success=True,
message="Topic创建和查询成功",
details={"topic_info": query_result.stdout}
)
else:
return VerificationResult(
test_name="Topic操作测试",
success=False,
message=f"查询Topic失败: {query_result.stderr}"
)
except Exception as e:
return VerificationResult(
test_name="Topic操作测试",
success=False,
message=f"测试异常: {e}"
)
def verify_message_operations(self) -> VerificationResult:
"""验证消息操作"""
try:
# 这里可以添加简单的消息发送和接收测试
# 由于需要编写Java代码或使用Python客户端,这里简化处理
return VerificationResult(
test_name="消息操作测试",
success=True,
message="消息操作功能可用(需要客户端测试)"
)
except Exception as e:
return VerificationResult(
test_name="消息操作测试",
success=False,
message=f"测试异常: {e}"
)
def run_all_verifications(self) -> List[VerificationResult]:
"""运行所有验证测试"""
results = []
print("开始验证RocketMQ安装...")
# 验证NameServer
print("1. 验证NameServer...")
ns_result = self.verify_nameserver()
results.append(ns_result)
print(f" 结果: {'✓' if ns_result.success else '✗'} {ns_result.message}")
# 验证Broker
print("2. 验证Broker...")
broker_result = self.verify_broker()
results.append(broker_result)
print(f" 结果: {'✓' if broker_result.success else '✗'} {broker_result.message}")
# 验证Topic操作
print("3. 验证Topic操作...")
topic_result = self.verify_topic_operations()
results.append(topic_result)
print(f" 结果: {'✓' if topic_result.success else '✗'} {topic_result.message}")
# 验证消息操作
print("4. 验证消息操作...")
msg_result = self.verify_message_operations()
results.append(msg_result)
print(f" 结果: {'✓' if msg_result.success else '✗'} {msg_result.message}")
return results
def generate_verification_report(self, results: List[VerificationResult]) -> str:
    """Render verification results as a plain-text report.

    Args:
        results: Results to summarise. Each item must expose ``success``,
            ``test_name``, ``message`` and ``details``.

    Returns:
        The report as a single newline-joined string: a banner, the
        overall pass count and success rate, one entry per result, and a
        closing verdict. An empty ``results`` list yields a 0.0% success
        rate and a "nothing was run" verdict rather than raising
        ``ZeroDivisionError`` or congratulating on zero tests.
    """
    max_detail_len = 100
    separator = "=" * 60
    total_count = len(results)
    success_count = sum(1 for r in results if r.success)
    # Guard the division: with no results there is nothing to average.
    success_rate = success_count / total_count * 100 if total_count else 0.0

    report = [
        "\n" + separator,
        "RocketMQ安装验证报告",
        separator,
        f"\n总体结果: {success_count}/{total_count} 项测试通过",
        f"成功率: {success_rate:.1f}%",
        "\n详细结果:",
    ]
    for i, result in enumerate(results, 1):
        status = "✓ 通过" if result.success else "✗ 失败"
        report.append(f"{i}. {result.test_name}: {status}")
        report.append(f" {result.message}")
        if result.details:
            detail_text = str(result.details)
            # Only add the ellipsis when text was actually cut off.
            if len(detail_text) > max_detail_len:
                detail_text = detail_text[:max_detail_len] + "..."
            report.append(f" 详情: {detail_text}")

    if not results:
        report.append("\n⚠️ 未执行任何验证测试")
    elif success_count == total_count:
        report.append("\n🎉 恭喜!RocketMQ安装验证全部通过!")
    else:
        report.append("\n⚠️ 部分测试失败,请检查配置和服务状态")
    report.append(separator)
    return "\n".join(report)
# Verification example
if __name__ == "__main__":
    # Point the verifier at the local installation and its NameServer.
    verifier = RocketMQVerifier(
        rocketmq_home="/opt/rocketmq/current",
        namesrv_addr="localhost:9876",
    )

    # Run every check (progress is printed as each step completes).
    results = verifier.run_all_verifications()

    # Render the collected results and show the report.
    report = verifier.generate_verification_report(results)
    print(report)
1.4 常见问题与解决方案
1.4.1 安装问题
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Optional
class ProblemCategory(Enum):
    """Area of a RocketMQ deployment that a known problem belongs to.

    Each member's value is the Chinese label that is printed as the
    section heading for that category in the troubleshooting guide.
    """

    INSTALLATION = "安装问题"   # installation
    CONFIGURATION = "配置问题"  # configuration
    STARTUP = "启动问题"        # startup
    NETWORK = "网络问题"        # network
    PERFORMANCE = "性能问题"    # performance
    PERMISSION = "权限问题"     # permissions
class SeverityLevel(Enum):
    """How serious a known problem is.

    Each member's value is the Chinese label printed next to the problem
    in the troubleshooting guide and in the diagnosis output.
    """

    CRITICAL = "严重"  # critical
    HIGH = "高"        # high
    MEDIUM = "中等"    # medium
    LOW = "低"         # low
@dataclass
class Solution:
    """One remediation for a known problem.

    Attributes:
        description: Short summary of the fix.
        steps: Ordered actions to carry out.
        commands: Shell commands supporting the steps, or ``None`` when
            the solution has none.
        notes: Extra caveats or recommendations, or ``None`` when the
            solution has none.
    """

    description: str
    steps: List[str]
    # ``None`` marks "not provided"; the annotation is Optional so that it
    # agrees with the default instead of promising a list.
    commands: Optional[List[str]] = None
    notes: Optional[List[str]] = None
@dataclass
class CommonProblem:
    """A known RocketMQ problem with its symptoms, causes and fixes.

    Attributes:
        title: Short name of the problem.
        category: Area of the deployment the problem belongs to.
        severity: How serious the problem is.
        symptoms: Observable signs of the problem.
        causes: Possible root causes.
        solutions: Remediations to try.
        related_logs: Log excerpts or log names associated with the
            problem, or ``None`` when none are recorded.
    """

    title: str
    category: ProblemCategory
    severity: SeverityLevel
    symptoms: List[str]
    causes: List[str]
    solutions: List[Solution]
    # ``None`` marks "not provided"; annotated Optional to match the default.
    related_logs: Optional[List[str]] = None
class RocketMQTroubleshooter:
    """Catalogue of common RocketMQ problems with lookup and reporting helpers.

    Attributes:
        problems: The built-in list of ``CommonProblem`` entries, created
            once when the instance is constructed.
    """

    def __init__(self):
        self.problems = self._initialize_problems()

    def _initialize_problems(self) -> List[CommonProblem]:
        """Build the built-in knowledge base of known problems.

        Returns:
            Four entries: Java environment, out-of-memory at startup,
            port conflicts, and Broker-to-NameServer connectivity.
        """
        return [
            CommonProblem(
                title="Java环境未配置或版本不兼容",
                category=ProblemCategory.INSTALLATION,
                severity=SeverityLevel.CRITICAL,
                symptoms=[
                    "启动时提示找不到java命令",
                    "Java版本过低警告",
                    "JAVA_HOME未设置错误"
                ],
                causes=[
                    "系统未安装JDK",
                    "JDK版本低于1.8",
                    "JAVA_HOME环境变量未设置",
                    "PATH环境变量未包含Java路径"
                ],
                solutions=[
                    Solution(
                        description="安装并配置JDK 1.8+",
                        steps=[
                            "下载并安装JDK 1.8或更高版本",
                            "设置JAVA_HOME环境变量",
                            "将$JAVA_HOME/bin添加到PATH",
                            "验证Java安装"
                        ],
                        commands=[
                            "java -version",
                            "echo $JAVA_HOME",
                            "export JAVA_HOME=/usr/lib/jvm/java-8-openjdk",
                            "export PATH=$PATH:$JAVA_HOME/bin"
                        ]
                    )
                ]
            ),
            CommonProblem(
                title="内存不足导致启动失败",
                category=ProblemCategory.STARTUP,
                severity=SeverityLevel.HIGH,
                symptoms=[
                    "OutOfMemoryError异常",
                    "进程启动后立即退出",
                    "系统响应缓慢"
                ],
                causes=[
                    "JVM堆内存设置过大",
                    "系统可用内存不足",
                    "内存泄漏"
                ],
                solutions=[
                    Solution(
                        description="调整JVM内存参数",
                        steps=[
                            "检查系统可用内存",
                            "调整JVM堆内存大小",
                            "优化GC参数",
                            "监控内存使用情况"
                        ],
                        commands=[
                            "free -h",
                            "export JAVA_OPT='-server -Xms512m -Xmx1g -Xmn256m'",
                            "jstat -gc <pid>"
                        ],
                        notes=[
                            "NameServer建议内存: 1GB",
                            "Broker建议内存: 2GB+",
                            "生产环境建议使用G1GC"
                        ]
                    )
                ]
            ),
            CommonProblem(
                title="端口被占用",
                category=ProblemCategory.NETWORK,
                severity=SeverityLevel.MEDIUM,
                symptoms=[
                    "Address already in use错误",
                    "端口绑定失败",
                    "服务无法启动"
                ],
                causes=[
                    "默认端口被其他程序占用",
                    "防火墙阻止端口访问",
                    "多个实例使用相同端口"
                ],
                solutions=[
                    Solution(
                        description="检查并释放端口",
                        steps=[
                            "检查端口占用情况",
                            "终止占用端口的进程",
                            "修改配置使用其他端口",
                            "配置防火墙规则"
                        ],
                        commands=[
                            "netstat -tulpn | grep :9876",
                            "lsof -i :9876",
                            "kill -9 <pid>",
                            "firewall-cmd --add-port=9876/tcp --permanent"
                        ]
                    )
                ]
            ),
            CommonProblem(
                title="Broker无法连接到NameServer",
                category=ProblemCategory.NETWORK,
                severity=SeverityLevel.HIGH,
                symptoms=[
                    "Broker启动后在NameServer中不可见",
                    "连接超时错误",
                    "路由信息为空"
                ],
                causes=[
                    "NameServer地址配置错误",
                    "网络连接问题",
                    "NameServer未启动",
                    "防火墙阻止连接"
                ],
                solutions=[
                    Solution(
                        description="检查网络连接和配置",
                        steps=[
                            "验证NameServer是否运行",
                            "检查网络连通性",
                            "验证配置文件中的地址",
                            "检查防火墙设置"
                        ],
                        commands=[
                            "telnet <nameserver_ip> 9876",
                            "ping <nameserver_ip>",
                            "sh mqadmin clusterList -n <nameserver_addr>"
                        ]
                    )
                ]
            )
        ]

    def diagnose_by_symptoms(self, symptoms: List[str]) -> List[CommonProblem]:
        """Return the problems whose known symptoms match any observed one.

        Matching is a case-insensitive substring test: an observed symptom
        matches when it occurs inside one of a problem's symptom strings.
        Blank observations are ignored, because an empty string is a
        substring of every text and would otherwise match every problem.
        Surrounding whitespace on an observation is stripped before
        matching.

        Args:
            symptoms: Observed symptom fragments.

        Returns:
            Matching problems in catalogue order, each at most once.
        """
        needles = [s.strip().lower() for s in symptoms if s and s.strip()]
        if not needles:
            return []
        return [
            problem
            for problem in self.problems
            if any(
                needle in known.lower()
                for known in problem.symptoms
                for needle in needles
            )
        ]

    def get_problems_by_category(self, category: ProblemCategory) -> List[CommonProblem]:
        """Return the catalogued problems that belong to ``category``."""
        return [p for p in self.problems if p.category == category]

    def generate_troubleshooting_guide(self) -> str:
        """Render the whole catalogue as a Markdown troubleshooting guide.

        Problems are grouped by category in first-seen order. Each problem
        lists its severity, symptoms, possible causes and solutions; each
        solution lists sequentially numbered steps plus any commands and
        notes it carries.

        Returns:
            The guide as a single newline-joined string.
        """
        guide = ["# RocketMQ故障排除指南\n"]

        # Group by category; dict insertion order keeps first-seen order.
        categories: Dict[ProblemCategory, List[CommonProblem]] = {}
        for problem in self.problems:
            categories.setdefault(problem.category, []).append(problem)

        for category, problems in categories.items():
            guide.append(f"## {category.value}\n")
            for i, problem in enumerate(problems, 1):
                guide.append(f"### {i}. {problem.title}")
                guide.append(f"**严重程度**: {problem.severity.value}\n")
                guide.append("**症状**:")
                guide.extend(f"- {symptom}" for symptom in problem.symptoms)
                guide.append("")
                guide.append("**可能原因**:")
                guide.extend(f"- {cause}" for cause in problem.causes)
                guide.append("")
                guide.append("**解决方案**:")
                for j, solution in enumerate(problem.solutions, 1):
                    guide.append(f"**方案{j}**: {solution.description}")
                    guide.append("步骤:")
                    # Number the steps for real so the list also reads
                    # correctly as plain text, not only once rendered.
                    guide.extend(
                        f" {n}. {step}"
                        for n, step in enumerate(solution.steps, 1)
                    )
                    if solution.commands:
                        guide.append("相关命令:")
                        for cmd in solution.commands:
                            guide.append(f" ```bash\n {cmd}\n ```")
                    if solution.notes:
                        guide.append("注意事项:")
                        for note in solution.notes:
                            guide.append(f" - {note}")
                    guide.append("")
                guide.append("---\n")
        return "\n".join(guide)
# Usage example
if __name__ == "__main__":
    troubleshooter = RocketMQTroubleshooter()

    # Look up catalogued problems from a couple of observed symptoms.
    symptoms = ["OutOfMemoryError", "进程启动后立即退出"]
    matched_problems = troubleshooter.diagnose_by_symptoms(symptoms)
    print("根据症状匹配的问题:")
    for problem in matched_problems:
        print(f"- {problem.title} ({problem.severity.value})")

    # Render the full guide but only show its first 500 characters.
    guide = troubleshooter.generate_troubleshooting_guide()
    print("\n" + guide[:500] + "...")
1.5 本章总结
1.5.1 核心知识点
- RocketMQ特性:高性能、高可用、低延迟、海量消息堆积
- 架构组件:NameServer、Broker、Producer、Consumer
- 安装配置:Java环境、下载安装、配置文件、环境变量
- 启动验证:服务启动、连接测试、功能验证
- 故障排除:常见问题识别和解决方案
1.5.2 最佳实践
- 环境准备:确保Java 1.8+,充足的内存和磁盘空间
- 配置优化:根据业务需求调整JVM参数和Broker配置
- 监控验证:定期检查服务状态和性能指标
- 故障预防:建立监控告警机制,定期备份配置
1.5.3 练习题
- 在本地环境安装RocketMQ并启动单机版集群
- 配置一个2Master-2Slave的RocketMQ集群
- 编写脚本自动化RocketMQ的启动和停止
- 模拟常见故障并使用故障排除指南解决
- 优化JVM参数提升RocketMQ性能