1.1 RocketMQ概述

1.1.1 什么是RocketMQ

Apache RocketMQ是一个开源的分布式消息中间件,由阿里巴巴开发并贡献给Apache软件基金会。它是一个低延迟、高可靠、可伸缩、易于使用的消息中间件,专为大规模分布式系统设计。

核心特性:

  1. 高性能:单机支持十万级QPS,集群可水平扩展至千万级QPS
  2. 高可用:支持多Master多Slave架构,99.95%的高可用性
  3. 高可靠:支持同步刷盘、异步刷盘;配合同步刷盘与同步双写(SYNC_MASTER)可实现消息零丢失
  4. 低延迟:毫秒级延迟
  5. 海量消息堆积:支持万亿级消息堆积,不影响性能
  6. 顺序消息:支持全局顺序消息和分区顺序消息
  7. 事务消息:支持分布式事务消息
  8. 定时消息:4.x版本默认支持18个固定延时级别(1s~2h,可通过messageDelayLevel配置),5.x版本起支持任意时间精度的定时消息
  9. 消息过滤:支持Tag和SQL92过滤
  10. 回溯消费:支持按时间回溯消费

1.1.2 RocketMQ架构组件

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime

class ComponentType(Enum):
    """Kinds of parts that make up a RocketMQ deployment.

    Each member's value is a lowercase snake_case identifier.
    """

    NAME_SERVER = "name_server"  # NameServer instance
    BROKER = "broker"            # Broker instance
    PRODUCER = "producer"        # message producer client
    CONSUMER = "consumer"        # message consumer client
    ADMIN_TOOL = "admin_tool"    # administration tooling

class BrokerRole(Enum):
    """Replication role a Broker runs with."""

    # Master that replicates to its slaves asynchronously
    ASYNC_MASTER = "async_master"
    # Master that writes synchronously to its slave (sync double-write)
    SYNC_MASTER = "sync_master"
    # Slave replica
    SLAVE = "slave"

class FlushDiskType(Enum):
    """How a Broker flushes written messages to disk."""

    # Flush synchronously before acknowledging
    SYNC_FLUSH = "sync_flush"
    # Flush asynchronously in the background
    ASYNC_FLUSH = "async_flush"

@dataclass
class NameServerInfo:
    """Record describing one NameServer instance."""

    host: str             # host name or IP address
    port: int             # listening port
    status: str           # free-form status text, e.g. "running"
    start_time: datetime  # when the instance was registered

    def get_address(self) -> str:
        """Return this NameServer's address as ``host:port``."""
        return "{}:{}".format(self.host, self.port)

@dataclass
class BrokerInfo:
    """Record describing one Broker instance within a cluster."""

    broker_id: int                  # numeric id within its broker name group
    broker_name: str                # broker (group) name
    cluster_name: str               # owning cluster
    host: str                       # host name or IP address
    port: int                       # listening port
    role: BrokerRole                # replication role
    flush_disk_type: FlushDiskType  # disk flush strategy
    status: str                     # free-form status text, e.g. "running"
    start_time: datetime            # when the instance was registered

    def get_address(self) -> str:
        """Return this Broker's address as ``host:port``."""
        return "{}:{}".format(self.host, self.port)

    def is_master(self) -> bool:
        """True for either master role (async or sync replication)."""
        master_roles = (BrokerRole.ASYNC_MASTER, BrokerRole.SYNC_MASTER)
        return self.role in master_roles

    def is_slave(self) -> bool:
        """True when this Broker runs as a slave."""
        slave_role = BrokerRole.SLAVE
        return self.role == slave_role

class RocketMQArchitecture:
    """In-memory model of a RocketMQ deployment topology.

    Tracks registered NameServers, Broker instances, and which broker names
    belong to each cluster.
    """

    def __init__(self):
        self.name_servers: List[NameServerInfo] = []
        self.brokers: List[BrokerInfo] = []
        # cluster_name -> distinct broker names registered in that cluster
        self.clusters: Dict[str, List[str]] = {}

    def add_name_server(self, host: str, port: int = 9876) -> NameServerInfo:
        """Register a NameServer with status "running" and return its record."""
        name_server = NameServerInfo(
            host=host,
            port=port,
            status="running",
            start_time=datetime.now()
        )
        self.name_servers.append(name_server)
        return name_server

    def add_broker(self, broker_id: int, broker_name: str, cluster_name: str,
                   host: str, port: int = 10911, role: BrokerRole = BrokerRole.ASYNC_MASTER,
                   flush_disk_type: FlushDiskType = FlushDiskType.ASYNC_FLUSH) -> BrokerInfo:
        """Register a Broker with status "running" and return its record.

        The broker name is added to ``self.clusters[cluster_name]`` once, even
        when several instances (e.g. a master and its slave) share it.
        """
        broker = BrokerInfo(
            broker_id=broker_id,
            broker_name=broker_name,
            cluster_name=cluster_name,
            host=host,
            port=port,
            role=role,
            flush_disk_type=flush_disk_type,
            status="running",
            start_time=datetime.now()
        )
        self.brokers.append(broker)

        broker_names = self.clusters.setdefault(cluster_name, [])
        if broker_name not in broker_names:
            broker_names.append(broker_name)

        return broker

    def get_name_server_list(self) -> str:
        """Return all NameServer addresses joined with ``;``."""
        return ";".join(ns.get_address() for ns in self.name_servers)

    def get_brokers_by_cluster(self, cluster_name: str) -> List[BrokerInfo]:
        """Return the Broker instances registered under ``cluster_name``."""
        return [broker for broker in self.brokers if broker.cluster_name == cluster_name]

    def get_master_brokers(self) -> List[BrokerInfo]:
        """Return all Broker instances with a master role."""
        return [broker for broker in self.brokers if broker.is_master()]

    def get_slave_brokers(self) -> List[BrokerInfo]:
        """Return all Broker instances with the slave role."""
        return [broker for broker in self.brokers if broker.is_slave()]

    def get_architecture_summary(self) -> Dict[str, object]:
        """Return a nested dict summarising NameServers, Brokers and clusters.

        ``clusters.details`` maps each cluster name to its number of distinct
        broker names, not its number of Broker instances.
        """
        masters = self.get_master_brokers()
        slaves = self.get_slave_brokers()
        return {
            "name_servers": {
                "count": len(self.name_servers),
                "addresses": [ns.get_address() for ns in self.name_servers]
            },
            "brokers": {
                "total_count": len(self.brokers),
                "master_count": len(masters),
                "slave_count": len(slaves)
            },
            "clusters": {
                "count": len(self.clusters),
                "details": {cluster: len(names) for cluster, names in self.clusters.items()}
            }
        }

# 架构示例
def create_sample_architecture():
    """Build a demo topology for "DefaultCluster".

    The topology has two NameServers and two broker groups (broker-a and
    broker-b). Each group has a SYNC_MASTER with sync flush and a SLAVE with
    async flush.
    """
    arch = RocketMQArchitecture()

    for ns_host in ("192.168.1.10", "192.168.1.11"):
        arch.add_name_server(ns_host, 9876)

    # (broker_id, broker_name, host, role, flush type), registered in this order
    layout = [
        (0, "broker-a", "192.168.1.20", BrokerRole.SYNC_MASTER, FlushDiskType.SYNC_FLUSH),
        (1, "broker-a", "192.168.1.21", BrokerRole.SLAVE, FlushDiskType.ASYNC_FLUSH),
        (0, "broker-b", "192.168.1.22", BrokerRole.SYNC_MASTER, FlushDiskType.SYNC_FLUSH),
        (1, "broker-b", "192.168.1.23", BrokerRole.SLAVE, FlushDiskType.ASYNC_FLUSH),
    ]
    for b_id, b_name, b_host, b_role, b_flush in layout:
        arch.add_broker(b_id, b_name, "DefaultCluster", b_host, 10911, b_role, b_flush)

    return arch

if __name__ == "__main__":
    # Build the demo topology and print a readable summary of it
    architecture = create_sample_architecture()
    summary = architecture.get_architecture_summary()

    ns_part = summary['name_servers']
    broker_part = summary['brokers']
    cluster_part = summary['clusters']
    report_lines = [
        ("NameServer数量", ns_part['count']),
        ("NameServer地址", ', '.join(ns_part['addresses'])),
        ("Broker总数", broker_part['total_count']),
        ("Master Broker数量", broker_part['master_count']),
        ("Slave Broker数量", broker_part['slave_count']),
        ("集群数量", cluster_part['count']),
        ("集群详情", cluster_part['details']),
    ]

    print("RocketMQ架构摘要:")
    for label, value in report_lines:
        print(f"{label}: {value}")

