概述

微服务架构虽然带来了灵活性和可扩展性,但也引入了新的性能挑战。本章将深入探讨微服务性能优化的各个方面,包括应用层优化、数据库优化、网络优化、缓存策略、负载均衡等关键技术。

性能优化挑战

# 性能挑战概览
performance_challenges:
  network_latency:
    description: "服务间网络调用延迟"
    impact: "响应时间增加"
    solutions:
      - "服务合并"
      - "异步处理"
      - "缓存策略"
      - "连接池优化"
  
  resource_overhead:
    description: "多服务资源开销"
    impact: "资源利用率低"
    solutions:
      - "容器优化"
      - "资源限制"
      - "垂直扩展"
      - "服务合并"
  
  data_consistency:
    description: "分布式数据一致性"
    impact: "性能与一致性权衡"
    solutions:
      - "最终一致性"
      - "CQRS模式"
      - "事件驱动"
      - "缓存策略"
  
  monitoring_complexity:
    description: "分布式监控复杂性"
    impact: "问题定位困难"
    solutions:
      - "分布式追踪"
      - "集中日志"
      - "指标聚合"
      - "APM工具"

性能优化架构

# 性能优化架构概览
performance_architecture:
  application_layer:
    components:
      - name: "代码优化"
        technologies: ["算法优化", "内存管理", "并发处理", "资源池化"]
      - name: "框架优化"
        technologies: ["Go Gin", "FastHTTP", "gRPC", "WebSocket"]
      - name: "编译优化"
        technologies: ["编译参数", "静态链接", "代码分析", "性能剖析"]
  
  data_layer:
    components:
      - name: "数据库优化"
        technologies: ["索引优化", "查询优化", "连接池", "分库分表"]
      - name: "缓存策略"
        technologies: ["Redis", "Memcached", "本地缓存", "CDN"]
      - name: "数据访问"
        technologies: ["ORM优化", "批量操作", "预加载", "懒加载"]
  
  network_layer:
    components:
      - name: "网络优化"
        technologies: ["HTTP/2", "gRPC", "连接复用", "压缩传输"]
      - name: "负载均衡"
        technologies: ["Nginx", "HAProxy", "Istio", "云负载均衡"]
      - name: "服务网格"
        technologies: ["Istio", "Linkerd", "Consul Connect", "流量管理"]
  
  infrastructure_layer:
    components:
      - name: "容器优化"
        technologies: ["Docker", "资源限制", "镜像优化", "多阶段构建"]
      - name: "编排优化"
        technologies: ["Kubernetes", "HPA", "VPA", "节点亲和性"]
      - name: "云原生"
        technologies: ["Serverless", "边缘计算", "弹性伸缩", "成本优化"]

应用层性能优化

Go语言性能优化

// performance/optimizer.go
package performance

import (
	"context"
	"net/http"
	"runtime"
	"sync"
	"time"
	"unsafe"
)

// MemoryPool 内存池
type MemoryPool struct {
	pool sync.Pool
	size int
}

// NewMemoryPool 创建内存池
func NewMemoryPool(size int) *MemoryPool {
	return &MemoryPool{
		pool: sync.Pool{
			New: func() interface{} {
				return make([]byte, size)
			},
		},
		size: size,
	}
}

// Get 获取内存块
func (mp *MemoryPool) Get() []byte {
	return mp.pool.Get().([]byte)
}

// Put 归还内存块(只回收容量匹配的缓冲区,并恢复完整长度,保证 Get 返回的切片长度一致)
func (mp *MemoryPool) Put(buf []byte) {
	if cap(buf) == mp.size {
		mp.pool.Put(buf[:mp.size])
	}
}

// StringToBytes 零拷贝字符串转字节(需要 Go 1.20+;返回的切片指向字符串底层内存,绝不能修改)
func StringToBytes(s string) []byte {
	return unsafe.Slice(unsafe.StringData(s), len(s))
}

// BytesToString 零拷贝字节转字符串(需要 Go 1.20+;转换后不应再修改原切片)
func BytesToString(b []byte) string {
	return unsafe.String(unsafe.SliceData(b), len(b))
}

// WorkerPool 工作池
type WorkerPool struct {
	workerCount int
	jobQueue    chan Job
	workers     []*Worker
	ctx         context.Context
	cancel      context.CancelFunc
	wg          sync.WaitGroup
}

// Job 工作任务
type Job interface {
	Execute() error
}

// Worker 工作者
type Worker struct {
	id       int
	jobQueue chan Job
	quit     chan bool
}

// NewWorkerPool 创建工作池
func NewWorkerPool(workerCount, queueSize int) *WorkerPool {
	ctx, cancel := context.WithCancel(context.Background())
	return &WorkerPool{
		workerCount: workerCount,
		jobQueue:    make(chan Job, queueSize),
		workers:     make([]*Worker, workerCount),
		ctx:         ctx,
		cancel:      cancel,
	}
}

// Start 启动工作池
func (wp *WorkerPool) Start() {
	for i := 0; i < wp.workerCount; i++ {
		worker := &Worker{
			id:       i,
			jobQueue: wp.jobQueue,
			quit:     make(chan bool),
		}
		wp.workers[i] = worker
		wp.wg.Add(1)
		go wp.runWorker(worker)
	}
}

// runWorker 运行工作者
func (wp *WorkerPool) runWorker(worker *Worker) {
	defer wp.wg.Done()
	for {
		select {
		case job := <-worker.jobQueue:
			if job != nil {
				job.Execute()
			}
		case <-worker.quit:
			return
		case <-wp.ctx.Done():
			return
		}
	}
}

// Submit 提交任务
func (wp *WorkerPool) Submit(job Job) bool {
	select {
	case wp.jobQueue <- job:
		return true
	default:
		return false
	}
}

// Stop 停止工作池
func (wp *WorkerPool) Stop() {
	wp.cancel()
	for _, worker := range wp.workers {
		close(worker.quit) // 关闭而非发送,避免 worker 已因 ctx 取消退出时发送阻塞
	}
	wp.wg.Wait()
	close(wp.jobQueue)
}

// PerformanceMonitor 性能监控器
type PerformanceMonitor struct {
	metrics map[string]*Metric
	mu      sync.RWMutex
}

// Metric 性能指标
type Metric struct {
	Name      string
	Value     float64
	Timestamp time.Time
	Tags      map[string]string
}

// NewPerformanceMonitor 创建性能监控器
func NewPerformanceMonitor() *PerformanceMonitor {
	return &PerformanceMonitor{
		metrics: make(map[string]*Metric),
	}
}

// RecordMetric 记录指标
func (pm *PerformanceMonitor) RecordMetric(name string, value float64, tags map[string]string) {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	
	pm.metrics[name] = &Metric{
		Name:      name,
		Value:     value,
		Timestamp: time.Now(),
		Tags:      tags,
	}
}

// GetMetric 获取指标
func (pm *PerformanceMonitor) GetMetric(name string) (*Metric, bool) {
	pm.mu.RLock()
	defer pm.mu.RUnlock()
	
	metric, exists := pm.metrics[name]
	return metric, exists
}

// GetMemoryStats 获取内存统计
func (pm *PerformanceMonitor) GetMemoryStats() runtime.MemStats {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return m
}

// GetGoroutineCount 获取协程数量
func (pm *PerformanceMonitor) GetGoroutineCount() int {
	return runtime.NumGoroutine()
}

// OptimizedHTTPClient 优化的HTTP客户端
type OptimizedHTTPClient struct {
	client *http.Client
	pool   *MemoryPool
}

// NewOptimizedHTTPClient 创建优化的HTTP客户端
func NewOptimizedHTTPClient() *OptimizedHTTPClient {
	transport := &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     90 * time.Second,
		DisableCompression:  false,
		ForceAttemptHTTP2:   true,
	}
	
	client := &http.Client{
		Transport: transport,
		Timeout:   30 * time.Second,
	}
	
	return &OptimizedHTTPClient{
		client: client,
		pool:   NewMemoryPool(4096),
	}
}

// Get 优化的GET请求
func (c *OptimizedHTTPClient) Get(url string) (*http.Response, error) {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}

	// DisableCompression 为 false 时,Transport 会自动协商 gzip 并透明解压;
	// 若手动设置 Accept-Encoding,则需要调用方自行解压响应体,因此这里不手动设置
	return c.client.Do(req)
}
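
上面的内存池与工作池可以组合使用:任务从池中取缓冲区,处理完毕后归还,避免高并发下的频繁分配。下面是一个最小使用示意(其中 logJob 为假设的任务实现,不属于上文源码):

// 示例:MemoryPool 与 WorkerPool 的组合使用(示意)
type logJob struct {
	pool *MemoryPool
	msg  string
}

func (j *logJob) Execute() error {
	buf := j.pool.Get()   // 从内存池取缓冲区,避免频繁分配
	defer j.pool.Put(buf) // 用完归还,供后续任务复用
	n := copy(buf, j.msg)
	_ = buf[:n] // 实际项目中在此处理数据,例如写入日志或发送网络包
	return nil
}

func ExampleWorkerPool() {
	bufPool := NewMemoryPool(4096)
	wp := NewWorkerPool(4, 100) // 4个worker,队列容量100
	wp.Start()
	defer wp.Stop()

	for i := 0; i < 10; i++ {
		if !wp.Submit(&logJob{pool: bufPool, msg: "hello"}) {
			// 队列已满时 Submit 返回 false,调用方可选择降级、重试或丢弃
			break
		}
	}
}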

缓存策略优化

多级缓存架构

// cache/manager.go
package cache

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/go-redis/redis/v8"
	"github.com/patrickmn/go-cache"
)

// CacheLevel 缓存级别
type CacheLevel int

const (
	L1Cache CacheLevel = iota // 本地缓存
	L2Cache                   // Redis缓存
	L3Cache                   // 数据库缓存
)

// CacheManager 缓存管理器
type CacheManager struct {
	l1Cache     *cache.Cache
	redisClient *redis.Client
	config      *CacheConfig
	stats       *CacheStats
	mu          sync.RWMutex
}

// CacheConfig 缓存配置
type CacheConfig struct {
	L1TTL         time.Duration `yaml:"l1_ttl"`
	L2TTL         time.Duration `yaml:"l2_ttl"`
	L1MaxSize     int           `yaml:"l1_max_size"`
	RedisAddr     string        `yaml:"redis_addr"`
	RedisPassword string        `yaml:"redis_password"`
	RedisDB       int           `yaml:"redis_db"`
	Compression   bool          `yaml:"compression"`
}

// CacheStats 缓存统计
type CacheStats struct {
	L1Hits   int64
	L1Misses int64
	L2Hits   int64
	L2Misses int64
	L3Hits   int64
	L3Misses int64
	mu       sync.RWMutex
}

// CacheItem 缓存项
type CacheItem struct {
	Key       string
	Value     interface{}
	TTL       time.Duration
	Level     CacheLevel
	Timestamp time.Time
}

// NewCacheManager 创建缓存管理器
func NewCacheManager(config *CacheConfig) (*CacheManager, error) {
	// 创建本地缓存
	l1Cache := cache.New(config.L1TTL, 10*time.Minute)
	
	// 创建Redis客户端
	redisClient := redis.NewClient(&redis.Options{
		Addr:     config.RedisAddr,
		Password: config.RedisPassword,
		DB:       config.RedisDB,
		PoolSize: 10,
	})
	
	// 测试Redis连接
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	
	_, err := redisClient.Ping(ctx).Result()
	if err != nil {
		return nil, fmt.Errorf("failed to connect to Redis: %w", err)
	}
	
	cm := &CacheManager{
		l1Cache:     l1Cache,
		redisClient: redisClient,
		config:      config,
		stats:       &CacheStats{},
	}
	
	return cm, nil
}

// Get 获取缓存值
func (cm *CacheManager) Get(ctx context.Context, key string) (interface{}, bool) {
	// L1缓存查找
	if value, found := cm.l1Cache.Get(key); found {
		cm.updateStats(L1Cache, true)
		return value, true
	}
	cm.updateStats(L1Cache, false)
	
	// L2缓存查找
	value, err := cm.redisClient.Get(ctx, key).Result()
	if err == nil {
		cm.updateStats(L2Cache, true)
		
		// 反序列化
		var result interface{}
		if err := json.Unmarshal([]byte(value), &result); err == nil {
			// 回填L1缓存
			cm.l1Cache.Set(key, result, cm.config.L1TTL)
			return result, true
		}
	}
	cm.updateStats(L2Cache, false)
	
	return nil, false
}

// Set 设置缓存值
func (cm *CacheManager) Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error {
	// 设置L1缓存
	cm.l1Cache.Set(key, value, ttl)
	
	// 序列化并设置L2缓存
	data, err := json.Marshal(value)
	if err != nil {
		return fmt.Errorf("failed to marshal value: %w", err)
	}
	
	err = cm.redisClient.Set(ctx, key, data, ttl).Err()
	if err != nil {
		return fmt.Errorf("failed to set Redis cache: %w", err)
	}
	
	return nil
}

// Delete 删除缓存
func (cm *CacheManager) Delete(ctx context.Context, key string) error {
	// 删除L1缓存
	cm.l1Cache.Delete(key)
	
	// 删除L2缓存
	err := cm.redisClient.Del(ctx, key).Err()
	if err != nil {
		return fmt.Errorf("failed to delete Redis cache: %w", err)
	}
	
	return nil
}

// GetOrSet 获取或设置缓存
func (cm *CacheManager) GetOrSet(ctx context.Context, key string, ttl time.Duration, fn func() (interface{}, error)) (interface{}, error) {
	// 尝试获取缓存
	if value, found := cm.Get(ctx, key); found {
		return value, nil
	}
	
	// 缓存未命中,执行函数获取数据
	value, err := fn()
	if err != nil {
		return nil, err
	}
	
	// 设置缓存
	if err := cm.Set(ctx, key, value, ttl); err != nil {
		// 记录错误但不影响返回结果
		fmt.Printf("Failed to set cache: %v\n", err)
	}
	
	return value, nil
}

// BatchGet 批量获取
func (cm *CacheManager) BatchGet(ctx context.Context, keys []string) (map[string]interface{}, error) {
	result := make(map[string]interface{})
	missedKeys := make([]string, 0)
	
	// 先从L1缓存获取
	for _, key := range keys {
		if value, found := cm.l1Cache.Get(key); found {
			result[key] = value
			cm.updateStats(L1Cache, true)
		} else {
			missedKeys = append(missedKeys, key)
			cm.updateStats(L1Cache, false)
		}
	}
	
	// 从L2缓存获取未命中的键
	if len(missedKeys) > 0 {
		values, err := cm.redisClient.MGet(ctx, missedKeys...).Result()
		if err != nil {
			return result, err
		}
		
		for i, value := range values {
			if value != nil {
				key := missedKeys[i]
				var unmarshaled interface{}
				if err := json.Unmarshal([]byte(value.(string)), &unmarshaled); err == nil {
					result[key] = unmarshaled
					// 回填L1缓存
					cm.l1Cache.Set(key, unmarshaled, cm.config.L1TTL)
					cm.updateStats(L2Cache, true)
				} else {
					cm.updateStats(L2Cache, false)
				}
			} else {
				cm.updateStats(L2Cache, false)
			}
		}
	}
	
	return result, nil
}

// BatchSet 批量设置
func (cm *CacheManager) BatchSet(ctx context.Context, items map[string]interface{}, ttl time.Duration) error {
	// 设置L1缓存
	for key, value := range items {
		cm.l1Cache.Set(key, value, ttl)
	}
	
	// 批量设置L2缓存
	pipe := cm.redisClient.Pipeline()
	for key, value := range items {
		data, err := json.Marshal(value)
		if err != nil {
			continue
		}
		pipe.Set(ctx, key, data, ttl)
	}
	
	_, err := pipe.Exec(ctx)
	return err
}

// updateStats 更新统计信息
func (cm *CacheManager) updateStats(level CacheLevel, hit bool) {
	cm.stats.mu.Lock()
	defer cm.stats.mu.Unlock()
	
	switch level {
	case L1Cache:
		if hit {
			cm.stats.L1Hits++
		} else {
			cm.stats.L1Misses++
		}
	case L2Cache:
		if hit {
			cm.stats.L2Hits++
		} else {
			cm.stats.L2Misses++
		}
	case L3Cache:
		if hit {
			cm.stats.L3Hits++
		} else {
			cm.stats.L3Misses++
		}
	}
}

// GetStats 获取统计信息(返回不含锁的字段副本,避免复制 sync.RWMutex)
func (cm *CacheManager) GetStats() CacheStats {
	cm.stats.mu.RLock()
	defer cm.stats.mu.RUnlock()
	return CacheStats{
		L1Hits:   cm.stats.L1Hits,
		L1Misses: cm.stats.L1Misses,
		L2Hits:   cm.stats.L2Hits,
		L2Misses: cm.stats.L2Misses,
		L3Hits:   cm.stats.L3Hits,
		L3Misses: cm.stats.L3Misses,
	}
}

// GetHitRate 获取命中率
func (cm *CacheManager) GetHitRate() map[string]float64 {
	stats := cm.GetStats()
	
	l1Total := stats.L1Hits + stats.L1Misses
	l2Total := stats.L2Hits + stats.L2Misses
	l3Total := stats.L3Hits + stats.L3Misses
	
	result := make(map[string]float64)
	
	if l1Total > 0 {
		result["l1"] = float64(stats.L1Hits) / float64(l1Total)
	}
	if l2Total > 0 {
		result["l2"] = float64(stats.L2Hits) / float64(l2Total)
	}
	if l3Total > 0 {
		result["l3"] = float64(stats.L3Hits) / float64(l3Total)
	}
	
	return result
}

// Close 关闭缓存管理器
func (cm *CacheManager) Close() error {
	return cm.redisClient.Close()
}

// CacheMiddleware 缓存中间件
func (cm *CacheManager) CacheMiddleware(ttl time.Duration) gin.HandlerFunc {
	return func(c *gin.Context) {
		// 只缓存GET请求
		if c.Request.Method != "GET" {
			c.Next()
			return
		}
		
		// 生成缓存键
		cacheKey := fmt.Sprintf("http:%s:%s", c.Request.Method, c.Request.URL.Path)
		if c.Request.URL.RawQuery != "" {
			cacheKey += ":" + c.Request.URL.RawQuery
		}
		
		// 尝试从缓存获取
		if cached, found := cm.Get(c.Request.Context(), cacheKey); found {
			if response, ok := cached.(map[string]interface{}); ok {
				// 恢复响应头:L1命中为 map[string]string,L2经JSON反序列化后为 map[string]interface{}
				switch headers := response["headers"].(type) {
				case map[string]string:
					for k, v := range headers {
						c.Header(k, v)
					}
				case map[string]interface{}:
					for k, v := range headers {
						if s, ok := v.(string); ok {
							c.Header(k, s)
						}
					}
				}

				// 恢复状态码:L1命中为 int,经JSON反序列化后数字为 float64
				var status int
				switch v := response["status"].(type) {
				case int:
					status = v
				case float64:
					status = int(v)
				}

				// 以原始字节回放响应体,并调用 Abort 终止后续 handler
				if body, ok := response["body"].(string); ok && status != 0 {
					c.Data(status, c.Writer.Header().Get("Content-Type"), []byte(body))
					c.Abort()
					return
				}
			}
		}
		
		// 创建响应写入器
		writer := &responseWriter{
			ResponseWriter: c.Writer,
			body:           make([]byte, 0),
			headers:        make(map[string]string),
		}
		c.Writer = writer
		
		// 执行请求
		c.Next()
		
		// 缓存响应
		if writer.status >= 200 && writer.status < 300 {
			response := map[string]interface{}{
				"status":  writer.status,
				"headers": writer.headers,
				"body":    string(writer.body),
			}
			cm.Set(c.Request.Context(), cacheKey, response, ttl)
		}
	}
}

// responseWriter 响应写入器
type responseWriter struct {
	gin.ResponseWriter
	body    []byte
	headers map[string]string
	status  int
}

func (w *responseWriter) Write(data []byte) (int, error) {
	w.body = append(w.body, data...)
	return w.ResponseWriter.Write(data)
}

func (w *responseWriter) WriteHeader(status int) {
	w.status = status
	// 写入状态码时快照响应头,供缓存回放时恢复
	for k, v := range w.ResponseWriter.Header() {
		if len(v) > 0 {
			w.headers[k] = v[0]
		}
	}
	w.ResponseWriter.WriteHeader(status)
}

func (w *responseWriter) Header() http.Header {
	return w.ResponseWriter.Header()
}
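
上面的 GetOrSet 在缓存失效的瞬间,如果有大量并发请求同时未命中,就会同时穿透到数据源(缓存击穿)。一个常见的改进是用 golang.org/x/sync/singleflight 把同一个 key 的并发回源合并为一次调用。下面是一个简单示意(假设与 CacheManager 位于同一 cache 包中):

// cache/singleflight.go(示意)
package cache

import (
	"context"
	"time"

	"golang.org/x/sync/singleflight"
)

// SingleFlightCache 在 CacheManager 之上合并并发回源请求
type SingleFlightCache struct {
	cm    *CacheManager
	group singleflight.Group
}

// GetOrSet 同一 key 的并发未命中只会执行一次 fn,其余调用共享其结果
func (s *SingleFlightCache) GetOrSet(ctx context.Context, key string, ttl time.Duration, fn func() (interface{}, error)) (interface{}, error) {
	if value, found := s.cm.Get(ctx, key); found {
		return value, nil
	}

	value, err, _ := s.group.Do(key, func() (interface{}, error) {
		v, err := fn()
		if err != nil {
			return nil, err
		}
		_ = s.cm.Set(ctx, key, v, ttl) // 回源成功后写缓存;写缓存失败不影响返回
		return v, nil
	})
	return value, err
}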

网络性能优化

HTTP/2 和 gRPC 优化

// network/optimization.go
package network

import (
	"compress/gzip"
	"context"
	"fmt"
	"net"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"golang.org/x/net/http2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"
)

// HTTPServerConfig HTTP服务器配置
type HTTPServerConfig struct {
	Addr              string        `yaml:"addr"`
	ReadTimeout       time.Duration `yaml:"read_timeout"`
	WriteTimeout      time.Duration `yaml:"write_timeout"`
	IdleTimeout       time.Duration `yaml:"idle_timeout"`
	MaxHeaderBytes    int           `yaml:"max_header_bytes"`
	EnableHTTP2       bool          `yaml:"enable_http2"`
	EnableCompression bool          `yaml:"enable_compression"`
	TLSCertFile       string        `yaml:"tls_cert_file"`
	TLSKeyFile        string        `yaml:"tls_key_file"`
}

// OptimizedHTTPServer 优化的HTTP服务器
type OptimizedHTTPServer struct {
	server *http.Server
	config *HTTPServerConfig
	mu     sync.RWMutex
}

// NewOptimizedHTTPServer 创建优化的HTTP服务器
func NewOptimizedHTTPServer(config *HTTPServerConfig, handler http.Handler) *OptimizedHTTPServer {
	server := &http.Server{
		Addr:           config.Addr,
		Handler:        handler,
		ReadTimeout:    config.ReadTimeout,
		WriteTimeout:   config.WriteTimeout,
		IdleTimeout:    config.IdleTimeout,
		MaxHeaderBytes: config.MaxHeaderBytes,
	}
	
	// 启用HTTP/2
	if config.EnableHTTP2 {
		http2.ConfigureServer(server, &http2.Server{
			MaxConcurrentStreams: 250,
			MaxReadFrameSize:     16384,
			IdleTimeout:          30 * time.Second,
		})
	}
	
	return &OptimizedHTTPServer{
		server: server,
		config: config,
	}
}

// Start 启动服务器
func (s *OptimizedHTTPServer) Start() error {
	if s.config.TLSCertFile != "" && s.config.TLSKeyFile != "" {
		return s.server.ListenAndServeTLS(s.config.TLSCertFile, s.config.TLSKeyFile)
	}
	return s.server.ListenAndServe()
}

// Stop 停止服务器
func (s *OptimizedHTTPServer) Stop(ctx context.Context) error {
	return s.server.Shutdown(ctx)
}

// GRPCServerConfig gRPC服务器配置
type GRPCServerConfig struct {
	Addr                        string        `yaml:"addr"`
	MaxRecvMsgSize              int           `yaml:"max_recv_msg_size"`
	MaxSendMsgSize              int           `yaml:"max_send_msg_size"`
	MaxConcurrentStreams        uint32        `yaml:"max_concurrent_streams"`
	KeepaliveTime               time.Duration `yaml:"keepalive_time"`
	KeepaliveTimeout            time.Duration `yaml:"keepalive_timeout"`
	KeepaliveEnforcementMinTime time.Duration `yaml:"keepalive_enforcement_min_time"`
	TLSCertFile                 string        `yaml:"tls_cert_file"`
	TLSKeyFile                  string        `yaml:"tls_key_file"`
}

// OptimizedGRPCServer 优化的gRPC服务器
type OptimizedGRPCServer struct {
	server   *grpc.Server
	listener net.Listener
	config   *GRPCServerConfig
}

// NewOptimizedGRPCServer 创建优化的gRPC服务器
func NewOptimizedGRPCServer(config *GRPCServerConfig) (*OptimizedGRPCServer, error) {
	listener, err := net.Listen("tcp", config.Addr)
	if err != nil {
		return nil, fmt.Errorf("failed to listen: %w", err)
	}
	
	// gRPC服务器选项
	opts := []grpc.ServerOption{
		grpc.MaxRecvMsgSize(config.MaxRecvMsgSize),
		grpc.MaxSendMsgSize(config.MaxSendMsgSize),
		grpc.MaxConcurrentStreams(config.MaxConcurrentStreams),
		grpc.KeepaliveParams(keepalive.ServerParameters{
			Time:    config.KeepaliveTime,
			Timeout: config.KeepaliveTimeout,
		}),
		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime:             config.KeepaliveEnforcementMinTime,
			PermitWithoutStream: true,
		}),
	}
	
	// TLS配置
	if config.TLSCertFile != "" && config.TLSKeyFile != "" {
		creds, err := credentials.NewServerTLSFromFile(config.TLSCertFile, config.TLSKeyFile)
		if err != nil {
			return nil, fmt.Errorf("failed to load TLS credentials: %w", err)
		}
		opts = append(opts, grpc.Creds(creds))
	}
	
	server := grpc.NewServer(opts...)
	
	return &OptimizedGRPCServer{
		server:   server,
		listener: listener,
		config:   config,
	}, nil
}

// GetServer 获取gRPC服务器
func (s *OptimizedGRPCServer) GetServer() *grpc.Server {
	return s.server
}

// Start 启动服务器
func (s *OptimizedGRPCServer) Start() error {
	return s.server.Serve(s.listener)
}

// Stop 停止服务器
func (s *OptimizedGRPCServer) Stop() {
	s.server.GracefulStop()
}

// ConnectionPool 连接池
type ConnectionPool struct {
	connections chan *grpc.ClientConn
	target      string
	maxSize     int
	mu          sync.RWMutex
	closed      bool
}

// NewConnectionPool 创建连接池
func NewConnectionPool(target string, maxSize int) (*ConnectionPool, error) {
	pool := &ConnectionPool{
		connections: make(chan *grpc.ClientConn, maxSize),
		target:      target,
		maxSize:     maxSize,
	}
	
	// 预创建连接
	for i := 0; i < maxSize; i++ {
		conn, err := grpc.Dial(target,
			grpc.WithTransportCredentials(insecure.NewCredentials()), // 明文连接仅用于示例,生产环境应配置TLS凭证
			grpc.WithKeepaliveParams(keepalive.ClientParameters{
				Time:                10 * time.Second,
				Timeout:             3 * time.Second,
				PermitWithoutStream: true,
			}),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to create connection: %w", err)
		}
		pool.connections <- conn
	}
	
	return pool, nil
}

// Get 获取连接
func (p *ConnectionPool) Get() (*grpc.ClientConn, error) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	
	if p.closed {
		return nil, fmt.Errorf("connection pool is closed")
	}
	
	select {
	case conn := <-p.connections:
		return conn, nil
	default:
		// 池中没有可用连接,临时创建新连接(归还时若池已满会被关闭)
		return grpc.Dial(p.target, grpc.WithTransportCredentials(insecure.NewCredentials()))
	}
}

// Put 归还连接
func (p *ConnectionPool) Put(conn *grpc.ClientConn) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	
	if p.closed {
		conn.Close()
		return
	}
	
	select {
	case p.connections <- conn:
		// 成功归还到池中
	default:
		// 池已满,关闭连接
		conn.Close()
	}
}

// Close 关闭连接池
func (p *ConnectionPool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	
	if p.closed {
		return
	}
	
	p.closed = true
	close(p.connections)
	
	// 关闭所有连接
	for conn := range p.connections {
		conn.Close()
	}
}

// CompressionMiddleware 压缩中间件
func CompressionMiddleware() gin.HandlerFunc {
	return gin.HandlerFunc(func(c *gin.Context) {
		// 检查客户端是否支持压缩
		acceptEncoding := c.GetHeader("Accept-Encoding")
		if !strings.Contains(acceptEncoding, "gzip") {
			c.Next()
			return
		}
		
		// 设置压缩响应头,并删除原 Content-Length(压缩后响应长度已变化)
		c.Header("Content-Encoding", "gzip")
		c.Header("Vary", "Accept-Encoding")
		c.Writer.Header().Del("Content-Length")
		
		// 创建gzip写入器
		gzipWriter := gzip.NewWriter(c.Writer)
		defer gzipWriter.Close()
		
		// 包装响应写入器
		c.Writer = &gzipResponseWriter{
			ResponseWriter: c.Writer,
			gzipWriter:     gzipWriter,
		}
		
		c.Next()
	})
}

// gzipResponseWriter gzip响应写入器
type gzipResponseWriter struct {
	gin.ResponseWriter
	gzipWriter *gzip.Writer
}

func (w *gzipResponseWriter) Write(data []byte) (int, error) {
	return w.gzipWriter.Write(data)
}

func (w *gzipResponseWriter) WriteString(s string) (int, error) {
	return w.gzipWriter.Write([]byte(s))
}
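
需要说明的是,上面的压缩中间件是为演示原理而写的简化版,未处理最小压缩阈值、按 Content-Type 过滤等细节。生产环境中也可以直接使用社区维护的 gin-contrib/gzip 中间件,下面是一个简单示意:

// 示例:使用 gin-contrib/gzip(示意)
import "github.com/gin-contrib/gzip"

func setupRouter() *gin.Engine {
	router := gin.Default()
	router.Use(gzip.Gzip(gzip.DefaultCompression)) // 使用默认压缩级别
	return router
}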

使用示例

// main.go
package main

import (
	"log"
	"time"

	"github.com/gin-gonic/gin"

	// 注:cache 与 network 为本章示例包,导入路径需替换为实际的模块路径
	"example.com/myapp/cache"
	"example.com/myapp/network"
)

func main() {
	// 缓存配置
	cacheConfig := &cache.CacheConfig{
		L1TTL:         5 * time.Minute,
		L2TTL:         30 * time.Minute,
		L1MaxSize:     1000,
		RedisAddr:     "localhost:6379",
		RedisPassword: "",
		RedisDB:       0,
		Compression:   true,
	}
	
	// 创建缓存管理器
	cacheManager, err := cache.NewCacheManager(cacheConfig)
	if err != nil {
		log.Fatal("Failed to create cache manager:", err)
	}
	defer cacheManager.Close()
	
	// HTTP服务器配置
	httpConfig := &network.HTTPServerConfig{
		Addr:              ":8080",
		ReadTimeout:       30 * time.Second,
		WriteTimeout:      30 * time.Second,
		IdleTimeout:       60 * time.Second,
		MaxHeaderBytes:    1 << 20, // 1MB
		EnableHTTP2:       true,
		EnableCompression: true,
	}
	
	// 创建Gin路由
	router := gin.Default()
	
	// 添加中间件
	router.Use(network.CompressionMiddleware())
	router.Use(cacheManager.CacheMiddleware(10 * time.Minute))
	
	// 添加路由
	router.GET("/api/users/:id", func(c *gin.Context) {
		userID := c.Param("id")
		
		// 使用缓存获取用户信息
		user, err := cacheManager.GetOrSet(
			c.Request.Context(),
			"user:"+userID,
			15*time.Minute,
			func() (interface{}, error) {
				// 模拟从数据库获取用户
				return map[string]interface{}{
					"id":   userID,
					"name": "User " + userID,
					"email": "user" + userID + "@example.com",
				}, nil
			},
		)
		
		if err != nil {
			c.JSON(500, gin.H{"error": err.Error()})
			return
		}
		
		c.JSON(200, user)
	})
	
	// 创建优化的HTTP服务器
	server := network.NewOptimizedHTTPServer(httpConfig, router)
	
	// 启动服务器
	log.Println("Starting server on", httpConfig.Addr)
	if err := server.Start(); err != nil {
		log.Fatal("Failed to start server:", err)
	}
}
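
上例中 server.Start() 会一直阻塞。实际部署时通常还要监听系统信号,在收到 SIGTERM 时调用 Stop 优雅关停,给在途请求留出处理时间。下面是一个简单示意,可替换上例 main 函数末尾的启动部分(需额外导入 context、net/http、os、os/signal、syscall):

// 示例:优雅关停(示意)
go func() {
	if err := server.Start(); err != nil && err != http.ErrServerClosed {
		log.Fatal("Failed to start server:", err)
	}
}()

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit // 阻塞等待退出信号

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := server.Stop(ctx); err != nil {
	log.Println("Server shutdown error:", err)
}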

性能监控与调优

性能监控指标

// monitoring/performance.go
package monitoring

import (
	"context"
	"log"
	"runtime"
	"runtime/debug"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// PerformanceMonitor 性能监控器
type PerformanceMonitor struct {
	// 系统指标
	goroutineCount prometheus.Gauge
	memoryUsage    prometheus.Gauge
	gcDuration     prometheus.Histogram
	cpuUsage       prometheus.Gauge

	// 应用指标(promauto 的 *Vec 构造函数返回指针类型)
	requestDuration *prometheus.HistogramVec
	requestCount    *prometheus.CounterVec
	errorCount      *prometheus.CounterVec

	// 数据库指标
	dbConnections   prometheus.Gauge
	dbQueryDuration *prometheus.HistogramVec

	// 缓存指标
	cacheHitRate    *prometheus.GaugeVec
	cacheOperations *prometheus.CounterVec

	mu sync.RWMutex
}

// NewPerformanceMonitor 创建性能监控器
func NewPerformanceMonitor() *PerformanceMonitor {
	return &PerformanceMonitor{
		// 系统指标(自定义指标避免 go_ 前缀,防止与默认 Go collector 注册的同名指标冲突)
		goroutineCount: promauto.NewGauge(prometheus.GaugeOpts{
			Name: "app_goroutines_current",
			Help: "Current number of goroutines",
		}),
		memoryUsage: promauto.NewGauge(prometheus.GaugeOpts{
			Name: "app_memory_usage_bytes",
			Help: "Current memory usage in bytes",
		}),
		gcDuration: promauto.NewHistogram(prometheus.HistogramOpts{
			Name:    "app_gc_duration_seconds",
			Help:    "GC duration in seconds",
			Buckets: prometheus.DefBuckets,
		}),
		cpuUsage: promauto.NewGauge(prometheus.GaugeOpts{
			Name: "app_cpu_usage_percent",
			Help: "Current CPU usage percentage",
		}),
		
		// 应用指标
		requestDuration: promauto.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "http_request_duration_seconds",
				Help:    "HTTP request duration in seconds",
				Buckets: []float64{0.001, 0.01, 0.1, 0.5, 1, 2.5, 5, 10},
			},
			[]string{"method", "endpoint", "status"},
		),
		requestCount: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "http_requests_total",
				Help: "Total number of HTTP requests",
			},
			[]string{"method", "endpoint", "status"},
		),
		errorCount: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "http_errors_total",
				Help: "Total number of HTTP errors",
			},
			[]string{"method", "endpoint", "error_type"},
		),
		
		// 数据库指标
		dbConnections: promauto.NewGauge(prometheus.GaugeOpts{
			Name: "db_connections_active",
			Help: "Number of active database connections",
		}),
		dbQueryDuration: promauto.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "db_query_duration_seconds",
				Help:    "Database query duration in seconds",
				Buckets: []float64{0.001, 0.01, 0.1, 0.5, 1, 2, 5},
			},
			[]string{"operation", "table"},
		),
		
		// 缓存指标
		cacheHitRate: promauto.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "cache_hit_rate",
				Help: "Cache hit rate percentage",
			},
			[]string{"cache_level"},
		),
		cacheOperations: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "cache_operations_total",
				Help: "Total number of cache operations",
			},
			[]string{"operation", "cache_level", "result"},
		),
	}
}

// StartSystemMetricsCollection 开始收集系统指标
func (pm *PerformanceMonitor) StartSystemMetricsCollection(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			pm.collectSystemMetrics()
		}
	}
}

// collectSystemMetrics 收集系统指标
func (pm *PerformanceMonitor) collectSystemMetrics() {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	
	// 更新指标
	pm.goroutineCount.Set(float64(runtime.NumGoroutine()))
	pm.memoryUsage.Set(float64(m.Alloc))
}

// RecordRequest 记录请求指标
func (pm *PerformanceMonitor) RecordRequest(method, endpoint, status string, duration time.Duration) {
	pm.requestDuration.WithLabelValues(method, endpoint, status).Observe(duration.Seconds())
	pm.requestCount.WithLabelValues(method, endpoint, status).Inc()
}

// RecordError 记录错误指标
func (pm *PerformanceMonitor) RecordError(method, endpoint, errorType string) {
	pm.errorCount.WithLabelValues(method, endpoint, errorType).Inc()
}

// RecordDBQuery 记录数据库查询指标
func (pm *PerformanceMonitor) RecordDBQuery(operation, table string, duration time.Duration) {
	pm.dbQueryDuration.WithLabelValues(operation, table).Observe(duration.Seconds())
}

// UpdateDBConnections 更新数据库连接数
func (pm *PerformanceMonitor) UpdateDBConnections(count int) {
	pm.dbConnections.Set(float64(count))
}

// UpdateCacheHitRate 更新缓存命中率
func (pm *PerformanceMonitor) UpdateCacheHitRate(level string, hitRate float64) {
	pm.cacheHitRate.WithLabelValues(level).Set(hitRate)
}

// RecordCacheOperation 记录缓存操作
func (pm *PerformanceMonitor) RecordCacheOperation(operation, level, result string) {
	pm.cacheOperations.WithLabelValues(operation, level, result).Inc()
}

// PerformanceTuner 性能调优器
type PerformanceTuner struct {
	monitor *PerformanceMonitor
	config  *TuningConfig
	mu      sync.RWMutex
}

// TuningConfig 调优配置
type TuningConfig struct {
	MaxGoroutines     int           `yaml:"max_goroutines"`
	GCTargetPercent   int           `yaml:"gc_target_percent"`
	MaxMemoryUsage    int64         `yaml:"max_memory_usage"`
	TuningInterval    time.Duration `yaml:"tuning_interval"`
	EnableAutoTuning  bool          `yaml:"enable_auto_tuning"`
}

// NewPerformanceTuner 创建性能调优器
func NewPerformanceTuner(monitor *PerformanceMonitor, config *TuningConfig) *PerformanceTuner {
	return &PerformanceTuner{
		monitor: monitor,
		config:  config,
	}
}

// StartAutoTuning 开始自动调优
func (pt *PerformanceTuner) StartAutoTuning(ctx context.Context) {
	if !pt.config.EnableAutoTuning {
		return
	}
	
	ticker := time.NewTicker(pt.config.TuningInterval)
	defer ticker.Stop()
	
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			pt.performTuning()
		}
	}
}

// performTuning 执行调优
func (pt *PerformanceTuner) performTuning() {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	// 内存调优:超过阈值时主动触发一次GC
	if int64(m.Alloc) > pt.config.MaxMemoryUsage {
		runtime.GC()
	}

	// GC调优:注意 SetGCPercent 位于 runtime/debug 包
	if m.GCCPUFraction > 0.1 { // GC占用CPU超过10%,放宽触发阈值以降低GC频率
		debug.SetGCPercent(pt.config.GCTargetPercent + 50)
	} else if m.GCCPUFraction < 0.01 { // GC占用CPU低于1%,可适当收紧以降低内存占用
		debug.SetGCPercent(pt.config.GCTargetPercent - 20)
	}

	// Goroutine数量检查
	if runtime.NumGoroutine() > pt.config.MaxGoroutines {
		log.Printf("Warning: high goroutine count: %d", runtime.NumGoroutine())
	}
}
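
监控器创建之后,还需要通过 HTTP 暴露 /metrics 端点供 Prometheus 抓取。promauto 创建的指标默认注册在全局 Registry 上,因此直接使用 promhttp.Handler 即可。下面是一个最小示意(monitoring 的导入路径为假设):

// 示例:暴露 /metrics 并启动系统指标采集(示意)
package main

import (
	"context"
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	"example.com/myapp/monitoring" // 假设的模块路径,按实际项目替换
)

func main() {
	monitor := monitoring.NewPerformanceMonitor()

	// 后台每10秒采集一次 goroutine 数、内存占用等系统指标
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go monitor.StartSystemMetricsCollection(ctx)

	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9090", nil))
}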

性能优化最佳实践

1. 应用层优化原则

  • 减少内存分配:使用对象池、避免不必要的字符串拼接
  • 优化算法复杂度:选择合适的数据结构和算法
  • 并发控制:合理使用goroutine,避免goroutine泄漏
  • 资源复用:连接池、缓存、预分配切片
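
以"资源复用、预分配切片"为例:预先指定切片容量可以避免 append 反复扩容拷贝,strings.Builder 则避免每次 + 拼接都分配新字符串。下面是一个简单对比示意(需导入 strings 包):

// 示例:预分配与 strings.Builder 减少内存分配(示意)
// 预分配:一次分配到位,append 不再触发扩容拷贝
func buildIDs(n int) []int {
	ids := make([]int, 0, n) // 关键:预先指定容量
	for i := 0; i < n; i++ {
		ids = append(ids, i)
	}
	return ids
}

// 拼接:Builder 复用内部缓冲区,避免产生大量中间字符串
func joinWords(words []string) string {
	var b strings.Builder
	for _, w := range words {
		b.WriteString(w)
	}
	return b.String()
}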

2. 数据库优化策略

  • 索引优化:合理创建和使用索引
  • 查询优化:避免N+1查询,使用批量操作(示例见本节列表后)
  • 连接池管理:合理配置连接池大小
  • 读写分离:分离读写操作,提高并发性能
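
以"避免N+1查询"为例:对 N 个用户逐个查询订单会产生 N+1 次数据库往返,改为一次 IN 查询只需一次往返。下面是基于 sqlx 的简单示意(orders 表结构与 Order 结构体均为假设):

// 示例:用 IN 批量查询替代 N+1 查询(示意)
// 反模式:for _, id := range userIDs { 对每个 id 单独 SELECT 一次 }
func loadOrders(ctx context.Context, db *sqlx.DB, userIDs []int64) ([]Order, error) {
	// sqlx.In 将切片展开为 IN (?, ?, ...) 形式
	query, args, err := sqlx.In("SELECT id, user_id, amount FROM orders WHERE user_id IN (?)", userIDs)
	if err != nil {
		return nil, err
	}
	query = db.Rebind(query) // 将 ? 占位符转换为 PostgreSQL 的 $N 形式

	var orders []Order // Order 为假设的行映射结构体
	err = db.SelectContext(ctx, &orders, query, args...)
	return orders, err
}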

3. 缓存优化策略

  • 多级缓存:L1本地缓存 + L2分布式缓存
  • 缓存预热:系统启动时预加载热点数据
  • 缓存更新策略:Write-through、Write-back、Write-around
  • 缓存穿透防护:布隆过滤器、空值缓存
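
以"空值缓存"为例:对查询不到的 key 也写入一个短 TTL 的占位值,可以避免无效 key 每次都穿透到数据库。下面是基于本章 CacheManager 的简单示意(errNotFound 与 queryUserFromDB 均为假设的示例定义):

// 示例:空值缓存防止缓存穿透(示意)
const nilPlaceholder = "<nil>" // 空值占位符

func getUser(ctx context.Context, cm *cache.CacheManager, id string) (interface{}, error) {
	key := "user:" + id
	if v, found := cm.Get(ctx, key); found {
		if v == nilPlaceholder {
			return nil, errNotFound // 命中空值占位,直接返回,不再查库
		}
		return v, nil
	}

	user, err := queryUserFromDB(ctx, id) // 假设的数据库查询函数
	if errors.Is(err, errNotFound) {
		// 关键:空结果也缓存,但 TTL 要短,避免数据新增后长时间不可见
		_ = cm.Set(ctx, key, nilPlaceholder, 30*time.Second)
		return nil, err
	}
	if err != nil {
		return nil, err
	}
	_ = cm.Set(ctx, key, user, 10*time.Minute)
	return user, nil
}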

4. 网络优化策略

  • HTTP/2:启用HTTP/2提高传输效率
  • 压缩:启用gzip压缩减少传输数据量
  • Keep-Alive:复用TCP连接
  • CDN:使用CDN加速静态资源访问

5. 监控与调优

  • 关键指标监控:延迟、吞吐量、错误率、资源使用率
  • 性能基准测试:定期进行压力测试(示例见本节列表后)
  • 自动化调优:基于监控数据自动调整参数
  • 容量规划:根据业务增长预测资源需求
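
以"性能基准测试"为例,Go 自带的 testing 包即可完成微基准测试,配合 -benchmem 还能看到每次操作的内存分配情况。下面是一个最小示意(置于 _test.go 文件中,需导入 strings、testing):

// 示例:基准测试对比字符串拼接方式(示意)
// 运行:go test -bench=. -benchmem
func BenchmarkStringConcat(b *testing.B) {
	words := []string{"alpha", "beta", "gamma", "delta"}
	for i := 0; i < b.N; i++ {
		s := ""
		for _, w := range words {
			s += w // 每次拼接都会分配新字符串
		}
		_ = s
	}
}

func BenchmarkStringBuilder(b *testing.B) {
	words := []string{"alpha", "beta", "gamma", "delta"}
	for i := 0; i < b.N; i++ {
		var sb strings.Builder
		for _, w := range words {
			sb.WriteString(w)
		}
		_ = sb.String()
	}
}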


并发优化模式

// performance/concurrency.go
package performance

import (
	"context"
	"errors"
	"sync"
	"time"
)

// RateLimiter 限流器
type RateLimiter struct {
	limit    int
	interval time.Duration
	tokens   chan struct{}
	ticker   *time.Ticker
	ctx      context.Context
	cancel   context.CancelFunc
}

// NewRateLimiter 创建限流器
func NewRateLimiter(limit int, interval time.Duration) *RateLimiter {
	ctx, cancel := context.WithCancel(context.Background())
	rl := &RateLimiter{
		limit:    limit,
		interval: interval,
		tokens:   make(chan struct{}, limit),
		ticker:   time.NewTicker(interval),
		ctx:      ctx,
		cancel:   cancel,
	}
	
	// 初始化令牌
	for i := 0; i < limit; i++ {
		rl.tokens <- struct{}{}
	}
	
	go rl.refillTokens()
	return rl
}

// refillTokens 补充令牌
func (rl *RateLimiter) refillTokens() {
	for {
		select {
		case <-rl.ticker.C:
			select {
			case rl.tokens <- struct{}{}:
			default:
				// 令牌桶已满
			}
		case <-rl.ctx.Done():
			return
		}
	}
}

// Allow 检查是否允许请求
func (rl *RateLimiter) Allow() bool {
	select {
	case <-rl.tokens:
		return true
	default:
		return false
	}
}

// Wait 等待令牌
func (rl *RateLimiter) Wait(ctx context.Context) error {
	select {
	case <-rl.tokens:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Stop 停止限流器
func (rl *RateLimiter) Stop() {
	rl.cancel()
	rl.ticker.Stop()
}

// CircuitBreaker 熔断器
type CircuitBreaker struct {
	name          string
	maxRequests   uint32
	interval      time.Duration
	timeout       time.Duration
	readyToTrip   func(counts Counts) bool
	onStateChange func(name string, from State, to State)
	
	mutex      sync.Mutex
	state      State
	generation uint64
	counts     Counts
	expiry     time.Time
}

// State 熔断器状态
type State int

const (
	StateClosed State = iota
	StateHalfOpen
	StateOpen
)

// Counts 计数器
type Counts struct {
	Requests             uint32
	TotalSuccesses       uint32
	TotalFailures        uint32
	ConsecutiveSuccesses uint32
	ConsecutiveFailures  uint32
}

// NewCircuitBreaker 创建熔断器
func NewCircuitBreaker(name string) *CircuitBreaker {
	cb := &CircuitBreaker{
		name:        name,
		maxRequests: 1,
		interval:    60 * time.Second,
		timeout:     60 * time.Second,
		readyToTrip: func(counts Counts) bool {
			return counts.ConsecutiveFailures > 5
		},
	}
	
	cb.toNewGeneration(time.Now())
	return cb
}

// Execute 执行函数
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
	generation, err := cb.beforeRequest()
	if err != nil {
		return nil, err
	}
	
	defer func() {
		e := recover()
		if e != nil {
			cb.afterRequest(generation, false)
			panic(e)
		}
	}()
	
	result, err := req()
	cb.afterRequest(generation, err == nil)
	return result, err
}

// beforeRequest 请求前检查
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()
	
	now := time.Now()
	state, generation := cb.currentState(now)
	
	if state == StateOpen {
		return generation, ErrOpenState
	} else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
		return generation, ErrTooManyRequests
	}
	
	cb.counts.onRequest()
	return generation, nil
}

// afterRequest 请求后处理
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()
	
	now := time.Now()
	state, generation := cb.currentState(now)
	if generation != before {
		return
	}
	
	if success {
		cb.onSuccess(state, now)
	} else {
		cb.onFailure(state, now)
	}
}

// currentState 获取当前状态
func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
	switch cb.state {
	case StateClosed:
		if !cb.expiry.IsZero() && cb.expiry.Before(now) {
			cb.toNewGeneration(now)
		}
	case StateOpen:
		if cb.expiry.Before(now) {
			cb.setState(StateHalfOpen, now)
		}
	}
	return cb.state, cb.generation
}

// onSuccess 成功处理
func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
	cb.counts.onSuccess()
	
	if state == StateHalfOpen {
		cb.setState(StateClosed, now)
	}
}

// onFailure 失败处理
func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
	cb.counts.onFailure()
	
	if cb.readyToTrip(cb.counts) {
		cb.setState(StateOpen, now)
	}
}

// setState 设置状态
func (cb *CircuitBreaker) setState(state State, now time.Time) {
	if cb.state == state {
		return
	}
	
	prev := cb.state
	cb.state = state
	cb.toNewGeneration(now)
	
	if cb.onStateChange != nil {
		cb.onStateChange(cb.name, prev, state)
	}
}

// toNewGeneration 新一代
func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
	cb.generation++
	cb.counts.clear()
	
	var zero time.Time
	switch cb.state {
	case StateClosed:
		if cb.interval == 0 {
			cb.expiry = zero
		} else {
			cb.expiry = now.Add(cb.interval)
		}
	case StateOpen:
		cb.expiry = now.Add(cb.timeout)
	default: // StateHalfOpen
		cb.expiry = zero
	}
}

// onRequest 请求计数
func (c *Counts) onRequest() {
	c.Requests++
}

// onSuccess 成功计数
func (c *Counts) onSuccess() {
	c.TotalSuccesses++
	c.ConsecutiveSuccesses++
	c.ConsecutiveFailures = 0
}

// onFailure 失败计数
func (c *Counts) onFailure() {
	c.TotalFailures++
	c.ConsecutiveFailures++
	c.ConsecutiveSuccesses = 0
}

// clear 清空计数
func (c *Counts) clear() {
	c.Requests = 0
	c.TotalSuccesses = 0
	c.TotalFailures = 0
	c.ConsecutiveSuccesses = 0
	c.ConsecutiveFailures = 0
}

// 错误定义
var (
	ErrOpenState       = errors.New("circuit breaker is open")
	ErrTooManyRequests = errors.New("too many requests")
)
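
限流器与熔断器通常组合出现在调用下游服务的路径上:先通过限流器控制出口流量,再经熔断器执行远程调用,连续失败达到阈值后快速失败,避免故障扩散。下面是一个简单示意(callRemote 为假设的下游调用):

// 示例:组合使用限流器与熔断器保护下游调用(示意)
func callWithProtection(ctx context.Context, rl *RateLimiter, cb *CircuitBreaker) (interface{}, error) {
	// 先取令牌,必要时阻塞等待,受 ctx 超时控制
	if err := rl.Wait(ctx); err != nil {
		return nil, err
	}

	// 再经熔断器执行:熔断打开时直接返回 ErrOpenState,不再请求下游
	return cb.Execute(func() (interface{}, error) {
		return callRemote(ctx)
	})
}

// callRemote 假设的下游调用,实际项目中替换为 HTTP/gRPC 请求
func callRemote(ctx context.Context) (interface{}, error) {
	return "ok", nil
}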

数据库性能优化

数据库连接池优化

// database/pool.go
package database

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq"
)

// DatabaseConfig 数据库配置
type DatabaseConfig struct {
	Host            string        `yaml:"host"`
	Port            int           `yaml:"port"`
	Database        string        `yaml:"database"`
	Username        string        `yaml:"username"`
	Password        string        `yaml:"password"`
	SSLMode         string        `yaml:"ssl_mode"`
	MaxOpenConns    int           `yaml:"max_open_conns"`
	MaxIdleConns    int           `yaml:"max_idle_conns"`
	ConnMaxLifetime time.Duration `yaml:"conn_max_lifetime"`
	ConnMaxIdleTime time.Duration `yaml:"conn_max_idle_time"`
	QueryTimeout    time.Duration `yaml:"query_timeout"`
}

// DatabaseManager 数据库管理器
type DatabaseManager struct {
	db     *sqlx.DB
	config *DatabaseConfig
	mu     sync.RWMutex
	stats  *DatabaseStats
}

// DatabaseStats 数据库统计
type DatabaseStats struct {
	TotalQueries    int64
	SlowQueries     int64
	FailedQueries   int64
	AverageLatency  time.Duration
	ConnectionsUsed int
	mu              sync.RWMutex
}

// NewDatabaseManager 创建数据库管理器
func NewDatabaseManager(config *DatabaseConfig) (*DatabaseManager, error) {
	dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
		config.Host, config.Port, config.Username, config.Password, config.Database, config.SSLMode)
	
	db, err := sqlx.Connect("postgres", dsn)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to database: %w", err)
	}
	
	// 配置连接池
	db.SetMaxOpenConns(config.MaxOpenConns)
	db.SetMaxIdleConns(config.MaxIdleConns)
	db.SetConnMaxLifetime(config.ConnMaxLifetime)
	db.SetConnMaxIdleTime(config.ConnMaxIdleTime)
	
	dm := &DatabaseManager{
		db:     db,
		config: config,
		stats:  &DatabaseStats{},
	}
	
	// 启动健康检查
	go dm.healthCheck()
	
	return dm, nil
}

// healthCheck 健康检查
func (dm *DatabaseManager) healthCheck() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	
	for range ticker.C {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		err := dm.db.PingContext(ctx)
		cancel()
		
		if err != nil {
			fmt.Printf("Database health check failed: %v\n", err)
		}
	}
}

// QueryWithStats 带统计的查询
func (dm *DatabaseManager) QueryWithStats(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
	start := time.Now()
	defer func() {
		latency := time.Since(start)
		dm.updateStats(latency, latency > 1*time.Second)
	}()
	
	queryCtx, cancel := context.WithTimeout(ctx, dm.config.QueryTimeout)
	defer cancel()
	
	rows, err := dm.db.QueryContext(queryCtx, query, args...)
	if err != nil {
		dm.stats.mu.Lock()
		dm.stats.FailedQueries++
		dm.stats.mu.Unlock()
		return nil, err
	}
	
	return rows, nil
}

// ExecWithStats 带统计的执行
func (dm *DatabaseManager) ExecWithStats(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
	start := time.Now()
	defer func() {
		latency := time.Since(start)
		dm.updateStats(latency, latency > 1*time.Second)
	}()
	
	queryCtx, cancel := context.WithTimeout(ctx, dm.config.QueryTimeout)
	defer cancel()
	
	result, err := dm.db.ExecContext(queryCtx, query, args...)
	if err != nil {
		dm.stats.mu.Lock()
		dm.stats.FailedQueries++
		dm.stats.mu.Unlock()
		return nil, err
	}
	
	return result, nil
}

// updateStats 更新统计信息
func (dm *DatabaseManager) updateStats(latency time.Duration, isSlow bool) {
	dm.stats.mu.Lock()
	defer dm.stats.mu.Unlock()
	
	dm.stats.TotalQueries++
	if isSlow {
		dm.stats.SlowQueries++
	}
	
	// 计算平均延迟
	if dm.stats.TotalQueries == 1 {
		dm.stats.AverageLatency = latency
	} else {
		dm.stats.AverageLatency = time.Duration(
			(int64(dm.stats.AverageLatency)*(dm.stats.TotalQueries-1) + int64(latency)) / dm.stats.TotalQueries,
		)
	}
	
	// 更新连接使用情况
	dbStats := dm.db.Stats()
	dm.stats.ConnectionsUsed = dbStats.InUse
}

// GetStats 获取统计信息(返回不含锁的字段副本,避免复制 sync.RWMutex)
func (dm *DatabaseManager) GetStats() DatabaseStats {
	dm.stats.mu.RLock()
	defer dm.stats.mu.RUnlock()
	return DatabaseStats{
		TotalQueries:    dm.stats.TotalQueries,
		SlowQueries:     dm.stats.SlowQueries,
		FailedQueries:   dm.stats.FailedQueries,
		AverageLatency:  dm.stats.AverageLatency,
		ConnectionsUsed: dm.stats.ConnectionsUsed,
	}
}

// Close 关闭数据库连接
func (dm *DatabaseManager) Close() error {
	return dm.db.Close()
}

// BatchInsert 批量插入优化
func (dm *DatabaseManager) BatchInsert(ctx context.Context, table string, columns []string, values [][]interface{}) error {
	if len(values) == 0 {
		return nil
	}
	
	// 构建批量插入SQL
	placeholders := make([]string, len(values))
	args := make([]interface{}, 0, len(values)*len(columns))
	
	for i, row := range values {
		rowPlaceholders := make([]string, len(columns))
		for j := range columns {
			rowPlaceholders[j] = fmt.Sprintf("$%d", len(args)+j+1)
		}
		placeholders[i] = fmt.Sprintf("(%s)", strings.Join(rowPlaceholders, ","))
		args = append(args, row...)
	}
	
	query := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
		table, strings.Join(columns, ","), strings.Join(placeholders, ","))
	
	_, err := dm.ExecWithStats(ctx, query, args...)
	return err
}

// PreparedStatementCache 预编译语句缓存
type PreparedStatementCache struct {
	cache map[string]*sql.Stmt
	mu    sync.RWMutex
	db    *sql.DB
}

// NewPreparedStatementCache 创建预编译语句缓存
func NewPreparedStatementCache(db *sql.DB) *PreparedStatementCache {
	return &PreparedStatementCache{
		cache: make(map[string]*sql.Stmt),
		db:    db,
	}
}

// GetOrPrepare 获取或准备语句
func (psc *PreparedStatementCache) GetOrPrepare(query string) (*sql.Stmt, error) {
	psc.mu.RLock()
	stmt, exists := psc.cache[query]
	psc.mu.RUnlock()
	
	if exists {
		return stmt, nil
	}
	
	psc.mu.Lock()
	defer psc.mu.Unlock()
	
	// 双重检查
	if stmt, exists := psc.cache[query]; exists {
		return stmt, nil
	}
	
	stmt, err := psc.db.Prepare(query)
	if err != nil {
		return nil, err
	}
	
	psc.cache[query] = stmt
	return stmt, nil
}

// Close 关闭所有预编译语句
func (psc *PreparedStatementCache) Close() error {
	psc.mu.Lock()
	defer psc.mu.Unlock()
	
	for _, stmt := range psc.cache {
		stmt.Close()
	}
	psc.cache = make(map[string]*sql.Stmt)
	return nil
}

本章总结

本章深入探讨了微服务性能优化与调优的各个方面:

主要内容回顾

  1. 性能优化挑战
    • 网络延迟、资源开销、数据一致性、监控复杂性
    • 性能优化架构概览
  2. 应用层性能优化
    • Go语言性能优化技术
    • 并发优化模式(限流器、熔断器)
  3. 数据库性能优化
    • 数据库连接池优化
    • 查询性能优化策略
  4. 缓存策略优化
    • 多级缓存架构设计
    • 缓存管理器实现
    • 缓存中间件开发
  5. 网络性能优化
    • HTTP/2和gRPC优化
    • 连接池管理
    • 压缩中间件
  6. 性能监控与调优
    • 性能监控指标收集
    • 自动化性能调优

最佳实践要点

  • 系统性优化:从应用、数据、网络、基础设施多层面优化
  • 监控驱动:基于监控数据进行性能调优
  • 渐进式优化:先解决瓶颈,再进行全面优化
  • 自动化调优:减少人工干预,提高调优效率

技术要点

  • 内存池和对象复用减少GC压力
  • 多级缓存提高数据访问性能
  • HTTP/2和gRPC提升网络传输效率
  • 自动化监控和调优保障系统稳定性

下一章我们将探讨微服务的故障处理与恢复,学习如何构建高可用的微服务系统。