8.1 监控系统概述
8.1.1 监控架构设计
MinIO提供了全面的监控和运维管理功能,支持多种监控方式和工具集成。
监控层次: - 系统级监控:CPU、内存、磁盘、网络 - 服务级监控:MinIO服务状态、API响应时间 - 业务级监控:存储桶使用情况、对象操作统计 - 安全监控:访问日志、异常行为检测
from minio import Minio
from minio.error import S3Error
from datetime import datetime, timedelta
import json
import requests
import threading
import time
from typing import Dict, List, Any, Optional, Tuple
import psutil
import logging
import sqlite3
import smtplib
from email.mime.text import MimeText
from email.mime.multipart import MimeMultipart
import matplotlib.pyplot as plt
import pandas as pd
from collections import defaultdict, deque
import subprocess
import os
import yaml
from prometheus_client import CollectorRegistry, Gauge, Counter, Histogram, start_http_server
import asyncio
import aiohttp
from dataclasses import dataclass
from enum import Enum
class AlertLevel(Enum):
"""告警级别"""
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
@dataclass
class MetricData:
"""监控指标数据"""
name: str
value: float
timestamp: datetime
labels: Dict[str, str] = None
unit: str = ""
description: str = ""
@dataclass
class AlertRule:
"""告警规则"""
name: str
metric: str
condition: str # >, <, >=, <=, ==, !=
threshold: float
duration: int # 持续时间(秒)
level: AlertLevel
message: str
enabled: bool = True
class MinIOMonitor:
"""MinIO监控系统"""
def __init__(self, endpoint: str, access_key: str, secret_key: str, secure: bool = False):
self.client = Minio(
endpoint=endpoint,
access_key=access_key,
secret_key=secret_key,
secure=secure
)
self.endpoint = endpoint
self.monitoring_active = False
self.metrics_history = defaultdict(deque)
self.alert_rules = []
self.alert_history = []
self.notification_handlers = []
# 监控配置
self.monitor_interval = 30 # 监控间隔(秒)
self.history_retention = 24 * 60 * 60 # 历史数据保留时间(秒)
self.max_history_points = 2880 # 最大历史数据点数(24小时,每30秒一个点)
# Prometheus指标
self.prometheus_registry = CollectorRegistry()
self.prometheus_metrics = self._setup_prometheus_metrics()
# 数据库连接
self.db_path = 'minio_monitoring.db'
self._init_database()
# 日志配置
self._setup_logging()
def _setup_logging(self):
"""设置日志"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('minio_monitor.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger('MinIOMonitor')
def _setup_prometheus_metrics(self) -> Dict[str, Any]:
"""设置Prometheus指标"""
metrics = {
'minio_up': Gauge(
'minio_up', 'MinIO服务是否可用',
registry=self.prometheus_registry
),
'minio_api_requests_total': Counter(
'minio_api_requests_total', 'API请求总数',
['method', 'status'], registry=self.prometheus_registry
),
'minio_api_request_duration_seconds': Histogram(
'minio_api_request_duration_seconds', 'API请求持续时间',
['method'], registry=self.prometheus_registry
),
'minio_bucket_count': Gauge(
'minio_bucket_count', '存储桶数量',
registry=self.prometheus_registry
),
'minio_object_count': Gauge(
'minio_object_count', '对象数量',
['bucket'], registry=self.prometheus_registry
),
'minio_bucket_size_bytes': Gauge(
'minio_bucket_size_bytes', '存储桶大小(字节)',
['bucket'], registry=self.prometheus_registry
),
'minio_disk_usage_percent': Gauge(
'minio_disk_usage_percent', '磁盘使用率',
registry=self.prometheus_registry
),
'minio_memory_usage_bytes': Gauge(
'minio_memory_usage_bytes', '内存使用量(字节)',
registry=self.prometheus_registry
),
'minio_cpu_usage_percent': Gauge(
'minio_cpu_usage_percent', 'CPU使用率',
registry=self.prometheus_registry
)
}
return metrics
def _init_database(self):
"""初始化数据库"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 创建指标表
cursor.execute('''
CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
value REAL NOT NULL,
timestamp DATETIME NOT NULL,
labels TEXT,
unit TEXT,
description TEXT
)
''')
# 创建告警表
cursor.execute('''
CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
rule_name TEXT NOT NULL,
level TEXT NOT NULL,
message TEXT NOT NULL,
triggered_at DATETIME NOT NULL,
resolved_at DATETIME,
status TEXT DEFAULT 'active'
)
''')
# 创建索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_metrics_name_timestamp ON metrics(name, timestamp)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_alerts_triggered_at ON alerts(triggered_at)')
conn.commit()
conn.close()
except Exception as e:
self.logger.error(f"初始化数据库失败: {e}")
def collect_system_metrics(self) -> Dict[str, MetricData]:
"""收集系统指标"""
try:
metrics = {}
current_time = datetime.now()
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
metrics['cpu_usage_percent'] = MetricData(
name='cpu_usage_percent',
value=cpu_percent,
timestamp=current_time,
unit='%',
description='CPU使用率'
)
# 内存使用情况
memory = psutil.virtual_memory()
metrics['memory_usage_bytes'] = MetricData(
name='memory_usage_bytes',
value=memory.used,
timestamp=current_time,
unit='bytes',
description='内存使用量'
)
metrics['memory_usage_percent'] = MetricData(
name='memory_usage_percent',
value=memory.percent,
timestamp=current_time,
unit='%',
description='内存使用率'
)
# 磁盘使用情况
disk = psutil.disk_usage('/')
metrics['disk_usage_bytes'] = MetricData(
name='disk_usage_bytes',
value=disk.used,
timestamp=current_time,
unit='bytes',
description='磁盘使用量'
)
metrics['disk_usage_percent'] = MetricData(
name='disk_usage_percent',
value=(disk.used / disk.total) * 100,
timestamp=current_time,
unit='%',
description='磁盘使用率'
)
# 网络IO
network = psutil.net_io_counters()
metrics['network_bytes_sent'] = MetricData(
name='network_bytes_sent',
value=network.bytes_sent,
timestamp=current_time,
unit='bytes',
description='网络发送字节数'
)
metrics['network_bytes_recv'] = MetricData(
name='network_bytes_recv',
value=network.bytes_recv,
timestamp=current_time,
unit='bytes',
description='网络接收字节数'
)
return metrics
except Exception as e:
self.logger.error(f"收集系统指标失败: {e}")
return {}
def collect_minio_metrics(self) -> Dict[str, MetricData]:
"""收集MinIO指标"""
try:
metrics = {}
current_time = datetime.now()
# 检查服务可用性
start_time = time.time()
try:
buckets = self.client.list_buckets()
service_available = 1
api_response_time = time.time() - start_time
except Exception:
service_available = 0
api_response_time = 0
buckets = []
metrics['service_available'] = MetricData(
name='service_available',
value=service_available,
timestamp=current_time,
description='MinIO服务可用性'
)
metrics['api_response_time'] = MetricData(
name='api_response_time',
value=api_response_time,
timestamp=current_time,
unit='seconds',
description='API响应时间'
)
# 存储桶统计
bucket_count = len(buckets)
metrics['bucket_count'] = MetricData(
name='bucket_count',
value=bucket_count,
timestamp=current_time,
description='存储桶数量'
)
# 统计每个存储桶的对象数量和大小
total_objects = 0
total_size = 0
for bucket in buckets:
try:
bucket_objects = 0
bucket_size = 0
# 限制检查的对象数量以避免性能问题
objects = list(self.client.list_objects(bucket.name, recursive=True))
bucket_objects = len(objects)
# 计算存储桶大小(限制检查的对象数量)
for obj in objects[:1000]: # 最多检查1000个对象
try:
stat = self.client.stat_object(bucket.name, obj.object_name)
bucket_size += stat.size
except:
pass
total_objects += bucket_objects
total_size += bucket_size
# 单个存储桶指标
metrics[f'bucket_objects_{bucket.name}'] = MetricData(
name='bucket_objects',
value=bucket_objects,
timestamp=current_time,
labels={'bucket': bucket.name},
description=f'存储桶 {bucket.name} 的对象数量'
)
metrics[f'bucket_size_{bucket.name}'] = MetricData(
name='bucket_size',
value=bucket_size,
timestamp=current_time,
labels={'bucket': bucket.name},
unit='bytes',
description=f'存储桶 {bucket.name} 的大小'
)
except Exception as bucket_error:
self.logger.warning(f"收集存储桶 {bucket.name} 指标失败: {bucket_error}")
metrics['total_objects'] = MetricData(
name='total_objects',
value=total_objects,
timestamp=current_time,
description='总对象数量'
)
metrics['total_size'] = MetricData(
name='total_size',
value=total_size,
timestamp=current_time,
unit='bytes',
description='总存储大小'
)
return metrics
except Exception as e:
self.logger.error(f"收集MinIO指标失败: {e}")
return {}
def collect_all_metrics(self) -> Dict[str, MetricData]:
"""收集所有指标"""
all_metrics = {}
# 收集系统指标
system_metrics = self.collect_system_metrics()
all_metrics.update(system_metrics)
# 收集MinIO指标
minio_metrics = self.collect_minio_metrics()
all_metrics.update(minio_metrics)
return all_metrics
def store_metrics(self, metrics: Dict[str, MetricData]):
"""存储指标到数据库"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
for metric in metrics.values():
labels_json = json.dumps(metric.labels) if metric.labels else None
cursor.execute('''
INSERT INTO metrics (name, value, timestamp, labels, unit, description)
VALUES (?, ?, ?, ?, ?, ?)
''', (
metric.name,
metric.value,
metric.timestamp,
labels_json,
metric.unit,
metric.description
))
conn.commit()
conn.close()
# 更新内存中的历史数据
for metric in metrics.values():
history = self.metrics_history[metric.name]
history.append((metric.timestamp, metric.value))
# 限制历史数据点数
while len(history) > self.max_history_points:
history.popleft()
except Exception as e:
self.logger.error(f"存储指标失败: {e}")
def update_prometheus_metrics(self, metrics: Dict[str, MetricData]):
"""更新Prometheus指标"""
try:
# 更新服务可用性
if 'service_available' in metrics:
self.prometheus_metrics['minio_up'].set(metrics['service_available'].value)
# 更新存储桶数量
if 'bucket_count' in metrics:
self.prometheus_metrics['minio_bucket_count'].set(metrics['bucket_count'].value)
# 更新系统指标
if 'cpu_usage_percent' in metrics:
self.prometheus_metrics['minio_cpu_usage_percent'].set(metrics['cpu_usage_percent'].value)
if 'memory_usage_bytes' in metrics:
self.prometheus_metrics['minio_memory_usage_bytes'].set(metrics['memory_usage_bytes'].value)
if 'disk_usage_percent' in metrics:
self.prometheus_metrics['minio_disk_usage_percent'].set(metrics['disk_usage_percent'].value)
# 更新存储桶特定指标
for metric_name, metric in metrics.items():
if metric.name == 'bucket_objects' and metric.labels:
bucket_name = metric.labels.get('bucket')
if bucket_name:
self.prometheus_metrics['minio_object_count'].labels(bucket=bucket_name).set(metric.value)
elif metric.name == 'bucket_size' and metric.labels:
bucket_name = metric.labels.get('bucket')
if bucket_name:
self.prometheus_metrics['minio_bucket_size_bytes'].labels(bucket=bucket_name).set(metric.value)
except Exception as e:
self.logger.error(f"更新Prometheus指标失败: {e}")
def add_alert_rule(self, rule: AlertRule):
"""添加告警规则"""
self.alert_rules.append(rule)
self.logger.info(f"添加告警规则: {rule.name}")
def check_alert_rules(self, metrics: Dict[str, MetricData]):
"""检查告警规则"""
try:
for rule in self.alert_rules:
if not rule.enabled:
continue
# 查找对应的指标
metric_found = False
for metric in metrics.values():
if metric.name == rule.metric:
metric_found = True
# 评估条件
triggered = self._evaluate_condition(
metric.value, rule.condition, rule.threshold
)
if triggered:
self._trigger_alert(rule, metric)
break
if not metric_found:
self.logger.warning(f"告警规则 {rule.name} 的指标 {rule.metric} 未找到")
except Exception as e:
self.logger.error(f"检查告警规则失败: {e}")
def _evaluate_condition(self, value: float, condition: str, threshold: float) -> bool:
"""评估告警条件"""
try:
if condition == '>':
return value > threshold
elif condition == '<':
return value < threshold
elif condition == '>=':
return value >= threshold
elif condition == '<=':
return value <= threshold
elif condition == '==':
return value == threshold
elif condition == '!=':
return value != threshold
else:
self.logger.error(f"不支持的条件: {condition}")
return False
except Exception as e:
self.logger.error(f"评估条件失败: {e}")
return False
def _trigger_alert(self, rule: AlertRule, metric: MetricData):
"""触发告警"""
try:
alert = {
'rule_name': rule.name,
'level': rule.level.value,
'message': rule.message.format(
metric_name=metric.name,
metric_value=metric.value,
threshold=rule.threshold,
timestamp=metric.timestamp
),
'triggered_at': datetime.now(),
'metric_value': metric.value,
'threshold': rule.threshold
}
# 存储告警到数据库
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO alerts (rule_name, level, message, triggered_at)
VALUES (?, ?, ?, ?)
''', (
alert['rule_name'],
alert['level'],
alert['message'],
alert['triggered_at']
))
conn.commit()
conn.close()
# 添加到历史记录
self.alert_history.append(alert)
# 发送通知
self._send_alert_notifications(alert)
self.logger.warning(f"触发告警: {alert['message']}")
except Exception as e:
self.logger.error(f"触发告警失败: {e}")
def _send_alert_notifications(self, alert: Dict[str, Any]):
"""发送告警通知"""
for handler in self.notification_handlers:
try:
handler(alert)
except Exception as e:
self.logger.error(f"发送告警通知失败: {e}")
def add_notification_handler(self, handler):
"""添加通知处理器"""
self.notification_handlers.append(handler)
def start_monitoring(self) -> Dict[str, Any]:
"""启动监控"""
try:
if self.monitoring_active:
return {
'success': False,
'error': '监控已在运行',
'monitoring_active': True
}
self.monitoring_active = True
def monitoring_loop():
while self.monitoring_active:
try:
# 收集指标
metrics = self.collect_all_metrics()
if metrics:
# 存储指标
self.store_metrics(metrics)
# 更新Prometheus指标
self.update_prometheus_metrics(metrics)
# 检查告警规则
self.check_alert_rules(metrics)
self.logger.info(f"收集了 {len(metrics)} 个指标")
time.sleep(self.monitor_interval)
except Exception as monitor_error:
self.logger.error(f"监控循环错误: {monitor_error}")
time.sleep(self.monitor_interval)
# 在后台线程中启动监控
monitor_thread = threading.Thread(target=monitoring_loop, daemon=True)
monitor_thread.start()
self.logger.info("监控系统已启动")
return {
'success': True,
'message': f'监控系统已启动,监控间隔: {self.monitor_interval}秒',
'monitoring_active': True,
'monitor_interval': self.monitor_interval,
'started_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f'启动监控失败: {e}',
'monitoring_active': False
}
def stop_monitoring(self) -> Dict[str, Any]:
"""停止监控"""
try:
if not self.monitoring_active:
return {
'success': False,
'error': '监控未在运行',
'monitoring_active': False
}
self.monitoring_active = False
self.logger.info("监控系统已停止")
return {
'success': True,
'message': '监控系统已停止',
'monitoring_active': False,
'stopped_at': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': f'停止监控失败: {e}',
'monitoring_active': self.monitoring_active
}
def get_metrics_history(self, metric_name: str, hours: int = 24) -> Dict[str, Any]:
"""获取指标历史数据"""
try:
end_time = datetime.now()
start_time = end_time - timedelta(hours=hours)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT timestamp, value FROM metrics
WHERE name = ? AND timestamp >= ? AND timestamp <= ?
ORDER BY timestamp
''', (metric_name, start_time, end_time))
rows = cursor.fetchall()
conn.close()
if not rows:
return {
'success': False,
'error': f'未找到指标 {metric_name} 的历史数据',
'metric_name': metric_name
}
timestamps = [datetime.fromisoformat(row[0]) for row in rows]
values = [row[1] for row in rows]
return {
'success': True,
'metric_name': metric_name,
'data_points': len(rows),
'start_time': start_time.isoformat(),
'end_time': end_time.isoformat(),
'timestamps': [ts.isoformat() for ts in timestamps],
'values': values,
'min_value': min(values),
'max_value': max(values),
'avg_value': sum(values) / len(values)
}
except Exception as e:
return {
'success': False,
'error': f'获取指标历史数据失败: {e}',
'metric_name': metric_name
}
def generate_report(self, hours: int = 24) -> Dict[str, Any]:
"""生成监控报告"""
try:
end_time = datetime.now()
start_time = end_time - timedelta(hours=hours)
report = {
'report_period': {
'start_time': start_time.isoformat(),
'end_time': end_time.isoformat(),
'duration_hours': hours
},
'system_summary': {},
'minio_summary': {},
'alerts_summary': {},
'recommendations': [],
'generated_at': datetime.now().isoformat()
}
# 系统指标摘要
system_metrics = ['cpu_usage_percent', 'memory_usage_percent', 'disk_usage_percent']
for metric in system_metrics:
history = self.get_metrics_history(metric, hours)
if history['success']:
report['system_summary'][metric] = {
'min': history['min_value'],
'max': history['max_value'],
'avg': history['avg_value'],
'current': history['values'][-1] if history['values'] else 0
}
# MinIO指标摘要
minio_metrics = ['service_available', 'api_response_time', 'bucket_count', 'total_objects']
for metric in minio_metrics:
history = self.get_metrics_history(metric, hours)
if history['success']:
report['minio_summary'][metric] = {
'min': history['min_value'],
'max': history['max_value'],
'avg': history['avg_value'],
'current': history['values'][-1] if history['values'] else 0
}
# 告警摘要
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT level, COUNT(*) FROM alerts
WHERE triggered_at >= ? AND triggered_at <= ?
GROUP BY level
''', (start_time, end_time))
alert_counts = dict(cursor.fetchall())
cursor.execute('''
SELECT COUNT(*) FROM alerts
WHERE triggered_at >= ? AND triggered_at <= ?
''', (start_time, end_time))
total_alerts = cursor.fetchone()[0]
conn.close()
report['alerts_summary'] = {
'total_alerts': total_alerts,
'by_level': alert_counts,
'critical_alerts': alert_counts.get('critical', 0),
'error_alerts': alert_counts.get('error', 0),
'warning_alerts': alert_counts.get('warning', 0)
}
# 生成建议
recommendations = []
# CPU使用率建议
if 'cpu_usage_percent' in report['system_summary']:
cpu_avg = report['system_summary']['cpu_usage_percent']['avg']
if cpu_avg > 80:
recommendations.append("CPU使用率过高,建议优化应用或增加计算资源")
elif cpu_avg < 10:
recommendations.append("CPU使用率较低,可以考虑减少资源分配")
# 内存使用率建议
if 'memory_usage_percent' in report['system_summary']:
memory_avg = report['system_summary']['memory_usage_percent']['avg']
if memory_avg > 85:
recommendations.append("内存使用率过高,建议增加内存或优化内存使用")
# 磁盘使用率建议
if 'disk_usage_percent' in report['system_summary']:
disk_avg = report['system_summary']['disk_usage_percent']['avg']
if disk_avg > 90:
recommendations.append("磁盘使用率过高,建议清理数据或扩展存储")
elif disk_avg > 80:
recommendations.append("磁盘使用率较高,建议监控存储增长趋势")
# API响应时间建议
if 'api_response_time' in report['minio_summary']:
api_avg = report['minio_summary']['api_response_time']['avg']
if api_avg > 1.0:
recommendations.append("API响应时间较慢,建议检查网络和服务器性能")
# 告警建议
if total_alerts > 0:
if alert_counts.get('critical', 0) > 0:
recommendations.append("存在严重告警,需要立即处理")
if alert_counts.get('error', 0) > 5:
recommendations.append("错误告警较多,建议检查系统配置")
report['recommendations'] = recommendations
return {
'success': True,
'report': report
}
except Exception as e:
return {
'success': False,
'error': f'生成监控报告失败: {e}'
}
## 8.2 告警和通知系统
### 8.2.1 告警规则配置
```python
class AlertManager:
"""告警管理器"""
def __init__(self, monitor: MinIOMonitor):
self.monitor = monitor
self.notification_channels = {}
self.alert_templates = {}
self.escalation_rules = []
self.silence_rules = []
def setup_default_alert_rules(self):
"""设置默认告警规则"""
default_rules = [
AlertRule(
name="MinIO服务不可用",
metric="service_available",
condition="<",
threshold=1,
duration=60,
level=AlertLevel.CRITICAL,
message="MinIO服务不可用,请立即检查服务状态"
),
AlertRule(
name="CPU使用率过高",
metric="cpu_usage_percent",
condition=">",
threshold=80,
duration=300,
level=AlertLevel.WARNING,
message="CPU使用率 {metric_value:.1f}% 超过阈值 {threshold}%"
),
AlertRule(
name="内存使用率过高",
metric="memory_usage_percent",
condition=">",
threshold=85,
duration=300,
level=AlertLevel.WARNING,
message="内存使用率 {metric_value:.1f}% 超过阈值 {threshold}%"
),
AlertRule(
name="磁盘使用率过高",
metric="disk_usage_percent",
condition=">",
threshold=90,
duration=600,
level=AlertLevel.ERROR,
message="磁盘使用率 {metric_value:.1f}% 超过阈值 {threshold}%"
),
AlertRule(
name="API响应时间过长",
metric="api_response_time",
condition=">",
threshold=2.0,
duration=180,
level=AlertLevel.WARNING,
message="API响应时间 {metric_value:.2f}秒 超过阈值 {threshold}秒"
)
]
for rule in default_rules:
self.monitor.add_alert_rule(rule)
return {
'success': True,
'message': f'已设置 {len(default_rules)} 个默认告警规则',
'rules_count': len(default_rules)
}
def add_email_notification(self, smtp_server: str, smtp_port: int,
username: str, password: str,
recipients: List[str]) -> Dict[str, Any]:
"""添加邮件通知"""
try:
def email_handler(alert: Dict[str, Any]):
try:
msg = MimeMultipart()
msg['From'] = username
msg['To'] = ', '.join(recipients)
msg['Subject'] = f"[MinIO告警] {alert['level'].upper()}: {alert['rule_name']}"
body = f"""
告警详情:
告警规则:{alert['rule_name']}
告警级别:{alert['level'].upper()}
告警消息:{alert['message']}
触发时间:{alert['triggered_at']}
指标值:{alert.get('metric_value', 'N/A')}
阈值:{alert.get('threshold', 'N/A')}
请及时处理此告警。
-- MinIO监控系统
"""
msg.attach(MimeText(body, 'plain', 'utf-8'))
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(username, password)
server.send_message(msg)
server.quit()
except Exception as e:
self.monitor.logger.error(f"发送邮件通知失败: {e}")
self.monitor.add_notification_handler(email_handler)
self.notification_channels['email'] = {
'type': 'email',
'smtp_server': smtp_server,
'smtp_port': smtp_port,
'username': username,
'recipients': recipients,
'enabled': True
}
return {
'success': True,
'message': f'邮件通知已配置,收件人: {len(recipients)}个',
'recipients': recipients
}
except Exception as e:
return {
'success': False,
'error': f'配置邮件通知失败: {e}'
}
def add_webhook_notification(self, webhook_url: str,
headers: Dict[str, str] = None) -> Dict[str, Any]:
"""添加Webhook通知"""
try:
def webhook_handler(alert: Dict[str, Any]):
try:
payload = {
'alert_rule': alert['rule_name'],
'level': alert['level'],
'message': alert['message'],
'triggered_at': alert['triggered_at'].isoformat(),
'metric_value': alert.get('metric_value'),
'threshold': alert.get('threshold'),
'source': 'MinIO监控系统'
}
response = requests.post(
webhook_url,
json=payload,
headers=headers or {},
timeout=10
)
if response.status_code != 200:
self.monitor.logger.error(
f"Webhook通知失败,状态码: {response.status_code}"
)
except Exception as e:
self.monitor.logger.error(f"发送Webhook通知失败: {e}")
self.monitor.add_notification_handler(webhook_handler)
self.notification_channels['webhook'] = {
'type': 'webhook',
'url': webhook_url,
'headers': headers or {},
'enabled': True
}
return {
'success': True,
'message': f'Webhook通知已配置: {webhook_url}',
'webhook_url': webhook_url
}
except Exception as e:
return {
'success': False,
'error': f'配置Webhook通知失败: {e}'
}
## 8.3 实际应用示例
### 8.3.1 完整的监控系统部署
```python
class MinIOMonitoringSystem:
"""MinIO完整监控系统"""
def __init__(self):
self.monitor = None
self.alert_manager = None
self.prometheus_port = 8000
self.dashboard_data = {}
def setup_monitoring_system(self, endpoint: str, access_key: str,
secret_key: str) -> Dict[str, Any]:
"""设置监控系统"""
try:
print("=== MinIO监控系统设置 ===")
# 1. 初始化监控器
print("\n1. 初始化监控器...")
self.monitor = MinIOMonitor(endpoint, access_key, secret_key)
print("✅ 监控器初始化成功")
# 2. 设置告警管理器
print("\n2. 设置告警管理器...")
self.alert_manager = AlertManager(self.monitor)
# 设置默认告警规则
rules_result = self.alert_manager.setup_default_alert_rules()
print(f"✅ 已设置 {rules_result['rules_count']} 个告警规则")
# 3. 配置通知渠道
print("\n3. 配置通知渠道...")
# 配置邮件通知(示例)
email_result = self.alert_manager.add_email_notification(
smtp_server='smtp.gmail.com',
smtp_port=587,
username='your-email@gmail.com',
password='your-app-password',
recipients=['admin@company.com', 'ops@company.com']
)
if email_result['success']:
print(f"✅ 邮件通知已配置: {len(email_result['recipients'])}个收件人")
else:
print(f"⚠️ 邮件通知配置失败: {email_result['error']}")
# 配置Webhook通知(示例)
webhook_result = self.alert_manager.add_webhook_notification(
webhook_url='https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK',
headers={'Content-Type': 'application/json'}
)
if webhook_result['success']:
print("✅ Webhook通知已配置")
else:
print(f"⚠️ Webhook通知配置失败: {webhook_result['error']}")
# 4. 启动Prometheus指标服务器
print("\n4. 启动Prometheus指标服务器...")
try:
start_http_server(self.prometheus_port, registry=self.monitor.prometheus_registry)
print(f"✅ Prometheus指标服务器已启动: http://localhost:{self.prometheus_port}")
except Exception as prometheus_error:
print(f"⚠️ Prometheus服务器启动失败: {prometheus_error}")
# 5. 启动监控
print("\n5. 启动监控...")
monitor_result = self.monitor.start_monitoring()
if monitor_result['success']:
print(f"✅ 监控系统已启动,监控间隔: {monitor_result['monitor_interval']}秒")
else:
print(f"❌ 监控启动失败: {monitor_result['error']}")
return monitor_result
print("\n=== 监控系统设置完成 ===")
print("监控功能:")
print("- 系统指标监控 (CPU, 内存, 磁盘, 网络)")
print("- MinIO服务监控 (可用性, 响应时间, 存储统计)")
print("- 自动告警和通知")
print(f"- Prometheus指标导出: http://localhost:{self.prometheus_port}")
print("- 历史数据存储和分析")
return {
'success': True,
'message': '监控系统设置完成',
'prometheus_url': f'http://localhost:{self.prometheus_port}',
'monitor_interval': monitor_result['monitor_interval'],
'alert_rules_count': rules_result['rules_count'],
'notification_channels': list(self.alert_manager.notification_channels.keys())
}
except Exception as e:
return {
'success': False,
'error': f'设置监控系统失败: {e}'
}
def get_dashboard_data(self) -> Dict[str, Any]:
"""获取仪表板数据"""
try:
if not self.monitor:
return {
'success': False,
'error': '监控器未初始化'
}
# 收集当前指标
current_metrics = self.monitor.collect_all_metrics()
# 获取历史数据
key_metrics = [
'cpu_usage_percent', 'memory_usage_percent', 'disk_usage_percent',
'service_available', 'api_response_time', 'bucket_count', 'total_objects'
]
historical_data = {}
for metric in key_metrics:
history = self.monitor.get_metrics_history(metric, hours=1)
if history['success']:
historical_data[metric] = {
'timestamps': history['timestamps'][-60:], # 最近60个数据点
'values': history['values'][-60:],
'current': history['values'][-1] if history['values'] else 0
}
# 获取最近的告警
conn = sqlite3.connect(self.monitor.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT rule_name, level, message, triggered_at
FROM alerts
WHERE triggered_at >= datetime('now', '-1 hour')
ORDER BY triggered_at DESC
LIMIT 10
''')
recent_alerts = []
for row in cursor.fetchall():
recent_alerts.append({
'rule_name': row[0],
'level': row[1],
'message': row[2],
'triggered_at': row[3]
})
conn.close()
dashboard_data = {
'current_metrics': {
name: {
'value': metric.value,
'unit': metric.unit,
'timestamp': metric.timestamp.isoformat()
} for name, metric in current_metrics.items()
},
'historical_data': historical_data,
'recent_alerts': recent_alerts,
'system_status': {
'monitoring_active': self.monitor.monitoring_active,
'prometheus_port': self.prometheus_port,
'alert_rules_count': len(self.monitor.alert_rules),
'notification_channels': len(self.alert_manager.notification_channels) if self.alert_manager else 0
},
'updated_at': datetime.now().isoformat()
}
self.dashboard_data = dashboard_data
return {
'success': True,
'dashboard_data': dashboard_data
}
except Exception as e:
return {
'success': False,
'error': f'获取仪表板数据失败: {e}'
}
def generate_monitoring_report(self, hours: int = 24) -> Dict[str, Any]:
"""生成监控报告"""
try:
if not self.monitor:
return {
'success': False,
'error': '监控器未初始化'
}
return self.monitor.generate_report(hours)
except Exception as e:
return {
'success': False,
'error': f'生成监控报告失败: {e}'
}
def stop_monitoring_system(self) -> Dict[str, Any]:
"""停止监控系统"""
try:
if not self.monitor:
return {
'success': False,
'error': '监控器未初始化'
}
result = self.monitor.stop_monitoring()
if result['success']:
print("监控系统已停止")
return result
except Exception as e:
return {
'success': False,
'error': f'停止监控系统失败: {e}'
}
# 使用示例
if __name__ == "__main__":
# 创建监控系统
monitoring_system = MinIOMonitoringSystem()
# 设置监控系统
result = monitoring_system.setup_monitoring_system(
endpoint='localhost:9000',
access_key='minioadmin',
secret_key='minioadmin123'
)
if result['success']:
print("\n监控系统设置成功!")
print(f"Prometheus指标: {result['prometheus_url']}")
# 运行一段时间后生成报告
import time
print("\n等待60秒收集数据...")
time.sleep(60)
# 获取仪表板数据
dashboard_result = monitoring_system.get_dashboard_data()
if dashboard_result['success']:
print("\n仪表板数据获取成功")
# 生成报告
report_result = monitoring_system.generate_monitoring_report(hours=1)
if report_result['success']:
print("\n监控报告生成成功")
else:
print(f"\n监控系统设置失败: {result['error']}")
8.4 总结
本章详细介绍了MinIO的监控和运维管理,主要内容包括:
8.4.1 核心功能
全面监控
- 系统级指标监控
- MinIO服务监控
- 实时数据收集
- 历史数据存储
告警系统
- 灵活的告警规则
- 多级别告警
- 多渠道通知
- 告警历史记录
数据分析
- 指标趋势分析
- 性能报告生成
- 问题诊断建议
- 仪表板展示
8.4.2 技术特点
- 实时监控:持续收集和分析系统指标
- 智能告警:基于规则的自动告警机制
- 多渠道通知:支持邮件、Webhook等通知方式
- 数据持久化:SQLite数据库存储历史数据
- Prometheus集成:标准化指标导出
- 可视化支持:提供仪表板数据接口
8.4.3 最佳实践
- 监控策略:制定全面的监控策略和指标体系
- 告警配置:合理设置告警阈值和通知策略
- 数据保留:根据需求配置历史数据保留策略
- 性能优化:定期分析监控数据优化系统性能
- 故障预防:通过监控数据预防潜在问题
下一章我们将介绍MinIO的性能优化和调优,包括存储优化、网络优化、查询优化等内容。