11.1 服务网格 (Service Mesh)
1. Istio 简介
服务网格是一个专用的基础设施层，用于处理服务间通信。Istio 是目前最流行的服务网格解决方案之一。
Istio 架构组件
istio-system.yaml
# Namespace that hosts the Istio control plane.
apiVersion: v1
kind: Namespace
metadata:
  name: istio-system
  labels:
    # The control-plane namespace itself is excluded from sidecar injection.
    istio-injection: disabled
---
# IstioOperator describing the control plane plus one ingress and one egress gateway.
apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
metadata:
  name: control-plane
  namespace: istio-system
spec:
  values:
    global:
      # Mesh / cluster / network identity.
      meshID: mesh1
      multiCluster:
        clusterName: cluster1
      network: network1
  components:
    # istiod (pilot) resource sizing.
    pilot:
      k8s:
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi
    ingressGateways:
      - name: istio-ingressgateway
        enabled: true
        k8s:
          service:
            type: LoadBalancer
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 512Mi
    egressGateways:
      - name: istio-egressgateway
        enabled: true
        k8s:
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 512Mi
2. 流量管理
虚拟服务配置
virtual-service.yaml
# Routing for user-service: header-based override to v2, otherwise a 90/10 split.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: user-service-vs
  namespace: microservices
spec:
  hosts:
    - user-service
  http:
    # Requests carrying the header "version: v2" are sent entirely to subset v2.
    - match:
        - headers:
            version:
              exact: v2
      route:
        - destination:
            host: user-service
            subset: v2
          weight: 100
      # Retries are a per-route VirtualService setting (HTTPRetry);
      # DestinationRule has no retry field.
      retries:
        attempts: 3
        perTryTimeout: 2s
        retryOn: gateway-error,connect-failure,refused-stream
    # Default route: 90% of traffic to v1, 10% to v2.
    - route:
        - destination:
            host: user-service
            subset: v1
          weight: 90
        - destination:
            host: user-service
            subset: v2
          weight: 10
      retries:
        attempts: 3
        perTryTimeout: 2s
        retryOn: gateway-error,connect-failure,refused-stream
---
# Connection limits, outlier ejection and version subsets for user-service.
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: user-service-dr
  namespace: microservices
spec:
  host: user-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        maxRequestsPerConnection: 10
    # Circuit breaking in Istio is configured through outlierDetection;
    # "circuitBreaker" is not a DestinationRule field.
    outlierDetection:
      consecutiveGatewayErrors: 5
      consecutive5xxErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
  subsets:
    - name: v1
      labels:
        version: v1
    - name: v2
      labels:
        version: v2
网关配置
gateway.yaml
# Edge gateway for microservices.example.com: plain HTTP is redirected to HTTPS.
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
  name: microservices-gateway
  namespace: microservices
spec:
  selector:
    istio: ingressgateway
  servers:
    # Port 80 only exists to redirect clients to HTTPS.
    - port:
        number: 80
        name: http
        protocol: HTTP
      hosts:
        - microservices.example.com
      tls:
        httpsRedirect: true
    # Port 443 terminates TLS using the referenced credential.
    - port:
        number: 443
        name: https
        protocol: HTTPS
      tls:
        mode: SIMPLE
        credentialName: microservices-tls
      hosts:
        - microservices.example.com
---
# Path-based routing for traffic entering through microservices-gateway.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: microservices-vs
  namespace: microservices
spec:
  hosts:
    - microservices.example.com
  gateways:
    - microservices-gateway
  http:
    # Routes are evaluated in order, so the catch-all "/" prefix must stay last.
    - match:
        - uri:
            prefix: /api/users
      route:
        - destination:
            host: user-service
            port:
              number: 8081
    - match:
        - uri:
            prefix: /api/orders
      route:
        - destination:
            host: order-service
            port:
              number: 8082
    - match:
        - uri:
            prefix: /
      route:
        - destination:
            host: api-gateway
            port:
              number: 8080
3. 安全策略
认证策略
authentication-policy.yaml
# Namespace-wide peer authentication: workloads in "microservices" accept mTLS only.
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: microservices
spec:
  mtls:
    mode: STRICT
---
# Access policy for user-service.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: user-service-authz
  namespace: microservices
spec:
  selector:
    matchLabels:
      app: user-service
  action: ALLOW
  rules:
    # from / to / when must live in ONE rule so that they are AND-ed.
    # Written as three separate list items they are OR-ed, which would allow
    # any request that satisfies just one of the conditions.
    - from:
        - source:
            principals: ["cluster.local/ns/microservices/sa/api-gateway"]
      to:
        - operation:
            methods: ["GET", "POST"]
            # "/api/users/*" is a prefix match and does not cover the exact
            # collection path, so "/api/users" is listed explicitly.
            paths: ["/api/users", "/api/users/*"]
      when:
        # Only checks that a Bearer header is present; it does not validate the token.
        - key: request.headers[authorization]
          values: ["Bearer *"]
---
# Access policy for order-service.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: order-service-authz
  namespace: microservices
spec:
  selector:
    matchLabels:
      app: order-service
  action: ALLOW
  rules:
    # Single rule: (api-gateway OR user-service) AND (listed methods on /api/orders/*).
    # Keeping "to" as a separate list item would OR it with "from" and allow
    # any caller to reach these paths.
    - from:
        - source:
            principals: ["cluster.local/ns/microservices/sa/api-gateway"]
        - source:
            principals: ["cluster.local/ns/microservices/sa/user-service"]
      to:
        - operation:
            methods: ["GET", "POST", "PUT", "DELETE"]
            paths: ["/api/orders/*"]
