9.1 常见问题诊断
9.1.1 连接问题诊断
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Optional, Any
from datetime import datetime, timedelta
import logging
import psutil
import socket
import ssl
import time
import json
class ConnectionIssueType(Enum):
"""连接问题类型"""
NETWORK_UNREACHABLE = "network_unreachable"
CONNECTION_REFUSED = "connection_refused"
AUTHENTICATION_FAILED = "authentication_failed"
SSL_HANDSHAKE_FAILED = "ssl_handshake_failed"
TIMEOUT = "timeout"
PERMISSION_DENIED = "permission_denied"
RESOURCE_EXHAUSTED = "resource_exhausted"
PROTOCOL_ERROR = "protocol_error"
class DiagnosticSeverity(Enum):
"""诊断严重程度"""
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
@dataclass
class DiagnosticResult:
"""诊断结果"""
issue_type: ConnectionIssueType
severity: DiagnosticSeverity
description: str
details: Dict[str, Any]
recommendations: List[str]
timestamp: datetime
resolved: bool = False
@dataclass
class ConnectionDiagnosticConfig:
"""连接诊断配置"""
host: str
port: int
username: str
password: str
vhost: str = "/"
ssl_enabled: bool = False
ssl_cert_path: Optional[str] = None
timeout: int = 30
max_retries: int = 3
retry_delay: float = 1.0
class ConnectionDiagnostic:
"""连接诊断器"""
def __init__(self, config: ConnectionDiagnosticConfig):
self.config = config
self.logger = logging.getLogger(__name__)
self.results: List[DiagnosticResult] = []
def diagnose_connection(self) -> List[DiagnosticResult]:
"""诊断连接问题"""
self.results.clear()
try:
# 1. 网络连通性检查
self._check_network_connectivity()
# 2. 端口可达性检查
self._check_port_accessibility()
# 3. SSL配置检查
if self.config.ssl_enabled:
self._check_ssl_configuration()
# 4. 认证检查
self._check_authentication()
# 5. 权限检查
self._check_permissions()
# 6. 资源检查
self._check_resource_limits()
except Exception as e:
self.logger.error(f"诊断过程中发生错误: {e}")
self.results.append(DiagnosticResult(
issue_type=ConnectionIssueType.PROTOCOL_ERROR,
severity=DiagnosticSeverity.ERROR,
description=f"诊断过程异常: {str(e)}",
details={"exception": str(e)},
recommendations=["检查诊断工具配置", "查看详细错误日志"],
timestamp=datetime.now()
))
return self.results
def _check_network_connectivity(self):
"""检查网络连通性"""
try:
# 尝试解析主机名
ip_address = socket.gethostbyname(self.config.host)
# 检查网络可达性
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.config.timeout)
result = sock.connect_ex((ip_address, self.config.port))
sock.close()
if result != 0:
self.results.append(DiagnosticResult(
issue_type=ConnectionIssueType.NETWORK_UNREACHABLE,
severity=DiagnosticSeverity.CRITICAL,
description=f"无法连接到 {self.config.host}:{self.config.port}",
details={
"host": self.config.host,
"ip": ip_address,
"port": self.config.port,
"error_code": result
},
recommendations=[
"检查网络连接",
"验证主机名和端口",
"检查防火墙设置",
"确认RabbitMQ服务运行状态"
],
timestamp=datetime.now()
))
## 9.2 性能监控与调优
### 9.2.1 性能监控系统
```python
class MetricType(Enum):
"""指标类型"""
THROUGHPUT = "throughput"
LATENCY = "latency"
MEMORY = "memory"
CPU = "cpu"
DISK = "disk"
NETWORK = "network"
CONNECTION = "connection"
QUEUE = "queue"
class AlertLevel(Enum):
"""告警级别"""
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class PerformanceMetric:
"""性能指标"""
name: str
metric_type: MetricType
value: float
unit: str
timestamp: datetime
threshold_warning: Optional[float] = None
threshold_critical: Optional[float] = None
tags: Dict[str, str] = None
@dataclass
class PerformanceAlert:
"""性能告警"""
metric_name: str
level: AlertLevel
message: str
current_value: float
threshold: float
timestamp: datetime
resolved: bool = False
resolution_time: Optional[datetime] = None
class PerformanceMonitor:
"""性能监控器"""
def __init__(self, management_api):
self.management_api = management_api
self.logger = logging.getLogger(__name__)
self.metrics: List[PerformanceMetric] = []
self.alerts: List[PerformanceAlert] = []
self.monitoring_enabled = False
self.monitoring_interval = 60 # 秒
def start_monitoring(self, interval: int = 60):
"""启动性能监控"""
self.monitoring_interval = interval
self.monitoring_enabled = True
self.logger.info(f"性能监控已启动,间隔: {interval}秒")
def stop_monitoring(self):
"""停止性能监控"""
self.monitoring_enabled = False
self.logger.info("性能监控已停止")
def collect_metrics(self) -> List[PerformanceMetric]:
"""收集性能指标"""
current_metrics = []
timestamp = datetime.now()
try:
# 收集系统指标
current_metrics.extend(self._collect_system_metrics(timestamp))
# 收集RabbitMQ指标
current_metrics.extend(self._collect_rabbitmq_metrics(timestamp))
# 收集队列指标
current_metrics.extend(self._collect_queue_metrics(timestamp))
# 收集连接指标
current_metrics.extend(self._collect_connection_metrics(timestamp))
# 存储指标
self.metrics.extend(current_metrics)
# 检查告警
self._check_alerts(current_metrics)
except Exception as e:
self.logger.error(f"收集性能指标失败: {e}")
return current_metrics
def _collect_system_metrics(self, timestamp: datetime) -> List[PerformanceMetric]:
"""收集系统指标"""
metrics = []
try:
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
metrics.append(PerformanceMetric(
name="system.cpu.usage",
metric_type=MetricType.CPU,
value=cpu_percent,
unit="percent",
timestamp=timestamp,
threshold_warning=80.0,
threshold_critical=95.0
))
# 内存使用率
memory = psutil.virtual_memory()
metrics.append(PerformanceMetric(
name="system.memory.usage",
metric_type=MetricType.MEMORY,
value=memory.percent,
unit="percent",
timestamp=timestamp,
threshold_warning=85.0,
threshold_critical=95.0
))
# 磁盘使用率
disk = psutil.disk_usage('/')
metrics.append(PerformanceMetric(
name="system.disk.usage",
metric_type=MetricType.DISK,
value=disk.percent,
unit="percent",
timestamp=timestamp,
threshold_warning=85.0,
threshold_critical=95.0
))
# 网络IO
net_io = psutil.net_io_counters()
metrics.append(PerformanceMetric(
name="system.network.bytes_sent",
metric_type=MetricType.NETWORK,
value=net_io.bytes_sent,
unit="bytes",
timestamp=timestamp
))
metrics.append(PerformanceMetric(
name="system.network.bytes_recv",
metric_type=MetricType.NETWORK,
value=net_io.bytes_recv,
unit="bytes",
timestamp=timestamp
))
except Exception as e:
self.logger.error(f"收集系统指标失败: {e}")
return metrics
def _collect_rabbitmq_metrics(self, timestamp: datetime) -> List[PerformanceMetric]:
"""收集RabbitMQ指标"""
metrics = []
try:
# 模拟从管理API获取指标
# 实际实现应该调用RabbitMQ管理API
# 消息发布速率
metrics.append(PerformanceMetric(
name="rabbitmq.message.publish_rate",
metric_type=MetricType.THROUGHPUT,
value=150.5,
unit="messages/sec",
timestamp=timestamp
))
# 消息消费速率
metrics.append(PerformanceMetric(
name="rabbitmq.message.deliver_rate",
metric_type=MetricType.THROUGHPUT,
value=145.2,
unit="messages/sec",
timestamp=timestamp
))
# 内存使用
metrics.append(PerformanceMetric(
name="rabbitmq.memory.used",
metric_type=MetricType.MEMORY,
value=512 * 1024 * 1024, # 512MB
unit="bytes",
timestamp=timestamp,
threshold_warning=1024 * 1024 * 1024, # 1GB
threshold_critical=2048 * 1024 * 1024 # 2GB
))
# 连接数
metrics.append(PerformanceMetric(
name="rabbitmq.connections.total",
metric_type=MetricType.CONNECTION,
value=25,
unit="count",
timestamp=timestamp,
threshold_warning=100,
threshold_critical=200
))
# 队列数
metrics.append(PerformanceMetric(
name="rabbitmq.queues.total",
metric_type=MetricType.QUEUE,
value=15,
unit="count",
timestamp=timestamp
))
except Exception as e:
self.logger.error(f"收集RabbitMQ指标失败: {e}")
return metrics
def _collect_queue_metrics(self, timestamp: datetime) -> List[PerformanceMetric]:
"""收集队列指标"""
metrics = []
try:
# 模拟获取队列列表和指标
queues = [
{"name": "task_queue", "messages": 1500, "consumers": 3},
{"name": "notification_queue", "messages": 50, "consumers": 1},
{"name": "log_queue", "messages": 10000, "consumers": 2}
]
total_messages = 0
total_consumers = 0
for queue in queues:
queue_name = queue["name"]
messages = queue["messages"]
consumers = queue["consumers"]
total_messages += messages
total_consumers += consumers
# 队列消息数
metrics.append(PerformanceMetric(
name=f"rabbitmq.queue.messages",
metric_type=MetricType.QUEUE,
value=messages,
unit="count",
timestamp=timestamp,
threshold_warning=5000,
threshold_critical=10000,
tags={"queue": queue_name}
))
# 队列消费者数
metrics.append(PerformanceMetric(
name=f"rabbitmq.queue.consumers",
metric_type=MetricType.QUEUE,
value=consumers,
unit="count",
timestamp=timestamp,
tags={"queue": queue_name}
))
# 总计指标
metrics.append(PerformanceMetric(
name="rabbitmq.messages.total",
metric_type=MetricType.QUEUE,
value=total_messages,
unit="count",
timestamp=timestamp,
threshold_warning=20000,
threshold_critical=50000
))
except Exception as e:
self.logger.error(f"收集队列指标失败: {e}")
return metrics
def _collect_connection_metrics(self, timestamp: datetime) -> List[PerformanceMetric]:
"""收集连接指标"""
metrics = []
try:
# 模拟连接指标
metrics.append(PerformanceMetric(
name="rabbitmq.connections.active",
metric_type=MetricType.CONNECTION,
value=20,
unit="count",
timestamp=timestamp
))
metrics.append(PerformanceMetric(
name="rabbitmq.channels.total",
metric_type=MetricType.CONNECTION,
value=45,
unit="count",
timestamp=timestamp
))
except Exception as e:
self.logger.error(f"收集连接指标失败: {e}")
return metrics
def _check_alerts(self, metrics: List[PerformanceMetric]):
"""检查告警条件"""
for metric in metrics:
# 检查临界告警
if (metric.threshold_critical is not None and
metric.value >= metric.threshold_critical):
alert = PerformanceAlert(
metric_name=metric.name,
level=AlertLevel.CRITICAL,
message=f"{metric.name} 达到临界阈值: {metric.value}{metric.unit}",
current_value=metric.value,
threshold=metric.threshold_critical,
timestamp=datetime.now()
)
self.alerts.append(alert)
self.logger.critical(alert.message)
# 检查警告告警
elif (metric.threshold_warning is not None and
metric.value >= metric.threshold_warning):
alert = PerformanceAlert(
metric_name=metric.name,
level=AlertLevel.WARNING,
message=f"{metric.name} 达到警告阈值: {metric.value}{metric.unit}",
current_value=metric.value,
threshold=metric.threshold_warning,
timestamp=datetime.now()
)
self.alerts.append(alert)
self.logger.warning(alert.message)
def get_performance_summary(self, hours: int = 1) -> Dict[str, Any]:
"""获取性能摘要"""
cutoff_time = datetime.now() - timedelta(hours=hours)
recent_metrics = [m for m in self.metrics if m.timestamp >= cutoff_time]
recent_alerts = [a for a in self.alerts if a.timestamp >= cutoff_time]
# 按类型分组指标
metrics_by_type = {}
for metric in recent_metrics:
metric_type = metric.metric_type.value
if metric_type not in metrics_by_type:
metrics_by_type[metric_type] = []
metrics_by_type[metric_type].append(metric)
# 计算平均值
averages = {}
for metric_type, type_metrics in metrics_by_type.items():
if type_metrics:
avg_value = sum(m.value for m in type_metrics) / len(type_metrics)
averages[metric_type] = avg_value
# 告警统计
alert_counts = {}
for alert in recent_alerts:
level = alert.level.value
alert_counts[level] = alert_counts.get(level, 0) + 1
return {
"period_hours": hours,
"total_metrics": len(recent_metrics),
"total_alerts": len(recent_alerts),
"alert_breakdown": alert_counts,
"metric_averages": averages,
"latest_metrics": {
metric.name: {
"value": metric.value,
"unit": metric.unit,
"timestamp": metric.timestamp.isoformat()
}
for metric in recent_metrics[-10:] # 最近10个指标
},
"active_alerts": [
{
"metric": alert.metric_name,
"level": alert.level.value,
"message": alert.message,
"timestamp": alert.timestamp.isoformat()
}
for alert in recent_alerts if not alert.resolved
]
}
9.2.2 性能调优建议
class TuningCategory(Enum):
"""调优类别"""
MEMORY = "memory"
CPU = "cpu"
NETWORK = "network"
DISK = "disk"
CONFIGURATION = "configuration"
APPLICATION = "application"
@dataclass
class TuningRecommendation:
"""调优建议"""
category: TuningCategory
priority: str # high, medium, low
title: str
description: str
impact: str
implementation_steps: List[str]
estimated_improvement: str
risks: List[str]
prerequisites: List[str]
class PerformanceTuner:
"""性能调优器"""
def __init__(self, performance_monitor: PerformanceMonitor):
self.performance_monitor = performance_monitor
self.logger = logging.getLogger(__name__)
def analyze_performance(self) -> List[TuningRecommendation]:
"""分析性能并提供调优建议"""
recommendations = []
# 获取最近的性能数据
summary = self.performance_monitor.get_performance_summary(hours=24)
# 分析内存使用
recommendations.extend(self._analyze_memory_usage(summary))
# 分析CPU使用
recommendations.extend(self._analyze_cpu_usage(summary))
# 分析队列性能
recommendations.extend(self._analyze_queue_performance(summary))
# 分析连接性能
recommendations.extend(self._analyze_connection_performance(summary))
# 分析配置优化
recommendations.extend(self._analyze_configuration(summary))
# 按优先级排序
priority_order = {"high": 0, "medium": 1, "low": 2}
recommendations.sort(key=lambda x: priority_order.get(x.priority, 3))
return recommendations
def _analyze_memory_usage(self, summary: Dict[str, Any]) -> List[TuningRecommendation]:
"""分析内存使用"""
recommendations = []
# 检查系统内存使用
memory_avg = summary["metric_averages"].get("memory", 0)
if memory_avg > 85:
recommendations.append(TuningRecommendation(
category=TuningCategory.MEMORY,
priority="high",
title="系统内存使用率过高",
description=f"系统内存平均使用率为 {memory_avg:.1f}%,超过推荐阈值",
impact="可能导致系统性能下降、OOM错误和服务不稳定",
implementation_steps=[
"增加系统内存",
"优化RabbitMQ内存配置",
"启用消息分页到磁盘",
"减少消息积压",
"优化消息大小"
],
estimated_improvement="20-40% 性能提升",
risks=["服务重启可能影响业务", "配置错误可能导致数据丢失"],
prerequisites=["备份当前配置", "准备回滚方案"]
))
# 检查RabbitMQ内存使用
rabbitmq_memory = summary["latest_metrics"].get("rabbitmq.memory.used", {}).get("value", 0)
if rabbitmq_memory > 1024 * 1024 * 1024: # 1GB
recommendations.append(TuningRecommendation(
category=TuningCategory.MEMORY,
priority="medium",
title="RabbitMQ内存使用过高",
description=f"RabbitMQ内存使用 {rabbitmq_memory/(1024*1024*1024):.1f}GB",
impact="可能触发内存告警和流控机制",
implementation_steps=[
"配置内存高水位阈值",
"启用惰性队列",
"优化消息TTL",
"增加消费者数量"
],
estimated_improvement="15-30% 内存使用减少",
risks=["配置变更可能影响消息处理"],
prerequisites=["了解当前消息模式"]
))
return recommendations
def _analyze_cpu_usage(self, summary: Dict[str, Any]) -> List[TuningRecommendation]:
"""分析CPU使用"""
recommendations = []
cpu_avg = summary["metric_averages"].get("cpu", 0)
if cpu_avg > 80:
recommendations.append(TuningRecommendation(
category=TuningCategory.CPU,
priority="high",
title="CPU使用率过高",
description=f"CPU平均使用率为 {cpu_avg:.1f}%",
impact="可能导致消息处理延迟和系统响应缓慢",
implementation_steps=[
"增加CPU核心数",
"优化Erlang虚拟机配置",
"调整调度器数量",
"优化消息路由算法",
"减少不必要的插件"
],
estimated_improvement="25-50% 处理能力提升",
risks=["配置错误可能导致性能下降"],
prerequisites=["分析CPU使用模式", "确定瓶颈组件"]
))
return recommendations
def _analyze_queue_performance(self, summary: Dict[str, Any]) -> List[TuningRecommendation]:
"""分析队列性能"""
recommendations = []
total_messages = summary["latest_metrics"].get("rabbitmq.messages.total", {}).get("value", 0)
if total_messages > 20000:
recommendations.append(TuningRecommendation(
category=TuningCategory.APPLICATION,
priority="high",
title="队列消息积压严重",
description=f"总消息数为 {total_messages},存在严重积压",
impact="可能导致消息处理延迟和内存压力",
implementation_steps=[
"增加消费者数量",
"优化消费者处理逻辑",
"实施消息批处理",
"配置队列长度限制",
"设置消息TTL"
],
estimated_improvement="50-80% 处理速度提升",
risks=["快速消费可能导致下游系统压力"],
prerequisites=["评估下游系统容量", "准备监控方案"]
))
# 检查发布和消费速率
publish_rate = summary["latest_metrics"].get("rabbitmq.message.publish_rate", {}).get("value", 0)
deliver_rate = summary["latest_metrics"].get("rabbitmq.message.deliver_rate", {}).get("value", 0)
if publish_rate > deliver_rate * 1.2: # 发布速率比消费速率高20%
recommendations.append(TuningRecommendation(
category=TuningCategory.APPLICATION,
priority="medium",
title="消费速度跟不上发布速度",
description=f"发布速率 {publish_rate:.1f}/s > 消费速率 {deliver_rate:.1f}/s",
impact="将导致消息持续积压",
implementation_steps=[
"增加消费者实例",
"优化消费者性能",
"实施背压机制",
"考虑消息分片"
],
estimated_improvement="平衡发布和消费速率",
risks=["消费者增加可能增加系统负载"],
prerequisites=["分析消费者瓶颈"]
))
return recommendations
def _analyze_connection_performance(self, summary: Dict[str, Any]) -> List[TuningRecommendation]:
"""分析连接性能"""
recommendations = []
total_connections = summary["latest_metrics"].get("rabbitmq.connections.total", {}).get("value", 0)
if total_connections > 100:
recommendations.append(TuningRecommendation(
category=TuningCategory.CONFIGURATION,
priority="medium",
title="连接数过多",
description=f"当前连接数为 {total_connections}",
impact="可能导致资源消耗过多和性能下降",
implementation_steps=[
"实施连接池",
"优化连接复用",
"设置连接限制",
"监控连接生命周期"
],
estimated_improvement="20-30% 资源使用减少",
risks=["连接限制可能影响客户端"],
prerequisites=["分析连接使用模式"]
))
return recommendations
def _analyze_configuration(self, summary: Dict[str, Any]) -> List[TuningRecommendation]:
"""分析配置优化"""
recommendations = []
# 通用配置建议
recommendations.append(TuningRecommendation(
category=TuningCategory.CONFIGURATION,
priority="low",
title="优化RabbitMQ配置参数",
description="基于当前负载模式优化配置",
impact="提升整体性能和稳定性",
implementation_steps=[
"调整vm_memory_high_watermark",
"配置disk_free_limit",
"优化tcp_listen_options",
"设置合适的heartbeat间隔",
"配置cluster_partition_handling"
],
estimated_improvement="10-20% 整体性能提升",
risks=["配置错误可能导致服务不稳定"],
prerequisites=["备份当前配置", "在测试环境验证"]
))
return recommendations
def generate_tuning_report(self) -> str:
"""生成调优报告"""
recommendations = self.analyze_performance()
report = ["# RabbitMQ 性能调优报告\n"]
report.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
report.append(f"总建议数: {len(recommendations)}\n\n")
# 按优先级分组
high_priority = [r for r in recommendations if r.priority == "high"]
medium_priority = [r for r in recommendations if r.priority == "medium"]
low_priority = [r for r in recommendations if r.priority == "low"]
if high_priority:
report.append("## 🔴 高优先级建议\n")
for i, rec in enumerate(high_priority, 1):
report.append(f"### {i}. {rec.title}\n")
report.append(f"**类别**: {rec.category.value}\n")
report.append(f"**描述**: {rec.description}\n")
report.append(f"**影响**: {rec.impact}\n")
report.append(f"**预期改善**: {rec.estimated_improvement}\n")
report.append("**实施步骤**:\n")
for step in rec.implementation_steps:
report.append(f"- {step}\n")
report.append("\n")
if medium_priority:
report.append("## 🟡 中优先级建议\n")
for i, rec in enumerate(medium_priority, 1):
report.append(f"### {i}. {rec.title}\n")
report.append(f"**描述**: {rec.description}\n")
report.append(f"**预期改善**: {rec.estimated_improvement}\n")
report.append("\n")
if low_priority:
report.append("## 🟢 低优先级建议\n")
for i, rec in enumerate(low_priority, 1):
report.append(f"### {i}. {rec.title}\n")
report.append(f"**描述**: {rec.description}\n")
report.append("\n")
return "".join(report)
9.3 运维自动化
9.3.1 自动化部署与配置
class DeploymentStage(Enum):
"""部署阶段"""
PREPARATION = "preparation"
INSTALLATION = "installation"
CONFIGURATION = "configuration"
VALIDATION = "validation"
ROLLBACK = "rollback"
COMPLETED = "completed"
@dataclass
class DeploymentTask:
"""部署任务"""
name: str
stage: DeploymentStage
command: str
timeout: int = 300
retry_count: int = 3
rollback_command: Optional[str] = None
dependencies: List[str] = None
validation_command: Optional[str] = None
@dataclass
class DeploymentResult:
"""部署结果"""
task_name: str
success: bool
output: str
error: Optional[str] = None
execution_time: float = 0.0
timestamp: datetime = None
class AutomationManager:
"""运维自动化管理器"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.deployment_history: List[DeploymentResult] = []
self.rollback_stack: List[DeploymentTask] = []
def execute_deployment(self, tasks: List[DeploymentTask]) -> bool:
"""执行部署任务"""
self.logger.info(f"开始执行部署,共 {len(tasks)} 个任务")
try:
# 按阶段排序任务
sorted_tasks = self._sort_tasks_by_stage(tasks)
# 执行任务
for task in sorted_tasks:
result = self._execute_task(task)
self.deployment_history.append(result)
if not result.success:
self.logger.error(f"任务 {task.name} 执行失败,开始回滚")
self._rollback_deployment()
return False
# 添加到回滚栈
if task.rollback_command:
self.rollback_stack.append(task)
self.logger.info("部署成功完成")
return True
except Exception as e:
self.logger.error(f"部署过程中发生异常: {e}")
self._rollback_deployment()
return False
def _sort_tasks_by_stage(self, tasks: List[DeploymentTask]) -> List[DeploymentTask]:
"""按阶段排序任务"""
stage_order = {
DeploymentStage.PREPARATION: 0,
DeploymentStage.INSTALLATION: 1,
DeploymentStage.CONFIGURATION: 2,
DeploymentStage.VALIDATION: 3
}
return sorted(tasks, key=lambda t: stage_order.get(t.stage, 999))
def _execute_task(self, task: DeploymentTask) -> DeploymentResult:
"""执行单个任务"""
self.logger.info(f"执行任务: {task.name}")
start_time = time.time()
for attempt in range(task.retry_count):
try:
# 模拟命令执行
output = self._run_command(task.command, task.timeout)
# 验证任务结果
if task.validation_command:
validation_output = self._run_command(task.validation_command, 30)
if "error" in validation_output.lower():
raise Exception(f"验证失败: {validation_output}")
execution_time = time.time() - start_time
return DeploymentResult(
task_name=task.name,
success=True,
output=output,
execution_time=execution_time,
timestamp=datetime.now()
)
except Exception as e:
self.logger.warning(f"任务 {task.name} 第 {attempt + 1} 次尝试失败: {e}")
if attempt == task.retry_count - 1:
execution_time = time.time() - start_time
return DeploymentResult(
task_name=task.name,
success=False,
output="",
error=str(e),
execution_time=execution_time,
timestamp=datetime.now()
)
time.sleep(2 ** attempt) # 指数退避
def _run_command(self, command: str, timeout: int) -> str:
"""运行命令"""
# 模拟命令执行
self.logger.debug(f"执行命令: {command}")
# 模拟不同命令的输出
if "install" in command:
return "Installation completed successfully"
elif "config" in command:
return "Configuration updated"
elif "start" in command:
return "Service started"
elif "status" in command:
return "Service is running"
else:
return "Command executed successfully"
def _rollback_deployment(self):
"""回滚部署"""
self.logger.info("开始回滚部署")
# 反向执行回滚命令
while self.rollback_stack:
task = self.rollback_stack.pop()
if task.rollback_command:
try:
self.logger.info(f"回滚任务: {task.name}")
output = self._run_command(task.rollback_command, 60)
self.logger.info(f"回滚成功: {output}")
except Exception as e:
self.logger.error(f"回滚任务 {task.name} 失败: {e}")
self.logger.info("回滚完成")
def create_rabbitmq_deployment_tasks(self) -> List[DeploymentTask]:
"""创建RabbitMQ部署任务"""
return [
# 准备阶段
DeploymentTask(
name="检查系统要求",
stage=DeploymentStage.PREPARATION,
command="check_system_requirements",
validation_command="validate_system_requirements"
),
DeploymentTask(
name="备份现有配置",
stage=DeploymentStage.PREPARATION,
command="backup_existing_config",
rollback_command="restore_config_backup"
),
# 安装阶段
DeploymentTask(
name="安装RabbitMQ",
stage=DeploymentStage.INSTALLATION,
command="install_rabbitmq",
timeout=600,
rollback_command="uninstall_rabbitmq",
validation_command="check_rabbitmq_installation"
),
DeploymentTask(
name="安装管理插件",
stage=DeploymentStage.INSTALLATION,
command="enable_management_plugin",
rollback_command="disable_management_plugin"
),
# 配置阶段
DeploymentTask(
name="配置RabbitMQ",
stage=DeploymentStage.CONFIGURATION,
command="configure_rabbitmq",
rollback_command="restore_default_config",
validation_command="validate_rabbitmq_config"
),
DeploymentTask(
name="创建用户和权限",
stage=DeploymentStage.CONFIGURATION,
command="setup_users_and_permissions",
rollback_command="remove_custom_users"
),
# 验证阶段
DeploymentTask(
name="启动RabbitMQ服务",
stage=DeploymentStage.VALIDATION,
command="start_rabbitmq_service",
rollback_command="stop_rabbitmq_service",
validation_command="check_service_status"
),
DeploymentTask(
name="验证连接",
stage=DeploymentStage.VALIDATION,
command="test_rabbitmq_connection",
validation_command="validate_connection_test"
)
]
def get_deployment_report(self) -> Dict[str, Any]:
"""获取部署报告"""
if not self.deployment_history:
return {"status": "no_deployments", "message": "没有部署历史"}
successful_tasks = [r for r in self.deployment_history if r.success]
failed_tasks = [r for r in self.deployment_history if not r.success]
total_time = sum(r.execution_time for r in self.deployment_history)
return {
"total_tasks": len(self.deployment_history),
"successful_tasks": len(successful_tasks),
"failed_tasks": len(failed_tasks),
"success_rate": len(successful_tasks) / len(self.deployment_history) * 100,
"total_execution_time": total_time,
"average_task_time": total_time / len(self.deployment_history),
"task_details": [
{
"name": r.task_name,
"success": r.success,
"execution_time": r.execution_time,
"timestamp": r.timestamp.isoformat() if r.timestamp else None,
"error": r.error
}
for r in self.deployment_history
]
}
9.3.2 监控告警自动化
class AlertChannel(Enum):
"""告警通道"""
EMAIL = "email"
SMS = "sms"
WEBHOOK = "webhook"
SLACK = "slack"
DINGTALK = "dingtalk"
@dataclass
class AlertRule:
"""告警规则"""
name: str
metric_name: str
condition: str # >, <, >=, <=, ==
threshold: float
duration: int # 持续时间(秒)
severity: AlertLevel
channels: List[AlertChannel]
enabled: bool = True
cooldown: int = 300 # 冷却时间(秒)
last_triggered: Optional[datetime] = None
@dataclass
class AlertNotification:
"""告警通知"""
rule_name: str
message: str
severity: AlertLevel
channels: List[AlertChannel]
timestamp: datetime
sent: bool = False
retry_count: int = 0
class AlertManager:
"""告警管理器"""
def __init__(self, performance_monitor: PerformanceMonitor):
self.performance_monitor = performance_monitor
self.logger = logging.getLogger(__name__)
self.alert_rules: List[AlertRule] = []
self.notifications: List[AlertNotification] = []
self.alert_history: List[Dict[str, Any]] = []
def add_alert_rule(self, rule: AlertRule):
"""添加告警规则"""
self.alert_rules.append(rule)
self.logger.info(f"添加告警规则: {rule.name}")
def remove_alert_rule(self, rule_name: str):
"""移除告警规则"""
self.alert_rules = [r for r in self.alert_rules if r.name != rule_name]
self.logger.info(f"移除告警规则: {rule_name}")
def check_alerts(self):
"""检查告警条件"""
current_metrics = self.performance_monitor.collect_metrics()
for rule in self.alert_rules:
if not rule.enabled:
continue
# 检查冷却时间
if (rule.last_triggered and
datetime.now() - rule.last_triggered < timedelta(seconds=rule.cooldown)):
continue
# 查找匹配的指标
matching_metrics = [m for m in current_metrics if m.name == rule.metric_name]
for metric in matching_metrics:
if self._evaluate_condition(metric.value, rule.condition, rule.threshold):
self._trigger_alert(rule, metric)
def _evaluate_condition(self, value: float, condition: str, threshold: float) -> bool:
"""评估告警条件"""
if condition == ">":
return value > threshold
elif condition == ">=":
return value >= threshold
elif condition == "<":
return value < threshold
elif condition == "<=":
return value <= threshold
elif condition == "==":
return value == threshold
else:
return False
def _trigger_alert(self, rule: AlertRule, metric):
"""触发告警"""
message = f"告警: {rule.name}\n" \
f"指标: {metric.name}\n" \
f"当前值: {metric.value} {metric.unit}\n" \
f"阈值: {rule.threshold}\n" \
f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
notification = AlertNotification(
rule_name=rule.name,
message=message,
severity=rule.severity,
channels=rule.channels,
timestamp=datetime.now()
)
self.notifications.append(notification)
rule.last_triggered = datetime.now()
# 发送通知
self._send_notification(notification)
# 记录告警历史
self.alert_history.append({
"rule_name": rule.name,
"metric_name": metric.name,
"value": metric.value,
"threshold": rule.threshold,
"severity": rule.severity.value,
"timestamp": datetime.now().isoformat()
})
self.logger.warning(f"触发告警: {rule.name}")
def _send_notification(self, notification: AlertNotification):
"""发送通知"""
for channel in notification.channels:
try:
if channel == AlertChannel.EMAIL:
self._send_email(notification)
elif channel == AlertChannel.SMS:
self._send_sms(notification)
elif channel == AlertChannel.WEBHOOK:
self._send_webhook(notification)
elif channel == AlertChannel.SLACK:
self._send_slack(notification)
elif channel == AlertChannel.DINGTALK:
self._send_dingtalk(notification)
notification.sent = True
self.logger.info(f"通知已发送: {channel.value}")
except Exception as e:
self.logger.error(f"发送通知失败 {channel.value}: {e}")
notification.retry_count += 1
def _send_email(self, notification: AlertNotification):
"""发送邮件通知"""
# 模拟邮件发送
self.logger.info(f"发送邮件: {notification.message}")
def _send_sms(self, notification: AlertNotification):
"""发送短信通知"""
# 模拟短信发送
self.logger.info(f"发送短信: {notification.message}")
def _send_webhook(self, notification: AlertNotification):
"""发送Webhook通知"""
# 模拟Webhook发送
payload = {
"alert": notification.rule_name,
"message": notification.message,
"severity": notification.severity.value,
"timestamp": notification.timestamp.isoformat()
}
self.logger.info(f"发送Webhook: {payload}")
def _send_slack(self, notification: AlertNotification):
"""发送Slack通知"""
# 模拟Slack发送
self.logger.info(f"发送Slack消息: {notification.message}")
def _send_dingtalk(self, notification: AlertNotification):
"""发送钉钉通知"""
# 模拟钉钉发送
self.logger.info(f"发送钉钉消息: {notification.message}")
def create_default_alert_rules(self) -> List[AlertRule]:
"""创建默认告警规则"""
return [
AlertRule(
name="系统内存使用率过高",
metric_name="system.memory.usage",
condition=">",
threshold=85.0,
duration=300,
severity=AlertLevel.WARNING,
channels=[AlertChannel.EMAIL, AlertChannel.WEBHOOK]
),
AlertRule(
name="系统内存使用率临界",
metric_name="system.memory.usage",
condition=">",
threshold=95.0,
duration=60,
severity=AlertLevel.CRITICAL,
channels=[AlertChannel.EMAIL, AlertChannel.SMS, AlertChannel.WEBHOOK]
),
AlertRule(
name="CPU使用率过高",
metric_name="system.cpu.usage",
condition=">",
threshold=80.0,
duration=300,
severity=AlertLevel.WARNING,
channels=[AlertChannel.EMAIL]
),
AlertRule(
name="队列消息积压",
metric_name="rabbitmq.messages.total",
condition=">",
threshold=10000,
duration=600,
severity=AlertLevel.WARNING,
channels=[AlertChannel.EMAIL, AlertChannel.SLACK]
),
AlertRule(
name="RabbitMQ内存使用过高",
metric_name="rabbitmq.memory.used",
condition=">",
threshold=1073741824, # 1GB
duration=300,
severity=AlertLevel.WARNING,
channels=[AlertChannel.EMAIL]
)
]
def get_alert_summary(self, hours: int = 24) -> Dict[str, Any]:
"""获取告警摘要"""
cutoff_time = datetime.now() - timedelta(hours=hours)
recent_alerts = [a for a in self.alert_history if
datetime.fromisoformat(a["timestamp"]) >= cutoff_time]
# 按严重程度统计
severity_counts = {}
for alert in recent_alerts:
severity = alert["severity"]
severity_counts[severity] = severity_counts.get(severity, 0) + 1
# 按规则统计
rule_counts = {}
for alert in recent_alerts:
rule = alert["rule_name"]
rule_counts[rule] = rule_counts.get(rule, 0) + 1
return {
"period_hours": hours,
"total_alerts": len(recent_alerts),
"severity_breakdown": severity_counts,
"rule_breakdown": rule_counts,
"active_rules": len([r for r in self.alert_rules if r.enabled]),
"total_rules": len(self.alert_rules),
"recent_alerts": recent_alerts[-10:] # 最近10个告警
}
9.4 完整的运维管理示例
class OperationsManager:
"""运维管理器"""
def __init__(self, management_api_url: str = "http://localhost:15672"):
self.management_api_url = management_api_url
self.logger = logging.getLogger(__name__)
# 初始化各个组件
self.connection_diagnostic = ConnectionDiagnostic(
ConnectionDiagnosticConfig(
host="localhost",
port=5672,
username="admin",
password="admin123",
vhost="/",
ssl_enabled=False
)
)
self.queue_diagnostic = QueueDiagnostic(management_api_url)
self.performance_monitor = PerformanceMonitor(management_api_url)
self.performance_tuner = PerformanceTuner(self.performance_monitor)
self.automation_manager = AutomationManager()
self.alert_manager = AlertManager(self.performance_monitor)
# 初始化告警规则
default_rules = self.alert_manager.create_default_alert_rules()
for rule in default_rules:
self.alert_manager.add_alert_rule(rule)
def initialize_monitoring(self):
"""初始化监控"""
self.logger.info("初始化RabbitMQ运维监控")
# 启动性能监控
self.performance_monitor.start_monitoring(interval=60)
# 执行初始诊断
self.run_full_diagnostic()
self.logger.info("监控初始化完成")
def run_full_diagnostic(self) -> Dict[str, Any]:
"""运行完整诊断"""
self.logger.info("开始运行完整诊断")
diagnostic_results = {
"timestamp": datetime.now().isoformat(),
"connection_diagnostic": None,
"queue_diagnostic": None,
"performance_summary": None,
"tuning_recommendations": None,
"overall_status": "unknown"
}
try:
# 连接诊断
self.logger.info("执行连接诊断")
connection_results = self.connection_diagnostic.run_full_diagnostic()
diagnostic_results["connection_diagnostic"] = {
"total_checks": len(connection_results),
"passed_checks": len([r for r in connection_results if r.severity != DiagnosticSeverity.ERROR]),
"failed_checks": len([r for r in connection_results if r.severity == DiagnosticSeverity.ERROR]),
"details": [
{
"check": r.check_name,
"status": "pass" if r.severity != DiagnosticSeverity.ERROR else "fail",
"message": r.message,
"severity": r.severity.value
}
for r in connection_results
]
}
# 队列诊断
self.logger.info("执行队列诊断")
queue_results = self.queue_diagnostic.diagnose_all_queues()
diagnostic_results["queue_diagnostic"] = {
"total_queues": len(queue_results),
"healthy_queues": len([r for r in queue_results if r.severity != DiagnosticSeverity.ERROR]),
"problematic_queues": len([r for r in queue_results if r.severity == DiagnosticSeverity.ERROR]),
"details": [
{
"queue": r.queue_name,
"issue_type": r.issue_type.value if r.issue_type else "none",
"message": r.message,
"severity": r.severity.value
}
for r in queue_results
]
}
# 性能摘要
self.logger.info("收集性能指标")
self.performance_monitor.collect_metrics()
performance_summary = self.performance_monitor.get_performance_summary(hours=1)
diagnostic_results["performance_summary"] = performance_summary
# 调优建议
self.logger.info("生成调优建议")
recommendations = self.performance_tuner.analyze_performance()
diagnostic_results["tuning_recommendations"] = [
{
"title": rec.title,
"category": rec.category.value,
"priority": rec.priority,
"description": rec.description,
"estimated_improvement": rec.estimated_improvement
}
for rec in recommendations
]
# 确定整体状态
connection_issues = diagnostic_results["connection_diagnostic"]["failed_checks"]
queue_issues = diagnostic_results["queue_diagnostic"]["problematic_queues"]
high_priority_recommendations = len([r for r in recommendations if r.priority == "high"])
if connection_issues > 0 or queue_issues > 0:
diagnostic_results["overall_status"] = "critical"
elif high_priority_recommendations > 0:
diagnostic_results["overall_status"] = "warning"
else:
diagnostic_results["overall_status"] = "healthy"
self.logger.info(f"诊断完成,整体状态: {diagnostic_results['overall_status']}")
except Exception as e:
self.logger.error(f"诊断过程中发生错误: {e}")
diagnostic_results["overall_status"] = "error"
diagnostic_results["error"] = str(e)
return diagnostic_results
def deploy_rabbitmq(self) -> bool:
"""部署RabbitMQ"""
self.logger.info("开始部署RabbitMQ")
# 创建部署任务
deployment_tasks = self.automation_manager.create_rabbitmq_deployment_tasks()
# 执行部署
success = self.automation_manager.execute_deployment(deployment_tasks)
if success:
self.logger.info("RabbitMQ部署成功")
# 部署成功后运行诊断
self.run_full_diagnostic()
else:
self.logger.error("RabbitMQ部署失败")
return success
def start_continuous_monitoring(self, check_interval: int = 300):
"""启动持续监控"""
self.logger.info(f"启动持续监控,检查间隔: {check_interval}秒")
def monitoring_loop():
while self.performance_monitor.monitoring_enabled:
try:
# 收集性能指标
self.performance_monitor.collect_metrics()
# 检查告警
self.alert_manager.check_alerts()
# 等待下次检查
time.sleep(check_interval)
except Exception as e:
self.logger.error(f"监控循环中发生错误: {e}")
time.sleep(60) # 错误时等待1分钟后重试
# 在后台线程中运行监控
import threading
monitoring_thread = threading.Thread(target=monitoring_loop, daemon=True)
monitoring_thread.start()
self.logger.info("持续监控已启动")
def stop_monitoring(self):
"""停止监控"""
self.performance_monitor.stop_monitoring()
self.logger.info("监控已停止")
def generate_operations_report(self) -> str:
"""生成运维报告"""
self.logger.info("生成运维报告")
# 获取各种数据
diagnostic_results = self.run_full_diagnostic()
performance_summary = self.performance_monitor.get_performance_summary(hours=24)
alert_summary = self.alert_manager.get_alert_summary(hours=24)
deployment_report = self.automation_manager.get_deployment_report()
tuning_report = self.performance_tuner.generate_tuning_report()
# 生成报告
report = []
report.append("# RabbitMQ 运维管理报告\n")
report.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
# 整体状态
status_emoji = {
"healthy": "✅",
"warning": "⚠️",
"critical": "🔴",
"error": "❌",
"unknown": "❓"
}
overall_status = diagnostic_results["overall_status"]
report.append(f"## 整体状态: {status_emoji.get(overall_status, '❓')} {overall_status.upper()}\n\n")
# 连接诊断摘要
conn_diag = diagnostic_results["connection_diagnostic"]
if conn_diag:
report.append("## 连接诊断\n")
report.append(f"- 总检查项: {conn_diag['total_checks']}\n")
report.append(f"- 通过检查: {conn_diag['passed_checks']}\n")
report.append(f"- 失败检查: {conn_diag['failed_checks']}\n\n")
# 队列诊断摘要
queue_diag = diagnostic_results["queue_diagnostic"]
if queue_diag:
report.append("## 队列诊断\n")
report.append(f"- 总队列数: {queue_diag['total_queues']}\n")
report.append(f"- 健康队列: {queue_diag['healthy_queues']}\n")
report.append(f"- 问题队列: {queue_diag['problematic_queues']}\n\n")
# 性能摘要
report.append("## 性能摘要(24小时)\n")
report.append(f"- 总指标数: {performance_summary['total_metrics']}\n")
report.append(f"- 总告警数: {performance_summary['total_alerts']}\n")
if performance_summary['metric_averages']:
report.append("- 平均指标:\n")
for metric_type, avg_value in performance_summary['metric_averages'].items():
report.append(f" - {metric_type}: {avg_value:.2f}\n")
report.append("\n")
# 告警摘要
report.append("## 告警摘要(24小时)\n")
report.append(f"- 总告警数: {alert_summary['total_alerts']}\n")
report.append(f"- 活跃规则: {alert_summary['active_rules']}/{alert_summary['total_rules']}\n")
if alert_summary['severity_breakdown']:
report.append("- 按严重程度分布:\n")
for severity, count in alert_summary['severity_breakdown'].items():
report.append(f" - {severity}: {count}\n")
report.append("\n")
# 部署摘要
if deployment_report["status"] != "no_deployments":
report.append("## 部署摘要\n")
report.append(f"- 总任务数: {deployment_report['total_tasks']}\n")
report.append(f"- 成功任务: {deployment_report['successful_tasks']}\n")
report.append(f"- 失败任务: {deployment_report['failed_tasks']}\n")
report.append(f"- 成功率: {deployment_report['success_rate']:.1f}%\n\n")
# 调优建议
recommendations = diagnostic_results.get("tuning_recommendations", [])
high_priority = [r for r in recommendations if r["priority"] == "high"]
if high_priority:
report.append("## 🔴 高优先级调优建议\n")
for i, rec in enumerate(high_priority, 1):
report.append(f"### {i}. {rec['title']}\n")
report.append(f"**类别**: {rec['category']}\n")
report.append(f"**描述**: {rec['description']}\n")
report.append(f"**预期改善**: {rec['estimated_improvement']}\n\n")
# 添加详细的调优报告
report.append("\n---\n\n")
report.append(tuning_report)
return "".join(report)
def get_health_status(self) -> Dict[str, Any]:
"""获取健康状态"""
try:
# 快速健康检查
connection_result = self.connection_diagnostic.check_network_connectivity()
# 获取最新性能指标
recent_metrics = self.performance_monitor.collect_metrics()
# 获取活跃告警
alert_summary = self.alert_manager.get_alert_summary(hours=1)
active_alerts = alert_summary.get("recent_alerts", [])
# 确定健康状态
if connection_result.severity == DiagnosticSeverity.ERROR:
status = "unhealthy"
message = "连接检查失败"
elif len([a for a in active_alerts if a.get("severity") == "critical"]) > 0:
status = "critical"
message = "存在临界告警"
elif len([a for a in active_alerts if a.get("severity") == "warning"]) > 0:
status = "warning"
message = "存在警告告警"
else:
status = "healthy"
message = "系统运行正常"
return {
"status": status,
"message": message,
"timestamp": datetime.now().isoformat(),
"metrics_count": len(recent_metrics),
"active_alerts": len(active_alerts),
"monitoring_enabled": self.performance_monitor.monitoring_enabled
}
except Exception as e:
return {
"status": "error",
"message": f"健康检查失败: {e}",
"timestamp": datetime.now().isoformat(),
"error": str(e)
}
# 完整的运维管理示例
def operations_management_example():
"""运维管理示例"""
print("=== RabbitMQ 运维管理示例 ===")
# 创建运维管理器
ops_manager = OperationsManager()
try:
# 1. 初始化监控
print("\n1. 初始化监控...")
ops_manager.initialize_monitoring()
# 2. 运行完整诊断
print("\n2. 运行完整诊断...")
diagnostic_results = ops_manager.run_full_diagnostic()
print(f"诊断完成,整体状态: {diagnostic_results['overall_status']}")
# 3. 检查健康状态
print("\n3. 检查健康状态...")
health_status = ops_manager.get_health_status()
print(f"健康状态: {health_status['status']} - {health_status['message']}")
# 4. 启动持续监控
print("\n4. 启动持续监控...")
ops_manager.start_continuous_monitoring(check_interval=60)
# 5. 模拟运行一段时间
print("\n5. 模拟监控运行...")
time.sleep(5) # 实际环境中会运行更长时间
# 6. 生成运维报告
print("\n6. 生成运维报告...")
report = ops_manager.generate_operations_report()
print("运维报告已生成")
# 保存报告到文件
with open("rabbitmq_operations_report.md", "w", encoding="utf-8") as f:
f.write(report)
print("报告已保存到 rabbitmq_operations_report.md")
# 7. 停止监控
print("\n7. 停止监控...")
ops_manager.stop_monitoring()
print("\n运维管理示例完成!")
except Exception as e:
print(f"运维管理过程中发生错误: {e}")
ops_manager.stop_monitoring()
if __name__ == "__main__":
operations_management_example()
9.5 本章总结
核心知识点
故障诊断
- 连接问题诊断:网络连通性、端口可达性、SSL配置、认证权限
- 队列问题诊断:队列长度、消费者状态、消息积压、内存使用
- 系统化的诊断流程和结果分析
性能监控与调优
- 多维度性能指标收集:系统、RabbitMQ、队列、连接
- 智能告警机制:阈值设置、告警级别、冷却时间
- 自动化调优建议:基于性能数据的智能分析
运维自动化
- 自动化部署:任务编排、错误处理、回滚机制
- 监控告警自动化:多通道通知、规则管理、历史记录
- 持续监控:后台监控、异常处理、状态报告
最佳实践
监控策略
- 建立分层监控体系:系统层、应用层、业务层
- 设置合理的告警阈值,避免告警疲劳
- 实施预防性监控,提前发现潜在问题
故障处理
- 建立标准化的故障诊断流程
- 维护详细的故障处理手册
- 实施故障复盘机制,持续改进
自动化运维
- 自动化重复性操作,减少人为错误
- 建立完善的回滚机制
- 实施渐进式部署策略
性能优化
- 基于数据驱动的性能调优
- 定期进行性能评估和优化
- 建立性能基线和趋势分析
练习题
基础练习
- 实现一个简单的RabbitMQ连接检查工具
- 创建队列监控脚本,检测消息积压情况
- 设计告警规则,监控关键性能指标
进阶练习
- 开发完整的RabbitMQ监控仪表板
- 实现自动化的性能调优建议系统
- 构建多环境的自动化部署流水线
综合项目
- 设计企业级RabbitMQ运维管理平台
- 实现基于机器学习的异常检测系统
- 构建完整的故障自愈机制
通过本章的学习,你应该能够: - 快速诊断和解决RabbitMQ常见问题 - 建立完善的监控和告警体系 - 实施自动化运维流程 - 进行系统性的性能优化 - 构建可靠的运维管理平台
这些技能将帮助你在生产环境中有效管理RabbitMQ集群,确保系统的稳定性和高可用性。 else: self.logger.info(f”网络连通性检查通过: {self.config.host}:{self.config.port}“)
except socket.gaierror as e:
self.results.append(DiagnosticResult(
issue_type=ConnectionIssueType.NETWORK_UNREACHABLE,
severity=DiagnosticSeverity.CRITICAL,
description=f"主机名解析失败: {self.config.host}",
details={"error": str(e)},
recommendations=[
"检查主机名拼写",
"验证DNS配置",
"尝试使用IP地址"
],
timestamp=datetime.now()
))
def _check_port_accessibility(self):
"""检查端口可达性"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.config.timeout)
start_time = time.time()
result = sock.connect_ex((self.config.host, self.config.port))
connect_time = time.time() - start_time
sock.close()
if result != 0:
self.results.append(DiagnosticResult(
issue_type=ConnectionIssueType.CONNECTION_REFUSED,
severity=DiagnosticSeverity.ERROR,
description=f"端口 {self.config.port} 连接被拒绝",
details={
"port": self.config.port,
"error_code": result,
"connect_time": connect_time
},
recommendations=[
"检查RabbitMQ服务状态",
"验证端口配置",
"检查防火墙规则",
"确认服务监听地址"
],
timestamp=datetime.now()
))
elif connect_time > 5.0: # 连接时间过长
self.results.append(DiagnosticResult(
issue_type=ConnectionIssueType.TIMEOUT,
severity=DiagnosticSeverity.WARNING,
description=f"连接时间过长: {connect_time:.2f}秒",
details={"connect_time": connect_time},
recommendations=[
"检查网络延迟",
"优化网络配置",
"考虑增加超时时间"
],
timestamp=datetime.now()
))
except socket.timeout:
self.results.append(DiagnosticResult(
issue_type=ConnectionIssueType.TIMEOUT,
severity=DiagnosticSeverity.ERROR,
description=f"连接超时 ({self.config.timeout}秒)",
details={"timeout": self.config.timeout},
recommendations=[
"增加连接超时时间",
"检查网络延迟",
"验证服务器负载"
],
timestamp=datetime.now()
))
def _check_ssl_configuration(self):
"""检查SSL配置"""
try:
context = ssl.create_default_context()
if self.config.ssl_cert_path:
context.load_verify_locations(self.config.ssl_cert_path)
with socket.create_connection((self.config.host, self.config.port),
timeout=self.config.timeout) as sock:
with context.wrap_socket(sock, server_hostname=self.config.host) as ssock:
cert = ssock.getpeercert()
# 检查证书有效期
not_after = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
days_until_expiry = (not_after - datetime.now()).days
if days_until_expiry < 30:
self.results.append(DiagnosticResult(
issue_type=ConnectionIssueType.SSL_HANDSHAKE_FAILED,
severity=DiagnosticSeverity.WARNING,
description=f"SSL证书即将过期 ({days_until_expiry}天)",
details={
"certificate": cert,
"days_until_expiry": days_until_expiry
},
recommendations=[
"更新SSL证书",
"设置证书过期提醒"
],
timestamp=datetime.now()
))
self.logger.info("SSL配置检查通过")
except ssl.SSLError as e:
self.results.append(DiagnosticResult(
issue_type=ConnectionIssueType.SSL_HANDSHAKE_FAILED,
severity=DiagnosticSeverity.ERROR,
description=f"SSL握手失败: {str(e)}",
details={"ssl_error": str(e)},
recommendations=[
"检查SSL证书配置",
"验证证书链",
"确认SSL协议版本",
"检查证书有效期"
],
timestamp=datetime.now()
))
def _check_authentication(self):
"""检查认证配置"""
try:
import pika
credentials = pika.PlainCredentials(
self.config.username,
self.config.password
)
parameters = pika.ConnectionParameters(
host=self.config.host,
port=self.config.port,
virtual_host=self.config.vhost,
credentials=credentials,
connection_attempts=1,
retry_delay=0,
socket_timeout=self.config.timeout
)
connection = pika.BlockingConnection(parameters)
connection.close()
self.logger.info("认证检查通过")
except pika.exceptions.AuthenticationError as e:
self.results.append(DiagnosticResult(
issue_type=ConnectionIssueType.AUTHENTICATION_FAILED,
severity=DiagnosticSeverity.ERROR,
description=f"认证失败: {str(e)}",
details={
"username": self.config.username,
"vhost": self.config.vhost
},
recommendations=[
"验证用户名和密码",
"检查用户是否存在",
"确认用户权限",
"检查虚拟主机访问权限"
],
timestamp=datetime.now()
))
except Exception as e:
self.logger.error(f"认证检查失败: {e}")
def _check_permissions(self):
"""检查权限配置"""
# 这里可以通过管理API检查用户权限
# 简化实现,实际应该调用RabbitMQ管理API
pass
def _check_resource_limits(self):
"""检查资源限制"""
try:
# 检查系统资源
memory_percent = psutil.virtual_memory().percent
cpu_percent = psutil.cpu_percent(interval=1)
if memory_percent > 90:
self.results.append(DiagnosticResult(
issue_type=ConnectionIssueType.RESOURCE_EXHAUSTED,
severity=DiagnosticSeverity.WARNING,
description=f"系统内存使用率过高: {memory_percent:.1f}%",
details={"memory_percent": memory_percent},
recommendations=[
"释放系统内存",
"检查内存泄漏",
"增加系统内存"
],
timestamp=datetime.now()
))
if cpu_percent > 95:
self.results.append(DiagnosticResult(
issue_type=ConnectionIssueType.RESOURCE_EXHAUSTED,
severity=DiagnosticSeverity.WARNING,
description=f"系统CPU使用率过高: {cpu_percent:.1f}%",
details={"cpu_percent": cpu_percent},
recommendations=[
"检查高CPU进程",
"优化应用性能",
"增加CPU资源"
],
timestamp=datetime.now()
))
except Exception as e:
self.logger.error(f"资源检查失败: {e}")
def get_diagnostic_summary(self) -> Dict[str, Any]:
"""获取诊断摘要"""
if not self.results:
return {"status": "no_issues", "message": "未发现问题"}
severity_counts = {}
for result in self.results:
severity = result.severity.value
severity_counts[severity] = severity_counts.get(severity, 0) + 1
critical_issues = [r for r in self.results if r.severity == DiagnosticSeverity.CRITICAL]
return {
"status": "issues_found" if self.results else "healthy",
"total_issues": len(self.results),
"severity_breakdown": severity_counts,
"critical_issues": len(critical_issues),
"most_critical": critical_issues[0].description if critical_issues else None,
"timestamp": datetime.now().isoformat()
}
### 9.1.2 队列问题诊断
```python
class QueueIssueType(Enum):
"""队列问题类型"""
QUEUE_NOT_FOUND = "queue_not_found"
QUEUE_FULL = "queue_full"
CONSUMER_TIMEOUT = "consumer_timeout"
MESSAGE_TTL_EXPIRED = "message_ttl_expired"
DEAD_LETTER_OVERFLOW = "dead_letter_overflow"
MEMORY_PRESSURE = "memory_pressure"
SLOW_CONSUMER = "slow_consumer"
UNACKED_MESSAGES = "unacked_messages"
@dataclass
class QueueDiagnosticInfo:
"""队列诊断信息"""
name: str
vhost: str
messages: int
consumers: int
memory: int
message_rate: float
consumer_rate: float
unacked_messages: int
message_ttl: Optional[int]
max_length: Optional[int]
dead_letter_exchange: Optional[str]
class QueueDiagnostic:
"""队列诊断器"""
def __init__(self, management_api):
self.management_api = management_api
self.logger = logging.getLogger(__name__)
self.results: List[DiagnosticResult] = []
def diagnose_queue(self, queue_name: str, vhost: str = "/") -> List[DiagnosticResult]:
"""诊断队列问题"""
self.results.clear()
try:
# 获取队列信息
queue_info = self._get_queue_info(queue_name, vhost)
if not queue_info:
self.results.append(DiagnosticResult(
issue_type=QueueIssueType.QUEUE_NOT_FOUND,
severity=DiagnosticSeverity.ERROR,
description=f"队列不存在: {queue_name}",
details={"queue_name": queue_name, "vhost": vhost},
recommendations=[
"检查队列名称拼写",
"确认队列已创建",
"验证虚拟主机"
],
timestamp=datetime.now()
))
return self.results
# 检查队列长度
self._check_queue_length(queue_info)
# 检查消费者状态
self._check_consumer_status(queue_info)
# 检查消息积压
self._check_message_backlog(queue_info)
# 检查内存使用
self._check_memory_usage(queue_info)
# 检查未确认消息
self._check_unacked_messages(queue_info)
# 检查TTL配置
self._check_ttl_configuration(queue_info)
except Exception as e:
self.logger.error(f"队列诊断失败: {e}")
self.results.append(DiagnosticResult(
issue_type=QueueIssueType.QUEUE_NOT_FOUND,
severity=DiagnosticSeverity.ERROR,
description=f"队列诊断异常: {str(e)}",
details={"exception": str(e)},
recommendations=["检查队列访问权限", "验证管理API连接"],
timestamp=datetime.now()
))
return self.results
def _get_queue_info(self, queue_name: str, vhost: str) -> Optional[QueueDiagnosticInfo]:
"""获取队列信息"""
try:
# 模拟从管理API获取队列信息
# 实际实现应该调用RabbitMQ管理API
queue_data = {
"name": queue_name,
"vhost": vhost,
"messages": 1000,
"consumers": 2,
"memory": 1024*1024, # 1MB
"message_stats": {
"publish_details": {"rate": 10.5},
"deliver_details": {"rate": 8.2}
},
"messages_unacknowledged": 50,
"arguments": {
"x-message-ttl": 3600000, # 1小时
"x-max-length": 10000,
"x-dead-letter-exchange": "dlx"
}
}
return QueueDiagnosticInfo(
name=queue_data["name"],
vhost=queue_data["vhost"],
messages=queue_data["messages"],
consumers=queue_data["consumers"],
memory=queue_data["memory"],
message_rate=queue_data["message_stats"]["publish_details"]["rate"],
consumer_rate=queue_data["message_stats"]["deliver_details"]["rate"],
unacked_messages=queue_data["messages_unacknowledged"],
message_ttl=queue_data["arguments"].get("x-message-ttl"),
max_length=queue_data["arguments"].get("x-max-length"),
dead_letter_exchange=queue_data["arguments"].get("x-dead-letter-exchange")
)
except Exception as e:
self.logger.error(f"获取队列信息失败: {e}")
return None
def _check_queue_length(self, queue_info: QueueDiagnosticInfo):
"""检查队列长度"""
if queue_info.max_length and queue_info.messages >= queue_info.max_length * 0.9:
self.results.append(DiagnosticResult(
issue_type=QueueIssueType.QUEUE_FULL,
severity=DiagnosticSeverity.WARNING,
description=f"队列接近最大长度: {queue_info.messages}/{queue_info.max_length}",
details={
"current_messages": queue_info.messages,
"max_length": queue_info.max_length,
"usage_percent": (queue_info.messages / queue_info.max_length) * 100
},
recommendations=[
"增加消费者数量",
"优化消费者性能",
"增加队列最大长度",
"检查死信队列配置"
],
timestamp=datetime.now()
))
def _check_consumer_status(self, queue_info: QueueDiagnosticInfo):
"""检查消费者状态"""
if queue_info.consumers == 0 and queue_info.messages > 0:
self.results.append(DiagnosticResult(
issue_type=QueueIssueType.SLOW_CONSUMER,
severity=DiagnosticSeverity.ERROR,
description=f"队列无消费者但有 {queue_info.messages} 条消息",
details={
"messages": queue_info.messages,
"consumers": queue_info.consumers
},
recommendations=[
"启动消费者应用",
"检查消费者连接状态",
"验证消费者配置"
],
timestamp=datetime.now()
))
elif queue_info.message_rate > queue_info.consumer_rate * 2:
self.results.append(DiagnosticResult(
issue_type=QueueIssueType.SLOW_CONSUMER,
severity=DiagnosticSeverity.WARNING,
description=f"消费速度慢于生产速度: {queue_info.consumer_rate:.1f} < {queue_info.message_rate:.1f}",
details={
"message_rate": queue_info.message_rate,
"consumer_rate": queue_info.consumer_rate,
"rate_ratio": queue_info.message_rate / max(queue_info.consumer_rate, 0.1)
},
recommendations=[
"增加消费者数量",
"优化消费者处理逻辑",
"使用消费者预取限制",
"考虑消息批处理"
],
timestamp=datetime.now()
))
def _check_message_backlog(self, queue_info: QueueDiagnosticInfo):
"""检查消息积压"""
if queue_info.messages > 10000: # 消息积压阈值
severity = DiagnosticSeverity.CRITICAL if queue_info.messages > 50000 else DiagnosticSeverity.WARNING
self.results.append(DiagnosticResult(
issue_type=QueueIssueType.SLOW_CONSUMER,
severity=severity,
description=f"消息积压严重: {queue_info.messages} 条消息",
details={"messages": queue_info.messages},
recommendations=[
"紧急增加消费者",
"检查消费者异常",
"考虑消息清理",
"分析消息处理瓶颈"
],
timestamp=datetime.now()
))
def _check_memory_usage(self, queue_info: QueueDiagnosticInfo):
"""检查内存使用"""
memory_mb = queue_info.memory / (1024 * 1024)
if memory_mb > 100: # 100MB阈值
self.results.append(DiagnosticResult(
issue_type=QueueIssueType.MEMORY_PRESSURE,
severity=DiagnosticSeverity.WARNING,
description=f"队列内存使用过高: {memory_mb:.1f}MB",
details={"memory_mb": memory_mb},
recommendations=[
"增加消费速度",
"检查消息大小",
"考虑消息持久化到磁盘",
"优化消息格式"
],
timestamp=datetime.now()
))
def _check_unacked_messages(self, queue_info: QueueDiagnosticInfo):
"""检查未确认消息"""
if queue_info.unacked_messages > queue_info.messages * 0.5:
self.results.append(DiagnosticResult(
issue_type=QueueIssueType.UNACKED_MESSAGES,
severity=DiagnosticSeverity.WARNING,
description=f"未确认消息过多: {queue_info.unacked_messages}/{queue_info.messages}",
details={
"unacked_messages": queue_info.unacked_messages,
"total_messages": queue_info.messages,
"unacked_percent": (queue_info.unacked_messages / max(queue_info.messages, 1)) * 100
},
recommendations=[
"检查消费者确认机制",
"减少消费者预取数量",
"检查消费者处理时间",
"考虑消息超时设置"
],
timestamp=datetime.now()
))
def _check_ttl_configuration(self, queue_info: QueueDiagnosticInfo):
"""检查TTL配置"""
if queue_info.message_ttl:
ttl_hours = queue_info.message_ttl / (1000 * 3600) # 转换为小时
if ttl_hours < 1: # TTL小于1小时
self.results.append(DiagnosticResult(
issue_type=QueueIssueType.MESSAGE_TTL_EXPIRED,
severity=DiagnosticSeverity.INFO,
description=f"消息TTL较短: {ttl_hours:.1f}小时",
details={"ttl_hours": ttl_hours},
recommendations=[
"确认TTL设置合理",
"监控消息过期情况",
"配置死信队列"
],
timestamp=datetime.now()
))