概述

可观测性(Observability)是微服务架构中的关键能力,它通过三大支柱——指标(Metrics)、日志(Logs)和追踪(Traces)来帮助我们理解系统的运行状态。本章将深入探讨如何构建完整的微服务监控与可观测性体系。

可观测性三大支柱

可观测性架构概览

# 可观测性架构配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: observability-config
  namespace: microservices
data:
  architecture.yaml: |
    observability:
      metrics:
        collection:
          - prometheus
          - grafana
        storage: prometheus
        visualization: grafana
        alerting: alertmanager
      
      logging:
        collection:
          - fluentd
          - filebeat
        aggregation: elasticsearch
        visualization: kibana
        storage: elasticsearch
      
      tracing:
        collection: jaeger-agent
        processing: jaeger-collector
        storage: jaeger-storage
        visualization: jaeger-ui
      
      integration:
        service_mesh: istio
        api_gateway: kong
        load_balancer: nginx

指标监控(Metrics)

Prometheus配置

# Prometheus部署配置
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prometheus
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: prometheus
  template:
    metadata:
      labels:
        app: prometheus
    spec:
      containers:
      - name: prometheus
        image: prom/prometheus:v2.40.0
        ports:
        - containerPort: 9090
        volumeMounts:
        - name: config
          mountPath: /etc/prometheus
        - name: storage
          mountPath: /prometheus
        args:
        - '--config.file=/etc/prometheus/prometheus.yml'
        - '--storage.tsdb.path=/prometheus'
        - '--web.console.libraries=/etc/prometheus/console_libraries'
        - '--web.console.templates=/etc/prometheus/consoles'
        - '--storage.tsdb.retention.time=15d'
        - '--web.enable-lifecycle'
      volumes:
      - name: config
        configMap:
          name: prometheus-config
      - name: storage
        persistentVolumeClaim:
          claimName: prometheus-storage
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  namespace: monitoring
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
    
    rule_files:
    - "rules/*.yml"
    
    alerting:
      alertmanagers:
      - static_configs:
        - targets:
          - alertmanager:9093
    
    scrape_configs:
    - job_name: 'kubernetes-apiservers'
      kubernetes_sd_configs:
      - role: endpoints
      scheme: https
      tls_config:
        ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
      bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
      relabel_configs:
      - source_labels: [__meta_kubernetes_namespace, __meta_kubernetes_service_name, __meta_kubernetes_endpoint_port_name]
        action: keep
        regex: default;kubernetes;https
    
    - job_name: 'kubernetes-nodes'
      kubernetes_sd_configs:
      - role: node
      scheme: https
      tls_config:
        ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
      bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
      relabel_configs:
      - action: labelmap
        regex: __meta_kubernetes_node_label_(.+)
      - target_label: __address__
        replacement: kubernetes.default.svc:443
      - source_labels: [__meta_kubernetes_node_name]
        regex: (.+)
        target_label: __metrics_path__
        replacement: /api/v1/nodes/${1}/proxy/metrics
    
    - job_name: 'microservices'
      kubernetes_sd_configs:
      - role: endpoints
        namespaces:
          names:
          - microservices
      relabel_configs:
      - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scrape]
        action: keep
        regex: true
      - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)
      - source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port]
        action: replace
        regex: ([^:]+)(?::\d+)?;(\d+)
        replacement: $1:$2
        target_label: __address__
      - action: labelmap
        regex: __meta_kubernetes_service_label_(.+)
      - source_labels: [__meta_kubernetes_namespace]
        action: replace
        target_label: kubernetes_namespace
      - source_labels: [__meta_kubernetes_service_name]
        action: replace
        target_label: kubernetes_name
---
apiVersion: v1
kind: Service
metadata:
  name: prometheus
  namespace: monitoring
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9090"
spec:
  selector:
    app: prometheus
  ports:
  - port: 9090
    targetPort: 9090
    name: web

Go语言指标收集实现

// 指标收集实现
package metrics

import (
    "net/http"
    "strconv"
    "time"
    
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// MetricsCollector bundles every Prometheus metric this service exports.
// All metrics are created and registered once in NewMetricsCollector; the
// Record*/Update* methods below are the only write paths.
type MetricsCollector struct {
    // HTTP metrics: request counter, latency histogram, in-flight gauge.
    httpRequestsTotal    *prometheus.CounterVec
    httpRequestDuration *prometheus.HistogramVec
    httpRequestsInFlight prometheus.Gauge
    
    // Business metrics: per-operation counters and latency histograms.
    businessOperationsTotal *prometheus.CounterVec
    businessOperationDuration *prometheus.HistogramVec
    
    // System metrics: process-level CPU/memory/disk gauges; values are
    // pushed in by the caller via UpdateSystemMetrics.
    systemCPUUsage    prometheus.Gauge
    systemMemoryUsage prometheus.Gauge
    systemDiskUsage   prometheus.Gauge
    
    // Database metrics: connection-pool gauges plus query counters/latency.
    dbConnectionsActive prometheus.Gauge
    dbConnectionsIdle   prometheus.Gauge
    dbQueriesTotal      *prometheus.CounterVec
    dbQueryDuration     *prometheus.HistogramVec
    
    // Cache metrics: hit/miss counters and a current-size gauge.
    cacheHitsTotal   *prometheus.CounterVec
    cacheMissesTotal *prometheus.CounterVec
    cacheSize        prometheus.Gauge
}

// NewMetricsCollector creates the full metric set and registers it with the
// default Prometheus registry, returning the ready-to-use collector.
//
// NOTE(review): serviceName is currently unused — every metric family takes
// "service" as a per-sample label supplied by the caller instead. Either
// wire it in (e.g. as a ConstLabel) or drop the parameter.
// NOTE(review): prometheus.MustRegister panics on duplicate registration,
// so this constructor must run at most once per process.
func NewMetricsCollector(serviceName string) *MetricsCollector {
    collector := &MetricsCollector{
        // HTTP metrics.
        httpRequestsTotal: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "http_requests_total",
                Help: "Total number of HTTP requests",
            },
            []string{"service", "method", "endpoint", "status_code"},
        ),
        httpRequestDuration: prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "http_request_duration_seconds",
                Help:    "HTTP request duration in seconds",
                Buckets: prometheus.DefBuckets,
            },
            []string{"service", "method", "endpoint"},
        ),
        httpRequestsInFlight: prometheus.NewGauge(
            prometheus.GaugeOpts{
                Name: "http_requests_in_flight",
                Help: "Current number of HTTP requests being processed",
            },
        ),
        
        // Business metrics.
        businessOperationsTotal: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "business_operations_total",
                Help: "Total number of business operations",
            },
            []string{"service", "operation", "status"},
        ),
        businessOperationDuration: prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "business_operation_duration_seconds",
                Help:    "Business operation duration in seconds",
                Buckets: []float64{0.001, 0.01, 0.1, 0.5, 1, 2.5, 5, 10},
            },
            []string{"service", "operation"},
        ),
        
        // System metrics.
        systemCPUUsage: prometheus.NewGauge(
            prometheus.GaugeOpts{
                Name: "system_cpu_usage_percent",
                Help: "Current CPU usage percentage",
            },
        ),
        systemMemoryUsage: prometheus.NewGauge(
            prometheus.GaugeOpts{
                Name: "system_memory_usage_bytes",
                Help: "Current memory usage in bytes",
            },
        ),
        systemDiskUsage: prometheus.NewGauge(
            prometheus.GaugeOpts{
                Name: "system_disk_usage_percent",
                Help: "Current disk usage percentage",
            },
        ),
        
        // Database metrics.
        dbConnectionsActive: prometheus.NewGauge(
            prometheus.GaugeOpts{
                Name: "db_connections_active",
                Help: "Number of active database connections",
            },
        ),
        dbConnectionsIdle: prometheus.NewGauge(
            prometheus.GaugeOpts{
                Name: "db_connections_idle",
                Help: "Number of idle database connections",
            },
        ),
        dbQueriesTotal: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "db_queries_total",
                Help: "Total number of database queries",
            },
            []string{"service", "query_type", "status"},
        ),
        dbQueryDuration: prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "db_query_duration_seconds",
                Help:    "Database query duration in seconds",
                Buckets: []float64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 2},
            },
            []string{"service", "query_type"},
        ),
        
        // Cache metrics.
        cacheHitsTotal: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "cache_hits_total",
                Help: "Total number of cache hits",
            },
            []string{"service", "cache_name"},
        ),
        cacheMissesTotal: prometheus.NewCounterVec(
            prometheus.CounterOpts{
                Name: "cache_misses_total",
                Help: "Total number of cache misses",
            },
            []string{"service", "cache_name"},
        ),
        cacheSize: prometheus.NewGauge(
            prometheus.GaugeOpts{
                Name: "cache_size_bytes",
                Help: "Current cache size in bytes",
            },
        ),
    }
    
    // Register every metric with the default registry (panics on duplicates).
    prometheus.MustRegister(
        collector.httpRequestsTotal,
        collector.httpRequestDuration,
        collector.httpRequestsInFlight,
        collector.businessOperationsTotal,
        collector.businessOperationDuration,
        collector.systemCPUUsage,
        collector.systemMemoryUsage,
        collector.systemDiskUsage,
        collector.dbConnectionsActive,
        collector.dbConnectionsIdle,
        collector.dbQueriesTotal,
        collector.dbQueryDuration,
        collector.cacheHitsTotal,
        collector.cacheMissesTotal,
        collector.cacheSize,
    )
    
    return collector
}

