13.1 性能优化概述
13.1.1 性能优化的重要性
Redis作为高性能的内存数据库,在大规模应用中性能优化至关重要:
- 响应时间: 优化可以显著降低响应延迟
- 吞吐量: 提高系统的并发处理能力
- 资源利用: 更高效地使用CPU和内存资源
- 成本控制: 减少硬件资源需求
- 用户体验: 提供更流畅的应用体验
13.1.2 性能优化策略
```python
import redis
import time
import threading
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import psutil
import logging
@dataclass
class PerformanceMetrics:
    """One snapshot of host and Redis performance indicators."""

    timestamp: datetime           # moment the snapshot was assembled
    cpu_usage: float              # value reported by psutil.cpu_percent
    memory_usage: float           # value reported by psutil.virtual_memory().percent
    network_io: Dict[str, int]    # bytes/packets sent and received counters
    redis_info: Dict[str, Any]    # raw result of the Redis INFO call
    response_times: List[float]   # PING round-trip samples in milliseconds
    throughput: float             # operations per second from the benchmark loop
    error_rate: float             # rejected connections / processed commands
class RedisPerformanceOptimizer:
    """Redis性能优化器"""
def __init__(self, redis_client: redis.Redis):
    """Store the client and initialise the optimizer's working state.

    Args:
        redis_client: Redis client used for every server call made here.
    """
    self.redis_client = redis_client
    self.logger = logging.getLogger(__name__)
    # Snapshots are appended here by collect_metrics; starts empty.
    self.metrics_history = list()
    # Static thresholds and targets consulted by the analysis/tuning methods.
    self.optimization_rules = self._load_optimization_rules()
def _load_optimization_rules(self) -> Dict[str, Any]:
"""加载优化规则"""
return {
'memory': {
'max_memory_usage': 0.8, # 最大内存使用率
'eviction_policy': 'allkeys-lru',
'compression_threshold': 1000 # 压缩阈值
},
'network': {
'pipeline_batch_size': 100,
'connection_pool_size': 50,
'timeout_settings': {
'socket_timeout': 5,
'socket_connect_timeout': 5
}
},
'persistence': {
'save_frequency': '900 1 300 10 60 10000',
'aof_rewrite_threshold': 100,
'no_appendfsync_on_rewrite': True
},
'cpu': {
'max_cpu_usage': 0.7,
'slow_log_threshold': 10000, # 微秒
'client_output_buffer_limit': '256mb'
}
}
def collect_metrics(self) -> PerformanceMetrics:
    """Take one performance snapshot, store it in the history and return it.

    Reads host CPU, memory and network counters through psutil, fetches
    ``INFO`` from Redis, then runs the latency and throughput probes. Only
    the most recent 1000 snapshots are kept.

    Returns:
        The newly collected ``PerformanceMetrics``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    history_limit = 1000
    network_fields = ('bytes_sent', 'bytes_recv', 'packets_sent', 'packets_recv')
    try:
        # Host-level indicators.
        cpu_usage = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        network = psutil.net_io_counters()

        # Redis-level indicators and active probes.
        redis_info = self.redis_client.info()
        response_times = self._measure_response_times()
        throughput = self._measure_throughput()

        taken_at = datetime.now()
        network_io = {name: getattr(network, name) for name in network_fields}
        snapshot = PerformanceMetrics(
            timestamp=taken_at,
            cpu_usage=cpu_usage,
            memory_usage=memory.percent,
            network_io=network_io,
            redis_info=redis_info,
            response_times=response_times,
            throughput=throughput,
            error_rate=self._calculate_error_rate(),
        )

        self.metrics_history.append(snapshot)
        # Keep only the newest entries.
        if len(self.metrics_history) > history_limit:
            self.metrics_history = self.metrics_history[-history_limit:]
        return snapshot
    except Exception as e:
        self.logger.error(f"收集性能指标失败: {e}")
        raise
def _measure_response_times(self, sample_count: int = 100) -> List[float]:
"""测量响应时间"""
response_times = []
for i in range(sample_count):
start_time = time.time()
try:
self.redis_client.ping()
response_time = (time.time() - start_time) * 1000 # 毫秒
response_times.append(response_time)
except Exception:
response_times.append(float('inf'))
return response_times
def _measure_throughput(self, duration: int = 10) -> float:
"""测量吞吐量"""
start_time = time.time()
operations = 0
end_time = start_time + duration
while time.time() < end_time:
try:
# 执行简单的SET/GET操作
key = f"benchmark:{operations}"
self.redis_client.set(key, "test_value")
self.redis_client.get(key)
self.redis_client.delete(key)
operations += 3
except Exception:
pass
actual_duration = time.time() - start_time
return operations / actual_duration if actual_duration > 0 else 0
def _calculate_error_rate(self) -> float:
"""计算错误率"""
try:
info = self.redis_client.info('stats')
total_commands = info.get('total_commands_processed', 0)
rejected_connections = info.get('rejected_connections', 0)
if total_commands > 0:
return rejected_connections / total_commands
return 0.0
except Exception:
return 0.0
def analyze_performance(self) -> Dict[str, Any]:
"""分析性能"""
if not self.metrics_history:
return {'error': '没有性能数据'}
latest_metrics = self.metrics_history[-1]
analysis = {
'timestamp': latest_metrics.timestamp.isoformat(),
'overall_health': 'good',
'issues': [],
'recommendations': [],
'metrics_summary': {
'avg_response_time': sum(latest_metrics.response_times) / len(latest_metrics.response_times),
'max_response_time': max(latest_metrics.response_times),
'min_response_time': min(latest_metrics.response_times),
'throughput': latest_metrics.throughput,
'cpu_usage': latest_metrics.cpu_usage,
'memory_usage': latest_metrics.memory_usage,
'error_rate': latest_metrics.error_rate
}
}
# 分析CPU使用率
if latest_metrics.cpu_usage > self.optimization_rules['cpu']['max_cpu_usage'] * 100:
analysis['issues'].append('CPU使用率过高')
analysis['recommendations'].append('考虑优化命令复杂度或增加CPU资源')
analysis['overall_health'] = 'warning'
# 分析内存使用率
if latest_metrics.memory_usage > self.optimization_rules['memory']['max_memory_usage'] * 100:
analysis['issues'].append('内存使用率过高')
analysis['recommendations'].append('配置内存淘汰策略或增加内存')
analysis['overall_health'] = 'warning'
# 分析响应时间
avg_response_time = analysis['metrics_summary']['avg_response_time']
if avg_response_time > 10: # 10ms
analysis['issues'].append('响应时间过长')
analysis['recommendations'].append('检查网络延迟和Redis配置')
analysis['overall_health'] = 'warning'
# 分析错误率
if latest_metrics.error_rate > 0.01: # 1%
analysis['issues'].append('错误率过高')
analysis['recommendations'].append('检查连接池配置和网络稳定性')
analysis['overall_health'] = 'critical'
return analysis
def optimize_configuration(self) -> Dict[str, Any]:
"""优化配置"""
try:
current_config = dict(self.redis_client.config_get('*'))
optimizations = []
# 内存优化
memory_optimizations = self._optimize_memory_config(current_config)
optimizations.extend(memory_optimizations)
# 网络优化
network_optimizations = self._optimize_network_config(current_config)
optimizations.extend(network_optimizations)
# 持久化优化
persistence_optimizations = self._optimize_persistence_config(current_config)
optimizations.extend(persistence_optimizations)
# 应用优化
applied_count = 0
for optimization in optimizations:
try:
self.redis_client.config_set(optimization['key'], optimization['value'])
applied_count += 1
self.logger.info(f"应用优化: {optimization['key']} = {optimization['value']}")
except Exception as e:
self.logger.warning(f"应用优化失败: {optimization['key']} - {e}")
return {
'success': True,
'total_optimizations': len(optimizations),
'applied_optimizations': applied_count,
'optimizations': optimizations
}
except Exception as e:
self.logger.error(f"配置优化失败: {e}")
return {
'success': False,
'error': str(e)
}
def _optimize_memory_config(self, current_config: Dict[str, str]) -> List[Dict[str, Any]]:
"""优化内存配置"""
optimizations = []
# 设置最大内存
if 'maxmemory' not in current_config or current_config['maxmemory'] == '0':
# 设置为系统内存的80%
total_memory = psutil.virtual_memory().total
max_memory = int(total_memory * 0.8)
optimizations.append({
'key': 'maxmemory',
'value': str(max_memory),
'reason': '设置最大内存限制'
})
# 设置内存淘汰策略
if current_config.get('maxmemory-policy') != self.optimization_rules['memory']['eviction_policy']:
optimizations.append({
'key': 'maxmemory-policy',
'value': self.optimization_rules['memory']['eviction_policy'],
'reason': '优化内存淘汰策略'
})
# 优化哈希表压缩配置
hash_configs = {
'hash-max-ziplist-entries': '512',
'hash-max-ziplist-value': '64',
'list-max-ziplist-size': '-2',
'set-max-intset-entries': '512',
'zset-max-ziplist-entries': '128',
'zset-max-ziplist-value': '64'
}
for key, value in hash_configs.items():
if current_config.get(key) != value:
optimizations.append({
'key': key,
'value': value,
'reason': '优化数据结构压缩'
})
return optimizations
def _optimize_network_config(self, current_config: Dict[str, str]) -> List[Dict[str, Any]]:
"""优化网络配置"""
optimizations = []
# 优化TCP keepalive
tcp_configs = {
'tcp-keepalive': '300',
'timeout': '300'
}
for key, value in tcp_configs.items():
if current_config.get(key) != value:
optimizations.append({
'key': key,
'value': value,
'reason': '优化网络连接'
})
return optimizations
def _optimize_persistence_config(self, current_config: Dict[str, str]) -> List[Dict[str, Any]]:
"""优化持久化配置"""
optimizations = []
# 优化RDB保存策略
if current_config.get('save') != self.optimization_rules['persistence']['save_frequency']:
optimizations.append({
'key': 'save',
'value': self.optimization_rules['persistence']['save_frequency'],
'reason': '优化RDB保存频率'
})
# 优化AOF配置
aof_configs = {
'auto-aof-rewrite-percentage': '100',
'auto-aof-rewrite-min-size': '64mb',
'no-appendfsync-on-rewrite': 'yes' if self.optimization_rules['persistence']['no_appendfsync_on_rewrite'] else 'no'
}
for key, value in aof_configs.items():
if current_config.get(key) != value:
optimizations.append({
'key': key,
'value': value,
'reason': '优化AOF配置'
})
return optimizations
```
13.2 内存优化
13.2.1 内存使用分析
```python
class RedisMemoryAnalyzer:
    """Redis内存分析器"""
def __init__(self, redis_client: redis.Redis):
    """Bind the analyzer to a Redis client.

    Args:
        redis_client: Redis client used for every inspection call.
    """
    self.logger = logging.getLogger(__name__)
    self.redis_client = redis_client
def analyze_memory_usage(self) -> Dict[str, Any]:
"""分析内存使用情况"""
try:
info = self.redis_client.info('memory')
analysis = {
'total_memory': {
'used_memory': info.get('used_memory', 0),
'used_memory_human': info.get('used_memory_human', '0B'),
'used_memory_rss': info.get('used_memory_rss', 0),
'used_memory_peak': info.get('used_memory_peak', 0),
'used_memory_peak_human': info.get('used_memory_peak_human', '0B')
},
'memory_fragmentation': {
'fragmentation_ratio': info.get('mem_fragmentation_ratio', 0),
'fragmentation_bytes': info.get('mem_fragmentation_bytes', 0)
},
'memory_efficiency': self._calculate_memory_efficiency(info),
'recommendations': []
}
# 分析内存碎片
frag_ratio = analysis['memory_fragmentation']['fragmentation_ratio']
if frag_ratio > 1.5:
analysis['recommendations'].append({
'type': 'fragmentation',
'message': f'内存碎片率过高 ({frag_ratio:.2f}),建议重启Redis或使用MEMORY PURGE命令',
'priority': 'high'
})
# 分析内存使用率
max_memory = self.redis_client.config_get('maxmemory')['maxmemory']
if max_memory != '0':
usage_ratio = info.get('used_memory', 0) / int(max_memory)
if usage_ratio > 0.8:
analysis['recommendations'].append({
'type': 'usage',
'message': f'内存使用率过高 ({usage_ratio:.2%}),建议增加内存或优化数据结构',
'priority': 'high'
})
return analysis
except Exception as e:
self.logger.error(f"内存分析失败: {e}")
return {'error': str(e)}
def _calculate_memory_efficiency(self, memory_info: Dict[str, Any]) -> Dict[str, float]:
"""计算内存效率"""
used_memory = memory_info.get('used_memory', 0)
used_memory_rss = memory_info.get('used_memory_rss', 0)
if used_memory_rss > 0:
efficiency = used_memory / used_memory_rss
else:
efficiency = 1.0
return {
'efficiency_ratio': efficiency,
'waste_bytes': max(0, used_memory_rss - used_memory),
'efficiency_percentage': efficiency * 100
}
def find_memory_hotspots(self, sample_size: int = 1000) -> Dict[str, Any]:
"""查找内存热点"""
try:
# 获取大键
big_keys = self._find_big_keys(sample_size)
# 分析键类型分布
key_type_distribution = self._analyze_key_types(sample_size)
# 分析过期键
expiry_analysis = self._analyze_key_expiry(sample_size)
return {
'big_keys': big_keys,
'key_type_distribution': key_type_distribution,
'expiry_analysis': expiry_analysis,
'optimization_suggestions': self._generate_optimization_suggestions(
big_keys, key_type_distribution, expiry_analysis
)
}
except Exception as e:
self.logger.error(f"查找内存热点失败: {e}")
return {'error': str(e)}
def _find_big_keys(self, sample_size: int) -> List[Dict[str, Any]]:
"""查找大键"""
big_keys = []
try:
# 随机采样键
keys = self.redis_client.randomkey()
sampled_keys = []
for _ in range(sample_size):
key = self.redis_client.randomkey()
if key:
sampled_keys.append(key)
# 分析每个键的内存使用
for key in sampled_keys:
try:
memory_usage = self.redis_client.memory_usage(key)
key_type = self.redis_client.type(key).decode()
ttl = self.redis_client.ttl(key)
if memory_usage > 1024 * 1024: # 大于1MB的键
big_keys.append({
'key': key.decode() if isinstance(key, bytes) else key,
'memory_usage': memory_usage,
'memory_usage_human': self._format_bytes(memory_usage),
'type': key_type,
'ttl': ttl
})
except Exception:
continue
# 按内存使用排序
big_keys.sort(key=lambda x: x['memory_usage'], reverse=True)
except Exception as e:
self.logger.warning(f"查找大键时出错: {e}")
return big_keys[:20] # 返回前20个大键
def _analyze_key_types(self, sample_size: int) -> Dict[str, Any]:
"""分析键类型分布"""
type_stats = {}
total_memory = 0
try:
for _ in range(sample_size):
key = self.redis_client.randomkey()
if not key:
continue
try:
key_type = self.redis_client.type(key).decode()
memory_usage = self.redis_client.memory_usage(key)
if key_type not in type_stats:
type_stats[key_type] = {
'count': 0,
'total_memory': 0,
'avg_memory': 0
}
type_stats[key_type]['count'] += 1
type_stats[key_type]['total_memory'] += memory_usage
total_memory += memory_usage
except Exception:
continue
# 计算平均值和百分比
for key_type, stats in type_stats.items():
stats['avg_memory'] = stats['total_memory'] / stats['count']
stats['memory_percentage'] = (stats['total_memory'] / total_memory * 100) if total_memory > 0 else 0
stats['total_memory_human'] = self._format_bytes(stats['total_memory'])
stats['avg_memory_human'] = self._format_bytes(stats['avg_memory'])
except Exception as e:
self.logger.warning(f"分析键类型时出错: {e}")
return type_stats
def _analyze_key_expiry(self, sample_size: int) -> Dict[str, Any]:
"""分析键过期情况"""
expiry_stats = {
'with_ttl': 0,
'without_ttl': 0,
'expired_soon': 0, # 1小时内过期
'expired_today': 0, # 24小时内过期
'long_term': 0 # 超过24小时
}
try:
for _ in range(sample_size):
key = self.redis_client.randomkey()
if not key:
continue
try:
ttl = self.redis_client.ttl(key)
if ttl == -1: # 没有过期时间
expiry_stats['without_ttl'] += 1
elif ttl == -2: # 键不存在
continue
else:
expiry_stats['with_ttl'] += 1
if ttl <= 3600: # 1小时内
expiry_stats['expired_soon'] += 1
elif ttl <= 86400: # 24小时内
expiry_stats['expired_today'] += 1
else:
expiry_stats['long_term'] += 1
except Exception:
continue
except Exception as e:
self.logger.warning(f"分析键过期时出错: {e}")
return expiry_stats
def _generate_optimization_suggestions(self, big_keys: List[Dict],
type_distribution: Dict,
expiry_analysis: Dict) -> List[str]:
"""生成优化建议"""
suggestions = []
# 大键优化建议
if big_keys:
suggestions.append(f"发现 {len(big_keys)} 个大键,建议考虑拆分或压缩")
# 类型分布优化建议
for key_type, stats in type_distribution.items():
if stats['memory_percentage'] > 50:
suggestions.append(f"{key_type} 类型占用内存过多 ({stats['memory_percentage']:.1f}%),建议优化")
# 过期键优化建议
total_keys = expiry_analysis['with_ttl'] + expiry_analysis['without_ttl']
if total_keys > 0:
no_ttl_ratio = expiry_analysis['without_ttl'] / total_keys
if no_ttl_ratio > 0.5:
suggestions.append(f"有 {no_ttl_ratio:.1%} 的键没有设置过期时间,建议设置合适的TTL")
return suggestions
def _format_bytes(self, bytes_value: int) -> str:
"""格式化字节数"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_value < 1024.0:
return f"{bytes_value:.2f} {unit}"
bytes_value /= 1024.0
return f"{bytes_value:.2f} PB"
```
13.3 网络优化
13.3.1 连接池优化
```python
class OptimizedRedisConnectionPool:
    """优化的Redis连接池"""
def __init__(self, host='localhost', port=6379, db=0, password=None,
             max_connections=50, retry_on_timeout=True,
             socket_timeout=5, socket_connect_timeout=5):
    """Create a connection pool with keepalive and health checks enabled.

    Args:
        host: Redis server host.
        port: Redis server port.
        db: Database index.
        password: Optional password.
        max_connections: Upper bound on pooled connections.
        retry_on_timeout: Forwarded to the pool unchanged.
        socket_timeout: Forwarded to the pool unchanged.
        socket_connect_timeout: Forwarded to the pool unchanged.
    """
    pool_options = dict(
        host=host,
        port=port,
        db=db,
        password=password,
        max_connections=max_connections,
        retry_on_timeout=retry_on_timeout,
        socket_timeout=socket_timeout,
        socket_connect_timeout=socket_connect_timeout,
        socket_keepalive=True,
        socket_keepalive_options={},
        health_check_interval=30,
    )
    self.pool = redis.ConnectionPool(**pool_options)
    self.redis_client = redis.Redis(connection_pool=self.pool)
    self.logger = logging.getLogger(__name__)

    # Counters reserved for performance monitoring; all start at zero.
    self.connection_stats = dict.fromkeys(
        ('created', 'in_use', 'available', 'errors'), 0
    )
def get_connection_stats(self) -> Dict[str, Any]:
"""获取连接池统计信息"""
try:
pool_stats = {
'created_connections': self.pool.created_connections,
'available_connections': len(self.pool._available_connections),
'in_use_connections': len(self.pool._in_use_connections),
'max_connections': self.pool.max_connections
}
# 计算使用率
usage_rate = (pool_stats['in_use_connections'] /
pool_stats['max_connections']) * 100
pool_stats['usage_rate'] = usage_rate
pool_stats['health_status'] = 'healthy' if usage_rate < 80 else 'warning'
return pool_stats
except Exception as e:
self.logger.error(f"获取连接池统计失败: {e}")
return {}
def optimize_pool_size(self) -> Dict[str, Any]:
"""优化连接池大小"""
try:
stats = self.get_connection_stats()
current_max = stats.get('max_connections', 50)
usage_rate = stats.get('usage_rate', 0)
recommendations = []
if usage_rate > 90:
new_size = int(current_max * 1.5)
recommendations.append({
'action': 'increase',
'current_size': current_max,
'recommended_size': new_size,
'reason': f'使用率过高 ({usage_rate:.1f}%)'
})
elif usage_rate < 30:
new_size = max(10, int(current_max * 0.7))
recommendations.append({
'action': 'decrease',
'current_size': current_max,
'recommended_size': new_size,
'reason': f'使用率过低 ({usage_rate:.1f}%)'
})
else:
recommendations.append({
'action': 'maintain',
'current_size': current_max,
'reason': f'使用率正常 ({usage_rate:.1f}%)'
})
return {
'current_stats': stats,
'recommendations': recommendations
}
except Exception as e:
self.logger.error(f"优化连接池大小失败: {e}")
return {'error': str(e)}
```
13.3.2 管道优化
```python
class OptimizedRedisPipeline:
    """优化的Redis管道"""
def __init__(self, redis_client: redis.Redis, batch_size: int = 100):
    """Bind the helper to a client and remember the batch size.

    Args:
        redis_client: Redis client whose pipelines are used.
        batch_size: Number of commands sent per pipeline by batch_execute.
    """
    self.logger = logging.getLogger(__name__)
    self.batch_size = batch_size
    self.redis_client = redis_client
def batch_execute(self, commands: List[Dict[str, Any]]) -> List[Any]:
"""批量执行命令"""
results = []
try:
# 分批处理
for i in range(0, len(commands), self.batch_size):
batch = commands[i:i + self.batch_size]
batch_results = self._execute_batch(batch)
results.extend(batch_results)
return results
except Exception as e:
self.logger.error(f"批量执行失败: {e}")
raise
def _execute_batch(self, batch: List[Dict[str, Any]]) -> List[Any]:
"""执行单个批次"""
pipe = self.redis_client.pipeline()
try:
# 添加命令到管道
for cmd in batch:
method = getattr(pipe, cmd['method'])
args = cmd.get('args', [])
kwargs = cmd.get('kwargs', {})
method(*args, **kwargs)
# 执行管道
return pipe.execute()
except Exception as e:
self.logger.error(f"执行批次失败: {e}")
raise
def optimized_bulk_set(self, data: Dict[str, Any], ttl: Optional[int] = None) -> bool:
"""优化的批量设置"""
try:
pipe = self.redis_client.pipeline()
for key, value in data.items():
if ttl:
pipe.setex(key, ttl, value)
else:
pipe.set(key, value)
pipe.execute()
return True
except Exception as e:
self.logger.error(f"批量设置失败: {e}")
return False
def optimized_bulk_get(self, keys: List[str]) -> Dict[str, Any]:
"""优化的批量获取"""
try:
# 使用MGET进行批量获取
values = self.redis_client.mget(keys)
# 组装结果
result = {}
for i, key in enumerate(keys):
if i < len(values) and values[i] is not None:
result[key] = values[i]
return result
except Exception as e:
self.logger.error(f"批量获取失败: {e}")
return {}
```
13.4 查询优化
13.4.1 慢查询分析
```python
class RedisSlowQueryAnalyzer:
    """Redis慢查询分析器"""
def __init__(self, redis_client: redis.Redis):
    """Bind the analyzer to a Redis client.

    Args:
        redis_client: Redis client used to read and configure the slow log.
    """
    self.logger = logging.getLogger(__name__)
    self.redis_client = redis_client
def configure_slow_log(self, slower_than: int = 10000, max_len: int = 128) -> bool:
"""配置慢查询日志"""
try:
# 设置慢查询阈值(微秒)
self.redis_client.config_set('slowlog-log-slower-than', slower_than)
# 设置慢查询日志最大长度
self.redis_client.config_set('slowlog-max-len', max_len)
self.logger.info(f"慢查询配置已更新: 阈值={slower_than}μs, 最大长度={max_len}")
return True
except Exception as e:
self.logger.error(f"配置慢查询失败: {e}")
return False
def get_slow_queries(self, count: int = 10) -> List[Dict[str, Any]]:
"""获取慢查询记录"""
try:
slow_logs = self.redis_client.slowlog_get(count)
analyzed_logs = []
for log in slow_logs:
analyzed_log = {
'id': log['id'],
'start_time': datetime.fromtimestamp(log['start_time']),
'duration_microseconds': log['duration'],
'duration_milliseconds': log['duration'] / 1000,
'command': ' '.join(str(arg) for arg in log['command']),
'client_address': log.get('client_address', 'unknown'),
'client_name': log.get('client_name', 'unknown')
}
analyzed_logs.append(analyzed_log)
return analyzed_logs
except Exception as e:
self.logger.error(f"获取慢查询失败: {e}")
return []
def analyze_slow_patterns(self) -> Dict[str, Any]:
"""分析慢查询模式"""
try:
slow_queries = self.get_slow_queries(100)
if not slow_queries:
return {'message': '没有慢查询记录'}
# 统计命令类型
command_stats = {}
total_duration = 0
for query in slow_queries:
command = query['command'].split()[0].upper()
duration = query['duration_microseconds']
if command not in command_stats:
command_stats[command] = {
'count': 0,
'total_duration': 0,
'avg_duration': 0,
'max_duration': 0
}
command_stats[command]['count'] += 1
command_stats[command]['total_duration'] += duration
command_stats[command]['max_duration'] = max(
command_stats[command]['max_duration'], duration
)
total_duration += duration
# 计算平均值和百分比
for command, stats in command_stats.items():
stats['avg_duration'] = stats['total_duration'] / stats['count']
stats['duration_percentage'] = (stats['total_duration'] / total_duration * 100) if total_duration > 0 else 0
# 排序
sorted_commands = sorted(
command_stats.items(),
key=lambda x: x[1]['total_duration'],
reverse=True
)
# 生成优化建议
recommendations = self._generate_slow_query_recommendations(sorted_commands)
return {
'total_slow_queries': len(slow_queries),
'total_duration_ms': total_duration / 1000,
'command_statistics': dict(sorted_commands),
'top_slow_commands': sorted_commands[:5],
'recommendations': recommendations
}
except Exception as e:
self.logger.error(f"分析慢查询模式失败: {e}")
return {'error': str(e)}
def _generate_slow_query_recommendations(self, command_stats: List[tuple]) -> List[str]:
"""生成慢查询优化建议"""
recommendations = []
for command, stats in command_stats[:3]: # 前3个最慢的命令
if command in ['KEYS', 'FLUSHALL', 'FLUSHDB']:
recommendations.append(f"避免在生产环境使用 {command} 命令")
elif command in ['SORT', 'SUNION', 'SINTER', 'SDIFF']:
recommendations.append(f"优化 {command} 命令的数据集大小")
elif command in ['ZRANGE', 'ZREVRANGE']:
recommendations.append(f"限制 {command} 命令的返回结果数量")
elif stats['avg_duration'] > 100000: # 100ms
recommendations.append(f"优化 {command} 命令的执行效率")
return recommendations
def clear_slow_log(self) -> bool:
"""清空慢查询日志"""
try:
self.redis_client.slowlog_reset()
self.logger.info("慢查询日志已清空")
return True
except Exception as e:
self.logger.error(f"清空慢查询日志失败: {e}")
return False
```
13.5 总结
本章详细介绍了Redis性能优化与调优的各个方面:
13.5.1 核心优化策略
- 性能监控:建立完善的性能监控体系
- 内存优化:优化内存使用和数据结构
- 网络优化:优化连接池和网络配置
- 查询优化:分析和优化慢查询
13.5.2 最佳实践
- 定期进行性能分析和优化
- 合理配置内存和网络参数
- 使用管道和批量操作提高效率
- 监控慢查询并及时优化
- 根据业务场景选择合适的数据结构
13.5.3 参考资料
下一章将介绍Redis的实战案例和项目应用。