概述

本章将深入探讨 gRPC 在微服务架构中的应用，包括服务发现机制、负载均衡策略、服务治理、分布式系统设计等。我们将学习如何构建可扩展、高可用的微服务系统。

学习目标

  • 理解微服务架构的核心概念和设计原则
  • 掌握服务发现的实现方式和最佳实践
  • 学习负载均衡和故障转移机制
  • 了解服务治理和监控体系
  • 掌握分布式系统的设计模式

微服务架构基础

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Callable, Union
from abc import ABC, abstractmethod
import time
import json
import uuid
import hashlib
from datetime import datetime, timedelta

class ServiceState(Enum):
    """Lifecycle state of a service instance.

    Values are lowercase strings; the generated Go discovery code carries the
    state as a plain string as well (e.g. ``"running"``).
    """
    STARTING = "starting"
    RUNNING = "running"
    STOPPING = "stopping"
    STOPPED = "stopped"
    UNHEALTHY = "unhealthy"
    MAINTENANCE = "maintenance"

class DiscoveryType(Enum):
    """Service-discovery backend selector.

    Note: of these backends, only Consul has an implementation in the Go
    code generated by ``MicroserviceManager.create_service_discovery``.
    """
    CONSUL = "consul"
    ETCD = "etcd"
    ZOOKEEPER = "zookeeper"
    KUBERNETES = "kubernetes"
    EUREKA = "eureka"
    NACOS = "nacos"

class LoadBalanceStrategy(Enum):
    """Load-balancing strategy selector.

    The values double as gRPC balancer names: the generated Go code registers
    ``weighted_round_robin``, ``consistent_hash`` and ``locality_aware``, and
    the resolver's service config defaults to ``round_robin``.
    """
    ROUND_ROBIN = "round_robin"
    WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
    LEAST_CONNECTIONS = "least_connections"
    RANDOM = "random"
    CONSISTENT_HASH = "consistent_hash"
    LOCALITY_AWARE = "locality_aware"

class HealthCheckType(Enum):
    """Kind of health check attached to a registered instance.

    The generated Consul registration uses a gRPC check, which matches the
    ``GRPC`` default in ``ServiceConfig``.
    """
    HTTP = "http"
    TCP = "tcp"
    GRPC = "grpc"
    SCRIPT = "script"
    TTL = "ttl"

@dataclass
class ServiceInstance:
    """A single registered instance of a service.

    Field-for-field counterpart of the Go ``ServiceInstance`` struct emitted by
    ``MicroserviceManager.create_service_discovery``.
    """
    id: str
    name: str
    address: str
    port: int
    version: str
    metadata: Dict[str, str]
    state: ServiceState = ServiceState.STARTING
    weight: int = 100  # relative weight used by weighted load balancing
    zone: str = "default"
    region: str = "default"
    # These default to None rather than a mutable value. They are annotated
    # Optional so type checkers accept the None default.
    tags: Optional[List[str]] = None
    last_heartbeat: Optional[datetime] = None
    health_check_url: str = ""
    
@dataclass
class ServiceConfig:
    """Client/registration settings for one service.

    Field order defines the generated ``__init__`` signature; ``name``,
    ``discovery_type`` and ``load_balance_strategy`` are required.
    """
    name: str
    discovery_type: DiscoveryType
    load_balance_strategy: LoadBalanceStrategy
    # NOTE(review): presumably seconds (the generated Consul check uses
    # Interval "30s" / Timeout "5s") -- confirm the unit with callers.
    health_check_interval: int = 30
    health_check_timeout: int = 5
    health_check_type: HealthCheckType = HealthCheckType.GRPC
    retry_count: int = 3
    circuit_breaker_enabled: bool = True
    timeout_ms: int = 5000  # milliseconds, per the _ms suffix
    
