3.1 同步通信模式

REST API通信

// REST API客户端实现
package main

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "time"
)

// HTTPClient REST客户端接口
type HTTPClient interface {
    Get(ctx context.Context, url string, headers map[string]string) (*http.Response, error)
    Post(ctx context.Context, url string, body interface{}, headers map[string]string) (*http.Response, error)
    Put(ctx context.Context, url string, body interface{}, headers map[string]string) (*http.Response, error)
    Delete(ctx context.Context, url string, headers map[string]string) (*http.Response, error)
}

// RestClient REST客户端实现
type RestClient struct {
    client  *http.Client
    baseURL string
    timeout time.Duration
}

func NewRestClient(baseURL string, timeout time.Duration) *RestClient {
    return &RestClient{
        client: &http.Client{
            Timeout: timeout,
            Transport: &http.Transport{
                MaxIdleConns:        100,
                MaxIdleConnsPerHost: 10,
                IdleConnTimeout:     90 * time.Second,
                DisableKeepAlives:   false,
            },
        },
        baseURL: baseURL,
        timeout: timeout,
    }
}

func (c *RestClient) Get(ctx context.Context, url string, headers map[string]string) (*http.Response, error) {
    return c.doRequest(ctx, "GET", url, nil, headers)
}

func (c *RestClient) Post(ctx context.Context, url string, body interface{}, headers map[string]string) (*http.Response, error) {
    return c.doRequest(ctx, "POST", url, body, headers)
}

func (c *RestClient) Put(ctx context.Context, url string, body interface{}, headers map[string]string) (*http.Response, error) {
    return c.doRequest(ctx, "PUT", url, body, headers)
}

func (c *RestClient) Delete(ctx context.Context, url string, headers map[string]string) (*http.Response, error) {
    return c.doRequest(ctx, "DELETE", url, nil, headers)
}

func (c *RestClient) doRequest(ctx context.Context, method, url string, body interface{}, headers map[string]string) (*http.Response, error) {
    fullURL := c.baseURL + url
    
    var reqBody io.Reader
    if body != nil {
        jsonBody, err := json.Marshal(body)
        if err != nil {
            return nil, fmt.Errorf("failed to marshal request body: %w", err)
        }
        reqBody = bytes.NewBuffer(jsonBody)
    }
    
    req, err := http.NewRequestWithContext(ctx, method, fullURL, reqBody)
    if err != nil {
        return nil, fmt.Errorf("failed to create request: %w", err)
    }
    
    // 设置默认头部
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("Accept", "application/json")
    req.Header.Set("User-Agent", "microservice-client/1.0")
    
    // 设置自定义头部
    for key, value := range headers {
        req.Header.Set(key, value)
    }
    
    resp, err := c.client.Do(req)
    if err != nil {
        return nil, fmt.Errorf("request failed: %w", err)
    }
    
    return resp, nil
}

// 服务客户端示例
type UserServiceClient struct {
    client HTTPClient
}

func NewUserServiceClient(baseURL string) *UserServiceClient {
    return &UserServiceClient{
        client: NewRestClient(baseURL, 30*time.Second),
    }
}

type User struct {
    ID       string `json:"id"`
    Username string `json:"username"`
    Email    string `json:"email"`
    Status   string `json:"status"`
}

type CreateUserRequest struct {
    Username string `json:"username"`
    Email    string `json:"email"`
    Password string `json:"password"`
}

type UpdateUserRequest struct {
    Username string `json:"username,omitempty"`
    Email    string `json:"email,omitempty"`
    Status   string `json:"status,omitempty"`
}

func (c *UserServiceClient) GetUser(ctx context.Context, userID string, authToken string) (*User, error) {
    headers := map[string]string{
        "Authorization": "Bearer " + authToken,
    }
    
    resp, err := c.client.Get(ctx, "/users/"+userID, headers)
    if err != nil {
        return nil, fmt.Errorf("failed to get user: %w", err)
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return nil, fmt.Errorf("get user failed with status %d: %s", resp.StatusCode, string(body))
    }
    
    var user User
    if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
        return nil, fmt.Errorf("failed to decode response: %w", err)
    }
    
    return &user, nil
}

func (c *UserServiceClient) CreateUser(ctx context.Context, req *CreateUserRequest, authToken string) (*User, error) {
    headers := map[string]string{
        "Authorization": "Bearer " + authToken,
    }
    
    resp, err := c.client.Post(ctx, "/users", req, headers)
    if err != nil {
        return nil, fmt.Errorf("failed to create user: %w", err)
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != http.StatusCreated {
        body, _ := io.ReadAll(resp.Body)
        return nil, fmt.Errorf("create user failed with status %d: %s", resp.StatusCode, string(body))
    }
    
    var user User
    if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
        return nil, fmt.Errorf("failed to decode response: %w", err)
    }
    
    return &user, nil
}

func (c *UserServiceClient) UpdateUser(ctx context.Context, userID string, req *UpdateUserRequest, authToken string) (*User, error) {
    headers := map[string]string{
        "Authorization": "Bearer " + authToken,
    }
    
    resp, err := c.client.Put(ctx, "/users/"+userID, req, headers)
    if err != nil {
        return nil, fmt.Errorf("failed to update user: %w", err)
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return nil, fmt.Errorf("update user failed with status %d: %s", resp.StatusCode, string(body))
    }
    
    var user User
    if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
        return nil, fmt.Errorf("failed to decode response: %w", err)
    }
    
    return &user, nil
}

func (c *UserServiceClient) DeleteUser(ctx context.Context, userID string, authToken string) error {
    headers := map[string]string{
        "Authorization": "Bearer " + authToken,
    }
    
    resp, err := c.client.Delete(ctx, "/users/"+userID, headers)
    if err != nil {
        return fmt.Errorf("failed to delete user: %w", err)
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != http.StatusNoContent {
        body, _ := io.ReadAll(resp.Body)
        return fmt.Errorf("delete user failed with status %d: %s", resp.StatusCode, string(body))
    }
    
    return nil
}

// 使用示例
func main() {
    ctx := context.Background()
    
    // 创建用户服务客户端
    userClient := NewUserServiceClient("http://user-service:8080")
    
    // 创建用户
    createReq := &CreateUserRequest{
        Username: "john_doe",
        Email:    "john@example.com",
        Password: "password123",
    }
    
    user, err := userClient.CreateUser(ctx, createReq, "auth-token-123")
    if err != nil {
        fmt.Printf("Failed to create user: %v\n", err)
        return
    }
    
    fmt.Printf("Created user: %+v\n", user)
    
    // 获取用户
    fetchedUser, err := userClient.GetUser(ctx, user.ID, "auth-token-123")
    if err != nil {
        fmt.Printf("Failed to get user: %v\n", err)
        return
    }
    
    fmt.Printf("Fetched user: %+v\n", fetchedUser)
    
    // 更新用户
    updateReq := &UpdateUserRequest{
        Status: "active",
    }
    
    updatedUser, err := userClient.UpdateUser(ctx, user.ID, updateReq, "auth-token-123")
    if err != nil {
        fmt.Printf("Failed to update user: %v\n", err)
        return
    }
    
    fmt.Printf("Updated user: %+v\n", updatedUser)
}

API网关模式

API网关作为微服务架构的入口点,提供统一的访问接口。

# Kong API网关配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: kong-config
  namespace: api-gateway
data:
  kong.conf: |
    database = off
    declarative_config = /kong/declarative/kong.yml
    proxy_listen = 0.0.0.0:8000
    admin_listen = 0.0.0.0:8001
    log_level = info
  kong.yml: |
    _format_version: "3.0"
    _transform: true
    
    services:
    - name: user-service
      url: http://user-service:8080
      plugins:
      - name: rate-limiting
        config:
          minute: 100
          hour: 1000
      - name: cors
        config:
          origins:
          - "*"
          methods:
          - GET
          - POST
          - PUT
          - DELETE
          headers:
          - Accept
          - Content-Type
          - Authorization
    
    - name: order-service
      url: http://order-service:8080
      plugins:
      - name: jwt
        config:
          secret_is_base64: false
      - name: request-transformer
        config:
          add:
            headers:
            - "X-Service-Name:order-service"
    
    routes:
    - name: user-routes
      service: user-service
      paths:
      - "/api/v1/users"
      strip_path: false
      
    - name: order-routes
      service: order-service
      paths:
      - "/api/v1/orders"
      strip_path: false
    
    consumers:
    - username: api-client
      jwt_secrets:
      - key: api-key
        secret: my-secret-key
        algorithm: HS256
    
    plugins:
    - name: prometheus
      config:
        per_consumer: true
    - name: zipkin
      config:
        http_endpoint: http://zipkin:9411/api/v2/spans
        sample_ratio: 0.1
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kong-gateway
  namespace: api-gateway
spec:
  replicas: 2
  selector:
    matchLabels:
      app: kong-gateway
  template:
    metadata:
      labels:
        app: kong-gateway
    spec:
      containers:
      - name: kong
        image: kong:3.4
        ports:
        - containerPort: 8000
          name: proxy
        - containerPort: 8001
          name: admin
        env:
        - name: KONG_DATABASE
          value: "off"
        - name: KONG_DECLARATIVE_CONFIG
          value: "/kong/declarative/kong.yml"
        - name: KONG_PROXY_LISTEN
          value: "0.0.0.0:8000"
        - name: KONG_ADMIN_LISTEN
          value: "0.0.0.0:8001"
        - name: KONG_LOG_LEVEL
          value: "info"
        volumeMounts:
        - name: kong-config
          mountPath: /kong/declarative
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /status
            port: 8001
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /status
            port: 8001
          initialDelaySeconds: 5
          periodSeconds: 5
      volumes:
      - name: kong-config
        configMap:
          name: kong-config
---
apiVersion: v1
kind: Service
metadata:
  name: kong-gateway
  namespace: api-gateway
spec:
  selector:
    app: kong-gateway
  ports:
  - name: proxy
    port: 8000
    targetPort: 8000
  - name: admin
    port: 8001
    targetPort: 8001
  type: LoadBalancer
// Go语言API网关实现
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"
    "net/http/httputil"
    "net/url"
    "strings"
    "sync"
    "time"
    
    "github.com/gorilla/mux"
    "golang.org/x/time/rate"
)

// Route 路由配置
type Route struct {
    Path        string            `json:"path"`
    Method      string            `json:"method"`
    ServiceName string            `json:"service_name"`
    ServiceURL  string            `json:"service_url"`
    Middleware  []string          `json:"middleware"`
    Headers     map[string]string `json:"headers"`
    Timeout     time.Duration     `json:"timeout"`
}

