学习目标

  • 掌握 Nacos Go 客户端的安装和配置
  • 学会使用 Go 客户端进行服务注册与发现
  • 掌握 Go 客户端的配置管理功能
  • 了解 Go 客户端的高级特性和最佳实践

1. Go 客户端概述

1.1 客户端特性

Nacos Go 客户端提供了完整的服务发现和配置管理功能:

  • 服务注册与发现: 支持服务实例的注册、注销和查询
  • 配置管理: 支持配置的发布、获取、监听和删除
  • 健康检查: 支持多种健康检查方式
  • 负载均衡: 内置多种负载均衡算法
  • 故障转移: 支持客户端故障转移
  • 安全认证: 支持用户名密码认证

1.2 核心组件

// client_types.go
// Go 客户端核心类型定义
package nacos

import (
    "context"
    "time"
)

// ServiceInstance describes one instance of a service.
// The JSON field names are given by the struct tags.
type ServiceInstance struct {
    InstanceId  string            `json:"instanceId"`
    ServiceName string            `json:"serviceName"`
    GroupName   string            `json:"groupName"`
    ClusterName string            `json:"clusterName"`
    Ip          string            `json:"ip"`
    Port        uint64            `json:"port"`
    Weight      float64           `json:"weight"`
    Enable      bool              `json:"enable"`
    Healthy     bool              `json:"healthy"`
    Ephemeral   bool              `json:"ephemeral"`
    Metadata    map[string]string `json:"metadata"` // free-form key/value pairs attached to the instance
}

// ConfigParam is the argument type of the configuration methods on
// NacosClient (GetConfig, PublishConfig, DeleteConfig, ListenConfig,
// CancelListenConfig). The JSON field names are given by the struct tags.
type ConfigParam struct {
    DataId   string `json:"dataId"`
    Group    string `json:"group"`
    Content  string `json:"content"`
    Tag      string `json:"tag"`
    AppName  string `json:"appName"`
    BetaIps  string `json:"betaIps"`
    CasMd5   string `json:"casMd5"`
    Type     string `json:"type"`
}

// ConfigChangeEvent is the value handed to ConfigListener.OnChange when a
// configuration changes.
type ConfigChangeEvent struct {
    DataId    string
    Group     string
    Content   string
    EventType string
    Timestamp time.Time
}

// HealthCheckType names a health-check mode as a string.
type HealthCheckType string

// Supported HealthCheckType values.
const (
    HealthCheckTCP  HealthCheckType = "TCP"
    HealthCheckHTTP HealthCheckType = "HTTP"
    HealthCheckNone HealthCheckType = "NONE"
)

// LoadBalanceStrategy names a load-balancing strategy as a string.
type LoadBalanceStrategy string

// Supported LoadBalanceStrategy values.
const (
    LoadBalanceRoundRobin LoadBalanceStrategy = "round-robin"
    LoadBalanceRandom     LoadBalanceStrategy = "random"
    LoadBalanceWeighted   LoadBalanceStrategy = "weighted"
)

// NacosClient is the client interface, grouping service-discovery
// operations, configuration-management operations and client lifecycle.
// Every operation except Close takes a context as its first argument.
type NacosClient interface {
    // Service discovery.
    RegisterInstance(ctx context.Context, param RegisterInstanceParam) error
    DeregisterInstance(ctx context.Context, param DeregisterInstanceParam) error
    GetService(ctx context.Context, param GetServiceParam) (*Service, error)
    GetAllServicesInfo(ctx context.Context, param GetAllServicesInfoParam) (*ServiceList, error)
    SelectInstances(ctx context.Context, param SelectInstancesParam) ([]*ServiceInstance, error)
    SelectOneHealthyInstance(ctx context.Context, param SelectOneHealthyInstanceParam) (*ServiceInstance, error)
    Subscribe(ctx context.Context, param SubscribeParam) error
    Unsubscribe(ctx context.Context, param SubscribeParam) error
    
    // Configuration management.
    GetConfig(ctx context.Context, param ConfigParam) (string, error)
    PublishConfig(ctx context.Context, param ConfigParam) error
    DeleteConfig(ctx context.Context, param ConfigParam) error
    ListenConfig(ctx context.Context, param ConfigParam, listener ConfigListener) error
    CancelListenConfig(ctx context.Context, param ConfigParam) error
    
    // Client lifecycle.
    Close() error
}

// ConfigListener receives configuration change notifications. It is the
// listener type accepted by NacosClient.ListenConfig.
type ConfigListener interface {
    // OnChange is given the change event.
    OnChange(event ConfigChangeEvent)
    // OnError is given an error.
    OnError(err error)
}

// ServiceChangeListener receives service change notifications.
type ServiceChangeListener interface {
    // OnChange is given a slice of service instances.
    OnChange(services []*ServiceInstance)
    // OnError is given an error.
    OnError(err error)
}

2. 安装与配置

2.1 安装依赖

# 初始化 Go 模块
go mod init nacos-go-example

# 安装 Nacos Go 客户端
go get github.com/nacos-group/nacos-sdk-go/v2

# 安装其他依赖
go get github.com/gin-gonic/gin
go get github.com/spf13/viper
go get go.uber.org/zap
go get github.com/prometheus/client_golang

2.2 客户端配置

// config.go
// 客户端配置
package config

import (
    "github.com/nacos-group/nacos-sdk-go/v2/clients"
    "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
    "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
    "github.com/nacos-group/nacos-sdk-go/v2/common/constant"
    "github.com/nacos-group/nacos-sdk-go/v2/vo"
    "go.uber.org/zap"
)

// NacosConfig bundles the server/client settings with the naming client,
// config client and logger that NewNacosConfig creates from them.
type NacosConfig struct {
    ServerConfigs []constant.ServerConfig
    ClientConfig  constant.ClientConfig
    NamingClient  naming_client.INamingClient
    ConfigClient  config_client.IConfigClient
    Logger        *zap.Logger
}

// NewNacosConfig builds a NacosConfig holding a production zap logger, a
// naming client and a config client.
//
// Both clients are created from the same server list (localhost:8848,
// context path "/nacos", scheme "http") and the same client settings. An
// error is returned when the logger or either client cannot be created; in
// that case no NacosConfig is returned.
func NewNacosConfig() (*NacosConfig, error) {
    // The error paths below dereference the logger, so a discarded error
    // here would surface later as a nil-pointer panic.
    logger, err := zap.NewProduction()
    if err != nil {
        return nil, err
    }

    // Server-side settings.
    serverConfigs := []constant.ServerConfig{
        {
            IpAddr:      "localhost",
            Port:        8848,
            ContextPath: "/nacos",
            Scheme:      "http",
        },
    }

    // Client-side settings.
    // NOTE(review): the nacos-sdk-go README asks for an empty NamespaceId
    // when targeting the public namespace — confirm "public" is intended.
    clientConfig := constant.ClientConfig{
        NamespaceId:         "public",
        TimeoutMs:           5000,
        NotLoadCacheAtStart: true,
        LogDir:              "./logs",
        CacheDir:            "./cache",
        LogLevel:            "info",
        Username:            "nacos",
        Password:            "nacos",
    }

    // Naming client.
    namingClient, err := clients.NewNamingClient(
        vo.NacosClientParam{
            ClientConfig:  &clientConfig,
            ServerConfigs: serverConfigs,
        },
    )
    if err != nil {
        logger.Error("创建命名客户端失败", zap.Error(err))
        return nil, err
    }

    // Config client.
    configClient, err := clients.NewConfigClient(
        vo.NacosClientParam{
            ClientConfig:  &clientConfig,
            ServerConfigs: serverConfigs,
        },
    )
    if err != nil {
        logger.Error("创建配置客户端失败", zap.Error(err))
        // Release the naming client created above; nothing else would
        // ever close it once this function returns an error.
        namingClient.CloseClient()
        return nil, err
    }

    return &NacosConfig{
        ServerConfigs: serverConfigs,
        ClientConfig:  clientConfig,
        NamingClient:  namingClient,
        ConfigClient:  configClient,
        Logger:        logger,
    }, nil
}

// Close releases the resources held by nc: it closes the naming and config
// clients (when set) and then flushes the logger. Nil fields are skipped.
func (nc *NacosConfig) Close() {
    if nc.NamingClient != nil {
        nc.NamingClient.CloseClient()
    }
    if nc.ConfigClient != nil {
        nc.ConfigClient.CloseClient()
    }
    if nc.Logger != nil {
        // Best effort: Close has no error return, and a failed flush at
        // shutdown is not actionable by the caller.
        _ = nc.Logger.Sync()
    }
}

2.3 配置文件管理

// config_manager.go
// 配置文件管理器
package config

import (
    "encoding/json"
    "fmt"
    "github.com/spf13/viper"
    "go.uber.org/zap"
    "gopkg.in/yaml.v2"
    "strings"
)

// AppConfig is the root of the application configuration. It is the target
// type for both YAML and JSON decoding; key names come from the struct tags.
type AppConfig struct {
    Server   ServerConfig   `yaml:"server" json:"server"`
    Database DatabaseConfig `yaml:"database" json:"database"`
    Redis    RedisConfig    `yaml:"redis" json:"redis"`
    Log      LogConfig      `yaml:"log" json:"log"`
    Business BusinessConfig `yaml:"business" json:"business"`
}

// ServerConfig holds the "server" section of AppConfig.
// NOTE(review): the unit of the timeout fields is not visible here — confirm.
type ServerConfig struct {
    Port         int    `yaml:"port" json:"port"`
    Host         string `yaml:"host" json:"host"`
    ReadTimeout  int    `yaml:"readTimeout" json:"readTimeout"`
    WriteTimeout int    `yaml:"writeTimeout" json:"writeTimeout"`
    IdleTimeout  int    `yaml:"idleTimeout" json:"idleTimeout"`
}

// DatabaseConfig holds the "database" section of AppConfig.
// NOTE(review): the unit of ConnMaxLifetime is not visible here — confirm.
type DatabaseConfig struct {
    Driver          string `yaml:"driver" json:"driver"`
    Host            string `yaml:"host" json:"host"`
    Port            int    `yaml:"port" json:"port"`
    Username        string `yaml:"username" json:"username"`
    Password        string `yaml:"password" json:"password"`
    Database        string `yaml:"database" json:"database"`
    MaxOpenConns    int    `yaml:"maxOpenConns" json:"maxOpenConns"`
    MaxIdleConns    int    `yaml:"maxIdleConns" json:"maxIdleConns"`
    ConnMaxLifetime int    `yaml:"connMaxLifetime" json:"connMaxLifetime"`
}

