8.1 Performance Monitoring and Metrics

8.1.1 Core Performance Metrics

from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from enum import Enum
import time
import threading
from collections import defaultdict, deque
import statistics

class MetricType(Enum):
    """Metric types."""
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    TIMER = "timer"
    RATE = "rate"

class ComponentType(Enum):
    """Component types."""
    PRODUCER = "producer"
    CONSUMER = "consumer"
    BROKER = "broker"
    NAMESERVER = "nameserver"
    SYSTEM = "system"

@dataclass
class MetricDefinition:
    """指标定义"""
    name: str
    type: MetricType
    component: ComponentType
    description: str
    unit: str
    tags: Dict[str, str]
    threshold_warning: Optional[float] = None
    threshold_critical: Optional[float] = None

@dataclass
class MetricValue:
    """指标值"""
    timestamp: float
    value: float
    tags: Dict[str, str]

@dataclass
class PerformanceSnapshot:
    """性能快照"""
    timestamp: float
    component: ComponentType
    metrics: Dict[str, float]
    status: str
    alerts: List[str]

class PerformanceMonitor:
    """性能监控器"""
    
    def __init__(self):
        self.metrics_definitions = self._create_metrics_definitions()
        self.metrics_data = defaultdict(lambda: deque(maxlen=1000))
        self.alerts = []
        self.monitoring_enabled = True
        self._lock = threading.Lock()
    
    def _create_metrics_definitions(self) -> Dict[str, MetricDefinition]:
        """创建指标定义"""
        definitions = {}
        
        # Producer metrics
        producer_metrics = [
            MetricDefinition(
                name="producer_send_tps",
                type=MetricType.RATE,
                component=ComponentType.PRODUCER,
                description="生产者发送TPS",
                unit="msg/s",
                tags={"component": "producer"},
                threshold_warning=1000.0,
                threshold_critical=500.0
            ),
            MetricDefinition(
                name="producer_send_latency_avg",
                type=MetricType.TIMER,
                component=ComponentType.PRODUCER,
                description="生产者发送平均延迟",
                unit="ms",
                tags={"component": "producer"},
                threshold_warning=100.0,
                threshold_critical=500.0
            ),
            MetricDefinition(
                name="producer_send_latency_p99",
                type=MetricType.TIMER,
                component=ComponentType.PRODUCER,
                description="生产者发送P99延迟",
                unit="ms",
                tags={"component": "producer"},
                threshold_warning=200.0,
                threshold_critical=1000.0
            ),
            MetricDefinition(
                name="producer_send_success_rate",
                type=MetricType.RATE,
                component=ComponentType.PRODUCER,
                description="生产者发送成功率",
                unit="%",
                tags={"component": "producer"},
                threshold_warning=99.0,
                threshold_critical=95.0
            ),
            MetricDefinition(
                name="producer_queue_size",
                type=MetricType.GAUGE,
                component=ComponentType.PRODUCER,
                description="生产者队列大小",
                unit="count",
                tags={"component": "producer"},
                threshold_warning=10000.0,
                threshold_critical=50000.0
            )
        ]
        
        # Consumer metrics
        consumer_metrics = [
            MetricDefinition(
                name="consumer_consume_tps",
                type=MetricType.RATE,
                component=ComponentType.CONSUMER,
                description="消费者消费TPS",
                unit="msg/s",
                tags={"component": "consumer"},
                threshold_warning=1000.0,
                threshold_critical=500.0
            ),
            MetricDefinition(
                name="consumer_consume_latency_avg",
                type=MetricType.TIMER,
                component=ComponentType.CONSUMER,
                description="消费者消费平均延迟",
                unit="ms",
                tags={"component": "consumer"},
                threshold_warning=100.0,
                threshold_critical=500.0
            ),
            MetricDefinition(
                name="consumer_lag",
                type=MetricType.GAUGE,
                component=ComponentType.CONSUMER,
                description="消费者消费延迟",
                unit="count",
                tags={"component": "consumer"},
                threshold_warning=10000.0,
                threshold_critical=100000.0
            ),
            MetricDefinition(
                name="consumer_success_rate",
                type=MetricType.RATE,
                component=ComponentType.CONSUMER,
                description="消费者消费成功率",
                unit="%",
                tags={"component": "consumer"},
                threshold_warning=99.0,
                threshold_critical=95.0
            )
        ]
        
        # Broker metrics
        broker_metrics = [
            MetricDefinition(
                name="broker_put_tps",
                type=MetricType.RATE,
                component=ComponentType.BROKER,
                description="Broker写入TPS",
                unit="msg/s",
                tags={"component": "broker"},
                threshold_warning=5000.0,
                threshold_critical=2000.0
            ),
            MetricDefinition(
                name="broker_get_tps",
                type=MetricType.RATE,
                component=ComponentType.BROKER,
                description="Broker读取TPS",
                unit="msg/s",
                tags={"component": "broker"},
                threshold_warning=5000.0,
                threshold_critical=2000.0
            ),
            MetricDefinition(
                name="broker_put_latency_avg",
                type=MetricType.TIMER,
                component=ComponentType.BROKER,
                description="Broker写入平均延迟",
                unit="ms",
                tags={"component": "broker"},
                threshold_warning=10.0,
                threshold_critical=50.0
            ),
            MetricDefinition(
                name="broker_disk_usage",
                type=MetricType.GAUGE,
                component=ComponentType.BROKER,
                description="Broker磁盘使用率",
                unit="%",
                tags={"component": "broker"},
                threshold_warning=80.0,
                threshold_critical=90.0
            ),
            MetricDefinition(
                name="broker_memory_usage",
                type=MetricType.GAUGE,
                component=ComponentType.BROKER,
                description="Broker内存使用率",
                unit="%",
                tags={"component": "broker"},
                threshold_warning=80.0,
                threshold_critical=90.0
            )
        ]
        
        # System metrics
        system_metrics = [
            MetricDefinition(
                name="system_cpu_usage",
                type=MetricType.GAUGE,
                component=ComponentType.SYSTEM,
                description="系统CPU使用率",
                unit="%",
                tags={"component": "system"},
                threshold_warning=80.0,
                threshold_critical=90.0
            ),
            MetricDefinition(
                name="system_memory_usage",
                type=MetricType.GAUGE,
                component=ComponentType.SYSTEM,
                description="系统内存使用率",
                unit="%",
                tags={"component": "system"},
                threshold_warning=80.0,
                threshold_critical=90.0
            ),
            MetricDefinition(
                name="system_disk_io_util",
                type=MetricType.GAUGE,
                component=ComponentType.SYSTEM,
                description="系统磁盘IO使用率",
                unit="%",
                tags={"component": "system"},
                threshold_warning=80.0,
                threshold_critical=95.0
            ),
            MetricDefinition(
                name="system_network_io",
                type=MetricType.RATE,
                component=ComponentType.SYSTEM,
                description="系统网络IO",
                unit="MB/s",
                tags={"component": "system"},
                threshold_warning=100.0,
                threshold_critical=500.0
            )
        ]
        
        # Merge all metric definitions
        all_metrics = producer_metrics + consumer_metrics + broker_metrics + system_metrics
        for metric in all_metrics:
            definitions[metric.name] = metric
        
        return definitions
    
    def record_metric(self, metric_name: str, value: float, tags: Dict[str, str] = None):
        """Record a metric sample."""
        if not self.monitoring_enabled:
            return
        
        with self._lock:
            metric_value = MetricValue(
                timestamp=time.time(),
                value=value,
                tags=tags or {}
            )
            self.metrics_data[metric_name].append(metric_value)
            
            # Check the alert thresholds
            self._check_threshold(metric_name, value)
    
    def _check_threshold(self, metric_name: str, value: float):
        """Check the sample against the metric's alert thresholds."""
        definition = self.metrics_definitions.get(metric_name)
        if not definition:
            return
        
        warning = definition.threshold_warning
        critical = definition.threshold_critical
        current_time = time.time()
        
        # Infer the direction from how the thresholds are ordered: for
        # latency/usage metrics critical >= warning (higher is worse); for
        # TPS/success-rate metrics critical < warning (lower is worse).
        higher_is_worse = warning is None or critical is None or critical >= warning
        
        def breached(threshold: Optional[float]) -> bool:
            if threshold is None:
                return False
            return value >= threshold if higher_is_worse else value <= threshold
        
        if breached(critical):
            alert = f"CRITICAL: {metric_name} = {value} {definition.unit} (threshold: {critical})"
            self.alerts.append((current_time, "CRITICAL", alert))
        elif breached(warning):
            alert = f"WARNING: {metric_name} = {value} {definition.unit} (threshold: {warning})"
            self.alerts.append((current_time, "WARNING", alert))
    
    def get_metric_statistics(self, metric_name: str, duration_seconds: int = 300) -> Dict[str, float]:
        """Compute summary statistics for a metric over a recent window."""
        with self._lock:
            data = self.metrics_data.get(metric_name, deque())
            if not data:
                return {}
            
            current_time = time.time()
            recent_data = [item.value for item in data 
                          if current_time - item.timestamp <= duration_seconds]
            
            if not recent_data:
                return {}
            
            return {
                "count": len(recent_data),
                "min": min(recent_data),
                "max": max(recent_data),
                "avg": statistics.mean(recent_data),
                "median": statistics.median(recent_data),
                "p95": self._percentile(recent_data, 95),
                "p99": self._percentile(recent_data, 99)
            }
    
    def _percentile(self, data: List[float], percentile: float) -> float:
        """Nearest-rank percentile approximation."""
        if not data:
            return 0.0
        sorted_data = sorted(data)
        index = int(len(sorted_data) * percentile / 100)
        return sorted_data[min(index, len(sorted_data) - 1)]
    
    def get_performance_snapshot(self, component: ComponentType) -> PerformanceSnapshot:
        """Build a performance snapshot for one component."""
        current_time = time.time()
        metrics = {}
        
        # Collect the metrics that belong to this component
        for metric_name, definition in self.metrics_definitions.items():
            if definition.component == component:
                stats = self.get_metric_statistics(metric_name)
                if stats:
                    metrics[metric_name] = stats.get("avg", 0.0)
        
        # Collect recent alerts (last 5 minutes)
        recent_alerts = [alert[2] for alert in self.alerts 
                        if current_time - alert[0] <= 300]
        
        # Derive the overall status from the alerts
        status = "HEALTHY"
        if any("CRITICAL" in alert for alert in recent_alerts):
            status = "CRITICAL"
        elif any("WARNING" in alert for alert in recent_alerts):
            status = "WARNING"
        
        return PerformanceSnapshot(
            timestamp=current_time,
            component=component,
            metrics=metrics,
            status=status,
            alerts=recent_alerts
        )
    
    def print_performance_report(self):
        """Print a performance report."""
        print("=== RocketMQ Performance Monitoring Report ===")
        print(f"Report time: {time.strftime('%Y-%m-%d %H:%M:%S')}")
        
        for component in ComponentType:
            snapshot = self.get_performance_snapshot(component)
            print(f"\n📊 {component.value.upper()} component status: {snapshot.status}")
            
            if snapshot.metrics:
                print("Key metrics:")
                for metric_name, value in snapshot.metrics.items():
                    definition = self.metrics_definitions.get(metric_name)
                    unit = definition.unit if definition else ""
                    print(f"  • {metric_name}: {value:.2f} {unit}")
            
            if snapshot.alerts:
                print("⚠️ Alerts:")
                for alert in snapshot.alerts[-3:]:  # Show the 3 most recent alerts
                    print(f"  • {alert}")
    
    def enable_monitoring(self):
        """Enable monitoring."""
        self.monitoring_enabled = True
    
    def disable_monitoring(self):
        """Disable monitoring."""
        self.monitoring_enabled = False
    
    def clear_alerts(self):
        """Clear all alerts."""
        with self._lock:
            self.alerts.clear()

# Usage example
if __name__ == "__main__":
    # Create the performance monitor
    monitor = PerformanceMonitor()
    
    # Simulate recording some metrics
    import random
    
    # Producer metrics
    monitor.record_metric("producer_send_tps", random.uniform(800, 1200))
    monitor.record_metric("producer_send_latency_avg", random.uniform(50, 150))
    monitor.record_metric("producer_send_success_rate", random.uniform(98, 100))
    
    # Consumer metrics
    monitor.record_metric("consumer_consume_tps", random.uniform(900, 1100))
    monitor.record_metric("consumer_lag", random.uniform(1000, 5000))
    
    # Broker metrics
    monitor.record_metric("broker_put_tps", random.uniform(3000, 6000))
    monitor.record_metric("broker_disk_usage", random.uniform(60, 85))
    monitor.record_metric("broker_memory_usage", random.uniform(70, 90))
    
    # System metrics
    monitor.record_metric("system_cpu_usage", random.uniform(40, 85))
    monitor.record_metric("system_memory_usage", random.uniform(60, 85))
    
    # Print the performance report
    monitor.print_performance_report()
    
    # Get statistics for a specific metric
    stats = monitor.get_metric_statistics("producer_send_tps")
    print(f"\nProducer TPS statistics: {stats}")

8.1.2 Monitoring Data Collection

from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import re
import requests
import psutil
import time

class CollectorType(Enum):
    """Collector types."""
    JMX = "jmx"
    HTTP = "http"
    SYSTEM = "system"
    LOG = "log"
    CUSTOM = "custom"

@dataclass
class CollectorConfig:
    """Collector configuration."""
    name: str
    type: CollectorType
    enabled: bool
    interval_seconds: int
    endpoint: Optional[str] = None
    credentials: Optional[Dict[str, str]] = None
    parameters: Optional[Dict[str, Any]] = None

class MetricCollector(ABC):
    """Base class for metric collectors."""
    
    def __init__(self, config: CollectorConfig):
        self.config = config
        self.last_collection_time = 0
    
    @abstractmethod
    def collect(self) -> Dict[str, float]:
        """Collect metric data."""
        pass
    
    def should_collect(self) -> bool:
        """Return True if the collection interval has elapsed."""
        if not self.config.enabled:
            return False
        
        current_time = time.time()
        return current_time - self.last_collection_time >= self.config.interval_seconds
    
    def update_collection_time(self):
        """Record the time of the last collection."""
        self.last_collection_time = time.time()

class JMXCollector(MetricCollector):
    """JMX metric collector."""
    
    def __init__(self, config: CollectorConfig):
        super().__init__(config)
        self.jmx_metrics = {
            # Broker JMX metrics
            "broker.put.tps": "org.apache.rocketmq:type=BrokerStats,name=PutTps",
            "broker.get.tps": "org.apache.rocketmq:type=BrokerStats,name=GetTps",
            "broker.put.latency": "org.apache.rocketmq:type=BrokerStats,name=PutLatency",
            "broker.get.latency": "org.apache.rocketmq:type=BrokerStats,name=GetLatency",
            "broker.message.store.size": "org.apache.rocketmq:type=BrokerStats,name=MessageStoreSize",
            
            # JVM metrics
            "jvm.heap.used": "java.lang:type=Memory,name=HeapMemoryUsage.used",
            "jvm.heap.max": "java.lang:type=Memory,name=HeapMemoryUsage.max",
            "jvm.gc.count": "java.lang:type=GarbageCollector,name=*,attr=CollectionCount",
            "jvm.gc.time": "java.lang:type=GarbageCollector,name=*,attr=CollectionTime",
            "jvm.thread.count": "java.lang:type=Threading,name=ThreadCount"
        }
    
    def collect(self) -> Dict[str, float]:
        """Collect metrics via JMX."""
        metrics = {}
        
        try:
            # A real implementation would query these MBeans through a JMX
            # client; here we simulate the values for the example.
            import random
            
            for metric_name, jmx_path in self.jmx_metrics.items():
                if "tps" in metric_name:
                    metrics[metric_name] = random.uniform(1000, 5000)
                elif "latency" in metric_name:
                    metrics[metric_name] = random.uniform(1, 50)
                elif "size" in metric_name:
                    metrics[metric_name] = random.uniform(1000000, 10000000)
                elif "heap" in metric_name:
                    if "used" in metric_name:
                        metrics[metric_name] = random.uniform(500000000, 2000000000)
                    else:
                        metrics[metric_name] = 4000000000
                elif "gc" in metric_name:
                    if "count" in metric_name:
                        metrics[metric_name] = random.uniform(100, 1000)
                    else:
                        metrics[metric_name] = random.uniform(1000, 10000)
                elif "thread" in metric_name:
                    metrics[metric_name] = random.uniform(50, 200)
            
        except Exception as e:
            print(f"JMX collection failed: {e}")
        
        self.update_collection_time()
        return metrics

class HTTPCollector(MetricCollector):
    """HTTP metric collector."""
    
    def collect(self) -> Dict[str, float]:
        """Collect metrics from an HTTP API."""
        metrics = {}
        
        try:
            if not self.config.endpoint:
                return metrics
            
            # Fetch the metrics over HTTP
            response = requests.get(
                self.config.endpoint,
                timeout=10,
                auth=(self.config.credentials.get("username"), 
                     self.config.credentials.get("password")) if self.config.credentials else None
            )
            
            if response.status_code == 200:
                data = response.json()
                # Parse the response payload
                metrics = self._parse_http_response(data)
            
        except Exception as e:
            print(f"HTTP collection failed: {e}")
        
        self.update_collection_time()
        return metrics
    
    def _parse_http_response(self, data: Dict[str, Any]) -> Dict[str, float]:
        """Parse the HTTP response payload."""
        metrics = {}
        
        # Adapt this to the actual response format of your metrics API
        if "metrics" in data:
            for metric in data["metrics"]:
                name = metric.get("name")
                value = metric.get("value")
                if name and isinstance(value, (int, float)):
                    metrics[name] = float(value)
        
        return metrics

class SystemCollector(MetricCollector):
    """System metric collector."""
    
    def collect(self) -> Dict[str, float]:
        """Collect system metrics."""
        metrics = {}
        
        try:
            # CPU usage
            cpu_percent = psutil.cpu_percent(interval=1)
            metrics["system.cpu.usage"] = cpu_percent
            
            # Memory usage
            memory = psutil.virtual_memory()
            metrics["system.memory.usage"] = memory.percent
            metrics["system.memory.used"] = memory.used
            metrics["system.memory.available"] = memory.available
            
            # Disk usage
            disk = psutil.disk_usage('/')
            metrics["system.disk.usage"] = (disk.used / disk.total) * 100
            metrics["system.disk.used"] = disk.used
            metrics["system.disk.free"] = disk.free
            
            # Disk I/O (cumulative counters)
            disk_io = psutil.disk_io_counters()
            if disk_io:
                metrics["system.disk.read.bytes"] = disk_io.read_bytes
                metrics["system.disk.write.bytes"] = disk_io.write_bytes
                metrics["system.disk.read.count"] = disk_io.read_count
                metrics["system.disk.write.count"] = disk_io.write_count
            
            # Network I/O (cumulative counters)
            network_io = psutil.net_io_counters()
            if network_io:
                metrics["system.network.bytes.sent"] = network_io.bytes_sent
                metrics["system.network.bytes.recv"] = network_io.bytes_recv
                metrics["system.network.packets.sent"] = network_io.packets_sent
                metrics["system.network.packets.recv"] = network_io.packets_recv
            
            # Load averages
            load_avg = psutil.getloadavg()
            metrics["system.load.1min"] = load_avg[0]
            metrics["system.load.5min"] = load_avg[1]
            metrics["system.load.15min"] = load_avg[2]
            
        except Exception as e:
            print(f"System metric collection failed: {e}")
        
        self.update_collection_time()
        return metrics

class LogCollector(MetricCollector):
    """Log-based metric collector."""
    
    def __init__(self, config: CollectorConfig):
        super().__init__(config)
        self.log_patterns = {
            "error.count": r"ERROR",
            "warn.count": r"WARN",
            "send.success": r"Send message success",
            "send.failed": r"Send message failed",
            "consume.success": r"Consume message success",
            "consume.failed": r"Consume message failed"
        }
        self.last_position = 0
    
    def collect(self) -> Dict[str, float]:
        """Collect metrics from a log file."""
        metrics = {}
        
        try:
            log_file = (self.config.parameters or {}).get("log_file")
            if not log_file:
                return metrics
            
            # Read only the content appended since the last collection
            with open(log_file, 'r', encoding='utf-8') as f:
                f.seek(self.last_position)
                new_content = f.read()
                self.last_position = f.tell()
            
            # Count the occurrences of each pattern
            for metric_name, pattern in self.log_patterns.items():
                count = len(re.findall(pattern, new_content, re.IGNORECASE))
                metrics[metric_name] = count
            
        except Exception as e:
            print(f"Log collection failed: {e}")
        
        self.update_collection_time()
        return metrics

class MetricCollectionManager:
    """Manages a set of metric collectors."""
    
    def __init__(self):
        self.collectors: List[MetricCollector] = []
        self.performance_monitor = PerformanceMonitor()
    
    def add_collector(self, collector: MetricCollector):
        """Register a collector."""
        self.collectors.append(collector)
    
    def create_default_collectors(self) -> List[MetricCollector]:
        """Create the default set of collectors."""
        collectors = []
        
        # JMX collector
        jmx_config = CollectorConfig(
            name="rocketmq_jmx",
            type=CollectorType.JMX,
            enabled=True,
            interval_seconds=30,
            endpoint="localhost:9876"
        )
        collectors.append(JMXCollector(jmx_config))
        
        # HTTP collector
        http_config = CollectorConfig(
            name="rocketmq_http",
            type=CollectorType.HTTP,
            enabled=True,
            interval_seconds=30,
            endpoint="http://localhost:8080/metrics"
        )
        collectors.append(HTTPCollector(http_config))
        
        # System collector
        system_config = CollectorConfig(
            name="system_metrics",
            type=CollectorType.SYSTEM,
            enabled=True,
            interval_seconds=60
        )
        collectors.append(SystemCollector(system_config))
        
        # Log collector
        log_config = CollectorConfig(
            name="rocketmq_logs",
            type=CollectorType.LOG,
            enabled=True,
            interval_seconds=60,
            parameters={"log_file": "/opt/rocketmq/logs/rocketmqlogs/broker.log"}
        )
        collectors.append(LogCollector(log_config))
        
        return collectors
    
    def collect_all_metrics(self) -> Dict[str, float]:
        """Run every due collector and gather the results."""
        all_metrics = {}
        
        for collector in self.collectors:
            if collector.should_collect():
                try:
                    metrics = collector.collect()
                    all_metrics.update(metrics)
                    
                    # Feed the samples into the performance monitor
                    for metric_name, value in metrics.items():
                        self.performance_monitor.record_metric(metric_name, value)
                        
                except Exception as e:
                    print(f"Collector {collector.config.name} failed: {e}")
        
        return all_metrics
    
    def start_collection(self):
        """Start collecting in a background thread."""
        print("Starting metric collection...")
        
        # Register the default collectors
        for collector in self.create_default_collectors():
            self.add_collector(collector)
        
        # Collect continuously in a daemon thread
        import threading
        import time
        
        def collection_loop():
            while True:
                try:
                    metrics = self.collect_all_metrics()
                    if metrics:
                        print(f"Collected {len(metrics)} metrics")
                    time.sleep(10)  # Check every 10 seconds
                except Exception as e:
                    print(f"Collection loop error: {e}")
                    time.sleep(30)
        
        collection_thread = threading.Thread(target=collection_loop, daemon=True)
        collection_thread.start()
    
    def get_performance_monitor(self) -> PerformanceMonitor:
        """Return the underlying performance monitor."""
        return self.performance_monitor

# Usage example
if __name__ == "__main__":
    # Create the collection manager
    manager = MetricCollectionManager()
    
    # Start collecting
    manager.start_collection()
    
    # Give the collectors some time to work
    time.sleep(60)
    
    # Print the performance report
    monitor = manager.get_performance_monitor()
    monitor.print_performance_report()
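
CollectorType.CUSTOM is declared above but never implemented. The minimal sketch below shows one possible shape for it, assuming the simplest contract: the caller supplies a callable that returns a metric dictionary. The class name, the metric_fn parameter, and the app.orders.pending metric are invented for this example.

class CustomCollector(MetricCollector):
    """CUSTOM collector that wraps a user-supplied callable."""
    
    def __init__(self, config: CollectorConfig, metric_fn):
        super().__init__(config)
        self.metric_fn = metric_fn  # Callable returning Dict[str, float]
    
    def collect(self) -> Dict[str, float]:
        """Invoke the callable and return whatever metrics it produced."""
        metrics = {}
        try:
            metrics = dict(self.metric_fn())
        except Exception as e:
            print(f"Custom collection failed: {e}")
        self.update_collection_time()
        return metrics

# Usage: register it alongside the built-in collectors
custom_config = CollectorConfig(
    name="app_business_metrics",
    type=CollectorType.CUSTOM,
    enabled=True,
    interval_seconds=30
)
manager = MetricCollectionManager()
manager.add_collector(CustomCollector(custom_config, lambda: {"app.orders.pending": 42.0}))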

8.2 Parameter Tuning

8.2.1 Broker Parameter Optimization

from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from enum import Enum

class ParameterCategory(Enum):
    """Parameter categories."""
    STORAGE = "storage"
    NETWORK = "network"
    MEMORY = "memory"
    THREAD = "thread"
    PERFORMANCE = "performance"
    RELIABILITY = "reliability"

class ParameterType(Enum):
    """Parameter value types."""
    INTEGER = "integer"
    FLOAT = "float"
    STRING = "string"
    BOOLEAN = "boolean"
    SIZE = "size"                # Size values (e.g. 1GB, 512MB)
    DURATION = "duration"        # Time values (e.g. 30s, 5m)

@dataclass
class ParameterDefinition:
    """Parameter definition."""
    name: str
    category: ParameterCategory
    type: ParameterType
    description: str
    default_value: Any
    recommended_value: Any
    min_value: Optional[Any] = None
    max_value: Optional[Any] = None
    unit: Optional[str] = None
    impact: str = ""  # Notes on the performance impact
    tuning_tips: Optional[List[str]] = None

@dataclass
class OptimizationScenario:
    """Optimization scenario."""
    name: str
    description: str
    characteristics: List[str]
    parameter_overrides: Dict[str, Any]
    expected_improvements: List[str]

class BrokerParameterOptimizer:
    """Broker parameter optimizer."""
    
    def __init__(self):
        self.parameters = self._create_parameter_definitions()
        self.scenarios = self._create_optimization_scenarios()
    
    def _create_parameter_definitions(self) -> Dict[str, ParameterDefinition]:
        """Create the parameter definitions."""
        parameters = {}
        
        # Storage parameters
        storage_params = [
            ParameterDefinition(
                name="mapedFileSizeCommitLog",
                category=ParameterCategory.STORAGE,
                type=ParameterType.SIZE,
                description="CommitLog file size",
                default_value="1GB",
                recommended_value="1GB",
                min_value="128MB",
                max_value="2GB",
                unit="bytes",
                impact="Controls the size of each CommitLog file; files that are too large slow recovery, files that are too small create too many files",
                tuning_tips=[
                    "Use 1GB for high-throughput workloads",
                    "Consider 512MB for latency-sensitive workloads",
                    "With SSD storage it can be raised to 2GB"
                ]
            ),
            ParameterDefinition(
                name="mapedFileSizeConsumeQueue",
                category=ParameterCategory.STORAGE,
                type=ParameterType.SIZE,
                description="ConsumeQueue file size",
                default_value="6MB",
                recommended_value="6MB",
                min_value="1MB",
                max_value="32MB",
                unit="bytes",
                impact="Controls the size of consume queue files and thus consumption performance",
                tuning_tips=[
                    "The default is usually fine",
                    "Consider increasing it for highly concurrent consumption"
                ]
            ),
            ParameterDefinition(
                name="flushDiskType",
                category=ParameterCategory.STORAGE,
                type=ParameterType.STRING,
                description="Disk flush mode",
                default_value="ASYNC_FLUSH",
                recommended_value="ASYNC_FLUSH",
                impact="SYNC_FLUSH guarantees durability at lower performance; ASYNC_FLUSH is fast but risks losing data on a crash",
                tuning_tips=[
                    "Use ASYNC_FLUSH for high performance",
                    "Use SYNC_FLUSH when durability is critical",
                    "Master-slave replication can add reliability on top of ASYNC_FLUSH"
                ]
            ),
            ParameterDefinition(
                name="flushIntervalCommitLog",
                category=ParameterCategory.STORAGE,
                type=ParameterType.DURATION,
                description="CommitLog flush interval",
                default_value="500ms",
                recommended_value="200ms",
                min_value="100ms",
                max_value="5000ms",
                unit="milliseconds",
                impact="Controls how often data is persisted, trading durability against performance",
                tuning_tips=[
                    "SSDs tolerate shorter intervals",
                    "Use 500ms or more on spinning disks",
                    "High-throughput workloads can use a longer interval"
                ]
            ),
            ParameterDefinition(
                name="flushCommitLogTimed",
                category=ParameterCategory.STORAGE,
                type=ParameterType.BOOLEAN,
                description="Whether to flush on a timer",
                default_value=True,
                recommended_value=True,
                impact="Controls whether flushing happens at fixed time intervals",
                tuning_tips=[
                    "Timed flushing is recommended",
                    "Tune together with flushIntervalCommitLog"
                ]
            )
        ]
        
        # Network parameters
        network_params = [
            ParameterDefinition(
                name="sendMessageThreadPoolNums",
                category=ParameterCategory.NETWORK,
                type=ParameterType.INTEGER,
                description="Send-message thread pool size",
                default_value=1,
                recommended_value=4,
                min_value=1,
                max_value=32,
                impact="Determines how many send requests are processed concurrently",
                tuning_tips=[
                    "Scale with the CPU count; 1-2x the number of cores is a good start",
                    "Increase for highly concurrent workloads",
                    "Too many threads add context-switch overhead"
                ]
            ),
            ParameterDefinition(
                name="pullMessageThreadPoolNums",
                category=ParameterCategory.NETWORK,
                type=ParameterType.INTEGER,
                description="Pull-message thread pool size",
                default_value=16,
                recommended_value=32,
                min_value=8,
                max_value=64,
                impact="Determines how many pull requests are processed concurrently",
                tuning_tips=[
                    "Increase when there are many consumers",
                    "2-4x the number of CPU cores is a good start"
                ]
            ),
            ParameterDefinition(
                name="serverWorkerThreads",
                category=ParameterCategory.NETWORK,
                type=ParameterType.INTEGER,
                description="Netty worker thread count",
                default_value=8,
                recommended_value=16,
                min_value=4,
                max_value=64,
                impact="Determines network I/O processing capacity",
                tuning_tips=[
                    "2x the number of CPU cores is a good start",
                    "Increase for highly concurrent workloads"
                ]
            ),
            ParameterDefinition(
                name="serverCallbackExecutorThreads",
                category=ParameterCategory.NETWORK,
                type=ParameterType.INTEGER,
                description="Callback executor thread count",
                default_value=0,
                recommended_value=4,
                min_value=0,
                max_value=16,
                impact="Determines callback processing performance",
                tuning_tips=[
                    "0 means callbacks run on the shared thread pool",
                    "Use dedicated threads when callbacks are heavy"
                ]
            )
        ]
        
        # Memory parameters
        memory_params = [
            ParameterDefinition(
                name="transientStorePoolEnable",
                category=ParameterCategory.MEMORY,
                type=ParameterType.BOOLEAN,
                description="Whether to enable the off-heap transient store pool",
                default_value=False,
                recommended_value=True,
                impact="When enabled, messages are staged in off-heap buffers, reducing GC pressure and improving performance",
                tuning_tips=[
                    "Recommended on 64-bit JVMs with ample memory",
                    "Tune together with transientStorePoolSize"
                ]
            ),
            ParameterDefinition(
                name="transientStorePoolSize",
                category=ParameterCategory.MEMORY,
                type=ParameterType.INTEGER,
                description="Off-heap transient store pool size",
                default_value=5,
                recommended_value=10,
                min_value=3,
                max_value=20,
                impact="Determines how much off-heap memory is used",
                tuning_tips=[
                    "Roughly 2x the number of in-flight CommitLog files is a good start",
                    "Increase when memory is plentiful"
                ]
            ),
            ParameterDefinition(
                name="useReentrantLockWhenPutMessage",
                category=ParameterCategory.MEMORY,
                type=ParameterType.BOOLEAN,
                description="Use a reentrant lock when putting messages",
                default_value=True,
                recommended_value=True,
                impact="Affects concurrent write performance",
                tuning_tips=[
                    "Keep the default",
                    "Keep it enabled for highly concurrent writes"
                ]
            )
        ]
        
        # Performance parameters
        performance_params = [
            ParameterDefinition(
                name="putMsgIndexHightWater",
                category=ParameterCategory.PERFORMANCE,
                type=ParameterType.INTEGER,
                description="Message index high-water mark",
                default_value=600000,
                recommended_value=600000,
                min_value=100000,
                max_value=2000000,
                impact="Affects index-building performance",
                tuning_tips=[
                    "Increase for high-throughput workloads",
                    "Decrease when memory is tight"
                ]
            ),
            ParameterDefinition(
                name="maxTransferBytesOnMessageInMemory",
                category=ParameterCategory.PERFORMANCE,
                type=ParameterType.SIZE,
                description="Maximum bytes transferred per request for in-memory messages",
                default_value="256KB",
                recommended_value="512KB",
                min_value="64KB",
                max_value="2MB",
                unit="bytes",
                impact="Affects message transfer performance",
                tuning_tips=[
                    "Increase when network bandwidth is plentiful",
                    "Increase for large-message workloads"
                ]
            ),
            ParameterDefinition(
                name="maxTransferCountOnMessageInMemory",
                category=ParameterCategory.PERFORMANCE,
                type=ParameterType.INTEGER,
                description="Maximum message count transferred per request for in-memory messages",
                default_value=32,
                recommended_value=64,
                min_value=16,
                max_value=256,
                impact="Affects batch transfer performance",
                tuning_tips=[
                    "Increase for small-message workloads",
                    "Tune together with maxTransferBytesOnMessageInMemory"
                ]
            )
        ]
        
        # Merge all parameter definitions
        all_params = storage_params + network_params + memory_params + performance_params
        for param in all_params:
            parameters[param.name] = param
        
        return parameters
    
    def _create_optimization_scenarios(self) -> Dict[str, OptimizationScenario]:
        """Create the optimization scenarios."""
        scenarios = {}
        
        # High-throughput scenario
        scenarios["high_throughput"] = OptimizationScenario(
            name="High-throughput optimization",
            description="For high-volume workloads that can tolerate some latency",
            characteristics=[
                "Large message volume (>100k TPS)",
                "Tolerant of latency (>100ms)",
                "Mostly batch processing",
                "Ample storage capacity"
            ],
            parameter_overrides={
                "mapedFileSizeCommitLog": "2GB",
                "flushIntervalCommitLog": "1000ms",
                "sendMessageThreadPoolNums": 8,
                "pullMessageThreadPoolNums": 64,
                "transientStorePoolEnable": True,
                "transientStorePoolSize": 15,
                "maxTransferBytesOnMessageInMemory": "1MB",
                "maxTransferCountOnMessageInMemory": 128
            },
            expected_improvements=[
                "Higher write throughput",
                "Fewer file rollovers",
                "More efficient batch processing",
                "Lower disk I/O pressure"
            ]
        )
        
        # Low-latency scenario
        scenarios["low_latency"] = OptimizationScenario(
            name="Low-latency optimization",
            description="For latency-sensitive, real-time processing workloads",
            characteristics=[
                "Very strict latency requirements (<10ms)",
                "Moderate message volume (<50k TPS)",
                "Mostly real-time processing",
                "Some throughput can be sacrificed"
            ],
            parameter_overrides={
                "flushDiskType": "ASYNC_FLUSH",
                "flushIntervalCommitLog": "100ms",
                "sendMessageThreadPoolNums": 16,
                "serverWorkerThreads": 32,
                "transientStorePoolEnable": True,
                "useReentrantLockWhenPutMessage": True,
                "maxTransferBytesOnMessageInMemory": "128KB",
                "maxTransferCountOnMessageInMemory": 16
            },
            expected_improvements=[
                "Lower message processing latency",
                "Better real-time responsiveness",
                "Less message backlog",
                "Higher concurrency"
            ]
        )
        
        # High-reliability scenario
        scenarios["high_reliability"] = OptimizationScenario(
            name="High-reliability optimization",
            description="For workloads with strict data safety requirements",
            characteristics=[
                "Data loss cannot be tolerated",
                "Performance can be traded for reliability",
                "Critical businesses such as finance and payments",
                "Strong consistency guarantees required"
            ],
            parameter_overrides={
                "flushDiskType": "SYNC_FLUSH",
                "flushIntervalCommitLog": "100ms",
                "flushCommitLogTimed": True,
                "transientStorePoolEnable": False,
                "useReentrantLockWhenPutMessage": True
            },
            expected_improvements=[
                "No data loss",
                "Stronger data consistency",
                "Better failure recovery",
                "Compliance with regulatory requirements"
            ]
        )
        
        # Resource-limited scenario
        scenarios["resource_limited"] = OptimizationScenario(
            name="Resource-limited optimization",
            description="For environments with limited hardware resources",
            characteristics=[
                "Few CPU cores (<8)",
                "Limited memory (<8GB)",
                "Limited storage capacity",
                "Limited network bandwidth"
            ],
            parameter_overrides={
                "mapedFileSizeCommitLog": "512MB",
                "sendMessageThreadPoolNums": 2,
                "pullMessageThreadPoolNums": 8,
                "serverWorkerThreads": 4,
                "transientStorePoolEnable": False,
                "maxTransferBytesOnMessageInMemory": "64KB",
                "maxTransferCountOnMessageInMemory": 16,
                "putMsgIndexHightWater": 200000
            },
            expected_improvements=[
                "Lower resource consumption",
                "Reduced memory footprint",
                "Lower CPU load",
                "Fits constrained environments"
            ]
        )
        
        return scenarios
    
    def get_parameter_recommendations(self, scenario: str = None) -> Dict[str, Any]:
        """Return the recommended parameter values."""
        if scenario and scenario in self.scenarios:
            # Scenario-based recommendation
            base_config = {param.name: param.recommended_value 
                          for param in self.parameters.values()}
            scenario_config = self.scenarios[scenario]
            base_config.update(scenario_config.parameter_overrides)
            return base_config
        else:
            # General-purpose recommendation
            return {param.name: param.recommended_value 
                   for param in self.parameters.values()}
    
    def generate_broker_config(self, scenario: str = None, 
                              custom_overrides: Dict[str, Any] = None) -> str:
        """Generate a Broker configuration file."""
        config = self.get_parameter_recommendations(scenario)
        
        if custom_overrides:
            config.update(custom_overrides)
        
        config_lines = []
        config_lines.append("# RocketMQ Broker Configuration")
        config_lines.append(f"# Generated for scenario: {scenario or 'general'}")
        config_lines.append("# Auto-generated by BrokerParameterOptimizer")
        config_lines.append("")
        
        # Group the parameters by category
        categories = {}
        for param_name, value in config.items():
            param_def = self.parameters.get(param_name)
            if param_def:
                category = param_def.category.value
                if category not in categories:
                    categories[category] = []
                categories[category].append((param_name, value, param_def))
        
        for category, params in categories.items():
            config_lines.append(f"# {category.upper()} Parameters")
            for param_name, value, param_def in params:
                config_lines.append(f"# {param_def.description}")
                if param_def.impact:
                    config_lines.append(f"# Impact: {param_def.impact}")
                config_lines.append(f"{param_name}={value}")
                config_lines.append("")
        
        return "\n".join(config_lines)
    
    def analyze_current_config(self, config_file: str) -> Dict[str, Any]:
        """Analyze an existing configuration file."""
        analysis = {
            "issues": [],
            "recommendations": [],
            "optimizations": []
        }
        
        try:
            # Read the current configuration
            current_config = {}
            with open(config_file, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith('#') and '=' in line:
                        key, value = line.split('=', 1)
                        current_config[key.strip()] = value.strip()
            
            # Compare each parameter against its recommendation
            for param_name, param_def in self.parameters.items():
                current_value = current_config.get(param_name)
                recommended_value = param_def.recommended_value
                
                if current_value is None:
                    analysis["issues"].append(
                        f"Missing parameter {param_name}; consider setting it to {recommended_value}"
                    )
                elif str(current_value) != str(recommended_value):
                    analysis["recommendations"].append(
                        f"Parameter {param_name} is {current_value}; recommended value is {recommended_value}"
                    )
            
            # General optimization advice
            analysis["optimizations"] = [
                "Choose an optimization scenario that matches the actual workload",
                "Monitor performance metrics regularly and adjust parameters accordingly",
                "Validate parameter changes in a test environment first",
                "Establish a rollback procedure for parameter changes"
            ]
            
        except Exception as e:
            analysis["issues"].append(f"Failed to analyze configuration file: {e}")
        
        return analysis
    
    def print_parameter_guide(self, category: ParameterCategory = None):
        """Print the parameter tuning guide."""
        print("=== RocketMQ Broker Parameter Tuning Guide ===")
        
        if category:
            params = [p for p in self.parameters.values() if p.category == category]
            print(f"\n📋 {category.value.upper()} parameters:")
        else:
            params = list(self.parameters.values())
            print("\n📋 All parameters:")
        
        for param in params:
            print(f"\n🔧 {param.name}")
            print(f"   Description: {param.description}")
            print(f"   Category: {param.category.value}")
            print(f"   Default: {param.default_value}")
            print(f"   Recommended: {param.recommended_value}")
            if param.unit:
                print(f"   Unit: {param.unit}")
            if param.impact:
                print(f"   Impact: {param.impact}")
            if param.tuning_tips:
                print(f"   Tuning tips:")
                for tip in param.tuning_tips:
                    print(f"     • {tip}")
    
    def print_scenario_guide(self):
        """Print the scenario optimization guide."""
        print("=== RocketMQ Optimization Scenario Guide ===")
        
        for scenario_name, scenario in self.scenarios.items():
            print(f"\n🎯 {scenario.name}")
            print(f"Description: {scenario.description}")
            
            print(f"\nCharacteristics:")
            for char in scenario.characteristics:
                print(f"  • {char}")
            
            print(f"\nParameter adjustments:")
            for param, value in scenario.parameter_overrides.items():
                print(f"  • {param} = {value}")
            
            print(f"\nExpected improvements:")
            for improvement in scenario.expected_improvements:
                print(f"  • {improvement}")

# Usage example
if __name__ == "__main__":
    # Create the parameter optimizer
    optimizer = BrokerParameterOptimizer()
    
    # Print the parameter guide
    optimizer.print_parameter_guide(ParameterCategory.STORAGE)
    
    # Print the scenario guide
    optimizer.print_scenario_guide()
    
    # Generate a configuration for the high-throughput scenario
    config = optimizer.generate_broker_config("high_throughput")
    print("\n=== High-Throughput Configuration ===")
    print(config)
    
    # Analyze an existing configuration
    # analysis = optimizer.analyze_current_config("/opt/rocketmq/conf/broker.conf")
    # print(f"\nConfiguration analysis: {analysis}")
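
Note that ParameterDefinition carries min_value and max_value, but nothing above enforces them when custom_overrides are passed to generate_broker_config. The sketch below shows one way such a check could look; validate_override is a helper invented for this example, and it only handles numeric bounds (SIZE and DURATION strings such as "1GB" or "500ms" would need unit parsing first).

def validate_override(optimizer: BrokerParameterOptimizer,
                      name: str, value: Any) -> List[str]:
    """Return a list of problems with a proposed parameter override."""
    problems = []
    definition = optimizer.parameters.get(name)
    if definition is None:
        problems.append(f"Unknown parameter: {name}")
        return problems
    # Only compare numeric values against numeric bounds
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        minimum, maximum = definition.min_value, definition.max_value
        if isinstance(minimum, (int, float)) and value < minimum:
            problems.append(f"{name}={value} is below the minimum {minimum}")
        if isinstance(maximum, (int, float)) and value > maximum:
            problems.append(f"{name}={value} is above the maximum {maximum}")
    return problems

# Usage: sanity-check overrides before generating a configuration file
optimizer = BrokerParameterOptimizer()
print(validate_override(optimizer, "sendMessageThreadPoolNums", 64))
# -> ['sendMessageThreadPoolNums=64 is above the maximum 32']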

8.2.2 JVM Parameter Optimization

from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from enum import Enum
import platform
import psutil

class JVMType(Enum):
    """JVM vendors."""
    ORACLE_HOTSPOT = "oracle_hotspot"
    OPENJDK = "openjdk"
    GRAALVM = "graalvm"
    AZUL_ZULU = "azul_zulu"

class GCType(Enum):
    """Garbage collector types."""
    G1GC = "g1gc"
    CMS = "cms"
    PARALLEL = "parallel"
    ZGC = "zgc"
    SHENANDOAH = "shenandoah"

class JVMScenario(Enum):
    """JVM optimization scenarios."""
    HIGH_THROUGHPUT = "high_throughput"
    LOW_LATENCY = "low_latency"
    LARGE_HEAP = "large_heap"
    MEMORY_LIMITED = "memory_limited"

@dataclass
class JVMParameter:
    """A single JVM flag."""
    name: str
    value: str
    description: str
    category: str
    impact: str = ""
    conditions: Optional[List[str]] = None  # Conditions under which the flag applies

@dataclass
class JVMConfiguration:
    """A complete JVM configuration."""
    scenario: JVMScenario
    gc_type: GCType
    heap_size: str
    parameters: List[JVMParameter]
    description: str
    requirements: List[str]

class JVMOptimizer:
    """JVM parameter optimizer."""
    
    def __init__(self):
        self.system_info = self._get_system_info()
        self.configurations = self._create_jvm_configurations()
    
    def _get_system_info(self) -> Dict[str, Any]:
        """Gather basic system information."""
        return {
            "cpu_count": psutil.cpu_count(),
            "memory_total": psutil.virtual_memory().total,
            "memory_available": psutil.virtual_memory().available,
            "platform": platform.system(),
            "architecture": platform.architecture()[0]
        }
    
    def _create_jvm_configurations(self) -> Dict[JVMScenario, List[JVMConfiguration]]:
        """Create the JVM configurations."""
        configurations = {}
        
        # High-throughput configurations
        configurations[JVMScenario.HIGH_THROUGHPUT] = [
            JVMConfiguration(
                scenario=JVMScenario.HIGH_THROUGHPUT,
                gc_type=GCType.G1GC,
                heap_size="8g",
                description="G1GC configuration for high-throughput workloads",
                requirements=[
                    "Memory >= 16GB",
                    "CPU >= 8 cores",
                    "Relaxed latency requirements",
                    "Large message volume"
                ],
                parameters=[
                    JVMParameter(
                        name="-Xms",
                        value="8g",
                        description="Initial heap size",
                        category="memory",
                        impact="Set equal to -Xmx to avoid dynamic resizing"
                    ),
                    JVMParameter(
                        name="-Xmx",
                        value="8g",
                        description="Maximum heap size",
                        category="memory",
                        impact="Typically 50-70% of system memory"
                    ),
                    JVMParameter(
                        name="-XX:+UseG1GC",
                        value="",
                        description="Use the G1 garbage collector",
                        category="gc",
                        impact="Suited to large heaps with moderate latency goals"
                    ),
                    JVMParameter(
                        name="-XX:MaxGCPauseMillis",
                        value="200",
                        description="Maximum GC pause time",
                        category="gc",
                        impact="G1's pause-time target"
                    ),
                    JVMParameter(
                        name="-XX:G1HeapRegionSize",
                        value="16m",
                        description="G1 heap region size",
                        category="gc",
                        impact="Controls the granularity of G1's memory management"
                    ),
                    JVMParameter(
                        name="-XX:G1NewSizePercent",
                        value="30",
                        description="Young generation as a percentage of the heap",
                        category="gc",
                        impact="Affects young-generation GC frequency"
                    ),
                    JVMParameter(
                        name="-XX:G1MaxNewSizePercent",
                        value="40",
                        description="Maximum young generation percentage of the heap",
                        category="gc",
                        impact="Caps the young generation size"
                    ),
                    JVMParameter(
                        name="-XX:+UnlockExperimentalVMOptions",
                        value="",
                        description="Unlock experimental VM options",
                        category="experimental",
                        impact="Enables experimental flags such as G1NewSizePercent"
                    ),
                    JVMParameter(
                        name="-XX:+UseStringDeduplication",
                        value="",
                        description="Enable string deduplication",
                        category="memory",
                        impact="Reduces memory held by duplicate strings"
                    ),
                    JVMParameter(
                        name="-XX:+DisableExplicitGC",
                        value="",
                        description="Disable explicit GC",
                        category="gc",
                        impact="Prevents System.gc() calls from triggering full GCs"
                    )
                ]
            ),
            JVMConfiguration(
                scenario=JVMScenario.HIGH_THROUGHPUT,
                gc_type=GCType.PARALLEL,
                heap_size="8g",
                description="Parallel GC configuration for high-throughput workloads",
                requirements=[
                    "Memory >= 16GB",
                    "CPU >= 8 cores",
                    "Longer GC pauses are acceptable",
                    "Maximum throughput is the goal"
                ],
                parameters=[
                    JVMParameter(
                        name="-Xms",
                        value="8g",
                        description="Initial heap size",
                        category="memory"
                    ),
                    JVMParameter(
                        name="-Xmx",
                        value="8g",
                        description="Maximum heap size",
                        category="memory"
                    ),
                    JVMParameter(
                        name="-XX:+UseParallelGC",
                        value="",
                        description="Use the parallel garbage collector",
                        category="gc",
                        impact="Suited to CPU-bound, throughput-oriented applications"
                    ),
                    JVMParameter(
                        name="-XX:ParallelGCThreads",
                        value="8",
                        description="Parallel GC thread count",
                        category="gc",
                        impact="Typically set to the number of CPU cores"
                    ),
                    JVMParameter(
                        name="-XX:+UseAdaptiveSizePolicy",
                        value="",
                        description="Enable the adaptive size policy",
                        category="gc",
                        impact="Automatically resizes the heap generations"
                    )
                ]
            )
        ]
        
        # 低延迟场景配置
        configurations[JVMScenario.LOW_LATENCY] = [
            JVMConfiguration(
                scenario=JVMScenario.LOW_LATENCY,
                gc_type=GCType.G1GC,
                heap_size="4g",
                description="适用于低延迟场景的G1GC配置",
                requirements=[
                    "对延迟要求极高(<10ms)",
                    "内存 >= 8GB",
                    "实时处理场景"
                ],
                parameters=[
                    JVMParameter(
                        name="-Xms",
                        value="4g",
                        description="初始堆大小",
                        category="memory"
                    ),
                    JVMParameter(
                        name="-Xmx",
                        value="4g",
                        description="最大堆大小",
                        category="memory"
                    ),
                    JVMParameter(
                        name="-XX:+UseG1GC",
                        value="",
                        description="使用G1垃圾收集器",
                        category="gc"
                    ),
                    JVMParameter(
                        name="-XX:MaxGCPauseMillis",
                        value="10",
                        description="最大GC暂停时间",
                        category="gc",
                        impact="极低的暂停时间目标"
                    ),
                    JVMParameter(
                        name="-XX:G1HeapRegionSize",
                        value="8m",
                        description="G1堆区域大小",
                        category="gc",
                        impact="较小的区域大小减少暂停时间"
                    ),
                    JVMParameter(
                        name="-XX:+G1UseAdaptiveIHOP",
                        value="",
                        description="自适应IHOP",
                        category="gc",
                        impact="动态调整并发标记阈值"
                    ),
                    JVMParameter(
                        name="-XX:G1MixedGCCountTarget",
                        value="8",
                        description="混合GC目标次数",
                        category="gc",
                        impact="减少单次GC的工作量"
                    )
                ]
            ),
            JVMConfiguration(
                scenario=JVMScenario.LOW_LATENCY,
                gc_type=GCType.ZGC,
                heap_size="4g",
                description="适用于低延迟场景的ZGC配置(JDK 11+)",
                requirements=[
                    "JDK 11+",
                    "对延迟要求极高(<1ms)",
                    "内存 >= 8GB",
                    "可以接受额外的CPU开销"
                ],
                parameters=[
                    JVMParameter(
                        name="-Xms",
                        value="4g",
                        description="初始堆大小",
                        category="memory"
                    ),
                    JVMParameter(
                        name="-Xmx",
                        value="4g",
                        description="最大堆大小",
                        category="memory"
                    ),
                    JVMParameter(
                        name="-XX:+UnlockExperimentalVMOptions",
                        value="",
                        description="解锁实验性选项",
                        category="experimental"
                    ),
                    JVMParameter(
                        name="-XX:+UseZGC",
                        value="",
                        description="使用ZGC收集器",
                        category="gc",
                        impact="超低延迟垃圾收集器"
                    ),
                    JVMParameter(
                        name="-XX:+UseLargePages",
                        value="",
                        description="使用大页内存",
                        category="memory",
                        impact="提高内存访问性能"
                    )
                ]
            )
        ]
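        # 选型提示(经验性参考,非硬性规则): JDK 11+ 且暂停目标在毫秒级以下时可优先评估ZGC;
        # 否则G1配合较小Region与较低的MaxGCPauseMillis目标即可满足多数低延迟需求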
        
        # 大堆内存场景配置
        configurations[JVMScenario.LARGE_HEAP] = [
            JVMConfiguration(
                scenario=JVMScenario.LARGE_HEAP,
                gc_type=GCType.G1GC,
                heap_size="32g",
                description="适用于大堆内存场景的G1GC配置",
                requirements=[
                    "内存 >= 64GB",
                    "需要大量内存缓存",
                    "长时间运行的应用"
                ],
                parameters=[
                    JVMParameter(
                        name="-Xms",
                        value="32g",
                        description="初始堆大小",
                        category="memory"
                    ),
                    JVMParameter(
                        name="-Xmx",
                        value="32g",
                        description="最大堆大小",
                        category="memory"
                    ),
                    JVMParameter(
                        name="-XX:+UseG1GC",
                        value="",
                        description="使用G1垃圾收集器",
                        category="gc"
                    ),
                    JVMParameter(
                        name="-XX:MaxGCPauseMillis",
                        value="100",
                        description="最大GC暂停时间",
                        category="gc"
                    ),
                    JVMParameter(
                        name="-XX:G1HeapRegionSize",
                        value="32m",
                        description="G1堆区域大小",
                        category="gc",
                        impact="大堆需要更大的区域"
                    ),
                    JVMParameter(
                        name="-XX:+UseLargePages",
                        value="",
                        description="使用大页内存",
                        category="memory",
                        impact="大堆内存必须启用"
                    ),
                    JVMParameter(
                        name="-XX:LargePageSizeInBytes",
                        value="2m",
                        description="大页大小",
                        category="memory"
                    )
                ]
            )
        ]
        
        # 内存受限场景配置
        configurations[JVMScenario.MEMORY_LIMITED] = [
            JVMConfiguration(
                scenario=JVMScenario.MEMORY_LIMITED,
                gc_type=GCType.G1GC,
                heap_size="2g",
                description="适用于内存受限场景的配置",
                requirements=[
                    "内存 < 8GB",
                    "资源受限环境",
                    "容器化部署"
                ],
                parameters=[
                    JVMParameter(
                        name="-Xms",
                        value="2g",
                        description="初始堆大小",
                        category="memory"
                    ),
                    JVMParameter(
                        name="-Xmx",
                        value="2g",
                        description="最大堆大小",
                        category="memory"
                    ),
                    JVMParameter(
                        name="-XX:+UseG1GC",
                        value="",
                        description="使用G1垃圾收集器",
                        category="gc"
                    ),
                    JVMParameter(
                        name="-XX:MaxGCPauseMillis",
                        value="50",
                        description="最大GC暂停时间",
                        category="gc"
                    ),
                    JVMParameter(
                        name="-XX:+UseStringDeduplication",
                        value="",
                        description="启用字符串去重",
                        category="memory",
                        impact="节省内存空间"
                    ),
                    JVMParameter(
                        name="-XX:+UseCompressedOops",
                        value="",
                        description="压缩对象指针",
                        category="memory",
                        impact="减少内存占用"
                    ),
                    JVMParameter(
                        name="-XX:+UseCompressedClassPointers",
                        value="",
                        description="压缩类指针",
                        category="memory",
                        impact="进一步减少内存占用"
                    )
                ]
            )
        ]
        
        return configurations
    
    def recommend_configuration(self, scenario: JVMScenario, 
                              memory_gb: Optional[int] = None) -> JVMConfiguration:
        """推荐JVM配置"""
        configs = self.configurations.get(scenario, [])
        
        if not configs:
            raise ValueError(f"不支持的场景: {scenario}")
        
        # 如果指定了内存大小,调整堆大小
        if memory_gb:
            recommended_heap = max(1, min(int(memory_gb * 0.7), memory_gb - 2))  # 预留至少2GB给系统,且堆不小于1GB
            
            # 选择最适合的配置
            best_config = configs[0]
            for config in configs:
                current_heap = int(config.heap_size.rstrip('g'))
                if abs(current_heap - recommended_heap) < abs(int(best_config.heap_size.rstrip('g')) - recommended_heap):
                    best_config = config
            
            # 调整堆大小参数
            adjusted_config = JVMConfiguration(
                scenario=best_config.scenario,
                gc_type=best_config.gc_type,
                heap_size=f"{int(recommended_heap)}g",
                description=best_config.description,
                requirements=best_config.requirements,
                parameters=[]
            )
            
            for param in best_config.parameters:
                if param.name in ["-Xms", "-Xmx"]:
                    adjusted_param = JVMParameter(
                        name=param.name,
                        value=f"{int(recommended_heap)}g",
                        description=param.description,
                        category=param.category,
                        impact=param.impact,
                        conditions=param.conditions
                    )
                    adjusted_config.parameters.append(adjusted_param)
                else:
                    adjusted_config.parameters.append(param)
            
            return adjusted_config
        
        return configs[0]
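    # 堆大小推算示例(示意): memory_gb=16 时,
    # recommended_heap = max(1, min(int(16 * 0.7), 16 - 2)) = min(11, 14) = 11,
    # 即推荐11g堆,其余约5GB留给操作系统、页缓存与堆外内存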
    
    def generate_jvm_options(self, configuration: JVMConfiguration, 
                           additional_options: Optional[List[str]] = None) -> str:
        """生成JVM启动参数"""
        options = []
        
        # 添加配置参数: -Xms/-Xmx等标准参数直接拼接取值,-XX:系列参数用"="连接
        for param in configuration.parameters:
            if param.value:
                if param.name.startswith("-XX:"):
                    options.append(f"{param.name}={param.value}")
                else:
                    options.append(f"{param.name}{param.value}")  # 如 -Xms4g
            else:
                options.append(param.name)
        
        # 添加通用参数(注意: 以下GC日志参数为JDK 8风格,JDK 9+已移除,应改用统一日志,如 -Xlog:gc*:file=./gc.log:time)
        common_options = [
            "-server",
            "-Djava.awt.headless=true",
            "-Dfile.encoding=UTF-8",
            "-Duser.timezone=Asia/Shanghai",
            "-XX:+HeapDumpOnOutOfMemoryError",
            "-XX:HeapDumpPath=./heapdump.hprof",
            "-XX:+PrintGCDetails",
            "-XX:+PrintGCTimeStamps",
            "-XX:+PrintGCApplicationStoppedTime",
            "-Xloggc:./gc.log"
        ]
        
        options.extend(common_options)
        
        # 添加额外选项
        if additional_options:
            options.extend(additional_options)
        
        return " ".join(options)
    
    def analyze_gc_log(self, gc_log_path: str) -> Dict[str, Any]:
        """分析GC日志"""
        analysis = {
            "total_gc_time": 0,
            "gc_count": 0,
            "avg_gc_time": 0,
            "max_gc_time": 0,
            "recommendations": []
        }
        
        import re  # 移到循环外,避免逐行重复导入
        
        try:
            with open(gc_log_path, 'r', encoding='utf-8', errors='ignore') as f:
                gc_times = []
                for line in f:
                    # 简单的GC时间解析(实际实现需要更复杂的解析逻辑)
                    if "GC" in line and "ms" in line:
                        time_match = re.search(r'(\d+\.\d+)ms', line)
                        if time_match:
                            gc_times.append(float(time_match.group(1)))
                
                if gc_times:
                    analysis["total_gc_time"] = sum(gc_times)
                    analysis["gc_count"] = len(gc_times)
                    analysis["avg_gc_time"] = sum(gc_times) / len(gc_times)
                    analysis["max_gc_time"] = max(gc_times)
                    
                    # 生成建议
                    if analysis["avg_gc_time"] > 100:
                        analysis["recommendations"].append("平均GC时间过长,考虑调整堆大小或GC策略")
                    
                    if analysis["max_gc_time"] > 1000:
                        analysis["recommendations"].append("最大GC时间过长,可能需要优化应用或调整JVM参数")
                    
                    if analysis["gc_count"] > 1000:
                        analysis["recommendations"].append("GC频率过高,考虑增加堆大小")
        
        except Exception as e:
            analysis["error"] = f"GC日志分析失败: {e}"
        
        return analysis
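    # 提示: JDK 9+ 统一日志(-Xlog:gc*)中的暂停行形如
    # "[1.234s][info][gc] GC(5) Pause Young (Normal) (G1 Evacuation Pause) 24M->4M(256M) 1.234ms",
    # 行尾同样以"<耗时>ms"结束,上述正则可直接复用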
    
    def print_configuration_guide(self, scenario: Optional[JVMScenario] = None):
        """打印JVM配置指南"""
        print("=== RocketMQ JVM优化指南 ===")
        
        if scenario:
            configs = self.configurations.get(scenario, [])
            print(f"\n🎯 {scenario.value.upper()} 场景配置:")
        else:
            configs = []
            for scenario_configs in self.configurations.values():
                configs.extend(scenario_configs)
            print("\n🎯 所有场景配置:")
        
        for config in configs:
            print(f"\n📋 {config.description}")
            print(f"场景: {config.scenario.value}")
            print(f"GC类型: {config.gc_type.value}")
            print(f"堆大小: {config.heap_size}")
            
            print(f"\n要求:")
            for req in config.requirements:
                print(f"  • {req}")
            
            print(f"\n参数:")
            for param in config.parameters:
                print(f"  • {param.name}={param.value} - {param.description}")
                if param.impact:
                    print(f"    影响: {param.impact}")

# 使用示例
if __name__ == "__main__":
    # 创建JVM优化器
    optimizer = JVMOptimizer()
    
    # 打印配置指南
    optimizer.print_configuration_guide(JVMScenario.LOW_LATENCY)
    
    # 推荐配置
    config = optimizer.recommend_configuration(JVMScenario.HIGH_THROUGHPUT, memory_gb=16)
    print(f"\n推荐配置: {config.description}")
    
    # 生成JVM选项
    jvm_options = optimizer.generate_jvm_options(config)
    print(f"\nJVM启动参数:\n{jvm_options}")
    
    # 分析GC日志
    # gc_analysis = optimizer.analyze_gc_log("./gc.log")
    # print(f"\nGC分析结果: {gc_analysis}")

8.2.3 生产者和消费者参数调优

from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from enum import Enum

class ClientType(Enum):
    """客户端类型"""
    PRODUCER = "producer"
    CONSUMER = "consumer"

class OptimizationGoal(Enum):
    """优化目标"""
    THROUGHPUT = "throughput"      # 吞吐量
    LATENCY = "latency"            # 延迟
    RELIABILITY = "reliability"    # 可靠性
    RESOURCE = "resource"          # 资源使用

@dataclass
class ClientParameter:
    """客户端参数"""
    name: str
    client_type: ClientType
    default_value: Any
    recommended_value: Any
    description: str
    impact: str
    tuning_tips: Optional[List[str]] = None
    optimization_goals: Optional[List[OptimizationGoal]] = None

@dataclass
class ClientConfiguration:
    """客户端配置"""
    name: str
    client_type: ClientType
    optimization_goal: OptimizationGoal
    description: str
    parameters: Dict[str, Any]
    expected_performance: Dict[str, str]
    use_cases: List[str]

class ClientParameterOptimizer:
    """客户端参数优化器"""
    
    def __init__(self):
        self.parameters = self._create_parameter_definitions()
        self.configurations = self._create_client_configurations()
    
    def _create_parameter_definitions(self) -> Dict[str, ClientParameter]:
        """创建参数定义"""
        parameters = {}
        
        # 生产者参数
        producer_params = [
            ClientParameter(
                name="sendMsgTimeout",
                client_type=ClientType.PRODUCER,
                default_value=3000,
                recommended_value=5000,
                description="发送消息超时时间(ms)",
                impact="影响发送消息的超时控制",
                tuning_tips=[
                    "网络不稳定时适当增大",
                    "低延迟场景可以减小",
                    "建议设置为网络RTT的10-20倍"
                ],
                optimization_goals=[OptimizationGoal.RELIABILITY, OptimizationGoal.LATENCY]
            ),
            ClientParameter(
                name="retryTimesWhenSendFailed",
                client_type=ClientType.PRODUCER,
                default_value=2,
                recommended_value=3,
                description="发送失败重试次数",
                impact="影响消息发送的可靠性",
                tuning_tips=[
                    "高可靠性场景增加重试次数",
                    "低延迟场景减少重试次数",
                    "配合重试间隔使用"
                ],
                optimization_goals=[OptimizationGoal.RELIABILITY]
            ),
            ClientParameter(
                name="retryTimesWhenSendAsyncFailed",
                client_type=ClientType.PRODUCER,
                default_value=2,
                recommended_value=3,
                description="异步发送失败重试次数",
                impact="影响异步发送的可靠性",
                tuning_tips=[
                    "异步发送场景的重试控制",
                    "避免过多重试影响性能"
                ],
                optimization_goals=[OptimizationGoal.RELIABILITY, OptimizationGoal.THROUGHPUT]
            ),
            ClientParameter(
                name="maxMessageSize",
                client_type=ClientType.PRODUCER,
                default_value=4194304,  # 4MB
                recommended_value=4194304,
                description="最大消息大小(bytes)",
                impact="限制单个消息的大小",
                tuning_tips=[
                    "大消息场景可以适当增大",
                    "需要与Broker配置保持一致",
                    "过大会影响网络传输性能"
                ],
                optimization_goals=[OptimizationGoal.THROUGHPUT]
            ),
            ClientParameter(
                name="compressMsgBodyOverHowmuch",
                client_type=ClientType.PRODUCER,
                default_value=4096,
                recommended_value=4096,
                description="消息体压缩阈值(bytes)",
                impact="超过此大小的消息会被压缩",
                tuning_tips=[
                    "大消息场景建议启用压缩",
                    "CPU资源充足时可以降低阈值",
                    "网络带宽有限时建议压缩"
                ],
                optimization_goals=[OptimizationGoal.THROUGHPUT, OptimizationGoal.RESOURCE]
            ),
            ClientParameter(
                name="defaultTopicQueueNums",
                client_type=ClientType.PRODUCER,
                default_value=4,
                recommended_value=8,
                description="默认Topic队列数量",
                impact="影响消息分发的并行度",
                tuning_tips=[
                    "高并发场景增加队列数量",
                    "队列数量应该是消费者数量的倍数",
                    "过多队列会增加管理开销"
                ],
                optimization_goals=[OptimizationGoal.THROUGHPUT]
            )
        ]
        
        # 消费者参数
        consumer_params = [
            ClientParameter(
                name="consumeMessageBatchMaxSize",
                client_type=ClientType.CONSUMER,
                default_value=1,
                recommended_value=32,
                description="批量消费最大消息数",
                impact="影响批量消费的效率",
                tuning_tips=[
                    "高吞吐场景增大批量大小",
                    "低延迟场景减小批量大小",
                    "需要考虑消息处理时间"
                ],
                optimization_goals=[OptimizationGoal.THROUGHPUT]
            ),
            ClientParameter(
                name="pullBatchSize",
                client_type=ClientType.CONSUMER,
                default_value=32,
                recommended_value=64,
                description="拉取消息批量大小",
                impact="影响消息拉取的效率",
                tuning_tips=[
                    "网络带宽充足时可以增大",
                    "小消息场景建议增大",
                    "大消息场景适当减小"
                ],
                optimization_goals=[OptimizationGoal.THROUGHPUT]
            ),
            ClientParameter(
                name="pullInterval",
                client_type=ClientType.CONSUMER,
                default_value=0,
                recommended_value=0,
                description="拉取消息间隔(ms)",
                impact="控制消息拉取频率",
                tuning_tips=[
                    "0表示立即拉取",
                    "资源受限时可以设置间隔",
                    "实时性要求高时设置为0"
                ],
                optimization_goals=[OptimizationGoal.LATENCY, OptimizationGoal.RESOURCE]
            ),
            ClientParameter(
                name="consumeThreadMin",
                client_type=ClientType.CONSUMER,
                default_value=20,
                recommended_value=32,
                description="消费线程池最小线程数",
                impact="影响消费的并发能力",
                tuning_tips=[
                    "根据消息处理复杂度调整",
                    "IO密集型任务增加线程数",
                    "CPU密集型任务控制线程数"
                ],
                optimization_goals=[OptimizationGoal.THROUGHPUT]
            ),
            ClientParameter(
                name="consumeThreadMax",
                client_type=ClientType.CONSUMER,
                default_value=64,
                recommended_value=128,
                description="消费线程池最大线程数",
                impact="限制消费线程的最大数量",
                tuning_tips=[
                    "高并发场景可以增大",
                    "避免过多线程导致上下文切换",
                    "建议为CPU核数的2-4倍"
                ],
                optimization_goals=[OptimizationGoal.THROUGHPUT]
            ),
            ClientParameter(
                name="consumeTimeout",
                client_type=ClientType.CONSUMER,
                default_value=15,
                recommended_value=30,
                description="消费超时时间(分钟)",
                impact="消费消息的超时控制",
                tuning_tips=[
                    "复杂业务逻辑增加超时时间",
                    "避免设置过短导致误判",
                    "配合业务处理时间设置"
                ],
                optimization_goals=[OptimizationGoal.RELIABILITY]
            ),
            ClientParameter(
                name="maxReconsumeTimes",
                client_type=ClientType.CONSUMER,
                default_value=16,
                recommended_value=16,
                description="最大重新消费次数",
                impact="控制消息重试次数",
                tuning_tips=[
                    "业务容错性强时可以增加",
                    "避免无限重试",
                    "配合死信队列使用"
                ],
                optimization_goals=[OptimizationGoal.RELIABILITY]
            ),
            ClientParameter(
                name="suspendTimeMillis",
                client_type=ClientType.CONSUMER,
                default_value=1000,
                recommended_value=1000,
                description="消费失败暂停时间(ms)",
                impact="消费失败后的暂停时间",
                tuning_tips=[
                    "临时故障场景适当增加暂停时间",
                    "快速恢复场景减少暂停时间"
                ],
                optimization_goals=[OptimizationGoal.RELIABILITY]
            )
        ]
        
        # 合并参数
        all_params = producer_params + consumer_params
        for param in all_params:
            parameters[param.name] = param
        
        return parameters
    
    def _create_client_configurations(self) -> Dict[str, List[ClientConfiguration]]:
        """创建客户端配置"""
        configurations = {}
        
        # 生产者配置
        configurations["producer"] = [
            ClientConfiguration(
                name="高吞吐生产者",
                client_type=ClientType.PRODUCER,
                optimization_goal=OptimizationGoal.THROUGHPUT,
                description="适用于高吞吐量消息发送场景",
                parameters={
                    "sendMsgTimeout": 10000,
                    "retryTimesWhenSendFailed": 2,
                    "retryTimesWhenSendAsyncFailed": 2,
                    "maxMessageSize": 4194304,
                    "compressMsgBodyOverHowmuch": 2048,
                    "defaultTopicQueueNums": 16
                },
                expected_performance={
                    "throughput": "10万+ TPS",
                    "latency": "10-50ms",
                    "reliability": "高"
                },
                use_cases=[
                    "日志收集",
                    "数据同步",
                    "批量处理",
                    "大数据场景"
                ]
            ),
            ClientConfiguration(
                name="低延迟生产者",
                client_type=ClientType.PRODUCER,
                optimization_goal=OptimizationGoal.LATENCY,
                description="适用于低延迟消息发送场景",
                parameters={
                    "sendMsgTimeout": 3000,
                    "retryTimesWhenSendFailed": 1,
                    "retryTimesWhenSendAsyncFailed": 1,
                    "maxMessageSize": 1048576,  # 1MB
                    "compressMsgBodyOverHowmuch": 8192,
                    "defaultTopicQueueNums": 8
                },
                expected_performance={
                    "throughput": "5万+ TPS",
                    "latency": "1-10ms",
                    "reliability": "中等"
                },
                use_cases=[
                    "实时通知",
                    "在线交易",
                    "实时监控",
                    "游戏消息"
                ]
            ),
            ClientConfiguration(
                name="高可靠生产者",
                client_type=ClientType.PRODUCER,
                optimization_goal=OptimizationGoal.RELIABILITY,
                description="适用于高可靠性要求的场景",
                parameters={
                    "sendMsgTimeout": 15000,
                    "retryTimesWhenSendFailed": 5,
                    "retryTimesWhenSendAsyncFailed": 5,
                    "maxMessageSize": 4194304,
                    "compressMsgBodyOverHowmuch": 4096,
                    "defaultTopicQueueNums": 4
                },
                expected_performance={
                    "throughput": "2万+ TPS",
                    "latency": "50-200ms",
                    "reliability": "极高"
                },
                use_cases=[
                    "金融交易",
                    "支付系统",
                    "订单处理",
                    "关键业务"
                ]
            )
        ]
        
        # 消费者配置
        configurations["consumer"] = [
            ClientConfiguration(
                name="高吞吐消费者",
                client_type=ClientType.CONSUMER,
                optimization_goal=OptimizationGoal.THROUGHPUT,
                description="适用于高吞吐量消息消费场景",
                parameters={
                    "consumeMessageBatchMaxSize": 64,
                    "pullBatchSize": 128,
                    "pullInterval": 0,
                    "consumeThreadMin": 64,
                    "consumeThreadMax": 256,
                    "consumeTimeout": 60,
                    "maxReconsumeTimes": 16,
                    "suspendTimeMillis": 1000
                },
                expected_performance={
                    "throughput": "10万+ TPS",
                    "latency": "10-100ms",
                    "resource_usage": "高"
                },
                use_cases=[
                    "数据处理",
                    "日志分析",
                    "批量计算",
                    "ETL任务"
                ]
            ),
            ClientConfiguration(
                name="低延迟消费者",
                client_type=ClientType.CONSUMER,
                optimization_goal=OptimizationGoal.LATENCY,
                description="适用于低延迟消息消费场景",
                parameters={
                    "consumeMessageBatchMaxSize": 1,
                    "pullBatchSize": 16,
                    "pullInterval": 0,
                    "consumeThreadMin": 32,
                    "consumeThreadMax": 64,
                    "consumeTimeout": 15,
                    "maxReconsumeTimes": 8,
                    "suspendTimeMillis": 500
                },
                expected_performance={
                    "throughput": "5万+ TPS",
                    "latency": "1-10ms",
                    "resource_usage": "中等"
                },
                use_cases=[
                    "实时推送",
                    "在线服务",
                    "实时计算",
                    "监控告警"
                ]
            ),
            ClientConfiguration(
                name="资源受限消费者",
                client_type=ClientType.CONSUMER,
                optimization_goal=OptimizationGoal.RESOURCE,
                description="适用于资源受限环境的消费场景",
                parameters={
                    "consumeMessageBatchMaxSize": 16,
                    "pullBatchSize": 32,
                    "pullInterval": 100,
                    "consumeThreadMin": 8,
                    "consumeThreadMax": 16,
                    "consumeTimeout": 30,
                    "maxReconsumeTimes": 16,
                    "suspendTimeMillis": 2000
                },
                expected_performance={
                    "throughput": "1万+ TPS",
                    "latency": "100-500ms",
                    "resource_usage": "低"
                },
                use_cases=[
                    "边缘计算",
                    "容器化部署",
                    "开发测试",
                    "小规模应用"
                ]
            )
        ]
        
        return configurations
    
    def get_parameter_recommendations(self, client_type: ClientType, 
                                    optimization_goal: OptimizationGoal) -> Dict[str, Any]:
        """获取参数推荐"""
        # 获取相关参数
        relevant_params = {
            name: param for name, param in self.parameters.items()
            if param.client_type == client_type and 
               (not param.optimization_goals or optimization_goal in param.optimization_goals)
        }
        
        # 返回推荐值
        return {name: param.recommended_value for name, param in relevant_params.items()}
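    # 调用示例(示意): get_parameter_recommendations(ClientType.PRODUCER, OptimizationGoal.RELIABILITY)
    # 返回 {"sendMsgTimeout": 5000, "retryTimesWhenSendFailed": 3, "retryTimesWhenSendAsyncFailed": 3}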
    
    def get_configuration(self, client_type: ClientType, 
                         optimization_goal: OptimizationGoal) -> Optional[ClientConfiguration]:
        """获取客户端配置"""
        client_type_str = client_type.value
        configs = self.configurations.get(client_type_str, [])
        
        for config in configs:
            if config.optimization_goal == optimization_goal:
                return config
        
        return None
    
    def generate_client_config(self, client_type: ClientType, 
                             optimization_goal: OptimizationGoal,
                             custom_overrides: Optional[Dict[str, Any]] = None) -> str:
        """生成客户端配置代码"""
        config = self.get_configuration(client_type, optimization_goal)
        if not config:
            return "未找到匹配的配置"
        
        parameters = config.parameters.copy()
        if custom_overrides:
            parameters.update(custom_overrides)
        
        if client_type == ClientType.PRODUCER:
            return self._generate_producer_config(config, parameters)
        else:
            return self._generate_consumer_config(config, parameters)
    
    def _generate_producer_config(self, config: ClientConfiguration, 
                                parameters: Dict[str, Any]) -> str:
        """生成生产者配置代码"""
        code_lines = [
            f"// {config.name} - {config.description}",
            "DefaultMQProducer producer = new DefaultMQProducer();",
            "producer.setProducerGroup(\"your_producer_group\");",
            "producer.setNamesrvAddr(\"localhost:9876\");",
            ""
        ]
        
        # 添加参数配置
        for param_name, value in parameters.items():
            param_def = self.parameters.get(param_name)
            if param_def:
                code_lines.append(f"// {param_def.description}")
                method_name = self._get_setter_method_name(param_name)
                code_lines.append(f"producer.{method_name}({value});")
                code_lines.append("")
        
        code_lines.extend([
            "try {",
            "    producer.start();",
            "    System.out.println(\"Producer started successfully\");",
            "} catch (MQClientException e) {",
            "    e.printStackTrace();",
            "}"
        ])
        
        return "\n".join(code_lines)
    
    def _generate_consumer_config(self, config: ClientConfiguration, 
                                parameters: Dict[str, Any]) -> str:
        """生成消费者配置代码"""
        code_lines = [
            f"// {config.name} - {config.description}",
            "DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();",
            "consumer.setConsumerGroup(\"your_consumer_group\");",
            "consumer.setNamesrvAddr(\"localhost:9876\");",
            "consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);",
            ""
        ]
        
        # 添加参数配置
        for param_name, value in parameters.items():
            param_def = self.parameters.get(param_name)
            if param_def:
                code_lines.append(f"// {param_def.description}")
                method_name = self._get_setter_method_name(param_name)
                code_lines.append(f"consumer.{method_name}({value});")
                code_lines.append("")
        
        code_lines.extend([
            "// 订阅主题",
            "consumer.subscribe(\"your_topic\", \"*\");",
            "",
            "// 注册消息监听器",
            "consumer.registerMessageListener(new MessageListenerConcurrently() {",
            "    @Override",
            "    public ConsumeConcurrentlyStatus consumeMessage(",
            "            List<MessageExt> messages,",
            "            ConsumeConcurrentlyContext context) {",
            "        // 处理消息逻辑",
            "        for (MessageExt message : messages) {",
            "            System.out.println(\"Received message: \" + new String(message.getBody()));",
            "        }",
            "        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;",
            "    }",
            "});",
            "",
            "try {",
            "    consumer.start();",
            "    System.out.println(\"Consumer started successfully\");",
            "} catch (MQClientException e) {",
            "    e.printStackTrace();",
            "}"
        ])
        
        return "\n".join(code_lines)
    
    def _get_setter_method_name(self, param_name: str) -> str:
        """获取setter方法名"""
        # 简化的方法名映射
        method_mapping = {
            "sendMsgTimeout": "setSendMsgTimeout",
            "retryTimesWhenSendFailed": "setRetryTimesWhenSendFailed",
            "retryTimesWhenSendAsyncFailed": "setRetryTimesWhenSendAsyncFailed",
            "maxMessageSize": "setMaxMessageSize",
            "compressMsgBodyOverHowmuch": "setCompressMsgBodyOverHowmuch",
            "defaultTopicQueueNums": "setDefaultTopicQueueNums",
            "consumeMessageBatchMaxSize": "setConsumeMessageBatchMaxSize",
            "pullBatchSize": "setPullBatchSize",
            "pullInterval": "setPullInterval",
            "consumeThreadMin": "setConsumeThreadMin",
            "consumeThreadMax": "setConsumeThreadMax",
            "consumeTimeout": "setConsumeTimeout",
            "maxReconsumeTimes": "setMaxReconsumeTimes",
            "suspendTimeMillis": "setSuspendTimeMillis"
        }
        
        # 回退: 仅首字母大写(capitalize()会把其余字母转为小写,不适用于驼峰参数名)
        return method_mapping.get(param_name, f"set{param_name[0].upper()}{param_name[1:]}")
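    # 示例: _get_setter_method_name("pullBatchSize") → "setPullBatchSize"(映射命中);
    # 未登记的参数走回退逻辑,如 "myParam"(假设的参数名)→ "setMyParam"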
    
    def print_optimization_guide(self, client_type: Optional[ClientType] = None):
        """打印优化指南"""
        print("=== RocketMQ客户端参数优化指南 ===")
        
        if client_type:
            params = [p for p in self.parameters.values() if p.client_type == client_type]
            print(f"\n📋 {client_type.value.upper()} 参数:")
        else:
            params = list(self.parameters.values())
            print("\n📋 所有客户端参数:")
        
        for param in params:
            print(f"\n🔧 {param.name}")
            print(f"   类型: {param.client_type.value}")
            print(f"   描述: {param.description}")
            print(f"   默认值: {param.default_value}")
            print(f"   推荐值: {param.recommended_value}")
            print(f"   影响: {param.impact}")
            
            if param.optimization_goals:
                goals = [goal.value for goal in param.optimization_goals]
                print(f"   优化目标: {', '.join(goals)}")
            
            if param.tuning_tips:
                print(f"   调优建议:")
                for tip in param.tuning_tips:
                    print(f"     • {tip}")
    
    def print_configuration_guide(self):
        """打印配置指南"""
        print("=== RocketMQ客户端配置指南 ===")
        
        for client_type_str, configs in self.configurations.items():
            print(f"\n🎯 {client_type_str.upper()} 配置:")
            
            for config in configs:
                print(f"\n📋 {config.name}")
                print(f"描述: {config.description}")
                print(f"优化目标: {config.optimization_goal.value}")
                
                print(f"\n预期性能:")
                for metric, value in config.expected_performance.items():
                    print(f"  • {metric}: {value}")
                
                print(f"\n适用场景:")
                for use_case in config.use_cases:
                    print(f"  • {use_case}")
                
                print(f"\n关键参数:")
                for param_name, value in config.parameters.items():
                    param_def = self.parameters.get(param_name)
                    desc = param_def.description if param_def else "未知参数"
                    print(f"  • {param_name} = {value} ({desc})")

# 使用示例
if __name__ == "__main__":
    # 创建客户端参数优化器
    optimizer = ClientParameterOptimizer()
    
    # 打印优化指南
    optimizer.print_optimization_guide(ClientType.PRODUCER)
    
    # 打印配置指南
    optimizer.print_configuration_guide()
    
    # 生成高吞吐生产者配置
    producer_config = optimizer.generate_client_config(
        ClientType.PRODUCER, 
        OptimizationGoal.THROUGHPUT
    )
    print(f"\n=== 高吞吐生产者配置 ===\n{producer_config}")
    
    # 生成低延迟消费者配置
    consumer_config = optimizer.generate_client_config(
        ClientType.CONSUMER, 
        OptimizationGoal.LATENCY
    )
    print(f"\n=== 低延迟消费者配置 ===\n{consumer_config}")

8.3 网络和存储优化

8.3.1 网络优化

from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from enum import Enum
import subprocess
import platform
import socket
import time

class NetworkOptimizationType(Enum):
    """网络优化类型"""
    BANDWIDTH = "bandwidth"        # 带宽优化
    LATENCY = "latency"            # 延迟优化
    THROUGHPUT = "throughput"      # 吞吐量优化
    RELIABILITY = "reliability"    # 可靠性优化

class NetworkProtocol(Enum):
    """网络协议"""
    TCP = "tcp"
    UDP = "udp"

@dataclass
class NetworkParameter:
    """网络参数"""
    name: str
    category: str
    default_value: str
    recommended_value: str
    description: str
    impact: str
    optimization_type: NetworkOptimizationType
    platform: str = "linux"  # linux, windows, all

@dataclass
class NetworkConfiguration:
    """网络配置"""
    name: str
    description: str
    optimization_type: NetworkOptimizationType
    parameters: List[NetworkParameter]
    expected_improvement: str
    use_cases: List[str]

class NetworkOptimizer:
    """网络优化器"""
    
    def __init__(self):
        self.platform = platform.system().lower()
        self.parameters = self._create_network_parameters()
        self.configurations = self._create_network_configurations()
    
    def _create_network_parameters(self) -> List[NetworkParameter]:
        """创建网络参数"""
        parameters = [
            # TCP缓冲区优化
            NetworkParameter(
                name="net.core.rmem_max",
                category="tcp_buffer",
                default_value="212992",
                recommended_value="134217728",  # 128MB
                description="TCP接收缓冲区最大值",
                impact="增大接收缓冲区提高网络吞吐量",
                optimization_type=NetworkOptimizationType.THROUGHPUT,
                platform="linux"
            ),
            NetworkParameter(
                name="net.core.wmem_max",
                category="tcp_buffer",
                default_value="212992",
                recommended_value="134217728",  # 128MB
                description="TCP发送缓冲区最大值",
                impact="增大发送缓冲区提高网络吞吐量",
                optimization_type=NetworkOptimizationType.THROUGHPUT,
                platform="linux"
            ),
            NetworkParameter(
                name="net.ipv4.tcp_rmem",
                category="tcp_buffer",
                default_value="4096 65536 4194304",
                recommended_value="4096 87380 134217728",
                description="TCP接收缓冲区大小(最小 默认 最大)",
                impact="优化TCP接收窗口大小",
                optimization_type=NetworkOptimizationType.THROUGHPUT,
                platform="linux"
            ),
            NetworkParameter(
                name="net.ipv4.tcp_wmem",
                category="tcp_buffer",
                default_value="4096 16384 4194304",
                recommended_value="4096 65536 134217728",
                description="TCP发送缓冲区大小(最小 默认 最大)",
                impact="优化TCP发送窗口大小",
                optimization_type=NetworkOptimizationType.THROUGHPUT,
                platform="linux"
            ),
            
            # TCP连接优化
            NetworkParameter(
                name="net.core.somaxconn",
                category="tcp_connection",
                default_value="128",
                recommended_value="65535",
                description="监听队列最大长度",
                impact="增加并发连接处理能力",
                optimization_type=NetworkOptimizationType.THROUGHPUT,
                platform="linux"
            ),
            NetworkParameter(
                name="net.core.netdev_max_backlog",
                category="tcp_connection",
                default_value="1000",
                recommended_value="30000",
                description="网络设备接收队列长度",
                impact="提高网络包处理能力",
                optimization_type=NetworkOptimizationType.THROUGHPUT,
                platform="linux"
            ),
            NetworkParameter(
                name="net.ipv4.tcp_max_syn_backlog",
                category="tcp_connection",
                default_value="1024",
                recommended_value="65535",
                description="SYN队列最大长度",
                impact="提高新连接建立速度",
                optimization_type=NetworkOptimizationType.THROUGHPUT,
                platform="linux"
            ),
            
            # TCP性能优化
            NetworkParameter(
                name="net.ipv4.tcp_window_scaling",
                category="tcp_performance",
                default_value="1",
                recommended_value="1",
                description="启用TCP窗口缩放",
                impact="支持大于64KB的TCP窗口",
                optimization_type=NetworkOptimizationType.THROUGHPUT,
                platform="linux"
            ),
            NetworkParameter(
                name="net.ipv4.tcp_timestamps",
                category="tcp_performance",
                default_value="1",
                recommended_value="1",
                description="启用TCP时间戳",
                impact="提高TCP性能和安全性",
                optimization_type=NetworkOptimizationType.THROUGHPUT,
                platform="linux"
            ),
            NetworkParameter(
                name="net.ipv4.tcp_sack",
                category="tcp_performance",
                default_value="1",
                recommended_value="1",
                description="启用选择性确认",
                impact="提高丢包恢复效率",
                optimization_type=NetworkOptimizationType.RELIABILITY,
                platform="linux"
            ),
            NetworkParameter(
                name="net.ipv4.tcp_fack",
                category="tcp_performance",
                default_value="1",
                recommended_value="1",
                description="启用前向确认",
                impact="配合SACK提高性能",
                optimization_type=NetworkOptimizationType.RELIABILITY,
                platform="linux"
            ),
            
            # 延迟优化
            NetworkParameter(
                name="net.ipv4.tcp_low_latency",
                category="latency",
                default_value="0",
                recommended_value="1",
                description="启用低延迟模式",
                impact="减少TCP延迟",
                optimization_type=NetworkOptimizationType.LATENCY,
                platform="linux"
            ),
            # 注意: 禁用Nagle算法并无对应的sysctl开关,
            # TCP_NODELAY是套接字级选项,需由应用设置(RocketMQ的Netty通信层默认已开启)
            
            # 连接复用优化
            NetworkParameter(
                name="net.ipv4.tcp_tw_reuse",
                category="connection_reuse",
                default_value="0",
                recommended_value="1",
                description="启用TIME_WAIT套接字重用",
                impact="提高连接复用效率",
                optimization_type=NetworkOptimizationType.THROUGHPUT,
                platform="linux"
            ),
            NetworkParameter(
                name="net.ipv4.tcp_fin_timeout",
                category="connection_reuse",
                default_value="60",
                recommended_value="30",
                description="FIN_WAIT_2状态超时时间",
                impact="加快连接回收",
                optimization_type=NetworkOptimizationType.THROUGHPUT,
                platform="linux"
            ),
            NetworkParameter(
                name="net.ipv4.tcp_keepalive_time",
                category="connection_reuse",
                default_value="7200",
                recommended_value="600",
                description="TCP保活时间(秒)",
                impact="更快检测死连接",
                optimization_type=NetworkOptimizationType.RELIABILITY,
                platform="linux"
            ),
            NetworkParameter(
                name="net.ipv4.tcp_keepalive_probes",
                category="connection_reuse",
                default_value="9",
                recommended_value="3",
                description="TCP保活探测次数",
                impact="减少保活检测时间",
                optimization_type=NetworkOptimizationType.RELIABILITY,
                platform="linux"
            ),
            NetworkParameter(
                name="net.ipv4.tcp_keepalive_intvl",
                category="connection_reuse",
                default_value="75",
                recommended_value="15",
                description="TCP保活探测间隔(秒)",
                impact="加快死连接检测",
                optimization_type=NetworkOptimizationType.RELIABILITY,
                platform="linux"
            )
        ]
        
        return parameters
    
    def _create_network_configurations(self) -> Dict[NetworkOptimizationType, NetworkConfiguration]:
        """创建网络配置"""
        configurations = {}
        
        # 高吞吐量配置
        throughput_params = [
            param for param in self.parameters 
            if param.optimization_type == NetworkOptimizationType.THROUGHPUT
        ]
        configurations[NetworkOptimizationType.THROUGHPUT] = NetworkConfiguration(
            name="高吞吐量网络配置",
            description="适用于高吞吐量消息传输场景",
            optimization_type=NetworkOptimizationType.THROUGHPUT,
            parameters=throughput_params,
            expected_improvement="提高网络吞吐量50-200%",
            use_cases=[
                "大批量数据传输",
                "高并发消息处理",
                "数据同步场景",
                "批处理任务"
            ]
        )
        
        # 低延迟配置
        latency_params = [
            param for param in self.parameters 
            if param.optimization_type == NetworkOptimizationType.LATENCY
        ]
        configurations[NetworkOptimizationType.LATENCY] = NetworkConfiguration(
            name="低延迟网络配置",
            description="适用于低延迟消息传输场景",
            optimization_type=NetworkOptimizationType.LATENCY,
            parameters=latency_params,
            expected_improvement="减少网络延迟20-50%",
            use_cases=[
                "实时交易系统",
                "在线游戏",
                "实时监控",
                "即时通讯"
            ]
        )
        
        # 可靠性配置
        reliability_params = [
            param for param in self.parameters 
            if param.optimization_type == NetworkOptimizationType.RELIABILITY
        ]
        configurations[NetworkOptimizationType.RELIABILITY] = NetworkConfiguration(
            name="高可靠性网络配置",
            description="适用于高可靠性要求的场景",
            optimization_type=NetworkOptimizationType.RELIABILITY,
            parameters=reliability_params,
            expected_improvement="提高连接稳定性和错误恢复能力",
            use_cases=[
                "金融系统",
                "关键业务",
                "长连接应用",
                "不稳定网络环境"
            ]
        )
        
        return configurations
    
    def get_current_network_config(self) -> Dict[str, str]:
        """获取当前网络配置"""
        current_config = {}
        
        if self.platform == "linux":
            for param in self.parameters:
                if param.platform in ["linux", "all"]:
                    try:
                        # 读取sysctl参数
                        result = subprocess.run(
                            ["sysctl", "-n", param.name],
                            capture_output=True,
                            text=True,
                            timeout=5
                        )
                        if result.returncode == 0:
                            current_config[param.name] = result.stdout.strip()
                    except Exception:
                        current_config[param.name] = "未知"
        
        return current_config
    
    def apply_network_optimization(self, optimization_type: NetworkOptimizationType,
                                 dry_run: bool = True) -> Dict[str, Any]:
        """应用网络优化"""
        config = self.configurations.get(optimization_type)
        if not config:
            return {"error": f"不支持的优化类型: {optimization_type}"}
        
        results = {
            "optimization_type": optimization_type.value,
            "applied_parameters": [],
            "failed_parameters": [],
            "dry_run": dry_run
        }
        
        for param in config.parameters:
            if param.platform not in [self.platform, "all"]:
                continue
            
            try:
                if dry_run:
                    # 仅显示将要执行的命令
                    command = f"sysctl -w {param.name}={param.recommended_value}"
                    results["applied_parameters"].append({
                        "name": param.name,
                        "command": command,
                        "description": param.description
                    })
                else:
                    # 实际执行优化
                    if self.platform == "linux":
                        result = subprocess.run(
                            ["sysctl", "-w", f"{param.name}={param.recommended_value}"],
                            capture_output=True,
                            text=True,
                            timeout=5
                        )
                        if result.returncode == 0:
                            results["applied_parameters"].append({
                                "name": param.name,
                                "value": param.recommended_value,
                                "description": param.description
                            })
                        else:
                            results["failed_parameters"].append({
                                "name": param.name,
                                "error": result.stderr
                            })
            except Exception as e:
                results["failed_parameters"].append({
                    "name": param.name,
                    "error": str(e)
                })
        
        return results
    
    def generate_sysctl_config(self, optimization_type: NetworkOptimizationType) -> str:
        """生成sysctl配置文件"""
        config = self.configurations.get(optimization_type)
        if not config:
            return f"# 不支持的优化类型: {optimization_type}"
        
        lines = [
            f"# {config.name}",
            f"# {config.description}",
            f"# 预期改进: {config.expected_improvement}",
            ""
        ]
        
        # 按类别分组参数
        categories = {}
        for param in config.parameters:
            if param.platform in [self.platform, "all"]:
                if param.category not in categories:
                    categories[param.category] = []
                categories[param.category].append(param)
        
        for category, params in categories.items():
            lines.append(f"# {category.upper()} 优化")
            for param in params:
                lines.append(f"# {param.description} - {param.impact}")
                lines.append(f"{param.name} = {param.recommended_value}")
                lines.append("")
        
        return "\n".join(lines)
    
    def test_network_performance(self, target_host: str = "localhost", 
                               target_port: int = 9876) -> Dict[str, Any]:
        """测试网络性能"""
        results = {
            "target": f"{target_host}:{target_port}",
            "tests": {}
        }
        
        # TCP连接测试
        try:
            start_time = time.time()
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(5)
            result = sock.connect_ex((target_host, target_port))
            connect_time = (time.time() - start_time) * 1000
            sock.close()
            
            results["tests"]["tcp_connect"] = {
                "success": result == 0,
                "connect_time_ms": round(connect_time, 2),
                "status": "成功" if result == 0 else f"失败 (错误码: {result})"
            }
        except Exception as e:
            results["tests"]["tcp_connect"] = {
                "success": False,
                "error": str(e)
            }
        
        # 带宽测试(简化版)
        try:
            # 这里可以实现更复杂的带宽测试
            results["tests"]["bandwidth"] = {
                "note": "需要专门的带宽测试工具如iperf3"
            }
        except Exception as e:
            results["tests"]["bandwidth"] = {
                "error": str(e)
            }
        
        return results
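    # 提示: 真实的带宽/抖动压测建议配合iperf3等专用工具,
    # 例如服务端执行 "iperf3 -s",客户端执行 "iperf3 -c <broker_host> -t 10"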
    
    def print_optimization_guide(self, optimization_type: Optional[NetworkOptimizationType] = None):
        """打印网络优化指南"""
        print("=== RocketMQ网络优化指南 ===")
        
        if optimization_type:
            config = self.configurations.get(optimization_type)
            if config:
                print(f"\n🎯 {config.name}")
                print(f"描述: {config.description}")
                print(f"预期改进: {config.expected_improvement}")
                
                print(f"\n适用场景:")
                for use_case in config.use_cases:
                    print(f"  • {use_case}")
                
                print(f"\n优化参数:")
                for param in config.parameters:
                    if param.platform in [self.platform, "all"]:
                        print(f"  • {param.name}")
                        print(f"    描述: {param.description}")
                        print(f"    默认值: {param.default_value}")
                        print(f"    推荐值: {param.recommended_value}")
                        print(f"    影响: {param.impact}")
                        print()
        else:
            for opt_type, config in self.configurations.items():
                print(f"\n🎯 {config.name}")
                print(f"类型: {opt_type.value}")
                print(f"描述: {config.description}")
                print(f"预期改进: {config.expected_improvement}")
                print(f"参数数量: {len(config.parameters)}")

# 使用示例
if __name__ == "__main__":
    # 创建网络优化器
    optimizer = NetworkOptimizer()
    
    # 打印优化指南
    optimizer.print_optimization_guide(NetworkOptimizationType.THROUGHPUT)
    
    # 获取当前配置
    current_config = optimizer.get_current_network_config()
    print(f"\n当前网络配置: {current_config}")
    
    # 生成sysctl配置
    sysctl_config = optimizer.generate_sysctl_config(NetworkOptimizationType.THROUGHPUT)
    print(f"\n=== sysctl配置文件 ===\n{sysctl_config}")
    
    # 应用优化(dry run)
    results = optimizer.apply_network_optimization(NetworkOptimizationType.THROUGHPUT, dry_run=True)
    print(f"\n优化结果: {results}")
    
    # 测试网络性能
    # perf_results = optimizer.test_network_performance()
    # print(f"\n网络性能测试: {perf_results}")

8.3.2 存储优化

from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Tuple
from enum import Enum
import os
import subprocess
import shutil
import time
import json

class StorageType(Enum):
    """存储类型"""
    SSD = "ssd"
    HDD = "hdd"
    NVME = "nvme"
    NETWORK = "network"

class FileSystemType(Enum):
    """文件系统类型"""
    EXT4 = "ext4"
    XFS = "xfs"
    BTRFS = "btrfs"
    ZFS = "zfs"
    NTFS = "ntfs"

class OptimizationTarget(Enum):
    """优化目标"""
    THROUGHPUT = "throughput"      # 吞吐量
    LATENCY = "latency"            # 延迟
    DURABILITY = "durability"      # 持久性
    SPACE = "space"                # 空间利用率

@dataclass
class StorageParameter:
    """存储参数"""
    name: str
    category: str
    default_value: str
    recommended_value: str
    description: str
    impact: str
    storage_type: StorageType
    filesystem: Optional[FileSystemType] = None
    optimization_target: Optional[OptimizationTarget] = None

@dataclass
class StorageConfiguration:
    """存储配置"""
    name: str
    description: str
    storage_type: StorageType
    filesystem: FileSystemType
    optimization_target: OptimizationTarget
    mount_options: List[str]
    kernel_parameters: List[StorageParameter]
    rocketmq_parameters: Dict[str, Any]
    expected_improvement: str
    use_cases: List[str]

class StorageOptimizer:
    """存储优化器"""
    
    def __init__(self):
        self.platform = os.name
        self.configurations = self._create_storage_configurations()
    
    def _create_storage_configurations(self) -> Dict[Tuple[StorageType, OptimizationTarget], StorageConfiguration]:
        """创建存储配置"""
        configurations = {}
        
        # SSD + 高吞吐量配置
        configurations[(StorageType.SSD, OptimizationTarget.THROUGHPUT)] = StorageConfiguration(
            name="SSD高吞吐量配置",
            description="适用于SSD存储的高吞吐量优化",
            storage_type=StorageType.SSD,
            filesystem=FileSystemType.XFS,
            optimization_target=OptimizationTarget.THROUGHPUT,
            mount_options=[
                "noatime",          # 不更新访问时间
                "nodiratime",       # 不更新目录访问时间
                "nobarrier",        # 禁用写屏障(SSD不需要)
                "largeio",          # 启用大IO
                "inode64",          # 64位inode
                "swalloc"           # 条带化分配
            ],
            kernel_parameters=[
                StorageParameter(
                    name="vm.dirty_ratio",
                    category="memory",
                    default_value="20",
                    recommended_value="15",
                    description="脏页占总内存的百分比",
                    impact="控制内存中脏页的比例",
                    storage_type=StorageType.SSD,
                    optimization_target=OptimizationTarget.THROUGHPUT
                ),
                StorageParameter(
                    name="vm.dirty_background_ratio",
                    category="memory",
                    default_value="10",
                    recommended_value="5",
                    description="后台写入脏页的阈值",
                    impact="控制后台写入的触发点",
                    storage_type=StorageType.SSD,
                    optimization_target=OptimizationTarget.THROUGHPUT
                ),
                StorageParameter(
                    name="vm.dirty_expire_centisecs",
                    category="memory",
                    default_value="3000",
                    recommended_value="1500",
                    description="脏页过期时间(厘秒)",
                    impact="减少脏页在内存中的停留时间",
                    storage_type=StorageType.SSD,
                    optimization_target=OptimizationTarget.THROUGHPUT
                ),
                StorageParameter(
                    name="vm.dirty_writeback_centisecs",
                    category="memory",
                    default_value="500",
                    recommended_value="100",
                    description="脏页写回间隔(厘秒)",
                    impact="增加写回频率",
                    storage_type=StorageType.SSD,
                    optimization_target=OptimizationTarget.THROUGHPUT
                )
            ],
            rocketmq_parameters={
                "flushDiskType": "ASYNC_FLUSH",
                "flushIntervalCommitLog": 200,
                "flushCommitLogTimed": True,
                "flushIntervalConsumeQueue": 1000,
                "flushConsumeQueueTimed": True,
                "maxTransferBytesOnCommitlog": 262144,
                "maxTransferCountOnCommitlog": 32
            },
            expected_improvement="提高写入吞吐量50-100%",
            use_cases=[
                "高并发写入",
                "大批量数据处理",
                "日志收集系统",
                "数据同步场景"
            ]
        )
        
        # SSD + 低延迟配置
        configurations[(StorageType.SSD, OptimizationTarget.LATENCY)] = StorageConfiguration(
            name="SSD低延迟配置",
            description="适用于SSD存储的低延迟优化",
            storage_type=StorageType.SSD,
            filesystem=FileSystemType.EXT4,
            optimization_target=OptimizationTarget.LATENCY,
            mount_options=[
                "noatime",
                "nodiratime",
                "nobarrier",
                "data=writeback",   # 写回模式,降低延迟
                "commit=1"          # 1秒提交间隔
            ],
            kernel_parameters=[
                StorageParameter(
                    name="vm.dirty_ratio",
                    category="memory",
                    default_value="20",
                    recommended_value="5",
                    description="脏页占总内存的百分比",
                    impact="减少脏页积累,降低延迟",
                    storage_type=StorageType.SSD,
                    optimization_target=OptimizationTarget.LATENCY
                ),
                StorageParameter(
                    name="vm.dirty_background_ratio",
                    category="memory",
                    default_value="10",
                    recommended_value="2",
                    description="后台写入脏页的阈值",
                    impact="更早触发后台写入",
                    storage_type=StorageType.SSD,
                    optimization_target=OptimizationTarget.LATENCY
                ),
                StorageParameter(
                    name="vm.dirty_expire_centisecs",
                    category="memory",
                    default_value="3000",
                    recommended_value="500",
                    description="脏页过期时间(厘秒)",
                    impact="快速写入脏页",
                    storage_type=StorageType.SSD,
                    optimization_target=OptimizationTarget.LATENCY
                )
            ],
            rocketmq_parameters={
                "flushDiskType": "SYNC_FLUSH",
                "flushIntervalCommitLog": 0,
                "flushCommitLogTimed": False,
                "flushIntervalConsumeQueue": 1000,
                "flushConsumeQueueTimed": True,
                "maxTransferBytesOnCommitlog": 65536,
                "maxTransferCountOnCommitlog": 16
            },
            expected_improvement="减少写入延迟50-80%",
            use_cases=[
                "实时交易系统",
                "在线游戏",
                "实时监控",
                "低延迟消息传递"
            ]
        )
        
        # NVMe + 极致性能配置
        configurations[(StorageType.NVME, OptimizationTarget.THROUGHPUT)] = StorageConfiguration(
            name="NVMe极致性能配置",
            description="适用于NVMe存储的极致性能优化",
            storage_type=StorageType.NVME,
            filesystem=FileSystemType.XFS,
            optimization_target=OptimizationTarget.THROUGHPUT,
            mount_options=[
                "noatime",
                "nodiratime",
                "nobarrier",
                "largeio",
                "inode64",
                "swalloc",
                "allocsize=16m"     # 大分配单元
            ],
            kernel_parameters=[
                StorageParameter(
                    name="vm.dirty_ratio",
                    category="memory",
                    default_value="20",
                    recommended_value="40",
                    description="脏页占总内存的百分比",
                    impact="NVMe可以处理更多脏页",
                    storage_type=StorageType.NVME,
                    optimization_target=OptimizationTarget.THROUGHPUT
                ),
                StorageParameter(
                    name="vm.dirty_background_ratio",
                    category="memory",
                    default_value="10",
                    recommended_value="20",
                    description="后台写入脏页的阈值",
                    impact="充分利用NVMe性能",
                    storage_type=StorageType.NVME,
                    optimization_target=OptimizationTarget.THROUGHPUT
                )
            ],
            rocketmq_parameters={
                "flushDiskType": "ASYNC_FLUSH",
                "flushIntervalCommitLog": 500,
                "flushCommitLogTimed": True,
                "flushIntervalConsumeQueue": 1000,
                "flushConsumeQueueTimed": True,
                "maxTransferBytesOnCommitlog": 1048576,  # 1MB
                "maxTransferCountOnCommitlog": 64
            },
            expected_improvement="提高写入吞吐量200-500%",
            use_cases=[
                "超高并发系统",
                "大数据处理",
                "高频交易",
                "实时分析"
            ]
        )
        
        # HDD + 持久性配置
        configurations[(StorageType.HDD, OptimizationTarget.DURABILITY)] = StorageConfiguration(
            name="HDD高持久性配置",
            description="适用于HDD存储的高持久性优化",
            storage_type=StorageType.HDD,
            filesystem=FileSystemType.EXT4,
            optimization_target=OptimizationTarget.DURABILITY,
            mount_options=[
                "noatime",
                "data=ordered",     # 有序写入
                "barrier=1",        # 启用写屏障
                "commit=5"          # 5秒提交间隔
            ],
            kernel_parameters=[
                StorageParameter(
                    name="vm.dirty_ratio",
                    category="memory",
                    default_value="20",
                    recommended_value="10",
                    description="脏页占总内存的百分比",
                    impact="减少数据丢失风险",
                    storage_type=StorageType.HDD,
                    optimization_target=OptimizationTarget.DURABILITY
                ),
                StorageParameter(
                    name="vm.dirty_background_ratio",
                    category="memory",
                    default_value="10",
                    recommended_value="5",
                    description="后台写入脏页的阈值",
                    impact="及时写入数据",
                    storage_type=StorageType.HDD,
                    optimization_target=OptimizationTarget.DURABILITY
                )
            ],
            rocketmq_parameters={
                "flushDiskType": "SYNC_FLUSH",
                "flushIntervalCommitLog": 0,
                "flushCommitLogTimed": False,
                "flushIntervalConsumeQueue": 1000,
                "flushConsumeQueueTimed": True,
                "maxTransferBytesOnCommitlog": 32768,
                "maxTransferCountOnCommitlog": 8
            },
            expected_improvement="提高数据持久性和一致性",
            use_cases=[
                "金融系统",
                "关键业务数据",
                "审计日志",
                "合规要求场景"
            ]
        )
        
        return configurations
    
    def detect_storage_type(self, path: str) -> StorageType:
        """检测存储类型"""
        try:
            # 简化的存储类型检测
            if os.name == 'posix':  # Linux/Unix
                # 通过/sys/block检测
                device = self._get_device_from_path(path)
                if device:
                    rotational_path = f"/sys/block/{device}/queue/rotational"
                    if os.path.exists(rotational_path):
                        with open(rotational_path, 'r') as f:
                            rotational = f.read().strip()
                            if rotational == "0":
                                # 进一步检测是否为NVMe
                                if "nvme" in device:
                                    return StorageType.NVME
                                else:
                                    return StorageType.SSD
                            else:
                                return StorageType.HDD
            
            # 默认返回SSD
            return StorageType.SSD
        except Exception:
            return StorageType.SSD
    
    def _get_device_from_path(self, path: str) -> Optional[str]:
        """从路径获取设备名"""
        try:
            # 获取挂载点信息
            result = subprocess.run(
                ["df", path],
                capture_output=True,
                text=True,
                timeout=5
            )
            if result.returncode == 0:
                lines = result.stdout.strip().split('\n')
                if len(lines) > 1:
                    device_path = lines[1].split()[0]
                    # 提取设备名
                    device = os.path.basename(device_path)
                    # 去掉分区号(兼容 sda1、nvme0n1p1 等命名)
                    device = re.sub(r'p?\d+$', '', device)
                    return device
        except Exception:
            pass
        return None
    
    def get_filesystem_type(self, path: str) -> Optional[FileSystemType]:
        """获取文件系统类型"""
        try:
            result = subprocess.run(
                ["df", "-T", path],
                capture_output=True,
                text=True,
                timeout=5
            )
            if result.returncode == 0:
                lines = result.stdout.strip().split('\n')
                if len(lines) > 1:
                    fs_type = lines[1].split()[1].lower()
                    for fs in FileSystemType:
                        if fs.value == fs_type:
                            return fs
        except Exception:
            pass
        return None
    
    def analyze_storage_performance(self, path: str) -> Dict[str, Any]:
        """分析存储性能"""
        results = {
            "path": path,
            "storage_type": self.detect_storage_type(path).value,
            "filesystem": None,
            "tests": {}
        }
        
        # 检测文件系统
        fs_type = self.get_filesystem_type(path)
        if fs_type:
            results["filesystem"] = fs_type.value
        
        # 写入性能测试
        try:
            test_file = os.path.join(path, "rocketmq_perf_test.tmp")
            test_data = b"0" * 1024 * 1024  # 1MB
            
            # 顺序写入测试
            start_time = time.time()
            with open(test_file, 'wb') as f:
                for _ in range(100):  # 写入100MB
                    f.write(test_data)
                f.flush()
                os.fsync(f.fileno())
            write_time = time.time() - start_time
            
            # 顺序读取测试
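            # 注意:刚写入的数据大多仍在页缓存中,此处读取吞吐量可能明显高于磁盘真实能力,
            # 更严谨的测试可使用O_DIRECT或fio等工具绕过缓存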
            start_time = time.time()
            with open(test_file, 'rb') as f:
                while f.read(1024 * 1024):
                    pass
            read_time = time.time() - start_time
            
            # 清理测试文件
            os.remove(test_file)
            
            results["tests"]["sequential_write"] = {
                "throughput_mb_s": round(100 / write_time, 2),
                "time_seconds": round(write_time, 2)
            }
            
            results["tests"]["sequential_read"] = {
                "throughput_mb_s": round(100 / read_time, 2),
                "time_seconds": round(read_time, 2)
            }
            
        except Exception as e:
            results["tests"]["error"] = str(e)
        
        return results
    
    def recommend_configuration(self, storage_type: StorageType, 
                              optimization_target: OptimizationTarget) -> Optional[StorageConfiguration]:
        """推荐存储配置"""
        return self.configurations.get((storage_type, optimization_target))
    
    def generate_mount_command(self, device: str, mount_point: str, 
                             config: StorageConfiguration) -> str:
        """生成挂载命令"""
        options = ",".join(config.mount_options)
        return f"mount -t {config.filesystem.value} -o {options} {device} {mount_point}"
    
    def generate_fstab_entry(self, device: str, mount_point: str, 
                           config: StorageConfiguration) -> str:
        """生成fstab条目"""
        options = ",".join(config.mount_options)
        return f"{device} {mount_point} {config.filesystem.value} {options} 0 2"
    
    def generate_sysctl_config(self, config: StorageConfiguration) -> str:
        """生成sysctl配置"""
        lines = [
            f"# {config.name}",
            f"# {config.description}",
            f"# 预期改进: {config.expected_improvement}",
            ""
        ]
        
        for param in config.kernel_parameters:
            lines.append(f"# {param.description} - {param.impact}")
            lines.append(f"{param.name} = {param.recommended_value}")
            lines.append("")
        
        return "\n".join(lines)
    
    def generate_broker_config(self, config: StorageConfiguration) -> str:
        """生成Broker配置"""
        lines = [
            f"# {config.name} - RocketMQ Broker配置",
            f"# {config.description}",
            ""
        ]
        
        for key, value in config.rocketmq_parameters.items():
            if isinstance(value, bool):
                lines.append(f"{key}={str(value).lower()}")
            else:
                lines.append(f"{key}={value}")
        
        return "\n".join(lines)
    
    def print_optimization_guide(self, storage_type: StorageType = None, 
                               optimization_target: OptimizationTarget = None):
        """打印存储优化指南"""
        print("=== RocketMQ存储优化指南 ===")
        
        if storage_type and optimization_target:
            config = self.configurations.get((storage_type, optimization_target))
            if config:
                print(f"\n🎯 {config.name}")
                print(f"存储类型: {config.storage_type.value}")
                print(f"文件系统: {config.filesystem.value}")
                print(f"优化目标: {config.optimization_target.value}")
                print(f"描述: {config.description}")
                print(f"预期改进: {config.expected_improvement}")
                
                print(f"\n适用场景:")
                for use_case in config.use_cases:
                    print(f"  • {use_case}")
                
                print(f"\n挂载选项:")
                for option in config.mount_options:
                    print(f"  • {option}")
                
                print(f"\n内核参数:")
                for param in config.kernel_parameters:
                    print(f"  • {param.name} = {param.recommended_value}")
                    print(f"    {param.description} - {param.impact}")
                
                print(f"\nRocketMQ参数:")
                for key, value in config.rocketmq_parameters.items():
                    print(f"  • {key} = {value}")
        else:
            for (st, ot), config in self.configurations.items():
                print(f"\n🎯 {config.name}")
                print(f"存储类型: {st.value}, 优化目标: {ot.value}")
                print(f"描述: {config.description}")
                print(f"预期改进: {config.expected_improvement}")

# 使用示例
if __name__ == "__main__":
    # 创建存储优化器
    optimizer = StorageOptimizer()
    
    # 检测存储类型
    storage_type = optimizer.detect_storage_type("/opt/rocketmq")
    print(f"检测到的存储类型: {storage_type.value}")
    
    # 分析存储性能
    # perf_results = optimizer.analyze_storage_performance("/tmp")
    # print(f"\n存储性能分析: {json.dumps(perf_results, indent=2)}")
    
    # 推荐配置
    config = optimizer.recommend_configuration(StorageType.SSD, OptimizationTarget.THROUGHPUT)
    if config:
        print(f"\n推荐配置: {config.name}")
        
        # 生成配置文件
        mount_cmd = optimizer.generate_mount_command("/dev/sdb1", "/opt/rocketmq", config)
        print(f"\n挂载命令: {mount_cmd}")
        
        fstab_entry = optimizer.generate_fstab_entry("/dev/sdb1", "/opt/rocketmq", config)
        print(f"\nfstab条目: {fstab_entry}")
        
        sysctl_config = optimizer.generate_sysctl_config(config)
        print(f"\n=== sysctl配置 ===\n{sysctl_config}")
        
        broker_config = optimizer.generate_broker_config(config)
        print(f"\n=== Broker配置 ===\n{broker_config}")
    
    # 打印优化指南
    optimizer.print_optimization_guide(StorageType.SSD, OptimizationTarget.THROUGHPUT)
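
analyze_storage_performance只覆盖顺序读写,而消息回溯消费等场景更接近随机读。下面补充一个随机读延迟测试的最小示意(假设运行在Linux/Unix上,random_read_latency为示例函数名;同样注意页缓存会使结果偏乐观):

import os
import random
import statistics
import time
from typing import Dict

def random_read_latency(path: str, file_mb: int = 64,
                        reads: int = 200, block: int = 4096) -> Dict[str, float]:
    """在临时文件上做随机小块读,返回平均/最大延迟(ms)"""
    test_file = os.path.join(path, "rocketmq_rand_test.tmp")
    chunk = os.urandom(1024 * 1024)
    with open(test_file, "wb") as f:
        for _ in range(file_mb):
            f.write(chunk)
        f.flush()
        os.fsync(f.fileno())
    
    size = file_mb * 1024 * 1024
    latencies = []
    fd = os.open(test_file, os.O_RDONLY)
    try:
        for _ in range(reads):
            offset = random.randrange(0, size - block)
            start = time.time()
            os.pread(fd, block, offset)
            latencies.append((time.time() - start) * 1000)
    finally:
        os.close(fd)
        os.remove(test_file)
    
    return {
        "avg_ms": round(statistics.mean(latencies), 3),
        "max_ms": round(max(latencies), 3)
    }

# 示例(会在目标目录产生临时IO)
# print(random_read_latency("/tmp"))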

8.4 内存管理优化

8.4.1 内存分配策略

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
import psutil
import gc
import threading
import time
import statistics
from datetime import datetime

class MemoryType(Enum):
    """内存类型"""
    HEAP = "heap"  # 堆内存
    DIRECT = "direct"  # 直接内存
    STACK = "stack"  # 栈内存
    METASPACE = "metaspace"  # 元空间
    CODE_CACHE = "code_cache"  # 代码缓存

class AllocationStrategy(Enum):
    """内存分配策略"""
    CONSERVATIVE = "conservative"  # 保守策略
    AGGRESSIVE = "aggressive"  # 激进策略
    BALANCED = "balanced"  # 平衡策略
    ADAPTIVE = "adaptive"  # 自适应策略

class MemoryPool(Enum):
    """内存池类型"""
    YOUNG_GEN = "young_generation"  # 年轻代
    OLD_GEN = "old_generation"  # 老年代
    SURVIVOR = "survivor"  # 幸存者区
    EDEN = "eden"  # Eden区
    PERM_GEN = "permanent_generation"  # 永久代

@dataclass
class MemoryConfiguration:
    """内存配置"""
    memory_type: MemoryType
    initial_size: int  # 初始大小(MB)
    max_size: int  # 最大大小(MB)
    allocation_strategy: AllocationStrategy
    gc_algorithm: str
    parameters: Dict[str, Any]
    
class MemoryAllocator:
    """内存分配器"""
    
    def __init__(self):
        self.configurations = {}
        self.allocation_history = []
        self.memory_pools = {}
        self.gc_stats = {}
        
    def configure_memory_pools(self) -> Dict[MemoryPool, MemoryConfiguration]:
        """配置内存池"""
        configs = {
            MemoryPool.YOUNG_GEN: MemoryConfiguration(
                memory_type=MemoryType.HEAP,
                initial_size=512,
                max_size=2048,
                allocation_strategy=AllocationStrategy.AGGRESSIVE,
                gc_algorithm="G1GC",
                parameters={
                    "NewRatio": 2,
                    "SurvivorRatio": 8,
                    "MaxTenuringThreshold": 15,
                    "PretenureSizeThreshold": "1m"
                }
            ),
            MemoryPool.OLD_GEN: MemoryConfiguration(
                memory_type=MemoryType.HEAP,
                initial_size=1024,
                max_size=4096,
                allocation_strategy=AllocationStrategy.CONSERVATIVE,
                gc_algorithm="G1GC",
                parameters={
                    "G1HeapRegionSize": "16m",
                    "G1MixedGCCountTarget": 8,
                    "G1OldCSetRegionThreshold": 10
                }
            ),
            MemoryPool.METASPACE: MemoryConfiguration(
                memory_type=MemoryType.METASPACE,
                initial_size=256,
                max_size=512,
                allocation_strategy=AllocationStrategy.BALANCED,
                gc_algorithm="G1GC",
                parameters={
                    "MetaspaceSize": "256m",
                    "MaxMetaspaceSize": "512m",
                    "CompressedClassSpaceSize": "1g"
                }
            )
        }
        
        self.memory_pools.update(configs)
        return configs
    
    def optimize_allocation_strategy(self, workload_type: str) -> AllocationStrategy:
        """优化分配策略"""
        strategies = {
            "high_throughput": AllocationStrategy.AGGRESSIVE,
            "low_latency": AllocationStrategy.CONSERVATIVE,
            "mixed_workload": AllocationStrategy.BALANCED,
            "variable_load": AllocationStrategy.ADAPTIVE
        }
        
        return strategies.get(workload_type, AllocationStrategy.BALANCED)
    
    def calculate_optimal_sizes(self, system_memory: int, 
                              broker_count: int) -> Dict[str, int]:
        """计算最优内存大小"""
        # 系统预留内存
        system_reserved = max(1024, system_memory * 0.1)  # 至少1GB或10%
        
        # 可用内存
        available_memory = system_memory - system_reserved
        
        # 每个Broker的内存分配
        broker_memory = available_memory // broker_count
        
        # 内存分配比例
        heap_ratio = 0.7  # 70%用于堆内存
        direct_ratio = 0.2  # 20%用于直接内存
        other_ratio = 0.1  # 10%用于其他
        
        return {
            "heap_size": int(broker_memory * heap_ratio),
            "direct_memory": int(broker_memory * direct_ratio),
            "young_gen": int(broker_memory * heap_ratio * 0.3),
            "old_gen": int(broker_memory * heap_ratio * 0.7),
            "metaspace": min(512, int(broker_memory * 0.05))
        }
    
    def generate_jvm_memory_options(self, memory_sizes: Dict[str, int]) -> List[str]:
        """生成JVM内存选项"""
        options = [
            f"-Xms{memory_sizes['heap_size']}m",
            f"-Xmx{memory_sizes['heap_size']}m",
            f"-XX:MaxDirectMemorySize={memory_sizes['direct_memory']}m",
            f"-XX:MetaspaceSize={memory_sizes['metaspace']}m",
            f"-XX:MaxMetaspaceSize={memory_sizes['metaspace']}m",
            
            # 年轻代配置
            f"-XX:NewSize={memory_sizes['young_gen']}m",
            f"-XX:MaxNewSize={memory_sizes['young_gen']}m",
            
            # GC配置
            "-XX:+UseG1GC",
            "-XX:G1HeapRegionSize=16m",
            "-XX:G1ReservePercent=25",
            "-XX:InitiatingHeapOccupancyPercent=30",
            "-XX:SoftRefLRUPolicyMSPerMB=0",
            
            # 内存优化
            "-XX:+UseCompressedOops",
            "-XX:+UseCompressedClassPointers",
            "-XX:+UseLargePages",
            "-XX:+AlwaysPreTouch",
            
            # 监控和调试
            "-XX:+PrintGCDetails",
            "-XX:+PrintGCTimeStamps",
            "-XX:+PrintGCApplicationStoppedTime",
            "-Xloggc:gc.log"
        ]
        
        return options
    
    def monitor_memory_usage(self) -> Dict[str, Any]:
        """监控内存使用情况"""
        process = psutil.Process()
        memory_info = process.memory_info()
        
        return {
            "rss": memory_info.rss // 1024 // 1024,  # MB
            "vms": memory_info.vms // 1024 // 1024,  # MB
            "percent": process.memory_percent(),
            "available": psutil.virtual_memory().available // 1024 // 1024,
            "timestamp": datetime.now().isoformat()
        }
    
    def detect_memory_leaks(self, threshold: float = 5.0) -> List[Dict[str, Any]]:
        """检测内存泄漏"""
        leaks = []
        
        # 获取当前内存使用
        current_usage = self.monitor_memory_usage()
        
        # 检查内存增长趋势
        if len(self.allocation_history) > 10:
            recent_usage = self.allocation_history[-10:]
            growth_rate = (current_usage['rss'] - recent_usage[0]['rss']) / len(recent_usage)
            
            if growth_rate > threshold:
                leaks.append({
                    "type": "memory_growth",
                    "growth_rate": growth_rate,
                    "current_usage": current_usage['rss'],
                    "threshold": threshold
                })
        
        # 记录当前使用情况
        self.allocation_history.append(current_usage)
        
        # 保持历史记录在合理范围内
        if len(self.allocation_history) > 100:
            self.allocation_history = self.allocation_history[-50:]
        
        return leaks
    
    def optimize_gc_parameters(self, latency_target: int = 100) -> Dict[str, str]:
        """优化GC参数"""
        if latency_target <= 50:  # 低延迟要求
            return {
                "XX:MaxGCPauseMillis": "50",
                "XX:G1MixedGCLiveThresholdPercent": "85",
                "XX:G1HeapWastePercent": "5",
                "XX:G1MixedGCCountTarget": "4",
                "XX:+G1UseAdaptiveIHOP": "",
                "XX:G1AdaptiveIHOPNumInitialSamples": "3"
            }
        elif latency_target <= 100:  # 中等延迟要求
            return {
                "XX:MaxGCPauseMillis": "100",
                "XX:G1MixedGCLiveThresholdPercent": "75",
                "XX:G1HeapWastePercent": "10",
                "XX:G1MixedGCCountTarget": "8",
                "XX:+G1UseAdaptiveIHOP": ""
            }
        else:  # 高吞吐量要求
            return {
                "XX:MaxGCPauseMillis": "200",
                "XX:G1MixedGCLiveThresholdPercent": "65",
                "XX:G1HeapWastePercent": "15",
                "XX:G1MixedGCCountTarget": "16",
                "XX:+UnlockExperimentalVMOptions": "",
                "XX:+UseTransparentHugePages": ""
            }

# 使用示例
if __name__ == "__main__":
    allocator = MemoryAllocator()
    
    # 配置内存池
    pools = allocator.configure_memory_pools()
    print("内存池配置:")
    for pool, config in pools.items():
        print(f"  {pool.value}: {config.initial_size}MB - {config.max_size}MB")
    
    # 计算最优内存大小
    system_memory = 16384  # 16GB
    broker_count = 3
    memory_sizes = allocator.calculate_optimal_sizes(system_memory, broker_count)
    print(f"\n内存分配 (系统内存: {system_memory}MB, Broker数量: {broker_count}):")
    for key, value in memory_sizes.items():
        print(f"  {key}: {value}MB")
    
    # 生成JVM选项
    jvm_options = allocator.generate_jvm_memory_options(memory_sizes)
    print("\nJVM内存选项:")
    for option in jvm_options:
        print(f"  {option}")
    
    # 优化GC参数
    gc_params = allocator.optimize_gc_parameters(latency_target=50)
    print("\nGC优化参数:")
    for param, value in gc_params.items():
        print(f"  -{param}={value}" if value else f"  -{param}")

8.4.2 垃圾回收优化

class GCType(Enum):
    """垃圾回收器类型"""
    SERIAL = "Serial"  # 串行GC
    PARALLEL = "Parallel"  # 并行GC
    CMS = "CMS"  # CMS GC
    G1 = "G1"  # G1 GC
    ZGC = "ZGC"  # ZGC
    SHENANDOAH = "Shenandoah"  # Shenandoah GC

class GCPhase(Enum):
    """GC阶段"""
    YOUNG = "young"  # 年轻代GC
    OLD = "old"  # 老年代GC
    MIXED = "mixed"  # 混合GC
    FULL = "full"  # 完整GC

@dataclass
class GCMetrics:
    """GC指标"""
    gc_type: GCType
    phase: GCPhase
    duration: float  # 持续时间(ms)
    before_size: int  # GC前大小(MB)
    after_size: int  # GC后大小(MB)
    freed_memory: int  # 释放内存(MB)
    timestamp: datetime
    pause_time: float  # 暂停时间(ms)

@dataclass
class GCConfiguration:
    """GC配置"""
    gc_type: GCType
    heap_size: int  # 堆大小(MB)
    young_gen_size: int  # 年轻代大小(MB)
    parameters: Dict[str, str]
    expected_pause: float  # 期望暂停时间(ms)
    throughput_target: float  # 吞吐量目标(%)

class GCOptimizer:
    """垃圾回收优化器"""
    
    def __init__(self):
        self.gc_history = []
        self.configurations = {}
        self.performance_baselines = {}
        
    def analyze_gc_logs(self, log_file: str) -> List[GCMetrics]:
        """分析GC日志"""
        metrics = []
        
        # 模拟GC日志解析
        sample_metrics = [
            GCMetrics(
                gc_type=GCType.G1,
                phase=GCPhase.YOUNG,
                duration=15.2,
                before_size=1024,
                after_size=256,
                freed_memory=768,
                timestamp=datetime.now(),
                pause_time=12.5
            ),
            GCMetrics(
                gc_type=GCType.G1,
                phase=GCPhase.MIXED,
                duration=45.8,
                before_size=3072,
                after_size=1536,
                freed_memory=1536,
                timestamp=datetime.now(),
                pause_time=38.2
            )
        ]
        
        metrics.extend(sample_metrics)
        self.gc_history.extend(metrics)
        return metrics
    
    def calculate_gc_statistics(self) -> Dict[str, Any]:
        """计算GC统计信息"""
        if not self.gc_history:
            return {}
        
        young_gcs = [m for m in self.gc_history if m.phase == GCPhase.YOUNG]
        old_gcs = [m for m in self.gc_history if m.phase in [GCPhase.OLD, GCPhase.MIXED]]
        
        # 基于首末GC时间戳估算观测窗口时长,进而得到每小时GC次数
        span_hours = 0.0
        if len(self.gc_history) >= 2:
            span = self.gc_history[-1].timestamp - self.gc_history[0].timestamp
            span_hours = span.total_seconds() / 3600
        
        stats = {
            "total_gcs": len(self.gc_history),
            "young_gc_count": len(young_gcs),
            "old_gc_count": len(old_gcs),
            "avg_young_pause": statistics.mean([m.pause_time for m in young_gcs]) if young_gcs else 0,
            "avg_old_pause": statistics.mean([m.pause_time for m in old_gcs]) if old_gcs else 0,
            "max_pause": max([m.pause_time for m in self.gc_history]),
            "total_freed_memory": sum([m.freed_memory for m in self.gc_history]),
            "gc_frequency": len(self.gc_history) / span_hours if span_hours > 0 else 0  # 每小时GC次数
        }
        
        return stats
    
    def recommend_gc_algorithm(self, requirements: Dict[str, Any]) -> GCType:
        """推荐GC算法"""
        heap_size = requirements.get("heap_size", 4096)  # MB
        latency_requirement = requirements.get("max_pause_ms", 100)
        throughput_requirement = requirements.get("min_throughput", 95)
        
        if heap_size > 32768:  # > 32GB
            if latency_requirement <= 10:
                return GCType.ZGC
            elif latency_requirement <= 50:
                return GCType.SHENANDOAH
            else:
                return GCType.G1
        elif heap_size > 4096:  # > 4GB
            if latency_requirement <= 100:
                return GCType.G1
            else:
                return GCType.PARALLEL
        else:  # <= 4GB
            if throughput_requirement >= 98:
                return GCType.PARALLEL
            else:
                return GCType.G1
    
    def generate_gc_configuration(self, gc_type: GCType, 
                                heap_size: int,
                                latency_target: int = 100) -> GCConfiguration:
        """生成GC配置"""
        young_gen_size = heap_size // 3  # 年轻代占1/3
        
        if gc_type == GCType.G1:
            parameters = {
                "XX:+UseG1GC": "",
                "XX:MaxGCPauseMillis": str(latency_target),
                "XX:G1HeapRegionSize": "16m" if heap_size <= 8192 else "32m",
                "XX:G1NewSizePercent": "30",
                "XX:G1MaxNewSizePercent": "40",
                "XX:G1ReservePercent": "15",
                "XX:InitiatingHeapOccupancyPercent": "45",
                "XX:G1MixedGCCountTarget": "8",
                "XX:G1MixedGCLiveThresholdPercent": "85"
            }
            
            if latency_target <= 50:
                parameters.update({
                    "XX:G1MixedGCCountTarget": "4",
                    "XX:G1HeapWastePercent": "5",
                    "XX:+G1UseAdaptiveIHOP": ""
                })
                
        elif gc_type == GCType.ZGC:
            parameters = {
                "XX:+UnlockExperimentalVMOptions": "",
                "XX:+UseZGC": "",
                "XX:+UseLargePages": "",
                "XX:ZCollectionInterval": "5",
                "XX:ZUncommitDelay": "300"
            }
            
        elif gc_type == GCType.PARALLEL:
            parameters = {
                "XX:+UseParallelGC": "",
                "XX:ParallelGCThreads": str(min(8, psutil.cpu_count())),
                "XX:MaxGCPauseMillis": str(latency_target * 2),
                "XX:GCTimeRatio": "99",
                "XX:+UseAdaptiveSizePolicy": ""
            }
            
        elif gc_type == GCType.CMS:
            parameters = {
                "XX:+UseConcMarkSweepGC": "",
                "XX:+UseParNewGC": "",
                "XX:CMSInitiatingOccupancyFraction": "75",
                "XX:+UseCMSInitiatingOccupancyOnly": "",
                "XX:+CMSParallelRemarkEnabled": "",
                "XX:+CMSClassUnloadingEnabled": ""
            }
            
        else:  # SHENANDOAH
            parameters = {
                "XX:+UnlockExperimentalVMOptions": "",
                "XX:+UseShenandoahGC": "",
                "XX:ShenandoahGCHeuristics": "adaptive",
                "XX:+UseLargePages": ""
            }
        
        return GCConfiguration(
            gc_type=gc_type,
            heap_size=heap_size,
            young_gen_size=young_gen_size,
            parameters=parameters,
            expected_pause=latency_target,
            throughput_target=95.0
        )
    
    def tune_gc_parameters(self, current_stats: Dict[str, Any], 
                          target_pause: float = 100) -> Dict[str, str]:
        """调优GC参数"""
        tuned_params = {}
        
        avg_pause = current_stats.get("avg_young_pause", 0)
        max_pause = current_stats.get("max_pause", 0)
        gc_frequency = current_stats.get("gc_frequency", 0)
        
        # 如果暂停时间过长
        if avg_pause > target_pause * 1.2:
            tuned_params.update({
                "XX:MaxGCPauseMillis": str(int(target_pause * 0.8)),
                "XX:G1MixedGCCountTarget": "4",  # 减少混合GC目标
                "XX:G1HeapWastePercent": "5"  # 减少堆浪费百分比
            })
        
        # 如果GC频率过高
        if gc_frequency > 10:  # 每小时超过10次
            tuned_params.update({
                "XX:InitiatingHeapOccupancyPercent": "60",  # 提高触发阈值
                "XX:G1ReservePercent": "20"  # 增加预留空间
            })
        
        # 如果最大暂停时间过长
        if max_pause > target_pause * 2:
            tuned_params.update({
                "XX:+G1UseAdaptiveIHOP": "",
                "XX:G1AdaptiveIHOPNumInitialSamples": "3",
                "XX:G1MixedGCLiveThresholdPercent": "90"
            })
        
        return tuned_params
    
    def generate_gc_monitoring_script(self) -> str:
        """生成GC监控脚本"""
        script = r'''
#!/bin/bash

# GC监控脚本
LOG_FILE="gc_monitor.log"
GC_LOG="gc.log"
ALERT_THRESHOLD=200  # 暂停时间阈值(ms)

echo "$(date): 开始GC监控" >> $LOG_FILE

while true; do
    if [ -f "$GC_LOG" ]; then
        # 检查最近的GC暂停时间
        LAST_PAUSE=$(tail -n 10 $GC_LOG | grep -o "[0-9.]\+ms" | tail -n 1 | sed 's/ms//')
        
        if [ ! -z "$LAST_PAUSE" ]; then
            PAUSE_INT=$(echo "$LAST_PAUSE" | cut -d'.' -f1)
            
            if [ "$PAUSE_INT" -gt "$ALERT_THRESHOLD" ]; then
                echo "$(date): 警告 - GC暂停时间过长: ${LAST_PAUSE}ms" >> $LOG_FILE
                # 可以在这里添加告警通知
            fi
        fi
        
        # 统计GC频率
        GC_COUNT=$(grep -c "GC pause" $GC_LOG)
        echo "$(date): 当前GC次数: $GC_COUNT" >> $LOG_FILE
    fi
    
    sleep 60  # 每分钟检查一次
done
'''
        return script
    
    def print_gc_recommendations(self, requirements: Dict[str, Any]):
        """打印GC推荐配置"""
        recommended_gc = self.recommend_gc_algorithm(requirements)
        config = self.generate_gc_configuration(
            recommended_gc, 
            requirements.get("heap_size", 4096),
            requirements.get("max_pause_ms", 100)
        )
        
        print(f"\n🎯 推荐GC配置")
        print(f"GC算法: {config.gc_type.value}")
        print(f"堆大小: {config.heap_size}MB")
        print(f"年轻代大小: {config.young_gen_size}MB")
        print(f"期望暂停时间: {config.expected_pause}ms")
        print(f"吞吐量目标: {config.throughput_target}%")
        
        print(f"\n📋 JVM参数:")
        for param, value in config.parameters.items():
            if value:
                print(f"  -{param}={value}")
            else:
                print(f"  -{param}")

# 使用示例
if __name__ == "__main__":
    optimizer = GCOptimizer()
    
    # 分析GC日志
    metrics = optimizer.analyze_gc_logs("gc.log")
    print(f"解析到 {len(metrics)} 条GC记录")
    
    # 计算统计信息
    stats = optimizer.calculate_gc_statistics()
    print(f"\nGC统计信息:")
    for key, value in stats.items():
        print(f"  {key}: {value}")
    
    # 推荐GC配置
    requirements = {
        "heap_size": 8192,  # 8GB
        "max_pause_ms": 50,
        "min_throughput": 95
    }
    
    optimizer.print_gc_recommendations(requirements)
    
    # 生成调优参数
    tuned_params = optimizer.tune_gc_parameters(stats, target_pause=50)
    if tuned_params:
        print(f"\n🔧 调优建议:")
        for param, value in tuned_params.items():
            print(f"  -{param}={value}")

8.5 缓存优化

8.5.1 消息缓存策略

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Tuple
import threading
import time
from collections import OrderedDict, defaultdict
import hashlib
import json
from datetime import datetime, timedelta

class CacheType(Enum):
    """缓存类型"""
    MEMORY = "memory"  # 内存缓存
    DISK = "disk"  # 磁盘缓存
    HYBRID = "hybrid"  # 混合缓存
    DISTRIBUTED = "distributed"  # 分布式缓存

class EvictionPolicy(Enum):
    """淘汰策略"""
    LRU = "lru"  # 最近最少使用
    LFU = "lfu"  # 最少使用频率
    FIFO = "fifo"  # 先进先出
    TTL = "ttl"  # 生存时间
    RANDOM = "random"  # 随机淘汰

class CacheLevel(Enum):
    """缓存级别"""
    L1 = "l1"  # 一级缓存(内存)
    L2 = "l2"  # 二级缓存(SSD)
    L3 = "l3"  # 三级缓存(HDD)

@dataclass
class CacheEntry:
    """缓存条目"""
    key: str
    value: Any
    timestamp: float
    access_count: int
    size: int  # 字节数
    ttl: Optional[float] = None  # 生存时间
    last_access: float = 0
    
    def is_expired(self) -> bool:
        """检查是否过期"""
        if self.ttl is None:
            return False
        return time.time() - self.timestamp > self.ttl
    
    def update_access(self):
        """更新访问信息"""
        self.access_count += 1
        self.last_access = time.time()

@dataclass
class CacheConfiguration:
    """缓存配置"""
    cache_type: CacheType
    max_size: int  # 最大大小(MB)
    eviction_policy: EvictionPolicy
    ttl_seconds: Optional[int] = None
    max_entries: Optional[int] = None
    compression_enabled: bool = False
    persistence_enabled: bool = False
    
class MessageCache:
    """消息缓存"""
    
    def __init__(self, config: CacheConfiguration):
        self.config = config
        self.cache = OrderedDict()  # 用于LRU
        self.access_frequency = defaultdict(int)  # 用于LFU
        self.lock = threading.RLock()
        self.stats = {
            "hits": 0,
            "misses": 0,
            "evictions": 0,
            "size_bytes": 0,
            "entry_count": 0
        }
        
    def get(self, key: str) -> Optional[Any]:
        """获取缓存值"""
        with self.lock:
            if key in self.cache:
                entry = self.cache[key]
                
                # 检查是否过期
                if entry.is_expired():
                    self._remove_entry(key)
                    self.stats["misses"] += 1
                    return None
                
                # 更新访问信息
                entry.update_access()
                self.access_frequency[key] += 1
                
                # LRU: 移到末尾
                if self.config.eviction_policy == EvictionPolicy.LRU:
                    self.cache.move_to_end(key)
                
                self.stats["hits"] += 1
                return entry.value
            else:
                self.stats["misses"] += 1
                return None
    
    def put(self, key: str, value: Any, ttl: Optional[float] = None) -> bool:
        """存储缓存值"""
        with self.lock:
            # 计算值大小
            value_size = self._calculate_size(value)
            
            # 检查是否需要淘汰
            if not self._ensure_capacity(value_size):
                return False
            
            # 创建缓存条目
            entry = CacheEntry(
                key=key,
                value=value,
                timestamp=time.time(),
                access_count=1,
                size=value_size,
                ttl=ttl or self.config.ttl_seconds,
                last_access=time.time()
            )
            
            # 如果key已存在,先移除旧条目
            if key in self.cache:
                self._remove_entry(key)
            
            # 添加新条目
            self.cache[key] = entry
            self.access_frequency[key] = 1
            self.stats["size_bytes"] += value_size
            self.stats["entry_count"] += 1
            
            return True
    
    def remove(self, key: str) -> bool:
        """移除缓存条目"""
        with self.lock:
            if key in self.cache:
                self._remove_entry(key)
                return True
            return False
    
    def clear(self):
        """清空缓存"""
        with self.lock:
            self.cache.clear()
            self.access_frequency.clear()
            self.stats["size_bytes"] = 0
            self.stats["entry_count"] = 0
    
    def _remove_entry(self, key: str):
        """移除条目(内部方法)"""
        if key in self.cache:
            entry = self.cache.pop(key)
            self.access_frequency.pop(key, 0)
            self.stats["size_bytes"] -= entry.size
            self.stats["entry_count"] -= 1
    
    def _ensure_capacity(self, new_size: int) -> bool:
        """确保有足够容量"""
        max_size_bytes = self.config.max_size * 1024 * 1024
        
        # 检查大小限制
        while (self.stats["size_bytes"] + new_size > max_size_bytes or 
               (self.config.max_entries and 
                self.stats["entry_count"] >= self.config.max_entries)):
            
            if not self._evict_one():
                return False
        
        return True
    
    def _evict_one(self) -> bool:
        """淘汰一个条目"""
        if not self.cache:
            return False
        
        key_to_evict = None
        
        if self.config.eviction_policy == EvictionPolicy.LRU:
            # 淘汰最久未使用的
            key_to_evict = next(iter(self.cache))
            
        elif self.config.eviction_policy == EvictionPolicy.LFU:
            # 淘汰使用频率最低的
            key_to_evict = min(self.access_frequency.keys(), 
                             key=lambda k: self.access_frequency[k])
            
        elif self.config.eviction_policy == EvictionPolicy.FIFO:
            # 淘汰最早添加的
            key_to_evict = next(iter(self.cache))
            
        elif self.config.eviction_policy == EvictionPolicy.TTL:
            # 优先淘汰已过期的条目,否则退化为FIFO
            expired_keys = [k for k, v in self.cache.items() if v.is_expired()]
            if expired_keys:
                key_to_evict = expired_keys[0]
            else:
                key_to_evict = next(iter(self.cache))
        
        if key_to_evict:
            self._remove_entry(key_to_evict)
            self.stats["evictions"] += 1
            return True
        
        return False
    
    def _calculate_size(self, value: Any) -> int:
        """计算值的大小"""
        try:
            if isinstance(value, (str, bytes)):
                return len(value.encode('utf-8') if isinstance(value, str) else value)
            else:
                return len(json.dumps(value, default=str).encode('utf-8'))
        except Exception:
            return 1024  # 默认1KB
    
    def get_stats(self) -> Dict[str, Any]:
        """获取缓存统计信息"""
        with self.lock:
            hit_rate = (self.stats["hits"] / 
                       (self.stats["hits"] + self.stats["misses"]) 
                       if (self.stats["hits"] + self.stats["misses"]) > 0 else 0)
            
            return {
                **self.stats,
                "hit_rate": hit_rate,
                "miss_rate": 1 - hit_rate,
                "size_mb": self.stats["size_bytes"] / (1024 * 1024),
                "avg_entry_size": (self.stats["size_bytes"] / self.stats["entry_count"] 
                                 if self.stats["entry_count"] > 0 else 0)
            }

class MultiLevelCache:
    """多级缓存"""
    
    def __init__(self):
        self.levels = {}
        self.promotion_threshold = 3  # 访问次数阈值
        
    def add_level(self, level: CacheLevel, cache: MessageCache):
        """添加缓存级别"""
        self.levels[level] = cache
    
    def get(self, key: str) -> Optional[Any]:
        """从多级缓存获取值"""
        # 从L1开始查找
        for level in [CacheLevel.L1, CacheLevel.L2, CacheLevel.L3]:
            if level in self.levels:
                value = self.levels[level].get(key)
                if value is not None:
                    # 提升到更高级别的缓存
                    self._promote_to_higher_level(key, value, level)
                    return value
        return None
    
    def put(self, key: str, value: Any, ttl: Optional[float] = None):
        """存储到多级缓存"""
        # 优先存储到L1缓存
        if CacheLevel.L1 in self.levels:
            if not self.levels[CacheLevel.L1].put(key, value, ttl):
                # L1缓存满了,存储到L2
                if CacheLevel.L2 in self.levels:
                    self.levels[CacheLevel.L2].put(key, value, ttl)
        
    def _promote_to_higher_level(self, key: str, value: Any, current_level: CacheLevel):
        """提升到更高级别的缓存"""
        if current_level == CacheLevel.L2 and CacheLevel.L1 in self.levels:
            self.levels[CacheLevel.L1].put(key, value)
        elif current_level == CacheLevel.L3 and CacheLevel.L2 in self.levels:
            self.levels[CacheLevel.L2].put(key, value)
    
    def get_overall_stats(self) -> Dict[str, Any]:
        """获取整体统计信息"""
        overall_stats = {
            "total_hits": 0,
            "total_misses": 0,
            "total_size_mb": 0,
            "levels": {}
        }
        
        for level, cache in self.levels.items():
            stats = cache.get_stats()
            overall_stats["levels"][level.value] = stats
            overall_stats["total_hits"] += stats["hits"]
            overall_stats["total_misses"] += stats["misses"]
            overall_stats["total_size_mb"] += stats["size_mb"]
        
        total_requests = overall_stats["total_hits"] + overall_stats["total_misses"]
        overall_stats["overall_hit_rate"] = (overall_stats["total_hits"] / total_requests 
                                           if total_requests > 0 else 0)
        
        return overall_stats

# 使用示例
if __name__ == "__main__":
    # 创建L1缓存配置(内存缓存)
    l1_config = CacheConfiguration(
        cache_type=CacheType.MEMORY,
        max_size=256,  # 256MB
        eviction_policy=EvictionPolicy.LRU,
        ttl_seconds=300,  # 5分钟
        max_entries=10000
    )
    
    # 创建L2缓存配置(SSD缓存)
    l2_config = CacheConfiguration(
        cache_type=CacheType.DISK,
        max_size=1024,  # 1GB
        eviction_policy=EvictionPolicy.LFU,
        ttl_seconds=3600,  # 1小时
        max_entries=50000
    )
    
    # 创建缓存实例
    l1_cache = MessageCache(l1_config)
    l2_cache = MessageCache(l2_config)
    
    # 创建多级缓存
    multi_cache = MultiLevelCache()
    multi_cache.add_level(CacheLevel.L1, l1_cache)
    multi_cache.add_level(CacheLevel.L2, l2_cache)
    
    # 测试缓存操作
    test_data = {
        "message_1": {"content": "Hello World", "timestamp": time.time()},
        "message_2": {"content": "RocketMQ Cache", "timestamp": time.time()}
    }
    
    # 存储数据
    for key, value in test_data.items():
        multi_cache.put(key, value)
        print(f"存储: {key}")
    
    # 读取数据
    for key in test_data.keys():
        value = multi_cache.get(key)
        print(f"读取: {key} = {value}")
    
    # 打印统计信息
    stats = multi_cache.get_overall_stats()
    print(f"\n缓存统计信息:")
    print(f"总命中率: {stats['overall_hit_rate']:.2%}")
    print(f"总大小: {stats['total_size_mb']:.2f}MB")
    
    for level, level_stats in stats["levels"].items():
        print(f"\n{level.upper()}级缓存:")
        print(f"  命中率: {level_stats['hit_rate']:.2%}")
         print(f"  条目数: {level_stats['entry_count']}")
         print(f"  大小: {level_stats['size_mb']:.2f}MB")

8.5.2 预读取和预热策略

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Callable, Set
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import heapq
from collections import deque
import logging

class PrefetchStrategy(Enum):
    """预读取策略"""
    SEQUENTIAL = "sequential"  # 顺序预读
    PATTERN_BASED = "pattern_based"  # 基于模式
    ML_PREDICTED = "ml_predicted"  # 机器学习预测
    TIME_BASED = "time_based"  # 基于时间
    FREQUENCY_BASED = "frequency_based"  # 基于频率

class WarmupStrategy(Enum):
    """预热策略"""
    EAGER = "eager"  # 立即预热
    LAZY = "lazy"  # 懒加载预热
    SCHEDULED = "scheduled"  # 定时预热
    ADAPTIVE = "adaptive"  # 自适应预热

@dataclass
class AccessPattern:
    """访问模式"""
    key: str
    access_time: float
    access_count: int
    sequence_position: int
    context: Dict[str, Any]

@dataclass
class PrefetchRequest:
    """预读取请求"""
    key: str
    priority: int
    predicted_access_time: float
    confidence: float
    strategy: PrefetchStrategy
    
    def __lt__(self, other):
        # heapq是最小堆,这里反向比较,使优先级数值大的请求先出队
        return self.priority > other.priority

@dataclass
class WarmupConfiguration:
    """预热配置"""
    strategy: WarmupStrategy
    batch_size: int = 100
    max_concurrent: int = 10
    warmup_percentage: float = 0.8  # 预热缓存容量的百分比
    schedule_interval: int = 3600  # 定时预热间隔(秒)
    pattern_window: int = 1000  # 模式分析窗口大小

class PatternAnalyzer:
    """访问模式分析器"""
    
    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.access_history = deque(maxlen=window_size)
        self.patterns = {}
        self.sequence_patterns = {}
        
    def record_access(self, key: str, context: Dict[str, Any] = None):
        """记录访问"""
        pattern = AccessPattern(
            key=key,
            access_time=time.time(),
            access_count=1,
            sequence_position=len(self.access_history),
            context=context or {}
        )
        
        self.access_history.append(pattern)
        self._update_patterns()
    
    def _update_patterns(self):
        """更新访问模式"""
        if len(self.access_history) < 2:
            return
        
        # 分析顺序模式
        recent_keys = [p.key for p in list(self.access_history)[-10:]]
        for i in range(len(recent_keys) - 1):
            current_key = recent_keys[i]
            next_key = recent_keys[i + 1]
            
            if current_key not in self.sequence_patterns:
                self.sequence_patterns[current_key] = {}
            
            if next_key not in self.sequence_patterns[current_key]:
                self.sequence_patterns[current_key][next_key] = 0
            
            self.sequence_patterns[current_key][next_key] += 1
    
    def predict_next_keys(self, current_key: str, count: int = 5) -> List[str]:
        """预测下一个可能访问的键"""
        if current_key not in self.sequence_patterns:
            return []
        
        # 按访问频率排序
        next_keys = sorted(
            self.sequence_patterns[current_key].items(),
            key=lambda x: x[1],
            reverse=True
        )
        
        return [key for key, _ in next_keys[:count]]
    
    def get_access_frequency(self, key: str) -> int:
        """获取访问频率"""
        return sum(1 for p in self.access_history if p.key == key)
    
    def get_time_patterns(self) -> Dict[str, List[float]]:
        """获取时间访问模式"""
        time_patterns = {}
        for pattern in self.access_history:
            if pattern.key not in time_patterns:
                time_patterns[pattern.key] = []
            time_patterns[pattern.key].append(pattern.access_time)
        return time_patterns

class PrefetchManager:
    """预读取管理器"""
    
    def __init__(self, cache: MessageCache, config: WarmupConfiguration):
        self.cache = cache
        self.config = config
        self.pattern_analyzer = PatternAnalyzer(config.pattern_window)
        self.prefetch_queue = []
        self.executor = ThreadPoolExecutor(max_workers=config.max_concurrent)
        self.running = False
        self.data_loader: Optional[Callable[[str], Any]] = None
        
    def set_data_loader(self, loader: Callable[[str], Any]):
        """设置数据加载器"""
        self.data_loader = loader
    
    def start(self):
        """启动预读取管理器"""
        self.running = True
        threading.Thread(target=self._prefetch_worker, daemon=True).start()
        
        if self.config.strategy == WarmupStrategy.SCHEDULED:
            threading.Thread(target=self._scheduled_warmup, daemon=True).start()
    
    def stop(self):
        """停止预读取管理器"""
        self.running = False
        self.executor.shutdown(wait=True)
    
    def record_access(self, key: str, context: Dict[str, Any] = None):
        """记录访问并触发预读取"""
        self.pattern_analyzer.record_access(key, context)
        self._trigger_prefetch(key)
    
    def _trigger_prefetch(self, current_key: str):
        """触发预读取"""
        if not self.data_loader:
            return
        
        # 基于模式预测下一个可能访问的键
        predicted_keys = self.pattern_analyzer.predict_next_keys(current_key)
        
        for i, key in enumerate(predicted_keys):
            # 检查是否已在缓存中
            if self.cache.get(key) is None:
                request = PrefetchRequest(
                    key=key,
                    priority=len(predicted_keys) - i,  # 优先级递减
                    predicted_access_time=time.time() + (i + 1) * 10,  # 预测访问时间
                    confidence=0.8 - i * 0.1,  # 置信度递减
                    strategy=PrefetchStrategy.PATTERN_BASED
                )
                
                heapq.heappush(self.prefetch_queue, request)
    
    def _prefetch_worker(self):
        """预读取工作线程"""
        while self.running:
            try:
                if self.prefetch_queue:
                    request = heapq.heappop(self.prefetch_queue)
                    
                    # 检查是否仍需要预读取
                    if self.cache.get(request.key) is None:
                        # 提交后不等待完成,继续处理下一个请求
                        self.executor.submit(self._load_and_cache, request)
                
                time.sleep(0.1)  # 避免CPU占用过高
            except Exception as e:
                logging.error(f"预读取错误: {e}")
    
    def _load_and_cache(self, request: PrefetchRequest):
        """加载数据并缓存"""
        try:
            if self.data_loader:
                data = self.data_loader(request.key)
                if data is not None:
                    # 设置较短的TTL,因为是预读取的数据
                    ttl = 300  # 5分钟
                    self.cache.put(request.key, data, ttl)
                    logging.debug(f"预读取成功: {request.key}")
        except Exception as e:
            logging.error(f"预读取失败 {request.key}: {e}")
    
    def _scheduled_warmup(self):
        """定时预热"""
        while self.running:
            try:
                self.warmup_cache()
                time.sleep(self.config.schedule_interval)
            except Exception as e:
                logging.error(f"定时预热错误: {e}")
    
    def warmup_cache(self, keys: Optional[List[str]] = None):
        """预热缓存"""
        if not self.data_loader:
            return
        
        if keys is None:
            # 基于访问频率选择要预热的键
            keys = self._select_warmup_keys()
        
        # 分批预热
        for i in range(0, len(keys), self.config.batch_size):
            batch = keys[i:i + self.config.batch_size]
            futures = []
            
            for key in batch:
                if self.cache.get(key) is None:  # 只预热不在缓存中的数据
                    future = self.executor.submit(self._warmup_single_key, key)
                    futures.append(future)
            
            # 等待当前批次完成
            for future in futures:
                try:
                    future.result(timeout=30)  # 30秒超时
                except Exception as e:
                    logging.error(f"预热失败: {e}")
    
    def _select_warmup_keys(self) -> List[str]:
        """选择要预热的键"""
        # 基于访问频率选择
        key_frequencies = {}
        for pattern in self.pattern_analyzer.access_history:
            key = pattern.key
            if key not in key_frequencies:
                key_frequencies[key] = 0
            key_frequencies[key] += 1
        
        # 按频率排序,选择前N个
        sorted_keys = sorted(key_frequencies.items(), key=lambda x: x[1], reverse=True)
        
        # 计算要预热的数量
        cache_capacity = self.cache.config.max_entries or 1000
        warmup_count = int(cache_capacity * self.config.warmup_percentage)
        
        return [key for key, _ in sorted_keys[:warmup_count]]
    
    def _warmup_single_key(self, key: str):
        """预热单个键"""
        try:
            if self.data_loader:
                data = self.data_loader(key)
                if data is not None:
                    # 预热数据设置较长的TTL
                    ttl = 3600  # 1小时
                    self.cache.put(key, data, ttl)
                    logging.debug(f"预热成功: {key}")
        except Exception as e:
            logging.error(f"预热失败 {key}: {e}")
    
    def get_prefetch_stats(self) -> Dict[str, Any]:
        """获取预读取统计信息"""
        return {
            "queue_size": len(self.prefetch_queue),
            "pattern_history_size": len(self.pattern_analyzer.access_history),
            "sequence_patterns_count": len(self.pattern_analyzer.sequence_patterns),
            "running": self.running
        }
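
# 下面是一个最小示意(并非上文类的一部分): 用线程安全的 queue.PriorityQueue
# 替代裸 heapq 列表,避免 _prefetch_worker 与 record_access 之间的竞态;
# 元组 (-priority, 入队序号, request) 保证高优先级先出队,且不要求 PrefetchRequest 本身可比较
import itertools
import queue

_prefetch_pq: "queue.PriorityQueue" = queue.PriorityQueue()
_seq = itertools.count()

def enqueue_prefetch(request) -> None:
    """按优先级入队(高优先级先出)"""
    _prefetch_pq.put((-request.priority, next(_seq), request))

def dequeue_prefetch(timeout: float = 0.1):
    """阻塞等待出队,超时返回 None,避免忙等轮询"""
    try:
        _, _, request = _prefetch_pq.get(timeout=timeout)
        return request
    except queue.Empty:
        return None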

# 使用示例
if __name__ == "__main__":
    # 模拟数据加载器
    def mock_data_loader(key: str) -> Optional[Dict[str, Any]]:
        """模拟数据加载"""
        time.sleep(0.1)  # 模拟加载延迟
        return {
            "key": key,
            "data": f"Data for {key}",
            "timestamp": time.time()
        }
    
    # 创建缓存配置
    cache_config = CacheConfiguration(
        cache_type=CacheType.MEMORY,
        max_size=128,  # 128MB
        eviction_policy=EvictionPolicy.LRU,
        ttl_seconds=600,  # 10分钟
        max_entries=1000
    )
    
    # 创建预热配置
    warmup_config = WarmupConfiguration(
        strategy=WarmupStrategy.ADAPTIVE,
        batch_size=50,
        max_concurrent=5,
        warmup_percentage=0.6,
        pattern_window=500
    )
    
    # 创建缓存和预读取管理器
    cache = MessageCache(cache_config)
    prefetch_manager = PrefetchManager(cache, warmup_config)
    prefetch_manager.set_data_loader(mock_data_loader)
    prefetch_manager.start()
    
    try:
        # 模拟访问模式
        access_sequence = [
            "user_1", "user_2", "user_3",
            "user_1", "user_4", "user_2",
            "user_5", "user_1", "user_6"
        ]
        
        print("开始模拟访问...")
        for key in access_sequence:
            # 记录访问(触发预读取)
            prefetch_manager.record_access(key)
            
            # 尝试从缓存获取
            value = cache.get(key)
            if value:
                print(f"缓存命中: {key}")
            else:
                # 缓存未命中,手动加载
                value = mock_data_loader(key)
                cache.put(key, value)
                print(f"缓存未命中,已加载: {key}")
            
            time.sleep(0.5)  # 模拟访问间隔
        
        # 等待预读取完成
        time.sleep(2)
        
        # 打印统计信息
        cache_stats = cache.get_stats()
        prefetch_stats = prefetch_manager.get_prefetch_stats()
        
        print(f"\n缓存统计:")
        print(f"  命中率: {cache_stats['hit_rate']:.2%}")
        print(f"  条目数: {cache_stats['entry_count']}")
        print(f"  大小: {cache_stats['size_mb']:.2f}MB")
        
        print(f"\n预读取统计:")
        print(f"  队列大小: {prefetch_stats['queue_size']}")
        print(f"  模式历史: {prefetch_stats['pattern_history_size']}")
        print(f"  序列模式: {prefetch_stats['sequence_patterns_count']}")
        
    finally:
        prefetch_manager.stop()

8.6 性能基准测试

8.6.1 基准测试框架

from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Callable, Tuple
import time
import threading
import statistics
import csv
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import psutil

class BenchmarkType(Enum):
    """基准测试类型"""
    THROUGHPUT = "throughput"  # 吞吐量测试
    LATENCY = "latency"  # 延迟测试
    STRESS = "stress"  # 压力测试
    ENDURANCE = "endurance"  # 耐久性测试
    SCALABILITY = "scalability"  # 可扩展性测试

class MetricType(Enum):
    """指标类型"""
    TPS = "tps"  # 每秒事务数
    QPS = "qps"  # 每秒查询数
    LATENCY_P50 = "latency_p50"  # P50延迟(中位数)
    LATENCY_P95 = "latency_p95"  # P95延迟
    LATENCY_P99 = "latency_p99"  # P99延迟
    CPU_USAGE = "cpu_usage"  # CPU使用率
    MEMORY_USAGE = "memory_usage"  # 内存使用率
    ERROR_RATE = "error_rate"  # 错误率

@dataclass
class BenchmarkResult:
    """基准测试结果"""
    test_name: str
    test_type: BenchmarkType
    start_time: datetime
    end_time: datetime
    duration: float  # 秒
    total_operations: int
    successful_operations: int
    failed_operations: int
    metrics: Dict[MetricType, float] = field(default_factory=dict)
    latencies: List[float] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)
    system_metrics: Dict[str, Any] = field(default_factory=dict)
    
    @property
    def success_rate(self) -> float:
        """成功率"""
        if self.total_operations == 0:
            return 0.0
        return self.successful_operations / self.total_operations
    
    @property
    def error_rate(self) -> float:
        """错误率"""
        return 1.0 - self.success_rate
    
    @property
    def throughput(self) -> float:
        """吞吐量(操作/秒)"""
        if self.duration == 0:
            return 0.0
        return self.successful_operations / self.duration

@dataclass
class BenchmarkConfig:
    """基准测试配置"""
    test_name: str
    test_type: BenchmarkType
    duration_seconds: Optional[int] = None  # 测试持续时间
    total_operations: Optional[int] = None  # 总操作数
    concurrent_threads: int = 1  # 并发线程数
    warmup_seconds: int = 10  # 预热时间
    cooldown_seconds: int = 5  # 冷却时间
    sample_interval: float = 1.0  # 采样间隔
    enable_gc: bool = True  # 启用垃圾回收监控(本示例框架中预留,未实际采集GC指标)
    
class SystemMonitor:
    """系统监控器"""
    
    def __init__(self, sample_interval: float = 1.0):
        self.sample_interval = sample_interval
        self.monitoring = False
        self.metrics = []
        self.monitor_thread = None
        
    def start(self):
        """开始监控"""
        self.monitoring = True
        self.metrics = []
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
    
    def stop(self) -> Dict[str, Any]:
        """停止监控并返回统计信息"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5)
        
        if not self.metrics:
            return {}
        
        # 计算统计信息
        cpu_values = [m['cpu_percent'] for m in self.metrics]
        memory_values = [m['memory_percent'] for m in self.metrics]
        
        return {
            'cpu_avg': statistics.mean(cpu_values),
            'cpu_max': max(cpu_values),
            'cpu_min': min(cpu_values),
            'memory_avg': statistics.mean(memory_values),
            'memory_max': max(memory_values),
            'memory_min': min(memory_values),
            'sample_count': len(self.metrics)
        }
    
    def _monitor_loop(self):
        """监控循环"""
        while self.monitoring:
            try:
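                # psutil.cpu_percent(interval=None) 为非阻塞采样,返回自上次调用以来的CPU占用;首次调用返回0.0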
                cpu_percent = psutil.cpu_percent(interval=None)
                memory = psutil.virtual_memory()
                
                self.metrics.append({
                    'timestamp': time.time(),
                    'cpu_percent': cpu_percent,
                    'memory_percent': memory.percent,
                    'memory_used': memory.used,
                    'memory_available': memory.available
                })
                
                time.sleep(self.sample_interval)
            except Exception as e:
                print(f"监控错误: {e}")
                break

class BenchmarkRunner:
    """基准测试运行器"""
    
    def __init__(self):
        self.results = []
        
    def run_benchmark(self, 
                     config: BenchmarkConfig,
                     test_function: Callable[[], Tuple[bool, float, Optional[str]]]) -> BenchmarkResult:
        """运行基准测试
        
        Args:
            config: 测试配置
            test_function: 测试函数,返回(成功标志, 延迟, 错误信息)
        """
        print(f"开始基准测试: {config.test_name}")
        
        # 预热
        if config.warmup_seconds > 0:
            print(f"预热 {config.warmup_seconds} 秒...")
            self._warmup(test_function, config.warmup_seconds)
        
        # 启动系统监控
        monitor = SystemMonitor(config.sample_interval)
        monitor.start()
        
        # 执行测试
        start_time = datetime.now()
        start_timestamp = time.time()
        
        if config.test_type == BenchmarkType.THROUGHPUT:
            result = self._run_throughput_test(config, test_function)
        elif config.test_type == BenchmarkType.LATENCY:
            result = self._run_latency_test(config, test_function)
        elif config.test_type == BenchmarkType.STRESS:
            result = self._run_stress_test(config, test_function)
        elif config.test_type == BenchmarkType.ENDURANCE:
            result = self._run_endurance_test(config, test_function)
        else:
            result = self._run_generic_test(config, test_function)
        
        end_time = datetime.now()
        end_timestamp = time.time()
        
        # 停止监控
        system_metrics = monitor.stop()
        
        # 冷却
        if config.cooldown_seconds > 0:
            print(f"冷却 {config.cooldown_seconds} 秒...")
            time.sleep(config.cooldown_seconds)
        
        # 创建结果对象
        benchmark_result = BenchmarkResult(
            test_name=config.test_name,
            test_type=config.test_type,
            start_time=start_time,
            end_time=end_time,
            duration=end_timestamp - start_timestamp,
            total_operations=result['total_operations'],
            successful_operations=result['successful_operations'],
            failed_operations=result['failed_operations'],
            latencies=result['latencies'],
            errors=result['errors'],
            system_metrics=system_metrics
        )
        
        # 计算指标
        self._calculate_metrics(benchmark_result)
        
        self.results.append(benchmark_result)
        print(f"测试完成: {config.test_name}")
        
        return benchmark_result
    
    def _warmup(self, test_function: Callable, duration: int):
        """预热: 持续调用测试函数,忽略返回值与异常"""
        end_time = time.time() + duration
        while time.time() < end_time:
            try:
                test_function()
            except Exception:
                pass
            time.sleep(0.01)
    
    def _run_throughput_test(self, config: BenchmarkConfig, test_function: Callable) -> Dict[str, Any]:
        """运行吞吐量测试"""
        results = {
            'total_operations': 0,
            'successful_operations': 0,
            'failed_operations': 0,
            'latencies': [],
            'errors': []
        }
        
        def worker():
            worker_results = {
                'total': 0,
                'success': 0,
                'failed': 0,
                'latencies': [],
                'errors': []
            }
            
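            # 两种运行模式: 指定了持续时间则按时间运行,否则按均分的总操作数运行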
            if config.duration_seconds:
                end_time = time.time() + config.duration_seconds
                while time.time() < end_time:
                    success, latency, error = test_function()
                    worker_results['total'] += 1
                    if success:
                        worker_results['success'] += 1
                        worker_results['latencies'].append(latency)
                    else:
                        worker_results['failed'] += 1
                        if error:
                            worker_results['errors'].append(error)
            else:
                # 总操作数均分到各线程(整除的余数被舍弃)
                operations_per_thread = (config.total_operations or 1000) // config.concurrent_threads
                for _ in range(operations_per_thread):
                    success, latency, error = test_function()
                    worker_results['total'] += 1
                    if success:
                        worker_results['success'] += 1
                        worker_results['latencies'].append(latency)
                    else:
                        worker_results['failed'] += 1
                        if error:
                            worker_results['errors'].append(error)
            
            return worker_results
        
        # 并发执行
        with ThreadPoolExecutor(max_workers=config.concurrent_threads) as executor:
            futures = [executor.submit(worker) for _ in range(config.concurrent_threads)]
            
            for future in as_completed(futures):
                worker_result = future.result()
                results['total_operations'] += worker_result['total']
                results['successful_operations'] += worker_result['success']
                results['failed_operations'] += worker_result['failed']
                results['latencies'].extend(worker_result['latencies'])
                results['errors'].extend(worker_result['errors'])
        
        return results
    
    def _run_latency_test(self, config: BenchmarkConfig, test_function: Callable) -> Dict[str, Any]:
        """运行延迟测试(单线程)"""
        results = {
            'total_operations': 0,
            'successful_operations': 0,
            'failed_operations': 0,
            'latencies': [],
            'errors': []
        }
        
        if config.duration_seconds:
            end_time = time.time() + config.duration_seconds
            while time.time() < end_time:
                success, latency, error = test_function()
                results['total_operations'] += 1
                if success:
                    results['successful_operations'] += 1
                    results['latencies'].append(latency)
                else:
                    results['failed_operations'] += 1
                    if error:
                        results['errors'].append(error)
        else:
            for _ in range(config.total_operations or 1000):
                success, latency, error = test_function()
                results['total_operations'] += 1
                if success:
                    results['successful_operations'] += 1
                    results['latencies'].append(latency)
                else:
                    results['failed_operations'] += 1
                    if error:
                        results['errors'].append(error)
        
        return results
    
    def _run_stress_test(self, config: BenchmarkConfig, test_function: Callable) -> Dict[str, Any]:
        """运行压力测试"""
        # 压力测试使用更多线程
        stress_config = BenchmarkConfig(
            test_name=config.test_name,
            test_type=config.test_type,
            duration_seconds=config.duration_seconds,
            total_operations=config.total_operations,
            concurrent_threads=config.concurrent_threads * 2,  # 双倍线程
            warmup_seconds=0,
            cooldown_seconds=0,
            sample_interval=config.sample_interval
        )
        
        return self._run_throughput_test(stress_config, test_function)
    
    def _run_endurance_test(self, config: BenchmarkConfig, test_function: Callable) -> Dict[str, Any]:
        """运行耐久性测试"""
        # 耐久性测试运行更长时间
        endurance_config = BenchmarkConfig(
            test_name=config.test_name,
            test_type=config.test_type,
            duration_seconds=config.duration_seconds or 3600,  # 默认1小时
            concurrent_threads=config.concurrent_threads,
            warmup_seconds=0,
            cooldown_seconds=0,
            sample_interval=config.sample_interval
        )
        
        return self._run_throughput_test(endurance_config, test_function)
    
    def _run_generic_test(self, config: BenchmarkConfig, test_function: Callable) -> Dict[str, Any]:
        """运行通用测试"""
        return self._run_throughput_test(config, test_function)
    
    def _calculate_metrics(self, result: BenchmarkResult):
        """计算指标"""
        if result.latencies:
            result.metrics[MetricType.LATENCY_P50] = statistics.median(result.latencies)
            result.metrics[MetricType.LATENCY_P95] = self._percentile(result.latencies, 95)
            result.metrics[MetricType.LATENCY_P99] = self._percentile(result.latencies, 99)
        
        result.metrics[MetricType.TPS] = result.throughput
        result.metrics[MetricType.QPS] = result.throughput
        result.metrics[MetricType.ERROR_RATE] = result.error_rate
        
        if result.system_metrics:
            result.metrics[MetricType.CPU_USAGE] = result.system_metrics.get('cpu_avg', 0)
            result.metrics[MetricType.MEMORY_USAGE] = result.system_metrics.get('memory_avg', 0)
    
    def _percentile(self, data: List[float], percentile: int) -> float:
        """计算百分位数(最近秩法,不做线性插值)"""
        if not data:
            return 0.0
        sorted_data = sorted(data)
        index = int(len(sorted_data) * percentile / 100)
        return sorted_data[min(index, len(sorted_data) - 1)]
    
    def generate_report(self, output_file: Optional[str] = None) -> str:
        """生成测试报告"""
        if not self.results:
            return "没有测试结果"
        
        report = []
        report.append("# RocketMQ 性能基准测试报告")
        report.append(f"\n生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"测试数量: {len(self.results)}\n")
        
        for result in self.results:
            report.append(f"## {result.test_name}")
            report.append(f"测试类型: {result.test_type.value}")
            report.append(f"测试时间: {result.start_time.strftime('%Y-%m-%d %H:%M:%S')} - {result.end_time.strftime('%Y-%m-%d %H:%M:%S')}")
            report.append(f"持续时间: {result.duration:.2f} 秒")
            report.append(f"总操作数: {result.total_operations:,}")
            report.append(f"成功操作数: {result.successful_operations:,}")
            report.append(f"失败操作数: {result.failed_operations:,}")
            report.append(f"成功率: {result.success_rate:.2%}")
            report.append(f"吞吐量: {result.throughput:.2f} ops/sec")
            
            if result.metrics:
                report.append("\n### 性能指标")
                for metric_type, value in result.metrics.items():
                    if metric_type in [MetricType.LATENCY_P50, MetricType.LATENCY_P95, MetricType.LATENCY_P99]:
                        report.append(f"{metric_type.value}: {value:.3f} ms")
                    elif metric_type == MetricType.ERROR_RATE:
                        report.append(f"{metric_type.value}: {value:.2%}")
                    elif metric_type in [MetricType.CPU_USAGE, MetricType.MEMORY_USAGE]:
                        report.append(f"{metric_type.value}: {value:.2f}%")  # psutil返回0~100的百分比数值,不可再用 :.2% 放大
                    else:
                        report.append(f"{metric_type.value}: {value:.2f}")
            
            if result.system_metrics:
                report.append("\n### 系统指标")
                for key, value in result.system_metrics.items():
                    if isinstance(value, float):
                        report.append(f"{key}: {value:.2f}")
                    else:
                        report.append(f"{key}: {value}")
            
            if result.errors:
                report.append(f"\n### 错误信息 (前10个)")
                for error in result.errors[:10]:
                    report.append(f"- {error}")
            
            report.append("\n" + "-" * 50 + "\n")
        
        report_text = "\n".join(report)
        
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report_text)
            print(f"报告已保存到: {output_file}")
        
        return report_text
    
    def export_csv(self, output_file: str):
        """导出CSV格式结果"""
        if not self.results:
            return
        
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            
            # 写入标题行
            headers = [
                'test_name', 'test_type', 'start_time', 'duration',
                'total_operations', 'successful_operations', 'failed_operations',
                'success_rate', 'throughput', 'latency_p50', 'latency_p95', 'latency_p99',
                'cpu_avg', 'memory_avg'
            ]
            writer.writerow(headers)
            
            # 写入数据行
            for result in self.results:
                row = [
                    result.test_name,
                    result.test_type.value,
                    result.start_time.strftime('%Y-%m-%d %H:%M:%S'),
                    result.duration,
                    result.total_operations,
                    result.successful_operations,
                    result.failed_operations,
                    result.success_rate,
                    result.throughput,
                    result.metrics.get(MetricType.LATENCY_P50, 0),
                    result.metrics.get(MetricType.LATENCY_P95, 0),
                    result.metrics.get(MetricType.LATENCY_P99, 0),
                    result.system_metrics.get('cpu_avg', 0),
                    result.system_metrics.get('memory_avg', 0)
                ]
                writer.writerow(row)
        
        print(f"CSV结果已保存到: {output_file}")

# 使用示例
if __name__ == "__main__":
    import random
    
    # 模拟测试函数
    def mock_send_message() -> Tuple[bool, float, Optional[str]]:
        """模拟发送消息"""
        start_time = time.time()
        
        # 模拟处理时间
        processing_time = random.uniform(0.001, 0.01)  # 1-10ms
        time.sleep(processing_time)
        
        # 模拟成功/失败
        success = random.random() > 0.05  # 95%成功率
        
        latency = (time.time() - start_time) * 1000  # 转换为毫秒
        error = None if success else "模拟发送失败"
        
        return success, latency, error
    
    # 创建基准测试运行器
    runner = BenchmarkRunner()
    
    # 吞吐量测试
    throughput_config = BenchmarkConfig(
        test_name="消息发送吞吐量测试",
        test_type=BenchmarkType.THROUGHPUT,
        duration_seconds=30,
        concurrent_threads=10,
        warmup_seconds=5
    )
    
    runner.run_benchmark(throughput_config, mock_send_message)
    
    # 延迟测试
    latency_config = BenchmarkConfig(
        test_name="消息发送延迟测试",
        test_type=BenchmarkType.LATENCY,
        total_operations=1000,
        concurrent_threads=1,
        warmup_seconds=5
    )
    
    runner.run_benchmark(latency_config, mock_send_message)
    
    # 生成报告
    report = runner.generate_report("benchmark_report.md")
    print(report)
    
    # 导出CSV
    runner.export_csv("benchmark_results.csv")
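
BenchmarkType 中定义了 SCALABILITY(可扩展性测试),但 BenchmarkRunner 并未为其单独实现,未知类型会回退到通用测试。下面给出一个最小示意(假设沿用上文的 BenchmarkRunner 与 BenchmarkConfig;run_scalability_test、thread_steps 均为示例命名):通过逐级提高并发线程数复用吞吐量测试,观察吞吐量随并发度的变化与扩展拐点。

def run_scalability_test(runner: BenchmarkRunner,
                         test_function: Callable[[], Tuple[bool, float, Optional[str]]],
                         thread_steps: Tuple[int, ...] = (1, 2, 4, 8, 16),
                         duration_seconds: int = 15) -> List[Tuple[int, float]]:
    """示意: 在不同并发度下重复吞吐量测试,返回 (线程数, 吞吐量) 列表"""
    results = []
    for threads in thread_steps:
        config = BenchmarkConfig(
            test_name=f"可扩展性测试_{threads}线程",
            test_type=BenchmarkType.THROUGHPUT,
            duration_seconds=duration_seconds,
            concurrent_threads=threads,
            warmup_seconds=2,
            cooldown_seconds=1
        )
        result = runner.run_benchmark(config, test_function)
        results.append((threads, result.throughput))
    # 若吞吐量随线程数增加趋于平缓甚至下降,说明达到扩展瓶颈
    for threads, tps in results:
        print(f"线程数 {threads}: {tps:.2f} ops/sec")
    return results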

8.6.2 RocketMQ专用基准测试

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Tuple
import time
import threading
import json
import uuid
# 假设使用 rocketmq-client-python 客户端;SendStatus/ConsumeStatus 的具体导入路径随客户端版本可能不同
from rocketmq.client import Producer, PushConsumer, Message, SendStatus, ConsumeStatus

class RocketMQTestType(Enum):
    """RocketMQ测试类型"""
    PRODUCER_THROUGHPUT = "producer_throughput"  # 生产者吞吐量
    CONSUMER_THROUGHPUT = "consumer_throughput"  # 消费者吞吐量
    END_TO_END_LATENCY = "end_to_end_latency"  # 端到端延迟
    BATCH_SEND = "batch_send"  # 批量发送
    ORDERED_MESSAGE = "ordered_message"  # 顺序消息
    TRANSACTION_MESSAGE = "transaction_message"  # 事务消息
    DELAYED_MESSAGE = "delayed_message"  # 延迟消息

@dataclass
class RocketMQBenchmarkConfig:
    """RocketMQ基准测试配置"""
    test_type: RocketMQTestType
    nameserver_addr: str
    topic: str
    group_id: str
    message_size: int = 1024  # 消息大小(字节)
    batch_size: int = 1  # 批量大小
    producer_count: int = 1  # 生产者数量
    consumer_count: int = 1  # 消费者数量
    test_duration: int = 60  # 测试持续时间(秒)
    warmup_duration: int = 10  # 预热时间(秒)
    
class RocketMQBenchmark:
    """RocketMQ基准测试"""
    
    def __init__(self, config: RocketMQBenchmarkConfig):
        self.config = config
        self.producers = []
        self.consumers = []
        self.results = {
            'sent_count': 0,
            'received_count': 0,
            'send_failures': 0,
            'consume_failures': 0,
            'send_latencies': [],
            'consume_latencies': [],
            'end_to_end_latencies': [],
            'errors': []
        }
        self.lock = threading.Lock()
        self.running = False
        
    def setup(self):
        """设置测试环境"""
        try:
            # 创建生产者
            for i in range(self.config.producer_count):
                producer = Producer(f"{self.config.group_id}_producer_{i}")
                producer.set_name_server_address(self.config.nameserver_addr)
                producer.start()
                self.producers.append(producer)
            
            # 创建消费者
            for i in range(self.config.consumer_count):
                consumer = PushConsumer(f"{self.config.group_id}_consumer_{i}")
                consumer.set_name_server_address(self.config.nameserver_addr)
                consumer.subscribe(self.config.topic, self._consume_callback)
                consumer.start()
                self.consumers.append(consumer)
                
            print(f"创建了 {len(self.producers)} 个生产者和 {len(self.consumers)} 个消费者")
            
        except Exception as e:
            print(f"设置失败: {e}")
            raise
    
    def cleanup(self):
        """清理测试环境"""
        # 停止生产者
        for producer in self.producers:
            try:
                producer.shutdown()
            except Exception:
                pass
        
        # 停止消费者
        for consumer in self.consumers:
            try:
                consumer.shutdown()
            except Exception:
                pass
        
        self.producers.clear()
        self.consumers.clear()
    
    def _consume_callback(self, message):
        """消费回调函数"""
        try:
            consume_start = time.time()
            
            # 解析消息获取发送时间
            message_data = json.loads(message.body.decode('utf-8'))
            send_time = message_data.get('send_time', consume_start)
            
            # 计算端到端延迟
            end_to_end_latency = (consume_start - send_time) * 1000
            
            with self.lock:
                self.results['received_count'] += 1
                self.results['end_to_end_latencies'].append(end_to_end_latency)
            
            # rocketmq-client-python 约定消费回调返回 ConsumeStatus 而非布尔值(此处按该客户端假设)
            return ConsumeStatus.CONSUME_SUCCESS
            
        except Exception as e:
            with self.lock:
                self.results['consume_failures'] += 1
                self.results['errors'].append(f"消费失败: {e}")
            return ConsumeStatus.RECONSUME_LATER
    
    def _create_message(self, producer_id: int, sequence: int) -> Message:
        """创建测试消息"""
        message_data = {
            'producer_id': producer_id,
            'sequence': sequence,
            'send_time': time.time(),
            'message_id': str(uuid.uuid4()),
            'payload': 'x' * max(0, self.config.message_size - 200)  # 预留约200字节元数据空间,并防止出现负数
        }
        
        message_body = json.dumps(message_data).encode('utf-8')
        message = Message(self.config.topic)
        message.set_body(message_body)
        
        return message
    
    def run_producer_throughput_test(self) -> Dict[str, Any]:
        """运行生产者吞吐量测试"""
        print("开始生产者吞吐量测试...")
        
        def producer_worker(producer_id: int):
            producer = self.producers[producer_id]
            sequence = 0
            
            while self.running:
                try:
                    send_start = time.time()
                    
                    if self.config.batch_size == 1:
                        # 单条发送
                        message = self._create_message(producer_id, sequence)
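                        # send_sync 同步发送,阻塞直至收到 Broker 响应,延迟即一次完整的请求往返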
                        result = producer.send_sync(message)
                        
                        if result.status == SendStatus.OK:
                            send_latency = (time.time() - send_start) * 1000
                            with self.lock:
                                self.results['sent_count'] += 1
                                self.results['send_latencies'].append(send_latency)
                        else:
                            with self.lock:
                                self.results['send_failures'] += 1
                    else:
                        # 批量发送
                        messages = []
                        for i in range(self.config.batch_size):
                            message = self._create_message(producer_id, sequence + i)
                            messages.append(message)
                        
                        # 注意: RocketMQ Python客户端可能不支持批量发送,这里模拟
                        success_count = 0
                        for message in messages:
                            result = producer.send_sync(message)
                            if result.status == SendStatus.OK:
                                success_count += 1
                        
                        send_latency = (time.time() - send_start) * 1000
                        with self.lock:
                            self.results['sent_count'] += success_count
                            self.results['send_failures'] += (self.config.batch_size - success_count)
                            self.results['send_latencies'].append(send_latency)
                    
                    sequence += self.config.batch_size
                    
                except Exception as e:
                    with self.lock:
                        self.results['send_failures'] += 1
                        self.results['errors'].append(f"发送失败: {e}")
        
        # 启动生产者线程
        self.running = True
        threads = []
        
        for i in range(len(self.producers)):
            thread = threading.Thread(target=producer_worker, args=(i,))
            thread.start()
            threads.append(thread)
        
        # 运行指定时间
        time.sleep(self.config.test_duration)
        self.running = False
        
        # 等待线程结束
        for thread in threads:
            thread.join()
        
        return self._calculate_results()
    
    def run_end_to_end_latency_test(self) -> Dict[str, Any]:
        """运行端到端延迟测试"""
        print("开始端到端延迟测试...")
        
        # 等待消费者准备就绪
        time.sleep(2)
        
        def latency_producer():
            producer = self.producers[0]
            sequence = 0
            
            while self.running:
                try:
                    message = self._create_message(0, sequence)
                    result = producer.send_sync(message)
                    
                    if result.status == SendStatus.OK:
                        with self.lock:
                            self.results['sent_count'] += 1
                    else:
                        with self.lock:
                            self.results['send_failures'] += 1
                    
                    sequence += 1
                    time.sleep(0.1)  # 控制发送频率
                    
                except Exception as e:
                    with self.lock:
                        self.results['send_failures'] += 1
                        self.results['errors'].append(f"发送失败: {e}")
        
        # 启动延迟测试
        self.running = True
        producer_thread = threading.Thread(target=latency_producer)
        producer_thread.start()
        
        # 运行指定时间
        time.sleep(self.config.test_duration)
        self.running = False
        
        producer_thread.join()
        
        # 等待消息消费完成
        time.sleep(5)
        
        return self._calculate_results()
    
    def run_ordered_message_test(self) -> Dict[str, Any]:
        """运行顺序消息测试"""
        print("开始顺序消息测试...")
        
        def ordered_producer():
            producer = self.producers[0]
            sequence = 0
            
            while self.running:
                try:
                    message = self._create_message(0, sequence)
                    # 以业务键标记顺序分组;真正的顺序消息还需保证同一 key 的消息
                    # 发往同一队列(需客户端的顺序发送/队列选择支持),这里仅作示意
                    message.set_keys(f"order_key_{sequence % 10}")
                    
                    result = producer.send_sync(message)
                    
                    if result.status == SendStatus.OK:
                        with self.lock:
                            self.results['sent_count'] += 1
                    else:
                        with self.lock:
                            self.results['send_failures'] += 1
                    
                    sequence += 1
                    time.sleep(0.01)  # 控制发送频率
                    
                except Exception as e:
                    with self.lock:
                        self.results['send_failures'] += 1
                        self.results['errors'].append(f"发送失败: {e}")
        
        self.running = True
        producer_thread = threading.Thread(target=ordered_producer)
        producer_thread.start()
        
        time.sleep(self.config.test_duration)
        self.running = False
        
        producer_thread.join()
        time.sleep(5)  # 等待消费完成
        
        return self._calculate_results()
    
    def run_delayed_message_test(self) -> Dict[str, Any]:
        """运行延迟消息测试"""
        print("开始延迟消息测试...")
        
        def delayed_producer():
            producer = self.producers[0]
            sequence = 0
            
            while self.running:
                try:
                    message = self._create_message(0, sequence)
                    # 设置延迟级别(默认1~18级依次对应 1s 5s 10s 30s 1m 2m 3m ... 2h)
                    message.set_delay_time_level(3)  # 第3级约为10秒延迟
                    
                    result = producer.send_sync(message)
                    
                    if result.status == SendStatus.OK:
                        with self.lock:
                            self.results['sent_count'] += 1
                    else:
                        with self.lock:
                            self.results['send_failures'] += 1
                    
                    sequence += 1
                    time.sleep(1)  # 控制发送频率
                    
                except Exception as e:
                    with self.lock:
                        self.results['send_failures'] += 1
                        self.results['errors'].append(f"发送失败: {e}")
        
        self.running = True
        producer_thread = threading.Thread(target=delayed_producer)
        producer_thread.start()
        
        time.sleep(self.config.test_duration)
        self.running = False
        
        producer_thread.join()
        time.sleep(15)  # 等待延迟消息和消费完成
        
        return self._calculate_results()
    
    def _calculate_results(self) -> Dict[str, Any]:
        """计算测试结果"""
        with self.lock:
            total_sent = self.results['sent_count']
            total_received = self.results['received_count']
            send_failures = self.results['send_failures']
            consume_failures = self.results['consume_failures']
            
            # 计算吞吐量
            send_tps = total_sent / self.config.test_duration if self.config.test_duration > 0 else 0
            receive_tps = total_received / self.config.test_duration if self.config.test_duration > 0 else 0
            
            # 计算延迟统计
            send_latencies = self.results['send_latencies']
            end_to_end_latencies = self.results['end_to_end_latencies']
            
            result = {
                'test_type': self.config.test_type.value,
                'duration': self.config.test_duration,
                'message_size': self.config.message_size,
                'batch_size': self.config.batch_size,
                'producer_count': self.config.producer_count,
                'consumer_count': self.config.consumer_count,
                'total_sent': total_sent,
                'total_received': total_received,
                'send_failures': send_failures,
                'consume_failures': consume_failures,
                'send_tps': send_tps,
                'receive_tps': receive_tps,
                'send_success_rate': total_sent / (total_sent + send_failures) if (total_sent + send_failures) > 0 else 0,
                'consume_success_rate': total_received / (total_received + consume_failures) if (total_received + consume_failures) > 0 else 0,
                'errors': self.results['errors'][:10]  # 只保留前10个错误
            }
            
            if send_latencies:
                result.update({
                    'send_latency_avg': sum(send_latencies) / len(send_latencies),
                    'send_latency_p50': self._percentile(send_latencies, 50),
                    'send_latency_p95': self._percentile(send_latencies, 95),
                    'send_latency_p99': self._percentile(send_latencies, 99)
                })
            
            if end_to_end_latencies:
                result.update({
                    'e2e_latency_avg': sum(end_to_end_latencies) / len(end_to_end_latencies),
                    'e2e_latency_p50': self._percentile(end_to_end_latencies, 50),
                    'e2e_latency_p95': self._percentile(end_to_end_latencies, 95),
                    'e2e_latency_p99': self._percentile(end_to_end_latencies, 99)
                })
            
            return result
    
    def _percentile(self, data: List[float], percentile: int) -> float:
        """计算百分位数(最近秩法,不做线性插值)"""
        if not data:
            return 0.0
        sorted_data = sorted(data)
        index = int(len(sorted_data) * percentile / 100)
        return sorted_data[min(index, len(sorted_data) - 1)]

class RocketMQBenchmarkSuite:
    """RocketMQ基准测试套件"""
    
    def __init__(self, nameserver_addr: str, base_topic: str, base_group: str):
        self.nameserver_addr = nameserver_addr
        self.base_topic = base_topic
        self.base_group = base_group
        self.results = []
    
    def run_full_benchmark(self) -> List[Dict[str, Any]]:
        """运行完整的基准测试套件"""
        test_configs = [
            # 生产者吞吐量测试
            RocketMQBenchmarkConfig(
                test_type=RocketMQTestType.PRODUCER_THROUGHPUT,
                nameserver_addr=self.nameserver_addr,
                topic=f"{self.base_topic}_throughput",
                group_id=f"{self.base_group}_throughput",
                message_size=1024,
                producer_count=5,
                consumer_count=5,
                test_duration=60
            ),
            
            # 端到端延迟测试
            RocketMQBenchmarkConfig(
                test_type=RocketMQTestType.END_TO_END_LATENCY,
                nameserver_addr=self.nameserver_addr,
                topic=f"{self.base_topic}_latency",
                group_id=f"{self.base_group}_latency",
                message_size=512,
                producer_count=1,
                consumer_count=1,
                test_duration=30
            ),
            
            # 批量发送测试
            RocketMQBenchmarkConfig(
                test_type=RocketMQTestType.BATCH_SEND,
                nameserver_addr=self.nameserver_addr,
                topic=f"{self.base_topic}_batch",
                group_id=f"{self.base_group}_batch",
                message_size=1024,
                batch_size=10,
                producer_count=3,
                consumer_count=3,
                test_duration=45
            ),
            
            # 顺序消息测试
            RocketMQBenchmarkConfig(
                test_type=RocketMQTestType.ORDERED_MESSAGE,
                nameserver_addr=self.nameserver_addr,
                topic=f"{self.base_topic}_ordered",
                group_id=f"{self.base_group}_ordered",
                message_size=512,
                producer_count=1,
                consumer_count=1,
                test_duration=30
            ),
            
            # 延迟消息测试
            RocketMQBenchmarkConfig(
                test_type=RocketMQTestType.DELAYED_MESSAGE,
                nameserver_addr=self.nameserver_addr,
                topic=f"{self.base_topic}_delayed",
                group_id=f"{self.base_group}_delayed",
                message_size=512,
                producer_count=1,
                consumer_count=1,
                test_duration=30
            )
        ]
        
        for config in test_configs:
            print(f"\n{'='*50}")
            print(f"开始测试: {config.test_type.value}")
            print(f"{'='*50}")
            
            benchmark = RocketMQBenchmark(config)
            
            try:
                benchmark.setup()
                
                # 预热: 等待客户端完成路由发现与连接建立
                if config.warmup_duration > 0:
                    print(f"预热 {config.warmup_duration} 秒...")
                    time.sleep(config.warmup_duration)
                
                # 运行测试
                if config.test_type == RocketMQTestType.PRODUCER_THROUGHPUT:
                    result = benchmark.run_producer_throughput_test()
                elif config.test_type == RocketMQTestType.END_TO_END_LATENCY:
                    result = benchmark.run_end_to_end_latency_test()
                elif config.test_type == RocketMQTestType.ORDERED_MESSAGE:
                    result = benchmark.run_ordered_message_test()
                elif config.test_type == RocketMQTestType.DELAYED_MESSAGE:
                    result = benchmark.run_delayed_message_test()
                else:
                    result = benchmark.run_producer_throughput_test()
                
                self.results.append(result)
                self._print_result(result)
                
            except Exception as e:
                print(f"测试失败: {e}")
                
            finally:
                benchmark.cleanup()
                time.sleep(5)  # 测试间隔
        
        return self.results
    
    def _print_result(self, result: Dict[str, Any]):
        """打印测试结果"""
        print(f"\n测试结果:")
        print(f"  测试类型: {result['test_type']}")
        print(f"  持续时间: {result['duration']} 秒")
        print(f"  消息大小: {result['message_size']} 字节")
        print(f"  发送TPS: {result['send_tps']:.2f}")
        print(f"  接收TPS: {result['receive_tps']:.2f}")
        print(f"  发送成功率: {result['send_success_rate']:.2%}")
        print(f"  消费成功率: {result['consume_success_rate']:.2%}")
        
        if 'send_latency_avg' in result:
            print(f"  发送延迟(平均): {result['send_latency_avg']:.2f} ms")
            print(f"  发送延迟(P95): {result['send_latency_p95']:.2f} ms")
        
        if 'e2e_latency_avg' in result:
            print(f"  端到端延迟(平均): {result['e2e_latency_avg']:.2f} ms")
            print(f"  端到端延迟(P95): {result['e2e_latency_p95']:.2f} ms")
        
        if result['errors']:
            print(f"  错误示例: {result['errors'][0]}")
    
    def generate_benchmark_report(self, output_file: str):
        """生成基准测试报告"""
        if not self.results:
            return
        
        report = []
        report.append("# RocketMQ 专用基准测试报告")
        report.append(f"\n生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"NameServer: {self.nameserver_addr}")
        report.append(f"测试数量: {len(self.results)}\n")
        
        for result in self.results:
            report.append(f"## {result['test_type']} 测试结果")
            report.append(f"- 持续时间: {result['duration']} 秒")
            report.append(f"- 消息大小: {result['message_size']} 字节")
            report.append(f"- 生产者数量: {result['producer_count']}")
            report.append(f"- 消费者数量: {result['consumer_count']}")
            report.append(f"- 发送TPS: {result['send_tps']:.2f}")
            report.append(f"- 接收TPS: {result['receive_tps']:.2f}")
            report.append(f"- 发送成功率: {result['send_success_rate']:.2%}")
            report.append(f"- 消费成功率: {result['consume_success_rate']:.2%}")
            
            if 'send_latency_avg' in result:
                report.append(f"- 发送延迟(平均): {result['send_latency_avg']:.2f} ms")
                report.append(f"- 发送延迟(P50): {result['send_latency_p50']:.2f} ms")
                report.append(f"- 发送延迟(P95): {result['send_latency_p95']:.2f} ms")
                report.append(f"- 发送延迟(P99): {result['send_latency_p99']:.2f} ms")
            
            if 'e2e_latency_avg' in result:
                report.append(f"- 端到端延迟(平均): {result['e2e_latency_avg']:.2f} ms")
                report.append(f"- 端到端延迟(P50): {result['e2e_latency_p50']:.2f} ms")
                report.append(f"- 端到端延迟(P95): {result['e2e_latency_p95']:.2f} ms")
                report.append(f"- 端到端延迟(P99): {result['e2e_latency_p99']:.2f} ms")
            
            report.append("")
        
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write('\n'.join(report))
        
        print(f"基准测试报告已保存到: {output_file}")

# 使用示例
if __name__ == "__main__":
    # 配置RocketMQ连接信息
    nameserver_addr = "localhost:9876"
    base_topic = "benchmark_test"
    base_group = "benchmark_group"
    
    # 创建基准测试套件
    benchmark_suite = RocketMQBenchmarkSuite(nameserver_addr, base_topic, base_group)
    
    try:
        # 运行完整的基准测试
        results = benchmark_suite.run_full_benchmark()
        
        # 生成报告
        benchmark_suite.generate_benchmark_report("rocketmq_benchmark_report.md")
        
        print(f"\n{'='*50}")
        print("所有基准测试完成!")
        print(f"{'='*50}")
        
    except Exception as e:
        print(f"基准测试失败: {e}")
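
RocketMQTestType 中还定义了 CONSUMER_THROUGHPUT(消费者吞吐量),但 run_full_benchmark 对未单独实现的类型会回退到生产者吞吐量测试。一个可行思路的最小示意如下(假设复用上文的 RocketMQBenchmark;run_consumer_throughput_test、preload_count 均为示例命名):先预灌一批消息,再统计消费端在固定时间窗内 received_count 的增量。

def run_consumer_throughput_test(benchmark: RocketMQBenchmark,
                                 preload_count: int = 10000) -> Dict[str, Any]:
    """示意: 消费者吞吐量测试——先预灌消息,再观察固定窗口内的消费增量"""
    producer = benchmark.producers[0]
    for seq in range(preload_count):  # 预灌消息(消费者已在 setup() 中订阅并持续消费)
        result = producer.send_sync(benchmark._create_message(0, seq))
        with benchmark.lock:
            if result.status == SendStatus.OK:
                benchmark.results['sent_count'] += 1
            else:
                benchmark.results['send_failures'] += 1
    
    with benchmark.lock:
        start_received = benchmark.results['received_count']
    time.sleep(benchmark.config.test_duration)  # 固定观察窗口
    with benchmark.lock:
        consumed = benchmark.results['received_count'] - start_received
    
    print(f"消费吞吐量: {consumed / benchmark.config.test_duration:.2f} msg/s")
    return benchmark._calculate_results()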

8.7 性能调优最佳实践

8.7.1 调优策略和方法

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Tuple
import json
import time
from abc import ABC, abstractmethod

class TuningPhase(Enum):
    """调优阶段"""
    BASELINE = "baseline"  # 基线测试
    ANALYSIS = "analysis"  # 性能分析
    OPTIMIZATION = "optimization"  # 优化实施
    VALIDATION = "validation"  # 验证测试
    MONITORING = "monitoring"  # 持续监控

class OptimizationTarget(Enum):
    """优化目标"""
    THROUGHPUT = "throughput"  # 吞吐量
    LATENCY = "latency"  # 延迟
    RESOURCE_USAGE = "resource_usage"  # 资源使用率
    STABILITY = "stability"  # 稳定性
    COST = "cost"  # 成本

class TuningScope(Enum):
    """调优范围"""
    BROKER = "broker"  # Broker配置
    PRODUCER = "producer"  # 生产者配置
    CONSUMER = "consumer"  # 消费者配置
    SYSTEM = "system"  # 系统配置
    NETWORK = "network"  # 网络配置
    STORAGE = "storage"  # 存储配置

@dataclass
class TuningParameter:
    """调优参数"""
    name: str
    scope: TuningScope
    current_value: Any
    recommended_value: Any
    impact_level: str  # high, medium, low
    description: str
    validation_method: str

@dataclass
class TuningResult:
    """调优结果"""
    parameter: TuningParameter
    before_metrics: Dict[str, float]
    after_metrics: Dict[str, float]
    improvement: Dict[str, float]
    success: bool
    notes: str

class PerformanceTuner(ABC):
    """性能调优器基类"""
    
    @abstractmethod
    def analyze_current_state(self) -> Dict[str, Any]:
        """分析当前状态"""
        pass
    
    @abstractmethod
    def generate_recommendations(self) -> List[TuningParameter]:
        """生成调优建议"""
        pass
    
    @abstractmethod
    def apply_tuning(self, parameter: TuningParameter) -> TuningResult:
        """应用调优"""
        pass

class BrokerTuner(PerformanceTuner):
    """Broker调优器"""
    
    def __init__(self, broker_config_path: str):
        self.broker_config_path = broker_config_path
        self.current_config = self._load_config()
        self.baseline_metrics = {}
    
    def _load_config(self) -> Dict[str, Any]:
        """加载当前配置"""
        try:
            with open(self.broker_config_path, 'r') as f:
                # 简化的配置解析
                config = {}
                for line in f:
                    if '=' in line and not line.strip().startswith('#'):
                        key, value = line.strip().split('=', 1)
                        config[key.strip()] = value.strip()
                return config
        except Exception as e:
            print(f"加载配置失败: {e}")
            return {}
    
    def analyze_current_state(self) -> Dict[str, Any]:
        """分析当前Broker状态"""
        analysis = {
            'config_analysis': self._analyze_config(),
            'resource_analysis': self._analyze_resources(),
            'performance_analysis': self._analyze_performance()
        }
        return analysis
    
    def _analyze_config(self) -> Dict[str, Any]:
        """分析配置"""
        config_issues = []
        recommendations = []
        
        # 检查关键配置项
        key_configs = {
            'sendMessageThreadPoolNums': {'min': 16, 'max': 64, 'default': 16},
            'pullMessageThreadPoolNums': {'min': 16, 'max': 64, 'default': 16},
            'flushDiskType': {'recommended': 'ASYNC_FLUSH'},
            'brokerRole': {'recommended': 'ASYNC_MASTER'},
            'deleteWhen': {'recommended': '04'},
            'fileReservedTime': {'min': 48, 'recommended': 72}  # 单位: 小时
        }
        
        for param, constraints in key_configs.items():
            current_value = self.current_config.get(param)
            
            if current_value is None:
                config_issues.append(f"缺少关键配置: {param}")
                if 'default' in constraints:
                    recommendations.append(f"设置 {param}={constraints['default']}")
                elif 'recommended' in constraints:
                    recommendations.append(f"设置 {param}={constraints['recommended']}")
            else:
                # 检查数值范围
                if 'min' in constraints and 'max' in constraints:
                    try:
                        value = int(current_value)
                        if value < constraints['min']:
                            recommendations.append(f"增加 {param} 到至少 {constraints['min']}")
                        elif value > constraints['max']:
                            recommendations.append(f"减少 {param} 到最多 {constraints['max']}")
                    except ValueError:
                        config_issues.append(f"配置值格式错误: {param}={current_value}")
                
                # 检查推荐值
                if 'recommended' in constraints and current_value != constraints['recommended']:
                    recommendations.append(f"建议设置 {param}={constraints['recommended']}")
        
        return {
            'issues': config_issues,
            'recommendations': recommendations,
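            # 简单评分: 每个配置问题扣10分,每条建议扣5分,下限为0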
            'config_score': max(0, 100 - len(config_issues) * 10 - len(recommendations) * 5)
        }
    
    def _analyze_resources(self) -> Dict[str, Any]:
        """分析资源使用"""
        # 模拟资源分析
        return {
            'memory_usage': 75.5,  # 百分比
            'cpu_usage': 45.2,
            'disk_usage': 60.8,
            'network_usage': 30.1,
            'gc_frequency': 12.5,  # 次/分钟
            'recommendations': [
                "考虑增加堆内存大小",
                "优化GC参数",
                "监控磁盘I/O性能"
            ]
        }
    
    def _analyze_performance(self) -> Dict[str, Any]:
        """分析性能指标"""
        # 模拟性能分析
        return {
            'send_tps': 8500,
            'consume_tps': 8200,
            'avg_send_latency': 2.5,  # ms
            'p99_send_latency': 15.8,
            'message_accumulation': 1250,
            'bottlenecks': [
                "磁盘写入延迟较高",
                "消费者处理速度不足",
                "网络带宽利用率偏低"
            ]
        }
    
    def generate_recommendations(self) -> List[TuningParameter]:
        """生成Broker调优建议"""
        analysis = self.analyze_current_state()
        recommendations = []
        
        # 基于分析结果生成具体的调优参数
        config_analysis = analysis['config_analysis']
        resource_analysis = analysis['resource_analysis']
        performance_analysis = analysis['performance_analysis']
        
        # 线程池调优
        if performance_analysis['send_tps'] > 5000:
            recommendations.append(TuningParameter(
                name="sendMessageThreadPoolNums",
                scope=TuningScope.BROKER,
                current_value=self.current_config.get('sendMessageThreadPoolNums', '16'),
                recommended_value='32',
                impact_level='high',
                description="增加发送消息线程池大小以提高吞吐量",
                validation_method="监控发送TPS和CPU使用率"
            ))
        
        # 内存调优
        if resource_analysis['memory_usage'] > 70:
            recommendations.append(TuningParameter(
                name="-Xmx",
                scope=TuningScope.BROKER,
                current_value="4g",
                recommended_value="6g",
                impact_level='high',
                description="增加JVM堆内存大小",
                validation_method="监控内存使用率和GC频率"
            ))
        
        # 磁盘刷盘策略
        if performance_analysis['avg_send_latency'] > 5:
            recommendations.append(TuningParameter(
                name="flushDiskType",
                scope=TuningScope.BROKER,
                current_value=self.current_config.get('flushDiskType', 'SYNC_FLUSH'),
                recommended_value='ASYNC_FLUSH',
                impact_level='medium',
                description="使用异步刷盘降低延迟",
                validation_method="监控发送延迟和数据安全性"
            ))
        
        # 消息存储优化
        recommendations.append(TuningParameter(
            name="mapedFileSizeCommitLog",
            scope=TuningScope.BROKER,
            current_value=self.current_config.get('mapedFileSizeCommitLog', '1073741824'),
            recommended_value='2147483648',  # 2GB
            impact_level='medium',
            description="增加CommitLog文件大小减少文件切换",
            validation_method="监控磁盘I/O和文件数量"
        ))
        
        return recommendations
    
    def apply_tuning(self, parameter: TuningParameter) -> TuningResult:
        """应用Broker调优"""
        # 记录调优前的指标
        before_metrics = self._collect_metrics()
        
        try:
            # 应用配置更改
            self._update_config(parameter.name, parameter.recommended_value)
            
            # 等待配置生效
            time.sleep(5)
            
            # 收集调优后的指标
            after_metrics = self._collect_metrics()
            
            # 计算改进
            improvement = self._calculate_improvement(before_metrics, after_metrics)
            
            return TuningResult(
                parameter=parameter,
                before_metrics=before_metrics,
                after_metrics=after_metrics,
                improvement=improvement,
                success=True,
                notes="调优成功应用"
            )
            
        except Exception as e:
            return TuningResult(
                parameter=parameter,
                before_metrics=before_metrics,
                after_metrics={},
                improvement={},
                success=False,
                notes=f"调优失败: {e}"
            )
    
    def _collect_metrics(self) -> Dict[str, float]:
        """收集性能指标"""
        # 模拟指标收集
        return {
            'send_tps': 8500 + (time.time() % 1000),
            'consume_tps': 8200 + (time.time() % 800),
            'avg_latency': 2.5 + (time.time() % 2),
            'memory_usage': 75.5 + (time.time() % 10),
            'cpu_usage': 45.2 + (time.time() % 20)
        }
    
    def _update_config(self, param_name: str, param_value: str):
        """更新配置"""
        # 模拟配置更新
        self.current_config[param_name] = param_value
        print(f"已更新配置: {param_name}={param_value}")
    
    def _calculate_improvement(self, before: Dict[str, float], after: Dict[str, float]) -> Dict[str, float]:
        """计算改进百分比"""
        improvement = {}
        for key in before:
            if key in after and before[key] > 0:
                improvement[key] = ((after[key] - before[key]) / before[key]) * 100
        return improvement

class TuningOrchestrator:
    """调优编排器"""
    
    def __init__(self):
        self.tuners = {}
        self.tuning_history = []
        self.current_phase = TuningPhase.BASELINE
    
    def register_tuner(self, scope: TuningScope, tuner: PerformanceTuner):
        """注册调优器"""
        self.tuners[scope] = tuner
    
    def execute_tuning_plan(self, target: OptimizationTarget, scopes: List[TuningScope]) -> Dict[str, Any]:
        """执行调优计划"""
        plan_results = {
            'target': target.value,
            'scopes': [scope.value for scope in scopes],
            'phases': {},
            'overall_success': True,
            'total_improvements': {}
        }
        
        # 阶段1: 基线测试
        self.current_phase = TuningPhase.BASELINE
        baseline_metrics = self._collect_baseline_metrics(scopes)
        plan_results['phases']['baseline'] = baseline_metrics
        
        # 阶段2: 分析
        self.current_phase = TuningPhase.ANALYSIS
        analysis_results = self._perform_analysis(scopes)
        plan_results['phases']['analysis'] = analysis_results
        
        # 阶段3: 优化实施
        self.current_phase = TuningPhase.OPTIMIZATION
        optimization_results = self._perform_optimization(scopes, target)
        plan_results['phases']['optimization'] = optimization_results
        
        # 阶段4: 验证
        self.current_phase = TuningPhase.VALIDATION
        validation_results = self._perform_validation(baseline_metrics)
        plan_results['phases']['validation'] = validation_results
        
        # 阶段5: 监控设置
        self.current_phase = TuningPhase.MONITORING
        monitoring_setup = self._setup_monitoring(scopes)
        plan_results['phases']['monitoring'] = monitoring_setup
        
        # 计算总体改进
        plan_results['total_improvements'] = self._calculate_total_improvements(
            baseline_metrics, validation_results
        )
        
        return plan_results
    
    def _collect_baseline_metrics(self, scopes: List[TuningScope]) -> Dict[str, Any]:
        """收集基线指标"""
        baseline = {}
        
        for scope in scopes:
            if scope in self.tuners:
                tuner = self.tuners[scope]
                baseline[scope.value] = tuner.analyze_current_state()
        
        return baseline
    
    def _perform_analysis(self, scopes: List[TuningScope]) -> Dict[str, Any]:
        """执行性能分析"""
        analysis = {}
        
        for scope in scopes:
            if scope in self.tuners:
                tuner = self.tuners[scope]
                recommendations = tuner.generate_recommendations()
                analysis[scope.value] = {
                    'recommendation_count': len(recommendations),
                    'high_impact_count': len([r for r in recommendations if r.impact_level == 'high']),
                    'recommendations': [{
                        'name': r.name,
                        'current': r.current_value,
                        'recommended': r.recommended_value,
                        'impact': r.impact_level,
                        'description': r.description
                    } for r in recommendations]
                }
        
        return analysis
    
    def _perform_optimization(self, scopes: List[TuningScope], target: OptimizationTarget) -> Dict[str, Any]:
        """执行优化"""
        optimization = {}
        
        for scope in scopes:
            if scope in self.tuners:
                tuner = self.tuners[scope]
                recommendations = tuner.generate_recommendations()
                
                # 根据目标过滤和排序建议
                filtered_recommendations = self._filter_recommendations_by_target(
                    recommendations, target
                )
                
                scope_results = []
                for recommendation in filtered_recommendations[:3]:  # 限制每次最多3个调优
                    result = tuner.apply_tuning(recommendation)
                    scope_results.append({
                        'parameter': recommendation.name,
                        'success': result.success,
                        'improvement': result.improvement,
                        'notes': result.notes
                    })
                    self.tuning_history.append(result)
                
                optimization[scope.value] = scope_results
        
        return optimization
    
    def _filter_recommendations_by_target(self, recommendations: List[TuningParameter], 
                                         target: OptimizationTarget) -> List[TuningParameter]:
        """根据目标过滤建议"""
        # 根据优化目标排序建议
        target_priorities = {
            OptimizationTarget.THROUGHPUT: ['sendMessageThreadPoolNums', 'pullMessageThreadPoolNums'],
            OptimizationTarget.LATENCY: ['flushDiskType', 'brokerRole'],
            OptimizationTarget.RESOURCE_USAGE: ['-Xmx', '-Xms'],
            OptimizationTarget.STABILITY: ['fileReservedTime', 'deleteWhen'],
            OptimizationTarget.COST: ['mapedFileSizeCommitLog', 'mapedFileSizeConsumeQueue']
        }
        
        priority_params = target_priorities.get(target, [])
        
        # 按优先级和影响级别排序
        def sort_key(rec):
            priority_score = 0 if rec.name in priority_params else 1
            impact_score = {'high': 0, 'medium': 1, 'low': 2}[rec.impact_level]
            return (priority_score, impact_score)
        
        return sorted(recommendations, key=sort_key)
    
    def _perform_validation(self, baseline_metrics: Dict[str, Any]) -> Dict[str, Any]:
        """执行验证测试"""
        # 模拟验证测试
        validation = {
            'test_duration': 300,  # 5分钟
            'metrics_comparison': {},
            'regression_detected': False,
            'validation_passed': True
        }
        
        # 模拟指标对比
        for scope in baseline_metrics:
            validation['metrics_comparison'][scope] = {
                'throughput_improvement': 15.5,
                'latency_improvement': -8.2,  # 负数表示延迟降低
                'resource_efficiency': 12.3
            }
        
        return validation
    
    def _setup_monitoring(self, scopes: List[TuningScope]) -> Dict[str, Any]:
        """设置监控"""
        monitoring = {
            'alerts_configured': [],
            'dashboards_created': [],
            'monitoring_frequency': '1分钟'
        }
        
        for scope in scopes:
            monitoring['alerts_configured'].append(f"{scope.value}_performance_alert")
            monitoring['dashboards_created'].append(f"{scope.value}_performance_dashboard")
        
        return monitoring
    
    def _calculate_total_improvements(self, baseline: Dict[str, Any], 
                                    validation: Dict[str, Any]) -> Dict[str, float]:
        """计算总体改进"""
        total_improvements = {
            'throughput': 0.0,
            'latency': 0.0,
            'resource_efficiency': 0.0
        }
        
        if 'metrics_comparison' in validation:
            comparisons = validation['metrics_comparison']
            scope_count = len(comparisons)
            
            if scope_count > 0:
                for scope_data in comparisons.values():
                    total_improvements['throughput'] += scope_data.get('throughput_improvement', 0)
                    total_improvements['latency'] += scope_data.get('latency_improvement', 0)
                    total_improvements['resource_efficiency'] += scope_data.get('resource_efficiency', 0)
                
                # 计算平均值
                for key in total_improvements:
                    total_improvements[key] /= scope_count
        
        return total_improvements
    
    def generate_tuning_report(self, output_file: str):
        """生成调优报告"""
        if not self.tuning_history:
            return
        
        report = []
        report.append("# RocketMQ 性能调优报告")
        report.append(f"\n生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"调优项目数量: {len(self.tuning_history)}\n")
        
        # 成功率统计
        successful_tunings = [t for t in self.tuning_history if t.success]
        success_rate = len(successful_tunings) / len(self.tuning_history) * 100
        report.append(f"## 调优概览")
        report.append(f"- 总调优项目: {len(self.tuning_history)}")
        report.append(f"- 成功项目: {len(successful_tunings)}")
        report.append(f"- 成功率: {success_rate:.1f}%\n")
        
        # 详细结果
        report.append("## 调优详情")
        for i, result in enumerate(self.tuning_history, 1):
            report.append(f"### {i}. {result.parameter.name}")
            report.append(f"- 范围: {result.parameter.scope.value}")
            report.append(f"- 调优前: {result.parameter.current_value}")
            report.append(f"- 调优后: {result.parameter.recommended_value}")
            report.append(f"- 影响级别: {result.parameter.impact_level}")
            report.append(f"- 状态: {'成功' if result.success else '失败'}")
            
            if result.improvement:
                report.append("- 改进情况:")
                for metric, improvement in result.improvement.items():
                    report.append(f"  - {metric}: {improvement:+.2f}%")
            
            report.append(f"- 说明: {result.parameter.description}")
            report.append(f"- 备注: {result.notes}\n")
        
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write('\n'.join(report))
        
        print(f"调优报告已保存到: {output_file}")

# 使用示例
if __name__ == "__main__":
    # 创建调优编排器
    orchestrator = TuningOrchestrator()
    
    # 注册Broker调优器
    broker_tuner = BrokerTuner("/opt/rocketmq/conf/broker.conf")
    orchestrator.register_tuner(TuningScope.BROKER, broker_tuner)
    
    # 执行吞吐量优化计划
    print("开始执行RocketMQ性能调优...")
    results = orchestrator.execute_tuning_plan(
        target=OptimizationTarget.THROUGHPUT,
        scopes=[TuningScope.BROKER]
    )
    
    # 打印结果摘要
    print(f"\n调优完成!")
    print(f"目标: {results['target']}")
    print(f"范围: {', '.join(results['scopes'])}")
    print(f"总体改进:")
    for metric, improvement in results['total_improvements'].items():
        print(f"  {metric}: {improvement:+.2f}%")
    
    # 生成详细报告
    orchestrator.generate_tuning_report("rocketmq_tuning_report.md")

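需要注意,apply_tuning失败时配置可能已被_update_config部分修改,生产环境中通常要配合回滚机制使用。下面是一个极简的回滚包装示意(假设tuner对象暴露可读写的current_config,接口仅为示意):

# 示意: 带回滚的调优包装(假设tuner.current_config可读写)
def apply_with_rollback(tuner, parameter):
    snapshot = dict(tuner.current_config)  # 记录调优前的配置快照
    result = tuner.apply_tuning(parameter)
    if not result.success:
        # 调优失败,恢复到调优前的配置
        tuner.current_config.clear()
        tuner.current_config.update(snapshot)
    return result
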
8.7.2 配置模板和最佳实践

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
import json
import os
import time

class DeploymentEnvironment(Enum):
    """部署环境"""
    DEVELOPMENT = "development"
    TESTING = "testing"
    STAGING = "staging"
    PRODUCTION = "production"

class WorkloadType(Enum):
    """工作负载类型"""
    HIGH_THROUGHPUT = "high_throughput"  # 高吞吐量
    LOW_LATENCY = "low_latency"  # 低延迟
    MIXED = "mixed"  # 混合负载
    BATCH_PROCESSING = "batch_processing"  # 批处理
    REAL_TIME = "real_time"  # 实时处理

class HardwareProfile(Enum):
    """硬件配置"""
    SMALL = "small"  # 小型配置
    MEDIUM = "medium"  # 中型配置
    LARGE = "large"  # 大型配置
    XLARGE = "xlarge"  # 超大型配置

@dataclass
class ConfigurationTemplate:
    """配置模板"""
    name: str
    environment: DeploymentEnvironment
    workload_type: WorkloadType
    hardware_profile: HardwareProfile
    broker_config: Dict[str, Any]
    jvm_config: Dict[str, Any]
    system_config: Dict[str, Any]
    description: str
    use_cases: List[str]

class ConfigurationTemplateManager:
    """配置模板管理器"""
    
    def __init__(self):
        self.templates = self._initialize_templates()
    
    def _initialize_templates(self) -> Dict[str, ConfigurationTemplate]:
        """初始化配置模板"""
        templates = {}
        
        # 高吞吐量生产环境模板
        templates['prod_high_throughput'] = ConfigurationTemplate(
            name="生产环境高吞吐量配置",
            environment=DeploymentEnvironment.PRODUCTION,
            workload_type=WorkloadType.HIGH_THROUGHPUT,
            hardware_profile=HardwareProfile.LARGE,
            broker_config={
                # 基础配置
                'brokerClusterName': 'DefaultCluster',
                'brokerName': 'broker-a',
                'brokerId': '0',
                'brokerRole': 'ASYNC_MASTER',
                'deleteWhen': '04',
                'fileReservedTime': '48',
                
                # 网络配置
                'listenPort': '10911',
                'namesrvAddr': 'localhost:9876',
                
                # 存储配置
                'storePathRootDir': '/opt/rocketmq/store',
                'storePathCommitLog': '/opt/rocketmq/store/commitlog',
                'mapedFileSizeCommitLog': '2147483648',  # 2GB
                'mapedFileSizeConsumeQueue': '6000000',
                'mapedFileSizeConsumeQueueExt': '50331648',
                
                # 刷盘配置
                'flushDiskType': 'ASYNC_FLUSH',
                'flushIntervalCommitLog': '1000',
                'flushCommitLogTimed': 'false',
                'flushIntervalConsumeQueue': '1000',
                
                # 线程池配置
                'sendMessageThreadPoolNums': '64',
                'pullMessageThreadPoolNums': '64',
                'queryMessageThreadPoolNums': '32',
                'adminBrokerThreadPoolNums': '16',
                'clientManagerThreadPoolNums': '32',
                'consumerManagerThreadPoolNums': '32',
                
                # 性能优化
                'useReentrantLockWhenPutMessage': 'true',
                'warmMapedFileEnable': 'true',
                'offsetCheckInSlave': 'true',
                'debugLockEnable': 'false',
                'transferMsgByHeap': 'true',
                'maxTransferBytesOnMessageInMemory': '262144',
                'maxTransferCountOnMessageInMemory': '32',
                'maxTransferBytesOnMessageInDisk': '65536',
                'maxTransferCountOnMessageInDisk': '8',
                
                # 消息限制
                'maxMessageSize': '4194304',  # 4MB
                'osPageCacheBusyTimeOutMills': '1000',
                'defaultQueryMaxNum': '32',
                
                # 高可用配置
                'syncFlushTimeout': '5000',
                'slaveReadEnable': 'false',
                'haMasterAddress': '',
                'haListenPort': '10912'
            },
            jvm_config={
                'heap_size': '8g',
                'young_gen': '4g',
                'gc_algorithm': 'G1GC',
                'gc_options': [
                    '-XX:+UseG1GC',
                    '-XX:G1HeapRegionSize=16m',
                    '-XX:G1ReservePercent=25',
                    '-XX:InitiatingHeapOccupancyPercent=30',
                    '-XX:SoftRefLRUPolicyMSPerMB=0',
                    '-XX:SurvivorRatio=8',
                    '-XX:+DisableExplicitGC',
                    '-XX:+UseStringDeduplication'
                ],
                'jvm_options': [
                    '-server',
                    '-Xms8g',
                    '-Xmx8g',
                    '-Xmn4g',
                    '-XX:+UseLargePages',
                    '-XX:+AlwaysPreTouch',
                    '-XX:MaxDirectMemorySize=15g',
                    '-XX:+PrintGCDetails',
                    '-XX:+PrintGCTimeStamps',
                    '-XX:+PrintGCApplicationStoppedTime',
                    '-XX:+PrintAdaptiveSizePolicy',
                    '-Xloggc:/opt/rocketmq/logs/rmq_broker_gc_%p.log',
                    '-XX:+UseGCLogFileRotation',
                    '-XX:NumberOfGCLogFiles=5',
                    '-XX:GCLogFileSize=30m'
                ]
            },
            system_config={
                'vm.swappiness': '1',
                'vm.max_map_count': '655360',
                'net.core.somaxconn': '65535',
                'net.core.netdev_max_backlog': '65535',
                'net.ipv4.tcp_max_syn_backlog': '65535',
                'net.ipv4.tcp_keepalive_time': '600',
                'net.ipv4.tcp_keepalive_intvl': '30',
                'net.ipv4.tcp_keepalive_probes': '3'
            },
            description="适用于生产环境的高吞吐量配置,支持每秒数万条消息的处理",
            use_cases=[
                "电商订单处理",
                "日志收集系统",
                "数据同步管道",
                "批量数据处理"
            ]
        )
        
        # 低延迟生产环境模板
        templates['prod_low_latency'] = ConfigurationTemplate(
            name="生产环境低延迟配置",
            environment=DeploymentEnvironment.PRODUCTION,
            workload_type=WorkloadType.LOW_LATENCY,
            hardware_profile=HardwareProfile.LARGE,
            broker_config={
                # 基础配置
                'brokerClusterName': 'DefaultCluster',
                'brokerName': 'broker-a',
                'brokerId': '0',
                'brokerRole': 'SYNC_MASTER',  # 同步主从保证一致性
                'deleteWhen': '04',
                'fileReservedTime': '72',
                
                # 网络配置
                'listenPort': '10911',
                'namesrvAddr': 'localhost:9876',
                
                # 存储配置
                'storePathRootDir': '/opt/rocketmq/store',
                'storePathCommitLog': '/opt/rocketmq/store/commitlog',
                'mapedFileSizeCommitLog': '1073741824',  # 1GB,减少文件大小
                'mapedFileSizeConsumeQueue': '3000000',
                
                # 刷盘配置 - 优化延迟
                'flushDiskType': 'SYNC_FLUSH',  # 同步刷盘保证数据安全
                'flushIntervalCommitLog': '500',  # 更频繁的刷盘
                'flushCommitLogTimed': 'true',
                'flushIntervalConsumeQueue': '500',
                
                # 线程池配置 - 减少线程数避免上下文切换
                'sendMessageThreadPoolNums': '32',
                'pullMessageThreadPoolNums': '32',
                'queryMessageThreadPoolNums': '16',
                'adminBrokerThreadPoolNums': '8',
                'clientManagerThreadPoolNums': '16',
                'consumerManagerThreadPoolNums': '16',
                
                # 性能优化 - 针对延迟优化
                'useReentrantLockWhenPutMessage': 'false',  # 减少锁竞争
                'warmMapedFileEnable': 'true',
                'offsetCheckInSlave': 'true',
                'debugLockEnable': 'false',
                'transferMsgByHeap': 'false',  # 使用直接内存
                'maxTransferBytesOnMessageInMemory': '131072',
                'maxTransferCountOnMessageInMemory': '16',
                
                # 消息限制
                'maxMessageSize': '1048576',  # 1MB,限制消息大小
                'osPageCacheBusyTimeOutMills': '500',
                'defaultQueryMaxNum': '16',
                
                # 高可用配置
                'syncFlushTimeout': '2000',  # 减少同步超时
                'slaveReadEnable': 'true',
                'haMasterAddress': '',
                'haListenPort': '10912'
            },
            jvm_config={
                'heap_size': '6g',
                'young_gen': '2g',
                'gc_algorithm': 'ParallelGC',  # 使用并行GC减少停顿
                'gc_options': [
                    '-XX:+UseParallelGC',
                    '-XX:+UseParallelOldGC',
                    '-XX:ParallelGCThreads=8',
                    '-XX:+UseAdaptiveSizePolicy',
                    '-XX:SoftRefLRUPolicyMSPerMB=0',
                    '-XX:SurvivorRatio=8',
                    '-XX:+DisableExplicitGC'
                ],
                'jvm_options': [
                    '-server',
                    '-Xms6g',
                    '-Xmx6g',
                    '-Xmn2g',
                    '-XX:+UseLargePages',
                    '-XX:+AlwaysPreTouch',
                    '-XX:MaxDirectMemorySize=10g',
                    '-XX:+PrintGCDetails',
                    '-XX:+PrintGCTimeStamps',
                    '-Xloggc:/opt/rocketmq/logs/rmq_broker_gc_%p.log'
                ]
            },
            system_config={
                'vm.swappiness': '1',
                'vm.max_map_count': '655360',
                'net.core.somaxconn': '32768',
                'net.core.netdev_max_backlog': '32768',
                'net.ipv4.tcp_max_syn_backlog': '32768',
                'net.ipv4.tcp_keepalive_time': '300',
                'net.ipv4.tcp_keepalive_intvl': '15',
                'net.ipv4.tcp_keepalive_probes': '3'
                # 注: 禁用Nagle算法(TCP_NODELAY)是socket级选项,应由应用在建连时设置,并非sysctl内核参数
            },
            description="适用于对延迟敏感的生产环境,优化消息传输延迟",
            use_cases=[
                "实时交易系统",
                "在线游戏",
                "实时监控告警",
                "金融风控系统"
            ]
        )
        
        # 开发环境模板
        templates['dev_general'] = ConfigurationTemplate(
            name="开发环境通用配置",
            environment=DeploymentEnvironment.DEVELOPMENT,
            workload_type=WorkloadType.MIXED,
            hardware_profile=HardwareProfile.SMALL,
            broker_config={
                # 基础配置
                'brokerClusterName': 'DevCluster',
                'brokerName': 'dev-broker',
                'brokerId': '0',
                'brokerRole': 'ASYNC_MASTER',
                'deleteWhen': '04',
                'fileReservedTime': '24',  # 开发环境保留时间短
                
                # 网络配置
                'listenPort': '10911',
                'namesrvAddr': 'localhost:9876',
                
                # 存储配置
                'storePathRootDir': '/tmp/rocketmq/store',
                'storePathCommitLog': '/tmp/rocketmq/store/commitlog',
                'mapedFileSizeCommitLog': '268435456',  # 256MB
                'mapedFileSizeConsumeQueue': '1500000',
                
                # 刷盘配置
                'flushDiskType': 'ASYNC_FLUSH',
                'flushIntervalCommitLog': '5000',
                'flushCommitLogTimed': 'false',
                'flushIntervalConsumeQueue': '5000',
                
                # 线程池配置 - 开发环境使用较少线程
                'sendMessageThreadPoolNums': '8',
                'pullMessageThreadPoolNums': '8',
                'queryMessageThreadPoolNums': '4',
                'adminBrokerThreadPoolNums': '4',
                'clientManagerThreadPoolNums': '8',
                'consumerManagerThreadPoolNums': '8',
                
                # 性能优化
                'useReentrantLockWhenPutMessage': 'true',
                'warmMapedFileEnable': 'false',  # 开发环境不需要预热
                'offsetCheckInSlave': 'false',
                'debugLockEnable': 'true',  # 开发环境启用调试
                'transferMsgByHeap': 'true',
                
                # 消息限制
                'maxMessageSize': '2097152',  # 2MB
                'osPageCacheBusyTimeOutMills': '2000',
                'defaultQueryMaxNum': '16'
            },
            jvm_config={
                'heap_size': '2g',
                'young_gen': '1g',
                'gc_algorithm': 'G1GC',
                'gc_options': [
                    '-XX:+UseG1GC',
                    '-XX:G1HeapRegionSize=8m',
                    '-XX:+DisableExplicitGC'
                ],
                'jvm_options': [
                    '-server',
                    '-Xms2g',
                    '-Xmx2g',
                    '-Xmn1g',
                    '-XX:MaxDirectMemorySize=3g',
                    '-XX:+PrintGCDetails',
                    '-Xloggc:/tmp/rocketmq/logs/rmq_broker_gc.log'
                ]
            },
            system_config={
                'vm.swappiness': '10',
                'vm.max_map_count': '262144'
            },
            description="适用于开发和测试环境的轻量级配置",
            use_cases=[
                "功能开发测试",
                "集成测试",
                "性能测试",
                "学习和实验"
            ]
        )
        
        return templates
    
    def get_template(self, template_name: str) -> Optional[ConfigurationTemplate]:
        """获取配置模板"""
        return self.templates.get(template_name)
    
    def list_templates(self, environment: Optional[DeploymentEnvironment] = None,
                      workload_type: Optional[WorkloadType] = None) -> List[ConfigurationTemplate]:
        """列出配置模板"""
        templates = list(self.templates.values())
        
        if environment:
            templates = [t for t in templates if t.environment == environment]
        
        if workload_type:
            templates = [t for t in templates if t.workload_type == workload_type]
        
        return templates
    
    def recommend_template(self, environment: DeploymentEnvironment,
                          workload_type: WorkloadType,
                          hardware_profile: HardwareProfile) -> Optional[ConfigurationTemplate]:
        """推荐配置模板"""
        # 精确匹配
        for template in self.templates.values():
            if (template.environment == environment and
                template.workload_type == workload_type and
                template.hardware_profile == hardware_profile):
                return template
        
        # 部分匹配
        for template in self.templates.values():
            if (template.environment == environment and
                template.workload_type == workload_type):
                return template
        
        # 环境匹配
        for template in self.templates.values():
            if template.environment == environment:
                return template
        
        return None
    
    def generate_broker_config(self, template_name: str, output_path: str,
                              custom_overrides: Optional[Dict[str, Any]] = None):
        """生成Broker配置文件"""
        template = self.get_template(template_name)
        if not template:
            raise ValueError(f"模板不存在: {template_name}")
        
        config = template.broker_config.copy()
        
        # 应用自定义覆盖
        if custom_overrides:
            config.update(custom_overrides)
        
        # 生成配置文件
        config_lines = []
        config_lines.append(f"# RocketMQ Broker配置")
        config_lines.append(f"# 模板: {template.name}")
        config_lines.append(f"# 环境: {template.environment.value}")
        config_lines.append(f"# 工作负载: {template.workload_type.value}")
        config_lines.append(f"# 硬件配置: {template.hardware_profile.value}")
        config_lines.append(f"# 生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
        config_lines.append("")
        
        for key, value in config.items():
            config_lines.append(f"{key}={value}")
        
        # 确保输出目录存在
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(config_lines))
        
        print(f"Broker配置文件已生成: {output_path}")
    
    def generate_jvm_config(self, template_name: str, output_path: str):
        """生成JVM配置文件"""
        template = self.get_template(template_name)
        if not template:
            raise ValueError(f"模板不存在: {template_name}")
        
        jvm_config = template.jvm_config
        
        # 生成JVM启动脚本
        script_lines = []
        script_lines.append("#!/bin/bash")
        script_lines.append(f"# RocketMQ Broker JVM配置")
        script_lines.append(f"# 模板: {template.name}")
        script_lines.append("")
        
        # JVM选项
        jvm_opts = jvm_config['jvm_options'] + jvm_config['gc_options']
        script_lines.append("JAVA_OPT=\"\\")
        for opt in jvm_opts:
            script_lines.append(f"  {opt} \\")
        script_lines.append("\"")
        script_lines.append("")
        script_lines.append("export JAVA_OPT")
        
        # 确保输出目录存在
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(script_lines))
        
        # 设置执行权限
        os.chmod(output_path, 0o755)
        
        print(f"JVM配置脚本已生成: {output_path}")
    
    def generate_system_config(self, template_name: str, output_path: str):
        """生成系统配置文件"""
        template = self.get_template(template_name)
        if not template:
            raise ValueError(f"模板不存在: {template_name}")
        
        system_config = template.system_config
        
        # 生成sysctl配置
        config_lines = []
        config_lines.append(f"# RocketMQ系统优化配置")
        config_lines.append(f"# 模板: {template.name}")
        config_lines.append("")
        
        for key, value in system_config.items():
            config_lines.append(f"{key} = {value}")
        
        # 确保输出目录存在
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(config_lines))
        
        print(f"系统配置文件已生成: {output_path}")
    
    def generate_complete_config(self, template_name: str, output_dir: str,
                               custom_overrides: Optional[Dict[str, Any]] = None):
        """生成完整的配置文件集合"""
        template = self.get_template(template_name)
        if not template:
            raise ValueError(f"模板不存在: {template_name}")
        
        # 确保输出目录存在
        os.makedirs(output_dir, exist_ok=True)
        
        # 生成各种配置文件
        self.generate_broker_config(
            template_name,
            os.path.join(output_dir, 'broker.conf'),
            custom_overrides
        )
        
        self.generate_jvm_config(
            template_name,
            os.path.join(output_dir, 'jvm.sh')
        )
        
        self.generate_system_config(
            template_name,
            os.path.join(output_dir, 'sysctl.conf')
        )
        
        # 生成README文件
        readme_path = os.path.join(output_dir, 'README.md')
        self._generate_readme(template, readme_path)
        
        print(f"完整配置已生成到目录: {output_dir}")
    
    def _generate_readme(self, template: ConfigurationTemplate, output_path: str):
        """生成README文件"""
        readme_lines = []
        readme_lines.append(f"# {template.name}")
        readme_lines.append("")
        readme_lines.append(f"**描述**: {template.description}")
        readme_lines.append("")
        readme_lines.append("## 配置信息")
        readme_lines.append(f"- 环境: {template.environment.value}")
        readme_lines.append(f"- 工作负载类型: {template.workload_type.value}")
        readme_lines.append(f"- 硬件配置: {template.hardware_profile.value}")
        readme_lines.append("")
        readme_lines.append("## 适用场景")
        for use_case in template.use_cases:
            readme_lines.append(f"- {use_case}")
        readme_lines.append("")
        readme_lines.append("## 文件说明")
        readme_lines.append("- `broker.conf`: Broker配置文件")
        readme_lines.append("- `jvm.sh`: JVM启动参数脚本")
        readme_lines.append("- `sysctl.conf`: 系统内核参数配置")
        readme_lines.append("")
        readme_lines.append("## 使用方法")
        readme_lines.append("1. 将broker.conf复制到RocketMQ配置目录")
        readme_lines.append("2. 在启动脚本中引用jvm.sh")
        readme_lines.append("3. 使用sysctl -p应用系统配置")
        
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(readme_lines))

# 使用示例
if __name__ == "__main__":
    # 创建配置模板管理器
    template_manager = ConfigurationTemplateManager()
    
    # 列出所有模板
    print("可用的配置模板:")
    for template in template_manager.list_templates():
        print(f"- {template.name} ({template.environment.value}, {template.workload_type.value})")
    
    # 推荐模板
    recommended = template_manager.recommend_template(
        environment=DeploymentEnvironment.PRODUCTION,
        workload_type=WorkloadType.HIGH_THROUGHPUT,
        hardware_profile=HardwareProfile.LARGE
    )
    
    if recommended:
        print(f"\n推荐模板: {recommended.name}")
        print(f"描述: {recommended.description}")
        
        # 生成完整配置
        template_manager.generate_complete_config(
            'prod_high_throughput',
            '/tmp/rocketmq_config',
            custom_overrides={
                'brokerName': 'prod-broker-01',
                'namesrvAddr': 'nameserver1:9876;nameserver2:9876'
            }
        )

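配置生成后,可以用几行代码抽查关键参数是否与预期一致,避免模板覆盖出错(路径与键名沿用上面的示例,仅为示意):

# 示意: 读取生成的broker.conf并抽查关键参数
expected = {
    'flushDiskType': 'ASYNC_FLUSH',
    'brokerName': 'prod-broker-01'
}
with open('/tmp/rocketmq_config/broker.conf', encoding='utf-8') as f:
    entries = [line.strip() for line in f
               if '=' in line and not line.startswith('#')]
conf = dict(entry.split('=', 1) for entry in entries)
for key, value in expected.items():
    assert conf.get(key) == value, f"{key}: 期望{value}, 实际{conf.get(key)}"
print("关键参数抽查通过")
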
8.7.3 故障排查和性能诊断

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Tuple
import re
import json
import time
from datetime import datetime, timedelta

class IssueType(Enum):
    """问题类型"""
    PERFORMANCE = "performance"  # 性能问题
    AVAILABILITY = "availability"  # 可用性问题
    CONSISTENCY = "consistency"  # 一致性问题
    RESOURCE = "resource"  # 资源问题
    NETWORK = "network"  # 网络问题
    STORAGE = "storage"  # 存储问题
    CONFIGURATION = "configuration"  # 配置问题

class Severity(Enum):
    """严重程度"""
    CRITICAL = "critical"  # 严重
    HIGH = "high"  # 高
    MEDIUM = "medium"  # 中
    LOW = "low"  # 低
    INFO = "info"  # 信息

class DiagnosticStatus(Enum):
    """诊断状态"""
    HEALTHY = "healthy"  # 健康
    WARNING = "warning"  # 警告
    ERROR = "error"  # 错误
    CRITICAL = "critical"  # 严重
    UNKNOWN = "unknown"  # 未知

@dataclass
class DiagnosticResult:
    """诊断结果"""
    component: str
    status: DiagnosticStatus
    issue_type: Optional[IssueType]
    severity: Severity
    message: str
    details: Dict[str, Any]
    recommendations: List[str]
    timestamp: datetime

@dataclass
class PerformanceIssue:
    """性能问题"""
    issue_id: str
    issue_type: IssueType
    severity: Severity
    title: str
    description: str
    symptoms: List[str]
    root_causes: List[str]
    solutions: List[str]
    prevention: List[str]
    related_metrics: List[str]

class LogAnalyzer:
    """日志分析器"""
    
    def __init__(self):
        self.error_patterns = self._initialize_error_patterns()
        self.performance_patterns = self._initialize_performance_patterns()
    
    def _initialize_error_patterns(self) -> Dict[str, Dict[str, Any]]:
        """初始化错误模式"""
        return {
            'broker_startup_failed': {
                'pattern': r'Failed to start.*broker',
                'issue_type': IssueType.AVAILABILITY,
                'severity': Severity.CRITICAL,
                'description': 'Broker启动失败'
            },
            'disk_full': {
                'pattern': r'No space left on device|Disk.*full',
                'issue_type': IssueType.STORAGE,
                'severity': Severity.CRITICAL,
                'description': '磁盘空间不足'
            },
            'memory_leak': {
                'pattern': r'OutOfMemoryError|Memory.*leak',
                'issue_type': IssueType.RESOURCE,
                'severity': Severity.HIGH,
                'description': '内存泄漏或内存不足'
            },
            'network_timeout': {
                'pattern': r'Connection.*timeout|Network.*timeout',
                'issue_type': IssueType.NETWORK,
                'severity': Severity.MEDIUM,
                'description': '网络连接超时'
            },
            'message_loss': {
                'pattern': r'Message.*lost|Lost.*message',
                'issue_type': IssueType.CONSISTENCY,
                'severity': Severity.HIGH,
                'description': '消息丢失'
            },
            'high_latency': {
                'pattern': r'High.*latency|Latency.*exceeded',
                'issue_type': IssueType.PERFORMANCE,
                'severity': Severity.MEDIUM,
                'description': '高延迟'
            }
        }
    
    def _initialize_performance_patterns(self) -> Dict[str, Dict[str, Any]]:
        """初始化性能模式"""
        return {
            'gc_pressure': {
                'pattern': r'GC.*time.*exceeded|Full GC.*frequent',
                'issue_type': IssueType.PERFORMANCE,
                'severity': Severity.MEDIUM,
                'description': 'GC压力过大'
            },
            'thread_pool_exhausted': {
                'pattern': r'Thread.*pool.*exhausted|No.*thread.*available',
                'issue_type': IssueType.RESOURCE,
                'severity': Severity.HIGH,
                'description': '线程池耗尽'
            },
            'slow_consumer': {
                'pattern': r'Consumer.*slow|Slow.*consumption',
                'issue_type': IssueType.PERFORMANCE,
                'severity': Severity.MEDIUM,
                'description': '消费者处理缓慢'
            }
        }
    
    def analyze_logs(self, log_content: str) -> List[DiagnosticResult]:
        """分析日志内容"""
        results = []
        
        # 分析错误模式
        for pattern_name, pattern_info in self.error_patterns.items():
            matches = re.findall(pattern_info['pattern'], log_content, re.IGNORECASE)
            if matches:
                result = DiagnosticResult(
                    component='broker',
                    status=DiagnosticStatus.ERROR,
                    issue_type=pattern_info['issue_type'],
                    severity=pattern_info['severity'],
                    message=pattern_info['description'],
                    details={
                        'pattern': pattern_name,
                        'matches': len(matches),
                        'sample_matches': matches[:5]
                    },
                    recommendations=self._get_recommendations(pattern_name),
                    timestamp=datetime.now()
                )
                results.append(result)
        
        # 分析性能模式
        for pattern_name, pattern_info in self.performance_patterns.items():
            matches = re.findall(pattern_info['pattern'], log_content, re.IGNORECASE)
            if matches:
                result = DiagnosticResult(
                    component='broker',
                    status=DiagnosticStatus.WARNING,
                    issue_type=pattern_info['issue_type'],
                    severity=pattern_info['severity'],
                    message=pattern_info['description'],
                    details={
                        'pattern': pattern_name,
                        'matches': len(matches),
                        'sample_matches': matches[:5]
                    },
                    recommendations=self._get_recommendations(pattern_name),
                    timestamp=datetime.now()
                )
                results.append(result)
        
        return results
    
    def _get_recommendations(self, pattern_name: str) -> List[str]:
        """获取推荐解决方案"""
        recommendations = {
            'broker_startup_failed': [
                '检查配置文件语法',
                '验证端口是否被占用',
                '检查磁盘空间和权限',
                '查看详细错误日志'
            ],
            'disk_full': [
                '清理过期日志文件',
                '增加磁盘空间',
                '调整消息保留策略',
                '启用日志压缩'
            ],
            'memory_leak': [
                '分析堆转储文件',
                '调整JVM堆大小',
                '检查应用程序内存使用',
                '优化GC参数'
            ],
            'network_timeout': [
                '检查网络连接',
                '调整超时参数',
                '优化网络配置',
                '检查防火墙设置'
            ],
            'message_loss': [
                '检查刷盘策略',
                '验证主从同步',
                '检查消费者确认机制',
                '启用消息轨迹'
            ],
            'high_latency': [
                '优化网络配置',
                '调整批处理大小',
                '检查磁盘I/O性能',
                '优化消费者处理逻辑'
            ],
            'gc_pressure': [
                '调整堆大小',
                '优化GC算法',
                '减少对象创建',
                '调整GC参数'
            ],
            'thread_pool_exhausted': [
                '增加线程池大小',
                '优化任务处理逻辑',
                '检查线程泄漏',
                '实施背压机制'
            ],
            'slow_consumer': [
                '优化消费者逻辑',
                '增加消费者实例',
                '调整批处理大小',
                '检查下游系统性能'
            ]
        }
        
        return recommendations.get(pattern_name, ['联系技术支持'])

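LogAnalyzer也可以脱离诊断器单独使用,对一段日志文本做快速扫描(日志内容为示意):

# 示意: 单独使用LogAnalyzer扫描日志片段
analyzer = LogAnalyzer()
sample_log = "2024-01-15 10:30:00 ERROR No space left on device"
for result in analyzer.analyze_logs(sample_log):
    print(f"[{result.severity.value}] {result.message}")
    for rec in result.recommendations[:2]:
        print(f"  建议: {rec}")
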
class PerformanceDiagnostic:
    """性能诊断器"""
    
    def __init__(self):
        self.log_analyzer = LogAnalyzer()
        self.known_issues = self._initialize_known_issues()
    
    def _initialize_known_issues(self) -> Dict[str, PerformanceIssue]:
        """初始化已知问题库"""
        issues = {}
        
        # 高延迟问题
        issues['high_latency'] = PerformanceIssue(
            issue_id='high_latency',
            issue_type=IssueType.PERFORMANCE,
            severity=Severity.MEDIUM,
            title='消息传输延迟过高',
            description='消息从生产者到消费者的端到端延迟超过预期阈值',
            symptoms=[
                '消息处理时间增长',
                '消费者lag增加',
                '用户体验下降',
                '实时性要求无法满足'
            ],
            root_causes=[
                '网络延迟或丢包',
                'Broker处理能力不足',
                '磁盘I/O性能瓶颈',
                'GC停顿时间过长',
                '消费者处理逻辑复杂',
                '批处理配置不当'
            ],
            solutions=[
                '优化网络配置,使用更快的网络',
                '增加Broker实例或升级硬件',
                '使用SSD存储,优化磁盘配置',
                '调整JVM和GC参数',
                '优化消费者处理逻辑',
                '调整批处理大小和频率'
            ],
            prevention=[
                '建立性能监控和告警',
                '定期进行性能测试',
                '容量规划和预扩容',
                '代码审查和性能优化'
            ],
            related_metrics=[
                'message_latency',
                'consumer_lag',
                'broker_cpu_usage',
                'disk_io_wait',
                'gc_time'
            ]
        )
        
        # 吞吐量下降问题
        issues['throughput_degradation'] = PerformanceIssue(
            issue_id='throughput_degradation',
            issue_type=IssueType.PERFORMANCE,
            severity=Severity.HIGH,
            title='消息吞吐量显著下降',
            description='系统处理消息的速率明显低于正常水平',
            symptoms=[
                '消息积压增加',
                '处理速率下降',
                '资源利用率异常',
                '响应时间增长'
            ],
            root_causes=[
                '资源竞争和瓶颈',
                '配置参数不当',
                '网络带宽限制',
                '存储性能下降',
                '应用程序bug',
                '外部依赖性能问题'
            ],
            solutions=[
                '识别和解决资源瓶颈',
                '优化配置参数',
                '升级网络带宽',
                '优化存储配置',
                '修复应用程序问题',
                '优化外部依赖调用'
            ],
            prevention=[
                '容量规划和监控',
                '性能基准测试',
                '定期配置审查',
                '自动化性能测试'
            ],
            related_metrics=[
                'message_throughput',
                'cpu_usage',
                'memory_usage',
                'disk_usage',
                'network_bandwidth'
            ]
        )
        
        # 消息丢失问题
        issues['message_loss'] = PerformanceIssue(
            issue_id='message_loss',
            issue_type=IssueType.CONSISTENCY,
            severity=Severity.CRITICAL,
            title='消息丢失',
            description='生产的消息未能成功传递到消费者',
            symptoms=[
                '消息计数不匹配',
                '业务数据缺失',
                '消费者接收不到消息',
                '消息轨迹中断'
            ],
            root_causes=[
                '异步刷盘配置下的系统崩溃',
                '网络分区导致的数据不一致',
                '消费者处理失败未重试',
                '消息过期被清理',
                '配置错误导致的路由问题'
            ],
            solutions=[
                '启用同步刷盘模式',
                '配置主从同步',
                '实现消费者重试机制',
                '调整消息保留策略',
                '检查和修复路由配置'
            ],
            prevention=[
                '使用事务消息',
                '启用消息轨迹',
                '实施端到端监控',
                '定期数据一致性检查'
            ],
            related_metrics=[
                'message_count',
                'consumer_success_rate',
                'broker_availability',
                'replication_lag'
            ]
        )
        
        return issues
    
    def diagnose_system(self, metrics: Dict[str, float],
                       logs: str = "") -> List[DiagnosticResult]:
        """诊断系统状态"""
        results = []
        
        # 分析日志
        if logs:
            log_results = self.log_analyzer.analyze_logs(logs)
            results.extend(log_results)
        
        # 分析指标
        metric_results = self._analyze_metrics(metrics)
        results.extend(metric_results)
        
        return results
    
    def _analyze_metrics(self, metrics: Dict[str, float]) -> List[DiagnosticResult]:
        """分析性能指标"""
        results = []
        
        # 检查延迟
        if 'message_latency' in metrics and metrics['message_latency'] > 1000:  # 1秒
            result = DiagnosticResult(
                component='messaging',
                status=DiagnosticStatus.WARNING,
                issue_type=IssueType.PERFORMANCE,
                severity=Severity.MEDIUM,
                message=f"消息延迟过高: {metrics['message_latency']:.2f}ms",
                details={'current_latency': metrics['message_latency']},
                recommendations=self.known_issues['high_latency'].solutions,
                timestamp=datetime.now()
            )
            results.append(result)
        
        # 检查吞吐量
        if 'message_throughput' in metrics and metrics['message_throughput'] < 1000:  # 1000 msg/s
            result = DiagnosticResult(
                component='messaging',
                status=DiagnosticStatus.WARNING,
                issue_type=IssueType.PERFORMANCE,
                severity=Severity.MEDIUM,
                message=f"消息吞吐量过低: {metrics['message_throughput']:.2f} msg/s",
                details={'current_throughput': metrics['message_throughput']},
                recommendations=self.known_issues['throughput_degradation'].solutions,
                timestamp=datetime.now()
            )
            results.append(result)
        
        # 检查CPU使用率
        if 'cpu_usage' in metrics and metrics['cpu_usage'] > 80:  # 80%
            result = DiagnosticResult(
                component='system',
                status=DiagnosticStatus.WARNING,
                issue_type=IssueType.RESOURCE,
                severity=Severity.MEDIUM,
                message=f"CPU使用率过高: {metrics['cpu_usage']:.1f}%",
                details={'current_cpu_usage': metrics['cpu_usage']},
                recommendations=[
                    '检查CPU密集型任务',
                    '优化应用程序逻辑',
                    '考虑水平扩展',
                    '升级硬件配置'
                ],
                timestamp=datetime.now()
            )
            results.append(result)
        
        # 检查内存使用率
        if 'memory_usage' in metrics and metrics['memory_usage'] > 85:  # 85%
            result = DiagnosticResult(
                component='system',
                status=DiagnosticStatus.WARNING,
                issue_type=IssueType.RESOURCE,
                severity=Severity.HIGH,
                message=f"内存使用率过高: {metrics['memory_usage']:.1f}%",
                details={'current_memory_usage': metrics['memory_usage']},
                recommendations=[
                    '检查内存泄漏',
                    '调整JVM堆大小',
                    '优化数据结构',
                    '增加物理内存'
                ],
                timestamp=datetime.now()
            )
            results.append(result)
        
        # 检查磁盘使用率
        if 'disk_usage' in metrics and metrics['disk_usage'] > 90:  # 90%
            result = DiagnosticResult(
                component='storage',
                status=DiagnosticStatus.ERROR,
                issue_type=IssueType.STORAGE,
                severity=Severity.HIGH,
                message=f"磁盘使用率过高: {metrics['disk_usage']:.1f}%",
                details={'current_disk_usage': metrics['disk_usage']},
                recommendations=[
                    '清理过期文件',
                    '增加存储空间',
                    '调整数据保留策略',
                    '启用数据压缩'
                ],
                timestamp=datetime.now()
            )
            results.append(result)
        
        return results
    
    def get_issue_details(self, issue_id: str) -> Optional[PerformanceIssue]:
        """获取问题详情"""
        return self.known_issues.get(issue_id)
    
    def generate_diagnostic_report(self, results: List[DiagnosticResult],
                                 output_path: str):
        """生成诊断报告"""
        report_lines = []
        report_lines.append("# RocketMQ性能诊断报告")
        report_lines.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report_lines.append("")
        
        # 统计信息
        total_issues = len(results)
        critical_issues = len([r for r in results if r.severity == Severity.CRITICAL])
        high_issues = len([r for r in results if r.severity == Severity.HIGH])
        medium_issues = len([r for r in results if r.severity == Severity.MEDIUM])
        
        report_lines.append("## 问题统计")
        report_lines.append(f"- 总问题数: {total_issues}")
        report_lines.append(f"- 严重问题: {critical_issues}")
        report_lines.append(f"- 高优先级问题: {high_issues}")
        report_lines.append(f"- 中优先级问题: {medium_issues}")
        report_lines.append("")
        
        # 按严重程度分组
        severity_groups = {}
        for result in results:
            if result.severity not in severity_groups:
                severity_groups[result.severity] = []
            severity_groups[result.severity].append(result)
        
        # 生成详细报告
        for severity in [Severity.CRITICAL, Severity.HIGH, Severity.MEDIUM, Severity.LOW]:
            if severity in severity_groups:
                report_lines.append(f"## {severity.value.upper()}级问题")
                report_lines.append("")
                
                for i, result in enumerate(severity_groups[severity], 1):
                    report_lines.append(f"### {i}. {result.message}")
                    report_lines.append(f"**组件**: {result.component}")
                    report_lines.append(f"**状态**: {result.status.value}")
                    if result.issue_type:
                        report_lines.append(f"**问题类型**: {result.issue_type.value}")
                    report_lines.append(f"**时间**: {result.timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
                    report_lines.append("")
                    
                    if result.details:
                        report_lines.append("**详细信息**:")
                        for key, value in result.details.items():
                            report_lines.append(f"- {key}: {value}")
                        report_lines.append("")
                    
                    if result.recommendations:
                        report_lines.append("**建议解决方案**:")
                        for rec in result.recommendations:
                            report_lines.append(f"- {rec}")
                        report_lines.append("")
        
        # 写入文件
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(report_lines))
        
        print(f"诊断报告已生成: {output_path}")

# 使用示例
if __name__ == "__main__":
    # 创建性能诊断器
    diagnostic = PerformanceDiagnostic()
    
    # 模拟性能指标
    metrics = {
        'message_latency': 1500,  # 1.5秒延迟
        'message_throughput': 500,  # 500 msg/s
        'cpu_usage': 85,  # 85% CPU使用率
        'memory_usage': 90,  # 90% 内存使用率
        'disk_usage': 95  # 95% 磁盘使用率
    }
    
    # 模拟日志内容
    log_content = """
    2024-01-15 10:30:15 ERROR Failed to start broker due to port conflict
    2024-01-15 10:31:20 WARN High latency detected: 2000ms
    2024-01-15 10:32:10 ERROR OutOfMemoryError: Java heap space
    """
    
    # 执行诊断
    results = diagnostic.diagnose_system(metrics, log_content)
    
    # 打印结果
    print(f"发现 {len(results)} 个问题:")
    for result in results:
        print(f"- [{result.severity.value.upper()}] {result.message}")
    
    # 生成报告
    diagnostic.generate_diagnostic_report(results, "/tmp/diagnostic_report.md")

8.7.4 监控告警和自动化运维

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Callable, Tuple
import json
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
import threading
import queue

class AlertLevel(Enum):
    """告警级别"""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class AlertChannel(Enum):
    """告警渠道"""
    EMAIL = "email"
    SMS = "sms"
    WEBHOOK = "webhook"
    SLACK = "slack"
    DINGTALK = "dingtalk"

class MetricOperator(Enum):
    """指标操作符"""
    GREATER_THAN = "gt"
    LESS_THAN = "lt"
    GREATER_EQUAL = "ge"
    LESS_EQUAL = "le"
    EQUAL = "eq"
    NOT_EQUAL = "ne"

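后文的_evaluate_condition用if/elif逐一比较操作符;等价地,也可以借助标准库operator模块把枚举映射为比较函数,写法更紧凑(仅为等价实现的示意):

# 示意: 用函数映射实现条件评估,与后文的if/elif等价
import operator as op

_OPS = {
    MetricOperator.GREATER_THAN: op.gt,
    MetricOperator.LESS_THAN: op.lt,
    MetricOperator.GREATER_EQUAL: op.ge,
    MetricOperator.LESS_EQUAL: op.le,
    MetricOperator.EQUAL: op.eq,
    MetricOperator.NOT_EQUAL: op.ne,
}

def evaluate_condition(value: float, operator_: MetricOperator, threshold: float) -> bool:
    return _OPS[operator_](value, threshold)
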
@dataclass
class AlertRule:
    """告警规则"""
    rule_id: str
    name: str
    metric_name: str
    operator: MetricOperator
    threshold: float
    duration: int  # 持续时间(秒)
    level: AlertLevel
    channels: List[AlertChannel]
    description: str
    enabled: bool = True
    last_triggered: Optional[datetime] = None
    trigger_count: int = 0

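一条告警规则把指标、比较条件、持续时间和通知渠道绑定在一起。例如"CPU使用率连续60秒高于85%则邮件告警"可以这样表达(阈值与渠道均为示例值):

# 示意: CPU使用率持续过高的告警规则(参数为示例值)
cpu_rule = AlertRule(
    rule_id="cpu_high",
    name="CPU使用率过高",
    metric_name="cpu_usage",
    operator=MetricOperator.GREATER_THAN,
    threshold=85.0,
    duration=60,
    level=AlertLevel.WARNING,
    channels=[AlertChannel.EMAIL],
    description="主机CPU使用率持续高于85%"
)
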
@dataclass
class Alert:
    """告警信息"""
    alert_id: str
    rule_id: str
    level: AlertLevel
    title: str
    message: str
    metric_name: str
    current_value: float
    threshold: float
    timestamp: datetime
    resolved: bool = False
    resolved_at: Optional[datetime] = None

@dataclass
class NotificationConfig:
    """通知配置"""
    channel: AlertChannel
    config: Dict[str, Any]
    enabled: bool = True

class AlertManager:
    """告警管理器"""
    
    def __init__(self):
        self.rules: Dict[str, AlertRule] = {}
        self.active_alerts: Dict[str, Alert] = {}
        self.notification_configs: Dict[AlertChannel, NotificationConfig] = {}
        self.alert_history: List[Alert] = []
        self.metric_history: Dict[str, List[Tuple[datetime, float]]] = {}
        self.running = False
        self.check_thread = None
        self.alert_queue = queue.Queue()
        
    def add_rule(self, rule: AlertRule):
        """添加告警规则"""
        self.rules[rule.rule_id] = rule
        print(f"告警规则已添加: {rule.name}")
    
    def remove_rule(self, rule_id: str):
        """移除告警规则"""
        if rule_id in self.rules:
            del self.rules[rule_id]
            print(f"告警规则已移除: {rule_id}")
    
    def configure_notification(self, config: NotificationConfig):
        """配置通知渠道"""
        self.notification_configs[config.channel] = config
        print(f"通知渠道已配置: {config.channel.value}")
    
    def update_metric(self, metric_name: str, value: float, timestamp: Optional[datetime] = None):
        """更新指标值"""
        if timestamp is None:
            timestamp = datetime.now()
        
        # 保存指标历史
        if metric_name not in self.metric_history:
            self.metric_history[metric_name] = []
        
        self.metric_history[metric_name].append((timestamp, value))
        
        # 保留最近1小时的数据
        cutoff_time = timestamp - timedelta(hours=1)
        self.metric_history[metric_name] = [
            (ts, val) for ts, val in self.metric_history[metric_name]
            if ts > cutoff_time
        ]
        
        # 检查告警规则
        self._check_rules(metric_name, value, timestamp)
    
    def _check_rules(self, metric_name: str, value: float, timestamp: datetime):
        """检查告警规则"""
        for rule in self.rules.values():
            if not rule.enabled or rule.metric_name != metric_name:
                continue
            
            # 检查阈值条件
            triggered = self._evaluate_condition(value, rule.operator, rule.threshold)
            
            if triggered:
                # 检查持续时间
                if self._check_duration(rule, timestamp):
                    self._trigger_alert(rule, value, timestamp)
            else:
                # 检查是否需要解决告警
                self._resolve_alert(rule.rule_id, timestamp)
    
    def _evaluate_condition(self, value: float, operator: MetricOperator, threshold: float) -> bool:
        """评估条件"""
        if operator == MetricOperator.GREATER_THAN:
            return value > threshold
        elif operator == MetricOperator.LESS_THAN:
            return value < threshold
        elif operator == MetricOperator.GREATER_EQUAL:
            return value >= threshold
        elif operator == MetricOperator.LESS_EQUAL:
            return value <= threshold
        elif operator == MetricOperator.EQUAL:
            return value == threshold
        elif operator == MetricOperator.NOT_EQUAL:
            return value != threshold
        return False
    
    def _check_duration(self, rule: AlertRule, timestamp: datetime) -> bool:
        """检查持续时间"""
        if rule.duration <= 0:
            return True
        
        # 获取指标历史
        history = self.metric_history.get(rule.metric_name, [])
        if not history:
            return False
        
        # 检查在持续时间内是否一直满足条件
        start_time = timestamp - timedelta(seconds=rule.duration)
        relevant_history = [(ts, val) for ts, val in history if ts >= start_time]
        
        if len(relevant_history) < 2:  # 需要至少2个数据点
            return False
        
        # 检查所有数据点是否都满足条件
        for _, value in relevant_history:
            if not self._evaluate_condition(value, rule.operator, rule.threshold):
                return False
        
        return True
    
    def _trigger_alert(self, rule: AlertRule, value: float, timestamp: datetime):
        """触发告警"""
        # 检查是否已经有活跃的告警
        if rule.rule_id in self.active_alerts:
            return
        
        # 创建告警
        alert = Alert(
            alert_id=f"{rule.rule_id}_{int(timestamp.timestamp())}",
            rule_id=rule.rule_id,
            level=rule.level,
            title=f"告警: {rule.name}",
            message=f"{rule.description}\n当前值: {value:.2f}, 阈值: {rule.threshold:.2f}",
            metric_name=rule.metric_name,
            current_value=value,
            threshold=rule.threshold,
            timestamp=timestamp
        )
        
        # 添加到活跃告警
        self.active_alerts[rule.rule_id] = alert
        self.alert_history.append(alert)
        
        # 更新规则统计
        rule.last_triggered = timestamp
        rule.trigger_count += 1
        
        # 发送通知
        self._send_notifications(alert, rule.channels)
        
        print(f"告警触发: {alert.title}")
    
    def _resolve_alert(self, rule_id: str, timestamp: datetime):
        """解决告警"""
        if rule_id in self.active_alerts:
            alert = self.active_alerts[rule_id]
            alert.resolved = True
            alert.resolved_at = timestamp
            
            # 从活跃告警中移除
            del self.active_alerts[rule_id]
            
            # 发送解决通知
            rule = self.rules.get(rule_id)
            if rule:
                self._send_resolution_notification(alert, rule.channels)
            
            print(f"告警已解决: {alert.title}")
    
    def _send_notifications(self, alert: Alert, channels: List[AlertChannel]):
        """Dispatch notifications to the configured channels"""
        for channel in channels:
            if channel in self.notification_configs:
                config = self.notification_configs[channel]
                if config.enabled:
                    self.alert_queue.put((alert, channel, config))
    
    def _send_resolution_notification(self, alert: Alert, channels: List[AlertChannel]):
        """Dispatch resolution notifications"""
        for channel in channels:
            if channel in self.notification_configs:
                config = self.notification_configs[channel]
                if config.enabled:
                    # Build the resolution notification
                    resolution_alert = Alert(
                        alert_id=f"{alert.alert_id}_resolved",
                        rule_id=alert.rule_id,
                        level=AlertLevel.INFO,
                        title=f"Alert resolved: {alert.title.replace('Alert: ', '')}",
                        message=f"The alert has been resolved\nOriginally triggered at: {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}\nResolved at: {alert.resolved_at.strftime('%Y-%m-%d %H:%M:%S')}",
                        metric_name=alert.metric_name,
                        current_value=alert.current_value,
                        threshold=alert.threshold,
                        timestamp=alert.resolved_at or datetime.now(),
                        resolved=True
                    )
                    self.alert_queue.put((resolution_alert, channel, config))
    
    def start_monitoring(self):
        """Start monitoring"""
        if self.running:
            return
        
        self.running = True
        
        # Start the notification worker thread
        notification_thread = threading.Thread(target=self._process_notifications)
        notification_thread.daemon = True
        notification_thread.start()
        
        print("Alert monitoring started")
    
    def stop_monitoring(self):
        """Stop monitoring"""
        self.running = False
        print("Alert monitoring stopped")
    
    def _process_notifications(self):
        """Drain the notification queue"""
        while self.running:
            try:
                alert, channel, config = self.alert_queue.get(timeout=1)
                self._send_notification(alert, channel, config)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Failed to send notification: {e}")
    
    def _send_notification(self, alert: Alert, channel: AlertChannel, config: NotificationConfig):
        """Send a single notification"""
        try:
            if channel == AlertChannel.EMAIL:
                self._send_email_notification(alert, config.config)
            elif channel == AlertChannel.WEBHOOK:
                self._send_webhook_notification(alert, config.config)
            elif channel == AlertChannel.SLACK:
                self._send_slack_notification(alert, config.config)
            elif channel == AlertChannel.DINGTALK:
                self._send_dingtalk_notification(alert, config.config)
            else:
                print(f"Unsupported notification channel: {channel}")
        except Exception as e:
            print(f"Failed to send {channel.value} notification: {e}")
    
    def _send_email_notification(self, alert: Alert, config: Dict[str, Any]):
        """Send an email notification"""
        smtp_server = config.get('smtp_server')
        smtp_port = config.get('smtp_port', 587)
        username = config.get('username')
        password = config.get('password')
        to_emails = config.get('to_emails', [])
        
        if not all([smtp_server, username, password, to_emails]):
            raise ValueError("Incomplete email configuration")
        
        # Compose the message
        msg = MIMEMultipart()
        msg['From'] = username
        msg['To'] = ', '.join(to_emails)
        msg['Subject'] = f"[RocketMQ] {alert.title}"
        
        # Message body
        body = f"""
        Alert details:
        
        Level: {alert.level.value.upper()}
        Metric: {alert.metric_name}
        Current value: {alert.current_value:.2f}
        Threshold: {alert.threshold:.2f}
        Time: {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}
        
        Description:
        {alert.message}
        """
        
        msg.attach(MIMEText(body, 'plain', 'utf-8'))
        
        # Deliver over SMTP with STARTTLS
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            server.login(username, password)
            server.send_message(msg)
        
        print(f"Email notification sent: {alert.title}")
    
    def _send_webhook_notification(self, alert: Alert, config: Dict[str, Any]):
        """Send a webhook notification"""
        import requests
        
        url = config.get('url')
        headers = config.get('headers', {})
        
        if not url:
            raise ValueError("Webhook URL is not configured")
        
        payload = {
            'alert_id': alert.alert_id,
            'rule_id': alert.rule_id,
            'level': alert.level.value,
            'title': alert.title,
            'message': alert.message,
            'metric_name': alert.metric_name,
            'current_value': alert.current_value,
            'threshold': alert.threshold,
            'timestamp': alert.timestamp.isoformat(),
            'resolved': alert.resolved
        }
        
        response = requests.post(url, json=payload, headers=headers, timeout=10)
        response.raise_for_status()
        
        print(f"Webhook notification sent: {alert.title}")
    
    def _send_slack_notification(self, alert: Alert, config: Dict[str, Any]):
        """Send a Slack notification"""
        import requests
        
        webhook_url = config.get('webhook_url')
        channel = config.get('channel', '#alerts')
        username = config.get('username', 'RocketMQ Monitor')
        
        if not webhook_url:
            raise ValueError("Slack webhook URL is not configured")
        
        # Pick the attachment color by alert level
        color_map = {
            AlertLevel.INFO: 'good',
            AlertLevel.WARNING: 'warning',
            AlertLevel.ERROR: 'danger',
            AlertLevel.CRITICAL: 'danger'
        }
        
        payload = {
            'channel': channel,
            'username': username,
            'attachments': [{
                'color': color_map.get(alert.level, 'warning'),
                'title': alert.title,
                'text': alert.message,
                'fields': [
                    {'title': 'Metric', 'value': alert.metric_name, 'short': True},
                    {'title': 'Current value', 'value': f'{alert.current_value:.2f}', 'short': True},
                    {'title': 'Threshold', 'value': f'{alert.threshold:.2f}', 'short': True},
                    {'title': 'Time', 'value': alert.timestamp.strftime('%Y-%m-%d %H:%M:%S'), 'short': True}
                ],
                'ts': int(alert.timestamp.timestamp())
            }]
        }
        
        response = requests.post(webhook_url, json=payload, timeout=10)
        response.raise_for_status()
        
        print(f"Slack notification sent: {alert.title}")
    
    def _send_dingtalk_notification(self, alert: Alert, config: Dict[str, Any]):
        """Send a DingTalk notification"""
        import requests
        import hmac
        import hashlib
        import base64
        import urllib.parse
        
        webhook_url = config.get('webhook_url')
        secret = config.get('secret')
        
        if not webhook_url:
            raise ValueError("DingTalk webhook URL is not configured")
        
        # If a secret is configured, sign the request
        if secret:
            timestamp = str(round(time.time() * 1000))
            secret_enc = secret.encode('utf-8')
            string_to_sign = f'{timestamp}\n{secret}'
            string_to_sign_enc = string_to_sign.encode('utf-8')
            hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
            sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
            webhook_url = f'{webhook_url}&timestamp={timestamp}&sign={sign}'
        
        # Build the message
        level_emoji = {
            AlertLevel.INFO: '🔵',
            AlertLevel.WARNING: '🟡',
            AlertLevel.ERROR: '🔴',
            AlertLevel.CRITICAL: '🚨'
        }
        
        text = f"""{level_emoji.get(alert.level, '⚠️')} {alert.title}

📊 Metric: {alert.metric_name}
📈 Current value: {alert.current_value:.2f}
🎯 Threshold: {alert.threshold:.2f}
⏰ Time: {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}

📝 Description: {alert.message}"""
        
        payload = {
            'msgtype': 'text',
            'text': {
                'content': text
            }
        }
        
        response = requests.post(webhook_url, json=payload, timeout=10)
        response.raise_for_status()
        
        print(f"DingTalk notification sent: {alert.title}")
    
    def get_active_alerts(self) -> List[Alert]:
        """Return the active alerts"""
        return list(self.active_alerts.values())
    
    def get_alert_history(self, hours: int = 24) -> List[Alert]:
        """Return the alert history for the last N hours"""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        return [alert for alert in self.alert_history if alert.timestamp > cutoff_time]
    
    def get_alert_statistics(self) -> Dict[str, Any]:
        """Return aggregate alert statistics"""
        total_rules = len(self.rules)
        active_alerts = len(self.active_alerts)
        
        # Count active alerts by level
        level_stats = {}
        for alert in self.active_alerts.values():
            level = alert.level.value
            level_stats[level] = level_stats.get(level, 0) + 1
        
        # Per-rule trigger statistics
        rule_stats = {}
        for rule in self.rules.values():
            rule_stats[rule.name] = {
                'trigger_count': rule.trigger_count,
                'last_triggered': rule.last_triggered.isoformat() if rule.last_triggered else None,
                'enabled': rule.enabled
            }
        
        return {
            'total_rules': total_rules,
            'active_alerts': active_alerts,
            'level_statistics': level_stats,
            'rule_statistics': rule_stats,
            'notification_channels': list(self.notification_configs.keys())
        }

# Usage example
if __name__ == "__main__":
    # Create the alert manager
    alert_manager = AlertManager()
    
    # Configure email notifications
    email_config = NotificationConfig(
        channel=AlertChannel.EMAIL,
        config={
            'smtp_server': 'smtp.gmail.com',
            'smtp_port': 587,
            'username': 'your-email@gmail.com',
            'password': 'your-password',
            'to_emails': ['admin@company.com']
        }
    )
    alert_manager.configure_notification(email_config)
    
    # Configure DingTalk notifications
    dingtalk_config = NotificationConfig(
        channel=AlertChannel.DINGTALK,
        config={
            'webhook_url': 'https://oapi.dingtalk.com/robot/send?access_token=your-token',
            'secret': 'your-secret'
        }
    )
    alert_manager.configure_notification(dingtalk_config)
    
    # Add alert rules
    cpu_rule = AlertRule(
        rule_id='cpu_high',
        name='High CPU usage',
        metric_name='cpu_usage',
        operator=MetricOperator.GREATER_THAN,
        threshold=80.0,
        duration=300,  # 5 minutes
        level=AlertLevel.WARNING,
        channels=[AlertChannel.EMAIL, AlertChannel.DINGTALK],
        description='CPU usage above 80% for 5 minutes'
    )
    alert_manager.add_rule(cpu_rule)
    
    memory_rule = AlertRule(
        rule_id='memory_high',
        name='High memory usage',
        metric_name='memory_usage',
        operator=MetricOperator.GREATER_THAN,
        threshold=90.0,
        duration=60,  # 1 minute
        level=AlertLevel.ERROR,
        channels=[AlertChannel.EMAIL, AlertChannel.DINGTALK],
        description='Memory usage above 90% for 1 minute'
    )
    alert_manager.add_rule(memory_rule)
    
    # Start monitoring
    alert_manager.start_monitoring()
    
    # Simulate metric updates
    import time
    for i in range(10):
        # Simulated CPU and memory usage
        cpu_usage = 85 + (i % 3) * 5  # 85-95%
        memory_usage = 92 + (i % 2) * 3  # 92-95%
        
        alert_manager.update_metric('cpu_usage', cpu_usage)
        alert_manager.update_metric('memory_usage', memory_usage)
        
        print(f"Metrics updated - CPU: {cpu_usage}%, Memory: {memory_usage}%")
        time.sleep(2)
    
    # Inspect alert statistics
    stats = alert_manager.get_alert_statistics()
    print(f"\nAlert statistics: {json.dumps(stats, indent=2, ensure_ascii=False)}")
    
    # Stop monitoring
    alert_manager.stop_monitoring()
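
Before pointing the manager at real SMTP or DingTalk credentials, it helps to smoke-test a channel locally. The sketch below is a minimal example; the port, handler name, and /alerts path are illustrative assumptions, not part of AlertManager. It stands up a throwaway HTTP server with the standard library and routes the WEBHOOK channel at it:

import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

class WebhookEchoHandler(BaseHTTPRequestHandler):
    """Prints each alert payload it receives, then responds 200."""

    def do_POST(self):
        length = int(self.headers.get('Content-Length', 0))
        payload = json.loads(self.rfile.read(length) or b'{}')
        print(f"webhook received: {payload.get('title')} "
              f"(level={payload.get('level')}, value={payload.get('current_value')})")
        self.send_response(200)
        self.end_headers()

    def log_message(self, fmt, *args):
        pass  # silence the default per-request logging

# Serve on a background thread so the alert manager can post to it
server = HTTPServer(('127.0.0.1', 8000), WebhookEchoHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

# Route the WEBHOOK channel to the throwaway server
alert_manager.configure_notification(NotificationConfig(
    channel=AlertChannel.WEBHOOK,
    config={'url': 'http://127.0.0.1:8000/alerts'}
))

Any alert triggered afterwards is printed by the echo handler, which exercises the queue, the worker thread, and the HTTP delivery path without any external dependency.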

8.7.5 Automated Operations Tooling

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
import subprocess
import yaml
import os
from pathlib import Path
from datetime import datetime
import threading
import time
import logging

class AutomationTaskType(Enum):
    """Automation task type"""
    DEPLOYMENT = "deployment"
    SCALING = "scaling"
    BACKUP = "backup"
    MAINTENANCE = "maintenance"
    MONITORING = "monitoring"
    RECOVERY = "recovery"

class TaskStatus(Enum):
    """Task status"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class TriggerType(Enum):
    """Trigger type"""
    SCHEDULE = "schedule"
    EVENT = "event"
    MANUAL = "manual"
    ALERT = "alert"

@dataclass
class AutomationTask:
    """Automation task definition"""
    task_id: str
    name: str
    task_type: AutomationTaskType
    script_path: str
    parameters: Dict[str, Any]
    trigger_type: TriggerType
    schedule: Optional[str] = None  # cron expression
    timeout: int = 3600  # timeout in seconds
    retry_count: int = 3
    enabled: bool = True
    created_at: Optional[datetime] = None
    last_run: Optional[datetime] = None
    next_run: Optional[datetime] = None
    run_count: int = 0
    success_count: int = 0
    
    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now()

@dataclass
class TaskExecution:
    """Record of a single task execution"""
    execution_id: str
    task_id: str
    status: TaskStatus
    start_time: datetime
    end_time: Optional[datetime] = None
    exit_code: Optional[int] = None
    output: str = ""
    error: str = ""
    retry_attempt: int = 0

class RocketMQAutomation:
    """RocketMQ automation and operations toolkit"""
    
    def __init__(self, config_dir: str = "/etc/rocketmq-automation"):
        self.config_dir = Path(config_dir)
        self.config_dir.mkdir(parents=True, exist_ok=True)
        
        self.tasks: Dict[str, AutomationTask] = {}
        self.executions: List[TaskExecution] = []
        self.running = False
        self.scheduler_thread = None
        
        # Set up logging
        self.logger = logging.getLogger('rocketmq_automation')
        self.logger.setLevel(logging.INFO)
        
        # Load persisted configuration
        self._load_configuration()
        
    def _load_configuration(self):
        """Load configuration from disk"""
        config_file = self.config_dir / "automation.yaml"
        if config_file.exists():
            with open(config_file, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)
                
            # Load tasks, converting persisted enum values back into enums
            # (they are saved as plain strings by _save_configuration)
            for task_data in config.get('tasks', []):
                task_data['task_type'] = AutomationTaskType(task_data['task_type'])
                task_data['trigger_type'] = TriggerType(task_data['trigger_type'])
                task = AutomationTask(**task_data)
                self.tasks[task.task_id] = task
                
            self.logger.info(f"Loaded {len(self.tasks)} automation tasks")
    
    def _save_configuration(self):
        """Persist configuration to disk"""
        config = {
            'tasks': [
                {
                    'task_id': task.task_id,
                    'name': task.name,
                    'task_type': task.task_type.value,
                    'script_path': task.script_path,
                    'parameters': task.parameters,
                    'trigger_type': task.trigger_type.value,
                    'schedule': task.schedule,
                    'timeout': task.timeout,
                    'retry_count': task.retry_count,
                    'enabled': task.enabled
                }
                for task in self.tasks.values()
            ]
        }
        
        config_file = self.config_dir / "automation.yaml"
        with open(config_file, 'w', encoding='utf-8') as f:
            yaml.dump(config, f, default_flow_style=False, allow_unicode=True)
    
    def add_task(self, task: AutomationTask):
        """Add an automation task"""
        self.tasks[task.task_id] = task
        self._save_configuration()
        self.logger.info(f"Added automation task: {task.name}")
    
    def remove_task(self, task_id: str):
        """Remove an automation task"""
        if task_id in self.tasks:
            task_name = self.tasks[task_id].name
            del self.tasks[task_id]
            self._save_configuration()
            self.logger.info(f"Removed automation task: {task_name}")
    
    def enable_task(self, task_id: str):
        """Enable a task"""
        if task_id in self.tasks:
            self.tasks[task_id].enabled = True
            self._save_configuration()
            self.logger.info(f"Enabled task: {self.tasks[task_id].name}")
    
    def disable_task(self, task_id: str):
        """Disable a task"""
        if task_id in self.tasks:
            self.tasks[task_id].enabled = False
            self._save_configuration()
            self.logger.info(f"Disabled task: {self.tasks[task_id].name}")
    
    def execute_task(self, task_id: str, manual: bool = False) -> TaskExecution:
        """Execute a task"""
        if task_id not in self.tasks:
            raise ValueError(f"Task does not exist: {task_id}")
        
        task = self.tasks[task_id]
        if not task.enabled and not manual:
            raise ValueError(f"Task is disabled: {task.name}")
        
        # Create the execution record
        execution = TaskExecution(
            execution_id=f"{task_id}_{int(datetime.now().timestamp())}",
            task_id=task_id,
            status=TaskStatus.RUNNING,
            start_time=datetime.now()
        )
        
        self.executions.append(execution)
        
        try:
            # Update task statistics
            task.last_run = execution.start_time
            task.run_count += 1
            
            # Run the script
            result = self._run_script(task, execution)
            
            if result['success']:
                execution.status = TaskStatus.COMPLETED
                execution.exit_code = 0
                task.success_count += 1
                self.logger.info(f"Task succeeded: {task.name}")
            else:
                execution.status = TaskStatus.FAILED
                execution.exit_code = result.get('exit_code', 1)
                self.logger.error(f"Task failed: {task.name}")
            
            execution.output = result.get('output', '')
            execution.error = result.get('error', '')
            
        except Exception as e:
            execution.status = TaskStatus.FAILED
            execution.error = str(e)
            self.logger.error(f"Task raised an exception: {task.name} - {e}")
        
        finally:
            execution.end_time = datetime.now()
            self._save_configuration()
        
        return execution
    
    def _run_script(self, task: AutomationTask, execution: TaskExecution) -> Dict[str, Any]:
        """Run the task script as a subprocess"""
        script_path = Path(task.script_path)
        if not script_path.exists():
            return {
                'success': False,
                'error': f'Script file does not exist: {script_path}',
                'exit_code': 1
            }
        
        # Prepare the environment variables
        env = os.environ.copy()
        env.update({
            'ROCKETMQ_TASK_ID': task.task_id,
            'ROCKETMQ_TASK_NAME': task.name,
            'ROCKETMQ_EXECUTION_ID': execution.execution_id,
            'ROCKETMQ_TASK_TYPE': task.task_type.value
        })
        
        # Expose task parameters through the environment
        for key, value in task.parameters.items():
            env[f'ROCKETMQ_PARAM_{key.upper()}'] = str(value)
        
        try:
            # Launch the script
            process = subprocess.Popen(
                [str(script_path)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env,
                text=True,
                cwd=script_path.parent
            )
            
            # Wait for completion or timeout
            try:
                stdout, stderr = process.communicate(timeout=task.timeout)
                return {
                    'success': process.returncode == 0,
                    'output': stdout,
                    'error': stderr,
                    'exit_code': process.returncode
                }
            except subprocess.TimeoutExpired:
                process.kill()
                process.communicate()  # reap the killed process
                return {
                    'success': False,
                    'error': f'Task timed out after {task.timeout} seconds',
                    'exit_code': 124
                }
                
        except Exception as e:
            return {
                'success': False,
                'error': f'Exception while running script: {e}',
                'exit_code': 1
            }
    
    def start_scheduler(self):
        """Start the scheduler"""
        if self.running:
            return
        
        self.running = True
        self.scheduler_thread = threading.Thread(target=self._scheduler_loop)
        self.scheduler_thread.daemon = True
        self.scheduler_thread.start()
        
        self.logger.info("Automation scheduler started")
    
    def stop_scheduler(self):
        """Stop the scheduler"""
        self.running = False
        if self.scheduler_thread:
            self.scheduler_thread.join()
        
        self.logger.info("Automation scheduler stopped")
    
    def _scheduler_loop(self):
        """Scheduler main loop"""
        while self.running:
            try:
                current_time = datetime.now()
                
                for task in self.tasks.values():
                    if not task.enabled:
                        continue
                    
                    if task.trigger_type == TriggerType.SCHEDULE:
                        if self._should_run_scheduled_task(task, current_time):
                            try:
                                self.execute_task(task.task_id)
                            except Exception as e:
                                self.logger.error(f"Scheduled task failed: {task.name} - {e}")
                
                time.sleep(60)  # check once per minute
                
            except Exception as e:
                self.logger.error(f"Scheduler loop error: {e}")
                time.sleep(60)
    
    def _cron_field_matches(self, field: str, value: int) -> bool:
        """Match one cron field: supports '*', '*/n' steps, and literal integers"""
        if field == '*':
            return True
        if field.startswith('*/'):
            step = int(field[2:])
            return step > 0 and value % step == 0
        return int(field) == value
    
    def _should_run_scheduled_task(self, task: AutomationTask, current_time: datetime) -> bool:
        """Check whether a scheduled task is due to run"""
        if not task.schedule:
            return False
        
        # Minimal cron parsing: "minute hour day month weekday",
        # supporting only '*', '*/n' steps, and literal integers.
        # Example: "0 2 * * *" runs every day at 02:00.
        
        try:
            parts = task.schedule.split()
            if len(parts) != 5:
                return False
            
            minute, hour, day, month, weekday = parts
            
            if not self._cron_field_matches(minute, current_time.minute):
                return False
            if not self._cron_field_matches(hour, current_time.hour):
                return False
            if not self._cron_field_matches(day, current_time.day):
                return False
            if not self._cron_field_matches(month, current_time.month):
                return False
            # cron uses 0=Sunday, while Python's weekday() uses 0=Monday
            cron_weekday = (current_time.weekday() + 1) % 7
            if not self._cron_field_matches(weekday, cron_weekday):
                return False
            
            # Skip if the task already ran within this minute
            if task.last_run:
                last_run_minute = task.last_run.replace(second=0, microsecond=0)
                current_minute = current_time.replace(second=0, microsecond=0)
                if last_run_minute == current_minute:
                    return False
            
            return True
            
        except (ValueError, IndexError):
            self.logger.error(f"Invalid cron expression: {task.schedule}")
            return False
    
    def get_task_status(self, task_id: str) -> Dict[str, Any]:
        """Return the status of a single task"""
        if task_id not in self.tasks:
            return {'error': 'Task does not exist'}
        
        task = self.tasks[task_id]
        
        # Collect the most recent executions
        # (avoid shadowing the builtin `exec` as a loop variable)
        recent_executions = [
            execution for execution in self.executions
            if execution.task_id == task_id
        ][-10:]  # last 10 executions
        
        return {
            'task_id': task.task_id,
            'name': task.name,
            'type': task.task_type.value,
            'enabled': task.enabled,
            'run_count': task.run_count,
            'success_count': task.success_count,
            'success_rate': task.success_count / task.run_count if task.run_count > 0 else 0,
            'last_run': task.last_run.isoformat() if task.last_run else None,
            'next_run': task.next_run.isoformat() if task.next_run else None,
            'recent_executions': [
                {
                    'execution_id': execution.execution_id,
                    'status': execution.status.value,
                    'start_time': execution.start_time.isoformat(),
                    'end_time': execution.end_time.isoformat() if execution.end_time else None,
                    'exit_code': execution.exit_code
                }
                for execution in recent_executions
            ]
        }
    
    def get_all_tasks_status(self) -> List[Dict[str, Any]]:
        """Return the status of every task"""
        return [self.get_task_status(task_id) for task_id in self.tasks.keys()]
    
    def create_deployment_task(self, name: str, broker_config: Dict[str, Any], 
                             nameserver_config: Dict[str, Any]) -> str:
        """Create a deployment task"""
        task_id = f"deploy_{int(datetime.now().timestamp())}"
        
        # Generate the deployment script
        script_content = self._generate_deployment_script(broker_config, nameserver_config)
        script_path = self.config_dir / "scripts" / f"{task_id}.sh"
        script_path.parent.mkdir(exist_ok=True)
        
        with open(script_path, 'w', encoding='utf-8') as f:
            f.write(script_content)
        
        script_path.chmod(0o755)
        
        # Create the task
        task = AutomationTask(
            task_id=task_id,
            name=name,
            task_type=AutomationTaskType.DEPLOYMENT,
            script_path=str(script_path),
            parameters={
                'broker_config': broker_config,
                'nameserver_config': nameserver_config
            },
            trigger_type=TriggerType.MANUAL
        )
        
        self.add_task(task)
        return task_id
    
    def _generate_deployment_script(self, broker_config: Dict[str, Any], 
                                  nameserver_config: Dict[str, Any]) -> str:
        """Generate the deployment script"""
        # Note: the shebang must be the very first line of the generated file
        return f'''#!/bin/bash
set -e

echo "Starting RocketMQ deployment..."

# Variables
ROCKETMQ_HOME="{broker_config.get('rocketmq_home', '/opt/rocketmq')}"
NAMESRV_ADDR="{nameserver_config.get('address', 'localhost:9876')}"
BROKER_NAME="{broker_config.get('broker_name', 'broker-a')}"

# Verify the RocketMQ installation
if [ ! -d "$ROCKETMQ_HOME" ]; then
    echo "Error: RocketMQ is not installed at $ROCKETMQ_HOME"
    exit 1
fi

# Make sure the log directory exists
mkdir -p "$ROCKETMQ_HOME/logs"

# Stop any running services
echo "Stopping existing RocketMQ services..."
pkill -f "org.apache.rocketmq" || true
sleep 5

# Start the NameServer
echo "Starting NameServer..."
nohup $ROCKETMQ_HOME/bin/mqnamesrv > $ROCKETMQ_HOME/logs/namesrv.log 2>&1 &
sleep 10

# Check the NameServer status
if ! pgrep -f "org.apache.rocketmq.namesrv.NamesrvStartup" > /dev/null; then
    echo "Error: NameServer failed to start"
    exit 1
fi

# Start the Broker
echo "Starting Broker..."
export NAMESRV_ADDR="$NAMESRV_ADDR"
nohup $ROCKETMQ_HOME/bin/mqbroker -n "$NAMESRV_ADDR" -c $ROCKETMQ_HOME/conf/broker.conf > $ROCKETMQ_HOME/logs/broker.log 2>&1 &
sleep 10

# Check the Broker status
if ! pgrep -f "org.apache.rocketmq.broker.BrokerStartup" > /dev/null; then
    echo "Error: Broker failed to start"
    exit 1
fi

echo "RocketMQ deployment complete!"
echo "NameServer address: $NAMESRV_ADDR"
echo "Broker name: $BROKER_NAME"
'''
    
    def create_backup_task(self, name: str, backup_config: Dict[str, Any]) -> str:
        """Create a backup task"""
        task_id = f"backup_{int(datetime.now().timestamp())}"
        
        # Generate the backup script
        script_content = self._generate_backup_script(backup_config)
        script_path = self.config_dir / "scripts" / f"{task_id}.sh"
        script_path.parent.mkdir(exist_ok=True)
        
        with open(script_path, 'w', encoding='utf-8') as f:
            f.write(script_content)
        
        script_path.chmod(0o755)
        
        # Create the task
        task = AutomationTask(
            task_id=task_id,
            name=name,
            task_type=AutomationTaskType.BACKUP,
            script_path=str(script_path),
            parameters=backup_config,
            trigger_type=TriggerType.SCHEDULE,
            schedule=backup_config.get('schedule', '0 2 * * *')  # default: daily at 02:00
        )
        
        self.add_task(task)
        return task_id
    
    def _generate_backup_script(self, backup_config: Dict[str, Any]) -> str:
        """Generate the backup script"""
        data_dir = backup_config.get('data_dir', '/opt/rocketmq/store')
        backup_dir = backup_config.get('backup_dir', '/backup/rocketmq')
        retention_days = backup_config.get('retention_days', 7)
        
        return f'''#!/bin/bash
set -e

echo "Starting RocketMQ data backup..."

# Variables
DATA_DIR="{data_dir}"
BACKUP_DIR="{backup_dir}"
RETENTION_DAYS={retention_days}
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
BACKUP_PATH="$BACKUP_DIR/rocketmq_backup_$TIMESTAMP"

# Create the backup directory
mkdir -p "$BACKUP_DIR"

# Verify the data directory
if [ ! -d "$DATA_DIR" ]; then
    echo "Error: data directory does not exist: $DATA_DIR"
    exit 1
fi

# Copy the data (note: copying a live store is not crash-consistent;
# prefer running this against a stopped Broker or a slave)
echo "Backing up data to $BACKUP_PATH"
cp -r "$DATA_DIR" "$BACKUP_PATH"

# Compress the backup
echo "Compressing backup..."
tar -czf "$BACKUP_PATH.tar.gz" -C "$BACKUP_DIR" "$(basename $BACKUP_PATH)"
rm -rf "$BACKUP_PATH"

# Prune old backups
echo "Removing backups older than $RETENTION_DAYS days..."
find "$BACKUP_DIR" -name "rocketmq_backup_*.tar.gz" -mtime +$RETENTION_DAYS -delete

echo "Backup complete: $BACKUP_PATH.tar.gz"
echo "Backup size: $(du -h "$BACKUP_PATH.tar.gz" | cut -f1)"
'''

# Usage example
if __name__ == "__main__":
    # Create the automation toolkit
    automation = RocketMQAutomation("/tmp/rocketmq-automation")
    
    # Create a deployment task
    deployment_task_id = automation.create_deployment_task(
        name="Production RocketMQ deployment",
        broker_config={
            'rocketmq_home': '/opt/rocketmq',
            'broker_name': 'broker-prod-a',
            'cluster_name': 'DefaultCluster'
        },
        nameserver_config={
            'address': '192.168.1.100:9876;192.168.1.101:9876'
        }
    )
    
    # Create a backup task
    backup_task_id = automation.create_backup_task(
        name="Daily data backup",
        backup_config={
            'data_dir': '/opt/rocketmq/store',
            'backup_dir': '/backup/rocketmq',
            'retention_days': 30,
            'schedule': '0 2 * * *'  # every day at 02:00
        }
    )
    
    # Create a monitoring task
    monitoring_task = AutomationTask(
        task_id='health_check',
        name='Health check',
        task_type=AutomationTaskType.MONITORING,
        script_path='/opt/scripts/health_check.sh',
        parameters={'check_interval': 300},
        trigger_type=TriggerType.SCHEDULE,
        schedule='*/5 * * * *'  # every 5 minutes
    )
    automation.add_task(monitoring_task)
    
    # Start the scheduler
    automation.start_scheduler()
    
    # Run the deployment task manually
    print("Executing deployment task...")
    execution = automation.execute_task(deployment_task_id, manual=True)
    print(f"Execution result: {execution.status.value}")
    
    # Inspect the status of all tasks
    print("\nAll task statuses:")
    for status in automation.get_all_tasks_status():
        print(f"- {status['name']}: {status['success_rate']:.1%} success rate")
    
    # Let the scheduler run for a while
    import time
    print("\nScheduler running...")
    time.sleep(10)
    
    # Stop the scheduler
    automation.stop_scheduler()
    print("Automation toolkit stopped")

8.8 Summary

This chapter covered RocketMQ performance optimization and tuning in detail, spanning the complete spectrum from basic monitoring to advanced automated operations:

8.8.1 Core Optimization Areas

  1. Performance monitoring and metrics

    • Build a comprehensive monitoring system
    • Define key performance indicators
    • Monitor and alert in real time
  2. Network optimization

    • Tune network configuration
    • Manage connection pools
    • Reduce network latency
  3. Storage optimization

    • Optimize disk I/O
    • Tune storage configuration
    • Optimize the file system
  4. Memory management optimization

    • Memory allocation strategy
    • Garbage collection tuning
    • JVM parameter tuning
  5. Cache optimization

    • Multi-level caching strategy
    • Read-ahead and cache warming
    • Improve cache hit rates
  6. Performance benchmarking (see the micro-benchmark sketch after this list)

    • Standardized test framework
    • RocketMQ-specific tests
    • Establish performance baselines
  7. Tuning best practices

    • Systematic tuning methodology
    • Configuration template management
    • Fault diagnosis and troubleshooting
    • Monitoring and alerting
    • Automated operations tooling
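
As referenced in item 6 above, a baseline is only useful if it is measured the same way every time. The sketch below is a minimal, assumption-laden harness: the `send` callable is a stand-in for a real producer call, and the message count is arbitrary. It reports throughput and tail latency for a fixed workload:

import time
import statistics

def run_benchmark(send, message_count: int = 10000) -> dict:
    """Measure throughput and latency percentiles for a send callable."""
    latencies = []
    start = time.perf_counter()
    for i in range(message_count):
        t0 = time.perf_counter()
        send(f"message-{i}")
        latencies.append((time.perf_counter() - t0) * 1000)  # milliseconds
    elapsed = time.perf_counter() - start
    latencies.sort()
    return {
        'tps': message_count / elapsed,
        'latency_avg_ms': statistics.mean(latencies),
        'latency_p99_ms': latencies[int(len(latencies) * 0.99) - 1],
    }

# Stand-in for a real producer.send(...); replace with an actual client call
baseline = run_benchmark(lambda msg: time.sleep(0.0002))
print(baseline)

Running the same harness before and after a change keeps the comparison apples-to-apples; swap the lambda for a real producer call to measure an actual cluster.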

8.8.2 Optimization Strategy Summary

  1. Layered optimization

    • Hardware layer: CPU, memory, disk, network
    • System layer: operating system, file system, network configuration
    • Application layer: RocketMQ configuration, JVM parameters
    • Business layer: message design, usage patterns
  2. Incremental optimization

    • Establish a baseline
    • Identify bottlenecks
    • Optimize step by step
    • Verify the effect
  3. Data-driven decisions (see the comparison sketch after this list)

    • Base decisions on monitoring data
    • Quantify the effect of each optimization
    • Improve continuously
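
Quantifying an optimization, as item 3 suggests, can be as simple as diffing two benchmark runs. A small illustrative helper (the function name and metric keys are ours, matching the run_benchmark sketch above):

def compare_runs(baseline: dict, tuned: dict) -> None:
    """Print the relative change of each shared metric between two runs."""
    for metric in sorted(baseline.keys() & tuned.keys()):
        before, after = baseline[metric], tuned[metric]
        change = (after - before) / before * 100 if before else float('inf')
        print(f"{metric}: {before:.2f} -> {after:.2f} ({change:+.1f}%)")

# Example with the run_benchmark sketch above:
# compare_runs(run_benchmark(old_send), run_benchmark(new_send))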

8.8.3 Best Practice Recommendations

  1. Monitoring first

    • Build a solid monitoring system before optimizing
    • Set sensible alert thresholds
    • Analyze monitoring data regularly
  2. Test and verify

    • Validate optimizations in a test environment
    • Use standardized benchmarks
    • Record before/after performance comparisons
  3. Document everything

    • Record every configuration change
    • Maintain configuration templates
    • Build a knowledge base
  4. Automate operations

    • Automate deployment and configuration
    • Automate monitoring and alerting
    • Automate failure recovery
  5. Optimize continuously

    • Review performance periodically
    • Track business growth
    • Optimize preventively

8.8.4 Caveats

  1. Avoid over-optimization

    • Optimize against real requirements
    • Balance performance against complexity
    • Account for maintenance cost
  2. Mind compatibility

    • Keep configurations compatible
    • Consider the impact of version upgrades
    • Preserve backward compatibility
  3. Never trade away security

    • Do not sacrifice security for performance
    • Run regular security assessments
    • Follow security best practices

Systematic performance optimization and tuning can significantly improve RocketMQ's performance and meet high-concurrency, low-latency business requirements. A solid monitoring and automated-operations foundation, in turn, keeps the system stable and maintainable.

This chapter covered every facet of RocketMQ performance optimization and provided a complete set of tools and methods. In practice, choose the optimization strategies and tools that fit your specific business scenario and performance requirements.