class MicroserviceManager:
    """微服务管理器"""
    
    def __init__(self):
        self.services = {}
        self.discovery_clients = {}
        
    def create_service_discovery(self) -> str:
        """创建服务发现实现"""
        return """
// service_discovery.go - 服务发现实现
package discovery

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/resolver"
    "google.golang.org/grpc/serviceconfig"
)

// ServiceInstance 服务实例
type ServiceInstance struct {
    ID          string            `json:"id"`
    Name        string            `json:"name"`
    Address     string            `json:"address"`
    Port        int               `json:"port"`
    Version     string            `json:"version"`
    Metadata    map[string]string `json:"metadata"`
    State       string            `json:"state"`
    Weight      int               `json:"weight"`
    Zone        string            `json:"zone"`
    Region      string            `json:"region"`
    Tags        []string          `json:"tags"`
    LastHeartbeat time.Time       `json:"last_heartbeat"`
    HealthCheckURL string         `json:"health_check_url"`
}

// ServiceRegistry 服务注册接口
type ServiceRegistry interface {
    Register(instance *ServiceInstance) error
    Deregister(instanceID string) error
    Heartbeat(instanceID string) error
    GetInstances(serviceName string) ([]*ServiceInstance, error)
    Watch(serviceName string) (<-chan []*ServiceInstance, error)
    Close() error
}

// ConsulRegistry Consul服务注册实现
type ConsulRegistry struct {
    client     ConsulClient
    instances  map[string]*ServiceInstance
    watchers   map[string][]chan []*ServiceInstance
    mu         sync.RWMutex
    stopCh     chan struct{}
    healthTTL  time.Duration
}

// NewConsulRegistry 创建Consul注册中心
func NewConsulRegistry(consulAddr string) (*ConsulRegistry, error) {
    client, err := NewConsulClient(consulAddr)
    if err != nil {
        return nil, fmt.Errorf("failed to create consul client: %w", err)
    }
    
    return &ConsulRegistry{
        client:    client,
        instances: make(map[string]*ServiceInstance),
        watchers:  make(map[string][]chan []*ServiceInstance),
        stopCh:    make(chan struct{}),
        healthTTL: 30 * time.Second,
    }, nil
}

// Register 注册服务实例
func (cr *ConsulRegistry) Register(instance *ServiceInstance) error {
    cr.mu.Lock()
    defer cr.mu.Unlock()
    
    // 构建Consul服务定义
    service := &ConsulService{
        ID:      instance.ID,
        Name:    instance.Name,
        Address: instance.Address,
        Port:    instance.Port,
        Tags:    instance.Tags,
        Meta:    instance.Metadata,
        Check: &ConsulCheck{
            GRPC:                           fmt.Sprintf("%s:%d", instance.Address, instance.Port),
            Interval:                       "30s",
            Timeout:                        "5s",
            DeregisterCriticalServiceAfter: "90s",
        },
    }
    
    // 注册到Consul
    if err := cr.client.RegisterService(service); err != nil {
        return fmt.Errorf("failed to register service: %w", err)
    }
    
    // 保存实例信息
    cr.instances[instance.ID] = instance
    
    log.Printf("Registered service instance: %s (%s:%d)", instance.ID, instance.Address, instance.Port)
    return nil
}

// Deregister 注销服务实例
func (cr *ConsulRegistry) Deregister(instanceID string) error {
    cr.mu.Lock()
    defer cr.mu.Unlock()
    
    if err := cr.client.DeregisterService(instanceID); err != nil {
        return fmt.Errorf("failed to deregister service: %w", err)
    }
    
    delete(cr.instances, instanceID)
    
    log.Printf("Deregistered service instance: %s", instanceID)
    return nil
}

// Heartbeat 发送心跳
func (cr *ConsulRegistry) Heartbeat(instanceID string) error {
    return cr.client.PassTTL(instanceID, "Service is healthy")
}

// GetInstances 获取服务实例列表
func (cr *ConsulRegistry) GetInstances(serviceName string) ([]*ServiceInstance, error) {
    services, err := cr.client.GetHealthyServices(serviceName)
    if err != nil {
        return nil, fmt.Errorf("failed to get services: %w", err)
    }
    
    instances := make([]*ServiceInstance, 0, len(services))
    for _, service := range services {
        instance := &ServiceInstance{
            ID:      service.ID,
            Name:    service.Name,
            Address: service.Address,
            Port:    service.Port,
            Tags:    service.Tags,
            Metadata: service.Meta,
            State:   "running",
            Weight:  100,
        }
        
        // 从元数据中获取权重
        if weightStr, ok := service.Meta["weight"]; ok {
            if weight, err := strconv.Atoi(weightStr); err == nil {
                instance.Weight = weight
            }
        }
        
        instances = append(instances, instance)
    }
    
    return instances, nil
}

// Watch 监听服务变化
func (cr *ConsulRegistry) Watch(serviceName string) (<-chan []*ServiceInstance, error) {
    cr.mu.Lock()
    defer cr.mu.Unlock()
    
    ch := make(chan []*ServiceInstance, 1)
    cr.watchers[serviceName] = append(cr.watchers[serviceName], ch)
    
    // 启动监听协程
    go cr.watchService(serviceName)
    
    return ch, nil
}

// watchService 监听服务变化
func (cr *ConsulRegistry) watchService(serviceName string) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    
    var lastIndex uint64
    
    for {
        select {
        case <-ticker.C:
            services, index, err := cr.client.GetServicesWithIndex(serviceName, lastIndex)
            if err != nil {
                log.Printf("Failed to watch service %s: %v", serviceName, err)
                continue
            }
            
            if index != lastIndex {
                lastIndex = index
                
                instances := make([]*ServiceInstance, 0, len(services))
                for _, service := range services {
                    instances = append(instances, &ServiceInstance{
                        ID:      service.ID,
                        Name:    service.Name,
                        Address: service.Address,
                        Port:    service.Port,
                        Tags:    service.Tags,
                        Metadata: service.Meta,
                        State:   "running",
                    })
                }
                
                // 通知所有监听者
                cr.notifyWatchers(serviceName, instances)
            }
            
        case <-cr.stopCh:
            return
        }
    }
}

// notifyWatchers 通知监听者
func (cr *ConsulRegistry) notifyWatchers(serviceName string, instances []*ServiceInstance) {
    cr.mu.RLock()
    watchers := cr.watchers[serviceName]
    cr.mu.RUnlock()
    
    for _, watcher := range watchers {
        select {
        case watcher <- instances:
        default:
            // 非阻塞发送
        }
    }
}

// Close 关闭注册中心
func (cr *ConsulRegistry) Close() error {
    close(cr.stopCh)
    return nil
}

// gRPC Resolver 实现
type discoveryResolver struct {
    registry     ServiceRegistry
    serviceName  string
    cc           resolver.ClientConn
    watchCh      <-chan []*ServiceInstance
    closeCh      chan struct{}
}

// NewDiscoveryResolver 创建服务发现解析器
func NewDiscoveryResolver(registry ServiceRegistry) resolver.Builder {
    return &discoveryResolverBuilder{
        registry: registry,
    }
}

type discoveryResolverBuilder struct {
    registry ServiceRegistry
}

// Build 构建解析器
func (drb *discoveryResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    serviceName := target.Endpoint
    
    watchCh, err := drb.registry.Watch(serviceName)
    if err != nil {
        return nil, fmt.Errorf("failed to watch service: %w", err)
    }
    
    dr := &discoveryResolver{
        registry:    drb.registry,
        serviceName: serviceName,
        cc:          cc,
        watchCh:     watchCh,
        closeCh:     make(chan struct{}),
    }
    
    // 启动监听协程
    go dr.watch()
    
    // 初始解析
    dr.resolve()
    
    return dr, nil
}

// Scheme 返回解析器方案
func (drb *discoveryResolverBuilder) Scheme() string {
    return "discovery"
}

// ResolveNow 立即解析
func (dr *discoveryResolver) ResolveNow(opts resolver.ResolveNowOptions) {
    dr.resolve()
}

// Close 关闭解析器
func (dr *discoveryResolver) Close() {
    close(dr.closeCh)
}

// watch 监听服务变化
func (dr *discoveryResolver) watch() {
    for {
        select {
        case instances := <-dr.watchCh:
            dr.updateInstances(instances)
        case <-dr.closeCh:
            return
        }
    }
}

// resolve 解析服务
func (dr *discoveryResolver) resolve() {
    instances, err := dr.registry.GetInstances(dr.serviceName)
    if err != nil {
        dr.cc.ReportError(fmt.Errorf("failed to get instances: %w", err))
        return
    }
    
    dr.updateInstances(instances)
}

// updateInstances 更新实例列表
func (dr *discoveryResolver) updateInstances(instances []*ServiceInstance) {
    addresses := make([]resolver.Address, 0, len(instances))
    
    for _, instance := range instances {
        addr := resolver.Address{
            Addr: fmt.Sprintf("%s:%d", instance.Address, instance.Port),
            Attributes: attributes.New(
                "weight", instance.Weight,
                "zone", instance.Zone,
                "region", instance.Region,
                "version", instance.Version,
            ),
        }
        addresses = append(addresses, addr)
    }
    
    state := resolver.State{
        Addresses: addresses,
        ServiceConfig: dr.buildServiceConfig(),
    }
    
    dr.cc.UpdateState(state)
}

// buildServiceConfig 构建服务配置
func (dr *discoveryResolver) buildServiceConfig() *serviceconfig.ParseResult {
    config := map[string]interface{}{
        "loadBalancingPolicy": "round_robin",
        "healthCheckConfig": map[string]interface{}{
            "serviceName": dr.serviceName,
        },
    }
    
    configJSON, _ := json.Marshal(config)
    return serviceconfig.Parse(string(configJSON))
}

// Consul客户端接口和实现
type ConsulClient interface {
    RegisterService(service *ConsulService) error
    DeregisterService(serviceID string) error
    GetHealthyServices(serviceName string) ([]*ConsulService, error)
    GetServicesWithIndex(serviceName string, waitIndex uint64) ([]*ConsulService, uint64, error)
    PassTTL(checkID, note string) error
}

type ConsulService struct {
    ID      string            `json:"ID"`
    Name    string            `json:"Name"`
    Address string            `json:"Address"`
    Port    int               `json:"Port"`
    Tags    []string          `json:"Tags"`
    Meta    map[string]string `json:"Meta"`
    Check   *ConsulCheck      `json:"Check"`
}

type ConsulCheck struct {
    GRPC                           string `json:"GRPC"`
    Interval                       string `json:"Interval"`
    Timeout                        string `json:"Timeout"`
    DeregisterCriticalServiceAfter string `json:"DeregisterCriticalServiceAfter"`
}
"""
    
    def create_load_balancer(self) -> str:
        """创建负载均衡器"""
        return """
// load_balancer.go - 负载均衡实现
package balancer

import (
    "context"
    "fmt"
    "hash/crc32"
    "math/rand"
    "sort"
    "sync"
    "sync/atomic"
    "time"
    
    "google.golang.org/grpc/balancer"
    "google.golang.org/grpc/balancer/base"
    "google.golang.org/grpc/grpclog"
    "google.golang.org/grpc/resolver"
)

// WeightedRoundRobinBalancer 加权轮询负载均衡器
type WeightedRoundRobinBalancer struct {
    mu       sync.RWMutex
    subConns []balancer.SubConn
    weights  []int
    current  int64
    total    int
}

// NewWeightedRoundRobinBalancer 创建加权轮询负载均衡器
func NewWeightedRoundRobinBalancer() balancer.Builder {
    return &weightedRoundRobinBuilder{}
}

type weightedRoundRobinBuilder struct{}

// Build 构建负载均衡器
func (wrrb *weightedRoundRobinBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
    return &WeightedRoundRobinBalancer{}
}

// Name 返回负载均衡器名称
func (wrrb *weightedRoundRobinBuilder) Name() string {
    return "weighted_round_robin"
}

// UpdateClientConnState 更新客户端连接状态
func (wrrb *WeightedRoundRobinBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
    wrrb.mu.Lock()
    defer wrrb.mu.Unlock()
    
    // 清理旧连接
    for _, sc := range wrrb.subConns {
        wrrb.cc.RemoveSubConn(sc)
    }
    
    // 创建新连接
    wrrb.subConns = make([]balancer.SubConn, 0, len(state.ResolverState.Addresses))
    wrrb.weights = make([]int, 0, len(state.ResolverState.Addresses))
    wrrb.total = 0
    
    for _, addr := range state.ResolverState.Addresses {
        sc, err := wrrb.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
        if err != nil {
            grpclog.Errorf("Failed to create SubConn: %v", err)
            continue
        }
        
        weight := 100 // 默认权重
        if w, ok := addr.Attributes.Value("weight").(int); ok {
            weight = w
        }
        
        wrrb.subConns = append(wrrb.subConns, sc)
        wrrb.weights = append(wrrb.weights, weight)
        wrrb.total += weight
        
        sc.Connect()
    }
    
    return nil
}

// Pick 选择连接
func (wrrb *WeightedRoundRobinBalancer) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
    wrrb.mu.RLock()
    defer wrrb.mu.RUnlock()
    
    if len(wrrb.subConns) == 0 {
        return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
    }
    
    // 加权轮询算法
    current := atomic.AddInt64(&wrrb.current, 1)
    index := int(current % int64(wrrb.total))
    
    cumulative := 0
    for i, weight := range wrrb.weights {
        cumulative += weight
        if index < cumulative {
            return balancer.PickResult{
                SubConn: wrrb.subConns[i],
            }, nil
        }
    }
    
    // 回退到第一个连接
    return balancer.PickResult{
        SubConn: wrrb.subConns[0],
    }, nil
}

// ConsistentHashBalancer 一致性哈希负载均衡器
type ConsistentHashBalancer struct {
    mu       sync.RWMutex
    ring     *HashRing
    subConns map[string]balancer.SubConn
}

// HashRing 哈希环
type HashRing struct {
    nodes    []uint32
    nodeMap  map[uint32]string
    replicas int
}

// NewConsistentHashBalancer 创建一致性哈希负载均衡器
func NewConsistentHashBalancer() balancer.Builder {
    return &consistentHashBuilder{}
}

type consistentHashBuilder struct{}

// Build 构建负载均衡器
func (chb *consistentHashBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
    return &ConsistentHashBalancer{
        ring: &HashRing{
            nodeMap:  make(map[uint32]string),
            replicas: 150, // 虚拟节点数
        },
        subConns: make(map[string]balancer.SubConn),
    }
}

// Name 返回负载均衡器名称
func (chb *consistentHashBuilder) Name() string {
    return "consistent_hash"
}

// UpdateClientConnState 更新客户端连接状态
func (chb *ConsistentHashBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
    chb.mu.Lock()
    defer chb.mu.Unlock()
    
    // 清理旧连接
    for _, sc := range chb.subConns {
        chb.cc.RemoveSubConn(sc)
    }
    
    // 重建哈希环
    chb.ring = &HashRing{
        nodeMap:  make(map[uint32]string),
        replicas: 150,
    }
    chb.subConns = make(map[string]balancer.SubConn)
    
    for _, addr := range state.ResolverState.Addresses {
        sc, err := chb.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
        if err != nil {
            grpclog.Errorf("Failed to create SubConn: %v", err)
            continue
        }
        
        nodeKey := addr.Addr
        chb.subConns[nodeKey] = sc
        chb.ring.AddNode(nodeKey)
        
        sc.Connect()
    }
    
    return nil
}

// Pick 选择连接
func (chb *ConsistentHashBalancer) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
    chb.mu.RLock()
    defer chb.mu.RUnlock()
    
    if len(chb.subConns) == 0 {
        return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
    }
    
    // 从上下文中获取哈希键
    hashKey := "default"
    if key, ok := info.Ctx.Value("hash_key").(string); ok {
        hashKey = key
    }
    
    // 使用一致性哈希选择节点
    nodeKey := chb.ring.GetNode(hashKey)
    if sc, ok := chb.subConns[nodeKey]; ok {
        return balancer.PickResult{
            SubConn: sc,
        }, nil
    }
    
    // 回退到第一个可用连接
    for _, sc := range chb.subConns {
        return balancer.PickResult{
            SubConn: sc,
        }, nil
    }
    
    return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

// AddNode 添加节点到哈希环
func (hr *HashRing) AddNode(node string) {
    for i := 0; i < hr.replicas; i++ {
        virtualNode := fmt.Sprintf("%s#%d", node, i)
        hash := crc32.ChecksumIEEE([]byte(virtualNode))
        hr.nodes = append(hr.nodes, hash)
        hr.nodeMap[hash] = node
    }
    sort.Slice(hr.nodes, func(i, j int) bool {
        return hr.nodes[i] < hr.nodes[j]
    })
}

// GetNode 获取节点
func (hr *HashRing) GetNode(key string) string {
    if len(hr.nodes) == 0 {
        return ""
    }
    
    hash := crc32.ChecksumIEEE([]byte(key))
    
    // 二分查找
    idx := sort.Search(len(hr.nodes), func(i int) bool {
        return hr.nodes[i] >= hash
    })
    
    if idx == len(hr.nodes) {
        idx = 0
    }
    
    return hr.nodeMap[hr.nodes[idx]]
}

// LocalityAwareBalancer 地域感知负载均衡器
type LocalityAwareBalancer struct {
    mu           sync.RWMutex
    localZone    string
    localRegion  string
    zoneSubConns map[string][]balancer.SubConn
    current      map[string]int64
}

// NewLocalityAwareBalancer 创建地域感知负载均衡器
func NewLocalityAwareBalancer(localZone, localRegion string) balancer.Builder {
    return &localityAwareBuilder{
        localZone:   localZone,
        localRegion: localRegion,
    }
}

type localityAwareBuilder struct {
    localZone   string
    localRegion string
}

// Build 构建负载均衡器
func (lab *localityAwareBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
    return &LocalityAwareBalancer{
        localZone:    lab.localZone,
        localRegion:  lab.localRegion,
        zoneSubConns: make(map[string][]balancer.SubConn),
        current:      make(map[string]int64),
    }
}

// Name 返回负载均衡器名称
func (lab *localityAwareBuilder) Name() string {
    return "locality_aware"
}

// UpdateClientConnState 更新客户端连接状态
func (lab *LocalityAwareBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
    lab.mu.Lock()
    defer lab.mu.Unlock()
    
    // 清理旧连接
    for _, subConns := range lab.zoneSubConns {
        for _, sc := range subConns {
            lab.cc.RemoveSubConn(sc)
        }
    }
    
    // 按地域分组连接
    lab.zoneSubConns = make(map[string][]balancer.SubConn)
    
    for _, addr := range state.ResolverState.Addresses {
        sc, err := lab.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
        if err != nil {
            grpclog.Errorf("Failed to create SubConn: %v", err)
            continue
        }
        
        zone := "default"
        if z, ok := addr.Attributes.Value("zone").(string); ok {
            zone = z
        }
        
        lab.zoneSubConns[zone] = append(lab.zoneSubConns[zone], sc)
        sc.Connect()
    }
    
    return nil
}

// Pick 选择连接
func (lab *LocalityAwareBalancer) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
    lab.mu.RLock()
    defer lab.mu.RUnlock()
    
    // 优先选择本地区域
    if subConns, ok := lab.zoneSubConns[lab.localZone]; ok && len(subConns) > 0 {
        return lab.pickFromZone(lab.localZone, subConns), nil
    }
    
    // 选择同区域其他可用区
    for zone, subConns := range lab.zoneSubConns {
        if len(subConns) > 0 {
            return lab.pickFromZone(zone, subConns), nil
        }
    }
    
    return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

// pickFromZone 从指定区域选择连接
func (lab *LocalityAwareBalancer) pickFromZone(zone string, subConns []balancer.SubConn) balancer.PickResult {
    current := atomic.AddInt64(&lab.current[zone], 1)
    index := int(current) % len(subConns)
    
    return balancer.PickResult{
        SubConn: subConns[index],
    }
}
"""
    
    def create_service_mesh(self) -> str:
        """创建服务网格集成"""
        return """
// service_mesh.go - 服务网格集成
package mesh

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
)

// ServiceMesh 服务网格接口
type ServiceMesh interface {
    RegisterService(service *ServiceInfo) error
    DiscoverServices(serviceName string) ([]*ServiceEndpoint, error)
    InjectSidecar(config *SidecarConfig) error
    ConfigureTraffic(rules *TrafficRules) error
    EnableSecurity(policy *SecurityPolicy) error
    GetMetrics() (*MeshMetrics, error)
}

// ServiceInfo 服务信息
type ServiceInfo struct {
    Name        string            `json:"name"`
    Version     string            `json:"version"`
    Namespace   string            `json:"namespace"`
    Labels      map[string]string `json:"labels"`
    Annotations map[string]string `json:"annotations"`
    Ports       []ServicePort     `json:"ports"`
}

// ServicePort 服务端口
type ServicePort struct {
    Name     string `json:"name"`
    Port     int    `json:"port"`
    Protocol string `json:"protocol"`
}

// ServiceEndpoint 服务端点
type ServiceEndpoint struct {
    Address   string            `json:"address"`
    Port      int               `json:"port"`
    Weight    int               `json:"weight"`
    Healthy   bool              `json:"healthy"`
    Metadata  map[string]string `json:"metadata"`
    Zone      string            `json:"zone"`
    Region    string            `json:"region"`
}

// SidecarConfig Sidecar配置
type SidecarConfig struct {
    ServiceName   string                 `json:"service_name"`
    ProxyImage    string                 `json:"proxy_image"`
    Resources     ResourceRequirements   `json:"resources"`
    Interceptors  []InterceptorConfig    `json:"interceptors"`
    TLS           TLSConfig              `json:"tls"`
    Tracing       TracingConfig          `json:"tracing"`
    Metrics       MetricsConfig          `json:"metrics"`
}

// TrafficRules 流量规则
type TrafficRules struct {
    ServiceName     string              `json:"service_name"`
    RoutingRules    []RoutingRule       `json:"routing_rules"`
    LoadBalancing   LoadBalancingConfig `json:"load_balancing"`
    CircuitBreaker  CircuitBreakerConfig `json:"circuit_breaker"`
    RateLimit       RateLimitConfig     `json:"rate_limit"`
    Timeout         TimeoutConfig       `json:"timeout"`
    Retry           RetryConfig         `json:"retry"`
}

// RoutingRule 路由规则
type RoutingRule struct {
    Match       MatchCondition `json:"match"`
    Destination Destination    `json:"destination"`
    Weight      int            `json:"weight"`
}

// MatchCondition 匹配条件
type MatchCondition struct {
    Headers map[string]string `json:"headers"`
    Method  string            `json:"method"`
    Path    string            `json:"path"`
}

// Destination 目标服务
type Destination struct {
    Service string `json:"service"`
    Version string `json:"version"`
    Subset  string `json:"subset"`
}

// SecurityPolicy 安全策略
type SecurityPolicy struct {
    ServiceName        string                `json:"service_name"`
    AuthenticationPolicy AuthenticationPolicy `json:"authentication"`
    AuthorizationPolicy  AuthorizationPolicy  `json:"authorization"`
    NetworkPolicy       NetworkPolicy        `json:"network"`
}

// MeshMetrics 网格指标
type MeshMetrics struct {
    RequestCount    int64             `json:"request_count"`
    ErrorRate       float64           `json:"error_rate"`
    Latency         LatencyMetrics    `json:"latency"`
    ServiceMetrics  map[string]ServiceMetrics `json:"service_metrics"`
    TrafficMetrics  TrafficMetrics    `json:"traffic_metrics"`
}

// IstioServiceMesh Istio服务网格实现
type IstioServiceMesh struct {
    kubeClient   KubernetesClient
    istioClient  IstioClient
    namespace    string
    mu           sync.RWMutex
    services     map[string]*ServiceInfo
}

// NewIstioServiceMesh 创建Istio服务网格
func NewIstioServiceMesh(kubeClient KubernetesClient, istioClient IstioClient, namespace string) *IstioServiceMesh {
    return &IstioServiceMesh{
        kubeClient:  kubeClient,
        istioClient: istioClient,
        namespace:   namespace,
        services:    make(map[string]*ServiceInfo),
    }
}

// RegisterService 注册服务
func (ism *IstioServiceMesh) RegisterService(service *ServiceInfo) error {
    ism.mu.Lock()
    defer ism.mu.Unlock()
    
    // 创建Kubernetes Service
    k8sService := &KubernetesService{
        Name:      service.Name,
        Namespace: service.Namespace,
        Labels:    service.Labels,
        Ports:     service.Ports,
    }
    
    if err := ism.kubeClient.CreateService(k8sService); err != nil {
        return fmt.Errorf("failed to create k8s service: %w", err)
    }
    
    // 创建Istio ServiceEntry
    serviceEntry := &ServiceEntry{
        Name:      service.Name,
        Namespace: service.Namespace,
        Hosts:     []string{fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace)},
        Ports:     convertToIstioPort(service.Ports),
        Location:  "MESH_EXTERNAL",
        Resolution: "DNS",
    }
    
    if err := ism.istioClient.CreateServiceEntry(serviceEntry); err != nil {
        return fmt.Errorf("failed to create service entry: %w", err)
    }
    
    ism.services[service.Name] = service
    
    log.Printf("Registered service in mesh: %s", service.Name)
    return nil
}

// DiscoverServices 发现服务
func (ism *IstioServiceMesh) DiscoverServices(serviceName string) ([]*ServiceEndpoint, error) {
    endpoints, err := ism.kubeClient.GetEndpoints(serviceName, ism.namespace)
    if err != nil {
        return nil, fmt.Errorf("failed to get endpoints: %w", err)
    }
    
    serviceEndpoints := make([]*ServiceEndpoint, 0, len(endpoints))
    for _, ep := range endpoints {
        serviceEndpoints = append(serviceEndpoints, &ServiceEndpoint{
            Address:  ep.Address,
            Port:     ep.Port,
            Weight:   100,
            Healthy:  ep.Ready,
            Metadata: ep.Metadata,
            Zone:     ep.Zone,
            Region:   ep.Region,
        })
    }
    
    return serviceEndpoints, nil
}

// InjectSidecar 注入Sidecar
func (ism *IstioServiceMesh) InjectSidecar(config *SidecarConfig) error {
    // 创建Sidecar配置
    sidecar := &Sidecar{
        Name:      config.ServiceName + "-sidecar",
        Namespace: ism.namespace,
        Workload:  config.ServiceName,
        Ingress: []IngressConfig{
            {
                Port: Port{
                    Number:   8080,
                    Protocol: "HTTP",
                    Name:     "http",
                },
                DefaultEndpoint: "127.0.0.1:8080",
            },
        },
        Egress: []EgressConfig{
            {
                Hosts: []string{"./*"},
            },
        },
    }
    
    if err := ism.istioClient.CreateSidecar(sidecar); err != nil {
        return fmt.Errorf("failed to create sidecar: %w", err)
    }
    
    log.Printf("Injected sidecar for service: %s", config.ServiceName)
    return nil
}

// ConfigureTraffic 配置流量
func (ism *IstioServiceMesh) ConfigureTraffic(rules *TrafficRules) error {
    // 创建VirtualService
    vs := &VirtualService{
        Name:      rules.ServiceName + "-vs",
        Namespace: ism.namespace,
        Hosts:     []string{rules.ServiceName},
        HTTP:      convertToHTTPRoute(rules.RoutingRules),
    }
    
    if err := ism.istioClient.CreateVirtualService(vs); err != nil {
        return fmt.Errorf("failed to create virtual service: %w", err)
    }
    
    // 创建DestinationRule
    dr := &DestinationRule{
        Name:      rules.ServiceName + "-dr",
        Namespace: ism.namespace,
        Host:      rules.ServiceName,
        TrafficPolicy: TrafficPolicy{
            LoadBalancer: LoadBalancer{
                Simple: rules.LoadBalancing.Strategy,
            },
            CircuitBreaker: CircuitBreaker{
                ConsecutiveErrors: rules.CircuitBreaker.ConsecutiveErrors,
                Interval:         rules.CircuitBreaker.Interval,
                BaseEjectionTime: rules.CircuitBreaker.BaseEjectionTime,
            },
        },
    }
    
    if err := ism.istioClient.CreateDestinationRule(dr); err != nil {
        return fmt.Errorf("failed to create destination rule: %w", err)
    }
    
    log.Printf("Configured traffic rules for service: %s", rules.ServiceName)
    return nil
}

// EnableSecurity 启用安全策略
func (ism *IstioServiceMesh) EnableSecurity(policy *SecurityPolicy) error {
    // 创建AuthenticationPolicy
    authPolicy := &AuthenticationPolicy{
        Name:      policy.ServiceName + "-auth",
        Namespace: ism.namespace,
        Selector: Selector{
            MatchLabels: map[string]string{
                "app": policy.ServiceName,
            },
        },
        Peers: []PeerAuthentication{
            {
                MTLS: MTLSConfig{
                    Mode: "STRICT",
                },
            },
        },
    }
    
    if err := ism.istioClient.CreateAuthenticationPolicy(authPolicy); err != nil {
        return fmt.Errorf("failed to create authentication policy: %w", err)
    }
    
    // 创建AuthorizationPolicy
    authzPolicy := &AuthorizationPolicy{
        Name:      policy.ServiceName + "-authz",
        Namespace: ism.namespace,
        Selector: Selector{
            MatchLabels: map[string]string{
                "app": policy.ServiceName,
            },
        },
        Rules: convertToAuthzRules(policy.AuthorizationPolicy.Rules),
    }
    
    if err := ism.istioClient.CreateAuthorizationPolicy(authzPolicy); err != nil {
        return fmt.Errorf("failed to create authorization policy: %w", err)
    }
    
    log.Printf("Enabled security policy for service: %s", policy.ServiceName)
    return nil
}

// GetMetrics returns the current mesh traffic metrics.
//
// It delegates to queryPrometheusMetrics. On failure it returns no metrics
// and wraps the underlying error with context.
func (ism *IstioServiceMesh) GetMetrics() (*MeshMetrics, error) {
    snapshot, queryErr := ism.queryPrometheusMetrics()
    if queryErr != nil {
        return nil, fmt.Errorf("failed to query metrics: %w", queryErr)
    }
    return snapshot, nil
}

// queryPrometheusMetrics is intended to fetch mesh metrics from Prometheus.
//
// NOTE(review): no Prometheus query is performed yet. This placeholder always
// succeeds and returns fixed sample values: 1000 requests, a 0.01 error rate,
// and p50/p90/p99 latencies of 10.5/25/100. Callers cannot tell these apart
// from real data. The latency unit is not stated here (presumably
// milliseconds); confirm it when the real query is implemented.
func (ism *IstioServiceMesh) queryPrometheusMetrics() (*MeshMetrics, error) {
    sampleLatency := LatencyMetrics{P50: 10.5, P90: 25.0, P99: 100.0}
    return &MeshMetrics{
        RequestCount: 1000,
        ErrorRate:    0.01,
        Latency:      sampleLatency,
    }, nil
}

// Helper functions

// convertToIstioPort maps each ServicePort onto an Istio Port, copying the
// port number, name and protocol. The result preserves the length and order
// of ports and is always a non-nil slice, even for empty or nil input.
func convertToIstioPort(ports []ServicePort) []Port {
    converted := make([]Port, 0, len(ports))
    for _, sp := range ports {
        converted = append(converted, Port{
            Name:     sp.Name,
            Number:   sp.Port,
            Protocol: sp.Protocol,
        })
    }
    return converted
}

// convertToHTTPRoute converts routing rules into Istio HTTPRoute entries: one
// route per rule, each with a single match clause and a single weighted
// destination (host = rule.Destination.Service, subset =
// rule.Destination.Subset).
//
// The method (exact) and URI (prefix) matchers are emitted only when the
// rule specifies a method or path. An always-present matcher with an empty
// value is not a wildcard: exact "" only matches an empty method, and an
// empty prefix match is not accepted by Istio validation. Leaving the field
// nil means the request is not constrained on that attribute, so header-only
// rules work as intended.
func convertToHTTPRoute(rules []RoutingRule) []HTTPRoute {
    routes := make([]HTTPRoute, len(rules))
    for i, rule := range rules {
        match := HTTPMatchRequest{
            Headers: rule.Match.Headers,
        }
        if rule.Match.Method != "" {
            match.Method = &StringMatch{Exact: rule.Match.Method}
        }
        if rule.Match.Path != "" {
            match.URI = &StringMatch{Prefix: rule.Match.Path}
        }

        routes[i] = HTTPRoute{
            Match: []HTTPMatchRequest{match},
            Route: []HTTPRouteDestination{
                {
                    Destination: HTTPDestination{
                        Host:   rule.Destination.Service,
                        Subset: rule.Destination.Subset,
                    },
                    Weight: rule.Weight,
                },
            },
        }
    }
    return routes
}
"""