// RecordHTTPRequest records one completed HTTP request: it bumps the request
// counter (labelled with the textual status code) and feeds the elapsed time
// into the latency histogram.
func (collector *MetricsCollector) RecordHTTPRequest(service, method, endpoint string, statusCode int, duration time.Duration) {
    status := strconv.Itoa(statusCode)
    collector.httpRequestsTotal.WithLabelValues(service, method, endpoint, status).Inc()
    collector.httpRequestDuration.WithLabelValues(service, method, endpoint).Observe(duration.Seconds())
}

// IncHTTPRequestsInFlight increments the gauge of requests currently being
// processed; pair every call with DecHTTPRequestsInFlight.
func (collector *MetricsCollector) IncHTTPRequestsInFlight() {
    collector.httpRequestsInFlight.Inc()
}

// DecHTTPRequestsInFlight decrements the in-flight request gauge; the
// counterpart of IncHTTPRequestsInFlight.
func (collector *MetricsCollector) DecHTTPRequestsInFlight() {
    collector.httpRequestsInFlight.Dec()
}

// RecordBusinessOperation counts one business operation outcome and records
// its latency in the operation histogram.
func (collector *MetricsCollector) RecordBusinessOperation(service, operation, status string, duration time.Duration) {
    seconds := duration.Seconds()
    collector.businessOperationsTotal.WithLabelValues(service, operation, status).Inc()
    collector.businessOperationDuration.WithLabelValues(service, operation).Observe(seconds)
}

// UpdateSystemMetrics overwrites the process resource gauges. Per the metric
// Help strings declared in NewMetricsCollector, cpuUsage and diskUsage are
// percentages and memoryUsage is in bytes.
func (collector *MetricsCollector) UpdateSystemMetrics(cpuUsage, memoryUsage float64, diskUsage float64) {
    collector.systemCPUUsage.Set(cpuUsage)
    collector.systemMemoryUsage.Set(memoryUsage)
    collector.systemDiskUsage.Set(diskUsage)
}

// UpdateDatabaseMetrics overwrites the connection-pool gauges with the
// current active/idle connection counts.
func (collector *MetricsCollector) UpdateDatabaseMetrics(activeConns, idleConns int) {
    collector.dbConnectionsActive.Set(float64(activeConns))
    collector.dbConnectionsIdle.Set(float64(idleConns))
}

// RecordDatabaseQuery counts one database query outcome and records its
// latency in the query histogram.
func (collector *MetricsCollector) RecordDatabaseQuery(service, queryType, status string, duration time.Duration) {
    seconds := duration.Seconds()
    collector.dbQueriesTotal.WithLabelValues(service, queryType, status).Inc()
    collector.dbQueryDuration.WithLabelValues(service, queryType).Observe(seconds)
}

// RecordCacheHit increments the hit counter for the named cache.
func (collector *MetricsCollector) RecordCacheHit(service, cacheName string) {
    collector.cacheHitsTotal.WithLabelValues(service, cacheName).Inc()
}

// RecordCacheMiss increments the miss counter for the named cache.
func (collector *MetricsCollector) RecordCacheMiss(service, cacheName string) {
    collector.cacheMissesTotal.WithLabelValues(service, cacheName).Inc()
}

// UpdateCacheSize sets the cache-size gauge; size is in bytes per the
// metric's Help string.
func (collector *MetricsCollector) UpdateCacheSize(size float64) {
    collector.cacheSize.Set(size)
}

// MetricsMiddleware instruments an http.Handler chain with the HTTP metrics
// owned by a MetricsCollector.
type MetricsMiddleware struct {
    collector   *MetricsCollector
    serviceName string // value for the "service" label on every sample
}

// NewMetricsMiddleware returns a middleware that records metrics into
// collector, labelling each sample with serviceName.
func NewMetricsMiddleware(collector *MetricsCollector, serviceName string) *MetricsMiddleware {
    return &MetricsMiddleware{
        collector:   collector,
        serviceName: serviceName,
    }
}

// Handler wraps next so every request is counted, timed, and tracked in the
// in-flight gauge. The response status code is captured by substituting a
// recording ResponseWriter (requests that never call WriteHeader count as 200).
func (middleware *MetricsMiddleware) Handler(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        started := time.Now()

        // Track concurrency for the in-flight gauge.
        middleware.collector.IncHTTPRequestsInFlight()
        defer middleware.collector.DecHTTPRequestsInFlight()

        // Substitute a writer that remembers the status code.
        rec := &responseRecorder{ResponseWriter: w, statusCode: http.StatusOK}

        next.ServeHTTP(rec, r)

        // Record count and latency once the handler returns.
        middleware.collector.RecordHTTPRequest(
            middleware.serviceName,
            r.Method,
            r.URL.Path,
            rec.statusCode,
            time.Since(started),
        )
    })
}

// responseRecorder wraps http.ResponseWriter to capture the status code
// written by the downstream handler (defaults to 200 when WriteHeader is
// never called).
type responseRecorder struct {
    http.ResponseWriter
    statusCode int
}

// WriteHeader records the status code before delegating to the wrapped writer.
func (recorder *responseRecorder) WriteHeader(statusCode int) {
    recorder.statusCode = statusCode
    recorder.ResponseWriter.WriteHeader(statusCode)
}

// StartMetricsServer exposes the default Prometheus registry on /metrics and
// blocks serving HTTP on the given port. The http.ListenAndServe error —
// previously discarded, hiding bind failures entirely — is now returned so
// callers can detect and log them. Existing statement-style calls
// (`go StartMetricsServer(...)`) remain valid.
func StartMetricsServer(port string) error {
    http.Handle("/metrics", promhttp.Handler())
    return http.ListenAndServe(":"+port, nil)
}

// ExampleMetrics demonstrates wiring the collector and middleware into an
// HTTP service.
func ExampleMetrics() {
    // Create the metric collector (registers with the default registry).
    collector := NewMetricsCollector("user-service")
    
    // Create the HTTP metrics middleware.
    metricsMiddleware := NewMetricsMiddleware(collector, "user-service")
    
    // Build the HTTP routes.
    mux := http.NewServeMux()
    mux.HandleFunc("/users", func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        
        // Simulate a business operation.
        time.Sleep(100 * time.Millisecond)
        
        // Record the business metric.
        collector.RecordBusinessOperation(
            "user-service",
            "get_users",
            "success",
            time.Since(start),
        )
        
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("Users data"))
    })
    
    // Apply the metrics middleware.
    handler := metricsMiddleware.Handler(mux)
    // BUG FIX: the server start below is commented out, leaving `handler`
    // unused — an unused local variable is a compile error in Go.
    _ = handler
    
    // Serve /metrics on its own port in a separate goroutine.
    // NOTE(review): 9090 collides with the Prometheus server deployed earlier
    // in this chapter — pick a free port in a real deployment.
    go StartMetricsServer("9090")
    
    // Start the main server.
    // http.ListenAndServe(":8080", handler)
}

日志聚合(Logging)

ELK Stack配置

# Elasticsearch部署
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: elasticsearch
  namespace: logging
spec:
  serviceName: elasticsearch
  replicas: 3
  selector:
    matchLabels:
      app: elasticsearch
  template:
    metadata:
      labels:
        app: elasticsearch
    spec:
      containers:
      - name: elasticsearch
        image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
        ports:
        - containerPort: 9200
          name: http
        - containerPort: 9300
          name: transport
        env:
        - name: cluster.name
          value: "microservices-logs"
        - name: node.name
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: discovery.seed_hosts
          value: "elasticsearch-0.elasticsearch,elasticsearch-1.elasticsearch,elasticsearch-2.elasticsearch"
        - name: cluster.initial_master_nodes
          value: "elasticsearch-0,elasticsearch-1,elasticsearch-2"
        - name: ES_JAVA_OPTS
          value: "-Xms1g -Xmx1g"
        - name: xpack.security.enabled
          value: "false"
        volumeMounts:
        - name: data
          mountPath: /usr/share/elasticsearch/data
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi
---
apiVersion: v1
kind: Service
metadata:
  name: elasticsearch
  namespace: logging
spec:
  clusterIP: None
  selector:
    app: elasticsearch
  ports:
  - port: 9200
    name: http
  - port: 9300
    name: transport
---
# Kibana部署
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kibana
  namespace: logging
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kibana
  template:
    metadata:
      labels:
        app: kibana
    spec:
      containers:
      - name: kibana
        image: docker.elastic.co/kibana/kibana:8.5.0
        ports:
        - containerPort: 5601
        env:
        - name: ELASTICSEARCH_HOSTS
          value: "http://elasticsearch:9200"
        - name: SERVER_NAME
          value: "kibana"
        - name: SERVER_HOST
          value: "0.0.0.0"
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: kibana
  namespace: logging
spec:
  selector:
    app: kibana
  ports:
  - port: 5601
    targetPort: 5601
  type: LoadBalancer
---
# Fluentd配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
  namespace: logging
