服务注册与发现

学习目标

通过本章学习,你将能够:

- 理解服务注册与发现的核心原理
- 掌握 Nacos 服务注册的多种方式
- 熟练配置健康检查机制
- 实现负载均衡和服务路由
- 处理服务发现的异常情况

1. 服务注册与发现原理

1.1 核心概念

from enum import Enum
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable
import time
import random
import threading
import json

class ServiceStatus(Enum):
    """Enumeration of the statuses a service can be in."""
    UP = "UP"                    # Service is running normally
    DOWN = "DOWN"                # Service is offline
    STARTING = "STARTING"        # Service is starting up
    OUT_OF_SERVICE = "OUT_OF_SERVICE"  # Service is paused
    UNKNOWN = "UNKNOWN"          # Status is unknown

class HealthCheckType(Enum):
    """Kinds of health check that can be configured for an instance."""
    TCP = "TCP"                  # TCP port check
    HTTP = "HTTP"                # HTTP endpoint check
    MYSQL = "MYSQL"              # MySQL connection check
    NONE = "NONE"                # No health check is performed

class LoadBalanceStrategy(Enum):
    """Load-balancing strategies selectable on a LoadBalancer."""
    ROUND_ROBIN = "round_robin"   # Round robin
    WEIGHTED_ROUND_ROBIN = "weighted_round_robin"  # Weighted round robin
    RANDOM = "random"            # Uniform random
    WEIGHTED_RANDOM = "weighted_random"  # Weighted random
    LEAST_CONNECTIONS = "least_connections"  # Fewest active connections
    CONSISTENT_HASH = "consistent_hash"  # Consistent hashing on a request key

@dataclass
class ServiceInstance:
    """One registered endpoint of a service.

    When ``instance_id`` is empty, it is derived as
    ``<ip>#<port>#<cluster_name>`` right after initialisation.
    """
    # Identity and network location
    service_name: str
    instance_id: str
    ip: str
    port: int
    # Routing attributes
    weight: float = 1.0
    healthy: bool = True
    enabled: bool = True
    ephemeral: bool = True  # whether this is an ephemeral (temporary) instance
    # Grouping coordinates
    cluster_name: str = "DEFAULT"
    namespace_id: str = "public"
    group_name: str = "DEFAULT_GROUP"
    # Free-form key/value attributes; a fresh dict per instance
    metadata: Dict[str, str] = field(default_factory=dict)
    # Timestamps (seconds since the epoch), initialised at construction time
    register_time: float = field(default_factory=time.time)
    last_beat_time: float = field(default_factory=time.time)

    def __post_init__(self):
        """Fill in a generated ``instance_id`` when none was supplied."""
        if self.instance_id:
            return
        self.instance_id = "{}#{}#{}".format(self.ip, self.port, self.cluster_name)

@dataclass
class HealthCheckConfig:
    """Settings that control how an instance is health-checked."""
    type: HealthCheckType = HealthCheckType.TCP
    interval: int = 5000  # Interval between checks, in milliseconds
    timeout: int = 3000   # Timeout of a single check, in milliseconds
    healthy_threshold: int = 1    # Consecutive successes needed to mark an instance healthy
    unhealthy_threshold: int = 3  # Consecutive failures needed to mark an instance unhealthy
    path: str = "/health"          # Path used by HTTP checks
    headers: Dict[str, str] = field(default_factory=dict)
    expected_response_code: int = 200

@dataclass
class ServiceRegistration:
    """Registration request: the instance to register plus its health-check settings."""
    instance: ServiceInstance
    health_check: HealthCheckConfig
    # NOTE(review): the two flags below are not read by any code visible in
    # this file; their intended effect should be confirmed against callers.
    auto_register: bool = True
    preserve_instance_when_disabled: bool = False

