概述

本章将详细介绍Grafana的告警系统,包括告警规则配置、通知渠道设置、告警策略管理和故障排除。通过学习本章内容,你将能够构建完整的监控告警体系。

学习目标

  • 理解Grafana告警系统的架构和工作原理
  • 掌握告警规则的创建和配置方法
  • 学会配置各种通知渠道
  • 了解告警策略和静默管理
  • 掌握告警系统的监控和故障排除

告警系统架构

1. 核心组件

from enum import Enum
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime, timedelta

class AlertState(Enum):
    """States an alert rule can be reported in.

    Each member's value is the string used when the state is serialized
    (for example via ``.value`` in exported rule dictionaries).
    """

    NORMAL = "Normal"
    PENDING = "Pending"
    ALERTING = "Alerting"
    NO_DATA = "NoData"
    EXECUTION_ERROR = "ExecutionError"

class NotificationChannel(Enum):
    """Supported notification channel types.

    The member value is the lowercase type identifier stored in a channel's
    ``"type"`` field.
    """

    EMAIL = "email"
    SLACK = "slack"
    WEBHOOK = "webhook"
    PAGERDUTY = "pagerduty"
    TEAMS = "teams"
    DISCORD = "discord"
    TELEGRAM = "telegram"
    WECHAT = "wechat"
    DINGTALK = "dingtalk"

class AlertSeverity(Enum):
    """Severity levels that can be assigned to an alert rule.

    Members are declared from most to least severe; values are the lowercase
    strings used for the ``severity`` label.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

@dataclass
class AlertCondition:
    """One query-plus-threshold condition that can be attached to an alert rule."""

    # Query expression to evaluate.
    query: str
    # Reducer name: avg, min, max, sum, count, last, median, diff, diff_abs,
    # count_non_null.
    reducer: str
    # Comparison kind: gt, lt, within_range, outside_range, no_value.
    evaluator_type: str
    # Threshold value(s) handed to the evaluator.
    evaluator_params: List[float]
    # Look-back window as a duration string such as "5m".
    time_range: str
    
class AlertRule:
    """Alert rule with a chainable (fluent) configuration API.

    Every ``set_*`` / ``add_*`` method mutates the rule in place and returns
    the rule itself so calls can be chained. ``to_dict`` serializes the rule;
    only the first condition's query is exported.
    """

    # Seconds per unit suffix understood by the duration parser.
    _UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}
    # Query look-back (seconds) exported when no usable condition time range exists.
    _DEFAULT_LOOKBACK_SECONDS = 600
    # Value returned by ``_parse_duration`` for strings it cannot interpret.
    _DEFAULT_DURATION_SECONDS = 60

    def __init__(self, name: str, datasource_uid: str):
        """Create a rule with default evaluation settings.

        Args:
            name: Human-readable rule title; also used to derive ``uid``.
            datasource_uid: UID of the data source the rule queries.
        """
        self.name = name
        self.datasource_uid = datasource_uid
        # The UID is the lower-cased name with spaces replaced by underscores.
        self.uid = f"alert_{name.lower().replace(' ', '_')}"
        self.conditions: List[AlertCondition] = []
        self.frequency = "10s"
        self.handler = 1  # 1 for keep_state, 0 for alerting
        self.no_data_state = AlertState.NO_DATA
        self.exec_err_state = AlertState.EXECUTION_ERROR
        self.for_duration = "5m"
        self.annotations = {}
        self.labels = {}
        self.notification_uids: List[str] = []
        self.message = ""
        self.severity = AlertSeverity.MEDIUM

    def add_condition(self, condition: AlertCondition) -> 'AlertRule':
        """Append a condition and return the rule for chaining."""
        self.conditions.append(condition)
        return self

    def set_frequency(self, frequency: str) -> 'AlertRule':
        """Set the evaluation frequency (duration string, e.g. ``"30s"``)."""
        self.frequency = frequency
        return self

    def set_for_duration(self, duration: str) -> 'AlertRule':
        """Set the ``for`` duration (duration string, e.g. ``"5m"``)."""
        self.for_duration = duration
        return self

    def add_annotation(self, key: str, value: str) -> 'AlertRule':
        """Add or overwrite an annotation."""
        self.annotations[key] = value
        return self

    def add_label(self, key: str, value: str) -> 'AlertRule':
        """Add or overwrite a label."""
        self.labels[key] = value
        return self

    def set_severity(self, severity: AlertSeverity) -> 'AlertRule':
        """Set the rule's severity level."""
        self.severity = severity
        return self

    def add_notification(self, notification_uid: str) -> 'AlertRule':
        """Attach a notification channel by UID."""
        self.notification_uids.append(notification_uid)
        return self

    def set_message(self, message: str) -> 'AlertRule':
        """Set the alert message text."""
        self.message = message
        return self

    def to_dict(self) -> Dict:
        """Serialize the rule to a dictionary.

        The query's ``relativeTimeRange.from`` is taken from the first
        condition's ``time_range`` (600 seconds when there is no condition or
        the range cannot be parsed), and the data source UID is exported as
        ``datasourceUid``. Annotations and labels are copied so that changing
        the returned dictionary does not change the rule.
        """
        first = self.conditions[0] if self.conditions else None
        lookback = self._duration_to_seconds(first.time_range) if first else None
        if lookback is None:
            lookback = self._DEFAULT_LOOKBACK_SECONDS

        return {
            "uid": self.uid,
            "title": self.name,
            "condition": "A",
            "data": [
                {
                    "refId": "A",
                    "queryType": "",
                    "relativeTimeRange": {
                        "from": lookback,
                        "to": 0
                    },
                    "datasourceUid": self.datasource_uid,
                    "model": {
                        "expr": first.query if first else "",
                        "interval": "",
                        "refId": "A"
                    }
                }
            ],
            "intervalSeconds": self._parse_duration(self.frequency),
            "maxDataPoints": 43200,
            "noDataState": self.no_data_state.value,
            "execErrState": self.exec_err_state.value,
            "for": self.for_duration,
            "annotations": dict(self.annotations),
            "labels": dict(self.labels),
            "message": self.message
        }

    def _duration_to_seconds(self, duration: str) -> Optional[int]:
        """Convert ``"<int><unit>"`` to seconds, or return ``None`` if unparseable.

        Supported units are s, m, h, d and w. Strings such as ``"500ms"`` or
        ``"s"`` have no integer amount in front of the unit and yield ``None``.
        """
        factor = self._UNIT_SECONDS.get(duration[-1:])
        if factor is None:
            return None
        try:
            amount = int(duration[:-1])
        except ValueError:
            return None
        return amount * factor

    def _parse_duration(self, duration: str) -> int:
        """Parse a duration string into seconds, defaulting to 60 when unparseable."""
        seconds = self._duration_to_seconds(duration)
        return self._DEFAULT_DURATION_SECONDS if seconds is None else seconds

class AlertManager:
    """In-memory registry of alert rules, notification channels, groups and silences."""

    def __init__(self):
        self.rules: Dict[str, AlertRule] = {}
        self.notification_channels: Dict[str, Dict] = {}
        self.alert_groups: Dict[str, List[str]] = {}
        self.silences: List[Dict] = []

    def create_rule(self, name: str, datasource_uid: str) -> AlertRule:
        """Create a rule, register it under its UID and return it.

        A rule whose derived UID already exists replaces the earlier one.
        """
        rule = AlertRule(name, datasource_uid)
        self.rules[rule.uid] = rule
        return rule

    def get_rule(self, uid: str) -> Optional[AlertRule]:
        """Return the rule with the given UID, or ``None`` if unknown."""
        return self.rules.get(uid)

    def delete_rule(self, uid: str) -> bool:
        """Remove a rule; return ``True`` if it existed."""
        if uid in self.rules:
            del self.rules[uid]
            return True
        return False

    def list_rules(self, folder: Optional[str] = None) -> List[AlertRule]:
        """List rules, optionally only those whose ``folder`` label equals ``folder``."""
        rules = list(self.rules.values())
        if folder:
            # Simplified implementation: the folder is read from the rule's labels.
            rules = [rule for rule in rules if rule.labels.get('folder') == folder]
        return rules

    def create_notification_channel(self, name: str, channel_type: NotificationChannel, settings: Dict) -> str:
        """Register a notification channel and return its UID.

        The UID is ``notifier_`` plus the lower-cased name with spaces
        replaced by underscores.
        """
        uid = f"notifier_{name.lower().replace(' ', '_')}"
        self.notification_channels[uid] = {
            "uid": uid,
            "name": name,
            "type": channel_type.value,
            "settings": settings,
            "isDefault": False,
            "sendReminder": False,
            "disableResolveMessage": False,
            "frequency": "10s"
        }
        return uid

    def get_notification_channel(self, uid: str) -> Optional[Dict]:
        """Return the channel definition for ``uid``, or ``None`` if unknown."""
        return self.notification_channels.get(uid)

    def create_alert_group(self, name: str, rule_uids: List[str]) -> str:
        """Store a named group of rule UIDs and return the group name."""
        self.alert_groups[name] = rule_uids
        return name

    def add_silence(self, matcher: Dict, starts_at: datetime, ends_at: datetime, comment: str) -> str:
        """Record a silence with a single matcher and return its generated id."""
        silence_id = f"silence_{len(self.silences) + 1}"
        silence = {
            "id": silence_id,
            "matchers": [matcher],
            "startsAt": starts_at.isoformat(),
            "endsAt": ends_at.isoformat(),
            "comment": comment,
            "createdBy": "admin",
            "status": {
                "state": "active"
            }
        }
        self.silences.append(silence)
        return silence_id

    def get_alert_status(self, rule_uid: str) -> Dict:
        """Return a simulated status for a rule, or an error dict if it is unknown."""
        rule = self.get_rule(rule_uid)
        if not rule:
            return {"error": "Rule not found"}

        # Simulated status: the state is always reported as Normal.
        return {
            "rule_uid": rule_uid,
            "rule_name": rule.name,
            "state": AlertState.NORMAL.value,
            "last_evaluation": datetime.now().isoformat(),
            "evaluation_duration": "150ms",
            "annotations": rule.annotations,
            "labels": rule.labels
        }

    def test_notification(self, channel_uid: str) -> Dict:
        """Return a simulated test-notification result for a channel."""
        channel = self.get_notification_channel(channel_uid)
        if not channel:
            return {"success": False, "error": "Channel not found"}

        # Simulated result: nothing is actually sent.
        return {
            "success": True,
            "message": f"Test notification sent to {channel['name']}",
            "channel_type": channel['type'],
            "timestamp": datetime.now().isoformat()
        }

    def export_config(self) -> Dict:
        """Export rules, channels, groups and silences as one dictionary."""
        return {
            "rules": [rule.to_dict() for rule in self.rules.values()],
            "notification_channels": list(self.notification_channels.values()),
            "alert_groups": self.alert_groups,
            "silences": self.silences
        }

    @staticmethod
    def _state_or_default(value: Any, default: AlertState) -> AlertState:
        """Map a serialized state string to ``AlertState``; keep ``default`` if unknown."""
        try:
            return AlertState(value)
        except ValueError:
            return default

    @classmethod
    def _rule_from_dict(cls, rule_data: Dict) -> AlertRule:
        """Rebuild a rule from one entry of an exported ``"rules"`` list.

        ``title`` and ``uid`` are required. Conditions are not restored because
        the exported form does not carry reducer/evaluator settings.
        """
        queries = rule_data.get("data") or [{}]
        rule = AlertRule(rule_data["title"], queries[0].get("datasourceUid", "default"))
        rule.uid = rule_data["uid"]
        rule.annotations = rule_data.get("annotations", {})
        rule.labels = rule_data.get("labels", {})
        rule.message = rule_data.get("message", "")
        if "for" in rule_data:
            rule.for_duration = rule_data["for"]
        if "intervalSeconds" in rule_data:
            rule.frequency = f"{int(rule_data['intervalSeconds'])}s"
        if "noDataState" in rule_data:
            rule.no_data_state = cls._state_or_default(rule_data["noDataState"], rule.no_data_state)
        if "execErrState" in rule_data:
            rule.exec_err_state = cls._state_or_default(rule_data["execErrState"], rule.exec_err_state)
        return rule

    def import_config(self, config: Dict) -> bool:
        """Import a configuration dictionary shaped like ``export_config`` output.

        The whole configuration is parsed before anything is applied, so a
        malformed entry leaves the manager unchanged instead of half-imported.

        Returns:
            ``True`` on success; ``False`` (after printing the reason) when the
            configuration is malformed.
        """
        try:
            staged_rules = {}
            for rule_data in config.get("rules", []):
                rule = self._rule_from_dict(rule_data)
                staged_rules[rule.uid] = rule

            staged_channels = {
                channel_data["uid"]: channel_data
                for channel_data in config.get("notification_channels", [])
            }
            staged_groups = dict(config.get("alert_groups", {}))
            staged_silences = list(config.get("silences", []))
        except (KeyError, TypeError, ValueError, AttributeError) as e:
            print(f"Import failed: {e}")
            return False

        # Nothing below can fail, so the import is applied all-or-nothing.
        self.rules.update(staged_rules)
        self.notification_channels.update(staged_channels)
        self.alert_groups.update(staged_groups)
        self.silences.extend(staged_silences)
        return True

