10.1 概述

Redis持久化是将内存中的数据保存到磁盘的过程,确保数据在Redis重启后能够恢复。Redis提供了两种主要的持久化方式:RDB(Redis Database)和AOF(Append Only File)。

10.1.1 持久化的重要性

  1. 数据安全:防止因服务器故障导致的数据丢失
  2. 业务连续性:确保服务重启后数据完整性
  3. 灾难恢复:提供数据备份和恢复机制
  4. 数据迁移:支持数据在不同环境间迁移

10.1.2 持久化方式对比

| 特性 | RDB | AOF |
| --- | --- | --- |
| 文件大小 | 较小(紧凑的二进制快照) | 较大(记录全部写命令) |
| 恢复速度 | 快 | 慢 |
| 数据完整性 | 可能丢失部分数据 | 更好的数据完整性 |
| 性能影响 | 较小 | 较大 |
| 配置复杂度 | 简单 | 相对复杂 |

10.2 RDB持久化

10.2.1 RDB工作原理

RDB持久化通过创建数据快照的方式保存数据,生成一个紧凑的二进制文件。

import redis
import time
import os
import subprocess
from typing import Dict, List, Any, Optional
import logging
import json
from datetime import datetime, timedelta

class RDBManager:
    """Manage Redis RDB (snapshot) persistence.

    Wraps snapshot creation (SAVE / BGSAVE), automatic save-point
    configuration, file-level restore, and basic RDB file analysis.
    """

    def __init__(self, redis_client: redis.Redis,
                 rdb_dir: str = "/var/lib/redis",
                 rdb_filename: str = "dump.rdb"):
        self.redis_client = redis_client
        self.rdb_dir = rdb_dir
        self.rdb_filename = rdb_filename
        self.rdb_path = os.path.join(rdb_dir, rdb_filename)
        self.logger = logging.getLogger(__name__)

        # Make sure the target directory exists.
        os.makedirs(rdb_dir, exist_ok=True)

    def create_snapshot(self, background: bool = True) -> Dict[str, Any]:
        """Create an RDB snapshot and wait for it to finish.

        Args:
            background: True issues BGSAVE (does not block clients) and
                polls LASTSAVE until the save completes; False issues SAVE,
                which blocks the whole server until done.

        Returns:
            Dict with 'success', timing information, and RDB file info.
        """
        start_time = time.time()

        try:
            if background:
                # Remember the timestamp of the previous completed save;
                # LASTSAVE only advances once the new BGSAVE has finished.
                # (Bug fix: the original compared lastsave() with itself,
                # which is virtually always equal, so the wait loop exited
                # immediately without actually waiting.)
                previous_save = self.redis_client.lastsave()
                result = self.redis_client.bgsave()

                while self.redis_client.lastsave() == previous_save:
                    time.sleep(0.1)
                    if time.time() - start_time > 300:  # 5-minute timeout
                        raise TimeoutError("Background save timeout")
            else:
                # Foreground save: blocks every client until finished.
                result = self.redis_client.save()

            end_time = time.time()

            # Collect information about the produced RDB file.
            rdb_info = self.get_rdb_info()

            return {
                'success': True,
                'method': 'background' if background else 'foreground',
                'duration': end_time - start_time,
                'rdb_info': rdb_info,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to create RDB snapshot: {e}")
            return {
                'success': False,
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }

    def get_rdb_info(self) -> Dict[str, Any]:
        """Return filesystem metadata for the configured RDB file."""
        try:
            if os.path.exists(self.rdb_path):
                stat = os.stat(self.rdb_path)
                return {
                    'file_path': self.rdb_path,
                    'file_size': stat.st_size,
                    'file_size_mb': round(stat.st_size / 1024 / 1024, 2),
                    'last_modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                    'exists': True
                }
            return {
                'file_path': self.rdb_path,
                'exists': False
            }
        except Exception as e:
            return {
                'error': str(e),
                'exists': False
            }

    def configure_rdb(self, save_points: List[tuple] = None) -> Dict[str, Any]:
        """Configure automatic RDB save points and file options.

        Args:
            save_points: list of (seconds, changes) pairs; defaults to the
                stock redis.conf policy 900/1, 300/10, 60/10000.
        """
        if save_points is None:
            # Default save points: after 900s with >=1 change, 300s with
            # >=10 changes, 60s with >=10000 changes (redis.conf defaults).
            save_points = [(900, 1), (300, 10), (60, 10000)]

        try:
            # Clear any existing save points before applying new ones.
            self.redis_client.config_set('save', '')

            save_config = ' '.join(f"{seconds} {changes}"
                                   for seconds, changes in save_points)
            self.redis_client.config_set('save', save_config)

            # RDB file name and working directory.
            self.redis_client.config_set('dbfilename', self.rdb_filename)
            self.redis_client.config_set('dir', self.rdb_dir)

            # LZF compression of string values inside the dump.
            self.redis_client.config_set('rdbcompression', 'yes')

            # CRC64 checksum appended to the file.
            self.redis_client.config_set('rdbchecksum', 'yes')

            return {
                'success': True,
                'save_points': save_points,
                'rdb_filename': self.rdb_filename,
                'rdb_dir': self.rdb_dir,
                'compression': True,
                'checksum': True
            }

        except Exception as e:
            self.logger.error(f"Failed to configure RDB: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def restore_from_rdb(self, rdb_file_path: str = None) -> Dict[str, Any]:
        """Put an RDB file in place for Redis to load on its next restart.

        The current RDB file (if any) is renamed to a timestamped backup
        first. Redis itself must be restarted to actually load the data.
        """
        rdb_file = rdb_file_path or self.rdb_path

        if not os.path.exists(rdb_file):
            return {
                'success': False,
                'error': f"RDB file not found: {rdb_file}"
            }

        try:
            # NOTE: stopping/starting the Redis service needs administrator
            # privileges and is environment specific — intentionally out of
            # scope here; adjust for your deployment.

            # Back up the current RDB file before overwriting it.
            if os.path.exists(self.rdb_path):
                backup_path = f"{self.rdb_path}.backup.{int(time.time())}"
                os.rename(self.rdb_path, backup_path)
                self.logger.info(f"Current RDB backed up to: {backup_path}")

            # Copy the replacement RDB file into place.
            import shutil
            shutil.copy2(rdb_file, self.rdb_path)

            return {
                'success': True,
                'restored_from': rdb_file,
                'restored_to': self.rdb_path,
                'message': 'RDB file restored. Please restart Redis to load the data.'
            }

        except Exception as e:
            self.logger.error(f"Failed to restore from RDB: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def analyze_rdb(self, rdb_file_path: str = None) -> Dict[str, Any]:
        """Analyze an RDB file, using the rdbtools CLI when available.

        Falls back to basic file / instance information when the `rdb`
        command (pip install rdbtools) is not installed.
        """
        rdb_file = rdb_file_path or self.rdb_path

        if not os.path.exists(rdb_file):
            return {
                'success': False,
                'error': f"RDB file not found: {rdb_file}"
            }

        try:
            analysis_result = {
                'file_info': self.get_rdb_info(),
                'redis_info': self._get_redis_info(),
                'analysis_timestamp': datetime.now().isoformat()
            }

            # Argument lists with shell=False avoid shell injection via the
            # file path (the original interpolated it into a shell string);
            # they also make the FileNotFoundError fallback below actually
            # fire when the `rdb` binary is missing (a shell would just
            # return exit code 127 instead of raising).
            try:
                # Per-key memory usage report.
                memory_result = subprocess.run(
                    ["rdb", "--command", "memory", rdb_file],
                    capture_output=True, text=True, timeout=30)

                if memory_result.returncode == 0:
                    analysis_result['memory_analysis'] = memory_result.stdout

                # Key statistics report.
                stats_result = subprocess.run(
                    ["rdb", "--command", "stats", rdb_file],
                    capture_output=True, text=True, timeout=30)

                if stats_result.returncode == 0:
                    analysis_result['key_statistics'] = stats_result.stdout

            except (subprocess.TimeoutExpired, FileNotFoundError):
                analysis_result['note'] = 'rdbtools not available for detailed analysis'

            return {
                'success': True,
                'analysis': analysis_result
            }

        except Exception as e:
            self.logger.error(f"Failed to analyze RDB: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _get_redis_info(self) -> Dict[str, Any]:
        """Return a condensed view of INFO for the connected instance."""
        try:
            info = self.redis_client.info()
            return {
                'redis_version': info.get('redis_version'),
                'used_memory': info.get('used_memory'),
                'used_memory_human': info.get('used_memory_human'),
                'connected_clients': info.get('connected_clients'),
                'total_commands_processed': info.get('total_commands_processed'),
                # INFO keys like 'db0', 'db1' hold per-database keyspace stats.
                'keyspace': {k: v for k, v in info.items() if k.startswith('db')}
            }
        except Exception as e:
            return {'error': str(e)}

10.2.2 RDB配置优化

```python
class RDBOptimizer:
    """Apply workload-oriented RDB presets and monitor BGSAVE behaviour."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.logger = logging.getLogger(__name__)

    def optimize_rdb_config(self, workload_type: str = 'balanced') -> Dict[str, Any]:
        """Apply an RDB configuration preset matching the workload.

        Supported types: high_write, balanced, high_read, memory_optimized.
        Returns the applied preset, or the list of valid types on error.
        """

        configs = {
            'high_write': {
                'save_points': [(1800, 1), (600, 100), (120, 10000)],
                'rdbcompression': 'yes',
                'rdbchecksum': 'yes',
                'stop_writes_on_bgsave_error': 'no',
                'description': '高写入负载,减少保存频率'
            },
            'balanced': {
                'save_points': [(900, 1), (300, 10), (60, 10000)],
                'rdbcompression': 'yes',
                'rdbchecksum': 'yes',
                'stop_writes_on_bgsave_error': 'yes',
                'description': '平衡的读写负载'
            },
            'high_read': {
                'save_points': [(600, 1), (180, 10), (30, 1000)],
                'rdbcompression': 'yes',
                'rdbchecksum': 'yes',
                'stop_writes_on_bgsave_error': 'yes',
                'description': '高读取负载,增加保存频率'
            },
            'memory_optimized': {
                'save_points': [(3600, 1), (1800, 10), (300, 10000)],
                'rdbcompression': 'yes',
                'rdbchecksum': 'no',
                'stop_writes_on_bgsave_error': 'no',
                'description': '内存优化,减少保存频率和校验'
            }
        }

        if workload_type not in configs:
            return {
                'success': False,
                'error': f"Unknown workload type: {workload_type}",
                'available_types': list(configs.keys())
            }

        config = configs[workload_type]

        try:
            # Apply the preset via CONFIG SET.
            save_config = ' '.join(f"{seconds} {changes}"
                                   for seconds, changes in config['save_points'])

            self.redis_client.config_set('save', save_config)
            self.redis_client.config_set('rdbcompression', config['rdbcompression'])
            self.redis_client.config_set('rdbchecksum', config['rdbchecksum'])
            self.redis_client.config_set('stop-writes-on-bgsave-error',
                                       config['stop_writes_on_bgsave_error'])

            return {
                'success': True,
                'workload_type': workload_type,
                'applied_config': config,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to optimize RDB config: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def monitor_rdb_performance(self, duration_minutes: int = 60) -> Dict[str, Any]:
        """Sample INFO for the given duration and aggregate BGSAVE stats.

        Polls every 10 seconds. A completed save is detected by a change
        of 'rdb_last_save_time' (the unix timestamp of the last successful
        save). Bug fix: the original keyed detection on
        'rdb_last_bgsave_time_sec' — a *duration* — so two consecutive
        saves that happened to take the same number of seconds were missed.
        """
        start_time = time.time()
        end_time = start_time + (duration_minutes * 60)

        performance_data = {
            'start_time': datetime.fromtimestamp(start_time).isoformat(),
            'duration_minutes': duration_minutes,
            'snapshots': [],
            'metrics': {
                'total_bgsave_count': 0,
                'total_bgsave_time': 0,
                'avg_bgsave_time': 0,
                'max_bgsave_time': 0,
                'min_bgsave_time': float('inf')
            }
        }

        last_save_timestamp = None

        try:
            while time.time() < end_time:
                current_time = time.time()
                info = self.redis_client.info()

                # Timestamp of the last successful save; changes once per
                # completed save, even if durations repeat.
                current_save_timestamp = info.get('rdb_last_save_time', 0)
                bgsave_duration = info.get('rdb_last_bgsave_time_sec', 0)

                if last_save_timestamp is None:
                    # First sample just establishes the baseline.
                    last_save_timestamp = current_save_timestamp
                elif current_save_timestamp != last_save_timestamp:
                    # A background save completed since the previous sample.
                    performance_data['snapshots'].append({
                        'timestamp': datetime.fromtimestamp(current_time).isoformat(),
                        'bgsave_time': bgsave_duration,
                        'rdb_last_save_time': current_save_timestamp,
                        'rdb_changes_since_last_save': info.get('rdb_changes_since_last_save'),
                        'used_memory': info.get('used_memory'),
                        'used_memory_rss': info.get('used_memory_rss')
                    })

                    # Update the running aggregates.
                    metrics = performance_data['metrics']
                    metrics['total_bgsave_count'] += 1
                    metrics['total_bgsave_time'] += bgsave_duration
                    metrics['max_bgsave_time'] = max(metrics['max_bgsave_time'],
                                                     bgsave_duration)
                    metrics['min_bgsave_time'] = min(metrics['min_bgsave_time'],
                                                     bgsave_duration)

                    last_save_timestamp = current_save_timestamp

                time.sleep(10)  # sample every 10 seconds

            # Derive the average once sampling is done.
            metrics = performance_data['metrics']
            if metrics['total_bgsave_count'] > 0:
                metrics['avg_bgsave_time'] = (
                    metrics['total_bgsave_time'] / metrics['total_bgsave_count']
                )

            # No saves observed: normalise the sentinel minimum to 0.
            if metrics['min_bgsave_time'] == float('inf'):
                metrics['min_bgsave_time'] = 0

            performance_data['end_time'] = datetime.now().isoformat()

            return {
                'success': True,
                'performance_data': performance_data
            }

        except Exception as e:
            self.logger.error(f"Failed to monitor RDB performance: {e}")
            return {
                'success': False,
                'error': str(e),
                'partial_data': performance_data
            }
```
## 10.3 AOF持久化

### 10.3.1 AOF工作原理

AOF(Append Only File)持久化通过记录每个写操作命令来保存数据。

```python
class AOFManager:
    """Manage Redis AOF (append-only file) persistence.

    Covers enabling/disabling AOF, triggering and waiting for rewrites,
    reading status from INFO, tuning configuration, and repairing a
    truncated/corrupt AOF with redis-check-aof.
    """

    def __init__(self, redis_client: redis.Redis,
                 aof_dir: str = "/var/lib/redis",
                 aof_filename: str = "appendonly.aof"):
        self.redis_client = redis_client
        self.aof_dir = aof_dir
        self.aof_filename = aof_filename
        self.aof_path = os.path.join(aof_dir, aof_filename)
        self.logger = logging.getLogger(__name__)

        # Make sure the target directory exists.
        os.makedirs(aof_dir, exist_ok=True)

    def enable_aof(self, fsync_policy: str = 'everysec') -> Dict[str, Any]:
        """Turn on AOF persistence with sensible defaults.

        Args:
            fsync_policy: one of 'always', 'everysec', 'no'; invalid values
                silently fall back to 'everysec'.
        """
        try:
            # Enable AOF.
            self.redis_client.config_set('appendonly', 'yes')

            # AOF file name and working directory.
            self.redis_client.config_set('appendfilename', self.aof_filename)
            self.redis_client.config_set('dir', self.aof_dir)

            # Validate the fsync policy, falling back to the default.
            valid_policies = ['always', 'everysec', 'no']
            if fsync_policy not in valid_policies:
                fsync_policy = 'everysec'

            self.redis_client.config_set('appendfsync', fsync_policy)

            # Automatic rewrite: when the file doubles in size and is at
            # least 64mb.
            self.redis_client.config_set('auto-aof-rewrite-percentage', '100')
            self.redis_client.config_set('auto-aof-rewrite-min-size', '64mb')

            # Tolerate a truncated tail when loading the AOF at startup.
            self.redis_client.config_set('aof-load-truncated', 'yes')

            return {
                'success': True,
                'aof_enabled': True,
                'aof_filename': self.aof_filename,
                'aof_dir': self.aof_dir,
                'fsync_policy': fsync_policy,
                'auto_rewrite_enabled': True
            }

        except Exception as e:
            self.logger.error(f"Failed to enable AOF: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def disable_aof(self) -> Dict[str, Any]:
        """Turn off AOF persistence (RDB, if configured, still applies)."""
        try:
            self.redis_client.config_set('appendonly', 'no')

            return {
                'success': True,
                'aof_enabled': False,
                'message': 'AOF persistence disabled'
            }

        except Exception as e:
            self.logger.error(f"Failed to disable AOF: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def rewrite_aof(self, background: bool = True) -> Dict[str, Any]:
        """Trigger an AOF rewrite (compaction).

        Args:
            background: True issues BGREWRITEAOF and polls INFO until the
                rewrite finishes. NOTE: Redis has no true foreground
                rewrite command — with background=False we still issue
                BGREWRITEAOF but return without waiting for completion.
        """
        start_time = time.time()

        try:
            if background:
                result = self.redis_client.bgrewriteaof()

                # Wait until the rewrite is neither running nor scheduled.
                # (Checking aof_rewrite_scheduled too avoids exiting early
                # when the rewrite was queued behind an in-flight BGSAVE.)
                while True:
                    info = self.redis_client.info()
                    if (info.get('aof_rewrite_in_progress', 0) == 0
                            and info.get('aof_rewrite_scheduled', 0) == 0):
                        break
                    time.sleep(0.5)
                    if time.time() - start_time > 600:  # 10-minute timeout
                        raise TimeoutError("AOF rewrite timeout")
            else:
                # Fire-and-forget: same BGREWRITEAOF command, no waiting.
                result = self.redis_client.execute_command('BGREWRITEAOF')

            end_time = time.time()

            # Collect current AOF status for the caller.
            aof_info = self.get_aof_info()

            return {
                'success': True,
                'method': 'background' if background else 'foreground',
                'duration': end_time - start_time,
                'aof_info': aof_info,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to rewrite AOF: {e}")
            return {
                'success': False,
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }

    def get_aof_info(self) -> Dict[str, Any]:
        """Combine INFO persistence fields with AOF file metadata."""
        try:
            info = self.redis_client.info('persistence')

            aof_info = {
                'aof_enabled': info.get('aof_enabled', 0) == 1,
                'aof_rewrite_in_progress': info.get('aof_rewrite_in_progress', 0) == 1,
                'aof_rewrite_scheduled': info.get('aof_rewrite_scheduled', 0) == 1,
                'aof_last_rewrite_time_sec': info.get('aof_last_rewrite_time_sec', -1),
                'aof_current_rewrite_time_sec': info.get('aof_current_rewrite_time_sec', -1),
                'aof_last_bgrewrite_status': info.get('aof_last_bgrewrite_status', 'unknown'),
                'aof_last_write_status': info.get('aof_last_write_status', 'unknown'),
                'aof_current_size': info.get('aof_current_size', 0),
                'aof_base_size': info.get('aof_base_size', 0),
                'aof_pending_rewrite': info.get('aof_pending_rewrite', 0),
                'aof_buffer_length': info.get('aof_buffer_length', 0),
                'aof_rewrite_buffer_length': info.get('aof_rewrite_buffer_length', 0),
                'aof_pending_bio_fsync': info.get('aof_pending_bio_fsync', 0),
                'aof_delayed_fsync': info.get('aof_delayed_fsync', 0)
            }

            # Add filesystem-level information when the file exists.
            if os.path.exists(self.aof_path):
                stat = os.stat(self.aof_path)
                aof_info.update({
                    'file_path': self.aof_path,
                    'file_size': stat.st_size,
                    'file_size_mb': round(stat.st_size / 1024 / 1024, 2),
                    'last_modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                    'file_exists': True
                })
            else:
                aof_info.update({
                    'file_path': self.aof_path,
                    'file_exists': False
                })

            return aof_info

        except Exception as e:
            return {
                'error': str(e),
                'file_path': self.aof_path
            }

    def configure_aof(self, fsync_policy: str = 'everysec',
                     auto_rewrite_percentage: int = 100,
                     auto_rewrite_min_size: str = '64mb') -> Dict[str, Any]:
        """Tune AOF parameters.

        Args:
            fsync_policy: 'always' | 'everysec' | 'no' (invalid → error).
            auto_rewrite_percentage: size-growth percentage that triggers
                an automatic rewrite.
            auto_rewrite_min_size: minimum AOF size before auto rewrite.
        """
        try:
            # Reject invalid fsync policies with an explicit error.
            valid_policies = ['always', 'everysec', 'no']
            if fsync_policy not in valid_policies:
                return {
                    'success': False,
                    'error': f"Invalid fsync policy: {fsync_policy}",
                    'valid_policies': valid_policies
                }

            self.redis_client.config_set('appendfsync', fsync_policy)

            # Automatic rewrite thresholds.
            self.redis_client.config_set('auto-aof-rewrite-percentage',
                                       str(auto_rewrite_percentage))
            self.redis_client.config_set('auto-aof-rewrite-min-size',
                                       auto_rewrite_min_size)

            # Tolerate truncated tails and use the RDB-preamble hybrid
            # format for rewrites (Redis 4.0+).
            self.redis_client.config_set('aof-load-truncated', 'yes')
            self.redis_client.config_set('aof-use-rdb-preamble', 'yes')

            return {
                'success': True,
                'fsync_policy': fsync_policy,
                'auto_rewrite_percentage': auto_rewrite_percentage,
                'auto_rewrite_min_size': auto_rewrite_min_size,
                'load_truncated': True,
                'use_rdb_preamble': True
            }

        except Exception as e:
            self.logger.error(f"Failed to configure AOF: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def repair_aof(self, backup: bool = True) -> Dict[str, Any]:
        """Repair a corrupt AOF file with `redis-check-aof --fix`.

        Args:
            backup: when True, copy the original file aside before fixing.
        """
        if not os.path.exists(self.aof_path):
            return {
                'success': False,
                'error': f"AOF file not found: {self.aof_path}"
            }

        # Initialise so the success dict below never references an
        # unbound name when backup=False.
        backup_path = None

        try:
            # Back up the original file first.
            if backup:
                backup_path = f"{self.aof_path}.backup.{int(time.time())}"
                import shutil
                shutil.copy2(self.aof_path, backup_path)
                self.logger.info(f"AOF file backed up to: {backup_path}")

            # Argument list with shell=False avoids shell injection via the
            # path and lets the FileNotFoundError handler below fire when
            # the binary is missing (a shell would just exit with 127).
            result = subprocess.run(
                ["redis-check-aof", "--fix", self.aof_path],
                capture_output=True, text=True, timeout=60)

            if result.returncode == 0:
                return {
                    'success': True,
                    'repaired_file': self.aof_path,
                    'backup_file': backup_path,
                    'repair_output': result.stdout,
                    'message': 'AOF file repaired successfully'
                }
            else:
                return {
                    'success': False,
                    'error': f"AOF repair failed: {result.stderr}",
                    'repair_output': result.stdout
                }

        except subprocess.TimeoutExpired:
            return {
                'success': False,
                'error': 'AOF repair timeout'
            }
        except FileNotFoundError:
            return {
                'success': False,
                'error': 'redis-check-aof command not found'
            }
        except Exception as e:
            self.logger.error(f"Failed to repair AOF: {e}")
            return {
                'success': False,
                'error': str(e)
            }
```
### 10.3.2 AOF性能优化

```python
class AOFOptimizer:
    """Apply workload-oriented AOF presets and monitor rewrite behaviour."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.logger = logging.getLogger(__name__)

    def optimize_aof_config(self, workload_type: str = 'balanced') -> Dict[str, Any]:
        """Apply an AOF configuration preset matching the workload.

        Supported types: high_performance, balanced, high_durability,
        memory_optimized. Returns the applied preset, or the list of
        valid types on error.
        """

        configs = {
            'high_performance': {
                'appendfsync': 'no',
                'auto_aof_rewrite_percentage': 200,
                'auto_aof_rewrite_min_size': '128mb',
                'aof_use_rdb_preamble': 'yes',
                'description': '高性能,可能丢失更多数据'
            },
            'balanced': {
                'appendfsync': 'everysec',
                'auto_aof_rewrite_percentage': 100,
                'auto_aof_rewrite_min_size': '64mb',
                'aof_use_rdb_preamble': 'yes',
                'description': '平衡性能和数据安全'
            },
            'high_durability': {
                'appendfsync': 'always',
                'auto_aof_rewrite_percentage': 50,
                'auto_aof_rewrite_min_size': '32mb',
                'aof_use_rdb_preamble': 'no',
                'description': '高数据安全性,性能较低'
            },
            'memory_optimized': {
                'appendfsync': 'everysec',
                'auto_aof_rewrite_percentage': 50,
                'auto_aof_rewrite_min_size': '32mb',
                'aof_use_rdb_preamble': 'yes',
                'description': '内存优化,频繁重写'
            }
        }

        if workload_type not in configs:
            return {
                'success': False,
                'error': f"Unknown workload type: {workload_type}",
                'available_types': list(configs.keys())
            }

        config = configs[workload_type]

        try:
            # Apply the preset via CONFIG SET.
            self.redis_client.config_set('appendfsync', config['appendfsync'])
            self.redis_client.config_set('auto-aof-rewrite-percentage',
                                       str(config['auto_aof_rewrite_percentage']))
            self.redis_client.config_set('auto-aof-rewrite-min-size',
                                       config['auto_aof_rewrite_min_size'])
            self.redis_client.config_set('aof-use-rdb-preamble',
                                       config['aof_use_rdb_preamble'])

            return {
                'success': True,
                'workload_type': workload_type,
                'applied_config': config,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to optimize AOF config: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def monitor_aof_performance(self, duration_minutes: int = 60) -> Dict[str, Any]:
        """Sample INFO persistence for the given duration; aggregate stats.

        Polls every 10 seconds. A finished rewrite is detected by the
        'aof_rewrite_in_progress' flag dropping from 1 to 0 between
        samples. Bug fix: the original compared the last rewrite
        *duration* against a counter variable, so rewrites whose duration
        matched the previous one were silently missed.
        'delayed_fsync_count' now reports the growth of the server's
        cumulative aof_delayed_fsync counter *within this window* rather
        than the raw lifetime value.
        """
        start_time = time.time()
        end_time = start_time + (duration_minutes * 60)

        performance_data = {
            'start_time': datetime.fromtimestamp(start_time).isoformat(),
            'duration_minutes': duration_minutes,
            'snapshots': [],
            'metrics': {
                'total_rewrites': 0,
                'total_rewrite_time': 0,
                'avg_rewrite_time': 0,
                'max_rewrite_time': 0,
                'min_rewrite_time': float('inf'),
                'aof_size_growth': 0,
                'delayed_fsync_count': 0
            }
        }

        initial_info = self.redis_client.info('persistence')
        initial_aof_size = initial_info.get('aof_current_size', 0)
        initial_delayed_fsync = initial_info.get('aof_delayed_fsync', 0)
        rewrite_was_in_progress = bool(initial_info.get('aof_rewrite_in_progress', 0))

        try:
            while time.time() < end_time:
                current_time = time.time()
                info = self.redis_client.info('persistence')

                # Record a status snapshot for this sample.
                snapshot = {
                    'timestamp': datetime.fromtimestamp(current_time).isoformat(),
                    'aof_current_size': info.get('aof_current_size', 0),
                    'aof_base_size': info.get('aof_base_size', 0),
                    'aof_rewrite_in_progress': info.get('aof_rewrite_in_progress', 0),
                    'aof_last_rewrite_time_sec': info.get('aof_last_rewrite_time_sec', -1),
                    'aof_delayed_fsync': info.get('aof_delayed_fsync', 0),
                    'aof_buffer_length': info.get('aof_buffer_length', 0),
                    'used_memory': info.get('used_memory', 0)
                }

                performance_data['snapshots'].append(snapshot)

                # Detect a rewrite completing: in-progress flag goes 1 -> 0.
                in_progress = bool(info.get('aof_rewrite_in_progress', 0))
                if rewrite_was_in_progress and not in_progress:
                    rewrite_duration = info.get('aof_last_rewrite_time_sec', -1)
                    if rewrite_duration >= 0:
                        metrics = performance_data['metrics']
                        metrics['total_rewrites'] += 1
                        metrics['total_rewrite_time'] += rewrite_duration
                        metrics['max_rewrite_time'] = max(metrics['max_rewrite_time'],
                                                          rewrite_duration)
                        metrics['min_rewrite_time'] = min(metrics['min_rewrite_time'],
                                                          rewrite_duration)
                rewrite_was_in_progress = in_progress

                # Delayed fsyncs accumulated during the monitoring window.
                delayed_fsync = info.get('aof_delayed_fsync', 0)
                performance_data['metrics']['delayed_fsync_count'] = max(
                    0, delayed_fsync - initial_delayed_fsync)

                time.sleep(10)  # sample every 10 seconds

            # Final aggregates.
            final_info = self.redis_client.info('persistence')
            final_aof_size = final_info.get('aof_current_size', 0)
            performance_data['metrics']['aof_size_growth'] = final_aof_size - initial_aof_size

            metrics = performance_data['metrics']
            if metrics['total_rewrites'] > 0:
                metrics['avg_rewrite_time'] = (
                    metrics['total_rewrite_time'] / metrics['total_rewrites']
                )

            # No rewrites observed: normalise the sentinel minimum to 0.
            if metrics['min_rewrite_time'] == float('inf'):
                metrics['min_rewrite_time'] = 0

            performance_data['end_time'] = datetime.now().isoformat()

            return {
                'success': True,
                'performance_data': performance_data
            }

        except Exception as e:
            self.logger.error(f"Failed to monitor AOF performance: {e}")
            return {
                'success': False,
                'error': str(e),
                'partial_data': performance_data
            }

# Usage example: exercise the RDB/AOF managers against a local Redis.
if __name__ == "__main__":
    client = redis.Redis(decode_responses=True)

    # --- RDB management ---
    rdb_manager = RDBManager(client)

    # Configure automatic save points, take a snapshot, inspect the file.
    save_points = [(900, 1), (300, 10), (60, 10000)]
    print(f"RDB configuration: {rdb_manager.configure_rdb(save_points)}")
    print(f"Snapshot result: {rdb_manager.create_snapshot(background=True)}")
    print(f"RDB info: {rdb_manager.get_rdb_info()}")

    # --- AOF management ---
    aof_manager = AOFManager(client)

    # Enable, tune, and inspect AOF persistence.
    print(f"AOF enable result: {aof_manager.enable_aof(fsync_policy='everysec')}")
    aof_settings = aof_manager.configure_aof(
        fsync_policy='everysec',
        auto_rewrite_percentage=100,
        auto_rewrite_min_size='64mb'
    )
    print(f"AOF configuration: {aof_settings}")
    print(f"AOF info: {aof_manager.get_aof_info()}")

    # --- Workload-based tuning ---
    print(f"RDB optimization: {RDBOptimizer(client).optimize_rdb_config('balanced')}")
    print(f"AOF optimization: {AOFOptimizer(client).optimize_aof_config('balanced')}")
```
10.4 混合持久化

10.4.1 RDB + AOF混合模式

Redis 4.0引入了混合持久化模式,结合了RDB和AOF的优点。

class HybridPersistenceManager:
    """Manage Redis hybrid persistence (RDB preamble + AOF), added in Redis 4.0.

    Wraps the RDBManager and AOFManager defined earlier in this file and
    toggles the server's `aof-use-rdb-preamble` option so AOF rewrites store
    a compact RDB snapshot followed by the incremental AOF tail.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.rdb_manager = RDBManager(redis_client)
        self.aof_manager = AOFManager(redis_client)
        self.logger = logging.getLogger(__name__)

    def enable_hybrid_persistence(self,
                                  rdb_save_points: Optional[List[tuple]] = None,
                                  aof_fsync_policy: str = 'everysec') -> Dict[str, Any]:
        """Enable hybrid persistence: RDB save points, AOF, and RDB preamble.

        Args:
            rdb_save_points: (seconds, changes) pairs passed through to
                RDBManager.configure_rdb; None defers to that method's own
                default handling.
            aof_fsync_policy: value for `appendfsync` ('always', 'everysec',
                or 'no').

        Returns:
            Dict with 'success'; on success also the RDB/AOF sub-results and
            a timestamp, on failure an 'error' message.
        """
        try:
            # Configure RDB save points first; abort early on failure.
            rdb_result = self.rdb_manager.configure_rdb(rdb_save_points)
            if not rdb_result['success']:
                return {
                    'success': False,
                    'error': f"Failed to configure RDB: {rdb_result['error']}"
                }

            # Then enable AOF with the requested fsync policy.
            aof_result = self.aof_manager.enable_aof(aof_fsync_policy)
            if not aof_result['success']:
                return {
                    'success': False,
                    'error': f"Failed to enable AOF: {aof_result['error']}"
                }

            # Finally switch AOF rewrites to carry an RDB preamble (hybrid mode).
            self.redis_client.config_set('aof-use-rdb-preamble', 'yes')

            return {
                'success': True,
                'mode': 'hybrid',
                'rdb_config': rdb_result,
                'aof_config': aof_result,
                'aof_use_rdb_preamble': True,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to enable hybrid persistence: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_persistence_status(self) -> Dict[str, Any]:
        """Report RDB, AOF, and hybrid-mode status from INFO persistence.

        Returns:
            Dict with 'success' plus 'rdb_status', 'aof_status', and
            'hybrid_status' sub-dicts on success; 'error' on failure.
        """
        try:
            info = self.redis_client.info('persistence')

            # RDB status: enabled iff the 'save' directive is non-empty.
            rdb_status = {
                'enabled': len(self.redis_client.config_get('save')['save']) > 0,
                'last_save_time': info.get('rdb_last_save_time', 0),
                'last_bgsave_status': info.get('rdb_last_bgsave_status', 'unknown'),
                'last_bgsave_time_sec': info.get('rdb_last_bgsave_time_sec', -1),
                'current_bgsave_time_sec': info.get('rdb_current_bgsave_time_sec', -1),
                'bgsave_in_progress': info.get('rdb_bgsave_in_progress', 0) == 1,
                'changes_since_last_save': info.get('rdb_changes_since_last_save', 0)
            }

            # AOF status, including whether rewrites use an RDB preamble.
            aof_status = {
                'enabled': info.get('aof_enabled', 0) == 1,
                'rewrite_in_progress': info.get('aof_rewrite_in_progress', 0) == 1,
                'last_rewrite_time_sec': info.get('aof_last_rewrite_time_sec', -1),
                'current_rewrite_time_sec': info.get('aof_current_rewrite_time_sec', -1),
                'last_bgrewrite_status': info.get('aof_last_bgrewrite_status', 'unknown'),
                'last_write_status': info.get('aof_last_write_status', 'unknown'),
                'current_size': info.get('aof_current_size', 0),
                'base_size': info.get('aof_base_size', 0),
                'use_rdb_preamble': self.redis_client.config_get('aof-use-rdb-preamble')['aof-use-rdb-preamble'] == 'yes'
            }

            # Hybrid mode requires both mechanisms on plus the preamble flag.
            hybrid_status = {
                'enabled': rdb_status['enabled'] and aof_status['enabled'],
                'rdb_preamble_enabled': aof_status['use_rdb_preamble']
            }

            return {
                'success': True,
                'rdb_status': rdb_status,
                'aof_status': aof_status,
                'hybrid_status': hybrid_status,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to get persistence status: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def optimize_hybrid_config(self, workload_type: str = 'balanced') -> Dict[str, Any]:
        """Apply a preset hybrid-persistence profile for a workload type.

        Args:
            workload_type: one of 'high_write', 'balanced', 'high_read',
                'memory_critical'.

        Returns:
            Dict with 'success' and the applied config; on an unknown type,
            'error' plus 'available_types'.
        """

        configs = {
            'high_write': {
                'rdb_save_points': [(1800, 1), (600, 100), (120, 10000)],
                'aof_fsync': 'everysec',
                'aof_rewrite_percentage': 200,
                'aof_rewrite_min_size': '128mb',
                'description': '高写入负载优化'
            },
            'balanced': {
                'rdb_save_points': [(900, 1), (300, 10), (60, 10000)],
                'aof_fsync': 'everysec',
                'aof_rewrite_percentage': 100,
                'aof_rewrite_min_size': '64mb',
                'description': '平衡配置'
            },
            'high_read': {
                'rdb_save_points': [(600, 1), (180, 10), (30, 1000)],
                'aof_fsync': 'everysec',
                'aof_rewrite_percentage': 100,
                'aof_rewrite_min_size': '64mb',
                'description': '高读取负载优化'
            },
            'memory_critical': {
                'rdb_save_points': [(3600, 1), (1800, 10), (300, 10000)],
                'aof_fsync': 'everysec',
                'aof_rewrite_percentage': 50,
                'aof_rewrite_min_size': '32mb',
                'description': '内存关键优化'
            }
        }

        if workload_type not in configs:
            return {
                'success': False,
                'error': f"Unknown workload type: {workload_type}",
                'available_types': list(configs.keys())
            }

        config = configs[workload_type]

        try:
            # RDB: flatten save points into the "sec changes sec changes ..."
            # form the `save` directive expects.
            rdb_save_config = ' '.join([f"{seconds} {changes}"
                                      for seconds, changes in config['rdb_save_points']])
            self.redis_client.config_set('save', rdb_save_config)

            # AOF: fsync policy and automatic-rewrite thresholds.
            self.redis_client.config_set('appendfsync', config['aof_fsync'])
            self.redis_client.config_set('auto-aof-rewrite-percentage',
                                       str(config['aof_rewrite_percentage']))
            self.redis_client.config_set('auto-aof-rewrite-min-size',
                                       config['aof_rewrite_min_size'])

            # Turn on hybrid mode.
            self.redis_client.config_set('aof-use-rdb-preamble', 'yes')

            return {
                'success': True,
                'workload_type': workload_type,
                'applied_config': config,
                'hybrid_mode_enabled': True,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to optimize hybrid config: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def create_full_backup(self, backup_dir: Optional[str] = None) -> Dict[str, Any]:
        """Create a full backup: RDB file + AOF file + server config dump.

        Args:
            backup_dir: destination directory; defaults to a timestamped
                directory under /tmp.

        Returns:
            Dict with 'success' and 'backup_info' (files copied, sizes) on
            success; 'error' and the attempted 'backup_dir' on failure.
        """
        import shutil  # single local import, used for both RDB and AOF copies

        if backup_dir is None:
            backup_dir = f"/tmp/redis_backup_{int(time.time())}"

        os.makedirs(backup_dir, exist_ok=True)

        try:
            backup_info = {
                'backup_dir': backup_dir,
                'timestamp': datetime.now().isoformat(),
                'files': []
            }

            # RDB: take a fresh snapshot, then copy the dump file if present.
            # Best-effort: a failed snapshot skips the RDB copy rather than
            # aborting the whole backup.
            rdb_result = self.rdb_manager.create_snapshot(background=True)
            if rdb_result['success']:
                rdb_backup_path = os.path.join(backup_dir, 'dump.rdb')
                if os.path.exists(self.rdb_manager.rdb_path):
                    shutil.copy2(self.rdb_manager.rdb_path, rdb_backup_path)
                    backup_info['files'].append({
                        'type': 'rdb',
                        'source': self.rdb_manager.rdb_path,
                        'backup': rdb_backup_path,
                        'size': os.path.getsize(rdb_backup_path)
                    })

            # AOF: copy the append-only file if it exists.
            if os.path.exists(self.aof_manager.aof_path):
                aof_backup_path = os.path.join(backup_dir, 'appendonly.aof')
                shutil.copy2(self.aof_manager.aof_path, aof_backup_path)
                backup_info['files'].append({
                    'type': 'aof',
                    'source': self.aof_manager.aof_path,
                    'backup': aof_backup_path,
                    'size': os.path.getsize(aof_backup_path)
                })

            # Config: dump server version, persistence status, and all
            # CONFIG GET settings alongside the data files.
            config_backup_path = os.path.join(backup_dir, 'redis_config.json')
            config_info = {
                'redis_version': self.redis_client.info()['redis_version'],
                'persistence_status': self.get_persistence_status(),
                'config': dict(self.redis_client.config_get('*'))
            }

            with open(config_backup_path, 'w') as f:
                json.dump(config_info, f, indent=2, default=str)

            backup_info['files'].append({
                'type': 'config',
                'backup': config_backup_path,
                'size': os.path.getsize(config_backup_path)
            })

            backup_info['total_size'] = sum(f['size'] for f in backup_info['files'])
            backup_info['total_size_mb'] = round(backup_info['total_size'] / 1024 / 1024, 2)

            return {
                'success': True,
                'backup_info': backup_info
            }

        except Exception as e:
            self.logger.error(f"Failed to create full backup: {e}")
            return {
                'success': False,
                'error': str(e),
                'backup_dir': backup_dir
            }

下一章将介绍Redis集群和高可用性配置。