13.1 Performance Optimization Overview

13.1.1 Why Performance Optimization Matters

As a high-performance in-memory database, Redis benefits greatly from careful tuning in large-scale deployments:

  • Response time: optimization can significantly reduce latency
  • Throughput: improves the system's capacity for concurrent operations
  • Resource utilization: makes more efficient use of CPU and memory
  • Cost control: reduces hardware requirements
  • User experience: delivers a smoother application experience

13.1.2 Optimization Strategy

```python
import redis
import time
import threading
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import psutil
import logging

@dataclass
class PerformanceMetrics:
    """Container for one snapshot of performance metrics."""
    timestamp: datetime
    cpu_usage: float
    memory_usage: float
    network_io: Dict[str, int]
    redis_info: Dict[str, Any]
    response_times: List[float]
    throughput: float
    error_rate: float

class RedisPerformanceOptimizer:
    """Redis performance optimizer."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.logger = logging.getLogger(__name__)
        self.metrics_history = []
        self.optimization_rules = self._load_optimization_rules()

    def _load_optimization_rules(self) -> Dict[str, Any]:
        """Load the optimization rule set."""
        return {
            'memory': {
                'max_memory_usage': 0.8,  # maximum memory usage ratio
                'eviction_policy': 'allkeys-lru',
                'compression_threshold': 1000  # compression threshold
            },
            'network': {
                'pipeline_batch_size': 100,
                'connection_pool_size': 50,
                'timeout_settings': {
                    'socket_timeout': 5,
                    'socket_connect_timeout': 5
                }
            },
            'persistence': {
                'save_frequency': '900 1 300 10 60 10000',
                'aof_rewrite_threshold': 100,
                'no_appendfsync_on_rewrite': True
            },
            'cpu': {
                'max_cpu_usage': 0.7,
                'slow_log_threshold': 10000,  # microseconds
                'client_output_buffer_limit': '256mb'
            }
        }

    def collect_metrics(self) -> PerformanceMetrics:
        """Collect a snapshot of system and Redis performance metrics."""
        try:
            # System-level metrics
            cpu_usage = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            network = psutil.net_io_counters()

            # Redis server metrics
            redis_info = self.redis_client.info()

            # Response-time samples
            response_times = self._measure_response_times()

            # Throughput benchmark
            throughput = self._measure_throughput()

            metrics = PerformanceMetrics(
                timestamp=datetime.now(),
                cpu_usage=cpu_usage,
                memory_usage=memory.percent,
                network_io={
                    'bytes_sent': network.bytes_sent,
                    'bytes_recv': network.bytes_recv,
                    'packets_sent': network.packets_sent,
                    'packets_recv': network.packets_recv
                },
                redis_info=redis_info,
                response_times=response_times,
                throughput=throughput,
                error_rate=self._calculate_error_rate()
            )

            self.metrics_history.append(metrics)
            # Keep only the most recent 1000 snapshots
            if len(self.metrics_history) > 1000:
                self.metrics_history = self.metrics_history[-1000:]

            return metrics

        except Exception as e:
            self.logger.error(f"Failed to collect performance metrics: {e}")
            raise

    def _measure_response_times(self, sample_count: int = 100) -> List[float]:
        """Measure PING round-trip times in milliseconds."""
        response_times = []

        for _ in range(sample_count):
            start_time = time.time()
            try:
                self.redis_client.ping()
                response_time = (time.time() - start_time) * 1000  # milliseconds
                response_times.append(response_time)
            except Exception:
                response_times.append(float('inf'))

        return response_times

    def _measure_throughput(self, duration: int = 10) -> float:
        """Measure throughput in operations per second over the given duration."""
        start_time = time.time()
        operations = 0

        end_time = start_time + duration

        while time.time() < end_time:
            try:
                # Run a simple SET/GET/DELETE cycle
                key = f"benchmark:{operations}"
                self.redis_client.set(key, "test_value")
                self.redis_client.get(key)
                self.redis_client.delete(key)
                operations += 3
            except Exception:
                pass

        actual_duration = time.time() - start_time
        return operations / actual_duration if actual_duration > 0 else 0

    def _calculate_error_rate(self) -> float:
        """Approximate the error rate as rejected connections per processed command."""
        try:
            info = self.redis_client.info('stats')
            total_commands = info.get('total_commands_processed', 0)
            rejected_connections = info.get('rejected_connections', 0)

            if total_commands > 0:
                return rejected_connections / total_commands
            return 0.0
        except Exception:
            return 0.0

    def analyze_performance(self) -> Dict[str, Any]:
        """Analyze the most recent metrics snapshot."""
        if not self.metrics_history:
            return {'error': 'No performance data available'}

        latest_metrics = self.metrics_history[-1]

        analysis = {
            'timestamp': latest_metrics.timestamp.isoformat(),
            'overall_health': 'good',
            'issues': [],
            'recommendations': [],
            'metrics_summary': {
                'avg_response_time': sum(latest_metrics.response_times) / len(latest_metrics.response_times),
                'max_response_time': max(latest_metrics.response_times),
                'min_response_time': min(latest_metrics.response_times),
                'throughput': latest_metrics.throughput,
                'cpu_usage': latest_metrics.cpu_usage,
                'memory_usage': latest_metrics.memory_usage,
                'error_rate': latest_metrics.error_rate
            }
        }

        # CPU usage
        if latest_metrics.cpu_usage > self.optimization_rules['cpu']['max_cpu_usage'] * 100:
            analysis['issues'].append('CPU usage too high')
            analysis['recommendations'].append('Consider reducing command complexity or adding CPU resources')
            analysis['overall_health'] = 'warning'

        # Memory usage
        if latest_metrics.memory_usage > self.optimization_rules['memory']['max_memory_usage'] * 100:
            analysis['issues'].append('Memory usage too high')
            analysis['recommendations'].append('Configure an eviction policy or add memory')
            analysis['overall_health'] = 'warning'

        # Response time
        avg_response_time = analysis['metrics_summary']['avg_response_time']
        if avg_response_time > 10:  # 10 ms
            analysis['issues'].append('Response time too long')
            analysis['recommendations'].append('Check network latency and Redis configuration')
            analysis['overall_health'] = 'warning'

        # Error rate
        if latest_metrics.error_rate > 0.01:  # 1%
            analysis['issues'].append('Error rate too high')
            analysis['recommendations'].append('Check connection pool settings and network stability')
            analysis['overall_health'] = 'critical'

        return analysis

    def optimize_configuration(self) -> Dict[str, Any]:
        """Compute and apply configuration optimizations."""
        try:
            current_config = dict(self.redis_client.config_get('*'))
            optimizations = []

            # Memory settings
            memory_optimizations = self._optimize_memory_config(current_config)
            optimizations.extend(memory_optimizations)

            # Network settings
            network_optimizations = self._optimize_network_config(current_config)
            optimizations.extend(network_optimizations)

            # Persistence settings
            persistence_optimizations = self._optimize_persistence_config(current_config)
            optimizations.extend(persistence_optimizations)

            # Apply the optimizations
            applied_count = 0
            for optimization in optimizations:
                try:
                    self.redis_client.config_set(optimization['key'], optimization['value'])
                    applied_count += 1
                    self.logger.info(f"Applied optimization: {optimization['key']} = {optimization['value']}")
                except Exception as e:
                    self.logger.warning(f"Failed to apply optimization: {optimization['key']} - {e}")

            return {
                'success': True,
                'total_optimizations': len(optimizations),
                'applied_optimizations': applied_count,
                'optimizations': optimizations
            }

        except Exception as e:
            self.logger.error(f"Configuration optimization failed: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _optimize_memory_config(self, current_config: Dict[str, str]) -> List[Dict[str, Any]]:
        """Derive memory-related configuration optimizations."""
        optimizations = []

        # Set a maxmemory limit if none is configured
        if 'maxmemory' not in current_config or current_config['maxmemory'] == '0':
            # Use 80% of system memory
            total_memory = psutil.virtual_memory().total
            max_memory = int(total_memory * 0.8)
            optimizations.append({
                'key': 'maxmemory',
                'value': str(max_memory),
                'reason': 'Set a maximum memory limit'
            })

        # Eviction policy
        if current_config.get('maxmemory-policy') != self.optimization_rules['memory']['eviction_policy']:
            optimizations.append({
                'key': 'maxmemory-policy',
                'value': self.optimization_rules['memory']['eviction_policy'],
                'reason': 'Optimize the eviction policy'
            })

        # Compact-encoding thresholds (ziplist names; Redis 7 accepts these
        # as aliases for the equivalent listpack parameters)
        hash_configs = {
            'hash-max-ziplist-entries': '512',
            'hash-max-ziplist-value': '64',
            'list-max-ziplist-size': '-2',
            'set-max-intset-entries': '512',
            'zset-max-ziplist-entries': '128',
            'zset-max-ziplist-value': '64'
        }

        for key, value in hash_configs.items():
            if current_config.get(key) != value:
                optimizations.append({
                    'key': key,
                    'value': value,
                    'reason': 'Optimize compact data-structure encoding'
                })

        return optimizations

    def _optimize_network_config(self, current_config: Dict[str, str]) -> List[Dict[str, Any]]:
        """Derive network-related configuration optimizations."""
        optimizations = []

        # TCP keepalive and idle-client timeout
        tcp_configs = {
            'tcp-keepalive': '300',
            'timeout': '300'
        }

        for key, value in tcp_configs.items():
            if current_config.get(key) != value:
                optimizations.append({
                    'key': key,
                    'value': value,
                    'reason': 'Optimize network connections'
                })

        return optimizations

    def _optimize_persistence_config(self, current_config: Dict[str, str]) -> List[Dict[str, Any]]:
        """Derive persistence-related configuration optimizations."""
        optimizations = []

        # RDB save points
        if current_config.get('save') != self.optimization_rules['persistence']['save_frequency']:
            optimizations.append({
                'key': 'save',
                'value': self.optimization_rules['persistence']['save_frequency'],
                'reason': 'Optimize RDB save frequency'
            })

        # AOF settings
        aof_configs = {
            'auto-aof-rewrite-percentage': '100',
            'auto-aof-rewrite-min-size': '64mb',
            'no-appendfsync-on-rewrite': 'yes' if self.optimization_rules['persistence']['no_appendfsync_on_rewrite'] else 'no'
        }

        for key, value in aof_configs.items():
            if current_config.get(key) != value:
                optimizations.append({
                    'key': key,
                    'value': value,
                    'reason': 'Optimize AOF configuration'
                })

        return optimizations
```
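A minimal usage sketch (assuming a local Redis instance on the default port; `collect_metrics` runs an intrusive benchmark, so point it at a test instance rather than production, and review the generated CONFIG SET changes before applying them anywhere that matters):

```python
if __name__ == "__main__":
    client = redis.Redis(host="localhost", port=6379, db=0)
    optimizer = RedisPerformanceOptimizer(client)

    optimizer.collect_metrics()               # sample system + Redis metrics
    report = optimizer.analyze_performance()  # evaluate against the rule set
    print(report['overall_health'], report['issues'])

    # Apply the rule-based CONFIG SET changes
    result = optimizer.optimize_configuration()
    print(f"applied {result.get('applied_optimizations', 0)} optimizations")
```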

13.2 Memory Optimization

13.2.1 Analyzing Memory Usage

```python
class RedisMemoryAnalyzer:
    """Redis memory analyzer."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.logger = logging.getLogger(__name__)

    def analyze_memory_usage(self) -> Dict[str, Any]:
        """Analyze current memory usage."""
        try:
            info = self.redis_client.info('memory')

            analysis = {
                'total_memory': {
                    'used_memory': info.get('used_memory', 0),
                    'used_memory_human': info.get('used_memory_human', '0B'),
                    'used_memory_rss': info.get('used_memory_rss', 0),
                    'used_memory_peak': info.get('used_memory_peak', 0),
                    'used_memory_peak_human': info.get('used_memory_peak_human', '0B')
                },
                'memory_fragmentation': {
                    'fragmentation_ratio': info.get('mem_fragmentation_ratio', 0),
                    'fragmentation_bytes': info.get('mem_fragmentation_bytes', 0)
                },
                'memory_efficiency': self._calculate_memory_efficiency(info),
                'recommendations': []
            }

            # Fragmentation check
            frag_ratio = analysis['memory_fragmentation']['fragmentation_ratio']
            if frag_ratio > 1.5:
                analysis['recommendations'].append({
                    'type': 'fragmentation',
                    'message': f'Fragmentation ratio is high ({frag_ratio:.2f}); consider restarting Redis or running MEMORY PURGE',
                    'priority': 'high'
                })

            # Usage-ratio check
            max_memory = self.redis_client.config_get('maxmemory')['maxmemory']
            if max_memory != '0':
                usage_ratio = info.get('used_memory', 0) / int(max_memory)
                if usage_ratio > 0.8:
                    analysis['recommendations'].append({
                        'type': 'usage',
                        'message': f'Memory usage is high ({usage_ratio:.2%}); consider adding memory or optimizing data structures',
                        'priority': 'high'
                    })

            return analysis

        except Exception as e:
            self.logger.error(f"Memory analysis failed: {e}")
            return {'error': str(e)}

    def _calculate_memory_efficiency(self, memory_info: Dict[str, Any]) -> Dict[str, float]:
        """Compute the ratio of logical memory to resident set size."""
        used_memory = memory_info.get('used_memory', 0)
        used_memory_rss = memory_info.get('used_memory_rss', 0)

        if used_memory_rss > 0:
            efficiency = used_memory / used_memory_rss
        else:
            efficiency = 1.0

        return {
            'efficiency_ratio': efficiency,
            'waste_bytes': max(0, used_memory_rss - used_memory),
            'efficiency_percentage': efficiency * 100
        }

    def find_memory_hotspots(self, sample_size: int = 1000) -> Dict[str, Any]:
        """Locate memory hotspots by random sampling."""
        try:
            # Large keys
            big_keys = self._find_big_keys(sample_size)

            # Key-type distribution
            key_type_distribution = self._analyze_key_types(sample_size)

            # Expiry distribution
            expiry_analysis = self._analyze_key_expiry(sample_size)

            return {
                'big_keys': big_keys,
                'key_type_distribution': key_type_distribution,
                'expiry_analysis': expiry_analysis,
                'optimization_suggestions': self._generate_optimization_suggestions(
                    big_keys, key_type_distribution, expiry_analysis
                )
            }

        except Exception as e:
            self.logger.error(f"Failed to find memory hotspots: {e}")
            return {'error': str(e)}

    def _find_big_keys(self, sample_size: int) -> List[Dict[str, Any]]:
        """Find large keys via RANDOMKEY sampling."""
        big_keys = []

        try:
            # Randomly sample keys
            sampled_keys = []
            for _ in range(sample_size):
                key = self.redis_client.randomkey()
                if key:
                    sampled_keys.append(key)

            # Measure each sampled key's memory footprint
            for key in sampled_keys:
                try:
                    memory_usage = self.redis_client.memory_usage(key)
                    key_type = self.redis_client.type(key).decode()
                    ttl = self.redis_client.ttl(key)

                    # MEMORY USAGE can return None if the key vanished
                    if memory_usage and memory_usage > 1024 * 1024:  # larger than 1 MB
                        big_keys.append({
                            'key': key.decode() if isinstance(key, bytes) else key,
                            'memory_usage': memory_usage,
                            'memory_usage_human': self._format_bytes(memory_usage),
                            'type': key_type,
                            'ttl': ttl
                        })
                except Exception:
                    continue

            # Sort by memory footprint, largest first
            big_keys.sort(key=lambda x: x['memory_usage'], reverse=True)

        except Exception as e:
            self.logger.warning(f"Error while finding big keys: {e}")

        return big_keys[:20]  # return the 20 largest keys

    def _analyze_key_types(self, sample_size: int) -> Dict[str, Any]:
        """Analyze the distribution of key types in a random sample."""
        type_stats = {}
        total_memory = 0

        try:
            for _ in range(sample_size):
                key = self.redis_client.randomkey()
                if not key:
                    continue

                try:
                    key_type = self.redis_client.type(key).decode()
                    memory_usage = self.redis_client.memory_usage(key) or 0

                    if key_type not in type_stats:
                        type_stats[key_type] = {
                            'count': 0,
                            'total_memory': 0,
                            'avg_memory': 0
                        }

                    type_stats[key_type]['count'] += 1
                    type_stats[key_type]['total_memory'] += memory_usage
                    total_memory += memory_usage

                except Exception:
                    continue

            # Averages and percentages
            for key_type, stats in type_stats.items():
                stats['avg_memory'] = stats['total_memory'] / stats['count']
                stats['memory_percentage'] = (stats['total_memory'] / total_memory * 100) if total_memory > 0 else 0
                stats['total_memory_human'] = self._format_bytes(stats['total_memory'])
                stats['avg_memory_human'] = self._format_bytes(stats['avg_memory'])

        except Exception as e:
            self.logger.warning(f"Error while analyzing key types: {e}")

        return type_stats

    def _analyze_key_expiry(self, sample_size: int) -> Dict[str, Any]:
        """Analyze TTL distribution in a random sample."""
        expiry_stats = {
            'with_ttl': 0,
            'without_ttl': 0,
            'expired_soon': 0,   # expires within 1 hour
            'expired_today': 0,  # expires within 24 hours
            'long_term': 0       # expires after more than 24 hours
        }

        try:
            for _ in range(sample_size):
                key = self.redis_client.randomkey()
                if not key:
                    continue

                try:
                    ttl = self.redis_client.ttl(key)

                    if ttl == -1:    # no expiry set
                        expiry_stats['without_ttl'] += 1
                    elif ttl == -2:  # key no longer exists
                        continue
                    else:
                        expiry_stats['with_ttl'] += 1

                        if ttl <= 3600:     # within 1 hour
                            expiry_stats['expired_soon'] += 1
                        elif ttl <= 86400:  # within 24 hours
                            expiry_stats['expired_today'] += 1
                        else:
                            expiry_stats['long_term'] += 1

                except Exception:
                    continue

        except Exception as e:
            self.logger.warning(f"Error while analyzing key expiry: {e}")

        return expiry_stats

    def _generate_optimization_suggestions(self, big_keys: List[Dict],
                                           type_distribution: Dict,
                                           expiry_analysis: Dict) -> List[str]:
        """Generate optimization suggestions from the sampling results."""
        suggestions = []

        # Big keys
        if big_keys:
            suggestions.append(f"Found {len(big_keys)} large keys; consider splitting or compressing them")

        # Type distribution
        for key_type, stats in type_distribution.items():
            if stats['memory_percentage'] > 50:
                suggestions.append(f"Type '{key_type}' uses a disproportionate share of memory ({stats['memory_percentage']:.1f}%); consider optimizing it")

        # Expiry
        total_keys = expiry_analysis['with_ttl'] + expiry_analysis['without_ttl']
        if total_keys > 0:
            no_ttl_ratio = expiry_analysis['without_ttl'] / total_keys
            if no_ttl_ratio > 0.5:
                suggestions.append(f"{no_ttl_ratio:.1%} of keys have no expiry; consider setting appropriate TTLs")

        return suggestions

    def _format_bytes(self, bytes_value: int) -> str:
        """Format a byte count as a human-readable string."""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} PB"
```
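A brief usage sketch. The sampling methods rely on RANDOMKEY and MEMORY USAGE, so the results are approximate and the scan adds load; run it against a replica or test instance where possible:

```python
client = redis.Redis(host="localhost", port=6379, db=0)
analyzer = RedisMemoryAnalyzer(client)

usage = analyzer.analyze_memory_usage()
for rec in usage.get('recommendations', []):
    print(rec['priority'], rec['message'])

hotspots = analyzer.find_memory_hotspots(sample_size=500)
for suggestion in hotspots.get('optimization_suggestions', []):
    print(suggestion)
```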

13.3 Network Optimization

13.3.1 Connection Pool Optimization

```python
class OptimizedRedisConnectionPool:
    """Redis connection pool with tuned defaults."""

    def __init__(self, host='localhost', port=6379, db=0, password=None,
                 max_connections=50, retry_on_timeout=True,
                 socket_timeout=5, socket_connect_timeout=5):

        # Connection pool with tuned settings
        self.pool = redis.ConnectionPool(
            host=host,
            port=port,
            db=db,
            password=password,
            max_connections=max_connections,
            retry_on_timeout=retry_on_timeout,
            socket_timeout=socket_timeout,
            socket_connect_timeout=socket_connect_timeout,
            socket_keepalive=True,
            socket_keepalive_options={},
            health_check_interval=30
        )

        self.redis_client = redis.Redis(connection_pool=self.pool)
        self.logger = logging.getLogger(__name__)

        # Connection statistics
        self.connection_stats = {
            'created': 0,
            'in_use': 0,
            'available': 0,
            'errors': 0
        }

    def get_connection_stats(self) -> Dict[str, Any]:
        """Return connection-pool statistics.

        Note: the underscore-prefixed attributes are private redis-py
        implementation details and may change between versions.
        """
        try:
            pool_stats = {
                'created_connections': self.pool._created_connections,
                'available_connections': len(self.pool._available_connections),
                'in_use_connections': len(self.pool._in_use_connections),
                'max_connections': self.pool.max_connections
            }

            # Utilization
            usage_rate = (pool_stats['in_use_connections'] /
                          pool_stats['max_connections']) * 100

            pool_stats['usage_rate'] = usage_rate
            pool_stats['health_status'] = 'healthy' if usage_rate < 80 else 'warning'

            return pool_stats

        except Exception as e:
            self.logger.error(f"Failed to get connection pool stats: {e}")
            return {}

    def optimize_pool_size(self) -> Dict[str, Any]:
        """Recommend a connection pool size based on current utilization."""
        try:
            stats = self.get_connection_stats()
            current_max = stats.get('max_connections', 50)
            usage_rate = stats.get('usage_rate', 0)

            recommendations = []

            if usage_rate > 90:
                new_size = int(current_max * 1.5)
                recommendations.append({
                    'action': 'increase',
                    'current_size': current_max,
                    'recommended_size': new_size,
                    'reason': f'Utilization too high ({usage_rate:.1f}%)'
                })
            elif usage_rate < 30:
                new_size = max(10, int(current_max * 0.7))
                recommendations.append({
                    'action': 'decrease',
                    'current_size': current_max,
                    'recommended_size': new_size,
                    'reason': f'Utilization too low ({usage_rate:.1f}%)'
                })
            else:
                recommendations.append({
                    'action': 'maintain',
                    'current_size': current_max,
                    'reason': f'Utilization normal ({usage_rate:.1f}%)'
                })

            return {
                'current_stats': stats,
                'recommendations': recommendations
            }

        except Exception as e:
            self.logger.error(f"Failed to optimize pool size: {e}")
            return {'error': str(e)}
```
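A short usage sketch (connection parameters are illustrative):

```python
pool = OptimizedRedisConnectionPool(host="localhost", port=6379,
                                    max_connections=50)
pool.redis_client.set("demo:key", "value")

stats = pool.get_connection_stats()
print(stats.get('usage_rate'), stats.get('health_status'))

advice = pool.optimize_pool_size()
for rec in advice.get('recommendations', []):
    print(rec['action'], rec['reason'])
```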

13.3.2 Pipeline Optimization

```python
class OptimizedRedisPipeline:
    """Batched pipeline helper for Redis."""

    def __init__(self, redis_client: redis.Redis, batch_size: int = 100):
        self.redis_client = redis_client
        self.batch_size = batch_size
        self.logger = logging.getLogger(__name__)

    def batch_execute(self, commands: List[Dict[str, Any]]) -> List[Any]:
        """Execute a list of command descriptors in batches."""
        results = []

        try:
            # Process in batches of self.batch_size
            for i in range(0, len(commands), self.batch_size):
                batch = commands[i:i + self.batch_size]
                batch_results = self._execute_batch(batch)
                results.extend(batch_results)

            return results

        except Exception as e:
            self.logger.error(f"Batch execution failed: {e}")
            raise

    def _execute_batch(self, batch: List[Dict[str, Any]]) -> List[Any]:
        """Execute a single batch through one pipeline."""
        pipe = self.redis_client.pipeline()

        try:
            # Queue the commands on the pipeline
            for cmd in batch:
                method = getattr(pipe, cmd['method'])
                args = cmd.get('args', [])
                kwargs = cmd.get('kwargs', {})
                method(*args, **kwargs)

            # Flush the pipeline in a single round trip
            return pipe.execute()

        except Exception as e:
            self.logger.error(f"Batch failed: {e}")
            raise

    def optimized_bulk_set(self, data: Dict[str, Any], ttl: Optional[int] = None) -> bool:
        """Set many keys in one pipeline, optionally with a TTL."""
        try:
            pipe = self.redis_client.pipeline()

            for key, value in data.items():
                if ttl:
                    pipe.setex(key, ttl, value)
                else:
                    pipe.set(key, value)

            pipe.execute()
            return True

        except Exception as e:
            self.logger.error(f"Bulk set failed: {e}")
            return False

    def optimized_bulk_get(self, keys: List[str]) -> Dict[str, Any]:
        """Fetch many keys with a single MGET."""
        try:
            values = self.redis_client.mget(keys)

            # Pair keys with non-missing values
            result = {}
            for i, key in enumerate(keys):
                if i < len(values) and values[i] is not None:
                    result[key] = values[i]

            return result

        except Exception as e:
            self.logger.error(f"Bulk get failed: {e}")
            return {}
```
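The command descriptors passed to `batch_execute` name a pipeline method plus its arguments, so each descriptor maps to one `pipe.<method>(*args, **kwargs)` call. A minimal sketch with illustrative keys:

```python
client = redis.Redis(host="localhost", port=6379, db=0)
pipeline = OptimizedRedisPipeline(client, batch_size=100)

commands = [
    {'method': 'set', 'args': ['user:1:name', 'alice']},
    {'method': 'incr', 'args': ['user:1:visits']},
    {'method': 'expire', 'args': ['user:1:visits', 3600]},
]
results = pipeline.batch_execute(commands)

pipeline.optimized_bulk_set({'a': 1, 'b': 2}, ttl=600)
print(pipeline.optimized_bulk_get(['a', 'b']))
```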

13.4 Query Optimization

13.4.1 Slow Query Analysis

```python
class RedisSlowQueryAnalyzer:
    """Redis slow-query analyzer."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.logger = logging.getLogger(__name__)

    def configure_slow_log(self, slower_than: int = 10000, max_len: int = 128) -> bool:
        """Configure the slow log."""
        try:
            # Threshold in microseconds
            self.redis_client.config_set('slowlog-log-slower-than', slower_than)

            # Maximum number of retained entries
            self.redis_client.config_set('slowlog-max-len', max_len)

            self.logger.info(f"Slow log configured: threshold={slower_than}µs, max length={max_len}")
            return True

        except Exception as e:
            self.logger.error(f"Failed to configure slow log: {e}")
            return False

    def get_slow_queries(self, count: int = 10) -> List[Dict[str, Any]]:
        """Fetch and normalize slow-log entries."""
        try:
            slow_logs = self.redis_client.slowlog_get(count)

            analyzed_logs = []
            for log in slow_logs:
                # redis-py returns the command as a single string/bytes blob
                raw_command = log['command']
                command_text = (raw_command.decode('utf-8', errors='replace')
                                if isinstance(raw_command, bytes) else str(raw_command))

                analyzed_log = {
                    'id': log['id'],
                    'start_time': datetime.fromtimestamp(log['start_time']),
                    'duration_microseconds': log['duration'],
                    'duration_milliseconds': log['duration'] / 1000,
                    'command': command_text,
                    'client_address': log.get('client_address', 'unknown'),
                    'client_name': log.get('client_name', 'unknown')
                }
                analyzed_logs.append(analyzed_log)

            return analyzed_logs

        except Exception as e:
            self.logger.error(f"Failed to fetch slow queries: {e}")
            return []

    def analyze_slow_patterns(self) -> Dict[str, Any]:
        """Analyze patterns across recent slow queries."""
        try:
            slow_queries = self.get_slow_queries(100)

            if not slow_queries:
                return {'message': 'No slow-query records'}

            # Aggregate statistics per command type
            command_stats = {}
            total_duration = 0

            for query in slow_queries:
                command = query['command'].split()[0].upper()
                duration = query['duration_microseconds']

                if command not in command_stats:
                    command_stats[command] = {
                        'count': 0,
                        'total_duration': 0,
                        'avg_duration': 0,
                        'max_duration': 0
                    }

                command_stats[command]['count'] += 1
                command_stats[command]['total_duration'] += duration
                command_stats[command]['max_duration'] = max(
                    command_stats[command]['max_duration'], duration
                )
                total_duration += duration

            # Averages and percentages
            for command, stats in command_stats.items():
                stats['avg_duration'] = stats['total_duration'] / stats['count']
                stats['duration_percentage'] = (stats['total_duration'] / total_duration * 100) if total_duration > 0 else 0

            # Sort by cumulative duration
            sorted_commands = sorted(
                command_stats.items(),
                key=lambda x: x[1]['total_duration'],
                reverse=True
            )

            # Optimization advice
            recommendations = self._generate_slow_query_recommendations(sorted_commands)

            return {
                'total_slow_queries': len(slow_queries),
                'total_duration_ms': total_duration / 1000,
                'command_statistics': dict(sorted_commands),
                'top_slow_commands': sorted_commands[:5],
                'recommendations': recommendations
            }

        except Exception as e:
            self.logger.error(f"Failed to analyze slow-query patterns: {e}")
            return {'error': str(e)}

    def _generate_slow_query_recommendations(self, command_stats: List[tuple]) -> List[str]:
        """Generate advice for the slowest command types."""
        recommendations = []

        for command, stats in command_stats[:3]:  # three slowest command types
            if command in ['KEYS', 'FLUSHALL', 'FLUSHDB']:
                recommendations.append(f"Avoid using {command} in production")
            elif command in ['SORT', 'SUNION', 'SINTER', 'SDIFF']:
                recommendations.append(f"Reduce the dataset size operated on by {command}")
            elif command in ['ZRANGE', 'ZREVRANGE']:
                recommendations.append(f"Limit the number of results returned by {command}")
            elif stats['avg_duration'] > 100000:  # 100 ms
                recommendations.append(f"Optimize the execution of {command}")

        return recommendations

    def clear_slow_log(self) -> bool:
        """Clear the slow log."""
        try:
            self.redis_client.slowlog_reset()
            self.logger.info("Slow log cleared")
            return True
        except Exception as e:
            self.logger.error(f"Failed to clear slow log: {e}")
            return False
```
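A short usage sketch (the 10,000 µs threshold is the Redis default; tune it to your own latency budget):

```python
client = redis.Redis(host="localhost", port=6379, db=0)
slow_analyzer = RedisSlowQueryAnalyzer(client)

slow_analyzer.configure_slow_log(slower_than=10000, max_len=128)

patterns = slow_analyzer.analyze_slow_patterns()
for command, stats in patterns.get('top_slow_commands', []):
    print(command, f"{stats['avg_duration'] / 1000:.1f} ms avg")
for tip in patterns.get('recommendations', []):
    print(tip)
```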

13.5 Summary

This chapter walked through the main aspects of Redis performance optimization and tuning:

13.5.1 Core Optimization Strategies

  1. Performance monitoring: build a thorough monitoring pipeline
  2. Memory optimization: tune memory usage and data structures
  3. Network optimization: tune connection pools and network settings
  4. Query optimization: analyze and fix slow queries

13.5.2 Best Practices

  • Profile and optimize on a regular schedule
  • Configure memory and network parameters deliberately (a consolidated sketch follows this list)
  • Use pipelines and bulk operations to improve efficiency
  • Monitor the slow log and address slow queries promptly
  • Choose data structures suited to the business scenario
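As a starting point, the settings recommended throughout this chapter can be consolidated into one bootstrap step. The values below mirror the chapter's rule set; they are assumptions to validate against your own workload, not universal defaults:

```python
import redis

def apply_baseline_tuning(client: redis.Redis, maxmemory_bytes: int) -> None:
    """Apply the chapter's baseline tuning via CONFIG SET (verify per workload)."""
    baseline = {
        'maxmemory': str(maxmemory_bytes),
        'maxmemory-policy': 'allkeys-lru',   # evict any key, LRU-approximated
        'tcp-keepalive': '300',              # probe idle connections
        'timeout': '300',                    # drop idle clients after 5 min
        'slowlog-log-slower-than': '10000',  # log commands slower than 10 ms
        'no-appendfsync-on-rewrite': 'yes',  # avoid fsync stalls during AOF rewrite
    }
    for key, value in baseline.items():
        client.config_set(key, value)

apply_baseline_tuning(redis.Redis(), maxmemory_bytes=4 * 1024**3)  # e.g., 4 GB
```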

The next chapter covers practical Redis case studies and real-world project applications.