## 概述
本章将详细介绍 Grafana 的告警系统，包括告警规则配置、通知渠道设置、告警策略管理和故障排除。通过学习本章内容，你将能够构建完整的监控告警体系。
## 学习目标
- 理解Grafana告警系统的架构和工作原理
- 掌握告警规则的创建和配置方法
- 学会配置各种通知渠道
- 了解告警策略和静默管理
- 掌握告警系统的监控和故障排除
## 告警系统架构
### 1. 核心组件
from enum import Enum
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
class AlertState(Enum):
    """States an alert rule can report.

    Each member's value is the string this module writes into exported
    rule definitions and into status / lifecycle payloads.
    """

    NORMAL = "Normal"
    PENDING = "Pending"
    ALERTING = "Alerting"
    # The two members below are the defaults AlertRule assigns to its
    # ``no_data_state`` and ``exec_err_state`` attributes.
    NO_DATA = "NoData"
    EXECUTION_ERROR = "ExecutionError"
class NotificationChannel(Enum):
    """Kinds of notification channel a channel definition can target.

    The member value is stored as the ``type`` field of a channel created
    through ``AlertManager.create_notification_channel``.
    """

    # Mail and chat tools
    EMAIL = "email"
    SLACK = "slack"
    # Generic HTTP callback and incident management
    WEBHOOK = "webhook"
    PAGERDUTY = "pagerduty"
    # Further chat / messaging integrations
    TEAMS = "teams"
    DISCORD = "discord"
    TELEGRAM = "telegram"
    WECHAT = "wechat"
    DINGTALK = "dingtalk"
class AlertSeverity(Enum):
    """Severity levels that can be attached to an alert rule.

    Members are declared from most to least severe; the value is the
    lower-case string used for the ``severity`` label.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
@dataclass
class AlertCondition:
    """A single query-plus-evaluator condition attached to an alert rule.

    Attributes:
        query: Query expression sent to the datasource.
        reducer: Reduction applied to the series; one of avg, min, max,
            sum, count, last, median, diff, diff_abs, count_non_null.
        evaluator_type: Comparison applied to the reduced value; one of
            gt, lt, within_range, outside_range, no_value.
        evaluator_params: Numeric operands for the evaluator.
        time_range: Query window written as a duration string such as "5m".
    """

    query: str
    reducer: str
    evaluator_type: str
    evaluator_params: List[float]
    time_range: str
class AlertRule:
    """Alert rule assembled through a fluent API and exportable as a dict.

    Every mutator returns ``self`` so configuration calls can be chained.
    """

    # Seconds represented by each supported duration suffix.
    _UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}

    def __init__(self, name: str, datasource_uid: str):
        """Create a rule with default evaluation settings.

        Args:
            name: Rule title; the ``uid`` is derived from it.
            datasource_uid: UID of the datasource the rule queries.
        """
        self.name = name
        self.datasource_uid = datasource_uid
        # UID = lower-cased name with spaces replaced by underscores.
        self.uid = f"alert_{name.lower().replace(' ', '_')}"
        self.conditions: List["AlertCondition"] = []
        self.frequency = "10s"
        self.handler = 1  # 1 for keep_state, 0 for alerting
        self.no_data_state = AlertState.NO_DATA
        self.exec_err_state = AlertState.EXECUTION_ERROR
        self.for_duration = "5m"
        self.annotations = {}
        self.labels = {}
        self.notification_uids: List[str] = []
        self.message = ""
        self.severity = AlertSeverity.MEDIUM

    def add_condition(self, condition: "AlertCondition") -> "AlertRule":
        """Append a condition; only the first one is used by ``to_dict``."""
        self.conditions.append(condition)
        return self

    def set_frequency(self, frequency: str) -> "AlertRule":
        """Set the evaluation interval as a duration string (e.g. "30s")."""
        self.frequency = frequency
        return self

    def set_for_duration(self, duration: str) -> "AlertRule":
        """Set the ``for`` duration string exported with the rule."""
        self.for_duration = duration
        return self

    def add_annotation(self, key: str, value: str) -> "AlertRule":
        """Add or overwrite one annotation."""
        self.annotations[key] = value
        return self

    def add_label(self, key: str, value: str) -> "AlertRule":
        """Add or overwrite one label."""
        self.labels[key] = value
        return self

    def set_severity(self, severity: "AlertSeverity") -> "AlertRule":
        """Set the rule severity."""
        self.severity = severity
        return self

    def add_notification(self, notification_uid: str) -> "AlertRule":
        """Attach a notification channel by UID."""
        self.notification_uids.append(notification_uid)
        return self

    def set_message(self, message: str) -> "AlertRule":
        """Set the alert message text."""
        self.message = message
        return self

    def to_dict(self) -> Dict:
        """Export the rule as a dictionary.

        The single exported query ("A") is built from the first condition:
        its ``query`` becomes the expression and its ``time_range`` becomes
        the relative query window. Without conditions the expression is
        empty and the window falls back to 600 seconds.

        Returns:
            The rule definition, including the datasource UID of the query.
        """
        first = self.conditions[0] if self.conditions else None
        expression = first.query if first else ""
        # Use the condition's own window instead of a fixed 600 seconds so
        # the export agrees with what the condition declares.
        window = self._parse_duration(first.time_range, default=600) if first else 600
        return {
            "uid": self.uid,
            "title": self.name,
            "condition": "A",
            "data": [
                {
                    "refId": "A",
                    "queryType": "",
                    "relativeTimeRange": {"from": window, "to": 0},
                    # Without this the exported query is not bound to any datasource.
                    "datasourceUid": self.datasource_uid,
                    "model": {
                        "expr": expression,
                        "interval": "",
                        "refId": "A",
                    },
                }
            ],
            "intervalSeconds": self._parse_duration(self.frequency),
            "maxDataPoints": 43200,
            "noDataState": self.no_data_state.value,
            "execErrState": self.exec_err_state.value,
            "for": self.for_duration,
            "annotations": self.annotations,
            "labels": self.labels,
            "message": self.message,
        }

    def _parse_duration(self, duration: str, default: int = 60) -> int:
        """Convert a duration string such as "30s", "5m" or "2h" to seconds.

        Supported suffixes are s, m, h, d and w. Anything that cannot be
        parsed (missing suffix, unknown unit such as "ms", non-integer or
        negative amount, non-string input) yields ``default`` instead of
        raising.

        Args:
            duration: Duration string to parse.
            default: Seconds returned for unparseable input.

        Returns:
            The duration in whole seconds.
        """
        if not isinstance(duration, str):
            return default
        text = duration.strip().lower()
        if len(text) < 2:
            return default
        multiplier = self._UNIT_SECONDS.get(text[-1])
        if multiplier is None:
            return default
        try:
            amount = int(text[:-1])
        except ValueError:
            # e.g. "500ms" leaves "500m" here, "1.5m" leaves "1.5"
            return default
        if amount < 0:
            return default
        return amount * multiplier
class AlertManager:
    """In-memory registry of alert rules, channels, groups and silences."""

    def __init__(self):
        """Start with empty registries."""
        self.rules: Dict[str, AlertRule] = {}
        self.notification_channels: Dict[str, Dict] = {}
        self.alert_groups: Dict[str, List[str]] = {}
        self.silences: List[Dict] = []

    def create_rule(self, name: str, datasource_uid: str) -> AlertRule:
        """Create a rule and register it under its derived UID.

        A rule whose derived UID already exists replaces the earlier one.
        """
        rule = AlertRule(name, datasource_uid)
        self.rules[rule.uid] = rule
        return rule

    def get_rule(self, uid: str) -> Optional[AlertRule]:
        """Return the rule registered under ``uid``, or ``None``."""
        return self.rules.get(uid)

    def delete_rule(self, uid: str) -> bool:
        """Remove a rule; return whether it existed."""
        if uid in self.rules:
            del self.rules[uid]
            return True
        return False

    def list_rules(self, folder: Optional[str] = None) -> List[AlertRule]:
        """List rules, optionally only those whose ``folder`` label matches."""
        rules = list(self.rules.values())
        if folder:
            # Simplified folder filter based on the "folder" label.
            rules = [rule for rule in rules if rule.labels.get('folder') == folder]
        return rules

    def create_notification_channel(self, name: str, channel_type: NotificationChannel, settings: Dict) -> str:
        """Register a notification channel and return its UID.

        The UID is ``notifier_`` plus the lower-cased name with spaces
        replaced by underscores.
        """
        uid = f"notifier_{name.lower().replace(' ', '_')}"
        self.notification_channels[uid] = {
            "uid": uid,
            "name": name,
            "type": channel_type.value,
            "settings": settings,
            "isDefault": False,
            "sendReminder": False,
            "disableResolveMessage": False,
            "frequency": "10s"
        }
        return uid

    def get_notification_channel(self, uid: str) -> Optional[Dict]:
        """Return the channel definition for ``uid``, or ``None``."""
        return self.notification_channels.get(uid)

    def create_alert_group(self, name: str, rule_uids: List[str]) -> str:
        """Store a named group of rule UIDs and return the group name."""
        self.alert_groups[name] = rule_uids
        return name

    def add_silence(self, matcher: Dict, starts_at: datetime, ends_at: datetime, comment: str) -> str:
        """Append a silence with a single matcher and return its id.

        Ids are sequential (``silence_<n>``) based on the current number of
        stored silences.
        """
        silence_id = f"silence_{len(self.silences) + 1}"
        silence = {
            "id": silence_id,
            "matchers": [matcher],
            "startsAt": starts_at.isoformat(),
            "endsAt": ends_at.isoformat(),
            "comment": comment,
            "createdBy": "admin",
            "status": {
                "state": "active"
            }
        }
        self.silences.append(silence)
        return silence_id

    def get_alert_status(self, rule_uid: str) -> Dict:
        """Return a simulated status payload for a rule.

        Returns ``{"error": "Rule not found"}`` for an unknown UID.
        """
        rule = self.get_rule(rule_uid)
        if not rule:
            return {"error": "Rule not found"}
        # Simulated status: state and evaluation duration are fixed values.
        return {
            "rule_uid": rule_uid,
            "rule_name": rule.name,
            "state": AlertState.NORMAL.value,
            "last_evaluation": datetime.now().isoformat(),
            "evaluation_duration": "150ms",
            "annotations": rule.annotations,
            "labels": rule.labels
        }

    def test_notification(self, channel_uid: str) -> Dict:
        """Return a simulated test result for a notification channel."""
        channel = self.get_notification_channel(channel_uid)
        if not channel:
            return {"success": False, "error": "Channel not found"}
        # Simulated result: nothing is actually sent.
        return {
            "success": True,
            "message": f"Test notification sent to {channel['name']}",
            "channel_type": channel['type'],
            "timestamp": datetime.now().isoformat()
        }

    def export_config(self) -> Dict:
        """Export rules, channels, groups and silences as one dictionary."""
        return {
            "rules": [rule.to_dict() for rule in self.rules.values()],
            "notification_channels": list(self.notification_channels.values()),
            "alert_groups": self.alert_groups,
            "silences": self.silences
        }

    def import_config(self, config: Dict) -> bool:
        """Merge a configuration produced by ``export_config``.

        The whole configuration is parsed first and applied only when every
        entry is well-formed, so a malformed entry leaves the manager
        unchanged instead of half-imported.

        Conditions are not reconstructed because the exported rule does not
        carry the reducer/evaluator of a condition.

        Args:
            config: Dictionary with optional ``rules``,
                ``notification_channels``, ``alert_groups`` and ``silences``.

        Returns:
            True on success; False (after printing the reason) when the
            configuration is malformed.
        """
        try:
            rules = [self._rule_from_dict(entry) for entry in config.get("rules", [])]
            channels = {
                entry["uid"]: entry
                for entry in config.get("notification_channels", [])
            }
            groups = dict(config.get("alert_groups", {}))
            silences = list(config.get("silences", []))
        except (AttributeError, KeyError, TypeError, ValueError) as e:
            print(f"Import failed: {e}")
            return False

        # Nothing below can fail, so the import is all-or-nothing.
        for rule in rules:
            self.rules[rule.uid] = rule
        self.notification_channels.update(channels)
        self.alert_groups.update(groups)
        self.silences.extend(silences)
        return True

    @staticmethod
    def _rule_from_dict(rule_data: Dict) -> AlertRule:
        """Rebuild an AlertRule from one exported rule dictionary.

        Restores title, UID, annotations, labels, message and — when present
        in the export — the datasource UID, evaluation interval and ``for``
        duration. Raises KeyError when ``title`` or ``uid`` is missing.
        """
        queries = rule_data.get("data") or [{}]
        # Older exports carry no datasource UID; keep the former "default".
        datasource_uid = queries[0].get("datasourceUid", "default")
        rule = AlertRule(rule_data["title"], datasource_uid)
        rule.uid = rule_data["uid"]
        rule.annotations = rule_data.get("annotations", {})
        rule.labels = rule_data.get("labels", {})
        rule.message = rule_data.get("message", "")
        if "intervalSeconds" in rule_data:
            rule.frequency = f"{int(rule_data['intervalSeconds'])}s"
        if "for" in rule_data:
            rule.for_duration = rule_data["for"]
        return rule
# Usage example
alert_manager = AlertManager()

# Create an alert rule; every setter returns the rule itself, so the
# configuration calls may be chained or issued one at a time.
rule = alert_manager.create_rule("High CPU Usage", "prometheus_uid")
rule.add_condition(
    AlertCondition(
        query="avg(cpu_usage_percent) by (instance)",
        reducer="avg",
        evaluator_type="gt",
        evaluator_params=[80.0],
        time_range="5m",
    )
)
rule.set_frequency("30s").set_for_duration("2m")
rule.add_annotation("description", "CPU usage is above 80%")
rule.add_label("severity", "warning")
rule.set_severity(AlertSeverity.HIGH)
print("告警规则创建完成:", rule.name)

# Create a notification channel
email_channel = alert_manager.create_notification_channel(
    name="Email Alerts",
    channel_type=NotificationChannel.EMAIL,
    settings={
        "addresses": "admin@example.com;ops@example.com",
        "subject": "Grafana Alert",
        "singleEmail": False,
    },
)
print("通知渠道创建完成:", email_channel)

# Attach the channel to the rule
rule.add_notification(email_channel)

# Query the (simulated) alert status
status = alert_manager.get_alert_status(rule.uid)
print("告警状态:", status["state"])
### 2. 告警工作流程
class AlertWorkflow:
    """Helpers describing and simulating the alert workflow.

    Covers escalation rules, a simulated alert lifecycle, a workflow
    diagram, per-channel notification templates and rule validation.
    """

    def __init__(self):
        """Set up the static step descriptions and an empty escalation list."""
        self.workflow_steps = {
            "evaluation": "评估告警条件",
            "state_change": "状态变更检测",
            "notification": "发送通知",
            "escalation": "告警升级",
            "resolution": "告警解决"
        }
        self.escalation_rules: List[Dict] = []

    def create_escalation_rule(self, name: str, conditions: Dict, actions: List[Dict]) -> Dict:
        """Create, store and return an enabled escalation rule."""
        rule = {
            "name": name,
            "conditions": conditions,
            "actions": actions,
            "enabled": True,
            "created_at": datetime.now().isoformat()
        }
        self.escalation_rules.append(rule)
        return rule

    def simulate_alert_lifecycle(self, alert_rule: AlertRule) -> List[Dict]:
        """Return the ordered events of one simulated alert lifecycle.

        All timestamps are offsets from a single start time so the sequence
        is internally consistent. The pending and alerting offsets are taken
        from the rule's own frequency and ``for`` duration rather than fixed
        values; the 10 second notification delay and the 8 minute time to
        resolution are illustrative constants.

        Args:
            alert_rule: Rule whose name, timings and channels are used.

        Returns:
            Event dictionaries with timestamp, event, rule, state, details.
        """
        started_at = datetime.now()
        pending_at = started_at + timedelta(
            seconds=alert_rule._parse_duration(alert_rule.frequency)
        )
        alerting_at = pending_at + timedelta(
            seconds=alert_rule._parse_duration(alert_rule.for_duration)
        )
        notified_at = alerting_at + timedelta(seconds=10)
        resolved_at = alerting_at + timedelta(minutes=8)

        def event(at: datetime, name: str, state: AlertState, details: str) -> Dict:
            return {
                "timestamp": at.isoformat(),
                "event": name,
                "rule": alert_rule.name,
                "state": state.value,
                "details": details,
            }

        # 1. Initial evaluation
        lifecycle_events = [
            event(started_at, "evaluation_started", AlertState.NORMAL, "开始评估告警条件"),
            # 2. Condition triggered
            event(pending_at, "condition_triggered", AlertState.PENDING,
                  "告警条件被触发,进入待定状态"),
            # 3. ``for`` duration satisfied
            event(alerting_at, "state_changed_to_alerting", AlertState.ALERTING,
                  f"持续时间{alert_rule.for_duration}满足,状态变更为告警"),
        ]
        # 4. One notification event per attached channel
        lifecycle_events.extend(
            event(notified_at, "notification_sent", AlertState.ALERTING,
                  f"通知已发送到 {notification_uid}")
            for notification_uid in alert_rule.notification_uids
        )
        # 5. Resolution
        lifecycle_events.append(
            event(resolved_at, "alert_resolved", AlertState.NORMAL,
                  "告警条件不再满足,告警已解决")
        )
        return lifecycle_events

    def generate_workflow_diagram(self) -> str:
        """Return a Markdown document with a Mermaid workflow diagram."""
        diagram = """
