概述

本章将深入介绍 gRPC 客户端的开发,包括连接管理、调用模式、错误处理、重试机制、负载均衡等核心内容。我们将学习如何构建高效、可靠的 gRPC 客户端应用。

学习目标

  • 掌握 gRPC 客户端的基本架构和连接管理
  • 学习四种不同类型的 RPC 调用模式
  • 了解客户端拦截器和中间件的使用
  • 掌握错误处理和重试机制
  • 学习连接池和负载均衡配置

gRPC 客户端架构

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Callable, Iterator
from abc import ABC, abstractmethod
import asyncio
import time
import random

class ConnectionState(Enum):
    """Connection state enumeration.

    Each member's value is the lowercase form of its name.
    """
    # NOTE(review): the member names look like gRPC channel connectivity
    # states -- confirm before relying on a one-to-one mapping.
    IDLE = "idle"
    CONNECTING = "connecting"
    READY = "ready"
    TRANSIENT_FAILURE = "transient_failure"
    SHUTDOWN = "shutdown"

class LoadBalancingPolicy(Enum):
    """Load-balancing policy enumeration.

    Each member's value is the lowercase form of its name.
    """
    # NOTE(review): presumably these values are meant to be passed to gRPC as
    # policy names; nothing in this file does so -- confirm against the caller.
    ROUND_ROBIN = "round_robin"
    PICK_FIRST = "pick_first"
    GRPCLB = "grpclb"
    WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
    CONSISTENT_HASH = "consistent_hash"

class RetryPolicy(Enum):
    """Retry (backoff) policy enumeration.

    Each member's value is the lowercase form of its name.
    """
    EXPONENTIAL_BACKOFF = "exponential_backoff"
    LINEAR_BACKOFF = "linear_backoff"
    FIXED_DELAY = "fixed_delay"
    IMMEDIATE = "immediate"

@dataclass
class ClientConfig:
    """Client configuration with a default for every field.

    Fields whose names end in ``_ms`` are named as millisecond values.
    GRPCClientManager stores an instance of this class; the generator
    methods visible in this file do not read any of its fields.
    """
    # Server address in "host:port" form.
    target: str = "localhost:50051"
    insecure: bool = True
    # Message size limits.
    max_receive_message_length: int = 4 * 1024 * 1024  # 4MB
    max_send_message_length: int = 4 * 1024 * 1024     # 4MB
    # Keepalive settings.
    keepalive_time_ms: int = 30000
    keepalive_timeout_ms: int = 5000
    keepalive_permit_without_calls: bool = True
    # Connection lifetime limits.
    max_connection_idle_ms: int = 300000
    max_connection_age_ms: int = 600000
    # Window sizes (65536 = 64 * 1024).
    initial_window_size: int = 65536
    initial_conn_window_size: int = 65536
    load_balancing_policy: LoadBalancingPolicy = LoadBalancingPolicy.ROUND_ROBIN
    # Retry settings.
    enable_retry: bool = True
    max_retry_attempts: int = 3
    retry_policy: RetryPolicy = RetryPolicy.EXPONENTIAL_BACKOFF
    base_delay_ms: int = 100
    max_delay_ms: int = 30000
    backoff_multiplier: float = 2.0
    # Default call timeout.
    timeout_ms: int = 30000

@dataclass
class CallOptions:
    """调用选项"""
    timeout_ms: Optional[int] = None
    metadata: Dict[str, str] = None
    retry_policy: Optional[RetryPolicy] = None
    max_retry_attempts: Optional[int] = None
    compression: Optional[str] = None
    wait_for_ready: bool = False