# Instantiate the manager that generates the example microservice code.
microservice_mgr = MicroserviceManager()


def _print_section(heading, *feature_lines):
    """Print a section heading followed by each feature line on its own line."""
    for line in (heading, *feature_lines):
        print(line)


# Service discovery: generate the implementation, then summarize what it covers.
service_discovery = microservice_mgr.create_service_discovery()
_print_section(
    "=== 服务发现 ===",
    "✓ Consul注册中心",
    "✓ 服务注册与发现",
    "✓ 健康检查",
    "✓ 服务监听",
    "✓ gRPC解析器",
)

# Load balancing: generate the balancer implementations.
load_balancer = microservice_mgr.create_load_balancer()
_print_section(
    "\n=== 负载均衡 ===",
    "✓ 加权轮询",
    "✓ 一致性哈希",
    "✓ 地域感知",
    "✓ 最少连接",
    "✓ 随机选择",
)

# Service mesh: generate the Istio integration.
service_mesh = microservice_mgr.create_service_mesh()
_print_section(
    "\n=== 服务网格 ===",
    "✓ Istio集成",
    "✓ Sidecar注入",
    "✓ 流量管理",
    "✓ 安全策略",
    "✓ 可观测性",
)

微服务治理

1. 服务注册与发现

// RegisterService is an example that registers one "user-service" instance
// with the Consul agent at localhost:8500.
//
// Both possible failures are reported instead of discarded: a client that
// failed to construct must not be used, and a failed registration would
// otherwise leave the instance silently undiscoverable.
func RegisterService() {
    registry, err := NewConsulRegistry("localhost:8500")
    if err != nil {
        log.Printf("create consul registry: %v", err)
        return
    }

    instance := &ServiceInstance{
        ID:      "user-service-001",
        Name:    "user-service",
        Address: "192.168.1.100",
        Port:    8080,
        Version: "v1.0.0",
        Tags:    []string{"api", "user"},
        // Free-form metadata; presumably read by the load balancer (weight)
        // and locality-aware routing (zone) — confirm against the consumers.
        Metadata: map[string]string{
            "weight": "100",
            "zone":   "us-west-1a",
        },
    }

    if err := registry.Register(instance); err != nil {
        log.Printf("register service %s: %v", instance.ID, err)
    }
}