class ServiceRegistry:
    """In-memory service registry with health checks and change subscriptions.

    Instances are stored per service key (``namespace##group##service_name``)
    and guarded by a re-entrant lock. Every registered instance may own one
    background health-check thread. Each thread is controlled by its own
    ``threading.Event`` so that it can be stopped promptly and so that a
    re-registration never leaves a stale worker running.

    Health checkers are keyed by ``instance_id`` only, so instance ids are
    expected to be unique across services.
    """

    def __init__(self):
        """Create an empty registry."""
        # service key -> {instance_id -> instance}
        self.services: Dict[str, Dict[str, ServiceInstance]] = {}
        # instance_id -> background health-check thread
        self.health_checkers: Dict[str, threading.Thread] = {}
        # service key -> callbacks invoked as callback(event_type, instance)
        self.subscribers: Dict[str, List[Callable]] = {}
        self._lock = threading.RLock()
        # instance_id -> stop flag of the worker stored in health_checkers
        self._stop_events: Dict[str, threading.Event] = {}

    def register_service(self, registration: ServiceRegistration) -> bool:
        """Register (or replace) an instance and start its health check.

        Args:
            registration: The instance and its health-check configuration.

        Returns:
            Always ``True``.
        """
        instance = registration.instance
        service_key = self._get_service_key(instance)
        instance_key = instance.instance_id

        with self._lock:
            now = time.time()
            instance.register_time = now
            instance.last_beat_time = now
            self.services.setdefault(service_key, {})[instance_key] = instance

            if registration.health_check.type != HealthCheckType.NONE:
                # Replaces any worker left over from an earlier registration.
                self._start_health_check(instance, registration.health_check)
            else:
                # The previous registration may have had a checker; it would
                # otherwise keep probing the replaced instance object.
                self._cancel_health_check(instance_key)

            self._notify_subscribers(service_key, "register", instance)

            print(f"✅ 服务注册成功: {service_key}/{instance_key}")
            return True

    def deregister_service(self, service_name: str, instance_id: str,
                          namespace_id: str = "public",
                          group_name: str = "DEFAULT_GROUP") -> bool:
        """Remove an instance, stop its health check and notify subscribers.

        Returns:
            ``True`` if the instance existed and was removed, else ``False``.
        """
        service_key = self._build_service_key(namespace_id, group_name, service_name)

        with self._lock:
            service_instances = self.services.get(service_key)
            if not service_instances or instance_id not in service_instances:
                print(f"❌ 服务注销失败: 未找到 {service_key}/{instance_id}")
                return False

            self._stop_health_check(instance_id)
            instance = service_instances.pop(instance_id)

            # Drop the service entry once its last instance is gone.
            if not service_instances:
                del self.services[service_key]

            self._notify_subscribers(service_key, "deregister", instance)

            print(f"✅ 服务注销成功: {service_key}/{instance_id}")
            return True

    def discover_services(self, service_name: str,
                         namespace_id: str = "public",
                         group_name: str = "DEFAULT_GROUP",
                         cluster_name: Optional[str] = None,
                         healthy_only: bool = True) -> List[ServiceInstance]:
        """Return the enabled instances of a service.

        Args:
            service_name: Name of the service to look up.
            namespace_id: Namespace part of the service key.
            group_name: Group part of the service key.
            cluster_name: When given (non-empty), keep only that cluster.
            healthy_only: When true, skip instances marked unhealthy.

        Returns:
            A new list of matching instances (possibly empty).
        """
        service_key = self._build_service_key(namespace_id, group_name, service_name)

        with self._lock:
            candidates = self.services.get(service_key, {}).values()
            instances = [
                instance for instance in candidates
                if instance.enabled
                and (instance.healthy or not healthy_only)
                and (not cluster_name or instance.cluster_name == cluster_name)
            ]

        print(f"🔍 发现服务: {service_key} -> {len(instances)} 个实例")
        return instances

    def send_heartbeat(self, service_name: str, instance_id: str,
                      namespace_id: str = "public",
                      group_name: str = "DEFAULT_GROUP") -> bool:
        """Record a heartbeat for an instance and mark it healthy.

        Returns:
            ``True`` if the instance is registered, else ``False``.
        """
        service_key = self._build_service_key(namespace_id, group_name, service_name)

        with self._lock:
            instance = self.services.get(service_key, {}).get(instance_id)
            if instance is None:
                print(f"❌ 心跳失败: 未找到 {service_key}/{instance_id}")
                return False

            instance.last_beat_time = time.time()
            instance.healthy = True

            print(f"💓 心跳更新: {service_key}/{instance_id}")
            return True

    def subscribe_service(self, service_name: str, callback: Callable,
                         namespace_id: str = "public",
                         group_name: str = "DEFAULT_GROUP"):
        """Subscribe ``callback(event_type, instance)`` to changes of a service."""
        service_key = self._build_service_key(namespace_id, group_name, service_name)

        with self._lock:
            self.subscribers.setdefault(service_key, []).append(callback)

        print(f"📡 订阅服务: {service_key}")

    @staticmethod
    def _build_service_key(namespace_id: str, group_name: str,
                           service_name: str) -> str:
        """Compose the ``namespace##group##service`` key used by all lookups."""
        return f"{namespace_id}##{group_name}##{service_name}"

    def _get_service_key(self, instance: ServiceInstance) -> str:
        """Return the service key an instance belongs to."""
        return self._build_service_key(
            instance.namespace_id, instance.group_name, instance.service_name)

    def _start_health_check(self, instance: ServiceInstance,
                           config: HealthCheckConfig):
        """Start a daemon thread that periodically health-checks ``instance``.

        Any checker already running under the same instance id is stopped
        first, so there is never more than one worker per id.
        """
        stop_event = threading.Event()
        interval_seconds = config.interval / 1000.0

        def health_check_worker():
            consecutive_failures = 0
            consecutive_successes = 0

            while not stop_event.is_set():
                try:
                    is_healthy = bool(self._perform_health_check(instance, config))

                    if is_healthy:
                        consecutive_successes += 1
                        consecutive_failures = 0
                        threshold_reached = (
                            consecutive_successes >= config.healthy_threshold)
                    else:
                        consecutive_failures += 1
                        consecutive_successes = 0
                        threshold_reached = (
                            consecutive_failures >= config.unhealthy_threshold)

                    # Flip the flag under the registry lock; skip the change
                    # if the checker was stopped while the probe was running.
                    with self._lock:
                        changed = (threshold_reached
                                   and not stop_event.is_set()
                                   and instance.healthy != is_healthy)
                        if changed:
                            instance.healthy = is_healthy

                    if changed:
                        self._notify_subscribers(
                            self._get_service_key(instance), "health_change", instance)
                        if is_healthy:
                            print(f"✅ 实例恢复健康: {instance.instance_id}")
                        else:
                            print(f"❌ 实例变为不健康: {instance.instance_id}")
                except Exception as e:
                    # Keep the checker alive; one failed probe must not kill it.
                    print(f"❌ 健康检查异常: {instance.instance_id} - {e}")

                # Event.wait returns as soon as the checker is stopped, so the
                # thread does not linger for a full interval after a stop.
                stop_event.wait(interval_seconds)

        thread = threading.Thread(target=health_check_worker, daemon=True)
        with self._lock:
            self._cancel_health_check(instance.instance_id)
            self.health_checkers[instance.instance_id] = thread
            self._stop_events[instance.instance_id] = stop_event
        thread.start()

        print(f"🏥 启动健康检查: {instance.instance_id}")

    def _perform_health_check(self, instance: ServiceInstance,
                             config: HealthCheckConfig) -> bool:
        """Simulate one health probe; the result is random per check type."""
        # Simulated failure probability of each check type.
        failure_rates = {
            HealthCheckType.TCP: 0.1,      # 90% success rate
            HealthCheckType.HTTP: 0.05,    # 95% success rate
            HealthCheckType.MYSQL: 0.02,   # 98% success rate
        }
        failure_rate = failure_rates.get(config.type)
        if failure_rate is None:
            return True
        return random.random() > failure_rate

    def _cancel_health_check(self, instance_id: str) -> bool:
        """Signal the checker of ``instance_id`` to stop, without logging.

        Returns:
            ``True`` if a checker was registered for that id.
        """
        with self._lock:
            stop_event = self._stop_events.pop(instance_id, None)
            thread = self.health_checkers.pop(instance_id, None)
        if stop_event is not None:
            stop_event.set()
        return thread is not None

    def _stop_health_check(self, instance_id: str):
        """Stop the health checker of ``instance_id`` if one is running."""
        if self._cancel_health_check(instance_id):
            print(f"🛑 停止健康检查: {instance_id}")

    def _notify_subscribers(self, service_key: str, event_type: str,
                           instance: ServiceInstance):
        """Invoke every subscriber of ``service_key``; callback errors are logged."""
        # Iterate over a snapshot so a concurrent subscribe cannot mutate the
        # list while it is being walked.
        with self._lock:
            callbacks = list(self.subscribers.get(service_key, ()))

        for callback in callbacks:
            try:
                callback(event_type, instance)
            except Exception as e:
                print(f"❌ 通知订阅者失败: {e}")

    def get_service_stats(self) -> Dict:
        """Return instance counts, overall and per service, as a plain dict."""
        with self._lock:
            stats = {
                "total_services": len(self.services),
                "total_instances": sum(len(instances) for instances in self.services.values()),
                "healthy_instances": 0,
                "unhealthy_instances": 0,
                "services": {}
            }

            for service_key, instances in self.services.items():
                service_stats = {
                    "total_instances": len(instances),
                    "healthy_instances": 0,
                    "unhealthy_instances": 0,
                    "instances": []
                }

                for instance in instances.values():
                    counter_key = "healthy_instances" if instance.healthy else "unhealthy_instances"
                    service_stats[counter_key] += 1
                    stats[counter_key] += 1

                    service_stats["instances"].append({
                        "instance_id": instance.instance_id,
                        "ip": instance.ip,
                        "port": instance.port,
                        "healthy": instance.healthy,
                        "weight": instance.weight
                    })

                stats["services"][service_key] = service_stats

            return stats