data:
  fluent.conf: |
    <source>
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/fluentd-containers.log.pos
      tag kubernetes.*
      read_from_head true
      <parse>
        @type json
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>
    
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>
    
    <filter kubernetes.**>
      @type record_transformer
      <record>
        hostname ${hostname}
        service_name ${record["kubernetes"]["labels"]["app"]}
        namespace ${record["kubernetes"]["namespace_name"]}
        pod_name ${record["kubernetes"]["pod_name"]}
        container_name ${record["kubernetes"]["container_name"]}
      </record>
    </filter>
    
    <match kubernetes.**>
      @type elasticsearch
      host elasticsearch.logging.svc.cluster.local
      port 9200
      index_name microservices-logs
      # NOTE: mapping types were removed in Elasticsearch 8.x — drop this
      # line (or enable suppress_type_name) when targeting ES 8.
      type_name _doc
      include_tag_key true
      tag_key @log_name
      flush_interval 1s
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
    </match>
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: logging
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    metadata:
      labels:
        app: fluentd
    spec:
      serviceAccountName: fluentd
      containers:
      - name: fluentd
        # NOTE: this image variant bundles the elasticsearch7 output plugin,
        # but the cluster above runs Elasticsearch 8.5.0 — use a matching
        # elasticsearch8 image variant.
        image: fluent/fluentd-kubernetes-daemonset:v1.15-debian-elasticsearch7-1
        env:
        - name: FLUENT_ELASTICSEARCH_HOST
          value: "elasticsearch.logging.svc.cluster.local"
        - name: FLUENT_ELASTICSEARCH_PORT
          value: "9200"
        - name: FLUENT_ELASTICSEARCH_SCHEME
          value: "http"
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: config
          mountPath: /fluentd/etc/fluent.conf
          subPath: fluent.conf
        resources:
          requests:
            memory: "200Mi"
            cpu: "100m"
          limits:
            memory: "500Mi"
            cpu: "500m"
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: config
        configMap:
          name: fluentd-config

Go语言结构化日志实现

// 结构化日志实现
package logging

import (
    "context"
    "crypto/rand"
    "encoding/hex"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "os"
    "runtime"
    "sort"
    "strings"
    "time"

    // NOTE(review): logrus is never referenced — an unused import is a
    // compile error in Go; remove it or actually use it.
    "github.com/sirupsen/logrus"
)

// LogLevel is the severity of a log entry, ordered
// debug < info < warn < error < fatal (see shouldLog).
type LogLevel string

// Supported levels, lowest to highest severity.
const (
    DebugLevel LogLevel = "debug"
    InfoLevel  LogLevel = "info"
    WarnLevel  LogLevel = "warn"
    ErrorLevel LogLevel = "error"
    FatalLevel LogLevel = "fatal"
)

// LogEntry is the structured record handed to a Formatter. Correlation IDs
// (trace/span/user/request) and error/caller details are omitted from the
// JSON encoding when empty.
type LogEntry struct {
    Timestamp   time.Time              `json:"timestamp"`
    Level       LogLevel               `json:"level"`
    Service     string                 `json:"service"`
    Version     string                 `json:"version"`
    TraceID     string                 `json:"trace_id,omitempty"`
    SpanID      string                 `json:"span_id,omitempty"`
    UserID      string                 `json:"user_id,omitempty"`
    RequestID   string                 `json:"request_id,omitempty"`
    Message     string                 `json:"message"`
    Fields      map[string]interface{} `json:"fields,omitempty"`
    Error       string                 `json:"error,omitempty"`
    Stack       string                 `json:"stack,omitempty"`
    File        string                 `json:"file,omitempty"`
    Line        int                    `json:"line,omitempty"`
    Function    string                 `json:"function,omitempty"`
}

// Logger writes structured entries for one service at or above a minimum
// level, through a pluggable Formatter, to an io.Writer.
// NOTE(review): output.Write is not synchronized — concurrent use relies on
// the writer (e.g. os.Stdout) tolerating interleaved writes.
type Logger struct {
    service   string    // value of LogEntry.Service on every entry
    version   string    // value of LogEntry.Version on every entry
    output    io.Writer // destination; defaults to os.Stdout in NewLogger
    level     LogLevel  // minimum level that gets written
    formatter Formatter // serialization strategy; defaults to JSONFormatter
}

// Formatter serializes a LogEntry into the bytes written to the output,
// including any trailing record separator (e.g. newline).
type Formatter interface {
    Format(entry *LogEntry) ([]byte, error)
}

// JSONFormatter encodes entries as one single-line JSON object per record.
type JSONFormatter struct{}

// Format marshals the entry and appends a trailing newline.
func (f *JSONFormatter) Format(entry *LogEntry) ([]byte, error) {
    data, err := json.Marshal(entry)
    if err != nil {
        return nil, err
    }
    return append(data, '\n'), nil
}

// TextFormatter renders entries as a single human-readable line:
//
//	[RFC3339 time] LEVEL service: message k=v k=v
type TextFormatter struct{}

// Format implements Formatter. Field keys are sorted so output is
// deterministic — Go map iteration order is randomized, which previously
// made identical entries format differently from run to run.
func (f *TextFormatter) Format(entry *LogEntry) ([]byte, error) {
    keys := make([]string, 0, len(entry.Fields))
    for k := range entry.Fields {
        keys = append(keys, k)
    }
    sort.Strings(keys)
    
    var b strings.Builder
    fmt.Fprintf(&b, "[%s] %s %s: %s",
        entry.Timestamp.Format(time.RFC3339),
        strings.ToUpper(string(entry.Level)),
        entry.Service,
        entry.Message,
    )
    for _, k := range keys {
        fmt.Fprintf(&b, " %s=%v", k, entry.Fields[k])
    }
    b.WriteString("\n")
    
    return []byte(b.String()), nil
}

// NewLogger builds a Logger for the given service and version that writes
// entries at or above level. A nil output defaults to os.Stdout; a nil
// formatter defaults to JSONFormatter.
func NewLogger(service, version string, level LogLevel, output io.Writer, formatter Formatter) *Logger {
    logger := &Logger{
        service: service,
        version: version,
        level:   level,
    }

    logger.output = output
    if logger.output == nil {
        logger.output = os.Stdout
    }

    logger.formatter = formatter
    if logger.formatter == nil {
        logger.formatter = &JSONFormatter{}
    }

    return logger
}

// WithContext returns a ContextLogger that enriches every entry with the
// correlation IDs (trace_id, span_id, user_id, request_id) found in ctx.
func (logger *Logger) WithContext(ctx context.Context) *ContextLogger {
    return &ContextLogger{
        logger: logger,
        ctx:    ctx,
    }
}

// log builds a LogEntry, enriches it with error/stack and caller info,
// formats it, and writes it to the configured output. Every public level
// method — on both Logger and ContextLogger — funnels through here as a
// single wrapper frame.
func (logger *Logger) log(level LogLevel, message string, fields map[string]interface{}, err error) {
    if !logger.shouldLog(level) {
        return
    }
    
    entry := &LogEntry{
        Timestamp: time.Now().UTC(),
        Level:     level,
        Service:   logger.service,
        Version:   logger.version,
        Message:   message,
        Fields:    fields,
    }
    
    if err != nil {
        entry.Error = err.Error()
        // A full stack trace is captured only for severe levels.
        if level == ErrorLevel || level == FatalLevel {
            entry.Stack = getStackTrace()
        }
    }
    
    // Caller info. Skip 2 frames: runtime.Caller -> log -> level method
    // (Debug/Info/...), so the reported file/line is the user's call site.
    // BUG FIX: the previous skip of 3 pointed one frame too far up the
    // stack, attributing log lines to the caller's caller.
    if pc, file, line, ok := runtime.Caller(2); ok {
        entry.File = file
        entry.Line = line
        if fn := runtime.FuncForPC(pc); fn != nil {
            entry.Function = fn.Name()
        }
    }
    
    data, ferr := logger.formatter.Format(entry)
    if ferr != nil {
        fmt.Fprintf(os.Stderr, "Failed to format log entry: %v\n", ferr)
        return
    }
    
    // Surface write failures instead of silently dropping them.
    if _, werr := logger.output.Write(data); werr != nil {
        fmt.Fprintf(os.Stderr, "Failed to write log entry: %v\n", werr)
    }
}

// shouldLog 检查是否应该记录日志
func (logger *Logger) shouldLog(level LogLevel) bool {
    levels := map[LogLevel]int{
        DebugLevel: 0,
        InfoLevel:  1,
        WarnLevel:  2,
        ErrorLevel: 3,
        FatalLevel: 4,
    }
    
    return levels[level] >= levels[logger.level]
}

// Debug logs at debug level; at most the first optional fields map is used.
func (logger *Logger) Debug(message string, fields ...map[string]interface{}) {
    var f map[string]interface{}
    if len(fields) > 0 {
        f = fields[0]
    }
    logger.log(DebugLevel, message, f, nil)
}

// Info logs at info level; at most the first optional fields map is used.
func (logger *Logger) Info(message string, fields ...map[string]interface{}) {
    var f map[string]interface{}
    if len(fields) > 0 {
        f = fields[0]
    }
    logger.log(InfoLevel, message, f, nil)
}

// Warn logs at warn level; at most the first optional fields map is used.
func (logger *Logger) Warn(message string, fields ...map[string]interface{}) {
    var f map[string]interface{}
    if len(fields) > 0 {
        f = fields[0]
    }
    logger.log(WarnLevel, message, f, nil)
}

// Error logs at error level, attaching err's message (and a stack trace)
// to the entry when err is non-nil.
func (logger *Logger) Error(message string, err error, fields ...map[string]interface{}) {
    var f map[string]interface{}
    if len(fields) > 0 {
        f = fields[0]
    }
    logger.log(ErrorLevel, message, f, err)
}

// Fatal logs at fatal level and then terminates the process with exit
// code 1. Note os.Exit bypasses deferred functions.
func (logger *Logger) Fatal(message string, err error, fields ...map[string]interface{}) {
    var f map[string]interface{}
    if len(fields) > 0 {
        f = fields[0]
    }
    logger.log(FatalLevel, message, f, err)
    os.Exit(1)
}

// ContextLogger pairs a Logger with a request context so correlation IDs
// stored in the context are merged into every entry's fields.
type ContextLogger struct {
    logger *Logger
    ctx    context.Context
}

// extractContextFields pulls the well-known correlation IDs out of the
// context, returning only those that are present.
func (cl *ContextLogger) extractContextFields() map[string]interface{} {
    fields := make(map[string]interface{})
    for _, key := range []string{"trace_id", "span_id", "user_id", "request_id"} {
        if value := cl.ctx.Value(key); value != nil {
            fields[key] = value
        }
    }
    return fields
}