1.1.3 RocketMQ与其他消息队列对比

from dataclasses import dataclass
from typing import List, Dict
from enum import Enum

class MessageQueueType(Enum):
    """Message queue products covered by the comparison.

    Each value is the product's display name.
    """

    ROCKETMQ = "RocketMQ"
    KAFKA = "Kafka"
    RABBITMQ = "RabbitMQ"
    ACTIVEMQ = "ActiveMQ"
    PULSAR = "Pulsar"

@dataclass
class PerformanceMetrics:
    """Headline performance figures for one message queue."""

    throughput_qps: int   # throughput, messages per second
    latency_ms: float     # typical latency, milliseconds
    availability: float   # availability, percent
    
class FeatureSupport(Enum):
    """Rating scale for a feature, best first; values are Chinese display labels."""

    EXCELLENT = "优秀"       # excellent
    GOOD = "良好"            # good
    FAIR = "一般"            # fair
    POOR = "较差"            # poor
    NOT_SUPPORTED = "不支持"  # not supported

@dataclass
class MessageQueueComparison:
    """One row of the comparison: a product's metrics and feature ratings."""

    name: str                          # display name of the product
    performance: PerformanceMetrics    # throughput / latency / availability
    ordered_message: FeatureSupport    # ordered messaging
    transaction_message: FeatureSupport  # transactional messaging
    delayed_message: FeatureSupport    # delayed / scheduled messaging
    message_filter: FeatureSupport     # server-side filtering
    message_replay: FeatureSupport     # replay / rewind consumption
    cluster_support: FeatureSupport    # clustering
    # Operations rating. NOTE(review): the original label reads
    # "operational complexity (lower is better)", but it uses the same
    # EXCELLENT..POOR scale as the other fields; confirm the intended reading.
    ops_complexity: FeatureSupport
    ecosystem: FeatureSupport          # ecosystem / integrations
    
class MessageQueueComparator:
    """Compare message queue products and give a simple recommendation."""

    def __init__(self):
        self.comparisons = self._initialize_comparisons()

    def _initialize_comparisons(self) -> "Dict[MessageQueueType, MessageQueueComparison]":
        """Return the static comparison data, keyed by product."""
        return {
            MessageQueueType.ROCKETMQ: MessageQueueComparison(
                name="Apache RocketMQ",
                performance=PerformanceMetrics(throughput_qps=100000, latency_ms=1.0, availability=99.95),
                ordered_message=FeatureSupport.EXCELLENT,
                transaction_message=FeatureSupport.EXCELLENT,
                delayed_message=FeatureSupport.EXCELLENT,
                message_filter=FeatureSupport.EXCELLENT,
                message_replay=FeatureSupport.EXCELLENT,
                cluster_support=FeatureSupport.EXCELLENT,
                ops_complexity=FeatureSupport.GOOD,
                ecosystem=FeatureSupport.GOOD
            ),
            MessageQueueType.KAFKA: MessageQueueComparison(
                name="Apache Kafka",
                performance=PerformanceMetrics(throughput_qps=1000000, latency_ms=5.0, availability=99.9),
                ordered_message=FeatureSupport.GOOD,
                transaction_message=FeatureSupport.GOOD,
                delayed_message=FeatureSupport.POOR,
                message_filter=FeatureSupport.FAIR,
                message_replay=FeatureSupport.EXCELLENT,
                cluster_support=FeatureSupport.EXCELLENT,
                ops_complexity=FeatureSupport.FAIR,
                ecosystem=FeatureSupport.EXCELLENT
            ),
            MessageQueueType.RABBITMQ: MessageQueueComparison(
                name="RabbitMQ",
                performance=PerformanceMetrics(throughput_qps=20000, latency_ms=2.0, availability=99.9),
                ordered_message=FeatureSupport.GOOD,
                transaction_message=FeatureSupport.GOOD,
                delayed_message=FeatureSupport.EXCELLENT,
                message_filter=FeatureSupport.EXCELLENT,
                message_replay=FeatureSupport.FAIR,
                cluster_support=FeatureSupport.GOOD,
                ops_complexity=FeatureSupport.EXCELLENT,
                ecosystem=FeatureSupport.EXCELLENT
            ),
            MessageQueueType.ACTIVEMQ: MessageQueueComparison(
                name="Apache ActiveMQ",
                performance=PerformanceMetrics(throughput_qps=10000, latency_ms=10.0, availability=99.5),
                ordered_message=FeatureSupport.GOOD,
                transaction_message=FeatureSupport.EXCELLENT,
                delayed_message=FeatureSupport.GOOD,
                message_filter=FeatureSupport.GOOD,
                message_replay=FeatureSupport.FAIR,
                cluster_support=FeatureSupport.GOOD,
                ops_complexity=FeatureSupport.GOOD,
                ecosystem=FeatureSupport.GOOD
            ),
            MessageQueueType.PULSAR: MessageQueueComparison(
                name="Apache Pulsar",
                performance=PerformanceMetrics(throughput_qps=300000, latency_ms=3.0, availability=99.95),
                ordered_message=FeatureSupport.EXCELLENT,
                transaction_message=FeatureSupport.GOOD,
                delayed_message=FeatureSupport.EXCELLENT,
                message_filter=FeatureSupport.GOOD,
                message_replay=FeatureSupport.EXCELLENT,
                cluster_support=FeatureSupport.EXCELLENT,
                ops_complexity=FeatureSupport.FAIR,
                ecosystem=FeatureSupport.FAIR
            )
        }

    def get_comparison_table(self) -> List[Dict[str, object]]:
        """Return one dict per product, keyed by Chinese column headers."""
        return [
            {
                "消息队列": comparison.name,
                "吞吐量(QPS)": f"{comparison.performance.throughput_qps:,}",
                "延迟(ms)": comparison.performance.latency_ms,
                "可用性(%)": comparison.performance.availability,
                "顺序消息": comparison.ordered_message.value,
                "事务消息": comparison.transaction_message.value,
                "延时消息": comparison.delayed_message.value,
                "消息过滤": comparison.message_filter.value,
                "消息回溯": comparison.message_replay.value,
                "集群支持": comparison.cluster_support.value,
                "运维复杂度": comparison.ops_complexity.value,
                "生态系统": comparison.ecosystem.value
            }
            for comparison in self.comparisons.values()
        ]

    def get_recommendation(self, requirements: Dict[str, str]) -> "MessageQueueType":
        """Recommend a product from a requirements dict (simplified rules).

        The keys are checked in priority order:
        throughput == "ultra_high" -> Kafka;
        transaction == "required" -> RocketMQ;
        ease_of_use == "high" -> RabbitMQ.
        Anything else, including ordered_message == "strict", gets RocketMQ.
        """
        if requirements.get("throughput") == "ultra_high":
            return MessageQueueType.KAFKA
        if requirements.get("transaction") == "required":
            return MessageQueueType.ROCKETMQ
        if requirements.get("ease_of_use") == "high":
            return MessageQueueType.RABBITMQ
        return MessageQueueType.ROCKETMQ

    @staticmethod
    def _display_width(text: str) -> int:
        """Return the terminal column width of ``text``.

        Wide and fullwidth characters (e.g. CJK) count as two columns.
        """
        import unicodedata
        return sum(2 if unicodedata.east_asian_width(ch) in ("W", "F") else 1 for ch in text)

    @classmethod
    def _center(cls, text: str, width: int) -> str:
        """Center ``text`` in ``width`` display columns.

        Like ``format(text, "^N")``, an odd leftover space goes on the right.
        Text that is already wider is returned unchanged.
        """
        padding = max(width - cls._display_width(text), 0)
        left = padding // 2
        return " " * left + text + " " * (padding - left)

    def print_comparison_table(self):
        """Print the comparison table with columns aligned for CJK text.

        Each column is as wide as its widest cell plus one space either side,
        measured in display columns, so double-width Chinese characters line up.
        """
        table = self.get_comparison_table()
        if not table:
            print("没有可对比的数据")
            return

        headers = list(table[0].keys())
        rows = [[str(row[header]) for header in headers] for row in table]
        widths = [
            max([self._display_width(header)] + [self._display_width(row[i]) for row in rows]) + 2
            for i, header in enumerate(headers)
        ]

        header_line = "|".join(self._center(h, w) for h, w in zip(headers, widths))
        rule_width = self._display_width(header_line)

        print("\n" + "=" * rule_width)
        print("消息队列对比表")
        print("=" * rule_width)
        print(header_line)
        print("-" * rule_width)
        for row in rows:
            print("|".join(self._center(cell, w) for cell, w in zip(row, widths)))
        print("=" * rule_width)

