5.1 微服务通信概述

5.1.1 微服务通信模式

在微服务架构中,服务间通信是核心问题之一。Quarkus 提供了多种通信方式和集成方案:

graph TB
    A[微服务通信模式] --> B[同步通信]
    A --> C[异步通信]
    A --> D[事件驱动]
    A --> E[流处理]
    
    B --> B1[REST API]
    B --> B2[GraphQL]
    B --> B3[gRPC]
    B --> B4[WebSocket]
    
    C --> C1[消息队列]
    C --> C2[发布订阅]
    C --> C3[请求响应]
    C --> C4[工作队列]
    
    D --> D1[事件总线]
    D --> D2[事件溯源]
    D --> D3[CQRS]
    D --> D4[Saga 模式]
    
    E --> E1[Apache Kafka]
    E --> E2[Apache Pulsar]
    E --> E3[Redis Streams]
    E --> E4[RabbitMQ Streams]

5.1.2 Quarkus 通信扩展

Quarkus 提供了丰富的扩展来支持各种通信方式:

  • REST Client:用于 HTTP/REST 服务调用
  • SmallRye Reactive Messaging:响应式消息处理
  • Apache Kafka:分布式流处理平台
  • RabbitMQ:消息代理
  • gRPC:高性能 RPC 框架
  • WebSocket:实时双向通信

5.2 REST 客户端开发

5.2.1 添加依赖

pom.xml 中添加 REST 客户端依赖:

<dependencies>
    <!-- REST Client -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-client-reactive</artifactId>
    </dependency>
    
    <!-- JSON 支持 -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-rest-client-reactive-jackson</artifactId>
    </dependency>
    
    <!-- 容错支持 -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-fault-tolerance</artifactId>
    </dependency>
    
    <!-- 健康检查 -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-health</artifactId>
    </dependency>
    
    <!-- 指标监控 -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-micrometer</artifactId>
    </dependency>
</dependencies>

5.2.2 定义 REST 客户端接口

package com.example.client;

import jakarta.ws.rs.*;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import org.eclipse.microprofile.faulttolerance.*;
import io.smallrye.mutiny.Uni;

import java.util.List;
import java.util.concurrent.CompletionStage;

// 用户服务客户端
@RegisterRestClient(configKey = "user-service")
@Path("/api/users")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public interface UserServiceClient {
    
    @GET
    @Path("/{id}")
    @Retry(maxRetries = 3, delay = 1000)
    @Timeout(value = 5000)
    @Fallback(fallbackMethod = "getUserFallback")
    Uni<User> getUser(@PathParam("id") Long id);
    
    @GET
    @CircuitBreaker(requestVolumeThreshold = 4, failureRatio = 0.5, delay = 10000)
    @Retry(maxRetries = 2)
    @Timeout(value = 3000)
    Uni<List<User>> getAllUsers(@QueryParam("page") int page, 
                               @QueryParam("size") int size);
    
    @POST
    @Retry(maxRetries = 2)
    @Timeout(value = 10000)
    Uni<User> createUser(User user);
    
    @PUT
    @Path("/{id}")
    @Retry(maxRetries = 2)
    @Timeout(value = 8000)
    Uni<User> updateUser(@PathParam("id") Long id, User user);
    
    @DELETE
    @Path("/{id}")
    @Retry(maxRetries = 1)
    @Timeout(value = 5000)
    Uni<Void> deleteUser(@PathParam("id") Long id);
    
    @GET
    @Path("/search")
    @Bulkhead(value = 5, waitingTaskQueue = 10)
    @Timeout(value = 15000)
    Uni<List<User>> searchUsers(@QueryParam("keyword") String keyword,
                               @QueryParam("department") String department);
    
    // 降级方法
    default Uni<User> getUserFallback(Long id) {
        User fallbackUser = new User();
        fallbackUser.id = id;
        fallbackUser.name = "Unknown User";
        fallbackUser.email = "unknown@example.com";
        fallbackUser.status = UserStatus.INACTIVE;
        return Uni.createFrom().item(fallbackUser);
    }
}