// mergeFields overlays the caller-supplied field maps onto the fields
// derived from the context; explicit fields win on key collisions.
func (cl *ContextLogger) mergeFields(fields ...map[string]interface{}) map[string]interface{} {
    merged := cl.extractContextFields()
    for _, extra := range fields {
        for key, value := range extra {
            merged[key] = value
        }
    }
    return merged
}

// Debug logs at debug level with context correlation fields merged in.
func (cl *ContextLogger) Debug(message string, fields ...map[string]interface{}) {
    mergedFields := cl.mergeFields(fields...)
    cl.logger.log(DebugLevel, message, mergedFields, nil)
}

// Info logs at info level with context correlation fields merged in.
func (cl *ContextLogger) Info(message string, fields ...map[string]interface{}) {
    mergedFields := cl.mergeFields(fields...)
    cl.logger.log(InfoLevel, message, mergedFields, nil)
}

// Warn logs at warn level with context correlation fields merged in.
func (cl *ContextLogger) Warn(message string, fields ...map[string]interface{}) {
    mergedFields := cl.mergeFields(fields...)
    cl.logger.log(WarnLevel, message, mergedFields, nil)
}

// Error logs at error level with context correlation fields merged in;
// err's message (and a stack trace) is attached when err is non-nil.
func (cl *ContextLogger) Error(message string, err error, fields ...map[string]interface{}) {
    mergedFields := cl.mergeFields(fields...)
    cl.logger.log(ErrorLevel, message, mergedFields, err)
}

// Fatal logs at fatal level with context fields merged in, then terminates
// the process with exit code 1 (os.Exit bypasses deferred functions).
func (cl *ContextLogger) Fatal(message string, err error, fields ...map[string]interface{}) {
    mergedFields := cl.mergeFields(fields...)
    cl.logger.log(FatalLevel, message, mergedFields, err)
    os.Exit(1)
}

// getStackTrace returns the current goroutine's stack trace, doubling the
// buffer until runtime.Stack fits (a return value smaller than the buffer
// means the dump was not truncated).
func getStackTrace() string {
    size := 1024
    for {
        buf := make([]byte, size)
        if n := runtime.Stack(buf, false); n < size {
            return string(buf[:n])
        }
        size *= 2
    }
}

// LoggingMiddleware wraps an http.Handler chain with per-request structured
// logging (request ID, start/completion entries).
type LoggingMiddleware struct {
    logger *Logger
}

// NewLoggingMiddleware returns a middleware that logs through logger.
func NewLoggingMiddleware(logger *Logger) *LoggingMiddleware {
    return &LoggingMiddleware{
        logger: logger,
    }
}

// Handler wraps next with request-scoped logging: it assigns a request ID,
// logs request start, and logs completion at a level derived from the
// response status (>=500 error, >=400 warn, otherwise info).
func (middleware *LoggingMiddleware) Handler(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        
        // Attach a request ID so all log lines for this request correlate.
        requestID := generateRequestID()
        ctx := context.WithValue(r.Context(), "request_id", requestID)
        r = r.WithContext(ctx)
        
        // Wrap the writer so status code and body size can be observed.
        recorder := &responseRecorder{
            ResponseWriter: w,
            statusCode:     http.StatusOK,
        }
        
        contextLogger := middleware.logger.WithContext(ctx)
        contextLogger.Info("Request started", map[string]interface{}{
            "method":     r.Method,
            "path":       r.URL.Path,
            "query":      r.URL.RawQuery,
            "user_agent": r.UserAgent(),
            "remote_addr": r.RemoteAddr,
        })
        
        next.ServeHTTP(recorder, r)
        
        duration := time.Since(start)
        completionFields := map[string]interface{}{
            "method":      r.Method,
            "path":        r.URL.Path,
            "status_code": recorder.statusCode,
            "duration_ms": duration.Milliseconds(),
            "response_size": recorder.size,
        }
        
        // BUG FIX: the original called contextLogger.log(...), but
        // ContextLogger has no log method (log is unexported on Logger),
        // so this did not compile. Route through the public level methods.
        switch {
        case recorder.statusCode >= 500:
            contextLogger.Error("Request completed", nil, completionFields)
        case recorder.statusCode >= 400:
            contextLogger.Warn("Request completed", completionFields)
        default:
            contextLogger.Info("Request completed", completionFields)
        }
    })
}

// responseRecorder wraps http.ResponseWriter to capture the status code
// (defaults to 200) and the number of body bytes written.
type responseRecorder struct {
    http.ResponseWriter
    statusCode int
    size       int
}

// WriteHeader records the status code before delegating to the wrapped writer.
func (recorder *responseRecorder) WriteHeader(statusCode int) {
    recorder.statusCode = statusCode
    recorder.ResponseWriter.WriteHeader(statusCode)
}

// Write counts the bytes written so the middleware can report response size.
func (recorder *responseRecorder) Write(data []byte) (int, error) {
    n, err := recorder.ResponseWriter.Write(data)
    recorder.size += n
    return n, err
}

// generateRequestID returns a 16-hex-character random request ID. Random
// bytes replace the previous bare nanosecond timestamp, which could collide
// for requests hitting the same clock tick or across replicas.
func generateRequestID() string {
    var raw [8]byte
    if _, err := rand.Read(raw[:]); err == nil {
        return hex.EncodeToString(raw[:])
    }
    // Practically unreachable fallback: keep the old timestamp scheme.
    return fmt.Sprintf("%d", time.Now().UnixNano())
}

// ExampleLogging demonstrates wiring the structured logger and middleware
// into an HTTP service.
func ExampleLogging() {
    // Create the logger.
    logger := NewLogger(
        "user-service",
        "v1.0.0",
        InfoLevel,
        os.Stdout,
        &JSONFormatter{},
    )
    
    // Create the logging middleware.
    loggingMiddleware := NewLoggingMiddleware(logger)
    
    // Build the HTTP routes.
    mux := http.NewServeMux()
    mux.HandleFunc("/users", func(w http.ResponseWriter, r *http.Request) {
        ctx := r.Context()
        contextLogger := logger.WithContext(ctx)
        
        contextLogger.Info("Processing user request", map[string]interface{}{
            "operation": "get_users",
        })
        
        // Simulate business logic.
        users := []string{"user1", "user2", "user3"}
        
        contextLogger.Info("Users retrieved successfully", map[string]interface{}{
            "count": len(users),
        })
        
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(users)
    })
    
    // Apply the logging middleware.
    handler := loggingMiddleware.Handler(mux)
    // BUG FIX: the server start below is commented out, leaving `handler`
    // unused — an unused local variable is a compile error in Go.
    _ = handler
    
    // Start the server.
    // http.ListenAndServe(":8080", handler)
}

可观测性最佳实践

1. 指标设计原则

  • 四个黄金信号:延迟、流量、错误、饱和度
  • RED方法:Rate(请求速率)、Errors(错误率)、Duration(持续时间)
  • USE方法:Utilization(使用率)、Saturation(饱和度)、Errors(错误)

2. 日志最佳实践

  • 结构化日志:使用JSON格式,便于解析和查询
  • 日志级别:合理使用DEBUG、INFO、WARN、ERROR、FATAL
  • 上下文信息:包含请求ID、用户ID、会话ID等关键信息
  • 敏感信息:避免记录密码、令牌等敏感数据

3. 追踪最佳实践

  • 采样策略:在高流量环境中使用适当的采样率
  • Span命名:使用有意义的操作名称
  • 标签使用:添加有助于过滤和分析的标签
  • 错误处理:确保错误信息被正确记录

4. 告警最佳实践

  • 告警分级:区分不同严重程度的告警
  • 告警聚合:避免告警风暴,合理聚合相关告警
  • 告警疲劳:设置合理的阈值,避免过多误报
  • 可操作性:告警应包含足够的上下文信息和处理建议

5. 仪表板设计

  • 分层设计:从概览到详细的层次化视图
  • 关键指标:突出显示最重要的业务和技术指标
  • 时间范围:提供多种时间范围选择
  • 钻取能力:支持从高层视图钻取到详细信息

6. 性能优化

  • 指标收集:避免过度收集,影响应用性能
  • 存储优化:合理设置数据保留策略
  • 查询优化:优化复杂查询的性能
  • 网络带宽:考虑监控数据传输的网络开销

7. 安全考虑

  • 数据脱敏:确保监控数据不包含敏感信息
  • 访问控制:实施适当的权限管理
  • 数据加密:保护传输和存储中的监控数据
  • 审计日志:记录对监控系统的访问和操作

8. 团队协作

  • SLI/SLO定义:明确服务级别指标和目标
  • Runbook:为常见问题提供操作手册
  • 知识共享:建立监控知识库和最佳实践文档
  • 培训计划:确保团队成员具备必要的监控技能

本章总结

本章详细介绍了微服务监控与可观测性的核心概念和实践方法:

主要内容回顾

  1. 可观测性概述

    • 可观测性三大支柱:指标、日志、追踪
    • 监控架构设计和工具选择
    • 与微服务架构的集成方案
  2. 指标监控(Metrics)

    • Prometheus部署和配置
    • Go语言指标收集实现
    • HTTP、业务、系统、数据库指标
    • 指标中间件和使用示例
  3. 日志聚合(Logging)

    • ELK Stack部署配置
    • 结构化日志实现
    • 日志格式化和上下文管理
    • 日志中间件集成
  4. 分布式追踪(Tracing)

    • Jaeger部署和配置
    • 分布式追踪实现
    • HTTP、数据库、消息队列追踪
    • 追踪中间件和客户端
  5. 告警系统(Alerting)

    • Alertmanager配置和部署
    • Prometheus告警规则
    • Go语言告警客户端
    • 健康检查和阈值监控
  6. 最佳实践

    • 指标设计原则(四个黄金信号、RED/USE方法)
    • 日志、追踪、告警最佳实践
    • 仪表板设计和性能优化
    • 安全考虑和团队协作