class LoadBalancer:
    """Client-side load balancer that picks one instance per request.

    The strategy is fixed at construction time. Only instances that are both
    healthy and enabled are ever returned.
    """

    def __init__(self, strategy: LoadBalanceStrategy = LoadBalanceStrategy.ROUND_ROBIN):
        """Create a balancer using ``strategy``."""
        self.strategy = strategy
        # service name -> number of round-robin selections made so far
        self.round_robin_counters: Dict[str, int] = {}
        # instance_id -> currently recorded connection count
        self.connection_counts: Dict[str, int] = {}
        # service name -> {instance_id -> running weight}; state of the
        # smooth weighted round-robin selection
        self._smooth_weights: Dict[str, Dict[str, int]] = {}

    def select_instance(self, instances: List[ServiceInstance],
                       request_key: Optional[str] = None) -> Optional[ServiceInstance]:
        """Pick an instance according to the configured strategy.

        Args:
            instances: Candidate instances.
            request_key: Key used by the consistent-hash strategy; other
                strategies ignore it.

        Returns:
            The chosen instance, or ``None`` when no healthy, enabled
            instance is available.
        """
        if not instances:
            return None

        available_instances = [
            inst for inst in instances
            if inst.healthy and inst.enabled
        ]

        if not available_instances:
            return None

        if self.strategy == LoadBalanceStrategy.ROUND_ROBIN:
            return self._round_robin_select(available_instances)
        elif self.strategy == LoadBalanceStrategy.WEIGHTED_ROUND_ROBIN:
            return self._weighted_round_robin_select(available_instances)
        elif self.strategy == LoadBalanceStrategy.RANDOM:
            return self._random_select(available_instances)
        elif self.strategy == LoadBalanceStrategy.WEIGHTED_RANDOM:
            return self._weighted_random_select(available_instances)
        elif self.strategy == LoadBalanceStrategy.LEAST_CONNECTIONS:
            return self._least_connections_select(available_instances)
        elif self.strategy == LoadBalanceStrategy.CONSISTENT_HASH:
            return self._consistent_hash_select(available_instances, request_key)
        else:
            return available_instances[0]

    def _round_robin_select(self, instances: List[ServiceInstance]) -> ServiceInstance:
        """Cycle through ``instances`` using a per-service counter."""
        service_key = instances[0].service_name
        position = self.round_robin_counters.get(service_key, 0)
        self.round_robin_counters[service_key] = position + 1
        return instances[position % len(instances)]

    def _weighted_round_robin_select(self, instances: List[ServiceInstance]) -> ServiceInstance:
        """Smooth weighted round robin.

        Each call adds every instance's effective weight to its running
        weight, picks the instance with the largest running weight, and then
        lowers that instance by the total weight. Over one full cycle each
        instance is chosen in proportion to its weight, and the picks are
        interleaved instead of being emitted in long runs of one instance.

        Effective weights are ``max(1, int(weight * 10))`` so that fractional
        weights down to 0.1 are honoured with exact integer arithmetic.
        """
        service_key = instances[0].service_name
        previous = self._smooth_weights.get(service_key, {})
        current: Dict[str, int] = {}
        total_weight = 0
        selected = None

        for instance in instances:
            effective = max(1, int(instance.weight * 10))
            total_weight += effective
            running = previous.get(instance.instance_id, 0) + effective
            current[instance.instance_id] = running
            if selected is None or running > current[selected.instance_id]:
                selected = instance

        current[selected.instance_id] -= total_weight
        # Storing only the current members drops state of departed instances.
        self._smooth_weights[service_key] = current
        return selected

    def _random_select(self, instances: List[ServiceInstance]) -> ServiceInstance:
        """Pick uniformly at random."""
        return random.choice(instances)

    def _weighted_random_select(self, instances: List[ServiceInstance]) -> ServiceInstance:
        """Pick at random with probability proportional to ``weight``."""
        total_weight = sum(instance.weight for instance in instances)
        if total_weight <= 0:
            # No usable weights: fall back to a uniform pick instead of
            # always returning the first instance.
            return random.choice(instances)

        random_weight = random.uniform(0, total_weight)

        current_weight = 0
        for instance in instances:
            current_weight += instance.weight
            if random_weight <= current_weight:
                return instance

        return instances[-1]  # Guard against floating-point rounding

    def _least_connections_select(self, instances: List[ServiceInstance]) -> ServiceInstance:
        """Pick the instance with the fewest recorded connections.

        Ties go to the instance that appears first in ``instances``.
        """
        return min(
            instances,
            key=lambda instance: self.connection_counts.get(instance.instance_id, 0),
        )

    def _consistent_hash_select(self, instances: List[ServiceInstance],
                               request_key: Optional[str]) -> ServiceInstance:
        """Map ``request_key`` to an instance with rendezvous hashing.

        Every instance is scored by a SHA-256 digest of the request key and
        its instance id, and the highest score wins. Unlike the built-in
        ``hash()``, whose string hashes are salted per process, the digest is
        identical in every process and after every restart. The result does
        not depend on list order, and removing an instance only remaps the
        keys that were assigned to it.

        Falls back to a random pick when ``request_key`` is empty.
        """
        if not request_key:
            return self._random_select(instances)

        import hashlib  # local import: only this strategy needs it

        def score(instance: ServiceInstance) -> int:
            material = f"{request_key}|{instance.instance_id}".encode("utf-8")
            return int.from_bytes(hashlib.sha256(material).digest()[:8], "big")

        return max(instances, key=score)

    def record_connection(self, instance_id: str, increment: bool = True):
        """Adjust the connection count of an instance by one.

        Args:
            instance_id: Instance whose count changes.
            increment: ``True`` to add a connection, ``False`` to release one.
                The count never drops below zero.
        """
        current = self.connection_counts.get(instance_id, 0)
        if increment:
            self.connection_counts[instance_id] = current + 1
        else:
            self.connection_counts[instance_id] = max(0, current - 1)

# Usage example
if __name__ == "__main__":
    registry = ServiceRegistry()
    load_balancer = LoadBalancer(LoadBalanceStrategy.WEIGHTED_ROUND_ROBIN)

    print("=== 服务注册示例 ===")

    # (instance_id, ip, weight, version, zone) of each demo instance
    instance_specs = [
        ("user-service-1", "192.168.1.101", 1.0, "1.0.0", "zone-a"),
        ("user-service-2", "192.168.1.102", 2.0, "1.0.0", "zone-b"),
        ("user-service-3", "192.168.1.103", 1.5, "1.1.0", "zone-a"),
    ]
    instances = [
        ServiceInstance(
            service_name="user-service",
            instance_id=spec_id,
            ip=spec_ip,
            port=8080,
            weight=spec_weight,
            metadata={"version": spec_version, "zone": spec_zone},
        )
        for spec_id, spec_ip, spec_weight, spec_version, spec_zone in instance_specs
    ]

    # Register every instance with its own HTTP health-check configuration
    for instance in instances:
        registry.register_service(
            ServiceRegistration(
                instance,
                HealthCheckConfig(
                    type=HealthCheckType.HTTP,
                    interval=5000,
                    path="/health",
                ),
            )
        )

    # Give the health checkers a moment to run
    time.sleep(2)

    print("\n=== 服务发现示例 ===")

    discovered_instances = registry.discover_services("user-service")
    print(f"发现 {len(discovered_instances)} 个服务实例")

    # Ten selections through the load balancer, counting hits per instance
    print("\n=== 负载均衡测试 ===")
    selection_counts = {}

    for attempt in range(1, 11):
        selected = load_balancer.select_instance(discovered_instances)
        if not selected:
            continue
        selection_counts[selected.instance_id] = (
            selection_counts.get(selected.instance_id, 0) + 1
        )
        print(f"第 {attempt} 次选择: {selected.instance_id} (权重: {selected.weight})")

    print("\n选择统计:")
    for instance_id in selection_counts:
        print(f"  {instance_id}: {selection_counts[instance_id]} 次")

    # Registry-wide statistics
    print("\n=== 服务统计 ===")
    stats = registry.get_service_stats()
    for label, stat_key in (
        ("总服务数", "total_services"),
        ("总实例数", "total_instances"),
        ("健康实例数", "healthy_instances"),
        ("不健康实例数", "unhealthy_instances"),
    ):
        print(f"{label}: {stats[stat_key]}")

