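5.1 Microservice Communication Overview
5.1.1 Microservice Communication Patterns
In a microservice architecture, communication between services is one of the core concerns. Quarkus provides several communication styles and integration options: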
graph TB
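A[Microservice communication patterns] --> B[Synchronous communication]
A --> C[Asynchronous communication]
A --> D[Event-driven]
A --> E[Stream processing]
B --> B1[REST API]
B --> B2[GraphQL]
B --> B3[gRPC]
B --> B4[WebSocket]
C --> C1[Message queue]
C --> C2[Publish-subscribe]
C --> C3[Request-reply]
C --> C4[Work queue]
D --> D1[Event bus]
D --> D2[Event sourcing]
D --> D3[CQRS]
D --> D4[Saga pattern]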
E --> E1[Apache Kafka]
E --> E2[Apache Pulsar]
E --> E3[Redis Streams]
E --> E4[RabbitMQ Streams]
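5.1.2 Quarkus Communication Extensions
Quarkus provides a rich set of extensions for these communication styles:
- REST Client: for calling HTTP/REST services
- SmallRye Reactive Messaging: reactive message processing
- Apache Kafka: distributed stream-processing platform
- RabbitMQ: message broker
- gRPC: high-performance RPC framework
- WebSocket: real-time bidirectional communication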
5.2 REST Client Development
5.2.1 Adding Dependencies
Add the REST client dependencies to pom.xml. The artifact names below are the ones used up to Quarkus 3.8; from Quarkus 3.9 onward the same extensions are published as quarkus-rest-client and quarkus-rest-client-jackson:
<dependencies>
    <!-- REST Client -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-client-reactive</artifactId>
    </dependency>
    <!-- JSON support -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-client-reactive-jackson</artifactId>
    </dependency>
    <!-- Fault tolerance -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-fault-tolerance</artifactId>
    </dependency>
    <!-- Health checks -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-health</artifactId>
    </dependency>
    <!-- Metrics -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-micrometer</artifactId>
    </dependency>
</dependencies>
5.2.2 Defining REST Client Interfaces
package com.example.client;
import jakarta.ws.rs.*;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import org.eclipse.microprofile.faulttolerance.*;
import io.smallrye.mutiny.Uni;
import java.util.List;
import java.util.concurrent.CompletionStage;
// User service client
@RegisterRestClient(configKey = "user-service")
@Path("/api/users")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public interface UserServiceClient {
@GET
@Path("/{id}")
@Retry(maxRetries = 3, delay = 1000)
@Timeout(value = 5000)
@Fallback(fallbackMethod = "getUserFallback")
Uni<User> getUser(@PathParam("id") Long id);
@GET
@CircuitBreaker(requestVolumeThreshold = 4, failureRatio = 0.5, delay = 10000)
@Retry(maxRetries = 2)
@Timeout(value = 3000)
Uni<List<User>> getAllUsers(@QueryParam("page") int page,
@QueryParam("size") int size);
@POST
@Retry(maxRetries = 2)
@Timeout(value = 10000)
Uni<User> createUser(User user);
@PUT
@Path("/{id}")
@Retry(maxRetries = 2)
@Timeout(value = 8000)
Uni<User> updateUser(@PathParam("id") Long id, User user);
@DELETE
@Path("/{id}")
@Retry(maxRetries = 1)
@Timeout(value = 5000)
Uni<Void> deleteUser(@PathParam("id") Long id);
@GET
@Path("/search")
@Bulkhead(value = 5, waitingTaskQueue = 10)
@Timeout(value = 15000)
Uni<List<User>> searchUsers(@QueryParam("keyword") String keyword,
@QueryParam("department") String department);
// Fallback method
default Uni<User> getUserFallback(Long id) {
User fallbackUser = new User();
fallbackUser.id = id;
fallbackUser.name = "Unknown User";
fallbackUser.email = "unknown@example.com";
fallbackUser.status = UserStatus.INACTIVE;
return Uni.createFrom().item(fallbackUser);
}
}
// Order service client
@RegisterRestClient(configKey = "order-service")
@Path("/api/orders")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public interface OrderServiceClient {
@GET
@Path("/user/{userId}")
@Retry(maxRetries = 3)
@Timeout(value = 5000)
@CircuitBreaker(requestVolumeThreshold = 5, failureRatio = 0.6)
Uni<List<Order>> getUserOrders(@PathParam("userId") Long userId);
@POST
@Retry(maxRetries = 2)
@Timeout(value = 15000)
@Bulkhead(value = 3)
Uni<Order> createOrder(CreateOrderRequest request);
@GET
@Path("/{id}/status")
@Retry(maxRetries = 2)
@Timeout(value = 3000)
Uni<OrderStatus> getOrderStatus(@PathParam("id") Long orderId);
@PUT
@Path("/{id}/cancel")
@Retry(maxRetries = 1)
@Timeout(value = 8000)
Uni<Void> cancelOrder(@PathParam("id") Long orderId);
}
// Payment service client
@RegisterRestClient(configKey = "payment-service")
@Path("/api/payments")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public interface PaymentServiceClient {
@POST
@Path("/process")
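// Retrying a payment POST is only safe if the endpoint is idempotent (for example, via an idempotency key)
@Retry(maxRetries = 2, delay = 2000)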
@Timeout(value = 30000)
@CircuitBreaker(requestVolumeThreshold = 3, failureRatio = 0.7, delay = 15000)
Uni<PaymentResult> processPayment(PaymentRequest request);
@GET
@Path("/{id}")
@Retry(maxRetries = 3)
@Timeout(value = 5000)
Uni<Payment> getPayment(@PathParam("id") String paymentId);
@POST
@Path("/{id}/refund")
@Retry(maxRetries = 1)
@Timeout(value = 20000)
Uni<RefundResult> refundPayment(@PathParam("id") String paymentId,
RefundRequest request);
}
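The @CircuitBreaker parameters used above (requestVolumeThreshold, failureRatio, delay) are easier to reason about with a small model. The following is a simplified plain-Java sketch of a rolling-window circuit breaker, not the SmallRye Fault Tolerance implementation. The class SimpleCircuitBreaker and its methods are hypothetical names, time is passed in explicitly to keep the example deterministic, and the half-open state is reduced to "the first recorded result decides".

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified model of a rolling-window circuit breaker (illustrative only). */
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int requestVolumeThreshold; // size of the rolling window
    private final double failureRatio;        // failure share that opens the circuit
    private final long delayMillis;           // how long the circuit stays open
    private final Deque<Boolean> window = new ArrayDeque<>(); // true = failed call
    private State state = State.CLOSED;
    private long openedAt;

    SimpleCircuitBreaker(int requestVolumeThreshold, double failureRatio, long delayMillis) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.failureRatio = failureRatio;
        this.delayMillis = delayMillis;
    }

    /** Returns true if a call may proceed at the given time. */
    boolean allowRequest(long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAt >= delayMillis) {
            state = State.HALF_OPEN; // the delay has elapsed: allow a trial call
        }
        return state != State.OPEN;
    }

    /** Records the outcome of a call that was allowed to proceed. */
    void record(boolean failed, long nowMillis) {
        if (state == State.HALF_OPEN) {
            if (failed) {
                open(nowMillis);      // trial call failed: open again
            } else {
                state = State.CLOSED; // trial call succeeded: resume normal operation
                window.clear();
            }
            return;
        }
        window.addLast(failed);
        if (window.size() > requestVolumeThreshold) {
            window.removeFirst();     // keep only the most recent calls
        }
        if (window.size() == requestVolumeThreshold) {
            long failures = window.stream().filter(f -> f).count();
            if ((double) failures / requestVolumeThreshold >= failureRatio) {
                open(nowMillis);
            }
        }
    }

    private void open(long nowMillis) {
        state = State.OPEN;
        openedAt = nowMillis;
        window.clear();
    }

    State state() {
        return state;
    }
}
```

With the values from getAllUsers (a window of 4, ratio 0.5, delay 10000 ms), two failures among the last four calls open the circuit, and calls are rejected until 10 seconds have passed.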
5.2.3 Configuring REST Clients
Configure the clients in application.properties:
# User service configuration
quarkus.rest-client.user-service.url=http://localhost:8081
quarkus.rest-client.user-service.scope=jakarta.inject.Singleton
quarkus.rest-client.user-service.connect-timeout=5000
quarkus.rest-client.user-service.read-timeout=10000
quarkus.rest-client.user-service.max-redirects=3
quarkus.rest-client.user-service.follow-redirects=true
quarkus.rest-client.user-service.providers=com.example.client.UserServiceClientExceptionMapper
# Order service configuration
quarkus.rest-client.order-service.url=http://localhost:8082
quarkus.rest-client.order-service.scope=jakarta.inject.Singleton
quarkus.rest-client.order-service.connect-timeout=3000
quarkus.rest-client.order-service.read-timeout=15000
# Payment service configuration
quarkus.rest-client.payment-service.url=http://localhost:8083
quarkus.rest-client.payment-service.scope=jakarta.inject.Singleton
quarkus.rest-client.payment-service.connect-timeout=5000
quarkus.rest-client.payment-service.read-timeout=30000
# Development profile
%dev.quarkus.rest-client.user-service.url=http://localhost:8081
%dev.quarkus.rest-client.order-service.url=http://localhost:8082
%dev.quarkus.rest-client.payment-service.url=http://localhost:8083
# Production profile
%prod.quarkus.rest-client.user-service.url=https://user-service.example.com
%prod.quarkus.rest-client.order-service.url=https://order-service.example.com
%prod.quarkus.rest-client.payment-service.url=https://payment-service.example.com
# SSL configuration
quarkus.rest-client.payment-service.trust-store=/path/to/truststore.jks
quarkus.rest-client.payment-service.trust-store-password=changeit
quarkus.rest-client.payment-service.trust-store-type=JKS
# Proxy configuration
quarkus.rest-client.user-service.proxy-address=proxy.example.com:8080
quarkus.rest-client.user-service.proxy-user=proxyuser
quarkus.rest-client.user-service.proxy-password=proxypass
# Logging configuration
quarkus.log.category."org.jboss.resteasy.reactive.client.logging".level=DEBUG
quarkus.rest-client.logging.scope=request-response
quarkus.rest-client.logging.body-limit=1024
5.2.4 Using REST Clients
package com.example.service;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.Multi;
import com.example.client.*;
import com.example.dto.*;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.List;
@ApplicationScoped
public class BusinessService {
@Inject
@RestClient
UserServiceClient userServiceClient;
@Inject
@RestClient
OrderServiceClient orderServiceClient;
@Inject
@RestClient
PaymentServiceClient paymentServiceClient;
// Fetch user details (including orders)
public Uni<UserWithOrders> getUserWithOrders(Long userId) {
return userServiceClient.getUser(userId)
.onItem().transformToUni(user ->
orderServiceClient.getUserOrders(userId)
.onItem().transform(orders -> {
UserWithOrders result = new UserWithOrders();
result.user = user;
result.orders = orders;
return result;
})
)
.onFailure().recoverWithItem(throwable -> {
// Fallback handling
UserWithOrders fallback = new UserWithOrders();
fallback.user = createFallbackUser(userId);
fallback.orders = List.of();
return fallback;
});
}
// Fetch multiple users in parallel
public Uni<List<User>> getUsersInParallel(List<Long> userIds) {
List<Uni<User>> userUnis = userIds.stream()
.map(userServiceClient::getUser)
.toList();
return Uni.combine().all().unis(userUnis)
.combinedWith(users -> (List<User>) users);
}
// Create an order and process its payment
public Uni<OrderResult> createOrderWithPayment(CreateOrderRequest orderRequest,
PaymentRequest paymentRequest) {
return orderServiceClient.createOrder(orderRequest)
.onItem().transformToUni(order ->
paymentServiceClient.processPayment(paymentRequest)
.onItem().transform(paymentResult -> {
OrderResult result = new OrderResult();
result.order = order;
result.paymentResult = paymentResult;
result.success = paymentResult.status == PaymentStatus.SUCCESS;
return result;
})
.onFailure().recoverWithUni(throwable -> {
// Payment failed: cancel the order
return orderServiceClient.cancelOrder(order.id)
.onItem().transform(ignored -> {
OrderResult result = new OrderResult();
result.order = order;
result.success = false;
result.errorMessage = "Payment failed: " + throwable.getMessage();
return result;
});
})
);
}
// Fetch order statuses in bulk
public Uni<List<OrderStatusInfo>> getOrderStatuses(List<Long> orderIds) {
return Multi.createFrom().iterable(orderIds)
.onItem().transformToUniAndMerge(orderId ->
orderServiceClient.getOrderStatus(orderId)
.onItem().transform(status -> {
OrderStatusInfo info = new OrderStatusInfo();
info.orderId = orderId;
info.status = status;
return info;
})
.onFailure().recoverWithItem(throwable -> {
OrderStatusInfo info = new OrderStatusInfo();
info.orderId = orderId;
info.status = OrderStatus.UNKNOWN;
info.error = throwable.getMessage();
return info;
})
)
.collect().asList();
}
// Search users and compute their order summaries
public Uni<List<UserOrderSummary>> searchUsersWithOrderSummary(String keyword,
String department) {
return userServiceClient.searchUsers(keyword, department)
.onItem().transformToUni(users -> {
List<Uni<UserOrderSummary>> summaryUnis = users.stream()
.map(user ->
orderServiceClient.getUserOrders(user.id)
.onItem().transform(orders -> {
UserOrderSummary summary = new UserOrderSummary();
summary.user = user;
summary.totalOrders = orders.size();
summary.totalAmount = orders.stream()
.map(order -> order.totalAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add);
return summary;
})
.onFailure().recoverWithItem(throwable -> {
UserOrderSummary summary = new UserOrderSummary();
summary.user = user;
summary.totalOrders = 0;
summary.totalAmount = BigDecimal.ZERO;
return summary;
})
)
.toList();
return Uni.combine().all().unis(summaryUnis)
.combinedWith(summaries -> (List<UserOrderSummary>) summaries);
});
}
// Timeout and retry example
public Uni<String> complexOperation(Long userId) {
return userServiceClient.getUser(userId)
.onItem().transformToUni(user ->
orderServiceClient.getUserOrders(userId)
.onItem().transformToUni(orders -> {
if (!orders.isEmpty()) {
Order latestOrder = orders.get(0);
return paymentServiceClient.getPayment(latestOrder.paymentId)
.onItem().transform(payment ->
String.format("User %s has %d orders, latest payment: %s",
user.name, orders.size(), payment.status)
);
} else {
return Uni.createFrom().item(
String.format("User %s has no orders", user.name)
);
}
})
)
.ifNoItem().after(Duration.ofSeconds(30))
.fail()
.onFailure().retry().atMost(2)
.onFailure().recoverWithItem("Operation failed after retries");
}
private User createFallbackUser(Long userId) {
User user = new User();
user.id = userId;
user.name = "Unknown User";
user.email = "unknown@example.com";
user.status = UserStatus.INACTIVE;
return user;
}
}
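The create-order-then-pay flow above relies on a compensating action: if the payment fails, the order that was already created is cancelled. As a library-independent illustration of that control flow, here is a minimal sketch using the JDK's CompletableFuture instead of Mutiny. OrderFlowSketch, its functional parameters, and the string results are hypothetical stand-ins for the REST clients and DTOs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

final class OrderFlowSketch {
    /**
     * Creates an order, then pays for it. If the payment stage fails,
     * the order is cancelled (the compensating action) and the failure is reported.
     */
    static CompletableFuture<String> createOrderWithPayment(
            CompletableFuture<Long> createOrder,
            Function<Long, CompletableFuture<String>> pay,
            Function<Long, CompletableFuture<Void>> cancel) {
        return createOrder.thenCompose(orderId ->
            pay.apply(orderId)
                .<CompletableFuture<String>>handle((paymentId, failure) -> {
                    if (failure == null) {
                        return CompletableFuture.completedFuture(
                            "order " + orderId + " paid: " + paymentId);
                    }
                    // Unwrap the CompletionException added by dependent stages, if any.
                    Throwable cause = failure instanceof CompletionException && failure.getCause() != null
                        ? failure.getCause()
                        : failure;
                    return cancel.apply(orderId).thenApply(ignored ->
                        "order " + orderId + " cancelled: " + cause.getMessage());
                })
                // Flatten the nested future produced by handle().
                .thenCompose(future -> future));
    }
}
```

One design point carries over to the Mutiny version: the compensation only runs when the payment call fails with an exception. A payment that completes with a non-success status still needs its own handling.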
5.2.5 Exception Handling and Mapping
package com.example.client;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.Provider;
import org.eclipse.microprofile.rest.client.ext.ResponseExceptionMapper;
import com.example.exception.*;
@Provider
public class UserServiceClientExceptionMapper implements ResponseExceptionMapper<RuntimeException> {
@Override
public RuntimeException toThrowable(Response response) {
int status = response.getStatus();
String message = response.readEntity(String.class);
return switch (status) {
case 400 -> new BadRequestException("Bad request: " + message);
case 401 -> new UnauthorizedException("Unauthorized: " + message);
case 403 -> new ForbiddenException("Forbidden: " + message);
case 404 -> new NotFoundException("User not found: " + message);
case 409 -> new ConflictException("Conflict: " + message);
case 429 -> new TooManyRequestsException("Rate limit exceeded: " + message);
case 500 -> new InternalServerErrorException("Internal server error: " + message);
case 502 -> new BadGatewayException("Bad gateway: " + message);
case 503 -> new ServiceUnavailableException("Service unavailable: " + message);
case 504 -> new GatewayTimeoutException("Gateway timeout: " + message);
default -> new RuntimeException("HTTP " + status + ": " + message);
};
}
}
// Custom exception classes
public class BadRequestException extends RuntimeException {
public BadRequestException(String message) {
super(message);
}
}
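public class UnauthorizedException extends RuntimeException {
public UnauthorizedException(String message) {
super(message);
}
}
public class ForbiddenException extends RuntimeException {
public ForbiddenException(String message) {
super(message);
}
}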
public class NotFoundException extends RuntimeException {
public NotFoundException(String message) {
super(message);
}
}
public class ConflictException extends RuntimeException {
public ConflictException(String message) {
super(message);
}
}
public class TooManyRequestsException extends RuntimeException {
public TooManyRequestsException(String message) {
super(message);
}
}
public class InternalServerErrorException extends RuntimeException {
public InternalServerErrorException(String message) {
super(message);
}
}
public class BadGatewayException extends RuntimeException {
public BadGatewayException(String message) {
super(message);
}
}
public class ServiceUnavailableException extends RuntimeException {
public ServiceUnavailableException(String message) {
super(message);
}
}
public class GatewayTimeoutException extends RuntimeException {
public GatewayTimeoutException(String message) {
super(message);
}
}
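Once statuses are mapped to exception types, callers usually still have to decide whether a failure is worth retrying. The sketch below shows one way to group status codes for that decision. HttpFailures and its Kind enum are hypothetical, and treating 429, 502, 503, and 504 as retryable is a common convention assumed here, not something the mapper above prescribes.

```java
/** Groups HTTP error statuses so callers can decide whether to retry (illustrative only). */
final class HttpFailures {
    enum Kind { CLIENT_ERROR, RATE_LIMITED, SERVER_ERROR, UPSTREAM_UNAVAILABLE, OTHER }

    static Kind classify(int status) {
        if (status == 429) {
            return Kind.RATE_LIMITED;
        }
        if (status == 502 || status == 503 || status == 504) {
            return Kind.UPSTREAM_UNAVAILABLE;
        }
        if (status >= 400 && status < 500) {
            return Kind.CLIENT_ERROR;   // the request itself is wrong: retrying will not help
        }
        if (status >= 500 && status < 600) {
            return Kind.SERVER_ERROR;   // may or may not be transient
        }
        return Kind.OTHER;
    }

    /** Assumption: only rate limiting and gateway/availability errors are retried. */
    static boolean isRetryable(int status) {
        Kind kind = classify(status);
        return kind == Kind.RATE_LIMITED || kind == Kind.UPSTREAM_UNAVAILABLE;
    }
}
```

A check like this can back the retry configuration, so that a 400 or 404 fails fast instead of consuming the retry budget.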
5.3 Reactive Messaging
5.3.1 Adding Messaging Dependencies
<dependencies>
    <!-- SmallRye Reactive Messaging -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-reactive-messaging</artifactId>
    </dependency>
    <!-- Apache Kafka -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
    </dependency>
    <!-- RabbitMQ -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-reactive-messaging-rabbitmq</artifactId>
    </dependency>
    <!-- AMQP -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
    </dependency>
    <!-- JSON serialization -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-jackson</artifactId>
    </dependency>
</dependencies>
5.3.2 Configuring the Messaging System
# Kafka configuration
kafka.bootstrap.servers=localhost:9092
# User events channel
mp.messaging.outgoing.user-events.connector=smallrye-kafka
mp.messaging.outgoing.user-events.topic=user-events
mp.messaging.outgoing.user-events.key.serializer=org.apache.kafka.common.serialization.StringSerializer
# The event classes use Jackson annotations, so the Jackson-based serializer is used
mp.messaging.outgoing.user-events.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
# Order events channel
mp.messaging.outgoing.order-events.connector=smallrye-kafka
mp.messaging.outgoing.order-events.topic=order-events
mp.messaging.outgoing.order-events.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.order-events.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
# Payment events channel
# PaymentEventDeserializer is a hypothetical subclass of ObjectMapperDeserializer<PaymentEvent>;
# the generic JSON deserializers need the target type and cannot be referenced directly
mp.messaging.incoming.payment-events.connector=smallrye-kafka
mp.messaging.incoming.payment-events.topic=payment-events
mp.messaging.incoming.payment-events.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.payment-events.value.deserializer=com.example.messaging.PaymentEventDeserializer
mp.messaging.incoming.payment-events.group.id=order-service-group
# Notification events channel
# NotificationEventDeserializer is likewise a hypothetical ObjectMapperDeserializer<NotificationEvent> subclass
mp.messaging.incoming.notification-events.connector=smallrye-kafka
mp.messaging.incoming.notification-events.topic=notification-events
mp.messaging.incoming.notification-events.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.notification-events.value.deserializer=com.example.messaging.NotificationEventDeserializer
mp.messaging.incoming.notification-events.group.id=notification-service-group
# RabbitMQ configuration
rabbitmq-host=localhost
rabbitmq-port=5672
rabbitmq-username=guest
rabbitmq-password=guest
# Email notification queue
mp.messaging.outgoing.email-notifications.connector=smallrye-rabbitmq
mp.messaging.outgoing.email-notifications.exchange.name=notifications
mp.messaging.outgoing.email-notifications.default-routing-key=email
mp.messaging.outgoing.email-notifications.exchange.type=topic
# SMS notification queue
mp.messaging.outgoing.sms-notifications.connector=smallrye-rabbitmq
mp.messaging.outgoing.sms-notifications.exchange.name=notifications
mp.messaging.outgoing.sms-notifications.default-routing-key=sms
mp.messaging.outgoing.sms-notifications.exchange.type=topic
# Dead-letter channel
mp.messaging.incoming.failed-messages.connector=smallrye-kafka
mp.messaging.incoming.failed-messages.topic=failed-messages
mp.messaging.incoming.failed-messages.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.failed-messages.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.failed-messages.group.id=error-handler-group
# Failure handling and retry configuration
# "ignore" logs a negatively acknowledged record and continues with the next one
mp.messaging.incoming.payment-events.failure-strategy=ignore
# The Kafka connector retries failed broker operations: retry-attempts caps the attempts,
# retry-max-wait is the maximum delay between attempts, in seconds
mp.messaging.incoming.notification-events.retry=true
mp.messaging.incoming.notification-events.retry-attempts=3
mp.messaging.incoming.notification-events.retry-max-wait=5
5.3.3 Defining the Event Model
package com.example.event;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.LocalDateTime;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
// Base event class
public abstract class BaseEvent {
@JsonProperty("event_id")
public String eventId;
@JsonProperty("event_type")
public String eventType;
@JsonProperty("timestamp")
public LocalDateTime timestamp;
@JsonProperty("source_service")
public String sourceService;
@JsonProperty("correlation_id")
public String correlationId;
public BaseEvent() {
this.eventId = java.util.UUID.randomUUID().toString();
this.timestamp = LocalDateTime.now();
}
}
// User event
public class UserEvent extends BaseEvent {
@JsonProperty("user_id")
public Long userId;
@JsonProperty("user_name")
public String userName;
@JsonProperty("user_email")
public String userEmail;
@JsonProperty("action")
public UserAction action;
@JsonProperty("previous_data")
public Object previousData;
@JsonProperty("current_data")
public Object currentData;
public UserEvent() {
super();
this.eventType = "USER_EVENT";
}
}
public enum UserAction {
CREATED,
UPDATED,
DELETED,
ACTIVATED,
DEACTIVATED,
PASSWORD_CHANGED,
EMAIL_VERIFIED
}
// Order event
public class OrderEvent extends BaseEvent {
@JsonProperty("order_id")
public Long orderId;
@JsonProperty("user_id")
public Long userId;
@JsonProperty("order_status")
public OrderStatus orderStatus;
@JsonProperty("total_amount")
public BigDecimal totalAmount;
@JsonProperty("action")
public OrderAction action;
@JsonProperty("items")
public List<OrderItemData> items;
public OrderEvent() {
super();
this.eventType = "ORDER_EVENT";
}
}
public enum OrderAction {
CREATED,
UPDATED,
CANCELLED,
CONFIRMED,
SHIPPED,
DELIVERED,
RETURNED
}
public class OrderItemData {
@JsonProperty("product_id")
public Long productId;
@JsonProperty("product_name")
public String productName;
@JsonProperty("quantity")
public Integer quantity;
@JsonProperty("unit_price")
public BigDecimal unitPrice;
@JsonProperty("total_price")
public BigDecimal totalPrice;
}
// Payment event
public class PaymentEvent extends BaseEvent {
@JsonProperty("payment_id")
public String paymentId;
@JsonProperty("order_id")
public Long orderId;
@JsonProperty("user_id")
public Long userId;
@JsonProperty("amount")
public BigDecimal amount;
@JsonProperty("currency")
public String currency;
@JsonProperty("payment_method")
public String paymentMethod;
@JsonProperty("status")
public PaymentStatus status;
@JsonProperty("action")
public PaymentAction action;
@JsonProperty("transaction_id")
public String transactionId;
@JsonProperty("failure_reason")
public String failureReason;
public PaymentEvent() {
super();
this.eventType = "PAYMENT_EVENT";
}
}
public enum PaymentAction {
INITIATED,
PROCESSING,
COMPLETED,
FAILED,
CANCELLED,
REFUNDED,
PARTIALLY_REFUNDED
}
// Notification event
public class NotificationEvent extends BaseEvent {
@JsonProperty("recipient_id")
public Long recipientId;
@JsonProperty("recipient_email")
public String recipientEmail;
@JsonProperty("recipient_phone")
public String recipientPhone;
@JsonProperty("notification_type")
public NotificationType notificationType;
@JsonProperty("channel")
public NotificationChannel channel;
@JsonProperty("subject")
public String subject;
@JsonProperty("content")
public String content;
@JsonProperty("template_id")
public String templateId;
@JsonProperty("template_data")
public Map<String, Object> templateData;
@JsonProperty("priority")
public NotificationPriority priority;
@JsonProperty("scheduled_time")
public LocalDateTime scheduledTime;
public NotificationEvent() {
super();
this.eventType = "NOTIFICATION_EVENT";
}
}
public enum NotificationType {
WELCOME,
ORDER_CONFIRMATION,
ORDER_SHIPPED,
ORDER_DELIVERED,
PAYMENT_SUCCESS,
PAYMENT_FAILED,
PASSWORD_RESET,
ACCOUNT_VERIFICATION,
PROMOTIONAL,
SYSTEM_ALERT
}
public enum NotificationChannel {
EMAIL,
SMS,
PUSH,
IN_APP,
WEBHOOK
}
public enum NotificationPriority {
LOW,
NORMAL,
HIGH,
URGENT
}
5.3.4 Event Publisher
package com.example.messaging;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import com.example.event.*;
import java.time.Duration;
import java.util.List;

@ApplicationScoped
public class EventPublisher {
    // MutinyEmitter is used because Emitter.send(Message) returns void,
    // whereas MutinyEmitter.sendMessage(Message) returns a Uni<Void>.
    @Inject
    @Channel("user-events")
    MutinyEmitter<UserEvent> userEventEmitter;
    @Inject
    @Channel("order-events")
    MutinyEmitter<OrderEvent> orderEventEmitter;
    @Inject
    @Channel("email-notifications")
    MutinyEmitter<NotificationEvent> emailNotificationEmitter;
    @Inject
    @Channel("sms-notifications")
    MutinyEmitter<NotificationEvent> smsNotificationEmitter;

    // Publish a user event
    public Uni<Void> publishUserEvent(UserEvent event) {
        event.sourceService = "user-service";
        OutgoingKafkaRecordMetadata<String> metadata = OutgoingKafkaRecordMetadata.<String>builder()
            .withKey(event.userId.toString())
            // Partition by user ID; assumes the topic has 3 partitions.
            // floorMod keeps the result non-negative even for negative IDs.
            .withPartition(Math.floorMod(event.userId, 3))
            .build();
        Message<UserEvent> message = Message.of(event)
            .addMetadata(metadata);
        return userEventEmitter.sendMessage(message);
    }

    // Publish an order event
    public Uni<Void> publishOrderEvent(OrderEvent event) {
        event.sourceService = "order-service";
        OutgoingKafkaRecordMetadata<String> metadata = OutgoingKafkaRecordMetadata.<String>builder()
            .withKey(event.orderId.toString())
            // Partition by user ID; assumes the topic has 5 partitions
            .withPartition(Math.floorMod(event.userId, 5))
            .build();
        Message<OrderEvent> message = Message.of(event)
            .addMetadata(metadata);
        return orderEventEmitter.sendMessage(message);
    }

    // Publish an email notification event
    public Uni<Void> publishEmailNotification(NotificationEvent event) {
        event.sourceService = "notification-service";
        event.channel = NotificationChannel.EMAIL;
        return emailNotificationEmitter.sendMessage(Message.of(event));
    }

    // Publish an SMS notification event
    public Uni<Void> publishSmsNotification(NotificationEvent event) {
        event.sourceService = "notification-service";
        event.channel = NotificationChannel.SMS;
        return smsNotificationEmitter.sendMessage(Message.of(event));
    }

    // Publish a batch of events
    public Uni<Void> publishUserEvents(List<UserEvent> events) {
        if (events.isEmpty()) {
            return Uni.createFrom().voidItem();
        }
        List<Uni<Void>> publishUnis = events.stream()
            .map(this::publishUserEvent)
            .toList();
        return Uni.combine().all().unis(publishUnis)
            .discardItems();
    }

    // Conditional publishing
    public Uni<Void> publishOrderEventIfNeeded(OrderEvent event, boolean condition) {
        if (condition) {
            return publishOrderEvent(event);
        } else {
            return Uni.createFrom().voidItem();
        }
    }

    // Delayed publishing
    public Uni<Void> publishDelayedNotification(NotificationEvent event, Duration delay) {
        return Uni.createFrom().voidItem()
            .onItem().delayIt().by(delay)
            .onItem().transformToUni(ignored -> publishEmailNotification(event));
    }
}
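The publisher pins each record to a partition derived from the user ID, so that all events of one user stay in order on one partition. The minimal sketch below illustrates that mapping and why a floor modulus is preferable to the % operator. PartitionSketch and partitionFor are hypothetical names, and this is a different mechanism from Kafka's default partitioner, which hashes the serialized key instead.

```java
/** Maps a numeric key to a partition index in [0, partitionCount) (illustrative only). */
final class PartitionSketch {
    static int partitionFor(long key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // Unlike key % partitionCount, floorMod never returns a negative index.
        return (int) Math.floorMod(key, (long) partitionCount);
    }
}
```

The partition count passed in has to match the real topic. If the topic is later repartitioned, keys map to different partitions and per-key ordering across the change is no longer guaranteed.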
5.3.5 Event Consumer
package com.example.messaging;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.jboss.logging.Logger;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import com.example.event.*;
import com.example.service.*;
import java.time.Duration;
@ApplicationScoped
public class EventConsumer {
// JBoss Logging is the logging API bundled with Quarkus
private static final Logger logger = Logger.getLogger(EventConsumer.class);
@Inject
OrderService orderService;
@Inject
NotificationService notificationService;
@Inject
AnalyticsService analyticsService;
@Inject
EventPublisher eventPublisher;
// Handle payment events
@Incoming("payment-events")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public Uni<Void> handlePaymentEvent(Message<PaymentEvent> message) {
PaymentEvent event = message.getPayload();
return processPaymentEvent(event)
.onItem().transformToUni(ignored -> {
// Acknowledge the message manually
return Uni.createFrom().completionStage(message.ack());
})
.onFailure().recoverWithUni(throwable -> {
// Processing failed: negatively acknowledge the message
logger.errorf(throwable, "Failed to process payment event: %s", event.paymentId);
return Uni.createFrom().completionStage(message.nack(throwable));
});
}
// Handle notification events
@Incoming("notification-events")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
public Uni<Void> handleNotificationEvent(Message<NotificationEvent> message) {
NotificationEvent event = message.getPayload();
// Read the Kafka record metadata
IncomingKafkaRecordMetadata metadata = message.getMetadata(IncomingKafkaRecordMetadata.class)
.orElse(null);
if (metadata != null) {
logger.infof("Processing notification from partition: %d, offset: %d",
metadata.getPartition(), metadata.getOffset());
}
return processNotificationEvent(event)
.onFailure().retry().withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(10))
.atMost(3);
}
// Handle failed messages
@Incoming("failed-messages")
public Uni<Void> handleFailedMessage(Message<String> message) {
String failedMessage = message.getPayload();
logger.warnf("Received failed message: %s", failedMessage);
// Record the failed message in a database or raise an alert
return analyticsService.recordFailedMessage(failedMessage)
.onItem().transformToUni(ignored -> {
// Optionally republish to the original topic or a dead-letter queue
return Uni.createFrom().voidItem();
});
}
private Uni<Void> processPaymentEvent(PaymentEvent event) {
return switch (event.action) {
case COMPLETED -> handlePaymentCompleted(event);
case FAILED -> handlePaymentFailed(event);
case REFUNDED -> handlePaymentRefunded(event);
default -> {
logger.infof("Ignoring payment event action: %s", event.action);
yield Uni.createFrom().voidItem();
}
};
}
private Uni<Void> handlePaymentCompleted(PaymentEvent event) {
return orderService.markOrderAsPaid(event.orderId)
.onItem().transformToUni(order -> {
// Publish the order-confirmed event
OrderEvent orderEvent = new OrderEvent();
orderEvent.orderId = order.id;
orderEvent.userId = order.userId;
orderEvent.orderStatus = OrderStatus.CONFIRMED;
orderEvent.totalAmount = order.totalAmount;
orderEvent.action = OrderAction.CONFIRMED;
orderEvent.correlationId = event.correlationId;
return eventPublisher.publishOrderEvent(orderEvent);
})
.onItem().transformToUni(ignored -> {
// Send the confirmation notification
NotificationEvent notification = new NotificationEvent();
notification.recipientId = event.userId;
notification.notificationType = NotificationType.ORDER_CONFIRMATION;
notification.priority = NotificationPriority.HIGH;
notification.correlationId = event.correlationId;
return eventPublisher.publishEmailNotification(notification);
});
}
private Uni<Void> handlePaymentFailed(PaymentEvent event) {
return orderService.markOrderAsPaymentFailed(event.orderId)
.onItem().transformToUni(order -> {
// Send the payment-failed notification
NotificationEvent notification = new NotificationEvent();
notification.recipientId = event.userId;
notification.notificationType = NotificationType.PAYMENT_FAILED;
notification.priority = NotificationPriority.HIGH;
notification.correlationId = event.correlationId;
return eventPublisher.publishEmailNotification(notification);
});
}
private Uni<Void> handlePaymentRefunded(PaymentEvent event) {
return orderService.processRefund(event.orderId, event.amount)
.onItem().transformToUni(refund -> {
// Send the refund notification
NotificationEvent notification = new NotificationEvent();
notification.recipientId = event.userId;
notification.notificationType = NotificationType.PAYMENT_SUCCESS;
notification.subject = "Refund Processed";
notification.priority = NotificationPriority.NORMAL;
notification.correlationId = event.correlationId;
return eventPublisher.publishEmailNotification(notification);
});
}
private Uni<Void> processNotificationEvent(NotificationEvent event) {
return switch (event.channel) {
case EMAIL -> notificationService.sendEmail(event);
case SMS -> notificationService.sendSms(event);
case PUSH -> notificationService.sendPushNotification(event);
case IN_APP -> notificationService.createInAppNotification(event);
case WEBHOOK -> notificationService.sendWebhook(event);
};
}
}
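Because a broker may redeliver a record after a failure or a rebalance, a consumer like the one above can see the same event more than once. One common safeguard is to deduplicate on the event ID before running side effects. The following is a minimal in-memory sketch of that idea. DeduplicatingHandler is a hypothetical class, the bounded LinkedHashMap is an assumption made for brevity, and a real deployment with several consumer instances would need a shared store instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Remembers recently seen event IDs so redelivered events run their action only once. */
final class DeduplicatingHandler {
    private final Map<String, Boolean> seen;
    private int processed;

    DeduplicatingHandler(int capacity) {
        // Insertion-ordered map that evicts the oldest ID once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true if the action ran, false if the event ID was a duplicate. */
    synchronized boolean handle(String eventId, Runnable action) {
        if (seen.containsKey(eventId)) {
            return false;
        }
        action.run();
        // Mark the ID only after the action succeeded, so a failed attempt can be retried.
        seen.put(eventId, Boolean.TRUE);
        processed++;
        return true;
    }

    synchronized int processedCount() {
        return processed;
    }
}
```

The bounded window is a trade-off: an ID that has been evicted is processed again if it reappears, so the capacity should cover the longest realistic redelivery gap.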
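5.4 Service Discovery and Load Balancing
5.4.1 Consul Integration
Add the Consul dependency. The coordinates below are those of the original Quarkus core extension; in Quarkus 3.x the extension is maintained in Quarkiverse as io.quarkiverse.config:quarkus-config-consul and needs an explicit version:
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-consul-config</artifactId>
</dependency>
Configure Consul:
# Consul configuration source
quarkus.consul-config.enabled=true
quarkus.consul-config.agent.host-port=localhost:8500
quarkus.consul-config.prefix=config
quarkus.consul-config.fail-on-missing-key=false
# Application identity
quarkus.application.name=order-service
quarkus.application.version=1.0.0
# Health checks
# This extension only reads configuration from the Consul key-value store; it does not
# register the service or a health check with Consul, so there are no health-check.* keys
# under quarkus.consul-config. The health endpoints come from quarkus-smallrye-health
# (default path /q/health); registration needs a separate mechanism, such as a
# Consul agent service definition that polls that endpoint.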
5.4.2 Kubernetes Service Discovery
# kubernetes-service-discovery.yaml
apiVersion: v1
kind: Service
metadata:
  name: user-service
  labels:
    app: user-service
spec:
  selector:
    app: user-service
  ports:
    - port: 8080
      targetPort: 8080
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
        - name: user-service
          image: user-service:latest
          ports:
            - containerPort: 8080
          env:
            - name: QUARKUS_PROFILE
              value: "prod"
          # Quarkus serves the SmallRye Health endpoints under /q by default
          livenessProbe:
            httpGet:
              path: /q/health/live
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /q/health/ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
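Configure the clients for Kubernetes service discovery:
# Kubernetes client configuration
quarkus.kubernetes-client.trust-certs=true
quarkus.kubernetes-client.namespace=default
# Service discovery: peers are resolved through the cluster DNS names of their Services
quarkus.rest-client.user-service.url=http://user-service:8080
quarkus.rest-client.order-service.url=http://order-service:8080
quarkus.rest-client.payment-service.url=http://payment-service:8080
# Load balancing: the ClusterIP Service already spreads connections across the ready pods,
# so no client-side provider class is needed here. For client-side load balancing,
# SmallRye Stork (quarkus-smallrye-stork) can resolve stork:// URLs instead.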
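To make the idea of client-side load balancing concrete, here is a minimal round-robin selector in plain Java. It is a conceptual sketch only, not how Kubernetes Services or SmallRye Stork are implemented. RoundRobinSelector and the instance addresses are hypothetical, and a real balancer would also track instance health.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Cycles through a fixed list of service instances (illustrative only). */
final class RoundRobinSelector {
    private final List<String> instances;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinSelector(List<String> instances) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("at least one instance is required");
        }
        this.instances = new ArrayList<>(instances); // defensive copy
    }

    /** Returns the next instance; thread-safe thanks to the atomic counter. */
    String select() {
        // floorMod keeps the index valid even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), instances.size());
        return instances.get(index);
    }
}
```

A caller would build the selector from the addresses returned by service discovery and call select() before each request.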
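5.5 Chapter Summary
5.5.1 Core Concepts Review
This chapter covered microservice communication and integration in Quarkus:
- REST client: calling other services with the Quarkus REST client, including fault tolerance
- Reactive messaging: using SmallRye Reactive Messaging with Kafka, RabbitMQ, and other brokers
- Event-driven architecture: designing event models, and publishing and consuming events
- Service discovery: configuring Consul and Kubernetes service discovery
- Fault tolerance: the retry, timeout, circuit breaker, and bulkhead patterns
5.5.2 Key Technical Points
- Client configuration: set timeouts, retries, and connection pool parameters deliberately
- Fault-tolerant design: use circuit breakers, fallbacks, and bulkheads to improve stability
- Message handling: choose an acknowledgment strategy and error-handling mechanism that fit the workload
- Event design: define clear event models and processing flows
- Monitoring and alerting: put monitoring and alerting in place
5.5.3 Best Practices
Communication design principles:
- Prefer asynchronous communication
- Make operations idempotent
- Design a sensible retry strategy
Message handling strategies:
- Use a dead-letter queue for failed messages
- Implement message deduplication
- Set consumer concurrency appropriately
Service governance strategies:
- Version your services
- Use health checks and service registration
- Shut services down gracefully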
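As an illustration of what a sensible retry strategy can look like, the sketch below computes exponential backoff delays with a cap and optional random jitter. The shape is similar to the withBackOff(1s, 10s) call used in the consumer, but this is a plain-Java model with hypothetical names (BackoffSketch and its methods), not Mutiny's implementation.

```java
import java.util.Random;

/** Exponential backoff with a cap and "full jitter" (illustrative only). */
final class BackoffSketch {
    /** Delay before the given retry attempt (1-based), doubling each time up to maxMillis. */
    static long baseDelayMillis(int attempt, long initialMillis, long maxMillis) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt is 1-based");
        }
        long delay = initialMillis;
        for (int i = 1; i < attempt && delay < maxMillis; i++) {
            delay *= 2; // double on every further attempt
        }
        return Math.min(delay, maxMillis);
    }

    /** Picks a random delay in [0, baseDelay] so that many clients do not retry in lockstep. */
    static long jitteredDelayMillis(int attempt, long initialMillis, long maxMillis, Random random) {
        long base = baseDelayMillis(attempt, initialMillis, maxMillis);
        return (long) (random.nextDouble() * (base + 1));
    }
}
```

With an initial delay of 1000 ms and a cap of 10000 ms, the base delays for attempts 1 to 5 are 1000, 2000, 4000, 8000, and 10000 ms.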
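5.5.4 Next Chapter Preview
The next chapter covers security, authentication, and authorization, including:
- JWT token authentication
- OAuth2 and OpenID Connect
- Role and permission management
- API security best practices
- Security auditing and monitoring
With the material in this chapter, you have the core techniques of microservice communication needed to build distributed systems.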