11.2 微服务测试策略
1. 测试金字塔
单元测试
UserServiceTest.java
@ExtendWith(MockitoExtension.class)
class UserServiceTest {
@Mock
private UserRepository userRepository;
@Mock
private PasswordEncoder passwordEncoder;
@Mock
private RedisTemplate<String, Object> redisTemplate;
@InjectMocks
private UserService userService;
@Test
@DisplayName("创建用户 - 成功")
void createUser_Success() {
// Given
CreateUserRequest request = CreateUserRequest.builder()
.username("testuser")
.email("test@example.com")
.password("password123")
.build();
User savedUser = User.builder()
.id(1L)
.username("testuser")
.email("test@example.com")
.password("encoded_password")
.status(UserStatus.ACTIVE)
.build();
when(userRepository.existsByUsername("testuser")).thenReturn(false);
when(userRepository.existsByEmail("test@example.com")).thenReturn(false);
when(passwordEncoder.encode("password123")).thenReturn("encoded_password");
when(userRepository.save(any(User.class))).thenReturn(savedUser);
// When
UserResponse response = userService.createUser(request);
// Then
assertThat(response).isNotNull();
assertThat(response.getId()).isEqualTo(1L);
assertThat(response.getUsername()).isEqualTo("testuser");
assertThat(response.getEmail()).isEqualTo("test@example.com");
verify(userRepository).existsByUsername("testuser");
verify(userRepository).existsByEmail("test@example.com");
verify(passwordEncoder).encode("password123");
verify(userRepository).save(any(User.class));
}
@Test
@DisplayName("创建用户 - 用户名已存在")
void createUser_UsernameExists() {
// Given
CreateUserRequest request = CreateUserRequest.builder()
.username("existinguser")
.email("test@example.com")
.password("password123")
.build();
when(userRepository.existsByUsername("existinguser")).thenReturn(true);
// When & Then
assertThatThrownBy(() -> userService.createUser(request))
.isInstanceOf(BusinessException.class)
.hasMessage("用户名已存在");
verify(userRepository).existsByUsername("existinguser");
verify(userRepository, never()).save(any(User.class));
}
@Test
@DisplayName("获取用户信息 - 从缓存")
void getUserById_FromCache() {
// Given
Long userId = 1L;
UserResponse cachedUser = UserResponse.builder()
.id(userId)
.username("testuser")
.email("test@example.com")
.build();
when(redisTemplate.opsForValue().get("user:" + userId))
.thenReturn(cachedUser);
// When
UserResponse response = userService.getUserById(userId);
// Then
assertThat(response).isNotNull();
assertThat(response.getId()).isEqualTo(userId);
assertThat(response.getUsername()).isEqualTo("testuser");
verify(redisTemplate.opsForValue()).get("user:" + userId);
verify(userRepository, never()).findById(userId);
}
}
集成测试
UserControllerIntegrationTest.java
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = {
"spring.datasource.url=jdbc:h2:mem:testdb",
"spring.jpa.hibernate.ddl-auto=create-drop",
"spring.redis.host=localhost",
"spring.redis.port=6370"
})
@Testcontainers
class UserControllerIntegrationTest {
@Container
static RedisContainer redis = new RedisContainer("redis:6.2")
.withExposedPorts(6379);
@Autowired
private TestRestTemplate restTemplate;
@Autowired
private UserRepository userRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@DynamicPropertySource
static void configureProperties(DynamicPropertyRegistry registry) {
registry.add("spring.redis.host", redis::getHost);
registry.add("spring.redis.port", redis::getFirstMappedPort);
}
@BeforeEach
void setUp() {
userRepository.deleteAll();
redisTemplate.getConnectionFactory().getConnection().flushAll();
}
@Test
@DisplayName("创建用户 - 完整流程")
void createUser_FullFlow() {
// Given
CreateUserRequest request = CreateUserRequest.builder()
.username("testuser")
.email("test@example.com")
.password("password123")
.build();
// When
ResponseEntity<UserResponse> response = restTemplate.postForEntity(
"/api/users", request, UserResponse.class);
// Then
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.CREATED);
assertThat(response.getBody()).isNotNull();
assertThat(response.getBody().getUsername()).isEqualTo("testuser");
assertThat(response.getBody().getEmail()).isEqualTo("test@example.com");
// 验证数据库
Optional<User> savedUser = userRepository.findByUsername("testuser");
assertThat(savedUser).isPresent();
assertThat(savedUser.get().getEmail()).isEqualTo("test@example.com");
}
@Test
@DisplayName("获取用户信息 - 缓存测试")
void getUserById_CacheTest() {
// Given - 创建用户
User user = User.builder()
.username("testuser")
.email("test@example.com")
.password("encoded_password")
.status(UserStatus.ACTIVE)
.build();
User savedUser = userRepository.save(user);
// When - 第一次请求
ResponseEntity<UserResponse> response1 = restTemplate.getForEntity(
"/api/users/" + savedUser.getId(), UserResponse.class);
// Then - 验证响应
assertThat(response1.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response1.getBody()).isNotNull();
// When - 第二次请求(应该从缓存获取)
ResponseEntity<UserResponse> response2 = restTemplate.getForEntity(
"/api/users/" + savedUser.getId(), UserResponse.class);
// Then - 验证缓存
assertThat(response2.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response2.getBody()).isEqualTo(response1.getBody());
// 验证Redis中有缓存
Object cachedUser = redisTemplate.opsForValue().get("user:" + savedUser.getId());
assertThat(cachedUser).isNotNull();
}
}
2. 契约测试
Pact 消费者测试
UserServiceConsumerTest.java
/**
 * Pact consumer test: records what order-service expects from user-service and
 * exercises the real client against the Pact mock server.
 */
@ExtendWith(PactConsumerTestExt.class)
@PactTestFor(providerName = "user-service")
class UserServiceConsumerTest {

    // No mocked client field here: the test builds a real UserServiceClient
    // that talks to the Pact mock server.

    /** Contract: GET /api/users/1 with a bearer token returns the user as JSON. */
    @Pact(consumer = "order-service")
    public RequestResponsePact getUserPact(PactDslWithProvider builder) {
        return builder
                .given("user exists with id 1")
                .uponReceiving("a request for user with id 1")
                .path("/api/users/1")
                .method("GET")
                .headers(Map.of("Authorization", "Bearer token"))
                .willRespondWith()
                .status(200)
                .headers(Map.of("Content-Type", "application/json"))
                .body(LambdaDsl.newJsonBody(body -> body
                        .numberType("id", 1)
                        .stringType("username", "testuser")
                        .stringType("email", "test@example.com")
                        .stringType("status", "ACTIVE")
                ).build())
                .toPact();
    }