技术要点

  • 全面覆盖:实现指标、日志、追踪的完整可观测性
  • 工具集成:Prometheus、ELK、Jaeger等主流工具的使用
  • 代码实现:提供完整的Go语言实现示例
  • Kubernetes集成:所有组件都提供K8s部署配置
  • 实用性:注重实际应用场景和最佳实践

下一章预告

下一章我们将探讨微服务部署与运维,主要内容包括:容器化部署策略、CI/CD流水线设计、蓝绿部署和金丝雀发布、服务网格运维,以及故障排查和性能调优。

通过本章的学习,您应该能够构建完整的微服务可观测性体系,实现对微服务系统的全面监控和管理。

告警系统(Alerting)

Alertmanager配置

# Alertmanager configuration: routing tree plus webhook/email/Slack receivers,
# mounted into the alertmanager pod as alertmanager.yml.
apiVersion: v1
kind: ConfigMap
metadata:
  name: alertmanager-config
  namespace: monitoring
data:
  alertmanager.yml: |
    global:
      smtp_smarthost: 'smtp.gmail.com:587'
      smtp_from: 'alerts@company.com'
      smtp_auth_username: 'alerts@company.com'
      smtp_auth_password: 'password'  # NOTE(review): plaintext credential in a ConfigMap — move to a Secret
    
    # Root route: group alerts, then fan out by severity label.
    route:
      group_by: ['alertname', 'cluster', 'service']
      group_wait: 10s
      group_interval: 10s
      repeat_interval: 1h
      receiver: 'web.hook'
      routes:
      - match:
          severity: critical
        receiver: 'critical-alerts'
      - match:
          severity: warning
        receiver: 'warning-alerts'
    
    receivers:
    - name: 'web.hook'
      webhook_configs:
      - url: 'http://webhook-service:8080/alerts'
        send_resolved: true
    
    - name: 'critical-alerts'
      email_configs:
      - to: 'oncall@company.com'
        subject: '[CRITICAL] {{ .GroupLabels.alertname }}'
        body: |
          {{ range .Alerts }}
          Alert: {{ .Annotations.summary }}
          Description: {{ .Annotations.description }}
          Labels: {{ range .Labels.SortedPairs }}{{ .Name }}={{ .Value }} {{ end }}
          {{ end }}
      slack_configs:
      - api_url: 'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK'
        channel: '#alerts-critical'
        title: 'Critical Alert'
        text: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'
    
    - name: 'warning-alerts'
      email_configs:
      - to: 'team@company.com'
        subject: '[WARNING] {{ .GroupLabels.alertname }}'
        body: |
          {{ range .Alerts }}
          Alert: {{ .Annotations.summary }}
          Description: {{ .Annotations.description }}
          {{ end }}
---
# Alertmanager Deployment: single replica, config from the ConfigMap above,
# ephemeral (emptyDir) notification-state storage.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: alertmanager
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: alertmanager
  template:
    metadata:
      labels:
        app: alertmanager
    spec:
      containers:
      - name: alertmanager
        image: prom/alertmanager:v0.25.0
        ports:
        - containerPort: 9093
        volumeMounts:
        - name: config
          mountPath: /etc/alertmanager
        - name: storage
          mountPath: /alertmanager
        args:
        - '--config.file=/etc/alertmanager/alertmanager.yml'
        - '--storage.path=/alertmanager'
        - '--web.external-url=http://alertmanager.monitoring.svc.cluster.local:9093'
        resources:
          requests:
            memory: "256Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "200m"
      volumes:
      - name: config
        configMap:
          name: alertmanager-config
      - name: storage
        # NOTE(review): emptyDir loses silences/notification state on pod restart.
        emptyDir: {}
---
# Service exposing Alertmanager's web UI/API (9093).
# NOTE(review): type LoadBalancer may expose the UI publicly — confirm intent.
apiVersion: v1
kind: Service
metadata:
  name: alertmanager
  namespace: monitoring
spec:
  selector:
    app: alertmanager
  ports:
  - port: 9093
    targetPort: 9093
  type: LoadBalancer

Prometheus告警规则

# Prometheus alerting rules for the microservices stack (availability,
# error rate, latency, resource usage, DB pool, MQ backlog).
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-rules
  namespace: monitoring
data:
  microservices.rules.yml: |
    groups:
    - name: microservices.rules
      rules:
      # Service availability: a scrape target has been unreachable for 1m.
      - alert: ServiceDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Service {{ $labels.instance }} is down"
          description: "Service {{ $labels.instance }} has been down for more than 1 minute."
      
      # High error rate: 5xx responses exceed 5% of traffic.
      # NOTE(review): the division is per-series; wrap both rates in
      # sum by (service) (...) if per-service aggregation is intended.
      - alert: HighErrorRate
        expr: |
          (
            rate(http_requests_total{status=~"5.."}[5m]) /
            rate(http_requests_total[5m])
          ) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate on {{ $labels.service }}"
          description: "Error rate is {{ $value | humanizePercentage }} on {{ $labels.service }}."
      
      # High latency: p95 request duration above 500ms.
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 0.5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency on {{ $labels.service }}"
          description: "95th percentile latency is {{ $value }}s on {{ $labels.service }}."
      
      # High per-container CPU usage (sustained 10m above 80%).
      - alert: HighCPUUsage
        expr: |
          (
            rate(container_cpu_usage_seconds_total[5m]) * 100
          ) > 80
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High CPU usage on {{ $labels.pod }}"
          description: "CPU usage is {{ $value }}% on {{ $labels.pod }}."
      
      # High memory usage relative to the container limit.
      - alert: HighMemoryUsage
        expr: |
          (
            container_memory_working_set_bytes / container_spec_memory_limit_bytes * 100
          ) > 80
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage on {{ $labels.pod }}"
          description: "Memory usage is {{ $value }}% on {{ $labels.pod }}."
      
      # Low disk space on a node filesystem.
      - alert: DiskSpaceLow
        expr: |
          (
            (node_filesystem_size_bytes - node_filesystem_free_bytes) / node_filesystem_size_bytes * 100
          ) > 85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Disk space low on {{ $labels.instance }}"
          description: "Disk usage is {{ $value }}% on {{ $labels.instance }}."
      
      # Database connection pool nearing capacity.
      - alert: DatabaseConnectionPoolHigh
        expr: |
          (
            db_connections_active / db_connections_max * 100
          ) > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Database connection pool usage high"
          description: "Database connection pool usage is {{ $value }}% on {{ $labels.service }}."
      
      # RabbitMQ queue backlog above 1000 messages.
      - alert: MessageQueueBacklog
        expr: |
          rabbitmq_queue_messages > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Message queue backlog on {{ $labels.queue }}"
          description: "Queue {{ $labels.queue }} has {{ $value }} messages."

Go语言告警客户端实现

// 告警客户端实现
package alerting

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "sync"
    "time"
)

// AlertLevel is the severity attached to an Alert.
type AlertLevel string

const (
    AlertLevelInfo     AlertLevel = "info"     // informational, incl. resolution notices
    AlertLevelWarning  AlertLevel = "warning"  // degraded but functioning
    AlertLevelCritical AlertLevel = "critical" // requires immediate attention
)

// Alert is a single alert payload delivered to the alert webhook as JSON.
type Alert struct {
    Name        string            `json:"name"`
    Level       AlertLevel        `json:"level"`
    Summary     string            `json:"summary"`
    Description string            `json:"description"`
    Labels      map[string]string `json:"labels"`
    Annotations map[string]string `json:"annotations"`
    StartsAt    time.Time         `json:"startsAt"`
    // EndsAt is omitted while the alert is firing; ResolveAlert sets it to
    // mark the alert as resolved.
    EndsAt      *time.Time        `json:"endsAt,omitempty"`
}

// AlertManager delivers alerts to some backend (e.g. an HTTP webhook).
// Implementations should honor ctx for cancellation.
type AlertManager interface {
    SendAlert(ctx context.Context, alert Alert) error
    SendAlerts(ctx context.Context, alerts []Alert) error
    ResolveAlert(ctx context.Context, alertName string, labels map[string]string) error
}

// WebhookAlertManager is an AlertManager that POSTs alerts as JSON to a
// webhook URL.
type WebhookAlertManager struct {
    webhookURL string
    client     *http.Client
    // timeout mirrors client.Timeout.
    // NOTE(review): this field is never read in this file — candidate for removal.
    timeout    time.Duration
}

// NewWebhookAlertManager returns a manager posting to webhookURL; timeout
// bounds each HTTP request via the embedded http.Client.
func NewWebhookAlertManager(webhookURL string, timeout time.Duration) *WebhookAlertManager {
    return &WebhookAlertManager{
        webhookURL: webhookURL,
        client: &http.Client{
            Timeout: timeout,
        },
        timeout: timeout,
    }
}

// SendAlert delivers a single alert by wrapping it in a one-element batch.
func (wam *WebhookAlertManager) SendAlert(ctx context.Context, alert Alert) error {
    return wam.SendAlerts(ctx, []Alert{alert})
}

