## 概述
本章将深入探讨TiDB集群的管理和运维实践,包括集群扩容缩容、备份恢复、监控告警、故障处理等关键运维技能。掌握这些知识对于维护生产环境中的TiDB集群至关重要。
## 学习目标

通过本章学习,您将了解:

- TiDB集群的扩容和缩容操作
- 数据备份和恢复策略
- 集群监控和告警配置
- 常见故障的诊断和处理
- 性能调优和容量规划
- 安全管理和权限控制
- 数据迁移和同步
## 集群扩容与缩容

### 1. 集群扩容
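下面给出一个扩容TiKV节点的最小操作示意(其中主机IP、目录等均为示例值,并假设集群名为 production-cluster;完整的规划、命令生成与校验流程请参考本章后文):

```bash
# 1. 编写扩容拓扑文件(示例值,按实际环境修改)
cat > scale-out.yaml << EOF
tikv_servers:
  - host: 10.0.3.13
    port: 20160
    status_port: 20180
    data_dir: /data/tikv
    log_dir: /logs/tikv
EOF

# 2. 执行扩容并验证集群状态
tiup cluster scale-out production-cluster scale-out.yaml
tiup cluster display production-cluster
tiup cluster check production-cluster
```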
## 监控与告警
### 1. Prometheus监控配置
**Prometheus配置文件:**
```yaml
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "tidb_rules.yml"
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
scrape_configs:
# TiDB监控
- job_name: 'tidb'
static_configs:
- targets: ['10.0.2.10:10080', '10.0.2.11:10080']
scrape_interval: 15s
metrics_path: /metrics
# TiKV监控
- job_name: 'tikv'
static_configs:
- targets: ['10.0.3.10:20181', '10.0.3.11:20181', '10.0.3.12:20181']
scrape_interval: 15s
metrics_path: /metrics
# PD监控
- job_name: 'pd'
static_configs:
- targets: ['10.0.1.10:2379', '10.0.1.11:2379', '10.0.1.12:2379']
scrape_interval: 15s
metrics_path: /metrics
# TiFlash监控
- job_name: 'tiflash'
static_configs:
- targets: ['10.0.4.10:8234', '10.0.4.11:8234']
scrape_interval: 15s
metrics_path: /metrics
# Node Exporter
- job_name: 'node'
static_configs:
- targets: ['10.0.1.10:9100', '10.0.1.11:9100', '10.0.1.12:9100',
'10.0.2.10:9100', '10.0.2.11:9100',
'10.0.3.10:9100', '10.0.3.11:9100', '10.0.3.12:9100',
'10.0.4.10:9100', '10.0.4.11:9100']
scrape_interval: 15s
```

**告警规则配置:**

```yaml
# tidb_rules.yml
groups:
- name: tidb.rules
rules:
# TiDB服务可用性
- alert: TiDBDown
expr: up{job="tidb"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "TiDB server is down"
description: "TiDB server {{ $labels.instance }} has been down for more than 1 minute."
# TiKV服务可用性
- alert: TiKVDown
expr: up{job="tikv"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "TiKV server is down"
description: "TiKV server {{ $labels.instance }} has been down for more than 1 minute."
# PD服务可用性
- alert: PDDown
expr: up{job="pd"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "PD server is down"
description: "PD server {{ $labels.instance }} has been down for more than 1 minute."
# TiKV磁盘使用率
- alert: TiKVDiskUsageHigh
expr: (tikv_store_size_bytes / tikv_store_capacity_bytes) > 0.8
for: 5m
labels:
severity: warning
annotations:
summary: "TiKV disk usage is high"
description: "TiKV {{ $labels.instance }} disk usage is above 80% (current: {{ $value | humanizePercentage }})."
# TiDB查询延迟
- alert: TiDBQueryDurationHigh
expr: histogram_quantile(0.99, rate(tidb_server_handle_query_duration_seconds_bucket[5m])) > 1
for: 5m
labels:
severity: warning
annotations:
summary: "TiDB query duration is high"
description: "TiDB {{ $labels.instance }} 99th percentile query duration is above 1s (current: {{ $value }}s)."
# TiKV Region数量
- alert: TiKVRegionCountHigh
expr: tikv_raftstore_region_count > 20000
for: 5m
labels:
severity: warning
annotations:
summary: "TiKV region count is high"
description: "TiKV {{ $labels.instance }} region count is above 20000 (current: {{ $value }})."
# TiDB连接数
- alert: TiDBConnectionHigh
expr: tidb_server_connections > 500
for: 5m
labels:
severity: warning
annotations:
summary: "TiDB connection count is high"
description: "TiDB {{ $labels.instance }} connection count is above 500 (current: {{ $value }})."
# TiKV Leader切换频率
- alert: TiKVLeaderChangeHigh
expr: rate(tikv_pd_heartbeat_tick_total{type="leader"}[5m]) > 10
for: 5m
labels:
severity: warning
annotations:
summary: "TiKV leader change rate is high"
description: "TiKV {{ $labels.instance }} leader change rate is above 10/s (current: {{ $value }}/s)."
```

### 2. Grafana仪表板

**TiDB Overview仪表板配置:**

```json
{
"dashboard": {
"title": "TiDB Overview",
"panels": [
{
"title": "TiDB QPS",
"type": "graph",
"targets": [
{
"expr": "sum(rate(tidb_server_query_total[5m])) by (instance)",
"legendFormat": "{{ instance }}"
}
]
},
{
"title": "TiDB Query Duration",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.99, rate(tidb_server_handle_query_duration_seconds_bucket[5m]))",
"legendFormat": "99th percentile"
},
{
"expr": "histogram_quantile(0.95, rate(tidb_server_handle_query_duration_seconds_bucket[5m]))",
"legendFormat": "95th percentile"
}
]
},
{
"title": "TiKV CPU Usage",
"type": "graph",
"targets": [
{
"expr": "rate(tikv_thread_cpu_seconds_total[5m]) * 100",
"legendFormat": "{{ instance }}"
}
]
},
{
"title": "TiKV Disk Usage",
"type": "graph",
"targets": [
{
"expr": "tikv_store_size_bytes / tikv_store_capacity_bytes * 100",
"legendFormat": "{{ instance }}"
}
]
}
]
}
}
```

### 3. 关键监控指标

**TiDB指标:**

```sql
-- 查询QPS(按分钟统计)
SELECT
DATE_FORMAT(time, '%Y-%m-%d %H:%i') as time_window,
SUM(count) / 60 as qps
FROM INFORMATION_SCHEMA.CLUSTER_STATEMENTS_SUMMARY_HISTORY
WHERE time >= NOW() - INTERVAL 1 HOUR
GROUP BY time_window
ORDER BY time_window;
-- 慢查询统计
SELECT
digest_text,
COUNT(*) as count,
AVG(query_time) as avg_time,
MAX(query_time) as max_time
FROM INFORMATION_SCHEMA.CLUSTER_SLOW_QUERY
WHERE time >= NOW() - INTERVAL 1 HOUR
GROUP BY digest_text
ORDER BY count DESC
LIMIT 10;
-- 连接数统计
SELECT
instance,
COUNT(*) as connection_count
FROM INFORMATION_SCHEMA.CLUSTER_PROCESSLIST
GROUP BY instance;
```

**TiKV指标:**

```bash
# 通过Prometheus API查询TiKV指标
# TiKV CPU使用率
curl 'http://prometheus:9090/api/v1/query?query=rate(tikv_thread_cpu_seconds_total[5m])'
# TiKV内存使用
curl 'http://prometheus:9090/api/v1/query?query=tikv_allocator_stats{type="allocated"}'
# TiKV磁盘使用率
curl 'http://prometheus:9090/api/v1/query?query=tikv_store_size_bytes/tikv_store_capacity_bytes'
# TiKV Region数量
curl 'http://prometheus:9090/api/v1/query?query=tikv_raftstore_region_count'
```

## 故障诊断与处理

### 1. 常见故障场景

**节点宕机处理:**

```bash
# 检查集群状态
tiup cluster display production-cluster
# 检查具体节点状态
tiup cluster check production-cluster --node 10.0.3.10:20160
# 重启故障节点
tiup cluster restart production-cluster --node 10.0.3.10:20160
# 如果节点无法恢复,进行替换
tiup cluster scale-in production-cluster --node 10.0.3.10:20160
tiup cluster scale-out production-cluster scale-out.yaml
```

**数据不一致处理:**

```bash
# 检查Region一致性
tiup ctl:v7.5.0 pd -u http://10.0.1.10:2379 region check miss-peer
tiup ctl:v7.5.0 pd -u http://10.0.1.10:2379 region check extra-peer
tiup ctl:v7.5.0 pd -u http://10.0.1.10:2379 region check down-peer
# 修复不一致的Region
tiup ctl:v7.5.0 pd -u http://10.0.1.10:2379 operator add remove-peer <region_id> <store_id>
tiup ctl:v7.5.0 pd -u http://10.0.1.10:2379 operator add add-peer <region_id> <store_id>
```

**性能问题诊断:**

```sql
-- 查看当前运行的查询
SELECT
id,
user,
host,
db,
command,
time,
state,
info
FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE command != 'Sleep'
ORDER BY time DESC;
-- 分析慢查询
SELECT
query_time,
parse_time,
compile_time,
process_time,
wait_time,
backoff_time,
get_commit_ts_time,
commit_time,
query
FROM INFORMATION_SCHEMA.SLOW_QUERY
WHERE time >= NOW() - INTERVAL 1 HOUR
ORDER BY query_time DESC
LIMIT 10;
-- 检查热点Region
SELECT
table_name,
region_id,
start_key,
end_key,
read_bytes,
written_bytes
FROM INFORMATION_SCHEMA.TIKV_REGION_STATUS
WHERE read_bytes > 1000000 OR written_bytes > 1000000
ORDER BY read_bytes + written_bytes DESC;
```

### 2. 故障处理流程

**故障响应流程:**

```bash
#!/bin/bash
# incident_response.sh - 故障响应脚本
set -e
INCIDENT_ID=$1
SEVERITY=$2
if [ -z "$INCIDENT_ID" ] || [ -z "$SEVERITY" ]; then
echo "Usage: $0 <incident_id> <severity>"
echo "Severity: critical, high, medium, low"
exit 1
fi
echo "开始故障响应: $INCIDENT_ID (严重程度: $SEVERITY)"
echo "时间: $(date)"
# 1. 收集基础信息
echo "\n=== 收集集群状态 ==="
tiup cluster display production-cluster > /tmp/cluster_status_${INCIDENT_ID}.log
tiup cluster check production-cluster > /tmp/cluster_check_${INCIDENT_ID}.log
# 2. 检查关键指标
echo "\n=== 检查关键指标 ==="
curl -s 'http://prometheus:9090/api/v1/query?query=up{job="tidb"}' > /tmp/tidb_status_${INCIDENT_ID}.json
curl -s 'http://prometheus:9090/api/v1/query?query=up{job="tikv"}' > /tmp/tikv_status_${INCIDENT_ID}.json
curl -s 'http://prometheus:9090/api/v1/query?query=up{job="pd"}' > /tmp/pd_status_${INCIDENT_ID}.json
# 3. 收集日志
echo "\n=== 收集日志 ==="
mkdir -p /tmp/logs_${INCIDENT_ID}
for node in $(tiup cluster display production-cluster | grep -E 'tidb|tikv|pd' | awk '{print $2}'); do
echo "收集节点 $node 的日志"
ssh $node "tail -1000 /data/tidb/log/*.log" > /tmp/logs_${INCIDENT_ID}/${node}.log 2>/dev/null || true
done
# 4. 根据严重程度执行相应操作
case $SEVERITY in
"critical")
echo "\n=== 执行紧急响应 ==="
# 发送紧急通知
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"🚨 TiDB集群严重故障: $INCIDENT_ID\"}" \
$EMERGENCY_WEBHOOK_URL
# 尝试自动恢复
./auto_recovery.sh $INCIDENT_ID
;;
"high")
echo "\n=== 执行高优先级响应 ==="
# 发送告警通知
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"⚠️ TiDB集群高优先级故障: $INCIDENT_ID\"}" \
$ALERT_WEBHOOK_URL
;;
*)
echo "\n=== 记录故障信息 ==="
echo "故障已记录,等待人工处理"
;;
esac
# 5. 生成故障报告
echo "\n=== 生成故障报告 ==="
cat > /tmp/incident_report_${INCIDENT_ID}.md << EOF
# 故障报告: $INCIDENT_ID
## 基本信息
- 故障ID: $INCIDENT_ID
- 严重程度: $SEVERITY
- 发生时间: $(date)
- 响应人员: $(whoami)
## 故障现象
- 集群状态: 见附件 cluster_status_${INCIDENT_ID}.log
- 健康检查: 见附件 cluster_check_${INCIDENT_ID}.log
- 服务状态: 见附件 *_status_${INCIDENT_ID}.json
## 处理步骤
1. 收集故障信息
2. 分析故障原因
3. 执行恢复操作
4. 验证系统状态
## 后续行动
- [ ] 根因分析
- [ ] 预防措施
- [ ] 文档更新
- [ ] 团队分享
EOF
echo "故障响应完成,报告保存在: /tmp/incident_report_${INCIDENT_ID}.md"
```

### 3. 自动恢复脚本

```bash
#!/bin/bash
# auto_recovery.sh - 自动恢复脚本
set -e
INCIDENT_ID=$1
echo "开始自动恢复流程: $INCIDENT_ID"
# 1. 检查并重启故障服务
echo "\n=== 检查服务状态 ==="
for service in tidb tikv pd; do
echo "检查 $service 服务"
# 获取故障节点
failed_nodes=$(curl -s "http://prometheus:9090/api/v1/query?query=up{job=\"$service\"}==0" | \
jq -r '.data.result[].metric.instance' 2>/dev/null || echo "")
if [ -n "$failed_nodes" ]; then
echo "发现故障节点: $failed_nodes"
for node in $failed_nodes; do
echo "尝试重启节点: $node"
# 重启节点
tiup cluster restart production-cluster --node $node
# 等待服务启动
sleep 30
# 验证节点状态
if curl -s "http://$node/status" > /dev/null 2>&1; then
echo "节点 $node 恢复成功"
else
echo "节点 $node 恢复失败,需要人工干预"
# 发送失败通知
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"❌ 节点 $node 自动恢复失败\"}" \
$ALERT_WEBHOOK_URL
fi
done
else
echo "$service 服务正常"
fi
done
# 2. 检查集群健康状态
echo "\n=== 验证集群健康状态 ==="
tiup cluster check production-cluster
if [ $? -eq 0 ]; then
echo "集群健康检查通过"
# 发送恢复成功通知
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"✅ TiDB集群自动恢复成功: $INCIDENT_ID\"}" \
$SUCCESS_WEBHOOK_URL
else
echo "集群健康检查失败,需要人工干预"
# 发送恢复失败通知
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"❌ TiDB集群自动恢复失败: $INCIDENT_ID\"}" \
$ALERT_WEBHOOK_URL
fi
echo "自动恢复流程完成"
```

## 性能调优

### 1. 系统参数优化

**操作系统优化:**

```bash
# 内核参数优化
cat >> /etc/sysctl.conf << EOF
# 网络优化
net.core.rmem_max = 134217728
net.core.wmem_max = 134217728
net.ipv4.tcp_rmem = 4096 65536 134217728
net.ipv4.tcp_wmem = 4096 65536 134217728
net.core.netdev_max_backlog = 5000
# 文件系统优化
fs.file-max = 1000000
fs.nr_open = 1000000
# 虚拟内存优化
vm.swappiness = 1
vm.dirty_ratio = 15
vm.dirty_background_ratio = 5
EOF
sysctl -p
# 用户限制优化
cat >> /etc/security/limits.conf << EOF
* soft nofile 1000000
* hard nofile 1000000
* soft nproc 1000000
* hard nproc 1000000
EOF
# 禁用透明大页
echo never > /sys/kernel/mm/transparent_hugepage/enabled
echo never > /sys/kernel/mm/transparent_hugepage/defrag
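# 补充示例:上面两条透明大页设置在重启后会失效
# 可在开机脚本中重新应用(此处以 /etc/rc.local 为例,具体机制视发行版而定,路径为假设值)
cat >> /etc/rc.local << 'EOF'
echo never > /sys/kernel/mm/transparent_hugepage/enabled
echo never > /sys/kernel/mm/transparent_hugepage/defrag
EOF
chmod +x /etc/rc.local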
```

**TiDB参数优化:**

```toml
# tidb.toml
[performance]
# 最大连接数
max-connections = 2000
# 查询内存限制
mem-quota-query = 34359738368 # 32GB
# 事务内存限制
txn-local-latches = 2048000
# 统计信息相关
stats-lease = "3s"
run-auto-analyze = true
[prepared-plan-cache]
# 预处理语句缓存
enabled = true
capacity = 1000
memory-guard-ratio = 0.1
[tikv-client]
# TiKV客户端配置
grpc-connection-count = 4
grpc-keepalive-time = 10
grpc-keepalive-timeout = 3
commit-timeout = "41s"
max-batch-size = 128
```

**TiKV参数优化:**

```toml
# tikv.toml
[server]
# gRPC配置
grpc-concurrency = 8
grpc-raft-conn-num = 10
grpc-stream-initial-window-size = "2MB"
[storage]
# 调度器配置
scheduler-worker-pool-size = 8
scheduler-pending-write-threshold = "100MB"
[raftstore]
# Raft配置
raft-log-gc-tick-interval = "10s"
raft-log-gc-threshold = 50
raft-log-gc-count-limit = 50000
raft-log-gc-size-limit = "72MB"
# Region配置
region-max-size = "144MB"
region-split-size = "96MB"
region-max-keys = 1440000
region-split-keys = 960000
[rocksdb]
# RocksDB配置
max-background-jobs = 12
max-sub-compactions = 3
[rocksdb.defaultcf]
block-size = "64KB"
block-cache-size = "45%"
write-buffer-size = "128MB"
max-write-buffer-number = 5
min-write-buffer-number-to-merge = 1
max-bytes-for-level-base = "512MB"
target-file-size-base = "8MB"
level0-file-num-compaction-trigger = 4
level0-slowdown-writes-trigger = 20
level0-stop-writes-trigger = 36
compaction-pri = 3
dynamic-level-bytes = true
```

### 2. 查询优化

**索引优化:**

```sql
-- 分析表统计信息
ANALYZE TABLE users;
-- 查看索引使用情况
SELECT
table_schema,
table_name,
index_name,
seq_in_index,
column_name,
cardinality
FROM INFORMATION_SCHEMA.STATISTICS
WHERE table_schema = 'mydb'
ORDER BY table_name, index_name, seq_in_index;
-- 查找未使用的索引
SELECT
object_schema,
object_name,
index_name
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE index_name IS NOT NULL
AND count_star = 0
AND object_schema = 'mydb';
-- 创建复合索引
CREATE INDEX idx_user_status_created ON users(status, created_at);
-- 创建表达式索引
CREATE INDEX idx_user_email_lower ON users((LOWER(email)));
```

**查询重写:**

```sql
-- 原始查询(性能较差)
SELECT * FROM orders o
WHERE EXISTS (
SELECT 1 FROM order_items oi
WHERE oi.order_id = o.id
AND oi.product_id = 123
);
-- 优化后的查询
SELECT DISTINCT o.* FROM orders o
INNER JOIN order_items oi ON o.id = oi.order_id
WHERE oi.product_id = 123;
-- 使用LIMIT优化大结果集
SELECT * FROM large_table
WHERE condition = 'value'
ORDER BY id
LIMIT 1000;
-- 使用分页查询
SELECT * FROM large_table
WHERE id > 1000000
ORDER BY id
LIMIT 1000;
```

### 3. 容量规划

**存储容量规划:**

```python
def calculate_storage_requirements(data_size_gb, replica_count=3,
                                   growth_rate=0.2, planning_period_months=12):
    """
    计算存储容量需求

    Args:
        data_size_gb: 当前数据大小(GB)
        replica_count: 副本数量
        growth_rate: 年增长率
        planning_period_months: 规划周期(月)
    """
    # 计算未来数据大小
    future_data_size = data_size_gb * (1 + growth_rate) ** (planning_period_months / 12)

    # 考虑副本和安全余量
    total_storage_needed = future_data_size * replica_count * 1.3  # 30%安全余量

    # 计算每个TiKV节点的存储需求
    tikv_node_count = max(3, replica_count)  # 至少3个节点
    storage_per_node = total_storage_needed / tikv_node_count

    return {
        'current_data_size_gb': data_size_gb,
        'future_data_size_gb': future_data_size,
        'total_storage_needed_gb': total_storage_needed,
        'tikv_node_count': tikv_node_count,
        'storage_per_node_gb': storage_per_node,
        'recommended_disk_size_gb': storage_per_node * 1.2  # 20%额外空间
    }

# 示例计算
result = calculate_storage_requirements(1000, 3, 0.3, 24)
print(f"当前数据: {result['current_data_size_gb']}GB")
print(f"预计数据: {result['future_data_size_gb']:.1f}GB")
print(f"总存储需求: {result['total_storage_needed_gb']:.1f}GB")
print(f"TiKV节点数: {result['tikv_node_count']}")
print(f"每节点存储: {result['storage_per_node_gb']:.1f}GB")
print(f"推荐磁盘大小: {result['recommended_disk_size_gb']:.1f}GB")
```

**计算资源规划:**

```python
def calculate_compute_requirements(peak_qps, avg_query_complexity=1.0,
                                   target_response_time_ms=100):
    """
    计算计算资源需求

    Args:
        peak_qps: 峰值QPS
        avg_query_complexity: 平均查询复杂度(1.0为基准)
        target_response_time_ms: 目标响应时间(毫秒)
    """
    # 计算TiDB节点需求
    # 假设每个TiDB节点可以处理1000 QPS的简单查询
    base_qps_per_tidb = 1000 / avg_query_complexity
    tidb_node_count = max(2, int(peak_qps / base_qps_per_tidb * 1.5))  # 50%安全余量

    # 计算TiKV节点需求
    # 基于存储和计算需求
    tikv_node_count = max(3, int(peak_qps / 2000 * avg_query_complexity))

    # 计算资源配置
    tidb_cpu_cores = max(8, int(peak_qps / tidb_node_count / 100))
    tidb_memory_gb = max(16, tidb_cpu_cores * 2)
    tikv_cpu_cores = max(16, int(peak_qps / tikv_node_count / 50))
    tikv_memory_gb = max(32, tikv_cpu_cores * 2)

    return {
        'peak_qps': peak_qps,
        'tidb_node_count': tidb_node_count,
        'tikv_node_count': tikv_node_count,
        'tidb_cpu_cores': tidb_cpu_cores,
        'tidb_memory_gb': tidb_memory_gb,
        'tikv_cpu_cores': tikv_cpu_cores,
        'tikv_memory_gb': tikv_memory_gb,
        'estimated_monthly_cost': (tidb_node_count * 500 + tikv_node_count * 800)
    }

# 示例计算
result = calculate_compute_requirements(10000, 1.5, 50)
print(f"峰值QPS: {result['peak_qps']}")
print(f"TiDB节点: {result['tidb_node_count']}个")
print(f"TiKV节点: {result['tikv_node_count']}个")
print(f"TiDB配置: {result['tidb_cpu_cores']}核CPU, {result['tidb_memory_gb']}GB内存")
print(f"TiKV配置: {result['tikv_cpu_cores']}核CPU, {result['tikv_memory_gb']}GB内存")
print(f"预估月成本: ${result['estimated_monthly_cost']}")
```

## 安全管理

### 1. 用户权限管理

**创建用户和角色:**

```sql
-- 创建角色
CREATE ROLE 'app_read';
CREATE ROLE 'app_write';
CREATE ROLE 'dba_admin';
-- 为角色分配权限
GRANT SELECT ON mydb.* TO 'app_read';
GRANT SELECT, INSERT, UPDATE, DELETE ON mydb.* TO 'app_write';
GRANT ALL PRIVILEGES ON *.* TO 'dba_admin' WITH GRANT OPTION;
-- 创建用户
CREATE USER 'app_user'@'%' IDENTIFIED BY 'strong_password';
CREATE USER 'readonly_user'@'%' IDENTIFIED BY 'readonly_password';
CREATE USER 'admin_user'@'%' IDENTIFIED BY 'admin_password';
-- 为用户分配角色
GRANT 'app_write' TO 'app_user'@'%';
GRANT 'app_read' TO 'readonly_user'@'%';
GRANT 'dba_admin' TO 'admin_user'@'%';
-- 设置默认角色
SET DEFAULT ROLE 'app_write' TO 'app_user'@'%';
SET DEFAULT ROLE 'app_read' TO 'readonly_user'@'%';
SET DEFAULT ROLE 'dba_admin' TO 'admin_user'@'%';
```

**权限审计:**

```sql
-- 查看用户权限
SHOW GRANTS FOR 'app_user'@'%';
-- 查看所有用户
SELECT user, host, account_locked, password_expired
FROM mysql.user;
-- 查看角色权限
SELECT * FROM mysql.role_edges;
SELECT * FROM mysql.default_roles;
-- 查看权限使用情况
SELECT
user,
host,
db,
command_type,
argument,
mysql_errno,
message
FROM mysql.general_log
WHERE event_time >= NOW() - INTERVAL 1 HOUR
AND command_type IN ('Connect', 'Query', 'Execute')
ORDER BY event_time DESC;
```

### 2. 数据加密

**传输加密(TLS):**

```bash
# 生成CA证书
openssl genrsa -out ca-key.pem 4096
openssl req -new -x509 -days 3650 -key ca-key.pem -out ca.pem
# 生成服务器证书
openssl genrsa -out server-key.pem 4096
openssl req -new -key server-key.pem -out server.csr
openssl x509 -req -days 365 -in server.csr -CA ca.pem -CAkey ca-key.pem -CAcreateserial -out server.pem
# 生成客户端证书
openssl genrsa -out client-key.pem 4096
openssl req -new -key client-key.pem -out client.csr
openssl x509 -req -days 365 -in client.csr -CA ca.pem -CAkey ca-key.pem -CAcreateserial -out client.pem
```

**TiDB TLS配置:**

```toml
# tidb.toml
[security]
# 启用TLS
ssl-cert = "/path/to/server.pem"
ssl-key = "/path/to/server-key.pem"
ssl-ca = "/path/to/ca.pem"
# 强制客户端使用TLS安全连接
require-secure-transport = true
# 集群内部通信加密
cluster-ssl-cert = "/path/to/server.pem"
cluster-ssl-key = "/path/to/server-key.pem"
cluster-ssl-ca = "/path/to/ca.pem"
```

**静态数据加密:**

```toml
# tikv.toml
[security.encryption]
# 启用静态加密
data-encryption-method = "aes256-ctr"
data-key-rotation-period = "7d"
# 主密钥配置
[security.encryption.master-key]
type = "kms"
key-id = "your-kms-key-id"
region = "us-west-2"
endpoint = "https://kms.us-west-2.amazonaws.com"
```

### 3. 网络安全

**防火墙配置:**

```bash
# TiDB节点(4000端口)
firewall-cmd --permanent --add-port=4000/tcp
firewall-cmd --permanent --add-port=10080/tcp # 状态端口
# TiKV节点(20160端口)
firewall-cmd --permanent --add-port=20160/tcp
firewall-cmd --permanent --add-port=20180/tcp # 状态端口
# PD节点(2379, 2380端口)
firewall-cmd --permanent --add-port=2379/tcp
firewall-cmd --permanent --add-port=2380/tcp
# 监控端口
firewall-cmd --permanent --add-port=9090/tcp # Prometheus
firewall-cmd --permanent --add-port=3000/tcp # Grafana
firewall-cmd --permanent --add-port=9093/tcp # Alertmanager
firewall-cmd --reload
```

**网络隔离:**

```yaml
# kubernetes网络策略示例
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: tidb-network-policy
namespace: tidb-cluster
spec:
podSelector:
matchLabels:
app: tidb
policyTypes:
- Ingress
- Egress
ingress:
- from:
- namespaceSelector:
matchLabels:
name: application
ports:
- protocol: TCP
port: 4000
- from:
- namespaceSelector:
matchLabels:
name: monitoring
ports:
- protocol: TCP
port: 10080
egress:
- to:
- podSelector:
matchLabels:
app: tikv
ports:
- protocol: TCP
port: 20160
- to:
- podSelector:
matchLabels:
app: pd
ports:
- protocol: TCP
port: 2379
```

## 数据迁移与同步

### 1. 数据导入

**使用TiDB Lightning进行批量导入:**

```bash
# 安装TiDB Lightning
tiup install tidb-lightning
# 准备配置文件
cat > lightning.toml << EOF
[lightning]
level = "info"
file = "tidb-lightning.log"
[tikv-importer]
backend = "local"
sorted-kv-dir = "/tmp/sorted-kv-dir"
[mydumper]
data-source-dir = "/data/export"
[tidb]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
status-port = 10080
pd-addr = "127.0.0.1:2379"
[post-restore]
checksum = true
analyze = true
EOF
# 执行导入
tiup tidb-lightning -config lightning.toml
```

**使用Dumpling进行数据导出:**

```bash
# 导出整个数据库
tiup dumpling -h 127.0.0.1 -P 4000 -u root -p password -B mydb -o /data/backup/
# 导出特定表
tiup dumpling -h 127.0.0.1 -P 4000 -u root -p password -B mydb -T users,orders -o /data/backup/
# 并行导出
tiup dumpling -h 127.0.0.1 -P 4000 -u root -p password -B mydb -o /data/backup/ -t 8
# 压缩导出
tiup dumpling -h 127.0.0.1 -P 4000 -u root -p password -B mydb -o /data/backup/ --compress gzip
```

### 2. 实时同步

**使用TiCDC进行数据同步:**

```bash
# 启动TiCDC服务
tiup cluster start production-cluster -R cdc  # 需已在集群拓扑中部署cdc节点
# 创建同步任务
tiup ctl:v7.5.0 cdc changefeed create \
--pd=http://127.0.0.1:2379 \
--sink-uri="mysql://user:password@127.0.0.1:3306/" \
--changefeed-id="replication-task-1"
# 查看同步状态
tiup ctl:v7.5.0 cdc changefeed list --pd=http://127.0.0.1:2379
# 暂停同步任务
tiup ctl:v7.5.0 cdc changefeed pause \
--pd=http://127.0.0.1:2379 \
--changefeed-id="replication-task-1"
# 恢复同步任务
tiup ctl:v7.5.0 cdc changefeed resume \
--pd=http://127.0.0.1:2379 \
--changefeed-id="replication-task-1"
```

**同步到Kafka:**

```bash
# 创建Kafka同步任务
tiup ctl:v7.5.0 cdc changefeed create \
--pd=http://127.0.0.1:2379 \
--sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro" \
--changefeed-id="kafka-sync-task" \
--config changefeed.toml
# changefeed.toml配置文件
cat > changefeed.toml << EOF
[filter]
rules = ['*.*', '!test.*']
[sink]
protocol = "avro"
[sink.kafka-config]
partition-num = 3
replication-factor = 1
kafka-version = "2.6.0"
max-message-bytes = 1048576
compression = "gzip"
[sink.schema-registry]
url = "http://127.0.0.1:8081"
EOF
```

### 3. 跨集群迁移

**使用BR进行集群迁移:**

```bash
#!/bin/bash
# cluster_migration.sh - 集群迁移脚本
set -e
SOURCE_PD="127.0.0.1:2379"
TARGET_PD="192.168.1.10:2379"
BACKUP_PATH="s3://migration-backup/cluster-migration"
echo "开始集群迁移: $(date)"
# 1. 备份源集群
echo "=== 备份源集群 ==="
br backup full \
--pd $SOURCE_PD \
--storage "$BACKUP_PATH" \
--ratelimit 200MB \
--concurrency 8
if [ $? -ne 0 ]; then
echo "备份失败,迁移终止"
exit 1
fi
# 2. 验证目标集群
echo "=== 验证目标集群 ==="
tiup cluster check target-cluster
if [ $? -ne 0 ]; then
echo "目标集群检查失败,迁移终止"
exit 1
fi
# 3. 恢复到目标集群
echo "=== 恢复到目标集群 ==="
br restore full \
--pd $TARGET_PD \
--storage "$BACKUP_PATH" \
--concurrency 8
if [ $? -ne 0 ]; then
echo "恢复失败,请检查目标集群状态"
exit 1
fi
# 4. 验证数据一致性
echo "=== 验证数据一致性 ==="
# 这里可以添加数据校验逻辑
mysql -h 127.0.0.1 -P 4000 -u root -e "SELECT COUNT(*) FROM information_schema.tables;"
mysql -h 192.168.1.10 -P 4000 -u root -e "SELECT COUNT(*) FROM information_schema.tables;"
echo "集群迁移完成: $(date)"
```

## 运维最佳实践

### 1. 日常运维检查清单

**每日检查:**

- [ ] 集群节点状态检查
- [ ] 关键监控指标检查
- [ ] 备份任务执行状态
- [ ] 磁盘空间使用率
- [ ] 慢查询分析
- [ ] 告警信息处理

**每周检查:**

- [ ] 性能趋势分析
- [ ] 容量规划评估
- [ ] 安全日志审计
- [ ] 备份恢复测试
- [ ] 文档更新维护

**每月检查:**

- [ ] 集群健康度评估
- [ ] 性能基准测试
- [ ] 灾难恢复演练
- [ ] 版本更新计划
- [ ] 成本优化分析
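可以把上述检查项与下文的自动化脚本结合,通过 crontab 定时执行。下面是一个调度示意(脚本路径为假设值,演练脚本 restore_drill.sh 仅作示例,请按实际部署调整):

```bash
# 每天早上8点执行集群健康检查(脚本见下文"运维自动化脚本")
0 8 * * * /opt/tidb/scripts/health_check.sh >> /var/log/tidb_health.log 2>&1

# 每周一凌晨3点执行备份恢复演练(示例脚本,需自行实现)
0 3 * * 1 /opt/tidb/scripts/restore_drill.sh >> /var/log/tidb_drill.log 2>&1
```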
### 2. 运维自动化脚本

**健康检查脚本:**

```bash
#!/bin/bash
# health_check.sh - 集群健康检查脚本
set -e
CLUSTER_NAME="production-cluster"
REPORT_FILE="/tmp/health_report_$(date +%Y%m%d_%H%M%S).txt"
echo "TiDB集群健康检查报告" > $REPORT_FILE
echo "检查时间: $(date)" >> $REPORT_FILE
echo "集群名称: $CLUSTER_NAME" >> $REPORT_FILE
echo "" >> $REPORT_FILE
# 1. 集群状态检查
echo "=== 集群状态 ===" >> $REPORT_FILE
tiup cluster display $CLUSTER_NAME >> $REPORT_FILE 2>&1
echo "" >> $REPORT_FILE
# 2. 节点健康检查
echo "=== 节点健康检查 ===" >> $REPORT_FILE
tiup cluster check $CLUSTER_NAME >> $REPORT_FILE 2>&1
echo "" >> $REPORT_FILE
# 3. 关键指标检查
echo "=== 关键指标 ===" >> $REPORT_FILE
# TiDB QPS
qps=$(curl -s 'http://prometheus:9090/api/v1/query?query=sum(rate(tidb_server_query_total[5m]))' | jq -r '.data.result[0].value[1]' 2>/dev/null || echo "N/A")
echo "当前QPS: $qps" >> $REPORT_FILE
# TiKV存储使用率
storage_usage=$(curl -s 'http://prometheus:9090/api/v1/query?query=avg(tikv_store_size_bytes/tikv_store_capacity_bytes)*100' | jq -r '.data.result[0].value[1]' 2>/dev/null || echo "N/A")
echo "平均存储使用率: ${storage_usage}%" >> $REPORT_FILE
# 连接数
connections=$(mysql -h 127.0.0.1 -P 4000 -u root -e "SELECT COUNT(*) as connections FROM information_schema.processlist;" -s -N 2>/dev/null || echo "N/A")
echo "当前连接数: $connections" >> $REPORT_FILE
echo "" >> $REPORT_FILE
# 4. 慢查询检查
echo "=== 慢查询TOP5 ===" >> $REPORT_FILE
mysql -h 127.0.0.1 -P 4000 -u root -e "
SELECT
ROUND(query_time, 2) as query_time_sec,
LEFT(query, 100) as query_preview
FROM information_schema.slow_query
WHERE time >= NOW() - INTERVAL 1 HOUR
ORDER BY query_time DESC
LIMIT 5;
" >> $REPORT_FILE 2>&1
echo "" >> $REPORT_FILE
# 5. 磁盘空间检查
echo "=== 磁盘空间 ===" >> $REPORT_FILE
for node in $(tiup cluster display $CLUSTER_NAME | grep -E 'tikv|tidb|pd' | awk '{print $2}' | sort -u); do
echo "节点 $node:" >> $REPORT_FILE
ssh $node "df -h | grep -E '/$|/data'" >> $REPORT_FILE 2>/dev/null || echo " 无法连接" >> $REPORT_FILE
done
echo "" >> $REPORT_FILE
# 6. 生成健康评分
echo "=== 健康评分 ===" >> $REPORT_FILE
# 简单的健康评分逻辑
score=100
# 检查节点状态
down_nodes=$(tiup cluster display $CLUSTER_NAME | grep -c "Down" || echo 0)
if [ $down_nodes -gt 0 ]; then
score=$((score - down_nodes * 20))
echo "发现 $down_nodes 个宕机节点,扣除 $((down_nodes * 20)) 分" >> $REPORT_FILE
fi
# 检查存储使用率
if [ "$storage_usage" != "N/A" ] && [ $(echo "$storage_usage > 80" | bc -l) -eq 1 ]; then
score=$((score - 10))
echo "存储使用率超过80%,扣除10分" >> $REPORT_FILE
fi
# 检查连接数
if [ "$connections" != "N/A" ] && [ $connections -gt 1000 ]; then
score=$((score - 5))
echo "连接数超过1000,扣除5分" >> $REPORT_FILE
fi
echo "最终健康评分: $score/100" >> $REPORT_FILE
# 7. 发送报告
if [ $score -lt 80 ]; then
# 健康评分低于80分,发送告警
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"⚠️ TiDB集群健康评分: $score/100,请检查详细报告\"}" \
$ALERT_WEBHOOK_URL
fi
echo "健康检查完成,报告保存在: $REPORT_FILE"
cat $REPORT_FILE
```

### 3. 监控告警配置

**Alertmanager配置:**

```yaml
# alertmanager.yml
global:
smtp_smarthost: 'smtp.company.com:587'
smtp_from: 'alerts@company.com'
smtp_auth_username: 'alerts@company.com'
smtp_auth_password: 'password'
route:
group_by: ['alertname', 'cluster']
group_wait: 10s
group_interval: 10s
repeat_interval: 1h
receiver: 'web.hook'
routes:
- match:
severity: critical
receiver: 'critical-alerts'
- match:
severity: warning
receiver: 'warning-alerts'
receivers:
- name: 'web.hook'
webhook_configs:
- url: 'http://webhook-server:5000/alerts'
- name: 'critical-alerts'
email_configs:
- to: 'dba-team@company.com'
subject: '🚨 TiDB Critical Alert: {{ .GroupLabels.alertname }}'
body: |
{{ range .Alerts }}
Alert: {{ .Annotations.summary }}
Description: {{ .Annotations.description }}
Instance: {{ .Labels.instance }}
Severity: {{ .Labels.severity }}
{{ end }}
webhook_configs:
- url: 'http://webhook-server:5000/critical'
- name: 'warning-alerts'
email_configs:
- to: 'ops-team@company.com'
subject: '⚠️ TiDB Warning: {{ .GroupLabels.alertname }}'
body: |
{{ range .Alerts }}
Alert: {{ .Annotations.summary }}
Description: {{ .Annotations.description }}
Instance: {{ .Labels.instance }}
{{ end }}
```

## 总结
本章详细介绍了TiDB集群的管理与运维实践,涵盖了以下关键领域:
### 关键要点
集群扩缩容:
- 掌握TiUP工具的使用
- 理解扩缩容的规划和执行流程
- 学会资源需求估算和成本控制
备份恢复:
- 制定完善的备份策略
- 掌握BR工具的使用
- 实现自动化备份和监控
监控告警:
- 配置Prometheus和Grafana监控
- 设置合理的告警规则
- 建立完善的告警处理流程
故障处理:
- 建立标准化的故障响应流程
- 实现自动化故障恢复
- 完善故障记录和分析机制
性能调优:
- 系统参数优化
- 查询和索引优化
- 容量规划和资源配置
安全管理:
- 用户权限控制
- 数据传输和存储加密
- 网络安全配置
数据迁移:
- 批量数据导入导出
- 实时数据同步
- 跨集群迁移
### 最佳实践
运维规范:
- 建立标准化的运维流程
- 制定详细的操作手册
- 定期进行运维培训
自动化运维:
- 实现监控告警自动化
- 开发运维脚本和工具
- 建立CI/CD流水线
容量管理:
- 定期进行容量规划
- 监控资源使用趋势
- 及时进行扩容决策
安全防护:
- 实施最小权限原则
- 定期进行安全审计
- 建立安全事件响应机制
文档管理:
- 维护完整的运维文档
- 记录故障处理经验
- 定期更新操作手册
### 下一步学习建议
深入学习:
- TiDB内核原理和架构
- 分布式系统理论
- 数据库性能调优
实践项目:
- 搭建测试环境进行实验
- 参与开源项目贡献
- 分享运维经验和最佳实践
技能拓展:
- 学习云原生技术
- 掌握容器化部署
- 了解微服务架构
认证考试:
- 参加TiDB认证考试
- 获得相关技术认证
- 提升专业技能水平
通过本章的学习,您应该能够独立完成TiDB集群的日常运维工作,处理常见的故障场景,并建立完善的监控和告警体系。这些技能对于维护生产环境中的TiDB集群至关重要。
```python
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Any, Optional, Tuple, Union
import json
import time
import random
from datetime import datetime, timedelta
class NodeType(Enum):
    """节点类型"""
    TIDB = "tidb"          # TiDB节点
    TIKV = "tikv"          # TiKV节点
    PD = "pd"              # PD节点
    TIFLASH = "tiflash"    # TiFlash节点
    TIPROXY = "tiproxy"    # TiProxy节点
    PUMP = "pump"          # Pump节点
    DRAINER = "drainer"    # Drainer节点

class ScaleOperation(Enum):
    """扩缩容操作类型"""
    SCALE_OUT = "scale_out"    # 扩容
    SCALE_IN = "scale_in"      # 缩容
    REPLACE = "replace"        # 替换
    UPGRADE = "upgrade"        # 升级

class ClusterStatus(Enum):
    """集群状态"""
    HEALTHY = "healthy"        # 健康
    WARNING = "warning"        # 警告
    CRITICAL = "critical"      # 严重
    DOWN = "down"              # 宕机
    SCALING = "scaling"        # 扩缩容中
    UPGRADING = "upgrading"    # 升级中

class BackupType(Enum):
    """备份类型"""
    FULL = "full"                  # 全量备份
    INCREMENTAL = "incremental"    # 增量备份
    LOG = "log"                    # 日志备份
    SNAPSHOT = "snapshot"          # 快照备份

@dataclass
class ClusterNode:
    """集群节点信息"""
    node_id: str
    node_type: NodeType
    host: str
    port: int
    status: str
    version: str
    uptime: str
    cpu_usage: float
    memory_usage: float
    disk_usage: float
    network_io: Dict[str, float]
    last_heartbeat: datetime

@dataclass
class ScaleConfig:
    """扩缩容配置"""
    operation: ScaleOperation
    node_type: NodeType
    target_nodes: List[Dict[str, Any]]
    resource_requirements: Dict[str, Any]
    estimated_duration: int  # 分钟
    rollback_plan: Optional[str]
    validation_steps: List[str]

@dataclass
class BackupConfig:
    """备份配置"""
    backup_type: BackupType
    storage_path: str
    compression: bool
    encryption: bool
    retention_days: int
    schedule: str  # cron表达式
    parallel_threads: int
    bandwidth_limit: str
    exclude_tables: List[str]

@dataclass
class MonitoringAlert:
    """监控告警"""
    alert_id: str
    alert_name: str
    severity: str
    description: str
    metric_name: str
    threshold: float
    current_value: float
    duration: int  # 秒
    labels: Dict[str, str]
    annotations: Dict[str, str]
    created_at: datetime
    resolved_at: Optional[datetime]

@dataclass
class MaintenanceTask:
    """运维任务"""
    task_id: str
    task_name: str
    task_type: str
    description: str
    scheduled_time: datetime
    estimated_duration: int  # 分钟
    impact_level: str
    prerequisites: List[str]
    rollback_plan: str
    assigned_to: str
    status: str

class TiDBClusterManager:
    """TiDB集群管理器"""
def __init__(self, cluster_name: str):
self.cluster_name = cluster_name
self.nodes = self._initialize_cluster_nodes()
self.monitoring_rules = self._initialize_monitoring_rules()
self.backup_configs = self._initialize_backup_configs()
self.maintenance_procedures = self._initialize_maintenance_procedures()
def _initialize_cluster_nodes(self) -> List[ClusterNode]:
"""初始化集群节点"""
nodes = []
# PD节点
for i in range(3):
node = ClusterNode(
node_id=f"pd-{i+1}",
node_type=NodeType.PD,
host=f"10.0.1.{i+10}",
port=2379,
status="Up",
version="v7.5.0",
uptime="15d 8h 30m",
cpu_usage=random.uniform(10, 30),
memory_usage=random.uniform(40, 60),
disk_usage=random.uniform(20, 40),
network_io={"in": random.uniform(10, 50), "out": random.uniform(10, 50)},
last_heartbeat=datetime.now() - timedelta(seconds=random.randint(1, 30))
)
nodes.append(node)
# TiDB节点
for i in range(2):
node = ClusterNode(
node_id=f"tidb-{i+1}",
node_type=NodeType.TIDB,
host=f"10.0.2.{i+10}",
port=4000,
status="Up",
version="v7.5.0",
uptime="15d 8h 30m",
cpu_usage=random.uniform(30, 70),
memory_usage=random.uniform(50, 80),
disk_usage=random.uniform(10, 30),
network_io={"in": random.uniform(50, 200), "out": random.uniform(50, 200)},
last_heartbeat=datetime.now() - timedelta(seconds=random.randint(1, 30))
)
nodes.append(node)
# TiKV节点
for i in range(3):
node = ClusterNode(
node_id=f"tikv-{i+1}",
node_type=NodeType.TIKV,
host=f"10.0.3.{i+10}",
port=20160,
status="Up",
version="v7.5.0",
uptime="15d 8h 30m",
cpu_usage=random.uniform(40, 80),
memory_usage=random.uniform(60, 90),
disk_usage=random.uniform(50, 80),
network_io={"in": random.uniform(100, 500), "out": random.uniform(100, 500)},
last_heartbeat=datetime.now() - timedelta(seconds=random.randint(1, 30))
)
nodes.append(node)
# TiFlash节点
for i in range(2):
node = ClusterNode(
node_id=f"tiflash-{i+1}",
node_type=NodeType.TIFLASH,
host=f"10.0.4.{i+10}",
port=9000,
status="Up",
version="v7.5.0",
uptime="15d 8h 30m",
cpu_usage=random.uniform(20, 60),
memory_usage=random.uniform(40, 70),
disk_usage=random.uniform(30, 60),
network_io={"in": random.uniform(50, 300), "out": random.uniform(50, 300)},
last_heartbeat=datetime.now() - timedelta(seconds=random.randint(1, 30))
)
nodes.append(node)
return nodes
def _initialize_monitoring_rules(self) -> List[Dict[str, Any]]:
"""初始化监控规则"""
return [
{
"name": "TiDB Server Down",
"expr": "up{job=\"tidb\"} == 0",
"for": "1m",
"severity": "critical",
"description": "TiDB server {{ $labels.instance }} is down",
"runbook_url": "https://docs.pingcap.com/tidb/stable/alert-rules#tidb_server_down"
},
{
"name": "TiKV Server Down",
"expr": "up{job=\"tikv\"} == 0",
"for": "1m",
"severity": "critical",
"description": "TiKV server {{ $labels.instance }} is down",
"runbook_url": "https://docs.pingcap.com/tidb/stable/alert-rules#tikv_server_down"
},
{
"name": "PD Server Down",
"expr": "up{job=\"pd\"} == 0",
"for": "1m",
"severity": "critical",
"description": "PD server {{ $labels.instance }} is down",
"runbook_url": "https://docs.pingcap.com/tidb/stable/alert-rules#pd_server_down"
},
{
"name": "TiKV Disk Usage High",
"expr": "(tikv_store_size_bytes / tikv_store_capacity_bytes) > 0.8",
"for": "5m",
"severity": "warning",
"description": "TiKV {{ $labels.instance }} disk usage is above 80%",
"runbook_url": "https://docs.pingcap.com/tidb/stable/alert-rules#tikv_disk_usage_high"
},
{
"name": "TiDB Query Duration High",
"expr": "histogram_quantile(0.99, rate(tidb_server_handle_query_duration_seconds_bucket[5m])) > 1",
"for": "5m",
"severity": "warning",
"description": "TiDB {{ $labels.instance }} 99th percentile query duration is above 1s",
"runbook_url": "https://docs.pingcap.com/tidb/stable/alert-rules#tidb_query_duration_high"
},
{
"name": "TiKV Region Count High",
"expr": "tikv_raftstore_region_count > 20000",
"for": "5m",
"severity": "warning",
"description": "TiKV {{ $labels.instance }} region count is above 20000",
"runbook_url": "https://docs.pingcap.com/tidb/stable/alert-rules#tikv_region_count_high"
}
]
def _initialize_backup_configs(self) -> Dict[str, BackupConfig]:
"""初始化备份配置"""
return {
"daily_full": BackupConfig(
backup_type=BackupType.FULL,
storage_path="s3://tidb-backup/daily/",
compression=True,
encryption=True,
retention_days=30,
schedule="0 2 * * *", # 每天凌晨2点
parallel_threads=8,
bandwidth_limit="100MB/s",
exclude_tables=["test.temp_table", "log.access_log"]
),
"hourly_incremental": BackupConfig(
backup_type=BackupType.INCREMENTAL,
storage_path="s3://tidb-backup/incremental/",
compression=True,
encryption=True,
retention_days=7,
schedule="0 * * * *", # 每小时
parallel_threads=4,
bandwidth_limit="50MB/s",
exclude_tables=[]
),
"log_backup": BackupConfig(
backup_type=BackupType.LOG,
storage_path="s3://tidb-backup/log/",
compression=True,
encryption=True,
retention_days=7,
schedule="continuous",
parallel_threads=2,
bandwidth_limit="20MB/s",
exclude_tables=[]
)
}
def _initialize_maintenance_procedures(self) -> Dict[str, Dict[str, Any]]:
"""初始化运维程序"""
return {
"scale_out_tikv": {
"name": "TiKV节点扩容",
"description": "向集群添加新的TiKV节点",
"steps": [
"准备新服务器并安装操作系统",
"配置网络和防火墙规则",
"安装TiKV软件包",
"生成TiKV配置文件",
"启动TiKV服务",
"验证节点状态",
"等待数据平衡完成",
"验证集群健康状态"
],
"estimated_duration": 120, # 分钟
"rollback_plan": "停止新节点服务,从集群中移除",
"prerequisites": ["确保有足够的网络带宽", "验证存储空间充足", "确认PD集群健康"]
},
"upgrade_cluster": {
"name": "集群版本升级",
"description": "升级TiDB集群到新版本",
"steps": [
"备份集群数据",
"下载新版本软件包",
"升级PD节点(滚动升级)",
"升级TiKV节点(滚动升级)",
"升级TiDB节点(滚动升级)",
"升级TiFlash节点(滚动升级)",
"验证集群功能",
"更新监控配置"
],
"estimated_duration": 240, # 分钟
"rollback_plan": "回滚到之前版本,恢复备份数据",
"prerequisites": ["完成数据备份", "验证新版本兼容性", "准备回滚方案"]
},
"disaster_recovery": {
"name": "灾难恢复",
"description": "从备份恢复集群数据",
"steps": [
"评估故障范围",
"准备新的集群环境",
"恢复PD集群",
"恢复TiKV数据",
"启动TiDB服务",
"验证数据完整性",
"切换应用流量",
"监控系统状态"
],
"estimated_duration": 360, # 分钟
"rollback_plan": "切换到备用集群或降级服务",
"prerequisites": ["确认备份数据可用", "准备恢复环境", "通知相关团队"]
}
}
def get_cluster_status(self) -> Dict[str, Any]:
"""获取集群状态"""
total_nodes = len(self.nodes)
healthy_nodes = len([n for n in self.nodes if n.status == "Up"])
# 按节点类型统计
node_stats = {}
for node_type in NodeType:
type_nodes = [n for n in self.nodes if n.node_type == node_type]
if type_nodes:
node_stats[node_type.value] = {
"total": len(type_nodes),
"healthy": len([n for n in type_nodes if n.status == "Up"]),
"avg_cpu": sum(n.cpu_usage for n in type_nodes) / len(type_nodes),
"avg_memory": sum(n.memory_usage for n in type_nodes) / len(type_nodes),
"avg_disk": sum(n.disk_usage for n in type_nodes) / len(type_nodes)
}
# 确定整体状态
if healthy_nodes == total_nodes:
overall_status = ClusterStatus.HEALTHY
elif healthy_nodes >= total_nodes * 0.8:
overall_status = ClusterStatus.WARNING
elif healthy_nodes >= total_nodes * 0.5:
overall_status = ClusterStatus.CRITICAL
else:
overall_status = ClusterStatus.DOWN
return {
"cluster_name": self.cluster_name,
"overall_status": overall_status.value,
"total_nodes": total_nodes,
"healthy_nodes": healthy_nodes,
"node_statistics": node_stats,
"last_updated": datetime.now().isoformat()
}
def plan_scale_operation(self, operation: ScaleOperation, node_type: NodeType,
target_count: int) -> ScaleConfig:
"""规划扩缩容操作"""
current_nodes = [n for n in self.nodes if n.node_type == node_type]
current_count = len(current_nodes)
if operation == ScaleOperation.SCALE_OUT:
nodes_to_add = target_count - current_count
if nodes_to_add <= 0:
raise ValueError("目标节点数必须大于当前节点数")
target_nodes = []
for i in range(nodes_to_add):
node_config = {
"host": f"10.0.{node_type.value[0]}.{current_count + i + 20}",
"port": self._get_default_port(node_type),
"data_dir": f"/data/{node_type.value}",
"log_dir": f"/logs/{node_type.value}"
}
target_nodes.append(node_config)
estimated_duration = 60 + nodes_to_add * 30 # 基础时间 + 每个节点30分钟
elif operation == ScaleOperation.SCALE_IN:
nodes_to_remove = current_count - target_count
if nodes_to_remove <= 0:
raise ValueError("目标节点数必须小于当前节点数")
# 选择要移除的节点(通常选择最新的节点)
target_nodes = []
for i in range(nodes_to_remove):
node = current_nodes[-(i+1)]
target_nodes.append({
"node_id": node.node_id,
"host": node.host,
"port": node.port
})
estimated_duration = 30 + nodes_to_remove * 45 # 基础时间 + 每个节点45分钟
# 资源需求估算
resource_requirements = self._estimate_resource_requirements(node_type, target_count)
# 验证步骤
validation_steps = [
"检查节点网络连通性",
"验证存储空间充足",
"确认集群状态健康",
"验证负载均衡",
"检查监控指标"
]
return ScaleConfig(
operation=operation,
node_type=node_type,
target_nodes=target_nodes,
resource_requirements=resource_requirements,
estimated_duration=estimated_duration,
rollback_plan=f"回滚{operation.value}操作,恢复到原始节点配置",
validation_steps=validation_steps
)
def _get_default_port(self, node_type: NodeType) -> int:
"""获取节点类型的默认端口"""
port_mapping = {
NodeType.TIDB: 4000,
NodeType.TIKV: 20160,
NodeType.PD: 2379,
NodeType.TIFLASH: 9000,
NodeType.TIPROXY: 6000
}
return port_mapping.get(node_type, 8080)
def _estimate_resource_requirements(self, node_type: NodeType, node_count: int) -> Dict[str, Any]:
"""估算资源需求"""
# 每个节点类型的基础资源需求
base_requirements = {
NodeType.TIDB: {"cpu_cores": 8, "memory_gb": 16, "disk_gb": 100},
NodeType.TIKV: {"cpu_cores": 16, "memory_gb": 32, "disk_gb": 1000},
NodeType.PD: {"cpu_cores": 4, "memory_gb": 8, "disk_gb": 100},
NodeType.TIFLASH: {"cpu_cores": 16, "memory_gb": 64, "disk_gb": 2000}
}
base_req = base_requirements.get(node_type, {"cpu_cores": 4, "memory_gb": 8, "disk_gb": 100})
return {
"total_cpu_cores": base_req["cpu_cores"] * node_count,
"total_memory_gb": base_req["memory_gb"] * node_count,
"total_disk_gb": base_req["disk_gb"] * node_count,
"network_bandwidth_mbps": 1000 * node_count,
"estimated_cost_monthly": self._estimate_monthly_cost(node_type, node_count)
}
def _estimate_monthly_cost(self, node_type: NodeType, node_count: int) -> float:
"""估算月度成本"""
# 简化的成本估算(实际应根据云服务商定价)
cost_per_node = {
NodeType.TIDB: 500.0,
NodeType.TIKV: 800.0,
NodeType.PD: 300.0,
NodeType.TIFLASH: 1000.0
}
base_cost = cost_per_node.get(node_type, 400.0)
return base_cost * node_count
def generate_scale_commands(self, scale_config: ScaleConfig) -> List[str]:
"""生成扩缩容命令"""
commands = []
if scale_config.operation == ScaleOperation.SCALE_OUT:
# 扩容命令
commands.append("# TiDB集群扩容操作")
commands.append(f"# 扩容{scale_config.node_type.value}节点")
commands.append("")
# 生成配置文件
commands.append("# 1. 生成扩容配置文件")
commands.append(f"cat > scale-out-{scale_config.node_type.value}.yaml << EOF")
if scale_config.node_type == NodeType.TIKV:
for i, node in enumerate(scale_config.target_nodes):
commands.append(f"tikv_servers:")
commands.append(f" - host: {node['host']}")
commands.append(f" port: {node['port']}")
commands.append(f" status_port: {node['port'] + 1}")
commands.append(f" data_dir: {node['data_dir']}")
commands.append(f" log_dir: {node['log_dir']}")
commands.append("")
elif scale_config.node_type == NodeType.TIDB:
for i, node in enumerate(scale_config.target_nodes):
commands.append(f"tidb_servers:")
commands.append(f" - host: {node['host']}")
commands.append(f" port: {node['port']}")
commands.append(f" status_port: {node['port'] + 80}")
commands.append(f" log_dir: {node['log_dir']}")
commands.append("")
commands.append("EOF")
commands.append("")
# 执行扩容
commands.append("# 2. 执行扩容操作")
commands.append(f"tiup cluster scale-out {self.cluster_name} scale-out-{scale_config.node_type.value}.yaml")
commands.append("")
# 验证扩容结果
commands.append("# 3. 验证扩容结果")
commands.append(f"tiup cluster display {self.cluster_name}")
commands.append(f"tiup cluster check {self.cluster_name}")
elif scale_config.operation == ScaleOperation.SCALE_IN:
# 缩容命令
commands.append("# TiDB集群缩容操作")
commands.append(f"# 缩容{scale_config.node_type.value}节点")
commands.append("")
# 生成节点列表
node_list = ",".join([f"{node['host']}:{node['port']}" for node in scale_config.target_nodes])
commands.append("# 1. 执行缩容操作")
commands.append(f"tiup cluster scale-in {self.cluster_name} --node {node_list}")
commands.append("")
# 验证缩容结果
commands.append("# 2. 验证缩容结果")
commands.append(f"tiup cluster display {self.cluster_name}")
commands.append(f"tiup cluster check {self.cluster_name}")
return commands
def create_backup_plan(self, backup_name: str) -> Dict[str, Any]:
"""创建备份计划"""
config = self.backup_configs.get(backup_name)
if not config:
raise ValueError(f"未找到备份配置: {backup_name}")
# 生成备份脚本
backup_script = self._generate_backup_script(config)
# 生成恢复脚本
restore_script = self._generate_restore_script(config)
# 估算备份大小和时间
estimated_size_gb = random.uniform(50, 500) # 模拟估算
estimated_duration_minutes = int(estimated_size_gb / 10) # 假设10GB/分钟
return {
"backup_name": backup_name,
"backup_type": config.backup_type.value,
"schedule": config.schedule,
"storage_path": config.storage_path,
"estimated_size_gb": estimated_size_gb,
"estimated_duration_minutes": estimated_duration_minutes,
"retention_days": config.retention_days,
"backup_script": backup_script,
"restore_script": restore_script,
"monitoring_queries": self._generate_backup_monitoring_queries()
}
def _generate_backup_script(self, config: BackupConfig) -> List[str]:
"""生成备份脚本"""
script = []
if config.backup_type == BackupType.FULL:
script.extend([
"#!/bin/bash",
"# TiDB全量备份脚本",
"",
"set -e",
"",
"# 配置变量",
f"BACKUP_PATH='{config.storage_path}'",
f"PARALLEL_THREADS={config.parallel_threads}",
f"BANDWIDTH_LIMIT='{config.bandwidth_limit}'",
"BACKUP_DATE=$(date +%Y%m%d_%H%M%S)",
"",
"# 执行全量备份",
"br backup full \\",
" --pd 127.0.0.1:2379 \\",
f" --storage '${{BACKUP_PATH}}/full_${{BACKUP_DATE}}' \\",
f" --ratelimit {config.bandwidth_limit} \\",
f" --concurrency {config.parallel_threads}"
])
if config.compression:
script.append(" --compression lz4")
if config.exclude_tables:
exclude_filter = "|".join(config.exclude_tables)
script.append(f" --filter '!{exclude_filter}'")
elif config.backup_type == BackupType.LOG:
script.extend([
"#!/bin/bash",
"# TiDB日志备份脚本",
"",
"set -e",
"",
"# 启动日志备份",
"br log start \\",
" --pd 127.0.0.1:2379 \\",
f" --storage '{config.storage_path}' \\",
" --task-name log-backup"
])
script.extend([
"",
"# 检查备份状态",
"if [ $? -eq 0 ]; then",
" echo \"备份成功完成: $(date)\"",
" # 发送成功通知",
" curl -X POST -H 'Content-type: application/json' \\",
" --data '{\"text\":\"TiDB备份成功完成\"}' \\",
" $WEBHOOK_URL",
"else",
" echo \"备份失败: $(date)\"",
" # 发送失败通知",
" curl -X POST -H 'Content-type: application/json' \\",
" --data '{\"text\":\"TiDB备份失败,请检查日志\"}' \\",
" $WEBHOOK_URL",
" exit 1",
"fi"
])
return script
def _generate_restore_script(self, config: BackupConfig) -> List[str]:
"""生成恢复脚本"""
script = [
"#!/bin/bash",
"# TiDB数据恢复脚本",
"",
"set -e",
"",
"# 检查参数",
"if [ $# -ne 1 ]; then",
" echo \"Usage: $0 <backup_path>\"",
" exit 1",
"fi",
"",
"BACKUP_PATH=$1",
f"PARALLEL_THREADS={config.parallel_threads}",
"",
"# 确认恢复操作",
"echo \"警告: 此操作将覆盖现有数据!\"",
"read -p \"确认继续? (yes/no): \" confirm",
"if [ \"$confirm\" != \"yes\" ]; then",
" echo \"操作已取消\"",
" exit 1",
"fi",
"",
"# 执行恢复",
"echo \"开始恢复数据: $(date)\"",
"br restore full \\",
" --pd 127.0.0.1:2379 \\",
" --storage \"$BACKUP_PATH\" \\",
f" --concurrency {config.parallel_threads}",
"",
"# 检查恢复状态",
"if [ $? -eq 0 ]; then",
" echo \"恢复成功完成: $(date)\"",
"else",
" echo \"恢复失败: $(date)\"",
" exit 1",
"fi"
]
return script
def _generate_backup_monitoring_queries(self) -> List[Dict[str, str]]:
"""生成备份监控查询"""
return [
{
"name": "备份任务状态",
"query": "SELECT task_name, status, start_time, end_time, backup_size FROM mysql.backup_history ORDER BY start_time DESC LIMIT 10;"
},
{
"name": "备份存储使用情况",
"query": "SELECT backup_type, COUNT(*) as backup_count, SUM(backup_size) as total_size FROM mysql.backup_history WHERE start_time >= DATE_SUB(NOW(), INTERVAL 30 DAY) GROUP BY backup_type;"
},
{
"name": "备份成功率",
"query": "SELECT DATE(start_time) as backup_date, COUNT(*) as total_backups, SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_backups, ROUND(SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) as success_rate FROM mysql.backup_history WHERE start_time >= DATE_SUB(NOW(), INTERVAL 7 DAY) GROUP BY DATE(start_time) ORDER BY backup_date;"
}
]
```

**TiDB集群管理演示**

```python
print("\n\n=== TiDB集群管理与运维 ===")

cluster_manager = TiDBClusterManager("production-cluster")

print("\n1. 集群状态概览:")
cluster_status = cluster_manager.get_cluster_status()
print(f"  集群名称: {cluster_status['cluster_name']}")
print(f"  整体状态: {cluster_status['overall_status']}")
print(f"  节点总数: {cluster_status['total_nodes']}")
print(f"  健康节点: {cluster_status['healthy_nodes']}")
print(f"  最后更新: {cluster_status['last_updated']}")

print("\n  节点统计:")
for node_type, stats in cluster_status['node_statistics'].items():
    print(f"    {node_type.upper()}:")
    print(f"      总数: {stats['total']}, 健康: {stats['healthy']}")
    print(f"      平均CPU: {stats['avg_cpu']:.1f}%, 内存: {stats['avg_memory']:.1f}%, 磁盘: {stats['avg_disk']:.1f}%")

print("\n2. 扩容计划:")
scale_scenarios = [
    (ScaleOperation.SCALE_OUT, NodeType.TIKV, 5),
    (ScaleOperation.SCALE_OUT, NodeType.TIDB, 4),
    (ScaleOperation.SCALE_IN, NodeType.TIFLASH, 1)
]

for operation, node_type, target_count in scale_scenarios:
    try:
        scale_config = cluster_manager.plan_scale_operation(operation, node_type, target_count)
        print(f"\n  {operation.value} {node_type.value}节点到{target_count}个:")
        print(f"    操作类型: {scale_config.operation.value}")
        print(f"    目标节点数: {len(scale_config.target_nodes)}")
        print(f"    预计耗时: {scale_config.estimated_duration}分钟")
        print(f"    资源需求:")
        req = scale_config.resource_requirements
        print(f"      CPU核心: {req['total_cpu_cores']}")
        print(f"      内存: {req['total_memory_gb']}GB")
        print(f"      存储: {req['total_disk_gb']}GB")
        print(f"      预估月成本: ${req['estimated_cost_monthly']:.2f}")
    except ValueError as e:
        print(f"\n  {operation.value} {node_type.value}节点: {e}")

print("\n3. 扩容命令生成:")
try:
    tikv_scale_config = cluster_manager.plan_scale_operation(ScaleOperation.SCALE_OUT, NodeType.TIKV, 5)
    commands = cluster_manager.generate_scale_commands(tikv_scale_config)
    print(f"\n  TiKV扩容命令 ({len(commands)}行):")
    for i, cmd in enumerate(commands[:15], 1):
        print(f"    {i:2d}. {cmd}")
    if len(commands) > 15:
        print(f"    ... (还有{len(commands)-15}行)")
except ValueError as e:
    print(f"  扩容命令生成失败: {e}")

print("\n4. 备份计划:")
for backup_name in ['daily_full', 'hourly_incremental', 'log_backup']:
    backup_plan = cluster_manager.create_backup_plan(backup_name)
    print(f"\n  {backup_plan['backup_name']}:")
    print(f"    类型: {backup_plan['backup_type']}")
    print(f"    调度: {backup_plan['schedule']}")
    print(f"    存储路径: {backup_plan['storage_path']}")
    print(f"    预估大小: {backup_plan['estimated_size_gb']:.1f}GB")
    print(f"    预估耗时: {backup_plan['estimated_duration_minutes']}分钟")
    print(f"    保留天数: {backup_plan['retention_days']}天")
    print(f"    备份脚本: {len(backup_plan['backup_script'])}行")
    print(f"    恢复脚本: {len(backup_plan['restore_script'])}行")

print("\n5. 监控规则:")
for i, rule in enumerate(cluster_manager.monitoring_rules[:5], 1):
    print(f"\n  规则{i}: {rule['name']}")
    print(f"    表达式: {rule['expr']}")
    print(f"    持续时间: {rule['for']}")
    print(f"    严重程度: {rule['severity']}")
    print(f"    描述: {rule['description']}")

print("\n6. 运维程序:")
for proc_name, proc_info in cluster_manager.maintenance_procedures.items():
    print(f"\n  {proc_info['name']}:")
    print(f"    描述: {proc_info['description']}")
    print(f"    预计耗时: {proc_info['estimated_duration']}分钟")
    print(f"    步骤数: {len(proc_info['steps'])}")
    print(f"    前置条件: {len(proc_info['prerequisites'])}项")
    print("\n  主要步骤:")
    for i, step in enumerate(proc_info['steps'][:5], 1):
        print(f"    {i}. {step}")
    if len(proc_info['steps']) > 5:
        print(f"    ... (还有{len(proc_info['steps'])-5}个步骤)")
```
## 数据备份与恢复
### 1. 备份策略设计
**全量备份:**
```bash
# 使用BR工具进行全量备份
br backup full \
--pd 127.0.0.1:2379 \
--storage 's3://backup-bucket/full-backup-20231201' \
--ratelimit 128 \
--concurrency 8 \
--compression lz4
# 备份特定数据库
br backup db \
--pd 127.0.0.1:2379 \
--db mydb \
--storage 's3://backup-bucket/db-backup-20231201'
# 备份特定表
br backup table \
--pd 127.0.0.1:2379 \
--db mydb \
--table users \
--storage 's3://backup-bucket/table-backup-20231201'
```

**增量备份:**

```bash
# 启动日志备份(增量备份基础)
br log start \
--pd 127.0.0.1:2379 \
--storage 's3://backup-bucket/log-backup' \
--task-name log-backup-task
# 查看日志备份状态
br log status \
--pd 127.0.0.1:2379 \
--task-name log-backup-task
# 停止日志备份
br log stop \
--pd 127.0.0.1:2379 \
--task-name log-backup-task
```

### 2. 数据恢复

**全量恢复:**

```bash
# 恢复全量备份
br restore full \
--pd 127.0.0.1:2379 \
--storage 's3://backup-bucket/full-backup-20231201' \
--concurrency 8
# 恢复到指定数据库
br restore db \
--pd 127.0.0.1:2379 \
--db mydb_restored \
--storage 's3://backup-bucket/db-backup-20231201'
```

**时间点恢复(PITR):**

```bash
# 恢复到指定时间点
br restore point \
--pd 127.0.0.1:2379 \
--storage 's3://backup-bucket/log-backup' \
--full-backup-storage 's3://backup-bucket/full-backup-20231201' \
--restored-ts '2023-12-01 15:30:00'
```

### 3. 备份自动化

**Crontab配置:**

```bash
# 编辑crontab
crontab -e
# 添加备份任务
# 每天凌晨2点执行全量备份
0 2 * * * /opt/tidb/scripts/full_backup.sh >> /var/log/tidb_backup.log 2>&1
# 每小时执行一次备份检查
0 * * * * /opt/tidb/scripts/check_log_backup.sh >> /var/log/tidb_backup.log 2>&1
```

**备份脚本示例:**

```bash
#!/bin/bash
# full_backup.sh - TiDB全量备份脚本
set -e
# 配置变量
BACKUP_DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_PATH="s3://tidb-backup/full_${BACKUP_DATE}"
PD_ENDPOINTS="127.0.0.1:2379"
CONCURRENCY=8
RATELIMIT="100MB"
# 备份前检查
echo "开始备份检查: $(date)"
tiup cluster check production-cluster
# 执行备份
echo "开始全量备份: $(date)"
br backup full \
--pd $PD_ENDPOINTS \
--storage "$BACKUP_PATH" \
--ratelimit $RATELIMIT \
--concurrency $CONCURRENCY \
--compression lz4
if [ $? -eq 0 ]; then
echo "备份成功完成: $(date)"
# 清理旧备份(保留30天)
aws s3 ls s3://tidb-backup/ | grep full_ | \
while read -r line; do
backup_date=$(echo $line | awk '{print $4}' | sed 's/full_//' | sed 's/_.*$//')
if [ $(date -d "$backup_date" +%s) -lt $(date -d "30 days ago" +%s) ]; then
backup_dir=$(echo $line | awk '{print $4}')
echo "删除过期备份: $backup_dir"
aws s3 rm s3://tidb-backup/$backup_dir --recursive
fi
done
# 发送成功通知
curl -X POST -H 'Content-type: application/json' \
--data '{"text":"TiDB全量备份成功完成"}' \
$SLACK_WEBHOOK_URL
else
echo "备份失败: $(date)"
# 发送失败通知
curl -X POST -H 'Content-type: application/json' \
--data '{"text":"TiDB全量备份失败,请检查日志"}' \
$SLACK_WEBHOOK_URL
exit 1
fi
```