概述

本章将深入探讨 gRPC 应用的监控、日志记录和调试技术，包括指标收集、分布式追踪、结构化日志、性能分析、故障排查等。我们将学习如何构建完整的可观测性体系。

学习目标

  • 理解可观测性的三大支柱：指标、日志、追踪
  • 掌握 gRPC 指标收集和监控方法
  • 学习分布式追踪的实现和应用
  • 了解结构化日志的最佳实践
  • 掌握性能分析和调试技巧

监控指标体系

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Callable, Union
from abc import ABC, abstractmethod
import time
import json
import threading
from datetime import datetime, timedelta
from collections import defaultdict, deque

class MetricType(Enum):
    """Kinds of metric that a MetricConfig can declare."""
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    SUMMARY = "summary"

class LogLevel(Enum):
    """Log level names; each member's value is the lowercase level string."""
    TRACE = "trace"
    DEBUG = "debug"
    INFO = "info"
    WARN = "warn"
    ERROR = "error"
    FATAL = "fatal"

class TraceSpanKind(Enum):
    """Kinds of trace span."""
    CLIENT = "client"
    SERVER = "server"
    PRODUCER = "producer"
    CONSUMER = "consumer"
    INTERNAL = "internal"

class AlertSeverity(Enum):
    """Alert severity levels."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class MetricConfig:
    """Configuration describing a single metric."""
    name: str
    metric_type: MetricType
    description: str
    # Label names attached to the metric.
    labels: List[str]
    buckets: Optional[List[float]] = None  # used for histograms
    objectives: Optional[Dict[float, float]] = None  # used for summaries
    
@dataclass
class LogConfig:
    """Logging configuration."""
    level: LogLevel
    format: str
    output: str
    rotation: bool = True
    max_size: int = 100  # MB
    max_files: int = 10
    structured: bool = True
    
@dataclass
class TraceConfig:
    """Tracing configuration."""
    service_name: str
    service_version: str
    environment: str
    sampler_type: str = "probabilistic"
    # NOTE(review): presumably the sampling probability for the
    # "probabilistic" sampler -- confirm against the consumer of this config.
    sampler_param: float = 0.1
    jaeger_endpoint: str = "http://localhost:14268/api/traces"
    
class ObservabilityManager:
    """可观测性管理器"""
    
    def __init__(self):
        self.metrics_registry = {}
        self.loggers = {}
        self.tracers = {}
        
    def create_metrics_system(self) -> str:
        """创建指标系统"""
        return """
// metrics.go - 指标系统实现
package observability

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// MetricsCollector groups the Prometheus metric vectors recorded for gRPC
// traffic together with the registry they are registered on.
type MetricsCollector struct {
    // Counter vector created as "grpc_requests_total".
    requestsTotal     *prometheus.CounterVec
    // Histogram vector created as "grpc_request_duration_seconds".
    requestDuration   *prometheus.HistogramVec
    // Gauge vector created as "grpc_requests_in_flight".
    requestsInFlight  *prometheus.GaugeVec
    // Counter vector created as "grpc_errors_total".
    errorRate         *prometheus.CounterVec
    // Gauge vector created as "grpc_connections_active".
    connectionCount   *prometheus.GaugeVec
    // Registry created by NewMetricsCollector; all vectors above are registered on it.
    registry          *prometheus.Registry
    // Read-locked by metricsHandler while it builds the summary.
    mu                sync.RWMutex
}

// NewMetricsCollector builds a MetricsCollector whose five metric vectors
// all carry a constant "service" label set to serviceName and are registered
// on a freshly created Prometheus registry (not the default one).
//
// NOTE(review): "client_ip" is a variable label on grpc_requests_total.
// getClientIP currently returns a constant, but once it returns real
// addresses this is presumably a high-cardinality label — confirm before
// wiring it up.
func NewMetricsCollector(serviceName string) *MetricsCollector {
    // Hand every vector its own label map so no map is shared between them.
    serviceLabel := func() prometheus.Labels {
        return prometheus.Labels{"service": serviceName}
    }

    mc := &MetricsCollector{
        registry: prometheus.NewRegistry(),

        // Total request counter.
        requestsTotal: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name:        "grpc_requests_total",
                Help:        "Total number of gRPC requests",
                ConstLabels: serviceLabel(),
            },
            []string{"method", "status_code", "client_ip"},
        ),

        // Request latency histogram, in seconds.
        requestDuration: prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:        "grpc_request_duration_seconds",
                Help:        "Duration of gRPC requests in seconds",
                Buckets:     []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10},
                ConstLabels: serviceLabel(),
            },
            []string{"method", "status_code"},
        ),

        // Requests currently being handled.
        requestsInFlight: prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name:        "grpc_requests_in_flight",
                Help:        "Number of gRPC requests currently being processed",
                ConstLabels: serviceLabel(),
            },
            []string{"method"},
        ),

        // Error counter.
        errorRate: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name:        "grpc_errors_total",
                Help:        "Total number of gRPC errors",
                ConstLabels: serviceLabel(),
            },
            []string{"method", "error_code", "error_type"},
        ),

        // Active connection gauge.
        connectionCount: prometheus.NewGaugeVec(
            prometheus.GaugeOpts{
                Name:        "grpc_connections_active",
                Help:        "Number of active gRPC connections",
                ConstLabels: serviceLabel(),
            },
            []string{"client_type"},
        ),
    }

    // MustRegister panics if any collector cannot be registered.
    mc.registry.MustRegister(
        mc.requestsTotal,
        mc.requestDuration,
        mc.requestsInFlight,
        mc.errorRate,
        mc.connectionCount,
    )

    return mc
}

// MetricsInterceptor returns a unary server interceptor that tracks the
// in-flight gauge for the method while the handler runs and, once it
// returns, records the request counter, the duration histogram and (on
// failure) the error counter. The handler's response and error are passed
// through unchanged.
func (mc *MetricsCollector) MetricsInterceptor() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        begin := time.Now()
        fullMethod := info.FullMethod

        // Count the request as in flight until this function returns.
        inFlight := mc.requestsInFlight.WithLabelValues(fullMethod)
        inFlight.Inc()
        defer inFlight.Dec()

        peerIP := getClientIP(ctx)

        resp, err := handler(ctx, req)

        elapsed := time.Since(begin).Seconds()

        code := codes.OK
        if err != nil {
            code = status.Code(err)
            mc.errorRate.WithLabelValues(fullMethod, code.String(), getErrorType(err)).Inc()
        }
        codeLabel := code.String()

        mc.requestsTotal.WithLabelValues(fullMethod, codeLabel, peerIP).Inc()
        mc.requestDuration.WithLabelValues(fullMethod, codeLabel).Observe(elapsed)

        return resp, err
    }
}