// SendAlerts POSTs the alerts as a JSON array to the configured webhook URL.
// Any HTTP status >= 400 is reported as an error. The response body is
// drained and closed so the keep-alive connection can be reused.
func (wam *WebhookAlertManager) SendAlerts(ctx context.Context, alerts []Alert) error {
    payload, err := json.Marshal(alerts)
    if err != nil {
        return fmt.Errorf("failed to marshal alerts: %w", err)
    }
    
    req, err := http.NewRequestWithContext(ctx, http.MethodPost, wam.webhookURL, bytes.NewReader(payload))
    if err != nil {
        return fmt.Errorf("failed to create request: %w", err)
    }
    
    req.Header.Set("Content-Type", "application/json")
    
    resp, err := wam.client.Do(req)
    if err != nil {
        return fmt.Errorf("failed to send alert: %w", err)
    }
    defer resp.Body.Close()
    
    // Drain the body so the transport can reuse the TCP connection.
    _, _ = io.Copy(io.Discard, resp.Body)
    
    if resp.StatusCode >= 400 {
        return fmt.Errorf("alert webhook returned status %d", resp.StatusCode)
    }
    
    return nil
}

// ResolveAlert emits an informational alert that marks the named alert as
// resolved: StartsAt and EndsAt are both set to the current time.
func (wam *WebhookAlertManager) ResolveAlert(ctx context.Context, alertName string, labels map[string]string) error {
    now := time.Now()
    resolution := Alert{
        Name:        alertName,
        Level:       AlertLevelInfo,
        Labels:      labels,
        Summary:     fmt.Sprintf("Alert %s resolved", alertName),
        Description: fmt.Sprintf("Alert %s has been resolved", alertName),
        StartsAt:    now,
        EndsAt:      &now,
    }
    return wam.SendAlert(ctx, resolution)
}

// HealthChecker periodically runs a set of named HealthChecks and sends a
// critical alert through alertManager for every failing check.
type HealthChecker struct {
    alertManager AlertManager
    serviceName  string            // reported in the "service" alert label
    checkInterval time.Duration    // delay between check rounds
    checks       map[string]HealthCheck // keyed by HealthCheck.Name()
}

// HealthCheck is a single named probe; Check returns nil when healthy.
type HealthCheck interface {
    Name() string
    Check(ctx context.Context) error
}

// DatabaseHealthCheck probes a database by calling its Ping method.
type DatabaseHealthCheck struct {
    name string
    db   interface{ Ping() error }
}

// NewDatabaseHealthCheck wraps any value exposing Ping() error (e.g. *sql.DB)
// as a HealthCheck with the given name.
func NewDatabaseHealthCheck(name string, db interface{ Ping() error }) *DatabaseHealthCheck {
    return &DatabaseHealthCheck{
        name: name,
        db:   db,
    }
}

// Name returns the configured check name.
func (dhc *DatabaseHealthCheck) Name() string {
    return dhc.name
}

// Check pings the database. NOTE(review): ctx is ignored because the wrapped
// interface only offers Ping(); consider PingContext if cancellation matters.
func (dhc *DatabaseHealthCheck) Check(ctx context.Context) error {
    return dhc.db.Ping()
}

// HTTPHealthCheck HTTP健康检查
type HTTPHealthCheck struct {
    name string
    url  string
    client *http.Client
}

func NewHTTPHealthCheck(name, url string, timeout time.Duration) *HTTPHealthCheck {
    return &HTTPHealthCheck{
        name: name,
        url:  url,
        client: &http.Client{
            Timeout: timeout,
        },
    }
}

func (hhc *HTTPHealthCheck) Name() string {
    return hhc.name
}

func (hhc *HTTPHealthCheck) Check(ctx context.Context) error {
    req, err := http.NewRequestWithContext(ctx, "GET", hhc.url, nil)
    if err != nil {
        return err
    }
    
    resp, err := hhc.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    if resp.StatusCode >= 400 {
        return fmt.Errorf("health check failed with status %d", resp.StatusCode)
    }
    
    return nil
}

// NewHealthChecker returns a checker that alerts via alertManager, labels
// alerts with serviceName, and runs all checks every checkInterval.
func NewHealthChecker(alertManager AlertManager, serviceName string, checkInterval time.Duration) *HealthChecker {
    return &HealthChecker{
        alertManager:  alertManager,
        serviceName:   serviceName,
        checkInterval: checkInterval,
        checks:        make(map[string]HealthCheck),
    }
}

// AddCheck registers a check, keyed (and deduplicated) by its Name().
// Not safe to call concurrently with Start.
func (hc *HealthChecker) AddCheck(check HealthCheck) {
    hc.checks[check.Name()] = check
}

// Start runs the periodic check loop until ctx is cancelled. Each tick
// triggers one round of runChecks; the ticker is released on return.
func (hc *HealthChecker) Start(ctx context.Context) {
    tick := time.NewTicker(hc.checkInterval)
    defer tick.Stop()
    
    for {
        select {
        case <-tick.C:
            hc.runChecks(ctx)
        case <-ctx.Done():
            return
        }
    }
}

// runChecks executes every registered health check concurrently and sends a
// critical alert for each failure. It waits for all checks to finish before
// returning so one round's goroutines cannot pile up behind the next tick
// (the original fired goroutines without any synchronization).
func (hc *HealthChecker) runChecks(ctx context.Context) {
    var wg sync.WaitGroup
    for name, check := range hc.checks {
        wg.Add(1)
        go func(name string, check HealthCheck) {
            defer wg.Done()
            err := check.Check(ctx)
            if err == nil {
                return
            }
            alert := Alert{
                Name:        fmt.Sprintf("%s_health_check_failed", name),
                Level:       AlertLevelCritical,
                Summary:     fmt.Sprintf("Health check failed for %s", name),
                Description: fmt.Sprintf("Health check for %s failed: %v", name, err),
                Labels: map[string]string{
                    "service":    hc.serviceName,
                    "check_name": name,
                    "check_type": "health",
                },
                StartsAt: time.Now(),
            }
            if alertErr := hc.alertManager.SendAlert(ctx, alert); alertErr != nil {
                // Alert delivery itself failed; log locally so it is not lost.
                fmt.Printf("Failed to send alert: %v\n", alertErr)
            }
        }(name, check)
    }
    wg.Wait()
}

// MetricThresholdMonitor fires alerts when observed metric values cross
// configured thresholds.
type MetricThresholdMonitor struct {
    alertManager AlertManager
    serviceName  string                      // reported in the "service" alert label
    thresholds   map[string]ThresholdConfig  // keyed by metric name
}

// ThresholdConfig describes one threshold rule for a metric.
type ThresholdConfig struct {
    MetricName  string
    Threshold   float64
    Operator    string // ">", "<", ">=", "<=", "=="
    // Duration is carried in the config but not evaluated by CheckMetric in
    // this file — NOTE(review): confirm whether "sustained for Duration"
    // semantics were intended.
    Duration    time.Duration
    AlertLevel  AlertLevel
    Description string
}

// NewMetricThresholdMonitor returns a monitor alerting via alertManager and
// labeling alerts with serviceName.
func NewMetricThresholdMonitor(alertManager AlertManager, serviceName string) *MetricThresholdMonitor {
    return &MetricThresholdMonitor{
        alertManager: alertManager,
        serviceName:  serviceName,
        thresholds:   make(map[string]ThresholdConfig),
    }
}

// AddThreshold registers (or replaces) the rule for config.MetricName.
// Not safe to call concurrently with CheckMetric.
func (mtm *MetricThresholdMonitor) AddThreshold(config ThresholdConfig) {
    mtm.thresholds[config.MetricName] = config
}

// thresholdExceeded reports whether value violates threshold under op.
// Unknown operators (and exact float equality for "==") never trigger —
// matching the original switch's fall-through behavior.
func thresholdExceeded(op string, value, threshold float64) bool {
    switch op {
    case ">":
        return value > threshold
    case "<":
        return value < threshold
    case ">=":
        return value >= threshold
    case "<=":
        return value <= threshold
    case "==":
        return value == threshold
    }
    return false
}

// CheckMetric compares value against the registered rule for metricName (if
// any) and sends an alert when the configured condition is met.
func (mtm *MetricThresholdMonitor) CheckMetric(ctx context.Context, metricName string, value float64) {
    config, exists := mtm.thresholds[metricName]
    if !exists {
        return
    }
    
    if !thresholdExceeded(config.Operator, value, config.Threshold) {
        return
    }
    
    alert := Alert{
        Name:        fmt.Sprintf("%s_threshold_exceeded", metricName),
        Level:       config.AlertLevel,
        Summary:     fmt.Sprintf("Metric %s threshold exceeded", metricName),
        Description: fmt.Sprintf("%s: current value %.2f %s threshold %.2f", config.Description, value, config.Operator, config.Threshold),
        Labels: map[string]string{
            "service":     mtm.serviceName,
            "metric_name": metricName,
            "alert_type":  "threshold",
        },
        Annotations: map[string]string{
            "current_value": fmt.Sprintf("%.2f", value),
            "threshold":     fmt.Sprintf("%.2f", config.Threshold),
            "operator":      config.Operator,
        },
        StartsAt: time.Now(),
    }
    
    if err := mtm.alertManager.SendAlert(ctx, alert); err != nil {
        fmt.Printf("Failed to send threshold alert: %v\n", err)
    }
}

