高可用性是现代数据库系统的核心要求之一。本章将深入探讨MySQL高可用架构的设计原理、实现方案和最佳实践,帮助您构建稳定可靠的数据库服务。

13.1 高可用架构概述

13.1.1 高可用性基本概念

高可用性(High Availability,HA)是指系统在长时间运行过程中,能够持续提供服务的能力。对于数据库系统,高可用性通常包括以下几个方面:

1. 可用性指标

  • RTO(Recovery Time Objective): 恢复时间目标,系统故障后恢复服务的最大允许时间
  • RPO(Recovery Point Objective): 恢复点目标,系统故障时允许丢失数据的最大时间跨度,即恢复后数据最多回退多久
  • MTBF(Mean Time Between Failures): 平均故障间隔时间
  • MTTR(Mean Time To Repair): 平均修复时间

2. 可用性等级

可用性等级    年停机时间    月停机时间    周停机时间
99%          3.65天       7.20小时     1.68小时
99.9%        8.76小时     43.2分钟     10.1分钟
99.99%       52.56分钟    4.32分钟     1.01分钟
99.999%      5.26分钟     25.9秒       6.05秒
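
结合上表,可用性百分比与允许停机时间、以及MTBF/MTTR与可用性之间的换算关系,可以用下面这段简单的示例代码说明(仅作示意,函数名为示例性命名,并非本章后续工具的一部分):

# 可用性与停机时间换算示例
def downtime_hours(availability: float, period_hours: float) -> float:
    """给定可用性百分比,计算某个周期内允许的停机小时数"""
    return period_hours * (1 - availability / 100)

def availability_from_mtbf(mtbf_hours: float, mttr_hours: float) -> float:
    """根据MTBF与MTTR估算可用性: A = MTBF / (MTBF + MTTR)"""
    return mtbf_hours / (mtbf_hours + mttr_hours) * 100

if __name__ == '__main__':
    for a in (99.0, 99.9, 99.99, 99.999):
        # 与上表中的"年停机时间"一列对应
        print(f"{a}% 可用性 -> 年停机约 {downtime_hours(a, 365 * 24):.2f} 小时")
    # 假设平均每720小时发生一次故障,每次平均修复2小时
    print(f"MTBF=720h, MTTR=2h -> 可用性约 {availability_from_mtbf(720, 2):.3f}%")

例如,99.99%的可用性对应每年约0.88小时(即52.56分钟)的停机预算,与上表一致。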

13.1.2 MySQL高可用架构类型

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL高可用架构分析工具
分析不同高可用方案的特点和适用场景
"""

from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
import json

class HAType(Enum):
    """高可用架构类型"""
    MASTER_SLAVE = "master_slave"
    MASTER_MASTER = "master_master"
    CLUSTER = "cluster"
    PROXY = "proxy"
    CLOUD = "cloud"

class FailoverType(Enum):
    """故障转移类型"""
    MANUAL = "manual"
    AUTOMATIC = "automatic"
    SEMI_AUTOMATIC = "semi_automatic"

@dataclass
class HAArchitecture:
    """高可用架构配置"""
    name: str
    ha_type: HAType
    failover_type: FailoverType
    rto_seconds: int  # 恢复时间目标(秒)
    rpo_seconds: int  # 恢复点目标(秒)
    availability: float  # 可用性百分比
    complexity: str  # 复杂度:low, medium, high
    cost: str  # 成本:low, medium, high
    data_consistency: str  # 数据一致性:eventual, strong
    scalability: str  # 扩展性:low, medium, high
    description: str
    pros: List[str]
    cons: List[str]
    use_cases: List[str]

class MySQLHAAnalyzer:
    """MySQL高可用架构分析器"""
    
    def __init__(self):
        self.architectures = self._initialize_architectures()
    
    def _initialize_architectures(self) -> List[HAArchitecture]:
        """初始化高可用架构配置"""
        return [
            HAArchitecture(
                name="主从复制(Master-Slave)",
                ha_type=HAType.MASTER_SLAVE,
                failover_type=FailoverType.MANUAL,
                rto_seconds=300,  # 5分钟
                rpo_seconds=60,   # 1分钟
                availability=99.9,
                complexity="low",
                cost="low",
                data_consistency="eventual",
                scalability="medium",
                description="一个主库处理写操作,一个或多个从库处理读操作",
                pros=[
                    "配置简单,易于理解和维护",
                    "读写分离,提高查询性能",
                    "成本较低",
                    "支持数据备份和报表查询"
                ],
                cons=[
                    "故障转移需要手动操作",
                    "存在单点故障风险",
                    "主从延迟可能影响数据一致性",
                    "写操作无法扩展"
                ],
                use_cases=[
                    "读多写少的应用",
                    "对一致性要求不高的场景",
                    "预算有限的项目",
                    "数据分析和报表系统"
                ]
            ),
            HAArchitecture(
                name="主主复制(Master-Master)",
                ha_type=HAType.MASTER_MASTER,
                failover_type=FailoverType.SEMI_AUTOMATIC,
                rto_seconds=60,   # 1分钟
                rpo_seconds=30,   # 30秒
                availability=99.95,
                complexity="medium",
                cost="medium",
                data_consistency="eventual",
                scalability="medium",
                description="两个MySQL实例互为主从,都可以处理写操作",
                pros=[
                    "消除单点故障",
                    "故障转移时间短",
                    "支持读写负载均衡",
                    "地理分布式部署"
                ],
                cons=[
                    "数据冲突风险",
                    "配置复杂度增加",
                    "需要应用层协调",
                    "网络分区问题"
                ],
                use_cases=[
                    "需要快速故障转移的应用",
                    "地理分布式系统",
                    "中等规模的Web应用",
                    "对RTO要求较高的场景"
                ]
            ),
            HAArchitecture(
                name="MySQL集群(MySQL Cluster/NDB)",
                ha_type=HAType.CLUSTER,
                failover_type=FailoverType.AUTOMATIC,
                rto_seconds=10,   # 10秒
                rpo_seconds=0,    # 无数据丢失
                availability=99.99,
                complexity="high",
                cost="high",
                data_consistency="strong",
                scalability="high",
                description="基于NDB存储引擎的分布式集群架构",
                pros=[
                    "真正的无单点故障",
                    "自动故障检测和恢复",
                    "强一致性保证",
                    "水平扩展能力强",
                    "内存数据库性能"
                ],
                cons=[
                    "配置和维护复杂",
                    "硬件成本高",
                    "学习曲线陡峭",
                    "功能限制(不支持外键等)",
                    "网络要求高"
                ],
                use_cases=[
                    "电信级应用",
                    "金融交易系统",
                    "实时数据处理",
                    "对可用性要求极高的场景"
                ]
            ),
            HAArchitecture(
                name="代理层高可用(ProxySQL + MHA)",
                ha_type=HAType.PROXY,
                failover_type=FailoverType.AUTOMATIC,
                rto_seconds=30,   # 30秒
                rpo_seconds=10,   # 10秒
                availability=99.95,
                complexity="medium",
                cost="medium",
                data_consistency="eventual",
                scalability="high",
                description="通过代理层实现连接管理和自动故障转移",
                pros=[
                    "自动故障检测和转移",
                    "连接池管理",
                    "读写分离透明化",
                    "负载均衡",
                    "应用无感知切换"
                ],
                cons=[
                    "增加架构复杂度",
                    "代理层成为潜在瓶颈",
                    "需要额外的监控",
                    "故障排查复杂"
                ],
                use_cases=[
                    "微服务架构",
                    "高并发Web应用",
                    "需要透明故障转移的系统",
                    "多租户应用"
                ]
            ),
            HAArchitecture(
                name="云数据库服务(RDS/Aurora)",
                ha_type=HAType.CLOUD,
                failover_type=FailoverType.AUTOMATIC,
                rto_seconds=60,   # 1分钟
                rpo_seconds=5,    # 5秒
                availability=99.99,
                complexity="low",
                cost="high",
                data_consistency="strong",
                scalability="high",
                description="云服务商提供的托管数据库服务",
                pros=[
                    "完全托管,无需运维",
                    "自动备份和恢复",
                    "弹性扩展",
                    "多可用区部署",
                    "监控和告警完善"
                ],
                cons=[
                    "成本较高",
                    "厂商锁定",
                    "定制化限制",
                    "数据迁移复杂"
                ],
                use_cases=[
                    "初创公司",
                    "快速原型开发",
                    "缺乏DBA资源的团队",
                    "云原生应用"
                ]
            )
        ]
    
    def get_architecture_by_type(self, ha_type: HAType) -> Optional[HAArchitecture]:
        """根据类型获取架构配置"""
        for arch in self.architectures:
            if arch.ha_type == ha_type:
                return arch
        return None
    
    def recommend_architecture(self, requirements: Dict) -> List[HAArchitecture]:
        """根据需求推荐架构"""
        recommendations = []
        
        max_rto = requirements.get('max_rto_seconds', 300)
        max_rpo = requirements.get('max_rpo_seconds', 60)
        min_availability = requirements.get('min_availability', 99.9)
        max_complexity = requirements.get('max_complexity', 'high')
        max_cost = requirements.get('max_cost', 'high')
        
        complexity_levels = {'low': 1, 'medium': 2, 'high': 3}
        cost_levels = {'low': 1, 'medium': 2, 'high': 3}
        
        for arch in self.architectures:
            if (arch.rto_seconds <= max_rto and
                arch.rpo_seconds <= max_rpo and
                arch.availability >= min_availability and
                complexity_levels[arch.complexity] <= complexity_levels[max_complexity] and
                cost_levels[arch.cost] <= cost_levels[max_cost]):
                recommendations.append(arch)
        
        # 按可用性和RTO排序
        recommendations.sort(key=lambda x: (-x.availability, x.rto_seconds))
        return recommendations
    
    def compare_architectures(self, arch_names: List[str]) -> Dict:
        """比较不同架构"""
        comparison = {
            'architectures': [],
            'comparison_matrix': {}
        }
        
        selected_archs = []
        for name in arch_names:
            for arch in self.architectures:
                if arch.name == name:
                    selected_archs.append(arch)
                    break
        
        if not selected_archs:
            return comparison
        
        comparison['architectures'] = [arch.name for arch in selected_archs]
        
        # 创建比较矩阵
        metrics = ['availability', 'rto_seconds', 'rpo_seconds', 'complexity', 'cost', 'scalability']
        for metric in metrics:
            comparison['comparison_matrix'][metric] = []
            for arch in selected_archs:
                comparison['comparison_matrix'][metric].append(getattr(arch, metric))
        
        return comparison
    
    def generate_architecture_report(self, arch_name: str) -> Dict:
        """生成架构详细报告"""
        arch = None
        for a in self.architectures:
            if a.name == arch_name:
                arch = a
                break
        
        if not arch:
            return {}
        
        return {
            'name': arch.name,
            'type': arch.ha_type.value,
            'overview': {
                'description': arch.description,
                'availability': f"{arch.availability}%",
                'rto': f"{arch.rto_seconds}秒",
                'rpo': f"{arch.rpo_seconds}秒",
                'complexity': arch.complexity,
                'cost': arch.cost,
                'scalability': arch.scalability,
                'data_consistency': arch.data_consistency
            },
            'advantages': arch.pros,
            'disadvantages': arch.cons,
            'use_cases': arch.use_cases,
            'implementation_considerations': self._get_implementation_considerations(arch)
        }
    
    def _get_implementation_considerations(self, arch: HAArchitecture) -> List[str]:
        """获取实施注意事项"""
        considerations = {
            HAType.MASTER_SLAVE: [
                "配置主从复制参数",
                "监控复制延迟",
                "准备故障转移脚本",
                "定期测试故障转移流程"
            ],
            HAType.MASTER_MASTER: [
                "避免自增ID冲突",
                "配置双向复制",
                "处理数据冲突策略",
                "网络分区检测"
            ],
            HAType.CLUSTER: [
                "规划集群节点数量",
                "配置数据分片",
                "优化网络连接",
                "监控集群状态"
            ],
            HAType.PROXY: [
                "配置代理规则",
                "设置健康检查",
                "优化连接池",
                "监控代理性能"
            ],
            HAType.CLOUD: [
                "选择合适的实例类型",
                "配置多可用区",
                "设置自动备份",
                "监控成本和性能"
            ]
        }
        return considerations.get(arch.ha_type, [])
    
    def export_analysis(self, output_file: Optional[str] = None):
        """导出分析结果"""
        if not output_file:
            output_file = "mysql_ha_analysis.json"
        
        analysis_data = {
            'architectures': [],
            'summary': {
                'total_architectures': len(self.architectures),
                'availability_range': {
                    'min': min(arch.availability for arch in self.architectures),
                    'max': max(arch.availability for arch in self.architectures)
                },
                'rto_range': {
                    'min': min(arch.rto_seconds for arch in self.architectures),
                    'max': max(arch.rto_seconds for arch in self.architectures)
                }
            }
        }
        
        for arch in self.architectures:
            arch_data = {
                'name': arch.name,
                'type': arch.ha_type.value,
                'metrics': {
                    'availability': arch.availability,
                    'rto_seconds': arch.rto_seconds,
                    'rpo_seconds': arch.rpo_seconds,
                    'complexity': arch.complexity,
                    'cost': arch.cost,
                    'scalability': arch.scalability
                },
                'description': arch.description,
                'pros': arch.pros,
                'cons': arch.cons,
                'use_cases': arch.use_cases
            }
            analysis_data['architectures'].append(arch_data)
        
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(analysis_data, f, ensure_ascii=False, indent=2)
        
        print(f"分析结果已导出到: {output_file}")

# 使用示例
if __name__ == '__main__':
    analyzer = MySQLHAAnalyzer()
    
    # 根据需求推荐架构
    requirements = {
        'max_rto_seconds': 60,
        'max_rpo_seconds': 30,
        'min_availability': 99.95,
        'max_complexity': 'medium',
        'max_cost': 'medium'
    }
    
    recommendations = analyzer.recommend_architecture(requirements)
    print("推荐的高可用架构:")
    for i, arch in enumerate(recommendations, 1):
        print(f"{i}. {arch.name} (可用性: {arch.availability}%, RTO: {arch.rto_seconds}s)")
    
    # 比较不同架构
    comparison = analyzer.compare_architectures([
        "主从复制(Master-Slave)",
        "主主复制(Master-Master)",
        "代理层高可用(ProxySQL + MHA)"
    ])
    print("\n架构比较:")
    print(json.dumps(comparison, ensure_ascii=False, indent=2))
    
    # 生成详细报告
    report = analyzer.generate_architecture_report("MySQL集群(MySQL Cluster/NDB)")
    print("\nMySQL集群架构报告:")
    print(json.dumps(report, ensure_ascii=False, indent=2))
    
    # 导出分析结果
    analyzer.export_analysis()

13.2 主从复制架构

13.2.1 主从复制原理

MySQL主从复制是最基础的高可用方案,通过二进制日志(binlog)实现数据同步。

复制过程

  1. 主库记录变更: 主库将所有数据变更记录到二进制日志
  2. 从库请求日志: 从库的I/O线程连接主库,请求二进制日志
  3. 传输日志: 主库的dump线程将二进制日志发送给从库
  4. 写入中继日志: 从库将接收到的日志写入中继日志(relay log)
  5. 应用变更: 从库的SQL线程读取中继日志并执行SQL语句
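
上述复制过程可以直接通过SQL语句观察:在主库上执行 SHOW MASTER STATUS 查看当前二进制日志文件和位置,在从库上执行 SHOW SLAVE STATUS 查看I/O线程、SQL线程以及复制延迟。下面给出一段示意脚本(主机地址与账号均为示例值,需按实际环境替换):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 观察主从复制状态的示意脚本
import pymysql
import pymysql.cursors

def show_replication_status(host: str, user: str, password: str, port: int = 3306):
    conn = pymysql.connect(host=host, port=port, user=user, password=password,
                           cursorclass=pymysql.cursors.DictCursor)
    try:
        with conn.cursor() as cursor:
            # 对应步骤1: 该节点当前写入的二进制日志文件与位置(未开启binlog时返回空)
            cursor.execute("SHOW MASTER STATUS")
            print(host, "binlog位置:", cursor.fetchone())

            # 对应步骤2~5: 从库I/O线程、SQL线程及中继日志应用情况
            cursor.execute("SHOW SLAVE STATUS")
            status = cursor.fetchone()
            if status:
                print(host, "IO线程:", status.get("Slave_IO_Running"),
                      "SQL线程:", status.get("Slave_SQL_Running"),
                      "延迟(秒):", status.get("Seconds_Behind_Master"))
    finally:
        conn.close()

if __name__ == '__main__':
    show_replication_status("192.168.1.10", "repl", "password")  # 主库(示例)
    show_replication_status("192.168.1.11", "repl", "password")  # 从库(示例)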

13.2.2 主从复制配置

#!/bin/bash
# MySQL主从复制自动配置脚本
# 支持一主多从、GTID复制、半同步复制等

set -euo pipefail

# 配置变量
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
LOG_FILE="/var/log/mysql_replication_setup.log"
CONFIG_DIR="/etc/mysql/conf.d"
DATA_DIR="/var/lib/mysql"

# 复制配置
MASTER_HOST=""
MASTER_PORT="3306"
MASTER_USER="root"
MASTER_PASSWORD=""
SLAVE_HOST=""
SLAVE_PORT="3306"
SLAVE_USER="root"
SLAVE_PASSWORD=""
REPL_USER="repl_user"
REPL_PASSWORD="repl_password"
USE_GTID="true"
USE_SEMI_SYNC="false"

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

# 错误处理
error_exit() {
    log "错误: $1"
    exit 1
}

# 检查MySQL服务
check_mysql_service() {
    local host=$1
    local port=$2
    local user=$3
    local password=$4
    
    if ! mysql -h"$host" -P"$port" -u"$user" -p"$password" -e "SELECT 1" >/dev/null 2>&1; then
        error_exit "无法连接到MySQL服务器 $host:$port"
    fi
    
    log "MySQL服务器 $host:$port 连接正常"
}

# 生成MySQL配置文件
generate_mysql_config() {
    local role=$1  # master 或 slave
    local server_id=$2
    local config_file="$CONFIG_DIR/replication.cnf"
    
    log "生成MySQL配置文件: $config_file"
    
    cat > "$config_file" << EOF
[mysqld]
# 服务器ID(必须唯一)
server-id = $server_id

# 二进制日志配置
log-bin = mysql-bin
binlog-format = ROW
expire-logs-days = 7
max-binlog-size = 100M

# GTID配置
EOF

    if [ "$USE_GTID" = "true" ]; then
        cat >> "$config_file" << EOF
gtid-mode = ON
enforce-gtid-consistency = ON
log-slave-updates = ON
EOF
    fi

    if [ "$role" = "master" ]; then
        cat >> "$config_file" << EOF

# 主库配置
# 如需只复制特定库,可按需配置 binlog-do-db = <库名>(每个库单独一行)
binlog-ignore-db = mysql
binlog-ignore-db = information_schema
binlog-ignore-db = performance_schema
binlog-ignore-db = sys

# 半同步复制(主库)
EOF
        if [ "$USE_SEMI_SYNC" = "true" ]; then
            cat >> "$config_file" << EOF
plugin-load = "rpl_semi_sync_master=semisync_master.so"
rpl-semi-sync-master-enabled = 1
rpl-semi-sync-master-timeout = 1000
EOF
        fi
    else
        cat >> "$config_file" << EOF

# 从库配置
read-only = 1
relay-log = relay-bin
relay-log-index = relay-bin.index

# 半同步复制(从库)
EOF
        if [ "$USE_SEMI_SYNC" = "true" ]; then
            cat >> "$config_file" << EOF
plugin-load = "rpl_semi_sync_slave=semisync_slave.so"
rpl-semi-sync-slave-enabled = 1
EOF
        fi
    fi

    cat >> "$config_file" << EOF

# 性能优化
innodb-flush-log-at-trx-commit = 1
sync-binlog = 1
slave-parallel-type = LOGICAL_CLOCK
slave-parallel-workers = 4
slave-preserve-commit-order = 1

# 错误处理(注意: 跳过 1062/1032 等错误可能掩盖主从数据不一致,生产环境请按需开启)
slave-skip-errors = 1062,1032
EOF

    log "MySQL配置文件已生成"
}

# 配置主库
setup_master() {
    log "配置主库: $MASTER_HOST:$MASTER_PORT"
    
    # 检查连接
    check_mysql_service "$MASTER_HOST" "$MASTER_PORT" "$MASTER_USER" "$MASTER_PASSWORD"
    
    # 创建复制用户
    log "创建复制用户: $REPL_USER"
    mysql -h"$MASTER_HOST" -P"$MASTER_PORT" -u"$MASTER_USER" -p"$MASTER_PASSWORD" << EOF
CREATE USER IF NOT EXISTS '$REPL_USER'@'%' IDENTIFIED BY '$REPL_PASSWORD';
GRANT REPLICATION SLAVE ON *.* TO '$REPL_USER'@'%';
FLUSH PRIVILEGES;
EOF

    # 启用半同步复制插件(如果需要)
    if [ "$USE_SEMI_SYNC" = "true" ]; then
        log "启用半同步复制插件(主库)"
        mysql -h"$MASTER_HOST" -P"$MASTER_PORT" -u"$MASTER_USER" -p"$MASTER_PASSWORD" << EOF
INSTALL PLUGIN rpl_semi_sync_master SONAME 'semisync_master.so';
SET GLOBAL rpl_semi_sync_master_enabled = 1;
SET GLOBAL rpl_semi_sync_master_timeout = 1000;
EOF
    fi
    
    # 获取主库状态
    if [ "$USE_GTID" = "true" ]; then
        log "使用GTID复制模式"
        GTID_SET=$(mysql -h"$MASTER_HOST" -P"$MASTER_PORT" -u"$MASTER_USER" -p"$MASTER_PASSWORD" -sN -e "SELECT @@GLOBAL.gtid_executed;")
        log "当前GTID集合: $GTID_SET"
        echo "GTID_SET=$GTID_SET" > "/tmp/master_status.txt"
    else
        log "使用传统复制模式"
        mysql -h"$MASTER_HOST" -P"$MASTER_PORT" -u"$MASTER_USER" -p"$MASTER_PASSWORD" -e "SHOW MASTER STATUS\G" > "/tmp/master_status.txt"
        BINLOG_FILE=$(grep "File:" "/tmp/master_status.txt" | awk '{print $2}')
        BINLOG_POS=$(grep "Position:" "/tmp/master_status.txt" | awk '{print $2}')
        log "主库状态: File=$BINLOG_FILE, Position=$BINLOG_POS"
    fi
    
    log "主库配置完成"
}

# 配置从库
setup_slave() {
    log "配置从库: $SLAVE_HOST:$SLAVE_PORT"
    
    # 检查连接
    check_mysql_service "$SLAVE_HOST" "$SLAVE_PORT" "$SLAVE_USER" "$SLAVE_PASSWORD"
    
    # 停止从库(如果正在运行)
    mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" -e "STOP SLAVE;" 2>/dev/null || true
    
    # 启用半同步复制插件(如果需要)
    if [ "$USE_SEMI_SYNC" = "true" ]; then
        log "启用半同步复制插件(从库)"
        mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" << EOF
INSTALL PLUGIN rpl_semi_sync_slave SONAME 'semisync_slave.so';
SET GLOBAL rpl_semi_sync_slave_enabled = 1;
EOF
    fi
    
    # 配置复制
    if [ "$USE_GTID" = "true" ]; then
        log "配置GTID复制"
        mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" << EOF
CHANGE MASTER TO
    MASTER_HOST='$MASTER_HOST',
    MASTER_PORT=$MASTER_PORT,
    MASTER_USER='$REPL_USER',
    MASTER_PASSWORD='$REPL_PASSWORD',
    MASTER_AUTO_POSITION=1;
EOF
    else
        log "配置传统复制"
        # 读取主库状态
        if [ ! -f "/tmp/master_status.txt" ]; then
            error_exit "主库状态文件不存在,请先配置主库"
        fi
        
        BINLOG_FILE=$(grep "File:" "/tmp/master_status.txt" | awk '{print $2}')
        BINLOG_POS=$(grep "Position:" "/tmp/master_status.txt" | awk '{print $2}')
        
        mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" << EOF
CHANGE MASTER TO
    MASTER_HOST='$MASTER_HOST',
    MASTER_PORT=$MASTER_PORT,
    MASTER_USER='$REPL_USER',
    MASTER_PASSWORD='$REPL_PASSWORD',
    MASTER_LOG_FILE='$BINLOG_FILE',
    MASTER_LOG_POS=$BINLOG_POS;
EOF
    fi
    
    # 启动从库
    log "启动从库复制"
    mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" -e "START SLAVE;"
    
    # 检查复制状态
    sleep 2
    check_slave_status
    
    log "从库配置完成"
}

# 检查从库状态
check_slave_status() {
    log "检查从库复制状态"
    
    mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" -e "SHOW SLAVE STATUS\G" > "/tmp/slave_status.txt"
    
    IO_RUNNING=$(grep "Slave_IO_Running:" "/tmp/slave_status.txt" | awk '{print $2}')
    SQL_RUNNING=$(grep "Slave_SQL_Running:" "/tmp/slave_status.txt" | awk '{print $2}')
    LAST_ERROR=$(grep "Last_Error:" "/tmp/slave_status.txt" | cut -d':' -f2- | xargs)
    SECONDS_BEHIND=$(grep "Seconds_Behind_Master:" "/tmp/slave_status.txt" | awk '{print $2}')
    
    log "IO线程状态: $IO_RUNNING"
    log "SQL线程状态: $SQL_RUNNING"
    log "复制延迟: $SECONDS_BEHIND 秒"
    
    if [ "$LAST_ERROR" != "" ]; then
        log "复制错误: $LAST_ERROR"
    fi
    
    if [ "$IO_RUNNING" = "Yes" ] && [ "$SQL_RUNNING" = "Yes" ]; then
        log "复制状态正常"
        return 0
    else
        log "复制状态异常"
        return 1
    fi
}

# 生成监控脚本
generate_monitoring_script() {
    log "生成复制监控脚本"
    
    cat > "/usr/local/bin/mysql_replication_monitor.sh" << 'EOF'
#!/bin/bash
# MySQL复制监控脚本

SLAVE_HOST="localhost"
SLAVE_PORT="3306"
SLAVE_USER="root"
SLAVE_PASSWORD=""
ALERT_EMAIL="admin@example.com"
MAX_LAG_SECONDS=60

# 检查复制状态
check_replication() {
    local status_file="/tmp/slave_status_$(date +%s).txt"
    
    mysql -h"$SLAVE_HOST" -P"$SLAVE_PORT" -u"$SLAVE_USER" -p"$SLAVE_PASSWORD" -e "SHOW SLAVE STATUS\G" > "$status_file" 2>/dev/null
    
    if [ $? -ne 0 ]; then
        echo "ERROR: 无法连接到从库"
        return 1
    fi
    
    local io_running=$(grep "Slave_IO_Running:" "$status_file" | awk '{print $2}')
    local sql_running=$(grep "Slave_SQL_Running:" "$status_file" | awk '{print $2}')
    local last_error=$(grep "Last_Error:" "$status_file" | cut -d':' -f2- | xargs)
    local seconds_behind=$(grep "Seconds_Behind_Master:" "$status_file" | awk '{print $2}')
    
    echo "复制状态检查 - $(date)"
    echo "IO线程: $io_running"
    echo "SQL线程: $sql_running"
    echo "延迟: $seconds_behind 秒"
    
    # 检查复制是否正常
    if [ "$io_running" != "Yes" ] || [ "$sql_running" != "Yes" ]; then
        echo "ALERT: 复制线程停止"
        echo "错误信息: $last_error"
        send_alert "MySQL复制线程停止" "IO: $io_running, SQL: $sql_running, Error: $last_error"
        rm -f "$status_file"
        return 1
    fi
    
    # 检查复制延迟
    if [ "$seconds_behind" != "NULL" ] && [ "$seconds_behind" -gt "$MAX_LAG_SECONDS" ]; then
        echo "ALERT: 复制延迟过高"
        send_alert "MySQL复制延迟告警" "当前延迟: $seconds_behind 秒,阈值: $MAX_LAG_SECONDS 秒"
    fi
    
    rm -f "$status_file"
    return 0
}

# 发送告警
send_alert() {
    local subject="$1"
    local message="$2"
    
    echo "$message" | mail -s "$subject" "$ALERT_EMAIL" 2>/dev/null || {
        echo "WARNING: 无法发送邮件告警"
        logger "MySQL Replication Alert: $subject - $message"
    }
}

# 主函数
main() {
    case "${1:-check}" in
        check)
            check_replication
            ;;
        monitor)
            while true; do
                check_replication
                sleep 60
            done
            ;;
        *)
            echo "用法: $0 {check|monitor}"
            echo "  check   - 检查一次复制状态"
            echo "  monitor - 持续监控复制状态"
            exit 1
            ;;
    esac
}

main "$@"
EOF

    chmod +x "/usr/local/bin/mysql_replication_monitor.sh"
    log "监控脚本已生成: /usr/local/bin/mysql_replication_monitor.sh"
}

# 生成故障转移脚本
generate_failover_script() {
    log "生成故障转移脚本"
    
    cat > "/usr/local/bin/mysql_failover.sh" << 'EOF'
#!/bin/bash
# MySQL主从故障转移脚本

set -euo pipefail

# 配置变量
OLD_MASTER_HOST=""
NEW_MASTER_HOST=""
MYSQL_USER="root"
MYSQL_PASSWORD=""
REPL_USER="repl_user"
REPL_PASSWORD="repl_password"

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}

# 提升从库为主库
promote_slave_to_master() {
    local new_master=$1
    
    log "提升从库为主库: $new_master"
    
    # 停止复制
    mysql -h"$new_master" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "STOP SLAVE;"
    
    # 重置从库状态
    mysql -h"$new_master" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "RESET SLAVE ALL;"
    
    # 启用写入
    mysql -h"$new_master" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SET GLOBAL read_only = 0;"
    
    # 注意: 使用GTID复制(MASTER_AUTO_POSITION=1)时不要执行 RESET MASTER,
    # 否则会清空 gtid_executed,导致其他从库重新指向新主库后复制中断
    
    log "从库已提升为主库"
}

# 重新配置其他从库
reconfigure_slaves() {
    local new_master=$1
    shift
    local slaves=("$@")
    
    for slave in "${slaves[@]}"; do
        log "重新配置从库: $slave"
        
        # 停止复制
        mysql -h"$slave" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "STOP SLAVE;"
        
        # 配置新主库
        mysql -h"$slave" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << EOF
CHANGE MASTER TO
    MASTER_HOST='$new_master',
    MASTER_USER='$REPL_USER',
    MASTER_PASSWORD='$REPL_PASSWORD',
    MASTER_AUTO_POSITION=1;
EOF
        
        # 启动复制
        mysql -h"$slave" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "START SLAVE;"
        
        log "从库 $slave 已重新配置"
    done
}

# 主函数
main() {
    if [ $# -lt 2 ]; then
        echo "用法: $0 <新主库IP> <从库IP1> [从库IP2] ..."
        echo "示例: $0 192.168.1.101 192.168.1.102 192.168.1.103"
        exit 1
    fi
    
    local new_master=$1
    shift
    local slaves=("$@")
    
    log "开始故障转移流程"
    log "新主库: $new_master"
    log "从库列表: ${slaves[*]}"
    
    # 提升新主库
    promote_slave_to_master "$new_master"
    
    # 重新配置从库
    if [ ${#slaves[@]} -gt 0 ]; then
        reconfigure_slaves "$new_master" "${slaves[@]}"
    fi
    
    log "故障转移完成"
    echo
    echo "请更新应用配置,将数据库连接指向新主库: $new_master"
}

main "$@"
FAILOVER_EOF

    chmod +x "/usr/local/bin/mysql_failover.sh"
    log "故障转移脚本已生成: /usr/local/bin/mysql_failover.sh"
}

# 主函数
main() {
    log "开始MySQL主从复制配置"
    
    case "${1:-help}" in
        master)
            if [ -z "$MASTER_HOST" ]; then
                error_exit "请设置MASTER_HOST变量"
            fi
            generate_mysql_config "master" "1"
            setup_master
            ;;
        slave)
            if [ -z "$SLAVE_HOST" ] || [ -z "$MASTER_HOST" ]; then
                error_exit "请设置SLAVE_HOST和MASTER_HOST变量"
            fi
            generate_mysql_config "slave" "2"
            setup_slave
            ;;
        check)
            check_slave_status
            ;;
        monitor)
            generate_monitoring_script
            ;;
        failover)
            generate_failover_script
            ;;
        all)
            if [ -z "$MASTER_HOST" ] || [ -z "$SLAVE_HOST" ]; then
                error_exit "请设置MASTER_HOST和SLAVE_HOST变量"
            fi
            generate_mysql_config "master" "1"
            setup_master
            generate_mysql_config "slave" "2"
            setup_slave
            generate_monitoring_script
            generate_failover_script
            ;;
        help|*)
            echo "MySQL主从复制配置脚本"
            echo
            echo "用法: $0 [命令]"
            echo
            echo "命令:"
            echo "  master   - 配置主库"
            echo "  slave    - 配置从库"
            echo "  check    - 检查复制状态"
            echo "  monitor  - 生成监控脚本"
            echo "  failover - 生成故障转移脚本"
            echo "  all      - 完整配置主从复制"
            echo "  help     - 显示此帮助信息"
            echo
            echo "环境变量:"
            echo "  MASTER_HOST      - 主库IP地址"
            echo "  MASTER_PORT      - 主库端口 (默认: 3306)"
            echo "  MASTER_USER      - 主库用户 (默认: root)"
            echo "  MASTER_PASSWORD  - 主库密码"
            echo "  SLAVE_HOST       - 从库IP地址"
            echo "  SLAVE_PORT       - 从库端口 (默认: 3306)"
            echo "  SLAVE_USER       - 从库用户 (默认: root)"
            echo "  SLAVE_PASSWORD   - 从库密码"
            echo "  REPL_USER        - 复制用户 (默认: repl_user)"
            echo "  REPL_PASSWORD    - 复制密码 (默认: repl_password)"
            echo "  USE_GTID         - 使用GTID (默认: true)"
            echo "  USE_SEMI_SYNC    - 使用半同步复制 (默认: false)"
            echo
            echo "示例:"
            echo "  export MASTER_HOST=192.168.1.100"
            echo "  export SLAVE_HOST=192.168.1.101"
            echo "  export MASTER_PASSWORD=master_pass"
            echo "  export SLAVE_PASSWORD=slave_pass"
            echo "  $0 all"
            exit 0
            ;;
    esac
    
    log "MySQL主从复制配置完成"
}

# 解析命令行参数
while [[ $# -gt 1 ]]; do
    case $1 in
        --master-host)
            MASTER_HOST="$2"
            shift 2
            ;;
        --slave-host)
            SLAVE_HOST="$2"
            shift 2
            ;;
        --master-password)
            MASTER_PASSWORD="$2"
            shift 2
            ;;
        --slave-password)
            SLAVE_PASSWORD="$2"
            shift 2
            ;;
        --repl-password)
            REPL_PASSWORD="$2"
            shift 2
            ;;
        --use-gtid)
            USE_GTID="$2"
            shift 2
            ;;
        --use-semi-sync)
            USE_SEMI_SYNC="$2"
            shift 2
            ;;
        *)
            break
            ;;
    esac
done

# 执行主函数
main "$@"

13.7 故障转移机制

13.7.1 自动故障转移

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL自动故障转移系统
支持主从复制、主主复制和集群环境的自动故障检测与转移
"""

import time
import json
import logging
import threading
import subprocess
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import pymysql
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class NodeRole(Enum):
    """节点角色"""
    MASTER = "master"
    SLAVE = "slave"
    CANDIDATE = "candidate"
    FAILED = "failed"

class FailoverStatus(Enum):
    """故障转移状态"""
    MONITORING = "monitoring"
    DETECTING = "detecting"
    FAILING_OVER = "failing_over"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class MySQLNode:
    """MySQL节点信息"""
    host: str
    port: int
    user: str
    password: str
    role: NodeRole
    priority: int = 100
    last_check: Optional[datetime] = None
    is_healthy: bool = True
    lag_seconds: int = 0
    gtid_executed: str = ""
    server_id: int = 0

class MySQLFailoverManager:
    """MySQL故障转移管理器"""
    
    def __init__(self, config_file: str = "/etc/mysql_failover.json"):
        self.config = self._load_config(config_file)
        self.nodes = self._initialize_nodes()
        self.current_master = None
        self.status = FailoverStatus.MONITORING
        self.failover_lock = threading.Lock()
        self.monitoring_thread = None
        self.is_running = False
        
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/mysql_failover.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def _load_config(self, config_file: str) -> Dict:
        """加载配置文件"""
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            # 默认配置
            return {
                "check_interval": 5,
                "failover_timeout": 30,
                "max_lag_seconds": 10,
                "min_slaves_for_failover": 1,
                "email_alerts": {
                    "enabled": True,
                    "smtp_server": "localhost",
                    "smtp_port": 587,
                    "username": "",
                    "password": "",
                    "from_email": "mysql-failover@example.com",
                    "to_emails": ["admin@example.com"]
                },
                "vip": {
                    "enabled": False,
                    "interface": "eth0",
                    "ip": "192.168.1.100",
                    "netmask": "255.255.255.0"
                },
                "nodes": [
                    {
                        "host": "192.168.1.10",
                        "port": 3306,
                        "user": "repl",
                        "password": "password",
                        "role": "master",
                        "priority": 100
                    },
                    {
                        "host": "192.168.1.11",
                        "port": 3306,
                        "user": "repl",
                        "password": "password",
                        "role": "slave",
                        "priority": 90
                    },
                    {
                        "host": "192.168.1.12",
                        "port": 3306,
                        "user": "repl",
                        "password": "password",
                        "role": "slave",
                        "priority": 80
                    }
                ]
            }
    
    def _initialize_nodes(self) -> List[MySQLNode]:
        """初始化节点列表"""
        nodes = []
        for node_config in self.config['nodes']:
            node = MySQLNode(
                host=node_config['host'],
                port=node_config['port'],
                user=node_config['user'],
                password=node_config['password'],
                role=NodeRole(node_config['role']),
                priority=node_config.get('priority', 100)
            )
            nodes.append(node)
            
            if node.role == NodeRole.MASTER:
                self.current_master = node
        
        return nodes
    
    def _connect_to_node(self, node: MySQLNode) -> Optional[pymysql.Connection]:
        """连接到MySQL节点"""
        try:
            connection = pymysql.connect(
                host=node.host,
                port=node.port,
                user=node.user,
                password=node.password,
                connect_timeout=5,
                read_timeout=10,
                charset='utf8mb4'
            )
            return connection
        except Exception as e:
            self.logger.warning(f"无法连接到节点 {node.host}:{node.port}: {e}")
            return None
    
    def _check_node_health(self, node: MySQLNode) -> bool:
        """检查节点健康状态"""
        connection = self._connect_to_node(node)
        if not connection:
            node.is_healthy = False
            return False
        
        try:
            with connection.cursor() as cursor:
                # 检查基本连接
                cursor.execute("SELECT 1")
                
                # 获取服务器ID
                cursor.execute("SELECT @@server_id")
                node.server_id = cursor.fetchone()[0]
                
                # 检查复制状态
                if node.role == NodeRole.SLAVE:
                    cursor.execute("SHOW SLAVE STATUS")
                    row = cursor.fetchone()
                    if row:
                        # 按列名读取,避免依赖不同MySQL版本下的列顺序
                        columns = [desc[0] for desc in cursor.description]
                        slave_status = dict(zip(columns, row))
                        
                        # 检查复制延迟
                        node.lag_seconds = slave_status.get('Seconds_Behind_Master') or 0
                        
                        # 检查IO和SQL线程状态
                        io_running = slave_status.get('Slave_IO_Running') == 'Yes'
                        sql_running = slave_status.get('Slave_SQL_Running') == 'Yes'
                        
                        if not (io_running and sql_running):
                            self.logger.warning(f"节点 {node.host} 复制线程异常")
                            node.is_healthy = False
                            return False
                        
                        # 检查延迟是否超过阈值
                        if node.lag_seconds > self.config['max_lag_seconds']:
                            self.logger.warning(f"节点 {node.host} 复制延迟过高: {node.lag_seconds}秒")
                
                # 获取GTID执行状态
                cursor.execute("SELECT @@gtid_executed")
                node.gtid_executed = cursor.fetchone()[0] or ""
                
                node.is_healthy = True
                node.last_check = datetime.now()
                return True
                
        except Exception as e:
            self.logger.error(f"检查节点 {node.host} 健康状态失败: {e}")
            node.is_healthy = False
            return False
        finally:
            connection.close()
    
    def _get_best_candidate(self) -> Optional[MySQLNode]:
        """选择最佳的故障转移候选节点"""
        candidates = [
            node for node in self.nodes
            if node.role == NodeRole.SLAVE and node.is_healthy
        ]
        
        if not candidates:
            return None
        
        # 按优先级和复制延迟排序(更严谨的实现还应比较各候选节点的GTID执行位置)
        candidates.sort(key=lambda x: (-x.priority, x.lag_seconds))
        
        return candidates[0]
    
    def _promote_slave_to_master(self, new_master: MySQLNode) -> bool:
        """将从库提升为主库"""
        connection = self._connect_to_node(new_master)
        if not connection:
            return False
        
        try:
            with connection.cursor() as cursor:
                # 停止复制
                cursor.execute("STOP SLAVE")
                
                # 重置复制配置
                cursor.execute("RESET SLAVE ALL")
                
                # 设置为可写
                cursor.execute("SET GLOBAL read_only = 0")
                cursor.execute("SET GLOBAL super_read_only = 0")
                
                # 刷新日志
                cursor.execute("FLUSH LOGS")
                
                self.logger.info(f"成功将 {new_master.host} 提升为主库")
                return True
                
        except Exception as e:
            self.logger.error(f"提升 {new_master.host} 为主库失败: {e}")
            return False
        finally:
            connection.close()
    
    def _reconfigure_slaves(self, new_master: MySQLNode) -> bool:
        """重新配置其他从库指向新主库"""
        success_count = 0
        
        for node in self.nodes:
            if node == new_master or node.role != NodeRole.SLAVE:
                continue
            
            if not node.is_healthy:
                continue
            
            connection = self._connect_to_node(node)
            if not connection:
                continue
            
            try:
                with connection.cursor() as cursor:
                    # 停止复制
                    cursor.execute("STOP SLAVE")
                    
                    # 重新配置主库
                    change_master_sql = f"""
                    CHANGE MASTER TO
                        MASTER_HOST='{new_master.host}',
                        MASTER_PORT={new_master.port},
                        MASTER_USER='{new_master.user}',
                        MASTER_PASSWORD='{new_master.password}',
                        MASTER_AUTO_POSITION=1
                    """
                    cursor.execute(change_master_sql)
                    
                    # 启动复制
                    cursor.execute("START SLAVE")
                    
                    self.logger.info(f"成功重新配置从库 {node.host}")
                    success_count += 1
                    
            except Exception as e:
                self.logger.error(f"重新配置从库 {node.host} 失败: {e}")
            finally:
                connection.close()
        
        return success_count > 0
    
    def _update_vip(self, new_master: MySQLNode) -> bool:
        """更新虚拟IP"""
        if not self.config['vip']['enabled']:
            return True
        
        vip_config = self.config['vip']
        
        try:
            # 删除旧的VIP(如果存在)
            subprocess.run([
                'ip', 'addr', 'del',
                f"{vip_config['ip']}/{vip_config['netmask']}",
                'dev', vip_config['interface']
            ], check=False, capture_output=True)
            
            # 在新主库上添加VIP
            if new_master.host == self._get_local_ip():
                subprocess.run([
                    'ip', 'addr', 'add',
                    f"{vip_config['ip']}/{vip_config['netmask']}",
                    'dev', vip_config['interface']
                ], check=True, capture_output=True)
                
                # 发送免费ARP
                subprocess.run([
                    'arping', '-c', '3', '-I', vip_config['interface'],
                    vip_config['ip']
                ], check=False, capture_output=True)
                
                self.logger.info(f"VIP {vip_config['ip']} 已迁移到 {new_master.host}")
            
            return True
            
        except Exception as e:
            self.logger.error(f"更新VIP失败: {e}")
            return False
    
    def _get_local_ip(self) -> str:
        """获取本地IP地址"""
        import socket
        try:
            # 通过UDP套接字"连接"公网地址(不会真正发包)来获取本机出口IP
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(("8.8.8.8", 80))
            local_ip = s.getsockname()[0]
            s.close()
            return local_ip
        except Exception:
            return "127.0.0.1"
    
    def _send_alert(self, subject: str, message: str):
        """发送告警邮件"""
        if not self.config['email_alerts']['enabled']:
            return
        
        email_config = self.config['email_alerts']
        
        try:
            msg = MIMEMultipart()
            msg['From'] = email_config['from_email']
            msg['To'] = ', '.join(email_config['to_emails'])
            msg['Subject'] = subject
            
            msg.attach(MIMEText(message, 'plain', 'utf-8'))
            
            server = smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port'])
            if email_config['username']:
                server.starttls()
                server.login(email_config['username'], email_config['password'])
            
            server.send_message(msg)
            server.quit()
            
            self.logger.info("告警邮件发送成功")
            
        except Exception as e:
            self.logger.error(f"发送告警邮件失败: {e}")
    
    def perform_failover(self, force: bool = False) -> bool:
        """执行故障转移(force=True 时跳过主库健康复查,用于手动切换)"""
        with self.failover_lock:
            if self.status != FailoverStatus.MONITORING:
                self.logger.warning("故障转移正在进行中,跳过")
                return False
            
            self.status = FailoverStatus.DETECTING
            
            try:
                # 检查是否真的需要故障转移(手动强制切换时跳过该检查)
                if (not force and self.current_master
                        and self._check_node_health(self.current_master)):
                    self.logger.info("主库恢复正常,取消故障转移")
                    self.status = FailoverStatus.MONITORING
                    return False
                
                # 选择最佳候选节点
                new_master = self._get_best_candidate()
                if not new_master:
                    self.logger.error("没有可用的故障转移候选节点")
                    self.status = FailoverStatus.FAILED
                    self._send_alert(
                        "MySQL故障转移失败",
                        "没有可用的故障转移候选节点"
                    )
                    return False
                
                self.logger.info(f"开始故障转移,新主库: {new_master.host}")
                self.status = FailoverStatus.FAILING_OVER
                
                # 提升新主库
                if not self._promote_slave_to_master(new_master):
                    self.logger.error("提升新主库失败")
                    self.status = FailoverStatus.FAILED
                    return False
                
                # 重新配置其他从库
                if not self._reconfigure_slaves(new_master):
                    self.logger.warning("重新配置从库部分失败")
                
                # 更新VIP
                self._update_vip(new_master)
                
                # 更新角色
                if self.current_master:
                    self.current_master.role = NodeRole.FAILED
                new_master.role = NodeRole.MASTER
                self.current_master = new_master
                
                self.status = FailoverStatus.COMPLETED
                
                # 发送成功通知
                self._send_alert(
                    "MySQL故障转移成功",
                    f"故障转移已完成,新主库: {new_master.host}:{new_master.port}"
                )
                
                self.logger.info("故障转移完成")
                return True
                
            except Exception as e:
                self.logger.error(f"故障转移过程中发生错误: {e}")
                self.status = FailoverStatus.FAILED
                self._send_alert(
                    "MySQL故障转移异常",
                    f"故障转移过程中发生错误: {e}"
                )
                return False
            finally:
                # 重置状态
                if self.status != FailoverStatus.COMPLETED:
                    time.sleep(30)  # 等待30秒后重新开始监控
                self.status = FailoverStatus.MONITORING
    
    def _monitoring_loop(self):
        """监控循环"""
        while self.is_running:
            try:
                # 检查所有节点健康状态
                for node in self.nodes:
                    self._check_node_health(node)
                
                # 检查主库状态
                if self.current_master and not self.current_master.is_healthy:
                    self.logger.warning(f"检测到主库 {self.current_master.host} 异常")
                    
                    # 等待一段时间再次确认
                    time.sleep(5)
                    if not self._check_node_health(self.current_master):
                        self.logger.error(f"主库 {self.current_master.host} 确认失效,开始故障转移")
                        self.perform_failover()
                
                # 检查从库状态
                unhealthy_slaves = [
                    node for node in self.nodes
                    if node.role == NodeRole.SLAVE and not node.is_healthy
                ]
                
                if unhealthy_slaves:
                    for slave in unhealthy_slaves:
                        self.logger.warning(f"从库 {slave.host} 状态异常")
                
                time.sleep(self.config['check_interval'])
                
            except Exception as e:
                self.logger.error(f"监控循环异常: {e}")
                time.sleep(10)
    
    def start_monitoring(self):
        """开始监控"""
        if self.is_running:
            self.logger.warning("监控已在运行中")
            return
        
        self.is_running = True
        self.monitoring_thread = threading.Thread(target=self._monitoring_loop)
        self.monitoring_thread.daemon = True
        self.monitoring_thread.start()
        
        self.logger.info("MySQL故障转移监控已启动")
    
    def stop_monitoring(self):
        """停止监控"""
        self.is_running = False
        if self.monitoring_thread:
            self.monitoring_thread.join(timeout=10)
        
        self.logger.info("MySQL故障转移监控已停止")
    
    def get_cluster_status(self) -> Dict:
        """获取集群状态"""
        status = {
            'timestamp': datetime.now().isoformat(),
            'failover_status': self.status.value,
            'current_master': None,
            'nodes': []
        }
        
        if self.current_master:
            status['current_master'] = {
                'host': self.current_master.host,
                'port': self.current_master.port,
                'is_healthy': self.current_master.is_healthy,
                'last_check': self.current_master.last_check.isoformat() if self.current_master.last_check else None
            }
        
        for node in self.nodes:
            node_info = {
                'host': node.host,
                'port': node.port,
                'role': node.role.value,
                'priority': node.priority,
                'is_healthy': node.is_healthy,
                'lag_seconds': node.lag_seconds,
                'server_id': node.server_id,
                'last_check': node.last_check.isoformat() if node.last_check else None
            }
            status['nodes'].append(node_info)
        
        return status
    
    def manual_failover(self, target_host: str) -> bool:
        """手动故障转移到指定节点"""
        target_node = None
        for node in self.nodes:
            if node.host == target_host and node.role == NodeRole.SLAVE:
                target_node = node
                break
        
        if not target_node:
            self.logger.error(f"未找到目标节点 {target_host} 或节点不是从库")
            return False
        
        if not target_node.is_healthy:
            self.logger.error(f"目标节点 {target_host} 状态异常")
            return False
        
        self.logger.info(f"开始手动故障转移到 {target_host}")
        
        # 临时设置当前主库为失效状态
        old_master = self.current_master
        if old_master:
            old_master.is_healthy = False
        
        # 执行故障转移
        success = self.perform_failover()
        
        # 如果失败,恢复原主库状态
        if not success and old_master:
            old_master.is_healthy = True
        
        return success

# 使用示例
if __name__ == '__main__':
    import sys
    import signal
    
    def signal_handler(signum, frame):
        print("\n正在停止故障转移监控...")
        failover_manager.stop_monitoring()
        sys.exit(0)
    
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    # 创建故障转移管理器
    failover_manager = MySQLFailoverManager()
    
    if len(sys.argv) > 1:
        command = sys.argv[1]
        
        if command == "status":
            # 显示集群状态
            status = failover_manager.get_cluster_status()
            print(json.dumps(status, ensure_ascii=False, indent=2))
        
        elif command == "failover" and len(sys.argv) > 2:
            # 手动故障转移
            target_host = sys.argv[2]
            success = failover_manager.manual_failover(target_host)
            print(f"手动故障转移{'成功' if success else '失败'}")
        
        elif command == "monitor":
            # 启动监控
            failover_manager.start_monitoring()
            print("故障转移监控已启动,按 Ctrl+C 停止")
            
            try:
                while True:
                    time.sleep(1)
            except KeyboardInterrupt:
                pass
        
        else:
            print("用法:")
            print("  python mysql_failover.py monitor    # 启动监控")
            print("  python mysql_failover.py status     # 查看状态")
            print("  python mysql_failover.py failover <host>  # 手动故障转移")
    
    else:
        print("MySQL自动故障转移系统")
        print("用法: python mysql_failover.py [monitor|status|failover <host>]")

13.7.2 故障转移测试

#!/bin/bash
# MySQL故障转移测试脚本
# 用于测试各种故障场景下的自动转移能力

set -euo pipefail

# 配置参数
MASTER_HOST="192.168.1.10"
SLAVE1_HOST="192.168.1.11"
SLAVE2_HOST="192.168.1.12"
MYSQL_USER="test"
MYSQL_PASSWORD="password"
TEST_DATABASE="failover_test"
LOG_FILE="/tmp/failover_test.log"

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

# 错误处理
error_exit() {
    log "错误: $1"
    exit 1
}

# 执行MySQL命令
execute_mysql() {
    local host="$1"
    local sql="$2"
    
    mysql -h "$host" -u "$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "$sql" 2>/dev/null
}

# 检查MySQL连接
check_mysql_connection() {
    local host="$1"
    
    if execute_mysql "$host" "SELECT 1" >/dev/null 2>&1; then
        return 0
    else
        return 1
    fi
}

# 获取主库信息
get_master_status() {
    local host="$1"
    
    execute_mysql "$host" "SHOW MASTER STATUS\\G" | grep -E "(File|Position)"
}

# 获取从库状态
get_slave_status() {
    local host="$1"
    
    execute_mysql "$host" "SHOW SLAVE STATUS\\G" | grep -E "(Master_Host|Slave_IO_Running|Slave_SQL_Running|Seconds_Behind_Master)"
}

# 创建测试数据
create_test_data() {
    log "创建测试数据"
    
    execute_mysql "$MASTER_HOST" "
        CREATE DATABASE IF NOT EXISTS $TEST_DATABASE;
        USE $TEST_DATABASE;
        CREATE TABLE IF NOT EXISTS test_table (
            id INT AUTO_INCREMENT PRIMARY KEY,
            data VARCHAR(255),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        INSERT INTO test_table (data) VALUES 
            ('Initial data 1'),
            ('Initial data 2'),
            ('Initial data 3');
    "
    
    log "测试数据创建完成"
}

# 验证数据一致性
verify_data_consistency() {
    local host="$1"
    
    log "验证节点 $host 的数据一致性"
    
    local count
    count=$(execute_mysql "$host" "SELECT COUNT(*) FROM $TEST_DATABASE.test_table" | tail -1)
    
    log "节点 $host 数据行数: $count"
    
    # 显示最新的几条记录
    execute_mysql "$host" "SELECT * FROM $TEST_DATABASE.test_table ORDER BY id DESC LIMIT 5"
}

# 模拟主库故障
simulate_master_failure() {
    log "模拟主库故障 - 停止MySQL服务"
    
    ssh "root@$MASTER_HOST" "systemctl stop mysql" || {
        log "无法通过SSH停止主库服务,尝试其他方法"
        return 1
    }
    
    log "主库服务已停止"
}

# 模拟网络分区
simulate_network_partition() {
    log "模拟网络分区 - 阻断主库网络"
    
    ssh "root@$MASTER_HOST" "iptables -A INPUT -p tcp --dport 3306 -j DROP" || {
        log "无法设置网络分区"
        return 1
    }
    
    log "网络分区已设置"
}

# 模拟高负载
simulate_high_load() {
    log "模拟高负载场景"
    
    # 在主库上执行大量写入操作
    for i in {1..100}; do
        execute_mysql "$MASTER_HOST" "
            INSERT INTO $TEST_DATABASE.test_table (data) 
            VALUES ('Load test data $i');
        " &
    done
    
    wait
    log "高负载测试完成"
}

# 恢复主库
restore_master() {
    log "恢复主库服务"
    
    # 清除网络规则
    ssh "root@$MASTER_HOST" "iptables -D INPUT -p tcp --dport 3306 -j DROP" 2>/dev/null || true
    
    # 启动MySQL服务
    ssh "root@$MASTER_HOST" "systemctl start mysql" || {
        log "无法启动主库服务"
        return 1
    }
    
    # 等待服务启动
    sleep 10
    
    log "主库服务已恢复"
}

# 等待故障转移完成
wait_for_failover() {
    local timeout=60
    local elapsed=0
    
    log "等待故障转移完成(超时: ${timeout}秒)"
    
    while [ $elapsed -lt $timeout ]; do
        # 检查从库是否变为主库
        if check_mysql_connection "$SLAVE1_HOST"; then
            local read_only
            read_only=$(execute_mysql "$SLAVE1_HOST" "SELECT @@read_only" | tail -1)
            
            if [ "$read_only" = "0" ]; then
                log "故障转移完成,$SLAVE1_HOST 已成为新主库"
                return 0
            fi
        fi
        
        if check_mysql_connection "$SLAVE2_HOST"; then
            local read_only
            read_only=$(execute_mysql "$SLAVE2_HOST" "SELECT @@read_only" | tail -1)
            
            if [ "$read_only" = "0" ]; then
                log "故障转移完成,$SLAVE2_HOST 已成为新主库"
                return 0
            fi
        fi
        
        sleep 5
        elapsed=$((elapsed + 5))
        log "等待故障转移... ($elapsed/$timeout 秒)"
    done
    
    log "故障转移超时"
    return 1
}

# 测试场景1:主库服务停止
test_master_service_failure() {
    log "=== 测试场景1:主库服务停止 ==="
    
    # 记录初始状态
    log "初始主库状态:"
    get_master_status "$MASTER_HOST"
    
    log "初始从库状态:"
    get_slave_status "$SLAVE1_HOST"
    get_slave_status "$SLAVE2_HOST"
    
    # 模拟故障
    simulate_master_failure
    
    # 等待故障转移
    if wait_for_failover; then
        log "故障转移测试通过"
        
        # 验证数据一致性
        verify_data_consistency "$SLAVE1_HOST"
        verify_data_consistency "$SLAVE2_HOST"
    else
        log "故障转移测试失败"
    fi
    
    # 恢复环境
    restore_master
}

# 测试场景2:网络分区
test_network_partition() {
    log "=== 测试场景2:网络分区 ==="
    
    # 模拟网络分区
    simulate_network_partition
    
    # 等待故障转移
    if wait_for_failover; then
        log "网络分区故障转移测试通过"
    else
        log "网络分区故障转移测试失败"
    fi
    
    # 恢复环境
    restore_master
}

# 测试场景3:高负载下的故障转移
test_high_load_failover() {
    log "=== 测试场景3:高负载下的故障转移 ==="
    
    # 启动高负载
    simulate_high_load &
    local load_pid=$!
    
    # 等待一段时间让负载稳定
    sleep 10
    
    # 模拟故障
    simulate_master_failure
    
    # 停止负载生成
    kill $load_pid 2>/dev/null || true
    
    # 等待故障转移
    if wait_for_failover; then
        log "高负载故障转移测试通过"
    else
        log "高负载故障转移测试失败"
    fi
    
    # 恢复环境
    restore_master
}

# 测试场景4:脑裂检测
test_split_brain_detection() {
    log "=== 测试场景4:脑裂检测 ==="
    
    # 模拟网络分区
    simulate_network_partition
    
    # 等待故障转移
    wait_for_failover
    
    # 恢复原主库网络
    ssh "root@$MASTER_HOST" "iptables -D INPUT -p tcp --dport 3306 -j DROP" 2>/dev/null || true
    
    # 检查是否有脑裂
    sleep 10
    
    local master_count=0
    
    if check_mysql_connection "$MASTER_HOST"; then
        local read_only
        read_only=$(execute_mysql "$MASTER_HOST" "SELECT @@read_only" | tail -1)
        if [ "$read_only" = "0" ]; then
            master_count=$((master_count + 1))
            log "检测到主库: $MASTER_HOST"
        fi
    fi
    
    if check_mysql_connection "$SLAVE1_HOST"; then
        local read_only
        read_only=$(execute_mysql "$SLAVE1_HOST" "SELECT @@read_only" | tail -1)
        if [ "$read_only" = "0" ]; then
            master_count=$((master_count + 1))
            log "检测到主库: $SLAVE1_HOST"
        fi
    fi
    
    if check_mysql_connection "$SLAVE2_HOST"; then
        local read_only
        read_only=$(execute_mysql "$SLAVE2_HOST" "SELECT @@read_only" | tail -1)
        if [ "$read_only" = "0" ]; then
            master_count=$((master_count + 1))
            log "检测到主库: $SLAVE2_HOST"
        fi
    fi
    
    if [ $master_count -gt 1 ]; then
        log "警告: 检测到脑裂,存在多个主库!"
    else
        log "脑裂检测正常,只有一个主库"
    fi
}

# 性能测试
performance_test() {
    log "=== 性能测试 ==="
    
    local current_master
    
    # 确定当前主库
    if check_mysql_connection "$MASTER_HOST"; then
        local read_only
        read_only=$(execute_mysql "$MASTER_HOST" "SELECT @@read_only" | tail -1)
        if [ "$read_only" = "0" ]; then
            current_master="$MASTER_HOST"
        fi
    fi
    
    if [ -z "${current_master:-}" ]; then
        if check_mysql_connection "$SLAVE1_HOST"; then
            local read_only
            read_only=$(execute_mysql "$SLAVE1_HOST" "SELECT @@read_only" | tail -1)
            if [ "$read_only" = "0" ]; then
                current_master="$SLAVE1_HOST"
            fi
        fi
    fi
    
    if [ -z "${current_master:-}" ]; then
        if check_mysql_connection "$SLAVE2_HOST"; then
            local read_only
            read_only=$(execute_mysql "$SLAVE2_HOST" "SELECT @@read_only" | tail -1)
            if [ "$read_only" = "0" ]; then
                current_master="$SLAVE2_HOST"
            fi
        fi
    fi
    
    if [ -z "${current_master:-}" ]; then
        log "无法找到当前主库"
        return 1
    fi
    
    log "当前主库: $current_master"
    
    # 执行性能测试
    local start_time=$(date +%s)
    
    for i in {1..1000}; do
        execute_mysql "$current_master" "
            INSERT INTO $TEST_DATABASE.test_table (data) 
            VALUES ('Performance test $i');
        " >/dev/null
    done
    
    local end_time=$(date +%s)
    local duration=$((end_time - start_time))
    # 避免除零:耗时不足1秒时按1秒计算
    [ "$duration" -lt 1 ] && duration=1
    local tps=$((1000 / duration))
    
    log "性能测试完成: 1000次插入耗时 ${duration}秒,TPS: $tps"
}

# 生成测试报告
generate_report() {
    local report_file="/tmp/failover_test_report_$(date +%Y%m%d_%H%M%S).txt"
    
    {
        echo "MySQL故障转移测试报告"
        echo "======================"
        echo "测试时间: $(date)"
        echo "测试环境:"
        echo "  主库: $MASTER_HOST"
        echo "  从库1: $SLAVE1_HOST"
        echo "  从库2: $SLAVE2_HOST"
        echo
        echo "测试结果:"
        echo "--------"
        
        # 提取测试结果
        grep -E "(测试通过|测试失败|检测到脑裂|性能测试完成)" "$LOG_FILE" || echo "无测试结果"
        
        echo
        echo "详细日志:"
        echo "--------"
        cat "$LOG_FILE"
        
    } > "$report_file"
    
    log "测试报告已生成: $report_file"
}

# 清理环境
cleanup() {
    log "清理测试环境"
    
    # 删除测试数据库
    for host in "$MASTER_HOST" "$SLAVE1_HOST" "$SLAVE2_HOST"; do
        if check_mysql_connection "$host"; then
            execute_mysql "$host" "DROP DATABASE IF EXISTS $TEST_DATABASE" 2>/dev/null || true
        fi
    done
    
    # 清除网络规则
    ssh "root@$MASTER_HOST" "iptables -D INPUT -p tcp --dport 3306 -j DROP" 2>/dev/null || true
    
    log "环境清理完成"
}

# 主函数
main() {
    case "${1:-all}" in
        create-data)
            create_test_data
            ;;
        test1)
            test_master_service_failure
            ;;
        test2)
            test_network_partition
            ;;
        test3)
            test_high_load_failover
            ;;
        test4)
            test_split_brain_detection
            ;;
        performance)
            performance_test
            ;;
        cleanup)
            cleanup
            ;;
        report)
            generate_report
            ;;
        all)
            log "开始完整的故障转移测试"
            
            # 创建测试数据
            create_test_data
            
            # 执行所有测试
            test_master_service_failure
            sleep 30
            
            test_network_partition
            sleep 30
            
            test_high_load_failover
            sleep 30
            
            test_split_brain_detection
            sleep 30
            
            performance_test
            
            # 生成报告
            generate_report
            
            # 清理环境
            cleanup
            
            log "所有测试完成"
            ;;
        help|*)
            echo "MySQL故障转移测试脚本"
            echo
            echo "用法: $0 [命令]"
            echo
            echo "命令:"
            echo "  create-data  - 创建测试数据"
            echo "  test1        - 测试主库服务停止"
            echo "  test2        - 测试网络分区"
            echo "  test3        - 测试高负载故障转移"
            echo "  test4        - 测试脑裂检测"
            echo "  performance  - 性能测试"
            echo "  cleanup      - 清理测试环境"
            echo "  report       - 生成测试报告"
            echo "  all          - 执行所有测试(默认)"
            echo "  help         - 显示此帮助信息"
            ;;
    esac
}

main "$@"

13.8 总结

本章详细介绍了MySQL高可用架构设计的各个方面,从基础的主从复制到复杂的集群架构,再到云数据库服务和自动故障转移机制。

13.8.1 核心要点回顾

  1. 高可用架构层次

    • 主从复制:基础的高可用方案
    • 主主复制:双向复制,提供更好的可用性
    • MySQL集群:分布式架构,支持水平扩展
    • 代理层高可用:通过中间件实现透明故障转移
    • 云数据库:托管服务,简化运维复杂度
  2. 关键技术组件

    • GTID:全局事务标识符,简化复制配置
    • 半同步复制:平衡性能和数据安全性
    • 并行复制:提高复制性能
    • 故障检测:及时发现节点异常
    • 自动故障转移:减少人工干预
  3. 架构选择考虑因素

    • RTO/RPO要求
    • 数据一致性需求
    • 性能要求
    • 运维复杂度
    • 成本预算

13.8.2 实施建议

  1. 分阶段实施

    • 第一阶段:建立基础主从复制
    • 第二阶段:添加监控和告警
    • 第三阶段:实现自动故障转移
    • 第四阶段:优化性能和扩展性
  2. 最佳实践

    • 定期进行故障转移演练
    • 建立完善的监控体系
    • 制定详细的运维手册
    • 保持架构文档更新
    • 定期备份和恢复测试
  3. 常见问题处理

    • 复制延迟:优化网络、调整参数
    • 脑裂问题:使用仲裁机制
    • 数据不一致:定期校验和修复
    • 性能瓶颈:读写分离、分库分表

13.8.3 发展趋势

  1. 云原生架构

    • 容器化部署
    • Kubernetes编排
    • 服务网格集成
    • 自动化运维
  2. 新技术融合

    • MySQL 8.0新特性
    • InnoDB Cluster
    • MySQL Router
    • Group Replication
  3. 智能化运维

    • AI驱动的故障预测
    • 自动化性能调优
    • 智能容量规划
    • 自愈系统

13.8.4 下一步学习方向

  1. 深入学习

    • MySQL内核原理
    • 分布式系统理论
    • 云计算架构
    • DevOps实践
  2. 实践项目

    • 搭建完整的高可用环境
    • 开发监控和管理工具
    • 参与开源项目
    • 性能调优案例
  3. 认证考试

    • MySQL认证
    • 云服务认证
    • 架构师认证
    • DevOps认证

通过本章的学习,你应该能够:

  • 理解各种高可用架构的原理和适用场景
  • 设计和实施适合业务需求的高可用方案
  • 配置和管理MySQL复制环境
  • 实现自动故障检测和转移
  • 选择合适的云数据库服务
  • 进行故障转移测试和性能优化

下一章我们将学习MySQL的分库分表技术,探讨如何处理大规模数据的水平扩展问题。

13.2.3 主从复制优化

-- 主从复制性能优化SQL脚本
-- 优化复制性能和稳定性

-- 1. 并行复制配置
SET GLOBAL slave_parallel_type = 'LOGICAL_CLOCK';
SET GLOBAL slave_parallel_workers = 4;
SET GLOBAL slave_preserve_commit_order = 1;
SET GLOBAL slave_pending_jobs_size_max = 134217728; -- 128MB

-- 2. 复制过滤配置
-- 忽略系统数据库
CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB = (mysql, information_schema, performance_schema, sys);

-- 只复制特定数据库
-- CHANGE REPLICATION FILTER REPLICATE_DO_DB = (app_db, user_db);

-- 忽略特定表
-- CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE = (app_db.temp_table, app_db.log_table);

-- 3. 二进制日志优化
SET GLOBAL binlog_cache_size = 1048576; -- 1MB
SET GLOBAL max_binlog_cache_size = 4294967296; -- 4GB
SET GLOBAL binlog_stmt_cache_size = 32768; -- 32KB
SET GLOBAL max_binlog_stmt_cache_size = 4294967296; -- 4GB

-- 4. 复制缓冲区优化
SET GLOBAL slave_net_timeout = 60;
SET GLOBAL slave_compressed_protocol = 1;

-- 5. 检查复制状态
SHOW SLAVE STATUS\G

-- 6. 查看复制延迟
SELECT 
    CHANNEL_NAME,
    SERVICE_STATE,
    LAST_ERROR_MESSAGE,
    LAST_ERROR_TIMESTAMP
FROM performance_schema.replication_connection_status;

-- 7. 查看并行复制工作线程
SELECT 
    WORKER_ID,
    THREAD_ID,
    SERVICE_STATE,
    LAST_ERROR_MESSAGE,
    LAST_ERROR_TIMESTAMP
FROM performance_schema.replication_applier_status_by_worker;

-- 8. 复制性能监控
SELECT 
    EVENT_NAME,
    COUNT_STAR,
    SUM_TIMER_WAIT/1000000000000 AS SUM_TIMER_WAIT_SEC,
    AVG_TIMER_WAIT/1000000000000 AS AVG_TIMER_WAIT_SEC
FROM performance_schema.events_waits_summary_global_by_event_name 
WHERE EVENT_NAME LIKE '%replication%'
ORDER BY SUM_TIMER_WAIT DESC;
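
上面的参数大多通过SET GLOBAL在线生效,实例重启后会失效,确认效果后应同步写入my.cnf持久化。为了量化优化前后的效果,下面给出一个简化的延迟监控示意(沿用本章的mysql.connector风格;主机、账号等连接参数均为示例假设,请按实际环境替换),周期性读取Seconds_Behind_Master以及并行复制工作线程的错误信息:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""复制延迟监控示意脚本(连接参数为示例假设)"""

import time
import mysql.connector

SLAVE_CONFIG = {
    'host': '192.168.1.101',   # 示例从库地址
    'port': 3306,
    'user': 'monitor',
    'password': 'monitor_password'
}

def check_replication_lag(interval: int = 10):
    """周期性输出复制延迟和并行复制工作线程状态"""
    conn = mysql.connector.connect(**SLAVE_CONFIG)
    try:
        while True:
            cursor = conn.cursor(dictionary=True)
            
            # 复制延迟(MySQL 8.0.22+也可使用SHOW REPLICA STATUS)
            cursor.execute("SHOW SLAVE STATUS")
            status = cursor.fetchone() or {}
            print(f"Seconds_Behind_Master: {status.get('Seconds_Behind_Master')}")
            
            # 并行复制工作线程是否有错误
            cursor.execute("""
                SELECT WORKER_ID, SERVICE_STATE, LAST_ERROR_MESSAGE
                FROM performance_schema.replication_applier_status_by_worker
            """)
            for row in cursor.fetchall():
                if row['LAST_ERROR_MESSAGE']:
                    print(f"工作线程 {row['WORKER_ID']} 出错: {row['LAST_ERROR_MESSAGE']}")
            
            cursor.close()
            time.sleep(interval)
    finally:
        conn.close()

if __name__ == '__main__':
    check_replication_lag()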

13.3 主主复制架构

13.3.1 主主复制配置

主主复制(Master-Master Replication)允许两个MySQL实例互为主从,实现双向数据同步。
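
双写场景下最常见的冲突来自自增主键。下面是一个简化的校验示意(沿用mysql.connector;两台服务器的地址与账号均为示例假设),用于确认两个节点的auto_increment_increment/offset已经交错配置,从而保证各自生成的自增ID不会重叠:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""主主复制自增ID交错配置校验示意(连接参数为示例假设)"""

import mysql.connector

SERVERS = {
    'server1': {'host': '192.168.1.100', 'port': 3306,
                'user': 'root', 'password': 'password'},
    'server2': {'host': '192.168.1.101', 'port': 3306,
                'user': 'root', 'password': 'password'},
}

def get_auto_increment_settings(config):
    """读取单台服务器的自增步长与偏移"""
    conn = mysql.connector.connect(**config)
    cursor = conn.cursor()
    cursor.execute("SELECT @@auto_increment_increment, @@auto_increment_offset")
    increment, offset = cursor.fetchone()
    cursor.close()
    conn.close()
    return increment, offset

def verify_offsets():
    settings = {name: get_auto_increment_settings(cfg) for name, cfg in SERVERS.items()}
    for name, (inc, off) in settings.items():
        print(f"{name}: auto_increment_increment={inc}, auto_increment_offset={off}")
    
    increments = {inc for inc, _ in settings.values()}
    offsets = [off for _, off in settings.values()]
    
    # 步长一致且不小于节点数、偏移互不相同,双写才不会产生自增主键冲突
    if (len(increments) == 1 and increments.pop() >= len(SERVERS)
            and len(set(offsets)) == len(offsets)):
        print("自增ID配置正确,双写不会产生主键冲突")
    else:
        print("警告: 自增ID配置存在冲突风险,请检查increment/offset设置")

if __name__ == '__main__':
    verify_offsets()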

#!/bin/bash
# MySQL主主复制配置脚本
# 配置双向复制,避免数据冲突

set -euo pipefail

# 配置变量
SERVER1_HOST="192.168.1.100"
SERVER2_HOST="192.168.1.101"
MYSQL_PORT="3306"
MYSQL_USER="root"
MYSQL_PASSWORD="password"
REPL_USER="repl_user"
REPL_PASSWORD="repl_password"

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}

# 生成主主复制配置
generate_master_master_config() {
    local server_id=$1
    local auto_increment_offset=$2
    local config_file="/etc/mysql/conf.d/master_master.cnf"
    
    log "生成服务器$server_id的配置文件"
    
    cat > "$config_file" << EOF
[mysqld]
# 服务器标识
server-id = $server_id

# 二进制日志
log-bin = mysql-bin
binlog-format = ROW
expire-logs-days = 7
max-binlog-size = 100M

# GTID配置
gtid-mode = ON
enforce-gtid-consistency = ON
log-slave-updates = ON

# 自增ID配置(避免冲突)
auto-increment-increment = 2
auto-increment-offset = $auto_increment_offset

# 主主复制优化
slave-parallel-type = LOGICAL_CLOCK
slave-parallel-workers = 4
slave-preserve-commit-order = 1

# 冲突解决
slave-skip-errors = 1062,1032
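# 注意: 跳过1062/1032错误可能掩盖数据不一致,生产环境应配合定期数据校验(如pt-table-checksum)使用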
replicate-same-server-id = 0

# 性能优化
innodb-flush-log-at-trx-commit = 1
sync-binlog = 1
EOF

    log "配置文件已生成: $config_file"
}

# 配置服务器复制
setup_server_replication() {
    local local_host=$1
    local remote_host=$2
    local server_name=$3
    
    log "配置$server_name复制: $local_host -> $remote_host"
    
    # 创建复制用户
    mysql -h"$local_host" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << EOF
CREATE USER IF NOT EXISTS '$REPL_USER'@'%' IDENTIFIED BY '$REPL_PASSWORD';
GRANT REPLICATION SLAVE ON *.* TO '$REPL_USER'@'%';
FLUSH PRIVILEGES;
EOF

    # 配置到远程服务器的复制
    mysql -h"$local_host" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << EOF
STOP SLAVE;
CHANGE MASTER TO
    MASTER_HOST='$remote_host',
    MASTER_PORT=$MYSQL_PORT,
    MASTER_USER='$REPL_USER',
    MASTER_PASSWORD='$REPL_PASSWORD',
    MASTER_AUTO_POSITION=1;
START SLAVE;
EOF

    log "$server_name复制配置完成"
}

# 检查复制状态
check_replication_status() {
    local host=$1
    local server_name=$2
    
    log "检查$server_name复制状态"
    
    mysql -h"$host" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << 'EOF'
SELECT 
    'Slave Status' as Status_Type,
    Slave_IO_Running,
    Slave_SQL_Running,
    Seconds_Behind_Master,
    Last_Error
FROM 
    (SELECT 
        IF(Slave_IO_Running='Yes', 'Running', 'Stopped') as Slave_IO_Running,
        IF(Slave_SQL_Running='Yes', 'Running', 'Stopped') as Slave_SQL_Running,
        Seconds_Behind_Master,
        Last_Error
     FROM information_schema.SLAVE_HOSTS 
     UNION ALL
     SELECT 'N/A', 'N/A', NULL, 'No slave status available'
     LIMIT 1
    ) as slave_info;
EOF

    # 显示GTID状态
    mysql -h"$host" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SELECT @@GLOBAL.gtid_executed as GTID_Executed;"
}

# 测试数据同步
test_data_sync() {
    log "测试数据同步"
    
    # 在服务器1创建测试数据
    mysql -h"$SERVER1_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << EOF
CREATE DATABASE IF NOT EXISTS test_replication;
USE test_replication;
CREATE TABLE IF NOT EXISTS sync_test (
    id INT AUTO_INCREMENT PRIMARY KEY,
    server_source VARCHAR(20),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    data_value VARCHAR(100)
);
INSERT INTO sync_test (server_source, data_value) VALUES ('server1', 'test from server 1');
EOF

    # 等待同步
    sleep 3
    
    # 在服务器2创建测试数据
    mysql -h"$SERVER2_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << EOF
USE test_replication;
INSERT INTO sync_test (server_source, data_value) VALUES ('server2', 'test from server 2');
EOF

    # 等待同步
    sleep 3
    
    # 检查两个服务器的数据
    log "服务器1的数据:"
    mysql -h"$SERVER1_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SELECT * FROM test_replication.sync_test ORDER BY id;"
    
    log "服务器2的数据:"
    mysql -h"$SERVER2_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SELECT * FROM test_replication.sync_test ORDER BY id;"
}

# 生成冲突检测脚本
generate_conflict_detection() {
    cat > "/usr/local/bin/mysql_conflict_detector.sh" << 'EOF'
#!/bin/bash
# MySQL主主复制冲突检测脚本

SERVER1_HOST="192.168.1.100"
SERVER2_HOST="192.168.1.101"
MYSQL_USER="root"
MYSQL_PASSWORD="password"

# 检查复制错误
check_replication_errors() {
    local host=$1
    local server_name=$2
    
    echo "检查$server_name复制错误:"
    
    mysql -h"$host" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -sN << 'SQL'
SELECT 
    CONCAT('Last_Error: ', Last_Error) as Error_Info
FROM 
    (SHOW SLAVE STATUS) as slave_status
WHERE 
    Last_Error != '';
SQL

    # 通过performance_schema的错误统计检查冲突次数
    mysql -h"$host" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" << 'SQL'
SELECT 
    'Duplicate Key Errors' as Error_Type,
    SUM_ERROR_RAISED as Error_Count
FROM 
    performance_schema.events_errors_summary_global_by_error
WHERE 
    ERROR_NUMBER = 1062; -- Duplicate entry error
    
SELECT 
    'Key Not Found Errors' as Error_Type,
    SUM_ERROR_RAISED as Error_Count
FROM 
    performance_schema.events_errors_summary_global_by_error
WHERE 
    ERROR_NUMBER = 1032; -- Can't find record error
SQL
}

# 检查自增ID冲突
check_auto_increment_conflicts() {
    echo "检查自增ID配置:"
    
    for host in "$SERVER1_HOST" "$SERVER2_HOST"; do
        echo "服务器 $host:"
        mysql -h"$host" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SHOW VARIABLES LIKE 'auto_increment_%';"
        echo
    done
}

# 检查GTID一致性
check_gtid_consistency() {
    echo "检查GTID一致性:"
    
    gtid1=$(mysql -h"$SERVER1_HOST" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -sN -e "SELECT @@GLOBAL.gtid_executed;")
    gtid2=$(mysql -h"$SERVER2_HOST" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -sN -e "SELECT @@GLOBAL.gtid_executed;")
    
    echo "服务器1 GTID: $gtid1"
    echo "服务器2 GTID: $gtid2"
    
    if [ "$gtid1" = "$gtid2" ]; then
        echo "GTID一致性: 正常"
    else
        echo "GTID一致性: 异常 - 存在差异"
    fi
}

# 主函数
main() {
    echo "MySQL主主复制冲突检测 - $(date)"
    echo "========================================"
    
    check_replication_errors "$SERVER1_HOST" "服务器1"
    echo
    check_replication_errors "$SERVER2_HOST" "服务器2"
    echo
    
    check_auto_increment_conflicts
    echo
    
    check_gtid_consistency
    echo
    
    echo "检测完成"
}

main
EOF

    chmod +x "/usr/local/bin/mysql_conflict_detector.sh"
    log "冲突检测脚本已生成: /usr/local/bin/mysql_conflict_detector.sh"
}

# 主函数
main() {
    case "${1:-help}" in
        setup)
            log "开始配置MySQL主主复制"
            
            # 生成配置文件
            log "为服务器1生成配置"
            generate_master_master_config "1" "1"
            
            log "为服务器2生成配置"
            generate_master_master_config "2" "2"
            
            echo "请将配置文件复制到对应服务器并重启MySQL服务"
            echo "然后运行: $0 configure"
            ;;
        configure)
            log "配置双向复制"
            
            # 配置服务器1到服务器2的复制
            setup_server_replication "$SERVER1_HOST" "$SERVER2_HOST" "服务器1"
            
            # 配置服务器2到服务器1的复制
            setup_server_replication "$SERVER2_HOST" "$SERVER1_HOST" "服务器2"
            
            log "主主复制配置完成"
            ;;
        check)
            check_replication_status "$SERVER1_HOST" "服务器1"
            echo
            check_replication_status "$SERVER2_HOST" "服务器2"
            ;;
        test)
            test_data_sync
            ;;
        conflict)
            generate_conflict_detection
            ;;
        help|*)
            echo "MySQL主主复制配置脚本"
            echo
            echo "用法: $0 [命令]"
            echo
            echo "命令:"
            echo "  setup     - 生成配置文件"
            echo "  configure - 配置双向复制"
            echo "  check     - 检查复制状态"
            echo "  test      - 测试数据同步"
            echo "  conflict  - 生成冲突检测脚本"
            echo "  help      - 显示此帮助信息"
            echo
            echo "配置步骤:"
            echo "  1. 修改脚本中的服务器IP和密码"
            echo "  2. 运行 '$0 setup' 生成配置文件"
            echo "  3. 将配置文件复制到对应服务器"
            echo "  4. 重启两台MySQL服务器"
            echo "  5. 运行 '$0 configure' 配置复制"
            echo "  6. 运行 '$0 check' 检查状态"
            echo "  7. 运行 '$0 test' 测试同步"
            ;;
    esac
}

main "$@"

13.3.2 主主复制冲突处理

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL主主复制冲突处理工具
自动检测和处理主主复制中的数据冲突
"""

import mysql.connector
import logging
import time
import json
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum

class ConflictType(Enum):
    """冲突类型"""
    DUPLICATE_KEY = "duplicate_key"
    MISSING_RECORD = "missing_record"
    DATA_INCONSISTENCY = "data_inconsistency"
    AUTO_INCREMENT_CONFLICT = "auto_increment_conflict"

@dataclass
class ConflictInfo:
    """冲突信息"""
    conflict_type: ConflictType
    server: str
    database: str
    table: str
    error_message: str
    error_code: int
    timestamp: datetime
    resolution_strategy: str = ""
    resolved: bool = False

class MySQLMasterMasterConflictResolver:
    """MySQL主主复制冲突解决器"""
    
    def __init__(self, server1_config: Dict, server2_config: Dict):
        self.server1_config = server1_config
        self.server2_config = server2_config
        self.conflicts = []
        
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/mysql_conflict_resolver.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def connect_to_server(self, config: Dict) -> mysql.connector.MySQLConnection:
        """连接到MySQL服务器"""
        try:
            conn = mysql.connector.connect(**config)
            return conn
        except mysql.connector.Error as e:
            self.logger.error(f"连接失败 {config['host']}: {e}")
            raise
    
    def get_slave_status(self, conn: mysql.connector.MySQLConnection) -> Dict:
        """获取从库状态"""
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SHOW SLAVE STATUS")
        result = cursor.fetchone()
        cursor.close()
        return result or {}
    
    def detect_replication_errors(self) -> List[ConflictInfo]:
        """检测复制错误"""
        conflicts = []
        
        for server_name, config in [("server1", self.server1_config), ("server2", self.server2_config)]:
            try:
                conn = self.connect_to_server(config)
                slave_status = self.get_slave_status(conn)
                
                if slave_status and slave_status.get('Last_Error'):
                    error_message = slave_status['Last_Error']
                    error_code = slave_status.get('Last_Errno', 0)
                    
                    # 分析错误类型
                    conflict_type = self._analyze_error_type(error_code, error_message)
                    
                    conflict = ConflictInfo(
                        conflict_type=conflict_type,
                        server=server_name,
                        database=self._extract_database_from_error(error_message),
                        table=self._extract_table_from_error(error_message),
                        error_message=error_message,
                        error_code=error_code,
                        timestamp=datetime.now()
                    )
                    
                    conflicts.append(conflict)
                    self.logger.warning(f"检测到冲突: {server_name} - {error_message}")
                
                conn.close()
                
            except Exception as e:
                self.logger.error(f"检测 {server_name} 时出错: {e}")
        
        return conflicts
    
    def _analyze_error_type(self, error_code: int, error_message: str) -> ConflictType:
        """分析错误类型"""
        if error_code == 1062 or "Duplicate entry" in error_message:
            return ConflictType.DUPLICATE_KEY
        elif error_code == 1032 or "Can't find record" in error_message:
            return ConflictType.MISSING_RECORD
        elif "auto_increment" in error_message.lower():
            return ConflictType.AUTO_INCREMENT_CONFLICT
        else:
            return ConflictType.DATA_INCONSISTENCY
    
    def _extract_database_from_error(self, error_message: str) -> str:
        """从错误消息中提取数据库名"""
        # 简单的正则匹配,实际应用中可能需要更复杂的解析
        import re
        match = re.search(r"database '([^']+)'", error_message)
        return match.group(1) if match else "unknown"
    
    def _extract_table_from_error(self, error_message: str) -> str:
        """从错误消息中提取表名"""
        import re
        match = re.search(r"table '([^']+)'", error_message)
        return match.group(1) if match else "unknown"
    
    def resolve_duplicate_key_conflict(self, conflict: ConflictInfo) -> bool:
        """解决重复键冲突"""
        try:
            server_config = self.server1_config if conflict.server == "server1" else self.server2_config
            conn = self.connect_to_server(server_config)
            
            # 策略1: 跳过重复键错误
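            # 注意: sql_slave_skip_counter只适用于基于位点的复制;若启用了GTID
            # (MASTER_AUTO_POSITION=1),需改为以gtid_next注入空事务的方式跳过冲突事务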
            cursor = conn.cursor()
            cursor.execute("SET GLOBAL sql_slave_skip_counter = 1")
            cursor.execute("START SLAVE")
            cursor.close()
            
            conflict.resolution_strategy = "跳过重复键错误"
            conflict.resolved = True
            
            self.logger.info(f"已解决重复键冲突: {conflict.server}")
            conn.close()
            return True
            
        except Exception as e:
            self.logger.error(f"解决重复键冲突失败: {e}")
            return False
    
    def resolve_missing_record_conflict(self, conflict: ConflictInfo) -> bool:
        """解决缺失记录冲突"""
        try:
            server_config = self.server1_config if conflict.server == "server1" else self.server2_config
            conn = self.connect_to_server(server_config)
            
            # 策略1: 跳过缺失记录错误
            cursor = conn.cursor()
            cursor.execute("SET GLOBAL sql_slave_skip_counter = 1")
            cursor.execute("START SLAVE")
            cursor.close()
            
            conflict.resolution_strategy = "跳过缺失记录错误"
            conflict.resolved = True
            
            self.logger.info(f"已解决缺失记录冲突: {conflict.server}")
            conn.close()
            return True
            
        except Exception as e:
            self.logger.error(f"解决缺失记录冲突失败: {e}")
            return False
    
    def resolve_auto_increment_conflict(self, conflict: ConflictInfo) -> bool:
        """解决自增ID冲突"""
        try:
            # 检查并修复自增配置
            for i, (server_name, config) in enumerate([("server1", self.server1_config), ("server2", self.server2_config)], 1):
                conn = self.connect_to_server(config)
                cursor = conn.cursor()
                
                # 设置自增参数
                cursor.execute("SET GLOBAL auto_increment_increment = 2")
                cursor.execute(f"SET GLOBAL auto_increment_offset = {i}")
                
                cursor.close()
                conn.close()
            
            conflict.resolution_strategy = "修复自增ID配置"
            conflict.resolved = True
            
            self.logger.info("已解决自增ID冲突")
            return True
            
        except Exception as e:
            self.logger.error(f"解决自增ID冲突失败: {e}")
            return False
    
    def resolve_conflict(self, conflict: ConflictInfo) -> bool:
        """解决冲突"""
        self.logger.info(f"开始解决冲突: {conflict.conflict_type.value} on {conflict.server}")
        
        if conflict.conflict_type == ConflictType.DUPLICATE_KEY:
            return self.resolve_duplicate_key_conflict(conflict)
        elif conflict.conflict_type == ConflictType.MISSING_RECORD:
            return self.resolve_missing_record_conflict(conflict)
        elif conflict.conflict_type == ConflictType.AUTO_INCREMENT_CONFLICT:
            return self.resolve_auto_increment_conflict(conflict)
        else:
            self.logger.warning(f"未知冲突类型: {conflict.conflict_type}")
            return False
    
    def check_data_consistency(self) -> Dict:
        """检查数据一致性"""
        consistency_report = {
            'timestamp': datetime.now().isoformat(),
            'databases': {},
            'overall_consistent': True
        }
        
        try:
            conn1 = self.connect_to_server(self.server1_config)
            conn2 = self.connect_to_server(self.server2_config)
            
            # 获取数据库列表
            cursor1 = conn1.cursor()
            cursor1.execute("SHOW DATABASES")
            databases = [db[0] for db in cursor1.fetchall() if db[0] not in ['information_schema', 'performance_schema', 'mysql', 'sys']]
            cursor1.close()
            
            for db in databases:
                db_report = self._check_database_consistency(conn1, conn2, db)
                consistency_report['databases'][db] = db_report
                if not db_report['consistent']:
                    consistency_report['overall_consistent'] = False
            
            conn1.close()
            conn2.close()
            
        except Exception as e:
            self.logger.error(f"检查数据一致性时出错: {e}")
            consistency_report['error'] = str(e)
        
        return consistency_report
    
    def _check_database_consistency(self, conn1, conn2, database: str) -> Dict:
        """检查单个数据库的一致性"""
        db_report = {
            'consistent': True,
            'tables': {},
            'row_count_diff': 0
        }
        
        try:
            # 获取表列表
            cursor1 = conn1.cursor()
            cursor1.execute(f"USE {database}")
            cursor1.execute("SHOW TABLES")
            tables = [table[0] for table in cursor1.fetchall()]
            cursor1.close()
            
            cursor2 = conn2.cursor()
            cursor2.execute(f"USE {database}")
            
            for table in tables:
                table_report = self._check_table_consistency(conn1, conn2, database, table)
                db_report['tables'][table] = table_report
                if not table_report['consistent']:
                    db_report['consistent'] = False
                db_report['row_count_diff'] += abs(table_report.get('row_count_diff', 0))
            
            cursor2.close()
            
        except Exception as e:
            self.logger.error(f"检查数据库 {database} 一致性时出错: {e}")
            db_report['error'] = str(e)
            db_report['consistent'] = False
        
        return db_report
    
    def _check_table_consistency(self, conn1, conn2, database: str, table: str) -> Dict:
        """检查单个表的一致性"""
        table_report = {
            'consistent': True,
            'row_count_server1': 0,
            'row_count_server2': 0,
            'row_count_diff': 0,
            'checksum_server1': '',
            'checksum_server2': ''
        }
        
        try:
            # 检查行数
            cursor1 = conn1.cursor()
            cursor1.execute(f"SELECT COUNT(*) FROM {database}.{table}")
            count1 = cursor1.fetchone()[0]
            cursor1.close()
            
            cursor2 = conn2.cursor()
            cursor2.execute(f"SELECT COUNT(*) FROM {database}.{table}")
            count2 = cursor2.fetchone()[0]
            cursor2.close()
            
            table_report['row_count_server1'] = count1
            table_report['row_count_server2'] = count2
            table_report['row_count_diff'] = count1 - count2
            
            if count1 != count2:
                table_report['consistent'] = False
            
            # 计算校验和(如果行数相同)
            if count1 == count2 and count1 > 0:
                cursor1 = conn1.cursor()
                cursor1.execute(f"CHECKSUM TABLE {database}.{table}")
                checksum1 = cursor1.fetchone()[1]
                cursor1.close()
                
                cursor2 = conn2.cursor()
                cursor2.execute(f"CHECKSUM TABLE {database}.{table}")
                checksum2 = cursor2.fetchone()[1]
                cursor2.close()
                
                table_report['checksum_server1'] = str(checksum1)
                table_report['checksum_server2'] = str(checksum2)
                
                if checksum1 != checksum2:
                    table_report['consistent'] = False
        
        except Exception as e:
            self.logger.error(f"检查表 {database}.{table} 一致性时出错: {e}")
            table_report['error'] = str(e)
            table_report['consistent'] = False
        
        return table_report
    
    def generate_conflict_report(self) -> Dict:
        """生成冲突报告"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'total_conflicts': len(self.conflicts),
            'resolved_conflicts': len([c for c in self.conflicts if c.resolved]),
            'unresolved_conflicts': len([c for c in self.conflicts if not c.resolved]),
            'conflicts_by_type': {},
            'conflicts': []
        }
        
        # 按类型统计冲突
        for conflict_type in ConflictType:
            count = len([c for c in self.conflicts if c.conflict_type == conflict_type])
            report['conflicts_by_type'][conflict_type.value] = count
        
        # 冲突详情
        for conflict in self.conflicts:
            conflict_data = {
                'type': conflict.conflict_type.value,
                'server': conflict.server,
                'database': conflict.database,
                'table': conflict.table,
                'error_message': conflict.error_message,
                'error_code': conflict.error_code,
                'timestamp': conflict.timestamp.isoformat(),
                'resolution_strategy': conflict.resolution_strategy,
                'resolved': conflict.resolved
            }
            report['conflicts'].append(conflict_data)
        
        return report
    
    def run_conflict_resolution(self, auto_resolve: bool = False) -> Dict:
        """运行冲突解决流程"""
        self.logger.info("开始冲突检测和解决流程")
        
        # 检测冲突
        conflicts = self.detect_replication_errors()
        self.conflicts.extend(conflicts)
        
        if not conflicts:
            self.logger.info("未检测到冲突")
            return {'status': 'success', 'message': '未检测到冲突'}
        
        self.logger.info(f"检测到 {len(conflicts)} 个冲突")
        
        # 自动解决冲突
        if auto_resolve:
            resolved_count = 0
            for conflict in conflicts:
                if self.resolve_conflict(conflict):
                    resolved_count += 1
            
            self.logger.info(f"已解决 {resolved_count}/{len(conflicts)} 个冲突")
        
        # 生成报告
        report = self.generate_conflict_report()
        
        # 检查数据一致性
        consistency_report = self.check_data_consistency()
        report['data_consistency'] = consistency_report
        
        return report
    
    def export_report(self, report: Dict, filename: Optional[str] = None):
        """导出报告"""
        if not filename:
            filename = f"mysql_conflict_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        
        self.logger.info(f"报告已导出到: {filename}")

# 使用示例
if __name__ == '__main__':
    # 服务器配置
    server1_config = {
        'host': '192.168.1.100',
        'port': 3306,
        'user': 'root',
        'password': 'password',
        'autocommit': True
    }
    
    server2_config = {
        'host': '192.168.1.101',
        'port': 3306,
        'user': 'root',
        'password': 'password',
        'autocommit': True
    }
    
    # 创建冲突解决器
    resolver = MySQLMasterMasterConflictResolver(server1_config, server2_config)
    
    # 运行冲突解决流程
    report = resolver.run_conflict_resolution(auto_resolve=True)
    
    # 导出报告
    resolver.export_report(report)
    
    print("冲突解决完成,详细报告已生成")
    print(f"总冲突数: {report['total_conflicts']}")
    print(f"已解决: {report['resolved_conflicts']}")
    print(f"未解决: {report['unresolved_conflicts']}")

13.4 MySQL集群架构

13.4.1 MySQL Cluster (NDB) 配置

MySQL Cluster是基于NDB存储引擎的分布式集群解决方案,提供真正的无单点故障架构。
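
集群部署完成后,除了通过ndb_mgm之外,也可以在任意SQL节点上借助NDB引擎自带的ndbinfo库查看数据节点状态。下面是一个简化的查询示意(沿用mysql.connector;连接参数为示例假设,具体表结构以所用版本的ndbinfo为准):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""通过SQL节点的ndbinfo库查看数据节点状态(示意脚本,连接参数为示例假设)"""

import mysql.connector

# 任意一个SQL节点即可,ndbinfo由NDB引擎自动提供
SQL_NODE_CONFIG = {
    'host': '192.168.1.103',
    'port': 3306,
    'user': 'mysql',
    'password': 'cluster_password'
}

def show_data_node_status():
    conn = mysql.connector.connect(**SQL_NODE_CONFIG)
    cursor = conn.cursor(dictionary=True)
    
    # ndbinfo.nodes记录每个数据节点的运行状态
    cursor.execute("SELECT node_id, uptime, status, start_phase FROM ndbinfo.nodes")
    for row in cursor.fetchall():
        print(f"数据节点 {row['node_id']}: 状态={row['status']}, "
              f"运行时长={row['uptime']}秒, 启动阶段={row['start_phase']}")
    
    # ndbinfo.memoryusage记录数据内存/索引内存使用情况
    cursor.execute("SELECT node_id, memory_type, used, total FROM ndbinfo.memoryusage")
    for row in cursor.fetchall():
        pct = row['used'] * 100 // row['total'] if row['total'] else 0
        print(f"节点 {row['node_id']} {row['memory_type']}: 已使用约{pct}%")
    
    cursor.close()
    conn.close()

if __name__ == '__main__':
    show_data_node_status()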

#!/bin/bash
# MySQL Cluster自动部署脚本
# 部署管理节点、数据节点和SQL节点

set -euo pipefail

# 集群配置
CLUSTER_NAME="mysql_cluster"
MANAGEMENT_NODE="192.168.1.100"
DATA_NODES=("192.168.1.101" "192.168.1.102")
SQL_NODES=("192.168.1.103" "192.168.1.104")
CLUSTER_USER="mysql"
CLUSTER_PASSWORD="cluster_password"
DATA_DIR="/var/lib/mysql-cluster"
LOG_DIR="/var/log/mysql-cluster"

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}

# 错误处理
error_exit() {
    log "错误: $1"
    exit 1
}

# 检查SSH连接
check_ssh_connection() {
    local host=$1
    if ! ssh -o ConnectTimeout=5 "$host" "echo 'SSH连接正常'" >/dev/null 2>&1; then
        error_exit "无法SSH连接到 $host"
    fi
    log "SSH连接到 $host 正常"
}

# 安装MySQL Cluster软件包
install_mysql_cluster() {
    local host=$1
    local node_type=$2
    
    log "在 $host 安装MySQL Cluster ($node_type)"
    
    ssh "$host" << EOF
# 更新系统
sudo apt-get update

# 安装MySQL Cluster
wget https://dev.mysql.com/get/Downloads/MySQL-Cluster-8.0/mysql-cluster-community-server_8.0.34-1ubuntu20.04_amd64.deb
wget https://dev.mysql.com/get/Downloads/MySQL-Cluster-8.0/mysql-cluster-community-client_8.0.34-1ubuntu20.04_amd64.deb
wget https://dev.mysql.com/get/Downloads/MySQL-Cluster-8.0/mysql-cluster-community-common_8.0.34-1ubuntu20.04_amd64.deb

sudo dpkg -i mysql-cluster-community-*.deb
sudo apt-get install -f

# 创建用户和目录
sudo useradd -r -s /bin/false mysql || true
sudo mkdir -p $DATA_DIR $LOG_DIR
sudo chown mysql:mysql $DATA_DIR $LOG_DIR
EOF

    log "$host MySQL Cluster安装完成"
}

# 生成管理节点配置
generate_management_config() {
    log "生成管理节点配置"
    
    cat > "/tmp/config.ini" << EOF
[ndbd default]
NoOfReplicas=2
DataMemory=80M
IndexMemory=18M
ServerPort=2202
DataDir=$DATA_DIR
MaxNoOfConcurrentOperations=100000
MaxNoOfLocalOperations=110000
RedoBuffer=8M
LongMessageBuffer=1M
MaxNoOfTriggers=14000
MaxNoOfFiredTriggers=4000
TransactionBufferMemory=1M
DefaultHashMapSize=241
DefaultHashMapBuckets=240
MaxNoOfOrderedIndexes=512
MaxNoOfUniqueHashIndexes=64
MaxNoOfAttributes=1000
MaxNoOfTables=128
StringMemory=25
MaxNoOfConcurrentIndexOperations=8K
MaxNoOfConcurrentScans=256
MaxNoOfLocalScans=32
BatchSizePerLocalScan=256
LockPagesInMainMemory=1
TimeBetweenWatchDogCheck=6000
TimeBetweenWatchDogCheckInitial=6000
StartFailureTimeout=0
HeartbeatIntervalDbDb=5000
HeartbeatIntervalDbApi=1500
TimeBetweenLocalCheckpoints=20
TimeBetweenGlobalCheckpoints=2000
CompressedLCP=0
CompressedBackup=1
BackupMaxWriteSize=1M
BackupLogBufferSize=2M
BackupDataBufferSize=2M
MaxAllocate=32M
MemReportFrequency=30
BackupReportFrequency=10
DiskCheckpointSpeed=10M
DiskCheckpointSpeedInRestart=100M
ArbitrationTimeout=7500
ArbitrationDelay=0
StopOnError=true
RestartOnErrorInsert=2
MaxStartFailRetries=3
StartFailRetryDelay=0

[ndb_mgmd]
Hostname=$MANAGEMENT_NODE
DataDir=$DATA_DIR
LogDestination=FILE:filename=ndb_mgmd.log,maxsize=1000000,maxfiles=6
ArbitrationRank=1

EOF

    # 添加数据节点配置
    local node_id=2
    for data_node in "${DATA_NODES[@]}"; do
        cat >> "/tmp/config.ini" << EOF
[ndbd]
Hostname=$data_node
NodeId=$node_id
DataDir=$DATA_DIR
LogDestination=FILE:filename=ndb_${node_id}.log,maxsize=1000000,maxfiles=6

EOF
        ((node_id++))
    done
    
    # 添加SQL节点配置
    for sql_node in "${SQL_NODES[@]}"; do
        cat >> "/tmp/config.ini" << EOF
[mysqld]
Hostname=$sql_node
NodeId=$node_id

EOF
        ((node_id++))
    done
    
    # 添加API节点配置(用于应用连接)
    cat >> "/tmp/config.ini" << EOF
[mysqld]
[mysqld]
[mysqld]
[mysqld]
EOF

    log "管理节点配置已生成"
}

# 配置管理节点
setup_management_node() {
    log "配置管理节点: $MANAGEMENT_NODE"
    
    # 检查连接
    check_ssh_connection "$MANAGEMENT_NODE"
    
    # 安装软件
    install_mysql_cluster "$MANAGEMENT_NODE" "management"
    
    # 复制配置文件
    scp "/tmp/config.ini" "$MANAGEMENT_NODE:/var/lib/mysql-cluster/"
    
    # 启动管理节点
    ssh "$MANAGEMENT_NODE" << EOF
sudo ndb_mgmd -f /var/lib/mysql-cluster/config.ini --initial --configdir=/var/lib/mysql-cluster/
EOF

    log "管理节点配置完成"
}

# 配置数据节点
setup_data_nodes() {
    for data_node in "${DATA_NODES[@]}"; do
        log "配置数据节点: $data_node"
        
        # 检查连接
        check_ssh_connection "$data_node"
        
        # 安装软件
        install_mysql_cluster "$data_node" "data"
        
        # 生成数据节点配置
        ssh "$data_node" << EOF
sudo mkdir -p /etc/mysql
cat > /tmp/my.cnf << 'MYCNF'
[mysql_cluster]
ndb-connectstring=$MANAGEMENT_NODE

[ndbd]
connect-string=$MANAGEMENT_NODE
MYCNF

sudo mv /tmp/my.cnf /etc/mysql/my.cnf
sudo chown mysql:mysql /etc/mysql/my.cnf
EOF
        
        # 启动数据节点
        ssh "$data_node" << EOF
sudo ndbd --initial
EOF
        
        log "数据节点 $data_node 配置完成"
    done
}

# 配置SQL节点
setup_sql_nodes() {
    for sql_node in "${SQL_NODES[@]}"; do
        log "配置SQL节点: $sql_node"
        
        # 检查连接
        check_ssh_connection "$sql_node"
        
        # 安装软件
        install_mysql_cluster "$sql_node" "sql"
        
        # 生成SQL节点配置
        ssh "$sql_node" << EOF
sudo mkdir -p /etc/mysql
cat > /tmp/my.cnf << 'MYCNF'
[mysqld]
user=mysql
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
port=3306
ndbcluster
ndb-connectstring=$MANAGEMENT_NODE

[mysql_cluster]
ndb-connectstring=$MANAGEMENT_NODE

[client]
port=3306
socket=/var/lib/mysql/mysql.sock
MYCNF

sudo mv /tmp/my.cnf /etc/mysql/my.cnf
sudo chown mysql:mysql /etc/mysql/my.cnf

# 初始化MySQL数据目录
sudo mysqld --initialize-insecure --user=mysql --datadir=/var/lib/mysql

# 启动MySQL服务
sudo mysqld_safe --user=mysql &
EOF
        
        # 等待MySQL启动
        sleep 10
        
        # 配置MySQL用户
        ssh "$sql_node" << EOF
mysql -u root << 'SQL'
CREATE USER '$CLUSTER_USER'@'%' IDENTIFIED BY '$CLUSTER_PASSWORD';
GRANT ALL PRIVILEGES ON *.* TO '$CLUSTER_USER'@'%' WITH GRANT OPTION;
FLUSH PRIVILEGES;
SQL
EOF
        
        log "SQL节点 $sql_node 配置完成"
    done
}

# 检查集群状态
check_cluster_status() {
    log "检查集群状态"
    
    ssh "$MANAGEMENT_NODE" << 'EOF'
echo "集群状态:"
ndb_mgm -e "SHOW"

echo -e "\n节点状态:"
ndb_mgm -e "ALL STATUS"

echo -e "\n集群报告:"
ndb_mgm -e "ALL REPORT MEMORY"
EOF
}

# 创建测试数据库和表
create_test_database() {
    log "创建测试数据库"
    
    local sql_node="${SQL_NODES[0]}"
    
    ssh "$sql_node" << EOF
mysql -u$CLUSTER_USER -p$CLUSTER_PASSWORD << 'SQL'
CREATE DATABASE IF NOT EXISTS cluster_test;
USE cluster_test;

CREATE TABLE test_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=NDB;

INSERT INTO test_table (name) VALUES 
('Test Record 1'),
('Test Record 2'),
('Test Record 3');

SELECT * FROM test_table;
SQL
EOF

    log "测试数据库创建完成"
}

# 生成集群监控脚本
generate_monitoring_script() {
    log "生成集群监控脚本"
    
    cat > "/tmp/mysql_cluster_monitor.sh" << 'EOF'
#!/bin/bash
# MySQL Cluster监控脚本

MANAGEMENT_NODE="192.168.1.100"
ALERT_EMAIL="admin@example.com"
LOG_FILE="/var/log/mysql_cluster_monitor.log"

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

# 检查管理节点
check_management_node() {
    if ! ssh "$MANAGEMENT_NODE" "pgrep ndb_mgmd" >/dev/null 2>&1; then
        log "ALERT: 管理节点离线"
        send_alert "MySQL Cluster管理节点离线" "管理节点 $MANAGEMENT_NODE 无法访问"
        return 1
    fi
    return 0
}

# 检查数据节点
check_data_nodes() {
    local failed_nodes=()
    
    local status_output=$(ssh "$MANAGEMENT_NODE" "ndb_mgm -e 'SHOW'" 2>/dev/null)
    
    if echo "$status_output" | grep -q "not connected"; then
        local failed=$(echo "$status_output" | grep "not connected" | awk '{print $2}')
        failed_nodes+=("$failed")
    fi
    
    if [ ${#failed_nodes[@]} -gt 0 ]; then
        log "ALERT: 数据节点故障: ${failed_nodes[*]}"
        send_alert "MySQL Cluster数据节点故障" "故障节点: ${failed_nodes[*]}"
        return 1
    fi
    
    return 0
}

# 检查SQL节点
check_sql_nodes() {
    local sql_nodes=("192.168.1.103" "192.168.1.104")
    local failed_nodes=()
    
    for node in "${sql_nodes[@]}"; do
        if ! ssh "$node" "mysqladmin ping" >/dev/null 2>&1; then
            failed_nodes+=("$node")
        fi
    done
    
    if [ ${#failed_nodes[@]} -gt 0 ]; then
        log "ALERT: SQL节点故障: ${failed_nodes[*]}"
        send_alert "MySQL Cluster SQL节点故障" "故障节点: ${failed_nodes[*]}"
        return 1
    fi
    
    return 0
}

# 检查集群性能
check_cluster_performance() {
    # 输出形如 "Node 2: Data usage is 5%(...)",提取百分比数值
    local memory_usage=$(ssh "$MANAGEMENT_NODE" "ndb_mgm -e 'ALL REPORT MEMORY'" 2>/dev/null | grep "Data usage is" | grep -oE '[0-9]+%' | head -1 | tr -d '%')
    
    if [ -n "$memory_usage" ] && [ "$memory_usage" -gt 80 ]; then
        log "WARNING: 数据内存使用率过高: ${memory_usage}%"
        send_alert "MySQL Cluster内存告警" "数据内存使用率: ${memory_usage}%"
    fi
}

# 发送告警
send_alert() {
    local subject="$1"
    local message="$2"
    
    echo "$message" | mail -s "$subject" "$ALERT_EMAIL" 2>/dev/null || {
        log "WARNING: 无法发送邮件告警"
        logger "MySQL Cluster Alert: $subject - $message"
    }
}

# 生成状态报告
generate_status_report() {
    local report_file="/tmp/cluster_status_$(date +%Y%m%d_%H%M%S).txt"
    
    {
        echo "MySQL Cluster状态报告 - $(date)"
        echo "========================================"
        echo
        
        echo "集群概览:"
        ssh "$MANAGEMENT_NODE" "ndb_mgm -e 'SHOW'"
        echo
        
        echo "节点状态:"
        ssh "$MANAGEMENT_NODE" "ndb_mgm -e 'ALL STATUS'"
        echo
        
        echo "内存使用:"
        ssh "$MANAGEMENT_NODE" "ndb_mgm -e 'ALL REPORT MEMORY'"
        echo
        
        echo "备份状态:"
        ssh "$MANAGEMENT_NODE" "ndb_mgm -e 'ALL REPORT BACKUP'"
        
    } > "$report_file"
    
    log "状态报告已生成: $report_file"
}

# 主函数
main() {
    case "${1:-check}" in
        check)
            log "开始集群健康检查"
            
            local errors=0
            
            check_management_node || ((errors++))
            check_data_nodes || ((errors++))
            check_sql_nodes || ((errors++))
            check_cluster_performance
            
            if [ $errors -eq 0 ]; then
                log "集群状态正常"
            else
                log "检测到 $errors 个问题"
            fi
            ;;
        report)
            generate_status_report
            ;;
        monitor)
            while true; do
                main check
                sleep 60
            done
            ;;
        *)
            echo "用法: $0 {check|report|monitor}"
            echo "  check   - 检查集群状态"
            echo "  report  - 生成状态报告"
            echo "  monitor - 持续监控"
            exit 1
            ;;
    esac
}

main "$@"
EOF

    chmod +x "/tmp/mysql_cluster_monitor.sh"
    
    # 复制到管理节点
    scp "/tmp/mysql_cluster_monitor.sh" "$MANAGEMENT_NODE:/usr/local/bin/"
    
    log "集群监控脚本已生成并部署"
}

# 主函数
main() {
    case "${1:-help}" in
        install)
            log "开始MySQL Cluster部署"
            
            # 生成配置
            generate_management_config
            
            # 配置各节点
            setup_management_node
            setup_data_nodes
            setup_sql_nodes
            
            # 检查状态
            sleep 10
            check_cluster_status
            
            # 创建测试数据
            create_test_database
            
            # 生成监控脚本
            generate_monitoring_script
            
            log "MySQL Cluster部署完成"
            ;;
        status)
            check_cluster_status
            ;;
        test)
            create_test_database
            ;;
        monitor)
            generate_monitoring_script
            ;;
        help|*)
            echo "MySQL Cluster部署脚本"
            echo
            echo "用法: $0 [命令]"
            echo
            echo "命令:"
            echo "  install  - 完整部署集群"
            echo "  status   - 检查集群状态"
            echo "  test     - 创建测试数据"
            echo "  monitor  - 生成监控脚本"
            echo "  help     - 显示此帮助信息"
            echo
            echo "配置说明:"
            echo "  管理节点: $MANAGEMENT_NODE"
            echo "  数据节点: ${DATA_NODES[*]}"
            echo "  SQL节点:  ${SQL_NODES[*]}"
            echo
            echo "部署前请确保:"
            echo "  1. 所有节点已配置SSH免密登录"
            echo "  2. 所有节点防火墙已开放相关端口"
            echo "  3. 所有节点时间已同步"
            ;;
    esac
}

main "$@"

13.4.2 MySQL Cluster管理

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL Cluster管理工具
提供集群监控、备份、故障恢复等功能
"""

import subprocess
import json
import time
import logging
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum

class NodeType(Enum):
    """节点类型"""
    MANAGEMENT = "mgmd"
    DATA = "ndbd"
    SQL = "mysqld"

class NodeStatus(Enum):
    """节点状态"""
    CONNECTED = "connected"
    NOT_CONNECTED = "not connected"
    STARTING = "starting"
    SHUTTING_DOWN = "shutting down"
    RESTARTING = "restarting"

@dataclass
class ClusterNode:
    """集群节点信息"""
    node_id: int
    node_type: NodeType
    hostname: str
    status: NodeStatus
    version: str = ""
    group: int = 0
    master: bool = False

class MySQLClusterManager:
    """MySQL Cluster管理器"""
    
    def __init__(self, management_host: str = "localhost", management_port: int = 1186):
        self.management_host = management_host
        self.management_port = management_port
        self.connect_string = f"{management_host}:{management_port}"
        
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/mysql_cluster_manager.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def execute_mgm_command(self, command: str) -> str:
        """执行管理命令"""
        try:
            cmd = ["ndb_mgm", "-c", self.connect_string, "-e", command]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            
            if result.returncode != 0:
                raise Exception(f"命令执行失败: {result.stderr}")
            
            return result.stdout.strip()
        
        except subprocess.TimeoutExpired:
            raise Exception("命令执行超时")
        except Exception as e:
            self.logger.error(f"执行管理命令失败: {e}")
            raise
    
    def get_cluster_status(self) -> Dict:
        """获取集群状态"""
        try:
            output = self.execute_mgm_command("SHOW")
            nodes = self._parse_cluster_status(output)
            
            status = {
                'timestamp': datetime.now().isoformat(),
                'total_nodes': len(nodes),
                'connected_nodes': len([n for n in nodes if n.status == NodeStatus.CONNECTED]),
                'nodes': []
            }
            
            for node in nodes:
                node_info = {
                    'node_id': node.node_id,
                    'type': node.node_type.value,
                    'hostname': node.hostname,
                    'status': node.status.value,
                    'version': node.version,
                    'group': node.group,
                    'master': node.master
                }
                status['nodes'].append(node_info)
            
            return status
        
        except Exception as e:
            self.logger.error(f"获取集群状态失败: {e}")
            return {'error': str(e)}
    
    def _parse_cluster_status(self, output: str) -> List[ClusterNode]:
        """解析ndb_mgm SHOW命令的输出
        
        典型输出格式(以实际版本为准):
            [ndbd(NDB)]     2 node(s)
            id=2    @192.168.1.101  (mysql-8.0.34 ndb-8.0.34, Nodegroup: 0, *)
            [mysqld(API)]   2 node(s)
            id=5 (not connected, accepting connect from any host)
        """
        import re
        
        nodes = []
        current_type = NodeType.SQL
        
        section_re = re.compile(r'\[(ndb_mgmd|ndbd|mysqld)')
        node_re = re.compile(r'id=(\d+)\s*(?:@(\S+))?\s*\((.*)\)')
        
        for line in output.split('\n'):
            line = line.strip()
            if not line:
                continue
            
            # 分组标题行,决定后续节点的类型
            section_match = section_re.match(line)
            if section_match:
                type_str = section_match.group(1)
                if type_str == 'ndb_mgmd':
                    current_type = NodeType.MANAGEMENT
                elif type_str == 'ndbd':
                    current_type = NodeType.DATA
                else:
                    current_type = NodeType.SQL
                continue
            
            # 节点详情行
            node_match = node_re.match(line)
            if not node_match:
                continue
            
            try:
                node_id = int(node_match.group(1))
                hostname = node_match.group(2) or "unknown"
                detail = node_match.group(3)
                
                # 根据括号内的描述判断节点状态
                if 'not connected' in detail:
                    status = NodeStatus.NOT_CONNECTED
                elif 'starting' in detail:
                    status = NodeStatus.STARTING
                elif 'shutting down' in detail:
                    status = NodeStatus.SHUTTING_DOWN
                else:
                    status = NodeStatus.CONNECTED
                
                version_match = re.search(r'mysql-([\w.]+)', detail)
                group_match = re.search(r'Nodegroup:\s*(\d+)', detail)
                
                nodes.append(ClusterNode(
                    node_id=node_id,
                    node_type=current_type,
                    hostname=hostname,
                    status=status,
                    version=version_match.group(1) if version_match else "",
                    group=int(group_match.group(1)) if group_match else 0,
                    master='*' in detail
                ))
            
            except (ValueError, IndexError) as e:
                self.logger.warning(f"解析节点信息失败: {line} - {e}")
        
        return nodes
    
    def get_node_status(self, node_id: int) -> Dict:
        """获取特定节点状态"""
        try:
            output = self.execute_mgm_command(f"{node_id} STATUS")
            
            status = {
                'node_id': node_id,
                'timestamp': datetime.now().isoformat(),
                'details': output
            }
            
            return status
        
        except Exception as e:
            self.logger.error(f"获取节点 {node_id} 状态失败: {e}")
            return {'error': str(e)}
    
    def get_memory_usage(self) -> Dict:
        """获取内存使用情况"""
        try:
            output = self.execute_mgm_command("ALL REPORT MEMORY")
            
            memory_info = {
                'timestamp': datetime.now().isoformat(),
                'nodes': []
            }
            
            # 典型输出行(以实际版本的输出格式为准):
            #   Node 2: Data usage is 5%(177 32K pages of total 3200)
            #   Node 2: Index usage is 0%(108 8K pages of total 12832)
            import re
            pattern = re.compile(
                r"Node\s+(\d+):\s+(Data|Index) usage is\s+(\d+)%"
                r"\((\d+)\s+(\d+)K pages of total\s+(\d+)\)"
            )
            nodes_by_id = {}
            
            for line in output.split('\n'):
                match = pattern.search(line)
                if not match:
                    continue
                
                node_id = int(match.group(1))
                kind = 'data_memory' if match.group(2) == 'Data' else 'index_memory'
                usage_pct = int(match.group(3))
                used_pages = int(match.group(4))
                page_kb = int(match.group(5))
                total_pages = int(match.group(6))
                
                node = nodes_by_id.setdefault(node_id, {
                    'node_id': node_id,
                    'data_memory': {},
                    'index_memory': {}
                })
                node[kind] = {
                    'used_mb': used_pages * page_kb // 1024,
                    'total_mb': total_pages * page_kb // 1024,
                    'usage_percent': usage_pct
                }
            
            memory_info['nodes'] = list(nodes_by_id.values())
            
            return memory_info
        
        except Exception as e:
            self.logger.error(f"获取内存使用情况失败: {e}")
            return {'error': str(e)}
    
    def start_backup(self, backup_id: Optional[int] = None, wait: bool = True) -> Dict:
        """启动集群备份"""
        try:
            if backup_id:
                command = f"START BACKUP {backup_id}"
            else:
                command = "START BACKUP"
            
            if wait:
                command += " WAIT COMPLETED"
            
            output = self.execute_mgm_command(command)
            
            backup_info = {
                'timestamp': datetime.now().isoformat(),
                'command': command,
                'output': output,
                'status': 'started' if not wait else 'completed'
            }
            
            # 解析备份ID
            for line in output.split('\n'):
                if 'Backup' in line and 'started' in line:
                    parts = line.split()
                    for i, part in enumerate(parts):
                        if part == 'Backup' and i + 1 < len(parts):
                            backup_info['backup_id'] = int(parts[i + 1])
                            break
            
            self.logger.info(f"备份已启动: {backup_info.get('backup_id', 'unknown')}")
            return backup_info
        
        except Exception as e:
            self.logger.error(f"启动备份失败: {e}")
            return {'error': str(e)}
    
    def get_backup_status(self) -> Dict:
        """获取备份状态"""
        try:
            output = self.execute_mgm_command("ALL REPORT BACKUP")
            
            backup_status = {
                'timestamp': datetime.now().isoformat(),
                'details': output
            }
            
            return backup_status
        
        except Exception as e:
            self.logger.error(f"获取备份状态失败: {e}")
            return {'error': str(e)}
    
    def restart_node(self, node_id: int, initial: bool = False) -> Dict:
        """重启节点"""
        try:
            command = f"{node_id} RESTART"
            if initial:
                command += " -I"
            
            output = self.execute_mgm_command(command)
            
            restart_info = {
                'node_id': node_id,
                'timestamp': datetime.now().isoformat(),
                'initial': initial,
                'output': output
            }
            
            self.logger.info(f"节点 {node_id} 重启命令已发送")
            return restart_info
        
        except Exception as e:
            self.logger.error(f"重启节点 {node_id} 失败: {e}")
            return {'error': str(e)}
    
    def stop_node(self, node_id: int, abort: bool = False) -> Dict:
        """停止节点"""
        try:
            command = f"{node_id} STOP"
            if abort:
                command += " -A"
            
            output = self.execute_mgm_command(command)
            
            stop_info = {
                'node_id': node_id,
                'timestamp': datetime.now().isoformat(),
                'abort': abort,
                'output': output
            }
            
            self.logger.info(f"节点 {node_id} 停止命令已发送")
            return stop_info
        
        except Exception as e:
            self.logger.error(f"停止节点 {node_id} 失败: {e}")
            return {'error': str(e)}
    
    def rolling_restart(self, node_type: Optional[NodeType] = None) -> Dict:
        """滚动重启"""
        try:
            self.logger.info("开始滚动重启")
            
            # 获取当前集群状态
            cluster_status = self.get_cluster_status()
            if 'error' in cluster_status:
                return cluster_status
            
            restart_results = []
            
            # 过滤需要重启的节点
            nodes_to_restart = []
            for node in cluster_status['nodes']:
                if node_type is None or NodeType(node['type']) == node_type:
                    if node['status'] == NodeStatus.CONNECTED.value:
                        nodes_to_restart.append(node)
            
            # 逐个重启节点
            for node in nodes_to_restart:
                node_id = node['node_id']
                self.logger.info(f"重启节点 {node_id}")
                
                # 重启节点
                restart_result = self.restart_node(node_id)
                restart_results.append(restart_result)
                
                if 'error' in restart_result:
                    self.logger.error(f"节点 {node_id} 重启失败")
                    continue
                
                # 等待节点重新连接
                self.logger.info(f"等待节点 {node_id} 重新连接")
                max_wait = 300  # 5分钟
                wait_time = 0
                
                while wait_time < max_wait:
                    time.sleep(10)
                    wait_time += 10
                    
                    node_status = self.get_node_status(node_id)
                    if 'error' not in node_status:
                        self.logger.info(f"节点 {node_id} 已重新连接")
                        break
                else:
                    self.logger.warning(f"节点 {node_id} 重启超时")
            
            rolling_restart_info = {
                'timestamp': datetime.now().isoformat(),
                'node_type': node_type.value if node_type else 'all',
                'total_nodes': len(nodes_to_restart),
                'restart_results': restart_results
            }
            
            self.logger.info("滚动重启完成")
            return rolling_restart_info
        
        except Exception as e:
            self.logger.error(f"滚动重启失败: {e}")
            return {'error': str(e)}
    
    def generate_health_report(self) -> Dict:
        """生成健康报告"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'cluster_status': self.get_cluster_status(),
            'memory_usage': self.get_memory_usage(),
            'backup_status': self.get_backup_status(),
            'health_score': 0,
            'recommendations': []
        }
        
        # 计算健康分数
        health_score = 100
        
        # 检查节点连接状态
        cluster_status = report['cluster_status']
        if 'error' not in cluster_status:
            total_nodes = cluster_status['total_nodes']
            connected_nodes = cluster_status['connected_nodes']
            
            if total_nodes > 0:
                connection_ratio = connected_nodes / total_nodes
                if connection_ratio < 1.0:
                    health_score -= (1.0 - connection_ratio) * 50
                    report['recommendations'].append(f"有 {total_nodes - connected_nodes} 个节点未连接")
        
        # 检查内存使用
        memory_usage = report['memory_usage']
        if 'error' not in memory_usage:
            for node in memory_usage['nodes']:
                data_usage = node.get('data_memory', {}).get('usage_percent', 0)
                index_usage = node.get('index_memory', {}).get('usage_percent', 0)
                
                if data_usage > 80:
                    health_score -= 10
                    report['recommendations'].append(f"节点 {node['node_id']} 数据内存使用率过高: {data_usage}%")
                
                if index_usage > 80:
                    health_score -= 10
                    report['recommendations'].append(f"节点 {node['node_id']} 索引内存使用率过高: {index_usage}%")
        
        report['health_score'] = max(0, health_score)
        
        return report
    
    def export_report(self, report: Dict, filename: Optional[str] = None):
        """导出报告"""
        if not filename:
            filename = f"mysql_cluster_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        
        self.logger.info(f"报告已导出到: {filename}")

# 使用示例
if __name__ == '__main__':
    # 创建集群管理器
    manager = MySQLClusterManager("192.168.1.100")
    
    # 获取集群状态
    status = manager.get_cluster_status()
    print("集群状态:")
    print(json.dumps(status, ensure_ascii=False, indent=2))
    
    # 获取内存使用情况
    memory = manager.get_memory_usage()
    print("\n内存使用:")
    print(json.dumps(memory, ensure_ascii=False, indent=2))
    
    # 启动备份
    backup = manager.start_backup(wait=False)
    print("\n备份状态:")
    print(json.dumps(backup, ensure_ascii=False, indent=2))
    
    # 生成健康报告
    health_report = manager.generate_health_report()
    print("\n健康报告:")
    print(json.dumps(health_report, ensure_ascii=False, indent=2))
    
    # 导出报告
    manager.export_report(health_report)

13.5 代理层高可用
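
代理层高可用的基本思路是在应用与MySQL之间增加一层代理(如ProxySQL、HAProxy),由代理负责连接转发、读写分离和后端健康检查;当某个MySQL实例发生故障时,代理自动把流量切换到健康节点,使故障转移对应用基本透明。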

13.5.1 ProxySQL配置与管理

ProxySQL是一个高性能的MySQL代理,提供连接池、读写分离、故障转移等功能。
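
部署完成后,应用只需把连接地址从MySQL改为ProxySQL的6033端口,读写分离由查询规则自动完成。下面是一个最小化的Python验证示例(仅作示意:假设已安装pymysql,并按照后文脚本创建了proxysql_user账号与proxysql_test测试库,端口、账号请按实际环境调整):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""通过ProxySQL(假设监听127.0.0.1:6033)验证读写分离的最小示例"""

import pymysql  # 假设已通过 pip install pymysql 安装

# 连接ProxySQL的应用端口,而不是直接连接后端MySQL
conn = pymysql.connect(
    host="127.0.0.1",
    port=6033,                    # ProxySQL应用端口(对应后文脚本的PROXYSQL_PORT)
    user="proxysql_user",         # 后文脚本创建的业务账号(示例值)
    password="proxysql_password",
    database="proxysql_test",
    autocommit=True,
)

with conn.cursor() as cur:
    # 写操作: 按用户的default_hostgroup=0路由到主库
    cur.execute("INSERT INTO test_table (data) VALUES (%s)", ("routed to master",))
    # 读操作: 命中查询规则"^SELECT",路由到hostgroup 1的从库
    cur.execute("SELECT id, data FROM test_table ORDER BY id DESC LIMIT 5")
    for row in cur.fetchall():
        print(row)

conn.close()

实际路由结果可以在ProxySQL管理端口(6032)上查询stats_mysql_query_digest的hostgroup字段确认。下面的Shell脚本给出完整的自动化部署与配置流程。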

#!/bin/bash
# ProxySQL自动部署和配置脚本

set -euo pipefail

# 配置参数
PROXYSQL_VERSION="2.5.5"
PROXYSQL_ADMIN_USER="admin"
PROXYSQL_ADMIN_PASSWORD="admin_password"
PROXYSQL_MONITOR_USER="monitor"
PROXYSQL_MONITOR_PASSWORD="monitor_password"
PROXYSQL_PORT="6033"
PROXYSQL_ADMIN_PORT="6032"

# MySQL服务器配置
MYSQL_MASTER="192.168.1.100:3306"
MYSQL_SLAVES=("192.168.1.101:3306" "192.168.1.102:3306")
MYSQL_USER="proxysql_user"
MYSQL_PASSWORD="proxysql_password"

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}

# 错误处理
error_exit() {
    log "错误: $1"
    exit 1
}

# 安装ProxySQL
install_proxysql() {
    log "安装ProxySQL $PROXYSQL_VERSION"
    
    # 下载并安装ProxySQL
    if ! command -v proxysql &> /dev/null; then
        # Ubuntu/Debian(注意: 较新的发行版已弃用apt-key,可改用signed-by密钥环方式导入GPG公钥)
        if command -v apt-get &> /dev/null; then
            wget -O - 'https://repo.proxysql.com/ProxySQL/repo_pub_key' | apt-key add -
            echo deb https://repo.proxysql.com/ProxySQL/proxysql-2.5.x/$(lsb_release -sc)/ ./ | tee /etc/apt/sources.list.d/proxysql.list
            apt-get update
            apt-get install -y proxysql=$PROXYSQL_VERSION
        
        # CentOS/RHEL
        elif command -v yum &> /dev/null; then
            cat > /etc/yum.repos.d/proxysql.repo << EOF
[proxysql_repo]
name=ProxySQL YUM repository
baseurl=https://repo.proxysql.com/ProxySQL/proxysql-2.5.x/centos/\$releasever
gpgcheck=1
gpgkey=https://repo.proxysql.com/ProxySQL/repo_pub_key
EOF
            yum install -y proxysql-$PROXYSQL_VERSION
        else
            error_exit "不支持的操作系统"
        fi
    fi
    
    log "ProxySQL安装完成"
}

# 生成ProxySQL配置文件
generate_proxysql_config() {
    log "生成ProxySQL配置文件"
    
    cat > "/etc/proxysql.cnf" << EOF
datadir="/var/lib/proxysql"
errorlog="/var/lib/proxysql/proxysql.log"

admin_variables=
{
    admin_credentials="$PROXYSQL_ADMIN_USER:$PROXYSQL_ADMIN_PASSWORD"
    mysql_ifaces="0.0.0.0:$PROXYSQL_ADMIN_PORT"
    refresh_interval=2000
    debug=false
}

mysql_variables=
{
    threads=4
    max_connections=2048
    default_query_delay=0
    default_query_timeout=36000000
    have_compress=true
    poll_timeout=2000
    interfaces="0.0.0.0:$PROXYSQL_PORT"
    default_schema="information_schema"
    stacksize=1048576
    server_version="8.0.25"
    connect_timeout_server=3000
    monitor_username="$PROXYSQL_MONITOR_USER"
    monitor_password="$PROXYSQL_MONITOR_PASSWORD"
    monitor_history=600000
    monitor_connect_interval=60000
    monitor_ping_interval=10000
    monitor_read_only_interval=1500
    monitor_read_only_timeout=500
    ping_interval_server_msec=120000
    ping_timeout_server=500
    commands_stats=true
    sessions_sort=true
    connect_retries_on_failure=10
}

mysql_servers =
(
EOF

    # 添加主服务器
    local master_host=$(echo $MYSQL_MASTER | cut -d':' -f1)
    local master_port=$(echo $MYSQL_MASTER | cut -d':' -f2)
    
    cat >> "/etc/proxysql.cnf" << EOF
    {
        address="$master_host"
        port=$master_port
        hostgroup=0
        status="ONLINE"
        weight=1000
        compression=0
        max_replication_lag=10
        use_ssl=0
        max_latency_ms=0
        comment="Master Server"
    },
EOF

    # 添加从服务器
    local hostgroup_id=1
    for slave in "${MYSQL_SLAVES[@]}"; do
        local slave_host=$(echo $slave | cut -d':' -f1)
        local slave_port=$(echo $slave | cut -d':' -f2)
        
        cat >> "/etc/proxysql.cnf" << EOF
    {
        address="$slave_host"
        port=$slave_port
        hostgroup=$hostgroup_id
        status="ONLINE"
        weight=900
        compression=0
        max_replication_lag=10
        use_ssl=0
        max_latency_ms=0
        comment="Slave Server $hostgroup_id"
    },
EOF
        ((hostgroup_id++))
    done
    
    # 移除最后的逗号并关闭配置
    sed -i '$ s/,$//' "/etc/proxysql.cnf"
    cat >> "/etc/proxysql.cnf" << EOF
)

mysql_users:
(
    {
        username = "$MYSQL_USER"
        password = "$MYSQL_PASSWORD"
        default_hostgroup = 0
        max_connections=1000
        default_schema="test"
        active = 1
    }
)

mysql_query_rules:
(
    {
        rule_id=1
        active=1
        match_pattern="^SELECT.*FOR UPDATE$"
        destination_hostgroup=0
        apply=1
        comment="Send SELECT FOR UPDATE to master"
    },
    {
        rule_id=2
        active=1
        match_pattern="^SELECT"
        destination_hostgroup=1
        apply=1
        comment="Send SELECT to slaves"
    }
)

# 注意: 以下scheduler示例面向Galera/PXC集群,且要求检查脚本已存在;普通主从复制场景可删除此段
scheduler=
(
    {
        id=1
        active=1
        interval_ms=10000
        filename="/var/lib/proxysql/proxysql_galera_checker.sh"
        arg1="0"
        arg2="0"
        arg3="0"
        arg4="1"
        arg5="/var/lib/proxysql/proxysql_galera_checker.log"
        comment="Galera checker"
    }
)
EOF

    log "ProxySQL配置文件已生成"
}

# 配置MySQL用户权限
setup_mysql_users() {
    log "配置MySQL用户权限"
    
    # 在主服务器上创建用户
    local master_host=$(echo $MYSQL_MASTER | cut -d':' -f1)
    local master_port=$(echo $MYSQL_MASTER | cut -d':' -f2)
    
    mysql -h"$master_host" -P"$master_port" -uroot -p << EOF
-- 创建ProxySQL用户
CREATE USER IF NOT EXISTS '$MYSQL_USER'@'%' IDENTIFIED BY '$MYSQL_PASSWORD';
GRANT ALL PRIVILEGES ON *.* TO '$MYSQL_USER'@'%';

-- 创建监控用户
CREATE USER IF NOT EXISTS '$PROXYSQL_MONITOR_USER'@'%' IDENTIFIED BY '$PROXYSQL_MONITOR_PASSWORD';
GRANT REPLICATION CLIENT ON *.* TO '$PROXYSQL_MONITOR_USER'@'%';
GRANT SELECT ON performance_schema.* TO '$PROXYSQL_MONITOR_USER'@'%';

FLUSH PRIVILEGES;
EOF

    log "MySQL用户权限配置完成"
}

# 启动ProxySQL服务
start_proxysql() {
    log "启动ProxySQL服务"
    
    # 创建数据目录
    mkdir -p /var/lib/proxysql
    chown proxysql:proxysql /var/lib/proxysql
    
    # 启动服务
    systemctl enable proxysql
    systemctl start proxysql
    
    # 等待服务启动
    sleep 5
    
    # 检查服务状态
    if systemctl is-active --quiet proxysql; then
        log "ProxySQL服务启动成功"
    else
        error_exit "ProxySQL服务启动失败"
    fi
}

# 配置ProxySQL规则
configure_proxysql_rules() {
    log "配置ProxySQL规则"
    
    # 连接到ProxySQL管理接口
    mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD << 'EOF'
-- 加载配置到运行时
LOAD MYSQL SERVERS TO RUNTIME;
LOAD MYSQL USERS TO RUNTIME;
LOAD MYSQL QUERY RULES TO RUNTIME;
LOAD SCHEDULER TO RUNTIME;

-- 保存配置到磁盘
SAVE MYSQL SERVERS TO DISK;
SAVE MYSQL USERS TO DISK;
SAVE MYSQL QUERY RULES TO DISK;
SAVE SCHEDULER TO DISK;

-- 显示配置状态
SELECT * FROM mysql_servers;
SELECT * FROM mysql_users;
SELECT * FROM mysql_query_rules;
EOF

    log "ProxySQL规则配置完成"
}

# 生成健康检查脚本
generate_health_check() {
    log "生成健康检查脚本"
    
    cat > "/usr/local/bin/proxysql_health_check.sh" << 'EOF'
#!/bin/bash
# ProxySQL健康检查脚本

PROXYSQL_ADMIN_PORT="6032"
PROXYSQL_ADMIN_USER="admin"
PROXYSQL_ADMIN_PASSWORD="admin_password"
LOG_FILE="/var/log/proxysql_health.log"
ALERT_EMAIL="admin@example.com"

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

# 检查ProxySQL服务状态
check_proxysql_service() {
    if ! systemctl is-active --quiet proxysql; then
        log "ALERT: ProxySQL服务未运行"
        return 1
    fi
    return 0
}

# 检查MySQL服务器连接
check_mysql_servers() {
    local failed_servers=()
    
    # 获取服务器列表
    local servers=$(mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT hostname,port,status FROM mysql_servers;" --skip-column-names 2>/dev/null)
    
    while IFS=$'\t' read -r hostname port status; do
        [ -z "$hostname" ] && continue
        if [ "$status" != "ONLINE" ]; then
            failed_servers+=("$hostname:$port")
        fi
    done <<< "$servers"
    
    if [ ${#failed_servers[@]} -gt 0 ]; then
        log "ALERT: MySQL服务器离线: ${failed_servers[*]}"
        return 1
    fi
    
    return 0
}

# 检查连接池状态
check_connection_pool() {
    local pool_info=$(mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT hostgroup,srv_host,srv_port,status,ConnUsed,ConnFree,ConnOK,ConnERR FROM stats_mysql_connection_pool;" --skip-column-names 2>/dev/null)
    
    local high_error_count=0
    
    while IFS=$'\t' read -r hostgroup srv_host srv_port status conn_used conn_free conn_ok conn_err; do
        [ -z "$srv_host" ] && continue
        if [ "${conn_err:-0}" -gt 10 ]; then
            log "WARNING: 服务器 $srv_host:$srv_port 连接错误数过高: $conn_err"
            ((high_error_count++))
        fi
    done <<< "$pool_info"
    
    if [ $high_error_count -gt 0 ]; then
        return 1
    fi
    
    return 0
}

# 检查查询性能
check_query_performance() {
    # sum_time单位为微秒,这里统计平均耗时超过1秒的查询
    local slow_queries=$(mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT COUNT(*) FROM stats_mysql_query_digest WHERE sum_time/count_star > 1000000;" --skip-column-names 2>/dev/null)
    
    if [ "${slow_queries:-0}" -gt 5 ]; then
        log "WARNING: 检测到 $slow_queries 个慢查询"
        return 1
    fi
    
    return 0
}

# 发送告警
send_alert() {
    local subject="$1"
    local message="$2"
    
    echo "$message" | mail -s "$subject" "$ALERT_EMAIL" 2>/dev/null || {
        log "WARNING: 无法发送邮件告警"
        logger "ProxySQL Alert: $subject - $message"
    }
}

# 生成状态报告
generate_status_report() {
    local report_file="/tmp/proxysql_status_$(date +%Y%m%d_%H%M%S).txt"
    
    {
        echo "ProxySQL状态报告 - $(date)"
        echo "======================================"
        echo
        
        echo "服务器状态:"
        mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT * FROM mysql_servers;"
        echo
        
        echo "连接池状态:"
        mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT * FROM stats_mysql_connection_pool;"
        echo
        
        echo "查询统计:"
        mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT schemaname,username,digest_text,count_star,sum_time,min_time,max_time FROM stats_mysql_query_digest ORDER BY sum_time DESC LIMIT 10;"
        
    } > "$report_file"
    
    log "状态报告已生成: $report_file"
}

# 主函数
main() {
    case "${1:-check}" in
        check)
            log "开始ProxySQL健康检查"
            
            local errors=0
            
            check_proxysql_service || ((errors++))
            check_mysql_servers || ((errors++))
            check_connection_pool || ((errors++))
            check_query_performance || ((errors++))
            
            if [ $errors -eq 0 ]; then
                log "ProxySQL状态正常"
            else
                log "检测到 $errors 个问题"
            fi
            ;;
        report)
            generate_status_report
            ;;
        monitor)
            while true; do
                main check
                sleep 60
            done
            ;;
        *)
            echo "用法: $0 {check|report|monitor}"
            echo "  check   - 检查ProxySQL状态"
            echo "  report  - 生成状态报告"
            echo "  monitor - 持续监控"
            exit 1
            ;;
    esac
}

main "$@"
EOF

    chmod +x "/usr/local/bin/proxysql_health_check.sh"
    
    log "健康检查脚本已生成"
}

# 测试ProxySQL连接
test_proxysql_connection() {
    log "测试ProxySQL连接"
    
    # 测试读写分离
    echo "测试写操作(应该路由到主服务器):"
    mysql -h127.0.0.1 -P$PROXYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD -e "CREATE DATABASE IF NOT EXISTS proxysql_test; USE proxysql_test; CREATE TABLE IF NOT EXISTS test_table (id INT AUTO_INCREMENT PRIMARY KEY, data VARCHAR(100)); INSERT INTO test_table (data) VALUES ('ProxySQL Test');"
    
    echo "测试读操作(应该路由到从服务器):"
    mysql -h127.0.0.1 -P$PROXYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD -e "USE proxysql_test; SELECT * FROM test_table;"
    
    # 检查路由统计
    echo "查询路由统计:"
    mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD -e "SELECT hostgroup,schemaname,username,digest_text,count_star FROM stats_mysql_query_digest ORDER BY count_star DESC LIMIT 5;"
    
    log "ProxySQL连接测试完成"
}

# 主函数
main() {
    case "${1:-help}" in
        install)
            log "开始ProxySQL部署"
            
            install_proxysql
            generate_proxysql_config
            setup_mysql_users
            start_proxysql
            configure_proxysql_rules
            generate_health_check
            test_proxysql_connection
            
            log "ProxySQL部署完成"
            echo
            echo "连接信息:"
            echo "  应用连接: mysql -h127.0.0.1 -P$PROXYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD"
            echo "  管理连接: mysql -h127.0.0.1 -P$PROXYSQL_ADMIN_PORT -u$PROXYSQL_ADMIN_USER -p$PROXYSQL_ADMIN_PASSWORD"
            echo "  健康检查: /usr/local/bin/proxysql_health_check.sh"
            ;;
        test)
            test_proxysql_connection
            ;;
        health)
            /usr/local/bin/proxysql_health_check.sh check
            ;;
        help|*)
            echo "ProxySQL部署脚本"
            echo
            echo "用法: $0 [命令]"
            echo
            echo "命令:"
            echo "  install  - 完整部署ProxySQL"
            echo "  test     - 测试连接和读写分离"
            echo "  health   - 健康检查"
            echo "  help     - 显示此帮助信息"
            echo
            echo "配置说明:"
            echo "  ProxySQL端口: $PROXYSQL_PORT"
            echo "  管理端口: $PROXYSQL_ADMIN_PORT"
            echo "  主服务器: $MYSQL_MASTER"
            echo "  从服务器: ${MYSQL_SLAVES[*]}"
            ;;
    esac
}

main "$@"

13.5.2 HAProxy + Keepalived高可用
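
除ProxySQL外,另一种常见组合是HAProxy + Keepalived:HAProxy以TCP方式对MySQL做负载均衡和健康检查,Keepalived通过VRRP协议在两台HAProxy节点之间维护一个虚拟IP(VIP);当HAProxy自身故障时VIP自动漂移到备用节点,从而消除代理层的单点故障。下面的Python工具用于生成HAProxy/Keepalived配置和健康检查脚本,并对后端MySQL服务器进行监控。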

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
HAProxy + Keepalived MySQL高可用解决方案
提供负载均衡、故障转移和VIP管理
"""

import subprocess
import json
import time
import logging
import socket
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum

class ServerStatus(Enum):
    """服务器状态"""
    UP = "UP"
    DOWN = "DOWN"
    MAINT = "MAINT"
    NOLB = "NOLB"

class ServerRole(Enum):
    """服务器角色"""
    MASTER = "master"
    SLAVE = "slave"
    BACKUP = "backup"

@dataclass
class MySQLServer:
    """MySQL服务器信息"""
    name: str
    host: str
    port: int
    role: ServerRole
    weight: int = 100
    status: ServerStatus = ServerStatus.UP
    check_interval: int = 2000
    max_connections: int = 1000

class HAProxyMySQLManager:
    """HAProxy MySQL高可用管理器"""
    
    def __init__(self, config_file: str = "/etc/haproxy/haproxy.cfg"):
        self.config_file = config_file
        self.stats_socket = "/var/run/haproxy/admin.sock"
        self.vip = "192.168.1.200"
        self.mysql_port = 3306
        self.stats_port = 8404
        
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/haproxy_mysql_manager.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def generate_haproxy_config(self, servers: List[MySQLServer]) -> str:
        """生成HAProxy配置"""
        config = f"""
global
    log stdout local0
    chroot /var/lib/haproxy
    stats socket {self.stats_socket} mode 660 level admin
    stats timeout 30s
    user haproxy
    group haproxy
    daemon
    
    # SSL配置
    ssl-default-bind-ciphers ECDHE+AESGCM:ECDHE+CHACHA20:RSA+AESGCM:RSA+AES:!aNULL:!MD5:!DSS
    ssl-default-bind-options ssl-min-ver TLSv1.2 no-tls-tickets

defaults
    mode tcp
    log global
    option tcplog
    option dontlognull
    retries 3
    timeout queue 1m
    timeout connect 10s
    timeout client 1m
    timeout server 1m
    timeout check 10s
    maxconn 3000

# 统计页面
listen stats
    bind *:{self.stats_port}
    mode http
    stats enable
    stats uri /stats
    stats realm HAProxy\\ Statistics
    stats auth admin:admin123
    stats refresh 30s
    stats show-node
    stats show-legends
    stats admin if TRUE

# MySQL写服务(主服务器)
listen mysql-write
    bind {self.vip}:{self.mysql_port}
    mode tcp
    balance roundrobin
    option mysql-check user haproxy_check
    """
        
        # 添加主服务器
        for server in servers:
            if server.role == ServerRole.MASTER:
                config += f"""
    server {server.name} {server.host}:{server.port} check weight {server.weight} maxconn {server.max_connections} inter {server.check_interval}ms"""
        
        config += f"""

# MySQL读服务(从服务器)
listen mysql-read
    bind {self.vip}:3307
    mode tcp
    balance roundrobin
    option mysql-check user haproxy_check
    """
        
        # 添加从服务器
        for server in servers:
            if server.role == ServerRole.SLAVE:
                config += f"""
    server {server.name} {server.host}:{server.port} check weight {server.weight} maxconn {server.max_connections} inter {server.check_interval}ms"""
        
        # 添加备份服务器
        for server in servers:
            if server.role == ServerRole.BACKUP:
                config += f"""
    server {server.name} {server.host}:{server.port} check weight {server.weight} maxconn {server.max_connections} inter {server.check_interval}ms backup"""
        
        return config
    
    def generate_keepalived_config(self, interface: str = "eth0", priority: int = 100, is_master: bool = True) -> str:
        """生成Keepalived配置"""
        state = "MASTER" if is_master else "BACKUP"
        
        config = f"""
global_defs {{
    notification_email {{
        admin@example.com
    }}
    notification_email_from keepalived@example.com
    smtp_server localhost
    smtp_connect_timeout 30
    router_id LVS_MYSQL
    vrrp_skip_check_adv_addr
    vrrp_strict
    vrrp_garp_interval 0
    vrrp_gna_interval 0
}}

vrrp_script chk_haproxy {{
    script "/usr/local/bin/check_haproxy.sh"
    interval 2
    weight -2
    fall 3
    rise 2
}}

vrrp_instance VI_1 {{
    state {state}
    interface {interface}
    virtual_router_id 51
    priority {priority}
    advert_int 1
    authentication {{
        auth_type PASS
        auth_pass mysql_ha_pass
    }}
    virtual_ipaddress {{
        {self.vip}
    }}
    track_script {{
        chk_haproxy
    }}
    notify_master "/usr/local/bin/notify_master.sh"
    notify_backup "/usr/local/bin/notify_backup.sh"
    notify_fault "/usr/local/bin/notify_fault.sh"
}}
"""
        
        return config
    
    def create_health_check_scripts(self):
        """创建健康检查脚本"""
        # HAProxy健康检查脚本
        haproxy_check_script = f"""#!/bin/bash
# HAProxy健康检查脚本

if ! pgrep haproxy > /dev/null; then
    exit 1
fi

# 检查HAProxy统计接口
if ! echo "show info" | socat stdio {self.stats_socket} > /dev/null 2>&1; then
    exit 1
fi

exit 0
"""
        
        with open("/usr/local/bin/check_haproxy.sh", "w") as f:
            f.write(haproxy_check_script)
        subprocess.run(["chmod", "+x", "/usr/local/bin/check_haproxy.sh"])
        
        # 主节点通知脚本
        master_notify_script = f"""#!/bin/bash
# 主节点通知脚本

echo "$(date): 成为主节点" >> /var/log/keepalived-notify.log

# 发送邮件通知
echo "HAProxy MySQL HA: 节点 $(hostname) 成为主节点" | mail -s "MySQL HA Master" admin@example.com

# 记录系统日志
logger "HAProxy MySQL HA: 节点 $(hostname) 成为主节点"
"""
        
        with open("/usr/local/bin/notify_master.sh", "w") as f:
            f.write(master_notify_script)
        subprocess.run(["chmod", "+x", "/usr/local/bin/notify_master.sh"])
        
        # 备份节点通知脚本
        backup_notify_script = f"""#!/bin/bash
# 备份节点通知脚本

echo "$(date): 成为备份节点" >> /var/log/keepalived-notify.log

# 发送邮件通知
echo "HAProxy MySQL HA: 节点 $(hostname) 成为备份节点" | mail -s "MySQL HA Backup" admin@example.com

# 记录系统日志
logger "HAProxy MySQL HA: 节点 $(hostname) 成为备份节点"
"""
        
        with open("/usr/local/bin/notify_backup.sh", "w") as f:
            f.write(backup_notify_script)
        subprocess.run(["chmod", "+x", "/usr/local/bin/notify_backup.sh"])
        
        # 故障通知脚本
        fault_notify_script = f"""#!/bin/bash
# 故障通知脚本

echo "$(date): 节点故障" >> /var/log/keepalived-notify.log

# 发送邮件通知
echo "HAProxy MySQL HA: 节点 $(hostname) 发生故障" | mail -s "MySQL HA Fault" admin@example.com

# 记录系统日志
logger "HAProxy MySQL HA: 节点 $(hostname) 发生故障"
"""
        
        with open("/usr/local/bin/notify_fault.sh", "w") as f:
            f.write(fault_notify_script)
        subprocess.run(["chmod", "+x", "/usr/local/bin/notify_fault.sh"])
        
        self.logger.info("健康检查脚本已创建")
    
    def get_haproxy_stats(self) -> Dict:
        """获取HAProxy统计信息"""
        try:
            # 直接向HAProxy管理socket发送命令获取基本信息
            info_output = subprocess.run(
                ["socat", "stdio", self.stats_socket],
                input="show info\n",
                capture_output=True,
                text=True
            )
            
            # 获取服务器状态(CSV格式)
            stat_output = subprocess.run(
                ["socat", "stdio", self.stats_socket],
                input="show stat\n",
                capture_output=True,
                text=True
            )
            
            stats = {
                'timestamp': datetime.now().isoformat(),
                'info': self._parse_haproxy_info(info_output.stdout),
                'servers': self._parse_haproxy_stats(stat_output.stdout)
            }
            
            return stats
        
        except Exception as e:
            self.logger.error(f"获取HAProxy统计信息失败: {e}")
            return {'error': str(e)}
    
    def _parse_haproxy_info(self, info_output: str) -> Dict:
        """解析HAProxy信息"""
        info = {}
        for line in info_output.strip().split('\n'):
            if ':' in line:
                key, value = line.split(':', 1)
                info[key.strip()] = value.strip()
        return info
    
    def _parse_haproxy_stats(self, stat_output: str) -> List[Dict]:
        """解析HAProxy统计信息"""
        servers = []
        lines = stat_output.strip().split('\n')
        
        if not lines or not lines[0].strip():
            return servers
        
        # 第一行是CSV标题(以"# "开头,需去掉前缀)
        headers = lines[0].lstrip('# ').split(',')
        
        for line in lines[1:]:
            if line.strip():
                values = line.split(',')
                server_info = {}
                
                for i, header in enumerate(headers):
                    if i < len(values):
                        server_info[header] = values[i]
                
                servers.append(server_info)
        
        return servers
    
    def enable_server(self, backend: str, server: str) -> bool:
        """启用服务器"""
        try:
            command = f"enable server {backend}/{server}"
            result = subprocess.run(
                ["echo", command],
                stdout=subprocess.PIPE,
                text=True
            )
            
            subprocess.run(
                ["socat", "stdio", self.stats_socket],
                input=result.stdout,
                capture_output=True,
                text=True
            )
            
            self.logger.info(f"服务器 {backend}/{server} 已启用")
            return True
        
        except Exception as e:
            self.logger.error(f"启用服务器失败: {e}")
            return False
    
    def disable_server(self, backend: str, server: str) -> bool:
        """禁用服务器"""
        try:
            command = f"disable server {backend}/{server}"
            result = subprocess.run(
                ["echo", command],
                stdout=subprocess.PIPE,
                text=True
            )
            
            subprocess.run(
                ["socat", "stdio", self.stats_socket],
                input=result.stdout,
                capture_output=True,
                text=True
            )
            
            self.logger.info(f"服务器 {backend}/{server} 已禁用")
            return True
        
        except Exception as e:
            self.logger.error(f"禁用服务器失败: {e}")
            return False
    
    def check_mysql_connectivity(self, host: str, port: int, timeout: int = 5) -> bool:
        """检查MySQL连接性"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            result = sock.connect_ex((host, port))
            sock.close()
            return result == 0
        
        except Exception:
            return False
    
    def monitor_servers(self, servers: List[MySQLServer]) -> Dict:
        """监控服务器状态"""
        monitoring_result = {
            'timestamp': datetime.now().isoformat(),
            'servers': [],
            'alerts': []
        }
        
        for server in servers:
            is_online = self.check_mysql_connectivity(server.host, server.port)
            
            server_status = {
                'name': server.name,
                'host': server.host,
                'port': server.port,
                'role': server.role.value,
                'online': is_online,
                'status': ServerStatus.UP.value if is_online else ServerStatus.DOWN.value
            }
            
            monitoring_result['servers'].append(server_status)
            
            # 生成告警
            if not is_online:
                alert = {
                    'level': 'CRITICAL',
                    'message': f"MySQL服务器 {server.name} ({server.host}:{server.port}) 离线",
                    'timestamp': datetime.now().isoformat()
                }
                monitoring_result['alerts'].append(alert)
        
        return monitoring_result
    
    def generate_deployment_script(self, servers: List[MySQLServer], interface: str = "eth0") -> str:
        """生成部署脚本"""
        script = f"""#!/bin/bash
# HAProxy + Keepalived MySQL高可用部署脚本

set -euo pipefail

# 安装软件包
install_packages() {{
    echo "安装HAProxy和Keepalived..."
    
    if command -v apt-get &> /dev/null; then
        apt-get update
        apt-get install -y haproxy keepalived socat mailutils
    elif command -v yum &> /dev/null; then
        yum install -y haproxy keepalived socat mailx
    else
        echo "不支持的操作系统"
        exit 1
    fi
}}

# 配置HAProxy
configure_haproxy() {{
    echo "配置HAProxy..."
    
    # 备份原配置
    cp /etc/haproxy/haproxy.cfg /etc/haproxy/haproxy.cfg.bak
    
    # 写入新配置
    cat > /etc/haproxy/haproxy.cfg << 'EOF'
{self.generate_haproxy_config(servers)}
EOF
    
    # 创建HAProxy用户(用于MySQL健康检查)
    echo "创建HAProxy MySQL检查用户..."
    """
        
        # 为每个MySQL服务器创建检查用户
        for server in servers:
            script += f"""
    mysql -h{server.host} -P{server.port} -uroot -p << 'SQL'
CREATE USER IF NOT EXISTS 'haproxy_check'@'%';
FLUSH PRIVILEGES;
SQL
"""
        
        script += f"""
    
    # 启动HAProxy
    systemctl enable haproxy
    systemctl start haproxy
}}

# 配置Keepalived(主节点)
configure_keepalived_master() {{
    echo "配置Keepalived主节点..."
    
    cat > /etc/keepalived/keepalived.conf << 'EOF'
{self.generate_keepalived_config(interface, 100, True)}
EOF
    
    systemctl enable keepalived
    systemctl start keepalived
}}

# 配置Keepalived(备份节点)
configure_keepalived_backup() {{
    echo "配置Keepalived备份节点..."
    
    cat > /etc/keepalived/keepalived.conf << 'EOF'
{self.generate_keepalived_config(interface, 90, False)}
EOF
    
    systemctl enable keepalived
    systemctl start keepalived
}}

# 主函数
main() {{
    case "${{1:-help}}" in
        master)
            install_packages
            configure_haproxy
            configure_keepalived_master
            echo "主节点配置完成"
            echo "VIP: {self.vip}"
            echo "MySQL写端口: {self.mysql_port}"
            echo "MySQL读端口: 3307"
            echo "统计页面: http://$(hostname -I | awk '{{print $1}}'):{self.stats_port}/stats"
            ;;
        backup)
            install_packages
            configure_haproxy
            configure_keepalived_backup
            echo "备份节点配置完成"
            ;;
        help|*)
            echo "HAProxy + Keepalived MySQL高可用部署脚本"
            echo
            echo "用法: $0 [master|backup]"
            echo "  master  - 配置为主节点"
            echo "  backup  - 配置为备份节点"
            ;;
    esac
}}

main "$@"
"""
        
        return script
    
    def export_config(self, servers: List[MySQLServer], output_dir: str = "/tmp"):
        """导出配置文件"""
        # 导出HAProxy配置
        haproxy_config = self.generate_haproxy_config(servers)
        with open(f"{output_dir}/haproxy.cfg", "w") as f:
            f.write(haproxy_config)
        
        # 导出Keepalived配置
        keepalived_master_config = self.generate_keepalived_config("eth0", 100, True)
        with open(f"{output_dir}/keepalived_master.conf", "w") as f:
            f.write(keepalived_master_config)
        
        keepalived_backup_config = self.generate_keepalived_config("eth0", 90, False)
        with open(f"{output_dir}/keepalived_backup.conf", "w") as f:
            f.write(keepalived_backup_config)
        
        # 导出部署脚本
        deployment_script = self.generate_deployment_script(servers)
        with open(f"{output_dir}/deploy_ha.sh", "w") as f:
            f.write(deployment_script)
        subprocess.run(["chmod", "+x", f"{output_dir}/deploy_ha.sh"])
        
        self.logger.info(f"配置文件已导出到: {output_dir}")

# 使用示例
if __name__ == '__main__':
    # 定义MySQL服务器
    servers = [
        MySQLServer("mysql-master", "192.168.1.100", 3306, ServerRole.MASTER, 1000),
        MySQLServer("mysql-slave1", "192.168.1.101", 3306, ServerRole.SLAVE, 900),
        MySQLServer("mysql-slave2", "192.168.1.102", 3306, ServerRole.SLAVE, 900),
        MySQLServer("mysql-backup", "192.168.1.103", 3306, ServerRole.BACKUP, 800)
    ]
    
    # 创建管理器
    manager = HAProxyMySQLManager()
    
    # 创建健康检查脚本
    manager.create_health_check_scripts()
    
    # 监控服务器
    monitoring_result = manager.monitor_servers(servers)
    print("服务器监控结果:")
    print(json.dumps(monitoring_result, ensure_ascii=False, indent=2))
    
    # 获取HAProxy统计信息
    stats = manager.get_haproxy_stats()
    print("\nHAProxy统计信息:")
    print(json.dumps(stats, ensure_ascii=False, indent=2))
    
    # 导出配置
    manager.export_config(servers)
    print("\n配置文件已导出到 /tmp 目录")

13.6 云数据库高可用

13.6.1 云数据库服务对比
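
在云环境中,多可用区部署、自动故障转移、备份恢复等高可用能力大多由托管服务直接提供,选型时更需要关注SLA、性能上限、合规认证与成本。下面的Python工具从这些维度对主流云数据库服务做量化对比(其中价格与指标均为示例数据,实际选型请以各厂商官方报价和文档为准)。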

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
云数据库高可用服务对比与选择
支持AWS RDS、阿里云RDS、腾讯云CDB等主流云服务
"""

import json
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum

class CloudProvider(Enum):
    """云服务提供商"""
    AWS = "aws"
    ALIYUN = "aliyun"
    TENCENT = "tencent"
    AZURE = "azure"
    GCP = "gcp"

class HALevel(Enum):
    """高可用等级"""
    BASIC = "basic"          # 基础版
    STANDARD = "standard"    # 标准版
    ENTERPRISE = "enterprise" # 企业版
    ULTIMATE = "ultimate"    # 旗舰版

@dataclass
class CloudDBInstance:
    """云数据库实例信息"""
    provider: CloudProvider
    instance_id: str
    instance_class: str
    engine: str
    engine_version: str
    storage_type: str
    storage_size: int
    ha_level: HALevel
    multi_az: bool
    backup_retention: int
    region: str
    availability_zones: List[str]
    endpoint: str
    port: int
    status: str
    created_time: datetime
    monthly_cost: float

class CloudDatabaseManager:
    """云数据库高可用管理器"""
    
    def __init__(self):
        self.providers_config = {
            CloudProvider.AWS: {
                'regions': ['us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1'],
                'instance_classes': ['db.t3.micro', 'db.t3.small', 'db.m5.large', 'db.r5.xlarge'],
                'storage_types': ['gp2', 'gp3', 'io1', 'io2'],
                'engines': ['mysql', 'mariadb', 'aurora-mysql']
            },
            CloudProvider.ALIYUN: {
                'regions': ['cn-hangzhou', 'cn-beijing', 'cn-shanghai', 'cn-shenzhen'],
                'instance_classes': ['mysql.n1.micro.1', 'mysql.n2.small.1', 'mysql.n4.medium.1'],
                'storage_types': ['cloud_essd', 'cloud_ssd', 'local_ssd'],
                'engines': ['MySQL', 'MariaDB']
            },
            CloudProvider.TENCENT: {
                'regions': ['ap-beijing', 'ap-shanghai', 'ap-guangzhou', 'ap-chengdu'],
                'instance_classes': ['cdb.s1.small', 'cdb.s2.large', 'cdb.s3.medium'],
                'storage_types': ['CLOUD_SSD', 'CLOUD_HSSD', 'LOCAL_NVME'],
                'engines': ['MySQL', 'MariaDB']
            }
        }
    
    def compare_providers(self, requirements: Dict) -> Dict:
        """对比云服务提供商"""
        comparison = {
            'timestamp': datetime.now().isoformat(),
            'requirements': requirements,
            'providers': {}
        }
        
        # 仅对比已内置配置数据的云厂商,避免访问缺失字段时出错
        for provider in self.providers_config:
            provider_info = self._analyze_provider(provider, requirements)
            comparison['providers'][provider.value] = provider_info
        
        # 生成推荐
        recommendation = self._generate_recommendation(comparison)
        comparison['recommendation'] = recommendation
        
        return comparison
    
    def _analyze_provider(self, provider: CloudProvider, requirements: Dict) -> Dict:
        """分析单个云服务提供商"""
        config = self.providers_config.get(provider, {})
        
        analysis = {
            'provider': provider.value,
            'features': self._get_provider_features(provider),
            'pricing': self._estimate_pricing(provider, requirements),
            'availability': self._get_availability_info(provider),
            'performance': self._get_performance_metrics(provider),
            'compliance': self._get_compliance_info(provider),
            'support': self._get_support_info(provider),
            'score': 0
        }
        
        # 计算综合评分
        analysis['score'] = self._calculate_score(analysis, requirements)
        
        return analysis
    
    def _get_provider_features(self, provider: CloudProvider) -> Dict:
        """获取云服务提供商特性"""
        features = {
            CloudProvider.AWS: {
                'multi_az': True,
                'read_replicas': True,
                'automated_backup': True,
                'point_in_time_recovery': True,
                'encryption': True,
                'monitoring': 'CloudWatch',
                'auto_scaling': True,
                'cross_region_replication': True,
                'aurora_serverless': True,
                'global_database': True
            },
            CloudProvider.ALIYUN: {
                'multi_az': True,
                'read_replicas': True,
                'automated_backup': True,
                'point_in_time_recovery': True,
                'encryption': True,
                'monitoring': 'CloudMonitor',
                'auto_scaling': True,
                'cross_region_replication': True,
                'polardb': True,
                'global_database': False
            },
            CloudProvider.TENCENT: {
                'multi_az': True,
                'read_replicas': True,
                'automated_backup': True,
                'point_in_time_recovery': True,
                'encryption': True,
                'monitoring': 'Cloud Monitor',
                'auto_scaling': True,
                'cross_region_replication': True,
                'tdsql': True,
                'global_database': False
            }
        }
        
        return features.get(provider, {})
    
    def _estimate_pricing(self, provider: CloudProvider, requirements: Dict) -> Dict:
        """估算定价"""
        # 基础定价模型(示例数据)
        base_pricing = {
            CloudProvider.AWS: {
                'instance_hourly': 0.017,  # db.t3.micro
                'storage_gb_monthly': 0.115,  # gp2
                'backup_gb_monthly': 0.095,
                'data_transfer_gb': 0.09
            },
            CloudProvider.ALIYUN: {
                'instance_hourly': 0.015,
                'storage_gb_monthly': 0.10,
                'backup_gb_monthly': 0.08,
                'data_transfer_gb': 0.08
            },
            CloudProvider.TENCENT: {
                'instance_hourly': 0.014,
                'storage_gb_monthly': 0.09,
                'backup_gb_monthly': 0.07,
                'data_transfer_gb': 0.07
            }
        }
        
        pricing = base_pricing.get(provider, {})
        
        # 计算月度成本
        storage_size = requirements.get('storage_gb', 100)
        backup_size = storage_size * requirements.get('backup_retention_days', 7) / 30
        
        monthly_cost = {
            'instance': pricing.get('instance_hourly', 0) * 24 * 30,
            'storage': pricing.get('storage_gb_monthly', 0) * storage_size,
            'backup': pricing.get('backup_gb_monthly', 0) * backup_size,
            'data_transfer': pricing.get('data_transfer_gb', 0) * requirements.get('data_transfer_gb', 10)
        }
        
        monthly_cost['total'] = sum(monthly_cost.values())
        
        return {
            'monthly_cost': monthly_cost,
            'annual_cost': monthly_cost['total'] * 12,
            'currency': 'USD'
        }
    
    def _get_availability_info(self, provider: CloudProvider) -> Dict:
        """获取可用性信息"""
        availability_info = {
            CloudProvider.AWS: {
                'sla': 99.95,
                'rto': '< 5 minutes',
                'rpo': '< 5 minutes',
                'multi_az_failover': 'Automatic',
                'backup_frequency': 'Continuous',
                'regions': 25,
                'availability_zones': 80
            },
            CloudProvider.ALIYUN: {
                'sla': 99.95,
                'rto': '< 5 minutes',
                'rpo': '< 5 minutes',
                'multi_az_failover': 'Automatic',
                'backup_frequency': 'Continuous',
                'regions': 20,
                'availability_zones': 60
            },
            CloudProvider.TENCENT: {
                'sla': 99.95,
                'rto': '< 5 minutes',
                'rpo': '< 5 minutes',
                'multi_az_failover': 'Automatic',
                'backup_frequency': 'Continuous',
                'regions': 15,
                'availability_zones': 45
            }
        }
        
        return availability_info.get(provider, {})
    
    def _get_performance_metrics(self, provider: CloudProvider) -> Dict:
        """获取性能指标"""
        performance = {
            CloudProvider.AWS: {
                'max_iops': 80000,
                'max_throughput_mbps': 2000,
                'max_connections': 16000,
                'cpu_credits': True,
                'burst_performance': True,
                'read_replica_lag': '< 100ms'
            },
            CloudProvider.ALIYUN: {
                'max_iops': 50000,
                'max_throughput_mbps': 1500,
                'max_connections': 12000,
                'cpu_credits': True,
                'burst_performance': True,
                'read_replica_lag': '< 100ms'
            },
            CloudProvider.TENCENT: {
                'max_iops': 40000,
                'max_throughput_mbps': 1200,
                'max_connections': 10000,
                'cpu_credits': True,
                'burst_performance': True,
                'read_replica_lag': '< 100ms'
            }
        }
        
        return performance.get(provider, {})
    
    def _get_compliance_info(self, provider: CloudProvider) -> Dict:
        """获取合规信息"""
        compliance = {
            CloudProvider.AWS: {
                'certifications': ['SOC 1/2/3', 'PCI DSS', 'ISO 27001', 'HIPAA', 'FedRAMP'],
                'data_residency': True,
                'encryption_at_rest': True,
                'encryption_in_transit': True,
                'key_management': 'AWS KMS',
                'audit_logging': True
            },
            CloudProvider.ALIYUN: {
                'certifications': ['ISO 27001', 'SOC 1/2', 'PCI DSS', 'CSA STAR'],
                'data_residency': True,
                'encryption_at_rest': True,
                'encryption_in_transit': True,
                'key_management': 'KMS',
                'audit_logging': True
            },
            CloudProvider.TENCENT: {
                'certifications': ['ISO 27001', 'SOC 2', 'PCI DSS'],
                'data_residency': True,
                'encryption_at_rest': True,
                'encryption_in_transit': True,
                'key_management': 'KMS',
                'audit_logging': True
            }
        }
        
        return compliance.get(provider, {})
    
    def _get_support_info(self, provider: CloudProvider) -> Dict:
        """获取支持信息"""
        support = {
            CloudProvider.AWS: {
                'support_levels': ['Basic', 'Developer', 'Business', 'Enterprise'],
                'response_time': '< 1 hour (Business)',
                'documentation_quality': 'Excellent',
                'community_support': 'Very Active',
                'professional_services': True,
                'training_resources': 'Extensive'
            },
            CloudProvider.ALIYUN: {
                'support_levels': ['Basic', 'Business', 'Enterprise'],
                'response_time': '< 2 hours (Business)',
                'documentation_quality': 'Good',
                'community_support': 'Active',
                'professional_services': True,
                'training_resources': 'Good'
            },
            CloudProvider.TENCENT: {
                'support_levels': ['Basic', 'Business', 'Enterprise'],
                'response_time': '< 4 hours (Business)',
                'documentation_quality': 'Good',
                'community_support': 'Moderate',
                'professional_services': True,
                'training_resources': 'Moderate'
            }
        }
        
        return support.get(provider, {})
    
    def _calculate_score(self, analysis: Dict, requirements: Dict) -> float:
        """计算综合评分"""
        weights = {
            'pricing': 0.25,
            'availability': 0.30,
            'performance': 0.20,
            'compliance': 0.15,
            'support': 0.10
        }
        
        scores = {
            'pricing': self._score_pricing(analysis['pricing'], requirements),
            'availability': self._score_availability(analysis['availability']),
            'performance': self._score_performance(analysis['performance']),
            'compliance': self._score_compliance(analysis['compliance']),
            'support': self._score_support(analysis['support'])
        }
        
        total_score = sum(score * weights[category] for category, score in scores.items())
        return round(total_score, 2)
    
    def _score_pricing(self, pricing: Dict, requirements: Dict) -> float:
        """评分定价"""
        budget = requirements.get('monthly_budget', 1000)
        actual_cost = pricing['monthly_cost']['total']
        
        if actual_cost <= budget * 0.5:
            return 100
        elif actual_cost <= budget * 0.75:
            return 80
        elif actual_cost <= budget:
            return 60
        elif actual_cost <= budget * 1.25:
            return 40
        else:
            return 20
    
    def _score_availability(self, availability: Dict) -> float:
        """评分可用性"""
        sla = availability.get('sla', 99.0)
        if sla >= 99.99:
            return 100
        elif sla >= 99.95:
            return 90
        elif sla >= 99.9:
            return 80
        elif sla >= 99.5:
            return 70
        else:
            return 60
    
    def _score_performance(self, performance: Dict) -> float:
        """评分性能"""
        max_iops = performance.get('max_iops', 0)
        if max_iops >= 50000:
            return 100
        elif max_iops >= 30000:
            return 80
        elif max_iops >= 10000:
            return 60
        else:
            return 40
    
    def _score_compliance(self, compliance: Dict) -> float:
        """评分合规性"""
        cert_count = len(compliance.get('certifications', []))
        if cert_count >= 5:
            return 100
        elif cert_count >= 3:
            return 80
        elif cert_count >= 2:
            return 60
        else:
            return 40
    
    def _score_support(self, support: Dict) -> float:
        """评分支持"""
        support_levels = len(support.get('support_levels', []))
        doc_quality = support.get('documentation_quality', 'Poor')
        
        base_score = support_levels * 20
        
        if doc_quality == 'Excellent':
            base_score += 20
        elif doc_quality == 'Good':
            base_score += 10
        
        return min(base_score, 100)
    
    def _generate_recommendation(self, comparison: Dict) -> Dict:
        """生成推荐"""
        providers = comparison['providers']
        
        # 按评分排序
        sorted_providers = sorted(
            providers.items(),
            key=lambda x: x[1]['score'],
            reverse=True
        )
        
        recommendation = {
            'best_overall': sorted_providers[0][0],
            'best_value': self._find_best_value(providers),
            'best_performance': self._find_best_performance(providers),
            'best_compliance': self._find_best_compliance(providers),
            'ranking': [provider[0] for provider in sorted_providers],
            'summary': self._generate_summary(sorted_providers)
        }
        
        return recommendation
    
    def _find_best_value(self, providers: Dict) -> str:
        """找到最佳性价比"""
        best_value = None
        best_ratio = 0
        
        for provider_name, provider_data in providers.items():
            score = provider_data['score']
            cost = provider_data['pricing']['monthly_cost']['total']
            ratio = score / cost if cost > 0 else 0
            
            if ratio > best_ratio:
                best_ratio = ratio
                best_value = provider_name
        
        return best_value
    
    def _find_best_performance(self, providers: Dict) -> str:
        """找到最佳性能"""
        best_performance = None
        best_iops = 0
        
        for provider_name, provider_data in providers.items():
            iops = provider_data['performance']['max_iops']
            if iops > best_iops:
                best_iops = iops
                best_performance = provider_name
        
        return best_performance
    
    def _find_best_compliance(self, providers: Dict) -> str:
        """找到最佳合规性"""
        best_compliance = None
        best_cert_count = 0
        
        for provider_name, provider_data in providers.items():
            cert_count = len(provider_data['compliance']['certifications'])
            if cert_count > best_cert_count:
                best_cert_count = cert_count
                best_compliance = provider_name
        
        return best_compliance
    
    def _generate_summary(self, sorted_providers: List) -> str:
        """生成总结"""
        best_provider = sorted_providers[0]
        
        summary = f"""
基于综合评分,推荐使用 {best_provider[0].upper()} 作为云数据库服务提供商。

主要优势:
- 综合评分:{best_provider[1]['score']}/100
- SLA:{best_provider[1]['availability']['sla']}%
- 月度成本:${best_provider[1]['pricing']['monthly_cost']['total']:.2f}
- 最大IOPS:{best_provider[1]['performance']['max_iops']:,}

建议在选择时还需要考虑:
1. 数据本地化要求
2. 现有技术栈兼容性
3. 团队技术能力
4. 长期发展规划
"""
        
        return summary.strip()
    
    def generate_migration_plan(self, source_provider: CloudProvider, target_provider: CloudProvider) -> Dict:
        """生成迁移计划"""
        migration_plan = {
            'source': source_provider.value,
            'target': target_provider.value,
            'phases': [
                {
                    'phase': 1,
                    'name': '准备阶段',
                    'duration': '1-2周',
                    'tasks': [
                        '评估现有数据库架构',
                        '制定迁移策略',
                        '准备目标环境',
                        '配置网络连接',
                        '设置监控和告警'
                    ]
                },
                {
                    'phase': 2,
                    'name': '测试迁移',
                    'duration': '1周',
                    'tasks': [
                        '创建测试环境',
                        '执行数据迁移测试',
                        '验证数据完整性',
                        '性能测试',
                        '故障转移测试'
                    ]
                },
                {
                    'phase': 3,
                    'name': '生产迁移',
                    'duration': '1-2天',
                    'tasks': [
                        '停止写入操作',
                        '执行最终数据同步',
                        '切换应用连接',
                        '验证系统功能',
                        '监控系统状态'
                    ]
                },
                {
                    'phase': 4,
                    'name': '优化阶段',
                    'duration': '1-2周',
                    'tasks': [
                        '性能调优',
                        '配置优化',
                        '监控调整',
                        '文档更新',
                        '团队培训'
                    ]
                }
            ],
            'risks': [
                {
                    'risk': '数据丢失',
                    'probability': 'Low',
                    'impact': 'High',
                    'mitigation': '多重备份验证,分阶段迁移'
                },
                {
                    'risk': '服务中断',
                    'probability': 'Medium',
                    'impact': 'High',
                    'mitigation': '维护窗口执行,快速回滚方案'
                },
                {
                    'risk': '性能下降',
                    'probability': 'Medium',
                    'impact': 'Medium',
                    'mitigation': '充分测试,性能基准对比'
                }
            ],
            'tools': [
                'AWS Database Migration Service',
                'MySQL Workbench',
                'Percona XtraBackup',
                'pt-table-checksum',
                'Custom migration scripts'
            ],
            'estimated_cost': {
                'migration_tools': 500,
                'additional_resources': 2000,
                'professional_services': 5000,
                'total': 7500,
                'currency': 'USD'
            }
        }
        
        return migration_plan
    
    def export_comparison_report(self, comparison: Dict, output_file: str = "/tmp/cloud_db_comparison.json"):
        """导出对比报告"""
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(comparison, f, ensure_ascii=False, indent=2, default=str)
        
        print(f"对比报告已导出到: {output_file}")

# 使用示例
if __name__ == '__main__':
    # 定义需求
    requirements = {
        'storage_gb': 500,
        'monthly_budget': 800,
        'backup_retention_days': 30,
        'data_transfer_gb': 100,
        'performance_tier': 'high',
        'compliance_required': ['ISO 27001', 'SOC 2'],
        'regions': ['us-east-1', 'cn-beijing']
    }
    
    # 创建管理器
    manager = CloudDatabaseManager()
    
    # 对比云服务提供商
    comparison = manager.compare_providers(requirements)
    
    print("云数据库服务对比结果:")
    print(json.dumps(comparison, ensure_ascii=False, indent=2, default=str))
    
    # 生成迁移计划
    migration_plan = manager.generate_migration_plan(
        CloudProvider.AWS,
        CloudProvider.ALIYUN
    )
    
    print("\n迁移计划:")
    print(json.dumps(migration_plan, ensure_ascii=False, indent=2))
    
    # 导出报告
    manager.export_comparison_report(comparison)
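
以脚本内置的AWS示例单价和上述需求(500GB存储、保留30天备份、每月100GB流量)为例,月度成本估算为:实例 0.017 × 24 × 30 ≈ 12.24 美元,存储 0.115 × 500 = 57.50 美元,备份 0.095 × (500 × 30 ÷ 30) = 47.50 美元,流量 0.09 × 100 = 9.00 美元,合计约 126.24 美元/月,低于 800 美元预算的一半,因此 _score_pricing 会给出满分。需要强调,这些单价只是演示数据,并非实时报价。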

13.6.2 云数据库自动化运维
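
即便使用托管数据库,日常运维操作(状态巡检、指标采集、手动备份、规格变更、强制故障转移)仍然值得脚本化。各云厂商都提供了命令行工具(AWS CLI、阿里云aliyun CLI、腾讯云tccli),下面的Shell脚本把这些操作封装成统一入口,并在异常时通过邮件和Slack发送告警。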

#!/bin/bash
# 云数据库自动化运维脚本
# 支持AWS RDS、阿里云RDS、腾讯云CDB的统一管理

set -euo pipefail

# 配置参数
CLOUD_PROVIDER="aws"  # aws, aliyun, tencent
REGION="us-east-1"
DB_INSTANCE_ID="mydb-instance"
MONITOR_INTERVAL=60
LOG_FILE="/var/log/cloud_db_monitor.log"
ALERT_EMAIL="admin@example.com"
SLACK_WEBHOOK="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

# 错误处理
error_exit() {
    log "错误: $1"
    send_alert "ERROR" "$1"
    exit 1
}

# 发送告警
send_alert() {
    local level="$1"
    local message="$2"
    
    # 发送邮件
    echo "$message" | mail -s "[$level] 云数据库告警" "$ALERT_EMAIL" 2>/dev/null || true
    
    # 发送Slack通知
    if [ -n "$SLACK_WEBHOOK" ]; then
        curl -X POST -H 'Content-type: application/json' \
            --data "{\"text\":\"[$level] 云数据库告警: $message\"}" \
            "$SLACK_WEBHOOK" 2>/dev/null || true
    fi
    
    # 记录系统日志
    logger "Cloud DB Alert [$level]: $message"
}

# AWS RDS操作
aws_rds_operations() {
    local operation="$1"
    
    case "$operation" in
        status)
            aws rds describe-db-instances \
                --db-instance-identifier "$DB_INSTANCE_ID" \
                --region "$REGION" \
                --query 'DBInstances[0].{Status:DBInstanceStatus,Engine:Engine,Class:DBInstanceClass,Storage:AllocatedStorage,MultiAZ:MultiAZ}' \
                --output table
            ;;
        metrics)
            # 获取CloudWatch指标
            local end_time=$(date -u +%Y-%m-%dT%H:%M:%S)
            local start_time=$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S)
            
            echo "CPU使用率:"
            aws cloudwatch get-metric-statistics \
                --namespace AWS/RDS \
                --metric-name CPUUtilization \
                --dimensions Name=DBInstanceIdentifier,Value="$DB_INSTANCE_ID" \
                --start-time "$start_time" \
                --end-time "$end_time" \
                --period 300 \
                --statistics Average \
                --region "$REGION" \
                --query 'Datapoints[*].[Timestamp,Average]' \
                --output table
            
            echo "\n连接数:"
            aws cloudwatch get-metric-statistics \
                --namespace AWS/RDS \
                --metric-name DatabaseConnections \
                --dimensions Name=DBInstanceIdentifier,Value="$DB_INSTANCE_ID" \
                --start-time "$start_time" \
                --end-time "$end_time" \
                --period 300 \
                --statistics Average \
                --region "$REGION" \
                --query 'Datapoints[*].[Timestamp,Average]' \
                --output table
            ;;
        backup)
            # 创建快照
            local snapshot_id="${DB_INSTANCE_ID}-$(date +%Y%m%d-%H%M%S)"
            
            aws rds create-db-snapshot \
                --db-instance-identifier "$DB_INSTANCE_ID" \
                --db-snapshot-identifier "$snapshot_id" \
                --region "$REGION"
            
            log "已创建快照: $snapshot_id"
            ;;
        scale)
            local new_class="$2"
            
            aws rds modify-db-instance \
                --db-instance-identifier "$DB_INSTANCE_ID" \
                --db-instance-class "$new_class" \
                --apply-immediately \
                --region "$REGION"
            
            log "已启动实例类型变更: $new_class"
            ;;
        failover)
            # 强制故障转移(仅Multi-AZ)
            aws rds reboot-db-instance \
                --db-instance-identifier "$DB_INSTANCE_ID" \
                --force-failover \
                --region "$REGION"
            
            log "已启动强制故障转移"
            ;;
        *)
            echo "不支持的AWS RDS操作: $operation"
            return 1
            ;;
    esac
}
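
# 示例:备份后等待快照进入available状态(示意,仅适用于AWS RDS)
# 轮询间隔与超时时间由aws CLI内置waiter的默认值决定
wait_for_snapshot() {
    local snapshot_id="$1"

    if aws rds wait db-snapshot-available \
        --db-snapshot-identifier "$snapshot_id" \
        --region "$REGION"; then
        log "快照已完成: $snapshot_id"
    else
        send_alert "WARNING" "等待快照完成失败或超时: $snapshot_id"
    fi
}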

# 阿里云RDS操作
aliyun_rds_operations() {
    local operation="$1"
    
    case "$operation" in
        status)
            aliyun rds DescribeDBInstances \
                --DBInstanceId "$DB_INSTANCE_ID" \
                --region "$REGION"
            ;;
        metrics)
            # 获取监控数据
            local end_time=$(date -u +%Y-%m-%dT%H:%M:%SZ)
            local start_time=$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)
            
            aliyun cms DescribeMetricList \
                --Namespace acs_rds_dashboard \
                --MetricName CpuUsage \
                --Dimensions "[{\"instanceId\":\"$DB_INSTANCE_ID\"}]" \
                --StartTime "$start_time" \
                --EndTime "$end_time" \
                --region "$REGION"
            ;;
        backup)
            # 创建备份
            aliyun rds CreateBackup \
                --DBInstanceId "$DB_INSTANCE_ID" \
                --BackupMethod Physical \
                --region "$REGION"
            
            log "已启动物理备份"
            ;;
        scale)
            local new_class="$2"
            
            aliyun rds ModifyDBInstanceSpec \
                --DBInstanceId "$DB_INSTANCE_ID" \
                --DBInstanceClass "$new_class" \
                --PayType Postpaid \
                --region "$REGION"
            
            log "已启动实例规格变更: $new_class"
            ;;
        *)
            echo "不支持的阿里云RDS操作: $operation"
            return 1
            ;;
    esac
}

# 腾讯云CDB操作
tencent_cdb_operations() {
    local operation="$1"
    
    case "$operation" in
        status)
            tccli cdb DescribeDBInstances \
                --InstanceIds "[\"$DB_INSTANCE_ID\"]" \
                --region "$REGION"
            ;;
        metrics)
            # 获取监控数据
            local end_time=$(date +%Y-%m-%d\ %H:%M:%S)
            local start_time=$(date -d '1 hour ago' +%Y-%m-%d\ %H:%M:%S)
            
            tccli monitor GetMonitorData \
                --Namespace QCE/CDB \
                --MetricName CpuUseRate \
                --Instances "[{\"Dimensions\":[{\"Name\":\"InstanceId\",\"Value\":\"$DB_INSTANCE_ID\"}]}]" \
                --StartTime "$start_time" \
                --EndTime "$end_time" \
                --Period 300 \
                --region "$REGION"
            ;;
        backup)
            # 创建备份
            tccli cdb CreateBackup \
                --InstanceId "$DB_INSTANCE_ID" \
                --BackupMethod physical \
                --region "$REGION"
            
            log "已启动物理备份"
            ;;
        scale)
            # 腾讯云CDB通过内存(MB)和磁盘(GB)指定目标规格,而非实例类型
            # 以下数值仅为示例,请按实际容量规划调整
            tccli cdb UpgradeDBInstance \
                --InstanceId "$DB_INSTANCE_ID" \
                --Memory 4000 \
                --Volume 200 \
                --region "$REGION"
            
            log "已启动实例升级(Memory=4000MB, Volume=200GB)"
            ;;
        *)
            echo "不支持的腾讯云CDB操作: $operation"
            return 1
            ;;
    esac
}

# 统一操作接口
cloud_db_operation() {
    local operation="$1"
    shift
    
    case "$CLOUD_PROVIDER" in
        aws)
            aws_rds_operations "$operation" "$@"
            ;;
        aliyun)
            aliyun_rds_operations "$operation" "$@"
            ;;
        tencent)
            tencent_cdb_operations "$operation" "$@"
            ;;
        *)
            error_exit "不支持的云服务提供商: $CLOUD_PROVIDER"
            ;;
    esac
}

# 健康检查
health_check() {
    log "开始健康检查"
    
    # 检查实例状态
    local status_output
    status_output=$(cloud_db_operation status 2>&1) || {
        error_exit "无法获取实例状态: $status_output"
    }
    
    # 解析状态(简化示例)
    if echo "$status_output" | grep -q "available\|running\|Running"; then
        log "实例状态正常"
    else
        send_alert "WARNING" "实例状态异常: $status_output"
    fi
    
    # 检查关键指标
    check_metrics
}

# 检查指标
check_metrics() {
    log "检查性能指标"
    
    # 获取指标数据
    local metrics_output
    metrics_output=$(cloud_db_operation metrics 2>&1) || {
        log "WARNING: 无法获取指标数据: $metrics_output"
        return 1
    }
    
    # 简化的指标检查(实际应用中需要解析JSON/XML输出)
    if echo "$metrics_output" | grep -q "error\|Error\|ERROR"; then
        send_alert "WARNING" "指标获取异常: $metrics_output"
    else
        log "指标获取正常"
    fi
}
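
# 示例:解析指标JSON输出并与阈值比较(示意,以AWS CloudWatch为例)
# 假设本机已安装jq;80%阈值仅为示例,未被主流程调用,可按需接入check_metrics
check_cpu_threshold() {
    local cpu_avg

    # 取最近10分钟内CPU平均使用率的最大值,获取失败时按0处理
    cpu_avg=$(aws cloudwatch get-metric-statistics \
        --namespace AWS/RDS \
        --metric-name CPUUtilization \
        --dimensions Name=DBInstanceIdentifier,Value="$DB_INSTANCE_ID" \
        --start-time "$(date -u -d '10 minutes ago' +%Y-%m-%dT%H:%M:%S)" \
        --end-time "$(date -u +%Y-%m-%dT%H:%M:%S)" \
        --period 300 \
        --statistics Average \
        --region "$REGION" \
        --output json 2>/dev/null \
        | jq -r '[.Datapoints[]?.Average] | max // 0') || cpu_avg=0

    # bash不支持浮点比较,用awk判断是否超过80%
    if awk -v v="$cpu_avg" 'BEGIN { exit !(v > 80) }'; then
        send_alert "WARNING" "CPU使用率过高: ${cpu_avg}%"
    else
        log "CPU使用率正常: ${cpu_avg}%"
    fi
}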

# 自动备份
auto_backup() {
    log "开始自动备份"
    
    # 检查是否到达备份时间窗口
    local current_hour=$(date +%H)
    local backup_hour="02"  # 凌晨2点备份
    # 使用当天的标记文件,避免在备份小时内被监控循环重复触发(路径可按需调整)
    local backup_marker="/tmp/cloud_db_last_backup_$(date +%Y%m%d)"
    
    if [ "$current_hour" = "$backup_hour" ] && [ ! -f "$backup_marker" ]; then
        cloud_db_operation backup || {
            error_exit "自动备份失败"
        }
        touch "$backup_marker"
        log "自动备份完成"
    else
        log "跳过备份(当前时间: ${current_hour}:xx,备份时间: ${backup_hour}:xx)"
    fi
}

# 自动扩容检查
auto_scaling_check() {
    log "检查是否需要自动扩容"
    
    # 这里应该根据实际指标数据判断
    # 简化示例:假设CPU使用率超过80%需要扩容
    
    # 获取当前实例规格(失败时跳过本次检查,避免set -e导致脚本退出)
    local current_status
    current_status=$(cloud_db_operation status 2>&1) || {
        log "WARNING: 无法获取实例规格信息,跳过扩容检查"
        return 0
    }
    
    # 简化的扩容逻辑
    if echo "$current_status" | grep -q "db.t3.micro\|mysql.n1.micro\|cdb.s1.small"; then
        log "检测到小规格实例,建议考虑升级"
        # 这里可以添加自动扩容逻辑
        # cloud_db_operation scale "db.t3.small"
    fi
}
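
# 示例:基于CPU阈值的自动扩容决策(示意)
# AUTO_SCALE_ENABLED开关与目标规格db.t3.small均为假设值,默认关闭;
# 生产环境建议结合变更窗口与人工确认后再执行扩容
auto_scale_if_needed() {
    local cpu_avg="$1"                       # 由check_cpu_threshold等指标解析逻辑传入
    local target_class="${2:-db.t3.small}"   # 目标实例规格(示例值)

    if [ "${AUTO_SCALE_ENABLED:-false}" = "true" ] && \
       awk -v v="$cpu_avg" 'BEGIN { exit !(v > 80) }'; then
        log "CPU使用率 ${cpu_avg}% 超过80%阈值,触发扩容到 $target_class"
        cloud_db_operation scale "$target_class"
    else
        log "未触发自动扩容(CPU: ${cpu_avg}%, 开关: ${AUTO_SCALE_ENABLED:-false})"
    fi
}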

# 故障转移测试
failover_test() {
    log "执行故障转移测试"
    
    if [ "$CLOUD_PROVIDER" = "aws" ]; then
        # 仅在AWS Multi-AZ环境下执行
        cloud_db_operation failover || {
            error_exit "故障转移测试失败"
        }
        log "故障转移测试完成"
    else
        log "当前云服务提供商不支持手动故障转移测试"
    fi
}

# 生成运维报告
generate_report() {
    local report_file="/tmp/cloud_db_report_$(date +%Y%m%d_%H%M%S).txt"
    
    {
        echo "云数据库运维报告 - $(date)"
        echo "======================================"
        echo
        echo "基本信息:"
        echo "  云服务提供商: $CLOUD_PROVIDER"
        echo "  区域: $REGION"
        echo "  实例ID: $DB_INSTANCE_ID"
        echo
        
        echo "实例状态:"
        cloud_db_operation status 2>&1 || echo "  状态获取失败"
        echo
        
        echo "性能指标:"
        cloud_db_operation metrics 2>&1 || echo "  指标获取失败"
        echo
        
        echo "最近日志:"
        tail -20 "$LOG_FILE" 2>/dev/null || echo "  日志文件不存在"
        
    } > "$report_file"
    
    log "运维报告已生成: $report_file"
    
    # 发送报告
    if command -v mail &> /dev/null; then
        mail -s "云数据库运维报告" "$ALERT_EMAIL" < "$report_file" 2>/dev/null || true
    fi
}

# 监控循环
monitor_loop() {
    log "开始监控循环(间隔: ${MONITOR_INTERVAL}秒)"
    
    while true; do
        health_check
        auto_backup
        auto_scaling_check
        
        sleep "$MONITOR_INTERVAL"
    done
}
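
# 可选示例:优雅退出处理——收到SIGINT/SIGTERM时记录日志后退出,
# 便于监控循环被终止时在日志中留下痕迹
trap 'log "收到退出信号,停止监控"; exit 0' SIGINT SIGTERM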

# 主函数
main() {
    case "${1:-help}" in
        monitor)
            monitor_loop
            ;;
        status)
            cloud_db_operation status
            ;;
        metrics)
            cloud_db_operation metrics
            ;;
        backup)
            cloud_db_operation backup
            ;;
        scale)
            if [ -z "${2:-}" ]; then
                echo "用法: $0 scale <new_instance_class>"
                exit 1
            fi
            cloud_db_operation scale "$2"
            ;;
        failover)
            failover_test
            ;;
        report)
            generate_report
            ;;
        health)
            health_check
            ;;
        help|*)
            echo "云数据库自动化运维脚本"
            echo
            echo "用法: $0 [命令]"
            echo
            echo "命令:"
            echo "  monitor   - 启动监控循环"
            echo "  status    - 查看实例状态"
            echo "  metrics   - 查看性能指标"
            echo "  backup    - 执行备份"
            echo "  scale     - 扩容实例"
            echo "  failover  - 故障转移测试"
            echo "  report    - 生成运维报告"
            echo "  health    - 健康检查"
            echo "  help      - 显示此帮助信息"
            echo
            echo "配置:"
            echo "  云服务提供商: $CLOUD_PROVIDER"
            echo "  区域: $REGION"
            echo "  实例ID: $DB_INSTANCE_ID"
            echo "  监控间隔: ${MONITOR_INTERVAL}秒"
            echo "  日志文件: $LOG_FILE"
            ;;
    esac
}

main "$@"

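在生产环境中,除了长期运行monitor循环,也可以通过cron定时调用上述脚本的单次命令。下面是一个crontab示例(脚本路径/opt/scripts/cloud_db_ops.sh为假设值,请按实际部署调整):

# 每5分钟执行一次健康检查,每天上午7点生成并邮件发送运维报告
*/5 * * * * /opt/scripts/cloud_db_ops.sh health >> /var/log/cloud_db_monitor.log 2>&1
0 7 * * * /opt/scripts/cloud_db_ops.sh report >> /var/log/cloud_db_monitor.log 2>&1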