// 订单服务客户端
@RegisterRestClient(configKey = "order-service")
@Path("/api/orders")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public interface OrderServiceClient {
    
    @GET
    @Path("/user/{userId}")
    @Retry(maxRetries = 3)
    @Timeout(value = 5000)
    @CircuitBreaker(requestVolumeThreshold = 5, failureRatio = 0.6)
    Uni<List<Order>> getUserOrders(@PathParam("userId") Long userId);
    
    @POST
    @Retry(maxRetries = 2)
    @Timeout(value = 15000)
    @Bulkhead(value = 3)
    Uni<Order> createOrder(CreateOrderRequest request);
    
    @GET
    @Path("/{id}/status")
    @Retry(maxRetries = 2)
    @Timeout(value = 3000)
    Uni<OrderStatus> getOrderStatus(@PathParam("id") Long orderId);
    
    @PUT
    @Path("/{id}/cancel")
    @Retry(maxRetries = 1)
    @Timeout(value = 8000)
    Uni<Void> cancelOrder(@PathParam("id") Long orderId);
}

// 支付服务客户端
@RegisterRestClient(configKey = "payment-service")
@Path("/api/payments")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public interface PaymentServiceClient {
    
    @POST
    @Path("/process")
    @Retry(maxRetries = 2, delay = 2000)
    @Timeout(value = 30000)
    @CircuitBreaker(requestVolumeThreshold = 3, failureRatio = 0.7, delay = 15000)
    Uni<PaymentResult> processPayment(PaymentRequest request);
    
    @GET
    @Path("/{id}")
    @Retry(maxRetries = 3)
    @Timeout(value = 5000)
    Uni<Payment> getPayment(@PathParam("id") String paymentId);
    
    @POST
    @Path("/{id}/refund")
    @Retry(maxRetries = 1)
    @Timeout(value = 20000)
    Uni<RefundResult> refundPayment(@PathParam("id") String paymentId, 
                                   RefundRequest request);
}

5.2.3 配置 REST 客户端

application.properties 中配置客户端:

# 用户服务配置
quarkus.rest-client.user-service.url=http://localhost:8081
quarkus.rest-client.user-service.scope=jakarta.inject.Singleton
quarkus.rest-client.user-service.connect-timeout=5000
quarkus.rest-client.user-service.read-timeout=10000
quarkus.rest-client.user-service.max-redirects=3
quarkus.rest-client.user-service.follow-redirects=true
quarkus.rest-client.user-service.providers=com.example.client.UserServiceClientExceptionMapper

# 订单服务配置
quarkus.rest-client.order-service.url=http://localhost:8082
quarkus.rest-client.order-service.scope=jakarta.inject.Singleton
quarkus.rest-client.order-service.connect-timeout=3000
quarkus.rest-client.order-service.read-timeout=15000

# 支付服务配置
quarkus.rest-client.payment-service.url=http://localhost:8083
quarkus.rest-client.payment-service.scope=jakarta.inject.Singleton
quarkus.rest-client.payment-service.connect-timeout=5000
quarkus.rest-client.payment-service.read-timeout=30000

# 开发环境配置
%dev.quarkus.rest-client.user-service.url=http://localhost:8081
%dev.quarkus.rest-client.order-service.url=http://localhost:8082
%dev.quarkus.rest-client.payment-service.url=http://localhost:8083

# 生产环境配置
%prod.quarkus.rest-client.user-service.url=https://user-service.example.com
%prod.quarkus.rest-client.order-service.url=https://order-service.example.com
%prod.quarkus.rest-client.payment-service.url=https://payment-service.example.com

# SSL 配置
quarkus.rest-client.payment-service.trust-store=/path/to/truststore.jks
quarkus.rest-client.payment-service.trust-store-password=changeit
quarkus.rest-client.payment-service.trust-store-type=JKS

# 代理配置
quarkus.rest-client.user-service.proxy-address=proxy.example.com:8080
quarkus.rest-client.user-service.proxy-username=proxyuser
quarkus.rest-client.user-service.proxy-password=proxypass

# 日志配置
quarkus.log.category."org.eclipse.microprofile.rest.client".level=DEBUG
quarkus.rest-client.logging.scope=request-response
quarkus.rest-client.logging.body-limit=1024

5.2.4 使用 REST 客户端