# Usage example
alert_manager = AlertManager()

# Create an alert rule; each configuration call returns the rule itself,
# so the steps are written here as separate statements instead of one chain.
rule = alert_manager.create_rule("High CPU Usage", "prometheus_uid")
rule.add_condition(
    AlertCondition(
        query="avg(cpu_usage_percent) by (instance)",
        reducer="avg",
        evaluator_type="gt",
        evaluator_params=[80.0],
        time_range="5m",
    )
)
rule.set_frequency("30s")
rule.set_for_duration("2m")
rule.add_annotation("description", "CPU usage is above 80%")
rule.add_label("severity", "warning")
rule.set_severity(AlertSeverity.HIGH)

print("告警规则创建完成:", rule.name)

# Create a notification channel
email_channel = alert_manager.create_notification_channel(
    name="Email Alerts",
    channel_type=NotificationChannel.EMAIL,
    settings={
        "addresses": "admin@example.com;ops@example.com",
        "subject": "Grafana Alert",
        "singleEmail": False,
    },
)

print("通知渠道创建完成:", email_channel)

# Attach the channel to the rule
rule.add_notification(email_channel)

# Query the (simulated) alert status
status = alert_manager.get_alert_status(rule.uid)
print("告警状态:", status["state"])

2. 告警工作流程

class AlertWorkflow:
    """Helpers that describe and simulate the alert workflow.

    Holds a static description of the workflow steps plus the escalation
    rules created through ``create_escalation_rule``.
    """

    def __init__(self):
        self.workflow_steps = {
            "evaluation": "评估告警条件",
            "state_change": "状态变更检测",
            "notification": "发送通知",
            "escalation": "告警升级",
            "resolution": "告警解决"
        }
        self.escalation_rules: List[Dict] = []

    def create_escalation_rule(self, name: str, conditions: Dict, actions: List[Dict]) -> Dict:
        """Create an enabled escalation rule, store it and return it."""
        rule = {
            "name": name,
            "conditions": conditions,
            "actions": actions,
            "enabled": True,
            "created_at": datetime.now().isoformat()
        }
        self.escalation_rules.append(rule)
        return rule

    def simulate_alert_lifecycle(self, alert_rule: AlertRule) -> List[Dict]:
        """Return a simulated, ordered list of lifecycle events for a rule.

        Timestamps are fixed offsets from a single start time taken when the
        method is called; one ``notification_sent`` event is produced per
        notification UID attached to the rule.
        """
        started = datetime.now()

        def event(offset: timedelta, name: str, state: AlertState, details: str) -> Dict:
            return {
                "timestamp": (started + offset).isoformat(),
                "event": name,
                "rule": alert_rule.name,
                "state": state.value,
                "details": details
            }

        lifecycle_events = [
            # 1. Initial evaluation
            event(timedelta(0), "evaluation_started", AlertState.NORMAL, "开始评估告警条件"),
            # 2. Condition triggered
            event(timedelta(seconds=30), "condition_triggered", AlertState.PENDING,
                  "告警条件被触发,进入待定状态"),
            # 3. "for" duration satisfied
            event(timedelta(minutes=2), "state_changed_to_alerting", AlertState.ALERTING,
                  f"持续时间{alert_rule.for_duration}满足,状态变更为告警"),
        ]

        # 4. Notifications
        for notification_uid in alert_rule.notification_uids:
            lifecycle_events.append(
                event(timedelta(minutes=2, seconds=10), "notification_sent", AlertState.ALERTING,
                      f"通知已发送到 {notification_uid}")
            )

        # 5. Resolution
        lifecycle_events.append(
            event(timedelta(minutes=10), "alert_resolved", AlertState.NORMAL,
                  "告警条件不再满足,告警已解决")
        )

        return lifecycle_events

    def generate_workflow_diagram(self) -> str:
        """Return a Markdown document containing a Mermaid diagram of the workflow."""
        diagram = """
# 告警工作流程图

```mermaid
graph TD
    A[监控数据] --> B[评估告警条件]
    B --> C{条件满足?}
    C -->|是| D[进入Pending状态]
    C -->|否| E[保持Normal状态]
    D --> F{持续时间满足?}
    F -->|是| G[进入Alerting状态]
    F -->|否| D
    G --> H[发送通知]
    H --> I[检查升级规则]
    I --> J{需要升级?}
    J -->|是| K[执行升级动作]
    J -->|否| L[等待条件变化]
    K --> L
    L --> M{条件解决?}
    M -->|是| N[发送解决通知]
    M -->|否| L
    N --> E
    E --> B
```

## 工作流程说明

1. 数据收集: 从配置的数据源收集监控数据
2. 条件评估: 根据告警规则评估数据是否满足告警条件
3. 状态管理: 管理告警状态转换(Normal → Pending → Alerting)
4. 通知发送: 当状态变为Alerting时发送通知
5. 升级处理: 根据升级规则执行相应动作
6. 解决处理: 当条件不再满足时发送解决通知
"""
        return diagram

    def create_notification_template(self, channel_type: NotificationChannel) -> Dict:
        """Return the notification template for a channel type.

        Templates exist for EMAIL, SLACK and WEBHOOK; any other channel type
        yields an empty dictionary.
        """
        templates = {
            NotificationChannel.EMAIL: {
                "subject": "[{{ .Status | toUpper }}] {{ .GroupLabels.alertname }}",
                "body": """
告警详情:
- 规则名称: {{ .GroupLabels.alertname }}
- 严重级别: {{ .GroupLabels.severity }}
- 状态: {{ .Status }}
- 触发时间: {{ .StartsAt }}
- 描述: {{ .CommonAnnotations.description }}

查看详情: {{ .ExternalURL }}
"""
            },
            NotificationChannel.SLACK: {
                "channel": "#alerts",
                "username": "Grafana",
                "title": "{{ .Status | toUpper }}: {{ .GroupLabels.alertname }}",
                "text": "{{ .CommonAnnotations.description }}",
                "color": "{{ if eq .Status \"firing\" }}danger{{ else }}good{{ end }}"
            },
            NotificationChannel.WEBHOOK: {
                "url": "https://hooks.example.com/webhook",
                "method": "POST",
                "headers": {
                    "Content-Type": "application/json",
                    "Authorization": "Bearer YOUR_TOKEN"
                },
                "body": {
                    "alert_name": "{{ .GroupLabels.alertname }}",
                    "status": "{{ .Status }}",
                    "severity": "{{ .GroupLabels.severity }}",
                    "description": "{{ .CommonAnnotations.description }}",
                    "starts_at": "{{ .StartsAt }}",
                    "external_url": "{{ .ExternalURL }}"
                }
            }
        }

        return templates.get(channel_type, {})

    def validate_alert_rule(self, rule: AlertRule) -> Dict:
        """Validate a rule and return errors, warnings and suggestions.

        Returns:
            A dict with ``valid`` (``False`` only when there are errors) and
            the lists ``errors``, ``warnings`` and ``suggestions``.
        """
        validation_result = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "suggestions": []
        }

        # Basic configuration
        if not rule.name:
            validation_result["errors"].append("告警规则名称不能为空")
            validation_result["valid"] = False

        if not rule.conditions:
            validation_result["errors"].append("至少需要一个告警条件")
            validation_result["valid"] = False

        # Evaluation frequency
        frequency_seconds = rule._parse_duration(rule.frequency)
        if frequency_seconds < 10:
            validation_result["warnings"].append("评估频率过高可能影响性能")

        # "for" duration relative to the evaluation frequency
        for_seconds = rule._parse_duration(rule.for_duration)
        if for_seconds < frequency_seconds:
            validation_result["warnings"].append("持续时间应该大于评估频率")

        # Notification configuration
        if not rule.notification_uids:
            validation_result["warnings"].append("未配置通知渠道")

        # Labels and annotations
        if not rule.labels.get("severity"):
            validation_result["suggestions"].append("建议添加severity标签")

        if not rule.annotations.get("description"):
            validation_result["suggestions"].append("建议添加description注释")

        return validation_result

# Usage example
workflow = AlertWorkflow()

# Create an escalation rule
escalation_rule = workflow.create_escalation_rule(
    "Critical Alert Escalation",
    {
        "severity": "critical",
        "duration": "15m",
        "no_response": True
    },
    [
        {"type": "notify_manager", "target": "manager@example.com"},
        {"type": "create_incident", "system": "pagerduty"}
    ]
)

print("升级规则创建完成:", escalation_rule["name"])

# Simulate the alert lifecycle
lifecycle = workflow.simulate_alert_lifecycle(rule)
print("告警生命周期事件数:", len(lifecycle))

# Validate the alert rule
validation = workflow.validate_alert_rule(rule)
print("规则验证结果:", "通过" if validation["valid"] else "失败")

# Generate a notification template
email_template = workflow.create_notification_template(NotificationChannel.EMAIL)
print("邮件模板主题:", email_template["subject"])


告警规则配置

1. 基础告警规则