# 使用示例
if __name__ == "__main__":
    comparator = MessageQueueComparator()

    # Show the side-by-side comparison first
    comparator.print_comparison_table()

    # Then demonstrate the rule-based recommendation
    requirements = dict(
        throughput="high",
        transaction="required",
        ordered_message="strict",
    )
    recommendation = comparator.get_recommendation(requirements)
    print("\n根据您的需求,推荐使用: " + recommendation.value)

1.2 RocketMQ安装与配置

1.2.1 系统要求

硬件要求:
- CPU: 2核心以上
- 内存: 4GB以上(生产环境建议8GB以上)
- 磁盘: 50GB以上可用空间
- 网络: 千兆网卡

软件要求:
- 操作系统: 64位 Linux/Windows/macOS(推荐Linux/Unix/macOS)
- JDK: 64位 JDK 1.8或以上版本
- Maven: 3.2.x或以上版本(仅源码编译时需要)

1.2.2 下载与安装

import os
import subprocess
import platform
from pathlib import Path
from typing import Optional

class RocketMQInstaller:
    """Download, unpack and prepare a RocketMQ binary distribution."""

    def __init__(self, install_dir: str = "/opt/rocketmq"):
        self.install_dir = Path(install_dir)
        self.version = "4.9.4"
        self.download_url = f"https://archive.apache.org/dist/rocketmq/{self.version}/rocketmq-all-{self.version}-bin-release.zip"
        self.system = platform.system().lower()

    def _zip_path(self) -> Path:
        """Return where the distribution archive is (or will be) stored."""
        return self.install_dir / f"rocketmq-all-{self.version}-bin-release.zip"

    @staticmethod
    def _parse_java_major_version(version_output: str) -> Optional[int]:
        """Extract the Java major version from ``java -version`` output.

        Understands the legacy ``"1.8.0_292"`` scheme (major 8) and the modern
        ``"11.0.2"`` / ``"17"`` scheme.

        Returns:
            The major version, or None if no quoted version string is found
            or it does not start with a number.
        """
        import re
        match = re.search(r'version "([^"]+)"', version_output)
        if match is None:
            return None
        parts = re.split(r"[._+-]", match.group(1))
        try:
            major = int(parts[0])
            if major == 1 and len(parts) > 1:  # legacy "1.x" numbering
                major = int(parts[1])
        except ValueError:
            return None
        return major

    def check_java_version(self) -> bool:
        """Check that ``java`` is on PATH and reports version 8 (1.8) or newer.

        Returns False when java is missing, exits non-zero, or is older than 8.
        If the version string cannot be parsed, a warning is printed and the
        check passes, because java itself ran successfully.
        """
        try:
            result = subprocess.run(["java", "-version"],
                                    capture_output=True, text=True)
        except FileNotFoundError:
            print("未找到Java,请先安装JDK 1.8或以上版本")
            return False

        if result.returncode != 0:
            print("未找到Java,请先安装JDK 1.8或以上版本")
            return False

        # `java -version` prints to stderr; fall back to stdout just in case
        version_output = result.stderr or result.stdout
        major = self._parse_java_major_version(version_output)
        if major is None:
            print(f"无法解析Java版本信息,请确认为JDK 1.8或以上版本: {version_output.strip()}")
            return True

        print(f"Java版本信息: {version_output.splitlines()[0].strip()}")
        if major < 8:
            print(f"Java版本过低(检测到 {major}),请先安装JDK 1.8或以上版本")
            return False
        return True

    def download_rocketmq(self) -> bool:
        """Download the distribution archive into ``install_dir``.

        An existing archive is reused only if it is a valid zip. A truncated
        leftover from an earlier failed download is deleted and fetched again.
        On Linux/macOS wget is tried first, then curl (also when wget is not
        installed). Other platforms get manual download instructions and
        False.
        """
        import zipfile

        try:
            print(f"开始下载RocketMQ {self.version}...")

            self.install_dir.mkdir(parents=True, exist_ok=True)
            download_file = self._zip_path()

            if download_file.exists():
                if zipfile.is_zipfile(str(download_file)):
                    print(f"文件已存在: {download_file}")
                    return True
                print(f"已存在的文件不是有效的zip包,将重新下载: {download_file}")
                download_file.unlink()

            if self.system not in ("linux", "darwin"):
                print("请手动下载RocketMQ安装包")
                print(f"下载地址: {self.download_url}")
                print(f"保存到: {download_file}")
                return False

            commands = [
                ["wget", "-O", str(download_file), self.download_url],
                # -f: fail on HTTP errors instead of saving the error page
                ["curl", "-f", "-L", "-o", str(download_file), self.download_url],
            ]
            for cmd in commands:
                try:
                    subprocess.run(cmd, check=True)
                except (FileNotFoundError, subprocess.CalledProcessError) as e:
                    # FileNotFoundError: the tool itself is not installed
                    print(f"{cmd[0]} 下载失败: {e}")
                else:
                    if zipfile.is_zipfile(str(download_file)):
                        print("下载完成")
                        return True
                    print(f"{cmd[0]} 下载的文件不是有效的zip包")
                # Never leave a partial file behind; the next run would treat
                # it as a completed download.
                if download_file.exists():
                    download_file.unlink()

            print(f"下载失败: {self.download_url}")
            return False

        except Exception as e:
            print(f"下载失败: {e}")
            return False

    def extract_rocketmq(self) -> bool:
        """Unpack the archive and point ``install_dir/current`` at the result.

        Linux/macOS use the system ``unzip``; other platforms use ``zipfile``.
        Except on Windows, any existing ``current`` link, including a dangling
        one, is replaced by a symlink to the extracted directory.
        """
        import zipfile

        try:
            print("开始解压RocketMQ...")

            zip_file = self._zip_path()
            if not zip_file.exists():
                print(f"安装包不存在: {zip_file}")
                return False

            if self.system in ("linux", "darwin"):
                # -o: overwrite without prompting. Without it, re-extracting
                # blocks waiting for an interactive answer.
                cmd = ["unzip", "-o", "-q", str(zip_file), "-d", str(self.install_dir)]
                subprocess.run(cmd, check=True)
            else:
                with zipfile.ZipFile(zip_file, 'r') as zip_ref:
                    zip_ref.extractall(self.install_dir)

            extracted_dir = self.install_dir / f"rocketmq-all-{self.version}-bin-release"
            symlink = self.install_dir / "current"

            # exists() follows the link, so it is False for a dangling link.
            # Check is_symlink() too, or symlink_to() raises FileExistsError.
            if symlink.is_symlink() or symlink.exists():
                symlink.unlink()

            if self.system != "windows":
                symlink.symlink_to(extracted_dir)

            print("解压完成")
            return True

        except Exception as e:
            print(f"解压失败: {e}")
            return False

    def configure_environment(self) -> bool:
        """Write ``install_dir/rocketmq_env.sh`` exporting ROCKETMQ_HOME, PATH and JAVA_OPT."""
        try:
            print("配置环境变量...")

            rocketmq_home = self.install_dir / "current"

            env_config = f"""
# RocketMQ Environment Variables
export ROCKETMQ_HOME={rocketmq_home}
export PATH=$PATH:$ROCKETMQ_HOME/bin

# RocketMQ JVM Options
export JAVA_OPT="-server -Xms1g -Xmx1g -Xmn512m"
"""

            config_file = self.install_dir / "rocketmq_env.sh"
            with open(config_file, 'w', encoding="utf-8") as f:
                f.write(env_config)

            print(f"环境变量配置已保存到: {config_file}")
            print("请执行以下命令加载环境变量:")
            print(f"source {config_file}")

            return True

        except Exception as e:
            print(f"配置环境变量失败: {e}")
            return False

    def install(self) -> bool:
        """Run every install step in order, stopping at the first failure."""
        print("开始安装RocketMQ...")

        steps = (
            self.check_java_version,
            self.download_rocketmq,
            self.extract_rocketmq,
            self.configure_environment,
        )
        for step in steps:
            if not step():
                return False

        print("\nRocketMQ安装完成!")
        print("\n后续步骤:")
        print("1. 加载环境变量")
        print("2. 启动NameServer")
        print("3. 启动Broker")
        print("4. 验证安装")

        return True