// RedisConfig holds the "redis" section of AppConfig.
type RedisConfig struct {
    Host        string `yaml:"host" json:"host"`
    Port        int    `yaml:"port" json:"port"`
    Password    string `yaml:"password" json:"password"`
    Database    int    `yaml:"database" json:"database"`
    MaxRetries  int    `yaml:"maxRetries" json:"maxRetries"`
    PoolSize    int    `yaml:"poolSize" json:"poolSize"`
    MinIdleConns int   `yaml:"minIdleConns" json:"minIdleConns"`
}

// LogConfig holds the "log" section of AppConfig.
// NOTE(review): units of MaxSize and MaxAge are not visible here — confirm.
type LogConfig struct {
    Level      string `yaml:"level" json:"level"`
    Format     string `yaml:"format" json:"format"`
    Output     string `yaml:"output" json:"output"`
    MaxSize    int    `yaml:"maxSize" json:"maxSize"`
    MaxBackups int    `yaml:"maxBackups" json:"maxBackups"`
    MaxAge     int    `yaml:"maxAge" json:"maxAge"`
    Compress   bool   `yaml:"compress" json:"compress"`
}

// BusinessConfig holds the "business" section of AppConfig.
// NOTE(review): units of OrderTimeout, CacheTTL and RateLimit are not
// visible here — confirm.
type BusinessConfig struct {
    OrderTimeout   int     `yaml:"orderTimeout" json:"orderTimeout"`
    RetryTimes     int     `yaml:"retryTimes" json:"retryTimes"`
    DefaultPageSize int    `yaml:"defaultPageSize" json:"defaultPageSize"`
    MaxPageSize    int     `yaml:"maxPageSize" json:"maxPageSize"`
    CacheTTL       int     `yaml:"cacheTTL" json:"cacheTTL"`
    RateLimit      float64 `yaml:"rateLimit" json:"rateLimit"`
}

// ConfigManager loads an AppConfig through the Nacos config client and
// keeps the most recently parsed version.
type ConfigManager struct {
    nacos  *NacosConfig // source of the config client
    config *AppConfig   // last successfully parsed configuration
    logger *zap.Logger
}

// NewConfigManager returns a ConfigManager bound to nacos. It starts with an
// empty AppConfig and uses the logger carried by nacos.
func NewConfigManager(nacos *NacosConfig) *ConfigManager {
    manager := new(ConfigManager)
    manager.nacos = nacos
    manager.config = new(AppConfig)
    manager.logger = nacos.Logger
    return manager
}

// LoadConfig fetches the configuration identified by dataId and group,
// parses it into the manager's AppConfig and then registers a change
// listener for the same dataId/group. The first failing step is logged and
// its error returned; later steps are not attempted.
func (cm *ConfigManager) LoadConfig(dataId, group string) error {
    // Step 1: fetch the raw content.
    query := vo.ConfigParam{
        DataId: dataId,
        Group:  group,
    }
    content, fetchErr := cm.nacos.ConfigClient.GetConfig(query)
    if fetchErr != nil {
        cm.logger.Error("获取配置失败", zap.Error(fetchErr))
        return fetchErr
    }

    // Step 2: parse it.
    parseErr := cm.parseConfig(content, dataId)
    if parseErr != nil {
        cm.logger.Error("解析配置失败", zap.Error(parseErr))
        return parseErr
    }

    // Step 3: watch for later changes.
    listenErr := cm.listenConfig(dataId, group)
    if listenErr != nil {
        cm.logger.Error("监听配置失败", zap.Error(listenErr))
        return listenErr
    }

    cm.logger.Info("配置加载成功", zap.String("dataId", dataId), zap.String("group", group))
    return nil
}

// parseConfig decodes content and installs the result as the current
// configuration.
//
// The format is chosen from the dataId suffix: ".json" is decoded as JSON;
// ".yaml", ".yml" and anything else are decoded as YAML.
//
// Decoding goes into a fresh AppConfig that replaces cm.config only on
// success. Decoding straight into the live struct would leave it partially
// updated when parsing fails, and keys removed from the new document would
// keep their previous values. Callers should re-read GetConfig after a
// reload instead of holding on to an earlier pointer.
func (cm *ConfigManager) parseConfig(content, dataId string) error {
    parsed := &AppConfig{}
    raw := []byte(content)

    var err error
    if strings.HasSuffix(dataId, ".json") {
        err = json.Unmarshal(raw, parsed)
    } else {
        // YAML is both the explicit (.yaml/.yml) and the default format.
        err = yaml.Unmarshal(raw, parsed)
    }
    if err != nil {
        return err
    }

    cm.config = parsed
    return nil
}

// listenConfig registers a change callback for dataId/group on the config
// client. On every notification the callback logs the change, re-parses the
// new content with parseConfig and logs the outcome.
func (cm *ConfigManager) listenConfig(dataId, group string) error {
    onChange := func(namespace, group, dataId, data string) {
        cm.logger.Info("配置发生变更",
            zap.String("namespace", namespace),
            zap.String("group", group),
            zap.String("dataId", dataId),
        )

        // Re-parse with the content delivered by the notification.
        reloadErr := cm.parseConfig(data, dataId)
        if reloadErr != nil {
            cm.logger.Error("重新解析配置失败", zap.Error(reloadErr))
            return
        }

        cm.logger.Info("配置热更新成功")
    }

    watch := vo.ConfigParam{
        DataId:   dataId,
        Group:    group,
        OnChange: onChange,
    }
    return cm.nacos.ConfigClient.ListenConfig(watch)
}

// GetConfig returns the manager's current AppConfig pointer. Before any
// successful LoadConfig this is the empty AppConfig created by
// NewConfigManager.
func (cm *ConfigManager) GetConfig() *AppConfig {
    return cm.config
}

// PublishConfig publishes content under dataId/group through the config
// client. It returns the client's error, or a generic error when the client
// reports failure without one; success is logged.
func (cm *ConfigManager) PublishConfig(dataId, group, content string) error {
    request := vo.ConfigParam{
        DataId:  dataId,
        Group:   group,
        Content: content,
    }

    published, err := cm.nacos.ConfigClient.PublishConfig(request)
    switch {
    case err != nil:
        return err
    case !published:
        return fmt.Errorf("发布配置失败")
    }

    cm.logger.Info("配置发布成功", zap.String("dataId", dataId), zap.String("group", group))
    return nil
}

// DeleteConfig deletes the configuration identified by dataId/group through
// the config client. It returns the client's error, or a generic error when
// the client reports failure without one; success is logged.
func (cm *ConfigManager) DeleteConfig(dataId, group string) error {
    request := vo.ConfigParam{
        DataId: dataId,
        Group:  group,
    }

    deleted, err := cm.nacos.ConfigClient.DeleteConfig(request)
    switch {
    case err != nil:
        return err
    case !deleted:
        return fmt.Errorf("删除配置失败")
    }

    cm.logger.Info("配置删除成功", zap.String("dataId", dataId), zap.String("group", group))
    return nil
}

3. 服务注册与发现

3.1 服务注册

// service_registry.go
// 服务注册器
package service

import (
    "context"
    "fmt"
    "github.com/nacos-group/nacos-sdk-go/v2/vo"
    "go.uber.org/zap"
    "net"
    "os"
    "strconv"
    "time"
)

// ServiceRegistry holds the identity of one service instance (name, group,
// cluster, address, weight, metadata) and registers, updates or deregisters
// it through the Nacos naming client.
type ServiceRegistry struct {
    nacos       *config.NacosConfig
    serviceName string
    groupName   string
    clusterName string
    ip          string
    port        uint64
    weight      float64
    metadata    map[string]string
    logger      *zap.Logger
}

// NewServiceRegistry creates a registry for serviceName listening on port.
// The instance IP comes from getLocalIP; group, cluster and weight start as
// "DEFAULT_GROUP", "DEFAULT" and 1.0, and the metadata map starts empty.
// An error is returned when the local IP cannot be determined.
func NewServiceRegistry(nacos *config.NacosConfig, serviceName string, port uint64) (*ServiceRegistry, error) {
    localIP, ipErr := getLocalIP()
    if ipErr != nil {
        return nil, ipErr
    }

    registry := &ServiceRegistry{
        nacos:       nacos,
        serviceName: serviceName,
        groupName:   "DEFAULT_GROUP",
        clusterName: "DEFAULT",
        ip:          localIP,
        port:        port,
        weight:      1.0,
        metadata:    make(map[string]string),
        logger:      nacos.Logger,
    }
    return registry, nil
}

// SetGroupName replaces the group name used by subsequent Register,
// Deregister and UpdateInstance calls.
func (sr *ServiceRegistry) SetGroupName(groupName string) {
    sr.groupName = groupName
}

// SetClusterName replaces the cluster name used by subsequent Register,
// Deregister and UpdateInstance calls.
func (sr *ServiceRegistry) SetClusterName(clusterName string) {
    sr.clusterName = clusterName
}

// SetWeight replaces the instance weight sent by subsequent Register and
// UpdateInstance calls. The value is stored as given, without validation.
func (sr *ServiceRegistry) SetWeight(weight float64) {
    sr.weight = weight
}

// SetMetadata stores one metadata key/value pair, replacing any previous
// value for key. The pair is sent with subsequent Register and
// UpdateInstance calls.
func (sr *ServiceRegistry) SetMetadata(key, value string) {
    // Writing to a nil map panics; allocate lazily so a ServiceRegistry
    // that was not built by NewServiceRegistry is still usable.
    if sr.metadata == nil {
        sr.metadata = make(map[string]string)
    }
    sr.metadata[key] = value
}

// Register registers this instance with the naming client as an enabled,
// healthy, ephemeral instance.
//
// Before registering, metadata is completed as follows:
//   - "version" ("1.0.0") and "env" (the ENV environment variable, default
//     "dev") are filled in only when the caller has not already set them via
//     SetMetadata, so explicit values are never overwritten;
//   - "startTime" (Unix seconds) and "pid" are always refreshed because they
//     describe the running process.
//
// ctx is accepted for interface symmetry; the naming client call used here
// does not take a context. An error is returned when the client reports an
// error or an unsuccessful registration.
func (sr *ServiceRegistry) Register(ctx context.Context) error {
    if sr.metadata == nil {
        sr.metadata = make(map[string]string)
    }

    // Defaults: applied only for keys the caller left unset.
    if _, set := sr.metadata["version"]; !set {
        sr.metadata["version"] = "1.0.0"
    }
    if _, set := sr.metadata["env"]; !set {
        sr.metadata["env"] = getEnv("ENV", "dev")
    }

    // Process facts: always current.
    sr.metadata["startTime"] = strconv.FormatInt(time.Now().Unix(), 10)
    sr.metadata["pid"] = strconv.Itoa(os.Getpid())

    success, err := sr.nacos.NamingClient.RegisterInstance(vo.RegisterInstanceParam{
        Ip:          sr.ip,
        Port:        sr.port,
        ServiceName: sr.serviceName,
        GroupName:   sr.groupName,
        ClusterName: sr.clusterName,
        Weight:      sr.weight,
        Enable:      true,
        Healthy:     true,
        Ephemeral:   true,
        Metadata:    sr.metadata,
    })

    if err != nil {
        sr.logger.Error("服务注册失败", zap.Error(err))
        return err
    }

    if !success {
        return fmt.Errorf("服务注册失败")
    }

    sr.logger.Info("服务注册成功",
        zap.String("serviceName", sr.serviceName),
        zap.String("ip", sr.ip),
        zap.Uint64("port", sr.port),
        zap.String("groupName", sr.groupName),
        zap.String("clusterName", sr.clusterName),
    )

    return nil
}