2. 负载均衡配置

// ConfigureLoadBalancer is an example that registers the custom weighted
// round-robin balancer and connects to user-service through the "discovery"
// resolver, selecting that balancer via the default service config.
//
// Assumes a resolver for the "discovery" scheme has been registered
// elsewhere, and that the custom builder's Name() is "weighted_round_robin".
func ConfigureLoadBalancer() {
    // grpc-go documents balancer.Register as init-time only and not
    // thread-safe; in real code move this call into an init() function.
    balancer.Register(NewWeightedRoundRobinBalancer())

    conn, err := grpc.Dial(
        "discovery:///user-service",
        // grpc.Dial refuses to connect without a transport-security option.
        // Plaintext is used for this demo (package
        // google.golang.org/grpc/credentials/insecure); use TLS credentials
        // in production.
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultServiceConfig(`{
            "loadBalancingPolicy": "weighted_round_robin"
        }`),
    )
    if err != nil {
        log.Printf("dial user-service: %v", err)
        return
    }
    // Release the connection's resources when the example finishes.
    defer conn.Close()

    // Create service clients from conn here, e.g. a generated NewXxxClient(conn).
}

3. 服务网格集成

# Example Istio VirtualService for user-service. Requests carrying the header
# "version: v2" go to subset v2; all other requests fall through to subset v1.
# Assumes a DestinationRule for host user-service defines subsets v1 and v2
# (not shown here).
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: user-service
spec:
  hosts:
    - user-service
  http:
    # Routes are evaluated top to bottom; the first matching route wins.
    - match:
        - headers:
            version:
              exact: v2
      route:
        - destination:
            host: user-service
            subset: v2
          weight: 100
    # Default route: no match clause, so it catches all remaining requests.
    - route:
        - destination:
            host: user-service
            subset: v1
          weight: 100

