6.1 RocketMQ集群架构

6.1.1 集群架构概述

from enum import Enum
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import time
import json

# 集群角色定义
class ClusterRole(Enum):
    """集群角色枚举"""
    NAME_SERVER = "nameserver"
    BROKER_MASTER = "broker_master"
    BROKER_SLAVE = "broker_slave"
    PROXY = "proxy"
    CONSOLE = "console"

# 集群模式定义
class ClusterMode(Enum):
    """集群模式枚举"""
    SINGLE = "single"  # 单机模式
    CLUSTER = "cluster"  # 集群模式
    DLEDGER = "dledger"  # DLedger模式

# 同步策略
class SyncStrategy(Enum):
    """同步策略枚举"""
    SYNC_MASTER = "SYNC_MASTER"  # 同步双写
    ASYNC_MASTER = "ASYNC_MASTER"  # 异步复制

# 刷盘策略
class FlushDiskType(Enum):
    """刷盘策略枚举"""
    SYNC_FLUSH = "SYNC_FLUSH"  # 同步刷盘
    ASYNC_FLUSH = "ASYNC_FLUSH"  # 异步刷盘

@dataclass
class NodeInfo:
    """节点信息"""
    node_id: str
    role: ClusterRole
    host: str
    port: int
    status: str = "UNKNOWN"
    last_update_time: int = 0
    
    def __post_init__(self):
        if self.last_update_time == 0:
            self.last_update_time = int(time.time() * 1000)
    
    def is_online(self) -> bool:
        """检查节点是否在线"""
        current_time = int(time.time() * 1000)
        return (current_time - self.last_update_time) < 30000  # 30秒超时
    
    def get_address(self) -> str:
        """获取节点地址"""
        return f"{self.host}:{self.port}"

@dataclass
class BrokerConfig:
    """Broker配置"""
    broker_name: str
    broker_id: int  # 0表示Master,>0表示Slave
    cluster_name: str
    name_server_addr: str
    
    # 存储配置
    store_path_root_dir: str = "/opt/rocketmq/store"
    store_path_commit_log: str = "/opt/rocketmq/store/commitlog"
    store_path_consume_queue: str = "/opt/rocketmq/store/consumequeue"
    store_path_index: str = "/opt/rocketmq/store/index"
    
    # 性能配置
    send_message_thread_pool_nums: int = 16
    pull_message_thread_pool_nums: int = 32
    admin_broker_thread_pool_nums: int = 16
    client_manager_thread_pool_nums: int = 32
    
    # 同步配置
    broker_role: str = "ASYNC_MASTER"
    flush_disk_type: str = "ASYNC_FLUSH"
    
    # 网络配置
    listen_port: int = 10911
    ha_listen_port: int = 10912
    
    # 其他配置
    delete_when: str = "04"
    file_reserved_time: int = 72
    broker_cluster_name: str = "DefaultCluster"
    
    def to_properties(self) -> str:
        """转换为properties格式"""
        properties = []
        properties.append(f"brokerName={self.broker_name}")
        properties.append(f"brokerId={self.broker_id}")
        properties.append(f"clusterName={self.cluster_name}")
        properties.append(f"namesrvAddr={self.name_server_addr}")
        properties.append(f"storePathRootDir={self.store_path_root_dir}")
        properties.append(f"storePathCommitLog={self.store_path_commit_log}")
        properties.append(f"storePathConsumeQueue={self.store_path_consume_queue}")
        properties.append(f"storePathIndex={self.store_path_index}")
        properties.append(f"sendMessageThreadPoolNums={self.send_message_thread_pool_nums}")
        properties.append(f"pullMessageThreadPoolNums={self.pull_message_thread_pool_nums}")
        properties.append(f"adminBrokerThreadPoolNums={self.admin_broker_thread_pool_nums}")
        properties.append(f"clientManagerThreadPoolNums={self.client_manager_thread_pool_nums}")
        properties.append(f"brokerRole={self.broker_role}")
        properties.append(f"flushDiskType={self.flush_disk_type}")
        properties.append(f"listenPort={self.listen_port}")
        properties.append(f"haListenPort={self.ha_listen_port}")
        properties.append(f"deleteWhen={self.delete_when}")
        properties.append(f"fileReservedTime={self.file_reserved_time}")
        properties.append(f"brokerClusterName={self.broker_cluster_name}")
        
        return "\n".join(properties)

class RocketMQClusterArchitecture:
    """RocketMQ集群架构"""
    
    def __init__(self, cluster_name: str, mode: ClusterMode = ClusterMode.CLUSTER):
        self.cluster_name = cluster_name
        self.mode = mode
        self.nodes: Dict[str, NodeInfo] = {}
        self.brokers: Dict[str, List[NodeInfo]] = {}  # broker_name -> [master, slave1, slave2, ...]
        self.name_servers: List[NodeInfo] = []
        
    def add_name_server(self, host: str, port: int = 9876) -> str:
        """添加NameServer节点"""
        node_id = f"nameserver_{host}_{port}"
        node = NodeInfo(
            node_id=node_id,
            role=ClusterRole.NAME_SERVER,
            host=host,
            port=port,
            status="RUNNING"
        )
        self.nodes[node_id] = node
        self.name_servers.append(node)
        return node_id
    
    def add_broker(self, broker_name: str, host: str, broker_id: int = 0, 
                   port: int = 10911) -> str:
        """添加Broker节点"""
        role = ClusterRole.BROKER_MASTER if broker_id == 0 else ClusterRole.BROKER_SLAVE
        node_id = f"broker_{broker_name}_{broker_id}_{host}_{port}"
        
        node = NodeInfo(
            node_id=node_id,
            role=role,
            host=host,
            port=port,
            status="RUNNING"
        )
        
        self.nodes[node_id] = node
        
        if broker_name not in self.brokers:
            self.brokers[broker_name] = []
        
        # 按broker_id排序插入
        inserted = False
        for i, existing_node in enumerate(self.brokers[broker_name]):
            existing_broker_id = int(existing_node.node_id.split('_')[2])
            if broker_id < existing_broker_id:
                self.brokers[broker_name].insert(i, node)
                inserted = True
                break
        
        if not inserted:
            self.brokers[broker_name].append(node)
        
        return node_id
    
    def get_name_server_addresses(self) -> str:
        """获取NameServer地址列表"""
        addresses = []
        for ns in self.name_servers:
            if ns.is_online():
                addresses.append(ns.get_address())
        return ";".join(addresses)
    
    def get_broker_master(self, broker_name: str) -> Optional[NodeInfo]:
        """获取Broker的Master节点"""
        if broker_name in self.brokers and self.brokers[broker_name]:
            master = self.brokers[broker_name][0]
            if master.role == ClusterRole.BROKER_MASTER:
                return master
        return None
    
    def get_broker_slaves(self, broker_name: str) -> List[NodeInfo]:
        """获取Broker的Slave节点列表"""
        if broker_name in self.brokers:
            return [node for node in self.brokers[broker_name] 
                   if node.role == ClusterRole.BROKER_SLAVE]
        return []
    
    def get_cluster_topology(self) -> Dict[str, Any]:
        """获取集群拓扑信息"""
        topology = {
            "cluster_name": self.cluster_name,
            "mode": self.mode.value,
            "name_servers": [],
            "brokers": {},
            "total_nodes": len(self.nodes),
            "online_nodes": sum(1 for node in self.nodes.values() if node.is_online())
        }
        
        # NameServer信息
        for ns in self.name_servers:
            topology["name_servers"].append({
                "node_id": ns.node_id,
                "address": ns.get_address(),
                "status": "ONLINE" if ns.is_online() else "OFFLINE",
                "last_update": ns.last_update_time
            })
        
        # Broker信息
        for broker_name, broker_nodes in self.brokers.items():
            broker_info = {
                "master": None,
                "slaves": [],
                "total_nodes": len(broker_nodes),
                "online_nodes": sum(1 for node in broker_nodes if node.is_online())
            }
            
            for node in broker_nodes:
                node_info = {
                    "node_id": node.node_id,
                    "address": node.get_address(),
                    "status": "ONLINE" if node.is_online() else "OFFLINE",
                    "last_update": node.last_update_time
                }
                
                if node.role == ClusterRole.BROKER_MASTER:
                    broker_info["master"] = node_info
                else:
                    broker_info["slaves"].append(node_info)
            
            topology["brokers"][broker_name] = broker_info
        
        return topology
    
    def generate_broker_config(self, broker_name: str, broker_id: int) -> BrokerConfig:
        """生成Broker配置"""
        name_server_addr = self.get_name_server_addresses()
        
        # 根据broker_id确定角色
        if broker_id == 0:
            broker_role = "ASYNC_MASTER"
        else:
            broker_role = "SLAVE"
        
        config = BrokerConfig(
            broker_name=broker_name,
            broker_id=broker_id,
            cluster_name=self.cluster_name,
            name_server_addr=name_server_addr,
            broker_role=broker_role
        )
        
        return config
    
    def validate_cluster(self) -> List[str]:
        """验证集群配置"""
        issues = []
        
        # 检查NameServer
        if not self.name_servers:
            issues.append("集群中没有NameServer节点")
        elif len(self.name_servers) < 2:
            issues.append("建议至少部署2个NameServer节点以保证高可用")
        
        # 检查Broker
        if not self.brokers:
            issues.append("集群中没有Broker节点")
        else:
            for broker_name, broker_nodes in self.brokers.items():
                # 检查是否有Master
                has_master = any(node.role == ClusterRole.BROKER_MASTER 
                               for node in broker_nodes)
                if not has_master:
                    issues.append(f"Broker组 {broker_name} 没有Master节点")
                
                # 检查Slave数量
                slave_count = sum(1 for node in broker_nodes 
                                if node.role == ClusterRole.BROKER_SLAVE)
                if slave_count == 0:
                    issues.append(f"Broker组 {broker_name} 没有Slave节点,建议至少部署1个Slave")
        
        # 检查节点在线状态
        offline_nodes = [node.node_id for node in self.nodes.values() 
                        if not node.is_online()]
        if offline_nodes:
            issues.append(f"以下节点离线: {', '.join(offline_nodes)}")
        
        return issues
    
    def print_cluster_info(self):
        """打印集群信息"""
        print(f"RocketMQ集群信息 - {self.cluster_name}")
        print("=" * 60)
        print(f"集群模式: {self.mode.value}")
        print(f"总节点数: {len(self.nodes)}")
        print(f"在线节点数: {sum(1 for node in self.nodes.values() if node.is_online())}")
        
        print("\nNameServer节点:")
        for ns in self.name_servers:
            status = "在线" if ns.is_online() else "离线"
            print(f"  - {ns.get_address()} [{status}]")
        
        print("\nBroker节点:")
        for broker_name, broker_nodes in self.brokers.items():
            print(f"  {broker_name}:")
            for node in broker_nodes:
                role = "Master" if node.role == ClusterRole.BROKER_MASTER else "Slave"
                status = "在线" if node.is_online() else "离线"
                print(f"    - {node.get_address()} [{role}] [{status}]")
        
        # 打印验证结果
        issues = self.validate_cluster()
        if issues:
            print("\n⚠️  集群配置问题:")
            for issue in issues:
                print(f"  - {issue}")
        else:
            print("\n✅ 集群配置正常")

6.1.2 集群部署模式

# 集群部署模式
class ClusterDeploymentMode:
    """集群部署模式"""
    
    @staticmethod
    def create_single_master_cluster(cluster_name: str) -> RocketMQClusterArchitecture:
        """创建单Master集群"""
        cluster = RocketMQClusterArchitecture(cluster_name, ClusterMode.SINGLE)
        
        # 添加NameServer
        cluster.add_name_server("192.168.1.10", 9876)
        
        # 添加单个Master Broker
        cluster.add_broker("broker-a", "192.168.1.11", 0, 10911)
        
        return cluster
    
    @staticmethod
    def create_multi_master_cluster(cluster_name: str, master_count: int = 2) -> RocketMQClusterArchitecture:
        """创建多Master集群"""
        cluster = RocketMQClusterArchitecture(cluster_name, ClusterMode.CLUSTER)
        
        # 添加NameServer集群
        for i in range(2):
            cluster.add_name_server(f"192.168.1.{10 + i}", 9876)
        
        # 添加多个Master Broker
        for i in range(master_count):
            broker_name = f"broker-{chr(ord('a') + i)}"
            cluster.add_broker(broker_name, f"192.168.1.{20 + i}", 0, 10911)
        
        return cluster
    
    @staticmethod
    def create_master_slave_cluster(cluster_name: str, broker_groups: int = 2) -> RocketMQClusterArchitecture:
        """创建Master-Slave集群"""
        cluster = RocketMQClusterArchitecture(cluster_name, ClusterMode.CLUSTER)
        
        # 添加NameServer集群
        for i in range(2):
            cluster.add_name_server(f"192.168.1.{10 + i}", 9876)
        
        # 添加Broker组(每组包含1个Master和1个Slave)
        for i in range(broker_groups):
            broker_name = f"broker-{chr(ord('a') + i)}"
            
            # Master节点
            cluster.add_broker(broker_name, f"192.168.1.{20 + i * 2}", 0, 10911)
            
            # Slave节点
            cluster.add_broker(broker_name, f"192.168.1.{21 + i * 2}", 1, 10911)
        
        return cluster
    
    @staticmethod
    def create_dledger_cluster(cluster_name: str, broker_groups: int = 1, 
                              replicas_per_group: int = 3) -> RocketMQClusterArchitecture:
        """创建DLedger集群"""
        cluster = RocketMQClusterArchitecture(cluster_name, ClusterMode.DLEDGER)
        
        # 添加NameServer集群
        for i in range(2):
            cluster.add_name_server(f"192.168.1.{10 + i}", 9876)
        
        # 添加DLedger Broker组
        for group_id in range(broker_groups):
            broker_name = f"broker-{chr(ord('a') + group_id)}"
            
            for replica_id in range(replicas_per_group):
                # DLedger模式下,所有节点都是潜在的Master
                host = f"192.168.1.{30 + group_id * replicas_per_group + replica_id}"
                cluster.add_broker(broker_name, host, replica_id, 10911)
        
        return cluster

# 集群部署示例
class ClusterDeploymentExample:
    """集群部署示例"""
    
    def __init__(self):
        self.clusters: Dict[str, RocketMQClusterArchitecture] = {}
    
    def create_development_cluster(self) -> str:
        """创建开发环境集群"""
        cluster_name = "dev-cluster"
        cluster = ClusterDeploymentMode.create_single_master_cluster(cluster_name)
        self.clusters[cluster_name] = cluster
        
        print(f"创建开发环境集群: {cluster_name}")
        cluster.print_cluster_info()
        
        return cluster_name
    
    def create_testing_cluster(self) -> str:
        """创建测试环境集群"""
        cluster_name = "test-cluster"
        cluster = ClusterDeploymentMode.create_multi_master_cluster(cluster_name, 2)
        self.clusters[cluster_name] = cluster
        
        print(f"创建测试环境集群: {cluster_name}")
        cluster.print_cluster_info()
        
        return cluster_name
    
    def create_production_cluster(self) -> str:
        """创建生产环境集群"""
        cluster_name = "prod-cluster"
        cluster = ClusterDeploymentMode.create_master_slave_cluster(cluster_name, 3)
        self.clusters[cluster_name] = cluster
        
        print(f"创建生产环境集群: {cluster_name}")
        cluster.print_cluster_info()
        
        return cluster_name
    
    def create_high_availability_cluster(self) -> str:
        """创建高可用集群"""
        cluster_name = "ha-cluster"
        cluster = ClusterDeploymentMode.create_dledger_cluster(cluster_name, 2, 3)
        self.clusters[cluster_name] = cluster
        
        print(f"创建高可用集群: {cluster_name}")
        cluster.print_cluster_info()
        
        return cluster_name
    
    def get_cluster(self, cluster_name: str) -> Optional[RocketMQClusterArchitecture]:
        """获取集群"""
        return self.clusters.get(cluster_name)
    
    def list_clusters(self) -> List[str]:
        """列出所有集群"""
        return list(self.clusters.keys())
    
    def compare_clusters(self):
        """比较不同集群配置"""
        print("集群配置对比:")
        print("=" * 80)
        
        headers = ["集群名称", "模式", "NameServer数", "Broker组数", "总节点数", "适用场景"]
        print(f"{headers[0]:<15} {headers[1]:<10} {headers[2]:<12} {headers[3]:<10} {headers[4]:<8} {headers[5]}")
        print("-" * 80)
        
        scenarios = {
            "dev-cluster": "开发环境",
            "test-cluster": "测试环境",
            "prod-cluster": "生产环境",
            "ha-cluster": "高可用环境"
        }
        
        for cluster_name, cluster in self.clusters.items():
            topology = cluster.get_cluster_topology()
            ns_count = len(topology["name_servers"])
            broker_count = len(topology["brokers"])
            total_nodes = topology["total_nodes"]
            scenario = scenarios.get(cluster_name, "未知")
            
            print(f"{cluster_name:<15} {cluster.mode.value:<10} {ns_count:<12} {broker_count:<10} {total_nodes:<8} {scenario}")

# 使用示例
if __name__ == "__main__":
    # 创建集群部署示例
    deployment = ClusterDeploymentExample()
    
    # 创建不同环境的集群
    deployment.create_development_cluster()
    print("\n" + "=" * 80 + "\n")
    
    deployment.create_testing_cluster()
    print("\n" + "=" * 80 + "\n")
    
    deployment.create_production_cluster()
    print("\n" + "=" * 80 + "\n")
    
    deployment.create_high_availability_cluster()
    print("\n" + "=" * 80 + "\n")
    
    # 比较集群配置
    deployment.compare_clusters()

6.2 集群安装与配置

6.2.1 环境准备

import os
import subprocess
import platform
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass

@dataclass
class SystemRequirement:
    """系统要求"""
    min_cpu_cores: int
    min_memory_gb: int
    min_disk_gb: int
    java_version: str
    os_versions: List[str]
    
    def check_cpu(self) -> Tuple[bool, str]:
        """检查CPU"""
        try:
            cpu_count = os.cpu_count()
            if cpu_count >= self.min_cpu_cores:
                return True, f"CPU核心数: {cpu_count} (满足要求: {self.min_cpu_cores})"
            else:
                return False, f"CPU核心数不足: {cpu_count} < {self.min_cpu_cores}"
        except Exception as e:
            return False, f"无法检查CPU: {str(e)}"
    
    def check_memory(self) -> Tuple[bool, str]:
        """检查内存"""
        try:
            if platform.system() == "Linux":
                with open('/proc/meminfo', 'r') as f:
                    for line in f:
                        if line.startswith('MemTotal:'):
                            mem_kb = int(line.split()[1])
                            mem_gb = mem_kb / 1024 / 1024
                            if mem_gb >= self.min_memory_gb:
                                return True, f"内存大小: {mem_gb:.1f}GB (满足要求: {self.min_memory_gb}GB)"
                            else:
                                return False, f"内存不足: {mem_gb:.1f}GB < {self.min_memory_gb}GB"
            else:
                return True, "跳过内存检查(非Linux系统)"
        except Exception as e:
            return False, f"无法检查内存: {str(e)}"
    
    def check_disk(self, path: str = "/") -> Tuple[bool, str]:
        """检查磁盘空间"""
        try:
            statvfs = os.statvfs(path)
            free_gb = (statvfs.f_frsize * statvfs.f_bavail) / (1024**3)
            if free_gb >= self.min_disk_gb:
                return True, f"可用磁盘空间: {free_gb:.1f}GB (满足要求: {self.min_disk_gb}GB)"
            else:
                return False, f"磁盘空间不足: {free_gb:.1f}GB < {self.min_disk_gb}GB"
        except Exception as e:
            return False, f"无法检查磁盘空间: {str(e)}"
    
    def check_java(self) -> Tuple[bool, str]:
        """检查Java版本"""
        try:
            result = subprocess.run(['java', '-version'], 
                                  capture_output=True, text=True, stderr=subprocess.STDOUT)
            if result.returncode == 0:
                java_info = result.stderr or result.stdout
                if self.java_version in java_info:
                    return True, f"Java版本检查通过: {java_info.split()[2]}"
                else:
                    return False, f"Java版本不匹配,要求: {self.java_version}"
            else:
                return False, "Java未安装或无法执行"
        except Exception as e:
            return False, f"无法检查Java版本: {str(e)}"
    
    def check_os(self) -> Tuple[bool, str]:
        """检查操作系统"""
        try:
            current_os = platform.system()
            os_version = platform.release()
            os_info = f"{current_os} {os_version}"
            
            for supported_os in self.os_versions:
                if supported_os.lower() in current_os.lower():
                    return True, f"操作系统: {os_info} (支持)"
            
            return False, f"操作系统不支持: {os_info}"
        except Exception as e:
            return False, f"无法检查操作系统: {str(e)}"