package com.example.service;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.Multi;
import com.example.client.*;
import com.example.dto.*;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class BusinessService {
    
    @Inject
    @RestClient
    UserServiceClient userServiceClient;
    
    @Inject
    @RestClient
    OrderServiceClient orderServiceClient;
    
    @Inject
    @RestClient
    PaymentServiceClient paymentServiceClient;
    
    // 获取用户详细信息(包含订单)
    public Uni<UserWithOrders> getUserWithOrders(Long userId) {
        return userServiceClient.getUser(userId)
            .onItem().transformToUni(user -> 
                orderServiceClient.getUserOrders(userId)
                    .onItem().transform(orders -> {
                        UserWithOrders result = new UserWithOrders();
                        result.user = user;
                        result.orders = orders;
                        return result;
                    })
            )
            .onFailure().recoverWithItem(throwable -> {
                // 降级处理
                UserWithOrders fallback = new UserWithOrders();
                fallback.user = createFallbackUser(userId);
                fallback.orders = List.of();
                return fallback;
            });
    }
    
    // 并行获取多个用户信息
    public Uni<List<User>> getUsersInParallel(List<Long> userIds) {
        List<Uni<User>> userUnis = userIds.stream()
            .map(userServiceClient::getUser)
            .toList();
        
        return Uni.combine().all().unis(userUnis)
            .combinedWith(users -> (List<User>) users);
    }
    
    // 创建订单并处理支付
    public Uni<OrderResult> createOrderWithPayment(CreateOrderRequest orderRequest, 
                                                   PaymentRequest paymentRequest) {
        return orderServiceClient.createOrder(orderRequest)
            .onItem().transformToUni(order -> 
                paymentServiceClient.processPayment(paymentRequest)
                    .onItem().transform(paymentResult -> {
                        OrderResult result = new OrderResult();
                        result.order = order;
                        result.paymentResult = paymentResult;
                        result.success = paymentResult.status == PaymentStatus.SUCCESS;
                        return result;
                    })
                    .onFailure().recoverWithUni(throwable -> {
                        // 支付失败,取消订单
                        return orderServiceClient.cancelOrder(order.id)
                            .onItem().transform(ignored -> {
                                OrderResult result = new OrderResult();
                                result.order = order;
                                result.success = false;
                                result.errorMessage = "Payment failed: " + throwable.getMessage();
                                return result;
                            });
                    })
            );
    }
    
    // 批量处理订单状态
    public Uni<List<OrderStatusInfo>> getOrderStatuses(List<Long> orderIds) {
        return Multi.createFrom().iterable(orderIds)
            .onItem().transformToUniAndMerge(orderId -> 
                orderServiceClient.getOrderStatus(orderId)
                    .onItem().transform(status -> {
                        OrderStatusInfo info = new OrderStatusInfo();
                        info.orderId = orderId;
                        info.status = status;
                        return info;
                    })
                    .onFailure().recoverWithItem(throwable -> {
                        OrderStatusInfo info = new OrderStatusInfo();
                        info.orderId = orderId;
                        info.status = OrderStatus.UNKNOWN;
                        info.error = throwable.getMessage();
                        return info;
                    })
            )
            .collect().asList();
    }
    
    // 搜索用户并获取其订单统计
    public Uni<List<UserOrderSummary>> searchUsersWithOrderSummary(String keyword, 
                                                                   String department) {
        return userServiceClient.searchUsers(keyword, department)
            .onItem().transformToUni(users -> {
                List<Uni<UserOrderSummary>> summaryUnis = users.stream()
                    .map(user -> 
                        orderServiceClient.getUserOrders(user.id)
                            .onItem().transform(orders -> {
                                UserOrderSummary summary = new UserOrderSummary();
                                summary.user = user;
                                summary.totalOrders = orders.size();
                                summary.totalAmount = orders.stream()
                                    .map(order -> order.totalAmount)
                                    .reduce(BigDecimal.ZERO, BigDecimal::add);
                                return summary;
                            })
                            .onFailure().recoverWithItem(throwable -> {
                                UserOrderSummary summary = new UserOrderSummary();
                                summary.user = user;
                                summary.totalOrders = 0;
                                summary.totalAmount = BigDecimal.ZERO;
                                return summary;
                            })
                    )
                    .toList();
                
                return Uni.combine().all().unis(summaryUnis)
                    .combinedWith(summaries -> (List<UserOrderSummary>) summaries);
            });
    }
    
    // 超时和重试示例
    public Uni<String> complexOperation(Long userId) {
        return userServiceClient.getUser(userId)
            .onItem().transformToUni(user -> 
                orderServiceClient.getUserOrders(userId)
                    .onItem().transformToUni(orders -> {
                        if (!orders.isEmpty()) {
                            Order latestOrder = orders.get(0);
                            return paymentServiceClient.getPayment(latestOrder.paymentId)
                                .onItem().transform(payment -> 
                                    String.format("User %s has %d orders, latest payment: %s",
                                                 user.name, orders.size(), payment.status)
                                );
                        } else {
                            return Uni.createFrom().item(
                                String.format("User %s has no orders", user.name)
                            );
                        }
                    })
            )
            .ifNoItem().after(Duration.ofSeconds(30))
            .fail()
            .onFailure().retry().atMost(2)
            .onFailure().recoverWithItem("Operation failed after retries");
    }
    
    private User createFallbackUser(Long userId) {
        User user = new User();
        user.id = userId;
        user.name = "Unknown User";
        user.email = "unknown@example.com";
        user.status = UserStatus.INACTIVE;
        return user;
    }
}