// Deregister removes this (ephemeral) instance from the naming client using
// the registry's current ip, port, service, group and cluster. It returns
// the client's error (after logging it), or a generic error when the client
// reports failure without one. ctx is not used by the call made here.
func (sr *ServiceRegistry) Deregister(ctx context.Context) error {
    request := vo.DeregisterInstanceParam{
        Ip:          sr.ip,
        Port:        sr.port,
        ServiceName: sr.serviceName,
        GroupName:   sr.groupName,
        ClusterName: sr.clusterName,
        Ephemeral:   true,
    }

    removed, err := sr.nacos.NamingClient.DeregisterInstance(request)
    switch {
    case err != nil:
        sr.logger.Error("服务注销失败", zap.Error(err))
        return err
    case !removed:
        return fmt.Errorf("服务注销失败")
    }

    sr.logger.Info("服务注销成功",
        zap.String("serviceName", sr.serviceName),
        zap.String("ip", sr.ip),
        zap.Uint64("port", sr.port),
    )
    return nil
}

// UpdateInstance pushes the registry's current weight and metadata for this
// instance to the naming client, marking it enabled, healthy and ephemeral.
// It returns the client's error (after logging it), or a generic error when
// the client reports failure without one. ctx is not used by the call made
// here.
func (sr *ServiceRegistry) UpdateInstance(ctx context.Context) error {
    request := vo.UpdateInstanceParam{
        Ip:          sr.ip,
        Port:        sr.port,
        ServiceName: sr.serviceName,
        GroupName:   sr.groupName,
        ClusterName: sr.clusterName,
        Weight:      sr.weight,
        Enable:      true,
        Healthy:     true,
        Ephemeral:   true,
        Metadata:    sr.metadata,
    }

    updated, err := sr.nacos.NamingClient.UpdateInstance(request)
    switch {
    case err != nil:
        sr.logger.Error("更新服务实例失败", zap.Error(err))
        return err
    case !updated:
        return fmt.Errorf("更新服务实例失败")
    }

    sr.logger.Info("更新服务实例成功")
    return nil
}

// getLocalIP returns the local IP address as a string.
//
// It first asks the OS which source address it would use to reach
// 8.8.8.8:80 by opening a UDP socket and reading its local address. When
// that fails (for example on a host without a route to that address), it
// falls back to the first non-loopback IPv4 address among the interface
// addresses. An error is returned only when neither approach yields an
// address.
func getLocalIP() (string, error) {
    conn, dialErr := net.Dial("udp", "8.8.8.8:80")
    if dialErr == nil {
        defer conn.Close()
        // Checked assertion: never panic on an unexpected address type.
        if udpAddr, ok := conn.LocalAddr().(*net.UDPAddr); ok && udpAddr.IP != nil {
            return udpAddr.IP.String(), nil
        }
    }

    // Fallback: scan the interface addresses.
    addrs, err := net.InterfaceAddrs()
    if err != nil {
        if dialErr != nil {
            return "", dialErr
        }
        return "", err
    }
    for _, addr := range addrs {
        ipNet, ok := addr.(*net.IPNet)
        if !ok || ipNet.IP.IsLoopback() {
            continue
        }
        if ip4 := ipNet.IP.To4(); ip4 != nil {
            return ip4.String(), nil
        }
    }

    if dialErr != nil {
        return "", dialErr
    }
    return "", fmt.Errorf("no non-loopback IPv4 address found")
}

// getEnv returns the value of the environment variable key, or
// defaultValue when the variable is unset or empty.
func getEnv(key, defaultValue string) string {
    current := os.Getenv(key)
    if current == "" {
        return defaultValue
    }
    return current
}

3.2 服务发现

// service_discovery.go
// 服务发现器
package service

import (
    "context"
    "fmt"
    "github.com/nacos-group/nacos-sdk-go/v2/model"
    "github.com/nacos-group/nacos-sdk-go/v2/vo"
    "go.uber.org/zap"
    "math/rand"
    "sync"
    "time"
)

// ServiceDiscovery queries service instances through the Nacos naming
// client, keeps the latest instance list per service name and fans change
// notifications out to registered listeners.
type ServiceDiscovery struct {
    nacos       *config.NacosConfig
    cache       map[string][]*model.Instance // latest instances, keyed by service name
    cacheMutex  sync.RWMutex                 // guards cache
    subscribers map[string][]ServiceChangeListener // listeners, keyed by service name
    subMutex    sync.RWMutex                 // guards subscribers
    logger      *zap.Logger
}

// ServiceChangeListener is notified by ServiceDiscovery about one service.
type ServiceChangeListener interface {
    // OnChange receives the service name and its new instance list.
    OnChange(serviceName string, instances []*model.Instance)
    // OnError receives the service name and the error reported for it.
    OnError(serviceName string, err error)
}

// NewServiceDiscovery returns a ServiceDiscovery bound to nacos, with empty
// cache and subscriber maps and the logger carried by nacos.
func NewServiceDiscovery(nacos *config.NacosConfig) *ServiceDiscovery {
    discovery := new(ServiceDiscovery)
    discovery.nacos = nacos
    discovery.cache = map[string][]*model.Instance{}
    discovery.subscribers = map[string][]ServiceChangeListener{}
    discovery.logger = nacos.Logger
    return discovery
}

// GetService fetches the service identified by serviceName/groupName from
// the naming client and returns a pointer to it. Client errors are logged
// and returned. ctx is not used by the call made here.
func (sd *ServiceDiscovery) GetService(ctx context.Context, serviceName, groupName string) (*model.Service, error) {
    query := vo.GetServiceParam{
        ServiceName: serviceName,
        GroupName:   groupName,
    }

    found, err := sd.nacos.NamingClient.GetService(query)
    if err == nil {
        return &found, nil
    }

    sd.logger.Error("获取服务信息失败",
        zap.String("serviceName", serviceName),
        zap.String("groupName", groupName),
        zap.Error(err),
    )
    return nil, err
}

// GetInstances returns the instances of serviceName/groupName, restricted to
// healthy ones when healthyOnly is true, and stores the result in the cache
// under serviceName. Client errors are logged and returned. ctx is not used
// by the call made here.
func (sd *ServiceDiscovery) GetInstances(ctx context.Context, serviceName, groupName string, healthyOnly bool) ([]*model.Instance, error) {
    found, err := sd.nacos.NamingClient.SelectInstances(vo.SelectInstancesParam{
        ServiceName: serviceName,
        GroupName:   groupName,
        HealthyOnly: healthyOnly,
    })

    if err != nil {
        sd.logger.Error("获取服务实例失败",
            zap.String("serviceName", serviceName),
            zap.String("groupName", groupName),
            zap.Bool("healthyOnly", healthyOnly),
            zap.Error(err),
        )
        return nil, err
    }

    // The SDK returns a slice of values; the cache, the load balancers and
    // this method's signature all work with pointers, so convert once here
    // (the same conversion Subscribe performs in its callback).
    instances := make([]*model.Instance, len(found))
    for i := range found {
        instances[i] = &found[i]
    }

    // Refresh the cache.
    sd.updateCache(serviceName, instances)

    sd.logger.Debug("获取服务实例成功",
        zap.String("serviceName", serviceName),
        zap.Int("instanceCount", len(instances)),
    )

    return instances, nil
}

// GetHealthyInstance asks the naming client for one healthy instance of
// serviceName/groupName. Client errors are logged and returned. ctx is not
// used by the call made here.
func (sd *ServiceDiscovery) GetHealthyInstance(ctx context.Context, serviceName, groupName string) (*model.Instance, error) {
    // The SDK names this parameter type SelectOneHealthInstanceParam
    // ("Health", not "Healthy").
    instance, err := sd.nacos.NamingClient.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{
        ServiceName: serviceName,
        GroupName:   groupName,
    })

    if err != nil {
        sd.logger.Error("获取健康服务实例失败",
            zap.String("serviceName", serviceName),
            zap.String("groupName", groupName),
            zap.Error(err),
        )
        return nil, err
    }

    // The SDK already returns *model.Instance; taking its address again
    // would produce a **model.Instance.
    return instance, nil
}

// GetInstanceWithLoadBalance fetches the healthy instances of
// serviceName/groupName and picks one according to strategy: weighted,
// round-robin, or random (also used for any unrecognised strategy). An error
// is returned when the lookup fails or yields no instances.
func (sd *ServiceDiscovery) GetInstanceWithLoadBalance(ctx context.Context, serviceName, groupName string, strategy LoadBalanceStrategy) (*model.Instance, error) {
    candidates, err := sd.GetInstances(ctx, serviceName, groupName, true)
    if err != nil {
        return nil, err
    }
    if len(candidates) == 0 {
        return nil, fmt.Errorf("没有可用的服务实例")
    }

    if strategy == LoadBalanceWeighted {
        return sd.weightedSelect(candidates), nil
    }
    if strategy == LoadBalanceRoundRobin {
        return sd.roundRobinSelect(serviceName, candidates), nil
    }
    // LoadBalanceRandom and every unknown strategy.
    return sd.randomSelect(candidates), nil
}

