概述
仪表板是Grafana的核心功能,它将来自不同数据源的数据以可视化的方式呈现给用户。一个设计良好的仪表板能够帮助用户快速理解数据趋势、发现问题并做出决策。本章将详细介绍如何创建和设计高效、美观的Grafana仪表板。
学习目标
通过本章学习,您将能够:
掌握仪表板基础概念
- 理解仪表板的组成结构
- 了解面板类型和用途
- 掌握布局设计原则
创建和配置面板
- 学会添加和配置各种类型的面板
- 掌握查询编辑器的使用
- 了解数据转换和处理
设计用户友好的界面
- 应用可视化设计最佳实践
- 创建响应式布局
- 实现交互式功能
优化仪表板性能
- 减少查询负载
- 优化刷新策略
- 提高用户体验
仪表板基础概念
1. 仪表板架构
from enum import Enum
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime
class PanelType(Enum):
"""面板类型枚举"""
GRAPH = "graph"
SINGLESTAT = "singlestat"
TABLE = "table"
HEATMAP = "heatmap"
GAUGE = "gauge"
BAR_GAUGE = "bargauge"
STAT = "stat"
PIE_CHART = "piechart"
WORLDMAP = "worldmap"
TEXT = "text"
LOGS = "logs"
NODE_GRAPH = "nodeGraph"
ALERT_LIST = "alertlist"
DASHBOARD_LIST = "dashlist"
PLUGIN_LIST = "pluginlist"
class VisualizationType(Enum):
"""可视化类型枚举"""
TIME_SERIES = "timeseries"
BAR_CHART = "barchart"
HISTOGRAM = "histogram"
CANDLESTICK = "candlestick"
STATE_TIMELINE = "state-timeline"
STATUS_HISTORY = "status-history"
FLAME_GRAPH = "flamegraph"
TRACE_VIEW = "traces"
class RefreshInterval(Enum):
"""刷新间隔枚举"""
OFF = ""
FIVE_SECONDS = "5s"
TEN_SECONDS = "10s"
THIRTY_SECONDS = "30s"
ONE_MINUTE = "1m"
FIVE_MINUTES = "5m"
FIFTEEN_MINUTES = "15m"
THIRTY_MINUTES = "30m"
ONE_HOUR = "1h"
TWO_HOURS = "2h"
ONE_DAY = "1d"
@dataclass
class GridPos:
"""面板网格位置"""
x: int # X坐标 (0-24)
y: int # Y坐标
w: int # 宽度 (1-24)
h: int # 高度
@dataclass
class TimeRange:
"""时间范围"""
from_time: str # 开始时间
to_time: str # 结束时间
@classmethod
def last_hours(cls, hours: int):
return cls(f"now-{hours}h", "now")
@classmethod
def last_days(cls, days: int):
return cls(f"now-{days}d", "now")
@classmethod
def custom_range(cls, start: str, end: str):
return cls(start, end)
@dataclass
class Target:
"""查询目标"""
expr: str # 查询表达式
datasource: str # 数据源
legend_format: str = "" # 图例格式
interval: str = "" # 查询间隔
instant: bool = False # 即时查询
ref_id: str = "A" # 引用ID
hide: bool = False # 是否隐藏
@dataclass
class Panel:
"""面板配置"""
id: int
title: str
type: PanelType
grid_pos: GridPos
targets: List[Target]
datasource: Optional[str] = None
description: str = ""
transparent: bool = False
options: Dict[str, Any] = None
field_config: Dict[str, Any] = None
def __post_init__(self):
if self.options is None:
self.options = {}
if self.field_config is None:
self.field_config = {}
@dataclass
class Variable:
"""仪表板变量"""
name: str
type: str # query, custom, constant, datasource, interval, textbox
label: str = ""
description: str = ""
query: str = ""
datasource: str = ""
multi: bool = False # 是否支持多选
include_all: bool = False # 是否包含"All"选项
all_value: str = ""
current: Dict[str, Any] = None
options: List[Dict[str, Any]] = None
def __post_init__(self):
if self.current is None:
self.current = {}
if self.options is None:
self.options = []
@dataclass
class Dashboard:
"""仪表板配置"""
id: Optional[int]
uid: str
title: str
description: str = ""
tags: List[str] = None
panels: List[Panel] = None
templating: List[Variable] = None
time: TimeRange = None
refresh: RefreshInterval = RefreshInterval.FIVE_MINUTES
timezone: str = "browser"
editable: bool = True
graph_tooltip: str = "shared_crosshair" # default, shared_crosshair, shared_tooltip
def __post_init__(self):
if self.tags is None:
self.tags = []
if self.panels is None:
self.panels = []
if self.templating is None:
self.templating = []
if self.time is None:
self.time = TimeRange.last_hours(6)
class DashboardManager:
"""仪表板管理器"""
def __init__(self):
self.dashboards: Dict[str, Dashboard] = {}
self.panel_id_counter = 1
def create_dashboard(self, title: str, description: str = "", tags: List[str] = None) -> Dashboard:
"""创建新仪表板"""
import uuid
uid = str(uuid.uuid4())[:8]
dashboard = Dashboard(
id=None,
uid=uid,
title=title,
description=description,
tags=tags or []
)
self.dashboards[uid] = dashboard
return dashboard
def add_panel(self, dashboard: Dashboard, panel_config: Dict) -> Panel:
"""添加面板到仪表板"""
panel = Panel(
id=self.panel_id_counter,
title=panel_config.get("title", f"Panel {self.panel_id_counter}"),
type=PanelType(panel_config.get("type", "graph")),
grid_pos=GridPos(
x=panel_config.get("x", 0),
y=panel_config.get("y", 0),
w=panel_config.get("w", 12),
h=panel_config.get("h", 8)
),
targets=panel_config.get("targets", []),
datasource=panel_config.get("datasource"),
description=panel_config.get("description", ""),
transparent=panel_config.get("transparent", False),
options=panel_config.get("options", {}),
field_config=panel_config.get("field_config", {})
)
dashboard.panels.append(panel)
self.panel_id_counter += 1
return panel
def add_variable(self, dashboard: Dashboard, var_config: Dict) -> Variable:
"""添加变量到仪表板"""
variable = Variable(
name=var_config["name"],
type=var_config["type"],
label=var_config.get("label", ""),
description=var_config.get("description", ""),
query=var_config.get("query", ""),
datasource=var_config.get("datasource", ""),
multi=var_config.get("multi", False),
include_all=var_config.get("include_all", False),
all_value=var_config.get("all_value", ""),
current=var_config.get("current", {}),
options=var_config.get("options", [])
)
dashboard.templating.append(variable)
return variable
def get_dashboard_json(self, dashboard: Dashboard) -> Dict:
"""获取仪表板JSON配置"""
return {
"id": dashboard.id,
"uid": dashboard.uid,
"title": dashboard.title,
"description": dashboard.description,
"tags": dashboard.tags,
"timezone": dashboard.timezone,
"editable": dashboard.editable,
"graphTooltip": dashboard.graph_tooltip,
"time": {
"from": dashboard.time.from_time,
"to": dashboard.time.to_time
},
"refresh": dashboard.refresh.value,
"panels": [
self._panel_to_dict(panel) for panel in dashboard.panels
],
"templating": {
"list": [
self._variable_to_dict(var) for var in dashboard.templating
]
},
"version": 1,
"schemaVersion": 30
}
def _panel_to_dict(self, panel: Panel) -> Dict:
"""将面板转换为字典"""
return {
"id": panel.id,
"title": panel.title,
"type": panel.type.value,
"gridPos": {
"x": panel.grid_pos.x,
"y": panel.grid_pos.y,
"w": panel.grid_pos.w,
"h": panel.grid_pos.h
},
"targets": [
{
"expr": target.expr,
"datasource": target.datasource,
"legendFormat": target.legend_format,
"interval": target.interval,
"instant": target.instant,
"refId": target.ref_id,
"hide": target.hide
} for target in panel.targets
],
"datasource": panel.datasource,
"description": panel.description,
"transparent": panel.transparent,
"options": panel.options,
"fieldConfig": panel.field_config
}
def _variable_to_dict(self, variable: Variable) -> Dict:
"""将变量转换为字典"""
return {
"name": variable.name,
"type": variable.type,
"label": variable.label,
"description": variable.description,
"query": variable.query,
"datasource": variable.datasource,
"multi": variable.multi,
"includeAll": variable.include_all,
"allValue": variable.all_value,
"current": variable.current,
"options": variable.options
}
def calculate_layout(self, panels_config: List[Dict]) -> List[GridPos]:
"""计算面板布局"""
positions = []
current_y = 0
current_x = 0
row_height = 0
for config in panels_config:
width = config.get("width", 12)
height = config.get("height", 8)
# 检查是否需要换行
if current_x + width > 24:
current_y += row_height
current_x = 0
row_height = 0
positions.append(GridPos(
x=current_x,
y=current_y,
w=width,
h=height
))
current_x += width
row_height = max(row_height, height)
return positions
# 使用示例
manager = DashboardManager()
# 创建仪表板
dashboard = manager.create_dashboard(
title="系统监控仪表板",
description="监控系统性能和健康状态",
tags=["monitoring", "system", "performance"]
)
# 添加变量
manager.add_variable(dashboard, {
"name": "instance",
"type": "query",
"label": "实例",
"query": "label_values(up, instance)",
"datasource": "prometheus",
"multi": True,
"include_all": True
})
# 添加面板
manager.add_panel(dashboard, {
"title": "CPU使用率",
"type": "timeseries",
"x": 0, "y": 0, "w": 12, "h": 8,
"targets": [
Target(
expr="100 - (avg(rate(node_cpu_seconds_total{mode='idle',instance=~'$instance'}[5m])) * 100)",
datasource="prometheus",
legend_format="CPU使用率"
)
],
"datasource": "prometheus"
})
manager.add_panel(dashboard, {
"title": "内存使用率",
"type": "gauge",
"x": 12, "y": 0, "w": 12, "h": 8,
"targets": [
Target(
expr="(1 - (node_memory_MemAvailable_bytes{instance=~'$instance'} / node_memory_MemTotal_bytes{instance=~'$instance'})) * 100",
datasource="prometheus",
legend_format="内存使用率"
)
],
"datasource": "prometheus"
})
# 生成JSON配置
dashboard_json = manager.get_dashboard_json(dashboard)
print("仪表板配置:", dashboard_json)
2. 面板类型详解
class PanelTypeGuide:
"""面板类型使用指南"""
def __init__(self):
self.panel_guides = {
"timeseries": {
"description": "时间序列图表,显示数据随时间的变化",
"use_cases": [
"监控指标趋势",
"性能数据分析",
"业务指标跟踪",
"异常检测"
],
"best_practices": [
"使用合适的时间范围",
"选择恰当的聚合函数",
"设置有意义的图例",
"配置阈值线"
],
"configuration": {
"legend": {
"displayMode": "table",
"placement": "bottom",
"values": ["min", "max", "mean", "current"]
},
"tooltip": {
"mode": "multi",
"sort": "desc"
},
"axes": {
"left": {
"unit": "percent",
"min": 0,
"max": 100
}
}
}
},
"stat": {
"description": "单值显示,突出显示关键指标",
"use_cases": [
"KPI指标展示",
"当前状态显示",
"计数器显示",
"百分比显示"
],
"best_practices": [
"使用大字体显示主要数值",
"添加趋势指示器",
"设置颜色阈值",
"包含上下文信息"
],
"configuration": {
"reduceOptions": {
"values": False,
"calcs": ["lastNotNull"],
"fields": ""
},
"textMode": "auto",
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto"
}
},
"gauge": {
"description": "仪表盘显示,直观显示数值在范围内的位置",
"use_cases": [
"百分比显示",
"容量使用率",
"性能指标",
"健康评分"
],
"best_practices": [
"设置合理的最小最大值",
"使用颜色区间表示状态",
"添加阈值标记",
"保持简洁的标签"
],
"configuration": {
"min": 0,
"max": 100,
"thresholds": {
"steps": [
{"color": "green", "value": 0},
{"color": "yellow", "value": 70},
{"color": "red", "value": 90}
]
}
}
},
"table": {
"description": "表格显示,展示结构化数据",
"use_cases": [
"详细数据列表",
"多维度数据对比",
"日志记录显示",
"配置信息展示"
],
"best_practices": [
"合理设置列宽",
"使用排序功能",
"添加过滤器",
"高亮重要数据"
],
"configuration": {
"showHeader": True,
"sortBy": [{"desc": True, "displayName": "Time"}],
"styles": [
{
"pattern": "Time",
"type": "date",
"dateFormat": "YYYY-MM-DD HH:mm:ss"
}
]
}
},
"heatmap": {
"description": "热力图,显示数据密度和分布",
"use_cases": [
"响应时间分布",
"用户活动模式",
"错误率分析",
"资源使用模式"
],
"best_practices": [
"选择合适的颜色方案",
"设置恰当的桶大小",
"添加数值标签",
"使用对数刻度(如需要)"
],
"configuration": {
"xAxis": {
"show": True
},
"yAxis": {
"show": True,
"logBase": 1
},
"color": {
"mode": "spectrum",
"cardColor": "#b4ff00",
"colorScale": "sqrt",
"exponent": 0.5,
"colorScheme": "interpolateOranges"
}
}
},
"piechart": {
"description": "饼图,显示部分与整体的关系",
"use_cases": [
"资源分配显示",
"分类数据占比",
"状态分布",
"成本分析"
],
"best_practices": [
"限制分类数量(<7个)",
"使用对比鲜明的颜色",
"显示百分比标签",
"考虑使用环形图"
],
"configuration": {
"pieType": "pie", # pie, donut
"tooltipMode": "single",
"legendDisplayMode": "table",
"legendPlacement": "right",
"legendValues": ["value", "percent"]
}
}
}
def get_panel_guide(self, panel_type: str) -> Dict:
"""获取面板类型指南"""
return self.panel_guides.get(panel_type, {
"description": "未知面板类型",
"use_cases": [],
"best_practices": [],
"configuration": {}
})
def recommend_panel_type(self, data_characteristics: Dict) -> List[str]:
"""根据数据特征推荐面板类型"""
recommendations = []
# 时间序列数据
if data_characteristics.get("has_time_dimension", False):
recommendations.append("timeseries")
# 单一数值
if data_characteristics.get("single_value", False):
recommendations.extend(["stat", "gauge"])
# 分类数据
if data_characteristics.get("categorical", False):
recommendations.extend(["piechart", "table"])
# 多维数据
if data_characteristics.get("multi_dimensional", False):
recommendations.extend(["table", "heatmap"])
# 分布数据
if data_characteristics.get("distribution", False):
recommendations.extend(["heatmap", "histogram"])
return recommendations
def generate_panel_config(self, panel_type: str, data_source: str, query: str) -> Dict:
"""生成面板配置模板"""
base_config = {
"type": panel_type,
"datasource": data_source,
"targets": [
{
"expr": query,
"refId": "A"
}
]
}
# 添加类型特定配置
guide = self.get_panel_guide(panel_type)
if guide.get("configuration"):
base_config.update(guide["configuration"])
return base_config
# 使用示例
guide = PanelTypeGuide()
# 获取面板类型指南
timeseries_guide = guide.get_panel_guide("timeseries")
print("时间序列面板指南:", timeseries_guide)
# 推荐面板类型
data_chars = {
"has_time_dimension": True,
"single_value": False,
"categorical": False,
"multi_dimensional": False,
"distribution": False
}
recommendations = guide.recommend_panel_type(data_chars)
print("推荐面板类型:", recommendations)
# 生成面板配置
config = guide.generate_panel_config(
"timeseries",
"prometheus",
"rate(http_requests_total[5m])"
)
print("面板配置:", config)
查询编辑器与数据处理
1. 查询编辑器使用
class QueryEditor:
"""查询编辑器管理器"""
def __init__(self):
self.query_templates = {
"prometheus": {
"cpu_usage": "100 - (avg(rate(node_cpu_seconds_total{mode='idle'}[5m])) * 100)",
"memory_usage": "(1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100",
"disk_usage": "100 - ((node_filesystem_avail_bytes * 100) / node_filesystem_size_bytes)",
"network_traffic": "rate(node_network_receive_bytes_total[5m])",
"http_requests": "rate(http_requests_total[5m])",
"error_rate": "rate(http_requests_total{status=~'5..'}[5m]) / rate(http_requests_total[5m]) * 100",
"response_time": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))"
},
"mysql": {
"connections": "SELECT COUNT(*) FROM information_schema.processlist",
"slow_queries": "SHOW GLOBAL STATUS LIKE 'Slow_queries'",
"qps": "SHOW GLOBAL STATUS LIKE 'Questions'",
"innodb_buffer": "SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_pages_total'",
"table_locks": "SHOW GLOBAL STATUS LIKE 'Table_locks_waited'"
},
"influxdb": {
"cpu_usage": "SELECT mean(usage_idle) FROM cpu WHERE time >= now() - 1h GROUP BY time(5m)",
"memory_usage": "SELECT mean(used_percent) FROM mem WHERE time >= now() - 1h GROUP BY time(5m)",
"disk_io": "SELECT mean(read_bytes), mean(write_bytes) FROM diskio WHERE time >= now() - 1h GROUP BY time(5m)"
},
"elasticsearch": {
"log_count": '{"query": {"range": {"@timestamp": {"gte": "now-1h"}}}}',
"error_logs": '{"query": {"bool": {"must": [{"range": {"@timestamp": {"gte": "now-1h"}}}, {"term": {"level": "error"}}]}}}',
"response_codes": '{"aggs": {"status_codes": {"terms": {"field": "response_code"}}}}'
}
}
self.query_functions = {
"prometheus": {
"aggregation": ["sum", "avg", "min", "max", "count", "stddev", "stdvar"],
"rate": ["rate", "irate", "increase", "delta"],
"time": ["time", "hour", "minute", "day_of_month", "day_of_week", "days_in_month"],
"math": ["abs", "ceil", "floor", "round", "sqrt", "exp", "ln", "log2", "log10"],
"prediction": ["predict_linear", "holt_winters"],
"histogram": ["histogram_quantile", "histogram_count", "histogram_sum"]
},
"mysql": {
"aggregation": ["COUNT", "SUM", "AVG", "MIN", "MAX", "GROUP_CONCAT"],
"string": ["CONCAT", "SUBSTRING", "LENGTH", "UPPER", "LOWER", "TRIM"],
"date": ["NOW", "DATE", "TIME", "YEAR", "MONTH", "DAY", "DATE_FORMAT"],
"math": ["ABS", "CEIL", "FLOOR", "ROUND", "SQRT", "POW", "MOD"]
}
}
def get_query_template(self, datasource: str, metric_type: str) -> str:
"""获取查询模板"""
return self.query_templates.get(datasource, {}).get(metric_type, "")
def get_available_functions(self, datasource: str) -> Dict[str, List[str]]:
"""获取可用函数列表"""
return self.query_functions.get(datasource, {})
def build_prometheus_query(self, metric: str, filters: Dict = None,
aggregation: str = None, rate_interval: str = "5m") -> str:
"""构建Prometheus查询"""
query = metric
# 添加过滤器
if filters:
filter_parts = []
for key, value in filters.items():
if isinstance(value, list):
filter_parts.append(f'{key}=~"{"||".join(value)}"')
else:
filter_parts.append(f'{key}="{value}"')
if filter_parts:
query += "{" + ",".join(filter_parts) + "}"
# 添加rate函数
if "_total" in metric or "_count" in metric:
query = f"rate({query}[{rate_interval}])"
# 添加聚合函数
if aggregation:
query = f"{aggregation}({query})"
return query
def build_mysql_query(self, table: str, columns: List[str],
conditions: Dict = None, group_by: List[str] = None,
order_by: str = None, limit: int = None) -> str:
"""构建MySQL查询"""
query = f"SELECT {', '.join(columns)} FROM {table}"
# 添加WHERE条件
if conditions:
where_parts = []
for key, value in conditions.items():
if isinstance(value, list):
where_parts.append(f"{key} IN ({', '.join(map(str, value))})")
else:
where_parts.append(f"{key} = '{value}'")
if where_parts:
query += " WHERE " + " AND ".join(where_parts)
# 添加GROUP BY
if group_by:
query += f" GROUP BY {', '.join(group_by)}"
# 添加ORDER BY
if order_by:
query += f" ORDER BY {order_by}"
# 添加LIMIT
if limit:
query += f" LIMIT {limit}"
return query
def validate_query(self, datasource: str, query: str) -> Dict:
"""验证查询语法"""
validation_result = {
"valid": True,
"errors": [],
"warnings": [],
"suggestions": []
}
if datasource == "prometheus":
# 检查基本语法
if not query.strip():
validation_result["valid"] = False
validation_result["errors"].append("查询不能为空")
# 检查括号匹配
if query.count("(") != query.count(")"):
validation_result["valid"] = False
validation_result["errors"].append("括号不匹配")
# 检查常见错误
if "rate(" in query and "[" not in query:
validation_result["warnings"].append("rate函数需要时间范围参数")
# 性能建议
if ".*" in query:
validation_result["suggestions"].append("避免使用正则表达式.*,考虑更具体的标签匹配")
if not any(interval in query for interval in ["[1m]", "[5m]", "[15m]", "[1h]"]):
if any(func in query for func in ["rate", "irate", "increase"]):
validation_result["suggestions"].append("建议为rate函数指定合适的时间间隔")
elif datasource == "mysql":
# 检查SQL基本语法
if not query.strip().upper().startswith(("SELECT", "SHOW", "DESCRIBE")):
validation_result["warnings"].append("建议使用只读查询(SELECT, SHOW, DESCRIBE)")
# 检查是否有LIMIT
if "SELECT" in query.upper() and "LIMIT" not in query.upper():
validation_result["suggestions"].append("建议添加LIMIT子句以限制返回结果数量")
return validation_result
def optimize_query(self, datasource: str, query: str) -> Dict:
"""优化查询性能"""
optimization = {
"original_query": query,
"optimized_query": query,
"optimizations": [],
"performance_impact": "low"
}
if datasource == "prometheus":
optimized = query
# 优化聚合顺序
if "sum(rate(" in query:
# 建议先聚合再计算rate
optimization["optimizations"].append("考虑调整聚合函数顺序以提高性能")
# 优化时间范围
if "[1s]" in query:
optimized = optimized.replace("[1s]", "[5s]")
optimization["optimizations"].append("增加时间间隔从1s到5s以减少计算负载")
# 添加标签过滤
if "{" not in query and "up" not in query:
optimization["optimizations"].append("建议添加标签过滤器以减少查询范围")
optimization["optimized_query"] = optimized
elif datasource == "mysql":
optimized = query
# 添加索引建议
if "WHERE" in query.upper():
optimization["optimizations"].append("确保WHERE子句中的列有适当的索引")
# 限制结果集
if "LIMIT" not in query.upper() and "SELECT" in query.upper():
optimized += " LIMIT 1000"
optimization["optimizations"].append("添加LIMIT子句限制结果集大小")
optimization["optimized_query"] = optimized
return optimization
# 使用示例
editor = QueryEditor()
# 获取查询模板
cpu_query = editor.get_query_template("prometheus", "cpu_usage")
print("CPU使用率查询:", cpu_query)
# 构建Prometheus查询
query = editor.build_prometheus_query(
"http_requests_total",
filters={"job": "api-server", "status": ["200", "201"]},
aggregation="sum",
rate_interval="5m"
)
print("构建的查询:", query)
# 验证查询
validation = editor.validate_query("prometheus", query)
print("查询验证:", validation)
# 优化查询
optimization = editor.optimize_query("prometheus", query)
print("查询优化:", optimization)
2. 数据转换与处理
class DataTransformer:
"""数据转换器"""
def __init__(self):
self.transformations = {
"reduce": {
"description": "数据聚合,将多个值减少为单个值",
"functions": ["last", "lastNotNull", "first", "firstNotNull", "min", "max", "mean", "sum", "count", "range", "diff", "stdDev"]
},
"filter": {
"description": "数据过滤,根据条件筛选数据",
"types": ["byName", "byRegex", "byType", "byValue"]
},
"organize": {
"description": "重新组织数据结构",
"operations": ["rename", "hide", "reorder"]
},
"calculate": {
"description": "计算新字段",
"functions": ["binary", "unary", "window"]
},
"convert": {
"description": "数据类型转换",
"types": ["string", "number", "time", "boolean"]
},
"group": {
"description": "数据分组",
"methods": ["byField", "byValue", "byTime"]
}
}
def create_reduce_transformation(self, reducer: str, fields: List[str] = None) -> Dict:
"""创建聚合转换"""
return {
"id": "reduce",
"options": {
"reducers": [reducer],
"fields": fields or "",
"values": False
}
}
def create_filter_transformation(self, filter_type: str, pattern: str,
include: bool = True) -> Dict:
"""创建过滤转换"""
filter_config = {
"id": "filterFieldsByName",
"options": {
"include": {
"pattern": pattern
} if include else None,
"exclude": {
"pattern": pattern
} if not include else None
}
}
if filter_type == "byRegex":
filter_config["id"] = "filterFieldsByName"
filter_config["options"]["include" if include else "exclude"]["options"] = "regex"
elif filter_type == "byType":
filter_config["id"] = "filterFieldsByType"
filter_config["options"] = {"include": pattern} if include else {"exclude": pattern}
return filter_config
def create_organize_transformation(self, field_configs: List[Dict]) -> Dict:
"""创建字段组织转换"""
return {
"id": "organize",
"options": {
"excludeByName": {config["name"]: True for config in field_configs if config.get("hide", False)},
"indexByName": {config["name"]: i for i, config in enumerate(field_configs) if "order" in config},
"renameByName": {config["name"]: config["rename"] for config in field_configs if "rename" in config}
}
}
def create_calculate_transformation(self, expression: str, alias: str) -> Dict:
"""创建计算字段转换"""
return {
"id": "calculateField",
"options": {
"mode": "binary",
"reduce": {
"reducer": "sum"
},
"binary": {
"left": "Field A",
"operator": "+",
"right": "Field B"
},
"alias": alias
}
}
def create_convert_transformation(self, field_name: str, target_type: str) -> Dict:
"""创建类型转换"""
return {
"id": "convertFieldType",
"options": {
"conversions": [
{
"targetField": field_name,
"destinationType": target_type
}
]
}
}
def create_group_transformation(self, group_by: List[str]) -> Dict:
"""创建分组转换"""
return {
"id": "groupBy",
"options": {
"fields": {
field: {
"aggregations": ["last"],
"operation": "groupby"
} for field in group_by
}
}
}
def create_time_series_transformation(self, time_field: str, value_field: str) -> Dict:
"""创建时间序列转换"""
return {
"id": "seriesToColumns",
"options": {
"byField": time_field
}
}
def create_threshold_transformation(self, field: str, thresholds: List[Dict]) -> Dict:
"""创建阈值转换"""
return {
"id": "configFromData",
"options": {
"configPath": "thresholds",
"mappings": [
{
"field": field,
"handlerKey": "threshold1",
"reducer": "last"
}
]
}
}
def generate_transformation_pipeline(self, requirements: List[Dict]) -> List[Dict]:
"""生成转换管道"""
pipeline = []
for req in requirements:
transform_type = req.get("type")
if transform_type == "reduce":
pipeline.append(self.create_reduce_transformation(
req.get("reducer", "last"),
req.get("fields")
))
elif transform_type == "filter":
pipeline.append(self.create_filter_transformation(
req.get("filter_type", "byName"),
req.get("pattern", ""),
req.get("include", True)
))
elif transform_type == "organize":
pipeline.append(self.create_organize_transformation(
req.get("field_configs", [])
))
elif transform_type == "calculate":
pipeline.append(self.create_calculate_transformation(
req.get("expression", ""),
req.get("alias", "calculated")
))
elif transform_type == "convert":
pipeline.append(self.create_convert_transformation(
req.get("field_name", ""),
req.get("target_type", "string")
))
elif transform_type == "group":
pipeline.append(self.create_group_transformation(
req.get("group_by", [])
))
return pipeline
def get_transformation_examples(self) -> Dict:
"""获取转换示例"""
return {
"cpu_percentage": {
"description": "将CPU使用率转换为百分比",
"transformations": [
{
"id": "calculateField",
"options": {
"mode": "binary",
"binary": {
"left": "cpu_usage",
"operator": "*",
"right": "100"
},
"alias": "CPU使用率(%)"
}
}
]
},
"memory_available": {
"description": "计算可用内存",
"transformations": [
{
"id": "calculateField",
"options": {
"mode": "binary",
"binary": {
"left": "memory_total",
"operator": "-",
"right": "memory_used"
},
"alias": "可用内存"
}
}
]
},
"error_rate": {
"description": "计算错误率",
"transformations": [
{
"id": "calculateField",
"options": {
"mode": "binary",
"binary": {
"left": "errors",
"operator": "/",
"right": "total_requests"
},
"alias": "错误率"
}
},
{
"id": "calculateField",
"options": {
"mode": "binary",
"binary": {
"left": "错误率",
"operator": "*",
"right": "100"
},
"alias": "错误率(%)"
}
}
]
}
}
# 使用示例
transformer = DataTransformer()
# 创建转换管道
requirements = [
{
"type": "filter",
"filter_type": "byName",
"pattern": "cpu_*",
"include": True
},
{
"type": "reduce",
"reducer": "mean",
"fields": ["cpu_usage"]
},
{
"type": "organize",
"field_configs": [
{"name": "cpu_usage", "rename": "CPU使用率", "order": 0},
{"name": "timestamp", "hide": True}
]
}
]
pipeline = transformer.generate_transformation_pipeline(requirements)
print("转换管道:", pipeline)
# 获取转换示例
examples = transformer.get_transformation_examples()
print("转换示例:", examples["cpu_percentage"])
可视化设计最佳实践
1. 设计原则
class VisualizationDesignGuide:
"""可视化设计指南"""
def __init__(self):
self.design_principles = {
"clarity": {
"description": "清晰性 - 信息传达要清晰明确",
"guidelines": [
"使用合适的图表类型",
"避免信息过载",
"保持一致的颜色方案",
"使用清晰的标签和标题"
]
},
"simplicity": {
"description": "简洁性 - 去除不必要的元素",
"guidelines": [
"减少图表装饰",
"使用空白空间",
"限制颜色数量",
"避免3D效果"
]
},
"consistency": {
"description": "一致性 - 保持设计统一",
"guidelines": [
"统一的字体和大小",
"一致的颜色编码",
"标准化的图例位置",
"统一的时间格式"
]
},
"accessibility": {
"description": "可访问性 - 确保所有用户都能理解",
"guidelines": [
"使用色盲友好的颜色",
"提供替代文本",
"确保足够的对比度",
"支持键盘导航"
]
}
}
self.color_schemes = {
"status": {
"success": "#28a745",
"warning": "#ffc107",
"error": "#dc3545",
"info": "#17a2b8"
},
"performance": {
"excellent": "#28a745",
"good": "#6f42c1",
"average": "#fd7e14",
"poor": "#dc3545"
},
"categorical": [
"#1f77b4", "#ff7f0e", "#2ca02c", "#d62728",
"#9467bd", "#8c564b", "#e377c2", "#7f7f7f",
"#bcbd22", "#17becf"
],
"sequential": {
"blue": ["#f7fbff", "#deebf7", "#c6dbef", "#9ecae1", "#6baed6", "#4292c6", "#2171b5", "#08519c", "#08306b"],
"green": ["#f7fcf5", "#e5f5e0", "#c7e9c0", "#a1d99b", "#74c476", "#41ab5d", "#238b45", "#006d2c", "#00441b"],
"red": ["#fff5f0", "#fee0d2", "#fcbba1", "#fc9272", "#fb6a4a", "#ef3b2c", "#cb181d", "#a50f15", "#67000d"]
}
}
def get_chart_recommendations(self, data_type: str, data_characteristics: Dict) -> Dict:
"""获取图表类型推荐"""
recommendations = {
"time_series": {
"primary": "time_series",
"alternatives": ["line", "area"],
"use_cases": ["监控指标", "趋势分析", "性能跟踪"]
},
"categorical": {
"primary": "bar",
"alternatives": ["pie", "donut"],
"use_cases": ["分类统计", "比例分析", "排名显示"]
},
"distribution": {
"primary": "histogram",
"alternatives": ["heatmap", "stat"],
"use_cases": ["数据分布", "频率分析", "异常检测"]
},
"correlation": {
"primary": "scatter",
"alternatives": ["heatmap"],
"use_cases": ["相关性分析", "聚类分析", "异常点识别"]
},
"geographic": {
"primary": "geomap",
"alternatives": ["worldmap"],
"use_cases": ["地理分布", "区域分析", "位置跟踪"]
},
"hierarchical": {
"primary": "treemap",
"alternatives": ["sunburst"],
"use_cases": ["层次结构", "组织架构", "文件系统"]
}
}
# 根据数据特征调整推荐
data_size = data_characteristics.get("size", "medium")
update_frequency = data_characteristics.get("update_frequency", "medium")
if data_size == "large":
# 大数据集建议使用聚合视图
for rec in recommendations.values():
if "heatmap" not in rec["alternatives"]:
rec["alternatives"].append("heatmap")
if update_frequency == "high":
# 高频更新建议使用实时图表
for rec in recommendations.values():
rec["considerations"] = rec.get("considerations", []) + ["考虑使用实时刷新"]
return recommendations.get(data_type, recommendations["time_series"])
def generate_color_palette(self, chart_type: str, data_count: int) -> List[str]:
"""生成颜色调色板"""
if chart_type in ["stat", "gauge"]:
return [self.color_schemes["status"]["info"]]
elif chart_type in ["bar", "pie", "donut"]:
if data_count <= len(self.color_schemes["categorical"]):
return self.color_schemes["categorical"][:data_count]
else:
# 生成更多颜色
import colorsys
colors = []
for i in range(data_count):
hue = i / data_count
rgb = colorsys.hsv_to_rgb(hue, 0.7, 0.9)
hex_color = "#{:02x}{:02x}{:02x}".format(
int(rgb[0] * 255), int(rgb[1] * 255), int(rgb[2] * 255)
)
colors.append(hex_color)
return colors
elif chart_type == "heatmap":
return self.color_schemes["sequential"]["blue"]
else:
return self.color_schemes["categorical"][:data_count]
def create_dashboard_layout(self, panels: List[Dict], screen_size: str = "desktop") -> Dict:
"""创建仪表板布局"""
layouts = {
"mobile": {"grid_width": 12, "panel_height": 8, "margin": 1},
"tablet": {"grid_width": 16, "panel_height": 6, "margin": 2},
"desktop": {"grid_width": 24, "panel_height": 8, "margin": 2},
"large": {"grid_width": 32, "panel_height": 10, "margin": 3}
}
layout_config = layouts.get(screen_size, layouts["desktop"])
grid_width = layout_config["grid_width"]
panel_height = layout_config["panel_height"]
margin = layout_config["margin"]
positioned_panels = []
current_x = 0
current_y = 0
for i, panel in enumerate(panels):
panel_width = panel.get("width", grid_width // 2)
# 检查是否需要换行
if current_x + panel_width > grid_width:
current_x = 0
current_y += panel_height + margin
positioned_panel = {
**panel,
"gridPos": {
"x": current_x,
"y": current_y,
"w": panel_width,
"h": panel.get("height", panel_height)
}
}
positioned_panels.append(positioned_panel)
current_x += panel_width + margin
return {
"panels": positioned_panels,
"layout_config": layout_config,
"total_height": current_y + panel_height
}
def validate_design(self, dashboard_config: Dict) -> Dict:
"""验证设计质量"""
validation = {
"score": 0,
"max_score": 100,
"issues": [],
"suggestions": [],
"strengths": []
}
panels = dashboard_config.get("panels", [])
# 检查面板数量
panel_count = len(panels)
if panel_count <= 6:
validation["score"] += 20
validation["strengths"].append("面板数量适中,易于理解")
elif panel_count <= 12:
validation["score"] += 10
validation["suggestions"].append("考虑将相关面板分组或使用多个仪表板")
else:
validation["issues"].append("面板过多,可能导致信息过载")
# 检查颜色使用
colors_used = set()
for panel in panels:
panel_colors = panel.get("fieldConfig", {}).get("defaults", {}).get("color", {})
if "fixedColor" in panel_colors:
colors_used.add(panel_colors["fixedColor"])
if len(colors_used) <= 5:
validation["score"] += 15
validation["strengths"].append("颜色使用克制,保持一致性")
else:
validation["suggestions"].append("减少颜色种类,提高视觉一致性")
# 检查标题和描述
panels_with_titles = sum(1 for panel in panels if panel.get("title"))
if panels_with_titles == panel_count:
validation["score"] += 15
validation["strengths"].append("所有面板都有清晰的标题")
else:
validation["issues"].append(f"有{panel_count - panels_with_titles}个面板缺少标题")
# 检查布局合理性
total_width = sum(panel.get("gridPos", {}).get("w", 0) for panel in panels)
if total_width > 0:
validation["score"] += 20
validation["strengths"].append("布局结构合理")
# 检查数据源多样性
datasources = set()
for panel in panels:
for target in panel.get("targets", []):
if "datasource" in target:
datasources.add(target["datasource"].get("type", "unknown"))
if len(datasources) <= 3:
validation["score"] += 10
validation["strengths"].append("数据源集中,便于管理")
else:
validation["suggestions"].append("考虑减少数据源种类或分离到不同仪表板")
# 检查刷新间隔
refresh_interval = dashboard_config.get("refresh", "5s")
if refresh_interval in ["30s", "1m", "5m"]:
validation["score"] += 10
validation["strengths"].append("刷新间隔设置合理")
elif refresh_interval in ["1s", "5s"]:
validation["suggestions"].append("考虑增加刷新间隔以减少服务器负载")
# 检查时间范围
time_range = dashboard_config.get("time", {}).get("from", "now-1h")
if "now-" in time_range:
validation["score"] += 10
validation["strengths"].append("使用相对时间范围,便于实时监控")
return validation
# 使用示例
design_guide = VisualizationDesignGuide()
# 获取图表推荐
recommendation = design_guide.get_chart_recommendations(
"time_series",
{"size": "large", "update_frequency": "high"}
)
print("图表推荐:", recommendation)
# 生成颜色调色板
colors = design_guide.generate_color_palette("bar", 5)
print("颜色调色板:", colors)
# 创建布局
panels = [
{"title": "CPU使用率", "width": 12, "height": 8},
{"title": "内存使用率", "width": 12, "height": 8},
{"title": "网络流量", "width": 24, "height": 6}
]
layout = design_guide.create_dashboard_layout(panels, "desktop")
print("布局配置:", layout)
2. 响应式设计
class ResponsiveDesignManager:
"""响应式设计管理器"""
def __init__(self):
self.breakpoints = {
"xs": {"min_width": 0, "max_width": 575, "grid_columns": 12},
"sm": {"min_width": 576, "max_width": 767, "grid_columns": 12},
"md": {"min_width": 768, "max_width": 991, "grid_columns": 16},
"lg": {"min_width": 992, "max_width": 1199, "grid_columns": 20},
"xl": {"min_width": 1200, "max_width": 1599, "grid_columns": 24},
"xxl": {"min_width": 1600, "max_width": float('inf'), "grid_columns": 32}
}
self.panel_sizes = {
"small": {"xs": 12, "sm": 12, "md": 8, "lg": 6, "xl": 6, "xxl": 4},
"medium": {"xs": 12, "sm": 12, "md": 16, "lg": 10, "xl": 12, "xxl": 8},
"large": {"xs": 12, "sm": 12, "md": 16, "lg": 20, "xl": 24, "xxl": 16},
"full": {"xs": 12, "sm": 12, "md": 16, "lg": 20, "xl": 24, "xxl": 32}
}
def generate_responsive_layout(self, panels: List[Dict]) -> Dict:
"""生成响应式布局"""
responsive_panels = []
for panel in panels:
panel_size = panel.get("size", "medium")
responsive_panel = {
**panel,
"responsive": {
breakpoint: {
"gridPos": {
"w": self.panel_sizes[panel_size][breakpoint],
"h": self._get_responsive_height(panel, breakpoint)
}
}
for breakpoint in self.breakpoints.keys()
}
}
responsive_panels.append(responsive_panel)
return {
"panels": responsive_panels,
"breakpoints": self.breakpoints
}
def _get_responsive_height(self, panel: Dict, breakpoint: str) -> int:
"""获取响应式高度"""
base_height = panel.get("height", 8)
# 在小屏幕上增加高度以保持可读性
if breakpoint in ["xs", "sm"]:
return min(base_height + 2, 12)
elif breakpoint == "md":
return base_height + 1
else:
return base_height
def optimize_for_mobile(self, dashboard_config: Dict) -> Dict:
"""移动端优化"""
mobile_config = dashboard_config.copy()
# 调整刷新间隔
mobile_config["refresh"] = "1m" # 减少移动端刷新频率
# 简化面板
for panel in mobile_config.get("panels", []):
# 移除复杂的转换
if len(panel.get("transformations", [])) > 2:
panel["transformations"] = panel["transformations"][:2]
# 简化图例
if "legend" in panel.get("options", {}):
panel["options"]["legend"]["displayMode"] = "list"
panel["options"]["legend"]["placement"] = "bottom"
# 调整字体大小
if "fieldConfig" in panel:
panel["fieldConfig"]["defaults"]["custom"] = panel["fieldConfig"]["defaults"].get("custom", {})
panel["fieldConfig"]["defaults"]["custom"]["textSize"] = {"title": 14, "value": 16}
return mobile_config
def create_mobile_dashboard(self, desktop_dashboard: Dict) -> Dict:
"""创建移动端专用仪表板"""
mobile_dashboard = {
"title": desktop_dashboard.get("title", "") + " (Mobile)",
"tags": desktop_dashboard.get("tags", []) + ["mobile"],
"refresh": "1m",
"time": desktop_dashboard.get("time", {}),
"panels": []
}
# 选择最重要的面板
important_panels = []
for panel in desktop_dashboard.get("panels", []):
priority = panel.get("priority", "medium")
if priority in ["high", "critical"]:
important_panels.append(panel)
# 如果重要面板不足,添加其他面板
if len(important_panels) < 4:
for panel in desktop_dashboard.get("panels", []):
if panel not in important_panels and len(important_panels) < 6:
important_panels.append(panel)
# 重新布局面板
for i, panel in enumerate(important_panels):
mobile_panel = {
**panel,
"gridPos": {
"x": 0,
"y": i * 10,
"w": 12,
"h": 8
}
}
# 简化面板配置
if mobile_panel.get("type") == "graph":
mobile_panel["type"] = "timeseries"
mobile_dashboard["panels"].append(mobile_panel)
return mobile_dashboard
# 使用示例
responsive_manager = ResponsiveDesignManager()
# 生成响应式布局
panels = [
{"title": "CPU", "size": "small", "height": 6, "priority": "high"},
{"title": "Memory", "size": "small", "height": 6, "priority": "high"},
{"title": "Network", "size": "large", "height": 8, "priority": "medium"}
]
responsive_layout = responsive_manager.generate_responsive_layout(panels)
print("响应式布局:", responsive_layout["panels"][0]["responsive"])
# 创建移动端仪表板
desktop_dashboard = {
"title": "系统监控",
"panels": panels
}
mobile_dashboard = responsive_manager.create_mobile_dashboard(desktop_dashboard)
print("移动端仪表板:", mobile_dashboard["title"])
性能优化
1. 查询优化
class DashboardPerformanceOptimizer:
"""仪表板性能优化器"""
def __init__(self):
self.optimization_rules = {
"query_optimization": {
"reduce_time_range": "减少查询时间范围",
"add_filters": "添加标签过滤器",
"use_recording_rules": "使用记录规则",
"optimize_aggregation": "优化聚合函数"
},
"refresh_optimization": {
"increase_interval": "增加刷新间隔",
"conditional_refresh": "条件刷新",
"cache_queries": "缓存查询结果"
},
"panel_optimization": {
"reduce_panel_count": "减少面板数量",
"simplify_visualizations": "简化可视化",
"lazy_loading": "延迟加载"
}
}
def analyze_dashboard_performance(self, dashboard_config: Dict) -> Dict:
"""分析仪表板性能"""
analysis = {
"performance_score": 0,
"max_score": 100,
"bottlenecks": [],
"recommendations": [],
"metrics": {}
}
panels = dashboard_config.get("panels", [])
# 分析面板数量
panel_count = len(panels)
if panel_count <= 6:
analysis["performance_score"] += 25
elif panel_count <= 12:
analysis["performance_score"] += 15
analysis["recommendations"].append("考虑减少面板数量或使用分页")
else:
analysis["bottlenecks"].append(f"面板过多({panel_count}个),可能影响加载性能")
analysis["metrics"]["panel_count"] = panel_count
# 分析查询复杂度
complex_queries = 0
total_queries = 0
for panel in panels:
for target in panel.get("targets", []):
total_queries += 1
query = target.get("expr", "")
# 检查查询复杂度
if self._is_complex_query(query):
complex_queries += 1
if total_queries > 0:
complexity_ratio = complex_queries / total_queries
if complexity_ratio <= 0.3:
analysis["performance_score"] += 20
elif complexity_ratio <= 0.6:
analysis["performance_score"] += 10
analysis["recommendations"].append("优化复杂查询以提高性能")
else:
analysis["bottlenecks"].append(f"复杂查询比例过高({complexity_ratio:.1%})")
analysis["metrics"]["query_complexity"] = {
"total": total_queries,
"complex": complex_queries,
"ratio": complex_queries / max(total_queries, 1)
}
# 分析刷新间隔
refresh_interval = dashboard_config.get("refresh", "5s")
if refresh_interval in ["30s", "1m", "5m"]:
analysis["performance_score"] += 20
elif refresh_interval in ["10s", "15s"]:
analysis["performance_score"] += 10
analysis["recommendations"].append("考虑增加刷新间隔")
else:
analysis["bottlenecks"].append(f"刷新间隔过短({refresh_interval})")
analysis["metrics"]["refresh_interval"] = refresh_interval
# 分析时间范围
time_range = dashboard_config.get("time", {}).get("from", "now-1h")
if "now-" in time_range:
range_value = time_range.replace("now-", "")
if range_value in ["15m", "30m", "1h"]:
analysis["performance_score"] += 15
elif range_value in ["3h", "6h", "12h"]:
analysis["performance_score"] += 10
analysis["recommendations"].append("考虑减少默认时间范围")
else:
analysis["recommendations"].append("时间范围可能过大,影响查询性能")
analysis["metrics"]["time_range"] = time_range
# 分析数据转换
transformation_count = 0
for panel in panels:
transformation_count += len(panel.get("transformations", []))
if transformation_count <= panel_count:
analysis["performance_score"] += 10
else:
analysis["recommendations"].append("减少数据转换以提高性能")
analysis["metrics"]["transformations"] = transformation_count
# 分析数据源分布
datasources = set()
for panel in panels:
for target in panel.get("targets", []):
if "datasource" in target:
datasources.add(target["datasource"].get("uid", "unknown"))
if len(datasources) <= 2:
analysis["performance_score"] += 10
elif len(datasources) <= 4:
analysis["performance_score"] += 5
else:
analysis["recommendations"].append("考虑减少数据源数量")
analysis["metrics"]["datasource_count"] = len(datasources)
return analysis
def _is_complex_query(self, query: str) -> bool:
"""判断查询是否复杂"""
if not query:
return False
complexity_indicators = [
"rate(" in query and "[" in query, # 有rate函数
query.count("(") > 3, # 嵌套函数多
"by (" in query or "without (" in query, # 有分组
"or" in query or "and" in query, # 有逻辑运算
"offset" in query, # 有时间偏移
len(query) > 100 # 查询字符串长
]
return sum(complexity_indicators) >= 2
def generate_optimization_plan(self, dashboard_config: Dict) -> Dict:
"""生成优化计划"""
analysis = self.analyze_dashboard_performance(dashboard_config)
optimization_plan = {
"current_score": analysis["performance_score"],
"target_score": 85,
"optimizations": [],
"estimated_improvement": 0
}
# 基于分析结果生成优化建议
if analysis["metrics"]["panel_count"] > 8:
optimization_plan["optimizations"].append({
"type": "reduce_panels",
"description": "减少面板数量或使用分页",
"impact": "high",
"effort": "medium",
"estimated_gain": 15
})
if analysis["metrics"]["query_complexity"]["ratio"] > 0.5:
optimization_plan["optimizations"].append({
"type": "optimize_queries",
"description": "优化复杂查询,使用记录规则",
"impact": "high",
"effort": "high",
"estimated_gain": 20
})
if analysis["metrics"]["refresh_interval"] in ["1s", "5s"]:
optimization_plan["optimizations"].append({
"type": "increase_refresh",
"description": "增加刷新间隔到30s或1m",
"impact": "medium",
"effort": "low",
"estimated_gain": 10
})
if analysis["metrics"]["transformations"] > analysis["metrics"]["panel_count"] * 2:
optimization_plan["optimizations"].append({
"type": "reduce_transformations",
"description": "减少数据转换,在查询层面处理",
"impact": "medium",
"effort": "medium",
"estimated_gain": 8
})
if analysis["metrics"]["datasource_count"] > 3:
optimization_plan["optimizations"].append({
"type": "consolidate_datasources",
"description": "整合数据源或分离仪表板",
"impact": "low",
"effort": "high",
"estimated_gain": 5
})
# 计算总的预期改进
optimization_plan["estimated_improvement"] = sum(
opt["estimated_gain"] for opt in optimization_plan["optimizations"]
)
return optimization_plan
def apply_optimizations(self, dashboard_config: Dict, optimizations: List[str]) -> Dict:
"""应用优化"""
optimized_config = dashboard_config.copy()
for optimization in optimizations:
if optimization == "increase_refresh":
current_refresh = optimized_config.get("refresh", "5s")
if current_refresh in ["1s", "5s"]:
optimized_config["refresh"] = "30s"
elif current_refresh in ["10s", "15s"]:
optimized_config["refresh"] = "1m"
elif optimization == "reduce_transformations":
for panel in optimized_config.get("panels", []):
transformations = panel.get("transformations", [])
if len(transformations) > 2:
# 保留最重要的转换
panel["transformations"] = transformations[:2]
elif optimization == "optimize_queries":
for panel in optimized_config.get("panels", []):
for target in panel.get("targets", []):
query = target.get("expr", "")
if self._is_complex_query(query):
# 简化查询(示例)
if "rate(" in query and "[1s]" in query:
target["expr"] = query.replace("[1s]", "[5s]")
elif optimization == "reduce_panels":
panels = optimized_config.get("panels", [])
if len(panels) > 8:
# 保留前8个面板
optimized_config["panels"] = panels[:8]
optimized_config["tags"] = optimized_config.get("tags", []) + ["optimized"]
return optimized_config
def create_performance_report(self, dashboard_config: Dict) -> str:
"""创建性能报告"""
analysis = self.analyze_dashboard_performance(dashboard_config)
plan = self.generate_optimization_plan(dashboard_config)
report = f"""
# 仪表板性能分析报告
## 当前性能评分
**{analysis['performance_score']}/100**
## 关键指标
- 面板数量: {analysis['metrics']['panel_count']}
- 查询复杂度: {analysis['metrics']['query_complexity']['ratio']:.1%}
- 刷新间隔: {analysis['metrics']['refresh_interval']}
- 数据转换: {analysis['metrics']['transformations']}
- 数据源数量: {analysis['metrics']['datasource_count']}
## 性能瓶颈
"""
for bottleneck in analysis['bottlenecks']:
report += f"- {bottleneck}\n"
report += "\n## 优化建议\n"
for rec in analysis['recommendations']:
report += f"- {rec}\n"
report += "\n## 优化计划\n"
for opt in plan['optimizations']:
report += f"- **{opt['description']}** (影响: {opt['impact']}, 工作量: {opt['effort']}, 预期提升: {opt['estimated_gain']}分)\n"
report += f"\n## 预期改进\n"
report += f"实施所有优化后,预期性能评分可提升至: **{analysis['performance_score'] + plan['estimated_improvement']}/100**"
return report
# 使用示例
optimizer = DashboardPerformanceOptimizer()
# 分析仪表板性能
dashboard_config = {
"panels": [
{"targets": [{"expr": "rate(http_requests_total[1s])"}], "transformations": [{}, {}, {}]},
{"targets": [{"expr": "sum(rate(cpu_usage[5m])) by (instance)"}]},
# ... 更多面板
],
"refresh": "5s",
"time": {"from": "now-6h"}
}
analysis = optimizer.analyze_dashboard_performance(dashboard_config)
print("性能分析:", analysis["performance_score"])
# 生成优化计划
plan = optimizer.generate_optimization_plan(dashboard_config)
print("优化计划:", len(plan["optimizations"]), "项优化")
# 创建性能报告
report = optimizer.create_performance_report(dashboard_config)
print("性能报告已生成")
2. 缓存策略
class DashboardCacheManager:
"""仪表板缓存管理器"""
def __init__(self):
self.cache_strategies = {
"query_cache": {
"description": "查询结果缓存",
"ttl_options": ["30s", "1m", "5m", "15m", "1h"],
"suitable_for": ["静态数据", "历史数据", "聚合数据"]
},
"dashboard_cache": {
"description": "仪表板配置缓存",
"ttl_options": ["5m", "15m", "1h", "24h"],
"suitable_for": ["稳定配置", "模板仪表板"]
},
"data_source_cache": {
"description": "数据源连接缓存",
"ttl_options": ["1m", "5m", "15m"],
"suitable_for": ["远程数据源", "API数据源"]
}
}
def recommend_cache_strategy(self, panel_config: Dict) -> Dict:
"""推荐缓存策略"""
recommendations = {
"query_cache": None,
"refresh_cache": None,
"reasoning": []
}
# 分析查询特征
targets = panel_config.get("targets", [])
if not targets:
return recommendations
for target in targets:
query = target.get("expr", "")
datasource_type = target.get("datasource", {}).get("type", "")
# 基于查询类型推荐缓存
if "rate(" in query:
recommendations["query_cache"] = "1m"
recommendations["reasoning"].append("rate查询适合短期缓存")
elif "sum(" in query or "avg(" in query:
recommendations["query_cache"] = "5m"
recommendations["reasoning"].append("聚合查询适合中期缓存")
elif "histogram_quantile" in query:
recommendations["query_cache"] = "30s"
recommendations["reasoning"].append("分位数查询计算复杂,适合缓存")
# 基于数据源类型推荐缓存
if datasource_type == "mysql":
recommendations["query_cache"] = "5m"
recommendations["reasoning"].append("MySQL查询适合中期缓存")
elif datasource_type == "elasticsearch":
recommendations["query_cache"] = "1m"
recommendations["reasoning"].append("Elasticsearch查询适合短期缓存")
# 基于刷新间隔推荐缓存
refresh_interval = panel_config.get("interval", "")
if refresh_interval:
if refresh_interval in ["1m", "5m"]:
recommendations["refresh_cache"] = refresh_interval
recommendations["reasoning"].append(f"基于{refresh_interval}刷新间隔设置缓存")
return recommendations
def generate_cache_config(self, dashboard_config: Dict) -> Dict:
"""生成缓存配置"""
cache_config = {
"global_settings": {
"enable_query_cache": True,
"default_cache_ttl": "1m",
"max_cache_size": "100MB"
},
"panel_cache_settings": {},
"datasource_cache_settings": {}
}
# 为每个面板生成缓存设置
for i, panel in enumerate(dashboard_config.get("panels", [])):
panel_id = panel.get("id", f"panel_{i}")
recommendations = self.recommend_cache_strategy(panel)
if recommendations["query_cache"]:
cache_config["panel_cache_settings"][panel_id] = {
"query_cache_ttl": recommendations["query_cache"],
"enable_cache": True
}
# 为数据源生成缓存设置
datasources = set()
for panel in dashboard_config.get("panels", []):
for target in panel.get("targets", []):
if "datasource" in target:
ds_uid = target["datasource"].get("uid")
ds_type = target["datasource"].get("type")
if ds_uid:
datasources.add((ds_uid, ds_type))
for ds_uid, ds_type in datasources:
if ds_type == "prometheus":
cache_config["datasource_cache_settings"][ds_uid] = {
"connection_cache_ttl": "5m",
"query_cache_ttl": "1m"
}
elif ds_type == "mysql":
cache_config["datasource_cache_settings"][ds_uid] = {
"connection_cache_ttl": "15m",
"query_cache_ttl": "5m"
}
return cache_config
def estimate_cache_benefits(self, dashboard_config: Dict, cache_config: Dict) -> Dict:
"""估算缓存收益"""
benefits = {
"query_reduction": 0,
"load_time_improvement": 0,
"server_load_reduction": 0,
"cost_savings": 0
}
panels = dashboard_config.get("panels", [])
refresh_interval = dashboard_config.get("refresh", "30s")
# 计算查询减少量
total_queries = len(panels) * len(dashboard_config.get("panels", [{}])[0].get("targets", [{}]))
# 基于缓存TTL计算查询减少
if refresh_interval == "30s":
queries_per_hour = total_queries * 120 # 每小时查询次数
elif refresh_interval == "1m":
queries_per_hour = total_queries * 60
else:
queries_per_hour = total_queries * 12 # 假设5分钟刷新
# 估算缓存命中率
cache_hit_rate = 0.7 # 假设70%缓存命中率
benefits["query_reduction"] = queries_per_hour * cache_hit_rate
# 估算加载时间改进
avg_query_time = 200 # 假设平均查询时间200ms
cached_query_time = 10 # 缓存查询时间10ms
benefits["load_time_improvement"] = (avg_query_time - cached_query_time) * cache_hit_rate
# 估算服务器负载减少
benefits["server_load_reduction"] = benefits["query_reduction"] * 0.1 # 假设每个查询占用0.1%CPU
# 估算成本节省(基于云服务计费)
cost_per_query = 0.001 # 假设每个查询成本0.001元
benefits["cost_savings"] = benefits["query_reduction"] * cost_per_query * 24 # 每日节省
return benefits
# 使用示例
cache_manager = DashboardCacheManager()
# 推荐缓存策略
panel_config = {
"targets": [
{"expr": "rate(http_requests_total[5m])", "datasource": {"type": "prometheus"}}
],
"interval": "1m"
}
recommendation = cache_manager.recommend_cache_strategy(panel_config)
print("缓存推荐:", recommendation)
# 生成缓存配置
dashboard_config = {
"panels": [panel_config],
"refresh": "30s"
}
cache_config = cache_manager.generate_cache_config(dashboard_config)
print("缓存配置:", cache_config["global_settings"])
# 估算缓存收益
benefits = cache_manager.estimate_cache_benefits(dashboard_config, cache_config)
print("缓存收益:", benefits)
总结
关键要点
仪表板设计原则
- 清晰性:信息传达要清晰明确
- 简洁性:去除不必要的元素
- 一致性:保持设计统一
- 可访问性:确保所有用户都能理解
面板类型选择
- 时间序列数据:使用时间序列图或折线图
- 分类数据:使用柱状图或饼图
- 分布数据:使用直方图或热力图
- 地理数据:使用地图可视化
查询优化
- 使用合适的时间范围和聚合函数
- 添加标签过滤器减少查询范围
- 验证和优化查询语法
- 使用记录规则处理复杂查询
数据转换
- 合理使用数据转换功能
- 避免过度转换影响性能
- 优先在查询层面处理数据
响应式设计
- 考虑不同屏幕尺寸的显示效果
- 为移动端优化仪表板
- 使用合适的布局和字体大小
性能优化
- 控制面板数量和查询复杂度
- 设置合理的刷新间隔
- 使用缓存策略减少服务器负载
- 监控和分析仪表板性能
最佳实践
设计阶段
- 明确仪表板目标和受众
- 选择合适的可视化类型
- 保持设计一致性和简洁性
- 考虑用户体验和可访问性
开发阶段
- 优化查询性能
- 合理使用数据转换
- 实现响应式布局
- 添加适当的交互功能
部署阶段
- 配置缓存策略
- 设置合理的刷新间隔
- 监控仪表板性能
- 收集用户反馈
维护阶段
- 定期审查和优化
- 更新过时的查询和配置
- 根据需求调整设计
- 保持文档更新
下一步学习建议
深入学习特定可视化类型
- 掌握高级图表配置
- 学习自定义可视化插件
- 了解数据可视化理论
实践项目
- 创建业务监控仪表板
- 构建系统性能仪表板
- 开发用户行为分析仪表板
技术拓展
- 学习Grafana插件开发
- 掌握高级查询技巧
- 了解可视化最佳实践
工具集成
- 与告警系统集成
- 自动化仪表板部署
- 集成外部数据源
通过本章的学习,你应该能够创建功能完善、性能优良的Grafana仪表板,为数据可视化和监控提供强有力的支持。