class AlertRuleBuilder:
    """Factory for common kinds of alert rules.

    Rules are always created through the ``AlertManager`` passed to each
    method, so every rule built here is also registered with that manager.
    """

    def __init__(self):
        # Built-in templates keyed by template name.
        self.rule_templates = {
            "cpu_usage": {
                "name": "High CPU Usage",
                "query": "avg(100 - (avg by (instance) (irate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100))",
                "threshold": 80,
                "operator": "gt",
                "duration": "5m",
                "severity": AlertSeverity.HIGH
            },
            "memory_usage": {
                "name": "High Memory Usage",
                "query": "(1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100",
                "threshold": 85,
                "operator": "gt",
                "duration": "3m",
                "severity": AlertSeverity.HIGH
            },
            "disk_usage": {
                "name": "High Disk Usage",
                "query": "(1 - (node_filesystem_avail_bytes / node_filesystem_size_bytes)) * 100",
                "threshold": 90,
                "operator": "gt",
                "duration": "1m",
                "severity": AlertSeverity.CRITICAL
            },
            "service_down": {
                "name": "Service Down",
                "query": "up",
                "threshold": 1,
                "operator": "lt",
                "duration": "1m",
                "severity": AlertSeverity.CRITICAL
            },
            "response_time": {
                "name": "High Response Time",
                "query": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))",
                "threshold": 2,
                "operator": "gt",
                "duration": "2m",
                "severity": AlertSeverity.MEDIUM
            },
            "error_rate": {
                "name": "High Error Rate",
                "query": "rate(http_requests_total{status=~\"5..\"}[5m]) / rate(http_requests_total[5m]) * 100",
                "threshold": 5,
                "operator": "gt",
                "duration": "3m",
                "severity": AlertSeverity.HIGH
            }
        }

    def create_from_template(self, template_name: str, alert_manager: AlertManager,
                           datasource_uid: str, custom_params: Optional[Dict] = None) -> AlertRule:
        """Create a rule from a built-in template.

        Args:
            template_name: Key in ``rule_templates``.
            alert_manager: Manager that creates and registers the rule.
            datasource_uid: Data source the rule queries.
            custom_params: Optional overrides merged over the template. A
                ``severity`` override may be an ``AlertSeverity`` member or
                its string value.

        Raises:
            ValueError: If the template name is unknown or a ``severity``
                override is not a valid severity.
        """
        if template_name not in self.rule_templates:
            raise ValueError(f"Unknown template: {template_name}")

        # Copy so overrides never leak back into the shared template.
        template = self.rule_templates[template_name].copy()
        if custom_params:
            template.update(custom_params)

        # Accept both enum members and their string values.
        severity = AlertSeverity(template["severity"])

        rule = alert_manager.create_rule(template["name"], datasource_uid)

        condition = AlertCondition(
            query=template["query"],
            reducer="last",
            evaluator_type=template["operator"],
            evaluator_params=[template["threshold"]],
            time_range="5m"
        )

        rule.add_condition(condition)
        rule.set_for_duration(template["duration"])
        rule.set_severity(severity)

        # Default annotation and labels
        rule.add_annotation("description", f"{template['name']} - Threshold: {template['threshold']}")
        rule.add_label("severity", severity.value)
        rule.add_label("template", template_name)

        return rule

    def create_composite_rule(self, name: str, alert_manager: AlertManager,
                            datasource_uid: str, conditions: List[Dict]) -> AlertRule:
        """Create a rule carrying one condition per entry in ``conditions``.

        Each entry needs ``query`` and ``threshold``; ``reducer``, ``operator``
        and ``time_range`` default to ``"last"``, ``"gt"`` and ``"5m"``.
        """
        rule = alert_manager.create_rule(name, datasource_uid)

        for condition_config in conditions:
            condition = AlertCondition(
                query=condition_config["query"],
                reducer=condition_config.get("reducer", "last"),
                evaluator_type=condition_config.get("operator", "gt"),
                evaluator_params=[condition_config["threshold"]],
                time_range=condition_config.get("time_range", "5m")
            )
            rule.add_condition(condition)

        return rule

    def create_anomaly_detection_rule(self, name: str, alert_manager: AlertManager,
                                     datasource_uid: str, metric: str,
                                     sensitivity: float = 2.0) -> AlertRule:
        """Create a rule that flags values far above the metric's 1h average.

        The query keeps series whose deviation from the one-hour average,
        divided by the one-hour standard deviation, exceeds ``sensitivity``.
        """
        query = f"""
        (
          {metric} - 
          avg_over_time({metric}[1h])
        ) / stddev_over_time({metric}[1h]) > {sensitivity}
        """

        rule = alert_manager.create_rule(name, datasource_uid)

        condition = AlertCondition(
            query=query,
            reducer="last",
            evaluator_type="gt",
            evaluator_params=[0],
            time_range="5m"
        )

        rule.add_condition(condition)
        rule.set_for_duration("2m")
        rule.add_annotation("description", f"Anomaly detected in {metric}")
        rule.add_annotation("sensitivity", str(sensitivity))
        rule.add_label("type", "anomaly_detection")

        return rule

    def create_threshold_rule(self, name: str, alert_manager: AlertManager,
                            datasource_uid: str, query: str, threshold: float,
                            operator: str = "gt", duration: str = "5m") -> AlertRule:
        """Create a single-condition rule comparing ``query`` against ``threshold``."""
        rule = alert_manager.create_rule(name, datasource_uid)

        condition = AlertCondition(
            query=query,
            reducer="last",
            evaluator_type=operator,
            evaluator_params=[threshold],
            time_range="5m"
        )

        rule.add_condition(condition)
        rule.set_for_duration(duration)
        rule.add_annotation("description", f"Threshold alert: {query} {operator} {threshold}")
        rule.add_label("type", "threshold")

        return rule

    def create_rate_of_change_rule(self, name: str, alert_manager: AlertManager,
                                 datasource_uid: str, metric: str,
                                 change_threshold: float, time_window: str = "5m") -> AlertRule:
        """Create a rule on ``rate(metric[time_window])`` exceeding ``change_threshold``."""
        query = f"rate({metric}[{time_window}])"

        rule = alert_manager.create_rule(name, datasource_uid)

        condition = AlertCondition(
            query=query,
            reducer="last",
            evaluator_type="gt",
            evaluator_params=[change_threshold],
            time_range=time_window
        )

        rule.add_condition(condition)
        rule.set_for_duration("2m")
        rule.add_annotation("description", f"Rate of change alert for {metric}")
        rule.add_label("type", "rate_of_change")

        return rule

    def get_template_list(self) -> List[Dict]:
        """Return a summary (name, description, severity, threshold, duration) per template."""
        return [
            {
                "name": name,
                "description": template["name"],
                "severity": template["severity"].value,
                "threshold": template["threshold"],
                "duration": template["duration"]
            }
            for name, template in self.rule_templates.items()
        ]

    def validate_query(self, query: str, datasource_type: str = "prometheus") -> Dict:
        """Run lightweight heuristic checks on a query string.

        Only ``datasource_type == "prometheus"`` is checked; other types are
        reported valid without inspection. Keywords are matched as whole
        words, so ``rate`` is not found inside ``irate`` and ``by`` is not
        found inside a metric name such as ``..._bytes``.

        Returns:
            A dict with ``valid`` and the lists ``errors``, ``warnings`` and
            ``suggestions``.
        """
        import re  # local import: only this method needs regular expressions

        validation_result = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "suggestions": []
        }

        if datasource_type != "prometheus":
            return validation_result

        # Basic syntax check
        if not query.strip():
            validation_result["errors"].append("查询不能为空")
            validation_result["valid"] = False

        # Parentheses must balance and never close before they open.
        depth = 0
        for char in query:
            if char == '(':
                depth += 1
            elif char == ')':
                depth -= 1
                if depth < 0:
                    break
        if depth != 0:
            validation_result["errors"].append("括号不匹配")
            validation_result["valid"] = False

        def has_word(word: str) -> bool:
            return re.search(rf"\b{word}\b", query) is not None

        # Common functions used in the query
        prometheus_functions = ['rate', 'irate', 'increase', 'sum', 'avg', 'max', 'min']
        used_functions = [func for func in prometheus_functions if has_word(func)]

        if re.search(r"\bi?rate\s*\(", query) and '[' not in query:
            validation_result["warnings"].append("rate函数通常需要时间范围参数")

        if len(used_functions) > 3:
            validation_result["warnings"].append("查询可能过于复杂")

        # Performance suggestion for aggregations without a grouping clause
        aggregates = has_word('sum') or has_word('avg')
        if aggregates and not has_word('by') and not has_word('without'):
            validation_result["suggestions"].append("考虑使用by或without子句优化聚合查询")

        return validation_result

# Usage example
builder = AlertRuleBuilder()

# Create an alert rule from a template, overriding two template values
cpu_rule = builder.create_from_template(
    template_name="cpu_usage",
    alert_manager=alert_manager,
    datasource_uid="prometheus_uid",
    custom_params={"threshold": 75, "duration": "3m"},
)

print("CPU告警规则创建完成:", cpu_rule.name)

# Create an anomaly-detection rule
anomaly_rule = builder.create_anomaly_detection_rule(
    name="Response Time Anomaly",
    alert_manager=alert_manager,
    datasource_uid="prometheus_uid",
    metric="http_request_duration_seconds",
    sensitivity=2.5,
)

print("异常检测规则创建完成:", anomaly_rule.name)

# Validate a query
validation = builder.validate_query(query="rate(http_requests_total[5m])")
print("查询验证结果:", "通过" if validation["valid"] else "失败")

# List the available templates
templates = builder.get_template_list()
print("可用模板数量:", len(templates))

2. 高级告警配置

class AdvancedAlertConfig:
    """Advanced alert configuration: policies, maintenance windows, dependencies.

    Also builds compound alert rules (conditional, multi-metric, time-based)
    through an ``AlertManager`` and renders a Markdown summary report.
    """

    def __init__(self):
        self.alert_policies: Dict[str, Dict] = {}
        self.maintenance_windows: List[Dict] = []
        self.alert_dependencies: Dict[str, List[str]] = {}

    def create_alert_policy(self, name: str, rules: List[str],
                          notification_policy: Dict) -> str:
        """Store an enabled alert policy and return its id.

        The id is ``policy_`` plus the lower-cased name with spaces replaced
        by underscores.
        """
        policy_id = f"policy_{name.lower().replace(' ', '_')}"

        self.alert_policies[policy_id] = {
            "name": name,
            "rules": rules,
            "notification_policy": notification_policy,
            "enabled": True,
            "created_at": datetime.now().isoformat()
        }

        return policy_id

    def create_notification_policy(self, severity_routing: Dict,
                                 time_based_routing: Optional[Dict] = None) -> Dict:
        """Build a notification policy dict with fixed escalation, grouping and inhibition settings."""
        policy = {
            "severity_routing": severity_routing,
            "time_based_routing": time_based_routing or {},
            "escalation": {
                "enabled": True,
                "levels": [
                    {"delay": "5m", "channels": ["primary"]},
                    {"delay": "15m", "channels": ["secondary"]},
                    {"delay": "30m", "channels": ["manager"]}
                ]
            },
            "grouping": {
                "enabled": True,
                "by": ["alertname", "instance"],
                "wait": "10s",
                "interval": "5m"
            },
            "inhibition": {
                "enabled": True,
                "rules": [
                    {
                        "source_match": {"severity": "critical"},
                        "target_match": {"severity": "warning"},
                        "equal": ["instance"]
                    }
                ]
            }
        }

        return policy

    def create_maintenance_window(self, name: str, start_time: datetime,
                                end_time: datetime, affected_services: List[str],
                                alert_actions: str = "silence") -> str:
        """Record a maintenance window (status ``"scheduled"``) and return its id."""
        window_id = f"maint_{len(self.maintenance_windows) + 1}"

        window = {
            "id": window_id,
            "name": name,
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "affected_services": affected_services,
            "alert_actions": alert_actions,  # silence, reduce_severity, custom
            "created_by": "admin",
            "status": "scheduled"
        }

        self.maintenance_windows.append(window)
        return window_id

    def set_alert_dependency(self, dependent_alert: str, dependencies: List[str]):
        """Set (replace) the list of alerts that ``dependent_alert`` depends on."""
        self.alert_dependencies[dependent_alert] = dependencies

    def create_conditional_alert(self, name: str, alert_manager: AlertManager,
                               datasource_uid: str, primary_condition: Dict,
                               secondary_conditions: List[Dict],
                               logic_operator: str = "AND") -> AlertRule:
        """Create a rule whose query combines several condition queries.

        With ``"AND"`` / ``"OR"`` every condition query is parenthesized and
        joined with ``and`` / ``or``; an empty ``secondary_conditions`` list
        yields just the parenthesized primary query. Any other operator uses
        the primary query unchanged. The evaluator operator and threshold
        come from ``primary_condition`` (defaults ``"gt"`` and ``0``).
        """
        rule = alert_manager.create_rule(name, datasource_uid)

        # Build the combined query
        joiner = {"AND": " and ", "OR": " or "}.get(logic_operator)
        if joiner is None:
            combined_query = primary_condition['query']
        else:
            # Join all queries at once so no operator is left dangling.
            combined_query = joiner.join(
                f"({cond['query']})" for cond in [primary_condition, *secondary_conditions]
            )

        condition = AlertCondition(
            query=combined_query,
            reducer="last",
            evaluator_type=primary_condition.get("operator", "gt"),
            evaluator_params=[primary_condition.get("threshold", 0)],
            time_range="5m"
        )

        rule.add_condition(condition)
        rule.add_annotation("logic_operator", logic_operator)
        rule.add_label("type", "conditional")

        return rule

    def create_multi_metric_alert(self, name: str, alert_manager: AlertManager,
                                datasource_uid: str, metrics: List[Dict],
                                correlation_threshold: float = 0.8) -> AlertRule:
        """Create a rule that fires when every metric exceeds its own threshold.

        Args:
            metrics: Non-empty list of dicts with ``query`` and ``threshold``.
            correlation_threshold: Recorded as an annotation only; it does not
                influence the generated query.

        Raises:
            ValueError: If ``metrics`` is empty (checked before any rule is
                registered with the manager).
        """
        if not metrics:
            raise ValueError("metrics must contain at least one metric definition")

        rule = alert_manager.create_rule(name, datasource_uid)

        # Simplified correlation: one "(query > threshold)" clause per metric,
        # all joined with "and"; a single metric yields a single clause.
        correlation_query = " and ".join(
            f"({m['query']} > {m['threshold']})" for m in metrics
        )

        condition = AlertCondition(
            query=correlation_query,
            reducer="last",
            evaluator_type="gt",
            evaluator_params=[0],
            time_range="5m"
        )

        rule.add_condition(condition)
        rule.add_annotation("correlation_threshold", str(correlation_threshold))
        rule.add_annotation("metrics_count", str(len(metrics)))
        rule.add_label("type", "multi_metric")

        return rule

    def create_time_based_alert(self, name: str, alert_manager: AlertManager,
                              datasource_uid: str, query: str,
                              time_conditions: Dict) -> AlertRule:
        """Create a rule whose query is restricted by time-of-day / weekday filters.

        Recognized ``time_conditions`` keys: ``business_hours_only``,
        ``weekdays_only`` (both truthy flags) and ``threshold`` (default 0).
        """
        rule = alert_manager.create_rule(name, datasource_uid)

        # Append the time filters to the query
        time_query = query
        if time_conditions.get("business_hours_only"):
            time_query = f"({query}) and on() (hour() >= 9 and hour() <= 17)"

        if time_conditions.get("weekdays_only"):
            time_query = f"({time_query}) and on() (day_of_week() >= 1 and day_of_week() <= 5)"

        condition = AlertCondition(
            query=time_query,
            reducer="last",
            evaluator_type="gt",
            evaluator_params=[time_conditions.get("threshold", 0)],
            time_range="5m"
        )

        rule.add_condition(condition)
        rule.add_annotation("time_conditions", str(time_conditions))
        rule.add_label("type", "time_based")

        return rule

    def generate_alert_summary_report(self, alert_manager: AlertManager) -> str:
        """Render a Markdown summary of the manager's rules and this configuration.

        Rules are counted by their ``severity`` label (default ``"unknown"``)
        and ``type`` label (default ``"basic"``). Maintenance windows whose
        start/end bracket the current time are listed as active.
        """
        rules = alert_manager.list_rules()

        # Statistics
        total_rules = len(rules)
        severity_counts = {}
        type_counts = {}

        for rule in rules:
            severity = rule.labels.get("severity", "unknown")
            severity_counts[severity] = severity_counts.get(severity, 0) + 1

            rule_type = rule.labels.get("type", "basic")
            type_counts[rule_type] = type_counts.get(rule_type, 0) + 1

        report = f"""
# 告警配置摘要报告

## 基本统计
- 总告警规则数: {total_rules}
- 告警策略数: {len(self.alert_policies)}
- 维护窗口数: {len(self.maintenance_windows)}
- 依赖关系数: {len(self.alert_dependencies)}

## 严重级别分布
"""

        for severity, count in severity_counts.items():
            percentage = (count / total_rules * 100) if total_rules > 0 else 0
            report += f"- {severity}: {count} ({percentage:.1f}%)\n"

        report += "\n## 规则类型分布\n"
        for rule_type, count in type_counts.items():
            percentage = (count / total_rules * 100) if total_rules > 0 else 0
            report += f"- {rule_type}: {count} ({percentage:.1f}%)\n"

        # Currently active maintenance windows
        now = datetime.now()
        active_windows = [
            w for w in self.maintenance_windows
            if datetime.fromisoformat(w["start_time"]) <= now <= datetime.fromisoformat(w["end_time"])
        ]

        if active_windows:
            report += "\n## 当前活跃维护窗口\n"
            for window in active_windows:
                report += f"- {window['name']}: {window['start_time']} - {window['end_time']}\n"

        return report