5.2.5 异常处理和映射

package com.example.client;

import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.Provider;
import org.eclipse.microprofile.rest.client.ext.ResponseExceptionMapper;
import com.example.exception.*;

@Provider
public class UserServiceClientExceptionMapper implements ResponseExceptionMapper<RuntimeException> {
    
    @Override
    public RuntimeException toThrowable(Response response) {
        int status = response.getStatus();
        String message = response.readEntity(String.class);
        
        return switch (status) {
            case 400 -> new BadRequestException("Bad request: " + message);
            case 401 -> new UnauthorizedException("Unauthorized: " + message);
            case 403 -> new ForbiddenException("Forbidden: " + message);
            case 404 -> new NotFoundException("User not found: " + message);
            case 409 -> new ConflictException("Conflict: " + message);
            case 429 -> new TooManyRequestsException("Rate limit exceeded: " + message);
            case 500 -> new InternalServerErrorException("Internal server error: " + message);
            case 502 -> new BadGatewayException("Bad gateway: " + message);
            case 503 -> new ServiceUnavailableException("Service unavailable: " + message);
            case 504 -> new GatewayTimeoutException("Gateway timeout: " + message);
            default -> new RuntimeException("HTTP " + status + ": " + message);
        };
    }
}

// 自定义异常类
public class BadRequestException extends RuntimeException {
    public BadRequestException(String message) {
        super(message);
    }
}

public class UnauthorizedException extends RuntimeException {
    public UnauthorizedException(String message) {
        super(message);
    }
}

public class NotFoundException extends RuntimeException {
    public NotFoundException(String message) {
        super(message);
    }
}

public class ConflictException extends RuntimeException {
    public ConflictException(String message) {
        super(message);
    }
}

public class TooManyRequestsException extends RuntimeException {
    public TooManyRequestsException(String message) {
        super(message);
    }
}

public class InternalServerErrorException extends RuntimeException {
    public InternalServerErrorException(String message) {
        super(message);
    }
}

public class BadGatewayException extends RuntimeException {
    public BadGatewayException(String message) {
        super(message);
    }
}

public class ServiceUnavailableException extends RuntimeException {
    public ServiceUnavailableException(String message) {
        super(message);
    }
}

public class GatewayTimeoutException extends RuntimeException {
    public GatewayTimeoutException(String message) {
        super(message);
    }
}

5.3 响应式消息处理

5.3.1 添加消息处理依赖

<dependencies>
    <!-- SmallRye Reactive Messaging -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-reactive-messaging</artifactId>
    </dependency>
    
    <!-- Apache Kafka -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
    </dependency>
    
    <!-- RabbitMQ -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-reactive-messaging-rabbitmq</artifactId>
    </dependency>
    
    <!-- AMQP -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
    </dependency>
    
    <!-- JSON 序列化 -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-jackson</artifactId>
    </dependency>
</dependencies>

5.3.2 配置消息系统

# Kafka 配置
kafka.bootstrap.servers=localhost:9092

# 用户事件通道配置
mp.messaging.outgoing.user-events.connector=smallrye-kafka
mp.messaging.outgoing.user-events.topic=user-events
mp.messaging.outgoing.user-events.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.user-events.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

# 订单事件通道配置
mp.messaging.outgoing.order-events.connector=smallrye-kafka
mp.messaging.outgoing.order-events.topic=order-events
mp.messaging.outgoing.order-events.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.order-events.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

# 支付事件通道配置
mp.messaging.incoming.payment-events.connector=smallrye-kafka
mp.messaging.incoming.payment-events.topic=payment-events
mp.messaging.incoming.payment-events.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.payment-events.value.deserializer=io.quarkus.kafka.client.serialization.JsonbDeserializer
mp.messaging.incoming.payment-events.group.id=order-service-group

# 通知事件通道配置
mp.messaging.incoming.notification-events.connector=smallrye-kafka
mp.messaging.incoming.notification-events.topic=notification-events
mp.messaging.incoming.notification-events.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.notification-events.value.deserializer=io.quarkus.kafka.client.serialization.JsonbDeserializer
mp.messaging.incoming.notification-events.group.id=notification-service-group

# RabbitMQ 配置
rabbitmq-host=localhost
rabbitmq-port=5672
rabbitmq-username=guest
rabbitmq-password=guest