# 安装示例
if __name__ == "__main__":
    # Install into the default location used throughout this chapter
    installer = RocketMQInstaller(install_dir="/opt/rocketmq")
    installer.install()

1.2.3 配置文件详解

from dataclasses import dataclass, field
from typing import Dict, List, Optional
from pathlib import Path
import configparser

@dataclass
class NameServerConfig:
    """Settings written to a NameServer ``.properties`` file."""

    listen_port: int = 9876
    server_worker_threads: int = 8
    server_callback_executor_threads: int = 0
    server_selector_threads: int = 3
    server_oneway_semaphore_value: int = 256
    server_async_semaphore_value: int = 64
    kvconfig_path: str = "/tmp/kvconfig.json"
    # NameServer's own persisted config file. The previous default
    # "/tmp/nacos" belonged to a different product (Nacos).
    config_store_path: str = "/tmp/namesrv/namesrv.properties"

    def to_properties(self) -> Dict[str, str]:
        """Return the settings as RocketMQ camelCase property names and string values."""
        return {
            "listenPort": str(self.listen_port),
            "serverWorkerThreads": str(self.server_worker_threads),
            "serverCallbackExecutorThreads": str(self.server_callback_executor_threads),
            "serverSelectorThreads": str(self.server_selector_threads),
            "serverOnewaySemaphoreValue": str(self.server_oneway_semaphore_value),
            "serverAsyncSemaphoreValue": str(self.server_async_semaphore_value),
            "kvConfigPath": self.kvconfig_path,
            "configStorePath": self.config_store_path
        }

@dataclass
class BrokerConfig:
    """Settings written to a Broker ``.properties`` file.

    The store sub-paths (commit log, consume queue, index) default to
    sub-directories of ``store_path_root_dir``. Setting only the root
    therefore moves all of them; an explicitly passed sub-path is kept as-is.
    """
    # Basic settings
    broker_cluster_name: str = "DefaultCluster"
    broker_name: str = "broker-a"
    broker_id: int = 0
    delete_when: str = "04"
    file_reserved_time: int = 72
    broker_role: str = "ASYNC_MASTER"
    flush_disk_type: str = "ASYNC_FLUSH"

    # Network settings
    listen_port: int = 10911
    namesrv_addr: str = "localhost:9876"
    broker_ip1: Optional[str] = None  # written only when set
    broker_ip2: Optional[str] = None  # written only when set

    # Storage settings; None means "derive from store_path_root_dir"
    store_path_root_dir: str = "/tmp/rocketmq/store"
    store_path_commit_log: Optional[str] = None
    store_path_consume_queue: Optional[str] = None
    store_path_index: Optional[str] = None

    # Thread pool settings
    send_message_thread_pool_nums: int = 1
    pull_message_thread_pool_nums: int = 16
    admin_broker_thread_pool_nums: int = 16
    client_manage_thread_pool_nums: int = 32

    # Message / file sizes
    max_message_size: int = 4194304  # 4MB
    map_file_size_commit_log: int = 1073741824  # 1GB
    map_file_size_consume_queue: int = 6000000

    # High-availability settings
    ha_listen_port: int = 10912
    ha_send_heartbeat_interval: int = 5000
    ha_housekeeping_interval: int = 20000

    def __post_init__(self) -> None:
        """Fill any unset store sub-path from ``store_path_root_dir``."""
        root = self.store_path_root_dir.rstrip("/")
        if self.store_path_commit_log is None:
            self.store_path_commit_log = f"{root}/commitlog"
        if self.store_path_consume_queue is None:
            self.store_path_consume_queue = f"{root}/consumequeue"
        if self.store_path_index is None:
            self.store_path_index = f"{root}/index"

    def to_properties(self) -> Dict[str, str]:
        """Return the settings as RocketMQ camelCase property names and string values.

        ``brokerIP1`` / ``brokerIP2`` are included only when set.
        """
        props = {
            "brokerClusterName": self.broker_cluster_name,
            "brokerName": self.broker_name,
            "brokerId": str(self.broker_id),
            "deleteWhen": self.delete_when,
            "fileReservedTime": str(self.file_reserved_time),
            "brokerRole": self.broker_role,
            "flushDiskType": self.flush_disk_type,
            "listenPort": str(self.listen_port),
            "namesrvAddr": self.namesrv_addr,
            "storePathRootDir": self.store_path_root_dir,
            "storePathCommitLog": self.store_path_commit_log,
            "storePathConsumeQueue": self.store_path_consume_queue,
            "storePathIndex": self.store_path_index,
            "sendMessageThreadPoolNums": str(self.send_message_thread_pool_nums),
            "pullMessageThreadPoolNums": str(self.pull_message_thread_pool_nums),
            "adminBrokerThreadPoolNums": str(self.admin_broker_thread_pool_nums),
            "clientManageThreadPoolNums": str(self.client_manage_thread_pool_nums),
            "maxMessageSize": str(self.max_message_size),
            "mapFileSizeCommitLog": str(self.map_file_size_commit_log),
            "mapFileSizeConsumeQueue": str(self.map_file_size_consume_queue),
            "haListenPort": str(self.ha_listen_port),
            "haSendHeartbeatInterval": str(self.ha_send_heartbeat_interval),
            "haHousekeepingInterval": str(self.ha_housekeeping_interval)
        }

        if self.broker_ip1:
            props["brokerIP1"] = self.broker_ip1
        if self.broker_ip2:
            props["brokerIP2"] = self.broker_ip2

        return props