// Gateway API网关
type Gateway struct {
    routes      map[string]*Route
    rateLimiter *RateLimiter
    auth        *AuthMiddleware
    logger      *Logger
    metrics     *Metrics
    mutex       sync.RWMutex
}

func NewGateway() *Gateway {
    return &Gateway{
        routes:      make(map[string]*Route),
        rateLimiter: NewRateLimiter(),
        auth:        NewAuthMiddleware(),
        logger:      NewLogger(),
        metrics:     NewMetrics(),
    }
}

func (g *Gateway) AddRoute(route *Route) {
    g.mutex.Lock()
    defer g.mutex.Unlock()
    
    key := fmt.Sprintf("%s:%s", route.Method, route.Path)
    g.routes[key] = route
    
    log.Printf("Route added: %s %s -> %s", route.Method, route.Path, route.ServiceURL)
}

func (g *Gateway) RemoveRoute(method, path string) {
    g.mutex.Lock()
    defer g.mutex.Unlock()
    
    key := fmt.Sprintf("%s:%s", method, path)
    delete(g.routes, key)
    
    log.Printf("Route removed: %s %s", method, path)
}

func (g *Gateway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    start := time.Now()
    
    // 记录请求
    g.logger.LogRequest(r)
    
    // 查找路由
    route := g.findRoute(r.Method, r.URL.Path)
    if route == nil {
        http.Error(w, "Route not found", http.StatusNotFound)
        return
    }
    
    // 应用中间件
    for _, middleware := range route.Middleware {
        switch middleware {
        case "rate_limit":
            if !g.rateLimiter.Allow(r) {
                http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
                return
            }
        case "auth":
            if !g.auth.Authenticate(r) {
                http.Error(w, "Unauthorized", http.StatusUnauthorized)
                return
            }
        case "cors":
            g.applyCORS(w, r)
        }
    }
    
    // 代理请求
    g.proxyRequest(w, r, route)
    
    // 记录指标
    duration := time.Since(start)
    g.metrics.RecordRequest(route.ServiceName, r.Method, duration)
}

func (g *Gateway) findRoute(method, path string) *Route {
    g.mutex.RLock()
    defer g.mutex.RUnlock()
    
    // 精确匹配
    key := fmt.Sprintf("%s:%s", method, path)
    if route, exists := g.routes[key]; exists {
        return route
    }
    
    // 前缀匹配
    for routeKey, route := range g.routes {
        parts := strings.Split(routeKey, ":")
        if len(parts) != 2 {
            continue
        }
        
        routeMethod, routePath := parts[0], parts[1]
        if routeMethod == method && strings.HasPrefix(path, routePath) {
            return route
        }
    }
    
    return nil
}

func (g *Gateway) proxyRequest(w http.ResponseWriter, r *http.Request, route *Route) {
    // 解析目标URL
    target, err := url.Parse(route.ServiceURL)
    if err != nil {
        http.Error(w, "Invalid service URL", http.StatusInternalServerError)
        return
    }
    
    // 创建反向代理
    proxy := httputil.NewSingleHostReverseProxy(target)
    
    // 设置超时
    if route.Timeout > 0 {
        ctx, cancel := context.WithTimeout(r.Context(), route.Timeout)
        defer cancel()
        r = r.WithContext(ctx)
    }
    
    // 添加自定义头部
    for key, value := range route.Headers {
        r.Header.Set(key, value)
    }
    
    // 添加追踪头部
    r.Header.Set("X-Gateway-Time", time.Now().Format(time.RFC3339))
    r.Header.Set("X-Service-Name", route.ServiceName)
    
    // 执行代理
    proxy.ServeHTTP(w, r)
}

func (g *Gateway) applyCORS(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Access-Control-Allow-Origin", "*")
    w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
    w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
    
    if r.Method == "OPTIONS" {
        w.WriteHeader(http.StatusOK)
        return
    }
}

// RateLimiter 限流器
type RateLimiter struct {
    limiters map[string]*rate.Limiter
    mutex    sync.RWMutex
}

func NewRateLimiter() *RateLimiter {
    return &RateLimiter{
        limiters: make(map[string]*rate.Limiter),
    }
}

func (rl *RateLimiter) Allow(r *http.Request) bool {
    key := r.RemoteAddr // 可以使用IP、用户ID等作为key
    
    rl.mutex.RLock()
    limiter, exists := rl.limiters[key]
    rl.mutex.RUnlock()
    
    if !exists {
        rl.mutex.Lock()
        limiter = rate.NewLimiter(rate.Limit(10), 20) // 每秒10个请求,突发20个
        rl.limiters[key] = limiter
        rl.mutex.Unlock()
    }
    
    return limiter.Allow()
}

// AuthMiddleware 认证中间件
type AuthMiddleware struct {
    jwtSecret string
}

func NewAuthMiddleware() *AuthMiddleware {
    return &AuthMiddleware{
        jwtSecret: "your-secret-key",
    }
}

func (am *AuthMiddleware) Authenticate(r *http.Request) bool {
    authHeader := r.Header.Get("Authorization")
    if authHeader == "" {
        return false
    }
    
    // 简单的Bearer token验证
    if strings.HasPrefix(authHeader, "Bearer ") {
        token := strings.TrimPrefix(authHeader, "Bearer ")
        // 这里应该验证JWT token
        return token != ""
    }
    
    return false
}

// Logger 日志记录器
type Logger struct {
    // 日志配置
}

func NewLogger() *Logger {
    return &Logger{}
}

func (l *Logger) LogRequest(r *http.Request) {
    log.Printf("[%s] %s %s - %s", time.Now().Format(time.RFC3339), r.Method, r.URL.Path, r.RemoteAddr)
}

// Metrics 指标收集器
type Metrics struct {
    requests map[string]int64
    mutex    sync.RWMutex
}

func NewMetrics() *Metrics {
    return &Metrics{
        requests: make(map[string]int64),
    }
}

func (m *Metrics) RecordRequest(service, method string, duration time.Duration) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    
    key := fmt.Sprintf("%s:%s", service, method)
    m.requests[key]++
    
    log.Printf("Request to %s %s took %v", service, method, duration)
}

func (m *Metrics) GetMetrics() map[string]int64 {
    m.mutex.RLock()
    defer m.mutex.RUnlock()
    
    result := make(map[string]int64)
    for k, v := range m.requests {
        result[k] = v
    }
    
    return result
}

// 使用示例
func main() {
    gateway := NewGateway()
    
    // 添加路由
    gateway.AddRoute(&Route{
        Path:        "/api/v1/users",
        Method:      "GET",
        ServiceName: "user-service",
        ServiceURL:  "http://user-service:8080",
        Middleware:  []string{"rate_limit", "auth", "cors"},
        Headers: map[string]string{
            "X-Gateway": "true",
        },
        Timeout: 30 * time.Second,
    })
    
    gateway.AddRoute(&Route{
        Path:        "/api/v1/orders",
        Method:      "POST",
        ServiceName: "order-service",
        ServiceURL:  "http://order-service:8080",
        Middleware:  []string{"rate_limit", "auth"},
        Timeout:     15 * time.Second,
    })
    
    // 健康检查端点
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        json.NewEncoder(w).Encode(map[string]string{"status": "healthy"})
    })
    
    // 指标端点
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        metrics := gateway.metrics.GetMetrics()
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(metrics)
    })
    
    // 启动网关
    http.Handle("/", gateway)
    
    log.Println("API Gateway starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

熔断器模式

熔断器模式用于防止级联故障,当服务不可用时快速失败。

// Go语言熔断器实现
package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "sync"
    "time"
)

// CircuitState 熔断器状态
type CircuitState int

const (
    StateClosed CircuitState = iota
    StateHalfOpen
    StateOpen
)

func (s CircuitState) String() string {
    switch s {
    case StateClosed:
        return "CLOSED"
    case StateHalfOpen:
        return "HALF_OPEN"
    case StateOpen:
        return "OPEN"
    default:
        return "UNKNOWN"
    }
}

// CircuitBreakerConfig 熔断器配置
type CircuitBreakerConfig struct {
    MaxRequests      uint32        // 半开状态下的最大请求数
    Interval         time.Duration // 统计时间窗口
    Timeout          time.Duration // 开启状态的超时时间
    ReadyToTrip      func(counts Counts) bool // 判断是否应该开启熔断器
    OnStateChange    func(name string, from CircuitState, to CircuitState) // 状态变化回调
    IsSuccessful     func(err error) bool // 判断请求是否成功
}

// Counts 统计信息
type Counts struct {
    Requests             uint32
    TotalSuccesses       uint32
    TotalFailures        uint32
    ConsecutiveSuccesses uint32
    ConsecutiveFailures  uint32
}

func (c *Counts) onRequest() {
    c.Requests++
}

func (c *Counts) onSuccess() {
    c.TotalSuccesses++
    c.ConsecutiveSuccesses++
    c.ConsecutiveFailures = 0
}

func (c *Counts) onFailure() {
    c.TotalFailures++
    c.ConsecutiveFailures++
    c.ConsecutiveSuccesses = 0
}

func (c *Counts) clear() {
    c.Requests = 0
    c.TotalSuccesses = 0
    c.TotalFailures = 0
    c.ConsecutiveSuccesses = 0
    c.ConsecutiveFailures = 0
}

// CircuitBreaker 熔断器
type CircuitBreaker struct {
    name         string
    maxRequests  uint32
    interval     time.Duration
    timeout      time.Duration
    readyToTrip  func(counts Counts) bool
    isSuccessful func(err error) bool
    onStateChange func(name string, from CircuitState, to CircuitState)
    
    mutex      sync.Mutex
    state      CircuitState
    generation uint64
    counts     Counts
    expiry     time.Time
}

func NewCircuitBreaker(name string, config CircuitBreakerConfig) *CircuitBreaker {
    cb := &CircuitBreaker{
        name:        name,
        maxRequests: config.MaxRequests,
        interval:    config.Interval,
        timeout:     config.Timeout,
        readyToTrip: config.ReadyToTrip,
        isSuccessful: config.IsSuccessful,
        onStateChange: config.OnStateChange,
        state:       StateClosed,
        expiry:      time.Now().Add(config.Interval),
    }
    
    if cb.maxRequests == 0 {
        cb.maxRequests = 1
    }
    
    if cb.interval <= 0 {
        cb.interval = 60 * time.Second
    }
    
    if cb.timeout <= 0 {
        cb.timeout = 60 * time.Second
    }
    
    if cb.readyToTrip == nil {
        cb.readyToTrip = func(counts Counts) bool {
            return counts.ConsecutiveFailures > 5
        }
    }
    
    if cb.isSuccessful == nil {
        cb.isSuccessful = func(err error) bool {
            return err == nil
        }
    }
    
    return cb
}