# 邮件通知队列配置
mp.messaging.outgoing.email-notifications.connector=smallrye-rabbitmq
mp.messaging.outgoing.email-notifications.exchange.name=notifications
mp.messaging.outgoing.email-notifications.routing-key=email
mp.messaging.outgoing.email-notifications.exchange.type=topic

# SMS 通知队列配置
mp.messaging.outgoing.sms-notifications.connector=smallrye-rabbitmq
mp.messaging.outgoing.sms-notifications.exchange.name=notifications
mp.messaging.outgoing.sms-notifications.routing-key=sms
mp.messaging.outgoing.sms-notifications.exchange.type=topic

# 死信队列配置
mp.messaging.incoming.failed-messages.connector=smallrye-kafka
mp.messaging.incoming.failed-messages.topic=failed-messages
mp.messaging.incoming.failed-messages.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.failed-messages.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.failed-messages.group.id=error-handler-group

# 重试配置
mp.messaging.incoming.payment-events.failure-strategy=ignore
mp.messaging.incoming.notification-events.retry=true
mp.messaging.incoming.notification-events.max-retries=3
mp.messaging.incoming.notification-events.retry-delay=5000

5.3.3 事件模型定义

package com.example.event;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.LocalDateTime;
import java.math.BigDecimal;

// 基础事件类
public abstract class BaseEvent {
    @JsonProperty("event_id")
    public String eventId;
    
    @JsonProperty("event_type")
    public String eventType;
    
    @JsonProperty("timestamp")
    public LocalDateTime timestamp;
    
    @JsonProperty("source_service")
    public String sourceService;
    
    @JsonProperty("correlation_id")
    public String correlationId;
    
    public BaseEvent() {
        this.eventId = java.util.UUID.randomUUID().toString();
        this.timestamp = LocalDateTime.now();
    }
}

// 用户事件
public class UserEvent extends BaseEvent {
    @JsonProperty("user_id")
    public Long userId;
    
    @JsonProperty("user_name")
    public String userName;
    
    @JsonProperty("user_email")
    public String userEmail;
    
    @JsonProperty("action")
    public UserAction action;
    
    @JsonProperty("previous_data")
    public Object previousData;
    
    @JsonProperty("current_data")
    public Object currentData;
    
    public UserEvent() {
        super();
        this.eventType = "USER_EVENT";
    }
}

public enum UserAction {
    CREATED,
    UPDATED,
    DELETED,
    ACTIVATED,
    DEACTIVATED,
    PASSWORD_CHANGED,
    EMAIL_VERIFIED
}

// 订单事件
public class OrderEvent extends BaseEvent {
    @JsonProperty("order_id")
    public Long orderId;
    
    @JsonProperty("user_id")
    public Long userId;
    
    @JsonProperty("order_status")
    public OrderStatus orderStatus;
    
    @JsonProperty("total_amount")
    public BigDecimal totalAmount;
    
    @JsonProperty("action")
    public OrderAction action;
    
    @JsonProperty("items")
    public List<OrderItemData> items;
    
    public OrderEvent() {
        super();
        this.eventType = "ORDER_EVENT";
    }
}

public enum OrderAction {
    CREATED,
    UPDATED,
    CANCELLED,
    CONFIRMED,
    SHIPPED,
    DELIVERED,
    RETURNED
}

public class OrderItemData {
    @JsonProperty("product_id")
    public Long productId;
    
    @JsonProperty("product_name")
    public String productName;
    
    @JsonProperty("quantity")
    public Integer quantity;
    
    @JsonProperty("unit_price")
    public BigDecimal unitPrice;
    
    @JsonProperty("total_price")
    public BigDecimal totalPrice;
}

// 支付事件
public class PaymentEvent extends BaseEvent {
    @JsonProperty("payment_id")
    public String paymentId;
    
    @JsonProperty("order_id")
    public Long orderId;
    
    @JsonProperty("user_id")
    public Long userId;
    
    @JsonProperty("amount")
    public BigDecimal amount;
    
    @JsonProperty("currency")
    public String currency;
    
    @JsonProperty("payment_method")
    public String paymentMethod;
    
    @JsonProperty("status")
    public PaymentStatus status;
    
    @JsonProperty("action")
    public PaymentAction action;
    
    @JsonProperty("transaction_id")
    public String transactionId;
    
    @JsonProperty("failure_reason")
    public String failureReason;
    
    public PaymentEvent() {
        super();
        this.eventType = "PAYMENT_EVENT";
    }
}

public enum PaymentAction {
    INITIATED,
    PROCESSING,
    COMPLETED,
    FAILED,
    CANCELLED,
    REFUNDED,
    PARTIALLY_REFUNDED
}

