7.1 Circuit Breaking and Degradation Overview

1. What Is Service Circuit Breaking

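Service circuit breaking is a protective mechanism: when a service fails or responds too slowly, the circuit breaker automatically cuts off calls to it so that the failure does not spread through the rest of the system. It works like a fuse in an electrical circuit, which breaks the circuit when the current gets too high and so protects the equipment.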

Circuit Breaker States

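  1. Closed

    • The normal state: the circuit breaker starts out closed
    • All requests pass through to the service
    • Failure rate and response times are recorded
  2. Open

    • When the failure rate reaches the threshold, the circuit breaker opens
    • All requests immediately receive an error or a fallback response
    • The actual service is not called
  3. Half-Open

    • After a configured wait time, the circuit breaker moves to half-open
    • A small number of trial requests are allowed through
    • Depending on their results, the circuit breaker closes again or reopens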
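The three states and their transitions can be sketched in plain Java. This is a simplified illustration under stated assumptions, not Resilience4j's implementation: the class `SimpleCircuitBreaker`, its constructor parameters, and the injected clock are hypothetical, and failures are counted over a count-based window of the most recent calls, similar in spirit to Resilience4j's `COUNT_BASED` sliding window.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical, simplified circuit breaker illustrating CLOSED, OPEN and HALF_OPEN. */
class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int windowSize;              // recent calls considered while CLOSED
    private final int minimumCalls;            // no failure rate is computed below this count
    private final double failureRateThreshold; // percent, e.g. 50.0
    private final long waitInOpenMillis;       // time spent OPEN before trial calls are allowed
    private final int permittedHalfOpenCalls;  // trial calls allowed in HALF_OPEN
    private final LongSupplier clock;          // injected so the example is deterministic

    private final Deque<Boolean> window = new ArrayDeque<>(); // true = failed call
    private State state = State.CLOSED;
    private long openedAt;
    private int halfOpenPermits;
    private int halfOpenCompleted;
    private int halfOpenFailures;

    SimpleCircuitBreaker(int windowSize, int minimumCalls, double failureRateThreshold,
                         long waitInOpenMillis, int permittedHalfOpenCalls, LongSupplier clock) {
        this.windowSize = windowSize;
        this.minimumCalls = minimumCalls;
        this.failureRateThreshold = failureRateThreshold;
        this.waitInOpenMillis = waitInOpenMillis;
        this.permittedHalfOpenCalls = permittedHalfOpenCalls;
        this.clock = clock;
    }

    synchronized State state() {
        // OPEN -> HALF_OPEN once the wait duration has elapsed
        if (state == State.OPEN && clock.getAsLong() - openedAt >= waitInOpenMillis) {
            state = State.HALF_OPEN;
            halfOpenPermits = 0;
            halfOpenCompleted = 0;
            halfOpenFailures = 0;
        }
        return state;
    }

    <T> T execute(Supplier<T> call, Supplier<T> fallback) {
        boolean permitted;
        synchronized (this) {
            State current = state();
            permitted = current == State.CLOSED
                    || (current == State.HALF_OPEN && halfOpenPermits < permittedHalfOpenCalls);
            if (permitted && current == State.HALF_OPEN) {
                halfOpenPermits++;
            }
        }
        if (!permitted) {
            return fallback.get(); // short-circuit: the real service is not called
        }
        try {
            T result = call.get();
            record(false);
            return result;
        } catch (RuntimeException e) {
            record(true);
            return fallback.get();
        }
    }

    private synchronized void record(boolean failed) {
        if (state == State.HALF_OPEN) {
            halfOpenCompleted++;
            if (failed) halfOpenFailures++;
            if (halfOpenCompleted >= permittedHalfOpenCalls) {
                // The trial calls decide: close again, or reopen
                boolean stillFailing = 100.0 * halfOpenFailures / halfOpenCompleted >= failureRateThreshold;
                transitionTo(stillFailing ? State.OPEN : State.CLOSED);
            }
        } else if (state == State.CLOSED) {
            window.addLast(failed);
            if (window.size() > windowSize) window.removeFirst();
            long failures = window.stream().filter(Boolean::booleanValue).count();
            if (window.size() >= minimumCalls && 100.0 * failures / window.size() >= failureRateThreshold) {
                transitionTo(State.OPEN);
            }
        }
        // Results arriving while OPEN (calls started before the breaker tripped) are ignored
    }

    private void transitionTo(State next) {
        state = next;
        window.clear();
        if (next == State.OPEN) openedAt = clock.getAsLong();
    }
}
```

With a window of 10, a minimum of 5 calls, and a 50% threshold, four consecutive failures leave the breaker CLOSED (too few calls to judge), and the fifth opens it. The lock is deliberately not held during the remote call; a production library also handles slow calls, time-based windows, and concurrency far more carefully.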

2. What Is Service Degradation

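Service degradation means deliberately switching off non-core features, or returning preset default values, when the system is under heavy load or some services are unavailable, so that core functionality keeps working.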

Degradation Strategies

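  1. Feature degradation: switch off non-core features
  2. Read degradation: serve reads only and suspend writes
  3. Write degradation: allow only core write operations
  4. Cache degradation: return cached data instead of calling the live service
  5. Default-value degradation: return a preset default response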
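Feature, read, and default-value degradation are often driven by runtime switches. The following is a minimal sketch under that assumption; `DegradationSwitches`, its methods, and the read-only flag are hypothetical, and in practice such flags would usually be fed from a configuration center rather than set by method calls.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical runtime switches for feature, read, and default-value degradation. */
class DegradationSwitches {
    private final Set<String> disabledFeatures = ConcurrentHashMap.newKeySet();
    private volatile boolean readOnly; // read degradation: writes are suspended

    void disableFeature(String feature) { disabledFeatures.add(feature); }
    void enableFeature(String feature) { disabledFeatures.remove(feature); }
    void setReadOnly(boolean readOnly) { this.readOnly = readOnly; }

    /** Feature degradation: run a non-core feature only while enabled, else return the preset default. */
    <T> T runFeature(String feature, Supplier<T> action, T defaultValue) {
        return disabledFeatures.contains(feature) ? defaultValue : action.get();
    }

    /** Read degradation: reject writes while the system is in read-only mode. */
    void write(Runnable writeOperation) {
        if (readOnly) {
            throw new IllegalStateException("Writes are temporarily suspended (read-only mode)");
        }
        writeOperation.run();
    }
}
```

Keeping the switches in one component lets operators degrade a feature without redeploying, while callers always receive either the real result or a predictable default.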

3. How Circuit Breaking Relates to Degradation

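  • Circuit breaking: an automatic protection mechanism driven by real-time monitoring data
  • Degradation: can be triggered proactively (by operators) or reactively (by failures)
  • Used together: once a circuit breaker trips, a degradation strategy is usually executed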

7.2 Spring Cloud Circuit Breaker

1. Introduction to Circuit Breaker

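Spring Cloud Circuit Breaker provides an abstraction layer that supports several circuit breaker implementations:

  • Resilience4j: a lightweight circuit breaker library
  • Hystrix: Netflix's open-source circuit breaker (in maintenance mode, and no longer supported by Spring Cloud since the 2020.0 release train)
  • Sentinel: Alibaba's open-source flow-control component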

2. Resilience4j Integration

Dependencies

pom.xml

<dependencies>
    <!-- Spring Cloud Circuit Breaker -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
    </dependency>
    
    <!-- Resilience4j Spring Boot Starter (annotation support); add a <version> if your BOM does not manage it -->
    <dependency>
        <groupId>io.github.resilience4j</groupId>
        <artifactId>resilience4j-spring-boot2</artifactId>
    </dependency>
    
    <!-- Resilience4j Micrometer -->
    <dependency>
        <groupId>io.github.resilience4j</groupId>
        <artifactId>resilience4j-micrometer</artifactId>
    </dependency>
    
    <!-- Spring Boot Actuator -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
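    </dependency>
    
    <!-- Spring AOP: required for the Resilience4j annotations (@CircuitBreaker, @Retry, ...) to take effect -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
</dependencies>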

Configuration

application.yml

spring:
  application:
    name: order-service
  cloud:
    circuitbreaker:
      resilience4j:
        enabled: true

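# Resilience4j configuration
resilience4j:
  circuitbreaker:
    configs:
      default:
        # Failure-rate threshold (%) at which the circuit breaker opens
        failure-rate-threshold: 50
        # Calls slower than this are counted as slow calls
        slow-call-duration-threshold: 2s
        # Slow-call-rate threshold (%) at which the circuit breaker opens
        slow-call-rate-threshold: 50
        # Sliding window size (number of calls for COUNT_BASED, seconds for TIME_BASED)
        sliding-window-size: 10
        # Sliding window type: COUNT_BASED or TIME_BASED
        sliding-window-type: COUNT_BASED
        # Minimum number of calls before the failure and slow-call rates are evaluated
        minimum-number-of-calls: 5
        # How long the breaker stays OPEN before moving to HALF_OPEN
        wait-duration-in-open-state: 10s
        # Number of trial calls permitted in HALF_OPEN
        permitted-number-of-calls-in-half-open-state: 3
        # Move from OPEN to HALF_OPEN automatically once the wait elapses, without waiting for a call
        automatic-transition-from-open-to-half-open-enabled: true
        # Expose the breaker's state through the actuator health endpoint
        register-health-indicator: true
        # Exceptions recorded as failures
        record-exceptions:
          - java.lang.Exception
        # Ignored exceptions: counted as neither failure nor success, even if they match record-exceptions
        ignore-exceptions:
          - java.lang.IllegalArgumentException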
    instances:
      userService:
        base-config: default
        failure-rate-threshold: 60
      productService:
        base-config: default
        wait-duration-in-open-state: 5s
  
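  # Retry configuration
  retry:
    configs:
      default:
        # Total number of attempts, including the initial call
        max-attempts: 3
        wait-duration: 1s
        # Matched against the type of the thrown exception; RestTemplate wraps
        # connect and read timeouts in ResourceAccessException, so it must be listed too
        retry-exceptions:
          - org.springframework.web.client.ResourceAccessException
          - java.net.ConnectException
          - java.net.SocketTimeoutException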
    instances:
      userService:
        base-config: default
      productService:
        base-config: default
        max-attempts: 2
  
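  # Rate limiter configuration
  ratelimiter:
    configs:
      default:
        # Permits available in each refresh period
        limit-for-period: 10
        # Length of each refresh period
        limit-refresh-period: 1s
        # How long a caller waits for a permit (0s = reject immediately)
        timeout-duration: 0s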
    instances:
      userService:
        base-config: default
      productService:
        base-config: default
        limit-for-period: 5
  
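  # Bulkhead (isolation) configuration
  bulkhead:
    configs:
      default:
        # Maximum number of concurrent calls
        max-concurrent-calls: 10
        # How long a call waits for a free slot before it is rejected
        max-wait-duration: 1s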
    instances:
      userService:
        base-config: default
      productService:
        base-config: default
        max-concurrent-calls: 5

# Monitoring endpoint configuration
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,circuitbreakers,ratelimiters,retries,bulkheads
  endpoint:
    health:
      show-details: always
  health:
    circuitbreakers:
      enabled: true
    ratelimiters:
      enabled: true

7.3 Circuit Breaker Implementation

1. Annotation-Based Circuit Breakers

User Service Client

UserServiceClient.java

package com.example.orderservice.client;

import com.example.orderservice.dto.UserDTO;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import io.github.resilience4j.bulkhead.annotation.Bulkhead;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

/**
 * User service client
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class UserServiceClient {
    
    private final RestTemplate restTemplate;
    // A logical service name like this only resolves if the RestTemplate bean is @LoadBalanced
    private static final String USER_SERVICE_URL = "http://user-service";
    
    /**
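     * Get user information.
     * The fallback is declared on @Retry, which is the outermost Resilience4j aspect by default.
     * A fallback on @CircuitBreaker would turn every failure into a normal return value before
     * Retry sees it, so the configured retries would never run.
     */
    @CircuitBreaker(name = "userService")
    @Retry(name = "userService", fallbackMethod = "getUserFallback")
    @RateLimiter(name = "userService")
    @Bulkhead(name = "userService")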
    public UserDTO getUser(Long userId) {
        log.info("Calling user service for user: {}", userId);
        
        String url = USER_SERVICE_URL + "/users/" + userId;
        UserDTO user = restTemplate.getForObject(url, UserDTO.class);
        
        log.info("User service response: {}", user);
        return user;
    }
    
    /**
     * Validate a user (fallback on @Retry so that retries run before the fallback is used).
     */
    @CircuitBreaker(name = "userService")
    @Retry(name = "userService", fallbackMethod = "validateUserFallback")
    @RateLimiter(name = "userService")
    public boolean validateUser(Long userId) {
        log.info("Validating user: {}", userId);
        
        try {
            String url = USER_SERVICE_URL + "/users/" + userId + "/validate";
            Boolean result = restTemplate.getForObject(url, Boolean.class);
            
            log.info("User validation result: {}", result);
            return Boolean.TRUE.equals(result);
        } catch (Exception e) {
            log.error("Error validating user: {}", userId, e);
            throw e;
        }
    }
    
    /**
     * Fallback for getUser
     */
    public UserDTO getUserFallback(Long userId, Exception ex) {
        log.warn("User service fallback triggered for user: {}, reason: {}", 
                userId, ex.getMessage());
        
        // Return a default user
        UserDTO fallbackUser = new UserDTO();
        fallbackUser.setId(userId);
        fallbackUser.setUsername("Unknown User");
        fallbackUser.setEmail("unknown@example.com");
        fallbackUser.setStatus("INACTIVE");
        
        return fallbackUser;
    }
    
    /**
     * Fallback for validateUser
     */
    public boolean validateUserFallback(Long userId, Exception ex) {
        log.warn("User validation fallback triggered for user: {}, reason: {}", 
                userId, ex.getMessage());
        
        // Fallback strategy: return false so that order creation is rejected
        return false;
    }
}
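When several Resilience4j annotations sit on one method, the Spring Boot module applies them in a fixed default order, documented as `Retry ( CircuitBreaker ( RateLimiter ( TimeLimiter ( Bulkhead ( Function ) ) ) ) )`. The sketch below reproduces that nesting for the annotations on `getUser` with plain function composition so the order is visible; the `DecorationOrder` class and its `decorate` helper are hypothetical, and the layers only record their names rather than doing any real limiting or isolation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical illustration of the default aspect nesting; each layer only records its name. */
class DecorationOrder {
    static <T> Supplier<T> decorate(String layer, List<String> trace, Supplier<T> inner) {
        return () -> {
            trace.add("enter " + layer);    // outer layers are entered first
            try {
                return inner.get();
            } finally {
                trace.add("exit " + layer); // and exited last
            }
        };
    }

    static List<String> callGetUser() {
        List<String> trace = new ArrayList<>();
        Supplier<String> remoteCall = () -> {
            trace.add("call user-service");
            return "user";
        };
        // Innermost Bulkhead, then RateLimiter and CircuitBreaker, with Retry outermost
        Supplier<String> decorated =
            decorate("Retry", trace,
                decorate("CircuitBreaker", trace,
                    decorate("RateLimiter", trace,
                        decorate("Bulkhead", trace, remoteCall))));
        decorated.get();
        return trace;
    }
}
```

Because Retry is outermost, every attempt passes through the circuit breaker and is recorded by it. The same nesting explains why a `fallbackMethod` on `@CircuitBreaker` suppresses retries: the fallback result is produced inside the Retry layer, so Retry never sees an exception.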

Product Service Client

ProductServiceClient.java

package com.example.orderservice.client;

import com.example.orderservice.dto.ProductDTO;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.TimeUnit;

/**
 * Product service client
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class ProductServiceClient {
    
    private final RestTemplate restTemplate;
    private final RedisTemplate<String, Object> redisTemplate;
    private static final String PRODUCT_SERVICE_URL = "http://product-service";
    private static final String PRODUCT_CACHE_PREFIX = "product:";
    
    /**
     * Get product information (fallback on @Retry so that retries run before the fallback is used).
     */
    @CircuitBreaker(name = "productService")
    @Retry(name = "productService", fallbackMethod = "getProductFallback")
    @RateLimiter(name = "productService")
    public ProductDTO getProduct(Long productId) {
        log.info("Calling product service for product: {}", productId);
        
        String url = PRODUCT_SERVICE_URL + "/products/" + productId;
        ProductDTO product = restTemplate.getForObject(url, ProductDTO.class);
        
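        // Cache the product; a Redis failure must not count as a product-service failure
        if (product != null) {
            try {
                String cacheKey = PRODUCT_CACHE_PREFIX + productId;
                redisTemplate.opsForValue().set(cacheKey, product, 30, TimeUnit.MINUTES);
            } catch (Exception e) {
                log.warn("Failed to cache product: {}", productId, e);
            }
        }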
        
        log.info("Product service response: {}", product);
        return product;
    }
    
    /**
     * Check stock (fallback on @Retry so that retries run before the fallback is used).
     */
    @CircuitBreaker(name = "productService")
    @Retry(name = "productService", fallbackMethod = "checkStockFallback")
    public boolean checkStock(Long productId, Integer quantity) {
        log.info("Checking stock for product: {}, quantity: {}", productId, quantity);
        
        try {
            String url = PRODUCT_SERVICE_URL + "/products/" + productId + "/stock?quantity=" + quantity;
            Boolean result = restTemplate.getForObject(url, Boolean.class);
            
            log.info("Stock check result: {}", result);
            return Boolean.TRUE.equals(result);
        } catch (Exception e) {
            log.error("Error checking stock for product: {}", productId, e);
            throw e;
        }
    }
    
    /**
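     * Reduce stock.
     * No @Retry here: reducing stock is not idempotent, and retrying a request whose first
     * attempt actually succeeded (for example, after a read timeout) could deduct stock twice.
     */
    @CircuitBreaker(name = "productService", fallbackMethod = "reduceStockFallback")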
    public boolean reduceStock(Long productId, Integer quantity) {
        log.info("Reducing stock for product: {}, quantity: {}", productId, quantity);
        
        try {
            String url = PRODUCT_SERVICE_URL + "/products/" + productId + "/stock/reduce";
            StockRequest request = new StockRequest(productId, quantity);
            Boolean result = restTemplate.postForObject(url, request, Boolean.class);
            
            log.info("Stock reduction result: {}", result);
            return Boolean.TRUE.equals(result);
        } catch (Exception e) {
            log.error("Error reducing stock for product: {}", productId, e);
            throw e;
        }
    }
    
    /**
     * Fallback for getProduct
     */
    public ProductDTO getProductFallback(Long productId, Exception ex) {
        log.warn("Product service fallback triggered for product: {}, reason: {}", 
                productId, ex.getMessage());
        
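        // Try the cache first; Redis itself may be unavailable, so guard the lookup
        try {
            String cacheKey = PRODUCT_CACHE_PREFIX + productId;
            ProductDTO cachedProduct = (ProductDTO) redisTemplate.opsForValue().get(cacheKey);
            if (cachedProduct != null) {
                log.info("Returning cached product: {}", productId);
                return cachedProduct;
            }
        } catch (Exception e) {
            log.warn("Cache lookup failed for product: {}", productId, e);
        }
        
        // Return a default product
        ProductDTO fallbackProduct = new ProductDTO();
        fallbackProduct.setId(productId);
        fallbackProduct.setName("Product temporarily unavailable");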
        fallbackProduct.setPrice(0.0);
        fallbackProduct.setStock(0);
        fallbackProduct.setStatus("UNAVAILABLE");
        
        return fallbackProduct;
    }
    
    /**
     * Fallback for checkStock
     */
    public boolean checkStockFallback(Long productId, Integer quantity, Exception ex) {
        log.warn("Stock check fallback triggered for product: {}, reason: {}", 
                productId, ex.getMessage());
        
        // Fallback strategy: return false, i.e. treat the stock as insufficient
        return false;
    }
    
    /**
     * Fallback for reduceStock
     */
    public boolean reduceStockFallback(Long productId, Integer quantity, Exception ex) {
        log.warn("Stock reduction fallback triggered for product: {}, reason: {}", 
                productId, ex.getMessage());
        
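        // Fallback strategy: return false, i.e. the reduction failed
        // (after a timeout the real outcome is unknown and may need reconciliation)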
        return false;
    }
    
    /**
     * Stock request DTO
     */
    public static class StockRequest {
        private Long productId;
        private Integer quantity;
        
        public StockRequest(Long productId, Integer quantity) {
            this.productId = productId;
            this.quantity = quantity;
        }
        
        // getters and setters
        public Long getProductId() { return productId; }
        public void setProductId(Long productId) { this.productId = productId; }
        public Integer getQuantity() { return quantity; }
        public void setQuantity(Integer quantity) { this.quantity = quantity; }
    }
}
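`getProduct` and `getProductFallback` follow a cache-aside pattern: refresh the cache on every successful call and read it only when the live call fails. The sketch below shows that flow with an in-memory map and an injected clock standing in for Redis and its 30-minute TTL; `CachedFallbackClient` and its methods are hypothetical.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical cache-aside fallback: live call first, recent cached value on failure, default last. */
class CachedFallbackClient<K, V> {
    private static final class Entry<T> {
        final T value;
        final long expiresAtMillis;
        Entry(T value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<K, Entry<V>> cache = new ConcurrentHashMap<>(); // stands in for Redis
    private final Function<K, V> liveCall;                            // e.g. the call to product-service
    private final Duration ttl;                                       // e.g. 30 minutes
    private final LongSupplier clock;

    CachedFallbackClient(Function<K, V> liveCall, Duration ttl, LongSupplier clock) {
        this.liveCall = liveCall;
        this.ttl = ttl;
        this.clock = clock;
    }

    V get(K key, Function<K, V> defaultValue) {
        try {
            V value = liveCall.apply(key);
            if (value != null) {
                // Refresh the cache on every successful call
                cache.put(key, new Entry<>(value, clock.getAsLong() + ttl.toMillis()));
            }
            return value;
        } catch (RuntimeException e) {
            Entry<V> cached = cache.get(key);
            if (cached != null && clock.getAsLong() < cached.expiresAtMillis) {
                return cached.value;            // fall back to the last known value
            }
            return defaultValue.apply(key);     // nothing fresh cached: preset default
        }
    }
}
```

The TTL bounds how stale a fallback answer can be: within 30 minutes of the last successful call the cached product is served, after that the preset "unavailable" value takes over.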

2. Programmatic Circuit Breakers

Circuit Breaker Factory

CircuitBreakerFactory.java

package com.example.orderservice.config;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.function.Supplier;

/**
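 * Circuit breaker factory: a thin helper over Resilience4j's CircuitBreakerRegistry, not to be
 * confused with Spring Cloud's org.springframework.cloud.client.circuitbreaker.CircuitBreakerFactory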
 */
@Component
@RequiredArgsConstructor
public class CircuitBreakerFactory {
    
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    
    /**
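     * Create a circuit breaker; if the name is already registered, the existing instance is
     * returned and the given config is ignored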
     */
    public CircuitBreaker createCircuitBreaker(String name, CircuitBreakerConfig config) {
        return circuitBreakerRegistry.circuitBreaker(name, config);
    }
    
    /**
     * Create a circuit breaker with default settings
     */
    public CircuitBreaker createDefaultCircuitBreaker(String name) {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)
            .slowCallRateThreshold(50)
            .slowCallDurationThreshold(Duration.ofSeconds(2))
            .slidingWindowSize(10)
            .minimumNumberOfCalls(5)
            .waitDurationInOpenState(Duration.ofSeconds(10))
            .permittedNumberOfCallsInHalfOpenState(3)
            .automaticTransitionFromOpenToHalfOpenEnabled(true)
            .build();
        
        return createCircuitBreaker(name, config);
    }
    
    /**
     * Execute an operation protected by a circuit breaker
     */
    public <T> T executeWithCircuitBreaker(String circuitBreakerName, 
                                          Supplier<T> operation, 
                                          Supplier<T> fallback) {
        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker(circuitBreakerName);
        
        Supplier<T> decoratedSupplier = CircuitBreaker.decorateSupplier(circuitBreaker, operation);
        
        try {
            return decoratedSupplier.get();
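        } catch (Exception e) {
            // Any failure, including CallNotPermittedException while the breaker is OPEN, goes to the fallback
            return fallback.get();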
        }
    }
}
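`executeWithCircuitBreaker` hands the fallback a plain `Supplier`, so the fallback cannot tell a short-circuited call from a genuine remote failure. Spring Cloud Circuit Breaker's `run(Supplier<T>, Function<Throwable, T>)` passes the cause to the fallback instead. The plain-Java sketch below shows that shape; `CallRejectedException` is a hypothetical stand-in for Resilience4j's `CallNotPermittedException`, and `FallbackWithCause.executeWithFallback` is not part of any library.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical stand-in for the exception a breaker throws when it short-circuits a call. */
class CallRejectedException extends RuntimeException {
    CallRejectedException(String name) { super("Circuit breaker '" + name + "' is OPEN"); }
}

/** Hypothetical helper whose fallback receives the cause of the failure. */
class FallbackWithCause {
    static <T> T executeWithFallback(String name, BooleanSupplier callPermitted,
                                     Supplier<T> operation, Function<Throwable, T> fallback) {
        try {
            if (!callPermitted.getAsBoolean()) {
                throw new CallRejectedException(name); // breaker OPEN: do not touch the service
            }
            return operation.get();
        } catch (RuntimeException e) {
            return fallback.apply(e);                  // the fallback sees why it was invoked
        }
    }

    /** Example fallback that distinguishes "breaker open" from "remote call failed". */
    static String describe(Throwable t) {
        return (t instanceof CallRejectedException) ? "short-circuited" : "remote failure: " + t.getMessage();
    }
}
```

With Resilience4j used directly, the same distinction can be made by catching `CallNotPermittedException` separately before the generic `catch (Exception e)` in `executeWithCircuitBreaker`.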

Programmatic Service Calls

ProgrammaticServiceClient.java

package com.example.orderservice.service;

import com.example.orderservice.config.CircuitBreakerFactory;
import com.example.orderservice.dto.UserDTO;
import com.example.orderservice.dto.ProductDTO;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

/**
 * Programmatic service calls
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class ProgrammaticServiceClient {
    
    private final RestTemplate restTemplate;
    private final CircuitBreakerFactory circuitBreakerFactory;
    
    /**
     * Get user information (programmatic circuit breaker)
     */
    public UserDTO getUserWithCircuitBreaker(Long userId) {
        return circuitBreakerFactory.executeWithCircuitBreaker(
            "programmatic-user-service",
            () -> {
                log.info("Calling user service for user: {}", userId);
                String url = "http://user-service/users/" + userId;
                return restTemplate.getForObject(url, UserDTO.class);
            },
            () -> {
                log.warn("User service circuit breaker fallback for user: {}", userId);
                UserDTO fallbackUser = new UserDTO();
                fallbackUser.setId(userId);
                fallbackUser.setUsername("Fallback User");
                fallbackUser.setEmail("fallback@example.com");
                return fallbackUser;
            }
        );
    }
    
    /**
     * Get product information (programmatic circuit breaker)
     */
    public ProductDTO getProductWithCircuitBreaker(Long productId) {
        CircuitBreaker circuitBreaker = circuitBreakerFactory.createDefaultCircuitBreaker("product-service");
        
        return circuitBreaker.executeSupplier(() -> {
            log.info("Calling product service for product: {}", productId);
            String url = "http://product-service/products/" + productId;
            ProductDTO product = restTemplate.getForObject(url, ProductDTO.class);
            
            if (product == null) {
                throw new RuntimeException("Product not found: " + productId);
            }
            
            return product;
        });
    }
}

7.4 Degradation Strategy Implementation

1. Multi-Level Degradation

Degradation Strategy Manager

FallbackStrategyManager.java

package com.example.orderservice.service;

import com.example.orderservice.dto.ProductDTO;
import com.example.orderservice.dto.UserDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

/**
 * Degradation strategy manager
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class FallbackStrategyManager {
    
    private final RedisTemplate<String, Object> redisTemplate;
    private final DatabaseFallbackService databaseFallbackService;
    
    /**
     * Multi-level fallback for user information
     */
    public UserDTO getUserWithMultiLevelFallback(Long userId) {
        // Level 1: try the cache
        UserDTO cachedUser = getUserFromCache(userId);
        if (cachedUser != null) {
            log.info("User found in cache: {}", userId);
            return cachedUser;
        }
        
        // Level 2: try the database
        UserDTO dbUser = databaseFallbackService.getUserFromDatabase(userId);
        if (dbUser != null) {
            log.info("User found in database: {}", userId);
            // Refresh the cache
            cacheUser(userId, dbUser);
            return dbUser;
        }
        
        // Level 3: return a default user
        log.warn("Using default fallback for user: {}", userId);
        return createDefaultUser(userId);
    }
    
    /**
     * Multi-level fallback for product information
     */
    public ProductDTO getProductWithMultiLevelFallback(Long productId) {
        // Level 1: try the cache
        ProductDTO cachedProduct = getProductFromCache(productId);
        if (cachedProduct != null) {
            log.info("Product found in cache: {}", productId);
            return cachedProduct;
        }
        
        // Level 2: try the database for basic product information
        ProductDTO dbProduct = databaseFallbackService.getProductFromDatabase(productId);
        if (dbProduct != null) {
            log.info("Product found in database: {}", productId);
            // Refresh the cache
            cacheProduct(productId, dbProduct);
            return dbProduct;
        }
        
        // Level 3: return a default product
        log.warn("Using default fallback for product: {}", productId);
        return createDefaultProduct(productId);
    }
    
    /**
     * Get a user from the cache
     */
    private UserDTO getUserFromCache(Long userId) {
        try {
            String cacheKey = "user:" + userId;
            return (UserDTO) redisTemplate.opsForValue().get(cacheKey);
        } catch (Exception e) {
            log.error("Error getting user from cache: {}", userId, e);
            return null;
        }
    }
    
    /**
     * Cache user information
     */
    private void cacheUser(Long userId, UserDTO user) {
        try {
            String cacheKey = "user:" + userId;
            redisTemplate.opsForValue().set(cacheKey, user, 30, TimeUnit.MINUTES);
        } catch (Exception e) {
            log.error("Error caching user: {}", userId, e);
        }
    }
    
    /**
     * Get a product from the cache
     */
    private ProductDTO getProductFromCache(Long productId) {
        try {
            String cacheKey = "product:" + productId;
            return (ProductDTO) redisTemplate.opsForValue().get(cacheKey);
        } catch (Exception e) {
            log.error("Error getting product from cache: {}", productId, e);
            return null;
        }
    }
    
    /**
     * Cache product information
     */
    private void cacheProduct(Long productId, ProductDTO product) {
        try {
            String cacheKey = "product:" + productId;
            redisTemplate.opsForValue().set(cacheKey, product, 30, TimeUnit.MINUTES);
        } catch (Exception e) {
            log.error("Error caching product: {}", productId, e);
        }
    }
    
    /**
     * Create a default user
     */
    private UserDTO createDefaultUser(Long userId) {
        UserDTO defaultUser = new UserDTO();
        defaultUser.setId(userId);
        defaultUser.setUsername("Temporary User");
        defaultUser.setEmail("temp@example.com");
        defaultUser.setStatus("TEMP");
        return defaultUser;
    }
    
    /**
     * Create a default product
     */
    private ProductDTO createDefaultProduct(Long productId) {
        ProductDTO defaultProduct = new ProductDTO();
        defaultProduct.setId(productId);
        defaultProduct.setName("Product temporarily unavailable");
        defaultProduct.setPrice(0.0);
        defaultProduct.setStock(0);
        defaultProduct.setStatus("UNAVAILABLE");
        return defaultProduct;
    }
}
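The three levels in `FallbackStrategyManager` (cache, database, default) follow one pattern: try each source in order, treat an exception like a miss, and stop at the first hit. A generic version might look like the sketch below; `FallbackChain` and `firstAvailable` are hypothetical names, and the write-back to the cache that the original performs after a database hit is left out.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical generic multi-level fallback: the first level that yields a value wins. */
final class FallbackChain {
    private FallbackChain() {}

    static <T> T firstAvailable(List<Supplier<T>> levels, Supplier<T> defaultValue) {
        for (Supplier<T> level : levels) {
            try {
                T value = level.get();
                if (value != null) {
                    return value;          // hit: stop descending
                }
            } catch (RuntimeException e) {
                // A failing level (e.g. Redis down) is treated like a miss
            }
        }
        return defaultValue.get();         // last level: preset default
    }
}
```

Ordering the levels from cheapest and freshest to most generic keeps the common case fast while still guaranteeing a non-null answer.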

2. Database Fallback Service

DatabaseFallbackService.java

package com.example.orderservice.service;

import com.example.orderservice.dto.ProductDTO;
import com.example.orderservice.dto.UserDTO;
import com.example.orderservice.entity.User;
import com.example.orderservice.entity.Product;
import com.example.orderservice.repository.UserRepository;
import com.example.orderservice.repository.ProductRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * Database fallback service
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class DatabaseFallbackService {
    
    private final UserRepository userRepository;
    private final ProductRepository productRepository;
    
    /**
     * Get user information from the database
     */
    public UserDTO getUserFromDatabase(Long userId) {
        try {
            User user = userRepository.findById(userId).orElse(null);
            if (user != null) {
                return convertToUserDTO(user);
            }
        } catch (Exception e) {
            log.error("Error getting user from database: {}", userId, e);
        }
        return null;
    }
    
    /**
     * Get product information from the database
     */
    public ProductDTO getProductFromDatabase(Long productId) {
        try {
            Product product = productRepository.findById(productId).orElse(null);
            if (product != null) {
                return convertToProductDTO(product);
            }
        } catch (Exception e) {
            log.error("Error getting product from database: {}", productId, e);
        }
        return null;
    }
    
    /**
     * Convert a User entity to a DTO
     */
    private UserDTO convertToUserDTO(User user) {
        UserDTO userDTO = new UserDTO();
        userDTO.setId(user.getId());
        userDTO.setUsername(user.getUsername());
        userDTO.setEmail(user.getEmail());
        userDTO.setStatus(user.getStatus());
        return userDTO;
    }
    
    /**
     * Convert a Product entity to a DTO
     */
    private ProductDTO convertToProductDTO(Product product) {
        ProductDTO productDTO = new ProductDTO();
        productDTO.setId(product.getId());
        productDTO.setName(product.getName());
        productDTO.setPrice(product.getPrice());
        productDTO.setStock(product.getStock());
        productDTO.setStatus(product.getStatus());
        return productDTO;
    }
}

3. Dynamic Degradation Configuration

Degradation Configuration Management

FallbackConfigManager.java

package com.example.orderservice.config;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

/**
 * Degradation configuration management
 */
@Component
@ConfigurationProperties(prefix = "fallback")
@RefreshScope
@Data
@Slf4j
public class FallbackConfigManager {
    
    private Map<String, ServiceFallbackConfig> services = new HashMap<>();
    private GlobalFallbackConfig global = new GlobalFallbackConfig();
    
    @PostConstruct
    public void init() {
        log.info("Fallback configuration initialized: {}", this);
    }
    
    /**
     * Get the fallback configuration for a service
     */
    public ServiceFallbackConfig getServiceConfig(String serviceName) {
        return services.getOrDefault(serviceName, new ServiceFallbackConfig());
    }
    
    /**
     * Whether fallback is enabled (requires both the global and the per-service switch)
     */
    public boolean isFallbackEnabled(String serviceName) {
        ServiceFallbackConfig config = getServiceConfig(serviceName);
        return config.isEnabled() && global.isEnabled();
    }
    
    /**
     * Get the fallback strategy
     */
    public String getFallbackStrategy(String serviceName) {
        return getServiceConfig(serviceName).getStrategy();
    }
    
    /**
     * Per-service fallback configuration
     */
    @Data
    public static class ServiceFallbackConfig {
        private boolean enabled = true;
        private String strategy = "default"; // default, cache, database, mock
        private int timeout = 5000; // timeout (milliseconds)
        private int maxRetries = 3; // maximum number of retries
        private Map<String, Object> properties = new HashMap<>();
    }
    
    /**
     * Global fallback configuration
     */
    @Data
    public static class GlobalFallbackConfig {
        private boolean enabled = true;
        private String defaultStrategy = "default";
        private int globalTimeout = 10000;
    }
}

Configuration

application.yml

# Fallback configuration
fallback:
  global:
    enabled: true
    default-strategy: cache
    global-timeout: 10000
  services:
    userService:
      enabled: true
      strategy: cache
      timeout: 3000
      max-retries: 2
      properties:
        cache-ttl: 1800
        use-database-fallback: true
    productService:
      enabled: true
      strategy: database
      timeout: 5000
      max-retries: 3
      properties:
        cache-ttl: 3600
        use-mock-data: false
    paymentService:
      enabled: true
      strategy: mock
      timeout: 2000
      max-retries: 1
      properties:
        mock-success-rate: 0.8
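`FallbackConfigManager` exposes the configured strategy name (`default`, `cache`, `database`, `mock`) and an enable flag that combines the global and per-service switches, but the section does not show how a caller acts on them. One possible dispatch is sketched below; `StrategyDispatcher`, its handler map, and the rule of rethrowing the original error when fallback is disabled are all assumptions.

```java
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical dispatcher that picks a fallback handler by the configured strategy name. */
class StrategyDispatcher<T> {
    private final Map<String, Supplier<T>> handlers; // e.g. "cache", "database", "mock", "default"
    private final String defaultStrategy;            // e.g. fallback.global.default-strategy

    StrategyDispatcher(Map<String, Supplier<T>> handlers, String defaultStrategy) {
        this.handlers = handlers;
        this.defaultStrategy = defaultStrategy;
    }

    /**
     * @param globalEnabled  fallback.global.enabled
     * @param serviceEnabled fallback.services.{name}.enabled
     * @param strategy       fallback.services.{name}.strategy
     * @param cause          the failure that triggered the fallback
     */
    T fallback(boolean globalEnabled, boolean serviceEnabled, String strategy, RuntimeException cause) {
        if (!(globalEnabled && serviceEnabled)) {
            throw cause;                                 // fallback switched off: surface the original error
        }
        Supplier<T> handler = handlers.get(strategy);
        if (handler == null) {
            handler = handlers.get(defaultStrategy);     // unknown strategy: use the default one
        }
        if (handler == null) {
            throw new IllegalStateException("No fallback handler for strategy: " + strategy);
        }
        return handler.get();
    }
}
```

Because `FallbackConfigManager` is refreshable, a dispatcher like this would read the flags and strategy on every call rather than caching them, so a configuration change takes effect without a restart.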

7.5 Circuit Breaker Monitoring

1. Circuit Breaker Event Listening

Circuit Breaker Event Listener

CircuitBreakerEventListener.java

package com.example.orderservice.listener;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.core.registry.EntryAddedEvent;
import io.github.resilience4j.core.registry.EntryRemovedEvent;
import io.github.resilience4j.core.registry.EntryReplacedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * Circuit breaker event listener
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class CircuitBreakerEventListener {
    
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final CircuitBreakerMetricsService metricsService;
    
    @PostConstruct
    public void init() {
        // Listen for registry events (circuit breakers added, removed, or replaced)
        circuitBreakerRegistry.getEventPublisher()
            .onEntryAdded(this::onCircuitBreakerAdded)
            .onEntryRemoved(this::onCircuitBreakerRemoved)
            .onEntryReplaced(this::onCircuitBreakerReplaced);
        
        // Attach event listeners to the circuit breakers that already exist
        circuitBreakerRegistry.getAllCircuitBreakers()
            .forEach(this::addCircuitBreakerEventListeners);
    }
    
    /**
     * A circuit breaker was added
     */
    private void onCircuitBreakerAdded(EntryAddedEvent<CircuitBreaker> event) {
        log.info("Circuit breaker added: {}", event.getAddedEntry().getName());
        addCircuitBreakerEventListeners(event.getAddedEntry());
    }
    
    /**
     * A circuit breaker was removed
     */
    private void onCircuitBreakerRemoved(EntryRemovedEvent<CircuitBreaker> event) {
        log.info("Circuit breaker removed: {}", event.getRemovedEntry().getName());
    }
    
    /**
     * A circuit breaker was replaced
     */
    private void onCircuitBreakerReplaced(EntryReplacedEvent<CircuitBreaker> event) {
        log.info("Circuit breaker replaced: {}", event.getNewEntry().getName());
        addCircuitBreakerEventListeners(event.getNewEntry());
    }
    
    /**
     * Attach event listeners to a circuit breaker
     */
    private void addCircuitBreakerEventListeners(CircuitBreaker circuitBreaker) {
        circuitBreaker.getEventPublisher()
            .onSuccess(event -> {
                log.debug("Circuit breaker success: {}, duration: {}ms", 
                         event.getCircuitBreakerName(), event.getElapsedDuration().toMillis());
                metricsService.recordSuccess(event.getCircuitBreakerName(), event.getElapsedDuration());
            })
            .onError(event -> {
                log.warn("Circuit breaker error: {}, duration: {}ms, error: {}", 
                        event.getCircuitBreakerName(), event.getElapsedDuration().toMillis(), 
                        event.getThrowable().getMessage());
                metricsService.recordError(event.getCircuitBreakerName(), event.getElapsedDuration(), 
                                         event.getThrowable());
            })
            .onStateTransition(event -> {
                log.info("Circuit breaker state transition: {} from {} to {}", 
                        event.getCircuitBreakerName(), event.getStateTransition().getFromState(), 
                        event.getStateTransition().getToState());
                metricsService.recordStateTransition(event.getCircuitBreakerName(), 
                                                    event.getStateTransition());
            })
            .onSlowCallRateExceeded(event -> {
                log.warn("Circuit breaker slow call rate exceeded: {}, rate: {}%", 
                        event.getCircuitBreakerName(), event.getSlowCallRate());
                metricsService.recordSlowCallRateExceeded(event.getCircuitBreakerName(), 
                                                        event.getSlowCallRate());
            })
            .onFailureRateExceeded(event -> {
                log.warn("Circuit breaker failure rate exceeded: {}, rate: {}%", 
                        event.getCircuitBreakerName(), event.getFailureRate());
                metricsService.recordFailureRateExceeded(event.getCircuitBreakerName(), 
                                                        event.getFailureRate());
            });
    }
}

2. Circuit Breaker Metrics Service

CircuitBreakerMetricsService.java

package com.example.orderservice.service;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Circuit breaker metrics service
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class CircuitBreakerMetricsService {
    
    private final MeterRegistry meterRegistry;
    private final ConcurrentMap<String, CircuitBreakerMetrics> metricsMap = new ConcurrentHashMap<>();
    
    @PostConstruct
    public void init() {
        log.info("Circuit breaker metrics service initialized");
    }
    
    /**
     * Record a successful call
     */
    public void recordSuccess(String circuitBreakerName, Duration duration) {
        CircuitBreakerMetrics metrics = getOrCreateMetrics(circuitBreakerName);
        metrics.successCounter.increment();
        metrics.callTimer.record(duration);
    }
    
    /**
     * Record a failed call
     */
    public void recordError(String circuitBreakerName, Duration duration, Throwable throwable) {
        CircuitBreakerMetrics metrics = getOrCreateMetrics(circuitBreakerName);
        metrics.errorCounter.increment();
        metrics.callTimer.record(duration);
        
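        // Count failures by exception type; register() returns the existing counter
        // when one with the same name and tags is already registered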
        String exceptionType = throwable.getClass().getSimpleName();
        Counter exceptionCounter = Counter.builder("circuit.breaker.exceptions")
            .tag("name", circuitBreakerName)
            .tag("exception", exceptionType)
            .register(meterRegistry);
        exceptionCounter.increment();
    }
    
    /**
     * Record a state transition
     */
    public void recordStateTransition(String circuitBreakerName, 
                                     CircuitBreaker.StateTransition stateTransition) {
        Counter transitionCounter = Counter.builder("circuit.breaker.state.transitions")
            .tag("name", circuitBreakerName)
            .tag("from", stateTransition.getFromState().name())
            .tag("to", stateTransition.getToState().name())
            .register(meterRegistry);
        transitionCounter.increment();
    }
    
    /**
     * Record that the slow-call rate exceeded its threshold
     */
    public void recordSlowCallRateExceeded(String circuitBreakerName, float slowCallRate) {
        Counter slowCallCounter = Counter.builder("circuit.breaker.slow.call.rate.exceeded")
            .tag("name", circuitBreakerName)
            .register(meterRegistry);
        slowCallCounter.increment();
    }
    
    /**
     * Record that the failure rate exceeded its threshold
     */
    public void recordFailureRateExceeded(String circuitBreakerName, float failureRate) {
        Counter failureRateCounter = Counter.builder("circuit.breaker.failure.rate.exceeded")
            .tag("name", circuitBreakerName)
            .register(meterRegistry);
        failureRateCounter.increment();
    }
    
    /**
     * Get the meters for a circuit breaker, creating and registering them on first use
     */
    private CircuitBreakerMetrics getOrCreateMetrics(String circuitBreakerName) {
        return metricsMap.computeIfAbsent(circuitBreakerName, name -> {
            CircuitBreakerMetrics metrics = new CircuitBreakerMetrics();
            
            metrics.successCounter = Counter.builder("circuit.breaker.calls")
                .tag("name", name)
                .tag("result", "success")
                .register(meterRegistry);
            
            metrics.errorCounter = Counter.builder("circuit.breaker.calls")
                .tag("name", name)
                .tag("result", "error")
                .register(meterRegistry);
            
            metrics.callTimer = Timer.builder("circuit.breaker.call.duration")
                .tag("name", name)
                .register(meterRegistry);
            
            return metrics;
        });
    }
    
    /**
     * Meters kept for a single circuit breaker
     */
    private static class CircuitBreakerMetrics {
        Counter successCounter;
        Counter errorCounter;
        Timer callTimer;
    }
}
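The `getOrCreateMetrics` method above relies on `ConcurrentMap.computeIfAbsent` to create exactly one set of meters per circuit breaker name, even when several threads record calls at once. The JDK-only sketch below isolates that pattern so it can run without Micrometer. It is an illustration under assumptions: `java.util.concurrent.atomic.LongAdder` stands in for Micrometer's `Counter`, and the class `PerNameCallStats` and its `errorRatePercent()` helper are hypothetical names. It also shows how the success and error counts might be combined into an error ratio, roughly what an alert rule could compute from the `circuit.breaker.calls` series.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical JDK-only sketch of the lazy per-name metrics holder.
 * LongAdder stands in for Micrometer's Counter; this is not the Micrometer API.
 */
public class PerNameCallStats {

    /** Mirrors the success/error part of CircuitBreakerMetrics. */
    static final class Stats {
        final LongAdder success = new LongAdder();
        final LongAdder error = new LongAdder();

        /** Error ratio in percent, or -1 as a "no data yet" marker (a choice of this sketch). */
        double errorRatePercent() {
            long total = success.sum() + error.sum();
            return total == 0 ? -1 : 100.0 * error.sum() / total;
        }
    }

    private final ConcurrentMap<String, Stats> statsByName = new ConcurrentHashMap<>();

    /** computeIfAbsent creates at most one Stats per name, even under concurrent access. */
    Stats get(String name) {
        return statsByName.computeIfAbsent(name, n -> new Stats());
    }

    void recordSuccess(String name) { get(name).success.increment(); }

    void recordError(String name) { get(name).error.increment(); }

    public static void main(String[] args) throws InterruptedException {
        PerNameCallStats stats = new PerNameCallStats();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // 4 threads each record 750 successes and 250 errors for the same name
        for (int t = 0; t < 4; t++) {
            pool.submit(() -> {
                for (int i = 0; i < 1000; i++) {
                    if (i % 4 == 0) stats.recordError("inventory");
                    else stats.recordSuccess("inventory");
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(stats.get("inventory").errorRatePercent()); // prints 25.0
    }
}
```

`LongAdder` suits this shape of workload: increments from many threads are cheap, and `sum()` is only read occasionally, much like a metrics scrape.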

3. Circuit Breaker Health Check

CircuitBreakerHealthIndicator.java

package com.example.orderservice.health;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Circuit breaker health indicator: reports DOWN while any circuit breaker is OPEN
 */
@Component
@RequiredArgsConstructor
public class CircuitBreakerHealthIndicator implements HealthIndicator {
    
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    
    @Override
    public Health health() {
        Map<String, Object> details = new HashMap<>();
        boolean allHealthy = true;
        
        for (CircuitBreaker circuitBreaker : circuitBreakerRegistry.getAllCircuitBreakers()) {
            String name = circuitBreaker.getName();
            CircuitBreaker.State state = circuitBreaker.getState();
            
            Map<String, Object> circuitBreakerDetails = new HashMap<>();
            circuitBreakerDetails.put("state", state.name());
            // Both rates are -1 until the minimum number of calls has been recorded
            circuitBreakerDetails.put("failureRate", circuitBreaker.getMetrics().getFailureRate());
            circuitBreakerDetails.put("slowCallRate", circuitBreaker.getMetrics().getSlowCallRate());
            circuitBreakerDetails.put("numberOfCalls", circuitBreaker.getMetrics().getNumberOfBufferedCalls());
            circuitBreakerDetails.put("numberOfFailedCalls", circuitBreaker.getMetrics().getNumberOfFailedCalls());
            circuitBreakerDetails.put("numberOfSlowCalls", circuitBreaker.getMetrics().getNumberOfSlowCalls());
            
            details.put(name, circuitBreakerDetails);
            
            // Treat an OPEN circuit breaker as unhealthy
            if (state == CircuitBreaker.State.OPEN) {
                allHealthy = false;
            }
        }
        
        if (allHealthy) {
            return Health.up().withDetails(details).build();
        } else {
            return Health.down().withDetails(details).build();
        }
    }
}

7.6 Summary

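Key Concepts in Review

  1. Circuit breaker pattern

    • Automatic fault detection and isolation
    • Three states: Closed, Open, Half-Open
    • Prevents failures from cascading
    • Fail-fast mechanism
  2. Degradation strategies

    • Multi-level fallback: cache → database → default value
    • Feature degradation: switch off non-core features
    • Performance degradation: lower the quality of service
    • Capacity degradation: limit concurrency
  3. Resilience4j features

    • Lightweight and easy to use
    • Rich configuration options
    • Comprehensive monitoring metrics
    • Tight integration with Spring Boot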
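To make the three states and the fail-fast behavior concrete, here is a minimal single-threaded sketch of a count-based circuit breaker. It is not Resilience4j's implementation. The class `SimpleCircuitBreaker`, its constructor parameters (window size, failure-rate threshold in percent, open duration in milliseconds, number of half-open trial calls) and the injectable clock are assumptions made for illustration. In CLOSED it records outcomes in a sliding window and opens once a full window reaches the threshold. In OPEN it returns the fallback without calling the action. After the open duration it moves to HALF_OPEN and uses a fixed number of trial calls to decide whether to close again or reopen.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Minimal, NOT thread-safe sketch of the CLOSED / OPEN / HALF_OPEN state machine.
 * Hypothetical class for illustration only; this is not the Resilience4j API.
 */
public class SimpleCircuitBreaker {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int windowSize;              // count-based sliding window used in CLOSED
    private final double failureRateThreshold; // percent, e.g. 50.0
    private final long openMillis;             // how long to stay OPEN before probing
    private final int halfOpenTrials;          // trial calls permitted in HALF_OPEN
    private final LongSupplier clockMillis;    // injectable clock for deterministic tests

    private final Deque<Boolean> window = new ArrayDeque<>(); // true = failed call
    private State state = State.CLOSED;
    private long openedAt;
    private int trialCalls;
    private int trialFailures;

    SimpleCircuitBreaker(int windowSize, double failureRateThreshold,
                         long openMillis, int halfOpenTrials, LongSupplier clockMillis) {
        this.windowSize = windowSize;
        this.failureRateThreshold = failureRateThreshold;
        this.openMillis = openMillis;
        this.halfOpenTrials = halfOpenTrials;
        this.clockMillis = clockMillis;
    }

    State state() { return state; }

    /** Runs the action through the breaker; returns the fallback if the call is rejected or fails. */
    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (state == State.OPEN) {
            if (clockMillis.getAsLong() - openedAt < openMillis) {
                return fallback.get(); // fail fast: the protected service is not called
            }
            state = State.HALF_OPEN;   // wait elapsed: let a few trial calls through
            trialCalls = 0;
            trialFailures = 0;
        }
        try {
            T result = action.get();
            record(false);
            return result;
        } catch (RuntimeException e) {
            record(true);
            return fallback.get();
        }
    }

    private void record(boolean failed) {
        if (state == State.HALF_OPEN) {
            trialCalls++;
            if (failed) trialFailures++;
            if (trialCalls == halfOpenTrials) {
                // Trials finished: reopen if they still fail too often, otherwise close
                if (percent(trialFailures, trialCalls) >= failureRateThreshold) {
                    open();
                } else {
                    state = State.CLOSED;
                }
            }
            return;
        }
        window.addLast(failed);
        if (window.size() > windowSize) window.removeFirst(); // slide the window
        long failures = window.stream().filter(f -> f).count();
        // Only evaluate a full window (a crude "minimum number of calls")
        if (window.size() == windowSize && percent(failures, windowSize) >= failureRateThreshold) {
            open();
        }
    }

    private void open() {
        state = State.OPEN;
        openedAt = clockMillis.getAsLong();
        window.clear(); // CLOSED restarts with a fresh window after recovery
    }

    private static double percent(long part, long total) {
        return 100.0 * part / total;
    }
}
```

Injecting the clock keeps the state transitions deterministic in tests. A production implementation would also need thread safety and time-based windows, both omitted here.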

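Best Practices

  1. Circuit breaker configuration

    • Tune thresholds to the characteristics of each business scenario
    • Choose an appropriate sliding-window size
    • Configure a suitable wait duration in the Open state
    • Enable automatic transition from Open to Half-Open
  2. Degradation strategy design

    • Implement a multi-level fallback mechanism
    • Return meaningful fallback responses
    • Take business priority into account
    • Support dynamic configuration
  3. Monitoring and alerting

    • Monitor circuit breaker state changes
    • Record detailed call metrics
    • Define sensible alerting rules
    • Analyze failure patterns regularly
  4. Testing and verification

    • Run fault-injection tests
    • Verify that the fallback logic is correct
    • Test the recovery mechanism
    • Validate thresholds with load testing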
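A multi-level fallback (cache → database → default value) can be expressed as an ordered chain in which each level either produces a value or defers to the next. The sketch below uses only the JDK. `FallbackChain` is a hypothetical helper, and the cache and database levels in the demo are stand-in lambdas rather than real clients. A level that throws or returns an empty `Optional` is treated as unavailable, and the preset default is returned only after every level has been tried.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of a multi-level fallback chain (e.g. cache -> database -> default).
 * The levels are plain suppliers; no real cache or database client is involved.
 */
public final class FallbackChain<T> {

    private final List<Supplier<Optional<T>>> levels;
    private final T defaultValue;

    public FallbackChain(List<Supplier<Optional<T>>> levels, T defaultValue) {
        this.levels = List.copyOf(levels);
        this.defaultValue = defaultValue;
    }

    /** Tries each level in order; a failure or an empty result moves on to the next level. */
    public T get() {
        for (Supplier<Optional<T>> level : levels) {
            try {
                Optional<T> value = level.get();
                if (value.isPresent()) {
                    return value.get();
                }
            } catch (RuntimeException e) {
                // A broken level must not break the fallback itself; real code should log this
            }
        }
        return defaultValue; // every level was unavailable: return the preset default
    }

    public static void main(String[] args) {
        Supplier<Optional<String>> cache = Optional::empty; // simulated cache miss
        Supplier<Optional<String>> database = () -> { throw new IllegalStateException("db down"); };
        FallbackChain<String> chain = new FallbackChain<>(List.of(cache, database), "price unavailable");
        System.out.println(chain.get()); // prints price unavailable
    }
}
```

Ordering the levels from cheapest to most expensive keeps the common path fast, and making the default an explicit, meaningful value keeps the degraded response recognizable to callers.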

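Caveats

  1. Avoiding cascading failures

    • Set reasonable timeouts
    • Isolate requests (for example with bulkheads)
    • Avoid contention on shared resources
    • Use asynchronous processing
  2. Consistency of fallback data

    • Make sure fallback data is accurate
    • Refresh cached data regularly
    • Handle data versioning
    • Design for eventual consistency
  3. Performance impact

    • Overhead of the circuit breaker itself
    • Cost of collecting monitoring data
    • Efficiency of the fallback logic
    • Memory usage optimization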
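Timeouts and request isolation can be illustrated together with plain JDK concurrency utilities. In this sketch a `Semaphore` acts as a bulkhead that caps concurrent calls to one dependency and rejects excess calls instead of queueing them. `CompletableFuture.get(timeout, unit)` bounds how long the caller waits. `IsolatedCaller` and its parameters are hypothetical. Resilience4j provides dedicated Bulkhead and TimeLimiter modules for these purposes, but their API is not shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of request isolation (semaphore bulkhead) plus a call timeout, JDK only.
 * Not the Resilience4j Bulkhead/TimeLimiter API.
 */
public class IsolatedCaller implements AutoCloseable {

    private final Semaphore permits;
    private final ExecutorService pool;
    private final long timeoutMillis;

    IsolatedCaller(int maxConcurrentCalls, long timeoutMillis) {
        this.permits = new Semaphore(maxConcurrentCalls);
        // Pool sized to the permit count, so admitted calls rarely wait for a thread
        this.pool = Executors.newFixedThreadPool(maxConcurrentCalls);
        this.timeoutMillis = timeoutMillis;
    }

    /** Returns the fallback if the bulkhead is full, the call fails, or it times out. */
    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (!permits.tryAcquire()) {
            return fallback.get(); // bulkhead full: reject immediately instead of queueing
        }
        CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
            try {
                return action.get();
            } finally {
                permits.release(); // freed only when the work really ends
            }
        }, pool);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return fallback.get(); // too slow or failed: degrade instead of waiting longer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback.get();
        }
    }

    @Override
    public void close() {
        pool.shutdownNow(); // interrupts any call that is still running
    }
}
```

The permit is released inside the task rather than when the caller stops waiting. A call that has timed out but is still running therefore keeps its slot, which stops slow downstream calls from piling up behind the bulkhead.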

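Further Directions

  1. Advanced features

    • Adaptive circuit-breaking thresholds
    • Failure prediction based on machine learning
    • Intelligent selection of degradation strategies
    • Coordinated circuit breaking across distributed instances
  2. Integration options

    • Integration with a service mesh
    • Integration with APM systems
    • Integration with alerting systems
    • Integration with a configuration center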

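After working through this chapter, you should understand the core concepts of service circuit breaking and degradation and how to apply them in practice. Circuit breaking and degradation are key tools for keeping a microservice architecture stable and available. Used well, they significantly improve a system's fault tolerance.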

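In the next chapter we turn to distributed tracing and see how to trace requests and monitor performance across a microservice architecture.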