# 告警工作流程图

```mermaid
graph TD
    A[监控数据] --> B[评估告警条件]
    B --> C{条件满足?}
    C -->|是| D[进入Pending状态]
    C -->|否| E[保持Normal状态]
    D --> F{持续时间满足?}
    F -->|是| G[进入Alerting状态]
    F -->|否| D
    G --> H[发送通知]
    H --> I[检查升级规则]
    I --> J{需要升级?}
    J -->|是| K[执行升级动作]
    J -->|否| L[等待条件变化]
    K --> L
    L --> M{条件解决?}
    M -->|是| N[发送解决通知]
    M -->|否| L
    N --> E
    E --> B
```

## 工作流程说明

1. **数据收集**: 从配置的数据源收集监控数据
2. **条件评估**: 根据告警规则评估数据是否满足告警条件
3. **状态管理**: 管理告警状态转换(Normal → Pending → Alerting)
4. **通知发送**: 当状态变为Alerting时发送通知
5. **升级处理**: 根据升级规则执行相应动作
6. **解决处理**: 当条件不再满足时发送解决通知
"""
        return diagram

    def create_notification_template(self, channel_type: NotificationChannel) -> Dict:
        """Return the notification template for a channel type.

        Templates exist for EMAIL, SLACK and WEBHOOK; any other channel
        type yields an empty dictionary.
        """
        templates = {
            NotificationChannel.EMAIL: {
                "subject": "[{{ .Status | toUpper }}] {{ .GroupLabels.alertname }}",
                "body": """
告警详情:
- 规则名称: {{ .GroupLabels.alertname }}
- 严重级别: {{ .GroupLabels.severity }}
- 状态: {{ .Status }}
- 触发时间: {{ .StartsAt }}
- 描述: {{ .CommonAnnotations.description }}

