11.1 概述
Redis集群和高可用性是构建可扩展、可靠Redis服务的关键技术。本章将介绍Redis Cluster、Redis Sentinel以及各种高可用性架构模式。
11.1.1 高可用性需求
- 服务连续性:确保服务24/7可用
- 数据一致性:保证数据不丢失
- 故障恢复:快速检测和恢复故障
- 水平扩展:支持数据和负载的水平扩展
- 负载均衡:合理分配请求负载
11.1.2 Redis高可用方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
| --- | --- | --- | --- |
主从复制 | 简单、读写分离 | 手动故障转移 | 读多写少 |
Sentinel | 自动故障转移 | 单点写入 | 中小规模 |
Cluster | 水平扩展、自动分片 | 复杂性高 | 大规模应用 |
Proxy方案 | 透明分片 | 额外组件 | 平滑迁移 |
11.2 Redis主从复制
11.2.1 主从复制原理
Redis主从复制通过异步复制实现数据同步,支持一主多从的架构。
import json
import logging
import random
import socket
import threading
import time
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple

import redis
class RedisMasterSlaveManager:
    """Manage a Redis master/slave (replica) replication topology.

    Responsibilities:
      * connect to one master and N slaves,
      * wire up replication (SLAVEOF) on every slave,
      * report replication status and per-slave lag,
      * promote a slave to master for manual failover,
      * force slaves into read-only mode for read/write splitting.

    All public methods return a result dict with a ``success`` flag instead
    of raising, so callers can treat them as best-effort operations.
    """

    def __init__(self, master_config: Dict[str, Any],
                 slave_configs: List[Dict[str, Any]]):
        """
        Args:
            master_config: keyword arguments for ``redis.Redis`` describing
                the master (``host``, ``port``, optional ``password``, ...).
            slave_configs: one such dict per slave instance.
        """
        self.master_config = master_config
        self.slave_configs = slave_configs
        self.logger = logging.getLogger(__name__)
        # Live connection objects; populated by connect_all().
        self.master = None
        self.slaves = []
        self.connect_all()

    def connect_all(self) -> Dict[str, Any]:
        """Connect to the master and every configured slave.

        Returns:
            Summary dict with connection counts, or
            ``{'success': False, 'error': ...}`` on the first failure.
        """
        try:
            self.master = redis.Redis(**self.master_config, decode_responses=True)
            self.master.ping()  # fail fast if the master is unreachable
            self.slaves = []
            for slave_config in self.slave_configs:
                slave = redis.Redis(**slave_config, decode_responses=True)
                slave.ping()  # fail fast if this slave is unreachable
                self.slaves.append(slave)
            return {
                'success': True,
                'master_connected': True,
                'slaves_connected': len(self.slaves),
                'total_slaves': len(self.slave_configs)
            }
        except Exception as e:
            self.logger.error(f"Failed to connect to Redis instances: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def setup_replication(self) -> Dict[str, Any]:
        """Point every connected slave at the configured master.

        BUGFIX: ``masterauth`` is now configured *before* SLAVEOF is issued.
        The original set it afterwards, so the first sync attempt against a
        password-protected master was rejected with an authentication error.

        Returns:
            Per-slave results plus an overall ``success`` flag (true when at
            least one slave was configured).
        """
        try:
            results = []
            for i, slave in enumerate(self.slaves):
                try:
                    # Auth must be in place before replication starts,
                    # otherwise the initial PSYNC is rejected.
                    if 'password' in self.master_config:
                        slave.config_set('masterauth', self.master_config['password'])
                    slave.slaveof(self.master_config['host'],
                                  self.master_config['port'])
                    results.append({
                        'slave_index': i,
                        'slave_config': self.slave_configs[i],
                        'replication_setup': True,
                        'status': 'success'
                    })
                except Exception as e:
                    results.append({
                        'slave_index': i,
                        'slave_config': self.slave_configs[i],
                        'replication_setup': False,
                        'status': 'failed',
                        'error': str(e)
                    })
            successful_slaves = sum(1 for r in results if r['replication_setup'])
            return {
                'success': successful_slaves > 0,
                'total_slaves': len(self.slaves),
                'successful_slaves': successful_slaves,
                'results': results,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to setup replication: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_replication_status(self) -> Dict[str, Any]:
        """Collect replication state from the master and every slave.

        Returns:
            Dict with ``master_status``, per-slave ``slaves_status`` and the
            computed ``replication_lag`` (replication offset delta in bytes,
            plus the seconds since the slave last heard from the master).
        """
        try:
            # Master-side view of replication.
            master_info = self.master.info('replication')
            master_status = {
                'role': master_info.get('role'),
                'connected_slaves': master_info.get('connected_slaves', 0),
                'master_replid': master_info.get('master_replid'),
                'master_repl_offset': master_info.get('master_repl_offset', 0),
                'repl_backlog_active': master_info.get('repl_backlog_active', 0),
                'repl_backlog_size': master_info.get('repl_backlog_size', 0)
            }
            # Slave-side view; an unreachable slave is reported, not fatal.
            slaves_status = []
            for i, slave in enumerate(self.slaves):
                try:
                    slave_info = slave.info('replication')
                    slave_status = {
                        'slave_index': i,
                        'role': slave_info.get('role'),
                        'master_host': slave_info.get('master_host'),
                        'master_port': slave_info.get('master_port'),
                        'master_link_status': slave_info.get('master_link_status'),
                        'master_last_io_seconds_ago': slave_info.get('master_last_io_seconds_ago'),
                        'master_sync_in_progress': slave_info.get('master_sync_in_progress', 0),
                        'slave_repl_offset': slave_info.get('slave_repl_offset', 0),
                        'slave_priority': slave_info.get('slave_priority', 100),
                        'slave_read_only': slave_info.get('slave_read_only', 1),
                        'connected': True
                    }
                    slaves_status.append(slave_status)
                except Exception as e:
                    slaves_status.append({
                        'slave_index': i,
                        'connected': False,
                        'error': str(e)
                    })
            # Lag = master offset minus slave offset (bytes not yet streamed).
            replication_lag = []
            master_offset = master_status['master_repl_offset']
            for slave_status in slaves_status:
                if slave_status.get('connected') and 'slave_repl_offset' in slave_status:
                    lag = master_offset - slave_status['slave_repl_offset']
                    replication_lag.append({
                        'slave_index': slave_status['slave_index'],
                        'lag_bytes': lag,
                        'lag_seconds': slave_status.get('master_last_io_seconds_ago', 0)
                    })
            return {
                'success': True,
                'master_status': master_status,
                'slaves_status': slaves_status,
                'replication_lag': replication_lag,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to get replication status: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def promote_slave_to_master(self, slave_index: int) -> Dict[str, Any]:
        """Manually fail over: promote ``slave_index`` to master.

        The promoted node receives ``SLAVEOF NO ONE``; every remaining slave
        is then re-pointed at the new master (including ``masterauth`` when
        the new master requires a password).

        Args:
            slave_index: index into the connected slave list (>= 0).
        """
        # Also reject negative indices: Python's negative indexing would
        # silently promote the wrong node.
        if slave_index < 0 or slave_index >= len(self.slaves):
            return {
                'success': False,
                'error': f"Invalid slave index: {slave_index}"
            }
        try:
            new_master = self.slaves[slave_index]
            # slaveof() with no arguments issues SLAVEOF NO ONE,
            # detaching the node from its current master.
            new_master.slaveof()
            # Give the node a moment to switch roles.
            time.sleep(1)
            # Verify the promotion actually took effect.
            new_master_info = new_master.info('replication')
            if new_master_info.get('role') != 'master':
                return {
                    'success': False,
                    'error': 'Failed to promote slave to master'
                }
            # Re-point the remaining slaves at the new master.
            new_master_config = self.slave_configs[slave_index]
            reconfigured_slaves = []
            for i, slave in enumerate(self.slaves):
                if i != slave_index:
                    try:
                        # Keep auth consistent if the new master needs it.
                        if 'password' in new_master_config:
                            slave.config_set('masterauth', new_master_config['password'])
                        slave.slaveof(new_master_config['host'],
                                      new_master_config['port'])
                        reconfigured_slaves.append(i)
                    except Exception as e:
                        self.logger.error(f"Failed to reconfigure slave {i}: {e}")
            return {
                'success': True,
                'new_master_index': slave_index,
                'new_master_config': new_master_config,
                'reconfigured_slaves': reconfigured_slaves,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to promote slave to master: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def configure_read_write_split(self) -> Dict[str, Any]:
        """Mark slaves read-only and describe the read/write routing plan.

        Returns a plan dict: the master accepts reads and writes, every
        slave is forced into read-only mode via CONFIG SET.
        """
        try:
            # The master serves both reads and writes.
            master_config = {
                'read_operations': ['GET', 'MGET', 'HGET', 'HGETALL', 'LRANGE',
                                    'SMEMBERS', 'ZRANGE', 'EXISTS', 'TTL'],
                'write_operations': ['SET', 'MSET', 'HSET', 'LPUSH', 'SADD',
                                     'ZADD', 'DEL', 'EXPIRE'],
                'role': 'master',
                'allow_reads': True,
                'allow_writes': True
            }
            # Force every slave into read-only mode.
            slaves_config = []
            for i, slave in enumerate(self.slaves):
                # 'slave-read-only' is the pre-5.0 name; current Redis still
                # accepts it as an alias of 'replica-read-only'.
                slave.config_set('slave-read-only', 'yes')
                slaves_config.append({
                    'slave_index': i,
                    'role': 'slave',
                    'allow_reads': True,
                    'allow_writes': False,
                    'read_only': True
                })
            return {
                'success': True,
                'master_config': master_config,
                'slaves_config': slaves_config,
                'read_write_split_enabled': True,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to configure read-write split: {e}")
            return {
                'success': False,
                'error': str(e)
            }
### 11.2.2 读写分离客户端
```python
class ReadWriteSplitClient:
    """Redis client that routes reads to slaves and writes to the master.

    Read traffic is spread over the slaves according to ``read_strategy``
    ('round_robin', 'random', 'weighted' or 'least_connections'); writes
    always go to the master.  A failed slave read transparently falls back
    to the master.

    Fixes over the original: the bare ``except:`` in the least-connections
    path (which would swallow ``KeyboardInterrupt``/``SystemExit``) is now
    ``except Exception``, and the repeated function-local ``import random``
    calls were hoisted to module level.
    """

    def __init__(self, master_config: Dict[str, Any],
                 slave_configs: List[Dict[str, Any]],
                 read_strategy: str = 'round_robin'):
        self.master_config = master_config
        self.slave_configs = slave_configs
        self.read_strategy = read_strategy
        self.logger = logging.getLogger(__name__)
        # Connections (redis.Redis is lazy: no I/O until the first command).
        self.master = redis.Redis(**master_config, decode_responses=True)
        self.slaves = [redis.Redis(**config, decode_responses=True)
                       for config in slave_configs]
        # Round-robin cursor and per-slave weights for 'weighted' reads.
        self.current_slave_index = 0
        self.slave_weights = [1] * len(self.slaves)
        # Commands considered safe to serve from a replica.
        self.read_operations = {
            'GET', 'MGET', 'HGET', 'HGETALL', 'HMGET', 'HKEYS', 'HVALS',
            'LRANGE', 'LINDEX', 'LLEN', 'SMEMBERS', 'SCARD', 'SISMEMBER',
            'ZRANGE', 'ZREVRANGE', 'ZCARD', 'ZSCORE', 'EXISTS', 'TTL',
            'TYPE', 'KEYS', 'SCAN', 'HSCAN', 'SSCAN', 'ZSCAN'
        }
        # Commands that must run on the master.
        self.write_operations = {
            'SET', 'MSET', 'HSET', 'HMSET', 'HDEL', 'LPUSH', 'RPUSH',
            'LPOP', 'RPOP', 'SADD', 'SREM', 'ZADD', 'ZREM', 'DEL',
            'EXPIRE', 'EXPIREAT', 'PERSIST', 'INCR', 'DECR', 'INCRBY',
            'DECRBY', 'APPEND'
        }

    def _select_slave_for_read(self) -> "redis.Redis":
        """Pick a slave connection according to the configured strategy.

        Falls back to the master when no slaves are configured (or, for
        'least_connections', when no slave responds to PING).
        """
        if not self.slaves:
            return self.master
        if self.read_strategy == 'round_robin':
            slave = self.slaves[self.current_slave_index]
            self.current_slave_index = (self.current_slave_index + 1) % len(self.slaves)
            return slave
        elif self.read_strategy == 'random':
            return random.choice(self.slaves)
        elif self.read_strategy == 'weighted':
            total_weight = sum(self.slave_weights)
            if total_weight == 0:
                return self.slaves[0]
            rand_val = random.uniform(0, total_weight)
            current_weight = 0
            for i, weight in enumerate(self.slave_weights):
                current_weight += weight
                if rand_val <= current_weight:
                    return self.slaves[i]
            return self.slaves[-1]  # guard against float rounding
        elif self.read_strategy == 'least_connections':
            # Simplified: first slave that answers PING.
            for slave in self.slaves:
                try:
                    slave.ping()
                    return slave
                except Exception:
                    continue
            return self.master
        else:
            return self.slaves[0] if self.slaves else self.master

    def _is_read_operation(self, command: str) -> bool:
        """True if ``command`` is classified as a read."""
        return command.upper() in self.read_operations

    def _is_write_operation(self, command: str) -> bool:
        """True if ``command`` is classified as a write."""
        return command.upper() in self.write_operations

    def execute_command(self, command: str, *args, **kwargs) -> Any:
        """Route ``command`` to a slave (reads) or the master (writes).

        Slave failures fall back to the master; unclassified commands are
        sent to the master, which is always correct.
        """
        try:
            if self._is_read_operation(command):
                client = self._select_slave_for_read()
                try:
                    return client.execute_command(command, *args, **kwargs)
                except Exception as e:
                    # Degraded slave: retry once on the master.
                    self.logger.warning(f"Slave read failed, fallback to master: {e}")
                    return self.master.execute_command(command, *args, **kwargs)
            elif self._is_write_operation(command):
                return self.master.execute_command(command, *args, **kwargs)
            else:
                self.logger.warning(f"Unknown operation type for command: {command}")
                return self.master.execute_command(command, *args, **kwargs)
        except Exception as e:
            self.logger.error(f"Command execution failed: {command}, error: {e}")
            raise

    def get(self, key: str) -> Any:
        """GET (read; served from a slave when possible)."""
        return self.execute_command('GET', key)

    def set(self, key: str, value: Any, **kwargs) -> bool:
        """SET (write; always on the master)."""
        return self.execute_command('SET', key, value, **kwargs)

    def hget(self, name: str, key: str) -> Any:
        """HGET (read)."""
        return self.execute_command('HGET', name, key)

    def hset(self, name: str, key: str, value: Any) -> int:
        """HSET (write)."""
        return self.execute_command('HSET', name, key, value)

    def check_health(self) -> Dict[str, Any]:
        """PING every node and report per-node latency plus a summary."""
        health_status = {
            'master': {'healthy': False, 'latency_ms': None, 'error': None},
            'slaves': []
        }
        # Master health.
        try:
            start_time = time.time()
            self.master.ping()
            latency = (time.time() - start_time) * 1000
            health_status['master'] = {
                'healthy': True,
                'latency_ms': round(latency, 2),
                'error': None
            }
        except Exception as e:
            health_status['master']['error'] = str(e)
        # Slave health.
        for i, slave in enumerate(self.slaves):
            slave_status = {
                'index': i,
                'healthy': False,
                'latency_ms': None,
                'error': None
            }
            try:
                start_time = time.time()
                slave.ping()
                latency = (time.time() - start_time) * 1000
                slave_status.update({
                    'healthy': True,
                    'latency_ms': round(latency, 2)
                })
            except Exception as e:
                slave_status['error'] = str(e)
            health_status['slaves'].append(slave_status)
        # Overall health: master up and at least one readable slave.
        healthy_slaves = sum(1 for s in health_status['slaves'] if s['healthy'])
        health_status['summary'] = {
            'master_healthy': health_status['master']['healthy'],
            'total_slaves': len(self.slaves),
            'healthy_slaves': healthy_slaves,
            'overall_healthy': health_status['master']['healthy'] and healthy_slaves > 0
        }
        return health_status

    def update_slave_weights(self, weights: List[float]) -> Dict[str, Any]:
        """Replace the per-slave weights used by the 'weighted' strategy."""
        if len(weights) != len(self.slaves):
            return {
                'success': False,
                'error': f"Weight count ({len(weights)}) doesn't match slave count ({len(self.slaves)})"
            }
        self.slave_weights = weights
        return {
            'success': True,
            'weights': weights,
            'strategy': self.read_strategy
        }
## 11.3 Redis Sentinel
### 11.3.1 Sentinel架构和配置
Redis Sentinel提供自动故障检测和故障转移功能。
```python
class RedisSentinelManager:
    """Thin management wrapper around Redis Sentinel.

    Discovers the current master/slaves for ``service_name``, reports
    Sentinel health, and can force a failover or watch Sentinel pub/sub
    traffic for a while.

    Fixes over the original: bare ``except:`` clauses are narrowed to
    ``except Exception`` (they used to swallow ``KeyboardInterrupt``), and
    the pub/sub connection in ``monitor_failover_events`` is now closed in
    a ``finally`` block instead of leaking on error.
    """

    def __init__(self, sentinel_configs: List[Dict[str, Any]],
                 service_name: str = 'mymaster'):
        """
        Args:
            sentinel_configs: dicts with ``host``/``port`` per Sentinel.
            service_name: the master name monitored by the Sentinels.
        """
        self.sentinel_configs = sentinel_configs
        self.service_name = service_name
        self.logger = logging.getLogger(__name__)
        # Sentinel handles server discovery and failover-aware pooling.
        from redis.sentinel import Sentinel
        self.sentinel = Sentinel(
            [(config['host'], config['port']) for config in sentinel_configs],
            decode_responses=True
        )

    def get_master_client(self) -> "redis.Redis":
        """Return a client that always talks to the current master."""
        return self.sentinel.master_for(self.service_name)

    def get_slave_client(self) -> "redis.Redis":
        """Return a client that talks to one of the current slaves."""
        return self.sentinel.slave_for(self.service_name)

    def discover_master(self) -> Dict[str, Any]:
        """Ask the Sentinels for the current master address."""
        try:
            master_info = self.sentinel.discover_master(self.service_name)
            return {
                'success': True,
                'master_host': master_info[0],
                'master_port': master_info[1],
                'service_name': self.service_name
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def discover_slaves(self) -> Dict[str, Any]:
        """Ask the Sentinels for the current slave addresses."""
        try:
            slaves_info = self.sentinel.discover_slaves(self.service_name)
            slaves = []
            for slave_info in slaves_info:
                slaves.append({
                    'host': slave_info[0],
                    'port': slave_info[1]
                })
            return {
                'success': True,
                'slaves': slaves,
                'slave_count': len(slaves),
                'service_name': self.service_name
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def get_sentinel_status(self) -> Dict[str, Any]:
        """Query every Sentinel directly and summarize cluster visibility.

        Unreachable Sentinels are reported with ``connected: False`` rather
        than failing the whole call; ``quorum_available`` is a majority
        check over the configured Sentinels.
        """
        try:
            status = {
                'sentinels': [],
                'master_info': None,
                'slaves_info': None
            }
            # Poll each Sentinel individually.
            for i, config in enumerate(self.sentinel_configs):
                try:
                    sentinel_client = redis.Redis(
                        host=config['host'],
                        port=config['port'],
                        decode_responses=True
                    )
                    sentinel_info = sentinel_client.info('sentinel')
                    # SENTINEL masters lists every monitored master.
                    master_info = sentinel_client.execute_command(
                        'SENTINEL', 'masters'
                    )
                    slaves_info = sentinel_client.execute_command(
                        'SENTINEL', 'slaves', self.service_name
                    )
                    status['sentinels'].append({
                        'index': i,
                        'host': config['host'],
                        'port': config['port'],
                        'connected': True,
                        'sentinel_masters': sentinel_info.get('sentinel_masters', 0),
                        'sentinel_running_scripts': sentinel_info.get('sentinel_running_scripts', 0),
                        'sentinel_scripts_queue_length': sentinel_info.get('sentinel_scripts_queue_length', 0)
                    })
                    # Keep the first successful answer as the canonical view.
                    if status['master_info'] is None:
                        status['master_info'] = master_info
                    if status['slaves_info'] is None:
                        status['slaves_info'] = slaves_info
                except Exception as e:
                    status['sentinels'].append({
                        'index': i,
                        'host': config['host'],
                        'port': config['port'],
                        'connected': False,
                        'error': str(e)
                    })
            # Majority check over the configured Sentinel set.
            connected_sentinels = sum(1 for s in status['sentinels'] if s['connected'])
            status['summary'] = {
                'total_sentinels': len(self.sentinel_configs),
                'connected_sentinels': connected_sentinels,
                'quorum_available': connected_sentinels >= (len(self.sentinel_configs) // 2 + 1)
            }
            return {
                'success': True,
                'status': status,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to get sentinel status: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def force_failover(self) -> Dict[str, Any]:
        """Trigger SENTINEL FAILOVER on the first reachable Sentinel."""
        try:
            sentinel_client = None
            for config in self.sentinel_configs:
                try:
                    sentinel_client = redis.Redis(
                        host=config['host'],
                        port=config['port'],
                        decode_responses=True
                    )
                    sentinel_client.ping()
                    break
                except Exception:
                    continue
            if sentinel_client is None:
                return {
                    'success': False,
                    'error': 'No available Sentinel found'
                }
            result = sentinel_client.execute_command(
                'SENTINEL', 'failover', self.service_name
            )
            # Give the failover a moment before re-discovering the master.
            time.sleep(2)
            new_master = self.discover_master()
            return {
                'success': True,
                'failover_result': result,
                'new_master': new_master,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to force failover: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def monitor_failover_events(self, duration_seconds: int = 300) -> Dict[str, Any]:
        """Subscribe to Sentinel hello traffic for ``duration_seconds``.

        Returns the collected messages; on error, the events gathered so
        far are returned under ``partial_events``.
        """
        events = []
        start_time = time.time()
        end_time = start_time + duration_seconds
        try:
            # Find the first reachable Sentinel.
            sentinel_client = None
            for config in self.sentinel_configs:
                try:
                    sentinel_client = redis.Redis(
                        host=config['host'],
                        port=config['port'],
                        decode_responses=True
                    )
                    sentinel_client.ping()
                    break
                except Exception:
                    continue
            if sentinel_client is None:
                return {
                    'success': False,
                    'error': 'No available Sentinel found'
                }
            # Sentinels exchange state on the __sentinel__:hello channel.
            pubsub = sentinel_client.pubsub()
            pubsub.subscribe('__sentinel__:hello')
            try:
                while time.time() < end_time:
                    try:
                        message = pubsub.get_message(timeout=1)
                        if message and message['type'] == 'message':
                            events.append({
                                'timestamp': datetime.now().isoformat(),
                                'channel': message['channel'],
                                'data': message['data'],
                                'type': message['type']
                            })
                    except Exception:
                        continue
            finally:
                # Always release the pub/sub connection.
                pubsub.close()
            return {
                'success': True,
                'events': events,
                'event_count': len(events),
                'monitoring_duration': duration_seconds,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to monitor failover events: {e}")
            return {
                'success': False,
                'error': str(e),
                'partial_events': events
            }
### 11.3.2 Sentinel高可用客户端
```python
class SentinelAwareClient:
    """Redis client that tracks master changes through Sentinel.

    Writes go to the master, reads prefer a slave (with master fallback),
    and every command is retried with exponential backoff; on failure the
    cached connection is dropped so the next attempt re-discovers the
    topology through Sentinel.
    """

    def __init__(self, sentinel_configs: List[Dict[str, Any]],
                 service_name: str = 'mymaster',
                 retry_attempts: int = 3,
                 retry_delay: float = 1.0):
        """
        Args:
            sentinel_configs: dicts with ``host``/``port`` per Sentinel.
            service_name: the master name monitored by the Sentinels.
            retry_attempts: attempts per command (must be >= 1).
            retry_delay: base delay; doubled after each failed attempt.
        """
        self.sentinel_configs = sentinel_configs
        self.service_name = service_name
        self.retry_attempts = retry_attempts
        self.retry_delay = retry_delay
        self.logger = logging.getLogger(__name__)
        from redis.sentinel import Sentinel
        self.sentinel = Sentinel(
            [(config['host'], config['port']) for config in sentinel_configs],
            decode_responses=True
        )
        # Cached connections, refreshed lazily.
        self._master_client = None
        self._slave_client = None
        self._last_master_check = 0
        self._master_check_interval = 30  # re-resolve the master every 30s

    def _get_master_client(self, force_refresh: bool = False) -> "redis.Redis":
        """Return the cached master client, re-resolving it when stale.

        Re-resolution happens on ``force_refresh``, on first use, or after
        ``_master_check_interval`` seconds.  A resolution failure re-raises
        only when no previously cached client exists.
        """
        current_time = time.time()
        if (force_refresh or
                self._master_client is None or
                current_time - self._last_master_check > self._master_check_interval):
            try:
                self._master_client = self.sentinel.master_for(self.service_name)
                self._last_master_check = current_time
            except Exception as e:
                self.logger.error(f"Failed to get master client: {e}")
                if self._master_client is None:
                    raise
        return self._master_client

    def _get_slave_client(self) -> "redis.Redis":
        """Return the cached slave client; fall back to the master."""
        if self._slave_client is None:
            try:
                self._slave_client = self.sentinel.slave_for(self.service_name)
            except Exception as e:
                self.logger.warning(f"Failed to get slave client, using master: {e}")
                return self._get_master_client()
        return self._slave_client

    def _execute_with_retry(self, operation: str, client_type: str = 'master',
                            *args, **kwargs) -> Any:
        """Execute ``operation`` with retries and exponential backoff.

        On each failure the cached client is invalidated so the next
        attempt re-resolves master/slave through Sentinel.

        Raises:
            The last exception when every attempt failed, or RuntimeError
            when ``retry_attempts`` < 1 (the original would execute
            ``raise None``, surfacing as a confusing TypeError).
        """
        last_exception = None
        for attempt in range(self.retry_attempts):
            try:
                if client_type == 'master':
                    # After a failure, force a master re-discovery.
                    client = self._get_master_client(force_refresh=attempt > 0)
                else:
                    client = self._get_slave_client()
                return client.execute_command(operation, *args, **kwargs)
            except Exception as e:
                last_exception = e
                self.logger.warning(
                    f"Attempt {attempt + 1} failed for {operation}: {e}"
                )
                if attempt < self.retry_attempts - 1:
                    time.sleep(self.retry_delay * (2 ** attempt))  # exponential backoff
                    # Drop the cached connection before retrying.
                    if client_type == 'master':
                        self._master_client = None
                    else:
                        self._slave_client = None
        if last_exception is None:
            # retry_attempts < 1: nothing was ever attempted.
            raise RuntimeError("retry_attempts must be >= 1")
        raise last_exception

    def set(self, key: str, value: Any, **kwargs) -> bool:
        """SET (write; always routed to the master)."""
        return self._execute_with_retry('SET', 'master', key, value, **kwargs)

    def get(self, key: str) -> Any:
        """GET (read; slave preferred, master fallback)."""
        try:
            return self._execute_with_retry('GET', 'slave', key)
        except Exception as e:
            self.logger.warning(f"Slave read failed, fallback to master: {e}")
            return self._execute_with_retry('GET', 'master', key)

    def hset(self, name: str, key: str, value: Any) -> int:
        """HSET (write)."""
        return self._execute_with_retry('HSET', 'master', name, key, value)

    def hget(self, name: str, key: str) -> Any:
        """HGET (read; slave preferred, master fallback)."""
        try:
            return self._execute_with_retry('HGET', 'slave', name, key)
        except Exception as e:
            self.logger.warning(f"Slave read failed, fallback to master: {e}")
            return self._execute_with_retry('HGET', 'master', name, key)

    def ping(self) -> Dict[str, Any]:
        """Probe master, slave and every Sentinel; return a health report."""
        status = {
            'master': {'available': False, 'latency_ms': None, 'error': None},
            'slave': {'available': False, 'latency_ms': None, 'error': None},
            'sentinels': []
        }
        # Master probe.
        try:
            start_time = time.time()
            master_client = self._get_master_client()
            master_client.ping()
            latency = (time.time() - start_time) * 1000
            status['master'] = {
                'available': True,
                'latency_ms': round(latency, 2),
                'error': None
            }
        except Exception as e:
            status['master']['error'] = str(e)
        # Slave probe.
        try:
            start_time = time.time()
            slave_client = self._get_slave_client()
            slave_client.ping()
            latency = (time.time() - start_time) * 1000
            status['slave'] = {
                'available': True,
                'latency_ms': round(latency, 2),
                'error': None
            }
        except Exception as e:
            status['slave']['error'] = str(e)
        # Sentinel probes.
        for i, config in enumerate(self.sentinel_configs):
            sentinel_status = {
                'index': i,
                'host': config['host'],
                'port': config['port'],
                'available': False,
                'latency_ms': None,
                'error': None
            }
            try:
                start_time = time.time()
                sentinel_client = redis.Redis(
                    host=config['host'],
                    port=config['port'],
                    decode_responses=True
                )
                sentinel_client.ping()
                latency = (time.time() - start_time) * 1000
                sentinel_status.update({
                    'available': True,
                    'latency_ms': round(latency, 2)
                })
            except Exception as e:
                sentinel_status['error'] = str(e)
            status['sentinels'].append(sentinel_status)
        # Overall: master reachable and a Sentinel majority alive.
        available_sentinels = sum(1 for s in status['sentinels'] if s['available'])
        status['summary'] = {
            'master_available': status['master']['available'],
            'slave_available': status['slave']['available'],
            'available_sentinels': available_sentinels,
            'total_sentinels': len(self.sentinel_configs),
            'quorum_available': available_sentinels >= (len(self.sentinel_configs) // 2 + 1),
            'overall_healthy': (status['master']['available'] and
                                available_sentinels >= (len(self.sentinel_configs) // 2 + 1))
        }
        return status

    def get_current_topology(self) -> Dict[str, Any]:
        """Return the master/slave layout as currently seen by Sentinel."""
        try:
            master_info = self.sentinel.discover_master(self.service_name)
            slaves_info = self.sentinel.discover_slaves(self.service_name)
            return {
                'success': True,
                'service_name': self.service_name,
                'master': {
                    'host': master_info[0],
                    'port': master_info[1]
                },
                'slaves': [
                    {'host': slave[0], 'port': slave[1]}
                    for slave in slaves_info
                ],
                'slave_count': len(slaves_info),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
# Usage demo: replication setup, read/write splitting, and Sentinel.
if __name__ == "__main__":
    # --- master/slave replication demo ---
    primary_conf = {'host': 'localhost', 'port': 6379}
    replica_confs = [
        {'host': 'localhost', 'port': 6380},
        {'host': 'localhost', 'port': 6381}
    ]
    topology_mgr = RedisMasterSlaveManager(primary_conf, replica_confs)
    setup_outcome = topology_mgr.setup_replication()
    print(f"Replication setup: {setup_outcome}")

    # --- read/write splitting demo ---
    split_client = ReadWriteSplitClient(primary_conf, replica_confs)
    split_client.set('test_key', 'test_value')   # write -> master
    fetched = split_client.get('test_key')       # read  -> slave
    print(f"Read value: {fetched}")
    node_health = split_client.check_health()
    print(f"Health status: {node_health}")

    # --- Sentinel demo ---
    watcher_confs = [
        {'host': 'localhost', 'port': 26379},
        {'host': 'localhost', 'port': 26380},
        {'host': 'localhost', 'port': 26381}
    ]
    watcher_mgr = RedisSentinelManager(watcher_confs)
    located_master = watcher_mgr.discover_master()
    print(f"Master discovery: {located_master}")

    ha_client = SentinelAwareClient(watcher_confs)
    ha_client.set('sentinel_key', 'sentinel_value')
    fetched = ha_client.get('sentinel_key')
    print(f"Sentinel client value: {fetched}")
    layout = ha_client.get_current_topology()
    print(f"Current topology: {layout}")
11.4 Redis Cluster
11.4.1 Cluster架构和分片
Redis Cluster提供自动数据分片和高可用性。
from rediscluster import RedisCluster
import hashlib
class RedisClusterManager:
    """Inspect and plan maintenance for a Redis Cluster.

    Wraps a ``RedisCluster`` connection and exposes cluster topology, slot
    distribution, key -> slot/node mapping, a rebalancing *plan* (no actual
    slot migration is performed) and a basic health report.
    """

    def __init__(self, startup_nodes: List[Dict[str, Any]],
                 decode_responses: bool = True):
        """
        Args:
            startup_nodes: seed nodes ({'host': ..., 'port': ...}) used to
                discover the rest of the cluster.
            decode_responses: decode replies to str instead of bytes.
        """
        self.startup_nodes = startup_nodes
        self.decode_responses = decode_responses
        self.logger = logging.getLogger(__name__)
        # skip_full_coverage_check lets us connect even while some slots
        # are unassigned, which is exactly when this tooling is useful.
        self.cluster = RedisCluster(
            startup_nodes=startup_nodes,
            decode_responses=decode_responses,
            skip_full_coverage_check=True
        )

    def get_cluster_info(self) -> Dict[str, Any]:
        """Return cluster state plus a parsed node list (masters/slaves)."""
        try:
            cluster_nodes = self.cluster.cluster_nodes()
            nodes = []
            masters = []
            slaves = []
            for node_id, node_info in cluster_nodes.items():
                node_data = {
                    'node_id': node_id,
                    'host': node_info['host'],
                    'port': node_info['port'],
                    'flags': node_info['flags'],
                    'master_id': node_info.get('master'),
                    'slots': node_info.get('slots', []),
                    'is_master': 'master' in node_info['flags'],
                    'is_slave': 'slave' in node_info['flags']
                }
                nodes.append(node_data)
                if node_data['is_master']:
                    masters.append(node_data)
                elif node_data['is_slave']:
                    slaves.append(node_data)
            cluster_info = self.cluster.cluster_info()
            return {
                'success': True,
                'cluster_state': cluster_info.get('cluster_state'),
                'cluster_slots_assigned': cluster_info.get('cluster_slots_assigned'),
                'cluster_slots_ok': cluster_info.get('cluster_slots_ok'),
                'cluster_slots_pfail': cluster_info.get('cluster_slots_pfail'),
                'cluster_slots_fail': cluster_info.get('cluster_slots_fail'),
                'cluster_known_nodes': cluster_info.get('cluster_known_nodes'),
                'cluster_size': cluster_info.get('cluster_size'),
                'nodes': nodes,
                'masters': masters,
                'slaves': slaves,
                'master_count': len(masters),
                'slave_count': len(slaves),
                'total_nodes': len(nodes),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to get cluster info: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_slot_distribution(self) -> Dict[str, Any]:
        """Report how the 16384 hash slots are spread over the masters."""
        try:
            cluster_info = self.get_cluster_info()
            if not cluster_info['success']:
                return cluster_info
            slot_distribution = {}
            total_slots = 16384  # fixed by the cluster protocol
            assigned_slots = 0
            for master in cluster_info['masters']:
                node_key = f"{master['host']}:{master['port']}"
                slots = master['slots']
                slot_count = 0
                # A slot entry is either a [start, end] range or one slot.
                for slot_range in slots:
                    if isinstance(slot_range, list) and len(slot_range) == 2:
                        slot_count += slot_range[1] - slot_range[0] + 1
                    else:
                        slot_count += 1
                assigned_slots += slot_count
                slot_distribution[node_key] = {
                    'node_id': master['node_id'],
                    'host': master['host'],
                    'port': master['port'],
                    'slot_ranges': slots,
                    'slot_count': slot_count,
                    'slot_percentage': round((slot_count / total_slots) * 100, 2)
                }
            return {
                'success': True,
                'total_slots': total_slots,
                'assigned_slots': assigned_slots,
                'unassigned_slots': total_slots - assigned_slots,
                'slot_distribution': slot_distribution,
                'is_fully_covered': assigned_slots == total_slots,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to get slot distribution: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    @staticmethod
    def _crc16_xmodem(data: bytes) -> int:
        """CRC16/XMODEM (poly 0x1021, init 0x0000) as mandated by the
        Redis Cluster specification; CRC16 of b'123456789' is 0x31C3."""
        crc = 0
        for byte in data:
            crc ^= byte << 8
            for _ in range(8):
                if crc & 0x8000:
                    crc = ((crc << 1) ^ 0x1021) & 0xFFFF
                else:
                    crc = (crc << 1) & 0xFFFF
        return crc

    def calculate_key_slot(self, key: str) -> int:
        """Compute the cluster hash slot (0..16383) for ``key``.

        Fixes two defects in the original implementation:
          * it depended on the third-party ``crc16`` package — the CRC is
            now computed in pure Python; and
          * it ignored hash tags — per the Redis Cluster spec, only the
            substring between the first '{' and the next '}' is hashed
            (when non-empty), so '{user1}.a' and '{user1}.b' map to the
            same slot, as MULTI-key operations require.
        """
        start = key.find('{')
        if start != -1:
            end = key.find('}', start + 1)
            # An empty tag '{}' means the whole key is hashed.
            if end != -1 and end != start + 1:
                key = key[start + 1:end]
        return self._crc16_xmodem(key.encode()) % 16384

    def find_key_node(self, key: str) -> Dict[str, Any]:
        """Locate the master that owns ``key``'s hash slot."""
        try:
            slot = self.calculate_key_slot(key)
            cluster_info = self.get_cluster_info()
            if not cluster_info['success']:
                return cluster_info
            # Scan each master's slot ranges for ownership.
            for master in cluster_info['masters']:
                for slot_range in master['slots']:
                    if isinstance(slot_range, list) and len(slot_range) == 2:
                        if slot_range[0] <= slot <= slot_range[1]:
                            return {
                                'success': True,
                                'key': key,
                                'slot': slot,
                                'node': {
                                    'node_id': master['node_id'],
                                    'host': master['host'],
                                    'port': master['port']
                                }
                            }
                    elif slot_range == slot:
                        return {
                            'success': True,
                            'key': key,
                            'slot': slot,
                            'node': {
                                'node_id': master['node_id'],
                                'host': master['host'],
                                'port': master['port']
                            }
                        }
            return {
                'success': False,
                'error': f"No node found for slot {slot}",
                'key': key,
                'slot': slot
            }
        except Exception as e:
            self.logger.error(f"Failed to find key node: {e}")
            return {
                'success': False,
                'error': str(e),
                'key': key
            }

    def rebalance_slots(self, target_distribution: Dict[str, int] = None) -> Dict[str, Any]:
        """Produce a slot rebalancing *plan* (no migration is executed).

        Args:
            target_distribution: optional "host:port" -> slot-count map;
                defaults to an even split across the current masters.
        """
        try:
            cluster_info = self.get_cluster_info()
            if not cluster_info['success']:
                return cluster_info
            masters = cluster_info['masters']
            total_slots = 16384
            if target_distribution is None:
                # Even split; the last master absorbs the remainder.
                slots_per_master = total_slots // len(masters)
                target_distribution = {}
                for i, master in enumerate(masters):
                    node_key = f"{master['host']}:{master['port']}"
                    if i == len(masters) - 1:
                        target_distribution[node_key] = total_slots - (slots_per_master * i)
                    else:
                        target_distribution[node_key] = slots_per_master
            # Diff target vs current to derive the required moves.
            current_distribution = self.get_slot_distribution()
            if not current_distribution['success']:
                return current_distribution
            moves = []
            for node_key, target_slots in target_distribution.items():
                current_slots = current_distribution['slot_distribution'].get(node_key, {}).get('slot_count', 0)
                difference = target_slots - current_slots
                if difference != 0:
                    moves.append({
                        'node': node_key,
                        'current_slots': current_slots,
                        'target_slots': target_slots,
                        'difference': difference,
                        'action': 'receive' if difference > 0 else 'give'
                    })
            return {
                'success': True,
                'current_distribution': current_distribution['slot_distribution'],
                'target_distribution': target_distribution,
                'required_moves': moves,
                'rebalance_needed': len(moves) > 0,
                'note': 'This is a plan. Actual slot migration requires cluster management tools.',
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to rebalance slots: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def monitor_cluster_health(self) -> Dict[str, Any]:
        """Summarize node flags, slot coverage and replication into a
        health report ('healthy' / 'warning' / 'degraded' / 'at_risk')."""
        try:
            cluster_info = self.get_cluster_info()
            if not cluster_info['success']:
                return cluster_info
            health_status = {
                'overall_health': 'healthy',
                'issues': [],
                'node_status': [],
                'slot_coverage': None,
                'replication_status': None
            }
            # Per-node flags: 'fail' is fatal, 'pfail' is a suspicion.
            for node in cluster_info['nodes']:
                node_status = {
                    'node_id': node['node_id'],
                    'host': node['host'],
                    'port': node['port'],
                    'is_master': node['is_master'],
                    'is_slave': node['is_slave'],
                    'healthy': True,
                    'issues': []
                }
                if 'fail' in node['flags']:
                    node_status['healthy'] = False
                    node_status['issues'].append('Node marked as failed')
                    health_status['issues'].append(f"Node {node['host']}:{node['port']} is failed")
                if 'pfail' in node['flags']:
                    node_status['issues'].append('Node possibly failing')
                    health_status['issues'].append(f"Node {node['host']}:{node['port']} is possibly failing")
                health_status['node_status'].append(node_status)
            # Slot coverage: missing slots degrade the cluster.
            slot_distribution = self.get_slot_distribution()
            if slot_distribution['success']:
                health_status['slot_coverage'] = {
                    'fully_covered': slot_distribution['is_fully_covered'],
                    'assigned_slots': slot_distribution['assigned_slots'],
                    'unassigned_slots': slot_distribution['unassigned_slots']
                }
                if not slot_distribution['is_fully_covered']:
                    health_status['overall_health'] = 'degraded'
                    health_status['issues'].append(
                        f"{slot_distribution['unassigned_slots']} slots are not assigned"
                    )
            # Replication coverage: masters without replicas are a risk.
            master_count = cluster_info['master_count']
            slave_count = cluster_info['slave_count']
            health_status['replication_status'] = {
                'master_count': master_count,
                'slave_count': slave_count,
                'replication_factor': slave_count / master_count if master_count > 0 else 0
            }
            if slave_count == 0:
                health_status['overall_health'] = 'at_risk'
                health_status['issues'].append('No slave nodes for replication')
            elif slave_count < master_count:
                health_status['issues'].append('Some masters have no slaves')
            # Any issue downgrades 'healthy' to at least 'warning'.
            if health_status['issues']:
                if health_status['overall_health'] == 'healthy':
                    health_status['overall_health'] = 'warning'
            return {
                'success': True,
                'health_status': health_status,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to monitor cluster health: {e}")
            return {
                'success': False,
                'error': str(e)
            }
### 11.4.2 Cluster客户端
```python
class ClusterAwareClient:
    """Cluster-aware Redis client with retries and cross-slot batching.

    Wraps a ``RedisCluster`` connection and adds:
      * retry with exponential backoff for transient failures,
      * MOVED/ASK redirection counting,
      * slot-aware splitting of multi-key operations (Redis Cluster rejects
        multi-key commands whose keys hash to different slots),
      * simple operation statistics (see :meth:`get_stats`).
    """

    def __init__(self, startup_nodes: List[Dict[str, Any]],
                 retry_attempts: int = 3,
                 retry_delay: float = 1.0):
        """Create the cluster connection.

        Args:
            startup_nodes: Seed nodes used for cluster topology discovery.
            retry_attempts: Maximum attempts per operation.
            retry_delay: Base backoff delay in seconds (doubled per attempt).
        """
        self.startup_nodes = startup_nodes
        self.retry_attempts = retry_attempts
        self.retry_delay = retry_delay
        self.logger = logging.getLogger(__name__)
        # Cluster connection shared by all operations.
        self.cluster = RedisCluster(
            startup_nodes=startup_nodes,
            decode_responses=True,
            skip_full_coverage_check=True,
            max_connections=32,
            retry_on_timeout=True
        )
        # Raw counters; get_stats() derives percentage rates from these.
        self.stats = {
            'total_operations': 0,
            'successful_operations': 0,
            'failed_operations': 0,
            'redirections': 0,
            'cross_slot_operations': 0
        }

    def _execute_with_retry(self, operation: callable, *args, **kwargs) -> Any:
        """Execute *operation*, retrying with exponential backoff.

        Re-raises the last exception once all attempts are exhausted.  Note
        that each failed attempt increments ``failed_operations``, so a
        retried but eventually successful call contributes to both the
        success and failure counters.
        """
        last_exception = None
        for attempt in range(self.retry_attempts):
            try:
                self.stats['total_operations'] += 1
                result = operation(*args, **kwargs)
                self.stats['successful_operations'] += 1
                return result
            except Exception as e:
                last_exception = e
                self.stats['failed_operations'] += 1
                # MOVED/ASK errors signal slot redirection inside the cluster.
                if 'MOVED' in str(e) or 'ASK' in str(e):
                    self.stats['redirections'] += 1
                self.logger.warning(
                    f"Attempt {attempt + 1} failed: {e}"
                )
                if attempt < self.retry_attempts - 1:
                    # Exponential backoff: delay, 2*delay, 4*delay, ...
                    time.sleep(self.retry_delay * (2 ** attempt))
        raise last_exception

    def set(self, key: str, value: Any, **kwargs) -> bool:
        """Set a key with retry handling."""
        return self._execute_with_retry(self.cluster.set, key, value, **kwargs)

    def get(self, key: str) -> Any:
        """Get a key with retry handling."""
        return self._execute_with_retry(self.cluster.get, key)

    def mset(self, mapping: Dict[str, Any]) -> bool:
        """Set many keys, splitting the call by hash slot when necessary."""
        try:
            # Group keys by hash slot; one MSET per slot keeps every
            # sub-command single-slot (a cluster requirement).
            slot_groups: Dict[int, Dict[str, Any]] = {}
            for key, value in mapping.items():
                slot_groups.setdefault(self._calculate_slot(key), {})[key] = value
            if len(slot_groups) > 1:
                self.stats['cross_slot_operations'] += 1
                for group_mapping in slot_groups.values():
                    self._execute_with_retry(self.cluster.mset, group_mapping)
                return True
            # Single-slot (or empty) mapping: one direct MSET.
            return self._execute_with_retry(self.cluster.mset, mapping)
        except Exception as e:
            self.logger.error(f"Failed to execute mset: {e}")
            raise

    def mget(self, keys: List[str]) -> List[Any]:
        """Get many keys, splitting by hash slot and preserving input order."""
        try:
            slot_groups: Dict[int, List[str]] = {}
            for key in keys:
                slot_groups.setdefault(self._calculate_slot(key), []).append(key)
            if len(slot_groups) > 1:
                self.stats['cross_slot_operations'] += 1
                # Fetch each slot group separately, then re-assemble the
                # results in the caller's original key order.
                all_results: Dict[str, Any] = {}
                for group_keys in slot_groups.values():
                    group_results = self._execute_with_retry(self.cluster.mget, group_keys)
                    all_results.update(zip(group_keys, group_results))
                return [all_results[key] for key in keys]
            return self._execute_with_retry(self.cluster.mget, keys)
        except Exception as e:
            self.logger.error(f"Failed to execute mget: {e}")
            raise

    @staticmethod
    def _crc16_xmodem(data: bytes) -> int:
        """CRC16-CCITT (XMODEM, poly 0x1021) as used by Redis Cluster.

        Implemented with the standard library only; the original code
        depended on the third-party ``crc16`` package.
        """
        crc = 0
        for byte in data:
            crc ^= byte << 8
            for _ in range(8):
                crc = ((crc << 1) ^ 0x1021) if crc & 0x8000 else (crc << 1)
                crc &= 0xFFFF
        return crc

    def _calculate_slot(self, key: str) -> int:
        """Map *key* to one of the 16384 cluster hash slots.

        Implements the Redis Cluster key distribution model including hash
        tags: when the key contains a non-empty ``{...}`` section, only the
        text between the first ``{`` and the next ``}`` is hashed, so that
        related keys can be forced into the same slot.
        """
        hashable = key
        start = key.find('{')
        if start != -1:
            end = key.find('}', start + 1)
            # An empty tag ("{}") falls back to hashing the whole key.
            if end != -1 and end > start + 1:
                hashable = key[start + 1:end]
        return self._crc16_xmodem(hashable.encode()) % 16384

    def pipeline_execute(self, commands: List[Dict[str, Any]]) -> List[Any]:
        """Run a list of commands through slot-local pipelines.

        Each command is a dict: ``{'key': ..., 'operation': ...,
        'args': [...], 'kwargs': {...}}``.  Commands are grouped by hash
        slot, executed in one pipeline per slot, and the results are
        returned in input order.  Commands without a ``key`` are skipped
        (their result slot stays ``None``), matching the original behaviour.
        """
        try:
            slot_groups: Dict[int, List[Tuple[int, Dict[str, Any]]]] = {}
            for i, cmd in enumerate(commands):
                key = cmd.get('key')
                if key:
                    slot_groups.setdefault(self._calculate_slot(key), []).append((i, cmd))
            if len(slot_groups) > 1:
                self.stats['cross_slot_operations'] += 1
            all_results: List[Any] = [None] * len(commands)
            for group_commands in slot_groups.values():
                # One pipeline per slot group.
                pipe = self.cluster.pipeline()
                for original_index, cmd in group_commands:
                    getattr(pipe, cmd['operation'])(*cmd.get('args', []),
                                                   **cmd.get('kwargs', {}))
                group_results = pipe.execute()
                # Put each result back at its original position.
                for (original_index, _), result in zip(group_commands, group_results):
                    all_results[original_index] = result
            return all_results
        except Exception as e:
            self.logger.error(f"Failed to execute pipeline: {e}")
            raise

    def get_stats(self) -> Dict[str, Any]:
        """Return raw operation counters plus derived percentage rates."""
        total_ops = self.stats['total_operations']
        if total_ops == 0:
            success_rate = redirection_rate = cross_slot_rate = 0
        else:
            success_rate = (self.stats['successful_operations'] / total_ops) * 100
            redirection_rate = (self.stats['redirections'] / total_ops) * 100
            cross_slot_rate = (self.stats['cross_slot_operations'] / total_ops) * 100
        return {
            'total_operations': total_ops,
            'successful_operations': self.stats['successful_operations'],
            'failed_operations': self.stats['failed_operations'],
            'redirections': self.stats['redirections'],
            'cross_slot_operations': self.stats['cross_slot_operations'],
            'success_rate': round(success_rate, 2),
            'redirection_rate': round(redirection_rate, 2),
            'cross_slot_rate': round(cross_slot_rate, 2)
        }

    def reset_stats(self) -> None:
        """Zero all operation counters."""
        self.stats = {
            'total_operations': 0,
            'successful_operations': 0,
            'failed_operations': 0,
            'redirections': 0,
            'cross_slot_operations': 0
        }
```

## 11.5 高可用性架构模式

### 11.5.1 多数据中心部署

```python
class MultiDataCenterRedis:
    """Manage Redis connections spanning several datacenters.

    Supports single-instance, Sentinel-managed and Cluster deployments per
    datacenter, replicated writes with a configurable consistency level,
    reads with automatic fallback, health checks and manual failover.
    """

    def __init__(self, datacenters: Dict[str, Dict[str, Any]]):
        self.datacenters = datacenters
        self.logger = logging.getLogger(__name__)
        self.clients = {}
        self.primary_dc = None
        # Eagerly connect to every configured datacenter.
        self._initialize_connections()

    @staticmethod
    def _build_client(dc_config: Dict[str, Any]):
        """Instantiate the appropriate client for one datacenter config."""
        dc_type = dc_config['type']
        if dc_type == 'cluster':
            return RedisCluster(
                startup_nodes=dc_config['nodes'],
                decode_responses=True
            )
        if dc_type == 'sentinel':
            from redis.sentinel import Sentinel
            sentinel = Sentinel(dc_config['sentinels'])
            return sentinel.master_for(dc_config['service_name'])
        # Anything else is treated as a single standalone instance.
        return redis.Redis(**dc_config['connection'])

    def _initialize_connections(self) -> None:
        """Build a client per datacenter and record its health state."""
        for dc_name, dc_config in self.datacenters.items():
            try:
                client = self._build_client(dc_config)
                self.clients[dc_name] = {
                    'client': client,
                    'config': dc_config,
                    'healthy': True,
                    'latency': 0
                }
                # The (single) DC flagged primary becomes the write leader.
                if dc_config.get('primary', False):
                    self.primary_dc = dc_name
            except Exception as e:
                self.logger.error(f"Failed to connect to datacenter {dc_name}: {e}")
                self.clients[dc_name] = {
                    'client': None,
                    'config': dc_config,
                    'healthy': False,
                    'error': str(e)
                }

    def write_with_replication(self, key: str, value: Any,
                               consistency_level: str = 'eventual') -> Dict[str, Any]:
        """Write *key* to every reachable datacenter.

        consistency_level:
            'eventual' - any single successful write counts,
            'majority' - more than half of the datacenters must succeed,
            'strong'   - the primary and every replica must succeed.
        """
        results = {}
        primary_success = False
        try:
            primary = self.primary_dc
            # Primary goes first so 'strong' can bail out early on failure.
            if primary and self.clients[primary]['healthy']:
                try:
                    self.clients[primary]['client'].set(key, value)
                    primary_success = True
                    results[primary] = {'success': True, 'role': 'primary'}
                except Exception as e:
                    results[primary] = {
                        'success': False,
                        'role': 'primary',
                        'error': str(e)
                    }
            if consistency_level == 'strong' and not primary_success:
                return {
                    'success': False,
                    'error': 'Primary datacenter write failed',
                    'results': results
                }
            # Best-effort replication to every other healthy datacenter.
            for dc_name, dc_info in self.clients.items():
                if dc_name == primary or not dc_info['healthy']:
                    continue
                try:
                    dc_info['client'].set(key, value)
                    results[dc_name] = {'success': True, 'role': 'replica'}
                except Exception as e:
                    results[dc_name] = {
                        'success': False,
                        'role': 'replica',
                        'error': str(e)
                    }
            successful_writes = sum(1 for r in results.values() if r['success'])
            total_dcs = len(self.clients)
            if consistency_level == 'eventual':
                success = primary_success or successful_writes > 0
            elif consistency_level == 'majority':
                success = successful_writes > (total_dcs // 2)
            elif consistency_level == 'strong':
                success = primary_success and successful_writes == total_dcs
            else:
                # Unknown level: report failure, matching prior behaviour.
                success = False
            return {
                'success': success,
                'consistency_level': consistency_level,
                'primary_success': primary_success,
                'successful_writes': successful_writes,
                'total_datacenters': total_dcs,
                'results': results,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to write with replication: {e}")
            return {
                'success': False,
                'error': str(e),
                'results': results
            }

    def read_with_fallback(self, key: str,
                           prefer_local: bool = True) -> Dict[str, Any]:
        """Read *key*, trying datacenters in order until one responds."""
        if prefer_local and self.primary_dc:
            # Primary first, then every other datacenter.
            read_order = [self.primary_dc]
            read_order += [dc for dc in self.clients.keys()
                           if dc != self.primary_dc]
        else:
            read_order = list(self.clients.keys())
        for dc_name in read_order:
            dc_info = self.clients[dc_name]
            if not dc_info['healthy']:
                continue
            try:
                start_time = time.time()
                value = dc_info['client'].get(key)
                latency = (time.time() - start_time) * 1000
                return {
                    'success': True,
                    'value': value,
                    'datacenter': dc_name,
                    'latency_ms': round(latency, 2),
                    'fallback_used': dc_name != read_order[0]
                }
            except Exception as e:
                self.logger.warning(f"Read failed from {dc_name}: {e}")
        return {
            'success': False,
            'error': 'All datacenters failed',
            'attempted_datacenters': read_order
        }

    def check_datacenter_health(self) -> Dict[str, Any]:
        """Ping every datacenter and summarise cluster-wide health."""
        health_results = {}
        for dc_name, dc_info in self.clients.items():
            role = 'primary' if dc_name == self.primary_dc else 'replica'
            if dc_info['client'] is None:
                # Never connected; keep the original connect error.
                health_results[dc_name] = {
                    'healthy': False,
                    'error': dc_info.get('error', 'No client available')
                }
                continue
            try:
                start_time = time.time()
                dc_info['client'].ping()
                latency = (time.time() - start_time) * 1000
                # Refresh the cached state alongside the report.
                dc_info['healthy'] = True
                dc_info['latency'] = latency
                health_results[dc_name] = {
                    'healthy': True,
                    'latency_ms': round(latency, 2),
                    'role': role
                }
            except Exception as e:
                dc_info['healthy'] = False
                health_results[dc_name] = {
                    'healthy': False,
                    'error': str(e),
                    'role': role
                }
        healthy_dcs = sum(1 for h in health_results.values() if h['healthy'])
        primary_healthy = (self.primary_dc and
                           health_results.get(self.primary_dc, {}).get('healthy', False))
        return {
            'datacenters': health_results,
            'summary': {
                'total_datacenters': len(self.clients),
                'healthy_datacenters': healthy_dcs,
                'primary_datacenter': self.primary_dc,
                'primary_healthy': primary_healthy,
                'overall_healthy': healthy_dcs > 0,
                'high_availability': healthy_dcs > 1
            },
            'timestamp': datetime.now().isoformat()
        }

    def failover_to_datacenter(self, target_dc: str) -> Dict[str, Any]:
        """Promote *target_dc* to primary; it must exist and be healthy."""
        if target_dc not in self.clients:
            return {
                'success': False,
                'error': f"Datacenter {target_dc} not found"
            }
        if not self.clients[target_dc]['healthy']:
            return {
                'success': False,
                'error': f"Target datacenter {target_dc} is not healthy"
            }
        old_primary, self.primary_dc = self.primary_dc, target_dc
        # Keep the per-DC configs in sync with the new topology.
        if old_primary:
            self.clients[old_primary]['config']['primary'] = False
        self.clients[target_dc]['config']['primary'] = True
        return {
            'success': True,
            'old_primary': old_primary,
            'new_primary': target_dc,
            'timestamp': datetime.now().isoformat()
        }
```

### 11.5.2 负载均衡和代理

```python
class RedisLoadBalancer:
    """Client-side load balancer over a pool of Redis nodes.

    Supported strategies: ``round_robin``, ``weighted_round_robin``,
    ``least_connections``, ``least_response_time``; any other value falls
    back to the first healthy node.
    """

    def __init__(self, redis_nodes: List[Dict[str, Any]],
                 strategy: str = 'round_robin'):
        self.redis_nodes = redis_nodes
        self.strategy = strategy
        self.logger = logging.getLogger(__name__)
        # Per-node connection state and bookkeeping counters.
        self.connections = []
        self.node_weights = []
        self.current_index = 0
        self._initialize_connections()
        # Nodes are re-pinged at most once per interval (seconds).
        self.health_check_interval = 30
        self.last_health_check = 0

    def _initialize_connections(self) -> None:
        """Open a client per configured node; failures become unhealthy entries."""
        for i, node_config in enumerate(self.redis_nodes):
            try:
                if node_config.get('type') == 'cluster':
                    client = RedisCluster(
                        startup_nodes=node_config['startup_nodes'],
                        decode_responses=True
                    )
                else:
                    client = redis.Redis(**node_config['connection'], decode_responses=True)
            except Exception as e:
                self.logger.error(f"Failed to connect to node {i}: {e}")
                self.connections.append({
                    'client': None,
                    'config': node_config,
                    'healthy': False,
                    'error': str(e)
                })
                # Weight 0 so weighted selection never picks a dead node.
                self.node_weights.append(0)
                continue
            self.connections.append({
                'client': client,
                'config': node_config,
                'healthy': True,
                'weight': node_config.get('weight', 1),
                'current_connections': 0,
                'total_requests': 0,
                'failed_requests': 0
            })
            self.node_weights.append(node_config.get('weight', 1))

    def _select_node(self) -> Dict[str, Any]:
        """Pick the next node according to the configured strategy."""
        candidates = [conn for conn in self.connections if conn['healthy']]
        if not candidates:
            raise Exception("No healthy nodes available")
        if self.strategy == 'round_robin':
            chosen = candidates[self.current_index % len(candidates)]
            self.current_index += 1
            return chosen
        if self.strategy == 'weighted_round_robin':
            total_weight = sum(conn['weight'] for conn in candidates)
            if total_weight == 0:
                return candidates[0]
            import random
            # Sample a point on [0, total_weight) and walk the cumulative
            # weights until we pass it.
            target = random.uniform(0, total_weight)
            running = 0
            for conn in candidates:
                running += conn['weight']
                if target <= running:
                    return conn
            return candidates[-1]
        if self.strategy == 'least_connections':
            return min(candidates, key=lambda c: c['current_connections'])
        if self.strategy == 'least_response_time':
            # Approximation: the node with the lowest failure ratio wins.
            return min(candidates,
                       key=lambda c: c['failed_requests'] / max(c['total_requests'], 1))
        return candidates[0]

    def execute_command(self, command: str, *args, **kwargs) -> Any:
        """Run a command on a selected node, falling back once on failure."""
        self._periodic_health_check()
        node = self._select_node()
        node['current_connections'] += 1
        node['total_requests'] += 1
        try:
            return node['client'].execute_command(command, *args, **kwargs)
        except Exception as e:
            node['failed_requests'] += 1
            self.logger.error(f"Command failed on node: {e}")
            # One-shot fallback to the first other healthy node.
            others = [conn for conn in self.connections
                      if conn['healthy'] and conn != node]
            if others:
                fallback = others[0]
                fallback['current_connections'] += 1
                fallback['total_requests'] += 1
                try:
                    return fallback['client'].execute_command(command, *args, **kwargs)
                except Exception as fallback_error:
                    fallback['failed_requests'] += 1
                    raise fallback_error
                finally:
                    fallback['current_connections'] -= 1
            raise e
        finally:
            node['current_connections'] -= 1

    def _periodic_health_check(self) -> None:
        """Ping all nodes, but no more often than ``health_check_interval``."""
        now = time.time()
        if now - self.last_health_check < self.health_check_interval:
            return
        self.last_health_check = now
        for conn in self.connections:
            if conn['client'] is None:
                continue
            try:
                conn['client'].ping()
                conn['healthy'] = True
            except Exception as e:
                conn['healthy'] = False
                self.logger.warning(f"Node health check failed: {e}")

    def get_load_balancer_stats(self) -> Dict[str, Any]:
        """Summarise per-node request counters and health."""
        node_entries = []
        for i, conn in enumerate(self.connections):
            total = conn.get('total_requests', 0)
            failed = conn.get('failed_requests', 0)
            entry = {
                'index': i,
                'healthy': conn['healthy'],
                'weight': conn.get('weight', 1),
                'current_connections': conn.get('current_connections', 0),
                'total_requests': total,
                'failed_requests': failed,
                'success_rate': 0
            }
            if total > 0:
                entry['success_rate'] = round(((total - failed) / total) * 100, 2)
            if 'error' in conn:
                entry['error'] = conn['error']
            node_entries.append(entry)
        return {
            'strategy': self.strategy,
            'total_nodes': len(self.connections),
            'healthy_nodes': sum(1 for conn in self.connections if conn['healthy']),
            'nodes': node_entries
        }
# Usage example: exercises the cluster manager, the cluster-aware client,
# the multi-datacenter wrapper and the client-side load balancer.
if __name__ == "__main__":
    # Seed nodes for Redis Cluster topology discovery.
    seed_nodes = [
        {'host': 'localhost', 'port': 7000},
        {'host': 'localhost', 'port': 7001},
        {'host': 'localhost', 'port': 7002}
    ]
    manager = RedisClusterManager(seed_nodes)
    cluster_info = manager.get_cluster_info()
    print(f"Cluster info: {cluster_info}")
    slot_distribution = manager.get_slot_distribution()
    print(f"Slot distribution: {slot_distribution}")

    # Cluster-aware client: single-key and cross-slot batch operations.
    client = ClusterAwareClient(seed_nodes)
    client.set('cluster_key', 'cluster_value')
    value = client.get('cluster_key')
    print(f"Cluster value: {value}")
    batch = {'key1': 'value1', 'key2': 'value2', 'key3': 'value3'}
    client.mset(batch)
    values = client.mget(['key1', 'key2', 'key3'])
    print(f"Batch values: {values}")
    stats = client.get_stats()
    print(f"Client stats: {stats}")

    # Two single-instance datacenters; dc1 acts as the primary.
    datacenters = {
        'dc1': {
            'type': 'single',
            'connection': {'host': 'localhost', 'port': 6379},
            'primary': True
        },
        'dc2': {
            'type': 'single',
            'connection': {'host': 'localhost', 'port': 6380},
            'primary': False
        }
    }
    mdc_redis = MultiDataCenterRedis(datacenters)
    write_result = mdc_redis.write_with_replication(
        'mdc_key', 'mdc_value', consistency_level='majority'
    )
    print(f"Multi-DC write: {write_result}")
    read_result = mdc_redis.read_with_fallback('mdc_key')
    print(f"Multi-DC read: {read_result}")

    # Three weighted standalone nodes behind a client-side balancer.
    redis_nodes = [
        {
            'connection': {'host': 'localhost', 'port': 6379},
            'weight': 3
        },
        {
            'connection': {'host': 'localhost', 'port': 6380},
            'weight': 2
        },
        {
            'connection': {'host': 'localhost', 'port': 6381},
            'weight': 1
        }
    ]
    balancer = RedisLoadBalancer(redis_nodes, strategy='weighted_round_robin')
    for i in range(10):
        balancer.execute_command('SET', f'lb_key_{i}', f'lb_value_{i}')
    lb_stats = balancer.get_load_balancer_stats()
    print(f"Load balancer stats: {lb_stats}")
```

## 11.6 性能监控和调优

### 11.6.1 集群性能监控

```python
class RedisClusterMonitor:
    def __init__(self, cluster_nodes: List[Dict[str, Any]]):
        self.cluster_nodes = cluster_nodes
        self.logger = logging.getLogger(__name__)
# 创建到各节点的连接
self.node_clients = {}
self._initialize_node_connections()
# 监控指标
self.metrics_history = []
self.alert_thresholds = {
'memory_usage_percent': 80,
'cpu_usage_percent': 80,
'connection_count': 1000,
'ops_per_second': 10000,
'replication_lag_seconds': 10
}
def _initialize_node_connections(self) -> None:
    """Open and ping a standalone connection to every cluster node.

    Unreachable nodes are logged and simply left out of
    ``self.node_clients``; the monitor keeps working with the rest.
    """
    for node in self.cluster_nodes:
        node_key = f"{node['host']}:{node['port']}"
        try:
            client = redis.Redis(
                host=node['host'],
                port=node['port'],
                decode_responses=True
            )
            # Fail fast if the node is unreachable.
            client.ping()
        except Exception as e:
            self.logger.error(f"Failed to connect to node {node_key}: {e}")
        else:
            self.node_clients[node_key] = client
def collect_cluster_metrics(self) -> Dict[str, Any]:
    """Collect a point-in-time metrics snapshot from every reachable node.

    Returns a dict with per-node metrics, a cluster-wide summary and any
    threshold alerts; the snapshot is also appended to
    ``self.metrics_history`` (capped at the 100 most recent entries).
    """
    timestamp = datetime.now()
    cluster_metrics = {
        'timestamp': timestamp.isoformat(),
        'nodes': {},
        'cluster_summary': {
            'total_nodes': len(self.cluster_nodes),
            'healthy_nodes': 0,
            'total_memory_used': 0,
            'total_memory_max': 0,
            'total_connections': 0,
            'total_ops_per_second': 0,
            'average_cpu_usage': 0
        },
        'alerts': []
    }
    cpu_usage_sum = 0
    healthy_nodes = 0
    for node_key, client in self.node_clients.items():
        try:
            # Full node snapshot via the INFO command.
            info = client.info()
            # Memory figures; the percentage is only meaningful when the
            # node has an explicit maxmemory configured.
            memory_used = info.get('used_memory', 0)
            memory_max = info.get('maxmemory', 0)
            memory_usage_percent = 0
            if memory_max > 0:
                memory_usage_percent = (memory_used / memory_max) * 100
            # Client connection count.
            connected_clients = info.get('connected_clients', 0)
            # Operation throughput.
            ops_per_second = info.get('instantaneous_ops_per_sec', 0)
            # NOTE(review): this is cumulative sys+user CPU seconds, not a
            # true percentage — simplified on purpose per the original code.
            cpu_usage = info.get('used_cpu_sys', 0) + info.get('used_cpu_user', 0)
            # Replication role and lag (lag applies to slaves only).
            replication_info = client.info('replication')
            role = replication_info.get('role', 'unknown')
            replication_lag = 0
            if role == 'slave':
                replication_lag = replication_info.get('master_last_io_seconds_ago', 0)
            node_metrics = {
                'node_key': node_key,
                'role': role,
                'healthy': True,
                'memory': {
                    'used_bytes': memory_used,
                    'max_bytes': memory_max,
                    'usage_percent': round(memory_usage_percent, 2)
                },
                'connections': {
                    'connected_clients': connected_clients
                },
                'performance': {
                    'ops_per_second': ops_per_second,
                    'cpu_usage_percent': round(cpu_usage, 2)
                },
                'replication': {
                    'role': role,
                    'lag_seconds': replication_lag
                }
            }
            # Evaluate the node against the alert thresholds.
            alerts = self._check_node_alerts(node_key, node_metrics)
            cluster_metrics['alerts'].extend(alerts)
            cluster_metrics['nodes'][node_key] = node_metrics
            # Accumulate cluster-wide totals.
            healthy_nodes += 1
            cluster_metrics['cluster_summary']['total_memory_used'] += memory_used
            cluster_metrics['cluster_summary']['total_memory_max'] += memory_max
            cluster_metrics['cluster_summary']['total_connections'] += connected_clients
            cluster_metrics['cluster_summary']['total_ops_per_second'] += ops_per_second
            cpu_usage_sum += cpu_usage
        except Exception as e:
            self.logger.error(f"Failed to collect metrics from {node_key}: {e}")
            cluster_metrics['nodes'][node_key] = {
                'node_key': node_key,
                'healthy': False,
                'error': str(e)
            }
    # Derive the cluster summary from the per-node totals.
    cluster_metrics['cluster_summary']['healthy_nodes'] = healthy_nodes
    if healthy_nodes > 0:
        cluster_metrics['cluster_summary']['average_cpu_usage'] = round(
            cpu_usage_sum / healthy_nodes, 2
        )
    # Cluster-wide memory usage percentage (only when maxmemory is known).
    total_memory_max = cluster_metrics['cluster_summary']['total_memory_max']
    if total_memory_max > 0:
        cluster_usage_percent = (
            cluster_metrics['cluster_summary']['total_memory_used'] / total_memory_max
        ) * 100
        cluster_metrics['cluster_summary']['memory_usage_percent'] = round(
            cluster_usage_percent, 2
        )
    # Keep a bounded history for trend analysis.
    self.metrics_history.append(cluster_metrics)
    # Retain only the 100 most recent snapshots.
    if len(self.metrics_history) > 100:
        self.metrics_history = self.metrics_history[-100:]
    return cluster_metrics
def _check_node_alerts(self, node_key: str, node_metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Compare one node's metrics against the alert thresholds.

    Returns a list of alert dicts; empty when everything is within limits.
    """
    alerts = []

    def add_alert(alert_type: str, severity: str, message: str,
                  value, threshold) -> None:
        # Every alert shares the same shape; centralise its construction.
        alerts.append({
            'type': alert_type,
            'node': node_key,
            'severity': severity,
            'message': message,
            'value': value,
            'threshold': threshold
        })

    # Memory pressure (escalates to critical at >= 90%).
    memory_usage = node_metrics['memory']['usage_percent']
    memory_limit = self.alert_thresholds['memory_usage_percent']
    if memory_usage > memory_limit:
        add_alert('memory_usage_high',
                  'warning' if memory_usage < 90 else 'critical',
                  f"Memory usage {memory_usage}% exceeds threshold",
                  memory_usage, memory_limit)

    # CPU load (escalates to critical at >= 90%).
    cpu_usage = node_metrics['performance']['cpu_usage_percent']
    cpu_limit = self.alert_thresholds['cpu_usage_percent']
    if cpu_usage > cpu_limit:
        add_alert('cpu_usage_high',
                  'warning' if cpu_usage < 90 else 'critical',
                  f"CPU usage {cpu_usage}% exceeds threshold",
                  cpu_usage, cpu_limit)

    # Client connection count (warning only).
    connections = node_metrics['connections']['connected_clients']
    connection_limit = self.alert_thresholds['connection_count']
    if connections > connection_limit:
        add_alert('connection_count_high', 'warning',
                  f"Connection count {connections} exceeds threshold",
                  connections, connection_limit)

    # Replication lag, slaves only (escalates to critical at >= 30s).
    if node_metrics['replication']['role'] == 'slave':
        lag = node_metrics['replication']['lag_seconds']
        lag_limit = self.alert_thresholds['replication_lag_seconds']
        if lag > lag_limit:
            add_alert('replication_lag_high',
                      'warning' if lag < 30 else 'critical',
                      f"Replication lag {lag}s exceeds threshold",
                      lag, lag_limit)
    return alerts
def get_performance_trends(self, hours: int = 24) -> Dict[str, Any]:
    """Build time-series trends from the in-memory metrics history.

    Args:
        hours: Size of the look-back window ending now.

    Returns:
        ``{'success': True, 'trends': {...}, 'data_points': N}`` or a
        ``{'success': False, 'error': ...}`` dict when no usable data
        exists in the window.
    """
    # Bug fix: the module top only does ``from datetime import datetime``,
    # so ``timedelta`` was an unresolved name here (NameError at runtime).
    from datetime import timedelta

    if not self.metrics_history:
        return {'success': False, 'error': 'No metrics history available'}
    # Window: [now - hours, now].
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=hours)
    # Keep only the snapshots that fall inside the window.
    filtered_metrics = [
        metrics for metrics in self.metrics_history
        if start_time <= datetime.fromisoformat(metrics['timestamp']) <= end_time
    ]
    if not filtered_metrics:
        return {'success': False, 'error': 'No metrics in specified time range'}
    trends = {
        'time_range': {
            'start': start_time.isoformat(),
            'end': end_time.isoformat(),
            'hours': hours
        },
        'cluster_trends': {
            'memory_usage': [],
            'total_connections': [],
            'total_ops_per_second': [],
            'healthy_nodes': []
        },
        'node_trends': {}
    }
    for metrics in filtered_metrics:
        timestamp = metrics['timestamp']
        summary = metrics['cluster_summary']
        # Cluster-wide series (missing summary keys default to 0).
        trends['cluster_trends']['memory_usage'].append({
            'timestamp': timestamp,
            'value': summary.get('memory_usage_percent', 0)
        })
        trends['cluster_trends']['total_connections'].append({
            'timestamp': timestamp,
            'value': summary.get('total_connections', 0)
        })
        trends['cluster_trends']['total_ops_per_second'].append({
            'timestamp': timestamp,
            'value': summary.get('total_ops_per_second', 0)
        })
        trends['cluster_trends']['healthy_nodes'].append({
            'timestamp': timestamp,
            'value': summary.get('healthy_nodes', 0)
        })
        # Per-node series; unhealthy snapshots contribute nothing but the
        # node still gets (possibly empty) series, as before.
        for node_key, node_metrics in metrics['nodes'].items():
            node_series = trends['node_trends'].setdefault(node_key, {
                'memory_usage': [],
                'connections': [],
                'ops_per_second': []
            })
            if node_metrics.get('healthy', False):
                node_series['memory_usage'].append({
                    'timestamp': timestamp,
                    'value': node_metrics['memory']['usage_percent']
                })
                node_series['connections'].append({
                    'timestamp': timestamp,
                    'value': node_metrics['connections']['connected_clients']
                })
                node_series['ops_per_second'].append({
                    'timestamp': timestamp,
                    'value': node_metrics['performance']['ops_per_second']
                })
    return {
        'success': True,
        'trends': trends,
        'data_points': len(filtered_metrics)
    }
def generate_health_report(self) -> Dict[str, Any]:
    """Collect a fresh metrics snapshot and derive a scored health report."""
    latest_metrics = self.collect_cluster_metrics()
    if not latest_metrics:
        return {'success': False, 'error': 'Failed to collect metrics'}
    summary = latest_metrics['cluster_summary']
    alerts = latest_metrics['alerts']
    critical_alerts = [a for a in alerts if a.get('severity') == 'critical']
    warning_alerts = [a for a in alerts if a.get('severity') == 'warning']
    # Start from a perfect score and subtract penalties.
    health_score = 100
    if summary['total_nodes'] > 0:
        node_health_ratio = summary['healthy_nodes'] / summary['total_nodes']
        if node_health_ratio < 1.0:
            # Up to 30 points for unhealthy nodes, proportional to the gap.
            health_score -= (1.0 - node_health_ratio) * 30
    health_score -= len(critical_alerts) * 15
    health_score -= len(warning_alerts) * 5
    health_score = max(0, health_score)
    # Map the numeric score onto a coarse status label.
    for threshold, label in ((90, 'excellent'), (75, 'good'),
                             (50, 'fair'), (25, 'poor')):
        if health_score >= threshold:
            health_status = label
            break
    else:
        health_status = 'critical'
    report = {
        'timestamp': datetime.now().isoformat(),
        'health_score': round(health_score, 1),
        'health_status': health_status,
        'cluster_summary': summary,
        'alerts': {
            'total': len(alerts),
            'critical': len(critical_alerts),
            'warning': len(warning_alerts),
            'details': alerts
        },
        'recommendations': self._generate_recommendations(latest_metrics)
    }
    return {
        'success': True,
        'report': report
    }
def _generate_recommendations(self, metrics: Dict[str, Any]) -> List[str]:
    """Derive tuning advice from a metrics snapshot's cluster summary."""
    summary = metrics['cluster_summary']
    # (condition, advice) pairs, evaluated in a fixed order.
    checks = (
        (summary.get('memory_usage_percent', 0) > 80,
         "Consider increasing memory or implementing data eviction policies"),
        (summary.get('total_connections', 0) > 5000,
         "High connection count detected. Consider connection pooling"),
        (summary['healthy_nodes'] < summary['total_nodes'],
         "Some nodes are unhealthy. Check node status and network connectivity"),
        (summary.get('total_ops_per_second', 0) > 50000,
         "High operation rate. Monitor for potential bottlenecks"),
    )
    return [advice for triggered, advice in checks if triggered]
```

## 11.7 总结
Redis集群和高可用性是构建可靠、可扩展Redis服务的关键技术:
### 11.7.1 核心特性
- 主从复制:提供数据冗余和读写分离
- Redis Sentinel:自动故障检测和故障转移
- Redis Cluster:水平扩展和自动分片
- 多数据中心:跨地域高可用性
- 负载均衡:请求分发和性能优化
### 11.7.2 适用场景
- 主从复制:读多写少的应用
- Sentinel:需要自动故障转移的中小规模应用
- Cluster:大规模、高并发的分布式应用
- 多数据中心:对可用性要求极高的关键业务
### 11.7.3 最佳实践
- 架构设计:根据业务需求选择合适的高可用方案
- 监控告警:建立完善的监控和告警机制
- 容量规划:合理规划节点数量和资源配置
- 故障演练:定期进行故障转移演练
- 性能优化:持续监控和优化性能指标
### 11.7.4 参考资料
下一章将介绍Redis的安全配置和最佳实践。