Overview

Data sources are a core component of Grafana: they define how Grafana connects to and queries external data systems. Grafana supports more than 60 data source types, ranging from time-series and relational databases to logging systems and cloud monitoring services. Correctly configuring data sources is the foundation of building effective dashboards.

Learning Objectives

After working through this chapter, you will be able to:
- Understand the main data source types and their characteristics
- Configure common data sources (Prometheus, MySQL, InfluxDB, etc.)
- Use advanced data source configuration options
- Test data sources and troubleshoot connection problems
- Apply security-related data source settings
- Manage data sources in bulk and automate their configuration

Data Source Type Overview

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Optional, Any
import json

class DataSourceCategory(Enum):
    """Data source categories"""
    TIME_SERIES = "time_series"  # Time-series databases
    RELATIONAL = "relational"    # Relational databases
    DOCUMENT = "document"        # Document databases
    LOGGING = "logging"          # Logging systems
    CLOUD = "cloud"              # Cloud services
    MONITORING = "monitoring"    # Monitoring systems
    ANALYTICS = "analytics"      # Analytics platforms
    OTHER = "other"              # Other

class DataSourceType(Enum):
    """Concrete data source types"""
    # Time-series databases
    PROMETHEUS = "prometheus"
    INFLUXDB = "influxdb"
    GRAPHITE = "graphite"
    OPENTSDB = "opentsdb"
    
    # Relational databases
    MYSQL = "mysql"
    POSTGRESQL = "postgres"
    MSSQL = "mssql"
    ORACLE = "oracle"
    # TimescaleDB is queried through the PostgreSQL plugin, so it shares the
    # "postgres" value; Python treats it as an alias of POSTGRESQL
    TIMESCALEDB = "postgres"
    
    # Document databases
    ELASTICSEARCH = "elasticsearch"
    MONGODB = "mongodb"
    
    # Logging systems
    LOKI = "loki"
    SPLUNK = "splunk"
    
    # Cloud services
    CLOUDWATCH = "cloudwatch"
    AZURE_MONITOR = "azuremonitor"
    GOOGLE_CLOUD = "stackdriver"
    
    # Other
    JSON = "simplejson"
    CSV = "marcusolsson-csv-datasource"
    ZABBIX = "alexanderzobnin-zabbix-datasource"

@dataclass
class DataSourceConfig:
    """Data source configuration"""
    name: str
    type: DataSourceType
    url: str
    access: str = "proxy"  # "proxy" or "direct"
    is_default: bool = False
    basic_auth: bool = False
    basic_auth_user: str = ""
    basic_auth_password: str = ""
    with_credentials: bool = False
    json_data: Optional[Dict[str, Any]] = None
    secure_json_data: Optional[Dict[str, str]] = None
    version: int = 1
    read_only: bool = False
    
    def __post_init__(self):
        # Avoid shared mutable defaults: create fresh dicts per instance
        if self.json_data is None:
            self.json_data = {}
        if self.secure_json_data is None:
            self.secure_json_data = {}

class DataSourceManager:
    """数据源管理器"""
    
    def __init__(self):
        self.data_sources = []
        self.supported_types = {
            DataSourceType.PROMETHEUS: {
                "category": DataSourceCategory.TIME_SERIES,
                "description": "开源监控和告警工具包",
                "query_language": "PromQL",
                "default_port": 9090,
                "features": ["metrics", "alerting", "annotations"]
            },
            DataSourceType.INFLUXDB: {
                "category": DataSourceCategory.TIME_SERIES,
                "description": "时序数据库",
                "query_language": "InfluxQL/Flux",
                "default_port": 8086,
                "features": ["metrics", "logs", "annotations"]
            },
            DataSourceType.MYSQL: {
                "category": DataSourceCategory.RELATIONAL,
                "description": "关系型数据库",
                "query_language": "SQL",
                "default_port": 3306,
                "features": ["table", "annotations"]
            },
            DataSourceType.POSTGRESQL: {
                "category": DataSourceCategory.RELATIONAL,
                "description": "高级关系型数据库",
                "query_language": "SQL",
                "default_port": 5432,
                "features": ["table", "annotations", "time_series"]
            },
            DataSourceType.ELASTICSEARCH: {
                "category": DataSourceCategory.LOGGING,
                "description": "搜索和分析引擎",
                "query_language": "Lucene/KQL",
                "default_port": 9200,
                "features": ["logs", "metrics", "annotations"]
            },
            DataSourceType.LOKI: {
                "category": DataSourceCategory.LOGGING,
                "description": "日志聚合系统",
                "query_language": "LogQL",
                "default_port": 3100,
                "features": ["logs", "annotations"]
            },
            DataSourceType.CLOUDWATCH: {
                "category": DataSourceCategory.CLOUD,
                "description": "AWS监控服务",
                "query_language": "CloudWatch Query",
                "default_port": 443,
                "features": ["metrics", "logs", "annotations"]
            }
        }
    
    def get_supported_types(self, category: Optional[DataSourceCategory] = None) -> List[Dict]:
        """Return supported data source types, optionally filtered by category"""
        if category:
            return [
                {"type": ds_type, **info} 
                for ds_type, info in self.supported_types.items() 
                if info["category"] == category
            ]
        return [{"type": ds_type, **info} for ds_type, info in self.supported_types.items()]
    
    def create_data_source(self, config: DataSourceConfig) -> Dict:
        """Create a data source"""
        if config.type not in self.supported_types:
            raise ValueError(f"Unsupported data source type: {config.type}")
        
        data_source = {
            "name": config.name,
            "type": config.type.value,
            "url": config.url,
            "access": config.access,
            "isDefault": config.is_default,
            "basicAuth": config.basic_auth,
            "basicAuthUser": config.basic_auth_user,
            "withCredentials": config.with_credentials,
            "jsonData": config.json_data,
            "version": config.version,
            "readOnly": config.read_only
        }
        
        if config.basic_auth and config.basic_auth_password:
            # Grafana stores this value encrypted under secureJsonData; the
            # plaintext top-level basicAuthPassword field is deprecated
            config.secure_json_data.setdefault("basicAuthPassword",
                                               config.basic_auth_password)
        
        if config.secure_json_data:
            data_source["secureJsonData"] = config.secure_json_data
        
        self.data_sources.append(data_source)
        return data_source
    
    def get_connection_template(self, ds_type: DataSourceType) -> Dict:
        """获取数据源连接模板"""
        templates = {
            DataSourceType.PROMETHEUS: {
                "url": "http://localhost:9090",
                "json_data": {
                    "httpMethod": "POST",
                    "queryTimeout": "60s",
                    "timeInterval": "15s"
                }
            },
            DataSourceType.INFLUXDB: {
                "url": "http://localhost:8086",
                "json_data": {
                    "database": "mydb",
                    "httpMode": "GET",
                    "version": "1.8"
                }
            },
            DataSourceType.MYSQL: {
                "url": "localhost:3306",
                "json_data": {
                    "database": "grafana",
                    "maxOpenConns": 0,
                    "maxIdleConns": 2,
                    "connMaxLifetime": 14400
                }
            },
            DataSourceType.POSTGRESQL: {
                "url": "localhost:5432",
                "json_data": {
                    "database": "postgres",
                    "sslmode": "disable",
                    "maxOpenConns": 0,
                    "maxIdleConns": 2,
                    "connMaxLifetime": 14400,
                    "postgresVersion": 1200,
                    "timescaledb": False
                }
            },
            DataSourceType.ELASTICSEARCH: {
                "url": "http://localhost:9200",
                "json_data": {
                    "esVersion": "7.10.0",
                    "timeField": "@timestamp",
                    "interval": "Daily",
                    "logMessageField": "message",
                    "logLevelField": "level"
                }
            },
            DataSourceType.LOKI: {
                "url": "http://localhost:3100",
                "json_data": {
                    "maxLines": 1000,
                    "derivedFields": []
                }
            }
        }
        return templates.get(ds_type, {})
    
    def validate_connection(self, config: DataSourceConfig) -> Dict:
        """Validate a data source connection"""
        # Simulated connection test
        validation_result = {
            "status": "success",
            "message": "Data source connection successful",
            "details": {
                "version": "simulated",
                "database": config.json_data.get("database", "N/A"),
                "response_time": "50ms"
            }
        }
        
        # Type-specific validation details
        if config.type == DataSourceType.PROMETHEUS:
            validation_result["details"]["build_info"] = "prometheus-2.40.0"
            validation_result["details"]["storage"] = "local"
        elif config.type == DataSourceType.MYSQL:
            validation_result["details"]["version"] = "8.0.32"
            validation_result["details"]["charset"] = "utf8mb4"
        elif config.type == DataSourceType.INFLUXDB:
            validation_result["details"]["version"] = "1.8.10"
            validation_result["details"]["databases"] = ["mydb", "_internal"]
        
        return validation_result
    
    def export_config(self) -> str:
        """导出数据源配置"""
        config = {
            "apiVersion": 1,
            "datasources": self.data_sources
        }
        return json.dumps(config, indent=2, ensure_ascii=False)

# Usage example
manager = DataSourceManager()

# List the supported time-series databases
time_series_dbs = manager.get_supported_types(DataSourceCategory.TIME_SERIES)
print("Supported time-series databases:", time_series_dbs)

# Create a Prometheus data source
prometheus_config = DataSourceConfig(
    name="Prometheus",
    type=DataSourceType.PROMETHEUS,
    url="http://localhost:9090",
    is_default=True
)
prometheus_ds = manager.create_data_source(prometheus_config)
print("Prometheus data source:", prometheus_ds)

Configuring Common Data Sources

1. Prometheus Configuration

Prometheus is the most popular monitoring data source and a particularly good fit for Kubernetes and microservice environments.

Basic configuration

# Prometheus data source provisioning
apiVersion: 1
datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://localhost:9090
    isDefault: true
    jsonData:
      httpMethod: POST
      queryTimeout: 60s
      timeInterval: 15s
      exemplarTraceIdDestinations:
        - name: trace_id
          datasourceUid: jaeger_uid

Advanced configuration options

class PrometheusDataSource:
    """Prometheus data source configuration helper"""
    
    def __init__(self):
        self.basic_config = {
            "name": "Prometheus",
            "type": "prometheus",
            "access": "proxy",
            "url": "http://localhost:9090",
            "isDefault": True
        }
    
    def configure_advanced_options(self, **kwargs):
        """Configure advanced options"""
        json_data = {
            # HTTP method
            "httpMethod": kwargs.get("http_method", "POST"),
            
            # Query timeout
            "queryTimeout": kwargs.get("query_timeout", "60s"),
            
            # Scrape/step interval
            "timeInterval": kwargs.get("time_interval", "15s"),
            
            # Custom query parameters
            "customQueryParameters": kwargs.get("custom_params", ""),
            
            # Disable metrics lookup
            "disableMetricsLookup": kwargs.get("disable_metrics_lookup", False),
            
            # Exemplar configuration
            "exemplarTraceIdDestinations": kwargs.get("exemplar_destinations", []),
            
            # Incremental querying
            "incrementalQuerying": kwargs.get("incremental_querying", False),
            
            # Cache level
            "cacheLevel": kwargs.get("cache_level", "Low"),
            
            # Maximum concurrent shard requests
            "maxConcurrentShardRequests": kwargs.get("max_concurrent_requests", 5)
        }
        
        self.basic_config["jsonData"] = json_data
        return self.basic_config
    
    def configure_authentication(self, auth_type="none", **auth_params):
        """配置认证"""
        if auth_type == "basic":
            self.basic_config["basicAuth"] = True
            self.basic_config["basicAuthUser"] = auth_params.get("username", "")
            self.basic_config["secureJsonData"] = {
                "basicAuthPassword": auth_params.get("password", "")
            }
        elif auth_type == "bearer":
            self.basic_config["secureJsonData"] = {
                "httpHeaderValue1": f"Bearer {auth_params.get('token', '')}"
            }
            self.basic_config["jsonData"]["httpHeaderName1"] = "Authorization"
        elif auth_type == "custom_header":
            header_name = auth_params.get("header_name", "X-API-Key")
            header_value = auth_params.get("header_value", "")
            self.basic_config["jsonData"]["httpHeaderName1"] = header_name
            self.basic_config["secureJsonData"] = {
                "httpHeaderValue1": header_value
            }
        
        return self.basic_config
    
    def configure_tls(self, **tls_params):
        """配置TLS"""
        json_data = self.basic_config.get("jsonData", {})
        secure_json_data = self.basic_config.get("secureJsonData", {})
        
        # TLS client authentication
        if tls_params.get("tls_auth"):
            json_data["tlsAuth"] = True
            json_data["tlsAuthWithCACert"] = tls_params.get("with_ca_cert", False)
            
            if tls_params.get("client_cert"):
                secure_json_data["tlsClientCert"] = tls_params["client_cert"]
            if tls_params.get("client_key"):
                secure_json_data["tlsClientKey"] = tls_params["client_key"]
            if tls_params.get("ca_cert"):
                secure_json_data["tlsCACert"] = tls_params["ca_cert"]
        
        # Skip TLS verification
        if tls_params.get("skip_verify"):
            json_data["tlsSkipVerify"] = True
        
        # Server name (SNI)
        if tls_params.get("server_name"):
            json_data["serverName"] = tls_params["server_name"]
        
        self.basic_config["jsonData"] = json_data
        self.basic_config["secureJsonData"] = secure_json_data
        return self.basic_config

# Usage example
prometheus = PrometheusDataSource()

# Configure advanced options
config = prometheus.configure_advanced_options(
    http_method="GET",
    query_timeout="30s",
    time_interval="10s",
    disable_metrics_lookup=True,
    max_concurrent_requests=10
)

# Configure bearer token authentication
config = prometheus.configure_authentication(
    auth_type="bearer",
    token="your-api-token"
)

# Configure TLS
config = prometheus.configure_tls(
    tls_auth=True,
    skip_verify=False,
    server_name="prometheus.example.com"
)

print("Prometheus配置:", json.dumps(config, indent=2))

2. MySQL Configuration

The MySQL data source is well suited to visualizing business data and building reports.

class MySQLDataSource:
    """MySQL data source configuration helper"""
    
    def __init__(self):
        self.basic_config = {
            "name": "MySQL",
            "type": "mysql",
            "access": "proxy",
            "url": "localhost:3306",
            "user": "grafana",
            "database": "grafana"
        }
    
    def configure_connection(self, **params):
        """Configure connection parameters"""
        json_data = {
            # Database name
            "database": params.get("database", "grafana"),
            
            # Connection pool settings
            "maxOpenConns": params.get("max_open_conns", 0),  # 0 means unlimited
            "maxIdleConns": params.get("max_idle_conns", 2),
            "connMaxLifetime": params.get("conn_max_lifetime", 14400),  # seconds
            
            # SSL settings
            "sslmode": params.get("ssl_mode", "false"),  # false, true, skip-verify
            
            # Time zone
            "timezone": params.get("timezone", "UTC"),
            
            # Query timeout
            "queryTimeout": params.get("query_timeout", "60s"),
            
            # Time and metric columns
            "timeField": params.get("time_field", "time"),
            "metricField": params.get("metric_field", "metric")
        }
        
        self.basic_config["jsonData"] = json_data
        
        # Password
        if params.get("password"):
            self.basic_config["secureJsonData"] = {
                "password": params["password"]
            }
        
        return self.basic_config
    
    def get_sample_queries(self):
        """获取示例查询"""
        return {
            "time_series": {
                "description": "时序数据查询",
                "query": """
                SELECT
                  UNIX_TIMESTAMP(time_column) as time_sec,
                  value_column as value,
                  metric_name
                FROM metrics_table
                WHERE $__timeFilter(time_column)
                ORDER BY time_column ASC
                """
            },
            "table": {
                "description": "表格数据查询",
                "query": """
                SELECT
                  id,
                  name,
                  status,
                  created_at
                FROM users
                WHERE created_at >= $__timeFrom() AND created_at <= $__timeTo()
                ORDER BY created_at DESC
                """
            },
            "single_stat": {
                "description": "单值统计查询",
                "query": """
                SELECT
                  COUNT(*) as total_users
                FROM users
                WHERE status = 'active'
                """
            },
            "aggregation": {
                "description": "聚合查询",
                "query": """
                SELECT
                  DATE(created_at) as date,
                  COUNT(*) as daily_registrations
                FROM users
                WHERE $__timeFilter(created_at)
                GROUP BY DATE(created_at)
                ORDER BY date ASC
                """
            }
        }
    
    def get_macros_reference(self):
        """Return a reference for the available query macros"""
        return {
            "$__time(column)": "Render the column as a timestamp",
            "$__timeEpoch(column)": "Render the column as a Unix timestamp",
            "$__timeFilter(column)": "Add a time-range filter on the column",
            "$__timeFrom()": "Start of the dashboard time range",
            "$__timeTo()": "End of the dashboard time range",
            "$__timeGroup(column, interval)": "Group rows by time interval",
            "$__unixEpochFilter(column)": "Time-range filter for Unix-timestamp columns",
            "$__unixEpochFrom()": "Range start as a Unix timestamp",
            "$__unixEpochTo()": "Range end as a Unix timestamp"
        }

# Usage example
mysql_ds = MySQLDataSource()

# Configure the connection
config = mysql_ds.configure_connection(
    database="monitoring",
    max_open_conns=10,
    max_idle_conns=5,
    ssl_mode="true",
    timezone="Asia/Shanghai",
    password="your-password"
)

print("MySQL config:", json.dumps(config, indent=2))
print("Sample queries:", mysql_ds.get_sample_queries())
print("Macro reference:", mysql_ds.get_macros_reference())

3. InfluxDB Configuration

InfluxDB is a purpose-built time-series database, well suited to storing and querying large volumes of time-series data.

class InfluxDBDataSource:
    """InfluxDB数据源配置类"""
    
    def __init__(self, version="1.8"):
        self.version = version
        self.basic_config = {
            "name": "InfluxDB",
            "type": "influxdb",
            "access": "proxy",
            "url": "http://localhost:8086"
        }
    
    def configure_v1(self, **params):
        """配置InfluxDB 1.x"""
        json_data = {
            "version": "1.8",
            "database": params.get("database", "mydb"),
            "httpMode": params.get("http_mode", "GET"),  # GET 或 POST
            "retentionPolicy": params.get("retention_policy", ""),
            "queryTimeout": params.get("query_timeout", "60s"),
            "timeInterval": params.get("time_interval", "10s")
        }
        
        # Authentication
        if params.get("username") and params.get("password"):
            self.basic_config["user"] = params["username"]
            self.basic_config["secureJsonData"] = {
                "password": params["password"]
            }
        
        self.basic_config["jsonData"] = json_data
        return self.basic_config
    
    def configure_v2(self, **params):
        """配置InfluxDB 2.x"""
        json_data = {
            "version": "Flux",
            "organization": params.get("organization", "my-org"),
            "defaultBucket": params.get("default_bucket", "my-bucket"),
            "queryTimeout": params.get("query_timeout", "60s"),
            "timeInterval": params.get("time_interval", "10s")
        }
        
        # Token authentication
        if params.get("token"):
            self.basic_config["secureJsonData"] = {
                "token": params["token"]
            }
        
        self.basic_config["jsonData"] = json_data
        return self.basic_config
    
    def get_sample_queries(self):
        """获取示例查询"""
        if self.version.startswith("1."):
            return self._get_influxql_queries()
        else:
            return self._get_flux_queries()
    
    def _get_influxql_queries(self):
        """InfluxQL查询示例"""
        return {
            "basic_select": {
                "description": "基本查询",
                "query": 'SELECT mean("value") FROM "cpu_usage" WHERE $timeFilter GROUP BY time($__interval) fill(null)'
            },
            "multiple_series": {
                "description": "多序列查询",
                "query": 'SELECT mean("value") FROM "cpu_usage" WHERE $timeFilter GROUP BY time($__interval), "host" fill(null)'
            },
            "aggregation": {
                "description": "聚合查询",
                "query": 'SELECT max("value"), min("value"), mean("value") FROM "memory_usage" WHERE $timeFilter GROUP BY time($__interval) fill(null)'
            },
            "derivative": {
                "description": "导数计算",
                "query": 'SELECT derivative(mean("value"), 1s) FROM "network_bytes" WHERE $timeFilter GROUP BY time($__interval) fill(null)'
            }
        }
    
    def _get_flux_queries(self):
        """Flux查询示例"""
        return {
            "basic_query": {
                "description": "基本Flux查询",
                "query": '''
                from(bucket: "my-bucket")
                  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
                  |> filter(fn: (r) => r["_measurement"] == "cpu")
                  |> filter(fn: (r) => r["_field"] == "usage_idle")
                  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
                  |> yield(name: "mean")
                '''
            },
            "multiple_measurements": {
                "description": "多测量值查询",
                "query": '''
                from(bucket: "my-bucket")
                  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
                  |> filter(fn: (r) => r["_measurement"] == "system")
                  |> filter(fn: (r) => r["_field"] == "load1" or r["_field"] == "load5" or r["_field"] == "load15")
                  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
                  |> yield(name: "mean")
                '''
            },
            "transformation": {
                "description": "数据转换",
                "query": '''
                from(bucket: "my-bucket")
                  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
                  |> filter(fn: (r) => r["_measurement"] == "disk")
                  |> filter(fn: (r) => r["_field"] == "used_percent")
                  |> map(fn: (r) => ({ r with _value: r._value * 100.0 }))
                  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
                  |> yield(name: "percent")
                '''
            }
        }

# Usage examples
# InfluxDB 1.x
influx_v1 = InfluxDBDataSource(version="1.8")
config_v1 = influx_v1.configure_v1(
    database="telegraf",
    username="admin",
    password="password",
    http_mode="POST"
)

# InfluxDB 2.x
influx_v2 = InfluxDBDataSource(version="2.0")
config_v2 = influx_v2.configure_v2(
    organization="my-org",
    default_bucket="monitoring",
    token="your-influxdb-token"
)

print("InfluxDB 1.x config:", json.dumps(config_v1, indent=2))
print("InfluxDB 2.x config:", json.dumps(config_v2, indent=2))
print("Sample queries:", influx_v1.get_sample_queries())

4. Elasticsearch Configuration

Elasticsearch is a good fit for log analysis and full-text search scenarios.

class ElasticsearchDataSource:
    """Elasticsearch data source configuration helper"""
    
    def __init__(self):
        self.basic_config = {
            "name": "Elasticsearch",
            "type": "elasticsearch",
            "access": "proxy",
            "url": "http://localhost:9200"
        }
    
    def configure_basic(self, **params):
        """Basic configuration"""
        json_data = {
            # Elasticsearch version
            "esVersion": params.get("version", "7.10.0"),
            
            # Time field
            "timeField": params.get("time_field", "@timestamp"),
            
            # Index settings
            "interval": params.get("interval", "Daily"),  # Daily, Weekly, Monthly, Yearly
            "indexPattern": params.get("index_pattern", "[logstash-]YYYY.MM.DD"),
            
            # Log fields
            "logMessageField": params.get("log_message_field", "message"),
            "logLevelField": params.get("log_level_field", "level"),
            
            # Query settings
            "maxConcurrentShardRequests": params.get("max_concurrent_requests", 5),
            "queryTimeout": params.get("query_timeout", "60s"),
            
            # Include frozen indices
            "includeFrozen": params.get("include_frozen", False)
        }
        
        self.basic_config["jsonData"] = json_data
        return self.basic_config
    
    def configure_authentication(self, auth_type="none", **auth_params):
        """配置认证"""
        if auth_type == "basic":
            self.basic_config["basicAuth"] = True
            self.basic_config["basicAuthUser"] = auth_params.get("username", "")
            self.basic_config["secureJsonData"] = {
                "basicAuthPassword": auth_params.get("password", "")
            }
        elif auth_type == "api_key":
            self.basic_config["secureJsonData"] = {
                "apiKey": auth_params.get("api_key", "")
            }
        
        return self.basic_config
    
    def get_sample_queries(self):
        """获取示例查询"""
        return {
            "logs_query": {
                "description": "日志查询",
                "query": {
                    "query": {
                        "bool": {
                            "must": [
                                {
                                    "range": {
                                        "@timestamp": {
                                            "gte": "$__timeFrom()",
                                            "lte": "$__timeTo()",
                                            "format": "epoch_millis"
                                        }
                                    }
                                },
                                {
                                    "match": {
                                        "level": "ERROR"
                                    }
                                }
                            ]
                        }
                    },
                    "sort": [
                        {
                            "@timestamp": {
                                "order": "desc"
                            }
                        }
                    ]
                }
            },
            "metrics_query": {
                "description": "指标查询",
                "query": {
                    "aggs": {
                        "time_buckets": {
                            "date_histogram": {
                                "field": "@timestamp",
                                "interval": "$__interval",
                                "min_doc_count": 0
                            },
                            "aggs": {
                                "avg_response_time": {
                                    "avg": {
                                        "field": "response_time"
                                    }
                                }
                            }
                        }
                    },
                    "query": {
                        "range": {
                            "@timestamp": {
                                "gte": "$__timeFrom()",
                                "lte": "$__timeTo()",
                                "format": "epoch_millis"
                            }
                        }
                    }
                }
            },
            "terms_aggregation": {
                "description": "词条聚合",
                "query": {
                    "aggs": {
                        "top_hosts": {
                            "terms": {
                                "field": "host.keyword",
                                "size": 10
                            },
                            "aggs": {
                                "error_count": {
                                    "filter": {
                                        "term": {
                                            "level": "ERROR"
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "query": {
                        "range": {
                            "@timestamp": {
                                "gte": "$__timeFrom()",
                                "lte": "$__timeTo()",
                                "format": "epoch_millis"
                            }
                        }
                    }
                }
            }
        }

# Usage example
es_ds = ElasticsearchDataSource()

# Basic configuration
config = es_ds.configure_basic(
    version="7.15.0",
    time_field="@timestamp",
    interval="Daily",
    index_pattern="[logs-]YYYY.MM.DD",
    log_message_field="message",
    log_level_field="level"
)

# Authentication
config = es_ds.configure_authentication(
    auth_type="basic",
    username="elastic",
    password="password"
)

print("Elasticsearch config:", json.dumps(config, indent=2))
print("Sample queries:", es_ds.get_sample_queries())

Data Source Management Best Practices

1. Configuration File Management

Use configuration files to manage data sources in bulk and keep them under version control:

# datasources.yml
apiVersion: 1

datasources:
  # Prometheus monitoring
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true
    jsonData:
      httpMethod: POST
      queryTimeout: 60s
      timeInterval: 15s
    version: 1
    editable: false

  # MySQL business database
  - name: MySQL-Business
    type: mysql
    access: proxy
    url: mysql:3306
    user: grafana
    database: business
    jsonData:
      maxOpenConns: 10
      maxIdleConns: 5
      connMaxLifetime: 14400
      sslmode: "false"
    secureJsonData:
      password: ${MYSQL_PASSWORD}
    version: 1
    editable: false

  # InfluxDB time-series data
  - name: InfluxDB-Metrics
    type: influxdb
    access: proxy
    url: http://influxdb:8086
    user: grafana
    database: metrics
    jsonData:
      httpMode: GET
      queryTimeout: 60s
    secureJsonData:
      password: ${INFLUXDB_PASSWORD}
    version: 1
    editable: false

  # Elasticsearch logs
  - name: Elasticsearch-Logs
    type: elasticsearch
    access: proxy
    url: http://elasticsearch:9200
    basicAuth: true
    basicAuthUser: elastic
    jsonData:
      esVersion: "7.15.0"
      timeField: "@timestamp"
      interval: Daily
      logMessageField: message
      logLevelField: level
      maxConcurrentShardRequests: 5
    secureJsonData:
      basicAuthPassword: ${ELASTICSEARCH_PASSWORD}
    version: 1
    editable: false

  # Loki log aggregation
  - name: Loki
    type: loki
    access: proxy
    url: http://loki:3100
    jsonData:
      maxLines: 1000
      derivedFields:
        - name: TraceID
          matcherRegex: "trace_id=(\\w+)"
          url: "http://jaeger:16686/trace/${__value.raw}"
          datasourceUid: jaeger
    version: 1
    editable: false

  # CloudWatch (AWS)
  - name: CloudWatch
    type: cloudwatch
    jsonData:
      authType: keys
      defaultRegion: us-east-1
      customMetricsNamespaces: "MyApp,MyService"
    secureJsonData:
      accessKey: ${AWS_ACCESS_KEY_ID}
      secretKey: ${AWS_SECRET_ACCESS_KEY}
    version: 1
    editable: false

2. Environment Variable Management

# .env file
# Database passwords
MYSQL_PASSWORD=secure_mysql_password
INFLUXDB_PASSWORD=secure_influxdb_password
ELASTICSEARCH_PASSWORD=secure_elasticsearch_password

# AWS credentials
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key

# API tokens
PROMETHEUS_TOKEN=your_prometheus_token
GRAFANA_API_KEY=your_grafana_api_key
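
Grafana's provisioning layer expands ${VAR} references from the process environment, so these variables must be set in the environment Grafana (or your automation script) starts with. For the scripting side, a minimal sketch assuming the third-party python-dotenv package is installed:

import os
from dotenv import load_dotenv  # python-dotenv, assumed to be installed

# Read .env from the current directory into os.environ (existing
# environment variables are not overwritten by default)
load_dotenv()

mysql_password = os.getenv("MYSQL_PASSWORD")
if mysql_password is None:
    raise RuntimeError("MYSQL_PASSWORD is not set; check the .env file")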

3. Automated Configuration Scripts

import os
import json
import requests
from typing import List, Dict, Optional

class GrafanaDataSourceManager:
    """Grafana数据源自动化管理"""
    
    def __init__(self, grafana_url: str, api_key: str):
        self.grafana_url = grafana_url.rstrip('/')
        self.api_key = api_key
        self.headers = {
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        }
    
    def create_data_source(self, config: Dict) -> Dict:
        """创建数据源"""
        url = f"{self.grafana_url}/api/datasources"
        response = requests.post(url, headers=self.headers, json=config)
        
        if response.status_code == 200:
            return {"status": "success", "data": response.json()}
        else:
            return {"status": "error", "message": response.text}
    
    def update_data_source(self, ds_id: int, config: Dict) -> Dict:
        """更新数据源"""
        url = f"{self.grafana_url}/api/datasources/{ds_id}"
        response = requests.put(url, headers=self.headers, json=config)
        
        if response.status_code == 200:
            return {"status": "success", "data": response.json()}
        else:
            return {"status": "error", "message": response.text}
    
    def delete_data_source(self, ds_id: int) -> Dict:
        """删除数据源"""
        url = f"{self.grafana_url}/api/datasources/{ds_id}"
        response = requests.delete(url, headers=self.headers)
        
        if response.status_code == 200:
            return {"status": "success", "message": "数据源删除成功"}
        else:
            return {"status": "error", "message": response.text}
    
    def list_data_sources(self) -> List[Dict]:
        """列出所有数据源"""
        url = f"{self.grafana_url}/api/datasources"
        response = requests.get(url, headers=self.headers)
        
        if response.status_code == 200:
            return response.json()
        else:
            return []
    
    def test_data_source(self, ds_id: int) -> Dict:
        """测试数据源连接"""
        url = f"{self.grafana_url}/api/datasources/{ds_id}/health"
        response = requests.get(url, headers=self.headers)
        
        if response.status_code == 200:
            return {"status": "success", "data": response.json()}
        else:
            return {"status": "error", "message": response.text}
    
    def batch_create_from_config(self, config_file: str) -> List[Dict]:
        """从配置文件批量创建数据源"""
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
        
        results = []
        for ds_config in config.get('datasources', []):
            # Substitute environment variables
            ds_config = self._replace_env_vars(ds_config)
            
            # Check whether the data source already exists
            existing_ds = self._find_data_source_by_name(ds_config['name'])
            
            if existing_ds:
                # Update the existing data source
                result = self.update_data_source(existing_ds['id'], ds_config)
                result['action'] = 'updated'
            else:
                # Create a new data source
                result = self.create_data_source(ds_config)
                result['action'] = 'created'
            
            result['name'] = ds_config['name']
            results.append(result)
        
        return results
    
    def _replace_env_vars(self, config: Dict) -> Dict:
        """替换配置中的环境变量"""
        config_str = json.dumps(config)
        
        # Replace ${VAR_NAME}-style references with values from the environment
        import re
        pattern = r'\$\{([^}]+)\}'
        
        def replace_var(match):
            var_name = match.group(1)
            return os.getenv(var_name, match.group(0))
        
        config_str = re.sub(pattern, replace_var, config_str)
        return json.loads(config_str)
    
    def _find_data_source_by_name(self, name: str) -> Optional[Dict]:
        """Find a data source by name, or return None"""
        data_sources = self.list_data_sources()
        for ds in data_sources:
            if ds['name'] == name:
                return ds
        return None
    
    def backup_data_sources(self, backup_file: str) -> Dict:
        """备份所有数据源配置"""
        data_sources = self.list_data_sources()
        
        # Strip sensitive fields
        for ds in data_sources:
            if 'secureJsonData' in ds:
                del ds['secureJsonData']
            if 'basicAuthPassword' in ds:
                del ds['basicAuthPassword']
        
        backup_config = {
            "apiVersion": 1,
            "datasources": data_sources
        }
        
        with open(backup_file, 'w', encoding='utf-8') as f:
            json.dump(backup_config, f, indent=2, ensure_ascii=False)
        
        return {"status": "success", "message": f"备份完成: {backup_file}"}
    
    def health_check_all(self) -> Dict:
        """检查所有数据源健康状态"""
        data_sources = self.list_data_sources()
        results = []
        
        for ds in data_sources:
            health_result = self.test_data_source(ds['id'])
            results.append({
                "name": ds['name'],
                "type": ds['type'],
                "status": health_result['status'],
                "details": health_result.get('data', health_result.get('message', ''))
            })
        
        healthy_count = sum(1 for r in results if r['status'] == 'success')
        total_count = len(results)
        
        return {
            "summary": {
                "total": total_count,
                "healthy": healthy_count,
                "unhealthy": total_count - healthy_count
            },
            "details": results
        }

# Usage example
if __name__ == "__main__":
    # Initialize the manager
    manager = GrafanaDataSourceManager(
        grafana_url="http://localhost:3000",
        api_key="your-api-key"
    )
    
    # Create data sources in bulk
    results = manager.batch_create_from_config("datasources.json")
    print("Bulk creation results:", results)
    
    # Health check
    health_status = manager.health_check_all()
    print("Health check results:", health_status)
    
    # Back up the configuration
    backup_result = manager.backup_data_sources("datasources_backup.json")
    print("Backup result:", backup_result)

Data Source Security

1. Network Security

class DataSourceSecurity:
    """Data source security configuration"""
    
    def __init__(self):
        self.security_configs = {}
    
    def configure_network_security(self, **params):
        """Configure network security"""
        return {
            # Network isolation
            "network_isolation": {
                "use_private_network": params.get("private_network", True),
                "allowed_ip_ranges": params.get("allowed_ips", ["10.0.0.0/8", "172.16.0.0/12"]),
                "firewall_rules": {
                    "inbound": [
                        {"port": 3000, "source": "grafana_subnet"},
                        {"port": 9090, "source": "monitoring_subnet"}
                    ],
                    "outbound": [
                        {"port": 443, "destination": "any", "protocol": "https"},
                        {"port": 3306, "destination": "database_subnet"}
                    ]
                }
            },
            
            # VPN settings
            "vpn_config": {
                "use_vpn": params.get("use_vpn", False),
                "vpn_type": params.get("vpn_type", "wireguard"),
                "vpn_endpoint": params.get("vpn_endpoint", "")
            },
            
            # Proxy settings
            "proxy_config": {
                "use_proxy": params.get("use_proxy", False),
                "proxy_url": params.get("proxy_url", ""),
                "no_proxy": params.get("no_proxy", "localhost,127.0.0.1")
            }
        }
    
    def configure_authentication(self, **params):
        """配置认证安全"""
        return {
            # Strong authentication
            "strong_auth": {
                "require_mfa": params.get("require_mfa", True),
                "password_policy": {
                    "min_length": 12,
                    "require_uppercase": True,
                    "require_lowercase": True,
                    "require_numbers": True,
                    "require_symbols": True,
                    "max_age_days": 90
                },
                "session_timeout": params.get("session_timeout", 3600)
            },
            
            # API key management
            "api_key_management": {
                "rotation_interval": params.get("key_rotation_days", 30),
                "key_strength": "256-bit",
                "access_logging": True,
                "rate_limiting": {
                    "requests_per_minute": 100,
                    "burst_limit": 200
                }
            },
            
            # OAuth/SAML
            "sso_config": {
                "enable_oauth": params.get("enable_oauth", False),
                "oauth_provider": params.get("oauth_provider", "google"),
                "saml_enabled": params.get("enable_saml", False),
                "auto_login": params.get("auto_login", False)
            }
        }
    
    def configure_encryption(self, **params):
        """配置加密"""
        return {
            # Transport encryption
            "transport_encryption": {
                "enforce_https": True,
                "tls_version": "1.3",
                "cipher_suites": [
                    "TLS_AES_256_GCM_SHA384",
                    "TLS_CHACHA20_POLY1305_SHA256",
                    "TLS_AES_128_GCM_SHA256"
                ],
                "hsts_enabled": True,
                "hsts_max_age": 31536000
            },
            
            # Certificate management
            "certificate_management": {
                "use_custom_ca": params.get("custom_ca", False),
                "cert_rotation_days": params.get("cert_rotation", 90),
                "ocsp_stapling": True,
                "certificate_transparency": True
            },
            
            # Data encryption
            "data_encryption": {
                "encrypt_at_rest": params.get("encrypt_at_rest", True),
                "encryption_algorithm": "AES-256-GCM",
                "key_management": "vault",
                "key_rotation_interval": 365
            }
        }
    
    def configure_access_control(self, **params):
        """配置访问控制"""
        return {
            # Role-based access control
            "rbac": {
                "enable_rbac": True,
                "default_role": "Viewer",
                "role_hierarchy": {
                    "Admin": ["Editor", "Viewer"],
                    "Editor": ["Viewer"],
                    "Viewer": []
                },
                "data_source_permissions": {
                    "prometheus": ["Admin", "Editor", "Viewer"],
                    "mysql_sensitive": ["Admin"],
                    "logs": ["Admin", "Editor"]
                }
            },
            
            # Audit logging
            "audit_logging": {
                "enable_audit": True,
                "log_level": "INFO",
                "log_retention_days": 365,
                "log_events": [
                    "login", "logout", "data_source_access",
                    "dashboard_view", "query_execution",
                    "configuration_change"
                ],
                "log_format": "json",
                "log_destination": "syslog"
            },
            
            # IP allowlisting
            "ip_whitelist": {
                "enable_whitelist": params.get("enable_ip_whitelist", False),
                "allowed_ips": params.get("allowed_ips", []),
                "blocked_ips": params.get("blocked_ips", []),
                "geo_blocking": {
                    "enable": False,
                    "allowed_countries": ["US", "CN", "JP"]
                }
            }
        }
    
    def generate_security_checklist(self):
        """Generate a security checklist"""
        return {
            "network_security": [
                "✓ Use a private network",
                "✓ Configure firewall rules",
                "✓ Enable network isolation",
                "✓ Use a VPN connection",
                "✓ Configure a proxy server"
            ],
            "authentication": [
                "✓ Enable multi-factor authentication",
                "✓ Enforce a strong password policy",
                "✓ Rotate API keys regularly",
                "✓ Configure SSO integration",
                "✓ Set session timeouts"
            ],
            "encryption": [
                "✓ Enforce HTTPS",
                "✓ Configure TLS 1.3",
                "✓ Enable HSTS",
                "✓ Use strong cipher suites",
                "✓ Renew certificates regularly"
            ],
            "access_control": [
                "✓ Enable role-based access control",
                "✓ Configure data source permissions",
                "✓ Enable audit logging",
                "✓ Configure IP allowlists",
                "✓ Apply the principle of least privilege"
            ],
            "monitoring": [
                "✓ Monitor login attempts",
                "✓ Monitor anomalous queries",
                "✓ Set up security alerts",
                "✓ Run regular security audits",
                "✓ Monitor data access patterns"
            ]
        }

# Usage example
security = DataSourceSecurity()
network_config = security.configure_network_security(private_network=True, use_vpn=True)
auth_config = security.configure_authentication(require_mfa=True, session_timeout=1800)
encryption_config = security.configure_encryption(encrypt_at_rest=True)
access_config = security.configure_access_control(enable_ip_whitelist=True)
checklist = security.generate_security_checklist()

print("Security configuration:", {
    "network": network_config,
    "authentication": auth_config,
    "encryption": encryption_config,
    "access_control": access_config
})
print("Security checklist:", checklist)

Troubleshooting

1. Common Connection Problems

class DataSourceTroubleshooter:
    """Data source troubleshooting helper"""
    
    def __init__(self):
        self.common_issues = {
            "connection_refused": {
                "description": "Connection refused",
                "possible_causes": [
                    "Target service is not running",
                    "Wrong port configuration",
                    "Firewall blocking the connection",
                    "Network unreachable"
                ],
                "solutions": [
                    "Check the target service status",
                    "Verify the port configuration",
                    "Check the firewall rules",
                    "Test network connectivity"
                ]
            },
            "authentication_failed": {
                "description": "Authentication failed",
                "possible_causes": [
                    "Wrong username or password",
                    "Expired API key",
                    "Insufficient permissions",
                    "Misconfigured authentication method"
                ],
                "solutions": [
                    "Verify the credentials",
                    "Renew the API key",
                    "Check the user's permissions",
                    "Confirm the authentication method"
                ]
            },
            "timeout": {
                "description": "Query timeout",
                "possible_causes": [
                    "Query is too complex",
                    "Result set is too large",
                    "High network latency",
                    "Server under heavy load"
                ],
                "solutions": [
                    "Optimize the query",
                    "Narrow the query time range",
                    "Increase the timeout",
                    "Check server performance"
                ]
            },
            "ssl_error": {
                "description": "SSL/TLS error",
                "possible_causes": [
                    "Expired certificate",
                    "Certificate/hostname mismatch",
                    "Missing CA certificate",
                    "Incompatible TLS version"
                ],
                "solutions": [
                    "Renew the certificate",
                    "Verify the certificate's hostname",
                    "Install the CA certificate",
                    "Adjust the TLS configuration"
                ]
            }
        }
    
    def diagnose_connection(self, error_message: str) -> Dict:
        """Diagnose a connection problem from its error message"""
        error_lower = error_message.lower()
        
        for issue_type, issue_info in self.common_issues.items():
            # Simple keyword matching against the error message
            keywords = {
                "connection_refused": ["connection refused", "connect failed"],
                "authentication_failed": ["authentication", "unauthorized", "401", "403"],
                "timeout": ["timeout", "timed out", "time out"],
                "ssl_error": ["ssl", "tls", "certificate"]
            }
            
            if any(keyword in error_lower for keyword in keywords.get(issue_type, [])):
                return {
                    "issue_type": issue_type,
                    "description": issue_info["description"],
                    "possible_causes": issue_info["possible_causes"],
                    "solutions": issue_info["solutions"],
                    "next_steps": self._get_next_steps(issue_type)
                }
        
        return {
            "issue_type": "unknown",
            "description": "Unknown error",
            "message": error_message,
            "general_solutions": [
                "Check the Grafana logs",
                "Verify the data source configuration",
                "Test the network connection",
                "Contact the system administrator"
            ]
        }
    
    def _get_next_steps(self, issue_type: str) -> List[str]:
        """Suggest next diagnostic steps"""
        next_steps = {
            "connection_refused": [
                "Test port connectivity with telnet",
                "Check the target service's logs",
                "Verify network routing",
                "Check DNS resolution"
            ],
            "authentication_failed": [
                "Verify the user on the data source system",
                "Check permission assignments",
                "Test the API key",
                "Review the authentication logs"
            ],
            "timeout": [
                "Monitor query execution times",
                "Analyze the slow query log",
                "Check system resource usage",
                "Optimize query indexes"
            ],
            "ssl_error": [
                "Check certificate expiry dates",
                "Verify the certificate chain",
                "Test the SSL connection",
                "Update the certificate store"
            ]
        }
        return next_steps.get(issue_type, [])
    
    def generate_test_commands(self, ds_config: Dict) -> Dict:
        """Generate connectivity test commands"""
        ds_type = ds_config.get("type", "")
        url = ds_config.get("url", "")
        
        commands = {
            "basic_connectivity": {
                "description": "Basic connectivity tests",
                "commands": []
            },
            "service_specific": {
                "description": "Service-specific tests",
                "commands": []
            }
        }
        
        # Parse the URL; scheme-less URLs such as "localhost:3306" need a
        # leading "//" so urlparse recognizes the host and port
        import urllib.parse
        parsed_url = urllib.parse.urlparse(url if "://" in url else f"//{url}")
        host = parsed_url.hostname or "localhost"
        port = parsed_url.port
        
        # Basic connectivity tests
        if port:
            commands["basic_connectivity"]["commands"].extend([
                f"ping -c 4 {host}",
                f"telnet {host} {port}",
                f"nmap -p {port} {host}",
                f"curl -I {url}"
            ])
        
        # Service-specific tests
        if ds_type == "prometheus":
            commands["service_specific"]["commands"].extend([
                f"curl {url}/api/v1/status/config",
                f"curl {url}/api/v1/targets",
                f"curl {url}/metrics"
            ])
        elif ds_type == "mysql":
            commands["service_specific"]["commands"].extend([
                f"mysql -h {host} -P {port} -u username -p -e 'SELECT 1'",
                f"mysqladmin -h {host} -P {port} -u username -p ping"
            ])
        elif ds_type == "influxdb":
            commands["service_specific"]["commands"].extend([
                f"curl {url}/ping",
                f"curl {url}/query?q=SHOW+DATABASES"
            ])
        elif ds_type == "elasticsearch":
            commands["service_specific"]["commands"].extend([
                f"curl {url}/_cluster/health",
                f"curl {url}/_cat/indices"
            ])
        
        return commands

# Usage example
troubleshooter = DataSourceTroubleshooter()

# Diagnose an error
error_msg = "connection refused to prometheus:9090"
diagnosis = troubleshooter.diagnose_connection(error_msg)
print("Diagnosis:", diagnosis)

# Generate test commands
ds_config = {
    "type": "prometheus",
    "url": "http://prometheus:9090"
}
test_commands = troubleshooter.generate_test_commands(ds_config)
print("Test commands:", test_commands)

2. Performance Optimization

class DataSourcePerformanceOptimizer:
    """Data source performance optimization helper"""
    
    def __init__(self):
        self.optimization_strategies = {
            "query_optimization": {
                "description": "Query optimization",
                "techniques": [
                    "Constrain the time range",
                    "Reduce the amount of data returned",
                    "Use aggregation functions",
                    "Avoid complex regular expressions",
                    "Filter on indexed fields"
                ]
            },
            "connection_pooling": {
                "description": "Connection pool tuning",
                "techniques": [
                    "Size the connection pool appropriately",
                    "Set connection timeouts",
                    "Enable connection reuse",
                    "Monitor connection usage"
                ]
            },
            "caching": {
                "description": "Caching",
                "techniques": [
                    "Enable query result caching",
                    "Choose an appropriate cache TTL",
                    "Use Redis as a cache",
                    "Apply a tiered caching strategy"
                ]
            },
            "data_source_tuning": {
                "description": "Data source tuning",
                "techniques": [
                    "Optimize database indexes",
                    "Tune memory settings",
                    "Optimize the storage engine",
                    "Configure read/write splitting"
                ]
            }
        }
    
    def analyze_performance(self, ds_config: Dict, metrics: Dict) -> Dict:
        """Analyze performance metrics"""
        analysis = {
            "overall_score": 0,
            "issues": [],
            "recommendations": [],
            "optimizations": []
        }
        
        # Query response time
        avg_response_time = metrics.get("avg_response_time", 0)
        if avg_response_time > 5000:  # 5 seconds
            analysis["issues"].append("Query response time is too long")
            analysis["recommendations"].extend([
                "Optimize the query",
                "Narrow the query time range",
                "Add appropriate indexes"
            ])
        
        # Connection usage
        active_connections = metrics.get("active_connections", 0)
        max_connections = ds_config.get("max_connections", 100)
        if active_connections / max_connections > 0.8:
            analysis["issues"].append("Connection pool usage is too high")
            analysis["recommendations"].extend([
                "Increase the pool size",
                "Improve connection reuse",
                "Check for connection leaks"
            ])
        
        # Error rate
        error_rate = metrics.get("error_rate", 0)
        if error_rate > 0.05:  # 5%
            analysis["issues"].append("Error rate is too high")
            analysis["recommendations"].extend([
                "Check the data source's health",
                "Optimize the query",
                "Add a retry mechanism"
            ])
        
        # Overall score
        score = 100
        score -= min(avg_response_time / 100, 50)  # response-time penalty
        score -= min(error_rate * 100, 30)  # error-rate penalty
        score -= min((active_connections / max_connections) * 20, 20)  # connection-usage penalty
        
        analysis["overall_score"] = max(0, int(score))
        
        return analysis
    
    def generate_optimization_config(self, ds_type: str, performance_level: str = "balanced") -> Dict:
        """生成优化配置"""
        configs = {
            "prometheus": {
                "high_performance": {
                    "timeout": "30s",
                    "max_concurrent_queries": 20,
                    "query_timeout": "2m",
                    "cache_level": "high",
                    "step_alignment": True
                },
                "balanced": {
                    "timeout": "60s",
                    "max_concurrent_queries": 10,
                    "query_timeout": "5m",
                    "cache_level": "medium",
                    "step_alignment": False
                },
                "resource_saving": {
                    "timeout": "120s",
                    "max_concurrent_queries": 5,
                    "query_timeout": "10m",
                    "cache_level": "low",
                    "step_alignment": False
                }
            },
            "mysql": {
                "high_performance": {
                    "max_open_conns": 100,
                    "max_idle_conns": 10,
                    "conn_max_lifetime": "1h",
                    "timeout": "30s",
                    "read_timeout": "30s",
                    "write_timeout": "30s"
                },
                "balanced": {
                    "max_open_conns": 50,
                    "max_idle_conns": 5,
                    "conn_max_lifetime": "2h",
                    "timeout": "60s",
                    "read_timeout": "60s",
                    "write_timeout": "60s"
                },
                "resource_saving": {
                    "max_open_conns": 20,
                    "max_idle_conns": 2,
                    "conn_max_lifetime": "4h",
                    "timeout": "120s",
                    "read_timeout": "120s",
                    "write_timeout": "120s"
                }
            }
        }
        
        return configs.get(ds_type, {}).get(performance_level, {})
    
    def generate_monitoring_queries(self, ds_type: str) -> Dict:
        """生成性能监控查询"""
        queries = {
            "prometheus": {
                "query_duration": 'histogram_quantile(0.95, rate(prometheus_engine_query_duration_seconds_bucket[5m]))',
                "active_queries": 'prometheus_engine_queries',
                "query_rate": 'rate(prometheus_engine_queries_total[5m])',
                "memory_usage": 'process_resident_memory_bytes{job="prometheus"}',
                "disk_usage": 'prometheus_tsdb_symbol_table_size_bytes + prometheus_tsdb_head_series'
            },
            "mysql": {
                "connection_count": 'mysql_global_status_threads_connected',
                "query_rate": 'rate(mysql_global_status_questions[5m])',
                "slow_queries": 'rate(mysql_global_status_slow_queries[5m])',
                "innodb_buffer_pool": 'mysql_global_status_innodb_buffer_pool_pages_total',
                "table_locks": 'mysql_global_status_table_locks_waited'
            },
            "influxdb": {
                "query_duration": 'influxdb_queryExecutor_queriesExecuted_mean',
                "write_rate": 'rate(influxdb_write_total[5m])',
                "series_count": 'influxdb_database_numSeries',
                "memory_usage": 'influxdb_runtime_MemStats_Alloc',
                "disk_usage": 'influxdb_shard_diskBytes'
            }
        }
        
        return queries.get(ds_type, {})

# Usage example
optimizer = DataSourcePerformanceOptimizer()

# Performance analysis
ds_config = {"type": "prometheus", "max_connections": 100}
metrics = {
    "avg_response_time": 3000,  # 3 seconds
    "active_connections": 85,
    "error_rate": 0.02  # 2%
}

analysis = optimizer.analyze_performance(ds_config, metrics)
print("Performance analysis:", analysis)

# Generate an optimization profile
optimization_config = optimizer.generate_optimization_config("prometheus", "high_performance")
print("Optimization config:", optimization_config)

# Generate monitoring queries
monitoring_queries = optimizer.generate_monitoring_queries("prometheus")
print("Monitoring queries:", monitoring_queries)

Summary

Key Takeaways

  1. Choosing a data source type

    • Pick a type that matches the characteristics of your data
    • Weigh performance, scalability, and maintenance cost
    • Assess how easily it integrates with existing systems
  2. Configuration best practices

    • Manage secrets through environment variables
    • Keep configuration under version control
    • Establish a configuration review process
    • Back up configuration files regularly
  3. Security considerations

    • Enable transport encryption (TLS/SSL)
    • Use strong authentication mechanisms
    • Configure network access controls
    • Run regular security audits
  4. Performance optimization

    • Optimize queries and time ranges
    • Configure connection pools appropriately
    • Enable query caching
    • Monitor key performance indicators
  5. Troubleshooting

    • Establish a systematic diagnostic process
    • Keep common test tools and commands at hand
    • Maintain a knowledge base of resolved issues
    • Put preventive monitoring in place

Best Practices

  1. Configuration management

    • Use infrastructure as code (IaC)
    • Automate configuration deployment
    • Check for consistency across environments
    • Keep configuration documentation up to date
  2. Monitoring and alerting

    • Monitor data source health
    • Set performance threshold alerts
    • Track query performance trends
    • Establish an incident response process
  3. Capacity planning

    • Review resource usage regularly
    • Forecast growth trends
    • Plan capacity expansions
    • Optimize resource allocation

Next Steps

  1. Go deeper on specific data sources

    • Learn the advanced features of your target data source
    • Master source-specific optimization techniques
    • Keep up with new version features
  2. Dashboard design

    • Learn effective data visualization principles
    • Master panel configuration
    • Understand user experience design
  3. Advanced querying

    • Learn complex query syntax
    • Master data transformation techniques
    • Understand query optimization methods
  4. Integration and automation

    • Learn the HTTP API
    • Master automated configuration
    • Understand CI/CD integration