11.1 Backup Strategy Overview

11.1.1 Backup Types and Characteristics

from datetime import datetime

class MySQLBackupStrategy:
    def __init__(self):
        self.backup_types = {
            'full': {
                'description': 'Full backup',
                'frequency': 'weekly',
                'retention': '4 weeks',
                'size_factor': 1.0,
                'recovery_time': 'long',
                'advantages': ['complete data set', 'standalone restore', 'simple to manage'],
                'disadvantages': ['large storage footprint', 'long backup window', 'slow network transfer']
            },
            'incremental': {
                'description': 'Incremental backup',
                'frequency': 'daily',
                'retention': '2 weeks',
                'size_factor': 0.1,
                'recovery_time': 'medium',
                'advantages': ['small storage footprint', 'fast backups', 'network friendly'],
                'disadvantages': ['depends on an unbroken chain', 'complex restore', 'harder to manage']
            },
            'differential': {
                'description': 'Differential backup',
                'frequency': 'daily',
                'retention': '1 week',
                'size_factor': 0.3,
                'recovery_time': 'medium',
                'advantages': ['simple restore', 'moderate storage use', 'reasonably fast'],
                'disadvantages': ['grows over time', 'duplicated data', 'depends on the full backup']
            },
            'logical': {
                'description': 'Logical backup',
                'frequency': 'daily',
                'retention': '1 week',
                'size_factor': 0.8,
                'recovery_time': 'long',
                'advantages': ['cross-platform', 'human-readable output', 'selective restore'],
                'disadvantages': ['slow', 'CPU intensive', 'long table-lock time']
            },
            'physical': {
                'description': 'Physical backup',
                'frequency': 'daily',
                'retention': '1 week',
                'size_factor': 1.2,
                'recovery_time': 'fast',
                'advantages': ['fast backups', 'good consistency', 'fast restore'],
                'disadvantages': ['platform dependent', 'version sensitive', 'large storage footprint']
            }
        }
        
        self.rpo_rto_matrix = {
            'critical': {'rpo': '15 minutes', 'rto': '1 hour'},
            'important': {'rpo': '1 hour', 'rto': '4 hours'},
            'normal': {'rpo': '4 hours', 'rto': '8 hours'},
            'low': {'rpo': '24 hours', 'rto': '24 hours'}
        }
    
    def analyze_backup_requirements(self, database_size_gb, daily_change_rate, 
                                    business_criticality, available_storage_gb):
        """Analyze backup requirements"""
        print("\n=== Backup Requirements Analysis ===")
        print(f"Database size: {database_size_gb} GB")
        print(f"Daily change rate: {daily_change_rate * 100:.1f}%")
        print(f"Business criticality: {business_criticality}")
        print(f"Available storage: {available_storage_gb} GB")
        print("=" * 40)
        
        # Look up the RPO/RTO requirements
        rpo_rto = self.rpo_rto_matrix.get(business_criticality, 
                                          self.rpo_rto_matrix['normal'])
        
        print("\n📊 Business requirements:")
        print(f"  RPO (Recovery Point Objective): {rpo_rto['rpo']}")
        print(f"  RTO (Recovery Time Objective): {rpo_rto['rto']}")
        
        # Calculate storage requirements
        storage_requirements = self.calculate_storage_requirements(
            database_size_gb, daily_change_rate
        )
        
        print("\n💾 Storage requirements:")
        for backup_type, requirement in storage_requirements.items():
            print(f"  {self.backup_types[backup_type]['description']}:")
            print(f"    Per backup: {requirement['per_backup']:.1f} GB")
            print(f"    Monthly storage: {requirement['monthly']:.1f} GB")
        
        # Recommend a backup strategy
        recommended_strategy = self.recommend_backup_strategy(
            database_size_gb, daily_change_rate, business_criticality, 
            available_storage_gb, storage_requirements
        )
        
        return recommended_strategy
    
    def calculate_storage_requirements(self, database_size_gb, daily_change_rate):
        """Calculate storage requirements"""
        requirements = {}
        
        for backup_type, config in self.backup_types.items():
            if backup_type == 'full':
                per_backup = database_size_gb * config['size_factor']
                monthly = per_backup * 4  # weekly backup, 4-week retention
            elif backup_type == 'incremental':
                per_backup = database_size_gb * daily_change_rate * config['size_factor']
                monthly = per_backup * 30  # daily backup, 30-day retention
            elif backup_type == 'differential':
                # Differentials grow through the week; 3.5 days is the average accumulation
                avg_size = database_size_gb * daily_change_rate * 3.5 * config['size_factor']
                per_backup = avg_size
                monthly = avg_size * 7  # daily backup, 7-day retention
            else:
                per_backup = database_size_gb * config['size_factor']
                monthly = per_backup * 7  # 7-day retention
            
            requirements[backup_type] = {
                'per_backup': per_backup,
                'monthly': monthly
            }
        
        return requirements
    
    def recommend_backup_strategy(self, database_size_gb, daily_change_rate, 
                                  business_criticality, available_storage_gb, 
                                  storage_requirements):
        """Recommend a backup strategy"""
        print("\n💡 Recommended backup strategies:")
        print("-" * 30)
        
        strategies = []
        
        # Strategy 1: full + incremental backups
        strategy1_storage = (storage_requirements['full']['monthly'] + 
                             storage_requirements['incremental']['monthly'])
        
        if strategy1_storage <= available_storage_gb:
            strategies.append({
                'name': 'Full + incremental backups',
                'schedule': {
                    'full': 'Sundays 02:00',
                    'incremental': 'Daily 02:00 (except Sunday)'
                },
                'storage_needed': strategy1_storage,
                'recovery_complexity': 'medium',
                'suitable_for': ['critical', 'important']
            })
        
        # Strategy 2: full + differential backups
        strategy2_storage = (storage_requirements['full']['monthly'] + 
                             storage_requirements['differential']['monthly'])
        
        if strategy2_storage <= available_storage_gb:
            strategies.append({
                'name': 'Full + differential backups',
                'schedule': {
                    'full': 'Sundays 02:00',
                    'differential': 'Daily 02:00 (except Sunday)'
                },
                'storage_needed': strategy2_storage,
                'recovery_complexity': 'low',
                'suitable_for': ['important', 'normal']
            })
        
        # Strategy 3: full backups only
        strategy3_storage = storage_requirements['full']['monthly']
        
        if strategy3_storage <= available_storage_gb:
            strategies.append({
                'name': 'Full backups only',
                'schedule': {
                    'full': 'Daily 02:00'
                },
                'storage_needed': strategy3_storage,
                'recovery_complexity': 'very_low',
                'suitable_for': ['normal', 'low']
            })
        
        # Pick the best strategy
        best_strategy = None
        
        for strategy in strategies:
            if business_criticality in strategy['suitable_for']:
                best_strategy = strategy
                break
        
        if not best_strategy and strategies:
            best_strategy = strategies[0]
        
        if best_strategy:
            print(f"🎯 Recommended strategy: {best_strategy['name']}")
            print(f"  Storage needed: {best_strategy['storage_needed']:.1f} GB")
            print(f"  Recovery complexity: {best_strategy['recovery_complexity']}")
            print("  Backup schedule:")
            for backup_type, schedule in best_strategy['schedule'].items():
                print(f"    {backup_type}: {schedule}")
        else:
            print("❌ Insufficient storage for any backup strategy")
            print("Suggestions:")
            print("  1. Add storage capacity")
            print("  2. Consider archiving old data")
            print("  3. Use compressed backups")
            print("  4. Shorten the backup retention period")
        
        return best_strategy
    
    def generate_backup_policy(self, strategy):
        """Generate a backup policy document"""
        if not strategy:
            return None
        
        policy = {
            'strategy_name': strategy['name'],
            'created_date': datetime.now().isoformat(),
            'backup_schedule': strategy['schedule'],
            'retention_policy': {
                'full_backup': '4 weeks',
                'incremental_backup': '2 weeks',
                'differential_backup': '1 week'
            },
            'storage_requirements': {
                'total_needed_gb': strategy['storage_needed'],
                'compression': 'enabled',
                'encryption': 'enabled'
            },
            'monitoring': {
                'backup_success_rate': '>= 99%',
                'backup_duration_alert': '> 4 hours',
                'storage_usage_alert': '> 80%'
            },
            'testing': {
                'restore_test_frequency': 'monthly',
                'disaster_recovery_drill': 'quarterly'
            }
        }
        
        return policy

# Backup strategy analysis example
strategy_analyzer = MySQLBackupStrategy()

# Analyze backup requirements for different scenarios
scenarios = [
    {
        'name': 'Large e-commerce database',
        'database_size_gb': 500,
        'daily_change_rate': 0.05,
        'business_criticality': 'critical',
        'available_storage_gb': 2000
    },
    {
        'name': 'Mid-size enterprise application',
        'database_size_gb': 100,
        'daily_change_rate': 0.02,
        'business_criticality': 'important',
        'available_storage_gb': 500
    },
    {
        'name': 'Small website database',
        'database_size_gb': 20,
        'daily_change_rate': 0.01,
        'business_criticality': 'normal',
        'available_storage_gb': 100
    }
]

for scenario in scenarios:
    print(f"\n{'='*50}")
    print(f"Scenario: {scenario['name']}")
    print(f"{'='*50}")
    
    recommended_strategy = strategy_analyzer.analyze_backup_requirements(
        scenario['database_size_gb'],
        scenario['daily_change_rate'],
        scenario['business_criticality'],
        scenario['available_storage_gb']
    )
    
    if recommended_strategy:
        policy = strategy_analyzer.generate_backup_policy(recommended_strategy)
        print("\n📋 Backup policy document generated")

11.1.2 Backup Tool Comparison

class MySQLBackupTools:
    def __init__(self):
        self.tools = {
            'mysqldump': {
                'type': 'logical',
                'pros': [
                    'Built in, nothing extra to install',
                    'Excellent cross-platform compatibility',
                    'Supports selective backups',
                    'Produces readable SQL files',
                    'Supports compression and encryption'
                ],
                'cons': [
                    'Slow on large databases',
                    'Locks tables during backup',
                    'Slow restores',
                    'Relatively high memory use'
                ],
                'best_for': ['small to mid-size databases', 'development environments', 'data migration'],
                'max_recommended_size': '100GB'
            },
            'mysqlpump': {
                'type': 'logical',
                'pros': [
                    'Parallel backup support',
                    'Better performance',
                    'Can exclude databases/tables',
                    'Built-in compression',
                    'Progress display'
                ],
                'cons': [
                    'Requires MySQL 5.7+',
                    'Relatively new feature set',
                    'Sparse documentation',
                    'Compatibility quirks'
                ],
                'best_for': ['mid-size databases', 'parallel processing', 'MySQL 5.7+ environments'],
                'max_recommended_size': '500GB'
            },
            'xtrabackup': {
                'type': 'physical',
                'pros': [
                    'Hot backup, no table locks',
                    'Fast backups',
                    'Incremental backup support',
                    'Fast restores',
                    'Supports compression and encryption'
                ],
                'cons': [
                    'Separate installation required',
                    'InnoDB only',
                    'Platform dependent',
                    'Complex configuration'
                ],
                'best_for': ['large databases', 'production environments', '24/7 services'],
                'max_recommended_size': '10TB+'
            },
            'mysql_enterprise_backup': {
                'type': 'physical',
                'pros': [
                    'Official Oracle tool',
                    'Complete feature set',
                    'Excellent performance',
                    'Enterprise-grade support',
                    'Cloud backup integration'
                ],
                'cons': [
                    'Commercial license required',
                    'High cost',
                    'Enterprise Edition only',
                    'Learning curve'
                ],
                'best_for': ['enterprise environments', 'mission-critical workloads', 'large deployments'],
                'max_recommended_size': 'unlimited'
            },
            'mydumper': {
                'type': 'logical',
                'pros': [
                    'Multi-threaded and parallel',
                    'Consistent snapshots',
                    'Per-table output files',
                    'Compression support',
                    'Free and open source'
                ],
                'cons': [
                    'Separate installation required',
                    'Complex configuration',
                    'Sparse documentation',
                    'Community-only support'
                ],
                'best_for': ['large logical backups', 'data migration', 'parallel processing'],
                'max_recommended_size': '1TB'
            }
        }
    
    def compare_tools(self, database_size_gb, backup_type_preference, 
                      budget_level, technical_expertise):
        """Compare backup tools"""
        print("\n=== Backup Tool Comparison ===")
        print(f"Database size: {database_size_gb} GB")
        print(f"Preferred backup type: {backup_type_preference}")
        print(f"Budget level: {budget_level}")
        print(f"Technical expertise: {technical_expertise}")
        print("=" * 50)
        
        suitable_tools = []
        
        for tool_name, tool_info in self.tools.items():
            score = 0
            reasons = []
            
            # Check suitability for the database size
            max_size_str = tool_info['max_recommended_size']
            if max_size_str == 'unlimited':
                size_suitable = True
                score += 3
            elif max_size_str.endswith('TB+'):
                max_size = float(max_size_str.replace('TB+', '')) * 1024
                size_suitable = database_size_gb <= max_size
                score += 3 if size_suitable else -2
            elif max_size_str.endswith('TB'):
                max_size = float(max_size_str.replace('TB', '')) * 1024
                size_suitable = database_size_gb <= max_size
                score += 2 if size_suitable else -2
            else:
                max_size = float(max_size_str.replace('GB', ''))
                size_suitable = database_size_gb <= max_size
                score += 1 if size_suitable else -3
            
            if size_suitable:
                reasons.append(f"suitable for a {database_size_gb}GB database")
            else:
                reasons.append(f"database too large (recommended < {max_size_str})")
            
            # Check backup type match
            if tool_info['type'] == backup_type_preference:
                score += 2
                reasons.append(f"supports {backup_type_preference} backups")
            
            # Check budget fit
            if 'commercial' in tool_name.lower() or 'enterprise' in tool_name.lower():
                if budget_level == 'high':
                    score += 1
                    reasons.append("enterprise-grade tool")
                else:
                    score -= 2
                    reasons.append("requires a commercial license")
            else:
                if budget_level in ['low', 'medium']:
                    score += 1
                    reasons.append("free and open source")
            
            # Check skill requirements
            if tool_name in ['mysqldump']:
                if technical_expertise in ['beginner', 'intermediate']:
                    score += 1
                    reasons.append("easy to use")
            elif tool_name in ['xtrabackup', 'mydumper']:
                if technical_expertise in ['intermediate', 'advanced']:
                    score += 1
                    reasons.append("good fit for experienced users")
                else:
                    score -= 1
                    reasons.append("requires hands-on experience")
            
            suitable_tools.append({
                'name': tool_name,
                'score': score,
                'reasons': reasons,
                'info': tool_info
            })
        
        # Sort by score
        suitable_tools.sort(key=lambda x: x['score'], reverse=True)
        
        print("\n🏆 Recommended tools, ranked:")
        print("-" * 30)
        
        for i, tool in enumerate(suitable_tools[:3], 1):
            print(f"\n{i}. {tool['name'].upper()} (score: {tool['score']})")
            print(f"   Type: {tool['info']['type']}")
            print("   Reasons:")
            for reason in tool['reasons']:
                print(f"     • {reason}")
            
            print("   Pros:")
            for pro in tool['info']['pros'][:3]:
                print(f"     ✅ {pro}")
            
            print("   Cons:")
            for con in tool['info']['cons'][:2]:
                print(f"     ❌ {con}")
        
        return suitable_tools[0] if suitable_tools else None
    
    def generate_tool_recommendation_report(self, scenarios):
        """Generate a tool recommendation report"""
        print("\n" + "="*60)
        print("MySQL Backup Tool Recommendation Report")
        print("="*60)
        
        for scenario in scenarios:
            print(f"\n📋 Scenario: {scenario['name']}")
            print("-" * 40)
            
            recommended_tool = self.compare_tools(
                scenario['database_size_gb'],
                scenario['backup_type_preference'],
                scenario['budget_level'],
                scenario['technical_expertise']
            )
            
            if recommended_tool:
                print(f"\n✅ Best choice: {recommended_tool['name']}")
                print(f"   Best for: {', '.join(recommended_tool['info']['best_for'])}")
            else:
                print("\n❌ No suitable tool found")

# Tool comparison example
tools_analyzer = MySQLBackupTools()

# Define scenarios
scenarios = [
    {
        'name': 'Startup website',
        'database_size_gb': 5,
        'backup_type_preference': 'logical',
        'budget_level': 'low',
        'technical_expertise': 'beginner'
    },
    {
        'name': 'Mid-size enterprise application',
        'database_size_gb': 200,
        'backup_type_preference': 'physical',
        'budget_level': 'medium',
        'technical_expertise': 'intermediate'
    },
    {
        'name': 'Large e-commerce platform',
        'database_size_gb': 2000,
        'backup_type_preference': 'physical',
        'budget_level': 'high',
        'technical_expertise': 'advanced'
    }
]

tools_analyzer.generate_tool_recommendation_report(scenarios)

11.2 Logical Backup in Depth

11.2.1 Using mysqldump

#!/bin/bash
# Full backup script based on mysqldump

# Configuration
MYSQL_USER="backup_user"
MYSQL_PASSWORD="backup_password"
MYSQL_HOST="localhost"
MYSQL_PORT="3306"
BACKUP_DIR="/backup/mysql"
LOG_FILE="/var/log/mysql/backup.log"
RETENTION_DAYS=7
COMPRESSION_LEVEL=6

# Create the backup and log directories
mkdir -p $BACKUP_DIR
mkdir -p $(dirname $LOG_FILE)

# Logging helper (-e so "\n" in messages renders as a newline)
log_message() {
    echo -e "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a $LOG_FILE
}

# Check the MySQL connection
check_mysql_connection() {
    log_message "Checking MySQL connection..."
    
    if mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD -e "SELECT 1" >/dev/null 2>&1; then
        log_message "✅ MySQL connection OK"
        return 0
    else
        log_message "❌ MySQL connection failed"
        return 1
    fi
}

# List user databases (system schemas excluded)
get_databases() {
    mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD \
          -e "SHOW DATABASES" | grep -v -E '^(Database|information_schema|performance_schema|mysql|sys)$'
}

# Back up a single database
backup_database() {
    local db_name=$1
    local backup_file="$BACKUP_DIR/${db_name}_$(date +%Y%m%d_%H%M%S).sql"
    local compressed_file="${backup_file}.gz"
    
    log_message "Starting backup of database: $db_name"
    
    # Get the database size
    local db_size=$(mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD \
                   -e "SELECT ROUND(SUM(data_length + index_length) / 1024 / 1024, 1) AS 'DB Size in MB' 
                       FROM information_schema.tables 
                       WHERE table_schema='$db_name'" | tail -1)
    
    log_message "Database size: ${db_size} MB"
    
    # Run the backup
    local start_time=$(date +%s)
    
    mysqldump \
        --host=$MYSQL_HOST \
        --port=$MYSQL_PORT \
        --user=$MYSQL_USER \
        --password=$MYSQL_PASSWORD \
        --single-transaction \
        --routines \
        --triggers \
        --events \
        --flush-logs \
        --master-data=2 \
        --add-drop-database \
        --add-drop-table \
        --create-options \
        --disable-keys \
        --extended-insert \
        --quick \
        --lock-tables=false \
        --databases $db_name > $backup_file
    
    local backup_result=$?
    local end_time=$(date +%s)
    local duration=$((end_time - start_time))
    
    if [ $backup_result -eq 0 ]; then
        # Compress the dump
        log_message "Compressing backup file..."
        gzip -$COMPRESSION_LEVEL $backup_file
        
        if [ $? -eq 0 ]; then
            local compressed_size=$(du -h $compressed_file | cut -f1)
            log_message "✅ Database $db_name backed up successfully"
            log_message "   Backup file: $compressed_file"
            log_message "   Compressed size: $compressed_size"
            log_message "   Duration: ${duration}s"
            
            # Verify the backup file
            validate_backup $compressed_file $db_name
        else
            log_message "❌ Compression failed: $db_name"
            rm -f $backup_file
            return 1
        fi
    else
        log_message "❌ Backup failed: $db_name"
        rm -f $backup_file
        return 1
    fi
    
    return 0
}
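
The script above calls a validate_backup function that is not defined in this excerpt. A minimal sketch of one, assuming validation means a gzip integrity test plus a check for the "-- Dump completed" marker that mysqldump writes as its last line, could look like this:

# Validate a compressed dump (minimal sketch; a full verification would
# restore the dump into a scratch instance instead)
validate_backup() {
    local compressed_file=$1
    local db_name=$2
    
    # 1. gzip integrity test
    if ! gzip -t "$compressed_file" 2>/dev/null; then
        log_message "❌ Validation failed: $compressed_file is corrupt"
        return 1
    fi
    
    # 2. mysqldump appends "-- Dump completed" on success
    if zcat "$compressed_file" | tail -1 | grep -q "Dump completed"; then
        log_message "✅ Validation passed: $db_name"
        return 0
    else
        log_message "❌ Validation failed: dump of $db_name is incomplete"
        return 1
    fi
}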

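11.3 Physical Backup with XtraBackup

11.3.1 XtraBackup Backup and Prepare Script (key functions)

The functions below are the core of an XtraBackup-based physical backup script; the full_backup, incremental_backup, and check_xtrabackup driver functions are not shown here. They reference configuration variables defined at the top of that script; a representative set (the paths and values are assumptions for illustration) is:

# Configuration referenced by the XtraBackup functions below (illustrative values)
FULL_BACKUP_DIR="$BACKUP_DIR/xtrabackup/full"
INC_BACKUP_DIR="$BACKUP_DIR/xtrabackup/incremental"
PARALLEL_THREADS=4
ENCRYPTION_KEY="/etc/mysql/backup.key"
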
# Prepare a backup for restore
prepare_backup() {
    local backup_path="$1"
    local prepare_type="$2"  # full or incremental
    
    log_message "Preparing backup: $backup_path"
    log_message "Prepare type: $prepare_type"
    
    # Build the prepare command
    local cmd="xtrabackup --prepare"
    cmd="$cmd --target-dir=$backup_path"
    
    if [ "$prepare_type" = "incremental" ]; then
        cmd="$cmd --apply-log-only"
        log_message "Applying incremental backup (redo log only)"
    else
        log_message "Preparing full backup"
    fi
    
    # If the backup is compressed, decompress it first
    if [ -f "$backup_path/xtrabackup_compressed" ]; then
        log_message "Compressed backup detected, decompressing..."
        
        local decompress_cmd="xtrabackup --decompress"
        decompress_cmd="$decompress_cmd --target-dir=$backup_path"
        decompress_cmd="$decompress_cmd --parallel=$PARALLEL_THREADS"
        
        eval $decompress_cmd
        
        if [ $? -eq 0 ]; then
            log_message "✅ Decompression finished"
            # Remove the compressed files
            find $backup_path -name "*.qp" -delete
        else
            log_message "❌ Decompression failed"
            return 1
        fi
    fi
    
    # If the backup is encrypted, decrypt it first
    if [ -f "$backup_path/xtrabackup_encrypted" ]; then
        log_message "Encrypted backup detected, decrypting..."
        
        local decrypt_cmd="xtrabackup --decrypt=AES256"
        decrypt_cmd="$decrypt_cmd --encrypt-key-file=$ENCRYPTION_KEY"
        decrypt_cmd="$decrypt_cmd --target-dir=$backup_path"
        decrypt_cmd="$decrypt_cmd --parallel=$PARALLEL_THREADS"
        
        eval $decrypt_cmd
        
        if [ $? -eq 0 ]; then
            log_message "✅ Decryption finished"
            # Remove the encrypted files
            find $backup_path -name "*.xbcrypt" -delete
        else
            log_message "❌ Decryption failed"
            return 1
        fi
    fi
    
    # Run the prepare step
    local start_time=$(date +%s)
    
    log_message "Running: $cmd"
    eval $cmd
    
    local prepare_result=$?
    local end_time=$(date +%s)
    local duration=$((end_time - start_time))
    
    if [ $prepare_result -eq 0 ]; then
        log_message "✅ Backup prepared successfully"
        log_message "   Prepare time: ${duration}s"
        return 0
    else
        log_message "❌ Backup prepare failed"
        return 1
    fi
}

# Clean up old backups
cleanup_old_backups() {
    log_message "Removing backups older than $RETENTION_DAYS days..."
    
    # Remove old full backups
    local full_deleted=$(find $FULL_BACKUP_DIR -maxdepth 1 -type d -mtime +$RETENTION_DAYS -exec rm -rf {} \; -print | wc -l)
    
    # Remove old incremental backups
    local inc_deleted=$(find $INC_BACKUP_DIR -maxdepth 1 -type d -mtime +$RETENTION_DAYS -exec rm -rf {} \; -print | wc -l)
    
    log_message "✅ Removed $full_deleted full and $inc_deleted incremental backups"
    
    # Invalidate the bookkeeping file if the recorded backup is gone
    if [ -f "$BACKUP_DIR/latest_full_backup.txt" ]; then
        local latest_full=$(cat "$BACKUP_DIR/latest_full_backup.txt")
        if [ ! -d "$latest_full" ]; then
            rm -f "$BACKUP_DIR/latest_full_backup.txt"
            log_message "⚠️  Latest full-backup record was stale and has been removed"
        fi
    fi
}

# Generate a backup report
generate_backup_report() {
    local backup_type="$1"
    local backup_status="$2"
    local backup_path="$3"
    local backup_size="$4"
    local duration="$5"
    
    log_message "\n=== XtraBackup Report ==="
    log_message "Backup time: $(date '+%Y-%m-%d %H:%M:%S')"
    log_message "Backup type: $backup_type"
    log_message "Backup status: $backup_status"
    
    if [ "$backup_status" = "success" ]; then
        log_message "Backup path: $backup_path"
        log_message "Backup size: $backup_size"
        log_message "Duration: ${duration}s"
        
        # Count the files in the backup
        local file_count=$(find $backup_path -type f | wc -l)
        log_message "File count: $file_count"
        
        # Show disk usage
        local disk_usage=$(df -h $BACKUP_DIR | tail -1 | awk '{print $5}')
        log_message "Disk usage: $disk_usage"
    fi
    
    log_message "==========================================="
}

# Main backup flow
main() {
    log_message "=== Starting XtraBackup backup ==="
    
    # Check the environment
    if ! check_xtrabackup; then
        exit 1
    fi
    
    # Choose the backup type from the first argument
    local backup_type="${1:-full}"
    
    case $backup_type in
        "full")
            log_message "Running full backup"
            if full_backup; then
                generate_backup_report "full" "success" "$(cat $BACKUP_DIR/latest_full_backup.txt)" "$(du -sh $(cat $BACKUP_DIR/latest_full_backup.txt) | cut -f1)" "N/A"
            else
                generate_backup_report "full" "failed" "N/A" "N/A" "N/A"
            fi
            ;;
        "incremental")
            log_message "Running incremental backup"
            if incremental_backup; then
                local latest_inc=$(tail -1 $BACKUP_DIR/incremental_backups.txt)
                generate_backup_report "incremental" "success" "$latest_inc" "$(du -sh $latest_inc | cut -f1)" "N/A"
            else
                generate_backup_report "incremental" "failed" "N/A" "N/A" "N/A"
            fi
            ;;
        "cleanup")
            log_message "Running backup cleanup"
            cleanup_old_backups
            ;;
        *)
            log_message "❌ Unknown backup type: $backup_type"
            log_message "Supported types: full, incremental, cleanup"
            exit 1
            ;;
    esac
    
    log_message "=== XtraBackup backup finished ==="
}

# Run
main "$@"
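
Once a full backup has been prepared, restoring it is essentially a copy-back into an empty data directory plus an ownership fix. A minimal restore sequence (the target-dir path is an assumption; point it at an actual prepared backup) might be:

# Restore a prepared backup (the datadir must be empty before --copy-back)
systemctl stop mysql
rm -rf /var/lib/mysql/*
xtrabackup --copy-back --target-dir=/backup/mysql/xtrabackup/full/20240107_020000
chown -R mysql:mysql /var/lib/mysql
systemctl start mysql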

11.5 Binary Log Backup and Management

11.5.1 Binary Log Backup Script

#!/bin/bash
# MySQL binary log backup script

# Configuration
MYSQL_HOST="localhost"
MYSQL_PORT="3306"
MYSQL_USER="root"
MYSQL_PASSWORD="your_password"
BINLOG_DIR="/var/lib/mysql"
BACKUP_DIR="/backup/mysql/binlog"
LOG_FILE="/var/log/mysql_binlog_backup.log"
RETENTION_DAYS=7

# Logging helper (-e so "\n" in messages renders as a newline)
log_message() {
    echo -e "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a $LOG_FILE
}

# Check the MySQL connection
check_mysql_connection() {
    log_message "Checking MySQL connection..."
    
    mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD -e "SELECT 1" >/dev/null 2>&1
    
    if [ $? -eq 0 ]; then
        log_message "✅ MySQL connection OK"
        return 0
    else
        log_message "❌ MySQL connection failed"
        return 1
    fi
}

# Get the current binary log file and position
get_current_binlog_info() {
    # Log to stderr so command substitution captures only the "file:position" line
    log_message "Getting current binary log info..." >&2
    
    # SHOW MASTER STATUS reports the current binlog file and write position
    local binlog_info=$(mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD \
        -e "SHOW MASTER STATUS" --skip-column-names --silent)
    
    if [ -n "$binlog_info" ]; then
        local current_file=$(echo "$binlog_info" | awk '{print $1}')
        local current_position=$(echo "$binlog_info" | awk '{print $2}')
        
        log_message "Current binary log: $current_file" >&2
        log_message "Current position: $current_position" >&2
        
        echo "$current_file:$current_position"
        return 0
    else
        log_message "❌ Could not get binary log info" >&2
        return 1
    fi
}

# List all binary logs
get_binlog_list() {
    # Log to stderr so command substitution captures only the file names
    log_message "Listing binary logs..." >&2
    
    mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD \
        -e "SHOW BINARY LOGS" --skip-column-names --silent | awk '{print $1}'
}

# Rotate to a new binary log
flush_binary_logs() {
    log_message "Flushing binary logs..."
    
    mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD \
        -e "FLUSH BINARY LOGS"
    
    if [ $? -eq 0 ]; then
        log_message "✅ Binary logs flushed"
        return 0
    else
        log_message "❌ Failed to flush binary logs"
        return 1
    fi
}

# Back up one binary log file
backup_binlog_file() {
    local binlog_file="$1"
    local backup_date="$(date +%Y%m%d)"
    local backup_subdir="$BACKUP_DIR/$backup_date"
    
    # Create the dated backup directory
    if [ ! -d "$backup_subdir" ]; then
        mkdir -p "$backup_subdir"
        log_message "Created backup directory: $backup_subdir"
    fi
    
    local source_path="$BINLOG_DIR/$binlog_file"
    local backup_path="$backup_subdir/$binlog_file"
    
    # Make sure the source file exists
    if [ ! -f "$source_path" ]; then
        log_message "⚠️  Binary log file not found: $source_path"
        return 1
    fi
    
    # Copy the file
    log_message "Backing up binary log: $binlog_file"
    
    cp "$source_path" "$backup_path"
    
    if [ $? -eq 0 ]; then
        # Verify the copy by comparing sizes
        local source_size=$(stat -c%s "$source_path")
        local backup_size=$(stat -c%s "$backup_path")
        
        if [ "$source_size" -eq "$backup_size" ]; then
            log_message "✅ $binlog_file backed up ($(du -h $backup_path | cut -f1))"
            
            # Compress the copy
            gzip "$backup_path"
            
            if [ $? -eq 0 ]; then
                log_message "✅ $binlog_file compressed"
            else
                log_message "⚠️  Failed to compress $binlog_file"
            fi
            
            return 0
        else
            log_message "❌ $binlog_file size mismatch after copy"
            rm -f "$backup_path"
            return 1
        fi
    else
        log_message "❌ Failed to back up $binlog_file"
        return 1
    fi
}

# Incremental binary log backup
incremental_binlog_backup() {
    local last_backup_info_file="$BACKUP_DIR/last_backup_info.txt"
    local current_binlog_info
    
    log_message "\n=== Incremental binary log backup ==="
    
    # Get the current binary log info
    current_binlog_info=$(get_current_binlog_info)
    if [ $? -ne 0 ]; then
        return 1
    fi
    
    # Rotate the binary log so the previous one can be copied safely
    if ! flush_binary_logs; then
        return 1
    fi
    
    # List all binary log files
    local all_binlogs=$(get_binlog_list)
    
    # Read the bookmark from the previous run
    local last_backup_file=""
    if [ -f "$last_backup_info_file" ]; then
        last_backup_file=$(cat "$last_backup_info_file" | cut -d':' -f1)
        log_message "Last file backed up previously: $last_backup_file"
    else
        log_message "First run: all binary logs will be backed up"
    fi
    
    # Work out which files need backing up
    local backup_files=()
    local start_backup=false
    
    if [ -z "$last_backup_file" ]; then
        # First run: back up everything except the active log
        start_backup=true
    fi
    
    local current_file=$(echo "$current_binlog_info" | cut -d':' -f1)
    
    for binlog in $all_binlogs; do
        # Skip the binary log that is currently being written
        if [ "$binlog" = "$current_file" ]; then
            continue
        fi
        
        if [ "$start_backup" = "true" ]; then
            backup_files+=("$binlog")
        elif [ "$binlog" = "$last_backup_file" ]; then
            start_backup=true
        elif [ "$start_backup" = "false" ] && [ "$binlog" \> "$last_backup_file" ]; then
            # The bookmarked file was purged; resume from the next newer one
            backup_files+=("$binlog")
            start_backup=true
        fi
    done
    
    # Run the backups
    local success_count=0
    local failed_count=0
    
    log_message "Files to back up: ${#backup_files[@]}"
    
    for binlog_file in "${backup_files[@]}"; do
        if backup_binlog_file "$binlog_file"; then
            ((success_count++))
        else
            ((failed_count++))
        fi
    done
    
    # Update the bookmark
    if [ ${#backup_files[@]} -gt 0 ]; then
        local last_backed_file="${backup_files[-1]}"
        echo "$last_backed_file:$(date +%s)" > "$last_backup_info_file"
        log_message "Bookmark updated: $last_backed_file"
    fi
    
    log_message "\n=== Incremental backup finished ==="
    log_message "Backed up: $success_count files"
    log_message "Failed: $failed_count files"
    
    return $failed_count
}

# Full binary log backup
full_binlog_backup() {
    log_message "\n=== Full binary log backup ==="
    
    # Rotate the binary log first
    if ! flush_binary_logs; then
        return 1
    fi
    
    # List all binary log files
    local all_binlogs=$(get_binlog_list)
    local current_binlog_info=$(get_current_binlog_info)
    local current_file=$(echo "$current_binlog_info" | cut -d':' -f1)
    
    local success_count=0
    local failed_count=0
    
    for binlog_file in $all_binlogs; do
        # Skip the binary log that is currently being written
        if [ "$binlog_file" = "$current_file" ]; then
            log_message "Skipping active log file: $binlog_file"
            continue
        fi
        
        if backup_binlog_file "$binlog_file"; then
            ((success_count++))
        else
            ((failed_count++))
        fi
    done
    
    log_message "\n=== Full backup finished ==="
    log_message "Backed up: $success_count files"
    log_message "Failed: $failed_count files"
    
    # Update the bookmark
    if [ $success_count -gt 0 ]; then
        local last_backup_info_file="$BACKUP_DIR/last_backup_info.txt"
        local last_binlog=$(echo "$all_binlogs" | tail -2 | head -1)  # second-to-last file (the last one copied)
        echo "$last_binlog:$(date +%s)" > "$last_backup_info_file"
        log_message "Bookmark updated: $last_binlog"
    fi
    
    return $failed_count
}

# Remove expired backups
cleanup_old_backups() {
    log_message "\n=== Cleaning up expired backups ==="
    log_message "Retention: $RETENTION_DAYS days"
    
    # Find dated backup directories older than the cutoff
    local cutoff_date=$(date -d "$RETENTION_DAYS days ago" +%Y%m%d)
    local deleted_count=0
    local total_size=0
    
    for backup_subdir in "$BACKUP_DIR"/[0-9]*; do
        if [ -d "$backup_subdir" ]; then
            local dir_date=$(basename "$backup_subdir")
            
            if [[ "$dir_date" =~ ^[0-9]{8}$ ]] && [ "$dir_date" -lt "$cutoff_date" ]; then
                local dir_size=$(du -sb "$backup_subdir" | cut -f1)
                total_size=$((total_size + dir_size))
                
                log_message "Deleting expired backup: $backup_subdir ($(du -sh $backup_subdir | cut -f1))"
                rm -rf "$backup_subdir"
                
                if [ $? -eq 0 ]; then
                    ((deleted_count++))
                else
                    log_message "⚠️  Failed to delete: $backup_subdir"
                fi
            fi
        fi
    done
    
    if [ $deleted_count -gt 0 ]; then
        local total_size_mb=$((total_size / 1024 / 1024))
        log_message "✅ Cleanup finished: removed $deleted_count directories, freed ${total_size_mb}MB"
    else
        log_message "✅ No expired backups to clean up"
    fi
}

# Generate a backup report
generate_binlog_backup_report() {
    local backup_type="$1"
    local status="$2"
    
    log_message "\n=== Binary Log Backup Report ==="
    log_message "Backup time: $(date '+%Y-%m-%d %H:%M:%S')"
    log_message "Backup type: $backup_type"
    log_message "Backup status: $status"
    
    # Summarize the backup directory
    if [ -d "$BACKUP_DIR" ]; then
        local total_size=$(du -sh "$BACKUP_DIR" | cut -f1)
        local file_count=$(find "$BACKUP_DIR" -name "*.gz" | wc -l)
        
        log_message "Backup directory: $BACKUP_DIR"
        log_message "Total size: $total_size"
        log_message "File count: $file_count"
        
        # Show the most recent backup directories
        log_message "\nRecent backup directories:"
        ls -la "$BACKUP_DIR" | grep "^d" | tail -5 | while read line; do
            log_message "  $line"
        done
    fi
    
    log_message "==========================================="
}

# Main backup flow
main() {
    log_message "=== Starting MySQL binary log backup ==="
    
    # Check the MySQL connection
    if ! check_mysql_connection; then
        exit 1
    fi
    
    # Create the backup directory
    if [ ! -d "$BACKUP_DIR" ]; then
        mkdir -p "$BACKUP_DIR"
        log_message "Created backup directory: $BACKUP_DIR"
    fi
    
    # Choose the backup type from the first argument
    local backup_type="${1:-incremental}"
    
    case $backup_type in
        "incremental")
            log_message "Running incremental backup"
            if incremental_binlog_backup; then
                generate_binlog_backup_report "incremental" "success"
            else
                generate_binlog_backup_report "incremental" "failed"
            fi
            ;;
        "full")
            log_message "Running full backup"
            if full_binlog_backup; then
                generate_binlog_backup_report "full" "success"
            else
                generate_binlog_backup_report "full" "failed"
            fi
            ;;
        "cleanup")
            log_message "Running cleanup"
            cleanup_old_backups
            generate_binlog_backup_report "cleanup" "done"
            ;;
        *)
            log_message "❌ Unknown backup type: $backup_type"
            log_message "Supported types: incremental, full, cleanup"
            exit 1
            ;;
    esac
    
    # Clean up expired backups (unless this run was the cleanup itself)
    if [ "$backup_type" != "cleanup" ]; then
        cleanup_old_backups
    fi
    
    log_message "=== MySQL binary log backup finished ==="
}

# Run
main "$@"
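
These archived binary logs are what make point-in-time recovery possible: restore the most recent full backup, then replay the logs up to just before the failure. A minimal sketch (the file names and stop time are assumptions) is:

# Replay archived binlogs up to a point in time, after restoring the full backup
gunzip /backup/mysql/binlog/20240107/mysql-bin.0000*.gz
mysqlbinlog --stop-datetime="2024-01-07 09:59:00" \
    /backup/mysql/binlog/20240107/mysql-bin.000010 \
    /backup/mysql/binlog/20240107/mysql-bin.000011 \
    | mysql -uroot -p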

11.5.2 Binary Log Management Tool

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL binary log management tool
Provides analysis, purging, and reporting for binary logs
"""

import os
import re
import subprocess
import mysql.connector
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import logging
import argparse

class MySQLBinlogManager:
    """MySQL binary log manager"""
    
    def __init__(self, host='localhost', port=3306, user='root', password='', 
                 binlog_dir='/var/lib/mysql', backup_dir='/backup/mysql/binlog'):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.binlog_dir = binlog_dir
        self.backup_dir = backup_dir
        
        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/mysql_binlog_manager.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        
    def get_connection(self):
        """Open a MySQL connection"""
        try:
            connection = mysql.connector.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password
            )
            return connection
        except mysql.connector.Error as e:
            self.logger.error(f"MySQL connection failed: {e}")
            return None
    
    def get_binary_logs(self) -> List[Dict]:
        """List the binary logs"""
        connection = self.get_connection()
        if not connection:
            return []
        
        try:
            cursor = connection.cursor(dictionary=True)
            cursor.execute("SHOW BINARY LOGS")
            logs = cursor.fetchall()
            
            # Attach file path, mtime, and size
            for log in logs:
                log_path = os.path.join(self.binlog_dir, log['Log_name'])
                if os.path.exists(log_path):
                    stat = os.stat(log_path)
                    log['file_path'] = log_path
                    log['modified_time'] = datetime.fromtimestamp(stat.st_mtime)
                    log['file_size_mb'] = round(stat.st_size / 1024 / 1024, 2)
                else:
                    log['file_path'] = None
                    log['modified_time'] = None
                    log['file_size_mb'] = 0
            
            return logs
            
        except mysql.connector.Error as e:
            self.logger.error(f"Failed to list binary logs: {e}")
            return []
        finally:
            connection.close()
    
    def get_master_status(self) -> Optional[Dict]:
        """Get the master status (current binlog file and position)"""
        connection = self.get_connection()
        if not connection:
            return None
        
        try:
            cursor = connection.cursor(dictionary=True)
            cursor.execute("SHOW MASTER STATUS")
            status = cursor.fetchone()
            return status
            
        except mysql.connector.Error as e:
            self.logger.error(f"Failed to get master status: {e}")
            return None
        finally:
            connection.close()
    
    def analyze_binlog_content(self, binlog_file: str, start_datetime: str = None, 
                               stop_datetime: str = None) -> Dict:
        """Analyze the contents of a binary log"""
        self.logger.info(f"Analyzing binary log: {binlog_file}")
        
        # Build the mysqlbinlog command
        cmd = ['mysqlbinlog']
        
        if start_datetime:
            cmd.extend(['--start-datetime', start_datetime])
        if stop_datetime:
            cmd.extend(['--stop-datetime', stop_datetime])
        
        # Decode row events into readable pseudo-SQL
        cmd.extend([
            '--base64-output=DECODE-ROWS',
            '--verbose',
            os.path.join(self.binlog_dir, binlog_file)
        ])
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
            
            if result.returncode != 0:
                self.logger.error(f"mysqlbinlog failed: {result.stderr}")
                return {}
            
            # Analyze the output
            content = result.stdout
            analysis = {
                'total_events': 0,
                'event_types': {},
                'databases': set(),
                'tables': set(),
                'transactions': 0,
                'ddl_statements': [],
                'large_transactions': [],
                'time_range': {'start': None, 'end': None}
            }
            
            # Parse line by line
            lines = content.split('\n')
            current_transaction_size = 0
            in_transaction = False
            
            for line in lines:
                # Count events: mysqlbinlog prints "# at <offset>" before each event
                if line.startswith('# at '):
                    analysis['total_events'] += 1
                
                # Tally event types
                if 'Query' in line and 'thread_id=' in line:
                    analysis['event_types']['Query'] = analysis['event_types'].get('Query', 0) + 1
                elif 'Table_map' in line:
                    analysis['event_types']['Table_map'] = analysis['event_types'].get('Table_map', 0) + 1
                elif 'Write_rows' in line:
                    analysis['event_types']['Write_rows'] = analysis['event_types'].get('Write_rows', 0) + 1
                elif 'Update_rows' in line:
                    analysis['event_types']['Update_rows'] = analysis['event_types'].get('Update_rows', 0) + 1
                elif 'Delete_rows' in line:
                    analysis['event_types']['Delete_rows'] = analysis['event_types'].get('Delete_rows', 0) + 1
                
                # Detect the active database
                if 'use `' in line:
                    db_match = re.search(r'use `([^`]+)`', line)
                    if db_match:
                        analysis['databases'].add(db_match.group(1))
                
                # Detect row operations on tables (--verbose pseudo-SQL comments)
                table_match = re.search(r'### (?:INSERT INTO|UPDATE|DELETE FROM) `([^`]+)`\.`([^`]+)`', line)
                if table_match:
                    db_name = table_match.group(1)
                    table_name = table_match.group(2)
                    analysis['databases'].add(db_name)
                    analysis['tables'].add(f"{db_name}.{table_name}")
                
                # Detect DDL statements
                if any(ddl in line.upper() for ddl in ['CREATE', 'ALTER', 'DROP', 'TRUNCATE']):
                    if not line.strip().startswith('#'):
                        analysis['ddl_statements'].append(line.strip())
                
                # Track transactions
                if 'BEGIN' in line:
                    in_transaction = True
                    current_transaction_size = 0
                elif 'COMMIT' in line and in_transaction:
                    analysis['transactions'] += 1
                    if current_transaction_size > 1000000:  # transactions over ~1 MB of decoded text
                        analysis['large_transactions'].append({
                            'size_bytes': current_transaction_size,
                            'size_mb': round(current_transaction_size / 1024 / 1024, 2)
                        })
                    in_transaction = False
                
                if in_transaction:
                    current_transaction_size += len(line)
                
                # Extract the time range (mysqlbinlog headers use "#yymmdd hh:mm:ss")
                time_match = re.search(r'#(\d{6}\s+\d{1,2}:\d{2}:\d{2})', line)
                if time_match:
                    time_str = time_match.group(1)
                    try:
                        event_time = datetime.strptime(time_str, '%y%m%d %H:%M:%S')
                        
                        if analysis['time_range']['start'] is None or event_time < analysis['time_range']['start']:
                            analysis['time_range']['start'] = event_time
                        if analysis['time_range']['end'] is None or event_time > analysis['time_range']['end']:
                            analysis['time_range']['end'] = event_time
                    except ValueError:
                        pass
            
            # Convert sets to lists for serialization
            analysis['databases'] = list(analysis['databases'])
            analysis['tables'] = list(analysis['tables'])
            
            return analysis
            
        except subprocess.TimeoutExpired:
            self.logger.error(f"Timed out analyzing binary log: {binlog_file}")
            return {}
        except Exception as e:
            self.logger.error(f"Failed to analyze binary log: {e}")
            return {}
    
    def purge_binary_logs(self, before_log: str = None, before_date: str = None) -> bool:
        """Purge binary logs"""
        connection = self.get_connection()
        if not connection:
            return False
        
        try:
            cursor = connection.cursor()
            
            if before_log:
                self.logger.info(f"Purging binary logs up to: {before_log}")
                cursor.execute(f"PURGE BINARY LOGS TO '{before_log}'")
            elif before_date:
                self.logger.info(f"Purging binary logs before: {before_date}")
                cursor.execute(f"PURGE BINARY LOGS BEFORE '{before_date}'")
            else:
                self.logger.error("Either before_log or before_date must be given")
                return False
            
            self.logger.info("Binary logs purged")
            return True
            
        except mysql.connector.Error as e:
            self.logger.error(f"Failed to purge binary logs: {e}")
            return False
        finally:
            connection.close()
    
    def auto_purge_old_logs(self, retention_days: int = 7) -> bool:
        """Automatically purge binary logs older than the retention window"""
        self.logger.info(f"Auto-purging binary logs older than {retention_days} days")
        
        # Compute the cutoff date
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        cutoff_date_str = cutoff_date.strftime('%Y-%m-%d %H:%M:%S')
        
        return self.purge_binary_logs(before_date=cutoff_date_str)
    
    def generate_binlog_report(self) -> Dict:
        """Generate a binary log report"""
        self.logger.info("Generating binary log report")
        
        # Gather basic information
        binary_logs = self.get_binary_logs()
        master_status = self.get_master_status()
        
        if not binary_logs:
            return {}
        
        # Summary statistics
        total_files = len(binary_logs)
        total_size_mb = sum(log.get('file_size_mb', 0) for log in binary_logs)
        total_size_gb = total_size_mb / 1024
        
        # Time range
        oldest_log = min(binary_logs, key=lambda x: x.get('modified_time', datetime.now()))
        newest_log = max(binary_logs, key=lambda x: x.get('modified_time', datetime.now()))
        
        # Large file detection
        large_files = [log for log in binary_logs if log.get('file_size_mb', 0) > 100]
        
        report = {
            'summary': {
                'total_files': total_files,
                'total_size_mb': round(total_size_mb, 2),
                'total_size_gb': round(total_size_gb, 2),
                'oldest_log': oldest_log.get('Log_name'),
                'oldest_time': oldest_log.get('modified_time'),
                'newest_log': newest_log.get('Log_name'),
                'newest_time': newest_log.get('modified_time'),
                'current_log': master_status.get('File') if master_status else None,
                'current_position': master_status.get('Position') if master_status else None
            },
            'large_files': large_files,
            'all_logs': binary_logs,
            'recommendations': []
        }
        
        # Generate recommendations
        if total_size_gb > 10:
            report['recommendations'].append("Binary logs exceed 10GB in total; consider purging old logs")
        
        if len(large_files) > 0:
            report['recommendations'].append(f"Found {len(large_files)} large files (>100MB); consider tuning max_binlog_size")
        
        if total_files > 100:
            report['recommendations'].append("Too many binary log files; increase purge frequency")
        
        # Check the time span
        if oldest_log.get('modified_time') and newest_log.get('modified_time'):
            time_span = newest_log['modified_time'] - oldest_log['modified_time']
            if time_span.days > 30:
                report['recommendations'].append(f"Binary logs span {time_span.days} days; configure automatic purging")
        
        return report
    
    def print_report(self, report: Dict):
        """Print the report"""
        if not report:
            print("Could not generate a report")
            return
        
        summary = report['summary']
        
        print("\n" + "=" * 60)
        print("           MySQL Binary Log Report")
        print("=" * 60)
        
        print("\n📊 Summary:")
        print(f"   File count: {summary['total_files']}")
        print(f"   Total size: {summary['total_size_mb']} MB ({summary['total_size_gb']} GB)")
        print(f"   Oldest log: {summary['oldest_log']} ({summary['oldest_time']})")
        print(f"   Newest log: {summary['newest_log']} ({summary['newest_time']})")
        print(f"   Current log: {summary['current_log']} (position: {summary['current_position']})")
        
        if report['large_files']:
            print("\n📁 Large files (>100MB):")
            for log in report['large_files']:
                print(f"   {log['Log_name']}: {log['file_size_mb']} MB")
        
        if report['recommendations']:
            print("\n💡 Recommendations:")
            for i, rec in enumerate(report['recommendations'], 1):
                print(f"   {i}. {rec}")
        
        print("\n" + "=" * 60)

def main():
    parser = argparse.ArgumentParser(description='MySQL binary log management tool')
    parser.add_argument('--host', default='localhost', help='MySQL host')
    parser.add_argument('--port', type=int, default=3306, help='MySQL port')
    parser.add_argument('--user', default='root', help='MySQL user')
    parser.add_argument('--password', default='', help='MySQL password')
    parser.add_argument('--binlog-dir', default='/var/lib/mysql', help='Binary log directory')
    parser.add_argument('--backup-dir', default='/backup/mysql/binlog', help='Backup directory')
    
    subparsers = parser.add_subparsers(dest='command', help='Available commands')
    
    # report command
    subparsers.add_parser('report', help='Generate a binary log report')
    
    # analyze command
    analyze_parser = subparsers.add_parser('analyze', help='Analyze a binary log')
    analyze_parser.add_argument('binlog_file', help='Binary log file name to analyze')
    analyze_parser.add_argument('--start-datetime', help='Start time (YYYY-MM-DD HH:MM:SS)')
    analyze_parser.add_argument('--stop-datetime', help='Stop time (YYYY-MM-DD HH:MM:SS)')
    
    # purge command
    purge_parser = subparsers.add_parser('purge', help='Purge binary logs')
    purge_parser.add_argument('--before-log', help='Purge up to the given log file')
    purge_parser.add_argument('--before-date', help='Purge before the given date (YYYY-MM-DD HH:MM:SS)')
    purge_parser.add_argument('--retention-days', type=int, help='Retention in days (cutoff computed automatically)')
    
    args = parser.parse_args()
    
    if not args.command:
        parser.print_help()
        return
    
    # Create the manager
    manager = MySQLBinlogManager(
        host=args.host,
        port=args.port,
        user=args.user,
        password=args.password,
        binlog_dir=args.binlog_dir,
        backup_dir=args.backup_dir
    )
    
    # Dispatch the command
    if args.command == 'report':
        report = manager.generate_binlog_report()
        manager.print_report(report)
    
    elif args.command == 'analyze':
        analysis = manager.analyze_binlog_content(
            args.binlog_file,
            args.start_datetime,
            args.stop_datetime
        )
        
        if analysis:
            print(f"\nAnalysis of {args.binlog_file}")
            print(f"Total events: {analysis['total_events']}")
            print(f"Event types: {analysis['event_types']}")
            print(f"Databases touched: {analysis['databases']}")
            print(f"Tables touched: {analysis['tables']}")
            print(f"Transactions: {analysis['transactions']}")
            if analysis['large_transactions']:
                print(f"Large transactions: {len(analysis['large_transactions'])}")
            if analysis['ddl_statements']:
                print(f"DDL statements: {len(analysis['ddl_statements'])}")
    
    elif args.command == 'purge':
        if args.retention_days:
            success = manager.auto_purge_old_logs(args.retention_days)
        elif args.before_log:
            success = manager.purge_binary_logs(before_log=args.before_log)
        elif args.before_date:
            success = manager.purge_binary_logs(before_date=args.before_date)
        else:
            print("Error: one of --retention-days, --before-log or --before-date is required")
            return
        
        if success:
            print("Binary logs purged successfully")
        else:
            print("Failed to purge binary logs")

if __name__ == '__main__':
    main()
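
Typical invocations of this tool (the script file name is an assumption) look like:

# Generate a binlog overview report
python3 mysql_binlog_manager.py --user root --password secret report

# Analyze one binlog file within a time window
python3 mysql_binlog_manager.py analyze mysql-bin.000042 \
    --start-datetime "2024-01-07 00:00:00" --stop-datetime "2024-01-07 06:00:00"

# Purge everything older than 7 days
python3 mysql_binlog_manager.py purge --retention-days 7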

11.6 Backup Strategy Implementation and Automation

11.6.1 Backup Strategy Implementation Script

#!/bin/bash
# MySQL backup strategy implementation script
# Chooses a backup plan automatically based on database size and business needs

# Configuration
MYSQL_HOST="localhost"
MYSQL_PORT="3306"
MYSQL_USER="root"
MYSQL_PASSWORD="your_password"
BACKUP_BASE_DIR="/backup/mysql"
LOG_FILE="/var/log/mysql_backup_strategy.log"
CONFIG_FILE="/etc/mysql/backup_strategy.conf"

# Defaults
DEFAULT_FULL_BACKUP_SCHEDULE="0 2 * * 0"    # Sundays at 02:00
DEFAULT_INCREMENTAL_SCHEDULE="0 2 * * 1-6"  # Monday-Saturday at 02:00
DEFAULT_BINLOG_SCHEDULE="0 */6 * * *"       # every 6 hours
DEFAULT_RETENTION_DAYS=30

# Logging helper (-e so "\n" in messages renders as a newline)
log_message() {
    echo -e "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a $LOG_FILE
}

# Load the configuration file
load_config() {
    if [ -f "$CONFIG_FILE" ]; then
        log_message "Loading config file: $CONFIG_FILE"
        source "$CONFIG_FILE"
    else
        log_message "Config file not found, creating defaults"
        create_default_config
        source "$CONFIG_FILE"
    fi
}

# Create the default configuration file
create_default_config() {
    log_message "Creating default config file: $CONFIG_FILE"
    
    mkdir -p "$(dirname $CONFIG_FILE)"
    
    cat > "$CONFIG_FILE" << EOF
# MySQL backup strategy configuration

# Backup schedules (cron syntax)
FULL_BACKUP_SCHEDULE="$DEFAULT_FULL_BACKUP_SCHEDULE"
INCREMENTAL_SCHEDULE="$DEFAULT_INCREMENTAL_SCHEDULE"
BINLOG_SCHEDULE="$DEFAULT_BINLOG_SCHEDULE"

# Retention policy
RETENTION_DAYS=$DEFAULT_RETENTION_DAYS
FULL_BACKUP_RETENTION=4   # keep 4 full backups
INCREMENTAL_RETENTION=14  # keep 14 days of incrementals

# Size thresholds for choosing a backup method (GB)
SMALL_DB_THRESHOLD=1
MEDIUM_DB_THRESHOLD=10
LARGE_DB_THRESHOLD=100

# Compression and encryption
ENABLE_COMPRESSION=true
ENABLE_ENCRYPTION=false
ENCRYPTION_KEY="/etc/mysql/backup.key"

# Remote storage
ENABLE_REMOTE_BACKUP=false
REMOTE_TYPE="s3"  # s3, ftp, rsync
REMOTE_CONFIG="/etc/mysql/remote_backup.conf"

# Notifications
ENABLE_NOTIFICATIONS=true
NOTIFY_EMAIL="admin@example.com"
NOTIFY_ON_SUCCESS=false
NOTIFY_ON_FAILURE=true
EOF

    log_message "✅ Default config file created"
}

# Check the MySQL connection
check_mysql_connection() {
    mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD -e "SELECT 1" >/dev/null 2>&1
    return $?
}

# Get per-database sizes
get_database_sizes() {
    # Log to stderr so callers capture only the data rows
    log_message "Getting database sizes..." >&2
    
    mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD \
        -e "SELECT 
                table_schema AS 'Database',
                ROUND(SUM(data_length + index_length) / 1024 / 1024 / 1024, 2) AS 'Size_GB'
            FROM information_schema.tables 
            WHERE table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys')
            GROUP BY table_schema
            ORDER BY Size_GB DESC" \
        --skip-column-names --silent
}

# Analyze databases and recommend a backup strategy
analyze_and_recommend_strategy() {
    log_message "\n=== Analyzing databases and recommending a strategy ==="
    
    local total_size=0
    local db_count=0
    local large_dbs=()
    local medium_dbs=()
    local small_dbs=()
    
    # Collect database sizes
    while IFS=$'\t' read -r db_name size_gb; do
        if [ -n "$db_name" ] && [ -n "$size_gb" ]; then
            db_count=$((db_count + 1))
            total_size=$(echo "$total_size + $size_gb" | bc -l)
            
            log_message "Database: $db_name, size: ${size_gb}GB"
            
            # Classify by size
            if (( $(echo "$size_gb >= $LARGE_DB_THRESHOLD" | bc -l) )); then
                large_dbs+=("$db_name:$size_gb")
            elif (( $(echo "$size_gb >= $MEDIUM_DB_THRESHOLD" | bc -l) )); then
                medium_dbs+=("$db_name:$size_gb")
            else
                small_dbs+=("$db_name:$size_gb")
            fi
        fi
    done < <(get_database_sizes)
    
    log_message "\n📊 Database statistics:"
    log_message "   Databases: $db_count"
    log_message "   Total size: ${total_size}GB"
    log_message "   Large (>= ${LARGE_DB_THRESHOLD}GB): ${#large_dbs[@]}"
    log_message "   Medium (${MEDIUM_DB_THRESHOLD}-${LARGE_DB_THRESHOLD}GB): ${#medium_dbs[@]}"
    log_message "   Small (< ${MEDIUM_DB_THRESHOLD}GB): ${#small_dbs[@]}"
    
    # Recommend strategies
    log_message "\n💡 Recommended backup strategies:"
    
    if [ ${#large_dbs[@]} -gt 0 ]; then
        log_message "   Large databases:"
        log_message "     - XtraBackup physical backups (full + incremental)"
        log_message "     - Weekly full backup, daily incrementals"
        log_message "     - Enable compression and streaming backups"
        log_message "     - Consider parallel backups for speed"
    fi
    
    if [ ${#medium_dbs[@]} -gt 0 ]; then
        log_message "   Medium databases:"
        log_message "     - XtraBackup or mysqlpump"
        log_message "     - Daily full backups, or weekly full + daily incrementals"
        log_message "     - Enable compression"
    fi
    
    if [ ${#small_dbs[@]} -gt 0 ]; then
        log_message "   Small databases:"
        log_message "     - mysqldump logical backups"
        log_message "     - Daily full backups"
        log_message "     - More frequent backups are feasible"
    fi
    
    # Generate the concrete backup plan
    generate_backup_plan "$total_size" "${large_dbs[@]}" "${medium_dbs[@]}" "${small_dbs[@]}"
}

# 生成备份计划
generate_backup_plan() {
    local total_size="$1"
    shift
    local all_dbs=("$@")
    
    log_message "\n📋 生成备份计划..."
    
    local plan_file="$BACKUP_BASE_DIR/backup_plan.json"
    mkdir -p "$(dirname $plan_file)"
    
    cat > "$plan_file" << EOF
{
  "generated_at": "$(date -Iseconds)",
  "total_size_gb": $total_size,
  "backup_strategies": {
EOF

    # 为每个数据库生成策略
    local first=true
    for db_info in "${all_dbs[@]}"; do
        if [ -n "$db_info" ]; then
            local db_name=$(echo "$db_info" | cut -d':' -f1)
            local size_gb=$(echo "$db_info" | cut -d':' -f2)
            
            if [ "$first" = "false" ]; then
                echo "," >> "$plan_file"
            fi
            first=false
            
            # 根据大小选择策略
            if (( $(echo "$size_gb >= $LARGE_DB_THRESHOLD" | bc -l) )); then
                cat >> "$plan_file" << EOF
    "$db_name": {
      "size_gb": $size_gb,
      "category": "large",
      "backup_method": "xtrabackup",
      "backup_type": "incremental",
      "full_backup_schedule": "$FULL_BACKUP_SCHEDULE",
      "incremental_schedule": "$INCREMENTAL_SCHEDULE",
      "compression": true,
      "encryption": $ENABLE_ENCRYPTION,
      "parallel_threads": 4,
      "retention_days": $RETENTION_DAYS
    }
EOF
            elif (( $(echo "$size_gb >= $MEDIUM_DB_THRESHOLD" | bc -l) )); then
                cat >> "$plan_file" << EOF
    "$db_name": {
      "size_gb": $size_gb,
      "category": "medium",
      "backup_method": "xtrabackup",
      "backup_type": "full",
      "backup_schedule": "0 2 * * *",
      "compression": true,
      "encryption": false,
      "parallel_threads": 2,
      "retention_days": $RETENTION_DAYS
    }
EOF
            else
                cat >> "$plan_file" << EOF
    "$db_name": {
      "size_gb": $size_gb,
      "category": "small",
      "backup_method": "mysqldump",
      "backup_type": "full",
      "backup_schedule": "0 1 * * *",
      "compression": true,
      "encryption": false,
      "single_transaction": true,
      "retention_days": $RETENTION_DAYS
    }
EOF
            fi
        fi
    done
    
    cat >> "$plan_file" << EOF

  },
  "binlog_backup": {
    "enabled": true,
    "schedule": "$BINLOG_SCHEDULE",
    "retention_days": 7
  },
  "remote_backup": {
    "enabled": $ENABLE_REMOTE_BACKUP,
    "type": "$REMOTE_TYPE",
    "schedule": "0 4 * * *"
  },
  "monitoring": {
    "enabled": true,
    "check_interval": "0 */2 * * *",
    "alert_thresholds": {
      "backup_age_hours": 26,
      "backup_size_change_percent": 50,
      "backup_failure_count": 2
    }
  }
}
EOF

    log_message "✅ 备份计划已生成: $plan_file"
}
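
# 提示: 计划文件为标准JSON,生成后可用 jq 快速检查(假设系统已安装 jq):
#   jq '.backup_strategies | keys' "$BACKUP_BASE_DIR/backup_plan.json"
#   jq '.backup_strategies[] | {category, backup_method}' "$BACKUP_BASE_DIR/backup_plan.json"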

# 安装crontab任务
install_cron_jobs() {
    log_message "\n=== 安装定时备份任务 ==="
    
    local cron_file="/tmp/mysql_backup_cron"
    
    # 生成crontab条目
    cat > "$cron_file" << EOF
# MySQL备份定时任务
# 由备份策略脚本自动生成于 $(date)

# 全量备份(每周日凌晨2点)
$FULL_BACKUP_SCHEDULE /usr/local/bin/mysql_backup.sh full >> $LOG_FILE 2>&1

# 增量备份(周一到周六凌晨2点)
$INCREMENTAL_SCHEDULE /usr/local/bin/mysql_backup.sh incremental >> $LOG_FILE 2>&1

# 二进制日志备份(每6小时)
$BINLOG_SCHEDULE /usr/local/bin/mysql_binlog_backup.sh incremental >> $LOG_FILE 2>&1

# 备份监控检查(每2小时)
0 */2 * * * /usr/local/bin/mysql_backup_monitor.sh >> $LOG_FILE 2>&1

# 清理过期备份(每天凌晨4点)
0 4 * * * /usr/local/bin/mysql_backup_cleanup.sh >> $LOG_FILE 2>&1
EOF

    # 安装到crontab
    if crontab -l >/dev/null 2>&1; then
        # 备份现有crontab
        crontab -l > "/tmp/crontab_backup_$(date +%Y%m%d_%H%M%S)"
        log_message "已备份现有crontab"
        
        # 移除旧的MySQL备份任务
        crontab -l | grep -v "mysql_backup" | grep -v "mysql_binlog_backup" | grep -v "mysql_backup_monitor" | grep -v "mysql_backup_cleanup" > "/tmp/crontab_clean"
        
        # 添加新任务
        cat "/tmp/crontab_clean" "$cron_file" | crontab -
    else
        # 直接安装新的crontab
        crontab "$cron_file"
    fi
    
    if [ $? -eq 0 ]; then
        log_message "✅ 定时备份任务安装成功"
        log_message "当前crontab任务:"
        crontab -l | grep -E "mysql_backup|mysql_binlog" | while read line; do
            log_message "  $line"
        done
    else
        log_message "❌ 定时备份任务安装失败"
        return 1
    fi
    
    # 清理临时文件
    rm -f "$cron_file" "/tmp/crontab_clean"
}

# 创建备份脚本
create_backup_scripts() {
    log_message "\n=== 创建备份脚本 ==="
    
    local script_dir="/usr/local/bin"
    
    # 确保脚本目录存在
    if [ ! -d "$script_dir" ]; then
        log_message "创建脚本目录: $script_dir"
        mkdir -p "$script_dir"
    fi
    
    # 创建主备份脚本
    local main_script="$script_dir/mysql_backup.sh"
    cat > "$main_script" << 'EOF'
#!/bin/bash
# MySQL主备份脚本
# 根据备份计划执行相应的备份操作

BACKUP_TYPE="${1:-full}"
CONFIG_FILE="/etc/mysql/backup_strategy.conf"
PLAN_FILE="/backup/mysql/backup_plan.json"

# 加载配置
if [ -f "$CONFIG_FILE" ]; then
    source "$CONFIG_FILE"
fi

# 根据备份类型执行相应操作
case $BACKUP_TYPE in
    "full")
        echo "执行全量备份..."
        # 这里调用具体的全量备份脚本
        /usr/local/bin/xtrabackup_full.sh
        ;;
    "incremental")
        echo "执行增量备份..."
        # 这里调用具体的增量备份脚本
        /usr/local/bin/xtrabackup_incremental.sh
        ;;
    "logical")
        echo "执行逻辑备份..."
        # 这里调用mysqldump备份脚本
        /usr/local/bin/mysqldump_backup.sh
        ;;
    *)
        echo "未知的备份类型: $BACKUP_TYPE"
        exit 1
        ;;
esac
EOF

    chmod +x "$main_script"
    log_message "✅ 创建主备份脚本: $main_script"
    
    # 创建监控脚本
    local monitor_script="$script_dir/mysql_backup_monitor.sh"
    cat > "$monitor_script" << 'EOF'
#!/bin/bash
# MySQL备份监控脚本
# 检查备份状态并发送告警

BACKUP_DIR="/backup/mysql"
LOG_FILE="/var/log/mysql_backup_monitor.log"
MAX_BACKUP_AGE_HOURS=26

# 检查最近的备份
check_recent_backups() {
    # 按修改时间取最新的备份文件(注意 -o 条件需要用 \( \) 分组)
    local latest_backup=$(find "$BACKUP_DIR" -type f \( -name "*.sql.gz" -o -name "*.xbstream" \) -printf '%T@ %p\n' | sort -rn | head -1 | cut -d' ' -f2-)
    
    if [ -z "$latest_backup" ]; then
        echo "警告: 未找到任何备份文件" | tee -a "$LOG_FILE"
        return 1
    fi
    
    local backup_age=$(( ($(date +%s) - $(stat -c %Y "$latest_backup")) / 3600 ))
    
    if [ $backup_age -gt $MAX_BACKUP_AGE_HOURS ]; then
        echo "警告: 最新备份已超过${backup_age}小时" | tee -a "$LOG_FILE"
        return 1
    fi
    
    echo "备份检查正常,最新备份: $latest_backup (${backup_age}小时前)" | tee -a "$LOG_FILE"
    return 0
}

check_recent_backups
EOF

    chmod +x "$monitor_script"
    log_message "✅ 创建监控脚本: $monitor_script"
    
    # 创建清理脚本
    local cleanup_script="$script_dir/mysql_backup_cleanup.sh"
    cat > "$cleanup_script" << 'EOF'
#!/bin/bash
# MySQL备份清理脚本
# 清理过期的备份文件

BACKUP_DIR="/backup/mysql"
RETENTION_DAYS=30
LOG_FILE="/var/log/mysql_backup_cleanup.log"

# 清理过期备份
echo "[$(date)] 开始清理过期备份..." >> "$LOG_FILE"

find "$BACKUP_DIR" -type f \( -name "*.sql.gz" -o -name "*.xbstream" -o -name "*.tar.gz" \) -mtime +$RETENTION_DAYS -delete

echo "[$(date)] 备份清理完成" >> "$LOG_FILE"
EOF

    chmod +x "$cleanup_script"
    log_message "✅ 创建清理脚本: $cleanup_script"
}

# 验证备份策略
validate_backup_strategy() {
    log_message "\n=== 验证备份策略 ==="
    
    # 检查必要的工具
    local tools=("mysql" "mysqldump" "xtrabackup")
    local missing_tools=()
    
    for tool in "${tools[@]}"; do
        if ! command -v "$tool" >/dev/null 2>&1; then
            missing_tools+=("$tool")
        fi
    done
    
    if [ ${#missing_tools[@]} -gt 0 ]; then
        log_message "⚠️  缺少必要工具: ${missing_tools[*]}"
        log_message "请安装缺少的工具后重新运行"
        return 1
    fi
    
    # 检查备份目录
    if [ ! -d "$BACKUP_BASE_DIR" ]; then
        log_message "创建备份目录: $BACKUP_BASE_DIR"
        mkdir -p "$BACKUP_BASE_DIR"
    fi
    
    # 检查磁盘空间
    local available_space=$(df "$BACKUP_BASE_DIR" | awk 'NR==2 {print $4}')
    local available_gb=$((available_space / 1024 / 1024))
    
    log_message "备份目录可用空间: ${available_gb}GB"
    
    if [ $available_gb -lt 10 ]; then
        log_message "⚠️  备份目录可用空间不足,建议至少保留10GB空间"
    fi
    
    # 测试MySQL连接
    if check_mysql_connection; then
        log_message "✅ MySQL连接正常"
    else
        log_message "❌ MySQL连接失败,请检查连接参数"
        return 1
    fi
    
    log_message "✅ 备份策略验证通过"
    return 0
}

# 生成备份策略报告
generate_strategy_report() {
    log_message "\n=== 生成备份策略报告 ==="
    
    local report_file="$BACKUP_BASE_DIR/backup_strategy_report.html"
    
    cat > "$report_file" << EOF
<!DOCTYPE html>
<html>
<head>
    <title>MySQL备份策略报告</title>
    <meta charset="UTF-8">
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .header { background-color: #f0f0f0; padding: 10px; border-radius: 5px; }
        .section { margin: 20px 0; }
        .database { background-color: #f9f9f9; padding: 10px; margin: 10px 0; border-radius: 3px; }
        .large { border-left: 5px solid #ff6b6b; }
        .medium { border-left: 5px solid #ffa500; }
        .small { border-left: 5px solid #4ecdc4; }
        table { border-collapse: collapse; width: 100%; }
        th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
        th { background-color: #f2f2f2; }
    </style>
</head>
<body>
    <div class="header">
        <h1>MySQL备份策略报告</h1>
        <p>生成时间: $(date)</p>
        <p>服务器: $MYSQL_HOST:$MYSQL_PORT</p>
    </div>
EOF

    # 添加数据库统计
    echo "    <div class='section'>" >> "$report_file"
    echo "        <h2>数据库统计</h2>" >> "$report_file"
    echo "        <table>" >> "$report_file"
    echo "            <tr><th>数据库</th><th>大小(GB)</th><th>分类</th><th>推荐备份方法</th></tr>" >> "$report_file"
    
    while IFS=$'\t' read -r db_name size_gb; do
        if [ -n "$db_name" ] && [ -n "$size_gb" ]; then
            local category="small"
            local method="mysqldump"
            
            if (( $(echo "$size_gb >= $LARGE_DB_THRESHOLD" | bc -l) )); then
                category="large"
                method="XtraBackup (增量)"
            elif (( $(echo "$size_gb >= $MEDIUM_DB_THRESHOLD" | bc -l) )); then
                category="medium"
                method="XtraBackup (全量)"
            fi
            
            echo "            <tr class='$category'><td>$db_name</td><td>$size_gb</td><td>$category</td><td>$method</td></tr>" >> "$report_file"
        fi
    done < <(get_database_sizes)
    
    echo "        </table>" >> "$report_file"
    echo "    </div>" >> "$report_file"
    
    # 添加备份计划
    if [ -f "$BACKUP_BASE_DIR/backup_plan.json" ]; then
        echo "    <div class='section'>" >> "$report_file"
        echo "        <h2>备份计划</h2>" >> "$report_file"
        echo "        <pre>$(cat $BACKUP_BASE_DIR/backup_plan.json)</pre>" >> "$report_file"
        echo "    </div>" >> "$report_file"
    fi
    
    echo "</body></html>" >> "$report_file"
    
    log_message "✅ 备份策略报告已生成: $report_file"
}

# 主函数
main() {
    log_message "=== MySQL备份策略实施开始 ==="
    
    # 检查参数
    local action="${1:-analyze}"
    
    case $action in
        "analyze")
            log_message "执行数据库分析和策略推荐"
            
            # 加载配置
            load_config
            
            # 验证环境
            if ! validate_backup_strategy; then
                log_message "❌ 环境验证失败,退出"
                exit 1
            fi
            
            # 分析数据库
            analyze_and_recommend_strategy
            
            # 生成报告
            generate_strategy_report
            ;;
        "install")
            log_message "安装备份策略和定时任务"
            
            # 加载配置
            load_config
            
            # 验证环境
            if ! validate_backup_strategy; then
                log_message "❌ 环境验证失败,退出"
                exit 1
            fi
            
            # 创建备份脚本
            create_backup_scripts
            
            # 安装定时任务
            install_cron_jobs
            
            log_message "✅ 备份策略安装完成"
            ;;
        "validate")
            log_message "验证备份策略配置"
            
            load_config
            validate_backup_strategy
            ;;
        "report")
            log_message "生成备份策略报告"
            
            load_config
            generate_strategy_report
            ;;
        *)
            log_message "❌ 未知操作: $action"
            log_message "支持的操作: analyze, install, validate, report"
            exit 1
            ;;
    esac
    
    log_message "=== MySQL备份策略实施完成 ==="
}

# 执行主函数
main "$@"

11.6.2 备份自动化监控

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL备份自动化监控系统
监控备份任务执行状态,发送告警和生成报告
"""

import os
import json
import smtplib
import subprocess
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, List, Optional
import logging
import argparse
import mysql.connector
from pathlib import Path

class MySQLBackupMonitor:
    """MySQL备份监控器"""
    
    def __init__(self, config_file='/etc/mysql/backup_monitor.conf'):
        self.config_file = config_file
        self.config = self.load_config()
        
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.config.get('log_file', '/var/log/mysql_backup_monitor.log')),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        
    def load_config(self) -> Dict:
        """加载配置文件"""
        default_config = {
            'mysql': {
                'host': 'localhost',
                'port': 3306,
                'user': 'root',
                'password': ''
            },
            'backup': {
                'base_dir': '/backup/mysql',
                'max_age_hours': 26,
                'size_change_threshold': 0.5,
                'failure_threshold': 2
            },
            'notification': {
                'enabled': True,
                'smtp_server': 'localhost',
                'smtp_port': 587,
                'smtp_user': '',
                'smtp_password': '',
                'from_email': 'backup@example.com',
                'to_emails': ['admin@example.com'],
                'notify_on_success': False,
                'notify_on_failure': True
            },
            'monitoring': {
                'check_interval': 3600,  # 1小时
                'report_interval': 86400,  # 24小时
                'retention_days': 30
            },
            'log_file': '/var/log/mysql_backup_monitor.log'
        }
        
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r') as f:
                    user_config = json.load(f)
                    # 递归更新配置
                    self._update_config(default_config, user_config)
            except Exception as e:
                print(f"加载配置文件失败: {e},使用默认配置")
        
        return default_config
    
    def _update_config(self, default: Dict, user: Dict):
        """递归更新配置"""
        for key, value in user.items():
            if key in default and isinstance(default[key], dict) and isinstance(value, dict):
                self._update_config(default[key], value)
            else:
                default[key] = value
    
    def get_mysql_connection(self):
        """获取MySQL连接"""
        try:
            mysql_config = self.config['mysql']
            connection = mysql.connector.connect(
                host=mysql_config['host'],
                port=mysql_config['port'],
                user=mysql_config['user'],
                password=mysql_config['password']
            )
            return connection
        except mysql.connector.Error as e:
            self.logger.error(f"MySQL连接失败: {e}")
            return None
    
    def check_backup_files(self) -> Dict:
        """检查备份文件状态"""
        backup_dir = Path(self.config['backup']['base_dir'])
        max_age_hours = self.config['backup']['max_age_hours']
        
        if not backup_dir.exists():
            return {
                'status': 'error',
                'message': f'备份目录不存在: {backup_dir}',
                'files': []
            }
        
        # 查找备份文件
        backup_files = []
        patterns = ['*.sql.gz', '*.xbstream', '*.tar.gz']
        
        for pattern in patterns:
            backup_files.extend(backup_dir.rglob(pattern))
        
        if not backup_files:
            return {
                'status': 'error',
                'message': '未找到任何备份文件',
                'files': []
            }
        
        # 分析备份文件
        file_info = []
        now = datetime.now()
        
        for file_path in backup_files:
            stat = file_path.stat()
            modified_time = datetime.fromtimestamp(stat.st_mtime)
            age_hours = (now - modified_time).total_seconds() / 3600
            
            file_info.append({
                'path': str(file_path),
                'size_mb': round(stat.st_size / 1024 / 1024, 2),
                'modified_time': modified_time.isoformat(),
                'age_hours': round(age_hours, 2),
                'is_recent': age_hours <= max_age_hours
            })
        
        # 按修改时间排序
        file_info.sort(key=lambda x: x['modified_time'], reverse=True)
        
        # 检查最新备份
        latest_file = file_info[0] if file_info else None
        
        if latest_file and latest_file['age_hours'] <= max_age_hours:
            status = 'ok'
            message = f"最新备份正常,文件: {Path(latest_file['path']).name}"
        else:
            status = 'warning'
            age = latest_file['age_hours'] if latest_file else 'N/A'
            message = f"最新备份过期,年龄: {age}小时"
        
        return {
            'status': status,
            'message': message,
            'files': file_info,
            'latest_backup': latest_file,
            'total_files': len(file_info),
            'total_size_mb': sum(f['size_mb'] for f in file_info)
        }
    
    def check_backup_processes(self) -> Dict:
        """检查备份进程状态"""
        try:
            # 检查是否有备份进程在运行
            result = subprocess.run(
                ['pgrep', '-f', 'mysql.*backup'],
                capture_output=True,
                text=True
            )
            
            running_processes = []
            if result.returncode == 0 and result.stdout.strip():
                # 排除监控进程自身(pgrep -f 也会匹配到本脚本的命令行)
                pids = [p for p in result.stdout.strip().split('\n')
                        if p and int(p) != os.getpid()]
                
                for pid in pids:
                    if pid:
                        # 获取进程详细信息
                        ps_result = subprocess.run(
                            ['ps', '-p', pid, '-o', 'pid,ppid,cmd,etime'],
                            capture_output=True,
                            text=True
                        )
                        
                        if ps_result.returncode == 0:
                            lines = ps_result.stdout.strip().split('\n')
                            if len(lines) > 1:
                                running_processes.append(lines[1])
            
            return {
                'status': 'ok' if running_processes else 'info',
                'message': f"发现{len(running_processes)}个备份进程" if running_processes else "无备份进程运行",
                'processes': running_processes
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'message': f"检查备份进程失败: {e}",
                'processes': []
            }
    
    def check_mysql_status(self) -> Dict:
        """检查MySQL状态"""
        connection = self.get_mysql_connection()
        if not connection:
            return {
                'status': 'error',
                'message': 'MySQL连接失败',
                'details': {}
            }
        
        try:
            cursor = connection.cursor(dictionary=True)
            
            # 检查MySQL状态
            status_info = {}
            
            # 获取服务器状态
            cursor.execute("SHOW STATUS LIKE 'Uptime'")
            uptime_result = cursor.fetchone()
            if uptime_result:
                uptime_seconds = int(uptime_result['Value'])
                uptime_hours = uptime_seconds / 3600
                status_info['uptime_hours'] = round(uptime_hours, 2)
            
            # 检查二进制日志状态(新版MySQL中该语句已更名为 SHOW BINARY LOG STATUS)
            cursor.execute("SHOW MASTER STATUS")
            master_status = cursor.fetchone()
            if master_status:
                status_info['current_binlog'] = master_status['File']
                status_info['binlog_position'] = master_status['Position']
            
            # 检查二进制日志大小
            cursor.execute("SHOW BINARY LOGS")
            binlogs = cursor.fetchall()
            total_binlog_size = sum(log['File_size'] for log in binlogs)
            status_info['binlog_count'] = len(binlogs)
            status_info['total_binlog_size_mb'] = round(total_binlog_size / 1024 / 1024, 2)
            
            return {
                'status': 'ok',
                'message': 'MySQL状态正常',
                'details': status_info
            }
            
        except mysql.connector.Error as e:
            return {
                'status': 'error',
                'message': f'MySQL状态检查失败: {e}',
                'details': {}
            }
        finally:
            connection.close()
    
    def check_disk_space(self) -> Dict:
        """检查磁盘空间"""
        backup_dir = self.config['backup']['base_dir']
        
        try:
            # 获取磁盘使用情况
            result = subprocess.run(
                ['df', '-h', backup_dir],
                capture_output=True,
                text=True
            )
            
            if result.returncode == 0:
                lines = result.stdout.strip().split('\n')
                if len(lines) > 1:
                    fields = lines[1].split()
                    if len(fields) >= 5:
                        total = fields[1]
                        used = fields[2]
                        available = fields[3]
                        use_percent = fields[4].rstrip('%')
                        
                        # 检查使用率
                        use_percent_int = int(use_percent)
                        
                        if use_percent_int > 90:
                            status = 'error'
                            message = f"磁盘空间严重不足: {use_percent}%"
                        elif use_percent_int > 80:
                            status = 'warning'
                            message = f"磁盘空间不足: {use_percent}%"
                        else:
                            status = 'ok'
                            message = f"磁盘空间正常: {use_percent}%"
                        
                        return {
                            'status': status,
                            'message': message,
                            'details': {
                                'total': total,
                                'used': used,
                                'available': available,
                                'use_percent': use_percent_int
                            }
                        }
            
            return {
                'status': 'error',
                'message': '无法获取磁盘空间信息',
                'details': {}
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'message': f'磁盘空间检查失败: {e}',
                'details': {}
            }
    
    def send_notification(self, subject: str, message: str, is_error: bool = False):
        """发送通知邮件"""
        notification_config = self.config['notification']
        
        if not notification_config['enabled']:
            return
        
        # 根据类型决定是否发送
        if is_error and not notification_config['notify_on_failure']:
            return
        if not is_error and not notification_config['notify_on_success']:
            return
        
        try:
            # 创建邮件
            msg = MIMEMultipart()
            msg['From'] = notification_config['from_email']
            msg['To'] = ', '.join(notification_config['to_emails'])
            msg['Subject'] = f"[MySQL备份监控] {subject}"
            
            # 添加邮件内容
            body = f"""
时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
服务器: {self.config['mysql']['host']}:{self.config['mysql']['port']}

{message}

---
此邮件由MySQL备份监控系统自动发送
            """
            
            msg.attach(MIMEText(body, 'plain', 'utf-8'))
            
            # 发送邮件
            server = smtplib.SMTP(notification_config['smtp_server'], notification_config['smtp_port'])
            
            if notification_config['smtp_user']:
                server.starttls()
                server.login(notification_config['smtp_user'], notification_config['smtp_password'])
            
            server.send_message(msg)
            server.quit()
            
            self.logger.info(f"通知邮件已发送: {subject}")
            
        except Exception as e:
            self.logger.error(f"发送通知邮件失败: {e}")
    
    def run_health_check(self) -> Dict:
        """运行健康检查"""
        self.logger.info("开始备份健康检查")
        
        checks = {
            'backup_files': self.check_backup_files(),
            'backup_processes': self.check_backup_processes(),
            'mysql_status': self.check_mysql_status(),
            'disk_space': self.check_disk_space()
        }
        
        # 汇总状态
        overall_status = 'ok'
        error_messages = []
        warning_messages = []
        
        for check_name, result in checks.items():
            if result['status'] == 'error':
                overall_status = 'error'
                error_messages.append(f"{check_name}: {result['message']}")
            elif result['status'] == 'warning':
                if overall_status != 'error':
                    overall_status = 'warning'
                warning_messages.append(f"{check_name}: {result['message']}")
        
        # 生成报告
        report = {
            'timestamp': datetime.now().isoformat(),
            'overall_status': overall_status,
            'checks': checks,
            'summary': {
                'errors': error_messages,
                'warnings': warning_messages
            }
        }
        
        # 发送通知
        if overall_status == 'error':
            subject = "备份监控告警 - 发现错误"
            message = "\n".join(["发现以下错误:"] + error_messages)
            if warning_messages:
                message += "\n\n警告信息:\n" + "\n".join(warning_messages)
            self.send_notification(subject, message, is_error=True)
        elif overall_status == 'warning':
            subject = "备份监控警告"
            message = "\n".join(["发现以下警告:"] + warning_messages)
            self.send_notification(subject, message, is_error=False)
        
        self.logger.info(f"健康检查完成,状态: {overall_status}")
        return report
    
    def generate_daily_report(self) -> Dict:
        """生成每日报告"""
        self.logger.info("生成每日备份报告")
        
        # 运行健康检查
        health_report = self.run_health_check()
        
        # 获取备份统计
        backup_files = health_report['checks']['backup_files']
        
        # 计算24小时内的备份
        recent_backups = []
        if backup_files['status'] != 'error':
            cutoff_time = datetime.now() - timedelta(hours=24)
            for file_info in backup_files['files']:
                file_time = datetime.fromisoformat(file_info['modified_time'])
                if file_time >= cutoff_time:
                    recent_backups.append(file_info)
        
        report = {
            'date': datetime.now().strftime('%Y-%m-%d'),
            'health_status': health_report['overall_status'],
            'backup_summary': {
                'total_backups': backup_files.get('total_files', 0),
                'recent_backups_24h': len(recent_backups),
                'total_size_mb': backup_files.get('total_size_mb', 0),
                'latest_backup': backup_files.get('latest_backup')
            },
            'mysql_status': health_report['checks']['mysql_status'],
            'disk_usage': health_report['checks']['disk_space'],
            'issues': {
                'errors': health_report['summary']['errors'],
                'warnings': health_report['summary']['warnings']
            }
        }
        
        # 保存报告
        report_file = f"{self.config['backup']['base_dir']}/reports/daily_report_{datetime.now().strftime('%Y%m%d')}.json"
        os.makedirs(os.path.dirname(report_file), exist_ok=True)
        
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        
        self.logger.info(f"每日报告已保存: {report_file}")
        
        # 发送每日报告邮件
        if self.config['notification']['enabled']:
            subject = f"MySQL备份每日报告 - {report['date']}"
            message = f"""
备份状态: {report['health_status']}
总备份数: {report['backup_summary']['total_backups']}
24小时内备份: {report['backup_summary']['recent_backups_24h']}
总大小: {report['backup_summary']['total_size_mb']} MB

磁盘使用率: {report['disk_usage']['details'].get('use_percent', 'N/A')}%

问题汇总:
错误: {len(report['issues']['errors'])}
警告: {len(report['issues']['warnings'])}
            """
            
            if report['issues']['errors']:
                message += "\n\n错误详情:\n" + "\n".join(report['issues']['errors'])
            
            if report['issues']['warnings']:
                message += "\n\n警告详情:\n" + "\n".join(report['issues']['warnings'])
            
            self.send_notification(subject, message, is_error=False)
        
        return report

def main():
    parser = argparse.ArgumentParser(description='MySQL备份监控系统')
    parser.add_argument('--config', default='/etc/mysql/backup_monitor.conf', help='配置文件路径')
    parser.add_argument('--action', choices=['check', 'report', 'daemon'], default='check', help='执行动作')
    parser.add_argument('--daemon-interval', type=int, default=3600, help='守护进程检查间隔(秒)')
    
    args = parser.parse_args()
    
    # 创建监控器
    monitor = MySQLBackupMonitor(args.config)
    
    if args.action == 'check':
        # 执行健康检查
        report = monitor.run_health_check()
        print(json.dumps(report, indent=2, ensure_ascii=False))
    
    elif args.action == 'report':
        # 生成每日报告
        report = monitor.generate_daily_report()
        print(f"每日报告已生成,状态: {report['health_status']}")
    
    elif args.action == 'daemon':
        # 守护进程模式
        import time
        
        monitor.logger.info(f"启动守护进程模式,检查间隔: {args.daemon_interval}秒")
        
        last_daily_report = datetime.now().date()
        
        while True:
            try:
                # 执行健康检查
                monitor.run_health_check()
                
                # 检查是否需要生成每日报告
                current_date = datetime.now().date()
                if current_date > last_daily_report:
                    monitor.generate_daily_report()
                    last_daily_report = current_date
                
                # 等待下次检查
                time.sleep(args.daemon_interval)
                
            except KeyboardInterrupt:
                monitor.logger.info("收到中断信号,退出守护进程")
                break
            except Exception as e:
                monitor.logger.error(f"守护进程执行错误: {e}")
                time.sleep(60)  # 出错后等待1分钟再继续

if __name__ == '__main__':
    main()
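
监控系统读取JSON格式的配置文件,结构与 load_config 中的默认配置一致,未提供的字段沿用默认值。下面是一个最小化的配置与启动示例(脚本文件名 mysql_backup_monitor.py 为假设):

cat > /etc/mysql/backup_monitor.conf << 'EOF'
{
  "mysql": {"host": "localhost", "port": 3306, "user": "monitor", "password": "change_me"},
  "backup": {"base_dir": "/backup/mysql", "max_age_hours": 26},
  "notification": {"enabled": true, "to_emails": ["admin@example.com"]}
}
EOF

# 单次健康检查,输出JSON报告
python3 mysql_backup_monitor.py --action check

# 守护进程模式,每30分钟检查一次,跨天时自动生成每日报告
python3 mysql_backup_monitor.py --action daemon --daemon-interval 1800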

11.7 总结

11.7.1 本章核心要点

本章详细介绍了MySQL备份与恢复的完整体系,涵盖了以下核心内容:

1. 备份策略设计

  • 备份类型选择:根据数据库大小、业务需求和技术水平选择合适的备份方法
  • RPO/RTO规划:明确恢复点目标和恢复时间目标,制定相应的备份频率
  • 存储策略:合理规划备份存储空间,实现本地和远程备份的结合
  • 自动化实施:通过脚本和工具实现备份策略的自动化执行

2. 备份工具掌握

  • mysqldump:适用于小到中型数据库的逻辑备份,支持跨版本恢复
  • mysqlpump:MySQL 5.7+的并行逻辑备份工具,提升备份效率
  • XtraBackup:企业级物理备份工具,支持热备份和增量备份
  • 工具选择:基于数据库规模、性能要求和预算进行合理选择

3. 逻辑备份实践

  • 完整备份流程:从连接检查到备份验证的完整自动化流程
  • 高级技巧:并行备份、选择性备份、结构与数据分离
  • 性能优化:通过参数调优和并行处理提升备份效率
  • 脚本化管理:实现备份任务的自动化和标准化

4. 物理备份实践

  • XtraBackup配置:掌握全量备份、增量备份的配置和执行
  • 高级特性:流式备份、压缩加密、并行处理
  • 备份管理:备份验证、清理和监控的自动化实现
  • 恢复流程:全量恢复和增量恢复的标准化操作

5. 备份恢复流程

  • 逻辑恢复:单库恢复、批量恢复和时间点恢复
  • 物理恢复:XtraBackup的prepare和restore流程
  • 恢复验证:确保恢复数据的完整性和一致性
  • 应急处理:快速恢复和故障切换的应急预案

6. 备份验证与演练

  • 自动化验证:备份文件完整性和可恢复性的自动检查
  • 定期演练:模拟故障场景,验证恢复流程的有效性
  • 监控告警:实时监控备份状态,及时发现和处理问题
  • 报告生成:定期生成备份状态报告,为决策提供依据

7. 云端备份集成

  • 云存储集成:与AWS S3等云存储服务的集成
  • 混合备份策略:本地备份与云端备份的结合
  • 成本优化:通过存储类别和生命周期管理优化成本
  • 安全考虑:传输加密和存储加密的实现

8. 二进制日志管理

  • 增量备份:基于二进制日志的增量备份策略
  • 日志分析:二进制日志的解析和分析工具
  • 时间点恢复:精确到秒级的时间点恢复实现
  • 日志清理:自动化的日志清理和归档策略

9. 监控与自动化

  • 健康检查:备份文件、进程、MySQL状态的全面检查
  • 告警机制:多渠道告警通知,确保问题及时响应
  • 自动化部署:备份策略的自动化分析、部署和维护
  • 报告系统:详细的备份报告和统计分析

11.7.2 实施建议

1. 分阶段实施

第一阶段:基础备份
- 建立基本的mysqldump备份
- 实现备份文件的本地存储
- 配置基础的监控和告警

第二阶段:策略优化
- 根据数据库规模选择合适工具
- 实现增量备份和压缩
- 建立备份验证机制

第三阶段:高级特性
- 部署XtraBackup物理备份
- 实现云端备份集成
- 建立完整的监控体系

第四阶段:自动化完善
- 实现备份策略自动化
- 建立应急恢复预案
- 定期进行恢复演练

2. 最佳实践

  • 3-2-1原则:3份备份,2种介质,1份异地
  • 定期测试:至少每月进行一次恢复测试(示例见本列表后的校验脚本)
  • 文档维护:保持备份和恢复文档的及时更新
  • 权限管理:严格控制备份文件的访问权限
  • 加密保护:对敏感数据备份进行加密处理
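
针对上面"定期测试"一条,下面给出一个最小化的备份可用性校验草稿(路径与检查规则均为假设,仅作示意):

#!/bin/bash
# 校验最新的压缩逻辑备份: gzip完整性 + 基本内容检查
BACKUP_DIR="/backup/mysql/logical"

# 按修改时间取最新的 .sql.gz 备份
latest=$(find "$BACKUP_DIR" -name "*.sql.gz" -type f -printf '%T@ %p\n' | sort -rn | head -1 | cut -d' ' -f2-)
[ -z "$latest" ] && { echo "未找到备份文件"; exit 1; }

# 1. gzip完整性校验
gunzip -t "$latest" || { echo "备份文件损坏: $latest"; exit 1; }

# 2. 粗略内容检查: 文件头部应包含建表或插入语句
zcat "$latest" | head -100 | grep -q "CREATE TABLE\|INSERT INTO" \
    || { echo "备份内容可疑: $latest"; exit 1; }

echo "备份校验通过: $latest"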

3. 常见问题处理

  • 备份失败:检查磁盘空间、权限和网络连接
  • 恢复缓慢:优化I/O性能,使用并行恢复
  • 数据不一致:确保备份时的事务一致性
  • 空间不足:实施备份压缩和清理策略

11.7.3 下一步学习方向

完成MySQL备份与恢复的学习后,建议继续深入以下领域:

  1. MySQL性能优化

    • 查询优化和索引设计
    • 服务器参数调优
    • 硬件配置优化
  2. MySQL高可用架构

    • 主从复制配置
    • MySQL Cluster部署
    • 故障切换机制
  3. MySQL监控与运维

    • 性能监控体系
    • 自动化运维工具
    • 容量规划方法
  4. MySQL安全加固

    • 深度安全配置
    • 审计日志分析
    • 合规性管理

11.7.4 章节总结

MySQL备份与恢复是数据库管理的核心技能,直接关系到数据的安全性和业务的连续性。通过本章的学习,您应该能够:

  • 设计适合业务需求的备份策略
  • 熟练使用各种备份工具和技术
  • 建立完整的备份监控和告警体系
  • 在故障发生时快速有效地进行数据恢复
  • 通过自动化手段提升备份管理效率

记住,备份不是目的,能够成功恢复才是关键。定期的恢复演练和备份验证是确保备份策略有效性的重要手段。


下一章预告:第12章将介绍《MySQL性能优化与监控》,包括查询优化、索引设计、参数调优、性能监控等内容,帮助您构建高性能的MySQL数据库系统。

11.3.2 XtraBackup高级特性

import subprocess
import os
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

class XtraBackupManager:
    def __init__(self, config):
        self.config = config
        self.backup_history = []
        
    def create_backup_schedule(self):
        """创建备份计划"""
        schedule = {
            'full_backup': {
                'frequency': 'weekly',
                'day': 'sunday',
                'time': '02:00',
                'retention': '4 weeks'
            },
            'incremental_backup': {
                'frequency': 'daily',
                'time': '02:00',
                'retention': '2 weeks',
                'exclude_days': ['sunday']  # 全备份日不做增量
            },
            'cleanup': {
                'frequency': 'daily',
                'time': '06:00'
            }
        }
        
        return schedule
    
    def parallel_database_backup(self, databases, max_workers=3):
        """并行数据库备份"""
        print(f"\n=== 并行数据库备份 ===")
        print(f"数据库数量: {len(databases)}")
        print(f"并行度: {max_workers}")
        
        results = []
        
        # 使用线程池并行备份全部数据库,最多 max_workers 个并发
        # (CPython 中 list.append 是原子操作,多线程写入 results 是安全的)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self._backup_database_worker, db, i, results)
                for i, db in enumerate(databases)
            ]
            
            # 等待所有备份任务完成
            for future in futures:
                future.result()
        
        return results
    
    def _backup_database_worker(self, database, worker_id, results):
        """数据库备份工作线程"""
        start_time = datetime.now()
        backup_dir = f"{self.config['backup_dir']}/{database}_{start_time.strftime('%Y%m%d_%H%M%S')}"
        
        print(f"Worker {worker_id}: 开始备份数据库 {database}")
        
        cmd = [
            'xtrabackup',
            '--backup',
            f"--host={self.config['host']}",
            f"--port={self.config['port']}",
            f"--user={self.config['user']}",
            f"--password={self.config['password']}",
            f"--databases={database}",
            f"--target-dir={backup_dir}",
            f"--parallel={self.config.get('parallel_threads', 4)}"
        ]
        
        # 添加压缩和加密选项
        if self.config.get('compression', False):
            cmd.extend(['--compress', f"--compress-threads={self.config.get('parallel_threads', 4)}"])
        
        if self.config.get('encryption', False) and self.config.get('encryption_key'):
            cmd.extend([
                '--encrypt=AES256',
                f"--encrypt-key-file={self.config['encryption_key']}",
                f"--encrypt-threads={self.config.get('parallel_threads', 4)}"
            ])
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=7200)
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            
            if result.returncode == 0:
                backup_size = self._get_directory_size(backup_dir)
                print(f"Worker {worker_id}: ✅ {database} 备份成功 ({backup_size:.1f} MB, {duration:.1f}s)")
                
                results.append({
                    'database': database,
                    'status': 'success',
                    'backup_dir': backup_dir,
                    'size_mb': backup_size,
                    'duration': duration,
                    'worker_id': worker_id
                })
            else:
                print(f"Worker {worker_id}: ❌ {database} 备份失败: {result.stderr}")
                # 清理失败的备份目录
                if os.path.exists(backup_dir):
                    subprocess.run(['rm', '-rf', backup_dir])
                
                results.append({
                    'database': database,
                    'status': 'failed',
                    'error': result.stderr,
                    'duration': duration,
                    'worker_id': worker_id
                })
        
        except subprocess.TimeoutExpired:
            print(f"Worker {worker_id}: ⏰ {database} 备份超时")
            if os.path.exists(backup_dir):
                subprocess.run(['rm', '-rf', backup_dir])
            
            results.append({
                'database': database,
                'status': 'timeout',
                'duration': 7200,
                'worker_id': worker_id
            })
        
        except Exception as e:
            print(f"Worker {worker_id}: 💥 {database} 备份异常: {e}")
            results.append({
                'database': database,
                'status': 'error',
                'error': str(e),
                'worker_id': worker_id
            })
    
    def _get_directory_size(self, directory):
        """获取目录大小(MB)"""
        try:
            result = subprocess.run(['du', '-sm', directory], capture_output=True, text=True)
            if result.returncode == 0:
                return float(result.stdout.split()[0])
        except Exception:
            pass
        return 0.0
    
    def streaming_backup(self, target_host, target_path):
        """流式备份到远程主机"""
        print(f"\n=== 流式备份到远程主机 ===")
        print(f"目标主机: {target_host}")
        print(f"目标路径: {target_path}")
        
        backup_name = f"stream_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xbstream"
        
        # 构建流式备份命令
        backup_cmd = [
            'xtrabackup',
            '--backup',
            f"--host={self.config['host']}",
            f"--port={self.config['port']}",
            f"--user={self.config['user']}",
            f"--password={self.config['password']}",
            '--stream=xbstream',
            f"--parallel={self.config.get('parallel_threads', 4)}"
        ]
        
        # 添加压缩
        if self.config.get('compression', False):
            backup_cmd.extend(['--compress', f"--compress-threads={self.config.get('parallel_threads', 4)}"])
        
        # 构建传输命令
        transfer_cmd = [
            'ssh',
            target_host,
            f"cat > {target_path}/{backup_name}"
        ]
        
        try:
            print(f"执行流式备份: {' '.join(backup_cmd)} | {' '.join(transfer_cmd)}")
            
            # 创建管道
            backup_process = subprocess.Popen(backup_cmd, stdout=subprocess.PIPE)
            transfer_process = subprocess.Popen(transfer_cmd, stdin=backup_process.stdout, 
                                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            
            backup_process.stdout.close()
            
            # 等待完成
            transfer_output, transfer_error = transfer_process.communicate()
            backup_process.wait()
            
            if backup_process.returncode == 0 and transfer_process.returncode == 0:
                print(f"✅ 流式备份成功")
                print(f"   远程文件: {target_host}:{target_path}/{backup_name}")
                
                return {
                    'status': 'success',
                    'remote_file': f"{target_host}:{target_path}/{backup_name}"
                }
            else:
                print(f"❌ 流式备份失败")
                if backup_process.returncode != 0:
                    print(f"   备份进程错误: {backup_process.returncode}")
                if transfer_process.returncode != 0:
                    print(f"   传输进程错误: {transfer_error.decode()}")
                
                return {
                    'status': 'failed',
                    'backup_error': backup_process.returncode,
                    'transfer_error': transfer_error.decode()
                }
        
        except Exception as e:
            print(f"💥 流式备份异常: {e}")
            return {
                'status': 'error',
                'error': str(e)
            }
    
    def partial_backup(self, include_tables=None, exclude_tables=None):
        """部分备份"""
        print(f"\n=== 部分备份 ===")
        
        backup_dir = f"{self.config['backup_dir']}/partial_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        
        cmd = [
            'xtrabackup',
            '--backup',
            f"--host={self.config['host']}",
            f"--port={self.config['port']}",
            f"--user={self.config['user']}",
            f"--password={self.config['password']}",
            f"--target-dir={backup_dir}",
            f"--parallel={self.config.get('parallel_threads', 4)}"
        ]
        
        # 添加包含表
        # 注意: xtrabackup 的 --tables/--tables-exclude 正则匹配的是"库名.表名",
        # 因此传入的表名应使用带库名的完整形式,如 "ecommerce.users"
        if include_tables:
            tables_regex = '|'.join([f"^{table}$" for table in include_tables])
            cmd.extend(['--tables', tables_regex])
            print(f"包含表: {include_tables}")
        
        # 添加排除表
        if exclude_tables:
            exclude_regex = '|'.join([f"^{table}$" for table in exclude_tables])
            cmd.extend(['--tables-exclude', exclude_regex])
            print(f"排除表: {exclude_tables}")
        
        try:
            start_time = datetime.now()
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=3600)
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            
            if result.returncode == 0:
                backup_size = self._get_directory_size(backup_dir)
                print(f"✅ 部分备份成功")
                print(f"   备份目录: {backup_dir}")
                print(f"   备份大小: {backup_size:.1f} MB")
                print(f"   备份耗时: {duration:.1f}秒")
                
                return {
                    'status': 'success',
                    'backup_dir': backup_dir,
                    'size_mb': backup_size,
                    'duration': duration
                }
            else:
                print(f"❌ 部分备份失败: {result.stderr}")
                if os.path.exists(backup_dir):
                    subprocess.run(['rm', '-rf', backup_dir])
                
                return {
                    'status': 'failed',
                    'error': result.stderr
                }
        
        except subprocess.TimeoutExpired:
            print(f"⏰ 部分备份超时")
            if os.path.exists(backup_dir):
                subprocess.run(['rm', '-rf', backup_dir])
            
            return {
                'status': 'timeout'
            }
        
        except Exception as e:
            print(f"💥 部分备份异常: {e}")
            return {
                'status': 'error',
                'error': str(e)
            }
    
    def backup_with_monitoring(self, backup_type='full'):
        """带监控的备份"""
        print(f"\n=== 带监控的{backup_type}备份 ===")
        
        # 启动监控线程
        monitoring_data = {'running': True, 'metrics': []}
        monitor_thread = threading.Thread(
            target=self._monitor_backup_process,
            args=(monitoring_data,)
        )
        monitor_thread.start()
        
        try:
            # 执行备份
            if backup_type == 'full':
                result = self._execute_full_backup()
            elif backup_type == 'incremental':
                result = self._execute_incremental_backup()
            else:
                result = {'status': 'error', 'error': f'Unknown backup type: {backup_type}'}
            
            # 停止监控
            monitoring_data['running'] = False
            monitor_thread.join()
            
            # 生成监控报告
            self._generate_monitoring_report(monitoring_data['metrics'], result)
            
            return result
        
        except Exception as e:
            monitoring_data['running'] = False
            monitor_thread.join()
            print(f"💥 备份异常: {e}")
            return {'status': 'error', 'error': str(e)}
    
    def _monitor_backup_process(self, monitoring_data):
        """监控备份进程"""
        while monitoring_data['running']:
            try:
                # 获取系统指标
                timestamp = datetime.now()
                
                # CPU使用率
                cpu_result = subprocess.run(['top', '-bn1'], capture_output=True, text=True)
                cpu_usage = 0.0
                for line in cpu_result.stdout.split('\n'):
                    if 'xtrabackup' in line:
                        parts = line.split()
                        if len(parts) > 8:
                            cpu_usage = float(parts[8])
                        break
                
                # 内存使用
                mem_result = subprocess.run(['free', '-m'], capture_output=True, text=True)
                mem_lines = mem_result.stdout.split('\n')
                if len(mem_lines) > 1:
                    mem_parts = mem_lines[1].split()
                    if len(mem_parts) > 2:
                        mem_used = int(mem_parts[2])
                        mem_total = int(mem_parts[1])
                        mem_usage = (mem_used / mem_total) * 100
                    else:
                        mem_usage = 0.0
                else:
                    mem_usage = 0.0
                
                # 磁盘I/O
                io_result = subprocess.run(['iostat', '-x', '1', '1'], capture_output=True, text=True)
                io_usage = 0.0
                for line in io_result.stdout.split('\n'):
                    if 'sda' in line or 'nvme' in line:
                        parts = line.split()
                        if len(parts) > 9:
                            io_usage = float(parts[9])
                        break
                
                monitoring_data['metrics'].append({
                    'timestamp': timestamp,
                    'cpu_usage': cpu_usage,
                    'memory_usage': mem_usage,
                    'io_usage': io_usage
                })
                
                time.sleep(5)
            
            except Exception as e:
                print(f"监控异常: {e}")
                time.sleep(5)
    
    def _generate_monitoring_report(self, metrics, backup_result):
        """生成监控报告"""
        if not metrics:
            return
        
        print(f"\n📊 备份监控报告:")
        print("-" * 30)
        
        # 计算平均值
        avg_cpu = sum(m['cpu_usage'] for m in metrics) / len(metrics)
        avg_memory = sum(m['memory_usage'] for m in metrics) / len(metrics)
        avg_io = sum(m['io_usage'] for m in metrics) / len(metrics)
        
        # 计算峰值
        max_cpu = max(m['cpu_usage'] for m in metrics)
        max_memory = max(m['memory_usage'] for m in metrics)
        max_io = max(m['io_usage'] for m in metrics)
        
        print(f"监控时长: {len(metrics) * 5}秒")
        print(f"CPU使用率: 平均 {avg_cpu:.1f}%, 峰值 {max_cpu:.1f}%")
        print(f"内存使用率: 平均 {avg_memory:.1f}%, 峰值 {max_memory:.1f}%")
        print(f"磁盘I/O: 平均 {avg_io:.1f}%, 峰值 {max_io:.1f}%")
        
        if backup_result['status'] == 'success':
            print(f"✅ 备份成功完成")
        else:
            print(f"❌ 备份失败: {backup_result.get('error', 'Unknown error')}")
    
    def _execute_full_backup(self):
        """执行全量备份"""
        # 这里应该调用实际的全量备份逻辑
        # 为了示例,我们模拟一个备份过程
        time.sleep(10)  # 模拟备份时间
        return {'status': 'success', 'backup_dir': '/backup/full_backup'}
    
    def _execute_incremental_backup(self):
        """执行增量备份"""
        # 这里应该调用实际的增量备份逻辑
        time.sleep(5)  # 模拟备份时间
        return {'status': 'success', 'backup_dir': '/backup/incremental_backup'}

# XtraBackup高级特性使用示例
config = {
    'host': 'localhost',
    'port': '3306',
    'user': 'backup_user',
    'password': 'backup_password',
    'backup_dir': '/backup/mysql/xtrabackup',
    'parallel_threads': 4,
    'compression': True,
    'encryption': True,
    'encryption_key': '/etc/mysql/backup.key'
}

xtrabackup_manager = XtraBackupManager(config)

# 示例1: 并行数据库备份
databases = ['ecommerce', 'analytics', 'logs', 'cache']
parallel_results = xtrabackup_manager.parallel_database_backup(databases, max_workers=2)

print("\n📊 并行备份结果:")
for result in parallel_results:
    status_icon = "✅" if result['status'] == 'success' else "❌"
    print(f"  {status_icon} {result['database']} - {result['status']}")

# 示例2: 流式备份
streaming_result = xtrabackup_manager.streaming_backup('backup-server.example.com', '/backup/mysql')
print(f"\n📊 流式备份结果: {streaming_result['status']}")

# 示例3: 部分备份
partial_result = xtrabackup_manager.partial_backup(
    include_tables=['ecommerce.users', 'ecommerce.orders', 'ecommerce.products'],
    exclude_tables=['ecommerce.logs', 'ecommerce.temp_data']
)
print(f"\n📊 部分备份结果: {partial_result['status']}")

# 示例4: 带监控的备份
monitored_result = xtrabackup_manager.backup_with_monitoring('full')
print(f"\n📊 监控备份结果: {monitored_result['status']}")

11.4 备份恢复流程

11.4.1 逻辑备份恢复

#!/bin/bash
# MySQL逻辑备份恢复脚本

# 配置参数
MYSQL_HOST="localhost"
MYSQL_PORT="3306"
MYSQL_USER="root"
MYSQL_PASSWORD="your_password"
BACKUP_DIR="/backup/mysql/logical"
LOG_FILE="/var/log/mysql_restore.log"

# 日志函数
log_message() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a $LOG_FILE
}

# 检查MySQL连接
check_mysql_connection() {
    log_message "检查MySQL连接..."
    
    mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD -e "SELECT 1" >/dev/null 2>&1
    
    if [ $? -eq 0 ]; then
        log_message "✅ MySQL连接正常"
        return 0
    else
        log_message "❌ MySQL连接失败"
        return 1
    fi
}

# 验证备份文件
validate_backup_file() {
    local backup_file="$1"
    
    log_message "验证备份文件: $backup_file"
    
    # 检查文件是否存在
    if [ ! -f "$backup_file" ]; then
        log_message "❌ 备份文件不存在: $backup_file"
        return 1
    fi
    
    # 检查文件大小
    local file_size=$(stat -c%s "$backup_file")
    if [ $file_size -eq 0 ]; then
        log_message "❌ 备份文件为空: $backup_file"
        return 1
    fi
    
    # 检查文件格式(mysqldump 文件头部是注释和SET语句,因此多取几十行再判断)
    if [[ "$backup_file" == *.sql ]]; then
        if ! head -50 "$backup_file" | grep -q "CREATE\|INSERT\|DROP"; then
            log_message "⚠️  备份文件可能不是有效的SQL文件"
        fi
    elif [[ "$backup_file" == *.sql.gz ]]; then
        if ! zcat "$backup_file" | head -50 | grep -q "CREATE\|INSERT\|DROP"; then
            log_message "⚠️  压缩备份文件可能不是有效的SQL文件"
        fi
    fi
    
    log_message "✅ 备份文件验证通过"
    log_message "   文件大小: $(du -h $backup_file | cut -f1)"
    return 0
}

# 创建数据库(如果不存在)
create_database_if_not_exists() {
    local database_name="$1"
    
    log_message "检查数据库: $database_name"
    
    # 检查数据库是否存在
    local db_exists=$(mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD \
        -e "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME='$database_name'" \
        --skip-column-names --silent)
    
    if [ -z "$db_exists" ]; then
        log_message "创建数据库: $database_name"
        mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD \
            -e "CREATE DATABASE $database_name DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
        
        if [ $? -eq 0 ]; then
            log_message "✅ 数据库创建成功: $database_name"
        else
            log_message "❌ 数据库创建失败: $database_name"
            return 1
        fi
    else
        log_message "✅ 数据库已存在: $database_name"
    fi
    
    return 0
}

# 恢复单个数据库
restore_database() {
    local backup_file="$1"
    local database_name="$2"
    local restore_mode="${3:-replace}"  # replace, append, skip_existing
    
    log_message "\n=== 开始恢复数据库 ==="
    log_message "备份文件: $backup_file"
    log_message "目标数据库: $database_name"
    log_message "恢复模式: $restore_mode"
    
    # 验证备份文件
    if ! validate_backup_file "$backup_file"; then
        return 1
    fi
    
    # 创建数据库
    if ! create_database_if_not_exists "$database_name"; then
        return 1
    fi
    
    # 根据恢复模式处理
    case $restore_mode in
        "replace")
            log_message "清空目标数据库..."
            mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD \
                -e "DROP DATABASE IF EXISTS $database_name; CREATE DATABASE $database_name DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
            ;;
        "append")
            log_message "追加模式,保留现有数据"
            ;;
        "skip_existing")
            log_message "跳过已存在的表"
            ;;
    esac
    
    # 执行恢复
    local start_time=$(date +%s)
    
    log_message "开始恢复数据..."
    
    if [[ "$backup_file" == *.gz ]]; then
        # 压缩文件恢复
        log_message "检测到压缩文件,使用zcat解压恢复"
        zcat "$backup_file" | mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD $database_name
    else
        # 普通SQL文件恢复
        mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD $database_name < "$backup_file"
    fi
    
    local restore_result=$?
    local end_time=$(date +%s)
    local duration=$((end_time - start_time))
    
    if [ $restore_result -eq 0 ]; then
        log_message "✅ 数据库恢复成功"
        log_message "   恢复耗时: ${duration}秒"
        
        # 验证恢复结果
        verify_restore_result "$database_name"
        
        return 0
    else
        log_message "❌ 数据库恢复失败"
        return 1
    fi
}

# 验证恢复结果
verify_restore_result() {
    local database_name="$1"
    
    log_message "验证恢复结果..."
    
    # 检查表数量
    local table_count=$(mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD \
        -e "SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='$database_name'" \
        --skip-column-names --silent)
    
    log_message "   表数量: $table_count"
    
    # 检查数据量(前5个表)
    local tables=$(mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD \
        -e "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='$database_name' LIMIT 5" \
        --skip-column-names --silent)
    
    log_message "   数据量统计:"
    while IFS= read -r table; do
        if [ -n "$table" ]; then
            local row_count=$(mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD \
                -e "SELECT COUNT(*) FROM $database_name.$table" \
                --skip-column-names --silent 2>/dev/null)
            
            if [ $? -eq 0 ]; then
                log_message "     $table: $row_count 行"
            else
                log_message "     $table: 无法统计(可能是视图或特殊表)"
            fi
        fi
    done <<< "$tables"
}

# 批量恢复
batch_restore() {
    local backup_pattern="$1"
    local target_database_prefix="${2:-restored_}"
    
    log_message "\n=== 批量恢复 ==="
    log_message "备份模式: $backup_pattern"
    log_message "数据库前缀: $target_database_prefix"
    
    local success_count=0
    local failed_count=0
    
    # 查找匹配的备份文件
    for backup_file in $backup_pattern; do
        if [ -f "$backup_file" ]; then
            # 从文件名提取数据库名
            local filename=$(basename "$backup_file")
            local database_name
            
            # 处理不同的文件名格式
            if [[ "$filename" =~ ^(.+)_[0-9]{8}_[0-9]{6}\.sql(\.gz)?$ ]]; then
                database_name="${BASH_REMATCH[1]}"
            elif [[ "$filename" =~ ^(.+)\.sql(\.gz)?$ ]]; then
                database_name="${BASH_REMATCH[1]}"
            else
                log_message "⚠️  无法从文件名提取数据库名: $filename"
                database_name="unknown_$(date +%s)"
            fi
            
            # 添加前缀
            local target_database="${target_database_prefix}${database_name}"
            
            log_message "\n--- 恢复 $filename 到 $target_database ---"
            
            if restore_database "$backup_file" "$target_database" "replace"; then
                ((success_count++))
                log_message "✅ $filename 恢复成功"
            else
                ((failed_count++))
                log_message "❌ $filename 恢复失败"
            fi
        fi
    done
    
    log_message "\n=== 批量恢复完成 ==="
    log_message "成功: $success_count 个"
    log_message "失败: $failed_count 个"
    
    return $failed_count
}

# 时间点恢复(PITR)
point_in_time_recovery() {
    local full_backup_file="$1"
    local binlog_dir="$2"
    local target_datetime="$3"
    local target_database="$4"
    
    log_message "\n=== 时间点恢复 (PITR) ==="
    log_message "全量备份: $full_backup_file"
    log_message "二进制日志目录: $binlog_dir"
    log_message "目标时间: $target_datetime"
    log_message "目标数据库: $target_database"
    
    # 1. 恢复全量备份
    log_message "\n步骤1: 恢复全量备份"
    if ! restore_database "$full_backup_file" "$target_database" "replace"; then
        log_message "❌ 全量备份恢复失败"
        return 1
    fi
    
    # 2. 应用二进制日志
    log_message "\n步骤2: 应用二进制日志到指定时间点"
    
    # 查找需要应用的二进制日志文件
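    # 注意: 严格的PITR应从全量备份记录的binlog文件和位置开始
    # (例如mysqldump --source-data在dump头部写入的CHANGE MASTER信息),
    # 并对起始文件使用--start-position,避免重复应用备份中已包含的事件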
    local binlog_files=$(find "$binlog_dir" -name "mysql-bin.*" -type f | sort)
    
    for binlog_file in $binlog_files; do
        log_message "处理二进制日志: $binlog_file"
        
        # 使用mysqlbinlog应用到指定时间点
        mysqlbinlog --stop-datetime="$target_datetime" "$binlog_file" | \
            mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD $target_database
        
        if [ $? -ne 0 ]; then
            log_message "⚠️  应用二进制日志时出现错误: $binlog_file"
        fi
    done
    
    log_message "✅ 时间点恢复完成"
    
    # 验证恢复结果
    verify_restore_result "$target_database"
}

# 生成恢复报告
generate_restore_report() {
    local operation_type="$1"
    local status="$2"
    local details="$3"
    
    log_message "\n=== MySQL恢复报告 ==="
    log_message "恢复时间: $(date '+%Y-%m-%d %H:%M:%S')"
    log_message "操作类型: $operation_type"
    log_message "恢复状态: $status"
    
    if [ "$status" = "成功" ]; then
        log_message "详细信息: $details"
        
        # 显示当前数据库列表
        log_message "\n当前数据库列表:"
        mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD \
            -e "SHOW DATABASES" --skip-column-names | while read db; do
            if [[ "$db" != "information_schema" && "$db" != "performance_schema" && "$db" != "mysql" && "$db" != "sys" ]]; then
                log_message "  - $db"
            fi
        done
    fi
    
    log_message "==========================================="
}

# 主恢复流程
main() {
    log_message "=== 开始MySQL逻辑备份恢复 ==="
    
    # 检查MySQL连接
    if ! check_mysql_connection; then
        exit 1
    fi
    
    # 根据参数决定恢复类型
    local restore_type="${1:-single}"
    
    case $restore_type in
        "single")
            local backup_file="$2"
            local database_name="$3"
            local restore_mode="${4:-replace}"
            
            if [ -z "$backup_file" ] || [ -z "$database_name" ]; then
                log_message "❌ 参数不足"
                log_message "用法: $0 single <backup_file> <database_name> [restore_mode]"
                exit 1
            fi
            
            if restore_database "$backup_file" "$database_name" "$restore_mode"; then
                generate_restore_report "单数据库恢复" "成功" "$database_name"
            else
                generate_restore_report "单数据库恢复" "失败" "$database_name"
            fi
            ;;
        "batch")
            local backup_pattern="$2"
            local database_prefix="$3"
            
            if [ -z "$backup_pattern" ]; then
                log_message "❌ 参数不足"
                log_message "用法: $0 batch <backup_pattern> [database_prefix]"
                exit 1
            fi
            
            if batch_restore "$backup_pattern" "$database_prefix"; then
                generate_restore_report "批量恢复" "成功" "模式: $backup_pattern"
            else
                generate_restore_report "批量恢复" "部分失败" "模式: $backup_pattern"
            fi
            ;;
        "pitr")
            local full_backup="$2"
            local binlog_dir="$3"
            local target_time="$4"
            local target_db="$5"
            
            if [ -z "$full_backup" ] || [ -z "$binlog_dir" ] || [ -z "$target_time" ] || [ -z "$target_db" ]; then
                log_message "❌ 参数不足"
                log_message "用法: $0 pitr <full_backup> <binlog_dir> <target_datetime> <target_database>"
                exit 1
            fi
            
            if point_in_time_recovery "$full_backup" "$binlog_dir" "$target_time" "$target_db"; then
                generate_restore_report "时间点恢复" "成功" "恢复到: $target_time"
            else
                generate_restore_report "时间点恢复" "失败" "恢复到: $target_time"
            fi
            ;;
        *)
            log_message "❌ 未知的恢复类型: $restore_type"
            log_message "支持的类型: single, batch, pitr"
            exit 1
            ;;
    esac
    
    log_message "=== MySQL逻辑备份恢复完成 ==="
}

# 执行主流程
main "$@"

11.4.2 物理备份恢复

#!/bin/bash
# XtraBackup物理备份恢复脚本

# 配置参数
MYSQL_DATADIR="/var/lib/mysql"
MYSQL_USER="mysql"
MYSQL_GROUP="mysql"
BACKUP_DIR="/backup/mysql/xtrabackup"
TEMP_RESTORE_DIR="/tmp/mysql_restore"
LOG_FILE="/var/log/xtrabackup_restore.log"

# 日志函数
log_message() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a $LOG_FILE
}

# 检查XtraBackup环境
check_xtrabackup_environment() {
    log_message "检查XtraBackup环境..."
    
    # 检查xtrabackup命令
    if ! command -v xtrabackup >/dev/null 2>&1; then
        log_message "❌ xtrabackup命令未找到"
        return 1
    fi
    
    # 检查MySQL是否停止
    if systemctl is-active --quiet mysql || systemctl is-active --quiet mysqld; then
        log_message "⚠️  MySQL服务正在运行,恢复前需要停止"
        return 2
    fi
    
    # 检查数据目录
    if [ ! -d "$MYSQL_DATADIR" ]; then
        log_message "❌ MySQL数据目录不存在: $MYSQL_DATADIR"
        return 1
    fi
    
    # 检查临时目录
    if [ ! -d "$TEMP_RESTORE_DIR" ]; then
        mkdir -p "$TEMP_RESTORE_DIR"
        log_message "✅ 创建临时恢复目录: $TEMP_RESTORE_DIR"
    fi
    
    log_message "✅ XtraBackup环境检查通过"
    return 0
}

# 停止MySQL服务
stop_mysql_service() {
    log_message "停止MySQL服务..."
    
    # 尝试优雅停止
    if systemctl is-active --quiet mysql; then
        systemctl stop mysql
        sleep 5
    elif systemctl is-active --quiet mysqld; then
        systemctl stop mysqld
        sleep 5
    fi
    
    # 检查是否还有MySQL进程
    local mysql_pids=$(pgrep mysqld)
    if [ -n "$mysql_pids" ]; then
        log_message "⚠️  发现残留MySQL进程,强制终止"
        kill -9 $mysql_pids
        sleep 2
    fi
    
    log_message "✅ MySQL服务已停止"
}

# 备份当前数据目录
backup_current_datadir() {
    local backup_suffix="$(date +%Y%m%d_%H%M%S)"
    local current_backup_dir="${MYSQL_DATADIR}_backup_${backup_suffix}"
    
    log_message "备份当前数据目录..."
    log_message "备份路径: $current_backup_dir"
    
    if [ -d "$MYSQL_DATADIR" ] && [ "$(ls -A $MYSQL_DATADIR)" ]; then
        mv "$MYSQL_DATADIR" "$current_backup_dir"
        
        if [ $? -eq 0 ]; then
            log_message "✅ 当前数据目录已备份到: $current_backup_dir"
            echo "$current_backup_dir" > "$TEMP_RESTORE_DIR/original_datadir_backup.txt"
            return 0
        else
            log_message "❌ 备份当前数据目录失败"
            return 1
        fi
    else
        log_message "✅ 数据目录为空,无需备份"
        return 0
    fi
}

# 准备备份文件
prepare_backup() {
    local backup_path="$1"
    local prepare_dir="$TEMP_RESTORE_DIR/prepared"
    
    log_message "准备备份文件..."
    log_message "源备份路径: $backup_path"
    log_message "准备目录: $prepare_dir"
    
    # 清理准备目录
    if [ -d "$prepare_dir" ]; then
        rm -rf "$prepare_dir"
    fi
    mkdir -p "$prepare_dir"
    
    # 复制备份到准备目录
    log_message "复制备份文件到准备目录..."
    cp -r "$backup_path"/* "$prepare_dir/"
    
    if [ $? -ne 0 ]; then
        log_message "❌ 复制备份文件失败"
        return 1
    fi
    
    # 检查是否需要解压
    if [ -f "$prepare_dir/xtrabackup_compressed" ]; then
        log_message "检测到压缩备份,开始解压..."
        
        xtrabackup --decompress --target-dir="$prepare_dir" --parallel=4
        
        if [ $? -eq 0 ]; then
            log_message "✅ 解压完成"
            # 删除压缩文件
            find "$prepare_dir" -name "*.qp" -delete
            rm -f "$prepare_dir/xtrabackup_compressed"
        else
            log_message "❌ 解压失败"
            return 1
        fi
    fi
    
    # 检查是否需要解密
    if [ -f "$prepare_dir/xtrabackup_encrypted" ]; then
        log_message "检测到加密备份,开始解密..."
        
        # 这里需要提供加密密钥文件路径
        local encryption_key="/etc/mysql/backup.key"
        
        if [ -f "$encryption_key" ]; then
            xtrabackup --decrypt=AES256 --encrypt-key-file="$encryption_key" \
                --target-dir="$prepare_dir" --parallel=4
            
            if [ $? -eq 0 ]; then
                log_message "✅ 解密完成"
                # 删除加密文件
                find "$prepare_dir" -name "*.xbcrypt" -delete
                rm -f "$prepare_dir/xtrabackup_encrypted"
            else
                log_message "❌ 解密失败"
                return 1
            fi
        else
            log_message "❌ 加密密钥文件不存在: $encryption_key"
            return 1
        fi
    fi
    
    # 执行prepare
    log_message "执行备份准备..."
    
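    # --prepare会重放redo日志并回滚未提交事务,使数据文件达到可启动的一致状态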
    xtrabackup --prepare --target-dir="$prepare_dir"
    
    if [ $? -eq 0 ]; then
        log_message "✅ 备份准备完成"
        echo "$prepare_dir" > "$TEMP_RESTORE_DIR/prepared_backup_dir.txt"
        return 0
    else
        log_message "❌ 备份准备失败"
        return 1
    fi
}

# 恢复数据文件
restore_datafiles() {
    local prepared_backup_dir="$1"
    
    log_message "恢复数据文件..."
    log_message "源目录: $prepared_backup_dir"
    log_message "目标目录: $MYSQL_DATADIR"
    
    # 创建数据目录
    mkdir -p "$MYSQL_DATADIR"
    
    # 复制数据文件
    log_message "复制数据文件..."
    
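    # 提示: 磁盘空间紧张时可用--move-back代替--copy-back,
    # 但移动后准备目录中的备份副本不可再次使用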
    xtrabackup --copy-back --target-dir="$prepared_backup_dir" --datadir="$MYSQL_DATADIR"
    
    if [ $? -eq 0 ]; then
        log_message "✅ 数据文件恢复完成"
    else
        log_message "❌ 数据文件恢复失败"
        return 1
    fi
    
    # 设置文件权限
    log_message "设置文件权限..."
    
    chown -R $MYSQL_USER:$MYSQL_GROUP "$MYSQL_DATADIR"
    chmod -R 750 "$MYSQL_DATADIR"
    
    # 特殊文件权限
    if [ -f "$MYSQL_DATADIR/mysql.sock" ]; then
        chmod 777 "$MYSQL_DATADIR/mysql.sock"
    fi
    
    log_message "✅ 文件权限设置完成"
    return 0
}

# 增量恢复
incremental_restore() {
    local full_backup_dir="$1"
    local incremental_backup_dirs="$2"  # 空格分隔的增量备份目录列表
    
    log_message "\n=== 增量恢复 ==="
    log_message "全量备份: $full_backup_dir"
    log_message "增量备份: $incremental_backup_dirs"
    
    local prepare_dir="$TEMP_RESTORE_DIR/incremental_prepared"
    
    # 清理准备目录
    if [ -d "$prepare_dir" ]; then
        rm -rf "$prepare_dir"
    fi
    mkdir -p "$prepare_dir"
    
    # 1. 复制全量备份
    log_message "\n步骤1: 准备全量备份"
    cp -r "$full_backup_dir"/* "$prepare_dir/"
    
    # 准备全量备份(仅应用日志)
    xtrabackup --prepare --apply-log-only --target-dir="$prepare_dir"
    
    if [ $? -ne 0 ]; then
        log_message "❌ 全量备份准备失败"
        return 1
    fi
    
    log_message "✅ 全量备份准备完成"
    
    # 2. 应用增量备份
    local inc_count=1
    for inc_backup_dir in $incremental_backup_dirs; do
        log_message "\n步骤$((inc_count + 1)): 应用增量备份 $inc_count"
        log_message "增量备份目录: $inc_backup_dir"
        
        # 检查增量备份目录
        if [ ! -d "$inc_backup_dir" ]; then
            log_message "❌ 增量备份目录不存在: $inc_backup_dir"
            return 1
        fi
        
        # 应用增量备份
        if [ $inc_count -eq $(echo $incremental_backup_dirs | wc -w) ]; then
            # 最后一个增量备份,不使用--apply-log-only
            xtrabackup --prepare --target-dir="$prepare_dir" --incremental-dir="$inc_backup_dir"
        else
            # 中间的增量备份,使用--apply-log-only
            xtrabackup --prepare --apply-log-only --target-dir="$prepare_dir" --incremental-dir="$inc_backup_dir"
        fi
        
        if [ $? -eq 0 ]; then
            log_message "✅ 增量备份 $inc_count 应用成功"
        else
            log_message "❌ 增量备份 $inc_count 应用失败"
            return 1
        fi
        
        ((inc_count++))
    done
    
    # 3. 最终准备
    log_message "\n步骤$((inc_count + 1)): 最终准备"
    xtrabackup --prepare --target-dir="$prepare_dir"
    
    if [ $? -eq 0 ]; then
        log_message "✅ 增量恢复准备完成"
        echo "$prepare_dir" > "$TEMP_RESTORE_DIR/prepared_backup_dir.txt"
        return 0
    else
        log_message "❌ 增量恢复准备失败"
        return 1
    fi
}

# 启动MySQL服务
start_mysql_service() {
    log_message "启动MySQL服务..."
    
    # 检查配置文件
    local mysql_config="/etc/mysql/my.cnf"
    if [ ! -f "$mysql_config" ]; then
        mysql_config="/etc/my.cnf"
    fi
    
    if [ ! -f "$mysql_config" ]; then
        log_message "⚠️  MySQL配置文件未找到,使用默认配置"
    else
        log_message "使用配置文件: $mysql_config"
    fi
    
    # 启动服务
    if systemctl start mysql 2>/dev/null || systemctl start mysqld 2>/dev/null; then
        # 等待服务启动
        local wait_count=0
        while [ $wait_count -lt 30 ]; do
            if systemctl is-active --quiet mysql || systemctl is-active --quiet mysqld; then
                log_message "✅ MySQL服务启动成功"
                return 0
            fi
            sleep 2
            ((wait_count++))
        done
        
        log_message "❌ MySQL服务启动超时"
        return 1
    else
        log_message "❌ MySQL服务启动失败"
        return 1
    fi
}

# 验证恢复结果
verify_physical_restore() {
    log_message "验证物理恢复结果..."
    
    # 等待MySQL完全启动
    sleep 10
    
    # 检查MySQL连接
    local max_attempts=10
    local attempt=1
    
    while [ $attempt -le $max_attempts ]; do
        if mysql -e "SELECT 1" >/dev/null 2>&1; then
            log_message "✅ MySQL连接正常"
            break
        else
            log_message "⏳ 等待MySQL启动... (尝试 $attempt/$max_attempts)"
            sleep 5
            ((attempt++))
        fi
    done
    
    if [ $attempt -gt $max_attempts ]; then
        log_message "❌ MySQL连接失败"
        return 1
    fi
    
    # 检查数据库列表
    log_message "\n数据库列表:"
    mysql -e "SHOW DATABASES" --skip-column-names | while read db; do
        log_message "  - $db"
    done
    
    # 检查InnoDB状态
    log_message "\nInnoDB引擎状态:"
    if mysql -e "SHOW ENGINE INNODB STATUS\G" >/dev/null 2>&1; then
        log_message "InnoDB引擎正常运行"
    else
        log_message "⚠️  无法获取InnoDB引擎状态"
    fi
    
    # 检查错误日志
    local error_log=$(mysql -e "SHOW VARIABLES LIKE 'log_error'" --skip-column-names | awk '{print $2}')
    if [ -f "$error_log" ]; then
        local recent_errors=$(tail -20 "$error_log" | grep -i error | wc -l)
        log_message "最近错误数量: $recent_errors"
        
        if [ $recent_errors -gt 0 ]; then
            log_message "⚠️  发现错误,请检查错误日志: $error_log"
        fi
    fi
    
    log_message "✅ 物理恢复验证完成"
    return 0
}

# 清理临时文件
cleanup_temp_files() {
    log_message "清理临时文件..."
    
    if [ -d "$TEMP_RESTORE_DIR" ]; then
        rm -rf "$TEMP_RESTORE_DIR"
        log_message "✅ 临时文件清理完成"
    fi
}

# 生成恢复报告
generate_physical_restore_report() {
    local restore_type="$1"
    local status="$2"
    local backup_info="$3"
    
    log_message "\n=== XtraBackup恢复报告 ==="
    log_message "恢复时间: $(date '+%Y-%m-%d %H:%M:%S')"
    log_message "恢复类型: $restore_type"
    log_message "恢复状态: $status"
    log_message "备份信息: $backup_info"
    
    if [ "$status" = "成功" ]; then
        log_message "数据目录: $MYSQL_DATADIR"
        
        # 显示磁盘使用情况
        local disk_usage=$(du -sh "$MYSQL_DATADIR" | cut -f1)
        log_message "数据大小: $disk_usage"
        
        # 显示MySQL状态
        if systemctl is-active --quiet mysql || systemctl is-active --quiet mysqld; then
            log_message "MySQL状态: 运行中"
        else
            log_message "MySQL状态: 已停止"
        fi
    fi
    
    log_message "==========================================="
}

# 主恢复流程
main() {
    log_message "=== 开始XtraBackup物理恢复 ==="
    
    # 检查环境
    local env_check_result
    check_xtrabackup_environment
    env_check_result=$?
    
    if [ $env_check_result -eq 1 ]; then
        exit 1
    elif [ $env_check_result -eq 2 ]; then
        # MySQL正在运行,需要停止
        stop_mysql_service
    fi
    
    # 根据参数决定恢复类型
    local restore_type="${1:-full}"
    
    case $restore_type in
        "full")
            local backup_dir="$2"
            
            if [ -z "$backup_dir" ] || [ ! -d "$backup_dir" ]; then
                log_message "❌ 备份目录无效: $backup_dir"
                log_message "用法: $0 full <backup_directory>"
                exit 1
            fi
            
            log_message "执行全量恢复"
            
            # 备份当前数据目录
            if ! backup_current_datadir; then
                exit 1
            fi
            
            # 准备备份
            if ! prepare_backup "$backup_dir"; then
                exit 1
            fi
            
            # 恢复数据文件
            local prepared_dir=$(cat "$TEMP_RESTORE_DIR/prepared_backup_dir.txt")
            if ! restore_datafiles "$prepared_dir"; then
                exit 1
            fi
            
            # 启动MySQL
            if ! start_mysql_service; then
                exit 1
            fi
            
            # 验证恢复
            if verify_physical_restore; then
                generate_physical_restore_report "全量恢复" "成功" "$backup_dir"
                cleanup_temp_files
            else
                generate_physical_restore_report "全量恢复" "验证失败" "$backup_dir"
            fi
            ;;
        "incremental")
            local full_backup="$2"
            shift 2
            local incremental_backups="$*"  # 用空格连接全部剩余参数
            
            if [ -z "$full_backup" ] || [ -z "$incremental_backups" ]; then
                log_message "❌ 参数不足"
                log_message "用法: $0 incremental <full_backup_dir> <inc_backup_dir1> [inc_backup_dir2] ..."
                exit 1
            fi
            
            log_message "执行增量恢复"
            
            # 备份当前数据目录
            if ! backup_current_datadir; then
                exit 1
            fi
            
            # 增量恢复
            if ! incremental_restore "$full_backup" "$incremental_backups"; then
                exit 1
            fi
            
            # 恢复数据文件
            local prepared_dir=$(cat "$TEMP_RESTORE_DIR/prepared_backup_dir.txt")
            if ! restore_datafiles "$prepared_dir"; then
                exit 1
            fi
            
            # 启动MySQL
            if ! start_mysql_service; then
                exit 1
            fi
            
            # 验证恢复
            if verify_physical_restore; then
                generate_physical_restore_report "增量恢复" "成功" "全量: $full_backup, 增量: $incremental_backups"
                cleanup_temp_files
            else
                generate_physical_restore_report "增量恢复" "验证失败" "全量: $full_backup, 增量: $incremental_backups"
            fi
            ;;
        *)
            log_message "❌ 未知的恢复类型: $restore_type"
            log_message "支持的类型: full, incremental"
            exit 1
            ;;
    esac
    
    log_message "=== XtraBackup物理恢复完成 ==="
}

# 执行主流程
main "$@"

# 验证备份文件
validate_backup() {
    local backup_file=$1
    local db_name=$2
    
    log_message "验证备份文件: $backup_file"
    
    # 检查文件是否存在且不为空
    if [ ! -s "$backup_file" ]; then
        log_message "❌ 备份文件为空或不存在"
        return 1
    fi
    
    # 检查压缩文件完整性
    if gzip -t "$backup_file" >/dev/null 2>&1; then
        log_message "✅ 压缩文件完整性验证通过"
    else
        log_message "❌ 压缩文件损坏"
        return 1
    fi
    
    # 检查SQL文件内容
    if zcat "$backup_file" | head -20 | grep -q "MySQL dump"; then
        log_message "✅ 备份文件格式验证通过"
    else
        log_message "❌ 备份文件格式错误"
        return 1
    fi
    
    # 检查是否包含数据库创建语句
    if zcat "$backup_file" | grep -q "CREATE DATABASE.*$db_name"; then
        log_message "✅ 数据库创建语句验证通过"
    else
        log_message "⚠️  未找到数据库创建语句"
    fi
    
    return 0
}

# 清理旧备份
cleanup_old_backups() {
    log_message "清理 $RETENTION_DAYS 天前的备份文件..."
    
    local deleted_count=$(find $BACKUP_DIR -name "*.sql.gz" -mtime +$RETENTION_DAYS -delete -print | wc -l)
    
    if [ $deleted_count -gt 0 ]; then
        log_message "✅ 已删除 $deleted_count 个过期备份文件"
    else
        log_message "ℹ️  没有需要删除的过期备份文件"
    fi
}

# 生成备份报告
generate_backup_report() {
    local total_databases=$1
    local successful_backups=$2
    local failed_backups=$3
    local total_size=$4
    
    log_message "\n=== 备份报告 ==="
    log_message "备份时间: $(date '+%Y-%m-%d %H:%M:%S')"
    log_message "总数据库数: $total_databases"
    log_message "成功备份: $successful_backups"
    log_message "失败备份: $failed_backups"
    log_message "总备份大小: $total_size"
    log_message "备份目录: $BACKUP_DIR"
    
    if [ $failed_backups -eq 0 ]; then
        log_message "✅ 所有数据库备份成功"
    else
        log_message "⚠️  有 $failed_backups 个数据库备份失败"
    fi
}

# 发送备份通知
send_notification() {
    local status=$1
    local message=$2
    
    # 发送邮件通知(需要配置邮件服务)
    if command -v mail >/dev/null 2>&1; then
        echo "$message" | mail -s "MySQL备份通知: $status" "admin@example.com"
    fi
    
    # 发送到系统日志
    logger "MySQL Backup: $status - $message"
}

# 主备份流程
main() {
    log_message "=== 开始MySQL备份 ==="
    
    # 检查MySQL连接
    if ! check_mysql_connection; then
        send_notification "失败" "MySQL连接失败"
        exit 1
    fi
    
    # 获取数据库列表
    local databases=$(get_databases)
    local total_databases=$(echo "$databases" | wc -l)
    local successful_backups=0
    local failed_backups=0
    
    log_message "发现 $total_databases 个数据库需要备份"
    
    # 备份每个数据库
    for db in $databases; do
        if backup_database $db; then
            ((successful_backups++))
        else
            ((failed_backups++))
        fi
    done
    
    # 计算总备份大小
    local total_size=$(du -sh $BACKUP_DIR | cut -f1)
    
    # 清理旧备份
    cleanup_old_backups
    
    # 生成报告
    generate_backup_report $total_databases $successful_backups $failed_backups $total_size
    
    # 发送通知
    if [ $failed_backups -eq 0 ]; then
        send_notification "成功" "所有 $total_databases 个数据库备份成功,总大小: $total_size"
    else
        send_notification "部分失败" "$successful_backups/$total_databases 个数据库备份成功,$failed_backups 个失败"
    fi
    
    log_message "=== 备份完成 ==="
}

# 执行主流程
main

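一个最小的定时调度示例(假设上述备份脚本保存为/usr/local/bin/mysql_backup.sh):

```bash
# /etc/cron.d/mysql-backup: 每天凌晨2点执行逻辑备份
0 2 * * * root /usr/local/bin/mysql_backup.sh >> /var/log/mysql/backup_cron.log 2>&1
```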

11.2.2 高级mysqldump技巧

```python
import subprocess
import os
import json
from datetime import datetime
import threading
import queue

class AdvancedMySQLDump:
    def __init__(self, config):
        self.config = config
        self.backup_queue = queue.Queue()
        self.results = []
        
    def parallel_table_backup(self, database, tables, max_workers=4):
        """并行表级备份"""
        print(f"\n=== 并行备份数据库 {database} ===")
        print(f"表数量: {len(tables)}")
        print(f"并行度: {max_workers}")
        
        # 将表添加到队列
        for table in tables:
            self.backup_queue.put((database, table))
        
        # 创建工作线程
        threads = []
        for i in range(max_workers):
            thread = threading.Thread(target=self._backup_worker, args=(i,))
            thread.start()
            threads.append(thread)
        
        # 等待所有线程完成
        for thread in threads:
            thread.join()
        
        return self.results
    
    def _backup_worker(self, worker_id):
        """备份工作线程"""
        while True:
            try:
                database, table = self.backup_queue.get(timeout=1)
                result = self._backup_single_table(database, table, worker_id)
                self.results.append(result)
                self.backup_queue.task_done()
            except queue.Empty:
                break
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
                break
    
    def _backup_single_table(self, database, table, worker_id):
        """备份单个表"""
        start_time = datetime.now()
        backup_file = f"{self.config['backup_dir']}/{database}_{table}_{start_time.strftime('%Y%m%d_%H%M%S')}.sql"
        
        print(f"Worker {worker_id}: 备份 {database}.{table}")
        
        # 构建mysqldump命令
        cmd = [
            'mysqldump',
            f"--host={self.config['host']}",
            f"--port={self.config['port']}",
            f"--user={self.config['user']}",
            f"--password={self.config['password']}",
            '--single-transaction',
            '--quick',
            '--lock-tables=false',
            '--where=1',  # 占位条件(匹配全部行),按需替换为实际WHERE子句
            database,
            table
        ]
        
        try:
            with open(backup_file, 'w') as f:
                result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, 
                                      text=True, timeout=3600)
            
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            
            if result.returncode == 0:
                file_size = os.path.getsize(backup_file)
                print(f"Worker {worker_id}: ✅ {database}.{table} 备份成功 ({file_size} bytes, {duration:.1f}s)")
                
                return {
                    'database': database,
                    'table': table,
                    'status': 'success',
                    'file': backup_file,
                    'size': file_size,
                    'duration': duration,
                    'worker_id': worker_id
                }
            else:
                print(f"Worker {worker_id}: ❌ {database}.{table} 备份失败: {result.stderr}")
                if os.path.exists(backup_file):
                    os.remove(backup_file)
                
                return {
                    'database': database,
                    'table': table,
                    'status': 'failed',
                    'error': result.stderr,
                    'duration': duration,
                    'worker_id': worker_id
                }
        
        except subprocess.TimeoutExpired:
            print(f"Worker {worker_id}: ⏰ {database}.{table} 备份超时")
            if os.path.exists(backup_file):
                os.remove(backup_file)
            
            return {
                'database': database,
                'table': table,
                'status': 'timeout',
                'duration': 3600,
                'worker_id': worker_id
            }
        
        except Exception as e:
            print(f"Worker {worker_id}: 💥 {database}.{table} 备份异常: {e}")
            if os.path.exists(backup_file):
                os.remove(backup_file)
            
            return {
                'database': database,
                'table': table,
                'status': 'error',
                'error': str(e),
                'worker_id': worker_id
            }
    
    def selective_backup(self, database, conditions):
        """选择性备份"""
        print(f"\n=== 选择性备份 {database} ===")
        
        backup_sets = []
        
        for condition in conditions:
            table = condition['table']
            where_clause = condition.get('where', '1=1')
            backup_name = condition.get('name', f"{table}_selective")
            
            backup_file = f"{self.config['backup_dir']}/{database}_{backup_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.sql"
            
            cmd = [
                'mysqldump',
                f"--host={self.config['host']}",
                f"--port={self.config['port']}",
                f"--user={self.config['user']}",
                f"--password={self.config['password']}",
                '--single-transaction',
                '--quick',
                '--lock-tables=false',
                f"--where={where_clause}",
                database,
                table
            ]
            
            print(f"备份 {table} (条件: {where_clause})")
            
            try:
                with open(backup_file, 'w') as f:
                    result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)
                
                if result.returncode == 0:
                    file_size = os.path.getsize(backup_file)
                    print(f"✅ {table} 选择性备份成功 ({file_size} bytes)")
                    
                    backup_sets.append({
                        'table': table,
                        'condition': where_clause,
                        'file': backup_file,
                        'size': file_size,
                        'status': 'success'
                    })
                else:
                    print(f"❌ {table} 选择性备份失败: {result.stderr}")
                    if os.path.exists(backup_file):
                        os.remove(backup_file)
                    
                    backup_sets.append({
                        'table': table,
                        'condition': where_clause,
                        'error': result.stderr,
                        'status': 'failed'
                    })
            
            except Exception as e:
                print(f"💥 {table} 选择性备份异常: {e}")
                backup_sets.append({
                    'table': table,
                    'condition': where_clause,
                    'error': str(e),
                    'status': 'error'
                })
        
        return backup_sets
    
    def schema_only_backup(self, databases):
        """仅结构备份"""
        print(f"\n=== 仅结构备份 ===")
        
        schema_backups = []
        
        for database in databases:
            backup_file = f"{self.config['backup_dir']}/{database}_schema_{datetime.now().strftime('%Y%m%d_%H%M%S')}.sql"
            
            cmd = [
                'mysqldump',
                f"--host={self.config['host']}",
                f"--port={self.config['port']}",
                f"--user={self.config['user']}",
                f"--password={self.config['password']}",
                '--no-data',  # 仅结构,不包含数据
                '--routines',
                '--triggers',
                '--events',
                '--single-transaction',
                '--databases',
                database
            ]
            
            print(f"备份 {database} 数据库结构")
            
            try:
                with open(backup_file, 'w') as f:
                    result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)
                
                if result.returncode == 0:
                    file_size = os.path.getsize(backup_file)
                    print(f"✅ {database} 结构备份成功 ({file_size} bytes)")
                    
                    schema_backups.append({
                        'database': database,
                        'file': backup_file,
                        'size': file_size,
                        'status': 'success'
                    })
                else:
                    print(f"❌ {database} 结构备份失败: {result.stderr}")
                    if os.path.exists(backup_file):
                        os.remove(backup_file)
                    
                    schema_backups.append({
                        'database': database,
                        'error': result.stderr,
                        'status': 'failed'
                    })
            
            except Exception as e:
                print(f"💥 {database} 结构备份异常: {e}")
                schema_backups.append({
                    'database': database,
                    'error': str(e),
                    'status': 'error'
                })
        
        return schema_backups
    
    def data_only_backup(self, database, tables):
        """仅数据备份"""
        print(f"\n=== 仅数据备份 {database} ===")
        
        data_backups = []
        
        for table in tables:
            backup_file = f"{self.config['backup_dir']}/{database}_{table}_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.sql"
            
            cmd = [
                'mysqldump',
                f"--host={self.config['host']}",
                f"--port={self.config['port']}",
                f"--user={self.config['user']}",
                f"--password={self.config['password']}",
                '--no-create-info',  # 仅数据,不包含结构
                '--single-transaction',
                '--quick',
                '--lock-tables=false',
                '--complete-insert',  # 完整INSERT语句
                database,
                table
            ]
            
            print(f"备份 {table} 表数据")
            
            try:
                with open(backup_file, 'w') as f:
                    result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)
                
                if result.returncode == 0:
                    file_size = os.path.getsize(backup_file)
                    print(f"✅ {table} 数据备份成功 ({file_size} bytes)")
                    
                    data_backups.append({
                        'table': table,
                        'file': backup_file,
                        'size': file_size,
                        'status': 'success'
                    })
                else:
                    print(f"❌ {table} 数据备份失败: {result.stderr}")
                    if os.path.exists(backup_file):
                        os.remove(backup_file)
                    
                    data_backups.append({
                        'table': table,
                        'error': result.stderr,
                        'status': 'failed'
                    })
            
            except Exception as e:
                print(f"💥 {table} 数据备份异常: {e}")
                data_backups.append({
                    'table': table,
                    'error': str(e),
                    'status': 'error'
                })
        
        return data_backups

# 高级备份使用示例
config = {
    'host': 'localhost',
    'port': '3306',
    'user': 'backup_user',
    'password': 'backup_password',
    'backup_dir': '/backup/mysql/advanced'
}

# 创建备份目录
os.makedirs(config['backup_dir'], exist_ok=True)

advanced_dump = AdvancedMySQLDump(config)

# 示例1: 并行表备份
tables_to_backup = ['users', 'orders', 'products', 'categories', 'reviews']
parallel_results = advanced_dump.parallel_table_backup('ecommerce', tables_to_backup, max_workers=3)

print("\n📊 并行备份结果:")
for result in parallel_results:
    status_icon = "✅" if result['status'] == 'success' else "❌"
    print(f"  {status_icon} {result['database']}.{result['table']} - {result['status']}")

# 示例2: 选择性备份
selective_conditions = [
    {
        'table': 'orders',
        'where': "order_date >= '2024-01-01'",
        'name': 'orders_2024'
    },
    {
        'table': 'users',
        'where': "created_at >= '2023-01-01' AND status = 'active'",
        'name': 'active_users_2023'
    }
]

selective_results = advanced_dump.selective_backup('ecommerce', selective_conditions)

print("\n📊 选择性备份结果:")
for result in selective_results:
    status_icon = "✅" if result['status'] == 'success' else "❌"
    print(f"  {status_icon} {result['table']} ({result['condition']}) - {result['status']}")

# 示例3: 仅结构备份
schema_results = advanced_dump.schema_only_backup(['ecommerce', 'analytics'])

print("\n📊 结构备份结果:")
for result in schema_results:
    status_icon = "✅" if result['status'] == 'success' else "❌"
    print(f"  {status_icon} {result['database']} schema - {result['status']}")

# 示例4: 仅数据备份
data_results = advanced_dump.data_only_backup('ecommerce', ['users', 'products'])

print("\n📊 数据备份结果:")
for result in data_results:
    status_icon = "✅" if result['status'] == 'success' else "❌"
    print(f"  {status_icon} {result['table']} data - {result['status']}")
```

11.2.3 mysqlpump并行备份

#!/bin/bash
# mysqlpump并行备份脚本

# 配置参数
MYSQL_USER="backup_user"
MYSQL_PASSWORD="backup_password"
MYSQL_HOST="localhost"
MYSQL_PORT="3306"
BACKUP_DIR="/backup/mysql/pump"
LOG_FILE="/var/log/mysql/mysqlpump.log"
PARALLEL_THREADS=4
COMPRESSION_ALGORITHM="zlib"
COMPRESSION_LEVEL=6

# 创建备份目录
mkdir -p $BACKUP_DIR
mkdir -p $(dirname $LOG_FILE)

# 日志函数
log_message() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a $LOG_FILE
}

# mysqlpump完整备份
mysqlpump_full_backup() {
    local backup_file="$BACKUP_DIR/full_backup_$(date +%Y%m%d_%H%M%S).sql"
    
    log_message "开始mysqlpump完整备份"
    log_message "并行线程数: $PARALLEL_THREADS"
    log_message "压缩算法: $COMPRESSION_ALGORITHM"
    
    local start_time=$(date +%s)
    
    mysqlpump \
        --host=$MYSQL_HOST \
        --port=$MYSQL_PORT \
        --user=$MYSQL_USER \
        --password=$MYSQL_PASSWORD \
        --default-parallelism=$PARALLEL_THREADS \
        --compress-output=$COMPRESSION_ALGORITHM \
        --single-transaction \
        --add-drop-database \
        --add-drop-table \
        --routines \
        --events \
        --triggers \
        --exclude-databases=information_schema,performance_schema,mysql,sys \
        --result-file=$backup_file
    
    local backup_result=$?
    local end_time=$(date +%s)
    local duration=$((end_time - start_time))
    
    if [ $backup_result -eq 0 ]; then
        local file_size=$(du -h $backup_file | cut -f1)
        log_message "✅ mysqlpump完整备份成功"
        log_message "   备份文件: $backup_file"
        log_message "   文件大小: $file_size"
        log_message "   备份耗时: ${duration}秒"
        
        # 验证备份文件
        validate_mysqlpump_backup $backup_file
    else
        log_message "❌ mysqlpump完整备份失败"
        rm -f $backup_file
        return 1
    fi
    
    return 0
}

# mysqlpump选择性备份
mysqlpump_selective_backup() {
    local databases="$1"
    local backup_file="$BACKUP_DIR/selective_backup_$(date +%Y%m%d_%H%M%S).sql"
    
    log_message "开始mysqlpump选择性备份"
    log_message "目标数据库: $databases"
    
    local start_time=$(date +%s)
    
    mysqlpump \
        --host=$MYSQL_HOST \
        --port=$MYSQL_PORT \
        --user=$MYSQL_USER \
        --password=$MYSQL_PASSWORD \
        --default-parallelism=$PARALLEL_THREADS \
        --compress-output=$COMPRESSION_ALGORITHM \
        --single-transaction \
        --add-drop-database \
        --add-drop-table \
        --routines \
        --events \
        --triggers \
        --databases $databases \
        --result-file=$backup_file
    
    local backup_result=$?
    local end_time=$(date +%s)
    local duration=$((end_time - start_time))
    
    if [ $backup_result -eq 0 ]; then
        local file_size=$(du -h $backup_file | cut -f1)
        log_message "✅ mysqlpump选择性备份成功"
        log_message "   备份文件: $backup_file"
        log_message "   文件大小: $file_size"
        log_message "   备份耗时: ${duration}秒"
    else
        log_message "❌ mysqlpump选择性备份失败"
        rm -f $backup_file
        return 1
    fi
    
    return 0
}

# mysqlpump表级并行备份
mysqlpump_table_parallel_backup() {
    local database="$1"
    local tables="$2"
    local backup_file="$BACKUP_DIR/${database}_tables_$(date +%Y%m%d_%H%M%S).sql"
    
    log_message "开始mysqlpump表级并行备份"
    log_message "数据库: $database"
    log_message "表: $tables"
    
    local start_time=$(date +%s)
    
    # mysqlpump的并行度按schema分配(--parallel-schemas作用于库,而非单个表),
    # 表级过滤通过--include-tables(逗号分隔的db.table列表)实现
    local include_list=""
    for table in $tables; do
        include_list="${include_list:+$include_list,}$database.$table"
    done
    
    mysqlpump \
        --host=$MYSQL_HOST \
        --port=$MYSQL_PORT \
        --user=$MYSQL_USER \
        --password=$MYSQL_PASSWORD \
        --parallel-schemas=$PARALLEL_THREADS:$database \
        --compress-output=$COMPRESSION_ALGORITHM \
        --single-transaction \
        --add-drop-table \
        --include-tables=$include_list \
        --result-file=$backup_file
    
    local backup_result=$?
    local end_time=$(date +%s)
    local duration=$((end_time - start_time))
    
    if [ $backup_result -eq 0 ]; then
        local file_size=$(du -h $backup_file | cut -f1)
        log_message "✅ mysqlpump表级并行备份成功"
        log_message "   备份文件: $backup_file"
        log_message "   文件大小: $file_size"
        log_message "   备份耗时: ${duration}秒"
    else
        log_message "❌ mysqlpump表级并行备份失败"
        rm -f $backup_file
        return 1
    fi
    
    return 0
}

# 验证mysqlpump备份文件
validate_mysqlpump_backup() {
    local backup_file=$1
    
    log_message "验证mysqlpump备份文件: $backup_file"
    
    # 检查文件是否存在且不为空
    if [ ! -s "$backup_file" ]; then
        log_message "❌ 备份文件为空或不存在"
        return 1
    fi
    
    # 使用--compress-output时输出是压缩二进制而非纯文本,
    # 需先用zlib_decompress/lz4_decompress解压后才能检查内容
    if [ -n "$COMPRESSION_ALGORITHM" ]; then
        log_message "ℹ️  备份已用 $COMPRESSION_ALGORITHM 压缩,跳过文本内容检查"
        return 0
    fi
    
    # 检查文件头部(mysqldump/mysqlpump的头部注释)
    if head -10 "$backup_file" | grep -qiE "MySQL (dump|pump)"; then
        log_message "✅ 备份文件格式验证通过"
    else
        log_message "❌ 备份文件格式错误"
        return 1
    fi
    
    # 检查是否包含数据
    if grep -q "INSERT INTO" "$backup_file"; then
        log_message "✅ 备份文件包含数据"
    else
        log_message "⚠️  备份文件可能只包含结构"
    fi
    
    return 0
}

# 性能监控
monitor_backup_performance() {
    local backup_pid=$1
    local monitor_file="$BACKUP_DIR/performance_$(date +%Y%m%d_%H%M%S).log"
    
    log_message "开始性能监控 (PID: $backup_pid)"
    
    while kill -0 $backup_pid 2>/dev/null; do
        local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
        local cpu_usage=$(ps -p $backup_pid -o %cpu --no-headers)
        local memory_usage=$(ps -p $backup_pid -o %mem --no-headers)
        local io_read=$(cat /proc/$backup_pid/io 2>/dev/null | grep read_bytes | awk '{print $2}')
        local io_write=$(cat /proc/$backup_pid/io 2>/dev/null | grep write_bytes | awk '{print $2}')
        
        echo "$timestamp,CPU:$cpu_usage%,MEM:$memory_usage%,READ:$io_read,WRITE:$io_write" >> $monitor_file
        
        sleep 5
    done
    
    log_message "性能监控完成,日志文件: $monitor_file"
}

# 主备份流程
main() {
    log_message "=== 开始mysqlpump备份 ==="
    
    # 检查mysqlpump是否可用
    if ! command -v mysqlpump >/dev/null 2>&1; then
        log_message "❌ mysqlpump命令不可用,请检查MySQL版本(需要5.7+)"
        exit 1
    fi
    
    # 执行完整备份
    if mysqlpump_full_backup; then
        log_message "✅ 完整备份成功"
    else
        log_message "❌ 完整备份失败"
    fi
    
    # 执行选择性备份
    if mysqlpump_selective_backup "ecommerce analytics"; then
        log_message "✅ 选择性备份成功"
    else
        log_message "❌ 选择性备份失败"
    fi
    
    # 执行表级并行备份
    if mysqlpump_table_parallel_backup "ecommerce" "users orders products"; then
        log_message "✅ 表级并行备份成功"
    else
        log_message "❌ 表级并行备份失败"
    fi
    
    log_message "=== mysqlpump备份完成 ==="
}

# 执行主流程
main
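
注意: 使用--compress-output=zlib生成的备份文件是压缩二进制,恢复前需先用MySQL自带的zlib_decompress工具解压(LZ4对应lz4_decompress)。示意如下(文件名为演示值):

```bash
# 先解压zlib压缩的mysqlpump输出,再导入MySQL
zlib_decompress /backup/mysql/pump/full_backup_20240115_020000.sql full_backup.sql
mysql -h localhost -P 3306 -u backup_user -p < full_backup.sql
```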

11.3 物理备份详解

11.3.1 Percona XtraBackup使用

#!/bin/bash
# Percona XtraBackup备份脚本

# 配置参数
MYSQL_USER="backup_user"
MYSQL_PASSWORD="backup_password"
MYSQL_HOST="localhost"
MYSQL_PORT="3306"
MYSQL_DATADIR="/var/lib/mysql"
BACKUP_DIR="/backup/mysql/xtrabackup"
LOG_FILE="/var/log/mysql/xtrabackup.log"
FULL_BACKUP_DIR="$BACKUP_DIR/full"
INC_BACKUP_DIR="$BACKUP_DIR/incremental"
RETENTION_DAYS=7
COMPRESSION=true
ENCRYPTION=true
ENCRYPTION_KEY="/etc/mysql/backup.key"
PARALLEL_THREADS=4

# 创建备份目录
mkdir -p $FULL_BACKUP_DIR
mkdir -p $INC_BACKUP_DIR
mkdir -p $(dirname $LOG_FILE)

# 日志函数
log_message() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a $LOG_FILE
}

# 检查XtraBackup是否可用
check_xtrabackup() {
    log_message "检查XtraBackup环境..."
    
    if ! command -v xtrabackup >/dev/null 2>&1; then
        log_message "❌ xtrabackup命令不可用,请安装Percona XtraBackup"
        return 1
    fi
    
    local version=$(xtrabackup --version 2>&1 | head -1)
    log_message "✅ XtraBackup版本: $version"
    
    # 检查MySQL连接
    if mysql -h$MYSQL_HOST -P$MYSQL_PORT -u$MYSQL_USER -p$MYSQL_PASSWORD -e "SELECT 1" >/dev/null 2>&1; then
        log_message "✅ MySQL连接成功"
    else
        log_message "❌ MySQL连接失败"
        return 1
    fi
    
    return 0
}

# 全量备份
full_backup() {
    local backup_name="full_$(date +%Y%m%d_%H%M%S)"
    local backup_path="$FULL_BACKUP_DIR/$backup_name"
    
    log_message "开始全量备份: $backup_name"
    
    # 构建xtrabackup命令
    local cmd="xtrabackup --backup"
    cmd="$cmd --host=$MYSQL_HOST"
    cmd="$cmd --port=$MYSQL_PORT"
    cmd="$cmd --user=$MYSQL_USER"
    cmd="$cmd --password=$MYSQL_PASSWORD"
    cmd="$cmd --datadir=$MYSQL_DATADIR"
    cmd="$cmd --target-dir=$backup_path"
    cmd="$cmd --parallel=$PARALLEL_THREADS"
    
    # 添加压缩选项
    if [ "$COMPRESSION" = true ]; then
        cmd="$cmd --compress"
        cmd="$cmd --compress-threads=$PARALLEL_THREADS"
        log_message "启用压缩备份"
    fi
    
    # 添加加密选项
    if [ "$ENCRYPTION" = true ] && [ -f "$ENCRYPTION_KEY" ]; then
        cmd="$cmd --encrypt=AES256"
        cmd="$cmd --encrypt-key-file=$ENCRYPTION_KEY"
        cmd="$cmd --encrypt-threads=$PARALLEL_THREADS"
        log_message "启用加密备份"
    fi
    
    local start_time=$(date +%s)
    
    # 执行备份
    log_message "执行命令: $cmd"
    eval $cmd
    
    local backup_result=$?
    local end_time=$(date +%s)
    local duration=$((end_time - start_time))
    
    if [ $backup_result -eq 0 ]; then
        # 计算备份大小
        local backup_size=$(du -sh $backup_path | cut -f1)
        
        log_message "✅ 全量备份成功"
        log_message "   备份路径: $backup_path"
        log_message "   备份大小: $backup_size"
        log_message "   备份耗时: ${duration}秒"
        
        # 记录备份信息
        echo "$backup_path" > "$BACKUP_DIR/latest_full_backup.txt"
        
        # 验证备份
        validate_backup $backup_path
        
        return 0
    else
        log_message "❌ 全量备份失败"
        rm -rf $backup_path
        return 1
    fi
}

# 增量备份
incremental_backup() {
    local base_backup="$1"
    local backup_name="inc_$(date +%Y%m%d_%H%M%S)"
    local backup_path="$INC_BACKUP_DIR/$backup_name"
    
    if [ -z "$base_backup" ]; then
        # 查找最新的全量备份
        if [ -f "$BACKUP_DIR/latest_full_backup.txt" ]; then
            base_backup=$(cat "$BACKUP_DIR/latest_full_backup.txt")
        else
            log_message "❌ 未找到基础备份,请先执行全量备份"
            return 1
        fi
    fi
    
    if [ ! -d "$base_backup" ]; then
        log_message "❌ 基础备份目录不存在: $base_backup"
        return 1
    fi
    
    log_message "开始增量备份: $backup_name"
    log_message "基础备份: $base_backup"
    
    # 构建增量备份命令
    local cmd="xtrabackup --backup"
    cmd="$cmd --host=$MYSQL_HOST"
    cmd="$cmd --port=$MYSQL_PORT"
    cmd="$cmd --user=$MYSQL_USER"
    cmd="$cmd --password=$MYSQL_PASSWORD"
    cmd="$cmd --datadir=$MYSQL_DATADIR"
    cmd="$cmd --target-dir=$backup_path"
    cmd="$cmd --incremental-basedir=$base_backup"
    cmd="$cmd --parallel=$PARALLEL_THREADS"
    
    # 添加压缩和加密选项
    if [ "$COMPRESSION" = true ]; then
        cmd="$cmd --compress"
        cmd="$cmd --compress-threads=$PARALLEL_THREADS"
    fi
    
    if [ "$ENCRYPTION" = true ] && [ -f "$ENCRYPTION_KEY" ]; then
        cmd="$cmd --encrypt=AES256"
        cmd="$cmd --encrypt-key-file=$ENCRYPTION_KEY"
        cmd="$cmd --encrypt-threads=$PARALLEL_THREADS"
    fi
    
    local start_time=$(date +%s)
    
    # 执行增量备份
    log_message "执行命令: $cmd"
    eval $cmd
    
    local backup_result=$?
    local end_time=$(date +%s)
    local duration=$((end_time - start_time))
    
    if [ $backup_result -eq 0 ]; then
        local backup_size=$(du -sh $backup_path | cut -f1)
        
        log_message "✅ 增量备份成功"
        log_message "   备份路径: $backup_path"
        log_message "   备份大小: $backup_size"
        log_message "   备份耗时: ${duration}秒"
        
        # 记录增量备份信息
        echo "$backup_path" >> "$BACKUP_DIR/incremental_backups.txt"
        
        return 0
    else
        log_message "❌ 增量备份失败"
        rm -rf $backup_path
        return 1
    fi
}

# 验证备份
validate_backup() {
    local backup_path="$1"
    
    log_message "验证备份: $backup_path"
    
    # 检查备份目录
    if [ ! -d "$backup_path" ]; then
        log_message "❌ 备份目录不存在"
        return 1
    fi
    
    # 检查关键文件
    if [ ! -f "$backup_path/xtrabackup_checkpoints" ]; then
        log_message "❌ 缺少检查点文件"
        return 1
    fi
    
    if [ ! -f "$backup_path/xtrabackup_info" ]; then
        log_message "❌ 缺少备份信息文件"
        return 1
    fi
    
    # 检查备份状态
    local backup_type=$(grep "backup_type" "$backup_path/xtrabackup_checkpoints" | cut -d'=' -f2 | tr -d ' ')
    local from_lsn=$(grep "from_lsn" "$backup_path/xtrabackup_checkpoints" | cut -d'=' -f2 | tr -d ' ')
    local to_lsn=$(grep "to_lsn" "$backup_path/xtrabackup_checkpoints" | cut -d'=' -f2 | tr -d ' ')
    
    log_message "✅ 备份验证通过"
    log_message "   备份类型: $backup_type"
    log_message "   LSN范围: $from_lsn -> $to_lsn"
    
    return 0
}