class RocketMQConfigManager:
    """Write RocketMQ NameServer / Broker ``.properties`` files into one directory."""

    # Broker property keys grouped into the commented sections of the file.
    # Keys not listed here are written, sorted, under "Other Configuration".
    _BROKER_SECTIONS = (
        ("Basic Configuration",
         ("brokerClusterName", "brokerName", "brokerId", "brokerRole", "flushDiskType")),
        ("Network Configuration",
         ("listenPort", "namesrvAddr", "brokerIP1", "brokerIP2")),
        ("Storage Configuration",
         ("storePathRootDir", "storePathCommitLog", "storePathConsumeQueue", "storePathIndex")),
    )

    def __init__(self, config_dir: str):
        self.config_dir = Path(config_dir)
        self.config_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _write_keys(f, props: Dict[str, str], keys) -> None:
        """Write a ``key=value`` line for each key in ``keys`` that ``props`` contains."""
        for key in keys:
            if key in props:
                f.write(f"{key}={props[key]}\n")

    def generate_nameserver_config(self, config: "NameServerConfig",
                                   filename: str = "namesrv.properties") -> Path:
        """Write ``config`` to ``config_dir/filename`` and return the file path.

        ``config`` only needs a ``to_properties()`` method returning a dict.
        """
        # Imported locally: this section of the module has no top-level
        # `datetime` import, so relying on a global raised NameError.
        from datetime import datetime

        config_file = self.config_dir / filename
        with open(config_file, 'w', encoding="utf-8") as f:
            f.write("# NameServer Configuration\n")
            f.write(f"# Generated at {datetime.now()}\n\n")
            for key, value in config.to_properties().items():
                f.write(f"{key}={value}\n")

        return config_file

    def generate_broker_config(self, config: "BrokerConfig",
                               filename: str = "broker.properties") -> Path:
        """Write ``config`` to ``config_dir/filename`` grouped into sections.

        ``config`` only needs a ``to_properties()`` method returning a dict.
        """
        from datetime import datetime  # see generate_nameserver_config

        config_file = self.config_dir / filename
        props = config.to_properties()

        with open(config_file, 'w', encoding="utf-8") as f:
            f.write("# Broker Configuration\n")
            f.write(f"# Generated at {datetime.now()}\n\n")

            grouped = set()
            for title, keys in self._BROKER_SECTIONS:
                f.write(f"# {title}\n")
                self._write_keys(f, props, keys)
                f.write("\n")
                grouped.update(keys)

            f.write("# Other Configuration\n")
            self._write_keys(f, props, sorted(set(props) - grouped))

        return config_file

    def generate_cluster_configs(self, cluster_name: str = "DefaultCluster",
                                 nameserver_hosts: Optional[List[str]] = None,
                                 broker_configs: Optional[List[Dict]] = None,
                                 nameserver_port: int = 9876) -> Dict[str, Path]:
        """Write one NameServer file per host and one file per broker entry.

        Args:
            cluster_name: value for every broker's ``brokerClusterName``.
            nameserver_hosts: NameServer hosts; defaults to ``["localhost"]``.
            broker_configs: dicts with "broker_name", "broker_id" and "role",
                plus an optional "flush_disk_type". Defaults to two
                ASYNC_MASTER/SLAVE pairs (broker-a, broker-b).
            nameserver_port: NameServer listen port, used both in the
                NameServer files and in each broker's ``namesrvAddr``.

        Returns:
            Mapping of ``nameserver_<n>`` / ``broker_<name>_<id>`` to file paths.
        """
        if nameserver_hosts is None:
            nameserver_hosts = ["localhost"]

        if broker_configs is None:
            broker_configs = [
                {"broker_name": "broker-a", "broker_id": 0, "role": "ASYNC_MASTER"},
                {"broker_name": "broker-a", "broker_id": 1, "role": "SLAVE"},
                {"broker_name": "broker-b", "broker_id": 0, "role": "ASYNC_MASTER"},
                {"broker_name": "broker-b", "broker_id": 1, "role": "SLAVE"}
            ]

        config_files: Dict[str, Path] = {}

        for index, _host in enumerate(nameserver_hosts, start=1):
            ns_config = NameServerConfig(listen_port=nameserver_port)
            config_files[f"nameserver_{index}"] = self.generate_nameserver_config(
                ns_config, f"namesrv-{index}.properties"
            )

        namesrv_addr = ";".join(f"{host}:{nameserver_port}" for host in nameserver_hosts)

        for broker_cfg in broker_configs:
            name, broker_id = broker_cfg["broker_name"], broker_cfg["broker_id"]
            optional = {}
            if "flush_disk_type" in broker_cfg:
                optional["flush_disk_type"] = broker_cfg["flush_disk_type"]

            broker_config = BrokerConfig(
                broker_cluster_name=cluster_name,
                broker_name=name,
                broker_id=broker_id,
                broker_role=broker_cfg["role"],
                namesrv_addr=namesrv_addr,
                **optional
            )

            filename = f"broker-{name}-{broker_id}.properties"
            config_files[f"broker_{name}_{broker_id}"] = self.generate_broker_config(broker_config, filename)

        return config_files

# 配置生成示例
if __name__ == "__main__":
    # Bind `datetime` at module level; the config generators call datetime.now()
    from datetime import datetime

    config_manager = RocketMQConfigManager("/opt/rocketmq/conf")

    # Two master/slave pairs, masters using synchronous double-write
    broker_layout = [
        ("broker-a", 0, "SYNC_MASTER"),
        ("broker-a", 1, "SLAVE"),
        ("broker-b", 0, "SYNC_MASTER"),
        ("broker-b", 1, "SLAVE"),
    ]
    config_files = config_manager.generate_cluster_configs(
        cluster_name="ProductionCluster",
        nameserver_hosts=["192.168.1.10", "192.168.1.11"],
        broker_configs=[
            {"broker_name": b_name, "broker_id": b_id, "role": b_role}
            for b_name, b_id, b_role in broker_layout
        ],
    )

    print("生成的配置文件:")
    for entry_name, entry_path in config_files.items():
        print(f"{entry_name}: {entry_path}")

1.3 启动与验证

1.3.1 启动脚本

import subprocess
import time
import os
import signal
from pathlib import Path
from typing import List, Dict, Optional

class RocketMQService:
    """Start, stop and inspect local RocketMQ NameServer / Broker processes.

    Started processes are tracked in ``self.processes``, keyed by service name.
    Their stdout/stderr go to a per-service temporary log file instead of an
    unread ``subprocess.PIPE``. A pipe that nobody drains can fill up and
    block the child on its next write.

    On POSIX each service starts in its own session so that stopping it can
    signal the whole process group, including children of the launch script.
    Because of this, services are not interrupted by Ctrl-C sent to this
    Python process.
    """

    def __init__(self, rocketmq_home: str):
        self.rocketmq_home = Path(rocketmq_home)
        self.bin_dir = self.rocketmq_home / "bin"
        self.processes: Dict[str, subprocess.Popen] = {}
        # service name -> open log file receiving that process's output
        self._log_handles: Dict[str, object] = {}

        # Exported into os.environ so launched scripts inherit it
        os.environ["ROCKETMQ_HOME"] = str(self.rocketmq_home)

    @staticmethod
    def _is_nameserver(service_name: str) -> bool:
        """True for names given out by start_nameserver ("nameserver", "nameserver-2", ...)."""
        return service_name == "nameserver" or service_name.startswith("nameserver-")

    def _unique_service_name(self, base: str) -> str:
        """Return ``base``, or ``base-2``, ``base-3``, ... while that name is still running.

        This keeps a second start from overwriting, and so losing, the handle
        of a process that is still alive.
        """
        name, suffix = base, 2
        while name in self.processes and self.processes[name].poll() is None:
            name = f"{base}-{suffix}"
            suffix += 1
        return name

    def _script_command(self, script: str) -> List[str]:
        """Build the command that runs ``bin/<script>`` on this platform."""
        if os.name == 'nt':  # Windows: .cmd launchers
            return [str(self.bin_dir / f"{script}.cmd")]
        return ["sh", str(self.bin_dir / script)]

    def _close_log(self, service_name: str) -> None:
        """Close and forget the log file handle of ``service_name``, if any."""
        handle = self._log_handles.pop(service_name, None)
        if handle is not None:
            handle.close()

    def _signal_service(self, process: subprocess.Popen, force: bool) -> None:
        """Ask ``process`` to terminate, or kill it when ``force`` is True.

        On POSIX the process group is signalled, because the service runs in
        its own session. Signalling only the ``sh`` wrapper could leave child
        processes (such as the JVM it starts) running.
        """
        if os.name == 'nt':
            if force:
                process.kill()
            else:
                process.terminate()
            return

        sig = signal.SIGKILL if force else signal.SIGTERM
        try:
            os.killpg(process.pid, sig)
        except ProcessLookupError:
            # No such process group (already gone): signal the process itself
            process.send_signal(sig)

    def _launch(self, service_name: str, script: str, config_file: Optional[str],
                jvm_options: Optional[str], default_jvm_options: str,
                wait_seconds: float, label: str) -> bool:
        """Start ``bin/<script>`` as ``service_name`` and report whether it survived startup.

        The service counts as started if it is still running after
        ``wait_seconds``; otherwise its captured output is printed. A failed
        process stays tracked, so stop_service can clean it up. ``label`` is
        printed directly before "启动成功" / "启动失败".
        Exceptions propagate to the caller.
        """
        import tempfile

        cmd = self._script_command(script)
        if config_file:
            cmd.extend(["-c", config_file])

        env = os.environ.copy()
        env["JAVA_OPT"] = jvm_options if jvm_options else default_jvm_options

        log = tempfile.NamedTemporaryFile(mode="w", prefix=f"rocketmq-{service_name}-",
                                          suffix=".log", delete=False)
        session_kwargs = {} if os.name == 'nt' else {"start_new_session": True}
        try:
            process = subprocess.Popen(cmd, env=env, stdout=log,
                                       stderr=subprocess.STDOUT, **session_kwargs)
        except BaseException:
            log.close()
            raise

        self._close_log(service_name)  # handle of an earlier, stopped run
        self.processes[service_name] = process
        self._log_handles[service_name] = log

        time.sleep(wait_seconds)

        if process.poll() is None:
            print(f"{label}启动成功")
            print(f"日志文件: {log.name}")
            return True

        output = Path(log.name).read_text(errors="replace")
        print(f"{label}启动失败: {output}")
        return False

    def start_nameserver(self, config_file: Optional[str] = None,
                         jvm_options: Optional[str] = None) -> bool:
        """Start a NameServer; True if it is still running after 3 seconds.

        Tracked as "nameserver", or "nameserver-2", ... when another
        NameServer started by this object is still running.
        """
        try:
            print("启动NameServer...")
            name = self._unique_service_name("nameserver")
            if name != "nameserver":
                print(f"已有NameServer在运行,新实例记为 {name}")
            return self._launch(name, "mqnamesrv", config_file, jvm_options,
                                "-server -Xms1g -Xmx1g -Xmn512m", 3, "NameServer")
        except Exception as e:
            print(f"启动NameServer失败: {e}")
            return False

    def start_broker(self, config_file: Optional[str] = None,
                     jvm_options: Optional[str] = None,
                     broker_name: str = "broker") -> bool:
        """Start a Broker; True if it is still running after 5 seconds.

        Tracked under ``broker_name``, with a ``-2``, ``-3``, ... suffix if a
        Broker with that name is still running.
        """
        try:
            print(f"启动Broker ({broker_name})...")
            name = self._unique_service_name(broker_name)
            return self._launch(name, "mqbroker", config_file, jvm_options,
                                "-server -Xms2g -Xmx2g -Xmn1g", 5, f"Broker ({name}) ")
        except Exception as e:
            print(f"启动Broker失败: {e}")
            return False

    def stop_service(self, service_name: str) -> bool:
        """Stop one tracked service and stop tracking it.

        Sends a graceful stop first and escalates to a forced kill after 10
        seconds. Returns True if the service is stopped or was not tracked,
        and False on an unexpected error.
        """
        try:
            process = self.processes.get(service_name)
            if process is None:
                print(f"服务 {service_name} 未运行")
                return True

            if process.poll() is None:
                print(f"停止服务 {service_name}...")
                self._signal_service(process, force=False)
                try:
                    process.wait(timeout=10)
                    print(f"服务 {service_name} 已停止")
                except subprocess.TimeoutExpired:
                    print(f"强制终止服务 {service_name}")
                    self._signal_service(process, force=True)
                    process.wait()
            else:
                print(f"服务 {service_name} 已经停止")

            del self.processes[service_name]
            self._close_log(service_name)
            return True

        except Exception as e:
            print(f"停止服务失败: {e}")
            return False

    def stop_all(self) -> bool:
        """Stop every tracked service, Brokers first and NameServers last.

        Every service is attempted even if an earlier one fails. Returns True
        only if all of them stopped cleanly.
        """
        names = list(self.processes)
        brokers = [name for name in names if not self._is_nameserver(name)]
        nameservers = [name for name in names if self._is_nameserver(name)]
        results = [self.stop_service(name) for name in brokers + nameservers]
        return all(results)

    def get_service_status(self) -> Dict[str, str]:
        """Map each tracked service name to "running" or "stopped"."""
        return {
            name: "running" if process.poll() is None else "stopped"
            for name, process in self.processes.items()
        }

    def check_ports(self) -> Dict[str, bool]:
        """Report whether something accepts TCP connections on the default local ports.

        The ports are 9876 (NameServer), 10911 (Broker) and 10912 (Broker HA).
        """
        import socket

        ports = {
            "nameserver": 9876,
            "broker": 10911,
            "broker_ha": 10912
        }

        port_status = {}
        for service, port in ports.items():
            key = f"{service}({port})"
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(1.0)
                    port_status[key] = sock.connect_ex(('localhost', port)) == 0
            except OSError:
                port_status[key] = False

        return port_status