class GRPCClientManager:
    """gRPC 客户端管理器"""
    
    def __init__(self, config: ClientConfig):
        self.config = config
        self.connections = {}
        self.connection_pool = []
        
    def create_basic_client_implementation(self) -> str:
        """创建基础客户端实现"""
        return """
// client.go - gRPC 客户端基础实现
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "time"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/keepalive"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
    
    pb "./proto/user"
)

// UserClient gRPC 用户客户端
type UserClient struct {
    conn   *grpc.ClientConn
    client pb.UserServiceClient
    config *ClientConfig
}

// ClientConfig 客户端配置
type ClientConfig struct {
    Target                    string
    Timeout                   time.Duration
    MaxRetryAttempts         int
    BaseDelay                time.Duration
    MaxDelay                 time.Duration
    BackoffMultiplier        float64
    KeepaliveTime            time.Duration
    KeepaliveTimeout         time.Duration
    MaxReceiveMessageLength  int
    MaxSendMessageLength     int
}

// NewUserClient 创建用户客户端
func NewUserClient(config *ClientConfig) (*UserClient, error) {
    // 配置连接选项
    opts := []grpc.DialOption{
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithKeepaliveParams(keepalive.ClientParameters{
            Time:                config.KeepaliveTime,
            Timeout:             config.KeepaliveTimeout,
            PermitWithoutStream: true,
        }),
        grpc.WithDefaultCallOptions(
            grpc.MaxCallRecvMsgSize(config.MaxReceiveMessageLength),
            grpc.MaxCallSendMsgSize(config.MaxSendMessageLength),
        ),
        // 添加拦截器
        grpc.WithUnaryInterceptor(clientUnaryInterceptor()),
        grpc.WithStreamInterceptor(clientStreamInterceptor()),
    }
    
    // 建立连接
    conn, err := grpc.Dial(config.Target, opts...)
    if err != nil {
        return nil, fmt.Errorf("failed to connect: %w", err)
    }
    
    // 等待连接就绪
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    if !conn.WaitForStateChange(ctx, connectivity.Connecting) {
        conn.Close()
        return nil, fmt.Errorf("connection timeout")
    }
    
    return &UserClient{
        conn:   conn,
        client: pb.NewUserServiceClient(conn),
        config: config,
    }, nil
}

// Close 关闭客户端连接
func (c *UserClient) Close() error {
    return c.conn.Close()
}

// CreateUser 创建用户(一元调用)
func (c *UserClient) CreateUser(ctx context.Context, req *pb.CreateUserRequest, opts ...CallOption) (*pb.CreateUserResponse, error) {
    // 应用调用选项
    callOpts := &CallOptions{}
    for _, opt := range opts {
        opt(callOpts)
    }
    
    // 设置超时
    if callOpts.Timeout > 0 {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, callOpts.Timeout)
        defer cancel()
    }
    
    // 添加元数据
    if len(callOpts.Metadata) > 0 {
        md := metadata.New(callOpts.Metadata)
        ctx = metadata.NewOutgoingContext(ctx, md)
    }
    
    // 执行调用
    resp, err := c.client.CreateUser(ctx, req)
    if err != nil {
        return nil, handleGRPCError(err)
    }
    
    return resp, nil
}

// GetUser 获取用户信息(一元调用)
func (c *UserClient) GetUser(ctx context.Context, userID string, opts ...CallOption) (*pb.User, error) {
    req := &pb.GetUserRequest{UserId: userID}
    
    resp, err := c.client.GetUser(ctx, req)
    if err != nil {
        return nil, handleGRPCError(err)
    }
    
    return resp.User, nil
}

// ListUsers 列出用户(服务端流)
func (c *UserClient) ListUsers(ctx context.Context, req *pb.ListUsersRequest, opts ...CallOption) (<-chan *pb.User, <-chan error) {
    userChan := make(chan *pb.User, 100)
    errChan := make(chan error, 1)
    
    go func() {
        defer close(userChan)
        defer close(errChan)
        
        stream, err := c.client.ListUsers(ctx, req)
        if err != nil {
            errChan <- handleGRPCError(err)
            return
        }
        
        for {
            user, err := stream.Recv()
            if err == io.EOF {
                return
            }
            if err != nil {
                errChan <- handleGRPCError(err)
                return
            }
            
            select {
            case userChan <- user:
            case <-ctx.Done():
                errChan <- ctx.Err()
                return
            }
        }
    }()
    
    return userChan, errChan
}

// BatchCreateUsers 批量创建用户(客户端流)
func (c *UserClient) BatchCreateUsers(ctx context.Context, users []*pb.CreateUserRequest, opts ...CallOption) (*pb.BatchCreateUsersResponse, error) {
    stream, err := c.client.BatchCreateUsers(ctx)
    if err != nil {
        return nil, handleGRPCError(err)
    }
    
    // 发送用户数据
    for _, user := range users {
        if err := stream.Send(user); err != nil {
            return nil, handleGRPCError(err)
        }
    }
    
    // 关闭发送并接收响应
    resp, err := stream.CloseAndRecv()
    if err != nil {
        return nil, handleGRPCError(err)
    }
    
    return resp, nil
}

// SyncUsers 同步用户数据(双向流)
func (c *UserClient) SyncUsers(ctx context.Context) (*UserSyncClient, error) {
    stream, err := c.client.SyncUsers(ctx)
    if err != nil {
        return nil, handleGRPCError(err)
    }
    
    return &UserSyncClient{
        stream: stream,
        ctx:    ctx,
    }, nil
}

// UserSyncClient 用户同步客户端
type UserSyncClient struct {
    stream pb.UserService_SyncUsersClient
    ctx    context.Context
}

// Send 发送同步请求
func (c *UserSyncClient) Send(req *pb.SyncUserRequest) error {
    return c.stream.Send(req)
}

// Recv 接收同步响应
func (c *UserSyncClient) Recv() (*pb.SyncUserResponse, error) {
    return c.stream.Recv()
}

// Close 关闭同步流
func (c *UserSyncClient) Close() error {
    return c.stream.CloseSend()
}

// CallOption 调用选项函数类型
type CallOption func(*CallOptions)

// CallOptions 调用选项
type CallOptions struct {
    Timeout  time.Duration
    Metadata map[string]string
    Retry    bool
}

// WithTimeout 设置超时时间
func WithTimeout(timeout time.Duration) CallOption {
    return func(opts *CallOptions) {
        opts.Timeout = timeout
    }
}

// WithMetadata 设置元数据
func WithMetadata(md map[string]string) CallOption {
    return func(opts *CallOptions) {
        opts.Metadata = md
    }
}

// WithRetry 启用重试
func WithRetry() CallOption {
    return func(opts *CallOptions) {
        opts.Retry = true
    }
}

// 错误处理
func handleGRPCError(err error) error {
    st, ok := status.FromError(err)
    if !ok {
        return err
    }
    
    switch st.Code() {
    case codes.NotFound:
        return fmt.Errorf("resource not found: %s", st.Message())
    case codes.AlreadyExists:
        return fmt.Errorf("resource already exists: %s", st.Message())
    case codes.InvalidArgument:
        return fmt.Errorf("invalid argument: %s", st.Message())
    case codes.PermissionDenied:
        return fmt.Errorf("permission denied: %s", st.Message())
    case codes.Unauthenticated:
        return fmt.Errorf("authentication required: %s", st.Message())
    case codes.Unavailable:
        return fmt.Errorf("service unavailable: %s", st.Message())
    case codes.DeadlineExceeded:
        return fmt.Errorf("request timeout: %s", st.Message())
    default:
        return fmt.Errorf("grpc error [%s]: %s", st.Code(), st.Message())
    }
}

// 客户端拦截器
func clientUnaryInterceptor() grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        start := time.Now()
        
        log.Printf("[CLIENT] Calling method: %s", method)
        
        err := invoker(ctx, method, req, reply, cc, opts...)
        
        duration := time.Since(start)
        if err != nil {
            log.Printf("[CLIENT] Method: %s, Error: %v, Duration: %s", method, err, duration)
        } else {
            log.Printf("[CLIENT] Method: %s, Success, Duration: %s", method, duration)
        }
        
        return err
    }
}

func clientStreamInterceptor() grpc.StreamClientInterceptor {
    return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
        log.Printf("[CLIENT] Starting stream: %s", method)
        
        stream, err := streamer(ctx, desc, cc, method, opts...)
        if err != nil {
            log.Printf("[CLIENT] Stream creation failed: %s, Error: %v", method, err)
            return nil, err
        }
        
        return &wrappedClientStream{
            ClientStream: stream,
            method:       method,
        }, nil
    }
}

type wrappedClientStream struct {
    grpc.ClientStream
    method string
}

func (w *wrappedClientStream) SendMsg(m interface{}) error {
    log.Printf("[CLIENT] Sending message on stream: %s", w.method)
    return w.ClientStream.SendMsg(m)
}

func (w *wrappedClientStream) RecvMsg(m interface{}) error {
    err := w.ClientStream.RecvMsg(m)
    if err != nil && err != io.EOF {
        log.Printf("[CLIENT] Receive error on stream: %s, Error: %v", w.method, err)
    }
    return err
}

// 使用示例
func main() {
    config := &ClientConfig{
        Target:                   "localhost:50051",
        Timeout:                  30 * time.Second,
        MaxRetryAttempts:         3,
        BaseDelay:                100 * time.Millisecond,
        MaxDelay:                 30 * time.Second,
        BackoffMultiplier:        2.0,
        KeepaliveTime:            30 * time.Second,
        KeepaliveTimeout:         5 * time.Second,
        MaxReceiveMessageLength:  4 * 1024 * 1024,
        MaxSendMessageLength:     4 * 1024 * 1024,
    }
    
    client, err := NewUserClient(config)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    defer client.Close()
    
    ctx := context.Background()
    
    // 创建用户
    createReq := &pb.CreateUserRequest{
        Username: "testuser",
        Email:    "test@example.com",
        FullName: "Test User",
    }
    
    createResp, err := client.CreateUser(ctx, createReq, 
        WithTimeout(5*time.Second),
        WithMetadata(map[string]string{"client-id": "test-client"}),
    )
    if err != nil {
        log.Fatalf("Failed to create user: %v", err)
    }
    
    log.Printf("Created user: %v", createResp.User)
    
    // 获取用户
    user, err := client.GetUser(ctx, createResp.User.Id)
    if err != nil {
        log.Fatalf("Failed to get user: %v", err)
    }
    
    log.Printf("Retrieved user: %v", user)
    
    // 列出用户(流式)
    userChan, errChan := client.ListUsers(ctx, &pb.ListUsersRequest{})
    
    for {
        select {
        case user, ok := <-userChan:
            if !ok {
                log.Println("User stream completed")
                return
            }
            log.Printf("Received user: %v", user)
        case err := <-errChan:
            if err != nil {
                log.Fatalf("Stream error: %v", err)
            }
        case <-ctx.Done():
            log.Println("Context cancelled")
            return
        }
    }
}
"""
    
    def create_connection_pool_manager(self) -> str:
        """创建连接池管理器"""
        return """
// connection_pool.go - gRPC 连接池管理
package client

import (
    "context"
    "fmt"
    "sync"
    "time"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/connectivity"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/keepalive"
)

// ConnectionPool gRPC 连接池
type ConnectionPool struct {
    target      string
    poolSize    int
    connections []*grpc.ClientConn
    current     int
    mu          sync.RWMutex
    dialOptions []grpc.DialOption
}

// PoolConfig 连接池配置
type PoolConfig struct {
    Target                  string
    PoolSize               int
    MaxReceiveMessageLength int
    MaxSendMessageLength   int
    KeepaliveTime          time.Duration
    KeepaliveTimeout       time.Duration
    ConnectTimeout         time.Duration
}

// NewConnectionPool 创建连接池
func NewConnectionPool(config *PoolConfig) (*ConnectionPool, error) {
    if config.PoolSize <= 0 {
        config.PoolSize = 5 // 默认连接池大小
    }
    
    dialOptions := []grpc.DialOption{
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithKeepaliveParams(keepalive.ClientParameters{
            Time:                config.KeepaliveTime,
            Timeout:             config.KeepaliveTimeout,
            PermitWithoutStream: true,
        }),
        grpc.WithDefaultCallOptions(
            grpc.MaxCallRecvMsgSize(config.MaxReceiveMessageLength),
            grpc.MaxCallSendMsgSize(config.MaxSendMessageLength),
        ),
    }
    
    pool := &ConnectionPool{
        target:      config.Target,
        poolSize:    config.PoolSize,
        connections: make([]*grpc.ClientConn, config.PoolSize),
        dialOptions: dialOptions,
    }
    
    // 初始化连接
    for i := 0; i < config.PoolSize; i++ {
        conn, err := pool.createConnection(config.ConnectTimeout)
        if err != nil {
            // 清理已创建的连接
            pool.Close()
            return nil, fmt.Errorf("failed to create connection %d: %w", i, err)
        }
        pool.connections[i] = conn
    }
    
    // 启动健康检查
    go pool.healthCheck()
    
    return pool, nil
}

// GetConnection 获取连接(轮询)
func (p *ConnectionPool) GetConnection() *grpc.ClientConn {
    p.mu.Lock()
    defer p.mu.Unlock()
    
    conn := p.connections[p.current]
    p.current = (p.current + 1) % p.poolSize
    
    return conn
}

// GetHealthyConnection 获取健康的连接
func (p *ConnectionPool) GetHealthyConnection() *grpc.ClientConn {
    p.mu.RLock()
    defer p.mu.RUnlock()
    
    // 从当前位置开始查找健康连接
    start := p.current
    for i := 0; i < p.poolSize; i++ {
        idx := (start + i) % p.poolSize
        conn := p.connections[idx]
        
        if conn.GetState() == connectivity.Ready {
            p.current = (idx + 1) % p.poolSize
            return conn
        }
    }
    
    // 如果没有健康连接,返回当前连接
    return p.connections[p.current]
}

// Close 关闭连接池
func (p *ConnectionPool) Close() error {
    p.mu.Lock()
    defer p.mu.Unlock()
    
    var lastErr error
    for i, conn := range p.connections {
        if conn != nil {
            if err := conn.Close(); err != nil {
                lastErr = err
            }
            p.connections[i] = nil
        }
    }
    
    return lastErr
}

// createConnection 创建单个连接
func (p *ConnectionPool) createConnection(timeout time.Duration) (*grpc.ClientConn, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    
    conn, err := grpc.DialContext(ctx, p.target, p.dialOptions...)
    if err != nil {
        return nil, err
    }
    
    // 等待连接就绪
    if !conn.WaitForStateChange(ctx, connectivity.Connecting) {
        conn.Close()
        return nil, fmt.Errorf("connection timeout")
    }
    
    return conn, nil
}

// healthCheck 健康检查
func (p *ConnectionPool) healthCheck() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        p.mu.Lock()
        for i, conn := range p.connections {
            if conn == nil {
                continue
            }
            
            state := conn.GetState()
            if state == connectivity.TransientFailure || state == connectivity.Shutdown {
                // 重新创建连接
                conn.Close()
                newConn, err := p.createConnection(10 * time.Second)
                if err != nil {
                    log.Printf("Failed to recreate connection %d: %v", i, err)
                    p.connections[i] = nil
                } else {
                    p.connections[i] = newConn
                }
            }
        }
        p.mu.Unlock()
    }
}

// GetStats 获取连接池统计信息
func (p *ConnectionPool) GetStats() map[string]interface{} {
    p.mu.RLock()
    defer p.mu.RUnlock()
    
    stats := map[string]interface{}{
        "pool_size": p.poolSize,
        "target":    p.target,
    }
    
    stateCount := make(map[connectivity.State]int)
    for _, conn := range p.connections {
        if conn != nil {
            state := conn.GetState()
            stateCount[state]++
        } else {
            stateCount[connectivity.Shutdown]++
        }
    }
    
    stats["connection_states"] = stateCount
    return stats
}

// PooledClient 使用连接池的客户端
type PooledClient struct {
    pool   *ConnectionPool
    client pb.UserServiceClient
}

// NewPooledClient 创建使用连接池的客户端
func NewPooledClient(pool *ConnectionPool) *PooledClient {
    return &PooledClient{
        pool: pool,
    }
}

// CreateUser 使用连接池创建用户
func (c *PooledClient) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
    conn := c.pool.GetHealthyConnection()
    client := pb.NewUserServiceClient(conn)
    
    return client.CreateUser(ctx, req)
}

// GetUser 使用连接池获取用户
func (c *PooledClient) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
    conn := c.pool.GetHealthyConnection()
    client := pb.NewUserServiceClient(conn)
    
    return client.GetUser(ctx, req)
}
"""
    
    def create_retry_mechanism(self) -> str:
        """创建重试机制"""
        return """
// retry.go - gRPC 重试机制
package client

import (
    "context"
    "fmt"
    "math"
    "math/rand"
    "time"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// RetryConfig 重试配置
type RetryConfig struct {
    MaxAttempts       int
    BaseDelay         time.Duration
    MaxDelay          time.Duration
    BackoffMultiplier float64
    Jitter            bool
    RetryableCodes    []codes.Code
}

// DefaultRetryConfig 默认重试配置
func DefaultRetryConfig() *RetryConfig {
    return &RetryConfig{
        MaxAttempts:       3,
        BaseDelay:         100 * time.Millisecond,
        MaxDelay:          30 * time.Second,
        BackoffMultiplier: 2.0,
        Jitter:            true,
        RetryableCodes: []codes.Code{
            codes.Unavailable,
            codes.DeadlineExceeded,
            codes.ResourceExhausted,
            codes.Aborted,
            codes.Internal,
        },
    }
}

// RetryInterceptor 重试拦截器
func RetryInterceptor(config *RetryConfig) grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        var lastErr error
        
        for attempt := 0; attempt < config.MaxAttempts; attempt++ {
            // 第一次尝试不延迟
            if attempt > 0 {
                delay := calculateDelay(config, attempt)
                select {
                case <-time.After(delay):
                case <-ctx.Done():
                    return ctx.Err()
                }
            }
            
            // 执行调用
            err := invoker(ctx, method, req, reply, cc, opts...)
            if err == nil {
                return nil // 成功
            }
            
            lastErr = err
            
            // 检查是否应该重试
            if !shouldRetry(err, config.RetryableCodes) {
                return err
            }
            
            // 检查上下文是否已取消
            if ctx.Err() != nil {
                return ctx.Err()
            }
        }
        
        return lastErr
    }
}

// calculateDelay 计算延迟时间
func calculateDelay(config *RetryConfig, attempt int) time.Duration {
    delay := float64(config.BaseDelay) * math.Pow(config.BackoffMultiplier, float64(attempt-1))
    
    if delay > float64(config.MaxDelay) {
        delay = float64(config.MaxDelay)
    }
    
    // 添加抖动
    if config.Jitter {
        jitter := rand.Float64() * 0.1 * delay // 10% 抖动
        delay += jitter
    }
    
    return time.Duration(delay)
}

// shouldRetry 判断是否应该重试
func shouldRetry(err error, retryableCodes []codes.Code) bool {
    st, ok := status.FromError(err)
    if !ok {
        return false
    }
    
    for _, code := range retryableCodes {
        if st.Code() == code {
            return true
        }
    }
    
    return false
}

// RetryableClient 支持重试的客户端
type RetryableClient struct {
    client pb.UserServiceClient
    config *RetryConfig
}

// NewRetryableClient 创建支持重试的客户端
func NewRetryableClient(conn *grpc.ClientConn, config *RetryConfig) *RetryableClient {
    return &RetryableClient{
        client: pb.NewUserServiceClient(conn),
        config: config,
    }
}

// CreateUserWithRetry 带重试的创建用户
func (c *RetryableClient) CreateUserWithRetry(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
    var lastErr error
    
    for attempt := 0; attempt < c.config.MaxAttempts; attempt++ {
        if attempt > 0 {
            delay := calculateDelay(c.config, attempt)
            log.Printf("Retrying CreateUser (attempt %d) after %v", attempt+1, delay)
            
            select {
            case <-time.After(delay):
            case <-ctx.Done():
                return nil, ctx.Err()
            }
        }
        
        resp, err := c.client.CreateUser(ctx, req)
        if err == nil {
            if attempt > 0 {
                log.Printf("CreateUser succeeded on attempt %d", attempt+1)
            }
            return resp, nil
        }
        
        lastErr = err
        
        if !shouldRetry(err, c.config.RetryableCodes) {
            return nil, err
        }
        
        if ctx.Err() != nil {
            return nil, ctx.Err()
        }
    }
    
    return nil, fmt.Errorf("max retry attempts exceeded: %w", lastErr)
}

// CircuitBreaker 熔断器
type CircuitBreaker struct {
    maxFailures     int
    resetTimeout    time.Duration
    failureCount    int
    lastFailureTime time.Time
    state           CircuitState
    mu              sync.RWMutex
}

type CircuitState int

const (
    CircuitClosed CircuitState = iota
    CircuitOpen
    CircuitHalfOpen
)

// NewCircuitBreaker 创建熔断器
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        maxFailures:  maxFailures,
        resetTimeout: resetTimeout,
        state:        CircuitClosed,
    }
}

// Call 执行调用
func (cb *CircuitBreaker) Call(fn func() error) error {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    // 检查熔断器状态
    if cb.state == CircuitOpen {
        if time.Since(cb.lastFailureTime) > cb.resetTimeout {
            cb.state = CircuitHalfOpen
            cb.failureCount = 0
        } else {
            return fmt.Errorf("circuit breaker is open")
        }
    }
    
    // 执行函数
    err := fn()
    
    if err != nil {
        cb.failureCount++
        cb.lastFailureTime = time.Now()
        
        if cb.failureCount >= cb.maxFailures {
            cb.state = CircuitOpen
        }
        
        return err
    }
    
    // 成功时重置
    if cb.state == CircuitHalfOpen {
        cb.state = CircuitClosed
    }
    cb.failureCount = 0
    
    return nil
}

// GetState 获取熔断器状态
func (cb *CircuitBreaker) GetState() CircuitState {
    cb.mu.RLock()
    defer cb.mu.RUnlock()
    return cb.state
}
"""
    
    def create_load_balancing_config(self) -> str:
        """创建负载均衡配置"""
        return """
// load_balancing.go - gRPC 负载均衡配置
package client

import (
    "context"
    "fmt"
    "log"
    "time"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/resolver"
    "google.golang.org/grpc/resolver/manual"
)

// LoadBalancedClient 负载均衡客户端
type LoadBalancedClient struct {
    conn     *grpc.ClientConn
    client   pb.UserServiceClient
    resolver *manual.Resolver
}

// NewLoadBalancedClient 创建负载均衡客户端
func NewLoadBalancedClient(endpoints []string, policy string) (*LoadBalancedClient, error) {
    // 创建手动解析器
    r := manual.NewBuilderWithScheme("example")
    
    // 构建地址列表
    var addrs []resolver.Address
    for _, endpoint := range endpoints {
        addrs = append(addrs, resolver.Address{Addr: endpoint})
    }
    
    // 配置连接选项
    opts := []grpc.DialOption{
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithResolvers(r),
        grpc.WithDefaultServiceConfig(fmt.Sprintf(`{
            "loadBalancingPolicy": "%s",
            "healthCheckConfig": {
                "serviceName": ""
            }
        }`, policy)),
    }
    
    // 建立连接
    conn, err := grpc.Dial("example:///service", opts...)
    if err != nil {
        return nil, fmt.Errorf("failed to dial: %w", err)
    }
    
    // 更新解析器地址
    r.UpdateState(resolver.State{Addresses: addrs})
    
    return &LoadBalancedClient{
        conn:     conn,
        client:   pb.NewUserServiceClient(conn),
        resolver: r,
    }, nil
}

// UpdateEndpoints 更新端点列表
func (c *LoadBalancedClient) UpdateEndpoints(endpoints []string) {
    var addrs []resolver.Address
    for _, endpoint := range endpoints {
        addrs = append(addrs, resolver.Address{Addr: endpoint})
    }
    
    c.resolver.UpdateState(resolver.State{Addresses: addrs})
    log.Printf("Updated endpoints: %v", endpoints)
}

// Close 关闭客户端
func (c *LoadBalancedClient) Close() error {
    return c.conn.Close()
}

// CreateUser 创建用户
func (c *LoadBalancedClient) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
    return c.client.CreateUser(ctx, req)
}

// ServiceDiscoveryClient 服务发现客户端
type ServiceDiscoveryClient struct {
    serviceName string
    registry    ServiceRegistry
    client      *LoadBalancedClient
    stopCh      chan struct{}
}

// ServiceRegistry 服务注册接口
type ServiceRegistry interface {
    Discover(serviceName string) ([]string, error)
    Watch(serviceName string) (<-chan []string, error)
}

// NewServiceDiscoveryClient 创建服务发现客户端
func NewServiceDiscoveryClient(serviceName string, registry ServiceRegistry) (*ServiceDiscoveryClient, error) {
    // 初始发现服务
    endpoints, err := registry.Discover(serviceName)
    if err != nil {
        return nil, fmt.Errorf("failed to discover service: %w", err)
    }
    
    if len(endpoints) == 0 {
        return nil, fmt.Errorf("no endpoints found for service: %s", serviceName)
    }
    
    // 创建负载均衡客户端
    client, err := NewLoadBalancedClient(endpoints, "round_robin")
    if err != nil {
        return nil, err
    }
    
    sdc := &ServiceDiscoveryClient{
        serviceName: serviceName,
        registry:    registry,
        client:      client,
        stopCh:      make(chan struct{}),
    }
    
    // 启动服务监听
    go sdc.watchService()
    
    return sdc, nil
}

// watchService 监听服务变化
func (sdc *ServiceDiscoveryClient) watchService() {
    watchCh, err := sdc.registry.Watch(sdc.serviceName)
    if err != nil {
        log.Printf("Failed to watch service %s: %v", sdc.serviceName, err)
        return
    }
    
    for {
        select {
        case endpoints := <-watchCh:
            if len(endpoints) > 0 {
                sdc.client.UpdateEndpoints(endpoints)
            }
        case <-sdc.stopCh:
            return
        }
    }
}

// Close 关闭服务发现客户端
func (sdc *ServiceDiscoveryClient) Close() error {
    close(sdc.stopCh)
    return sdc.client.Close()
}

// CreateUser 创建用户
func (sdc *ServiceDiscoveryClient) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
    return sdc.client.CreateUser(ctx, req)
}

// ConsulRegistry Consul 服务注册实现
type ConsulRegistry struct {
    client *consul.Client
}

// NewConsulRegistry 创建 Consul 注册器
func NewConsulRegistry(address string) (*ConsulRegistry, error) {
    config := consul.DefaultConfig()
    config.Address = address
    
    client, err := consul.NewClient(config)
    if err != nil {
        return nil, err
    }
    
    return &ConsulRegistry{client: client}, nil
}

// Discover 发现服务
func (cr *ConsulRegistry) Discover(serviceName string) ([]string, error) {
    services, _, err := cr.client.Health().Service(serviceName, "", true, nil)
    if err != nil {
        return nil, err
    }
    
    var endpoints []string
    for _, service := range services {
        endpoint := fmt.Sprintf("%s:%d", service.Service.Address, service.Service.Port)
        endpoints = append(endpoints, endpoint)
    }
    
    return endpoints, nil
}

// Watch 监听服务变化
func (cr *ConsulRegistry) Watch(serviceName string) (<-chan []string, error) {
    ch := make(chan []string, 1)
    
    go func() {
        defer close(ch)
        
        var lastIndex uint64
        for {
            services, meta, err := cr.client.Health().Service(serviceName, "", true, &consul.QueryOptions{
                WaitIndex: lastIndex,
                WaitTime:  30 * time.Second,
            })
            
            if err != nil {
                log.Printf("Error watching service %s: %v", serviceName, err)
                time.Sleep(5 * time.Second)
                continue
            }
            
            lastIndex = meta.LastIndex
            
            var endpoints []string
            for _, service := range services {
                endpoint := fmt.Sprintf("%s:%d", service.Service.Address, service.Service.Port)
                endpoints = append(endpoints, endpoint)
            }
            
            select {
            case ch <- endpoints:
            default:
            }
        }
    }()
    
    return ch, nil
}
"""