// 通知事件
public class NotificationEvent extends BaseEvent {
    @JsonProperty("recipient_id")
    public Long recipientId;
    
    @JsonProperty("recipient_email")
    public String recipientEmail;
    
    @JsonProperty("recipient_phone")
    public String recipientPhone;
    
    @JsonProperty("notification_type")
    public NotificationType notificationType;
    
    @JsonProperty("channel")
    public NotificationChannel channel;
    
    @JsonProperty("subject")
    public String subject;
    
    @JsonProperty("content")
    public String content;
    
    @JsonProperty("template_id")
    public String templateId;
    
    @JsonProperty("template_data")
    public Map<String, Object> templateData;
    
    @JsonProperty("priority")
    public NotificationPriority priority;
    
    @JsonProperty("scheduled_time")
    public LocalDateTime scheduledTime;
    
    public NotificationEvent() {
        super();
        this.eventType = "NOTIFICATION_EVENT";
    }
}

public enum NotificationType {
    WELCOME,
    ORDER_CONFIRMATION,
    ORDER_SHIPPED,
    ORDER_DELIVERED,
    PAYMENT_SUCCESS,
    PAYMENT_FAILED,
    PASSWORD_RESET,
    ACCOUNT_VERIFICATION,
    PROMOTIONAL,
    SYSTEM_ALERT
}

public enum NotificationChannel {
    EMAIL,
    SMS,
    PUSH,
    IN_APP,
    WEBHOOK
}

public enum NotificationPriority {
    LOW,
    NORMAL,
    HIGH,
    URGENT
}

5.3.4 事件发布者

package com.example.messaging;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import com.example.event.*;

import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class EventPublisher {
    
    @Inject
    @Channel("user-events")
    Emitter<UserEvent> userEventEmitter;
    
    @Inject
    @Channel("order-events")
    Emitter<OrderEvent> orderEventEmitter;
    
    @Inject
    @Channel("email-notifications")
    Emitter<NotificationEvent> emailNotificationEmitter;
    
    @Inject
    @Channel("sms-notifications")
    Emitter<NotificationEvent> smsNotificationEmitter;
    
    // 发布用户事件
    public Uni<Void> publishUserEvent(UserEvent event) {
        event.sourceService = "user-service";
        
        OutgoingKafkaRecordMetadata<String> metadata = OutgoingKafkaRecordMetadata.<String>builder()
            .withKey(event.userId.toString())
            .withPartition(event.userId.intValue() % 3) // 按用户ID分区
            .build();
        
        Message<UserEvent> message = Message.of(event)
            .addMetadata(metadata);
        
        return Uni.createFrom().completionStage(
            userEventEmitter.send(message)
        );
    }
    
    // 发布订单事件
    public Uni<Void> publishOrderEvent(OrderEvent event) {
        event.sourceService = "order-service";
        
        OutgoingKafkaRecordMetadata<String> metadata = OutgoingKafkaRecordMetadata.<String>builder()
            .withKey(event.orderId.toString())
            .withPartition(event.userId.intValue() % 5) // 按用户ID分区
            .build();
        
        Message<OrderEvent> message = Message.of(event)
            .addMetadata(metadata);
        
        return Uni.createFrom().completionStage(
            orderEventEmitter.send(message)
        );
    }
    
    // 发布邮件通知事件
    public Uni<Void> publishEmailNotification(NotificationEvent event) {
        event.sourceService = "notification-service";
        event.channel = NotificationChannel.EMAIL;
        
        Message<NotificationEvent> message = Message.of(event);
        
        return Uni.createFrom().completionStage(
            emailNotificationEmitter.send(message)
        );
    }
    
    // 发布短信通知事件
    public Uni<Void> publishSmsNotification(NotificationEvent event) {
        event.sourceService = "notification-service";
        event.channel = NotificationChannel.SMS;
        
        Message<NotificationEvent> message = Message.of(event);
        
        return Uni.createFrom().completionStage(
            smsNotificationEmitter.send(message)
        );
    }
    
    // 批量发布事件
    public Uni<Void> publishUserEvents(List<UserEvent> events) {
        List<Uni<Void>> publishUnis = events.stream()
            .map(this::publishUserEvent)
            .toList();
        
        return Uni.combine().all().unis(publishUnis)
            .discardItems();
    }
    
    // 条件发布
    public Uni<Void> publishOrderEventIfNeeded(OrderEvent event, boolean condition) {
        if (condition) {
            return publishOrderEvent(event);
        } else {
            return Uni.createFrom().voidItem();
        }
    }
    
    // 延迟发布
    public Uni<Void> publishDelayedNotification(NotificationEvent event, Duration delay) {
        return Uni.createFrom().voidItem()
            .onItem().delayIt().by(delay)
            .onItem().transformToUni(ignored -> publishEmailNotification(event));
    }
}

