概述

仪表板是Grafana的核心功能,它将来自不同数据源的数据以可视化的方式呈现给用户。一个设计良好的仪表板能够帮助用户快速理解数据趋势、发现问题并做出决策。本章将详细介绍如何创建和设计高效、美观的Grafana仪表板。

学习目标

通过本章学习,您将能够:

  1. 掌握仪表板基础概念

    • 理解仪表板的组成结构
    • 了解面板类型和用途
    • 掌握布局设计原则
  2. 创建和配置面板

    • 学会添加和配置各种类型的面板
    • 掌握查询编辑器的使用
    • 了解数据转换和处理
  3. 设计用户友好的界面

    • 应用可视化设计最佳实践
    • 创建响应式布局
    • 实现交互式功能
  4. 优化仪表板性能

    • 减少查询负载
    • 优化刷新策略
    • 提高用户体验

仪表板基础概念

1. 仪表板架构

from enum import Enum
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime

class PanelType(Enum):
    """面板类型枚举"""
    GRAPH = "graph"
    SINGLESTAT = "singlestat"
    TABLE = "table"
    HEATMAP = "heatmap"
    GAUGE = "gauge"
    BAR_GAUGE = "bargauge"
    STAT = "stat"
    PIE_CHART = "piechart"
    WORLDMAP = "worldmap"
    TEXT = "text"
    LOGS = "logs"
    NODE_GRAPH = "nodeGraph"
    ALERT_LIST = "alertlist"
    DASHBOARD_LIST = "dashlist"
    PLUGIN_LIST = "pluginlist"

class VisualizationType(Enum):
    """可视化类型枚举"""
    TIME_SERIES = "timeseries"
    BAR_CHART = "barchart"
    HISTOGRAM = "histogram"
    CANDLESTICK = "candlestick"
    STATE_TIMELINE = "state-timeline"
    STATUS_HISTORY = "status-history"
    FLAME_GRAPH = "flamegraph"
    TRACE_VIEW = "traces"

class RefreshInterval(Enum):
    """刷新间隔枚举"""
    OFF = ""
    FIVE_SECONDS = "5s"
    TEN_SECONDS = "10s"
    THIRTY_SECONDS = "30s"
    ONE_MINUTE = "1m"
    FIVE_MINUTES = "5m"
    FIFTEEN_MINUTES = "15m"
    THIRTY_MINUTES = "30m"
    ONE_HOUR = "1h"
    TWO_HOURS = "2h"
    ONE_DAY = "1d"

@dataclass
class GridPos:
    """面板网格位置"""
    x: int  # X坐标 (0-24)
    y: int  # Y坐标
    w: int  # 宽度 (1-24)
    h: int  # 高度

@dataclass
class TimeRange:
    """时间范围"""
    from_time: str  # 开始时间
    to_time: str    # 结束时间
    
    @classmethod
    def last_hours(cls, hours: int):
        return cls(f"now-{hours}h", "now")
    
    @classmethod
    def last_days(cls, days: int):
        return cls(f"now-{days}d", "now")
    
    @classmethod
    def custom_range(cls, start: str, end: str):
        return cls(start, end)

@dataclass
class Target:
    """查询目标"""
    expr: str                    # 查询表达式
    datasource: str             # 数据源
    legend_format: str = ""     # 图例格式
    interval: str = ""          # 查询间隔
    instant: bool = False       # 即时查询
    ref_id: str = "A"          # 引用ID
    hide: bool = False          # 是否隐藏

@dataclass
class Panel:
    """面板配置"""
    id: int
    title: str
    type: PanelType
    grid_pos: GridPos
    targets: List[Target]
    datasource: Optional[str] = None
    description: str = ""
    transparent: bool = False
    options: Dict[str, Any] = None
    field_config: Dict[str, Any] = None
    
    def __post_init__(self):
        if self.options is None:
            self.options = {}
        if self.field_config is None:
            self.field_config = {}

@dataclass
class Variable:
    """仪表板变量"""
    name: str
    type: str                   # query, custom, constant, datasource, interval, textbox
    label: str = ""
    description: str = ""
    query: str = ""
    datasource: str = ""
    multi: bool = False         # 是否支持多选
    include_all: bool = False   # 是否包含"All"选项
    all_value: str = ""
    current: Dict[str, Any] = None
    options: List[Dict[str, Any]] = None
    
    def __post_init__(self):
        if self.current is None:
            self.current = {}
        if self.options is None:
            self.options = []

@dataclass
class Dashboard:
    """仪表板配置"""
    id: Optional[int]
    uid: str
    title: str
    description: str = ""
    tags: List[str] = None
    panels: List[Panel] = None
    templating: List[Variable] = None
    time: TimeRange = None
    refresh: RefreshInterval = RefreshInterval.FIVE_MINUTES
    timezone: str = "browser"
    editable: bool = True
    graph_tooltip: str = "shared_crosshair"  # default, shared_crosshair, shared_tooltip
    
    def __post_init__(self):
        if self.tags is None:
            self.tags = []
        if self.panels is None:
            self.panels = []
        if self.templating is None:
            self.templating = []
        if self.time is None:
            self.time = TimeRange.last_hours(6)

class DashboardManager:
    """仪表板管理器"""
    
    def __init__(self):
        self.dashboards: Dict[str, Dashboard] = {}
        self.panel_id_counter = 1
    
    def create_dashboard(self, title: str, description: str = "", tags: List[str] = None) -> Dashboard:
        """创建新仪表板"""
        import uuid
        uid = str(uuid.uuid4())[:8]
        
        dashboard = Dashboard(
            id=None,
            uid=uid,
            title=title,
            description=description,
            tags=tags or []
        )
        
        self.dashboards[uid] = dashboard
        return dashboard
    
    def add_panel(self, dashboard: Dashboard, panel_config: Dict) -> Panel:
        """添加面板到仪表板"""
        panel = Panel(
            id=self.panel_id_counter,
            title=panel_config.get("title", f"Panel {self.panel_id_counter}"),
            type=PanelType(panel_config.get("type", "graph")),
            grid_pos=GridPos(
                x=panel_config.get("x", 0),
                y=panel_config.get("y", 0),
                w=panel_config.get("w", 12),
                h=panel_config.get("h", 8)
            ),
            targets=panel_config.get("targets", []),
            datasource=panel_config.get("datasource"),
            description=panel_config.get("description", ""),
            transparent=panel_config.get("transparent", False),
            options=panel_config.get("options", {}),
            field_config=panel_config.get("field_config", {})
        )
        
        dashboard.panels.append(panel)
        self.panel_id_counter += 1
        return panel
    
    def add_variable(self, dashboard: Dashboard, var_config: Dict) -> Variable:
        """添加变量到仪表板"""
        variable = Variable(
            name=var_config["name"],
            type=var_config["type"],
            label=var_config.get("label", ""),
            description=var_config.get("description", ""),
            query=var_config.get("query", ""),
            datasource=var_config.get("datasource", ""),
            multi=var_config.get("multi", False),
            include_all=var_config.get("include_all", False),
            all_value=var_config.get("all_value", ""),
            current=var_config.get("current", {}),
            options=var_config.get("options", [])
        )
        
        dashboard.templating.append(variable)
        return variable
    
    def get_dashboard_json(self, dashboard: Dashboard) -> Dict:
        """获取仪表板JSON配置"""
        return {
            "id": dashboard.id,
            "uid": dashboard.uid,
            "title": dashboard.title,
            "description": dashboard.description,
            "tags": dashboard.tags,
            "timezone": dashboard.timezone,
            "editable": dashboard.editable,
            "graphTooltip": dashboard.graph_tooltip,
            "time": {
                "from": dashboard.time.from_time,
                "to": dashboard.time.to_time
            },
            "refresh": dashboard.refresh.value,
            "panels": [
                self._panel_to_dict(panel) for panel in dashboard.panels
            ],
            "templating": {
                "list": [
                    self._variable_to_dict(var) for var in dashboard.templating
                ]
            },
            "version": 1,
            "schemaVersion": 30
        }
    
    def _panel_to_dict(self, panel: Panel) -> Dict:
        """将面板转换为字典"""
        return {
            "id": panel.id,
            "title": panel.title,
            "type": panel.type.value,
            "gridPos": {
                "x": panel.grid_pos.x,
                "y": panel.grid_pos.y,
                "w": panel.grid_pos.w,
                "h": panel.grid_pos.h
            },
            "targets": [
                {
                    "expr": target.expr,
                    "datasource": target.datasource,
                    "legendFormat": target.legend_format,
                    "interval": target.interval,
                    "instant": target.instant,
                    "refId": target.ref_id,
                    "hide": target.hide
                } for target in panel.targets
            ],
            "datasource": panel.datasource,
            "description": panel.description,
            "transparent": panel.transparent,
            "options": panel.options,
            "fieldConfig": panel.field_config
        }
    
    def _variable_to_dict(self, variable: Variable) -> Dict:
        """将变量转换为字典"""
        return {
            "name": variable.name,
            "type": variable.type,
            "label": variable.label,
            "description": variable.description,
            "query": variable.query,
            "datasource": variable.datasource,
            "multi": variable.multi,
            "includeAll": variable.include_all,
            "allValue": variable.all_value,
            "current": variable.current,
            "options": variable.options
        }
    
    def calculate_layout(self, panels_config: List[Dict]) -> List[GridPos]:
        """计算面板布局"""
        positions = []
        current_y = 0
        current_x = 0
        row_height = 0
        
        for config in panels_config:
            width = config.get("width", 12)
            height = config.get("height", 8)
            
            # 检查是否需要换行
            if current_x + width > 24:
                current_y += row_height
                current_x = 0
                row_height = 0
            
            positions.append(GridPos(
                x=current_x,
                y=current_y,
                w=width,
                h=height
            ))
            
            current_x += width
            row_height = max(row_height, height)
        
        return positions