class EnvironmentChecker:
    """环境检查器"""
    
    def __init__(self):
        # 定义不同角色的系统要求
        self.requirements = {
            "nameserver": SystemRequirement(
                min_cpu_cores=2,
                min_memory_gb=2,
                min_disk_gb=10,
                java_version="1.8",
                os_versions=["Linux", "Windows", "Darwin"]
            ),
            "broker": SystemRequirement(
                min_cpu_cores=4,
                min_memory_gb=8,
                min_disk_gb=100,
                java_version="1.8",
                os_versions=["Linux", "Windows", "Darwin"]
            ),
            "proxy": SystemRequirement(
                min_cpu_cores=2,
                min_memory_gb=4,
                min_disk_gb=20,
                java_version="1.8",
                os_versions=["Linux", "Windows", "Darwin"]
            )
        }
    
    def check_environment(self, role: str, storage_path: str = "/opt/rocketmq") -> Dict[str, Any]:
        """检查环境"""
        if role not in self.requirements:
            return {"error": f"未知角色: {role}"}
        
        requirement = self.requirements[role]
        results = {
            "role": role,
            "checks": {},
            "passed": True,
            "summary": []
        }
        
        # 执行各项检查
        checks = [
            ("CPU", requirement.check_cpu),
            ("内存", requirement.check_memory),
            ("磁盘", lambda: requirement.check_disk(storage_path)),
            ("Java", requirement.check_java),
            ("操作系统", requirement.check_os)
        ]
        
        for check_name, check_func in checks:
            try:
                passed, message = check_func()
                results["checks"][check_name] = {
                    "passed": passed,
                    "message": message
                }
                
                if not passed:
                    results["passed"] = False
                
                results["summary"].append(f"{'✅' if passed else '❌'} {check_name}: {message}")
            except Exception as e:
                results["checks"][check_name] = {
                    "passed": False,
                    "message": f"检查失败: {str(e)}"
                }
                results["passed"] = False
                results["summary"].append(f"❌ {check_name}: 检查失败: {str(e)}")
        
        return results
    
    def check_network_connectivity(self, hosts: List[str], port: int = 22) -> Dict[str, Any]:
        """检查网络连通性"""
        results = {
            "port": port,
            "connectivity": {},
            "all_reachable": True
        }
        
        for host in hosts:
            try:
                # 简单的网络连通性检查(这里使用ping)
                if platform.system() == "Windows":
                    cmd = ["ping", "-n", "1", "-w", "3000", host]
                else:
                    cmd = ["ping", "-c", "1", "-W", "3", host]
                
                result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
                
                if result.returncode == 0:
                    results["connectivity"][host] = {
                        "reachable": True,
                        "message": "连接正常"
                    }
                else:
                    results["connectivity"][host] = {
                        "reachable": False,
                        "message": "无法连接"
                    }
                    results["all_reachable"] = False
            
            except subprocess.TimeoutExpired:
                results["connectivity"][host] = {
                    "reachable": False,
                    "message": "连接超时"
                }
                results["all_reachable"] = False
            except Exception as e:
                results["connectivity"][host] = {
                    "reachable": False,
                    "message": f"检查失败: {str(e)}"
                }
                results["all_reachable"] = False
        
        return results
    
    def generate_environment_report(self, cluster: RocketMQClusterArchitecture) -> str:
        """生成环境检查报告"""
        report = []
        report.append(f"RocketMQ集群环境检查报告 - {cluster.cluster_name}")
        report.append("=" * 60)
        report.append(f"检查时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"集群模式: {cluster.mode.value}")
        report.append("")
        
        # 检查NameServer节点
        report.append("NameServer节点检查:")
        report.append("-" * 30)
        for ns in cluster.name_servers:
            report.append(f"节点: {ns.get_address()}")
            env_result = self.check_environment("nameserver")
            for summary_line in env_result["summary"]:
                report.append(f"  {summary_line}")
            report.append("")
        
        # 检查Broker节点
        report.append("Broker节点检查:")
        report.append("-" * 30)
        for broker_name, broker_nodes in cluster.brokers.items():
            report.append(f"Broker组: {broker_name}")
            for node in broker_nodes:
                role_name = "Master" if node.role == ClusterRole.BROKER_MASTER else "Slave"
                report.append(f"  {role_name}节点: {node.get_address()}")
                env_result = self.check_environment("broker")
                for summary_line in env_result["summary"]:
                    report.append(f"    {summary_line}")
            report.append("")
        
        # 网络连通性检查
        all_hosts = []
        for node in cluster.nodes.values():
            all_hosts.append(node.host)
        
        if all_hosts:
            report.append("网络连通性检查:")
            report.append("-" * 30)
            connectivity_result = self.check_network_connectivity(list(set(all_hosts)))
            for host, conn_info in connectivity_result["connectivity"].items():
                status = "✅" if conn_info["reachable"] else "❌"
                report.append(f"{status} {host}: {conn_info['message']}")
            report.append("")
        
        # 总结
        report.append("检查总结:")
        report.append("-" * 30)
        
        # 统计检查结果
        total_nodes = len(cluster.nodes)
        # 这里简化处理,实际应该根据真实检查结果统计
        report.append(f"总节点数: {total_nodes}")
        report.append(f"环境检查: 请查看上述详细结果")
        report.append(f"网络连通性: {'正常' if connectivity_result.get('all_reachable', False) else '存在问题'}")
        
        return "\n".join(report)
    
    def print_environment_check(self, role: str):
        """打印环境检查结果"""
        print(f"RocketMQ {role.upper()} 环境检查")
        print("=" * 50)
        
        result = self.check_environment(role)
        
        if "error" in result:
            print(f"❌ {result['error']}")
            return
        
        for summary_line in result["summary"]:
            print(summary_line)
        
        print("\n总体结果:")
        if result["passed"]:
            print("✅ 环境检查通过,可以安装RocketMQ")
        else:
            print("❌ 环境检查未通过,请解决上述问题后重试")

# 使用示例
if __name__ == "__main__":
    # 创建环境检查器
    checker = EnvironmentChecker()
    
    # 检查NameServer环境
    print("检查NameServer环境:")
    checker.print_environment_check("nameserver")
    
    print("\n" + "=" * 60 + "\n")
    
    # 检查Broker环境
    print("检查Broker环境:")
    checker.print_environment_check("broker")
    
    print("\n" + "=" * 60 + "\n")
    
    # 生成集群环境报告
    cluster = ClusterDeploymentMode.create_production_cluster("prod-cluster")
    report = checker.generate_environment_report(cluster)
    print("集群环境检查报告:")
    print(report)

6.2.2 软件安装

import shutil
import tarfile
import zipfile
import urllib.request
from pathlib import Path

class RocketMQInstaller:
    """RocketMQ安装器"""
    
    def __init__(self, install_dir: str = "/opt/rocketmq"):
        self.install_dir = Path(install_dir)
        self.download_dir = Path("/tmp/rocketmq_download")
        self.version = "5.1.4"
        self.download_urls = {
            "binary": f"https://archive.apache.org/dist/rocketmq/{self.version}/rocketmq-all-{self.version}-bin-release.zip",
            "source": f"https://archive.apache.org/dist/rocketmq/{self.version}/rocketmq-all-{self.version}-source-release.zip"
        }
    
    def create_directories(self):
        """创建必要的目录"""
        directories = [
            self.install_dir,
            self.install_dir / "logs",
            self.install_dir / "store",
            self.install_dir / "conf",
            self.download_dir
        ]
        
        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)
            print(f"创建目录: {directory}")
    
    def download_rocketmq(self, package_type: str = "binary") -> Path:
        """下载RocketMQ"""
        if package_type not in self.download_urls:
            raise ValueError(f"不支持的包类型: {package_type}")
        
        url = self.download_urls[package_type]
        filename = url.split("/")[-1]
        download_path = self.download_dir / filename
        
        if download_path.exists():
            print(f"文件已存在: {download_path}")
            return download_path
        
        print(f"开始下载: {url}")
        try:
            urllib.request.urlretrieve(url, download_path)
            print(f"下载完成: {download_path}")
            return download_path
        except Exception as e:
            print(f"下载失败: {str(e)}")
            raise
    
    def extract_package(self, package_path: Path) -> Path:
        """解压安装包"""
        print(f"开始解压: {package_path}")
        
        if package_path.suffix == '.zip':
            with zipfile.ZipFile(package_path, 'r') as zip_ref:
                zip_ref.extractall(self.download_dir)
        elif package_path.suffix in ['.tar', '.gz']:
            with tarfile.open(package_path, 'r:*') as tar_ref:
                tar_ref.extractall(self.download_dir)
        else:
            raise ValueError(f"不支持的文件格式: {package_path.suffix}")
        
        # 查找解压后的目录
        extracted_dirs = [d for d in self.download_dir.iterdir() 
                         if d.is_dir() and 'rocketmq' in d.name.lower()]
        
        if not extracted_dirs:
            raise RuntimeError("未找到解压后的RocketMQ目录")
        
        extracted_dir = extracted_dirs[0]
        print(f"解压完成: {extracted_dir}")
        return extracted_dir
    
    def install_rocketmq(self, extracted_dir: Path):
        """安装RocketMQ"""
        print(f"开始安装RocketMQ到: {self.install_dir}")
        
        # 复制文件
        for item in extracted_dir.iterdir():
            dest = self.install_dir / item.name
            if item.is_dir():
                if dest.exists():
                    shutil.rmtree(dest)
                shutil.copytree(item, dest)
            else:
                shutil.copy2(item, dest)
            print(f"复制: {item.name}")
        
        # 设置执行权限
        bin_dir = self.install_dir / "bin"
        if bin_dir.exists():
            for script in bin_dir.glob("*.sh"):
                script.chmod(0o755)
                print(f"设置执行权限: {script.name}")
    
    def create_service_scripts(self):
        """创建服务启动脚本"""
        scripts_dir = self.install_dir / "scripts"
        scripts_dir.mkdir(exist_ok=True)
        
        # NameServer启动脚本
        nameserver_script = scripts_dir / "start-nameserver.sh"
        nameserver_content = f"""#!/bin/bash
# RocketMQ NameServer启动脚本

export ROCKETMQ_HOME={self.install_dir}
export JAVA_HOME=${{JAVA_HOME:-/usr/lib/jvm/java-8-openjdk}}

# JVM参数
export JAVA_OPT="-server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
export JAVA_OPT="$JAVA_OPT -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection"
export JAVA_OPT="$JAVA_OPT -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled"
export JAVA_OPT="$JAVA_OPT -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled"
export JAVA_OPT="$JAVA_OPT -XX:SurvivorRatio=8 -XX:-UseParNewGC"
export JAVA_OPT="$JAVA_OPT -verbose:gc -Xloggc:$ROCKETMQ_HOME/logs/rmq_srv_gc.log"
export JAVA_OPT="$JAVA_OPT -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
export JAVA_OPT="$JAVA_OPT -Djava.ext.dirs=$JAVA_HOME/jre/lib/ext:$ROCKETMQ_HOME/lib"
export JAVA_OPT="$JAVA_OPT -cp $ROCKETMQ_HOME/conf:$CLASSPATH"

echo "Starting RocketMQ NameServer..."
nohup java $JAVA_OPT org.apache.rocketmq.namesrv.NamesrvStartup > $ROCKETMQ_HOME/logs/namesrv.log 2>&1 &
echo $! > $ROCKETMQ_HOME/logs/namesrv.pid
echo "NameServer started, PID: $(cat $ROCKETMQ_HOME/logs/namesrv.pid)"
"""
        
        with open(nameserver_script, 'w') as f:
            f.write(nameserver_content)
        nameserver_script.chmod(0o755)
        
        # Broker启动脚本
        broker_script = scripts_dir / "start-broker.sh"
        broker_content = f"""#!/bin/bash
# RocketMQ Broker启动脚本

if [ $# -lt 1 ]; then
    echo "Usage: $0 <config-file>"
    exit 1
fi

CONFIG_FILE=$1

export ROCKETMQ_HOME={self.install_dir}
export JAVA_HOME=${{JAVA_HOME:-/usr/lib/jvm/java-8-openjdk}}

# JVM参数
export JAVA_OPT="-server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
export JAVA_OPT="$JAVA_OPT -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection"
export JAVA_OPT="$JAVA_OPT -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled"
export JAVA_OPT="$JAVA_OPT -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled"
export JAVA_OPT="$JAVA_OPT -XX:SurvivorRatio=8 -XX:-UseParNewGC"
export JAVA_OPT="$JAVA_OPT -verbose:gc -Xloggc:$ROCKETMQ_HOME/logs/rmq_broker_gc.log"
export JAVA_OPT="$JAVA_OPT -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
export JAVA_OPT="$JAVA_OPT -Djava.ext.dirs=$JAVA_HOME/jre/lib/ext:$ROCKETMQ_HOME/lib"
export JAVA_OPT="$JAVA_OPT -cp $ROCKETMQ_HOME/conf:$CLASSPATH"

echo "Starting RocketMQ Broker with config: $CONFIG_FILE"
nohup java $JAVA_OPT org.apache.rocketmq.broker.BrokerStartup -c $CONFIG_FILE > $ROCKETMQ_HOME/logs/broker.log 2>&1 &
echo $! > $ROCKETMQ_HOME/logs/broker.pid
echo "Broker started, PID: $(cat $ROCKETMQ_HOME/logs/broker.pid)"
"""
        
        with open(broker_script, 'w') as f:
            f.write(broker_content)
        broker_script.chmod(0o755)
        
        # 停止脚本
        stop_script = scripts_dir / "stop-all.sh"
        stop_content = f"""#!/bin/bash
# RocketMQ停止脚本

export ROCKETMQ_HOME={self.install_dir}

echo "Stopping RocketMQ services..."

# 停止Broker
if [ -f $ROCKETMQ_HOME/logs/broker.pid ]; then
    BROKER_PID=$(cat $ROCKETMQ_HOME/logs/broker.pid)
    if ps -p $BROKER_PID > /dev/null; then
        echo "Stopping Broker (PID: $BROKER_PID)..."
        kill $BROKER_PID
        sleep 5
        if ps -p $BROKER_PID > /dev/null; then
            echo "Force killing Broker..."
            kill -9 $BROKER_PID
        fi
    fi
    rm -f $ROCKETMQ_HOME/logs/broker.pid
fi

# 停止NameServer
if [ -f $ROCKETMQ_HOME/logs/namesrv.pid ]; then
    NAMESRV_PID=$(cat $ROCKETMQ_HOME/logs/namesrv.pid)
    if ps -p $NAMESRV_PID > /dev/null; then
        echo "Stopping NameServer (PID: $NAMESRV_PID)..."
        kill $NAMESRV_PID
        sleep 5
        if ps -p $NAMESRV_PID > /dev/null; then
            echo "Force killing NameServer..."
            kill -9 $NAMESRV_PID
        fi
    fi
    rm -f $ROCKETMQ_HOME/logs/namesrv.pid
fi

echo "RocketMQ services stopped."
"""
        
        with open(stop_script, 'w') as f:
            f.write(stop_content)
        stop_script.chmod(0o755)
        
        print(f"服务脚本创建完成: {scripts_dir}")
    
    def install(self, package_type: str = "binary") -> bool:
        """执行完整安装流程"""
        try:
            print("开始RocketMQ安装流程...")
            
            # 1. 创建目录
            self.create_directories()
            
            # 2. 下载软件包
            package_path = self.download_rocketmq(package_type)
            
            # 3. 解压软件包
            extracted_dir = self.extract_package(package_path)
            
            # 4. 安装软件
            self.install_rocketmq(extracted_dir)
            
            # 5. 创建服务脚本
            self.create_service_scripts()
            
            print(f"\n✅ RocketMQ安装完成!")
            print(f"安装目录: {self.install_dir}")
            print(f"启动脚本: {self.install_dir}/scripts/")
            print(f"配置文件: {self.install_dir}/conf/")
            print(f"日志目录: {self.install_dir}/logs/")
            
            return True
            
        except Exception as e:
            print(f"❌ 安装失败: {str(e)}")
            return False
    
    def uninstall(self) -> bool:
        """卸载RocketMQ"""
        try:
            print(f"开始卸载RocketMQ: {self.install_dir}")
            
            # 停止服务
            stop_script = self.install_dir / "scripts" / "stop-all.sh"
            if stop_script.exists():
                subprocess.run([str(stop_script)], check=False)
            
            # 删除安装目录
            if self.install_dir.exists():
                shutil.rmtree(self.install_dir)
                print(f"删除安装目录: {self.install_dir}")
            
            # 清理下载目录
            if self.download_dir.exists():
                shutil.rmtree(self.download_dir)
                print(f"清理下载目录: {self.download_dir}")
            
            print("✅ RocketMQ卸载完成")
            return True
            
        except Exception as e:
            print(f"❌ 卸载失败: {str(e)}")
            return False

class ConfigurationGenerator:
    """配置文件生成器"""
    
    def __init__(self, install_dir: str = "/opt/rocketmq"):
        self.install_dir = Path(install_dir)
        self.conf_dir = self.install_dir / "conf"
    
    def generate_nameserver_config(self, listen_port: int = 9876) -> Path:
        """生成NameServer配置"""
        config_content = f"""# NameServer配置文件

# 监听端口
listenPort={listen_port}

# 服务器回调线程池线程数量
serverCallbackExecutorThreads=8

# 服务器Selector线程池线程数量,默认3,建议设置为cpu核数
serverSelectorThreads=3

# 服务器Worker线程池线程数量,默认8,建议设置为cpu核数
serverWorkerThreads=8

# 服务器OneWay调用信号量值,默认256
serverOnewaySemaphoreValue=256

# 服务器异步调用信号量值,默认64
serverAsyncSemaphoreValue=64

# 扫描不活跃的Broker间隔时间,默认5秒
scanNotActiveBrokerInterval=5000

# 删除Topic配置表中不活跃的Topic间隔时间,默认10分钟
deleteTopicByQueueData=600000
"""
        
        config_path = self.conf_dir / "namesrv.properties"
        with open(config_path, 'w') as f:
            f.write(config_content)
        
        print(f"NameServer配置生成: {config_path}")
        return config_path
    
    def generate_broker_config(self, broker_config: BrokerConfig) -> Path:
        """生成Broker配置"""
        config_content = broker_config.to_properties()
        
        # 添加额外的性能和可靠性配置
        additional_config = f"""

# 消息存储配置
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000

# 刷盘配置
flushIntervalCommitLog=500
flushCommitLogTimed=false
flushIntervalConsumeQueue=1000
flushConsumeQueueTimed=false

# 清理文件配置
cleanFileForciblyEnable=true
cleanResourceInterval=10000
cleanCommitLogInterval=10000

# 事务配置
transactionTimeOut=6000
transactionCheckMax=15
transactionCheckInterval=60000

# 消息配置
maxMessageSize=65536
checkCRCOnRecover=true

# 网络配置
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=32
queryMessageThreadPoolNums=8
adminBrokerThreadPoolNums=16
clientManagerThreadPoolNums=32
consumerManagerThreadPoolNums=32
heartbeatThreadPoolNums=8
endTransactionThreadPoolNums=12

# 其他配置
waitTimeMillsInSendQueue=200
waitTimeMillsInPullQueue=5000
waitTimeMillsInHeartbeatQueue=31000
waitTimeMillsInTransactionQueue=3000
"""
        
        config_content += additional_config
        
        config_filename = f"broker-{broker_config.broker_name}-{broker_config.broker_id}.properties"
        config_path = self.conf_dir / config_filename
        
        with open(config_path, 'w') as f:
            f.write(config_content)
        
        print(f"Broker配置生成: {config_path}")
        return config_path
    
    def generate_cluster_configs(self, cluster: RocketMQClusterArchitecture) -> Dict[str, Path]:
        """为整个集群生成配置文件"""
        configs = {}
        
        # 生成NameServer配置
        if cluster.name_servers:
            ns_config = self.generate_nameserver_config()
            configs["nameserver"] = ns_config
        
        # 生成Broker配置
        broker_configs = {}
        for broker_name, broker_nodes in cluster.brokers.items():
            for node in broker_nodes:
                # 从node_id中提取broker_id
                broker_id = int(node.node_id.split('_')[2])
                broker_config = cluster.generate_broker_config(broker_name, broker_id)
                config_path = self.generate_broker_config(broker_config)
                broker_configs[f"{broker_name}-{broker_id}"] = config_path
        
        configs["brokers"] = broker_configs
        
        return configs

# 使用示例
if __name__ == "__main__":
    # 安装RocketMQ
    installer = RocketMQInstaller("/opt/rocketmq")
    
    print("开始安装RocketMQ...")
    if installer.install():
        print("\n生成集群配置...")
        
        # 创建集群
        cluster = ClusterDeploymentMode.create_production_cluster("prod-cluster")
        
        # 生成配置文件
        config_generator = ConfigurationGenerator("/opt/rocketmq")
        configs = config_generator.generate_cluster_configs(cluster)
        
        print("\n配置文件生成完成:")
        for config_type, config_info in configs.items():
            if isinstance(config_info, dict):
                print(f"{config_type}:")
                for name, path in config_info.items():
                    print(f"  {name}: {path}")
            else:
                print(f"{config_type}: {config_info}")
    else:
         print("安装失败,请检查错误信息")

6.3 集群启动与管理

6.3.1 集群启动流程

import subprocess
import time
import socket
from typing import List, Dict, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed

