学习目标
- 掌握 Nacos Go 客户端的安装和配置
- 学会使用 Go 客户端进行服务注册与发现
- 掌握 Go 客户端的配置管理功能
- 了解 Go 客户端的高级特性和最佳实践
1. Go 客户端概述
1.1 客户端特性
Nacos Go 客户端提供了完整的服务发现和配置管理功能:
- 服务注册与发现: 支持服务实例的注册、注销和查询
- 配置管理: 支持配置的发布、获取、监听和删除
- 健康检查: 支持多种健康检查方式
- 负载均衡: 内置多种负载均衡算法
- 故障转移: 支持客户端故障转移
- 安全认证: 支持用户名密码认证
1.2 核心组件
// client_types.go
// Go 客户端核心类型定义
package nacos
import (
"context"
"time"
)
// ServiceInstance 服务实例
type ServiceInstance struct {
InstanceId string `json:"instanceId"`
ServiceName string `json:"serviceName"`
GroupName string `json:"groupName"`
ClusterName string `json:"clusterName"`
Ip string `json:"ip"`
Port uint64 `json:"port"`
Weight float64 `json:"weight"`
Enable bool `json:"enable"`
Healthy bool `json:"healthy"`
Ephemeral bool `json:"ephemeral"`
Metadata map[string]string `json:"metadata"`
}
// ConfigParam 配置参数
type ConfigParam struct {
DataId string `json:"dataId"`
Group string `json:"group"`
Content string `json:"content"`
Tag string `json:"tag"`
AppName string `json:"appName"`
BetaIps string `json:"betaIps"`
CasMd5 string `json:"casMd5"`
Type string `json:"type"`
}
// ConfigChangeEvent 配置变更事件
type ConfigChangeEvent struct {
DataId string
Group string
Content string
EventType string
Timestamp time.Time
}
// HealthCheckType 健康检查类型
type HealthCheckType string
const (
HealthCheckTCP HealthCheckType = "TCP"
HealthCheckHTTP HealthCheckType = "HTTP"
HealthCheckNone HealthCheckType = "NONE"
)
// LoadBalanceStrategy 负载均衡策略
type LoadBalanceStrategy string
const (
LoadBalanceRoundRobin LoadBalanceStrategy = "round-robin"
LoadBalanceRandom LoadBalanceStrategy = "random"
LoadBalanceWeighted LoadBalanceStrategy = "weighted"
)
// NacosClient Nacos 客户端接口
type NacosClient interface {
// 服务发现相关
RegisterInstance(ctx context.Context, param RegisterInstanceParam) error
DeregisterInstance(ctx context.Context, param DeregisterInstanceParam) error
GetService(ctx context.Context, param GetServiceParam) (*Service, error)
GetAllServicesInfo(ctx context.Context, param GetAllServicesInfoParam) (*ServiceList, error)
SelectInstances(ctx context.Context, param SelectInstancesParam) ([]*ServiceInstance, error)
SelectOneHealthyInstance(ctx context.Context, param SelectOneHealthyInstanceParam) (*ServiceInstance, error)
Subscribe(ctx context.Context, param SubscribeParam) error
Unsubscribe(ctx context.Context, param SubscribeParam) error
// 配置管理相关
GetConfig(ctx context.Context, param ConfigParam) (string, error)
PublishConfig(ctx context.Context, param ConfigParam) error
DeleteConfig(ctx context.Context, param ConfigParam) error
ListenConfig(ctx context.Context, param ConfigParam, listener ConfigListener) error
CancelListenConfig(ctx context.Context, param ConfigParam) error
// 客户端管理
Close() error
}
// ConfigListener 配置监听器
type ConfigListener interface {
OnChange(event ConfigChangeEvent)
OnError(err error)
}
// ServiceChangeListener 服务变更监听器
type ServiceChangeListener interface {
OnChange(services []*ServiceInstance)
OnError(err error)
}
2. 安装与配置
2.1 安装依赖
# 初始化 Go 模块
go mod init nacos-go-example
# 安装 Nacos Go 客户端
go get github.com/nacos-group/nacos-sdk-go/v2
# 安装其他依赖
go get github.com/gin-gonic/gin
go get github.com/spf13/viper
go get go.uber.org/zap
go get github.com/prometheus/client_golang
2.2 客户端配置
// config.go
// 客户端配置
package config
import (
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"go.uber.org/zap"
)
// NacosConfig Nacos 配置
type NacosConfig struct {
ServerConfigs []constant.ServerConfig
ClientConfig constant.ClientConfig
NamingClient naming_client.INamingClient
ConfigClient config_client.IConfigClient
Logger *zap.Logger
}
// NewNacosConfig 创建 Nacos 配置
func NewNacosConfig() (*NacosConfig, error) {
logger, _ := zap.NewProduction()
// 服务端配置
serverConfigs := []constant.ServerConfig{
{
IpAddr: "localhost",
Port: 8848,
ContextPath: "/nacos",
Scheme: "http",
},
}
// 客户端配置
clientConfig := constant.ClientConfig{
NamespaceId: "public",
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
LogDir: "./logs",
CacheDir: "./cache",
LogLevel: "info",
Username: "nacos",
Password: "nacos",
}
// 创建命名客户端
namingClient, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &clientConfig,
ServerConfigs: serverConfigs,
},
)
if err != nil {
logger.Error("创建命名客户端失败", zap.Error(err))
return nil, err
}
// 创建配置客户端
configClient, err := clients.NewConfigClient(
vo.NacosClientParam{
ClientConfig: &clientConfig,
ServerConfigs: serverConfigs,
},
)
if err != nil {
logger.Error("创建配置客户端失败", zap.Error(err))
return nil, err
}
return &NacosConfig{
ServerConfigs: serverConfigs,
ClientConfig: clientConfig,
NamingClient: namingClient,
ConfigClient: configClient,
Logger: logger,
}, nil
}
// Close 关闭客户端
func (nc *NacosConfig) Close() {
if nc.Logger != nil {
nc.Logger.Sync()
}
}
2.3 配置文件管理
// config_manager.go
// 配置文件管理器
package config
import (
"encoding/json"
"fmt"
"github.com/spf13/viper"
"go.uber.org/zap"
"gopkg.in/yaml.v2"
"strings"
)
// AppConfig 应用配置
type AppConfig struct {
Server ServerConfig `yaml:"server" json:"server"`
Database DatabaseConfig `yaml:"database" json:"database"`
Redis RedisConfig `yaml:"redis" json:"redis"`
Log LogConfig `yaml:"log" json:"log"`
Business BusinessConfig `yaml:"business" json:"business"`
}
// ServerConfig 服务器配置
type ServerConfig struct {
Port int `yaml:"port" json:"port"`
Host string `yaml:"host" json:"host"`
ReadTimeout int `yaml:"readTimeout" json:"readTimeout"`
WriteTimeout int `yaml:"writeTimeout" json:"writeTimeout"`
IdleTimeout int `yaml:"idleTimeout" json:"idleTimeout"`
}
// DatabaseConfig 数据库配置
type DatabaseConfig struct {
Driver string `yaml:"driver" json:"driver"`
Host string `yaml:"host" json:"host"`
Port int `yaml:"port" json:"port"`
Username string `yaml:"username" json:"username"`
Password string `yaml:"password" json:"password"`
Database string `yaml:"database" json:"database"`
MaxOpenConns int `yaml:"maxOpenConns" json:"maxOpenConns"`
MaxIdleConns int `yaml:"maxIdleConns" json:"maxIdleConns"`
ConnMaxLifetime int `yaml:"connMaxLifetime" json:"connMaxLifetime"`
}
// RedisConfig Redis 配置
type RedisConfig struct {
Host string `yaml:"host" json:"host"`
Port int `yaml:"port" json:"port"`
Password string `yaml:"password" json:"password"`
Database int `yaml:"database" json:"database"`
MaxRetries int `yaml:"maxRetries" json:"maxRetries"`
PoolSize int `yaml:"poolSize" json:"poolSize"`
MinIdleConns int `yaml:"minIdleConns" json:"minIdleConns"`
}
// LogConfig 日志配置
type LogConfig struct {
Level string `yaml:"level" json:"level"`
Format string `yaml:"format" json:"format"`
Output string `yaml:"output" json:"output"`
MaxSize int `yaml:"maxSize" json:"maxSize"`
MaxBackups int `yaml:"maxBackups" json:"maxBackups"`
MaxAge int `yaml:"maxAge" json:"maxAge"`
Compress bool `yaml:"compress" json:"compress"`
}
// BusinessConfig 业务配置
type BusinessConfig struct {
OrderTimeout int `yaml:"orderTimeout" json:"orderTimeout"`
RetryTimes int `yaml:"retryTimes" json:"retryTimes"`
DefaultPageSize int `yaml:"defaultPageSize" json:"defaultPageSize"`
MaxPageSize int `yaml:"maxPageSize" json:"maxPageSize"`
CacheTTL int `yaml:"cacheTTL" json:"cacheTTL"`
RateLimit float64 `yaml:"rateLimit" json:"rateLimit"`
}
// ConfigManager 配置管理器
type ConfigManager struct {
nacos *NacosConfig
config *AppConfig
logger *zap.Logger
}
// NewConfigManager 创建配置管理器
func NewConfigManager(nacos *NacosConfig) *ConfigManager {
return &ConfigManager{
nacos: nacos,
config: &AppConfig{},
logger: nacos.Logger,
}
}
// LoadConfig 加载配置
func (cm *ConfigManager) LoadConfig(dataId, group string) error {
// 从 Nacos 获取配置
content, err := cm.nacos.ConfigClient.GetConfig(vo.ConfigParam{
DataId: dataId,
Group: group,
})
if err != nil {
cm.logger.Error("获取配置失败", zap.Error(err))
return err
}
// 解析配置
if err := cm.parseConfig(content, dataId); err != nil {
cm.logger.Error("解析配置失败", zap.Error(err))
return err
}
// 监听配置变更
if err := cm.listenConfig(dataId, group); err != nil {
cm.logger.Error("监听配置失败", zap.Error(err))
return err
}
cm.logger.Info("配置加载成功", zap.String("dataId", dataId), zap.String("group", group))
return nil
}
// parseConfig 解析配置
func (cm *ConfigManager) parseConfig(content, dataId string) error {
// 根据文件扩展名选择解析方式
if strings.HasSuffix(dataId, ".yaml") || strings.HasSuffix(dataId, ".yml") {
return yaml.Unmarshal([]byte(content), cm.config)
} else if strings.HasSuffix(dataId, ".json") {
return json.Unmarshal([]byte(content), cm.config)
} else {
// 默认使用 YAML 解析
return yaml.Unmarshal([]byte(content), cm.config)
}
}
// listenConfig 监听配置变更
func (cm *ConfigManager) listenConfig(dataId, group string) error {
return cm.nacos.ConfigClient.ListenConfig(vo.ConfigParam{
DataId: dataId,
Group: group,
OnChange: func(namespace, group, dataId, data string) {
cm.logger.Info("配置发生变更",
zap.String("namespace", namespace),
zap.String("group", group),
zap.String("dataId", dataId),
)
// 重新解析配置
if err := cm.parseConfig(data, dataId); err != nil {
cm.logger.Error("重新解析配置失败", zap.Error(err))
return
}
cm.logger.Info("配置热更新成功")
},
})
}
// GetConfig 获取配置
func (cm *ConfigManager) GetConfig() *AppConfig {
return cm.config
}
// PublishConfig 发布配置
func (cm *ConfigManager) PublishConfig(dataId, group, content string) error {
success, err := cm.nacos.ConfigClient.PublishConfig(vo.ConfigParam{
DataId: dataId,
Group: group,
Content: content,
})
if err != nil {
return err
}
if !success {
return fmt.Errorf("发布配置失败")
}
cm.logger.Info("配置发布成功", zap.String("dataId", dataId), zap.String("group", group))
return nil
}
// DeleteConfig 删除配置
func (cm *ConfigManager) DeleteConfig(dataId, group string) error {
success, err := cm.nacos.ConfigClient.DeleteConfig(vo.ConfigParam{
DataId: dataId,
Group: group,
})
if err != nil {
return err
}
if !success {
return fmt.Errorf("删除配置失败")
}
cm.logger.Info("配置删除成功", zap.String("dataId", dataId), zap.String("group", group))
return nil
}
3. 服务注册与发现
3.1 服务注册
// service_registry.go
// 服务注册器
package service
import (
"context"
"fmt"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"go.uber.org/zap"
"net"
"os"
"strconv"
"time"
)
// ServiceRegistry 服务注册器
type ServiceRegistry struct {
nacos *config.NacosConfig
serviceName string
groupName string
clusterName string
ip string
port uint64
weight float64
metadata map[string]string
logger *zap.Logger
}
// NewServiceRegistry 创建服务注册器
func NewServiceRegistry(nacos *config.NacosConfig, serviceName string, port uint64) (*ServiceRegistry, error) {
ip, err := getLocalIP()
if err != nil {
return nil, err
}
return &ServiceRegistry{
nacos: nacos,
serviceName: serviceName,
groupName: "DEFAULT_GROUP",
clusterName: "DEFAULT",
ip: ip,
port: port,
weight: 1.0,
metadata: make(map[string]string),
logger: nacos.Logger,
}, nil
}
// SetGroupName 设置分组名称
func (sr *ServiceRegistry) SetGroupName(groupName string) {
sr.groupName = groupName
}
// SetClusterName 设置集群名称
func (sr *ServiceRegistry) SetClusterName(clusterName string) {
sr.clusterName = clusterName
}
// SetWeight 设置权重
func (sr *ServiceRegistry) SetWeight(weight float64) {
sr.weight = weight
}
// SetMetadata 设置元数据
func (sr *ServiceRegistry) SetMetadata(key, value string) {
sr.metadata[key] = value
}
// Register 注册服务
func (sr *ServiceRegistry) Register(ctx context.Context) error {
// 添加默认元数据
sr.metadata["version"] = "1.0.0"
sr.metadata["env"] = getEnv("ENV", "dev")
sr.metadata["startTime"] = strconv.FormatInt(time.Now().Unix(), 10)
sr.metadata["pid"] = strconv.Itoa(os.Getpid())
success, err := sr.nacos.NamingClient.RegisterInstance(vo.RegisterInstanceParam{
Ip: sr.ip,
Port: sr.port,
ServiceName: sr.serviceName,
GroupName: sr.groupName,
ClusterName: sr.clusterName,
Weight: sr.weight,
Enable: true,
Healthy: true,
Ephemeral: true,
Metadata: sr.metadata,
})
if err != nil {
sr.logger.Error("服务注册失败", zap.Error(err))
return err
}
if !success {
return fmt.Errorf("服务注册失败")
}
sr.logger.Info("服务注册成功",
zap.String("serviceName", sr.serviceName),
zap.String("ip", sr.ip),
zap.Uint64("port", sr.port),
zap.String("groupName", sr.groupName),
zap.String("clusterName", sr.clusterName),
)
return nil
}
// Deregister 注销服务
func (sr *ServiceRegistry) Deregister(ctx context.Context) error {
success, err := sr.nacos.NamingClient.DeregisterInstance(vo.DeregisterInstanceParam{
Ip: sr.ip,
Port: sr.port,
ServiceName: sr.serviceName,
GroupName: sr.groupName,
ClusterName: sr.clusterName,
Ephemeral: true,
})
if err != nil {
sr.logger.Error("服务注销失败", zap.Error(err))
return err
}
if !success {
return fmt.Errorf("服务注销失败")
}
sr.logger.Info("服务注销成功",
zap.String("serviceName", sr.serviceName),
zap.String("ip", sr.ip),
zap.Uint64("port", sr.port),
)
return nil
}
// UpdateInstance 更新服务实例
func (sr *ServiceRegistry) UpdateInstance(ctx context.Context) error {
success, err := sr.nacos.NamingClient.UpdateInstance(vo.UpdateInstanceParam{
Ip: sr.ip,
Port: sr.port,
ServiceName: sr.serviceName,
GroupName: sr.groupName,
ClusterName: sr.clusterName,
Weight: sr.weight,
Enable: true,
Healthy: true,
Ephemeral: true,
Metadata: sr.metadata,
})
if err != nil {
sr.logger.Error("更新服务实例失败", zap.Error(err))
return err
}
if !success {
return fmt.Errorf("更新服务实例失败")
}
sr.logger.Info("更新服务实例成功")
return nil
}
// getLocalIP 获取本地 IP 地址
func getLocalIP() (string, error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return "", err
}
defer conn.Close()
localAddr := conn.LocalAddr().(*net.UDPAddr)
return localAddr.IP.String(), nil
}
// getEnv 获取环境变量
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
3.2 服务发现
// service_discovery.go
// 服务发现器
package service
import (
"context"
"fmt"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"go.uber.org/zap"
"math/rand"
"sync"
"time"
)
// ServiceDiscovery 服务发现器
type ServiceDiscovery struct {
nacos *config.NacosConfig
cache map[string][]*model.Instance
cacheMutex sync.RWMutex
subscribers map[string][]ServiceChangeListener
subMutex sync.RWMutex
logger *zap.Logger
}
// ServiceChangeListener 服务变更监听器
type ServiceChangeListener interface {
OnChange(serviceName string, instances []*model.Instance)
OnError(serviceName string, err error)
}
// NewServiceDiscovery 创建服务发现器
func NewServiceDiscovery(nacos *config.NacosConfig) *ServiceDiscovery {
return &ServiceDiscovery{
nacos: nacos,
cache: make(map[string][]*model.Instance),
subscribers: make(map[string][]ServiceChangeListener),
logger: nacos.Logger,
}
}
// GetService 获取服务信息
func (sd *ServiceDiscovery) GetService(ctx context.Context, serviceName, groupName string) (*model.Service, error) {
service, err := sd.nacos.NamingClient.GetService(vo.GetServiceParam{
ServiceName: serviceName,
GroupName: groupName,
})
if err != nil {
sd.logger.Error("获取服务信息失败",
zap.String("serviceName", serviceName),
zap.String("groupName", groupName),
zap.Error(err),
)
return nil, err
}
return &service, nil
}
// GetInstances 获取服务实例列表
func (sd *ServiceDiscovery) GetInstances(ctx context.Context, serviceName, groupName string, healthyOnly bool) ([]*model.Instance, error) {
instances, err := sd.nacos.NamingClient.SelectInstances(vo.SelectInstancesParam{
ServiceName: serviceName,
GroupName: groupName,
HealthyOnly: healthyOnly,
})
if err != nil {
sd.logger.Error("获取服务实例失败",
zap.String("serviceName", serviceName),
zap.String("groupName", groupName),
zap.Bool("healthyOnly", healthyOnly),
zap.Error(err),
)
return nil, err
}
// 更新缓存
sd.updateCache(serviceName, instances)
sd.logger.Debug("获取服务实例成功",
zap.String("serviceName", serviceName),
zap.Int("instanceCount", len(instances)),
)
return instances, nil
}
// GetHealthyInstance 获取一个健康的服务实例
func (sd *ServiceDiscovery) GetHealthyInstance(ctx context.Context, serviceName, groupName string) (*model.Instance, error) {
instance, err := sd.nacos.NamingClient.SelectOneHealthyInstance(vo.SelectOneHealthyInstanceParam{
ServiceName: serviceName,
GroupName: groupName,
})
if err != nil {
sd.logger.Error("获取健康服务实例失败",
zap.String("serviceName", serviceName),
zap.String("groupName", groupName),
zap.Error(err),
)
return nil, err
}
return &instance, nil
}
// GetInstanceWithLoadBalance 使用负载均衡获取服务实例
func (sd *ServiceDiscovery) GetInstanceWithLoadBalance(ctx context.Context, serviceName, groupName string, strategy LoadBalanceStrategy) (*model.Instance, error) {
instances, err := sd.GetInstances(ctx, serviceName, groupName, true)
if err != nil {
return nil, err
}
if len(instances) == 0 {
return nil, fmt.Errorf("没有可用的服务实例")
}
switch strategy {
case LoadBalanceRandom:
return sd.randomSelect(instances), nil
case LoadBalanceWeighted:
return sd.weightedSelect(instances), nil
case LoadBalanceRoundRobin:
return sd.roundRobinSelect(serviceName, instances), nil
default:
return sd.randomSelect(instances), nil
}
}
// Subscribe 订阅服务变更
func (sd *ServiceDiscovery) Subscribe(ctx context.Context, serviceName, groupName string, listener ServiceChangeListener) error {
// 添加监听器
sd.addListener(serviceName, listener)
// 订阅服务变更
err := sd.nacos.NamingClient.Subscribe(&vo.SubscribeParam{
ServiceName: serviceName,
GroupName: groupName,
SubscribeCallback: func(services []model.Instance, err error) {
if err != nil {
sd.logger.Error("服务变更通知错误",
zap.String("serviceName", serviceName),
zap.Error(err),
)
sd.notifyError(serviceName, err)
return
}
// 转换为指针切片
instances := make([]*model.Instance, len(services))
for i := range services {
instances[i] = &services[i]
}
// 更新缓存
sd.updateCache(serviceName, instances)
// 通知监听器
sd.notifyChange(serviceName, instances)
sd.logger.Info("服务实例变更",
zap.String("serviceName", serviceName),
zap.Int("instanceCount", len(instances)),
)
},
})
if err != nil {
sd.logger.Error("订阅服务失败",
zap.String("serviceName", serviceName),
zap.String("groupName", groupName),
zap.Error(err),
)
return err
}
sd.logger.Info("订阅服务成功",
zap.String("serviceName", serviceName),
zap.String("groupName", groupName),
)
return nil
}
// Unsubscribe 取消订阅服务变更
func (sd *ServiceDiscovery) Unsubscribe(ctx context.Context, serviceName, groupName string) error {
err := sd.nacos.NamingClient.Unsubscribe(&vo.SubscribeParam{
ServiceName: serviceName,
GroupName: groupName,
})
if err != nil {
sd.logger.Error("取消订阅服务失败",
zap.String("serviceName", serviceName),
zap.String("groupName", groupName),
zap.Error(err),
)
return err
}
// 移除监听器
sd.removeListeners(serviceName)
sd.logger.Info("取消订阅服务成功",
zap.String("serviceName", serviceName),
zap.String("groupName", groupName),
)
return nil
}
// 负载均衡算法实现
// randomSelect 随机选择
func (sd *ServiceDiscovery) randomSelect(instances []*model.Instance) *model.Instance {
if len(instances) == 0 {
return nil
}
return instances[rand.Intn(len(instances))]
}
// weightedSelect 加权选择
func (sd *ServiceDiscovery) weightedSelect(instances []*model.Instance) *model.Instance {
if len(instances) == 0 {
return nil
}
totalWeight := 0.0
for _, instance := range instances {
totalWeight += instance.Weight
}
if totalWeight <= 0 {
return sd.randomSelect(instances)
}
random := rand.Float64() * totalWeight
currentWeight := 0.0
for _, instance := range instances {
currentWeight += instance.Weight
if random <= currentWeight {
return instance
}
}
return instances[len(instances)-1]
}
// roundRobinSelect 轮询选择
var roundRobinCounters = make(map[string]int)
var roundRobinMutex sync.Mutex
func (sd *ServiceDiscovery) roundRobinSelect(serviceName string, instances []*model.Instance) *model.Instance {
if len(instances) == 0 {
return nil
}
roundRobinMutex.Lock()
defer roundRobinMutex.Unlock()
counter := roundRobinCounters[serviceName]
instance := instances[counter%len(instances)]
roundRobinCounters[serviceName] = counter + 1
return instance
}
// 缓存和监听器管理
// updateCache 更新缓存
func (sd *ServiceDiscovery) updateCache(serviceName string, instances []*model.Instance) {
sd.cacheMutex.Lock()
defer sd.cacheMutex.Unlock()
sd.cache[serviceName] = instances
}
// addListener 添加监听器
func (sd *ServiceDiscovery) addListener(serviceName string, listener ServiceChangeListener) {
sd.subMutex.Lock()
defer sd.subMutex.Unlock()
sd.subscribers[serviceName] = append(sd.subscribers[serviceName], listener)
}
// removeListeners 移除监听器
func (sd *ServiceDiscovery) removeListeners(serviceName string) {
sd.subMutex.Lock()
defer sd.subMutex.Unlock()
delete(sd.subscribers, serviceName)
}
// notifyChange 通知变更
func (sd *ServiceDiscovery) notifyChange(serviceName string, instances []*model.Instance) {
sd.subMutex.RLock()
listeners := sd.subscribers[serviceName]
sd.subMutex.RUnlock()
for _, listener := range listeners {
go listener.OnChange(serviceName, instances)
}
}
// notifyError 通知错误
func (sd *ServiceDiscovery) notifyError(serviceName string, err error) {
sd.subMutex.RLock()
listeners := sd.subscribers[serviceName]
sd.subMutex.RUnlock()
for _, listener := range listeners {
go listener.OnError(serviceName, err)
}
}
4. 实际应用示例
4.1 用户服务示例
// user_service.go
// 用户服务示例
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
)
// User 用户模型
type User struct {
ID int64 `json:"id"`
Username string `json:"username"`
Email string `json:"email"`
Phone string `json:"phone"`
Status int `json:"status"`
CreateAt int64 `json:"createAt"`
UpdateAt int64 `json:"updateAt"`
}
// UserService 用户服务
type UserService struct {
nacos *config.NacosConfig
registry *service.ServiceRegistry
discovery *service.ServiceDiscovery
configMgr *config.ConfigManager
logger *zap.Logger
users map[int64]*User // 模拟数据库
}
// NewUserService 创建用户服务
func NewUserService() (*UserService, error) {
// 初始化 Nacos 配置
nacos, err := config.NewNacosConfig()
if err != nil {
return nil, err
}
// 创建服务注册器
registry, err := service.NewServiceRegistry(nacos, "user-service", 8080)
if err != nil {
return nil, err
}
// 设置服务元数据
registry.SetMetadata("service.type", "user")
registry.SetMetadata("service.version", "1.0.0")
registry.SetMetadata("service.protocol", "http")
// 创建服务发现器
discovery := service.NewServiceDiscovery(nacos)
// 创建配置管理器
configMgr := config.NewConfigManager(nacos)
// 初始化模拟数据
users := make(map[int64]*User)
users[1] = &User{
ID: 1,
Username: "admin",
Email: "admin@example.com",
Phone: "13800138000",
Status: 1,
CreateAt: time.Now().Unix(),
UpdateAt: time.Now().Unix(),
}
return &UserService{
nacos: nacos,
registry: registry,
discovery: discovery,
configMgr: configMgr,
logger: nacos.Logger,
users: users,
}, nil
}
// Start 启动服务
func (us *UserService) Start() error {
// 加载配置
if err := us.configMgr.LoadConfig("user-service.yaml", "DEFAULT_GROUP"); err != nil {
us.logger.Error("加载配置失败", zap.Error(err))
return err
}
// 注册服务
if err := us.registry.Register(context.Background()); err != nil {
us.logger.Error("注册服务失败", zap.Error(err))
return err
}
// 启动 HTTP 服务器
router := us.setupRouter()
server := &http.Server{
Addr: ":8080",
Handler: router,
}
// 优雅关闭
go us.gracefulShutdown(server)
us.logger.Info("用户服务启动成功", zap.String("addr", ":8080"))
return server.ListenAndServe()
}
// setupRouter 设置路由
func (us *UserService) setupRouter() *gin.Engine {
gin.SetMode(gin.ReleaseMode)
router := gin.New()
router.Use(gin.Logger(), gin.Recovery())
// 健康检查
router.GET("/health", us.healthCheck)
// 用户相关接口
userGroup := router.Group("/api/v1/users")
{
userGroup.GET("", us.getUsers)
userGroup.GET("/:id", us.getUserByID)
userGroup.POST("", us.createUser)
userGroup.PUT("/:id", us.updateUser)
userGroup.DELETE("/:id", us.deleteUser)
}
// 服务信息接口
router.GET("/info", us.getServiceInfo)
return router
}
// healthCheck 健康检查
func (us *UserService) healthCheck(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"status": "UP",
"timestamp": time.Now().Unix(),
"service": "user-service",
})
}
// getUsers 获取用户列表
func (us *UserService) getUsers(c *gin.Context) {
users := make([]*User, 0, len(us.users))
for _, user := range us.users {
users = append(users, user)
}
c.JSON(http.StatusOK, gin.H{
"code": 0,
"msg": "success",
"data": users,
})
}
// getUserByID 根据 ID 获取用户
func (us *UserService) getUserByID(c *gin.Context) {
idStr := c.Param("id")
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"msg": "无效的用户 ID",
})
return
}
user, exists := us.users[id]
if !exists {
c.JSON(http.StatusNotFound, gin.H{
"code": 404,
"msg": "用户不存在",
})
return
}
c.JSON(http.StatusOK, gin.H{
"code": 0,
"msg": "success",
"data": user,
})
}
// createUser 创建用户
func (us *UserService) createUser(c *gin.Context) {
var user User
if err := c.ShouldBindJSON(&user); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"msg": "请求参数错误",
})
return
}
// 生成 ID
user.ID = int64(len(us.users) + 1)
user.CreateAt = time.Now().Unix()
user.UpdateAt = time.Now().Unix()
user.Status = 1
us.users[user.ID] = &user
c.JSON(http.StatusCreated, gin.H{
"code": 0,
"msg": "创建成功",
"data": user,
})
}
// updateUser 更新用户
func (us *UserService) updateUser(c *gin.Context) {
idStr := c.Param("id")
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"msg": "无效的用户 ID",
})
return
}
user, exists := us.users[id]
if !exists {
c.JSON(http.StatusNotFound, gin.H{
"code": 404,
"msg": "用户不存在",
})
return
}
var updateUser User
if err := c.ShouldBindJSON(&updateUser); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"msg": "请求参数错误",
})
return
}
// 更新字段
if updateUser.Username != "" {
user.Username = updateUser.Username
}
if updateUser.Email != "" {
user.Email = updateUser.Email
}
if updateUser.Phone != "" {
user.Phone = updateUser.Phone
}
user.UpdateAt = time.Now().Unix()
c.JSON(http.StatusOK, gin.H{
"code": 0,
"msg": "更新成功",
"data": user,
})
}
// deleteUser 删除用户
func (us *UserService) deleteUser(c *gin.Context) {
idStr := c.Param("id")
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"msg": "无效的用户 ID",
})
return
}
_, exists := us.users[id]
if !exists {
c.JSON(http.StatusNotFound, gin.H{
"code": 404,
"msg": "用户不存在",
})
return
}
delete(us.users, id)
c.JSON(http.StatusOK, gin.H{
"code": 0,
"msg": "删除成功",
})
}
// getServiceInfo 获取服务信息
func (us *UserService) getServiceInfo(c *gin.Context) {
config := us.configMgr.GetConfig()
c.JSON(http.StatusOK, gin.H{
"service": "user-service",
"version": "1.0.0",
"config": config,
"uptime": time.Now().Unix(),
})
}
// gracefulShutdown 优雅关闭
func (us *UserService) gracefulShutdown(server *http.Server) {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
us.logger.Info("正在关闭服务...")
// 注销服务
if err := us.registry.Deregister(context.Background()); err != nil {
us.logger.Error("注销服务失败", zap.Error(err))
}
// 关闭 HTTP 服务器
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
us.logger.Error("关闭服务器失败", zap.Error(err))
}
// 关闭 Nacos 客户端
us.nacos.Close()
us.logger.Info("服务已关闭")
}
// main 主函数
func main() {
userService, err := NewUserService()
if err != nil {
panic(err)
}
if err := userService.Start(); err != nil && err != http.ErrServerClosed {
panic(err)
}
}
4.2 订单服务示例
// order_service.go
// 订单服务示例
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"go.uber.org/zap"
"io"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
)
// Order 订单模型
type Order struct {
ID int64 `json:"id"`
UserID int64 `json:"userId"`
Amount float64 `json:"amount"`
Status int `json:"status"`
CreateAt int64 `json:"createAt"`
UpdateAt int64 `json:"updateAt"`
}
// OrderService 订单服务
type OrderService struct {
nacos *config.NacosConfig
registry *service.ServiceRegistry
discovery *service.ServiceDiscovery
configMgr *config.ConfigManager
logger *zap.Logger
orders map[int64]*Order // 模拟数据库
httpClient *http.Client
}
// ServiceChangeListenerImpl 服务变更监听器实现
type ServiceChangeListenerImpl struct {
serviceName string
logger *zap.Logger
}
// OnChange 服务变更回调
func (scl *ServiceChangeListenerImpl) OnChange(serviceName string, instances []*model.Instance) {
scl.logger.Info("服务实例变更",
zap.String("serviceName", serviceName),
zap.Int("instanceCount", len(instances)),
)
}
// OnError 服务变更错误回调
func (scl *ServiceChangeListenerImpl) OnError(serviceName string, err error) {
scl.logger.Error("服务变更错误",
zap.String("serviceName", serviceName),
zap.Error(err),
)
}
// NewOrderService 创建订单服务
func NewOrderService() (*OrderService, error) {
// 初始化 Nacos 配置
nacos, err := config.NewNacosConfig()
if err != nil {
return nil, err
}
// 创建服务注册器
registry, err := service.NewServiceRegistry(nacos, "order-service", 8081)
if err != nil {
return nil, err
}
// 设置服务元数据
registry.SetMetadata("service.type", "order")
registry.SetMetadata("service.version", "1.0.0")
registry.SetMetadata("service.protocol", "http")
// 创建服务发现器
discovery := service.NewServiceDiscovery(nacos)
// 创建配置管理器
configMgr := config.NewConfigManager(nacos)
// 初始化模拟数据
orders := make(map[int64]*Order)
// 创建 HTTP 客户端
httpClient := &http.Client{
Timeout: 10 * time.Second,
}
return &OrderService{
nacos: nacos,
registry: registry,
discovery: discovery,
configMgr: configMgr,
logger: nacos.Logger,
orders: orders,
httpClient: httpClient,
}, nil
}
// Start 启动服务
func (os *OrderService) Start() error {
// 加载配置
if err := os.configMgr.LoadConfig("order-service.yaml", "DEFAULT_GROUP"); err != nil {
os.logger.Error("加载配置失败", zap.Error(err))
return err
}
// 注册服务
if err := os.registry.Register(context.Background()); err != nil {
os.logger.Error("注册服务失败", zap.Error(err))
return err
}
// 订阅用户服务
listener := &ServiceChangeListenerImpl{
serviceName: "user-service",
logger: os.logger,
}
if err := os.discovery.Subscribe(context.Background(), "user-service", "DEFAULT_GROUP", listener); err != nil {
os.logger.Error("订阅用户服务失败", zap.Error(err))
return err
}
// 启动 HTTP 服务器
router := os.setupRouter()
server := &http.Server{
Addr: ":8081",
Handler: router,
}
// 优雅关闭
go os.gracefulShutdown(server)
os.logger.Info("订单服务启动成功", zap.String("addr", ":8081"))
return server.ListenAndServe()
}
// setupRouter 设置路由
func (os *OrderService) setupRouter() *gin.Engine {
gin.SetMode(gin.ReleaseMode)
router := gin.New()
router.Use(gin.Logger(), gin.Recovery())
// 健康检查
router.GET("/health", os.healthCheck)
// 订单相关接口
orderGroup := router.Group("/api/v1/orders")
{
orderGroup.GET("", os.getOrders)
orderGroup.GET("/:id", os.getOrderByID)
orderGroup.POST("", os.createOrder)
orderGroup.PUT("/:id", os.updateOrder)
orderGroup.DELETE("/:id", os.deleteOrder)
}
// 服务信息接口
router.GET("/info", os.getServiceInfo)
return router
}
// healthCheck 健康检查
func (os *OrderService) healthCheck(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"status": "UP",
"timestamp": time.Now().Unix(),
"service": "order-service",
})
}
// getOrders 获取订单列表
func (os *OrderService) getOrders(c *gin.Context) {
orders := make([]*Order, 0, len(os.orders))
for _, order := range os.orders {
orders = append(orders, order)
}
c.JSON(http.StatusOK, gin.H{
"code": 0,
"msg": "success",
"data": orders,
})
}
// getOrderByID 根据 ID 获取订单
func (os *OrderService) getOrderByID(c *gin.Context) {
idStr := c.Param("id")
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"msg": "无效的订单 ID",
})
return
}
order, exists := os.orders[id]
if !exists {
c.JSON(http.StatusNotFound, gin.H{
"code": 404,
"msg": "订单不存在",
})
return
}
c.JSON(http.StatusOK, gin.H{
"code": 0,
"msg": "success",
"data": order,
})
}
// createOrder 创建订单
func (os *OrderService) createOrder(c *gin.Context) {
var order Order
if err := c.ShouldBindJSON(&order); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"msg": "请求参数错误",
})
return
}
// 验证用户是否存在
if !os.validateUser(order.UserID) {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"msg": "用户不存在",
})
return
}
// 生成 ID
order.ID = int64(len(os.orders) + 1)
order.CreateAt = time.Now().Unix()
order.UpdateAt = time.Now().Unix()
order.Status = 1
os.orders[order.ID] = &order
c.JSON(http.StatusCreated, gin.H{
"code": 0,
"msg": "创建成功",
"data": order,
})
}
// updateOrder 更新订单
func (os *OrderService) updateOrder(c *gin.Context) {
idStr := c.Param("id")
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"msg": "无效的订单 ID",
})
return
}
order, exists := os.orders[id]
if !exists {
c.JSON(http.StatusNotFound, gin.H{
"code": 404,
"msg": "订单不存在",
})
return
}
var updateOrder Order
if err := c.ShouldBindJSON(&updateOrder); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"msg": "请求参数错误",
})
return
}
// 更新字段
if updateOrder.Amount > 0 {
order.Amount = updateOrder.Amount
}
if updateOrder.Status > 0 {
order.Status = updateOrder.Status
}
order.UpdateAt = time.Now().Unix()
c.JSON(http.StatusOK, gin.H{
"code": 0,
"msg": "更新成功",
"data": order,
})
}
// deleteOrder 删除订单
func (os *OrderService) deleteOrder(c *gin.Context) {
idStr := c.Param("id")
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": 400,
"msg": "无效的订单 ID",
})
return
}
_, exists := os.orders[id]
if !exists {
c.JSON(http.StatusNotFound, gin.H{
"code": 404,
"msg": "订单不存在",
})
return
}
delete(os.orders, id)
c.JSON(http.StatusOK, gin.H{
"code": 0,
"msg": "删除成功",
})
}
// getServiceInfo 获取服务信息
func (os *OrderService) getServiceInfo(c *gin.Context) {
config := os.configMgr.GetConfig()
c.JSON(http.StatusOK, gin.H{
"service": "order-service",
"version": "1.0.0",
"config": config,
"uptime": time.Now().Unix(),
})
}
// validateUser 验证用户是否存在
func (os *OrderService) validateUser(userID int64) bool {
// 获取用户服务实例
instance, err := os.discovery.GetInstanceWithLoadBalance(
context.Background(),
"user-service",
"DEFAULT_GROUP",
service.LoadBalanceRandom,
)
if err != nil {
os.logger.Error("获取用户服务实例失败", zap.Error(err))
return false
}
// 调用用户服务
url := fmt.Sprintf("http://%s:%d/api/v1/users/%d", instance.Ip, instance.Port, userID)
resp, err := os.httpClient.Get(url)
if err != nil {
os.logger.Error("调用用户服务失败", zap.Error(err))
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
}
// gracefulShutdown 优雅关闭
func (os *OrderService) gracefulShutdown(server *http.Server) {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
os.logger.Info("正在关闭服务...")
// 取消订阅
if err := os.discovery.Unsubscribe(context.Background(), "user-service", "DEFAULT_GROUP"); err != nil {
os.logger.Error("取消订阅失败", zap.Error(err))
}
// 注销服务
if err := os.registry.Deregister(context.Background()); err != nil {
os.logger.Error("注销服务失败", zap.Error(err))
}
// 关闭 HTTP 服务器
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
os.logger.Error("关闭服务器失败", zap.Error(err))
}
// 关闭 Nacos 客户端
os.nacos.Close()
os.logger.Info("服务已关闭")
}
// main 主函数
func main() {
orderService, err := NewOrderService()
if err != nil {
panic(err)
}
if err := orderService.Start(); err != nil && err != http.ErrServerClosed {
panic(err)
}
}
5. 高级特性
5.1 配置加密解密
// config_crypto.go
// 配置加密解密
package crypto
import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"errors"
"io"
)
// ConfigCrypto 配置加密解密器
type ConfigCrypto struct {
key []byte
}
// NewConfigCrypto 创建配置加密解密器
func NewConfigCrypto(key string) *ConfigCrypto {
return &ConfigCrypto{
key: []byte(key),
}
}
// Encrypt 加密配置
func (cc *ConfigCrypto) Encrypt(plaintext string) (string, error) {
block, err := aes.NewCipher(cc.key)
if err != nil {
return "", err
}
// 创建 GCM
gcm, err := cipher.NewGCM(block)
if err != nil {
return "", err
}
// 生成随机 nonce
nonce := make([]byte, gcm.NonceSize())
if _, err = io.ReadFull(rand.Reader, nonce); err != nil {
return "", err
}
// 加密
ciphertext := gcm.Seal(nonce, nonce, []byte(plaintext), nil)
// Base64 编码
return base64.StdEncoding.EncodeToString(ciphertext), nil
}
// Decrypt 解密配置
func (cc *ConfigCrypto) Decrypt(ciphertext string) (string, error) {
// Base64 解码
data, err := base64.StdEncoding.DecodeString(ciphertext)
if err != nil {
return "", err
}
block, err := aes.NewCipher(cc.key)
if err != nil {
return "", err
}
// 创建 GCM
gcm, err := cipher.NewGCM(block)
if err != nil {
return "", err
}
// 检查数据长度
nonceSize := gcm.NonceSize()
if len(data) < nonceSize {
return "", errors.New("密文太短")
}
// 提取 nonce 和密文
nonce, ciphertext := data[:nonceSize], data[nonceSize:]
// 解密
plaintext, err := gcm.Open(nil, nonce, ciphertext, nil)
if err != nil {
return "", err
}
return string(plaintext), nil
}
5.2 服务熔断器
// circuit_breaker.go
// 服务熔断器
package breaker
import (
"errors"
"sync"
"time"
)
// State 熔断器状态
type State int
const (
StateClosed State = iota
StateOpen
StateHalfOpen
)
// CircuitBreaker 熔断器
type CircuitBreaker struct {
mutex sync.RWMutex
state State
failureCount int
successCount int
requestCount int
lastFailureTime time.Time
// 配置参数
maxFailures int // 最大失败次数
timeout time.Duration // 超时时间
resetTimeout time.Duration // 重置超时时间
successThreshold int // 成功阈值
}
// NewCircuitBreaker 创建熔断器
func NewCircuitBreaker(maxFailures int, timeout, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
maxFailures: maxFailures,
timeout: timeout,
resetTimeout: resetTimeout,
successThreshold: 3,
}
}
// Call 执行调用
func (cb *CircuitBreaker) Call(fn func() error) error {
if !cb.canExecute() {
return errors.New("熔断器开启,拒绝请求")
}
err := fn()
cb.recordResult(err == nil)
return err
}
// canExecute 检查是否可以执行
func (cb *CircuitBreaker) canExecute() bool {
cb.mutex.RLock()
defer cb.mutex.RUnlock()
switch cb.state {
case StateClosed:
return true
case StateOpen:
return time.Since(cb.lastFailureTime) > cb.resetTimeout
case StateHalfOpen:
return true
default:
return false
}
}
// recordResult 记录结果
func (cb *CircuitBreaker) recordResult(success bool) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
cb.requestCount++
if success {
cb.successCount++
if cb.state == StateHalfOpen && cb.successCount >= cb.successThreshold {
cb.setState(StateClosed)
}
} else {
cb.failureCount++
cb.lastFailureTime = time.Now()
if cb.state == StateClosed && cb.failureCount >= cb.maxFailures {
cb.setState(StateOpen)
} else if cb.state == StateHalfOpen {
cb.setState(StateOpen)
}
}
}
// setState 设置状态
func (cb *CircuitBreaker) setState(state State) {
cb.state = state
if state == StateClosed {
cb.failureCount = 0
cb.successCount = 0
cb.requestCount = 0
} else if state == StateHalfOpen {
cb.successCount = 0
}
}
// GetState 获取状态
func (cb *CircuitBreaker) GetState() State {
cb.mutex.RLock()
defer cb.mutex.RUnlock()
return cb.state
}
// GetStats 获取统计信息
func (cb *CircuitBreaker) GetStats() (int, int, int, State) {
cb.mutex.RLock()
defer cb.mutex.RUnlock()
return cb.requestCount, cb.successCount, cb.failureCount, cb.state
}
5.3 服务限流器
// rate_limiter.go
// 服务限流器
package limiter
import (
"sync"
"time"
)
// TokenBucket 令牌桶限流器
type TokenBucket struct {
mutex sync.Mutex
capacity int64 // 桶容量
tokens int64 // 当前令牌数
rate int64 // 令牌生成速率(每秒)
lastRefill time.Time // 上次填充时间
}
// NewTokenBucket 创建令牌桶限流器
func NewTokenBucket(capacity, rate int64) *TokenBucket {
return &TokenBucket{
capacity: capacity,
tokens: capacity,
rate: rate,
lastRefill: time.Now(),
}
}
// Allow 检查是否允许请求
func (tb *TokenBucket) Allow() bool {
return tb.AllowN(1)
}
// AllowN 检查是否允许 N 个请求
func (tb *TokenBucket) AllowN(n int64) bool {
tb.mutex.Lock()
defer tb.mutex.Unlock()
tb.refill()
if tb.tokens >= n {
tb.tokens -= n
return true
}
return false
}
// refill 填充令牌
func (tb *TokenBucket) refill() {
now := time.Now()
elapsed := now.Sub(tb.lastRefill)
tokensToAdd := int64(elapsed.Seconds()) * tb.rate
if tokensToAdd > 0 {
tb.tokens += tokensToAdd
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity
}
tb.lastRefill = now
}
}
// GetTokens 获取当前令牌数
func (tb *TokenBucket) GetTokens() int64 {
tb.mutex.Lock()
defer tb.mutex.Unlock()
tb.refill()
return tb.tokens
}
// SlidingWindow 滑动窗口限流器
type SlidingWindow struct {
mutex sync.Mutex
window time.Duration // 窗口大小
limit int64 // 限制数量
requests []time.Time // 请求时间戳
}
// NewSlidingWindow 创建滑动窗口限流器
func NewSlidingWindow(window time.Duration, limit int64) *SlidingWindow {
return &SlidingWindow{
window: window,
limit: limit,
requests: make([]time.Time, 0),
}
}
// Allow 检查是否允许请求
func (sw *SlidingWindow) Allow() bool {
sw.mutex.Lock()
defer sw.mutex.Unlock()
now := time.Now()
// 清理过期请求
sw.cleanup(now)
// 检查是否超过限制
if int64(len(sw.requests)) >= sw.limit {
return false
}
// 添加当前请求
sw.requests = append(sw.requests, now)
return true
}
// cleanup 清理过期请求
func (sw *SlidingWindow) cleanup(now time.Time) {
cutoff := now.Add(-sw.window)
// 找到第一个未过期的请求
i := 0
for i < len(sw.requests) && sw.requests[i].Before(cutoff) {
i++
}
// 移除过期请求
if i > 0 {
sw.requests = sw.requests[i:]
}
}
// GetRequestCount 获取当前请求数
func (sw *SlidingWindow) GetRequestCount() int {
sw.mutex.Lock()
defer sw.mutex.Unlock()
sw.cleanup(time.Now())
return len(sw.requests)
}
6. 测试与调试
6.1 单元测试
// service_test.go
// 服务测试
package service
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
// MockNamingClient 模拟命名客户端
type MockNamingClient struct {
mock.Mock
}
func (m *MockNamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error) {
args := m.Called(param)
return args.Bool(0), args.Error(1)
}
func (m *MockNamingClient) DeregisterInstance(param vo.DeregisterInstanceParam) (bool, error) {
args := m.Called(param)
return args.Bool(0), args.Error(1)
}
// TestServiceRegistry 测试服务注册
func TestServiceRegistry(t *testing.T) {
// 创建模拟客户端
mockClient := new(MockNamingClient)
// 设置期望
mockClient.On("RegisterInstance", mock.AnythingOfType("vo.RegisterInstanceParam")).Return(true, nil)
// 创建服务注册器
registry := &ServiceRegistry{
serviceName: "test-service",
groupName: "DEFAULT_GROUP",
ip: "127.0.0.1",
port: 8080,
weight: 1.0,
metadata: make(map[string]string),
}
// 执行注册
err := registry.Register(context.Background())
// 验证结果
assert.NoError(t, err)
mockClient.AssertExpectations(t)
}
// TestCircuitBreaker 测试熔断器
func TestCircuitBreaker(t *testing.T) {
cb := NewCircuitBreaker(3, time.Second, 5*time.Second)
// 测试正常调用
err := cb.Call(func() error {
return nil
})
assert.NoError(t, err)
assert.Equal(t, StateClosed, cb.GetState())
// 测试失败调用
for i := 0; i < 3; i++ {
cb.Call(func() error {
return errors.New("test error")
})
}
assert.Equal(t, StateOpen, cb.GetState())
// 测试熔断状态下的调用
err = cb.Call(func() error {
return nil
})
assert.Error(t, err)
assert.Contains(t, err.Error(), "熔断器开启")
}
// TestTokenBucket 测试令牌桶
func TestTokenBucket(t *testing.T) {
tb := NewTokenBucket(10, 5)
// 测试初始状态
assert.True(t, tb.Allow())
assert.Equal(t, int64(9), tb.GetTokens())
// 测试批量消费
assert.True(t, tb.AllowN(5))
assert.Equal(t, int64(4), tb.GetTokens())
// 测试超出容量
assert.False(t, tb.AllowN(10))
// 等待令牌补充
time.Sleep(2 * time.Second)
assert.True(t, tb.AllowN(5))
}
// BenchmarkServiceDiscovery 性能测试
func BenchmarkServiceDiscovery(b *testing.B) {
discovery := NewServiceDiscovery(nil)
instances := []*model.Instance{
{Ip: "127.0.0.1", Port: 8080, Weight: 1.0},
{Ip: "127.0.0.1", Port: 8081, Weight: 1.0},
{Ip: "127.0.0.1", Port: 8082, Weight: 1.0},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
discovery.randomSelect(instances)
}
}
6.2 集成测试
// integration_test.go
// 集成测试
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
// IntegrationTestSuite 集成测试套件
type IntegrationTestSuite struct {
suite.Suite
userService *UserService
orderService *OrderService
httpClient *http.Client
}
// SetupSuite 设置测试套件
func (suite *IntegrationTestSuite) SetupSuite() {
// 启动用户服务
userService, err := NewUserService()
suite.Require().NoError(err)
suite.userService = userService
go func() {
userService.Start()
}()
// 启动订单服务
orderService, err := NewOrderService()
suite.Require().NoError(err)
suite.orderService = orderService
go func() {
orderService.Start()
}()
// 等待服务启动
time.Sleep(3 * time.Second)
suite.httpClient = &http.Client{
Timeout: 10 * time.Second,
}
}
// TearDownSuite 清理测试套件
func (suite *IntegrationTestSuite) TearDownSuite() {
// 关闭服务
if suite.userService != nil {
suite.userService.nacos.Close()
}
if suite.orderService != nil {
suite.orderService.nacos.Close()
}
}
// TestServiceRegistration 测试服务注册
func (suite *IntegrationTestSuite) TestServiceRegistration() {
// 检查用户服务健康状态
resp, err := suite.httpClient.Get("http://localhost:8080/health")
suite.Require().NoError(err)
suite.Equal(http.StatusOK, resp.StatusCode)
resp.Body.Close()
// 检查订单服务健康状态
resp, err = suite.httpClient.Get("http://localhost:8081/health")
suite.Require().NoError(err)
suite.Equal(http.StatusOK, resp.StatusCode)
resp.Body.Close()
}
// TestServiceDiscovery 测试服务发现
func (suite *IntegrationTestSuite) TestServiceDiscovery() {
// 通过订单服务调用用户服务
resp, err := suite.httpClient.Get("http://localhost:8081/api/v1/orders")
suite.Require().NoError(err)
suite.Equal(http.StatusOK, resp.StatusCode)
resp.Body.Close()
}
// TestConfigManagement 测试配置管理
func (suite *IntegrationTestSuite) TestConfigManagement() {
// 获取服务配置信息
resp, err := suite.httpClient.Get("http://localhost:8080/info")
suite.Require().NoError(err)
suite.Equal(http.StatusOK, resp.StatusCode)
var result map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&result)
suite.Require().NoError(err)
resp.Body.Close()
suite.Equal("user-service", result["service"])
suite.Equal("1.0.0", result["version"])
}
// TestInSuite 运行集成测试
func TestIntegrationSuite(t *testing.T) {
suite.Run(t, new(IntegrationTestSuite))
}
7. 核心要点
7.1 架构设计
- 模块化设计: 将服务注册、服务发现、配置管理等功能模块化
- 接口抽象: 定义清晰的接口,便于测试和扩展
- 错误处理: 完善的错误处理和日志记录
- 并发安全: 使用互斥锁保证并发安全
7.2 最佳实践
- 服务注册: 设置合适的元数据,便于服务治理
- 健康检查: 实现健康检查接口,确保服务可用性
- 负载均衡: 根据业务需求选择合适的负载均衡策略
- 配置管理: 使用配置热更新,提高系统灵活性
- 故障处理: 实现熔断、限流等容错机制
7.3 性能优化
- 连接池: 使用连接池减少连接开销
- 缓存机制: 缓存服务实例信息,减少网络请求
- 异步处理: 使用异步方式处理服务变更通知
- 批量操作: 支持批量注册和注销服务
8. 下一步学习
- Spring Native: 学习 Spring Native 原生镜像技术
- 服务网格: 了解 Istio、Linkerd 等服务网格技术
- 可观测性: 学习分布式链路追踪和监控
- 云原生: 掌握 Kubernetes、Docker 等容器技术 “`