2. 服务注册实现

2.1 Java 客户端注册

// Java 服务注册示例
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.PropertyKeyConst;

import java.util.Properties;
import java.util.HashMap;
import java.util.Map;

/**
 * Registers one instance of "user-service" with Nacos, keeps it registered
 * for 60 seconds and then deregisters it.
 */
public class ServiceRegistrationExample {
    
    /**
     * Runs the example.
     *
     * @param args unused
     * @throws Exception if connecting, registering or deregistering fails,
     *                   or if the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // 1. Configure the Nacos connection
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:8848");
        properties.setProperty(PropertyKeyConst.NAMESPACE, "dev");
        properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
        properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
        
        // 2. Create the naming service
        NamingService namingService = NacosFactory.createNamingService(properties);
        
        // 3. Describe the service instance
        Instance instance = new Instance();
        instance.setIp("192.168.1.100");
        instance.setPort(8080);
        instance.setWeight(1.0);
        instance.setHealthy(true);
        instance.setEnabled(true);
        instance.setEphemeral(true);
        instance.setClusterName("DEFAULT");
        
        // 4. Attach metadata
        Map<String, String> metadata = new HashMap<>();
        metadata.put("version", "1.0.0");
        metadata.put("environment", "dev");
        metadata.put("team", "backend");
        instance.setMetadata(metadata);
        
        // 5. Register the instance
        String serviceName = "user-service";
        String groupName = "DEFAULT_GROUP";
        
        namingService.registerInstance(serviceName, groupName, instance);
        System.out.println("服务注册成功: " + serviceName);
        
        try {
            // 6. Keep the service running
            Thread.sleep(60000);
        } finally {
            // 7. Deregister even when the wait is interrupted, so the
            //    instance does not linger in the registry
            namingService.deregisterInstance(serviceName, groupName, 
                instance.getIp(), instance.getPort());
            System.out.println("服务注销成功: " + serviceName);
        }
    }
}

2.2 Spring Boot 集成

# Spring Boot 配置
# application.yml
# Nacos discovery settings for the user-service application.
spring:
  application:
    name: user-service
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
        namespace: dev
        group: DEFAULT_GROUP
        cluster-name: DEFAULT
        weight: 1.0
        # Custom key/value pairs attached to the registered instance
        metadata:
          version: 1.0.0
          environment: dev
        # Timing values below are in milliseconds
        heart-beat-interval: 5000
        heart-beat-timeout: 15000
        ip-delete-timeout: 30000
        register-enabled: true
        
// 启动类
/**
 * Entry point of the user service; boots Spring with service discovery enabled.
 */
@SpringBootApplication
@EnableDiscoveryClient
public class UserServiceApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments handed through to Spring
     */
    public static void main(String[] args) {
        Class<?> primarySource = UserServiceApplication.class;
        SpringApplication.run(primarySource, args);
    }
}

// 服务提供者
/**
 * REST endpoints of the user service: discovery information and a simple
 * health probe.
 */
@RestController
@RequestMapping("/api/users")
public class UserController {
    
    @Autowired
    private DiscoveryClient discoveryClient;
    
    // Name this application registers under; falls back to the value that
    // was previously hard-coded.
    @Value("${spring.application.name:user-service}")
    private String serviceName;
    
    // Port the embedded server listens on; falls back to the value that was
    // previously hard-coded.
    @Value("${server.port:8080}")
    private int serverPort;
    
    /**
     * Reports the service name, the number of discovered instances and the
     * address of the instance handling this request.
     */
    @GetMapping("/info")
    public Map<String, Object> getServiceInfo() {
        List<ServiceInstance> instances = discoveryClient.getInstances(serviceName);
        
        Map<String, Object> info = new HashMap<>();
        info.put("service-name", serviceName);
        info.put("instance-count", instances.size());
        info.put("current-instance", getCurrentInstance());
        
        return info;
    }
    
    /**
     * Returns a static "UP" status together with the current timestamp.
     */
    @GetMapping("/health")
    public Map<String, String> health() {
        Map<String, String> health = new HashMap<>();
        health.put("status", "UP");
        health.put("timestamp", Instant.now().toString());
        return health;
    }
    
    /**
     * Describes the local instance as an ip/port map.
     *
     * @return the map, or an empty map when the local host address cannot be
     *         resolved
     */
    private Map<String, Object> getCurrentInstance() {
        try {
            String localHost = InetAddress.getLocalHost().getHostAddress();
            Map<String, Object> instance = new HashMap<>();
            instance.put("ip", localHost);
            instance.put("port", serverPort);
            return instance;
        } catch (UnknownHostException e) {
            // Resolving the local host is the only step that can fail here.
            return Collections.emptyMap();
        }
    }
}

2.3 Go 客户端注册

// Go 服务注册示例
package main

import (
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/nacos-group/nacos-sdk-go/clients"
    "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
    "github.com/nacos-group/nacos-sdk-go/common/constant"
    "github.com/nacos-group/nacos-sdk-go/model"
    "github.com/nacos-group/nacos-sdk-go/vo"
)

// ServiceRegistry bundles a Nacos naming client with the identity of the
// service instance it registers.
type ServiceRegistry struct {
    namingClient naming_client.INamingClient // client used for all naming calls
    serviceName  string                      // name the instance is registered under
    ip           string                      // address advertised for the instance
    port         uint64                      // port advertised for the instance
}

// NewServiceRegistry creates a naming client for the local Nacos server
// (127.0.0.1:8848, namespace "dev") and returns a registry preconfigured for
// the "user-service-go" instance at 192.168.1.100:8080.
//
// The returned error wraps the client-creation error, so callers can inspect
// the cause with errors.Is / errors.As.
func NewServiceRegistry() (*ServiceRegistry, error) {
    // 1. Server-side configuration
    serverConfigs := []constant.ServerConfig{
        {
            IpAddr: "127.0.0.1",
            Port:   8848,
        },
    }
    
    // 2. Client-side configuration
    clientConfig := constant.ClientConfig{
        NamespaceId:         "dev",
        TimeoutMs:           5000,
        NotLoadCacheAtStart: true,
        LogDir:              "/tmp/nacos/log",
        CacheDir:            "/tmp/nacos/cache",
        LogLevel:            "info",
        Username:            "nacos",
        Password:            "nacos",
    }
    
    // 3. Create the naming client
    namingClient, err := clients.CreateNamingClient(map[string]interface{}{
        "serverConfigs": serverConfigs,
        "clientConfig":  clientConfig,
    })
    
    if err != nil {
        return nil, fmt.Errorf("创建命名客户端失败: %w", err)
    }
    
    return &ServiceRegistry{
        namingClient: namingClient,
        serviceName:  "user-service-go",
        ip:          "192.168.1.100",
        port:        8080,
    }, nil
}