// Subscribe registers listener for serviceName and then subscribes to
// changes of serviceName/groupName on the naming client.
//
// The subscription callback forwards errors to the listeners' OnError; on a
// regular notification it converts the instances to a pointer slice, stores
// them in the cache and forwards them to the listeners' OnChange. The
// listener is added before the client subscription is attempted, so it
// remains registered even when this method returns an error. ctx is not
// used by the calls made here.
func (sd *ServiceDiscovery) Subscribe(ctx context.Context, serviceName, groupName string, listener ServiceChangeListener) error {
    // Register the listener first.
    sd.addListener(serviceName, listener)

    onUpdate := func(services []model.Instance, err error) {
        if err != nil {
            sd.logger.Error("服务变更通知错误",
                zap.String("serviceName", serviceName),
                zap.Error(err),
            )
            sd.notifyError(serviceName, err)
            return
        }

        // Convert values to pointers into the delivered slice.
        instances := make([]*model.Instance, 0, len(services))
        for i := range services {
            instances = append(instances, &services[i])
        }

        sd.updateCache(serviceName, instances)
        sd.notifyChange(serviceName, instances)

        sd.logger.Info("服务实例变更",
            zap.String("serviceName", serviceName),
            zap.Int("instanceCount", len(instances)),
        )
    }

    subscribeErr := sd.nacos.NamingClient.Subscribe(&vo.SubscribeParam{
        ServiceName:       serviceName,
        GroupName:         groupName,
        SubscribeCallback: onUpdate,
    })
    if subscribeErr != nil {
        sd.logger.Error("订阅服务失败",
            zap.String("serviceName", serviceName),
            zap.String("groupName", groupName),
            zap.Error(subscribeErr),
        )
        return subscribeErr
    }

    sd.logger.Info("订阅服务成功",
        zap.String("serviceName", serviceName),
        zap.String("groupName", groupName),
    )
    return nil
}

// Unsubscribe cancels the naming-client subscription for
// serviceName/groupName and, when that succeeds, drops every listener
// registered for serviceName. Client errors are logged and returned, in
// which case the listeners are kept. ctx is not used by the calls made here.
func (sd *ServiceDiscovery) Unsubscribe(ctx context.Context, serviceName, groupName string) error {
    target := &vo.SubscribeParam{
        ServiceName: serviceName,
        GroupName:   groupName,
    }

    if err := sd.nacos.NamingClient.Unsubscribe(target); err != nil {
        sd.logger.Error("取消订阅服务失败",
            zap.String("serviceName", serviceName),
            zap.String("groupName", groupName),
            zap.Error(err),
        )
        return err
    }

    // Drop all listeners for this service.
    sd.removeListeners(serviceName)

    sd.logger.Info("取消订阅服务成功",
        zap.String("serviceName", serviceName),
        zap.String("groupName", groupName),
    )
    return nil
}

// 负载均衡算法实现

// randomSelect returns a uniformly chosen element of instances, or nil when
// the slice is empty.
func (sd *ServiceDiscovery) randomSelect(instances []*model.Instance) *model.Instance {
    count := len(instances)
    if count == 0 {
        return nil
    }
    pick := rand.Intn(count)
    return instances[pick]
}

// weightedSelect returns an element of instances chosen with probability
// proportional to its Weight, or nil when the slice is empty.
//
// Only instances with a positive weight take part in the draw. When no
// instance has a positive weight the choice falls back to randomSelect.
func (sd *ServiceDiscovery) weightedSelect(instances []*model.Instance) *model.Instance {
    if len(instances) == 0 {
        return nil
    }

    // Non-positive weights are ignored so that they can neither be picked
    // nor distort the total.
    totalWeight := 0.0
    for _, instance := range instances {
        if instance.Weight > 0 {
            totalWeight += instance.Weight
        }
    }

    if totalWeight <= 0 {
        return sd.randomSelect(instances)
    }

    // rand.Float64 is in [0, 1), so target is in [0, totalWeight).
    target := rand.Float64() * totalWeight
    cumulative := 0.0
    var lastWeighted *model.Instance

    for _, instance := range instances {
        if instance.Weight <= 0 {
            continue
        }
        cumulative += instance.Weight
        lastWeighted = instance
        // Strict comparison: with "<=" a draw of exactly 0 would select a
        // leading zero-weight instance.
        if target < cumulative {
            return instance
        }
    }

    // Reached only through floating-point rounding of the running sum.
    return lastWeighted
}

// roundRobinCounters holds, per service name, the index of the next
// instance to hand out. It is package-level state shared by every
// ServiceDiscovery and guarded by roundRobinMutex.
var roundRobinCounters = make(map[string]int)
var roundRobinMutex sync.Mutex

// roundRobinSelect returns the instances of serviceName in rotation, one per
// call, or nil when the slice is empty.
//
// The stored position is always reduced modulo the current slice length.
// An ever-growing counter would eventually overflow to a negative value and
// make the index expression panic; reducing it also keeps the index valid
// when the instance list shrinks between calls.
func (sd *ServiceDiscovery) roundRobinSelect(serviceName string, instances []*model.Instance) *model.Instance {
    if len(instances) == 0 {
        return nil
    }

    roundRobinMutex.Lock()
    defer roundRobinMutex.Unlock()

    index := roundRobinCounters[serviceName] % len(instances)
    roundRobinCounters[serviceName] = (index + 1) % len(instances)

    return instances[index]
}

// 缓存和监听器管理

// updateCache replaces the cached instance list for serviceName while
// holding the cache write lock.
func (sd *ServiceDiscovery) updateCache(serviceName string, instances []*model.Instance) {
    sd.cacheMutex.Lock()
    defer sd.cacheMutex.Unlock()
    sd.cache[serviceName] = instances
}

// addListener appends listener to the listeners of serviceName while
// holding the subscriber write lock. No de-duplication is performed.
func (sd *ServiceDiscovery) addListener(serviceName string, listener ServiceChangeListener) {
    sd.subMutex.Lock()
    defer sd.subMutex.Unlock()
    sd.subscribers[serviceName] = append(sd.subscribers[serviceName], listener)
}

// removeListeners drops every listener registered for serviceName while
// holding the subscriber write lock.
func (sd *ServiceDiscovery) removeListeners(serviceName string) {
    sd.subMutex.Lock()
    defer sd.subMutex.Unlock()
    delete(sd.subscribers, serviceName)
}

// notifyChange calls OnChange on every listener registered for serviceName,
// each in its own goroutine. The listener slice is read under the
// subscriber read lock, which is released before the listeners are invoked.
func (sd *ServiceDiscovery) notifyChange(serviceName string, instances []*model.Instance) {
    sd.subMutex.RLock()
    targets := sd.subscribers[serviceName]
    sd.subMutex.RUnlock()

    for i := range targets {
        go targets[i].OnChange(serviceName, instances)
    }
}

// notifyError calls OnError on every listener registered for serviceName,
// each in its own goroutine. The listener slice is read under the
// subscriber read lock, which is released before the listeners are invoked.
func (sd *ServiceDiscovery) notifyError(serviceName string, err error) {
    sd.subMutex.RLock()
    targets := sd.subscribers[serviceName]
    sd.subMutex.RUnlock()

    for i := range targets {
        go targets[i].OnError(serviceName, err)
    }
}

4. 实际应用示例

4.1 用户服务示例

// user_service.go
// 用户服务示例
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "github.com/gin-gonic/gin"
    "go.uber.org/zap"
    "net/http"
    "os"
    "os/signal"
    "strconv"
    "syscall"
    "time"
)

// User is the user record exposed by the HTTP API; JSON field names are
// given by the struct tags.
type User struct {
    ID       int64  `json:"id"`
    Username string `json:"username"`
    Email    string `json:"email"`
    Phone    string `json:"phone"`
    Status   int    `json:"status"`
    CreateAt int64  `json:"createAt"` // Unix seconds, set on creation
    UpdateAt int64  `json:"updateAt"` // Unix seconds, set on creation and update
}

// UserService wires the Nacos helpers (registry, discovery, configuration
// manager) together with an in-memory user store.
//
// NOTE(review): users is a plain map read and written by the HTTP handlers
// without any lock; net/http serves requests concurrently, so this looks
// like a data race — confirm and guard with a mutex if so.
type UserService struct {
    nacos     *config.NacosConfig
    registry  *service.ServiceRegistry
    discovery *service.ServiceDiscovery
    configMgr *config.ConfigManager
    logger    *zap.Logger
    users     map[int64]*User // stand-in for a database, keyed by user ID
}

// NewUserService builds a UserService: it creates the Nacos configuration,
// a registry for "user-service" on port 8080 with three service.* metadata
// entries, a discovery helper and a configuration manager, and seeds the
// in-memory store with a single "admin" user. The first error from the
// Nacos configuration or the registry is returned.
func NewUserService() (*UserService, error) {
    // Nacos clients and logger.
    nacos, err := config.NewNacosConfig()
    if err != nil {
        return nil, err
    }

    // Service registry.
    registry, err := service.NewServiceRegistry(nacos, "user-service", 8080)
    if err != nil {
        return nil, err
    }

    // Service metadata.
    registry.SetMetadata("service.type", "user")
    registry.SetMetadata("service.version", "1.0.0")
    registry.SetMetadata("service.protocol", "http")

    svc := &UserService{
        nacos:     nacos,
        registry:  registry,
        discovery: service.NewServiceDiscovery(nacos),
        configMgr: config.NewConfigManager(nacos),
        logger:    nacos.Logger,
    }

    // Seed data.
    svc.users = map[int64]*User{
        1: {
            ID:       1,
            Username: "admin",
            Email:    "admin@example.com",
            Phone:    "13800138000",
            Status:   1,
            CreateAt: time.Now().Unix(),
            UpdateAt: time.Now().Unix(),
        },
    }

    return svc, nil
}

// Start loads "user-service.yaml" from DEFAULT_GROUP, registers the service
// and serves HTTP on ":8080" until the server stops.
//
// A background goroutine runs gracefulShutdown. ListenAndServe returns
// http.ErrServerClosed as soon as Shutdown is called, i.e. while the
// shutdown sequence is still in progress; Start therefore waits for
// gracefulShutdown to finish before returning that error, so the caller
// cannot exit the process halfway through the shutdown.
//
// When the listener fails for any other reason (for example the port is
// already in use) the instance is deregistered again so that Nacos does not
// advertise a service that is not serving. In that case the shutdown
// goroutine stays parked on its signal channel.
func (us *UserService) Start() error {
    // Load configuration.
    if err := us.configMgr.LoadConfig("user-service.yaml", "DEFAULT_GROUP"); err != nil {
        us.logger.Error("加载配置失败", zap.Error(err))
        return err
    }

    // Register the service.
    if err := us.registry.Register(context.Background()); err != nil {
        us.logger.Error("注册服务失败", zap.Error(err))
        return err
    }

    // HTTP server.
    router := us.setupRouter()
    server := &http.Server{
        Addr:    ":8080",
        Handler: router,
    }

    // Graceful shutdown, with a completion signal for the wait below.
    shutdownDone := make(chan struct{})
    go func() {
        defer close(shutdownDone)
        us.gracefulShutdown(server)
    }()

    us.logger.Info("用户服务启动成功", zap.String("addr", ":8080"))

    err := server.ListenAndServe()
    if err == http.ErrServerClosed {
        // Only gracefulShutdown stops the server, so it is running now;
        // let it finish deregistration and cleanup.
        <-shutdownDone
        return err
    }

    // The listener failed: undo the registration made above.
    if deregErr := us.registry.Deregister(context.Background()); deregErr != nil {
        us.logger.Error("注销服务失败", zap.Error(deregErr))
    }
    return err
}