# 使用示例
manager = DashboardManager()

# 创建仪表板
dashboard = manager.create_dashboard(
    title="系统监控仪表板",
    description="监控系统性能和健康状态",
    tags=["monitoring", "system", "performance"]
)

# 添加变量
manager.add_variable(dashboard, {
    "name": "instance",
    "type": "query",
    "label": "实例",
    "query": "label_values(up, instance)",
    "datasource": "prometheus",
    "multi": True,
    "include_all": True
})

# 添加面板
manager.add_panel(dashboard, {
    "title": "CPU使用率",
    "type": "timeseries",
    "x": 0, "y": 0, "w": 12, "h": 8,
    "targets": [
        Target(
            expr="100 - (avg(rate(node_cpu_seconds_total{mode='idle',instance=~'$instance'}[5m])) * 100)",
            datasource="prometheus",
            legend_format="CPU使用率"
        )
    ],
    "datasource": "prometheus"
})

manager.add_panel(dashboard, {
    "title": "内存使用率",
    "type": "gauge",
    "x": 12, "y": 0, "w": 12, "h": 8,
    "targets": [
        Target(
            expr="(1 - (node_memory_MemAvailable_bytes{instance=~'$instance'} / node_memory_MemTotal_bytes{instance=~'$instance'})) * 100",
            datasource="prometheus",
            legend_format="内存使用率"
        )
    ],
    "datasource": "prometheus"
})

# 生成JSON配置
dashboard_json = manager.get_dashboard_json(dashboard)
print("仪表板配置:", dashboard_json)

2. 面板类型详解

class PanelTypeGuide:
    """面板类型使用指南"""
    
    def __init__(self):
        self.panel_guides = {
            "timeseries": {
                "description": "时间序列图表,显示数据随时间的变化",
                "use_cases": [
                    "监控指标趋势",
                    "性能数据分析",
                    "业务指标跟踪",
                    "异常检测"
                ],
                "best_practices": [
                    "使用合适的时间范围",
                    "选择恰当的聚合函数",
                    "设置有意义的图例",
                    "配置阈值线"
                ],
                "configuration": {
                    "legend": {
                        "displayMode": "table",
                        "placement": "bottom",
                        "values": ["min", "max", "mean", "current"]
                    },
                    "tooltip": {
                        "mode": "multi",
                        "sort": "desc"
                    },
                    "axes": {
                        "left": {
                            "unit": "percent",
                            "min": 0,
                            "max": 100
                        }
                    }
                }
            },
            "stat": {
                "description": "单值显示,突出显示关键指标",
                "use_cases": [
                    "KPI指标展示",
                    "当前状态显示",
                    "计数器显示",
                    "百分比显示"
                ],
                "best_practices": [
                    "使用大字体显示主要数值",
                    "添加趋势指示器",
                    "设置颜色阈值",
                    "包含上下文信息"
                ],
                "configuration": {
                    "reduceOptions": {
                        "values": False,
                        "calcs": ["lastNotNull"],
                        "fields": ""
                    },
                    "textMode": "auto",
                    "colorMode": "value",
                    "graphMode": "area",
                    "justifyMode": "auto"
                }
            },
            "gauge": {
                "description": "仪表盘显示,直观显示数值在范围内的位置",
                "use_cases": [
                    "百分比显示",
                    "容量使用率",
                    "性能指标",
                    "健康评分"
                ],
                "best_practices": [
                    "设置合理的最小最大值",
                    "使用颜色区间表示状态",
                    "添加阈值标记",
                    "保持简洁的标签"
                ],
                "configuration": {
                    "min": 0,
                    "max": 100,
                    "thresholds": {
                        "steps": [
                            {"color": "green", "value": 0},
                            {"color": "yellow", "value": 70},
                            {"color": "red", "value": 90}
                        ]
                    }
                }
            },
            "table": {
                "description": "表格显示,展示结构化数据",
                "use_cases": [
                    "详细数据列表",
                    "多维度数据对比",
                    "日志记录显示",
                    "配置信息展示"
                ],
                "best_practices": [
                    "合理设置列宽",
                    "使用排序功能",
                    "添加过滤器",
                    "高亮重要数据"
                ],
                "configuration": {
                    "showHeader": True,
                    "sortBy": [{"desc": True, "displayName": "Time"}],
                    "styles": [
                        {
                            "pattern": "Time",
                            "type": "date",
                            "dateFormat": "YYYY-MM-DD HH:mm:ss"
                        }
                    ]
                }
            },
            "heatmap": {
                "description": "热力图,显示数据密度和分布",
                "use_cases": [
                    "响应时间分布",
                    "用户活动模式",
                    "错误率分析",
                    "资源使用模式"
                ],
                "best_practices": [
                    "选择合适的颜色方案",
                    "设置恰当的桶大小",
                    "添加数值标签",
                    "使用对数刻度(如需要)"
                ],
                "configuration": {
                    "xAxis": {
                        "show": True
                    },
                    "yAxis": {
                        "show": True,
                        "logBase": 1
                    },
                    "color": {
                        "mode": "spectrum",
                        "cardColor": "#b4ff00",
                        "colorScale": "sqrt",
                        "exponent": 0.5,
                        "colorScheme": "interpolateOranges"
                    }
                }
            },
            "piechart": {
                "description": "饼图,显示部分与整体的关系",
                "use_cases": [
                    "资源分配显示",
                    "分类数据占比",
                    "状态分布",
                    "成本分析"
                ],
                "best_practices": [
                    "限制分类数量(<7个)",
                    "使用对比鲜明的颜色",
                    "显示百分比标签",
                    "考虑使用环形图"
                ],
                "configuration": {
                    "pieType": "pie",  # pie, donut
                    "tooltipMode": "single",
                    "legendDisplayMode": "table",
                    "legendPlacement": "right",
                    "legendValues": ["value", "percent"]
                }
            }
        }
    
    def get_panel_guide(self, panel_type: str) -> Dict:
        """获取面板类型指南"""
        return self.panel_guides.get(panel_type, {
            "description": "未知面板类型",
            "use_cases": [],
            "best_practices": [],
            "configuration": {}
        })
    
    def recommend_panel_type(self, data_characteristics: Dict) -> List[str]:
        """根据数据特征推荐面板类型"""
        recommendations = []
        
        # 时间序列数据
        if data_characteristics.get("has_time_dimension", False):
            recommendations.append("timeseries")
        
        # 单一数值
        if data_characteristics.get("single_value", False):
            recommendations.extend(["stat", "gauge"])
        
        # 分类数据
        if data_characteristics.get("categorical", False):
            recommendations.extend(["piechart", "table"])
        
        # 多维数据
        if data_characteristics.get("multi_dimensional", False):
            recommendations.extend(["table", "heatmap"])
        
        # 分布数据
        if data_characteristics.get("distribution", False):
            recommendations.extend(["heatmap", "histogram"])
        
        return recommendations
    
    def generate_panel_config(self, panel_type: str, data_source: str, query: str) -> Dict:
        """生成面板配置模板"""
        base_config = {
            "type": panel_type,
            "datasource": data_source,
            "targets": [
                {
                    "expr": query,
                    "refId": "A"
                }
            ]
        }
        
        # 添加类型特定配置
        guide = self.get_panel_guide(panel_type)
        if guide.get("configuration"):
            base_config.update(guide["configuration"])
        
        return base_config