# Build a manager from the default client configuration.
client_mgr = GRPCClientManager(ClientConfig())

# Basic client implementation: generate the Go source, then list what it covers.
client_impl = client_mgr.create_basic_client_implementation()
print("=== gRPC 客户端基础实现 ===")
print(
    "✓ 一元调用客户端",
    "✓ 服务端流客户端",
    "✓ 客户端流客户端",
    "✓ 双向流客户端",
    "✓ 连接管理",
    sep="\n",
)

# Connection pool manager.
pool_mgr = client_mgr.create_connection_pool_manager()
print("\n=== 连接池管理 ===")
print(
    "✓ 连接池创建",
    "✓ 连接健康检查",
    "✓ 连接重建",
    "✓ 负载均衡",
    sep="\n",
)

# Retry mechanism.
retry_mechanism = client_mgr.create_retry_mechanism()
print("\n=== 重试机制 ===")
print(
    "✓ 指数退避",
    "✓ 重试策略",
    "✓ 熔断器",
    "✓ 错误分类",
    sep="\n",
)

# Load-balancing configuration.
lb_config = client_mgr.create_load_balancing_config()
print("\n=== 负载均衡 ===")
print(
    "✓ 轮询策略",
    "✓ 服务发现",
    "✓ 健康检查",
    "✓ 动态更新",
    sep="\n",
)