// setupRouter returns a release-mode gin engine with the logger and recovery
// middleware installed and the routes registered: /health, the CRUD
// endpoints under /api/v1/users, and /info.
func (us *UserService) setupRouter() *gin.Engine {
    gin.SetMode(gin.ReleaseMode)

    engine := gin.New()
    engine.Use(gin.Logger(), gin.Recovery())

    // Health check.
    engine.GET("/health", us.healthCheck)

    // User endpoints.
    users := engine.Group("/api/v1/users")
    users.GET("", us.getUsers)
    users.GET("/:id", us.getUserByID)
    users.POST("", us.createUser)
    users.PUT("/:id", us.updateUser)
    users.DELETE("/:id", us.deleteUser)

    // Service information.
    engine.GET("/info", us.getServiceInfo)

    return engine
}

// healthCheck answers 200 with status "UP", the current Unix time and the
// service name.
func (us *UserService) healthCheck(c *gin.Context) {
    body := gin.H{
        "status":    "UP",
        "timestamp": time.Now().Unix(),
        "service":   "user-service",
    }
    c.JSON(http.StatusOK, body)
}

// getUsers answers 200 with every user in the store. The order follows map
// iteration and is therefore unspecified.
func (us *UserService) getUsers(c *gin.Context) {
    list := make([]*User, 0, len(us.users))
    for id := range us.users {
        list = append(list, us.users[id])
    }

    body := gin.H{
        "code": 0,
        "msg":  "success",
        "data": list,
    }
    c.JSON(http.StatusOK, body)
}

// getUserByID answers with the user whose ID is given by the :id path
// parameter: 400 when the ID is not a base-10 int64, 404 when no such user
// exists, 200 with the user otherwise.
func (us *UserService) getUserByID(c *gin.Context) {
    id, parseErr := strconv.ParseInt(c.Param("id"), 10, 64)
    if parseErr != nil {
        c.JSON(http.StatusBadRequest, gin.H{
            "code": 400,
            "msg":  "无效的用户 ID",
        })
        return
    }

    found, ok := us.users[id]
    if !ok {
        c.JSON(http.StatusNotFound, gin.H{
            "code": 404,
            "msg":  "用户不存在",
        })
        return
    }

    c.JSON(http.StatusOK, gin.H{
        "code": 0,
        "msg":  "success",
        "data": found,
    })
}

// createUser binds the JSON request body to a User, assigns it a new ID,
// stamps CreateAt/UpdateAt with the current Unix time, sets Status to 1 and
// stores it. It answers 400 when the body cannot be bound and 201 with the
// stored user otherwise. Any ID, Status or timestamps supplied by the
// client are overwritten.
func (us *UserService) createUser(c *gin.Context) {
    var user User
    if err := c.ShouldBindJSON(&user); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{
            "code": 400,
            "msg":  "请求参数错误",
        })
        return
    }

    // New ID = largest existing ID + 1. Deriving it from len(us.users)
    // collides with a live ID once any user has been deleted (store
    // {1,2,3}, delete 1, next ID would be 3) and silently overwrites that
    // user.
    var maxID int64
    for id := range us.users {
        if id > maxID {
            maxID = id
        }
    }
    user.ID = maxID + 1
    user.CreateAt = time.Now().Unix()
    user.UpdateAt = time.Now().Unix()
    user.Status = 1

    us.users[user.ID] = &user

    c.JSON(http.StatusCreated, gin.H{
        "code": 0,
        "msg":  "创建成功",
        "data": user,
    })
}

// updateUser applies the non-empty Username, Email and Phone fields of the
// JSON request body to the user identified by the :id path parameter and
// refreshes UpdateAt. It answers 400 for an invalid ID or body, 404 when no
// such user exists, 200 with the updated user otherwise.
func (us *UserService) updateUser(c *gin.Context) {
    id, parseErr := strconv.ParseInt(c.Param("id"), 10, 64)
    if parseErr != nil {
        c.JSON(http.StatusBadRequest, gin.H{
            "code": 400,
            "msg":  "无效的用户 ID",
        })
        return
    }

    target, ok := us.users[id]
    if !ok {
        c.JSON(http.StatusNotFound, gin.H{
            "code": 404,
            "msg":  "用户不存在",
        })
        return
    }

    var patch User
    if bindErr := c.ShouldBindJSON(&patch); bindErr != nil {
        c.JSON(http.StatusBadRequest, gin.H{
            "code": 400,
            "msg":  "请求参数错误",
        })
        return
    }

    // Copy only the fields the request actually supplied.
    if name := patch.Username; name != "" {
        target.Username = name
    }
    if email := patch.Email; email != "" {
        target.Email = email
    }
    if phone := patch.Phone; phone != "" {
        target.Phone = phone
    }
    target.UpdateAt = time.Now().Unix()

    c.JSON(http.StatusOK, gin.H{
        "code": 0,
        "msg":  "更新成功",
        "data": target,
    })
}

// deleteUser removes the user identified by the :id path parameter. It
// answers 400 for an invalid ID, 404 when no such user exists, 200
// otherwise.
func (us *UserService) deleteUser(c *gin.Context) {
    id, parseErr := strconv.ParseInt(c.Param("id"), 10, 64)
    if parseErr != nil {
        c.JSON(http.StatusBadRequest, gin.H{
            "code": 400,
            "msg":  "无效的用户 ID",
        })
        return
    }

    if _, ok := us.users[id]; !ok {
        c.JSON(http.StatusNotFound, gin.H{
            "code": 404,
            "msg":  "用户不存在",
        })
        return
    }

    delete(us.users, id)

    c.JSON(http.StatusOK, gin.H{
        "code": 0,
        "msg":  "删除成功",
    })
}

// getServiceInfo reports static service metadata together with the
// configuration currently held by the config manager.
//
// NOTE(review): the "uptime" field carries the current Unix timestamp, not
// the elapsed time since the service started.
func (us *UserService) getServiceInfo(c *gin.Context) {
    info := gin.H{
        "service": "user-service",
        "version": "1.0.0",
        "config":  us.configMgr.GetConfig(),
        "uptime":  time.Now().Unix(),
    }
    c.JSON(http.StatusOK, info)
}

// gracefulShutdown blocks until SIGINT or SIGTERM arrives and then tears the
// service down in order: deregister from the registry, shut the HTTP server
// down with a 5 second deadline, and close the Nacos client. Failures in the
// first two steps are logged and do not stop the remaining steps.
func (us *UserService) gracefulShutdown(server *http.Server) {
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh

    us.logger.Info("正在关闭服务...")

    // Deregister first so no new traffic is routed here while draining.
    deregisterErr := us.registry.Deregister(context.Background())
    if deregisterErr != nil {
        us.logger.Error("注销服务失败", zap.Error(deregisterErr))
    }

    // Give in-flight requests a bounded amount of time to complete.
    shutdownCtx, release := context.WithTimeout(context.Background(), 5*time.Second)
    defer release()

    shutdownErr := server.Shutdown(shutdownCtx)
    if shutdownErr != nil {
        us.logger.Error("关闭服务器失败", zap.Error(shutdownErr))
    }

    us.nacos.Close()

    us.logger.Info("服务已关闭")
}

// main builds the user service and runs it until the HTTP server stops.
// It panics on a construction error or on any Start error other than
// http.ErrServerClosed.
func main() {
    userService, err := NewUserService()
    if err != nil {
        panic(err)
    }

    err = userService.Start()
    if err == nil || err == http.ErrServerClosed {
        return
    }
    panic(err)
}

4.2 订单服务示例

// order_service.go
// 订单服务示例
package main

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "github.com/gin-gonic/gin"
    "github.com/nacos-group/nacos-sdk-go/v2/model"
    "go.uber.org/zap"
    "io"
    "net/http"
    "os"
    "os/signal"
    "strconv"
    "syscall"
    "time"
)

// Order is the order record stored by the order service and exchanged as
// JSON over its HTTP API.
type Order struct {
    ID       int64   `json:"id"`       // assigned by the service on creation
    UserID   int64   `json:"userId"`   // owner; checked against the user service on creation
    Amount   float64 `json:"amount"`   // order amount
    Status   int     `json:"status"`   // set to 1 when the order is created
    CreateAt int64   `json:"createAt"` // creation time, Unix seconds
    UpdateAt int64   `json:"updateAt"` // last modification time, Unix seconds
}

// OrderService bundles the Nacos-backed collaborators and the in-memory state
// used by the order HTTP API.
type OrderService struct {
    nacos      *config.NacosConfig        // Nacos client wrapper; closed on shutdown
    registry   *service.ServiceRegistry   // registers/deregisters this instance
    discovery  *service.ServiceDiscovery  // looks up and subscribes to user-service
    configMgr  *config.ConfigManager      // loads and exposes remote configuration
    logger     *zap.Logger                // shared structured logger
    orders     map[int64]*Order           // stand-in for a database, keyed by order ID
    httpClient *http.Client               // client used for calls to user-service
}

// ServiceChangeListenerImpl is a service-change listener that only logs the
// notifications it receives.
type ServiceChangeListenerImpl struct {
    serviceName string      // name of the subscribed service (not read by the callbacks)
    logger      *zap.Logger // destination for change and error logs
}

// OnChange logs the service name and the number of instances delivered with
// a change notification.
func (scl *ServiceChangeListenerImpl) OnChange(serviceName string, instances []*model.Instance) {
    nameField := zap.String("serviceName", serviceName)
    countField := zap.Int("instanceCount", len(instances))
    scl.logger.Info("服务实例变更", nameField, countField)
}

// OnError logs an error reported for the given service.
func (scl *ServiceChangeListenerImpl) OnError(serviceName string, err error) {
    nameField := zap.String("serviceName", serviceName)
    scl.logger.Error("服务变更错误", nameField, zap.Error(err))
}

