## 10.1 概述
Redis持久化是将内存中的数据保存到磁盘的过程,确保数据在Redis重启后能够恢复。Redis提供了两种主要的持久化方式:RDB(Redis Database)和AOF(Append Only File)。
### 10.1.1 持久化的重要性
- 数据安全:防止因服务器故障导致的数据丢失
- 业务连续性:确保服务重启后数据完整性
- 灾难恢复:提供数据备份和恢复机制
- 数据迁移:支持数据在不同环境间迁移
### 10.1.2 持久化方式对比
特性 | RDB | AOF |
---|---|---|
文件大小 | 小 | 大 |
恢复速度 | 快 | 慢 |
数据完整性 | 可能丢失部分数据 | 更好的数据完整性 |
性能影响 | 较小 | 较大 |
配置复杂度 | 简单 | 相对复杂 |
## 10.2 RDB持久化
### 10.2.1 RDB工作原理
RDB持久化通过创建数据快照的方式保存数据,生成一个紧凑的二进制文件。
import redis
import time
import os
import subprocess
from typing import Dict, List, Any, Optional
import logging
import json
from datetime import datetime, timedelta
class RDBManager:
    """Create, configure, restore and inspect Redis RDB snapshots.

    Server-side operations go through ``redis_client``; the file helpers
    read ``rdb_dir/rdb_filename`` from the local filesystem.
    """

    def __init__(self, redis_client: "redis.Redis",
                 rdb_dir: str = "/var/lib/redis",
                 rdb_filename: str = "dump.rdb"):
        """Remember the client and snapshot location; create ``rdb_dir`` if missing."""
        self.redis_client = redis_client
        self.rdb_dir = rdb_dir
        self.rdb_filename = rdb_filename
        self.rdb_path = os.path.join(rdb_dir, rdb_filename)
        self.logger = logging.getLogger(__name__)
        os.makedirs(rdb_dir, exist_ok=True)

    def create_snapshot(self, background: bool = True,
                        timeout: float = 300.0,
                        poll_interval: float = 0.1) -> Dict[str, Any]:
        """Create an RDB snapshot and wait for it to finish.

        Args:
            background: Use BGSAVE (non-blocking for other clients) when
                true, otherwise the blocking SAVE command.
            timeout: Maximum seconds to wait for a background save.
            poll_interval: Seconds between progress checks.

        Returns:
            A dict with ``success``; on success also ``method``,
            ``duration``, ``rdb_info`` and ``timestamp``; on failure
            ``error`` and ``timestamp``.
        """
        start_time = time.time()
        try:
            if background:
                self.redis_client.bgsave()
                self._wait_for_bgsave(start_time + timeout, poll_interval)
            else:
                self.redis_client.save()
            end_time = time.time()
            return {
                'success': True,
                'method': 'background' if background else 'foreground',
                'duration': end_time - start_time,
                'rdb_info': self.get_rdb_info(),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to create RDB snapshot: {e}")
            return {
                'success': False,
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }

    def _wait_for_bgsave(self, deadline: float, poll_interval: float) -> None:
        """Block until no background save is running.

        Progress is read from INFO rather than by comparing LASTSAVE values:
        LASTSAVE never changes when the save fails, so it cannot distinguish
        "still running" from "failed".

        Raises:
            TimeoutError: the save is still running at ``deadline``.
            RuntimeError: the server reports the last background save failed.
        """
        while True:
            info = self.redis_client.info('persistence')
            if not info.get('rdb_bgsave_in_progress', 0):
                break
            if time.time() > deadline:
                raise TimeoutError("Background save timeout")
            time.sleep(poll_interval)
        status = info.get('rdb_last_bgsave_status', 'ok')
        if status != 'ok':
            raise RuntimeError(f"Background save failed: {status}")

    def get_rdb_info(self, rdb_file_path: Optional[str] = None) -> Dict[str, Any]:
        """Return size and modification time of an RDB file.

        Args:
            rdb_file_path: File to inspect; defaults to the managed
                ``self.rdb_path``.

        Returns:
            A dict with ``exists`` and, when the file is present, its path,
            size in bytes and MB, and last-modified time. Errors are
            reported under ``error`` instead of being raised.
        """
        path = rdb_file_path or self.rdb_path
        try:
            if not os.path.exists(path):
                return {
                    'file_path': path,
                    'exists': False
                }
            stat = os.stat(path)
            return {
                'file_path': path,
                'file_size': stat.st_size,
                'file_size_mb': round(stat.st_size / 1024 / 1024, 2),
                'last_modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                'exists': True
            }
        except Exception as e:
            return {
                'error': str(e),
                'exists': False
            }

    def configure_rdb(self, save_points: Optional[List[tuple]] = None) -> Dict[str, Any]:
        """Apply automatic-save rules and RDB file options via CONFIG SET.

        Args:
            save_points: ``(seconds, changes)`` pairs; defaults to
                ``[(900, 1), (300, 10), (60, 10000)]``.

        Returns:
            A dict with ``success`` and either the applied settings or
            ``error``.
        """
        if save_points is None:
            # 1 change within 900 s, 10 within 300 s, 10000 within 60 s.
            save_points = [(900, 1), (300, 10), (60, 10000)]
        try:
            # A single CONFIG SET replaces the whole rule list, so the rules
            # are never left cleared if a later call fails.
            save_config = ' '.join(f"{seconds} {changes}" for seconds, changes in save_points)
            self.redis_client.config_set('save', save_config)
            self.redis_client.config_set('dbfilename', self.rdb_filename)
            self.redis_client.config_set('dir', self.rdb_dir)
            self.redis_client.config_set('rdbcompression', 'yes')
            self.redis_client.config_set('rdbchecksum', 'yes')
            return {
                'success': True,
                'save_points': save_points,
                'rdb_filename': self.rdb_filename,
                'rdb_dir': self.rdb_dir,
                'compression': True,
                'checksum': True
            }
        except Exception as e:
            self.logger.error(f"Failed to configure RDB: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def restore_from_rdb(self, rdb_file_path: Optional[str] = None) -> Dict[str, Any]:
        """Put an RDB file in place so Redis loads it on its next start.

        The current file is renamed to ``<rdb_path>.backup.<epoch>`` first.
        Redis is not stopped or restarted here; the caller must do that.

        Args:
            rdb_file_path: Snapshot to restore; defaults to ``self.rdb_path``,
                in which case the file is already in place and is left
                untouched.

        Returns:
            A dict with ``success`` and either the restore details
            (including ``backup_path``, possibly ``None``) or ``error``.
        """
        import shutil

        rdb_file = rdb_file_path or self.rdb_path
        if not os.path.exists(rdb_file):
            return {
                'success': False,
                'error': f"RDB file not found: {rdb_file}"
            }
        backup_path = None
        try:
            # Restoring the managed file onto itself must not move it away:
            # the copy source would no longer exist afterwards.
            already_in_place = (os.path.exists(self.rdb_path)
                                and os.path.samefile(rdb_file, self.rdb_path))
            if not already_in_place:
                if os.path.exists(self.rdb_path):
                    backup_path = f"{self.rdb_path}.backup.{int(time.time())}"
                    os.rename(self.rdb_path, backup_path)
                    self.logger.info(f"Current RDB backed up to: {backup_path}")
                shutil.copy2(rdb_file, self.rdb_path)
            return {
                'success': True,
                'restored_from': rdb_file,
                'restored_to': self.rdb_path,
                'backup_path': backup_path,
                'message': 'RDB file restored. Please restart Redis to load the data.'
            }
        except Exception as e:
            self.logger.error(f"Failed to restore from RDB: {e}")
            if backup_path is not None and os.path.exists(backup_path):
                # Put the previous snapshot back rather than leaving a
                # missing or partially copied file.
                try:
                    os.replace(backup_path, self.rdb_path)
                except OSError as rollback_error:
                    self.logger.error(f"Failed to roll back RDB restore: {rollback_error}")
            return {
                'success': False,
                'error': str(e)
            }

    def analyze_rdb(self, rdb_file_path: Optional[str] = None) -> Dict[str, Any]:
        """Collect file and server information, plus rdbtools output if available.

        The ``rdb`` command comes from the optional ``rdbtools`` package
        (``pip install rdbtools``). When it is missing or times out the
        result carries a ``note`` instead of the detailed sections.

        Args:
            rdb_file_path: File to analyze; defaults to ``self.rdb_path``.

        Returns:
            A dict with ``success`` and either ``analysis`` or ``error``.
        """
        rdb_file = rdb_file_path or self.rdb_path
        if not os.path.exists(rdb_file):
            return {
                'success': False,
                'error': f"RDB file not found: {rdb_file}"
            }
        try:
            analysis_result = {
                'file_info': self.get_rdb_info(rdb_file),
                'redis_info': self._get_redis_info(),
                'analysis_timestamp': datetime.now().isoformat()
            }
            try:
                # Argument lists (no shell) keep odd characters in the path
                # from being interpreted, and make a missing executable
                # surface as FileNotFoundError.
                for result_key, command in (('memory_analysis', 'memory'),
                                            ('key_statistics', 'stats')):
                    completed = subprocess.run(
                        ['rdb', '--command', command, rdb_file],
                        capture_output=True, text=True, timeout=30)
                    if completed.returncode == 0:
                        analysis_result[result_key] = completed.stdout
            except (subprocess.TimeoutExpired, FileNotFoundError):
                analysis_result['note'] = 'rdbtools not available for detailed analysis'
            return {
                'success': True,
                'analysis': analysis_result
            }
        except Exception as e:
            self.logger.error(f"Failed to analyze RDB: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _get_redis_info(self) -> Dict[str, Any]:
        """Return a few server fields from INFO, or ``{'error': ...}`` on failure."""
        try:
            info = self.redis_client.info()
            return {
                'redis_version': info.get('redis_version'),
                'used_memory': info.get('used_memory'),
                'used_memory_human': info.get('used_memory_human'),
                'connected_clients': info.get('connected_clients'),
                'total_commands_processed': info.get('total_commands_processed'),
                'keyspace': {k: v for k, v in info.items() if k.startswith('db')}
            }
        except Exception as e:
            return {'error': str(e)}
### 10.2.2 RDB配置优化
```python
class RDBOptimizer:
    """Apply workload-specific RDB settings and observe snapshot activity."""

    def __init__(self, redis_client: "redis.Redis"):
        """Keep the client used for CONFIG SET and INFO calls."""
        self.redis_client = redis_client
        self.logger = logging.getLogger(__name__)

    def optimize_rdb_config(self, workload_type: str = 'balanced') -> Dict[str, Any]:
        """Apply one of the predefined RDB profiles.

        Args:
            workload_type: One of ``high_write``, ``balanced``, ``high_read``
                or ``memory_optimized``.

        Returns:
            A dict with ``success`` and either the applied profile or
            ``error`` (plus ``available_types`` for an unknown profile).
        """
        configs = {
            'high_write': {
                'save_points': [(1800, 1), (600, 100), (120, 10000)],
                'rdbcompression': 'yes',
                'rdbchecksum': 'yes',
                'stop_writes_on_bgsave_error': 'no',
                'description': '高写入负载,减少保存频率'
            },
            'balanced': {
                'save_points': [(900, 1), (300, 10), (60, 10000)],
                'rdbcompression': 'yes',
                'rdbchecksum': 'yes',
                'stop_writes_on_bgsave_error': 'yes',
                'description': '平衡的读写负载'
            },
            'high_read': {
                'save_points': [(600, 1), (180, 10), (30, 1000)],
                'rdbcompression': 'yes',
                'rdbchecksum': 'yes',
                'stop_writes_on_bgsave_error': 'yes',
                'description': '高读取负载,增加保存频率'
            },
            'memory_optimized': {
                'save_points': [(3600, 1), (1800, 10), (300, 10000)],
                'rdbcompression': 'yes',
                'rdbchecksum': 'no',
                'stop_writes_on_bgsave_error': 'no',
                'description': '内存优化,减少保存频率和校验'
            }
        }
        if workload_type not in configs:
            return {
                'success': False,
                'error': f"Unknown workload type: {workload_type}",
                'available_types': list(configs.keys())
            }
        config = configs[workload_type]
        try:
            save_config = ' '.join(f"{seconds} {changes}"
                                   for seconds, changes in config['save_points'])
            self.redis_client.config_set('save', save_config)
            self.redis_client.config_set('rdbcompression', config['rdbcompression'])
            self.redis_client.config_set('rdbchecksum', config['rdbchecksum'])
            self.redis_client.config_set('stop-writes-on-bgsave-error',
                                         config['stop_writes_on_bgsave_error'])
            return {
                'success': True,
                'workload_type': workload_type,
                'applied_config': config,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to optimize RDB config: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def monitor_rdb_performance(self, duration_minutes: int = 60,
                                poll_interval: float = 10.0) -> Dict[str, Any]:
        """Poll INFO for the given time and summarize completed saves.

        This call blocks for the whole monitoring window. A new save is
        recognized by a change of ``rdb_last_save_time``; its duration is
        taken from ``rdb_last_bgsave_time_sec``. Saves that start and finish
        between two polls are seen as one.

        Args:
            duration_minutes: Length of the monitoring window.
            poll_interval: Seconds between INFO polls.

        Returns:
            A dict with ``success`` and ``performance_data``; on error
            ``error`` and the ``partial_data`` gathered so far.
        """
        start_time = time.time()
        end_time = start_time + (duration_minutes * 60)
        metrics = {
            'total_bgsave_count': 0,
            'total_bgsave_time': 0,
            'avg_bgsave_time': 0,
            'max_bgsave_time': 0,
            'min_bgsave_time': float('inf')
        }
        performance_data = {
            'start_time': datetime.fromtimestamp(start_time).isoformat(),
            'duration_minutes': duration_minutes,
            'snapshots': [],
            'metrics': metrics
        }
        last_save_time = None
        try:
            while time.time() < end_time:
                current_time = time.time()
                info = self.redis_client.info()
                current_save_time = info.get('rdb_last_save_time', 0)
                if last_save_time is None:
                    # First poll only establishes the baseline.
                    last_save_time = current_save_time
                elif current_save_time != last_save_time:
                    # Compare completion timestamps, not durations: two
                    # consecutive saves often take the same number of seconds.
                    last_save_time = current_save_time
                    bgsave_time = info.get('rdb_last_bgsave_time_sec', 0)
                    performance_data['snapshots'].append({
                        'timestamp': datetime.fromtimestamp(current_time).isoformat(),
                        'bgsave_time': bgsave_time,
                        'rdb_last_save_time': info.get('rdb_last_save_time'),
                        'rdb_changes_since_last_save': info.get('rdb_changes_since_last_save'),
                        'used_memory': info.get('used_memory'),
                        'used_memory_rss': info.get('used_memory_rss')
                    })
                    # A negative duration means no background save has been
                    # timed yet, so it is kept out of the statistics.
                    if bgsave_time >= 0:
                        metrics['total_bgsave_count'] += 1
                        metrics['total_bgsave_time'] += bgsave_time
                        metrics['max_bgsave_time'] = max(metrics['max_bgsave_time'], bgsave_time)
                        metrics['min_bgsave_time'] = min(metrics['min_bgsave_time'], bgsave_time)
                # Never sleep past the end of the monitoring window.
                remaining = end_time - time.time()
                if remaining > 0:
                    time.sleep(min(poll_interval, remaining))
            if metrics['total_bgsave_count'] > 0:
                metrics['avg_bgsave_time'] = (
                    metrics['total_bgsave_time'] / metrics['total_bgsave_count']
                )
            if metrics['min_bgsave_time'] == float('inf'):
                metrics['min_bgsave_time'] = 0
            performance_data['end_time'] = datetime.now().isoformat()
            return {
                'success': True,
                'performance_data': performance_data
            }
        except Exception as e:
            self.logger.error(f"Failed to monitor RDB performance: {e}")
            return {
                'success': False,
                'error': str(e),
                'partial_data': performance_data
            }
```

## 10.3 AOF持久化
### 10.3.1 AOF工作原理
AOF(Append Only File)持久化通过记录每个写操作命令来保存数据。
```python
class AOFManager:
    """Enable, configure, rewrite, inspect and repair the Redis AOF.

    Server-side operations go through ``redis_client``; the file helpers
    read ``aof_dir/aof_filename`` from the local filesystem.
    """

    def __init__(self, redis_client: "redis.Redis",
                 aof_dir: str = "/var/lib/redis",
                 aof_filename: str = "appendonly.aof"):
        """Remember the client and AOF location; create ``aof_dir`` if missing."""
        self.redis_client = redis_client
        self.aof_dir = aof_dir
        self.aof_filename = aof_filename
        self.aof_path = os.path.join(aof_dir, aof_filename)
        self.logger = logging.getLogger(__name__)
        os.makedirs(aof_dir, exist_ok=True)

    def enable_aof(self, fsync_policy: str = 'everysec') -> Dict[str, Any]:
        """Turn on AOF persistence with the given fsync policy.

        Args:
            fsync_policy: ``always``, ``everysec`` or ``no``. Any other value
                falls back to ``everysec`` (a warning is logged).

        Returns:
            A dict with ``success`` and either the applied settings
            (``aof_filename_applied`` tells whether the file name could be
            changed at runtime) or ``error``.
        """
        valid_policies = ['always', 'everysec', 'no']
        if fsync_policy not in valid_policies:
            self.logger.warning(
                f"Invalid fsync policy {fsync_policy!r}; falling back to 'everysec'")
            fsync_policy = 'everysec'
        try:
            # Everything else is configured first and AOF is switched on
            # last, so a failing setting cannot leave AOF half-enabled.
            self.redis_client.config_set('dir', self.aof_dir)
            self.redis_client.config_set('appendfsync', fsync_policy)
            self.redis_client.config_set('auto-aof-rewrite-percentage', '100')
            self.redis_client.config_set('auto-aof-rewrite-min-size', '64mb')
            self.redis_client.config_set('aof-load-truncated', 'yes')
            filename_applied = self._try_set_aof_filename()
            self.redis_client.config_set('appendonly', 'yes')
            return {
                'success': True,
                'aof_enabled': True,
                'aof_filename': self.aof_filename,
                'aof_filename_applied': filename_applied,
                'aof_dir': self.aof_dir,
                'fsync_policy': fsync_policy,
                'auto_rewrite_enabled': True
            }
        except Exception as e:
            self.logger.error(f"Failed to enable AOF: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _try_set_aof_filename(self) -> bool:
        """Best-effort CONFIG SET of ``appendfilename``; return whether it worked.

        NOTE(review): Redis appears to treat ``appendfilename`` as a
        startup-only setting, so a rejection here is expected and must not
        abort enabling AOF — confirm against the target server version.
        """
        try:
            self.redis_client.config_set('appendfilename', self.aof_filename)
            return True
        except Exception as e:
            self.logger.warning(
                f"Could not set appendfilename at runtime ({e}); set it in redis.conf")
            return False

    def disable_aof(self) -> Dict[str, Any]:
        """Turn off AOF persistence; return ``success`` plus a message or ``error``."""
        try:
            self.redis_client.config_set('appendonly', 'no')
            return {
                'success': True,
                'aof_enabled': False,
                'message': 'AOF persistence disabled'
            }
        except Exception as e:
            self.logger.error(f"Failed to disable AOF: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def rewrite_aof(self, background: bool = True,
                    timeout: float = 600.0,
                    poll_interval: float = 0.5) -> Dict[str, Any]:
        """Trigger an AOF rewrite with BGREWRITEAOF.

        Both modes issue the same BGREWRITEAOF command. With
        ``background=True`` this method waits for the rewrite to finish;
        with ``background=False`` it returns right after issuing the
        command (reported as method ``foreground``).

        Args:
            background: Wait for completion when true.
            timeout: Maximum seconds to wait.
            poll_interval: Seconds between progress checks.

        Returns:
            A dict with ``success``; on success also ``method``,
            ``duration``, ``aof_info`` and ``timestamp``; on failure
            ``error`` and ``timestamp``.
        """
        start_time = time.time()
        try:
            self.redis_client.bgrewriteaof()
            if background:
                self._wait_for_rewrite(start_time + timeout, poll_interval)
            end_time = time.time()
            return {
                'success': True,
                'method': 'background' if background else 'foreground',
                'duration': end_time - start_time,
                'aof_info': self.get_aof_info(),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to rewrite AOF: {e}")
            return {
                'success': False,
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }

    def _wait_for_rewrite(self, deadline: float, poll_interval: float) -> None:
        """Block until no rewrite is running or scheduled.

        A scheduled rewrite has ``aof_rewrite_in_progress`` still at 0, so
        both flags are checked.

        Raises:
            TimeoutError: the rewrite is still pending at ``deadline``.
            RuntimeError: the server reports the last rewrite failed.
        """
        while True:
            info = self.redis_client.info('persistence')
            pending = (info.get('aof_rewrite_in_progress', 0)
                       or info.get('aof_rewrite_scheduled', 0))
            if not pending:
                break
            if time.time() > deadline:
                raise TimeoutError("AOF rewrite timeout")
            time.sleep(poll_interval)
        status = info.get('aof_last_bgrewrite_status', 'ok')
        if status != 'ok':
            raise RuntimeError(f"AOF rewrite failed: {status}")

    def get_aof_info(self) -> Dict[str, Any]:
        """Return AOF fields from INFO persistence plus local file details.

        Returns:
            A dict of AOF state with ``file_path`` and ``file_exists``
            (and size / mtime when the file is present), or ``error`` and
            ``file_path`` when the lookup fails.
        """
        try:
            info = self.redis_client.info('persistence')
            aof_info = {
                'aof_enabled': info.get('aof_enabled', 0) == 1,
                'aof_rewrite_in_progress': info.get('aof_rewrite_in_progress', 0) == 1,
                'aof_rewrite_scheduled': info.get('aof_rewrite_scheduled', 0) == 1,
                'aof_last_rewrite_time_sec': info.get('aof_last_rewrite_time_sec', -1),
                'aof_current_rewrite_time_sec': info.get('aof_current_rewrite_time_sec', -1),
                'aof_last_bgrewrite_status': info.get('aof_last_bgrewrite_status', 'unknown'),
                'aof_last_write_status': info.get('aof_last_write_status', 'unknown'),
                'aof_current_size': info.get('aof_current_size', 0),
                'aof_base_size': info.get('aof_base_size', 0),
                'aof_pending_rewrite': info.get('aof_pending_rewrite', 0),
                'aof_buffer_length': info.get('aof_buffer_length', 0),
                'aof_rewrite_buffer_length': info.get('aof_rewrite_buffer_length', 0),
                'aof_pending_bio_fsync': info.get('aof_pending_bio_fsync', 0),
                'aof_delayed_fsync': info.get('aof_delayed_fsync', 0)
            }
            if os.path.exists(self.aof_path):
                stat = os.stat(self.aof_path)
                aof_info.update({
                    'file_path': self.aof_path,
                    'file_size': stat.st_size,
                    'file_size_mb': round(stat.st_size / 1024 / 1024, 2),
                    'last_modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                    'file_exists': True
                })
            else:
                aof_info.update({
                    'file_path': self.aof_path,
                    'file_exists': False
                })
            return aof_info
        except Exception as e:
            return {
                'error': str(e),
                'file_path': self.aof_path
            }

    def configure_aof(self, fsync_policy: str = 'everysec',
                      auto_rewrite_percentage: int = 100,
                      auto_rewrite_min_size: str = '64mb') -> Dict[str, Any]:
        """Set the fsync policy and auto-rewrite thresholds.

        Args:
            fsync_policy: ``always``, ``everysec`` or ``no``; anything else
                is rejected without touching the server.
            auto_rewrite_percentage: Value for ``auto-aof-rewrite-percentage``.
            auto_rewrite_min_size: Value for ``auto-aof-rewrite-min-size``.

        Returns:
            A dict with ``success`` and either the applied settings or
            ``error``.
        """
        try:
            valid_policies = ['always', 'everysec', 'no']
            if fsync_policy not in valid_policies:
                return {
                    'success': False,
                    'error': f"Invalid fsync policy: {fsync_policy}",
                    'valid_policies': valid_policies
                }
            self.redis_client.config_set('appendfsync', fsync_policy)
            self.redis_client.config_set('auto-aof-rewrite-percentage',
                                         str(auto_rewrite_percentage))
            self.redis_client.config_set('auto-aof-rewrite-min-size',
                                         auto_rewrite_min_size)
            self.redis_client.config_set('aof-load-truncated', 'yes')
            self.redis_client.config_set('aof-use-rdb-preamble', 'yes')
            return {
                'success': True,
                'fsync_policy': fsync_policy,
                'auto_rewrite_percentage': auto_rewrite_percentage,
                'auto_rewrite_min_size': auto_rewrite_min_size,
                'load_truncated': True,
                'use_rdb_preamble': True
            }
        except Exception as e:
            self.logger.error(f"Failed to configure AOF: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def repair_aof(self, backup: bool = True) -> Dict[str, Any]:
        """Run ``redis-check-aof --fix`` on the AOF file.

        Args:
            backup: Copy the file to ``<aof_path>.backup.<epoch>`` first.

        Returns:
            A dict with ``success`` and either the repair output (and
            backup path) or ``error``.
        """
        if not os.path.exists(self.aof_path):
            return {
                'success': False,
                'error': f"AOF file not found: {self.aof_path}"
            }
        backup_path = None
        try:
            if backup:
                import shutil
                backup_path = f"{self.aof_path}.backup.{int(time.time())}"
                shutil.copy2(self.aof_path, backup_path)
                self.logger.info(f"AOF file backed up to: {backup_path}")
            # Argument list instead of a shell string: the path is not
            # shell-interpreted and a missing tool raises FileNotFoundError.
            # NOTE(review): --fix is believed to ask for confirmation on
            # stdin; "y" is supplied so the run cannot block on a prompt.
            result = subprocess.run(
                ['redis-check-aof', '--fix', self.aof_path],
                input='y\n', capture_output=True, text=True, timeout=60)
            if result.returncode == 0:
                return {
                    'success': True,
                    'repaired_file': self.aof_path,
                    'backup_file': backup_path,
                    'repair_output': result.stdout,
                    'message': 'AOF file repaired successfully'
                }
            return {
                'success': False,
                'error': f"AOF repair failed: {result.stderr}",
                'repair_output': result.stdout
            }
        except subprocess.TimeoutExpired:
            return {
                'success': False,
                'error': 'AOF repair timeout'
            }
        except FileNotFoundError:
            return {
                'success': False,
                'error': 'redis-check-aof command not found'
            }
        except Exception as e:
            self.logger.error(f"Failed to repair AOF: {e}")
            return {
                'success': False,
                'error': str(e)
            }
```

### 10.3.2 AOF性能优化
```python
class AOFOptimizer:
    """Apply workload-specific AOF settings and observe AOF activity."""

    def __init__(self, redis_client: "redis.Redis"):
        """Keep the client used for CONFIG SET and INFO calls."""
        self.redis_client = redis_client
        self.logger = logging.getLogger(__name__)

    def optimize_aof_config(self, workload_type: str = 'balanced') -> Dict[str, Any]:
        """Apply one of the predefined AOF profiles.

        Args:
            workload_type: One of ``high_performance``, ``balanced``,
                ``high_durability`` or ``memory_optimized``.

        Returns:
            A dict with ``success`` and either the applied profile or
            ``error`` (plus ``available_types`` for an unknown profile).
        """
        configs = {
            'high_performance': {
                'appendfsync': 'no',
                'auto_aof_rewrite_percentage': 200,
                'auto_aof_rewrite_min_size': '128mb',
                'aof_use_rdb_preamble': 'yes',
                'description': '高性能,可能丢失更多数据'
            },
            'balanced': {
                'appendfsync': 'everysec',
                'auto_aof_rewrite_percentage': 100,
                'auto_aof_rewrite_min_size': '64mb',
                'aof_use_rdb_preamble': 'yes',
                'description': '平衡性能和数据安全'
            },
            'high_durability': {
                'appendfsync': 'always',
                'auto_aof_rewrite_percentage': 50,
                'auto_aof_rewrite_min_size': '32mb',
                'aof_use_rdb_preamble': 'no',
                'description': '高数据安全性,性能较低'
            },
            'memory_optimized': {
                'appendfsync': 'everysec',
                'auto_aof_rewrite_percentage': 50,
                'auto_aof_rewrite_min_size': '32mb',
                'aof_use_rdb_preamble': 'yes',
                'description': '内存优化,频繁重写'
            }
        }
        if workload_type not in configs:
            return {
                'success': False,
                'error': f"Unknown workload type: {workload_type}",
                'available_types': list(configs.keys())
            }
        config = configs[workload_type]
        try:
            self.redis_client.config_set('appendfsync', config['appendfsync'])
            self.redis_client.config_set('auto-aof-rewrite-percentage',
                                         str(config['auto_aof_rewrite_percentage']))
            self.redis_client.config_set('auto-aof-rewrite-min-size',
                                         config['auto_aof_rewrite_min_size'])
            self.redis_client.config_set('aof-use-rdb-preamble',
                                         config['aof_use_rdb_preamble'])
            return {
                'success': True,
                'workload_type': workload_type,
                'applied_config': config,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to optimize AOF config: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def monitor_aof_performance(self, duration_minutes: int = 60,
                                poll_interval: float = 10.0) -> Dict[str, Any]:
        """Poll INFO for the given time and summarize AOF activity.

        This call blocks for the whole monitoring window. One snapshot is
        recorded per poll. A rewrite is counted when
        ``aof_rewrite_in_progress`` drops back to 0 or when
        ``aof_last_rewrite_time_sec`` differs from the previous poll; a
        rewrite that starts and ends between two polls with the same
        duration as the previous one cannot be detected.

        Args:
            duration_minutes: Length of the monitoring window.
            poll_interval: Seconds between INFO polls.

        Returns:
            A dict with ``success`` and ``performance_data``; on error
            ``error`` and the ``partial_data`` gathered so far.
        """
        start_time = time.time()
        end_time = start_time + (duration_minutes * 60)
        metrics = {
            'total_rewrites': 0,
            'total_rewrite_time': 0,
            'avg_rewrite_time': 0,
            'max_rewrite_time': 0,
            'min_rewrite_time': float('inf'),
            'aof_size_growth': 0,
            'delayed_fsync_count': 0
        }
        performance_data = {
            'start_time': datetime.fromtimestamp(start_time).isoformat(),
            'duration_minutes': duration_minutes,
            'snapshots': [],
            'metrics': metrics
        }
        try:
            initial_info = self.redis_client.info('persistence')
            initial_aof_size = initial_info.get('aof_current_size', 0)
            # Baseline from the server, so a rewrite that finished before
            # monitoring started is not counted as a new one.
            last_rewrite_time = initial_info.get('aof_last_rewrite_time_sec', -1)
            rewrite_was_running = bool(initial_info.get('aof_rewrite_in_progress', 0))
            while time.time() < end_time:
                current_time = time.time()
                info = self.redis_client.info('persistence')
                # used_memory is reported in the memory section, not in
                # the persistence section.
                memory_info = self.redis_client.info('memory')
                performance_data['snapshots'].append({
                    'timestamp': datetime.fromtimestamp(current_time).isoformat(),
                    'aof_current_size': info.get('aof_current_size', 0),
                    'aof_base_size': info.get('aof_base_size', 0),
                    'aof_rewrite_in_progress': info.get('aof_rewrite_in_progress', 0),
                    'aof_last_rewrite_time_sec': info.get('aof_last_rewrite_time_sec', -1),
                    'aof_delayed_fsync': info.get('aof_delayed_fsync', 0),
                    'aof_buffer_length': info.get('aof_buffer_length', 0),
                    'used_memory': memory_info.get('used_memory', 0)
                })
                rewrite_running = bool(info.get('aof_rewrite_in_progress', 0))
                current_rewrite_time = info.get('aof_last_rewrite_time_sec', -1)
                rewrite_finished = ((rewrite_was_running and not rewrite_running)
                                    or current_rewrite_time != last_rewrite_time)
                if rewrite_finished and current_rewrite_time >= 0:
                    metrics['total_rewrites'] += 1
                    metrics['total_rewrite_time'] += current_rewrite_time
                    metrics['max_rewrite_time'] = max(metrics['max_rewrite_time'],
                                                      current_rewrite_time)
                    metrics['min_rewrite_time'] = min(metrics['min_rewrite_time'],
                                                      current_rewrite_time)
                rewrite_was_running = rewrite_running
                last_rewrite_time = current_rewrite_time
                # Keep the highest delayed-fsync counter seen so far.
                delayed_fsync = info.get('aof_delayed_fsync', 0)
                if delayed_fsync > metrics['delayed_fsync_count']:
                    metrics['delayed_fsync_count'] = delayed_fsync
                # Never sleep past the end of the monitoring window.
                remaining = end_time - time.time()
                if remaining > 0:
                    time.sleep(min(poll_interval, remaining))
            final_info = self.redis_client.info('persistence')
            final_aof_size = final_info.get('aof_current_size', 0)
            metrics['aof_size_growth'] = final_aof_size - initial_aof_size
            if metrics['total_rewrites'] > 0:
                metrics['avg_rewrite_time'] = (
                    metrics['total_rewrite_time'] / metrics['total_rewrites']
                )
            if metrics['min_rewrite_time'] == float('inf'):
                metrics['min_rewrite_time'] = 0
            performance_data['end_time'] = datetime.now().isoformat()
            return {
                'success': True,
                'performance_data': performance_data
            }
        except Exception as e:
            self.logger.error(f"Failed to monitor AOF performance: {e}")
            return {
                'success': False,
                'error': str(e),
                'partial_data': performance_data
            }
# Usage example: run the managers and optimizers against a local Redis.
if __name__ == "__main__":
    client = redis.Redis(decode_responses=True)

    # --- RDB management ---
    rdb = RDBManager(client)
    configured = rdb.configure_rdb([(900, 1), (300, 10), (60, 10000)])
    print(f"RDB configuration: {configured}")
    taken = rdb.create_snapshot(background=True)
    print(f"Snapshot result: {taken}")
    file_details = rdb.get_rdb_info()
    print(f"RDB info: {file_details}")

    # --- AOF management ---
    aof = AOFManager(client)
    enabled = aof.enable_aof(fsync_policy='everysec')
    print(f"AOF enable result: {enabled}")
    aof_settings = aof.configure_aof(
        fsync_policy='everysec',
        auto_rewrite_percentage=100,
        auto_rewrite_min_size='64mb',
    )
    print(f"AOF configuration: {aof_settings}")
    aof_details = aof.get_aof_info()
    print(f"AOF info: {aof_details}")

    # --- Workload-based tuning ---
    rdb_tuner = RDBOptimizer(client)
    aof_tuner = AOFOptimizer(client)
    rdb_tuned = rdb_tuner.optimize_rdb_config('balanced')
    print(f"RDB optimization: {rdb_tuned}")
    aof_tuned = aof_tuner.optimize_aof_config('balanced')
    print(f"AOF optimization: {aof_tuned}")
```

## 10.4 混合持久化

### 10.4.1 RDB + AOF混合模式

Redis 4.0引入了混合持久化模式,结合了RDB和AOF的优点。

```python
class HybridPersistenceManager:
    """Drive RDB and AOF together, including the AOF RDB-preamble option."""

    def __init__(self, redis_client: "redis.Redis"):
        """Create the RDB and AOF managers that share ``redis_client``."""
        self.redis_client = redis_client
        self.rdb_manager = RDBManager(redis_client)
        self.aof_manager = AOFManager(redis_client)
        self.logger = logging.getLogger(__name__)

    def enable_hybrid_persistence(self,
                                  rdb_save_points: Optional[List[tuple]] = None,
                                  aof_fsync_policy: str = 'everysec') -> Dict[str, Any]:
        """Configure RDB, enable AOF and turn on ``aof-use-rdb-preamble``.

        Args:
            rdb_save_points: ``(seconds, changes)`` pairs passed to the RDB
                manager; ``None`` uses that manager's defaults.
            aof_fsync_policy: fsync policy passed to the AOF manager.

        Returns:
            A dict with ``success`` and either both sub-results or ``error``.
            Stops at the first step that fails.
        """
        try:
            rdb_result = self.rdb_manager.configure_rdb(rdb_save_points)
            if not rdb_result['success']:
                return {
                    'success': False,
                    'error': f"Failed to configure RDB: {rdb_result['error']}"
                }
            aof_result = self.aof_manager.enable_aof(aof_fsync_policy)
            if not aof_result['success']:
                return {
                    'success': False,
                    'error': f"Failed to enable AOF: {aof_result['error']}"
                }
            self.redis_client.config_set('aof-use-rdb-preamble', 'yes')
            return {
                'success': True,
                'mode': 'hybrid',
                'rdb_config': rdb_result,
                'aof_config': aof_result,
                'aof_use_rdb_preamble': True,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to enable hybrid persistence: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_persistence_status(self) -> Dict[str, Any]:
        """Report RDB, AOF and combined status from INFO and CONFIG GET.

        ``hybrid_status['enabled']`` is true when save rules exist and AOF
        is on; whether the RDB preamble is active is reported separately as
        ``rdb_preamble_enabled``.

        Returns:
            A dict with ``success`` and the three status sections, or
            ``error``.
        """
        try:
            info = self.redis_client.info('persistence')
            save_rules = self.redis_client.config_get('save')['save']
            preamble = self.redis_client.config_get('aof-use-rdb-preamble')['aof-use-rdb-preamble']
            rdb_status = {
                'enabled': len(save_rules) > 0,
                'last_save_time': info.get('rdb_last_save_time', 0),
                'last_bgsave_status': info.get('rdb_last_bgsave_status', 'unknown'),
                'last_bgsave_time_sec': info.get('rdb_last_bgsave_time_sec', -1),
                'current_bgsave_time_sec': info.get('rdb_current_bgsave_time_sec', -1),
                'bgsave_in_progress': info.get('rdb_bgsave_in_progress', 0) == 1,
                'changes_since_last_save': info.get('rdb_changes_since_last_save', 0)
            }
            aof_status = {
                'enabled': info.get('aof_enabled', 0) == 1,
                'rewrite_in_progress': info.get('aof_rewrite_in_progress', 0) == 1,
                'last_rewrite_time_sec': info.get('aof_last_rewrite_time_sec', -1),
                'current_rewrite_time_sec': info.get('aof_current_rewrite_time_sec', -1),
                'last_bgrewrite_status': info.get('aof_last_bgrewrite_status', 'unknown'),
                'last_write_status': info.get('aof_last_write_status', 'unknown'),
                'current_size': info.get('aof_current_size', 0),
                'base_size': info.get('aof_base_size', 0),
                'use_rdb_preamble': preamble == 'yes'
            }
            hybrid_status = {
                'enabled': rdb_status['enabled'] and aof_status['enabled'],
                'rdb_preamble_enabled': aof_status['use_rdb_preamble']
            }
            return {
                'success': True,
                'rdb_status': rdb_status,
                'aof_status': aof_status,
                'hybrid_status': hybrid_status,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to get persistence status: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def optimize_hybrid_config(self, workload_type: str = 'balanced') -> Dict[str, Any]:
        """Apply one of the predefined combined RDB + AOF profiles.

        Args:
            workload_type: One of ``high_write``, ``balanced``, ``high_read``
                or ``memory_critical``.

        Returns:
            A dict with ``success`` and either the applied profile or
            ``error`` (plus ``available_types`` for an unknown profile).
        """
        configs = {
            'high_write': {
                'rdb_save_points': [(1800, 1), (600, 100), (120, 10000)],
                'aof_fsync': 'everysec',
                'aof_rewrite_percentage': 200,
                'aof_rewrite_min_size': '128mb',
                'description': '高写入负载优化'
            },
            'balanced': {
                'rdb_save_points': [(900, 1), (300, 10), (60, 10000)],
                'aof_fsync': 'everysec',
                'aof_rewrite_percentage': 100,
                'aof_rewrite_min_size': '64mb',
                'description': '平衡配置'
            },
            'high_read': {
                'rdb_save_points': [(600, 1), (180, 10), (30, 1000)],
                'aof_fsync': 'everysec',
                'aof_rewrite_percentage': 100,
                'aof_rewrite_min_size': '64mb',
                'description': '高读取负载优化'
            },
            'memory_critical': {
                'rdb_save_points': [(3600, 1), (1800, 10), (300, 10000)],
                'aof_fsync': 'everysec',
                'aof_rewrite_percentage': 50,
                'aof_rewrite_min_size': '32mb',
                'description': '内存关键优化'
            }
        }
        if workload_type not in configs:
            return {
                'success': False,
                'error': f"Unknown workload type: {workload_type}",
                'available_types': list(configs.keys())
            }
        config = configs[workload_type]
        try:
            rdb_save_config = ' '.join(f"{seconds} {changes}"
                                       for seconds, changes in config['rdb_save_points'])
            self.redis_client.config_set('save', rdb_save_config)
            self.redis_client.config_set('appendfsync', config['aof_fsync'])
            self.redis_client.config_set('auto-aof-rewrite-percentage',
                                         str(config['aof_rewrite_percentage']))
            self.redis_client.config_set('auto-aof-rewrite-min-size',
                                         config['aof_rewrite_min_size'])
            self.redis_client.config_set('aof-use-rdb-preamble', 'yes')
            return {
                'success': True,
                'workload_type': workload_type,
                'applied_config': config,
                'hybrid_mode_enabled': True,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to optimize hybrid config: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def create_full_backup(self, backup_dir: Optional[str] = None) -> Dict[str, Any]:
        """Copy the RDB file, the AOF file and a configuration dump to a directory.

        A fresh RDB snapshot is requested first. If that fails the backup
        continues without the RDB file and the reason is listed under
        ``backup_info['warnings']``.

        Args:
            backup_dir: Target directory; defaults to
                ``/tmp/redis_backup_<epoch>``. Created if missing.

        Returns:
            A dict with ``success`` and ``backup_info`` (files, sizes,
            optional warnings), or ``error`` and ``backup_dir``.
        """
        import shutil

        if backup_dir is None:
            backup_dir = f"/tmp/redis_backup_{int(time.time())}"
        try:
            # Inside the try so an unusable target is reported through the
            # same error dict as every other failure.
            os.makedirs(backup_dir, exist_ok=True)
            backup_info = {
                'backup_dir': backup_dir,
                'timestamp': datetime.now().isoformat(),
                'files': []
            }
            rdb_result = self.rdb_manager.create_snapshot(background=True)
            if not rdb_result['success']:
                warning = f"RDB snapshot failed: {rdb_result.get('error')}"
                self.logger.warning(warning)
                backup_info.setdefault('warnings', []).append(warning)
            elif os.path.exists(self.rdb_manager.rdb_path):
                rdb_backup_path = os.path.join(backup_dir, 'dump.rdb')
                shutil.copy2(self.rdb_manager.rdb_path, rdb_backup_path)
                backup_info['files'].append({
                    'type': 'rdb',
                    'source': self.rdb_manager.rdb_path,
                    'backup': rdb_backup_path,
                    'size': os.path.getsize(rdb_backup_path)
                })
            if os.path.exists(self.aof_manager.aof_path):
                aof_backup_path = os.path.join(backup_dir, 'appendonly.aof')
                shutil.copy2(self.aof_manager.aof_path, aof_backup_path)
                backup_info['files'].append({
                    'type': 'aof',
                    'source': self.aof_manager.aof_path,
                    'backup': aof_backup_path,
                    'size': os.path.getsize(aof_backup_path)
                })
            # NOTE(review): CONFIG GET * may include secrets such as
            # requirepass; the dump is written as plain JSON — confirm the
            # backup directory is adequately protected.
            config_backup_path = os.path.join(backup_dir, 'redis_config.json')
            config_info = {
                'redis_version': self.redis_client.info()['redis_version'],
                'persistence_status': self.get_persistence_status(),
                'config': dict(self.redis_client.config_get('*'))
            }
            with open(config_backup_path, 'w') as f:
                json.dump(config_info, f, indent=2, default=str)
            backup_info['files'].append({
                'type': 'config',
                'backup': config_backup_path,
                'size': os.path.getsize(config_backup_path)
            })
            backup_info['total_size'] = sum(entry['size'] for entry in backup_info['files'])
            backup_info['total_size_mb'] = round(backup_info['total_size'] / 1024 / 1024, 2)
            return {
                'success': True,
                'backup_info': backup_info
            }
        except Exception as e:
            self.logger.error(f"Failed to create full backup: {e}")
            return {
                'success': False,
                'error': str(e),
                'backup_dir': backup_dir
            }
```

下一章将介绍Redis集群和高可用性配置。