// StreamMetricsInterceptor returns a stream server interceptor that mirrors
// MetricsInterceptor for streaming RPCs: the in-flight gauge is held for the
// lifetime of the stream, and the request counter, duration histogram and
// (on failure) error counter are recorded when the handler returns. The
// handler receives a metricsServerStream wrapping the original stream.
func (mc *MetricsCollector) StreamMetricsInterceptor() grpc.StreamServerInterceptor {
    return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        begin := time.Now()
        fullMethod := info.FullMethod

        // Count the stream as in flight until this function returns.
        inFlight := mc.requestsInFlight.WithLabelValues(fullMethod)
        inFlight.Inc()
        defer inFlight.Dec()

        peerIP := getClientIP(ss.Context())

        // Wrap the stream so per-message counts are tracked.
        err := handler(srv, &metricsServerStream{
            ServerStream: ss,
            collector:    mc,
            method:       fullMethod,
            clientIP:     peerIP,
            start:        begin,
        })

        elapsed := time.Since(begin).Seconds()

        code := codes.OK
        if err != nil {
            code = status.Code(err)
            mc.errorRate.WithLabelValues(fullMethod, code.String(), getErrorType(err)).Inc()
        }
        codeLabel := code.String()

        mc.requestsTotal.WithLabelValues(fullMethod, codeLabel, peerIP).Inc()
        mc.requestDuration.WithLabelValues(fullMethod, codeLabel).Observe(elapsed)

        return err
    }
}

// metricsServerStream wraps a grpc.ServerStream and counts the messages that
// were successfully sent and received through it.
type metricsServerStream struct {
    grpc.ServerStream
    collector *MetricsCollector
    method    string
    clientIP  string
    start     time.Time
    msgSent   int64 // incremented by SendMsg on success
    msgRecv   int64 // incremented by RecvMsg on success
}

// SendMsg forwards m to the wrapped stream and bumps msgSent when the send
// succeeds. The wrapped stream's error is returned unchanged.
func (mss *metricsServerStream) SendMsg(m interface{}) error {
    if err := mss.ServerStream.SendMsg(m); err != nil {
        return err
    }
    mss.msgSent++
    return nil
}

// RecvMsg reads into m from the wrapped stream and bumps msgRecv when the
// receive succeeds. The wrapped stream's error is returned unchanged.
func (mss *metricsServerStream) RecvMsg(m interface{}) error {
    if err := mss.ServerStream.RecvMsg(m); err != nil {
        return err
    }
    mss.msgRecv++
    return nil
}

// StartMetricsServer serves the collector's HTTP endpoints on the given port
// and blocks until the server stops, returning the server's error.
//
// Endpoints:
//   - /metrics          Prometheus exposition of mc.registry
//   - /health           always answers 200 "OK"
//   - /metrics/summary  JSON summary produced by metricsHandler
//
// Routes are registered on a private ServeMux rather than
// http.DefaultServeMux, so calling this more than once (or next to other
// code that registers the same patterns) does not panic on a duplicate
// registration, and handlers registered on the default mux by unrelated
// packages are not exposed on the metrics port.
func (mc *MetricsCollector) StartMetricsServer(port int) error {
    mux := http.NewServeMux()
    mux.Handle("/metrics", promhttp.HandlerFor(mc.registry, promhttp.HandlerOpts{}))

    // Health check endpoint.
    mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        if _, err := w.Write([]byte("OK")); err != nil {
            log.Printf("health check: writing response: %v", err)
        }
    })

    // Metrics summary endpoint.
    mux.HandleFunc("/metrics/summary", mc.metricsHandler)

    srv := &http.Server{
        Addr:    fmt.Sprintf(":%d", port),
        Handler: mux,
        // Bound how long a client may take to send request headers so idle
        // connections cannot pin server resources indefinitely.
        ReadHeaderTimeout: 10 * time.Second,
    }

    log.Printf("Starting metrics server on port %d", port)
    return srv.ListenAndServe()
}

// metricsHandler writes a JSON summary of the collector's headline metrics.
//
// The response contains the current Unix timestamp, a fixed service name and
// one value per metric family obtained through getCounterValue and
// getGaugeValue. A failure to encode or write the response is logged instead
// of being silently dropped; by that point the status line may already have
// been sent, so no error status is attempted.
//
// NOTE(review): the "service" value is hard-coded to "grpc-service" and does
// not reflect the serviceName passed to NewMetricsCollector.
func (mc *MetricsCollector) metricsHandler(w http.ResponseWriter, r *http.Request) {
    mc.mu.RLock()
    defer mc.mu.RUnlock()

    summary := map[string]interface{}{
        "timestamp": time.Now().Unix(),
        "service":   "grpc-service",
        "metrics": map[string]interface{}{
            "requests_total":     mc.getCounterValue(mc.requestsTotal),
            "requests_in_flight": mc.getGaugeValue(mc.requestsInFlight),
            "errors_total":       mc.getCounterValue(mc.errorRate),
            "connections_active": mc.getGaugeValue(mc.connectionCount),
        },
    }

    w.Header().Set("Content-Type", "application/json")
    if err := json.NewEncoder(w).Encode(summary); err != nil {
        log.Printf("metrics summary: encoding response: %v", err)
    }
}

// CustomMetrics keeps business-level gauges keyed by name, guarded by mu.
type CustomMetrics struct {
    businessMetrics map[string]prometheus.Metric
    mu              sync.RWMutex
}

// NewCustomMetrics returns a CustomMetrics with an empty metric map.
func NewCustomMetrics() *CustomMetrics {
    cm := new(CustomMetrics)
    cm.businessMetrics = make(map[string]prometheus.Metric)
    return cm
}

// RecordBusinessMetric sets the gauge stored under name to value, creating a
// gauge named "business_<name>" on first use. An existing entry that is not
// a prometheus.Gauge is left untouched.
//
// NOTE(review): labels is accepted but never read, and the created gauge is
// not registered on any registry in this function — confirm whether it is
// exported elsewhere.
func (cm *CustomMetrics) RecordBusinessMetric(name string, value float64, labels map[string]string) {
    cm.mu.Lock()
    defer cm.mu.Unlock()

    existing, found := cm.businessMetrics[name]
    if !found {
        // First observation for this name: create and store a new gauge.
        created := prometheus.NewGauge(prometheus.GaugeOpts{
            Name: fmt.Sprintf("business_%s", name),
            Help: fmt.Sprintf("Business metric: %s", name),
        })
        created.Set(value)
        cm.businessMetrics[name] = created
        return
    }

    if g, isGauge := existing.(prometheus.Gauge); isGauge {
        g.Set(value)
    }
}