// NewOrderService wires up an OrderService: it creates the Nacos config, a
// registry for "order-service" on port 8081 with descriptive metadata, a
// discovery client, a config manager, an empty order store and an HTTP client
// with a 10 second timeout. It returns the first construction error met.
func NewOrderService() (*OrderService, error) {
    nacosCfg, err := config.NewNacosConfig()
    if err != nil {
        return nil, err
    }

    reg, err := service.NewServiceRegistry(nacosCfg, "order-service", 8081)
    if err != nil {
        return nil, err
    }

    // Metadata advertised to consumers of the registry.
    for _, kv := range [][2]string{
        {"service.type", "order"},
        {"service.version", "1.0.0"},
        {"service.protocol", "http"},
    } {
        reg.SetMetadata(kv[0], kv[1])
    }

    svc := &OrderService{
        nacos:      nacosCfg,
        registry:   reg,
        discovery:  service.NewServiceDiscovery(nacosCfg),
        configMgr:  config.NewConfigManager(nacosCfg),
        logger:     nacosCfg.Logger,
        orders:     make(map[int64]*Order),
        httpClient: &http.Client{Timeout: 10 * time.Second},
    }
    return svc, nil
}

// Start loads configuration, registers the service, subscribes to
// user-service changes and then serves HTTP on :8081 until the server stops.
//
// It returns the first setup error, or the error from ListenAndServe. When
// the server was stopped by the signal-driven graceful shutdown
// (http.ErrServerClosed), Start does not return until that shutdown sequence
// has finished: ListenAndServe returns as soon as Shutdown is called, and
// returning at that point would let main exit before deregistration and
// client cleanup complete.
func (os *OrderService) Start() error {
    ctx := context.Background()

    if err := os.configMgr.LoadConfig("order-service.yaml", "DEFAULT_GROUP"); err != nil {
        os.logger.Error("加载配置失败", zap.Error(err))
        return err
    }

    if err := os.registry.Register(ctx); err != nil {
        os.logger.Error("注册服务失败", zap.Error(err))
        return err
    }

    // Watch user-service so instance changes are logged.
    listener := &ServiceChangeListenerImpl{
        serviceName: "user-service",
        logger:      os.logger,
    }
    if err := os.discovery.Subscribe(ctx, "user-service", "DEFAULT_GROUP", listener); err != nil {
        os.logger.Error("订阅用户服务失败", zap.Error(err))
        return err
    }

    server := &http.Server{
        Addr:    ":8081",
        Handler: os.setupRouter(),
    }

    // Run the signal-driven shutdown in the background and remember when it
    // has completed.
    shutdownDone := make(chan struct{})
    go func() {
        defer close(shutdownDone)
        os.gracefulShutdown(server)
    }()

    os.logger.Info("订单服务启动成功", zap.String("addr", ":8081"))

    err := server.ListenAndServe()
    if err == http.ErrServerClosed {
        // Only gracefulShutdown holds the server, so this error means it is
        // mid-teardown; wait for it to finish.
        <-shutdownDone
    }
    return err
}

// setupRouter builds the gin engine in release mode with the logger and
// recovery middleware, and mounts the health, order CRUD and info routes.
func (os *OrderService) setupRouter() *gin.Engine {
    gin.SetMode(gin.ReleaseMode)

    engine := gin.New()
    engine.Use(gin.Logger(), gin.Recovery())

    // Liveness probe.
    engine.GET("/health", os.healthCheck)

    // Order CRUD endpoints.
    orderRoutes := engine.Group("/api/v1/orders")
    orderRoutes.GET("", os.getOrders)
    orderRoutes.GET("/:id", os.getOrderByID)
    orderRoutes.POST("", os.createOrder)
    orderRoutes.PUT("/:id", os.updateOrder)
    orderRoutes.DELETE("/:id", os.deleteOrder)

    // Service metadata.
    engine.GET("/info", os.getServiceInfo)

    return engine
}

// healthCheck always answers 200 with status "UP", the current Unix time and
// the service name.
func (os *OrderService) healthCheck(c *gin.Context) {
    payload := gin.H{
        "status":    "UP",
        "timestamp": time.Now().Unix(),
        "service":   "order-service",
    }
    c.JSON(http.StatusOK, payload)
}

// getOrders returns every stored order. The order of the list follows map
// iteration and is therefore unspecified; an empty store yields an empty
// JSON array.
func (os *OrderService) getOrders(c *gin.Context) {
    list := make([]*Order, 0, len(os.orders))
    for id := range os.orders {
        list = append(list, os.orders[id])
    }

    c.JSON(http.StatusOK, gin.H{"code": 0, "msg": "success", "data": list})
}

// getOrderByID returns the order named by the "id" path parameter. Responds
// with 400 for a non-numeric id and 404 when no such order is stored.
func (os *OrderService) getOrderByID(c *gin.Context) {
    id, parseErr := strconv.ParseInt(c.Param("id"), 10, 64)
    if parseErr != nil {
        c.JSON(http.StatusBadRequest, gin.H{"code": 400, "msg": "无效的订单 ID"})
        return
    }

    found, ok := os.orders[id]
    if !ok {
        c.JSON(http.StatusNotFound, gin.H{"code": 404, "msg": "订单不存在"})
        return
    }

    c.JSON(http.StatusOK, gin.H{"code": 0, "msg": "success", "data": found})
}

// createOrder stores a new order built from the JSON request body.
//
// The referenced user is validated through validateUser first. ID, Status,
// CreateAt and UpdateAt are always assigned by the service, so any values for
// them in the request body are overwritten. Responds with 400 for a malformed
// body or a user that fails validation, and 201 with the created order
// otherwise.
func (os *OrderService) createOrder(c *gin.Context) {
    var order Order
    if err := c.ShouldBindJSON(&order); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{
            "code": 400,
            "msg":  "请求参数错误",
        })
        return
    }

    if !os.validateUser(order.UserID) {
        c.JSON(http.StatusBadRequest, gin.H{
            "code": 400,
            "msg":  "用户不存在",
        })
        return
    }

    // Derive the next ID from the largest ID in use. Using len(orders)+1
    // collides with a live order once anything has been deleted (e.g. with
    // orders 1,2,3, deleting 1 would make the next ID 3 and overwrite it).
    var maxID int64
    for id := range os.orders {
        if id > maxID {
            maxID = id
        }
    }

    // One clock read so CreateAt and UpdateAt agree on a fresh order.
    now := time.Now().Unix()
    order.ID = maxID + 1
    order.CreateAt = now
    order.UpdateAt = now
    order.Status = 1

    os.orders[order.ID] = &order

    c.JSON(http.StatusCreated, gin.H{
        "code": 0,
        "msg":  "创建成功",
        "data": order,
    })
}

// updateOrder applies a partial update to the order named by the "id" path
// parameter. Amount and Status are copied from the JSON body only when they
// are greater than zero; UpdateAt is always refreshed. Responds with 400 for
// a malformed id or body and 404 when the order does not exist.
func (os *OrderService) updateOrder(c *gin.Context) {
    id, parseErr := strconv.ParseInt(c.Param("id"), 10, 64)
    if parseErr != nil {
        c.JSON(http.StatusBadRequest, gin.H{"code": 400, "msg": "无效的订单 ID"})
        return
    }

    stored, found := os.orders[id]
    if !found {
        c.JSON(http.StatusNotFound, gin.H{"code": 404, "msg": "订单不存在"})
        return
    }

    var patch Order
    if bindErr := c.ShouldBindJSON(&patch); bindErr != nil {
        c.JSON(http.StatusBadRequest, gin.H{"code": 400, "msg": "请求参数错误"})
        return
    }

    // Non-positive values are treated as "field not supplied".
    if amount := patch.Amount; amount > 0 {
        stored.Amount = amount
    }
    if status := patch.Status; status > 0 {
        stored.Status = status
    }
    stored.UpdateAt = time.Now().Unix()

    c.JSON(http.StatusOK, gin.H{"code": 0, "msg": "更新成功", "data": stored})
}

// deleteOrder removes the order named by the "id" path parameter. Responds
// with 400 for a non-numeric id, 404 when no such order is stored, and 200
// once the entry has been deleted.
func (os *OrderService) deleteOrder(c *gin.Context) {
    id, parseErr := strconv.ParseInt(c.Param("id"), 10, 64)
    if parseErr != nil {
        c.JSON(http.StatusBadRequest, gin.H{"code": 400, "msg": "无效的订单 ID"})
        return
    }

    if _, found := os.orders[id]; !found {
        c.JSON(http.StatusNotFound, gin.H{"code": 404, "msg": "订单不存在"})
        return
    }

    delete(os.orders, id)

    c.JSON(http.StatusOK, gin.H{"code": 0, "msg": "删除成功"})
}

// getServiceInfo reports static service metadata together with the
// configuration currently held by the config manager.
//
// NOTE(review): the "uptime" field carries the current Unix timestamp, not
// the elapsed time since the service started.
func (os *OrderService) getServiceInfo(c *gin.Context) {
    info := gin.H{
        "service": "order-service",
        "version": "1.0.0",
        "config":  os.configMgr.GetConfig(),
        "uptime":  time.Now().Unix(),
    }
    c.JSON(http.StatusOK, info)
}

// validateUser reports whether user-service answers 200 for the given user.
//
// One user-service instance is picked with the random load-balance strategy
// and queried at /api/v1/users/{id}. Every failure (no instance, transport
// error, non-200 status) is logged where applicable and reported as false,
// so callers cannot tell "user missing" apart from "user-service unreachable".
func (os *OrderService) validateUser(userID int64) bool {
    target, lookupErr := os.discovery.GetInstanceWithLoadBalance(
        context.Background(),
        "user-service",
        "DEFAULT_GROUP",
        service.LoadBalanceRandom,
    )
    if lookupErr != nil {
        os.logger.Error("获取用户服务实例失败", zap.Error(lookupErr))
        return false
    }

    endpoint := fmt.Sprintf("http://%s:%d/api/v1/users/%d", target.Ip, target.Port, userID)

    resp, callErr := os.httpClient.Get(endpoint)
    if callErr != nil {
        os.logger.Error("调用用户服务失败", zap.Error(callErr))
        return false
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return false
    }
    return true
}