// ExampleAlerting wires a webhook alert manager to a health checker and a
// metric threshold monitor. The goroutines started here run until ctx is
// cancelled; since ctx is context.Background(), they outlive this function
// (acceptable for an example, not for production code).
func ExampleAlerting() {
    // Alert manager posting to the in-cluster Alertmanager webhook API.
    alertManager := NewWebhookAlertManager(
        "http://alertmanager:9093/api/v1/alerts",
        30*time.Second,
    )
    
    // Health checker running all checks every 30s.
    healthChecker := NewHealthChecker(
        alertManager,
        "user-service",
        30*time.Second,
    )
    
    // Database health check (requires a live *sql.DB, hence commented out):
    // dbHealthCheck := NewDatabaseHealthCheck("database", db)
    // healthChecker.AddCheck(dbHealthCheck)
    
    // HTTP health check against an external dependency.
    httpHealthCheck := NewHTTPHealthCheck(
        "external-api",
        "http://external-api/health",
        10*time.Second,
    )
    healthChecker.AddCheck(httpHealthCheck)
    
    // Threshold monitor for pushed metric samples.
    thresholdMonitor := NewMetricThresholdMonitor(alertManager, "user-service")
    
    // CPU above 80% -> warning.
    thresholdMonitor.AddThreshold(ThresholdConfig{
        MetricName:  "cpu_usage_percent",
        Threshold:   80.0,
        Operator:    ">",
        Duration:    5 * time.Minute,
        AlertLevel:  AlertLevelWarning,
        Description: "High CPU usage detected",
    })
    
    // Memory above 90% -> critical.
    thresholdMonitor.AddThreshold(ThresholdConfig{
        MetricName:  "memory_usage_percent",
        Threshold:   90.0,
        Operator:    ">",
        Duration:    5 * time.Minute,
        AlertLevel:  AlertLevelCritical,
        Description: "Critical memory usage detected",
    })
    
    // Start the periodic health checks.
    ctx := context.Background()
    go healthChecker.Start(ctx)
    
    // Simulated metric polling loop feeding the threshold monitor.
    go func() {
        ticker := time.NewTicker(1 * time.Minute)
        defer ticker.Stop()
        
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                // Simulated CPU usage (replace with a real monitoring read).
                cpuUsage := 85.0 // 从监控系统获取实际值
                thresholdMonitor.CheckMetric(ctx, "cpu_usage_percent", cpuUsage)
                
                // Simulated memory usage (replace with a real monitoring read).
                memoryUsage := 75.0 // 从监控系统获取实际值
                thresholdMonitor.CheckMetric(ctx, "memory_usage_percent", memoryUsage)
            }
        }
    }()
}

分布式追踪(Tracing)

Jaeger配置

# Jaeger all-in-one Deployment (UI + collector + agent in one pod), storing
# spans in the Elasticsearch cluster from the logging stack.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: jaeger
  namespace: tracing
spec:
  replicas: 1
  selector:
    matchLabels:
      app: jaeger
  template:
    metadata:
      labels:
        app: jaeger
    spec:
      containers:
      - name: jaeger
        image: jaegertracing/all-in-one:1.41
        ports:
        - containerPort: 16686
          name: ui
        - containerPort: 14268
          name: collector
        - containerPort: 6831
          name: agent-udp
        - containerPort: 6832
          name: agent-binary
        - containerPort: 5778
          name: config
        env:
        - name: COLLECTOR_ZIPKIN_HOST_PORT
          value: ":9411"
        - name: SPAN_STORAGE_TYPE
          value: "elasticsearch"
        - name: ES_SERVER_URLS
          value: "http://elasticsearch.logging.svc.cluster.local:9200"
        - name: ES_INDEX_PREFIX
          value: "jaeger"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
---
# Service exposing the Jaeger UI (16686), collector HTTP (14268), UDP agent
# ports (6831/6832) and the sampling config endpoint (5778).
apiVersion: v1
kind: Service
metadata:
  name: jaeger
  namespace: tracing
spec:
  selector:
    app: jaeger
  ports:
  - port: 16686
    targetPort: 16686
    name: ui
  - port: 14268
    targetPort: 14268
    name: collector
  - port: 6831
    targetPort: 6831
    protocol: UDP
    name: agent-udp
  - port: 6832
    targetPort: 6832
    protocol: UDP
    name: agent-binary
  - port: 5778
    targetPort: 5778
    name: config
  type: LoadBalancer
---
# Jaeger Agent DaemonSet: one agent per node (hostNetwork) receiving spans
# over UDP and forwarding them to the central collector.
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: jaeger-agent
  namespace: tracing
spec:
  selector:
    matchLabels:
      app: jaeger-agent
  template:
    metadata:
      labels:
        app: jaeger-agent
    spec:
      containers:
      - name: jaeger-agent
        image: jaegertracing/jaeger-agent:1.41
        ports:
        - containerPort: 6831
          protocol: UDP
          name: agent-udp
        - containerPort: 6832
          protocol: UDP
          name: agent-binary
        - containerPort: 5778
          name: config
        args:
        # NOTE(review): the jaeger Service above does not expose 14267, and
        # recent agents dropped the tchannel reporter — this likely should be
        # --reporter.grpc.host-port=jaeger.tracing.svc.cluster.local:14250
        # (with 14250 added to the Service). Verify against the Jaeger docs.
        - "--collector.host-port=jaeger.tracing.svc.cluster.local:14267"
        - "--log-level=info"
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "256Mi"
            cpu: "200m"
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet

Go语言分布式追踪实现

// 分布式追踪实现
package tracing

import (
    "context"
    "fmt"
    "net/http"
    "time"
    
    "github.com/opentracing/opentracing-go"
    "github.com/opentracing/opentracing-go/ext"
    "github.com/uber/jaeger-client-go"
    "github.com/uber/jaeger-client-go/config"
    "github.com/uber/jaeger-client-go/log"
    "github.com/uber/jaeger-lib/metrics"
)

// TracingConfig holds the settings used to build a Jaeger tracer.
type TracingConfig struct {
    ServiceName     string  // service name stamped on every span
    JaegerEndpoint  string  // local agent host:port (UDP), e.g. "localhost:6831"
    SamplingRate    float64 // passed as the const-sampler Param in NewTracingManager
    LogSpans        bool    // when true the reporter logs each finished span
}

// TracingManager owns a tracer instance and its shutdown hook.
type TracingManager struct {
    tracer opentracing.Tracer
    closer func() error // invoked by Close to flush and release the tracer
}

// NewTracingManager builds a Jaeger-backed tracer from cfg, installs it as
// the opentracing global tracer, and returns a manager whose Close flushes
// and releases it.
//
// NOTE(review): SamplerTypeConst interprets Param as 0 or 1; for fractional
// sampling rates a probabilistic sampler would be needed.
func NewTracingManager(cfg TracingConfig) (*TracingManager, error) {
    // Assemble the Jaeger client configuration.
    jcfg := config.Configuration{
        ServiceName: cfg.ServiceName,
        Sampler: &config.SamplerConfig{
            Type:  jaeger.SamplerTypeConst,
            Param: cfg.SamplingRate,
        },
        Reporter: &config.ReporterConfig{
            LogSpans:            cfg.LogSpans,
            BufferFlushInterval: 1 * time.Second,
            LocalAgentHostPort:  cfg.JaegerEndpoint,
        },
    }
    
    // Build the tracer. NewTracer returns an io.Closer alongside the tracer.
    tracer, closer, err := jcfg.NewTracer(
        config.Logger(log.StdLogger),
        config.Metrics(metrics.NullFactory),
    )
    if err != nil {
        return nil, fmt.Errorf("failed to create tracer: %w", err)
    }
    
    // Make the tracer available via the opentracing globals.
    opentracing.SetGlobalTracer(tracer)
    
    return &TracingManager{
        tracer: tracer,
        // Adapt the io.Closer to the struct's func() error field via a method
        // value — the original `closer: closer` did not type-check because
        // io.Closer is not assignable to func() error.
        closer: closer.Close,
    }, nil
}

// Close invokes the tracer's shutdown hook (flushing buffered spans); it is
// a no-op when no closer was set.
func (tm *TracingManager) Close() error {
    if tm.closer != nil {
        return tm.closer()
    }
    return nil
}

// StartSpan starts a new span on the manager's own tracer instance.
func (tm *TracingManager) StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span {
    return tm.tracer.StartSpan(operationName, opts...)
}

// StartSpanFromContext starts a span as a child of any span stored in ctx.
// Note: this delegates to the opentracing package helper, which uses the
// GLOBAL tracer (set in NewTracingManager), not tm.tracer directly.
func (tm *TracingManager) StartSpanFromContext(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
    return opentracing.StartSpanFromContext(ctx, operationName, opts...)
}

// InjectHTTPHeaders writes span's trace context into the outgoing request's
// headers so the downstream service can continue the trace.
func (tm *TracingManager) InjectHTTPHeaders(span opentracing.Span, req *http.Request) error {
    return tm.tracer.Inject(
        span.Context(),
        opentracing.HTTPHeaders,
        opentracing.HTTPHeadersCarrier(req.Header),
    )
}

// ExtractHTTPHeaders reads the propagated trace context from an incoming
// request's headers; it returns an error when no (valid) context is present.
func (tm *TracingManager) ExtractHTTPHeaders(req *http.Request) (opentracing.SpanContext, error) {
    return tm.tracer.Extract(
        opentracing.HTTPHeaders,
        opentracing.HTTPHeadersCarrier(req.Header),
    )
}

// TracingMiddleware creates a server-side span for every HTTP request.
type TracingMiddleware struct {
    tracer *TracingManager
}

// NewTracingMiddleware wraps the given tracing manager as HTTP middleware.
func NewTracingMiddleware(tracer *TracingManager) *TracingMiddleware {
    return &TracingMiddleware{
        tracer: tracer,
    }
}

