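11.1 Service Mesh

1. Introduction to Istio

A service mesh is a dedicated infrastructure layer that handles service-to-service communication. Istio is one of the most widely used service mesh implementations; it provides traffic management, security, and observability through proxies deployed alongside the workloads.

Istio Architecture Components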

istio-system.yaml

apiVersion: v1
kind: Namespace
metadata:
  name: istio-system
  labels:
    istio-injection: disabled
---
apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
metadata:
  name: control-plane
  namespace: istio-system
spec:
  values:
    global:
      meshID: mesh1
      multiCluster:
        clusterName: cluster1
      network: network1
  components:
    pilot:
      k8s:
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi
    ingressGateways:
    - name: istio-ingressgateway
      enabled: true
      k8s:
        service:
          type: LoadBalancer
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi
    egressGateways:
    - name: istio-egressgateway
      enabled: true
      k8s:
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi

2. Traffic Management

Virtual Service Configuration

virtual-service.yaml

apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: user-service-vs
  namespace: microservices
spec:
  hosts:
  - user-service
  http:
  - match:
    - headers:
        version:
          exact: v2
    route:
    - destination:
        host: user-service
        subset: v2
      weight: 100
  - route:
    - destination:
        host: user-service
        subset: v1
      weight: 90
    - destination:
        host: user-service
        subset: v2
      weight: 10
    # Retries are configured on the VirtualService route, not on the DestinationRule
    retries:
      attempts: 3
      perTryTimeout: 2s
      retryOn: gateway-error,connect-failure,refused-stream
---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: user-service-dr
  namespace: microservices
spec:
  host: user-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        maxRequestsPerConnection: 10
    # Circuit breaking is expressed through outlierDetection
    outlierDetection:
      consecutiveGatewayErrors: 5
      consecutive5xxErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
  subsets:
  - name: v1
    labels:
      version: v1
  - name: v2
    labels:
      version: v2
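
The default route of user-service-vs splits traffic 90/10 between the v1 and v2 subsets. The following is a minimal sketch of how such a weighted choice can be made. It is plain Java for illustration only, not Istio or Envoy code, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

/** Hypothetical illustration of weight-based routing; not Istio or Envoy code. */
final class WeightedRouteSketch {

    // Subset name -> weight, in declaration order (the weights are assumed to sum to 100).
    private final Map<String, Integer> weights = new LinkedHashMap<>();

    WeightedRouteSketch add(String subset, int weight) {
        weights.put(subset, weight);
        return this;
    }

    /** Maps a roll in [0, total weight) to a subset by walking the cumulative weights. */
    String pick(int roll) {
        if (roll < 0) {
            throw new IllegalArgumentException("roll must not be negative: " + roll);
        }
        int cumulative = 0;
        for (Map.Entry<String, Integer> entry : weights.entrySet()) {
            cumulative += entry.getValue();
            if (roll < cumulative) {
                return entry.getKey();
            }
        }
        throw new IllegalArgumentException("roll out of range: " + roll);
    }

    public static void main(String[] args) {
        WeightedRouteSketch route = new WeightedRouteSketch().add("v1", 90).add("v2", 10);
        Random random = new Random();
        int v2Hits = 0;
        for (int i = 0; i < 10_000; i++) {
            if (route.pick(random.nextInt(100)).equals("v2")) {
                v2Hits++;
            }
        }
        // Over many requests the v2 share should settle near 10 percent.
        System.out.println("v2 share: " + (v2Hits / 100.0) + "%");
    }
}
```

Rolls 0 to 89 land on v1 and rolls 90 to 99 land on v2, which is why raising the v2 weight step by step is a common way to roll out a new version gradually.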

Gateway Configuration

gateway.yaml

apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
  name: microservices-gateway
  namespace: microservices
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - microservices.example.com
    tls:
      httpsRedirect: true
  - port:
      number: 443
      name: https
      protocol: HTTPS
    tls:
      mode: SIMPLE
      credentialName: microservices-tls
    hosts:
    - microservices.example.com
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: microservices-vs
  namespace: microservices
spec:
  hosts:
  - microservices.example.com
  gateways:
  - microservices-gateway
  http:
  - match:
    - uri:
        prefix: /api/users
    route:
    - destination:
        host: user-service
        port:
          number: 8081
  - match:
    - uri:
        prefix: /api/orders
    route:
    - destination:
        host: order-service
        port:
          number: 8082
  - match:
    - uri:
        prefix: /
    route:
    - destination:
        host: api-gateway
        port:
          number: 8080

3. Security Policies

Authentication and Authorization Policies

authentication-policy.yaml

apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: microservices
spec:
  mtls:
    mode: STRICT
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: user-service-authz
  namespace: microservices
spec:
  selector:
    matchLabels:
      app: user-service
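  rules:
  # from, to, and when sit in one rule so that all three conditions must hold;
  # separate list items would be alternatives, and matching any one would allow the request
  - from:
    - source:
        principals: ["cluster.local/ns/microservices/sa/api-gateway"]
    to:
    - operation:
        methods: ["GET", "POST"]
        paths: ["/api/users/*"]
    when:
    - key: request.headers[authorization]
      values: ["Bearer *"]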
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: order-service-authz
  namespace: microservices
spec:
  selector:
    matchLabels:
      app: order-service
  rules:
  # One rule: the caller must be one of the listed principals and the operation must match
  - from:
    - source:
        principals: ["cluster.local/ns/microservices/sa/api-gateway"]
    - source:
        principals: ["cluster.local/ns/microservices/sa/user-service"]
    to:
    - operation:
        methods: ["GET", "POST", "PUT", "DELETE"]
        paths: ["/api/orders/*"]
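
Istio treats the entries under rules as alternatives: a request is allowed when any one rule matches, while the from, to, and when fields inside a single rule must all match. The sketch below models that evaluation in plain Java to show why the grouping matters. It is a simplified, hypothetical model (path matching is reduced to a prefix check) and not Istio's implementation.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical model of ALLOW-policy matching; it is not Istio's implementation. */
final class AuthzSemanticsSketch {

    /** One rule; a null field means "not specified" and matches any request. */
    static final class Rule {
        final Set<String> principals;
        final Set<String> methods;
        final String pathPrefix;

        Rule(Set<String> principals, Set<String> methods, String pathPrefix) {
            this.principals = principals;
            this.methods = methods;
            this.pathPrefix = pathPrefix;
        }

        /** Every field that is set must match (logical AND within a rule). */
        boolean matches(String principal, String method, String path) {
            return (principals == null || principals.contains(principal))
                && (methods == null || methods.contains(method))
                && (pathPrefix == null || path.startsWith(pathPrefix));
        }
    }

    /** The request is allowed if any rule matches (logical OR across rules). */
    static boolean allowed(List<Rule> rules, String principal, String method, String path) {
        return rules.stream().anyMatch(rule -> rule.matches(principal, method, path));
    }

    public static void main(String[] args) {
        String gateway = "cluster.local/ns/microservices/sa/api-gateway";
        String stranger = "cluster.local/ns/other/sa/unknown";

        // Two separate rules: either one alone is enough to allow a request.
        List<Rule> split = List.of(
            new Rule(Set.of(gateway), null, null),
            new Rule(null, Set.of("GET", "POST"), "/api/users/"));

        // One combined rule: source, method, and path must all match.
        List<Rule> combined = List.of(
            new Rule(Set.of(gateway), Set.of("GET", "POST"), "/api/users/"));

        System.out.println(allowed(split, stranger, "GET", "/api/users/1"));    // true
        System.out.println(allowed(combined, stranger, "GET", "/api/users/1")); // false
    }
}
```

With the split form, a caller that is not the gateway still gets through because the method-and-path rule matches on its own; the combined form rejects it.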

11.2 Microservice Testing Strategy

1. The Test Pyramid

Unit Tests

UserServiceTest.java

@ExtendWith(MockitoExtension.class)
class UserServiceTest {
    
    @Mock
    private UserRepository userRepository;
    
    @Mock
    private PasswordEncoder passwordEncoder;
    
    @Mock
    private RedisTemplate<String, Object> redisTemplate;
    
    @InjectMocks
    private UserService userService;
    
    @Test
    @DisplayName("Create user - success")
    void createUser_Success() {
        // Given
        CreateUserRequest request = CreateUserRequest.builder()
            .username("testuser")
            .email("test@example.com")
            .password("password123")
            .build();
        
        User savedUser = User.builder()
            .id(1L)
            .username("testuser")
            .email("test@example.com")
            .password("encoded_password")
            .status(UserStatus.ACTIVE)
            .build();
        
        when(userRepository.existsByUsername("testuser")).thenReturn(false);
        when(userRepository.existsByEmail("test@example.com")).thenReturn(false);
        when(passwordEncoder.encode("password123")).thenReturn("encoded_password");
        when(userRepository.save(any(User.class))).thenReturn(savedUser);
        
        // When
        UserResponse response = userService.createUser(request);
        
        // Then
        assertThat(response).isNotNull();
        assertThat(response.getId()).isEqualTo(1L);
        assertThat(response.getUsername()).isEqualTo("testuser");
        assertThat(response.getEmail()).isEqualTo("test@example.com");
        
        verify(userRepository).existsByUsername("testuser");
        verify(userRepository).existsByEmail("test@example.com");
        verify(passwordEncoder).encode("password123");
        verify(userRepository).save(any(User.class));
    }
    
    @Test
    @DisplayName("Create user - username already exists")
    void createUser_UsernameExists() {
        // Given
        CreateUserRequest request = CreateUserRequest.builder()
            .username("existinguser")
            .email("test@example.com")
            .password("password123")
            .build();
        
        when(userRepository.existsByUsername("existinguser")).thenReturn(true);
        
        // When & Then
        assertThatThrownBy(() -> userService.createUser(request))
            .isInstanceOf(BusinessException.class)
            .hasMessage("Username already exists");
        
        verify(userRepository).existsByUsername("existinguser");
        verify(userRepository, never()).save(any(User.class));
    }
    
    @Test
    @DisplayName("Get user - from cache")
    @SuppressWarnings("unchecked")
    void getUserById_FromCache() {
        // Given
        Long userId = 1L;
        UserResponse cachedUser = UserResponse.builder()
            .id(userId)
            .username("testuser")
            .email("test@example.com")
            .build();

        // A plain @Mock RedisTemplate returns null from opsForValue(), so stub it explicitly
        ValueOperations<String, Object> valueOperations = mock(ValueOperations.class);
        when(redisTemplate.opsForValue()).thenReturn(valueOperations);
        when(valueOperations.get("user:" + userId)).thenReturn(cachedUser);

        // When
        UserResponse response = userService.getUserById(userId);

        // Then
        assertThat(response).isNotNull();
        assertThat(response.getId()).isEqualTo(userId);
        assertThat(response.getUsername()).isEqualTo("testuser");

        verify(valueOperations).get("user:" + userId);
        verify(userRepository, never()).findById(userId);
    }
}

Integration Tests

UserControllerIntegrationTest.java

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
// The Redis host and port are supplied by @DynamicPropertySource from the running container
@TestPropertySource(properties = {
    "spring.datasource.url=jdbc:h2:mem:testdb",
    "spring.jpa.hibernate.ddl-auto=create-drop"
})
@Testcontainers
class UserControllerIntegrationTest {
    
    @Container
    static RedisContainer redis = new RedisContainer("redis:6.2")
        .withExposedPorts(6379);
    
    @Autowired
    private TestRestTemplate restTemplate;
    
    @Autowired
    private UserRepository userRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.redis.host", redis::getHost);
        registry.add("spring.redis.port", redis::getFirstMappedPort);
    }
    
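    @BeforeEach
    void setUp() {
        userRepository.deleteAll();
        // Close the connection after flushing so that it is not leaked between tests
        try (RedisConnection connection = redisTemplate.getConnectionFactory().getConnection()) {
            connection.serverCommands().flushAll();
        }
    }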
    
    @Test
    @DisplayName("Create user - full flow")
    void createUser_FullFlow() {
        // Given
        CreateUserRequest request = CreateUserRequest.builder()
            .username("testuser")
            .email("test@example.com")
            .password("password123")
            .build();
        
        // When
        ResponseEntity<UserResponse> response = restTemplate.postForEntity(
            "/api/users", request, UserResponse.class);
        
        // Then
        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.CREATED);
        assertThat(response.getBody()).isNotNull();
        assertThat(response.getBody().getUsername()).isEqualTo("testuser");
        assertThat(response.getBody().getEmail()).isEqualTo("test@example.com");
        
        // Verify the database
        Optional<User> savedUser = userRepository.findByUsername("testuser");
        assertThat(savedUser).isPresent();
        assertThat(savedUser.get().getEmail()).isEqualTo("test@example.com");
    }
    
    @Test
    @DisplayName("Get user - cache test")
    void getUserById_CacheTest() {
        // Given - create a user
        User user = User.builder()
            .username("testuser")
            .email("test@example.com")
            .password("encoded_password")
            .status(UserStatus.ACTIVE)
            .build();
        User savedUser = userRepository.save(user);
        
        // When - first request
        ResponseEntity<UserResponse> response1 = restTemplate.getForEntity(
            "/api/users/" + savedUser.getId(), UserResponse.class);
        
        // Then - verify the response
        assertThat(response1.getStatusCode()).isEqualTo(HttpStatus.OK);
        assertThat(response1.getBody()).isNotNull();
        
        // When - second request (should be served from the cache)
        ResponseEntity<UserResponse> response2 = restTemplate.getForEntity(
            "/api/users/" + savedUser.getId(), UserResponse.class);
        
        // Then - verify the cached response
        assertThat(response2.getStatusCode()).isEqualTo(HttpStatus.OK);
        assertThat(response2.getBody()).isEqualTo(response1.getBody());
        
        // Verify that the entry exists in Redis
        Object cachedUser = redisTemplate.opsForValue().get("user:" + savedUser.getId());
        assertThat(cachedUser).isNotNull();
    }
}

2. Contract Tests

Pact Consumer Test

UserServiceConsumerTest.java

@ExtendWith(PactConsumerTestExt.class)
@PactTestFor(providerName = "user-service")
class UserServiceConsumerTest {
    
    @Pact(consumer = "order-service")
    public RequestResponsePact getUserPact(PactDslWithProvider builder) {
        return builder
            .given("user exists with id 1")
            .uponReceiving("a request for user with id 1")
            .path("/api/users/1")
            .method("GET")
            .headers(Map.of("Authorization", "Bearer token"))
            .willRespondWith()
            .status(200)
            .headers(Map.of("Content-Type", "application/json"))
            .body(LambdaDsl.newJsonBody(body -> body
                .numberType("id", 1)
                .stringType("username", "testuser")
                .stringType("email", "test@example.com")
                .stringType("status", "ACTIVE")
            ).build())
            .toPact();
    }
    
    @Test
    @PactTestFor(pactMethod = "getUserPact")
    void testGetUser(MockServer mockServer) {
        // Given
        UserServiceClient client = new UserServiceClient(mockServer.getUrl());
        
        // When
        UserResponse user = client.getUserById(1L, "Bearer token");
        
        // Then
        assertThat(user).isNotNull();
        assertThat(user.getId()).isEqualTo(1L);
        assertThat(user.getUsername()).isEqualTo("testuser");
        assertThat(user.getEmail()).isEqualTo("test@example.com");
        assertThat(user.getStatus()).isEqualTo("ACTIVE");
    }
}

Pact Provider Test

UserServiceProviderTest.java

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Provider("user-service")
@PactBroker(url = "http://localhost:9292")
class UserServiceProviderTest {
    
    @LocalServerPort
    private int port;
    
    @Autowired
    private UserRepository userRepository;
    
    @TestTemplate
    @ExtendWith(PactVerificationInvocationContextProvider.class)
    void pactVerificationTestTemplate(PactVerificationContext context) {
        context.verifyInteraction();
    }
    
    @BeforeEach
    void before(PactVerificationContext context) {
        context.setTarget(new HttpTestTarget("localhost", port));
    }
    
    @State("user exists with id 1")
    void userExistsWithId1() {
        User user = User.builder()
            .id(1L)
            .username("testuser")
            .email("test@example.com")
            .password("encoded_password")
            .status(UserStatus.ACTIVE)
            .build();
        userRepository.save(user);
    }
}

3. End-to-End Tests

Selenium Test

UserManagementE2ETest.java

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
@Testcontainers
class UserManagementE2ETest {
    
    // The browser runs inside a container, where "localhost" would refer to the container
    // itself. Expose the host port and reach the application through Testcontainers' host alias.
    static {
        org.testcontainers.Testcontainers.exposeHostPorts(8080);
    }

    private static final String BASE_URL = "http://host.testcontainers.internal:8080";

    @Container
    static BrowserWebDriverContainer<?> chrome = new BrowserWebDriverContainer<>()
        .withCapabilities(new ChromeOptions());

    private WebDriver driver;

    @BeforeEach
    void setUp() {
        driver = new RemoteWebDriver(chrome.getSeleniumAddress(), new ChromeOptions());
    }

    @AfterEach
    void tearDown() {
        if (driver != null) {
            driver.quit();
        }
    }

    @Test
    @DisplayName("User registration flow")
    void userRegistrationFlow() {
        // Given
        driver.get(BASE_URL + "/register");

        // When - fill in the registration form
        WebElement usernameField = driver.findElement(By.id("username"));
        WebElement emailField = driver.findElement(By.id("email"));
        WebElement passwordField = driver.findElement(By.id("password"));
        WebElement submitButton = driver.findElement(By.id("submit"));

        usernameField.sendKeys("testuser");
        emailField.sendKeys("test@example.com");
        passwordField.sendKeys("password123");
        submitButton.click();

        // Then - verify that registration succeeded
        WebDriverWait wait = new WebDriverWait(driver, Duration.ofSeconds(10));
        WebElement successMessage = wait.until(
            ExpectedConditions.presenceOfElementLocated(By.className("success-message"))
        );

        assertThat(successMessage.getText()).contains("Registration successful");

        // Verify the redirect to the login page
        wait.until(ExpectedConditions.urlContains("/login"));
        assertThat(driver.getCurrentUrl()).contains("/login");
    }

    @Test
    @DisplayName("User login flow")
    void userLoginFlow() {
        // Given - create the user first
        createTestUser();

        driver.get(BASE_URL + "/login");

        // When - log in
        WebElement usernameField = driver.findElement(By.id("username"));
        WebElement passwordField = driver.findElement(By.id("password"));
        WebElement loginButton = driver.findElement(By.id("login"));

        usernameField.sendKeys("testuser");
        passwordField.sendKeys("password123");
        loginButton.click();

        // Then - verify that login succeeded
        WebDriverWait wait = new WebDriverWait(driver, Duration.ofSeconds(10));
        wait.until(ExpectedConditions.urlContains("/dashboard"));

        WebElement welcomeMessage = driver.findElement(By.className("welcome-message"));
        assertThat(welcomeMessage.getText()).contains("Welcome, testuser");
    }
    
    private void createTestUser() {
        // Create the test user through the API (this call runs on the host, so localhost is correct)
        RestTemplate restTemplate = new RestTemplate();
        CreateUserRequest request = CreateUserRequest.builder()
            .username("testuser")
            .email("test@example.com")
            .password("password123")
            .build();
        
        restTemplate.postForEntity(
            "http://localhost:8080/api/users", request, UserResponse.class);
    }
}

11.3 Performance Optimization Strategies

1. Caching Strategy

Multi-Level Cache Implementation

MultiLevelCacheManager.java

@Component
@Slf4j
public class MultiLevelCacheManager {
    
    private final Cache<String, Object> localCache;
    private final RedisTemplate<String, Object> redisTemplate;
    private final CacheMetrics cacheMetrics;
    
    public MultiLevelCacheManager(RedisTemplate<String, Object> redisTemplate,
                                 CacheMetrics cacheMetrics) {
        this.redisTemplate = redisTemplate;
        this.cacheMetrics = cacheMetrics;
        this.localCache = Caffeine.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(Duration.ofMinutes(5))
            .recordStats()
            .removalListener((key, value, cause) -> {
                log.debug("Local cache eviction: key={}, cause={}", key, cause);
                cacheMetrics.recordEviction("local", cause.toString());
            })
            .build();
    }
    
    public <T> Optional<T> get(String key, Class<T> type) {
        Timer.Sample sample = Timer.start();

        try {
            // 1. Try the local cache first
            Object localValue = localCache.getIfPresent(key);
            if (localValue != null) {
                cacheMetrics.recordHit("local");
                log.debug("Cache hit from local: key={}", key);
                // type.cast fails fast if the cached value is not of the requested type
                return Optional.of(type.cast(localValue));
            }

            cacheMetrics.recordMiss("local");

            // 2. Fall back to Redis
            Object redisValue = redisTemplate.opsForValue().get(key);
            if (redisValue != null) {
                // Backfill the local cache
                localCache.put(key, redisValue);
                cacheMetrics.recordHit("redis");
                log.debug("Cache hit from Redis: key={}", key);
                return Optional.of(type.cast(redisValue));
            }

            cacheMetrics.recordMiss("redis");
            log.debug("Cache miss: key={}", key);
            return Optional.empty();

        } finally {
            // No per-key tag: tagging by cache key would create one time series per key
            sample.stop(Timer.builder("cache.get")
                .register(Metrics.globalRegistry));
        }
    }
    
    public void put(String key, Object value, Duration ttl) {
        // Store in the local cache
        localCache.put(key, value);
        
        // Store in Redis
        redisTemplate.opsForValue().set(key, value, ttl);
        
        log.debug("Cache put: key={}, ttl={}", key, ttl);
        cacheMetrics.recordPut("multi-level");
    }
    
    public void evict(String key) {
        localCache.invalidate(key);
        redisTemplate.delete(key);
        
        log.debug("Cache evict: key={}", key);
        cacheMetrics.recordEviction("multi-level", "manual");
    }
    
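    public void evictPattern(String pattern) {
        // Evict matching keys from the local cache. The literal parts of the pattern are
        // quoted so that only '*' acts as a wildcard.
        Pattern regex = Pattern.compile(
            Arrays.stream(pattern.split("\\*", -1))
                .map(Pattern::quote)
                .collect(Collectors.joining(".*")));
        localCache.asMap().keySet().stream()
            .filter(key -> regex.matcher(key).matches())
            .forEach(localCache::invalidate);

        // Evict matching keys from Redis. KEYS walks the whole keyspace and blocks the
        // server while it runs; on large datasets a SCAN-based iteration is preferable.
        Set<String> keys = redisTemplate.keys(pattern);
        if (keys != null && !keys.isEmpty()) {
            redisTemplate.delete(keys);
        }

        log.debug("Cache evict pattern: pattern={}", pattern);
    }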
    
    @EventListener
    public void handleCacheInvalidationEvent(CacheInvalidationEvent event) {
        if (event.isPattern()) {
            evictPattern(event.getKey());
        } else {
            evict(event.getKey());
        }
    }
    
    // Register each gauge once against the live cache. Registering inside the scheduled
    // method would bind the gauges to a single CacheStats snapshot that never changes.
    @PostConstruct
    void registerGauges() {
        Gauge.builder("cache.local.size", localCache, Cache::estimatedSize)
            .register(Metrics.globalRegistry);
        Gauge.builder("cache.local.hit_rate", localCache, cache -> cache.stats().hitRate())
            .register(Metrics.globalRegistry);
        Gauge.builder("cache.local.miss_rate", localCache, cache -> cache.stats().missRate())
            .register(Metrics.globalRegistry);
    }

    @Scheduled(fixedRate = 60000) // Runs once per minute
    public void reportCacheStats() {
        CacheStats stats = localCache.stats();

        log.info("Local cache stats: size={}, hitRate={}, missRate={}",
            localCache.estimatedSize(), stats.hitRate(), stats.missRate());
    }
}
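
Pattern eviction has to translate a Redis-style glob such as user:* into something the local cache can match against. Turning '*' into '.*' with a plain string replace leaves other regex metacharacters in the key active, so a pattern can match keys it should not. The following is a small sketch of a safer conversion. The helper name is hypothetical, and it handles only the '*' wildcard, not the '?' and '[...]' forms that Redis globs also support.

```java
import java.util.Arrays;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper: converts a glob that uses only '*' into a regular expression. */
final class GlobPatterns {

    private GlobPatterns() {
    }

    /** Quotes every literal segment and joins the segments with ".*". */
    static Pattern toRegex(String glob) {
        String regex = Arrays.stream(glob.split("\\*", -1)) // -1 keeps a trailing empty segment
            .map(Pattern::quote)
            .collect(Collectors.joining(".*"));
        return Pattern.compile(regex);
    }

    public static void main(String[] args) {
        // Naive conversion: '.' stays a regex wildcard, so an unrelated key matches.
        System.out.println("priceXusd:1".matches("price.usd:*".replace("*", ".*"))); // true
        // Quoted conversion: '.' is treated literally.
        System.out.println(toRegex("price.usd:*").matcher("priceXusd:1").matches()); // false
        System.out.println(toRegex("price.usd:*").matcher("price.usd:1").matches()); // true
    }
}
```

The same compiled pattern can be reused for every key in the local cache, which avoids recompiling the expression once per key.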

Cache Warm-Up Strategy

CacheWarmupService.java

@Service
@Slf4j
@RequiredArgsConstructor
public class CacheWarmupService {
    
    private final UserService userService;
    private final ProductService productService;
    private final MultiLevelCacheManager cacheManager;
    private final ApplicationEventPublisher eventPublisher;
    
    @EventListener(ApplicationReadyEvent.class)
    public void warmupCache() {
        log.info("Starting cache warmup...");
        
        CompletableFuture.allOf(
            CompletableFuture.runAsync(this::warmupUserCache),
            CompletableFuture.runAsync(this::warmupProductCache),
            CompletableFuture.runAsync(this::warmupConfigCache)
        ).thenRun(() -> {
            log.info("Cache warmup completed");
            eventPublisher.publishEvent(new CacheWarmupCompletedEvent());
        }).exceptionally(throwable -> {
            log.error("Cache warmup failed", throwable);
            return null;
        });
    }
    
    private void warmupUserCache() {
        try {
            log.info("Warming up user cache...");
            
            // Warm up data for active users
            List<Long> activeUserIds = userService.getActiveUserIds(1000);
            
            activeUserIds.parallelStream().forEach(userId -> {
                try {
                    UserResponse user = userService.getUserById(userId);
                    cacheManager.put("user:" + userId, user, Duration.ofHours(1));
                } catch (Exception e) {
                    log.warn("Failed to warmup user cache for userId: {}", userId, e);
                }
            });
            
            log.info("User cache warmup completed: {} users", activeUserIds.size());
            
        } catch (Exception e) {
            log.error("User cache warmup failed", e);
        }
    }
    
    private void warmupProductCache() {
        try {
            log.info("Warming up product cache...");
            
            // Warm up data for popular products
            List<Long> popularProductIds = productService.getPopularProductIds(500);
            
            popularProductIds.parallelStream().forEach(productId -> {
                try {
                    ProductResponse product = productService.getProductById(productId);
                    cacheManager.put("product:" + productId, product, Duration.ofHours(2));
                } catch (Exception e) {
                    log.warn("Failed to warmup product cache for productId: {}", productId, e);
                }
            });
            
            log.info("Product cache warmup completed: {} products", popularProductIds.size());
            
        } catch (Exception e) {
            log.error("Product cache warmup failed", e);
        }
    }
    
    private void warmupConfigCache() {
        try {
            log.info("Warming up config cache...");
            
            // Warm up system configuration
            Map<String, String> configs = Map.of(
                "system.maintenance.enabled", "false",
                "feature.new_ui.enabled", "true",
                "rate_limit.default", "1000"
            );
            
            configs.forEach((key, value) -> {
                cacheManager.put("config:" + key, value, Duration.ofMinutes(30));
            });
            
            log.info("Config cache warmup completed: {} configs", configs.size());
            
        } catch (Exception e) {
            log.error("Config cache warmup failed", e);
        }
    }
}

2. Database Optimization

Read/Write Splitting Configuration

DatabaseConfiguration.java

@Configuration
@EnableJpaRepositories(basePackages = "com.example.repository")
public class DatabaseConfiguration {
    
    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create()
            .type(HikariDataSource.class)
            .build();
    }
    
    @Bean
    @ConfigurationProperties("spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create()
            .type(HikariDataSource.class)
            .build();
    }
    
    @Bean
    public DataSource routingDataSource() {
        RoutingDataSource routingDataSource = new RoutingDataSource();
        
        Map<Object, Object> dataSourceMap = new HashMap<>();
        dataSourceMap.put("master", masterDataSource());
        dataSourceMap.put("slave", slaveDataSource());
        
        routingDataSource.setTargetDataSources(dataSourceMap);
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        
        return routingDataSource;
    }
    
    @Bean
    public EntityManagerFactory entityManagerFactory() {
        LocalContainerEntityManagerFactoryBean factory = 
            new LocalContainerEntityManagerFactoryBean();
        
        factory.setDataSource(routingDataSource());
        factory.setPackagesToScan("com.example.entity");
        factory.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
        
        Properties jpaProperties = new Properties();
        jpaProperties.setProperty("hibernate.dialect", "org.hibernate.dialect.MySQL8Dialect");
        jpaProperties.setProperty("hibernate.show_sql", "false");
        jpaProperties.setProperty("hibernate.format_sql", "true");
        jpaProperties.setProperty("hibernate.jdbc.batch_size", "50");
        jpaProperties.setProperty("hibernate.order_inserts", "true");
        jpaProperties.setProperty("hibernate.order_updates", "true");
        jpaProperties.setProperty("hibernate.jdbc.batch_versioned_data", "true");
        
        factory.setJpaProperties(jpaProperties);
        factory.afterPropertiesSet();
        
        return factory.getObject();
    }
    
    @Bean
    public PlatformTransactionManager transactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(entityManagerFactory());
        return transactionManager;
    }
}

Dynamic Data Source Routing

RoutingDataSource.java

public class RoutingDataSource extends AbstractRoutingDataSource {
    
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceContextHolder.getDataSourceType();
    }
}

@Component
public class DataSourceContextHolder {
    
    private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
    
    public static void setDataSourceType(String dataSourceType) {
        CONTEXT_HOLDER.set(dataSourceType);
    }
    
    public static String getDataSourceType() {
        return CONTEXT_HOLDER.get();
    }
    
    public static void clearDataSourceType() {
        CONTEXT_HOLDER.remove();
    }
}

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}

@Aspect
@Component
@Order(1)
public class DataSourceAspect {
    
    @Before("@annotation(readOnly)")
    public void setReadDataSourceType(ReadOnly readOnly) {
        DataSourceContextHolder.setDataSourceType("slave");
    }
    
    @Before("@annotation(org.springframework.transaction.annotation.Transactional)")
    public void setWriteDataSourceType() {
        if (DataSourceContextHolder.getDataSourceType() == null) {
            DataSourceContextHolder.setDataSourceType("master");
        }
    }
    
    @After("@annotation(readOnly) || @annotation(org.springframework.transaction.annotation.Transactional)")
    public void clearDataSourceType() {
        DataSourceContextHolder.clearDataSourceType();
    }
}
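
The aspect above clears the ThreadLocal after every advised method. When advised methods nest, the inner method's return therefore wipes the outer method's key, and routing falls back to the default target. One way to avoid this is to save the previous key and restore it afterwards. The sketch below shows that idea in plain Java; the class and method names are hypothetical and it is not wired into the aspect.

```java
import java.util.function.Supplier;

/** Hypothetical holder that restores the previous routing key when a scope ends. */
final class ScopedRoutingKey {

    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    private ScopedRoutingKey() {
    }

    /** Returns the key for the current thread, or null if none is set. */
    static String current() {
        return CURRENT.get();
    }

    /** Runs the action with the given key, then restores whatever was set before. */
    static <T> T with(String key, Supplier<T> action) {
        String previous = CURRENT.get();
        CURRENT.set(key);
        try {
            return action.get();
        } finally {
            if (previous == null) {
                CURRENT.remove(); // nothing was set before, so leave the thread clean
            } else {
                CURRENT.set(previous);
            }
        }
    }

    public static void main(String[] args) {
        ScopedRoutingKey.with("master", () -> {
            ScopedRoutingKey.with("slave", () -> {
                System.out.println(ScopedRoutingKey.current()); // slave
                return null;
            });
            System.out.println(ScopedRoutingKey.current());     // master
            return null;
        });
        System.out.println(ScopedRoutingKey.current());         // null
    }
}
```

In an aspect, the same save-and-restore logic would typically live in a single around advice with a try/finally block, rather than in separate before and after advices.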

3. Asynchronous Processing Optimization

Async Task Configuration

AsyncConfiguration.java

@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {
    
    @Bean(name = "taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        
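        // Core pool size
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        // Maximum pool size; threads beyond the core are added only once the queue is full
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        // Queue capacity
        executor.setQueueCapacity(500);
        // Thread name prefix
        executor.setThreadNamePrefix("async-task-");
        // Rejection policy: run the task on the submitting thread
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Idle time before surplus threads are released
        executor.setKeepAliveSeconds(60);
        // Wait for queued tasks to finish on shutdown
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // Maximum time to wait on shutdown
        executor.setAwaitTerminationSeconds(60);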
        
        executor.initialize();
        return executor;
    }
    
    @Bean(name = "ioTaskExecutor")
    public ThreadPoolTaskExecutor ioTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        
        // I/O-bound tasks use more threads
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("io-task-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setKeepAliveSeconds(60);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        
        executor.initialize();
        return executor;
    }
    
    @Override
    public Executor getAsyncExecutor() {
        return taskExecutor();
    }
    
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }
}

@Component
@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    
    @Override
    public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
        log.error("Async method execution failed: method={}, args={}", 
            method.getName(), Arrays.toString(objects), throwable);
        
        // Send an alert
        // alertService.sendAlert("Async task failed", throwable.getMessage());
    }
}
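
ThreadPoolTaskExecutor delegates to java.util.concurrent.ThreadPoolExecutor, which starts threads beyond the core size only after the queue is full and hands a task to the rejection handler only when both the queue and the pool are full. With a queue capacity of 500, the maximum pool size above is therefore reached only under heavy backlog. The sketch below makes the order visible with deliberately tiny limits (core 1, max 2, queue 1). The class name and the numbers are illustrative assumptions.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Illustrative demo of core size, queue, max size, and CallerRunsPolicy interacting. */
final class PoolGrowthSketch {

    private PoolGrowthSketch() {
    }

    /** Submits four tasks and reports where each one ran plus the largest pool size seen. */
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 2, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Thread caller = Thread.currentThread();
        List<String> placements = new CopyOnWriteArrayList<>();

        for (int i = 1; i <= 4; i++) {
            int task = i;
            pool.execute(() -> {
                boolean onCaller = Thread.currentThread() == caller;
                placements.add("task" + task + (onCaller ? ":caller" : ":pool"));
                if (!onCaller) {
                    // Pool threads stay busy until released, so the pool remains saturated.
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        // Task 1 took the core thread, task 2 was queued, task 3 forced a second thread,
        // and task 4 was rejected and ran on the submitting thread.
        int largest = pool.getLargestPoolSize();

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        placements.add("largestPoolSize=" + largest);
        return placements;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because CallerRunsPolicy executes the rejected task on the submitting thread, it slows the producer down instead of dropping work, which acts as a simple form of back-pressure.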

11.4 Microservice Governance

1. Service Version Management

Versioning Strategy

VersionController.java

@RestController
@RequestMapping("/api/v1/version")
@Api(tags = "Version Management")
@RequiredArgsConstructor
public class VersionController {
    
    private final VersionService versionService;
    
    @GetMapping("/info")
    @ApiOperation("Get service version information")
    public ResponseEntity<VersionInfo> getVersionInfo() {
        VersionInfo versionInfo = versionService.getCurrentVersionInfo();
        return ResponseEntity.ok(versionInfo);
    }
    
    @GetMapping("/compatibility")
    @ApiOperation("Check version compatibility")
    public ResponseEntity<CompatibilityResult> checkCompatibility(
            @RequestParam String targetVersion) {
        CompatibilityResult result = versionService.checkCompatibility(targetVersion);
        return ResponseEntity.ok(result);
    }
    
    @PostMapping("/migrate")
    @ApiOperation("Run a version migration")
    public ResponseEntity<MigrationResult> migrateVersion(
            @RequestBody MigrationRequest request) {
        MigrationResult result = versionService.migrateToVersion(request);
        return ResponseEntity.ok(result);
    }
}

@Service
@Slf4j
@RequiredArgsConstructor // Lombok: constructor injection for the final fields only
public class VersionService {
    
    private final VersionRepository versionRepository;
    private final MigrationExecutor migrationExecutor;
    
    @Value("${app.version}")
    private String currentVersion;
    
    @Value("${app.build.time}")
    private String buildTime;
    
    @Value("${app.git.commit}")
    private String gitCommit;
    
    public VersionInfo getCurrentVersionInfo() {
        return VersionInfo.builder()
            .version(currentVersion)
            .buildTime(buildTime)
            .gitCommit(gitCommit)
            .supportedApiVersions(getSupportedApiVersions())
            .deprecatedFeatures(getDeprecatedFeatures())
            .build();
    }
    
    public CompatibilityResult checkCompatibility(String targetVersion) {
        try {
            Version current = Version.parse(currentVersion);
            Version target = Version.parse(targetVersion);
            
            boolean isCompatible = isVersionCompatible(current, target);
            List<String> warnings = getCompatibilityWarnings(current, target);
            List<String> breakingChanges = getBreakingChanges(current, target);
            
            return CompatibilityResult.builder()
                .compatible(isCompatible)
                .warnings(warnings)
                .breakingChanges(breakingChanges)
                .migrationRequired(!breakingChanges.isEmpty())
                .build();
                
        } catch (Exception e) {
            log.error("Failed to check version compatibility", e);
            throw new BusinessException("Version compatibility check failed");
        }
    }
    
    @Transactional
    public MigrationResult migrateToVersion(MigrationRequest request) {
        try {
            log.info("Starting migration from {} to {}", currentVersion, request.getTargetVersion());
            
            // 1. Validate the migration preconditions
            validateMigrationPreconditions(request);
            
            // 2. Create the migration record
            Migration migration = createMigrationRecord(request);
            
            // 3. Execute the migration steps
            List<MigrationStep> steps = migrationExecutor.executeMigration(migration);
            
            // 4. Validate the migration result
            validateMigrationResult(migration);
            
            // 5. Update the version information
            updateVersionInfo(request.getTargetVersion());
            
            log.info("Migration completed successfully");
            
            return MigrationResult.builder()
                .success(true)
                .migrationId(migration.getId())
                .executedSteps(steps)
                .duration(migration.getDuration())
                .build();
                
        } catch (Exception e) {
            log.error("Migration failed", e);
            // Roll back the migration
            rollbackMigration(request);
            throw new BusinessException("Version migration failed: " + e.getMessage());
        }
    }
    
    private boolean isVersionCompatible(Version current, Version target) {
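        // Different major versions are never compatible
        if (current.getMajor() != target.getMajor()) {
            return false;
        }
        
        // Within one major version, the target is compatible if its minor version is not lower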
        return target.getMinor() >= current.getMinor();
    }
    
    private List<String> getSupportedApiVersions() {
        return Arrays.asList("v1", "v2");
    }
    
    private List<String> getDeprecatedFeatures() {
        return Arrays.asList(
            "Legacy user authentication (to be removed in v3.0)",
            "Old order status API (to be removed in v2.5)"
        );
    }
}
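
`VersionService` relies on a `Version` type with `parse`, `getMajor` and `getMinor`, which the listing does not show. A minimal sketch of what such a type could look like follows. The class name `SemanticVersion`, the accepted `major.minor[.patch]` format, and the static `isCompatible` helper are assumptions for illustration, not the tutorial's actual class.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the Version type used by VersionService.
final class SemanticVersion {

    // Accepts "major.minor" or "major.minor.patch", digits only (assumed format).
    private static final Pattern FORMAT =
            Pattern.compile("(\\d{1,9})\\.(\\d{1,9})(?:\\.(\\d{1,9}))?");

    private final int major;
    private final int minor;
    private final int patch;

    private SemanticVersion(int major, int minor, int patch) {
        this.major = major;
        this.minor = minor;
        this.patch = patch;
    }

    /** Parses a version string; a missing patch component defaults to 0. */
    static SemanticVersion parse(String text) {
        if (text == null) {
            throw new IllegalArgumentException("version must not be null");
        }
        Matcher m = FORMAT.matcher(text.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Invalid version: " + text);
        }
        int patch = m.group(3) == null ? 0 : Integer.parseInt(m.group(3));
        return new SemanticVersion(
                Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)), patch);
    }

    int getMajor() { return major; }
    int getMinor() { return minor; }
    int getPatch() { return patch; }

    /** Same rule as isVersionCompatible: equal major, target minor not lower than current. */
    static boolean isCompatible(SemanticVersion current, SemanticVersion target) {
        return current.major == target.major && target.minor >= current.minor;
    }

    @Override
    public String toString() {
        return major + "." + minor + "." + patch;
    }
}
```

Under this rule, moving from 2.3.0 to 2.5.0 counts as compatible, while moving to 3.0.0 or back to 2.1.0 does not. The patch component is parsed but plays no part in the check.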

2. Canary Release

Canary Release Controller

CanaryDeploymentController.java

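@RestController
@RequestMapping("/api/v1/canary")
@Api(tags = "Canary Release")
@RequiredArgsConstructor // Lombok: generates the constructor used to inject the final field
public class CanaryDeploymentController {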
    
    private final CanaryDeploymentService canaryService;
    
    @PostMapping("/start")
    @ApiOperation("Start a canary release")
    public ResponseEntity<CanaryDeployment> startCanaryDeployment(
            @RequestBody @Valid StartCanaryRequest request) {
        CanaryDeployment deployment = canaryService.startCanaryDeployment(request);
        return ResponseEntity.ok(deployment);
    }
    
    @GetMapping("/{deploymentId}/status")
    @ApiOperation("Get canary release status")
    public ResponseEntity<CanaryStatus> getCanaryStatus(
            @PathVariable String deploymentId) {
        CanaryStatus status = canaryService.getCanaryStatus(deploymentId);
        return ResponseEntity.ok(status);
    }
    
    @PostMapping("/{deploymentId}/promote")
    @ApiOperation("Promote the canary version")
    public ResponseEntity<Void> promoteCanary(
            @PathVariable String deploymentId,
            @RequestBody @Valid PromoteCanaryRequest request) {
        canaryService.promoteCanary(deploymentId, request);
        return ResponseEntity.ok().build();
    }
    
    @PostMapping("/{deploymentId}/rollback")
    @ApiOperation("Roll back the canary version")
    public ResponseEntity<Void> rollbackCanary(
            @PathVariable String deploymentId) {
        canaryService.rollbackCanary(deploymentId);
        return ResponseEntity.ok().build();
    }
}

@Service
@Slf4j
@RequiredArgsConstructor // Lombok: constructor injection for the final fields
public class CanaryDeploymentService {
    
    private final CanaryDeploymentRepository deploymentRepository;
    private final TrafficSplitter trafficSplitter;
    private final MetricsCollector metricsCollector;
    private final AlertService alertService;
    
    @Transactional
    public CanaryDeployment startCanaryDeployment(StartCanaryRequest request) {
        log.info("Starting canary deployment: {}", request);
        
        // 1. Validate the deployment conditions
        validateDeploymentConditions(request);
        
        // 2. Create the canary deployment record
        CanaryDeployment deployment = CanaryDeployment.builder()
            .id(UUID.randomUUID().toString())
            .serviceName(request.getServiceName())
            .currentVersion(request.getCurrentVersion())
            .canaryVersion(request.getCanaryVersion())
            .trafficPercentage(request.getInitialTrafficPercentage())
            .status(CanaryStatus.STARTING)
            .strategy(request.getStrategy())
            .createdAt(Instant.now())
            .build();
        
        deploymentRepository.save(deployment);
        
        // 3. Deploy the canary version
        deployCanaryVersion(deployment);
        
        // 4. Configure the traffic split
        trafficSplitter.configureTrafficSplit(
            deployment.getServiceName(),
            deployment.getTrafficPercentage()
        );
        
        // 5. Start monitoring
        startCanaryMonitoring(deployment);
        
        deployment.setStatus(CanaryStatus.RUNNING);
        deploymentRepository.save(deployment);
        
        log.info("Canary deployment started: {}", deployment.getId());
        return deployment;
    }
    
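    // Note: this listing uses the name CanaryStatus for two things: the lifecycle enum
    // (STARTING, RUNNING, ...) and the status report built here. Real code needs two
    // distinct types, for example an enum plus a separate report class.
    public CanaryStatus getCanaryStatus(String deploymentId) {
        CanaryDeployment deployment = getDeploymentById(deploymentId);
        
        // Collect the current metrics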
        CanaryMetrics metrics = metricsCollector.collectCanaryMetrics(
            deployment.getServiceName(),
            deployment.getCanaryVersion()
        );
        
        return CanaryStatus.builder()
            .deploymentId(deploymentId)
            .status(deployment.getStatus())
            .trafficPercentage(deployment.getTrafficPercentage())
            .metrics(metrics)
            .healthChecks(getHealthChecks(deployment))
            .duration(Duration.between(deployment.getCreatedAt(), Instant.now()))
            .build();
    }
    
    @Transactional
    public void promoteCanary(String deploymentId, PromoteCanaryRequest request) {
        CanaryDeployment deployment = getDeploymentById(deploymentId);
        
        if (deployment.getStatus() != CanaryStatus.RUNNING) {
            throw new BusinessException("Only a running canary deployment can be promoted");
        }
        
        log.info("Promoting canary deployment: {}", deploymentId);
        
        try {
            // 1. Increase traffic gradually
            promoteTrafficGradually(deployment, request.getTargetPercentage());
            
            // 2. On a full promotion, replace the production version
            if (request.getTargetPercentage() == 100) {
                replaceProductionVersion(deployment);
                deployment.setStatus(CanaryStatus.PROMOTED);
            } else {
                deployment.setTrafficPercentage(request.getTargetPercentage());
            }
            
            deploymentRepository.save(deployment);
            
            log.info("Canary promotion completed: {}", deploymentId);
            
        } catch (Exception e) {
            log.error("Canary promotion failed: {}", deploymentId, e);
            alertService.sendAlert("Canary promotion failed", e.getMessage());
            throw new BusinessException("Canary promotion failed: " + e.getMessage());
        }
    }
    
    @Transactional
    public void rollbackCanary(String deploymentId) {
        CanaryDeployment deployment = getDeploymentById(deploymentId);
        
        log.info("Rolling back canary deployment: {}", deploymentId);
        
        try {
            // 1. Stop routing traffic to the canary version
            trafficSplitter.stopCanaryTraffic(deployment.getServiceName());
            
            // 2. Remove the canary version
            removeCanaryVersion(deployment);
            
            // 3. Stop monitoring
            stopCanaryMonitoring(deployment);
            
            deployment.setStatus(CanaryStatus.ROLLED_BACK);
            deployment.setRolledBackAt(Instant.now());
            deploymentRepository.save(deployment);
            
            log.info("Canary rollback completed: {}", deploymentId);
            
        } catch (Exception e) {
            log.error("Canary rollback failed: {}", deploymentId, e);
            throw new BusinessException("Canary rollback failed: " + e.getMessage());
        }
    }
    
    @Scheduled(fixedRate = 30000) // check every 30 seconds
    public void monitorCanaryDeployments() {
        List<CanaryDeployment> runningDeployments = 
            deploymentRepository.findByStatus(CanaryStatus.RUNNING);
        
        for (CanaryDeployment deployment : runningDeployments) {
            try {
                checkCanaryHealth(deployment);
            } catch (Exception e) {
                log.error("Failed to check canary health: {}", deployment.getId(), e);
            }
        }
    }
    
    private void checkCanaryHealth(CanaryDeployment deployment) {
        CanaryMetrics metrics = metricsCollector.collectCanaryMetrics(
            deployment.getServiceName(),
            deployment.getCanaryVersion()
        );
        
        // Check the error rate
        if (metrics.getErrorRate() > deployment.getStrategy().getMaxErrorRate()) {
            log.warn("Canary error rate too high: {} > {}", 
                metrics.getErrorRate(), deployment.getStrategy().getMaxErrorRate());
            
            alertService.sendAlert(
                "Canary error rate exceeded threshold",
                String.format("Deployment: %s, Error rate: %.2f%%", 
                    deployment.getId(), metrics.getErrorRate() * 100)
            );
            
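            // Automatic rollback. This is a self-invocation, so with Spring's default
            // proxy-based transactions the @Transactional on rollbackCanary is not applied here.
            if (deployment.getStrategy().isAutoRollback()) {
                rollbackCanary(deployment.getId());
                return; // the canary has been removed; skip the remaining checks
            }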
        }
        
        // Check the response time
        if (metrics.getAvgResponseTime() > deployment.getStrategy().getMaxResponseTime()) {
            log.warn("Canary response time too high: {} > {}", 
                metrics.getAvgResponseTime(), deployment.getStrategy().getMaxResponseTime());
            
            alertService.sendAlert(
                "Canary response time exceeded threshold",
                String.format("Deployment: %s, Response time: %dms", 
                    deployment.getId(), metrics.getAvgResponseTime())
            );
        }
    }
}
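
`promoteCanary` delegates to `promoteTrafficGradually`, which the listing does not show. One plausible way to plan such a ramp is to compute the intermediate traffic percentages up front and apply them one by one. The helper below is a sketch under that assumption; the class name `TrafficRamp`, the method `steps`, and the fixed step size are hypothetical and not taken from the tutorial.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the tutorial's promoteTrafficGradually is not shown.
final class TrafficRamp {

    /**
     * Returns the traffic percentages to apply, in order, when ramping the canary
     * from currentPercentage up to targetPercentage in increments of stepSize.
     * The last step is clamped so it lands exactly on the target.
     */
    static List<Integer> steps(int currentPercentage, int targetPercentage, int stepSize) {
        if (currentPercentage < 0 || targetPercentage > 100
                || currentPercentage > targetPercentage) {
            throw new IllegalArgumentException(
                    "Expected 0 <= current <= target <= 100, got "
                            + currentPercentage + " and " + targetPercentage);
        }
        if (stepSize <= 0) {
            throw new IllegalArgumentException("stepSize must be positive");
        }
        List<Integer> steps = new ArrayList<>();
        int next = currentPercentage;
        while (next < targetPercentage) {
            next = Math.min(next + stepSize, targetPercentage);
            steps.add(next);
        }
        return steps;
    }

    public static void main(String[] args) {
        // Plan a ramp from 10% to 100% in 25-point increments.
        System.out.println(steps(10, 100, 25));
    }
}
```

In a service like the one above, each planned percentage would presumably be passed to `trafficSplitter.configureTrafficSplit`, with a pause between steps so the scheduled health check can observe the canary before more traffic is shifted.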

Summary

This chapter covered advanced microservice topics and best practices, including the following core content:

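Core Concepts

  1. Service mesh - Istio architecture, traffic management, security policies
  2. Microservice testing - the test pyramid, contract testing, end-to-end testing
  3. Performance optimization - multi-level caching, database optimization, asynchronous processing
  4. Microservice governance - version management, canary releases, traffic control

Best Practices

  1. Architecture design - service decomposition principles, domain-driven design, event-driven architecture
  2. Development practices - coding standards, testing strategy, continuous integration
  3. Operations practices - monitoring and alerting, log management, incident handling
  4. Governance practices - service governance, version control, release strategy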

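Caveats

  1. Managing complexity - avoid over-splitting services, design boundaries sensibly
  2. Performance considerations - network latency, data consistency, caching strategy
  3. Security - service-to-service authentication, data encryption, access control
  4. Operational cost - monitoring complexity, deployment complexity, team skill requirements

Further Directions

  1. Cloud-native technologies - Kubernetes, service mesh, Serverless
  2. Emerging technologies - edge computing, AI/ML integration, blockchain
  3. Industry practice - financial-grade microservices, e-commerce microservices, IoT microservices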

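Having worked through this tutorial, you have covered the core techniques and best practices of Spring Cloud microservice development. We hope this knowledge helps you build high-quality, highly available microservice systems in real projects.

Microservice architecture is a constantly evolving field. Keep following new technologies and best practices, and keep refining your own systems as you apply them.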