# Usage example
advanced_config = AdvancedAlertConfig()

# Create a notification policy
notification_policy = advanced_config.create_notification_policy(
    severity_routing={
        "critical": ["pagerduty", "email", "slack"],
        "high": ["email", "slack"],
        "medium": ["slack"],
        "low": ["email"],
    },
    time_based_routing={
        "business_hours": ["slack", "email"],
        "after_hours": ["pagerduty"],
    },
)

print("通知策略创建完成")

# Create an alert policy covering the two rules built earlier
policy_id = advanced_config.create_alert_policy(
    name="Production Monitoring",
    rules=[cpu_rule.uid, anomaly_rule.uid],
    notification_policy=notification_policy,
)

print("告警策略创建完成:", policy_id)

# Create a two-hour maintenance window starting one day from now
maintenance_id = advanced_config.create_maintenance_window(
    name="Database Maintenance",
    start_time=datetime.now() + timedelta(days=1),
    end_time=datetime.now() + timedelta(days=1, hours=2),
    affected_services=["database", "api"],
    alert_actions="silence",
)

print("维护窗口创建完成:", maintenance_id)

# Generate the summary report
summary_report = advanced_config.generate_alert_summary_report(alert_manager=alert_manager)
print("摘要报告已生成")

通知渠道配置

1. 邮件通知配置

class EmailNotificationConfig:
    """E-mail notification configuration.

    Holds placeholder SMTP settings and a registry of custom e-mail
    templates, and registers e-mail channels on an alert manager.
    """

    def __init__(self):
        # Placeholder SMTP connection details; replace them before real use.
        self.smtp_settings = dict(
            host="smtp.example.com",
            port=587,
            username="alerts@example.com",
            password="your_password",
            from_address="alerts@example.com",
            from_name="Grafana Alerts",
            skip_verify=False,
            startTLS_policy="MandatoryStartTLS",
        )
        # Custom templates, keyed by template name.
        self.email_templates = {}

    def create_email_channel(self, alert_manager: AlertManager, name: str, 
                           addresses: List[str], subject_template: str = None) -> str:
        """Register an e-mail notification channel on ``alert_manager``.

        Args:
            alert_manager: Object exposing ``create_notification_channel``.
            name: Display name of the channel.
            addresses: Recipient addresses; joined with ``;``.
            subject_template: Subject template; a default is used when falsy.

        Returns:
            Whatever ``alert_manager.create_notification_channel`` returns.
        """
        subject = subject_template
        if not subject:
            subject = "[{{ .Status | toUpper }}] {{ .GroupLabels.alertname }}"

        channel_settings = {
            "addresses": ";".join(addresses),
            "subject": subject,
            "body": self._get_default_email_template(),
            "singleEmail": False,
        }
        return alert_manager.create_notification_channel(
            name, NotificationChannel.EMAIL, channel_settings
        )

    def _get_default_email_template(self) -> str:
        """Return the default HTML e-mail body template."""
        html_template = """
<!DOCTYPE html>
<html>
<head>
    <style>
        body { font-family: Arial, sans-serif; }
        .alert-header { background-color: {{ if eq .Status "firing" }}#d32f2f{{ else }}#388e3c{{ end }}; color: white; padding: 10px; }
        .alert-content { padding: 20px; }
        .alert-details { background-color: #f5f5f5; padding: 10px; margin: 10px 0; }
        .label { font-weight: bold; }
    </style>
</head>
<body>
    <div class="alert-header">
        <h2>{{ .Status | toUpper }}: {{ .GroupLabels.alertname }}</h2>
    </div>
    <div class="alert-content">
        <p><span class="label">状态:</span> {{ .Status }}</p>
        <p><span class="label">严重级别:</span> {{ .GroupLabels.severity }}</p>
        <p><span class="label">触发时间:</span> {{ .StartsAt }}</p>
        {{ if .EndsAt }}
        <p><span class="label">结束时间:</span> {{ .EndsAt }}</p>
        {{ end }}
        
        <div class="alert-details">
            <h3>告警详情</h3>
            <p>{{ .CommonAnnotations.description }}</p>
            {{ if .CommonAnnotations.summary }}
            <p><span class="label">摘要:</span> {{ .CommonAnnotations.summary }}</p>
            {{ end }}
        </div>
        
        <div class="alert-details">
            <h3>标签信息</h3>
            {{ range .GroupLabels.SortedPairs }}
            <p><span class="label">{{ .Name }}:</span> {{ .Value }}</p>
            {{ end }}
        </div>
        
        <p><a href="{{ .ExternalURL }}">查看Grafana仪表板</a></p>
    </div>
</body>
</html>
"""
        return html_template

    def create_custom_template(self, name: str, subject: str, body: str) -> Dict:
        """Store a custom e-mail template under ``name`` and return it.

        The stored record also carries a ``created_at`` ISO timestamp.
        An existing template with the same name is overwritten.
        """
        record = dict(
            name=name,
            subject=subject,
            body=body,
            created_at=datetime.now().isoformat(),
        )
        self.email_templates[name] = record
        return record

    def get_smtp_config(self) -> Dict:
        """Return the SMTP configuration derived from ``self.smtp_settings``.

        Certificate paths and the EHLO identity are emitted as empty strings.
        """
        source = self.smtp_settings
        return dict(
            enabled=True,
            host=source["host"],
            port=source["port"],
            user=source["username"],
            password=source["password"],
            cert_file="",
            key_file="",
            skip_verify=source["skip_verify"],
            from_address=source["from_address"],
            from_name=source["from_name"],
            ehlo_identity="",
            startTLS_policy=source["startTLS_policy"],
        )

# Usage example
email_config = EmailNotificationConfig()

# Create an e-mail notification channel for the operations team.
email_channel = email_config.create_email_channel(
    alert_manager=alert_manager,
    name="Operations Team",
    addresses=["ops@example.com", "admin@example.com"],
    subject_template="[ALERT] {{ .GroupLabels.alertname }} - {{ .GroupLabels.severity }}",
)

print("邮件通知渠道创建完成:", email_channel)

2. Slack通知配置

class SlackNotificationConfig:
    """Slack notification configuration.

    Registers Slack channels (incoming webhook or bot token) on an alert
    manager and provides the message templates they use.
    """

    def __init__(self):
        self.webhook_urls, self.bot_tokens = {}, {}

    def create_slack_webhook_channel(self, alert_manager: AlertManager, name: str, 
                                   webhook_url: str, channel: str = "#alerts", 
                                   username: str = "Grafana") -> str:
        """Register a Slack channel that posts through an incoming webhook.

        Returns whatever ``alert_manager.create_notification_channel`` returns.
        """
        webhook_settings = dict(
            url=webhook_url,
            channel=channel,
            username=username,
            title="{{ .Status | toUpper }}: {{ .GroupLabels.alertname }}",
            text=self._get_slack_message_template(),
            color='{{ if eq .Status "firing" }}danger{{ else }}good{{ end }}',
            iconEmoji=":exclamation:",
            iconUrl="",
            linkNames=False,
            mentionChannel="here",
            mentionUsers="",
            mentionGroups="",
        )
        return alert_manager.create_notification_channel(
            name, NotificationChannel.SLACK, webhook_settings
        )

    def create_slack_bot_channel(self, alert_manager: AlertManager, name: str, 
                               bot_token: str, channel: str = "#alerts") -> str:
        """Register a Slack channel that authenticates with a bot token.

        Returns whatever ``alert_manager.create_notification_channel`` returns.
        """
        bot_settings = dict(
            token=bot_token,
            channel=channel,
            title="{{ .Status | toUpper }}: {{ .GroupLabels.alertname }}",
            text=self._get_slack_message_template(),
        )
        return alert_manager.create_notification_channel(
            name, NotificationChannel.SLACK, bot_settings
        )

    def _get_slack_message_template(self) -> str:
        """Return the Slack message text template."""
        message = """
*告警状态:* {{ .Status | toUpper }}
*告警名称:* {{ .GroupLabels.alertname }}
*严重级别:* {{ .GroupLabels.severity }}
*触发时间:* {{ .StartsAt }}
{{ if .EndsAt }}*结束时间:* {{ .EndsAt }}{{ end }}

*描述:* {{ .CommonAnnotations.description }}
{{ if .CommonAnnotations.summary }}*摘要:* {{ .CommonAnnotations.summary }}{{ end }}

*标签:*
{{ range .GroupLabels.SortedPairs }}
• {{ .Name }}: {{ .Value }}
{{ end }}

<{{ .ExternalURL }}|查看详情>
"""
        return message

    def create_slack_blocks_template(self) -> List[Dict]:
        """Return a Slack Block Kit layout: header, two sections, action buttons."""

        def plain(text):
            # Plain-text object used by the header and the button labels.
            return {"type": "plain_text", "text": text}

        def mrkdwn(text):
            # Markdown text object used by section fields and bodies.
            return {"type": "mrkdwn", "text": text}

        def link_button(label, url, style):
            # Button element that opens ``url``.
            return {"type": "button", "text": plain(label), "url": url, "style": style}

        header = {
            "type": "header",
            "text": plain("{{ .Status | toUpper }}: {{ .GroupLabels.alertname }}"),
        }
        overview = {
            "type": "section",
            "fields": [
                mrkdwn("*状态:*\n{{ .Status }}"),
                mrkdwn("*严重级别:*\n{{ .GroupLabels.severity }}"),
                mrkdwn("*触发时间:*\n{{ .StartsAt }}"),
                mrkdwn("*实例:*\n{{ .GroupLabels.instance }}"),
            ],
        }
        description = {
            "type": "section",
            "text": mrkdwn("*描述:*\n{{ .CommonAnnotations.description }}"),
        }
        actions = {
            "type": "actions",
            "elements": [
                link_button("查看仪表板", "{{ .ExternalURL }}", "primary"),
                link_button("静默告警", "{{ .ExternalURL }}/alerting/silences", "danger"),
            ],
        }
        return [header, overview, description, actions]