    /** The client maps the contract response onto {@code UserResponse}. */
    @Test
    @PactTestFor(pactMethod = "getUserPact")
    void testGetUser(MockServer mockServer) {
        // Given
        UserServiceClient client = new UserServiceClient(mockServer.getUrl());

        // When
        UserResponse user = client.getUserById(1L, "Bearer token");

        // Then
        assertThat(user).isNotNull();
        assertThat(user.getId()).isEqualTo(1L);
        assertThat(user.getUsername()).isEqualTo("testuser");
        assertThat(user.getEmail()).isEqualTo("test@example.com");
        assertThat(user.getStatus()).isEqualTo("ACTIVE");
    }
}
Pact 提供者测试
UserServiceProviderTest.java
/**
 * Pact provider verification: replays the contracts published for "user-service"
 * against the application started on a random local port.
 */
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Provider("user-service")
@PactBroker(url = "http://localhost:9292")
class UserServiceProviderTest {

    @LocalServerPort
    private int port;

    @Autowired
    private UserRepository userRepository;

    /** Directs every verification request at the locally started server. */
    @BeforeEach
    void before(PactVerificationContext context) {
        HttpTestTarget localTarget = new HttpTestTarget("localhost", port);
        context.setTarget(localTarget);
    }

    /** Invoked once per interaction found in the pacts. */
    @TestTemplate
    @ExtendWith(PactVerificationInvocationContextProvider.class)
    void pactVerificationTestTemplate(PactVerificationContext context) {
        context.verifyInteraction();
    }

    /** Provider state "user exists with id 1": stores the user the contract expects. */
    @State("user exists with id 1")
    void userExistsWithId1() {
        userRepository.save(
                User.builder()
                        .id(1L)
                        .username("testuser")
                        .email("test@example.com")
                        .password("encoded_password")
                        .status(UserStatus.ACTIVE)
                        .build());
    }
}
3. 端到端测试
Selenium 测试
UserManagementE2ETest.java
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
@Testcontainers
class UserManagementE2ETest {
@Container
static BrowserWebDriverContainer<?> chrome = new BrowserWebDriverContainer<>()
.withCapabilities(new ChromeOptions());
private WebDriver driver;
@BeforeEach
void setUp() {
driver = chrome.getWebDriver();
}
@Test
@DisplayName("用户注册流程")
void userRegistrationFlow() {
// Given
driver.get("http://localhost:8080/register");
// When - 填写注册表单
WebElement usernameField = driver.findElement(By.id("username"));
WebElement emailField = driver.findElement(By.id("email"));
WebElement passwordField = driver.findElement(By.id("password"));
WebElement submitButton = driver.findElement(By.id("submit"));
usernameField.sendKeys("testuser");
emailField.sendKeys("test@example.com");
passwordField.sendKeys("password123");
submitButton.click();
// Then - 验证注册成功
WebDriverWait wait = new WebDriverWait(driver, Duration.ofSeconds(10));
WebElement successMessage = wait.until(
ExpectedConditions.presenceOfElementLocated(By.className("success-message"))
);
assertThat(successMessage.getText()).contains("注册成功");
// 验证跳转到登录页面
wait.until(ExpectedConditions.urlContains("/login"));
assertThat(driver.getCurrentUrl()).contains("/login");
}
@Test
@DisplayName("用户登录流程")
void userLoginFlow() {
// Given - 先创建用户
createTestUser();
driver.get("http://localhost:8080/login");
// When - 登录
WebElement usernameField = driver.findElement(By.id("username"));
WebElement passwordField = driver.findElement(By.id("password"));
WebElement loginButton = driver.findElement(By.id("login"));
usernameField.sendKeys("testuser");
passwordField.sendKeys("password123");
loginButton.click();
// Then - 验证登录成功
WebDriverWait wait = new WebDriverWait(driver, Duration.ofSeconds(10));
wait.until(ExpectedConditions.urlContains("/dashboard"));
WebElement welcomeMessage = driver.findElement(By.className("welcome-message"));
assertThat(welcomeMessage.getText()).contains("欢迎, testuser");
}
private void createTestUser() {
// 通过API创建测试用户
RestTemplate restTemplate = new RestTemplate();
CreateUserRequest request = CreateUserRequest.builder()
.username("testuser")
.email("test@example.com")
.password("password123")
.build();
restTemplate.postForEntity(
"http://localhost:8080/api/users", request, UserResponse.class);
}
}
11.3 性能优化策略
1. 缓存策略
多级缓存实现
MultiLevelCacheManager.java
/**
 * Two-level cache: an in-process Caffeine cache in front of Redis.
 *
 * <p>Reads try the local cache first, then Redis (back-filling the local cache on a
 * Redis hit). Writes and evictions are applied to both levels.
 */
@Component
@Slf4j
public class MultiLevelCacheManager {

    private final Cache<String, Object> localCache;
    private final RedisTemplate<String, Object> redisTemplate;
    private final CacheMetrics cacheMetrics;
    // One shared timer for all lookups. Tagging by cache key would create a
    // separate time series per key and grow the meter registry without bound.
    private final Timer getTimer;

    public MultiLevelCacheManager(RedisTemplate<String, Object> redisTemplate,
                                  CacheMetrics cacheMetrics) {
        this.redisTemplate = redisTemplate;
        this.cacheMetrics = cacheMetrics;
        this.localCache = Caffeine.newBuilder()
                .maximumSize(1000)
                .expireAfterWrite(Duration.ofMinutes(5))
                .recordStats()
                .removalListener((key, value, cause) -> {
                    log.debug("Local cache eviction: key={}, cause={}", key, cause);
                    cacheMetrics.recordEviction("local", cause.toString());
                })
                .build();
        this.getTimer = Timer.builder("cache.get").register(Metrics.globalRegistry);
        registerLocalCacheGauges();
    }

    /**
     * Registers the local-cache gauges once. Each gauge reads live values from the
     * cache when sampled, so nothing needs to be re-registered periodically.
     */
    private void registerLocalCacheGauges() {
        Gauge.builder("cache.local.size", localCache, cache -> cache.estimatedSize())
                .register(Metrics.globalRegistry);
        Gauge.builder("cache.local.hit_rate", localCache, cache -> cache.stats().hitRate())
                .register(Metrics.globalRegistry);
        Gauge.builder("cache.local.miss_rate", localCache, cache -> cache.stats().missRate())
                .register(Metrics.globalRegistry);
    }