class ClusterManager:
    """集群管理器"""
    
    def __init__(self, install_dir: str = "/opt/rocketmq"):
        self.install_dir = Path(install_dir)
        self.scripts_dir = self.install_dir / "scripts"
        self.logs_dir = self.install_dir / "logs"
        self.conf_dir = self.install_dir / "conf"
        
    def check_port_available(self, host: str, port: int) -> bool:
        """检查端口是否可用"""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(3)
                result = sock.connect_ex((host, port))
                return result != 0  # 0表示端口被占用
        except Exception:
            return False
    
    def wait_for_service(self, host: str, port: int, timeout: int = 60) -> bool:
        """等待服务启动"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            if not self.check_port_available(host, port):
                return True
            time.sleep(2)
        return False
    
    def start_nameserver(self, host: str = "localhost", port: int = 9876) -> bool:
        """启动NameServer"""
        print(f"启动NameServer: {host}:{port}")
        
        # 检查端口是否被占用
        if not self.check_port_available(host, port):
            print(f"端口 {port} 已被占用")
            return False
        
        try:
            # 启动NameServer
            start_script = self.scripts_dir / "start-nameserver.sh"
            if not start_script.exists():
                raise FileNotFoundError(f"启动脚本不存在: {start_script}")
            
            result = subprocess.run(
                [str(start_script)],
                capture_output=True,
                text=True,
                timeout=30
            )
            
            if result.returncode != 0:
                print(f"NameServer启动失败: {result.stderr}")
                return False
            
            # 等待服务启动
            if self.wait_for_service(host, port):
                print(f"✅ NameServer启动成功: {host}:{port}")
                return True
            else:
                print(f"❌ NameServer启动超时: {host}:{port}")
                return False
                
        except Exception as e:
            print(f"❌ NameServer启动异常: {str(e)}")
            return False
    
    def start_broker(self, config_file: str, host: str = "localhost", port: int = 10911) -> bool:
        """启动Broker"""
        print(f"启动Broker: {config_file}")
        
        # 检查配置文件
        config_path = self.conf_dir / config_file
        if not config_path.exists():
            print(f"配置文件不存在: {config_path}")
            return False
        
        # 检查端口是否被占用
        if not self.check_port_available(host, port):
            print(f"端口 {port} 已被占用")
            return False
        
        try:
            # 启动Broker
            start_script = self.scripts_dir / "start-broker.sh"
            if not start_script.exists():
                raise FileNotFoundError(f"启动脚本不存在: {start_script}")
            
            result = subprocess.run(
                [str(start_script), str(config_path)],
                capture_output=True,
                text=True,
                timeout=60
            )
            
            if result.returncode != 0:
                print(f"Broker启动失败: {result.stderr}")
                return False
            
            # 等待服务启动
            if self.wait_for_service(host, port):
                print(f"✅ Broker启动成功: {config_file}")
                return True
            else:
                print(f"❌ Broker启动超时: {config_file}")
                return False
                
        except Exception as e:
            print(f"❌ Broker启动异常: {str(e)}")
            return False
    
    def start_cluster(self, cluster: RocketMQClusterArchitecture) -> bool:
        """启动整个集群"""
        print(f"开始启动集群: {cluster.cluster_name}")
        
        success_count = 0
        total_services = len(cluster.name_servers) + sum(len(brokers) for brokers in cluster.brokers.values())
        
        # 1. 启动所有NameServer
        print("\n=== 启动NameServer ===")
        for ns_node in cluster.name_servers:
            if self.start_nameserver(ns_node.host, 9876):
                success_count += 1
            time.sleep(5)  # 等待NameServer完全启动
        
        # 2. 启动所有Broker
        print("\n=== 启动Broker ===")
        for broker_name, broker_nodes in cluster.brokers.items():
            for node in broker_nodes:
                broker_id = int(node.node_id.split('_')[2])
                config_file = f"broker-{broker_name}-{broker_id}.properties"
                if self.start_broker(config_file, node.host, 10911):
                    success_count += 1
                time.sleep(3)  # 等待Broker注册到NameServer
        
        # 3. 验证集群状态
        print("\n=== 验证集群状态 ===")
        time.sleep(10)  # 等待所有服务完全启动
        cluster_status = self.check_cluster_status(cluster)
        
        if success_count == total_services and cluster_status:
            print(f"\n✅ 集群启动成功! ({success_count}/{total_services})")
            return True
        else:
            print(f"\n❌ 集群启动失败! ({success_count}/{total_services})")
            return False
    
    def stop_cluster(self) -> bool:
        """停止集群"""
        print("停止RocketMQ集群...")
        
        try:
            stop_script = self.scripts_dir / "stop-all.sh"
            if not stop_script.exists():
                print(f"停止脚本不存在: {stop_script}")
                return False
            
            result = subprocess.run(
                [str(stop_script)],
                capture_output=True,
                text=True,
                timeout=30
            )
            
            if result.returncode == 0:
                print("✅ 集群停止成功")
                return True
            else:
                print(f"❌ 集群停止失败: {result.stderr}")
                return False
                
        except Exception as e:
            print(f"❌ 集群停止异常: {str(e)}")
            return False
    
    def restart_cluster(self, cluster: RocketMQClusterArchitecture) -> bool:
        """重启集群"""
        print("重启RocketMQ集群...")
        
        # 先停止
        if not self.stop_cluster():
            return False
        
        # 等待完全停止
        time.sleep(10)
        
        # 再启动
        return self.start_cluster(cluster)
    
    def check_cluster_status(self, cluster: RocketMQClusterArchitecture) -> bool:
        """检查集群状态"""
        print("检查集群状态...")
        
        all_healthy = True
        
        # 检查NameServer
        print("\nNameServer状态:")
        for ns_node in cluster.name_servers:
            if self.check_port_available(ns_node.host, 9876):
                print(f"  ❌ {ns_node.host}:9876 - 不可达")
                all_healthy = False
            else:
                print(f"  ✅ {ns_node.host}:9876 - 正常")
        
        # 检查Broker
        print("\nBroker状态:")
        for broker_name, broker_nodes in cluster.brokers.items():
            for node in broker_nodes:
                if self.check_port_available(node.host, 10911):
                    print(f"  ❌ {broker_name}({node.host}:10911) - 不可达")
                    all_healthy = False
                else:
                    print(f"  ✅ {broker_name}({node.host}:10911) - 正常")
        
        return all_healthy
    
    def get_cluster_info(self) -> Dict:
        """获取集群信息"""
        try:
            # 使用RocketMQ管理工具获取集群信息
            mqadmin_script = self.install_dir / "bin" / "mqadmin"
            if not mqadmin_script.exists():
                return {"error": "mqadmin工具不存在"}
            
            # 获取集群列表
            result = subprocess.run(
                [str(mqadmin_script), "clusterList", "-n", "localhost:9876"],
                capture_output=True,
                text=True,
                timeout=30
            )
            
            if result.returncode == 0:
                return {
                    "status": "success",
                    "cluster_info": result.stdout,
                    "timestamp": time.time()
                }
            else:
                return {
                    "status": "error",
                    "error": result.stderr,
                    "timestamp": time.time()
                }
                
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "timestamp": time.time()
            }

class ClusterHealthChecker:
    """集群健康检查器"""
    
    def __init__(self, cluster_manager: ClusterManager):
        self.cluster_manager = cluster_manager
        self.health_history = []
    
    def check_service_health(self, host: str, port: int, service_name: str) -> Dict:
        """检查单个服务健康状态"""
        start_time = time.time()
        
        # 检查端口连通性
        port_available = self.cluster_manager.check_port_available(host, port)
        response_time = (time.time() - start_time) * 1000  # 毫秒
        
        return {
            "service": service_name,
            "host": host,
            "port": port,
            "healthy": not port_available,  # 端口不可用表示服务正在运行
            "response_time": response_time,
            "timestamp": time.time()
        }
    
    def check_cluster_health(self, cluster: RocketMQClusterArchitecture) -> Dict:
        """检查整个集群健康状态"""
        health_report = {
            "cluster_name": cluster.cluster_name,
            "timestamp": time.time(),
            "nameservers": [],
            "brokers": [],
            "overall_healthy": True
        }
        
        # 检查NameServer
        for ns_node in cluster.name_servers:
            ns_health = self.check_service_health(ns_node.host, 9876, f"NameServer-{ns_node.node_id}")
            health_report["nameservers"].append(ns_health)
            if not ns_health["healthy"]:
                health_report["overall_healthy"] = False
        
        # 检查Broker
        for broker_name, broker_nodes in cluster.brokers.items():
            for node in broker_nodes:
                broker_health = self.check_service_health(node.host, 10911, f"Broker-{broker_name}-{node.node_id}")
                health_report["brokers"].append(broker_health)
                if not broker_health["healthy"]:
                    health_report["overall_healthy"] = False
        
        # 记录健康历史
        self.health_history.append(health_report)
        
        # 只保留最近100次检查记录
        if len(self.health_history) > 100:
            self.health_history = self.health_history[-100:]
        
        return health_report
    
    def get_health_summary(self) -> Dict:
        """获取健康状态摘要"""
        if not self.health_history:
            return {"error": "没有健康检查历史"}
        
        latest = self.health_history[-1]
        
        # 计算可用性统计
        total_services = len(latest["nameservers"]) + len(latest["brokers"])
        healthy_services = sum(1 for ns in latest["nameservers"] if ns["healthy"]) + \
                          sum(1 for broker in latest["brokers"] if broker["healthy"])
        
        availability = (healthy_services / total_services * 100) if total_services > 0 else 0
        
        # 计算平均响应时间
        all_services = latest["nameservers"] + latest["brokers"]
        avg_response_time = sum(service["response_time"] for service in all_services) / len(all_services) if all_services else 0
        
        return {
            "cluster_name": latest["cluster_name"],
            "overall_healthy": latest["overall_healthy"],
            "availability_percentage": round(availability, 2),
            "total_services": total_services,
            "healthy_services": healthy_services,
            "average_response_time_ms": round(avg_response_time, 2),
            "last_check_time": latest["timestamp"],
            "check_history_count": len(self.health_history)
        }
    
    def continuous_health_check(self, cluster: RocketMQClusterArchitecture, interval: int = 30, duration: int = 300):
        """持续健康检查"""
        print(f"开始持续健康检查,间隔: {interval}秒,持续: {duration}秒")
        
        start_time = time.time()
        check_count = 0
        
        while time.time() - start_time < duration:
            check_count += 1
            print(f"\n=== 第 {check_count} 次健康检查 ===")
            
            health_report = self.check_cluster_health(cluster)
            summary = self.get_health_summary()
            
            print(f"集群状态: {'✅ 健康' if health_report['overall_healthy'] else '❌ 异常'}")
            print(f"可用性: {summary['availability_percentage']}%")
            print(f"平均响应时间: {summary['average_response_time_ms']}ms")
            
            # 如果集群不健康,打印详细信息
            if not health_report["overall_healthy"]:
                print("\n异常服务:")
                for ns in health_report["nameservers"]:
                    if not ns["healthy"]:
                        print(f"  ❌ {ns['service']} ({ns['host']}:{ns['port']})")
                for broker in health_report["brokers"]:
                    if not broker["healthy"]:
                        print(f"  ❌ {broker['service']} ({broker['host']}:{broker['port']})")
            
            time.sleep(interval)
        
        print(f"\n健康检查完成,共检查 {check_count} 次")

# 使用示例
if __name__ == "__main__":
    # 创建集群管理器
    cluster_manager = ClusterManager("/opt/rocketmq")
    
    # 创建集群
    cluster = ClusterDeploymentMode.create_production_cluster("prod-cluster")
    
    # 启动集群
    print("启动集群...")
    if cluster_manager.start_cluster(cluster):
        print("\n集群启动成功,开始健康检查...")
        
        # 创建健康检查器
        health_checker = ClusterHealthChecker(cluster_manager)
        
        # 执行一次健康检查
        health_report = health_checker.check_cluster_health(cluster)
        summary = health_checker.get_health_summary()
        
        print(f"\n集群健康摘要:")
        print(f"  集群名称: {summary['cluster_name']}")
        print(f"  整体状态: {'健康' if summary['overall_healthy'] else '异常'}")
        print(f"  可用性: {summary['availability_percentage']}%")
        print(f"  服务总数: {summary['total_services']}")
        print(f"  健康服务: {summary['healthy_services']}")
        print(f"  平均响应时间: {summary['average_response_time_ms']}ms")
        
        # 可选:执行持续健康检查
         # health_checker.continuous_health_check(cluster, interval=30, duration=300)
     else:
         print("集群启动失败")

6.4 监控与运维

6.4.1 性能监控

import psutil
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict

@dataclass
class SystemMetrics:
    """系统指标"""
    timestamp: float
    cpu_percent: float
    memory_percent: float
    memory_used_gb: float
    memory_total_gb: float
    disk_usage_percent: float
    disk_used_gb: float
    disk_total_gb: float
    network_bytes_sent: int
    network_bytes_recv: int
    load_average: List[float]

@dataclass
class RocketMQMetrics:
    """RocketMQ指标"""
    timestamp: float
    nameserver_count: int
    broker_count: int
    topic_count: int
    queue_count: int
    producer_count: int
    consumer_count: int
    message_produce_rate: float
    message_consume_rate: float
    message_accumulation: int
    broker_disk_usage: Dict[str, float]
    broker_memory_usage: Dict[str, float]

class MetricsCollector:
    """指标收集器"""
    
    def __init__(self, cluster_manager: ClusterManager):
        self.cluster_manager = cluster_manager
        self.metrics_history = []
        self.max_history_size = 1000
    
    def collect_system_metrics(self) -> SystemMetrics:
        """收集系统指标"""
        # CPU使用率
        cpu_percent = psutil.cpu_percent(interval=1)
        
        # 内存使用情况
        memory = psutil.virtual_memory()
        memory_used_gb = memory.used / (1024**3)
        memory_total_gb = memory.total / (1024**3)
        
        # 磁盘使用情况
        disk = psutil.disk_usage('/')
        disk_used_gb = disk.used / (1024**3)
        disk_total_gb = disk.total / (1024**3)
        
        # 网络使用情况
        network = psutil.net_io_counters()
        
        # 系统负载
        load_avg = psutil.getloadavg() if hasattr(psutil, 'getloadavg') else [0.0, 0.0, 0.0]
        
        return SystemMetrics(
            timestamp=time.time(),
            cpu_percent=cpu_percent,
            memory_percent=memory.percent,
            memory_used_gb=round(memory_used_gb, 2),
            memory_total_gb=round(memory_total_gb, 2),
            disk_usage_percent=disk.percent,
            disk_used_gb=round(disk_used_gb, 2),
            disk_total_gb=round(disk_total_gb, 2),
            network_bytes_sent=network.bytes_sent,
            network_bytes_recv=network.bytes_recv,
            load_average=list(load_avg)
        )
    
    def collect_rocketmq_metrics(self, cluster: RocketMQClusterArchitecture) -> RocketMQMetrics:
        """收集RocketMQ指标"""
        try:
            # 获取集群信息
            cluster_info = self.cluster_manager.get_cluster_info()
            
            # 解析集群信息(这里简化处理)
            nameserver_count = len(cluster.name_servers)
            broker_count = sum(len(brokers) for brokers in cluster.brokers.values())
            
            # 模拟其他指标(实际应该通过RocketMQ管理接口获取)
            topic_count = self._get_topic_count()
            queue_count = self._get_queue_count()
            producer_count = self._get_producer_count()
            consumer_count = self._get_consumer_count()
            message_produce_rate = self._get_message_produce_rate()
            message_consume_rate = self._get_message_consume_rate()
            message_accumulation = self._get_message_accumulation()
            broker_disk_usage = self._get_broker_disk_usage(cluster)
            broker_memory_usage = self._get_broker_memory_usage(cluster)
            
            return RocketMQMetrics(
                timestamp=time.time(),
                nameserver_count=nameserver_count,
                broker_count=broker_count,
                topic_count=topic_count,
                queue_count=queue_count,
                producer_count=producer_count,
                consumer_count=consumer_count,
                message_produce_rate=message_produce_rate,
                message_consume_rate=message_consume_rate,
                message_accumulation=message_accumulation,
                broker_disk_usage=broker_disk_usage,
                broker_memory_usage=broker_memory_usage
            )
            
        except Exception as e:
            print(f"收集RocketMQ指标失败: {str(e)}")
            return RocketMQMetrics(
                timestamp=time.time(),
                nameserver_count=0,
                broker_count=0,
                topic_count=0,
                queue_count=0,
                producer_count=0,
                consumer_count=0,
                message_produce_rate=0.0,
                message_consume_rate=0.0,
                message_accumulation=0,
                broker_disk_usage={},
                broker_memory_usage={}
            )
    
    def _get_topic_count(self) -> int:
        """获取Topic数量"""
        try:
            mqadmin_script = self.cluster_manager.install_dir / "bin" / "mqadmin"
            result = subprocess.run(
                [str(mqadmin_script), "topicList", "-n", "localhost:9876"],
                capture_output=True,
                text=True,
                timeout=10
            )
            if result.returncode == 0:
                # 解析输出获取Topic数量
                lines = result.stdout.strip().split('\n')
                return len([line for line in lines if line.strip() and not line.startswith('#')])
            return 0
        except Exception:
            return 0
    
    def _get_queue_count(self) -> int:
        """获取队列数量"""
        # 简化实现,实际应该通过管理接口获取
        return self._get_topic_count() * 4  # 假设每个Topic有4个队列
    
    def _get_producer_count(self) -> int:
        """获取生产者数量"""
        try:
            mqadmin_script = self.cluster_manager.install_dir / "bin" / "mqadmin"
            result = subprocess.run(
                [str(mqadmin_script), "producerConnection", "-n", "localhost:9876"],
                capture_output=True,
                text=True,
                timeout=10
            )
            if result.returncode == 0:
                # 解析输出获取生产者数量
                return result.stdout.count('ClientId:')
            return 0
        except Exception:
            return 0
    
    def _get_consumer_count(self) -> int:
        """获取消费者数量"""
        try:
            mqladmin_script = self.cluster_manager.install_dir / "bin" / "mqadmin"
            result = subprocess.run(
                [str(mqladmin_script), "consumerConnection", "-n", "localhost:9876"],
                capture_output=True,
                text=True,
                timeout=10
            )
            if result.returncode == 0:
                # 解析输出获取消费者数量
                return result.stdout.count('ClientId:')
            return 0
        except Exception:
            return 0
    
    def _get_message_produce_rate(self) -> float:
        """获取消息生产速率(消息/秒)"""
        # 简化实现,实际应该通过监控数据计算
        if len(self.metrics_history) >= 2:
            current = self.metrics_history[-1]
            previous = self.metrics_history[-2]
            if hasattr(current, 'total_produced_messages') and hasattr(previous, 'total_produced_messages'):
                time_diff = current.timestamp - previous.timestamp
                message_diff = current.total_produced_messages - previous.total_produced_messages
                return message_diff / time_diff if time_diff > 0 else 0.0
        return 0.0
    
    def _get_message_consume_rate(self) -> float:
        """获取消息消费速率(消息/秒)"""
        # 简化实现,实际应该通过监控数据计算
        if len(self.metrics_history) >= 2:
            current = self.metrics_history[-1]
            previous = self.metrics_history[-2]
            if hasattr(current, 'total_consumed_messages') and hasattr(previous, 'total_consumed_messages'):
                time_diff = current.timestamp - previous.timestamp
                message_diff = current.total_consumed_messages - previous.total_consumed_messages
                return message_diff / time_diff if time_diff > 0 else 0.0
        return 0.0
    
    def _get_message_accumulation(self) -> int:
        """获取消息堆积数量"""
        try:
            mqladmin_script = self.cluster_manager.install_dir / "bin" / "mqadmin"
            result = subprocess.run(
                [str(mqladmin_script), "consumerProgress", "-n", "localhost:9876"],
                capture_output=True,
                text=True,
                timeout=10
            )
            if result.returncode == 0:
                # 解析输出获取堆积消息数量
                total_diff = 0
                for line in result.stdout.split('\n'):
                    if 'Diff' in line:
                        try:
                            diff = int(line.split()[-1])
                            total_diff += diff
                        except (ValueError, IndexError):
                            continue
                return total_diff
            return 0
        except Exception:
            return 0
    
    def _get_broker_disk_usage(self, cluster: RocketMQClusterArchitecture) -> Dict[str, float]:
        """获取Broker磁盘使用率"""
        disk_usage = {}
        for broker_name, broker_nodes in cluster.brokers.items():
            for node in broker_nodes:
                try:
                    # 简化实现,实际应该通过JMX或管理接口获取
                    disk = psutil.disk_usage(self.cluster_manager.install_dir / "store")
                    disk_usage[f"{broker_name}-{node.node_id}"] = disk.percent
                except Exception:
                    disk_usage[f"{broker_name}-{node.node_id}"] = 0.0
        return disk_usage
    
    def _get_broker_memory_usage(self, cluster: RocketMQClusterArchitecture) -> Dict[str, float]:
        """获取Broker内存使用率"""
        memory_usage = {}
        for broker_name, broker_nodes in cluster.brokers.items():
            for node in broker_nodes:
                try:
                    # 简化实现,实际应该通过JMX获取JVM内存使用情况
                    memory = psutil.virtual_memory()
                    memory_usage[f"{broker_name}-{node.node_id}"] = memory.percent
                except Exception:
                    memory_usage[f"{broker_name}-{node.node_id}"] = 0.0
        return memory_usage
    
    def collect_all_metrics(self, cluster: RocketMQClusterArchitecture) -> Dict:
        """收集所有指标"""
        system_metrics = self.collect_system_metrics()
        rocketmq_metrics = self.collect_rocketmq_metrics(cluster)
        
        combined_metrics = {
            "timestamp": time.time(),
            "system": asdict(system_metrics),
            "rocketmq": asdict(rocketmq_metrics)
        }
        
        # 保存到历史记录
        self.metrics_history.append(combined_metrics)
        
        # 限制历史记录大小
        if len(self.metrics_history) > self.max_history_size:
            self.metrics_history = self.metrics_history[-self.max_history_size:]
        
        return combined_metrics
    
    def get_metrics_summary(self, duration_minutes: int = 60) -> Dict:
        """获取指标摘要"""
        if not self.metrics_history:
            return {"error": "没有指标历史"}
        
        # 获取指定时间范围内的指标
        cutoff_time = time.time() - (duration_minutes * 60)
        recent_metrics = [m for m in self.metrics_history if m["timestamp"] >= cutoff_time]
        
        if not recent_metrics:
            return {"error": f"没有最近{duration_minutes}分钟的指标数据"}
        
        # 计算平均值
        system_metrics = [m["system"] for m in recent_metrics]
        rocketmq_metrics = [m["rocketmq"] for m in recent_metrics]
        
        avg_cpu = sum(m["cpu_percent"] for m in system_metrics) / len(system_metrics)
        avg_memory = sum(m["memory_percent"] for m in system_metrics) / len(system_metrics)
        avg_disk = sum(m["disk_usage_percent"] for m in system_metrics) / len(system_metrics)
        
        avg_produce_rate = sum(m["message_produce_rate"] for m in rocketmq_metrics) / len(rocketmq_metrics)
        avg_consume_rate = sum(m["message_consume_rate"] for m in rocketmq_metrics) / len(rocketmq_metrics)
        avg_accumulation = sum(m["message_accumulation"] for m in rocketmq_metrics) / len(rocketmq_metrics)
        
        return {
            "duration_minutes": duration_minutes,
            "sample_count": len(recent_metrics),
            "system_summary": {
                "avg_cpu_percent": round(avg_cpu, 2),
                "avg_memory_percent": round(avg_memory, 2),
                "avg_disk_percent": round(avg_disk, 2)
            },
            "rocketmq_summary": {
                "avg_produce_rate": round(avg_produce_rate, 2),
                "avg_consume_rate": round(avg_consume_rate, 2),
                "avg_accumulation": round(avg_accumulation, 2),
                "latest_topic_count": rocketmq_metrics[-1]["topic_count"],
                "latest_producer_count": rocketmq_metrics[-1]["producer_count"],
                "latest_consumer_count": rocketmq_metrics[-1]["consumer_count"]
            },
            "timestamp": time.time()
        }

class AlertManager:
    """告警管理器"""
    
    def __init__(self, metrics_collector: MetricsCollector):
        self.metrics_collector = metrics_collector
        self.alert_rules = []
        self.alert_history = []
        self.max_alert_history = 500
    
    def add_alert_rule(self, name: str, condition: callable, severity: str = "warning", description: str = ""):
        """添加告警规则"""
        rule = {
            "name": name,
            "condition": condition,
            "severity": severity,
            "description": description,
            "enabled": True,
            "created_at": time.time()
        }
        self.alert_rules.append(rule)
        print(f"添加告警规则: {name} ({severity})")
    
    def check_alerts(self, metrics: Dict) -> List[Dict]:
        """检查告警"""
        alerts = []
        
        for rule in self.alert_rules:
            if not rule["enabled"]:
                continue
            
            try:
                if rule["condition"](metrics):
                    alert = {
                        "rule_name": rule["name"],
                        "severity": rule["severity"],
                        "description": rule["description"],
                        "timestamp": time.time(),
                        "metrics_snapshot": metrics
                    }
                    alerts.append(alert)
                    
                    # 记录告警历史
                    self.alert_history.append(alert)
                    
                    # 限制历史记录大小
                    if len(self.alert_history) > self.max_alert_history:
                        self.alert_history = self.alert_history[-self.max_alert_history:]
                    
                    print(f"🚨 告警触发: {rule['name']} - {rule['description']}")
                    
            except Exception as e:
                print(f"告警规则检查失败 {rule['name']}: {str(e)}")
        
        return alerts
    
    def setup_default_alert_rules(self):
        """设置默认告警规则"""
        # CPU使用率告警
        self.add_alert_rule(
            name="high_cpu_usage",
            condition=lambda m: m["system"]["cpu_percent"] > 80,
            severity="warning",
            description="CPU使用率超过80%"
        )
        
        # 内存使用率告警
        self.add_alert_rule(
            name="high_memory_usage",
            condition=lambda m: m["system"]["memory_percent"] > 85,
            severity="warning",
            description="内存使用率超过85%"
        )
        
        # 磁盘使用率告警
        self.add_alert_rule(
            name="high_disk_usage",
            condition=lambda m: m["system"]["disk_usage_percent"] > 90,
            severity="critical",
            description="磁盘使用率超过90%"
        )
        
        # 消息堆积告警
        self.add_alert_rule(
            name="message_accumulation",
            condition=lambda m: m["rocketmq"]["message_accumulation"] > 10000,
            severity="warning",
            description="消息堆积超过10000条"
        )
        
        # Broker离线告警
        self.add_alert_rule(
            name="broker_offline",
            condition=lambda m: m["rocketmq"]["broker_count"] == 0,
            severity="critical",
            description="所有Broker离线"
        )
        
        # 生产消费速率不匹配告警
        self.add_alert_rule(
            name="produce_consume_rate_mismatch",
            condition=lambda m: (m["rocketmq"]["message_produce_rate"] > 0 and 
                                m["rocketmq"]["message_consume_rate"] < m["rocketmq"]["message_produce_rate"] * 0.5),
            severity="warning",
            description="消费速率明显低于生产速率"
        )
        
        print(f"设置了 {len(self.alert_rules)} 个默认告警规则")
    
    def get_alert_summary(self, duration_hours: int = 24) -> Dict:
        """获取告警摘要"""
        cutoff_time = time.time() - (duration_hours * 3600)
        recent_alerts = [a for a in self.alert_history if a["timestamp"] >= cutoff_time]
        
        # 按严重程度统计
        severity_count = {}
        for alert in recent_alerts:
            severity = alert["severity"]
            severity_count[severity] = severity_count.get(severity, 0) + 1
        
        # 按规则统计
        rule_count = {}
        for alert in recent_alerts:
            rule_name = alert["rule_name"]
            rule_count[rule_name] = rule_count.get(rule_name, 0) + 1
        
        return {
            "duration_hours": duration_hours,
            "total_alerts": len(recent_alerts),
            "severity_breakdown": severity_count,
            "rule_breakdown": rule_count,
            "latest_alerts": recent_alerts[-10:] if recent_alerts else [],
            "timestamp": time.time()
        }

# 使用示例
if __name__ == "__main__":
    # 创建集群管理器和指标收集器
    cluster_manager = ClusterManager("/opt/rocketmq")
    metrics_collector = MetricsCollector(cluster_manager)
    alert_manager = AlertManager(metrics_collector)
    
    # 设置默认告警规则
    alert_manager.setup_default_alert_rules()
    
    # 创建集群
    cluster = ClusterDeploymentMode.create_production_cluster("prod-cluster")
    
    print("开始监控集群...")
    
    # 持续监控
    for i in range(10):  # 监控10次
        print(f"\n=== 第 {i+1} 次监控 ===")
        
        # 收集指标
        metrics = metrics_collector.collect_all_metrics(cluster)
        
        # 检查告警
        alerts = alert_manager.check_alerts(metrics)
        
        # 打印当前状态
        print(f"系统CPU: {metrics['system']['cpu_percent']}%")
        print(f"系统内存: {metrics['system']['memory_percent']}%")
        print(f"系统磁盘: {metrics['system']['disk_usage_percent']}%")
        print(f"消息生产速率: {metrics['rocketmq']['message_produce_rate']} msg/s")
        print(f"消息消费速率: {metrics['rocketmq']['message_consume_rate']} msg/s")
        print(f"消息堆积: {metrics['rocketmq']['message_accumulation']} 条")
        
        if alerts:
            print(f"触发告警: {len(alerts)} 个")
        else:
            print("无告警")
        
        time.sleep(30)  # 30秒监控间隔
    
    # 获取监控摘要
    print("\n=== 监控摘要 ===")
    metrics_summary = metrics_collector.get_metrics_summary(60)
    alert_summary = alert_manager.get_alert_summary(1)
    
    print(f"指标摘要: {json.dumps(metrics_summary, indent=2, ensure_ascii=False)}")
     print(f"告警摘要: {json.dumps(alert_summary, indent=2, ensure_ascii=False)}")

6.4.2 日志管理

import logging
import os
import gzip
import shutil
from pathlib import Path
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum

class LogLevel(Enum):
    """日志级别"""
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARN = "WARN"
    ERROR = "ERROR"
    FATAL = "FATAL"

@dataclass
class LogEntry:
    """日志条目"""
    timestamp: datetime
    level: LogLevel
    thread: str
    logger: str
    message: str
    exception: Optional[str] = None

class LogParser:
    """日志解析器"""
    
    def __init__(self):
        # RocketMQ日志格式:2024-01-15 10:30:45 INFO main - Starting NameServer
        self.log_pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (\w+) (\w+) - (.*)'
    
    def parse_log_line(self, line: str) -> Optional[LogEntry]:
        """解析单行日志"""
        import re
        
        match = re.match(self.log_pattern, line.strip())
        if not match:
            return None
        
        timestamp_str, level_str, thread, message = match.groups()
        
        try:
            timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
            level = LogLevel(level_str)
            
            # 检查是否包含异常信息
            exception = None
            if 'Exception' in message or 'Error' in message:
                exception = message
            
            return LogEntry(
                timestamp=timestamp,
                level=level,
                thread=thread,
                logger="RocketMQ",
                message=message,
                exception=exception
            )
        except (ValueError, KeyError):
            return None
    
    def parse_log_file(self, file_path: Path, max_lines: int = 10000) -> List[LogEntry]:
        """解析日志文件"""
        entries = []
        
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                for i, line in enumerate(f):
                    if i >= max_lines:
                        break
                    
                    entry = self.parse_log_line(line)
                    if entry:
                        entries.append(entry)
        except Exception as e:
            print(f"解析日志文件失败 {file_path}: {str(e)}")
        
        return entries

class LogManager:
    """日志管理器"""
    
    def __init__(self, rocketmq_home: str):
        self.rocketmq_home = Path(rocketmq_home)
        self.logs_dir = self.rocketmq_home / "logs"
        self.parser = LogParser()
        
        # 确保日志目录存在
        self.logs_dir.mkdir(exist_ok=True)
    
    def get_log_files(self) -> Dict[str, List[Path]]:
        """获取所有日志文件"""
        log_files = {
            "nameserver": [],
            "broker": [],
            "client": [],
            "store": [],
            "other": []
        }
        
        if not self.logs_dir.exists():
            return log_files
        
        for log_file in self.logs_dir.glob("*.log"):
            filename = log_file.name.lower()
            
            if "nameserver" in filename or "namesrv" in filename:
                log_files["nameserver"].append(log_file)
            elif "broker" in filename:
                log_files["broker"].append(log_file)
            elif "client" in filename:
                log_files["client"].append(log_file)
            elif "store" in filename:
                log_files["store"].append(log_file)
            else:
                log_files["other"].append(log_file)
        
        # 按修改时间排序
        for category in log_files:
            log_files[category].sort(key=lambda x: x.stat().st_mtime, reverse=True)
        
        return log_files
    
    def search_logs(self, keyword: str, log_type: str = "all", 
                   level: Optional[LogLevel] = None, 
                   start_time: Optional[datetime] = None,
                   end_time: Optional[datetime] = None,
                   max_results: int = 100) -> List[LogEntry]:
        """搜索日志"""
        results = []
        log_files = self.get_log_files()
        
        # 确定要搜索的日志文件
        files_to_search = []
        if log_type == "all":
            for category_files in log_files.values():
                files_to_search.extend(category_files)
        else:
            files_to_search = log_files.get(log_type, [])
        
        for log_file in files_to_search:
            if len(results) >= max_results:
                break
            
            entries = self.parser.parse_log_file(log_file)
            
            for entry in entries:
                if len(results) >= max_results:
                    break
                
                # 关键词过滤
                if keyword.lower() not in entry.message.lower():
                    continue
                
                # 日志级别过滤
                if level and entry.level != level:
                    continue
                
                # 时间范围过滤
                if start_time and entry.timestamp < start_time:
                    continue
                if end_time and entry.timestamp > end_time:
                    continue
                
                results.append(entry)
        
        # 按时间排序
        results.sort(key=lambda x: x.timestamp, reverse=True)
        return results
    
    def get_error_logs(self, hours: int = 24, max_results: int = 50) -> List[LogEntry]:
        """获取错误日志"""
        start_time = datetime.now() - timedelta(hours=hours)
        
        error_logs = []
        error_logs.extend(self.search_logs("", level=LogLevel.ERROR, start_time=start_time, max_results=max_results//2))
        error_logs.extend(self.search_logs("", level=LogLevel.FATAL, start_time=start_time, max_results=max_results//2))
        
        # 去重并排序
        seen = set()
        unique_logs = []
        for log in error_logs:
            key = (log.timestamp, log.message)
            if key not in seen:
                seen.add(key)
                unique_logs.append(log)
        
        unique_logs.sort(key=lambda x: x.timestamp, reverse=True)
        return unique_logs[:max_results]
    
    def analyze_logs(self, hours: int = 24) -> Dict:
        """分析日志"""
        start_time = datetime.now() - timedelta(hours=hours)
        log_files = self.get_log_files()
        
        analysis = {
            "time_range": {
                "start": start_time.isoformat(),
                "end": datetime.now().isoformat(),
                "hours": hours
            },
            "log_counts": {
                "DEBUG": 0,
                "INFO": 0,
                "WARN": 0,
                "ERROR": 0,
                "FATAL": 0
            },
            "error_patterns": {},
            "warning_patterns": {},
            "top_errors": [],
            "file_sizes": {}
        }
        
        # 统计各类型日志文件大小
        for category, files in log_files.items():
            total_size = sum(f.stat().st_size for f in files)
            analysis["file_sizes"][category] = {
                "count": len(files),
                "total_size_mb": round(total_size / (1024*1024), 2)
            }
        
        # 分析日志内容
        all_entries = []
        for category_files in log_files.values():
            for log_file in category_files:
                entries = self.parser.parse_log_file(log_file, max_lines=5000)
                all_entries.extend(entries)
        
        # 过滤时间范围
        recent_entries = [e for e in all_entries if e.timestamp >= start_time]
        
        # 统计日志级别
        for entry in recent_entries:
            analysis["log_counts"][entry.level.value] += 1
        
        # 分析错误模式
        error_messages = [e.message for e in recent_entries if e.level in [LogLevel.ERROR, LogLevel.FATAL]]
        warning_messages = [e.message for e in recent_entries if e.level == LogLevel.WARN]
        
        # 统计错误模式
        for message in error_messages:
            # 简化错误消息以识别模式
            pattern = self._extract_error_pattern(message)
            analysis["error_patterns"][pattern] = analysis["error_patterns"].get(pattern, 0) + 1
        
        # 统计警告模式
        for message in warning_messages:
            pattern = self._extract_error_pattern(message)
            analysis["warning_patterns"][pattern] = analysis["warning_patterns"].get(pattern, 0) + 1
        
        # 获取最频繁的错误
        analysis["top_errors"] = sorted(
            analysis["error_patterns"].items(),
            key=lambda x: x[1],
            reverse=True
        )[:10]
        
        return analysis
    
    def _extract_error_pattern(self, message: str) -> str:
        """提取错误模式"""
        import re
        
        # 移除时间戳、IP地址、端口号等变化的部分
        pattern = re.sub(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', '[TIMESTAMP]', message)
        pattern = re.sub(r'\d+\.\d+\.\d+\.\d+', '[IP]', pattern)
        pattern = re.sub(r':\d+', ':[PORT]', pattern)
        pattern = re.sub(r'\d+', '[NUMBER]', pattern)
        
        # 截取前100个字符作为模式
        return pattern[:100]
    
    def rotate_logs(self, max_size_mb: int = 100, max_files: int = 10):
        """日志轮转"""
        log_files = self.get_log_files()
        
        for category, files in log_files.items():
            for log_file in files:
                try:
                    # 检查文件大小
                    size_mb = log_file.stat().st_size / (1024 * 1024)
                    
                    if size_mb > max_size_mb:
                        print(f"轮转日志文件: {log_file} ({size_mb:.2f}MB)")
                        
                        # 创建压缩备份
                        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                        backup_name = f"{log_file.stem}_{timestamp}.log.gz"
                        backup_path = log_file.parent / backup_name
                        
                        with open(log_file, 'rb') as f_in:
                            with gzip.open(backup_path, 'wb') as f_out:
                                shutil.copyfileobj(f_in, f_out)
                        
                        # 清空原文件
                        with open(log_file, 'w') as f:
                            f.write('')
                        
                        print(f"日志已备份到: {backup_path}")
                        
                        # 清理旧备份文件
                        self._cleanup_old_backups(log_file.parent, log_file.stem, max_files)
                        
                except Exception as e:
                    print(f"轮转日志文件失败 {log_file}: {str(e)}")
    
    def _cleanup_old_backups(self, log_dir: Path, log_stem: str, max_files: int):
        """清理旧的备份文件"""
        backup_pattern = f"{log_stem}_*.log.gz"
        backup_files = list(log_dir.glob(backup_pattern))
        
        if len(backup_files) > max_files:
            # 按修改时间排序,删除最旧的文件
            backup_files.sort(key=lambda x: x.stat().st_mtime)
            
            for old_file in backup_files[:-max_files]:
                try:
                    old_file.unlink()
                    print(f"删除旧备份文件: {old_file}")
                except Exception as e:
                    print(f"删除备份文件失败 {old_file}: {str(e)}")
    
    def export_logs(self, output_file: str, log_type: str = "all", 
                   start_time: Optional[datetime] = None,
                   end_time: Optional[datetime] = None):
        """导出日志"""
        log_files = self.get_log_files()
        
        # 确定要导出的日志文件
        files_to_export = []
        if log_type == "all":
            for category_files in log_files.values():
                files_to_export.extend(category_files)
        else:
            files_to_export = log_files.get(log_type, [])
        
        with open(output_file, 'w', encoding='utf-8') as out_file:
            out_file.write(f"# RocketMQ日志导出\n")
            out_file.write(f"# 导出时间: {datetime.now().isoformat()}\n")
            out_file.write(f"# 日志类型: {log_type}\n")
            if start_time:
                out_file.write(f"# 开始时间: {start_time.isoformat()}\n")
            if end_time:
                out_file.write(f"# 结束时间: {end_time.isoformat()}\n")
            out_file.write("\n")
            
            for log_file in files_to_export:
                out_file.write(f"\n=== {log_file.name} ===\n")
                
                entries = self.parser.parse_log_file(log_file)
                
                for entry in entries:
                    # 时间范围过滤
                    if start_time and entry.timestamp < start_time:
                        continue
                    if end_time and entry.timestamp > end_time:
                        continue
                    
                    out_file.write(f"{entry.timestamp} {entry.level.value} {entry.thread} - {entry.message}\n")
        
        print(f"日志已导出到: {output_file}")

# 使用示例
if __name__ == "__main__":
    # 创建日志管理器
    log_manager = LogManager("/opt/rocketmq")
    
    print("=== 日志文件概览 ===")
    log_files = log_manager.get_log_files()
    for category, files in log_files.items():
        print(f"{category}: {len(files)} 个文件")
        for file in files[:3]:  # 只显示前3个
            size_mb = file.stat().st_size / (1024*1024)
            print(f"  - {file.name} ({size_mb:.2f}MB)")
    
    print("\n=== 错误日志分析 ===")
    error_logs = log_manager.get_error_logs(hours=24)
    print(f"最近24小时发现 {len(error_logs)} 个错误")
    
    for error in error_logs[:5]:  # 显示前5个错误
        print(f"[{error.timestamp}] {error.level.value}: {error.message[:100]}...")
    
    print("\n=== 日志统计分析 ===")
    analysis = log_manager.analyze_logs(hours=24)
    print(f"日志级别统计: {analysis['log_counts']}")
    print(f"文件大小统计: {analysis['file_sizes']}")
    
    if analysis['top_errors']:
        print("\n最频繁的错误:")
        for pattern, count in analysis['top_errors'][:5]:
            print(f"  {count}次: {pattern}")
    
    print("\n=== 搜索特定关键词 ===")
    search_results = log_manager.search_logs("connection", max_results=10)
    print(f"找到 {len(search_results)} 条包含'connection'的日志")
    
    for result in search_results[:3]:
        print(f"[{result.timestamp}] {result.level.value}: {result.message[:80]}...")
    
    print("\n=== 执行日志轮转 ===")
    log_manager.rotate_logs(max_size_mb=50, max_files=5)
    
    print("\n=== 导出日志 ===")
    yesterday = datetime.now() - timedelta(days=1)
    log_manager.export_logs(
         output_file="/tmp/rocketmq_logs_export.txt",
         log_type="broker",
         start_time=yesterday
     )

6.4.3 故障排查

import subprocess
import socket
import telnetlib
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import json
import time

class HealthStatus(Enum):
    """健康状态"""
    HEALTHY = "healthy"
    WARNING = "warning"
    CRITICAL = "critical"
    UNKNOWN = "unknown"

@dataclass
class DiagnosticResult:
    """诊断结果"""
    component: str
    status: HealthStatus
    message: str
    details: Dict
    suggestions: List[str]
    timestamp: float

class TroubleshootingGuide:
    """故障排查指南"""
    
    def __init__(self, cluster_manager: ClusterManager, log_manager: LogManager):
        self.cluster_manager = cluster_manager
        self.log_manager = log_manager
        self.diagnostic_results = []
    
    def diagnose_cluster(self, cluster: RocketMQClusterArchitecture) -> List[DiagnosticResult]:
        """诊断整个集群"""
        results = []
        
        # 诊断NameServer
        results.extend(self._diagnose_nameservers(cluster))
        
        # 诊断Broker
        results.extend(self._diagnose_brokers(cluster))
        
        # 诊断网络连接
        results.extend(self._diagnose_network(cluster))
        
        # 诊断磁盘空间
        results.extend(self._diagnose_disk_space())
        
        # 诊断内存使用
        results.extend(self._diagnose_memory_usage())
        
        # 诊断日志错误
        results.extend(self._diagnose_log_errors())
        
        # 诊断消息堆积
        results.extend(self._diagnose_message_accumulation())
        
        self.diagnostic_results = results
        return results
    
    def _diagnose_nameservers(self, cluster: RocketMQClusterArchitecture) -> List[DiagnosticResult]:
        """诊断NameServer"""
        results = []
        
        for nameserver in cluster.name_servers:
            try:
                # 检查端口是否可达
                is_reachable = self._check_port_connectivity(nameserver.host, nameserver.port)
                
                if is_reachable:
                    # 检查NameServer状态
                    status_info = self._get_nameserver_status(nameserver.host, nameserver.port)
                    
                    if status_info:
                        results.append(DiagnosticResult(
                            component=f"NameServer-{nameserver.host}:{nameserver.port}",
                            status=HealthStatus.HEALTHY,
                            message="NameServer运行正常",
                            details=status_info,
                            suggestions=[],
                            timestamp=time.time()
                        ))
                    else:
                        results.append(DiagnosticResult(
                            component=f"NameServer-{nameserver.host}:{nameserver.port}",
                            status=HealthStatus.WARNING,
                            message="NameServer端口可达但状态异常",
                            details={"host": nameserver.host, "port": nameserver.port},
                            suggestions=[
                                "检查NameServer进程是否正常运行",
                                "查看NameServer日志文件",
                                "检查JVM内存使用情况"
                            ],
                            timestamp=time.time()
                        ))
                else:
                    results.append(DiagnosticResult(
                        component=f"NameServer-{nameserver.host}:{nameserver.port}",
                        status=HealthStatus.CRITICAL,
                        message="NameServer不可达",
                        details={"host": nameserver.host, "port": nameserver.port},
                        suggestions=[
                            "检查NameServer进程是否启动",
                            "检查防火墙设置",
                            "检查网络连接",
                            "查看启动日志"
                        ],
                        timestamp=time.time()
                    ))
                    
            except Exception as e:
                results.append(DiagnosticResult(
                    component=f"NameServer-{nameserver.host}:{nameserver.port}",
                    status=HealthStatus.UNKNOWN,
                    message=f"诊断失败: {str(e)}",
                    details={"error": str(e)},
                    suggestions=["检查网络连接", "验证配置信息"],
                    timestamp=time.time()
                ))
        
        return results
    
    def _diagnose_brokers(self, cluster: RocketMQClusterArchitecture) -> List[DiagnosticResult]:
        """诊断Broker"""
        results = []
        
        for broker_name, broker_nodes in cluster.brokers.items():
            for broker in broker_nodes:
                try:
                    # 检查Broker端口
                    is_reachable = self._check_port_connectivity(broker.host, broker.port)
                    
                    if is_reachable:
                        # 检查Broker状态
                        broker_stats = self._get_broker_stats(broker.host, broker.port)
                        
                        if broker_stats:
                            # 分析Broker健康状况
                            status, issues = self._analyze_broker_health(broker_stats)
                            
                            results.append(DiagnosticResult(
                                component=f"Broker-{broker_name}-{broker.node_id}",
                                status=status,
                                message=f"Broker状态: {status.value}",
                                details=broker_stats,
                                suggestions=issues,
                                timestamp=time.time()
                            ))
                        else:
                            results.append(DiagnosticResult(
                                component=f"Broker-{broker_name}-{broker.node_id}",
                                status=HealthStatus.WARNING,
                                message="无法获取Broker统计信息",
                                details={"host": broker.host, "port": broker.port},
                                suggestions=[
                                    "检查Broker进程状态",
                                    "查看Broker日志",
                                    "验证JMX配置"
                                ],
                                timestamp=time.time()
                            ))
                    else:
                        results.append(DiagnosticResult(
                            component=f"Broker-{broker_name}-{broker.node_id}",
                            status=HealthStatus.CRITICAL,
                            message="Broker不可达",
                            details={"host": broker.host, "port": broker.port},
                            suggestions=[
                                "检查Broker进程是否启动",
                                "检查防火墙设置",
                                "验证Broker配置",
                                "查看启动日志"
                            ],
                            timestamp=time.time()
                        ))
                        
                except Exception as e:
                    results.append(DiagnosticResult(
                        component=f"Broker-{broker_name}-{broker.node_id}",
                        status=HealthStatus.UNKNOWN,
                        message=f"诊断失败: {str(e)}",
                        details={"error": str(e)},
                        suggestions=["检查网络连接", "验证配置信息"],
                        timestamp=time.time()
                    ))
        
        return results
    
    def _diagnose_network(self, cluster: RocketMQClusterArchitecture) -> List[DiagnosticResult]:
        """诊断网络连接"""
        results = []
        
        # 检查NameServer之间的连接
        for i, ns1 in enumerate(cluster.name_servers):
            for j, ns2 in enumerate(cluster.name_servers):
                if i != j:
                    connectivity = self._check_port_connectivity(ns2.host, ns2.port, source_host=ns1.host)
                    
                    if not connectivity:
                        results.append(DiagnosticResult(
                            component="Network",
                            status=HealthStatus.WARNING,
                            message=f"NameServer间连接异常: {ns1.host} -> {ns2.host}:{ns2.port}",
                            details={"source": f"{ns1.host}:{ns1.port}", "target": f"{ns2.host}:{ns2.port}"},
                            suggestions=[
                                "检查网络路由",
                                "检查防火墙规则",
                                "验证DNS解析"
                            ],
                            timestamp=time.time()
                        ))
        
        # 检查Broker到NameServer的连接
        for broker_name, broker_nodes in cluster.brokers.items():
            for broker in broker_nodes:
                for nameserver in cluster.name_servers:
                    connectivity = self._check_port_connectivity(
                        nameserver.host, nameserver.port, source_host=broker.host
                    )
                    
                    if not connectivity:
                        results.append(DiagnosticResult(
                            component="Network",
                            status=HealthStatus.CRITICAL,
                            message=f"Broker到NameServer连接失败: {broker.host} -> {nameserver.host}:{nameserver.port}",
                            details={
                                "broker": f"{broker.host}:{broker.port}",
                                "nameserver": f"{nameserver.host}:{nameserver.port}"
                            },
                            suggestions=[
                                "检查网络连接",
                                "验证防火墙设置",
                                "检查路由配置"
                            ],
                            timestamp=time.time()
                        ))
        
        return results
    
    def _diagnose_disk_space(self) -> List[DiagnosticResult]:
        """诊断磁盘空间"""
        results = []
        
        try:
            import psutil
            
            # 检查RocketMQ数据目录
            store_path = self.cluster_manager.install_dir / "store"
            if store_path.exists():
                disk_usage = psutil.disk_usage(str(store_path))
                usage_percent = (disk_usage.used / disk_usage.total) * 100
                
                if usage_percent > 90:
                    status = HealthStatus.CRITICAL
                    message = f"磁盘空间严重不足: {usage_percent:.1f}%"
                    suggestions = [
                        "立即清理磁盘空间",
                        "删除旧的CommitLog文件",
                        "增加磁盘容量",
                        "配置自动清理策略"
                    ]
                elif usage_percent > 80:
                    status = HealthStatus.WARNING
                    message = f"磁盘空间不足: {usage_percent:.1f}%"
                    suggestions = [
                        "监控磁盘使用情况",
                        "计划清理策略",
                        "考虑扩容"
                    ]
                else:
                    status = HealthStatus.HEALTHY
                    message = f"磁盘空间充足: {usage_percent:.1f}%"
                    suggestions = []
                
                results.append(DiagnosticResult(
                    component="DiskSpace",
                    status=status,
                    message=message,
                    details={
                        "path": str(store_path),
                        "usage_percent": round(usage_percent, 2),
                        "free_gb": round(disk_usage.free / (1024**3), 2),
                        "total_gb": round(disk_usage.total / (1024**3), 2)
                    },
                    suggestions=suggestions,
                    timestamp=time.time()
                ))
                
        except Exception as e:
            results.append(DiagnosticResult(
                component="DiskSpace",
                status=HealthStatus.UNKNOWN,
                message=f"无法检查磁盘空间: {str(e)}",
                details={"error": str(e)},
                suggestions=["手动检查磁盘空间"],
                timestamp=time.time()
            ))
        
        return results
    
    def _diagnose_memory_usage(self) -> List[DiagnosticResult]:
        """诊断内存使用"""
        results = []
        
        try:
            import psutil
            
            memory = psutil.virtual_memory()
            usage_percent = memory.percent
            
            if usage_percent > 90:
                status = HealthStatus.CRITICAL
                message = f"内存使用率过高: {usage_percent:.1f}%"
                suggestions = [
                    "检查内存泄漏",
                    "调整JVM堆大小",
                    "重启服务释放内存",
                    "增加物理内存"
                ]
            elif usage_percent > 80:
                status = HealthStatus.WARNING
                message = f"内存使用率较高: {usage_percent:.1f}%"
                suggestions = [
                    "监控内存使用趋势",
                    "优化JVM参数",
                    "检查是否有内存泄漏"
                ]
            else:
                status = HealthStatus.HEALTHY
                message = f"内存使用正常: {usage_percent:.1f}%"
                suggestions = []
            
            results.append(DiagnosticResult(
                component="Memory",
                status=status,
                message=message,
                details={
                    "usage_percent": round(usage_percent, 2),
                    "used_gb": round(memory.used / (1024**3), 2),
                    "total_gb": round(memory.total / (1024**3), 2),
                    "available_gb": round(memory.available / (1024**3), 2)
                },
                suggestions=suggestions,
                timestamp=time.time()
            ))
            
        except Exception as e:
            results.append(DiagnosticResult(
                component="Memory",
                status=HealthStatus.UNKNOWN,
                message=f"无法检查内存使用: {str(e)}",
                details={"error": str(e)},
                suggestions=["手动检查内存使用情况"],
                timestamp=time.time()
            ))
        
        return results
    
    def _diagnose_log_errors(self) -> List[DiagnosticResult]:
        """诊断日志错误"""
        results = []
        
        try:
            # 获取最近的错误日志
            error_logs = self.log_manager.get_error_logs(hours=1, max_results=20)
            
            if len(error_logs) > 10:
                status = HealthStatus.CRITICAL
                message = f"最近1小时内发现大量错误: {len(error_logs)} 个"
                suggestions = [
                    "立即查看错误日志详情",
                    "检查系统资源",
                    "验证配置文件",
                    "考虑重启服务"
                ]
            elif len(error_logs) > 5:
                status = HealthStatus.WARNING
                message = f"最近1小时内发现一些错误: {len(error_logs)} 个"
                suggestions = [
                    "查看错误日志详情",
                    "监控错误趋势",
                    "检查相关配置"
                ]
            elif len(error_logs) > 0:
                status = HealthStatus.WARNING
                message = f"最近1小时内发现少量错误: {len(error_logs)} 个"
                suggestions = ["查看错误日志详情"]
            else:
                status = HealthStatus.HEALTHY
                message = "最近1小时内无错误日志"
                suggestions = []
            
            # 提取错误详情
            error_details = []
            for error in error_logs[:5]:  # 只显示前5个错误
                error_details.append({
                    "timestamp": error.timestamp.isoformat(),
                    "level": error.level.value,
                    "message": error.message[:200]  # 截取前200字符
                })
            
            results.append(DiagnosticResult(
                component="LogErrors",
                status=status,
                message=message,
                details={
                    "error_count": len(error_logs),
                    "recent_errors": error_details
                },
                suggestions=suggestions,
                timestamp=time.time()
            ))
            
        except Exception as e:
            results.append(DiagnosticResult(
                component="LogErrors",
                status=HealthStatus.UNKNOWN,
                message=f"无法分析日志错误: {str(e)}",
                details={"error": str(e)},
                suggestions=["手动检查日志文件"],
                timestamp=time.time()
            ))
        
        return results
    
    def _diagnose_message_accumulation(self) -> List[DiagnosticResult]:
        """诊断消息堆积"""
        results = []
        
        try:
            # 获取消息堆积信息(简化实现)
            accumulation = self._get_message_accumulation()
            
            if accumulation > 50000:
                status = HealthStatus.CRITICAL
                message = f"严重消息堆积: {accumulation} 条"
                suggestions = [
                    "立即检查消费者状态",
                    "增加消费者实例",
                    "检查消费逻辑性能",
                    "考虑暂停生产者"
                ]
            elif accumulation > 10000:
                status = HealthStatus.WARNING
                message = f"消息堆积较多: {accumulation} 条"
                suggestions = [
                    "监控消费速率",
                    "检查消费者性能",
                    "考虑扩容消费者"
                ]
            elif accumulation > 1000:
                status = HealthStatus.WARNING
                message = f"少量消息堆积: {accumulation} 条"
                suggestions = ["监控消费情况"]
            else:
                status = HealthStatus.HEALTHY
                message = f"消息消费正常: {accumulation} 条堆积"
                suggestions = []
            
            results.append(DiagnosticResult(
                component="MessageAccumulation",
                status=status,
                message=message,
                details={"accumulation_count": accumulation},
                suggestions=suggestions,
                timestamp=time.time()
            ))
            
        except Exception as e:
            results.append(DiagnosticResult(
                component="MessageAccumulation",
                status=HealthStatus.UNKNOWN,
                message=f"无法检查消息堆积: {str(e)}",
                details={"error": str(e)},
                suggestions=["手动检查消息队列状态"],
                timestamp=time.time()
            ))
        
        return results
    
    def _check_port_connectivity(self, host: str, port: int, source_host: str = None, timeout: int = 5) -> bool:
        """检查端口连通性"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            
            if source_host:
                sock.bind((source_host, 0))
            
            result = sock.connect_ex((host, port))
            sock.close()
            
            return result == 0
        except Exception:
            return False
    
    def _get_nameserver_status(self, host: str, port: int) -> Optional[Dict]:
        """获取NameServer状态"""
        try:
            # 简化实现,实际应该通过管理接口获取
            mqadmin_script = self.cluster_manager.install_dir / "bin" / "mqadmin"
            result = subprocess.run(
                [str(mqadmin_script), "clusterList", "-n", f"{host}:{port}"],
                capture_output=True,
                text=True,
                timeout=10
            )
            
            if result.returncode == 0:
                return {
                    "status": "running",
                    "response_time": "< 1s",
                    "cluster_info": result.stdout[:500]  # 截取前500字符
                }
            return None
        except Exception:
            return None
    
    def _get_broker_stats(self, host: str, port: int) -> Optional[Dict]:
        """获取Broker统计信息"""
        try:
            # 简化实现,实际应该通过JMX或管理接口获取
            mqadmin_script = self.cluster_manager.install_dir / "bin" / "mqadmin"
            result = subprocess.run(
                [str(mqadmin_script), "brokerStatus", "-n", "localhost:9876", "-b", f"{host}:{port}"],
                capture_output=True,
                text=True,
                timeout=10
            )
            
            if result.returncode == 0:
                return {
                    "status": "running",
                    "stats": result.stdout[:1000]  # 截取前1000字符
                }
            return None
        except Exception:
            return None
    
    def _analyze_broker_health(self, broker_stats: Dict) -> Tuple[HealthStatus, List[str]]:
        """分析Broker健康状况"""
        issues = []
        status = HealthStatus.HEALTHY
        
        # 简化的健康检查逻辑
        stats_text = broker_stats.get("stats", "")
        
        if "ERROR" in stats_text or "Exception" in stats_text:
            status = HealthStatus.WARNING
            issues.append("Broker状态中发现错误信息")
        
        if "OutOfMemory" in stats_text:
            status = HealthStatus.CRITICAL
            issues.append("Broker内存不足")
        
        if "DiskFull" in stats_text:
            status = HealthStatus.CRITICAL
            issues.append("Broker磁盘空间不足")
        
        return status, issues
    
    def _get_message_accumulation(self) -> int:
        """获取消息堆积数量"""
        try:
            mqadmin_script = self.cluster_manager.install_dir / "bin" / "mqadmin"
            result = subprocess.run(
                [str(mqadmin_script), "consumerProgress", "-n", "localhost:9876"],
                capture_output=True,
                text=True,
                timeout=10
            )
            
            if result.returncode == 0:
                total_diff = 0
                for line in result.stdout.split('\n'):
                    if 'Diff' in line:
                        try:
                            diff = int(line.split()[-1])
                            total_diff += diff
                        except (ValueError, IndexError):
                            continue
                return total_diff
            return 0
        except Exception:
            return 0
    
    def generate_diagnostic_report(self) -> str:
        """生成诊断报告"""
        if not self.diagnostic_results:
            return "没有诊断结果"
        
        report = []
        report.append("# RocketMQ集群诊断报告")
        report.append(f"生成时间: {datetime.now().isoformat()}")
        report.append(f"诊断项目数: {len(self.diagnostic_results)}")
        report.append("")
        
        # 按状态分组
        status_groups = {
            HealthStatus.CRITICAL: [],
            HealthStatus.WARNING: [],
            HealthStatus.HEALTHY: [],
            HealthStatus.UNKNOWN: []
        }
        
        for result in self.diagnostic_results:
            status_groups[result.status].append(result)
        
        # 生成摘要
        report.append("## 诊断摘要")
        for status, results in status_groups.items():
            if results:
                report.append(f"- {status.value.upper()}: {len(results)} 项")
        report.append("")
        
        # 详细结果
        for status in [HealthStatus.CRITICAL, HealthStatus.WARNING, HealthStatus.HEALTHY, HealthStatus.UNKNOWN]:
            results = status_groups[status]
            if results:
                report.append(f"## {status.value.upper()} 项目")
                for result in results:
                    report.append(f"### {result.component}")
                    report.append(f"**状态**: {result.status.value}")
                    report.append(f"**消息**: {result.message}")
                    
                    if result.suggestions:
                        report.append("**建议**:")
                        for suggestion in result.suggestions:
                            report.append(f"- {suggestion}")
                    
                    if result.details:
                        report.append("**详情**:")
                        for key, value in result.details.items():
                            report.append(f"- {key}: {value}")
                    
                    report.append("")
        
        return "\n".join(report)
    
    def get_quick_fixes(self) -> Dict[str, List[str]]:
        """获取快速修复建议"""
        fixes = {
            "critical_issues": [],
            "warning_issues": [],
            "general_maintenance": []
        }
        
        for result in self.diagnostic_results:
            if result.status == HealthStatus.CRITICAL:
                fixes["critical_issues"].extend(result.suggestions)
            elif result.status == HealthStatus.WARNING:
                fixes["warning_issues"].extend(result.suggestions)
        
        # 添加一般性维护建议
        fixes["general_maintenance"] = [
            "定期检查日志文件",
            "监控系统资源使用情况",
            "定期备份重要数据",
            "保持RocketMQ版本更新",
            "定期清理过期数据"
        ]
        
        # 去重
        for category in fixes:
            fixes[category] = list(set(fixes[category]))
        
        return fixes