# Usage example
slack_config = SlackNotificationConfig()

# Create a Slack notification channel backed by an incoming webhook.
slack_channel = slack_config.create_slack_webhook_channel(
    alert_manager=alert_manager,
    name="Slack Alerts",
    webhook_url="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
    channel="#monitoring",
    username="Grafana Bot",
)

print("Slack通知渠道创建完成:", slack_channel)

3. 企业微信和钉钉配置

class EnterpriseNotificationConfig:
    """Notification configuration for enterprise chat tools.

    Covers WeCom (enterprise WeChat), DingTalk and Microsoft Teams channels
    together with the message templates each of them uses.
    """

    def __init__(self):
        self.wechat_settings, self.dingtalk_settings = {}, {}

    def create_wechat_channel(self, alert_manager: AlertManager, name: str, 
                            corp_id: str, agent_id: str, secret: str, 
                            to_user: str = "@all") -> str:
        """Register a WeCom (enterprise WeChat) notification channel.

        Returns whatever ``alert_manager.create_notification_channel`` returns.
        """
        wechat_settings = dict(
            corp_id=corp_id,
            agent_id=agent_id,
            secret=secret,
            to_user=to_user,
            to_party="",
            to_tag="",
            message=self._get_wechat_message_template(),
            api_url="https://qyapi.weixin.qq.com/cgi-bin/",
        )
        return alert_manager.create_notification_channel(
            name, NotificationChannel.WECHAT, wechat_settings
        )

    def create_dingtalk_channel(self, alert_manager: AlertManager, name: str, 
                              webhook_url: str, secret: str = None) -> str:
        """Register a DingTalk notification channel.

        A falsy ``secret`` is stored as an empty string.
        Returns whatever ``alert_manager.create_notification_channel`` returns.
        """
        signing_secret = secret if secret else ""
        dingtalk_settings = dict(
            url=webhook_url,
            secret=signing_secret,
            message=self._get_dingtalk_message_template(),
            msgtype="markdown",
            title="Grafana告警通知",
        )
        return alert_manager.create_notification_channel(
            name, NotificationChannel.DINGTALK, dingtalk_settings
        )

    def _get_wechat_message_template(self) -> str:
        """Return the plain-text WeCom message template."""
        wechat_message = """
【{{ .Status | toUpper }}】{{ .GroupLabels.alertname }}

告警状态: {{ .Status }}
严重级别: {{ .GroupLabels.severity }}
触发时间: {{ .StartsAt }}
{{ if .EndsAt }}结束时间: {{ .EndsAt }}{{ end }}

告警描述:
{{ .CommonAnnotations.description }}

标签信息:
{{ range .GroupLabels.SortedPairs }}
{{ .Name }}: {{ .Value }}
{{ end }}

查看详情: {{ .ExternalURL }}
"""
        return wechat_message

    def _get_dingtalk_message_template(self) -> str:
        """Return the Markdown DingTalk message template."""
        dingtalk_message = """
## {{ .Status | toUpper }}: {{ .GroupLabels.alertname }}

**告警状态:** {{ .Status }}

**严重级别:** {{ .GroupLabels.severity }}

**触发时间:** {{ .StartsAt }}

{{ if .EndsAt }}**结束时间:** {{ .EndsAt }}{{ end }}

**告警描述:**

{{ .CommonAnnotations.description }}

**标签信息:**

{{ range .GroupLabels.SortedPairs }}
- {{ .Name }}: {{ .Value }}
{{ end }}

[查看详情]({{ .ExternalURL }})
"""
        return dingtalk_message

    def create_teams_channel(self, alert_manager: AlertManager, name: str, 
                           webhook_url: str) -> str:
        """Register a Microsoft Teams notification channel.

        Returns whatever ``alert_manager.create_notification_channel`` returns.
        """
        teams_settings = dict(
            url=webhook_url,
            title="{{ .Status | toUpper }}: {{ .GroupLabels.alertname }}",
            text=self._get_teams_message_template(),
            sectionTitle="告警详情",
        )
        return alert_manager.create_notification_channel(
            name, NotificationChannel.TEAMS, teams_settings
        )

    def _get_teams_message_template(self) -> str:
        """Return the Markdown Teams message template."""
        teams_message = """
**告警状态:** {{ .Status }}

**严重级别:** {{ .GroupLabels.severity }}

**触发时间:** {{ .StartsAt }}

{{ if .EndsAt }}**结束时间:** {{ .EndsAt }}{{ end }}

**描述:** {{ .CommonAnnotations.description }}

**标签:**
{{ range .GroupLabels.SortedPairs }}
- {{ .Name }}: {{ .Value }}
{{ end }}

[查看Grafana]({{ .ExternalURL }})
"""
        return teams_message

# Usage example
enterprise_config = EnterpriseNotificationConfig()

# Create a WeCom (enterprise WeChat) notification channel.
wechat_channel = enterprise_config.create_wechat_channel(
    alert_manager=alert_manager,
    name="WeChat Alerts",
    corp_id="your_corp_id",
    agent_id="your_agent_id",
    secret="your_secret",
    to_user="@all",
)

print("企业微信通知渠道创建完成:", wechat_channel)

# Create a DingTalk notification channel.
dingtalk_channel = enterprise_config.create_dingtalk_channel(
    alert_manager=alert_manager,
    name="DingTalk Alerts",
    webhook_url="https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN",
    secret="your_secret",
)

print("钉钉通知渠道创建完成:", dingtalk_channel)

4. Webhook和自定义通知

class WebhookNotificationConfig:
    """Webhook and custom notification configuration.

    Registers webhook, PagerDuty and custom-API channels on an alert manager,
    runs a test over every registered channel, and produces a redacted backup
    of the notification configuration.
    """

    # Placeholder written in place of any redacted value.
    _REDACTED = "[REDACTED]"
    # A settings key is sensitive when it contains one of these fragments
    # (case-insensitive), e.g. "integrationKey", "X-API-Key", "Authorization".
    _SENSITIVE_KEY_PARTS = ("password", "token", "secret", "key", "authorization")
    # Keys redacted only on an exact (case-insensitive) match: incoming-webhook
    # URLs embed credentials, while keys such as "iconUrl"/"api_url" do not.
    _SENSITIVE_EXACT_KEYS = ("url", "webhook_url")

    def __init__(self):
        self.webhook_templates = {}

    def create_webhook_channel(self, alert_manager: AlertManager, name: str, 
                             url: str, method: str = "POST", 
                             headers: Optional[Dict] = None,
                             auth: Optional[Dict] = None) -> str:
        """Register a generic webhook notification channel.

        Args:
            alert_manager: Object exposing ``create_notification_channel``.
            name: Display name of the channel.
            url: Target URL.
            method: HTTP method stored as ``httpMethod``.
            headers: Extra headers merged over the JSON/User-Agent defaults.
            auth: Authorization settings; an empty dict is stored when falsy.

        Returns:
            Whatever ``alert_manager.create_notification_channel`` returns.
        """
        default_headers = {
            "Content-Type": "application/json",
            "User-Agent": "Grafana"
        }

        if headers:
            default_headers.update(headers)

        settings = {
            "url": url,
            "httpMethod": method,
            "maxAlerts": 0,
            "authorization": auth or {},
            "httpHeaders": default_headers,
            "body": self._get_webhook_payload_template()
        }

        return alert_manager.create_notification_channel(name, NotificationChannel.WEBHOOK, settings)

    def _get_webhook_payload_template(self) -> str:
        """Return the JSON payload template used by webhook channels."""
        # NOTE(review): `last` and `now` are not Go text/template built-ins —
        # confirm the target Grafana version provides them before relying on
        # this template.
        return """
{
  "alert_name": "{{ .GroupLabels.alertname }}",
  "status": "{{ .Status }}",
  "severity": "{{ .GroupLabels.severity }}",
  "starts_at": "{{ .StartsAt }}",
  "ends_at": "{{ .EndsAt }}",
  "description": "{{ .CommonAnnotations.description }}",
  "summary": "{{ .CommonAnnotations.summary }}",
  "labels": {
    {{ range .GroupLabels.SortedPairs }}
    "{{ .Name }}": "{{ .Value }}"{{ if not (last $.GroupLabels.SortedPairs .) }},{{ end }}
    {{ end }}
  },
  "annotations": {
    {{ range .CommonAnnotations.SortedPairs }}
    "{{ .Name }}": "{{ .Value }}"{{ if not (last $.CommonAnnotations.SortedPairs .) }},{{ end }}
    {{ end }}
  },
  "external_url": "{{ .ExternalURL }}",
  "timestamp": "{{ now }}"
}
"""

    def create_pagerduty_channel(self, alert_manager: AlertManager, name: str, 
                               integration_key: str, severity: str = "error") -> str:
        """Register a PagerDuty notification channel.

        Returns whatever ``alert_manager.create_notification_channel`` returns.
        """
        settings = {
            "integrationKey": integration_key,
            "severity": severity,
            "class": "grafana",
            "component": "{{ .GroupLabels.alertname }}",
            "group": "{{ .GroupLabels.instance }}",
            "summary": "{{ .CommonAnnotations.summary }}",
            "source": "Grafana"
        }

        return alert_manager.create_notification_channel(name, NotificationChannel.PAGERDUTY, settings)

    def create_custom_api_channel(self, alert_manager: AlertManager, name: str, 
                                api_config: Dict) -> str:
        """Register a webhook channel that targets a custom HTTP API.

        Args:
            alert_manager: Object exposing ``create_notification_channel``.
            name: Display name of the channel.
            api_config: Must contain ``base_url``; may contain ``endpoint``
                (default ``/alerts``), ``api_token`` (default empty) and
                ``api_version`` (default ``v1``).

        Returns:
            Whatever ``alert_manager.create_notification_channel`` returns.

        Raises:
            KeyError: If ``api_config`` has no ``base_url``.
        """
        # Build the target URL for the custom API call.
        webhook_url = api_config["base_url"] + api_config.get("endpoint", "/alerts")

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_config.get('api_token', '')}",
            "X-API-Version": api_config.get("api_version", "v1")
        }

        # Custom payload layout; the values are template placeholders that are
        # serialized to JSON as-is.
        custom_payload = {
            "event_type": "alert",
            "source": "grafana",
            "alert": {
                "name": "{{ .GroupLabels.alertname }}",
                "status": "{{ .Status }}",
                "severity": "{{ .GroupLabels.severity }}",
                "timestamp": "{{ .StartsAt }}",
                "description": "{{ .CommonAnnotations.description }}",
                "labels": "{{ .GroupLabels }}",
                "annotations": "{{ .CommonAnnotations }}"
            },
            "metadata": {
                "grafana_url": "{{ .ExternalURL }}",
                "rule_id": "{{ .GroupLabels.rule_id }}",
                "dashboard_id": "{{ .GroupLabels.dashboard_id }}"
            }
        }

        settings = {
            "url": webhook_url,
            "httpMethod": "POST",
            "httpHeaders": headers,
            "body": json.dumps(custom_payload, indent=2)
        }

        return alert_manager.create_notification_channel(name, NotificationChannel.WEBHOOK, settings)

    def create_notification_test_suite(self, alert_manager: AlertManager) -> Dict:
        """Test every channel registered on ``alert_manager`` and summarize.

        Results are keyed by channel name, so channels sharing a name
        overwrite each other in ``test_results``.
        """
        test_results = {}

        # Exercise every registered notification channel.
        for channel_uid, channel in alert_manager.notification_channels.items():
            test_result = alert_manager.test_notification(channel_uid)
            test_results[channel["name"]] = {
                "channel_type": channel["type"],
                "success": test_result["success"],
                "message": test_result.get("message", ""),
                "error": test_result.get("error", ""),
                "timestamp": test_result["timestamp"]
            }

        return {
            "total_channels": len(alert_manager.notification_channels),
            "successful_tests": sum(1 for r in test_results.values() if r["success"]),
            "failed_tests": sum(1 for r in test_results.values() if not r["success"]),
            "test_results": test_results,
            "test_timestamp": datetime.now().isoformat()
        }

    @classmethod
    def _redact_settings(cls, settings: Dict) -> Dict:
        """Return a copy of ``settings`` with credential-bearing values masked.

        Nested dicts (for example ``httpHeaders``) are processed recursively;
        the input mapping is never modified.
        """
        redacted = {}
        for key, value in settings.items():
            lowered = str(key).lower()
            is_sensitive = lowered in cls._SENSITIVE_EXACT_KEYS or any(
                part in lowered for part in cls._SENSITIVE_KEY_PARTS
            )
            if is_sensitive:
                redacted[key] = cls._REDACTED
            elif isinstance(value, dict):
                redacted[key] = cls._redact_settings(value)
            else:
                redacted[key] = value
        return redacted

    def generate_notification_config_backup(self, alert_manager: AlertManager) -> Dict:
        """Build a backup of the notification configuration without secrets.

        Channel settings are copied with credential-bearing values replaced
        by ``"[REDACTED]"``. Matching is case-insensitive and recursive, so
        keys such as ``integrationKey``, ``authorization``, webhook ``url``
        and header entries like ``X-API-Key`` are covered as well as the
        plain ``password``/``token``/``secret`` keys.

        Returns:
            Dict with ``backup_timestamp``, ``grafana_version``,
            ``notification_channels``, ``alert_rules`` and an empty
            ``notification_policies`` list.
        """
        backup = {
            "backup_timestamp": datetime.now().isoformat(),
            "grafana_version": "9.0.0",
            "notification_channels": [],
            "alert_rules": [],
            "notification_policies": []
        }

        # Back up the channels, stripping sensitive settings from each copy.
        for channel in alert_manager.notification_channels.values():
            safe_channel = channel.copy()
            if "settings" in safe_channel:
                safe_channel["settings"] = self._redact_settings(safe_channel["settings"])

            backup["notification_channels"].append(safe_channel)

        # Back up the alert rules (notification-related fields only).
        for rule in alert_manager.rules.values():
            rule_backup = {
                "uid": rule.uid,
                "name": rule.name,
                "notification_uids": rule.notification_uids,
                "labels": rule.labels,
                "annotations": rule.annotations
            }
            backup["alert_rules"].append(rule_backup)

        return backup