// AlertManager holds a list of threshold rules and fires an alert for every
// rule whose metric value exceeds its threshold.
type AlertManager struct {
    rules       []AlertRule
    webhookURL  string
    mu          sync.RWMutex
}

// AlertRule describes one threshold alert. CheckAlerts looks the metric up
// by Name and compares it against Threshold; Severity and Description are
// copied into the alert payload.
type AlertRule struct {
    Name        string
    Query       string
    Threshold   float64
    Severity    string
    Duration    time.Duration
    Description string
}

// NewAlertManager returns an AlertManager with no rules that remembers
// webhookURL.
func NewAlertManager(webhookURL string) *AlertManager {
    am := new(AlertManager)
    am.webhookURL = webhookURL
    am.rules = []AlertRule{}
    return am
}

// AddRule appends rule to the manager's rule list under the write lock.
func (am *AlertManager) AddRule(rule AlertRule) {
    am.mu.Lock()
    am.rules = append(am.rules, rule)
    am.mu.Unlock()
}

// CheckAlerts triggers an alert for each rule whose Name is present in
// metrics with a value strictly greater than the rule's Threshold.
func (am *AlertManager) CheckAlerts(metrics map[string]float64) {
    am.mu.RLock()
    defer am.mu.RUnlock()

    for i := range am.rules {
        rule := am.rules[i]
        observed, present := metrics[rule.Name]
        // Written as a negated ">" so a NaN value is skipped exactly like
        // any other value that does not exceed the threshold.
        if !present || !(observed > rule.Threshold) {
            continue
        }
        am.triggerAlert(rule, observed)
    }
}

// triggerAlert builds the alert payload for rule and hands it to sendWebhook
// on a new goroutine, so the caller is not blocked by delivery.
func (am *AlertManager) triggerAlert(rule AlertRule, value float64) {
    payload := make(map[string]interface{}, 6)
    payload["alert"] = rule.Name
    payload["severity"] = rule.Severity
    payload["value"] = value
    payload["threshold"] = rule.Threshold
    payload["description"] = rule.Description
    payload["timestamp"] = time.Now().Unix()

    go am.sendWebhook(payload)
}

// sendWebhook currently only logs the alert; no HTTP request is made.
func (am *AlertManager) sendWebhook(alert map[string]interface{}) {
    // TODO: implement the actual webhook delivery.
    log.Printf("Alert triggered: %+v", alert)
}

// Helper functions.
//
// getClientIP is a placeholder: it does not inspect ctx and always returns
// the string "unknown".
func getClientIP(ctx context.Context) string {
    // TODO: extract the client IP from the context.
    return "unknown"
}

// getErrorType maps an error to a coarse category label based on its gRPC
// status code: "none" for a nil error, a named category for the handled
// codes, and "unknown" for every other code.
func getErrorType(err error) string {
    if err == nil {
        return "none"
    }

    switch status.Code(err) {
    case codes.InvalidArgument:
        return "validation"
    case codes.Unauthenticated:
        return "authentication"
    case codes.PermissionDenied:
        return "authorization"
    case codes.Internal:
        return "internal"
    case codes.Unavailable:
        return "unavailable"
    }
    return "unknown"
}

// getCounterValue is a placeholder: it does not read counter and always
// returns 0.0.
func (mc *MetricsCollector) getCounterValue(counter *prometheus.CounterVec) float64 {
    // TODO: implement reading the counter's value.
    return 0.0
}

// getGaugeValue is a placeholder: it does not read gauge and always returns
// 0.0.
func (mc *MetricsCollector) getGaugeValue(gauge *prometheus.GaugeVec) float64 {
    // TODO: implement reading the gauge's value.
    return 0.0
}
"""
    
    def create_logging_system(self) -> str:
        """创建日志系统"""
        return """
// logging.go - 日志系统实现
package observability

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "path/filepath"
    "runtime"
    "sync"
    "time"
    
    "github.com/sirupsen/logrus"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/peer"
)

// StructuredLogger wraps a logrus logger and stamps every entry with the
// service name, version and environment it was created with.
type StructuredLogger struct {
    logger    *logrus.Logger
    serviceName string
    version   string
    environment string
    // Held by writeLog for the duration of each write.
    mu        sync.RWMutex
}

// LogEntry is the in-memory form of one structured log record. writeLog
// copies its non-empty fields into logrus fields before emitting it.
type LogEntry struct {
    Timestamp   time.Time              `json:"timestamp"`
    Level       string                 `json:"level"`
    Service     string                 `json:"service"`
    Version     string                 `json:"version"`
    Environment string                 `json:"environment"`
    TraceID     string                 `json:"trace_id,omitempty"`
    SpanID      string                 `json:"span_id,omitempty"`
    RequestID   string                 `json:"request_id,omitempty"`
    Method      string                 `json:"method,omitempty"`
    ClientIP    string                 `json:"client_ip,omitempty"`
    UserID      string                 `json:"user_id,omitempty"`
    // Duration is in milliseconds (set from time.Duration.Milliseconds).
    Duration    int64                  `json:"duration_ms,omitempty"`
    StatusCode  string                 `json:"status_code,omitempty"`
    Error       string                 `json:"error,omitempty"`
    Message     string                 `json:"message"`
    // Fields carries free-form extras; writeLog applies them last, so they
    // override same-named standard fields.
    Fields      map[string]interface{} `json:"fields,omitempty"`
}

// NewStructuredLogger returns a StructuredLogger that writes JSON records to
// standard output at Info level. Timestamps use RFC 3339 with nanoseconds,
// and the time, level and message keys are renamed to "timestamp", "level"
// and "message".
func NewStructuredLogger(serviceName, version, environment string) *StructuredLogger {
    jsonFormat := &logrus.JSONFormatter{
        TimestampFormat: time.RFC3339Nano,
        FieldMap: logrus.FieldMap{
            logrus.FieldKeyTime:  "timestamp",
            logrus.FieldKeyLevel: "level",
            logrus.FieldKeyMsg:   "message",
        },
    }

    base := logrus.New()
    base.SetFormatter(jsonFormat)
    base.SetOutput(os.Stdout)
    base.SetLevel(logrus.InfoLevel)

    sl := new(StructuredLogger)
    sl.logger = base
    sl.serviceName = serviceName
    sl.version = version
    sl.environment = environment
    return sl
}