# 使用示例
if __name__ == "__main__":
    # 创建故障排查工具
    cluster_manager = ClusterManager("/opt/rocketmq")
    log_manager = LogManager("/opt/rocketmq")
    troubleshooter = TroubleshootingGuide(cluster_manager, log_manager)
    
    # 创建集群配置
    cluster = ClusterDeploymentMode.create_production_cluster("prod-cluster")
    
    print("开始集群诊断...")
    
    # 执行诊断
    diagnostic_results = troubleshooter.diagnose_cluster(cluster)
    
    # 生成报告
    report = troubleshooter.generate_diagnostic_report()
    print(report)
    
    # 获取快速修复建议
    quick_fixes = troubleshooter.get_quick_fixes()
    
    print("\n=== 快速修复建议 ===")
    if quick_fixes["critical_issues"]:
        print("\n🚨 紧急问题:")
        for fix in quick_fixes["critical_issues"][:5]:
            print(f"  - {fix}")
    
    if quick_fixes["warning_issues"]:
        print("\n⚠️ 警告问题:")
        for fix in quick_fixes["warning_issues"][:5]:
            print(f"  - {fix}")
    
    print("\n🔧 日常维护:")
     for fix in quick_fixes["general_maintenance"][:5]:
         print(f"  - {fix}")

