7.1 RabbitMQ集群架构
7.1.1 集群基础概念
import json
import logging
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set

import requests
class NodeType(Enum):
    """Node type of a cluster member."""

    DISK = "disk"  # disk node
    RAM = "ram"  # RAM node
    MIXED = "mixed"  # mixed node
class NodeStatus(Enum):
    """Node status values used throughout this module."""

    RUNNING = "running"
    STOPPED = "stopped"
    PARTITIONED = "partitioned"
    DOWN = "down"
    MAINTENANCE = "maintenance"
class ClusterMode(Enum):
    """Cluster mode."""

    CLASSIC = "classic"  # classic mode
    QUORUM = "quorum"  # quorum-queue mode
    STREAM = "stream"  # stream mode
    FEDERATION = "federation"  # federation mode
@dataclass
class NodeInfo:
    """Node information record.

    Plain data holder; the fields are filled by
    ``ClusterConfigManager.get_cluster_info`` from the per-node dictionaries
    returned by the management API.
    """

    name: str
    host: str
    port: int
    node_type: NodeType
    status: NodeStatus
    memory_used: int
    memory_limit: int
    disk_free: int
    disk_limit: int  # filled from the API's ``disk_free_limit`` value
    uptime: int
    erlang_version: str
    rabbitmq_version: str
    enabled_plugins: List[str]
@dataclass
class ClusterInfo:
    """Cluster information record: the node list plus cluster-wide totals."""

    name: str
    nodes: List[NodeInfo]
    total_queues: int
    total_exchanges: int
    total_connections: int
    total_channels: int
    cluster_mode: ClusterMode
    partition_handling: str
    net_ticktime: int
7.1.2 集群配置管理器
class ClusterConfigManager:
    """Cluster configuration manager.

    Reads cluster state through the injected management API object and
    changes cluster membership by running ``rabbitmqctl`` commands.
    """

    def __init__(self, management_api):
        """Store the API client and create the logger, cache and lock.

        Args:
            management_api: Object providing ``get_nodes()`` and
                ``get_overview()``. Presumably a wrapper around the RabbitMQ
                management HTTP API -- its definition is not visible here.
        """
        self.management_api = management_api
        self.logger = logging.getLogger(__name__)
        self.config_cache = {}
        self.lock = threading.Lock()

    def get_cluster_info(self) -> "ClusterInfo":
        """Build a ``ClusterInfo`` snapshot from the management API.

        Returns:
            A ``ClusterInfo`` holding one ``NodeInfo`` per entry returned by
            ``get_nodes()`` plus the totals from ``get_overview()``.

        Raises:
            Exception: Any error from the API or from value conversion is
                logged and re-raised unchanged.
        """
        try:
            nodes = []
            for node_data in self.management_api.get_nodes():
                # A missing ``running`` key means "not running". The old
                # ``x and 'running' or 'stopped'`` form used the truthy
                # default 'stopped' and therefore reported RUNNING.
                if not node_data.get('running', False):
                    status = NodeStatus.STOPPED
                elif node_data.get('partitions'):
                    # The node lists peers it is partitioned from.
                    status = NodeStatus.PARTITIONED
                else:
                    status = NodeStatus.RUNNING

                nodes.append(NodeInfo(
                    name=node_data.get('name', ''),
                    host=node_data.get('host', ''),
                    port=node_data.get('port', 5672),
                    node_type=NodeType(node_data.get('type', 'disk')),
                    status=status,
                    memory_used=node_data.get('mem_used', 0),
                    memory_limit=node_data.get('mem_limit', 0),
                    disk_free=node_data.get('disk_free', 0),
                    disk_limit=node_data.get('disk_free_limit', 0),
                    uptime=node_data.get('uptime', 0),
                    erlang_version=node_data.get('erlang_version', ''),
                    rabbitmq_version=node_data.get('rabbitmq_version', ''),
                    enabled_plugins=node_data.get('enabled_plugins', []),
                ))

            overview = self.management_api.get_overview()
            totals = overview.get('object_totals', {})
            return ClusterInfo(
                name=overview.get('cluster_name', 'rabbitmq'),
                nodes=nodes,
                total_queues=totals.get('queues', 0),
                total_exchanges=totals.get('exchanges', 0),
                total_connections=totals.get('connections', 0),
                total_channels=totals.get('channels', 0),
                # Default value; the real mode would have to come from config.
                cluster_mode=ClusterMode.CLASSIC,
                partition_handling=overview.get('partition_handling', 'ignore'),
                net_ticktime=overview.get('net_ticktime', 60),
            )
        except Exception as e:
            self.logger.error(f"获取集群信息失败: {e}")
            raise

    def add_node_to_cluster(self, node_name: str, target_node: str) -> bool:
        """Join ``node_name`` to the cluster that ``target_node`` belongs to.

        Runs ``stop_app``, ``reset``, ``join_cluster rabbit@<target_node>``
        and ``start_app`` against the node, in that order.

        Returns:
            True when all four commands succeed; False (after logging) when
            any of them raises.
        """
        steps = (
            ['stop_app'],
            ['reset'],
            ['join_cluster', f'rabbit@{target_node}'],
            ['start_app'],
        )
        try:
            for step in steps:
                self._execute_rabbitmqctl_command(node_name, step)
            self.logger.info(f"节点 {node_name} 成功加入集群")
            return True
        except Exception as e:
            self.logger.error(f"添加节点到集群失败: {e}")
            return False