// gracefulShutdown blocks until SIGINT or SIGTERM arrives and then tears the
// service down in order: unsubscribe from user-service, deregister from the
// registry, shut the HTTP server down with a 5 second deadline, and close the
// Nacos client. Failures in the first three steps are logged and do not stop
// the remaining steps.
//
// The receiver is deliberately not named "os" here: that name would shadow
// the os package inside the method and make os.Signal fail to compile.
func (svc *OrderService) gracefulShutdown(server *http.Server) {
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    svc.logger.Info("正在关闭服务...")

    // Stop receiving user-service change notifications.
    if err := svc.discovery.Unsubscribe(context.Background(), "user-service", "DEFAULT_GROUP"); err != nil {
        svc.logger.Error("取消订阅失败", zap.Error(err))
    }

    // Deregister before draining so no new traffic is routed here.
    if err := svc.registry.Deregister(context.Background()); err != nil {
        svc.logger.Error("注销服务失败", zap.Error(err))
    }

    // Give in-flight requests a bounded amount of time to complete.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := server.Shutdown(ctx); err != nil {
        svc.logger.Error("关闭服务器失败", zap.Error(err))
    }

    svc.nacos.Close()

    svc.logger.Info("服务已关闭")
}

// main builds the order service and runs it until the HTTP server stops.
// It panics on a construction error or on any Start error other than
// http.ErrServerClosed.
func main() {
    orderService, err := NewOrderService()
    if err != nil {
        panic(err)
    }

    err = orderService.Start()
    if err == nil || err == http.ErrServerClosed {
        return
    }
    panic(err)
}

5. 高级特性

5.1 配置加密解密

// config_crypto.go
// 配置加密解密
package crypto

import (
    "crypto/aes"
    "crypto/cipher"
    "crypto/rand"
    "encoding/base64"
    "errors"
    "io"
)

// ConfigCrypto encrypts and decrypts configuration values with AES-GCM.
type ConfigCrypto struct {
    key []byte // raw AES key; aes.NewCipher accepts 16, 24 or 32 bytes
}

// NewConfigCrypto returns a ConfigCrypto that uses the bytes of key as the
// AES key. The key length is not checked here; an unsupported length surfaces
// as an error from Encrypt and Decrypt.
func NewConfigCrypto(key string) *ConfigCrypto {
    return &ConfigCrypto{
        key: []byte(key),
    }
}

// newGCM builds the AES-GCM AEAD for the configured key.
func (cc *ConfigCrypto) newGCM() (cipher.AEAD, error) {
    block, err := aes.NewCipher(cc.key)
    if err != nil {
        return nil, err
    }
    return cipher.NewGCM(block)
}

// Encrypt seals plaintext with AES-GCM under a fresh random nonce and returns
// base64(nonce || ciphertext || tag) in standard encoding. It returns an
// error when the key is invalid or the random source fails.
func (cc *ConfigCrypto) Encrypt(plaintext string) (string, error) {
    gcm, err := cc.newGCM()
    if err != nil {
        return "", err
    }

    nonce := make([]byte, gcm.NonceSize())
    if _, err = io.ReadFull(rand.Reader, nonce); err != nil {
        return "", err
    }

    // Passing nonce as dst prepends it to the output, so Decrypt can recover
    // it from the front of the payload.
    sealed := gcm.Seal(nonce, nonce, []byte(plaintext), nil)

    return base64.StdEncoding.EncodeToString(sealed), nil
}

// Decrypt reverses Encrypt: it base64-decodes ciphertext, splits off the
// leading nonce and opens the remainder with AES-GCM. It returns an error
// when the input is not valid base64, is shorter than a nonce, the key is
// invalid, or authentication fails (wrong key or tampered data).
func (cc *ConfigCrypto) Decrypt(ciphertext string) (string, error) {
    data, err := base64.StdEncoding.DecodeString(ciphertext)
    if err != nil {
        return "", err
    }

    gcm, err := cc.newGCM()
    if err != nil {
        return "", err
    }

    nonceSize := gcm.NonceSize()
    if len(data) < nonceSize {
        return "", errors.New("密文太短")
    }

    // Use a distinct name for the sealed bytes: reusing "ciphertext" with :=
    // would assign a []byte to the string parameter and not compile.
    nonce, sealed := data[:nonceSize], data[nonceSize:]

    plaintext, err := gcm.Open(nil, nonce, sealed, nil)
    if err != nil {
        return "", err
    }

    return string(plaintext), nil
}

5.2 服务熔断器

// circuit_breaker.go
// 服务熔断器
package breaker

import (
    "errors"
    "sync"
    "time"
)

// State is the operating mode of a CircuitBreaker.
type State int

const (
    // StateClosed lets every call through.
    StateClosed State = iota
    // StateOpen rejects calls until resetTimeout has passed since the last
    // recorded failure.
    StateOpen
    // StateHalfOpen lets calls through on probation: enough successes close
    // the breaker, a single failure opens it again.
    StateHalfOpen
)

// ErrCircuitOpen is returned by Call when the breaker rejects a request.
var ErrCircuitOpen = errors.New("熔断器开启,拒绝请求")

// CircuitBreaker is a mutex-guarded circuit breaker driven by failure and
// success counts.
type CircuitBreaker struct {
    mutex           sync.RWMutex
    state           State
    failureCount    int
    successCount    int
    requestCount    int
    lastFailureTime time.Time

    // Configuration.
    maxFailures      int           // failures recorded while closed that trip the breaker
    timeout          time.Duration // stored from the constructor; not read by this type
    resetTimeout     time.Duration // how long the breaker stays open before probing
    successThreshold int           // successes needed in half-open to close again
}

// NewCircuitBreaker returns a closed breaker that opens after maxFailures
// recorded failures, starts probing resetTimeout after the last failure, and
// closes again after 3 successful probes.
func NewCircuitBreaker(maxFailures int, timeout, resetTimeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        state:            StateClosed,
        maxFailures:      maxFailures,
        timeout:          timeout,
        resetTimeout:     resetTimeout,
        successThreshold: 3,
    }
}

// Call runs fn if the breaker currently admits calls and records whether it
// returned nil. It returns ErrCircuitOpen without invoking fn when the
// breaker is open; otherwise it returns fn's error unchanged.
func (cb *CircuitBreaker) Call(fn func() error) error {
    if !cb.canExecute() {
        return ErrCircuitOpen
    }

    err := fn()
    cb.recordResult(err == nil)

    return err
}

// canExecute reports whether a call may proceed. When the breaker is open
// and resetTimeout has elapsed since the last failure it moves the breaker
// to half-open; without that transition the breaker could never reach
// StateHalfOpen and therefore never close again.
func (cb *CircuitBreaker) canExecute() bool {
    // A write lock is required because this method may change the state.
    cb.mutex.Lock()
    defer cb.mutex.Unlock()

    switch cb.state {
    case StateClosed, StateHalfOpen:
        return true
    case StateOpen:
        if time.Since(cb.lastFailureTime) > cb.resetTimeout {
            cb.setState(StateHalfOpen)
            return true
        }
        return false
    default:
        return false
    }
}

// recordResult updates the counters for one finished call and applies the
// resulting state transition.
func (cb *CircuitBreaker) recordResult(success bool) {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()

    cb.requestCount++

    if success {
        cb.successCount++
        if cb.state == StateHalfOpen && cb.successCount >= cb.successThreshold {
            cb.setState(StateClosed)
        }
        return
    }

    cb.failureCount++
    cb.lastFailureTime = time.Now()

    switch {
    case cb.state == StateClosed && cb.failureCount >= cb.maxFailures:
        cb.setState(StateOpen)
    case cb.state == StateHalfOpen:
        // Any failure while probing re-opens the breaker.
        cb.setState(StateOpen)
    }
}

// setState switches the state and resets the counters that belong to the new
// state. Callers must hold the write lock.
func (cb *CircuitBreaker) setState(state State) {
    cb.state = state

    switch state {
    case StateClosed:
        cb.failureCount = 0
        cb.successCount = 0
        cb.requestCount = 0
    case StateHalfOpen:
        // Probation successes are counted from zero.
        cb.successCount = 0
    }
}

// GetState returns the current state.
func (cb *CircuitBreaker) GetState() State {
    cb.mutex.RLock()
    defer cb.mutex.RUnlock()
    return cb.state
}

// GetStats returns the request, success and failure counters followed by the
// current state.
func (cb *CircuitBreaker) GetStats() (int, int, int, State) {
    cb.mutex.RLock()
    defer cb.mutex.RUnlock()
    return cb.requestCount, cb.successCount, cb.failureCount, cb.state
}

5.3 服务限流器

// rate_limiter.go
// 服务限流器
package limiter

import (
    "sync"
    "time"
)

// TokenBucket is a mutex-guarded token-bucket rate limiter.
type TokenBucket struct {
    mutex      sync.Mutex
    capacity   int64     // maximum number of tokens the bucket holds
    tokens     int64     // tokens currently available
    rate       int64     // tokens generated per second
    lastRefill time.Time // instant up to which generated tokens have been credited
}

// NewTokenBucket returns a bucket that starts full with capacity tokens and
// regains rate tokens per second.
func NewTokenBucket(capacity, rate int64) *TokenBucket {
    return &TokenBucket{
        capacity:   capacity,
        tokens:     capacity,
        rate:       rate,
        lastRefill: time.Now(),
    }
}

// Allow reports whether a single request may proceed, consuming one token if
// so.
func (tb *TokenBucket) Allow() bool {
    return tb.AllowN(1)
}

// AllowN reports whether n requests may proceed at once, consuming n tokens
// if so. A negative n is rejected, because subtracting it would add tokens
// to the bucket.
func (tb *TokenBucket) AllowN(n int64) bool {
    if n < 0 {
        return false
    }

    tb.mutex.Lock()
    defer tb.mutex.Unlock()

    tb.refill()

    if tb.tokens >= n {
        tb.tokens -= n
        return true
    }

    return false
}

// refill credits the tokens generated since lastRefill. Callers must hold
// the mutex.
//
// Tokens are computed from the exact elapsed time rather than whole seconds,
// and lastRefill advances only by the time that the credited tokens account
// for, so fractional progress toward the next token is not thrown away.
func (tb *TokenBucket) refill() {
    if tb.rate <= 0 {
        return
    }

    now := time.Now()
    elapsed := now.Sub(tb.lastRefill)
    if elapsed <= 0 {
        return
    }

    earned := elapsed.Seconds() * float64(tb.rate)
    if earned < 1 {
        return
    }

    // Comparing in floating point before converting keeps a very long idle
    // period from overflowing the int64 conversion.
    if room := tb.capacity - tb.tokens; earned >= float64(room) {
        tb.tokens = tb.capacity
        tb.lastRefill = now // a full bucket banks no extra time
        return
    }

    whole := int64(earned)
    tb.tokens += whole
    credited := time.Duration(float64(whole) / float64(tb.rate) * float64(time.Second))
    tb.lastRefill = tb.lastRefill.Add(credited)
}

// GetTokens returns the number of tokens available right now, after
// crediting any tokens generated since the last refill.
func (tb *TokenBucket) GetTokens() int64 {
    tb.mutex.Lock()
    defer tb.mutex.Unlock()

    tb.refill()
    return tb.tokens
}