// LoggingInterceptor returns a unary server interceptor that logs one record
// before the handler runs and one after it returns, the latter including the
// elapsed time and outcome. The handler's result is returned unchanged.
func (sl *StructuredLogger) LoggingInterceptor() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        begin := time.Now()

        reqInfo := sl.extractRequestInfo(ctx, info.FullMethod)
        sl.logRequest(reqInfo, req)

        resp, err := handler(ctx, req)

        sl.logResponse(reqInfo, resp, err, time.Since(begin))

        return resp, err
    }
}

// StreamLoggingInterceptor returns a stream server interceptor that logs the
// start and end of each stream. The handler receives a loggingServerStream
// wrapper, whose message counters are included in the end-of-stream record.
func (sl *StructuredLogger) StreamLoggingInterceptor() grpc.StreamServerInterceptor {
    return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        begin := time.Now()

        reqInfo := sl.extractRequestInfo(ss.Context(), info.FullMethod)
        sl.logStreamStart(reqInfo)

        // Wrap the stream so every message is counted and logged.
        wrapped := &loggingServerStream{
            ServerStream: ss,
            logger:       sl,
            requestInfo:  reqInfo,
            start:        begin,
        }

        err := handler(srv, wrapped)

        elapsed := time.Since(begin)
        sl.logStreamEnd(reqInfo, err, elapsed, wrapped.msgSent, wrapped.msgRecv)

        return err
    }
}

// RequestInfo collects the per-request attributes that extractRequestInfo
// pulls from the context, the peer and the incoming metadata.
type RequestInfo struct {
    TraceID   string
    SpanID    string
    RequestID string
    Method    string
    ClientIP  string
    UserID    string
    UserAgent string
    // Metadata holds the first value of every incoming metadata key.
    Metadata  map[string]string
}

// extractRequestInfo builds a RequestInfo for method from ctx.
//
// It reads the string context values stored under "trace_id", "span_id",
// "request_id" and "user_id", takes the client address from the gRPC peer
// (as returned by Addr.String()), copies the first value of each incoming
// metadata key into Metadata, and records the "user-agent" header if present.
// Anything that is missing is left at its zero value.
func (sl *StructuredLogger) extractRequestInfo(ctx context.Context, method string) *RequestInfo {
    info := &RequestInfo{
        Method:   method,
        Metadata: make(map[string]string),
    }

    // Context key -> destination field. The assignments are independent, so
    // map iteration order does not matter.
    contextFields := map[string]*string{
        "trace_id":   &info.TraceID,
        "span_id":    &info.SpanID,
        "request_id": &info.RequestID,
        "user_id":    &info.UserID,
    }
    for key, dst := range contextFields {
        if v, ok := ctx.Value(key).(string); ok {
            *dst = v
        }
    }

    if p, ok := peer.FromContext(ctx); ok {
        info.ClientIP = p.Addr.String()
    }

    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return info
    }

    for key, values := range md {
        if len(values) == 0 {
            continue
        }
        info.Metadata[key] = values[0]
    }
    if agents := md.Get("user-agent"); len(agents) > 0 {
        info.UserAgent = agents[0]
    }

    return info
}

// logRequest emits the "Request started" record for a unary call. When req
// can be JSON-marshalled, the length of that encoding is attached as
// "request_size"; marshalling failures are ignored.
func (sl *StructuredLogger) logRequest(info *RequestInfo, req interface{}) {
    entry := sl.createLogEntry("info", "Request started")
    sl.addRequestFields(entry, info)

    encoded, marshalErr := json.Marshal(req)
    if marshalErr == nil {
        entry.Fields["request_size"] = len(encoded)
    }

    sl.writeLog(entry)
}

// logResponse emits the closing record for a unary call.
//
// On success it logs "Request completed" at info level with status "OK" and,
// when resp can be JSON-marshalled, the encoded length as "response_size".
// On failure it logs "Request failed" at error level with the error text and
// the status code from getStatusCode. The duration is recorded in
// milliseconds in both cases.
func (sl *StructuredLogger) logResponse(info *RequestInfo, resp interface{}, err error, duration time.Duration) {
    failed := err != nil

    level, message := "info", "Request completed"
    if failed {
        level, message = "error", "Request failed"
    }

    entry := sl.createLogEntry(level, message)
    sl.addRequestFields(entry, info)
    entry.Duration = duration.Milliseconds()

    if failed {
        entry.Error = err.Error()
        entry.StatusCode = getStatusCode(err)
    } else {
        entry.StatusCode = "OK"
        if encoded, marshalErr := json.Marshal(resp); marshalErr == nil {
            entry.Fields["response_size"] = len(encoded)
        }
    }

    sl.writeLog(entry)
}

// logStreamStart emits the info-level "Stream started" record for a stream.
func (sl *StructuredLogger) logStreamStart(info *RequestInfo) {
    started := sl.createLogEntry("info", "Stream started")
    sl.addRequestFields(started, info)

    sl.writeLog(started)
}

// logStreamEnd emits the closing record for a stream: "Stream completed" at
// info level with status "OK", or "Stream failed" at error level with the
// error text and the status code from getStatusCode. The duration in
// milliseconds and the sent/received message counts are always attached.
func (sl *StructuredLogger) logStreamEnd(info *RequestInfo, err error, duration time.Duration, msgSent, msgRecv int64) {
    failed := err != nil

    level, message := "info", "Stream completed"
    if failed {
        level, message = "error", "Stream failed"
    }

    entry := sl.createLogEntry(level, message)
    sl.addRequestFields(entry, info)

    entry.Duration = duration.Milliseconds()
    entry.Fields["messages_sent"] = msgSent
    entry.Fields["messages_received"] = msgRecv

    entry.StatusCode = "OK"
    if failed {
        entry.Error = err.Error()
        entry.StatusCode = getStatusCode(err)
    }

    sl.writeLog(entry)
}

// createLogEntry returns a new LogEntry stamped with the current time, the
// logger's service, version and environment, the given level and message,
// and an empty (non-nil) Fields map.
func (sl *StructuredLogger) createLogEntry(level, message string) *LogEntry {
    entry := new(LogEntry)
    entry.Timestamp = time.Now()
    entry.Level = level
    entry.Service = sl.serviceName
    entry.Version = sl.version
    entry.Environment = sl.environment
    entry.Message = message
    entry.Fields = make(map[string]interface{})
    return entry
}