5.3.5 事件消费者

package com.example.messaging;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import com.example.event.*;
import com.example.service.*;

import java.util.concurrent.CompletionStage;
import java.time.Duration;

@ApplicationScoped
public class EventConsumer {
    
    @Inject
    OrderService orderService;
    
    @Inject
    NotificationService notificationService;
    
    @Inject
    AnalyticsService analyticsService;
    
    @Inject
    EventPublisher eventPublisher;
    
    // 处理支付事件
    @Incoming("payment-events")
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    public Uni<Void> handlePaymentEvent(Message<PaymentEvent> message) {
        PaymentEvent event = message.getPayload();
        
        return processPaymentEvent(event)
            .onItem().transformToUni(ignored -> {
                // 手动确认消息
                return Uni.createFrom().completionStage(message.ack());
            })
            .onFailure().recoverWithUni(throwable -> {
                // 处理失败,拒绝消息
                logger.error("Failed to process payment event: {}", event.paymentId, throwable);
                return Uni.createFrom().completionStage(message.nack(throwable));
            });
    }
    
    // 处理通知事件
    @Incoming("notification-events")
    @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
    public Uni<Void> handleNotificationEvent(Message<NotificationEvent> message) {
        NotificationEvent event = message.getPayload();
        
        // 获取 Kafka 元数据
        IncomingKafkaRecordMetadata metadata = message.getMetadata(IncomingKafkaRecordMetadata.class)
            .orElse(null);
        
        if (metadata != null) {
            logger.info("Processing notification from partition: {}, offset: {}", 
                       metadata.getPartition(), metadata.getOffset());
        }
        
        return processNotificationEvent(event)
            .onFailure().retry().withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(10))
            .atMost(3);
    }
    
    // 处理失败消息
    @Incoming("failed-messages")
    public Uni<Void> handleFailedMessage(Message<String> message) {
        String failedMessage = message.getPayload();
        
        logger.warn("Received failed message: {}", failedMessage);
        
        // 记录失败消息到数据库或发送告警
        return analyticsService.recordFailedMessage(failedMessage)
            .onItem().transformToUni(ignored -> {
                // 可以选择重新发布到原始主题或死信队列
                return Uni.createFrom().voidItem();
            });
    }
    
    private Uni<Void> processPaymentEvent(PaymentEvent event) {
        return switch (event.action) {
            case COMPLETED -> handlePaymentCompleted(event);
            case FAILED -> handlePaymentFailed(event);
            case REFUNDED -> handlePaymentRefunded(event);
            default -> {
                logger.info("Ignoring payment event action: {}", event.action);
                yield Uni.createFrom().voidItem();
            }
        };
    }
    
    private Uni<Void> handlePaymentCompleted(PaymentEvent event) {
        return orderService.markOrderAsPaid(event.orderId)
            .onItem().transformToUni(order -> {
                // 发布订单确认事件
                OrderEvent orderEvent = new OrderEvent();
                orderEvent.orderId = order.id;
                orderEvent.userId = order.userId;
                orderEvent.orderStatus = OrderStatus.CONFIRMED;
                orderEvent.totalAmount = order.totalAmount;
                orderEvent.action = OrderAction.CONFIRMED;
                orderEvent.correlationId = event.correlationId;
                
                return eventPublisher.publishOrderEvent(orderEvent);
            })
            .onItem().transformToUni(ignored -> {
                // 发送确认通知
                NotificationEvent notification = new NotificationEvent();
                notification.recipientId = event.userId;
                notification.notificationType = NotificationType.ORDER_CONFIRMATION;
                notification.priority = NotificationPriority.HIGH;
                notification.correlationId = event.correlationId;
                
                return eventPublisher.publishEmailNotification(notification);
            });
    }
    
    private Uni<Void> handlePaymentFailed(PaymentEvent event) {
        return orderService.markOrderAsPaymentFailed(event.orderId)
            .onItem().transformToUni(order -> {
                // 发送支付失败通知
                NotificationEvent notification = new NotificationEvent();
                notification.recipientId = event.userId;
                notification.notificationType = NotificationType.PAYMENT_FAILED;
                notification.priority = NotificationPriority.HIGH;
                notification.correlationId = event.correlationId;
                
                return eventPublisher.publishEmailNotification(notification);
            });
    }
    
    private Uni<Void> handlePaymentRefunded(PaymentEvent event) {
        return orderService.processRefund(event.orderId, event.amount)
            .onItem().transformToUni(refund -> {
                // 发送退款通知
                NotificationEvent notification = new NotificationEvent();
                notification.recipientId = event.userId;
                notification.notificationType = NotificationType.PAYMENT_SUCCESS;
                notification.subject = "Refund Processed";
                notification.priority = NotificationPriority.NORMAL;
                notification.correlationId = event.correlationId;
                
                return eventPublisher.publishEmailNotification(notification);
            });
    }
    
    private Uni<Void> processNotificationEvent(NotificationEvent event) {
        return switch (event.channel) {
            case EMAIL -> notificationService.sendEmail(event);
            case SMS -> notificationService.sendSms(event);
            case PUSH -> notificationService.sendPushNotification(event);
            case IN_APP -> notificationService.createInAppNotification(event);
            case WEBHOOK -> notificationService.sendWebhook(event);
        };
    }
}