# 启动脚本示例
class RocketMQClusterManager:
    """Start, stop and inspect a RocketMQ cluster through a RocketMQService.

    Configuration files are looked up in ``config_dir`` as
    ``namesrv-<n>.properties`` (n is 1-based) and
    ``broker-<broker_name>-<broker_id>.properties``.
    """

    # Seconds to wait after launching each NameServer process.
    NAMESERVER_START_DELAY = 2
    # Extra seconds to wait after all NameServers were launched, before brokers.
    NAMESERVER_READY_DELAY = 5
    # Seconds to wait after launching each broker process.
    BROKER_START_DELAY = 3

    def __init__(self, rocketmq_home: str, config_dir: str):
        self.service = RocketMQService(rocketmq_home)
        self.config_dir = Path(config_dir)

    def start_cluster(self, cluster_config: Dict) -> bool:
        """Start all NameServers, then all brokers, described by ``cluster_config``.

        Args:
            cluster_config: Dict with optional ``"nameservers"`` and
                ``"brokers"`` lists. Only the number of NameServer entries is
                used (to pick config files); each broker entry must provide
                ``"broker_name"`` and ``"broker_id"``.

        Returns:
            True when every service started, False on the first failure or
            exception. Services that already started are NOT stopped here;
            call ``stop_cluster()`` to clean up after a failed start.
        """
        try:
            print("启动RocketMQ集群...")

            nameserver_configs = cluster_config.get("nameservers", [])
            for index, _ in enumerate(nameserver_configs, start=1):
                config_file = self.config_dir / f"namesrv-{index}.properties"
                if not self.service.start_nameserver(str(config_file)):
                    return False
                time.sleep(self.NAMESERVER_START_DELAY)

            # Give the NameServers time to become available before brokers register.
            time.sleep(self.NAMESERVER_READY_DELAY)

            for broker_config in cluster_config.get("brokers", []):
                broker_name = f"{broker_config['broker_name']}-{broker_config['broker_id']}"
                config_file = self.config_dir / f"broker-{broker_name}.properties"

                if not self.service.start_broker(str(config_file), broker_name=broker_name):
                    return False
                time.sleep(self.BROKER_START_DELAY)

            print("RocketMQ集群启动完成")
            return True

        except Exception as e:
            # Includes KeyError for broker entries missing required keys.
            print(f"启动集群失败: {e}")
            return False

    def stop_cluster(self) -> bool:
        """Stop every service managed by the underlying RocketMQService."""
        print("停止RocketMQ集群...")
        return self.service.stop_all()

    def get_cluster_status(self) -> Dict[str, Dict]:
        """Return ``{"services": <process states>, "ports": <port probe results>}``."""
        return {
            "services": self.service.get_service_status(),
            "ports": self.service.check_ports()
        }

# 使用示例
if __name__ == "__main__":
    # Manager for a single-host installation under /opt/rocketmq
    cluster_manager = RocketMQClusterManager(
        rocketmq_home="/opt/rocketmq/current",
        config_dir="/opt/rocketmq/conf",
    )

    # One NameServer plus two broker-a instances (broker ids 0 and 1)
    cluster_config = {
        "nameservers": [{"host": "localhost", "port": 9876}],
        "brokers": [
            {"broker_name": "broker-a", "broker_id": broker_id}
            for broker_id in (0, 1)
        ],
    }

    try:
        started = cluster_manager.start_cluster(cluster_config)
        if started:
            print("\n集群启动成功!")

            # Report process state first, then port reachability
            status = cluster_manager.get_cluster_status()
            print("\n服务状态:")
            for name, state in status["services"].items():
                print(f"  {name}: {state}")

            print("\n端口状态:")
            for label, active in status["ports"].items():
                state_text = "活跃" if active else "未活跃"
                print(f"  {label}: {state_text}")

            # Keep the cluster up until the user presses Enter
            input("\n按回车键停止集群...")

        # Runs whether or not the start succeeded, cleaning up partial starts
        cluster_manager.stop_cluster()
        print("集群已停止")

    except KeyboardInterrupt:
        print("\n收到中断信号,停止集群...")
        cluster_manager.stop_cluster()

1.3.2 验证安装

import subprocess
import json
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class VerificationResult:
    """Outcome of one installation check."""
    test_name: str                  # display name of the check
    success: bool                   # True when the check passed
    message: str                    # human-readable summary
    details: Optional[Dict] = None  # raw command output, when available