查看详情: {{ .ExternalURL }}
"""
            },
            NotificationChannel.SLACK: {
                "channel": "#alerts",
                "username": "Grafana",
                "title": "{{ .Status | toUpper }}: {{ .GroupLabels.alertname }}",
                "text": "{{ .CommonAnnotations.description }}",
                "color": "{{ if eq .Status \"firing\" }}danger{{ else }}good{{ end }}"
            },
            NotificationChannel.WEBHOOK: {
                "url": "https://hooks.example.com/webhook",
                "method": "POST",
                "headers": {
                    "Content-Type": "application/json",
                    "Authorization": "Bearer YOUR_TOKEN"
                },
                "body": {
                    "alert_name": "{{ .GroupLabels.alertname }}",
                    "status": "{{ .Status }}",
                    "severity": "{{ .GroupLabels.severity }}",
                    "description": "{{ .CommonAnnotations.description }}",
                    "starts_at": "{{ .StartsAt }}",
                    "external_url": "{{ .ExternalURL }}"
                }
            }
        }
        return templates.get(channel_type, {})

    def validate_alert_rule(self, rule: AlertRule) -> Dict:
        """Check a rule and return errors, warnings and suggestions.

        Only errors (empty name, no conditions) make the rule invalid;
        warnings and suggestions are advisory.
        """
        validation_result = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "suggestions": []
        }
        # Basic configuration
        if not rule.name:
            validation_result["errors"].append("告警规则名称不能为空")
            validation_result["valid"] = False
        if not rule.conditions:
            validation_result["errors"].append("至少需要一个告警条件")
            validation_result["valid"] = False
        # Evaluation frequency
        frequency_seconds = rule._parse_duration(rule.frequency)
        if frequency_seconds < 10:
            validation_result["warnings"].append("评估频率过高可能影响性能")
        # ``for`` duration relative to the evaluation frequency
        for_seconds = rule._parse_duration(rule.for_duration)
        if for_seconds < frequency_seconds:
            validation_result["warnings"].append("持续时间应该大于评估频率")
        # Notification configuration
        if not rule.notification_uids:
            validation_result["warnings"].append("未配置通知渠道")
        # Recommended label and annotation
        if not rule.labels.get("severity"):
            validation_result["suggestions"].append("建议添加severity标签")
        if not rule.annotations.get("description"):
            validation_result["suggestions"].append("建议添加description注释")
        return validation_result
# Usage example
workflow = AlertWorkflow()

# Create an escalation rule
escalation_rule = workflow.create_escalation_rule(
    "Critical Alert Escalation",
    {
        "severity": "critical",
        "duration": "15m",
        "no_response": True
    },
    [
        {"type": "notify_manager", "target": "manager@example.com"},
        {"type": "create_incident", "system": "pagerduty"}
    ]
)
print("升级规则创建完成:", escalation_rule["name"])

# Simulate the alert lifecycle
lifecycle = workflow.simulate_alert_lifecycle(rule)
print("告警生命周期事件数:", len(lifecycle))

# Validate the alert rule
validation = workflow.validate_alert_rule(rule)
print("规则验证结果:", "通过" if validation["valid"] else "失败")

# Build a notification template
email_template = workflow.create_notification_template(NotificationChannel.EMAIL)
print("邮件模板主题:", email_template["subject"])
## 告警规则配置
### 1. 基础告警规则
```python
class AlertRuleBuilder:
    """Factory for common alert rules: templates, thresholds, anomalies."""

    def __init__(self):
        """Define the built-in rule templates keyed by template name."""
        self.rule_templates = {
            "cpu_usage": {
                "name": "High CPU Usage",
                "query": "avg(100 - (avg by (instance) (irate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100))",
                "threshold": 80,
                "operator": "gt",
                "duration": "5m",
                "severity": AlertSeverity.HIGH
            },
            "memory_usage": {
                "name": "High Memory Usage",
                "query": "(1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100",
                "threshold": 85,
                "operator": "gt",
                "duration": "3m",
                "severity": AlertSeverity.HIGH
            },
            "disk_usage": {
                "name": "High Disk Usage",
                "query": "(1 - (node_filesystem_avail_bytes / node_filesystem_size_bytes)) * 100",
                "threshold": 90,
                "operator": "gt",
                "duration": "1m",
                "severity": AlertSeverity.CRITICAL
            },
            "service_down": {
                "name": "Service Down",
                "query": "up",
                "threshold": 1,
                "operator": "lt",
                "duration": "1m",
                "severity": AlertSeverity.CRITICAL
            },
            "response_time": {
                "name": "High Response Time",
                "query": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))",
                "threshold": 2,
                "operator": "gt",
                "duration": "2m",
                "severity": AlertSeverity.MEDIUM
            },
            "error_rate": {
                "name": "High Error Rate",
                "query": "rate(http_requests_total{status=~\"5..\"}[5m]) / rate(http_requests_total[5m]) * 100",
                "threshold": 5,
                "operator": "gt",
                "duration": "3m",
                "severity": AlertSeverity.HIGH
            }
        }

    def create_from_template(self, template_name: str, alert_manager: "AlertManager",
                             datasource_uid: str,
                             custom_params: Optional[Dict] = None) -> "AlertRule":
        """Create and register a rule from a built-in template.

        Args:
            template_name: Key of the template in ``rule_templates``.
            alert_manager: Manager that creates and stores the rule.
            datasource_uid: Datasource the rule queries.
            custom_params: Optional overrides for template fields. A
                ``severity`` override may be an AlertSeverity or its
                string value (e.g. "critical").

        Returns:
            The configured rule.

        Raises:
            ValueError: Unknown template name or unknown severity value.
        """
        if template_name not in self.rule_templates:
            raise ValueError(f"Unknown template: {template_name}")
        template = dict(self.rule_templates[template_name])
        if custom_params:
            template.update(custom_params)

        # Resolve the severity before the rule is registered so that a bad
        # override cannot leave a half-configured rule in the manager.
        severity = template["severity"]
        if not isinstance(severity, AlertSeverity):
            severity = AlertSeverity(severity)

        rule = alert_manager.create_rule(template["name"], datasource_uid)
        rule.add_condition(AlertCondition(
            query=template["query"],
            reducer="last",
            evaluator_type=template["operator"],
            evaluator_params=[template["threshold"]],
            time_range="5m"
        ))
        rule.set_for_duration(template["duration"])
        rule.set_severity(severity)
        # Default annotation and labels
        rule.add_annotation("description", f"{template['name']} - Threshold: {template['threshold']}")
        rule.add_label("severity", severity.value)
        rule.add_label("template", template_name)
        return rule

    def create_composite_rule(self, name: str, alert_manager: "AlertManager",
                              datasource_uid: str, conditions: List[Dict]) -> "AlertRule":
        """Create a rule carrying one condition per entry of ``conditions``.

        Each entry needs ``query`` and ``threshold``; ``reducer``,
        ``operator`` and ``time_range`` default to "last", "gt" and "5m".
        """
        rule = alert_manager.create_rule(name, datasource_uid)
        for condition_config in conditions:
            rule.add_condition(AlertCondition(
                query=condition_config["query"],
                reducer=condition_config.get("reducer", "last"),
                evaluator_type=condition_config.get("operator", "gt"),
                evaluator_params=[condition_config["threshold"]],
                time_range=condition_config.get("time_range", "5m")
            ))
        return rule

    def create_anomaly_detection_rule(self, name: str, alert_manager: "AlertManager",
                                      datasource_uid: str, metric: str,
                                      sensitivity: float = 2.0) -> "AlertRule":
        """Create a rule that fires when a metric deviates from its 1h mean.

        The query compares (metric - 1h average) / 1h standard deviation
        against ``sensitivity``; only deviations above the mean match.
        """
        query = (
            "\n(\n"
            f"{metric} -\n"
            f"avg_over_time({metric}[1h])\n"
            f") / stddev_over_time({metric}[1h]) > {sensitivity}\n"
        )
        rule = alert_manager.create_rule(name, datasource_uid)
        rule.add_condition(AlertCondition(
            query=query,
            reducer="last",
            evaluator_type="gt",
            evaluator_params=[0],
            time_range="5m"
        ))
        rule.set_for_duration("2m")
        rule.add_annotation("description", f"Anomaly detected in {metric}")
        rule.add_annotation("sensitivity", str(sensitivity))
        rule.add_label("type", "anomaly_detection")
        return rule

    def create_threshold_rule(self, name: str, alert_manager: "AlertManager",
                              datasource_uid: str, query: str, threshold: float,
                              operator: str = "gt", duration: str = "5m") -> "AlertRule":
        """Create a rule comparing the last query value with a threshold."""
        rule = alert_manager.create_rule(name, datasource_uid)
        rule.add_condition(AlertCondition(
            query=query,
            reducer="last",
            evaluator_type=operator,
            evaluator_params=[threshold],
            time_range="5m"
        ))
        rule.set_for_duration(duration)
        rule.add_annotation("description", f"Threshold alert: {query} {operator} {threshold}")
        rule.add_label("type", "threshold")
        return rule

    def create_rate_of_change_rule(self, name: str, alert_manager: "AlertManager",
                                   datasource_uid: str, metric: str,
                                   change_threshold: float, time_window: str = "5m") -> "AlertRule":
        """Create a rule on ``rate(metric[time_window])`` exceeding a threshold."""
        query = f"rate({metric}[{time_window}])"
        rule = alert_manager.create_rule(name, datasource_uid)
        rule.add_condition(AlertCondition(
            query=query,
            reducer="last",
            evaluator_type="gt",
            evaluator_params=[change_threshold],
            time_range=time_window
        ))
        rule.set_for_duration("2m")
        rule.add_annotation("description", f"Rate of change alert for {metric}")
        rule.add_label("type", "rate_of_change")
        return rule

    def get_template_list(self) -> List[Dict]:
        """Return a summary (name, description, severity, threshold, duration) per template."""
        return [
            {
                "name": name,
                "description": template["name"],
                "severity": template["severity"].value,
                "threshold": template["threshold"],
                "duration": template["duration"]
            }
            for name, template in self.rule_templates.items()
        ]

    def validate_query(self, query: str, datasource_type: str = "prometheus") -> Dict:
        """Run lightweight syntax and style checks on a query.

        Checks are applied only for the "prometheus" datasource type; other
        types return an empty, valid result. Function and clause detection
        matches whole identifiers followed by ``(`` (or ``by``/``without``
        for aggregations), so metric names such as ``..._bytes`` or
        ``..._sum`` are not mistaken for ``by`` or ``sum``.

        Args:
            query: Query text to inspect.
            datasource_type: Datasource flavour of the query.

        Returns:
            Dict with ``valid``, ``errors``, ``warnings``, ``suggestions``.
        """
        import re  # local import: the module header does not import ``re``

        validation_result = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "suggestions": []
        }
        if datasource_type != "prometheus":
            return validation_result

        # Basic syntax
        if not query.strip():
            validation_result["errors"].append("查询不能为空")
            validation_result["valid"] = False
        if query.count('(') != query.count(')'):
            validation_result["errors"].append("括号不匹配")
            validation_result["valid"] = False

        # Functions actually invoked, e.g. "rate(" or "sum by (...)".
        call_pattern = re.compile(
            r"\b(rate|irate|increase|sum|avg|max|min)\s*(?:\(|(?:by|without)\b)"
        )
        used_functions = set(call_pattern.findall(query))

        if used_functions & {"rate", "irate"} and '[' not in query:
            validation_result["warnings"].append("rate函数通常需要时间范围参数")
        if len(used_functions) > 3:
            validation_result["warnings"].append("查询可能过于复杂")

        # Performance hint: aggregation without a grouping clause.
        has_grouping = re.search(r"\b(?:by|without)\s*\(", query) is not None
        if used_functions & {"sum", "avg"} and not has_grouping:
            validation_result["suggestions"].append("考虑使用by或without子句优化聚合查询")
        return validation_result
# Usage example
builder = AlertRuleBuilder()

# Create a rule from a template, overriding threshold and duration
cpu_rule = builder.create_from_template(
    template_name="cpu_usage",
    alert_manager=alert_manager,
    datasource_uid="prometheus_uid",
    custom_params={"threshold": 75, "duration": "3m"},
)
print("CPU告警规则创建完成:", cpu_rule.name)

# Create an anomaly-detection rule
anomaly_rule = builder.create_anomaly_detection_rule(
    name="Response Time Anomaly",
    alert_manager=alert_manager,
    datasource_uid="prometheus_uid",
    metric="http_request_duration_seconds",
    sensitivity=2.5,
)
print("异常检测规则创建完成:", anomaly_rule.name)

# Validate a query
validation = builder.validate_query(query="rate(http_requests_total[5m])")
print("查询验证结果:", "通过" if validation["valid"] else "失败")

# List the available templates
templates = builder.get_template_list()
print("可用模板数量:", len(templates))
### 2. 高级告警配置
class AdvancedAlertConfig:
    """Policies, maintenance windows, dependencies and composite rules."""

    def __init__(self):
        """Start with no policies, maintenance windows or dependencies."""
        self.alert_policies: Dict[str, Dict] = {}
        self.maintenance_windows: List[Dict] = []
        self.alert_dependencies: Dict[str, List[str]] = {}

    def create_alert_policy(self, name: str, rules: List[str],
                            notification_policy: Dict) -> str:
        """Store an enabled alert policy and return its id.

        The id is ``policy_`` plus the lower-cased name with spaces
        replaced by underscores.
        """
        policy_id = f"policy_{name.lower().replace(' ', '_')}"
        self.alert_policies[policy_id] = {
            "name": name,
            "rules": rules,
            "notification_policy": notification_policy,
            "enabled": True,
            "created_at": datetime.now().isoformat()
        }
        return policy_id

    def create_notification_policy(self, severity_routing: Dict,
                                   time_based_routing: Optional[Dict] = None) -> Dict:
        """Build a notification policy with fixed escalation/grouping/inhibition.

        Args:
            severity_routing: Mapping of severity to channel names.
            time_based_routing: Optional mapping of time slot to channel
                names; an empty dict is used when omitted.
        """
        policy = {
            "severity_routing": severity_routing,
            "time_based_routing": time_based_routing or {},
            "escalation": {
                "enabled": True,
                "levels": [
                    {"delay": "5m", "channels": ["primary"]},
                    {"delay": "15m", "channels": ["secondary"]},
                    {"delay": "30m", "channels": ["manager"]}
                ]
            },
            "grouping": {
                "enabled": True,
                "by": ["alertname", "instance"],
                "wait": "10s",
                "interval": "5m"
            },
            "inhibition": {
                "enabled": True,
                "rules": [
                    {
                        "source_match": {"severity": "critical"},
                        "target_match": {"severity": "warning"},
                        "equal": ["instance"]
                    }
                ]
            }
        }
        return policy

    def create_maintenance_window(self, name: str, start_time: datetime,
                                  end_time: datetime, affected_services: List[str],
                                  alert_actions: str = "silence") -> str:
        """Record a scheduled maintenance window and return its id.

        ``alert_actions`` is one of silence, reduce_severity, custom.
        """
        window_id = f"maint_{len(self.maintenance_windows) + 1}"
        window = {
            "id": window_id,
            "name": name,
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "affected_services": affected_services,
            "alert_actions": alert_actions,  # silence, reduce_severity, custom
            "created_by": "admin",
            "status": "scheduled"
        }
        self.maintenance_windows.append(window)
        return window_id

    def set_alert_dependency(self, dependent_alert: str, dependencies: List[str]):
        """Record (or replace) the alerts that ``dependent_alert`` depends on."""
        self.alert_dependencies[dependent_alert] = dependencies

    def create_conditional_alert(self, name: str, alert_manager: AlertManager,
                                 datasource_uid: str, primary_condition: Dict,
                                 secondary_conditions: List[Dict],
                                 logic_operator: str = "AND") -> AlertRule:
        """Create a rule whose query combines several condition queries.

        With "AND" / "OR" every query is parenthesised and joined with the
        matching operator; an empty ``secondary_conditions`` therefore
        yields just the parenthesised primary query rather than a query
        ending in a dangling operator. Any other ``logic_operator`` uses
        the primary query unchanged.

        The evaluator operator and threshold come from
        ``primary_condition`` (defaults "gt" and 0).
        """
        joiners = {"AND": " and ", "OR": " or "}
        joiner = joiners.get(logic_operator)
        if joiner is None:
            combined_query = primary_condition['query']
        else:
            all_conditions = [primary_condition, *secondary_conditions]
            combined_query = joiner.join(f"({cond['query']})" for cond in all_conditions)

        rule = alert_manager.create_rule(name, datasource_uid)
        rule.add_condition(AlertCondition(
            query=combined_query,
            reducer="last",
            evaluator_type=primary_condition.get("operator", "gt"),
            evaluator_params=[primary_condition.get("threshold", 0)],
            time_range="5m"
        ))
        rule.add_annotation("logic_operator", logic_operator)
        rule.add_label("type", "conditional")
        return rule

    def create_multi_metric_alert(self, name: str, alert_manager: AlertManager,
                                  datasource_uid: str, metrics: List[Dict],
                                  correlation_threshold: float = 0.8) -> AlertRule:
        """Create a rule that requires every metric to exceed its threshold.

        Each entry of ``metrics`` needs ``query`` and ``threshold``. The
        per-metric comparisons are joined with ``and``, so a single metric
        produces a valid one-term query. ``correlation_threshold`` is only
        recorded as an annotation.

        Raises:
            ValueError: ``metrics`` is empty. The check happens before the
                rule is registered, so no empty rule is left behind.
        """
        if not metrics:
            raise ValueError("metrics must contain at least one entry")
        # Simplified correlation: all comparisons must hold at once.
        correlation_query = " and ".join(
            f"({m['query']} > {m['threshold']})" for m in metrics
        )
        rule = alert_manager.create_rule(name, datasource_uid)
        rule.add_condition(AlertCondition(
            query=correlation_query,
            reducer="last",
            evaluator_type="gt",
            evaluator_params=[0],
            time_range="5m"
        ))
        rule.add_annotation("correlation_threshold", str(correlation_threshold))
        rule.add_annotation("metrics_count", str(len(metrics)))
        rule.add_label("type", "multi_metric")
        return rule

    def create_time_based_alert(self, name: str, alert_manager: AlertManager,
                                datasource_uid: str, query: str,
                                time_conditions: Dict) -> AlertRule:
        """Create a rule restricted to business hours and/or weekdays.

        Recognised keys of ``time_conditions``: ``business_hours_only``,
        ``weekdays_only`` and ``threshold`` (default 0).
        """
        rule = alert_manager.create_rule(name, datasource_uid)
        # NOTE(review): PromQL hour()/day_of_week() work in UTC — confirm the
        # 9-17 window matches the intended local business hours.
        time_query = query
        if time_conditions.get("business_hours_only"):
            time_query = f"({query}) and on() (hour() >= 9 and hour() <= 17)"
        if time_conditions.get("weekdays_only"):
            time_query = f"({time_query}) and on() (day_of_week() >= 1 and day_of_week() <= 5)"
        rule.add_condition(AlertCondition(
            query=time_query,
            reducer="last",
            evaluator_type="gt",
            evaluator_params=[time_conditions.get("threshold", 0)],
            time_range="5m"
        ))
        rule.add_annotation("time_conditions", str(time_conditions))
        rule.add_label("type", "time_based")
        return rule

    def generate_alert_summary_report(self, alert_manager: AlertManager) -> str:
        """Build a Markdown summary of rules, policies and maintenance windows.

        Rules are counted by their ``severity`` label (default "unknown")
        and ``type`` label (default "basic"). Maintenance windows whose
        interval contains the current time are listed as active.
        """
        rules = alert_manager.list_rules()
        total_rules = len(rules)

        severity_counts: Dict[str, int] = {}
        type_counts: Dict[str, int] = {}
        for rule in rules:
            severity = rule.labels.get("severity", "unknown")
            severity_counts[severity] = severity_counts.get(severity, 0) + 1
            rule_type = rule.labels.get("type", "basic")
            type_counts[rule_type] = type_counts.get(rule_type, 0) + 1

        def share(count: int) -> float:
            # Guard against division by zero when there are no rules.
            return (count / total_rules * 100) if total_rules > 0 else 0

        report = (
            "\n# 告警配置摘要报告\n"
            "## 基本统计\n"
            f"- 总告警规则数: {total_rules}\n"
            f"- 告警策略数: {len(self.alert_policies)}\n"
            f"- 维护窗口数: {len(self.maintenance_windows)}\n"
            f"- 依赖关系数: {len(self.alert_dependencies)}\n"
            "## 严重级别分布\n"
        )
        for severity, count in severity_counts.items():
            report += f"- {severity}: {count} ({share(count):.1f}%)\n"
        report += "\n## 规则类型分布\n"
        for rule_type, count in type_counts.items():
            report += f"- {rule_type}: {count} ({share(count):.1f}%)\n"

        # Currently active maintenance windows
        now = datetime.now()
        active_windows = [
            w for w in self.maintenance_windows
            if datetime.fromisoformat(w["start_time"]) <= now <= datetime.fromisoformat(w["end_time"])
        ]
        if active_windows:
            report += "\n## 当前活跃维护窗口\n"
            for window in active_windows:
                report += f"- {window['name']}: {window['start_time']} - {window['end_time']}\n"
        return report
# Usage example
advanced_config = AdvancedAlertConfig()

# Create a notification policy
notification_policy = advanced_config.create_notification_policy(
    severity_routing={
        "critical": ["pagerduty", "email", "slack"],
        "high": ["email", "slack"],
        "medium": ["slack"],
        "low": ["email"],
    },
    time_based_routing={
        "business_hours": ["slack", "email"],
        "after_hours": ["pagerduty"],
    },
)
print("通知策略创建完成")

# Create an alert policy covering the two rules built earlier
policy_id = advanced_config.create_alert_policy(
    name="Production Monitoring",
    rules=[cpu_rule.uid, anomaly_rule.uid],
    notification_policy=notification_policy,
)
print("告警策略创建完成:", policy_id)

# Create a two-hour maintenance window starting one day from now
maintenance_id = advanced_config.create_maintenance_window(
    name="Database Maintenance",
    start_time=datetime.now() + timedelta(days=1),
    end_time=datetime.now() + timedelta(days=1, hours=2),
    affected_services=["database", "api"],
    alert_actions="silence",
)
print("维护窗口创建完成:", maintenance_id)

# Generate the summary report
summary_report = advanced_config.generate_alert_summary_report(alert_manager)
print("摘要报告已生成")
## 通知渠道配置
### 1. 邮件通知配置
class EmailNotificationConfig:
    """Email notification setup: SMTP settings, channels and templates."""

    def __init__(self):
        """Initialise placeholder SMTP settings and an empty template store."""
        # NOTE(review): the password below is a placeholder value; real
        # credentials should not be kept in source.
        self.smtp_settings = dict(
            host="smtp.example.com",
            port=587,
            username="alerts@example.com",
            password="your_password",
            from_address="alerts@example.com",
            from_name="Grafana Alerts",
            skip_verify=False,
            startTLS_policy="MandatoryStartTLS",
        )
        self.email_templates = {}

    def create_email_channel(self, alert_manager: AlertManager, name: str,
                             addresses: List[str], subject_template: str = None) -> str:
        """Register an email channel on ``alert_manager`` and return its UID.

        Args:
            alert_manager: Manager that stores the channel.
            name: Channel name.
            addresses: Recipients; joined with ";" in the channel settings.
            subject_template: Subject template; a status/alert-name default
                is used when omitted or empty.
        """
        if subject_template:
            subject = subject_template
        else:
            subject = "[{{ .Status | toUpper }}] {{ .GroupLabels.alertname }}"
        recipients = ";".join(addresses)
        channel_settings = {
            "addresses": recipients,
            "subject": subject,
            "body": self._get_default_email_template(),
            "singleEmail": False,
        }
        return alert_manager.create_notification_channel(
            name, NotificationChannel.EMAIL, channel_settings
        )

    def _get_default_email_template(self) -> str:
        """Return the default HTML body template for alert emails."""
        html_template = """
<!DOCTYPE html>
<html>
<head>
<style>
body { font-family: Arial, sans-serif; }
.alert-header { background-color: {{ if eq .Status "firing" }}#d32f2f{{ else }}#388e3c{{ end }}; color: white; padding: 10px; }
.alert-content { padding: 20px; }
.alert-details { background-color: #f5f5f5; padding: 10px; margin: 10px 0; }
.label { font-weight: bold; }
</style>
</head>
<body>
<div class="alert-header">
<h2>{{ .Status | toUpper }}: {{ .GroupLabels.alertname }}</h2>
</div>
<div class="alert-content">
<p><span class="label">状态:</span> {{ .Status }}</p>
<p><span class="label">严重级别:</span> {{ .GroupLabels.severity }}</p>
<p><span class="label">触发时间:</span> {{ .StartsAt }}</p>
{{ if .EndsAt }}
<p><span class="label">结束时间:</span> {{ .EndsAt }}</p>
{{ end }}
<div class="alert-details">
<h3>告警详情</h3>
<p>{{ .CommonAnnotations.description }}</p>
{{ if .CommonAnnotations.summary }}
<p><span class="label">摘要:</span> {{ .CommonAnnotations.summary }}</p>
{{ end }}
</div>
<div class="alert-details">
<h3>标签信息</h3>
{{ range .GroupLabels.SortedPairs }}
<p><span class="label">{{ .Name }}:</span> {{ .Value }}</p>
{{ end }}
</div>
<p><a href="{{ .ExternalURL }}">查看Grafana仪表板</a></p>
</div>
</body>
</html>
"""
        return html_template

    def create_custom_template(self, name: str, subject: str, body: str) -> Dict:
        """Store a named custom template and return it."""
        created = datetime.now().isoformat()
        self.email_templates[name] = {
            "name": name,
            "subject": subject,
            "body": body,
            "created_at": created,
        }
        return self.email_templates[name]

    def get_smtp_config(self) -> Dict:
        """Return the SMTP configuration derived from ``smtp_settings``.

        Certificate, key and EHLO identity fields are always empty strings.
        """
        smtp = self.smtp_settings
        config = {"enabled": True}
        config["host"] = smtp["host"]
        config["port"] = smtp["port"]
        # The settings key "username" is exposed as "user".
        config["user"] = smtp["username"]
        config["password"] = smtp["password"]
        config["cert_file"] = ""
        config["key_file"] = ""
        config["skip_verify"] = smtp["skip_verify"]
        config["from_address"] = smtp["from_address"]
        config["from_name"] = smtp["from_name"]
        config["ehlo_identity"] = ""
        config["startTLS_policy"] = smtp["startTLS_policy"]
        return config
# Usage example
email_config = EmailNotificationConfig()

# Create an email notification channel for the operations team
email_channel = email_config.create_email_channel(
    alert_manager=alert_manager,
    name="Operations Team",
    addresses=["ops@example.com", "admin@example.com"],
    subject_template="[ALERT] {{ .GroupLabels.alertname }} - {{ .GroupLabels.severity }}",
)
print("邮件通知渠道创建完成:", email_channel)
### 2. Slack通知配置
class SlackNotificationConfig:
    """Slack notification setup: webhook channels, bot channels, templates."""

    def __init__(self):
        """Start with empty webhook-URL and bot-token stores."""
        self.webhook_urls = {}
        self.bot_tokens = {}

    def create_slack_webhook_channel(self, alert_manager: AlertManager, name: str,
                                     webhook_url: str, channel: str = "#alerts",
                                     username: str = "Grafana") -> str:
        """Register a webhook-based Slack channel and return its UID."""
        title = "{{ .Status | toUpper }}: {{ .GroupLabels.alertname }}"
        color = '{{ if eq .Status "firing" }}danger{{ else }}good{{ end }}'
        webhook_settings = {
            "url": webhook_url,
            "channel": channel,
            "username": username,
            "title": title,
            "text": self._get_slack_message_template(),
            "color": color,
            "iconEmoji": ":exclamation:",
            "iconUrl": "",
            "linkNames": False,
            "mentionChannel": "here",
            "mentionUsers": "",
            "mentionGroups": "",
        }
        return alert_manager.create_notification_channel(
            name, NotificationChannel.SLACK, webhook_settings
        )

    def create_slack_bot_channel(self, alert_manager: AlertManager, name: str,
                                 bot_token: str, channel: str = "#alerts") -> str:
        """Register a bot-token-based Slack channel and return its UID."""
        bot_settings = {
            "token": bot_token,
            "channel": channel,
            "title": "{{ .Status | toUpper }}: {{ .GroupLabels.alertname }}",
            "text": self._get_slack_message_template(),
        }
        return alert_manager.create_notification_channel(
            name, NotificationChannel.SLACK, bot_settings
        )

    def _get_slack_message_template(self) -> str:
        """Return the message text template shared by both channel kinds."""
        message_template = """
*告警状态:* {{ .Status | toUpper }}
*告警名称:* {{ .GroupLabels.alertname }}
*严重级别:* {{ .GroupLabels.severity }}
*触发时间:* {{ .StartsAt }}
{{ if .EndsAt }}*结束时间:* {{ .EndsAt }}{{ end }}
*描述:* {{ .CommonAnnotations.description }}
{{ if .CommonAnnotations.summary }}*摘要:* {{ .CommonAnnotations.summary }}{{ end }}
*标签:*
{{ range .GroupLabels.SortedPairs }}
• {{ .Name }}: {{ .Value }}
{{ end }}
<{{ .ExternalURL }}|查看详情>
"""
        return message_template

    def create_slack_blocks_template(self) -> List[Dict]:
        """Return a blocks layout: header, field grid, description, actions."""

        def plain(text: str) -> Dict:
            return {"type": "plain_text", "text": text}

        def mrkdwn(text: str) -> Dict:
            return {"type": "mrkdwn", "text": text}

        def button(label: str, url: str, style: str) -> Dict:
            return {"type": "button", "text": plain(label), "url": url, "style": style}

        header_block = {
            "type": "header",
            "text": plain("{{ .Status | toUpper }}: {{ .GroupLabels.alertname }}"),
        }
        fields_block = {
            "type": "section",
            "fields": [
                mrkdwn("*状态:*\n{{ .Status }}"),
                mrkdwn("*严重级别:*\n{{ .GroupLabels.severity }}"),
                mrkdwn("*触发时间:*\n{{ .StartsAt }}"),
                mrkdwn("*实例:*\n{{ .GroupLabels.instance }}"),
            ],
        }
        description_block = {
            "type": "section",
            "text": mrkdwn("*描述:*\n{{ .CommonAnnotations.description }}"),
        }
        actions_block = {
            "type": "actions",
            "elements": [
                button("查看仪表板", "{{ .ExternalURL }}", "primary"),
                button("静默告警", "{{ .ExternalURL }}/alerting/silences", "danger"),
            ],
        }
        return [header_block, fields_block, description_block, actions_block]
# Usage example
slack_config = SlackNotificationConfig()

# Create a Slack webhook notification channel
slack_channel = slack_config.create_slack_webhook_channel(
    alert_manager=alert_manager,
    name="Slack Alerts",
    webhook_url="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
    channel="#monitoring",
    username="Grafana Bot",
)
print("Slack通知渠道创建完成:", slack_channel)
### 3. 企业微信和钉钉配置
class EnterpriseNotificationConfig:
    """Notification-channel builders for WeCom (enterprise WeChat), DingTalk and Teams.

    Every ``create_*`` method assembles a settings dict and passes it to
    ``alert_manager.create_notification_channel``; the ``_get_*_template``
    methods supply the ``{{ ... }}`` placeholder text used as message body.
    """

    def __init__(self):
        # Both dicts start empty; no method of this class writes to them.
        self.wechat_settings = {}
        self.dingtalk_settings = {}

    def create_wechat_channel(self, alert_manager: AlertManager, name: str,
                              corp_id: str, agent_id: str, secret: str,
                              to_user: str = "@all") -> str:
        """Register a WeCom channel and return the manager's result.

        Args:
            alert_manager: Object exposing ``create_notification_channel``.
            name: Display name of the channel.
            corp_id: Stored under the ``corp_id`` setting.
            agent_id: Stored under the ``agent_id`` setting.
            secret: Stored under the ``secret`` setting.
            to_user: Recipient selector; ``to_party`` and ``to_tag`` are
                always sent as empty strings.

        Returns:
            Whatever ``alert_manager.create_notification_channel`` returns.
        """
        credentials = {"corp_id": corp_id, "agent_id": agent_id, "secret": secret}
        recipients = {"to_user": to_user, "to_party": "", "to_tag": ""}
        channel_settings = {
            **credentials,
            **recipients,
            "message": self._get_wechat_message_template(),
            "api_url": "https://qyapi.weixin.qq.com/cgi-bin/",
        }
        return alert_manager.create_notification_channel(
            name, NotificationChannel.WECHAT, channel_settings
        )

    def create_dingtalk_channel(self, alert_manager: AlertManager, name: str,
                                webhook_url: str, secret: str = None) -> str:
        """Register a DingTalk channel and return the manager's result.

        Args:
            alert_manager: Object exposing ``create_notification_channel``.
            name: Display name of the channel.
            webhook_url: Stored under the ``url`` setting.
            secret: Optional secret; a falsy value is stored as ``""``.

        Returns:
            Whatever ``alert_manager.create_notification_channel`` returns.
        """
        signing_secret = secret if secret else ""
        channel_settings = dict(
            url=webhook_url,
            secret=signing_secret,
            message=self._get_dingtalk_message_template(),
            msgtype="markdown",
            title="Grafana告警通知",
        )
        return alert_manager.create_notification_channel(
            name, NotificationChannel.DINGTALK, channel_settings
        )

    def _get_wechat_message_template(self) -> str:
        """Return the plain-text WeCom message template (leading and trailing newline included)."""
        body_lines = [
            "【{{ .Status | toUpper }}】{{ .GroupLabels.alertname }}",
            "告警状态: {{ .Status }}",
            "严重级别: {{ .GroupLabels.severity }}",
            "触发时间: {{ .StartsAt }}",
            "{{ if .EndsAt }}结束时间: {{ .EndsAt }}{{ end }}",
            "告警描述:",
            "{{ .CommonAnnotations.description }}",
            "标签信息:",
            "{{ range .GroupLabels.SortedPairs }}",
            "{{ .Name }}: {{ .Value }}",
            "{{ end }}",
            "查看详情: {{ .ExternalURL }}",
        ]
        return "\n" + "\n".join(body_lines) + "\n"

    def _get_dingtalk_message_template(self) -> str:
        """Return the markdown DingTalk message template (leading and trailing newline included)."""
        body_lines = [
            "## {{ .Status | toUpper }}: {{ .GroupLabels.alertname }}",
            "**告警状态:** {{ .Status }}",
            "**严重级别:** {{ .GroupLabels.severity }}",
            "**触发时间:** {{ .StartsAt }}",
            "{{ if .EndsAt }}**结束时间:** {{ .EndsAt }}{{ end }}",
            "**告警描述:**",
            "{{ .CommonAnnotations.description }}",
            "**标签信息:**",
            "{{ range .GroupLabels.SortedPairs }}",
            "- {{ .Name }}: {{ .Value }}",
            "{{ end }}",
            "[查看详情]({{ .ExternalURL }})",
        ]
        return "\n" + "\n".join(body_lines) + "\n"

    def create_teams_channel(self, alert_manager: AlertManager, name: str,
                             webhook_url: str) -> str:
        """Register a Microsoft Teams channel and return the manager's result.

        Args:
            alert_manager: Object exposing ``create_notification_channel``.
            name: Display name of the channel.
            webhook_url: Stored under the ``url`` setting.

        Returns:
            Whatever ``alert_manager.create_notification_channel`` returns.
        """
        channel_settings = dict(
            url=webhook_url,
            title="{{ .Status | toUpper }}: {{ .GroupLabels.alertname }}",
            text=self._get_teams_message_template(),
            sectionTitle="告警详情",
        )
        return alert_manager.create_notification_channel(
            name, NotificationChannel.TEAMS, channel_settings
        )

    def _get_teams_message_template(self) -> str:
        """Return the markdown Teams message template (leading and trailing newline included)."""
        body_lines = [
            "**告警状态:** {{ .Status }}",
            "**严重级别:** {{ .GroupLabels.severity }}",
            "**触发时间:** {{ .StartsAt }}",
            "{{ if .EndsAt }}**结束时间:** {{ .EndsAt }}{{ end }}",
            "**描述:** {{ .CommonAnnotations.description }}",
            "**标签:**",
            "{{ range .GroupLabels.SortedPairs }}",
            "- {{ .Name }}: {{ .Value }}",
            "{{ end }}",
            "[查看Grafana]({{ .ExternalURL }})",
        ]
        return "\n" + "\n".join(body_lines) + "\n"
# Usage example: register WeCom and DingTalk channels on the existing alert_manager.
enterprise_config = EnterpriseNotificationConfig()

# WeCom (enterprise WeChat) channel addressed to "@all".
wechat_channel = enterprise_config.create_wechat_channel(
    alert_manager,
    name="WeChat Alerts",
    corp_id="your_corp_id",
    agent_id="your_agent_id",
    secret="your_secret",
    to_user="@all",
)
print("企业微信通知渠道创建完成:", wechat_channel)

# DingTalk robot channel with a secret.
dingtalk_channel = enterprise_config.create_dingtalk_channel(
    alert_manager,
    name="DingTalk Alerts",
    webhook_url="https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN",
    secret="your_secret",
)
print("钉钉通知渠道创建完成:", dingtalk_channel)
4. Webhook和自定义通知
class WebhookNotificationConfig:
    """Builders for webhook-style channels plus channel test and backup helpers.

    Channel builders assemble a settings dict and pass it to
    ``alert_manager.create_notification_channel``. The test-suite and backup
    helpers read ``alert_manager.notification_channels`` and
    ``alert_manager.rules``.
    """

    # A settings key is treated as secret-bearing when its lower-cased name
    # contains one of these fragments. Substring matching is deliberate: the
    # builders in this file store secrets under names such as
    # "integrationKey", "X-API-Key" and "Authorization", which an exact-match
    # list silently misses.
    _SENSITIVE_KEY_FRAGMENTS = (
        "password", "token", "secret", "key", "authorization", "webhook_url",
    )
    # Redacted on exact match only: webhook URLs embed credentials (Slack
    # hook paths, DingTalk ``access_token=`` query strings), while other
    # "*_url" settings such as "api_url" are kept.
    _SENSITIVE_EXACT_KEYS = ("url",)

    def __init__(self):
        # Starts empty; no method of this class writes to it.
        self.webhook_templates = {}

    def create_webhook_channel(self, alert_manager: AlertManager, name: str,
                               url: str, method: str = "POST",
                               headers: Dict = None, auth: Dict = None) -> str:
        """Register a generic webhook channel.

        Args:
            alert_manager: Object exposing ``create_notification_channel``.
            name: Display name of the channel.
            url: Target URL, stored under ``url``.
            method: HTTP method, stored under ``httpMethod``.
            headers: Extra headers merged over the default
                ``Content-Type`` / ``User-Agent`` pair.
            auth: Stored under ``authorization``; ``{}`` when falsy.

        Returns:
            Whatever ``alert_manager.create_notification_channel`` returns.
        """
        http_headers = {
            "Content-Type": "application/json",
            "User-Agent": "Grafana",
        }
        if headers:
            http_headers.update(headers)
        settings = {
            "url": url,
            "httpMethod": method,
            "maxAlerts": 0,
            "authorization": auth or {},
            "httpHeaders": http_headers,
            "body": self._get_webhook_payload_template(),
        }
        return alert_manager.create_notification_channel(name, NotificationChannel.WEBHOOK, settings)

    def _get_webhook_payload_template(self) -> str:
        """Return the JSON payload template used as the webhook body.

        NOTE(review): the template calls ``last`` and ``now``; neither is a
        Go ``text/template`` builtin -- confirm the rendering engine defines
        them before relying on this payload.
        """
        return """
{
"alert_name": "{{ .GroupLabels.alertname }}",
"status": "{{ .Status }}",
"severity": "{{ .GroupLabels.severity }}",
"starts_at": "{{ .StartsAt }}",
"ends_at": "{{ .EndsAt }}",
"description": "{{ .CommonAnnotations.description }}",
"summary": "{{ .CommonAnnotations.summary }}",
"labels": {
{{ range .GroupLabels.SortedPairs }}
"{{ .Name }}": "{{ .Value }}"{{ if not (last $.GroupLabels.SortedPairs .) }},{{ end }}
{{ end }}
},
"annotations": {
{{ range .CommonAnnotations.SortedPairs }}
"{{ .Name }}": "{{ .Value }}"{{ if not (last $.CommonAnnotations.SortedPairs .) }},{{ end }}
{{ end }}
},
"external_url": "{{ .ExternalURL }}",
"timestamp": "{{ now }}"
}
"""

    def create_pagerduty_channel(self, alert_manager: AlertManager, name: str,
                                 integration_key: str, severity: str = "error") -> str:
        """Register a PagerDuty channel.

        Args:
            alert_manager: Object exposing ``create_notification_channel``.
            name: Display name of the channel.
            integration_key: Stored under ``integrationKey``.
            severity: Stored under ``severity``.

        Returns:
            Whatever ``alert_manager.create_notification_channel`` returns.
        """
        settings = {
            "integrationKey": integration_key,
            "severity": severity,
            "class": "grafana",
            "component": "{{ .GroupLabels.alertname }}",
            "group": "{{ .GroupLabels.instance }}",
            "summary": "{{ .CommonAnnotations.summary }}",
            "source": "Grafana",
        }
        return alert_manager.create_notification_channel(name, NotificationChannel.PAGERDUTY, settings)

    def create_custom_api_channel(self, alert_manager: AlertManager, name: str,
                                  api_config: Dict) -> str:
        """Register a webhook channel that posts to a custom API.

        Args:
            alert_manager: Object exposing ``create_notification_channel``.
            name: Display name of the channel.
            api_config: Requires ``base_url``; optional ``endpoint``
                (default ``/alerts``), ``api_token`` (default ``""``) and
                ``api_version`` (default ``v1``).

        Returns:
            Whatever ``alert_manager.create_notification_channel`` returns.

        Raises:
            KeyError: If ``api_config`` has no ``base_url``.
        """
        webhook_url = api_config["base_url"] + api_config.get("endpoint", "/alerts")
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_config.get('api_token', '')}",
            "X-API-Version": api_config.get("api_version", "v1"),
        }
        # Payload layout expected by the custom API; the values are
        # placeholder templates and are serialised verbatim below.
        custom_payload = {
            "event_type": "alert",
            "source": "grafana",
            "alert": {
                "name": "{{ .GroupLabels.alertname }}",
                "status": "{{ .Status }}",
                "severity": "{{ .GroupLabels.severity }}",
                "timestamp": "{{ .StartsAt }}",
                "description": "{{ .CommonAnnotations.description }}",
                "labels": "{{ .GroupLabels }}",
                "annotations": "{{ .CommonAnnotations }}",
            },
            "metadata": {
                "grafana_url": "{{ .ExternalURL }}",
                "rule_id": "{{ .GroupLabels.rule_id }}",
                "dashboard_id": "{{ .GroupLabels.dashboard_id }}",
            },
        }
        settings = {
            "url": webhook_url,
            "httpMethod": "POST",
            "httpHeaders": headers,
            "body": json.dumps(custom_payload, indent=2),
        }
        return alert_manager.create_notification_channel(name, NotificationChannel.WEBHOOK, settings)

    def create_notification_test_suite(self, alert_manager: AlertManager) -> Dict:
        """Run ``alert_manager.test_notification`` on every channel and summarise.

        Returns:
            Dict with ``total_channels``, ``successful_tests``,
            ``failed_tests``, per-channel ``test_results`` keyed by channel
            name, and ``test_timestamp``.
        """
        test_results = {}
        for channel_uid, channel in alert_manager.notification_channels.items():
            test_result = alert_manager.test_notification(channel_uid)
            # Keyed by name: two channels sharing a name overwrite each other.
            test_results[channel["name"]] = {
                "channel_type": channel["type"],
                "success": test_result["success"],
                "message": test_result.get("message", ""),
                "error": test_result.get("error", ""),
                "timestamp": test_result["timestamp"],
            }
        return {
            "total_channels": len(alert_manager.notification_channels),
            "successful_tests": sum(1 for r in test_results.values() if r["success"]),
            "failed_tests": sum(1 for r in test_results.values() if not r["success"]),
            "test_results": test_results,
            "test_timestamp": datetime.now().isoformat(),
        }

    @classmethod
    def _redact_settings(cls, settings: Dict) -> Dict:
        """Return a copy of ``settings`` with secret-bearing values replaced.

        Nested dicts (for example ``httpHeaders``) are redacted recursively;
        the input dict is never modified.
        """
        redacted = {}
        for key, value in settings.items():
            lowered = str(key).lower()
            is_secret = lowered in cls._SENSITIVE_EXACT_KEYS or any(
                fragment in lowered for fragment in cls._SENSITIVE_KEY_FRAGMENTS
            )
            if is_secret:
                redacted[key] = "[REDACTED]"
            elif isinstance(value, dict):
                redacted[key] = cls._redact_settings(value)
            else:
                redacted[key] = value
        return redacted

    def generate_notification_config_backup(self, alert_manager: AlertManager) -> Dict:
        """Build a backup dict of channels and rule notification settings.

        Channel settings are passed through ``_redact_settings`` so that
        credentials do not end up in the backup. Rule entries carry copies
        of the rule's uid, name, notification uids, labels and annotations.

        Returns:
            Dict with ``backup_timestamp``, ``grafana_version``,
            ``notification_channels``, ``alert_rules`` and an empty
            ``notification_policies`` list.
        """
        backup = {
            "backup_timestamp": datetime.now().isoformat(),
            "grafana_version": "9.0.0",
            "notification_channels": [],
            "alert_rules": [],
            "notification_policies": [],
        }
        for channel in alert_manager.notification_channels.values():
            safe_channel = channel.copy()
            if "settings" in safe_channel:
                safe_channel["settings"] = self._redact_settings(safe_channel["settings"])
            backup["notification_channels"].append(safe_channel)
        for rule in alert_manager.rules.values():
            # Copy the containers so later edits to the live rule do not
            # alter an already-taken backup.
            backup["alert_rules"].append({
                "uid": rule.uid,
                "name": rule.name,
                "notification_uids": list(rule.notification_uids),
                "labels": dict(rule.labels),
                "annotations": dict(rule.annotations),
            })
        return backup
# Usage example: webhook and PagerDuty channels, then a test run and a backup.
webhook_config = WebhookNotificationConfig()

# Generic webhook with one extra header and bearer authorization.
webhook_channel = webhook_config.create_webhook_channel(
    alert_manager,
    name="Custom Webhook",
    url="https://api.example.com/webhooks/alerts",
    method="POST",
    headers={"X-API-Key": "your-api-key"},
    auth={"type": "bearer", "token": "your-token"},
)
print("Webhook通知渠道创建完成:", webhook_channel)

# PagerDuty channel reporting at "critical" severity.
pagerduty_channel = webhook_config.create_pagerduty_channel(
    alert_manager,
    name="PagerDuty Alerts",
    integration_key="your-integration-key",
    severity="critical",
)
print("PagerDuty通知渠道创建完成:", pagerduty_channel)

# Send a test notification through every registered channel.
test_suite_results = webhook_config.create_notification_test_suite(alert_manager=alert_manager)
print(f"通知测试完成: {test_suite_results['successful_tests']}/{test_suite_results['total_channels']} 成功")

# Export the notification configuration.
config_backup = webhook_config.generate_notification_config_backup(alert_manager=alert_manager)
print("配置备份已生成,包含", len(config_backup["notification_channels"]), "个通知渠道")
告警策略管理
1. 告警分组和路由
class AlertPolicyManager:
    """Creates alert policies and Alertmanager-style routing configuration.

    Policies created through ``create_alert_policy`` are stored in
    ``self.policies``; the remaining methods return freshly built dicts and
    keep no state.
    """

    def __init__(self):
        self.policies = {}
        # Not written by any method of this class.
        self.routes = {}
        self.inhibit_rules = {}

    def create_alert_policy(self, name: str, match_labels: Dict,
                            notification_channels: List[str],
                            group_by: List[str] = None,
                            group_wait: str = "10s",
                            group_interval: str = "5m",
                            repeat_interval: str = "12h") -> str:
        """Store a new policy and return its generated uid.

        Args:
            name: Display name of the policy.
            match_labels: Labels an alert must carry to match.
            notification_channels: Channels notified for matching alerts.
            group_by: Grouping labels; a falsy value selects
                ``["alertname", "cluster", "service"]``.
            group_wait: Stored as given.
            group_interval: Stored as given.
            repeat_interval: Stored as given.

        Returns:
            The uid, ``"policy_"`` followed by eight hex characters.
        """
        policy_uid = f"policy_{uuid.uuid4().hex[:8]}"
        self.policies[policy_uid] = {
            "uid": policy_uid,
            "name": name,
            "match_labels": match_labels,
            "notification_channels": notification_channels,
            "group_by": group_by or ["alertname", "cluster", "service"],
            "group_wait": group_wait,
            "group_interval": group_interval,
            "repeat_interval": repeat_interval,
            "created_at": datetime.now().isoformat(),
        }
        return policy_uid

    def create_routing_tree(self) -> Dict:
        """Return the routing tree: critical (with per-service children), warning, Watchdog."""
        return {
            "receiver": "default",
            "group_by": ["alertname"],
            "routes": [
                {
                    "match": {"severity": "critical"},
                    "receiver": "critical-alerts",
                    "group_wait": "5s",
                    "group_interval": "2m",
                    "repeat_interval": "5m",
                    "routes": [
                        {
                            "match": {"service": "database"},
                            "receiver": "database-team",
                            "group_wait": "0s",
                            "repeat_interval": "2m",
                        },
                        {
                            "match": {"service": "web"},
                            "receiver": "web-team",
                            "group_wait": "0s",
                            "repeat_interval": "2m",
                        },
                    ],
                },
                {
                    "match": {"severity": "warning"},
                    "receiver": "warning-alerts",
                    "group_wait": "30s",
                    "group_interval": "10m",
                    "repeat_interval": "1h",
                },
                {
                    "match": {"alertname": "Watchdog"},
                    "receiver": "null",
                    "group_wait": "0s",
                    "group_interval": "1m",
                    "repeat_interval": "1m",
                },
            ],
        }

    def create_inhibit_rules(self) -> List[Dict]:
        """Return the three inhibit rules (critical over warning, NodeDown, database over web)."""
        return [
            {
                "source_match": {"severity": "critical"},
                "target_match": {"severity": "warning"},
                "equal": ["alertname", "instance"],
            },
            {
                "source_match": {"alertname": "NodeDown"},
                "target_match_re": {"alertname": ".*"},
                "equal": ["instance"],
            },
            {
                "source_match": {"service": "database", "severity": "critical"},
                "target_match": {"service": "web", "severity": "warning"},
                "equal": ["cluster"],
            },
        ]

    def create_time_based_routing(self) -> Dict:
        """Return routes gated by "business-hours" / "after-hours" time intervals.

        Every time range has ``start_time`` earlier than ``end_time`` and
        the two named intervals together cover the whole week: a range may
        not cross midnight and its end is exclusive, so the weekday
        after-hours period is split into 00:00-09:00 and 18:00-24:00.
        """
        routes = [
            {
                "match": {"severity": "critical"},
                "receiver": "oncall-primary",
                "active_time_intervals": ["business-hours"],
            },
            {
                "match": {"severity": "critical"},
                "receiver": "oncall-secondary",
                "active_time_intervals": ["after-hours"],
            },
            {
                "match": {"severity": "warning"},
                "receiver": "email-only",
                "active_time_intervals": ["business-hours"],
            },
            {
                "match": {"severity": "warning"},
                "receiver": "null",
                "active_time_intervals": ["after-hours"],
            },
        ]
        time_intervals = [
            {
                "name": "business-hours",
                "time_intervals": [
                    {
                        "times": [
                            {"start_time": "09:00", "end_time": "18:00"},
                        ],
                        "weekdays": ["monday:friday"],
                    },
                ],
            },
            {
                "name": "after-hours",
                "time_intervals": [
                    {
                        # Two same-day ranges instead of one 18:01-08:59
                        # range, which would cross midnight and leave the
                        # minute 18:00-18:01 uncovered.
                        "times": [
                            {"start_time": "00:00", "end_time": "09:00"},
                            {"start_time": "18:00", "end_time": "24:00"},
                        ],
                        "weekdays": ["monday:friday"],
                    },
                    {
                        # 24:00 rather than 23:59 so the last minute of the
                        # day is included (the end is exclusive).
                        "times": [
                            {"start_time": "00:00", "end_time": "24:00"},
                        ],
                        "weekdays": ["saturday", "sunday"],
                    },
                ],
            },
        ]
        return {
            "receiver": "default",
            "routes": routes,
            "time_intervals": time_intervals,
        }

    def create_escalation_policy(self, name: str, steps: List[Dict]) -> Dict:
        """Wrap ``steps`` into an escalation policy dict.

        Args:
            name: Policy name.
            steps: Escalation steps, stored as given. A step looks like::

                {"step": 1, "wait_time": "5m",
                 "receivers": ["primary-oncall"],
                 "notification_methods": ["email", "sms"]}

                followed, for example, by a step 2 after "10m" for
                ``["secondary-oncall"]`` and a step 3 after "15m" for
                ``["manager", "team-lead"]``.

        Returns:
            Dict with ``name``, ``steps`` and ``created_at``.
        """
        return {
            "name": name,
            "steps": steps,
            "created_at": datetime.now().isoformat(),
        }

    def generate_alertmanager_config(self) -> Dict:
        """Return a complete configuration dict.

        The ``receivers`` list declares every receiver named in
        ``create_routing_tree``; a route pointing at an undeclared receiver
        makes the configuration invalid.
        """
        receivers = [
            {
                "name": "default",
                "email_configs": [
                    {
                        "to": "admin@example.com",
                        "subject": "[DEFAULT] {{ .GroupLabels.alertname }}",
                        "body": "{{ range .Alerts }}{{ .Annotations.description }}{{ end }}",
                    }
                ],
            },
            {
                "name": "critical-alerts",
                "email_configs": [
                    {
                        "to": "oncall@example.com",
                        "subject": "[CRITICAL] {{ .GroupLabels.alertname }}",
                        "body": "{{ template \"email.default.html\" . }}",
                    }
                ],
                "slack_configs": [
                    {
                        "api_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
                        "channel": "#alerts-critical",
                        "title": "Critical Alert: {{ .GroupLabels.alertname }}",
                        "text": "{{ range .Alerts }}{{ .Annotations.description }}{{ end }}",
                    }
                ],
            },
            # The next three receivers are referenced by the routing tree
            # and therefore have to exist; the addresses are placeholders.
            {
                "name": "database-team",
                "email_configs": [
                    {
                        "to": "dba@example.com",
                        "subject": "[CRITICAL][DATABASE] {{ .GroupLabels.alertname }}",
                        "body": "{{ range .Alerts }}{{ .Annotations.description }}{{ end }}",
                    }
                ],
            },
            {
                "name": "web-team",
                "email_configs": [
                    {
                        "to": "web-team@example.com",
                        "subject": "[CRITICAL][WEB] {{ .GroupLabels.alertname }}",
                        "body": "{{ range .Alerts }}{{ .Annotations.description }}{{ end }}",
                    }
                ],
            },
            {
                "name": "warning-alerts",
                "email_configs": [
                    {
                        "to": "alerts@example.com",
                        "subject": "[WARNING] {{ .GroupLabels.alertname }}",
                        "body": "{{ range .Alerts }}{{ .Annotations.description }}{{ end }}",
                    }
                ],
            },
            {
                "name": "null",
            },
        ]
        return {
            "global": {
                "smtp_smarthost": "localhost:587",
                "smtp_from": "alerts@example.com",
                "resolve_timeout": "5m",
            },
            "templates": [
                "/etc/alertmanager/templates/*.tmpl",
            ],
            "route": self.create_routing_tree(),
            "inhibit_rules": self.create_inhibit_rules(),
            "receivers": receivers,
        }
# Usage example: one policy for critical alerts, then the generated configuration.
policy_manager = AlertPolicyManager()

# Critical alerts go to three channels, grouped by alert name and instance.
critical_policy = policy_manager.create_alert_policy(
    name="Critical Alerts Policy",
    match_labels={"severity": "critical"},
    notification_channels=["email-critical", "slack-critical", "pagerduty"],
    group_by=["alertname", "instance"],
    group_wait="5s",
    group_interval="2m",
    repeat_interval="5m",
)
print("关键告警策略创建完成:", critical_policy)

# Build the Alertmanager configuration dict.
alertmanager_config = policy_manager.generate_alertmanager_config()
print("Alertmanager配置已生成")
2. 静默和维护窗口
class SilenceManager:
    """In-memory registry of silences, maintenance windows and recurring silences.

    All three registries are plain dicts keyed by generated ids. Times are
    compared against ``datetime.now()``, so callers should pass naive
    datetimes.
    """

    def __init__(self):
        self.silences = {}
        self.maintenance_windows = {}
        # Records produced by create_recurring_silence, keyed by recurring id.
        self.recurring_silences = {}

    @staticmethod
    def _status_at(starts_at: datetime, ends_at: datetime, now: datetime) -> str:
        """Classify a silence window as "expired", "pending" or "active" at ``now``."""
        if now > ends_at:
            return "expired"
        if now < starts_at:
            return "pending"
        return "active"

    def create_silence(self, matchers: List[Dict], starts_at: datetime,
                       ends_at: datetime, created_by: str, comment: str) -> str:
        """Store a silence and return its id.

        Args:
            matchers: Matcher dicts, stored as given.
            starts_at: Start of the silence.
            ends_at: End of the silence.
            created_by: Author of the silence.
            comment: Free-text reason.

        Returns:
            The id, ``"silence_"`` followed by eight hex characters. The
            stored ``status`` is "active", "pending" or, for a window that
            has already ended, "expired".
        """
        silence_id = f"silence_{uuid.uuid4().hex[:8]}"
        # One clock reading so created_at and status agree with each other.
        now = datetime.now()
        self.silences[silence_id] = {
            "id": silence_id,
            "matchers": matchers,
            "starts_at": starts_at.isoformat(),
            "ends_at": ends_at.isoformat(),
            "created_by": created_by,
            "comment": comment,
            "created_at": now.isoformat(),
            "status": self._status_at(starts_at, ends_at, now),
        }
        return silence_id

    def create_maintenance_window(self, name: str, services: List[str],
                                  start_time: datetime, end_time: datetime,
                                  description: str, contact: str) -> str:
        """Store a maintenance window and one silence per service.

        Each silence matches the exact service name and any alert name
        (regex ``.*``) and is created with ``contact`` as its author.

        Returns:
            The window id, ``"maint_"`` followed by eight hex characters.
        """
        window_id = f"maint_{uuid.uuid4().hex[:8]}"
        silence_ids = [
            self.create_silence(
                [
                    {"name": "service", "value": service, "isRegex": False},
                    {"name": "alertname", "value": ".*", "isRegex": True},
                ],
                start_time,
                end_time,
                contact,
                f"Maintenance window: {name} - {description}",
            )
            for service in services
        ]
        self.maintenance_windows[window_id] = {
            "id": window_id,
            "name": name,
            "services": services,
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "description": description,
            "contact": contact,
            "silence_ids": silence_ids,
            "status": "scheduled",
            "created_at": datetime.now().isoformat(),
        }
        return window_id

    def create_recurring_silence(self, name: str, matchers: List[Dict],
                                 schedule: Dict, duration_hours: int,
                                 created_by: str, comment: str) -> str:
        """Materialise a recurring silence for today and the next 30 days.

        Args:
            name: Name used in the generated silence comments.
            matchers: Matcher dicts applied to every generated silence.
            schedule: Requires ``start_time`` as ``"HH:MM"``; optional
                ``weekdays`` (0=Monday .. 6=Sunday) and ``monthly_days``.
            duration_hours: Length of each generated silence.
            created_by: Author of the generated silences.
            comment: Appended to each generated silence comment.

        Returns:
            The recurring id; the record, including the generated silence
            ids, is stored in ``self.recurring_silences`` under that id.

        Raises:
            KeyError: If ``schedule`` has no ``start_time``.
            ValueError: If ``start_time`` is not in ``HH:MM`` format.
        """
        recurring_id = f"recurring_{uuid.uuid4().hex[:8]}"
        silence_ids = []
        current_date = datetime.now().date()
        end_date = current_date + timedelta(days=30)
        while current_date <= end_date:
            if self._matches_schedule(current_date, schedule):
                start_time = datetime.combine(
                    current_date,
                    datetime.strptime(schedule["start_time"], "%H:%M").time(),
                )
                silence_ids.append(self.create_silence(
                    matchers,
                    start_time,
                    start_time + timedelta(hours=duration_hours),
                    created_by,
                    f"Recurring silence: {name} - {comment}",
                ))
            current_date += timedelta(days=1)
        # Keep the record so the returned id can be resolved later.
        self.recurring_silences[recurring_id] = {
            "id": recurring_id,
            "name": name,
            "matchers": matchers,
            "schedule": schedule,
            "duration_hours": duration_hours,
            "created_by": created_by,
            "comment": comment,
            "silence_ids": silence_ids,
            "created_at": datetime.now().isoformat(),
        }
        return recurring_id

    # NOTE(review): with ``from datetime import datetime`` in scope, the
    # annotation ``datetime.date`` resolves to the ``datetime.date()`` method,
    # not the ``date`` class; kept unchanged to preserve the signature.
    def _matches_schedule(self, date: datetime.date, schedule: Dict) -> bool:
        """Return True when ``date`` passes the schedule's weekday and month-day filters."""
        if "weekdays" in schedule and date.weekday() not in schedule["weekdays"]:
            return False
        if "monthly_days" in schedule and date.day not in schedule["monthly_days"]:
            return False
        return True

    def get_active_silences(self) -> List[Dict]:
        """Return the silences whose window contains the current time."""
        now = datetime.now()
        return [
            silence
            for silence in self.silences.values()
            if datetime.fromisoformat(silence["starts_at"])
            <= now
            <= datetime.fromisoformat(silence["ends_at"])
        ]

    def expire_silence(self, silence_id: str) -> bool:
        """End a silence now; return False when the id is unknown."""
        silence = self.silences.get(silence_id)
        if silence is None:
            return False
        silence["ends_at"] = datetime.now().isoformat()
        silence["status"] = "expired"
        return True

    def generate_silence_report(self) -> Dict:
        """Summarise all silences with their status at the time of the call.

        Returns:
            Dict with totals, per-status counters, the number of maintenance
            windows, a ``silence_details`` list and ``generated_at``.
        """
        now = datetime.now()
        report = {
            "total_silences": len(self.silences),
            "active_silences": 0,
            "expired_silences": 0,
            "pending_silences": 0,
            "maintenance_windows": len(self.maintenance_windows),
            "silence_details": [],
            "generated_at": now.isoformat(),
        }
        for silence in self.silences.values():
            starts_at = datetime.fromisoformat(silence["starts_at"])
            ends_at = datetime.fromisoformat(silence["ends_at"])
            status = self._status_at(starts_at, ends_at, now)
            report[f"{status}_silences"] += 1
            report["silence_details"].append({
                "id": silence["id"],
                "status": status,
                "created_by": silence["created_by"],
                "comment": silence["comment"],
                "duration": str(ends_at - starts_at),
                "matchers_count": len(silence["matchers"]),
            })
        return report
# Usage example: an ad-hoc silence, a maintenance window, a recurring silence and a report.
silence_manager = SilenceManager()

# Two-hour silence for one alert on one instance, starting now.
silence_id = silence_manager.create_silence(
    matchers=[
        {"name": "alertname", "value": "HighCPUUsage", "isRegex": False},
        {"name": "instance", "value": "server-01", "isRegex": False},
    ],
    starts_at=datetime.now(),
    ends_at=datetime.now() + timedelta(hours=2),
    created_by="admin@example.com",
    comment="Investigating high CPU usage on server-01",
)
print("静默规则创建完成:", silence_id)

# Four-hour maintenance window starting in one day, covering three services.
maintenance_id = silence_manager.create_maintenance_window(
    name="Database Maintenance",
    services=["mysql", "redis", "mongodb"],
    start_time=datetime.now() + timedelta(days=1),
    end_time=datetime.now() + timedelta(days=1, hours=4),
    description="Scheduled database maintenance and updates",
    contact="dba@example.com",
)
print("维护窗口创建完成:", maintenance_id)

# Recurring silence: every Sunday at 02:00 for two hours.
recurring_id = silence_manager.create_recurring_silence(
    name="Weekly Backup Silence",
    matchers=[
        {"name": "alertname", "value": "BackupRunning", "isRegex": False},
    ],
    schedule={
        "weekdays": [6],  # Sunday
        "start_time": "02:00",
    },
    duration_hours=2,
    created_by="system@example.com",
    comment="Weekly backup process",
)
print("周期性静默创建完成:", recurring_id)

# Summarise all silences.
silence_report = silence_manager.generate_silence_report()
print(f"静默报告: {silence_report['active_silences']} 活跃, {silence_report['expired_silences']} 已过期")
故障排除
1. 告警故障诊断
class AlertTroubleshooter:
    """Diagnoses alert rules, notification channels and datasources.

    ``diagnose_alert_issues`` runs the three ``_test_*`` checks, derives
    issues and recommendations from their results and rates the overall
    health; ``generate_health_report`` renders that diagnosis as markdown.
    The datasource check is a random simulation, not a real connection test.
    """

    # Recommendation template per issue category (see _generate_recommendations).
    _RECOMMENDATIONS = {
        "alert_rules": {
            "category": "alert_rules",
            "priority": "high",
            "title": "修复告警规则",
            "actions": [
                "检查告警规则的查询语法",
                "验证数据源连接",
                "更新过时的指标名称",
                "测试规则条件逻辑",
            ],
        },
        "notifications": {
            "category": "notifications",
            "priority": "medium",
            "title": "修复通知渠道",
            "actions": [
                "验证通知渠道配置",
                "检查API密钥和令牌",
                "测试网络连接",
                "更新过期的认证信息",
            ],
        },
        "datasources": {
            "category": "datasources",
            "priority": "high",
            "title": "修复数据源连接",
            "actions": [
                "检查数据源服务状态",
                "验证网络连接",
                "更新连接配置",
                "检查认证凭据",
            ],
        },
    }

    def __init__(self):
        # Both dicts start empty; no method of this class writes to them.
        self.diagnostic_tests = {}
        self.common_issues = {}

    def diagnose_alert_issues(self, alert_manager: AlertManager) -> Dict:
        """Run all checks and return the combined diagnosis.

        Returns:
            Dict with ``timestamp``, ``overall_health``, ``issues_found``,
            ``recommendations`` and ``test_results`` (keys ``alert_rules``,
            ``notification_channels``, ``datasources``).
        """
        diagnosis = {
            "timestamp": datetime.now().isoformat(),
            "overall_health": "unknown",
            "issues_found": [],
            "recommendations": [],
            "test_results": {},
        }
        results = diagnosis["test_results"]
        results["alert_rules"] = self._test_alert_rules(alert_manager)
        results["notification_channels"] = self._test_notification_channels(alert_manager)
        results["datasources"] = self._test_datasource_connectivity()

        issues = self._analyze_issues(results)
        diagnosis["issues_found"] = issues
        diagnosis["recommendations"] = self._generate_recommendations(issues)
        diagnosis["overall_health"] = self._determine_health_status(issues)
        return diagnosis

    @staticmethod
    def _rule_conditions(rule) -> List:
        """Collect a rule's conditions from ``conditions`` and a single ``condition``.

        ``AlertRule`` stores a ``conditions`` list; a lone ``condition``
        attribute is still honoured for rule objects that carry one.
        """
        conditions = list(getattr(rule, "conditions", None) or [])
        single = getattr(rule, "condition", None)
        if single:
            conditions.append(single)
        return conditions

    def _test_alert_rules(self, alert_manager: AlertManager) -> Dict:
        """Classify every rule in ``alert_manager.rules``.

        A rule with at least one condition is rated "error" when any query
        contains "invalid" (case-insensitive), otherwise "no_data" or
        "active". Rules without conditions stay "unknown" and are not
        counted in any bucket.

        Returns:
            Dict with ``total_rules``, ``active_rules``, ``inactive_rules``,
            ``error_rules`` and a ``rule_details`` list.
        """
        test_result = {
            "total_rules": len(alert_manager.rules),
            "active_rules": 0,
            "inactive_rules": 0,
            "error_rules": 0,
            "rule_details": [],
        }
        for rule in alert_manager.rules.values():
            rule_status = {
                "name": rule.name,
                "uid": rule.uid,
                "status": "unknown",
                "last_evaluation": "never",
                "errors": [],
            }
            conditions = self._rule_conditions(rule)
            if conditions:
                try:
                    if any("invalid" in cond.query.lower() for cond in conditions):
                        rule_status["status"] = "error"
                        rule_status["errors"].append("Invalid query syntax")
                        test_result["error_rules"] += 1
                    # NOTE(review): no_data_state is the rule's configured
                    # no-data policy and defaults to AlertState.NO_DATA, so
                    # rules with default settings land here -- confirm this
                    # simulated check is intended.
                    elif rule.no_data_state == AlertState.NO_DATA:
                        rule_status["status"] = "no_data"
                        test_result["inactive_rules"] += 1
                    else:
                        rule_status["status"] = "active"
                        rule_status["last_evaluation"] = datetime.now().isoformat()
                        test_result["active_rules"] += 1
                except Exception as e:
                    # A malformed rule object is reported, not propagated.
                    rule_status["status"] = "error"
                    rule_status["errors"].append(str(e))
                    test_result["error_rules"] += 1
            test_result["rule_details"].append(rule_status)
        return test_result

    def _test_notification_channels(self, alert_manager: AlertManager) -> Dict:
        """Send a test notification through every channel and record the outcome.

        Returns:
            Dict with ``total_channels``, ``working_channels``,
            ``failed_channels`` and a ``channel_details`` list.
        """
        test_result = {
            "total_channels": len(alert_manager.notification_channels),
            "working_channels": 0,
            "failed_channels": 0,
            "channel_details": [],
        }
        for channel_uid, channel in alert_manager.notification_channels.items():
            channel_status = {
                "name": channel["name"],
                "type": channel["type"],
                "uid": channel_uid,
                "status": "unknown",
                "last_test": "never",
                "errors": [],
            }
            try:
                test_response = alert_manager.test_notification(channel_uid)
                if test_response["success"]:
                    channel_status["status"] = "working"
                    channel_status["last_test"] = test_response["timestamp"]
                    test_result["working_channels"] += 1
                else:
                    channel_status["status"] = "failed"
                    channel_status["errors"].append(test_response.get("error", "Unknown error"))
                    test_result["failed_channels"] += 1
            except Exception as e:
                # Any failure while testing counts as a failed channel.
                channel_status["status"] = "failed"
                channel_status["errors"].append(str(e))
                test_result["failed_channels"] += 1
            test_result["channel_details"].append(channel_status)
        return test_result

    def _test_datasource_connectivity(self) -> Dict:
        """Simulate connectivity checks for three fixed datasources.

        Each check succeeds with probability 2/3; results therefore differ
        between calls. Real connection logic belongs inside the ``try``.

        Returns:
            Dict with ``total_datasources``, ``connected_datasources``,
            ``failed_datasources`` and a ``datasource_details`` list.
        """
        import random

        test_result = {
            "total_datasources": 0,
            "connected_datasources": 0,
            "failed_datasources": 0,
            "datasource_details": [],
        }
        common_datasources = [
            {"name": "Prometheus", "type": "prometheus", "url": "http://localhost:9090"},
            {"name": "MySQL", "type": "mysql", "url": "localhost:3306"},
            {"name": "InfluxDB", "type": "influxdb", "url": "http://localhost:8086"},
        ]
        for ds in common_datasources:
            ds_status = {
                "name": ds["name"],
                "type": ds["type"],
                "url": ds["url"],
                "status": "unknown",
                "response_time": 0,
                "errors": [],
            }
            try:
                if random.choice([True, False, True]):
                    ds_status["status"] = "connected"
                    ds_status["response_time"] = random.randint(10, 500)
                    test_result["connected_datasources"] += 1
                else:
                    ds_status["status"] = "failed"
                    ds_status["errors"].append("Connection timeout")
                    test_result["failed_datasources"] += 1
            except Exception as e:
                ds_status["status"] = "failed"
                ds_status["errors"].append(str(e))
                test_result["failed_datasources"] += 1
            test_result["datasource_details"].append(ds_status)
            test_result["total_datasources"] += 1
        return test_result

    def _analyze_issues(self, test_results: Dict) -> List[Dict]:
        """Turn failing counters in ``test_results`` into issue dicts.

        One issue is produced per category whose failure counter is above
        zero; ``affected_items`` lists the names of the failing entries.
        """
        issues = []

        rule_results = test_results.get("alert_rules", {})
        if rule_results.get("error_rules", 0) > 0:
            issues.append({
                "category": "alert_rules",
                "severity": "high",
                "title": "告警规则错误",
                "description": f"发现 {rule_results['error_rules']} 个错误的告警规则",
                "affected_items": [r["name"] for r in rule_results.get("rule_details", []) if r["status"] == "error"],
            })

        notification_results = test_results.get("notification_channels", {})
        if notification_results.get("failed_channels", 0) > 0:
            issues.append({
                "category": "notifications",
                "severity": "medium",
                "title": "通知渠道故障",
                "description": f"发现 {notification_results['failed_channels']} 个故障的通知渠道",
                "affected_items": [c["name"] for c in notification_results.get("channel_details", []) if c["status"] == "failed"],
            })

        datasource_results = test_results.get("datasources", {})
        if datasource_results.get("failed_datasources", 0) > 0:
            issues.append({
                "category": "datasources",
                "severity": "high",
                "title": "数据源连接失败",
                "description": f"发现 {datasource_results['failed_datasources']} 个无法连接的数据源",
                "affected_items": [d["name"] for d in datasource_results.get("datasource_details", []) if d["status"] == "failed"],
            })
        return issues

    def _generate_recommendations(self, issues: List[Dict]) -> List[Dict]:
        """Return one recommendation per issue with a known category."""
        recommendations = []
        for issue in issues:
            template = self._RECOMMENDATIONS.get(issue["category"])
            if template is not None:
                # Fresh dict and list per issue so callers may mutate them.
                recommendations.append({**template, "actions": list(template["actions"])})
        return recommendations

    def _determine_health_status(self, issues: List[Dict]) -> str:
        """Map issues to "healthy", "critical", "warning" or "degraded"."""
        if not issues:
            return "healthy"
        severities = {issue["severity"] for issue in issues}
        if "high" in severities:
            return "critical"
        if "medium" in severities:
            return "warning"
        return "degraded"

    def generate_health_report(self, alert_manager: AlertManager) -> str:
        """Run a diagnosis and render it as a markdown report."""
        diagnosis = self.diagnose_alert_issues(alert_manager)
        rules = diagnosis['test_results']['alert_rules']
        channels = diagnosis['test_results']['notification_channels']
        datasources = diagnosis['test_results']['datasources']

        # Collected pieces are joined once at the end.
        parts = [f"""
# Grafana告警系统健康报告
**生成时间:** {diagnosis['timestamp']}
**整体状态:** {diagnosis['overall_health'].upper()}
## 系统概览
- **告警规则:** {rules['total_rules']} 总计
- 活跃: {rules['active_rules']}
- 错误: {rules['error_rules']}
- **通知渠道:** {channels['total_channels']} 总计
- 正常: {channels['working_channels']}
- 故障: {channels['failed_channels']}
- **数据源:** {datasources['total_datasources']} 总计
- 连接: {datasources['connected_datasources']}
- 失败: {datasources['failed_datasources']}
## 发现的问题
"""]

        if diagnosis['issues_found']:
            for issue in diagnosis['issues_found']:
                parts.append(f"""
### {issue['title']} ({issue['severity'].upper()})
{issue['description']}
**受影响的项目:**
""")
                parts.extend(f"- {item}\n" for item in issue['affected_items'])
                parts.append("\n")
        else:
            parts.append("未发现问题。\n\n")

        parts.append("## 建议措施\n\n")
        if diagnosis['recommendations']:
            for rec in diagnosis['recommendations']:
                parts.append(f"""
### {rec['title']} (优先级: {rec['priority'].upper()})
""")
                parts.extend(f"- {action}\n" for action in rec['actions'])
                parts.append("\n")
        else:
            parts.append("无需采取措施。\n")
        return "".join(parts)
# Usage example: diagnose the alerting setup, then print the markdown report.
troubleshooter = AlertTroubleshooter()

# Run all checks against the existing alert_manager.
diagnosis = troubleshooter.diagnose_alert_issues(alert_manager=alert_manager)
print(f"系统健康状态: {diagnosis['overall_health']}")
print(f"发现问题数量: {len(diagnosis['issues_found'])}")

# Render the report; the banner and the report are printed on separate lines.
health_report = troubleshooter.generate_health_report(alert_manager=alert_manager)
print("\n=== 健康报告 ===", health_report, sep="\n")
总结
关键要点
告警规则设计
- 合理设置阈值和评估间隔
- 使用标签和注释提供上下文信息
- 实现多级告警和依赖关系
通知渠道配置
- 支持多种通知方式(邮件、Slack、企业微信等)
- 配置消息模板和格式化
- 实现通知测试和验证
告警策略管理
- 创建路由规则和分组策略
- 实现告警抑制和静默
- 配置升级策略和时间窗口
故障排除
- 定期检查告警系统健康状态
- 监控通知渠道可用性
- 分析和解决常见问题
最佳实践
规则管理
- 使用版本控制管理告警规则
- 定期审查和优化告警阈值
- 避免告警风暴和噪音
通知优化
- 根据严重级别选择通知方式
- 实现智能分组和去重
- 提供丰富的上下文信息
运维管理
- 建立告警响应流程
- 定期进行告警演练
- 收集和分析告警指标
下一步学习
高级功能
- 学习Grafana Unified Alerting
- 探索机器学习异常检测
- 集成外部告警系统
实践项目
- 构建完整的监控告警体系
- 实现自动化告警管理
- 开发自定义通知插件
相关技术
- Prometheus告警规则
- Alertmanager配置
- 监控最佳实践
通过本章的学习,你已经掌握了Grafana告警系统的核心概念和实践技能。继续探索和实践,构建可靠的监控告警体系!