# Usage example
webhook_config = WebhookNotificationConfig()

# Create a generic webhook notification channel.
webhook_channel = webhook_config.create_webhook_channel(
    alert_manager=alert_manager,
    name="Custom Webhook",
    url="https://api.example.com/webhooks/alerts",
    method="POST",
    headers={"X-API-Key": "your-api-key"},
    auth={"type": "bearer", "token": "your-token"},
)

print("Webhook通知渠道创建完成:", webhook_channel)

# Create a PagerDuty notification channel.
pagerduty_channel = webhook_config.create_pagerduty_channel(
    alert_manager=alert_manager,
    name="PagerDuty Alerts",
    integration_key="your-integration-key",
    severity="critical",
)

print("PagerDuty通知渠道创建完成:", pagerduty_channel)

# Run the notification test suite over every registered channel.
test_suite_results = webhook_config.create_notification_test_suite(alert_manager=alert_manager)
print(f"通知测试完成: {test_suite_results['successful_tests']}/{test_suite_results['total_channels']} 成功")

# Generate the configuration backup.
config_backup = webhook_config.generate_notification_config_backup(alert_manager=alert_manager)
print("配置备份已生成,包含", len(config_backup["notification_channels"]), "个通知渠道")

告警策略管理

1. 告警分组和路由

class AlertPolicyManager:
    """Alert policy manager.

    Stores alert policies and builds Alertmanager-style routing trees,
    inhibit rules, time-based routing and a complete configuration dict.
    """

    def __init__(self):
        self.policies = {}
        self.routes = {}
        self.inhibit_rules = {}

    def create_alert_policy(self, name: str, match_labels: Dict, 
                          notification_channels: List[str], 
                          group_by: Optional[List[str]] = None, 
                          group_wait: str = "10s",
                          group_interval: str = "5m",
                          repeat_interval: str = "12h") -> str:
        """Create an alert policy, store it in ``self.policies`` and return its uid.

        Args:
            name: Display name of the policy.
            match_labels: Labels an alert must carry for the policy to apply.
            notification_channels: Names of the channels to notify.
            group_by: Grouping labels; defaults to
                ``["alertname", "cluster", "service"]`` when falsy.
            group_wait: Stored as-is under ``group_wait``.
            group_interval: Stored as-is under ``group_interval``.
            repeat_interval: Stored as-is under ``repeat_interval``.

        Returns:
            The generated uid, ``policy_`` followed by eight hex characters.
        """
        policy_uid = f"policy_{uuid.uuid4().hex[:8]}"

        policy = {
            "uid": policy_uid,
            "name": name,
            "match_labels": match_labels,
            "notification_channels": notification_channels,
            "group_by": group_by or ["alertname", "cluster", "service"],
            "group_wait": group_wait,
            "group_interval": group_interval,
            "repeat_interval": repeat_interval,
            "created_at": datetime.now().isoformat()
        }

        self.policies[policy_uid] = policy
        return policy_uid

    def create_routing_tree(self) -> Dict:
        """Return the alert routing tree.

        Critical alerts are routed per service, warnings go to a slower
        receiver, and the ``Watchdog`` alert goes to the ``null`` receiver.
        """
        routing_tree = {
            "receiver": "default",
            "group_by": ["alertname"],
            "routes": [
                {
                    "match": {"severity": "critical"},
                    "receiver": "critical-alerts",
                    "group_wait": "5s",
                    "group_interval": "2m",
                    "repeat_interval": "5m",
                    "routes": [
                        {
                            "match": {"service": "database"},
                            "receiver": "database-team",
                            "group_wait": "0s",
                            "repeat_interval": "2m"
                        },
                        {
                            "match": {"service": "web"},
                            "receiver": "web-team",
                            "group_wait": "0s",
                            "repeat_interval": "2m"
                        }
                    ]
                },
                {
                    "match": {"severity": "warning"},
                    "receiver": "warning-alerts",
                    "group_wait": "30s",
                    "group_interval": "10m",
                    "repeat_interval": "1h"
                },
                {
                    "match": {"alertname": "Watchdog"},
                    "receiver": "null",
                    "group_wait": "0s",
                    "group_interval": "1m",
                    "repeat_interval": "1m"
                }
            ]
        }

        return routing_tree

    def create_inhibit_rules(self) -> List[Dict]:
        """Return the alert inhibition rules."""
        inhibit_rules = [
            {
                "source_match": {"severity": "critical"},
                "target_match": {"severity": "warning"},
                "equal": ["alertname", "instance"]
            },
            {
                "source_match": {"alertname": "NodeDown"},
                "target_match_re": {"alertname": ".*"},
                "equal": ["instance"]
            },
            {
                "source_match": {"service": "database", "severity": "critical"},
                "target_match": {"service": "web", "severity": "warning"},
                "equal": ["cluster"]
            }
        ]

        return inhibit_rules

    def create_time_based_routing(self) -> Dict:
        """Return routes that are active only during named time intervals.

        Every time range has ``start_time`` before ``end_time``: Alertmanager
        rejects ranges that wrap past midnight, so the weekday after-hours
        period is expressed as two ranges. ``end_time`` is exclusive, hence
        ``24:00`` (not ``23:59``) closes a day.
        """
        time_routing = {
            "receiver": "default",
            "routes": [
                {
                    "match": {"severity": "critical"},
                    "receiver": "oncall-primary",
                    "active_time_intervals": ["business-hours"]
                },
                {
                    "match": {"severity": "critical"},
                    "receiver": "oncall-secondary",
                    "active_time_intervals": ["after-hours"]
                },
                {
                    "match": {"severity": "warning"},
                    "receiver": "email-only",
                    "active_time_intervals": ["business-hours"]
                },
                {
                    "match": {"severity": "warning"},
                    "receiver": "null",
                    "active_time_intervals": ["after-hours"]
                }
            ],
            "time_intervals": [
                {
                    "name": "business-hours",
                    "time_intervals": [
                        {
                            "times": [
                                {"start_time": "09:00", "end_time": "18:00"}
                            ],
                            "weekdays": ["monday:friday"]
                        }
                    ]
                },
                {
                    "name": "after-hours",
                    "time_intervals": [
                        {
                            # Weekday nights, split at midnight.
                            "times": [
                                {"start_time": "00:00", "end_time": "09:00"},
                                {"start_time": "18:00", "end_time": "24:00"}
                            ],
                            "weekdays": ["monday:friday"]
                        },
                        {
                            # Whole weekend days.
                            "times": [
                                {"start_time": "00:00", "end_time": "24:00"}
                            ],
                            "weekdays": ["saturday", "sunday"]
                        }
                    ]
                }
            ]
        }

        return time_routing

    def create_escalation_policy(self, name: str, steps: List[Dict]) -> Dict:
        """Create an alert escalation policy.

        Args:
            name: Display name of the policy.
            steps: Ordered escalation steps, stored unchanged. A typical step
                looks like::

                    {"step": 1, "wait_time": "5m",
                     "receivers": ["primary-oncall"],
                     "notification_methods": ["email", "sms"]}

        Returns:
            Dict with ``name``, ``steps`` and a ``created_at`` ISO timestamp.
        """
        return {
            "name": name,
            "steps": steps,
            "created_at": datetime.now().isoformat()
        }

    def generate_alertmanager_config(self) -> Dict:
        """Return a complete Alertmanager configuration dict.

        Every receiver referenced by ``create_routing_tree`` is defined here,
        and e-mail receivers use the ``headers``/``text``/``html`` fields of
        Alertmanager's ``email_config``.
        """
        config = {
            "global": {
                "smtp_smarthost": "localhost:587",
                "smtp_from": "alerts@example.com",
                "resolve_timeout": "5m"
            },
            "templates": [
                "/etc/alertmanager/templates/*.tmpl"
            ],
            "route": self.create_routing_tree(),
            "inhibit_rules": self.create_inhibit_rules(),
            "receivers": [
                {
                    "name": "default",
                    "email_configs": [
                        {
                            "to": "admin@example.com",
                            "headers": {"Subject": "[DEFAULT] {{ .GroupLabels.alertname }}"},
                            "text": "{{ range .Alerts }}{{ .Annotations.description }}{{ end }}"
                        }
                    ]
                },
                {
                    "name": "critical-alerts",
                    "email_configs": [
                        {
                            "to": "oncall@example.com",
                            "headers": {"Subject": "[CRITICAL] {{ .GroupLabels.alertname }}"},
                            "html": "{{ template \"email.default.html\" . }}"
                        }
                    ],
                    "slack_configs": [
                        {
                            "api_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
                            "channel": "#alerts-critical",
                            "title": "Critical Alert: {{ .GroupLabels.alertname }}",
                            "text": "{{ range .Alerts }}{{ .Annotations.description }}{{ end }}"
                        }
                    ]
                },
                # Placeholder receivers for the remaining routes; a route that
                # names an undefined receiver makes the configuration invalid.
                {
                    "name": "database-team",
                    "email_configs": [
                        {"to": "database-team@example.com"}
                    ]
                },
                {
                    "name": "web-team",
                    "email_configs": [
                        {"to": "web-team@example.com"}
                    ]
                },
                {
                    "name": "warning-alerts",
                    "email_configs": [
                        {"to": "alerts@example.com"}
                    ]
                },
                {
                    "name": "null"
                }
            ]
        }

        return config