func (cb *CircuitBreaker) Name() string {
    return cb.name
}

func (cb *CircuitBreaker) State() CircuitState {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()
    
    now := time.Now()
    state, _ := cb.currentState(now)
    return state
}

func (cb *CircuitBreaker) Counts() Counts {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()
    
    return cb.counts
}

func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
    generation, err := cb.beforeRequest()
    if err != nil {
        return nil, err
    }
    
    defer func() {
        e := recover()
        if e != nil {
            cb.afterRequest(generation, false)
            panic(e)
        }
    }()
    
    result, err := req()
    cb.afterRequest(generation, cb.isSuccessful(err))
    return result, err
}

func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()
    
    now := time.Now()
    state, generation := cb.currentState(now)
    
    if state == StateOpen {
        return generation, errors.New("circuit breaker is open")
    } else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
        return generation, errors.New("too many requests")
    }
    
    cb.counts.onRequest()
    return generation, nil
}

func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()
    
    now := time.Now()
    state, generation := cb.currentState(now)
    if generation != before {
        return
    }
    
    if success {
        cb.onSuccess(state, now)
    } else {
        cb.onFailure(state, now)
    }
}

func (cb *CircuitBreaker) onSuccess(state CircuitState, now time.Time) {
    cb.counts.onSuccess()
    
    if state == StateHalfOpen {
        cb.setState(StateClosed, now)
    }
}

func (cb *CircuitBreaker) onFailure(state CircuitState, now time.Time) {
    cb.counts.onFailure()
    
    if cb.readyToTrip(cb.counts) {
        cb.setState(StateOpen, now)
    }
}

func (cb *CircuitBreaker) currentState(now time.Time) (CircuitState, uint64) {
    switch cb.state {
    case StateClosed:
        if !cb.expiry.IsZero() && cb.expiry.Before(now) {
            cb.toNewGeneration(now)
        }
    case StateOpen:
        if cb.expiry.Before(now) {
            cb.setState(StateHalfOpen, now)
        }
    }
    return cb.state, cb.generation
}

func (cb *CircuitBreaker) setState(state CircuitState, now time.Time) {
    if cb.state == state {
        return
    }
    
    prev := cb.state
    cb.state = state
    
    cb.toNewGeneration(now)
    
    if cb.onStateChange != nil {
        cb.onStateChange(cb.name, prev, state)
    }
    
    log.Printf("Circuit breaker '%s' state changed from %s to %s", cb.name, prev, state)
}

func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
    cb.generation++
    cb.counts.clear()
    
    var zero time.Time
    switch cb.state {
    case StateClosed:
        if cb.interval == 0 {
            cb.expiry = zero
        } else {
            cb.expiry = now.Add(cb.interval)
        }
    case StateOpen:
        cb.expiry = now.Add(cb.timeout)
    default: // StateHalfOpen
        cb.expiry = zero
    }
}

// RetryConfig 重试配置
type RetryConfig struct {
    MaxAttempts int
    Delay       time.Duration
    MaxDelay    time.Duration
    Multiplier  float64
    Jitter      bool
}

// RetryableFunc 可重试的函数
type RetryableFunc func() error

// IsRetryable 判断错误是否可重试
type IsRetryable func(error) bool

// Retry 重试执行器
type Retry struct {
    config      RetryConfig
    isRetryable IsRetryable
}

func NewRetry(config RetryConfig, isRetryable IsRetryable) *Retry {
    if config.MaxAttempts <= 0 {
        config.MaxAttempts = 3
    }
    if config.Delay <= 0 {
        config.Delay = 100 * time.Millisecond
    }
    if config.MaxDelay <= 0 {
        config.MaxDelay = 30 * time.Second
    }
    if config.Multiplier <= 0 {
        config.Multiplier = 2.0
    }
    
    if isRetryable == nil {
        isRetryable = func(err error) bool {
            return true // 默认所有错误都可重试
        }
    }
    
    return &Retry{
        config:      config,
        isRetryable: isRetryable,
    }
}

func (r *Retry) Execute(ctx context.Context, fn RetryableFunc) error {
    var lastErr error
    delay := r.config.Delay
    
    for attempt := 1; attempt <= r.config.MaxAttempts; attempt++ {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
        
        err := fn()
        if err == nil {
            return nil
        }
        
        lastErr = err
        
        if attempt == r.config.MaxAttempts {
            break
        }
        
        if !r.isRetryable(err) {
            return err
        }
        
        log.Printf("Attempt %d failed: %v, retrying in %v", attempt, err, delay)
        
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(delay):
        }
        
        // 计算下次延迟时间
        delay = time.Duration(float64(delay) * r.config.Multiplier)
        if delay > r.config.MaxDelay {
            delay = r.config.MaxDelay
        }
        
        // 添加抖动
        if r.config.Jitter {
            jitter := time.Duration(float64(delay) * 0.1 * (2*time.Now().UnixNano()%2 - 1))
            delay += jitter
        }
    }
    
    return fmt.Errorf("max attempts (%d) exceeded, last error: %w", r.config.MaxAttempts, lastErr)
}

// ResilientClient 弹性客户端
type ResilientClient struct {
    circuitBreaker *CircuitBreaker
    retry          *Retry
    timeout        time.Duration
}

func NewResilientClient(name string, cbConfig CircuitBreakerConfig, retryConfig RetryConfig, timeout time.Duration) *ResilientClient {
    cb := NewCircuitBreaker(name, cbConfig)
    retry := NewRetry(retryConfig, func(err error) bool {
        // 熔断器开启时不重试
        return err.Error() != "circuit breaker is open"
    })
    
    return &ResilientClient{
        circuitBreaker: cb,
        retry:          retry,
        timeout:        timeout,
    }
}

func (rc *ResilientClient) Execute(ctx context.Context, fn func() (interface{}, error)) (interface{}, error) {
    if rc.timeout > 0 {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, rc.timeout)
        defer cancel()
    }
    
    var result interface{}
    var err error
    
    retryErr := rc.retry.Execute(ctx, func() error {
        result, err = rc.circuitBreaker.Execute(fn)
        return err
    })
    
    if retryErr != nil {
        return nil, retryErr
    }
    
    return result, err
}

// 使用示例
func main() {
    // 创建熔断器配置
    cbConfig := CircuitBreakerConfig{
        MaxRequests: 3,
        Interval:    10 * time.Second,
        Timeout:     30 * time.Second,
        ReadyToTrip: func(counts Counts) bool {
            failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
            return counts.Requests >= 3 && failureRatio >= 0.6
        },
        OnStateChange: func(name string, from CircuitState, to CircuitState) {
            log.Printf("Circuit breaker '%s' changed from %s to %s", name, from, to)
        },
        IsSuccessful: func(err error) bool {
            return err == nil
        },
    }
    
    // 创建重试配置
    retryConfig := RetryConfig{
        MaxAttempts: 3,
        Delay:       100 * time.Millisecond,
        MaxDelay:    5 * time.Second,
        Multiplier:  2.0,
        Jitter:      true,
    }
    
    // 创建弹性客户端
    client := NewResilientClient("user-service", cbConfig, retryConfig, 10*time.Second)
    
    ctx := context.Background()
    
    // 模拟服务调用
    for i := 0; i < 10; i++ {
        result, err := client.Execute(ctx, func() (interface{}, error) {
            // 模拟服务调用
            if i%3 == 0 {
                return nil, errors.New("service unavailable")
            }
            return fmt.Sprintf("Success %d", i), nil
        })
        
        if err != nil {
            log.Printf("Request %d failed: %v", i, err)
        } else {
            log.Printf("Request %d succeeded: %v", i, result)
        }
        
        time.Sleep(1 * time.Second)
    }
    
    // 打印熔断器状态
    log.Printf("Final circuit breaker state: %s", client.circuitBreaker.State())
    log.Printf("Final counts: %+v", client.circuitBreaker.Counts())
}

gRPC通信

前面已经介绍了gRPC的基本实现,这里不再重复。

本章总结

本章详细介绍了微服务架构中的服务间通信模式和技术实现:

主要内容回顾

  1. 同步通信模式

    • REST API:基于HTTP的轻量级通信方式
    • gRPC:高性能的RPC框架,支持多种语言
    • 适用于需要立即响应的场景
  2. 异步通信模式

    • 消息队列:解耦服务间依赖,提高系统弹性
    • 事件流:支持事件驱动架构,实现最终一致性
    • 适用于可以容忍延迟的场景
  3. 服务发现与负载均衡

    • 服务注册与发现:动态管理服务实例
    • 负载均衡算法:轮询、随机、加权轮询等
    • 健康检查:确保只路由到健康的服务实例
  4. API网关模式

    • 统一入口:简化客户端调用
    • 横切关注点:认证、限流、监控等
    • 协议转换:支持多种通信协议
  5. 弹性设计模式

    • 熔断器:防止级联故障
    • 重试机制:处理临时性故障
    • 超时控制:避免资源耗尽

最佳实践

  1. 选择合适的通信模式

    • 同步通信用于查询操作
    • 异步通信用于事件通知
    • 混合使用以满足不同需求
  2. 实施弹性设计

    • 始终考虑故障场景
    • 实现优雅降级
    • 监控和告警
  3. 性能优化

    • 连接池管理
    • 请求批处理
    • 缓存策略
  4. 安全考虑

    • 服务间认证
    • 传输加密
    • 访问控制

下一章预告

下一章将探讨微服务架构中的数据管理与一致性问题,包括: - 数据库设计模式 - 分布式事务处理 - 数据一致性策略 - CQRS和Event Sourcing模式

// user_service.proto
syntax = "proto3";

package user;

option go_package = "./proto/user";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// 用户服务定义
service UserService {
  // 获取用户信息
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  
  // 创建用户
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
  
  // 更新用户
  rpc UpdateUser(UpdateUserRequest) returns (UpdateUserResponse);
  
  // 删除用户
  rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
  
  // 列出用户
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
  
  // 流式获取用户更新
  rpc WatchUserUpdates(WatchUserUpdatesRequest) returns (stream UserUpdateEvent);
}

// 用户消息
message User {
  string id = 1;
  string username = 2;
  string email = 3;
  string status = 4;
  google.protobuf.Timestamp created_at = 5;
  google.protobuf.Timestamp updated_at = 6;
}

// 请求消息
message GetUserRequest {
  string id = 1;
}

message GetUserResponse {
  User user = 1;
}

message CreateUserRequest {
  string username = 1;
  string email = 2;
  string password = 3;
}

message CreateUserResponse {
  User user = 1;
}