## 7.4 网络分区处理
### 7.4.1 分区检测与处理
```python
class PartitionHandlingMode(Enum):
    """Partition handling mode."""

    IGNORE = "ignore"  # ignore partitions
    PAUSE_MINORITY = "pause_minority"  # pause the minority side
    PAUSE_IF_ALL_DOWN = "pause_if_all_down"  # pause when everything is down
    AUTOHEAL = "autoheal"  # heal automatically
@dataclass
class PartitionInfo:
    """Partition information record kept by the partition manager."""

    partition_id: str
    nodes: List[str]
    leader_node: str
    start_time: float
    duration: float
    status: str  # active, healing, resolved
    affected_queues: List[str]
    affected_exchanges: List[str]
class NetworkPartitionManager:
    """Network partition manager.

    Polls the cluster manager from a background thread, records nodes whose
    status is ``NodeStatus.PARTITIONED``, applies the configured handling
    mode and notifies registered callbacks on detection and on healing.
    """

    # Seconds between two polling rounds of the monitor thread.
    _POLL_INTERVAL = 10

    def __init__(self, cluster_manager: ClusterConfigManager,
                 handling_mode: PartitionHandlingMode = PartitionHandlingMode.PAUSE_MINORITY):
        """Create an idle manager; call ``start_monitoring`` to begin polling.

        Args:
            cluster_manager: Source of cluster snapshots and the executor for
                ``rabbitmqctl`` commands.
            handling_mode: Strategy applied when a partition is detected.
        """
        self.cluster_manager = cluster_manager
        self.handling_mode = handling_mode
        self.logger = logging.getLogger(__name__)
        self.active_partitions: Dict[str, PartitionInfo] = {}
        self.partition_history: List[PartitionInfo] = []
        self.monitoring = False
        self.monitor_thread = None
        # Lets stop_monitoring() interrupt the wait between polls at once.
        self._stop_event = threading.Event()
        self.partition_callbacks: List[Callable[[PartitionInfo], None]] = []
        self.healing_callbacks: List[Callable[[PartitionInfo], None]] = []

    def start_monitoring(self):
        """Start the daemon monitor thread; does nothing if already running."""
        if self.monitoring:
            return
        self.monitoring = True
        self._stop_event.clear()
        self.monitor_thread = threading.Thread(target=self._monitor_partitions, daemon=True)
        self.monitor_thread.start()
        self.logger.info("网络分区监控已启动")

    def stop_monitoring(self):
        """Ask the monitor thread to stop and wait up to 5 seconds for it."""
        self.monitoring = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5)
        self.logger.info("网络分区监控已停止")

    def _monitor_partitions(self):
        """Thread body: detect and heal-check every poll interval."""
        while self.monitoring:
            try:
                self._detect_partitions()
                self._check_partition_healing()
            except Exception as e:
                self.logger.error(f"分区监控失败: {e}")
            # Returns early when stop_monitoring() sets the event, so the
            # 5-second join is not outlasted by a 10-second sleep.
            self._stop_event.wait(self._POLL_INTERVAL)

    def _detect_partitions(self):
        """Register, handle and announce a newly observed partition."""
        try:
            cluster_info = self.cluster_manager.get_cluster_info()
            partitioned_nodes = [
                node.name for node in cluster_info.nodes
                if node.status == NodeStatus.PARTITIONED
            ]
            if not partitioned_nodes:
                return

            # The same node set is seen again on every poll until it heals;
            # without this guard each poll would create a new record and
            # re-run the handler and callbacks.
            current = set(partitioned_nodes)
            if any(set(info.nodes) == current
                   for info in self.active_partitions.values()):
                return

            partition_id = f"partition_{int(time.time())}"
            partition_info = PartitionInfo(
                partition_id=partition_id,
                nodes=partitioned_nodes,
                leader_node=self._select_partition_leader(partitioned_nodes),
                start_time=time.time(),
                duration=0,
                status="active",
                affected_queues=self._get_affected_queues(partitioned_nodes),
                affected_exchanges=self._get_affected_exchanges(partitioned_nodes),
            )
            self.active_partitions[partition_id] = partition_info
            self.logger.warning(f"检测到网络分区: {partition_id}, 节点: {partitioned_nodes}")

            self._handle_partition(partition_info)

            # One failing callback must not prevent the others from running.
            for callback in self.partition_callbacks:
                try:
                    callback(partition_info)
                except Exception as e:
                    self.logger.error(f"分区回调执行失败: {e}")
        except Exception as e:
            self.logger.error(f"检测网络分区失败: {e}")

    def _handle_partition(self, partition_info: PartitionInfo):
        """Dispatch to the handler that matches ``self.handling_mode``."""
        try:
            if self.handling_mode == PartitionHandlingMode.IGNORE:
                self.logger.info(f"忽略分区: {partition_info.partition_id}")
                return
            elif self.handling_mode == PartitionHandlingMode.PAUSE_MINORITY:
                self._pause_minority_partition(partition_info)
            elif self.handling_mode == PartitionHandlingMode.PAUSE_IF_ALL_DOWN:
                self._pause_if_all_down(partition_info)
            elif self.handling_mode == PartitionHandlingMode.AUTOHEAL:
                self._autoheal_partition(partition_info)
        except Exception as e:
            self.logger.error(f"处理网络分区失败: {e}")

    def _pause_minority_partition(self, partition_info: PartitionInfo):
        """Pause the partitioned nodes when they are fewer than half the cluster."""
        try:
            cluster_info = self.cluster_manager.get_cluster_info()
            total_nodes = len(cluster_info.nodes)
            partitioned_nodes = len(partition_info.nodes)
            if partitioned_nodes < total_nodes / 2:
                for node_name in partition_info.nodes:
                    self._pause_node(node_name)
                self.logger.info(f"已暂停少数派分区节点: {partition_info.nodes}")
            else:
                self.logger.warning(f"分区节点不是少数派,无法自动处理")
        except Exception as e:
            self.logger.error(f"暂停少数派分区失败: {e}")

    def _pause_if_all_down(self, partition_info: PartitionInfo):
        """Pause the partitioned nodes when no node reports RUNNING."""
        try:
            cluster_info = self.cluster_manager.get_cluster_info()
            running_nodes = [node for node in cluster_info.nodes
                             if node.status == NodeStatus.RUNNING]
            if not running_nodes:
                for node_name in partition_info.nodes:
                    self._pause_node(node_name)
                self.logger.info(f"所有节点下线,已暂停分区节点: {partition_info.nodes}")
        except Exception as e:
            self.logger.error(f"全部下线暂停失败: {e}")

    def _autoheal_partition(self, partition_info: PartitionInfo):
        """Try to reconnect every node of the partition."""
        try:
            for node_name in partition_info.nodes:
                self._attempt_node_reconnection(node_name)
            self.logger.info(f"尝试自动修复分区: {partition_info.partition_id}")
        except Exception as e:
            self.logger.error(f"自动修复分区失败: {e}")

    def _pause_node(self, node_name: str):
        """Run ``rabbitmqctl stop_app`` on the node; failures are only logged."""
        try:
            self.cluster_manager._execute_rabbitmqctl_command(
                node_name, ['stop_app']
            )
            self.logger.info(f"节点 {node_name} 已暂停")
        except Exception as e:
            self.logger.error(f"暂停节点失败: {e}")

    def _attempt_node_reconnection(self, node_name: str):
        """Run ``rabbitmqctl start_app`` on the node; failures are only logged."""
        try:
            self.cluster_manager._execute_rabbitmqctl_command(
                node_name, ['start_app']
            )
            self.logger.info(f"尝试重连节点: {node_name}")
        except Exception as e:
            self.logger.error(f"节点重连失败: {e}")

    def _check_partition_healing(self):
        """Refresh durations and retire partitions whose nodes all run again."""
        try:
            # Iterate over a copy because healed entries are deleted in-loop.
            for partition_id, partition_info in list(self.active_partitions.items()):
                partition_info.duration = time.time() - partition_info.start_time
                if not self._is_partition_healed(partition_info):
                    continue

                partition_info.status = "resolved"
                self.partition_history.append(partition_info)
                del self.active_partitions[partition_id]
                self.logger.info(f"分区已修复: {partition_id}")

                for callback in self.healing_callbacks:
                    try:
                        callback(partition_info)
                    except Exception as e:
                        self.logger.error(f"修复回调执行失败: {e}")
        except Exception as e:
            self.logger.error(f"检查分区修复失败: {e}")

    def _is_partition_healed(self, partition_info: PartitionInfo) -> bool:
        """Return True when every node of the partition is known and RUNNING.

        Any error while fetching the cluster snapshot counts as "not healed".
        """
        try:
            cluster_info = self.cluster_manager.get_cluster_info()
            for node_name in partition_info.nodes:
                node = next((n for n in cluster_info.nodes if n.name == node_name), None)
                if not node or node.status != NodeStatus.RUNNING:
                    return False
            return True
        except Exception as e:
            self.logger.error(f"检查分区修复状态失败: {e}")
            return False

    def _select_partition_leader(self, nodes: List[str]) -> str:
        """Pick the first node as leader; empty string for an empty list."""
        return nodes[0] if nodes else ""

    def _get_affected_queues(self, nodes: List[str]) -> List[str]:
        """Return the queues hosted on ``nodes``.

        Placeholder: a real implementation would query which queues live on
        these nodes; the simplified version always returns an empty list.
        """
        return []

    def _get_affected_exchanges(self, nodes: List[str]) -> List[str]:
        """Return the exchanges hosted on ``nodes``.

        Placeholder: a real implementation would query which exchanges live
        on these nodes; the simplified version always returns an empty list.
        """
        return []

    def force_heal_partition(self, partition_id: str) -> bool:
        """Trigger a reconnection attempt for every node of an active partition.

        Returns:
            False when ``partition_id`` is not an active partition or an
            unexpected error occurs; True once all attempts were issued.
        """
        try:
            if partition_id not in self.active_partitions:
                self.logger.error(f"分区不存在: {partition_id}")
                return False
            partition_info = self.active_partitions[partition_id]
            for node_name in partition_info.nodes:
                self._attempt_node_reconnection(node_name)
            self.logger.info(f"强制修复分区: {partition_id}")
            return True
        except Exception as e:
            self.logger.error(f"强制修复分区失败: {e}")
            return False

    def get_partition_status(self) -> Dict[str, Any]:
        """Return a plain-dict summary of the mode, monitor flag and partitions."""
        return {
            'handling_mode': self.handling_mode.value,
            'monitoring': self.monitoring,
            'active_partitions': {
                pid: {
                    'nodes': info.nodes,
                    'leader_node': info.leader_node,
                    'start_time': info.start_time,
                    'duration': info.duration,
                    'status': info.status,
                    'affected_queues': info.affected_queues,
                    'affected_exchanges': info.affected_exchanges
                } for pid, info in self.active_partitions.items()
            },
            'partition_history_count': len(self.partition_history),
            'total_partitions_detected': len(self.active_partitions) + len(self.partition_history)
        }

    def add_partition_callback(self, callback: Callable[[PartitionInfo], None]):
        """Register a callable invoked with the ``PartitionInfo`` on detection."""
        self.partition_callbacks.append(callback)

    def add_healing_callback(self, callback: Callable[[PartitionInfo], None]):
        """Register a callable invoked with the ``PartitionInfo`` once healed."""
        self.healing_callbacks.append(callback)
7.5 完整的集群部署示例
7.5.1 集群部署脚本
class ClusterDeploymentManager:
    """Cluster deployment manager.

    Walks through a fixed sequence of deployment steps. Every step runs its
    commands on the local machine; the per-node loops do not execute
    anything remotely yet (simplified implementation).
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.deployment_config = {}

    def deploy_cluster(self, cluster_config: Dict[str, Any]) -> bool:
        """Deploy a RabbitMQ cluster.

        Args:
            cluster_config: Needs the keys ``cluster_name``, ``nodes`` (list
                of dicts with at least ``host``) and ``erlang_cookie``.

        Returns:
            True when every step succeeds; False as soon as one step fails
            or raises.
        """
        try:
            self.logger.info("开始部署RabbitMQ集群")
            # Lambdas keep each step lazy: ``cluster_config['nodes']`` is
            # only read after validation has confirmed that it exists.
            steps = (
                lambda: self._validate_cluster_config(cluster_config),
                lambda: self._prepare_nodes(cluster_config['nodes']),
                lambda: self._install_rabbitmq(cluster_config['nodes']),
                lambda: self._configure_cluster(cluster_config),
                lambda: self._start_cluster_services(cluster_config['nodes']),
                lambda: self._verify_cluster(cluster_config),
                lambda: self._configure_high_availability(cluster_config),
            )
            for step in steps:
                if not step():
                    return False
            self.logger.info("RabbitMQ集群部署完成")
            return True
        except Exception as e:
            self.logger.error(f"集群部署失败: {e}")
            return False

    def _validate_cluster_config(self, config: Dict[str, Any]) -> bool:
        """Check that all required keys exist and at least one node is listed."""
        for field in ('cluster_name', 'nodes', 'erlang_cookie'):
            if field not in config:
                self.logger.error(f"缺少必需配置: {field}")
                return False
        if len(config['nodes']) < 1:
            self.logger.error("至少需要一个节点")
            return False
        return True

    def _prepare_nodes(self, nodes: List[Dict[str, Any]]) -> bool:
        """Check connectivity and set up the environment for every node."""
        try:
            for node in nodes:
                self.logger.info(f"准备节点: {node['host']}")
                if not self._check_node_connectivity(node['host']):
                    return False
                if not self._setup_node_environment(node):
                    return False
            return True
        except Exception as e:
            self.logger.error(f"准备节点失败: {e}")
            return False

    def _check_node_connectivity(self, host: str) -> bool:
        """Return True when a TCP connection to port 22 (SSH) succeeds within 5 s."""
        try:
            import socket
            # The context manager closes the socket even if connect_ex raises.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(5)
                result = sock.connect_ex((host, 22))
            if result == 0:
                self.logger.info(f"节点 {host} 连通性正常")
                return True
            self.logger.error(f"节点 {host} 无法连接")
            return False
        except Exception as e:
            self.logger.error(f"检查节点连通性失败: {e}")
            return False

    def _setup_node_environment(self, node: Dict[str, Any]) -> bool:
        """Create the rabbitmq user and its directories.

        Simplified: runs locally; a real deployment would use SSH or another
        remote-execution tool. The commands are constants, so running them
        through the shell (needed for ``|| true``) carries no injection risk.
        """
        try:
            commands = [
                "sudo useradd -r -s /bin/false rabbitmq || true",
                "sudo mkdir -p /var/lib/rabbitmq",
                "sudo mkdir -p /var/log/rabbitmq",
                "sudo mkdir -p /etc/rabbitmq",
                "sudo chown -R rabbitmq:rabbitmq /var/lib/rabbitmq",
                "sudo chown -R rabbitmq:rabbitmq /var/log/rabbitmq",
                "sudo chown -R rabbitmq:rabbitmq /etc/rabbitmq"
            ]
            for cmd in commands:
                result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
                if result.returncode != 0 and "already exists" not in result.stderr:
                    self.logger.error(f"命令执行失败: {cmd}, 错误: {result.stderr}")
                    return False
            self.logger.info(f"节点环境设置完成: {node['host']}")
            return True
        except Exception as e:
            self.logger.error(f"设置节点环境失败: {e}")
            return False

    def _install_rabbitmq(self, nodes: List[Dict[str, Any]]) -> bool:
        """Install Erlang and RabbitMQ through apt, once per listed node.

        NOTE(review): ``apt-key`` and the Bintray repository URL look
        outdated -- confirm against the current RabbitMQ install guide.
        """
        # Constant command strings; the pipes require a shell.
        install_commands = [
            "sudo apt-get update",
            "sudo apt-get install -y erlang-nox",
            "wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -",
            "echo 'deb https://dl.bintray.com/rabbitmq/debian bionic main' | sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list",
            "sudo apt-get update",
            "sudo apt-get install -y rabbitmq-server"
        ]
        try:
            for node in nodes:
                self.logger.info(f"在节点 {node['host']} 安装RabbitMQ")
                for cmd in install_commands:
                    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
                    if result.returncode != 0:
                        self.logger.error(f"安装命令失败: {cmd}, 错误: {result.stderr}")
                        return False
            return True
        except Exception as e:
            self.logger.error(f"安装RabbitMQ失败: {e}")
            return False

    def _configure_cluster(self, config: Dict[str, Any]) -> bool:
        """Write the Erlang cookie, then configure the master and the slaves.

        The first entry of ``config['nodes']`` is treated as the master.
        """
        try:
            cookie = config['erlang_cookie']
            cookie_path = "/var/lib/rabbitmq/.erlang.cookie"
            for _node in config['nodes']:
                with open(cookie_path, 'w') as f:
                    f.write(cookie)
                # check=True: a failed chown/chmod must abort the deployment
                # instead of being silently ignored.
                subprocess.run(["sudo", "chown", "rabbitmq:rabbitmq", cookie_path], check=True)
                subprocess.run(["sudo", "chmod", "400", cookie_path], check=True)

            master_node = config['nodes'][0]
            self._configure_master_node(master_node, config)
            for node in config['nodes'][1:]:
                self._configure_slave_node(node, master_node, config)
            return True
        except Exception as e:
            self.logger.error(f"配置集群失败: {e}")
            return False

    def _configure_master_node(self, master_node: Dict[str, Any], config: Dict[str, Any]):
        """Start RabbitMQ, enable the management plugin and set the cluster name.

        Raises:
            Exception: Re-raised after logging; with ``check=True`` a
                non-zero exit status surfaces as ``CalledProcessError``.
        """
        try:
            # Argument lists (no shell) keep the configured cluster name from
            # being interpreted by a shell.
            commands = [
                ["sudo", "systemctl", "start", "rabbitmq-server"],
                ["sudo", "systemctl", "enable", "rabbitmq-server"],
                ["sudo", "rabbitmq-plugins", "enable", "rabbitmq_management"],
                ["sudo", "rabbitmqctl", "set_cluster_name", str(config['cluster_name'])],
            ]
            for cmd in commands:
                subprocess.run(cmd, check=True)
            self.logger.info(f"主节点配置完成: {master_node['host']}")
        except Exception as e:
            self.logger.error(f"配置主节点失败: {e}")
            raise

    def _configure_slave_node(self, slave_node: Dict[str, Any],
                              master_node: Dict[str, Any], config: Dict[str, Any]):
        """Start RabbitMQ, reset the node and join it to the master's cluster.

        Raises:
            Exception: Re-raised after logging; with ``check=True`` a
                non-zero exit status surfaces as ``CalledProcessError``.
        """
        try:
            master_host = master_node['host']
            commands = [
                ["sudo", "systemctl", "start", "rabbitmq-server"],
                ["sudo", "systemctl", "enable", "rabbitmq-server"],
                ["sudo", "rabbitmqctl", "stop_app"],
                ["sudo", "rabbitmqctl", "reset"],
                ["sudo", "rabbitmqctl", "join_cluster", f"rabbit@{master_host}"],
                ["sudo", "rabbitmqctl", "start_app"],
            ]
            for cmd in commands:
                subprocess.run(cmd, check=True)
            self.logger.info(f"从节点配置完成: {slave_node['host']}")
        except Exception as e:
            self.logger.error(f"配置从节点失败: {e}")
            raise

    @staticmethod
    def _is_service_active(status_output: str) -> bool:
        """Return True only when the status output is exactly ``active``.

        A substring test is wrong here because ``inactive`` contains
        ``active``.
        """
        return status_output.strip() == "active"

    def _start_cluster_services(self, nodes: List[Dict[str, Any]]) -> bool:
        """Make sure the rabbitmq-server service is running, once per node."""
        try:
            for node in nodes:
                result = subprocess.run(["sudo", "systemctl", "is-active", "rabbitmq-server"],
                                        capture_output=True, text=True)
                if not self._is_service_active(result.stdout):
                    subprocess.run(["sudo", "systemctl", "start", "rabbitmq-server"])
                self.logger.info(f"节点服务已启动: {node['host']}")
            return True
        except Exception as e:
            self.logger.error(f"启动集群服务失败: {e}")
            return False

    def _verify_cluster(self, config: Dict[str, Any]) -> bool:
        """Check that ``rabbitmqctl cluster_status`` mentions every node."""
        try:
            result = subprocess.run(["sudo", "rabbitmqctl", "cluster_status"],
                                    capture_output=True, text=True)
            if result.returncode != 0:
                self.logger.error(f"集群状态检查失败: {result.stderr}")
                return False

            cluster_output = result.stdout
            for node in config['nodes']:
                node_name = f"rabbit@{node['host']}"
                if node_name not in cluster_output:
                    self.logger.error(f"节点 {node_name} 不在集群中")
                    return False
            self.logger.info("集群验证通过")
            return True
        except Exception as e:
            self.logger.error(f"验证集群失败: {e}")
            return False

    def _configure_high_availability(self, config: Dict[str, Any]) -> bool:
        """Install the default mirrored-queue policy ``ha-all``."""
        try:
            ha_policy = {
                "pattern": ".*",
                "definition": {
                    "ha-mode": "exactly",
                    "ha-params": 2,
                    "ha-sync-mode": "automatic"
                },
                "priority": 0,
                "apply-to": "queues"
            }
            # Passing an argument list avoids hand-built shell quoting around
            # the JSON definition.
            policy_cmd = [
                "sudo", "rabbitmqctl", "set_policy", "ha-all",
                ha_policy['pattern'], json.dumps(ha_policy['definition']),
            ]
            result = subprocess.run(policy_cmd, capture_output=True, text=True)
            if result.returncode != 0:
                self.logger.error(f"设置高可用策略失败: {result.stderr}")
                return False
            self.logger.info("高可用性配置完成")
            return True
        except Exception as e:
            self.logger.error(f"配置高可用性失败: {e}")
            return False
# Usage example
def deploy_rabbitmq_cluster_example():
    """Deploy a three-node RabbitMQ cluster and print the outcome."""
    # Cluster configuration (sample values only).
    cluster_config = {
        "cluster_name": "production-rabbitmq",
        "erlang_cookie": "my-secret-cookie-12345",
        "nodes": [
            {"host": "rabbitmq-node1", "type": "disk"},
            {"host": "rabbitmq-node2", "type": "disk"},
            {"host": "rabbitmq-node3", "type": "ram"}
        ]
    }

    deployment_manager = ClusterDeploymentManager()

    if deployment_manager.deploy_cluster(cluster_config):
        print("集群部署成功")
    else:
        print("集群部署失败")
# Run the deployment example when this file is executed as a script.
if __name__ == "__main__":
    deploy_rabbitmq_cluster_example()
7.5.2 完整的高可用性示例
def complete_high_availability_example():
    """Wire all high-availability components together against a mock API.

    Builds the managers, starts failover and partition monitoring, lets them
    run for one minute (or until Ctrl-C), stops them and prints a report.
    """

    class MockManagementAPI:
        """Stand-in for the management API that returns canned data."""

        def get_nodes(self):
            return [
                {"name": "rabbit@node1", "host": "node1", "running": True, "type": "disk"},
                {"name": "rabbit@node2", "host": "node2", "running": True, "type": "disk"},
                {"name": "rabbit@node3", "host": "node3", "running": False, "type": "ram"}
            ]

        def get_overview(self):
            return {
                "cluster_name": "production-cluster",
                "object_totals": {"queues": 10, "exchanges": 5, "connections": 20, "channels": 50}
            }

        def create_policy(self, name, policy_data):
            print(f"创建策略: {name} - {policy_data}")
            return True

        def create_queue(self, vhost, name, queue_data):
            print(f"创建队列: {name} - {queue_data}")
            return True

    management_api = MockManagementAPI()

    # 1. Cluster configuration manager
    cluster_manager = ClusterConfigManager(management_api)
    cluster_info = cluster_manager.get_cluster_info()
    print(f"集群名称: {cluster_info.name}")
    print(f"节点数量: {len(cluster_info.nodes)}")

    # 2. High-availability manager with a mirrored-queue policy
    ha_manager = HighAvailabilityManager(management_api)
    mirror_config = MirrorQueueConfig(
        name="ha-all-queues",
        pattern=".*",
        policy=MirrorQueuePolicy.EXACTLY,
        ha_params={"count": 2},
        ha_sync_mode="automatic",
        priority=1
    )
    ha_manager.create_mirror_queue_policy(mirror_config)

    # 3. Quorum queue manager with one quorum queue
    quorum_manager = QuorumQueueManager(management_api)
    quorum_config = QuorumQueueConfig(
        name="critical-tasks",
        initial_group_size=3,
        max_length=10000,
        delivery_limit=3
    )
    quorum_manager.create_quorum_queue(quorum_config)

    # 4. Load balancer and its nodes
    load_balancer = LoadBalancer(LoadBalancingStrategy.LEAST_CONNECTIONS)
    nodes = [
        NodeWeight("node1", weight=3, max_connections=100),
        NodeWeight("node2", weight=2, max_connections=80),
        NodeWeight("node3", weight=1, max_connections=50)
    ]
    for node in nodes:
        load_balancer.add_node(node)

    # 5. Failover manager
    failover_config = FailoverConfig(
        strategy=FailoverStrategy.GRACEFUL,
        health_check_interval=30,
        failure_threshold=3,
        enable_auto_recovery=True
    )
    failover_manager = FailoverManager(failover_config, load_balancer, cluster_manager)

    def on_failover(failed_node: str, backup_node: str):
        print(f"故障转移: {failed_node} -> {backup_node}")

    def on_recovery(recovered_node: str):
        print(f"节点恢复: {recovered_node}")

    failover_manager.add_failover_callback(on_failover)
    failover_manager.add_recovery_callback(on_recovery)

    # 6. Network partition manager
    partition_manager = NetworkPartitionManager(
        cluster_manager,
        PartitionHandlingMode.PAUSE_MINORITY
    )

    def on_partition(partition_info: PartitionInfo):
        print(f"检测到网络分区: {partition_info.partition_id}, 节点: {partition_info.nodes}")

    def on_partition_healed(partition_info: PartitionInfo):
        print(f"分区已修复: {partition_info.partition_id}")

    partition_manager.add_partition_callback(on_partition)
    partition_manager.add_healing_callback(on_partition_healed)

    # 7. Start monitoring
    failover_manager.start_monitoring()
    partition_manager.start_monitoring()
    print("高可用性系统已启动")

    # Simulate running for a while (one minute).
    try:
        time.sleep(60)
    except KeyboardInterrupt:
        print("停止监控...")

    # 8. Stop monitoring
    failover_manager.stop_monitoring()
    partition_manager.stop_monitoring()

    # 9. Collect and print the status report
    cluster_status = cluster_manager.get_cluster_status()
    failover_status = failover_manager.get_failover_status()
    partition_status = partition_manager.get_partition_status()
    node_stats = load_balancer.get_node_stats()

    print("\n=== 集群状态报告 ===")
    print(f"集群名称: {cluster_status.get('cluster_name')}")
    print(f"运行节点: {cluster_status.get('running_nodes')}/{cluster_status.get('total_nodes')}")
    print(f"健康比例: {cluster_status.get('health_ratio', 0):.2%}")
    print(f"内存使用: {cluster_status.get('memory_usage_ratio', 0):.2%}")

    print("\n=== 故障转移状态 ===")
    print(f"监控状态: {failover_status.get('monitoring')}")
    print(f"失败节点: {failover_status.get('failed_nodes')}")

    print("\n=== 分区状态 ===")
    print(f"处理模式: {partition_status.get('handling_mode')}")
    print(f"活跃分区: {len(partition_status.get('active_partitions', {}))}")

    print("\n=== 负载均衡状态 ===")
    for node_stat in node_stats:
        print(f"节点 {node_stat['node_name']}: 利用率 {node_stat['utilization']:.2%}")
# Run the high-availability example when this file is executed as a script.
if __name__ == "__main__":
    complete_high_availability_example()
7.6 本章总结
7.6.1 核心知识点
集群架构
- 节点类型:磁盘节点、内存节点
- 集群模式:经典模式、仲裁队列模式
- 集群配置与管理
高可用性配置
- 镜像队列策略配置
- 仲裁队列管理
- 队列同步机制
负载均衡与故障转移
- 多种负载均衡策略
- 自动故障检测与转移
- 节点健康监控
网络分区处理
- 分区检测机制
- 多种分区处理策略
- 自动修复功能
7.6.2 最佳实践
集群规划
- 至少使用3个节点确保高可用
- 合理分配磁盘节点和内存节点
- 考虑网络延迟和带宽
高可用配置
- 为关键队列配置镜像或仲裁
- 设置合适的同步策略
- 定期检查集群健康状态
监控与维护
- 实施全面的监控策略
- 设置合理的告警阈值
- 定期进行故障演练
7.6.3 练习题
1. 设计一个3节点RabbitMQ集群的部署方案
2. 实现自定义的负载均衡策略
3. 配置网络分区的自动处理机制
4. 创建监控仪表板显示集群状态
5. 编写集群故障恢复脚本
def remove_node_from_cluster(self, node_name: str) -> bool:
    """Remove a node from the cluster.

    Stops the RabbitMQ application on ``node_name``, then runs
    ``forget_cluster_node rabbit@<node_name>`` with no ``-n`` target.

    Returns:
        True on success; False (after logging) when a command raises.
    """
    try:
        # Stop the application on the node that is leaving.
        self._execute_rabbitmqctl_command(node_name, ['stop_app'])
        # Empty node name: no -n target is passed for this command.
        self._execute_rabbitmqctl_command('', ['forget_cluster_node', f'rabbit@{node_name}'])
        self.logger.info(f"节点 {node_name} 已从集群中移除")
        return True
    except Exception as e:
        self.logger.error(f"从集群移除节点失败: {e}")
        return False
def set_cluster_name(self, cluster_name: str) -> bool:
    """Set the cluster name through ``rabbitmqctl set_cluster_name``.

    Returns:
        True on success; False (after logging) when the command raises.
    """
    try:
        # Empty node name: no -n target is passed for this command.
        self._execute_rabbitmqctl_command('', ['set_cluster_name', cluster_name])
        self.logger.info(f"集群名称已设置为: {cluster_name}")
        return True
    except Exception as e:
        self.logger.error(f"设置集群名称失败: {e}")
        return False
def _execute_rabbitmqctl_command(self, node_name: str, command: List[str]) -> str:
    """Run a ``rabbitmqctl`` command and return its standard output.

    Args:
        node_name: Short node name; when non-empty the command is addressed
            with ``-n rabbit@<node_name>``.
        command: The rabbitmqctl sub-command and its arguments.

    Returns:
        The captured standard output of the command.

    Raises:
        RuntimeError: The command exited with a non-zero status; the message
            carries the captured standard error.
        subprocess.TimeoutExpired: The command ran longer than 30 seconds.
    """
    cmd = ['rabbitmqctl']
    if node_name:
        cmd.extend(['-n', f'rabbit@{node_name}'])
    cmd.extend(command)

    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    if result.returncode != 0:
        # RuntimeError is still caught by callers that catch Exception.
        raise RuntimeError(f"命令执行失败: {result.stderr}")
    return result.stdout
def get_cluster_status(self) -> Dict[str, Any]:
    """Summarise cluster health and resource usage as a plain dictionary.

    Returns:
        A dict with cluster totals, aggregate ratios and one entry per node;
        an empty dict (after logging) when anything raises.
    """
    try:
        cluster_info = self.get_cluster_info()
        nodes = cluster_info.nodes

        # Health: share of nodes whose status is RUNNING.
        running_nodes = sum(1 for node in nodes if node.status == NodeStatus.RUNNING)
        total_nodes = len(nodes)
        health_ratio = running_nodes / total_nodes if total_nodes > 0 else 0

        # Resource usage; every ratio falls back to 0 when its limit is 0.
        total_memory_used = sum(node.memory_used for node in nodes)
        total_memory_limit = sum(node.memory_limit for node in nodes)
        memory_usage_ratio = total_memory_used / total_memory_limit if total_memory_limit > 0 else 0

        # NOTE(review): disk_limit is filled from ``disk_free_limit``; if
        # that is a free-space threshold rather than a capacity, this ratio
        # can be negative -- confirm the intended formula.
        total_disk_free = sum(node.disk_free for node in nodes)
        total_disk_limit = sum(node.disk_limit for node in nodes)
        disk_usage_ratio = 1 - (total_disk_free / total_disk_limit) if total_disk_limit > 0 else 0

        return {
            'cluster_name': cluster_info.name,
            'total_nodes': total_nodes,
            'running_nodes': running_nodes,
            'health_ratio': health_ratio,
            'memory_usage_ratio': memory_usage_ratio,
            'disk_usage_ratio': disk_usage_ratio,
            'total_queues': cluster_info.total_queues,
            'total_exchanges': cluster_info.total_exchanges,
            'total_connections': cluster_info.total_connections,
            'total_channels': cluster_info.total_channels,
            'partition_handling': cluster_info.partition_handling,
            'nodes': [{
                'name': node.name,
                'host': node.host,
                'status': node.status.value,
                'memory_usage': node.memory_used / node.memory_limit if node.memory_limit > 0 else 0,
                'disk_usage': 1 - (node.disk_free / node.disk_limit) if node.disk_limit > 0 else 0,
                'uptime': node.uptime
            } for node in nodes]
        }
    except Exception as e:
        self.logger.error(f"获取集群状态失败: {e}")
        return {}
# 7.2 High-availability configuration
# 7.2.1 Mirrored queue configuration
class MirrorQueuePolicy(Enum):
    """Mirrored-queue policy (value of the ``ha-mode`` key)."""

    ALL = "all"  # all nodes
    EXACTLY = "exactly"  # exact count
    NODES = "nodes"  # named nodes


@dataclass
class MirrorQueueConfig:
    """Mirrored-queue configuration used to build a policy."""

    name: str
    pattern: str
    policy: MirrorQueuePolicy
    ha_params: Dict[str, Any]
    ha_sync_mode: str = "automatic"  # automatic, manual
    ha_sync_batch_size: int = 1
    priority: int = 0
    apply_to: str = "queues"  # queues, exchanges, all


class HighAvailabilityManager:
    """High-availability manager for mirrored-queue policies and queue sync."""

    def __init__(self, management_api):
        """Store the management API client and start with no known policies."""
        self.management_api = management_api
        self.logger = logging.getLogger(__name__)
        self.policies = {}

    def create_mirror_queue_policy(self, config: MirrorQueueConfig) -> bool:
        """Create a mirrored-queue policy through the management API.

        ``ha-params`` is only sent for the EXACTLY policy (``count``,
        default 2) and the NODES policy (``nodes``, default ``[]``).

        Returns:
            True when the API reports success (the config is then remembered
            in ``self.policies``); False otherwise or on any exception.
        """
        try:
            definition = {
                "ha-mode": config.policy.value,
                "ha-sync-mode": config.ha_sync_mode,
                "ha-sync-batch-size": config.ha_sync_batch_size
            }
            if config.policy == MirrorQueuePolicy.EXACTLY:
                definition["ha-params"] = config.ha_params.get("count", 2)
            elif config.policy == MirrorQueuePolicy.NODES:
                definition["ha-params"] = config.ha_params.get("nodes", [])

            policy_data = {
                "pattern": config.pattern,
                "definition": definition,
                "priority": config.priority,
                "apply-to": config.apply_to
            }
            if not self.management_api.create_policy(config.name, policy_data):
                self.logger.error(f"镜像队列策略 {config.name} 创建失败")
                return False
            self.policies[config.name] = config
            self.logger.info(f"镜像队列策略 {config.name} 创建成功")
            return True
        except Exception as e:
            self.logger.error(f"创建镜像队列策略失败: {e}")
            return False

    def delete_mirror_queue_policy(self, policy_name: str) -> bool:
        """Delete a policy through the API and forget it locally on success."""
        try:
            if not self.management_api.delete_policy(policy_name):
                self.logger.error(f"镜像队列策略 {policy_name} 删除失败")
                return False
            self.policies.pop(policy_name, None)
            self.logger.info(f"镜像队列策略 {policy_name} 删除成功")
            return True
        except Exception as e:
            self.logger.error(f"删除镜像队列策略失败: {e}")
            return False

    def get_mirror_queue_policies(self) -> List[Dict[str, Any]]:
        """Return every API policy whose definition has an ``ha-mode`` key.

        Returns an empty list (after logging) when the API call raises.
        """
        try:
            mirror_policies = []
            for policy in self.management_api.get_policies():
                definition = policy.get('definition', {})
                if 'ha-mode' not in definition:
                    continue
                mirror_policies.append({
                    'name': policy.get('name'),
                    'pattern': policy.get('pattern'),
                    'ha_mode': definition.get('ha-mode'),
                    'ha_params': definition.get('ha-params'),
                    'ha_sync_mode': definition.get('ha-sync-mode'),
                    'priority': policy.get('priority', 0),
                    'apply_to': policy.get('apply-to', 'queues')
                })
            return mirror_policies
        except Exception as e:
            self.logger.error(f"获取镜像队列策略失败: {e}")
            return []

    def sync_mirror_queue(self, queue_name: str, vhost: str = "/") -> bool:
        """Trigger a mirror sync by sending the ``sync`` queue action."""
        try:
            sync_data = {"action": "sync"}
            if self.management_api.queue_action(vhost, queue_name, sync_data):
                self.logger.info(f"队列 {queue_name} 同步成功")
                return True
            self.logger.error(f"队列 {queue_name} 同步失败")
            return False
        except Exception as e:
            self.logger.error(f"同步镜像队列失败: {e}")
            return False

    def cancel_mirror_queue_sync(self, queue_name: str, vhost: str = "/") -> bool:
        """Cancel a mirror sync by sending the ``cancel_sync`` queue action."""
        try:
            cancel_data = {"action": "cancel_sync"}
            if self.management_api.queue_action(vhost, queue_name, cancel_data):
                self.logger.info(f"队列 {queue_name} 同步已取消")
                return True
            self.logger.error(f"队列 {queue_name} 同步取消失败")
            return False
        except Exception as e:
            self.logger.error(f"取消镜像队列同步失败: {e}")
            return False
7.2.2 仲裁队列管理
@dataclass
class QuorumQueueConfig:
    """Quorum queue configuration."""

    name: str
    durable: bool = True
    auto_delete: bool = False
    # None means "no extra arguments"; the annotation now says so.
    arguments: Optional[Dict[str, Any]] = None
    initial_group_size: int = 3
    leader_locator: str = "client-local"  # client-local, balanced
    max_length: Optional[int] = None
    max_length_bytes: Optional[int] = None
    overflow: str = "drop-head"  # drop-head, reject-publish
    dead_letter_exchange: Optional[str] = None
    dead_letter_routing_key: Optional[str] = None
    message_ttl: Optional[int] = None
    delivery_limit: Optional[int] = None
class QuorumQueueManager:
    """Quorum queue manager built on the injected management API object."""

    def __init__(self, management_api):
        """Store the management API client and create the logger."""
        self.management_api = management_api
        self.logger = logging.getLogger(__name__)

    def create_quorum_queue(self, config: "QuorumQueueConfig", vhost: str = "/") -> bool:
        """Create a quorum queue from ``config``.

        The queue arguments are assembled in a copy, so the caller's
        ``config.arguments`` dict is left untouched.

        Returns:
            True when the API reports success; False otherwise or on any
            exception.
        """
        try:
            # Copy first: the x-* keys must not leak into config.arguments.
            arguments = dict(config.arguments or {})

            # Quorum-specific arguments.
            arguments["x-queue-type"] = "quorum"
            arguments["x-quorum-initial-group-size"] = config.initial_group_size
            arguments["x-queue-leader-locator"] = config.leader_locator

            # Optional arguments are only sent when configured.
            if config.max_length is not None:
                arguments["x-max-length"] = config.max_length
            if config.max_length_bytes is not None:
                arguments["x-max-length-bytes"] = config.max_length_bytes
            if config.overflow:
                arguments["x-overflow"] = config.overflow
            if config.dead_letter_exchange:
                arguments["x-dead-letter-exchange"] = config.dead_letter_exchange
            if config.dead_letter_routing_key:
                arguments["x-dead-letter-routing-key"] = config.dead_letter_routing_key
            if config.message_ttl is not None:
                arguments["x-message-ttl"] = config.message_ttl
            if config.delivery_limit is not None:
                arguments["x-delivery-limit"] = config.delivery_limit

            queue_data = {
                "durable": config.durable,
                "auto_delete": config.auto_delete,
                "arguments": arguments
            }
            if self.management_api.create_queue(vhost, config.name, queue_data):
                self.logger.info(f"仲裁队列 {config.name} 创建成功")
                return True
            self.logger.error(f"仲裁队列 {config.name} 创建失败")
            return False
        except Exception as e:
            self.logger.error(f"创建仲裁队列失败: {e}")
            return False

    def get_quorum_queue_info(self, queue_name: str, vhost: str = "/") -> Optional[Dict[str, Any]]:
        """Return selected fields of a quorum queue.

        Returns:
            A dict of queue fields, or None when the queue is missing, is not
            of type ``quorum``, or the API call raises.
        """
        try:
            queue_info = self.management_api.get_queue(vhost, queue_name)
            if not queue_info or queue_info.get('arguments', {}).get('x-queue-type') != 'quorum':
                return None
            return {
                'name': queue_info.get('name'),
                'vhost': queue_info.get('vhost'),
                'durable': queue_info.get('durable'),
                'auto_delete': queue_info.get('auto_delete'),
                'messages': queue_info.get('messages', 0),
                'messages_ready': queue_info.get('messages_ready', 0),
                'messages_unacknowledged': queue_info.get('messages_unacknowledged', 0),
                'consumers': queue_info.get('consumers', 0),
                'memory': queue_info.get('memory', 0),
                'leader': queue_info.get('leader'),
                'members': queue_info.get('members', []),
                'online': queue_info.get('online', []),
                'state': queue_info.get('state'),
                'arguments': queue_info.get('arguments', {})
            }
        except Exception as e:
            self.logger.error(f"获取仲裁队列信息失败: {e}")
            return None

    def add_quorum_queue_member(self, queue_name: str, node_name: str, vhost: str = "/") -> bool:
        """Add ``rabbit@<node_name>`` to the queue by sending a ``grow`` action."""
        try:
            action_data = {
                "action": "grow",
                "node": f"rabbit@{node_name}"
            }
            if self.management_api.queue_action(vhost, queue_name, action_data):
                self.logger.info(f"节点 {node_name} 已添加到仲裁队列 {queue_name}")
                return True
            self.logger.error(f"添加节点到仲裁队列失败")
            return False
        except Exception as e:
            self.logger.error(f"添加仲裁队列成员失败: {e}")
            return False

    def remove_quorum_queue_member(self, queue_name: str, node_name: str, vhost: str = "/") -> bool:
        """Remove ``rabbit@<node_name>`` from the queue by sending a ``shrink`` action."""
        try:
            action_data = {
                "action": "shrink",
                "node": f"rabbit@{node_name}"
            }
            if self.management_api.queue_action(vhost, queue_name, action_data):
                self.logger.info(f"节点 {node_name} 已从仲裁队列 {queue_name} 移除")
                return True
            self.logger.error(f"从仲裁队列移除节点失败")
            return False
        except Exception as e:
            self.logger.error(f"移除仲裁队列成员失败: {e}")
            return False
def delete_quorum_queue_member_data(self, queue_name: str, node_name: str, vhost: str = "/") -> bool:
"""删除仲裁队列成员数据"""
try:
action_data = {
"action": "delete_member",
"node": f"rabbit@{node_name}"
}
success = self.management_api.queue_action(vhost, queue_name, action_data)
if success:
self.logger.info(f"节点 {node_name} 的仲裁队列数据已删除")
return True
else:
self.logger.error(f"删除仲裁队列成员数据失败")
return False
except Exception as e:
self.logger.error(f"删除仲裁队列成员数据失败: {e}")
return False
7.3 负载均衡与故障转移
7.3.1 负载均衡配置
class LoadBalancingStrategy(Enum):
    """Node-selection policies a load balancer can be configured with."""

    # Cycle through the candidates in order.
    ROUND_ROBIN = "round_robin"
    # Prefer the candidate with the fewest current connections.
    LEAST_CONNECTIONS = "least_connections"
    # Random pick biased by node weight.
    WEIGHTED = "weighted"
    # Uniform random pick.
    RANDOM = "random"
    # Pick derived from a hash of the client identifier.
    HASH = "hash"
@dataclass
class NodeWeight:
    """Per-node bookkeeping record used when choosing a node for a client."""

    # Identifier of the node this record describes.
    node_name: str
    # Relative weight of the node.
    weight: int
    # Upper bound on connections the node should carry.
    max_connections: int
    # Most recently reported number of connections.
    current_connections: int = 0
    # Health indicator; defaults to 1.0.
    health_score: float = 1.0
    # Timestamp of the last health update; 0 until one is recorded.
    last_check: float = 0
class LoadBalancer:
    """Pick a node for a new client according to a configurable strategy.

    Only nodes whose ``health_score`` is above ``min_health_score`` and whose
    ``current_connections`` is below ``max_connections`` are candidates. The
    node list is guarded by ``self.lock``; the ``_*_select`` helpers expect
    the caller to already hold it.
    """

    def __init__(self, strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
                 min_health_score: float = 0.5):
        """Create an empty balancer.

        Args:
            strategy: Selection policy used by ``select_node``.
            min_health_score: A node is a candidate only when its health
                score is strictly greater than this value.
        """
        self.strategy = strategy
        self.min_health_score = min_health_score
        self.nodes: List[NodeWeight] = []
        self.current_index = 0
        self.lock = threading.Lock()
        self.logger = logging.getLogger(__name__)

    def add_node(self, node: NodeWeight):
        """Register *node*, replacing any existing entry with the same name."""
        with self.lock:
            # Replace rather than append: the update_* methods only touch the
            # first match, so a duplicate entry would go stale.
            for position, existing in enumerate(self.nodes):
                if existing.node_name == node.node_name:
                    self.nodes[position] = node
                    break
            else:
                self.nodes.append(node)
            self.logger.info(f"节点 {node.node_name} 已添加到负载均衡器")

    def remove_node(self, node_name: str):
        """Drop every entry named *node_name*; a no-op if none exists."""
        with self.lock:
            self.nodes = [node for node in self.nodes if node.node_name != node_name]
            self.logger.info(f"节点 {node_name} 已从负载均衡器移除")

    def select_node(self, client_id: Optional[str] = None) -> Optional[NodeWeight]:
        """Return a candidate node chosen by the configured strategy.

        Args:
            client_id: Used only by the HASH strategy; None is treated as "".

        Returns:
            The chosen node, or None when no node is currently eligible.
        """
        with self.lock:
            available_nodes = [node for node in self.nodes
                               if node.health_score > self.min_health_score
                               and node.current_connections < node.max_connections]
            if not available_nodes:
                self.logger.warning("没有可用的节点")
                return None

            if self.strategy == LoadBalancingStrategy.ROUND_ROBIN:
                return self._round_robin_select(available_nodes)
            if self.strategy == LoadBalancingStrategy.LEAST_CONNECTIONS:
                return self._least_connections_select(available_nodes)
            if self.strategy == LoadBalancingStrategy.WEIGHTED:
                return self._weighted_select(available_nodes)
            if self.strategy == LoadBalancingStrategy.RANDOM:
                return self._random_select(available_nodes)
            if self.strategy == LoadBalancingStrategy.HASH:
                return self._hash_select(available_nodes, client_id or "")
            # Unknown strategy value: fall back to the first candidate.
            return available_nodes[0]

    def _round_robin_select(self, nodes: List[NodeWeight]) -> Optional[NodeWeight]:
        """Return the next node in rotation, or None for an empty list."""
        if not nodes:
            return None
        # The candidate list can change size between calls, so the stored
        # index is reduced modulo the current length on both read and write.
        node = nodes[self.current_index % len(nodes)]
        self.current_index = (self.current_index + 1) % len(nodes)
        return node

    def _least_connections_select(self, nodes: List[NodeWeight]) -> NodeWeight:
        """Return the node with the fewest current connections."""
        return min(nodes, key=lambda n: n.current_connections)

    def _weighted_select(self, nodes: List[NodeWeight]) -> NodeWeight:
        """Return a random node with probability proportional to weight * health_score.

        Nodes with a non-positive effective weight are never chosen while at
        least one node has a positive one; if none does, the first node is
        returned.
        """
        import random

        weighted = [(node, node.weight * node.health_score) for node in nodes]
        total_weight = sum(share for _, share in weighted if share > 0)
        if total_weight <= 0:
            return nodes[0]

        threshold = random.uniform(0, total_weight)
        running_total = 0.0
        chosen = nodes[0]
        for node, share in weighted:
            if share <= 0:
                continue
            chosen = node
            running_total += share
            if running_total >= threshold:
                return node
        # Floating-point rounding can leave the running total just short of
        # the threshold; fall back to the last positively weighted node.
        return chosen

    def _random_select(self, nodes: List[NodeWeight]) -> NodeWeight:
        """Return a uniformly random node."""
        import random
        return random.choice(nodes)

    def _hash_select(self, nodes: List[NodeWeight], client_id: str) -> NodeWeight:
        """Return the node indexed by the MD5 hash of *client_id*.

        The mapping is stable only while the candidate list is unchanged.
        """
        import hashlib
        # MD5 gives the same digest in every process, unlike the built-in
        # hash() for str, which is salted per interpreter run.
        hash_value = int(hashlib.md5(client_id.encode()).hexdigest(), 16)
        return nodes[hash_value % len(nodes)]

    def update_node_connections(self, node_name: str, connections: int):
        """Record the current connection count of *node_name* (first match only)."""
        with self.lock:
            for node in self.nodes:
                if node.node_name == node_name:
                    node.current_connections = connections
                    break

    def update_node_health(self, node_name: str, health_score: float):
        """Record a new health score for *node_name* and stamp the check time."""
        with self.lock:
            for node in self.nodes:
                if node.node_name == node_name:
                    node.health_score = health_score
                    node.last_check = time.time()
                    break

    def get_node_stats(self) -> List[Dict[str, Any]]:
        """Return one dict per registered node, including its utilization ratio.

        ``utilization`` is ``current_connections / max_connections``, or 0
        when ``max_connections`` is not positive.
        """
        with self.lock:
            return [{
                'node_name': node.node_name,
                'weight': node.weight,
                'max_connections': node.max_connections,
                'current_connections': node.current_connections,
                'health_score': node.health_score,
                'last_check': node.last_check,
                'utilization': node.current_connections / node.max_connections if node.max_connections > 0 else 0
            } for node in self.nodes]
7.3.2 故障转移管理器
class FailoverStrategy(Enum):
    """Available failover modes."""

    # Switch over immediately.
    IMMEDIATE = "immediate"
    # Switch over gracefully.
    GRACEFUL = "graceful"
    # Switch over only on manual request.
    MANUAL = "manual"
@dataclass
class FailoverConfig:
    """Tunable settings for failover handling; every field has a default."""

    # Failover mode.
    strategy: FailoverStrategy = FailoverStrategy.GRACEFUL
    # Pause between two health-check rounds (passed to a sleep, so seconds).
    health_check_interval: int = 30
    # Unhealthy checks counted before a node is failed over.
    failure_threshold: int = 3
    # Healthy checks counted before a failed node is recovered.
    recovery_threshold: int = 2
    # Timeout setting; unit not established here -- presumably seconds, confirm with users of this field.
    timeout: int = 60
    # When False, failed nodes are never recovered automatically.
    enable_auto_recovery: bool = True
    # Notification switch.
    notification_enabled: bool = True
# Callable is used in the annotations below but is not in the typing import
# list at the top of the file, so it is imported here.
from typing import Callable


class FailoverManager:
    """Watch node health and take failed nodes out of the load balancer.

    A background thread polls ``cluster_manager.get_cluster_info()``. A node
    that is not RUNNING for ``failure_threshold`` checks is failed over: it is
    removed from the load balancer and the failover callbacks are invoked with
    a backup node when one is available. A failed node that is RUNNING for
    ``recovery_threshold`` checks is recovered, provided
    ``enable_auto_recovery`` is set, and its load-balancer entry is restored.
    """

    def __init__(self, config: FailoverConfig, load_balancer: LoadBalancer,
                 cluster_manager: ClusterConfigManager):
        """Store collaborators and initialise empty failure/recovery state."""
        self.config = config
        self.load_balancer = load_balancer
        self.cluster_manager = cluster_manager
        self.logger = logging.getLogger(__name__)
        # Health-check counters per node name.
        self.node_failures: Dict[str, int] = {}
        self.node_recoveries: Dict[str, int] = {}
        self.failed_nodes: Set[str] = set()
        self.monitoring = False
        self.monitor_thread = None
        self.failover_callbacks: List[Callable[[str, str], None]] = []
        self.recovery_callbacks: List[Callable[[str], None]] = []
        # Load-balancer entries taken out by a failover, kept so that a
        # recovery can put the same entry back.
        self._removed_nodes: Dict[str, Any] = {}
        # Set by stop_monitoring() to wake the monitor thread immediately.
        self._stop_event = threading.Event()

    def start_monitoring(self):
        """Start the background health-check thread; no-op if already running."""
        if self.monitoring:
            return
        self._stop_event.clear()
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        self.logger.info("故障转移监控已启动")

    def stop_monitoring(self):
        """Ask the monitor thread to stop and wait up to 5 seconds for it."""
        self.monitoring = False
        self._stop_event.set()
        worker = self.monitor_thread
        # A callback running on the monitor thread may call this method;
        # joining the current thread would raise RuntimeError.
        if worker and worker is not threading.current_thread():
            worker.join(timeout=5)
        self.logger.info("故障转移监控已停止")

    def _monitor_loop(self):
        """Run health checks every ``health_check_interval`` seconds until stopped."""
        while self.monitoring:
            try:
                self._check_node_health()
            except Exception as e:
                self.logger.error(f"健康检查失败: {e}")
            # Wait on the event instead of sleeping so stop_monitoring() does
            # not have to sit out a full interval.
            if self._stop_event.wait(self.config.health_check_interval):
                break

    def _check_node_health(self):
        """Fetch cluster info once and update counters for every node in it."""
        try:
            cluster_info = self.cluster_manager.get_cluster_info()
            for node in cluster_info.nodes:
                if node.status == NodeStatus.RUNNING:
                    self._handle_healthy_node(node.name)
                else:
                    self._handle_unhealthy_node(node.name)
        except Exception as e:
            self.logger.error(f"检查节点健康状态失败: {e}")

    def _handle_healthy_node(self, node_name: str):
        """Reset the failure count; recover the node once it has been healthy long enough."""
        if node_name in self.node_failures:
            self.node_failures[node_name] = 0

        # Only nodes that were failed over accumulate recovery credit.
        if node_name in self.failed_nodes:
            self.node_recoveries[node_name] = self.node_recoveries.get(node_name, 0) + 1
            if self.node_recoveries[node_name] >= self.config.recovery_threshold:
                self._recover_node(node_name)

    def _handle_unhealthy_node(self, node_name: str):
        """Count a failed check; trigger failover when the threshold is reached."""
        self.node_failures[node_name] = self.node_failures.get(node_name, 0) + 1

        # An unhealthy check interrupts any recovery streak.
        if node_name in self.node_recoveries:
            self.node_recoveries[node_name] = 0

        if (self.node_failures[node_name] >= self.config.failure_threshold and
                node_name not in self.failed_nodes):
            self._trigger_failover(node_name)

    def _detach_from_load_balancer(self, node_name: str):
        """Remove *node_name* from the load balancer, remembering its entry for recovery."""
        for entry in list(getattr(self.load_balancer, "nodes", [])):
            if getattr(entry, "node_name", None) == node_name:
                self._removed_nodes[node_name] = entry
                break
        self.load_balancer.remove_node(node_name)

    def _trigger_failover(self, failed_node: str):
        """Mark *failed_node* as failed, detach it and notify failover callbacks.

        Callbacks run only when a backup node is found; each callback's
        exception is logged and does not stop the others.
        """
        try:
            self.logger.warning(f"节点 {failed_node} 故障,开始故障转移")
            self.failed_nodes.add(failed_node)
            self._detach_from_load_balancer(failed_node)

            backup_node = self._select_backup_node(failed_node)
            if backup_node:
                self.logger.info(f"故障转移到节点: {backup_node}")
                for callback in self.failover_callbacks:
                    try:
                        callback(failed_node, backup_node)
                    except Exception as e:
                        self.logger.error(f"故障转移回调执行失败: {e}")
            else:
                self.logger.error("没有可用的备用节点")
        except Exception as e:
            self.logger.error(f"故障转移失败: {e}")

    def _recover_node(self, node_name: str):
        """Clear the failed state of *node_name* and restore its load-balancer entry.

        Does nothing when ``enable_auto_recovery`` is False.
        """
        try:
            if not self.config.enable_auto_recovery:
                return
            self.logger.info(f"节点 {node_name} 恢复,重新加入集群")
            self.failed_nodes.discard(node_name)
            self.node_failures[node_name] = 0
            self.node_recoveries[node_name] = 0

            for callback in self.recovery_callbacks:
                try:
                    callback(node_name)
                except Exception as e:
                    self.logger.error(f"恢复回调执行失败: {e}")

            # Restore the entry removed at failover time. This runs after the
            # callbacks and only if the node is still absent, so a callback
            # that re-registers the node itself does not produce a duplicate.
            saved_entry = self._removed_nodes.pop(node_name, None)
            if saved_entry is not None:
                already_present = any(
                    getattr(entry, "node_name", None) == node_name
                    for entry in list(getattr(self.load_balancer, "nodes", []))
                )
                if not already_present:
                    self.load_balancer.add_node(saved_entry)
        except Exception as e:
            self.logger.error(f"节点恢复失败: {e}")

    def _select_backup_node(self, failed_node: str) -> Optional[str]:
        """Return the first RUNNING node that is neither *failed_node* nor marked failed.

        Returns None when no such node exists or the cluster query raises.
        """
        try:
            cluster_info = self.cluster_manager.get_cluster_info()
            for node in cluster_info.nodes:
                if (node.name != failed_node and
                        node.status == NodeStatus.RUNNING and
                        node.name not in self.failed_nodes):
                    return node.name
            return None
        except Exception as e:
            self.logger.error(f"选择备用节点失败: {e}")
            return None

    def add_failover_callback(self, callback: Callable[[str, str], None]):
        """Register ``callback(failed_node, backup_node)`` to run on failover."""
        self.failover_callbacks.append(callback)

    def add_recovery_callback(self, callback: Callable[[str], None]):
        """Register ``callback(node_name)`` to run when a node is recovered."""
        self.recovery_callbacks.append(callback)

    def manual_failover(self, source_node: str, target_node: str) -> bool:
        """Fail *source_node* over to *target_node* on request.

        The source node is marked failed and detached from the load balancer,
        then every failover callback is invoked. Because the node is in
        ``failed_nodes``, the monitor will recover it like any other failed
        node once it has been healthy for ``recovery_threshold`` checks.

        Returns:
            True on completion, False if an unexpected error occurred.
        """
        try:
            self.logger.info(f"手动故障转移: {source_node} -> {target_node}")
            self.failed_nodes.add(source_node)
            self._detach_from_load_balancer(source_node)

            for callback in self.failover_callbacks:
                try:
                    callback(source_node, target_node)
                except Exception as e:
                    self.logger.error(f"手动故障转移回调执行失败: {e}")
            return True
        except Exception as e:
            self.logger.error(f"手动故障转移失败: {e}")
            return False

    def get_failover_status(self) -> Dict[str, Any]:
        """Return a snapshot of monitoring state, counters and key settings."""
        return {
            'monitoring': self.monitoring,
            'failed_nodes': list(self.failed_nodes),
            'node_failures': dict(self.node_failures),
            'node_recoveries': dict(self.node_recoveries),
            'config': {
                'strategy': self.config.strategy.value,
                'health_check_interval': self.config.health_check_interval,
                'failure_threshold': self.config.failure_threshold,
                'recovery_threshold': self.config.recovery_threshold,
                'enable_auto_recovery': self.config.enable_auto_recovery
            }
        }