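6.1 Message Bus Overview

What Is a Message Bus

A message bus is the infrastructure that microservice architectures use for asynchronous communication between services. It provides publish-subscribe messaging, so services communicate by exchanging messages and stay decoupled from one another.

Role of the Message Bus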

┌─────────────────────────────────────────────────────────────────────┐
│                      Message Bus Architecture                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐  │
│  │   User Service  │    │ Product Service │    │  Order Service  │  │
│  │                 │    │                 │    │                 │  │
│  │  ┌───────────┐  │    │  ┌───────────┐  │    │  ┌───────────┐  │  │
│  │  │ Publisher │  │    │  │Subscriber │  │    │  │Publisher/ │  │  │
│  │  │           │  │    │  │           │  │    │  │Subscriber │  │  │
│  │  └─────┬─────┘  │    │  └─────┬─────┘  │    │  └─────┬─────┘  │  │
│  └────────┼────────┘    └────────┼────────┘    └────────┼────────┘  │
│           │                      │                      │           │
│           └──────────────────────┼──────────────────────┘           │
│                                  │                                  │
│                                  ▼                                  │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                          Message Bus                          │  │
│  │  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐  │  │
│  │  │     Routing     │ │     Storage     │ │    Dispatch     │  │  │
│  │  └─────────────────┘ └─────────────────┘ └─────────────────┘  │  │
│  │  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐  │  │
│  │  │    Filtering    │ │ Transformation  │ │   Monitoring    │  │  │
│  │  └─────────────────┘ └─────────────────┘ └─────────────────┘  │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                  │                                  │
│                                  ▼                                  │
│  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐  │
│  │ Payment Service │    │  Notification   │    │  Audit Service  │  │
│  │                 │    │     Service     │    │                 │  │
│  │  ┌───────────┐  │    │  ┌───────────┐  │    │  ┌───────────┐  │  │
│  │  │Subscriber │  │    │  │Subscriber │  │    │  │Subscriber │  │  │
│  │  │           │  │    │  │           │  │    │  │           │  │  │
│  │  └───────────┘  │    │  └───────────┘  │    │  └───────────┘  │  │
│  └─────────────────┘    └─────────────────┘    └─────────────────┘  │
└─────────────────────────────────────────────────────────────────────┘

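Core Functions

  1. Asynchronous communication: services exchange messages without waiting for each other, which improves responsiveness
  2. Service decoupling: publishers and subscribers are loosely coupled, which reduces direct dependencies between services
  3. Event-driven: supports event-driven architecture, so business processes can be automated
  4. Message routing: routes messages according to message type and routing rules
  5. Message persistence: stores messages durably so that they are delivered reliably
  6. Load balancing: distributes messages across multiple consumers
  7. Failure recovery: provides message retry and dead-letter queues
  8. Monitoring and management: provides monitoring and management of message delivery

Message Patterns

  1. Point-to-point (Queue): one-to-one message delivery
  2. Publish-subscribe (Topic): one-to-many message broadcast
  3. Request-response (RPC): synchronous request and reply
  4. Work queue: task distribution and load balancing across consumers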
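
The difference between queue-style delivery (point-to-point and work queue) and topic-style delivery (publish-subscribe) can be sketched without a broker. The following is a minimal in-memory illustration, not how RabbitMQ or Spring Cloud Bus is implemented. The class and method names (PatternSketch, addConsumer, sendToQueue, publishToTopic) are made up for this example, and round-robin is assumed as the queue distribution strategy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of two delivery patterns; not a real broker.
class PatternSketch {
    private final List<List<String>> consumers = new ArrayList<>();
    private int next = 0; // round-robin cursor used for queue delivery

    // Registers a consumer and returns the inbox that collects its messages.
    List<String> addConsumer() {
        List<String> inbox = new ArrayList<>();
        consumers.add(inbox);
        return inbox;
    }

    // Queue / work queue: each message goes to exactly one consumer.
    // Assumes at least one consumer has been registered.
    void sendToQueue(String message) {
        consumers.get(next).add(message);
        next = (next + 1) % consumers.size();
    }

    // Topic: every consumer receives its own copy of the message.
    void publishToTopic(String message) {
        for (List<String> inbox : consumers) {
            inbox.add(message);
        }
    }

    public static void main(String[] args) {
        PatternSketch bus = new PatternSketch();
        List<String> a = bus.addConsumer();
        List<String> b = bus.addConsumer();
        bus.sendToQueue("task-1");
        bus.sendToQueue("task-2");
        bus.publishToTopic("user.registered");
        System.out.println("consumer A: " + a); // [task-1, user.registered]
        System.out.println("consumer B: " + b); // [task-2, user.registered]
    }
}
```

The two tasks are split between the consumers, while the topic message reaches both.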

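6.2 Spring Cloud Bus

Introduction to Bus

Spring Cloud Bus is the message bus solution in the Spring Cloud ecosystem. It links the nodes of a distributed system through a lightweight message broker (such as RabbitMQ or Kafka) and is used for configuration refresh, service state synchronization, and similar tasks.

Core Features

  1. Configuration refresh: refreshes and synchronizes configuration automatically
  2. Event propagation: supports propagating custom events
  3. Service state synchronization: synchronizes service state information
  4. Cluster management: supports managing cluster nodes

6.3 RabbitMQ Integration

1. Installing and Configuring RabbitMQ

Installing RabbitMQ with Docker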

docker-compose.yml

version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3.11-management
    container_name: rabbitmq
    hostname: rabbitmq
    ports:
      - "5672:5672"    # AMQP port
      - "15672:15672"  # Management UI port
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123
      RABBITMQ_DEFAULT_VHOST: /
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json
    networks:
      - microservice-network
    restart: unless-stopped

volumes:
  rabbitmq_data:

networks:
  microservice-network:
    driver: bridge

RabbitMQ Configuration File

rabbitmq/rabbitmq.conf

# RabbitMQ configuration file

# Network settings
listeners.tcp.default = 5672
management.tcp.port = 15672

# Cluster settings
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq

# Memory and disk limits
vm_memory_high_watermark.relative = 0.6
disk_free_limit.relative = 2.0

# Logging
log.console = true
log.console.level = info
log.file = /var/log/rabbitmq/rabbit.log
log.file.level = info

# Management plugin: definitions imported at startup
management.load_definitions = /etc/rabbitmq/definitions.json

# Heartbeat (seconds)
heartbeat = 60

# Connection settings
num_acceptors.tcp = 10
handshake_timeout = 10000

Predefined Definitions

rabbitmq/definitions.json

{
  "rabbit_version": "3.11.0",
  "rabbitmq_version": "3.11.0",
  "users": [
    {
      "name": "admin",
      "password_hash": "hashed_password",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": "administrator"
    },
    {
      "name": "microservice",
      "password_hash": "hashed_password",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": "management"
    }
  ],
  "vhosts": [
    {
      "name": "/"
    },
    {
      "name": "/microservice"
    }
  ],
  "permissions": [
    {
      "user": "admin",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "microservice",
      "vhost": "/microservice",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "exchanges": [
    {
      "name": "microservice.topic",
      "vhost": "/microservice",
      "type": "topic",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    },
    {
      "name": "microservice.direct",
      "vhost": "/microservice",
      "type": "direct",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "queues": [
    {
      "name": "user.events",
      "vhost": "/microservice",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-message-ttl": 86400000,
        "x-max-length": 10000
      }
    },
    {
      "name": "order.events",
      "vhost": "/microservice",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-message-ttl": 86400000,
        "x-max-length": 10000
      }
    }
  ],
  "bindings": [
    {
      "source": "microservice.topic",
      "vhost": "/microservice",
      "destination": "user.events",
      "destination_type": "queue",
      "routing_key": "user.*",
      "arguments": {}
    },
    {
      "source": "microservice.topic",
      "vhost": "/microservice",
      "destination": "order.events",
      "destination_type": "queue",
      "routing_key": "order.*",
      "arguments": {}
    }
  ]
}
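
The bindings above use topic patterns such as user.* and order.*. In AMQP topic exchanges, routing keys are dot-separated words, * matches exactly one word, and # matches zero or more words. The matcher below is a plain-Java sketch of that rule for illustration only; TopicMatcher is a made-up name and this is not RabbitMQ's implementation.

```java
// Sketch of AMQP topic-exchange matching: '*' = exactly one word, '#' = zero or more words.
class TopicMatcher {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Compares pattern words p[i..] against routing-key words k[j..].
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero words, or absorbs one word and is tried again
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // pattern still has words, key has none left
        }
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("user.*", "user.registered"));      // true
        System.out.println(matches("user.*", "user.profile.updated")); // false
        System.out.println(matches("user.#", "user.profile.updated")); // true
    }
}
```

With the user.* binding, a key with more than two words, such as user.profile.updated, would not reach the user.events queue; a user.# binding would accept it.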

2. Spring Cloud Bus Configuration

Adding Dependencies

pom.xml

<!-- Spring Cloud Bus -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

<!-- Spring Boot AMQP -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- Actuator -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Configuration File

application.yml

spring:
  application:
    name: user-service
  
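  # RabbitMQ settings
  rabbitmq:
    host: localhost
    port: 5672
    username: microservice
    password: microservice123
    virtual-host: /microservice
    
    # Connection settings
    connection-timeout: 15000
    requested-heartbeat: 60
    
    # Publisher confirms and returns
    publisher-confirm-type: correlated
    publisher-returns: true
    
    # Consumer settings
    listener:
      simple:
        # Manual acknowledgment: the listener must ack or nack each message itself
        acknowledge-mode: manual
        # Minimum and maximum number of concurrent consumers
        concurrency: 1
        max-concurrency: 5
        # Prefetch count (unacknowledged messages per consumer)
        prefetch: 1
        # Retry settings
        retry:
          enabled: true
          initial-interval: 1000
          max-attempts: 3
          max-interval: 10000
          multiplier: 2
        # Do not requeue rejected messages; they are dead-lettered if the queue has a dead-letter exchange
        default-requeue-rejected: false
    
    # RabbitTemplate settings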
    template:
      mandatory: true
      receive-timeout: 5000
      reply-timeout: 5000
      retry:
        enabled: true
        initial-interval: 1000
        max-attempts: 3
        max-interval: 10000
        multiplier: 2
  
  # Spring Cloud Bus settings
  cloud:
    bus:
      enabled: true
      refresh:
        enabled: true
      env:
        enabled: true
      trace:
        enabled: true
      destination: springCloudBus
      ack:
        enabled: true
        destination-service: ${spring.application.name}

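# Actuator settings
# Note: bus-refresh and bus-env are the Spring Cloud Bus 2.x endpoint IDs;
# Bus 3.x (Spring Cloud 2020.0 and later) renamed them to busrefresh and busenv.
management:
  endpoints:
    web:
      exposure:
        include: bus-refresh,bus-env,health,info,metrics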
  endpoint:
    health:
      show-details: always
    bus-refresh:
      enabled: true
    bus-env:
      enabled: true

# Logging settings
logging:
  level:
    org.springframework.amqp: DEBUG
    org.springframework.cloud.bus: DEBUG
    com.example: DEBUG
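
The retry properties above (initial-interval, multiplier, max-interval, max-attempts) describe an exponential backoff. The sketch below computes the waits between attempts for given values. It is an illustration in plain Java, not Spring Retry code; BackoffSketch and delays are made-up names, and it assumes that max-attempts counts the first delivery attempt as well as the retries.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of exponential backoff: each wait is the previous one times the multiplier, capped at maxMs.
class BackoffSketch {

    // Returns the waits in milliseconds between attempts.
    // Assumption: maxAttempts includes the first attempt, so there are maxAttempts - 1 waits.
    static List<Long> delays(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        long current = Math.min(initialMs, maxMs);
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(current);
            current = Math.min((long) (current * multiplier), maxMs);
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values from the configuration above: 3 attempts means 2 waits
        System.out.println(delays(1000, 2, 10000, 3)); // [1000, 2000]
        // With more attempts the cap of 10000 ms takes effect
        System.out.println(delays(1000, 2, 10000, 6)); // [1000, 2000, 4000, 8000, 10000]
    }
}
```

Under these assumptions, the configured values give up after roughly three seconds of waiting in total.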

3. Custom Events

User Event Definitions

UserEvent.java

package com.example.userservice.event;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;

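/**
 * Base class for user events.
 * Services that receive custom bus events must register the event package with
 * {@code @RemoteApplicationEventScan} so that the events can be deserialized.
 */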
@Data
@NoArgsConstructor
@AllArgsConstructor
public abstract class UserEvent extends RemoteApplicationEvent {
    
    private Long userId;
    private String username;
    private Long timestamp;
    
    public UserEvent(Object source, String originService, String destinationService,
                    Long userId, String username) {
        super(source, originService, destinationService);
        this.userId = userId;
        this.username = username;
        this.timestamp = System.currentTimeMillis();
    }
}

User Registered Event

UserRegisteredEvent.java

package com.example.userservice.event;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

/**
 * User registered event
 */
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
public class UserRegisteredEvent extends UserEvent {
    
    private String email;
    private String phone;
    
    public UserRegisteredEvent(Object source, String originService, String destinationService,
                              Long userId, String username, String email, String phone) {
        super(source, originService, destinationService, userId, username);
        this.email = email;
        this.phone = phone;
    }
}

User Login Event

UserLoginEvent.java

package com.example.userservice.event;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

/**
 * User login event
 */
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
public class UserLoginEvent extends UserEvent {
    
    private String ipAddress;
    private String userAgent;
    private String loginType;
    
    public UserLoginEvent(Object source, String originService, String destinationService,
                         Long userId, String username, String ipAddress, 
                         String userAgent, String loginType) {
        super(source, originService, destinationService, userId, username);
        this.ipAddress = ipAddress;
        this.userAgent = userAgent;
        this.loginType = loginType;
    }
}

4. Publishing Events

Event Publishing Service

EventPublisher.java

package com.example.userservice.service;

import com.example.userservice.event.UserLoginEvent;
import com.example.userservice.event.UserRegisteredEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.bus.BusProperties;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;

/**
 * Event publishing service
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class EventPublisher {
    
    private final ApplicationEventPublisher applicationEventPublisher;
    private final BusProperties busProperties;
    
    /**
     * Publish a user registered event
     */
    public void publishUserRegisteredEvent(Long userId, String username, 
                                          String email, String phone) {
        UserRegisteredEvent event = new UserRegisteredEvent(
            this,
            busProperties.getId(),
            null, // broadcast to all services
            userId,
            username,
            email,
            phone
        );
        
        log.info("Publishing user registered event: {}", event);
        applicationEventPublisher.publishEvent(event);
    }
    
    /**
     * Publish a user login event
     */
    public void publishUserLoginEvent(Long userId, String username, 
                                     String ipAddress, String userAgent, String loginType) {
        UserLoginEvent event = new UserLoginEvent(
            this,
            busProperties.getId(),
            null, // broadcast to all services
            userId,
            username,
            ipAddress,
            userAgent,
            loginType
        );
        
        log.info("Publishing user login event: {}", event);
        applicationEventPublisher.publishEvent(event);
    }
    
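    /**
     * Publish an event that was built elsewhere.
     * A RemoteApplicationEvent's destination is fixed by its constructor argument,
     * so destinationService is only logged here and does not change the routing.
     */
    public void publishEventToService(Object event, String destinationService) {
        log.info("Publishing event to service {}: {}", destinationService, event);
        applicationEventPublisher.publishEvent(event);
    }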
}

Publishing Events from a Business Service

UserService.java

package com.example.userservice.service;

import com.example.userservice.entity.User;
import com.example.userservice.repository.UserRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.servlet.http.HttpServletRequest;

/**
 * User service
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class UserService {
    
    private final UserRepository userRepository;
    private final PasswordEncoder passwordEncoder;
    private final EventPublisher eventPublisher;
    
    /**
     * User registration
     */
    @Transactional
    public User register(String username, String password, String email, String phone) {
        // Check whether the user already exists
        if (userRepository.existsByUsername(username)) {
            throw new RuntimeException("Username already exists");
        }
        
        if (userRepository.existsByEmail(email)) {
            throw new RuntimeException("Email is already registered");
        }
        
        // Create the user
        User user = new User();
        user.setUsername(username);
        user.setPassword(passwordEncoder.encode(password));
        user.setEmail(email);
        user.setPhone(phone);
        user.setEnabled(true);
        user.setCreateTime(System.currentTimeMillis());
        
        User savedUser = userRepository.save(user);
        
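        // Publish the user registered event. This runs inside the transaction,
        // so the event is sent even if the commit later fails.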
        eventPublisher.publishUserRegisteredEvent(
            savedUser.getId(),
            savedUser.getUsername(),
            savedUser.getEmail(),
            savedUser.getPhone()
        );
        
        log.info("User registered successfully: {}", savedUser.getUsername());
        return savedUser;
    }
    
    /**
     * User login
     */
    public String login(String username, String password, HttpServletRequest request) {
        User user = userRepository.findByUsername(username)
            .orElseThrow(() -> new RuntimeException("User does not exist"));
        
        if (!passwordEncoder.matches(password, user.getPassword())) {
            throw new RuntimeException("Incorrect password");
        }
        
        if (!user.getEnabled()) {
            throw new RuntimeException("User is disabled");
        }
        
        // Generate the JWT token
        String token = generateJwtToken(user);
        
        // Publish the user login event
        eventPublisher.publishUserLoginEvent(
            user.getId(),
            user.getUsername(),
            getClientIpAddress(request),
            request.getHeader("User-Agent"),
            "password"
        );
        
        log.info("User logged in successfully: {}", user.getUsername());
        return token;
    }
    
    private String generateJwtToken(User user) {
        // JWT token generation logic (placeholder)
        return "jwt_token_" + user.getId();
    }
    
    private String getClientIpAddress(HttpServletRequest request) {
        String xForwardedFor = request.getHeader("X-Forwarded-For");
        if (xForwardedFor != null && !xForwardedFor.isEmpty()) {
            return xForwardedFor.split(",")[0].trim();
        }
        
        String xRealIp = request.getHeader("X-Real-IP");
        if (xRealIp != null && !xRealIp.isEmpty()) {
            return xRealIp;
        }
        
        return request.getRemoteAddr();
    }
}

5. Listening for Events

Notification Service Event Listener

NotificationEventListener.java

package com.example.notificationservice.listener;

import com.example.userservice.event.UserLoginEvent;
import com.example.userservice.event.UserRegisteredEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/**
 * Notification service event listener
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationEventListener {
    
    private final EmailService emailService;
    private final SmsService smsService;
    
    /**
     * Listens for user registered events
     */
    @EventListener
    public void handleUserRegisteredEvent(UserRegisteredEvent event) {
        log.info("Received user registered event: {}", event);
        
        try {
            // Send the welcome email
            emailService.sendWelcomeEmail(
                event.getEmail(),
                event.getUsername()
            );
            
            // Send the welcome SMS
            if (event.getPhone() != null && !event.getPhone().isEmpty()) {
                smsService.sendWelcomeSms(
                    event.getPhone(),
                    event.getUsername()
                );
            }
            
            log.info("Welcome notifications sent for user: {}", event.getUsername());
        } catch (Exception e) {
            log.error("Failed to send welcome notifications for user: {}", 
                     event.getUsername(), e);
        }
    }
    
    /**
     * Listens for user login events
     */
    @EventListener
    public void handleUserLoginEvent(UserLoginEvent event) {
        log.info("Received user login event: {}", event);
        
        try {
            // Check whether this is an abnormal login
            if (isAbnormalLogin(event)) {
                // Send a security alert email
                emailService.sendSecurityAlert(
                    event.getUserId(),
                    event.getUsername(),
                    event.getIpAddress(),
                    event.getTimestamp()
                );
                
                log.info("Security alert sent for abnormal login: {}", event.getUsername());
            }
            
            // Record the login
            logLoginActivity(event);
            
        } catch (Exception e) {
            log.error("Failed to process login event for user: {}", 
                     event.getUsername(), e);
        }
    }
    
    private boolean isAbnormalLogin(UserLoginEvent event) {
        // Abnormal login detection logic
        // For example: check the IP address, login time, and device information
        return false;
    }
    
    private void logLoginActivity(UserLoginEvent event) {
        // Record the login activity log
        log.info("Login activity - User: {}, IP: {}, Time: {}", 
                event.getUsername(), event.getIpAddress(), event.getTimestamp());
    }
}

Audit Service Event Listener

AuditEventListener.java

package com.example.auditservice.listener;

import com.example.userservice.event.UserEvent;
import com.example.userservice.event.UserLoginEvent;
import com.example.userservice.event.UserRegisteredEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/**
 * Audit service event listener
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class AuditEventListener {
    
    private final AuditLogService auditLogService;
    
    /**
     * Listens for all user events
     */
    @EventListener
    public void handleUserEvent(UserEvent event) {
        log.info("Received user event for audit: {}", event);
        
        try {
            // Record the audit log
            auditLogService.recordUserEvent(
                event.getUserId(),
                event.getUsername(),
                event.getClass().getSimpleName(),
                event.getTimestamp(),
                event.getOriginService()
            );
            
            log.info("Audit log recorded for user event: {}", event.getClass().getSimpleName());
        } catch (Exception e) {
            log.error("Failed to record audit log for user event", e);
        }
    }
    
    /**
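     * Listens for user registered events (detailed record).
     * UserRegisteredEvent extends UserEvent, so handleUserEvent also runs for the same event.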
     */
    @EventListener
    public void handleUserRegisteredEvent(UserRegisteredEvent event) {
        log.info("Received user registered event for audit: {}", event);
        
        try {
            // Record detailed registration audit information
            auditLogService.recordUserRegistration(
                event.getUserId(),
                event.getUsername(),
                event.getEmail(),
                event.getPhone(),
                event.getTimestamp()
            );
            
            log.info("User registration audit recorded: {}", event.getUsername());
        } catch (Exception e) {
            log.error("Failed to record user registration audit", e);
        }
    }
    
    /**
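     * Listens for user login events (detailed record).
     * UserLoginEvent extends UserEvent, so handleUserEvent also runs for the same event.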
     */
    @EventListener
    public void handleUserLoginEvent(UserLoginEvent event) {
        log.info("Received user login event for audit: {}", event);
        
        try {
            // Record detailed login audit information
            auditLogService.recordUserLogin(
                event.getUserId(),
                event.getUsername(),
                event.getIpAddress(),
                event.getUserAgent(),
                event.getLoginType(),
                event.getTimestamp()
            );
            
            log.info("User login audit recorded: {}", event.getUsername());
        } catch (Exception e) {
            log.error("Failed to record user login audit", e);
        }
    }
}
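
The audit listener declares one handler for the base type UserEvent and further handlers for its subclasses. Spring invokes every @EventListener method whose parameter type is compatible with the published event, so a single UserRegisteredEvent reaches both the general handler and the specific one. The sketch below models that dispatch rule in plain Java; it is a simplified illustration with made-up names (ListenerDispatchSketch, register, dispatch), not Spring's event multicaster.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: an event is delivered to every listener whose declared type it is an instance of.
class ListenerDispatchSketch {
    // Stand-ins for the event hierarchy; simplified and non-abstract for the example.
    static class UserEvent {}
    static class UserRegisteredEvent extends UserEvent {}
    static class UserLoginEvent extends UserEvent {}

    // Listener name -> event type it declares, in registration order.
    private final Map<String, Class<?>> listeners = new LinkedHashMap<>();

    void register(String name, Class<?> eventType) {
        listeners.put(name, eventType);
    }

    // Returns the names of all listeners that would be invoked for the event.
    List<String> dispatch(Object event) {
        List<String> invoked = new ArrayList<>();
        for (Map.Entry<String, Class<?>> entry : listeners.entrySet()) {
            if (entry.getValue().isInstance(event)) {
                invoked.add(entry.getKey());
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        ListenerDispatchSketch bus = new ListenerDispatchSketch();
        bus.register("handleUserEvent", UserEvent.class);
        bus.register("handleUserRegisteredEvent", UserRegisteredEvent.class);
        bus.register("handleUserLoginEvent", UserLoginEvent.class);
        // prints [handleUserEvent, handleUserRegisteredEvent]
        System.out.println(bus.dispatch(new UserRegisteredEvent()));
    }
}
```

If the audit log should hold one record per event, either the general handler or the specific handlers would need to skip the overlap.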

6.4 Message Queues in Practice

1. Order Processing Message Queue

Order Event Definitions

OrderEvent.java

package com.example.orderservice.event;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.List;

/**
 * Base class for order events
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public abstract class OrderEvent implements Serializable {
    
    private Long orderId;
    private Long userId;
    private String orderNo;
    private BigDecimal totalAmount;
    private Long timestamp;
    private String eventType;
    
    public OrderEvent(Long orderId, Long userId, String orderNo, 
                     BigDecimal totalAmount, String eventType) {
        this.orderId = orderId;
        this.userId = userId;
        this.orderNo = orderNo;
        this.totalAmount = totalAmount;
        this.eventType = eventType;
        this.timestamp = System.currentTimeMillis();
    }
}

Order Created Event

OrderCreatedEvent.java

package com.example.orderservice.event;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;
import java.util.List;

/**
 * Order created event
 */
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
public class OrderCreatedEvent extends OrderEvent {
    
    private List<OrderItem> orderItems;
    private String shippingAddress;
    private String paymentMethod;
    
    public OrderCreatedEvent(Long orderId, Long userId, String orderNo, 
                           BigDecimal totalAmount, List<OrderItem> orderItems,
                           String shippingAddress, String paymentMethod) {
        super(orderId, userId, orderNo, totalAmount, "ORDER_CREATED");
        this.orderItems = orderItems;
        this.shippingAddress = shippingAddress;
        this.paymentMethod = paymentMethod;
    }
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class OrderItem {
        private Long productId;
        private String productName;
        private Integer quantity;
        private BigDecimal price;
    }
}

Order Paid Event

OrderPaidEvent.java

package com.example.orderservice.event;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;

/**
 * Order paid event
 */
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
public class OrderPaidEvent extends OrderEvent {
    
    private String paymentId;
    private String paymentMethod;
    private Long paymentTime;
    
    public OrderPaidEvent(Long orderId, Long userId, String orderNo, 
                         BigDecimal totalAmount, String paymentId, 
                         String paymentMethod, Long paymentTime) {
        super(orderId, userId, orderNo, totalAmount, "ORDER_PAID");
        this.paymentId = paymentId;
        this.paymentMethod = paymentMethod;
        this.paymentTime = paymentTime;
    }
}

2. RabbitMQ Configuration

Message Queue Configuration

RabbitMQConfig.java

package com.example.orderservice.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration
 */
@Configuration
public class RabbitMQConfig {
    
    // Exchange names
    public static final String ORDER_EXCHANGE = "order.exchange";
    public static final String DLX_EXCHANGE = "order.dlx.exchange";
    
    // Queue names
    public static final String ORDER_CREATED_QUEUE = "order.created.queue";
    public static final String ORDER_PAID_QUEUE = "order.paid.queue";
    public static final String INVENTORY_QUEUE = "inventory.queue";
    public static final String PAYMENT_QUEUE = "payment.queue";
    public static final String NOTIFICATION_QUEUE = "notification.queue";
    
    // Dead-letter queue
    public static final String ORDER_DLQ = "order.dlq";
    
    // Routing keys
    public static final String ORDER_CREATED_ROUTING_KEY = "order.created";
    public static final String ORDER_PAID_ROUTING_KEY = "order.paid";
    public static final String INVENTORY_ROUTING_KEY = "inventory.update";
    public static final String PAYMENT_ROUTING_KEY = "payment.process";
    public static final String NOTIFICATION_ROUTING_KEY = "notification.send";
    
    /**
     * Message converter
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    /**
     * RabbitTemplate configuration
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(messageConverter());
        // Mandatory: unroutable messages are handed to the returns callback
        template.setMandatory(true);
        // Invoked only when publisher confirms are enabled on the connection factory
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("Message confirmed by broker");
            } else {
                System.out.println("Message not confirmed by broker: " + cause);
            }
        });
        template.setReturnsCallback(returned -> {
            System.out.println("Message returned: " + returned.getMessage());
        });
        return template;
    }
    
    /**
     * Listener container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(5);
        factory.setPrefetchCount(1);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
    
    // ========== Exchange definitions ==========
    
    /**
     * Order topic exchange
     */
    @Bean
    public TopicExchange orderExchange() {
        return ExchangeBuilder
                .topicExchange(ORDER_EXCHANGE)
                .durable(true)
                .build();
    }
    
    /**
     * Dead-letter exchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return ExchangeBuilder
                .directExchange(DLX_EXCHANGE)
                .durable(true)
                .build();
    }
    
    // ========== Queue definitions ==========
    
    /**
     * Order-created queue
     */
    @Bean
    public Queue orderCreatedQueue() {
        return QueueBuilder
                .durable(ORDER_CREATED_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", "order.dead")
                .withArgument("x-message-ttl", 86400000) // 24-hour TTL
                .build();
    }
    
    /**
     * Order-paid queue
     */
    @Bean
    public Queue orderPaidQueue() {
        return QueueBuilder
                .durable(ORDER_PAID_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", "order.dead")
                .withArgument("x-message-ttl", 86400000)
                .build();
    }
    
    /**
     * Inventory queue
     */
    @Bean
    public Queue inventoryQueue() {
        return QueueBuilder
                .durable(INVENTORY_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", "inventory.dead")
                .build();
    }
    
    /**
     * Payment queue
     */
    @Bean
    public Queue paymentQueue() {
        return QueueBuilder
                .durable(PAYMENT_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", "payment.dead")
                .build();
    }
    
    /**
     * Notification queue
     */
    @Bean
    public Queue notificationQueue() {
        return QueueBuilder
                .durable(NOTIFICATION_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", "notification.dead")
                .build();
    }
    
    /**
     * Dead-letter queue
     */
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder
                .durable(ORDER_DLQ)
                .build();
    }
    
    // ========== Binding definitions ==========
    
    /**
     * Order-created queue binding
     */
    @Bean
    public Binding orderCreatedBinding() {
        return BindingBuilder
                .bind(orderCreatedQueue())
                .to(orderExchange())
                .with(ORDER_CREATED_ROUTING_KEY);
    }
    
    /**
     * Order-paid queue binding
     */
    @Bean
    public Binding orderPaidBinding() {
        return BindingBuilder
                .bind(orderPaidQueue())
                .to(orderExchange())
                .with(ORDER_PAID_ROUTING_KEY);
    }
    
    /**
     * Inventory queue binding
     */
    @Bean
    public Binding inventoryBinding() {
        return BindingBuilder
                .bind(inventoryQueue())
                .to(orderExchange())
                .with(INVENTORY_ROUTING_KEY);
    }
    
    /**
     * Payment queue binding
     */
    @Bean
    public Binding paymentBinding() {
        return BindingBuilder
                .bind(paymentQueue())
                .to(orderExchange())
                .with(PAYMENT_ROUTING_KEY);
    }
    
    /**
     * Notification queue binding
     */
    @Bean
    public Binding notificationBinding() {
        return BindingBuilder
                .bind(notificationQueue())
                .to(orderExchange())
                .with(NOTIFICATION_ROUTING_KEY);
    }
    
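    /**
     * Dead-letter queue bindings. The dead-letter exchange is a direct exchange, so each
     * dead-letter routing key used by the queues above needs its own binding;
     * dead letters with an unbound key would be dropped.
     */
    @Bean
    public Declarables deadLetterBindings() {
        return new Declarables(
                BindingBuilder.bind(deadLetterQueue()).to(dlxExchange()).with("order.dead"),
                BindingBuilder.bind(deadLetterQueue()).to(dlxExchange()).with("inventory.dead"),
                BindingBuilder.bind(deadLetterQueue()).to(dlxExchange()).with("payment.dead"),
                BindingBuilder.bind(deadLetterQueue()).to(dlxExchange()).with("notification.dead"));
    }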
}
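
Whether a dead-lettered message reaches a queue depends on the bindings of the dead-letter exchange. The following is a minimal in-memory sketch of that lookup, assuming the dead-letter exchange is a direct exchange (exact routing-key match). The class `DirectExchangeSketch` is hypothetical and does not talk to RabbitMQ; it only illustrates the matching rule.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a direct exchange: routing key -> bound queues.
final class DirectExchangeSketch {
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Registers a binding between a queue and an exact routing key.
    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns the queues that would receive the message; an empty list means unroutable.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }
}
```

With only `order.dead` bound, a message dead-lettered under another key such as `notification.dead` matches nothing, so each dead-letter routing key in use needs its own binding.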

3. Sending Messages

Order message sender service

OrderMessageSender.java

package com.example.orderservice.service;

import com.example.orderservice.config.RabbitMQConfig;
import com.example.orderservice.event.OrderCreatedEvent;
import com.example.orderservice.event.OrderPaidEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * Order message sender service
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderMessageSender {
    
    private final RabbitTemplate rabbitTemplate;
    
    /**
     * Sends the order-created message
     */
    public void sendOrderCreatedMessage(OrderCreatedEvent event) {
        try {
            log.info("Sending order created message: {}", event.getOrderNo());
            
            rabbitTemplate.convertAndSend(
                RabbitMQConfig.ORDER_EXCHANGE,
                RabbitMQConfig.ORDER_CREATED_ROUTING_KEY,
                event
            );
            
            log.info("Order created message sent successfully: {}", event.getOrderNo());
        } catch (Exception e) {
            log.error("Failed to send order created message: {}", event.getOrderNo(), e);
            throw new RuntimeException("Failed to send message", e);
        }
    }
    
    /**
     * Sends the order-paid message
     */
    public void sendOrderPaidMessage(OrderPaidEvent event) {
        try {
            log.info("Sending order paid message: {}", event.getOrderNo());
            
            rabbitTemplate.convertAndSend(
                RabbitMQConfig.ORDER_EXCHANGE,
                RabbitMQConfig.ORDER_PAID_ROUTING_KEY,
                event
            );
            
            log.info("Order paid message sent successfully: {}", event.getOrderNo());
        } catch (Exception e) {
            log.error("Failed to send order paid message: {}", event.getOrderNo(), e);
            throw new RuntimeException("Failed to send message", e);
        }
    }
    
    /**
     * Sends an inventory update message (decrease or increase)
     */
    public void sendInventoryUpdateMessage(Long productId, Integer quantity, String operation) {
        try {
            InventoryUpdateMessage message = new InventoryUpdateMessage(
                productId, quantity, operation, System.currentTimeMillis()
            );
            
            log.info("Sending inventory update message: {}", message);
            
            rabbitTemplate.convertAndSend(
                RabbitMQConfig.ORDER_EXCHANGE,
                RabbitMQConfig.INVENTORY_ROUTING_KEY,
                message
            );
            
            log.info("Inventory update message sent successfully");
        } catch (Exception e) {
            log.error("Failed to send inventory update message", e);
            throw new RuntimeException("Failed to send inventory message", e);
        }
    }
    
    /**
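     * Inventory update message.
     * Lombok generates the constructors, getters, setters and toString() that the
     * original listing left out; the no-args constructor is for JSON deserialization.
     */
    @lombok.Data
    @lombok.NoArgsConstructor
    @lombok.AllArgsConstructor
    public static class InventoryUpdateMessage {
        private Long productId;
        private Integer quantity;
        private String operation; // DECREASE or INCREASE
        private Long timestamp;
    }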
}

4. Receiving Messages

Inventory service message listener

InventoryMessageListener.java

package com.example.inventoryservice.listener;

import com.example.orderservice.event.OrderCreatedEvent;
import com.example.orderservice.service.OrderMessageSender.InventoryUpdateMessage;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Inventory service message listener
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class InventoryMessageListener {
    
    private final InventoryService inventoryService;
    
    /**
     * Listens for order-created messages and decreases inventory
     */
    @RabbitListener(queues = "order.created.queue")
    public void handleOrderCreated(OrderCreatedEvent event, Message message, Channel channel) {
        log.info("Received order created message: {}", event.getOrderNo());
        
        try {
            // Decrease inventory for each order item
            for (OrderCreatedEvent.OrderItem item : event.getOrderItems()) {
                inventoryService.decreaseInventory(
                    item.getProductId(),
                    item.getQuantity()
                );
            }
            
            log.info("Inventory decreased for order: {}", event.getOrderNo());
            
            // Acknowledge the message manually
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            
        } catch (Exception e) {
            log.error("Failed to decrease inventory for order: {}", event.getOrderNo(), e);
            
            try {
                // Reject without requeueing; the message is dead-lettered if the queue has a dead-letter exchange
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } catch (IOException ioException) {
                log.error("Failed to nack message", ioException);
            }
        }
    }
    
    /**
     * Listens for inventory update messages
     */
    @RabbitListener(queues = "inventory.queue")
    public void handleInventoryUpdate(InventoryUpdateMessage message, Message msg, Channel channel) {
        log.info("Received inventory update message: {}", message);
        
        try {
            if ("DECREASE".equals(message.getOperation())) {
                inventoryService.decreaseInventory(
                    message.getProductId(),
                    message.getQuantity()
                );
            } else if ("INCREASE".equals(message.getOperation())) {
                inventoryService.increaseInventory(
                    message.getProductId(),
                    message.getQuantity()
                );
            }
            
            log.info("Inventory updated successfully");
            
            // Acknowledge the message manually
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
            
        } catch (Exception e) {
            log.error("Failed to update inventory", e);
            
            try {
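                // Retry by requeueing. Note: with no attempt limit, a message that
                // always fails is redelivered indefinitely.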
                channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
            } catch (IOException ioException) {
                log.error("Failed to nack message", ioException);
            }
        }
    }
}
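
Requeueing on every failure retries forever when a message can never succeed. One way to bound this is to track the attempt count and dead-letter the message once a limit is reached. The sketch below assumes the application carries the count in a header named `x-retry-count`; that header name, the `RetryDecision` class and the limit are hypothetical, and the code only shows the decision, not the republishing.

```java
import java.util.Map;

// Decides whether a failed message should be retried or dead-lettered.
final class RetryDecision {
    enum Action { RETRY, DEAD_LETTER }

    // Hypothetical application-defined header; RabbitMQ does not set it.
    static final String RETRY_HEADER = "x-retry-count";

    static Action decide(Map<String, Object> headers, int maxRetries) {
        Object raw = headers.get(RETRY_HEADER);
        // A missing or non-numeric header is treated as "no attempts so far".
        long attempts = (raw instanceof Number) ? ((Number) raw).longValue() : 0L;
        return attempts < maxRetries ? Action.RETRY : Action.DEAD_LETTER;
    }
}
```

A listener could call `decide(...)` in its catch block and use the result to choose between republishing with an incremented count and `basicNack(tag, false, false)`.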

Payment service message listener

PaymentMessageListener.java

package com.example.paymentservice.listener;

import com.example.orderservice.event.OrderCreatedEvent;
import com.example.orderservice.event.OrderPaidEvent;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Payment service message listener
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class PaymentMessageListener {
    
    private final PaymentService paymentService;
    
    /**
     * Listens for order-created messages and creates a payment record
     */
    @RabbitListener(queues = "payment.queue")
    public void handleOrderCreated(OrderCreatedEvent event, Message message, Channel channel) {
        log.info("Received order created message for payment: {}", event.getOrderNo());
        
        try {
            // Create the payment record
            paymentService.createPaymentRecord(
                event.getOrderId(),
                event.getOrderNo(),
                event.getTotalAmount(),
                event.getPaymentMethod()
            );
            
            log.info("Payment record created for order: {}", event.getOrderNo());
            
            // Acknowledge the message manually
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            
        } catch (Exception e) {
            log.error("Failed to create payment record for order: {}", event.getOrderNo(), e);
            
            try {
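                // Retry by requeueing. Note: with no attempt limit, a message that
                // always fails is redelivered indefinitely.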
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            } catch (IOException ioException) {
                log.error("Failed to nack message", ioException);
            }
        }
    }
    
    /**
     * Listens for order-paid messages and updates the payment status
     */
    @RabbitListener(queues = "order.paid.queue")
    public void handleOrderPaid(OrderPaidEvent event, Message message, Channel channel) {
        log.info("Received order paid message: {}", event.getOrderNo());
        
        try {
            // Update the payment status
            paymentService.updatePaymentStatus(
                event.getPaymentId(),
                "PAID",
                event.getPaymentTime()
            );
            
            log.info("Payment status updated for order: {}", event.getOrderNo());
            
            // Acknowledge the message manually
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            
        } catch (Exception e) {
            log.error("Failed to update payment status for order: {}", event.getOrderNo(), e);
            
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            } catch (IOException ioException) {
                log.error("Failed to nack message", ioException);
            }
        }
    }
}

6.5 Message Reliability Guarantees

1. Publisher Confirms

Publisher confirm configuration

MessageReliabilityConfig.java

package com.example.orderservice.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

/**
 * Message reliability configuration
 */
@Configuration
@Slf4j
public class MessageReliabilityConfig {
    
    /**
     * RabbitTemplate configured for reliable publishing
     */
    @Bean
    public RabbitTemplate reliableRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        
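        // Publisher-confirm callback. It only fires when confirms are enabled on the
        // connection factory (e.g. spring.rabbitmq.publisher-confirm-type=correlated).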
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("Message sent successfully, correlationData: {}", correlationData);
            } else {
                log.error("Message send failed, correlationData: {}, cause: {}", 
                         correlationData, cause);
                // Resend logic could be implemented here
                handleSendFailure(correlationData, cause);
            }
        });
        
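        // Returns callback: invoked when a mandatory message cannot be routed to any
        // queue (requires publisher returns, e.g. spring.rabbitmq.publisher-returns=true)
        template.setReturnsCallback(returned -> {
            log.error("Message returned: {}, replyCode: {}, replyText: {}", 
                     returned.getMessage(), returned.getReplyCode(), returned.getReplyText());
            // Handle the returned message
            handleMessageReturned(returned);
        });
        
        // Mandatory: unroutable messages are returned instead of silently dropped
        template.setMandatory(true);
        
        // Set the retry template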
        template.setRetryTemplate(retryTemplate());
        
        return template;
    }
    
    /**
     * Retry template
     */
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        
        // Retry policy: at most 3 attempts, including the first one
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);
        
        // Backoff policy: exponential, starting at 1 s and capped at 10 s
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2);
        backOffPolicy.setMaxInterval(10000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        
        return retryTemplate;
    }
    
    private void handleSendFailure(Object correlationData, String cause) {
        // Handle the send failure here,
        // e.g. record it in the database and wait for a resend
        log.error("Handling send failure for correlationData: {}", correlationData);
    }
    
    private void handleMessageReturned(org.springframework.amqp.core.ReturnedMessage returned) {
        // Handle the returned message here,
        // e.g. record it in the database for manual handling
        log.error("Handling returned message: {}", returned.getMessage());
    }
}
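
To make the retry settings concrete, the waits between attempts can be computed by hand. The sketch below is plain Java, not Spring Retry itself; `BackoffSchedule` is a hypothetical helper that assumes the usual exponential rule (wait the initial interval first, multiply after each wait, cap at the maximum) and that the attempt limit counts the first try.

```java
import java.util.ArrayList;
import java.util.List;

// Computes the sequence of waits (in milliseconds) between retry attempts.
final class BackoffSchedule {
    static List<Long> waits(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        List<Long> result = new ArrayList<>();
        long interval = initialMs;
        // There is one wait between consecutive attempts, so maxAttempts - 1 waits in total.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add(Math.min(interval, maxMs));
            interval = (long) Math.min(interval * multiplier, (double) maxMs);
        }
        return result;
    }
}
```

With the values from the configuration (3 attempts, 1000 ms initial, multiplier 2, 10000 ms cap) this yields waits of 1000 ms and 2000 ms; the cap only matters with more attempts.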

2. Message Persistence

Sending persistent messages

ReliableMessageSender.java

package com.example.orderservice.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.UUID;

/**
 * Reliable message sender service
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class ReliableMessageSender {
    
    private final RabbitTemplate rabbitTemplate;
    private final MessageLogService messageLogService;
    
    /**
     * Sends a message reliably
     */
    @Transactional
    public void sendReliableMessage(String exchange, String routingKey, Object message) {
        // Generate the message ID
        String messageId = UUID.randomUUID().toString();
        
        // Record the message in the log
        messageLogService.saveMessageLog(messageId, exchange, routingKey, message, "SENDING");
        
        try {
            // Correlation data ties the broker's confirm back to this message
            CorrelationData correlationData = new CorrelationData(messageId);
            
            // Send the message; the post-processor applies the message ID and the
            // persistent delivery mode to the properties of the outgoing message
            rabbitTemplate.convertAndSend(exchange, routingKey, message, m -> {
                m.getMessageProperties().setMessageId(messageId);
                m.getMessageProperties().setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE); // PERSISTENT
                return m;
            }, correlationData);
            
            log.info("Reliable message sent, messageId: {}", messageId);
            
        } catch (Exception e) {
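            // Mark the message as failed. Note: this update joins the surrounding
            // transaction, which rolls back when the exception is rethrown below;
            // run it in a separate transaction if the FAILED status must persist.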
            messageLogService.updateMessageStatus(messageId, "FAILED");
            log.error("Failed to send reliable message, messageId: {}", messageId, e);
            throw e;
        }
    }
    
    /**
     * Handles a publisher confirm
     */
    public void handleConfirm(CorrelationData correlationData, boolean ack, String cause) {
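        // Confirms for messages sent without correlation data carry no ID
        if (correlationData == null) {
            log.warn("Confirm received without correlation data, ack: {}", ack);
            return;
        }
        String messageId = correlationData.getId();
        
        if (ack) {
            // Mark the message as successfully sent
            messageLogService.updateMessageStatus(messageId, "SUCCESS");
            log.info("Message confirmed successfully, messageId: {}", messageId);
        } else {
            // Mark the message as failed
            messageLogService.updateMessageStatus(messageId, "FAILED");
            log.error("Message confirm failed, messageId: {}, cause: {}", messageId, cause);
            
            // Resend logic could be implemented here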
            scheduleRetry(messageId);
        }
    }
    
    private void scheduleRetry(String messageId) {
        // Implement the resend scheduling logic here
        log.info("Scheduling retry for message: {}", messageId);
    }
}

Message log service

MessageLogService.java

package com.example.orderservice.service;

import com.example.orderservice.entity.MessageLog;
import com.example.orderservice.repository.MessageLogRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

/**
 * Message log service
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageLogService {
    
    private final MessageLogRepository messageLogRepository;
    private final ObjectMapper objectMapper;
    
    /**
     * Saves a message log entry
     */
    @Transactional
    public void saveMessageLog(String messageId, String exchange, String routingKey, 
                              Object message, String status) {
        try {
            MessageLog messageLog = new MessageLog();
            messageLog.setMessageId(messageId);
            messageLog.setExchange(exchange);
            messageLog.setRoutingKey(routingKey);
            messageLog.setMessageContent(objectMapper.writeValueAsString(message));
            messageLog.setStatus(status);
            messageLog.setCreateTime(System.currentTimeMillis());
            messageLog.setRetryCount(0);
            
            messageLogRepository.save(messageLog);
            
            log.info("Message log saved, messageId: {}", messageId);
        } catch (Exception e) {
            log.error("Failed to save message log, messageId: {}", messageId, e);
        }
    }
    
    /**
     * Updates the message status
     */
    @Transactional
    public void updateMessageStatus(String messageId, String status) {
        try {
            MessageLog messageLog = messageLogRepository.findByMessageId(messageId);
            if (messageLog != null) {
                messageLog.setStatus(status);
                messageLog.setUpdateTime(System.currentTimeMillis());
                messageLogRepository.save(messageLog);
                
                log.info("Message status updated, messageId: {}, status: {}", messageId, status);
            }
        } catch (Exception e) {
            log.error("Failed to update message status, messageId: {}", messageId, e);
        }
    }
    
    /**
     * Returns failed messages that have been retried fewer than 3 times
     */
    public List<MessageLog> getFailedMessages() {
        return messageLogRepository.findByStatusAndRetryCountLessThan("FAILED", 3);
    }
    
    /**
     * Increments the retry count
     */
    @Transactional
    public void incrementRetryCount(String messageId) {
        MessageLog messageLog = messageLogRepository.findByMessageId(messageId);
        if (messageLog != null) {
            messageLog.setRetryCount(messageLog.getRetryCount() + 1);
            messageLog.setUpdateTime(System.currentTimeMillis());
            messageLogRepository.save(messageLog);
        }
    }
}
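
The message log only pays off if something periodically picks up failed entries and sends them again. The following is a rough in-memory sketch of such a pass. `ResendSketch`, `LogEntry` and the `publisher` callback are hypothetical stand-ins for the repository and the RabbitTemplate; scheduling and persistence are left out.

```java
import java.util.List;
import java.util.function.Predicate;

// One resend pass over message-log entries, using in-memory stand-ins.
final class ResendSketch {
    static final int MAX_RETRIES = 3; // same limit as the retry-count filter in the log service

    // Hypothetical in-memory counterpart of a message-log row.
    static final class LogEntry {
        final String messageId;
        String status;
        int retryCount;

        LogEntry(String messageId, String status, int retryCount) {
            this.messageId = messageId;
            this.status = status;
            this.retryCount = retryCount;
        }
    }

    // Resends every FAILED entry below the retry limit; returns how many were handed to the publisher successfully.
    static int resendFailed(List<LogEntry> entries, Predicate<LogEntry> publisher) {
        int resent = 0;
        for (LogEntry entry : entries) {
            boolean eligible = "FAILED".equals(entry.status) && entry.retryCount < MAX_RETRIES;
            if (!eligible) {
                continue;
            }
            entry.retryCount++;            // count the attempt before publishing
            if (publisher.test(entry)) {
                entry.status = "SENDING";  // back to waiting for the broker's confirm
                resent++;
            }
        }
        return resent;
    }
}
```

Entries that reach the limit stay in FAILED and are natural candidates for an alert or manual handling.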

3. Consumer Idempotency

Idempotent processing

IdempotentMessageProcessor.java

package com.example.inventoryservice.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

/**
 * Idempotent message processor
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class IdempotentMessageProcessor {
    
    private final StringRedisTemplate redisTemplate;
    private static final String IDEMPOTENT_KEY_PREFIX = "msg:idempotent:";
    private static final long IDEMPOTENT_EXPIRE_TIME = 24; // TTL in hours
    
    /**
     * Checks whether the message has already been processed
     */
    public boolean isMessageProcessed(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }
    
    /**
     * Marks the message as processed
     */
    public void markMessageProcessed(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        redisTemplate.opsForValue().set(key, "processed", IDEMPOTENT_EXPIRE_TIME, TimeUnit.HOURS);
        log.info("Message marked as processed: {}", messageId);
    }
    
    /**
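     * Idempotent processing wrapper.
     * Claims the message ID atomically (SET NX with a TTL) so that two consumers
     * receiving the same message cannot both pass the duplicate check.
     */
    public <T> boolean processIdempotent(String messageId, T message, 
                                        MessageProcessor<T> processor) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        
        // Atomically claim the message; a failed claim means it was already seen
        Boolean claimed = redisTemplate.opsForValue()
                .setIfAbsent(key, "processed", IDEMPOTENT_EXPIRE_TIME, TimeUnit.HOURS);
        if (!Boolean.TRUE.equals(claimed)) {
            log.info("Message already processed, skipping: {}", messageId);
            return true;
        }
        
        try {
            // Process the message
            processor.process(message);
            
            log.info("Message processed successfully: {}", messageId);
            return true;
            
        } catch (Exception e) {
            // Release the claim so that a redelivery can be processed
            redisTemplate.delete(key);
            log.error("Failed to process message: {}", messageId, e);
            return false;
        }
    }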
    
    /**
     * Message processor interface
     */
    @FunctionalInterface
    public interface MessageProcessor<T> {
        void process(T message) throws Exception;
    }
}
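
When the duplicate check and the "processed" marker are two separate steps, two consumers that receive the same message at the same time can both pass the check. An atomic claim closes that gap. The sketch below shows the idea with a `ConcurrentHashMap` standing in for Redis (`putIfAbsent` plays the role of SET NX); `IdempotentClaim` is a hypothetical class, and expiry of old keys is not modelled.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Atomic claim-then-process, with an in-memory map standing in for Redis.
final class IdempotentClaim {
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Runs the handler at most once per message ID; returns false for duplicates.
    boolean process(String messageId, Runnable handler) {
        // putIfAbsent is atomic: only one caller per ID sees null here.
        if (store.putIfAbsent(messageId, "processing") != null) {
            return false;
        }
        try {
            handler.run();
            store.put(messageId, "processed");
            return true;
        } catch (RuntimeException e) {
            // Release the claim so that a redelivery can try again.
            store.remove(messageId);
            throw e;
        }
    }
}
```

The claim is released on failure so that a redelivered message is not mistaken for a duplicate.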

6.6 Dead-Letter Queue Handling

1. Dead-Letter Queue Configuration

Dead-letter queue handler

DeadLetterQueueHandler.java

package com.example.orderservice.service;

import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Dead-letter queue handler
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class DeadLetterQueueHandler {
    
    private final DeadLetterService deadLetterService;
    private final NotificationService notificationService;
    
    /**
     * Handles dead-lettered messages
     */
    @RabbitListener(queues = "order.dlq")
    public void handleDeadLetter(Message message, Channel channel) {
        log.error("Received dead letter message: {}", 
                 new String(message.getBody(), StandardCharsets.UTF_8));
        
        try {
            // Record the dead-lettered message
            deadLetterService.recordDeadLetter(
                message.getMessageProperties().getMessageId(),
                new String(message.getBody(), StandardCharsets.UTF_8),
                message.getMessageProperties().getReceivedExchange(),
                message.getMessageProperties().getReceivedRoutingKey(),
                getFailureReason(message)
            );
            
            // Send an alert notification
            notificationService.sendDeadLetterAlert(
                message.getMessageProperties().getMessageId(),
                getFailureReason(message)
            );
            
            // Acknowledge the message
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            
            log.info("Dead letter message processed: {}", 
                    message.getMessageProperties().getMessageId());
            
        } catch (Exception e) {
            log.error("Failed to process dead letter message", e);
            
            try {
                // Reject without requeueing to avoid an endless redelivery loop
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } catch (IOException ioException) {
                log.error("Failed to nack dead letter message", ioException);
            }
        }
    }
    
    private String getFailureReason(Message message) {
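        // RabbitMQ records why a message was first dead-lettered in the
        // "x-first-death-reason" header (e.g. rejected, expired, maxlen)
        Object reason = message.getMessageProperties().getHeaders().get("x-first-death-reason");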
        return reason != null ? reason.toString() : "Unknown";
    }
}
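
RabbitMQ annotates dead-lettered messages with an `x-death` header (a list of entries with fields such as `queue`, `reason` and `count`) and with `x-first-death-reason`. A small sketch of reading them from a plain header map follows; `DeathHeaders` is a hypothetical helper, and the maps stand in for what `getMessageProperties().getHeaders()` would return.

```java
import java.util.List;
import java.util.Map;

// Reads dead-letter metadata from a message's header map.
final class DeathHeaders {
    // Reason of the first dead-lettering event, or "Unknown" when the header is absent.
    static String firstDeathReason(Map<String, Object> headers) {
        Object reason = headers.get("x-first-death-reason");
        return reason != null ? reason.toString() : "Unknown";
    }

    // Sums the "count" fields of all x-death entries recorded for the given queue.
    static long deathCount(Map<String, Object> headers, String queue) {
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List)) {
            return 0L;
        }
        long total = 0L;
        for (Object entry : (List<?>) xDeath) {
            if (!(entry instanceof Map)) {
                continue;
            }
            Map<?, ?> death = (Map<?, ?>) entry;
            Object count = death.get("count");
            if (queue.equals(death.get("queue")) && count instanceof Number) {
                total += ((Number) count).longValue();
            }
        }
        return total;
    }
}
```

The death count is one way to decide whether a dead letter is worth resending automatically or should go straight to manual handling.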

2. Dead-Letter Service

DeadLetterService.java

package com.example.orderservice.service;

import com.example.orderservice.entity.DeadLetterRecord;
import com.example.orderservice.repository.DeadLetterRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

/**
 * Dead-letter service
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class DeadLetterService {
    
    private final DeadLetterRepository deadLetterRepository;
    
    /**
     * Records a dead-lettered message
     */
    @Transactional
    public void recordDeadLetter(String messageId, String messageContent, 
                                String exchange, String routingKey, String reason) {
        DeadLetterRecord record = new DeadLetterRecord();
        record.setMessageId(messageId);
        record.setMessageContent(messageContent);
        record.setExchange(exchange);
        record.setRoutingKey(routingKey);
        record.setFailureReason(reason);
        record.setStatus("PENDING");
        record.setCreateTime(System.currentTimeMillis());
        
        deadLetterRepository.save(record);
        
        log.info("Dead letter record saved: {}", messageId);
    }
    
    /**
     * Returns dead-lettered messages that are still pending
     */
    public List<DeadLetterRecord> getPendingDeadLetters() {
        return deadLetterRepository.findByStatus("PENDING");
    }
    
    /**
     * Resends a dead-lettered message
     */
    @Transactional
    public void resendDeadLetter(Long recordId) {
        DeadLetterRecord record = deadLetterRepository.findById(recordId)
            .orElseThrow(() -> new RuntimeException("Dead-letter record not found"));
        
        try {
            // Resend logic
            // ...
            
            record.setStatus("RESENT");
            record.setUpdateTime(System.currentTimeMillis());
            deadLetterRepository.save(record);
            
            log.info("Dead letter message resent: {}", record.getMessageId());
            
        } catch (Exception e) {
            record.setStatus("FAILED");
            record.setUpdateTime(System.currentTimeMillis());
            deadLetterRepository.save(record);
            
            log.error("Failed to resend dead letter message: {}", record.getMessageId(), e);
            throw e;
        }
    }
}

6.7 Message Monitoring

1. Message Monitoring Configuration

MessageMonitoringConfig.java

package com.example.orderservice.config;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Message monitoring configuration
 */
@Configuration
@RequiredArgsConstructor
public class MessageMonitoringConfig {
    
    private final MeterRegistry meterRegistry;
    
    /**
     * Listener container factory with monitoring
     */
    @Bean
    public RabbitListenerContainerFactory<?> monitoringRabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        
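        // Start a timer sample when a message is received. The listener has to stop
        // the sample after processing, otherwise no duration is recorded.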
        factory.setAfterReceivePostProcessors(message -> {
            Timer.Sample sample = Timer.start(meterRegistry);
            message.getMessageProperties().getHeaders().put("timer.sample", sample);
            return message;
        });
        
        return factory;
    }
}

2. Collecting Message Metrics

MessageMetricsCollector.java

package com.example.orderservice.service;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

/**
 * Message metrics collector
 */
@Service
@RequiredArgsConstructor
public class MessageMetricsCollector {
    
    private final MeterRegistry meterRegistry;
    
    private Counter messagesSentCounter;
    private Counter messagesReceivedCounter;
    private Counter messagesFailedCounter;
    private Timer messageProcessingTimer;
    
    @PostConstruct
    public void init() {
        messagesSentCounter = Counter.builder("messages.sent")
            .description("Number of messages sent")
            .register(meterRegistry);
        
        messagesReceivedCounter = Counter.builder("messages.received")
            .description("Number of messages received")
            .register(meterRegistry);
        
        messagesFailedCounter = Counter.builder("messages.failed")
            .description("Number of failed messages")
            .register(meterRegistry);
        
        messageProcessingTimer = Timer.builder("message.processing.time")
            .description("Message processing time")
            .register(meterRegistry);
    }
    
    public void incrementMessagesSent() {
        messagesSentCounter.increment();
    }
    
    public void incrementMessagesReceived() {
        messagesReceivedCounter.increment();
    }
    
    public void incrementMessagesFailed() {
        messagesFailedCounter.increment();
    }
    
    public Timer.Sample startProcessingTimer() {
        return Timer.start(meterRegistry);
    }
    
    public void recordProcessingTime(Timer.Sample sample) {
        sample.stop(messageProcessingTimer);
    }
}
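
The collector is only useful when every handler goes through the same count-and-time wrapper. The sketch below shows that pattern in plain Java: `AtomicLong` fields stand in for the Micrometer counters and timer, and `HandlerMetricsSketch` is a hypothetical class. In the real service the equivalent calls would be `incrementMessagesReceived()`, `startProcessingTimer()`, `recordProcessingTime(...)` and `incrementMessagesFailed()`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Wraps a message handler with received/failed counters and a duration total.
final class HandlerMetricsSketch {
    final AtomicLong received = new AtomicLong();
    final AtomicLong failed = new AtomicLong();
    final AtomicLong totalNanos = new AtomicLong();

    void handle(Runnable handler) {
        received.incrementAndGet();
        long start = System.nanoTime();
        try {
            handler.run();
        } catch (RuntimeException e) {
            failed.incrementAndGet();
            throw e; // let the listener decide between ack and nack
        } finally {
            // Record the duration for successful and failed runs alike.
            totalNanos.addAndGet(System.nanoTime() - start);
        }
    }
}
```

Stopping the timer in `finally` keeps failed messages in the latency figures, which is usually what an alert on slow processing needs.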

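6.8 Summary

Core Concepts

  1. Role of the message bus

    • Asynchronous communication: improves responsiveness and throughput
    • Service decoupling: reduces coupling between services
    • Event-driven: supports event-driven architecture
    • Message routing: routes and distributes messages by exchange type and routing key
  2. Spring Cloud Bus features

    • Configuration refresh: propagates configuration refreshes to all instances
    • Event propagation: supports broadcasting custom events
    • Service state synchronization: keeps service state information in sync
    • Cluster management: supports managing cluster nodes
  3. Message reliability guarantees

    • Publisher confirms: ensure the broker has accepted the message
    • Message persistence: persistent messages on durable queues survive a broker restart
    • Consumer acknowledgements: a message is removed only after the consumer has handled it
    • Idempotent processing: avoids processing the same message twice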

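Best Practices

  1. Message design

    • Keep the message structure clear and include the necessary business data
    • Keep messages reasonably small; avoid oversized payloads
    • Version messages and keep them backward compatible
    • Design messages so that they can be processed idempotently
  2. Reliability

    • Enable publisher confirms and consumer acknowledgements
    • Implement a message retry mechanism with a bounded number of attempts
    • Use dead-letter queues for messages that fail
    • Log message processing
  3. Performance

    • Set consumer concurrency to match the workload
    • Use batching to improve throughput
    • Monitor queue length and processing rate
    • Choose an efficient message serialization format
  4. Monitoring and operations

    • Monitor send and consume metrics
    • Define alerting rules
    • Clean up expired messages regularly
    • Back up important message data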

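Caveats

  1. Message ordering

    • RabbitMQ does not guarantee global message ordering
    • When order matters, use a single queue with a single consumer
    • Consider partitioning messages by key
  2. Duplicate messages

    • Network failures can cause messages to be delivered more than once
    • Implement idempotent processing
    • Use a unique message ID
  3. Resource management

    • Size connection pools appropriately
    • Close unused connections promptly
    • Monitor memory and CPU usage
  4. Security

    • Encrypt traffic with SSL/TLS
    • Configure access control
    • Store sensitive data encrypted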
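
The partitioning idea from the ordering caveat can be sketched as follows: messages that share a key (for example an order number) always go to the same queue, so their relative order is preserved while different keys are spread over several queues. `QueuePartitioner` and the `order.events.N` queue naming are hypothetical.

```java
// Maps a business key to one of a fixed number of queues.
final class QueuePartitioner {
    // Returns a stable partition index in [0, partitions) for the given key.
    static int partitionFor(String key, int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), partitions);
    }

    // Hypothetical queue naming scheme: one queue per partition.
    static String queueFor(String key, int partitions) {
        return "order.events." + partitionFor(key, partitions);
    }
}
```

Each partition queue still needs a single consumer for ordering to hold, and changing the partition count remaps keys to different queues.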

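Where to Go Next

  1. Advanced features

    • Rule engines for message routing
    • Message transformation and filtering
    • Message aggregation and distribution
    • Message stream processing
  2. Integration options

    • Apache Kafka
    • Apache Pulsar
    • Redis Streams
    • Cloud messaging services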

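After this chapter you should be familiar with the core concepts of Spring Cloud Bus and RabbitMQ and with how to apply them in practice. The message bus is a key component for asynchronous communication and event-driven design in a microservice architecture; used well, it improves the scalability and reliability of the system.

The next chapter covers circuit breaking and degradation: how to isolate faults and protect the system in a microservice architecture.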