message UpdateUserRequest {
  string id = 1;
  string username = 2;
  string email = 3;
  string status = 4;
}

message UpdateUserResponse {
  User user = 1;
}

message DeleteUserRequest {
  string id = 1;
}

message ListUsersRequest {
  int32 page = 1;
  int32 page_size = 2;
  string filter = 3;
}

message ListUsersResponse {
  repeated User users = 1;
  int32 total = 2;
  int32 page = 3;
  int32 page_size = 4;
}

message WatchUserUpdatesRequest {
  repeated string user_ids = 1;
}

message UserUpdateEvent {
  enum EventType {
    CREATED = 0;
    UPDATED = 1;
    DELETED = 2;
  }
  
  EventType event_type = 1;
  User user = 2;
  google.protobuf.Timestamp timestamp = 3;
}
// gRPC服务端实现
package main

import (
    "context"
    "fmt"
    "log"
    "net"
    "sync"
    "time"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/emptypb"
    "google.golang.org/protobuf/types/known/timestamppb"
    
    pb "./proto/user"
)

// UserServiceServer gRPC服务实现
type UserServiceServer struct {
    pb.UnimplementedUserServiceServer
    users    map[string]*pb.User
    mutex    sync.RWMutex
    watchers map[string][]chan *pb.UserUpdateEvent
}

func NewUserServiceServer() *UserServiceServer {
    return &UserServiceServer{
        users:    make(map[string]*pb.User),
        watchers: make(map[string][]chan *pb.UserUpdateEvent),
    }
}

func (s *UserServiceServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    
    user, exists := s.users[req.Id]
    if !exists {
        return nil, status.Errorf(codes.NotFound, "user not found: %s", req.Id)
    }
    
    return &pb.GetUserResponse{
        User: user,
    }, nil
}

func (s *UserServiceServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    
    // 验证输入
    if req.Username == "" {
        return nil, status.Errorf(codes.InvalidArgument, "username is required")
    }
    if req.Email == "" {
        return nil, status.Errorf(codes.InvalidArgument, "email is required")
    }
    
    // 检查用户名是否已存在
    for _, user := range s.users {
        if user.Username == req.Username {
            return nil, status.Errorf(codes.AlreadyExists, "username already exists: %s", req.Username)
        }
        if user.Email == req.Email {
            return nil, status.Errorf(codes.AlreadyExists, "email already exists: %s", req.Email)
        }
    }
    
    // 创建用户
    userID := fmt.Sprintf("user_%d", time.Now().UnixNano())
    now := timestamppb.Now()
    
    user := &pb.User{
        Id:        userID,
        Username:  req.Username,
        Email:     req.Email,
        Status:    "active",
        CreatedAt: now,
        UpdatedAt: now,
    }
    
    s.users[userID] = user
    
    // 通知观察者
    s.notifyWatchers(&pb.UserUpdateEvent{
        EventType: pb.UserUpdateEvent_CREATED,
        User:      user,
        Timestamp: now,
    })
    
    return &pb.CreateUserResponse{
        User: user,
    }, nil
}

func (s *UserServiceServer) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*pb.UpdateUserResponse, error) {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    
    user, exists := s.users[req.Id]
    if !exists {
        return nil, status.Errorf(codes.NotFound, "user not found: %s", req.Id)
    }
    
    // 更新字段
    if req.Username != "" {
        user.Username = req.Username
    }
    if req.Email != "" {
        user.Email = req.Email
    }
    if req.Status != "" {
        user.Status = req.Status
    }
    
    user.UpdatedAt = timestamppb.Now()
    
    // 通知观察者
    s.notifyWatchers(&pb.UserUpdateEvent{
        EventType: pb.UserUpdateEvent_UPDATED,
        User:      user,
        Timestamp: user.UpdatedAt,
    })
    
    return &pb.UpdateUserResponse{
        User: user,
    }, nil
}

func (s *UserServiceServer) DeleteUser(ctx context.Context, req *pb.DeleteUserRequest) (*emptypb.Empty, error) {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    
    user, exists := s.users[req.Id]
    if !exists {
        return nil, status.Errorf(codes.NotFound, "user not found: %s", req.Id)
    }
    
    delete(s.users, req.Id)
    
    // 通知观察者
    s.notifyWatchers(&pb.UserUpdateEvent{
        EventType: pb.UserUpdateEvent_DELETED,
        User:      user,
        Timestamp: timestamppb.Now(),
    })
    
    return &emptypb.Empty{}, nil
}

func (s *UserServiceServer) ListUsers(ctx context.Context, req *pb.ListUsersRequest) (*pb.ListUsersResponse, error) {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    
    var users []*pb.User
    for _, user := range s.users {
        users = append(users, user)
    }
    
    // 简单分页
    total := len(users)
    page := req.Page
    pageSize := req.PageSize
    
    if page <= 0 {
        page = 1
    }
    if pageSize <= 0 {
        pageSize = 10
    }
    
    start := (page - 1) * pageSize
    end := start + pageSize
    
    if start >= int32(total) {
        users = []*pb.User{}
    } else {
        if end > int32(total) {
            end = int32(total)
        }
        users = users[start:end]
    }
    
    return &pb.ListUsersResponse{
        Users:    users,
        Total:    int32(total),
        Page:     page,
        PageSize: pageSize,
    }, nil
}

func (s *UserServiceServer) WatchUserUpdates(req *pb.WatchUserUpdatesRequest, stream pb.UserService_WatchUserUpdatesServer) error {
    // 创建事件通道
    eventCh := make(chan *pb.UserUpdateEvent, 100)
    
    // 注册观察者
    s.mutex.Lock()
    for _, userID := range req.UserIds {
        if s.watchers[userID] == nil {
            s.watchers[userID] = make([]chan *pb.UserUpdateEvent, 0)
        }
        s.watchers[userID] = append(s.watchers[userID], eventCh)
    }
    s.mutex.Unlock()
    
    // 清理函数
    defer func() {
        s.mutex.Lock()
        for _, userID := range req.UserIds {
            watchers := s.watchers[userID]
            for i, ch := range watchers {
                if ch == eventCh {
                    s.watchers[userID] = append(watchers[:i], watchers[i+1:]...)
                    break
                }
            }
        }
        s.mutex.Unlock()
        close(eventCh)
    }()
    
    // 监听事件并发送
    for {
        select {
        case event := <-eventCh:
            if err := stream.Send(event); err != nil {
                return err
            }
        case <-stream.Context().Done():
            return stream.Context().Err()
        }
    }
}

func (s *UserServiceServer) notifyWatchers(event *pb.UserUpdateEvent) {
    userID := event.User.Id
    watchers := s.watchers[userID]
    
    for _, ch := range watchers {
        select {
        case ch <- event:
        default:
            // 通道满了,跳过
        }
    }
}

func main() {
    // 创建gRPC服务器
    server := grpc.NewServer()
    
    // 注册服务
    userService := NewUserServiceServer()
    pb.RegisterUserServiceServer(server, userService)
    
    // 监听端口
    listener, err := net.Listen("tcp", ":9090")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }
    
    log.Println("gRPC server listening on :9090")
    
    // 启动服务器
    if err := server.Serve(listener); err != nil {
        log.Fatalf("Failed to serve: %v", err)
    }
}
// gRPC客户端实现
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "time"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    
    pb "./proto/user"
)

type UserServiceClient struct {
    client pb.UserServiceClient
    conn   *grpc.ClientConn
}

func NewUserServiceClient(address string) (*UserServiceClient, error) {
    conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        return nil, fmt.Errorf("failed to connect: %w", err)
    }
    
    client := pb.NewUserServiceClient(conn)
    
    return &UserServiceClient{
        client: client,
        conn:   conn,
    }, nil
}

func (c *UserServiceClient) Close() error {
    return c.conn.Close()
}

func (c *UserServiceClient) CreateUser(ctx context.Context, username, email, password string) (*pb.User, error) {
    req := &pb.CreateUserRequest{
        Username: username,
        Email:    email,
        Password: password,
    }
    
    resp, err := c.client.CreateUser(ctx, req)
    if err != nil {
        return nil, fmt.Errorf("failed to create user: %w", err)
    }
    
    return resp.User, nil
}

func (c *UserServiceClient) GetUser(ctx context.Context, userID string) (*pb.User, error) {
    req := &pb.GetUserRequest{
        Id: userID,
    }
    
    resp, err := c.client.GetUser(ctx, req)
    if err != nil {
        return nil, fmt.Errorf("failed to get user: %w", err)
    }
    
    return resp.User, nil
}

func (c *UserServiceClient) UpdateUser(ctx context.Context, userID, username, email, status string) (*pb.User, error) {
    req := &pb.UpdateUserRequest{
        Id:       userID,
        Username: username,
        Email:    email,
        Status:   status,
    }
    
    resp, err := c.client.UpdateUser(ctx, req)
    if err != nil {
        return nil, fmt.Errorf("failed to update user: %w", err)
    }
    
    return resp.User, nil
}

func (c *UserServiceClient) DeleteUser(ctx context.Context, userID string) error {
    req := &pb.DeleteUserRequest{
        Id: userID,
    }
    
    _, err := c.client.DeleteUser(ctx, req)
    if err != nil {
        return fmt.Errorf("failed to delete user: %w", err)
    }
    
    return nil
}

func (c *UserServiceClient) ListUsers(ctx context.Context, page, pageSize int32) ([]*pb.User, error) {
    req := &pb.ListUsersRequest{
        Page:     page,
        PageSize: pageSize,
    }
    
    resp, err := c.client.ListUsers(ctx, req)
    if err != nil {
        return nil, fmt.Errorf("failed to list users: %w", err)
    }
    
    return resp.Users, nil
}

func (c *UserServiceClient) WatchUserUpdates(ctx context.Context, userIDs []string) error {
    req := &pb.WatchUserUpdatesRequest{
        UserIds: userIDs,
    }
    
    stream, err := c.client.WatchUserUpdates(ctx, req)
    if err != nil {
        return fmt.Errorf("failed to watch user updates: %w", err)
    }
    
    for {
        event, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return fmt.Errorf("failed to receive event: %w", err)
        }
        
        fmt.Printf("Received event: %s for user %s\n", event.EventType, event.User.Id)
    }
    
    return nil
}

