学习目标

通过本章学习,你将能够:

  • 理解 Nacos 集群架构和部署模式
  • 掌握 Nacos 集群的安装和配置
  • 实现 Nacos 的高可用部署
  • 配置负载均衡和故障转移
  • 监控集群状态和性能

1. 集群架构概述

1.1 集群架构设计
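
下面先用一段 Python 对集群的核心概念(节点状态与角色、心跳、健康检查、领导者选举、负载均衡)做一个教学用的简化模拟。注意:这只是帮助理解架构的示意代码,并非 Nacos 的真实实现。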

from enum import Enum
from dataclasses import dataclass, field
from typing import Any, List, Dict, Optional
import time
import threading
import random
import hashlib

class NodeStatus(Enum):
    """节点状态枚举"""
    HEALTHY = "HEALTHY"          # 健康
    UNHEALTHY = "UNHEALTHY"      # 不健康
    STARTING = "STARTING"        # 启动中
    STOPPING = "STOPPING"        # 停止中
    DOWN = "DOWN"                # 宕机
    MAINTENANCE = "MAINTENANCE"  # 维护中

class NodeRole(Enum):
    """节点角色枚举"""
    LEADER = "LEADER"            # 领导者
    FOLLOWER = "FOLLOWER"        # 跟随者
    CANDIDATE = "CANDIDATE"      # 候选者
    OBSERVER = "OBSERVER"        # 观察者

class ClusterMode(Enum):
    """集群模式枚举"""
    STANDALONE = "STANDALONE"    # 单机模式
    CLUSTER = "CLUSTER"          # 集群模式
    EMBEDDED = "EMBEDDED"        # 嵌入式模式

class HealthCheckType(Enum):
    """健康检查类型枚举"""
    TCP = "TCP"                  # TCP 检查
    HTTP = "HTTP"                # HTTP 检查
    MYSQL = "MYSQL"              # MySQL 检查
    CUSTOM = "CUSTOM"            # 自定义检查

@dataclass
class ClusterNode:
    """集群节点信息"""
    node_id: str                 # 节点 ID
    ip: str                      # IP 地址
    port: int                    # 端口
    role: NodeRole = NodeRole.FOLLOWER
    status: NodeStatus = NodeStatus.STARTING
    weight: float = 1.0          # 权重
    last_heartbeat: float = field(default_factory=time.time)
    start_time: float = field(default_factory=time.time)
    version: str = "2.2.0"       # Nacos 版本
    metadata: Dict[str, str] = field(default_factory=dict)
    
    def get_address(self) -> str:
        """获取节点地址"""
        return f"{self.ip}:{self.port}"
    
    def is_healthy(self, timeout: float = 30.0) -> bool:
        """检查节点是否健康"""
        if self.status == NodeStatus.DOWN:
            return False
        
        # 检查心跳超时
        current_time = time.time()
        return (current_time - self.last_heartbeat) < timeout
    
    def update_heartbeat(self):
        """更新心跳时间"""
        self.last_heartbeat = time.time()
        if self.status == NodeStatus.STARTING:
            self.status = NodeStatus.HEALTHY

@dataclass
class ClusterConfig:
    """集群配置"""
    cluster_name: str = "nacos-cluster"
    mode: ClusterMode = ClusterMode.CLUSTER
    min_nodes: int = 3           # 最小节点数
    max_nodes: int = 10          # 最大节点数
    heartbeat_interval: float = 5.0  # 心跳间隔(秒)
    election_timeout: float = 15.0   # 选举超时(秒)
    health_check_interval: float = 10.0  # 健康检查间隔
    data_sync_interval: float = 30.0     # 数据同步间隔
    enable_auth: bool = True     # 启用认证
    enable_ssl: bool = False     # 启用 SSL
    backup_count: int = 2        # 备份数量

@dataclass
class ClusterMetrics:
    """集群指标"""
    total_nodes: int = 0
    healthy_nodes: int = 0
    leader_node: Optional[str] = None
    total_services: int = 0
    total_instances: int = 0
    total_configs: int = 0
    qps: float = 0.0             # 每秒查询数
    avg_response_time: float = 0.0  # 平均响应时间
    memory_usage: float = 0.0    # 内存使用率
    cpu_usage: float = 0.0       # CPU 使用率
    disk_usage: float = 0.0      # 磁盘使用率
    network_in: float = 0.0      # 网络入流量
    network_out: float = 0.0     # 网络出流量
    last_update: float = field(default_factory=time.time)

class NacosClusterManager:
    """Nacos 集群管理器"""
    
    def __init__(self, config: ClusterConfig):
        self.config = config
        self.nodes: Dict[str, ClusterNode] = {}
        self.leader_node: Optional[str] = None
        self.metrics = ClusterMetrics()
        self._lock = threading.RLock()
        self._running = False
        self._threads = []
        
    def start_cluster(self):
        """启动集群"""
        with self._lock:
            if self._running:
                print("⚠️ 集群已经在运行中")
                return
            
            self._running = True
            
            # 启动心跳线程
            heartbeat_thread = threading.Thread(target=self._heartbeat_worker, daemon=True)
            heartbeat_thread.start()
            self._threads.append(heartbeat_thread)
            
            # 启动健康检查线程
            health_thread = threading.Thread(target=self._health_check_worker, daemon=True)
            health_thread.start()
            self._threads.append(health_thread)
            
            # 启动领导者选举线程
            election_thread = threading.Thread(target=self._leader_election_worker, daemon=True)
            election_thread.start()
            self._threads.append(election_thread)
            
            # 启动指标收集线程
            metrics_thread = threading.Thread(target=self._metrics_worker, daemon=True)
            metrics_thread.start()
            self._threads.append(metrics_thread)
            
            print(f"✅ Nacos 集群 '{self.config.cluster_name}' 启动成功")
    
    def stop_cluster(self):
        """停止集群"""
        with self._lock:
            if not self._running:
                print("⚠️ 集群未在运行")
                return
            
            self._running = False
            
            # 等待线程结束
            for thread in self._threads:
                if thread.is_alive():
                    thread.join(timeout=5)
            
            print(f"✅ Nacos 集群 '{self.config.cluster_name}' 已停止")
    
    def add_node(self, node: ClusterNode) -> bool:
        """添加节点"""
        with self._lock:
            if len(self.nodes) >= self.config.max_nodes:
                print(f"❌ 集群节点数已达上限: {self.config.max_nodes}")
                return False
            
            if node.node_id in self.nodes:
                print(f"❌ 节点已存在: {node.node_id}")
                return False
            
            self.nodes[node.node_id] = node
            print(f"✅ 节点添加成功: {node.node_id} ({node.get_address()})")
            
            # 触发领导者选举
            if len(self.nodes) >= self.config.min_nodes and not self.leader_node:
                self._trigger_leader_election()
            
            return True
    
    def remove_node(self, node_id: str) -> bool:
        """移除节点"""
        with self._lock:
            if node_id not in self.nodes:
                print(f"❌ 节点不存在: {node_id}")
                return False
            
            node = self.nodes[node_id]
            node.status = NodeStatus.STOPPING
            
            # 如果是领导者节点,触发重新选举
            if self.leader_node == node_id:
                self.leader_node = None
                self._trigger_leader_election()
            
            del self.nodes[node_id]
            print(f"✅ 节点移除成功: {node_id}")
            
            return True
    
    def get_healthy_nodes(self) -> List[ClusterNode]:
        """获取健康节点列表"""
        with self._lock:
            healthy_nodes = []
            for node in self.nodes.values():
                if node.is_healthy() and node.status == NodeStatus.HEALTHY:
                    healthy_nodes.append(node)
            return healthy_nodes
    
    def get_leader_node(self) -> Optional[ClusterNode]:
        """获取领导者节点"""
        with self._lock:
            if self.leader_node and self.leader_node in self.nodes:
                return self.nodes[self.leader_node]
            return None
    
    def _heartbeat_worker(self):
        """心跳工作线程"""
        while self._running:
            try:
                with self._lock:
                    current_time = time.time()
                    for node in self.nodes.values():
                        # 模拟心跳更新
                        if node.status == NodeStatus.HEALTHY:
                            # 随机模拟心跳丢失
                            if random.random() > 0.05:  # 95% 成功率
                                node.update_heartbeat()
                
                time.sleep(self.config.heartbeat_interval)
            except Exception as e:
                print(f"❌ 心跳工作线程异常: {e}")
                time.sleep(1)
    
    def _health_check_worker(self):
        """健康检查工作线程"""
        while self._running:
            try:
                with self._lock:
                    unhealthy_nodes = []
                    for node_id, node in self.nodes.items():
                        if not node.is_healthy():
                            if node.status == NodeStatus.HEALTHY:
                                node.status = NodeStatus.UNHEALTHY
                                unhealthy_nodes.append(node_id)
                                print(f"⚠️ 节点变为不健康: {node_id}")
                    
                    # 如果领导者不健康,触发重新选举
                    if self.leader_node in unhealthy_nodes:
                        print(f"⚠️ 领导者节点不健康,触发重新选举: {self.leader_node}")
                        self.leader_node = None
                        self._trigger_leader_election()
                
                time.sleep(self.config.health_check_interval)
            except Exception as e:
                print(f"❌ 健康检查工作线程异常: {e}")
                time.sleep(1)
    
    def _leader_election_worker(self):
        """领导者选举工作线程"""
        while self._running:
            try:
                with self._lock:
                    if not self.leader_node:
                        healthy_nodes = self.get_healthy_nodes()
                        if len(healthy_nodes) >= self.config.min_nodes:
                            # 简单的选举算法:选择节点 ID 最小的健康节点
                            leader_candidate = min(healthy_nodes, key=lambda n: n.node_id)
                            self.leader_node = leader_candidate.node_id
                            leader_candidate.role = NodeRole.LEADER
                            
                            # 设置其他节点为跟随者
                            for node in healthy_nodes:
                                if node.node_id != self.leader_node:
                                    node.role = NodeRole.FOLLOWER
                            
                            print(f"🎯 新领导者选举成功: {self.leader_node}")
                
                time.sleep(self.config.election_timeout)
            except Exception as e:
                print(f"❌ 领导者选举工作线程异常: {e}")
                time.sleep(1)
    
    def _metrics_worker(self):
        """指标收集工作线程"""
        while self._running:
            try:
                with self._lock:
                    healthy_nodes = self.get_healthy_nodes()
                    
                    self.metrics.total_nodes = len(self.nodes)
                    self.metrics.healthy_nodes = len(healthy_nodes)
                    self.metrics.leader_node = self.leader_node
                    
                    # 模拟业务指标
                    self.metrics.total_services = random.randint(50, 200)
                    self.metrics.total_instances = random.randint(100, 1000)
                    self.metrics.total_configs = random.randint(20, 100)
                    self.metrics.qps = random.uniform(100, 1000)
                    self.metrics.avg_response_time = random.uniform(10, 100)
                    
                    # 模拟系统指标
                    self.metrics.memory_usage = random.uniform(30, 80)
                    self.metrics.cpu_usage = random.uniform(10, 60)
                    self.metrics.disk_usage = random.uniform(20, 70)
                    self.metrics.network_in = random.uniform(1, 50)
                    self.metrics.network_out = random.uniform(1, 50)
                    
                    self.metrics.last_update = time.time()
                
                time.sleep(30)  # 每30秒更新一次指标
            except Exception as e:
                print(f"❌ 指标收集工作线程异常: {e}")
                time.sleep(1)
    
    def _trigger_leader_election(self):
        """触发领导者选举"""
        # 重置所有节点角色
        for node in self.nodes.values():
            if node.status == NodeStatus.HEALTHY:
                node.role = NodeRole.CANDIDATE
        
        print("🗳️ 触发领导者选举")
    
    def get_cluster_status(self) -> Dict[str, Any]:
        """获取集群状态"""
        with self._lock:
            healthy_nodes = self.get_healthy_nodes()
            
            status = {
                "cluster_name": self.config.cluster_name,
                "mode": self.config.mode.value,
                "total_nodes": len(self.nodes),
                "healthy_nodes": len(healthy_nodes),
                "leader_node": self.leader_node,
                "min_nodes": self.config.min_nodes,
                "is_cluster_healthy": len(healthy_nodes) >= self.config.min_nodes,
                "nodes": {},
                "metrics": {
                    "total_services": self.metrics.total_services,
                    "total_instances": self.metrics.total_instances,
                    "total_configs": self.metrics.total_configs,
                    "qps": self.metrics.qps,
                    "avg_response_time": self.metrics.avg_response_time,
                    "memory_usage": self.metrics.memory_usage,
                    "cpu_usage": self.metrics.cpu_usage,
                    "disk_usage": self.metrics.disk_usage
                }
            }
            
            # 添加节点详细信息
            for node_id, node in self.nodes.items():
                status["nodes"][node_id] = {
                    "address": node.get_address(),
                    "role": node.role.value,
                    "status": node.status.value,
                    "weight": node.weight,
                    "is_healthy": node.is_healthy(),
                    "uptime": time.time() - node.start_time,
                    "last_heartbeat": node.last_heartbeat,
                    "version": node.version
                }
            
            return status
    
    def scale_cluster(self, target_nodes: int) -> bool:
        """集群扩缩容"""
        with self._lock:
            current_nodes = len(self.nodes)
            
            if target_nodes < self.config.min_nodes:
                print(f"❌ 目标节点数不能少于最小节点数: {self.config.min_nodes}")
                return False
            
            if target_nodes > self.config.max_nodes:
                print(f"❌ 目标节点数不能超过最大节点数: {self.config.max_nodes}")
                return False
            
            if target_nodes == current_nodes:
                print(f"ℹ️ 当前节点数已是目标数量: {target_nodes}")
                return True
            
            if target_nodes > current_nodes:
                # 扩容
                for i in range(target_nodes - current_nodes):
                    node_id = f"nacos-node-{len(self.nodes) + 1}"
                    new_node = ClusterNode(
                        node_id=node_id,
                        ip=f"192.168.1.{100 + len(self.nodes)}",
                        port=8848,
                        status=NodeStatus.STARTING
                    )
                    self.add_node(new_node)
                
                print(f"✅ 集群扩容完成: {current_nodes} -> {target_nodes}")
            else:
                # 缩容
                nodes_to_remove = current_nodes - target_nodes
                follower_nodes = [node_id for node_id, node in self.nodes.items() 
                                if node.role == NodeRole.FOLLOWER and node.is_healthy()]
                
                for i in range(min(nodes_to_remove, len(follower_nodes))):
                    self.remove_node(follower_nodes[i])
                
                print(f"✅ 集群缩容完成: {current_nodes} -> {len(self.nodes)}")
            
            return True

# 负载均衡器
class LoadBalancer:
    """负载均衡器"""
    
    def __init__(self, cluster_manager: NacosClusterManager):
        self.cluster_manager = cluster_manager
        self.algorithms = {
            "round_robin": self._round_robin,
            "weighted_round_robin": self._weighted_round_robin,
            "least_connections": self._least_connections,
            "random": self._random,
            "hash": self._hash
        }
        self._round_robin_index = 0
        self._connections = {}  # 连接计数
    
    def select_node(self, algorithm: str = "round_robin",
                    client_id: Optional[str] = None) -> Optional[ClusterNode]:
        """选择节点"""
        healthy_nodes = self.cluster_manager.get_healthy_nodes()
        
        if not healthy_nodes:
            print("❌ 没有可用的健康节点")
            return None
        
        if algorithm not in self.algorithms:
            print(f"❌ 不支持的负载均衡算法: {algorithm}")
            algorithm = "round_robin"
        
        return self.algorithms[algorithm](healthy_nodes, client_id)
    
    def _round_robin(self, nodes: List[ClusterNode], client_id: Optional[str] = None) -> ClusterNode:
        """轮询算法"""
        node = nodes[self._round_robin_index % len(nodes)]
        self._round_robin_index += 1
        return node
    
    def _weighted_round_robin(self, nodes: List[ClusterNode], client_id: Optional[str] = None) -> ClusterNode:
        """加权轮询算法"""
        total_weight = sum(node.weight for node in nodes)
        if total_weight == 0:
            return self._round_robin(nodes, client_id)
        
        # 简化的加权轮询实现
        weighted_nodes = []
        for node in nodes:
            count = int(node.weight * 10)  # 放大10倍便于计算
            weighted_nodes.extend([node] * count)
        
        if weighted_nodes:
            node = weighted_nodes[self._round_robin_index % len(weighted_nodes)]
            self._round_robin_index += 1  # 不递增会导致每次都命中同一个节点
            return node
        return nodes[0]
    
    def _least_connections(self, nodes: List[ClusterNode], client_id: Optional[str] = None) -> ClusterNode:
        """最少连接算法"""
        min_connections = float('inf')
        selected_node = nodes[0]
        
        for node in nodes:
            connections = self._connections.get(node.node_id, 0)
            if connections < min_connections:
                min_connections = connections
                selected_node = node
        
        return selected_node
    
    def _random(self, nodes: List[ClusterNode], client_id: Optional[str] = None) -> ClusterNode:
        """随机算法"""
        return random.choice(nodes)
    
    def _hash(self, nodes: List[ClusterNode], client_id: Optional[str] = None) -> ClusterNode:
        """哈希算法"""
        if not client_id:
            return self._random(nodes, client_id)
        
        hash_value = int(hashlib.md5(client_id.encode()).hexdigest(), 16)
        return nodes[hash_value % len(nodes)]
    
    def add_connection(self, node_id: str):
        """增加连接计数"""
        self._connections[node_id] = self._connections.get(node_id, 0) + 1
    
    def remove_connection(self, node_id: str):
        """减少连接计数"""
        if node_id in self._connections:
            self._connections[node_id] = max(0, self._connections[node_id] - 1)

# 使用示例
if __name__ == "__main__":
    # 创建集群配置
    cluster_config = ClusterConfig(
        cluster_name="nacos-production",
        mode=ClusterMode.CLUSTER,
        min_nodes=3,
        max_nodes=10,
        heartbeat_interval=5.0,
        health_check_interval=10.0
    )
    
    # 创建集群管理器
    cluster_manager = NacosClusterManager(cluster_config)
    
    print("=== Nacos 集群部署示例 ===")
    
    # 添加集群节点
    nodes = [
        ClusterNode("nacos-1", "192.168.1.101", 8848, weight=1.0),
        ClusterNode("nacos-2", "192.168.1.102", 8848, weight=1.5),
        ClusterNode("nacos-3", "192.168.1.103", 8848, weight=1.2),
        ClusterNode("nacos-4", "192.168.1.104", 8848, weight=0.8)
    ]
    
    for node in nodes:
        cluster_manager.add_node(node)
    
    # 启动集群
    cluster_manager.start_cluster()
    
    # 等待集群稳定
    time.sleep(3)
    
    # 查看集群状态
    status = cluster_manager.get_cluster_status()
    print(f"\n集群状态:")
    print(f"  集群名称: {status['cluster_name']}")
    print(f"  总节点数: {status['total_nodes']}")
    print(f"  健康节点数: {status['healthy_nodes']}")
    print(f"  领导者节点: {status['leader_node']}")
    print(f"  集群健康: {status['is_cluster_healthy']}")
    
    # 创建负载均衡器
    load_balancer = LoadBalancer(cluster_manager)
    
    print("\n=== 负载均衡测试 ===")
    
    # 测试不同的负载均衡算法
    algorithms = ["round_robin", "weighted_round_robin", "random", "hash"]
    
    for algorithm in algorithms:
        print(f"\n{algorithm} 算法测试:")
        for i in range(5):
            node = load_balancer.select_node(algorithm, f"client-{i}")
            if node:
                print(f"  请求 {i+1}: {node.node_id} ({node.get_address()})")
    
    # 模拟节点故障
    print("\n=== 故障转移测试 ===")
    
    # 模拟节点故障
    if "nacos-2" in cluster_manager.nodes:
        cluster_manager.nodes["nacos-2"].status = NodeStatus.DOWN
        print("模拟 nacos-2 节点故障")
    
    time.sleep(12)  # 等待健康检查
    
    # 再次查看集群状态
    status = cluster_manager.get_cluster_status()
    print(f"\n故障后集群状态:")
    print(f"  健康节点数: {status['healthy_nodes']}")
    print(f"  领导者节点: {status['leader_node']}")
    
    # 集群扩容测试
    print("\n=== 集群扩容测试 ===")
    cluster_manager.scale_cluster(6)
    
    time.sleep(2)
    
    final_status = cluster_manager.get_cluster_status()
    print(f"扩容后节点数: {final_status['total_nodes']}")
    
    # 停止集群
    time.sleep(5)
    cluster_manager.stop_cluster()

2. 集群部署配置

2.1 数据库配置

# MySQL 集群配置
# docker-compose-mysql-cluster.yml
version: '3.8'

services:
  mysql-master:
    image: mysql:8.0
    container_name: mysql-master
    environment:
      MYSQL_ROOT_PASSWORD: nacos123
      MYSQL_DATABASE: nacos
      MYSQL_USER: nacos
      MYSQL_PASSWORD: nacos123
    ports:
      - "3306:3306"
    volumes:
      - mysql-master-data:/var/lib/mysql
      - ./mysql/master.cnf:/etc/mysql/conf.d/master.cnf
      - ./mysql/init.sql:/docker-entrypoint-initdb.d/init.sql
    command: --server-id=1 --log-bin=mysql-bin --binlog-format=ROW
    networks:
      - nacos-network

  mysql-slave1:
    image: mysql:8.0
    container_name: mysql-slave1
    environment:
      MYSQL_ROOT_PASSWORD: nacos123
      MYSQL_DATABASE: nacos
      MYSQL_USER: nacos
      MYSQL_PASSWORD: nacos123
    ports:
      - "3307:3306"
    volumes:
      - mysql-slave1-data:/var/lib/mysql
      - ./mysql/slave.cnf:/etc/mysql/conf.d/slave.cnf
    command: --server-id=2 --relay-log=mysql-relay-bin
    depends_on:
      - mysql-master
    networks:
      - nacos-network

  mysql-slave2:
    image: mysql:8.0
    container_name: mysql-slave2
    environment:
      MYSQL_ROOT_PASSWORD: nacos123
      MYSQL_DATABASE: nacos
      MYSQL_USER: nacos
      MYSQL_PASSWORD: nacos123
    ports:
      - "3308:3306"
    volumes:
      - mysql-slave2-data:/var/lib/mysql
      - ./mysql/slave.cnf:/etc/mysql/conf.d/slave.cnf
    command: --server-id=3 --relay-log=mysql-relay-bin
    depends_on:
      - mysql-master
    networks:
      - nacos-network

volumes:
  mysql-master-data:
  mysql-slave1-data:
  mysql-slave2-data:

networks:
  nacos-network:
    driver: bridge

2.2 Nacos 集群配置

# Nacos 集群部署配置
# docker-compose-nacos-cluster.yml
version: '3.8'

services:
  nacos1:
    image: nacos/nacos-server:v2.2.0
    container_name: nacos1
    environment:
      - PREFER_HOST_MODE=hostname
      - MODE=cluster
      - NACOS_APPLICATION_PORT=8848
      - NACOS_SERVERS=nacos1:8848 nacos2:8848 nacos3:8848
      - MYSQL_SERVICE_HOST=mysql-master
      - MYSQL_SERVICE_PORT=3306
      - MYSQL_SERVICE_DB_NAME=nacos
      - MYSQL_SERVICE_USER=nacos
      - MYSQL_SERVICE_PASSWORD=nacos123
      - MYSQL_SERVICE_DB_PARAM=characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false&allowPublicKeyRetrieval=true
      - NACOS_AUTH_ENABLE=true
      - NACOS_AUTH_TOKEN=SecretKey012345678901234567890123456789012345678901234567890123456789
      - NACOS_AUTH_IDENTITY_KEY=nacos
      - NACOS_AUTH_IDENTITY_VALUE=nacos
      - JVM_XMS=512m
      - JVM_XMX=512m
      - JVM_XMN=256m
    ports:
      - "8848:8848"
      - "9848:9848"
      - "9849:9849"
    volumes:
      - nacos1-logs:/home/nacos/logs
      - ./nacos/cluster.conf:/home/nacos/conf/cluster.conf
    depends_on:
      - mysql-master
    networks:
      - nacos-network
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8848/nacos/actuator/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s

  nacos2:
    image: nacos/nacos-server:v2.2.0
    container_name: nacos2
    environment:
      - PREFER_HOST_MODE=hostname
      - MODE=cluster
      - NACOS_APPLICATION_PORT=8848
      - NACOS_SERVERS=nacos1:8848 nacos2:8848 nacos3:8848
      - MYSQL_SERVICE_HOST=mysql-master
      - MYSQL_SERVICE_PORT=3306
      - MYSQL_SERVICE_DB_NAME=nacos
      - MYSQL_SERVICE_USER=nacos
      - MYSQL_SERVICE_PASSWORD=nacos123
      - MYSQL_SERVICE_DB_PARAM=characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false&allowPublicKeyRetrieval=true
      - NACOS_AUTH_ENABLE=true
      - NACOS_AUTH_TOKEN=SecretKey012345678901234567890123456789012345678901234567890123456789
      - NACOS_AUTH_IDENTITY_KEY=nacos
      - NACOS_AUTH_IDENTITY_VALUE=nacos
      - JVM_XMS=512m
      - JVM_XMX=512m
      - JVM_XMN=256m
    ports:
      - "8849:8848"
      - "9850:9848"
      - "9851:9849"
    volumes:
      - nacos2-logs:/home/nacos/logs
      - ./nacos/cluster.conf:/home/nacos/conf/cluster.conf
    depends_on:
      - mysql-master
      - nacos1
    networks:
      - nacos-network
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8848/nacos/actuator/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s

  nacos3:
    image: nacos/nacos-server:v2.2.0
    container_name: nacos3
    environment:
      - PREFER_HOST_MODE=hostname
      - MODE=cluster
      - NACOS_APPLICATION_PORT=8848
      - NACOS_SERVERS=nacos1:8848 nacos2:8848 nacos3:8848
      - MYSQL_SERVICE_HOST=mysql-master
      - MYSQL_SERVICE_PORT=3306
      - MYSQL_SERVICE_DB_NAME=nacos
      - MYSQL_SERVICE_USER=nacos
      - MYSQL_SERVICE_PASSWORD=nacos123
      - MYSQL_SERVICE_DB_PARAM=characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false&allowPublicKeyRetrieval=true
      - NACOS_AUTH_ENABLE=true
      - NACOS_AUTH_TOKEN=SecretKey012345678901234567890123456789012345678901234567890123456789
      - NACOS_AUTH_IDENTITY_KEY=nacos
      - NACOS_AUTH_IDENTITY_VALUE=nacos
      - JVM_XMS=512m
      - JVM_XMX=512m
      - JVM_XMN=256m
    ports:
      - "8850:8848"
      - "9852:9848"
      - "9853:9849"
    volumes:
      - nacos3-logs:/home/nacos/logs
      - ./nacos/cluster.conf:/home/nacos/conf/cluster.conf
    depends_on:
      - mysql-master
      - nacos1
      - nacos2
    networks:
      - nacos-network
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8848/nacos/actuator/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s

  # Nginx 负载均衡器
  nginx:
    image: nginx:alpine
    container_name: nacos-nginx
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf
      - ./nginx/ssl:/etc/nginx/ssl
    depends_on:
      - nacos1
      - nacos2
      - nacos3
    networks:
      - nacos-network
    restart: unless-stopped

volumes:
  nacos1-logs:
  nacos2-logs:
  nacos3-logs:

networks:
  nacos-network:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16
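
集群启动后,建议先核对各节点是否都已加入集群。下面是一个简单的核对脚本草图(假设:未开启认证时 /nacos/v1/core/cluster/nodes 可直接访问,开启认证后需先登录换取 accessToken 再附加到请求;返回的字段结构以实际版本为准):

import json
import urllib.request

def list_cluster_nodes(server: str = "localhost:8848") -> None:
    """查询集群成员列表并打印各节点地址与状态(示意脚本)"""
    url = f"http://{server}/nacos/v1/core/cluster/nodes"
    with urllib.request.urlopen(url, timeout=5) as resp:
        body = json.loads(resp.read().decode("utf-8"))
    # 这里假设返回形如 {"code": 200, "data": [{"address": ..., "state": ...}, ...]}
    for member in body.get("data", []):
        print(member.get("address"), member.get("state"))

if __name__ == "__main__":
    list_cluster_nodes()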

2.3 Nginx 负载均衡配置

# nginx.conf
events {
    worker_connections 1024;
}

http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;
    
    # 日志格式
    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                   '$status $body_bytes_sent "$http_referer" '
                   '"$http_user_agent" "$http_x_forwarded_for" '
                   'rt=$request_time uct="$upstream_connect_time" '
                   'uht="$upstream_header_time" urt="$upstream_response_time"';
    
    access_log /var/log/nginx/access.log main;
    error_log /var/log/nginx/error.log warn;
    
    # Gzip 压缩
    gzip on;
    gzip_vary on;
    gzip_min_length 1024;
    gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;
    
    # 上游服务器组
    upstream nacos_backend {
        # 负载均衡策略:ip_hash 保证会话粘性
        ip_hash;
        
        # Nacos 集群节点
        server nacos1:8848 weight=3 max_fails=3 fail_timeout=30s;
        server nacos2:8848 weight=3 max_fails=3 fail_timeout=30s;
        server nacos3:8848 weight=3 max_fails=3 fail_timeout=30s;
        
        # 开源 Nginx 仅支持被动健康检查,由上面的 max_fails/fail_timeout 实现
        # keepalive 维持到上游的长连接,减少重复建连开销
        keepalive 32;
    }
    
    # HTTP 服务器配置
    server {
        listen 80;
        server_name nacos.example.com;
        
        # 重定向到 HTTPS
        return 301 https://$server_name$request_uri;
    }
    
    # HTTPS 服务器配置
    server {
        listen 443 ssl http2;
        server_name nacos.example.com;
        
        # SSL 证书配置
        ssl_certificate /etc/nginx/ssl/nacos.crt;
        ssl_certificate_key /etc/nginx/ssl/nacos.key;
        ssl_session_timeout 1d;
        ssl_session_cache shared:SSL:50m;
        ssl_session_tickets off;
        
        # SSL 协议和加密套件
        ssl_protocols TLSv1.2 TLSv1.3;
        ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-SHA256:ECDHE-RSA-AES256-SHA384;
        ssl_prefer_server_ciphers off;
        
        # HSTS
        add_header Strict-Transport-Security "max-age=63072000" always;
        
        # 客户端最大请求体大小
        client_max_body_size 10M;
        
        # 代理配置
        location / {
            proxy_pass http://nacos_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            
            # 超时配置
            proxy_connect_timeout 30s;
            proxy_send_timeout 30s;
            proxy_read_timeout 30s;
            
            # 缓冲配置
            proxy_buffering on;
            proxy_buffer_size 4k;
            proxy_buffers 8 4k;
            proxy_busy_buffers_size 8k;
            
            # WebSocket 支持
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
        
        # 健康检查端点
        location /health {
            access_log off;
            return 200 "healthy\n";
            add_header Content-Type text/plain;
        }
        
        # 静态文件缓存
        location ~* \.(css|js|png|jpg|jpeg|gif|ico|svg)$ {
            expires 1y;
            add_header Cache-Control "public, immutable";
            add_header Vary Accept-Encoding;
        }
    }
    
    # 状态监控页面
    server {
        listen 8080;
        server_name localhost;
        
        location /nginx_status {
            stub_status on;
            access_log off;
            allow 127.0.0.1;
            allow 172.20.0.0/16;
            deny all;
        }
        
        location /upstream_status {
            access_log off;
            allow 127.0.0.1;
            allow 172.20.0.0/16;
            deny all;
            
            # 注意:content_by_lua_block 需要 OpenResty 或编译了 lua-nginx-module 的
            # Nginx,前文 compose 使用的 nginx:alpine 镜像并不包含该模块
            content_by_lua_block {
                local upstream = require "ngx.upstream"
                local get_servers = upstream.get_servers
                local get_primary_peers = upstream.get_primary_peers
                
                ngx.say("Upstream Status:")
                local servers, err = get_servers("nacos_backend")
                if not servers then
                    ngx.say("Failed to get servers: ", err)
                    return
                end
                
                for i, server in ipairs(servers) do
                    ngx.say(string.format("Server %d: %s:%d, weight=%d, status=%s", 
                           i, server.addr, server.port, server.weight, server.status))
                end
            }
        }
    }
}

2.4 集群配置文件

# cluster.conf
# Nacos 集群节点配置
# 格式:ip:port(容器或内网环境下也可使用 hostname:port,如下所示)
nacos1:8848
nacos2:8848
nacos3:8848

# application.properties
# Nacos 集群应用配置

# 服务器配置
server.servlet.contextPath=/nacos
server.port=8848
server.tomcat.accesslog.enabled=true
server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D %{User-Agent}i %{Request-Source}i
server.tomcat.basedir=file:.

# 数据库配置
spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://mysql-master:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
db.user.0=nacos
db.password.0=nacos123
db.pool.config.connectionTimeout=30000
db.pool.config.validationTimeout=10000
db.pool.config.maximumPoolSize=20
db.pool.config.minimumIdle=2

# 集群配置
nacos.core.cluster.task.timeout=5000
nacos.core.cluster.server-status-synchronization.timeout=5000
nacos.core.cluster.raft.data.load_timeout=10000

# 认证配置
nacos.core.auth.enabled=true
nacos.core.auth.system.type=nacos
nacos.core.auth.plugin.nacos.token.secret.key=SecretKey012345678901234567890123456789012345678901234567890123456789
nacos.core.auth.plugin.nacos.token.expire.seconds=18000
nacos.core.auth.default.token.secret.key=SecretKey012345678901234567890123456789012345678901234567890123456789

# 监控配置
management.endpoints.web.exposure.include=*
management.metrics.export.elastic.enabled=false
management.metrics.export.influx.enabled=false

# 日志配置
nacos.logs.path=/home/nacos/logs
logging.config=/home/nacos/conf/nacos-logback.xml

# JVM 配置
nacos.server.ip=
nacos.inetutils.prefer-hostname-over-ip=false
nacos.inetutils.ip-address=

# 性能配置
nacos.config.push.maxRetryTime=50
nacos.config.retry.time=3000
nacos.config.maxAggrCount=10000
nacos.config.maxContent=10485760

# 命名空间配置
nacos.naming.distro.taskDispatchThreadCount=10
nacos.naming.distro.taskDispatchPeriod=200
nacos.naming.distro.batchSyncKeyCount=1000
nacos.naming.distro.initDataRatio=0.9
nacos.naming.distro.syncRetryDelay=5000

# 健康检查配置
nacos.naming.healthcheck.timeout=3000
nacos.naming.healthcheck.max.timeout=20000
nacos.naming.healthcheck.interval=5000

3. 部署脚本

3.1 自动化部署脚本

#!/bin/bash
# deploy-nacos-cluster.sh
# Nacos 集群自动化部署脚本

set -e

# 配置变量
NACOS_VERSION="v2.2.0"
CLUSTER_NAME="nacos-production"
NODE_COUNT=3
MYSQL_ROOT_PASSWORD="nacos123"
NACOS_AUTH_TOKEN="SecretKey012345678901234567890123456789012345678901234567890123456789"
DOMAIN="nacos.example.com"

# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

log_info() {
    echo -e "${BLUE}[INFO]${NC} $1"
}

log_success() {
    echo -e "${GREEN}[SUCCESS]${NC} $1"
}

log_warning() {
    echo -e "${YELLOW}[WARNING]${NC} $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

# 检查依赖
check_dependencies() {
    log_info "检查系统依赖..."
    
    # 检查 Docker
    if ! command -v docker &> /dev/null; then
        log_error "Docker 未安装,请先安装 Docker"
        exit 1
    fi
    
    # 检查 Docker Compose
    if ! command -v docker-compose &> /dev/null; then
        log_error "Docker Compose 未安装,请先安装 Docker Compose"
        exit 1
    fi
    
    # 检查端口占用
    local ports=("80" "443" "3306" "8848" "8849" "8850")
    for port in "${ports[@]}"; do
        if netstat -tuln | grep ":$port " > /dev/null; then
            log_warning "端口 $port 已被占用"
        fi
    done
    
    log_success "依赖检查完成"
}

# 创建目录结构
create_directories() {
    log_info "创建目录结构..."
    
    mkdir -p mysql/{master,slave} \
             nacos/{conf,logs} \
             nginx/{conf,ssl,logs} \
             scripts backup
    
    log_success "目录结构创建完成"
}

# 生成 MySQL 配置
generate_mysql_config() {
    log_info "生成 MySQL 配置文件..."
    
    # Master 配置
    cat > mysql/master.cnf << EOF
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
binlog-do-db = nacos
max_connections = 1000
innodb_buffer_pool_size = 256M
innodb_log_file_size = 64M
innodb_flush_log_at_trx_commit = 2
sync_binlog = 1
EOF
    
    # Slave 配置
    cat > mysql/slave.cnf << EOF
[mysqld]
relay-log = mysql-relay-bin
read_only = 1
max_connections = 1000
innodb_buffer_pool_size = 256M
EOF
    
    # 初始化 SQL
    cat > mysql/init.sql << EOF
CREATE DATABASE IF NOT EXISTS nacos DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER IF NOT EXISTS 'nacos'@'%' IDENTIFIED BY '${MYSQL_ROOT_PASSWORD}';
GRANT ALL PRIVILEGES ON nacos.* TO 'nacos'@'%';
FLUSH PRIVILEGES;

-- Nacos 表结构
USE nacos;

CREATE TABLE config_info (
  id bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  data_id varchar(255) NOT NULL COMMENT 'data_id',
  group_id varchar(255) DEFAULT NULL,
  content longtext NOT NULL COMMENT 'content',
  md5 varchar(32) DEFAULT NULL COMMENT 'md5',
  gmt_create datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  gmt_modified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  src_user text COMMENT 'source user',
  src_ip varchar(50) DEFAULT NULL COMMENT 'source ip',
  app_name varchar(128) DEFAULT NULL,
  tenant_id varchar(128) DEFAULT '' COMMENT '租户字段',
  c_desc varchar(256) DEFAULT NULL,
  c_use varchar(64) DEFAULT NULL,
  effect varchar(64) DEFAULT NULL,
  type varchar(64) DEFAULT NULL,
  c_schema text,
  encrypted_data_key text NOT NULL COMMENT '秘钥',
  PRIMARY KEY (id),
  UNIQUE KEY uk_configinfo_datagrouptenant (data_id,group_id,tenant_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='config_info';

-- 更多表结构...
EOF
    
    log_success "MySQL 配置文件生成完成"
}

# 生成 Nacos 配置
generate_nacos_config() {
    log_info "生成 Nacos 配置文件..."
    
    # 集群配置
    cat > nacos/cluster.conf << EOF
nacos1:8848
nacos2:8848
nacos3:8848
EOF
    
    # 应用配置
    cat > nacos/application.properties << EOF
# 服务器配置
server.servlet.contextPath=/nacos
server.port=8848

# 数据库配置
spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://mysql-master:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
db.user.0=nacos
db.password.0=${MYSQL_ROOT_PASSWORD}

# 认证配置
nacos.core.auth.enabled=true
nacos.core.auth.system.type=nacos
nacos.core.auth.plugin.nacos.token.secret.key=${NACOS_AUTH_TOKEN}
nacos.core.auth.plugin.nacos.token.expire.seconds=18000

# 监控配置
management.endpoints.web.exposure.include=*
EOF
    
    log_success "Nacos 配置文件生成完成"
}

# 生成 SSL 证书
generate_ssl_certificates() {
    log_info "生成 SSL 证书..."
    
    # 生成自签名证书(生产环境请使用正式证书)
    openssl req -x509 -nodes -days 365 -newkey rsa:2048 \
        -keyout nginx/ssl/nacos.key \
        -out nginx/ssl/nacos.crt \
        -subj "/C=CN/ST=Beijing/L=Beijing/O=Example/OU=IT/CN=${DOMAIN}"
    
    log_success "SSL 证书生成完成"
}

# 生成 Docker Compose 文件
generate_docker_compose() {
    log_info "生成 Docker Compose 配置..."
    
    # 这里会生成完整的 docker-compose.yml 文件
    # (内容太长,这里省略,实际应包含完整配置)
    
    log_success "Docker Compose 配置生成完成"
}

# 部署集群
deploy_cluster() {
    log_info "开始部署 Nacos 集群..."
    
    # 拉取镜像
    log_info "拉取 Docker 镜像..."
    docker-compose pull
    
    # 启动 MySQL 集群
    log_info "启动 MySQL 集群..."
    docker-compose up -d mysql-master mysql-slave1 mysql-slave2
    
    # 等待 MySQL 启动
    log_info "等待 MySQL 集群启动..."
    sleep 30
    
    # 配置 MySQL 主从复制
    setup_mysql_replication
    
    # 启动 Nacos 集群
    log_info "启动 Nacos 集群..."
    docker-compose up -d nacos1 nacos2 nacos3
    
    # 等待 Nacos 启动
    log_info "等待 Nacos 集群启动..."
    sleep 60
    
    # 启动 Nginx
    log_info "启动 Nginx 负载均衡器..."
    docker-compose up -d nginx
    
    log_success "Nacos 集群部署完成"
}

# 配置 MySQL 主从复制
setup_mysql_replication() {
    log_info "配置 MySQL 主从复制..."
    
    # 获取 Master 状态
    MASTER_STATUS=$(docker exec mysql-master mysql -uroot -p${MYSQL_ROOT_PASSWORD} -e "SHOW MASTER STATUS\G")
    MASTER_FILE=$(echo "$MASTER_STATUS" | grep "File:" | awk '{print $2}')
    MASTER_POS=$(echo "$MASTER_STATUS" | grep "Position:" | awk '{print $2}')
    
    # 配置 Slave1
    docker exec mysql-slave1 mysql -uroot -p${MYSQL_ROOT_PASSWORD} -e "
        CHANGE MASTER TO 
        MASTER_HOST='mysql-master',
        MASTER_USER='root',
        MASTER_PASSWORD='${MYSQL_ROOT_PASSWORD}',
        MASTER_LOG_FILE='${MASTER_FILE}',
        MASTER_LOG_POS=${MASTER_POS};
        START SLAVE;
    "
    
    # 配置 Slave2
    docker exec mysql-slave2 mysql -uroot -p${MYSQL_ROOT_PASSWORD} -e "
        CHANGE MASTER TO 
        MASTER_HOST='mysql-master',
        MASTER_USER='root',
        MASTER_PASSWORD='${MYSQL_ROOT_PASSWORD}',
        MASTER_LOG_FILE='${MASTER_FILE}',
        MASTER_LOG_POS=${MASTER_POS};
        START SLAVE;
    "
    
    log_success "MySQL 主从复制配置完成"
}

# 健康检查
health_check() {
    log_info "执行健康检查..."
    
    local max_attempts=30
    local attempt=1
    
    while [ $attempt -le $max_attempts ]; do
        log_info "健康检查尝试 $attempt/$max_attempts"
        
        # 检查 MySQL
        if docker exec mysql-master mysql -uroot -p${MYSQL_ROOT_PASSWORD} -e "SELECT 1" > /dev/null 2>&1; then
            log_success "MySQL Master 健康检查通过"
        else
            log_error "MySQL Master 健康检查失败"
            return 1
        fi
        
        # 检查 Nacos 节点
        local healthy_nodes=0
        for i in {1..3}; do
            local port=$((8847 + i))
            if curl -f "http://localhost:${port}/nacos/actuator/health" > /dev/null 2>&1; then
                log_success "Nacos${i} 健康检查通过"
                ((healthy_nodes++))
            else
                log_warning "Nacos${i} 健康检查失败"
            fi
        done
        
        if [ $healthy_nodes -ge 2 ]; then
            log_success "Nacos 集群健康检查通过 ($healthy_nodes/3 节点健康)"
            break
        fi
        
        if [ $attempt -eq $max_attempts ]; then
            log_error "健康检查失败,集群可能存在问题"
            return 1
        fi
        
        ((attempt++))
        sleep 10
    done
    
    # 检查 Nginx
    if curl -f "http://localhost/nacos" > /dev/null 2>&1; then
        log_success "Nginx 负载均衡器健康检查通过"
    else
        log_warning "Nginx 负载均衡器健康检查失败"
    fi
    
    log_success "集群健康检查完成"
}

# 显示集群信息
show_cluster_info() {
    log_info "集群部署信息:"
    echo "==========================================="
    echo "集群名称: ${CLUSTER_NAME}"
    echo "Nacos 版本: ${NACOS_VERSION}"
    echo "节点数量: ${NODE_COUNT}"
    echo "访问地址: https://${DOMAIN}/nacos"
    echo "管理员账号: nacos / nacos"
    echo "==========================================="
    echo "服务端口:"
    echo "  - Nginx HTTP: 80"
    echo "  - Nginx HTTPS: 443"
    echo "  - Nacos1: 8848"
    echo "  - Nacos2: 8849"
    echo "  - Nacos3: 8850"
    echo "  - MySQL Master: 3306"
    echo "  - MySQL Slave1: 3307"
    echo "  - MySQL Slave2: 3308"
    echo "==========================================="
}

# 备份集群数据
backup_cluster() {
    log_info "备份集群数据..."
    
    local backup_dir="backup/$(date +%Y%m%d_%H%M%S)"
    mkdir -p "$backup_dir"
    
    # 备份 MySQL 数据
    docker exec mysql-master mysqldump -uroot -p${MYSQL_ROOT_PASSWORD} --all-databases > "$backup_dir/mysql_backup.sql"
    
    # 备份 Nacos 配置
    docker cp nacos1:/home/nacos/conf "$backup_dir/nacos_conf"
    docker cp nacos1:/home/nacos/logs "$backup_dir/nacos_logs"
    
    # 压缩备份
    tar -czf "$backup_dir.tar.gz" -C backup "$(basename $backup_dir)"
    rm -rf "$backup_dir"
    
    log_success "集群数据备份完成: $backup_dir.tar.gz"
}

# 主函数
main() {
    echo "==========================================="
    echo "    Nacos 集群自动化部署脚本"
    echo "==========================================="
    
    case "${1:-deploy}" in
        "deploy")
            check_dependencies
            create_directories
            generate_mysql_config
            generate_nacos_config
            generate_ssl_certificates
            generate_docker_compose
            deploy_cluster
            health_check
            show_cluster_info
            ;;
        "health")
            health_check
            ;;
        "backup")
            backup_cluster
            ;;
        "stop")
            log_info "停止集群..."
            docker-compose down
            log_success "集群已停止"
            ;;
        "restart")
            log_info "重启集群..."
            docker-compose restart
            health_check
            log_success "集群重启完成"
            ;;
        "logs")
            docker-compose logs -f
            ;;
        "status")
            docker-compose ps
            ;;
        *)
            echo "用法: $0 {deploy|health|backup|stop|restart|logs|status}"
            echo "  deploy  - 部署集群"
            echo "  health  - 健康检查"
            echo "  backup  - 备份数据"
            echo "  stop    - 停止集群"
            echo "  restart - 重启集群"
            echo "  logs    - 查看日志"
            echo "  status  - 查看状态"
            exit 1
            ;;
    esac
}

# 执行主函数
main "$@"

3.2 监控脚本

#!/bin/bash
# monitor-nacos-cluster.sh
# Nacos 集群监控脚本

set -e

# 配置(假设脚本运行在能解析这些容器名的环境中,例如同一 Docker 网络内的容器)
CLUSTER_NODES=("nacos1:8848" "nacos2:8848" "nacos3:8848")
MYSQL_NODES=("mysql-master:3306" "mysql-slave1:3306" "mysql-slave2:3306")
ALERT_EMAIL="admin@example.com"
SLACK_WEBHOOK="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
LOG_FILE="/var/log/nacos-monitor.log"

# 日志函数
log_with_timestamp() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

# 发送告警
send_alert() {
    local message="$1"
    local severity="$2"
    
    log_with_timestamp "ALERT [$severity]: $message"
    
    # 发送邮件告警
    if command -v mail &> /dev/null; then
        echo "$message" | mail -s "Nacos Cluster Alert [$severity]" "$ALERT_EMAIL"
    fi
    
    # 发送 Slack 告警
    if [ -n "$SLACK_WEBHOOK" ]; then
        curl -X POST -H 'Content-type: application/json' \
            --data "{\"text\":\"🚨 Nacos Cluster Alert [$severity]: $message\"}" \
            "$SLACK_WEBHOOK" > /dev/null 2>&1
    fi
}

# 检查 Nacos 节点
check_nacos_nodes() {
    local failed_nodes=()
    
    for node in "${CLUSTER_NODES[@]}"; do
        local host=$(echo "$node" | cut -d':' -f1)
        local port=$(echo "$node" | cut -d':' -f2)
        
        if ! curl -f "http://$host:$port/nacos/actuator/health" > /dev/null 2>&1; then
            failed_nodes+=("$node")
        fi
    done
    
    if [ ${#failed_nodes[@]} -gt 0 ]; then
        send_alert "Nacos nodes failed: ${failed_nodes[*]}" "CRITICAL"
        return 1
    fi
    
    log_with_timestamp "All Nacos nodes are healthy"
    return 0
}

# 检查 MySQL 节点
check_mysql_nodes() {
    local failed_nodes=()
    
    for node in "${MYSQL_NODES[@]}"; do
        local host=$(echo "$node" | cut -d':' -f1)
        
        if ! docker exec "$host" mysql -unacos -pnacos123 -e "SELECT 1" > /dev/null 2>&1; then
            failed_nodes+=("$node")
        fi
    done
    
    if [ ${#failed_nodes[@]} -gt 0 ]; then
        send_alert "MySQL nodes failed: ${failed_nodes[*]}" "CRITICAL"
        return 1
    fi
    
    log_with_timestamp "All MySQL nodes are healthy"
    return 0
}

# 检查集群状态
check_cluster_status() {
    log_with_timestamp "Starting cluster health check..."
    
    local nacos_ok=true
    local mysql_ok=true
    
    if ! check_nacos_nodes; then
        nacos_ok=false
    fi
    
    if ! check_mysql_nodes; then
        mysql_ok=false
    fi
    
    if $nacos_ok && $mysql_ok; then
        log_with_timestamp "Cluster health check passed"
        return 0
    else
        log_with_timestamp "Cluster health check failed"
        return 1
    fi
}

# 收集性能指标
collect_metrics() {
    log_with_timestamp "Collecting cluster metrics..."
    
    # 收集 Docker 容器指标
    docker stats --no-stream --format "table {{.Container}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.NetIO}}\t{{.BlockIO}}" \
        nacos1 nacos2 nacos3 mysql-master mysql-slave1 mysql-slave2 nginx
    
    # 收集 Nacos 业务指标
    for node in "${CLUSTER_NODES[@]}"; do
        local host=$(echo "$node" | cut -d':' -f1)
        local port=$(echo "$node" | cut -d':' -f2)
        
        log_with_timestamp "Metrics for $node:"
        curl -s "http://$host:$port/nacos/actuator/metrics" | jq '.'
    done
}

# 主监控循环
monitor_loop() {
    log_with_timestamp "Starting Nacos cluster monitoring..."
    
    while true; do
        if ! check_cluster_status; then
            # 集群异常,增加检查频率
            sleep 30
        else
            # 集群正常,正常检查频率
            sleep 300  # 5分钟
        fi
        
        # 整点(分钟为 00)时收集一次指标;循环粒度为分钟级,时间点可能略有偏差
        if [ "$(date +%M)" = "00" ]; then
            collect_metrics
        fi
    done
}

# 主函数
case "${1:-monitor}" in
    "monitor")
        monitor_loop
        ;;
    "check")
        check_cluster_status
        ;;
    "metrics")
        collect_metrics
        ;;
    *)
        echo "用法: $0 {monitor|check|metrics}"
        echo "  monitor - 启动监控循环"
        echo "  check   - 执行一次健康检查"
        echo "  metrics - 收集性能指标"
        exit 1
        ;;
esac

4. 高可用配置

4.1 故障转移配置
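
下面是一个客户端侧故障转移的 Go 示例(基于 nacos-sdk-go):维护主/备两组服务器地址,定期做健康检查,当前一组全部不可用时切换到备用组,主组恢复后再切回。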

// failover.go
// Nacos 客户端故障转移实现
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"time"

	"github.com/nacos-group/nacos-sdk-go/clients"
	"github.com/nacos-group/nacos-sdk-go/clients/config_client"
	"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
	"github.com/nacos-group/nacos-sdk-go/common/constant"
	"github.com/nacos-group/nacos-sdk-go/vo"
)

type FailoverConfig struct {
	PrimaryServers   []string      `json:"primary_servers"`
	SecondaryServers    []string      `json:"secondary_servers"`
	HealthCheckInterval time.Duration `json:"health_check_interval"`
	FailoverTimeout     time.Duration `json:"failover_timeout"`
	RetryAttempts       int           `json:"retry_attempts"`
	RetryDelay          time.Duration `json:"retry_delay"`
}

type NacosFailoverClient struct {
	config          FailoverConfig
	configClient    config_client.IConfigClient
	namingClient    naming_client.INamingClient
	currentServers  []string
	isUsingPrimary  bool
	mutex           sync.RWMutex
	healthChecker   *time.Ticker
	ctx             context.Context
	cancel          context.CancelFunc
}

func NewNacosFailoverClient(config FailoverConfig) (*NacosFailoverClient, error) {
	ctx, cancel := context.WithCancel(context.Background())
	
	client := &NacosFailoverClient{
		config:         config,
		currentServers: config.PrimaryServers,
		isUsingPrimary: true,
		ctx:            ctx,
		cancel:         cancel,
	}
	
	// 初始化客户端
	if err := client.initClients(); err != nil {
		return nil, fmt.Errorf("failed to initialize clients: %w", err)
	}
	
	// 启动健康检查
	client.startHealthCheck()
	
	return client, nil
}

func (c *NacosFailoverClient) initClients() error {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	
	// 构建服务器配置
	serverConfigs := make([]constant.ServerConfig, 0, len(c.currentServers))
	for _, server := range c.currentServers {
		serverConfigs = append(serverConfigs, constant.ServerConfig{
			IpAddr: server,
			Port:   8848,
		})
	}
	
	// 客户端配置
	clientConfig := constant.ClientConfig{
		NamespaceId:         "public",
		TimeoutMs:           5000,
		NotLoadCacheAtStart: true,
		LogDir:              "/tmp/nacos/log",
		CacheDir:            "/tmp/nacos/cache",
		LogLevel:            "info",
		Username:            "nacos",
		Password:            "nacos",
	}
	
	// 创建配置客户端
	configClient, err := clients.CreateConfigClient(map[string]interface{}{
		"serverConfigs": serverConfigs,
		"clientConfig":  clientConfig,
	})
	if err != nil {
		return fmt.Errorf("failed to create config client: %w", err)
	}
	c.configClient = configClient
	
	// 创建命名客户端
	namingClient, err := clients.CreateNamingClient(map[string]interface{}{
		"serverConfigs": serverConfigs,
		"clientConfig":  clientConfig,
	})
	if err != nil {
		return fmt.Errorf("failed to create naming client: %w", err)
	}
	c.namingClient = namingClient
	
	return nil
}

func (c *NacosFailoverClient) startHealthCheck() {
	c.healthChecker = time.NewTicker(c.config.HealthCheckInterval)
	
	go func() {
		for {
			select {
			case <-c.ctx.Done():
				return
			case <-c.healthChecker.C:
				c.performHealthCheck()
			}
		}
	}()
}

func (c *NacosFailoverClient) performHealthCheck() {
	c.mutex.RLock()
	currentServers := c.currentServers
	isUsingPrimary := c.isUsingPrimary
	c.mutex.RUnlock()
	
	// 检查当前服务器健康状态
	healthyServers := c.checkServersHealth(currentServers)
	
	if len(healthyServers) == 0 {
		// 当前服务器全部不健康,尝试故障转移
		log.Printf("All current servers are unhealthy, attempting failover")
		c.performFailover()
	} else if !isUsingPrimary {
		// 当前使用备用服务器,检查主服务器是否恢复
		primaryHealthy := c.checkServersHealth(c.config.PrimaryServers)
		if len(primaryHealthy) > 0 {
			log.Printf("Primary servers recovered, switching back")
			c.switchToPrimary()
		}
	}
}

func (c *NacosFailoverClient) checkServersHealth(servers []string) []string {
	healthyServers := make([]string, 0, len(servers))
	
	for _, server := range servers {
		if c.isServerHealthy(server) {
			healthyServers = append(healthyServers, server)
		}
	}
	
	return healthyServers
}

func (c *NacosFailoverClient) isServerHealthy(server string) bool {
	// 调用节点的健康检查端点(这里假设各节点都监听 8848、使用默认 context path)
	url := fmt.Sprintf("http://%s:8848/nacos/actuator/health", server)
	httpClient := http.Client{Timeout: 3 * time.Second}
	resp, err := httpClient.Get(url)
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK
}

func (c *NacosFailoverClient) performFailover() {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	
	if c.isUsingPrimary {
		// 切换到备用服务器
		log.Printf("Switching to secondary servers")
		c.currentServers = c.config.SecondaryServers
		c.isUsingPrimary = false
	} else {
		// 已经在使用备用服务器,无法进一步故障转移
		log.Printf("Already using secondary servers, no further failover available")
		return
	}
	
	// 重新初始化客户端
	if err := c.initClients(); err != nil {
		log.Printf("Failed to reinitialize clients after failover: %v", err)
	}
}

func (c *NacosFailoverClient) switchToPrimary() {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	
	log.Printf("Switching back to primary servers")
	c.currentServers = c.config.PrimaryServers
	c.isUsingPrimary = true
	
	// 重新初始化客户端
	if err := c.initClients(); err != nil {
		log.Printf("Failed to reinitialize clients when switching to primary: %v", err)
	}
}

// 配置操作方法
func (c *NacosFailoverClient) GetConfig(dataId, group string) (string, error) {
	c.mutex.RLock()
	configClient := c.configClient
	c.mutex.RUnlock()
	
	for attempt := 0; attempt < c.config.RetryAttempts; attempt++ {
		content, err := configClient.GetConfig(vo.ConfigParam{
			DataId: dataId,
			Group:  group,
		})
		
		if err == nil {
			return content, nil
		}
		
		log.Printf("Attempt %d failed to get config: %v", attempt+1, err)
		
		if attempt < c.config.RetryAttempts-1 {
			time.Sleep(c.config.RetryDelay)
		}
	}
	
	return "", fmt.Errorf("failed to get config after %d attempts", c.config.RetryAttempts)
}

func (c *NacosFailoverClient) PublishConfig(dataId, group, content string) error {
	c.mutex.RLock()
	configClient := c.configClient
	c.mutex.RUnlock()
	
	for attempt := 0; attempt < c.config.RetryAttempts; attempt++ {
		success, err := configClient.PublishConfig(vo.ConfigParam{
			DataId:  dataId,
			Group:   group,
			Content: content,
		})
		
		if err == nil && success {
			return nil
		}
		
		log.Printf("Attempt %d failed to publish config: %v", attempt+1, err)
		
		if attempt < c.config.RetryAttempts-1 {
			time.Sleep(c.config.RetryDelay)
		}
	}
	
	return fmt.Errorf("failed to publish config after %d attempts", c.config.RetryAttempts)
}

// 服务发现操作方法
func (c *NacosFailoverClient) RegisterInstance(serviceName string, ip string, port uint64) error {
	c.mutex.RLock()
	namingClient := c.namingClient
	c.mutex.RUnlock()
	
	for attempt := 0; attempt < c.config.RetryAttempts; attempt++ {
		success, err := namingClient.RegisterInstance(vo.RegisterInstanceParam{
			Ip:          ip,
			Port:        port,
			ServiceName: serviceName,
			Weight:      1,
			Enable:      true,
			Healthy:     true,
			Ephemeral:   true,
		})
		
		if err == nil && success {
			return nil
		}
		
		log.Printf("Attempt %d failed to register instance: %v", attempt+1, err)
		
		if attempt < c.config.RetryAttempts-1 {
			time.Sleep(c.config.RetryDelay)
		}
	}
	
	return fmt.Errorf("failed to register instance after %d attempts", c.config.RetryAttempts)
}

func (c *NacosFailoverClient) Close() {
	if c.cancel != nil {
		c.cancel()
	}
	
	if c.healthChecker != nil {
		c.healthChecker.Stop()
	}
}

// 使用示例
func main() {
	config := FailoverConfig{
		PrimaryServers:      []string{"192.168.1.101", "192.168.1.102", "192.168.1.103"},
		SecondaryServers:    []string{"192.168.1.201", "192.168.1.202"},
		HealthCheckInterval: 30 * time.Second,
		FailoverTimeout:     10 * time.Second,
		RetryAttempts:       3,
		RetryDelay:          1 * time.Second,
	}
	
	client, err := NewNacosFailoverClient(config)
	if err != nil {
		log.Fatalf("Failed to create failover client: %v", err)
	}
	defer client.Close()
	
	// 测试配置操作
	err = client.PublishConfig("test-config", "DEFAULT_GROUP", "test content")
	if err != nil {
		log.Printf("Failed to publish config: %v", err)
	} else {
		log.Printf("Config published successfully")
	}
	
	content, err := client.GetConfig("test-config", "DEFAULT_GROUP")
	if err != nil {
		log.Printf("Failed to get config: %v", err)
	} else {
		log.Printf("Config content: %s", content)
	}
	
	// 测试服务注册
	err = client.RegisterInstance("test-service", "192.168.1.100", 8080)
	if err != nil {
		log.Printf("Failed to register instance: %v", err)
	} else {
		log.Printf("Instance registered successfully")
	}
	
	// 保持运行
	select {}
}

5. 核心要点

5.1 集群架构设计

  • 节点角色: 领导者、跟随者、观察者
  • 数据一致性: 配置和持久化实例基于 Raft 协议保证强一致,临时实例基于 Distro 协议保证最终一致
  • 故障检测: 心跳机制和健康检查
  • 负载均衡: 多种算法支持

5.2 高可用保障

  • 多节点部署: 至少3个节点的奇数集群(法定人数推算见下方示例)
  • 数据备份: MySQL 主从复制
  • 故障转移: 自动故障检测和切换
  • 监控告警: 实时监控和告警通知
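
为什么推荐至少 3 个节点且取奇数?用多数派法定人数简单推算即可,这一结论与具体 Nacos 版本无关:

def quorum_size(n: int) -> int:
    """多数派法定人数:超过半数节点存活,集群才能完成选主和写入"""
    return n // 2 + 1

def tolerable_failures(n: int) -> int:
    """可容忍的最大故障节点数"""
    return n - quorum_size(n)

for n in (3, 4, 5):
    print(f"{n} 节点: 法定人数 {quorum_size(n)}, 可容忍故障 {tolerable_failures(n)} 台")
# 3 节点可容忍 1 台故障;4 节点同样只能容忍 1 台;5 节点可容忍 2 台
# 可见偶数节点相比上一个奇数并不提升容错能力,却多占一台机器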

5.3 性能优化

  • 资源配置: 合理的 JVM 和数据库参数
  • 网络优化: 负载均衡和连接池
  • 缓存策略: 客户端缓存和服务端缓存(下方给出一个客户端缓存示意)
  • 压缩传输: Gzip 压缩减少网络开销
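
针对"客户端缓存"一条,下面给出一个极简的 TTL 缓存草图,仅用于说明思路(真实的 Nacos 客户端还结合服务端推送与本地快照文件,远比这个复杂):

import time
from typing import Dict, Optional, Tuple

class ConfigCache:
    """简化的客户端配置缓存:条目超过 TTL 即失效,迫使调用方回源拉取"""

    def __init__(self, ttl_seconds: float = 30.0):
        self.ttl = ttl_seconds
        self._entries: Dict[str, Tuple[str, float]] = {}  # key -> (内容, 写入时间)

    def get(self, key: str) -> Optional[str]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        content, stored_at = entry
        if time.time() - stored_at > self.ttl:
            del self._entries[key]
            return None
        return content

    def put(self, key: str, content: str) -> None:
        self._entries[key] = (content, time.time())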

6. 最佳实践

6.1 部署实践

  • 使用容器化部署提高可移植性
  • 配置健康检查和自动重启
  • 实施蓝绿部署或滚动更新(滚动重启示意见下方)
  • 定期备份配置和数据
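
对于"滚动更新"一条,下面是一个滚动重启的 Python 草图:逐个重启节点,前一个节点恢复健康后才处理下一个,保证多数派始终存活。其中容器名与宿主机端口映射沿用前文 docker-compose 配置,属于示例假设:

import subprocess
import time
import urllib.request

# 容器名 -> 宿主机映射端口(对应前文 compose 中的 nacos1/2/3)
NODES = {"nacos1": 8848, "nacos2": 8849, "nacos3": 8850}

def is_healthy(port: int) -> bool:
    """通过健康检查端点判断节点是否就绪"""
    url = f"http://localhost:{port}/nacos/actuator/health"
    try:
        with urllib.request.urlopen(url, timeout=3) as resp:
            return resp.status == 200
    except OSError:
        return False

def rolling_restart() -> None:
    for name, port in NODES.items():
        print(f"重启 {name} ...")
        subprocess.run(["docker", "restart", name], check=True)
        for _ in range(30):  # 最多等待约 5 分钟
            if is_healthy(port):
                print(f"{name} 已恢复")
                break
            time.sleep(10)
        else:
            raise RuntimeError(f"{name} 未在预期时间内恢复,中止滚动重启")

if __name__ == "__main__":
    rolling_restart()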

6.2 运维实践

  • 建立完善的监控体系
  • 制定故障应急预案
  • 定期进行故障演练
  • 优化资源配置和性能调优

6.3 安全实践

  • 启用认证和授权机制
  • 使用 HTTPS 加密传输
  • 定期更新和打补丁
  • 限制网络访问权限

7. 下一步学习

  1. 深入学习 Raft 协议
  2. 掌握 Kubernetes 部署
  3. 学习服务网格集成
  4. 了解多数据中心部署
  5. 研究性能调优技巧

通过本章学习,你已经掌握了 Nacos 集群部署与高可用的核心技术。下一章我们将学习 Nacos 与 Spring Cloud 的集成应用。