总结

本章深入探讨了 gRPC 在微服务架构中的应用,主要内容包括:

核心要点

  1. 微服务架构

    • 服务拆分原则
    • 分布式系统设计
    • 服务间通信
    • 数据一致性
  2. 服务发现

    • 注册中心选择
    • 服务注册与注销
    • 健康检查机制
    • 服务监听与更新
  3. 负载均衡

    • 多种均衡策略
    • 权重配置
    • 地域感知
    • 故障转移
  4. 服务网格

    • Istio集成
    • 流量管理
    • 安全策略
    • 可观测性

最佳实践

  1. 架构设计

    • 合理的服务边界
    • 松耦合设计
    • 容错机制
    • 可扩展性
  2. 服务治理

    • 统一的服务注册
    • 智能的负载均衡
    • 完善的监控体系
    • 自动化运维
  3. 性能优化

    • 连接池管理
    • 缓存策略
    • 异步处理
    • 资源隔离
  4. 可靠性保障

    • 熔断降级
    • 重试机制
    • 超时控制
    • 优雅关闭

下一步学习

  • 深入学习服务网格技术
  • 掌握分布式追踪系统
  • 了解云原生架构模式
  • 实践DevOps和GitOps

通过本章学习,你已经掌握了构建可扩展微服务架构的核心技能,能够设计和实现高可用的分布式系统。