6.1 消息总线概述
什么是消息总线
消息总线(Message Bus)是微服务架构中用于实现服务间异步通信的基础设施。它提供了发布-订阅模式的消息传递机制,使得服务之间可以通过消息进行解耦的通信。
消息总线的作用
┌─────────────────────────────────────────────────────────────────┐
│ 消息总线架构图 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ User Service │ │ Product Service │ │ Order Service │ │
│ │ │ │ │ │ │ │
│ │ ┌───────────┐ │ │ ┌───────────┐ │ │ ┌───────────┐ │ │
│ │ │ Publisher │ │ │ │Subscriber │ │ │ │Publisher/ │ │ │
│ │ │ │ │ │ │ │ │ │ │Subscriber │ │ │
│ │ └─────┬─────┘ │ │ └─────┬─────┘ │ │ └─────┬─────┘ │ │
│ └────────┼────────┘ └────────┼────────┘ └────────┼────────┘ │
│ │ │ │ │
│ └──────────────────────┼──────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 消息总线 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │ │
│ │ │ 消息路由 │ │ 消息存储 │ │ 消息分发 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────────────┘ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │ │
│ │ │ 消息过滤 │ │ 消息转换 │ │ 消息监控 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Payment Service │ │ Notification │ │ Audit Service │ │
│ │ │ │ Service │ │ │ │
│ │ ┌───────────┐ │ │ ┌───────────┐ │ │ ┌───────────┐ │ │
│ │ │Subscriber │ │ │ │Subscriber │ │ │ │Subscriber │ │ │
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ └───────────┘ │ │ └───────────┘ │ │ └───────────┘ │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
核心功能
- 异步通信:服务间通过消息进行异步通信,提高系统响应性
- 服务解耦:发布者和订阅者之间松耦合,降低系统复杂度
- 事件驱动:支持事件驱动架构,实现业务流程的自动化
- 消息路由:根据消息类型和规则进行智能路由
- 消息持久化:保证消息的可靠传递和持久化存储
- 负载均衡:支持消息的负载均衡分发
- 故障恢复:提供消息重试和死信队列机制
- 监控管理:提供消息传递的监控和管理功能
消息模式
- 点对点模式(Queue):一对一的消息传递
- 发布订阅模式(Topic):一对多的消息广播
- 请求响应模式(RPC):同步的请求响应通信
- 工作队列模式:任务分发和负载均衡
6.2 Spring Cloud Bus
Bus简介
Spring Cloud Bus是Spring Cloud生态系统中的消息总线解决方案,它使用轻量级的消息代理(如RabbitMQ、Kafka)来连接分布式系统的节点,实现配置刷新、服务状态同步等功能。
核心特性
- 配置刷新:实现配置的自动刷新和同步
- 事件传播:支持自定义事件的传播
- 服务状态同步:同步服务的状态信息
- 集群管理:支持集群节点的管理
6.3 RabbitMQ集成
1. RabbitMQ安装配置
Docker安装RabbitMQ
docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3.11-management
container_name: rabbitmq
hostname: rabbitmq
ports:
- "5672:5672" # AMQP端口
- "15672:15672" # 管理界面端口
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin123
RABBITMQ_DEFAULT_VHOST: /
volumes:
- rabbitmq_data:/var/lib/rabbitmq
- ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json
networks:
- microservice-network
restart: unless-stopped
volumes:
rabbitmq_data:
networks:
microservice-network:
driver: bridge
RabbitMQ配置文件
rabbitmq/rabbitmq.conf
# RabbitMQ配置文件
# 网络配置
listeners.tcp.default = 5672
management.tcp.port = 15672
# 集群配置
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq
# 内存和磁盘限制
vm_memory_high_watermark.relative = 0.6
disk_free_limit.relative = 2.0
# 日志配置
log.console = true
log.console.level = info
log.file = /var/log/rabbitmq/rabbit.log
log.file.level = info
# 管理插件
management.load_definitions = /etc/rabbitmq/definitions.json
# 心跳配置
heartbeat = 60
# 连接配置
num_acceptors.tcp = 10
handshake_timeout = 10000
预定义配置
rabbitmq/definitions.json
{
"rabbit_version": "3.11.0",
"rabbitmq_version": "3.11.0",
"users": [
{
"name": "admin",
"password_hash": "hashed_password",
"hashing_algorithm": "rabbit_password_hashing_sha256",
"tags": "administrator"
},
{
"name": "microservice",
"password_hash": "hashed_password",
"hashing_algorithm": "rabbit_password_hashing_sha256",
"tags": "management"
}
],
"vhosts": [
{
"name": "/"
},
{
"name": "/microservice"
}
],
"permissions": [
{
"user": "admin",
"vhost": "/",
"configure": ".*",
"write": ".*",
"read": ".*"
},
{
"user": "microservice",
"vhost": "/microservice",
"configure": ".*",
"write": ".*",
"read": ".*"
}
],
"exchanges": [
{
"name": "microservice.topic",
"vhost": "/microservice",
"type": "topic",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
},
{
"name": "microservice.direct",
"vhost": "/microservice",
"type": "direct",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
}
],
"queues": [
{
"name": "user.events",
"vhost": "/microservice",
"durable": true,
"auto_delete": false,
"arguments": {
"x-message-ttl": 86400000,
"x-max-length": 10000
}
},
{
"name": "order.events",
"vhost": "/microservice",
"durable": true,
"auto_delete": false,
"arguments": {
"x-message-ttl": 86400000,
"x-max-length": 10000
}
}
],
"bindings": [
{
"source": "microservice.topic",
"vhost": "/microservice",
"destination": "user.events",
"destination_type": "queue",
"routing_key": "user.*",
"arguments": {}
},
{
"source": "microservice.topic",
"vhost": "/microservice",
"destination": "order.events",
"destination_type": "queue",
"routing_key": "order.*",
"arguments": {}
}
]
}
2. Spring Cloud Bus配置
添加依赖
pom.xml
<!-- Spring Cloud Bus -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<!-- Spring Boot AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件
application.yml
spring:
application:
name: user-service
# RabbitMQ配置
rabbitmq:
host: localhost
port: 5672
username: microservice
password: microservice123
virtual-host: /microservice
# 连接配置
connection-timeout: 15000
requested-heartbeat: 60
# 发布者确认
publisher-confirm-type: correlated
publisher-returns: true
# 消费者配置
listener:
simple:
# 手动确认
acknowledge-mode: manual
# 并发消费者数量
concurrency: 1
max-concurrency: 5
# 预取数量
prefetch: 1
# 重试配置
retry:
enabled: true
initial-interval: 1000
max-attempts: 3
max-interval: 10000
multiplier: 2
# 死信队列
default-requeue-rejected: false
# 模板配置
template:
mandatory: true
receive-timeout: 5000
reply-timeout: 5000
retry:
enabled: true
initial-interval: 1000
max-attempts: 3
max-interval: 10000
multiplier: 2
# Spring Cloud Bus配置
cloud:
bus:
enabled: true
refresh:
enabled: true
env:
enabled: true
trace:
enabled: true
destination: springCloudBus
ack:
enabled: true
destination-service: ${spring.application.name}
# 监控配置
management:
endpoints:
web:
exposure:
include: bus-refresh,bus-env,health,info,metrics
endpoint:
health:
show-details: always
bus-refresh:
enabled: true
bus-env:
enabled: true
# 日志配置
logging:
level:
org.springframework.amqp: DEBUG
org.springframework.cloud.bus: DEBUG
com.example: DEBUG
3. 自定义事件
用户事件定义
UserEvent.java
package com.example.userservice.event;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
/**
* 用户事件基类
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public abstract class UserEvent extends RemoteApplicationEvent {
private Long userId;
private String username;
private Long timestamp;
public UserEvent(Object source, String originService, String destinationService,
Long userId, String username) {
super(source, originService, destinationService);
this.userId = userId;
this.username = username;
this.timestamp = System.currentTimeMillis();
}
}
用户注册事件
UserRegisteredEvent.java
package com.example.userservice.event;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
/**
* 用户注册事件
*/
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
public class UserRegisteredEvent extends UserEvent {
private String email;
private String phone;
public UserRegisteredEvent(Object source, String originService, String destinationService,
Long userId, String username, String email, String phone) {
super(source, originService, destinationService, userId, username);
this.email = email;
this.phone = phone;
}
}
用户登录事件
UserLoginEvent.java
package com.example.userservice.event;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
/**
* 用户登录事件
*/
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
public class UserLoginEvent extends UserEvent {
private String ipAddress;
private String userAgent;
private String loginType;
public UserLoginEvent(Object source, String originService, String destinationService,
Long userId, String username, String ipAddress,
String userAgent, String loginType) {
super(source, originService, destinationService, userId, username);
this.ipAddress = ipAddress;
this.userAgent = userAgent;
this.loginType = loginType;
}
}
4. 事件发布
事件发布服务
EventPublisher.java
package com.example.userservice.service;
import com.example.userservice.event.UserLoginEvent;
import com.example.userservice.event.UserRegisteredEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.bus.BusProperties;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
/**
* 事件发布服务
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class EventPublisher {
private final ApplicationEventPublisher applicationEventPublisher;
private final BusProperties busProperties;
/**
* 发布用户注册事件
*/
public void publishUserRegisteredEvent(Long userId, String username,
String email, String phone) {
UserRegisteredEvent event = new UserRegisteredEvent(
this,
busProperties.getId(),
null, // 广播到所有服务
userId,
username,
email,
phone
);
log.info("Publishing user registered event: {}", event);
applicationEventPublisher.publishEvent(event);
}
/**
* 发布用户登录事件
*/
public void publishUserLoginEvent(Long userId, String username,
String ipAddress, String userAgent, String loginType) {
UserLoginEvent event = new UserLoginEvent(
this,
busProperties.getId(),
null, // 广播到所有服务
userId,
username,
ipAddress,
userAgent,
loginType
);
log.info("Publishing user login event: {}", event);
applicationEventPublisher.publishEvent(event);
}
/**
* 发布事件到特定服务
*/
public void publishEventToService(Object event, String destinationService) {
log.info("Publishing event to service {}: {}", destinationService, event);
applicationEventPublisher.publishEvent(event);
}
}
在业务服务中发布事件
UserService.java
package com.example.userservice.service;
import com.example.userservice.entity.User;
import com.example.userservice.repository.UserRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.servlet.http.HttpServletRequest;
/**
* 用户服务
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class UserService {
private final UserRepository userRepository;
private final PasswordEncoder passwordEncoder;
private final EventPublisher eventPublisher;
/**
* 用户注册
*/
@Transactional
public User register(String username, String password, String email, String phone) {
// 检查用户是否已存在
if (userRepository.existsByUsername(username)) {
throw new RuntimeException("用户名已存在");
}
if (userRepository.existsByEmail(email)) {
throw new RuntimeException("邮箱已被注册");
}
// 创建用户
User user = new User();
user.setUsername(username);
user.setPassword(passwordEncoder.encode(password));
user.setEmail(email);
user.setPhone(phone);
user.setEnabled(true);
user.setCreateTime(System.currentTimeMillis());
User savedUser = userRepository.save(user);
// 发布用户注册事件
eventPublisher.publishUserRegisteredEvent(
savedUser.getId(),
savedUser.getUsername(),
savedUser.getEmail(),
savedUser.getPhone()
);
log.info("User registered successfully: {}", savedUser.getUsername());
return savedUser;
}
/**
* 用户登录
*/
public String login(String username, String password, HttpServletRequest request) {
User user = userRepository.findByUsername(username)
.orElseThrow(() -> new RuntimeException("用户不存在"));
if (!passwordEncoder.matches(password, user.getPassword())) {
throw new RuntimeException("密码错误");
}
if (!user.getEnabled()) {
throw new RuntimeException("用户已被禁用");
}
// 生成JWT Token
String token = generateJwtToken(user);
// 发布用户登录事件
eventPublisher.publishUserLoginEvent(
user.getId(),
user.getUsername(),
getClientIpAddress(request),
request.getHeader("User-Agent"),
"password"
);
log.info("User logged in successfully: {}", user.getUsername());
return token;
}
private String generateJwtToken(User user) {
// JWT Token生成逻辑
return "jwt_token_" + user.getId();
}
private String getClientIpAddress(HttpServletRequest request) {
String xForwardedFor = request.getHeader("X-Forwarded-For");
if (xForwardedFor != null && !xForwardedFor.isEmpty()) {
return xForwardedFor.split(",")[0].trim();
}
String xRealIp = request.getHeader("X-Real-IP");
if (xRealIp != null && !xRealIp.isEmpty()) {
return xRealIp;
}
return request.getRemoteAddr();
}
}
5. 事件监听
通知服务事件监听器
NotificationEventListener.java
package com.example.notificationservice.listener;
import com.example.userservice.event.UserLoginEvent;
import com.example.userservice.event.UserRegisteredEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
* 通知服务事件监听器
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationEventListener {
private final EmailService emailService;
private final SmsService smsService;
/**
* 监听用户注册事件
*/
@EventListener
public void handleUserRegisteredEvent(UserRegisteredEvent event) {
log.info("Received user registered event: {}", event);
try {
// 发送欢迎邮件
emailService.sendWelcomeEmail(
event.getEmail(),
event.getUsername()
);
// 发送欢迎短信
if (event.getPhone() != null && !event.getPhone().isEmpty()) {
smsService.sendWelcomeSms(
event.getPhone(),
event.getUsername()
);
}
log.info("Welcome notifications sent for user: {}", event.getUsername());
} catch (Exception e) {
log.error("Failed to send welcome notifications for user: {}",
event.getUsername(), e);
}
}
/**
* 监听用户登录事件
*/
@EventListener
public void handleUserLoginEvent(UserLoginEvent event) {
log.info("Received user login event: {}", event);
try {
// 检查是否为异常登录
if (isAbnormalLogin(event)) {
// 发送安全提醒邮件
emailService.sendSecurityAlert(
event.getUserId(),
event.getUsername(),
event.getIpAddress(),
event.getTimestamp()
);
log.info("Security alert sent for abnormal login: {}", event.getUsername());
}
// 记录登录日志
logLoginActivity(event);
} catch (Exception e) {
log.error("Failed to process login event for user: {}",
event.getUsername(), e);
}
}
private boolean isAbnormalLogin(UserLoginEvent event) {
// 异常登录检测逻辑
// 例如:检查IP地址、登录时间、设备信息等
return false;
}
private void logLoginActivity(UserLoginEvent event) {
// 记录登录活动日志
log.info("Login activity - User: {}, IP: {}, Time: {}",
event.getUsername(), event.getIpAddress(), event.getTimestamp());
}
}
审计服务事件监听器
AuditEventListener.java
package com.example.auditservice.listener;
import com.example.userservice.event.UserEvent;
import com.example.userservice.event.UserLoginEvent;
import com.example.userservice.event.UserRegisteredEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
* 审计服务事件监听器
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class AuditEventListener {
private final AuditLogService auditLogService;
/**
* 监听所有用户事件
*/
@EventListener
public void handleUserEvent(UserEvent event) {
log.info("Received user event for audit: {}", event);
try {
// 记录审计日志
auditLogService.recordUserEvent(
event.getUserId(),
event.getUsername(),
event.getClass().getSimpleName(),
event.getTimestamp(),
event.getOriginService()
);
log.info("Audit log recorded for user event: {}", event.getClass().getSimpleName());
} catch (Exception e) {
log.error("Failed to record audit log for user event", e);
}
}
/**
* 监听用户注册事件(特殊处理)
*/
@EventListener
public void handleUserRegisteredEvent(UserRegisteredEvent event) {
log.info("Received user registered event for audit: {}", event);
try {
// 记录详细的注册审计信息
auditLogService.recordUserRegistration(
event.getUserId(),
event.getUsername(),
event.getEmail(),
event.getPhone(),
event.getTimestamp()
);
log.info("User registration audit recorded: {}", event.getUsername());
} catch (Exception e) {
log.error("Failed to record user registration audit", e);
}
}
/**
* 监听用户登录事件(特殊处理)
*/
@EventListener
public void handleUserLoginEvent(UserLoginEvent event) {
log.info("Received user login event for audit: {}", event);
try {
// 记录详细的登录审计信息
auditLogService.recordUserLogin(
event.getUserId(),
event.getUsername(),
event.getIpAddress(),
event.getUserAgent(),
event.getLoginType(),
event.getTimestamp()
);
log.info("User login audit recorded: {}", event.getUsername());
} catch (Exception e) {
log.error("Failed to record user login audit", e);
}
}
}
6.4 消息队列实战
1. 订单处理消息队列
订单事件定义
OrderEvent.java
package com.example.orderservice.event;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.List;
/**
* 订单事件基类
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public abstract class OrderEvent implements Serializable {
private Long orderId;
private Long userId;
private String orderNo;
private BigDecimal totalAmount;
private Long timestamp;
private String eventType;
public OrderEvent(Long orderId, Long userId, String orderNo,
BigDecimal totalAmount, String eventType) {
this.orderId = orderId;
this.userId = userId;
this.orderNo = orderNo;
this.totalAmount = totalAmount;
this.eventType = eventType;
this.timestamp = System.currentTimeMillis();
}
}
订单创建事件
OrderCreatedEvent.java
package com.example.orderservice.event;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.util.List;
/**
* 订单创建事件
*/
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
public class OrderCreatedEvent extends OrderEvent {
private List<OrderItem> orderItems;
private String shippingAddress;
private String paymentMethod;
public OrderCreatedEvent(Long orderId, Long userId, String orderNo,
BigDecimal totalAmount, List<OrderItem> orderItems,
String shippingAddress, String paymentMethod) {
super(orderId, userId, orderNo, totalAmount, "ORDER_CREATED");
this.orderItems = orderItems;
this.shippingAddress = shippingAddress;
this.paymentMethod = paymentMethod;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class OrderItem {
private Long productId;
private String productName;
private Integer quantity;
private BigDecimal price;
}
}
订单支付事件
OrderPaidEvent.java
package com.example.orderservice.event;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
/**
* 订单支付事件
*/
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
public class OrderPaidEvent extends OrderEvent {
private String paymentId;
private String paymentMethod;
private Long paymentTime;
public OrderPaidEvent(Long orderId, Long userId, String orderNo,
BigDecimal totalAmount, String paymentId,
String paymentMethod, Long paymentTime) {
super(orderId, userId, orderNo, totalAmount, "ORDER_PAID");
this.paymentId = paymentId;
this.paymentMethod = paymentMethod;
this.paymentTime = paymentTime;
}
}
2. RabbitMQ配置
消息队列配置
RabbitMQConfig.java
package com.example.orderservice.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ配置
*/
@Configuration
public class RabbitMQConfig {
// 交换机名称
public static final String ORDER_EXCHANGE = "order.exchange";
public static final String DLX_EXCHANGE = "order.dlx.exchange";
// 队列名称
public static final String ORDER_CREATED_QUEUE = "order.created.queue";
public static final String ORDER_PAID_QUEUE = "order.paid.queue";
public static final String INVENTORY_QUEUE = "inventory.queue";
public static final String PAYMENT_QUEUE = "payment.queue";
public static final String NOTIFICATION_QUEUE = "notification.queue";
// 死信队列
public static final String ORDER_DLQ = "order.dlq";
// 路由键
public static final String ORDER_CREATED_ROUTING_KEY = "order.created";
public static final String ORDER_PAID_ROUTING_KEY = "order.paid";
public static final String INVENTORY_ROUTING_KEY = "inventory.update";
public static final String PAYMENT_ROUTING_KEY = "payment.process";
public static final String NOTIFICATION_ROUTING_KEY = "notification.send";
/**
* 消息转换器
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* RabbitTemplate配置
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter());
template.setMandatory(true);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败: " + cause);
}
});
template.setReturnsCallback(returned -> {
System.out.println("消息被退回: " + returned.getMessage());
});
return template;
}
/**
* 监听器容器工厂
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(5);
factory.setPrefetchCount(1);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
// ========== 交换机定义 ==========
/**
* 订单主题交换机
*/
@Bean
public TopicExchange orderExchange() {
return ExchangeBuilder
.topicExchange(ORDER_EXCHANGE)
.durable(true)
.build();
}
/**
* 死信交换机
*/
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder
.directExchange(DLX_EXCHANGE)
.durable(true)
.build();
}
// ========== 队列定义 ==========
/**
* 订单创建队列
*/
@Bean
public Queue orderCreatedQueue() {
return QueueBuilder
.durable(ORDER_CREATED_QUEUE)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
.withArgument("x-dead-letter-routing-key", "order.dead")
.withArgument("x-message-ttl", 86400000) // 24小时TTL
.build();
}
/**
* 订单支付队列
*/
@Bean
public Queue orderPaidQueue() {
return QueueBuilder
.durable(ORDER_PAID_QUEUE)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
.withArgument("x-dead-letter-routing-key", "order.dead")
.withArgument("x-message-ttl", 86400000)
.build();
}
/**
* 库存队列
*/
@Bean
public Queue inventoryQueue() {
return QueueBuilder
.durable(INVENTORY_QUEUE)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
.withArgument("x-dead-letter-routing-key", "inventory.dead")
.build();
}
/**
* 支付队列
*/
@Bean
public Queue paymentQueue() {
return QueueBuilder
.durable(PAYMENT_QUEUE)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
.withArgument("x-dead-letter-routing-key", "payment.dead")
.build();
}
/**
* 通知队列
*/
@Bean
public Queue notificationQueue() {
return QueueBuilder
.durable(NOTIFICATION_QUEUE)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
.withArgument("x-dead-letter-routing-key", "notification.dead")
.build();
}
/**
* 死信队列
*/
@Bean
public Queue deadLetterQueue() {
return QueueBuilder
.durable(ORDER_DLQ)
.build();
}
// ========== 绑定定义 ==========
/**
* 订单创建队列绑定
*/
@Bean
public Binding orderCreatedBinding() {
return BindingBuilder
.bind(orderCreatedQueue())
.to(orderExchange())
.with(ORDER_CREATED_ROUTING_KEY);
}
/**
* 订单支付队列绑定
*/
@Bean
public Binding orderPaidBinding() {
return BindingBuilder
.bind(orderPaidQueue())
.to(orderExchange())
.with(ORDER_PAID_ROUTING_KEY);
}
/**
* 库存队列绑定
*/
@Bean
public Binding inventoryBinding() {
return BindingBuilder
.bind(inventoryQueue())
.to(orderExchange())
.with(INVENTORY_ROUTING_KEY);
}
/**
* 支付队列绑定
*/
@Bean
public Binding paymentBinding() {
return BindingBuilder
.bind(paymentQueue())
.to(orderExchange())
.with(PAYMENT_ROUTING_KEY);
}
/**
* 通知队列绑定
*/
@Bean
public Binding notificationBinding() {
return BindingBuilder
.bind(notificationQueue())
.to(orderExchange())
.with(NOTIFICATION_ROUTING_KEY);
}
/**
* 死信队列绑定
*/
@Bean
public Binding deadLetterBinding() {
return BindingBuilder
.bind(deadLetterQueue())
.to(dlxExchange())
.with("order.dead");
}
}
3. 消息发送
订单消息发送服务
OrderMessageSender.java
package com.example.orderservice.service;
import com.example.orderservice.config.RabbitMQConfig;
import com.example.orderservice.event.OrderCreatedEvent;
import com.example.orderservice.event.OrderPaidEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
/**
* 订单消息发送服务
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderMessageSender {
private final RabbitTemplate rabbitTemplate;
/**
* 发送订单创建消息
*/
public void sendOrderCreatedMessage(OrderCreatedEvent event) {
try {
log.info("Sending order created message: {}", event.getOrderNo());
rabbitTemplate.convertAndSend(
RabbitMQConfig.ORDER_EXCHANGE,
RabbitMQConfig.ORDER_CREATED_ROUTING_KEY,
event
);
log.info("Order created message sent successfully: {}", event.getOrderNo());
} catch (Exception e) {
log.error("Failed to send order created message: {}", event.getOrderNo(), e);
throw new RuntimeException("消息发送失败", e);
}
}
/**
* 发送订单支付消息
*/
public void sendOrderPaidMessage(OrderPaidEvent event) {
try {
log.info("Sending order paid message: {}", event.getOrderNo());
rabbitTemplate.convertAndSend(
RabbitMQConfig.ORDER_EXCHANGE,
RabbitMQConfig.ORDER_PAID_ROUTING_KEY,
event
);
log.info("Order paid message sent successfully: {}", event.getOrderNo());
} catch (Exception e) {
log.error("Failed to send order paid message: {}", event.getOrderNo(), e);
throw new RuntimeException("消息发送失败", e);
}
}
/**
* 发送库存扣减消息
*/
public void sendInventoryUpdateMessage(Long productId, Integer quantity, String operation) {
try {
InventoryUpdateMessage message = new InventoryUpdateMessage(
productId, quantity, operation, System.currentTimeMillis()
);
log.info("Sending inventory update message: {}", message);
rabbitTemplate.convertAndSend(
RabbitMQConfig.ORDER_EXCHANGE,
RabbitMQConfig.INVENTORY_ROUTING_KEY,
message
);
log.info("Inventory update message sent successfully");
} catch (Exception e) {
log.error("Failed to send inventory update message", e);
throw new RuntimeException("库存消息发送失败", e);
}
}
/**
* 库存更新消息
*/
public static class InventoryUpdateMessage {
private Long productId;
private Integer quantity;
private String operation; // DECREASE, INCREASE
private Long timestamp;
// 构造函数、getter、setter省略
}
}
4. 消息接收
库存服务消息监听器
InventoryMessageListener.java
package com.example.inventoryservice.listener;
import com.example.orderservice.event.OrderCreatedEvent;
import com.example.orderservice.service.OrderMessageSender.InventoryUpdateMessage;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 库存服务消息监听器
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class InventoryMessageListener {
private final InventoryService inventoryService;
/**
* 监听订单创建消息,扣减库存
*/
@RabbitListener(queues = "order.created.queue")
public void handleOrderCreated(OrderCreatedEvent event, Message message, Channel channel) {
log.info("Received order created message: {}", event.getOrderNo());
try {
// 扣减库存
for (OrderCreatedEvent.OrderItem item : event.getOrderItems()) {
inventoryService.decreaseInventory(
item.getProductId(),
item.getQuantity()
);
}
log.info("Inventory decreased for order: {}", event.getOrderNo());
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("Failed to decrease inventory for order: {}", event.getOrderNo(), e);
try {
// 拒绝消息,不重新入队(进入死信队列)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (IOException ioException) {
log.error("Failed to nack message", ioException);
}
}
}
/**
* 监听库存更新消息
*/
@RabbitListener(queues = "inventory.queue")
public void handleInventoryUpdate(InventoryUpdateMessage message, Message msg, Channel channel) {
log.info("Received inventory update message: {}", message);
try {
if ("DECREASE".equals(message.getOperation())) {
inventoryService.decreaseInventory(
message.getProductId(),
message.getQuantity()
);
} else if ("INCREASE".equals(message.getOperation())) {
inventoryService.increaseInventory(
message.getProductId(),
message.getQuantity()
);
}
log.info("Inventory updated successfully");
// 手动确认消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("Failed to update inventory", e);
try {
// 重试机制:重新入队
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException ioException) {
log.error("Failed to nack message", ioException);
}
}
}
}
支付服务消息监听器
PaymentMessageListener.java
package com.example.paymentservice.listener;
import com.example.orderservice.event.OrderCreatedEvent;
import com.example.orderservice.event.OrderPaidEvent;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 支付服务消息监听器
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class PaymentMessageListener {
private final PaymentService paymentService;
/**
* 监听订单创建消息,创建支付记录
*/
@RabbitListener(queues = "payment.queue")
public void handleOrderCreated(OrderCreatedEvent event, Message message, Channel channel) {
log.info("Received order created message for payment: {}", event.getOrderNo());
try {
// 创建支付记录
paymentService.createPaymentRecord(
event.getOrderId(),
event.getOrderNo(),
event.getTotalAmount(),
event.getPaymentMethod()
);
log.info("Payment record created for order: {}", event.getOrderNo());
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("Failed to create payment record for order: {}", event.getOrderNo(), e);
try {
// 重试机制
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException ioException) {
log.error("Failed to nack message", ioException);
}
}
}
/**
* 监听订单支付消息,更新支付状态
*/
@RabbitListener(queues = "order.paid.queue")
public void handleOrderPaid(OrderPaidEvent event, Message message, Channel channel) {
log.info("Received order paid message: {}", event.getOrderNo());
try {
// 更新支付状态
paymentService.updatePaymentStatus(
event.getPaymentId(),
"PAID",
event.getPaymentTime()
);
log.info("Payment status updated for order: {}", event.getOrderNo());
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("Failed to update payment status for order: {}", event.getOrderNo(), e);
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException ioException) {
log.error("Failed to nack message", ioException);
}
}
}
}
6.5 消息可靠性保证
1. 发送端确认
发送确认配置
MessageReliabilityConfig.java
package com.example.orderservice.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
/**
* 消息可靠性配置
*/
@Configuration
@Slf4j
public class MessageReliabilityConfig {
/**
* 可靠性RabbitTemplate
*/
@Bean
public RabbitTemplate reliableRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
// 开启发送确认
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("Message sent successfully, correlationData: {}", correlationData);
} else {
log.error("Message send failed, correlationData: {}, cause: {}",
correlationData, cause);
// 可以在这里实现重发逻辑
handleSendFailure(correlationData, cause);
}
});
// 开启返回确认
template.setReturnsCallback(returned -> {
log.error("Message returned: {}, replyCode: {}, replyText: {}",
returned.getMessage(), returned.getReplyCode(), returned.getReplyText());
// 处理消息被退回的情况
handleMessageReturned(returned);
});
// 设置为强制性
template.setMandatory(true);
// 设置重试模板
template.setRetryTemplate(retryTemplate());
return template;
}
/**
* 重试模板
*/
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
// 重试策略
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);
// 退避策略
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
private void handleSendFailure(Object correlationData, String cause) {
// 实现发送失败的处理逻辑
// 例如:记录到数据库,等待重发
log.error("Handling send failure for correlationData: {}", correlationData);
}
private void handleMessageReturned(org.springframework.amqp.core.ReturnedMessage returned) {
// 实现消息被退回的处理逻辑
// 例如:记录到数据库,人工处理
log.error("Handling returned message: {}", returned.getMessage());
}
}
2. 消息持久化
持久化消息发送
ReliableMessageSender.java
package com.example.orderservice.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
/**
* 可靠消息发送服务
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ReliableMessageSender {
private final RabbitTemplate rabbitTemplate;
private final MessageLogService messageLogService;
/**
* 可靠消息发送
*/
@Transactional
public void sendReliableMessage(String exchange, String routingKey, Object message) {
// 生成消息ID
String messageId = UUID.randomUUID().toString();
// 记录消息日志
messageLogService.saveMessageLog(messageId, exchange, routingKey, message, "SENDING");
try {
// 创建关联数据
CorrelationData correlationData = new CorrelationData(messageId);
// 设置消息属性
MessageProperties properties = new MessageProperties();
properties.setMessageId(messageId);
properties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE); // 持久化
// 发送消息
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
log.info("Reliable message sent, messageId: {}", messageId);
} catch (Exception e) {
// 更新消息状态为失败
messageLogService.updateMessageStatus(messageId, "FAILED");
log.error("Failed to send reliable message, messageId: {}", messageId, e);
throw e;
}
}
/**
* 处理发送确认
*/
public void handleConfirm(CorrelationData correlationData, boolean ack, String cause) {
String messageId = correlationData.getId();
if (ack) {
// 更新消息状态为成功
messageLogService.updateMessageStatus(messageId, "SUCCESS");
log.info("Message confirmed successfully, messageId: {}", messageId);
} else {
// 更新消息状态为失败
messageLogService.updateMessageStatus(messageId, "FAILED");
log.error("Message confirm failed, messageId: {}, cause: {}", messageId, cause);
// 可以实现重发逻辑
scheduleRetry(messageId);
}
}
private void scheduleRetry(String messageId) {
// 实现重发调度逻辑
log.info("Scheduling retry for message: {}", messageId);
}
}
消息日志服务
MessageLogService.java
package com.example.orderservice.service;
import com.example.orderservice.entity.MessageLog;
import com.example.orderservice.repository.MessageLogRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
* 消息日志服务
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageLogService {
private final MessageLogRepository messageLogRepository;
private final ObjectMapper objectMapper;
/**
* 保存消息日志
*/
@Transactional
public void saveMessageLog(String messageId, String exchange, String routingKey,
Object message, String status) {
try {
MessageLog messageLog = new MessageLog();
messageLog.setMessageId(messageId);
messageLog.setExchange(exchange);
messageLog.setRoutingKey(routingKey);
messageLog.setMessageContent(objectMapper.writeValueAsString(message));
messageLog.setStatus(status);
messageLog.setCreateTime(System.currentTimeMillis());
messageLog.setRetryCount(0);
messageLogRepository.save(messageLog);
log.info("Message log saved, messageId: {}", messageId);
} catch (Exception e) {
log.error("Failed to save message log, messageId: {}", messageId, e);
}
}
/**
* 更新消息状态
*/
@Transactional
public void updateMessageStatus(String messageId, String status) {
try {
MessageLog messageLog = messageLogRepository.findByMessageId(messageId);
if (messageLog != null) {
messageLog.setStatus(status);
messageLog.setUpdateTime(System.currentTimeMillis());
messageLogRepository.save(messageLog);
log.info("Message status updated, messageId: {}, status: {}", messageId, status);
}
} catch (Exception e) {
log.error("Failed to update message status, messageId: {}", messageId, e);
}
}
/**
* 获取失败的消息
*/
public List<MessageLog> getFailedMessages() {
return messageLogRepository.findByStatusAndRetryCountLessThan("FAILED", 3);
}
/**
* 增加重试次数
*/
@Transactional
public void incrementRetryCount(String messageId) {
MessageLog messageLog = messageLogRepository.findByMessageId(messageId);
if (messageLog != null) {
messageLog.setRetryCount(messageLog.getRetryCount() + 1);
messageLog.setUpdateTime(System.currentTimeMillis());
messageLogRepository.save(messageLog);
}
}
}
3. 消费端幂等性
幂等性处理
IdempotentMessageProcessor.java
package com.example.inventoryservice.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
* 幂等性消息处理器
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class IdempotentMessageProcessor {
private final StringRedisTemplate redisTemplate;
private static final String IDEMPOTENT_KEY_PREFIX = "msg:idempotent:";
private static final long IDEMPOTENT_EXPIRE_TIME = 24; // 24小时
/**
* 检查消息是否已处理
*/
public boolean isMessageProcessed(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
/**
* 标记消息已处理
*/
public void markMessageProcessed(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
redisTemplate.opsForValue().set(key, "processed", IDEMPOTENT_EXPIRE_TIME, TimeUnit.HOURS);
log.info("Message marked as processed: {}", messageId);
}
/**
* 幂等性处理包装器
*/
public <T> boolean processIdempotent(String messageId, T message,
MessageProcessor<T> processor) {
// 检查是否已处理
if (isMessageProcessed(messageId)) {
log.info("Message already processed, skipping: {}", messageId);
return true;
}
try {
// 处理消息
processor.process(message);
// 标记为已处理
markMessageProcessed(messageId);
log.info("Message processed successfully: {}", messageId);
return true;
} catch (Exception e) {
log.error("Failed to process message: {}", messageId, e);
return false;
}
}
/**
* 消息处理器接口
*/
@FunctionalInterface
public interface MessageProcessor<T> {
void process(T message) throws Exception;
}
}
6.6 死信队列处理
1. 死信队列配置
死信队列处理器
DeadLetterQueueHandler.java
package com.example.orderservice.service;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* 死信队列处理器
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class DeadLetterQueueHandler {
private final DeadLetterService deadLetterService;
private final NotificationService notificationService;
/**
* 处理死信消息
*/
@RabbitListener(queues = "order.dlq")
public void handleDeadLetter(Message message, Channel channel) {
log.error("Received dead letter message: {}",
new String(message.getBody(), StandardCharsets.UTF_8));
try {
// 记录死信消息
deadLetterService.recordDeadLetter(
message.getMessageProperties().getMessageId(),
new String(message.getBody(), StandardCharsets.UTF_8),
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
getFailureReason(message)
);
// 发送告警通知
notificationService.sendDeadLetterAlert(
message.getMessageProperties().getMessageId(),
getFailureReason(message)
);
// 确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("Dead letter message processed: {}",
message.getMessageProperties().getMessageId());
} catch (Exception e) {
log.error("Failed to process dead letter message", e);
try {
// 拒绝消息,避免无限循环
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (IOException ioException) {
log.error("Failed to nack dead letter message", ioException);
}
}
}
private String getFailureReason(Message message) {
// 从消息头中获取失败原因
Object reason = message.getMessageProperties().getHeaders().get("x-death-reason");
return reason != null ? reason.toString() : "Unknown";
}
}
2. 死信服务
DeadLetterService.java
package com.example.orderservice.service;
import com.example.orderservice.entity.DeadLetterRecord;
import com.example.orderservice.repository.DeadLetterRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
* 死信服务
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class DeadLetterService {
private final DeadLetterRepository deadLetterRepository;
/**
* 记录死信消息
*/
@Transactional
public void recordDeadLetter(String messageId, String messageContent,
String exchange, String routingKey, String reason) {
DeadLetterRecord record = new DeadLetterRecord();
record.setMessageId(messageId);
record.setMessageContent(messageContent);
record.setExchange(exchange);
record.setRoutingKey(routingKey);
record.setFailureReason(reason);
record.setStatus("PENDING");
record.setCreateTime(System.currentTimeMillis());
deadLetterRepository.save(record);
log.info("Dead letter record saved: {}", messageId);
}
/**
* 获取待处理的死信消息
*/
public List<DeadLetterRecord> getPendingDeadLetters() {
return deadLetterRepository.findByStatus("PENDING");
}
/**
* 重新发送死信消息
*/
@Transactional
public void resendDeadLetter(Long recordId) {
DeadLetterRecord record = deadLetterRepository.findById(recordId)
.orElseThrow(() -> new RuntimeException("死信记录不存在"));
try {
// 重新发送逻辑
// ...
record.setStatus("RESENT");
record.setUpdateTime(System.currentTimeMillis());
deadLetterRepository.save(record);
log.info("Dead letter message resent: {}", record.getMessageId());
} catch (Exception e) {
record.setStatus("FAILED");
record.setUpdateTime(System.currentTimeMillis());
deadLetterRepository.save(record);
log.error("Failed to resend dead letter message: {}", record.getMessageId(), e);
throw e;
}
}
}
6.7 消息监控
1. 消息监控配置
MessageMonitoringConfig.java
package com.example.orderservice.config;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 消息监控配置
*/
@Configuration
@RequiredArgsConstructor
public class MessageMonitoringConfig {
private final MeterRegistry meterRegistry;
/**
* 带监控的监听器容器工厂
*/
@Bean
public RabbitListenerContainerFactory<?> monitoringRabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 添加消息处理时间监控
factory.setAfterReceivePostProcessors(message -> {
Timer.Sample sample = Timer.start(meterRegistry);
message.getMessageProperties().getHeaders().put("timer.sample", sample);
return message;
});
return factory;
}
}
2. 消息指标收集
MessageMetricsCollector.java
package com.example.orderservice.service;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
/**
* 消息指标收集器
*/
@Service
@RequiredArgsConstructor
public class MessageMetricsCollector {
private final MeterRegistry meterRegistry;
private Counter messagesSentCounter;
private Counter messagesReceivedCounter;
private Counter messagesFailedCounter;
private Timer messageProcessingTimer;
@PostConstruct
public void init() {
messagesSentCounter = Counter.builder("messages.sent")
.description("Number of messages sent")
.register(meterRegistry);
messagesReceivedCounter = Counter.builder("messages.received")
.description("Number of messages received")
.register(meterRegistry);
messagesFailedCounter = Counter.builder("messages.failed")
.description("Number of failed messages")
.register(meterRegistry);
messageProcessingTimer = Timer.builder("message.processing.time")
.description("Message processing time")
.register(meterRegistry);
}
public void incrementMessagesSent() {
messagesSentCounter.increment();
}
public void incrementMessagesReceived() {
messagesReceivedCounter.increment();
}
public void incrementMessagesFailed() {
messagesFailedCounter.increment();
}
public Timer.Sample startProcessingTimer() {
return Timer.start(meterRegistry);
}
public void recordProcessingTime(Timer.Sample sample) {
sample.stop(messageProcessingTimer);
}
}
6.8 总结
核心概念回顾
消息总线的作用:
- 异步通信:提高系统响应性和吞吐量
- 服务解耦:降低服务间的耦合度
- 事件驱动:支持事件驱动架构
- 消息路由:智能的消息路由和分发
Spring Cloud Bus特性:
- 配置刷新:实现配置的自动刷新
- 事件传播:支持自定义事件的传播
- 服务状态同步:同步服务状态信息
- 集群管理:支持集群节点管理
消息可靠性保证:
- 发送端确认:确保消息成功发送到Broker
- 消息持久化:保证消息不丢失
- 消费端确认:确保消息被正确处理
- 幂等性处理:避免重复处理
最佳实践
消息设计:
- 消息结构清晰,包含必要的业务信息
- 消息大小适中,避免过大的消息
- 消息版本化,支持向后兼容
- 消息幂等性设计
可靠性保证:
- 开启发送端和消费端确认
- 实现消息重试机制
- 使用死信队列处理失败消息
- 记录消息处理日志
性能优化:
- 合理设置消费者并发数
- 使用批量处理提高效率
- 监控队列长度和处理速度
- 优化消息序列化方式
监控运维:
- 监控消息发送和消费指标
- 设置告警规则
- 定期清理过期消息
- 备份重要消息数据
注意事项
消息顺序:
- RabbitMQ不保证全局消息顺序
- 需要顺序处理时使用单一队列
- 考虑使用消息分区
消息重复:
- 网络异常可能导致消息重复
- 实现幂等性处理机制
- 使用唯一消息ID
资源管理:
- 合理配置连接池大小
- 及时关闭不用的连接
- 监控内存和CPU使用情况
安全考虑:
- 使用SSL/TLS加密传输
- 配置访问权限控制
- 敏感信息加密存储
扩展方向
高级功能:
- 消息路由规则引擎
- 消息转换和过滤
- 消息聚合和分发
- 消息流处理
集成方案:
- 集成Apache Kafka
- 集成Apache Pulsar
- 集成Redis Streams
- 集成云消息服务
通过本章学习,你应该掌握了Spring Cloud Bus和RabbitMQ的核心概念和实际应用。消息总线是微服务架构中实现异步通信和事件驱动的重要组件,合理使用能够显著提升系统的可扩展性和可靠性。
下一章我们将学习服务熔断与降级,了解如何在微服务架构中实现故障隔离和系统保护。