// RegisterService registers this instance with Nacos as an ephemeral,
// healthy, enabled instance in DEFAULT_GROUP / cluster DEFAULT.
//
// It returns an error when the call fails (the cause is wrapped) or when the
// server reports that the registration did not succeed.
func (sr *ServiceRegistry) RegisterService() error {
    success, err := sr.namingClient.RegisterInstance(vo.RegisterInstanceParam{
        Ip:          sr.ip,
        Port:        sr.port,
        ServiceName: sr.serviceName,
        GroupName:   "DEFAULT_GROUP",
        ClusterName: "DEFAULT",
        Weight:      1.0,
        Enable:      true,
        Healthy:     true,
        Ephemeral:   true,
        Metadata: map[string]string{
            "version":     "1.0.0",
            "environment": "dev",
            "language":    "go",
        },
    })
    
    if err != nil {
        return fmt.Errorf("注册服务失败: %w", err)
    }
    
    if !success {
        return fmt.Errorf("注册服务失败: 返回 false")
    }
    
    log.Printf("✅ 服务注册成功: %s@%s:%d", sr.serviceName, sr.ip, sr.port)
    return nil
}

// DeregisterService removes this instance from Nacos.
//
// It returns an error when the call fails (the cause is wrapped) or when the
// server reports that the deregistration did not succeed.
func (sr *ServiceRegistry) DeregisterService() error {
    success, err := sr.namingClient.DeregisterInstance(vo.DeregisterInstanceParam{
        Ip:          sr.ip,
        Port:        sr.port,
        ServiceName: sr.serviceName,
        GroupName:   "DEFAULT_GROUP",
        // The deregister parameter names this field Cluster, unlike
        // RegisterInstanceParam, which uses ClusterName.
        Cluster:   "DEFAULT",
        Ephemeral: true,
    })
    
    if err != nil {
        return fmt.Errorf("注销服务失败: %w", err)
    }
    
    if !success {
        return fmt.Errorf("注销服务失败: 返回 false")
    }
    
    log.Printf("✅ 服务注销成功: %s@%s:%d", sr.serviceName, sr.ip, sr.port)
    return nil
}

// DiscoverServices returns the healthy instances of serviceName in
// DEFAULT_GROUP / cluster DEFAULT.
//
// The SDK returns instances as model.Instance values. The returned error
// wraps the SDK error.
func (sr *ServiceRegistry) DiscoverServices(serviceName string) ([]model.Instance, error) {
    instances, err := sr.namingClient.SelectInstances(vo.SelectInstancesParam{
        ServiceName: serviceName,
        GroupName:   "DEFAULT_GROUP",
        Clusters:    []string{"DEFAULT"},
        HealthyOnly: true,
    })
    
    if err != nil {
        return nil, fmt.Errorf("发现服务失败: %w", err)
    }
    
    log.Printf("🔍 发现服务: %s -> %d 个实例", serviceName, len(instances))
    return instances, nil
}

// StartHeartbeat launches a goroutine that logs a heartbeat line every five
// seconds. It is illustrative only and sends nothing to Nacos.
func (sr *ServiceRegistry) StartHeartbeat() {
    const heartbeatInterval = 5 * time.Second

    heartbeat := time.NewTicker(heartbeatInterval)
    logBeats := func() {
        // The Nacos Go SDK sends heartbeats automatically; this is only a demo
        for {
            <-heartbeat.C
            log.Printf("💓 心跳发送: %s", sr.serviceName)
        }
    }
    go logBeats()
}

// HTTP 服务器
// startHTTPServer registers the /health and /api/users handlers on the
// default mux and serves on port 8080. It only returns by terminating the
// process through log.Fatal when the server stops.
func startHTTPServer() {
    // jsonHandler builds a handler that answers 200 with a JSON body whose
    // single %s placeholder is filled with the current RFC 3339 timestamp.
    jsonHandler := func(bodyFormat string) http.HandlerFunc {
        return func(w http.ResponseWriter, r *http.Request) {
            w.Header().Set("Content-Type", "application/json")
            w.WriteHeader(http.StatusOK)
            fmt.Fprintf(w, bodyFormat, time.Now().Format(time.RFC3339))
        }
    }

    http.HandleFunc("/health", jsonHandler(`{"status":"UP","timestamp":"%s"}`))
    http.HandleFunc("/api/users", jsonHandler(`{"message":"User service is running","timestamp":"%s"}`))

    log.Println("🚀 HTTP 服务器启动在端口 8080")
    serveErr := http.ListenAndServe(":8080", nil)
    log.Fatal(serveErr)
}

// main registers the service, starts the demo heartbeat logger, runs one
// delayed discovery of "user-service" in the background and then serves HTTP.
func main() {
    // 1. Create the registry
    registry, createErr := NewServiceRegistry()
    if createErr != nil {
        log.Fatalf("创建服务注册器失败: %v", createErr)
    }

    // 2. Register this instance
    registerErr := registry.RegisterService()
    if registerErr != nil {
        log.Fatalf("注册服务失败: %v", registerErr)
    }

    // 3. Start the heartbeat logger
    registry.StartHeartbeat()

    // 4. Discovery demo: look up "user-service" once, two seconds from now
    discoverOnce := func() {
        time.Sleep(2 * time.Second)
        found, discoverErr := registry.DiscoverServices("user-service")
        if discoverErr != nil {
            log.Printf("服务发现失败: %v", discoverErr)
            return
        }
        for _, inst := range found {
            log.Printf("发现实例: %s:%d (权重: %.1f)", 
                inst.Ip, inst.Port, inst.Weight)
        }
    }
    go discoverOnce()

    // 5. Serve HTTP (blocks)
    startHTTPServer()
}

3. 健康检查配置

3.1 健康检查类型

3.1.1 TCP 健康检查

# application.yml
spring:
  cloud:
    nacos:
      discovery:
        heart-beat-interval: 5000    # Heartbeat interval: 5 seconds
        heart-beat-timeout: 15000    # Heartbeat timeout: 15 seconds
        ip-delete-timeout: 30000     # Instance deletion timeout: 30 seconds

3.1.2 HTTP 健康检查

// 自定义健康检查端点
/**
 * Custom health endpoint that probes the database and Redis and reports the
 * combined result.
 */
@RestController
public class HealthController {
    