class RocketMQVerifier:
    """Verify a RocketMQ installation by running ``mqadmin`` sub-commands.

    Args:
        rocketmq_home: Installation directory; ``bin/mqadmin`` is resolved
            relative to it.
        namesrv_addr: NameServer address passed to every command via ``-n``.
        cluster_name: Cluster used when creating/deleting the test topic.
        timeout: Per-command timeout in seconds.
    """

    def __init__(
        self,
        rocketmq_home: str,
        namesrv_addr: str = "localhost:9876",
        cluster_name: str = "DefaultCluster",
        timeout: float = 10,
    ):
        self.rocketmq_home = rocketmq_home
        self.namesrv_addr = namesrv_addr
        self.cluster_name = cluster_name
        self.timeout = timeout
        self.admin_tool = f"{rocketmq_home}/bin/mqadmin"

    def _run_admin(self, command: str, *options: str) -> subprocess.CompletedProcess:
        """Run ``sh mqadmin <command> -n <namesrv_addr> <options...>``, capturing text output.

        Raises ``subprocess.TimeoutExpired`` if the command exceeds ``self.timeout``.
        """
        cmd = ["sh", self.admin_tool, command, "-n", self.namesrv_addr, *options]
        return subprocess.run(cmd, capture_output=True, text=True, timeout=self.timeout)

    @staticmethod
    def _error_text(result) -> str:
        """Return stderr, falling back to stdout so failures that only wrote to stdout still explain themselves."""
        return (result.stderr or "").strip() or (result.stdout or "").strip()

    def verify_nameserver(self) -> VerificationResult:
        """Check that the NameServer answers an mqadmin query."""
        name = "NameServer连接测试"
        try:
            # NOTE(review): confirm "namesrvStatus" is a sub-command supported
            # by the installed mqadmin version.
            result = self._run_admin("namesrvStatus")
        except Exception as e:
            return VerificationResult(test_name=name, success=False, message=f"测试异常: {e}")

        if result.returncode == 0:
            return VerificationResult(
                test_name=name,
                success=True,
                message="NameServer运行正常",
                details={"output": result.stdout}
            )
        return VerificationResult(
            test_name=name,
            success=False,
            message=f"NameServer连接失败: {self._error_text(result)}"
        )

    def verify_broker(self) -> VerificationResult:
        """Check that ``clusterList`` succeeds and its output mentions brokers."""
        name = "Broker状态测试"
        try:
            result = self._run_admin("clusterList")
        except Exception as e:
            return VerificationResult(test_name=name, success=False, message=f"测试异常: {e}")

        if result.returncode != 0:
            return VerificationResult(
                test_name=name,
                success=False,
                message=f"获取Broker状态失败: {self._error_text(result)}"
            )

        output = result.stdout
        # NOTE(review): these marker strings are assumed to appear in the
        # clusterList output when brokers are registered — confirm against the
        # installed mqadmin version.
        if "BrokerName" in output and "TotalQueues" in output:
            return VerificationResult(
                test_name=name,
                success=True,
                message="Broker运行正常",
                details={"cluster_info": output}
            )
        return VerificationResult(test_name=name, success=False, message="未发现活跃的Broker")

    def verify_topic_operations(self, test_topic: str = "VerificationTestTopic") -> VerificationResult:
        """Create ``test_topic``, query it, then delete it again.

        The topic is deleted whenever creation succeeded, even if the query
        fails or raises, so a failed check does not leave the topic behind.
        """
        name = "Topic操作测试"
        created = False
        try:
            create_result = self._run_admin("updateTopic", "-t", test_topic, "-c", self.cluster_name)
            if create_result.returncode != 0:
                return VerificationResult(
                    test_name=name,
                    success=False,
                    message=f"创建Topic失败: {self._error_text(create_result)}"
                )
            created = True

            query_result = self._run_admin("topicStatus", "-t", test_topic)
            if query_result.returncode != 0:
                return VerificationResult(
                    test_name=name,
                    success=False,
                    message=f"查询Topic失败: {self._error_text(query_result)}"
                )
            return VerificationResult(
                test_name=name,
                success=True,
                message="Topic创建和查询成功",
                details={"topic_info": query_result.stdout}
            )

        except Exception as e:
            return VerificationResult(test_name=name, success=False, message=f"测试异常: {e}")
        finally:
            if created:
                self._delete_topic(test_topic)

    def _delete_topic(self, topic: str) -> None:
        """Best-effort deletion of ``topic`` from ``self.cluster_name``; errors are ignored."""
        try:
            self._run_admin("deleteTopic", "-t", topic, "-c", self.cluster_name)
        except Exception:
            # Cleanup must never mask the verification outcome being returned.
            pass

    def verify_message_operations(self) -> VerificationResult:
        """Placeholder for a message send/receive check.

        No message is actually sent: this always reports success and defers
        the real check to a client-side test (Java or a Python client).
        """
        return VerificationResult(
            test_name="消息操作测试",
            success=True,
            message="消息操作功能可用(需要客户端测试)"
        )

    def run_all_verifications(self) -> List[VerificationResult]:
        """Run every check in order, printing progress, and return all results."""
        print("开始验证RocketMQ安装...")

        checks = [
            ("1. 验证NameServer...", self.verify_nameserver),
            ("2. 验证Broker...", self.verify_broker),
            ("3. 验证Topic操作...", self.verify_topic_operations),
            ("4. 验证消息操作...", self.verify_message_operations),
        ]

        results: List[VerificationResult] = []
        for label, check in checks:
            print(label)
            result = check()
            results.append(result)
            print(f"   结果: {'✓' if result.success else '✗'} {result.message}")

        return results

    def generate_verification_report(self, results: List[VerificationResult]) -> str:
        """Render ``results`` as a plain-text report.

        An empty ``results`` list yields a 0/0 report with a 0.0% rate instead
        of raising ZeroDivisionError, and is not reported as "all passed".
        """
        report = ["\n" + "="*60]
        report.append("RocketMQ安装验证报告")
        report.append("="*60)

        success_count = sum(1 for r in results if r.success)
        total_count = len(results)
        success_rate = success_count / total_count * 100 if total_count else 0.0

        report.append(f"\n总体结果: {success_count}/{total_count} 项测试通过")
        report.append(f"成功率: {success_rate:.1f}%")

        report.append("\n详细结果:")
        for i, result in enumerate(results, 1):
            status = "✓ 通过" if result.success else "✗ 失败"
            report.append(f"{i}. {result.test_name}: {status}")
            report.append(f"   {result.message}")
            if result.details:
                # Keep the report compact: only a 100-character preview of the details.
                report.append(f"   详情: {str(result.details)[:100]}...")

        if total_count == 0:
            report.append("\n⚠️  未执行任何验证测试")
        elif success_count == total_count:
            report.append("\n🎉 恭喜!RocketMQ安装验证全部通过!")
        else:
            report.append("\n⚠️  部分测试失败,请检查配置和服务状态")

        report.append("="*60)

        return "\n".join(report)

# 验证示例
if __name__ == "__main__":
    # Point the verifier at the default install and the local NameServer
    verifier = RocketMQVerifier(
        rocketmq_home="/opt/rocketmq/current",
        namesrv_addr="localhost:9876",
    )

    # Run every check, then print the summary report
    print(verifier.generate_verification_report(verifier.run_all_verifications()))

1.4 常见问题与解决方案

1.4.1 安装问题

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Optional

class ProblemCategory(Enum):
    """Category of a known problem; each value is the display label used in the guide."""
    INSTALLATION = "安装问题"
    CONFIGURATION = "配置问题"
    STARTUP = "启动问题"
    NETWORK = "网络问题"
    PERFORMANCE = "性能问题"
    PERMISSION = "权限问题"

class SeverityLevel(Enum):
    """Severity of a known problem; each value is the display label used in the guide."""
    CRITICAL = "严重"
    HIGH = "高"
    MEDIUM = "中等"
    LOW = "低"

@dataclass
class Solution:
    """One remediation option for a known problem."""
    description: str                      # one-line summary of the fix
    steps: List[str]                      # ordered manual steps
    commands: Optional[List[str]] = None  # shell commands to show, if any
    notes: Optional[List[str]] = None     # extra caveats, if any

@dataclass
class CommonProblem:
    """A known problem: how it shows up, why it happens, and how to fix it."""
    title: str
    category: ProblemCategory
    severity: SeverityLevel
    symptoms: List[str]                       # observable symptoms, used for matching
    causes: List[str]
    solutions: List[Solution]
    related_logs: Optional[List[str]] = None  # optional; not read anywhere in this section