// addRequestFields copies the identifying attributes of info onto entry,
// adds the user agent to entry.Fields when it is non-empty, and records the
// file:line and function name of the stack frame three levels above this
// function as "caller" and "function".
//
// NOTE(review): the fixed depth of 3 resolves to different logical callers
// depending on the call path — confirm it points where intended.
func (sl *StructuredLogger) addRequestFields(entry *LogEntry, info *RequestInfo) {
    entry.TraceID, entry.SpanID, entry.RequestID = info.TraceID, info.SpanID, info.RequestID
    entry.Method, entry.ClientIP, entry.UserID = info.Method, info.ClientIP, info.UserID

    if agent := info.UserAgent; agent != "" {
        entry.Fields["user_agent"] = agent
    }

    // Must be called directly from this function: the depth is relative to it.
    pc, file, line, ok := runtime.Caller(3)
    if !ok {
        return
    }
    entry.Fields["caller"] = fmt.Sprintf("%s:%d", filepath.Base(file), line)
    if fn := runtime.FuncForPC(pc); fn != nil {
        entry.Fields["function"] = fn.Name()
    }
}

// writeLog converts entry into logrus fields and emits it at entry.Level.
//
// Service, version and environment are always included; the remaining
// standard attributes are included only when non-empty (duration only when
// greater than zero). Entries in entry.Fields are applied last and therefore
// override standard fields with the same key. Levels other than "debug",
// "info", "warn", "error" and "fatal" are logged at info level. The whole
// operation runs under the logger's mutex.
func (sl *StructuredLogger) writeLog(entry *LogEntry) {
    sl.mu.Lock()
    defer sl.mu.Unlock()

    fields := logrus.Fields{
        "service":     entry.Service,
        "version":     entry.Version,
        "environment": entry.Environment,
    }

    // Optional string attributes: emitted only when set.
    optional := []struct{ key, value string }{
        {"trace_id", entry.TraceID},
        {"span_id", entry.SpanID},
        {"request_id", entry.RequestID},
        {"method", entry.Method},
        {"client_ip", entry.ClientIP},
        {"user_id", entry.UserID},
        {"status_code", entry.StatusCode},
        {"error", entry.Error},
    }
    for _, attr := range optional {
        if attr.value != "" {
            fields[attr.key] = attr.value
        }
    }

    if entry.Duration > 0 {
        fields["duration_ms"] = entry.Duration
    }

    // Custom fields go last so they win over the standard ones.
    for key, value := range entry.Fields {
        fields[key] = value
    }

    record := sl.logger.WithFields(fields)
    emit := record.Info // also the fallback for unrecognised levels
    switch entry.Level {
    case "debug":
        emit = record.Debug
    case "warn":
        emit = record.Warn
    case "error":
        emit = record.Error
    case "fatal":
        emit = record.Fatal
    }
    emit(entry.Message)
}

// loggingServerStream wraps a grpc.ServerStream, counting successfully sent
// and received messages and writing a debug-level record for each one.
type loggingServerStream struct {
    grpc.ServerStream
    logger      *StructuredLogger
    requestInfo *RequestInfo
    start       time.Time
    msgSent     int64 // incremented by SendMsg on success
    msgRecv     int64 // incremented by RecvMsg on success
}

// SendMsg forwards m to the wrapped stream. On success it increments msgSent
// and logs a "Message sent" debug record carrying the running count and,
// when m can be JSON-marshalled, the encoded size. The wrapped stream's
// error is returned unchanged.
func (lss *loggingServerStream) SendMsg(m interface{}) error {
    err := lss.ServerStream.SendMsg(m)
    if err != nil {
        return err
    }

    lss.msgSent++

    record := lss.logger.createLogEntry("debug", "Message sent")
    lss.logger.addRequestFields(record, lss.requestInfo)
    record.Fields["message_type"] = "send"
    record.Fields["message_count"] = lss.msgSent

    encoded, marshalErr := json.Marshal(m)
    if marshalErr == nil {
        record.Fields["message_size"] = len(encoded)
    }

    lss.logger.writeLog(record)
    return err
}

// RecvMsg reads into m from the wrapped stream. On success it increments
// msgRecv and logs a "Message received" debug record carrying the running
// count and, when m can be JSON-marshalled, the encoded size. The wrapped
// stream's error is returned unchanged.
func (lss *loggingServerStream) RecvMsg(m interface{}) error {
    err := lss.ServerStream.RecvMsg(m)
    if err != nil {
        return err
    }

    lss.msgRecv++

    record := lss.logger.createLogEntry("debug", "Message received")
    lss.logger.addRequestFields(record, lss.requestInfo)
    record.Fields["message_type"] = "recv"
    record.Fields["message_count"] = lss.msgRecv

    encoded, marshalErr := json.Marshal(m)
    if marshalErr == nil {
        record.Fields["message_size"] = len(encoded)
    }

    lss.logger.writeLog(record)
    return err
}

// LogRotator is a size-based rotating file writer. Writes go to filename;
// when a write would push the current file past maxSize bytes, the file is
// renamed to filename.1 (shifting older backups up by one) and a new file is
// started. All methods are serialized by mu.
type LogRotator struct {
    filename    string
    maxSize     int64 // size limit in bytes, compared against bytes written
    maxFiles    int   // highest backup index shifted during rotation
    currentFile *os.File
    currentSize int64
    mu          sync.Mutex
}

// NewLogRotator returns a LogRotator for filename. No file is opened until
// the first Write.
func NewLogRotator(filename string, maxSize int64, maxFiles int) *LogRotator {
    return &LogRotator{
        filename: filename,
        maxSize:  maxSize,
        maxFiles: maxFiles,
    }
}

// Write appends p to the current log file, rotating first when no file is
// open or when p would push the file past maxSize. It returns the number of
// bytes written and any rotation or write error.
func (lr *LogRotator) Write(p []byte) (n int, err error) {
    lr.mu.Lock()
    defer lr.mu.Unlock()

    if lr.currentFile == nil || lr.currentSize+int64(len(p)) > lr.maxSize {
        if err := lr.rotate(); err != nil {
            return 0, err
        }
    }

    n, err = lr.currentFile.Write(p)
    lr.currentSize += int64(n)
    return n, err
}

