Overview
Data sources are a core Grafana component: they define how Grafana connects to and queries external systems. Grafana supports more than 60 data source types, ranging from time series databases to relational databases, and from logging systems to cloud monitoring services. Configuring data sources correctly is the foundation of every effective dashboard.
Learning Objectives
By the end of this chapter you will be able to:
- Understand the available data source types and their characteristics
- Configure common data sources (Prometheus, MySQL, InfluxDB, and others)
- Use advanced data source configuration options
- Test data sources and troubleshoot connection problems
- Apply security settings to data sources
- Manage data sources in bulk and automate their configuration
Data Source Types at a Glance
```python
from enum import Enum
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
import json

class DataSourceCategory(Enum):
    """Data source categories."""
    TIME_SERIES = "time_series"   # time series databases
    RELATIONAL = "relational"     # relational databases
    DOCUMENT = "document"         # document databases
    LOGGING = "logging"           # logging systems
    CLOUD = "cloud"               # cloud services
    MONITORING = "monitoring"     # monitoring systems
    ANALYTICS = "analytics"       # analytics platforms
    OTHER = "other"               # everything else

class DataSourceType(Enum):
    """Concrete data source types (values are Grafana plugin IDs)."""
    # Time series databases
    PROMETHEUS = "prometheus"
    INFLUXDB = "influxdb"
    GRAPHITE = "graphite"
    OPENTSDB = "opentsdb"
    # Relational databases
    MYSQL = "mysql"
    POSTGRESQL = "postgres"
    TIMESCALEDB = "postgres"  # enum alias: TimescaleDB uses the PostgreSQL plugin
    MSSQL = "mssql"
    ORACLE = "oracle"
    # Document databases
    ELASTICSEARCH = "elasticsearch"
    MONGODB = "mongodb"
    # Logging systems
    LOKI = "loki"
    SPLUNK = "splunk"
    # Cloud services
    CLOUDWATCH = "cloudwatch"
    AZURE_MONITOR = "azuremonitor"
    GOOGLE_CLOUD = "stackdriver"
    # Other
    JSON = "simplejson"
    CSV = "marcusolsson-csv-datasource"
    ZABBIX = "alexanderzobnin-zabbix-datasource"

@dataclass
class DataSourceConfig:
    """Data source configuration."""
    name: str
    type: DataSourceType
    url: str
    access: str = "proxy"  # "proxy" (server-side) or "direct" (browser)
    is_default: bool = False
    basic_auth: bool = False
    basic_auth_user: str = ""
    basic_auth_password: str = ""
    with_credentials: bool = False
    json_data: Dict[str, Any] = field(default_factory=dict)
    secure_json_data: Dict[str, str] = field(default_factory=dict)
    version: int = 1
    read_only: bool = False

class DataSourceManager:
    """Data source manager."""

    def __init__(self):
        self.data_sources = []
        self.supported_types = {
            DataSourceType.PROMETHEUS: {
                "category": DataSourceCategory.TIME_SERIES,
                "description": "Open-source monitoring and alerting toolkit",
                "query_language": "PromQL",
                "default_port": 9090,
                "features": ["metrics", "alerting", "annotations"]
            },
            DataSourceType.INFLUXDB: {
                "category": DataSourceCategory.TIME_SERIES,
                "description": "Time series database",
                "query_language": "InfluxQL/Flux",
                "default_port": 8086,
                "features": ["metrics", "logs", "annotations"]
            },
            DataSourceType.MYSQL: {
                "category": DataSourceCategory.RELATIONAL,
                "description": "Relational database",
                "query_language": "SQL",
                "default_port": 3306,
                "features": ["table", "annotations"]
            },
            DataSourceType.POSTGRESQL: {
                "category": DataSourceCategory.RELATIONAL,
                "description": "Advanced relational database",
                "query_language": "SQL",
                "default_port": 5432,
                "features": ["table", "annotations", "time_series"]
            },
            DataSourceType.ELASTICSEARCH: {
                "category": DataSourceCategory.LOGGING,
                "description": "Search and analytics engine",
                "query_language": "Lucene/KQL",
                "default_port": 9200,
                "features": ["logs", "metrics", "annotations"]
            },
            DataSourceType.LOKI: {
                "category": DataSourceCategory.LOGGING,
                "description": "Log aggregation system",
                "query_language": "LogQL",
                "default_port": 3100,
                "features": ["logs", "annotations"]
            },
            DataSourceType.CLOUDWATCH: {
                "category": DataSourceCategory.CLOUD,
                "description": "AWS monitoring service",
                "query_language": "CloudWatch Query",
                "default_port": 443,
                "features": ["metrics", "logs", "annotations"]
            }
        }

    def get_supported_types(self, category: Optional[DataSourceCategory] = None) -> List[Dict]:
        """Return the supported data source types, optionally filtered by category."""
        if category:
            return [
                {"type": ds_type, **info}
                for ds_type, info in self.supported_types.items()
                if info["category"] == category
            ]
        return [{"type": ds_type, **info} for ds_type, info in self.supported_types.items()]

    def create_data_source(self, config: DataSourceConfig) -> Dict:
        """Create a data source from a DataSourceConfig."""
        if config.type not in self.supported_types:
            raise ValueError(f"Unsupported data source type: {config.type}")
        data_source = {
            "name": config.name,
            "type": config.type.value,
            "url": config.url,
            "access": config.access,
            "isDefault": config.is_default,
            "basicAuth": config.basic_auth,
            "basicAuthUser": config.basic_auth_user,
            "withCredentials": config.with_credentials,
            "jsonData": config.json_data,
            "version": config.version,
            "readOnly": config.read_only
        }
        # Recent Grafana versions expect secrets under secureJsonData
        if config.basic_auth and config.basic_auth_password:
            data_source.setdefault("secureJsonData", {})["basicAuthPassword"] = config.basic_auth_password
        if config.secure_json_data:
            data_source.setdefault("secureJsonData", {}).update(config.secure_json_data)
        self.data_sources.append(data_source)
        return data_source

    def get_connection_template(self, ds_type: DataSourceType) -> Dict:
        """Return a connection template for the given data source type."""
        templates = {
            DataSourceType.PROMETHEUS: {
                "url": "http://localhost:9090",
                "json_data": {
                    "httpMethod": "POST",
                    "queryTimeout": "60s",
                    "timeInterval": "15s"
                }
            },
            DataSourceType.INFLUXDB: {
                "url": "http://localhost:8086",
                "json_data": {
                    "database": "mydb",
                    "httpMode": "GET",
                    "version": "1.8"
                }
            },
            DataSourceType.MYSQL: {
                "url": "localhost:3306",
                "json_data": {
                    "database": "grafana",
                    "maxOpenConns": 0,
                    "maxIdleConns": 2,
                    "connMaxLifetime": 14400
                }
            },
            DataSourceType.POSTGRESQL: {
                "url": "localhost:5432",
                "json_data": {
                    "database": "postgres",
                    "sslmode": "disable",
                    "maxOpenConns": 0,
                    "maxIdleConns": 2,
                    "connMaxLifetime": 14400,
                    "postgresVersion": 1200,
                    "timescaledb": False
                }
            },
            DataSourceType.ELASTICSEARCH: {
                "url": "http://localhost:9200",
                "json_data": {
                    "esVersion": "7.10.0",
                    "timeField": "@timestamp",
                    "interval": "Daily",
                    "logMessageField": "message",
                    "logLevelField": "level"
                }
            },
            DataSourceType.LOKI: {
                "url": "http://localhost:3100",
                "json_data": {
                    "maxLines": 1000,
                    "derivedFields": []
                }
            }
        }
        return templates.get(ds_type, {})

    def validate_connection(self, config: DataSourceConfig) -> Dict:
        """Validate a data source connection (simulated)."""
        validation_result = {
            "status": "success",
            "message": "Data source connected successfully",
            "details": {
                "version": "simulated version",
                "database": config.json_data.get("database", "N/A"),
                "response_time": "50ms"
            }
        }
        # Type-specific validation details
        if config.type == DataSourceType.PROMETHEUS:
            validation_result["details"]["build_info"] = "prometheus-2.40.0"
            validation_result["details"]["storage"] = "local"
        elif config.type == DataSourceType.MYSQL:
            validation_result["details"]["version"] = "8.0.32"
            validation_result["details"]["charset"] = "utf8mb4"
        elif config.type == DataSourceType.INFLUXDB:
            validation_result["details"]["version"] = "1.8.10"
            validation_result["details"]["databases"] = ["mydb", "_internal"]
        return validation_result

    def export_config(self) -> str:
        """Export the data source configuration as JSON."""
        config = {
            "apiVersion": 1,
            "datasources": self.data_sources
        }
        return json.dumps(config, indent=2, ensure_ascii=False)

# Usage example
manager = DataSourceManager()

# List the supported time series databases
time_series_dbs = manager.get_supported_types(DataSourceCategory.TIME_SERIES)
print("Supported time series databases:", time_series_dbs)

# Create a Prometheus data source
prometheus_config = DataSourceConfig(
    name="Prometheus",
    type=DataSourceType.PROMETHEUS,
    url="http://localhost:9090",
    is_default=True
)
prometheus_ds = manager.create_data_source(prometheus_config)
print("Prometheus data source:", prometheus_ds)
```
Common Data Source Configurations
1. Configuring Prometheus
Prometheus is the most popular monitoring data source, especially well suited to Kubernetes and microservice environments.
Basic configuration
```yaml
# Prometheus data source configuration
apiVersion: 1
datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://localhost:9090
    isDefault: true
    jsonData:
      httpMethod: POST
      queryTimeout: 60s
      timeInterval: 15s
      exemplarTraceIdDestinations:
        - name: trace_id
          datasourceUid: jaeger_uid
```
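Before wiring this data source into dashboards, it is worth confirming that the target Prometheus answers at all. A quick sketch using `requests` against Prometheus's built-in health endpoints (`/-/healthy` and `/-/ready`); the URL is assumed to match the provisioning file above:

```python
import requests

base_url = "http://localhost:9090"  # must match the provisioned url

for endpoint in ("/-/healthy", "/-/ready"):
    resp = requests.get(base_url + endpoint, timeout=5)
    # Prometheus returns HTTP 200 with a short text body when healthy/ready
    print(endpoint, resp.status_code, resp.text.strip())
```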
Advanced configuration options
```python
import json

class PrometheusDataSource:
    """Prometheus data source configuration builder."""

    def __init__(self):
        self.basic_config = {
            "name": "Prometheus",
            "type": "prometheus",
            "access": "proxy",
            "url": "http://localhost:9090",
            "isDefault": True
        }

    def configure_advanced_options(self, **kwargs):
        """Configure advanced options."""
        json_data = {
            # HTTP method used for queries
            "httpMethod": kwargs.get("http_method", "POST"),
            # Query timeout
            "queryTimeout": kwargs.get("query_timeout", "60s"),
            # Scrape/step interval
            "timeInterval": kwargs.get("time_interval", "15s"),
            # Custom query parameters
            "customQueryParameters": kwargs.get("custom_params", ""),
            # Disable metrics lookup
            "disableMetricsLookup": kwargs.get("disable_metrics_lookup", False),
            # Exemplar configuration
            "exemplarTraceIdDestinations": kwargs.get("exemplar_destinations", []),
            # Incremental querying
            "incrementalQuerying": kwargs.get("incremental_querying", False),
            # Cache level
            "cacheLevel": kwargs.get("cache_level", "Low"),
            # Concurrent query limit
            "maxConcurrentShardRequests": kwargs.get("max_concurrent_requests", 5)
        }
        self.basic_config["jsonData"] = json_data
        return self.basic_config

    def configure_authentication(self, auth_type="none", **auth_params):
        """Configure authentication."""
        if auth_type == "basic":
            self.basic_config["basicAuth"] = True
            self.basic_config["basicAuthUser"] = auth_params.get("username", "")
            self.basic_config["secureJsonData"] = {
                "basicAuthPassword": auth_params.get("password", "")
            }
        elif auth_type == "bearer":
            self.basic_config["secureJsonData"] = {
                "httpHeaderValue1": f"Bearer {auth_params.get('token', '')}"
            }
            # setdefault() guards against configure_advanced_options() not having run yet
            self.basic_config.setdefault("jsonData", {})["httpHeaderName1"] = "Authorization"
        elif auth_type == "custom_header":
            header_name = auth_params.get("header_name", "X-API-Key")
            header_value = auth_params.get("header_value", "")
            self.basic_config.setdefault("jsonData", {})["httpHeaderName1"] = header_name
            self.basic_config["secureJsonData"] = {
                "httpHeaderValue1": header_value
            }
        return self.basic_config

    def configure_tls(self, **tls_params):
        """Configure TLS."""
        json_data = self.basic_config.get("jsonData", {})
        secure_json_data = self.basic_config.get("secureJsonData", {})
        # TLS client authentication
        if tls_params.get("tls_auth"):
            json_data["tlsAuth"] = True
            json_data["tlsAuthWithCACert"] = tls_params.get("with_ca_cert", False)
            if tls_params.get("client_cert"):
                secure_json_data["tlsClientCert"] = tls_params["client_cert"]
            if tls_params.get("client_key"):
                secure_json_data["tlsClientKey"] = tls_params["client_key"]
            if tls_params.get("ca_cert"):
                secure_json_data["tlsCACert"] = tls_params["ca_cert"]
        # Skip TLS verification
        if tls_params.get("skip_verify"):
            json_data["tlsSkipVerify"] = True
        # Expected server name
        if tls_params.get("server_name"):
            json_data["serverName"] = tls_params["server_name"]
        self.basic_config["jsonData"] = json_data
        self.basic_config["secureJsonData"] = secure_json_data
        return self.basic_config

# Usage example
prometheus = PrometheusDataSource()

# Advanced options
config = prometheus.configure_advanced_options(
    http_method="GET",
    query_timeout="30s",
    time_interval="10s",
    disable_metrics_lookup=True,
    max_concurrent_requests=10
)

# Bearer token authentication
config = prometheus.configure_authentication(
    auth_type="bearer",
    token="your-api-token"
)

# TLS
config = prometheus.configure_tls(
    tls_auth=True,
    skip_verify=False,
    server_name="prometheus.example.com"
)
print("Prometheus config:", json.dumps(config, indent=2))
```
2. Configuring MySQL
MySQL data sources are well suited to visualizing business data and building reports.
```python
import json

class MySQLDataSource:
    """MySQL data source configuration builder."""

    def __init__(self):
        self.basic_config = {
            "name": "MySQL",
            "type": "mysql",
            "access": "proxy",
            "url": "localhost:3306",
            "user": "grafana",
            "database": "grafana"
        }

    def configure_connection(self, **params):
        """Configure connection parameters."""
        json_data = {
            # Database name
            "database": params.get("database", "grafana"),
            # Connection pool settings
            "maxOpenConns": params.get("max_open_conns", 0),  # 0 means unlimited
            "maxIdleConns": params.get("max_idle_conns", 2),
            "connMaxLifetime": params.get("conn_max_lifetime", 14400),  # seconds
            # SSL setting: "false", "true", or "skip-verify"
            "sslmode": params.get("ssl_mode", "false"),
            # Time zone
            "timezone": params.get("timezone", "UTC"),
            # Query timeout
            "queryTimeout": params.get("query_timeout", "60s"),
            # Default time/metric columns
            "timeField": params.get("time_field", "time"),
            "metricField": params.get("metric_field", "metric")
        }
        self.basic_config["jsonData"] = json_data
        # The password belongs in secureJsonData
        if params.get("password"):
            self.basic_config["secureJsonData"] = {
                "password": params["password"]
            }
        return self.basic_config

    def get_sample_queries(self):
        """Return sample queries."""
        return {
            "time_series": {
                "description": "Time series query",
                "query": """
                    SELECT
                        UNIX_TIMESTAMP(time_column) as time_sec,
                        value_column as value,
                        metric_name
                    FROM metrics_table
                    WHERE $__timeFilter(time_column)
                    ORDER BY time_column ASC
                """
            },
            "table": {
                "description": "Table query",
                "query": """
                    SELECT
                        id,
                        name,
                        status,
                        created_at
                    FROM users
                    WHERE created_at >= $__timeFrom() AND created_at <= $__timeTo()
                    ORDER BY created_at DESC
                """
            },
            "single_stat": {
                "description": "Single-stat query",
                "query": """
                    SELECT
                        COUNT(*) as total_users
                    FROM users
                    WHERE status = 'active'
                """
            },
            "aggregation": {
                "description": "Aggregation query",
                "query": """
                    SELECT
                        DATE(created_at) as date,
                        COUNT(*) as daily_registrations
                    FROM users
                    WHERE $__timeFilter(created_at)
                    GROUP BY DATE(created_at)
                    ORDER BY date ASC
                """
            }
        }

    def get_macros_reference(self):
        """Return the macro reference."""
        return {
            "$__time(column)": "Converts the column into a time column",
            "$__timeEpoch(column)": "Converts the column into a Unix timestamp",
            "$__timeFilter(column)": "Adds a time-range filter on the column",
            "$__timeFrom()": "Start of the dashboard time range",
            "$__timeTo()": "End of the dashboard time range",
            "$__timeGroup(column, interval)": "Groups rows into time buckets",
            "$__unixEpochFilter(column)": "Time-range filter on a Unix timestamp column",
            "$__unixEpochFrom()": "Start of the time range as a Unix timestamp",
            "$__unixEpochTo()": "End of the time range as a Unix timestamp"
        }

# Usage example
mysql_ds = MySQLDataSource()

config = mysql_ds.configure_connection(
    database="monitoring",
    max_open_conns=10,
    max_idle_conns=5,
    ssl_mode="true",
    timezone="Asia/Shanghai",
    password="your-password"
)
print("MySQL config:", json.dumps(config, indent=2))
print("Sample queries:", mysql_ds.get_sample_queries())
print("Macro reference:", mysql_ds.get_macros_reference())
```
3. Configuring InfluxDB
InfluxDB is a purpose-built time series database, designed for storing and querying large volumes of time series data.
```python
import json

class InfluxDBDataSource:
    """InfluxDB data source configuration builder."""

    def __init__(self, version="1.8"):
        self.version = version
        self.basic_config = {
            "name": "InfluxDB",
            "type": "influxdb",
            "access": "proxy",
            "url": "http://localhost:8086"
        }

    def configure_v1(self, **params):
        """Configure InfluxDB 1.x (InfluxQL)."""
        json_data = {
            "version": "1.8",
            "database": params.get("database", "mydb"),
            "httpMode": params.get("http_mode", "GET"),  # GET or POST
            "retentionPolicy": params.get("retention_policy", ""),
            "queryTimeout": params.get("query_timeout", "60s"),
            "timeInterval": params.get("time_interval", "10s")
        }
        # Username/password authentication
        if params.get("username") and params.get("password"):
            self.basic_config["user"] = params["username"]
            self.basic_config["secureJsonData"] = {
                "password": params["password"]
            }
        self.basic_config["jsonData"] = json_data
        return self.basic_config

    def configure_v2(self, **params):
        """Configure InfluxDB 2.x (Flux)."""
        json_data = {
            "version": "Flux",
            "organization": params.get("organization", "my-org"),
            "defaultBucket": params.get("default_bucket", "my-bucket"),
            "queryTimeout": params.get("query_timeout", "60s"),
            "timeInterval": params.get("time_interval", "10s")
        }
        # Token authentication
        if params.get("token"):
            self.basic_config["secureJsonData"] = {
                "token": params["token"]
            }
        self.basic_config["jsonData"] = json_data
        return self.basic_config

    def get_sample_queries(self):
        """Return sample queries for the configured version."""
        if self.version.startswith("1."):
            return self._get_influxql_queries()
        return self._get_flux_queries()

    def _get_influxql_queries(self):
        """InfluxQL sample queries."""
        return {
            "basic_select": {
                "description": "Basic query",
                "query": 'SELECT mean("value") FROM "cpu_usage" WHERE $timeFilter GROUP BY time($__interval) fill(null)'
            },
            "multiple_series": {
                "description": "Multi-series query",
                "query": 'SELECT mean("value") FROM "cpu_usage" WHERE $timeFilter GROUP BY time($__interval), "host" fill(null)'
            },
            "aggregation": {
                "description": "Aggregation query",
                "query": 'SELECT max("value"), min("value"), mean("value") FROM "memory_usage" WHERE $timeFilter GROUP BY time($__interval) fill(null)'
            },
            "derivative": {
                "description": "Derivative calculation",
                "query": 'SELECT derivative(mean("value"), 1s) FROM "network_bytes" WHERE $timeFilter GROUP BY time($__interval) fill(null)'
            }
        }

    def _get_flux_queries(self):
        """Flux sample queries."""
        return {
            "basic_query": {
                "description": "Basic Flux query",
                "query": '''
                    from(bucket: "my-bucket")
                      |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
                      |> filter(fn: (r) => r["_measurement"] == "cpu")
                      |> filter(fn: (r) => r["_field"] == "usage_idle")
                      |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
                      |> yield(name: "mean")
                '''
            },
            "multiple_measurements": {
                "description": "Multiple fields query",
                "query": '''
                    from(bucket: "my-bucket")
                      |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
                      |> filter(fn: (r) => r["_measurement"] == "system")
                      |> filter(fn: (r) => r["_field"] == "load1" or r["_field"] == "load5" or r["_field"] == "load15")
                      |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
                      |> yield(name: "mean")
                '''
            },
            "transformation": {
                "description": "Data transformation",
                "query": '''
                    from(bucket: "my-bucket")
                      |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
                      |> filter(fn: (r) => r["_measurement"] == "disk")
                      |> filter(fn: (r) => r["_field"] == "used_percent")
                      |> map(fn: (r) => ({ r with _value: r._value * 100.0 }))
                      |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
                      |> yield(name: "percent")
                '''
            }
        }

# Usage example
# InfluxDB 1.x
influx_v1 = InfluxDBDataSource(version="1.8")
config_v1 = influx_v1.configure_v1(
    database="telegraf",
    username="admin",
    password="password",
    http_mode="POST"
)

# InfluxDB 2.x
influx_v2 = InfluxDBDataSource(version="2.0")
config_v2 = influx_v2.configure_v2(
    organization="my-org",
    default_bucket="monitoring",
    token="your-influxdb-token"
)

print("InfluxDB 1.x config:", json.dumps(config_v1, indent=2))
print("InfluxDB 2.x config:", json.dumps(config_v2, indent=2))
print("Sample queries:", influx_v1.get_sample_queries())
```
4. Configuring Elasticsearch
Elasticsearch is a good fit for log analysis and full-text search scenarios.
```python
import json

class ElasticsearchDataSource:
    """Elasticsearch data source configuration builder."""

    def __init__(self):
        self.basic_config = {
            "name": "Elasticsearch",
            "type": "elasticsearch",
            "access": "proxy",
            "url": "http://localhost:9200"
        }

    def configure_basic(self, **params):
        """Basic configuration."""
        json_data = {
            # Elasticsearch version
            "esVersion": params.get("version", "7.10.0"),
            # Time field
            "timeField": params.get("time_field", "@timestamp"),
            # Index settings
            "interval": params.get("interval", "Daily"),  # Daily, Weekly, Monthly, Yearly
            "indexPattern": params.get("index_pattern", "[logstash-]YYYY.MM.DD"),
            # Log fields
            "logMessageField": params.get("log_message_field", "message"),
            "logLevelField": params.get("log_level_field", "level"),
            # Query settings
            "maxConcurrentShardRequests": params.get("max_concurrent_requests", 5),
            "queryTimeout": params.get("query_timeout", "60s"),
            # Include frozen indices
            "includeFrozen": params.get("include_frozen", False)
        }
        self.basic_config["jsonData"] = json_data
        return self.basic_config

    def configure_authentication(self, auth_type="none", **auth_params):
        """Configure authentication."""
        if auth_type == "basic":
            self.basic_config["basicAuth"] = True
            self.basic_config["basicAuthUser"] = auth_params.get("username", "")
            self.basic_config["secureJsonData"] = {
                "basicAuthPassword": auth_params.get("password", "")
            }
        elif auth_type == "api_key":
            self.basic_config["secureJsonData"] = {
                "apiKey": auth_params.get("api_key", "")
            }
        return self.basic_config

    def get_sample_queries(self):
        """Return sample queries."""
        return {
            "logs_query": {
                "description": "Log query",
                "query": {
                    "query": {
                        "bool": {
                            "must": [
                                {
                                    "range": {
                                        "@timestamp": {
                                            "gte": "$__timeFrom()",
                                            "lte": "$__timeTo()",
                                            "format": "epoch_millis"
                                        }
                                    }
                                },
                                {
                                    "match": {
                                        "level": "ERROR"
                                    }
                                }
                            ]
                        }
                    },
                    "sort": [
                        {
                            "@timestamp": {
                                "order": "desc"
                            }
                        }
                    ]
                }
            },
            "metrics_query": {
                "description": "Metrics query",
                "query": {
                    "aggs": {
                        "time_buckets": {
                            "date_histogram": {
                                "field": "@timestamp",
                                "interval": "$__interval",
                                "min_doc_count": 0
                            },
                            "aggs": {
                                "avg_response_time": {
                                    "avg": {
                                        "field": "response_time"
                                    }
                                }
                            }
                        }
                    },
                    "query": {
                        "range": {
                            "@timestamp": {
                                "gte": "$__timeFrom()",
                                "lte": "$__timeTo()",
                                "format": "epoch_millis"
                            }
                        }
                    }
                }
            },
            "terms_aggregation": {
                "description": "Terms aggregation",
                "query": {
                    "aggs": {
                        "top_hosts": {
                            "terms": {
                                "field": "host.keyword",
                                "size": 10
                            },
                            "aggs": {
                                "error_count": {
                                    "filter": {
                                        "term": {
                                            "level": "ERROR"
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "query": {
                        "range": {
                            "@timestamp": {
                                "gte": "$__timeFrom()",
                                "lte": "$__timeTo()",
                                "format": "epoch_millis"
                            }
                        }
                    }
                }
            }
        }

# Usage example
es_ds = ElasticsearchDataSource()

# Basic configuration
config = es_ds.configure_basic(
    version="7.15.0",
    time_field="@timestamp",
    interval="Daily",
    index_pattern="[logs-]YYYY.MM.DD",
    log_message_field="message",
    log_level_field="level"
)

# Authentication
config = es_ds.configure_authentication(
    auth_type="basic",
    username="elastic",
    password="password"
)
print("Elasticsearch config:", json.dumps(config, indent=2))
print("Sample queries:", es_ds.get_sample_queries())
```
Data Source Management Best Practices
1. Configuration file management
Use provisioning files to manage data sources in bulk and to keep them under version control:
```yaml
# datasources.yml
apiVersion: 1
datasources:
  # Prometheus monitoring
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true
    jsonData:
      httpMethod: POST
      queryTimeout: 60s
      timeInterval: 15s
    version: 1
    editable: false

  # MySQL business database
  - name: MySQL-Business
    type: mysql
    access: proxy
    url: mysql:3306
    user: grafana
    database: business
    jsonData:
      maxOpenConns: 10
      maxIdleConns: 5
      connMaxLifetime: 14400
      sslmode: "false"
    secureJsonData:
      password: ${MYSQL_PASSWORD}
    version: 1
    editable: false

  # InfluxDB time series data
  - name: InfluxDB-Metrics
    type: influxdb
    access: proxy
    url: http://influxdb:8086
    user: grafana
    database: metrics
    jsonData:
      httpMode: GET
      queryTimeout: 60s
    secureJsonData:
      password: ${INFLUXDB_PASSWORD}
    version: 1
    editable: false

  # Elasticsearch logs
  - name: Elasticsearch-Logs
    type: elasticsearch
    access: proxy
    url: http://elasticsearch:9200
    basicAuth: true
    basicAuthUser: elastic
    jsonData:
      esVersion: "7.15.0"
      timeField: "@timestamp"
      interval: Daily
      logMessageField: message
      logLevelField: level
      maxConcurrentShardRequests: 5
    secureJsonData:
      basicAuthPassword: ${ELASTICSEARCH_PASSWORD}
    version: 1
    editable: false

  # Loki log aggregation
  - name: Loki
    type: loki
    access: proxy
    url: http://loki:3100
    jsonData:
      maxLines: 1000
      derivedFields:
        - name: TraceID
          matcherRegex: "trace_id=(\\w+)"
          url: "http://jaeger:16686/trace/${__value.raw}"
          datasourceUid: jaeger
    version: 1
    editable: false

  # CloudWatch (AWS)
  - name: CloudWatch
    type: cloudwatch
    jsonData:
      authType: keys
      defaultRegion: us-east-1
      customMetricsNamespaces: "MyApp,MyService"
    secureJsonData:
      accessKey: ${AWS_ACCESS_KEY_ID}
      secretKey: ${AWS_SECRET_ACCESS_KEY}
    version: 1
    editable: false
```
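On package-based installs, Grafana loads such files from `/etc/grafana/provisioning/datasources/` at startup, so a syntax error only surfaces after a restart. A minimal pre-deployment check, assuming PyYAML is installed; the required-field list here is a reasonable baseline, not an exhaustive schema:

```python
import yaml

REQUIRED_FIELDS = {"name", "type"}  # minimal baseline; access/url matter for most types

with open("datasources.yml", encoding="utf-8") as f:
    config = yaml.safe_load(f)

assert config.get("apiVersion") == 1, "provisioning files use apiVersion: 1"
for ds in config.get("datasources", []):
    missing = REQUIRED_FIELDS - ds.keys()
    if missing:
        raise ValueError(f"data source {ds.get('name', '?')} is missing fields: {missing}")
print(f"OK: {len(config.get('datasources', []))} data sources validated")
```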
2. Environment variable management
```bash
# .env file
# Database passwords
MYSQL_PASSWORD=secure_mysql_password
INFLUXDB_PASSWORD=secure_influxdb_password
ELASTICSEARCH_PASSWORD=secure_elasticsearch_password

# AWS credentials
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key

# API tokens
PROMETHEUS_TOKEN=your_prometheus_token
GRAFANA_API_KEY=your_grafana_api_key
```
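Grafana itself expands `${VAR}` references in provisioning files from its process environment, so the variables above must be exported before Grafana starts (for example via `env_file:` in docker-compose). For the automation script later in this chapter, a tiny loader like the sketch below can pull the same `.env` file into `os.environ`; the `python-dotenv` package offers the same thing off the shelf:

```python
import os

def load_env_file(path: str = ".env") -> None:
    """Very small .env parser: KEY=VALUE lines, '#' comments, no quoting rules."""
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            os.environ.setdefault(key.strip(), value.strip())

load_env_file()
print("MYSQL_PASSWORD set:", "MYSQL_PASSWORD" in os.environ)
```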
3. Automated configuration scripts
```python
import os
import re
import json
import requests
from typing import List, Dict, Optional

class GrafanaDataSourceManager:
    """Automated management of Grafana data sources via the HTTP API."""

    def __init__(self, grafana_url: str, api_key: str):
        self.grafana_url = grafana_url.rstrip('/')
        self.api_key = api_key
        self.headers = {
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        }

    def create_data_source(self, config: Dict) -> Dict:
        """Create a data source."""
        url = f"{self.grafana_url}/api/datasources"
        response = requests.post(url, headers=self.headers, json=config)
        if response.status_code == 200:
            return {"status": "success", "data": response.json()}
        return {"status": "error", "message": response.text}

    def update_data_source(self, ds_id: int, config: Dict) -> Dict:
        """Update a data source."""
        url = f"{self.grafana_url}/api/datasources/{ds_id}"
        response = requests.put(url, headers=self.headers, json=config)
        if response.status_code == 200:
            return {"status": "success", "data": response.json()}
        return {"status": "error", "message": response.text}

    def delete_data_source(self, ds_id: int) -> Dict:
        """Delete a data source."""
        url = f"{self.grafana_url}/api/datasources/{ds_id}"
        response = requests.delete(url, headers=self.headers)
        if response.status_code == 200:
            return {"status": "success", "message": "Data source deleted"}
        return {"status": "error", "message": response.text}

    def list_data_sources(self) -> List[Dict]:
        """List all data sources."""
        url = f"{self.grafana_url}/api/datasources"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()
        return []

    def test_data_source(self, ds_id: int) -> Dict:
        """Test a data source connection."""
        url = f"{self.grafana_url}/api/datasources/{ds_id}/health"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return {"status": "success", "data": response.json()}
        return {"status": "error", "message": response.text}

    def batch_create_from_config(self, config_file: str) -> List[Dict]:
        """Create or update data sources in bulk from a config file."""
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
        results = []
        for ds_config in config.get('datasources', []):
            # Substitute environment variables
            ds_config = self._replace_env_vars(ds_config)
            # Does the data source already exist?
            existing_ds = self._find_data_source_by_name(ds_config['name'])
            if existing_ds:
                result = self.update_data_source(existing_ds['id'], ds_config)
                result['action'] = 'updated'
            else:
                result = self.create_data_source(ds_config)
                result['action'] = 'created'
            result['name'] = ds_config['name']
            results.append(result)
        return results

    def _replace_env_vars(self, config: Dict) -> Dict:
        """Substitute ${VAR_NAME} references with environment variables."""
        config_str = json.dumps(config)
        pattern = r'\$\{([^}]+)\}'

        def replace_var(match):
            var_name = match.group(1)
            return os.getenv(var_name, match.group(0))

        config_str = re.sub(pattern, replace_var, config_str)
        return json.loads(config_str)

    def _find_data_source_by_name(self, name: str) -> Optional[Dict]:
        """Find a data source by name."""
        for ds in self.list_data_sources():
            if ds['name'] == name:
                return ds
        return None

    def backup_data_sources(self, backup_file: str) -> Dict:
        """Back up all data source configurations."""
        data_sources = self.list_data_sources()
        # Strip sensitive fields
        for ds in data_sources:
            ds.pop('secureJsonData', None)
            ds.pop('basicAuthPassword', None)
        backup_config = {
            "apiVersion": 1,
            "datasources": data_sources
        }
        with open(backup_file, 'w', encoding='utf-8') as f:
            json.dump(backup_config, f, indent=2, ensure_ascii=False)
        return {"status": "success", "message": f"Backup written to {backup_file}"}

    def health_check_all(self) -> Dict:
        """Check the health of every data source."""
        results = []
        for ds in self.list_data_sources():
            health_result = self.test_data_source(ds['id'])
            results.append({
                "name": ds['name'],
                "type": ds['type'],
                "status": health_result['status'],
                "details": health_result.get('data', health_result.get('message', ''))
            })
        healthy_count = sum(1 for r in results if r['status'] == 'success')
        total_count = len(results)
        return {
            "summary": {
                "total": total_count,
                "healthy": healthy_count,
                "unhealthy": total_count - healthy_count
            },
            "details": results
        }

# Usage example
if __name__ == "__main__":
    manager = GrafanaDataSourceManager(
        grafana_url="http://localhost:3000",
        api_key="your-api-key"
    )

    # Create data sources in bulk
    results = manager.batch_create_from_config("datasources.json")
    print("Batch creation results:", results)

    # Health check
    health_status = manager.health_check_all()
    print("Health check results:", health_status)

    # Back up the configuration
    backup_result = manager.backup_data_sources("datasources_backup.json")
    print("Backup results:", backup_result)
```
Data Source Security
1. Network security
```python
class DataSourceSecurity:
    """Data source security configuration."""

    def __init__(self):
        self.security_configs = {}

    def configure_network_security(self, **params):
        """Configure network security."""
        return {
            # Network isolation
            "network_isolation": {
                "use_private_network": params.get("private_network", True),
                "allowed_ip_ranges": params.get("allowed_ips", ["10.0.0.0/8", "172.16.0.0/12"]),
                "firewall_rules": {
                    "inbound": [
                        {"port": 3000, "source": "grafana_subnet"},
                        {"port": 9090, "source": "monitoring_subnet"}
                    ],
                    "outbound": [
                        {"port": 443, "destination": "any", "protocol": "https"},
                        {"port": 3306, "destination": "database_subnet"}
                    ]
                }
            },
            # VPN configuration
            "vpn_config": {
                "use_vpn": params.get("use_vpn", False),
                "vpn_type": params.get("vpn_type", "wireguard"),
                "vpn_endpoint": params.get("vpn_endpoint", "")
            },
            # Proxy configuration
            "proxy_config": {
                "use_proxy": params.get("use_proxy", False),
                "proxy_url": params.get("proxy_url", ""),
                "no_proxy": params.get("no_proxy", "localhost,127.0.0.1")
            }
        }

    def configure_authentication(self, **params):
        """Configure authentication security."""
        return {
            # Strong authentication
            "strong_auth": {
                "require_mfa": params.get("require_mfa", True),
                "password_policy": {
                    "min_length": 12,
                    "require_uppercase": True,
                    "require_lowercase": True,
                    "require_numbers": True,
                    "require_symbols": True,
                    "max_age_days": 90
                },
                "session_timeout": params.get("session_timeout", 3600)
            },
            # API key management
            "api_key_management": {
                "rotation_interval": params.get("key_rotation_days", 30),
                "key_strength": "256-bit",
                "access_logging": True,
                "rate_limiting": {
                    "requests_per_minute": 100,
                    "burst_limit": 200
                }
            },
            # OAuth/SAML
            "sso_config": {
                "enable_oauth": params.get("enable_oauth", False),
                "oauth_provider": params.get("oauth_provider", "google"),
                "saml_enabled": params.get("enable_saml", False),
                "auto_login": params.get("auto_login", False)
            }
        }

    def configure_encryption(self, **params):
        """Configure encryption."""
        return {
            # Transport encryption
            "transport_encryption": {
                "enforce_https": True,
                "tls_version": "1.3",
                "cipher_suites": [
                    "TLS_AES_256_GCM_SHA384",
                    "TLS_CHACHA20_POLY1305_SHA256",
                    "TLS_AES_128_GCM_SHA256"
                ],
                "hsts_enabled": True,
                "hsts_max_age": 31536000
            },
            # Certificate management
            "certificate_management": {
                "use_custom_ca": params.get("custom_ca", False),
                "cert_rotation_days": params.get("cert_rotation", 90),
                "ocsp_stapling": True,
                "certificate_transparency": True
            },
            # Data encryption
            "data_encryption": {
                "encrypt_at_rest": params.get("encrypt_at_rest", True),
                "encryption_algorithm": "AES-256-GCM",
                "key_management": "vault",
                "key_rotation_interval": 365
            }
        }

    def configure_access_control(self, **params):
        """Configure access control."""
        return {
            # Role-based access control
            "rbac": {
                "enable_rbac": True,
                "default_role": "Viewer",
                "role_hierarchy": {
                    "Admin": ["Editor", "Viewer"],
                    "Editor": ["Viewer"],
                    "Viewer": []
                },
                "data_source_permissions": {
                    "prometheus": ["Admin", "Editor", "Viewer"],
                    "mysql_sensitive": ["Admin"],
                    "logs": ["Admin", "Editor"]
                }
            },
            # Audit logging
            "audit_logging": {
                "enable_audit": True,
                "log_level": "INFO",
                "log_retention_days": 365,
                "log_events": [
                    "login", "logout", "data_source_access",
                    "dashboard_view", "query_execution",
                    "configuration_change"
                ],
                "log_format": "json",
                "log_destination": "syslog"
            },
            # IP allowlist
            "ip_whitelist": {
                "enable_whitelist": params.get("enable_ip_whitelist", False),
                "allowed_ips": params.get("allowed_ips", []),
                "blocked_ips": params.get("blocked_ips", []),
                "geo_blocking": {
                    "enable": False,
                    "allowed_countries": ["US", "CN", "JP"]
                }
            }
        }

    def generate_security_checklist(self):
        """Generate a security checklist."""
        return {
            "network_security": [
                "✓ Use a private network",
                "✓ Configure firewall rules",
                "✓ Enable network isolation",
                "✓ Use VPN connections",
                "✓ Configure a proxy server"
            ],
            "authentication": [
                "✓ Enable multi-factor authentication",
                "✓ Enforce a strong password policy",
                "✓ Rotate API keys regularly",
                "✓ Configure SSO integration",
                "✓ Set session timeouts"
            ],
            "encryption": [
                "✓ Enforce HTTPS",
                "✓ Configure TLS 1.3",
                "✓ Enable HSTS",
                "✓ Use strong cipher suites",
                "✓ Renew certificates regularly"
            ],
            "access_control": [
                "✓ Enable role-based access control",
                "✓ Configure data source permissions",
                "✓ Enable audit logging",
                "✓ Configure IP allowlists",
                "✓ Apply the principle of least privilege"
            ],
            "monitoring": [
                "✓ Monitor login attempts",
                "✓ Monitor anomalous queries",
                "✓ Set up security alerts",
                "✓ Run regular security audits",
                "✓ Monitor data access patterns"
            ]
        }

# Usage example
security = DataSourceSecurity()
network_config = security.configure_network_security(private_network=True, use_vpn=True)
auth_config = security.configure_authentication(require_mfa=True, session_timeout=1800)
encryption_config = security.configure_encryption(encrypt_at_rest=True)
access_config = security.configure_access_control(enable_ip_whitelist=True)
checklist = security.generate_security_checklist()

print("Security configuration:", {
    "network": network_config,
    "authentication": auth_config,
    "encryption": encryption_config,
    "access_control": access_config
})
print("Security checklist:", checklist)
```
Troubleshooting
1. Common connection problems
```python
import urllib.parse
from typing import Dict, List

class DataSourceTroubleshooter:
    """Data source troubleshooting helper."""

    def __init__(self):
        self.common_issues = {
            "connection_refused": {
                "description": "Connection refused",
                "possible_causes": [
                    "Target service is not running",
                    "Wrong port configured",
                    "Firewall is blocking the connection",
                    "Network is unreachable"
                ],
                "solutions": [
                    "Check the target service status",
                    "Verify the port configuration",
                    "Review firewall rules",
                    "Test network connectivity"
                ]
            },
            "authentication_failed": {
                "description": "Authentication failed",
                "possible_causes": [
                    "Wrong username or password",
                    "Expired API key",
                    "Insufficient permissions",
                    "Misconfigured authentication method"
                ],
                "solutions": [
                    "Verify the credentials",
                    "Renew the API key",
                    "Check the user's permissions",
                    "Confirm the authentication method"
                ]
            },
            "timeout": {
                "description": "Query timeout",
                "possible_causes": [
                    "Query is too complex",
                    "Too much data requested",
                    "High network latency",
                    "Server under heavy load"
                ],
                "solutions": [
                    "Optimize the query",
                    "Narrow the query time range",
                    "Increase the timeout",
                    "Check server performance"
                ]
            },
            "ssl_error": {
                "description": "SSL/TLS error",
                "possible_causes": [
                    "Expired certificate",
                    "Certificate/hostname mismatch",
                    "Missing CA certificate",
                    "Incompatible TLS version"
                ],
                "solutions": [
                    "Renew the certificate",
                    "Verify the certificate's hostname",
                    "Install the CA certificate",
                    "Adjust the TLS configuration"
                ]
            }
        }

    def diagnose_connection(self, error_message: str) -> Dict:
        """Diagnose a connection problem from its error message."""
        error_lower = error_message.lower()
        # Simple keyword matching per issue type
        keywords = {
            "connection_refused": ["connection refused", "connect failed"],
            "authentication_failed": ["authentication", "unauthorized", "401", "403"],
            "timeout": ["timeout", "time out"],
            "ssl_error": ["ssl", "tls", "certificate"]
        }
        for issue_type, issue_info in self.common_issues.items():
            if any(keyword in error_lower for keyword in keywords.get(issue_type, [])):
                return {
                    "issue_type": issue_type,
                    "description": issue_info["description"],
                    "possible_causes": issue_info["possible_causes"],
                    "solutions": issue_info["solutions"],
                    "next_steps": self._get_next_steps(issue_type)
                }
        return {
            "issue_type": "unknown",
            "description": "Unknown error",
            "message": error_message,
            "general_solutions": [
                "Check the Grafana logs",
                "Verify the data source configuration",
                "Test the network connection",
                "Contact your system administrator"
            ]
        }

    def _get_next_steps(self, issue_type: str) -> List[str]:
        """Return suggested next steps."""
        next_steps = {
            "connection_refused": [
                "Test port connectivity with telnet",
                "Check the target service logs",
                "Verify network routing",
                "Check DNS resolution"
            ],
            "authentication_failed": [
                "Verify the user in the target system",
                "Check permission assignments",
                "Test the API key",
                "Review the authentication logs"
            ],
            "timeout": [
                "Monitor query execution time",
                "Analyze the slow query log",
                "Check system resource usage",
                "Optimize query indexes"
            ],
            "ssl_error": [
                "Check the certificate's validity period",
                "Verify the certificate chain",
                "Test the SSL connection",
                "Update the certificate store"
            ]
        }
        return next_steps.get(issue_type, [])

    def generate_test_commands(self, ds_config: Dict) -> Dict:
        """Generate shell commands for testing a data source."""
        ds_type = ds_config.get("type", "")
        url = ds_config.get("url", "")
        commands = {
            "basic_connectivity": {
                "description": "Basic connectivity tests",
                "commands": []
            },
            "service_specific": {
                "description": "Service-specific tests",
                "commands": []
            }
        }
        # Parse the URL; scheme-less URLs like "localhost:3306" need a // prefix
        if "://" not in url:
            parsed_url = urllib.parse.urlparse(f"//{url}")
        else:
            parsed_url = urllib.parse.urlparse(url)
        host = parsed_url.hostname or "localhost"
        port = parsed_url.port
        # Basic connectivity
        if port:
            commands["basic_connectivity"]["commands"].extend([
                f"ping -c 4 {host}",
                f"telnet {host} {port}",
                f"nmap -p {port} {host}",
                f"curl -I {url}"
            ])
        # Service-specific
        if ds_type == "prometheus":
            commands["service_specific"]["commands"].extend([
                f"curl {url}/api/v1/status/config",
                f"curl {url}/api/v1/targets",
                f"curl {url}/metrics"
            ])
        elif ds_type == "mysql":
            commands["service_specific"]["commands"].extend([
                f"mysql -h {host} -P {port} -u username -p -e 'SELECT 1'",
                f"mysqladmin -h {host} -P {port} -u username -p ping"
            ])
        elif ds_type == "influxdb":
            commands["service_specific"]["commands"].extend([
                f"curl {url}/ping",
                f"curl {url}/query?q=SHOW+DATABASES"
            ])
        elif ds_type == "elasticsearch":
            commands["service_specific"]["commands"].extend([
                f"curl {url}/_cluster/health",
                f"curl {url}/_cat/indices"
            ])
        return commands

# Usage example
troubleshooter = DataSourceTroubleshooter()

# Diagnose an error
error_msg = "connection refused to prometheus:9090"
diagnosis = troubleshooter.diagnose_connection(error_msg)
print("Diagnosis:", diagnosis)

# Generate test commands
ds_config = {
    "type": "prometheus",
    "url": "http://prometheus:9090"
}
test_commands = troubleshooter.generate_test_commands(ds_config)
print("Test commands:", test_commands)
```
2. Performance optimization
```python
from typing import Dict

class DataSourcePerformanceOptimizer:
    """Data source performance optimization helper."""

    def __init__(self):
        self.optimization_strategies = {
            "query_optimization": {
                "description": "Query optimization",
                "techniques": [
                    "Constrain the time range",
                    "Reduce the amount of data returned",
                    "Use aggregation functions",
                    "Avoid complex regular expressions",
                    "Filter on indexed fields"
                ]
            },
            "connection_pooling": {
                "description": "Connection pool tuning",
                "techniques": [
                    "Size the connection pool appropriately",
                    "Set connection timeouts",
                    "Enable connection reuse",
                    "Monitor connection usage"
                ]
            },
            "caching": {
                "description": "Caching",
                "techniques": [
                    "Enable query result caching",
                    "Choose a suitable cache TTL",
                    "Use Redis as a cache",
                    "Apply a tiered caching strategy"
                ]
            },
            "data_source_tuning": {
                "description": "Data source tuning",
                "techniques": [
                    "Optimize database indexes",
                    "Tune memory settings",
                    "Optimize the storage engine",
                    "Split reads and writes"
                ]
            }
        }

    def analyze_performance(self, ds_config: Dict, metrics: Dict) -> Dict:
        """Analyze performance metrics."""
        analysis = {
            "overall_score": 0,
            "issues": [],
            "recommendations": [],
            "optimizations": []
        }
        # Query response time
        avg_response_time = metrics.get("avg_response_time", 0)
        if avg_response_time > 5000:  # 5 seconds
            analysis["issues"].append("Query response time is too high")
            analysis["recommendations"].extend([
                "Optimize the queries",
                "Narrow the query time range",
                "Add suitable indexes"
            ])
        # Connection usage
        active_connections = metrics.get("active_connections", 0)
        max_connections = ds_config.get("max_connections", 100)
        if active_connections / max_connections > 0.8:
            analysis["issues"].append("Connection pool utilization is too high")
            analysis["recommendations"].extend([
                "Increase the connection pool size",
                "Improve connection reuse",
                "Check for connection leaks"
            ])
        # Error rate
        error_rate = metrics.get("error_rate", 0)
        if error_rate > 0.05:  # 5%
            analysis["issues"].append("Error rate is too high")
            analysis["recommendations"].extend([
                "Check the data source's health",
                "Optimize the queries",
                "Add retry logic"
            ])
        # Overall score (simple heuristic)
        score = 100
        score -= min(avg_response_time / 100, 50)                      # response time penalty
        score -= min(error_rate * 100, 30)                             # error rate penalty
        score -= min((active_connections / max_connections) * 20, 20)  # pool utilization penalty
        analysis["overall_score"] = max(0, int(score))
        return analysis

    def generate_optimization_config(self, ds_type: str, performance_level: str = "balanced") -> Dict:
        """Generate a tuning profile for the given data source type."""
        configs = {
            "prometheus": {
                "high_performance": {
                    "timeout": "30s",
                    "max_concurrent_queries": 20,
                    "query_timeout": "2m",
                    "cache_level": "high",
                    "step_alignment": True
                },
                "balanced": {
                    "timeout": "60s",
                    "max_concurrent_queries": 10,
                    "query_timeout": "5m",
                    "cache_level": "medium",
                    "step_alignment": False
                },
                "resource_saving": {
                    "timeout": "120s",
                    "max_concurrent_queries": 5,
                    "query_timeout": "10m",
                    "cache_level": "low",
                    "step_alignment": False
                }
            },
            "mysql": {
                "high_performance": {
                    "max_open_conns": 100,
                    "max_idle_conns": 10,
                    "conn_max_lifetime": "1h",
                    "timeout": "30s",
                    "read_timeout": "30s",
                    "write_timeout": "30s"
                },
                "balanced": {
                    "max_open_conns": 50,
                    "max_idle_conns": 5,
                    "conn_max_lifetime": "2h",
                    "timeout": "60s",
                    "read_timeout": "60s",
                    "write_timeout": "60s"
                },
                "resource_saving": {
                    "max_open_conns": 20,
                    "max_idle_conns": 2,
                    "conn_max_lifetime": "4h",
                    "timeout": "120s",
                    "read_timeout": "120s",
                    "write_timeout": "120s"
                }
            }
        }
        return configs.get(ds_type, {}).get(performance_level, {})

    def generate_monitoring_queries(self, ds_type: str) -> Dict:
        """Generate performance monitoring queries."""
        queries = {
            "prometheus": {
                "query_duration": 'histogram_quantile(0.95, rate(prometheus_engine_query_duration_seconds_bucket[5m]))',
                "active_queries": 'prometheus_engine_queries',
                "query_rate": 'rate(prometheus_engine_queries_total[5m])',
                "memory_usage": 'process_resident_memory_bytes{job="prometheus"}',
                "disk_usage": 'prometheus_tsdb_storage_blocks_bytes',
                "series_count": 'prometheus_tsdb_head_series'
            },
            "mysql": {
                "connection_count": 'mysql_global_status_threads_connected',
                "query_rate": 'rate(mysql_global_status_questions[5m])',
                "slow_queries": 'rate(mysql_global_status_slow_queries[5m])',
                "innodb_buffer_pool": 'mysql_global_status_innodb_buffer_pool_pages_total',
                "table_locks": 'mysql_global_status_table_locks_waited'
            },
            "influxdb": {
                "query_duration": 'influxdb_queryExecutor_queriesExecuted_mean',
                "write_rate": 'rate(influxdb_write_total[5m])',
                "series_count": 'influxdb_database_numSeries',
                "memory_usage": 'influxdb_runtime_MemStats_Alloc',
                "disk_usage": 'influxdb_shard_diskBytes'
            }
        }
        return queries.get(ds_type, {})

# Usage example
optimizer = DataSourcePerformanceOptimizer()

# Performance analysis
ds_config = {"type": "prometheus", "max_connections": 100}
metrics = {
    "avg_response_time": 3000,  # 3 seconds
    "active_connections": 85,
    "error_rate": 0.02  # 2%
}
analysis = optimizer.analyze_performance(ds_config, metrics)
print("Performance analysis:", analysis)

# Generate a tuning profile
optimization_config = optimizer.generate_optimization_config("prometheus", "high_performance")
print("Tuning profile:", optimization_config)

# Generate monitoring queries
monitoring_queries = optimizer.generate_monitoring_queries("prometheus")
print("Monitoring queries:", monitoring_queries)
```
Summary
Key Takeaways
Choosing a data source type
- Match the data source type to the characteristics of your data
- Weigh performance, scalability, and maintenance cost
- Assess how hard integration with existing systems will be
Configuration best practices
- Keep secrets in environment variables
- Put configuration under version control
- Establish a configuration review process
- Back up configuration files regularly
Security considerations
- Enable transport encryption (TLS/SSL)
- Enforce strong authentication
- Configure network access controls
- Audit security regularly
Performance optimization
- Optimize queries and time ranges
- Size connection pools appropriately
- Enable query caching
- Monitor key performance indicators
Troubleshooting
- Establish a systematic diagnostic process
- Keep common test tools and commands at hand
- Maintain a knowledge base of resolved issues
- Put preventive monitoring in place
Best Practices
Configuration management
- Use infrastructure as code (IaC)
- Automate configuration deployment
- Check environments for consistency
- Keep configuration documentation current
Monitoring and alerting
- Monitor data source health
- Alert on performance thresholds
- Track query performance trends
- Define an incident response process
Capacity planning
- Review resource usage regularly
- Forecast growth trends
- Plan for scaling out
- Optimize resource allocation
Next Steps
Deep-dive into specific data sources
- Learn your target data source's advanced features
- Master its specific optimization techniques
- Keep up with new release features
Dashboard design
- Learn effective data visualization principles
- Master panel configuration
- Understand user experience design
Advanced queries
- Learn complex query syntax
- Master data transformation techniques
- Understand query optimization methods
Integration and automation
- Learn the Grafana API
- Automate configuration
- Integrate with CI/CD