    @Autowired
    private DataSource dataSource;
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    /**
     * Probes each dependency and returns the aggregated health.
     *
     * @return HTTP 200 with status "UP" when every probe succeeds, otherwise
     *         HTTP 503 with status "DOWN"; the body carries per-dependency
     *         details and a timestamp
     */
    @GetMapping("/actuator/health")
    public ResponseEntity<Map<String, Object>> health() {
        Map<String, Object> health = new HashMap<>();
        Map<String, Object> details = new HashMap<>();
        
        boolean isHealthy = true;
        
        // Database probe: obtaining and closing a connection must succeed
        try {
            dataSource.getConnection().close();
            details.put("database", Map.of("status", "UP"));
        } catch (Exception e) {
            details.put("database", downDetail(e));
            isHealthy = false;
        }
        
        // Redis probe: a read must complete without an exception
        try {
            redisTemplate.opsForValue().get("health-check");
            details.put("redis", Map.of("status", "UP"));
        } catch (Exception e) {
            details.put("redis", downDetail(e));
            isHealthy = false;
        }
        
        health.put("status", isHealthy ? "UP" : "DOWN");
        health.put("details", details);
        health.put("timestamp", Instant.now().toString());
        
        HttpStatus status = isHealthy ? HttpStatus.OK : HttpStatus.SERVICE_UNAVAILABLE;
        return ResponseEntity.status(status).body(health);
    }
    
    /**
     * Builds the "DOWN" detail entry for a failed probe.
     *
     * Map.of rejects null values, and an exception's message may be null, so
     * the exception class name is used when there is no message.
     */
    private static Map<String, String> downDetail(Exception e) {
        String message = e.getMessage();
        String error = (message != null) ? message : e.getClass().getName();
        return Map.of("status", "DOWN", "error", error);
    }
}

3.1.3 自定义健康检查

// 自定义健康检查器
@Component
public class CustomHealthIndicator implements HealthIndicator {
    
    @Override
    public Health health() {
        // 自定义健康检查逻辑
        boolean isHealthy = checkCustomLogic();
        
        if (isHealthy) {
            return Health.up()
                .withDetail("custom", "All systems operational")
                .withDetail("timestamp", Instant.now())
                .build();
        } else {
            return Health.down()
                .withDetail("custom", "System degraded")
                .withDetail("timestamp", Instant.now())
                .build();
        }
    }
    
    private boolean checkCustomLogic() {
        // 实现自定义检查逻辑
        // 例如:检查外部服务、检查业务指标等
        return true;
    }
}

3.2 健康检查配置优化

# application.properties

# Health check endpoint settings
management.endpoints.web.exposure.include=health,info,metrics
management.endpoint.health.show-details=always
management.health.defaults.enabled=true

# Nacos heartbeat settings (values in milliseconds)
spring.cloud.nacos.discovery.heart-beat-interval=5000
spring.cloud.nacos.discovery.heart-beat-timeout=15000
spring.cloud.nacos.discovery.ip-delete-timeout=30000

# Individual health indicators
management.health.custom.enabled=true
management.health.db.enabled=true
management.health.redis.enabled=true
management.health.diskspace.enabled=true

4. 负载均衡策略

4.1 客户端负载均衡

// 自定义负载均衡规则
/**
 * Custom load-balancing rule: filters out unhealthy instances and then picks
 * one at random with probability proportional to its weight.
 *
 * Health and weight are read from instance metadata. An explicit "status" /
 * "weight" entry takes precedence; otherwise the "nacos.healthy" /
 * "nacos.weight" entries that the Nacos discovery client attaches to each
 * instance are used.
 */
@Component
public class CustomLoadBalancerRule {
    
    private static final double DEFAULT_WEIGHT = 1.0;
    
    /**
     * Chooses an instance.
     *
     * @param instances candidate instances, may be null or empty
     * @return the chosen instance, or null when there is no healthy candidate
     */
    public ServiceInstance choose(List<ServiceInstance> instances) {
        if (instances == null || instances.isEmpty()) {
            return null;
        }
        
        List<ServiceInstance> healthyInstances = instances.stream()
            .filter(CustomLoadBalancerRule::isHealthy)
            .collect(Collectors.toList());
        
        if (healthyInstances.isEmpty()) {
            return null;
        }
        
        return weightedRandom(healthyInstances);
    }
    
    /**
     * An instance with an explicit "status" entry is healthy only when that
     * entry is "UP". Without one it is healthy unless "nacos.healthy" says
     * "false". Requiring "status" on every instance would reject instances
     * that were registered without that custom key.
     */
    private static boolean isHealthy(ServiceInstance instance) {
        Map<String, String> metadata = instance.getMetadata();
        if (metadata == null) {
            return true;
        }
        String status = metadata.get("status");
        if (status != null) {
            return "UP".equals(status);
        }
        return !"false".equalsIgnoreCase(metadata.get("nacos.healthy"));
    }
    
    /**
     * Reads the weight of an instance. Missing, malformed or non-finite
     * values yield the default weight; negative values count as zero.
     */
    private static double weightOf(ServiceInstance instance) {
        Map<String, String> metadata = instance.getMetadata();
        if (metadata == null) {
            return DEFAULT_WEIGHT;
        }
        String raw = metadata.get("weight");
        if (raw == null) {
            raw = metadata.get("nacos.weight");
        }
        if (raw == null) {
            return DEFAULT_WEIGHT;
        }
        try {
            double weight = Double.parseDouble(raw.trim());
            if (Double.isNaN(weight) || Double.isInfinite(weight)) {
                return DEFAULT_WEIGHT;
            }
            return Math.max(0.0, weight);
        } catch (NumberFormatException e) {
            return DEFAULT_WEIGHT;
        }
    }
    
    /**
     * Weighted random selection over a non-empty list.
     */
    private ServiceInstance weightedRandom(List<ServiceInstance> instances) {
        int size = instances.size();
        
        // Parse each weight once and accumulate the total
        double[] weights = new double[size];
        double totalWeight = 0;
        for (int i = 0; i < size; i++) {
            weights[i] = weightOf(instances.get(i));
            totalWeight += weights[i];
        }
        
        // No positive weight at all: fall back to a uniform pick
        if (totalWeight <= 0) {
            return instances.get((int) (Math.random() * size));
        }
        
        double threshold = Math.random() * totalWeight;
        
        double cumulative = 0;
        for (int i = 0; i < size; i++) {
            cumulative += weights[i];
            // Strict comparison so that zero-weight instances are never chosen
            if (threshold < cumulative) {
                return instances.get(i);
            }
        }
        
        // Only reachable through floating-point rounding at the upper edge
        return instances.get(size - 1);
    }
}

// Example caller: discovers user-service instances, picks one through the
// custom load-balancing rule and issues an HTTP GET against it.
@Service
public class UserServiceClient {

    @Autowired
    private DiscoveryClient discoveryClient;

    @Autowired
    private CustomLoadBalancerRule loadBalancerRule;

    @Autowired
    private RestTemplate restTemplate;

    /**
     * Calls the given path on one instance of user-service.
     *
     * @param path request path appended directly after host:port
     * @return the response body as a string
     * @throws RuntimeException when no instance can be selected or the HTTP call fails
     */
    public String callUserService(String path) {
        // Discover the candidates and let the rule pick one of them.
        ServiceInstance target =
            loadBalancerRule.choose(discoveryClient.getInstances("user-service"));

        if (target == null) {
            throw new RuntimeException("No available instances for user-service");
        }

        // Build the request URL from the chosen instance's address.
        String host = target.getHost();
        int port = target.getPort();
        String url = String.format("http://%s:%d%s", host, port, path);

        try {
            return restTemplate.getForObject(url, String.class);
        } catch (Exception cause) {
            // Retry logic could be added here.
            throw new RuntimeException("Failed to call user-service", cause);
        }
    }
}