// rotate closes the current file, shifts the numbered backups up by one,
// moves the live file to filename.1 and opens a fresh live file.
//
// Renames are best effort: missing backups are the normal case, so rename
// failures are ignored. The caller must hold lr.mu.
func (lr *LogRotator) rotate() error {
    if lr.currentFile != nil {
        // Best effort: the file is being retired either way.
        _ = lr.currentFile.Close()
        // Forget the closed handle immediately. If opening the new file
        // fails below, the next Write must retry the rotation instead of
        // writing to a closed file.
        lr.currentFile = nil
    }

    // Shift filename.(i) -> filename.(i+1), oldest first.
    for i := lr.maxFiles - 1; i > 0; i-- {
        oldName := fmt.Sprintf("%s.%d", lr.filename, i)
        newName := fmt.Sprintf("%s.%d", lr.filename, i+1)
        _ = os.Rename(oldName, newName)
    }

    if _, err := os.Stat(lr.filename); err == nil {
        _ = os.Rename(lr.filename, lr.filename+".1")
    }

    file, err := os.OpenFile(lr.filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
    if err != nil {
        return err
    }

    // Normally the file is new and empty. If the rename above failed we are
    // appending to existing content, so start from the real size to keep the
    // limit check accurate.
    var size int64
    if info, statErr := file.Stat(); statErr == nil {
        size = info.Size()
    }

    lr.currentFile = file
    lr.currentSize = size

    return nil
}

// Helper functions.
//
// getStatusCode returns the textual gRPC status code for err: "OK" for a nil
// error, the code's String() when status.FromError recognises the error, and
// "UNKNOWN" otherwise.
//
// NOTE(review): this file's import list does not include
// google.golang.org/grpc/status, which this function needs — confirm the
// import is added.
func getStatusCode(err error) string {
    if err == nil {
        return "OK"
    }

    s, isStatus := status.FromError(err)
    if !isStatus {
        return "UNKNOWN"
    }
    return s.Code().String()
}
"""
    
    def create_tracing_system(self) -> str:
        """创建分布式追踪系统"""
        return """
// tracing.go - 分布式追踪系统实现
package observability

import (
    "context"
    "fmt"
    "log"
    "time"
    
    "github.com/opentracing/opentracing-go"
    "github.com/opentracing/opentracing-go/ext"
    "github.com/uber/jaeger-client-go"
    "github.com/uber/jaeger-client-go/config"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
)

// TracingManager owns the tracer used by the tracing interceptors and the
// function that shuts it down.
type TracingManager struct {
    tracer      opentracing.Tracer
    serviceName string
    // closer is invoked by Close; it may be nil.
    closer      func() error
}

// NewTracingManager creates a Jaeger tracer for serviceName, installs it as
// the global OpenTracing tracer and returns a TracingManager wrapping it.
//
// The tracer uses a constant sampler that samples every trace, logs reported
// spans and flushes its buffer every second. jaegerEndpoint is passed as the
// reporter's LocalAgentHostPort.
//
// NOTE(review): LocalAgentHostPort presumably expects an agent "host:port"
// rather than a collector URL — confirm what callers pass here.
//
// It returns an error if the tracer cannot be constructed.
func NewTracingManager(serviceName, jaegerEndpoint string) (*TracingManager, error) {
    cfg := config.Configuration{
        ServiceName: serviceName,
        Sampler: &config.SamplerConfig{
            Type:  jaeger.SamplerTypeConst,
            Param: 1,
        },
        Reporter: &config.ReporterConfig{
            LogSpans:            true,
            BufferFlushInterval: 1 * time.Second,
            LocalAgentHostPort:  jaegerEndpoint,
        },
    }

    tracer, closer, err := cfg.NewTracer()
    if err != nil {
        return nil, fmt.Errorf("failed to create tracer: %w", err)
    }

    opentracing.SetGlobalTracer(tracer)

    return &TracingManager{
        tracer:      tracer,
        serviceName: serviceName,
        // NewTracer returns an io.Closer; the struct field is a
        // func() error, so store the Close method value.
        closer: closer.Close,
    }, nil
}

// Close shuts the tracer down by invoking the stored closer and returns its
// error. It is a no-op returning nil when no closer is set.
func (tm *TracingManager) Close() error {
    if tm.closer == nil {
        return nil
    }
    return tm.closer()
}

// TracingInterceptor returns a unary server interceptor that runs each call
// inside a server span named after the full method.
//
// The parent span context is extracted from the incoming metadata; when
// extraction fails the failure is logged and the span is started without a
// parent. The span is placed in the context handed to the handler, tagged
// with the method, service and resulting status code, and marked as an error
// (with an "error" log event) when the handler fails.
func (tm *TracingManager) TracingInterceptor() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        // Extract the caller's span context from the metadata.
        spanCtx, err := tm.extractSpanContext(ctx)
        if err != nil {
            log.Printf("Failed to extract span context: %v", err)
        }

        // Start the server span; a nil spanCtx yields a root span.
        span := tm.tracer.StartSpan(
            info.FullMethod,
            ext.RPCServerOption(spanCtx),
        )
        defer span.Finish()

        ext.Component.Set(span, "grpc-server")
        ext.SpanKind.Set(span, ext.SpanKindRPCServerEnum)
        span.SetTag("grpc.method", info.FullMethod)
        span.SetTag("grpc.service", tm.serviceName)

        if md, ok := metadata.FromIncomingContext(ctx); ok {
            if userAgent := md.Get("user-agent"); len(userAgent) > 0 {
                span.SetTag("http.user_agent", userAgent[0])
            }
        }

        // Make the span visible to the handler and anything it calls.
        ctx = opentracing.ContextWithSpan(ctx, span)

        resp, err := handler(ctx, req)

        if err != nil {
            ext.Error.Set(span, true)
            // LogKV is used because the "log" package imported by this file
            // is the standard library logger, which has no field
            // constructors such as String.
            span.LogKV(
                "event", "error",
                "message", err.Error(),
            )
            span.SetTag("grpc.status_code", getGRPCStatusCode(err))
        } else {
            span.SetTag("grpc.status_code", "OK")
        }

        return resp, err
    }
}

// StreamTracingInterceptor returns a stream server interceptor that runs each
// stream inside a server span named after the full method.
//
// The parent span context is extracted from the stream's incoming metadata;
// when extraction fails the failure is logged and the span is started
// without a parent. The handler receives a tracingServerStream whose context
// carries the span. The span is tagged with the method, service, a
// "grpc.stream" marker and the resulting status code, and is marked as an
// error (with an "error" log event) when the handler fails.
func (tm *TracingManager) StreamTracingInterceptor() grpc.StreamServerInterceptor {
    return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        // Extract the caller's span context from the metadata.
        spanCtx, err := tm.extractSpanContext(ss.Context())
        if err != nil {
            log.Printf("Failed to extract span context: %v", err)
        }

        // Start the server span; a nil spanCtx yields a root span.
        span := tm.tracer.StartSpan(
            info.FullMethod,
            ext.RPCServerOption(spanCtx),
        )
        defer span.Finish()

        ext.Component.Set(span, "grpc-server")
        ext.SpanKind.Set(span, ext.SpanKindRPCServerEnum)
        span.SetTag("grpc.method", info.FullMethod)
        span.SetTag("grpc.service", tm.serviceName)
        span.SetTag("grpc.stream", true)

        // Wrap the stream so the handler sees a context carrying the span.
        wrappedStream := &tracingServerStream{
            ServerStream: ss,
            span:         span,
            ctx:          opentracing.ContextWithSpan(ss.Context(), span),
        }

        err = handler(srv, wrappedStream)

        if err != nil {
            ext.Error.Set(span, true)
            // LogKV is used because the "log" package imported by this file
            // is the standard library logger, which has no field
            // constructors such as String.
            span.LogKV(
                "event", "error",
                "message", err.Error(),
            )
            span.SetTag("grpc.status_code", getGRPCStatusCode(err))
        } else {
            span.SetTag("grpc.status_code", "OK")
        }

        return err
    }
}