6.5 备份与恢复

6.5.1 数据备份策略

import shutil
import tarfile
import gzip
import hashlib
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import json
import time
from datetime import datetime, timedelta

class BackupType(Enum):
    """备份类型"""
    FULL = "full"          # 全量备份
    INCREMENTAL = "incremental"  # 增量备份
    DIFFERENTIAL = "differential"  # 差异备份
    METADATA = "metadata"    # 元数据备份

class BackupStatus(Enum):
    """备份状态"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class BackupConfig:
    """备份配置"""
    backup_type: BackupType
    source_path: str
    backup_path: str
    retention_days: int = 30
    compression: bool = True
    encryption: bool = False
    verify_integrity: bool = True
    exclude_patterns: List[str] = None
    max_backup_size_gb: int = 100
    
    def __post_init__(self):
        if self.exclude_patterns is None:
            self.exclude_patterns = []

@dataclass
class BackupRecord:
    """备份记录"""
    backup_id: str
    backup_type: BackupType
    status: BackupStatus
    start_time: datetime
    end_time: Optional[datetime]
    source_path: str
    backup_path: str
    file_count: int
    backup_size_bytes: int
    checksum: Optional[str]
    error_message: Optional[str]
    metadata: Dict

class RocketMQBackupManager:
    """RocketMQ备份管理器"""
    
    def __init__(self, install_dir: str, backup_root: str):
        self.install_dir = Path(install_dir)
        self.backup_root = Path(backup_root)
        self.backup_root.mkdir(parents=True, exist_ok=True)
        
        # 备份记录文件
        self.backup_records_file = self.backup_root / "backup_records.json"
        self.backup_records = self._load_backup_records()
        
        # 默认备份配置
        self.default_configs = self._create_default_configs()
    
    def _create_default_configs(self) -> Dict[str, BackupConfig]:
        """创建默认备份配置"""
        configs = {}
        
        # CommitLog备份配置
        configs["commitlog"] = BackupConfig(
            backup_type=BackupType.INCREMENTAL,
            source_path=str(self.install_dir / "store" / "commitlog"),
            backup_path=str(self.backup_root / "commitlog"),
            retention_days=7,
            compression=True,
            verify_integrity=True,
            exclude_patterns=["*.tmp", "*.lock"]
        )
        
        # ConsumeQueue备份配置
        configs["consumequeue"] = BackupConfig(
            backup_type=BackupType.DIFFERENTIAL,
            source_path=str(self.install_dir / "store" / "consumequeue"),
            backup_path=str(self.backup_root / "consumequeue"),
            retention_days=14,
            compression=True,
            verify_integrity=True
        )
        
        # Index备份配置
        configs["index"] = BackupConfig(
            backup_type=BackupType.FULL,
            source_path=str(self.install_dir / "store" / "index"),
            backup_path=str(self.backup_root / "index"),
            retention_days=30,
            compression=True,
            verify_integrity=True
        )
        
        # 配置文件备份
        configs["config"] = BackupConfig(
            backup_type=BackupType.FULL,
            source_path=str(self.install_dir / "conf"),
            backup_path=str(self.backup_root / "config"),
            retention_days=90,
            compression=True,
            verify_integrity=True
        )
        
        # 元数据备份
        configs["metadata"] = BackupConfig(
            backup_type=BackupType.METADATA,
            source_path=str(self.install_dir / "store"),
            backup_path=str(self.backup_root / "metadata"),
            retention_days=30,
            compression=True,
            verify_integrity=True
        )
        
        return configs
    
    def create_backup(self, config_name: str, custom_config: BackupConfig = None) -> str:
        """创建备份"""
        config = custom_config or self.default_configs.get(config_name)
        if not config:
            raise ValueError(f"未找到备份配置: {config_name}")
        
        backup_id = f"{config_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        
        # 创建备份记录
        record = BackupRecord(
            backup_id=backup_id,
            backup_type=config.backup_type,
            status=BackupStatus.PENDING,
            start_time=datetime.now(),
            end_time=None,
            source_path=config.source_path,
            backup_path=config.backup_path,
            file_count=0,
            backup_size_bytes=0,
            checksum=None,
            error_message=None,
            metadata={}
        )
        
        try:
            record.status = BackupStatus.RUNNING
            self._save_backup_record(record)
            
            # 执行备份
            if config.backup_type == BackupType.FULL:
                self._create_full_backup(config, record)
            elif config.backup_type == BackupType.INCREMENTAL:
                self._create_incremental_backup(config, record)
            elif config.backup_type == BackupType.DIFFERENTIAL:
                self._create_differential_backup(config, record)
            elif config.backup_type == BackupType.METADATA:
                self._create_metadata_backup(config, record)
            
            record.status = BackupStatus.COMPLETED
            record.end_time = datetime.now()
            
            # 验证备份完整性
            if config.verify_integrity:
                self._verify_backup_integrity(record)
            
        except Exception as e:
            record.status = BackupStatus.FAILED
            record.error_message = str(e)
            record.end_time = datetime.now()
            raise
        
        finally:
            self._save_backup_record(record)
        
        return backup_id
    
    def _create_full_backup(self, config: BackupConfig, record: BackupRecord):
        """创建全量备份"""
        source_path = Path(config.source_path)
        if not source_path.exists():
            raise FileNotFoundError(f"源路径不存在: {source_path}")
        
        # 创建备份目录
        backup_dir = Path(config.backup_path) / record.backup_id
        backup_dir.mkdir(parents=True, exist_ok=True)
        
        # 复制文件
        file_count = 0
        total_size = 0
        
        for file_path in source_path.rglob("*"):
            if file_path.is_file() and not self._should_exclude(file_path, config.exclude_patterns):
                relative_path = file_path.relative_to(source_path)
                target_path = backup_dir / relative_path
                target_path.parent.mkdir(parents=True, exist_ok=True)
                
                shutil.copy2(file_path, target_path)
                file_count += 1
                total_size += file_path.stat().st_size
        
        # 压缩备份
        if config.compression:
            archive_path = backup_dir.with_suffix(".tar.gz")
            self._create_archive(backup_dir, archive_path)
            shutil.rmtree(backup_dir)
            record.backup_path = str(archive_path)
            record.backup_size_bytes = archive_path.stat().st_size
        else:
            record.backup_path = str(backup_dir)
            record.backup_size_bytes = total_size
        
        record.file_count = file_count
        record.metadata["backup_method"] = "full_copy"
    
    def _create_incremental_backup(self, config: BackupConfig, record: BackupRecord):
        """创建增量备份"""
        # 获取最后一次备份的时间
        last_backup_time = self._get_last_backup_time(config.source_path)
        
        source_path = Path(config.source_path)
        backup_dir = Path(config.backup_path) / record.backup_id
        backup_dir.mkdir(parents=True, exist_ok=True)
        
        file_count = 0
        total_size = 0
        
        # 只备份修改时间晚于上次备份的文件
        for file_path in source_path.rglob("*"):
            if file_path.is_file() and not self._should_exclude(file_path, config.exclude_patterns):
                file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
                
                if file_mtime > last_backup_time:
                    relative_path = file_path.relative_to(source_path)
                    target_path = backup_dir / relative_path
                    target_path.parent.mkdir(parents=True, exist_ok=True)
                    
                    shutil.copy2(file_path, target_path)
                    file_count += 1
                    total_size += file_path.stat().st_size
        
        # 压缩备份
        if config.compression and file_count > 0:
            archive_path = backup_dir.with_suffix(".tar.gz")
            self._create_archive(backup_dir, archive_path)
            shutil.rmtree(backup_dir)
            record.backup_path = str(archive_path)
            record.backup_size_bytes = archive_path.stat().st_size
        else:
            record.backup_path = str(backup_dir)
            record.backup_size_bytes = total_size
        
        record.file_count = file_count
        record.metadata["backup_method"] = "incremental"
        record.metadata["last_backup_time"] = last_backup_time.isoformat()
    
    def _create_differential_backup(self, config: BackupConfig, record: BackupRecord):
        """创建差异备份"""
        # 获取最后一次全量备份的时间
        last_full_backup_time = self._get_last_full_backup_time(config.source_path)
        
        source_path = Path(config.source_path)
        backup_dir = Path(config.backup_path) / record.backup_id
        backup_dir.mkdir(parents=True, exist_ok=True)
        
        file_count = 0
        total_size = 0
        
        # 备份自上次全量备份以来修改的所有文件
        for file_path in source_path.rglob("*"):
            if file_path.is_file() and not self._should_exclude(file_path, config.exclude_patterns):
                file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
                
                if file_mtime > last_full_backup_time:
                    relative_path = file_path.relative_to(source_path)
                    target_path = backup_dir / relative_path
                    target_path.parent.mkdir(parents=True, exist_ok=True)
                    
                    shutil.copy2(file_path, target_path)
                    file_count += 1
                    total_size += file_path.stat().st_size
        
        # 压缩备份
        if config.compression and file_count > 0:
            archive_path = backup_dir.with_suffix(".tar.gz")
            self._create_archive(backup_dir, archive_path)
            shutil.rmtree(backup_dir)
            record.backup_path = str(archive_path)
            record.backup_size_bytes = archive_path.stat().st_size
        else:
            record.backup_path = str(backup_dir)
            record.backup_size_bytes = total_size
        
        record.file_count = file_count
        record.metadata["backup_method"] = "differential"
        record.metadata["last_full_backup_time"] = last_full_backup_time.isoformat()
    
    def _create_metadata_backup(self, config: BackupConfig, record: BackupRecord):
        """创建元数据备份"""
        backup_dir = Path(config.backup_path) / record.backup_id
        backup_dir.mkdir(parents=True, exist_ok=True)
        
        metadata = {
            "backup_time": datetime.now().isoformat(),
            "rocketmq_version": self._get_rocketmq_version(),
            "cluster_info": self._get_cluster_metadata(),
            "topic_info": self._get_topic_metadata(),
            "consumer_info": self._get_consumer_metadata(),
            "broker_config": self._get_broker_config_metadata()
        }
        
        # 保存元数据
        metadata_file = backup_dir / "metadata.json"
        with open(metadata_file, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)
        
        record.file_count = 1
        record.backup_size_bytes = metadata_file.stat().st_size
        record.backup_path = str(backup_dir)
        record.metadata["backup_method"] = "metadata_only"
    
    def _should_exclude(self, file_path: Path, exclude_patterns: List[str]) -> bool:
        """检查文件是否应该被排除"""
        if not exclude_patterns:
            return False
        
        import fnmatch
        file_name = file_path.name
        
        for pattern in exclude_patterns:
            if fnmatch.fnmatch(file_name, pattern):
                return True
        
        return False
    
    def _create_archive(self, source_dir: Path, archive_path: Path):
        """创建压缩归档"""
        with tarfile.open(archive_path, "w:gz") as tar:
            tar.add(source_dir, arcname=source_dir.name)
    
    def _get_last_backup_time(self, source_path: str) -> datetime:
        """获取最后一次备份时间"""
        last_time = datetime.min
        
        for record in self.backup_records:
            if (record.source_path == source_path and 
                record.status == BackupStatus.COMPLETED and
                record.end_time and record.end_time > last_time):
                last_time = record.end_time
        
        return last_time
    
    def _get_last_full_backup_time(self, source_path: str) -> datetime:
        """获取最后一次全量备份时间"""
        last_time = datetime.min
        
        for record in self.backup_records:
            if (record.source_path == source_path and 
                record.backup_type == BackupType.FULL and
                record.status == BackupStatus.COMPLETED and
                record.end_time and record.end_time > last_time):
                last_time = record.end_time
        
        return last_time
    
    def _verify_backup_integrity(self, record: BackupRecord):
        """验证备份完整性"""
        backup_path = Path(record.backup_path)
        
        if backup_path.suffix == ".gz":
            # 验证压缩文件
            try:
                with tarfile.open(backup_path, "r:gz") as tar:
                    tar.getmembers()  # 尝试读取所有成员
                record.checksum = self._calculate_file_checksum(backup_path)
            except Exception as e:
                raise ValueError(f"备份文件损坏: {e}")
        else:
            # 验证目录
            if not backup_path.exists():
                raise ValueError("备份目录不存在")
            record.checksum = self._calculate_directory_checksum(backup_path)
    
    def _calculate_file_checksum(self, file_path: Path) -> str:
        """计算文件校验和"""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    
    def _calculate_directory_checksum(self, dir_path: Path) -> str:
        """计算目录校验和"""
        hash_md5 = hashlib.md5()
        
        for file_path in sorted(dir_path.rglob("*")):
            if file_path.is_file():
                # 添加文件路径
                hash_md5.update(str(file_path.relative_to(dir_path)).encode())
                # 添加文件内容
                with open(file_path, "rb") as f:
                    for chunk in iter(lambda: f.read(4096), b""):
                        hash_md5.update(chunk)
        
        return hash_md5.hexdigest()
    
    def _get_rocketmq_version(self) -> str:
        """获取RocketMQ版本"""
        try:
            version_file = self.install_dir / "VERSION"
            if version_file.exists():
                return version_file.read_text().strip()
            return "unknown"
        except Exception:
            return "unknown"
    
    def _get_cluster_metadata(self) -> Dict:
        """获取集群元数据"""
        try:
            # 简化实现,实际应该通过管理接口获取
            mqadmin_script = self.install_dir / "bin" / "mqadmin"
            result = subprocess.run(
                [str(mqadmin_script), "clusterList", "-n", "localhost:9876"],
                capture_output=True,
                text=True,
                timeout=10
            )
            
            if result.returncode == 0:
                return {"cluster_list": result.stdout}
            return {"error": "无法获取集群信息"}
        except Exception as e:
            return {"error": str(e)}
    
    def _get_topic_metadata(self) -> Dict:
        """获取Topic元数据"""
        try:
            mqadmin_script = self.install_dir / "bin" / "mqadmin"
            result = subprocess.run(
                [str(mqadmin_script), "topicList", "-n", "localhost:9876"],
                capture_output=True,
                text=True,
                timeout=10
            )
            
            if result.returncode == 0:
                return {"topic_list": result.stdout}
            return {"error": "无法获取Topic信息"}
        except Exception as e:
            return {"error": str(e)}
    
    def _get_consumer_metadata(self) -> Dict:
        """获取消费者元数据"""
        try:
            mqadmin_script = self.install_dir / "bin" / "mqadmin"
            result = subprocess.run(
                [str(mqadmin_script), "consumerProgress", "-n", "localhost:9876"],
                capture_output=True,
                text=True,
                timeout=10
            )
            
            if result.returncode == 0:
                return {"consumer_progress": result.stdout}
            return {"error": "无法获取消费者信息"}
        except Exception as e:
            return {"error": str(e)}
    
    def _get_broker_config_metadata(self) -> Dict:
        """获取Broker配置元数据"""
        try:
            config_files = []
            conf_dir = self.install_dir / "conf"
            
            if conf_dir.exists():
                for config_file in conf_dir.rglob("*.properties"):
                    config_files.append({
                        "file": str(config_file.relative_to(self.install_dir)),
                        "content": config_file.read_text(encoding='utf-8')
                    })
            
            return {"config_files": config_files}
        except Exception as e:
            return {"error": str(e)}
    
    def _load_backup_records(self) -> List[BackupRecord]:
        """加载备份记录"""
        if not self.backup_records_file.exists():
            return []
        
        try:
            with open(self.backup_records_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            
            records = []
            for item in data:
                record = BackupRecord(
                    backup_id=item['backup_id'],
                    backup_type=BackupType(item['backup_type']),
                    status=BackupStatus(item['status']),
                    start_time=datetime.fromisoformat(item['start_time']),
                    end_time=datetime.fromisoformat(item['end_time']) if item['end_time'] else None,
                    source_path=item['source_path'],
                    backup_path=item['backup_path'],
                    file_count=item['file_count'],
                    backup_size_bytes=item['backup_size_bytes'],
                    checksum=item.get('checksum'),
                    error_message=item.get('error_message'),
                    metadata=item.get('metadata', {})
                )
                records.append(record)
            
            return records
        except Exception:
            return []
    
    def _save_backup_record(self, record: BackupRecord):
        """保存备份记录"""
        # 更新记录列表
        existing_index = None
        for i, existing_record in enumerate(self.backup_records):
            if existing_record.backup_id == record.backup_id:
                existing_index = i
                break
        
        if existing_index is not None:
            self.backup_records[existing_index] = record
        else:
            self.backup_records.append(record)
        
        # 保存到文件
        data = []
        for r in self.backup_records:
            data.append({
                'backup_id': r.backup_id,
                'backup_type': r.backup_type.value,
                'status': r.status.value,
                'start_time': r.start_time.isoformat(),
                'end_time': r.end_time.isoformat() if r.end_time else None,
                'source_path': r.source_path,
                'backup_path': r.backup_path,
                'file_count': r.file_count,
                'backup_size_bytes': r.backup_size_bytes,
                'checksum': r.checksum,
                'error_message': r.error_message,
                'metadata': r.metadata
            })
        
        with open(self.backup_records_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
    
    def list_backups(self, source_path: str = None, backup_type: BackupType = None) -> List[BackupRecord]:
        """列出备份记录"""
        filtered_records = self.backup_records
        
        if source_path:
            filtered_records = [r for r in filtered_records if r.source_path == source_path]
        
        if backup_type:
            filtered_records = [r for r in filtered_records if r.backup_type == backup_type]
        
        # 按时间倒序排列
        return sorted(filtered_records, key=lambda x: x.start_time, reverse=True)
    
    def delete_backup(self, backup_id: str) -> bool:
        """删除备份"""
        record = None
        for r in self.backup_records:
            if r.backup_id == backup_id:
                record = r
                break
        
        if not record:
            return False
        
        try:
            # 删除备份文件
            backup_path = Path(record.backup_path)
            if backup_path.exists():
                if backup_path.is_file():
                    backup_path.unlink()
                else:
                    shutil.rmtree(backup_path)
            
            # 从记录中移除
            self.backup_records.remove(record)
            self._save_backup_records()
            
            return True
        except Exception:
            return False
    
    def cleanup_old_backups(self):
        """清理过期备份"""
        current_time = datetime.now()
        
        for config_name, config in self.default_configs.items():
            retention_period = timedelta(days=config.retention_days)
            cutoff_time = current_time - retention_period
            
            # 找到过期的备份
            expired_backups = [
                r for r in self.backup_records
                if (r.source_path == config.source_path and
                    r.start_time < cutoff_time and
                    r.status == BackupStatus.COMPLETED)
            ]
            
            # 删除过期备份
            for backup in expired_backups:
                self.delete_backup(backup.backup_id)
                print(f"已删除过期备份: {backup.backup_id}")
    
    def get_backup_statistics(self) -> Dict:
        """获取备份统计信息"""
        total_backups = len(self.backup_records)
        successful_backups = len([r for r in self.backup_records if r.status == BackupStatus.COMPLETED])
        failed_backups = len([r for r in self.backup_records if r.status == BackupStatus.FAILED])
        
        total_size = sum(r.backup_size_bytes for r in self.backup_records if r.status == BackupStatus.COMPLETED)
        
        # 按类型统计
        type_stats = {}
        for backup_type in BackupType:
            type_records = [r for r in self.backup_records if r.backup_type == backup_type]
            type_stats[backup_type.value] = {
                "count": len(type_records),
                "successful": len([r for r in type_records if r.status == BackupStatus.COMPLETED]),
                "total_size_gb": sum(r.backup_size_bytes for r in type_records if r.status == BackupStatus.COMPLETED) / (1024**3)
            }
        
        return {
             "total_backups": total_backups,
             "successful_backups": successful_backups,
             "failed_backups": failed_backups,
             "success_rate": (successful_backups / total_backups * 100) if total_backups > 0 else 0,
             "total_size_gb": total_size / (1024**3),
             "by_type": type_stats
         }

6.5.2 数据恢复策略

import subprocess
from typing import List, Dict, Optional, Tuple
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import json
import shutil
import tarfile

class RecoveryType(Enum):
    """恢复类型"""
    FULL_RESTORE = "full_restore"          # 全量恢复
    PARTIAL_RESTORE = "partial_restore"    # 部分恢复
    POINT_IN_TIME = "point_in_time"        # 时间点恢复
    METADATA_RESTORE = "metadata_restore"  # 元数据恢复

class RecoveryStatus(Enum):
    """恢复状态"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class RecoveryPlan:
    """恢复计划"""
    recovery_id: str
    recovery_type: RecoveryType
    target_time: Optional[datetime]
    backup_ids: List[str]
    target_path: str
    verify_after_restore: bool = True
    stop_services: bool = True
    backup_current: bool = True
    restore_config: bool = True
    restore_data: bool = True

@dataclass
class RecoveryRecord:
    """恢复记录"""
    recovery_id: str
    recovery_type: RecoveryType
    status: RecoveryStatus
    start_time: datetime
    end_time: Optional[datetime]
    backup_ids: List[str]
    target_path: str
    files_restored: int
    bytes_restored: int
    error_message: Optional[str]
    metadata: Dict