class RocketMQTroubleshooter:
    """Built-in library of common RocketMQ problems with symptom lookup and Markdown export."""

    def __init__(self):
        self.problems = self._initialize_problems()

    def _initialize_problems(self) -> List[CommonProblem]:
        """Build the static library of known problems."""
        return [
            CommonProblem(
                title="Java环境未配置或版本不兼容",
                category=ProblemCategory.INSTALLATION,
                severity=SeverityLevel.CRITICAL,
                symptoms=[
                    "启动时提示找不到java命令",
                    "Java版本过低警告",
                    "JAVA_HOME未设置错误"
                ],
                causes=[
                    "系统未安装JDK",
                    "JDK版本低于1.8",
                    "JAVA_HOME环境变量未设置",
                    "PATH环境变量未包含Java路径"
                ],
                solutions=[
                    Solution(
                        description="安装并配置JDK 1.8+",
                        steps=[
                            "下载并安装JDK 1.8或更高版本",
                            "设置JAVA_HOME环境变量",
                            "将$JAVA_HOME/bin添加到PATH",
                            "验证Java安装"
                        ],
                        commands=[
                            "java -version",
                            "echo $JAVA_HOME",
                            "export JAVA_HOME=/usr/lib/jvm/java-8-openjdk",
                            "export PATH=$PATH:$JAVA_HOME/bin"
                        ]
                    )
                ]
            ),

            CommonProblem(
                title="内存不足导致启动失败",
                category=ProblemCategory.STARTUP,
                severity=SeverityLevel.HIGH,
                symptoms=[
                    "OutOfMemoryError异常",
                    "进程启动后立即退出",
                    "系统响应缓慢"
                ],
                causes=[
                    "JVM堆内存设置过大",
                    "系统可用内存不足",
                    "内存泄漏"
                ],
                solutions=[
                    Solution(
                        description="调整JVM内存参数",
                        steps=[
                            "检查系统可用内存",
                            "调整JVM堆内存大小",
                            "优化GC参数",
                            "监控内存使用情况"
                        ],
                        commands=[
                            "free -h",
                            "export JAVA_OPT='-server -Xms512m -Xmx1g -Xmn256m'",
                            "jstat -gc <pid>"
                        ],
                        notes=[
                            "NameServer建议内存: 1GB",
                            "Broker建议内存: 2GB+",
                            "生产环境建议使用G1GC"
                        ]
                    )
                ]
            ),

            CommonProblem(
                title="端口被占用",
                category=ProblemCategory.NETWORK,
                severity=SeverityLevel.MEDIUM,
                symptoms=[
                    "Address already in use错误",
                    "端口绑定失败",
                    "服务无法启动"
                ],
                causes=[
                    "默认端口被其他程序占用",
                    "防火墙阻止端口访问",
                    "多个实例使用相同端口"
                ],
                solutions=[
                    Solution(
                        description="检查并释放端口",
                        steps=[
                            "检查端口占用情况",
                            "终止占用端口的进程",
                            "修改配置使用其他端口",
                            "配置防火墙规则"
                        ],
                        commands=[
                            "netstat -tulpn | grep :9876",
                            "lsof -i :9876",
                            "kill -9 <pid>",
                            "firewall-cmd --add-port=9876/tcp --permanent"
                        ]
                    )
                ]
            ),

            CommonProblem(
                title="Broker无法连接到NameServer",
                category=ProblemCategory.NETWORK,
                severity=SeverityLevel.HIGH,
                symptoms=[
                    "Broker启动后在NameServer中不可见",
                    "连接超时错误",
                    "路由信息为空"
                ],
                causes=[
                    "NameServer地址配置错误",
                    "网络连接问题",
                    "NameServer未启动",
                    "防火墙阻止连接"
                ],
                solutions=[
                    Solution(
                        description="检查网络连接和配置",
                        steps=[
                            "验证NameServer是否运行",
                            "检查网络连通性",
                            "验证配置文件中的地址",
                            "检查防火墙设置"
                        ],
                        commands=[
                            "telnet <nameserver_ip> 9876",
                            "ping <nameserver_ip>",
                            "sh mqadmin clusterList -n <nameserver_addr>"
                        ]
                    )
                ]
            )
        ]

    def diagnose_by_symptoms(self, symptoms: List[str]) -> List[CommonProblem]:
        """Return the known problems that match at least one observed symptom.

        Matching is case-insensitive substring matching in both directions.
        An observation matches a known symptom when either text contains the
        other, so short keywords ("OutOfMemoryError") and longer log excerpts
        both match. Blank observations are ignored; an empty string would
        otherwise match every problem. Each problem appears at most once, in
        library order.
        """
        needles = [s.strip().lower() for s in symptoms if s and s.strip()]
        if not needles:
            return []

        matched_problems = []
        for problem in self.problems:
            known = [s.lower() for s in problem.symptoms]
            if any(n in k or k in n for n in needles for k in known):
                matched_problems.append(problem)

        return matched_problems

    def get_problems_by_category(self, category: ProblemCategory) -> List[CommonProblem]:
        """Return all problems in ``category``, in library order."""
        return [p for p in self.problems if p.category == category]

    def generate_troubleshooting_guide(self) -> str:
        """Render the whole library as a Markdown guide grouped by category.

        Categories appear in order of first occurrence in ``self.problems``.
        Problems are numbered per category, and solution steps are numbered
        sequentially. All commands of one solution share a single ``bash``
        fence.
        """
        guide = ["# RocketMQ故障排除指南\n"]

        # dicts keep insertion order, so categories follow library order.
        categories: Dict[ProblemCategory, List[CommonProblem]] = {}
        for problem in self.problems:
            categories.setdefault(problem.category, []).append(problem)

        for category, problems in categories.items():
            guide.append(f"## {category.value}\n")

            for i, problem in enumerate(problems, 1):
                guide.append(f"### {i}. {problem.title}")
                guide.append(f"**严重程度**: {problem.severity.value}\n")

                guide.append("**症状**:")
                guide.extend(f"- {symptom}" for symptom in problem.symptoms)
                guide.append("")

                guide.append("**可能原因**:")
                guide.extend(f"- {cause}" for cause in problem.causes)
                guide.append("")

                guide.append("**解决方案**:")
                for j, solution in enumerate(problem.solutions, 1):
                    guide.append(f"**方案{j}**: {solution.description}")
                    guide.append("步骤:")
                    guide.extend(f"  {k}. {step}" for k, step in enumerate(solution.steps, 1))

                    if solution.commands:
                        guide.append("相关命令:")
                        guide.append("```bash")
                        guide.extend(solution.commands)
                        guide.append("```")

                    if solution.notes:
                        guide.append("注意事项:")
                        guide.extend(f"  - {note}" for note in solution.notes)
                    guide.append("")

                guide.append("---\n")

        return "\n".join(guide)

# 使用示例
if __name__ == "__main__":
    troubleshooter = RocketMQTroubleshooter()

    # Look up known problems that match two observed symptoms
    observed = ["OutOfMemoryError", "进程启动后立即退出"]
    hits = troubleshooter.diagnose_by_symptoms(observed)

    print("根据症状匹配的问题:")
    for hit in hits:
        print(f"- {hit.title} ({hit.severity.value})")

    # Print only the first 500 characters of the full guide
    preview = troubleshooter.generate_troubleshooting_guide()[:500]
    print("\n" + preview + "...")

1.5 本章总结

1.5.1 核心知识点

  1. RocketMQ特性:高性能、高可用、低延迟、海量消息堆积
  2. 架构组件:NameServer、Broker、Producer、Consumer
  3. 安装配置:Java环境、下载安装、配置文件、环境变量
  4. 启动验证:服务启动、连接测试、功能验证
  5. 故障排除:识别常见问题并应用对应的解决方案

1.5.2 最佳实践

  1. 环境准备:确保已安装Java 1.8+,并预留充足的内存和磁盘空间
  2. 配置优化:根据业务需求调整JVM参数和Broker配置
  3. 监控验证:定期检查服务状态和性能指标
  4. 故障预防:建立监控告警机制,定期备份配置

1.5.3 练习题

  1. 在本地环境安装RocketMQ并启动单机版集群
  2. 配置一个2Master-2Slave的RocketMQ集群
  3. 编写脚本自动化RocketMQ的启动和停止
  4. 模拟常见故障并使用故障排除指南解决
  5. 优化JVM参数提升RocketMQ性能