    /**
     * Looks up {@code key} in the local cache, then in Redis.
     *
     * @param key  cache key
     * @param type expected value type; the value is cast unchecked, so a mismatch
     *             surfaces at the caller rather than here
     * @return the cached value, or empty when neither level holds the key
     */
    @SuppressWarnings("unchecked")
    public <T> Optional<T> get(String key, Class<T> type) {
        Timer.Sample sample = Timer.start();
        try {
            // 1. Local cache
            Object localValue = localCache.getIfPresent(key);
            if (localValue != null) {
                cacheMetrics.recordHit("local");
                log.debug("Cache hit from local: key={}", key);
                return Optional.of((T) localValue);
            }
            cacheMetrics.recordMiss("local");

            // 2. Redis
            Object redisValue = redisTemplate.opsForValue().get(key);
            if (redisValue != null) {
                // Back-fill the local cache so the next read stays in-process.
                localCache.put(key, redisValue);
                cacheMetrics.recordHit("redis");
                log.debug("Cache hit from Redis: key={}", key);
                return Optional.of((T) redisValue);
            }
            cacheMetrics.recordMiss("redis");
            log.debug("Cache miss: key={}", key);
            return Optional.empty();
        } finally {
            sample.stop(getTimer);
        }
    }

    /**
     * Stores {@code value} in both levels.
     *
     * @param ttl time-to-live applied to the Redis entry; the local entry follows
     *            the local cache's own 5-minute write expiry
     */
    public void put(String key, Object value, Duration ttl) {
        localCache.put(key, value);
        redisTemplate.opsForValue().set(key, value, ttl);
        log.debug("Cache put: key={}, ttl={}", key, ttl);
        cacheMetrics.recordPut("multi-level");
    }

    /** Removes {@code key} from both levels. */
    public void evict(String key) {
        localCache.invalidate(key);
        redisTemplate.delete(key);
        log.debug("Cache evict: key={}", key);
        cacheMetrics.recordEviction("multi-level", "manual");
    }

    /**
     * Removes every key matching {@code pattern} from both levels.
     *
     * @param pattern glob in which {@code *} matches any run of characters; for the
     *                local cache all other characters are matched literally
     */
    public void evictPattern(String pattern) {
        // Compile once; previously the regex was rebuilt for every key and
        // characters such as '.' or '[' in the pattern were treated as regex syntax.
        java.util.regex.Pattern keyPattern = globToRegex(pattern);
        localCache.asMap().keySet().stream()
                .filter(key -> keyPattern.matcher(key).matches())
                .forEach(localCache::invalidate);

        // NOTE(review): KEYS scans the whole keyspace on the Redis server.
        Set<String> keys = redisTemplate.keys(pattern);
        if (keys != null && !keys.isEmpty()) {
            redisTemplate.delete(keys);
        }
        log.debug("Cache evict pattern: pattern={}", pattern);
    }

    /** Converts a {@code *}-glob to a regex, quoting everything between the wildcards. */
    private static java.util.regex.Pattern globToRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        int literalStart = 0;
        for (int i = 0; i < glob.length(); i++) {
            if (glob.charAt(i) == '*') {
                if (i > literalStart) {
                    regex.append(java.util.regex.Pattern.quote(glob.substring(literalStart, i)));
                }
                regex.append(".*");
                literalStart = i + 1;
            }
        }
        if (literalStart < glob.length()) {
            regex.append(java.util.regex.Pattern.quote(glob.substring(literalStart)));
        }
        return java.util.regex.Pattern.compile(regex.toString());
    }

    /** Applies an invalidation event as either a pattern eviction or a single-key eviction. */
    @EventListener
    public void handleCacheInvalidationEvent(CacheInvalidationEvent event) {
        if (event.isPattern()) {
            evictPattern(event.getKey());
        } else {
            evict(event.getKey());
        }
    }

    /** Logs local cache statistics once per minute; the gauges are registered in the constructor. */
    @Scheduled(fixedRate = 60000)
    public void reportCacheStats() {
        CacheStats stats = localCache.stats();
        log.info("Local cache stats: size={}, hitRate={}, missRate={}",
                localCache.estimatedSize(), stats.hitRate(), stats.missRate());
    }
}
缓存预热策略
CacheWarmupService.java
/**
 * Pre-loads user, product and configuration entries into the multi-level cache
 * once the application is ready.
 */
@Service
@Slf4j
public class CacheWarmupService {

    private final UserService userService;
    private final ProductService productService;
    private final MultiLevelCacheManager cacheManager;
    private final ApplicationEventPublisher eventPublisher;

    /** The final fields must be assigned; this constructor supplies all collaborators. */
    public CacheWarmupService(UserService userService,
                              ProductService productService,
                              MultiLevelCacheManager cacheManager,
                              ApplicationEventPublisher eventPublisher) {
        this.userService = userService;
        this.productService = productService;
        this.cacheManager = cacheManager;
        this.eventPublisher = eventPublisher;
    }

    /**
     * Starts the three warm-up tasks asynchronously and publishes
     * {@code CacheWarmupCompletedEvent} when all of them have finished.
     */
    @EventListener(ApplicationReadyEvent.class)
    public void warmupCache() {
        log.info("Starting cache warmup...");
        CompletableFuture.allOf(
                CompletableFuture.runAsync(this::warmupUserCache),
                CompletableFuture.runAsync(this::warmupProductCache),
                CompletableFuture.runAsync(this::warmupConfigCache)
        ).thenRun(() -> {
            log.info("Cache warmup completed");
            eventPublisher.publishEvent(new CacheWarmupCompletedEvent());
        }).exceptionally(throwable -> {
            log.error("Cache warmup failed", throwable);
            return null;
        });
    }

    /** Caches up to 1000 active users for one hour; a failure for one user is logged and skipped. */
    private void warmupUserCache() {
        try {
            log.info("Warming up user cache...");
            List<Long> activeUserIds = userService.getActiveUserIds(1000);
            activeUserIds.parallelStream().forEach(userId -> {
                try {
                    UserResponse user = userService.getUserById(userId);
                    cacheManager.put("user:" + userId, user, Duration.ofHours(1));
                } catch (Exception e) {
                    log.warn("Failed to warmup user cache for userId: {}", userId, e);
                }
            });
            // The count is the number of ids attempted, not the number cached successfully.
            log.info("User cache warmup completed: {} users", activeUserIds.size());
        } catch (Exception e) {
            log.error("User cache warmup failed", e);
        }
    }