// SlidingWindow is a mutex-guarded sliding-window rate limiter that keeps the
// timestamp of every admitted request inside the window.
type SlidingWindow struct {
    mutex    sync.Mutex
    window   time.Duration // length of the window
    limit    int64         // maximum admitted requests per window
    requests []time.Time   // admitted request times, oldest first
}

// NewSlidingWindow returns a limiter that admits at most limit requests in
// any span of length window.
func NewSlidingWindow(window time.Duration, limit int64) *SlidingWindow {
    sw := &SlidingWindow{requests: []time.Time{}}
    sw.window = window
    sw.limit = limit
    return sw
}

// Allow reports whether a request may proceed now. An admitted request is
// recorded; a rejected one is not.
func (sw *SlidingWindow) Allow() bool {
    sw.mutex.Lock()
    defer sw.mutex.Unlock()

    current := time.Now()
    sw.cleanup(current)

    if int64(len(sw.requests)) < sw.limit {
        sw.requests = append(sw.requests, current)
        return true
    }
    return false
}

// cleanup drops the leading timestamps that fall strictly before now-window.
// Callers must hold the mutex.
func (sw *SlidingWindow) cleanup(now time.Time) {
    threshold := now.Add(-sw.window)

    // Timestamps are appended in order, so the expired ones form a prefix.
    expired := len(sw.requests)
    for idx, ts := range sw.requests {
        if !ts.Before(threshold) {
            expired = idx
            break
        }
    }

    if expired == 0 {
        return
    }
    sw.requests = sw.requests[expired:]
}

// GetRequestCount returns how many admitted requests are still inside the
// window.
func (sw *SlidingWindow) GetRequestCount() int {
    sw.mutex.Lock()
    defer sw.mutex.Unlock()

    sw.cleanup(time.Now())
    return len(sw.requests)
}

6. 测试与调试

6.1 单元测试

// service_test.go
// 服务测试
package service

import (
    "context"
    "testing"
    "time"
    
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/mock"
)

// MockNamingClient is a testify mock that stands in for the naming client's
// register and deregister calls.
type MockNamingClient struct {
    mock.Mock
}

// RegisterInstance records the call and returns the values configured for it
// on the mock.
func (m *MockNamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error) {
    ret := m.Called(param)
    ok, err := ret.Bool(0), ret.Error(1)
    return ok, err
}

// DeregisterInstance records the call and returns the values configured for
// it on the mock.
func (m *MockNamingClient) DeregisterInstance(param vo.DeregisterInstanceParam) (bool, error) {
    ret := m.Called(param)
    ok, err := ret.Bool(0), ret.Error(1)
    return ok, err
}

// TestServiceRegistry checks that Register succeeds and that the mock naming
// client receives the expected RegisterInstance call.
//
// NOTE(review): mockClient is never attached to the registry built below, so
// Register cannot reach the mock and AssertExpectations cannot be satisfied
// as written. The mock presumably needs to be injected through a client field
// of ServiceRegistry — confirm against its definition.
func TestServiceRegistry(t *testing.T) {
    // Create the mock client.
    mockClient := new(MockNamingClient)
    
    // Expect exactly the register call, answering success.
    mockClient.On("RegisterInstance", mock.AnythingOfType("vo.RegisterInstanceParam")).Return(true, nil)
    
    // Build the registry under test.
    registry := &ServiceRegistry{
        serviceName: "test-service",
        groupName:   "DEFAULT_GROUP",
        ip:          "127.0.0.1",
        port:        8080,
        weight:      1.0,
        metadata:    make(map[string]string),
    }
    
    // Perform the registration.
    err := registry.Register(context.Background())
    
    // Verify the outcome.
    assert.NoError(t, err)
    mockClient.AssertExpectations(t)
}

// TestCircuitBreaker checks that the breaker stays closed on success, opens
// after three failures, and rejects calls while open.
func TestCircuitBreaker(t *testing.T) {
    succeed := func() error { return nil }
    fail := func() error { return errors.New("test error") }

    breaker := NewCircuitBreaker(3, time.Second, 5*time.Second)

    // A successful call leaves the breaker closed.
    err := breaker.Call(succeed)
    assert.NoError(t, err)
    assert.Equal(t, StateClosed, breaker.GetState())

    // Three failures reach maxFailures and trip the breaker.
    for attempt := 1; attempt <= 3; attempt++ {
        breaker.Call(fail)
    }
    assert.Equal(t, StateOpen, breaker.GetState())

    // While open, even a call that would succeed is rejected.
    err = breaker.Call(succeed)
    assert.Error(t, err)
    assert.Contains(t, err.Error(), "熔断器开启")
}

// TestTokenBucket checks token consumption, rejection when too few tokens
// remain, and replenishment after waiting.
func TestTokenBucket(t *testing.T) {
    bucket := NewTokenBucket(10, 5)

    // A full bucket admits one request and is left with nine tokens.
    assert.True(t, bucket.Allow())
    assert.Equal(t, int64(9), bucket.GetTokens())

    // Consuming five at once leaves four.
    assert.True(t, bucket.AllowN(5))
    assert.Equal(t, int64(4), bucket.GetTokens())

    // Asking for more than what is left is rejected.
    assert.False(t, bucket.AllowN(10))

    // After two seconds at 5 tokens/s there are enough tokens again.
    time.Sleep(2 * time.Second)
    assert.True(t, bucket.AllowN(5))
}

// BenchmarkServiceDiscovery measures randomSelect over a fixed list of three
// instances.
func BenchmarkServiceDiscovery(b *testing.B) {
    discovery := NewServiceDiscovery(nil)

    candidates := make([]*model.Instance, 0, 3)
    candidates = append(candidates,
        &model.Instance{Ip: "127.0.0.1", Port: 8080, Weight: 1.0},
        &model.Instance{Ip: "127.0.0.1", Port: 8081, Weight: 1.0},
        &model.Instance{Ip: "127.0.0.1", Port: 8082, Weight: 1.0},
    )

    // Exclude the setup above from the measurement.
    b.ResetTimer()

    for remaining := b.N; remaining > 0; remaining-- {
        discovery.randomSelect(candidates)
    }
}

6.2 集成测试

// integration_test.go
// 集成测试
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "testing"
    "time"
    
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/suite"
)

// IntegrationTestSuite is a testify suite that runs the user and order
// services in-process and exercises them over HTTP.
type IntegrationTestSuite struct {
    suite.Suite
    userService  *UserService  // user service started in SetupSuite
    orderService *OrderService // order service started in SetupSuite
    httpClient   *http.Client  // client used by the tests to call both services
}

// SetupSuite constructs both services, starts each one in its own goroutine,
// waits three seconds for them to come up and prepares the shared HTTP client.
// Errors returned by the Start calls are not observed.
func (suite *IntegrationTestSuite) SetupSuite() {
    users, err := NewUserService()
    suite.Require().NoError(err)
    suite.userService = users
    go users.Start()

    orders, err := NewOrderService()
    suite.Require().NoError(err)
    suite.orderService = orders
    go orders.Start()

    // Fixed wait for both services to finish starting.
    time.Sleep(3 * time.Second)

    suite.httpClient = &http.Client{Timeout: 10 * time.Second}
}

// TearDownSuite closes the Nacos client of each service that was created.
func (suite *IntegrationTestSuite) TearDownSuite() {
    if users := suite.userService; users != nil {
        users.nacos.Close()
    }
    if orders := suite.orderService; orders != nil {
        orders.nacos.Close()
    }
}

// TestServiceRegistration checks that the health endpoints of the user
// service (:8080) and the order service (:8081) both answer 200.
func (suite *IntegrationTestSuite) TestServiceRegistration() {
    healthURLs := []string{
        "http://localhost:8080/health", // user service
        "http://localhost:8081/health", // order service
    }

    for _, healthURL := range healthURLs {
        resp, err := suite.httpClient.Get(healthURL)
        suite.Require().NoError(err)
        suite.Equal(http.StatusOK, resp.StatusCode)
        resp.Body.Close()
    }
}

// TestServiceDiscovery checks that the order service's list endpoint answers
// 200. The request is a plain GET of /api/v1/orders on port 8081.
func (suite *IntegrationTestSuite) TestServiceDiscovery() {
    const ordersURL = "http://localhost:8081/api/v1/orders"

    resp, err := suite.httpClient.Get(ordersURL)
    suite.Require().NoError(err)
    suite.Equal(http.StatusOK, resp.StatusCode)
    resp.Body.Close()
}

// TestConfigManagement checks that the user service's /info endpoint answers
// 200 with a JSON document naming the service and its version.
func (suite *IntegrationTestSuite) TestConfigManagement() {
    resp, err := suite.httpClient.Get("http://localhost:8080/info")
    suite.Require().NoError(err)
    // Close via defer: a failing Require below aborts the test immediately,
    // which would otherwise skip the Close and leak the connection.
    defer resp.Body.Close()

    suite.Equal(http.StatusOK, resp.StatusCode)

    var result map[string]interface{}
    err = json.NewDecoder(resp.Body).Decode(&result)
    suite.Require().NoError(err)

    suite.Equal("user-service", result["service"])
    suite.Equal("1.0.0", result["version"])
}

// TestIntegrationSuite is the go test entry point that runs
// IntegrationTestSuite.
func TestIntegrationSuite(t *testing.T) {
    suite.Run(t, new(IntegrationTestSuite))
}

7. 核心要点

7.1 架构设计

  • 模块化设计: 将服务注册、服务发现、配置管理等功能模块化
  • 接口抽象: 定义清晰的接口,便于测试和扩展
  • 错误处理: 完善的错误处理和日志记录
  • 并发安全: 使用互斥锁保证并发安全

7.2 最佳实践

  • 服务注册: 设置合适的元数据,便于服务治理
  • 健康检查: 实现健康检查接口,确保服务可用性
  • 负载均衡: 根据业务需求选择合适的负载均衡策略
  • 配置管理: 使用配置热更新,提高系统灵活性
  • 故障处理: 实现熔断、限流等容错机制

7.3 性能优化

  • 连接池: 使用连接池减少连接开销
  • 缓存机制: 缓存服务实例信息,减少网络请求
  • 异步处理: 使用异步方式处理服务变更通知
  • 批量操作: 支持批量注册和注销服务

8. 下一步学习

  • Spring Native: 学习 Spring Native 原生镜像技术
  • 服务网格: 了解 Istio、Linkerd 等服务网格技术
  • 可观测性: 学习分布式链路追踪和监控
  • 云原生: 掌握 Kubernetes、Docker 等容器技术