class RocketMQRecoveryManager:
    """RocketMQ恢复管理器"""
    
    def __init__(self, install_dir: str, backup_manager: RocketMQBackupManager):
        self.install_dir = Path(install_dir)
        self.backup_manager = backup_manager
        self.recovery_records = []
        
        # 恢复记录文件
        self.recovery_records_file = backup_manager.backup_root / "recovery_records.json"
        self.recovery_records = self._load_recovery_records()
    
    def create_recovery_plan(self, recovery_type: RecoveryType, 
                           target_time: datetime = None,
                           backup_ids: List[str] = None) -> RecoveryPlan:
        """创建恢复计划"""
        recovery_id = f"recovery_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        
        if backup_ids is None:
            backup_ids = self._select_backup_for_recovery(recovery_type, target_time)
        
        plan = RecoveryPlan(
            recovery_id=recovery_id,
            recovery_type=recovery_type,
            target_time=target_time,
            backup_ids=backup_ids,
            target_path=str(self.install_dir),
            verify_after_restore=True,
            stop_services=True,
            backup_current=True,
            restore_config=True,
            restore_data=True
        )
        
        return plan
    
    def execute_recovery(self, plan: RecoveryPlan) -> str:
        """执行恢复"""
        # 创建恢复记录
        record = RecoveryRecord(
            recovery_id=plan.recovery_id,
            recovery_type=plan.recovery_type,
            status=RecoveryStatus.PENDING,
            start_time=datetime.now(),
            end_time=None,
            backup_ids=plan.backup_ids,
            target_path=plan.target_path,
            files_restored=0,
            bytes_restored=0,
            error_message=None,
            metadata={}
        )
        
        try:
            record.status = RecoveryStatus.RUNNING
            self._save_recovery_record(record)
            
            # 1. 停止服务
            if plan.stop_services:
                self._stop_rocketmq_services()
                record.metadata["services_stopped"] = True
            
            # 2. 备份当前数据
            if plan.backup_current:
                current_backup_id = self._backup_current_data()
                record.metadata["current_backup_id"] = current_backup_id
            
            # 3. 执行恢复
            if plan.recovery_type == RecoveryType.FULL_RESTORE:
                self._execute_full_restore(plan, record)
            elif plan.recovery_type == RecoveryType.PARTIAL_RESTORE:
                self._execute_partial_restore(plan, record)
            elif plan.recovery_type == RecoveryType.POINT_IN_TIME:
                self._execute_point_in_time_restore(plan, record)
            elif plan.recovery_type == RecoveryType.METADATA_RESTORE:
                self._execute_metadata_restore(plan, record)
            
            # 4. 验证恢复结果
            if plan.verify_after_restore:
                self._verify_recovery(record)
            
            record.status = RecoveryStatus.COMPLETED
            record.end_time = datetime.now()
            
        except Exception as e:
            record.status = RecoveryStatus.FAILED
            record.error_message = str(e)
            record.end_time = datetime.now()
            raise
        
        finally:
            self._save_recovery_record(record)
        
        return plan.recovery_id
    
    def _select_backup_for_recovery(self, recovery_type: RecoveryType, 
                                  target_time: datetime = None) -> List[str]:
        """选择用于恢复的备份"""
        if recovery_type == RecoveryType.FULL_RESTORE:
            # 选择最新的全量备份
            full_backups = self.backup_manager.list_backups(backup_type=BackupType.FULL)
            if full_backups:
                return [full_backups[0].backup_id]
        
        elif recovery_type == RecoveryType.POINT_IN_TIME and target_time:
            # 选择时间点之前的备份
            backup_ids = []
            
            # 找到目标时间之前最近的全量备份
            full_backups = [
                b for b in self.backup_manager.list_backups(backup_type=BackupType.FULL)
                if b.start_time <= target_time
            ]
            
            if full_backups:
                backup_ids.append(full_backups[0].backup_id)
                
                # 找到全量备份之后、目标时间之前的增量备份
                incremental_backups = [
                    b for b in self.backup_manager.list_backups(backup_type=BackupType.INCREMENTAL)
                    if full_backups[0].start_time < b.start_time <= target_time
                ]
                
                # 按时间排序
                incremental_backups.sort(key=lambda x: x.start_time)
                backup_ids.extend([b.backup_id for b in incremental_backups])
            
            return backup_ids
        
        elif recovery_type == RecoveryType.METADATA_RESTORE:
            # 选择最新的元数据备份
            metadata_backups = self.backup_manager.list_backups(backup_type=BackupType.METADATA)
            if metadata_backups:
                return [metadata_backups[0].backup_id]
        
        return []
    
    def _execute_full_restore(self, plan: RecoveryPlan, record: RecoveryRecord):
        """执行全量恢复"""
        for backup_id in plan.backup_ids:
            backup_record = self._get_backup_record(backup_id)
            if not backup_record:
                raise ValueError(f"备份记录不存在: {backup_id}")
            
            # 恢复数据
            if plan.restore_data:
                self._restore_backup_data(backup_record, record)
            
            # 恢复配置
            if plan.restore_config:
                self._restore_backup_config(backup_record, record)
    
    def _execute_partial_restore(self, plan: RecoveryPlan, record: RecoveryRecord):
        """执行部分恢复"""
        # 只恢复指定的组件
        for backup_id in plan.backup_ids:
            backup_record = self._get_backup_record(backup_id)
            if not backup_record:
                continue
            
            # 根据备份类型决定恢复内容
            if "commitlog" in backup_record.source_path:
                self._restore_commitlog(backup_record, record)
            elif "consumequeue" in backup_record.source_path:
                self._restore_consumequeue(backup_record, record)
            elif "index" in backup_record.source_path:
                self._restore_index(backup_record, record)
            elif "conf" in backup_record.source_path:
                self._restore_config(backup_record, record)
    
    def _execute_point_in_time_restore(self, plan: RecoveryPlan, record: RecoveryRecord):
        """执行时间点恢复"""
        # 按顺序应用备份
        for backup_id in plan.backup_ids:
            backup_record = self._get_backup_record(backup_id)
            if not backup_record:
                continue
            
            if backup_record.backup_type == BackupType.FULL:
                self._restore_backup_data(backup_record, record)
            elif backup_record.backup_type == BackupType.INCREMENTAL:
                self._apply_incremental_backup(backup_record, record)
            elif backup_record.backup_type == BackupType.DIFFERENTIAL:
                self._apply_differential_backup(backup_record, record)
    
    def _execute_metadata_restore(self, plan: RecoveryPlan, record: RecoveryRecord):
        """执行元数据恢复"""
        for backup_id in plan.backup_ids:
            backup_record = self._get_backup_record(backup_id)
            if not backup_record:
                continue
            
            if backup_record.backup_type == BackupType.METADATA:
                self._restore_metadata(backup_record, record)
    
    def _restore_backup_data(self, backup_record: BackupRecord, recovery_record: RecoveryRecord):
        """恢复备份数据"""
        backup_path = Path(backup_record.backup_path)
        target_path = Path(recovery_record.target_path)
        
        if backup_path.suffix == ".gz":
            # 解压缩备份
            with tarfile.open(backup_path, "r:gz") as tar:
                tar.extractall(target_path.parent)
                
                # 移动解压的内容到目标位置
                extracted_dir = target_path.parent / backup_path.stem.replace(".tar", "")
                if extracted_dir.exists():
                    if target_path.exists():
                        shutil.rmtree(target_path)
                    shutil.move(str(extracted_dir), str(target_path))
        else:
            # 直接复制目录
            if target_path.exists():
                shutil.rmtree(target_path)
            shutil.copytree(backup_path, target_path)
        
        recovery_record.files_restored += backup_record.file_count
        recovery_record.bytes_restored += backup_record.backup_size_bytes
    
    def _restore_commitlog(self, backup_record: BackupRecord, recovery_record: RecoveryRecord):
        """恢复CommitLog"""
        source_path = Path(backup_record.backup_path)
        target_path = self.install_dir / "store" / "commitlog"
        
        # 清空现有CommitLog
        if target_path.exists():
            shutil.rmtree(target_path)
        
        # 恢复CommitLog
        if source_path.suffix == ".gz":
            with tarfile.open(source_path, "r:gz") as tar:
                tar.extractall(target_path.parent)
        else:
            shutil.copytree(source_path, target_path)
        
        recovery_record.files_restored += backup_record.file_count
        recovery_record.bytes_restored += backup_record.backup_size_bytes
    
    def _restore_consumequeue(self, backup_record: BackupRecord, recovery_record: RecoveryRecord):
        """恢复ConsumeQueue"""
        source_path = Path(backup_record.backup_path)
        target_path = self.install_dir / "store" / "consumequeue"
        
        # 清空现有ConsumeQueue
        if target_path.exists():
            shutil.rmtree(target_path)
        
        # 恢复ConsumeQueue
        if source_path.suffix == ".gz":
            with tarfile.open(source_path, "r:gz") as tar:
                tar.extractall(target_path.parent)
        else:
            shutil.copytree(source_path, target_path)
        
        recovery_record.files_restored += backup_record.file_count
        recovery_record.bytes_restored += backup_record.backup_size_bytes
    
    def _restore_index(self, backup_record: BackupRecord, recovery_record: RecoveryRecord):
        """恢复Index"""
        source_path = Path(backup_record.backup_path)
        target_path = self.install_dir / "store" / "index"
        
        # 清空现有Index
        if target_path.exists():
            shutil.rmtree(target_path)
        
        # 恢复Index
        if source_path.suffix == ".gz":
            with tarfile.open(source_path, "r:gz") as tar:
                tar.extractall(target_path.parent)
        else:
            shutil.copytree(source_path, target_path)
        
        recovery_record.files_restored += backup_record.file_count
        recovery_record.bytes_restored += backup_record.backup_size_bytes
    
    def _restore_config(self, backup_record: BackupRecord, recovery_record: RecoveryRecord):
        """恢复配置文件"""
        source_path = Path(backup_record.backup_path)
        target_path = self.install_dir / "conf"
        
        # 备份现有配置
        if target_path.exists():
            backup_config_path = target_path.parent / f"conf_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            shutil.copytree(target_path, backup_config_path)
            shutil.rmtree(target_path)
        
        # 恢复配置
        if source_path.suffix == ".gz":
            with tarfile.open(source_path, "r:gz") as tar:
                tar.extractall(target_path.parent)
        else:
            shutil.copytree(source_path, target_path)
        
        recovery_record.files_restored += backup_record.file_count
        recovery_record.bytes_restored += backup_record.backup_size_bytes
    
    def _apply_incremental_backup(self, backup_record: BackupRecord, recovery_record: RecoveryRecord):
        """应用增量备份"""
        source_path = Path(backup_record.backup_path)
        
        if source_path.suffix == ".gz":
            # 解压到临时目录
            temp_dir = source_path.parent / f"temp_{backup_record.backup_id}"
            with tarfile.open(source_path, "r:gz") as tar:
                tar.extractall(temp_dir)
            
            # 复制文件到目标位置
            extracted_dir = temp_dir / source_path.stem.replace(".tar", "")
            self._copy_incremental_files(extracted_dir, self.install_dir)
            
            # 清理临时目录
            shutil.rmtree(temp_dir)
        else:
            self._copy_incremental_files(source_path, self.install_dir)
        
        recovery_record.files_restored += backup_record.file_count
        recovery_record.bytes_restored += backup_record.backup_size_bytes
    
    def _apply_differential_backup(self, backup_record: BackupRecord, recovery_record: RecoveryRecord):
        """应用差异备份"""
        # 差异备份的应用方式与增量备份类似
        self._apply_incremental_backup(backup_record, recovery_record)
    
    def _copy_incremental_files(self, source_dir: Path, target_dir: Path):
        """复制增量文件"""
        for file_path in source_dir.rglob("*"):
            if file_path.is_file():
                relative_path = file_path.relative_to(source_dir)
                target_file = target_dir / relative_path
                
                # 确保目标目录存在
                target_file.parent.mkdir(parents=True, exist_ok=True)
                
                # 复制文件
                shutil.copy2(file_path, target_file)
    
    def _restore_metadata(self, backup_record: BackupRecord, recovery_record: RecoveryRecord):
        """恢复元数据"""
        metadata_file = Path(backup_record.backup_path) / "metadata.json"
        
        if metadata_file.exists():
            with open(metadata_file, 'r', encoding='utf-8') as f:
                metadata = json.load(f)
            
            # 恢复Topic配置
            if "topic_info" in metadata:
                self._restore_topics(metadata["topic_info"])
            
            # 恢复消费者组配置
            if "consumer_info" in metadata:
                self._restore_consumer_groups(metadata["consumer_info"])
            
            # 恢复Broker配置
            if "broker_config" in metadata:
                self._restore_broker_configs(metadata["broker_config"])
            
            recovery_record.metadata["metadata_restored"] = True
    
    def _restore_topics(self, topic_info: Dict):
        """恢复Topic配置"""
        # 这里应该通过管理接口恢复Topic
        # 简化实现
        pass
    
    def _restore_consumer_groups(self, consumer_info: Dict):
        """恢复消费者组配置"""
        # 这里应该通过管理接口恢复消费者组
        # 简化实现
        pass
    
    def _restore_broker_configs(self, broker_config: Dict):
        """恢复Broker配置"""
        if "config_files" in broker_config:
            for config_file_info in broker_config["config_files"]:
                file_path = self.install_dir / config_file_info["file"]
                file_path.parent.mkdir(parents=True, exist_ok=True)
                
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(config_file_info["content"])
    
    def _stop_rocketmq_services(self):
        """停止RocketMQ服务"""
        try:
            # 停止Broker
            broker_script = self.install_dir / "bin" / "mqshutdown"
            if broker_script.exists():
                subprocess.run([str(broker_script), "broker"], timeout=30)
            
            # 停止NameServer
            if broker_script.exists():
                subprocess.run([str(broker_script), "namesrv"], timeout=30)
            
            time.sleep(5)  # 等待服务完全停止
        except Exception as e:
            print(f"停止服务时出错: {e}")
    
    def _backup_current_data(self) -> str:
        """备份当前数据"""
        # 创建当前数据的备份
        backup_config = BackupConfig(
            backup_type=BackupType.FULL,
            source_path=str(self.install_dir / "store"),
            backup_path=str(self.backup_manager.backup_root / "pre_recovery"),
            retention_days=7,
            compression=True,
            verify_integrity=False
        )
        
        return self.backup_manager.create_backup("pre_recovery", backup_config)
    
    def _verify_recovery(self, record: RecoveryRecord):
        """验证恢复结果"""
        # 检查关键文件是否存在
        store_dir = self.install_dir / "store"
        
        checks = {
            "commitlog_exists": (store_dir / "commitlog").exists(),
            "consumequeue_exists": (store_dir / "consumequeue").exists(),
            "index_exists": (store_dir / "index").exists(),
            "config_exists": (self.install_dir / "conf").exists()
        }
        
        record.metadata["verification_results"] = checks
        
        # 如果关键组件缺失,标记为失败
        if not all(checks.values()):
            raise ValueError(f"恢复验证失败: {checks}")
    
    def _get_backup_record(self, backup_id: str) -> Optional[BackupRecord]:
        """获取备份记录"""
        for record in self.backup_manager.backup_records:
            if record.backup_id == backup_id:
                return record
        return None
    
    def _load_recovery_records(self) -> List[RecoveryRecord]:
        """加载恢复记录"""
        if not self.recovery_records_file.exists():
            return []
        
        try:
            with open(self.recovery_records_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            
            records = []
            for item in data:
                record = RecoveryRecord(
                    recovery_id=item['recovery_id'],
                    recovery_type=RecoveryType(item['recovery_type']),
                    status=RecoveryStatus(item['status']),
                    start_time=datetime.fromisoformat(item['start_time']),
                    end_time=datetime.fromisoformat(item['end_time']) if item['end_time'] else None,
                    backup_ids=item['backup_ids'],
                    target_path=item['target_path'],
                    files_restored=item['files_restored'],
                    bytes_restored=item['bytes_restored'],
                    error_message=item.get('error_message'),
                    metadata=item.get('metadata', {})
                )
                records.append(record)
            
            return records
        except Exception:
            return []
    
    def _save_recovery_record(self, record: RecoveryRecord):
        """保存恢复记录"""
        # 更新记录列表
        existing_index = None
        for i, existing_record in enumerate(self.recovery_records):
            if existing_record.recovery_id == record.recovery_id:
                existing_index = i
                break
        
        if existing_index is not None:
            self.recovery_records[existing_index] = record
        else:
            self.recovery_records.append(record)
        
        # 保存到文件
        data = []
        for r in self.recovery_records:
            data.append({
                'recovery_id': r.recovery_id,
                'recovery_type': r.recovery_type.value,
                'status': r.status.value,
                'start_time': r.start_time.isoformat(),
                'end_time': r.end_time.isoformat() if r.end_time else None,
                'backup_ids': r.backup_ids,
                'target_path': r.target_path,
                'files_restored': r.files_restored,
                'bytes_restored': r.bytes_restored,
                'error_message': r.error_message,
                'metadata': r.metadata
            })
        
        with open(self.recovery_records_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
    
    def list_recovery_records(self) -> List[RecoveryRecord]:
        """列出恢复记录"""
        return sorted(self.recovery_records, key=lambda x: x.start_time, reverse=True)
    
    def get_recovery_statistics(self) -> Dict:
        """获取恢复统计信息"""
        total_recoveries = len(self.recovery_records)
        successful_recoveries = len([r for r in self.recovery_records if r.status == RecoveryStatus.COMPLETED])
        failed_recoveries = len([r for r in self.recovery_records if r.status == RecoveryStatus.FAILED])
        
        total_files_restored = sum(r.files_restored for r in self.recovery_records if r.status == RecoveryStatus.COMPLETED)
        total_bytes_restored = sum(r.bytes_restored for r in self.recovery_records if r.status == RecoveryStatus.COMPLETED)
        
        # 按类型统计
        type_stats = {}
        for recovery_type in RecoveryType:
            type_records = [r for r in self.recovery_records if r.recovery_type == recovery_type]
            type_stats[recovery_type.value] = {
                "count": len(type_records),
                "successful": len([r for r in type_records if r.status == RecoveryStatus.COMPLETED]),
                "total_files_restored": sum(r.files_restored for r in type_records if r.status == RecoveryStatus.COMPLETED),
                "total_size_gb": sum(r.bytes_restored for r in type_records if r.status == RecoveryStatus.COMPLETED) / (1024**3)
            }
        
        return {
             "total_recoveries": total_recoveries,
             "successful_recoveries": successful_recoveries,
             "failed_recoveries": failed_recoveries,
             "success_rate": (successful_recoveries / total_recoveries * 100) if total_recoveries > 0 else 0,
             "total_files_restored": total_files_restored,
             "total_size_gb": total_bytes_restored / (1024**3),
             "by_type": type_stats
         }

# 使用示例
if __name__ == "__main__":
    import time
    
    # 初始化备份和恢复管理器
    backup_manager = RocketMQBackupManager("/opt/rocketmq", "/backup/rocketmq")
    recovery_manager = RocketMQRecoveryManager("/opt/rocketmq", backup_manager)
    
    print("=== RocketMQ 备份恢复管理演示 ===")
    
    # 1. 创建备份
    print("\n1. 创建全量备份...")
    full_backup_config = backup_manager.create_default_backup_configs()["full_backup"]
    full_backup_id = backup_manager.create_backup("manual_full", full_backup_config)
    print(f"全量备份创建完成: {full_backup_id}")
    
    # 等待一段时间模拟数据变化
    time.sleep(2)
    
    # 2. 创建增量备份
    print("\n2. 创建增量备份...")
    incremental_config = backup_manager.create_default_backup_configs()["incremental_backup"]
    incremental_backup_id = backup_manager.create_backup("manual_incremental", incremental_config)
    print(f"增量备份创建完成: {incremental_backup_id}")
    
    # 3. 列出备份
    print("\n3. 当前备份列表:")
    backups = backup_manager.list_backups()
    for backup in backups[:5]:  # 显示最近5个备份
        print(f"  - {backup.backup_id}: {backup.backup_type.value} ({backup.start_time.strftime('%Y-%m-%d %H:%M:%S')})")
    
    # 4. 创建恢复计划
    print("\n4. 创建恢复计划...")
    recovery_plan = recovery_manager.create_recovery_plan(
        recovery_type=RecoveryType.FULL_RESTORE
    )
    print(f"恢复计划创建完成: {recovery_plan.recovery_id}")
    print(f"  - 恢复类型: {recovery_plan.recovery_type.value}")
    print(f"  - 备份ID: {recovery_plan.backup_ids}")
    print(f"  - 目标路径: {recovery_plan.target_path}")
    
    # 5. 模拟恢复(实际环境中需要谨慎执行)
    print("\n5. 模拟恢复过程...")
    try:
        # 注意:这里只是演示,实际恢复会停止服务并修改数据
        # recovery_id = recovery_manager.execute_recovery(recovery_plan)
        # print(f"恢复完成: {recovery_id}")
        print("恢复计划已准备就绪(演示模式,未实际执行)")
    except Exception as e:
        print(f"恢复失败: {e}")
    
    # 6. 显示统计信息
    print("\n6. 备份统计信息:")
    backup_stats = backup_manager.get_backup_statistics()
    print(f"  - 总备份数: {backup_stats['total_backups']}")
    print(f"  - 成功率: {backup_stats['success_rate']:.1f}%")
    print(f"  - 总大小: {backup_stats['total_size_gb']:.2f} GB")
    
    print("\n7. 恢复统计信息:")
    recovery_stats = recovery_manager.get_recovery_statistics()
    print(f"  - 总恢复数: {recovery_stats['total_recoveries']}")
    print(f"  - 成功率: {recovery_stats['success_rate']:.1f}%")
    print(f"  - 总恢复大小: {recovery_stats['total_size_gb']:.2f} GB")
    
    print("\n=== 演示完成 ===")

6.5.3 容灾演练

import random
import threading
from typing import List, Dict, Callable
from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta
import json
import time

class DrillType(Enum):
    """演练类型"""
    FULL_DISASTER = "full_disaster"        # 全灾难演练
    PARTIAL_FAILURE = "partial_failure"    # 部分故障演练
    DATA_CORRUPTION = "data_corruption"    # 数据损坏演练
    NETWORK_PARTITION = "network_partition" # 网络分区演练
    HARDWARE_FAILURE = "hardware_failure"  # 硬件故障演练

class DrillStatus(Enum):
    """演练状态"""
    PLANNED = "planned"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class DrillScenario:
    """演练场景"""
    scenario_id: str
    drill_type: DrillType
    description: str
    failure_components: List[str]
    expected_recovery_time: int  # 预期恢复时间(分钟)
    success_criteria: List[str]
    preparation_steps: List[str]
    execution_steps: List[str]
    verification_steps: List[str]

@dataclass
class DrillRecord:
    """演练记录"""
    drill_id: str
    scenario_id: str
    drill_type: DrillType
    status: DrillStatus
    start_time: datetime
    end_time: Optional[datetime]
    actual_recovery_time: Optional[int]
    success: bool
    participants: List[str]
    observations: List[str]
    issues_found: List[str]
    improvements: List[str]
    metadata: Dict

class DisasterRecoveryDrill:
    """容灾演练管理器"""
    
    def __init__(self, cluster_manager: ClusterManager, 
                 backup_manager: RocketMQBackupManager,
                 recovery_manager: RocketMQRecoveryManager):
        self.cluster_manager = cluster_manager
        self.backup_manager = backup_manager
        self.recovery_manager = recovery_manager
        self.drill_records = []
        self.scenarios = self._create_default_scenarios()
        
        # 演练记录文件
        self.drill_records_file = Path("/var/log/rocketmq/drill_records.json")
        self.drill_records = self._load_drill_records()
    
    def _create_default_scenarios(self) -> List[DrillScenario]:
        """创建默认演练场景"""
        scenarios = [
            DrillScenario(
                scenario_id="full_disaster_01",
                drill_type=DrillType.FULL_DISASTER,
                description="完全灾难恢复演练 - 所有节点故障",
                failure_components=["all_nameservers", "all_brokers", "storage"],
                expected_recovery_time=30,
                success_criteria=[
                    "所有服务在30分钟内恢复",
                    "数据完整性验证通过",
                    "消息生产消费正常",
                    "集群状态健康"
                ],
                preparation_steps=[
                    "确认最新备份可用",
                    "准备恢复环境",
                    "通知相关人员",
                    "准备监控工具"
                ],
                execution_steps=[
                    "模拟所有节点故障",
                    "启动恢复流程",
                    "恢复NameServer",
                    "恢复Broker集群",
                    "验证数据完整性",
                    "恢复业务流量"
                ],
                verification_steps=[
                    "检查集群状态",
                    "验证消息生产",
                    "验证消息消费",
                    "检查数据一致性",
                    "性能基准测试"
                ]
            ),
            DrillScenario(
                scenario_id="partial_failure_01",
                drill_type=DrillType.PARTIAL_FAILURE,
                description="部分故障演练 - 单个Broker故障",
                failure_components=["broker_1"],
                expected_recovery_time=10,
                success_criteria=[
                    "故障Broker在10分钟内恢复",
                    "其他Broker正常运行",
                    "消息路由自动切换",
                    "无消息丢失"
                ],
                preparation_steps=[
                    "确认集群状态正常",
                    "准备监控工具",
                    "通知运维团队"
                ],
                execution_steps=[
                    "停止目标Broker",
                    "观察集群反应",
                    "启动故障恢复",
                    "验证服务恢复"
                ],
                verification_steps=[
                    "检查Broker状态",
                    "验证消息路由",
                    "检查消息堆积",
                    "验证负载均衡"
                ]
            ),
            DrillScenario(
                scenario_id="data_corruption_01",
                drill_type=DrillType.DATA_CORRUPTION,
                description="数据损坏恢复演练 - CommitLog损坏",
                failure_components=["commitlog"],
                expected_recovery_time=20,
                success_criteria=[
                    "数据在20分钟内恢复",
                    "消息完整性验证通过",
                    "服务正常运行"
                ],
                preparation_steps=[
                    "创建测试备份",
                    "准备恢复脚本",
                    "设置监控告警"
                ],
                execution_steps=[
                    "模拟CommitLog损坏",
                    "检测故障",
                    "停止相关服务",
                    "从备份恢复数据",
                    "重启服务",
                    "验证数据完整性"
                ],
                verification_steps=[
                    "检查CommitLog完整性",
                    "验证消息可读性",
                    "检查索引一致性",
                    "性能测试"
                ]
            ),
            DrillScenario(
                scenario_id="network_partition_01",
                drill_type=DrillType.NETWORK_PARTITION,
                description="网络分区演练 - NameServer网络隔离",
                failure_components=["nameserver_network"],
                expected_recovery_time=15,
                success_criteria=[
                    "网络恢复后服务自动重连",
                    "Broker重新注册成功",
                    "路由信息更新正常"
                ],
                preparation_steps=[
                    "配置网络隔离工具",
                    "准备网络监控",
                    "设置恢复脚本"
                ],
                execution_steps=[
                    "隔离NameServer网络",
                    "观察Broker行为",
                    "恢复网络连接",
                    "验证服务重连"
                ],
                verification_steps=[
                    "检查网络连通性",
                    "验证服务注册",
                    "检查路由更新",
                    "测试消息路由"
                ]
            )
        ]
        
        return scenarios
    
    def schedule_drill(self, scenario_id: str, scheduled_time: datetime, 
                      participants: List[str]) -> str:
        """安排演练"""
        drill_id = f"drill_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        
        scenario = self._get_scenario(scenario_id)
        if not scenario:
            raise ValueError(f"演练场景不存在: {scenario_id}")
        
        record = DrillRecord(
            drill_id=drill_id,
            scenario_id=scenario_id,
            drill_type=scenario.drill_type,
            status=DrillStatus.PLANNED,
            start_time=scheduled_time,
            end_time=None,
            actual_recovery_time=None,
            success=False,
            participants=participants,
            observations=[],
            issues_found=[],
            improvements=[],
            metadata={
                "scheduled_time": scheduled_time.isoformat(),
                "scenario": scenario.__dict__
            }
        )
        
        self.drill_records.append(record)
        self._save_drill_records()
        
        return drill_id
    
    def execute_drill(self, drill_id: str) -> bool:
        """执行演练"""
        record = self._get_drill_record(drill_id)
        if not record:
            raise ValueError(f"演练记录不存在: {drill_id}")
        
        scenario = self._get_scenario(record.scenario_id)
        if not scenario:
            raise ValueError(f"演练场景不存在: {record.scenario_id}")
        
        try:
            record.status = DrillStatus.RUNNING
            record.start_time = datetime.now()
            self._save_drill_records()
            
            print(f"开始执行演练: {scenario.description}")
            
            # 1. 准备阶段
            print("\n=== 准备阶段 ===")
            for step in scenario.preparation_steps:
                print(f"执行: {step}")
                self._execute_preparation_step(step, record)
                time.sleep(1)
            
            # 2. 故障注入阶段
            print("\n=== 故障注入阶段 ===")
            failure_start_time = datetime.now()
            for component in scenario.failure_components:
                print(f"模拟故障: {component}")
                self._inject_failure(component, record)
                time.sleep(2)
            
            # 3. 执行恢复阶段
            print("\n=== 恢复阶段 ===")
            for step in scenario.execution_steps:
                print(f"执行: {step}")
                self._execute_recovery_step(step, record)
                time.sleep(2)
            
            recovery_end_time = datetime.now()
            record.actual_recovery_time = int((recovery_end_time - failure_start_time).total_seconds() / 60)
            
            # 4. 验证阶段
            print("\n=== 验证阶段 ===")
            verification_results = []
            for step in scenario.verification_steps:
                print(f"验证: {step}")
                result = self._execute_verification_step(step, record)
                verification_results.append(result)
                time.sleep(1)
            
            # 5. 评估结果
            record.success = self._evaluate_drill_success(scenario, record, verification_results)
            record.status = DrillStatus.COMPLETED
            record.end_time = datetime.now()
            
            print(f"\n=== 演练完成 ===")
            print(f"演练结果: {'成功' if record.success else '失败'}")
            print(f"实际恢复时间: {record.actual_recovery_time} 分钟")
            print(f"预期恢复时间: {scenario.expected_recovery_time} 分钟")
            
            return record.success
            
        except Exception as e:
            record.status = DrillStatus.FAILED
            record.end_time = datetime.now()
            record.issues_found.append(f"演练执行异常: {str(e)}")
            print(f"演练执行失败: {e}")
            return False
        
        finally:
            self._save_drill_records()
    
    def _execute_preparation_step(self, step: str, record: DrillRecord):
        """执行准备步骤"""
        if "备份" in step:
            # 创建演练前备份
            backup_config = self.backup_manager.create_default_backup_configs()["full_backup"]
            backup_id = self.backup_manager.create_backup("drill_preparation", backup_config)
            record.metadata["preparation_backup_id"] = backup_id
            record.observations.append(f"创建演练备份: {backup_id}")
        
        elif "监控" in step:
            # 启动监控
            record.observations.append("启动演练监控")
        
        elif "通知" in step:
            # 发送通知
            record.observations.append("发送演练通知")
    
    def _inject_failure(self, component: str, record: DrillRecord):
        """注入故障"""
        if component == "all_nameservers":
            # 停止所有NameServer
            record.observations.append("停止所有NameServer服务")
            
        elif component == "all_brokers":
            # 停止所有Broker
            record.observations.append("停止所有Broker服务")
            
        elif component.startswith("broker_"):
            # 停止指定Broker
            broker_id = component.split("_")[1]
            record.observations.append(f"停止Broker {broker_id}")
            
        elif component == "commitlog":
            # 模拟CommitLog损坏
            record.observations.append("模拟CommitLog文件损坏")
            
        elif component == "nameserver_network":
            # 模拟网络分区
            record.observations.append("模拟NameServer网络分区")
        
        record.metadata[f"failure_{component}"] = datetime.now().isoformat()
    
    def _execute_recovery_step(self, step: str, record: DrillRecord):
        """执行恢复步骤"""
        if "恢复NameServer" in step:
            record.observations.append("启动NameServer恢复流程")
            
        elif "恢复Broker" in step:
            record.observations.append("启动Broker恢复流程")
            
        elif "数据完整性" in step:
            record.observations.append("执行数据完整性验证")
            
        elif "业务流量" in step:
            record.observations.append("恢复业务流量")
        
        elif "从备份恢复" in step:
            # 执行数据恢复
            recovery_plan = self.recovery_manager.create_recovery_plan(
                recovery_type=RecoveryType.FULL_RESTORE
            )
            record.observations.append(f"创建恢复计划: {recovery_plan.recovery_id}")
    
    def _execute_verification_step(self, step: str, record: DrillRecord) -> bool:
        """执行验证步骤"""
        if "集群状态" in step:
            # 检查集群状态
            result = True  # 模拟检查结果
            record.observations.append(f"集群状态检查: {'通过' if result else '失败'}")
            return result
            
        elif "消息生产" in step:
            # 验证消息生产
            result = True
            record.observations.append(f"消息生产验证: {'通过' if result else '失败'}")
            return result
            
        elif "消息消费" in step:
            # 验证消息消费
            result = True
            record.observations.append(f"消息消费验证: {'通过' if result else '失败'}")
            return result
            
        elif "数据一致性" in step:
            # 检查数据一致性
            result = True
            record.observations.append(f"数据一致性检查: {'通过' if result else '失败'}")
            return result
            
        elif "性能" in step:
            # 性能测试
            result = True
            record.observations.append(f"性能测试: {'通过' if result else '失败'}")
            return result
        
        return True
    
    def _evaluate_drill_success(self, scenario: DrillScenario, record: DrillRecord, 
                              verification_results: List[bool]) -> bool:
        """评估演练成功性"""
        # 检查恢复时间
        time_success = record.actual_recovery_time <= scenario.expected_recovery_time
        
        # 检查验证结果
        verification_success = all(verification_results)
        
        # 检查是否有严重问题
        no_critical_issues = len([issue for issue in record.issues_found if "严重" in issue or "失败" in issue]) == 0
        
        success = time_success and verification_success and no_critical_issues
        
        # 记录评估结果
        record.metadata["evaluation"] = {
            "time_success": time_success,
            "verification_success": verification_success,
            "no_critical_issues": no_critical_issues,
            "overall_success": success
        }
        
        return success
    
    def generate_drill_report(self, drill_id: str) -> Dict:
        """生成演练报告"""
        record = self._get_drill_record(drill_id)
        if not record:
            raise ValueError(f"演练记录不存在: {drill_id}")
        
        scenario = self._get_scenario(record.scenario_id)
        
        report = {
            "drill_info": {
                "drill_id": record.drill_id,
                "scenario_id": record.scenario_id,
                "drill_type": record.drill_type.value,
                "description": scenario.description if scenario else "未知场景",
                "start_time": record.start_time.isoformat(),
                "end_time": record.end_time.isoformat() if record.end_time else None,
                "duration_minutes": int((record.end_time - record.start_time).total_seconds() / 60) if record.end_time else None,
                "participants": record.participants
            },
            "results": {
                "status": record.status.value,
                "success": record.success,
                "expected_recovery_time": scenario.expected_recovery_time if scenario else None,
                "actual_recovery_time": record.actual_recovery_time,
                "time_performance": "优秀" if record.actual_recovery_time and scenario and record.actual_recovery_time <= scenario.expected_recovery_time * 0.8 else 
                                  "良好" if record.actual_recovery_time and scenario and record.actual_recovery_time <= scenario.expected_recovery_time else "需改进"
            },
            "observations": record.observations,
            "issues_found": record.issues_found,
            "improvements": record.improvements,
            "evaluation": record.metadata.get("evaluation", {}),
            "recommendations": self._generate_recommendations(record, scenario)
        }
        
        return report
    
    def _generate_recommendations(self, record: DrillRecord, scenario: DrillScenario) -> List[str]:
        """生成改进建议"""
        recommendations = []
        
        # 基于恢复时间的建议
        if record.actual_recovery_time and scenario:
            if record.actual_recovery_time > scenario.expected_recovery_time:
                recommendations.append("恢复时间超出预期,建议优化恢复流程")
                recommendations.append("考虑增加自动化恢复脚本")
            
            if record.actual_recovery_time > scenario.expected_recovery_time * 1.5:
                recommendations.append("恢复时间严重超时,需要重新评估恢复策略")
        
        # 基于问题的建议
        if record.issues_found:
            recommendations.append("发现问题需要及时修复")
            recommendations.append("建议增加相关监控和告警")
        
        # 基于演练类型的建议
        if record.drill_type == DrillType.FULL_DISASTER:
            recommendations.append("建议定期进行全灾难演练")
            recommendations.append("考虑建立异地容灾中心")
        
        elif record.drill_type == DrillType.DATA_CORRUPTION:
            recommendations.append("建议增加数据校验机制")
            recommendations.append("考虑实施更频繁的备份策略")
        
        return recommendations
    
    def _get_scenario(self, scenario_id: str) -> Optional[DrillScenario]:
        """获取演练场景"""
        for scenario in self.scenarios:
            if scenario.scenario_id == scenario_id:
                return scenario
        return None
    
    def _get_drill_record(self, drill_id: str) -> Optional[DrillRecord]:
        """获取演练记录"""
        for record in self.drill_records:
            if record.drill_id == drill_id:
                return record
        return None
    
    def _load_drill_records(self) -> List[DrillRecord]:
        """加载演练记录"""
        if not self.drill_records_file.exists():
            return []
        
        try:
            with open(self.drill_records_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            
            records = []
            for item in data:
                record = DrillRecord(
                    drill_id=item['drill_id'],
                    scenario_id=item['scenario_id'],
                    drill_type=DrillType(item['drill_type']),
                    status=DrillStatus(item['status']),
                    start_time=datetime.fromisoformat(item['start_time']),
                    end_time=datetime.fromisoformat(item['end_time']) if item['end_time'] else None,
                    actual_recovery_time=item.get('actual_recovery_time'),
                    success=item.get('success', False),
                    participants=item.get('participants', []),
                    observations=item.get('observations', []),
                    issues_found=item.get('issues_found', []),
                    improvements=item.get('improvements', []),
                    metadata=item.get('metadata', {})
                )
                records.append(record)
            
            return records
        except Exception:
            return []
    
    def _save_drill_records(self):
        """保存演练记录"""
        data = []
        for record in self.drill_records:
            data.append({
                'drill_id': record.drill_id,
                'scenario_id': record.scenario_id,
                'drill_type': record.drill_type.value,
                'status': record.status.value,
                'start_time': record.start_time.isoformat(),
                'end_time': record.end_time.isoformat() if record.end_time else None,
                'actual_recovery_time': record.actual_recovery_time,
                'success': record.success,
                'participants': record.participants,
                'observations': record.observations,
                'issues_found': record.issues_found,
                'improvements': record.improvements,
                'metadata': record.metadata
            })
        
        # 确保目录存在
        self.drill_records_file.parent.mkdir(parents=True, exist_ok=True)
        
        with open(self.drill_records_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
    
    def list_scenarios(self) -> List[DrillScenario]:
        """列出所有演练场景"""
        return self.scenarios
    
    def list_drill_records(self) -> List[DrillRecord]:
        """列出演练记录"""
        return sorted(self.drill_records, key=lambda x: x.start_time, reverse=True)
    
    def get_drill_statistics(self) -> Dict:
        """获取演练统计信息"""
        total_drills = len(self.drill_records)
        successful_drills = len([r for r in self.drill_records if r.success])
        completed_drills = len([r for r in self.drill_records if r.status == DrillStatus.COMPLETED])
        
        # 按类型统计
        type_stats = {}
        for drill_type in DrillType:
            type_records = [r for r in self.drill_records if r.drill_type == drill_type]
            type_stats[drill_type.value] = {
                "count": len(type_records),
                "successful": len([r for r in type_records if r.success]),
                "avg_recovery_time": sum(r.actual_recovery_time for r in type_records if r.actual_recovery_time) / len([r for r in type_records if r.actual_recovery_time]) if any(r.actual_recovery_time for r in type_records) else 0
            }
        
        return {
             "total_drills": total_drills,
             "successful_drills": successful_drills,
             "completed_drills": completed_drills,
             "success_rate": (successful_drills / completed_drills * 100) if completed_drills > 0 else 0,
             "by_type": type_stats
         }

# 使用示例
if __name__ == "__main__":
    from datetime import datetime, timedelta
    
    # 初始化管理器
    cluster_manager = ClusterManager([
        {"host": "192.168.1.10", "nameserver_port": 9876},
        {"host": "192.168.1.11", "nameserver_port": 9876}
    ])
    backup_manager = RocketMQBackupManager("/opt/rocketmq", "/backup/rocketmq")
    recovery_manager = RocketMQRecoveryManager("/opt/rocketmq", backup_manager)
    
    # 创建容灾演练管理器
    drill_manager = DisasterRecoveryDrill(cluster_manager, backup_manager, recovery_manager)
    
    print("=== RocketMQ 容灾演练管理演示 ===")
    
    # 1. 列出可用的演练场景
    print("\n1. 可用的演练场景:")
    scenarios = drill_manager.list_scenarios()
    for scenario in scenarios:
        print(f"  - {scenario.scenario_id}: {scenario.description}")
        print(f"    类型: {scenario.drill_type.value}")
        print(f"    预期恢复时间: {scenario.expected_recovery_time} 分钟")
        print(f"    故障组件: {', '.join(scenario.failure_components)}")
        print()
    
    # 2. 安排演练
    print("\n2. 安排部分故障演练...")
    scheduled_time = datetime.now() + timedelta(minutes=5)
    participants = ["运维工程师A", "系统管理员B", "开发工程师C"]
    
    drill_id = drill_manager.schedule_drill(
        scenario_id="partial_failure_01",
        scheduled_time=scheduled_time,
        participants=participants
    )
    print(f"演练已安排: {drill_id}")
    print(f"计划时间: {scheduled_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"参与人员: {', '.join(participants)}")
    
    # 3. 执行演练(模拟)
    print("\n3. 执行演练...")
    try:
        success = drill_manager.execute_drill(drill_id)
        print(f"演练执行完成,结果: {'成功' if success else '失败'}")
    except Exception as e:
        print(f"演练执行异常: {e}")
    
    # 4. 生成演练报告
    print("\n4. 生成演练报告...")
    try:
        report = drill_manager.generate_drill_report(drill_id)
        print("演练报告:")
        print(f"  演练ID: {report['drill_info']['drill_id']}")
        print(f"  演练类型: {report['drill_info']['drill_type']}")
        print(f"  演练描述: {report['drill_info']['description']}")
        print(f"  演练状态: {report['results']['status']}")
        print(f"  演练结果: {'成功' if report['results']['success'] else '失败'}")
        print(f"  预期恢复时间: {report['results']['expected_recovery_time']} 分钟")
        print(f"  实际恢复时间: {report['results']['actual_recovery_time']} 分钟")
        print(f"  时间表现: {report['results']['time_performance']}")
        
        if report['observations']:
            print("  观察记录:")
            for obs in report['observations'][:5]:  # 显示前5条
                print(f"    - {obs}")
        
        if report['recommendations']:
            print("  改进建议:")
            for rec in report['recommendations']:
                print(f"    - {rec}")
    
    except Exception as e:
        print(f"生成报告失败: {e}")
    
    # 5. 显示演练统计
    print("\n5. 演练统计信息:")
    stats = drill_manager.get_drill_statistics()
    print(f"  总演练次数: {stats['total_drills']}")
    print(f"  成功演练次数: {stats['successful_drills']}")
    print(f"  完成演练次数: {stats['completed_drills']}")
    print(f"  成功率: {stats['success_rate']:.1f}%")
    
    print("\n  按类型统计:")
    for drill_type, type_stats in stats['by_type'].items():
        if type_stats['count'] > 0:
            print(f"    {drill_type}: {type_stats['count']} 次, 成功 {type_stats['successful']} 次, 平均恢复时间 {type_stats['avg_recovery_time']:.1f} 分钟")
    
    # 6. 列出演练记录
    print("\n6. 最近的演练记录:")
    records = drill_manager.list_drill_records()
    for record in records[:3]:  # 显示最近3次演练
        print(f"  - {record.drill_id}: {record.drill_type.value} ({record.status.value})")
        print(f"    时间: {record.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"    结果: {'成功' if record.success else '失败'}")
        if record.actual_recovery_time:
            print(f"    恢复时间: {record.actual_recovery_time} 分钟")
        print()
    
    print("\n=== 容灾演练演示完成 ===")

6.6 本章总结

6.6.1 核心知识点

from dataclasses import dataclass
from typing import List, Dict
from enum import Enum

class DeploymentComplexity(Enum):
    """部署复杂度"""
    SIMPLE = "simple"
    MODERATE = "moderate"
    COMPLEX = "complex"
    EXPERT = "expert"

@dataclass
class ClusterDeploymentSummary:
    """集群部署总结"""
    topic: str
    description: str
    complexity: DeploymentComplexity
    key_points: List[str]
    best_practices: List[str]
    common_issues: List[str]
    monitoring_metrics: List[str]

class RocketMQDeploymentSummary:
    """RocketMQ集群部署与运维知识总结"""
    
    def __init__(self):
        self.summaries = self._create_summaries()
    
    def _create_summaries(self) -> List[ClusterDeploymentSummary]:
        """创建知识点总结"""
        return [
            ClusterDeploymentSummary(
                topic="集群架构设计",
                description="RocketMQ集群的架构模式和部署策略",
                complexity=DeploymentComplexity.MODERATE,
                key_points=[
                    "NameServer集群:无状态,提供路由信息",
                    "Broker集群:Master-Slave架构,支持读写分离",
                    "DLedger模式:基于Raft协议的高可用方案",
                    "多Master模式:提供更高的并发处理能力",
                    "网络规划:合理的网络拓扑和端口配置"
                ],
                best_practices=[
                    "NameServer部署奇数个节点(3或5个)",
                    "Broker采用Master-Slave配置确保高可用",
                    "不同机房部署实现异地容灾",
                    "合理规划磁盘存储和网络带宽",
                    "使用专用网络隔离集群流量"
                ],
                common_issues=[
                    "NameServer单点故障",
                    "Broker磁盘空间不足",
                    "网络分区导致的脑裂",
                    "时钟同步问题",
                    "防火墙端口配置错误"
                ],
                monitoring_metrics=[
                    "集群节点状态",
                    "网络连通性",
                    "服务响应时间",
                    "资源使用率",
                    "错误日志统计"
                ]
            ),
            ClusterDeploymentSummary(
                topic="环境准备与安装",
                description="RocketMQ集群的环境配置和软件安装",
                complexity=DeploymentComplexity.SIMPLE,
                key_points=[
                    "系统要求:Linux/Windows,JDK 1.8+",
                    "硬件配置:CPU、内存、磁盘、网络规划",
                    "软件下载:官方二进制包或源码编译",
                    "目录结构:bin、conf、lib、logs等",
                    "环境变量:JAVA_HOME、ROCKETMQ_HOME等"
                ],
                best_practices=[
                    "使用SSD磁盘提升I/O性能",
                    "配置足够的内存避免频繁GC",
                    "设置合适的文件描述符限制",
                    "配置NTP时间同步",
                    "创建专用用户运行服务"
                ],
                common_issues=[
                    "JDK版本不兼容",
                    "内存配置不足",
                    "文件权限问题",
                    "端口被占用",
                    "磁盘空间不足"
                ],
                monitoring_metrics=[
                    "系统资源使用率",
                    "JVM内存使用",
                    "磁盘I/O性能",
                    "网络带宽使用",
                    "文件描述符使用"
                ]
            ),
            ClusterDeploymentSummary(
                topic="集群启动与管理",
                description="RocketMQ集群的启动、停止和日常管理",
                complexity=DeploymentComplexity.MODERATE,
                key_points=[
                    "启动顺序:先NameServer,后Broker",
                    "配置文件:broker.conf、logback配置",
                    "服务管理:systemd、supervisor等",
                    "健康检查:端口检测、服务状态",
                    "集群信息:路由表、Broker状态"
                ],
                best_practices=[
                    "使用系统服务管理进程",
                    "配置自动重启机制",
                    "实施滚动重启策略",
                    "定期检查集群健康状态",
                    "建立标准化运维流程"
                ],
                common_issues=[
                    "服务启动失败",
                    "Broker注册失败",
                    "集群状态不一致",
                    "内存泄漏导致OOM",
                    "网络连接超时"
                ],
                monitoring_metrics=[
                    "服务运行状态",
                    "进程CPU和内存使用",
                    "集群连接数",
                    "服务响应时间",
                    "错误率统计"
                ]
            ),
            ClusterDeploymentSummary(
                topic="监控与运维",
                description="RocketMQ集群的监控、告警和运维管理",
                complexity=DeploymentComplexity.COMPLEX,
                key_points=[
                    "性能监控:TPS、延迟、堆积量",
                    "系统监控:CPU、内存、磁盘、网络",
                    "日志管理:收集、分析、轮转",
                    "告警机制:阈值设置、通知方式",
                    "故障排查:诊断工具、问题定位"
                ],
                best_practices=[
                    "建立完善的监控体系",
                    "设置合理的告警阈值",
                    "实施自动化运维",
                    "定期进行性能调优",
                    "建立运维知识库"
                ],
                common_issues=[
                    "监控数据不准确",
                    "告警风暴",
                    "日志文件过大",
                    "性能瓶颈难以定位",
                    "故障恢复时间过长"
                ],
                monitoring_metrics=[
                    "消息生产消费TPS",
                    "消息堆积数量",
                    "Broker磁盘使用率",
                    "GC频率和耗时",
                    "网络连接状态"
                ]
            ),
            ClusterDeploymentSummary(
                topic="备份与恢复",
                description="RocketMQ集群的数据备份和灾难恢复",
                complexity=DeploymentComplexity.EXPERT,
                key_points=[
                    "备份策略:全量、增量、差异备份",
                    "备份内容:CommitLog、ConsumeQueue、Index",
                    "恢复类型:完全恢复、部分恢复、时间点恢复",
                    "容灾演练:故障模拟、恢复验证",
                    "数据一致性:备份验证、恢复测试"
                ],
                best_practices=[
                    "制定完善的备份策略",
                    "定期进行恢复测试",
                    "实施异地备份",
                    "自动化备份流程",
                    "建立容灾预案"
                ],
                common_issues=[
                    "备份数据不完整",
                    "恢复时间过长",
                    "数据一致性问题",
                    "备份存储空间不足",
                    "恢复流程复杂"
                ],
                monitoring_metrics=[
                    "备份成功率",
                    "备份文件大小",
                    "恢复时间",
                    "数据完整性",
                    "存储空间使用率"
                ]
            )
        ]
    
    def get_summary_by_topic(self, topic: str) -> ClusterDeploymentSummary:
        """根据主题获取总结"""
        for summary in self.summaries:
            if summary.topic == topic:
                return summary
        raise ValueError(f"未找到主题: {topic}")
    
    def get_all_topics(self) -> List[str]:
        """获取所有主题"""
        return [summary.topic for summary in self.summaries]
    
    def print_summary(self, topic: str = None):
        """打印总结信息"""
        if topic:
            summaries = [self.get_summary_by_topic(topic)]
        else:
            summaries = self.summaries
        
        for summary in summaries:
            print(f"\n=== {summary.topic} ===")
            print(f"描述: {summary.description}")
            print(f"复杂度: {summary.complexity.value}")
            
            print("\n关键点:")
            for point in summary.key_points:
                print(f"  • {point}")
            
            print("\n最佳实践:")
            for practice in summary.best_practices:
                print(f"  ✓ {practice}")
            
            print("\n常见问题:")
            for issue in summary.common_issues:
                print(f"  ⚠ {issue}")
            
            print("\n监控指标:")
            for metric in summary.monitoring_metrics:
                print(f"  📊 {metric}")

6.6.2 最佳实践指南

from dataclasses import dataclass
from typing import List, Dict
from enum import Enum

class PracticeCategory(Enum):
    """实践类别"""
    ARCHITECTURE = "architecture"      # 架构设计
    DEPLOYMENT = "deployment"          # 部署实施
    OPERATION = "operation"            # 运维管理
    PERFORMANCE = "performance"        # 性能优化
    SECURITY = "security"              # 安全管理
    DISASTER_RECOVERY = "disaster_recovery"  # 容灾恢复

@dataclass
class BestPractice:
    """最佳实践"""
    category: PracticeCategory
    title: str
    description: str
    implementation_steps: List[str]
    benefits: List[str]
    risks_if_ignored: List[str]
    priority: str  # high, medium, low

class RocketMQBestPractices:
    """RocketMQ集群部署与运维最佳实践"""
    
    def __init__(self):
        self.practices = self._create_best_practices()
    
    def _create_best_practices(self) -> List[BestPractice]:
        """创建最佳实践列表"""
        return [
            BestPractice(
                category=PracticeCategory.ARCHITECTURE,
                title="高可用架构设计",
                description="设计具备高可用性和容错能力的RocketMQ集群架构",
                implementation_steps=[
                    "部署至少3个NameServer节点,分布在不同机器",
                    "每个Broker配置Master-Slave,实现读写分离",
                    "使用DLedger模式提供自动故障转移",
                    "跨机房部署实现异地容灾",
                    "配置负载均衡器分发客户端请求"
                ],
                benefits=[
                    "消除单点故障",
                    "提供自动故障恢复",
                    "支持零停机维护",
                    "提升系统可靠性"
                ],
                risks_if_ignored=[
                    "单点故障导致服务中断",
                    "数据丢失风险",
                    "恢复时间过长",
                    "业务连续性受影响"
                ],
                priority="high"
            ),
            BestPractice(
                category=PracticeCategory.DEPLOYMENT,
                title="标准化部署流程",
                description="建立标准化、自动化的RocketMQ集群部署流程",
                implementation_steps=[
                    "创建部署脚本和配置模板",
                    "使用配置管理工具(如Ansible)",
                    "实施环境一致性检查",
                    "建立部署验证流程",
                    "文档化部署步骤和回滚方案"
                ],
                benefits=[
                    "减少人为错误",
                    "提高部署效率",
                    "确保环境一致性",
                    "便于快速扩容"
                ],
                risks_if_ignored=[
                    "部署错误频发",
                    "环境不一致",
                    "部署时间过长",
                    "难以快速扩容"
                ],
                priority="high"
            ),
            BestPractice(
                category=PracticeCategory.OPERATION,
                title="全面监控体系",
                description="建立覆盖系统、应用、业务的全方位监控体系",
                implementation_steps=[
                    "部署监控系统(如Prometheus + Grafana)",
                    "配置系统级监控指标",
                    "设置RocketMQ专用监控指标",
                    "建立告警规则和通知机制",
                    "创建监控大屏和报表"
                ],
                benefits=[
                    "及时发现问题",
                    "提供性能洞察",
                    "支持容量规划",
                    "优化运维效率"
                ],
                risks_if_ignored=[
                    "问题发现滞后",
                    "性能瓶颈难以定位",
                    "容量规划不准确",
                    "运维成本高"
                ],
                priority="high"
            ),
            BestPractice(
                category=PracticeCategory.PERFORMANCE,
                title="性能调优策略",
                description="系统性地进行RocketMQ集群性能优化",
                implementation_steps=[
                    "调优JVM参数(堆大小、GC策略)",
                    "优化操作系统参数(文件描述符、网络缓冲)",
                    "配置合适的刷盘策略",
                    "调整消息存储参数",
                    "优化网络和磁盘I/O"
                ],
                benefits=[
                    "提升消息吞吐量",
                    "降低消息延迟",
                    "减少资源消耗",
                    "提高系统稳定性"
                ],
                risks_if_ignored=[
                    "性能瓶颈",
                    "资源浪费",
                    "用户体验差",
                    "系统不稳定"
                ],
                priority="medium"
            ),
            BestPractice(
                category=PracticeCategory.SECURITY,
                title="安全加固措施",
                description="实施全面的安全防护和访问控制",
                implementation_steps=[
                    "启用ACL访问控制",
                    "配置SSL/TLS加密传输",
                    "设置防火墙规则",
                    "实施网络隔离",
                    "定期安全审计和漏洞扫描"
                ],
                benefits=[
                    "保护数据安全",
                    "防止未授权访问",
                    "符合合规要求",
                    "降低安全风险"
                ],
                risks_if_ignored=[
                    "数据泄露风险",
                    "未授权访问",
                    "合规问题",
                    "安全事件"
                ],
                priority="high"
            ),
            BestPractice(
                category=PracticeCategory.DISASTER_RECOVERY,
                title="容灾恢复计划",
                description="制定完善的备份策略和灾难恢复计划",
                implementation_steps=[
                    "制定备份策略(全量、增量、差异)",
                    "建立异地备份机制",
                    "定期进行容灾演练",
                    "制定恢复时间目标(RTO)和恢复点目标(RPO)",
                    "建立应急响应流程"
                ],
                benefits=[
                    "确保业务连续性",
                    "快速故障恢复",
                    "降低数据丢失风险",
                    "提高应急响应能力"
                ],
                risks_if_ignored=[
                    "灾难性数据丢失",
                    "长时间服务中断",
                    "业务影响严重",
                    "恢复困难"
                ],
                priority="high"
            )
        ]
    
    def get_practices_by_category(self, category: PracticeCategory) -> List[BestPractice]:
        """根据类别获取最佳实践"""
        return [p for p in self.practices if p.category == category]
    
    def get_high_priority_practices(self) -> List[BestPractice]:
        """获取高优先级最佳实践"""
        return [p for p in self.practices if p.priority == "high"]
    
    def print_practices(self, category: PracticeCategory = None, priority: str = None):
        """打印最佳实践"""
        practices = self.practices
        
        if category:
            practices = [p for p in practices if p.category == category]
        
        if priority:
            practices = [p for p in practices if p.priority == priority]
        
        for practice in practices:
            print(f"\n=== {practice.title} ===")
            print(f"类别: {practice.category.value}")
            print(f"优先级: {practice.priority}")
            print(f"描述: {practice.description}")
            
            print("\n实施步骤:")
            for step in practice.implementation_steps:
                print(f"  1. {step}")
            
            print("\n收益:")
            for benefit in practice.benefits:
                print(f"  ✓ {benefit}")
            
            print("\n忽略风险:")
             for risk in practice.risks_if_ignored:
                 print(f"  ⚠ {risk}")

# 使用示例
if __name__ == "__main__":
    # 创建最佳实践管理器
    best_practices = RocketMQBestPractices()
    
    print("=== RocketMQ集群部署与运维最佳实践 ===")
    
    # 1. 显示所有高优先级实践
    print("\n1. 高优先级最佳实践:")
    high_priority_practices = best_practices.get_high_priority_practices()
    for practice in high_priority_practices:
        print(f"  • {practice.title} ({practice.category.value})")
    
    # 2. 按类别显示实践
    print("\n2. 架构设计最佳实践:")
    best_practices.print_practices(category=PracticeCategory.ARCHITECTURE)
    
    print("\n3. 运维管理最佳实践:")
    best_practices.print_practices(category=PracticeCategory.OPERATION)

6.6.3 性能优化指南

from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum

class OptimizationLevel(Enum):
    """优化级别"""
    BASIC = "basic"          # 基础优化
    INTERMEDIATE = "intermediate"  # 中级优化
    ADVANCED = "advanced"    # 高级优化
    EXPERT = "expert"        # 专家级优化

class ComponentType(Enum):
    """组件类型"""
    NAMESERVER = "nameserver"
    BROKER = "broker"
    PRODUCER = "producer"
    CONSUMER = "consumer"
    SYSTEM = "system"
    NETWORK = "network"
    STORAGE = "storage"

@dataclass
class OptimizationItem:
    """优化项"""
    component: ComponentType
    level: OptimizationLevel
    title: str
    description: str
    parameters: Dict[str, str]
    expected_improvement: str
    impact: str  # low, medium, high
    risk: str    # low, medium, high

class RocketMQPerformanceOptimizer:
    """RocketMQ性能优化器"""
    
    def __init__(self):
        self.optimizations = self._create_optimizations()
    
    def _create_optimizations(self) -> List[OptimizationItem]:
        """创建优化项列表"""
        return [
            # NameServer优化
            OptimizationItem(
                component=ComponentType.NAMESERVER,
                level=OptimizationLevel.BASIC,
                title="NameServer JVM优化",
                description="优化NameServer的JVM参数以提升性能",
                parameters={
                    "-Xms": "2g",
                    "-Xmx": "2g",
                    "-XX:+UseG1GC": "",
                    "-XX:G1HeapRegionSize": "16m",
                    "-XX:G1ReservePercent": "25",
                    "-XX:InitiatingHeapOccupancyPercent": "30"
                },
                expected_improvement="减少GC停顿时间,提升响应速度",
                impact="medium",
                risk="low"
            ),
            OptimizationItem(
                component=ComponentType.NAMESERVER,
                level=OptimizationLevel.INTERMEDIATE,
                title="NameServer网络优化",
                description="优化NameServer的网络配置",
                parameters={
                    "serverWorkerThreads": "8",
                    "serverCallbackExecutorThreads": "0",
                    "serverSelectorThreads": "3",
                    "serverOnewaySemaphoreValue": "256",
                    "serverAsyncSemaphoreValue": "64"
                },
                expected_improvement="提升网络处理能力和并发性能",
                impact="medium",
                risk="low"
            ),
            
            # Broker优化
            OptimizationItem(
                component=ComponentType.BROKER,
                level=OptimizationLevel.BASIC,
                title="Broker JVM优化",
                description="优化Broker的JVM参数以提升性能",
                parameters={
                    "-Xms": "8g",
                    "-Xmx": "8g",
                    "-XX:+UseG1GC": "",
                    "-XX:G1HeapRegionSize": "16m",
                    "-XX:G1ReservePercent": "25",
                    "-XX:InitiatingHeapOccupancyPercent": "30",
                    "-XX:+UnlockExperimentalVMOptions": "",
                    "-XX:+UseG1GC": "",
                    "-XX:+ParallelRefProcEnabled": ""
                },
                expected_improvement="减少GC停顿,提升消息处理吞吐量",
                impact="high",
                risk="low"
            ),
            OptimizationItem(
                component=ComponentType.BROKER,
                level=OptimizationLevel.INTERMEDIATE,
                title="Broker存储优化",
                description="优化Broker的消息存储配置",
                parameters={
                    "flushDiskType": "ASYNC_FLUSH",
                    "flushIntervalCommitLog": "500",
                    "flushCommitLogTimed": "true",
                    "flushIntervalConsumeQueue": "1000",
                    "flushConsumeQueueTimed": "true",
                    "mapedFileSizeCommitLog": "1073741824",
                    "mapedFileSizeConsumeQueue": "6000000"
                },
                expected_improvement="提升磁盘写入性能,减少I/O延迟",
                impact="high",
                risk="medium"
            ),
            OptimizationItem(
                component=ComponentType.BROKER,
                level=OptimizationLevel.ADVANCED,
                title="Broker线程池优化",
                description="优化Broker的线程池配置",
                parameters={
                    "sendMessageThreadPoolNums": "16",
                    "pullMessageThreadPoolNums": "16",
                    "queryMessageThreadPoolNums": "8",
                    "adminBrokerThreadPoolNums": "16",
                    "clientManagerThreadPoolNums": "32",
                    "consumerManagerThreadPoolNums": "32"
                },
                expected_improvement="提升消息处理并发能力",
                impact="high",
                risk="medium"
            ),
            
            # 系统级优化
            OptimizationItem(
                component=ComponentType.SYSTEM,
                level=OptimizationLevel.BASIC,
                title="操作系统参数优化",
                description="优化操作系统级别的参数",
                parameters={
                    "vm.swappiness": "1",
                    "vm.dirty_background_ratio": "5",
                    "vm.dirty_ratio": "15",
                    "vm.dirty_writeback_centisecs": "100",
                    "vm.dirty_expire_centisecs": "200",
                    "net.core.rmem_default": "262144",
                    "net.core.rmem_max": "16777216",
                    "net.core.wmem_default": "262144",
                    "net.core.wmem_max": "16777216"
                },
                expected_improvement="提升系统整体性能和网络吞吐量",
                impact="medium",
                risk="low"
            ),
            OptimizationItem(
                component=ComponentType.SYSTEM,
                level=OptimizationLevel.INTERMEDIATE,
                title="文件系统优化",
                description="优化文件系统配置",
                parameters={
                    "mount_options": "noatime,nodiratime,nobarrier",
                    "scheduler": "deadline",
                    "read_ahead_kb": "4096",
                    "max_sectors_kb": "512"
                },
                expected_improvement="提升磁盘I/O性能",
                impact="medium",
                risk="low"
            ),
            
            # 网络优化
            OptimizationItem(
                component=ComponentType.NETWORK,
                level=OptimizationLevel.INTERMEDIATE,
                title="网络参数优化",
                description="优化网络相关参数",
                parameters={
                    "net.ipv4.tcp_tw_reuse": "1",
                    "net.ipv4.tcp_tw_recycle": "1",
                    "net.ipv4.tcp_fin_timeout": "30",
                    "net.ipv4.tcp_keepalive_time": "1200",
                    "net.ipv4.tcp_max_syn_backlog": "8192",
                    "net.core.netdev_max_backlog": "5000",
                    "net.core.somaxconn": "65535"
                },
                expected_improvement="提升网络连接处理能力",
                impact="medium",
                risk="low"
            ),
            
            # 存储优化
            OptimizationItem(
                component=ComponentType.STORAGE,
                level=OptimizationLevel.ADVANCED,
                title="存储I/O优化",
                description="优化存储I/O性能",
                parameters={
                    "io_scheduler": "deadline",
                    "queue_depth": "32",
                    "read_ahead": "4096",
                    "write_cache": "enabled",
                    "barrier": "disabled"
                },
                expected_improvement="显著提升磁盘I/O性能",
                impact="high",
                risk="medium"
            )
        ]
    
    def get_optimizations_by_component(self, component: ComponentType) -> List[OptimizationItem]:
        """根据组件类型获取优化项"""
        return [opt for opt in self.optimizations if opt.component == component]
    
    def get_optimizations_by_level(self, level: OptimizationLevel) -> List[OptimizationItem]:
        """根据优化级别获取优化项"""
        return [opt for opt in self.optimizations if opt.level == level]
    
    def get_basic_optimizations(self) -> List[OptimizationItem]:
        """获取基础优化项"""
        return self.get_optimizations_by_level(OptimizationLevel.BASIC)
    
    def generate_optimization_script(self, component: ComponentType, level: OptimizationLevel = None) -> str:
        """生成优化脚本"""
        optimizations = self.get_optimizations_by_component(component)
        
        if level:
            optimizations = [opt for opt in optimizations if opt.level == level]
        
        script_lines = []
        script_lines.append(f"#!/bin/bash")
        script_lines.append(f"# RocketMQ {component.value} 性能优化脚本")
        script_lines.append(f"# 生成时间: $(date)")
        script_lines.append("")
        
        for opt in optimizations:
            script_lines.append(f"# {opt.title}")
            script_lines.append(f"# {opt.description}")
            script_lines.append(f"# 预期改进: {opt.expected_improvement}")
            script_lines.append(f"# 影响级别: {opt.impact}, 风险级别: {opt.risk}")
            
            if component == ComponentType.SYSTEM:
                for param, value in opt.parameters.items():
                    if param.startswith("net.") or param.startswith("vm."):
                        script_lines.append(f"echo '{param} = {value}' >> /etc/sysctl.conf")
                    else:
                        script_lines.append(f"# {param}: {value}")
                script_lines.append("sysctl -p")
            
            elif component in [ComponentType.NAMESERVER, ComponentType.BROKER]:
                script_lines.append(f"# JVM参数配置 (添加到启动脚本):")
                for param, value in opt.parameters.items():
                    if value:
                        script_lines.append(f"# {param}={value}")
                    else:
                        script_lines.append(f"# {param}")
            
            script_lines.append("")
        
        return "\n".join(script_lines)
    
    def print_optimization_summary(self):
        """打印优化总结"""
        print("=== RocketMQ性能优化总结 ===")
        
        # 按组件分组
        components = {}
        for opt in self.optimizations:
            if opt.component not in components:
                components[opt.component] = []
            components[opt.component].append(opt)
        
        for component, opts in components.items():
            print(f"\n{component.value.upper()} 优化项 ({len(opts)}个):")
            for opt in opts:
                print(f"  • {opt.title} ({opt.level.value}) - 影响: {opt.impact}, 风险: {opt.risk}")
                print(f"    {opt.description}")
                print(f"    预期改进: {opt.expected_improvement}")
                print()

# 使用示例
if __name__ == "__main__":
    optimizer = RocketMQPerformanceOptimizer()
    
    print("=== RocketMQ性能优化指南 ===")
    
    # 1. 显示优化总结
    optimizer.print_optimization_summary()
    
    # 2. 生成Broker优化脚本
    print("\n=== Broker优化脚本 ===")
    broker_script = optimizer.generate_optimization_script(ComponentType.BROKER, OptimizationLevel.BASIC)
    print(broker_script)
    
    # 3. 生成系统优化脚本
    print("\n=== 系统优化脚本 ===")
    system_script = optimizer.generate_optimization_script(ComponentType.SYSTEM)
    print(system_script)
    
    # 4. 显示基础优化建议
    print("\n=== 基础优化建议 ===")
    basic_opts = optimizer.get_basic_optimizations()
    for opt in basic_opts:
        print(f"• {opt.component.value}: {opt.title}")
        print(f"  {opt.description}")
        print(f"  预期改进: {opt.expected_improvement}")
        print()

6.6.4 练习题

from dataclasses import dataclass
from typing import List, Dict
from enum import Enum

class ExerciseType(Enum):
    """练习类型"""
    DEPLOYMENT = "deployment"      # 部署实践
    CONFIGURATION = "configuration"  # 配置优化
    MONITORING = "monitoring"      # 监控运维
    TROUBLESHOOTING = "troubleshooting"  # 故障排查
    PERFORMANCE = "performance"    # 性能调优
    DISASTER_RECOVERY = "disaster_recovery"  # 容灾恢复

class Difficulty(Enum):
    """难度级别"""
    BEGINNER = "beginner"      # 初级
    INTERMEDIATE = "intermediate"  # 中级
    ADVANCED = "advanced"      # 高级
    EXPERT = "expert"          # 专家级

@dataclass
class Exercise:
    """练习题"""
    exercise_id: str
    title: str
    type: ExerciseType
    difficulty: Difficulty
    description: str
    requirements: List[str]
    tasks: List[str]
    expected_outcomes: List[str]
    evaluation_criteria: List[str]
    hints: List[str]
    estimated_time: str

class RocketMQDeploymentExercises:
    """RocketMQ集群部署与运维练习题"""
    
    def __init__(self):
        self.exercises = self._create_exercises()
    
    def _create_exercises(self) -> List[Exercise]:
        """创建练习题列表"""
        return [
            Exercise(
                exercise_id="deploy_001",
                title="RocketMQ集群部署实践",
                type=ExerciseType.DEPLOYMENT,
                difficulty=Difficulty.INTERMEDIATE,
                description="在3台服务器上部署一个高可用的RocketMQ集群,包括3个NameServer和6个Broker(3主3从)",
                requirements=[
                    "3台Linux服务器(CentOS 7+或Ubuntu 18+)",
                    "每台服务器至少4GB内存,50GB磁盘空间",
                    "JDK 1.8或更高版本",
                    "网络互通,防火墙配置正确"
                ],
                tasks=[
                    "在每台服务器上安装和配置JDK环境",
                    "下载并解压RocketMQ二进制包",
                    "配置3个NameServer节点",
                    "配置6个Broker节点(3主3从架构)",
                    "编写启动和停止脚本",
                    "验证集群状态和功能",
                    "编写简单的生产者和消费者程序测试集群"
                ],
                expected_outcomes=[
                    "成功部署3节点NameServer集群",
                    "成功部署6节点Broker集群(主从架构)",
                    "所有节点正常运行并相互发现",
                    "生产者能够成功发送消息",
                    "消费者能够成功消费消息",
                    "主从同步正常工作"
                ],
                evaluation_criteria=[
                    "集群架构设计合理性",
                    "配置文件正确性",
                    "启动脚本完整性",
                    "集群功能验证完整性",
                    "文档记录详细程度"
                ],
                hints=[
                    "注意配置文件中的IP地址和端口",
                    "确保防火墙开放必要端口",
                    "使用mqadmin工具验证集群状态",
                    "测试主从切换功能"
                ],
                estimated_time="4-6小时"
            ),
            Exercise(
                exercise_id="config_001",
                title="RocketMQ性能调优配置",
                type=ExerciseType.CONFIGURATION,
                difficulty=Difficulty.ADVANCED,
                description="对已部署的RocketMQ集群进行性能调优,优化JVM参数、存储配置和网络参数",
                requirements=[
                    "已部署的RocketMQ集群",
                    "性能测试工具(如JMeter或自定义测试程序)",
                    "系统监控工具",
                    "对JVM调优有基本了解"
                ],
                tasks=[
                    "分析当前集群的性能瓶颈",
                    "优化NameServer和Broker的JVM参数",
                    "调整Broker的存储配置(刷盘策略、文件大小等)",
                    "优化操作系统参数(网络、文件系统等)",
                    "配置监控指标收集",
                    "进行性能测试对比",
                    "编写性能调优报告"
                ],
                expected_outcomes=[
                    "消息吞吐量提升20%以上",
                    "消息延迟降低15%以上",
                    "GC停顿时间减少",
                    "系统资源利用率优化",
                    "建立性能基线和监控体系"
                ],
                evaluation_criteria=[
                    "性能提升效果",
                    "配置参数合理性",
                    "测试方法科学性",
                    "监控体系完整性",
                    "调优报告质量"
                ],
                hints=[
                    "使用G1GC替代默认GC",
                    "调整堆内存大小和GC参数",
                    "考虑使用异步刷盘提升性能",
                    "监控磁盘I/O和网络带宽使用情况"
                ],
                estimated_time="6-8小时"
            ),
            Exercise(
                exercise_id="monitor_001",
                title="RocketMQ监控体系建设",
                type=ExerciseType.MONITORING,
                difficulty=Difficulty.INTERMEDIATE,
                description="为RocketMQ集群建立完整的监控和告警体系",
                requirements=[
                    "运行中的RocketMQ集群",
                    "Prometheus和Grafana环境",
                    "告警通知渠道(邮件、钉钉等)",
                    "基本的监控知识"
                ],
                tasks=[
                    "部署RocketMQ Exporter收集指标",
                    "配置Prometheus采集RocketMQ指标",
                    "在Grafana中创建RocketMQ监控大屏",
                    "设置关键指标的告警规则",
                    "配置告警通知渠道",
                    "编写监控运维手册",
                    "模拟故障验证告警效果"
                ],
                expected_outcomes=[
                    "实时监控集群状态和性能指标",
                    "直观的监控大屏展示",
                    "及时的故障告警通知",
                    "完整的监控运维文档",
                    "有效的故障响应机制"
                ],
                evaluation_criteria=[
                    "监控指标覆盖度",
                    "大屏设计美观性和实用性",
                    "告警规则合理性",
                    "告警响应及时性",
                    "文档完整性"
                ],
                hints=[
                    "重点监控消息堆积、TPS、延迟等关键指标",
                    "设置合理的告警阈值避免误报",
                    "使用标签和注解增强可读性",
                    "建立告警升级机制"
                ],
                estimated_time="4-5小时"
            ),
            Exercise(
                exercise_id="trouble_001",
                title="RocketMQ故障排查实战",
                type=ExerciseType.TROUBLESHOOTING,
                difficulty=Difficulty.ADVANCED,
                description="模拟和解决RocketMQ集群的各种故障场景",
                requirements=[
                    "运行中的RocketMQ集群",
                    "故障注入工具或脚本",
                    "日志分析工具",
                    "网络和系统诊断工具"
                ],
                tasks=[
                    "模拟NameServer节点故障",
                    "模拟Broker主节点故障",
                    "模拟网络分区故障",
                    "模拟磁盘空间不足",
                    "模拟内存不足导致的OOM",
                    "分析故障原因和影响",
                    "制定故障恢复方案",
                    "编写故障排查手册"
                ],
                expected_outcomes=[
                    "快速定位和解决各类故障",
                    "理解故障对系统的影响",
                    "掌握故障恢复的最佳实践",
                    "建立故障响应流程",
                    "完善的故障排查文档"
                ],
                evaluation_criteria=[
                    "故障定位速度",
                    "解决方案有效性",
                    "故障分析深度",
                    "恢复方案合理性",
                    "文档实用性"
                ],
                hints=[
                    "重点关注日志文件中的错误信息",
                    "使用mqadmin工具检查集群状态",
                    "分析网络连接和端口状态",
                    "监控系统资源使用情况"
                ],
                estimated_time="6-8小时"
            ),
            Exercise(
                exercise_id="disaster_001",
                title="RocketMQ容灾恢复演练",
                type=ExerciseType.DISASTER_RECOVERY,
                difficulty=Difficulty.EXPERT,
                description="设计和实施RocketMQ集群的容灾恢复方案",
                requirements=[
                    "多机房RocketMQ集群",
                    "备份存储系统",
                    "自动化部署工具",
                    "容灾演练环境"
                ],
                tasks=[
                    "设计跨机房容灾架构",
                    "制定数据备份策略",
                    "实施自动化备份流程",
                    "编写容灾切换脚本",
                    "进行容灾演练",
                    "验证数据一致性",
                    "优化恢复时间",
                    "编写容灾运维手册"
                ],
                expected_outcomes=[
                    "完整的容灾架构设计",
                    "自动化的备份恢复流程",
                    "快速的故障切换能力",
                    "数据零丢失或最小丢失",
                    "标准化的容灾操作流程"
                ],
                evaluation_criteria=[
                    "容灾架构合理性",
                    "备份策略完整性",
                    "切换速度和成功率",
                    "数据一致性保证",
                    "操作流程标准化程度"
                ],
                hints=[
                    "考虑RTO和RPO指标要求",
                    "使用DLedger模式提高可用性",
                    "定期验证备份数据完整性",
                    "建立多层次的容灾方案"
                ],
                estimated_time="8-12小时"
            )
        ]
    
    def get_exercises_by_type(self, exercise_type: ExerciseType) -> List[Exercise]:
        """根据类型获取练习题"""
        return [ex for ex in self.exercises if ex.type == exercise_type]
    
    def get_exercises_by_difficulty(self, difficulty: Difficulty) -> List[Exercise]:
        """根据难度获取练习题"""
        return [ex for ex in self.exercises if ex.difficulty == difficulty]
    
    def get_exercise_by_id(self, exercise_id: str) -> Exercise:
        """根据ID获取练习题"""
        for exercise in self.exercises:
            if exercise.exercise_id == exercise_id:
                return exercise
        raise ValueError(f"未找到练习题: {exercise_id}")
    
    def print_exercise(self, exercise_id: str):
        """打印练习题详情"""
        exercise = self.get_exercise_by_id(exercise_id)
        
        print(f"=== {exercise.title} ===")
        print(f"练习ID: {exercise.exercise_id}")
        print(f"类型: {exercise.type.value}")
        print(f"难度: {exercise.difficulty.value}")
        print(f"预估时间: {exercise.estimated_time}")
        print(f"\n描述: {exercise.description}")
        
        print("\n环境要求:")
        for req in exercise.requirements:
            print(f"  • {req}")
        
        print("\n任务列表:")
        for i, task in enumerate(exercise.tasks, 1):
            print(f"  {i}. {task}")
        
        print("\n预期成果:")
        for outcome in exercise.expected_outcomes:
            print(f"  ✓ {outcome}")
        
        print("\n评估标准:")
        for criteria in exercise.evaluation_criteria:
            print(f"  📊 {criteria}")
        
        print("\n提示:")
        for hint in exercise.hints:
            print(f"  💡 {hint}")
    
    def print_exercise_list(self):
        """打印练习题列表"""
        print("=== RocketMQ集群部署与运维练习题 ===")
        
        # 按类型分组
        types = {}
        for exercise in self.exercises:
            if exercise.type not in types:
                types[exercise.type] = []
            types[exercise.type].append(exercise)
        
        for exercise_type, exercises in types.items():
            print(f"\n{exercise_type.value.upper()} ({len(exercises)}题):")
            for exercise in exercises:
                print(f"  • {exercise.exercise_id}: {exercise.title} ({exercise.difficulty.value})")
                print(f"    {exercise.description}")
                print(f"    预估时间: {exercise.estimated_time}")
                print()

# 使用示例
if __name__ == "__main__":
    exercises = RocketMQDeploymentExercises()
    
    print("=== RocketMQ集群部署与运维练习题 ===")
    
    # 1. 显示所有练习题
    exercises.print_exercise_list()
    
    # 2. 显示特定练习题详情
    print("\n=== 练习题详情示例 ===")
    exercises.print_exercise("deploy_001")
    
    # 3. 按难度筛选
    print("\n=== 中级难度练习题 ===")
    intermediate_exercises = exercises.get_exercises_by_difficulty(Difficulty.INTERMEDIATE)
    for exercise in intermediate_exercises:
        print(f"• {exercise.title} ({exercise.type.value})")