高可用性是现代数据库系统的核心要求之一。本章将深入探讨MySQL高可用架构的设计原理、实现方案和最佳实践,帮助您构建稳定可靠的数据库服务。
13.1 高可用架构概述
13.1.1 高可用性基本概念
高可用性(High Availability,HA)是指系统在长时间运行过程中,能够持续提供服务的能力。对于数据库系统,高可用性通常包括以下几个方面:
1. 可用性指标
- RTO(Recovery Time Objective): 恢复时间目标,系统故障后恢复服务的最大允许时间
- RPO(Recovery Point Objective): 恢复点目标,系统故障时允许丢失的最大数据量
- MTBF(Mean Time Between Failures): 平均故障间隔时间
- MTTR(Mean Time To Repair): 平均修复时间
2. 可用性等级
可用性等级 年停机时间 月停机时间 周停机时间
99% 3.65天 7.20小时 1.68小时
99.9% 8.76小时 43.2分钟 10.1分钟
99.99% 52.56分钟 4.32分钟 1.01分钟
99.999% 5.26分钟 25.9秒 6.05秒
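为了更直观地理解这些指标之间的关系,下面给出一个简单的换算示意脚本(仅为演示计算方法,其中的函数命名与MTBF/MTTR取值均为本文虚构的示例):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""可用性指标换算示例:由可用性百分比估算停机时间,由MTBF/MTTR估算可用性"""

HOURS_PER_YEAR = 365 * 24  # 按365天估算一年的小时数

def annual_downtime_hours(availability_percent: float) -> float:
    """根据可用性百分比估算每年允许的停机小时数"""
    return HOURS_PER_YEAR * (1 - availability_percent / 100)

def availability_from_mtbf_mttr(mtbf_hours: float, mttr_hours: float) -> float:
    """可用性 = MTBF / (MTBF + MTTR)"""
    return mtbf_hours / (mtbf_hours + mttr_hours) * 100

if __name__ == '__main__':
    for a in (99.0, 99.9, 99.99, 99.999):
        print(f"{a}% 可用性 ≈ 每年停机 {annual_downtime_hours(a):.2f} 小时")
    # 假设平均每720小时发生一次故障,每次修复耗时0.5小时(示例数值)
    print(f"MTBF=720h, MTTR=0.5h 时可用性约为 {availability_from_mtbf_mttr(720, 0.5):.3f}%")
可以看到,可用性每多一个"9",每年允许的停机时间大约缩短一个数量级。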
13.1.2 MySQL高可用架构类型
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL高可用架构分析工具
分析不同高可用方案的特点和适用场景
"""
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
import json
class HAType(Enum):
"""高可用架构类型"""
MASTER_SLAVE = "master_slave"
MASTER_MASTER = "master_master"
CLUSTER = "cluster"
PROXY = "proxy"
CLOUD = "cloud"
class FailoverType(Enum):
"""故障转移类型"""
MANUAL = "manual"
AUTOMATIC = "automatic"
SEMI_AUTOMATIC = "semi_automatic"
@dataclass
class HAArchitecture:
"""高可用架构配置"""
name: str
ha_type: HAType
failover_type: FailoverType
rto_seconds: int # 恢复时间目标(秒)
rpo_seconds: int # 恢复点目标(秒)
availability: float # 可用性百分比
complexity: str # 复杂度:low, medium, high
cost: str # 成本:low, medium, high
data_consistency: str # 数据一致性:eventual, strong
scalability: str # 扩展性:low, medium, high
description: str
pros: List[str]
cons: List[str]
use_cases: List[str]
class MySQLHAAnalyzer:
"""MySQL高可用架构分析器"""
def __init__(self):
self.architectures = self._initialize_architectures()
def _initialize_architectures(self) -> List[HAArchitecture]:
"""初始化高可用架构配置"""
return [
HAArchitecture(
name="主从复制(Master-Slave)",
ha_type=HAType.MASTER_SLAVE,
failover_type=FailoverType.MANUAL,
rto_seconds=300, # 5分钟
rpo_seconds=60, # 1分钟
availability=99.9,
complexity="low",
cost="low",
data_consistency="eventual",
scalability="medium",
description="一个主库处理写操作,一个或多个从库处理读操作",
pros=[
"配置简单,易于理解和维护",
"读写分离,提高查询性能",
"成本较低",
"支持数据备份和报表查询"
],
cons=[
"故障转移需要手动操作",
"存在单点故障风险",
"主从延迟可能影响数据一致性",
"写操作无法扩展"
],
use_cases=[
"读多写少的应用",
"对一致性要求不高的场景",
"预算有限的项目",
"数据分析和报表系统"
]
),
HAArchitecture(
name="主主复制(Master-Master)",
ha_type=HAType.MASTER_MASTER,
failover_type=FailoverType.SEMI_AUTOMATIC,
rto_seconds=60, # 1分钟
rpo_seconds=30, # 30秒
availability=99.95,
complexity="medium",
cost="medium",
data_consistency="eventual",
scalability="medium",
description="两个MySQL实例互为主从,都可以处理写操作",
pros=[
"消除单点故障",
"故障转移时间短",
"支持读写负载均衡",
"地理分布式部署"
],
cons=[
"数据冲突风险",
"配置复杂度增加",
"需要应用层协调",
"网络分区问题"
],
use_cases=[
"需要快速故障转移的应用",
"地理分布式系统",
"中等规模的Web应用",
"对RTO要求较高的场景"
]
),
HAArchitecture(
name="MySQL集群(MySQL Cluster/NDB)",
ha_type=HAType.CLUSTER,
failover_type=FailoverType.AUTOMATIC,
rto_seconds=10, # 10秒
rpo_seconds=0, # 无数据丢失
availability=99.99,
complexity="high",
cost="high",
data_consistency="strong",
scalability="high",
description="基于NDB存储引擎的分布式集群架构",
pros=[
"真正的无单点故障",
"自动故障检测和恢复",
"强一致性保证",
"水平扩展能力强",
"内存数据库性能"
],
cons=[
"配置和维护复杂",
"硬件成本高",
"学习曲线陡峭",
"功能限制(不支持外键等)",
"网络要求高"
],
use_cases=[
"电信级应用",
"金融交易系统",
"实时数据处理",
"对可用性要求极高的场景"
]
),
HAArchitecture(
name="代理层高可用(ProxySQL + MHA)",
ha_type=HAType.PROXY,
failover_type=FailoverType.AUTOMATIC,
rto_seconds=30, # 30秒
rpo_seconds=10, # 10秒
availability=99.95,
complexity="medium",
cost="medium",
data_consistency="eventual",
scalability="high",
description="通过代理层实现连接管理和自动故障转移",
pros=[
"自动故障检测和转移",
"连接池管理",
"读写分离透明化",
"负载均衡",
"应用无感知切换"
],
cons=[
"增加架构复杂度",
"代理层成为潜在瓶颈",
"需要额外的监控",
"故障排查复杂"
],
use_cases=[
"微服务架构",
"高并发Web应用",
"需要透明故障转移的系统",
"多租户应用"
]
),
HAArchitecture(
name="云数据库服务(RDS/Aurora)",
ha_type=HAType.CLOUD,
failover_type=FailoverType.AUTOMATIC,
rto_seconds=60, # 1分钟
rpo_seconds=5, # 5秒
availability=99.99,
complexity="low",
cost="high",
data_consistency="strong",
scalability="high",
description="云服务商提供的托管数据库服务",
pros=[
"完全托管,无需运维",
"自动备份和恢复",
"弹性扩展",
"多可用区部署",
"监控和告警完善"
],
cons=[
"成本较高",
"厂商锁定",
"定制化限制",
"数据迁移复杂"
],
use_cases=[
"初创公司",
"快速原型开发",
"缺乏DBA资源的团队",
"云原生应用"
]
)
]
def get_architecture_by_type(self, ha_type: HAType) -> Optional[HAArchitecture]:
"""根据类型获取架构配置"""
for arch in self.architectures:
if arch.ha_type == ha_type:
return arch
return None
def recommend_architecture(self, requirements: Dict) -> List[HAArchitecture]:
"""根据需求推荐架构"""
recommendations = []
max_rto = requirements.get('max_rto_seconds', 300)
max_rpo = requirements.get('max_rpo_seconds', 60)
min_availability = requirements.get('min_availability', 99.9)
max_complexity = requirements.get('max_complexity', 'high')
max_cost = requirements.get('max_cost', 'high')
complexity_levels = {'low': 1, 'medium': 2, 'high': 3}
cost_levels = {'low': 1, 'medium': 2, 'high': 3}
for arch in self.architectures:
if (arch.rto_seconds <= max_rto and
arch.rpo_seconds <= max_rpo and
arch.availability >= min_availability and
complexity_levels[arch.complexity] <= complexity_levels[max_complexity] and
cost_levels[arch.cost] <= cost_levels[max_cost]):
recommendations.append(arch)
# 按可用性和RTO排序
recommendations.sort(key=lambda x: (-x.availability, x.rto_seconds))
return recommendations
def compare_architectures(self, arch_names: List[str]) -> Dict:
"""比较不同架构"""
comparison = {
'architectures': [],
'comparison_matrix': {}
}
selected_archs = []
for name in arch_names:
for arch in self.architectures:
if arch.name == name:
selected_archs.append(arch)
break
if not selected_archs:
return comparison
comparison['architectures'] = [arch.name for arch in selected_archs]
# 创建比较矩阵
metrics = ['availability', 'rto_seconds', 'rpo_seconds', 'complexity', 'cost', 'scalability']
for metric in metrics:
comparison['comparison_matrix'][metric] = []
for arch in selected_archs:
comparison['comparison_matrix'][metric].append(getattr(arch, metric))
return comparison
def generate_architecture_report(self, arch_name: str) -> Dict:
"""生成架构详细报告"""
arch = None
for a in self.architectures:
if a.name == arch_name:
arch = a
break
if not arch:
return {}
return {
'name': arch.name,
'type': arch.ha_type.value,
'overview': {
'description': arch.description,
'availability': f"{arch.availability}%",
'rto': f"{arch.rto_seconds}秒",
'rpo': f"{arch.rpo_seconds}秒",
'complexity': arch.complexity,
'cost': arch.cost,
'scalability': arch.scalability,
'data_consistency': arch.data_consistency
},
'advantages': arch.pros,
'disadvantages': arch.cons,
'use_cases': arch.use_cases,
'implementation_considerations': self._get_implementation_considerations(arch)
}
def _get_implementation_considerations(self, arch: HAArchitecture) -> List[str]:
"""获取实施注意事项"""
considerations = {
HAType.MASTER_SLAVE: [
"配置主从复制参数",
"监控复制延迟",
"准备故障转移脚本",
"定期测试故障转移流程"
],
HAType.MASTER_MASTER: [
"避免自增ID冲突",
"配置双向复制",
"处理数据冲突策略",
"网络分区检测"
],
HAType.CLUSTER: [
"规划集群节点数量",
"配置数据分片",
"优化网络连接",
"监控集群状态"
],
HAType.PROXY: [
"配置代理规则",
"设置健康检查",
"优化连接池",
"监控代理性能"
],
HAType.CLOUD: [
"选择合适的实例类型",
"配置多可用区",
"设置自动备份",
"监控成本和性能"
]
}
return considerations.get(arch.ha_type, [])
def export_analysis(self, output_file: str = None):
"""导出分析结果"""
if not output_file:
output_file = "mysql_ha_analysis.json"
analysis_data = {
'architectures': [],
'summary': {
'total_architectures': len(self.architectures),
'availability_range': {
'min': min(arch.availability for arch in self.architectures),
'max': max(arch.availability for arch in self.architectures)
},
'rto_range': {
'min': min(arch.rto_seconds for arch in self.architectures),
'max': max(arch.rto_seconds for arch in self.architectures)
}
}
}
for arch in self.architectures:
arch_data = {
'name': arch.name,
'type': arch.ha_type.value,
'metrics': {
'availability': arch.availability,
'rto_seconds': arch.rto_seconds,
'rpo_seconds': arch.rpo_seconds,
'complexity': arch.complexity,
'cost': arch.cost,
'scalability': arch.scalability
},
'description': arch.description,
'pros': arch.pros,
'cons': arch.cons,
'use_cases': arch.use_cases
}
analysis_data['architectures'].append(arch_data)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(analysis_data, f, ensure_ascii=False, indent=2)
print(f"分析结果已导出到: {output_file}")
# 使用示例
if __name__ == '__main__':
analyzer = MySQLHAAnalyzer()
# 根据需求推荐架构
requirements = {
'max_rto_seconds': 60,
'max_rpo_seconds': 30,
'min_availability': 99.95,
'max_complexity': 'medium',
'max_cost': 'medium'
}
recommendations = analyzer.recommend_architecture(requirements)
print("推荐的高可用架构:")
for i, arch in enumerate(recommendations, 1):
print(f"{i}. {arch.name} (可用性: {arch.availability}%, RTO: {arch.rto_seconds}s)")
# 比较不同架构
comparison = analyzer.compare_architectures([
"主从复制(Master-Slave)",
"主主复制(Master-Master)",
"代理层高可用(ProxySQL + MHA)"
])
print("\n架构比较:")
print(json.dumps(comparison, ensure_ascii=False, indent=2))
# 生成详细报告
report = analyzer.generate_architecture_report("MySQL集群(MySQL Cluster/NDB)")
print("\nMySQL集群架构报告:")
print(json.dumps(report, ensure_ascii=False, indent=2))
# 导出分析结果
analyzer.export_analysis()
13.2 主从复制架构
13.2.1 主从复制原理
MySQL主从复制是最基础的高可用方案,通过二进制日志(binlog)实现数据同步。
复制过程
1. 主库记录变更: 主库将所有数据变更记录到二进制日志
2. 从库请求日志: 从库的I/O线程连接主库,请求二进制日志
3. 传输日志: 主库的Binlog Dump线程将二进制日志发送给从库
4. 写入中继日志: 从库将接收到的日志写入中继日志(relay log)
5. 应用变更: 从库的SQL线程读取中继日志并执行SQL语句
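结合上述复制流程,可以在从库上执行 SHOW SLAVE STATUS 观察 I/O 线程、中继日志与 SQL 线程的实际状态。下面是一个简化的查看示例(基于本章后文也会用到的 pymysql,其中的连接地址与账号均为示例值,需替换为实际环境配置):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""查看从库复制线程状态的简化示例,对应上文的 I/O 线程、中继日志与 SQL 线程"""
import pymysql

# 示例连接参数,请替换为实际从库地址和监控账号
conn = pymysql.connect(host='192.168.1.101', port=3306,
                       user='monitor', password='monitor_pass',
                       cursorclass=pymysql.cursors.DictCursor)
try:
    with conn.cursor() as cursor:
        cursor.execute("SHOW SLAVE STATUS")
        status = cursor.fetchone()
        if status:
            # I/O线程:负责从主库拉取binlog并写入中继日志
            print("Slave_IO_Running :", status['Slave_IO_Running'])
            # 当前读取到的主库binlog位置
            print("Master_Log_File  :", status['Master_Log_File'],
                  status['Read_Master_Log_Pos'])
            # SQL线程:负责回放中继日志中的事件
            print("Slave_SQL_Running:", status['Slave_SQL_Running'])
            print("Relay_Log_File   :", status['Relay_Log_File'])
            # 复制延迟(秒)
            print("Seconds_Behind_Master:", status['Seconds_Behind_Master'])
        else:
            print("该实例当前不是从库(没有复制配置)")
finally:
    conn.close()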
13.2.2 主从复制配置
#!/bin/bash
# MySQL主从复制自动配置脚本
# 支持一主多从、GTID复制、半同步复制等
set -euo pipefail
# 配置变量
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
LOG_FILE="/var/log/mysql_replication_setup.log"
CONFIG_DIR="/etc/mysql/conf.d"
DATA_DIR="/var/lib/mysql"
# 复制配置
MASTER_HOST=""
MASTER_PORT="3306"
MASTER_USER="root"
MASTER_PASSWORD=""
SLAVE_HOST=""
SLAVE_PORT="3306"
SLAVE_USER="root"
SLAVE_PASSWORD=""
REPL_USER="repl_user"
REPL_PASSWORD="repl_password"
USE_GTID="true"
USE_SEMI_SYNC="false"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
# 错误处理
error_exit() {
log "错误: $1"
exit 1
}
# 检查MySQL服务
check_mysql_service() {
local host=$1
local port=$2
local user=$3
local password=$4
if ! mysql -h"$host" -P"$port" -u"$user" -p"$password" -e "SELECT 1" >/dev/null 2>&1; then
error_exit "无法连接到MySQL服务器 $host:$port"
fi
log "MySQL服务器 $host:$port 连接正常"
}
# 生成MySQL配置文件
generate_mysql_config() {
local role=$1 # master 或 slave
local server_id=$2
local config_file="$CONFIG_DIR/replication.cnf"
log "生成MySQL配置文件: $config_file"
cat > "$config_file" << EOF
[mysqld]
# 服务器ID(必须唯一)
server-id = $server_id
# 二进制日志配置
log-bin = mysql-bin
binlog-format = ROW
expire-logs-days = 7
max-binlog-size = 100M
# GTID配置
EOF
if [ "$USE_GTID" = "true" ]; then
cat >> "$config_file" << EOF
gtid-mode = ON
enforce-gtid-consistency = ON
log-slave-updates = ON
EOF
fi
if [ "$role" = "master" ]; then
cat >> "$config_file" << EOF
# 主库配置
# 如仅需复制特定业务库,可启用 binlog-do-db;不要写入空值的 binlog-do-db
# binlog-do-db = app_db
binlog-ignore-db = mysql,information_schema,performance_schema,sys
# 半同步复制(主库)
EOF
if [ "$USE_SEMI_SYNC" = "true" ]; then
cat >> "$config_file" << EOF
plugin-load = "rpl_semi_sync_master=semisync_master.so"
rpl-semi-sync-master-enabled = 1
rpl-semi-sync-master-timeout = 1000
EOF
fi
else
cat >> "$config_file" << EOF
# 从库配置
read-only = 1
relay-log = relay-bin
relay-log-index = relay-bin.index
# 半同步复制(从库)
EOF
if [ "$USE_SEMI_SYNC" = "true" ]; then
cat >> "$config_file" << EOF
plugin-load = "rpl_semi_sync_slave=semisync_slave.so"
rpl-semi-sync-slave-enabled = 1
EOF
fi
fi
cat >> "$config_file" << EOF
# 性能优化
innodb-flush-log-at-trx-commit = 1
sync-binlog = 1
slave-parallel-type = LOGICAL_CLOCK
slave-parallel-workers = 4
slave-preserve-commit-order = 1
# 错误处理(跳过重复键/记录不存在错误;可能掩盖数据不一致,生产环境建议谨慎启用)
slave-skip-errors = 1062,1032
EOF
log "MySQL配置文件已生成"
}
# 配置主库
setup_master() {
log "配置主库: $MASTER_HOST:$MASTER_PORT"
# 检查连接
check_mysql_service "$MASTER_HOST" "$MASTER_PORT" "$MASTER_USER" "$MASTER_PASSWORD"
# 创建复制用户
log "创建复制用户: $REPL_USER"
mysql -h"$MASTER_HOST" -P"$MASTER_PORT" -u"$MASTER_USER" -p"$MASTER_PASSWORD" << EOF
CREATE USER IF NOT EXISTS '$REPL_USER'@'%' IDENTIFIED BY '$REPL_PASSWORD';
GRANT REPLICATION SLAVE ON *.* TO '$REPL_USER'@'%';
FLUSH PRIVILEGES;
EOF
# 启用半同步复制插件(如果需要)
if [ "$USE_SEMI_SYNC" = "true" ]; then
log "启用半同步复制插件(主库)"
mysql -h"$MASTER_HOST" -P"$MASTER_PORT" -u"$MASTER_USER" -p"$MASTER_PASSWORD" << EOF
INSTALL PLUGIN rpl_semi_sync_master SONAME 'semisync_master.so';
SET GLOBAL rpl_semi_sync_master_enabled = 1;
SET GLOBAL rpl_semi_sync_master_timeout = 1000;
EOF
fi
# 获取主库状态
if [ "$USE_GTID" = "true" ]; then
log "使用GTID复制模式"
GTID_SET=$(mysql -h"$MASTER_HOST" -P"$MASTER_PORT" -u"$MASTER_USER" -p"$MASTER_PASSWORD" -sN -e "SELECT @@GLOBAL.gtid_executed;")
log "当前GTID集合: $GTID_SET"
echo "GTID_SET=$GTID_SET" > "/tmp/master_status.txt"
else
log "使用传统复制模式"
mysql -h"$MASTER_HOST" -P"$MASTER_PORT" -u"$MASTER_USER" -p"$MASTER_PASSWORD" -e "SHOW MASTER STATUS\G" > "/tmp/master_status.txt"
BINLOG_FILE=$(grep "File:" "/tmp/master_status.txt" | awk '{print $2}')
BINLOG_POS=$(grep "Position:" "/tmp/master_status.txt" | awk '{print $2}')
log "主库状态: File=$BINLOG_FILE, Position=$BINLOG_POS"
fi
log "主库配置完成"
}
# 配置从库
setup_slave() {
log "配置从库: $SLAVE_HOST:$SLAVE_PORT"
# 检查连接
check_mysql_service "$SLAVE_HOST" "$SLAVE_PORT" "$SLAVE_USER" "$SLAVE_PASSWORD"
# 停止从库(如果正在运行)
mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" -e "STOP SLAVE;" 2>/dev/null || true
# 启用半同步复制插件(如果需要)
if [ "$USE_SEMI_SYNC" = "true" ]; then
log "启用半同步复制插件(从库)"
mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" << EOF
INSTALL PLUGIN rpl_semi_sync_slave SONAME 'semisync_slave.so';
SET GLOBAL rpl_semi_sync_slave_enabled = 1;
EOF
fi
# 配置复制
if [ "$USE_GTID" = "true" ]; then
log "配置GTID复制"
mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" << EOF
CHANGE MASTER TO
MASTER_HOST='$MASTER_HOST',
MASTER_PORT=$MASTER_PORT,
MASTER_USER='$REPL_USER',
MASTER_PASSWORD='$REPL_PASSWORD',
MASTER_AUTO_POSITION=1;
EOF
else
log "配置传统复制"
# 读取主库状态
if [ ! -f "/tmp/master_status.txt" ]; then
error_exit "主库状态文件不存在,请先配置主库"
fi
BINLOG_FILE=$(grep "File:" "/tmp/master_status.txt" | awk '{print $2}')
BINLOG_POS=$(grep "Position:" "/tmp/master_status.txt" | awk '{print $2}')
mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" << EOF
CHANGE MASTER TO
MASTER_HOST='$MASTER_HOST',
MASTER_PORT=$MASTER_PORT,
MASTER_USER='$REPL_USER',
MASTER_PASSWORD='$REPL_PASSWORD',
MASTER_LOG_FILE='$BINLOG_FILE',
MASTER_LOG_POS=$BINLOG_POS;
EOF
fi
# 启动从库
log "启动从库复制"
mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" -e "START SLAVE;"
# 检查复制状态
sleep 2
check_slave_status
log "从库配置完成"
}
# 检查从库状态
check_slave_status() {
log "检查从库复制状态"
mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" -e "SHOW SLAVE STATUS\G" > "/tmp/slave_status.txt"
IO_RUNNING=$(grep "Slave_IO_Running:" "/tmp/slave_status.txt" | awk '{print $2}')
SQL_RUNNING=$(grep "Slave_SQL_Running:" "/tmp/slave_status.txt" | awk '{print $2}')
LAST_ERROR=$(grep "Last_Error:" "/tmp/slave_status.txt" | cut -d':' -f2- | xargs)
SECONDS_BEHIND=$(grep "Seconds_Behind_Master:" "/tmp/slave_status.txt" | awk '{print $2}')
log "IO线程状态: $IO_RUNNING"
log "SQL线程状态: $SQL_RUNNING"
log "复制延迟: $SECONDS_BEHIND 秒"
if [ "$LAST_ERROR" != "" ]; then
log "复制错误: $LAST_ERROR"
fi
if [ "$IO_RUNNING" = "Yes" ] && [ "$SQL_RUNNING" = "Yes" ]; then
log "复制状态正常"
return 0
else
log "复制状态异常"
return 1
fi
}
# 生成监控脚本
generate_monitoring_script() {
log "生成复制监控脚本"
cat > "/usr/local/bin/mysql_replication_monitor.sh" << 'EOF'
#!/bin/bash
# MySQL复制监控脚本
SLAVE_HOST="localhost"
SLAVE_PORT="3306"
SLAVE_USER="root"
SLAVE_PASSWORD=""
ALERT_EMAIL="admin@example.com"
MAX_LAG_SECONDS=60
# 检查复制状态
check_replication() {
local status_file="/tmp/slave_status_$(date +%s).txt"
mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" -e "SHOW SLAVE STATUS\G" > "$status_file" 2>/dev/null
if [ $? -ne 0 ]; then
echo "ERROR: 无法连接到从库"
return 1
fi
local io_running=$(grep "Slave_IO_Running:" "$status_file" | awk '{print $2}')
local sql_running=$(grep "Slave_SQL_Running:" "$status_file" | awk '{print $2}')
local last_error=$(grep "Last_Error:" "$status_file" | cut -d':' -f2- | xargs)
local seconds_behind=$(grep "Seconds_Behind_Master:" "$status_file" | awk '{print $2}')
echo "复制状态检查 - $(date)"
echo "IO线程: $io_running"
echo "SQL线程: $sql_running"
echo "延迟: $seconds_behind 秒"
# 检查复制是否正常
if [ "$io_running" != "Yes" ] || [ "$sql_running" != "Yes" ]; then
echo "ALERT: 复制线程停止"
echo "错误信息: $last_error"
send_alert "MySQL复制线程停止" "IO: $io_running, SQL: $sql_running, Error: $last_error"
rm -f "$status_file"
return 1
fi
# 检查复制延迟
if [ "$seconds_behind" != "NULL" ] && [ "$seconds_behind" -gt "$MAX_LAG_SECONDS" ]; then
echo "ALERT: 复制延迟过高"
send_alert "MySQL复制延迟告警" "当前延迟: $seconds_behind 秒,阈值: $MAX_LAG_SECONDS 秒"
fi
rm -f "$status_file"
return 0
}
# 发送告警
send_alert() {
local subject="$1"
local message="$2"
echo "$message" | mail -s "$subject" "$ALERT_EMAIL" 2>/dev/null || {
echo "WARNING: 无法发送邮件告警"
logger "MySQL Replication Alert: $subject - $message"
}
}
# 主函数
main() {
case "${1:-check}" in
check)
check_replication
;;
monitor)
while true; do
check_replication
sleep 60
done
;;
*)
echo "用法: $0 {check|monitor}"
echo " check - 检查一次复制状态"
echo " monitor - 持续监控复制状态"
exit 1
;;
esac
}
main "$@"
EOF
chmod +x "/usr/local/bin/mysql_replication_monitor.sh"
log "监控脚本已生成: /usr/local/bin/mysql_replication_monitor.sh"
}
# 生成故障转移脚本
generate_failover_script() {
log "生成故障转移脚本"
cat > "/usr/local/bin/mysql_failover.sh" << 'EOF'
#!/bin/bash
# MySQL主从故障转移脚本
set -euo pipefail
# 配置变量
OLD_MASTER_HOST=""
NEW_MASTER_HOST=""
MYSQL_USER="root"
MYSQL_PASSWORD=""
REPL_USER="repl_user"
REPL_PASSWORD="repl_password"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}
# 提升从库为主库
promote_slave_to_master() {
local new_master=$1
log "提升从库为主库: $new_master"
# 停止复制
mysql -h"$new_master" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "STOP SLAVE;"
# 重置从库状态
mysql -h"$new_master" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "RESET SLAVE ALL;"
# 启用写入
mysql -h"$new_master" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SET GLOBAL read_only = 0;"
# 重置二进制日志
mysql -h"$new_master" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "RESET MASTER;"
log "从库已提升为主库"
}
# 重新配置其他从库
reconfigure_slaves() {
local new_master=$1
shift
local slaves=("$@")
for slave in "${slaves[@]}"; do
log "重新配置从库: $slave"
# 停止复制
mysql -h"$slave" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "STOP SLAVE;"
# 配置新主库
mysql -h"$slave" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << EOF
CHANGE MASTER TO
MASTER_HOST='$new_master',
MASTER_USER='$REPL_USER',
MASTER_PASSWORD='$REPL_PASSWORD',
MASTER_AUTO_POSITION=1;
EOF
# 启动复制
mysql -h"$slave" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "START SLAVE;"
log "从库 $slave 已重新配置"
done
}
# 主函数
main() {
if [ $# -lt 2 ]; then
echo "用法: $0 <新主库IP> <从库IP1> [从库IP2] ..."
echo "示例: $0 192.168.1.101 192.168.1.102 192.168.1.103"
exit 1
fi
local new_master=$1
shift
local slaves=("$@")
log "开始故障转移流程"
log "新主库: $new_master"
log "从库列表: ${slaves[*]}"
# 提升新主库
promote_slave_to_master "$new_master"
# 重新配置从库
if [ ${#slaves[@]} -gt 0 ]; then
reconfigure_slaves "$new_master" "${slaves[@]}"
fi
log "故障转移完成"
echo
echo "请更新应用配置,将数据库连接指向新主库: $new_master"
}
main "$@"
EOF
chmod +x "/usr/local/bin/mysql_failover.sh"
log "故障转移脚本已生成: /usr/local/bin/mysql_failover.sh"
}
# 主函数
main() {
log "开始MySQL主从复制配置"
case "${1:-help}" in
master)
if [ -z "$MASTER_HOST" ]; then
error_exit "请设置MASTER_HOST变量"
fi
generate_mysql_config "master" "1"
setup_master
;;
slave)
if [ -z "$SLAVE_HOST" ] || [ -z "$MASTER_HOST" ]; then
error_exit "请设置SLAVE_HOST和MASTER_HOST变量"
fi
generate_mysql_config "slave" "2"
setup_slave
;;
check)
check_slave_status
;;
monitor)
generate_monitoring_script
;;
failover)
generate_failover_script
;;
all)
if [ -z "$MASTER_HOST" ] || [ -z "$SLAVE_HOST" ]; then
error_exit "请设置MASTER_HOST和SLAVE_HOST变量"
fi
generate_mysql_config "master" "1"
setup_master
generate_mysql_config "slave" "2"
setup_slave
generate_monitoring_script
generate_failover_script
;;
help|*)
echo "MySQL主从复制配置脚本"
echo
echo "用法: $0 [命令]"
echo
echo "命令:"
echo " master - 配置主库"
echo " slave - 配置从库"
echo " check - 检查复制状态"
echo " monitor - 生成监控脚本"
echo " failover - 生成故障转移脚本"
echo " all - 完整配置主从复制"
echo " help - 显示此帮助信息"
echo
echo "环境变量:"
echo " MASTER_HOST - 主库IP地址"
echo " MASTER_PORT - 主库端口 (默认: 3306)"
echo " MASTER_USER - 主库用户 (默认: root)"
echo " MASTER_PASSWORD - 主库密码"
echo " SLAVE_HOST - 从库IP地址"
echo " SLAVE_PORT - 从库端口 (默认: 3306)"
echo " SLAVE_USER - 从库用户 (默认: root)"
echo " SLAVE_PASSWORD - 从库密码"
echo " REPL_USER - 复制用户 (默认: repl_user)"
echo " REPL_PASSWORD - 复制密码 (默认: repl_password)"
echo " USE_GTID - 使用GTID (默认: true)"
echo " USE_SEMI_SYNC - 使用半同步复制 (默认: false)"
echo
echo "示例:"
echo " export MASTER_HOST=192.168.1.100"
echo " export SLAVE_HOST=192.168.1.101"
echo " export MASTER_PASSWORD=master_pass"
echo " export SLAVE_PASSWORD=slave_pass"
echo " $0 all"
exit 0
;;
esac
log "MySQL主从复制配置完成"
}
# 解析命令行参数
while [[ $# -gt 1 ]]; do
case $1 in
--master-host)
MASTER_HOST="$2"
shift 2
;;
--slave-host)
SLAVE_HOST="$2"
shift 2
;;
--master-password)
MASTER_PASSWORD="$2"
shift 2
;;
--slave-password)
SLAVE_PASSWORD="$2"
shift 2
;;
--repl-password)
REPL_PASSWORD="$2"
shift 2
;;
--use-gtid)
USE_GTID="$2"
shift 2
;;
--use-semi-sync)
USE_SEMI_SYNC="$2"
shift 2
;;
*)
break
;;
esac
done
# 执行主函数
main "$@"
13.7 故障转移机制
13.7.1 自动故障转移
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL自动故障转移系统
支持主从复制、主主复制和集群环境的自动故障检测与转移
"""
import time
import json
import logging
import threading
import subprocess
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import pymysql
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class NodeRole(Enum):
"""节点角色"""
MASTER = "master"
SLAVE = "slave"
CANDIDATE = "candidate"
FAILED = "failed"
class FailoverStatus(Enum):
"""故障转移状态"""
MONITORING = "monitoring"
DETECTING = "detecting"
FAILING_OVER = "failing_over"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class MySQLNode:
"""MySQL节点信息"""
host: str
port: int
user: str
password: str
role: NodeRole
priority: int = 100
last_check: Optional[datetime] = None
is_healthy: bool = True
lag_seconds: int = 0
gtid_executed: str = ""
server_id: int = 0
class MySQLFailoverManager:
"""MySQL故障转移管理器"""
def __init__(self, config_file: str = "/etc/mysql_failover.json"):
self.config = self._load_config(config_file)
self.nodes = self._initialize_nodes()
self.current_master = None
self.status = FailoverStatus.MONITORING
self.failover_lock = threading.Lock()
self.monitoring_thread = None
self.is_running = False
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/mysql_failover.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def _load_config(self, config_file: str) -> Dict:
"""加载配置文件"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
# 默认配置
return {
"check_interval": 5,
"failover_timeout": 30,
"max_lag_seconds": 10,
"min_slaves_for_failover": 1,
"email_alerts": {
"enabled": True,
"smtp_server": "localhost",
"smtp_port": 587,
"username": "",
"password": "",
"from_email": "mysql-failover@example.com",
"to_emails": ["admin@example.com"]
},
"vip": {
"enabled": False,
"interface": "eth0",
"ip": "192.168.1.100",
"netmask": "255.255.255.0"
},
"nodes": [
{
"host": "192.168.1.10",
"port": 3306,
"user": "repl",
"password": "password",
"role": "master",
"priority": 100
},
{
"host": "192.168.1.11",
"port": 3306,
"user": "repl",
"password": "password",
"role": "slave",
"priority": 90
},
{
"host": "192.168.1.12",
"port": 3306,
"user": "repl",
"password": "password",
"role": "slave",
"priority": 80
}
]
}
def _initialize_nodes(self) -> List[MySQLNode]:
"""初始化节点列表"""
nodes = []
for node_config in self.config['nodes']:
node = MySQLNode(
host=node_config['host'],
port=node_config['port'],
user=node_config['user'],
password=node_config['password'],
role=NodeRole(node_config['role']),
priority=node_config.get('priority', 100)
)
nodes.append(node)
if node.role == NodeRole.MASTER:
self.current_master = node
return nodes
def _connect_to_node(self, node: MySQLNode) -> Optional[pymysql.Connection]:
"""连接到MySQL节点"""
try:
connection = pymysql.connect(
host=node.host,
port=node.port,
user=node.user,
password=node.password,
connect_timeout=5,
read_timeout=10,
charset='utf8mb4'
)
return connection
except Exception as e:
self.logger.warning(f"无法连接到节点 {node.host}:{node.port}: {e}")
return None
def _check_node_health(self, node: MySQLNode) -> bool:
"""检查节点健康状态"""
connection = self._connect_to_node(node)
if not connection:
node.is_healthy = False
return False
try:
with connection.cursor() as cursor:
# 检查基本连接
cursor.execute("SELECT 1")
# 获取服务器ID
cursor.execute("SELECT @@server_id")
node.server_id = cursor.fetchone()[0]
                # 检查复制状态(使用 DictCursor 按列名读取,避免依赖 SHOW SLAVE STATUS 的列顺序)
                if node.role == NodeRole.SLAVE:
                    with connection.cursor(pymysql.cursors.DictCursor) as dict_cursor:
                        dict_cursor.execute("SHOW SLAVE STATUS")
                        slave_status = dict_cursor.fetchone()
                    if slave_status:
                        # 检查复制延迟
                        node.lag_seconds = slave_status.get('Seconds_Behind_Master') or 0
                        # 检查IO和SQL线程状态
                        io_running = slave_status.get('Slave_IO_Running') == 'Yes'
                        sql_running = slave_status.get('Slave_SQL_Running') == 'Yes'
                        if not (io_running and sql_running):
                            self.logger.warning(f"节点 {node.host} 复制线程异常")
                            node.is_healthy = False
                            return False
                        # 检查延迟是否超过阈值
                        if node.lag_seconds > self.config['max_lag_seconds']:
                            self.logger.warning(f"节点 {node.host} 复制延迟过高: {node.lag_seconds}秒")
# 获取GTID执行状态
cursor.execute("SELECT @@gtid_executed")
node.gtid_executed = cursor.fetchone()[0] or ""
node.is_healthy = True
node.last_check = datetime.now()
return True
except Exception as e:
self.logger.error(f"检查节点 {node.host} 健康状态失败: {e}")
node.is_healthy = False
return False
finally:
connection.close()
def _get_best_candidate(self) -> Optional[MySQLNode]:
"""选择最佳的故障转移候选节点"""
candidates = [
node for node in self.nodes
if node.role == NodeRole.SLAVE and node.is_healthy
]
if not candidates:
return None
        # 按优先级(高者优先)和复制延迟(低者优先)排序;更严谨的做法还应比较各候选节点的GTID执行位置
        candidates.sort(key=lambda x: (-x.priority, x.lag_seconds))
return candidates[0]
def _promote_slave_to_master(self, new_master: MySQLNode) -> bool:
"""将从库提升为主库"""
connection = self._connect_to_node(new_master)
if not connection:
return False
try:
with connection.cursor() as cursor:
# 停止复制
cursor.execute("STOP SLAVE")
# 重置复制配置
cursor.execute("RESET SLAVE ALL")
# 设置为可写
cursor.execute("SET GLOBAL read_only = 0")
cursor.execute("SET GLOBAL super_read_only = 0")
# 刷新日志
cursor.execute("FLUSH LOGS")
self.logger.info(f"成功将 {new_master.host} 提升为主库")
return True
except Exception as e:
self.logger.error(f"提升 {new_master.host} 为主库失败: {e}")
return False
finally:
connection.close()
def _reconfigure_slaves(self, new_master: MySQLNode) -> bool:
"""重新配置其他从库指向新主库"""
success_count = 0
for node in self.nodes:
if node == new_master or node.role != NodeRole.SLAVE:
continue
if not node.is_healthy:
continue
connection = self._connect_to_node(node)
if not connection:
continue
try:
with connection.cursor() as cursor:
# 停止复制
cursor.execute("STOP SLAVE")
# 重新配置主库
change_master_sql = f"""
CHANGE MASTER TO
MASTER_HOST='{new_master.host}',
MASTER_PORT={new_master.port},
MASTER_USER='{new_master.user}',
MASTER_PASSWORD='{new_master.password}',
MASTER_AUTO_POSITION=1
"""
cursor.execute(change_master_sql)
# 启动复制
cursor.execute("START SLAVE")
self.logger.info(f"成功重新配置从库 {node.host}")
success_count += 1
except Exception as e:
self.logger.error(f"重新配置从库 {node.host} 失败: {e}")
finally:
connection.close()
return success_count > 0
def _update_vip(self, new_master: MySQLNode) -> bool:
"""更新虚拟IP"""
if not self.config['vip']['enabled']:
return True
vip_config = self.config['vip']
try:
# 删除旧的VIP(如果存在)
subprocess.run([
'ip', 'addr', 'del',
f"{vip_config['ip']}/{vip_config['netmask']}",
'dev', vip_config['interface']
], check=False, capture_output=True)
# 在新主库上添加VIP
if new_master.host == self._get_local_ip():
subprocess.run([
'ip', 'addr', 'add',
f"{vip_config['ip']}/{vip_config['netmask']}",
'dev', vip_config['interface']
], check=True, capture_output=True)
                # 发送免费ARP(iputils arping 的 -A 为免费ARP应答模式),通知邻居更新VIP对应的MAC
                subprocess.run([
                    'arping', '-A', '-c', '3', '-I', vip_config['interface'],
                    vip_config['ip']
                ], check=False, capture_output=True)
self.logger.info(f"VIP {vip_config['ip']} 已迁移到 {new_master.host}")
return True
except Exception as e:
self.logger.error(f"更新VIP失败: {e}")
return False
def _get_local_ip(self) -> str:
"""获取本地IP地址"""
import socket
try:
            # 通过向公网地址发起UDP"连接"(不会真正发送数据包)来获取本机出口IP
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(("8.8.8.8", 80))
            local_ip = s.getsockname()[0]
            s.close()
            return local_ip
        except Exception:
            return "127.0.0.1"
def _send_alert(self, subject: str, message: str):
"""发送告警邮件"""
if not self.config['email_alerts']['enabled']:
return
email_config = self.config['email_alerts']
try:
msg = MIMEMultipart()
msg['From'] = email_config['from_email']
msg['To'] = ', '.join(email_config['to_emails'])
msg['Subject'] = subject
msg.attach(MIMEText(message, 'plain', 'utf-8'))
server = smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port'])
if email_config['username']:
server.starttls()
server.login(email_config['username'], email_config['password'])
server.send_message(msg)
server.quit()
self.logger.info("告警邮件发送成功")
except Exception as e:
self.logger.error(f"发送告警邮件失败: {e}")
def perform_failover(self) -> bool:
"""执行故障转移"""
with self.failover_lock:
if self.status != FailoverStatus.MONITORING:
self.logger.warning("故障转移正在进行中,跳过")
return False
self.status = FailoverStatus.DETECTING
try:
# 检查是否真的需要故障转移
if self.current_master and self._check_node_health(self.current_master):
self.logger.info("主库恢复正常,取消故障转移")
self.status = FailoverStatus.MONITORING
return False
# 选择最佳候选节点
new_master = self._get_best_candidate()
if not new_master:
self.logger.error("没有可用的故障转移候选节点")
self.status = FailoverStatus.FAILED
self._send_alert(
"MySQL故障转移失败",
"没有可用的故障转移候选节点"
)
return False
self.logger.info(f"开始故障转移,新主库: {new_master.host}")
self.status = FailoverStatus.FAILING_OVER
# 提升新主库
if not self._promote_slave_to_master(new_master):
self.logger.error("提升新主库失败")
self.status = FailoverStatus.FAILED
return False
# 重新配置其他从库
if not self._reconfigure_slaves(new_master):
self.logger.warning("重新配置从库部分失败")
# 更新VIP
self._update_vip(new_master)
# 更新角色
if self.current_master:
self.current_master.role = NodeRole.FAILED
new_master.role = NodeRole.MASTER
self.current_master = new_master
self.status = FailoverStatus.COMPLETED
# 发送成功通知
self._send_alert(
"MySQL故障转移成功",
f"故障转移已完成,新主库: {new_master.host}:{new_master.port}"
)
self.logger.info("故障转移完成")
return True
except Exception as e:
self.logger.error(f"故障转移过程中发生错误: {e}")
self.status = FailoverStatus.FAILED
self._send_alert(
"MySQL故障转移异常",
f"故障转移过程中发生错误: {e}"
)
return False
finally:
# 重置状态
if self.status != FailoverStatus.COMPLETED:
time.sleep(30) # 等待30秒后重新开始监控
self.status = FailoverStatus.MONITORING
def _monitoring_loop(self):
"""监控循环"""
while self.is_running:
try:
# 检查所有节点健康状态
for node in self.nodes:
self._check_node_health(node)
# 检查主库状态
if self.current_master and not self.current_master.is_healthy:
self.logger.warning(f"检测到主库 {self.current_master.host} 异常")
# 等待一段时间再次确认
time.sleep(5)
if not self._check_node_health(self.current_master):
self.logger.error(f"主库 {self.current_master.host} 确认失效,开始故障转移")
self.perform_failover()
# 检查从库状态
unhealthy_slaves = [
node for node in self.nodes
if node.role == NodeRole.SLAVE and not node.is_healthy
]
if unhealthy_slaves:
for slave in unhealthy_slaves:
self.logger.warning(f"从库 {slave.host} 状态异常")
time.sleep(self.config['check_interval'])
except Exception as e:
self.logger.error(f"监控循环异常: {e}")
time.sleep(10)
def start_monitoring(self):
"""开始监控"""
if self.is_running:
self.logger.warning("监控已在运行中")
return
self.is_running = True
self.monitoring_thread = threading.Thread(target=self._monitoring_loop)
self.monitoring_thread.daemon = True
self.monitoring_thread.start()
self.logger.info("MySQL故障转移监控已启动")
def stop_monitoring(self):
"""停止监控"""
self.is_running = False
if self.monitoring_thread:
self.monitoring_thread.join(timeout=10)
self.logger.info("MySQL故障转移监控已停止")
def get_cluster_status(self) -> Dict:
"""获取集群状态"""
status = {
'timestamp': datetime.now().isoformat(),
'failover_status': self.status.value,
'current_master': None,
'nodes': []
}
if self.current_master:
status['current_master'] = {
'host': self.current_master.host,
'port': self.current_master.port,
'is_healthy': self.current_master.is_healthy,
'last_check': self.current_master.last_check.isoformat() if self.current_master.last_check else None
}
for node in self.nodes:
node_info = {
'host': node.host,
'port': node.port,
'role': node.role.value,
'priority': node.priority,
'is_healthy': node.is_healthy,
'lag_seconds': node.lag_seconds,
'server_id': node.server_id,
'last_check': node.last_check.isoformat() if node.last_check else None
}
status['nodes'].append(node_info)
return status
def manual_failover(self, target_host: str) -> bool:
"""手动故障转移到指定节点"""
target_node = None
for node in self.nodes:
if node.host == target_host and node.role == NodeRole.SLAVE:
target_node = node
break
if not target_node:
self.logger.error(f"未找到目标节点 {target_host} 或节点不是从库")
return False
if not target_node.is_healthy:
self.logger.error(f"目标节点 {target_host} 状态异常")
return False
self.logger.info(f"开始手动故障转移到 {target_host}")
# 临时设置当前主库为失效状态
old_master = self.current_master
if old_master:
old_master.is_healthy = False
# 执行故障转移
success = self.perform_failover()
# 如果失败,恢复原主库状态
if not success and old_master:
old_master.is_healthy = True
return success
# 使用示例
if __name__ == '__main__':
import sys
import signal
def signal_handler(signum, frame):
print("\n正在停止故障转移监控...")
failover_manager.stop_monitoring()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# 创建故障转移管理器
failover_manager = MySQLFailoverManager()
if len(sys.argv) > 1:
command = sys.argv[1]
if command == "status":
# 显示集群状态
status = failover_manager.get_cluster_status()
print(json.dumps(status, ensure_ascii=False, indent=2))
elif command == "failover" and len(sys.argv) > 2:
# 手动故障转移
target_host = sys.argv[2]
success = failover_manager.manual_failover(target_host)
print(f"手动故障转移{'成功' if success else '失败'}")
elif command == "monitor":
# 启动监控
failover_manager.start_monitoring()
print("故障转移监控已启动,按 Ctrl+C 停止")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
else:
print("用法:")
print(" python mysql_failover.py monitor # 启动监控")
print(" python mysql_failover.py status # 查看状态")
print(" python mysql_failover.py failover <host> # 手动故障转移")
else:
print("MySQL自动故障转移系统")
print("用法: python mysql_failover.py [monitor|status|failover <host>]")
13.7.2 故障转移测试
#!/bin/bash
# MySQL故障转移测试脚本
# 用于测试各种故障场景下的自动转移能力
set -euo pipefail
# 配置参数
MASTER_HOST="192.168.1.10"
SLAVE1_HOST="192.168.1.11"
SLAVE2_HOST="192.168.1.12"
MYSQL_USER="test"
MYSQL_PASSWORD="password"
TEST_DATABASE="failover_test"
LOG_FILE="/tmp/failover_test.log"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
# 错误处理
error_exit() {
log "错误: $1"
exit 1
}
# 执行MySQL命令
execute_mysql() {
local host="$1"
local sql="$2"
mysql -h "$host" -u "$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "$sql" 2>/dev/null
}
# 检查MySQL连接
check_mysql_connection() {
local host="$1"
if execute_mysql "$host" "SELECT 1" >/dev/null 2>&1; then
return 0
else
return 1
fi
}
# 获取主库信息
get_master_status() {
local host="$1"
execute_mysql "$host" "SHOW MASTER STATUS\\G" | grep -E "(File|Position)"
}
# 获取从库状态
get_slave_status() {
local host="$1"
execute_mysql "$host" "SHOW SLAVE STATUS\\G" | grep -E "(Master_Host|Slave_IO_Running|Slave_SQL_Running|Seconds_Behind_Master)"
}
# 创建测试数据
create_test_data() {
log "创建测试数据"
execute_mysql "$MASTER_HOST" "
CREATE DATABASE IF NOT EXISTS $TEST_DATABASE;
USE $TEST_DATABASE;
CREATE TABLE IF NOT EXISTS test_table (
id INT AUTO_INCREMENT PRIMARY KEY,
data VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO test_table (data) VALUES
('Initial data 1'),
('Initial data 2'),
('Initial data 3');
"
log "测试数据创建完成"
}
# 验证数据一致性
verify_data_consistency() {
local host="$1"
log "验证节点 $host 的数据一致性"
local count
count=$(execute_mysql "$host" "SELECT COUNT(*) FROM $TEST_DATABASE.test_table" | tail -1)
log "节点 $host 数据行数: $count"
# 显示最新的几条记录
execute_mysql "$host" "SELECT * FROM $TEST_DATABASE.test_table ORDER BY id DESC LIMIT 5"
}
# 模拟主库故障
simulate_master_failure() {
log "模拟主库故障 - 停止MySQL服务"
ssh "root@$MASTER_HOST" "systemctl stop mysql" || {
log "无法通过SSH停止主库服务,尝试其他方法"
return 1
}
log "主库服务已停止"
}
# 模拟网络分区
simulate_network_partition() {
log "模拟网络分区 - 阻断主库网络"
ssh "root@$MASTER_HOST" "iptables -A INPUT -p tcp --dport 3306 -j DROP" || {
log "无法设置网络分区"
return 1
}
log "网络分区已设置"
}
# 模拟高负载
simulate_high_load() {
log "模拟高负载场景"
# 在主库上执行大量写入操作
for i in {1..100}; do
execute_mysql "$MASTER_HOST" "
INSERT INTO $TEST_DATABASE.test_table (data)
VALUES ('Load test data $i');
" &
done
wait
log "高负载测试完成"
}
# 恢复主库
restore_master() {
log "恢复主库服务"
# 清除网络规则
ssh "root@$MASTER_HOST" "iptables -D INPUT -p tcp --dport 3306 -j DROP" 2>/dev/null || true
# 启动MySQL服务
ssh "root@$MASTER_HOST" "systemctl start mysql" || {
log "无法启动主库服务"
return 1
}
# 等待服务启动
sleep 10
log "主库服务已恢复"
}
# 等待故障转移完成
wait_for_failover() {
local timeout=60
local elapsed=0
log "等待故障转移完成(超时: ${timeout}秒)"
while [ $elapsed -lt $timeout ]; do
# 检查从库是否变为主库
if check_mysql_connection "$SLAVE1_HOST"; then
local read_only
read_only=$(execute_mysql "$SLAVE1_HOST" "SELECT @@read_only" | tail -1)
if [ "$read_only" = "0" ]; then
log "故障转移完成,$SLAVE1_HOST 已成为新主库"
return 0
fi
fi
if check_mysql_connection "$SLAVE2_HOST"; then
local read_only
read_only=$(execute_mysql "$SLAVE2_HOST" "SELECT @@read_only" | tail -1)
if [ "$read_only" = "0" ]; then
log "故障转移完成,$SLAVE2_HOST 已成为新主库"
return 0
fi
fi
sleep 5
elapsed=$((elapsed + 5))
log "等待故障转移... ($elapsed/$timeout 秒)"
done
log "故障转移超时"
return 1
}
# 测试场景1:主库服务停止
test_master_service_failure() {
log "=== 测试场景1:主库服务停止 ==="
# 记录初始状态
log "初始主库状态:"
get_master_status "$MASTER_HOST"
log "初始从库状态:"
get_slave_status "$SLAVE1_HOST"
get_slave_status "$SLAVE2_HOST"
# 模拟故障
simulate_master_failure
# 等待故障转移
if wait_for_failover; then
log "故障转移测试通过"
# 验证数据一致性
verify_data_consistency "$SLAVE1_HOST"
verify_data_consistency "$SLAVE2_HOST"
else
log "故障转移测试失败"
fi
# 恢复环境
restore_master
}
# 测试场景2:网络分区
test_network_partition() {
log "=== 测试场景2:网络分区 ==="
# 模拟网络分区
simulate_network_partition
# 等待故障转移
if wait_for_failover; then
log "网络分区故障转移测试通过"
else
log "网络分区故障转移测试失败"
fi
# 恢复环境
restore_master
}
# 测试场景3:高负载下的故障转移
test_high_load_failover() {
log "=== 测试场景3:高负载下的故障转移 ==="
# 启动高负载
simulate_high_load &
local load_pid=$!
# 等待一段时间让负载稳定
sleep 10
# 模拟故障
simulate_master_failure
# 停止负载生成
kill $load_pid 2>/dev/null || true
# 等待故障转移
if wait_for_failover; then
log "高负载故障转移测试通过"
else
log "高负载故障转移测试失败"
fi
# 恢复环境
restore_master
}
# 测试场景4:脑裂检测
test_split_brain_detection() {
log "=== 测试场景4:脑裂检测 ==="
# 模拟网络分区
simulate_network_partition
# 等待故障转移
wait_for_failover
# 恢复原主库网络
ssh "root@$MASTER_HOST" "iptables -D INPUT -p tcp --dport 3306 -j DROP" 2>/dev/null || true
# 检查是否有脑裂
sleep 10
local master_count=0
if check_mysql_connection "$MASTER_HOST"; then
local read_only
read_only=$(execute_mysql "$MASTER_HOST" "SELECT @@read_only" | tail -1)
if [ "$read_only" = "0" ]; then
master_count=$((master_count + 1))
log "检测到主库: $MASTER_HOST"
fi
fi
if check_mysql_connection "$SLAVE1_HOST"; then
local read_only
read_only=$(execute_mysql "$SLAVE1_HOST" "SELECT @@read_only" | tail -1)
if [ "$read_only" = "0" ]; then
master_count=$((master_count + 1))
log "检测到主库: $SLAVE1_HOST"
fi
fi
if check_mysql_connection "$SLAVE2_HOST"; then
local read_only
read_only=$(execute_mysql "$SLAVE2_HOST" "SELECT @@read_only" | tail -1)
if [ "$read_only" = "0" ]; then
master_count=$((master_count + 1))
log "检测到主库: $SLAVE2_HOST"
fi
fi
if [ $master_count -gt 1 ]; then
log "警告: 检测到脑裂,存在多个主库!"
else
log "脑裂检测正常,只有一个主库"
fi
}
# 性能测试
performance_test() {
log "=== 性能测试 ==="
local current_master
# 确定当前主库
if check_mysql_connection "$MASTER_HOST"; then
local read_only
read_only=$(execute_mysql "$MASTER_HOST" "SELECT @@read_only" | tail -1)
if [ "$read_only" = "0" ]; then
current_master="$MASTER_HOST"
fi
fi
if [ -z "${current_master:-}" ]; then
if check_mysql_connection "$SLAVE1_HOST"; then
local read_only
read_only=$(execute_mysql "$SLAVE1_HOST" "SELECT @@read_only" | tail -1)
if [ "$read_only" = "0" ]; then
current_master="$SLAVE1_HOST"
fi
fi
fi
if [ -z "${current_master:-}" ]; then
if check_mysql_connection "$SLAVE2_HOST"; then
local read_only
read_only=$(execute_mysql "$SLAVE2_HOST" "SELECT @@read_only" | tail -1)
if [ "$read_only" = "0" ]; then
current_master="$SLAVE2_HOST"
fi
fi
fi
if [ -z "${current_master:-}" ]; then
log "无法找到当前主库"
return 1
fi
log "当前主库: $current_master"
# 执行性能测试
local start_time=$(date +%s)
for i in {1..1000}; do
execute_mysql "$current_master" "
INSERT INTO $TEST_DATABASE.test_table (data)
VALUES ('Performance test $i');
" >/dev/null
done
local end_time=$(date +%s)
    local duration=$((end_time - start_time))
    # 防止执行过快导致除零
    if [ "$duration" -eq 0 ]; then
        duration=1
    fi
    local tps=$((1000 / duration))
log "性能测试完成: 1000次插入耗时 ${duration}秒,TPS: $tps"
}
# 生成测试报告
generate_report() {
local report_file="/tmp/failover_test_report_$(date +%Y%m%d_%H%M%S).txt"
{
echo "MySQL故障转移测试报告"
echo "======================"
echo "测试时间: $(date)"
echo "测试环境:"
echo " 主库: $MASTER_HOST"
echo " 从库1: $SLAVE1_HOST"
echo " 从库2: $SLAVE2_HOST"
echo
echo "测试结果:"
echo "--------"
# 提取测试结果
grep -E "(测试通过|测试失败|检测到脑裂|性能测试完成)" "$LOG_FILE" || echo "无测试结果"
echo
echo "详细日志:"
echo "--------"
cat "$LOG_FILE"
} > "$report_file"
log "测试报告已生成: $report_file"
}
# 清理环境
cleanup() {
log "清理测试环境"
# 删除测试数据库
for host in "$MASTER_HOST" "$SLAVE1_HOST" "$SLAVE2_HOST"; do
if check_mysql_connection "$host"; then
execute_mysql "$host" "DROP DATABASE IF EXISTS $TEST_DATABASE" 2>/dev/null || true
fi
done
# 清除网络规则
ssh "root@$MASTER_HOST" "iptables -D INPUT -p tcp --dport 3306 -j DROP" 2>/dev/null || true
log "环境清理完成"
}
# 主函数
main() {
case "${1:-all}" in
create-data)
create_test_data
;;
test1)
test_master_service_failure
;;
test2)
test_network_partition
;;
test3)
test_high_load_failover
;;
test4)
test_split_brain_detection
;;
performance)
performance_test
;;
cleanup)
cleanup
;;
report)
generate_report
;;
all)
log "开始完整的故障转移测试"
# 创建测试数据
create_test_data
# 执行所有测试
test_master_service_failure
sleep 30
test_network_partition
sleep 30
test_high_load_failover
sleep 30
test_split_brain_detection
sleep 30
performance_test
# 生成报告
generate_report
# 清理环境
cleanup
log "所有测试完成"
;;
help|*)
echo "MySQL故障转移测试脚本"
echo
echo "用法: $0 [命令]"
echo
echo "命令:"
echo " create-data - 创建测试数据"
echo " test1 - 测试主库服务停止"
echo " test2 - 测试网络分区"
echo " test3 - 测试高负载故障转移"
echo " test4 - 测试脑裂检测"
echo " performance - 性能测试"
echo " cleanup - 清理测试环境"
echo " report - 生成测试报告"
echo " all - 执行所有测试(默认)"
echo " help - 显示此帮助信息"
;;
esac
}
main "$@"
13.8 总结
本章详细介绍了MySQL高可用架构设计的各个方面,从基础的主从复制到复杂的集群架构,再到云数据库服务和自动故障转移机制。
13.8.1 核心要点回顾
高可用架构层次
- 主从复制:基础的高可用方案
- 主主复制:双向复制,提供更好的可用性
- MySQL集群:分布式架构,支持水平扩展
- 代理层高可用:通过中间件实现透明故障转移
- 云数据库:托管服务,简化运维复杂度
关键技术组件
- GTID:全局事务标识符,简化复制配置
- 半同步复制:平衡性能和数据安全性
- 并行复制:提高复制性能
- 故障检测:及时发现节点异常
- 自动故障转移:减少人工干预
架构选择考虑因素
- RTO/RPO要求
- 数据一致性需求
- 性能要求
- 运维复杂度
- 成本预算
13.8.2 实施建议
分阶段实施
- 第一阶段:建立基础主从复制
- 第二阶段:添加监控和告警
- 第三阶段:实现自动故障转移
- 第四阶段:优化性能和扩展性
最佳实践
- 定期进行故障转移演练
- 建立完善的监控体系
- 制定详细的运维手册
- 保持架构文档更新
- 定期备份和恢复测试
常见问题处理
- 复制延迟:优化网络、调整参数
- 脑裂问题:使用仲裁机制
- 数据不一致:定期校验和修复
- 性能瓶颈:读写分离、分库分表
13.8.3 发展趋势
云原生架构
- 容器化部署
- Kubernetes编排
- 服务网格集成
- 自动化运维
新技术融合
- MySQL 8.0新特性
- InnoDB Cluster
- MySQL Router
- Group Replication
智能化运维
- AI驱动的故障预测
- 自动化性能调优
- 智能容量规划
- 自愈系统
13.8.4 下一步学习方向
深入学习
- MySQL内核原理
- 分布式系统理论
- 云计算架构
- DevOps实践
实践项目
- 搭建完整的高可用环境
- 开发监控和管理工具
- 参与开源项目
- 性能调优案例
认证考试
- MySQL认证
- 云服务认证
- 架构师认证
- DevOps认证
通过本章的学习,你应该能够:
- 理解各种高可用架构的原理和适用场景
- 设计和实施适合业务需求的高可用方案
- 配置和管理MySQL复制环境
- 实现自动故障检测和转移
- 选择合适的云数据库服务
- 进行故障转移测试和性能优化
下一章我们将学习MySQL的分库分表技术,探讨如何处理大规模数据的水平扩展问题。
13.2.3 主从复制优化
-- 主从复制性能优化SQL脚本
-- 优化复制性能和稳定性
-- 1. 并行复制配置(在从库执行;修改前需 STOP SLAVE,修改后 START SLAVE 才会生效)
SET GLOBAL slave_parallel_type = 'LOGICAL_CLOCK';
SET GLOBAL slave_parallel_workers = 4;
SET GLOBAL slave_preserve_commit_order = 1;
SET GLOBAL slave_pending_jobs_size_max = 134217728; -- 128MB
-- 2. 复制过滤配置
-- 忽略系统数据库(CHANGE REPLICATION FILTER 同样要求复制SQL线程处于停止状态)
CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB = (mysql, information_schema, performance_schema, sys);
-- 只复制特定数据库
-- CHANGE REPLICATION FILTER REPLICATE_DO_DB = (app_db, user_db);
-- 忽略特定表
-- CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE = (app_db.temp_table, app_db.log_table);
-- 3. 二进制日志优化
SET GLOBAL binlog_cache_size = 1048576; -- 1MB
SET GLOBAL max_binlog_cache_size = 4294967296; -- 4GB
SET GLOBAL binlog_stmt_cache_size = 32768; -- 32KB
SET GLOBAL max_binlog_stmt_cache_size = 4294967296; -- 4GB
-- 4. 复制缓冲区优化
SET GLOBAL slave_net_timeout = 60;
SET GLOBAL slave_compressed_protocol = 1;
-- 5. 检查复制状态
SHOW SLAVE STATUS\G
-- 6. 查看复制延迟
SELECT
CHANNEL_NAME,
SERVICE_STATE,
LAST_ERROR_MESSAGE,
LAST_ERROR_TIMESTAMP
FROM performance_schema.replication_connection_status;
-- 7. 查看并行复制工作线程
SELECT
WORKER_ID,
THREAD_ID,
SERVICE_STATE,
LAST_ERROR_MESSAGE,
LAST_ERROR_TIMESTAMP
FROM performance_schema.replication_applier_status_by_worker;
-- 8. 复制性能监控
SELECT
EVENT_NAME,
COUNT_STAR,
SUM_TIMER_WAIT/1000000000000 AS SUM_TIMER_WAIT_SEC,
AVG_TIMER_WAIT/1000000000000 AS AVG_TIMER_WAIT_SEC
FROM performance_schema.events_waits_summary_global_by_event_name
WHERE EVENT_NAME LIKE '%replication%'
ORDER BY SUM_TIMER_WAIT DESC;
13.3 主主复制架构
13.3.1 主主复制配置
主主复制(Master-Master Replication)允许两个MySQL实例互为主从,实现双向数据同步。
#!/bin/bash
# MySQL主主复制配置脚本
# 配置双向复制,避免数据冲突
set -euo pipefail
# 配置变量
SERVER1_HOST="192.168.1.100"
SERVER2_HOST="192.168.1.101"
MYSQL_PORT="3306"
MYSQL_USER="root"
MYSQL_PASSWORD="password"
REPL_USER="repl_user"
REPL_PASSWORD="repl_password"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}
# 生成主主复制配置
generate_master_master_config() {
local server_id=$1
local auto_increment_offset=$2
local config_file="/etc/mysql/conf.d/master_master.cnf"
log "生成服务器$server_id的配置文件"
cat > "$config_file" << EOF
[mysqld]
# 服务器标识
server-id = $server_id
# 二进制日志
log-bin = mysql-bin
binlog-format = ROW
expire-logs-days = 7
max-binlog-size = 100M
# GTID配置
gtid-mode = ON
enforce-gtid-consistency = ON
log-slave-updates = ON
# 自增ID配置(避免冲突)
auto-increment-increment = 2
auto-increment-offset = $auto_increment_offset
# 主主复制优化
slave-parallel-type = LOGICAL_CLOCK
slave-parallel-workers = 4
slave-preserve-commit-order = 1
# 冲突解决(跳过 1062/1032 错误只是权宜之计,可能掩盖数据不一致,需配合冲突检测使用)
slave-skip-errors = 1062,1032
replicate-same-server-id = 0
# 性能优化
innodb-flush-log-at-trx-commit = 1
sync-binlog = 1
EOF
log "配置文件已生成: $config_file"
}
# 配置服务器复制
setup_server_replication() {
local local_host=$1
local remote_host=$2
local server_name=$3
log "配置$server_name复制: $local_host -> $remote_host"
# 创建复制用户
mysql -h"$local_host" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << EOF
CREATE USER IF NOT EXISTS '$REPL_USER'@'%' IDENTIFIED BY '$REPL_PASSWORD';
GRANT REPLICATION SLAVE ON *.* TO '$REPL_USER'@'%';
FLUSH PRIVILEGES;
EOF
# 配置到远程服务器的复制
mysql -h"$local_host" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << EOF
STOP SLAVE;
CHANGE MASTER TO
MASTER_HOST='$remote_host',
MASTER_PORT=$MYSQL_PORT,
MASTER_USER='$REPL_USER',
MASTER_PASSWORD='$REPL_PASSWORD',
MASTER_AUTO_POSITION=1;
START SLAVE;
EOF
log "$server_name复制配置完成"
}
# 检查复制状态
check_replication_status() {
local host=$1
local server_name=$2
log "检查$server_name复制状态"
mysql -h"$host" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << 'EOF'
SELECT
'Slave Status' as Status_Type,
Slave_IO_Running,
Slave_SQL_Running,
Seconds_Behind_Master,
Last_Error
FROM
(SELECT
IF(Slave_IO_Running='Yes', 'Running', 'Stopped') as Slave_IO_Running,
IF(Slave_SQL_Running='Yes', 'Running', 'Stopped') as Slave_SQL_Running,
Seconds_Behind_Master,
Last_Error
FROM information_schema.SLAVE_HOSTS
UNION ALL
SELECT 'N/A', 'N/A', NULL, 'No slave status available'
LIMIT 1
) as slave_info;
EOF
# 显示GTID状态
mysql -h"$host" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SELECT @@GLOBAL.gtid_executed as GTID_Executed;"
}
# 测试数据同步
test_data_sync() {
log "测试数据同步"
# 在服务器1创建测试数据
mysql -h"$SERVER1_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << EOF
CREATE DATABASE IF NOT EXISTS test_replication;
USE test_replication;
CREATE TABLE IF NOT EXISTS sync_test (
id INT AUTO_INCREMENT PRIMARY KEY,
server_source VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
data_value VARCHAR(100)
);
INSERT INTO sync_test (server_source, data_value) VALUES ('server1', 'test from server 1');
EOF
# 等待同步
sleep 3
# 在服务器2创建测试数据
mysql -h"$SERVER2_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << EOF
USE test_replication;
INSERT INTO sync_test (server_source, data_value) VALUES ('server2', 'test from server 2');
EOF
# 等待同步
sleep 3
# 检查两个服务器的数据
log "服务器1的数据:"
mysql -h"$SERVER1_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SELECT * FROM test_replication.sync_test ORDER BY id;"
log "服务器2的数据:"
mysql -h"$SERVER2_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SELECT * FROM test_replication.sync_test ORDER BY id;"
}
# 生成冲突检测脚本
generate_conflict_detection() {
cat > "/usr/local/bin/mysql_conflict_detector.sh" << 'EOF'
#!/bin/bash
# MySQL主主复制冲突检测脚本
SERVER1_HOST="192.168.1.100"
SERVER2_HOST="192.168.1.101"
MYSQL_USER="root"
MYSQL_PASSWORD="password"
# 检查复制错误
check_replication_errors() {
local host=$1
local server_name=$2
echo "检查$server_name复制错误:"
mysql -h"$host" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -sN << 'SQL'
SELECT
CONCAT('Last_Error: ', Last_Error) as Error_Info
FROM
(SHOW SLAVE STATUS) as slave_status
WHERE
Last_Error != '';
SQL
# 检查错误日志中的冲突
mysql -h"$host" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << 'SQL'
SELECT
'Duplicate Key Errors' as Error_Type,
COUNT(*) as Error_Count
FROM
performance_schema.events_errors_summary_global_by_error
WHERE
ERROR_NUMBER = 1062; -- Duplicate entry error
SELECT
'Key Not Found Errors' as Error_Type,
COUNT(*) as Error_Count
FROM
performance_schema.events_errors_summary_global_by_error
WHERE
ERROR_NUMBER = 1032; -- Can't find record error
SQL
}
# 检查自增ID冲突
check_auto_increment_conflicts() {
echo "检查自增ID配置:"
for host in "$SERVER1_HOST" "$SERVER2_HOST"; do
echo "服务器 $host:"
mysql -h"$host" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SHOW VARIABLES LIKE 'auto_increment_%';"
echo
done
}
# 检查GTID一致性
check_gtid_consistency() {
echo "检查GTID一致性:"
gtid1=$(mysql -h"$SERVER1_HOST" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -sN -e "SELECT @@GLOBAL.gtid_executed;")
gtid2=$(mysql -h"$SERVER2_HOST" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -sN -e "SELECT @@GLOBAL.gtid_executed;")
echo "服务器1 GTID: $gtid1"
echo "服务器2 GTID: $gtid2"
if [ "$gtid1" = "$gtid2" ]; then
echo "GTID一致性: 正常"
else
echo "GTID一致性: 异常 - 存在差异"
fi
}
# 主函数
main() {
echo "MySQL主主复制冲突检测 - $(date)"
echo "========================================"
check_replication_errors "$SERVER1_HOST" "服务器1"
echo
check_replication_errors "$SERVER2_HOST" "服务器2"
echo
check_auto_increment_conflicts
echo
check_gtid_consistency
echo
echo "检测完成"
}
main
EOF
chmod +x "/usr/local/bin/mysql_conflict_detector.sh"
log "冲突检测脚本已生成: /usr/local/bin/mysql_conflict_detector.sh"
}
# 主函数
main() {
case "${1:-help}" in
setup)
log "开始配置MySQL主主复制"
# 生成配置文件
log "为服务器1生成配置"
generate_master_master_config "1" "1"
log "为服务器2生成配置"
generate_master_master_config "2" "2"
echo "请将配置文件复制到对应服务器并重启MySQL服务"
echo "然后运行: $0 configure"
;;
configure)
log "配置双向复制"
# 配置服务器1到服务器2的复制
setup_server_replication "$SERVER1_HOST" "$SERVER2_HOST" "服务器1"
# 配置服务器2到服务器1的复制
setup_server_replication "$SERVER2_HOST" "$SERVER1_HOST" "服务器2"
log "主主复制配置完成"
;;
check)
check_replication_status "$SERVER1_HOST" "服务器1"
echo
check_replication_status "$SERVER2_HOST" "服务器2"
;;
test)
test_data_sync
;;
conflict)
generate_conflict_detection
;;
help|*)
echo "MySQL主主复制配置脚本"
echo
echo "用法: $0 [命令]"
echo
echo "命令:"
echo " setup - 生成配置文件"
echo " configure - 配置双向复制"
echo " check - 检查复制状态"
echo " test - 测试数据同步"
echo " conflict - 生成冲突检测脚本"
echo " help - 显示此帮助信息"
echo
echo "配置步骤:"
echo " 1. 修改脚本中的服务器IP和密码"
echo " 2. 运行 '$0 setup' 生成配置文件"
echo " 3. 将配置文件复制到对应服务器"
echo " 4. 重启两台MySQL服务器"
echo " 5. 运行 '$0 configure' 配置复制"
echo " 6. 运行 '$0 check' 检查状态"
echo " 7. 运行 '$0 test' 测试同步"
;;
esac
}
main "$@"
13.3.2 主主复制冲突处理
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL主主复制冲突处理工具
自动检测和处理主主复制中的数据冲突
"""
import mysql.connector
import logging
import time
import json
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
class ConflictType(Enum):
"""冲突类型"""
DUPLICATE_KEY = "duplicate_key"
MISSING_RECORD = "missing_record"
DATA_INCONSISTENCY = "data_inconsistency"
AUTO_INCREMENT_CONFLICT = "auto_increment_conflict"
@dataclass
class ConflictInfo:
"""冲突信息"""
conflict_type: ConflictType
server: str
database: str
table: str
error_message: str
error_code: int
timestamp: datetime
resolution_strategy: str = ""
resolved: bool = False
class MySQLMasterMasterConflictResolver:
"""MySQL主主复制冲突解决器"""
def __init__(self, server1_config: Dict, server2_config: Dict):
self.server1_config = server1_config
self.server2_config = server2_config
self.conflicts = []
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/mysql_conflict_resolver.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def connect_to_server(self, config: Dict) -> mysql.connector.MySQLConnection:
"""连接到MySQL服务器"""
try:
conn = mysql.connector.connect(**config)
return conn
except mysql.connector.Error as e:
self.logger.error(f"连接失败 {config['host']}: {e}")
raise
def get_slave_status(self, conn: mysql.connector.MySQLConnection) -> Dict:
"""获取从库状态"""
cursor = conn.cursor(dictionary=True)
cursor.execute("SHOW SLAVE STATUS")
result = cursor.fetchone()
cursor.close()
return result or {}
def detect_replication_errors(self) -> List[ConflictInfo]:
"""检测复制错误"""
conflicts = []
for server_name, config in [("server1", self.server1_config), ("server2", self.server2_config)]:
try:
conn = self.connect_to_server(config)
slave_status = self.get_slave_status(conn)
if slave_status and slave_status.get('Last_Error'):
error_message = slave_status['Last_Error']
error_code = slave_status.get('Last_Errno', 0)
# 分析错误类型
conflict_type = self._analyze_error_type(error_code, error_message)
conflict = ConflictInfo(
conflict_type=conflict_type,
server=server_name,
database=self._extract_database_from_error(error_message),
table=self._extract_table_from_error(error_message),
error_message=error_message,
error_code=error_code,
timestamp=datetime.now()
)
conflicts.append(conflict)
self.logger.warning(f"检测到冲突: {server_name} - {error_message}")
conn.close()
except Exception as e:
self.logger.error(f"检测 {server_name} 时出错: {e}")
return conflicts
def _analyze_error_type(self, error_code: int, error_message: str) -> ConflictType:
"""分析错误类型"""
if error_code == 1062 or "Duplicate entry" in error_message:
return ConflictType.DUPLICATE_KEY
elif error_code == 1032 or "Can't find record" in error_message:
return ConflictType.MISSING_RECORD
elif "auto_increment" in error_message.lower():
return ConflictType.AUTO_INCREMENT_CONFLICT
else:
return ConflictType.DATA_INCONSISTENCY
def _extract_database_from_error(self, error_message: str) -> str:
"""从错误消息中提取数据库名"""
# 简单的正则匹配,实际应用中可能需要更复杂的解析
import re
match = re.search(r"database '([^']+)'", error_message)
return match.group(1) if match else "unknown"
def _extract_table_from_error(self, error_message: str) -> str:
"""从错误消息中提取表名"""
import re
match = re.search(r"table '([^']+)'", error_message)
return match.group(1) if match else "unknown"
def resolve_duplicate_key_conflict(self, conflict: ConflictInfo) -> bool:
"""解决重复键冲突"""
try:
server_config = self.server1_config if conflict.server == "server1" else self.server2_config
conn = self.connect_to_server(server_config)
            # 策略1: 跳过导致错误的事务
            # 注意: 开启GTID时 sql_slave_skip_counter 不可用,需改为以 SET gtid_next 注入空事务的方式跳过
            cursor = conn.cursor()
            cursor.execute("STOP SLAVE")
            cursor.execute("SET GLOBAL sql_slave_skip_counter = 1")
            cursor.execute("START SLAVE")
cursor.close()
conflict.resolution_strategy = "跳过重复键错误"
conflict.resolved = True
self.logger.info(f"已解决重复键冲突: {conflict.server}")
conn.close()
return True
except Exception as e:
self.logger.error(f"解决重复键冲突失败: {e}")
return False
def resolve_missing_record_conflict(self, conflict: ConflictInfo) -> bool:
"""解决缺失记录冲突"""
try:
server_config = self.server1_config if conflict.server == "server1" else self.server2_config
conn = self.connect_to_server(server_config)
            # 策略1: 跳过导致错误的事务
            # 注意: 开启GTID时 sql_slave_skip_counter 不可用,需改为以 SET gtid_next 注入空事务的方式跳过
            cursor = conn.cursor()
            cursor.execute("STOP SLAVE")
            cursor.execute("SET GLOBAL sql_slave_skip_counter = 1")
            cursor.execute("START SLAVE")
cursor.close()
conflict.resolution_strategy = "跳过缺失记录错误"
conflict.resolved = True
self.logger.info(f"已解决缺失记录冲突: {conflict.server}")
conn.close()
return True
except Exception as e:
self.logger.error(f"解决缺失记录冲突失败: {e}")
return False
def resolve_auto_increment_conflict(self, conflict: ConflictInfo) -> bool:
"""解决自增ID冲突"""
try:
# 检查并修复自增配置
for i, (server_name, config) in enumerate([("server1", self.server1_config), ("server2", self.server2_config)], 1):
conn = self.connect_to_server(config)
cursor = conn.cursor()
# 设置自增参数
cursor.execute("SET GLOBAL auto_increment_increment = 2")
cursor.execute(f"SET GLOBAL auto_increment_offset = {i}")
cursor.close()
conn.close()
conflict.resolution_strategy = "修复自增ID配置"
conflict.resolved = True
self.logger.info("已解决自增ID冲突")
return True
except Exception as e:
self.logger.error(f"解决自增ID冲突失败: {e}")
return False
def resolve_conflict(self, conflict: ConflictInfo) -> bool:
"""解决冲突"""
self.logger.info(f"开始解决冲突: {conflict.conflict_type.value} on {conflict.server}")
if conflict.conflict_type == ConflictType.DUPLICATE_KEY:
return self.resolve_duplicate_key_conflict(conflict)
elif conflict.conflict_type == ConflictType.MISSING_RECORD:
return self.resolve_missing_record_conflict(conflict)
elif conflict.conflict_type == ConflictType.AUTO_INCREMENT_CONFLICT:
return self.resolve_auto_increment_conflict(conflict)
else:
self.logger.warning(f"未知冲突类型: {conflict.conflict_type}")
return False
def check_data_consistency(self) -> Dict:
"""检查数据一致性"""
consistency_report = {
'timestamp': datetime.now().isoformat(),
'databases': {},
'overall_consistent': True
}
try:
conn1 = self.connect_to_server(self.server1_config)
conn2 = self.connect_to_server(self.server2_config)
# 获取数据库列表
cursor1 = conn1.cursor()
cursor1.execute("SHOW DATABASES")
databases = [db[0] for db in cursor1.fetchall() if db[0] not in ['information_schema', 'performance_schema', 'mysql', 'sys']]
cursor1.close()
for db in databases:
db_report = self._check_database_consistency(conn1, conn2, db)
consistency_report['databases'][db] = db_report
if not db_report['consistent']:
consistency_report['overall_consistent'] = False
conn1.close()
conn2.close()
except Exception as e:
self.logger.error(f"检查数据一致性时出错: {e}")
consistency_report['error'] = str(e)
return consistency_report
def _check_database_consistency(self, conn1, conn2, database: str) -> Dict:
"""检查单个数据库的一致性"""
db_report = {
'consistent': True,
'tables': {},
'row_count_diff': 0
}
try:
# 获取表列表
cursor1 = conn1.cursor()
cursor1.execute(f"USE {database}")
cursor1.execute("SHOW TABLES")
tables = [table[0] for table in cursor1.fetchall()]
cursor1.close()
cursor2 = conn2.cursor()
cursor2.execute(f"USE {database}")
for table in tables:
table_report = self._check_table_consistency(conn1, conn2, database, table)
db_report['tables'][table] = table_report
if not table_report['consistent']:
db_report['consistent'] = False
db_report['row_count_diff'] += abs(table_report.get('row_count_diff', 0))
cursor2.close()
except Exception as e:
self.logger.error(f"检查数据库 {database} 一致性时出错: {e}")
db_report['error'] = str(e)
db_report['consistent'] = False
return db_report
def _check_table_consistency(self, conn1, conn2, database: str, table: str) -> Dict:
"""检查单个表的一致性"""
table_report = {
'consistent': True,
'row_count_server1': 0,
'row_count_server2': 0,
'row_count_diff': 0,
'checksum_server1': '',
'checksum_server2': ''
}
try:
# 检查行数
cursor1 = conn1.cursor()
cursor1.execute(f"SELECT COUNT(*) FROM {database}.{table}")
count1 = cursor1.fetchone()[0]
cursor1.close()
cursor2 = conn2.cursor()
cursor2.execute(f"SELECT COUNT(*) FROM {database}.{table}")
count2 = cursor2.fetchone()[0]
cursor2.close()
table_report['row_count_server1'] = count1
table_report['row_count_server2'] = count2
table_report['row_count_diff'] = count1 - count2
if count1 != count2:
table_report['consistent'] = False
# 计算校验和(如果行数相同)
if count1 == count2 and count1 > 0:
cursor1 = conn1.cursor()
cursor1.execute(f"CHECKSUM TABLE {database}.{table}")
checksum1 = cursor1.fetchone()[1]
cursor1.close()
cursor2 = conn2.cursor()
cursor2.execute(f"CHECKSUM TABLE {database}.{table}")
checksum2 = cursor2.fetchone()[1]
cursor2.close()
table_report['checksum_server1'] = str(checksum1)
table_report['checksum_server2'] = str(checksum2)
if checksum1 != checksum2:
table_report['consistent'] = False
except Exception as e:
self.logger.error(f"检查表 {database}.{table} 一致性时出错: {e}")
table_report['error'] = str(e)
table_report['consistent'] = False
return table_report
def generate_conflict_report(self) -> Dict:
"""生成冲突报告"""
report = {
'timestamp': datetime.now().isoformat(),
'total_conflicts': len(self.conflicts),
'resolved_conflicts': len([c for c in self.conflicts if c.resolved]),
'unresolved_conflicts': len([c for c in self.conflicts if not c.resolved]),
'conflicts_by_type': {},
'conflicts': []
}
# 按类型统计冲突
for conflict_type in ConflictType:
count = len([c for c in self.conflicts if c.conflict_type == conflict_type])
report['conflicts_by_type'][conflict_type.value] = count
# 冲突详情
for conflict in self.conflicts:
conflict_data = {
'type': conflict.conflict_type.value,
'server': conflict.server,
'database': conflict.database,
'table': conflict.table,
'error_message': conflict.error_message,
'error_code': conflict.error_code,
'timestamp': conflict.timestamp.isoformat(),
'resolution_strategy': conflict.resolution_strategy,
'resolved': conflict.resolved
}
report['conflicts'].append(conflict_data)
return report
def run_conflict_resolution(self, auto_resolve: bool = False) -> Dict:
"""运行冲突解决流程"""
self.logger.info("开始冲突检测和解决流程")
# 检测冲突
conflicts = self.detect_replication_errors()
self.conflicts.extend(conflicts)
if not conflicts:
self.logger.info("未检测到冲突")
return {'status': 'success', 'message': '未检测到冲突'}
self.logger.info(f"检测到 {len(conflicts)} 个冲突")
# 自动解决冲突
if auto_resolve:
resolved_count = 0
for conflict in conflicts:
if self.resolve_conflict(conflict):
resolved_count += 1
self.logger.info(f"已解决 {resolved_count}/{len(conflicts)} 个冲突")
# 生成报告
report = self.generate_conflict_report()
# 检查数据一致性
consistency_report = self.check_data_consistency()
report['data_consistency'] = consistency_report
return report
def export_report(self, report: Dict, filename: str = None):
"""导出报告"""
if not filename:
filename = f"mysql_conflict_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
self.logger.info(f"报告已导出到: {filename}")
# 使用示例
if __name__ == '__main__':
# 服务器配置
server1_config = {
'host': '192.168.1.100',
'port': 3306,
'user': 'root',
'password': 'password',
'autocommit': True
}
server2_config = {
'host': '192.168.1.101',
'port': 3306,
'user': 'root',
'password': 'password',
'autocommit': True
}
# 创建冲突解决器
resolver = MySQLMasterMasterConflictResolver(server1_config, server2_config)
# 运行冲突解决流程
report = resolver.run_conflict_resolution(auto_resolve=True)
# 导出报告
resolver.export_report(report)
print("冲突解决完成,详细报告已生成")
print(f"总冲突数: {report['total_conflicts']}")
print(f"已解决: {report['resolved_conflicts']}")
print(f"未解决: {report['unresolved_conflicts']}")
13.4 MySQL集群架构
13.4.1 MySQL Cluster (NDB) 配置
MySQL Cluster是基于NDB存储引擎的分布式集群解决方案,提供真正的无单点故障架构。
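NDB 把行数据和哈希索引保存在数据节点内存中,数据按节点组分片、组内按 NoOfReplicas 保持同步副本,因此部署前需要粗略估算每个数据节点的 DataMemory。下面是一个假设性的估算草图(overhead 系数为经验假设值),更精确的评估可以使用官方自带的 ndb_size.pl,或在部署后查询 ndbinfo.memoryusage 的实际观测。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 估算每个数据节点DataMemory需求的草图(假设性示例,overhead为经验值)
def estimate_data_memory_mb(rows: int, avg_row_bytes: int,
                            no_of_replicas: int = 2, data_nodes: int = 2,
                            overhead: float = 1.4) -> float:
    """粗略估算单个数据节点需要配置的DataMemory(MB)

    数据按 data_nodes / no_of_replicas 个节点组分片,组内每个节点
    保存本组分片的完整副本;overhead 粗略覆盖页头、哈希索引等开销。
    """
    node_groups = max(data_nodes // no_of_replicas, 1)
    total_bytes = rows * avg_row_bytes * overhead
    per_node_bytes = total_bytes / node_groups
    return per_node_bytes / (1024 * 1024)

if __name__ == '__main__':
    # 例:100万行、平均每行200字节、2个数据节点、双副本
    print(f"建议DataMemory约为 {estimate_data_memory_mb(1_000_000, 200):.0f} MB/节点")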
#!/bin/bash
# MySQL Cluster自动部署脚本
# 部署管理节点、数据节点和SQL节点
set -euo pipefail
# 集群配置
CLUSTER_NAME="mysql_cluster"
MANAGEMENT_NODE="192.168.1.100"
DATA_NODES=("192.168.1.101" "192.168.1.102")
SQL_NODES=("192.168.1.103" "192.168.1.104")
CLUSTER_USER="mysql"
CLUSTER_PASSWORD="cluster_password"
DATA_DIR="/var/lib/mysql-cluster"
LOG_DIR="/var/log/mysql-cluster"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}
# 错误处理
error_exit() {
log "错误: $1"
exit 1
}
# 检查SSH连接
check_ssh_connection() {
local host=$1
if ! ssh -o ConnectTimeout=5 "$host" "echo 'SSH连接正常'" >/dev/null 2>&1; then
error_exit "无法SSH连接到 $host"
fi
log "SSH连接到 $host 正常"
}
# 安装MySQL Cluster软件包
install_mysql_cluster() {
local host=$1
local node_type=$2
log "在 $host 安装MySQL Cluster ($node_type)"
ssh "$host" << EOF
# 更新系统
sudo apt-get update
# 安装MySQL Cluster
wget https://dev.mysql.com/get/Downloads/MySQL-Cluster-8.0/mysql-cluster-community-server_8.0.34-1ubuntu20.04_amd64.deb
wget https://dev.mysql.com/get/Downloads/MySQL-Cluster-8.0/mysql-cluster-community-client_8.0.34-1ubuntu20.04_amd64.deb
wget https://dev.mysql.com/get/Downloads/MySQL-Cluster-8.0/mysql-cluster-community-common_8.0.34-1ubuntu20.04_amd64.deb
sudo dpkg -i mysql-cluster-community-*.deb
sudo apt-get install -f
# 创建用户和目录
sudo useradd -r -s /bin/false mysql || true
sudo mkdir -p $DATA_DIR $LOG_DIR
sudo chown mysql:mysql $DATA_DIR $LOG_DIR
EOF
log "$host MySQL Cluster安装完成"
}
# 生成管理节点配置
generate_management_config() {
log "生成管理节点配置"
cat > "/tmp/config.ini" << EOF
[ndbd default]
NoOfReplicas=2
DataMemory=80M
IndexMemory=18M
ServerPort=2202
DataDir=$DATA_DIR
MaxNoOfConcurrentOperations=100000
MaxNoOfLocalOperations=110000
RedoBuffer=8M
LongMessageBuffer=1M
MaxNoOfTriggers=14000
MaxNoOfFiredTriggers=4000
TransactionBufferMemory=1M
DefaultHashMapSize=241
DefaultHashMapBuckets=240
MaxNoOfOrderedIndexes=512
MaxNoOfUniqueHashIndexes=64
MaxNoOfAttributes=1000
MaxNoOfTables=128
StringMemory=25
MaxNoOfConcurrentIndexOperations=8K
MaxNoOfConcurrentScans=256
MaxNoOfLocalScans=32
BatchSizePerLocalScan=256
LockPagesInMainMemory=1
TimeBetweenWatchDogCheck=6000
TimeBetweenWatchDogCheckInitial=6000
StartFailureTimeout=0
HeartbeatIntervalDbDb=5000
HeartbeatIntervalDbApi=1500
TimeBetweenLocalCheckpoints=20
TimeBetweenGlobalCheckpoints=2000
CompressedLCP=0
CompressedBackup=1
BackupMaxWriteSize=1M
BackupLogBufferSize=2M
BackupDataBufferSize=2M
MaxAllocate=32M
MemReportFrequency=30
BackupReportFrequency=10
DiskCheckpointSpeed=10M
DiskCheckpointSpeedInRestart=100M
ArbitrationTimeout=7500
ArbitrationDelay=0
StopOnError=true
RestartOnErrorInsert=2
MaxStartFailRetries=3
StartFailRetryDelay=0
[ndb_mgmd]
NodeId=1
Hostname=$MANAGEMENT_NODE
DataDir=$DATA_DIR
LogDestination=FILE:filename=ndb_mgmd.log,maxsize=1000000,maxfiles=6
ArbitrationRank=1
EOF
# 添加数据节点配置
local node_id=2
for data_node in "${DATA_NODES[@]}"; do
cat >> "/tmp/config.ini" << EOF
[ndbd]
Hostname=$data_node
NodeId=$node_id
DataDir=$DATA_DIR
EOF
((node_id++))
done
# 添加SQL节点配置
for sql_node in "${SQL_NODES[@]}"; do
cat >> "/tmp/config.ini" << EOF
[mysqld]
Hostname=$sql_node
NodeId=$node_id
EOF
((node_id++))
done
# 添加API节点配置(用于应用连接)
cat >> "/tmp/config.ini" << EOF
[mysqld]
[mysqld]
[mysqld]
[mysqld]
EOF
log "管理节点配置已生成"
}
# 配置管理节点
setup_management_node() {
log "配置管理节点: $MANAGEMENT_NODE"
# 检查连接
check_ssh_connection "$MANAGEMENT_NODE"
# 安装软件
install_mysql_cluster "$MANAGEMENT_NODE" "management"
# 复制配置文件
scp "/tmp/config.ini" "$MANAGEMENT_NODE:/var/lib/mysql-cluster/"
# 启动管理节点
ssh "$MANAGEMENT_NODE" << EOF
sudo ndb_mgmd -f /var/lib/mysql-cluster/config.ini --initial --configdir=/var/lib/mysql-cluster/
EOF
log "管理节点配置完成"
}
# 配置数据节点
setup_data_nodes() {
for data_node in "${DATA_NODES[@]}"; do
log "配置数据节点: $data_node"
# 检查连接
check_ssh_connection "$data_node"
# 安装软件
install_mysql_cluster "$data_node" "data"
# 生成数据节点配置
ssh "$data_node" << EOF
sudo mkdir -p /etc/mysql
cat > /tmp/my.cnf << 'MYCNF'
[mysql_cluster]
ndb-connectstring=$MANAGEMENT_NODE
[ndbd]
connect-string=$MANAGEMENT_NODE
MYCNF
sudo mv /tmp/my.cnf /etc/mysql/my.cnf
sudo chown mysql:mysql /etc/mysql/my.cnf
EOF
# 启动数据节点
ssh "$data_node" << EOF
sudo ndbd --initial
EOF
log "数据节点 $data_node 配置完成"
done
}
# 配置SQL节点
setup_sql_nodes() {
for sql_node in "${SQL_NODES[@]}"; do
log "配置SQL节点: $sql_node"
# 检查连接
check_ssh_connection "$sql_node"
# 安装软件
install_mysql_cluster "$sql_node" "sql"
# 生成SQL节点配置
ssh "$sql_node" << EOF
sudo mkdir -p /etc/mysql
cat > /tmp/my.cnf << 'MYCNF'
[mysqld]
user=mysql
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
port=3306
ndbcluster
ndb-connectstring=$MANAGEMENT_NODE
[mysql_cluster]
ndb-connectstring=$MANAGEMENT_NODE
[client]
port=3306
socket=/var/lib/mysql/mysql.sock
MYCNF
sudo mv /tmp/my.cnf /etc/mysql/my.cnf
sudo chown mysql:mysql /etc/mysql/my.cnf
# 初始化MySQL数据目录
sudo mysqld --initialize-insecure --user=mysql --datadir=/var/lib/mysql
# 启动MySQL服务
sudo mysqld_safe --user=mysql &
EOF
# 等待MySQL启动
sleep 10
# 配置MySQL用户
ssh "$sql_node" << EOF
mysql -u root << 'SQL'
CREATE USER '$CLUSTER_USER'@'%' IDENTIFIED BY '$CLUSTER_PASSWORD';
GRANT ALL PRIVILEGES ON *.* TO '$CLUSTER_USER'@'%' WITH GRANT OPTION;
FLUSH PRIVILEGES;
SQL
EOF
log "SQL节点 $sql_node 配置完成"
done
}
# 检查集群状态
check_cluster_status() {
log "检查集群状态"
ssh "$MANAGEMENT_NODE" << 'EOF'
echo "集群状态:"
ndb_mgm -e "SHOW"
echo -e "\n节点状态:"
ndb_mgm -e "ALL STATUS"
echo -e "\n集群报告:"
ndb_mgm -e "ALL REPORT MEMORY"
EOF
}
# 创建测试数据库和表
create_test_database() {
log "创建测试数据库"
local sql_node="${SQL_NODES[0]}"
ssh "$sql_node" << EOF
mysql -u$CLUSTER_USER -p$CLUSTER_PASSWORD << 'SQL'
CREATE DATABASE IF NOT EXISTS cluster_test;
USE cluster_test;
CREATE TABLE test_table (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=NDB;
INSERT INTO test_table (name) VALUES
('Test Record 1'),
('Test Record 2'),
('Test Record 3');
SELECT * FROM test_table;
SQL
EOF
log "测试数据库创建完成"
}
# 生成集群监控脚本
generate_monitoring_script() {
log "生成集群监控脚本"
cat > "/tmp/mysql_cluster_monitor.sh" << 'EOF'
#!/bin/bash
# MySQL Cluster监控脚本
MANAGEMENT_NODE="192.168.1.100"
ALERT_EMAIL="admin@example.com"
LOG_FILE="/var/log/mysql_cluster_monitor.log"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
# 检查管理节点
check_management_node() {
if ! ssh "$MANAGEMENT_NODE" "pgrep ndb_mgmd" >/dev/null 2>&1; then
log "ALERT: 管理节点离线"
send_alert "MySQL Cluster管理节点离线" "管理节点 $MANAGEMENT_NODE 无法访问"
return 1
fi
return 0
}
# 检查数据节点
check_data_nodes() {
local failed_nodes=()
local status_output=$(ssh "$MANAGEMENT_NODE" "ndb_mgm -e 'SHOW'" 2>/dev/null)
if echo "$status_output" | grep -q "not connected"; then
local failed=$(echo "$status_output" | grep "not connected" | grep -o 'id=[0-9]*' | tr '\n' ' ')
failed_nodes+=("$failed")
fi
if [ ${#failed_nodes[@]} -gt 0 ]; then
log "ALERT: 数据节点故障: ${failed_nodes[*]}"
send_alert "MySQL Cluster数据节点故障" "故障节点: ${failed_nodes[*]}"
return 1
fi
return 0
}
# 检查SQL节点
check_sql_nodes() {
local sql_nodes=("192.168.1.103" "192.168.1.104")
local failed_nodes=()
for node in "${sql_nodes[@]}"; do
if ! ssh "$node" "mysqladmin ping" >/dev/null 2>&1; then
failed_nodes+=("$node")
fi
done
if [ ${#failed_nodes[@]} -gt 0 ]; then
log "ALERT: SQL节点故障: ${failed_nodes[*]}"
send_alert "MySQL Cluster SQL节点故障" "故障节点: ${failed_nodes[*]}"
return 1
fi
return 0
}
# 检查集群性能
check_cluster_performance() {
local memory_usage=$(ssh "$MANAGEMENT_NODE" "ndb_mgm -e 'ALL REPORT MEMORY'" 2>/dev/null | grep "Data usage is" | grep -oE '[0-9]+%' | head -1 | tr -d '%')
if [ -n "$memory_usage" ] && [ "$memory_usage" -gt 80 ]; then
log "WARNING: 数据内存使用率过高: ${memory_usage}%"
send_alert "MySQL Cluster内存告警" "数据内存使用率: ${memory_usage}%"
fi
}
# 发送告警
send_alert() {
local subject="$1"
local message="$2"
echo "$message" | mail -s "$subject" "$ALERT_EMAIL" 2>/dev/null || {
log "WARNING: 无法发送邮件告警"
logger "MySQL Cluster Alert: $subject - $message"
}
}
# 生成状态报告
generate_status_report() {
local report_file="/tmp/cluster_status_$(date +%Y%m%d_%H%M%S).txt"
{
echo "MySQL Cluster状态报告 - $(date)"
echo "========================================"
echo
echo "集群概览:"
ssh "$MANAGEMENT_NODE" "ndb_mgm -e 'SHOW'"
echo
echo "节点状态:"
ssh "$MANAGEMENT_NODE" "ndb_mgm -e 'ALL STATUS'"
echo
echo "内存使用:"
ssh "$MANAGEMENT_NODE" "ndb_mgm -e 'ALL REPORT MEMORY'"
echo
echo "备份状态:"
ssh "$MANAGEMENT_NODE" "ndb_mgm -e 'ALL REPORT BACKUP'"
} > "$report_file"
log "状态报告已生成: $report_file"
}
# 主函数
main() {
case "${1:-check}" in
check)
log "开始集群健康检查"
local errors=0
check_management_node || ((errors++))
check_data_nodes || ((errors++))
check_sql_nodes || ((errors++))
check_cluster_performance
if [ $errors -eq 0 ]; then
log "集群状态正常"
else
log "检测到 $errors 个问题"
fi
;;
report)
generate_status_report
;;
monitor)
while true; do
main check
sleep 60
done
;;
*)
echo "用法: $0 {check|report|monitor}"
echo " check - 检查集群状态"
echo " report - 生成状态报告"
echo " monitor - 持续监控"
exit 1
;;
esac
}
main "$@"
EOF
chmod +x "/tmp/mysql_cluster_monitor.sh"
# 复制到管理节点
scp "/tmp/mysql_cluster_monitor.sh" "$MANAGEMENT_NODE:/usr/local/bin/"
log "集群监控脚本已生成并部署"
}
# 主函数
main() {
case "${1:-help}" in
install)
log "开始MySQL Cluster部署"
# 生成配置
generate_management_config
# 配置各节点
setup_management_node
setup_data_nodes
setup_sql_nodes
# 检查状态
sleep 10
check_cluster_status
# 创建测试数据
create_test_database
# 生成监控脚本
generate_monitoring_script
log "MySQL Cluster部署完成"
;;
status)
check_cluster_status
;;
test)
create_test_database
;;
monitor)
generate_monitoring_script
;;
help|*)
echo "MySQL Cluster部署脚本"
echo
echo "用法: $0 [命令]"
echo
echo "命令:"
echo " install - 完整部署集群"
echo " status - 检查集群状态"
echo " test - 创建测试数据"
echo " monitor - 生成监控脚本"
echo " help - 显示此帮助信息"
echo
echo "配置说明:"
echo " 管理节点: $MANAGEMENT_NODE"
echo " 数据节点: ${DATA_NODES[*]}"
echo " SQL节点: ${SQL_NODES[*]}"
echo
echo "部署前请确保:"
echo " 1. 所有节点已配置SSH免密登录"
echo " 2. 所有节点防火墙已开放相关端口"
echo " 3. 所有节点时间已同步"
;;
esac
}
main "$@"
13.4.2 MySQL Cluster管理
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL Cluster管理工具
提供集群监控、备份、故障恢复等功能
"""
import subprocess
import json
import time
import logging
import re
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
class NodeType(Enum):
"""节点类型"""
MANAGEMENT = "mgmd"
DATA = "ndbd"
SQL = "mysqld"
class NodeStatus(Enum):
"""节点状态"""
CONNECTED = "connected"
NOT_CONNECTED = "not connected"
STARTING = "starting"
SHUTTING_DOWN = "shutting down"
RESTARTING = "restarting"
@dataclass
class ClusterNode:
"""集群节点信息"""
node_id: int
node_type: NodeType
hostname: str
status: NodeStatus
version: str = ""
group: int = 0
master: bool = False
class MySQLClusterManager:
"""MySQL Cluster管理器"""
def __init__(self, management_host: str = "localhost", management_port: int = 1186):
self.management_host = management_host
self.management_port = management_port
self.connect_string = f"{management_host}:{management_port}"
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/mysql_cluster_manager.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def execute_mgm_command(self, command: str) -> str:
"""执行管理命令"""
try:
cmd = ["ndb_mgm", "-c", self.connect_string, "-e", command]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise Exception(f"命令执行失败: {result.stderr}")
return result.stdout.strip()
except subprocess.TimeoutExpired:
raise Exception("命令执行超时")
except Exception as e:
self.logger.error(f"执行管理命令失败: {e}")
raise
def get_cluster_status(self) -> Dict:
"""获取集群状态"""
try:
output = self.execute_mgm_command("SHOW")
nodes = self._parse_cluster_status(output)
status = {
'timestamp': datetime.now().isoformat(),
'total_nodes': len(nodes),
'connected_nodes': len([n for n in nodes if n.status == NodeStatus.CONNECTED]),
'nodes': []
}
for node in nodes:
node_info = {
'node_id': node.node_id,
'type': node.node_type.value,
'hostname': node.hostname,
'status': node.status.value,
'version': node.version,
'group': node.group,
'master': node.master
}
status['nodes'].append(node_info)
return status
except Exception as e:
self.logger.error(f"获取集群状态失败: {e}")
return {'error': str(e)}
    def _parse_cluster_status(self, output: str) -> List[ClusterNode]:
        """解析 ndb_mgm -e SHOW 的输出"""
        nodes: List[ClusterNode] = []
        current_type: Optional[NodeType] = None
        for raw_line in output.split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            # 分组标题形如 [ndbd(NDB)]、[ndb_mgmd(MGM)]、[mysqld(API)]
            if line.startswith('['):
                lowered = line.lower()
                if 'mgm' in lowered:
                    current_type = NodeType.MANAGEMENT
                elif 'ndbd' in lowered:
                    current_type = NodeType.DATA
                else:
                    current_type = NodeType.SQL
                continue
            # 节点行形如 "id=2 @192.168.1.101 (mysql-8.0.34 ndb-8.0.34, Nodegroup: 0, *)"
            # 未连接时形如 "id=4 (not connected, accepting connect from any host)"
            match = re.match(r"id=(\d+)(?:\s+@(\S+))?\s*\((.*)\)", line)
            if not match or current_type is None:
                continue
            detail = match.group(3).lower()
            if 'not connected' in detail:
                status = NodeStatus.NOT_CONNECTED
            elif 'shutting down' in detail:
                status = NodeStatus.SHUTTING_DOWN
            elif 'starting' in detail:
                status = NodeStatus.STARTING
            else:
                status = NodeStatus.CONNECTED
            nodes.append(ClusterNode(
                node_id=int(match.group(1)),
                node_type=current_type,
                hostname=match.group(2) or "unknown",
                status=status
            ))
        return nodes
def get_node_status(self, node_id: int) -> Dict:
"""获取特定节点状态"""
try:
output = self.execute_mgm_command(f"{node_id} STATUS")
status = {
'node_id': node_id,
'timestamp': datetime.now().isoformat(),
'details': output
}
return status
except Exception as e:
self.logger.error(f"获取节点 {node_id} 状态失败: {e}")
return {'error': str(e)}
    def get_memory_usage(self) -> Dict:
        """获取内存使用情况"""
        try:
            output = self.execute_mgm_command("ALL REPORT MEMORY")
            memory_info = {
                'timestamp': datetime.now().isoformat(),
                'nodes': []
            }
            # 输出形如 "Node 2: Data usage is 5%(177 32K pages of total 3200)"
            pattern = re.compile(
                r"Node\s+(\d+):\s+(Data|Index) usage is\s+(\d+)%\((\d+)\s+\S+\s+pages of total\s+(\d+)\)"
            )
            nodes_by_id: Dict[int, Dict] = {}
            for line in output.split('\n'):
                match = pattern.search(line.strip())
                if not match:
                    continue
                node_id = int(match.group(1))
                kind = 'data_memory' if match.group(2) == 'Data' else 'index_memory'
                node = nodes_by_id.setdefault(
                    node_id,
                    {'node_id': node_id, 'data_memory': {}, 'index_memory': {}}
                )
                node[kind] = {
                    'used_pages': int(match.group(4)),
                    'total_pages': int(match.group(5)),
                    'usage_percent': int(match.group(3))
                }
            memory_info['nodes'] = list(nodes_by_id.values())
            return memory_info
        except Exception as e:
            self.logger.error(f"获取内存使用情况失败: {e}")
            return {'error': str(e)}
def start_backup(self, backup_id: Optional[int] = None, wait: bool = True) -> Dict:
"""启动集群备份"""
try:
if backup_id:
command = f"START BACKUP {backup_id}"
else:
command = "START BACKUP"
if wait:
command += " WAIT COMPLETED"
output = self.execute_mgm_command(command)
backup_info = {
'timestamp': datetime.now().isoformat(),
'command': command,
'output': output,
'status': 'started' if not wait else 'completed'
}
# 解析备份ID
for line in output.split('\n'):
if 'Backup' in line and 'started' in line:
parts = line.split()
for i, part in enumerate(parts):
if part == 'Backup' and i + 1 < len(parts):
backup_info['backup_id'] = int(parts[i + 1])
break
self.logger.info(f"备份已启动: {backup_info.get('backup_id', 'unknown')}")
return backup_info
except Exception as e:
self.logger.error(f"启动备份失败: {e}")
return {'error': str(e)}
def get_backup_status(self) -> Dict:
"""获取备份状态"""
try:
output = self.execute_mgm_command("ALL REPORT BACKUP")
backup_status = {
'timestamp': datetime.now().isoformat(),
'details': output
}
return backup_status
except Exception as e:
self.logger.error(f"获取备份状态失败: {e}")
return {'error': str(e)}
def restart_node(self, node_id: int, initial: bool = False) -> Dict:
"""重启节点"""
try:
command = f"{node_id} RESTART"
if initial:
command += " -I"
output = self.execute_mgm_command(command)
restart_info = {
'node_id': node_id,
'timestamp': datetime.now().isoformat(),
'initial': initial,
'output': output
}
self.logger.info(f"节点 {node_id} 重启命令已发送")
return restart_info
except Exception as e:
self.logger.error(f"重启节点 {node_id} 失败: {e}")
return {'error': str(e)}
def stop_node(self, node_id: int, abort: bool = False) -> Dict:
"""停止节点"""
try:
command = f"{node_id} STOP"
if abort:
command += " -A"
output = self.execute_mgm_command(command)
stop_info = {
'node_id': node_id,
'timestamp': datetime.now().isoformat(),
'abort': abort,
'output': output
}
self.logger.info(f"节点 {node_id} 停止命令已发送")
return stop_info
except Exception as e:
self.logger.error(f"停止节点 {node_id} 失败: {e}")
return {'error': str(e)}
def rolling_restart(self, node_type: Optional[NodeType] = None) -> Dict:
"""滚动重启"""
try:
self.logger.info("开始滚动重启")
# 获取当前集群状态
cluster_status = self.get_cluster_status()
if 'error' in cluster_status:
return cluster_status
restart_results = []
# 过滤需要重启的节点
nodes_to_restart = []
for node in cluster_status['nodes']:
if node_type is None or NodeType(node['type']) == node_type:
if node['status'] == NodeStatus.CONNECTED.value:
nodes_to_restart.append(node)
# 逐个重启节点
for node in nodes_to_restart:
node_id = node['node_id']
self.logger.info(f"重启节点 {node_id}")
# 重启节点
restart_result = self.restart_node(node_id)
restart_results.append(restart_result)
if 'error' in restart_result:
self.logger.error(f"节点 {node_id} 重启失败")
continue
# 等待节点重新连接
self.logger.info(f"等待节点 {node_id} 重新连接")
max_wait = 300 # 5分钟
wait_time = 0
while wait_time < max_wait:
time.sleep(10)
wait_time += 10
node_status = self.get_node_status(node_id)
if 'error' not in node_status:
self.logger.info(f"节点 {node_id} 已重新连接")
break
else:
self.logger.warning(f"节点 {node_id} 重启超时")
rolling_restart_info = {
'timestamp': datetime.now().isoformat(),
'node_type': node_type.value if node_type else 'all',
'total_nodes': len(nodes_to_restart),
'restart_results': restart_results
}
self.logger.info("滚动重启完成")
return rolling_restart_info
except Exception as e:
self.logger.error(f"滚动重启失败: {e}")
return {'error': str(e)}
def generate_health_report(self) -> Dict:
"""生成健康报告"""
report = {
'timestamp': datetime.now().isoformat(),
'cluster_status': self.get_cluster_status(),
'memory_usage': self.get_memory_usage(),
'backup_status': self.get_backup_status(),
'health_score': 0,
'recommendations': []
}
# 计算健康分数
health_score = 100
# 检查节点连接状态
cluster_status = report['cluster_status']
if 'error' not in cluster_status:
total_nodes = cluster_status['total_nodes']
connected_nodes = cluster_status['connected_nodes']
if total_nodes > 0:
connection_ratio = connected_nodes / total_nodes
if connection_ratio < 1.0:
health_score -= (1.0 - connection_ratio) * 50
report['recommendations'].append(f"有 {total_nodes - connected_nodes} 个节点未连接")
# 检查内存使用
memory_usage = report['memory_usage']
if 'error' not in memory_usage:
for node in memory_usage['nodes']:
data_usage = node.get('data_memory', {}).get('usage_percent', 0)
index_usage = node.get('index_memory', {}).get('usage_percent', 0)
if data_usage > 80:
health_score -= 10
report['recommendations'].append(f"节点 {node['node_id']} 数据内存使用率过高: {data_usage}%")
if index_usage > 80:
health_score -= 10
report['recommendations'].append(f"节点 {node['node_id']} 索引内存使用率过高: {index_usage}%")
report['health_score'] = max(0, health_score)
return report
def export_report(self, report: Dict, filename: str = None):
"""导出报告"""
if not filename:
filename = f"mysql_cluster_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
self.logger.info(f"报告已导出到: {filename}")
# 使用示例
if __name__ == '__main__':
# 创建集群管理器
manager = MySQLClusterManager("192.168.1.100")
# 获取集群状态
status = manager.get_cluster_status()
print("集群状态:")
print(json.dumps(status, ensure_ascii=False, indent=2))
# 获取内存使用情况
memory = manager.get_memory_usage()
print("\n内存使用:")
print(json.dumps(memory, ensure_ascii=False, indent=2))
# 启动备份
backup = manager.start_backup(wait=False)
print("\n备份状态:")
print(json.dumps(backup, ensure_ascii=False, indent=2))
# 生成健康报告
health_report = manager.generate_health_report()
print("\n健康报告:")
print(json.dumps(health_report, ensure_ascii=False, indent=2))
# 导出报告
manager.export_report(health_report)
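上面的 __main__ 示例只演示了状态查询、内存报告和备份,rolling_restart 方法也可以单独调用,按节点类型滚动重启集群,例如调整数据节点参数后只滚动重启 ndbd 节点而不中断服务。下面是一个简短的用法示例(假设把上述代码保存为 mysql_cluster_manager.py 模块)。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 仅滚动重启数据节点的用法示例(假设上述代码已保存为 mysql_cluster_manager.py)
import json
from mysql_cluster_manager import MySQLClusterManager, NodeType

manager = MySQLClusterManager("192.168.1.100")
result = manager.rolling_restart(node_type=NodeType.DATA)   # 逐个重启ndbd节点并等待重连
print(json.dumps(result, ensure_ascii=False, indent=2))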
13.5 代理层高可用
13.5.1 ProxySQL配置与管理
ProxySQL是一个高性能的MySQL代理,提供连接池、读写分离、故障转移等功能。
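ProxySQL 采用分层配置:服务器、用户和查询规则先写入管理接口(默认端口 6032)里的配置表,执行 LOAD ... TO RUNTIME 后才会生效,再用 SAVE ... TO DISK 持久化。在阅读下面的完整部署脚本之前,可以通过一个假设性的小例子先了解这套机制(假设使用 mysql-connector-python,admin 账号口令与下文脚本一致;若连接器与管理接口不兼容,也可以改用 mysql 命令行执行同样的语句)。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 通过ProxySQL管理接口(6032)在运行时追加查询规则的草图(假设性示例)
import mysql.connector

ADMIN = {'host': '127.0.0.1', 'port': 6032,
         'user': 'admin', 'password': 'admin_password'}

STATEMENTS = [
    # 先写入内存中的配置表,此时规则尚未生效
    "INSERT INTO mysql_query_rules "
    "(rule_id, active, match_pattern, destination_hostgroup, apply) "
    "VALUES (10, 1, '^SELECT', 1, 1)",
    # 加载到运行时立即生效,并持久化到磁盘
    "LOAD MYSQL QUERY RULES TO RUNTIME",
    "SAVE MYSQL QUERY RULES TO DISK",
]

if __name__ == '__main__':
    conn = mysql.connector.connect(**ADMIN)
    cursor = conn.cursor()
    for stmt in STATEMENTS:
        cursor.execute(stmt)
    cursor.close()
    conn.close()
    print("查询规则已写入并加载到运行时")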
#!/bin/bash
# ProxySQL自动部署和配置脚本
set -euo pipefail
# 配置参数
PROXYSQL_VERSION="2.5.5"
PROXYSQL_ADMIN_USER="admin"
PROXYSQL_ADMIN_PASSWORD="admin_password"
PROXYSQL_MONITOR_USER="monitor"
PROXYSQL_MONITOR_PASSWORD="monitor_password"
PROXYSQL_PORT="6033"
PROXYSQL_ADMIN_PORT="6032"
# MySQL服务器配置
MYSQL_MASTER="192.168.1.100:3306"
MYSQL_SLAVES=("192.168.1.101:3306" "192.168.1.102:3306")
MYSQL_USER="proxysql_user"
MYSQL_PASSWORD="proxysql_password"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}
# 错误处理
error_exit() {
log "错误: $1"
exit 1
}
# 安装ProxySQL
install_proxysql() {
log "安装ProxySQL $PROXYSQL_VERSION"
# 下载并安装ProxySQL
if ! command -v proxysql &> /dev/null; then
# Ubuntu/Debian
if command -v apt-get &> /dev/null; then
wget -O - 'https://repo.proxysql.com/ProxySQL/repo_pub_key' | apt-key add -
echo deb https://repo.proxysql.com/ProxySQL/proxysql-2.5.x/$(lsb_release -sc)/ ./ | tee /etc/apt/sources.list.d/proxysql.list
apt-get update
apt-get install -y proxysql=$PROXYSQL_VERSION
# CentOS/RHEL
elif command -v yum &> /dev/null; then
cat > /etc/yum.repos.d/proxysql.repo << EOF
[proxysql_repo]
name= ProxySQL YUM repository
baseurl=https://repo.proxysql.com/ProxySQL/proxysql-2.5.x/centos/\$releasever
gpgcheck=1
gpgkey=https://repo.proxysql.com/ProxySQL/repo_pub_key
EOF
yum install -y proxysql-$PROXYSQL_VERSION
else
error_exit "不支持的操作系统"
fi
fi
log "ProxySQL安装完成"
}
# 生成ProxySQL配置文件
generate_proxysql_config() {
log "生成ProxySQL配置文件"
cat > "/etc/proxysql.cnf" << EOF
datadir="/var/lib/proxysql"
errorlog="/var/lib/proxysql/proxysql.log"
admin_variables=
{
admin_credentials="$PROXYSQL_ADMIN_USER:$PROXYSQL_ADMIN_PASSWORD"
mysql_ifaces="0.0.0.0:$PROXYSQL_ADMIN_PORT"
refresh_interval=2000
debug=false
}
mysql_variables=
{
threads=4
max_connections=2048
default_query_delay=0
default_query_timeout=36000000
have_compress=true
poll_timeout=2000
interfaces="0.0.0.0:$PROXYSQL_PORT"
default_schema="information_schema"
stacksize=1048576
server_version="8.0.25"
connect_timeout_server=3000
monitor_username="$PROXYSQL_MONITOR_USER"
monitor_password="$PROXYSQL_MONITOR_PASSWORD"
monitor_history=600000
monitor_connect_interval=60000
monitor_ping_interval=10000
monitor_read_only_interval=1500
monitor_read_only_timeout=500
ping_interval_server_msec=120000
ping_timeout_server=500
commands_stats=true
sessions_sort=true
connect_retries_on_failure=10
}
mysql_servers =
(
EOF
# 添加主服务器
local master_host=$(echo $MYSQL_MASTER | cut -d':' -f1)
local master_port=$(echo $MYSQL_MASTER | cut -d':' -f2)
cat >> "/etc/proxysql.cnf" << EOF
{
address="$master_host"
port=$master_port
hostgroup=0
status="ONLINE"
weight=1000
compression=0
max_replication_lag=10
use_ssl=0
max_latency_ms=0
comment="Master Server"
},
EOF
# 添加从服务器
local hostgroup_id=1
for slave in "${MYSQL_SLAVES[@]}"; do
local slave_host=$(echo $slave | cut -d':' -f1)
local slave_port=$(echo $slave | cut -d':' -f2)
cat >> "/etc/proxysql.cnf" << EOF
{
address="$slave_host"
port=$slave_port
hostgroup=$hostgroup_id
status="ONLINE"
weight=900
compression=0
max_replication_lag=10
use_ssl=0
max_latency_ms=0
comment="Slave Server $hostgroup_id"
},
EOF
((hostgroup_id++))
done
# 移除最后的逗号并关闭配置
sed -i '$ s/,$//' "/etc/proxysql.cnf"
cat >> "/etc/proxysql.cnf" << EOF
)
mysql_users:
(
{
username = "$MYSQL_USER"
password = "$MYSQL_PASSWORD"
default_hostgroup = 0
max_connections=1000
default_schema="test"
active = 1
}
)
mysql_query_rules:
(
{
rule_id=1
active=1
match_pattern="^SELECT.*FOR UPDATE$"
destination_hostgroup=0
apply=1
comment="Send SELECT FOR UPDATE to master"
},
{
rule_id=2
active=1
match_pattern="^SELECT"
destination_hostgroup=1
apply=1
comment="Send SELECT to slaves"
}
)
scheduler=
(
{
id=1
active=1
interval_ms=10000
filename="/var/lib/proxysql/proxysql_galera_checker.sh"
arg1="0"
arg2="0"
arg3="0"
arg4="1"
arg5="/var/lib/proxysql/proxysql_galera_checker.log"
comment="Galera checker"
}
)
EOF
log "ProxySQL配置文件已生成"
}
# 配置MySQL用户权限
setup_mysql_users() {
log "配置MySQL用户权限"
# 在主服务器上创建用户
local master_host=$(echo $MYSQL_MASTER | cut -d':' -f1)
local master_port=$(echo $MYSQL_MASTER | cut -d':' -f2)
mysql -h"$master_host" -P"$master_port" -uroot -p << EOF
-- 创建ProxySQL用户
CREATE USER IF NOT EXISTS '$MYSQL_USER'@'%' IDENTIFIED BY '$MYSQL_PASSWORD';
GRANT ALL PRIVILEGES ON *.* TO '$MYSQL_USER'@'%';
-- 创建监控用户
CREATE USER IF NOT EXISTS '$PROXYSQL_MONITOR_USER'@'%' IDENTIFIED BY '$PROXYSQL_MONITOR_PASSWORD';
GRANT REPLICATION CLIENT ON *.* TO '$PROXYSQL_MONITOR_USER'@'%';
GRANT SELECT ON performance_schema.* TO '$PROXYSQL_MONITOR_USER'@'%';
FLUSH PRIVILEGES;
EOF
log "MySQL用户权限配置完成"
}
# 启动ProxySQL服务
start_proxysql() {
log "启动ProxySQL服务"
# 创建数据目录
mkdir -p /var/lib/proxysql
chown proxysql:proxysql /var/lib/proxysql
# 启动服务
systemctl enable proxysql
systemctl start proxysql
# 等待服务启动
sleep 5
# 检查服务状态
if systemctl is-active --quiet proxysql; then
log "ProxySQL服务启动成功"
else
error_exit "ProxySQL服务启动失败"
fi
}
# 配置ProxySQL规则
configure_proxysql_rules() {
log "配置ProxySQL规则"
# 连接到ProxySQL管理接口
mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD << 'EOF'
-- 加载配置到运行时
LOAD MYSQL SERVERS TO RUNTIME;
LOAD MYSQL USERS TO RUNTIME;
LOAD MYSQL QUERY RULES TO RUNTIME;
LOAD SCHEDULER TO RUNTIME;
-- 保存配置到磁盘
SAVE MYSQL SERVERS TO DISK;
SAVE MYSQL USERS TO DISK;
SAVE MYSQL QUERY RULES TO DISK;
SAVE SCHEDULER TO DISK;
-- 显示配置状态
SELECT * FROM mysql_servers;
SELECT * FROM mysql_users;
SELECT * FROM mysql_query_rules;
EOF
log "ProxySQL规则配置完成"
}
# 生成健康检查脚本
generate_health_check() {
log "生成健康检查脚本"
cat > "/usr/local/bin/proxysql_health_check.sh" << 'EOF'
#!/bin/bash
# ProxySQL健康检查脚本
PROXYSQL_ADMIN_PORT="6032"
PROXYSQL_ADMIN_USER="admin"
PROXYSQL_ADMIN_PASSWORD="admin_password"
LOG_FILE="/var/log/proxysql_health.log"
ALERT_EMAIL="admin@example.com"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
# 检查ProxySQL服务状态
check_proxysql_service() {
if ! systemctl is-active --quiet proxysql; then
log "ALERT: ProxySQL服务未运行"
return 1
fi
return 0
}
# 检查MySQL服务器连接
check_mysql_servers() {
local failed_servers=()
# 获取服务器列表
local servers=$(mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT hostname,port,status FROM mysql_servers;" --skip-column-names 2>/dev/null)
while IFS=$'\t' read -r hostname port status; do
if [ "$status" != "ONLINE" ]; then
failed_servers+=("$hostname:$port")
fi
done <<< "$servers"
if [ ${#failed_servers[@]} -gt 0 ]; then
log "ALERT: MySQL服务器离线: ${failed_servers[*]}"
return 1
fi
return 0
}
# 检查连接池状态
check_connection_pool() {
local pool_info=$(mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT hostgroup,srv_host,srv_port,status,ConnUsed,ConnFree,ConnOK,ConnERR FROM stats_mysql_connection_pool;" --skip-column-names 2>/dev/null)
local high_error_count=0
while IFS=$'\t' read -r hostgroup srv_host srv_port status conn_used conn_free conn_ok conn_err; do
if [ "$conn_err" -gt 10 ]; then
log "WARNING: 服务器 $srv_host:$srv_port 连接错误数过高: $conn_err"
((high_error_count++))
fi
done <<< "$pool_info"
if [ $high_error_count -gt 0 ]; then
return 1
fi
return 0
}
# 检查查询性能
check_query_performance() {
local slow_queries=$(mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT COUNT(*) FROM stats_mysql_query_digest WHERE sum_time/count_star > 1000;" --skip-column-names 2>/dev/null)
if [ "$slow_queries" -gt 5 ]; then
log "WARNING: 检测到 $slow_queries 个慢查询"
return 1
fi
return 0
}
# 发送告警
send_alert() {
local subject="$1"
local message="$2"
echo "$message" | mail -s "$subject" "$ALERT_EMAIL" 2>/dev/null || {
log "WARNING: 无法发送邮件告警"
logger "ProxySQL Alert: $subject - $message"
}
}
# 生成状态报告
generate_status_report() {
local report_file="/tmp/proxysql_status_$(date +%Y%m%d_%H%M%S).txt"
{
echo "ProxySQL状态报告 - $(date)"
echo "======================================"
echo
echo "服务器状态:"
mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT * FROM mysql_servers;"
echo
echo "连接池状态:"
mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT * FROM stats_mysql_connection_pool;"
echo
echo "查询统计:"
mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT schemaname,username,digest_text,count_star,sum_time,min_time,max_time FROM stats_mysql_query_digest ORDER BY sum_time DESC LIMIT 10;"
} > "$report_file"
log "状态报告已生成: $report_file"
}
# 主函数
main() {
case "${1:-check}" in
check)
log "开始ProxySQL健康检查"
local errors=0
check_proxysql_service || ((errors++))
check_mysql_servers || ((errors++))
check_connection_pool || ((errors++))
check_query_performance || ((errors++))
if [ $errors -eq 0 ]; then
log "ProxySQL状态正常"
else
log "检测到 $errors 个问题"
fi
;;
report)
generate_status_report
;;
monitor)
while true; do
main check
sleep 60
done
;;
*)
echo "用法: $0 {check|report|monitor}"
echo " check - 检查ProxySQL状态"
echo " report - 生成状态报告"
echo " monitor - 持续监控"
exit 1
;;
esac
}
main "$@"
EOF
chmod +x "/usr/local/bin/proxysql_health_check.sh"
log "健康检查脚本已生成"
}
# 测试ProxySQL连接
test_proxysql_connection() {
log "测试ProxySQL连接"
# 测试读写分离
echo "测试写操作(应该路由到主服务器):"
mysql -h127.0.0.1 -P$PROXYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD -e "CREATE DATABASE IF NOT EXISTS proxysql_test; USE proxysql_test; CREATE TABLE IF NOT EXISTS test_table (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(100)); INSERT INTO test_table (data) VALUES ('ProxySQL Test');"
echo "测试读操作(应该路由到从服务器):"
mysql -h127.0.0.1 -P$PROXYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD -e "USE proxysql_test; SELECT * FROM test_table;"
# 检查路由统计
echo "查询路由统计:"
mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT hostgroup,schemaname,username,digest_text,count_star FROM stats_mysql_query_digest ORDER BY count_star DESC LIMIT 5;"
log "ProxySQL连接测试完成"
}
# 主函数
main() {
case "${1:-help}" in
install)
log "开始ProxySQL部署"
install_proxysql
generate_proxysql_config
setup_mysql_users
start_proxysql
configure_proxysql_rules
generate_health_check
test_proxysql_connection
log "ProxySQL部署完成"
echo
echo "连接信息:"
echo " 应用连接: mysql -h127.0.0.1 -P$PROXYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD"
echo " 管理连接: mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD"
echo " 健康检查: /usr/local/bin/proxysql_health_check.sh"
;;
test)
test_proxysql_connection
;;
health)
/usr/local/bin/proxysql_health_check.sh check
;;
help|*)
echo "ProxySQL部署脚本"
echo
echo "用法: $0 [命令]"
echo
echo "命令:"
echo " install - 完整部署ProxySQL"
echo " test - 测试连接和读写分离"
echo " health - 健康检查"
echo " help - 显示此帮助信息"
echo
echo "配置说明:"
echo " ProxySQL端口: $PROXYSQL_PORT"
echo " 管理端口: $PROXYSQL_ADMIN_PORT"
echo " 主服务器: $MYSQL_MASTER"
echo " 从服务器: ${MYSQL_SLAVES[*]}"
;;
esac
}
main "$@"
13.5.2 HAProxy + Keepalived高可用
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
HAProxy + Keepalived MySQL高可用解决方案
提供负载均衡、故障转移和VIP管理
"""
import subprocess
import json
import time
import logging
import socket
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
class ServerStatus(Enum):
"""服务器状态"""
UP = "UP"
DOWN = "DOWN"
MAINT = "MAINT"
NOLB = "NOLB"
class ServerRole(Enum):
"""服务器角色"""
MASTER = "master"
SLAVE = "slave"
BACKUP = "backup"
@dataclass
class MySQLServer:
"""MySQL服务器信息"""
name: str
host: str
port: int
role: ServerRole
weight: int = 100
status: ServerStatus = ServerStatus.UP
check_interval: int = 2000
max_connections: int = 1000
class HAProxyMySQLManager:
"""HAProxy MySQL高可用管理器"""
def __init__(self, config_file: str = "/etc/haproxy/haproxy.cfg"):
self.config_file = config_file
self.stats_socket = "/var/run/haproxy/admin.sock"
self.vip = "192.168.1.200"
self.mysql_port = 3306
self.stats_port = 8404
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/haproxy_mysql_manager.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def generate_haproxy_config(self, servers: List[MySQLServer]) -> str:
"""生成HAProxy配置"""
config = f"""
global
log stdout local0
chroot /var/lib/haproxy
stats socket {self.stats_socket} mode 660 level admin
stats timeout 30s
user haproxy
group haproxy
daemon
# SSL配置
ssl-default-bind-ciphers ECDHE+AESGCM:ECDHE+CHACHA20:RSA+AESGCM:RSA+AES:!aNULL:!MD5:!DSS
ssl-default-bind-options ssl-min-ver TLSv1.2 no-tls-tickets
defaults
mode tcp
log global
option tcplog
option dontlognull
retries 3
timeout queue 1m
timeout connect 10s
timeout client 1m
timeout server 1m
timeout check 10s
maxconn 3000
# 统计页面
listen stats
bind *:{self.stats_port}
mode http
stats enable
stats uri /stats
stats realm HAProxy\ Statistics
stats auth admin:admin123
stats refresh 30s
stats show-node
stats show-legends
stats admin if TRUE
# MySQL写服务(主服务器)
listen mysql-write
bind {self.vip}:{self.mysql_port}
mode tcp
balance roundrobin
option mysql-check user haproxy_check
"""
# 添加主服务器
for server in servers:
if server.role == ServerRole.MASTER:
config += f"""
server {server.name} {server.host}:{server.port} check weight {server.weight} maxconn {server.max_connections} inter {server.check_interval}ms"""
config += f"""
# MySQL读服务(从服务器)
listen mysql-read
bind {self.vip}:3307
mode tcp
balance roundrobin
option mysql-check user haproxy_check
"""
# 添加从服务器
for server in servers:
if server.role == ServerRole.SLAVE:
config += f"""
server {server.name} {server.host}:{server.port} check weight {server.weight} maxconn {server.max_connections} inter {server.check_interval}ms"""
# 添加备份服务器
for server in servers:
if server.role == ServerRole.BACKUP:
config += f"""
server {server.name} {server.host}:{server.port} check weight {server.weight} maxconn {server.max_connections} inter {server.check_interval}ms backup"""
return config
def generate_keepalived_config(self, interface: str = "eth0", priority: int = 100, is_master: bool = True) -> str:
"""生成Keepalived配置"""
state = "MASTER" if is_master else "BACKUP"
config = f"""
global_defs {{
notification_email {{
admin@example.com
}}
notification_email_from keepalived@example.com
smtp_server localhost
smtp_connect_timeout 30
router_id LVS_MYSQL
vrrp_skip_check_adv_addr
vrrp_strict
vrrp_garp_interval 0
vrrp_gna_interval 0
}}
vrrp_script chk_haproxy {{
script "/usr/local/bin/check_haproxy.sh"
interval 2
weight -2
fall 3
rise 2
}}
vrrp_instance VI_1 {{
state {state}
interface {interface}
virtual_router_id 51
priority {priority}
advert_int 1
authentication {{
auth_type PASS
auth_pass mysql_ha_pass
}}
virtual_ipaddress {{
{self.vip}
}}
track_script {{
chk_haproxy
}}
notify_master "/usr/local/bin/notify_master.sh"
notify_backup "/usr/local/bin/notify_backup.sh"
notify_fault "/usr/local/bin/notify_fault.sh"
}}
"""
return config
def create_health_check_scripts(self):
"""创建健康检查脚本"""
# HAProxy健康检查脚本
haproxy_check_script = f"""
#!/bin/bash
# HAProxy健康检查脚本
if ! pgrep haproxy > /dev/null; then
exit 1
fi
# 检查HAProxy统计接口
if ! echo "show info" | socat stdio {self.stats_socket} > /dev/null 2>&1; then
exit 1
fi
exit 0
"""
with open("/usr/local/bin/check_haproxy.sh", "w") as f:
f.write(haproxy_check_script)
subprocess.run(["chmod", "+x", "/usr/local/bin/check_haproxy.sh"])
# 主节点通知脚本
master_notify_script = f"""
#!/bin/bash
# 主节点通知脚本
echo "$(date): 成为主节点" >> /var/log/keepalived-notify.log
# 发送邮件通知
echo "HAProxy MySQL HA: 节点 $(hostname) 成为主节点" | mail -s "MySQL HA Master" admin@example.com
# 记录系统日志
logger "HAProxy MySQL HA: 节点 $(hostname) 成为主节点"
"""
with open("/usr/local/bin/notify_master.sh", "w") as f:
f.write(master_notify_script)
subprocess.run(["chmod", "+x", "/usr/local/bin/notify_master.sh"])
# 备份节点通知脚本
backup_notify_script = f"""
#!/bin/bash
# 备份节点通知脚本
echo "$(date): 成为备份节点" >> /var/log/keepalived-notify.log
# 发送邮件通知
echo "HAProxy MySQL HA: 节点 $(hostname) 成为备份节点" | mail -s "MySQL HA Backup" admin@example.com
# 记录系统日志
logger "HAProxy MySQL HA: 节点 $(hostname) 成为备份节点"
"""
with open("/usr/local/bin/notify_backup.sh", "w") as f:
f.write(backup_notify_script)
subprocess.run(["chmod", "+x", "/usr/local/bin/notify_backup.sh"])
# 故障通知脚本
fault_notify_script = f"""
#!/bin/bash
# 故障通知脚本
echo "$(date): 节点故障" >> /var/log/keepalived-notify.log
# 发送邮件通知
echo "HAProxy MySQL HA: 节点 $(hostname) 发生故障" | mail -s "MySQL HA Fault" admin@example.com
# 记录系统日志
logger "HAProxy MySQL HA: 节点 $(hostname) 发生故障"
"""
with open("/usr/local/bin/notify_fault.sh", "w") as f:
f.write(fault_notify_script)
subprocess.run(["chmod", "+x", "/usr/local/bin/notify_fault.sh"])
self.logger.info("健康检查脚本已创建")
def get_haproxy_stats(self) -> Dict:
"""获取HAProxy统计信息"""
try:
            # 通过HAProxy管理套接字获取基本信息
            info_output = subprocess.run(
                ["socat", "stdio", self.stats_socket],
                input="show info\n",
                capture_output=True,
                text=True
            )
            # 获取后端服务器状态(CSV格式)
            stat_output = subprocess.run(
                ["socat", "stdio", self.stats_socket],
                input="show stat\n",
                capture_output=True,
                text=True
            )
stats = {
'timestamp': datetime.now().isoformat(),
'info': self._parse_haproxy_info(info_output.stdout),
'servers': self._parse_haproxy_stats(stat_output.stdout)
}
return stats
except Exception as e:
self.logger.error(f"获取HAProxy统计信息失败: {e}")
return {'error': str(e)}
def _parse_haproxy_info(self, info_output: str) -> Dict:
"""解析HAProxy信息"""
info = {}
for line in info_output.strip().split('\n'):
if ':' in line:
key, value = line.split(':', 1)
info[key.strip()] = value.strip()
return info
def _parse_haproxy_stats(self, stat_output: str) -> List[Dict]:
"""解析HAProxy统计信息"""
servers = []
lines = stat_output.strip().split('\n')
if not lines:
return servers
# 第一行是标题
headers = lines[0].split(',')
for line in lines[1:]:
if line.strip():
values = line.split(',')
server_info = {}
for i, header in enumerate(headers):
if i < len(values):
server_info[header] = values[i]
servers.append(server_info)
return servers
def enable_server(self, backend: str, server: str) -> bool:
"""启用服务器"""
try:
command = f"enable server {backend}/{server}"
result = subprocess.run(
["echo", command],
stdout=subprocess.PIPE,
text=True
)
subprocess.run(
["socat", "stdio", self.stats_socket],
input=result.stdout,
capture_output=True,
text=True
)
self.logger.info(f"服务器 {backend}/{server} 已启用")
return True
except Exception as e:
self.logger.error(f"启用服务器失败: {e}")
return False
def disable_server(self, backend: str, server: str) -> bool:
"""禁用服务器"""
try:
command = f"disable server {backend}/{server}"
result = subprocess.run(
["echo", command],
stdout=subprocess.PIPE,
text=True
)
subprocess.run(
["socat", "stdio", self.stats_socket],
input=result.stdout,
capture_output=True,
text=True
)
self.logger.info(f"服务器 {backend}/{server} 已禁用")
return True
except Exception as e:
self.logger.error(f"禁用服务器失败: {e}")
return False
def check_mysql_connectivity(self, host: str, port: int, timeout: int = 5) -> bool:
"""检查MySQL连接性"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
sock.close()
return result == 0
except Exception:
return False
def monitor_servers(self, servers: List[MySQLServer]) -> Dict:
"""监控服务器状态"""
monitoring_result = {
'timestamp': datetime.now().isoformat(),
'servers': [],
'alerts': []
}
for server in servers:
is_online = self.check_mysql_connectivity(server.host, server.port)
server_status = {
'name': server.name,
'host': server.host,
'port': server.port,
'role': server.role.value,
'online': is_online,
'status': ServerStatus.UP.value if is_online else ServerStatus.DOWN.value
}
monitoring_result['servers'].append(server_status)
# 生成告警
if not is_online:
alert = {
'level': 'CRITICAL',
'message': f"MySQL服务器 {server.name} ({server.host}:{server.port}) 离线",
'timestamp': datetime.now().isoformat()
}
monitoring_result['alerts'].append(alert)
return monitoring_result
def generate_deployment_script(self, servers: List[MySQLServer], interface: str = "eth0") -> str:
"""生成部署脚本"""
script = f"""
#!/bin/bash
# HAProxy + Keepalived MySQL高可用部署脚本
set -euo pipefail
# 安装软件包
install_packages() {{
echo "安装HAProxy和Keepalived..."
if command -v apt-get &> /dev/null; then
apt-get update
apt-get install -y haproxy keepalived socat mailutils
elif command -v yum &> /dev/null; then
yum install -y haproxy keepalived socat mailx
else
echo "不支持的操作系统"
exit 1
fi
}}
# 配置HAProxy
configure_haproxy() {{
echo "配置HAProxy..."
# 备份原配置
cp /etc/haproxy/haproxy.cfg /etc/haproxy/haproxy.cfg.bak
# 写入新配置
cat > /etc/haproxy/haproxy.cfg << 'EOF'
{self.generate_haproxy_config(servers)}
EOF
# 创建HAProxy用户(用于MySQL健康检查)
echo "创建HAProxy MySQL检查用户..."
"""
# 为每个MySQL服务器创建检查用户
for server in servers:
script += f"""
mysql -h{server.host} -P{server.port} -uroot -p << 'SQL'
CREATE USER IF NOT EXISTS 'haproxy_check'@'%';
FLUSH PRIVILEGES;
SQL
"""
script += f"""
# 启动HAProxy
systemctl enable haproxy
systemctl start haproxy
}}
# 配置Keepalived(主节点)
configure_keepalived_master() {{
echo "配置Keepalived主节点..."
cat > /etc/keepalived/keepalived.conf << 'EOF'
{self.generate_keepalived_config(interface, 100, True)}
EOF
systemctl enable keepalived
systemctl start keepalived
}}
# 配置Keepalived(备份节点)
configure_keepalived_backup() {{
echo "配置Keepalived备份节点..."
cat > /etc/keepalived/keepalived.conf << 'EOF'
{self.generate_keepalived_config(interface, 90, False)}
EOF
systemctl enable keepalived
systemctl start keepalived
}}
# 主函数
main() {{
case "${{1:-help}}" in
master)
install_packages
configure_haproxy
configure_keepalived_master
echo "主节点配置完成"
echo "VIP: {self.vip}"
echo "MySQL写端口: {self.mysql_port}"
echo "MySQL读端口: 3307"
echo "统计页面: http://$(hostname -I | awk '{{print $1}}'):{self.stats_port}/stats"
;;
backup)
install_packages
configure_haproxy
configure_keepalived_backup
echo "备份节点配置完成"
;;
help|*)
echo "HAProxy + Keepalived MySQL高可用部署脚本"
echo
echo "用法: $0 [master|backup]"
echo " master - 配置为主节点"
echo " backup - 配置为备份节点"
;;
esac
}}
main "$@"
"""
return script
def export_config(self, servers: List[MySQLServer], output_dir: str = "/tmp"):
"""导出配置文件"""
# 导出HAProxy配置
haproxy_config = self.generate_haproxy_config(servers)
with open(f"{output_dir}/haproxy.cfg", "w") as f:
f.write(haproxy_config)
# 导出Keepalived配置
keepalived_master_config = self.generate_keepalived_config("eth0", 100, True)
with open(f"{output_dir}/keepalived_master.conf", "w") as f:
f.write(keepalived_master_config)
keepalived_backup_config = self.generate_keepalived_config("eth0", 90, False)
with open(f"{output_dir}/keepalived_backup.conf", "w") as f:
f.write(keepalived_backup_config)
# 导出部署脚本
deployment_script = self.generate_deployment_script(servers)
with open(f"{output_dir}/deploy_ha.sh", "w") as f:
f.write(deployment_script)
subprocess.run(["chmod", "+x", f"{output_dir}/deploy_ha.sh"])
self.logger.info(f"配置文件已导出到: {output_dir}")
# 使用示例
if __name__ == '__main__':
# 定义MySQL服务器
servers = [
MySQLServer("mysql-master", "192.168.1.100", 3306, ServerRole.MASTER, 1000),
MySQLServer("mysql-slave1", "192.168.1.101", 3306, ServerRole.SLAVE, 900),
MySQLServer("mysql-slave2", "192.168.1.102", 3306, ServerRole.SLAVE, 900),
MySQLServer("mysql-backup", "192.168.1.103", 3306, ServerRole.BACKUP, 800)
]
# 创建管理器
manager = HAProxyMySQLManager()
# 创建健康检查脚本
manager.create_health_check_scripts()
# 监控服务器
monitoring_result = manager.monitor_servers(servers)
print("服务器监控结果:")
print(json.dumps(monitoring_result, ensure_ascii=False, indent=2))
# 获取HAProxy统计信息
stats = manager.get_haproxy_stats()
print("\nHAProxy统计信息:")
print(json.dumps(stats, ensure_ascii=False, indent=2))
# 导出配置
manager.export_config(servers)
print("\n配置文件已导出到 /tmp 目录")
13.6 云数据库高可用
13.6.1 云数据库服务对比
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
云数据库高可用服务对比与选择
支持AWS RDS、阿里云RDS、腾讯云CDB等主流云服务
"""
import json
import boto3
import requests
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
class CloudProvider(Enum):
"""云服务提供商"""
AWS = "aws"
ALIYUN = "aliyun"
TENCENT = "tencent"
AZURE = "azure"
GCP = "gcp"
class HALevel(Enum):
"""高可用等级"""
BASIC = "basic" # 基础版
STANDARD = "standard" # 标准版
ENTERPRISE = "enterprise" # 企业版
ULTIMATE = "ultimate" # 旗舰版
@dataclass
class CloudDBInstance:
"""云数据库实例信息"""
provider: CloudProvider
instance_id: str
instance_class: str
engine: str
engine_version: str
storage_type: str
storage_size: int
ha_level: HALevel
multi_az: bool
backup_retention: int
region: str
availability_zones: List[str]
endpoint: str
port: int
status: str
created_time: datetime
monthly_cost: float
class CloudDatabaseManager:
"""云数据库高可用管理器"""
def __init__(self):
self.providers_config = {
CloudProvider.AWS: {
'regions': ['us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1'],
'instance_classes': ['db.t3.micro', 'db.t3.small', 'db.m5.large', 'db.r5.xlarge'],
'storage_types': ['gp2', 'gp3', 'io1', 'io2'],
'engines': ['mysql', 'mariadb', 'aurora-mysql']
},
CloudProvider.ALIYUN: {
'regions': ['cn-hangzhou', 'cn-beijing', 'cn-shanghai', 'cn-shenzhen'],
'instance_classes': ['mysql.n1.micro.1', 'mysql.n2.small.1', 'mysql.n4.medium.1'],
'storage_types': ['cloud_essd', 'cloud_ssd', 'local_ssd'],
'engines': ['MySQL', 'MariaDB']
},
CloudProvider.TENCENT: {
'regions': ['ap-beijing', 'ap-shanghai', 'ap-guangzhou', 'ap-chengdu'],
'instance_classes': ['cdb.s1.small', 'cdb.s2.large', 'cdb.s3.medium'],
'storage_types': ['CLOUD_SSD', 'CLOUD_HSSD', 'LOCAL_NVME'],
'engines': ['MySQL', 'MariaDB']
}
}
def compare_providers(self, requirements: Dict) -> Dict:
"""对比云服务提供商"""
comparison = {
'timestamp': datetime.now().isoformat(),
'requirements': requirements,
'providers': {}
}
for provider in CloudProvider:
provider_info = self._analyze_provider(provider, requirements)
comparison['providers'][provider.value] = provider_info
# 生成推荐
recommendation = self._generate_recommendation(comparison)
comparison['recommendation'] = recommendation
return comparison
def _analyze_provider(self, provider: CloudProvider, requirements: Dict) -> Dict:
"""分析单个云服务提供商"""
config = self.providers_config.get(provider, {})
analysis = {
'provider': provider.value,
'features': self._get_provider_features(provider),
'pricing': self._estimate_pricing(provider, requirements),
'availability': self._get_availability_info(provider),
'performance': self._get_performance_metrics(provider),
'compliance': self._get_compliance_info(provider),
'support': self._get_support_info(provider),
'score': 0
}
# 计算综合评分
analysis['score'] = self._calculate_score(analysis, requirements)
return analysis
def _get_provider_features(self, provider: CloudProvider) -> Dict:
"""获取云服务提供商特性"""
features = {
CloudProvider.AWS: {
'multi_az': True,
'read_replicas': True,
'automated_backup': True,
'point_in_time_recovery': True,
'encryption': True,
'monitoring': 'CloudWatch',
'auto_scaling': True,
'cross_region_replication': True,
'aurora_serverless': True,
'global_database': True
},
CloudProvider.ALIYUN: {
'multi_az': True,
'read_replicas': True,
'automated_backup': True,
'point_in_time_recovery': True,
'encryption': True,
'monitoring': 'CloudMonitor',
'auto_scaling': True,
'cross_region_replication': True,
'polardb': True,
'global_database': False
},
CloudProvider.TENCENT: {
'multi_az': True,
'read_replicas': True,
'automated_backup': True,
'point_in_time_recovery': True,
'encryption': True,
'monitoring': 'Cloud Monitor',
'auto_scaling': True,
'cross_region_replication': True,
'tdsql': True,
'global_database': False
}
}
return features.get(provider, {})
def _estimate_pricing(self, provider: CloudProvider, requirements: Dict) -> Dict:
"""估算定价"""
# 基础定价模型(示例数据)
base_pricing = {
CloudProvider.AWS: {
'instance_hourly': 0.017, # db.t3.micro
'storage_gb_monthly': 0.115, # gp2
'backup_gb_monthly': 0.095,
'data_transfer_gb': 0.09
},
CloudProvider.ALIYUN: {
'instance_hourly': 0.015,
'storage_gb_monthly': 0.10,
'backup_gb_monthly': 0.08,
'data_transfer_gb': 0.08
},
CloudProvider.TENCENT: {
'instance_hourly': 0.014,
'storage_gb_monthly': 0.09,
'backup_gb_monthly': 0.07,
'data_transfer_gb': 0.07
}
}
pricing = base_pricing.get(provider, {})
# 计算月度成本
storage_size = requirements.get('storage_gb', 100)
backup_size = storage_size * requirements.get('backup_retention_days', 7) / 30
monthly_cost = {
'instance': pricing.get('instance_hourly', 0) * 24 * 30,
'storage': pricing.get('storage_gb_monthly', 0) * storage_size,
'backup': pricing.get('backup_gb_monthly', 0) * backup_size,
'data_transfer': pricing.get('data_transfer_gb', 0) * requirements.get('data_transfer_gb', 10)
}
monthly_cost['total'] = sum(monthly_cost.values())
return {
'monthly_cost': monthly_cost,
'annual_cost': monthly_cost['total'] * 12,
'currency': 'USD'
}
def _get_availability_info(self, provider: CloudProvider) -> Dict:
"""获取可用性信息"""
availability_info = {
CloudProvider.AWS: {
'sla': 99.95,
'rto': '< 5 minutes',
'rpo': '< 5 minutes',
'multi_az_failover': 'Automatic',
'backup_frequency': 'Continuous',
'regions': 25,
'availability_zones': 80
},
CloudProvider.ALIYUN: {
'sla': 99.95,
'rto': '< 5 minutes',
'rpo': '< 5 minutes',
'multi_az_failover': 'Automatic',
'backup_frequency': 'Continuous',
'regions': 20,
'availability_zones': 60
},
CloudProvider.TENCENT: {
'sla': 99.95,
'rto': '< 5 minutes',
'rpo': '< 5 minutes',
'multi_az_failover': 'Automatic',
'backup_frequency': 'Continuous',
'regions': 15,
'availability_zones': 45
}
}
return availability_info.get(provider, {})
def _get_performance_metrics(self, provider: CloudProvider) -> Dict:
"""获取性能指标"""
performance = {
CloudProvider.AWS: {
'max_iops': 80000,
'max_throughput_mbps': 2000,
'max_connections': 16000,
'cpu_credits': True,
'burst_performance': True,
'read_replica_lag': '< 100ms'
},
CloudProvider.ALIYUN: {
'max_iops': 50000,
'max_throughput_mbps': 1500,
'max_connections': 12000,
'cpu_credits': True,
'burst_performance': True,
'read_replica_lag': '< 100ms'
},
CloudProvider.TENCENT: {
'max_iops': 40000,
'max_throughput_mbps': 1200,
'max_connections': 10000,
'cpu_credits': True,
'burst_performance': True,
'read_replica_lag': '< 100ms'
}
}
return performance.get(provider, {})
def _get_compliance_info(self, provider: CloudProvider) -> Dict:
"""获取合规信息"""
compliance = {
CloudProvider.AWS: {
'certifications': ['SOC 1/2/3', 'PCI DSS', 'ISO 27001', 'HIPAA', 'FedRAMP'],
'data_residency': True,
'encryption_at_rest': True,
'encryption_in_transit': True,
'key_management': 'AWS KMS',
'audit_logging': True
},
CloudProvider.ALIYUN: {
'certifications': ['ISO 27001', 'SOC 1/2', 'PCI DSS', 'CSA STAR'],
'data_residency': True,
'encryption_at_rest': True,
'encryption_in_transit': True,
'key_management': 'KMS',
'audit_logging': True
},
CloudProvider.TENCENT: {
'certifications': ['ISO 27001', 'SOC 2', 'PCI DSS'],
'data_residency': True,
'encryption_at_rest': True,
'encryption_in_transit': True,
'key_management': 'KMS',
'audit_logging': True
}
}
return compliance.get(provider, {})
def _get_support_info(self, provider: CloudProvider) -> Dict:
"""获取支持信息"""
support = {
CloudProvider.AWS: {
'support_levels': ['Basic', 'Developer', 'Business', 'Enterprise'],
'response_time': '< 1 hour (Business)',
'documentation_quality': 'Excellent',
'community_support': 'Very Active',
'professional_services': True,
'training_resources': 'Extensive'
},
CloudProvider.ALIYUN: {
'support_levels': ['Basic', 'Business', 'Enterprise'],
'response_time': '< 2 hours (Business)',
'documentation_quality': 'Good',
'community_support': 'Active',
'professional_services': True,
'training_resources': 'Good'
},
CloudProvider.TENCENT: {
'support_levels': ['Basic', 'Business', 'Enterprise'],
'response_time': '< 4 hours (Business)',
'documentation_quality': 'Good',
'community_support': 'Moderate',
'professional_services': True,
'training_resources': 'Moderate'
}
}
return support.get(provider, {})
def _calculate_score(self, analysis: Dict, requirements: Dict) -> float:
"""计算综合评分"""
weights = {
'pricing': 0.25,
'availability': 0.30,
'performance': 0.20,
'compliance': 0.15,
'support': 0.10
}
scores = {
'pricing': self._score_pricing(analysis['pricing'], requirements),
'availability': self._score_availability(analysis['availability']),
'performance': self._score_performance(analysis['performance']),
'compliance': self._score_compliance(analysis['compliance']),
'support': self._score_support(analysis['support'])
}
total_score = sum(score * weights[category] for category, score in scores.items())
return round(total_score, 2)
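# 评分计算示例(假设定价60分、可用性90分、性能100分、合规80分、支持100分):
#   总分 = 60*0.25 + 90*0.30 + 100*0.20 + 80*0.15 + 100*0.10
#        = 15 + 27 + 20 + 12 + 10 = 84.0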
def _score_pricing(self, pricing: Dict, requirements: Dict) -> float:
"""评分定价"""
budget = requirements.get('monthly_budget', 1000)
actual_cost = pricing['monthly_cost']['total']
if actual_cost <= budget * 0.5:
return 100
elif actual_cost <= budget * 0.75:
return 80
elif actual_cost <= budget:
return 60
elif actual_cost <= budget * 1.25:
return 40
else:
return 20
def _score_availability(self, availability: Dict) -> float:
"""评分可用性"""
sla = availability.get('sla', 99.0)
if sla >= 99.99:
return 100
elif sla >= 99.95:
return 90
elif sla >= 99.9:
return 80
elif sla >= 99.5:
return 70
else:
return 60
def _score_performance(self, performance: Dict) -> float:
"""评分性能"""
max_iops = performance.get('max_iops', 0)
if max_iops >= 50000:
return 100
elif max_iops >= 30000:
return 80
elif max_iops >= 10000:
return 60
else:
return 40
def _score_compliance(self, compliance: Dict) -> float:
"""评分合规性"""
cert_count = len(compliance.get('certifications', []))
if cert_count >= 5:
return 100
elif cert_count >= 3:
return 80
elif cert_count >= 2:
return 60
else:
return 40
def _score_support(self, support: Dict) -> float:
"""评分支持"""
support_levels = len(support.get('support_levels', []))
doc_quality = support.get('documentation_quality', 'Poor')
base_score = support_levels * 20
if doc_quality == 'Excellent':
base_score += 20
elif doc_quality == 'Good':
base_score += 10
return min(base_score, 100)
def _generate_recommendation(self, comparison: Dict) -> Dict:
"""生成推荐"""
providers = comparison['providers']
# 按评分排序
sorted_providers = sorted(
providers.items(),
key=lambda x: x[1]['score'],
reverse=True
)
recommendation = {
'best_overall': sorted_providers[0][0],
'best_value': self._find_best_value(providers),
'best_performance': self._find_best_performance(providers),
'best_compliance': self._find_best_compliance(providers),
'ranking': [provider[0] for provider in sorted_providers],
'summary': self._generate_summary(sorted_providers)
}
return recommendation
def _find_best_value(self, providers: Dict) -> str:
"""找到最佳性价比"""
best_value = None
best_ratio = 0
for provider_name, provider_data in providers.items():
score = provider_data['score']
cost = provider_data['pricing']['monthly_cost']['total']
ratio = score / cost if cost > 0 else 0
if ratio > best_ratio:
best_ratio = ratio
best_value = provider_name
return best_value
def _find_best_performance(self, providers: Dict) -> str:
"""找到最佳性能"""
best_performance = None
best_iops = 0
for provider_name, provider_data in providers.items():
iops = provider_data['performance']['max_iops']
if iops > best_iops:
best_iops = iops
best_performance = provider_name
return best_performance
def _find_best_compliance(self, providers: Dict) -> str:
"""找到最佳合规性"""
best_compliance = None
best_cert_count = 0
for provider_name, provider_data in providers.items():
cert_count = len(provider_data['compliance']['certifications'])
if cert_count > best_cert_count:
best_cert_count = cert_count
best_compliance = provider_name
return best_compliance
def _generate_summary(self, sorted_providers: List) -> str:
"""生成总结"""
best_provider = sorted_providers[0]
summary = f"""
基于综合评分,推荐使用 {best_provider[0].upper()} 作为云数据库服务提供商。
主要优势:
- 综合评分:{best_provider[1]['score']}/100
- SLA:{best_provider[1]['availability']['sla']}%
- 月度成本:${best_provider[1]['pricing']['monthly_cost']['total']:.2f}
- 最大IOPS:{best_provider[1]['performance']['max_iops']:,}
建议在选择时还需要考虑:
1. 数据本地化要求
2. 现有技术栈兼容性
3. 团队技术能力
4. 长期发展规划
"""
return summary.strip()
def generate_migration_plan(self, source_provider: CloudProvider, target_provider: CloudProvider) -> Dict:
"""生成迁移计划"""
migration_plan = {
'source': source_provider.value,
'target': target_provider.value,
'phases': [
{
'phase': 1,
'name': '准备阶段',
'duration': '1-2周',
'tasks': [
'评估现有数据库架构',
'制定迁移策略',
'准备目标环境',
'配置网络连接',
'设置监控和告警'
]
},
{
'phase': 2,
'name': '测试迁移',
'duration': '1周',
'tasks': [
'创建测试环境',
'执行数据迁移测试',
'验证数据完整性',
'性能测试',
'故障转移测试'
]
},
{
'phase': 3,
'name': '生产迁移',
'duration': '1-2天',
'tasks': [
'停止写入操作',
'执行最终数据同步',
'切换应用连接',
'验证系统功能',
'监控系统状态'
]
},
{
'phase': 4,
'name': '优化阶段',
'duration': '1-2周',
'tasks': [
'性能调优',
'配置优化',
'监控调整',
'文档更新',
'团队培训'
]
}
],
'risks': [
{
'risk': '数据丢失',
'probability': 'Low',
'impact': 'High',
'mitigation': '多重备份验证,分阶段迁移'
},
{
'risk': '服务中断',
'probability': 'Medium',
'impact': 'High',
'mitigation': '维护窗口执行,快速回滚方案'
},
{
'risk': '性能下降',
'probability': 'Medium',
'impact': 'Medium',
'mitigation': '充分测试,性能基准对比'
}
],
'tools': [
'AWS Database Migration Service',
'MySQL Workbench',
'Percona XtraBackup',
'pt-table-checksum',
'Custom migration scripts'
],
'estimated_cost': {
'migration_tools': 500,
'additional_resources': 2000,
'professional_services': 5000,
'total': 7500,
'currency': 'USD'
}
}
return migration_plan
def export_comparison_report(self, comparison: Dict, output_file: str = "/tmp/cloud_db_comparison.json"):
"""导出对比报告"""
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(comparison, f, ensure_ascii=False, indent=2, default=str)
print(f"对比报告已导出到: {output_file}")
# 使用示例
if __name__ == '__main__':
# 定义需求
requirements = {
'storage_gb': 500,
'monthly_budget': 800,
'backup_retention_days': 30,
'data_transfer_gb': 100,
'performance_tier': 'high',
'compliance_required': ['ISO 27001', 'SOC 2'],
'regions': ['us-east-1', 'cn-beijing']
}
# 创建管理器
manager = CloudDatabaseManager()
# 对比云服务提供商
comparison = manager.compare_providers(requirements)
print("云数据库服务对比结果:")
print(json.dumps(comparison, ensure_ascii=False, indent=2, default=str))
# 生成迁移计划
migration_plan = manager.generate_migration_plan(
CloudProvider.AWS,
CloudProvider.ALIYUN
)
print("\n迁移计划:")
print(json.dumps(migration_plan, ensure_ascii=False, indent=2))
# 导出报告
manager.export_comparison_report(comparison)
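导出的JSON报告可以在后续的容量评估或架构评审中复用。下面是一个读取并汇总该报告的简单示例(假设报告位于 export_comparison_report 的默认路径,providers、score、pricing 等键与前文 compare_providers 的分析结果一致):
import json

# 读取 export_comparison_report 导出的对比报告(默认路径,仅为示例)
with open("/tmp/cloud_db_comparison.json", encoding="utf-8") as f:
    report = json.load(f)

# 按综合评分从高到低输出各云服务商的得分与月度成本
providers = report.get("providers", {})
for name, data in sorted(providers.items(), key=lambda kv: kv[1].get("score", 0), reverse=True):
    monthly_total = data["pricing"]["monthly_cost"]["total"]
    print(f"{name}: 评分 {data.get('score')}, 月度成本 ${monthly_total:.2f}")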
13.6.2 云数据库自动化运维
#!/bin/bash
# 云数据库自动化运维脚本
# 支持AWS RDS、阿里云RDS、腾讯云CDB的统一管理
set -euo pipefail
# 配置参数
CLOUD_PROVIDER="aws" # aws, aliyun, tencent
REGION="us-east-1"
DB_INSTANCE_ID="mydb-instance"
MONITOR_INTERVAL=60
LOG_FILE="/var/log/cloud_db_monitor.log"
ALERT_EMAIL="admin@example.com"
SLACK_WEBHOOK="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
# 错误处理
error_exit() {
log "错误: $1"
send_alert "ERROR" "$1"
exit 1
}
# 发送告警
send_alert() {
local level="$1"
local message="$2"
# 发送邮件
echo "$message" | mail -s "[$level] 云数据库告警" "$ALERT_EMAIL" 2>/dev/null || true
# 发送Slack通知
if [ -n "$SLACK_WEBHOOK" ]; then
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"[$level] 云数据库告警: $message\"}" \
"$SLACK_WEBHOOK" 2>/dev/null || true
fi
# 记录系统日志
logger "Cloud DB Alert [$level]: $message"
}
# AWS RDS操作
aws_rds_operations() {
local operation="$1"
case "$operation" in
status)
aws rds describe-db-instances \
--db-instance-identifier "$DB_INSTANCE_ID" \
--region "$REGION" \
--query 'DBInstances[0].{Status:DBInstanceStatus,Engine:Engine,Class:DBInstanceClass,Storage:AllocatedStorage,MultiAZ:MultiAZ}' \
--output table
;;
metrics)
# 获取CloudWatch指标
local end_time=$(date -u +%Y-%m-%dT%H:%M:%S)
local start_time=$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S)
echo "CPU使用率:"
aws cloudwatch get-metric-statistics \
--namespace AWS/RDS \
--metric-name CPUUtilization \
--dimensions Name=DBInstanceIdentifier,Value="$DB_INSTANCE_ID" \
--start-time "$start_time" \
--end-time "$end_time" \
--period 300 \
--statistics Average \
--region "$REGION" \
--query 'Datapoints[*].[Timestamp,Average]' \
--output table
echo "\n连接数:"
aws cloudwatch get-metric-statistics \
--namespace AWS/RDS \
--metric-name DatabaseConnections \
--dimensions Name=DBInstanceIdentifier,Value="$DB_INSTANCE_ID" \
--start-time "$start_time" \
--end-time "$end_time" \
--period 300 \
--statistics Average \
--region "$REGION" \
--query 'Datapoints[*].[Timestamp,Average]' \
--output table
;;
backup)
# 创建快照
local snapshot_id="${DB_INSTANCE_ID}-$(date +%Y%m%d-%H%M%S)"
aws rds create-db-snapshot \
--db-instance-identifier "$DB_INSTANCE_ID" \
--db-snapshot-identifier "$snapshot_id" \
--region "$REGION"
log "已创建快照: $snapshot_id"
;;
scale)
local new_class="$2"
aws rds modify-db-instance \
--db-instance-identifier "$DB_INSTANCE_ID" \
--db-instance-class "$new_class" \
--apply-immediately \
--region "$REGION"
log "已启动实例类型变更: $new_class"
;;
failover)
# 强制故障转移(仅Multi-AZ)
aws rds reboot-db-instance \
--db-instance-identifier "$DB_INSTANCE_ID" \
--force-failover \
--region "$REGION"
log "已启动强制故障转移"
;;
*)
echo "不支持的AWS RDS操作: $operation"
return 1
;;
esac
}
# 阿里云RDS操作
aliyun_rds_operations() {
local operation="$1"
case "$operation" in
status)
aliyun rds DescribeDBInstances \
--DBInstanceId "$DB_INSTANCE_ID" \
--region "$REGION"
;;
metrics)
# 获取监控数据
local end_time=$(date -u +%Y-%m-%dT%H:%M:%SZ)
local start_time=$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)
aliyun cms DescribeMetricList \
--Namespace acs_rds_dashboard \
--MetricName CpuUsage \
--Dimensions "[{\"instanceId\":\"$DB_INSTANCE_ID\"}]" \
--StartTime "$start_time" \
--EndTime "$end_time" \
--region "$REGION"
;;
backup)
# 创建备份
aliyun rds CreateBackup \
--DBInstanceId "$DB_INSTANCE_ID" \
--BackupMethod Physical \
--region "$REGION"
log "已启动物理备份"
;;
scale)
local new_class="$2"
aliyun rds ModifyDBInstanceSpec \
--DBInstanceId "$DB_INSTANCE_ID" \
--DBInstanceClass "$new_class" \
--PayType Postpaid \
--region "$REGION"
log "已启动实例规格变更: $new_class"
;;
*)
echo "不支持的阿里云RDS操作: $operation"
return 1
;;
esac
}
# 腾讯云CDB操作
tencent_cdb_operations() {
local operation="$1"
case "$operation" in
status)
tccli cdb DescribeDBInstances \
--InstanceIds "[\"$DB_INSTANCE_ID\"]" \
--region "$REGION"
;;
metrics)
# 获取监控数据
local end_time=$(date '+%Y-%m-%d %H:%M:%S')
local start_time=$(date -d '1 hour ago' '+%Y-%m-%d %H:%M:%S')
tccli monitor GetMonitorData \
--Namespace QCE/CDB \
--MetricName CpuUseRate \
--Instances "[{\"Dimensions\":[{\"Name\":\"InstanceId\",\"Value\":\"$DB_INSTANCE_ID\"}]}]" \
--StartTime "$start_time" \
--EndTime "$end_time" \
--Period 300 \
--region "$REGION"
;;
backup)
# 创建备份
tccli cdb CreateBackup \
--InstanceId "$DB_INSTANCE_ID" \
--BackupMethod physical \
--region "$REGION"
log "已启动物理备份"
;;
scale)
# 腾讯云CDB按内存/磁盘规格升级,不使用实例类型参数;以下数值为示例,可按需调整
tccli cdb UpgradeDBInstance \
--InstanceId "$DB_INSTANCE_ID" \
--Memory 4000 \
--Volume 200 \
--region "$REGION"
log "已启动实例升级(内存4000MB,磁盘200GB)"
;;
*)
echo "不支持的腾讯云CDB操作: $operation"
return 1
;;
esac
}
# 统一操作接口
cloud_db_operation() {
local operation="$1"
shift
case "$CLOUD_PROVIDER" in
aws)
aws_rds_operations "$operation" "$@"
;;
aliyun)
aliyun_rds_operations "$operation" "$@"
;;
tencent)
tencent_cdb_operations "$operation" "$@"
;;
*)
error_exit "不支持的云服务提供商: $CLOUD_PROVIDER"
;;
esac
}
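# 调用示例(参数含义随CLOUD_PROVIDER不同,以下以AWS为例):
#   cloud_db_operation status              # 查看实例状态
#   cloud_db_operation backup              # 触发一次备份
#   cloud_db_operation scale db.t3.small   # 变更实例规格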
# 健康检查
health_check() {
log "开始健康检查"
# 检查实例状态
local status_output
status_output=$(cloud_db_operation status 2>&1) || {
error_exit "无法获取实例状态: $status_output"
}
# 解析状态(简化示例)
if echo "$status_output" | grep -q "available\|running\|Running"; then
log "实例状态正常"
else
send_alert "WARNING" "实例状态异常: $status_output"
fi
# 检查关键指标(指标获取失败仅记录告警,不中断健康检查)
check_metrics || true
}
# 检查指标
check_metrics() {
log "检查性能指标"
# 获取指标数据
local metrics_output
metrics_output=$(cloud_db_operation metrics 2>&1) || {
log "WARNING: 无法获取指标数据: $metrics_output"
return 1
}
# 简化的指标检查(实际应用中需要解析JSON/XML输出)
if echo "$metrics_output" | grep -q "error\|Error\|ERROR"; then
send_alert "WARNING" "指标获取异常: $metrics_output"
else
log "指标获取正常"
fi
}
# 自动备份
auto_backup() {
log "开始自动备份"
# 检查是否需要备份(每天只在凌晨2点执行一次,用标记文件避免监控循环内重复触发)
local current_hour=$(date +%H)
local backup_hour="02" # 凌晨2点备份
local today=$(date +%Y%m%d)
local stamp_file="/tmp/cloud_db_last_backup" # 备份标记文件(示例路径)
if [ "$current_hour" = "$backup_hour" ] && [ "$(cat "$stamp_file" 2>/dev/null)" != "$today" ]; then
cloud_db_operation backup || {
error_exit "自动备份失败"
}
echo "$today" > "$stamp_file"
log "自动备份完成"
else
log "跳过备份(当前时间: ${current_hour}:xx,备份时间: ${backup_hour}:xx)"
fi
}
# 自动扩容检查
auto_scaling_check() {
log "检查是否需要自动扩容"
# 这里应该根据实际指标数据判断
# 简化示例:假设CPU使用率超过80%需要扩容
# 获取当前实例规格
local current_status
current_status=$(cloud_db_operation status 2>&1) || true
# 简化的扩容逻辑
if echo "$current_status" | grep -q "db.t3.micro\|mysql.n1.micro\|cdb.s1.small"; then
log "检测到小规格实例,建议考虑升级"
# 这里可以添加自动扩容逻辑
# cloud_db_operation scale "db.t3.small"
fi
}
# 故障转移测试
failover_test() {
log "执行故障转移测试"
if [ "$CLOUD_PROVIDER" = "aws" ]; then
# 仅在AWS Multi-AZ环境下执行
cloud_db_operation failover || {
error_exit "故障转移测试失败"
}
log "故障转移测试完成"
else
log "当前云服务提供商不支持手动故障转移测试"
fi
}
# 生成运维报告
generate_report() {
local report_file="/tmp/cloud_db_report_$(date +%Y%m%d_%H%M%S).txt"
{
echo "云数据库运维报告 - $(date)"
echo "======================================"
echo
echo "基本信息:"
echo " 云服务提供商: $CLOUD_PROVIDER"
echo " 区域: $REGION"
echo " 实例ID: $DB_INSTANCE_ID"
echo
echo "实例状态:"
cloud_db_operation status 2>&1 || echo " 状态获取失败"
echo
echo "性能指标:"
cloud_db_operation metrics 2>&1 || echo " 指标获取失败"
echo
echo "最近日志:"
tail -20 "$LOG_FILE" 2>/dev/null || echo " 日志文件不存在"
} > "$report_file"
log "运维报告已生成: $report_file"
# 发送报告
if command -v mail &> /dev/null; then
mail -s "云数据库运维报告" "$ALERT_EMAIL" < "$report_file" 2>/dev/null || true
fi
}
# 监控循环
monitor_loop() {
log "开始监控循环(间隔: ${MONITOR_INTERVAL}秒)"
while true; do
health_check
auto_backup
auto_scaling_check
sleep "$MONITOR_INTERVAL"
done
}
# 主函数
main() {
case "${1:-help}" in
monitor)
monitor_loop
;;
status)
cloud_db_operation status
;;
metrics)
cloud_db_operation metrics
;;
backup)
cloud_db_operation backup
;;
scale)
if [ -z "${2:-}" ]; then
echo "用法: $0 scale <new_instance_class>"
exit 1
fi
cloud_db_operation scale "$2"
;;
failover)
failover_test
;;
report)
generate_report
;;
health)
health_check
;;
help|*)
echo "云数据库自动化运维脚本"
echo
echo "用法: $0 [命令]"
echo
echo "命令:"
echo " monitor - 启动监控循环"
echo " status - 查看实例状态"
echo " metrics - 查看性能指标"
echo " backup - 执行备份"
echo " scale - 扩容实例"
echo " failover - 故障转移测试"
echo " report - 生成运维报告"
echo " health - 健康检查"
echo " help - 显示此帮助信息"
echo
echo "配置:"
echo " 云服务提供商: $CLOUD_PROVIDER"
echo " 区域: $REGION"
echo " 实例ID: $DB_INSTANCE_ID"
echo " 监控间隔: ${MONITOR_INTERVAL}秒"
echo " 日志文件: $LOG_FILE"
;;
esac
}
main "$@"