    /** Caches up to 500 popular products for two hours; a failure for one product is logged and skipped. */
    private void warmupProductCache() {
        try {
            log.info("Warming up product cache...");
            List<Long> popularProductIds = productService.getPopularProductIds(500);
            popularProductIds.parallelStream().forEach(productId -> {
                try {
                    ProductResponse product = productService.getProductById(productId);
                    cacheManager.put("product:" + productId, product, Duration.ofHours(2));
                } catch (Exception e) {
                    log.warn("Failed to warmup product cache for productId: {}", productId, e);
                }
            });
            // The count is the number of ids attempted, not the number cached successfully.
            log.info("Product cache warmup completed: {} products", popularProductIds.size());
        } catch (Exception e) {
            log.error("Product cache warmup failed", e);
        }
    }

    /** Caches a fixed set of system configuration values for 30 minutes. */
    private void warmupConfigCache() {
        try {
            log.info("Warming up config cache...");
            Map<String, String> configs = Map.of(
                    "system.maintenance.enabled", "false",
                    "feature.new_ui.enabled", "true",
                    "rate_limit.default", "1000"
            );
            configs.forEach((key, value) ->
                    cacheManager.put("config:" + key, value, Duration.ofMinutes(30)));
            log.info("Config cache warmup completed: {} configs", configs.size());
        } catch (Exception e) {
            log.error("Config cache warmup failed", e);
        }
    }
}
2. 数据库优化
读写分离配置
DatabaseConfiguration.java
/**
 * JPA configuration for a master/slave setup: both physical data sources sit behind
 * a {@code RoutingDataSource}, which the entity manager factory uses.
 */
@Configuration
@EnableJpaRepositories(basePackages = "com.example.repository")
public class DatabaseConfiguration {

    /** Write data source, bound from {@code spring.datasource.master.*}. */
    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.master")
    public DataSource masterDataSource() {
        return newHikariDataSource();
    }

    /** Read data source, bound from {@code spring.datasource.slave.*}. */
    @Bean
    @ConfigurationProperties("spring.datasource.slave")
    public DataSource slaveDataSource() {
        return newHikariDataSource();
    }

    /** Routing data source keyed by "master" / "slave"; master is the default target. */
    @Bean
    public DataSource routingDataSource() {
        DataSource master = masterDataSource();

        Map<Object, Object> targets = new HashMap<>();
        targets.put("master", master);
        targets.put("slave", slaveDataSource());

        RoutingDataSource router = new RoutingDataSource();
        router.setTargetDataSources(targets);
        router.setDefaultTargetDataSource(master);
        return router;
    }

    /** Entity manager factory over the routing data source, with Hibernate batching enabled. */
    @Bean
    public EntityManagerFactory entityManagerFactory() {
        Properties hibernateSettings = new Properties();
        hibernateSettings.setProperty("hibernate.dialect", "org.hibernate.dialect.MySQL8Dialect");
        hibernateSettings.setProperty("hibernate.show_sql", "false");
        hibernateSettings.setProperty("hibernate.format_sql", "true");
        hibernateSettings.setProperty("hibernate.jdbc.batch_size", "50");
        hibernateSettings.setProperty("hibernate.order_inserts", "true");
        hibernateSettings.setProperty("hibernate.order_updates", "true");
        hibernateSettings.setProperty("hibernate.jdbc.batch_versioned_data", "true");

        LocalContainerEntityManagerFactoryBean factoryBean =
                new LocalContainerEntityManagerFactoryBean();
        factoryBean.setDataSource(routingDataSource());
        factoryBean.setPackagesToScan("com.example.entity");
        factoryBean.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
        factoryBean.setJpaProperties(hibernateSettings);
        // Initialised by hand because the built factory, not the factory bean, is returned.
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }

    /** JPA transaction manager bound to the entity manager factory above. */
    @Bean
    public PlatformTransactionManager transactionManager() {
        JpaTransactionManager jpaTransactionManager = new JpaTransactionManager();
        jpaTransactionManager.setEntityManagerFactory(entityManagerFactory());
        return jpaTransactionManager;
    }

    /** Creates an unconfigured Hikari data source; properties are bound by the calling bean method. */
    private static DataSource newHikariDataSource() {
        return DataSourceBuilder.create()
                .type(HikariDataSource.class)
                .build();
    }
}
动态数据源路由
RoutingDataSource.java
/**
 * Routing data source whose lookup key is the data source type currently held by
 * {@code DataSourceContextHolder}.
 */
public class RoutingDataSource extends AbstractRoutingDataSource {

    /** Returns the current thread's data source type; may be {@code null} when none is set. */
    @Override
    protected Object determineCurrentLookupKey() {
        String currentType = DataSourceContextHolder.getDataSourceType();
        return currentType;
    }
}
/**
 * Holds the data source type selected for the current thread.
 */
@Component
public class DataSourceContextHolder {

    private static final ThreadLocal<String> CURRENT_TYPE = new ThreadLocal<>();

    /** Returns the type set for this thread, or {@code null} if none was set. */
    public static String getDataSourceType() {
        return CURRENT_TYPE.get();
    }

    /** Selects {@code dataSourceType} for this thread. */
    public static void setDataSourceType(String dataSourceType) {
        CURRENT_TYPE.set(dataSourceType);
    }

    /** Removes this thread's selection. */
    public static void clearDataSourceType() {
        CURRENT_TYPE.remove();
    }
}
/**
 * Marker annotation, retained at runtime, that may be placed on methods and types.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ReadOnly {
}
/**
 * Selects the data source type around annotated methods: "slave" for
 * {@code @ReadOnly} methods, "master" for {@code @Transactional} methods that have
 * no type selected yet. The selection is cleared when the method finishes.
 *
 * <p>The pointcuts use {@code @annotation}, so only annotations placed on the
 * method itself are matched.
 */
@Aspect
@Component
@Order(1)
public class DataSourceAspect {

    /** Routes {@code @ReadOnly} methods to the slave. */
    @Before("@annotation(readOnly)")
    public void setReadDataSourceType(ReadOnly readOnly) {
        DataSourceContextHolder.setDataSourceType("slave");
    }