5.4 服务发现与负载均衡

5.4.1 Consul 集成

添加 Consul 依赖:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-consul-config</artifactId>
</dependency>

配置 Consul:

# Consul 配置
quarkus.consul-config.enabled=true
quarkus.consul-config.agent.host-port=localhost:8500
quarkus.consul-config.prefix=config
quarkus.consul-config.fail-on-missing-key=false

# 服务注册
quarkus.application.name=order-service
quarkus.application.version=1.0.0

# 健康检查
quarkus.consul-config.health-check.enabled=true
quarkus.consul-config.health-check.path=/health
quarkus.consul-config.health-check.interval=10s

5.4.2 Kubernetes 服务发现

# kubernetes-service-discovery.yaml
apiVersion: v1
kind: Service
metadata:
  name: user-service
  labels:
    app: user-service
spec:
  selector:
    app: user-service
  ports:
    - port: 8080
      targetPort: 8080
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
      - name: user-service
        image: user-service:latest
        ports:
        - containerPort: 8080
        env:
        - name: QUARKUS_PROFILE
          value: "prod"
        livenessProbe:
          httpGet:
            path: /health/live
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health/ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5

配置 Kubernetes 服务发现:

# Kubernetes 配置
quarkus.kubernetes-client.trust-certs=true
quarkus.kubernetes-client.namespace=default

# 服务发现配置
quarkus.rest-client.user-service.url=http://user-service:8080
quarkus.rest-client.order-service.url=http://order-service:8080
quarkus.rest-client.payment-service.url=http://payment-service:8080

# 负载均衡配置
quarkus.rest-client.user-service.providers=io.quarkus.restclient.runtime.RestClientBuilderImpl$LoadBalancerProvider

5.5 本章小结

5.5.1 核心概念回顾

本章深入探讨了 Quarkus 中的微服务通信与集成技术,主要包括:

  1. REST 客户端:学习了如何使用 Quarkus REST 客户端进行服务间通信,包括容错机制

  2. 响应式消息处理:掌握了 SmallRye Reactive Messaging 的使用,支持 Kafka、RabbitMQ 等

  3. 事件驱动架构:了解了事件模型设计、事件发布和消费的最佳实践

  4. 服务发现:学习了 Consul 和 Kubernetes 服务发现的配置和使用

  5. 容错机制:掌握了重试、超时、断路器、舱壁等容错模式

5.5.2 技术要点总结

  • 客户端配置:合理配置超时、重试和连接池参数
  • 容错设计:使用断路器、降级和舱壁模式提高系统稳定性
  • 消息处理:选择合适的消息确认策略和错误处理机制
  • 事件设计:设计清晰的事件模型和处理流程
  • 监控告警:实施完善的监控和告警机制

5.5.3 最佳实践

  1. 通信设计原则

    • 优先使用异步通信
    • 实现幂等性操作
    • 设计合理的重试策略
  2. 消息处理策略

    • 使用死信队列处理失败消息
    • 实现消息去重机制
    • 合理设置消费者并发度
  3. 服务治理策略

    • 实施服务版本管理
    • 使用健康检查和服务注册
    • 实现优雅的服务下线

5.5.4 下一章预告

下一章我们将学习安全认证与授权,包括: - JWT 令牌认证 - OAuth2 和 OpenID Connect - 角色和权限管理 - API 安全最佳实践 - 安全审计和监控

通过本章的学习,你已经掌握了微服务通信的核心技术,为构建分布式系统奠定了坚实的基础。