# 使用示例
guide = PanelTypeGuide()

# 获取面板类型指南
timeseries_guide = guide.get_panel_guide("timeseries")
print("时间序列面板指南:", timeseries_guide)

# 推荐面板类型
data_chars = {
    "has_time_dimension": True,
    "single_value": False,
    "categorical": False,
    "multi_dimensional": False,
    "distribution": False
}
recommendations = guide.recommend_panel_type(data_chars)
print("推荐面板类型:", recommendations)

# 生成面板配置
config = guide.generate_panel_config(
    "timeseries",
    "prometheus",
    "rate(http_requests_total[5m])"
)
print("面板配置:", config)

查询编辑器与数据处理

1. 查询编辑器使用

class QueryEditor:
    """查询编辑器管理器"""
    
    def __init__(self):
        self.query_templates = {
            "prometheus": {
                "cpu_usage": "100 - (avg(rate(node_cpu_seconds_total{mode='idle'}[5m])) * 100)",
                "memory_usage": "(1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100",
                "disk_usage": "100 - ((node_filesystem_avail_bytes * 100) / node_filesystem_size_bytes)",
                "network_traffic": "rate(node_network_receive_bytes_total[5m])",
                "http_requests": "rate(http_requests_total[5m])",
                "error_rate": "rate(http_requests_total{status=~'5..'}[5m]) / rate(http_requests_total[5m]) * 100",
                "response_time": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))"
            },
            "mysql": {
                "connections": "SELECT COUNT(*) FROM information_schema.processlist",
                "slow_queries": "SHOW GLOBAL STATUS LIKE 'Slow_queries'",
                "qps": "SHOW GLOBAL STATUS LIKE 'Questions'",
                "innodb_buffer": "SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_pages_total'",
                "table_locks": "SHOW GLOBAL STATUS LIKE 'Table_locks_waited'"
            },
            "influxdb": {
                "cpu_usage": "SELECT mean(usage_idle) FROM cpu WHERE time >= now() - 1h GROUP BY time(5m)",
                "memory_usage": "SELECT mean(used_percent) FROM mem WHERE time >= now() - 1h GROUP BY time(5m)",
                "disk_io": "SELECT mean(read_bytes), mean(write_bytes) FROM diskio WHERE time >= now() - 1h GROUP BY time(5m)"
            },
            "elasticsearch": {
                "log_count": '{"query": {"range": {"@timestamp": {"gte": "now-1h"}}}}',
                "error_logs": '{"query": {"bool": {"must": [{"range": {"@timestamp": {"gte": "now-1h"}}}, {"term": {"level": "error"}}]}}}',
                "response_codes": '{"aggs": {"status_codes": {"terms": {"field": "response_code"}}}}'
            }
        }
        
        self.query_functions = {
            "prometheus": {
                "aggregation": ["sum", "avg", "min", "max", "count", "stddev", "stdvar"],
                "rate": ["rate", "irate", "increase", "delta"],
                "time": ["time", "hour", "minute", "day_of_month", "day_of_week", "days_in_month"],
                "math": ["abs", "ceil", "floor", "round", "sqrt", "exp", "ln", "log2", "log10"],
                "prediction": ["predict_linear", "holt_winters"],
                "histogram": ["histogram_quantile", "histogram_count", "histogram_sum"]
            },
            "mysql": {
                "aggregation": ["COUNT", "SUM", "AVG", "MIN", "MAX", "GROUP_CONCAT"],
                "string": ["CONCAT", "SUBSTRING", "LENGTH", "UPPER", "LOWER", "TRIM"],
                "date": ["NOW", "DATE", "TIME", "YEAR", "MONTH", "DAY", "DATE_FORMAT"],
                "math": ["ABS", "CEIL", "FLOOR", "ROUND", "SQRT", "POW", "MOD"]
            }
        }
    
    def get_query_template(self, datasource: str, metric_type: str) -> str:
        """获取查询模板"""
        return self.query_templates.get(datasource, {}).get(metric_type, "")
    
    def get_available_functions(self, datasource: str) -> Dict[str, List[str]]:
        """获取可用函数列表"""
        return self.query_functions.get(datasource, {})
    
    def build_prometheus_query(self, metric: str, filters: Dict = None, 
                             aggregation: str = None, rate_interval: str = "5m") -> str:
        """构建Prometheus查询"""
        query = metric
        
        # 添加过滤器
        if filters:
            filter_parts = []
            for key, value in filters.items():
                if isinstance(value, list):
                    filter_parts.append(f'{key}=~"{"||".join(value)}"')
                else:
                    filter_parts.append(f'{key}="{value}"')
            
            if filter_parts:
                query += "{" + ",".join(filter_parts) + "}"
        
        # 添加rate函数
        if "_total" in metric or "_count" in metric:
            query = f"rate({query}[{rate_interval}])"
        
        # 添加聚合函数
        if aggregation:
            query = f"{aggregation}({query})"
        
        return query
    
    def build_mysql_query(self, table: str, columns: List[str], 
                         conditions: Dict = None, group_by: List[str] = None,
                         order_by: str = None, limit: int = None) -> str:
        """构建MySQL查询"""
        query = f"SELECT {', '.join(columns)} FROM {table}"
        
        # 添加WHERE条件
        if conditions:
            where_parts = []
            for key, value in conditions.items():
                if isinstance(value, list):
                    where_parts.append(f"{key} IN ({', '.join(map(str, value))})") 
                else:
                    where_parts.append(f"{key} = '{value}'")
            
            if where_parts:
                query += " WHERE " + " AND ".join(where_parts)
        
        # 添加GROUP BY
        if group_by:
            query += f" GROUP BY {', '.join(group_by)}"
        
        # 添加ORDER BY
        if order_by:
            query += f" ORDER BY {order_by}"
        
        # 添加LIMIT
        if limit:
            query += f" LIMIT {limit}"
        
        return query
    
    def validate_query(self, datasource: str, query: str) -> Dict:
        """验证查询语法"""
        validation_result = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "suggestions": []
        }
        
        if datasource == "prometheus":
            # 检查基本语法
            if not query.strip():
                validation_result["valid"] = False
                validation_result["errors"].append("查询不能为空")
            
            # 检查括号匹配
            if query.count("(") != query.count(")"):
                validation_result["valid"] = False
                validation_result["errors"].append("括号不匹配")
            
            # 检查常见错误
            if "rate(" in query and "[" not in query:
                validation_result["warnings"].append("rate函数需要时间范围参数")
            
            # 性能建议
            if ".*" in query:
                validation_result["suggestions"].append("避免使用正则表达式.*,考虑更具体的标签匹配")
            
            if not any(interval in query for interval in ["[1m]", "[5m]", "[15m]", "[1h]"]):
                if any(func in query for func in ["rate", "irate", "increase"]):
                    validation_result["suggestions"].append("建议为rate函数指定合适的时间间隔")
        
        elif datasource == "mysql":
            # 检查SQL基本语法
            if not query.strip().upper().startswith(("SELECT", "SHOW", "DESCRIBE")):
                validation_result["warnings"].append("建议使用只读查询(SELECT, SHOW, DESCRIBE)")
            
            # 检查是否有LIMIT
            if "SELECT" in query.upper() and "LIMIT" not in query.upper():
                validation_result["suggestions"].append("建议添加LIMIT子句以限制返回结果数量")
        
        return validation_result
    
    def optimize_query(self, datasource: str, query: str) -> Dict:
        """优化查询性能"""
        optimization = {
            "original_query": query,
            "optimized_query": query,
            "optimizations": [],
            "performance_impact": "low"
        }
        
        if datasource == "prometheus":
            optimized = query
            
            # 优化聚合顺序
            if "sum(rate(" in query:
                # 建议先聚合再计算rate
                optimization["optimizations"].append("考虑调整聚合函数顺序以提高性能")
            
            # 优化时间范围
            if "[1s]" in query:
                optimized = optimized.replace("[1s]", "[5s]")
                optimization["optimizations"].append("增加时间间隔从1s到5s以减少计算负载")
            
            # 添加标签过滤
            if "{" not in query and "up" not in query:
                optimization["optimizations"].append("建议添加标签过滤器以减少查询范围")
            
            optimization["optimized_query"] = optimized
        
        elif datasource == "mysql":
            optimized = query
            
            # 添加索引建议
            if "WHERE" in query.upper():
                optimization["optimizations"].append("确保WHERE子句中的列有适当的索引")
            
            # 限制结果集
            if "LIMIT" not in query.upper() and "SELECT" in query.upper():
                optimized += " LIMIT 1000"
                optimization["optimizations"].append("添加LIMIT子句限制结果集大小")
            
            optimization["optimized_query"] = optimized
        
        return optimization

