## 11.1 概述

Redis集群和高可用性是构建可扩展、可靠Redis服务的关键技术。本章将介绍Redis Cluster、Redis Sentinel以及各种高可用性架构模式。

### 11.1.1 高可用性需求

  1. 服务连续性:确保服务24/7可用
  2. 数据一致性:保证数据不丢失
  3. 故障恢复:快速检测和恢复故障
  4. 水平扩展:支持数据和负载的水平扩展
  5. 负载均衡:合理分配请求负载

### 11.1.2 Redis高可用方案对比

| 方案 | 优点 | 缺点 | 适用场景 |
| --- | --- | --- | --- |
| 主从复制 | 简单、读写分离 | 手动故障转移 | 读多写少 |
| Sentinel | 自动故障转移 | 单点写入 | 中小规模 |
| Cluster | 水平扩展、自动分片 | 复杂性高 | 大规模应用 |
| Proxy方案 | 透明分片 | 额外组件 | 平滑迁移 |

## 11.2 Redis主从复制

### 11.2.1 主从复制原理

Redis主从复制通过异步复制实现数据同步,支持一主多从的架构。

import redis
import time
import threading
from typing import Dict, List, Any, Optional, Tuple
import logging
import json
from datetime import datetime
import socket

class RedisMasterSlaveManager:
    """Administer a one-master / many-replica Redis deployment.

    The manager opens one client per configured node and offers helpers to
    wire up replication, inspect its state, promote a replica by hand and
    force replicas into read-only mode.  Every public method reports its
    outcome as a dict carrying a ``'success'`` flag instead of raising.
    """

    def __init__(self, master_config: Dict[str, Any],
                 slave_configs: List[Dict[str, Any]]):
        """Store the node configurations and immediately try to connect.

        Args:
            master_config: Keyword arguments for ``redis.Redis`` describing
                the master.  ``'host'`` and ``'port'`` are read by
                :meth:`setup_replication`; ``'password'`` is optional.
            slave_configs: One ``redis.Redis`` keyword dict per replica.
                ``'host'`` and ``'port'`` are read when a replica is promoted.

        A connection failure is logged by :meth:`connect_all` and does not
        propagate out of the constructor.
        """
        self.master_config = master_config
        self.slave_configs = slave_configs
        self.logger = logging.getLogger(__name__)

        # Live connections; populated by connect_all().
        self.master = None
        self.slaves = []
        self.connect_all()

    @staticmethod
    def _connect(config: Dict[str, Any]):
        """Open a client for ``config`` and verify it with a PING."""
        # Merge rather than pass decode_responses as a second keyword, so a
        # config that already carries that key does not raise TypeError.
        client = redis.Redis(**{**config, 'decode_responses': True})
        client.ping()
        return client

    def connect_all(self) -> Dict[str, Any]:
        """Connect to the master and every replica.

        Returns:
            On success a dict with ``success``, ``master_connected``,
            ``slaves_connected`` and ``total_slaves``; on the first failing
            node a dict with ``success=False`` and the ``error`` text.
        """
        try:
            self.master = self._connect(self.master_config)

            # Rebuild the list from scratch so indices keep matching
            # self.slave_configs.
            self.slaves = []
            for slave_config in self.slave_configs:
                self.slaves.append(self._connect(slave_config))

            return {
                'success': True,
                'master_connected': True,
                'slaves_connected': len(self.slaves),
                'total_slaves': len(self.slave_configs)
            }

        except Exception as e:
            self.logger.error(f"Failed to connect to Redis instances: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def setup_replication(self) -> Dict[str, Any]:
        """Make every connected replica replicate from the configured master.

        Returns:
            A dict with the per-replica ``results`` list, the counters
            ``total_slaves`` / ``successful_slaves`` and a ``timestamp``.
            ``success`` is True when at least one replica was configured.
        """
        try:
            results = []
            master_host = self.master_config['host']
            master_port = self.master_config['port']
            # .get() so that a present-but-empty password is treated the same
            # as no password at all.
            master_password = self.master_config.get('password')

            for i, slave in enumerate(self.slaves):
                try:
                    # Install the credentials first so the very first sync
                    # attempt triggered by SLAVEOF can already authenticate.
                    if master_password:
                        slave.config_set('masterauth', master_password)

                    slave.slaveof(master_host, master_port)

                    results.append({
                        'slave_index': i,
                        'slave_config': self.slave_configs[i],
                        'replication_setup': True,
                        'status': 'success'
                    })

                except Exception as e:
                    results.append({
                        'slave_index': i,
                        'slave_config': self.slave_configs[i],
                        'replication_setup': False,
                        'status': 'failed',
                        'error': str(e)
                    })

            successful_slaves = sum(1 for r in results if r['replication_setup'])

            return {
                'success': successful_slaves > 0,
                'total_slaves': len(self.slaves),
                'successful_slaves': successful_slaves,
                'results': results,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to setup replication: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_replication_status(self) -> Dict[str, Any]:
        """Collect ``INFO replication`` from the master and every replica.

        Returns:
            A dict with ``master_status``, one ``slaves_status`` entry per
            replica (unreachable replicas are reported with
            ``connected=False``), a ``replication_lag`` list and a
            ``timestamp``; or ``success=False`` with an ``error`` when the
            master itself cannot be queried.
        """
        try:
            master_info = self.master.info('replication')
            master_status = {
                'role': master_info.get('role'),
                'connected_slaves': master_info.get('connected_slaves', 0),
                'master_replid': master_info.get('master_replid'),
                'master_repl_offset': master_info.get('master_repl_offset', 0),
                'repl_backlog_active': master_info.get('repl_backlog_active', 0),
                'repl_backlog_size': master_info.get('repl_backlog_size', 0)
            }

            slaves_status = []
            for i, slave in enumerate(self.slaves):
                try:
                    slave_info = slave.info('replication')
                    slaves_status.append({
                        'slave_index': i,
                        'role': slave_info.get('role'),
                        'master_host': slave_info.get('master_host'),
                        'master_port': slave_info.get('master_port'),
                        'master_link_status': slave_info.get('master_link_status'),
                        'master_last_io_seconds_ago': slave_info.get('master_last_io_seconds_ago'),
                        'master_sync_in_progress': slave_info.get('master_sync_in_progress', 0),
                        'slave_repl_offset': slave_info.get('slave_repl_offset', 0),
                        'slave_priority': slave_info.get('slave_priority', 100),
                        'slave_read_only': slave_info.get('slave_read_only', 1),
                        'connected': True
                    })
                except Exception as e:
                    # One unreachable replica must not hide the others.
                    slaves_status.append({
                        'slave_index': i,
                        'connected': False,
                        'error': str(e)
                    })

            # Lag in bytes is the offset difference; 'lag_seconds' reports
            # the seconds since the replica last heard from its master.
            master_offset = master_status['master_repl_offset']
            replication_lag = [
                {
                    'slave_index': entry['slave_index'],
                    'lag_bytes': master_offset - entry['slave_repl_offset'],
                    'lag_seconds': entry.get('master_last_io_seconds_ago', 0)
                }
                for entry in slaves_status
                if entry.get('connected') and 'slave_repl_offset' in entry
            ]

            return {
                'success': True,
                'master_status': master_status,
                'slaves_status': slaves_status,
                'replication_lag': replication_lag,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to get replication status: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def promote_slave_to_master(self, slave_index: int) -> Dict[str, Any]:
        """Promote one replica to master and re-point the other replicas.

        Args:
            slave_index: Position of the replica in ``self.slaves``; must be
                in ``0 <= slave_index < len(self.slaves)``.

        Returns:
            On success a dict naming the new master and the indices of the
            replicas that were re-pointed; otherwise ``success=False`` with
            an ``error``.  ``self.master`` and ``self.slaves`` are left
            untouched by this method.
        """
        # Negative indices are rejected as well: -1 would silently pick the
        # last replica and the "i != slave_index" filter below would then
        # re-point the freshly promoted node at itself.
        if not 0 <= slave_index < len(self.slaves):
            return {
                'success': False,
                'error': f"Invalid slave index: {slave_index}"
            }

        try:
            new_master = self.slaves[slave_index]

            # SLAVEOF with no arguments stops replication on that node.
            new_master.slaveof()

            # Give the node a moment before verifying its role.
            time.sleep(1)

            new_master_info = new_master.info('replication')
            if new_master_info.get('role') != 'master':
                return {
                    'success': False,
                    'error': 'Failed to promote slave to master'
                }

            # Re-point the remaining replicas at the promoted node.
            new_master_config = self.slave_configs[slave_index]
            reconfigured_slaves = []

            for i, slave in enumerate(self.slaves):
                if i == slave_index:
                    continue
                try:
                    slave.slaveof(new_master_config['host'],
                                  new_master_config['port'])
                    reconfigured_slaves.append(i)
                except Exception as e:
                    self.logger.error(f"Failed to reconfigure slave {i}: {e}")

            return {
                'success': True,
                'new_master_index': slave_index,
                'new_master_config': new_master_config,
                'reconfigured_slaves': reconfigured_slaves,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to promote slave to master: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def configure_read_write_split(self) -> Dict[str, Any]:
        """Force every replica into read-only mode and describe the split.

        Returns:
            A dict with a descriptive ``master_config`` (command lists and
            permissions), one ``slaves_config`` entry per replica and a
            ``timestamp``; or ``success=False`` with an ``error`` if any
            replica rejects the setting.
        """
        try:
            # Purely descriptive: the master accepts both reads and writes.
            master_config = {
                'read_operations': ['GET', 'MGET', 'HGET', 'HGETALL', 'LRANGE',
                                    'SMEMBERS', 'ZRANGE', 'EXISTS', 'TTL'],
                'write_operations': ['SET', 'MSET', 'HSET', 'LPUSH', 'SADD',
                                     'ZADD', 'DEL', 'EXPIRE'],
                'role': 'master',
                'allow_reads': True,
                'allow_writes': True
            }

            slaves_config = []
            for i, slave in enumerate(self.slaves):
                # Make sure the replica refuses writes.
                slave.config_set('slave-read-only', 'yes')

                slaves_config.append({
                    'slave_index': i,
                    'role': 'slave',
                    'allow_reads': True,
                    'allow_writes': False,
                    'read_only': True
                })

            return {
                'success': True,
                'master_config': master_config,
                'slaves_config': slaves_config,
                'read_write_split_enabled': True,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to configure read-write split: {e}")
            return {
                'success': False,
                'error': str(e)
            }

### 11.2.2 读写分离客户端

```python
class ReadWriteSplitClient:
    """Route Redis commands to a master (writes) or its replicas (reads).

    Commands are classified by name through ``read_operations`` and
    ``write_operations``; anything unknown is sent to the master.  Replica
    selection for reads follows ``read_strategy``.
    """

    def __init__(self, master_config: Dict[str, Any],
                 slave_configs: List[Dict[str, Any]],
                 read_strategy: str = 'round_robin'):
        """Create clients for the master and every replica.

        Args:
            master_config: Keyword arguments for the master ``redis.Redis``.
            slave_configs: One keyword dict per replica.
            read_strategy: ``'round_robin'``, ``'random'``, ``'weighted'`` or
                ``'least_connections'``; any other value always reads from
                the first replica.
        """
        self.master_config = master_config
        self.slave_configs = slave_configs
        self.read_strategy = read_strategy
        self.logger = logging.getLogger(__name__)

        # One client per node.
        self.master = redis.Redis(**master_config, decode_responses=True)
        self.slaves = [redis.Redis(**config, decode_responses=True)
                       for config in slave_configs]

        # State used by the read strategies.
        self.current_slave_index = 0
        self.slave_weights = [1] * len(self.slaves)  # equal weights by default

        # Command classification.
        self.read_operations = {
            'GET', 'MGET', 'HGET', 'HGETALL', 'HMGET', 'HKEYS', 'HVALS',
            'LRANGE', 'LINDEX', 'LLEN', 'SMEMBERS', 'SCARD', 'SISMEMBER',
            'ZRANGE', 'ZREVRANGE', 'ZCARD', 'ZSCORE', 'EXISTS', 'TTL',
            'TYPE', 'KEYS', 'SCAN', 'HSCAN', 'SSCAN', 'ZSCAN'
        }

        self.write_operations = {
            'SET', 'MSET', 'HSET', 'HMSET', 'HDEL', 'LPUSH', 'RPUSH',
            'LPOP', 'RPOP', 'SADD', 'SREM', 'ZADD', 'ZREM', 'DEL',
            'EXPIRE', 'EXPIREAT', 'PERSIST', 'INCR', 'DECR', 'INCRBY',
            'DECRBY', 'APPEND'
        }

    def _select_slave_for_read(self) -> "redis.Redis":
        """Pick the client that should serve the next read.

        Returns the master when no replica is configured, or when the
        ``'least_connections'`` strategy finds no replica answering PING.
        """
        import random

        if not self.slaves:
            return self.master

        if self.read_strategy == 'round_robin':
            slave = self.slaves[self.current_slave_index]
            self.current_slave_index = (self.current_slave_index + 1) % len(self.slaves)
            return slave

        if self.read_strategy == 'random':
            return random.choice(self.slaves)

        if self.read_strategy == 'weighted':
            # random.choices needs a positive total; fall back to the first
            # replica when every weight is zero.
            if sum(self.slave_weights) <= 0:
                return self.slaves[0]
            # Unlike a hand-rolled cumulative scan, random.choices never
            # returns an entry whose weight is zero.
            return random.choices(self.slaves, weights=self.slave_weights, k=1)[0]

        if self.read_strategy == 'least_connections':
            # Simplified implementation: the first replica that answers PING.
            for slave in self.slaves:
                try:
                    slave.ping()
                except Exception:
                    continue
                return slave
            return self.master

        return self.slaves[0]

    def _is_read_operation(self, command: str) -> bool:
        """Return True if ``command`` (case-insensitive) is a known read."""
        return command.upper() in self.read_operations

    def _is_write_operation(self, command: str) -> bool:
        """Return True if ``command`` (case-insensitive) is a known write."""
        return command.upper() in self.write_operations

    def execute_command(self, command: str, *args, **kwargs) -> Any:
        """Run a raw command on the node appropriate for its type.

        Reads go to a replica and fall back to the master if the replica
        raises; writes and unclassified commands go to the master.

        Raises:
            Exception: whatever the final client call raised, after logging.
        """
        try:
            if self._is_read_operation(command):
                client = self._select_slave_for_read()
                if client is self.master:
                    # Nothing to fall back to; do not run the command twice.
                    return self.master.execute_command(command, *args, **kwargs)
                try:
                    return client.execute_command(command, *args, **kwargs)
                except Exception as e:
                    self.logger.warning(f"Slave read failed, fallback to master: {e}")
                    return self.master.execute_command(command, *args, **kwargs)

            if not self._is_write_operation(command):
                self.logger.warning(f"Unknown operation type for command: {command}")
            return self.master.execute_command(command, *args, **kwargs)

        except Exception as e:
            self.logger.error(f"Command execution failed: {command}, error: {e}")
            raise

    def get(self, key: str) -> Any:
        """Read a string value (served by a replica when possible)."""
        return self.execute_command('GET', key)

    def set(self, key: str, value: Any, **kwargs) -> bool:
        """Write a string value on the master.

        ``kwargs`` are the options of the client's ``set`` method (``ex``,
        ``px``, ``nx``, ...).  They go through ``set`` because the raw
        ``execute_command`` path does not translate keyword arguments into
        command arguments, so options passed that way would have no effect.
        """
        try:
            return self.master.set(key, value, **kwargs)
        except Exception as e:
            self.logger.error(f"Command execution failed: SET, error: {e}")
            raise

    def hget(self, name: str, key: str) -> Any:
        """Read one hash field (served by a replica when possible)."""
        return self.execute_command('HGET', name, key)

    def hset(self, name: str, key: str, value: Any) -> int:
        """Write one hash field on the master."""
        return self.execute_command('HSET', name, key, value)

    def check_health(self) -> Dict[str, Any]:
        """PING every node and report per-node health and latency.

        Returns:
            A dict with ``master``, ``slaves`` (one entry per replica) and a
            ``summary``; ``overall_healthy`` requires a healthy master and at
            least one healthy replica.
        """
        health_status = {
            'master': {'healthy': False, 'latency_ms': None, 'error': None},
            'slaves': []
        }

        # Master.
        try:
            start_time = time.time()
            self.master.ping()
            latency = (time.time() - start_time) * 1000
            health_status['master'] = {
                'healthy': True,
                'latency_ms': round(latency, 2),
                'error': None
            }
        except Exception as e:
            health_status['master']['error'] = str(e)

        # Replicas.
        for i, slave in enumerate(self.slaves):
            slave_status = {
                'index': i,
                'healthy': False,
                'latency_ms': None,
                'error': None
            }

            try:
                start_time = time.time()
                slave.ping()
                latency = (time.time() - start_time) * 1000
                slave_status.update({
                    'healthy': True,
                    'latency_ms': round(latency, 2)
                })
            except Exception as e:
                slave_status['error'] = str(e)

            health_status['slaves'].append(slave_status)

        # Aggregate view.
        healthy_slaves = sum(1 for s in health_status['slaves'] if s['healthy'])
        health_status['summary'] = {
            'master_healthy': health_status['master']['healthy'],
            'total_slaves': len(self.slaves),
            'healthy_slaves': healthy_slaves,
            'overall_healthy': health_status['master']['healthy'] and healthy_slaves > 0
        }

        return health_status

    def update_slave_weights(self, weights: List[float]) -> Dict[str, Any]:
        """Replace the weights used by the ``'weighted'`` read strategy.

        Args:
            weights: One non-negative weight per replica, in replica order.

        Returns:
            ``success=True`` with the accepted weights and current strategy,
            or ``success=False`` with an ``error`` if the count does not
            match the replica count or a weight is negative.
        """
        if len(weights) != len(self.slaves):
            return {
                'success': False,
                'error': f"Weight count ({len(weights)}) doesn't match slave count ({len(self.slaves)})"
            }

        # A negative weight has no meaning for weighted selection.
        if any(weight < 0 for weight in weights):
            return {
                'success': False,
                'error': f"Weights must be non-negative: {weights}"
            }

        # Keep a private copy so later mutation of the caller's list cannot
        # change routing behind our back.
        self.slave_weights = list(weights)
        return {
            'success': True,
            'weights': weights,
            'strategy': self.read_strategy
        }

```

## 11.3 Redis Sentinel

### 11.3.1 Sentinel架构和配置

Redis Sentinel提供自动故障检测和故障转移功能。

```python
class RedisSentinelManager:
    """Inspect and operate a Redis deployment supervised by Sentinel.

    Topology discovery goes through ``redis.sentinel.Sentinel``; status,
    manual failover and event monitoring talk to individual Sentinel
    processes with plain clients.  Public methods that return a dict report
    their outcome through a ``'success'`` flag instead of raising.
    """

    def __init__(self, sentinel_configs: List[Dict[str, Any]],
                 service_name: str = 'mymaster'):
        """Create the Sentinel connection.

        Args:
            sentinel_configs: One dict per Sentinel with ``'host'`` and
                ``'port'`` keys.
            service_name: Name of the monitored master group.
        """
        self.sentinel_configs = sentinel_configs
        self.service_name = service_name
        self.logger = logging.getLogger(__name__)

        from redis.sentinel import Sentinel
        self.sentinel = Sentinel(
            [(config['host'], config['port']) for config in sentinel_configs],
            decode_responses=True
        )

    def _new_sentinel_client(self, config: Dict[str, Any]) -> "redis.Redis":
        """Build a plain client addressed at one Sentinel process."""
        return redis.Redis(
            host=config['host'],
            port=config['port'],
            decode_responses=True
        )

    def _first_available_sentinel(self) -> Optional["redis.Redis"]:
        """Return a client for the first Sentinel answering PING, else None."""
        for config in self.sentinel_configs:
            try:
                candidate = self._new_sentinel_client(config)
                candidate.ping()
            except Exception as e:
                self.logger.warning(f"Sentinel {config} is not reachable: {e}")
                continue
            # Only a client that actually answered is handed back; a client
            # object whose PING failed must never be reported as available.
            return candidate
        return None

    def get_master_client(self) -> "redis.Redis":
        """Return a client bound to the current master of the service."""
        return self.sentinel.master_for(self.service_name)

    def get_slave_client(self) -> "redis.Redis":
        """Return a client bound to a replica of the service."""
        return self.sentinel.slave_for(self.service_name)

    def discover_master(self) -> Dict[str, Any]:
        """Ask Sentinel for the address of the current master.

        Returns:
            ``success=True`` with ``master_host``, ``master_port`` and
            ``service_name``; or ``success=False`` with an ``error``.
        """
        try:
            master_info = self.sentinel.discover_master(self.service_name)
            return {
                'success': True,
                'master_host': master_info[0],
                'master_port': master_info[1],
                'service_name': self.service_name
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def discover_slaves(self) -> Dict[str, Any]:
        """Ask Sentinel for the addresses of the known replicas.

        Returns:
            ``success=True`` with a ``slaves`` list of host/port dicts,
            ``slave_count`` and ``service_name``; or ``success=False`` with
            an ``error``.
        """
        try:
            slaves_info = self.sentinel.discover_slaves(self.service_name)
            slaves = [
                {'host': slave_info[0], 'port': slave_info[1]}
                for slave_info in slaves_info
            ]

            return {
                'success': True,
                'slaves': slaves,
                'slave_count': len(slaves),
                'service_name': self.service_name
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def get_sentinel_status(self) -> Dict[str, Any]:
        """Query every configured Sentinel and summarise the results.

        Returns:
            ``success=True`` with a ``status`` dict (per-Sentinel entries,
            the master/replica info from the first Sentinel that answered,
            and a ``summary``) plus a ``timestamp``; or ``success=False``
            with an ``error``.
        """
        try:
            status = {
                'sentinels': [],
                'master_info': None,
                'slaves_info': None
            }

            for i, config in enumerate(self.sentinel_configs):
                try:
                    sentinel_client = self._new_sentinel_client(config)

                    sentinel_info = sentinel_client.info('sentinel')
                    master_info = sentinel_client.execute_command(
                        'SENTINEL', 'masters'
                    )
                    slaves_info = sentinel_client.execute_command(
                        'SENTINEL', 'slaves', self.service_name
                    )

                    status['sentinels'].append({
                        'index': i,
                        'host': config['host'],
                        'port': config['port'],
                        'connected': True,
                        'sentinel_masters': sentinel_info.get('sentinel_masters', 0),
                        'sentinel_running_scripts': sentinel_info.get('sentinel_running_scripts', 0),
                        'sentinel_scripts_queue_length': sentinel_info.get('sentinel_scripts_queue_length', 0)
                    })

                    # Keep the view of the first Sentinel that answered.
                    if status['master_info'] is None:
                        status['master_info'] = master_info

                    if status['slaves_info'] is None:
                        status['slaves_info'] = slaves_info

                except Exception as e:
                    status['sentinels'].append({
                        'index': i,
                        'host': config['host'],
                        'port': config['port'],
                        'connected': False,
                        'error': str(e)
                    })

            # 'quorum_available' here means a strict majority of the
            # configured Sentinels answered.
            connected_sentinels = sum(1 for s in status['sentinels'] if s['connected'])
            status['summary'] = {
                'total_sentinels': len(self.sentinel_configs),
                'connected_sentinels': connected_sentinels,
                'quorum_available': connected_sentinels >= (len(self.sentinel_configs) // 2 + 1)
            }

            return {
                'success': True,
                'status': status,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to get sentinel status: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def force_failover(self) -> Dict[str, Any]:
        """Trigger ``SENTINEL failover`` on the first reachable Sentinel.

        Returns:
            ``success=True`` with the raw ``failover_result``, the result of
            :meth:`discover_master` taken two seconds later and a
            ``timestamp``; or ``success=False`` with an ``error``.
        """
        try:
            sentinel_client = self._first_available_sentinel()
            if sentinel_client is None:
                return {
                    'success': False,
                    'error': 'No available Sentinel found'
                }

            result = sentinel_client.execute_command(
                'SENTINEL', 'failover', self.service_name
            )

            # Give the failover a moment before asking for the new master.
            time.sleep(2)

            new_master = self.discover_master()

            return {
                'success': True,
                'failover_result': result,
                'new_master': new_master,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to force failover: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def monitor_failover_events(self, duration_seconds: int = 300) -> Dict[str, Any]:
        """Collect Sentinel Pub/Sub events for ``duration_seconds``.

        Args:
            duration_seconds: How long to listen before returning.

        Returns:
            ``success=True`` with the collected ``events``, ``event_count``,
            ``monitoring_duration`` and a ``timestamp``; or ``success=False``
            with an ``error`` (and any ``partial_events`` when the failure
            happened after listening started).
        """
        events = []
        start_time = time.time()
        end_time = start_time + duration_seconds

        try:
            sentinel_client = self._first_available_sentinel()
            if sentinel_client is None:
                return {
                    'success': False,
                    'error': 'No available Sentinel found'
                }

            pubsub = sentinel_client.pubsub()
            try:
                # Sentinel publishes each event (+sdown, +odown,
                # +switch-master, ...) on a channel named after the event, so
                # a catch-all pattern is needed to see them.
                pubsub.psubscribe('*')

                while time.time() < end_time:
                    try:
                        message = pubsub.get_message(timeout=1)
                    except Exception as e:
                        self.logger.warning(f"Error while reading Sentinel events: {e}")
                        # Brief pause so a broken connection does not turn
                        # this loop into a busy spin.
                        time.sleep(0.1)
                        continue

                    if message and message['type'] in ('message', 'pmessage'):
                        events.append({
                            'timestamp': datetime.now().isoformat(),
                            'channel': message['channel'],
                            'data': message['data'],
                            'type': message['type']
                        })
            finally:
                # Release the subscription even when listening failed.
                pubsub.close()

            return {
                'success': True,
                'events': events,
                'event_count': len(events),
                'monitoring_duration': duration_seconds,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to monitor failover events: {e}")
            return {
                'success': False,
                'error': str(e),
                'partial_events': events
            }

```

### 11.3.2 Sentinel高可用客户端

```python
class SentinelAwareClient:
    """Redis client that resolves master and replica through Sentinel.

    Writes go to the master, reads prefer a replica and fall back to the
    master.  Every command is retried with exponential backoff, and the
    cached client is dropped between attempts so that the next attempt asks
    Sentinel again.
    """

    def __init__(self, sentinel_configs: List[Dict[str, Any]],
                 service_name: str = 'mymaster',
                 retry_attempts: int = 3,
                 retry_delay: float = 1.0):
        """Create the Sentinel connection and the (empty) client cache.

        Args:
            sentinel_configs: One dict per Sentinel with ``'host'`` and
                ``'port'`` keys.
            service_name: Name of the monitored master group.
            retry_attempts: Attempts per command; values below 1 are
                treated as 1.
            retry_delay: Base delay in seconds; doubled after each failure.
        """
        self.sentinel_configs = sentinel_configs
        self.service_name = service_name
        self.retry_attempts = retry_attempts
        self.retry_delay = retry_delay
        self.logger = logging.getLogger(__name__)

        from redis.sentinel import Sentinel
        self.sentinel = Sentinel(
            [(config['host'], config['port']) for config in sentinel_configs],
            decode_responses=True
        )

        # Cached clients and the time the master client was last refreshed.
        self._master_client = None
        self._slave_client = None
        self._last_master_check = 0
        self._master_check_interval = 30  # seconds between master refreshes

    def _get_master_client(self, force_refresh: bool = False) -> "redis.Redis":
        """Return the cached master client, refreshing it when stale.

        Raises:
            Exception: if no client is cached and Sentinel cannot supply one.
        """
        current_time = time.time()
        stale = current_time - self._last_master_check > self._master_check_interval

        if force_refresh or self._master_client is None or stale:
            try:
                self._master_client = self.sentinel.master_for(self.service_name)
                self._last_master_check = current_time
            except Exception as e:
                self.logger.error(f"Failed to get master client: {e}")
                # A previously cached client is still better than nothing.
                if self._master_client is None:
                    raise

        return self._master_client

    def _get_slave_client(self) -> "redis.Redis":
        """Return the cached replica client, or the master as a fallback."""
        if self._slave_client is None:
            try:
                self._slave_client = self.sentinel.slave_for(self.service_name)
            except Exception as e:
                self.logger.warning(f"Failed to get slave client, using master: {e}")
                return self._get_master_client()

        return self._slave_client

    def _call_with_retry(self, operation: str, client_type: str, call) -> Any:
        """Run ``call(client)`` with retries and exponential backoff.

        Args:
            operation: Command name, used only in log messages.
            client_type: ``'master'`` or anything else for a replica.
            call: Callable receiving the resolved client.

        Raises:
            Exception: the error of the last failed attempt.
        """
        # Always try at least once; with zero iterations there would be no
        # exception to re-raise at the end.
        attempts = max(1, self.retry_attempts)
        last_exception = None

        for attempt in range(attempts):
            try:
                if client_type == 'master':
                    client = self._get_master_client(force_refresh=attempt > 0)
                else:
                    client = self._get_slave_client()

                return call(client)

            except Exception as e:
                last_exception = e
                self.logger.warning(
                    f"Attempt {attempt + 1} failed for {operation}: {e}"
                )

                if attempt < attempts - 1:
                    time.sleep(self.retry_delay * (2 ** attempt))  # exponential backoff

                    # Drop the cached client so the next attempt resolves a
                    # fresh one through Sentinel.
                    if client_type == 'master':
                        self._master_client = None
                    else:
                        self._slave_client = None

        raise last_exception

    def _execute_with_retry(self, operation: str, client_type: str = 'master',
                            *args, **kwargs) -> Any:
        """Run a raw command via ``execute_command`` with retries.

        ``client_type`` precedes ``*args``, so it must be passed
        positionally whenever command arguments follow.
        """
        return self._call_with_retry(
            operation,
            client_type,
            lambda client: client.execute_command(operation, *args, **kwargs)
        )

    def set(self, key: str, value: Any, **kwargs) -> bool:
        """Set a key on the master.

        ``kwargs`` are the options of the client's ``set`` method (``ex``,
        ``px``, ``nx``, ...).  They go through ``set`` because the raw
        ``execute_command`` path does not translate keyword arguments into
        command arguments, so options passed that way would have no effect.
        """
        return self._call_with_retry(
            'SET',
            'master',
            lambda client: client.set(key, value, **kwargs)
        )

    def get(self, key: str) -> Any:
        """Get a key, preferring a replica and falling back to the master."""
        try:
            return self._execute_with_retry('GET', 'slave', key)
        except Exception as e:
            self.logger.warning(f"Slave read failed, fallback to master: {e}")
            return self._execute_with_retry('GET', 'master', key)

    def hset(self, name: str, key: str, value: Any) -> int:
        """Set one hash field on the master."""
        return self._execute_with_retry('HSET', 'master', name, key, value)

    def hget(self, name: str, key: str) -> Any:
        """Get one hash field, preferring a replica over the master."""
        try:
            return self._execute_with_retry('HGET', 'slave', name, key)
        except Exception as e:
            self.logger.warning(f"Slave read failed, fallback to master: {e}")
            return self._execute_with_retry('HGET', 'master', name, key)

    def ping(self) -> Dict[str, Any]:
        """PING master, replica and every Sentinel; report availability.

        Returns:
            A dict with ``master``, ``slave``, ``sentinels`` and a
            ``summary``; ``overall_healthy`` requires an available master
            and a strict majority of configured Sentinels.
        """
        status = {
            'master': {'available': False, 'latency_ms': None, 'error': None},
            'slave': {'available': False, 'latency_ms': None, 'error': None},
            'sentinels': []
        }

        # Master.
        try:
            start_time = time.time()
            master_client = self._get_master_client()
            master_client.ping()
            latency = (time.time() - start_time) * 1000
            status['master'] = {
                'available': True,
                'latency_ms': round(latency, 2),
                'error': None
            }
        except Exception as e:
            status['master']['error'] = str(e)

        # Replica.
        try:
            start_time = time.time()
            slave_client = self._get_slave_client()
            slave_client.ping()
            latency = (time.time() - start_time) * 1000
            status['slave'] = {
                'available': True,
                'latency_ms': round(latency, 2),
                'error': None
            }
        except Exception as e:
            status['slave']['error'] = str(e)

        # Sentinels.
        for i, config in enumerate(self.sentinel_configs):
            sentinel_status = {
                'index': i,
                'host': config['host'],
                'port': config['port'],
                'available': False,
                'latency_ms': None,
                'error': None
            }

            try:
                start_time = time.time()
                sentinel_client = redis.Redis(
                    host=config['host'],
                    port=config['port'],
                    decode_responses=True
                )
                sentinel_client.ping()
                latency = (time.time() - start_time) * 1000
                sentinel_status.update({
                    'available': True,
                    'latency_ms': round(latency, 2)
                })
            except Exception as e:
                sentinel_status['error'] = str(e)

            status['sentinels'].append(sentinel_status)

        # Aggregate view; the majority test is computed once and reused.
        available_sentinels = sum(1 for s in status['sentinels'] if s['available'])
        quorum_available = available_sentinels >= (len(self.sentinel_configs) // 2 + 1)
        status['summary'] = {
            'master_available': status['master']['available'],
            'slave_available': status['slave']['available'],
            'available_sentinels': available_sentinels,
            'total_sentinels': len(self.sentinel_configs),
            'quorum_available': quorum_available,
            'overall_healthy': status['master']['available'] and quorum_available
        }

        return status

    def get_current_topology(self) -> Dict[str, Any]:
        """Return the master and replica addresses currently known to Sentinel.

        Returns:
            ``success=True`` with ``master``, ``slaves``, ``slave_count``,
            ``service_name`` and a ``timestamp``; or ``success=False`` with
            an ``error``.
        """
        try:
            master_info = self.sentinel.discover_master(self.service_name)
            slaves_info = self.sentinel.discover_slaves(self.service_name)

            return {
                'success': True,
                'service_name': self.service_name,
                'master': {
                    'host': master_info[0],
                    'port': master_info[1]
                },
                'slaves': [
                    {'host': slave[0], 'port': slave[1]}
                    for slave in slaves_info
                ],
                'slave_count': len(slaves_info),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

# Usage example
if __name__ == "__main__":
    # ---- Master/replica replication demo ----
    master_config = dict(host='localhost', port=6379)
    slave_configs = [
        dict(host='localhost', port=replica_port)
        for replica_port in (6380, 6381)
    ]

    # Point every replica at the master and show the outcome.
    ms_manager = RedisMasterSlaveManager(master_config, slave_configs)
    replication_result = ms_manager.setup_replication()
    print(f"Replication setup: {replication_result}")

    # ---- Read/write splitting demo ----
    rw_client = ReadWriteSplitClient(master_config, slave_configs)

    # The write is routed to the master ...
    rw_client.set('test_key', 'test_value')

    # ... and the read is served by a replica.
    value = rw_client.get('test_key')
    print(f"Read value: {value}")

    # Per-node health report.
    health = rw_client.check_health()
    print(f"Health status: {health}")

    # ---- Sentinel demo ----
    sentinel_configs = [
        dict(host='localhost', port=sentinel_port)
        for sentinel_port in (26379, 26380, 26381)
    ]

    # Ask Sentinel where the current master lives.
    sentinel_manager = RedisSentinelManager(sentinel_configs)
    master_discovery = sentinel_manager.discover_master()
    print(f"Master discovery: {master_discovery}")

    # Round-trip one key through the Sentinel-aware client.
    sentinel_client = SentinelAwareClient(sentinel_configs)
    sentinel_client.set('sentinel_key', 'sentinel_value')
    value = sentinel_client.get('sentinel_key')
    print(f"Sentinel client value: {value}")

    # Current master/replica layout as seen by Sentinel.
    topology = sentinel_client.get_current_topology()
    print(f"Current topology: {topology}")

```

## 11.4 Redis Cluster

### 11.4.1 Cluster架构和分片

Redis Cluster提供自动数据分片和高可用性。

from rediscluster import RedisCluster
import hashlib

class RedisClusterManager:
    def __init__(self, startup_nodes: List[Dict[str, Any]], 
                 decode_responses: bool = True):
        """Remember the connection settings and open the cluster connection.

        Args:
            startup_nodes: Seed node descriptions handed to ``RedisCluster``.
            decode_responses: Forwarded unchanged to ``RedisCluster``.
        """
        self.logger = logging.getLogger(__name__)
        self.startup_nodes = startup_nodes
        self.decode_responses = decode_responses

        # The client is asked to skip its full slot-coverage check on connect.
        connection_options = {
            'startup_nodes': startup_nodes,
            'decode_responses': decode_responses,
            'skip_full_coverage_check': True,
        }
        self.cluster = RedisCluster(**connection_options)
    
    def get_cluster_info(self) -> Dict[str, Any]:
        """获取集群信息"""
        try:
            # 获取集群节点信息
            cluster_nodes = self.cluster.cluster_nodes()
            
            # 解析节点信息
            nodes = []
            masters = []
            slaves = []
            
            for node_id, node_info in cluster_nodes.items():
                node_data = {
                    'node_id': node_id,
                    'host': node_info['host'],
                    'port': node_info['port'],
                    'flags': node_info['flags'],
                    'master_id': node_info.get('master'),
                    'slots': node_info.get('slots', []),
                    'is_master': 'master' in node_info['flags'],
                    'is_slave': 'slave' in node_info['flags']
                }
                
                nodes.append(node_data)
                
                if node_data['is_master']:
                    masters.append(node_data)
                elif node_data['is_slave']:
                    slaves.append(node_data)
            
            # 获取集群状态
            cluster_info = self.cluster.cluster_info()
            
            return {
                'success': True,
                'cluster_state': cluster_info.get('cluster_state'),
                'cluster_slots_assigned': cluster_info.get('cluster_slots_assigned'),
                'cluster_slots_ok': cluster_info.get('cluster_slots_ok'),
                'cluster_slots_pfail': cluster_info.get('cluster_slots_pfail'),
                'cluster_slots_fail': cluster_info.get('cluster_slots_fail'),
                'cluster_known_nodes': cluster_info.get('cluster_known_nodes'),
                'cluster_size': cluster_info.get('cluster_size'),
                'nodes': nodes,
                'masters': masters,
                'slaves': slaves,
                'master_count': len(masters),
                'slave_count': len(slaves),
                'total_nodes': len(nodes),
                'timestamp': datetime.now().isoformat()
            }
            
        except Exception as e:
            self.logger.error(f"Failed to get cluster info: {e}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def get_slot_distribution(self) -> Dict[str, Any]:
        """获取槽位分布"""
        try:
            cluster_info = self.get_cluster_info()
            if not cluster_info['success']:
                return cluster_info
            
            slot_distribution = {}
            total_slots = 16384
            assigned_slots = 0
            
            for master in cluster_info['masters']:
                node_key = f"{master['host']}:{master['port']}"
                slots = master['slots']
                slot_count = 0
                
                # 计算槽位数量
                for slot_range in slots:
                    if isinstance(slot_range, list) and len(slot_range) == 2:
                        slot_count += slot_range[1] - slot_range[0] + 1
                    else:
                        slot_count += 1
                
                assigned_slots += slot_count
                
                slot_distribution[node_key] = {
                    'node_id': master['node_id'],
                    'host': master['host'],
                    'port': master['port'],
                    'slot_ranges': slots,
                    'slot_count': slot_count,
                    'slot_percentage': round((slot_count / total_slots) * 100, 2)
                }
            
            return {
                'success': True,
                'total_slots': total_slots,
                'assigned_slots': assigned_slots,
                'unassigned_slots': total_slots - assigned_slots,
                'slot_distribution': slot_distribution,
                'is_fully_covered': assigned_slots == total_slots,
                'timestamp': datetime.now().isoformat()
            }
            
        except Exception as e:
            self.logger.error(f"Failed to get slot distribution: {e}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def calculate_key_slot(self, key: str) -> int:
        """Return the Redis Cluster hash slot (0-16383) for ``key``.

        The slot is CRC16 (XMODEM variant) of the key modulo 16384. When the
        key contains a hash tag -- a non-empty substring between the first
        ``{`` and the first ``}`` after it -- only that substring is hashed,
        so keys sharing a tag land in the same slot.

        Args:
            key: The key, as ``str`` (UTF-8 encoded before hashing) or ``bytes``.

        Returns:
            The slot number.
        """
        # Local import keeps this method self-contained; crc_hqx with an
        # initial value of 0 is the CRC16-XMODEM checksum.
        import binascii

        raw = key if isinstance(key, (bytes, bytearray)) else key.encode()

        tag_open = raw.find(b'{')
        if tag_open != -1:
            tag_close = raw.find(b'}', tag_open + 1)
            # An empty tag ("{}") or a missing "}" means the whole key is hashed.
            if tag_close > tag_open + 1:
                raw = raw[tag_open + 1:tag_close]

        return binascii.crc_hqx(bytes(raw), 0) % 16384
    
    def find_key_node(self, key: str) -> Dict[str, Any]:
        """查找键所在的节点"""
        try:
            slot = self.calculate_key_slot(key)
            cluster_info = self.get_cluster_info()
            
            if not cluster_info['success']:
                return cluster_info
            
            # 查找负责该槽位的主节点
            for master in cluster_info['masters']:
                for slot_range in master['slots']:
                    if isinstance(slot_range, list) and len(slot_range) == 2:
                        if slot_range[0] <= slot <= slot_range[1]:
                            return {
                                'success': True,
                                'key': key,
                                'slot': slot,
                                'node': {
                                    'node_id': master['node_id'],
                                    'host': master['host'],
                                    'port': master['port']
                                }
                            }
                    elif slot_range == slot:
                        return {
                            'success': True,
                            'key': key,
                            'slot': slot,
                            'node': {
                                'node_id': master['node_id'],
                                'host': master['host'],
                                'port': master['port']
                            }
                        }
            
            return {
                'success': False,
                'error': f"No node found for slot {slot}",
                'key': key,
                'slot': slot
            }
            
        except Exception as e:
            self.logger.error(f"Failed to find key node: {e}")
            return {
                'success': False,
                'error': str(e),
                'key': key
            }
    
    def rebalance_slots(self, target_distribution: Dict[str, int] = None) -> Dict[str, Any]:
        """重新平衡槽位分布"""
        try:
            cluster_info = self.get_cluster_info()
            if not cluster_info['success']:
                return cluster_info
            
            masters = cluster_info['masters']
            total_slots = 16384
            
            if target_distribution is None:
                # 平均分配
                slots_per_master = total_slots // len(masters)
                target_distribution = {}
                
                for i, master in enumerate(masters):
                    node_key = f"{master['host']}:{master['port']}"
                    if i == len(masters) - 1:
                        # 最后一个节点分配剩余的槽位
                        target_distribution[node_key] = total_slots - (slots_per_master * i)
                    else:
                        target_distribution[node_key] = slots_per_master
            
            # 计算需要移动的槽位
            current_distribution = self.get_slot_distribution()
            if not current_distribution['success']:
                return current_distribution
            
            moves = []
            for node_key, target_slots in target_distribution.items():
                current_slots = current_distribution['slot_distribution'].get(node_key, {}).get('slot_count', 0)
                difference = target_slots - current_slots
                
                if difference != 0:
                    moves.append({
                        'node': node_key,
                        'current_slots': current_slots,
                        'target_slots': target_slots,
                        'difference': difference,
                        'action': 'receive' if difference > 0 else 'give'
                    })
            
            return {
                'success': True,
                'current_distribution': current_distribution['slot_distribution'],
                'target_distribution': target_distribution,
                'required_moves': moves,
                'rebalance_needed': len(moves) > 0,
                'note': 'This is a plan. Actual slot migration requires cluster management tools.',
                'timestamp': datetime.now().isoformat()
            }
            
        except Exception as e:
            self.logger.error(f"Failed to rebalance slots: {e}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def monitor_cluster_health(self) -> Dict[str, Any]:
        """监控集群健康状态"""
        try:
            cluster_info = self.get_cluster_info()
            if not cluster_info['success']:
                return cluster_info
            
            health_status = {
                'overall_health': 'healthy',
                'issues': [],
                'node_status': [],
                'slot_coverage': None,
                'replication_status': None
            }
            
            # 检查节点状态
            for node in cluster_info['nodes']:
                node_status = {
                    'node_id': node['node_id'],
                    'host': node['host'],
                    'port': node['port'],
                    'is_master': node['is_master'],
                    'is_slave': node['is_slave'],
                    'healthy': True,
                    'issues': []
                }
                
                # 检查节点标志
                if 'fail' in node['flags']:
                    node_status['healthy'] = False
                    node_status['issues'].append('Node marked as failed')
                    health_status['issues'].append(f"Node {node['host']}:{node['port']} is failed")
                
                if 'pfail' in node['flags']:
                    node_status['issues'].append('Node possibly failing')
                    health_status['issues'].append(f"Node {node['host']}:{node['port']} is possibly failing")
                
                health_status['node_status'].append(node_status)
            
            # 检查槽位覆盖
            slot_distribution = self.get_slot_distribution()
            if slot_distribution['success']:
                health_status['slot_coverage'] = {
                    'fully_covered': slot_distribution['is_fully_covered'],
                    'assigned_slots': slot_distribution['assigned_slots'],
                    'unassigned_slots': slot_distribution['unassigned_slots']
                }
                
                if not slot_distribution['is_fully_covered']:
                    health_status['overall_health'] = 'degraded'
                    health_status['issues'].append(
                        f"{slot_distribution['unassigned_slots']} slots are not assigned"
                    )
            
            # 检查复制状态
            master_count = cluster_info['master_count']
            slave_count = cluster_info['slave_count']
            
            health_status['replication_status'] = {
                'master_count': master_count,
                'slave_count': slave_count,
                'replication_factor': slave_count / master_count if master_count > 0 else 0
            }
            
            if slave_count == 0:
                health_status['overall_health'] = 'at_risk'
                health_status['issues'].append('No slave nodes for replication')
            elif slave_count < master_count:
                health_status['issues'].append('Some masters have no slaves')
            
            # 设置总体健康状态
            if health_status['issues']:
                if health_status['overall_health'] == 'healthy':
                    health_status['overall_health'] = 'warning'
            
            return {
                'success': True,
                'health_status': health_status,
                'timestamp': datetime.now().isoformat()
            }
            
        except Exception as e:
            self.logger.error(f"Failed to monitor cluster health: {e}")
            return {
                'success': False,
                'error': str(e)
            }

### 11.4.2 Cluster客户端

```python
class ClusterAwareClient:
    def __init__(self, startup_nodes: List[Dict[str, Any]],
                 retry_attempts: int = 3,
                 retry_delay: float = 1.0):
        """Open a cluster connection and zero the operation counters.

        Args:
            startup_nodes: Seed node descriptions handed to ``RedisCluster``.
            retry_attempts: Number of attempts made per operation.
            retry_delay: Base delay in seconds between attempts.
        """
        self.logger = logging.getLogger(__name__)
        self.startup_nodes = startup_nodes
        self.retry_attempts = retry_attempts
        self.retry_delay = retry_delay

        # Cluster connection settings.
        connection_options = {
            'startup_nodes': startup_nodes,
            'decode_responses': True,
            'skip_full_coverage_check': True,
            'max_connections': 32,
            'retry_on_timeout': True,
        }
        self.cluster = RedisCluster(**connection_options)

        # Operation counters, all starting at zero.
        counter_names = (
            'total_operations',
            'successful_operations',
            'failed_operations',
            'redirections',
            'cross_slot_operations',
        )
        self.stats = dict.fromkeys(counter_names, 0)
    
    def _execute_with_retry(self, operation: callable, *args, **kwargs) -> Any:
        """带重试的操作执行"""
        last_exception = None
        
        for attempt in range(self.retry_attempts):
            try:
                self.stats['total_operations'] += 1
                result = operation(*args, **kwargs)
                self.stats['successful_operations'] += 1
                return result
            
            except Exception as e:
                last_exception = e
                self.stats['failed_operations'] += 1
                
                # 检查是否为重定向错误
                if 'MOVED' in str(e) or 'ASK' in str(e):
                    self.stats['redirections'] += 1
                
                self.logger.warning(
                    f"Attempt {attempt + 1} failed: {e}"
                )
                
                if attempt < self.retry_attempts - 1:
                    time.sleep(self.retry_delay * (2 ** attempt))
        
        raise last_exception
    
    def set(self, key: str, value: Any, **kwargs) -> bool:
        """设置键值"""
        return self._execute_with_retry(self.cluster.set, key, value, **kwargs)
    
    def get(self, key: str) -> Any:
        """获取键值"""
        return self._execute_with_retry(self.cluster.get, key)
    
    def mset(self, mapping: Dict[str, Any]) -> bool:
        """批量设置(处理跨槽位问题)"""
        try:
            # 按槽位分组
            slot_groups = {}
            
            for key, value in mapping.items():
                slot = self._calculate_slot(key)
                if slot not in slot_groups:
                    slot_groups[slot] = {}
                slot_groups[slot][key] = value
            
            if len(slot_groups) > 1:
                self.stats['cross_slot_operations'] += 1
                # 跨槽位操作,分别执行
                for slot, group_mapping in slot_groups.items():
                    self._execute_with_retry(self.cluster.mset, group_mapping)
                return True
            else:
                # 同槽位操作
                return self._execute_with_retry(self.cluster.mset, mapping)
        
        except Exception as e:
            self.logger.error(f"Failed to execute mset: {e}")
            raise
    
    def mget(self, keys: List[str]) -> List[Any]:
        """批量获取(处理跨槽位问题)"""
        try:
            # 按槽位分组
            slot_groups = {}
            key_to_slot = {}
            
            for key in keys:
                slot = self._calculate_slot(key)
                key_to_slot[key] = slot
                if slot not in slot_groups:
                    slot_groups[slot] = []
                slot_groups[slot].append(key)
            
            if len(slot_groups) > 1:
                self.stats['cross_slot_operations'] += 1
                # 跨槽位操作,分别获取后合并
                all_results = {}
                for slot, group_keys in slot_groups.items():
                    group_results = self._execute_with_retry(self.cluster.mget, group_keys)
                    for i, key in enumerate(group_keys):
                        all_results[key] = group_results[i]
                
                # 按原始顺序返回结果
                return [all_results[key] for key in keys]
            else:
                # 同槽位操作
                return self._execute_with_retry(self.cluster.mget, keys)
        
        except Exception as e:
            self.logger.error(f"Failed to execute mget: {e}")
            raise
    
    def _calculate_slot(self, key: str) -> int:
        """计算键的槽位"""
        import crc16
        return crc16.crc16xmodem(key.encode()) % 16384
    
    def pipeline_execute(self, commands: List[Dict[str, Any]]) -> List[Any]:
        """管道执行(按槽位分组)"""
        try:
            # 按槽位分组命令
            slot_groups = {}
            
            for i, cmd in enumerate(commands):
                key = cmd.get('key')
                if key:
                    slot = self._calculate_slot(key)
                    if slot not in slot_groups:
                        slot_groups[slot] = []
                    slot_groups[slot].append((i, cmd))
            
            if len(slot_groups) > 1:
                self.stats['cross_slot_operations'] += 1
            
            # 执行分组的命令
            all_results = [None] * len(commands)
            
            for slot, group_commands in slot_groups.items():
                # 为每个槽位创建管道
                pipe = self.cluster.pipeline()
                
                for original_index, cmd in group_commands:
                    operation = cmd['operation']
                    args = cmd.get('args', [])
                    kwargs = cmd.get('kwargs', {})
                    
                    getattr(pipe, operation)(*args, **kwargs)
                
                # 执行管道
                group_results = pipe.execute()
                
                # 将结果放回原始位置
                for i, (original_index, _) in enumerate(group_commands):
                    all_results[original_index] = group_results[i]
            
            return all_results
        
        except Exception as e:
            self.logger.error(f"Failed to execute pipeline: {e}")
            raise
    
    def get_stats(self) -> Dict[str, Any]:
        """获取性能统计"""
        total_ops = self.stats['total_operations']
        if total_ops == 0:
            success_rate = 0
            redirection_rate = 0
            cross_slot_rate = 0
        else:
            success_rate = (self.stats['successful_operations'] / total_ops) * 100
            redirection_rate = (self.stats['redirections'] / total_ops) * 100
            cross_slot_rate = (self.stats['cross_slot_operations'] / total_ops) * 100
        
        return {
            'total_operations': total_ops,
            'successful_operations': self.stats['successful_operations'],
            'failed_operations': self.stats['failed_operations'],
            'redirections': self.stats['redirections'],
            'cross_slot_operations': self.stats['cross_slot_operations'],
            'success_rate': round(success_rate, 2),
            'redirection_rate': round(redirection_rate, 2),
            'cross_slot_rate': round(cross_slot_rate, 2)
        }
    
    def reset_stats(self) -> None:
        """重置统计信息"""
        self.stats = {
            'total_operations': 0,
            'successful_operations': 0,
            'failed_operations': 0,
            'redirections': 0,
            'cross_slot_operations': 0
        }

## 11.5 高可用性架构模式

### 11.5.1 多数据中心部署

```python
class MultiDataCenterRedis:
    def __init__(self, datacenters: Dict[str, Dict[str, Any]]):
        self.datacenters = datacenters
        self.logger = logging.getLogger(__name__)
        self.clients = {}
        self.primary_dc = None
        
        # 初始化各数据中心的连接
        self._initialize_connections()
    
    def _initialize_connections(self) -> None:
        """初始化数据中心连接"""
        for dc_name, dc_config in self.datacenters.items():
            try:
                if dc_config['type'] == 'cluster':
                    client = RedisCluster(
                        startup_nodes=dc_config['nodes'],
                        decode_responses=True
                    )
                elif dc_config['type'] == 'sentinel':
                    from redis.sentinel import Sentinel
                    sentinel = Sentinel(dc_config['sentinels'])
                    client = sentinel.master_for(dc_config['service_name'])
                else:
                    client = redis.Redis(**dc_config['connection'])
                
                self.clients[dc_name] = {
                    'client': client,
                    'config': dc_config,
                    'healthy': True,
                    'latency': 0
                }
                
                # 设置主数据中心
                if dc_config.get('primary', False):
                    self.primary_dc = dc_name
                    
            except Exception as e:
                self.logger.error(f"Failed to connect to datacenter {dc_name}: {e}")
                self.clients[dc_name] = {
                    'client': None,
                    'config': dc_config,
                    'healthy': False,
                    'error': str(e)
                }
    
    def write_with_replication(self, key: str, value: Any, 
                             consistency_level: str = 'eventual') -> Dict[str, Any]:
        """跨数据中心写入"""
        results = {}
        primary_success = False
        
        try:
            # 首先写入主数据中心
            if self.primary_dc and self.clients[self.primary_dc]['healthy']:
                try:
                    primary_client = self.clients[self.primary_dc]['client']
                    primary_client.set(key, value)
                    primary_success = True
                    results[self.primary_dc] = {'success': True, 'role': 'primary'}
                except Exception as e:
                    results[self.primary_dc] = {
                        'success': False, 
                        'role': 'primary', 
                        'error': str(e)
                    }
            
            # 根据一致性级别决定是否等待副本写入
            if consistency_level == 'strong' and not primary_success:
                return {
                    'success': False,
                    'error': 'Primary datacenter write failed',
                    'results': results
                }
            
            # 异步写入其他数据中心
            replica_writes = []
            for dc_name, dc_info in self.clients.items():
                if dc_name != self.primary_dc and dc_info['healthy']:
                    try:
                        dc_info['client'].set(key, value)
                        results[dc_name] = {'success': True, 'role': 'replica'}
                        replica_writes.append(dc_name)
                    except Exception as e:
                        results[dc_name] = {
                            'success': False, 
                            'role': 'replica', 
                            'error': str(e)
                        }
            
            # 评估写入结果
            successful_writes = sum(1 for r in results.values() if r['success'])
            total_dcs = len(self.clients)
            
            success = False
            if consistency_level == 'eventual':
                success = primary_success or successful_writes > 0
            elif consistency_level == 'majority':
                success = successful_writes > (total_dcs // 2)
            elif consistency_level == 'strong':
                success = primary_success and successful_writes == total_dcs
            
            return {
                'success': success,
                'consistency_level': consistency_level,
                'primary_success': primary_success,
                'successful_writes': successful_writes,
                'total_datacenters': total_dcs,
                'results': results,
                'timestamp': datetime.now().isoformat()
            }
            
        except Exception as e:
            self.logger.error(f"Failed to write with replication: {e}")
            return {
                'success': False,
                'error': str(e),
                'results': results
            }
    
    def read_with_fallback(self, key: str, 
                          prefer_local: bool = True) -> Dict[str, Any]:
        """带故障转移的读取"""
        read_order = []
        
        if prefer_local and self.primary_dc:
            read_order.append(self.primary_dc)
            read_order.extend([dc for dc in self.clients.keys() 
                             if dc != self.primary_dc])
        else:
            read_order = list(self.clients.keys())
        
        for dc_name in read_order:
            dc_info = self.clients[dc_name]
            if not dc_info['healthy']:
                continue
            
            try:
                start_time = time.time()
                value = dc_info['client'].get(key)
                latency = (time.time() - start_time) * 1000
                
                return {
                    'success': True,
                    'value': value,
                    'datacenter': dc_name,
                    'latency_ms': round(latency, 2),
                    'fallback_used': dc_name != read_order[0]
                }
                
            except Exception as e:
                self.logger.warning(f"Read failed from {dc_name}: {e}")
                continue
        
        return {
            'success': False,
            'error': 'All datacenters failed',
            'attempted_datacenters': read_order
        }
    
    def check_datacenter_health(self) -> Dict[str, Any]:
        """检查数据中心健康状态"""
        health_results = {}
        
        for dc_name, dc_info in self.clients.items():
            if dc_info['client'] is None:
                health_results[dc_name] = {
                    'healthy': False,
                    'error': dc_info.get('error', 'No client available')
                }
                continue
            
            try:
                start_time = time.time()
                dc_info['client'].ping()
                latency = (time.time() - start_time) * 1000
                
                # 更新客户端状态
                dc_info['healthy'] = True
                dc_info['latency'] = latency
                
                health_results[dc_name] = {
                    'healthy': True,
                    'latency_ms': round(latency, 2),
                    'role': 'primary' if dc_name == self.primary_dc else 'replica'
                }
                
            except Exception as e:
                dc_info['healthy'] = False
                health_results[dc_name] = {
                    'healthy': False,
                    'error': str(e),
                    'role': 'primary' if dc_name == self.primary_dc else 'replica'
                }
        
        # 计算总体健康状态
        healthy_dcs = sum(1 for h in health_results.values() if h['healthy'])
        primary_healthy = (self.primary_dc and 
                          health_results.get(self.primary_dc, {}).get('healthy', False))
        
        return {
            'datacenters': health_results,
            'summary': {
                'total_datacenters': len(self.clients),
                'healthy_datacenters': healthy_dcs,
                'primary_datacenter': self.primary_dc,
                'primary_healthy': primary_healthy,
                'overall_healthy': healthy_dcs > 0,
                'high_availability': healthy_dcs > 1
            },
            'timestamp': datetime.now().isoformat()
        }
    
    def failover_to_datacenter(self, target_dc: str) -> Dict[str, Any]:
        """故障转移到指定数据中心"""
        if target_dc not in self.clients:
            return {
                'success': False,
                'error': f"Datacenter {target_dc} not found"
            }
        
        if not self.clients[target_dc]['healthy']:
            return {
                'success': False,
                'error': f"Target datacenter {target_dc} is not healthy"
            }
        
        old_primary = self.primary_dc
        self.primary_dc = target_dc
        
        # 更新配置
        if old_primary:
            self.clients[old_primary]['config']['primary'] = False
        self.clients[target_dc]['config']['primary'] = True
        
        return {
            'success': True,
            'old_primary': old_primary,
            'new_primary': target_dc,
            'timestamp': datetime.now().isoformat()
        }

### 11.5.2 负载均衡和代理

```python
class RedisLoadBalancer:
    """Client-side load balancer that spreads commands over several Redis endpoints.

    Each entry of ``redis_nodes`` is a dict. Entries with ``'type': 'cluster'``
    are opened with ``RedisCluster(startup_nodes=entry['startup_nodes'])``;
    every other entry is opened with ``redis.Redis(**entry['connection'])``.
    An optional ``'weight'`` (default 1) feeds the weighted strategy.

    Recognised strategies are ``round_robin``, ``weighted_round_robin``,
    ``least_connections`` and ``least_response_time``; any other value makes
    the balancer always pick the first healthy node.
    """

    def __init__(self, redis_nodes: List[Dict[str, Any]], 
                 strategy: str = 'round_robin'):
        """Store the configuration and open a client for every node.

        Args:
            redis_nodes: Node configuration dicts (see class docstring).
            strategy: Name of the node-selection strategy.
        """
        self.redis_nodes = redis_nodes
        self.strategy = strategy
        self.logger = logging.getLogger(__name__)

        # Health-check bookkeeping. A last-check time of 0 makes the first
        # execute_command() call run a health check immediately.
        self.health_check_interval = 30
        self.last_health_check = 0

        # One record per configured node, in configuration order.
        self.connections = []
        self.node_weights = []
        self.current_index = 0

        self._initialize_connections()

    def _initialize_connections(self) -> None:
        """Create one connection record per configured node.

        A node whose client cannot be constructed is still recorded, with
        ``client`` set to ``None``, ``healthy`` set to ``False`` and the error
        text stored under ``'error'``; its weight is recorded as 0.
        """
        for i, node_config in enumerate(self.redis_nodes):
            try:
                if node_config.get('type') == 'cluster':
                    client = RedisCluster(
                        startup_nodes=node_config['startup_nodes'],
                        decode_responses=True
                    )
                else:
                    client = redis.Redis(**node_config['connection'], decode_responses=True)

                self.connections.append({
                    'client': client,
                    'config': node_config,
                    'healthy': True,
                    'weight': node_config.get('weight', 1),
                    'current_connections': 0,
                    'total_requests': 0,
                    'failed_requests': 0
                })

                self.node_weights.append(node_config.get('weight', 1))

            except Exception as e:
                self.logger.error(f"Failed to connect to node {i}: {e}")
                self.connections.append({
                    'client': None,
                    'config': node_config,
                    'healthy': False,
                    'error': str(e)
                })
                self.node_weights.append(0)

    def _select_node(self) -> Dict[str, Any]:
        """Pick a healthy connection record according to ``self.strategy``.

        Returns:
            The chosen connection record (the dict stored in
            ``self.connections``, not a copy).

        Raises:
            Exception: If no record is currently flagged healthy.
        """
        healthy_nodes = [conn for conn in self.connections if conn['healthy']]

        if not healthy_nodes:
            raise Exception("No healthy nodes available")

        if self.strategy == 'round_robin':
            node = healthy_nodes[self.current_index % len(healthy_nodes)]
            self.current_index += 1
            return node

        elif self.strategy == 'weighted_round_robin':
            # Implemented as weighted random choice: each healthy node is
            # picked with probability proportional to its weight.
            total_weight = sum(conn['weight'] for conn in healthy_nodes)
            if total_weight == 0:
                return healthy_nodes[0]

            import random
            rand_val = random.uniform(0, total_weight)
            current_weight = 0

            for conn in healthy_nodes:
                current_weight += conn['weight']
                if rand_val <= current_weight:
                    return conn

            # Only reachable through floating-point rounding at the top end.
            return healthy_nodes[-1]

        elif self.strategy == 'least_connections':
            return min(healthy_nodes, key=lambda x: x['current_connections'])

        elif self.strategy == 'least_response_time':
            # Simplified: no latency is measured; the node with the lowest
            # failure ratio is chosen instead.
            return min(healthy_nodes, 
                      key=lambda x: x['failed_requests'] / max(x['total_requests'], 1))

        else:
            return healthy_nodes[0]

    def _run_on_node(self, node: Dict[str, Any], command: str, *args, **kwargs) -> Any:
        """Run one command on ``node`` while keeping its counters accurate.

        ``current_connections`` is raised for the duration of the call,
        ``total_requests`` is always incremented, and ``failed_requests`` is
        incremented when the client raises. The client's exception is
        re-raised unchanged.
        """
        node['current_connections'] += 1
        node['total_requests'] += 1
        try:
            return node['client'].execute_command(command, *args, **kwargs)
        except Exception:
            node['failed_requests'] += 1
            raise
        finally:
            node['current_connections'] -= 1

    def execute_command(self, command: str, *args, **kwargs) -> Any:
        """Execute a Redis command on a node chosen by the configured strategy.

        If the chosen node raises, the command is retried on every other
        healthy node in configuration order until one succeeds. Previously
        only the first alternative was tried, so a command could fail while
        healthy nodes were still untried.

        Args:
            command: Redis command name.
            *args: Positional arguments forwarded to the client.
            **kwargs: Keyword arguments forwarded to the client.

        Returns:
            Whatever the client's ``execute_command`` returns.

        Raises:
            Exception: "No healthy nodes available" when nothing is healthy;
                otherwise the error raised by the last node that was tried.
        """
        self._periodic_health_check()

        node = self._select_node()
        try:
            return self._run_on_node(node, command, *args, **kwargs)
        except Exception as e:
            self.logger.error(f"Command failed on node: {e}")
            last_error = e

        # Exclude the failed node by identity; the records are mutable dicts
        # and comparing them by value would be both slow and fragile.
        fallback_nodes = [conn for conn in self.connections
                          if conn['healthy'] and conn is not node]

        for fallback_node in fallback_nodes:
            try:
                return self._run_on_node(fallback_node, command, *args, **kwargs)
            except Exception as fallback_error:
                self.logger.error(f"Command failed on fallback node: {fallback_error}")
                last_error = fallback_error

        raise last_error

    def _periodic_health_check(self) -> None:
        """Ping every node that has a client, at most once per interval.

        Records without a client (construction failed) are skipped and stay
        unhealthy. A successful ping marks the record healthy again.
        """
        current_time = time.time()
        if current_time - self.last_health_check < self.health_check_interval:
            return

        self.last_health_check = current_time

        for conn in self.connections:
            if conn['client'] is None:
                continue

            try:
                conn['client'].ping()
                conn['healthy'] = True
            except Exception as e:
                conn['healthy'] = False
                self.logger.warning(f"Node health check failed: {e}")

    def get_load_balancer_stats(self) -> Dict[str, Any]:
        """Summarise per-node counters.

        Returns:
            A dict with ``strategy``, ``total_nodes``, ``healthy_nodes`` and a
            ``nodes`` list. Each node entry carries its index, health flag,
            weight, counters and ``success_rate`` (percentage rounded to two
            decimals, 0 when the node has served no requests). Nodes that
            failed to initialise also expose their ``error`` text.
        """
        stats = {
            'strategy': self.strategy,
            'total_nodes': len(self.connections),
            'healthy_nodes': sum(1 for conn in self.connections if conn['healthy']),
            'nodes': []
        }

        for i, conn in enumerate(self.connections):
            # .get() defaults cover records created for failed nodes, which
            # carry no counters.
            node_stats = {
                'index': i,
                'healthy': conn['healthy'],
                'weight': conn.get('weight', 1),
                'current_connections': conn.get('current_connections', 0),
                'total_requests': conn.get('total_requests', 0),
                'failed_requests': conn.get('failed_requests', 0),
                'success_rate': 0
            }

            if node_stats['total_requests'] > 0:
                success_requests = node_stats['total_requests'] - node_stats['failed_requests']
                node_stats['success_rate'] = round(
                    (success_requests / node_stats['total_requests']) * 100, 2
                )

            if 'error' in conn:
                node_stats['error'] = conn['error']

            stats['nodes'].append(node_stats)

        return stats

# 使用示例
if __name__ == "__main__":
    # ---- Redis Cluster demo -------------------------------------------
    startup_nodes = [
        {'host': 'localhost', 'port': port} for port in (7000, 7001, 7002)
    ]

    # Cluster manager: inspect topology.
    cluster_manager = RedisClusterManager(startup_nodes)

    cluster_info = cluster_manager.get_cluster_info()
    print(f"Cluster info: {cluster_info}")

    slot_distribution = cluster_manager.get_slot_distribution()
    print(f"Slot distribution: {slot_distribution}")

    # Cluster-aware client: single-key round trip.
    cluster_client = ClusterAwareClient(startup_nodes)
    cluster_client.set('cluster_key', 'cluster_value')
    value = cluster_client.get('cluster_key')
    print(f"Cluster value: {value}")

    # Batch write followed by a batch read of the same keys.
    mapping = {f'key{n}': f'value{n}' for n in (1, 2, 3)}
    cluster_client.mset(mapping)
    values = cluster_client.mget(list(mapping))
    print(f"Batch values: {values}")

    # Client-side statistics.
    stats = cluster_client.get_stats()
    print(f"Client stats: {stats}")

    # ---- Multi-datacenter demo ----------------------------------------
    datacenters = {
        dc_name: {
            'type': 'single',
            'connection': {'host': 'localhost', 'port': dc_port},
            'primary': is_primary
        }
        for dc_name, dc_port, is_primary in (
            ('dc1', 6379, True),
            ('dc2', 6380, False),
        )
    }

    mdc_redis = MultiDataCenterRedis(datacenters)

    # Write replicated across datacenters.
    write_result = mdc_redis.write_with_replication(
        'mdc_key', 'mdc_value', consistency_level='majority'
    )
    print(f"Multi-DC write: {write_result}")

    # Read that falls back to another datacenter on failure.
    read_result = mdc_redis.read_with_fallback('mdc_key')
    print(f"Multi-DC read: {read_result}")

    # ---- Load-balancer demo -------------------------------------------
    redis_nodes = [
        {'connection': {'host': 'localhost', 'port': node_port}, 'weight': node_weight}
        for node_port, node_weight in ((6379, 3), (6380, 2), (6381, 1))
    ]

    load_balancer = RedisLoadBalancer(redis_nodes, strategy='weighted_round_robin')

    # Issue ten writes through the balancer.
    for idx in range(10):
        load_balancer.execute_command('SET', f'lb_key_{idx}', f'lb_value_{idx}')

    lb_stats = load_balancer.get_load_balancer_stats()
    print(f"Load balancer stats: {lb_stats}")

11.6 性能监控和调优

11.6.1 集群性能监控

class RedisClusterMonitor:
    """Poll a set of Redis nodes and derive metrics, alerts, trends and a health report.

    Every call to :meth:`collect_cluster_metrics` queries ``INFO`` on each
    connected node, evaluates the per-node alert thresholds and appends the
    snapshot to ``metrics_history`` (capped at the 100 most recent entries).
    """

    def __init__(self, cluster_nodes: List[Dict[str, Any]]):
        """Connect to the given nodes and set up history and thresholds.

        Args:
            cluster_nodes: Node descriptors; each must provide ``'host'`` and
                ``'port'``.
        """
        self.cluster_nodes = cluster_nodes
        self.logger = logging.getLogger(__name__)

        # Last (cumulative CPU seconds, sample time) per node, needed to turn
        # Redis' cumulative CPU counters into a utilisation percentage.
        self._cpu_samples = {}

        # One client per reachable node, keyed by "host:port".
        self.node_clients = {}
        self._initialize_node_connections()

        # Collected snapshots and the limits that trigger alerts.
        self.metrics_history = []
        self.alert_thresholds = {
            'memory_usage_percent': 80,
            'cpu_usage_percent': 80,
            'connection_count': 1000,
            'ops_per_second': 10000,
            'replication_lag_seconds': 10
        }

    def _initialize_node_connections(self) -> None:
        """Open and ping a client for every configured node.

        Nodes that cannot be reached are logged and left out of
        ``node_clients``.
        """
        for node in self.cluster_nodes:
            node_key = f"{node['host']}:{node['port']}"
            try:
                client = redis.Redis(
                    host=node['host'],
                    port=node['port'],
                    decode_responses=True
                )
                client.ping()
                self.node_clients[node_key] = client
            except Exception as e:
                self.logger.error(f"Failed to connect to node {node_key}: {e}")

    def _cpu_percent(self, node_key: str, cpu_seconds: float, sampled_at: datetime) -> float:
        """Convert cumulative CPU seconds into a utilisation percentage.

        ``used_cpu_sys`` / ``used_cpu_user`` are running totals in seconds, so
        a percentage only exists between two samples:
        ``(cpu seconds consumed / wall seconds elapsed) * 100``.

        Args:
            node_key: ``"host:port"`` identifier of the node.
            cpu_seconds: Current cumulative CPU seconds of the node.
            sampled_at: Time at which ``cpu_seconds`` was read.

        Returns:
            The utilisation since the previous sample, or ``0.0`` when there
            is no previous sample, no time has elapsed, or the counter went
            backwards (e.g. the server restarted).
        """
        previous = self._cpu_samples.get(node_key)
        self._cpu_samples[node_key] = (cpu_seconds, sampled_at)
        if previous is None:
            return 0.0

        previous_seconds, previous_at = previous
        elapsed = (sampled_at - previous_at).total_seconds()
        consumed = cpu_seconds - previous_seconds
        if elapsed <= 0 or consumed < 0:
            return 0.0
        return consumed / elapsed * 100

    def collect_cluster_metrics(self) -> Dict[str, Any]:
        """Take one metrics snapshot of every connected node.

        Returns:
            A dict with ``timestamp`` (ISO string), ``nodes`` (per-node
            metrics, or ``{'healthy': False, 'error': ...}`` for nodes whose
            query failed), ``cluster_summary`` (totals and averages) and
            ``alerts``. ``cluster_summary['memory_usage_percent']`` is only
            present when at least one node reports a non-zero ``maxmemory``.
            The snapshot is also appended to ``metrics_history``.
        """
        timestamp = datetime.now()
        cluster_metrics = {
            'timestamp': timestamp.isoformat(),
            'nodes': {},
            'cluster_summary': {
                'total_nodes': len(self.cluster_nodes),
                'healthy_nodes': 0,
                'total_memory_used': 0,
                'total_memory_max': 0,
                'total_connections': 0,
                'total_ops_per_second': 0,
                'average_cpu_usage': 0
            },
            'alerts': []
        }
        summary = cluster_metrics['cluster_summary']

        cpu_usage_sum = 0
        healthy_nodes = 0

        for node_key, client in self.node_clients.items():
            try:
                info = client.info()

                # Memory. Redis reports maxmemory as 0 when no limit is set,
                # in which case no usage percentage can be derived.
                memory_used = info.get('used_memory', 0)
                memory_max = info.get('maxmemory', 0)
                memory_usage_percent = 0
                if memory_max > 0:
                    memory_usage_percent = (memory_used / memory_max) * 100

                connected_clients = info.get('connected_clients', 0)
                ops_per_second = info.get('instantaneous_ops_per_sec', 0)

                # CPU: the INFO fields are cumulative seconds, so derive a
                # rate from the previous sample instead of reporting the sum.
                cpu_seconds = info.get('used_cpu_sys', 0) + info.get('used_cpu_user', 0)
                cpu_usage = self._cpu_percent(node_key, cpu_seconds, timestamp)

                # Replication: for replicas, seconds since the last exchange
                # with the master is used as the lag figure.
                replication_info = client.info('replication')
                role = replication_info.get('role', 'unknown')
                replication_lag = 0
                if role == 'slave':
                    replication_lag = replication_info.get('master_last_io_seconds_ago', 0)

                node_metrics = {
                    'node_key': node_key,
                    'role': role,
                    'healthy': True,
                    'memory': {
                        'used_bytes': memory_used,
                        'max_bytes': memory_max,
                        'usage_percent': round(memory_usage_percent, 2)
                    },
                    'connections': {
                        'connected_clients': connected_clients
                    },
                    'performance': {
                        'ops_per_second': ops_per_second,
                        'cpu_usage_percent': round(cpu_usage, 2)
                    },
                    'replication': {
                        'role': role,
                        'lag_seconds': replication_lag
                    }
                }

                alerts = self._check_node_alerts(node_key, node_metrics)
                cluster_metrics['alerts'].extend(alerts)

                cluster_metrics['nodes'][node_key] = node_metrics

                # Fold this node into the cluster totals.
                healthy_nodes += 1
                summary['total_memory_used'] += memory_used
                summary['total_memory_max'] += memory_max
                summary['total_connections'] += connected_clients
                summary['total_ops_per_second'] += ops_per_second
                cpu_usage_sum += cpu_usage

            except Exception as e:
                self.logger.error(f"Failed to collect metrics from {node_key}: {e}")
                cluster_metrics['nodes'][node_key] = {
                    'node_key': node_key,
                    'healthy': False,
                    'error': str(e)
                }

        summary['healthy_nodes'] = healthy_nodes
        if healthy_nodes > 0:
            summary['average_cpu_usage'] = round(cpu_usage_sum / healthy_nodes, 2)

        total_memory_max = summary['total_memory_max']
        if total_memory_max > 0:
            cluster_usage_percent = (summary['total_memory_used'] / total_memory_max) * 100
            summary['memory_usage_percent'] = round(cluster_usage_percent, 2)

        # Keep only the 100 most recent snapshots.
        self.metrics_history.append(cluster_metrics)
        if len(self.metrics_history) > 100:
            self.metrics_history = self.metrics_history[-100:]

        return cluster_metrics

    def _check_node_alerts(self, node_key: str, node_metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Compare one node's metrics with ``alert_thresholds``.

        Args:
            node_key: ``"host:port"`` identifier of the node.
            node_metrics: Per-node metrics as built by
                :meth:`collect_cluster_metrics`.

        Returns:
            A list of alert dicts (``type``, ``node``, ``severity``,
            ``message``, ``value``, ``threshold``); empty when nothing exceeds
            its threshold. Replication lag is only checked for replicas.
        """
        thresholds = self.alert_thresholds
        alerts = []

        def add_alert(alert_type, severity, message, value, threshold_key):
            alerts.append({
                'type': alert_type,
                'node': node_key,
                'severity': severity,
                'message': message,
                'value': value,
                'threshold': thresholds[threshold_key]
            })

        # Memory usage: critical from 90% upwards.
        memory_usage = node_metrics['memory']['usage_percent']
        if memory_usage > thresholds['memory_usage_percent']:
            add_alert(
                'memory_usage_high',
                'warning' if memory_usage < 90 else 'critical',
                f"Memory usage {memory_usage}% exceeds threshold",
                memory_usage,
                'memory_usage_percent'
            )

        # CPU usage: critical from 90% upwards.
        cpu_usage = node_metrics['performance']['cpu_usage_percent']
        if cpu_usage > thresholds['cpu_usage_percent']:
            add_alert(
                'cpu_usage_high',
                'warning' if cpu_usage < 90 else 'critical',
                f"CPU usage {cpu_usage}% exceeds threshold",
                cpu_usage,
                'cpu_usage_percent'
            )

        # Client connections: always a warning.
        connections = node_metrics['connections']['connected_clients']
        if connections > thresholds['connection_count']:
            add_alert(
                'connection_count_high',
                'warning',
                f"Connection count {connections} exceeds threshold",
                connections,
                'connection_count'
            )

        # Replication lag: replicas only, critical from 30 seconds upwards.
        if node_metrics['replication']['role'] == 'slave':
            lag = node_metrics['replication']['lag_seconds']
            if lag > thresholds['replication_lag_seconds']:
                add_alert(
                    'replication_lag_high',
                    'warning' if lag < 30 else 'critical',
                    f"Replication lag {lag}s exceeds threshold",
                    lag,
                    'replication_lag_seconds'
                )

        return alerts

    def get_performance_trends(self, hours: int = 24) -> Dict[str, Any]:
        """Build time series from the stored snapshots of the last ``hours`` hours.

        Args:
            hours: Size of the look-back window, counted back from now.

        Returns:
            ``{'success': False, 'error': ...}`` when there is no history or
            no snapshot inside the window; otherwise ``success``, ``trends``
            (cluster-level and per-node series of ``{'timestamp', 'value'}``
            points) and ``data_points``. Unhealthy node snapshots contribute
            no points to the per-node series.
        """
        # Imported here because the module only imports `datetime` itself.
        from datetime import timedelta

        if not self.metrics_history:
            return {'success': False, 'error': 'No metrics history available'}

        end_time = datetime.now()
        start_time = end_time - timedelta(hours=hours)

        filtered_metrics = [
            metrics for metrics in self.metrics_history
            if start_time <= datetime.fromisoformat(metrics['timestamp']) <= end_time
        ]

        if not filtered_metrics:
            return {'success': False, 'error': 'No metrics in specified time range'}

        trends = {
            'time_range': {
                'start': start_time.isoformat(),
                'end': end_time.isoformat(),
                'hours': hours
            },
            'cluster_trends': {
                'memory_usage': [],
                'total_connections': [],
                'total_ops_per_second': [],
                'healthy_nodes': []
            },
            'node_trends': {}
        }
        cluster_trends = trends['cluster_trends']
        node_trends = trends['node_trends']

        for metrics in filtered_metrics:
            timestamp = metrics['timestamp']
            summary = metrics['cluster_summary']

            # Cluster-level series.
            cluster_trends['memory_usage'].append({
                'timestamp': timestamp,
                'value': summary.get('memory_usage_percent', 0)
            })
            cluster_trends['total_connections'].append({
                'timestamp': timestamp,
                'value': summary.get('total_connections', 0)
            })
            cluster_trends['total_ops_per_second'].append({
                'timestamp': timestamp,
                'value': summary.get('total_ops_per_second', 0)
            })
            cluster_trends['healthy_nodes'].append({
                'timestamp': timestamp,
                'value': summary.get('healthy_nodes', 0)
            })

            # Per-node series.
            for node_key, node_metrics in metrics['nodes'].items():
                if node_key not in node_trends:
                    node_trends[node_key] = {
                        'memory_usage': [],
                        'connections': [],
                        'ops_per_second': []
                    }

                if node_metrics.get('healthy', False):
                    series = node_trends[node_key]
                    series['memory_usage'].append({
                        'timestamp': timestamp,
                        'value': node_metrics['memory']['usage_percent']
                    })
                    series['connections'].append({
                        'timestamp': timestamp,
                        'value': node_metrics['connections']['connected_clients']
                    })
                    series['ops_per_second'].append({
                        'timestamp': timestamp,
                        'value': node_metrics['performance']['ops_per_second']
                    })

        return {
            'success': True,
            'trends': trends,
            'data_points': len(filtered_metrics)
        }

    def generate_health_report(self) -> Dict[str, Any]:
        """Collect a fresh snapshot and score the cluster's health.

        The score starts at 100, loses up to 30 points in proportion to the
        share of unhealthy nodes, 15 per critical alert and 5 per warning
        alert, and is floored at 0.

        Returns:
            ``{'success': True, 'report': {...}}`` where the report holds the
            score, a status label (``excellent`` >= 90, ``good`` >= 75,
            ``fair`` >= 50, ``poor`` >= 25, otherwise ``critical``), the
            cluster summary, alert counts and details, and recommendations.
        """
        latest_metrics = self.collect_cluster_metrics()

        if not latest_metrics:
            return {'success': False, 'error': 'Failed to collect metrics'}

        summary = latest_metrics['cluster_summary']
        alerts = latest_metrics['alerts']

        health_score = 100

        # Penalty for unhealthy nodes.
        if summary['total_nodes'] > 0:
            node_health_ratio = summary['healthy_nodes'] / summary['total_nodes']
            if node_health_ratio < 1.0:
                health_score -= (1.0 - node_health_ratio) * 30

        # Penalty for alerts.
        critical_alerts = [a for a in alerts if a.get('severity') == 'critical']
        warning_alerts = [a for a in alerts if a.get('severity') == 'warning']

        health_score -= len(critical_alerts) * 15
        health_score -= len(warning_alerts) * 5

        health_score = max(0, health_score)

        if health_score >= 90:
            health_status = 'excellent'
        elif health_score >= 75:
            health_status = 'good'
        elif health_score >= 50:
            health_status = 'fair'
        elif health_score >= 25:
            health_status = 'poor'
        else:
            health_status = 'critical'

        report = {
            'timestamp': datetime.now().isoformat(),
            'health_score': round(health_score, 1),
            'health_status': health_status,
            'cluster_summary': summary,
            'alerts': {
                'total': len(alerts),
                'critical': len(critical_alerts),
                'warning': len(warning_alerts),
                'details': alerts
            },
            'recommendations': self._generate_recommendations(latest_metrics)
        }

        return {
            'success': True,
            'report': report
        }

    def _generate_recommendations(self, metrics: Dict[str, Any]) -> List[str]:
        """Turn a metrics snapshot into human-readable tuning advice.

        Args:
            metrics: A snapshot as returned by :meth:`collect_cluster_metrics`.

        Returns:
            A list of advice strings; empty when no rule applies.
        """
        recommendations = []
        summary = metrics['cluster_summary']

        # Cluster memory usage above 80%.
        if summary.get('memory_usage_percent', 0) > 80:
            recommendations.append(
                "Consider increasing memory or implementing data eviction policies"
            )

        # More than 5000 client connections in total.
        if summary.get('total_connections', 0) > 5000:
            recommendations.append(
                "High connection count detected. Consider connection pooling"
            )

        # At least one node missing or failing.
        if summary['healthy_nodes'] < summary['total_nodes']:
            recommendations.append(
                "Some nodes are unhealthy. Check node status and network connectivity"
            )

        # More than 50000 operations per second in total.
        if summary.get('total_ops_per_second', 0) > 50000:
            recommendations.append(
                "High operation rate. Monitor for potential bottlenecks"
            )

        return recommendations

11.7 总结

Redis集群和高可用性是构建可靠、可扩展Redis服务的关键技术:

11.7.1 核心特性

  1. 主从复制:提供数据冗余和读写分离
  2. Redis Sentinel:自动故障检测和故障转移
  3. Redis Cluster:水平扩展和自动分片
  4. 多数据中心:跨地域高可用性
  5. 负载均衡:请求分发和性能优化

11.7.2 适用场景

  • 主从复制:读多写少的应用
  • Sentinel:需要自动故障转移的中小规模应用
  • Cluster:大规模、高并发的分布式应用
  • 多数据中心:对可用性要求极高的关键业务

11.7.3 最佳实践

  1. 架构设计:根据业务需求选择合适的高可用方案
  2. 监控告警:建立完善的监控和告警机制
  3. 容量规划:合理规划节点数量和资源配置
  4. 故障演练:定期进行故障转移演练
  5. 性能优化:持续监控和优化性能指标

11.7.4 参考资料


下一章将介绍Redis的安全配置和最佳实践。