    /** Routes {@code @Transactional} methods to the master unless a type is already selected. */
    @Before("@annotation(org.springframework.transaction.annotation.Transactional)")
    public void setWriteDataSourceType() {
        if (DataSourceContextHolder.getDataSourceType() == null) {
            DataSourceContextHolder.setDataSourceType("master");
        }
    }

    /**
     * Clears the selection after a {@code @ReadOnly} method.
     *
     * <p>Kept separate from the {@code @Transactional} clean-up: the bound name
     * {@code readOnly} is only valid when the advice declares a matching parameter,
     * and a bound annotation cannot be combined with another pointcut through
     * {@code ||}.
     */
    @After("@annotation(readOnly)")
    public void clearReadDataSourceType(ReadOnly readOnly) {
        DataSourceContextHolder.clearDataSourceType();
    }

    /** Clears the selection after a {@code @Transactional} method. */
    @After("@annotation(org.springframework.transaction.annotation.Transactional)")
    public void clearDataSourceType() {
        DataSourceContextHolder.clearDataSourceType();
    }
}
3. 异步处理优化
异步任务配置
AsyncConfiguration.java
/**
 * Async configuration: a general-purpose executor (also the default for
 * {@code @Async}) and a larger executor intended for IO-bound work.
 */
@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {

    /** General executor: cores..2x cores threads, queue of 500. */
    @Bean(name = "taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        int cores = Runtime.getRuntime().availableProcessors();
        return buildExecutor(cores, cores * 2, 500, "async-task-");
    }

    /** IO executor: more threads for IO-bound tasks (2x..4x cores), queue of 1000. */
    @Bean(name = "ioTaskExecutor")
    public ThreadPoolTaskExecutor ioTaskExecutor() {
        int cores = Runtime.getRuntime().availableProcessors();
        return buildExecutor(cores * 2, cores * 4, 1000, "io-task-");
    }

    @Override
    public Executor getAsyncExecutor() {
        return taskExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }

    /**
     * Builds and initialises an executor with the settings both pools share:
     * caller-runs rejection, 60 s keep-alive, and waiting up to 60 s for running
     * tasks on shutdown.
     */
    private static ThreadPoolTaskExecutor buildExecutor(int corePoolSize,
                                                        int maxPoolSize,
                                                        int queueCapacity,
                                                        String threadNamePrefix) {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(corePoolSize);
        pool.setMaxPoolSize(maxPoolSize);
        pool.setQueueCapacity(queueCapacity);
        pool.setThreadNamePrefix(threadNamePrefix);
        // When the pool and queue are full, the submitting thread runs the task itself.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        pool.setKeepAliveSeconds(60);
        pool.setWaitForTasksToCompleteOnShutdown(true);
        pool.setAwaitTerminationSeconds(60);
        pool.initialize();
        return pool;
    }
}
/**
 * Logs exceptions that escape from {@code @Async} methods.
 */
@Component
@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    /** Logs the failing method name, its arguments and the exception. */
    @Override
    public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
        String methodName = method.getName();
        String arguments = Arrays.toString(objects);
        log.error("Async method execution failed: method={}, args={}",
                methodName, arguments, throwable);
        // TODO: send an alert for the failed async task.
    }
}
11.4 微服务治理
1. 服务版本管理
版本控制策略
VersionController.java
/**
 * REST endpoints for service version information, compatibility checks and migration.
 */
@RestController
@RequestMapping("/api/v1/version")
@Api(tags = "版本管理")
public class VersionController {

    private final VersionService versionService;

    /** The final field must be assigned; this constructor supplies the service. */
    public VersionController(VersionService versionService) {
        this.versionService = versionService;
    }

    /** Returns the current version information. */
    @GetMapping("/info")
    @ApiOperation("获取服务版本信息")
    public ResponseEntity<VersionInfo> getVersionInfo() {
        VersionInfo versionInfo = versionService.getCurrentVersionInfo();
        return ResponseEntity.ok(versionInfo);
    }

    /**
     * Checks compatibility between the current version and {@code targetVersion}.
     *
     * @param targetVersion version string taken from the request parameter
     */
    @GetMapping("/compatibility")
    @ApiOperation("检查版本兼容性")
    public ResponseEntity<CompatibilityResult> checkCompatibility(
            @RequestParam String targetVersion) {
        CompatibilityResult result = versionService.checkCompatibility(targetVersion);
        return ResponseEntity.ok(result);
    }

    /** Runs the migration described by the request body. */
    @PostMapping("/migrate")
    @ApiOperation("执行版本迁移")
    public ResponseEntity<MigrationResult> migrateVersion(
            @RequestBody MigrationRequest request) {
        MigrationResult result = versionService.migrateToVersion(request);
        return ResponseEntity.ok(result);
    }
}
@Service
@Slf4j
public class VersionService {
private final VersionRepository versionRepository;
private final MigrationExecutor migrationExecutor;
@Value("${app.version}")
private String currentVersion;
@Value("${app.build.time}")
private String buildTime;
@Value("${app.git.commit}")
private String gitCommit;
public VersionInfo getCurrentVersionInfo() {
return VersionInfo.builder()
.version(currentVersion)
.buildTime(buildTime)
.gitCommit(gitCommit)
.supportedApiVersions(getSupportedApiVersions())
.deprecatedFeatures(getDeprecatedFeatures())
.build();
}
public CompatibilityResult checkCompatibility(String targetVersion) {
try {
Version current = Version.parse(currentVersion);
Version target = Version.parse(targetVersion);
boolean isCompatible = isVersionCompatible(current, target);
List<String> warnings = getCompatibilityWarnings(current, target);
List<String> breakingChanges = getBreakingChanges(current, target);
return CompatibilityResult.builder()
.compatible(isCompatible)
.warnings(warnings)
.breakingChanges(breakingChanges)
.migrationRequired(!breakingChanges.isEmpty())
.build();
} catch (Exception e) {
log.error("Failed to check version compatibility", e);
throw new BusinessException("版本兼容性检查失败");
}
}
@Transactional
public MigrationResult migrateToVersion(MigrationRequest request) {
try {
log.info("Starting migration from {} to {}", currentVersion, request.getTargetVersion());
// 1. 验证迁移前置条件
validateMigrationPreconditions(request);
// 2. 创建迁移记录
Migration migration = createMigrationRecord(request);
// 3. 执行迁移步骤
List<MigrationStep> steps = migrationExecutor.executeMigration(migration);
// 4. 验证迁移结果
validateMigrationResult(migration);
// 5. 更新版本信息
updateVersionInfo(request.getTargetVersion());
log.info("Migration completed successfully");
return MigrationResult.builder()
.success(true)
.migrationId(migration.getId())
.executedSteps(steps)
.duration(migration.getDuration())
.build();
} catch (Exception e) {
log.error("Migration failed", e);
// 回滚迁移
rollbackMigration(request);
throw new BusinessException("版本迁移失败: " + e.getMessage());
}
}
private boolean isVersionCompatible(Version current, Version target) {
// 主版本号不同时不兼容
if (current.getMajor() != target.getMajor()) {
return false;
}
// 次版本号向后兼容
return target.getMinor() >= current.getMinor();
}
private List<String> getSupportedApiVersions() {
return Arrays.asList("v1", "v2");
}
private List<String> getDeprecatedFeatures() {
return Arrays.asList(
"Legacy user authentication (将在v3.0中移除)",
"Old order status API (将在v2.5中移除)"
);
}
}
2. 灰度发布
灰度发布控制器
CanaryDeploymentController.java
/**
 * REST endpoints for starting, inspecting, promoting and rolling back canary deployments.
 */