客户端最佳实践

1. 连接管理

// Connection reuse: one shared connection, created on first use.
var (
    clientConn *grpc.ClientConn
    once       sync.Once
)

// GetConnection returns the shared client connection, dialing
// "localhost:50051" the first time it is called. The process exits through
// log.Fatalf if that dial returns an error.
func GetConnection() *grpc.ClientConn {
    once.Do(dialSharedConn)
    return clientConn
}

// dialSharedConn dials the server and stores the result in clientConn.
func dialSharedConn() {
    keepaliveParams := keepalive.ClientParameters{
        Time:                30 * time.Second,
        Timeout:             5 * time.Second,
        PermitWithoutStream: true,
    }

    var err error
    clientConn, err = grpc.Dial(
        "localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithKeepaliveParams(keepaliveParams),
    )
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
}

2. 超时控制

// 设置合理的超时时间
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp, err := client.CreateUser(ctx, req)

3. 错误处理

// 详细的错误处理
if err != nil {
    if st, ok := status.FromError(err); ok {
        switch st.Code() {
        case codes.NotFound:
            // 处理资源不存在
        case codes.InvalidArgument:
            // 处理参数错误
        case codes.Unavailable:
            // 处理服务不可用,可能需要重试
        default:
            // 处理其他错误
        }
    }
}

总结

通过本章的学习,您应该已经掌握了:

  1. 客户端架构:连接管理、配置选项、调用模式
  2. 连接池管理:连接复用、健康检查、故障恢复
  3. 重试机制:指数退避、熔断器、错误分类
  4. 负载均衡:策略配置、服务发现、动态更新
  5. 最佳实践:超时控制、错误处理、资源管理

在下一章中,我们将学习 gRPC 的流式处理,包括三种流式模式(服务端流、客户端流、双向流)的深入应用和性能优化。