// tracingServerStream wraps a grpc.ServerStream so that its context carries
// the stream's span and every message sent or received gets a child span.
type tracingServerStream struct {
    grpc.ServerStream
    span opentracing.Span
    ctx  context.Context
}

// Context returns the span-carrying context instead of the wrapped stream's
// own context.
func (tss *tracingServerStream) Context() context.Context {
    return tss.ctx
}

// SendMsg forwards m to the wrapped stream inside a "grpc.send_message"
// child span started on the global tracer. A send failure marks the child
// span as an error and logs a "send_error" event; the error is returned
// unchanged.
func (tss *tracingServerStream) SendMsg(m interface{}) error {
    childSpan := opentracing.StartSpan(
        "grpc.send_message",
        opentracing.ChildOf(tss.span.Context()),
    )
    defer childSpan.Finish()

    err := tss.ServerStream.SendMsg(m)
    if err != nil {
        ext.Error.Set(childSpan, true)
        // LogKV is used because the "log" package imported by this file is
        // the standard library logger, which has no String field helper.
        childSpan.LogKV(
            "event", "send_error",
            "message", err.Error(),
        )
    }

    return err
}

// RecvMsg reads into m from the wrapped stream inside a "grpc.recv_message"
// child span started on the global tracer. Any non-nil error marks the child
// span as an error and logs a "recv_error" event; the error is returned
// unchanged.
//
// NOTE(review): any non-nil error is flagged, which presumably includes the
// io.EOF that signals a normal end of the client stream — confirm whether
// that should be excluded.
func (tss *tracingServerStream) RecvMsg(m interface{}) error {
    childSpan := opentracing.StartSpan(
        "grpc.recv_message",
        opentracing.ChildOf(tss.span.Context()),
    )
    defer childSpan.Finish()

    err := tss.ServerStream.RecvMsg(m)
    if err != nil {
        ext.Error.Set(childSpan, true)
        // See SendMsg for why LogKV is used here.
        childSpan.LogKV(
            "event", "recv_error",
            "message", err.Error(),
        )
    }

    return err
}

// extractSpanContext reads the incoming gRPC metadata from ctx and asks the
// manager's tracer to extract a span context from it.
//
// It returns an error when ctx carries no incoming metadata or when the
// tracer's Extract call fails; in both cases the returned span context is nil.
func (tm *TracingManager) extractSpanContext(ctx context.Context) (opentracing.SpanContext, error) {
    if incoming, found := metadata.FromIncomingContext(ctx); found {
        parent, extractErr := tm.tracer.Extract(opentracing.HTTPHeaders, MetadataCarrier(incoming))
        if extractErr != nil {
            return nil, extractErr
        }
        return parent, nil
    }

    return nil, fmt.Errorf("no metadata found")
}

// MetadataCarrier is gRPC metadata.MD under a distinct type, so that carrier
// methods can be attached to it and it can be handed to the tracer's
// Inject and Extract calls.
type MetadataCarrier metadata.MD

// Set stores val as the only value for key, replacing any existing values.
//
// The write goes through metadata.MD.Set, which lower-cases the key. gRPC
// metadata keys are expected to be lower-case, and a tracer may inject
// mixed-case header names (for example "X-B3-TraceId"); writing those
// straight into the map would bypass that normalisation.
func (mc MetadataCarrier) Set(key, val string) {
    metadata.MD(mc).Set(key, val)
}

// ForeachKey calls handler once for every (key, value) pair in the carrier;
// a key with several values produces several calls. Iteration stops at the
// first non-nil error from handler, and that error is returned.
func (mc MetadataCarrier) ForeachKey(handler func(key, val string) error) error {
    md := metadata.MD(mc)
    for name := range md {
        entries := md[name]
        for i := 0; i < len(entries); i++ {
            if visitErr := handler(name, entries[i]); visitErr != nil {
                return visitErr
            }
        }
    }
    return nil
}

// ClientTracingInterceptor returns a unary client interceptor that wraps each
// outgoing call in a client span.
//
// For every call it:
//   - starts a span named after the method, as a child of the span found in
//     ctx when there is one, or as a root span otherwise;
//   - injects the span context into the outgoing gRPC metadata;
//   - invokes the call with a context carrying both the metadata and the span;
//   - tags the span with the resulting status code, marking it as an error
//     when the call fails.
//
// The error returned by the invoker is passed through unchanged.
//
// NOTE(review): this function uses both log.Printf and log.String, which come
// from different packages (standard log vs. opentracing-go/log) — confirm the
// file's import aliases.
func (tm *TracingManager) ClientTracingInterceptor() grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        // SpanFromContext returns nil when ctx carries no span; calling
        // Context() on that nil value would panic, so only add the parent
        // reference when a parent actually exists.
        var spanOpts []opentracing.StartSpanOption
        if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
            spanOpts = append(spanOpts, opentracing.ChildOf(parentSpan.Context()))
        }

        span := tm.tracer.StartSpan(method, spanOpts...)
        defer span.Finish()

        // Standard span tags.
        ext.Component.Set(span, "grpc-client")
        ext.SpanKind.Set(span, ext.SpanKindRPCClientEnum)
        span.SetTag("grpc.method", method)
        span.SetTag("grpc.service", tm.serviceName)

        // Work on a private copy of the outgoing metadata so the injection
        // below never mutates a map shared with the caller's context.
        md, ok := metadata.FromOutgoingContext(ctx)
        if ok {
            md = md.Copy()
        } else {
            md = metadata.New(nil)
        }

        // Injection is best-effort: a failure is logged and the call proceeds
        // without propagated trace headers.
        carrier := MetadataCarrier(md)
        if err := tm.tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier); err != nil {
            log.Printf("Failed to inject span context: %v", err)
        }

        ctx = metadata.NewOutgoingContext(ctx, metadata.MD(carrier))
        ctx = opentracing.ContextWithSpan(ctx, span)

        // Perform the call.
        err := invoker(ctx, method, req, reply, cc, opts...)

        // Record the outcome on the span.
        if err != nil {
            ext.Error.Set(span, true)
            span.LogFields(
                log.String("event", "error"),
                log.String("message", err.Error()),
            )
            span.SetTag("grpc.status_code", getGRPCStatusCode(err))
        } else {
            span.SetTag("grpc.status_code", "OK")
        }

        return err
    }
}

