本章目标
- 理解微服务架构的核心概念和优势
- 掌握Spring Cloud生态系统的使用
- 学会服务注册与发现的实现
- 了解配置中心的设计和使用
- 掌握API网关的配置和管理
- 学习分布式追踪和监控
- 了解容器化部署的最佳实践
1. 微服务架构概述
1.1 微服务架构基础
// 微服务架构演示
public class MicroserviceArchitectureDemo {
public static void main(String[] args) {
System.out.println("=== 微服务架构特点 ===");
// 1. 服务拆分
demonstrateServiceDecomposition();
// 2. 独立部署
demonstrateIndependentDeployment();
// 3. 技术多样性
demonstrateTechnologyDiversity();
// 4. 数据隔离
demonstrateDataIsolation();
}
private static void demonstrateServiceDecomposition() {
System.out.println("\n1. 服务拆分:");
// 单体应用 vs 微服务
System.out.println("单体应用: 所有功能在一个应用中");
System.out.println("微服务: 按业务领域拆分为多个独立服务");
// 服务边界定义
System.out.println("\n服务边界定义原则:");
System.out.println("- 业务能力边界");
System.out.println("- 数据一致性边界");
System.out.println("- 团队组织边界");
System.out.println("- 技术边界");
}
private static void demonstrateIndependentDeployment() {
System.out.println("\n2. 独立部署:");
System.out.println("- 每个服务可以独立开发、测试、部署");
System.out.println("- 服务版本管理独立");
System.out.println("- 故障隔离");
System.out.println("- 扩展性更好");
}
private static void demonstrateTechnologyDiversity() {
System.out.println("\n3. 技术多样性:");
System.out.println("- 不同服务可以使用不同技术栈");
System.out.println("- 用户服务: Spring Boot + MySQL");
System.out.println("- 订单服务: Node.js + MongoDB");
System.out.println("- 推荐服务: Python + Redis");
}
private static void demonstrateDataIsolation() {
System.out.println("\n4. 数据隔离:");
System.out.println("- 每个服务拥有自己的数据库");
System.out.println("- 通过API进行数据交互");
System.out.println("- 避免数据库层面的耦合");
}
}
// 微服务设计原则
public class MicroserviceDesignPrinciples {
// 1. 单一职责原则
public interface UserService {
User createUser(CreateUserRequest request);
User getUserById(Long id);
User updateUser(Long id, UpdateUserRequest request);
void deleteUser(Long id);
}
// 2. 服务自治
@Service
public class UserServiceImpl implements UserService {
private final UserRepository userRepository;
private final UserEventPublisher eventPublisher;
public UserServiceImpl(UserRepository userRepository,
UserEventPublisher eventPublisher) {
this.userRepository = userRepository;
this.eventPublisher = eventPublisher;
}
@Override
public User createUser(CreateUserRequest request) {
// 验证输入
validateCreateUserRequest(request);
// 创建用户
User user = new User();
user.setName(request.getName());
user.setEmail(request.getEmail());
user.setCreatedAt(LocalDateTime.now());
User savedUser = userRepository.save(user);
// 发布事件
eventPublisher.publishUserCreated(savedUser);
return savedUser;
}
@Override
public User getUserById(Long id) {
return userRepository.findById(id)
.orElseThrow(() -> new UserNotFoundException("用户不存在: " + id));
}
@Override
public User updateUser(Long id, UpdateUserRequest request) {
User user = getUserById(id);
if (request.getName() != null) {
user.setName(request.getName());
}
if (request.getEmail() != null) {
user.setEmail(request.getEmail());
}
user.setUpdatedAt(LocalDateTime.now());
User updatedUser = userRepository.save(user);
// 发布事件
eventPublisher.publishUserUpdated(updatedUser);
return updatedUser;
}
@Override
public void deleteUser(Long id) {
User user = getUserById(id);
userRepository.delete(user);
// 发布事件
eventPublisher.publishUserDeleted(user);
}
private void validateCreateUserRequest(CreateUserRequest request) {
if (request.getName() == null || request.getName().trim().isEmpty()) {
throw new IllegalArgumentException("用户名不能为空");
}
if (request.getEmail() == null || !isValidEmail(request.getEmail())) {
throw new IllegalArgumentException("邮箱格式不正确");
}
if (userRepository.existsByEmail(request.getEmail())) {
throw new DuplicateEmailException("邮箱已存在: " + request.getEmail());
}
}
private boolean isValidEmail(String email) {
return email.matches("^[A-Za-z0-9+_.-]+@([A-Za-z0-9.-]+\\.[A-Za-z]{2,})$");
}
}
// 3. 去中心化治理
@Component
public class UserEventPublisher {
private final ApplicationEventPublisher eventPublisher;
public UserEventPublisher(ApplicationEventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}
public void publishUserCreated(User user) {
UserCreatedEvent event = new UserCreatedEvent(user.getId(), user.getName(), user.getEmail());
eventPublisher.publishEvent(event);
}
public void publishUserUpdated(User user) {
UserUpdatedEvent event = new UserUpdatedEvent(user.getId(), user.getName(), user.getEmail());
eventPublisher.publishEvent(event);
}
public void publishUserDeleted(User user) {
UserDeletedEvent event = new UserDeletedEvent(user.getId(), user.getName());
eventPublisher.publishEvent(event);
}
}
}
// 事件定义
class UserCreatedEvent {
private final Long userId;
private final String userName;
private final String userEmail;
private final LocalDateTime timestamp;
public UserCreatedEvent(Long userId, String userName, String userEmail) {
this.userId = userId;
this.userName = userName;
this.userEmail = userEmail;
this.timestamp = LocalDateTime.now();
}
// Getters
public Long getUserId() { return userId; }
public String getUserName() { return userName; }
public String getUserEmail() { return userEmail; }
public LocalDateTime getTimestamp() { return timestamp; }
}
class UserUpdatedEvent {
private final Long userId;
private final String userName;
private final String userEmail;
private final LocalDateTime timestamp;
public UserUpdatedEvent(Long userId, String userName, String userEmail) {
this.userId = userId;
this.userName = userName;
this.userEmail = userEmail;
this.timestamp = LocalDateTime.now();
}
// Getters
public Long getUserId() { return userId; }
public String getUserName() { return userName; }
public String getUserEmail() { return userEmail; }
public LocalDateTime getTimestamp() { return timestamp; }
}
class UserDeletedEvent {
private final Long userId;
private final String userName;
private final LocalDateTime timestamp;
public UserDeletedEvent(Long userId, String userName) {
this.userId = userId;
this.userName = userName;
this.timestamp = LocalDateTime.now();
}
// Getters
public Long getUserId() { return userId; }
public String getUserName() { return userName; }
public LocalDateTime getTimestamp() { return timestamp; }
}
1.2 微服务架构挑战
// 微服务架构挑战演示
public class MicroserviceChallengesDemo {
public static void main(String[] args) {
System.out.println("=== 微服务架构挑战 ===");
// 1. 分布式系统复杂性
demonstrateDistributedComplexity();
// 2. 服务间通信
demonstrateServiceCommunication();
// 3. 数据一致性
demonstrateDataConsistency();
// 4. 服务治理
demonstrateServiceGovernance();
}
private static void demonstrateDistributedComplexity() {
System.out.println("\n1. 分布式系统复杂性:");
System.out.println("- 网络延迟和故障");
System.out.println("- 服务发现和负载均衡");
System.out.println("- 配置管理");
System.out.println("- 监控和追踪");
}
private static void demonstrateServiceCommunication() {
System.out.println("\n2. 服务间通信:");
System.out.println("- 同步通信: REST API, gRPC");
System.out.println("- 异步通信: 消息队列, 事件驱动");
System.out.println("- 通信协议选择");
System.out.println("- 服务契约管理");
}
private static void demonstrateDataConsistency() {
System.out.println("\n3. 数据一致性:");
System.out.println("- 分布式事务");
System.out.println("- 最终一致性");
System.out.println("- Saga模式");
System.out.println("- 事件溯源");
}
private static void demonstrateServiceGovernance() {
System.out.println("\n4. 服务治理:");
System.out.println("- 服务版本管理");
System.out.println("- API网关");
System.out.println("- 熔断和限流");
System.out.println("- 安全认证");
}
}
// 分布式事务处理
@Service
public class DistributedTransactionService {
private final OrderService orderService;
private final PaymentService paymentService;
private final InventoryService inventoryService;
private final NotificationService notificationService;
public DistributedTransactionService(OrderService orderService,
PaymentService paymentService,
InventoryService inventoryService,
NotificationService notificationService) {
this.orderService = orderService;
this.paymentService = paymentService;
this.inventoryService = inventoryService;
this.notificationService = notificationService;
}
// Saga模式实现
public void processOrder(CreateOrderRequest request) {
String sagaId = UUID.randomUUID().toString();
try {
// 1. 创建订单
Order order = orderService.createOrder(request);
// 2. 扣减库存
inventoryService.reserveInventory(order.getProductId(), order.getQuantity(), sagaId);
// 3. 处理支付
Payment payment = paymentService.processPayment(order.getId(), order.getAmount(), sagaId);
// 4. 确认订单
orderService.confirmOrder(order.getId());
// 5. 发送通知
notificationService.sendOrderConfirmation(order.getUserId(), order.getId());
} catch (Exception e) {
// 补偿操作
compensateOrder(sagaId, e);
throw new OrderProcessingException("订单处理失败", e);
}
}
private void compensateOrder(String sagaId, Exception originalException) {
try {
// 回滚库存
inventoryService.releaseInventory(sagaId);
// 回滚支付
paymentService.refundPayment(sagaId);
// 取消订单
orderService.cancelOrder(sagaId);
} catch (Exception compensationException) {
// 记录补偿失败,需要人工介入
logCompensationFailure(sagaId, originalException, compensationException);
}
}
private void logCompensationFailure(String sagaId, Exception originalException,
Exception compensationException) {
// 记录到专门的补偿失败表,供人工处理
System.err.println("补偿操作失败 - SagaId: " + sagaId);
System.err.println("原始异常: " + originalException.getMessage());
System.err.println("补偿异常: " + compensationException.getMessage());
}
}
// 服务接口定义
interface OrderService {
Order createOrder(CreateOrderRequest request);
void confirmOrder(Long orderId);
void cancelOrder(String sagaId);
}
interface PaymentService {
Payment processPayment(Long orderId, BigDecimal amount, String sagaId);
void refundPayment(String sagaId);
}
interface InventoryService {
void reserveInventory(Long productId, Integer quantity, String sagaId);
void releaseInventory(String sagaId);
}
interface NotificationService {
void sendOrderConfirmation(Long userId, Long orderId);
}
// 异常定义
class OrderProcessingException extends RuntimeException {
public OrderProcessingException(String message, Throwable cause) {
super(message, cause);
}
}
2. Spring Cloud基础
2.1 Spring Cloud生态系统
// Spring Cloud主应用
@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableZuulProxy
public class MicroserviceApplication {
public static void main(String[] args) {
SpringApplication.run(MicroserviceApplication.class, args);
}
// 负载均衡配置
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
// 熔断器配置
@Bean
public HystrixCommandAspect hystrixAspect() {
return new HystrixCommandAspect();
}
}
// 用户服务
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserService userService;
private final OrderServiceClient orderServiceClient;
public UserController(UserService userService, OrderServiceClient orderServiceClient) {
this.userService = userService;
this.orderServiceClient = orderServiceClient;
}
@GetMapping("/{id}")
public ResponseEntity<UserDto> getUser(@PathVariable Long id) {
User user = userService.getUserById(id);
UserDto userDto = convertToDto(user);
return ResponseEntity.ok(userDto);
}
@GetMapping("/{id}/orders")
@HystrixCommand(fallbackMethod = "getUserOrdersFallback")
public ResponseEntity<List<OrderDto>> getUserOrders(@PathVariable Long id) {
List<OrderDto> orders = orderServiceClient.getOrdersByUserId(id);
return ResponseEntity.ok(orders);
}
// 熔断降级方法
public ResponseEntity<List<OrderDto>> getUserOrdersFallback(Long id) {
// 返回缓存数据或默认数据
List<OrderDto> fallbackOrders = Collections.emptyList();
return ResponseEntity.ok(fallbackOrders);
}
@PostMapping
public ResponseEntity<UserDto> createUser(@Valid @RequestBody CreateUserRequest request) {
User user = userService.createUser(request);
UserDto userDto = convertToDto(user);
return ResponseEntity.status(HttpStatus.CREATED).body(userDto);
}
@PutMapping("/{id}")
public ResponseEntity<UserDto> updateUser(@PathVariable Long id,
@Valid @RequestBody UpdateUserRequest request) {
User user = userService.updateUser(id, request);
UserDto userDto = convertToDto(user);
return ResponseEntity.ok(userDto);
}
@DeleteMapping("/{id}")
public ResponseEntity<Void> deleteUser(@PathVariable Long id) {
userService.deleteUser(id);
return ResponseEntity.noContent().build();
}
private UserDto convertToDto(User user) {
UserDto dto = new UserDto();
dto.setId(user.getId());
dto.setName(user.getName());
dto.setEmail(user.getEmail());
dto.setCreatedAt(user.getCreatedAt());
dto.setUpdatedAt(user.getUpdatedAt());
return dto;
}
}
// Feign客户端
@FeignClient(name = "order-service", fallback = OrderServiceClientFallback.class)
public interface OrderServiceClient {
@GetMapping("/api/orders/user/{userId}")
List<OrderDto> getOrdersByUserId(@PathVariable("userId") Long userId);
@PostMapping("/api/orders")
OrderDto createOrder(@RequestBody CreateOrderRequest request);
@GetMapping("/api/orders/{id}")
OrderDto getOrderById(@PathVariable("id") Long id);
}
// Feign客户端降级实现
@Component
public class OrderServiceClientFallback implements OrderServiceClient {
@Override
public List<OrderDto> getOrdersByUserId(Long userId) {
// 返回空列表或缓存数据
return Collections.emptyList();
}
@Override
public OrderDto createOrder(CreateOrderRequest request) {
throw new ServiceUnavailableException("订单服务暂时不可用");
}
@Override
public OrderDto getOrderById(Long id) {
return null;
}
}
// 配置类
@Configuration
public class MicroserviceConfig {
// Feign配置
@Bean
public Logger.Level feignLoggerLevel() {
return Logger.Level.FULL;
}
// 重试配置
@Bean
public Retryer feignRetryer() {
return new Retryer.Default(1000, 2000, 3);
}
// 超时配置
@Bean
public Request.Options feignOptions() {
return new Request.Options(5000, 10000);
}
// Hystrix配置
@Bean
public HystrixCommandProperties.Setter hystrixProperties() {
return HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
.withCircuitBreakerRequestVolumeThreshold(10)
.withCircuitBreakerErrorThresholdPercentage(50)
.withCircuitBreakerSleepWindowInMilliseconds(10000);
}
}
2.2 服务间通信
// 同步通信示例
@Service
public class UserOrderService {
private final RestTemplate restTemplate;
private final OrderServiceClient orderServiceClient;
public UserOrderService(RestTemplate restTemplate, OrderServiceClient orderServiceClient) {
this.restTemplate = restTemplate;
this.orderServiceClient = orderServiceClient;
}
// 使用RestTemplate
public List<OrderDto> getUserOrdersWithRestTemplate(Long userId) {
String url = "http://order-service/api/orders/user/" + userId;
try {
ResponseEntity<List<OrderDto>> response = restTemplate.exchange(
url,
HttpMethod.GET,
null,
new ParameterizedTypeReference<List<OrderDto>>() {}
);
return response.getBody();
} catch (Exception e) {
// 处理异常
throw new ServiceCommunicationException("获取用户订单失败", e);
}
}
// 使用Feign客户端
public List<OrderDto> getUserOrdersWithFeign(Long userId) {
try {
return orderServiceClient.getOrdersByUserId(userId);
} catch (Exception e) {
// Feign会自动处理降级
throw new ServiceCommunicationException("获取用户订单失败", e);
}
}
// 创建订单
public OrderDto createUserOrder(Long userId, CreateOrderRequest request) {
// 设置用户ID
request.setUserId(userId);
try {
return orderServiceClient.createOrder(request);
} catch (Exception e) {
throw new OrderCreationException("创建订单失败", e);
}
}
}
// 异步通信示例
@Service
public class EventDrivenUserService {
private final UserService userService;
private final ApplicationEventPublisher eventPublisher;
private final RabbitTemplate rabbitTemplate;
public EventDrivenUserService(UserService userService,
ApplicationEventPublisher eventPublisher,
RabbitTemplate rabbitTemplate) {
this.userService = userService;
this.eventPublisher = eventPublisher;
this.rabbitTemplate = rabbitTemplate;
}
public User createUser(CreateUserRequest request) {
// 创建用户
User user = userService.createUser(request);
// 发布本地事件
UserCreatedEvent localEvent = new UserCreatedEvent(user.getId(), user.getName(), user.getEmail());
eventPublisher.publishEvent(localEvent);
// 发布远程事件
UserCreatedMessage message = new UserCreatedMessage(
user.getId(), user.getName(), user.getEmail(), LocalDateTime.now());
rabbitTemplate.convertAndSend("user.exchange", "user.created", message);
return user;
}
// 监听本地事件
@EventListener
public void handleUserCreatedEvent(UserCreatedEvent event) {
// 处理本地业务逻辑
System.out.println("本地处理用户创建事件: " + event.getUserId());
}
// 监听远程消息
@RabbitListener(queues = "user.notification.queue")
public void handleUserCreatedMessage(UserCreatedMessage message) {
// 处理来自其他服务的消息
System.out.println("收到用户创建消息: " + message.getUserId());
// 发送欢迎邮件等
sendWelcomeEmail(message);
}
private void sendWelcomeEmail(UserCreatedMessage message) {
// 发送欢迎邮件的逻辑
System.out.println("发送欢迎邮件给: " + message.getUserEmail());
}
}
// 消息定义
class UserCreatedMessage {
private Long userId;
private String userName;
private String userEmail;
private LocalDateTime timestamp;
public UserCreatedMessage() {}
public UserCreatedMessage(Long userId, String userName, String userEmail, LocalDateTime timestamp) {
this.userId = userId;
this.userName = userName;
this.userEmail = userEmail;
this.timestamp = timestamp;
}
// Getters and Setters
public Long getUserId() { return userId; }
public void setUserId(Long userId) { this.userId = userId; }
public String getUserName() { return userName; }
public void setUserName(String userName) { this.userName = userName; }
public String getUserEmail() { return userEmail; }
public void setUserEmail(String userEmail) { this.userEmail = userEmail; }
public LocalDateTime getTimestamp() { return timestamp; }
public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }
}
// RabbitMQ配置
@Configuration
@EnableRabbit
public class RabbitMQConfig {
// 交换机
@Bean
public TopicExchange userExchange() {
return new TopicExchange("user.exchange");
}
// 队列
@Bean
public Queue userNotificationQueue() {
return QueueBuilder.durable("user.notification.queue").build();
}
@Bean
public Queue userStatisticsQueue() {
return QueueBuilder.durable("user.statistics.queue").build();
}
// 绑定
@Bean
public Binding userNotificationBinding() {
return BindingBuilder.bind(userNotificationQueue())
.to(userExchange())
.with("user.created");
}
@Bean
public Binding userStatisticsBinding() {
return BindingBuilder.bind(userStatisticsQueue())
.to(userExchange())
.with("user.*");
}
// 消息转换器
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
// RabbitTemplate配置
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter());
return template;
}
}
// 异常定义
class ServiceCommunicationException extends RuntimeException {
public ServiceCommunicationException(String message, Throwable cause) {
super(message, cause);
}
}
class OrderCreationException extends RuntimeException {
public OrderCreationException(String message, Throwable cause) {
super(message, cause);
}
}
3. 服务注册与发现
3.1 Eureka服务注册中心
// Eureka服务器配置
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaServerApplication.class, args);
}
}
// Eureka客户端配置
@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
public class UserServiceApplication {
public static void main(String[] args) {
SpringApplication.run(UserServiceApplication.class, args);
}
// 服务发现演示
@RestController
public class ServiceDiscoveryController {
private final DiscoveryClient discoveryClient;
private final EurekaClient eurekaClient;
public ServiceDiscoveryController(DiscoveryClient discoveryClient, EurekaClient eurekaClient) {
this.discoveryClient = discoveryClient;
this.eurekaClient = eurekaClient;
}
@GetMapping("/services")
public List<String> getServices() {
return discoveryClient.getServices();
}
@GetMapping("/services/{serviceName}/instances")
public List<ServiceInstance> getServiceInstances(@PathVariable String serviceName) {
return discoveryClient.getInstances(serviceName);
}
@GetMapping("/eureka/services/{serviceName}")
public Application getEurekaApplication(@PathVariable String serviceName) {
return eurekaClient.getApplication(serviceName);
}
@GetMapping("/health")
public Map<String, Object> health() {
Map<String, Object> health = new HashMap<>();
health.put("status", "UP");
health.put("timestamp", LocalDateTime.now());
health.put("service", "user-service");
return health;
}
}
}
// 服务实例信息
@Component
public class ServiceInstanceInfo {
private final DiscoveryClient discoveryClient;
public ServiceInstanceInfo(DiscoveryClient discoveryClient) {
this.discoveryClient = discoveryClient;
}
public ServiceInstance getCurrentInstance() {
List<ServiceInstance> instances = discoveryClient.getInstances("user-service");
return instances.stream()
.filter(instance -> instance.getPort() == getCurrentPort())
.findFirst()
.orElse(null);
}
public List<ServiceInstance> getAvailableInstances(String serviceName) {
return discoveryClient.getInstances(serviceName);
}
public ServiceInstance selectInstance(String serviceName) {
List<ServiceInstance> instances = getAvailableInstances(serviceName);
if (instances.isEmpty()) {
throw new ServiceUnavailableException("没有可用的服务实例: " + serviceName);
}
// 简单的轮询负载均衡
int index = (int) (System.currentTimeMillis() % instances.size());
return instances.get(index);
}
private int getCurrentPort() {
// 获取当前应用端口
return 8080; // 简化实现
}
}
// 自定义负载均衡
@Component
public class CustomLoadBalancer {
private final DiscoveryClient discoveryClient;
private final AtomicInteger counter = new AtomicInteger(0);
public CustomLoadBalancer(DiscoveryClient discoveryClient) {
this.discoveryClient = discoveryClient;
}
// 轮询算法
public ServiceInstance roundRobin(String serviceName) {
List<ServiceInstance> instances = discoveryClient.getInstances(serviceName);
if (instances.isEmpty()) {
return null;
}
int index = counter.getAndIncrement() % instances.size();
return instances.get(index);
}
// 随机算法
public ServiceInstance random(String serviceName) {
List<ServiceInstance> instances = discoveryClient.getInstances(serviceName);
if (instances.isEmpty()) {
return null;
}
Random random = new Random();
int index = random.nextInt(instances.size());
return instances.get(index);
}
// 权重算法
public ServiceInstance weighted(String serviceName) {
List<ServiceInstance> instances = discoveryClient.getInstances(serviceName);
if (instances.isEmpty()) {
return null;
}
// 根据实例的权重进行选择
int totalWeight = instances.stream()
.mapToInt(this::getInstanceWeight)
.sum();
Random random = new Random();
int randomWeight = random.nextInt(totalWeight);
int currentWeight = 0;
for (ServiceInstance instance : instances) {
currentWeight += getInstanceWeight(instance);
if (randomWeight < currentWeight) {
return instance;
}
}
return instances.get(0);
}
private int getInstanceWeight(ServiceInstance instance) {
// 从实例元数据中获取权重,默认为1
String weight = instance.getMetadata().get("weight");
return weight != null ? Integer.parseInt(weight) : 1;
}
}
3.2 Consul服务发现
// Consul配置
@Configuration
@EnableDiscoveryClient
public class ConsulConfig {
@Bean
public ConsulClient consulClient() {
return new ConsulClient("localhost", 8500);
}
// 健康检查配置
@Bean
public NewService consulService() {
NewService service = new NewService();
service.setId("user-service-1");
service.setName("user-service");
service.setAddress("localhost");
service.setPort(8080);
// 健康检查
NewService.Check check = new NewService.Check();
check.setHttp("http://localhost:8080/actuator/health");
check.setInterval("10s");
check.setTimeout("3s");
service.setCheck(check);
return service;
}
}
// Consul服务注册
@Component
public class ConsulServiceRegistry {
private final ConsulClient consulClient;
public ConsulServiceRegistry(ConsulClient consulClient) {
this.consulClient = consulClient;
}
@PostConstruct
public void registerService() {
NewService service = createServiceDefinition();
consulClient.agentServiceRegister(service);
}
@PreDestroy
public void deregisterService() {
consulClient.agentServiceDeregister("user-service-1");
}
private NewService createServiceDefinition() {
NewService service = new NewService();
service.setId("user-service-1");
service.setName("user-service");
service.setAddress("localhost");
service.setPort(8080);
// 添加标签
service.setTags(Arrays.asList("user", "microservice", "v1"));
// 健康检查
NewService.Check check = new NewService.Check();
check.setHttp("http://localhost:8080/actuator/health");
check.setInterval("10s");
check.setTimeout("3s");
service.setCheck(check);
return service;
}
}
// Consul服务发现
@Service
public class ConsulServiceDiscovery {
private final ConsulClient consulClient;
public ConsulServiceDiscovery(ConsulClient consulClient) {
this.consulClient = consulClient;
}
public List<ServiceHealth> getHealthyServices(String serviceName) {
Response<List<ServiceHealth>> response = consulClient.getHealthServices(
serviceName, true, QueryParams.DEFAULT);
return response.getValue();
}
public ServiceHealth selectHealthyService(String serviceName) {
List<ServiceHealth> services = getHealthyServices(serviceName);
if (services.isEmpty()) {
throw new ServiceUnavailableException("没有健康的服务实例: " + serviceName);
}
// 简单的随机选择
Random random = new Random();
return services.get(random.nextInt(services.size()));
}
public String getServiceUrl(String serviceName) {
ServiceHealth service = selectHealthyService(serviceName);
Service.Port port = service.getService().getPort();
String address = service.getService().getAddress();
return String.format("http://%s:%d", address, port);
}
}
4. 配置中心
4.1 Spring Cloud Config
// Config Server配置
@SpringBootApplication
@EnableConfigServer
public class ConfigServerApplication {
public static void main(String[] args) {
SpringApplication.run(ConfigServerApplication.class, args);
}
}
// Config Client配置
@SpringBootApplication
@EnableEurekaClient
public class ConfigClientApplication {
public static void main(String[] args) {
SpringApplication.run(ConfigClientApplication.class, args);
}
// 配置属性
@Component
@ConfigurationProperties(prefix = "app")
@RefreshScope
public static class AppConfig {
private String name;
private String version;
private DatabaseConfig database;
private CacheConfig cache;
// Getters and Setters
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getVersion() { return version; }
public void setVersion(String version) { this.version = version; }
public DatabaseConfig getDatabase() { return database; }
public void setDatabase(DatabaseConfig database) { this.database = database; }
public CacheConfig getCache() { return cache; }
public void setCache(CacheConfig cache) { this.cache = cache; }
public static class DatabaseConfig {
private String url;
private String username;
private String password;
private int maxConnections;
// Getters and Setters
public String getUrl() { return url; }
public void setUrl(String url) { this.url = url; }
public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }
public String getPassword() { return password; }
public void setPassword(String password) { this.password = password; }
public int getMaxConnections() { return maxConnections; }
public void setMaxConnections(int maxConnections) { this.maxConnections = maxConnections; }
}
public static class CacheConfig {
private String type;
private int ttl;
private int maxSize;
// Getters and Setters
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public int getTtl() { return ttl; }
public void setTtl(int ttl) { this.ttl = ttl; }
public int getMaxSize() { return maxSize; }
public void setMaxSize(int maxSize) { this.maxSize = maxSize; }
}
}
// 配置使用示例
@RestController
public class ConfigController {
private final AppConfig appConfig;
public ConfigController(AppConfig appConfig) {
this.appConfig = appConfig;
}
@GetMapping("/config")
public Map<String, Object> getConfig() {
Map<String, Object> config = new HashMap<>();
config.put("name", appConfig.getName());
config.put("version", appConfig.getVersion());
config.put("database", appConfig.getDatabase());
config.put("cache", appConfig.getCache());
return config;
}
@PostMapping("/config/refresh")
public String refreshConfig() {
// 触发配置刷新
return "配置已刷新";
}
}
}
// 配置变更监听
@Component
public class ConfigChangeListener {
private static final Logger logger = LoggerFactory.getLogger(ConfigChangeListener.class);
@EventListener
public void handleRefreshEvent(RefreshEvent event) {
logger.info("配置刷新事件: {}", event.getEventDesc());
// 处理配置变更逻辑
handleConfigChange();
}
private void handleConfigChange() {
// 重新初始化缓存
reinitializeCache();
// 重新连接数据库
reconnectDatabase();
// 通知其他组件
notifyComponents();
}
private void reinitializeCache() {
logger.info("重新初始化缓存");
// 缓存重新初始化逻辑
}
private void reconnectDatabase() {
logger.info("重新连接数据库");
// 数据库重连逻辑
}
private void notifyComponents() {
logger.info("通知其他组件配置已变更");
// 组件通知逻辑
}
}
4.2 Nacos配置中心
// Nacos配置
@Configuration
@EnableNacosConfig
public class NacosConfig {
@Bean
@ConfigurationProperties(prefix = "nacos.config")
public NacosConfigProperties nacosConfigProperties() {
return new NacosConfigProperties();
}
@Bean
public ConfigService configService() throws NacosException {
Properties properties = new Properties();
properties.put("serverAddr", "localhost:8848");
properties.put("namespace", "public");
return NacosFactory.createConfigService(properties);
}
}
// Nacos配置管理
@Service
public class NacosConfigManager {
private final ConfigService configService;
private static final Logger logger = LoggerFactory.getLogger(NacosConfigManager.class);
public NacosConfigManager(ConfigService configService) {
this.configService = configService;
}
public String getConfig(String dataId, String group) {
try {
return configService.getConfig(dataId, group, 5000);
} catch (NacosException e) {
logger.error("获取配置失败: dataId={}, group={}", dataId, group, e);
return null;
}
}
public boolean publishConfig(String dataId, String group, String content) {
try {
return configService.publishConfig(dataId, group, content);
} catch (NacosException e) {
logger.error("发布配置失败: dataId={}, group={}", dataId, group, e);
return false;
}
}
public boolean removeConfig(String dataId, String group) {
try {
return configService.removeConfig(dataId, group);
} catch (NacosException e) {
logger.error("删除配置失败: dataId={}, group={}", dataId, group, e);
return false;
}
}
public void addListener(String dataId, String group, Listener listener) {
try {
configService.addListener(dataId, group, listener);
} catch (NacosException e) {
logger.error("添加监听器失败: dataId={}, group={}", dataId, group, e);
}
}
}
// 配置监听器
@Component
public class NacosConfigListener {
private final NacosConfigManager configManager;
private static final Logger logger = LoggerFactory.getLogger(NacosConfigListener.class);
public NacosConfigListener(NacosConfigManager configManager) {
this.configManager = configManager;
initListeners();
}
private void initListeners() {
// 监听应用配置
configManager.addListener("user-service.yml", "DEFAULT_GROUP", new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
logger.info("应用配置已更新: {}", configInfo);
handleAppConfigChange(configInfo);
}
});
// 监听数据库配置
configManager.addListener("database.yml", "DEFAULT_GROUP", new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
logger.info("数据库配置已更新: {}", configInfo);
handleDatabaseConfigChange(configInfo);
}
});
}
private void handleAppConfigChange(String configInfo) {
// 处理应用配置变更
try {
// 解析配置
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
Map<String, Object> config = mapper.readValue(configInfo, Map.class);
// 更新应用配置
updateAppConfig(config);
} catch (Exception e) {
logger.error("处理应用配置变更失败", e);
}
}
private void handleDatabaseConfigChange(String configInfo) {
// 处理数据库配置变更
try {
// 解析配置
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
Map<String, Object> config = mapper.readValue(configInfo, Map.class);
// 更新数据库配置
updateDatabaseConfig(config);
} catch (Exception e) {
logger.error("处理数据库配置变更失败", e);
}
}
private void updateAppConfig(Map<String, Object> config) {
// 更新应用配置的逻辑
logger.info("更新应用配置: {}", config);
}
private void updateDatabaseConfig(Map<String, Object> config) {
// 更新数据库配置的逻辑
logger.info("更新数据库配置: {}", config);
}
}
// 配置属性类
class NacosConfigProperties {
private String serverAddr;
private String namespace;
private String group;
private long timeout;
// Getters and Setters
public String getServerAddr() { return serverAddr; }
public void setServerAddr(String serverAddr) { this.serverAddr = serverAddr; }
public String getNamespace() { return namespace; }
public void setNamespace(String namespace) { this.namespace = namespace; }
public String getGroup() { return group; }
public void setGroup(String group) { this.group = group; }
public long getTimeout() { return timeout; }
public void setTimeout(long timeout) { this.timeout = timeout; }
}
// 动态配置服务
@Service
public class DynamicConfigService {
private final NacosConfigManager configManager;
private final Map<String, Object> configCache = new ConcurrentHashMap<>();
public DynamicConfigService(NacosConfigManager configManager) {
this.configManager = configManager;
loadInitialConfigs();
}
private void loadInitialConfigs() {
// 加载初始配置
loadConfig("user-service.yml", "DEFAULT_GROUP");
loadConfig("database.yml", "DEFAULT_GROUP");
loadConfig("cache.yml", "DEFAULT_GROUP");
}
private void loadConfig(String dataId, String group) {
String config = configManager.getConfig(dataId, group);
if (config != null) {
configCache.put(dataId, config);
}
}
public String getConfigValue(String key) {
return (String) configCache.get(key);
}
public void updateConfigValue(String key, Object value) {
configCache.put(key, value);
}
public Map<String, Object> getAllConfigs() {
return new HashMap<>(configCache);
}
}
5. API网关
5.1 Spring Cloud Gateway
// Gateway配置
@SpringBootApplication
@EnableEurekaClient
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
// 路由配置
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
return builder.routes()
// 用户服务路由
.route("user-service", r -> r.path("/api/users/**")
.filters(f -> f
.stripPrefix(1)
.addRequestHeader("X-Gateway", "Spring-Cloud-Gateway")
.addResponseHeader("X-Response-Time", String.valueOf(System.currentTimeMillis()))
.circuitBreaker(config -> config
.setName("user-service-cb")
.setFallbackUri("forward:/fallback/user")))
.uri("lb://user-service"))
// 订单服务路由
.route("order-service", r -> r.path("/api/orders/**")
.filters(f -> f
.stripPrefix(1)
.requestRateLimiter(config -> config
.setRateLimiter(redisRateLimiter())
.setKeyResolver(userKeyResolver())))
.uri("lb://order-service"))
// WebSocket路由
.route("websocket-service", r -> r.path("/ws/**")
.uri("lb://websocket-service"))
.build();
}
// Redis限流器
@Bean
public RedisRateLimiter redisRateLimiter() {
return new RedisRateLimiter(10, 20, 1);
}
// 用户Key解析器
@Bean
public KeyResolver userKeyResolver() {
return exchange -> {
String userId = exchange.getRequest().getHeaders().getFirst("X-User-Id");
return Mono.just(userId != null ? userId : "anonymous");
};
}
}
// 全局过滤器
@Component
public class GlobalAuthFilter implements GlobalFilter, Ordered {
private static final Logger logger = LoggerFactory.getLogger(GlobalAuthFilter.class);
private final JwtTokenUtil jwtTokenUtil;
public GlobalAuthFilter(JwtTokenUtil jwtTokenUtil) {
this.jwtTokenUtil = jwtTokenUtil;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String path = request.getURI().getPath();
// 跳过认证的路径
if (isExcludedPath(path)) {
return chain.filter(exchange);
}
// 获取token
String token = extractToken(request);
if (token == null) {
return unauthorized(exchange);
}
// 验证token
try {
if (!jwtTokenUtil.validateToken(token)) {
return unauthorized(exchange);
}
// 添加用户信息到请求头
String userId = jwtTokenUtil.getUserIdFromToken(token);
ServerHttpRequest modifiedRequest = request.mutate()
.header("X-User-Id", userId)
.header("X-User-Token", token)
.build();
return chain.filter(exchange.mutate().request(modifiedRequest).build());
} catch (Exception e) {
logger.error("Token验证失败", e);
return unauthorized(exchange);
}
}
private boolean isExcludedPath(String path) {
return path.startsWith("/api/auth/") ||
path.startsWith("/api/public/") ||
path.equals("/actuator/health");
}
private String extractToken(ServerHttpRequest request) {
String authHeader = request.getHeaders().getFirst("Authorization");
if (authHeader != null && authHeader.startsWith("Bearer ")) {
return authHeader.substring(7);
}
return null;
}
private Mono<Void> unauthorized(ServerWebExchange exchange) {
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.UNAUTHORIZED);
response.getHeaders().add("Content-Type", "application/json");
String body = "{\"error\":\"Unauthorized\",\"message\":\"访问被拒绝\"}";
DataBuffer buffer = response.bufferFactory().wrap(body.getBytes());
return response.writeWith(Mono.just(buffer));
}
@Override
public int getOrder() {
return -100;
}
}
// 日志过滤器
@Component
public class LoggingGatewayFilter implements GlobalFilter, Ordered {
private static final Logger logger = LoggerFactory.getLogger(LoggingGatewayFilter.class);
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
long startTime = System.currentTimeMillis();
logger.info("请求开始: {} {} from {}",
request.getMethod(),
request.getURI(),
request.getRemoteAddress());
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
long endTime = System.currentTimeMillis();
ServerHttpResponse response = exchange.getResponse();
logger.info("请求结束: {} {} -> {} ({}ms)",
request.getMethod(),
request.getURI(),
response.getStatusCode(),
endTime - startTime);
}));
}
@Override
public int getOrder() {
return -99;
}
}
// 熔断降级处理
@RestController
public class FallbackController {
@RequestMapping("/fallback/user")
public Mono<Map<String, Object>> userFallback() {
Map<String, Object> result = new HashMap<>();
result.put("error", "用户服务暂时不可用");
result.put("message", "请稍后重试");
result.put("timestamp", LocalDateTime.now());
return Mono.just(result);
}
@RequestMapping("/fallback/order")
public Mono<Map<String, Object>> orderFallback() {
Map<String, Object> result = new HashMap<>();
result.put("error", "订单服务暂时不可用");
result.put("message", "请稍后重试");
result.put("timestamp", LocalDateTime.now());
return Mono.just(result);
}
}
// 自定义路由过滤器
@Component
public class CustomGatewayFilterFactory extends AbstractGatewayFilterFactory<CustomGatewayFilterFactory.Config> {
public CustomGatewayFilterFactory() {
super(Config.class);
}
@Override
public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
ServerHttpRequest request = exchange.getRequest();
// 添加自定义逻辑
if (config.isPreLog()) {
System.out.println("Pre Filter: " + request.getURI());
}
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
if (config.isPostLog()) {
System.out.println("Post Filter: " + exchange.getResponse().getStatusCode());
}
}));
};
}
public static class Config {
private boolean preLog;
private boolean postLog;
public boolean isPreLog() { return preLog; }
public void setPreLog(boolean preLog) { this.preLog = preLog; }
public boolean isPostLog() { return postLog; }
public void setPostLog(boolean postLog) { this.postLog = postLog; }
}
}
5.2 Zuul网关
// Zuul配置
@SpringBootApplication
@EnableZuulProxy
@EnableEurekaClient
public class ZuulGatewayApplication {
public static void main(String[] args) {
SpringApplication.run(ZuulGatewayApplication.class, args);
}
}
// Zuul过滤器
@Component
public class AuthenticationFilter extends ZuulFilter {
private static final Logger logger = LoggerFactory.getLogger(AuthenticationFilter.class);
private final JwtTokenUtil jwtTokenUtil;
public AuthenticationFilter(JwtTokenUtil jwtTokenUtil) {
this.jwtTokenUtil = jwtTokenUtil;
}
@Override
public String filterType() {
return "pre";
}
@Override
public int filterOrder() {
return 1;
}
@Override
public boolean shouldFilter() {
RequestContext ctx = RequestContext.getCurrentContext();
HttpServletRequest request = ctx.getRequest();
String path = request.getRequestURI();
// 跳过认证的路径
return !isExcludedPath(path);
}
@Override
public Object run() throws ZuulException {
RequestContext ctx = RequestContext.getCurrentContext();
HttpServletRequest request = ctx.getRequest();
String token = extractToken(request);
if (token == null || !jwtTokenUtil.validateToken(token)) {
logger.warn("无效的访问令牌: {}", request.getRequestURI());
ctx.setSendZuulResponse(false);
ctx.setResponseStatusCode(HttpStatus.UNAUTHORIZED.value());
ctx.setResponseBody("{\"error\":\"Unauthorized\",\"message\":\"访问被拒绝\"}");
ctx.getResponse().setContentType("application/json");
return null;
}
// 添加用户信息到请求头
String userId = jwtTokenUtil.getUserIdFromToken(token);
ctx.addZuulRequestHeader("X-User-Id", userId);
ctx.addZuulRequestHeader("X-User-Token", token);
return null;
}
private boolean isExcludedPath(String path) {
return path.startsWith("/api/auth/") ||
path.startsWith("/api/public/") ||
path.equals("/actuator/health");
}
private String extractToken(HttpServletRequest request) {
String authHeader = request.getHeader("Authorization");
if (authHeader != null && authHeader.startsWith("Bearer ")) {
return authHeader.substring(7);
}
return null;
}
}
// 请求日志过滤器
@Component
public class RequestLoggingFilter extends ZuulFilter {
private static final Logger logger = LoggerFactory.getLogger(RequestLoggingFilter.class);
@Override
public String filterType() {
return "pre";
}
@Override
public int filterOrder() {
return 0;
}
@Override
public boolean shouldFilter() {
return true;
}
@Override
public Object run() throws ZuulException {
RequestContext ctx = RequestContext.getCurrentContext();
HttpServletRequest request = ctx.getRequest();
logger.info("请求: {} {} from {}",
request.getMethod(),
request.getRequestURL(),
request.getRemoteAddr());
ctx.set("startTime", System.currentTimeMillis());
return null;
}
}
// 响应日志过滤器
@Component
public class ResponseLoggingFilter extends ZuulFilter {
private static final Logger logger = LoggerFactory.getLogger(ResponseLoggingFilter.class);
@Override
public String filterType() {
return "post";
}
@Override
public int filterOrder() {
return 1000;
}
@Override
public boolean shouldFilter() {
return true;
}
@Override
public Object run() throws ZuulException {
RequestContext ctx = RequestContext.getCurrentContext();
HttpServletRequest request = ctx.getRequest();
Long startTime = (Long) ctx.get("startTime");
long duration = startTime != null ? System.currentTimeMillis() - startTime : 0;
logger.info("响应: {} {} -> {} ({}ms)",
request.getMethod(),
request.getRequestURL(),
ctx.getResponseStatusCode(),
duration);
return null;
}
}
6. 分布式链路追踪
6.1 Spring Cloud Sleuth
// Sleuth配置
@SpringBootApplication
@EnableZipkinServer // 如果作为Zipkin服务器
public class TracingApplication {
public static void main(String[] args) {
SpringApplication.run(TracingApplication.class, args);
}
// 自定义采样器
@Bean
public ProbabilityBasedSampler defaultSampler() {
return new ProbabilityBasedSampler(0.1f); // 10%采样率
}
// 自定义Span标签
@Bean
public SpanCustomizer spanCustomizer() {
return span -> {
span.tag("service.version", "1.0.0");
span.tag("environment", "production");
};
}
}
// 链路追踪服务
@Service
public class TracingService {
private static final Logger logger = LoggerFactory.getLogger(TracingService.class);
private final Tracer tracer;
private final RestTemplate restTemplate;
public TracingService(Tracer tracer, RestTemplate restTemplate) {
this.tracer = tracer;
this.restTemplate = restTemplate;
}
@NewSpan("user-operation")
public User processUser(@SpanTag("userId") String userId) {
Span span = tracer.nextSpan().name("process-user").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
span.tag("user.id", userId);
span.annotate("开始处理用户");
// 调用其他服务
User user = callUserService(userId);
// 调用订单服务
List<Order> orders = callOrderService(userId);
user.setOrders(orders);
span.tag("user.orders.count", String.valueOf(orders.size()));
span.annotate("用户处理完成");
return user;
} catch (Exception e) {
span.tag("error", e.getMessage());
throw e;
} finally {
span.end();
}
}
@NewSpan("call-user-service")
private User callUserService(@SpanTag("userId") String userId) {
logger.info("调用用户服务: {}", userId);
// 添加自定义标签
Span currentSpan = tracer.currentSpan();
if (currentSpan != null) {
currentSpan.tag("service.name", "user-service");
currentSpan.tag("operation.type", "http-call");
}
return restTemplate.getForObject(
"http://user-service/users/" + userId, User.class);
}
@NewSpan("call-order-service")
private List<Order> callOrderService(@SpanTag("userId") String userId) {
logger.info("调用订单服务: {}", userId);
Span currentSpan = tracer.currentSpan();
if (currentSpan != null) {
currentSpan.tag("service.name", "order-service");
currentSpan.tag("operation.type", "http-call");
}
Order[] orders = restTemplate.getForObject(
"http://order-service/orders/user/" + userId, Order[].class);
return orders != null ? Arrays.asList(orders) : Collections.emptyList();
}
// 异步处理
@Async
@NewSpan("async-processing")
public CompletableFuture<Void> processAsync(@SpanTag("taskId") String taskId) {
Span span = tracer.nextSpan().name("async-task").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
span.tag("task.id", taskId);
span.annotate("开始异步处理");
// 模拟异步处理
Thread.sleep(1000);
span.annotate("异步处理完成");
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
span.tag("error", e.getMessage());
throw new RuntimeException(e);
} finally {
span.end();
}
}
}
// 自定义链路追踪拦截器
@Component
public class TracingInterceptor implements HandlerInterceptor {
private final Tracer tracer;
public TracingInterceptor(Tracer tracer) {
this.tracer = tracer;
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
Span span = tracer.nextSpan()
.name("http-" + request.getMethod().toLowerCase())
.tag("http.method", request.getMethod())
.tag("http.url", request.getRequestURL().toString())
.tag("http.user_agent", request.getHeader("User-Agent"))
.start();
request.setAttribute("currentSpan", span);
tracer.withSpanInScope(span);
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
Span span = (Span) request.getAttribute("currentSpan");
if (span != null) {
span.tag("http.status_code", String.valueOf(response.getStatus()));
if (ex != null) {
span.tag("error", ex.getMessage());
}
span.end();
}
}
}
// 数据库链路追踪
@Component
public class DatabaseTracingAspect {
private final Tracer tracer;
public DatabaseTracingAspect(Tracer tracer) {
this.tracer = tracer;
}
@Around("@annotation(org.springframework.data.jpa.repository.Query)")
public Object traceQuery(ProceedingJoinPoint joinPoint) throws Throwable {
Span span = tracer.nextSpan().name("db-query").start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
span.tag("db.type", "mysql");
span.tag("db.operation", "query");
span.tag("component", "spring-data-jpa");
Object result = joinPoint.proceed();
if (result instanceof Collection) {
span.tag("db.rows_affected", String.valueOf(((Collection<?>) result).size()));
}
return result;
} catch (Exception e) {
span.tag("error", e.getMessage());
throw e;
} finally {
span.end();
}
}
}
6.2 Jaeger集成
// Jaeger配置
@Configuration
public class JaegerConfig {
@Bean
public JaegerTracer jaegerTracer() {
Configuration.SamplerConfiguration samplerConfig =
Configuration.SamplerConfiguration.fromEnv()
.withType(ConstSampler.TYPE)
.withParam(1);
Configuration.ReporterConfiguration reporterConfig =
Configuration.ReporterConfiguration.fromEnv()
.withLogSpans(true);
Configuration config = new Configuration("user-service")
.withSampler(samplerConfig)
.withReporter(reporterConfig);
return config.getTracer();
}
@Bean
public OpenTracing openTracing(JaegerTracer jaegerTracer) {
GlobalTracer.register(jaegerTracer);
return new OpenTracing(jaegerTracer);
}
}
// OpenTracing使用示例
@Service
public class OpenTracingService {
private static final Logger logger = LoggerFactory.getLogger(OpenTracingService.class);
@Traced(operationName = "business-operation")
public String performBusinessOperation(String input) {
Span span = GlobalTracer.get().activeSpan();
if (span != null) {
span.setTag("input.length", input.length());
span.log("开始业务操作");
}
try {
// 模拟业务逻辑
Thread.sleep(100);
String result = "处理结果: " + input.toUpperCase();
if (span != null) {
span.setTag("result.length", result.length());
span.log("业务操作完成");
}
return result;
} catch (Exception e) {
if (span != null) {
span.setTag("error", true);
span.log(Collections.singletonMap("error.message", e.getMessage()));
}
throw new RuntimeException(e);
}
}
public void manualTracing() {
Tracer tracer = GlobalTracer.get();
Span span = tracer.buildSpan("manual-operation").start();
try (Scope scope = tracer.scopeManager().activate(span)) {
span.setTag("component", "manual");
span.log("手动创建的Span");
// 创建子Span
Span childSpan = tracer.buildSpan("child-operation")
.asChildOf(span)
.start();
try (Scope childScope = tracer.scopeManager().activate(childSpan)) {
childSpan.setTag("child.component", "manual-child");
childSpan.log("子操作");
// 模拟操作
Thread.sleep(50);
} finally {
childSpan.finish();
}
} catch (Exception e) {
span.setTag("error", true);
span.log(Collections.singletonMap("error.message", e.getMessage()));
} finally {
span.finish();
}
}
}
7. 微服务监控
7.1 Spring Boot Actuator
// Actuator配置
@Configuration
public class ActuatorConfig {
// 自定义健康检查
@Bean
public HealthIndicator customHealthIndicator() {
return new HealthIndicator() {
@Override
public Health health() {
// 检查数据库连接
if (isDatabaseHealthy()) {
return Health.up()
.withDetail("database", "连接正常")
.withDetail("timestamp", LocalDateTime.now())
.build();
} else {
return Health.down()
.withDetail("database", "连接异常")
.withDetail("timestamp", LocalDateTime.now())
.build();
}
}
private boolean isDatabaseHealthy() {
// 实际的数据库健康检查逻辑
return true;
}
};
}
// 自定义指标
@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
return registry -> {
registry.config().commonTags("application", "user-service");
registry.config().commonTags("environment", "production");
};
}
}
// 业务指标收集
@Service
public class MetricsService {
private final MeterRegistry meterRegistry;
private final Counter userCreatedCounter;
private final Timer userProcessingTimer;
private final Gauge activeUsersGauge;
private final AtomicInteger activeUsers = new AtomicInteger(0);
public MetricsService(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
// 计数器
this.userCreatedCounter = Counter.builder("users.created")
.description("创建的用户数量")
.register(meterRegistry);
// 计时器
this.userProcessingTimer = Timer.builder("user.processing.time")
.description("用户处理时间")
.register(meterRegistry);
// 仪表盘
this.activeUsersGauge = Gauge.builder("users.active")
.description("活跃用户数量")
.register(meterRegistry, activeUsers, AtomicInteger::get);
}
public void recordUserCreated() {
userCreatedCounter.increment();
}
public void recordUserCreated(String userType) {
Counter.builder("users.created")
.tag("type", userType)
.register(meterRegistry)
.increment();
}
public <T> T recordUserProcessingTime(Supplier<T> operation) {
return userProcessingTimer.recordCallable(operation::get);
}
public void incrementActiveUsers() {
activeUsers.incrementAndGet();
}
public void decrementActiveUsers() {
activeUsers.decrementAndGet();
}
// 自定义分布摘要
public void recordOrderAmount(double amount) {
DistributionSummary.builder("order.amount")
.description("订单金额分布")
.register(meterRegistry)
.record(amount);
}
}
// 监控端点
@RestController
@RequestMapping("/actuator/custom")
public class CustomActuatorController {
private final MetricsService metricsService;
private final ApplicationContext applicationContext;
public CustomActuatorController(MetricsService metricsService, ApplicationContext applicationContext) {
this.metricsService = metricsService;
this.applicationContext = applicationContext;
}
@GetMapping("/info")
public Map<String, Object> customInfo() {
Map<String, Object> info = new HashMap<>();
info.put("service", "user-service");
info.put("version", "1.0.0");
info.put("build-time", LocalDateTime.now());
info.put("active-profiles", applicationContext.getEnvironment().getActiveProfiles());
return info;
}
@GetMapping("/metrics/summary")
public Map<String, Object> metricsSummary() {
Map<String, Object> metrics = new HashMap<>();
metrics.put("timestamp", LocalDateTime.now());
metrics.put("jvm-memory", getJvmMemoryInfo());
metrics.put("system-cpu", getSystemCpuInfo());
metrics.put("business-metrics", getBusinessMetrics());
return metrics;
}
private Map<String, Object> getJvmMemoryInfo() {
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
Map<String, Object> memory = new HashMap<>();
memory.put("heap-used", heapUsage.getUsed());
memory.put("heap-max", heapUsage.getMax());
memory.put("heap-usage-percent", (double) heapUsage.getUsed() / heapUsage.getMax() * 100);
return memory;
}
private Map<String, Object> getSystemCpuInfo() {
OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
Map<String, Object> cpu = new HashMap<>();
cpu.put("processors", osBean.getAvailableProcessors());
cpu.put("load-average", osBean.getSystemLoadAverage());
return cpu;
}
private Map<String, Object> getBusinessMetrics() {
Map<String, Object> business = new HashMap<>();
business.put("users-created-today", getUsersCreatedToday());
business.put("active-sessions", getActiveSessions());
business.put("error-rate", getErrorRate());
return business;
}
private int getUsersCreatedToday() {
// 实际的业务逻辑
return 150;
}
private int getActiveSessions() {
// 实际的会话统计逻辑
return 45;
}
private double getErrorRate() {
// 实际的错误率计算逻辑
return 0.02;
}
}
7.2 Prometheus集成
// Prometheus配置
@Configuration
public class PrometheusConfig {
@Bean
public PrometheusMeterRegistry prometheusMeterRegistry() {
return new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
}
@Bean
public TimedAspect timedAspect(MeterRegistry registry) {
return new TimedAspect(registry);
}
@Bean
public CountedAspect countedAspect(MeterRegistry registry) {
return new CountedAspect(registry);
}
}
// Prometheus指标端点
@RestController
public class PrometheusController {
private final PrometheusMeterRegistry prometheusMeterRegistry;
public PrometheusController(PrometheusMeterRegistry prometheusMeterRegistry) {
this.prometheusMeterRegistry = prometheusMeterRegistry;
}
@GetMapping("/actuator/prometheus")
public String prometheus() {
return prometheusMeterRegistry.scrape();
}
}
// 业务指标注解
@Service
public class UserBusinessService {
@Timed(name = "user.creation.time", description = "用户创建时间")
@Counted(name = "user.creation.count", description = "用户创建次数")
public User createUser(CreateUserRequest request) {
// 用户创建逻辑
User user = new User();
user.setUsername(request.getUsername());
user.setEmail(request.getEmail());
user.setCreatedAt(LocalDateTime.now());
// 模拟处理时间
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return user;
}
@Timed(name = "user.query.time", description = "用户查询时间")
public User getUserById(String userId) {
// 用户查询逻辑
return new User(); // 简化实现
}
@Counted(name = "user.deletion.count", description = "用户删除次数")
public void deleteUser(String userId) {
// 用户删除逻辑
}
}
8. 容器化部署
8.1 Docker配置
# 用户服务Dockerfile
FROM openjdk:11-jre-slim
# 设置工作目录
WORKDIR /app
# 复制jar文件
COPY target/user-service-1.0.0.jar app.jar
# 创建非root用户
RUN groupadd -r appuser && useradd -r -g appuser appuser
RUN chown -R appuser:appuser /app
USER appuser
# 暴露端口
EXPOSE 8080
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8080/actuator/health || exit 1
# 启动应用
ENTRYPOINT ["java", "-jar", "-Djava.security.egd=file:/dev/./urandom", "app.jar"]
# docker-compose.yml
version: '3.8'
services:
# 服务注册中心
eureka-server:
image: user-service/eureka-server:latest
ports:
- "8761:8761"
environment:
- SPRING_PROFILES_ACTIVE=docker
networks:
- microservices
# 配置中心
config-server:
image: user-service/config-server:latest
ports:
- "8888:8888"
environment:
- SPRING_PROFILES_ACTIVE=docker
- EUREKA_CLIENT_SERVICE_URL_DEFAULTZONE=http://eureka-server:8761/eureka
depends_on:
- eureka-server
networks:
- microservices
# API网关
api-gateway:
image: user-service/api-gateway:latest
ports:
- "8080:8080"
environment:
- SPRING_PROFILES_ACTIVE=docker
- EUREKA_CLIENT_SERVICE_URL_DEFAULTZONE=http://eureka-server:8761/eureka
- SPRING_CLOUD_CONFIG_URI=http://config-server:8888
depends_on:
- eureka-server
- config-server
networks:
- microservices
# 用户服务
user-service:
image: user-service/user-service:latest
ports:
- "8081:8080"
environment:
- SPRING_PROFILES_ACTIVE=docker
- EUREKA_CLIENT_SERVICE_URL_DEFAULTZONE=http://eureka-server:8761/eureka
- SPRING_CLOUD_CONFIG_URI=http://config-server:8888
- SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/userdb
- SPRING_DATASOURCE_USERNAME=root
- SPRING_DATASOURCE_PASSWORD=password
depends_on:
- eureka-server
- config-server
- mysql
networks:
- microservices
# 订单服务
order-service:
image: user-service/order-service:latest
ports:
- "8082:8080"
environment:
- SPRING_PROFILES_ACTIVE=docker
- EUREKA_CLIENT_SERVICE_URL_DEFAULTZONE=http://eureka-server:8761/eureka
- SPRING_CLOUD_CONFIG_URI=http://config-server:8888
- SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/orderdb
- SPRING_DATASOURCE_USERNAME=root
- SPRING_DATASOURCE_PASSWORD=password
depends_on:
- eureka-server
- config-server
- mysql
networks:
- microservices
# MySQL数据库
mysql:
image: mysql:8.0
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=password
- MYSQL_DATABASE=userdb
volumes:
- mysql_data:/var/lib/mysql
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
networks:
- microservices
# Redis缓存
redis:
image: redis:6-alpine
ports:
- "6379:6379"
networks:
- microservices
# Zipkin链路追踪
zipkin:
image: openzipkin/zipkin
ports:
- "9411:9411"
networks:
- microservices
# Prometheus监控
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
networks:
- microservices
# Grafana可视化
grafana:
image: grafana/grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- grafana_data:/var/lib/grafana
networks:
- microservices
volumes:
mysql_data:
grafana_data:
networks:
microservices:
driver: bridge
8.2 Kubernetes部署
# user-service-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
labels:
app: user-service
spec:
replicas: 3
selector:
matchLabels:
app: user-service
template:
metadata:
labels:
app: user-service
spec:
containers:
- name: user-service
image: user-service/user-service:latest
ports:
- containerPort: 8080
env:
- name: SPRING_PROFILES_ACTIVE
value: "kubernetes"
- name: EUREKA_CLIENT_SERVICE_URL_DEFAULTZONE
value: "http://eureka-server:8761/eureka"
- name: SPRING_DATASOURCE_URL
value: "jdbc:mysql://mysql:3306/userdb"
- name: SPRING_DATASOURCE_USERNAME
valueFrom:
secretKeyRef:
name: mysql-secret
key: username
- name: SPRING_DATASOURCE_PASSWORD
valueFrom:
secretKeyRef:
name: mysql-secret
key: password
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1Gi"
cpu: "500m"
livenessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 60
periodSeconds: 30
readinessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: user-service
spec:
selector:
app: user-service
ports:
- protocol: TCP
port: 80
targetPort: 8080
type: ClusterIP
---
apiVersion: v1
kind: Secret
metadata:
name: mysql-secret
type: Opaque
data:
username: cm9vdA== # base64 encoded 'root'
password: cGFzc3dvcmQ= # base64 encoded 'password'
9. 微服务最佳实践
9.1 服务设计原则
// 单一职责原则示例
@Service
public class UserService {
// 只负责用户相关的业务逻辑
public User createUser(CreateUserRequest request) {
// 用户创建逻辑
return new User();
}
public User getUserById(String userId) {
// 用户查询逻辑
return new User();
}
public void updateUser(String userId, UpdateUserRequest request) {
// 用户更新逻辑
}
public void deleteUser(String userId) {
// 用户删除逻辑
}
}
// 数据一致性处理
@Service
public class OrderSagaService {
private final UserService userService;
private final PaymentService paymentService;
private final InventoryService inventoryService;
private final NotificationService notificationService;
public OrderSagaService(UserService userService, PaymentService paymentService,
InventoryService inventoryService, NotificationService notificationService) {
this.userService = userService;
this.paymentService = paymentService;
this.inventoryService = inventoryService;
this.notificationService = notificationService;
}
@SagaOrchestrationStart
public void processOrder(CreateOrderRequest request) {
try {
// 1. 验证用户
User user = userService.validateUser(request.getUserId());
// 2. 检查库存
inventoryService.reserveInventory(request.getProductId(), request.getQuantity());
// 3. 处理支付
Payment payment = paymentService.processPayment(request.getPaymentInfo());
// 4. 创建订单
Order order = createOrder(request, user, payment);
// 5. 发送通知
notificationService.sendOrderConfirmation(order);
} catch (Exception e) {
// 补偿操作
compensateOrder(request);
throw new OrderProcessingException("订单处理失败", e);
}
}
private void compensateOrder(CreateOrderRequest request) {
try {
// 释放库存
inventoryService.releaseInventory(request.getProductId(), request.getQuantity());
// 退款
paymentService.refundPayment(request.getPaymentInfo());
} catch (Exception e) {
// 记录补偿失败,需要人工介入
logger.error("订单补偿失败,需要人工处理: {}", request, e);
}
}
private Order createOrder(CreateOrderRequest request, User user, Payment payment) {
Order order = new Order();
order.setUserId(user.getId());
order.setProductId(request.getProductId());
order.setQuantity(request.getQuantity());
order.setPaymentId(payment.getId());
order.setStatus(OrderStatus.CONFIRMED);
order.setCreatedAt(LocalDateTime.now());
return order;
}
}
// 服务降级和熔断
@Component
public class ResilientUserService {
private final UserService userService;
private final CircuitBreaker circuitBreaker;
private final RateLimiter rateLimiter;
private final Retry retry;
public ResilientUserService(UserService userService) {
this.userService = userService;
// 配置熔断器
this.circuitBreaker = CircuitBreaker.ofDefaults("userService");
circuitBreaker.getEventPublisher()
.onStateTransition(event ->
logger.info("熔断器状态变更: {}", event));
// 配置限流器
this.rateLimiter = RateLimiter.ofDefaults("userService");
// 配置重试
this.retry = Retry.ofDefaults("userService");
}
public User getUserWithResilience(String userId) {
Supplier<User> decoratedSupplier = Decorators.ofSupplier(() -> userService.getUserById(userId))
.withCircuitBreaker(circuitBreaker)
.withRateLimiter(rateLimiter)
.withRetry(retry)
.withFallback(Arrays.asList(Exception.class),
throwable -> getUserFallback(userId, throwable))
.decorate();
return decoratedSupplier.get();
}
private User getUserFallback(String userId, Throwable throwable) {
logger.warn("获取用户信息失败,使用降级方案: userId={}", userId, throwable);
// 返回默认用户信息
User fallbackUser = new User();
fallbackUser.setId(userId);
fallbackUser.setUsername("Unknown User");
fallbackUser.setEmail("unknown@example.com");
return fallbackUser;
}
}
9.2 性能优化
// 缓存策略
@Service
public class CachedUserService {
private final UserRepository userRepository;
private final RedisTemplate<String, Object> redisTemplate;
public CachedUserService(UserRepository userRepository, RedisTemplate<String, Object> redisTemplate) {
this.userRepository = userRepository;
this.redisTemplate = redisTemplate;
}
@Cacheable(value = "users", key = "#userId")
public User getUserById(String userId) {
return userRepository.findById(userId)
.orElseThrow(() -> new UserNotFoundException("用户不存在: " + userId));
}
@CacheEvict(value = "users", key = "#user.id")
public User updateUser(User user) {
return userRepository.save(user);
}
@CacheEvict(value = "users", key = "#userId")
public void deleteUser(String userId) {
userRepository.deleteById(userId);
}
// 批量缓存预热
@PostConstruct
public void warmUpCache() {
List<User> activeUsers = userRepository.findActiveUsers();
activeUsers.forEach(user -> {
String key = "users::" + user.getId();
redisTemplate.opsForValue().set(key, user, Duration.ofHours(1));
});
}
}
// 异步处理
@Service
public class AsyncUserService {
private final UserService userService;
private final NotificationService notificationService;
public AsyncUserService(UserService userService, NotificationService notificationService) {
this.userService = userService;
this.notificationService = notificationService;
}
@Async("userTaskExecutor")
public CompletableFuture<User> createUserAsync(CreateUserRequest request) {
try {
User user = userService.createUser(request);
// 异步发送欢迎邮件
sendWelcomeEmailAsync(user);
return CompletableFuture.completedFuture(user);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
@Async
public void sendWelcomeEmailAsync(User user) {
try {
notificationService.sendWelcomeEmail(user);
} catch (Exception e) {
logger.error("发送欢迎邮件失败: userId={}", user.getId(), e);
}
}
// 批量处理
@Async
public CompletableFuture<List<User>> batchCreateUsers(List<CreateUserRequest> requests) {
List<CompletableFuture<User>> futures = requests.stream()
.map(this::createUserAsync)
.collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
}
// 线程池配置
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("userTaskExecutor")
public TaskExecutor userTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("user-async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
10. 本章小结
微服务架构技术对比
技术领域 | 技术选型 | 优势 | 适用场景 |
---|---|---|---|
服务注册发现 | Eureka | 简单易用,Spring生态 | 中小型项目 |
Consul | 功能丰富,多语言支持 | 大型分布式系统 | |
Nacos | 配置+注册中心一体 | 阿里云生态 | |
API网关 | Spring Cloud Gateway | 响应式,性能好 | 新项目推荐 |
Zuul | 成熟稳定,生态丰富 | 传统项目 | |
配置中心 | Spring Cloud Config | Git集成,版本控制 | 配置变更频繁 |
Nacos | 动态配置,实时推送 | 配置实时性要求高 | |
链路追踪 | Sleuth+Zipkin | Spring集成简单 | Spring项目 |
Jaeger | 性能好,功能丰富 | 大规模分布式系统 |
性能优化要点
服务拆分粒度
- 按业务领域拆分
- 避免过度拆分
- 考虑数据一致性
通信优化
- 使用连接池
- 实现熔断降级
- 异步处理非关键路径
缓存策略
- 多级缓存
- 缓存预热
- 缓存更新策略
数据库优化
- 读写分离
- 分库分表
- 连接池优化
安全考虑
服务间认证
- JWT令牌
- OAuth2.0
- 服务网格安全
网络安全
- HTTPS通信
- 网络隔离
- 防火墙配置
数据安全
- 敏感数据加密
- 访问控制
- 审计日志
开发建议
设计原则
- 单一职责
- 高内聚低耦合
- 容错设计
监控运维
- 全链路监控
- 日志聚合
- 自动化部署
团队协作
- API契约管理
- 版本控制策略
- 文档维护
下一章预告
下一章我们将学习Spring Boot进阶,包括: - Spring Boot自动配置原理 - 自定义Starter开发 - Spring Boot测试 - 生产环境部署 - 性能调优
练习题
- 设计一个电商微服务架构,包含用户、商品、订单、支付等服务
- 实现一个简单的服务注册发现机制
- 使用Spring Cloud Gateway实现API网关
- 集成分布式链路追踪
- 实现微服务监控和告警