# 使用示例
editor = QueryEditor()

# 获取查询模板
cpu_query = editor.get_query_template("prometheus", "cpu_usage")
print("CPU使用率查询:", cpu_query)

# 构建Prometheus查询
query = editor.build_prometheus_query(
    "http_requests_total",
    filters={"job": "api-server", "status": ["200", "201"]},
    aggregation="sum",
    rate_interval="5m"
)
print("构建的查询:", query)

# 验证查询
validation = editor.validate_query("prometheus", query)
print("查询验证:", validation)

# 优化查询
optimization = editor.optimize_query("prometheus", query)
print("查询优化:", optimization)

2. 数据转换与处理

class DataTransformer:
    """数据转换器"""
    
    def __init__(self):
        self.transformations = {
            "reduce": {
                "description": "数据聚合,将多个值减少为单个值",
                "functions": ["last", "lastNotNull", "first", "firstNotNull", "min", "max", "mean", "sum", "count", "range", "diff", "stdDev"]
            },
            "filter": {
                "description": "数据过滤,根据条件筛选数据",
                "types": ["byName", "byRegex", "byType", "byValue"]
            },
            "organize": {
                "description": "重新组织数据结构",
                "operations": ["rename", "hide", "reorder"]
            },
            "calculate": {
                "description": "计算新字段",
                "functions": ["binary", "unary", "window"]
            },
            "convert": {
                "description": "数据类型转换",
                "types": ["string", "number", "time", "boolean"]
            },
            "group": {
                "description": "数据分组",
                "methods": ["byField", "byValue", "byTime"]
            }
        }
    
    def create_reduce_transformation(self, reducer: str, fields: List[str] = None) -> Dict:
        """创建聚合转换"""
        return {
            "id": "reduce",
            "options": {
                "reducers": [reducer],
                "fields": fields or "",
                "values": False
            }
        }
    
    def create_filter_transformation(self, filter_type: str, pattern: str, 
                                   include: bool = True) -> Dict:
        """创建过滤转换"""
        filter_config = {
            "id": "filterFieldsByName",
            "options": {
                "include": {
                    "pattern": pattern
                } if include else None,
                "exclude": {
                    "pattern": pattern
                } if not include else None
            }
        }
        
        if filter_type == "byRegex":
            filter_config["id"] = "filterFieldsByName"
            filter_config["options"]["include" if include else "exclude"]["options"] = "regex"
        elif filter_type == "byType":
            filter_config["id"] = "filterFieldsByType"
            filter_config["options"] = {"include": pattern} if include else {"exclude": pattern}
        
        return filter_config
    
    def create_organize_transformation(self, field_configs: List[Dict]) -> Dict:
        """创建字段组织转换"""
        return {
            "id": "organize",
            "options": {
                "excludeByName": {config["name"]: True for config in field_configs if config.get("hide", False)},
                "indexByName": {config["name"]: i for i, config in enumerate(field_configs) if "order" in config},
                "renameByName": {config["name"]: config["rename"] for config in field_configs if "rename" in config}
            }
        }
    
    def create_calculate_transformation(self, expression: str, alias: str) -> Dict:
        """创建计算字段转换"""
        return {
            "id": "calculateField",
            "options": {
                "mode": "binary",
                "reduce": {
                    "reducer": "sum"
                },
                "binary": {
                    "left": "Field A",
                    "operator": "+",
                    "right": "Field B"
                },
                "alias": alias
            }
        }
    
    def create_convert_transformation(self, field_name: str, target_type: str) -> Dict:
        """创建类型转换"""
        return {
            "id": "convertFieldType",
            "options": {
                "conversions": [
                    {
                        "targetField": field_name,
                        "destinationType": target_type
                    }
                ]
            }
        }
    
    def create_group_transformation(self, group_by: List[str]) -> Dict:
        """创建分组转换"""
        return {
            "id": "groupBy",
            "options": {
                "fields": {
                    field: {
                        "aggregations": ["last"],
                        "operation": "groupby"
                    } for field in group_by
                }
            }
        }
    
    def create_time_series_transformation(self, time_field: str, value_field: str) -> Dict:
        """创建时间序列转换"""
        return {
            "id": "seriesToColumns",
            "options": {
                "byField": time_field
            }
        }
    
    def create_threshold_transformation(self, field: str, thresholds: List[Dict]) -> Dict:
        """创建阈值转换"""
        return {
            "id": "configFromData",
            "options": {
                "configPath": "thresholds",
                "mappings": [
                    {
                        "field": field,
                        "handlerKey": "threshold1",
                        "reducer": "last"
                    }
                ]
            }
        }
    
    def generate_transformation_pipeline(self, requirements: List[Dict]) -> List[Dict]:
        """生成转换管道"""
        pipeline = []
        
        for req in requirements:
            transform_type = req.get("type")
            
            if transform_type == "reduce":
                pipeline.append(self.create_reduce_transformation(
                    req.get("reducer", "last"),
                    req.get("fields")
                ))
            
            elif transform_type == "filter":
                pipeline.append(self.create_filter_transformation(
                    req.get("filter_type", "byName"),
                    req.get("pattern", ""),
                    req.get("include", True)
                ))
            
            elif transform_type == "organize":
                pipeline.append(self.create_organize_transformation(
                    req.get("field_configs", [])
                ))
            
            elif transform_type == "calculate":
                pipeline.append(self.create_calculate_transformation(
                    req.get("expression", ""),
                    req.get("alias", "calculated")
                ))
            
            elif transform_type == "convert":
                pipeline.append(self.create_convert_transformation(
                    req.get("field_name", ""),
                    req.get("target_type", "string")
                ))
            
            elif transform_type == "group":
                pipeline.append(self.create_group_transformation(
                    req.get("group_by", [])
                ))
        
        return pipeline
    
    def get_transformation_examples(self) -> Dict:
        """获取转换示例"""
        return {
            "cpu_percentage": {
                "description": "将CPU使用率转换为百分比",
                "transformations": [
                    {
                        "id": "calculateField",
                        "options": {
                            "mode": "binary",
                            "binary": {
                                "left": "cpu_usage",
                                "operator": "*",
                                "right": "100"
                            },
                            "alias": "CPU使用率(%)"
                        }
                    }
                ]
            },
            "memory_available": {
                "description": "计算可用内存",
                "transformations": [
                    {
                        "id": "calculateField",
                        "options": {
                            "mode": "binary",
                            "binary": {
                                "left": "memory_total",
                                "operator": "-",
                                "right": "memory_used"
                            },
                            "alias": "可用内存"
                        }
                    }
                ]
            },
            "error_rate": {
                "description": "计算错误率",
                "transformations": [
                    {
                        "id": "calculateField",
                        "options": {
                            "mode": "binary",
                            "binary": {
                                "left": "errors",
                                "operator": "/",
                                "right": "total_requests"
                            },
                            "alias": "错误率"
                        }
                    },
                    {
                        "id": "calculateField",
                        "options": {
                            "mode": "binary",
                            "binary": {
                                "left": "错误率",
                                "operator": "*",
                                "right": "100"
                            },
                            "alias": "错误率(%)"
                        }
                    }
                ]
            }
        }

# 使用示例
transformer = DataTransformer()

# 创建转换管道
requirements = [
    {
        "type": "filter",
        "filter_type": "byName",
        "pattern": "cpu_*",
        "include": True
    },
    {
        "type": "reduce",
        "reducer": "mean",
        "fields": ["cpu_usage"]
    },
    {
        "type": "organize",
        "field_configs": [
            {"name": "cpu_usage", "rename": "CPU使用率", "order": 0},
            {"name": "timestamp", "hide": True}
        ]
    }
]

pipeline = transformer.generate_transformation_pipeline(requirements)
print("转换管道:", pipeline)

# 获取转换示例
examples = transformer.get_transformation_examples()
print("转换示例:", examples["cpu_percentage"])

可视化设计最佳实践

1. 设计原则