// Handler wraps next with request tracing: it continues a trace propagated in
// the HTTP headers when one is present (ChildOf), otherwise starts a new root
// span; it tags the span with method/URL/status and flags 4xx/5xx as errors.
func (middleware *TracingMiddleware) Handler(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        operation := fmt.Sprintf("%s %s", r.Method, r.URL.Path)
        
        // Continue the upstream trace when the request carries span headers;
        // extraction failure simply means "no parent" and yields a root span.
        var opts []opentracing.StartSpanOption
        if parent, err := middleware.tracer.ExtractHTTPHeaders(r); err == nil {
            opts = append(opts, opentracing.ChildOf(parent))
        }
        span := middleware.tracer.StartSpan(operation, opts...)
        defer span.Finish()
        
        // Standard HTTP server tags.
        ext.HTTPMethod.Set(span, r.Method)
        ext.HTTPUrl.Set(span, r.URL.String())
        ext.Component.Set(span, "http-server")
        
        // Expose the span to downstream handlers via the request context.
        r = r.WithContext(opentracing.ContextWithSpan(r.Context(), span))
        
        // Capture the status code the handler writes.
        recorder := &tracingResponseRecorder{
            ResponseWriter: w,
            statusCode:     http.StatusOK,
        }
        next.ServeHTTP(recorder, r)
        
        // Record the outcome on the span.
        ext.HTTPStatusCode.Set(span, uint16(recorder.statusCode))
        if recorder.statusCode >= 400 {
            ext.Error.Set(span, true)
        }
    })
}

// tracingResponseRecorder 追踪响应记录器
type tracingResponseRecorder struct {
    http.ResponseWriter
    statusCode int
}

func (recorder *tracingResponseRecorder) WriteHeader(statusCode int) {
    recorder.statusCode = statusCode
    recorder.ResponseWriter.WriteHeader(statusCode)
}

// HTTPClient is an HTTP client that records a client-side span per request.
type HTTPClient struct {
    client *http.Client
    tracer *TracingManager
}

// NewHTTPClient returns a traced client with a fixed 30-second timeout.
func NewHTTPClient(tracer *TracingManager) *HTTPClient {
    return &HTTPClient{
        client: &http.Client{
            Timeout: 30 * time.Second,
        },
        tracer: tracer,
    }
}

// Do executes req with client-side tracing: a child span is created under the
// span found in ctx (or a fresh root span when ctx carries none), the trace
// context is injected into the outgoing headers, and the span records the
// method, URL and response status (statuses >= 400 are flagged as errors).
//
// The caller must close resp.Body on success.
func (c *HTTPClient) Do(ctx context.Context, req *http.Request) (*http.Response, error) {
    span := opentracing.SpanFromContext(ctx)
    if span == nil {
        // No active span in ctx: start (and own) a root span.
        span = c.tracer.StartSpan(fmt.Sprintf("HTTP %s", req.Method))
        defer span.Finish()
    } else {
        // Continue the caller's trace with a child span.
        childSpan := c.tracer.StartSpan(
            fmt.Sprintf("HTTP %s", req.Method),
            opentracing.ChildOf(span.Context()),
        )
        defer childSpan.Finish()
        span = childSpan
    }
    
    // Standard client-side tags.
    ext.HTTPMethod.Set(span, req.Method)
    ext.HTTPUrl.Set(span, req.URL.String())
    ext.Component.Set(span, "http-client")
    ext.SpanKindRPCClient.Set(span)
    
    // Bind ctx to the outgoing request so cancellation and deadlines apply to
    // the round trip — the original accepted ctx but never used it here.
    req = req.WithContext(ctx)
    
    // Inject trace headers; failure is recorded on the span but does not
    // abort the request.
    // NOTE(review): opentracing.String may actually live in the
    // opentracing-go/log subpackage depending on the library version — verify.
    if err := c.tracer.InjectHTTPHeaders(span, req); err != nil {
        span.LogFields(
            opentracing.String("event", "inject_failed"),
            opentracing.String("error", err.Error()),
        )
    }
    
    // Perform the request.
    resp, err := c.client.Do(req)
    if err != nil {
        ext.Error.Set(span, true)
        span.LogFields(
            opentracing.String("event", "request_failed"),
            opentracing.String("error", err.Error()),
        )
        return nil, err
    }
    
    // Record the response status; 4xx/5xx mark the span as errored.
    ext.HTTPStatusCode.Set(span, uint16(resp.StatusCode))
    if resp.StatusCode >= 400 {
        ext.Error.Set(span, true)
    }
    
    return resp, nil
}

// DatabaseTracer creates spans around database queries.
type DatabaseTracer struct {
    tracer *TracingManager
}

// NewDatabaseTracer wraps the tracing manager for database instrumentation.
func NewDatabaseTracer(tracer *TracingManager) *DatabaseTracer {
    return &DatabaseTracer{
        tracer: tracer,
    }
}

// TraceQuery opens a "db.query" span as a child of any span in ctx and tags
// it with the SQL statement and argument count. The caller must complete the
// span via FinishQuery.
// NOTE(review): the full SQL text is recorded on the span — ensure queries
// never embed sensitive literals before shipping spans off-host.
func (dt *DatabaseTracer) TraceQuery(ctx context.Context, query string, args ...interface{}) (opentracing.Span, context.Context) {
    span, ctx := dt.tracer.StartSpanFromContext(ctx, "db.query")
    
    // Standard database tags.
    ext.DBType.Set(span, "sql")
    ext.DBStatement.Set(span, query)
    ext.Component.Set(span, "database")
    
    // Record only the number of bind arguments, not their values.
    if len(args) > 0 {
        span.SetTag("db.args_count", len(args))
    }
    
    return span, ctx
}

// FinishQuery completes a span opened by TraceQuery: on error it flags the
// span and logs the failure, otherwise it records the affected row count.
func (dt *DatabaseTracer) FinishQuery(span opentracing.Span, err error, rowsAffected int64) {
    if err != nil {
        ext.Error.Set(span, true)
        span.LogFields(
            opentracing.String("event", "query_failed"),
            opentracing.String("error", err.Error()),
        )
    } else {
        span.SetTag("db.rows_affected", rowsAffected)
    }
    span.Finish()
}

// MessageQueueTracer creates spans around message-queue publish/consume.
type MessageQueueTracer struct {
    tracer *TracingManager
}

// NewMessageQueueTracer wraps the tracing manager for MQ instrumentation.
func NewMessageQueueTracer(tracer *TracingManager) *MessageQueueTracer {
    return &MessageQueueTracer{
        tracer: tracer,
    }
}

// TracePublish opens an "mq.publish" producer span as a child of any span in
// ctx, tagged with the destination topic and message size. The caller must
// Finish the returned span.
func (mqt *MessageQueueTracer) TracePublish(ctx context.Context, topic string, message []byte) (opentracing.Span, context.Context) {
    span, ctx := mqt.tracer.StartSpanFromContext(ctx, "mq.publish")
    
    // Standard messaging tags; SpanKindProducer marks the sending side.
    ext.MessageBusDestination.Set(span, topic)
    ext.Component.Set(span, "message-queue")
    ext.SpanKindProducer.Set(span)
    
    span.SetTag("mq.message_size", len(message))
    
    return span, ctx
}

// TraceConsume opens an "mq.consume" consumer span as a child of any span in
// ctx, tagged with the source topic and message size.
// NOTE(review): no span context is extracted from the message itself, so
// consumer spans will not link to the producer's trace across the queue —
// confirm whether carrier-based propagation was intended.
func (mqt *MessageQueueTracer) TraceConsume(ctx context.Context, topic string, message []byte) (opentracing.Span, context.Context) {
    span, ctx := mqt.tracer.StartSpanFromContext(ctx, "mq.consume")
    
    // Standard messaging tags; SpanKindConsumer marks the receiving side.
    ext.MessageBusDestination.Set(span, topic)
    ext.Component.Set(span, "message-queue")
    ext.SpanKindConsumer.Set(span)
    
    span.SetTag("mq.message_size", len(message))
    
    return span, ctx
}

// ExampleTracing wires the tracing middleware, traced HTTP client and
// database tracer into a sample HTTP handler. ListenAndServe is commented
// out so the example does not bind a port.
func ExampleTracing() {
    // Build the tracer (const sampler at 1.0 = sample everything).
    tracingManager, err := NewTracingManager(TracingConfig{
        ServiceName:    "user-service",
        JaegerEndpoint: "localhost:6831",
        SamplingRate:   1.0,
        LogSpans:       true,
    })
    if err != nil {
        panic(err)
    }
    defer tracingManager.Close()
    
    // Server-side tracing middleware.
    tracingMiddleware := NewTracingMiddleware(tracingManager)
    
    // Traced outbound HTTP client.
    httpClient := NewHTTPClient(tracingManager)
    
    // Database query tracer.
    dbTracer := NewDatabaseTracer(tracingManager)
    
    // Sample route exercising DB and downstream-service spans.
    mux := http.NewServeMux()
    mux.HandleFunc("/users", func(w http.ResponseWriter, r *http.Request) {
        ctx := r.Context()
        
        // Trace a (simulated) database query.
        span, ctx := dbTracer.TraceQuery(ctx, "SELECT * FROM users WHERE active = ?", true)
        
        // Simulate query latency.
        time.Sleep(10 * time.Millisecond)
        
        // Complete the DB span (no error, 3 rows).
        dbTracer.FinishQuery(span, nil, 3)
        
        // Trace a downstream HTTP call; request-build error ignored because
        // the method and URL are compile-time constants.
        req, _ := http.NewRequest("GET", "http://profile-service/profiles", nil)
        resp, err := httpClient.Do(ctx, req)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        defer resp.Body.Close()
        
        // Return the response.
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"users": ["user1", "user2", "user3"]}`))
    })
    
    // Apply the tracing middleware. The blank assignment keeps the example
    // compiling ("declared and not used") while ListenAndServe stays
    // commented out below.
    handler := tracingMiddleware.Handler(mux)
    _ = handler
    
    // Start the server:
    // http.ListenAndServe(":8080", handler)
}