4.2 一致性哈希负载均衡

// Consistent-hash load balancer: maps request keys onto a hash ring of
// virtual nodes so that the same key keeps hitting the same instance.
public class ConsistentHashLoadBalancer {

    /** Number of virtual nodes per instance used by the no-arg constructor. */
    private static final int DEFAULT_VIRTUAL_NODES = 150;

    // The ring is rebuilt off to the side and published with a single volatile
    // write, so select() never observes an empty or half-populated ring.
    private volatile TreeMap<Long, ServiceInstance> ring = new TreeMap<>();
    private final int virtualNodes;

    /** Creates a balancer with the default number of virtual nodes per instance. */
    public ConsistentHashLoadBalancer() {
        this(DEFAULT_VIRTUAL_NODES);
    }

    /**
     * @param virtualNodes virtual nodes placed on the ring per instance; must be positive
     * @throws IllegalArgumentException when virtualNodes is not positive
     */
    public ConsistentHashLoadBalancer(int virtualNodes) {
        if (virtualNodes <= 0) {
            throw new IllegalArgumentException("virtualNodes must be positive");
        }
        this.virtualNodes = virtualNodes;
    }

    /**
     * Replaces the ring with one built from the given instances.
     *
     * @param instances current instances; null or empty yields an empty ring
     */
    public void updateInstances(List<ServiceInstance> instances) {
        TreeMap<Long, ServiceInstance> rebuilt = new TreeMap<>();

        if (instances != null) {
            for (ServiceInstance instance : instances) {
                String nodeId = nodeIdOf(instance);
                for (int i = 0; i < virtualNodes; i++) {
                    rebuilt.put(hash(nodeId + "#" + i), instance);
                }
            }
        }

        ring = rebuilt;
    }

    /**
     * Selects the instance responsible for the given key.
     *
     * @param key routing key (must not be null)
     * @return the owning instance, or null when the ring is empty
     */
    public ServiceInstance select(String key) {
        // Work on one snapshot so a concurrent update cannot change the ring mid-lookup.
        TreeMap<Long, ServiceInstance> snapshot = ring;
        if (snapshot.isEmpty()) {
            return null;
        }

        // First node at or after the key's hash; wrap around to the first node
        // when the hash lies beyond the last one.
        Map.Entry<Long, ServiceInstance> entry = snapshot.ceilingEntry(hash(key));
        if (entry == null) {
            entry = snapshot.firstEntry();
        }

        return entry.getValue();
    }

    /**
     * Identifier used to derive an instance's virtual-node keys. Falls back to
     * host:port because getInstanceId() may return null for some registries,
     * which would otherwise make all instances collide on the same ring slots.
     */
    private static String nodeIdOf(ServiceInstance instance) {
        String id = instance.getInstanceId();
        return id != null ? id : instance.getHost() + ":" + instance.getPort();
    }

    /** 64-bit FNV-1a hash over the key's UTF-8 bytes. */
    private long hash(String key) {
        final long FNV_64_INIT = 0xcbf29ce484222325L;
        final long FNV_64_PRIME = 0x100000001b3L;

        long hash = FNV_64_INIT;
        // Fixed charset: the platform default would make hashes differ between JVMs.
        for (byte b : key.getBytes(java.nio.charset.StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff);
            hash *= FNV_64_PRIME;
        }

        return hash;
    }
}

5. 服务路由规则

5.1 基于版本的路由

// Metadata-based routing filters: version, zone and canary.
@Component
public class VersionBasedRouter {

    /**
     * Keeps the instances whose "version" metadata equals targetVersion.
     * A null or empty targetVersion disables filtering and returns the input list.
     */
    public List<ServiceInstance> filterByVersion(List<ServiceInstance> instances,
                                                String targetVersion) {
        if (targetVersion != null && !targetVersion.isEmpty()) {
            return keepWithMetadata(instances, "version", targetVersion);
        }
        return instances;
    }

    /**
     * Prefers instances whose "zone" metadata equals targetZone. When no
     * instance is in that zone, or targetZone is null or empty, the input list
     * is returned unchanged.
     */
    public List<ServiceInstance> filterByZone(List<ServiceInstance> instances,
                                             String targetZone) {
        if (targetZone == null || targetZone.isEmpty()) {
            return instances;
        }

        List<ServiceInstance> inZone = keepWithMetadata(instances, "zone", targetZone);
        if (inZone.isEmpty()) {
            // Nothing in the requested zone: fall back to every instance.
            return instances;
        }
        return inZone;
    }

    /**
     * Keeps canary instances (metadata "canary" = "true") when useCanary is
     * true, and only non-canary instances otherwise.
     */
    public List<ServiceInstance> filterByCanary(List<ServiceInstance> instances,
                                               boolean useCanary) {
        return instances.stream()
            .filter(candidate ->
                "true".equals(candidate.getMetadata().get("canary")) == useCanary)
            .collect(Collectors.toList());
    }

    /** Returns the candidates whose metadata entry for key equals expected. */
    private List<ServiceInstance> keepWithMetadata(List<ServiceInstance> candidates,
                                                   String key,
                                                   String expected) {
        return candidates.stream()
            .filter(candidate -> expected.equals(candidate.getMetadata().get(key)))
            .collect(Collectors.toList());
    }
}

5.2 动态路由配置

# Routing rule configuration.
# Each rule names a service, a list of header-based conditions and a default target.
# The handler that follows checks conditions in order and applies the target of the
# first one whose header value matches; otherwise it applies the default target.
# NOTE(review): target values are compared literally with instance metadata, so
# zone "preferred" only matches instances whose metadata zone is exactly "preferred".
# NOTE(review): the handler reads the fallback via rule.getDefaultTarget(); confirm
# that the `default` key below actually binds to that property.
routing:
  rules:
    - service: user-service
      conditions:
        - header: "X-Version"
          value: "v2"
          target:
            version: "2.0.0"
        - header: "X-Canary"
          value: "true"
          target:
            canary: true
      default:
        version: "1.0.0"
        zone: "preferred"
// 动态路由处理器
@Component
public class DynamicRoutingHandler {
    
    @Value("${routing.rules}")
    private List<RoutingRule> routingRules;
    
    public List<ServiceInstance> route(String serviceName, 
                                      HttpServletRequest request,
                                      List<ServiceInstance> instances) {
        
        RoutingRule rule = findRoutingRule(serviceName);
        if (rule == null) {
            return instances;
        }
        
        // 检查路由条件
        for (RoutingCondition condition : rule.getConditions()) {
            String headerValue = request.getHeader(condition.getHeader());
            if (condition.getValue().equals(headerValue)) {
                return filterInstances(instances, condition.getTarget());
            }
        }
        
        // 使用默认路由
        return filterInstances(instances, rule.getDefaultTarget());
    }
    
    private RoutingRule findRoutingRule(String serviceName) {
        return routingRules.stream()
            .filter(rule -> rule.getService().equals(serviceName))
            .findFirst()
            .orElse(null);
    }
    