func main() {
    // 创建客户端
    client, err := NewUserServiceClient("localhost:9090")
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    defer client.Close()
    
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    // 创建用户
    user, err := client.CreateUser(ctx, "john_doe", "john@example.com", "password123")
    if err != nil {
        log.Fatalf("Failed to create user: %v", err)
    }
    
    fmt.Printf("Created user: %+v\n", user)
    
    // 获取用户
    fetchedUser, err := client.GetUser(ctx, user.Id)
    if err != nil {
        log.Fatalf("Failed to get user: %v", err)
    }
    
    fmt.Printf("Fetched user: %+v\n", fetchedUser)
    
    // 更新用户
    updatedUser, err := client.UpdateUser(ctx, user.Id, "john_smith", "", "inactive")
    if err != nil {
        log.Fatalf("Failed to update user: %v", err)
    }
    
    fmt.Printf("Updated user: %+v\n", updatedUser)
    
    // 列出用户
    users, err := client.ListUsers(ctx, 1, 10)
    if err != nil {
        log.Fatalf("Failed to list users: %v", err)
    }
    
    fmt.Printf("Listed %d users\n", len(users))
    
    // 监听用户更新(在新的goroutine中)
    go func() {
        watchCtx, watchCancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer watchCancel()
        
        if err := client.WatchUserUpdates(watchCtx, []string{user.Id}); err != nil {
            log.Printf("Watch failed: %v", err)
        }
    }()
    
    // 等待一段时间让监听生效
    time.Sleep(1 * time.Second)
    
    // 再次更新用户以触发事件
    _, err = client.UpdateUser(ctx, user.Id, "", "john.smith@example.com", "active")
    if err != nil {
        log.Printf("Failed to update user: %v", err)
    }
    
    // 等待事件处理
    time.Sleep(2 * time.Second)
}

3.2 异步通信模式

消息队列实现

# 基于RabbitMQ的异步通信实现
import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, Any, Callable, Optional
from dataclasses import dataclass, asdict
from abc import ABC, abstractmethod

import aio_pika
from aio_pika import Message, DeliveryMode
from aio_pika.abc import AbstractRobustConnection, AbstractRobustChannel

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class MessageEnvelope:
    """消息信封"""
    id: str
    type: str
    source: str
    timestamp: str
    correlation_id: Optional[str] = None
    reply_to: Optional[str] = None
    data: Dict[str, Any] = None
    
    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'MessageEnvelope':
        return cls(**data)