# Usage example
policy_manager = AlertPolicyManager()

# Create an alert policy for critical alerts.
critical_policy = policy_manager.create_alert_policy(
    name="Critical Alerts Policy",
    match_labels={"severity": "critical"},
    notification_channels=["email-critical", "slack-critical", "pagerduty"],
    group_by=["alertname", "instance"],
    group_wait="5s",
    group_interval="2m",
    repeat_interval="5m",
)

print("关键告警策略创建完成:", critical_policy)

# Generate the Alertmanager configuration.
alertmanager_config = policy_manager.generate_alertmanager_config()
print("Alertmanager配置已生成")

2. 静默和维护窗口

class SilenceManager:
    """In-memory registry of alert silences, maintenance windows and recurring silences.

    Every timestamp is compared against ``datetime.now()`` (naive local time),
    so callers are expected to pass naive ``datetime`` values.

    Attributes:
        silences: silence records keyed by silence id.
        maintenance_windows: maintenance-window records keyed by window id.
        recurring_silences: recurring-silence records keyed by recurring id.
    """

    def __init__(self):
        self.silences = {}
        self.maintenance_windows = {}
        # Recurring definitions are kept so the silences they generated can be
        # traced back to the schedule that produced them.
        self.recurring_silences = {}

    @staticmethod
    def _new_id(prefix: str) -> str:
        """Return ``<prefix>_<8 hex chars>`` built from a random UUID."""
        # Imported locally so the class does not depend on a module-level
        # ``import uuid`` being present elsewhere in the file.
        import uuid

        return f"{prefix}_{uuid.uuid4().hex[:8]}"

    @staticmethod
    def _status_at(starts_at: datetime, ends_at: datetime, now: datetime) -> str:
        """Classify a silence window as "active", "expired" or "pending" at *now*."""
        if now > ends_at:
            return "expired"
        if now < starts_at:
            return "pending"
        return "active"

    def create_silence(self, matchers: List[Dict], starts_at: datetime,
                       ends_at: datetime, created_by: str, comment: str) -> str:
        """Register a silence and return its generated id.

        Args:
            matchers: label matchers, stored as given.
            starts_at: beginning of the silence window.
            ends_at: end of the silence window.
            created_by: author recorded on the silence.
            comment: free-text reason recorded on the silence.

        Returns:
            The id under which the silence is stored in ``self.silences``.
        """
        silence_id = self._new_id("silence")
        # Sample the clock once so "created_at" and "status" agree.
        now = datetime.now()

        self.silences[silence_id] = {
            "id": silence_id,
            "matchers": matchers,
            "starts_at": starts_at.isoformat(),
            "ends_at": ends_at.isoformat(),
            "created_by": created_by,
            "comment": comment,
            "created_at": now.isoformat(),
            # A window that is already over is "expired", not "pending".
            "status": self._status_at(starts_at, ends_at, now),
        }
        return silence_id

    def create_maintenance_window(self, name: str, services: List[str],
                                  start_time: datetime, end_time: datetime,
                                  description: str, contact: str) -> str:
        """Create a maintenance window plus one silence per affected service.

        Each silence matches the exact ``service`` label and any ``alertname``
        (regex ``.*``), and is attributed to *contact*.

        Returns:
            The id under which the window is stored in ``self.maintenance_windows``.
        """
        window_id = self._new_id("maint")
        silence_comment = f"Maintenance window: {name} - {description}"

        silence_ids = []
        for service in services:
            matchers = [
                {"name": "service", "value": service, "isRegex": False},
                {"name": "alertname", "value": ".*", "isRegex": True},
            ]
            silence_ids.append(
                self.create_silence(matchers, start_time, end_time, contact, silence_comment)
            )

        self.maintenance_windows[window_id] = {
            "id": window_id,
            "name": name,
            "services": services,
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "description": description,
            "contact": contact,
            "silence_ids": silence_ids,
            "status": "scheduled",
            "created_at": datetime.now().isoformat(),
        }
        return window_id

    def create_recurring_silence(self, name: str, matchers: List[Dict],
                                 schedule: Dict, duration_hours: int,
                                 created_by: str, comment: str) -> str:
        """Expand a recurring schedule into concrete silences and record it.

        One silence is created for every date from today through today + 30
        days (both inclusive) that matches *schedule*. Each silence starts at
        ``schedule["start_time"]`` ("HH:MM") and lasts *duration_hours* hours.

        Returns:
            The id under which the definition is stored in
            ``self.recurring_silences``.
        """
        recurring_id = self._new_id("recurring")
        silence_comment = f"Recurring silence: {name} - {comment}"

        silence_ids = []
        current_date = datetime.now().date()
        end_date = current_date + timedelta(days=30)

        while current_date <= end_date:
            if self._matches_schedule(current_date, schedule):
                start_time = datetime.combine(
                    current_date,
                    datetime.strptime(schedule["start_time"], "%H:%M").time(),
                )
                end_time = start_time + timedelta(hours=duration_hours)
                silence_ids.append(
                    self.create_silence(matchers, start_time, end_time,
                                        created_by, silence_comment)
                )
            current_date += timedelta(days=1)

        # Persist the definition; previously it was built and then dropped.
        self.recurring_silences[recurring_id] = {
            "id": recurring_id,
            "name": name,
            "matchers": matchers,
            "schedule": schedule,
            "duration_hours": duration_hours,
            "created_by": created_by,
            "comment": comment,
            "silence_ids": silence_ids,
            "created_at": datetime.now().isoformat(),
        }
        return recurring_id

    def _matches_schedule(self, date: datetime.date, schedule: Dict) -> bool:
        """Return True when *date* satisfies every constraint present in *schedule*.

        Supported optional keys: ``"weekdays"`` (0=Monday .. 6=Sunday) and
        ``"monthly_days"`` (day-of-month numbers). A schedule with neither key
        matches every date.
        """
        if "weekdays" in schedule and date.weekday() not in schedule["weekdays"]:
            return False
        if "monthly_days" in schedule and date.day not in schedule["monthly_days"]:
            return False
        return True

    def get_active_silences(self) -> List[Dict]:
        """Return the silences whose window contains the current time."""
        now = datetime.now()
        return [
            silence
            for silence in self.silences.values()
            if datetime.fromisoformat(silence["starts_at"])
            <= now
            <= datetime.fromisoformat(silence["ends_at"])
        ]

    def expire_silence(self, silence_id: str) -> bool:
        """End a silence immediately.

        Returns:
            True when the silence exists and was expired, False otherwise.
        """
        silence = self.silences.get(silence_id)
        if silence is None:
            return False
        silence["ends_at"] = datetime.now().isoformat()
        silence["status"] = "expired"
        return True

    def generate_silence_report(self) -> Dict:
        """Summarise all silences, classified by their window at report time.

        The per-silence status is recomputed from ``starts_at``/``ends_at``
        rather than read from the stored ``"status"`` field.
        """
        now = datetime.now()
        report = {
            "total_silences": len(self.silences),
            "active_silences": 0,
            "expired_silences": 0,
            "pending_silences": 0,
            "maintenance_windows": len(self.maintenance_windows),
            "silence_details": [],
            "generated_at": now.isoformat(),
        }

        for silence in self.silences.values():
            starts_at = datetime.fromisoformat(silence["starts_at"])
            ends_at = datetime.fromisoformat(silence["ends_at"])
            status = self._status_at(starts_at, ends_at, now)

            report[f"{status}_silences"] += 1
            report["silence_details"].append({
                "id": silence["id"],
                "status": status,
                "created_by": silence["created_by"],
                "comment": silence["comment"],
                "duration": str(ends_at - starts_at),
                "matchers_count": len(silence["matchers"]),
            })

        return report

# Usage example: exercise SilenceManager end to end.
silence_manager = SilenceManager()

# One-off silence: mute HighCPUUsage on server-01 for the next two hours.
silence_id = silence_manager.create_silence(
    matchers=[
        dict(name="alertname", value="HighCPUUsage", isRegex=False),
        dict(name="instance", value="server-01", isRegex=False),
    ],
    starts_at=datetime.now(),
    ends_at=datetime.now() + timedelta(hours=2),
    created_by="admin@example.com",
    comment="Investigating high CPU usage on server-01",
)
print("静默规则创建完成:", silence_id)

# Maintenance window: four hours starting tomorrow, covering three services.
maintenance_id = silence_manager.create_maintenance_window(
    name="Database Maintenance",
    services=["mysql", "redis", "mongodb"],
    start_time=datetime.now() + timedelta(days=1),
    end_time=datetime.now() + timedelta(days=1, hours=4),
    description="Scheduled database maintenance and updates",
    contact="dba@example.com",
)
print("维护窗口创建完成:", maintenance_id)

# Recurring silence: every Sunday (weekday 6) at 02:00, lasting two hours.
recurring_id = silence_manager.create_recurring_silence(
    name="Weekly Backup Silence",
    matchers=[dict(name="alertname", value="BackupRunning", isRegex=False)],
    schedule=dict(weekdays=[6], start_time="02:00"),
    duration_hours=2,
    created_by="system@example.com",
    comment="Weekly backup process",
)
print("周期性静默创建完成:", recurring_id)

# Summarise how many silences are currently active or already expired.
silence_report = silence_manager.generate_silence_report()
print(f"静默报告: {silence_report['active_silences']} 活跃, {silence_report['expired_silences']} 已过期")

故障排除

1. 告警故障诊断