// BusinessSpan is a thin wrapper around an opentracing.Span for tracing
// application-level operations.
type BusinessSpan struct {
    span opentracing.Span // underlying span all methods delegate to
}

// NewBusinessSpan starts a span named operationName on the manager's tracer
// and returns it wrapped in a BusinessSpan.
//
// When ctx carries a span, the new span is created as its child; otherwise a
// root span is started. The caller is responsible for calling Finish on the
// result.
func (tm *TracingManager) NewBusinessSpan(ctx context.Context, operationName string) *BusinessSpan {
    // SpanFromContext returns nil when ctx carries no span; calling Context()
    // on that nil value would panic, so only add the parent reference when a
    // parent actually exists.
    var spanOpts []opentracing.StartSpanOption
    if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
        spanOpts = append(spanOpts, opentracing.ChildOf(parentSpan.Context()))
    }

    span := tm.tracer.StartSpan(operationName, spanOpts...)

    return &BusinessSpan{span: span}
}

// SetTag sets the tag key to value on the underlying span.
func (bs *BusinessSpan) SetTag(key string, value interface{}) {
    bs.span.SetTag(key, value)
}

// LogFields forwards the given log fields to the underlying span.
func (bs *BusinessSpan) LogFields(fields ...log.Field) {
    bs.span.LogFields(fields...)
}

// Finish finishes the underlying span.
func (bs *BusinessSpan) Finish() {
    bs.span.Finish()
}

// SetError marks the underlying span as failed and logs the error message on
// it. A nil err is ignored.
func (bs *BusinessSpan) SetError(err error) {
    if err == nil {
        return
    }

    ext.Error.Set(bs.span, true)

    eventField := log.String("event", "error")
    messageField := log.String("message", err.Error())
    bs.span.LogFields(eventField, messageField)
}

// Helper functions.

// getGRPCStatusCode returns the gRPC status code of err as a string:
// "OK" for a nil error, the code's String() form when status.FromError
// recognises err, and "UNKNOWN" otherwise.
//
// NOTE(review): the fallback literal "UNKNOWN" differs in casing from
// codes.Unknown.String() — confirm whether consumers of the tag rely on it.
func getGRPCStatusCode(err error) string {
    switch st, recognised := status.FromError(err); {
    case err == nil:
        return "OK"
    case recognised:
        return st.Code().String()
    default:
        return "UNKNOWN"
    }
}
"""

# Build the observability manager that generates each subsystem below.
observability_mgr = ObservabilityManager()

# Generate the metrics system, then print its feature checklist.
metrics_system = observability_mgr.create_metrics_system()
print(
    "=== 指标监控 ===",
    "✓ Prometheus集成",
    "✓ 自定义指标",
    "✓ 告警管理",
    "✓ 指标拦截器",
    "✓ 业务指标",
    sep="\n",
)

# Generate the logging system, then print its feature checklist.
logging_system = observability_mgr.create_logging_system()
print(
    "\n=== 日志系统 ===",
    "✓ 结构化日志",
    "✓ 日志拦截器",
    "✓ 日志轮转",
    "✓ 上下文传递",
    "✓ 性能分析",
    sep="\n",
)

# Generate the tracing system, then print its feature checklist.
tracing_system = observability_mgr.create_tracing_system()
print(
    "\n=== 分布式追踪 ===",
    "✓ Jaeger集成",
    "✓ 追踪拦截器",
    "✓ 上下文传播",
    "✓ 业务追踪",
    "✓ 性能分析",
    sep="\n",
)

调试技巧

1. gRPC 调试工具

# grpcurl - command-line debugging tool
grpcurl -plaintext localhost:8080 list
grpcurl -plaintext localhost:8080 describe UserService
grpcurl -plaintext -d '{"id": 1}' localhost:8080 UserService/GetUser

# grpc_cli - official debugging tool
grpc_cli ls localhost:8080
grpc_cli call localhost:8080 UserService.GetUser "id: 1"

2. 性能分析

// pprof性能分析
import _ "net/http/pprof"

// main starts an HTTP listener on localhost:6060 in a background goroutine
// and then starts the gRPC server.
func main() {
    // Serve HTTP on localhost:6060 with a nil handler; whatever
    // ListenAndServe returns is logged.
    go func() {
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()
    
    // Start the gRPC server.
    startGRPCServer()
}

3. 故障排查

// Check implements the health-check RPC. It reports SERVING when
// s.isHealthy() returns true and NOT_SERVING otherwise; the returned error is
// always nil.
func (s *server) Check(ctx context.Context, req *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
    // Assume unhealthy, and upgrade only when the service reports healthy.
    state := healthpb.HealthCheckResponse_NOT_SERVING
    if s.isHealthy() {
        state = healthpb.HealthCheckResponse_SERVING
    }

    return &healthpb.HealthCheckResponse{Status: state}, nil
}

总结

本章深入探讨了 gRPC 应用的监控、日志和调试技术,主要内容包括:

核心要点

  1. 指标监控

    • Prometheus集成
    • 自定义指标收集
    • 告警规则配置
    • 性能指标分析
  2. 日志系统

    • 结构化日志记录
    • 上下文信息传递
    • 日志轮转管理
    • 调试信息输出
  3. 分布式追踪

    • Jaeger集成
    • 请求链路追踪
    • 性能瓶颈分析
    • 错误传播追踪
  4. 调试工具

    • 命令行工具
    • 性能分析工具
    • 健康检查机制
    • 故障排查方法

最佳实践

  1. 可观测性设计

    • 三大支柱协同
    • 统一的标识符
    • 合理的采样策略
    • 高效的数据收集
  2. 性能优化

    • 异步日志写入
    • 采样率控制
    • 批量数据传输
    • 资源使用监控
  3. 故障处理

    • 快速故障定位
    • 详细的错误信息
    • 自动化告警
    • 优雅的降级策略
  4. 运维友好

    • 标准化的指标
    • 结构化的日志
    • 可视化的监控
    • 自动化的运维

下一步学习

  • 深入学习APM系统
  • 掌握云原生监控
  • 了解AIOps实践
  • 实践SRE方法论

通过本章学习,你已经掌握了构建完整可观测性体系的核心技能,能够有效监控、调试和优化 gRPC 应用。