    private List<ServiceInstance> filterInstances(List<ServiceInstance> instances, 
                                                 RoutingTarget target) {
        return instances.stream()
            .filter(instance -> matchesTarget(instance, target))
            .collect(Collectors.toList());
    }
    
    private boolean matchesTarget(ServiceInstance instance, RoutingTarget target) {
        Map<String, String> metadata = instance.getMetadata();
        
        if (target.getVersion() != null) {
            if (!target.getVersion().equals(metadata.get("version"))) {
                return false;
            }
        }
        
        if (target.getZone() != null) {
            if (!target.getZone().equals(metadata.get("zone"))) {
                return false;
            }
        }
        
        if (target.isCanary()) {
            if (!"true".equals(metadata.get("canary"))) {
                return false;
            }
        }
        
        return true;
    }
}

6. 异常处理与容错

6.1 服务发现异常处理

// 容错服务发现客户端
@Component
public class FaultTolerantDiscoveryClient {
    
    @Autowired
    private DiscoveryClient discoveryClient;
    
    private final Cache<String, List<ServiceInstance>> instanceCache = 
        Caffeine.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(30, TimeUnit.SECONDS)
            .build();
    
    public List<ServiceInstance> getInstances(String serviceName) {
        try {
            // 尝试从注册中心获取
            List<ServiceInstance> instances = discoveryClient.getInstances(serviceName);
            
            if (!instances.isEmpty()) {
                // 更新缓存
                instanceCache.put(serviceName, instances);
                return instances;
            }
        } catch (Exception e) {
            log.warn("Failed to get instances from registry for service: {}", serviceName, e);
        }
        
        // 从缓存获取
        List<ServiceInstance> cachedInstances = instanceCache.getIfPresent(serviceName);
        if (cachedInstances != null) {
            log.info("Using cached instances for service: {}", serviceName);
            return cachedInstances;
        }
        
        // 返回空列表
        log.error("No instances available for service: {}", serviceName);
        return Collections.emptyList();
    }
    
    public ServiceInstance selectInstance(String serviceName, String requestKey) {
        List<ServiceInstance> instances = getInstances(serviceName);
        
        if (instances.isEmpty()) {
            throw new ServiceUnavailableException("No instances available for " + serviceName);
        }
        
        // 健康检查过滤
        List<ServiceInstance> healthyInstances = instances.stream()
            .filter(this::isInstanceHealthy)
            .collect(Collectors.toList());
        
        if (healthyInstances.isEmpty()) {
            log.warn("No healthy instances for service: {}, using all instances", serviceName);
            healthyInstances = instances;
        }
        
        // 负载均衡选择
        return selectInstanceWithLoadBalancing(healthyInstances, requestKey);
    }
    
    private boolean isInstanceHealthy(ServiceInstance instance) {
        // 实现健康检查逻辑
        try {
            String healthUrl = String.format("http://%s:%d/actuator/health", 
                instance.getHost(), instance.getPort());
            
            RestTemplate restTemplate = new RestTemplate();
            restTemplate.getRequestFactory().setConnectTimeout(Duration.ofSeconds(2));
            restTemplate.getRequestFactory().setReadTimeout(Duration.ofSeconds(2));
            
            ResponseEntity<Map> response = restTemplate.getForEntity(healthUrl, Map.class);
            
            return response.getStatusCode() == HttpStatus.OK &&
                   "UP".equals(((Map) response.getBody()).get("status"));
        } catch (Exception e) {
            log.debug("Health check failed for instance: {}:{}", 
                instance.getHost(), instance.getPort());
            return false;
        }
    }
    
    private ServiceInstance selectInstanceWithLoadBalancing(List<ServiceInstance> instances, 
                                                           String requestKey) {
        // 实现负载均衡逻辑
        if (requestKey != null) {
            // 一致性哈希
            int hash = requestKey.hashCode();
            int index = Math.abs(hash) % instances.size();
            return instances.get(index);
        } else {
            // 随机选择
            int index = ThreadLocalRandom.current().nextInt(instances.size());
            return instances.get(index);
        }
    }
}

6.2 重试机制

// Retry configuration: provides a RetryTemplate bean unless one is already defined.
@Configuration
public class RetryConfiguration {

    /**
     * Builds a RetryTemplate that makes at most 3 attempts with a fixed
     * 1000 ms pause between them.
     */
    @Bean
    @ConditionalOnMissingBean
    public RetryTemplate retryTemplate() {
        // Fixed delay between attempts (milliseconds).
        FixedBackOffPolicy fixedDelay = new FixedBackOffPolicy();
        fixedDelay.setBackOffPeriod(1000);

        // Upper bound on the number of attempts.
        SimpleRetryPolicy attemptLimit = new SimpleRetryPolicy();
        attemptLimit.setMaxAttempts(3);

        RetryTemplate template = new RetryTemplate();
        template.setBackOffPolicy(fixedDelay);
        template.setRetryPolicy(attemptLimit);
        return template;
    }
}

// Service client that wraps each call in the configured RetryTemplate.
@Service
public class RetryableServiceClient {

    @Autowired
    private RetryTemplate retryTemplate;

    @Autowired
    private FaultTolerantDiscoveryClient discoveryClient;

    @Autowired
    private RestTemplate restTemplate;

    /**
     * Issues a GET against the service, retrying according to the RetryTemplate.
     * An instance is selected again on every attempt.
     *
     * @param serviceName name of the target service
     * @param path        request path appended directly after host:port
     * @param requestKey  key passed to instance selection (may be null)
     * @return the response body as a string
     */
    public String callServiceWithRetry(String serviceName, String path, String requestKey) {
        return retryTemplate.execute(context -> {
            ServiceInstance target = discoveryClient.selectInstance(serviceName, requestKey);

            String host = target.getHost();
            int port = target.getPort();
            String url = String.format("http://%s:%d%s", host, port, path);

            try {
                return restTemplate.getForObject(url, String.class);
            } catch (Exception failure) {
                // Log the 1-based attempt number, then rethrow so the template can retry.
                int attempt = context.getRetryCount() + 1;
                log.warn("Attempt {} failed for {}: {}", attempt, url, failure.getMessage());
                throw failure;
            }
        });
    }
}

7. 总结

7.1 核心要点

  1. 服务注册: 支持多种客户端 SDK,自动注册和手动注册
  2. 健康检查: TCP、HTTP、自定义检查,保证服务可用性
  3. 负载均衡: 多种策略,支持权重和一致性哈希
  4. 服务路由: 基于版本、区域、标签的智能路由
  5. 容错机制: 缓存、重试、降级,提高系统稳定性

7.2 最佳实践

  1. 合理设置心跳间隔: 平衡实时性和性能
  2. 实现健康检查: 确保服务真正可用
  3. 使用元数据: 丰富服务信息,支持高级路由
  4. 缓存服务列表: 提高容错能力
  5. 监控服务状态: 及时发现和处理问题

7.3 下一步学习

在下一章中,我们将学习:

  - 配置管理的核心功能
  - 动态配置推送机制
  - 配置变更监听
  - 配置加密和安全

通过本章的学习,你已经掌握了 Nacos 服务注册与发现的核心功能。接下来让我们深入学习配置管理的强大特性!