概述
本章将深入介绍 gRPC 客户端的开发,包括连接管理、调用模式、错误处理、重试机制、负载均衡等核心内容。我们将学习如何构建高效、可靠的 gRPC 客户端应用。
学习目标
- 掌握 gRPC 客户端的基本架构和连接管理
- 学习四种不同类型的 RPC 调用模式
- 了解客户端拦截器和中间件的使用
- 掌握错误处理和重试机制
- 学习连接池和负载均衡配置
gRPC 客户端架构
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Callable, Iterator
from abc import ABC, abstractmethod
import asyncio
import time
import random
class ConnectionState(Enum):
"""连接状态枚举"""
IDLE = "idle"
CONNECTING = "connecting"
READY = "ready"
TRANSIENT_FAILURE = "transient_failure"
SHUTDOWN = "shutdown"
class LoadBalancingPolicy(Enum):
"""负载均衡策略枚举"""
ROUND_ROBIN = "round_robin"
PICK_FIRST = "pick_first"
GRPCLB = "grpclb"
WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
CONSISTENT_HASH = "consistent_hash"
class RetryPolicy(Enum):
"""重试策略枚举"""
EXPONENTIAL_BACKOFF = "exponential_backoff"
LINEAR_BACKOFF = "linear_backoff"
FIXED_DELAY = "fixed_delay"
IMMEDIATE = "immediate"
@dataclass
class ClientConfig:
"""客户端配置"""
target: str = "localhost:50051"
insecure: bool = True
max_receive_message_length: int = 4 * 1024 * 1024 # 4MB
max_send_message_length: int = 4 * 1024 * 1024 # 4MB
keepalive_time_ms: int = 30000
keepalive_timeout_ms: int = 5000
keepalive_permit_without_calls: bool = True
max_connection_idle_ms: int = 300000
max_connection_age_ms: int = 600000
initial_window_size: int = 65536
initial_conn_window_size: int = 65536
load_balancing_policy: LoadBalancingPolicy = LoadBalancingPolicy.ROUND_ROBIN
enable_retry: bool = True
max_retry_attempts: int = 3
retry_policy: RetryPolicy = RetryPolicy.EXPONENTIAL_BACKOFF
base_delay_ms: int = 100
max_delay_ms: int = 30000
backoff_multiplier: float = 2.0
timeout_ms: int = 30000
@dataclass
class CallOptions:
"""调用选项"""
timeout_ms: Optional[int] = None
metadata: Dict[str, str] = None
retry_policy: Optional[RetryPolicy] = None
max_retry_attempts: Optional[int] = None
compression: Optional[str] = None
wait_for_ready: bool = False
class GRPCClientManager:
"""gRPC 客户端管理器"""
def __init__(self, config: ClientConfig):
self.config = config
self.connections = {}
self.connection_pool = []
def create_basic_client_implementation(self) -> str:
"""创建基础客户端实现"""
return """
// client.go - gRPC 客户端基础实现
package main
import (
"context"
"fmt"
"io"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
pb "./proto/user"
)
// UserClient gRPC 用户客户端
type UserClient struct {
conn *grpc.ClientConn
client pb.UserServiceClient
config *ClientConfig
}
// ClientConfig 客户端配置
type ClientConfig struct {
Target string
Timeout time.Duration
MaxRetryAttempts int
BaseDelay time.Duration
MaxDelay time.Duration
BackoffMultiplier float64
KeepaliveTime time.Duration
KeepaliveTimeout time.Duration
MaxReceiveMessageLength int
MaxSendMessageLength int
}
// NewUserClient 创建用户客户端
func NewUserClient(config *ClientConfig) (*UserClient, error) {
// 配置连接选项
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: config.KeepaliveTime,
Timeout: config.KeepaliveTimeout,
PermitWithoutStream: true,
}),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(config.MaxReceiveMessageLength),
grpc.MaxCallSendMsgSize(config.MaxSendMessageLength),
),
// 添加拦截器
grpc.WithUnaryInterceptor(clientUnaryInterceptor()),
grpc.WithStreamInterceptor(clientStreamInterceptor()),
}
// 建立连接
conn, err := grpc.Dial(config.Target, opts...)
if err != nil {
return nil, fmt.Errorf("failed to connect: %w", err)
}
// 等待连接就绪
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if !conn.WaitForStateChange(ctx, connectivity.Connecting) {
conn.Close()
return nil, fmt.Errorf("connection timeout")
}
return &UserClient{
conn: conn,
client: pb.NewUserServiceClient(conn),
config: config,
}, nil
}
// Close 关闭客户端连接
func (c *UserClient) Close() error {
return c.conn.Close()
}
// CreateUser 创建用户(一元调用)
func (c *UserClient) CreateUser(ctx context.Context, req *pb.CreateUserRequest, opts ...CallOption) (*pb.CreateUserResponse, error) {
// 应用调用选项
callOpts := &CallOptions{}
for _, opt := range opts {
opt(callOpts)
}
// 设置超时
if callOpts.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, callOpts.Timeout)
defer cancel()
}
// 添加元数据
if len(callOpts.Metadata) > 0 {
md := metadata.New(callOpts.Metadata)
ctx = metadata.NewOutgoingContext(ctx, md)
}
// 执行调用
resp, err := c.client.CreateUser(ctx, req)
if err != nil {
return nil, handleGRPCError(err)
}
return resp, nil
}
// GetUser 获取用户信息(一元调用)
func (c *UserClient) GetUser(ctx context.Context, userID string, opts ...CallOption) (*pb.User, error) {
req := &pb.GetUserRequest{UserId: userID}
resp, err := c.client.GetUser(ctx, req)
if err != nil {
return nil, handleGRPCError(err)
}
return resp.User, nil
}
// ListUsers 列出用户(服务端流)
func (c *UserClient) ListUsers(ctx context.Context, req *pb.ListUsersRequest, opts ...CallOption) (<-chan *pb.User, <-chan error) {
userChan := make(chan *pb.User, 100)
errChan := make(chan error, 1)
go func() {
defer close(userChan)
defer close(errChan)
stream, err := c.client.ListUsers(ctx, req)
if err != nil {
errChan <- handleGRPCError(err)
return
}
for {
user, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
errChan <- handleGRPCError(err)
return
}
select {
case userChan <- user:
case <-ctx.Done():
errChan <- ctx.Err()
return
}
}
}()
return userChan, errChan
}
// BatchCreateUsers 批量创建用户(客户端流)
func (c *UserClient) BatchCreateUsers(ctx context.Context, users []*pb.CreateUserRequest, opts ...CallOption) (*pb.BatchCreateUsersResponse, error) {
stream, err := c.client.BatchCreateUsers(ctx)
if err != nil {
return nil, handleGRPCError(err)
}
// 发送用户数据
for _, user := range users {
if err := stream.Send(user); err != nil {
return nil, handleGRPCError(err)
}
}
// 关闭发送并接收响应
resp, err := stream.CloseAndRecv()
if err != nil {
return nil, handleGRPCError(err)
}
return resp, nil
}
// SyncUsers 同步用户数据(双向流)
func (c *UserClient) SyncUsers(ctx context.Context) (*UserSyncClient, error) {
stream, err := c.client.SyncUsers(ctx)
if err != nil {
return nil, handleGRPCError(err)
}
return &UserSyncClient{
stream: stream,
ctx: ctx,
}, nil
}
// UserSyncClient 用户同步客户端
type UserSyncClient struct {
stream pb.UserService_SyncUsersClient
ctx context.Context
}
// Send 发送同步请求
func (c *UserSyncClient) Send(req *pb.SyncUserRequest) error {
return c.stream.Send(req)
}
// Recv 接收同步响应
func (c *UserSyncClient) Recv() (*pb.SyncUserResponse, error) {
return c.stream.Recv()
}
// Close 关闭同步流
func (c *UserSyncClient) Close() error {
return c.stream.CloseSend()
}
// CallOption 调用选项函数类型
type CallOption func(*CallOptions)
// CallOptions 调用选项
type CallOptions struct {
Timeout time.Duration
Metadata map[string]string
Retry bool
}
// WithTimeout 设置超时时间
func WithTimeout(timeout time.Duration) CallOption {
return func(opts *CallOptions) {
opts.Timeout = timeout
}
}
// WithMetadata 设置元数据
func WithMetadata(md map[string]string) CallOption {
return func(opts *CallOptions) {
opts.Metadata = md
}
}
// WithRetry 启用重试
func WithRetry() CallOption {
return func(opts *CallOptions) {
opts.Retry = true
}
}
// 错误处理
func handleGRPCError(err error) error {
st, ok := status.FromError(err)
if !ok {
return err
}
switch st.Code() {
case codes.NotFound:
return fmt.Errorf("resource not found: %s", st.Message())
case codes.AlreadyExists:
return fmt.Errorf("resource already exists: %s", st.Message())
case codes.InvalidArgument:
return fmt.Errorf("invalid argument: %s", st.Message())
case codes.PermissionDenied:
return fmt.Errorf("permission denied: %s", st.Message())
case codes.Unauthenticated:
return fmt.Errorf("authentication required: %s", st.Message())
case codes.Unavailable:
return fmt.Errorf("service unavailable: %s", st.Message())
case codes.DeadlineExceeded:
return fmt.Errorf("request timeout: %s", st.Message())
default:
return fmt.Errorf("grpc error [%s]: %s", st.Code(), st.Message())
}
}
// 客户端拦截器
func clientUnaryInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
log.Printf("[CLIENT] Calling method: %s", method)
err := invoker(ctx, method, req, reply, cc, opts...)
duration := time.Since(start)
if err != nil {
log.Printf("[CLIENT] Method: %s, Error: %v, Duration: %s", method, err, duration)
} else {
log.Printf("[CLIENT] Method: %s, Success, Duration: %s", method, duration)
}
return err
}
}
func clientStreamInterceptor() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Printf("[CLIENT] Starting stream: %s", method)
stream, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
log.Printf("[CLIENT] Stream creation failed: %s, Error: %v", method, err)
return nil, err
}
return &wrappedClientStream{
ClientStream: stream,
method: method,
}, nil
}
}
type wrappedClientStream struct {
grpc.ClientStream
method string
}
func (w *wrappedClientStream) SendMsg(m interface{}) error {
log.Printf("[CLIENT] Sending message on stream: %s", w.method)
return w.ClientStream.SendMsg(m)
}
func (w *wrappedClientStream) RecvMsg(m interface{}) error {
err := w.ClientStream.RecvMsg(m)
if err != nil && err != io.EOF {
log.Printf("[CLIENT] Receive error on stream: %s, Error: %v", w.method, err)
}
return err
}
// 使用示例
func main() {
config := &ClientConfig{
Target: "localhost:50051",
Timeout: 30 * time.Second,
MaxRetryAttempts: 3,
BaseDelay: 100 * time.Millisecond,
MaxDelay: 30 * time.Second,
BackoffMultiplier: 2.0,
KeepaliveTime: 30 * time.Second,
KeepaliveTimeout: 5 * time.Second,
MaxReceiveMessageLength: 4 * 1024 * 1024,
MaxSendMessageLength: 4 * 1024 * 1024,
}
client, err := NewUserClient(config)
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}
defer client.Close()
ctx := context.Background()
// 创建用户
createReq := &pb.CreateUserRequest{
Username: "testuser",
Email: "test@example.com",
FullName: "Test User",
}
createResp, err := client.CreateUser(ctx, createReq,
WithTimeout(5*time.Second),
WithMetadata(map[string]string{"client-id": "test-client"}),
)
if err != nil {
log.Fatalf("Failed to create user: %v", err)
}
log.Printf("Created user: %v", createResp.User)
// 获取用户
user, err := client.GetUser(ctx, createResp.User.Id)
if err != nil {
log.Fatalf("Failed to get user: %v", err)
}
log.Printf("Retrieved user: %v", user)
// 列出用户(流式)
userChan, errChan := client.ListUsers(ctx, &pb.ListUsersRequest{})
for {
select {
case user, ok := <-userChan:
if !ok {
log.Println("User stream completed")
return
}
log.Printf("Received user: %v", user)
case err := <-errChan:
if err != nil {
log.Fatalf("Stream error: %v", err)
}
case <-ctx.Done():
log.Println("Context cancelled")
return
}
}
}
"""
def create_connection_pool_manager(self) -> str:
"""创建连接池管理器"""
return """
// connection_pool.go - gRPC 连接池管理
package client
import (
"context"
"fmt"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)
// ConnectionPool gRPC 连接池
type ConnectionPool struct {
target string
poolSize int
connections []*grpc.ClientConn
current int
mu sync.RWMutex
dialOptions []grpc.DialOption
}
// PoolConfig 连接池配置
type PoolConfig struct {
Target string
PoolSize int
MaxReceiveMessageLength int
MaxSendMessageLength int
KeepaliveTime time.Duration
KeepaliveTimeout time.Duration
ConnectTimeout time.Duration
}
// NewConnectionPool 创建连接池
func NewConnectionPool(config *PoolConfig) (*ConnectionPool, error) {
if config.PoolSize <= 0 {
config.PoolSize = 5 // 默认连接池大小
}
dialOptions := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: config.KeepaliveTime,
Timeout: config.KeepaliveTimeout,
PermitWithoutStream: true,
}),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(config.MaxReceiveMessageLength),
grpc.MaxCallSendMsgSize(config.MaxSendMessageLength),
),
}
pool := &ConnectionPool{
target: config.Target,
poolSize: config.PoolSize,
connections: make([]*grpc.ClientConn, config.PoolSize),
dialOptions: dialOptions,
}
// 初始化连接
for i := 0; i < config.PoolSize; i++ {
conn, err := pool.createConnection(config.ConnectTimeout)
if err != nil {
// 清理已创建的连接
pool.Close()
return nil, fmt.Errorf("failed to create connection %d: %w", i, err)
}
pool.connections[i] = conn
}
// 启动健康检查
go pool.healthCheck()
return pool, nil
}
// GetConnection 获取连接(轮询)
func (p *ConnectionPool) GetConnection() *grpc.ClientConn {
p.mu.Lock()
defer p.mu.Unlock()
conn := p.connections[p.current]
p.current = (p.current + 1) % p.poolSize
return conn
}
// GetHealthyConnection 获取健康的连接
func (p *ConnectionPool) GetHealthyConnection() *grpc.ClientConn {
p.mu.RLock()
defer p.mu.RUnlock()
// 从当前位置开始查找健康连接
start := p.current
for i := 0; i < p.poolSize; i++ {
idx := (start + i) % p.poolSize
conn := p.connections[idx]
if conn.GetState() == connectivity.Ready {
p.current = (idx + 1) % p.poolSize
return conn
}
}
// 如果没有健康连接,返回当前连接
return p.connections[p.current]
}
// Close 关闭连接池
func (p *ConnectionPool) Close() error {
p.mu.Lock()
defer p.mu.Unlock()
var lastErr error
for i, conn := range p.connections {
if conn != nil {
if err := conn.Close(); err != nil {
lastErr = err
}
p.connections[i] = nil
}
}
return lastErr
}
// createConnection 创建单个连接
func (p *ConnectionPool) createConnection(timeout time.Duration) (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
conn, err := grpc.DialContext(ctx, p.target, p.dialOptions...)
if err != nil {
return nil, err
}
// 等待连接就绪
if !conn.WaitForStateChange(ctx, connectivity.Connecting) {
conn.Close()
return nil, fmt.Errorf("connection timeout")
}
return conn, nil
}
// healthCheck 健康检查
func (p *ConnectionPool) healthCheck() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
p.mu.Lock()
for i, conn := range p.connections {
if conn == nil {
continue
}
state := conn.GetState()
if state == connectivity.TransientFailure || state == connectivity.Shutdown {
// 重新创建连接
conn.Close()
newConn, err := p.createConnection(10 * time.Second)
if err != nil {
log.Printf("Failed to recreate connection %d: %v", i, err)
p.connections[i] = nil
} else {
p.connections[i] = newConn
}
}
}
p.mu.Unlock()
}
}
// GetStats 获取连接池统计信息
func (p *ConnectionPool) GetStats() map[string]interface{} {
p.mu.RLock()
defer p.mu.RUnlock()
stats := map[string]interface{}{
"pool_size": p.poolSize,
"target": p.target,
}
stateCount := make(map[connectivity.State]int)
for _, conn := range p.connections {
if conn != nil {
state := conn.GetState()
stateCount[state]++
} else {
stateCount[connectivity.Shutdown]++
}
}
stats["connection_states"] = stateCount
return stats
}
// PooledClient 使用连接池的客户端
type PooledClient struct {
pool *ConnectionPool
client pb.UserServiceClient
}
// NewPooledClient 创建使用连接池的客户端
func NewPooledClient(pool *ConnectionPool) *PooledClient {
return &PooledClient{
pool: pool,
}
}
// CreateUser 使用连接池创建用户
func (c *PooledClient) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
conn := c.pool.GetHealthyConnection()
client := pb.NewUserServiceClient(conn)
return client.CreateUser(ctx, req)
}
// GetUser 使用连接池获取用户
func (c *PooledClient) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
conn := c.pool.GetHealthyConnection()
client := pb.NewUserServiceClient(conn)
return client.GetUser(ctx, req)
}
"""
def create_retry_mechanism(self) -> str:
"""创建重试机制"""
return """
// retry.go - gRPC 重试机制
package client
import (
"context"
"fmt"
"math"
"math/rand"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// RetryConfig 重试配置
type RetryConfig struct {
MaxAttempts int
BaseDelay time.Duration
MaxDelay time.Duration
BackoffMultiplier float64
Jitter bool
RetryableCodes []codes.Code
}
// DefaultRetryConfig 默认重试配置
func DefaultRetryConfig() *RetryConfig {
return &RetryConfig{
MaxAttempts: 3,
BaseDelay: 100 * time.Millisecond,
MaxDelay: 30 * time.Second,
BackoffMultiplier: 2.0,
Jitter: true,
RetryableCodes: []codes.Code{
codes.Unavailable,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.Internal,
},
}
}
// RetryInterceptor 重试拦截器
func RetryInterceptor(config *RetryConfig) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
var lastErr error
for attempt := 0; attempt < config.MaxAttempts; attempt++ {
// 第一次尝试不延迟
if attempt > 0 {
delay := calculateDelay(config, attempt)
select {
case <-time.After(delay):
case <-ctx.Done():
return ctx.Err()
}
}
// 执行调用
err := invoker(ctx, method, req, reply, cc, opts...)
if err == nil {
return nil // 成功
}
lastErr = err
// 检查是否应该重试
if !shouldRetry(err, config.RetryableCodes) {
return err
}
// 检查上下文是否已取消
if ctx.Err() != nil {
return ctx.Err()
}
}
return lastErr
}
}
// calculateDelay 计算延迟时间
func calculateDelay(config *RetryConfig, attempt int) time.Duration {
delay := float64(config.BaseDelay) * math.Pow(config.BackoffMultiplier, float64(attempt-1))
if delay > float64(config.MaxDelay) {
delay = float64(config.MaxDelay)
}
// 添加抖动
if config.Jitter {
jitter := rand.Float64() * 0.1 * delay // 10% 抖动
delay += jitter
}
return time.Duration(delay)
}
// shouldRetry 判断是否应该重试
func shouldRetry(err error, retryableCodes []codes.Code) bool {
st, ok := status.FromError(err)
if !ok {
return false
}
for _, code := range retryableCodes {
if st.Code() == code {
return true
}
}
return false
}
// RetryableClient 支持重试的客户端
type RetryableClient struct {
client pb.UserServiceClient
config *RetryConfig
}
// NewRetryableClient 创建支持重试的客户端
func NewRetryableClient(conn *grpc.ClientConn, config *RetryConfig) *RetryableClient {
return &RetryableClient{
client: pb.NewUserServiceClient(conn),
config: config,
}
}
// CreateUserWithRetry 带重试的创建用户
func (c *RetryableClient) CreateUserWithRetry(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
var lastErr error
for attempt := 0; attempt < c.config.MaxAttempts; attempt++ {
if attempt > 0 {
delay := calculateDelay(c.config, attempt)
log.Printf("Retrying CreateUser (attempt %d) after %v", attempt+1, delay)
select {
case <-time.After(delay):
case <-ctx.Done():
return nil, ctx.Err()
}
}
resp, err := c.client.CreateUser(ctx, req)
if err == nil {
if attempt > 0 {
log.Printf("CreateUser succeeded on attempt %d", attempt+1)
}
return resp, nil
}
lastErr = err
if !shouldRetry(err, c.config.RetryableCodes) {
return nil, err
}
if ctx.Err() != nil {
return nil, ctx.Err()
}
}
return nil, fmt.Errorf("max retry attempts exceeded: %w", lastErr)
}
// CircuitBreaker 熔断器
type CircuitBreaker struct {
maxFailures int
resetTimeout time.Duration
failureCount int
lastFailureTime time.Time
state CircuitState
mu sync.RWMutex
}
type CircuitState int
const (
CircuitClosed CircuitState = iota
CircuitOpen
CircuitHalfOpen
)
// NewCircuitBreaker 创建熔断器
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
maxFailures: maxFailures,
resetTimeout: resetTimeout,
state: CircuitClosed,
}
}
// Call 执行调用
func (cb *CircuitBreaker) Call(fn func() error) error {
cb.mu.Lock()
defer cb.mu.Unlock()
// 检查熔断器状态
if cb.state == CircuitOpen {
if time.Since(cb.lastFailureTime) > cb.resetTimeout {
cb.state = CircuitHalfOpen
cb.failureCount = 0
} else {
return fmt.Errorf("circuit breaker is open")
}
}
// 执行函数
err := fn()
if err != nil {
cb.failureCount++
cb.lastFailureTime = time.Now()
if cb.failureCount >= cb.maxFailures {
cb.state = CircuitOpen
}
return err
}
// 成功时重置
if cb.state == CircuitHalfOpen {
cb.state = CircuitClosed
}
cb.failureCount = 0
return nil
}
// GetState 获取熔断器状态
func (cb *CircuitBreaker) GetState() CircuitState {
cb.mu.RLock()
defer cb.mu.RUnlock()
return cb.state
}
"""
def create_load_balancing_config(self) -> str:
"""创建负载均衡配置"""
return """
// load_balancing.go - gRPC 负载均衡配置
package client
import (
"context"
"fmt"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
)
// LoadBalancedClient 负载均衡客户端
type LoadBalancedClient struct {
conn *grpc.ClientConn
client pb.UserServiceClient
resolver *manual.Resolver
}
// NewLoadBalancedClient 创建负载均衡客户端
func NewLoadBalancedClient(endpoints []string, policy string) (*LoadBalancedClient, error) {
// 创建手动解析器
r := manual.NewBuilderWithScheme("example")
// 构建地址列表
var addrs []resolver.Address
for _, endpoint := range endpoints {
addrs = append(addrs, resolver.Address{Addr: endpoint})
}
// 配置连接选项
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{
"loadBalancingPolicy": "%s",
"healthCheckConfig": {
"serviceName": ""
}
}`, policy)),
}
// 建立连接
conn, err := grpc.Dial("example:///service", opts...)
if err != nil {
return nil, fmt.Errorf("failed to dial: %w", err)
}
// 更新解析器地址
r.UpdateState(resolver.State{Addresses: addrs})
return &LoadBalancedClient{
conn: conn,
client: pb.NewUserServiceClient(conn),
resolver: r,
}, nil
}
// UpdateEndpoints 更新端点列表
func (c *LoadBalancedClient) UpdateEndpoints(endpoints []string) {
var addrs []resolver.Address
for _, endpoint := range endpoints {
addrs = append(addrs, resolver.Address{Addr: endpoint})
}
c.resolver.UpdateState(resolver.State{Addresses: addrs})
log.Printf("Updated endpoints: %v", endpoints)
}
// Close 关闭客户端
func (c *LoadBalancedClient) Close() error {
return c.conn.Close()
}
// CreateUser 创建用户
func (c *LoadBalancedClient) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
return c.client.CreateUser(ctx, req)
}
// ServiceDiscoveryClient 服务发现客户端
type ServiceDiscoveryClient struct {
serviceName string
registry ServiceRegistry
client *LoadBalancedClient
stopCh chan struct{}
}
// ServiceRegistry 服务注册接口
type ServiceRegistry interface {
Discover(serviceName string) ([]string, error)
Watch(serviceName string) (<-chan []string, error)
}
// NewServiceDiscoveryClient 创建服务发现客户端
func NewServiceDiscoveryClient(serviceName string, registry ServiceRegistry) (*ServiceDiscoveryClient, error) {
// 初始发现服务
endpoints, err := registry.Discover(serviceName)
if err != nil {
return nil, fmt.Errorf("failed to discover service: %w", err)
}
if len(endpoints) == 0 {
return nil, fmt.Errorf("no endpoints found for service: %s", serviceName)
}
// 创建负载均衡客户端
client, err := NewLoadBalancedClient(endpoints, "round_robin")
if err != nil {
return nil, err
}
sdc := &ServiceDiscoveryClient{
serviceName: serviceName,
registry: registry,
client: client,
stopCh: make(chan struct{}),
}
// 启动服务监听
go sdc.watchService()
return sdc, nil
}
// watchService 监听服务变化
func (sdc *ServiceDiscoveryClient) watchService() {
watchCh, err := sdc.registry.Watch(sdc.serviceName)
if err != nil {
log.Printf("Failed to watch service %s: %v", sdc.serviceName, err)
return
}
for {
select {
case endpoints := <-watchCh:
if len(endpoints) > 0 {
sdc.client.UpdateEndpoints(endpoints)
}
case <-sdc.stopCh:
return
}
}
}
// Close 关闭服务发现客户端
func (sdc *ServiceDiscoveryClient) Close() error {
close(sdc.stopCh)
return sdc.client.Close()
}
// CreateUser 创建用户
func (sdc *ServiceDiscoveryClient) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
return sdc.client.CreateUser(ctx, req)
}
// ConsulRegistry Consul 服务注册实现
type ConsulRegistry struct {
client *consul.Client
}
// NewConsulRegistry 创建 Consul 注册器
func NewConsulRegistry(address string) (*ConsulRegistry, error) {
config := consul.DefaultConfig()
config.Address = address
client, err := consul.NewClient(config)
if err != nil {
return nil, err
}
return &ConsulRegistry{client: client}, nil
}
// Discover 发现服务
func (cr *ConsulRegistry) Discover(serviceName string) ([]string, error) {
services, _, err := cr.client.Health().Service(serviceName, "", true, nil)
if err != nil {
return nil, err
}
var endpoints []string
for _, service := range services {
endpoint := fmt.Sprintf("%s:%d", service.Service.Address, service.Service.Port)
endpoints = append(endpoints, endpoint)
}
return endpoints, nil
}
// Watch 监听服务变化
func (cr *ConsulRegistry) Watch(serviceName string) (<-chan []string, error) {
ch := make(chan []string, 1)
go func() {
defer close(ch)
var lastIndex uint64
for {
services, meta, err := cr.client.Health().Service(serviceName, "", true, &consul.QueryOptions{
WaitIndex: lastIndex,
WaitTime: 30 * time.Second,
})
if err != nil {
log.Printf("Error watching service %s: %v", serviceName, err)
time.Sleep(5 * time.Second)
continue
}
lastIndex = meta.LastIndex
var endpoints []string
for _, service := range services {
endpoint := fmt.Sprintf("%s:%d", service.Service.Address, service.Service.Port)
endpoints = append(endpoints, endpoint)
}
select {
case ch <- endpoints:
default:
}
}
}()
return ch, nil
}
"""
# 创建客户端管理器实例
client_mgr = GRPCClientManager(ClientConfig())
# 生成基础客户端实现
client_impl = client_mgr.create_basic_client_implementation()
print("=== gRPC 客户端基础实现 ===")
print("✓ 一元调用客户端")
print("✓ 服务端流客户端")
print("✓ 客户端流客户端")
print("✓ 双向流客户端")
print("✓ 连接管理")
# 生成连接池管理器
pool_mgr = client_mgr.create_connection_pool_manager()
print("\n=== 连接池管理 ===")
print("✓ 连接池创建")
print("✓ 连接健康检查")
print("✓ 连接重建")
print("✓ 负载均衡")
# 生成重试机制
retry_mechanism = client_mgr.create_retry_mechanism()
print("\n=== 重试机制 ===")
print("✓ 指数退避")
print("✓ 重试策略")
print("✓ 熔断器")
print("✓ 错误分类")
# 生成负载均衡配置
lb_config = client_mgr.create_load_balancing_config()
print("\n=== 负载均衡 ===")
print("✓ 轮询策略")
print("✓ 服务发现")
print("✓ 健康检查")
print("✓ 动态更新")
客户端最佳实践
1. 连接管理
// 连接复用
var (
clientConn *grpc.ClientConn
once sync.Once
)
func GetConnection() *grpc.ClientConn {
once.Do(func() {
var err error
clientConn, err = grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 5 * time.Second,
PermitWithoutStream: true,
}),
)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
})
return clientConn
}
2. 超时控制
// 设置合理的超时时间
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.CreateUser(ctx, req)
3. 错误处理
// 详细的错误处理
if err != nil {
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.NotFound:
// 处理资源不存在
case codes.InvalidArgument:
// 处理参数错误
case codes.Unavailable:
// 处理服务不可用,可能需要重试
default:
// 处理其他错误
}
}
}
总结
通过本章的学习,您应该已经掌握了:
- 客户端架构:连接管理、配置选项、调用模式
- 连接池管理:连接复用、健康检查、故障恢复
- 重试机制:指数退避、熔断器、错误分类
- 负载均衡:策略配置、服务发现、动态更新
- 最佳实践:超时控制、错误处理、资源管理
在下一章中,我们将学习 gRPC 的流式处理,包括四种流式模式的深入应用和性能优化。