概述
可观测性(Observability)是微服务架构中的关键能力,它通过三大支柱——指标(Metrics)、日志(Logs)和追踪(Traces)来帮助我们理解系统的运行状态。本章将深入探讨如何构建完整的微服务监控与可观测性体系。
可观测性三大支柱
可观测性架构概览
# 可观测性架构配置
apiVersion: v1
kind: ConfigMap
metadata:
name: observability-config
namespace: microservices
data:
architecture.yaml: |
observability:
metrics:
collection:
- prometheus
- grafana
storage: prometheus
visualization: grafana
alerting: alertmanager
logging:
collection:
- fluentd
- filebeat
aggregation: elasticsearch
visualization: kibana
storage: elasticsearch
tracing:
collection: jaeger-agent
processing: jaeger-collector
storage: jaeger-storage
visualization: jaeger-ui
integration:
service_mesh: istio
api_gateway: kong
load_balancer: nginx
指标监控(Metrics)
Prometheus配置
# Prometheus部署配置
apiVersion: apps/v1
kind: Deployment
metadata:
name: prometheus
namespace: monitoring
spec:
replicas: 1
selector:
matchLabels:
app: prometheus
template:
metadata:
labels:
app: prometheus
spec:
containers:
- name: prometheus
image: prom/prometheus:v2.40.0
ports:
- containerPort: 9090
volumeMounts:
- name: config
mountPath: /etc/prometheus
- name: storage
mountPath: /prometheus
args:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
- '--storage.tsdb.retention.time=15d'
- '--web.enable-lifecycle'
volumes:
- name: config
configMap:
name: prometheus-config
- name: storage
persistentVolumeClaim:
claimName: prometheus-storage
---
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-config
namespace: monitoring
data:
prometheus.yml: |
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "rules/*.yml"
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
scrape_configs:
- job_name: 'kubernetes-apiservers'
kubernetes_sd_configs:
- role: endpoints
scheme: https
tls_config:
ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
relabel_configs:
- source_labels: [__meta_kubernetes_namespace, __meta_kubernetes_service_name, __meta_kubernetes_endpoint_port_name]
action: keep
regex: default;kubernetes;https
- job_name: 'kubernetes-nodes'
kubernetes_sd_configs:
- role: node
scheme: https
tls_config:
ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
relabel_configs:
- action: labelmap
regex: __meta_kubernetes_node_label_(.+)
- target_label: __address__
replacement: kubernetes.default.svc:443
- source_labels: [__meta_kubernetes_node_name]
regex: (.+)
target_label: __metrics_path__
replacement: /api/v1/nodes/${1}/proxy/metrics
- job_name: 'microservices'
kubernetes_sd_configs:
- role: endpoints
namespaces:
names:
- microservices
relabel_configs:
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scrape]
action: keep
regex: true
- source_labels: [__meta_kubernetes_service_annotation_prometheus_io_path]
action: replace
target_label: __metrics_path__
regex: (.+)
- source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port]
action: replace
regex: ([^:]+)(?::\d+)?;(\d+)
replacement: $1:$2
target_label: __address__
- action: labelmap
regex: __meta_kubernetes_service_label_(.+)
- source_labels: [__meta_kubernetes_namespace]
action: replace
target_label: kubernetes_namespace
- source_labels: [__meta_kubernetes_service_name]
action: replace
target_label: kubernetes_name
---
apiVersion: v1
kind: Service
metadata:
name: prometheus
namespace: monitoring
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9090"
spec:
selector:
app: prometheus
ports:
- port: 9090
targetPort: 9090
name: web
Go语言指标收集实现
// 指标收集实现
package metrics
import (
"net/http"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// MetricsCollector bundles every Prometheus metric family exposed by a
// service: HTTP traffic, business operations, system resources, database
// access and cache behaviour. Construct it once via NewMetricsCollector —
// the metrics are registered on the default registry, so a second instance
// in the same process would panic on duplicate registration.
type MetricsCollector struct {
	// HTTP metrics
	httpRequestsTotal    *prometheus.CounterVec
	httpRequestDuration  *prometheus.HistogramVec
	httpRequestsInFlight prometheus.Gauge
	// business metrics
	businessOperationsTotal   *prometheus.CounterVec
	businessOperationDuration *prometheus.HistogramVec
	// system metrics
	systemCPUUsage    prometheus.Gauge
	systemMemoryUsage prometheus.Gauge
	systemDiskUsage   prometheus.Gauge
	// database metrics
	dbConnectionsActive prometheus.Gauge
	dbConnectionsIdle   prometheus.Gauge
	dbQueriesTotal      *prometheus.CounterVec
	dbQueryDuration     *prometheus.HistogramVec
	// cache metrics
	cacheHitsTotal   *prometheus.CounterVec
	cacheMissesTotal *prometheus.CounterVec
	cacheSize        prometheus.Gauge
}
// NewMetricsCollector builds a MetricsCollector with every metric family
// pre-declared and registers all of them on the default Prometheus registry.
//
// NOTE(review): registration uses MustRegister, so calling this constructor
// twice in one process panics with a duplicate-registration error. The
// serviceName parameter is currently unused — the "service" label is passed
// per observation instead; confirm whether it was meant to become a
// ConstLabel.
func NewMetricsCollector(serviceName string) *MetricsCollector {
	collector := &MetricsCollector{
		// HTTP metrics
		httpRequestsTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "http_requests_total",
				Help: "Total number of HTTP requests",
			},
			[]string{"service", "method", "endpoint", "status_code"},
		),
		httpRequestDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "http_request_duration_seconds",
				Help:    "HTTP request duration in seconds",
				Buckets: prometheus.DefBuckets,
			},
			[]string{"service", "method", "endpoint"},
		),
		httpRequestsInFlight: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Name: "http_requests_in_flight",
				Help: "Current number of HTTP requests being processed",
			},
		),
		// business metrics
		businessOperationsTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "business_operations_total",
				Help: "Total number of business operations",
			},
			[]string{"service", "operation", "status"},
		),
		businessOperationDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "business_operation_duration_seconds",
				Help:    "Business operation duration in seconds",
				Buckets: []float64{0.001, 0.01, 0.1, 0.5, 1, 2.5, 5, 10},
			},
			[]string{"service", "operation"},
		),
		// system metrics
		systemCPUUsage: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Name: "system_cpu_usage_percent",
				Help: "Current CPU usage percentage",
			},
		),
		systemMemoryUsage: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Name: "system_memory_usage_bytes",
				Help: "Current memory usage in bytes",
			},
		),
		systemDiskUsage: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Name: "system_disk_usage_percent",
				Help: "Current disk usage percentage",
			},
		),
		// database metrics
		dbConnectionsActive: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Name: "db_connections_active",
				Help: "Number of active database connections",
			},
		),
		dbConnectionsIdle: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Name: "db_connections_idle",
				Help: "Number of idle database connections",
			},
		),
		dbQueriesTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "db_queries_total",
				Help: "Total number of database queries",
			},
			[]string{"service", "query_type", "status"},
		),
		dbQueryDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "db_query_duration_seconds",
				Help:    "Database query duration in seconds",
				Buckets: []float64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 2},
			},
			[]string{"service", "query_type"},
		),
		// cache metrics
		cacheHitsTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "cache_hits_total",
				Help: "Total number of cache hits",
			},
			[]string{"service", "cache_name"},
		),
		cacheMissesTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "cache_misses_total",
				Help: "Total number of cache misses",
			},
			[]string{"service", "cache_name"},
		),
		cacheSize: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Name: "cache_size_bytes",
				Help: "Current cache size in bytes",
			},
		),
	}
	// Register every metric on the default registry (panics on duplicates).
	prometheus.MustRegister(
		collector.httpRequestsTotal,
		collector.httpRequestDuration,
		collector.httpRequestsInFlight,
		collector.businessOperationsTotal,
		collector.businessOperationDuration,
		collector.systemCPUUsage,
		collector.systemMemoryUsage,
		collector.systemDiskUsage,
		collector.dbConnectionsActive,
		collector.dbConnectionsIdle,
		collector.dbQueriesTotal,
		collector.dbQueryDuration,
		collector.cacheHitsTotal,
		collector.cacheMissesTotal,
		collector.cacheSize,
	)
	return collector
}
// RecordHTTPRequest counts one finished HTTP request and observes its
// latency, labelled by service, method, endpoint and numeric status code.
func (collector *MetricsCollector) RecordHTTPRequest(service, method, endpoint string, statusCode int, duration time.Duration) {
	status := strconv.Itoa(statusCode)
	collector.httpRequestsTotal.WithLabelValues(service, method, endpoint, status).Inc()
	collector.httpRequestDuration.WithLabelValues(service, method, endpoint).Observe(duration.Seconds())
}
// IncHTTPRequestsInFlight bumps the in-flight request gauge; pair every call
// with DecHTTPRequestsInFlight (the metrics middleware does this via defer).
func (collector *MetricsCollector) IncHTTPRequestsInFlight() {
	collector.httpRequestsInFlight.Inc()
}
// DecHTTPRequestsInFlight decrements the in-flight request gauge when a
// request finishes.
func (collector *MetricsCollector) DecHTTPRequestsInFlight() {
	collector.httpRequestsInFlight.Dec()
}
// RecordBusinessOperation counts one business operation and observes its
// duration, labelled by service, operation name and outcome status.
func (collector *MetricsCollector) RecordBusinessOperation(service, operation, status string, duration time.Duration) {
	collector.businessOperationsTotal.WithLabelValues(service, operation, status).Inc()
	collector.businessOperationDuration.WithLabelValues(service, operation).Observe(duration.Seconds())
}
// UpdateSystemMetrics sets the CPU, memory and disk gauges to the latest
// sampled values. Per the metric Help strings, cpuUsage and diskUsage are
// percentages and memoryUsage is in bytes.
func (collector *MetricsCollector) UpdateSystemMetrics(cpuUsage, memoryUsage float64, diskUsage float64) {
	collector.systemCPUUsage.Set(cpuUsage)
	collector.systemMemoryUsage.Set(memoryUsage)
	collector.systemDiskUsage.Set(diskUsage)
}
// UpdateDatabaseMetrics publishes the current active and idle connection
// counts of the database pool.
func (collector *MetricsCollector) UpdateDatabaseMetrics(activeConns, idleConns int) {
	collector.dbConnectionsActive.Set(float64(activeConns))
	collector.dbConnectionsIdle.Set(float64(idleConns))
}
// RecordDatabaseQuery counts one database query and observes its latency,
// labelled by service, query type and outcome status.
func (collector *MetricsCollector) RecordDatabaseQuery(service, queryType, status string, duration time.Duration) {
	counter := collector.dbQueriesTotal.WithLabelValues(service, queryType, status)
	counter.Inc()
	histogram := collector.dbQueryDuration.WithLabelValues(service, queryType)
	histogram.Observe(duration.Seconds())
}
// RecordCacheHit counts one cache hit for the named cache.
func (collector *MetricsCollector) RecordCacheHit(service, cacheName string) {
	collector.cacheHitsTotal.WithLabelValues(service, cacheName).Inc()
}
// RecordCacheMiss counts one cache miss for the named cache.
func (collector *MetricsCollector) RecordCacheMiss(service, cacheName string) {
	collector.cacheMissesTotal.WithLabelValues(service, cacheName).Inc()
}
// UpdateCacheSize publishes the current cache size in bytes.
func (collector *MetricsCollector) UpdateCacheSize(size float64) {
	collector.cacheSize.Set(size)
}
// MetricsMiddleware is an HTTP middleware that records request count,
// latency and in-flight gauge for every request passing through it.
type MetricsMiddleware struct {
	collector   *MetricsCollector
	serviceName string
}
// NewMetricsMiddleware wires a collector to the middleware; serviceName is
// attached as the "service" label on every recorded request.
func NewMetricsMiddleware(collector *MetricsCollector, serviceName string) *MetricsMiddleware {
	return &MetricsMiddleware{
		collector:   collector,
		serviceName: serviceName,
	}
}
// Handler wraps next so that every request is counted, timed and tracked as
// in-flight. The raw request path is used as the "endpoint" label;
// NOTE(review): with high-cardinality paths (IDs embedded in the URL) this
// can blow up the label space — confirm that paths are templated upstream.
func (middleware *MetricsMiddleware) Handler(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		startedAt := time.Now()

		middleware.collector.IncHTTPRequestsInFlight()
		defer middleware.collector.DecHTTPRequestsInFlight()

		// Wrap the writer so the final status code can be captured;
		// a handler that never calls WriteHeader is reported as 200.
		wrapped := &responseRecorder{ResponseWriter: w, statusCode: http.StatusOK}
		next.ServeHTTP(wrapped, r)

		middleware.collector.RecordHTTPRequest(
			middleware.serviceName,
			r.Method,
			r.URL.Path,
			wrapped.statusCode,
			time.Since(startedAt),
		)
	})
}
// responseRecorder captures the status code written by a handler. It does
// not override Write, so a handler that writes a body without calling
// WriteHeader keeps the pre-set default status.
type responseRecorder struct {
	http.ResponseWriter
	statusCode int
}
// WriteHeader remembers the status code before delegating to the wrapped
// ResponseWriter.
func (recorder *responseRecorder) WriteHeader(statusCode int) {
	recorder.statusCode = statusCode
	recorder.ResponseWriter.WriteHeader(statusCode)
}
// StartMetricsServer exposes the Prometheus /metrics endpoint on the given
// port and blocks serving it.
//
// Fix: the original discarded the error returned by ListenAndServe, so a
// failed bind (port already in use, malformed port string) went completely
// unnoticed and the goroutine exited silently. The failure is now surfaced
// via panic so the process cannot keep running without its metrics endpoint.
func StartMetricsServer(port string) {
	http.Handle("/metrics", promhttp.Handler())
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		panic("metrics server failed: " + err.Error())
	}
}
// ExampleMetrics shows how to wire the collector and middleware into an
// HTTP mux. The main server is intentionally not started.
//
// Fix: the original declared handler and then referenced it only inside a
// comment — an unused local variable, which is a compile error in Go. The
// blank-identifier assignment keeps the example compiling unchanged.
func ExampleMetrics() {
	// Create the metrics collector (registers on the default registry).
	collector := NewMetricsCollector("user-service")
	// Create the metrics middleware.
	metricsMiddleware := NewMetricsMiddleware(collector, "user-service")
	// Build the HTTP routes.
	mux := http.NewServeMux()
	mux.HandleFunc("/users", func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()
		// Simulate a business operation.
		time.Sleep(100 * time.Millisecond)
		// Record the business metric.
		collector.RecordBusinessOperation(
			"user-service",
			"get_users",
			"success",
			time.Since(start),
		)
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("Users data"))
	})
	// Apply the metrics middleware.
	handler := metricsMiddleware.Handler(mux)
	_ = handler // only used in the commented-out server start below
	// Start the metrics endpoint in its own goroutine.
	go StartMetricsServer("9090")
	// Start the main server:
	// http.ListenAndServe(":8080", handler)
}
日志聚合(Logging)
ELK Stack配置
# Elasticsearch部署
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: elasticsearch
namespace: logging
spec:
serviceName: elasticsearch
replicas: 3
selector:
matchLabels:
app: elasticsearch
template:
metadata:
labels:
app: elasticsearch
spec:
containers:
- name: elasticsearch
image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
ports:
- containerPort: 9200
name: http
- containerPort: 9300
name: transport
env:
- name: cluster.name
value: "microservices-logs"
- name: node.name
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: discovery.seed_hosts
value: "elasticsearch-0.elasticsearch,elasticsearch-1.elasticsearch,elasticsearch-2.elasticsearch"
- name: cluster.initial_master_nodes
value: "elasticsearch-0,elasticsearch-1,elasticsearch-2"
- name: ES_JAVA_OPTS
value: "-Xms1g -Xmx1g"
- name: xpack.security.enabled
value: "false"
volumeMounts:
- name: data
mountPath: /usr/share/elasticsearch/data
resources:
requests:
memory: "2Gi"
cpu: "1000m"
limits:
memory: "2Gi"
cpu: "1000m"
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 10Gi
---
apiVersion: v1
kind: Service
metadata:
name: elasticsearch
namespace: logging
spec:
clusterIP: None
selector:
app: elasticsearch
ports:
- port: 9200
name: http
- port: 9300
name: transport
---
# Kibana部署
apiVersion: apps/v1
kind: Deployment
metadata:
name: kibana
namespace: logging
spec:
replicas: 1
selector:
matchLabels:
app: kibana
template:
metadata:
labels:
app: kibana
spec:
containers:
- name: kibana
image: docker.elastic.co/kibana/kibana:8.5.0
ports:
- containerPort: 5601
env:
- name: ELASTICSEARCH_HOSTS
value: "http://elasticsearch:9200"
- name: SERVER_NAME
value: "kibana"
- name: SERVER_HOST
value: "0.0.0.0"
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
name: kibana
namespace: logging
spec:
selector:
app: kibana
ports:
- port: 5601
targetPort: 5601
type: LoadBalancer
---
# Fluentd配置
apiVersion: v1
kind: ConfigMap
metadata:
name: fluentd-config
namespace: logging
data:
fluent.conf: |
<source>
@type tail
path /var/log/containers/*.log
pos_file /var/log/fluentd-containers.log.pos
tag kubernetes.*
read_from_head true
<parse>
@type json
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
<filter kubernetes.**>
@type kubernetes_metadata
</filter>
<filter kubernetes.**>
@type record_transformer
<record>
hostname ${hostname}
service_name ${record["kubernetes"]["labels"]["app"]}
namespace ${record["kubernetes"]["namespace_name"]}
pod_name ${record["kubernetes"]["pod_name"]}
container_name ${record["kubernetes"]["container_name"]}
</record>
</filter>
<match kubernetes.**>
@type elasticsearch
host elasticsearch.logging.svc.cluster.local
port 9200
index_name microservices-logs
type_name _doc
include_tag_key true
tag_key @log_name
flush_interval 1s
<buffer>
@type file
path /var/log/fluentd-buffers/kubernetes.system.buffer
flush_mode interval
retry_type exponential_backoff
flush_thread_count 2
flush_interval 5s
retry_forever
retry_max_interval 30
chunk_limit_size 2M
queue_limit_length 8
overflow_action block
</buffer>
</match>
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd
namespace: logging
spec:
selector:
matchLabels:
app: fluentd
template:
metadata:
labels:
app: fluentd
spec:
serviceAccountName: fluentd
containers:
- name: fluentd
image: fluent/fluentd-kubernetes-daemonset:v1.15-debian-elasticsearch7-1
env:
- name: FLUENT_ELASTICSEARCH_HOST
value: "elasticsearch.logging.svc.cluster.local"
- name: FLUENT_ELASTICSEARCH_PORT
value: "9200"
- name: FLUENT_ELASTICSEARCH_SCHEME
value: "http"
volumeMounts:
- name: varlog
mountPath: /var/log
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: config
mountPath: /fluentd/etc/fluent.conf
subPath: fluent.conf
resources:
requests:
memory: "200Mi"
cpu: "100m"
limits:
memory: "500Mi"
cpu: "500m"
volumes:
- name: varlog
hostPath:
path: /var/log
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: config
configMap:
name: fluentd-config
Go语言结构化日志实现
// 结构化日志实现
package logging
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"runtime"
	"strings"
	"time"

	"github.com/sirupsen/logrus"
)
// LogLevel names the severity of a log entry. The values are ranked from
// debug (lowest) to fatal (highest) by Logger.shouldLog.
type LogLevel string

const (
	DebugLevel LogLevel = "debug"
	InfoLevel  LogLevel = "info"
	WarnLevel  LogLevel = "warn"
	ErrorLevel LogLevel = "error"
	FatalLevel LogLevel = "fatal"
)
// LogEntry is the serialised form of one structured log record. The
// correlation fields (trace/span/user/request IDs) are omitted from the
// JSON when empty; File, Line and Function describe the call site captured
// by Logger.log; Stack is filled only for error/fatal entries that carry an
// error.
type LogEntry struct {
	Timestamp time.Time              `json:"timestamp"`
	Level     LogLevel               `json:"level"`
	Service   string                 `json:"service"`
	Version   string                 `json:"version"`
	TraceID   string                 `json:"trace_id,omitempty"`
	SpanID    string                 `json:"span_id,omitempty"`
	UserID    string                 `json:"user_id,omitempty"`
	RequestID string                 `json:"request_id,omitempty"`
	Message   string                 `json:"message"`
	Fields    map[string]interface{} `json:"fields,omitempty"`
	Error     string                 `json:"error,omitempty"`
	Stack     string                 `json:"stack,omitempty"`
	File      string                 `json:"file,omitempty"`
	Line      int                    `json:"line,omitempty"`
	Function  string                 `json:"function,omitempty"`
}
// Logger is a structured logger bound to one service/version pair. Entries
// below the configured level are dropped; everything else is formatted by
// the configured Formatter and written to output.
type Logger struct {
	service   string
	version   string
	output    io.Writer
	level     LogLevel
	formatter Formatter
}
// Formatter converts a LogEntry into the bytes written to the log sink,
// including any trailing newline.
type Formatter interface {
	Format(entry *LogEntry) ([]byte, error)
}
// JSONFormatter renders a LogEntry as a single JSON line.
type JSONFormatter struct{}

// Format marshals the entry to JSON and appends a trailing newline so each
// entry occupies exactly one line in the output stream.
func (f *JSONFormatter) Format(entry *LogEntry) ([]byte, error) {
	encoded, marshalErr := json.Marshal(entry)
	if marshalErr != nil {
		return nil, marshalErr
	}
	encoded = append(encoded, '\n')
	return encoded, nil
}
// TextFormatter renders a LogEntry as a human-readable single line:
// "[timestamp] LEVEL service: message k=v ...".
type TextFormatter struct{}

// Format builds the text line. NOTE(review): the fields come from a Go map,
// so their order varies between calls; sort the keys if deterministic
// output matters.
func (f *TextFormatter) Format(entry *LogEntry) ([]byte, error) {
	var pairs []string
	for key, value := range entry.Fields {
		pairs = append(pairs, fmt.Sprintf("%s=%v", key, value))
	}
	suffix := ""
	if len(pairs) > 0 {
		suffix = " " + strings.Join(pairs, " ")
	}
	formatted := fmt.Sprintf("[%s] %s %s: %s%s\n",
		entry.Timestamp.Format(time.RFC3339),
		strings.ToUpper(string(entry.Level)),
		entry.Service,
		entry.Message,
		suffix,
	)
	return []byte(formatted), nil
}
// NewLogger constructs a Logger for the given service and version. A nil
// output falls back to stdout; a nil formatter falls back to JSON.
func NewLogger(service, version string, level LogLevel, output io.Writer, formatter Formatter) *Logger {
	sink := output
	if sink == nil {
		sink = os.Stdout
	}
	format := formatter
	if format == nil {
		format = &JSONFormatter{}
	}
	return &Logger{
		service:   service,
		version:   version,
		output:    sink,
		level:     level,
		formatter: format,
	}
}
// WithContext returns a ContextLogger that enriches every entry with the
// correlation IDs found in ctx (trace_id, span_id, user_id, request_id).
func (logger *Logger) WithContext(ctx context.Context) *ContextLogger {
	return &ContextLogger{
		logger: logger,
		ctx:    ctx,
	}
}
// log builds a LogEntry, filters it by level, enriches it with error, stack
// and caller information, formats it and writes it to the configured
// output. All public logging methods funnel through here.
func (logger *Logger) log(level LogLevel, message string, fields map[string]interface{}, err error) {
	if !logger.shouldLog(level) {
		return
	}
	entry := &LogEntry{
		Timestamp: time.Now().UTC(),
		Level:     level,
		Service:   logger.service,
		Version:   logger.version,
		Message:   message,
		Fields:    fields,
	}
	if err != nil {
		entry.Error = err.Error()
		// Capture a stack trace only for the severe levels, where the
		// extra cost is justified.
		if level == ErrorLevel || level == FatalLevel {
			entry.Stack = getStackTrace()
		}
	}
	// Record the call site. NOTE(review): the fixed skip depth of 3 assumes
	// one wrapper between user code and log (user -> Debug/Info/... -> log
	// would be skip 2); verify the reported File/Line against real output.
	if pc, file, line, ok := runtime.Caller(3); ok {
		entry.File = file
		entry.Line = line
		if fn := runtime.FuncForPC(pc); fn != nil {
			entry.Function = fn.Name()
		}
	}
	data, err := logger.formatter.Format(entry)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to format log entry: %v\n", err)
		return
	}
	// Write errors are deliberately ignored: there is no sane fallback for
	// a failing log sink here.
	logger.output.Write(data)
}
// shouldLog reports whether an entry at level clears the logger's
// configured threshold. Unknown level values rank as 0, i.e. like debug —
// matching the original map-lookup behaviour for missing keys.
func (logger *Logger) shouldLog(level LogLevel) bool {
	rank := func(l LogLevel) int {
		switch l {
		case InfoLevel:
			return 1
		case WarnLevel:
			return 2
		case ErrorLevel:
			return 3
		case FatalLevel:
			return 4
		default: // DebugLevel and any unrecognised value
			return 0
		}
	}
	return rank(level) >= rank(logger.level)
}
// Debug emits a debug-level entry; only the first optional fields map is
// used.
func (logger *Logger) Debug(message string, fields ...map[string]interface{}) {
	var extra map[string]interface{}
	if len(fields) != 0 {
		extra = fields[0]
	}
	logger.log(DebugLevel, message, extra, nil)
}
// Info emits an info-level entry; only the first optional fields map is
// used.
func (logger *Logger) Info(message string, fields ...map[string]interface{}) {
	var extra map[string]interface{}
	if len(fields) != 0 {
		extra = fields[0]
	}
	logger.log(InfoLevel, message, extra, nil)
}
// Warn emits a warn-level entry; only the first optional fields map is
// used.
func (logger *Logger) Warn(message string, fields ...map[string]interface{}) {
	var extra map[string]interface{}
	if len(fields) != 0 {
		extra = fields[0]
	}
	logger.log(WarnLevel, message, extra, nil)
}
// Error emits an error-level entry carrying err; only the first optional
// fields map is used.
func (logger *Logger) Error(message string, err error, fields ...map[string]interface{}) {
	var extra map[string]interface{}
	if len(fields) != 0 {
		extra = fields[0]
	}
	logger.log(ErrorLevel, message, extra, err)
}
// Fatal emits a fatal-level entry carrying err, then terminates the process
// with exit code 1.
func (logger *Logger) Fatal(message string, err error, fields ...map[string]interface{}) {
	var extra map[string]interface{}
	if len(fields) != 0 {
		extra = fields[0]
	}
	logger.log(FatalLevel, message, extra, err)
	os.Exit(1)
}
// ContextLogger wraps a Logger with a request context so correlation IDs
// stored in the context are merged into every entry's fields.
type ContextLogger struct {
	logger *Logger
	ctx    context.Context
}
// extractContextFields collects the known correlation IDs from the context.
// Keys absent from the context are simply skipped. NOTE(review): plain
// string context keys are used here to match the middleware; a private key
// type would avoid collisions but would change the contract.
func (cl *ContextLogger) extractContextFields() map[string]interface{} {
	fields := make(map[string]interface{})
	for _, key := range []string{"trace_id", "span_id", "user_id", "request_id"} {
		if value := cl.ctx.Value(key); value != nil {
			fields[key] = value
		}
	}
	return fields
}
// mergeFields overlays the caller-supplied maps on top of the fields pulled
// from the context; later maps win on key collisions.
func (cl *ContextLogger) mergeFields(fields ...map[string]interface{}) map[string]interface{} {
	merged := cl.extractContextFields()
	for _, extra := range fields {
		for key, value := range extra {
			merged[key] = value
		}
	}
	return merged
}
// Debug emits a debug entry enriched with the context's correlation fields.
func (cl *ContextLogger) Debug(message string, fields ...map[string]interface{}) {
	cl.logger.log(DebugLevel, message, cl.mergeFields(fields...), nil)
}
// Info emits an info entry enriched with the context's correlation fields.
func (cl *ContextLogger) Info(message string, fields ...map[string]interface{}) {
	cl.logger.log(InfoLevel, message, cl.mergeFields(fields...), nil)
}
// Warn emits a warn entry enriched with the context's correlation fields.
func (cl *ContextLogger) Warn(message string, fields ...map[string]interface{}) {
	cl.logger.log(WarnLevel, message, cl.mergeFields(fields...), nil)
}
// Error emits an error entry carrying err, enriched with the context's
// correlation fields.
func (cl *ContextLogger) Error(message string, err error, fields ...map[string]interface{}) {
	cl.logger.log(ErrorLevel, message, cl.mergeFields(fields...), err)
}
// Fatal emits a fatal entry carrying err, enriched with the context's
// correlation fields, then terminates the process with exit code 1.
func (cl *ContextLogger) Fatal(message string, err error, fields ...map[string]interface{}) {
	cl.logger.log(FatalLevel, message, cl.mergeFields(fields...), err)
	os.Exit(1)
}
// getStackTrace returns the current goroutine's stack, doubling the buffer
// until runtime.Stack no longer truncates it.
func getStackTrace() string {
	size := 1024
	for {
		buf := make([]byte, size)
		written := runtime.Stack(buf, false)
		if written < size {
			return string(buf[:written])
		}
		size *= 2
	}
}
// LoggingMiddleware is an HTTP middleware that logs the start and end of
// every request with a generated request ID.
type LoggingMiddleware struct {
	logger *Logger
}
// NewLoggingMiddleware binds the middleware to a Logger.
func NewLoggingMiddleware(logger *Logger) *LoggingMiddleware {
	return &LoggingMiddleware{
		logger: logger,
	}
}
// Handler wraps next with request-scoped logging: it assigns a request ID,
// logs the start of the request, and after completion logs method, path,
// status, duration and response size at a severity derived from the status
// code (>=500 error, >=400 warning, otherwise info).
//
// Fix: the original called contextLogger.log(...), but log is defined on
// Logger only — ContextLogger has no such method, so the block did not
// compile. The completion log now goes through the public Info/Warn/Error
// methods, which also merge the context fields as intended.
func (middleware *LoggingMiddleware) Handler(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()
		// Attach a request ID so ContextLogger picks it up from the context.
		requestID := generateRequestID()
		ctx := context.WithValue(r.Context(), "request_id", requestID)
		r = r.WithContext(ctx)
		// Wrap the writer to capture status code and response size.
		recorder := &responseRecorder{
			ResponseWriter: w,
			statusCode:     http.StatusOK,
		}
		contextLogger := middleware.logger.WithContext(ctx)
		contextLogger.Info("Request started", map[string]interface{}{
			"method":      r.Method,
			"path":        r.URL.Path,
			"query":       r.URL.RawQuery,
			"user_agent":  r.UserAgent(),
			"remote_addr": r.RemoteAddr,
		})
		next.ServeHTTP(recorder, r)
		duration := time.Since(start)
		completion := map[string]interface{}{
			"method":        r.Method,
			"path":          r.URL.Path,
			"status_code":   recorder.statusCode,
			"duration_ms":   duration.Milliseconds(),
			"response_size": recorder.size,
		}
		// Escalate severity with the response status.
		switch {
		case recorder.statusCode >= 500:
			contextLogger.Error("Request completed", nil, completion)
		case recorder.statusCode >= 400:
			contextLogger.Warn("Request completed", completion)
		default:
			contextLogger.Info("Request completed", completion)
		}
	})
}
// responseRecorder captures the status code and the number of body bytes
// written by a handler, for inclusion in the completion log entry.
type responseRecorder struct {
	http.ResponseWriter
	statusCode int
	size       int
}
// WriteHeader remembers the status code before delegating to the wrapped
// ResponseWriter.
func (recorder *responseRecorder) WriteHeader(statusCode int) {
	recorder.statusCode = statusCode
	recorder.ResponseWriter.WriteHeader(statusCode)
}
// Write forwards to the wrapped writer and accumulates the byte count so
// the middleware can report the response size.
func (recorder *responseRecorder) Write(data []byte) (int, error) {
	written, writeErr := recorder.ResponseWriter.Write(data)
	recorder.size += written
	return written, writeErr
}
// generateRequestID derives a request identifier from the wall-clock time
// in nanoseconds. NOTE(review): not collision-proof — two requests arriving
// in the same nanosecond (or after a clock step) share an ID; a random or
// UUID-based ID would be safer.
func generateRequestID() string {
	nanos := time.Now().UnixNano()
	return fmt.Sprintf("%d", nanos)
}
// ExampleLogging wires a JSON logger and the logging middleware into a
// small HTTP mux. The server itself is intentionally not started.
//
// Fix: the original declared handler and then referenced it only inside a
// comment — an unused local variable, which is a compile error in Go. The
// blank-identifier assignment keeps the example compiling unchanged.
func ExampleLogging() {
	// Create the logger.
	logger := NewLogger(
		"user-service",
		"v1.0.0",
		InfoLevel,
		os.Stdout,
		&JSONFormatter{},
	)
	// Create the logging middleware.
	loggingMiddleware := NewLoggingMiddleware(logger)
	// Build the HTTP routes.
	mux := http.NewServeMux()
	mux.HandleFunc("/users", func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		contextLogger := logger.WithContext(ctx)
		contextLogger.Info("Processing user request", map[string]interface{}{
			"operation": "get_users",
		})
		// Simulated business logic.
		users := []string{"user1", "user2", "user3"}
		contextLogger.Info("Users retrieved successfully", map[string]interface{}{
			"count": len(users),
		})
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(users)
	})
	// Apply the logging middleware.
	handler := loggingMiddleware.Handler(mux)
	_ = handler // only used in the commented-out server start below
	// Start the server:
	// http.ListenAndServe(":8080", handler)
}
可观测性最佳实践
1. 指标设计原则
- 四个黄金信号:延迟、流量、错误、饱和度
- RED方法:Rate(请求速率)、Errors(错误率)、Duration(持续时间)
- USE方法:Utilization(使用率)、Saturation(饱和度)、Errors(错误)
2. 日志最佳实践
- 结构化日志:使用JSON格式,便于解析和查询
- 日志级别:合理使用DEBUG、INFO、WARN、ERROR、FATAL
- 上下文信息:包含请求ID、用户ID、会话ID等关键信息
- 敏感信息:避免记录密码、令牌等敏感数据
3. 追踪最佳实践
- 采样策略:在高流量环境中使用适当的采样率
- Span命名:使用有意义的操作名称
- 标签使用:添加有助于过滤和分析的标签
- 错误处理:确保错误信息被正确记录
4. 告警最佳实践
- 告警分级:区分不同严重程度的告警
- 告警聚合:避免告警风暴,合理聚合相关告警
- 告警疲劳:设置合理的阈值,避免过多误报
- 可操作性:告警应包含足够的上下文信息和处理建议
5. 仪表板设计
- 分层设计:从概览到详细的层次化视图
- 关键指标:突出显示最重要的业务和技术指标
- 时间范围:提供多种时间范围选择
- 钻取能力:支持从高层视图钻取到详细信息
6. 性能优化
- 指标收集:避免过度收集,影响应用性能
- 存储优化:合理设置数据保留策略
- 查询优化:优化复杂查询的性能
- 网络带宽:考虑监控数据传输的网络开销
7. 安全考虑
- 数据脱敏:确保监控数据不包含敏感信息
- 访问控制:实施适当的权限管理
- 数据加密:保护传输和存储中的监控数据
- 审计日志:记录对监控系统的访问和操作
8. 团队协作
- SLI/SLO定义:明确服务级别指标和目标
- Runbook:为常见问题提供操作手册
- 知识共享:建立监控知识库和最佳实践文档
- 培训计划:确保团队成员具备必要的监控技能
本章总结
本章详细介绍了微服务监控与可观测性的核心概念和实践方法:
主要内容回顾
可观测性概述
- 可观测性三大支柱:指标、日志、追踪
- 监控架构设计和工具选择
- 与微服务架构的集成方案
指标监控(Metrics)
- Prometheus部署和配置
- Go语言指标收集实现
- HTTP、业务、系统、数据库指标
- 指标中间件和使用示例
日志聚合(Logging)
- ELK Stack部署配置
- 结构化日志实现
- 日志格式化和上下文管理
- 日志中间件集成
分布式追踪(Tracing)
- Jaeger部署和配置
- 分布式追踪实现
- HTTP、数据库、消息队列追踪
- 追踪中间件和客户端
告警系统(Alerting)
- Alertmanager配置和部署
- Prometheus告警规则
- Go语言告警客户端
- 健康检查和阈值监控
最佳实践
- 指标设计原则(四个黄金信号、RED/USE方法)
- 日志、追踪、告警最佳实践
- 仪表板设计和性能优化
- 安全考虑和团队协作
技术要点
- 全面覆盖:实现指标、日志、追踪的完整可观测性
- 工具集成:Prometheus、ELK、Jaeger等主流工具的使用
- 代码实现:提供完整的Go语言实现示例
- Kubernetes集成:所有组件都提供K8s部署配置
- 实用性:注重实际应用场景和最佳实践
下一章预告
下一章我们将探讨微服务部署与运维,包括: - 容器化部署策略 - CI/CD流水线设计 - 蓝绿部署和金丝雀发布 - 服务网格运维 - 故障排查和性能调优
通过本章的学习,您应该能够构建完整的微服务可观测性体系,实现对微服务系统的全面监控和管理。
告警系统(Alerting)
Alertmanager配置
# Alertmanager配置
apiVersion: v1
kind: ConfigMap
metadata:
name: alertmanager-config
namespace: monitoring
data:
alertmanager.yml: |
global:
smtp_smarthost: 'smtp.gmail.com:587'
smtp_from: 'alerts@company.com'
smtp_auth_username: 'alerts@company.com'
smtp_auth_password: 'password'
route:
group_by: ['alertname', 'cluster', 'service']
group_wait: 10s
group_interval: 10s
repeat_interval: 1h
receiver: 'web.hook'
routes:
- match:
severity: critical
receiver: 'critical-alerts'
- match:
severity: warning
receiver: 'warning-alerts'
receivers:
- name: 'web.hook'
webhook_configs:
- url: 'http://webhook-service:8080/alerts'
send_resolved: true
- name: 'critical-alerts'
email_configs:
- to: 'oncall@company.com'
subject: '[CRITICAL] {{ .GroupLabels.alertname }}'
body: |
{{ range .Alerts }}
Alert: {{ .Annotations.summary }}
Description: {{ .Annotations.description }}
Labels: {{ range .Labels.SortedPairs }}{{ .Name }}={{ .Value }} {{ end }}
{{ end }}
slack_configs:
- api_url: 'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK'
channel: '#alerts-critical'
title: 'Critical Alert'
text: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'
- name: 'warning-alerts'
email_configs:
- to: 'team@company.com'
subject: '[WARNING] {{ .GroupLabels.alertname }}'
body: |
{{ range .Alerts }}
Alert: {{ .Annotations.summary }}
Description: {{ .Annotations.description }}
{{ end }}
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: alertmanager
namespace: monitoring
spec:
replicas: 1
selector:
matchLabels:
app: alertmanager
template:
metadata:
labels:
app: alertmanager
spec:
containers:
- name: alertmanager
image: prom/alertmanager:v0.25.0
ports:
- containerPort: 9093
volumeMounts:
- name: config
mountPath: /etc/alertmanager
- name: storage
mountPath: /alertmanager
args:
- '--config.file=/etc/alertmanager/alertmanager.yml'
- '--storage.path=/alertmanager'
- '--web.external-url=http://alertmanager.monitoring.svc.cluster.local:9093'
resources:
requests:
memory: "256Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "200m"
volumes:
- name: config
configMap:
name: alertmanager-config
- name: storage
emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
name: alertmanager
namespace: monitoring
spec:
selector:
app: alertmanager
ports:
- port: 9093
targetPort: 9093
type: LoadBalancer
Prometheus告警规则
# Prometheus告警规则
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-rules
namespace: monitoring
data:
microservices.rules.yml: |
groups:
- name: microservices.rules
rules:
# 服务可用性告警
- alert: ServiceDown
expr: up == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Service {{ $labels.instance }} is down"
description: "Service {{ $labels.instance }} has been down for more than 1 minute."
# 高错误率告警
- alert: HighErrorRate
expr: |
(
rate(http_requests_total{status=~"5.."}[5m]) /
rate(http_requests_total[5m])
) > 0.05
for: 5m
labels:
severity: warning
annotations:
summary: "High error rate on {{ $labels.service }}"
description: "Error rate is {{ $value | humanizePercentage }} on {{ $labels.service }}."
# 高延迟告警
- alert: HighLatency
expr: |
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 0.5
for: 5m
labels:
severity: warning
annotations:
summary: "High latency on {{ $labels.service }}"
description: "95th percentile latency is {{ $value }}s on {{ $labels.service }}."
# CPU使用率告警
- alert: HighCPUUsage
expr: |
(
rate(container_cpu_usage_seconds_total[5m]) * 100
) > 80
for: 10m
labels:
severity: warning
annotations:
summary: "High CPU usage on {{ $labels.pod }}"
description: "CPU usage is {{ $value }}% on {{ $labels.pod }}."
# 内存使用率告警
- alert: HighMemoryUsage
expr: |
(
container_memory_working_set_bytes / container_spec_memory_limit_bytes * 100
) > 80
for: 10m
labels:
severity: warning
annotations:
summary: "High memory usage on {{ $labels.pod }}"
description: "Memory usage is {{ $value }}% on {{ $labels.pod }}."
# 磁盘空间告警
- alert: DiskSpaceLow
expr: |
(
(node_filesystem_size_bytes - node_filesystem_free_bytes) / node_filesystem_size_bytes * 100
) > 85
for: 5m
labels:
severity: warning
annotations:
summary: "Disk space low on {{ $labels.instance }}"
description: "Disk usage is {{ $value }}% on {{ $labels.instance }}."
# 数据库连接池告警
- alert: DatabaseConnectionPoolHigh
expr: |
(
db_connections_active / db_connections_max * 100
) > 80
for: 5m
labels:
severity: warning
annotations:
summary: "Database connection pool usage high"
description: "Database connection pool usage is {{ $value }}% on {{ $labels.service }}."
# 消息队列积压告警
- alert: MessageQueueBacklog
expr: |
rabbitmq_queue_messages > 1000
for: 5m
labels:
severity: warning
annotations:
summary: "Message queue backlog on {{ $labels.queue }}"
description: "Queue {{ $labels.queue }} has {{ $value }} messages."
Go语言告警客户端实现
// 告警客户端实现
package alerting
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
)
// AlertLevel names the severity of an alert, mirroring the Alertmanager
// routing severities used elsewhere in this chapter.
type AlertLevel string

const (
	AlertLevelInfo     AlertLevel = "info"
	AlertLevelWarning  AlertLevel = "warning"
	AlertLevelCritical AlertLevel = "critical"
)
// Alert is one alert notification as sent to the webhook. EndsAt is nil
// while the alert is firing and set when it resolves.
type Alert struct {
	Name        string            `json:"name"`
	Level       AlertLevel        `json:"level"`
	Summary     string            `json:"summary"`
	Description string            `json:"description"`
	Labels      map[string]string `json:"labels"`
	Annotations map[string]string `json:"annotations"`
	StartsAt    time.Time         `json:"startsAt"`
	EndsAt      *time.Time        `json:"endsAt,omitempty"`
}
// AlertManager 告警管理器接口
type AlertManager interface {
SendAlert(ctx context.Context, alert Alert) error
SendAlerts(ctx context.Context, alerts []Alert) error
ResolveAlert(ctx context.Context, alertName string, labels map[string]string) error
}
// WebhookAlertManager delivers alerts by POSTing them as a JSON array to
// a configurable webhook endpoint (e.g. Alertmanager's HTTP API).
type WebhookAlertManager struct {
	webhookURL string
	client     *http.Client
	timeout    time.Duration
}

// NewWebhookAlertManager builds a manager that sends alerts to webhookURL,
// abandoning any single delivery attempt after timeout.
func NewWebhookAlertManager(webhookURL string, timeout time.Duration) *WebhookAlertManager {
	httpClient := &http.Client{Timeout: timeout}
	return &WebhookAlertManager{
		webhookURL: webhookURL,
		client:     httpClient,
		timeout:    timeout,
	}
}
// SendAlert delivers a single alert via the webhook.
func (wam *WebhookAlertManager) SendAlert(ctx context.Context, alert Alert) error {
	return wam.SendAlerts(ctx, []Alert{alert})
}

// SendAlerts POSTs alerts as a JSON array to the webhook URL. It is a
// no-op for an empty slice, and any HTTP status >= 400 from the webhook
// is reported as an error.
func (wam *WebhookAlertManager) SendAlerts(ctx context.Context, alerts []Alert) error {
	// Avoid a pointless POST when there is nothing to deliver.
	if len(alerts) == 0 {
		return nil
	}
	payload, err := json.Marshal(alerts)
	if err != nil {
		return fmt.Errorf("failed to marshal alerts: %w", err)
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, wam.webhookURL, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := wam.client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send alert: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 400 {
		return fmt.Errorf("alert webhook returned status %d", resp.StatusCode)
	}
	return nil
}
// ResolveAlert emits a resolution notification for a previously fired
// alert: an info-level Alert whose EndsAt equals its StartsAt.
func (wam *WebhookAlertManager) ResolveAlert(ctx context.Context, alertName string, labels map[string]string) error {
	resolvedAt := time.Now()
	resolution := Alert{
		Name:        alertName,
		Level:       AlertLevelInfo,
		Summary:     fmt.Sprintf("Alert %s resolved", alertName),
		Description: fmt.Sprintf("Alert %s has been resolved", alertName),
		Labels:      labels,
		StartsAt:    resolvedAt,
		EndsAt:      &resolvedAt,
	}
	return wam.SendAlert(ctx, resolution)
}
// HealthChecker periodically runs a set of named health checks and
// reports failures as critical alerts through the configured
// AlertManager. The checks map has no lock, so register all checks
// before calling Start.
type HealthChecker struct {
alertManager AlertManager
serviceName string
checkInterval time.Duration
checks map[string]HealthCheck
}

// HealthCheck is a single named probe; Check returns nil when healthy.
type HealthCheck interface {
Name() string
Check(ctx context.Context) error
}
// DatabaseHealthCheck verifies database connectivity via Ping.
type DatabaseHealthCheck struct {
	name string
	db   interface{ Ping() error }
}

// NewDatabaseHealthCheck wraps any Ping-able handle (e.g. *sql.DB) as a HealthCheck.
func NewDatabaseHealthCheck(name string, db interface{ Ping() error }) *DatabaseHealthCheck {
	return &DatabaseHealthCheck{name: name, db: db}
}

// Name returns the identifier this check is registered under.
func (dhc *DatabaseHealthCheck) Name() string { return dhc.name }

// Check reports the result of a single Ping. The context is not
// forwarded because the Ping interface takes no context.
func (dhc *DatabaseHealthCheck) Check(ctx context.Context) error { return dhc.db.Ping() }
// HTTPHealthCheck probes an HTTP endpoint; any status below 400 counts as healthy.
type HTTPHealthCheck struct {
	name   string
	url    string
	client *http.Client
}

// NewHTTPHealthCheck builds a check that GETs url with the given per-request timeout.
func NewHTTPHealthCheck(name, url string, timeout time.Duration) *HTTPHealthCheck {
	probe := &HTTPHealthCheck{name: name, url: url}
	probe.client = &http.Client{Timeout: timeout}
	return probe
}

// Name returns the identifier this check is registered under.
func (hhc *HTTPHealthCheck) Name() string {
	return hhc.name
}

// Check issues a GET against the configured URL. It fails on transport
// errors and on any response with a status code >= 400.
func (hhc *HTTPHealthCheck) Check(ctx context.Context) error {
	req, err := http.NewRequestWithContext(ctx, "GET", hhc.url, nil)
	if err != nil {
		return err
	}
	resp, err := hhc.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 400 {
		return nil
	}
	return fmt.Errorf("health check failed with status %d", resp.StatusCode)
}
// NewHealthChecker constructs a HealthChecker that reports failed checks
// for serviceName through alertManager, probing every checkInterval.
func NewHealthChecker(alertManager AlertManager, serviceName string, checkInterval time.Duration) *HealthChecker {
	hc := &HealthChecker{
		alertManager:  alertManager,
		serviceName:   serviceName,
		checkInterval: checkInterval,
	}
	hc.checks = make(map[string]HealthCheck)
	return hc
}

// AddCheck registers a health check, keyed by its Name; registering two
// checks with the same name keeps only the last. The checks map is not
// synchronized, so call this before Start.
func (hc *HealthChecker) AddCheck(check HealthCheck) {
	hc.checks[check.Name()] = check
}
// Start blocks, running every registered check once per checkInterval
// until ctx is cancelled; run it in a goroutine for background checking.
// Note the first round fires after one full interval, not immediately.
func (hc *HealthChecker) Start(ctx context.Context) {
	interval := time.NewTicker(hc.checkInterval)
	defer interval.Stop()
	for {
		select {
		case <-interval.C:
			hc.runChecks(ctx)
		case <-ctx.Done():
			return
		}
	}
}
// runChecks fires every registered check concurrently. A failing check
// produces a critical alert; nothing is emitted on success. The spawned
// goroutines are not waited on.
func (hc *HealthChecker) runChecks(ctx context.Context) {
	for checkName, registered := range hc.checks {
		go hc.runSingleCheck(ctx, checkName, registered)
	}
}

// runSingleCheck executes one health check and sends a critical alert on failure.
func (hc *HealthChecker) runSingleCheck(ctx context.Context, name string, check HealthCheck) {
	err := check.Check(ctx)
	if err == nil {
		return
	}
	failure := Alert{
		Name:        fmt.Sprintf("%s_health_check_failed", name),
		Level:       AlertLevelCritical,
		Summary:     fmt.Sprintf("Health check failed for %s", name),
		Description: fmt.Sprintf("Health check for %s failed: %v", name, err),
		Labels: map[string]string{
			"service":    hc.serviceName,
			"check_name": name,
			"check_type": "health",
		},
		StartsAt: time.Now(),
	}
	if alertErr := hc.alertManager.SendAlert(ctx, failure); alertErr != nil {
		// Alert delivery failure can only be logged; there is nowhere else to report it.
		fmt.Printf("Failed to send alert: %v\n", alertErr)
	}
}
// MetricThresholdMonitor fires alerts when observed metric values cross
// per-metric thresholds registered via AddThreshold.
type MetricThresholdMonitor struct {
alertManager AlertManager
serviceName string
thresholds map[string]ThresholdConfig
}

// ThresholdConfig describes one threshold rule for a named metric.
// NOTE(review): Duration is stored but never read by CheckMetric in this
// file — a breaching sample alerts immediately rather than after a
// sustained period. Confirm whether "for"-style evaluation was intended.
type ThresholdConfig struct {
MetricName string
Threshold float64
Operator string // ">", "<", ">=", "<=", "=="
Duration time.Duration
AlertLevel AlertLevel
Description string
}

// NewMetricThresholdMonitor creates a monitor whose alerts are labeled
// with serviceName.
func NewMetricThresholdMonitor(alertManager AlertManager, serviceName string) *MetricThresholdMonitor {
return &MetricThresholdMonitor{
alertManager: alertManager,
serviceName: serviceName,
thresholds: make(map[string]ThresholdConfig),
}
}

// AddThreshold registers (or replaces) the rule for config.MetricName.
func (mtm *MetricThresholdMonitor) AddThreshold(config ThresholdConfig) {
mtm.thresholds[config.MetricName] = config
}
// CheckMetric compares a freshly observed metric value against its
// configured threshold and sends an alert when the condition holds.
// Metrics without a registered ThresholdConfig are ignored, as are
// configs with an unrecognized Operator. The config's Duration field is
// not evaluated here: every breaching sample alerts immediately.
func (mtm *MetricThresholdMonitor) CheckMetric(ctx context.Context, metricName string, value float64) {
	cfg, ok := mtm.thresholds[metricName]
	if !ok {
		return
	}
	breached := false
	switch cfg.Operator {
	case ">":
		breached = value > cfg.Threshold
	case "<":
		breached = value < cfg.Threshold
	case ">=":
		breached = value >= cfg.Threshold
	case "<=":
		breached = value <= cfg.Threshold
	case "==":
		// Exact float equality; only meaningful for discrete-valued metrics.
		breached = value == cfg.Threshold
	}
	if !breached {
		return
	}
	alert := Alert{
		Name:        fmt.Sprintf("%s_threshold_exceeded", metricName),
		Level:       cfg.AlertLevel,
		Summary:     fmt.Sprintf("Metric %s threshold exceeded", metricName),
		Description: fmt.Sprintf("%s: current value %.2f %s threshold %.2f", cfg.Description, value, cfg.Operator, cfg.Threshold),
		Labels: map[string]string{
			"service":     mtm.serviceName,
			"metric_name": metricName,
			"alert_type":  "threshold",
		},
		Annotations: map[string]string{
			"current_value": fmt.Sprintf("%.2f", value),
			"threshold":     fmt.Sprintf("%.2f", cfg.Threshold),
			"operator":      cfg.Operator,
		},
		StartsAt: time.Now(),
	}
	if err := mtm.alertManager.SendAlert(ctx, alert); err != nil {
		fmt.Printf("Failed to send threshold alert: %v\n", err)
	}
}
// ExampleAlerting demonstrates wiring the alerting pieces together: a
// webhook alert manager, periodic health checks, and threshold-based
// metric monitoring. It returns immediately; the goroutines it spawns
// run until the process exits (the context is never cancelled).
func ExampleAlerting() {
	// Alert delivery target (Alertmanager's HTTP API).
	manager := NewWebhookAlertManager(
		"http://alertmanager:9093/api/v1/alerts",
		30*time.Second,
	)

	// Periodic health checking for the user service.
	checker := NewHealthChecker(
		manager,
		"user-service",
		30*time.Second,
	)

	// A database check would be registered the same way:
	// checker.AddCheck(NewDatabaseHealthCheck("database", db))

	// Probe an external dependency over HTTP.
	checker.AddCheck(NewHTTPHealthCheck(
		"external-api",
		"http://external-api/health",
		10*time.Second,
	))

	// Threshold-based metric monitoring.
	monitor := NewMetricThresholdMonitor(manager, "user-service")
	monitor.AddThreshold(ThresholdConfig{
		MetricName:  "cpu_usage_percent",
		Threshold:   80.0,
		Operator:    ">",
		Duration:    5 * time.Minute,
		AlertLevel:  AlertLevelWarning,
		Description: "High CPU usage detected",
	})
	monitor.AddThreshold(ThresholdConfig{
		MetricName:  "memory_usage_percent",
		Threshold:   90.0,
		Operator:    ">",
		Duration:    5 * time.Minute,
		AlertLevel:  AlertLevelCritical,
		Description: "Critical memory usage detected",
	})

	// Run the health checks in the background.
	ctx := context.Background()
	go checker.Start(ctx)

	// Feed the monitor with (simulated) metric samples once a minute.
	go func() {
		sampler := time.NewTicker(1 * time.Minute)
		defer sampler.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-sampler.C:
				cpuUsage := 85.0 // stand-in for a value from the metrics system
				monitor.CheckMetric(ctx, "cpu_usage_percent", cpuUsage)
				memoryUsage := 75.0 // stand-in for a value from the metrics system
				monitor.CheckMetric(ctx, "memory_usage_percent", memoryUsage)
			}
		}
	}()
}
分布式追踪(Tracing)
Jaeger配置
# Jaeger All-in-One部署
apiVersion: apps/v1
kind: Deployment
metadata:
name: jaeger
namespace: tracing
spec:
replicas: 1
selector:
matchLabels:
app: jaeger
template:
metadata:
labels:
app: jaeger
spec:
containers:
- name: jaeger
image: jaegertracing/all-in-one:1.41
ports:
- containerPort: 16686
name: ui
- containerPort: 14268
name: collector
- containerPort: 6831
name: agent-udp
- containerPort: 6832
name: agent-binary
- containerPort: 5778
name: config
env:
- name: COLLECTOR_ZIPKIN_HOST_PORT
value: ":9411"
- name: SPAN_STORAGE_TYPE
value: "elasticsearch"
- name: ES_SERVER_URLS
value: "http://elasticsearch.logging.svc.cluster.local:9200"
- name: ES_INDEX_PREFIX
value: "jaeger"
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
---
apiVersion: v1
kind: Service
metadata:
name: jaeger
namespace: tracing
spec:
selector:
app: jaeger
ports:
- port: 16686
targetPort: 16686
name: ui
- port: 14268
targetPort: 14268
name: collector
- port: 6831
targetPort: 6831
protocol: UDP
name: agent-udp
- port: 6832
targetPort: 6832
protocol: UDP
name: agent-binary
- port: 5778
targetPort: 5778
name: config
- port: 14250
targetPort: 14250
name: grpc
type: LoadBalancer
---
# Jaeger Agent DaemonSet
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: jaeger-agent
namespace: tracing
spec:
selector:
matchLabels:
app: jaeger-agent
template:
metadata:
labels:
app: jaeger-agent
spec:
containers:
- name: jaeger-agent
image: jaegertracing/jaeger-agent:1.41
ports:
- containerPort: 6831
protocol: UDP
name: agent-udp
- containerPort: 6832
protocol: UDP
name: agent-binary
- containerPort: 5778
name: config
args:
- "--reporter.grpc.host-port=jaeger.tracing.svc.cluster.local:14250"
- "--log-level=info"
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "200m"
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
Go语言分布式追踪实现
// 分布式追踪实现
package tracing
import (
"context"
"fmt"
"net/http"
"time"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
"github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-lib/metrics"
)
// TracingConfig holds the settings needed to construct a Jaeger tracer.
type TracingConfig struct {
ServiceName string
JaegerEndpoint string
SamplingRate float64
LogSpans bool
}

// TracingManager owns an opentracing tracer plus the function that
// flushes and shuts it down (invoked by Close).
type TracingManager struct {
tracer opentracing.Tracer
closer func() error
}
// NewTracingManager builds a Jaeger-backed opentracing tracer for
// cfg.ServiceName, registers it as the process-global tracer, and
// returns a manager that owns its lifecycle (see Close).
//
// cfg.SamplingRate is used as the parameter of a "const" sampler, so any
// value >= 1 samples every trace and 0 samples none.
func NewTracingManager(cfg TracingConfig) (*TracingManager, error) {
	jcfg := config.Configuration{
		ServiceName: cfg.ServiceName,
		Sampler: &config.SamplerConfig{
			Type:  jaeger.SamplerTypeConst,
			Param: cfg.SamplingRate,
		},
		Reporter: &config.ReporterConfig{
			LogSpans:            cfg.LogSpans,
			BufferFlushInterval: 1 * time.Second,
			LocalAgentHostPort:  cfg.JaegerEndpoint,
		},
	}
	tracer, closer, err := jcfg.NewTracer(
		config.Logger(log.StdLogger),
		config.Metrics(metrics.NullFactory),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create tracer: %w", err)
	}
	// Make the tracer available to package-level opentracing helpers such
	// as opentracing.StartSpanFromContext.
	opentracing.SetGlobalTracer(tracer)
	return &TracingManager{
		tracer: tracer,
		// NewTracer returns an io.Closer, which does not coerce to the
		// manager's func() error field; adapt it with a method value.
		closer: closer.Close,
	}, nil
}
// Close flushes buffered spans and releases tracer resources. It is safe
// to call on a manager whose closer was never set.
func (tm *TracingManager) Close() error {
	if tm.closer == nil {
		return nil
	}
	return tm.closer()
}
// StartSpan begins a new root span named operationName on this manager's tracer.
func (tm *TracingManager) StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span {
return tm.tracer.StartSpan(operationName, opts...)
}

// StartSpanFromContext begins a span that is a child of any span already
// stored in ctx, returning the new span and a context carrying it.
// NOTE(review): this delegates to the package-global tracer rather than
// tm.tracer; the two coincide because NewTracingManager calls
// opentracing.SetGlobalTracer.
func (tm *TracingManager) StartSpanFromContext(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
return opentracing.StartSpanFromContext(ctx, operationName, opts...)
}

// InjectHTTPHeaders writes span's trace context into req's headers so the
// downstream service can continue the trace.
func (tm *TracingManager) InjectHTTPHeaders(span opentracing.Span, req *http.Request) error {
return tm.tracer.Inject(
span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header),
)
}

// ExtractHTTPHeaders reads a propagated trace context from req's headers;
// it returns an error when no valid trace headers are present.
func (tm *TracingManager) ExtractHTTPHeaders(req *http.Request) (opentracing.SpanContext, error) {
return tm.tracer.Extract(
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header),
)
}
// TracingMiddleware is HTTP server middleware that opens a span for each
// incoming request (see Handler).
type TracingMiddleware struct {
tracer *TracingManager
}

// NewTracingMiddleware wraps tracer for use as HTTP middleware.
func NewTracingMiddleware(tracer *TracingManager) *TracingMiddleware {
return &TracingMiddleware{
tracer: tracer,
}
}
// Handler wraps next with server-side tracing: it continues the caller's
// trace when the request carries propagation headers (otherwise starts a
// fresh root span), tags method/URL/status, flags responses >= 400 as
// errors, and exposes the span to downstream handlers via the request
// context.
func (middleware *TracingMiddleware) Handler(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		operation := fmt.Sprintf("%s %s", r.Method, r.URL.Path)

		// Become a child of the upstream span when one was propagated.
		var startOpts []opentracing.StartSpanOption
		if parentCtx, err := middleware.tracer.ExtractHTTPHeaders(r); err == nil {
			startOpts = append(startOpts, opentracing.ChildOf(parentCtx))
		}
		span := middleware.tracer.StartSpan(operation, startOpts...)
		defer span.Finish()

		ext.HTTPMethod.Set(span, r.Method)
		ext.HTTPUrl.Set(span, r.URL.String())
		ext.Component.Set(span, "http-server")

		// Make the span reachable from handler code via the context.
		requestWithSpan := r.WithContext(opentracing.ContextWithSpan(r.Context(), span))

		// Capture the status code the handler writes.
		statusRecorder := &tracingResponseRecorder{
			ResponseWriter: w,
			statusCode:     http.StatusOK,
		}
		next.ServeHTTP(statusRecorder, requestWithSpan)

		ext.HTTPStatusCode.Set(span, uint16(statusRecorder.statusCode))
		if statusRecorder.statusCode >= 400 {
			ext.Error.Set(span, true)
		}
	})
}
// tracingResponseRecorder captures the status code written by the wrapped
// handler so the middleware can tag it on the span afterwards.
// NOTE(review): only the plain ResponseWriter interface is forwarded;
// handlers that type-assert for http.Flusher or http.Hijacker will not
// find them on this wrapper.
type tracingResponseRecorder struct {
http.ResponseWriter
statusCode int
}

// WriteHeader records the status code before delegating to the underlying writer.
func (recorder *tracingResponseRecorder) WriteHeader(statusCode int) {
recorder.statusCode = statusCode
recorder.ResponseWriter.WriteHeader(statusCode)
}
// HTTPClient is an http.Client wrapper that records a client-side span
// for every outgoing request and propagates trace headers downstream.
type HTTPClient struct {
	client *http.Client
	tracer *TracingManager
}

// NewHTTPClient returns a tracing HTTP client with a 30-second request timeout.
func NewHTTPClient(tracer *TracingManager) *HTTPClient {
	httpClient := &http.Client{Timeout: 30 * time.Second}
	return &HTTPClient{client: httpClient, tracer: tracer}
}
// Do sends req wrapped in a client span: a child of the span in ctx when
// one exists, otherwise a fresh root span. Trace headers are injected
// into req before sending; responses with status >= 400 and transport
// failures mark the span as errored. On success the caller owns closing
// resp.Body.
func (c *HTTPClient) Do(ctx context.Context, req *http.Request) (*http.Response, error) {
	span := opentracing.SpanFromContext(ctx)
	if span == nil {
		span = c.tracer.StartSpan(fmt.Sprintf("HTTP %s", req.Method))
	} else {
		span = c.tracer.StartSpan(
			fmt.Sprintf("HTTP %s", req.Method),
			opentracing.ChildOf(span.Context()),
		)
	}
	defer span.Finish()

	ext.HTTPMethod.Set(span, req.Method)
	ext.HTTPUrl.Set(span, req.URL.String())
	ext.Component.Set(span, "http-client")
	ext.SpanKindRPCClient.Set(span)

	// Propagate the trace context; a failure here only degrades tracing,
	// so the request still proceeds. Fixed: the original called
	// opentracing.String, which is not part of the opentracing-go API;
	// span.LogKV logs the same fields without an extra import.
	if injectErr := c.tracer.InjectHTTPHeaders(span, req); injectErr != nil {
		span.LogKV("event", "inject_failed", "error", injectErr.Error())
	}

	resp, err := c.client.Do(req)
	if err != nil {
		ext.Error.Set(span, true)
		span.LogKV("event", "request_failed", "error", err.Error())
		return nil, err
	}

	ext.HTTPStatusCode.Set(span, uint16(resp.StatusCode))
	if resp.StatusCode >= 400 {
		ext.Error.Set(span, true)
	}
	return resp, nil
}
// DatabaseTracer creates spans around database queries (see TraceQuery
// and FinishQuery).
type DatabaseTracer struct {
tracer *TracingManager
}

// NewDatabaseTracer wraps tracer for database query tracing.
func NewDatabaseTracer(tracer *TracingManager) *DatabaseTracer {
return &DatabaseTracer{
tracer: tracer,
}
}
// TraceQuery opens a "db.query" span tagged with the SQL statement and
// returns it alongside a context carrying the span. Pair every call with
// FinishQuery. Only the argument count is recorded, not the values.
func (dt *DatabaseTracer) TraceQuery(ctx context.Context, query string, args ...interface{}) (opentracing.Span, context.Context) {
	span, spanCtx := dt.tracer.StartSpanFromContext(ctx, "db.query")
	ext.DBType.Set(span, "sql")
	ext.DBStatement.Set(span, query)
	ext.Component.Set(span, "database")
	if argCount := len(args); argCount > 0 {
		span.SetTag("db.args_count", argCount)
	}
	return span, spanCtx
}
// FinishQuery completes a span started by TraceQuery. On error the span
// is marked failed and the error message is logged on it; on success the
// affected row count is tagged. The span is always finished.
func (dt *DatabaseTracer) FinishQuery(span opentracing.Span, err error, rowsAffected int64) {
	if err != nil {
		ext.Error.Set(span, true)
		// Fixed: the original used opentracing.String, which does not exist
		// (the log-field helpers live in opentracing-go/log); LogKV records
		// the same key/value pairs without an extra import.
		span.LogKV("event", "query_failed", "error", err.Error())
	} else {
		span.SetTag("db.rows_affected", rowsAffected)
	}
	span.Finish()
}
// MessageQueueTracer creates spans for message-queue publish and consume
// operations.
type MessageQueueTracer struct {
	tracer *TracingManager
}

// NewMessageQueueTracer wraps tracer for message-queue tracing.
func NewMessageQueueTracer(tracer *TracingManager) *MessageQueueTracer {
	return &MessageQueueTracer{tracer: tracer}
}

// startMQSpan opens a span named operation and applies the tags shared by
// publish and consume: destination topic, component, and payload size.
func (mqt *MessageQueueTracer) startMQSpan(ctx context.Context, operation, topic string, payload []byte) (opentracing.Span, context.Context) {
	span, spanCtx := mqt.tracer.StartSpanFromContext(ctx, operation)
	ext.MessageBusDestination.Set(span, topic)
	ext.Component.Set(span, "message-queue")
	span.SetTag("mq.message_size", len(payload))
	return span, spanCtx
}

// TracePublish opens a producer span for publishing message to topic.
func (mqt *MessageQueueTracer) TracePublish(ctx context.Context, topic string, message []byte) (opentracing.Span, context.Context) {
	span, spanCtx := mqt.startMQSpan(ctx, "mq.publish", topic, message)
	ext.SpanKindProducer.Set(span)
	return span, spanCtx
}

// TraceConsume opens a consumer span for handling message from topic.
func (mqt *MessageQueueTracer) TraceConsume(ctx context.Context, topic string, message []byte) (opentracing.Span, context.Context) {
	span, spanCtx := mqt.startMQSpan(ctx, "mq.consume", topic, message)
	ext.SpanKindConsumer.Set(span)
	return span, spanCtx
}
// ExampleTracing shows end-to-end tracing setup: a Jaeger-backed tracer,
// HTTP server middleware, a traced outbound HTTP client, and database
// query spans, all wired into a sample /users handler.
func ExampleTracing() {
	manager, err := NewTracingManager(TracingConfig{
		ServiceName:    "user-service",
		JaegerEndpoint: "localhost:6831",
		SamplingRate:   1.0, // const sampler: trace every request
		LogSpans:       true,
	})
	if err != nil {
		panic(err)
	}
	defer manager.Close()

	middleware := NewTracingMiddleware(manager)
	client := NewHTTPClient(manager)
	queryTracer := NewDatabaseTracer(manager)

	mux := http.NewServeMux()
	mux.HandleFunc("/users", func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()

		// Trace a (simulated) database query.
		span, ctx := queryTracer.TraceQuery(ctx, "SELECT * FROM users WHERE active = ?", true)
		time.Sleep(10 * time.Millisecond) // stand-in for real query latency
		queryTracer.FinishQuery(span, nil, 3)

		// Trace an outbound call to a downstream service.
		req, _ := http.NewRequest("GET", "http://profile-service/profiles", nil)
		resp, err := client.Do(ctx, req)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		defer resp.Body.Close()

		w.Header().Set("Content-Type", "application/json")
		w.Write([]byte(`{"users": ["user1", "user2", "user3"]}`))
	})

	// Wrap the mux so every request gets a server span.
	handler := middleware.Handler(mux)
	// Fixed: the original declared handler without using it (a compile
	// error once ListenAndServe is commented out); keep the variable
	// referenced until the server line is enabled.
	_ = handler
	// http.ListenAndServe(":8080", handler)
}