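8.1 Distributed Tracing Overview
1. What Is Distributed Tracing
Distributed tracing is a technique for monitoring and analyzing how requests flow through a distributed system. It follows a single request along its complete call chain across multiple services, helping developers understand system behavior, locate performance bottlenecks, and troubleshoot failures.
Core Concepts
Trace:
- Represents one complete request call chain
- Contains multiple Spans
- Has a unique Trace ID
Span:
- Represents a single operation in the call chain
- Carries an operation name, a start time, and an end time
- Has a unique Span ID and, except for the root Span, a parent Span ID
Tag:
- A key-value pair that describes a Span
- Examples: http.method=GET, db.statement=SELECT
Log:
- A timestamped event within a Span
- Records information at a specific moment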
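To make the relationship between these four concepts concrete, here is a minimal sketch of a trace as plain Java objects. The class names (TraceModelSketch, SpanRecord), the IDs, and the timings are hypothetical and chosen for illustration only; real tracers such as Brave define their own, richer types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative data model only; real tracers define their own, richer types. */
final class TraceModelSketch {

    /** One timed operation inside a trace. parentId is null for the root span. */
    static final class SpanRecord {
        final String traceId;
        final String spanId;
        final String parentId;
        final String name;
        final long startMicros;
        long endMicros;
        final Map<String, String> tags = new LinkedHashMap<>();
        final List<String> logs = new ArrayList<>();

        SpanRecord(String traceId, String spanId, String parentId, String name, long startMicros) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.parentId = parentId;
            this.name = name;
            this.startMicros = startMicros;
        }

        /** Tag: a key-value pair describing the whole span. */
        SpanRecord tag(String key, String value) {
            tags.put(key, value);
            return this;
        }

        /** Log: an event attached to one moment inside the span. */
        SpanRecord log(long timestampMicros, String event) {
            logs.add(timestampMicros + " " + event);
            return this;
        }

        SpanRecord finish(long endMicros) {
            this.endMicros = endMicros;
            return this;
        }

        long durationMicros() {
            return endMicros - startMicros;
        }
    }

    /** Builds a two-span trace: an HTTP request that runs one SQL query. */
    static List<SpanRecord> sampleTrace() {
        String traceId = "463ac35c9f6413ad"; // placeholder ID shared by every span in the trace
        SpanRecord root = new SpanRecord(traceId, "a1", null, "GET /orders/{id}", 0)
                .tag("http.method", "GET");
        SpanRecord query = new SpanRecord(traceId, "b2", root.spanId, "select-order", 1_000)
                .tag("db.statement", "SELECT")
                .log(1_500, "connection acquired")
                .finish(4_000);
        root.finish(5_000);
        List<SpanRecord> spans = new ArrayList<>();
        spans.add(root);
        spans.add(query);
        return spans;
    }

    public static void main(String[] args) {
        for (SpanRecord s : sampleTrace()) {
            System.out.println(s.name + " parent=" + s.parentId
                    + " duration=" + s.durationMicros() + "us tags=" + s.tags);
        }
    }
}
```

The parent Span ID is what lets a tracing backend rebuild the call tree: every span in the list shares the Trace ID, and each non-root span points at the span that caused it.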
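2. What Distributed Tracing Is Used For
Performance monitoring:
- Identify slow queries and performance bottlenecks
- Analyze service response times
- Monitor system throughput
Troubleshooting:
- Quickly locate the service where an error occurred
- Analyze how errors propagate
- Understand service dependencies
System optimization:
- Discover unnecessary service calls
- Optimize the order of service calls
- Reduce network latency
Capacity planning:
- Analyze how load is distributed across services
- Forecast capacity requirements
- Optimize resource allocation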
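As an illustration of the performance-monitoring use case, the sketch below computes each span's "self time" (its own duration minus the time spent in its direct children) and reports the span with the largest value. The class names, span names, and durations are made up for the example, and the calculation assumes child spans run one after another rather than in parallel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical bottleneck finder over already-collected span timings. */
final class BottleneckSketch {

    static final class SpanTiming {
        final String id;
        final String parentId; // null for the root span
        final String name;
        final long durationMs;

        SpanTiming(String id, String parentId, String name, long durationMs) {
            this.id = id;
            this.parentId = parentId;
            this.name = name;
            this.durationMs = durationMs;
        }
    }

    /** Self time = own duration minus the summed duration of direct children. */
    static Map<String, Long> selfTimes(List<SpanTiming> spans) {
        Map<String, Long> childTotals = new HashMap<>();
        for (SpanTiming s : spans) {
            if (s.parentId != null) {
                childTotals.merge(s.parentId, s.durationMs, Long::sum);
            }
        }
        Map<String, Long> selfByName = new HashMap<>();
        for (SpanTiming s : spans) {
            selfByName.put(s.name, s.durationMs - childTotals.getOrDefault(s.id, 0L));
        }
        return selfByName;
    }

    /** Name of the span with the largest self time. */
    static String slowest(List<SpanTiming> spans) {
        String worst = null;
        long max = Long.MIN_VALUE;
        for (Map.Entry<String, Long> e : selfTimes(spans).entrySet()) {
            if (e.getValue() > max) {
                max = e.getValue();
                worst = e.getKey();
            }
        }
        return worst;
    }

    /** Made-up timings for an order-creation request. */
    static List<SpanTiming> sample() {
        List<SpanTiming> spans = new ArrayList<>();
        spans.add(new SpanTiming("1", null, "create-order", 300));
        spans.add(new SpanTiming("2", "1", "validate-user", 40));
        spans.add(new SpanTiming("3", "1", "check-stock", 200));
        spans.add(new SpanTiming("4", "1", "save-order", 30));
        return spans;
    }

    public static void main(String[] args) {
        System.out.println("self times: " + selfTimes(sample()));
        System.out.println("bottleneck: " + slowest(sample()));
    }
}
```

Looking at self time rather than total duration keeps the root span from always appearing as the slowest operation.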
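3. Mainstream Distributed Tracing Systems
Zipkin:
- A distributed tracing system open-sourced by Twitter
- Lightweight and easy to deploy
- Supports many languages and frameworks
Jaeger:
- A distributed tracing system open-sourced by Uber
- High-performance and scalable
- Native OpenTracing support
SkyWalking:
- An open-source APM system from the Apache Software Foundation
- Provides agents (probes) for multiple languages
- Offers a complete monitoring solution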
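8.2 Spring Cloud Sleuth
1. Introduction to Sleuth
Spring Cloud Sleuth is the distributed tracing solution provided by Spring Cloud. It automatically adds tracing to Spring Boot applications and integrates with tracing backends such as Zipkin and Jaeger.
Core Features
Automatic instrumentation:
- Automatically traces HTTP requests, message queues, and database operations
- Requires no changes to business code
- Supports tracing of asynchronous operations
Context propagation:
- Automatically propagates the trace context between services
- Propagates through HTTP headers and message headers
- Maintains parent-child Span relationships
Sampling strategies:
- Probability-based sampling
- Rate-limited sampling
- Configurable sampling rules
2. Sleuth Integration and Configuration
Dependencies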
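The idea behind probability-based sampling can be sketched in a few lines. One common approach, shown here, derives the decision from the trace ID so that the same trace always gets the same answer no matter which service evaluates it. This is an illustrative sketch, not Sleuth's or Brave's actual sampler; the class name BoundarySamplerSketch and the 10,000-bucket resolution are assumptions.

```java
import java.util.Random;

/** Hypothetical trace-ID-based probability sampler (not the Sleuth/Brave implementation). */
final class BoundarySamplerSketch {
    private static final long BUCKETS = 10_000L;
    private final long boundary; // how many of the 10,000 buckets are sampled

    BoundarySamplerSketch(double probability) {
        if (probability < 0.0 || probability > 1.0) {
            throw new IllegalArgumentException("probability must be between 0.0 and 1.0");
        }
        this.boundary = Math.round(probability * BUCKETS);
    }

    /** Deterministic: the same trace ID always yields the same decision. */
    boolean isSampled(long traceId) {
        return Math.floorMod(traceId, BUCKETS) < boundary;
    }

    /** Counts how many of the given number of random trace IDs are sampled. */
    static int countSampled(BoundarySamplerSketch sampler, int traces, long seed) {
        Random random = new Random(seed);
        int sampled = 0;
        for (int i = 0; i < traces; i++) {
            if (sampler.isSampled(random.nextLong())) {
                sampled++;
            }
        }
        return sampled;
    }

    public static void main(String[] args) {
        BoundarySamplerSketch sampler = new BoundarySamplerSketch(0.1);
        System.out.println("sampled out of 10000: " + countSampled(sampler, 10_000, 42L));
    }
}
```

A rate-limited sampler trades this per-trace consistency for a hard cap on traces per second, which can suit low-traffic or bursty endpoints better.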
pom.xml
<dependencies>
    <!-- Spring Cloud Sleuth -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-sleuth</artifactId>
    </dependency>
    <!-- Zipkin integration -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-sleuth-zipkin</artifactId>
    </dependency>
    <!-- Spring AMQP (RabbitMQ) client: needed to send spans to Zipkin over RabbitMQ
         (spring.zipkin.sender.type=rabbit) and used by the messaging examples in section 8.5 -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
    </dependency>
</dependencies>
Configuration File
application.yml
spring:
  application:
    name: order-service
  sleuth:
    # Master switch for tracing
    enabled: true
    # Sampling
    sampler:
      # Sampling probability (0.0 to 1.0)
      probability: 0.1
      # Alternative: cap the number of sampled traces per second.
      # Configure either probability or rate, not both.
      # rate: 100
    # Web
    web:
      # URL patterns excluded from tracing
      skip-pattern: "/actuator.*|/health.*"
    # Database tracing
    jdbc:
      # JDBC events to trace
      includes: connection,query,fetch
      # datasource-proxy settings
      datasource-proxy:
        # Query logging
        query:
          enable-logging: true
          log-level: debug
    # HTTP tracing
    http:
      enabled: true
    # Messaging tracing
    messaging:
      # RabbitMQ tracing
      rabbit:
        enabled: true
      # Kafka tracing
      kafka:
        enabled: true
  # Zipkin
  zipkin:
    # Zipkin server address
    base-url: http://localhost:9411
    # Transport: web (HTTP) or rabbit (message queue)
    sender:
      type: web
# Logging
logging:
  level:
    org.springframework.cloud.sleuth: DEBUG
  pattern:
    # Log format that includes the trace and span IDs
    console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%X{traceId:-},%X{spanId:-}] %logger{36} - %msg%n"
8.3 Zipkin Integration
1. Setting Up the Zipkin Server
Docker Deployment
docker-compose.yml
version: '3.8'
services:
  # Zipkin server
  zipkin:
    image: openzipkin/zipkin:latest
    container_name: zipkin
    ports:
      - "9411:9411"
    environment:
      - STORAGE_TYPE=mysql
      - MYSQL_HOST=mysql
      - MYSQL_TCP_PORT=3306
      - MYSQL_DB=zipkin
      - MYSQL_USER=zipkin
      - MYSQL_PASS=zipkin123
    depends_on:
      - mysql
    networks:
      - microservice-network
  # MySQL database
  mysql:
    image: mysql:8.0
    container_name: zipkin-mysql
    ports:
      - "3307:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=root123
      - MYSQL_DATABASE=zipkin
      - MYSQL_USER=zipkin
      - MYSQL_PASSWORD=zipkin123
    volumes:
      - zipkin_mysql_data:/var/lib/mysql
      - ./sql/zipkin.sql:/docker-entrypoint-initdb.d/zipkin.sql
    networks:
      - microservice-network
  # Elasticsearch (optional; not used by Zipkin while STORAGE_TYPE=mysql)
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
    container_name: zipkin-elasticsearch
    ports:
      - "9200:9200"
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - zipkin_es_data:/usr/share/elasticsearch/data
    networks:
      - microservice-network
volumes:
  zipkin_mysql_data:
  zipkin_es_data:
networks:
  microservice-network:
    driver: bridge
Zipkin Database Initialization
sql/zipkin.sql
CREATE TABLE IF NOT EXISTS zipkin_spans (
`trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
`trace_id` BIGINT NOT NULL,
`id` BIGINT NOT NULL,
`name` VARCHAR(255) NOT NULL,
`remote_service_name` VARCHAR(255),
`parent_id` BIGINT,
`debug` BIT(1),
`start_ts` BIGINT COMMENT 'Span.timestamp(): epoch micros used for endTs query and to implement TTL',
`duration` BIGINT COMMENT 'Span.duration(): micros used for minDuration and maxDuration query',
PRIMARY KEY (`trace_id_high`, `trace_id`, `id`)
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;
ALTER TABLE zipkin_spans ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTracesByIds';
ALTER TABLE zipkin_spans ADD INDEX(`name`) COMMENT 'for getTraces and getSpanNames';
ALTER TABLE zipkin_spans ADD INDEX(`remote_service_name`) COMMENT 'for getTraces and getRemoteServiceNames';
ALTER TABLE zipkin_spans ADD INDEX(`start_ts`) COMMENT 'for getTraces ordering and range';
CREATE TABLE IF NOT EXISTS zipkin_annotations (
`trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
`trace_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.trace_id',
`span_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.id',
`a_key` VARCHAR(255) NOT NULL COMMENT 'BinaryAnnotation.key or Annotation.value if type == -1',
`a_value` BLOB COMMENT 'BinaryAnnotation.value(), which must be smaller than 64KB',
`a_type` INT NOT NULL COMMENT 'BinaryAnnotation.type() or -1 if Annotation',
`a_timestamp` BIGINT COMMENT 'Used to implement TTL; Annotation.timestamp or zipkin_spans.timestamp',
`endpoint_ipv4` INT COMMENT 'Null when Binary/Annotation.endpoint is null',
`endpoint_ipv6` BINARY(16) COMMENT 'Null when Binary/Annotation.endpoint is null, or no IPv6 address',
`endpoint_port` SMALLINT COMMENT 'Null when Binary/Annotation.endpoint is null',
`endpoint_service_name` VARCHAR(255) COMMENT 'Null when Binary/Annotation.endpoint is null'
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;
ALTER TABLE zipkin_annotations ADD UNIQUE KEY(`trace_id_high`, `trace_id`, `span_id`, `a_key`, `a_timestamp`) COMMENT 'Ignore insert on duplicate';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`, `span_id`) COMMENT 'for joining with zipkin_spans';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTraces/ByIds';
ALTER TABLE zipkin_annotations ADD INDEX(`endpoint_service_name`) COMMENT 'for getTraces and getServiceNames';
ALTER TABLE zipkin_annotations ADD INDEX(`a_type`) COMMENT 'for getTraces and autocomplete values';
ALTER TABLE zipkin_annotations ADD INDEX(`a_key`) COMMENT 'for getTraces and autocomplete values';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id`, `span_id`, `a_key`) COMMENT 'for dependencies job';
CREATE TABLE IF NOT EXISTS zipkin_dependencies (
`day` DATE NOT NULL,
`parent` VARCHAR(255) NOT NULL,
`child` VARCHAR(255) NOT NULL,
`call_count` BIGINT,
`error_count` BIGINT,
PRIMARY KEY (`day`, `parent`, `child`)
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;
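The schema stores trace IDs in two BIGINT columns, while logs and HTTP headers show them as 16- or 32-character hexadecimal strings. The sketch below shows how such a string could map onto the trace_id_high and trace_id columns, following the column comment that a non-zero trace_id_high means a 128-bit ID. The class and method names are hypothetical, and the mapping is an assumption based on that comment rather than a copy of Zipkin's storage code.

```java
/** Hypothetical helper that maps hex trace IDs to the two BIGINT columns and back. */
final class TraceIdColumnsSketch {

    /** Returns {trace_id_high, trace_id} for a 16- or 32-character lower-hex trace ID. */
    static long[] toColumns(String hexTraceId) {
        if (hexTraceId.length() == 16) {
            // 64-bit ID: the high half stays 0
            return new long[] {0L, Long.parseUnsignedLong(hexTraceId, 16)};
        }
        if (hexTraceId.length() == 32) {
            // 128-bit ID: first 16 hex characters are the high half, the rest the low half
            return new long[] {
                Long.parseUnsignedLong(hexTraceId.substring(0, 16), 16),
                Long.parseUnsignedLong(hexTraceId.substring(16), 16)
            };
        }
        throw new IllegalArgumentException("trace ID must have 16 or 32 hex characters");
    }

    /** Rebuilds the hex form; a zero high half yields the 16-character form. */
    static String toHex(long high, long low) {
        String lowHex = String.format("%016x", low);
        return high == 0L ? lowHex : String.format("%016x", high) + lowHex;
    }

    public static void main(String[] args) {
        long[] columns = toColumns("463ac35c9f6413ad48485a3953bb6124");
        System.out.println("trace_id_high=" + columns[0] + ", trace_id=" + columns[1]);
        System.out.println(toHex(columns[0], columns[1]));
    }
}
```

When querying the tables by hand, this conversion is what connects a trace ID copied from a log line to the rows in zipkin_spans. IDs whose top bit is set appear as negative numbers in the signed BIGINT columns.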
2. Custom Tracing Configuration
Tracing Configuration Class
TracingConfiguration.java
package com.example.orderservice.config;
import brave.sampler.Sampler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
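/**
 * Tracing configuration.
 */
@Configuration
public class TracingConfiguration {
    @Value("${spring.sleuth.sampler.probability:0.1}")
    private float samplingRate;

    /**
     * Custom sampler. Declaring a Sampler bean replaces the sampler that
     * Sleuth would otherwise build from the spring.sleuth.sampler.* properties.
     */
    @Bean
    public Sampler customSampler() {
        return Sampler.create(samplingRate);
    }
}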
Custom Span Tags
CustomSpanTagger.java
package com.example.orderservice.tracing;
import brave.Span;
import brave.Tracer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Helper that adds custom tags to the span currently in scope.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class CustomSpanTagger {
    private final Tracer tracer;

    /**
     * Tags the current span with user information.
     */
    public void tagUser(Long userId, String username) {
        tag("user.id", userId == null ? null : String.valueOf(userId));
        tag("user.name", username);
    }

    /**
     * Tags the current span with a business operation.
     */
    public void tagBusinessOperation(String operation, String entityType, String entityId) {
        tag("business.operation", operation);
        tag("business.entity.type", entityType);
        tag("business.entity.id", entityId);
    }

    /**
     * Tags the current span with error information.
     */
    public void tagError(Throwable throwable) {
        tag("error", "true");
        tag("error.class", throwable.getClass().getSimpleName());
        tag("error.message", throwable.getMessage());
    }

    /**
     * Tags the current span with a performance metric.
     */
    public void tagPerformance(String metric, String value) {
        tag("performance." + metric, value);
    }

    /**
     * Adds the tag only when a span is in scope and the value is non-null,
     * because Brave rejects null tag values.
     */
    private void tag(String key, String value) {
        // currentSpan() returns the span in scope, or null if there is none.
        // nextSpan() would instead create a new span that is never started or finished.
        Span currentSpan = tracer.currentSpan();
        if (currentSpan != null && value != null) {
            currentSpan.tag(key, value);
        }
    }
}
8.4 Manual Instrumentation
1. Service-Layer Instrumentation
Instrumenting the Order Service
OrderService.java
package com.example.orderservice.service;
import brave.Span;
import brave.Tracer;
import com.example.orderservice.client.UserServiceClient;
import com.example.orderservice.client.ProductServiceClient;
import com.example.orderservice.dto.OrderDTO;
import com.example.orderservice.entity.Order;
import com.example.orderservice.repository.OrderRepository;
import com.example.orderservice.tracing.CustomSpanTagger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* Order service
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderService {
private final OrderRepository orderRepository;
private final UserServiceClient userServiceClient;
private final ProductServiceClient productServiceClient;
private final Tracer tracer;
private final CustomSpanTagger spanTagger;
/**
* Creates an order
*/
@NewSpan("order-creation")
@Transactional
public OrderDTO createOrder(@SpanTag("userId") Long userId,
@SpanTag("productId") Long productId,
@SpanTag("quantity") Integer quantity) {
// Note: @NewSpan already opens a span for this method; the span created below becomes its child
Span span = tracer.nextSpan().name("create-order").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
log.info("Creating order for user: {}, product: {}, quantity: {}",
userId, productId, quantity);
// Add business tags
spanTagger.tagBusinessOperation("CREATE_ORDER", "ORDER", null);
spanTagger.tagUser(userId, null);
// Validate the user
Span userValidationSpan = tracer.nextSpan().name("validate-user").start();
try (Tracer.SpanInScope userWs = tracer.withSpanInScope(userValidationSpan)) {
boolean userValid = userServiceClient.validateUser(userId);
userValidationSpan.tag("user.valid", String.valueOf(userValid));
if (!userValid) {
throw new RuntimeException("Invalid user: " + userId);
}
} catch (Exception e) {
userValidationSpan.tag("error", "true");
spanTagger.tagError(e);
throw e;
} finally {
userValidationSpan.end();
}
// Check stock
Span stockCheckSpan = tracer.nextSpan().name("check-stock").start();
try (Tracer.SpanInScope stockWs = tracer.withSpanInScope(stockCheckSpan)) {
boolean stockAvailable = productServiceClient.checkStock(productId, quantity);
stockCheckSpan.tag("stock.available", String.valueOf(stockAvailable));
stockCheckSpan.tag("product.id", String.valueOf(productId));
stockCheckSpan.tag("requested.quantity", String.valueOf(quantity));
if (!stockAvailable) {
throw new RuntimeException("Insufficient stock for product: " + productId);
}
} catch (Exception e) {
stockCheckSpan.tag("error", "true");
spanTagger.tagError(e);
throw e;
} finally {
stockCheckSpan.end();
}
// Deduct stock
Span stockReductionSpan = tracer.nextSpan().name("reduce-stock").start();
try (Tracer.SpanInScope stockWs = tracer.withSpanInScope(stockReductionSpan)) {
boolean stockReduced = productServiceClient.reduceStock(productId, quantity);
stockReductionSpan.tag("stock.reduced", String.valueOf(stockReduced));
if (!stockReduced) {
throw new RuntimeException("Failed to reduce stock for product: " + productId);
}
} catch (Exception e) {
stockReductionSpan.tag("error", "true");
spanTagger.tagError(e);
throw e;
} finally {
stockReductionSpan.end();
}
// Persist the order
Span orderCreationSpan = tracer.nextSpan().name("save-order").start();
Order order;
try (Tracer.SpanInScope orderWs = tracer.withSpanInScope(orderCreationSpan)) {
order = new Order();
order.setUserId(userId);
order.setProductId(productId);
order.setQuantity(quantity);
order.setStatus("CREATED");
order.setCreateTime(System.currentTimeMillis());
order = orderRepository.save(order);
orderCreationSpan.tag("order.id", String.valueOf(order.getId()));
orderCreationSpan.tag("order.status", order.getStatus());
log.info("Order created successfully: {}", order.getId());
} catch (Exception e) {
orderCreationSpan.tag("error", "true");
spanTagger.tagError(e);
throw e;
} finally {
orderCreationSpan.end();
}
// Convert to a DTO
OrderDTO orderDTO = convertToDTO(order);
span.tag("order.created", "true");
span.tag("order.id", String.valueOf(order.getId()));
return orderDTO;
} catch (Exception e) {
span.tag("error", "true");
spanTagger.tagError(e);
log.error("Failed to create order", e);
throw e;
} finally {
span.end();
}
}
/**
* Retrieves order details
*/
@NewSpan("get-order")
public OrderDTO getOrder(@SpanTag("orderId") Long orderId) {
Span span = tracer.nextSpan().name("get-order-details").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
log.info("Getting order details: {}", orderId);
spanTagger.tagBusinessOperation("GET_ORDER", "ORDER", String.valueOf(orderId));
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new RuntimeException("Order not found: " + orderId));
span.tag("order.found", "true");
span.tag("order.status", order.getStatus());
span.tag("order.user.id", String.valueOf(order.getUserId()));
return convertToDTO(order);
} catch (Exception e) {
span.tag("error", "true");
spanTagger.tagError(e);
log.error("Failed to get order: {}", orderId, e);
throw e;
} finally {
span.end();
}
}
/**
* Pays for an order
*/
@NewSpan("pay-order")
@Transactional
public void payOrder(@SpanTag("orderId") Long orderId,
@SpanTag("paymentMethod") String paymentMethod) {
Span span = tracer.nextSpan().name("process-payment").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
log.info("Processing payment for order: {}, method: {}", orderId, paymentMethod);
spanTagger.tagBusinessOperation("PAY_ORDER", "ORDER", String.valueOf(orderId));
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new RuntimeException("Order not found: " + orderId));
if (!"CREATED".equals(order.getStatus())) {
throw new RuntimeException("Order cannot be paid, current status: " + order.getStatus());
}
// Simulate payment processing
Span paymentSpan = tracer.nextSpan().name("external-payment").start();
try (Tracer.SpanInScope paymentWs = tracer.withSpanInScope(paymentSpan)) {
paymentSpan.tag("payment.method", paymentMethod);
paymentSpan.tag("payment.amount", "100.00"); // simulated amount
// Simulate payment latency
Thread.sleep(100);
// Simulate a successful payment
paymentSpan.tag("payment.result", "success");
} catch (InterruptedException e) {
paymentSpan.tag("error", "true");
Thread.currentThread().interrupt();
throw new RuntimeException("Payment interrupted", e);
} finally {
paymentSpan.end();
}
// Update the order status
order.setStatus("PAID");
order.setUpdateTime(System.currentTimeMillis());
orderRepository.save(order);
span.tag("payment.success", "true");
span.tag("order.status.updated", "PAID");
log.info("Payment processed successfully for order: {}", orderId);
} catch (Exception e) {
span.tag("error", "true");
spanTagger.tagError(e);
log.error("Failed to process payment for order: {}", orderId, e);
throw e;
} finally {
span.end();
}
}
private OrderDTO convertToDTO(Order order) {
OrderDTO dto = new OrderDTO();
dto.setId(order.getId());
dto.setUserId(order.getUserId());
dto.setProductId(order.getProductId());
dto.setQuantity(order.getQuantity());
dto.setStatus(order.getStatus());
dto.setCreateTime(order.getCreateTime());
dto.setUpdateTime(order.getUpdateTime());
return dto;
}
}
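The service above repeats the same start / try / catch / finally / end sequence for every child span. One way to reduce that repetition is a small template method that owns the span lifecycle and takes the business logic as a lambda. The sketch below shows the shape of such a helper with a stand-in recorder instead of a real tracer, so it runs without Spring or Brave on the classpath; the class name SpanTemplateSketch and the method inSpan are hypothetical, and the comments mark where the Brave calls would go.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical span template; records outcomes in a list instead of calling a tracer. */
final class SpanTemplateSketch {
    /** Stand-in for reported spans: "name:ok" or "name:error". */
    final List<String> finishedSpans = new ArrayList<>();

    /** Runs the work inside a named span and always finishes the span. */
    <T> T inSpan(String name, Supplier<T> work) {
        boolean failed = false;
        // With Brave, this is where tracer.nextSpan().name(name).start() would go,
        // followed by a try-with-resources on tracer.withSpanInScope(span).
        try {
            return work.get();
        } catch (RuntimeException e) {
            failed = true; // With Brave: span.error(e)
            throw e;
        } finally {
            finishedSpans.add(name + (failed ? ":error" : ":ok")); // With Brave: span.end()
        }
    }

    public static void main(String[] args) {
        SpanTemplateSketch template = new SpanTemplateSketch();
        boolean valid = template.inSpan("validate-user", () -> true);
        try {
            template.inSpan("check-stock", () -> {
                throw new IllegalStateException("Insufficient stock");
            });
        } catch (IllegalStateException expected) {
            // the exception still reaches the caller after the span is finished
        }
        System.out.println(valid + " " + template.finishedSpans);
    }
}
```

With a helper like this, each step in createOrder shrinks to a single call, and forgetting to end a span on an error path becomes much harder.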
2. Controller-Layer Instrumentation
Instrumenting the Order Controller
OrderController.java
package com.example.orderservice.controller;
import brave.Span;
import brave.Tracer;
import com.example.orderservice.dto.OrderDTO;
import com.example.orderservice.dto.CreateOrderRequest;
import com.example.orderservice.dto.PayOrderRequest;
import com.example.orderservice.service.OrderService;
import com.example.orderservice.tracing.CustomSpanTagger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
/**
* Order controller
*/
@RestController
@RequestMapping("/orders")
@RequiredArgsConstructor
@Slf4j
public class OrderController {
private final OrderService orderService;
private final Tracer tracer;
private final CustomSpanTagger spanTagger;
/**
* Creates an order
*/
@PostMapping
@NewSpan("http-create-order")
public ResponseEntity<OrderDTO> createOrder(@RequestBody CreateOrderRequest request,
HttpServletRequest httpRequest) {
Span span = tracer.nextSpan().name("http-create-order").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
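// Add HTTP-related tags
span.tag("http.method", "POST");
span.tag("http.url", httpRequest.getRequestURL().toString());
// The User-Agent header may be absent, and Brave rejects null tag values
String userAgent = httpRequest.getHeader("User-Agent");
if (userAgent != null) {
span.tag("http.user_agent", userAgent);
}
span.tag("http.remote_addr", httpRequest.getRemoteAddr());
// Add request-parameter tags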
span.tag("request.user_id", String.valueOf(request.getUserId()));
span.tag("request.product_id", String.valueOf(request.getProductId()));
span.tag("request.quantity", String.valueOf(request.getQuantity()));
log.info("Received create order request: {}", request);
OrderDTO order = orderService.createOrder(
request.getUserId(),
request.getProductId(),
request.getQuantity()
);
span.tag("http.status_code", "201");
span.tag("response.order_id", String.valueOf(order.getId()));
return ResponseEntity.status(201).body(order);
} catch (Exception e) {
span.tag("error", "true");
span.tag("http.status_code", "500");
spanTagger.tagError(e);
log.error("Failed to create order", e);
return ResponseEntity.status(500).build();
} finally {
span.end();
}
}
/**
* Retrieves order details
*/
@GetMapping("/{orderId}")
@NewSpan("http-get-order")
public ResponseEntity<OrderDTO> getOrder(@PathVariable @SpanTag("orderId") Long orderId,
HttpServletRequest httpRequest) {
Span span = tracer.nextSpan().name("http-get-order").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
// Add HTTP-related tags
span.tag("http.method", "GET");
span.tag("http.url", httpRequest.getRequestURL().toString());
span.tag("http.remote_addr", httpRequest.getRemoteAddr());
log.info("Received get order request: {}", orderId);
OrderDTO order = orderService.getOrder(orderId);
span.tag("http.status_code", "200");
span.tag("response.order_status", order.getStatus());
return ResponseEntity.ok(order);
} catch (Exception e) {
span.tag("error", "true");
if (e.getMessage() != null && e.getMessage().contains("not found")) {
span.tag("http.status_code", "404");
return ResponseEntity.notFound().build();
} else {
span.tag("http.status_code", "500");
spanTagger.tagError(e);
log.error("Failed to get order: {}", orderId, e);
return ResponseEntity.status(500).build();
}
} finally {
span.end();
}
}
/**
* Pays for an order
*/
@PostMapping("/{orderId}/pay")
@NewSpan("http-pay-order")
public ResponseEntity<Void> payOrder(@PathVariable @SpanTag("orderId") Long orderId,
@RequestBody PayOrderRequest request,
HttpServletRequest httpRequest) {
Span span = tracer.nextSpan().name("http-pay-order").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
// Add HTTP-related tags
span.tag("http.method", "POST");
span.tag("http.url", httpRequest.getRequestURL().toString());
span.tag("http.remote_addr", httpRequest.getRemoteAddr());
// Add request-parameter tags
span.tag("request.payment_method", request.getPaymentMethod());
log.info("Received pay order request: {}, method: {}", orderId, request.getPaymentMethod());
orderService.payOrder(orderId, request.getPaymentMethod());
span.tag("http.status_code", "200");
span.tag("payment.processed", "true");
return ResponseEntity.ok().build();
} catch (Exception e) {
span.tag("error", "true");
if (e.getMessage() != null && e.getMessage().contains("not found")) {
span.tag("http.status_code", "404");
return ResponseEntity.notFound().build();
} else {
span.tag("http.status_code", "500");
spanTagger.tagError(e);
log.error("Failed to pay order: {}", orderId, e);
return ResponseEntity.status(500).build();
}
} finally {
span.end();
}
}
}
3. Database Operation Instrumentation
Custom Database Tracing
DatabaseTracingAspect.java
package com.example.orderservice.aspect;
import brave.Span;
import brave.Tracer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
/**
* Database tracing aspect
*/
@Aspect
@Component
@RequiredArgsConstructor
@Slf4j
public class DatabaseTracingAspect {
private final Tracer tracer;
/**
* Traces repository methods
*/
@Around("execution(* com.example.orderservice.repository.*.*(..))")
public Object traceRepositoryMethods(ProceedingJoinPoint joinPoint) throws Throwable {
String className = joinPoint.getTarget().getClass().getSimpleName();
String methodName = joinPoint.getSignature().getName();
Span span = tracer.nextSpan()
.name("db-" + className + "-" + methodName)
.tag("db.type", "mysql")
.tag("db.operation", methodName)
.tag("component", "spring-data-jpa")
.start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
long startTime = System.currentTimeMillis();
Object result = joinPoint.proceed();
long duration = System.currentTimeMillis() - startTime;
span.tag("db.duration_ms", String.valueOf(duration));
if (result != null) {
span.tag("db.result.type", result.getClass().getSimpleName());
if (result instanceof java.util.Collection) {
span.tag("db.result.count", String.valueOf(((java.util.Collection<?>) result).size()));
}
}
return result;
} catch (Exception e) {
span.tag("error", "true");
span.tag("error.class", e.getClass().getSimpleName());
span.tag("error.message", String.valueOf(e.getMessage())); // getMessage() may be null; Brave rejects null tag values
throw e;
} finally {
span.end();
}
}
}
8.5 Tracing Asynchronous Operations
1. Tracing Asynchronous Methods
Asynchronous Executor Configuration
AsyncConfiguration.java
package com.example.orderservice.config;
import brave.Tracer;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.sleuth.instrument.async.TraceableExecutorService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* Asynchronous execution configuration
*/
@Configuration
@EnableAsync
public class AsyncConfiguration {
/**
* Asynchronous executor that supports tracing
*/
@Bean("traceableTaskExecutor")
public Executor traceableTaskExecutor(BeanFactory beanFactory) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-trace-");
executor.initialize();
// Wrap the pool in an executor that propagates the trace context
return new TraceableExecutorService(beanFactory, executor.getThreadPoolExecutor());
}
}
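Why does the executor need wrapping at all? A trace context is typically bound to the current thread, so a task handed to a thread pool does not see it unless something captures the context at submission time and restores it on the worker thread. The sketch below illustrates that capture-and-restore idea with a plain ThreadLocal. It is a simplified model of what a tracing-aware executor does, not Sleuth's implementation; the class name, the wrap method, and the trace ID are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical model of trace-context propagation across threads. */
final class ContextPropagationSketch {
    /** Stand-in for the tracer's thread-bound "current trace context". */
    static final ThreadLocal<String> CURRENT_TRACE_ID = new ThreadLocal<>();

    /** Captures the caller's trace ID now and restores it on whichever thread runs the task. */
    static <T> Callable<T> wrap(Callable<T> task) {
        final String captured = CURRENT_TRACE_ID.get(); // read on the submitting thread
        return () -> {
            String previous = CURRENT_TRACE_ID.get();
            CURRENT_TRACE_ID.set(captured);
            try {
                return task.call();
            } finally {
                CURRENT_TRACE_ID.set(previous); // leave the pooled thread as it was found
            }
        };
    }

    /** Returns the trace ID seen by an unwrapped task and by a wrapped task. */
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CURRENT_TRACE_ID.set("463ac35c9f6413ad"); // placeholder trace ID
        try {
            Callable<String> readTraceId = () -> CURRENT_TRACE_ID.get();
            String plain = pool.submit(readTraceId).get();
            String wrapped = pool.submit(wrap(readTraceId)).get();
            return new String[] {plain, wrapped};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT_TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = demo();
        System.out.println("unwrapped task saw: " + seen[0]);
        System.out.println("wrapped task saw:   " + seen[1]);
    }
}
```

The unwrapped task loses the trace ID because the worker thread has its own, empty ThreadLocal slot; the wrapped task carries the submitter's value with it.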
Asynchronous Service Implementation
AsyncOrderService.java
package com.example.orderservice.service;
import brave.Span;
import brave.Tracer;
import com.example.orderservice.tracing.CustomSpanTagger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
/**
* Asynchronous order service
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class AsyncOrderService {
private final Tracer tracer;
private final CustomSpanTagger spanTagger;
private final NotificationService notificationService;
private final InventoryService inventoryService;
/**
* Asynchronously handles follow-up work after an order is created
*/
@Async("traceableTaskExecutor")
@NewSpan("async-post-order-creation")
public CompletableFuture<Void> processOrderCreationAsync(@SpanTag("orderId") Long orderId,
@SpanTag("userId") Long userId) {
Span span = tracer.nextSpan().name("async-order-post-processing").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
log.info("Starting async post-processing for order: {}", orderId);
spanTagger.tagBusinessOperation("POST_PROCESS_ORDER", "ORDER", String.valueOf(orderId));
// Send the order confirmation notification
Span notificationSpan = tracer.nextSpan().name("send-order-notification").start();
try (Tracer.SpanInScope notificationWs = tracer.withSpanInScope(notificationSpan)) {
notificationService.sendOrderConfirmation(orderId, userId);
notificationSpan.tag("notification.sent", "true");
} catch (Exception e) {
notificationSpan.tag("error", "true");
spanTagger.tagError(e);
log.error("Failed to send order notification", e);
} finally {
notificationSpan.end();
}
// Update inventory statistics
Span inventorySpan = tracer.nextSpan().name("update-inventory-stats").start();
try (Tracer.SpanInScope inventoryWs = tracer.withSpanInScope(inventorySpan)) {
inventoryService.updateInventoryStatistics(orderId);
inventorySpan.tag("inventory.stats.updated", "true");
} catch (Exception e) {
inventorySpan.tag("error", "true");
spanTagger.tagError(e);
log.error("Failed to update inventory statistics", e);
} finally {
inventorySpan.end();
}
// Simulate some processing time
Thread.sleep(500);
span.tag("async.processing.completed", "true");
log.info("Async post-processing completed for order: {}", orderId);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
span.tag("error", "true");
spanTagger.tagError(e);
log.error("Failed to process order async: {}", orderId, e);
return CompletableFuture.failedFuture(e);
} finally {
span.end();
}
}
/**
* Asynchronously sends an email notification
*/
@Async("traceableTaskExecutor")
@NewSpan("async-email-notification")
public CompletableFuture<Void> sendEmailNotificationAsync(@SpanTag("userId") Long userId,
@SpanTag("emailType") String emailType,
@SpanTag("orderId") Long orderId) {
Span span = tracer.nextSpan().name("async-email-sending").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
log.info("Sending async email notification: type={}, userId={}, orderId={}",
emailType, userId, orderId);
span.tag("email.type", emailType);
span.tag("email.recipient.user_id", String.valueOf(userId));
// Simulate sending the email
Thread.sleep(200);
span.tag("email.sent", "true");
log.info("Email notification sent successfully");
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
span.tag("error", "true");
spanTagger.tagError(e);
log.error("Failed to send email notification", e);
return CompletableFuture.failedFuture(e);
} finally {
span.end();
}
}
}
2. Message Queue Tracing
RabbitMQ Message Tracing
OrderMessageProducer.java
package com.example.orderservice.messaging;
import brave.Span;
import brave.Tracer;
import brave.messaging.MessagingTracing;
import brave.messaging.ProducerRequest;
import brave.messaging.ProducerResponse;
import com.example.orderservice.dto.OrderEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.cloud.sleuth.annotation.SpanTag;
import org.springframework.stereotype.Component;
/**
* Order message producer
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderMessageProducer {
private final RabbitTemplate rabbitTemplate;
private final ObjectMapper objectMapper;
private final Tracer tracer;
private final MessagingTracing messagingTracing;
/**
* Sends an order-created event
*/
@NewSpan("message-send-order-created")
public void sendOrderCreatedEvent(@SpanTag("orderId") Long orderId,
@SpanTag("userId") Long userId) {
Span span = tracer.nextSpan().name("rabbitmq-send").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
OrderEvent event = new OrderEvent();
event.setOrderId(orderId);
event.setUserId(userId);
event.setEventType("ORDER_CREATED");
event.setTimestamp(System.currentTimeMillis());
String messageBody = objectMapper.writeValueAsString(event);
// Create the message properties
MessageProperties properties = new MessageProperties();
properties.setContentType("application/json");
properties.setHeader("eventType", "ORDER_CREATED");
properties.setHeader("orderId", String.valueOf(orderId));
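// Build the message. No trace headers are set by hand here: when RabbitMQ tracing is enabled,
// Sleuth's instrumentation of RabbitTemplate adds them as the message is sent
Message message = new Message(messageBody.getBytes(java.nio.charset.StandardCharsets.UTF_8), properties);
// Add messaging tags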
span.tag("messaging.system", "rabbitmq");
span.tag("messaging.destination", "order.events");
span.tag("messaging.operation", "send");
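// The message ID is only set if the application assigns one; Brave rejects null tag values
if (properties.getMessageId() != null) {
span.tag("messaging.message_id", properties.getMessageId());
}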
span.tag("message.event_type", "ORDER_CREATED");
span.tag("message.order_id", String.valueOf(orderId));
// Send the message
rabbitTemplate.send("order.events", "order.created", message);
span.tag("message.sent", "true");
log.info("Order created event sent: orderId={}", orderId);
} catch (Exception e) {
span.tag("error", "true");
span.tag("error.class", e.getClass().getSimpleName());
span.tag("error.message", e.getMessage());
log.error("Failed to send order created event", e);
throw new RuntimeException("Failed to send message", e);
} finally {
span.end();
}
}
/**
* Sends an order-paid event
*/
@NewSpan("message-send-order-paid")
public void sendOrderPaidEvent(@SpanTag("orderId") Long orderId,
@SpanTag("paymentMethod") String paymentMethod) {
Span span = tracer.nextSpan().name("rabbitmq-send-payment").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
OrderEvent event = new OrderEvent();
event.setOrderId(orderId);
event.setEventType("ORDER_PAID");
event.setPaymentMethod(paymentMethod);
event.setTimestamp(System.currentTimeMillis());
String messageBody = objectMapper.writeValueAsString(event);
MessageProperties properties = new MessageProperties();
properties.setContentType("application/json");
properties.setHeader("eventType", "ORDER_PAID");
properties.setHeader("orderId", String.valueOf(orderId));
properties.setHeader("paymentMethod", paymentMethod);
Message message = new Message(messageBody.getBytes(), properties);
span.tag("messaging.system", "rabbitmq");
span.tag("messaging.destination", "order.events");
span.tag("messaging.operation", "send");
span.tag("message.event_type", "ORDER_PAID");
span.tag("message.order_id", String.valueOf(orderId));
span.tag("message.payment_method", paymentMethod);
rabbitTemplate.send("order.events", "order.paid", message);
span.tag("message.sent", "true");
log.info("Order paid event sent: orderId={}, paymentMethod={}", orderId, paymentMethod);
} catch (Exception e) {
span.tag("error", "true");
span.tag("error.class", e.getClass().getSimpleName());
span.tag("error.message", e.getMessage());
log.error("Failed to send order paid event", e);
throw new RuntimeException("Failed to send message", e);
} finally {
span.end();
}
}
}
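The producer never writes trace headers itself because the instrumentation does it, but it helps to see what such a header can look like. The sketch below writes and reads a trace context using the single-header B3 format (trace ID, span ID, and sampling flag joined by hyphens under the key b3). The class and method names are hypothetical, and whether a given Sleuth version emits this single header or the separate X-B3-* headers depends on its propagation settings, so treat the format choice as an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical inject/extract of a B3 single header on a message-header map. */
final class B3HeaderSketch {

    /** Writes "traceId-spanId-sampled" under the "b3" key. */
    static void inject(Map<String, Object> headers, String traceId, String spanId, boolean sampled) {
        headers.put("b3", traceId + "-" + spanId + "-" + (sampled ? "1" : "0"));
    }

    /**
     * Reads the header back as {traceId, spanId, samplingFlag}.
     * Returns null when the header is missing or malformed; samplingFlag is null when absent.
     */
    static String[] extract(Map<String, Object> headers) {
        Object raw = headers.get("b3");
        if (!(raw instanceof String)) {
            return null;
        }
        String[] parts = ((String) raw).split("-");
        if (parts.length < 2) {
            return null;
        }
        return new String[] {parts[0], parts[1], parts.length > 2 ? parts[2] : null};
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("eventType", "ORDER_CREATED");
        inject(headers, "463ac35c9f6413ad", "a2fb4a1d1a96d312", true); // placeholder IDs
        String[] context = extract(headers);
        System.out.println(headers.get("b3"));
        System.out.println("consumer continues trace " + context[0] + " with parent span " + context[1]);
    }
}
```

On the consumer side, the extracted span ID becomes the parent of the consumer span, which is how the producer and consumer end up in the same trace.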
Message Consumer Tracing
OrderMessageConsumer.java
package com.example.orderservice.messaging;
import brave.Span;
import brave.Tracer;
import com.example.orderservice.dto.OrderEvent;
import com.example.orderservice.tracing.CustomSpanTagger;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.stereotype.Component;
/**
* Order message consumer
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderMessageConsumer {
private final ObjectMapper objectMapper;
private final Tracer tracer;
private final CustomSpanTagger spanTagger;
/**
* Handles order-created events
*/
@RabbitListener(queues = "order.created.queue")
@NewSpan("message-consume-order-created")
public void handleOrderCreatedEvent(Message message) {
Span span = tracer.nextSpan().name("rabbitmq-consume").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
String messageBody = new String(message.getBody());
OrderEvent event = objectMapper.readValue(messageBody, OrderEvent.class);
// Add messaging tags
span.tag("messaging.system", "rabbitmq");
span.tag("messaging.destination", "order.created.queue");
span.tag("messaging.operation", "receive");
span.tag("message.event_type", event.getEventType());
span.tag("message.order_id", String.valueOf(event.getOrderId()));
span.tag("message.user_id", String.valueOf(event.getUserId()));
log.info("Processing order created event: {}", event);
spanTagger.tagBusinessOperation("PROCESS_ORDER_CREATED_EVENT", "ORDER_EVENT",
String.valueOf(event.getOrderId()));
// Run the business logic
processOrderCreatedEvent(event);
span.tag("message.processed", "true");
log.info("Order created event processed successfully: orderId={}", event.getOrderId());
} catch (Exception e) {
span.tag("error", "true");
spanTagger.tagError(e);
log.error("Failed to process order created event", e);
throw new RuntimeException("Message processing failed", e);
} finally {
span.end();
}
}
/**
* Handles order-paid events
*/
@RabbitListener(queues = "order.paid.queue")
@NewSpan("message-consume-order-paid")
public void handleOrderPaidEvent(Message message) {
Span span = tracer.nextSpan().name("rabbitmq-consume-payment").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
String messageBody = new String(message.getBody());
OrderEvent event = objectMapper.readValue(messageBody, OrderEvent.class);
span.tag("messaging.system", "rabbitmq");
span.tag("messaging.destination", "order.paid.queue");
span.tag("messaging.operation", "receive");
span.tag("message.event_type", event.getEventType());
span.tag("message.order_id", String.valueOf(event.getOrderId()));
span.tag("message.payment_method", event.getPaymentMethod());
log.info("Processing order paid event: {}", event);
spanTagger.tagBusinessOperation("PROCESS_ORDER_PAID_EVENT", "ORDER_EVENT",
String.valueOf(event.getOrderId()));
// Run the business logic
processOrderPaidEvent(event);
span.tag("message.processed", "true");
log.info("Order paid event processed successfully: orderId={}", event.getOrderId());
} catch (Exception e) {
span.tag("error", "true");
spanTagger.tagError(e);
log.error("Failed to process order paid event", e);
throw new RuntimeException("Message processing failed", e);
} finally {
span.end();
}
}
private void processOrderCreatedEvent(OrderEvent event) {
// Implement the order-created event handling here,
// for example sending notifications or updating statistics
}
private void processOrderPaidEvent(OrderEvent event) {
// Implement the order-paid event handling here,
// for example sending shipping notifications or updating inventory
}
}
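8.6 Summary
Core Concepts Recap
Value of distributed tracing:
- Performance monitoring: identify bottlenecks and optimization opportunities
- Troubleshooting: quickly locate the root cause of a problem
- Dependency analysis: understand how services call each other
- Capacity planning: analyze how system load is distributed
Spring Cloud Sleuth features:
- Automatic instrumentation: non-intrusive tracing
- Context propagation: tracing across services
- Flexible sampling: control over tracing overhead
- Backend integration: supports Zipkin, Jaeger, and others
Tracing data structures:
- Trace: a complete request call chain
- Span: the time span of a single operation
- Tag: a descriptive key-value pair
- Log: a timestamped event record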
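Best Practices
Instrumentation strategy:
- Always instrument key business operations
- Pay particular attention to calls to external services
- Monitor the performance of database operations
- Propagate the trace context into asynchronous operations
Tag design:
- Use meaningful tag names
- Include key business information
- Avoid high-cardinality tags
- Follow a consistent tag naming convention
Sampling configuration:
- Lower the sampling rate appropriately in production
- Raise the sampling rate for critical services
- Sample all failed requests
- Adjust the sampling strategy dynamically
Performance optimization:
- Send tracing data asynchronously
- Size buffers sensibly
- Monitor the performance of the tracing system itself
- Avoid tracing so much that it hurts performance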
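Caveats
Performance impact:
- Tracing adds overhead to the system
- Keep the sampling rate under control
- Monitor the resource usage of the tracing system
- Avoid heavy instrumentation on hot paths
Data security:
- Do not record sensitive information in tags
- Mask sensitive data before it is recorded
- Restrict access to tracing data
- Purge historical tracing data regularly
System stability:
- A tracing system failure must not affect business traffic
- Implement a degradation strategy for tracing data
- Monitor the health of the tracing system
- Prepare a disaster recovery plan for the tracing system
Data quality:
- Ensure tracing data is complete
- Verify that traces are linked correctly
- Handle anomalies in tracing data
- Set up quality monitoring for tracing data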
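The data-security points above can be enforced at the place where tags are written. The sketch below masks the values of tags whose keys are on a deny list before they would reach a span. The class name, the key list, and the keep-last-four-characters rule are illustrative assumptions; a real project would pick its own keys and masking policy.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical tag sanitizer applied before values are written to a span. */
final class TagMaskingSketch {
    /** Tag keys treated as sensitive in this example. */
    private static final Set<String> SENSITIVE_KEYS =
            new HashSet<>(Arrays.asList("user.name", "user.email", "payment.card_number"));

    /** Returns the value to record: unchanged for ordinary keys, masked for sensitive ones. */
    static String sanitize(String key, String value) {
        if (value == null) {
            return ""; // never hand a null value to the tracer
        }
        if (!SENSITIVE_KEYS.contains(key)) {
            return value;
        }
        if (value.length() <= 4) {
            return "****"; // too short to reveal any part of it
        }
        return "****" + value.substring(value.length() - 4);
    }

    public static void main(String[] args) {
        System.out.println(sanitize("http.method", "GET"));
        System.out.println(sanitize("payment.card_number", "4111111111111111"));
        System.out.println(sanitize("user.name", "bob"));
    }
}
```

Routing every tag through one helper such as CustomSpanTagger gives a single place to apply a rule like this.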
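Where to Go Next
Advanced features:
- Custom sampling strategies
- Analysis and visualization of tracing data
- Trace-driven performance optimization
- Machine learning on tracing data
Integrations:
- APM system integration
- Correlation with the logging system
- Coordination with monitoring and alerting
- DevOps toolchain integration
Multi-language support:
- Trace propagation across languages
- A unified tracing standard
- Integration across technology stacks
- A unified tracing data format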
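This chapter covered the core techniques for distributed tracing in a Spring Cloud microservice architecture. Distributed tracing is an important part of microservice monitoring: it helps us understand system behavior, locate performance problems, and optimize service calls. In real projects, design the tracing strategy around the business characteristics and performance requirements, so that you collect valuable monitoring data without disturbing normal operation.
The next chapter covers service security: how to implement authentication, authorization, and security protection in a microservice architecture.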