class AlertTroubleshooter:
    """Diagnostic helper that inspects an alert manager and reports problems.

    The alert manager passed to the public methods must expose ``rules``
    (mapping of rule objects), ``notification_channels`` (mapping of channel
    dicts with "name" and "type" keys) and ``test_notification(channel_uid)``.
    Datasource connectivity is simulated with random outcomes.
    """

    # One row per test section that can yield an issue:
    # (test_results key, failure-count key, details key, failing status,
    #  issue category, severity, title, description template).
    _ISSUE_SPECS = (
        ("alert_rules", "error_rules", "rule_details", "error",
         "alert_rules", "high", "告警规则错误", "发现 {count} 个错误的告警规则"),
        ("notification_channels", "failed_channels", "channel_details", "failed",
         "notifications", "medium", "通知渠道故障", "发现 {count} 个故障的通知渠道"),
        ("datasources", "failed_datasources", "datasource_details", "failed",
         "datasources", "high", "数据源连接失败", "发现 {count} 个无法连接的数据源"),
    )

    # Remediation advice per issue category: (priority, title, actions).
    _RECOMMENDATIONS = {
        "alert_rules": ("high", "修复告警规则", (
            "检查告警规则的查询语法",
            "验证数据源连接",
            "更新过时的指标名称",
            "测试规则条件逻辑",
        )),
        "notifications": ("medium", "修复通知渠道", (
            "验证通知渠道配置",
            "检查API密钥和令牌",
            "测试网络连接",
            "更新过期的认证信息",
        )),
        "datasources": ("high", "修复数据源连接", (
            "检查数据源服务状态",
            "验证网络连接",
            "更新连接配置",
            "检查认证凭据",
        )),
    }

    def __init__(self):
        self.diagnostic_tests = {}
        self.common_issues = {}

    def diagnose_alert_issues(self, alert_manager: 'AlertManager') -> Dict:
        """Run every diagnostic test and return the combined diagnosis.

        Returns:
            A dict with keys "timestamp", "overall_health", "issues_found",
            "recommendations" and "test_results" (sections "alert_rules",
            "notification_channels" and "datasources").
        """
        test_results = {
            "alert_rules": self._test_alert_rules(alert_manager),
            "notification_channels": self._test_notification_channels(alert_manager),
            "datasources": self._test_datasource_connectivity(),
        }
        issues = self._analyze_issues(test_results)

        return {
            "timestamp": datetime.now().isoformat(),
            "overall_health": self._determine_health_status(issues),
            "issues_found": issues,
            "recommendations": self._generate_recommendations(issues),
            "test_results": test_results,
        }

    @staticmethod
    def _rule_conditions(rule) -> List[Any]:
        """Collect the conditions attached to *rule*.

        Reads the ``conditions`` list and, for backward compatibility, a
        single truthy ``condition`` attribute if the rule has one.
        """
        conditions = list(getattr(rule, "conditions", None) or [])
        legacy_condition = getattr(rule, "condition", None)
        if legacy_condition:
            conditions.append(legacy_condition)
        return conditions

    def _test_alert_rules(self, alert_manager: 'AlertManager') -> Dict:
        """Classify every rule in ``alert_manager.rules`` (simulated check).

        A rule with no conditions keeps status "unknown" and is not counted in
        any of the active/inactive/error buckets.
        """
        test_result = {
            "total_rules": len(alert_manager.rules),
            "active_rules": 0,
            "inactive_rules": 0,
            "error_rules": 0,
            "rule_details": [],
        }

        for rule in alert_manager.rules.values():
            rule_status = {
                "name": rule.name,
                "uid": rule.uid,
                "status": "unknown",
                "last_evaluation": "never",
                "errors": [],
            }

            conditions = self._rule_conditions(rule)
            if conditions:
                try:
                    # Simulated syntax check: a query containing "invalid" is bad.
                    if any("invalid" in c.query.lower() for c in conditions):
                        rule_status["status"] = "error"
                        rule_status["errors"].append("Invalid query syntax")
                        test_result["error_rules"] += 1
                    # NOTE(review): this compares the rule's configured no-data
                    # fallback, not a live evaluation state; a rule left at
                    # AlertState.NO_DATA is therefore reported as "no_data".
                    elif rule.no_data_state == AlertState.NO_DATA:
                        rule_status["status"] = "no_data"
                        test_result["inactive_rules"] += 1
                    else:
                        rule_status["status"] = "active"
                        rule_status["last_evaluation"] = datetime.now().isoformat()
                        test_result["active_rules"] += 1
                except Exception as e:
                    # Diagnostics are best-effort: record the failure on this
                    # rule and keep checking the remaining ones.
                    rule_status["status"] = "error"
                    rule_status["errors"].append(str(e))
                    test_result["error_rules"] += 1

            test_result["rule_details"].append(rule_status)

        return test_result

    def _test_notification_channels(self, alert_manager: 'AlertManager') -> Dict:
        """Send a test notification through every channel and record the outcome."""
        test_result = {
            "total_channels": len(alert_manager.notification_channels),
            "working_channels": 0,
            "failed_channels": 0,
            "channel_details": [],
        }

        for channel_uid, channel in alert_manager.notification_channels.items():
            channel_status = {
                "name": channel["name"],
                "type": channel["type"],
                "uid": channel_uid,
                "status": "unknown",
                "last_test": "never",
                "errors": [],
            }

            try:
                test_response = alert_manager.test_notification(channel_uid)
                if test_response["success"]:
                    channel_status["status"] = "working"
                    channel_status["last_test"] = test_response["timestamp"]
                    test_result["working_channels"] += 1
                else:
                    channel_status["status"] = "failed"
                    channel_status["errors"].append(
                        test_response.get("error", "Unknown error")
                    )
                    test_result["failed_channels"] += 1
            except Exception as e:
                # A channel that raises counts as failed; the error text is kept
                # so the report can show why.
                channel_status["status"] = "failed"
                channel_status["errors"].append(str(e))
                test_result["failed_channels"] += 1

            test_result["channel_details"].append(channel_status)

        return test_result

    def _test_datasource_connectivity(self) -> Dict:
        """Simulate connectivity checks for a fixed list of common datasources.

        Outcomes are random (success is drawn from ``[True, False, True]``), so
        results differ between calls.
        """
        import random  # only needed by the simulated probe below

        common_datasources = [
            {"name": "Prometheus", "type": "prometheus", "url": "http://localhost:9090"},
            {"name": "MySQL", "type": "mysql", "url": "localhost:3306"},
            {"name": "InfluxDB", "type": "influxdb", "url": "http://localhost:8086"},
        ]
        test_result = {
            "total_datasources": 0,
            "connected_datasources": 0,
            "failed_datasources": 0,
            "datasource_details": [],
        }

        for ds in common_datasources:
            ds_status = {
                "name": ds["name"],
                "type": ds["type"],
                "url": ds["url"],
                "status": "unknown",
                "response_time": 0,
                "errors": [],
            }

            try:
                # Placeholder for a real connection test.
                if random.choice([True, False, True]):
                    ds_status["status"] = "connected"
                    ds_status["response_time"] = random.randint(10, 500)
                    test_result["connected_datasources"] += 1
                else:
                    ds_status["status"] = "failed"
                    ds_status["errors"].append("Connection timeout")
                    test_result["failed_datasources"] += 1
            except Exception as e:
                ds_status["status"] = "failed"
                ds_status["errors"].append(str(e))
                test_result["failed_datasources"] += 1

            test_result["datasource_details"].append(ds_status)
            test_result["total_datasources"] += 1

        return test_result

    def _analyze_issues(self, test_results: Dict) -> List[Dict]:
        """Turn failure counts in *test_results* into a list of issue dicts.

        One issue is produced per section whose failure count is positive; its
        "affected_items" are the names of the entries with the failing status.
        """
        issues = []
        for (section, count_key, details_key, bad_status,
             category, severity, title, description) in self._ISSUE_SPECS:
            results = test_results.get(section, {})
            failures = results.get(count_key, 0)
            if failures > 0:
                issues.append({
                    "category": category,
                    "severity": severity,
                    "title": title,
                    "description": description.format(count=failures),
                    "affected_items": [
                        entry["name"]
                        for entry in results.get(details_key, [])
                        if entry["status"] == bad_status
                    ],
                })
        return issues

    def _generate_recommendations(self, issues: List[Dict]) -> List[Dict]:
        """Return one recommendation per issue whose category is known.

        Issues with an unrecognised category are skipped.
        """
        recommendations = []
        for issue in issues:
            advice = self._RECOMMENDATIONS.get(issue["category"])
            if advice is None:
                continue
            priority, title, actions = advice
            recommendations.append({
                "category": issue["category"],
                "priority": priority,
                "title": title,
                # Fresh list per call so callers can mutate it safely.
                "actions": list(actions),
            })
        return recommendations

    def _determine_health_status(self, issues: List[Dict]) -> str:
        """Map the issue list to "healthy", "critical", "warning" or "degraded".

        The worst severity wins: any "high" issue gives "critical", otherwise
        any "medium" issue gives "warning"; remaining issues give "degraded".
        """
        if not issues:
            return "healthy"

        severities = {issue["severity"] for issue in issues}
        if "high" in severities:
            return "critical"
        if "medium" in severities:
            return "warning"
        return "degraded"

    def generate_health_report(self, alert_manager: 'AlertManager',
                               diagnosis: Optional[Dict] = None) -> str:
        """Render a Markdown health report.

        Args:
            alert_manager: alert manager to diagnose when *diagnosis* is None.
            diagnosis: optional result of ``diagnose_alert_issues`` to render
                as-is, so the report matches a diagnosis obtained earlier
                instead of re-running the randomised datasource check.

        Returns:
            The report text.
        """
        if diagnosis is None:
            diagnosis = self.diagnose_alert_issues(alert_manager)

        results = diagnosis['test_results']
        rules = results['alert_rules']
        channels = results['notification_channels']
        datasources = results['datasources']

        sections = [f"""
# Grafana告警系统健康报告

**生成时间:** {diagnosis['timestamp']}
**整体状态:** {diagnosis['overall_health'].upper()}

## 系统概览

- **告警规则:** {rules['total_rules']} 总计
  - 活跃: {rules['active_rules']}
  - 错误: {rules['error_rules']}

- **通知渠道:** {channels['total_channels']} 总计
  - 正常: {channels['working_channels']}
  - 故障: {channels['failed_channels']}

- **数据源:** {datasources['total_datasources']} 总计
  - 连接: {datasources['connected_datasources']}
  - 失败: {datasources['failed_datasources']}

## 发现的问题

"""]

        if diagnosis['issues_found']:
            for issue in diagnosis['issues_found']:
                sections.append(
                    f"\n### {issue['title']} ({issue['severity'].upper()})\n\n"
                    f"{issue['description']}\n\n"
                    "**受影响的项目:**\n"
                )
                sections.extend(f"- {item}\n" for item in issue['affected_items'])
                sections.append("\n")
        else:
            sections.append("未发现问题。\n\n")

        sections.append("## 建议措施\n\n")

        if diagnosis['recommendations']:
            for rec in diagnosis['recommendations']:
                sections.append(
                    f"\n### {rec['title']} (优先级: {rec['priority'].upper()})\n\n"
                )
                sections.extend(f"- {action}\n" for action in rec['actions'])
                sections.append("\n")
        else:
            sections.append("无需采取措施。\n")

        return "".join(sections)

# Usage example: run the troubleshooter against the existing alert_manager.
troubleshooter = AlertTroubleshooter()

# Run the diagnostics once and print a short summary.
diagnosis = troubleshooter.diagnose_alert_issues(alert_manager=alert_manager)
print(f"系统健康状态: {diagnosis['overall_health']}")
print(f"发现问题数量: {len(diagnosis['issues_found'])}")

# Render the Markdown health report and print it under a banner line.
health_report = troubleshooter.generate_health_report(alert_manager=alert_manager)
print("\n=== 健康报告 ===", health_report, sep="\n")

总结

关键要点

  1. 告警规则设计

    • 合理设置阈值和评估间隔
    • 使用标签和注释提供上下文信息
    • 实现多级告警和依赖关系
  2. 通知渠道配置

    • 支持多种通知方式(邮件、Slack、企业微信等)
    • 配置消息模板和格式化
    • 实现通知测试和验证
  3. 告警策略管理

    • 创建路由规则和分组策略
    • 实现告警抑制和静默
    • 配置升级策略和时间窗口
  4. 故障排除

    • 定期检查告警系统健康状态
    • 监控通知渠道可用性
    • 分析和解决常见问题

最佳实践

  1. 规则管理

    • 使用版本控制管理告警规则
    • 定期审查和优化告警阈值
    • 避免告警风暴和噪音
  2. 通知优化

    • 根据严重级别选择通知方式
    • 实现智能分组和去重
    • 提供丰富的上下文信息
  3. 运维管理

    • 建立告警响应流程
    • 定期进行告警演练
    • 收集和分析告警指标

下一步学习

  1. 高级功能

    • 学习Grafana Unified Alerting
    • 探索机器学习异常检测
    • 集成外部告警系统
  2. 实践项目

    • 构建完整的监控告警体系
    • 实现自动化告警管理
    • 开发自定义通知插件
  3. 相关技术

    • Prometheus告警规则
    • Alertmanager配置
    • 监控最佳实践

通过本教程,你已经掌握了Grafana告警系统的核心概念和实践技能。继续探索和实践,构建可靠的监控告警体系!