@RestController
@RequestMapping("/api/v1/canary")
@Api(tags = "灰度发布")
public class CanaryDeploymentController {

    private final CanaryDeploymentService canaryService;

    /** The final field must be assigned; this constructor supplies the service. */
    public CanaryDeploymentController(CanaryDeploymentService canaryService) {
        this.canaryService = canaryService;
    }

    /** Starts a canary deployment from the validated request body. */
    @PostMapping("/start")
    @ApiOperation("开始灰度发布")
    public ResponseEntity<CanaryDeployment> startCanaryDeployment(
            @RequestBody @Valid StartCanaryRequest request) {
        CanaryDeployment deployment = canaryService.startCanaryDeployment(request);
        return ResponseEntity.ok(deployment);
    }

    /** Returns the status of the given deployment. */
    @GetMapping("/{deploymentId}/status")
    @ApiOperation("获取灰度发布状态")
    public ResponseEntity<CanaryStatus> getCanaryStatus(
            @PathVariable String deploymentId) {
        CanaryStatus status = canaryService.getCanaryStatus(deploymentId);
        return ResponseEntity.ok(status);
    }

    /** Promotes the given deployment to the requested traffic percentage. */
    @PostMapping("/{deploymentId}/promote")
    @ApiOperation("推广灰度版本")
    public ResponseEntity<Void> promoteCanary(
            @PathVariable String deploymentId,
            @RequestBody @Valid PromoteCanaryRequest request) {
        canaryService.promoteCanary(deploymentId, request);
        return ResponseEntity.ok().build();
    }

    /** Rolls back the given deployment. */
    @PostMapping("/{deploymentId}/rollback")
    @ApiOperation("回滚灰度版本")
    public ResponseEntity<Void> rollbackCanary(
            @PathVariable String deploymentId) {
        canaryService.rollbackCanary(deploymentId);
        return ResponseEntity.ok().build();
    }
}
/**
 * Manages the lifecycle of canary deployments: start, status, promotion, rollback
 * and periodic health monitoring.
 */
@Service
@Slf4j
public class CanaryDeploymentService {

    private final CanaryDeploymentRepository deploymentRepository;
    private final TrafficSplitter trafficSplitter;
    private final MetricsCollector metricsCollector;
    private final AlertService alertService;

    /** The final fields must be assigned; this constructor supplies all collaborators. */
    public CanaryDeploymentService(CanaryDeploymentRepository deploymentRepository,
                                   TrafficSplitter trafficSplitter,
                                   MetricsCollector metricsCollector,
                                   AlertService alertService) {
        this.deploymentRepository = deploymentRepository;
        this.trafficSplitter = trafficSplitter;
        this.metricsCollector = metricsCollector;
        this.alertService = alertService;
    }

    /**
     * Creates and starts a canary deployment: validates the request, persists the
     * record, deploys the canary version, configures the traffic split, starts
     * monitoring and finally marks the deployment RUNNING.
     */
    @Transactional
    public CanaryDeployment startCanaryDeployment(StartCanaryRequest request) {
        log.info("Starting canary deployment: {}", request);
        // 1. Validate deployment conditions
        validateDeploymentConditions(request);
        // 2. Create the deployment record
        CanaryDeployment deployment = CanaryDeployment.builder()
                .id(UUID.randomUUID().toString())
                .serviceName(request.getServiceName())
                .currentVersion(request.getCurrentVersion())
                .canaryVersion(request.getCanaryVersion())
                .trafficPercentage(request.getInitialTrafficPercentage())
                .status(CanaryStatus.STARTING)
                .strategy(request.getStrategy())
                .createdAt(Instant.now())
                .build();
        deploymentRepository.save(deployment);
        // 3. Deploy the canary version
        deployCanaryVersion(deployment);
        // 4. Configure the traffic split
        trafficSplitter.configureTrafficSplit(
                deployment.getServiceName(),
                deployment.getTrafficPercentage()
        );
        // 5. Start monitoring
        startCanaryMonitoring(deployment);
        deployment.setStatus(CanaryStatus.RUNNING);
        deploymentRepository.save(deployment);
        log.info("Canary deployment started: {}", deployment.getId());
        return deployment;
    }

    /**
     * Returns the deployment's status together with freshly collected metrics.
     *
     * <p>NOTE(review): {@code CanaryStatus} is used here as a builder-backed DTO and
     * elsewhere in this class as a set of state constants (STARTING, RUNNING, ...);
     * confirm these are meant to be the same type.
     */
    public CanaryStatus getCanaryStatus(String deploymentId) {
        CanaryDeployment deployment = getDeploymentById(deploymentId);
        CanaryMetrics metrics = metricsCollector.collectCanaryMetrics(
                deployment.getServiceName(),
                deployment.getCanaryVersion()
        );
        return CanaryStatus.builder()
                .deploymentId(deploymentId)
                .status(deployment.getStatus())
                .trafficPercentage(deployment.getTrafficPercentage())
                .metrics(metrics)
                .healthChecks(getHealthChecks(deployment))
                .duration(Duration.between(deployment.getCreatedAt(), Instant.now()))
                .build();
    }