class MessageHandler(ABC):
    """消息处理器接口"""
    
    @abstractmethod
    async def handle(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """处理消息,返回响应消息(如果需要)"""
        pass
    
    @abstractmethod
    def get_message_types(self) -> list[str]:
        """获取处理的消息类型"""
        pass

class MessageBus:
    """消息总线"""
    
    def __init__(self, connection_url: str):
        self.connection_url = connection_url
        self.connection: Optional[AbstractRobustConnection] = None
        self.channel: Optional[AbstractRobustChannel] = None
        self.handlers: Dict[str, list[MessageHandler]] = {}
        self.exchanges: Dict[str, Any] = {}
        self.queues: Dict[str, Any] = {}
    
    async def connect(self):
        """连接到消息代理"""
        try:
            self.connection = await aio_pika.connect_robust(self.connection_url)
            self.channel = await self.connection.channel()
            await self.channel.set_qos(prefetch_count=10)
            logger.info("Connected to message broker")
        except Exception as e:
            logger.error(f"Failed to connect to message broker: {e}")
            raise
    
    async def disconnect(self):
        """断开连接"""
        if self.connection:
            await self.connection.close()
            logger.info("Disconnected from message broker")
    
    async def declare_exchange(self, name: str, exchange_type: str = "topic", durable: bool = True):
        """声明交换机"""
        if not self.channel:
            raise RuntimeError("Not connected to message broker")
        
        exchange = await self.channel.declare_exchange(
            name, 
            type=exchange_type, 
            durable=durable
        )
        self.exchanges[name] = exchange
        logger.info(f"Declared exchange: {name}")
        return exchange
    
    async def declare_queue(self, name: str, durable: bool = True, exclusive: bool = False):
        """声明队列"""
        if not self.channel:
            raise RuntimeError("Not connected to message broker")
        
        queue = await self.channel.declare_queue(
            name, 
            durable=durable, 
            exclusive=exclusive
        )
        self.queues[name] = queue
        logger.info(f"Declared queue: {name}")
        return queue
    
    async def bind_queue(self, queue_name: str, exchange_name: str, routing_key: str):
        """绑定队列到交换机"""
        queue = self.queues.get(queue_name)
        exchange = self.exchanges.get(exchange_name)
        
        if not queue or not exchange:
            raise ValueError("Queue or exchange not found")
        
        await queue.bind(exchange, routing_key)
        logger.info(f"Bound queue {queue_name} to exchange {exchange_name} with routing key {routing_key}")
    
    async def publish(self, 
                     exchange_name: str, 
                     routing_key: str, 
                     envelope: MessageEnvelope,
                     delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT):
        """发布消息"""
        if not self.channel:
            raise RuntimeError("Not connected to message broker")
        
        exchange = self.exchanges.get(exchange_name)
        if not exchange:
            raise ValueError(f"Exchange {exchange_name} not found")
        
        message_body = json.dumps(envelope.to_dict()).encode()
        message = Message(
            message_body,
            delivery_mode=delivery_mode,
            message_id=envelope.id,
            correlation_id=envelope.correlation_id,
            reply_to=envelope.reply_to,
            timestamp=datetime.fromisoformat(envelope.timestamp)
        )
        
        await exchange.publish(message, routing_key)
        logger.info(f"Published message {envelope.id} to {exchange_name}/{routing_key}")
    
    async def subscribe(self, queue_name: str, handler: MessageHandler):
        """订阅消息"""
        queue = self.queues.get(queue_name)
        if not queue:
            raise ValueError(f"Queue {queue_name} not found")
        
        # 注册处理器
        for message_type in handler.get_message_types():
            if message_type not in self.handlers:
                self.handlers[message_type] = []
            self.handlers[message_type].append(handler)
        
        # 开始消费消息
        await queue.consume(self._message_callback)
        logger.info(f"Subscribed to queue {queue_name}")
    
    async def _message_callback(self, message: aio_pika.IncomingMessage):
        """消息回调处理"""
        async with message.process():
            try:
                # 解析消息
                message_data = json.loads(message.body.decode())
                envelope = MessageEnvelope.from_dict(message_data)
                
                logger.info(f"Received message {envelope.id} of type {envelope.type}")
                
                # 查找处理器
                handlers = self.handlers.get(envelope.type, [])
                if not handlers:
                    logger.warning(f"No handlers for message type {envelope.type}")
                    return
                
                # 处理消息
                for handler in handlers:
                    try:
                        response = await handler.handle(envelope)
                        
                        # 如果有响应且指定了回复地址
                        if response and envelope.reply_to:
                            await self._send_reply(response, envelope.reply_to)
                            
                    except Exception as e:
                        logger.error(f"Handler {handler.__class__.__name__} failed: {e}")
                        # 可以在这里实现重试逻辑
                        
            except Exception as e:
                logger.error(f"Failed to process message: {e}")
                # 消息处理失败,可以发送到死信队列
    
    async def _send_reply(self, response: MessageEnvelope, reply_to: str):
        """发送回复消息"""
        try:
            # 假设reply_to格式为 "exchange/routing_key"
            parts = reply_to.split('/', 1)
            if len(parts) == 2:
                exchange_name, routing_key = parts
                await self.publish(exchange_name, routing_key, response)
            else:
                logger.error(f"Invalid reply_to format: {reply_to}")
        except Exception as e:
            logger.error(f"Failed to send reply: {e}")

# 具体的消息处理器实现
class UserEventHandler(MessageHandler):
    """用户事件处理器"""
    
    def __init__(self, service_name: str):
        self.service_name = service_name
    
    async def handle(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """处理用户事件"""
        logger.info(f"[{self.service_name}] Processing {envelope.type} event")
        
        if envelope.type == "user.created":
            return await self._handle_user_created(envelope)
        elif envelope.type == "user.updated":
            return await self._handle_user_updated(envelope)
        elif envelope.type == "user.deleted":
            return await self._handle_user_deleted(envelope)
        else:
            logger.warning(f"Unknown event type: {envelope.type}")
            return None
    
    def get_message_types(self) -> list[str]:
        return ["user.created", "user.updated", "user.deleted"]
    
    async def _handle_user_created(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """处理用户创建事件"""
        user_data = envelope.data
        user_id = user_data.get("id")
        username = user_data.get("username")
        email = user_data.get("email")
        
        logger.info(f"[{self.service_name}] User created: {username} ({email})")
        
        # 模拟业务处理
        await asyncio.sleep(0.1)
        
        # 发送欢迎邮件事件
        welcome_event = MessageEnvelope(
            id=f"welcome_{user_id}_{datetime.now().isoformat()}",
            type="email.welcome",
            source=self.service_name,
            timestamp=datetime.now().isoformat(),
            data={
                "user_id": user_id,
                "username": username,
                "email": email,
                "template": "welcome"
            }
        )
        
        return welcome_event
    
    async def _handle_user_updated(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """处理用户更新事件"""
        user_data = envelope.data
        user_id = user_data.get("id")
        
        logger.info(f"[{self.service_name}] User updated: {user_id}")
        
        # 模拟业务处理
        await asyncio.sleep(0.1)
        
        return None
    
    async def _handle_user_deleted(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """处理用户删除事件"""
        user_data = envelope.data
        user_id = user_data.get("id")
        
        logger.info(f"[{self.service_name}] User deleted: {user_id}")
        
        # 模拟清理相关数据
        await asyncio.sleep(0.1)
        
        return None

class EmailHandler(MessageHandler):
    """邮件处理器"""
    
    def __init__(self, service_name: str):
        self.service_name = service_name
    
    async def handle(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """处理邮件事件"""
        logger.info(f"[{self.service_name}] Processing {envelope.type} event")
        
        if envelope.type == "email.welcome":
            return await self._send_welcome_email(envelope)
        elif envelope.type == "email.notification":
            return await self._send_notification_email(envelope)
        else:
            logger.warning(f"Unknown email type: {envelope.type}")
            return None
    
    def get_message_types(self) -> list[str]:
        return ["email.welcome", "email.notification"]
    
    async def _send_welcome_email(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """发送欢迎邮件"""
        email_data = envelope.data
        username = email_data.get("username")
        email = email_data.get("email")
        
        logger.info(f"[{self.service_name}] Sending welcome email to {username} ({email})")
        
        # 模拟发送邮件
        await asyncio.sleep(0.2)
        
        logger.info(f"[{self.service_name}] Welcome email sent successfully")
        return None
    
    async def _send_notification_email(self, envelope: MessageEnvelope) -> Optional[MessageEnvelope]:
        """发送通知邮件"""
        email_data = envelope.data
        subject = email_data.get("subject")
        recipient = email_data.get("recipient")
        
        logger.info(f"[{self.service_name}] Sending notification '{subject}' to {recipient}")
        
        # 模拟发送邮件
        await asyncio.sleep(0.2)
        
        logger.info(f"[{self.service_name}] Notification email sent successfully")
        return None

# 使用示例
async def main():
    # 创建消息总线
    message_bus = MessageBus("amqp://guest:guest@localhost/")
    
    try:
        # 连接到消息代理
        await message_bus.connect()
        
        # 声明交换机和队列
        await message_bus.declare_exchange("user.events", "topic")
        await message_bus.declare_exchange("email.events", "topic")
        
        await message_bus.declare_queue("user.service.queue")
        await message_bus.declare_queue("email.service.queue")
        
        # 绑定队列
        await message_bus.bind_queue("user.service.queue", "user.events", "user.*")
        await message_bus.bind_queue("email.service.queue", "email.events", "email.*")
        
        # 创建处理器
        user_handler = UserEventHandler("user-service")
        email_handler = EmailHandler("email-service")
        
        # 订阅消息
        await message_bus.subscribe("user.service.queue", user_handler)
        await message_bus.subscribe("email.service.queue", email_handler)
        
        # 等待一段时间让订阅生效
        await asyncio.sleep(1)
        
        # 发布用户创建事件
        user_created_event = MessageEnvelope(
            id=f"user_created_{datetime.now().isoformat()}",
            type="user.created",
            source="user-api",
            timestamp=datetime.now().isoformat(),
            data={
                "id": "user123",
                "username": "john_doe",
                "email": "john@example.com",
                "status": "active"
            }
        )
        
        await message_bus.publish("user.events", "user.created", user_created_event)
        
        # 等待消息处理
        await asyncio.sleep(2)
        
        # 发布用户更新事件
        user_updated_event = MessageEnvelope(
            id=f"user_updated_{datetime.now().isoformat()}",
            type="user.updated",
            source="user-api",
            timestamp=datetime.now().isoformat(),
            data={
                "id": "user123",
                "username": "john_smith",
                "email": "john.smith@example.com",
                "status": "active"
            }
        )
        
        await message_bus.publish("user.events", "user.updated", user_updated_event)
        
        # 等待消息处理
        await asyncio.sleep(2)
        
        logger.info("Message processing completed")
        
    except Exception as e:
        logger.error(f"Error in main: {e}")
    finally:
        await message_bus.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

事件流处理

// 基于Apache Kafka的事件流处理
const { Kafka, logLevel } = require('kafkajs');
const { v4: uuidv4 } = require('uuid');

// Kafka配置
const kafka = new Kafka({
  clientId: 'microservice-app',
  brokers: ['localhost:9092'],
  logLevel: logLevel.INFO,
  retry: {
    initialRetryTime: 100,
    retries: 8
  }
});

// 事件基类
class Event {
  constructor(type, source, data, metadata = {}) {
    this.id = uuidv4();
    this.type = type;
    this.source = source;
    this.timestamp = new Date().toISOString();
    this.data = data;
    this.metadata = metadata;
  }
  
  toJSON() {
    return {
      id: this.id,
      type: this.type,
      source: this.source,
      timestamp: this.timestamp,
      data: this.data,
      metadata: this.metadata
    };
  }
  
  static fromJSON(json) {
    const event = new Event(json.type, json.source, json.data, json.metadata);
    event.id = json.id;
    event.timestamp = json.timestamp;
    return event;
  }
}

// 事件发布器
class EventPublisher {
  constructor(kafka) {
    this.producer = kafka.producer({
      maxInFlightRequests: 1,
      idempotent: true,
      transactionTimeout: 30000
    });
    this.connected = false;
  }
  
  async connect() {
    if (!this.connected) {
      await this.producer.connect();
      this.connected = true;
      console.log('Event publisher connected');
    }
  }
  
  async disconnect() {
    if (this.connected) {
      await this.producer.disconnect();
      this.connected = false;
      console.log('Event publisher disconnected');
    }
  }
  
  async publish(topic, event, partition = null) {
    if (!this.connected) {
      throw new Error('Publisher not connected');
    }
    
    const message = {
      key: event.id,
      value: JSON.stringify(event.toJSON()),
      headers: {
        'event-type': event.type,
        'event-source': event.source,
        'event-timestamp': event.timestamp
      }
    };
    
    if (partition !== null) {
      message.partition = partition;
    }
    
    try {
      const result = await this.producer.send({
        topic,
        messages: [message]
      });
      
      console.log(`Published event ${event.id} to topic ${topic}:`, result);
      return result;
    } catch (error) {
      console.error(`Failed to publish event ${event.id}:`, error);
      throw error;
    }
  }
  
  async publishBatch(topic, events) {
    if (!this.connected) {
      throw new Error('Publisher not connected');
    }
    
    const messages = events.map(event => ({
      key: event.id,
      value: JSON.stringify(event.toJSON()),
      headers: {
        'event-type': event.type,
        'event-source': event.source,
        'event-timestamp': event.timestamp
      }
    }));
    
    try {
      const result = await this.producer.send({
        topic,
        messages
      });
      
      console.log(`Published ${events.length} events to topic ${topic}`);
      return result;
    } catch (error) {
      console.error(`Failed to publish batch events:`, error);
      throw error;
    }
  }
}

// 事件处理器接口
class EventHandler {
  constructor(name) {
    this.name = name;
  }
  
  async handle(event) {
    throw new Error('handle method must be implemented');
  }
  
  getEventTypes() {
    throw new Error('getEventTypes method must be implemented');
  }
  
  async onError(error, event) {
    console.error(`Handler ${this.name} error:`, error);
  }
}

// 事件消费者
class EventConsumer {
  constructor(kafka, groupId) {
    this.consumer = kafka.consumer({ 
      groupId,
      sessionTimeout: 30000,
      rebalanceTimeout: 60000,
      heartbeatInterval: 3000,
      maxBytesPerPartition: 1048576,
      minBytes: 1,
      maxBytes: 10485760,
      maxWaitTimeInMs: 5000
    });
    this.handlers = new Map();
    this.connected = false;
    this.running = false;
  }
  
  async connect() {
    if (!this.connected) {
      await this.consumer.connect();
      this.connected = true;
      console.log('Event consumer connected');
    }
  }
  
  async disconnect() {
    if (this.connected) {
      this.running = false;
      await this.consumer.disconnect();
      this.connected = false;
      console.log('Event consumer disconnected');
    }
  }
  
  addHandler(handler) {
    const eventTypes = handler.getEventTypes();
    eventTypes.forEach(type => {
      if (!this.handlers.has(type)) {
        this.handlers.set(type, []);
      }
      this.handlers.get(type).push(handler);
    });
    
    console.log(`Added handler ${handler.name} for events: ${eventTypes.join(', ')}`);
  }
  
  async subscribe(topics) {
    if (!this.connected) {
      throw new Error('Consumer not connected');
    }
    
    await this.consumer.subscribe({ 
      topics: Array.isArray(topics) ? topics : [topics],
      fromBeginning: false
    });
    
    console.log(`Subscribed to topics: ${Array.isArray(topics) ? topics.join(', ') : topics}`);
  }
  
  async start() {
    if (!this.connected) {
      throw new Error('Consumer not connected');
    }
    
    this.running = true;
    
    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        if (!this.running) return;
        
        try {
          const eventData = JSON.parse(message.value.toString());
          const event = Event.fromJSON(eventData);
          
          console.log(`Received event ${event.id} of type ${event.type} from topic ${topic}`);
          
          // 查找处理器
          const handlers = this.handlers.get(event.type) || [];
          
          if (handlers.length === 0) {
            console.warn(`No handlers for event type ${event.type}`);
            return;
          }
          
          // 并发处理事件
          const promises = handlers.map(async handler => {
            try {
              await handler.handle(event);
            } catch (error) {
              await handler.onError(error, event);
            }
          });
          
          await Promise.all(promises);
          
        } catch (error) {
          console.error('Failed to process message:', error);
        }
      }
    });
    
    console.log('Event consumer started');
  }
  
  async stop() {
    this.running = false;
    await this.consumer.stop();
    console.log('Event consumer stopped');
  }
}

// 具体的事件处理器实现
class UserEventHandler extends EventHandler {
  constructor() {
    super('UserEventHandler');
  }
  
  getEventTypes() {
    return ['user.created', 'user.updated', 'user.deleted'];
  }
  
  async handle(event) {
    console.log(`[${this.name}] Processing ${event.type} event`);
    
    switch (event.type) {
      case 'user.created':
        await this.handleUserCreated(event);
        break;
      case 'user.updated':
        await this.handleUserUpdated(event);
        break;
      case 'user.deleted':
        await this.handleUserDeleted(event);
        break;
      default:
        console.warn(`Unknown event type: ${event.type}`);
    }
  }
  
  async handleUserCreated(event) {
    const { id, username, email } = event.data;
    console.log(`[${this.name}] User created: ${username} (${email})`);
    
    // 模拟业务处理
    await new Promise(resolve => setTimeout(resolve, 100));
    
    // 发布欢迎邮件事件
    const publisher = new EventPublisher(kafka);
    await publisher.connect();
    
    const welcomeEvent = new Event(
      'email.welcome',
      'user-service',
      {
        userId: id,
        username,
        email,
        template: 'welcome'
      }
    );
    
    await publisher.publish('email-events', welcomeEvent);
    await publisher.disconnect();
  }
  
  async handleUserUpdated(event) {
    const { id } = event.data;
    console.log(`[${this.name}] User updated: ${id}`);
    
    // 模拟业务处理
    await new Promise(resolve => setTimeout(resolve, 100));
  }
  
  async handleUserDeleted(event) {
    const { id } = event.data;
    console.log(`[${this.name}] User deleted: ${id}`);
    
    // 模拟清理相关数据
    await new Promise(resolve => setTimeout(resolve, 100));
  }
}

class EmailEventHandler extends EventHandler {
  constructor() {
    super('EmailEventHandler');
  }
  
  getEventTypes() {
    return ['email.welcome', 'email.notification'];
  }
  
  async handle(event) {
    console.log(`[${this.name}] Processing ${event.type} event`);
    
    switch (event.type) {
      case 'email.welcome':
        await this.sendWelcomeEmail(event);
        break;
      case 'email.notification':
        await this.sendNotificationEmail(event);
        break;
      default:
        console.warn(`Unknown event type: ${event.type}`);
    }
  }
  
  async sendWelcomeEmail(event) {
    const { username, email } = event.data;
    console.log(`[${this.name}] Sending welcome email to ${username} (${email})`);
    
    // 模拟发送邮件
    await new Promise(resolve => setTimeout(resolve, 200));
    
    console.log(`[${this.name}] Welcome email sent successfully`);
  }
  
  async sendNotificationEmail(event) {
    const { subject, recipient, content } = event.data;
    console.log(`[${this.name}] Sending notification '${subject}' to ${recipient}`);
    
    // 模拟发送邮件
    await new Promise(resolve => setTimeout(resolve, 200));
    
    console.log(`[${this.name}] Notification email sent successfully`);
  }
}

// 事件流处理器
class EventStreamProcessor {
  constructor(kafka) {
    this.kafka = kafka;
    this.publisher = new EventPublisher(kafka);
    this.consumers = new Map();
  }
  
  async start() {
    await this.publisher.connect();
    
    // 启动用户事件消费者
    const userConsumer = new EventConsumer(this.kafka, 'user-service-group');
    await userConsumer.connect();
    await userConsumer.subscribe(['user-events']);
    userConsumer.addHandler(new UserEventHandler());
    await userConsumer.start();
    this.consumers.set('user', userConsumer);
    
    // 启动邮件事件消费者
    const emailConsumer = new EventConsumer(this.kafka, 'email-service-group');
    await emailConsumer.connect();
    await emailConsumer.subscribe(['email-events']);
    emailConsumer.addHandler(new EmailEventHandler());
    await emailConsumer.start();
    this.consumers.set('email', emailConsumer);
    
    console.log('Event stream processor started');
  }
  
  async stop() {
    for (const [name, consumer] of this.consumers) {
      await consumer.stop();
      await consumer.disconnect();
    }
    
    await this.publisher.disconnect();
    console.log('Event stream processor stopped');
  }
  
  async publishUserEvent(type, userData) {
    const event = new Event(type, 'user-api', userData);
    await this.publisher.publish('user-events', event);
    return event;
  }
}

// 使用示例
async function main() {
  const processor = new EventStreamProcessor(kafka);
  
  try {
    await processor.start();
    
    // 等待消费者启动
    await new Promise(resolve => setTimeout(resolve, 2000));
    
    // 发布用户创建事件
    await processor.publishUserEvent('user.created', {
      id: 'user123',
      username: 'john_doe',
      email: 'john@example.com',
      status: 'active'
    });
    
    // 等待事件处理
    await new Promise(resolve => setTimeout(resolve, 3000));
    
    // 发布用户更新事件
    await processor.publishUserEvent('user.updated', {
      id: 'user123',
      username: 'john_smith',
      email: 'john.smith@example.com',
      status: 'active'
    });
    
    // 等待事件处理
    await new Promise(resolve => setTimeout(resolve, 3000));
    
    console.log('Event processing completed');
    
  } catch (error) {
    console.error('Error in main:', error);
  } finally {
    await processor.stop();
  }
}

if (require.main === module) {
  main().catch(console.error);
}

module.exports = {
  Event,
  EventPublisher,
  EventHandler,
  EventConsumer,
  EventStreamProcessor,
  UserEventHandler,
  EmailEventHandler
};

3.3 服务发现与负载均衡

服务注册与发现

# Consul服务发现配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: consul-config
  namespace: microservices
data:
  consul.json: |
    {
      "datacenter": "dc1",
      "data_dir": "/consul/data",
      "log_level": "INFO",
      "server": true,
      "bootstrap_expect": 3,
      "bind_addr": "0.0.0.0",
      "client_addr": "0.0.0.0",
      "retry_join": [
        "consul-0.consul.microservices.svc.cluster.local",
        "consul-1.consul.microservices.svc.cluster.local",
        "consul-2.consul.microservices.svc.cluster.local"
      ],
      "ui_config": {
        "enabled": true
      },
      "connect": {
        "enabled": true
      },
      "ports": {
        "grpc": 8502
      },
      "acl": {
        "enabled": true,
        "default_policy": "allow",
        "enable_token_persistence": true
      }
    }
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: consul
  namespace: microservices
spec:
  serviceName: consul
  replicas: 3
  selector:
    matchLabels:
      app: consul
  template:
    metadata:
      labels:
        app: consul
    spec:
      containers:
      - name: consul
        image: consul:1.15.2
        ports:
        - containerPort: 8500
          name: ui-port
        - containerPort: 8400
          name: alt-port
        - containerPort: 53
          name: udp-port
        - containerPort: 8443
          name: https-port
        - containerPort: 8080
          name: http-port
        - containerPort: 8301
          name: serflan
        - containerPort: 8302
          name: serfwan
        - containerPort: 8600
          name: consuldns
        - containerPort: 8300
          name: server
        command:
        - "/bin/sh"
        - "-ec"
        - |
          CONSUL_DATA_DIR=/consul/data
          CONSUL_CONFIG_DIR=/consul/config
          
          mkdir -p ${CONSUL_DATA_DIR}
          mkdir -p ${CONSUL_CONFIG_DIR}
          
          cp /etc/consul/consul.json ${CONSUL_CONFIG_DIR}/
          
          exec /usr/local/bin/docker-entrypoint.sh consul agent -config-dir=${CONSUL_CONFIG_DIR}
        volumeMounts:
        - name: consul-config
          mountPath: /etc/consul
        - name: consul-data
          mountPath: /consul/data
        env:
        - name: CONSUL_BIND_INTERFACE
          value: eth0
        - name: CONSUL_CLIENT_INTERFACE
          value: eth0
      volumes:
      - name: consul-config
        configMap:
          name: consul-config
  volumeClaimTemplates:
  - metadata:
      name: consul-data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi
---
apiVersion: v1
kind: Service
metadata:
  name: consul
  namespace: microservices
  labels:
    app: consul
spec:
  clusterIP: None
  ports:
  - port: 8500
    targetPort: 8500
    name: ui
  - port: 8400
    targetPort: 8400
    name: alt-port
  - port: 53
    targetPort: 53
    name: udp-port
  - port: 8443
    targetPort: 8443
    name: https-port
  - port: 8080
    targetPort: 8080
    name: http-port
  - port: 8301
    targetPort: 8301
    name: serflan-tcp
    protocol: "TCP"
  - port: 8301
    targetPort: 8301
    name: serflan-udp
    protocol: "UDP"
  - port: 8302
    targetPort: 8302
    name: serfwan-tcp
    protocol: "TCP"
  - port: 8302
    targetPort: 8302
    name: serfwan-udp
    protocol: "UDP"
  - port: 8300
    targetPort: 8300
    name: server
  - port: 8600
    targetPort: 8600
    name: consuldns-tcp
    protocol: "TCP"
  - port: 8600
    targetPort: 8600
    name: consuldns-udp
    protocol: "UDP"
  selector:
    app: consul
---
apiVersion: v1
kind: Service
metadata:
  name: consul-ui
  namespace: microservices
  labels:
    app: consul
spec:
  type: LoadBalancer
  ports:
  - port: 80
    targetPort: 8500
    name: ui
  selector:
    app: consul
// Go语言服务发现客户端实现
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "sync"
    "time"
    
    "github.com/hashicorp/consul/api"
)

// ServiceInstance 服务实例
type ServiceInstance struct {
    ID      string            `json:"id"`
    Name    string            `json:"name"`
    Address string            `json:"address"`
    Port    int               `json:"port"`
    Tags    []string          `json:"tags"`
    Meta    map[string]string `json:"meta"`
    Health  string            `json:"health"`
}

// ServiceRegistry 服务注册接口
type ServiceRegistry interface {
    Register(ctx context.Context, instance *ServiceInstance) error
    Deregister(ctx context.Context, instanceID string) error
    Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error)
    Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error)
    HealthCheck(ctx context.Context, instanceID string) error
}

// ConsulRegistry Consul服务注册实现
type ConsulRegistry struct {
    client   *api.Client
    config   *api.Config
    watchers map[string][]chan []*ServiceInstance
    mutex    sync.RWMutex
}

func NewConsulRegistry(address string) (*ConsulRegistry, error) {
    config := api.DefaultConfig()
    config.Address = address
    
    client, err := api.NewClient(config)
    if err != nil {
        return nil, fmt.Errorf("failed to create consul client: %w", err)
    }
    
    return &ConsulRegistry{
        client:   client,
        config:   config,
        watchers: make(map[string][]chan []*ServiceInstance),
    }, nil
}

func (r *ConsulRegistry) Register(ctx context.Context, instance *ServiceInstance) error {
    registration := &api.AgentServiceRegistration{
        ID:      instance.ID,
        Name:    instance.Name,
        Address: instance.Address,
        Port:    instance.Port,
        Tags:    instance.Tags,
        Meta:    instance.Meta,
        Check: &api.AgentServiceCheck{
            HTTP:                           fmt.Sprintf("http://%s:%d/health", instance.Address, instance.Port),
            Interval:                       "10s",
            Timeout:                        "3s",
            DeregisterCriticalServiceAfter: "30s",
        },
    }
    
    err := r.client.Agent().ServiceRegister(registration)
    if err != nil {
        return fmt.Errorf("failed to register service: %w", err)
    }
    
    log.Printf("Service registered: %s (%s:%d)", instance.Name, instance.Address, instance.Port)
    return nil
}

func (r *ConsulRegistry) Deregister(ctx context.Context, instanceID string) error {
    err := r.client.Agent().ServiceDeregister(instanceID)
    if err != nil {
        return fmt.Errorf("failed to deregister service: %w", err)
    }
    
    log.Printf("Service deregistered: %s", instanceID)
    return nil
}

func (r *ConsulRegistry) Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error) {
    services, _, err := r.client.Health().Service(serviceName, "", true, nil)
    if err != nil {
        return nil, fmt.Errorf("failed to discover services: %w", err)
    }
    
    var instances []*ServiceInstance
    for _, service := range services {
        instance := &ServiceInstance{
            ID:      service.Service.ID,
            Name:    service.Service.Service,
            Address: service.Service.Address,
            Port:    service.Service.Port,
            Tags:    service.Service.Tags,
            Meta:    service.Service.Meta,
            Health:  "passing",
        }
        
        // 检查健康状态
        for _, check := range service.Checks {
            if check.Status != "passing" {
                instance.Health = check.Status
                break
            }
        }
        
        instances = append(instances, instance)
    }
    
    return instances, nil
}