class VisualizationDesignGuide:
    """可视化设计指南"""
    
    def __init__(self):
        self.design_principles = {
            "clarity": {
                "description": "清晰性 - 信息传达要清晰明确",
                "guidelines": [
                    "使用合适的图表类型",
                    "避免信息过载",
                    "保持一致的颜色方案",
                    "使用清晰的标签和标题"
                ]
            },
            "simplicity": {
                "description": "简洁性 - 去除不必要的元素",
                "guidelines": [
                    "减少图表装饰",
                    "使用空白空间",
                    "限制颜色数量",
                    "避免3D效果"
                ]
            },
            "consistency": {
                "description": "一致性 - 保持设计统一",
                "guidelines": [
                    "统一的字体和大小",
                    "一致的颜色编码",
                    "标准化的图例位置",
                    "统一的时间格式"
                ]
            },
            "accessibility": {
                "description": "可访问性 - 确保所有用户都能理解",
                "guidelines": [
                    "使用色盲友好的颜色",
                    "提供替代文本",
                    "确保足够的对比度",
                    "支持键盘导航"
                ]
            }
        }
        
        self.color_schemes = {
            "status": {
                "success": "#28a745",
                "warning": "#ffc107", 
                "error": "#dc3545",
                "info": "#17a2b8"
            },
            "performance": {
                "excellent": "#28a745",
                "good": "#6f42c1",
                "average": "#fd7e14",
                "poor": "#dc3545"
            },
            "categorical": [
                "#1f77b4", "#ff7f0e", "#2ca02c", "#d62728",
                "#9467bd", "#8c564b", "#e377c2", "#7f7f7f",
                "#bcbd22", "#17becf"
            ],
            "sequential": {
                "blue": ["#f7fbff", "#deebf7", "#c6dbef", "#9ecae1", "#6baed6", "#4292c6", "#2171b5", "#08519c", "#08306b"],
                "green": ["#f7fcf5", "#e5f5e0", "#c7e9c0", "#a1d99b", "#74c476", "#41ab5d", "#238b45", "#006d2c", "#00441b"],
                "red": ["#fff5f0", "#fee0d2", "#fcbba1", "#fc9272", "#fb6a4a", "#ef3b2c", "#cb181d", "#a50f15", "#67000d"]
            }
        }
    
    def get_chart_recommendations(self, data_type: str, data_characteristics: Dict) -> Dict:
        """获取图表类型推荐"""
        recommendations = {
            "time_series": {
                "primary": "time_series",
                "alternatives": ["line", "area"],
                "use_cases": ["监控指标", "趋势分析", "性能跟踪"]
            },
            "categorical": {
                "primary": "bar",
                "alternatives": ["pie", "donut"],
                "use_cases": ["分类统计", "比例分析", "排名显示"]
            },
            "distribution": {
                "primary": "histogram",
                "alternatives": ["heatmap", "stat"],
                "use_cases": ["数据分布", "频率分析", "异常检测"]
            },
            "correlation": {
                "primary": "scatter",
                "alternatives": ["heatmap"],
                "use_cases": ["相关性分析", "聚类分析", "异常点识别"]
            },
            "geographic": {
                "primary": "geomap",
                "alternatives": ["worldmap"],
                "use_cases": ["地理分布", "区域分析", "位置跟踪"]
            },
            "hierarchical": {
                "primary": "treemap",
                "alternatives": ["sunburst"],
                "use_cases": ["层次结构", "组织架构", "文件系统"]
            }
        }
        
        # 根据数据特征调整推荐
        data_size = data_characteristics.get("size", "medium")
        update_frequency = data_characteristics.get("update_frequency", "medium")
        
        if data_size == "large":
            # 大数据集建议使用聚合视图
            for rec in recommendations.values():
                if "heatmap" not in rec["alternatives"]:
                    rec["alternatives"].append("heatmap")
        
        if update_frequency == "high":
            # 高频更新建议使用实时图表
            for rec in recommendations.values():
                rec["considerations"] = rec.get("considerations", []) + ["考虑使用实时刷新"]
        
        return recommendations.get(data_type, recommendations["time_series"])
    
    def generate_color_palette(self, chart_type: str, data_count: int) -> List[str]:
        """生成颜色调色板"""
        if chart_type in ["stat", "gauge"]:
            return [self.color_schemes["status"]["info"]]
        
        elif chart_type in ["bar", "pie", "donut"]:
            if data_count <= len(self.color_schemes["categorical"]):
                return self.color_schemes["categorical"][:data_count]
            else:
                # 生成更多颜色
                import colorsys
                colors = []
                for i in range(data_count):
                    hue = i / data_count
                    rgb = colorsys.hsv_to_rgb(hue, 0.7, 0.9)
                    hex_color = "#{:02x}{:02x}{:02x}".format(
                        int(rgb[0] * 255), int(rgb[1] * 255), int(rgb[2] * 255)
                    )
                    colors.append(hex_color)
                return colors
        
        elif chart_type == "heatmap":
            return self.color_schemes["sequential"]["blue"]
        
        else:
            return self.color_schemes["categorical"][:data_count]
    
    def create_dashboard_layout(self, panels: List[Dict], screen_size: str = "desktop") -> Dict:
        """创建仪表板布局"""
        layouts = {
            "mobile": {"grid_width": 12, "panel_height": 8, "margin": 1},
            "tablet": {"grid_width": 16, "panel_height": 6, "margin": 2},
            "desktop": {"grid_width": 24, "panel_height": 8, "margin": 2},
            "large": {"grid_width": 32, "panel_height": 10, "margin": 3}
        }
        
        layout_config = layouts.get(screen_size, layouts["desktop"])
        grid_width = layout_config["grid_width"]
        panel_height = layout_config["panel_height"]
        margin = layout_config["margin"]
        
        positioned_panels = []
        current_x = 0
        current_y = 0
        
        for i, panel in enumerate(panels):
            panel_width = panel.get("width", grid_width // 2)
            
            # 检查是否需要换行
            if current_x + panel_width > grid_width:
                current_x = 0
                current_y += panel_height + margin
            
            positioned_panel = {
                **panel,
                "gridPos": {
                    "x": current_x,
                    "y": current_y,
                    "w": panel_width,
                    "h": panel.get("height", panel_height)
                }
            }
            
            positioned_panels.append(positioned_panel)
            current_x += panel_width + margin
        
        return {
            "panels": positioned_panels,
            "layout_config": layout_config,
            "total_height": current_y + panel_height
        }
    
    def validate_design(self, dashboard_config: Dict) -> Dict:
        """验证设计质量"""
        validation = {
            "score": 0,
            "max_score": 100,
            "issues": [],
            "suggestions": [],
            "strengths": []
        }
        
        panels = dashboard_config.get("panels", [])
        
        # 检查面板数量
        panel_count = len(panels)
        if panel_count <= 6:
            validation["score"] += 20
            validation["strengths"].append("面板数量适中,易于理解")
        elif panel_count <= 12:
            validation["score"] += 10
            validation["suggestions"].append("考虑将相关面板分组或使用多个仪表板")
        else:
            validation["issues"].append("面板过多,可能导致信息过载")
        
        # 检查颜色使用
        colors_used = set()
        for panel in panels:
            panel_colors = panel.get("fieldConfig", {}).get("defaults", {}).get("color", {})
            if "fixedColor" in panel_colors:
                colors_used.add(panel_colors["fixedColor"])
        
        if len(colors_used) <= 5:
            validation["score"] += 15
            validation["strengths"].append("颜色使用克制,保持一致性")
        else:
            validation["suggestions"].append("减少颜色种类,提高视觉一致性")
        
        # 检查标题和描述
        panels_with_titles = sum(1 for panel in panels if panel.get("title"))
        if panels_with_titles == panel_count:
            validation["score"] += 15
            validation["strengths"].append("所有面板都有清晰的标题")
        else:
            validation["issues"].append(f"有{panel_count - panels_with_titles}个面板缺少标题")
        
        # 检查布局合理性
        total_width = sum(panel.get("gridPos", {}).get("w", 0) for panel in panels)
        if total_width > 0:
            validation["score"] += 20
            validation["strengths"].append("布局结构合理")
        
        # 检查数据源多样性
        datasources = set()
        for panel in panels:
            for target in panel.get("targets", []):
                if "datasource" in target:
                    datasources.add(target["datasource"].get("type", "unknown"))
        
        if len(datasources) <= 3:
            validation["score"] += 10
            validation["strengths"].append("数据源集中,便于管理")
        else:
            validation["suggestions"].append("考虑减少数据源种类或分离到不同仪表板")
        
        # 检查刷新间隔
        refresh_interval = dashboard_config.get("refresh", "5s")
        if refresh_interval in ["30s", "1m", "5m"]:
            validation["score"] += 10
            validation["strengths"].append("刷新间隔设置合理")
        elif refresh_interval in ["1s", "5s"]:
            validation["suggestions"].append("考虑增加刷新间隔以减少服务器负载")
        
        # 检查时间范围
        time_range = dashboard_config.get("time", {}).get("from", "now-1h")
        if "now-" in time_range:
            validation["score"] += 10
            validation["strengths"].append("使用相对时间范围,便于实时监控")
        
        return validation

# 使用示例
design_guide = VisualizationDesignGuide()

# 获取图表推荐
recommendation = design_guide.get_chart_recommendations(
    "time_series",
    {"size": "large", "update_frequency": "high"}
)
print("图表推荐:", recommendation)

# 生成颜色调色板
colors = design_guide.generate_color_palette("bar", 5)
print("颜色调色板:", colors)

# 创建布局
panels = [
    {"title": "CPU使用率", "width": 12, "height": 8},
    {"title": "内存使用率", "width": 12, "height": 8},
    {"title": "网络流量", "width": 24, "height": 6}
]
layout = design_guide.create_dashboard_layout(panels, "desktop")
print("布局配置:", layout)

2. 响应式设计

class ResponsiveDesignManager:
    """响应式设计管理器"""
    
    def __init__(self):
        self.breakpoints = {
            "xs": {"min_width": 0, "max_width": 575, "grid_columns": 12},
            "sm": {"min_width": 576, "max_width": 767, "grid_columns": 12},
            "md": {"min_width": 768, "max_width": 991, "grid_columns": 16},
            "lg": {"min_width": 992, "max_width": 1199, "grid_columns": 20},
            "xl": {"min_width": 1200, "max_width": 1599, "grid_columns": 24},
            "xxl": {"min_width": 1600, "max_width": float('inf'), "grid_columns": 32}
        }
        
        self.panel_sizes = {
            "small": {"xs": 12, "sm": 12, "md": 8, "lg": 6, "xl": 6, "xxl": 4},
            "medium": {"xs": 12, "sm": 12, "md": 16, "lg": 10, "xl": 12, "xxl": 8},
            "large": {"xs": 12, "sm": 12, "md": 16, "lg": 20, "xl": 24, "xxl": 16},
            "full": {"xs": 12, "sm": 12, "md": 16, "lg": 20, "xl": 24, "xxl": 32}
        }
    
    def generate_responsive_layout(self, panels: List[Dict]) -> Dict:
        """生成响应式布局"""
        responsive_panels = []
        
        for panel in panels:
            panel_size = panel.get("size", "medium")
            responsive_panel = {
                **panel,
                "responsive": {
                    breakpoint: {
                        "gridPos": {
                            "w": self.panel_sizes[panel_size][breakpoint],
                            "h": self._get_responsive_height(panel, breakpoint)
                        }
                    }
                    for breakpoint in self.breakpoints.keys()
                }
            }
            responsive_panels.append(responsive_panel)
        
        return {
            "panels": responsive_panels,
            "breakpoints": self.breakpoints
        }
    
    def _get_responsive_height(self, panel: Dict, breakpoint: str) -> int:
        """获取响应式高度"""
        base_height = panel.get("height", 8)
        
        # 在小屏幕上增加高度以保持可读性
        if breakpoint in ["xs", "sm"]:
            return min(base_height + 2, 12)
        elif breakpoint == "md":
            return base_height + 1
        else:
            return base_height
    
    def optimize_for_mobile(self, dashboard_config: Dict) -> Dict:
        """移动端优化"""
        mobile_config = dashboard_config.copy()
        
        # 调整刷新间隔
        mobile_config["refresh"] = "1m"  # 减少移动端刷新频率
        
        # 简化面板
        for panel in mobile_config.get("panels", []):
            # 移除复杂的转换
            if len(panel.get("transformations", [])) > 2:
                panel["transformations"] = panel["transformations"][:2]
            
            # 简化图例
            if "legend" in panel.get("options", {}):
                panel["options"]["legend"]["displayMode"] = "list"
                panel["options"]["legend"]["placement"] = "bottom"
            
            # 调整字体大小
            if "fieldConfig" in panel:
                panel["fieldConfig"]["defaults"]["custom"] = panel["fieldConfig"]["defaults"].get("custom", {})
                panel["fieldConfig"]["defaults"]["custom"]["textSize"] = {"title": 14, "value": 16}
        
        return mobile_config
    
    def create_mobile_dashboard(self, desktop_dashboard: Dict) -> Dict:
        """创建移动端专用仪表板"""
        mobile_dashboard = {
            "title": desktop_dashboard.get("title", "") + " (Mobile)",
            "tags": desktop_dashboard.get("tags", []) + ["mobile"],
            "refresh": "1m",
            "time": desktop_dashboard.get("time", {}),
            "panels": []
        }
        
        # 选择最重要的面板
        important_panels = []
        for panel in desktop_dashboard.get("panels", []):
            priority = panel.get("priority", "medium")
            if priority in ["high", "critical"]:
                important_panels.append(panel)
        
        # 如果重要面板不足,添加其他面板
        if len(important_panels) < 4:
            for panel in desktop_dashboard.get("panels", []):
                if panel not in important_panels and len(important_panels) < 6:
                    important_panels.append(panel)
        
        # 重新布局面板
        for i, panel in enumerate(important_panels):
            mobile_panel = {
                **panel,
                "gridPos": {
                    "x": 0,
                    "y": i * 10,
                    "w": 12,
                    "h": 8
                }
            }
            
            # 简化面板配置
            if mobile_panel.get("type") == "graph":
                mobile_panel["type"] = "timeseries"
            
            mobile_dashboard["panels"].append(mobile_panel)
        
        return mobile_dashboard

# 使用示例
responsive_manager = ResponsiveDesignManager()

# 生成响应式布局
panels = [
    {"title": "CPU", "size": "small", "height": 6, "priority": "high"},
    {"title": "Memory", "size": "small", "height": 6, "priority": "high"},
    {"title": "Network", "size": "large", "height": 8, "priority": "medium"}
]

responsive_layout = responsive_manager.generate_responsive_layout(panels)
print("响应式布局:", responsive_layout["panels"][0]["responsive"])

# 创建移动端仪表板
desktop_dashboard = {
    "title": "系统监控",
    "panels": panels
}

mobile_dashboard = responsive_manager.create_mobile_dashboard(desktop_dashboard)
print("移动端仪表板:", mobile_dashboard["title"])

性能优化

1. 查询优化

class DashboardPerformanceOptimizer:
    """仪表板性能优化器"""
    
    def __init__(self):
        self.optimization_rules = {
            "query_optimization": {
                "reduce_time_range": "减少查询时间范围",
                "add_filters": "添加标签过滤器",
                "use_recording_rules": "使用记录规则",
                "optimize_aggregation": "优化聚合函数"
            },
            "refresh_optimization": {
                "increase_interval": "增加刷新间隔",
                "conditional_refresh": "条件刷新",
                "cache_queries": "缓存查询结果"
            },
            "panel_optimization": {
                "reduce_panel_count": "减少面板数量",
                "simplify_visualizations": "简化可视化",
                "lazy_loading": "延迟加载"
            }
        }
    
    def analyze_dashboard_performance(self, dashboard_config: Dict) -> Dict:
        """分析仪表板性能"""
        analysis = {
            "performance_score": 0,
            "max_score": 100,
            "bottlenecks": [],
            "recommendations": [],
            "metrics": {}
        }
        
        panels = dashboard_config.get("panels", [])
        
        # 分析面板数量
        panel_count = len(panels)
        if panel_count <= 6:
            analysis["performance_score"] += 25
        elif panel_count <= 12:
            analysis["performance_score"] += 15
            analysis["recommendations"].append("考虑减少面板数量或使用分页")
        else:
            analysis["bottlenecks"].append(f"面板过多({panel_count}个),可能影响加载性能")
        
        analysis["metrics"]["panel_count"] = panel_count
        
        # 分析查询复杂度
        complex_queries = 0
        total_queries = 0
        
        for panel in panels:
            for target in panel.get("targets", []):
                total_queries += 1
                query = target.get("expr", "")
                
                # 检查查询复杂度
                if self._is_complex_query(query):
                    complex_queries += 1
        
        if total_queries > 0:
            complexity_ratio = complex_queries / total_queries
            if complexity_ratio <= 0.3:
                analysis["performance_score"] += 20
            elif complexity_ratio <= 0.6:
                analysis["performance_score"] += 10
                analysis["recommendations"].append("优化复杂查询以提高性能")
            else:
                analysis["bottlenecks"].append(f"复杂查询比例过高({complexity_ratio:.1%})")
        
        analysis["metrics"]["query_complexity"] = {
            "total": total_queries,
            "complex": complex_queries,
            "ratio": complex_queries / max(total_queries, 1)
        }
        
        # 分析刷新间隔
        refresh_interval = dashboard_config.get("refresh", "5s")
        if refresh_interval in ["30s", "1m", "5m"]:
            analysis["performance_score"] += 20
        elif refresh_interval in ["10s", "15s"]:
            analysis["performance_score"] += 10
            analysis["recommendations"].append("考虑增加刷新间隔")
        else:
            analysis["bottlenecks"].append(f"刷新间隔过短({refresh_interval})")
        
        analysis["metrics"]["refresh_interval"] = refresh_interval
        
        # 分析时间范围
        time_range = dashboard_config.get("time", {}).get("from", "now-1h")
        if "now-" in time_range:
            range_value = time_range.replace("now-", "")
            if range_value in ["15m", "30m", "1h"]:
                analysis["performance_score"] += 15
            elif range_value in ["3h", "6h", "12h"]:
                analysis["performance_score"] += 10
                analysis["recommendations"].append("考虑减少默认时间范围")
            else:
                analysis["recommendations"].append("时间范围可能过大,影响查询性能")
        
        analysis["metrics"]["time_range"] = time_range
        
        # 分析数据转换
        transformation_count = 0
        for panel in panels:
            transformation_count += len(panel.get("transformations", []))
        
        if transformation_count <= panel_count:
            analysis["performance_score"] += 10
        else:
            analysis["recommendations"].append("减少数据转换以提高性能")
        
        analysis["metrics"]["transformations"] = transformation_count
        
        # 分析数据源分布
        datasources = set()
        for panel in panels:
            for target in panel.get("targets", []):
                if "datasource" in target:
                    datasources.add(target["datasource"].get("uid", "unknown"))
        
        if len(datasources) <= 2:
            analysis["performance_score"] += 10
        elif len(datasources) <= 4:
            analysis["performance_score"] += 5
        else:
            analysis["recommendations"].append("考虑减少数据源数量")
        
        analysis["metrics"]["datasource_count"] = len(datasources)
        
        return analysis
    
    def _is_complex_query(self, query: str) -> bool:
        """判断查询是否复杂"""
        if not query:
            return False
        
        complexity_indicators = [
            "rate(" in query and "[" in query,  # 有rate函数
            query.count("(") > 3,  # 嵌套函数多
            "by (" in query or "without (" in query,  # 有分组
            "or" in query or "and" in query,  # 有逻辑运算
            "offset" in query,  # 有时间偏移
            len(query) > 100  # 查询字符串长
        ]
        
        return sum(complexity_indicators) >= 2
    
    def generate_optimization_plan(self, dashboard_config: Dict) -> Dict:
        """生成优化计划"""
        analysis = self.analyze_dashboard_performance(dashboard_config)
        
        optimization_plan = {
            "current_score": analysis["performance_score"],
            "target_score": 85,
            "optimizations": [],
            "estimated_improvement": 0
        }
        
        # 基于分析结果生成优化建议
        if analysis["metrics"]["panel_count"] > 8:
            optimization_plan["optimizations"].append({
                "type": "reduce_panels",
                "description": "减少面板数量或使用分页",
                "impact": "high",
                "effort": "medium",
                "estimated_gain": 15
            })
        
        if analysis["metrics"]["query_complexity"]["ratio"] > 0.5:
            optimization_plan["optimizations"].append({
                "type": "optimize_queries",
                "description": "优化复杂查询,使用记录规则",
                "impact": "high",
                "effort": "high",
                "estimated_gain": 20
            })
        
        if analysis["metrics"]["refresh_interval"] in ["1s", "5s"]:
            optimization_plan["optimizations"].append({
                "type": "increase_refresh",
                "description": "增加刷新间隔到30s或1m",
                "impact": "medium",
                "effort": "low",
                "estimated_gain": 10
            })
        
        if analysis["metrics"]["transformations"] > analysis["metrics"]["panel_count"] * 2:
            optimization_plan["optimizations"].append({
                "type": "reduce_transformations",
                "description": "减少数据转换,在查询层面处理",
                "impact": "medium",
                "effort": "medium",
                "estimated_gain": 8
            })
        
        if analysis["metrics"]["datasource_count"] > 3:
            optimization_plan["optimizations"].append({
                "type": "consolidate_datasources",
                "description": "整合数据源或分离仪表板",
                "impact": "low",
                "effort": "high",
                "estimated_gain": 5
            })
        
        # 计算总的预期改进
        optimization_plan["estimated_improvement"] = sum(
            opt["estimated_gain"] for opt in optimization_plan["optimizations"]
        )
        
        return optimization_plan
    
    def apply_optimizations(self, dashboard_config: Dict, optimizations: List[str]) -> Dict:
        """应用优化"""
        optimized_config = dashboard_config.copy()
        
        for optimization in optimizations:
            if optimization == "increase_refresh":
                current_refresh = optimized_config.get("refresh", "5s")
                if current_refresh in ["1s", "5s"]:
                    optimized_config["refresh"] = "30s"
                elif current_refresh in ["10s", "15s"]:
                    optimized_config["refresh"] = "1m"
            
            elif optimization == "reduce_transformations":
                for panel in optimized_config.get("panels", []):
                    transformations = panel.get("transformations", [])
                    if len(transformations) > 2:
                        # 保留最重要的转换
                        panel["transformations"] = transformations[:2]
            
            elif optimization == "optimize_queries":
                for panel in optimized_config.get("panels", []):
                    for target in panel.get("targets", []):
                        query = target.get("expr", "")
                        if self._is_complex_query(query):
                            # 简化查询(示例)
                            if "rate(" in query and "[1s]" in query:
                                target["expr"] = query.replace("[1s]", "[5s]")
            
            elif optimization == "reduce_panels":
                panels = optimized_config.get("panels", [])
                if len(panels) > 8:
                    # 保留前8个面板
                    optimized_config["panels"] = panels[:8]
                    optimized_config["tags"] = optimized_config.get("tags", []) + ["optimized"]
        
        return optimized_config
    
    def create_performance_report(self, dashboard_config: Dict) -> str:
        """创建性能报告"""
        analysis = self.analyze_dashboard_performance(dashboard_config)
        plan = self.generate_optimization_plan(dashboard_config)
        
        report = f"""
# 仪表板性能分析报告

## 当前性能评分
**{analysis['performance_score']}/100**

## 关键指标
- 面板数量: {analysis['metrics']['panel_count']}
- 查询复杂度: {analysis['metrics']['query_complexity']['ratio']:.1%}
- 刷新间隔: {analysis['metrics']['refresh_interval']}
- 数据转换: {analysis['metrics']['transformations']}
- 数据源数量: {analysis['metrics']['datasource_count']}

## 性能瓶颈
"""
        
        for bottleneck in analysis['bottlenecks']:
            report += f"- {bottleneck}\n"
        
        report += "\n## 优化建议\n"
        for rec in analysis['recommendations']:
            report += f"- {rec}\n"
        
        report += "\n## 优化计划\n"
        for opt in plan['optimizations']:
            report += f"- **{opt['description']}** (影响: {opt['impact']}, 工作量: {opt['effort']}, 预期提升: {opt['estimated_gain']}分)\n"
        
        report += f"\n## 预期改进\n"
        report += f"实施所有优化后,预期性能评分可提升至: **{analysis['performance_score'] + plan['estimated_improvement']}/100**"
        
        return report

# 使用示例
optimizer = DashboardPerformanceOptimizer()

# 分析仪表板性能
dashboard_config = {
    "panels": [
        {"targets": [{"expr": "rate(http_requests_total[1s])"}], "transformations": [{}, {}, {}]},
        {"targets": [{"expr": "sum(rate(cpu_usage[5m])) by (instance)"}]},
        # ... 更多面板
    ],
    "refresh": "5s",
    "time": {"from": "now-6h"}
}

analysis = optimizer.analyze_dashboard_performance(dashboard_config)
print("性能分析:", analysis["performance_score"])

# 生成优化计划
plan = optimizer.generate_optimization_plan(dashboard_config)
print("优化计划:", len(plan["optimizations"]), "项优化")

# 创建性能报告
report = optimizer.create_performance_report(dashboard_config)
print("性能报告已生成")

2. 缓存策略

class DashboardCacheManager:
    """仪表板缓存管理器"""
    
    def __init__(self):
        self.cache_strategies = {
            "query_cache": {
                "description": "查询结果缓存",
                "ttl_options": ["30s", "1m", "5m", "15m", "1h"],
                "suitable_for": ["静态数据", "历史数据", "聚合数据"]
            },
            "dashboard_cache": {
                "description": "仪表板配置缓存",
                "ttl_options": ["5m", "15m", "1h", "24h"],
                "suitable_for": ["稳定配置", "模板仪表板"]
            },
            "data_source_cache": {
                "description": "数据源连接缓存",
                "ttl_options": ["1m", "5m", "15m"],
                "suitable_for": ["远程数据源", "API数据源"]
            }
        }
    
    def recommend_cache_strategy(self, panel_config: Dict) -> Dict:
        """推荐缓存策略"""
        recommendations = {
            "query_cache": None,
            "refresh_cache": None,
            "reasoning": []
        }
        
        # 分析查询特征
        targets = panel_config.get("targets", [])
        if not targets:
            return recommendations
        
        for target in targets:
            query = target.get("expr", "")
            datasource_type = target.get("datasource", {}).get("type", "")
            
            # 基于查询类型推荐缓存
            if "rate(" in query:
                recommendations["query_cache"] = "1m"
                recommendations["reasoning"].append("rate查询适合短期缓存")
            elif "sum(" in query or "avg(" in query:
                recommendations["query_cache"] = "5m"
                recommendations["reasoning"].append("聚合查询适合中期缓存")
            elif "histogram_quantile" in query:
                recommendations["query_cache"] = "30s"
                recommendations["reasoning"].append("分位数查询计算复杂,适合缓存")
            
            # 基于数据源类型推荐缓存
            if datasource_type == "mysql":
                recommendations["query_cache"] = "5m"
                recommendations["reasoning"].append("MySQL查询适合中期缓存")
            elif datasource_type == "elasticsearch":
                recommendations["query_cache"] = "1m"
                recommendations["reasoning"].append("Elasticsearch查询适合短期缓存")
        
        # 基于刷新间隔推荐缓存
        refresh_interval = panel_config.get("interval", "")
        if refresh_interval:
            if refresh_interval in ["1m", "5m"]:
                recommendations["refresh_cache"] = refresh_interval
                recommendations["reasoning"].append(f"基于{refresh_interval}刷新间隔设置缓存")
        
        return recommendations
    
    def generate_cache_config(self, dashboard_config: Dict) -> Dict:
        """生成缓存配置"""
        cache_config = {
            "global_settings": {
                "enable_query_cache": True,
                "default_cache_ttl": "1m",
                "max_cache_size": "100MB"
            },
            "panel_cache_settings": {},
            "datasource_cache_settings": {}
        }
        
        # 为每个面板生成缓存设置
        for i, panel in enumerate(dashboard_config.get("panels", [])):
            panel_id = panel.get("id", f"panel_{i}")
            recommendations = self.recommend_cache_strategy(panel)
            
            if recommendations["query_cache"]:
                cache_config["panel_cache_settings"][panel_id] = {
                    "query_cache_ttl": recommendations["query_cache"],
                    "enable_cache": True
                }
        
        # 为数据源生成缓存设置
        datasources = set()
        for panel in dashboard_config.get("panels", []):
            for target in panel.get("targets", []):
                if "datasource" in target:
                    ds_uid = target["datasource"].get("uid")
                    ds_type = target["datasource"].get("type")
                    if ds_uid:
                        datasources.add((ds_uid, ds_type))
        
        for ds_uid, ds_type in datasources:
            if ds_type == "prometheus":
                cache_config["datasource_cache_settings"][ds_uid] = {
                    "connection_cache_ttl": "5m",
                    "query_cache_ttl": "1m"
                }
            elif ds_type == "mysql":
                cache_config["datasource_cache_settings"][ds_uid] = {
                    "connection_cache_ttl": "15m",
                    "query_cache_ttl": "5m"
                }
        
        return cache_config
    
    def estimate_cache_benefits(self, dashboard_config: Dict, cache_config: Dict) -> Dict:
        """估算缓存收益"""
        benefits = {
            "query_reduction": 0,
            "load_time_improvement": 0,
            "server_load_reduction": 0,
            "cost_savings": 0
        }
        
        panels = dashboard_config.get("panels", [])
        refresh_interval = dashboard_config.get("refresh", "30s")
        
        # 计算查询减少量
        total_queries = len(panels) * len(dashboard_config.get("panels", [{}])[0].get("targets", [{}]))
        
        # 基于缓存TTL计算查询减少
        if refresh_interval == "30s":
            queries_per_hour = total_queries * 120  # 每小时查询次数
        elif refresh_interval == "1m":
            queries_per_hour = total_queries * 60
        else:
            queries_per_hour = total_queries * 12  # 假设5分钟刷新
        
        # 估算缓存命中率
        cache_hit_rate = 0.7  # 假设70%缓存命中率
        benefits["query_reduction"] = queries_per_hour * cache_hit_rate
        
        # 估算加载时间改进
        avg_query_time = 200  # 假设平均查询时间200ms
        cached_query_time = 10  # 缓存查询时间10ms
        benefits["load_time_improvement"] = (avg_query_time - cached_query_time) * cache_hit_rate
        
        # 估算服务器负载减少
        benefits["server_load_reduction"] = benefits["query_reduction"] * 0.1  # 假设每个查询占用0.1%CPU
        
        # 估算成本节省(基于云服务计费)
        cost_per_query = 0.001  # 假设每个查询成本0.001元
        benefits["cost_savings"] = benefits["query_reduction"] * cost_per_query * 24  # 每日节省
        
        return benefits

# 使用示例
cache_manager = DashboardCacheManager()

# 推荐缓存策略
panel_config = {
    "targets": [
        {"expr": "rate(http_requests_total[5m])", "datasource": {"type": "prometheus"}}
    ],
    "interval": "1m"
}

recommendation = cache_manager.recommend_cache_strategy(panel_config)
print("缓存推荐:", recommendation)

# 生成缓存配置
dashboard_config = {
    "panels": [panel_config],
    "refresh": "30s"
}

cache_config = cache_manager.generate_cache_config(dashboard_config)
print("缓存配置:", cache_config["global_settings"])

# 估算缓存收益
benefits = cache_manager.estimate_cache_benefits(dashboard_config, cache_config)
print("缓存收益:", benefits)

总结

关键要点

  1. 仪表板设计原则

    • 清晰性:信息传达要清晰明确
    • 简洁性:去除不必要的元素
    • 一致性:保持设计统一
    • 可访问性:确保所有用户都能理解
  2. 面板类型选择

    • 时间序列数据:使用时间序列图或折线图
    • 分类数据:使用柱状图或饼图
    • 分布数据:使用直方图或热力图
    • 地理数据:使用地图可视化
  3. 查询优化

    • 使用合适的时间范围和聚合函数
    • 添加标签过滤器减少查询范围
    • 验证和优化查询语法
    • 使用记录规则处理复杂查询
  4. 数据转换

    • 合理使用数据转换功能
    • 避免过度转换影响性能
    • 优先在查询层面处理数据
  5. 响应式设计

    • 考虑不同屏幕尺寸的显示效果
    • 为移动端优化仪表板
    • 使用合适的布局和字体大小
  6. 性能优化

    • 控制面板数量和查询复杂度
    • 设置合理的刷新间隔
    • 使用缓存策略减少服务器负载
    • 监控和分析仪表板性能

最佳实践

  1. 设计阶段

    • 明确仪表板目标和受众
    • 选择合适的可视化类型
    • 保持设计一致性和简洁性
    • 考虑用户体验和可访问性
  2. 开发阶段

    • 优化查询性能
    • 合理使用数据转换
    • 实现响应式布局
    • 添加适当的交互功能
  3. 部署阶段

    • 配置缓存策略
    • 设置合理的刷新间隔
    • 监控仪表板性能
    • 收集用户反馈
  4. 维护阶段

    • 定期审查和优化
    • 更新过时的查询和配置
    • 根据需求调整设计
    • 保持文档更新

下一步学习建议

  1. 深入学习特定可视化类型

    • 掌握高级图表配置
    • 学习自定义可视化插件
    • 了解数据可视化理论
  2. 实践项目

    • 创建业务监控仪表板
    • 构建系统性能仪表板
    • 开发用户行为分析仪表板
  3. 技术拓展

    • 学习Grafana插件开发
    • 掌握高级查询技巧
    • 了解可视化最佳实践
  4. 工具集成

    • 与告警系统集成
    • 自动化仪表板部署
    • 集成外部数据源

通过本章的学习,你应该能够创建功能完善、性能优良的Grafana仪表板,为数据可视化和监控提供强有力的支持。