7.1 Overview of Circuit Breaking and Degradation
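1. What Is Service Circuit Breaking
Service circuit breaking is a protection mechanism. When a downstream service fails or responds too slowly, the circuit breaker automatically stops calling that service, so the failure does not spread to the rest of the system. It works like a fuse in an electrical circuit, which breaks the circuit when the current gets too high and so protects the equipment.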
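Circuit breaker states
Closed:
- Under normal conditions the circuit breaker is closed
- All requests pass through to the service
- The breaker records the failure rate and response times
Open:
- When the failure rate (or the slow-call rate) reaches its threshold, the breaker opens
- Every request immediately returns an error or a fallback response
- The real service is not called at all
Half-Open:
- After a configured wait time, the breaker moves to the half-open state
- A small number of trial requests are allowed through
- Depending on their outcome, the breaker either closes again or reopens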
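The three states can be sketched as a small state machine. The plain-Java sketch below is not Resilience4j's implementation: the names `SimpleCircuitBreaker` and `call` are hypothetical, and, as a simplification, it opens after a fixed number of consecutive failures instead of evaluating a failure rate over a sliding window. The clock is injected so the transitions can be tested without sleeping.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hand-written sketch of the CLOSED / OPEN / HALF_OPEN state machine.
 * Simplification: it trips after N consecutive failures, not on a failure rate.
 */
class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;     // consecutive failures that open the breaker
    private final long openDurationMillis;  // time spent OPEN before trial calls are allowed
    private final int halfOpenPermits;      // trial calls allowed in HALF_OPEN
    private final LongSupplier clockMillis; // injectable clock, in milliseconds

    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long openedAt;
    private int halfOpenCalls;
    private int halfOpenSuccesses;

    SimpleCircuitBreaker(int failureThreshold, long openDurationMillis, int halfOpenPermits,
                         LongSupplier clockMillis) {
        this.failureThreshold = failureThreshold;
        this.openDurationMillis = openDurationMillis;
        this.halfOpenPermits = halfOpenPermits;
        this.clockMillis = clockMillis;
    }

    synchronized State state() {
        // OPEN -> HALF_OPEN once the wait duration has elapsed
        if (state == State.OPEN && clockMillis.getAsLong() - openedAt >= openDurationMillis) {
            state = State.HALF_OPEN;
            halfOpenCalls = 0;
            halfOpenSuccesses = 0;
        }
        return state;
    }

    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        boolean permitted;
        synchronized (this) {
            State current = state();
            permitted = current == State.CLOSED
                    || (current == State.HALF_OPEN && halfOpenCalls < halfOpenPermits);
            if (permitted && current == State.HALF_OPEN) {
                halfOpenCalls++;
            }
        }
        if (!permitted) {
            return fallback.get(); // short-circuit: the real service is not called
        }
        try {
            T result = action.get();
            onSuccess();
            return result;
        } catch (RuntimeException e) {
            onFailure();
            return fallback.get();
        }
    }

    private synchronized void onSuccess() {
        if (state == State.HALF_OPEN && ++halfOpenSuccesses >= halfOpenPermits) {
            state = State.CLOSED; // every trial call succeeded: close again
        }
        consecutiveFailures = 0;
    }

    private synchronized void onFailure() {
        if (state == State.HALF_OPEN || ++consecutiveFailures >= failureThreshold) {
            state = State.OPEN; // a failed trial call, or too many failures, (re)opens the breaker
            openedAt = clockMillis.getAsLong();
            consecutiveFailures = 0;
        }
    }
}
```

In this sketch a single failed trial call reopens the breaker. Resilience4j instead compares the failure rate of the permitted half-open calls against the configured threshold.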
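2. What Is Service Degradation
Service degradation means that, when the system is under heavy load or some services are unavailable, non-core features are deliberately switched off or preset default values are returned, so that core features keep working.
Degradation strategies
- Feature degradation: switch off non-core features
- Read degradation: serve reads only and suspend writes
- Write degradation: allow only core write operations
- Cache degradation: return cached data instead of calling the live service
- Default-value degradation: return a preset default response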
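Feature degradation and default-value degradation usually come down to a switch that is checked before the real call. A minimal sketch, assuming a hypothetical in-memory `DegradationSwitch` class; in practice the flags would typically be driven by a configuration center rather than set in code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical feature-level degradation switch (a sketch, not a library API). */
class DegradationSwitch {
    // feature name -> TRUE while the feature is degraded (switched off)
    private final Map<String, Boolean> degraded = new ConcurrentHashMap<>();

    void degrade(String feature) {
        degraded.put(feature, Boolean.TRUE);
    }

    void restore(String feature) {
        degraded.remove(feature);
    }

    boolean isDegraded(String feature) {
        return degraded.getOrDefault(feature, Boolean.FALSE);
    }

    /** Runs the normal path unless the feature is degraded; then the preset default is returned. */
    <T> T execute(String feature, Supplier<T> normal, T defaultValue) {
        if (isDegraded(feature)) {
            return defaultValue; // the non-core call is skipped entirely
        }
        return normal.get();
    }
}
```

Operators can flip a non-core feature (for example product recommendations) off during a traffic peak, and callers transparently receive the default value.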
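3. How Circuit Breaking and Degradation Relate
- Circuit breaking: an automatic protection mechanism driven by real-time monitoring data
- Degradation: can be triggered actively (for example, switched on by operators) or passively (for example, by a failure)
- Used together: when a circuit breaker trips, a degradation strategy is usually applied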
7.2 Spring Cloud Circuit Breaker
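1. Introduction to Circuit Breaker
Spring Cloud Circuit Breaker provides an abstraction layer that supports several circuit breaker implementations:
- Resilience4j: a lightweight circuit breaker library
- Hystrix: Netflix's open-source circuit breaker (now in maintenance mode)
- Sentinel: Alibaba's open-source flow-control component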
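In Spring Cloud Circuit Breaker, application code calls `circuitBreakerFactory.create("id").run(supplier, throwable -> fallback)`, and the backing library is chosen by the starter on the classpath. The plain-Java sketch below only imitates that shape to show why such an abstraction lets the implementation be swapped; `Breaker`, `CatchingBreaker` and `PriceClient` are hypothetical names, not Spring APIs.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical abstraction imitating the shape of Spring Cloud's CircuitBreaker#run. */
interface Breaker {
    <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback);
}

/** Trivial implementation: hands any runtime failure to the fallback. */
class CatchingBreaker implements Breaker {
    @Override
    public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
        try {
            return toRun.get();
        } catch (RuntimeException e) {
            return fallback.apply(e);
        }
    }
}

/** Business code depends only on the Breaker interface, never on a concrete library. */
class PriceClient {
    private final Breaker breaker;

    PriceClient(Breaker breaker) {
        this.breaker = breaker;
    }

    String price(Supplier<String> remoteCall) {
        return breaker.run(remoteCall, t -> "price unavailable (" + t.getMessage() + ")");
    }
}
```

Replacing `CatchingBreaker` with a real breaker implementation would not require any change in `PriceClient`.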
2. Resilience4j Integration
Dependencies
pom.xml
<dependencies>
    <!-- Spring Cloud Circuit Breaker -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
    </dependency>
    <!-- Resilience4j Spring Boot Starter (provides the @CircuitBreaker, @Retry, ... annotations);
         declare a version if no BOM in your build manages it -->
    <dependency>
        <groupId>io.github.resilience4j</groupId>
        <artifactId>resilience4j-spring-boot2</artifactId>
    </dependency>
    <!-- Resilience4j Micrometer -->
    <dependency>
        <groupId>io.github.resilience4j</groupId>
        <artifactId>resilience4j-micrometer</artifactId>
    </dependency>
    <!-- Spring AOP: the Resilience4j annotations are applied by aspects and have no effect without it -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <!-- Spring Boot Actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
</dependencies>
Configuration file
application.yml
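spring:
  application:
    name: order-service
  cloud:
    circuitbreaker:
      resilience4j:
        enabled: true

# Resilience4j configuration
resilience4j:
  circuitbreaker:
    configs:
      default:
        # Expose these breakers through the actuator health endpoint
        register-health-indicator: true
        # Failure rate (percent) at or above which the breaker opens
        failure-rate-threshold: 50
        # Calls slower than this count as slow calls
        slow-call-duration-threshold: 2s
        # Slow-call rate (percent) at or above which the breaker opens
        slow-call-rate-threshold: 50
        # Size of the sliding window (number of calls for COUNT_BASED)
        sliding-window-size: 10
        # Sliding window type: COUNT_BASED or TIME_BASED
        sliding-window-type: COUNT_BASED
        # Minimum number of calls before the rates are evaluated
        minimum-number-of-calls: 5
        # How long the breaker stays open before moving to half-open
        wait-duration-in-open-state: 10s
        # Number of trial calls permitted in the half-open state
        permitted-number-of-calls-in-half-open-state: 3
        # Move from open to half-open automatically, without waiting for a new call
        automatic-transition-from-open-to-half-open-enabled: true
        # Exception types recorded as failures
        record-exceptions:
          - java.lang.Exception
        # Exception types ignored (counted neither as success nor as failure)
        ignore-exceptions:
          - java.lang.IllegalArgumentException
    instances:
      userService:
        base-config: default
        failure-rate-threshold: 60
      productService:
        base-config: default
        wait-duration-in-open-state: 5s
  # Retry configuration
  retry:
    configs:
      default:
        # Total number of attempts, including the initial call
        max-attempts: 3
        wait-duration: 1s
        retry-exceptions:
          # RestTemplate wraps I/O errors (connection failures, read timeouts) in
          # ResourceAccessException, so the wrapper type must be listed for them to be retried
          - org.springframework.web.client.ResourceAccessException
          - java.net.ConnectException
          - java.net.SocketTimeoutException
    instances:
      userService:
        base-config: default
      productService:
        base-config: default
        max-attempts: 2
  # Rate limiter configuration
  ratelimiter:
    configs:
      default:
        # Permissions available per refresh period
        limit-for-period: 10
        # Length of one refresh period
        limit-refresh-period: 1s
        # How long a caller waits for a permission (0s = reject immediately)
        timeout-duration: 0s
    instances:
      userService:
        base-config: default
      productService:
        base-config: default
        limit-for-period: 5
  # Bulkhead (isolation) configuration
  bulkhead:
    configs:
      default:
        # Maximum number of concurrent calls
        max-concurrent-calls: 10
        # Maximum time a call waits to enter a saturated bulkhead
        max-wait-duration: 1s
    instances:
      userService:
        base-config: default
      productService:
        base-config: default
        max-concurrent-calls: 5

# Monitoring endpoint configuration
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,circuitbreakers,ratelimiters,retries,bulkheads
  endpoint:
    health:
      show-details: always
  health:
    circuitbreakers:
      enabled: true
    ratelimiters:
      enabled: true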
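How `sliding-window-size`, `minimum-number-of-calls` and `failure-rate-threshold` interact can be shown with a count-based window. This is a hand-written sketch, not Resilience4j's code, and `CountBasedWindow` is a hypothetical name. With the default values above (10, 5 and 50), nothing is evaluated during the first four calls, and three failures among the first five calls (60%) are already enough to open the breaker.

```java
/**
 * Hand-written sketch of a COUNT_BASED sliding window: it keeps the outcomes of the
 * last windowSize calls and reports whether the failure rate has reached the threshold.
 */
class CountBasedWindow {
    private final boolean[] failed;           // ring buffer of outcomes; true = failed call
    private final int minimumNumberOfCalls;   // below this, the rate is not evaluated
    private final float failureRateThreshold; // percentage, e.g. 50f
    private int next;     // slot that the next outcome overwrites
    private int size;     // outcomes currently in the window
    private int failures; // failed outcomes currently in the window

    CountBasedWindow(int windowSize, int minimumNumberOfCalls, float failureRateThreshold) {
        this.failed = new boolean[windowSize];
        this.minimumNumberOfCalls = minimumNumberOfCalls;
        this.failureRateThreshold = failureRateThreshold;
    }

    /** Records one outcome and returns true if the breaker should open. */
    synchronized boolean record(boolean callFailed) {
        if (size == failed.length) {
            if (failed[next]) {
                failures--; // the oldest outcome drops out of the window
            }
        } else {
            size++;
        }
        failed[next] = callFailed;
        if (callFailed) {
            failures++;
        }
        next = (next + 1) % failed.length;
        return size >= minimumNumberOfCalls && failureRate() >= failureRateThreshold;
    }

    synchronized float failureRate() {
        return size == 0 ? 0f : failures * 100f / size;
    }
}
```

A TIME_BASED window applies the same rule to the calls of the last N seconds instead of the last N calls.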
7.3 Circuit Breaker Implementation
1. Annotation-Based Circuit Breakers
User service client
UserServiceClient.java
package com.example.orderservice.client;

import com.example.orderservice.dto.UserDTO;
import io.github.resilience4j.bulkhead.annotation.Bulkhead;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import io.github.resilience4j.retry.annotation.Retry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

/**
 * User service client.
 * "user-service" is a service ID, so the injected RestTemplate must be a @LoadBalanced bean.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class UserServiceClient {

    private final RestTemplate restTemplate;

    private static final String USER_SERVICE_URL = "http://user-service";

    /**
     * Fetches user information.
     * Resilience4j applies the aspects as Retry(CircuitBreaker(RateLimiter(Bulkhead(call)))),
     * regardless of annotation order. The fallback is therefore declared on @Retry: a fallback
     * on @CircuitBreaker would handle the exception before Retry sees it, so nothing would be retried.
     */
    @CircuitBreaker(name = "userService")
    @Retry(name = "userService", fallbackMethod = "getUserFallback")
    @RateLimiter(name = "userService")
    @Bulkhead(name = "userService")
    public UserDTO getUser(Long userId) {
        log.info("Calling user service for user: {}", userId);
        String url = USER_SERVICE_URL + "/users/" + userId;
        UserDTO user = restTemplate.getForObject(url, UserDTO.class);
        log.info("User service response: {}", user);
        return user;
    }

    /**
     * Validates a user (fallback on @Retry for the same reason as above).
     */
    @CircuitBreaker(name = "userService")
    @Retry(name = "userService", fallbackMethod = "validateUserFallback")
    @RateLimiter(name = "userService")
    public boolean validateUser(Long userId) {
        log.info("Validating user: {}", userId);
        try {
            String url = USER_SERVICE_URL + "/users/" + userId + "/validate";
            Boolean result = restTemplate.getForObject(url, Boolean.class);
            log.info("User validation result: {}", result);
            return Boolean.TRUE.equals(result);
        } catch (Exception e) {
            log.error("Error validating user: {}", userId, e);
            // Rethrow so that the circuit breaker and retry aspects see the failure
            throw e;
        }
    }

    /**
     * Fallback for getUser: same parameters plus the exception, same return type.
     */
    public UserDTO getUserFallback(Long userId, Exception ex) {
        log.warn("User service fallback triggered for user: {}, reason: {}",
                userId, ex.getMessage());
        // Return placeholder user information
        UserDTO fallbackUser = new UserDTO();
        fallbackUser.setId(userId);
        fallbackUser.setUsername("Unknown User");
        fallbackUser.setEmail("unknown@example.com");
        fallbackUser.setStatus("INACTIVE");
        return fallbackUser;
    }

    /**
     * Fallback for validateUser.
     */
    public boolean validateUserFallback(Long userId, Exception ex) {
        log.warn("User validation fallback triggered for user: {}, reason: {}",
                userId, ex.getMessage());
        // Fallback strategy: return false, so the order is rejected
        return false;
    }
}
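The `@Retry(name = "userService")` settings (`max-attempts: 3`, `wait-duration: 1s`, `retry-exceptions`) can be illustrated with a fixed-interval retry loop. This is a hand-written sketch, not Resilience4j's `Retry`, and `SimpleRetry` is a hypothetical name. As with `max-attempts`, the attempt count includes the initial call; only exception types in the list (matched with `instanceof`) are retried, and anything else is rethrown at once.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

/** Hand-written sketch of fixed-interval retry (not Resilience4j's implementation). */
class SimpleRetry {
    private final int maxAttempts;                          // total attempts, including the first call
    private final Duration waitDuration;                    // fixed pause between attempts
    private final List<Class<? extends Throwable>> retryOn; // only these types are retried

    SimpleRetry(int maxAttempts, Duration waitDuration, List<Class<? extends Throwable>> retryOn) {
        this.maxAttempts = maxAttempts;
        this.waitDuration = waitDuration;
        this.retryOn = retryOn;
    }

    <T> T execute(Supplier<T> call) {
        for (int attempt = 1; ; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                boolean retryable = retryOn.stream().anyMatch(type -> type.isInstance(e));
                if (!retryable || attempt >= maxAttempts) {
                    throw e; // give up: non-retryable error or attempts exhausted
                }
                pause();
            }
        }
    }

    private void pause() {
        try {
            Thread.sleep(waitDuration.toMillis());
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", ie);
        }
    }
}
```

Because the type check uses `instanceof` on the thrown exception itself, a wrapper exception (such as the `ResourceAccessException` that RestTemplate throws for I/O errors) is only retried if the wrapper type is listed.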
Product service client
ProductServiceClient.java
package com.example.orderservice.client;

import com.example.orderservice.dto.ProductDTO;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import io.github.resilience4j.retry.annotation.Retry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.TimeUnit;

/**
 * Product service client.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class ProductServiceClient {

    private final RestTemplate restTemplate;
    private final RedisTemplate<String, Object> redisTemplate;

    private static final String PRODUCT_SERVICE_URL = "http://product-service";
    private static final String PRODUCT_CACHE_PREFIX = "product:";

    /**
     * Fetches product information and caches it for the fallback path.
     * The fallback is declared on @Retry (the outermost aspect) so that retries still happen.
     */
    @CircuitBreaker(name = "productService")
    @Retry(name = "productService", fallbackMethod = "getProductFallback")
    @RateLimiter(name = "productService")
    public ProductDTO getProduct(Long productId) {
        log.info("Calling product service for product: {}", productId);
        String url = PRODUCT_SERVICE_URL + "/products/" + productId;
        ProductDTO product = restTemplate.getForObject(url, ProductDTO.class);
        // Cache the product for the fallback; a cache error must not fail a successful remote call
        if (product != null) {
            try {
                String cacheKey = PRODUCT_CACHE_PREFIX + productId;
                redisTemplate.opsForValue().set(cacheKey, product, 30, TimeUnit.MINUTES);
            } catch (Exception e) {
                log.warn("Failed to cache product: {}", productId, e);
            }
        }
        log.info("Product service response: {}", product);
        return product;
    }

    /**
     * Checks stock availability.
     */
    @CircuitBreaker(name = "productService")
    @Retry(name = "productService", fallbackMethod = "checkStockFallback")
    public boolean checkStock(Long productId, Integer quantity) {
        log.info("Checking stock for product: {}, quantity: {}", productId, quantity);
        try {
            String url = PRODUCT_SERVICE_URL + "/products/" + productId + "/stock?quantity=" + quantity;
            Boolean result = restTemplate.getForObject(url, Boolean.class);
            log.info("Stock check result: {}", result);
            return Boolean.TRUE.equals(result);
        } catch (Exception e) {
            log.error("Error checking stock for product: {}", productId, e);
            throw e;
        }
    }

    /**
     * Reduces stock.
     * Deliberately not annotated with @Retry: the operation is not idempotent, and a retry
     * after a read timeout could deduct the stock twice.
     */
    @CircuitBreaker(name = "productService", fallbackMethod = "reduceStockFallback")
    public boolean reduceStock(Long productId, Integer quantity) {
        log.info("Reducing stock for product: {}, quantity: {}", productId, quantity);
        try {
            String url = PRODUCT_SERVICE_URL + "/products/" + productId + "/stock/reduce";
            StockRequest request = new StockRequest(productId, quantity);
            Boolean result = restTemplate.postForObject(url, request, Boolean.class);
            log.info("Stock reduction result: {}", result);
            return Boolean.TRUE.equals(result);
        } catch (Exception e) {
            log.error("Error reducing stock for product: {}", productId, e);
            throw e;
        }
    }

    /**
     * Fallback for getProduct.
     */
    public ProductDTO getProductFallback(Long productId, Exception ex) {
        log.warn("Product service fallback triggered for product: {}, reason: {}",
                productId, ex.getMessage());
        // Try the cache first; Redis may be unavailable as well, so guard the lookup
        try {
            String cacheKey = PRODUCT_CACHE_PREFIX + productId;
            ProductDTO cachedProduct = (ProductDTO) redisTemplate.opsForValue().get(cacheKey);
            if (cachedProduct != null) {
                log.info("Returning cached product: {}", productId);
                return cachedProduct;
            }
        } catch (Exception e) {
            log.error("Error reading product from cache: {}", productId, e);
        }
        // Otherwise return a default "unavailable" product
        ProductDTO fallbackProduct = new ProductDTO();
        fallbackProduct.setId(productId);
        fallbackProduct.setName("Product temporarily unavailable");
        fallbackProduct.setPrice(0.0);
        fallbackProduct.setStock(0);
        fallbackProduct.setStatus("UNAVAILABLE");
        return fallbackProduct;
    }

    /**
     * Fallback for checkStock.
     */
    public boolean checkStockFallback(Long productId, Integer quantity, Exception ex) {
        log.warn("Stock check fallback triggered for product: {}, reason: {}",
                productId, ex.getMessage());
        // Fallback strategy: return false, i.e. treat the stock as insufficient
        return false;
    }

    /**
     * Fallback for reduceStock.
     */
    public boolean reduceStockFallback(Long productId, Integer quantity, Exception ex) {
        log.warn("Stock reduction fallback triggered for product: {}, reason: {}",
                productId, ex.getMessage());
        // Fallback strategy: return false, i.e. the reduction failed
        return false;
    }

    /**
     * Stock request DTO.
     */
    public static class StockRequest {
        private Long productId;
        private Integer quantity;

        public StockRequest(Long productId, Integer quantity) {
            this.productId = productId;
            this.quantity = quantity;
        }

        // getters and setters
        public Long getProductId() { return productId; }
        public void setProductId(Long productId) { this.productId = productId; }
        public Integer getQuantity() { return quantity; }
        public void setQuantity(Integer quantity) { this.quantity = quantity; }
    }
}
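The `ratelimiter` settings for `productService` (`limit-for-period: 5`, `limit-refresh-period: 1s`, `timeout-duration: 0s`) amount to at most five permissions per one-second period, with a caller that finds none left rejected immediately instead of waiting. A hand-written fixed-window sketch of that behavior follows; `FixedWindowRateLimiter` is a hypothetical name, and Resilience4j's own limiter differs in detail.

```java
import java.util.function.LongSupplier;

/**
 * Hand-written fixed-window rate limiter sketch (not Resilience4j's implementation).
 * Non-blocking, which corresponds to timeout-duration: 0s.
 */
class FixedWindowRateLimiter {
    private final int limitForPeriod;      // permissions per refresh period
    private final long refreshPeriodNanos; // length of one period
    private final LongSupplier nanoClock;  // injectable clock, e.g. System::nanoTime
    private long periodStart;
    private int used;

    FixedWindowRateLimiter(int limitForPeriod, long refreshPeriodNanos, LongSupplier nanoClock) {
        this.limitForPeriod = limitForPeriod;
        this.refreshPeriodNanos = refreshPeriodNanos;
        this.nanoClock = nanoClock;
        this.periodStart = nanoClock.getAsLong();
    }

    /** Returns true if a permission was granted; false means "reject now" rather than wait. */
    synchronized boolean tryAcquire() {
        long elapsedPeriods = (nanoClock.getAsLong() - periodStart) / refreshPeriodNanos;
        if (elapsedPeriods > 0) {
            periodStart += elapsedPeriods * refreshPeriodNanos; // jump to the current period
            used = 0;                                           // permissions are refreshed
        }
        if (used < limitForPeriod) {
            used++;
            return true;
        }
        return false;
    }
}
```

In the client above, a rejection surfaces as Resilience4j's `RequestNotPermitted` inside the circuit breaker aspect; because `record-exceptions` lists `java.lang.Exception`, such rejections also count toward the breaker's failure rate.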
2. Programmatic Circuit Breakers
Circuit breaker factory
CircuitBreakerFactory.java
package com.example.orderservice.config;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.function.Supplier;

/**
 * Circuit breaker factory: a thin helper around the Resilience4j CircuitBreakerRegistry.
 * Not to be confused with Spring Cloud's
 * org.springframework.cloud.client.circuitbreaker.CircuitBreakerFactory.
 */
@Component
@RequiredArgsConstructor
public class CircuitBreakerFactory {

    private final CircuitBreakerRegistry circuitBreakerRegistry;

    /**
     * Creates (or returns) a circuit breaker. If a breaker with this name is already
     * registered, the registry returns it and the supplied config is ignored.
     */
    public CircuitBreaker createCircuitBreaker(String name, CircuitBreakerConfig config) {
        return circuitBreakerRegistry.circuitBreaker(name, config);
    }

    /**
     * Creates a circuit breaker with default settings matching the YAML defaults.
     */
    public CircuitBreaker createDefaultCircuitBreaker(String name) {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
                .failureRateThreshold(50)
                .slowCallRateThreshold(50)
                .slowCallDurationThreshold(Duration.ofSeconds(2))
                .slidingWindowSize(10)
                .minimumNumberOfCalls(5)
                .waitDurationInOpenState(Duration.ofSeconds(10))
                .permittedNumberOfCallsInHalfOpenState(3)
                .automaticTransitionFromOpenToHalfOpenEnabled(true)
                .build();
        return createCircuitBreaker(name, config);
    }

    /**
     * Runs an operation under circuit breaker protection.
     * Any failure, including CallNotPermittedException while the breaker is open,
     * leads to the fallback.
     */
    public <T> T executeWithCircuitBreaker(String circuitBreakerName,
                                           Supplier<T> operation,
                                           Supplier<T> fallback) {
        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker(circuitBreakerName);
        Supplier<T> decoratedSupplier = CircuitBreaker.decorateSupplier(circuitBreaker, operation);
        try {
            return decoratedSupplier.get();
        } catch (Exception e) {
            return fallback.get();
        }
    }
}
Programmatic service calls
ProgrammaticServiceClient.java
package com.example.orderservice.service;

import com.example.orderservice.config.CircuitBreakerFactory;
import com.example.orderservice.dto.ProductDTO;
import com.example.orderservice.dto.UserDTO;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

/**
 * Programmatic service calls.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class ProgrammaticServiceClient {

    private final RestTemplate restTemplate;
    private final CircuitBreakerFactory circuitBreakerFactory;

    /**
     * Fetches user information (programmatic circuit breaker with a fallback).
     * "programmatic-user-service" is created on first use with the registry's default configuration.
     */
    public UserDTO getUserWithCircuitBreaker(Long userId) {
        return circuitBreakerFactory.executeWithCircuitBreaker(
                "programmatic-user-service",
                () -> {
                    log.info("Calling user service for user: {}", userId);
                    String url = "http://user-service/users/" + userId;
                    return restTemplate.getForObject(url, UserDTO.class);
                },
                () -> {
                    log.warn("User service circuit breaker fallback for user: {}", userId);
                    UserDTO fallbackUser = new UserDTO();
                    fallbackUser.setId(userId);
                    fallbackUser.setUsername("Fallback User");
                    fallbackUser.setEmail("fallback@example.com");
                    return fallbackUser;
                }
        );
    }

    /**
     * Fetches product information (programmatic circuit breaker without a fallback).
     * Later calls get the already registered breaker, so its state is shared across calls.
     * When the breaker is open, executeSupplier throws CallNotPermittedException to the caller.
     */
    public ProductDTO getProductWithCircuitBreaker(Long productId) {
        CircuitBreaker circuitBreaker = circuitBreakerFactory.createDefaultCircuitBreaker("product-service");
        return circuitBreaker.executeSupplier(() -> {
            log.info("Calling product service for product: {}", productId);
            String url = "http://product-service/products/" + productId;
            ProductDTO product = restTemplate.getForObject(url, ProductDTO.class);
            if (product == null) {
                throw new RuntimeException("Product not found: " + productId);
            }
            return product;
        });
    }
}
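A bulkhead is another decorator around the same kind of `Supplier`. The semaphore-based sketch below mirrors `max-concurrent-calls` and `max-wait-duration` (Resilience4j's default `@Bulkhead` type is also semaphore-based). `SemaphoreBulkhead` is a hypothetical name, and a rejected call goes to a caller-supplied fallback here, whereas Resilience4j throws `BulkheadFullException`.

```java
import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hand-written semaphore bulkhead sketch (not Resilience4j's implementation). */
class SemaphoreBulkhead {
    private final Semaphore permits;  // one permit per concurrent call
    private final Duration maxWait;   // how long a caller may wait for a free permit

    SemaphoreBulkhead(int maxConcurrentCalls, Duration maxWait) {
        this.permits = new Semaphore(maxConcurrentCalls, true);
        this.maxWait = maxWait;
    }

    <T> T execute(Supplier<T> call, Supplier<T> whenFull) {
        boolean acquired;
        try {
            acquired = permits.tryAcquire(maxWait.toNanos(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return whenFull.get();
        }
        if (!acquired) {
            return whenFull.get(); // bulkhead saturated: reject instead of queuing indefinitely
        }
        try {
            return call.get();
        } finally {
            permits.release(); // always give the permit back
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

Limiting concurrency per downstream service keeps one slow dependency from exhausting the caller's threads, which is the isolation that the `bulkhead` section of the configuration is meant to provide.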
7.4 Implementing Degradation Strategies
1. Multi-Level Fallback
Fallback strategy manager
FallbackStrategyManager.java
package com.example.orderservice.service;

import com.example.orderservice.dto.ProductDTO;
import com.example.orderservice.dto.UserDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

/**
 * Fallback strategy manager.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class FallbackStrategyManager {

    private final RedisTemplate<String, Object> redisTemplate;
    private final DatabaseFallbackService databaseFallbackService;

    /**
     * Multi-level fallback for user information.
     */
    public UserDTO getUserWithMultiLevelFallback(Long userId) {
        // Level 1: try the cache
        UserDTO cachedUser = getUserFromCache(userId);
        if (cachedUser != null) {
            log.info("User found in cache: {}", userId);
            return cachedUser;
        }
        // Level 2: try the database
        UserDTO dbUser = databaseFallbackService.getUserFromDatabase(userId);
        if (dbUser != null) {
            log.info("User found in database: {}", userId);
            // Refresh the cache
            cacheUser(userId, dbUser);
            return dbUser;
        }
        // Level 3: return a default user
        log.warn("Using default fallback for user: {}", userId);
        return createDefaultUser(userId);
    }

    /**
     * Multi-level fallback for product information.
     */
    public ProductDTO getProductWithMultiLevelFallback(Long productId) {
        // Level 1: try the cache
        ProductDTO cachedProduct = getProductFromCache(productId);
        if (cachedProduct != null) {
            log.info("Product found in cache: {}", productId);
            return cachedProduct;
        }
        // Level 2: try the database for basic information
        ProductDTO dbProduct = databaseFallbackService.getProductFromDatabase(productId);
        if (dbProduct != null) {
            log.info("Product found in database: {}", productId);
            // Refresh the cache
            cacheProduct(productId, dbProduct);
            return dbProduct;
        }
        // Level 3: return a default product
        log.warn("Using default fallback for product: {}", productId);
        return createDefaultProduct(productId);
    }

    /**
     * Reads a user from the cache; returns null on a miss or a cache error.
     */
    private UserDTO getUserFromCache(Long userId) {
        try {
            String cacheKey = "user:" + userId;
            return (UserDTO) redisTemplate.opsForValue().get(cacheKey);
        } catch (Exception e) {
            log.error("Error getting user from cache: {}", userId, e);
            return null;
        }
    }

    /**
     * Writes user information to the cache.
     */
    private void cacheUser(Long userId, UserDTO user) {
        try {
            String cacheKey = "user:" + userId;
            redisTemplate.opsForValue().set(cacheKey, user, 30, TimeUnit.MINUTES);
        } catch (Exception e) {
            log.error("Error caching user: {}", userId, e);
        }
    }

    /**
     * Reads a product from the cache; returns null on a miss or a cache error.
     */
    private ProductDTO getProductFromCache(Long productId) {
        try {
            String cacheKey = "product:" + productId;
            return (ProductDTO) redisTemplate.opsForValue().get(cacheKey);
        } catch (Exception e) {
            log.error("Error getting product from cache: {}", productId, e);
            return null;
        }
    }

    /**
     * Writes product information to the cache.
     */
    private void cacheProduct(Long productId, ProductDTO product) {
        try {
            String cacheKey = "product:" + productId;
            redisTemplate.opsForValue().set(cacheKey, product, 30, TimeUnit.MINUTES);
        } catch (Exception e) {
            log.error("Error caching product: {}", productId, e);
        }
    }

    /**
     * Creates a default (temporary) user.
     */
    private UserDTO createDefaultUser(Long userId) {
        UserDTO defaultUser = new UserDTO();
        defaultUser.setId(userId);
        defaultUser.setUsername("Temporary User");
        defaultUser.setEmail("temp@example.com");
        defaultUser.setStatus("TEMP");
        return defaultUser;
    }

    /**
     * Creates a default "unavailable" product.
     */
    private ProductDTO createDefaultProduct(Long productId) {
        ProductDTO defaultProduct = new ProductDTO();
        defaultProduct.setId(productId);
        defaultProduct.setName("Product temporarily unavailable");
        defaultProduct.setPrice(0.0);
        defaultProduct.setStock(0);
        defaultProduct.setStatus("UNAVAILABLE");
        return defaultProduct;
    }
}
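The cache → database → default chain in `FallbackStrategyManager` follows one generic pattern: try each level in order and fall through on a miss or an error. A minimal sketch with a hypothetical `FallbackChain` helper:

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical generic helper for multi-level fallback (a sketch, not part of the project above). */
final class FallbackChain {
    private FallbackChain() {
    }

    /** Tries each source in order; a source that throws or returns null falls through to the next. */
    static <T> T firstAvailable(List<Supplier<T>> sources, Supplier<T> lastResort) {
        for (Supplier<T> source : sources) {
            try {
                T value = source.get();
                if (value != null) {
                    return value;
                }
            } catch (RuntimeException e) {
                // A broken level must not break the chain; continue with the next level
            }
        }
        return lastResort.get();
    }
}
```

Here the helper itself catches each level's exceptions; in `FallbackStrategyManager` that protection lives inside `getUserFromCache` and `DatabaseFallbackService`.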
2. Database Fallback Service
DatabaseFallbackService.java
package com.example.orderservice.service;

import com.example.orderservice.dto.ProductDTO;
import com.example.orderservice.dto.UserDTO;
import com.example.orderservice.entity.Product;
import com.example.orderservice.entity.User;
import com.example.orderservice.repository.ProductRepository;
import com.example.orderservice.repository.UserRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * Database fallback service.
 * Reads from the order service's own database, which must hold a copy of the
 * user and product data for this fallback level to work.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class DatabaseFallbackService {

    private final UserRepository userRepository;
    private final ProductRepository productRepository;

    /**
     * Loads user information from the database; returns null if absent or on error.
     */
    public UserDTO getUserFromDatabase(Long userId) {
        try {
            User user = userRepository.findById(userId).orElse(null);
            if (user != null) {
                return convertToUserDTO(user);
            }
        } catch (Exception e) {
            log.error("Error getting user from database: {}", userId, e);
        }
        return null;
    }

    /**
     * Loads product information from the database; returns null if absent or on error.
     */
    public ProductDTO getProductFromDatabase(Long productId) {
        try {
            Product product = productRepository.findById(productId).orElse(null);
            if (product != null) {
                return convertToProductDTO(product);
            }
        } catch (Exception e) {
            log.error("Error getting product from database: {}", productId, e);
        }
        return null;
    }

    /**
     * Converts a user entity to a DTO.
     */
    private UserDTO convertToUserDTO(User user) {
        UserDTO userDTO = new UserDTO();
        userDTO.setId(user.getId());
        userDTO.setUsername(user.getUsername());
        userDTO.setEmail(user.getEmail());
        userDTO.setStatus(user.getStatus());
        return userDTO;
    }

    /**
     * Converts a product entity to a DTO.
     */
    private ProductDTO convertToProductDTO(Product product) {
        ProductDTO productDTO = new ProductDTO();
        productDTO.setId(product.getId());
        productDTO.setName(product.getName());
        productDTO.setPrice(product.getPrice());
        productDTO.setStock(product.getStock());
        productDTO.setStatus(product.getStatus());
        return productDTO;
    }
}
3. Dynamic Fallback Configuration
Fallback configuration management
FallbackConfigManager.java
package com.example.orderservice.config;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

/**
 * Fallback configuration management, bound to the "fallback" prefix and refreshable at runtime.
 */
@Component
@ConfigurationProperties(prefix = "fallback")
@RefreshScope
@Data
@Slf4j
public class FallbackConfigManager {

    private Map<String, ServiceFallbackConfig> services = new HashMap<>();
    private GlobalFallbackConfig global = new GlobalFallbackConfig();

    @PostConstruct
    public void init() {
        log.info("Fallback configuration initialized: {}", this);
    }

    /**
     * Returns the fallback configuration of a service (defaults if none is configured).
     */
    public ServiceFallbackConfig getServiceConfig(String serviceName) {
        return services.getOrDefault(serviceName, new ServiceFallbackConfig());
    }

    /**
     * Fallback is active only if it is enabled both globally and for the service.
     */
    public boolean isFallbackEnabled(String serviceName) {
        ServiceFallbackConfig config = getServiceConfig(serviceName);
        return config.isEnabled() && global.isEnabled();
    }

    /**
     * Returns the fallback strategy of a service.
     */
    public String getFallbackStrategy(String serviceName) {
        return getServiceConfig(serviceName).getStrategy();
    }

    /**
     * Per-service fallback configuration.
     */
    @Data
    public static class ServiceFallbackConfig {
        private boolean enabled = true;
        private String strategy = "default"; // default, cache, database, mock
        private int timeout = 5000;          // timeout in milliseconds
        private int maxRetries = 3;          // maximum number of retries
        private Map<String, Object> properties = new HashMap<>();
    }

    /**
     * Global fallback configuration.
     */
    @Data
    public static class GlobalFallbackConfig {
        private boolean enabled = true;
        private String defaultStrategy = "default";
        private int globalTimeout = 10000;   // timeout in milliseconds
    }
}
Configuration file
application.yml
# Fallback configuration
fallback:
  global:
    enabled: true
    default-strategy: cache
    global-timeout: 10000
  services:
    userService:
      enabled: true
      strategy: cache
      timeout: 3000
      max-retries: 2
      properties:
        cache-ttl: 1800
        use-database-fallback: true
    productService:
      enabled: true
      strategy: database
      timeout: 5000
      max-retries: 3
      properties:
        cache-ttl: 3600
        use-mock-data: false
    paymentService:
      enabled: true
      strategy: mock
      timeout: 2000
      max-retries: 1
      properties:
        mock-success-rate: 0.8
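In Spring Cloud, `@RefreshScope` and `@ConfigurationProperties` rebinding take care of reloading these values. The underlying idea, publishing a new immutable snapshot atomically so readers never observe a half-updated configuration, can be sketched in plain Java; `ReloadableFallbackConfig` and its `reload` method are hypothetical and cover only the `enabled` and `strategy` fields.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical hot-reloadable fallback configuration holder (a sketch, not a Spring API). */
class ReloadableFallbackConfig {

    /** Immutable per-service settings, a trimmed-down ServiceFallbackConfig. */
    static final class Settings {
        final boolean enabled;
        final String strategy;

        Settings(boolean enabled, String strategy) {
            this.enabled = enabled;
            this.strategy = strategy;
        }
    }

    // Readers always see one complete snapshot; reload() replaces it in a single step
    private final AtomicReference<Map<String, Settings>> snapshot;

    ReloadableFallbackConfig(Map<String, Settings> initial) {
        this.snapshot = new AtomicReference<>(Map.copyOf(initial));
    }

    /** Called by whatever detects a change; copying keeps later edits to the caller's map out. */
    void reload(Map<String, Settings> fresh) {
        snapshot.set(Map.copyOf(fresh));
    }

    /** Unknown services default to enabled, like getOrDefault(...) in FallbackConfigManager. */
    boolean isFallbackEnabled(String service) {
        Settings s = snapshot.get().get(service);
        return s == null || s.enabled;
    }

    String strategy(String service) {
        Settings s = snapshot.get().get(service);
        return s == null ? "default" : s.strategy;
    }
}
```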
7.5 Circuit Breaker Monitoring
1. Listening to Circuit Breaker Events
Circuit breaker event listener
CircuitBreakerEventListener.java
package com.example.orderservice.listener;

import com.example.orderservice.service.CircuitBreakerMetricsService;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.core.registry.EntryAddedEvent;
import io.github.resilience4j.core.registry.EntryRemovedEvent;
import io.github.resilience4j.core.registry.EntryReplacedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * Circuit breaker event listener.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class CircuitBreakerEventListener {

    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final CircuitBreakerMetricsService metricsService;

    @PostConstruct
    public void init() {
        // Listen to registry events so that breakers created later also get listeners
        circuitBreakerRegistry.getEventPublisher()
                .onEntryAdded(this::onCircuitBreakerAdded)
                .onEntryRemoved(this::onCircuitBreakerRemoved)
                .onEntryReplaced(this::onCircuitBreakerReplaced);
        // Attach listeners to the breakers that already exist
        circuitBreakerRegistry.getAllCircuitBreakers()
                .forEach(this::addCircuitBreakerEventListeners);
    }

    /**
     * A circuit breaker was added to the registry.
     */
    private void onCircuitBreakerAdded(EntryAddedEvent<CircuitBreaker> event) {
        log.info("Circuit breaker added: {}", event.getAddedEntry().getName());
        addCircuitBreakerEventListeners(event.getAddedEntry());
    }

    /**
     * A circuit breaker was removed from the registry.
     */
    private void onCircuitBreakerRemoved(EntryRemovedEvent<CircuitBreaker> event) {
        log.info("Circuit breaker removed: {}", event.getRemovedEntry().getName());
    }

    /**
     * A circuit breaker was replaced in the registry.
     */
    private void onCircuitBreakerReplaced(EntryReplacedEvent<CircuitBreaker> event) {
        log.info("Circuit breaker replaced: {}", event.getNewEntry().getName());
        addCircuitBreakerEventListeners(event.getNewEntry());
    }

    /**
     * Attaches event listeners to a single circuit breaker.
     */
    private void addCircuitBreakerEventListeners(CircuitBreaker circuitBreaker) {
        circuitBreaker.getEventPublisher()
                .onSuccess(event -> {
                    log.debug("Circuit breaker success: {}, duration: {}ms",
                            event.getCircuitBreakerName(), event.getElapsedDuration().toMillis());
                    metricsService.recordSuccess(event.getCircuitBreakerName(), event.getElapsedDuration());
                })
                .onError(event -> {
                    log.warn("Circuit breaker error: {}, duration: {}ms, error: {}",
                            event.getCircuitBreakerName(), event.getElapsedDuration().toMillis(),
                            event.getThrowable().getMessage());
                    metricsService.recordError(event.getCircuitBreakerName(), event.getElapsedDuration(),
                            event.getThrowable());
                })
                .onStateTransition(event -> {
                    log.info("Circuit breaker state transition: {} from {} to {}",
                            event.getCircuitBreakerName(), event.getStateTransition().getFromState(),
                            event.getStateTransition().getToState());
                    metricsService.recordStateTransition(event.getCircuitBreakerName(),
                            event.getStateTransition());
                })
                .onSlowCallRateExceeded(event -> {
                    log.warn("Circuit breaker slow call rate exceeded: {}, rate: {}%",
                            event.getCircuitBreakerName(), event.getSlowCallRate());
                    metricsService.recordSlowCallRateExceeded(event.getCircuitBreakerName(),
                            event.getSlowCallRate());
                })
                .onFailureRateExceeded(event -> {
                    log.warn("Circuit breaker failure rate exceeded: {}, rate: {}%",
                            event.getCircuitBreakerName(), event.getFailureRate());
                    metricsService.recordFailureRateExceeded(event.getCircuitBreakerName(),
                            event.getFailureRate());
                });
    }
}
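`init()` registers listeners in two places because breakers do not all exist at startup: instances declared in the configuration are typically created up front, while others (such as the programmatic breakers in 7.3) only appear on first use. A plain-Java sketch of that registry behavior, with a hypothetical `LazyRegistry` class standing in for `CircuitBreakerRegistry`:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical registry that creates entries lazily and announces each new one (a sketch). */
class LazyRegistry<E> {
    private final Map<String, E> entries = new ConcurrentHashMap<>();
    private final List<Consumer<E>> onAdded = new CopyOnWriteArrayList<>();
    private final Function<String, E> factory;

    LazyRegistry(Function<String, E> factory) {
        this.factory = factory;
    }

    /** Registers a listener that fires for every entry created from now on. */
    void onEntryAdded(Consumer<E> listener) {
        onAdded.add(listener);
    }

    /** Returns the existing entry or creates it; listeners fire only on creation. */
    E get(String name) {
        boolean[] created = {false};
        E entry = entries.computeIfAbsent(name, n -> {
            created[0] = true;
            return factory.apply(n);
        });
        if (created[0]) {
            onAdded.forEach(listener -> listener.accept(entry));
        }
        return entry;
    }

    /** Entries that already exist, for attaching listeners after the fact. */
    Iterable<E> all() {
        return entries.values();
    }
}
```

Registering for future entries and then iterating over existing ones, as `init()` does, covers both groups.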
2. Circuit Breaker Metrics Service
CircuitBreakerMetricsService.java
package com.example.orderservice.service;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Circuit breaker metrics service.
 * These custom meters complement the resilience4j.circuitbreaker.* meters that
 * resilience4j-micrometer already publishes.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class CircuitBreakerMetricsService {

    private final MeterRegistry meterRegistry;
    private final ConcurrentMap<String, CircuitBreakerMetrics> metricsMap = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        log.info("Circuit breaker metrics service initialized");
    }

    /**
     * Records a successful call.
     */
    public void recordSuccess(String circuitBreakerName, Duration duration) {
        CircuitBreakerMetrics metrics = getOrCreateMetrics(circuitBreakerName);
        metrics.successCounter.increment();
        metrics.callTimer.record(duration);
    }

    /**
     * Records a failed call.
     */
    public void recordError(String circuitBreakerName, Duration duration, Throwable throwable) {
        CircuitBreakerMetrics metrics = getOrCreateMetrics(circuitBreakerName);
        metrics.errorCounter.increment();
        metrics.callTimer.record(duration);
        // Count failures per exception type (register() returns the existing counter if present)
        String exceptionType = throwable.getClass().getSimpleName();
        Counter exceptionCounter = Counter.builder("circuit.breaker.exceptions")
                .tag("name", circuitBreakerName)
                .tag("exception", exceptionType)
                .register(meterRegistry);
        exceptionCounter.increment();
    }

    /**
     * Records a state transition.
     */
    public void recordStateTransition(String circuitBreakerName,
                                      CircuitBreaker.StateTransition stateTransition) {
        Counter transitionCounter = Counter.builder("circuit.breaker.state.transitions")
                .tag("name", circuitBreakerName)
                .tag("from", stateTransition.getFromState().name())
                .tag("to", stateTransition.getToState().name())
                .register(meterRegistry);
        transitionCounter.increment();
    }

    /**
     * Records that the slow-call rate exceeded its threshold.
     */
    public void recordSlowCallRateExceeded(String circuitBreakerName, float slowCallRate) {
        Counter slowCallCounter = Counter.builder("circuit.breaker.slow.call.rate.exceeded")
                .tag("name", circuitBreakerName)
                .register(meterRegistry);
        slowCallCounter.increment();
    }

    /**
     * Records that the failure rate exceeded its threshold.
     */
    public void recordFailureRateExceeded(String circuitBreakerName, float failureRate) {
        Counter failureRateCounter = Counter.builder("circuit.breaker.failure.rate.exceeded")
                .tag("name", circuitBreakerName)
                .register(meterRegistry);
        failureRateCounter.increment();
    }

    /**
     * Returns the meters of a circuit breaker, creating them on first use.
     */
    private CircuitBreakerMetrics getOrCreateMetrics(String circuitBreakerName) {
        return metricsMap.computeIfAbsent(circuitBreakerName, name -> {
            CircuitBreakerMetrics metrics = new CircuitBreakerMetrics();
            metrics.successCounter = Counter.builder("circuit.breaker.calls")
                    .tag("name", name)
                    .tag("result", "success")
                    .register(meterRegistry);
            metrics.errorCounter = Counter.builder("circuit.breaker.calls")
                    .tag("name", name)
                    .tag("result", "error")
                    .register(meterRegistry);
            metrics.callTimer = Timer.builder("circuit.breaker.call.duration")
                    .tag("name", name)
                    .register(meterRegistry);
            return metrics;
        });
    }

    /**
     * Meters kept per circuit breaker.
     */
    private static class CircuitBreakerMetrics {
        Counter successCounter;
        Counter errorCounter;
        Timer callTimer;
    }
}
3. Circuit Breaker Health Check
CircuitBreakerHealthIndicator.java
package com.example.orderservice.health;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Circuit breaker health check.
 */
@Component
@RequiredArgsConstructor
public class CircuitBreakerHealthIndicator implements HealthIndicator {

    private final CircuitBreakerRegistry circuitBreakerRegistry;

    @Override
    public Health health() {
        Map<String, Object> details = new HashMap<>();
        boolean allHealthy = true;
        for (CircuitBreaker circuitBreaker : circuitBreakerRegistry.getAllCircuitBreakers()) {
            String name = circuitBreaker.getName();
            CircuitBreaker.State state = circuitBreaker.getState();
            CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
            Map<String, Object> circuitBreakerDetails = new HashMap<>();
            circuitBreakerDetails.put("state", state.name());
            circuitBreakerDetails.put("failureRate", metrics.getFailureRate());
            circuitBreakerDetails.put("slowCallRate", metrics.getSlowCallRate());
            circuitBreakerDetails.put("numberOfCalls", metrics.getNumberOfBufferedCalls());
            circuitBreakerDetails.put("numberOfFailedCalls", metrics.getNumberOfFailedCalls());
            circuitBreakerDetails.put("numberOfSlowCalls", metrics.getNumberOfSlowCalls());
            details.put(name, circuitBreakerDetails);
            // Policy choice: an open breaker marks this instance as unhealthy. If the health
            // endpoint drives load-balancer or orchestrator checks, a failing downstream
            // dependency can take this instance out of rotation, so decide deliberately.
            if (state == CircuitBreaker.State.OPEN) {
                allHealthy = false;
            }
        }
        if (allHealthy) {
            return Health.up().withDetails(details).build();
        } else {
            return Health.down().withDetails(details).build();
        }
    }
}
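7.6 Summary
Core concepts recap
Circuit breaker pattern:
- Automatic fault detection and isolation
- Three states: closed, open, half-open
- Prevents failures from cascading
- Fail-fast behavior
Degradation strategies:
- Multi-level fallback: cache → database → default value
- Feature degradation: switch off non-core features
- Performance degradation: lower the quality of service
- Capacity degradation: limit concurrency
Resilience4j features:
- Lightweight and easy to use
- Rich configuration options
- Comprehensive metrics
- Tight integration with Spring Boot
Best practices
Circuit breaker configuration:
- Tune thresholds to the characteristics of each business call
- Size the sliding window sensibly
- Configure an appropriate wait duration in the open state
- Enable automatic transition from open to half-open
Degradation strategy design:
- Implement multi-level fallbacks
- Return meaningful fallback responses
- Take business priorities into account
- Support dynamic configuration
Monitoring and alerting:
- Monitor circuit breaker state changes
- Record detailed call metrics
- Define sensible alerting rules
- Analyze failure patterns regularly
Testing and verification:
- Run fault-injection tests
- Verify that the fallback logic is correct
- Test the recovery path
- Use load tests to validate the thresholds
Caveats
Avoiding cascading failures:
- Set reasonable timeouts
- Isolate requests (for example, with bulkheads)
- Avoid contention on shared resources
- Use asynchronous processing
Consistency of fallback data:
- Make sure fallback data is accurate
- Refresh cached data regularly
- Handle data versioning
- Design for eventual consistency
Performance impact:
- Overhead of the circuit breaker itself
- Cost of collecting monitoring data
- Efficiency of the fallback logic
- Memory usage
Directions for extension
Advanced features:
- Adaptive circuit breaker thresholds
- Machine-learning-based failure prediction
- Intelligent selection of degradation strategies
- Distributed coordination of circuit breakers
Integration options:
- Integration with a service mesh
- Integration with APM systems
- Integration with alerting systems
- Integration with a configuration center
After working through this chapter, you should understand the core concepts of service circuit breaking and degradation and how to apply them in practice. Circuit breaking and degradation are key tools for keeping a microservice architecture stable and available; used well, they noticeably improve a system's fault tolerance.
In the next chapter we will cover distributed tracing: how to trace requests across services and monitor performance in a microservice architecture.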