func (r *ConsulRegistry) Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error) {
    ch := make(chan []*ServiceInstance, 10)
    
    r.mutex.Lock()
    if r.watchers[serviceName] == nil {
        r.watchers[serviceName] = make([]chan []*ServiceInstance, 0)
    }
    r.watchers[serviceName] = append(r.watchers[serviceName], ch)
    r.mutex.Unlock()
    
    go r.watchService(ctx, serviceName)
    
    return ch, nil
}

func (r *ConsulRegistry) watchService(ctx context.Context, serviceName string) {
    queryOptions := &api.QueryOptions{
        WaitTime: 30 * time.Second,
    }
    
    for {
        select {
        case <-ctx.Done():
            return
        default:
            services, meta, err := r.client.Health().Service(serviceName, "", true, queryOptions)
            if err != nil {
                log.Printf("Watch error for service %s: %v", serviceName, err)
                time.Sleep(5 * time.Second)
                continue
            }
            
            queryOptions.WaitIndex = meta.LastIndex
            
            var instances []*ServiceInstance
            for _, service := range services {
                instance := &ServiceInstance{
                    ID:      service.Service.ID,
                    Name:    service.Service.Service,
                    Address: service.Service.Address,
                    Port:    service.Service.Port,
                    Tags:    service.Service.Tags,
                    Meta:    service.Service.Meta,
                    Health:  "passing",
                }
                
                for _, check := range service.Checks {
                    if check.Status != "passing" {
                        instance.Health = check.Status
                        break
                    }
                }
                
                instances = append(instances, instance)
            }
            
            // 通知所有观察者
            r.mutex.RLock()
            watchers := r.watchers[serviceName]
            r.mutex.RUnlock()
            
            for _, watcher := range watchers {
                select {
                case watcher <- instances:
                default:
                    // 通道满了,跳过
                }
            }
        }
    }
}