    /**
     * Raises the canary's traffic share to the requested percentage; at 100 the
     * production version is replaced and the deployment is marked PROMOTED.
     *
     * @throws BusinessException if the deployment is not RUNNING or promotion fails
     */
    @Transactional
    public void promoteCanary(String deploymentId, PromoteCanaryRequest request) {
        CanaryDeployment deployment = getDeploymentById(deploymentId);
        if (deployment.getStatus() != CanaryStatus.RUNNING) {
            throw new BusinessException("只能推广运行中的灰度部署");
        }
        log.info("Promoting canary deployment: {}", deploymentId);
        try {
            // 1. Shift traffic gradually
            promoteTrafficGradually(deployment, request.getTargetPercentage());
            // 2. On full promotion, replace the production version
            if (request.getTargetPercentage() == 100) {
                replaceProductionVersion(deployment);
                deployment.setStatus(CanaryStatus.PROMOTED);
            }
            // Record the new share in both cases so the stored percentage is not
            // left at its pre-promotion value after a full promotion.
            deployment.setTrafficPercentage(request.getTargetPercentage());
            deploymentRepository.save(deployment);
            log.info("Canary promotion completed: {}", deploymentId);
        } catch (Exception e) {
            log.error("Canary promotion failed: {}", deploymentId, e);
            alertService.sendAlert("Canary promotion failed", e.getMessage());
            throw new BusinessException("灰度推广失败: " + e.getMessage());
        }
    }

    /**
     * Stops canary traffic, removes the canary version, stops monitoring and marks
     * the deployment ROLLED_BACK.
     *
     * @throws BusinessException if any rollback step fails
     */
    @Transactional
    public void rollbackCanary(String deploymentId) {
        CanaryDeployment deployment = getDeploymentById(deploymentId);
        log.info("Rolling back canary deployment: {}", deploymentId);
        try {
            // 1. Stop traffic to the canary
            trafficSplitter.stopCanaryTraffic(deployment.getServiceName());
            // 2. Remove the canary version
            removeCanaryVersion(deployment);
            // 3. Stop monitoring
            stopCanaryMonitoring(deployment);
            deployment.setStatus(CanaryStatus.ROLLED_BACK);
            deployment.setRolledBackAt(Instant.now());
            deploymentRepository.save(deployment);
            log.info("Canary rollback completed: {}", deploymentId);
        } catch (Exception e) {
            log.error("Canary rollback failed: {}", deploymentId, e);
            throw new BusinessException("灰度回滚失败: " + e.getMessage());
        }
    }

    /** Every 30 seconds, checks the health of each RUNNING deployment; one failure does not stop the others. */
    @Scheduled(fixedRate = 30000)
    public void monitorCanaryDeployments() {
        List<CanaryDeployment> runningDeployments =
                deploymentRepository.findByStatus(CanaryStatus.RUNNING);
        for (CanaryDeployment deployment : runningDeployments) {
            try {
                checkCanaryHealth(deployment);
            } catch (Exception e) {
                log.error("Failed to check canary health: {}", deployment.getId(), e);
            }
        }
    }

    /**
     * Compares the canary's error rate and average response time with the
     * strategy's thresholds, alerting on each violation and rolling back
     * automatically on an error-rate violation when the strategy allows it.
     */
    private void checkCanaryHealth(CanaryDeployment deployment) {
        CanaryMetrics metrics = metricsCollector.collectCanaryMetrics(
                deployment.getServiceName(),
                deployment.getCanaryVersion()
        );
        // Error rate
        if (metrics.getErrorRate() > deployment.getStrategy().getMaxErrorRate()) {
            log.warn("Canary error rate too high: {} > {}",
                    metrics.getErrorRate(), deployment.getStrategy().getMaxErrorRate());
            alertService.sendAlert(
                    "Canary error rate exceeded threshold",
                    String.format("Deployment: %s, Error rate: %.2f%%",
                            deployment.getId(), metrics.getErrorRate() * 100)
            );
            if (deployment.getStrategy().isAutoRollback()) {
                // NOTE(review): this is a direct call on this instance; with
                // proxy-based @Transactional the annotation on rollbackCanary is
                // presumably not applied here - confirm.
                rollbackCanary(deployment.getId());
                // The canary has been removed; further checks and alerts for it
                // would only be noise.
                return;
            }
        }
        // Response time
        if (metrics.getAvgResponseTime() > deployment.getStrategy().getMaxResponseTime()) {
            log.warn("Canary response time too high: {} > {}",
                    metrics.getAvgResponseTime(), deployment.getStrategy().getMaxResponseTime());
            alertService.sendAlert(
                    "Canary response time exceeded threshold",
                    String.format("Deployment: %s, Response time: %dms",
                            deployment.getId(), metrics.getAvgResponseTime())
            );
        }
    }
}
总结
本章深入探讨了微服务的高级主题与最佳实践，涵盖了以下核心内容：
核心概念
- 服务网格 - Istio架构、流量管理、安全策略
- 微服务测试 - 测试金字塔、契约测试、端到端测试
- 性能优化 - 多级缓存、数据库优化、异步处理
- 微服务治理 - 版本管理、灰度发布、流量控制
最佳实践
- 架构设计 - 服务拆分原则、领域驱动设计、事件驱动架构
- 开发实践 - 代码规范、测试策略、持续集成
- 运维实践 - 监控告警、日志管理、故障处理
- 治理实践 - 服务治理、版本控制、发布策略
注意事项
- 复杂性管理 - 避免过度拆分、合理设计边界
- 性能考虑 - 网络延迟、数据一致性、缓存策略
- 安全防护 - 服务间认证、数据加密、访问控制
- 运维成本 - 监控复杂度、部署复杂度、人员技能要求
扩展方向
- 云原生技术 - Kubernetes、服务网格、Serverless
- 新兴技术 - 边缘计算、AI/ML集成、区块链
- 行业实践 - 金融级微服务、电商微服务、物联网微服务
通过本教程的学习，你已经系统地了解了 Spring Cloud 微服务开发的核心技术和最佳实践。希望这些知识能够帮助你在实际项目中构建高质量、高可用的微服务系统。
微服务架构是一个不断发展的领域，建议持续关注新技术和最佳实践，在实践中不断优化和改进你的微服务系统。