func (r *ConsulRegistry) HealthCheck(ctx context.Context, instanceID string) error {
    // 实现健康检查逻辑
    return nil
}

// LoadBalancer 负载均衡器接口
type LoadBalancer interface {
    Select(instances []*ServiceInstance) (*ServiceInstance, error)
    UpdateInstances(instances []*ServiceInstance)
}

// RoundRobinBalancer 轮询负载均衡器
type RoundRobinBalancer struct {
    instances []*ServiceInstance
    current   int
    mutex     sync.RWMutex
}

func NewRoundRobinBalancer() *RoundRobinBalancer {
    return &RoundRobinBalancer{
        instances: make([]*ServiceInstance, 0),
        current:   0,
    }
}

func (lb *RoundRobinBalancer) Select(instances []*ServiceInstance) (*ServiceInstance, error) {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()
    
    if len(instances) == 0 {
        return nil, fmt.Errorf("no available instances")
    }
    
    // 过滤健康的实例
    healthyInstances := make([]*ServiceInstance, 0)
    for _, instance := range instances {
        if instance.Health == "passing" {
            healthyInstances = append(healthyInstances, instance)
        }
    }
    
    if len(healthyInstances) == 0 {
        return nil, fmt.Errorf("no healthy instances")
    }
    
    instance := healthyInstances[lb.current%len(healthyInstances)]
    lb.current++
    
    return instance, nil
}

func (lb *RoundRobinBalancer) UpdateInstances(instances []*ServiceInstance) {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()
    
    lb.instances = instances
    if lb.current >= len(instances) {
        lb.current = 0
    }
}

// RandomBalancer 随机负载均衡器
type RandomBalancer struct {
    rand *rand.Rand
}

func NewRandomBalancer() *RandomBalancer {
    return &RandomBalancer{
        rand: rand.New(rand.NewSource(time.Now().UnixNano())),
    }
}

func (lb *RandomBalancer) Select(instances []*ServiceInstance) (*ServiceInstance, error) {
    if len(instances) == 0 {
        return nil, fmt.Errorf("no available instances")
    }
    
    // 过滤健康的实例
    healthyInstances := make([]*ServiceInstance, 0)
    for _, instance := range instances {
        if instance.Health == "passing" {
            healthyInstances = append(healthyInstances, instance)
        }
    }
    
    if len(healthyInstances) == 0 {
        return nil, fmt.Errorf("no healthy instances")
    }
    
    index := lb.rand.Intn(len(healthyInstances))
    return healthyInstances[index], nil
}

func (lb *RandomBalancer) UpdateInstances(instances []*ServiceInstance) {
    // 随机负载均衡器不需要维护状态
}

// WeightedRoundRobinBalancer 加权轮询负载均衡器
type WeightedRoundRobinBalancer struct {
    instances []*ServiceInstance
    weights   []int
    current   []int
    mutex     sync.RWMutex
}

func NewWeightedRoundRobinBalancer() *WeightedRoundRobinBalancer {
    return &WeightedRoundRobinBalancer{
        instances: make([]*ServiceInstance, 0),
        weights:   make([]int, 0),
        current:   make([]int, 0),
    }
}

func (lb *WeightedRoundRobinBalancer) Select(instances []*ServiceInstance) (*ServiceInstance, error) {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()
    
    if len(instances) == 0 {
        return nil, fmt.Errorf("no available instances")
    }
    
    // 过滤健康的实例并获取权重
    healthyInstances := make([]*ServiceInstance, 0)
    weights := make([]int, 0)
    
    for _, instance := range instances {
        if instance.Health == "passing" {
            healthyInstances = append(healthyInstances, instance)
            
            // 从meta中获取权重,默认为1
            weight := 1
            if weightStr, exists := instance.Meta["weight"]; exists {
                if w, err := fmt.Sscanf(weightStr, "%d", &weight); err != nil || w != 1 {
                    weight = 1
                }
            }
            weights = append(weights, weight)
        }
    }
    
    if len(healthyInstances) == 0 {
        return nil, fmt.Errorf("no healthy instances")
    }
    
    // 初始化当前权重
    if len(lb.current) != len(healthyInstances) {
        lb.current = make([]int, len(healthyInstances))
        copy(lb.current, weights)
    }
    
    // 找到当前权重最大的实例
    maxIndex := 0
    for i := 1; i < len(lb.current); i++ {
        if lb.current[i] > lb.current[maxIndex] {
            maxIndex = i
        }
    }
    
    // 减少选中实例的当前权重
    totalWeight := 0
    for _, w := range weights {
        totalWeight += w
    }
    lb.current[maxIndex] -= totalWeight
    
    // 增加所有实例的当前权重
    for i := range lb.current {
        lb.current[i] += weights[i]
    }
    
    return healthyInstances[maxIndex], nil
}

func (lb *WeightedRoundRobinBalancer) UpdateInstances(instances []*ServiceInstance) {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()
    
    lb.instances = instances
    lb.weights = make([]int, len(instances))
    lb.current = make([]int, len(instances))
    
    for i, instance := range instances {
        weight := 1
        if weightStr, exists := instance.Meta["weight"]; exists {
            if w, err := fmt.Sscanf(weightStr, "%d", &weight); err != nil || w != 1 {
                weight = 1
            }
        }
        lb.weights[i] = weight
        lb.current[i] = weight
    }
}

// ServiceClient 服务客户端
type ServiceClient struct {
    registry     ServiceRegistry
    loadBalancer LoadBalancer
    httpClient   *http.Client
    serviceName  string
    instances    []*ServiceInstance
    mutex        sync.RWMutex
}

func NewServiceClient(registry ServiceRegistry, loadBalancer LoadBalancer, serviceName string) *ServiceClient {
    return &ServiceClient{
        registry:     registry,
        loadBalancer: loadBalancer,
        httpClient: &http.Client{
            Timeout: 30 * time.Second,
        },
        serviceName: serviceName,
        instances:   make([]*ServiceInstance, 0),
    }
}

func (c *ServiceClient) Start(ctx context.Context) error {
    // 初始发现服务
    instances, err := c.registry.Discover(ctx, c.serviceName)
    if err != nil {
        return fmt.Errorf("failed to discover service: %w", err)
    }
    
    c.updateInstances(instances)
    
    // 监听服务变化
    watcher, err := c.registry.Watch(ctx, c.serviceName)
    if err != nil {
        return fmt.Errorf("failed to watch service: %w", err)
    }
    
    go func() {
        for {
            select {
            case instances := <-watcher:
                c.updateInstances(instances)
            case <-ctx.Done():
                return
            }
        }
    }()
    
    return nil
}

func (c *ServiceClient) updateInstances(instances []*ServiceInstance) {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    
    c.instances = instances
    c.loadBalancer.UpdateInstances(instances)
    
    log.Printf("Updated service instances for %s: %d instances", c.serviceName, len(instances))
}

func (c *ServiceClient) Call(ctx context.Context, method, path string, body interface{}) (*http.Response, error) {
    c.mutex.RLock()
    instances := c.instances
    c.mutex.RUnlock()
    
    instance, err := c.loadBalancer.Select(instances)
    if err != nil {
        return nil, fmt.Errorf("failed to select instance: %w", err)
    }
    
    url := fmt.Sprintf("http://%s:%d%s", instance.Address, instance.Port, path)
    
    var reqBody []byte
    if body != nil {
        reqBody, err = json.Marshal(body)
        if err != nil {
            return nil, fmt.Errorf("failed to marshal request body: %w", err)
        }
    }
    
    req, err := http.NewRequestWithContext(ctx, method, url, nil)
    if err != nil {
        return nil, fmt.Errorf("failed to create request: %w", err)
    }
    
    if reqBody != nil {
        req.Header.Set("Content-Type", "application/json")
    }
    
    resp, err := c.httpClient.Do(req)
    if err != nil {
        return nil, fmt.Errorf("request failed: %w", err)
    }
    
    return resp, nil
}

// 使用示例
func main() {
    ctx := context.Background()
    
    // 创建Consul注册中心
    registry, err := NewConsulRegistry("localhost:8500")
    if err != nil {
        log.Fatalf("Failed to create registry: %v", err)
    }
    
    // 注册服务实例
    instance := &ServiceInstance{
        ID:      "user-service-1",
        Name:    "user-service",
        Address: "localhost",
        Port:    8080,
        Tags:    []string{"api", "v1"},
        Meta: map[string]string{
            "version": "1.0.0",
            "weight":  "10",
        },
    }
    
    err = registry.Register(ctx, instance)
    if err != nil {
        log.Fatalf("Failed to register service: %v", err)
    }
    
    // 创建服务客户端
    loadBalancer := NewWeightedRoundRobinBalancer()
    client := NewServiceClient(registry, loadBalancer, "user-service")
    
    err = client.Start(ctx)
    if err != nil {
        log.Fatalf("Failed to start client: %v", err)
    }
    
    // 调用服务
    resp, err := client.Call(ctx, "GET", "/users/123", nil)
    if err != nil {
        log.Printf("Service call failed: %v", err)
    } else {
        log.Printf("Service call successful: %d", resp.StatusCode)
        resp.Body.Close()
    }
    
    // 清理
    err = registry.Deregister(ctx, instance.ID)
    if err != nil {
        log.Printf("Failed to deregister service: %v", err)
    }
}