本章将通过完整的实战项目,展示如何在真实场景中应用RabbitMQ的各种特性和最佳实践。我们将构建一个电商订单处理系统,涵盖订单创建、支付处理、库存管理、物流配送等完整业务流程。

10.1 项目架构设计

系统架构概览

from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Callable
from datetime import datetime, timedelta
import json
import uuid
import logging
import asyncio
import time
from abc import ABC, abstractmethod

# 业务状态枚举
class OrderStatus(Enum):
    """订单状态"""
    PENDING = "pending"           # 待处理
    CONFIRMED = "confirmed"       # 已确认
    PAID = "paid"                # 已支付
    PROCESSING = "processing"     # 处理中
    SHIPPED = "shipped"          # 已发货
    DELIVERED = "delivered"      # 已送达
    CANCELLED = "cancelled"      # 已取消
    REFUNDED = "refunded"        # 已退款

class PaymentStatus(Enum):
    """支付状态"""
    PENDING = "pending"          # 待支付
    PROCESSING = "processing"    # 支付中
    SUCCESS = "success"          # 支付成功
    FAILED = "failed"            # 支付失败
    REFUNDED = "refunded"        # 已退款

class InventoryOperation(Enum):
    """库存操作类型"""
    RESERVE = "reserve"          # 预留
    CONFIRM = "confirm"          # 确认
    RELEASE = "release"          # 释放
    DEDUCT = "deduct"            # 扣减

class NotificationType(Enum):
    """通知类型"""
    EMAIL = "email"
    SMS = "sms"
    PUSH = "push"
    WEBHOOK = "webhook"

# 业务数据模型
@dataclass
class Product:
    """商品信息"""
    product_id: str
    name: str
    price: float
    category: str
    description: str = ""
    attributes: Dict[str, Any] = field(default_factory=dict)

@dataclass
class OrderItem:
    """订单项"""
    product_id: str
    product_name: str
    quantity: int
    unit_price: float
    total_price: float
    attributes: Dict[str, Any] = field(default_factory=dict)

@dataclass
class Customer:
    """客户信息"""
    customer_id: str
    name: str
    email: str
    phone: str
    address: str
    level: str = "regular"  # regular, vip, premium

@dataclass
class Order:
    """订单信息"""
    order_id: str
    customer: Customer
    items: List[OrderItem]
    total_amount: float
    status: OrderStatus
    created_at: datetime
    updated_at: datetime
    payment_method: str = ""
    shipping_address: str = ""
    notes: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class PaymentRequest:
    """支付请求"""
    payment_id: str
    order_id: str
    amount: float
    payment_method: str
    customer_id: str
    created_at: datetime
    status: PaymentStatus = PaymentStatus.PENDING
    gateway_response: Dict[str, Any] = field(default_factory=dict)

@dataclass
class InventoryItem:
    """库存项"""
    product_id: str
    available_quantity: int
    reserved_quantity: int
    total_quantity: int
    warehouse_id: str
    last_updated: datetime

@dataclass
class ShippingInfo:
    """物流信息"""
    shipping_id: str
    order_id: str
    carrier: str
    tracking_number: str
    shipping_address: str
    estimated_delivery: datetime
    status: str
    created_at: datetime

# 消息事件定义
@dataclass
class OrderEvent:
    """订单事件"""
    event_id: str
    event_type: str
    order_id: str
    timestamp: datetime
    data: Dict[str, Any]
    correlation_id: str = ""
    retry_count: int = 0

@dataclass
class PaymentEvent:
    """支付事件"""
    event_id: str
    event_type: str
    payment_id: str
    order_id: str
    timestamp: datetime
    data: Dict[str, Any]
    correlation_id: str = ""

@dataclass
class InventoryEvent:
    """库存事件"""
    event_id: str
    event_type: str
    product_id: str
    operation: InventoryOperation
    quantity: int
    timestamp: datetime
    order_id: str = ""
    correlation_id: str = ""

@dataclass
class NotificationEvent:
    """通知事件"""
    event_id: str
    notification_type: NotificationType
    recipient: str
    subject: str
    content: str
    template_id: str = ""
    data: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)

消息队列架构设计

class MessageQueueConfig:
    """消息队列配置"""
    
    def __init__(self):
        # 交换机配置
        self.exchanges = {
            "orders": {
                "name": "ecommerce.orders",
                "type": "topic",
                "durable": True
            },
            "payments": {
                "name": "ecommerce.payments",
                "type": "topic",
                "durable": True
            },
            "inventory": {
                "name": "ecommerce.inventory",
                "type": "topic",
                "durable": True
            },
            "notifications": {
                "name": "ecommerce.notifications",
                "type": "direct",
                "durable": True
            },
            "dlx": {
                "name": "ecommerce.dlx",
                "type": "direct",
                "durable": True
            }
        }
        
        # 队列配置
        self.queues = {
            # 订单相关队列
            "order.created": {
                "name": "ecommerce.order.created",
                "routing_key": "order.created",
                "exchange": "orders",
                "durable": True,
                "arguments": {
                    "x-dead-letter-exchange": "ecommerce.dlx",
                    "x-dead-letter-routing-key": "order.created.failed"
                }
            },
            "order.confirmed": {
                "name": "ecommerce.order.confirmed",
                "routing_key": "order.confirmed",
                "exchange": "orders",
                "durable": True
            },
            "order.cancelled": {
                "name": "ecommerce.order.cancelled",
                "routing_key": "order.cancelled",
                "exchange": "orders",
                "durable": True
            },
            
            # 支付相关队列
            "payment.process": {
                "name": "ecommerce.payment.process",
                "routing_key": "payment.process",
                "exchange": "payments",
                "durable": True,
                "arguments": {
                    "x-dead-letter-exchange": "ecommerce.dlx",
                    "x-message-ttl": 300000  # 5分钟TTL
                }
            },
            "payment.success": {
                "name": "ecommerce.payment.success",
                "routing_key": "payment.success",
                "exchange": "payments",
                "durable": True
            },
            "payment.failed": {
                "name": "ecommerce.payment.failed",
                "routing_key": "payment.failed",
                "exchange": "payments",
                "durable": True
            },
            
            # 库存相关队列
            "inventory.reserve": {
                "name": "ecommerce.inventory.reserve",
                "routing_key": "inventory.reserve",
                "exchange": "inventory",
                "durable": True
            },
            "inventory.confirm": {
                "name": "ecommerce.inventory.confirm",
                "routing_key": "inventory.confirm",
                "exchange": "inventory",
                "durable": True
            },
            "inventory.release": {
                "name": "ecommerce.inventory.release",
                "routing_key": "inventory.release",
                "exchange": "inventory",
                "durable": True
            },
            
            # 通知相关队列
            "notification.email": {
                "name": "ecommerce.notification.email",
                "routing_key": "email",
                "exchange": "notifications",
                "durable": True
            },
            "notification.sms": {
                "name": "ecommerce.notification.sms",
                "routing_key": "sms",
                "exchange": "notifications",
                "durable": True
            },
            
            # 死信队列
            "dlq.orders": {
                "name": "ecommerce.dlq.orders",
                "routing_key": "order.*.failed",
                "exchange": "dlx",
                "durable": True
            },
            "dlq.payments": {
                "name": "ecommerce.dlq.payments",
                "routing_key": "payment.*.failed",
                "exchange": "dlx",
                "durable": True
            }
        }
        
        # 路由键模式
        self.routing_patterns = {
            "order_events": "order.*",
            "payment_events": "payment.*",
            "inventory_events": "inventory.*",
            "high_priority": "*.urgent",
            "customer_notifications": "notification.customer.*"
        }

class ECommerceMessageBroker:
    """电商消息代理"""
    
    def __init__(self, connection_url: str = "amqp://localhost"):
        self.connection_url = connection_url
        self.config = MessageQueueConfig()
        self.connection = None
        self.channel = None
        self.logger = logging.getLogger(__name__)
        
        # 消息处理器注册表
        self.message_handlers: Dict[str, List[Callable]] = {}
        
        # 消息序列化器
        self.serializer = self._create_serializer()
    
    def _create_serializer(self):
        """创建消息序列化器"""
        class MessageSerializer:
            @staticmethod
            def serialize(obj) -> str:
                if hasattr(obj, '__dict__'):
                    # 处理dataclass对象
                    data = obj.__dict__.copy()
                    # 处理datetime对象
                    for key, value in data.items():
                        if isinstance(value, datetime):
                            data[key] = value.isoformat()
                        elif isinstance(value, Enum):
                            data[key] = value.value
                    return json.dumps(data, ensure_ascii=False)
                return json.dumps(obj, ensure_ascii=False)
            
            @staticmethod
            def deserialize(data: str, target_class=None) -> Dict[str, Any]:
                parsed = json.loads(data)
                if target_class and hasattr(target_class, '__annotations__'):
                    # 尝试转换为目标类型
                    annotations = target_class.__annotations__
                    for key, value in parsed.items():
                        if key in annotations:
                            annotation = annotations[key]
                            if annotation == datetime and isinstance(value, str):
                                try:
                                    parsed[key] = datetime.fromisoformat(value)
                                except ValueError:
                                    pass
                return parsed
        
        return MessageSerializer()
    
    async def connect(self):
        """建立连接"""
        try:
            # 这里使用模拟的连接,实际项目中使用aio-pika或pika
            self.logger.info(f"连接到RabbitMQ: {self.connection_url}")
            self.connection = True  # 模拟连接
            self.channel = True     # 模拟通道
            
            # 初始化交换机和队列
            await self._setup_topology()
            
            self.logger.info("RabbitMQ连接建立成功")
            
        except Exception as e:
            self.logger.error(f"连接RabbitMQ失败: {e}")
            raise
    
    async def _setup_topology(self):
        """设置消息队列拓扑结构"""
        self.logger.info("设置消息队列拓扑结构")
        
        # 创建交换机
        for exchange_name, exchange_config in self.config.exchanges.items():
            self.logger.info(f"创建交换机: {exchange_config['name']}")
            # 实际项目中这里会调用RabbitMQ API
        
        # 创建队列并绑定
        for queue_name, queue_config in self.config.queues.items():
            self.logger.info(f"创建队列: {queue_config['name']}")
            # 实际项目中这里会调用RabbitMQ API
        
        self.logger.info("拓扑结构设置完成")
    
    async def publish_message(self, exchange: str, routing_key: str, 
                            message: Any, priority: int = 0,
                            correlation_id: str = None) -> bool:
        """发布消息"""
        try:
            if not self.connection:
                await self.connect()
            
            # 序列化消息
            serialized_message = self.serializer.serialize(message)
            
            # 生成消息ID
            message_id = str(uuid.uuid4())
            
            # 消息属性
            properties = {
                "message_id": message_id,
                "timestamp": datetime.now().isoformat(),
                "priority": priority,
                "delivery_mode": 2,  # 持久化
                "correlation_id": correlation_id or str(uuid.uuid4())
            }
            
            self.logger.info(
                f"发布消息到 {exchange}/{routing_key}: {message_id}"
            )
            
            # 实际项目中这里会调用RabbitMQ发布API
            # await channel.basic_publish(
            #     exchange=exchange,
            #     routing_key=routing_key,
            #     body=serialized_message,
            #     properties=properties
            # )
            
            return True
            
        except Exception as e:
            self.logger.error(f"发布消息失败: {e}")
            return False
    
    def register_handler(self, queue_name: str, handler: Callable):
        """注册消息处理器"""
        if queue_name not in self.message_handlers:
            self.message_handlers[queue_name] = []
        
        self.message_handlers[queue_name].append(handler)
        self.logger.info(f"注册处理器到队列: {queue_name}")
    
    async def start_consuming(self, queue_name: str, 
                            prefetch_count: int = 10):
        """开始消费消息"""
        if not self.connection:
            await self.connect()
        
        self.logger.info(f"开始消费队列: {queue_name}")
        
        # 实际项目中这里会设置消费者
        # await channel.basic_qos(prefetch_count=prefetch_count)
        # await channel.basic_consume(
        #     queue=queue_name,
        #     on_message_callback=self._handle_message
        # )
    
    async def _handle_message(self, channel, method, properties, body):
        """处理接收到的消息"""
        try:
            # 反序列化消息
            message_data = self.serializer.deserialize(body.decode())
            
            # 获取队列名称
            queue_name = method.routing_key
            
            # 调用注册的处理器
            if queue_name in self.message_handlers:
                for handler in self.message_handlers[queue_name]:
                    await handler(message_data, properties)
            
            # 确认消息
            # await channel.basic_ack(delivery_tag=method.delivery_tag)
            
        except Exception as e:
            self.logger.error(f"处理消息失败: {e}")
            # 拒绝消息并重新入队
            # await channel.basic_nack(
            #     delivery_tag=method.delivery_tag,
            #     requeue=True
            # )
    
    async def close(self):
        """关闭连接"""
        if self.connection:
            # await self.connection.close()
            self.connection = None
            self.logger.info("RabbitMQ连接已关闭")

10.2 订单服务实现

class OrderService:
    """订单服务"""
    
    def __init__(self, message_broker: ECommerceMessageBroker):
        self.message_broker = message_broker
        self.logger = logging.getLogger(__name__)
        
        # 模拟数据存储
        self.orders: Dict[str, Order] = {}
        self.customers: Dict[str, Customer] = {}
        self.products: Dict[str, Product] = {}
        
        # 注册消息处理器
        self._register_handlers()
        
        # 初始化测试数据
        self._init_test_data()
    
    def _register_handlers(self):
        """注册消息处理器"""
        self.message_broker.register_handler(
            "payment.success", 
            self._handle_payment_success
        )
        self.message_broker.register_handler(
            "payment.failed", 
            self._handle_payment_failed
        )
        self.message_broker.register_handler(
            "inventory.confirmed", 
            self._handle_inventory_confirmed
        )
        self.message_broker.register_handler(
            "inventory.insufficient", 
            self._handle_inventory_insufficient
        )
    
    def _init_test_data(self):
        """初始化测试数据"""
        # 创建测试客户
        self.customers["customer_001"] = Customer(
            customer_id="customer_001",
            name="张三",
            email="zhangsan@example.com",
            phone="13800138000",
            address="北京市朝阳区xxx街道xxx号",
            level="vip"
        )
        
        # 创建测试商品
        self.products["product_001"] = Product(
            product_id="product_001",
            name="iPhone 15 Pro",
            price=8999.00,
            category="手机",
            description="苹果最新款手机"
        )
        
        self.products["product_002"] = Product(
            product_id="product_002",
            name="MacBook Pro",
            price=15999.00,
            category="电脑",
            description="苹果笔记本电脑"
        )
    
    async def create_order(self, customer_id: str, items: List[Dict[str, Any]], 
                          shipping_address: str = "", 
                          payment_method: str = "credit_card") -> Order:
        """创建订单"""
        try:
            # 验证客户
            if customer_id not in self.customers:
                raise ValueError(f"客户不存在: {customer_id}")
            
            customer = self.customers[customer_id]
            
            # 构建订单项
            order_items = []
            total_amount = 0.0
            
            for item_data in items:
                product_id = item_data["product_id"]
                quantity = item_data["quantity"]
                
                if product_id not in self.products:
                    raise ValueError(f"商品不存在: {product_id}")
                
                product = self.products[product_id]
                unit_price = product.price
                total_price = unit_price * quantity
                
                order_item = OrderItem(
                    product_id=product_id,
                    product_name=product.name,
                    quantity=quantity,
                    unit_price=unit_price,
                    total_price=total_price
                )
                
                order_items.append(order_item)
                total_amount += total_price
            
            # 创建订单
            order_id = f"order_{int(time.time())}_{uuid.uuid4().hex[:8]}"
            order = Order(
                order_id=order_id,
                customer=customer,
                items=order_items,
                total_amount=total_amount,
                status=OrderStatus.PENDING,
                created_at=datetime.now(),
                updated_at=datetime.now(),
                payment_method=payment_method,
                shipping_address=shipping_address or customer.address
            )
            
            # 保存订单
            self.orders[order_id] = order
            
            # 发布订单创建事件
            order_event = OrderEvent(
                event_id=str(uuid.uuid4()),
                event_type="order.created",
                order_id=order_id,
                timestamp=datetime.now(),
                data={
                    "customer_id": customer_id,
                    "total_amount": total_amount,
                    "items_count": len(order_items),
                    "payment_method": payment_method
                }
            )
            
            await self.message_broker.publish_message(
                exchange="ecommerce.orders",
                routing_key="order.created",
                message=order_event,
                priority=5
            )
            
            self.logger.info(f"订单创建成功: {order_id}")
            return order
            
        except Exception as e:
            self.logger.error(f"创建订单失败: {e}")
            raise
    
    async def confirm_order(self, order_id: str) -> bool:
        """确认订单"""
        try:
            if order_id not in self.orders:
                raise ValueError(f"订单不存在: {order_id}")
            
            order = self.orders[order_id]
            
            if order.status != OrderStatus.PENDING:
                raise ValueError(f"订单状态不允许确认: {order.status}")
            
            # 更新订单状态
            order.status = OrderStatus.CONFIRMED
            order.updated_at = datetime.now()
            
            # 发布订单确认事件
            order_event = OrderEvent(
                event_id=str(uuid.uuid4()),
                event_type="order.confirmed",
                order_id=order_id,
                timestamp=datetime.now(),
                data={
                    "customer_id": order.customer.customer_id,
                    "total_amount": order.total_amount
                }
            )
            
            await self.message_broker.publish_message(
                exchange="ecommerce.orders",
                routing_key="order.confirmed",
                message=order_event,
                priority=7
            )
            
            # 请求库存预留
            for item in order.items:
                inventory_event = InventoryEvent(
                    event_id=str(uuid.uuid4()),
                    event_type="inventory.reserve",
                    product_id=item.product_id,
                    operation=InventoryOperation.RESERVE,
                    quantity=item.quantity,
                    timestamp=datetime.now(),
                    order_id=order_id
                )
                
                await self.message_broker.publish_message(
                    exchange="ecommerce.inventory",
                    routing_key="inventory.reserve",
                    message=inventory_event
                )
            
            self.logger.info(f"订单确认成功: {order_id}")
            return True
            
        except Exception as e:
            self.logger.error(f"确认订单失败: {e}")
            return False
    
    async def cancel_order(self, order_id: str, reason: str = "") -> bool:
        """取消订单"""
        try:
            if order_id not in self.orders:
                raise ValueError(f"订单不存在: {order_id}")
            
            order = self.orders[order_id]
            
            # 检查订单状态是否允许取消
            if order.status in [OrderStatus.SHIPPED, OrderStatus.DELIVERED]:
                raise ValueError(f"订单状态不允许取消: {order.status}")
            
            old_status = order.status
            order.status = OrderStatus.CANCELLED
            order.updated_at = datetime.now()
            order.metadata["cancel_reason"] = reason
            order.metadata["cancelled_at"] = datetime.now().isoformat()
            
            # 发布订单取消事件
            order_event = OrderEvent(
                event_id=str(uuid.uuid4()),
                event_type="order.cancelled",
                order_id=order_id,
                timestamp=datetime.now(),
                data={
                    "customer_id": order.customer.customer_id,
                    "old_status": old_status.value,
                    "reason": reason,
                    "total_amount": order.total_amount
                }
            )
            
            await self.message_broker.publish_message(
                exchange="ecommerce.orders",
                routing_key="order.cancelled",
                message=order_event,
                priority=8
            )
            
            # 如果已经预留库存,需要释放
            if old_status in [OrderStatus.CONFIRMED, OrderStatus.PAID, OrderStatus.PROCESSING]:
                for item in order.items:
                    inventory_event = InventoryEvent(
                        event_id=str(uuid.uuid4()),
                        event_type="inventory.release",
                        product_id=item.product_id,
                        operation=InventoryOperation.RELEASE,
                        quantity=item.quantity,
                        timestamp=datetime.now(),
                        order_id=order_id
                    )
                    
                    await self.message_broker.publish_message(
                        exchange="ecommerce.inventory",
                        routing_key="inventory.release",
                        message=inventory_event
                    )
            
            self.logger.info(f"订单取消成功: {order_id}, 原因: {reason}")
            return True
            
        except Exception as e:
            self.logger.error(f"取消订单失败: {e}")
            return False
    
    async def _handle_payment_success(self, message_data: Dict[str, Any], 
                                    properties: Dict[str, Any]):
        """处理支付成功事件"""
        try:
            order_id = message_data.get("order_id")
            payment_id = message_data.get("payment_id")
            
            if order_id not in self.orders:
                self.logger.warning(f"支付成功但订单不存在: {order_id}")
                return
            
            order = self.orders[order_id]
            order.status = OrderStatus.PAID
            order.updated_at = datetime.now()
            order.metadata["payment_id"] = payment_id
            order.metadata["paid_at"] = datetime.now().isoformat()
            
            # 确认库存扣减
            for item in order.items:
                inventory_event = InventoryEvent(
                    event_id=str(uuid.uuid4()),
                    event_type="inventory.confirm",
                    product_id=item.product_id,
                    operation=InventoryOperation.CONFIRM,
                    quantity=item.quantity,
                    timestamp=datetime.now(),
                    order_id=order_id
                )
                
                await self.message_broker.publish_message(
                    exchange="ecommerce.inventory",
                    routing_key="inventory.confirm",
                    message=inventory_event
                )
            
            # 发送支付成功通知
            notification_event = NotificationEvent(
                event_id=str(uuid.uuid4()),
                notification_type=NotificationType.EMAIL,
                recipient=order.customer.email,
                subject="订单支付成功通知",
                content=f"您的订单 {order_id} 已支付成功,金额:¥{order.total_amount}",
                data={
                    "order_id": order_id,
                    "customer_name": order.customer.name,
                    "amount": order.total_amount
                }
            )
            
            await self.message_broker.publish_message(
                exchange="ecommerce.notifications",
                routing_key="email",
                message=notification_event
            )
            
            self.logger.info(f"处理支付成功事件: {order_id}")
            
        except Exception as e:
            self.logger.error(f"处理支付成功事件失败: {e}")
    
    async def _handle_payment_failed(self, message_data: Dict[str, Any], 
                                   properties: Dict[str, Any]):
        """处理支付失败事件"""
        try:
            order_id = message_data.get("order_id")
            failure_reason = message_data.get("failure_reason", "未知原因")
            
            if order_id not in self.orders:
                self.logger.warning(f"支付失败但订单不存在: {order_id}")
                return
            
            order = self.orders[order_id]
            order.metadata["payment_failure_reason"] = failure_reason
            order.metadata["payment_failed_at"] = datetime.now().isoformat()
            
            # 释放预留的库存
            for item in order.items:
                inventory_event = InventoryEvent(
                    event_id=str(uuid.uuid4()),
                    event_type="inventory.release",
                    product_id=item.product_id,
                    operation=InventoryOperation.RELEASE,
                    quantity=item.quantity,
                    timestamp=datetime.now(),
                    order_id=order_id
                )
                
                await self.message_broker.publish_message(
                    exchange="ecommerce.inventory",
                    routing_key="inventory.release",
                    message=inventory_event
                )
            
            # 发送支付失败通知
            notification_event = NotificationEvent(
                event_id=str(uuid.uuid4()),
                notification_type=NotificationType.EMAIL,
                recipient=order.customer.email,
                subject="订单支付失败通知",
                content=f"您的订单 {order_id} 支付失败,原因:{failure_reason}",
                data={
                    "order_id": order_id,
                    "customer_name": order.customer.name,
                    "failure_reason": failure_reason
                }
            )
            
            await self.message_broker.publish_message(
                exchange="ecommerce.notifications",
                routing_key="email",
                message=notification_event
            )
            
            self.logger.info(f"处理支付失败事件: {order_id}, 原因: {failure_reason}")
            
        except Exception as e:
            self.logger.error(f"处理支付失败事件失败: {e}")
    
    async def _handle_inventory_confirmed(self, message_data: Dict[str, Any], 
                                        properties: Dict[str, Any]):
        """处理库存确认事件"""
        try:
            order_id = message_data.get("order_id")
            
            if order_id and order_id in self.orders:
                order = self.orders[order_id]
                if order.status == OrderStatus.PAID:
                    order.status = OrderStatus.PROCESSING
                    order.updated_at = datetime.now()
                    
                    self.logger.info(f"订单进入处理状态: {order_id}")
            
        except Exception as e:
            self.logger.error(f"处理库存确认事件失败: {e}")
    
    async def _handle_inventory_insufficient(self, message_data: Dict[str, Any], 
                                           properties: Dict[str, Any]):
        """处理库存不足事件"""
        try:
            order_id = message_data.get("order_id")
            product_id = message_data.get("product_id")
            
            if order_id and order_id in self.orders:
                # 自动取消订单
                await self.cancel_order(
                    order_id, 
                    f"商品 {product_id} 库存不足"
                )
                
                self.logger.info(f"因库存不足自动取消订单: {order_id}")
            
        except Exception as e:
            self.logger.error(f"处理库存不足事件失败: {e}")
    
    def get_order(self, order_id: str) -> Optional[Order]:
        """获取订单信息"""
        return self.orders.get(order_id)
    
    def get_customer_orders(self, customer_id: str) -> List[Order]:
        """获取客户的所有订单"""
        return [
            order for order in self.orders.values() 
            if order.customer.customer_id == customer_id
        ]
    
    def get_orders_by_status(self, status: OrderStatus) -> List[Order]:
        """根据状态获取订单"""
        return [
            order for order in self.orders.values() 
            if order.status == status
        ]

10.3 支付服务实现

class PaymentGateway(ABC):
    """支付网关抽象基类"""
    
    @abstractmethod
    async def process_payment(self, payment_request: PaymentRequest) -> Dict[str, Any]:
        """处理支付"""
        pass
    
    @abstractmethod
    async def refund_payment(self, payment_id: str, amount: float) -> Dict[str, Any]:
        """退款"""
        pass

class MockPaymentGateway(PaymentGateway):
    """模拟支付网关"""
    
    def __init__(self, success_rate: float = 0.9):
        self.success_rate = success_rate
        self.logger = logging.getLogger(__name__)
        
        # 模拟支付记录
        self.payment_records: Dict[str, Dict[str, Any]] = {}
    
    async def process_payment(self, payment_request: PaymentRequest) -> Dict[str, Any]:
        """处理支付"""
        try:
            # 模拟网络延迟
            await asyncio.sleep(0.5)
            
            # 模拟支付结果
            import random
            success = random.random() < self.success_rate
            
            if success:
                transaction_id = f"txn_{int(time.time())}_{uuid.uuid4().hex[:8]}"
                
                result = {
                    "success": True,
                    "transaction_id": transaction_id,
                    "amount": payment_request.amount,
                    "currency": "CNY",
                    "gateway_response": {
                        "code": "SUCCESS",
                        "message": "支付成功",
                        "timestamp": datetime.now().isoformat()
                    }
                }
                
                # 记录支付信息
                self.payment_records[payment_request.payment_id] = {
                    "payment_request": payment_request,
                    "transaction_id": transaction_id,
                    "status": "success",
                    "processed_at": datetime.now()
                }
                
                self.logger.info(f"支付成功: {payment_request.payment_id}")
                
            else:
                result = {
                    "success": False,
                    "error_code": "PAYMENT_FAILED",
                    "error_message": "支付失败,请检查支付信息",
                    "gateway_response": {
                        "code": "FAILED",
                        "message": "余额不足或银行卡信息错误",
                        "timestamp": datetime.now().isoformat()
                    }
                }
                
                self.payment_records[payment_request.payment_id] = {
                    "payment_request": payment_request,
                    "status": "failed",
                    "processed_at": datetime.now(),
                    "error": result["error_message"]
                }
                
                self.logger.warning(f"支付失败: {payment_request.payment_id}")
            
            return result
            
        except Exception as e:
            self.logger.error(f"支付处理异常: {e}")
            return {
                "success": False,
                "error_code": "SYSTEM_ERROR",
                "error_message": f"系统错误: {e}"
            }
    
    async def refund_payment(self, payment_id: str, amount: float) -> Dict[str, Any]:
        """退款"""
        try:
            if payment_id not in self.payment_records:
                return {
                    "success": False,
                    "error_code": "PAYMENT_NOT_FOUND",
                    "error_message": "支付记录不存在"
                }
            
            payment_record = self.payment_records[payment_id]
            
            if payment_record["status"] != "success":
                return {
                    "success": False,
                    "error_code": "INVALID_PAYMENT_STATUS",
                    "error_message": "只能对成功的支付进行退款"
                }
            
            # 模拟退款处理
            await asyncio.sleep(0.3)
            
            refund_id = f"refund_{int(time.time())}_{uuid.uuid4().hex[:8]}"
            
            result = {
                "success": True,
                "refund_id": refund_id,
                "amount": amount,
                "currency": "CNY",
                "gateway_response": {
                    "code": "SUCCESS",
                    "message": "退款成功",
                    "timestamp": datetime.now().isoformat()
                }
            }
            
            # 更新支付记录
            payment_record["refund_id"] = refund_id
            payment_record["refund_amount"] = amount
            payment_record["refunded_at"] = datetime.now()
            
            self.logger.info(f"退款成功: {payment_id} -> {refund_id}")
            return result
            
        except Exception as e:
            self.logger.error(f"退款处理异常: {e}")
            return {
                "success": False,
                "error_code": "SYSTEM_ERROR",
                "error_message": f"系统错误: {e}"
            }

class PaymentService:
    """支付服务"""
    
    def __init__(self, message_broker: ECommerceMessageBroker, 
                 payment_gateway: PaymentGateway):
        self.message_broker = message_broker
        self.payment_gateway = payment_gateway
        self.logger = logging.getLogger(__name__)
        
        # 支付请求存储
        self.payment_requests: Dict[str, PaymentRequest] = {}
        
        # 注册消息处理器
        self._register_handlers()
    
    def _register_handlers(self):
        """注册消息处理器"""
        self.message_broker.register_handler(
            "order.confirmed", 
            self._handle_order_confirmed
        )
        self.message_broker.register_handler(
            "order.cancelled", 
            self._handle_order_cancelled
        )
    
    async def create_payment_request(self, order_id: str, amount: float, 
                                   customer_id: str, 
                                   payment_method: str) -> PaymentRequest:
        """创建支付请求"""
        try:
            payment_id = f"pay_{int(time.time())}_{uuid.uuid4().hex[:8]}"
            
            payment_request = PaymentRequest(
                payment_id=payment_id,
                order_id=order_id,
                amount=amount,
                payment_method=payment_method,
                customer_id=customer_id,
                created_at=datetime.now(),
                status=PaymentStatus.PENDING
            )
            
            self.payment_requests[payment_id] = payment_request
            
            self.logger.info(f"创建支付请求: {payment_id} for order {order_id}")
            return payment_request
            
        except Exception as e:
            self.logger.error(f"创建支付请求失败: {e}")
            raise
    
    async def process_payment(self, payment_id: str) -> bool:
        """处理支付"""
        try:
            if payment_id not in self.payment_requests:
                raise ValueError(f"支付请求不存在: {payment_id}")
            
            payment_request = self.payment_requests[payment_id]
            
            if payment_request.status != PaymentStatus.PENDING:
                raise ValueError(f"支付状态不允许处理: {payment_request.status}")
            
            # 更新状态为处理中
            payment_request.status = PaymentStatus.PROCESSING
            
            # 发布支付处理事件
            payment_event = PaymentEvent(
                event_id=str(uuid.uuid4()),
                event_type="payment.processing",
                payment_id=payment_id,
                order_id=payment_request.order_id,
                timestamp=datetime.now(),
                data={
                    "amount": payment_request.amount,
                    "payment_method": payment_request.payment_method,
                    "customer_id": payment_request.customer_id
                }
            )
            
            await self.message_broker.publish_message(
                exchange="ecommerce.payments",
                routing_key="payment.processing",
                message=payment_event
            )
            
            # 调用支付网关
            gateway_result = await self.payment_gateway.process_payment(payment_request)
            
            if gateway_result["success"]:
                # 支付成功
                payment_request.status = PaymentStatus.SUCCESS
                payment_request.gateway_response = gateway_result["gateway_response"]
                
                # 发布支付成功事件
                success_event = PaymentEvent(
                    event_id=str(uuid.uuid4()),
                    event_type="payment.success",
                    payment_id=payment_id,
                    order_id=payment_request.order_id,
                    timestamp=datetime.now(),
                    data={
                        "amount": payment_request.amount,
                        "transaction_id": gateway_result.get("transaction_id"),
                        "customer_id": payment_request.customer_id
                    }
                )
                
                await self.message_broker.publish_message(
                    exchange="ecommerce.payments",
                    routing_key="payment.success",
                    message=success_event,
                    priority=8
                )
                
                self.logger.info(f"支付成功: {payment_id}")
                return True
                
            else:
                # 支付失败
                payment_request.status = PaymentStatus.FAILED
                payment_request.gateway_response = gateway_result.get("gateway_response", {})
                
                # 发布支付失败事件
                failed_event = PaymentEvent(
                    event_id=str(uuid.uuid4()),
                    event_type="payment.failed",
                    payment_id=payment_id,
                    order_id=payment_request.order_id,
                    timestamp=datetime.now(),
                    data={
                        "amount": payment_request.amount,
                        "error_code": gateway_result.get("error_code"),
                        "error_message": gateway_result.get("error_message"),
                        "customer_id": payment_request.customer_id,
                        "failure_reason": gateway_result.get("error_message", "未知错误")
                    }
                )
                
                await self.message_broker.publish_message(
                    exchange="ecommerce.payments",
                    routing_key="payment.failed",
                    message=failed_event,
                    priority=8
                )
                
                self.logger.warning(f"支付失败: {payment_id}, 原因: {gateway_result.get('error_message')}")
                return False
            
        except Exception as e:
            self.logger.error(f"处理支付失败: {e}")
            
            # 更新支付状态为失败
            if payment_id in self.payment_requests:
                self.payment_requests[payment_id].status = PaymentStatus.FAILED
            
            return False
    
    async def refund_payment(self, payment_id: str, amount: float = None) -> bool:
        """退款"""
        try:
            if payment_id not in self.payment_requests:
                raise ValueError(f"支付请求不存在: {payment_id}")
            
            payment_request = self.payment_requests[payment_id]
            
            if payment_request.status != PaymentStatus.SUCCESS:
                raise ValueError(f"只能对成功的支付进行退款: {payment_request.status}")
            
            # 如果没有指定退款金额,则全额退款
            refund_amount = amount or payment_request.amount
            
            if refund_amount > payment_request.amount:
                raise ValueError("退款金额不能超过支付金额")
            
            # 调用支付网关退款
            gateway_result = await self.payment_gateway.refund_payment(
                payment_id, refund_amount
            )
            
            if gateway_result["success"]:
                # 退款成功
                payment_request.status = PaymentStatus.REFUNDED
                
                # 发布退款成功事件
                refund_event = PaymentEvent(
                    event_id=str(uuid.uuid4()),
                    event_type="payment.refunded",
                    payment_id=payment_id,
                    order_id=payment_request.order_id,
                    timestamp=datetime.now(),
                    data={
                        "original_amount": payment_request.amount,
                        "refund_amount": refund_amount,
                        "refund_id": gateway_result.get("refund_id"),
                        "customer_id": payment_request.customer_id
                    }
                )
                
                await self.message_broker.publish_message(
                    exchange="ecommerce.payments",
                    routing_key="payment.refunded",
                    message=refund_event
                )
                
                self.logger.info(f"退款成功: {payment_id}, 金额: {refund_amount}")
                return True
                
            else:
                self.logger.error(f"退款失败: {payment_id}, 原因: {gateway_result.get('error_message')}")
                return False
            
        except Exception as e:
            self.logger.error(f"退款处理失败: {e}")
            return False
    
    async def _handle_order_confirmed(self, message_data: Dict[str, Any], 
                                    properties: Dict[str, Any]):
        """处理订单确认事件"""
        try:
            order_id = message_data.get("order_id")
            customer_id = message_data.get("customer_id")
            total_amount = message_data.get("total_amount")
            
            # 创建支付请求
            payment_request = await self.create_payment_request(
                order_id=order_id,
                amount=total_amount,
                customer_id=customer_id,
                payment_method="credit_card"  # 默认支付方式
            )
            
            # 发布支付请求创建事件
            payment_event = PaymentEvent(
                event_id=str(uuid.uuid4()),
                event_type="payment.request_created",
                payment_id=payment_request.payment_id,
                order_id=order_id,
                timestamp=datetime.now(),
                data={
                    "amount": total_amount,
                    "customer_id": customer_id
                }
            )
            
            await self.message_broker.publish_message(
                exchange="ecommerce.payments",
                routing_key="payment.request_created",
                message=payment_event
            )
            
            # 自动处理支付(在实际项目中,这通常由用户触发)
            await asyncio.sleep(1)  # 模拟用户操作延迟
            await self.process_payment(payment_request.payment_id)
            
            self.logger.info(f"处理订单确认事件,创建支付请求: {payment_request.payment_id}")
            
        except Exception as e:
            self.logger.error(f"处理订单确认事件失败: {e}")
    
    async def _handle_order_cancelled(self, message_data: Dict[str, Any], 
                                    properties: Dict[str, Any]):
        """处理订单取消事件"""
        try:
            order_id = message_data.get("order_id")
            
            # 查找相关的支付请求
            payment_requests = [
                pr for pr in self.payment_requests.values()
                if pr.order_id == order_id and pr.status == PaymentStatus.SUCCESS
            ]
            
            # 对成功的支付进行退款
            for payment_request in payment_requests:
                await self.refund_payment(payment_request.payment_id)
            
            self.logger.info(f"处理订单取消事件,退款支付: {order_id}")
            
        except Exception as e:
            self.logger.error(f"处理订单取消事件失败: {e}")
    
    def get_payment_request(self, payment_id: str) -> Optional[PaymentRequest]:
        """获取支付请求"""
        return self.payment_requests.get(payment_id)
    
    def get_order_payments(self, order_id: str) -> List[PaymentRequest]:
        """获取订单的所有支付请求"""
        return [
            pr for pr in self.payment_requests.values()
            if pr.order_id == order_id
        ]

10.4 库存服务实现

class InventoryService:
    """库存服务"""
    
    def __init__(self, message_broker: ECommerceMessageBroker):
        self.message_broker = message_broker
        self.logger = logging.getLogger(__name__)
        
        # 库存数据存储
        self.inventory: Dict[str, InventoryItem] = {}
        
        # 预留记录
        self.reservations: Dict[str, Dict[str, Any]] = {}
        
        # 注册消息处理器
        self._register_handlers()
        
        # 初始化测试库存
        self._init_test_inventory()
    
    def _register_handlers(self):
        """注册消息处理器"""
        self.message_broker.register_handler(
            "inventory.reserve", 
            self._handle_inventory_reserve
        )
        self.message_broker.register_handler(
            "inventory.confirm", 
            self._handle_inventory_confirm
        )
        self.message_broker.register_handler(
            "inventory.release", 
            self._handle_inventory_release
        )
    
    def _init_test_inventory(self):
        """初始化测试库存"""
        self.inventory["product_001"] = InventoryItem(
            product_id="product_001",
            available_quantity=100,
            reserved_quantity=0,
            total_quantity=100,
            warehouse_id="warehouse_001",
            last_updated=datetime.now()
        )
        
        self.inventory["product_002"] = InventoryItem(
            product_id="product_002",
            available_quantity=50,
            reserved_quantity=0,
            total_quantity=50,
            warehouse_id="warehouse_001",
            last_updated=datetime.now()
        )
    
    async def check_availability(self, product_id: str, quantity: int) -> bool:
        """检查库存可用性"""
        if product_id not in self.inventory:
            return False
        
        inventory_item = self.inventory[product_id]
        return inventory_item.available_quantity >= quantity
    
    async def reserve_inventory(self, product_id: str, quantity: int, 
                              order_id: str) -> bool:
        """预留库存"""
        try:
            if product_id not in self.inventory:
                self.logger.warning(f"商品不存在: {product_id}")
                return False
            
            inventory_item = self.inventory[product_id]
            
            if inventory_item.available_quantity < quantity:
                self.logger.warning(
                    f"库存不足: {product_id}, 需要: {quantity}, 可用: {inventory_item.available_quantity}"
                )
                return False
            
            # 预留库存
            inventory_item.available_quantity -= quantity
            inventory_item.reserved_quantity += quantity
            inventory_item.last_updated = datetime.now()
            
            # 记录预留信息
            reservation_id = f"res_{order_id}_{product_id}"
            self.reservations[reservation_id] = {
                "product_id": product_id,
                "quantity": quantity,
                "order_id": order_id,
                "reserved_at": datetime.now(),
                "status": "reserved"
            }
            
            self.logger.info(
                f"库存预留成功: {product_id}, 数量: {quantity}, 订单: {order_id}"
            )
            return True
            
        except Exception as e:
            self.logger.error(f"预留库存失败: {e}")
            return False
    
    async def confirm_inventory(self, product_id: str, quantity: int, 
                              order_id: str) -> bool:
        """确认库存扣减"""
        try:
            reservation_id = f"res_{order_id}_{product_id}"
            
            if reservation_id not in self.reservations:
                self.logger.warning(f"预留记录不存在: {reservation_id}")
                return False
            
            reservation = self.reservations[reservation_id]
            
            if reservation["status"] != "reserved":
                self.logger.warning(f"预留状态不正确: {reservation['status']}")
                return False
            
            if reservation["quantity"] != quantity:
                self.logger.warning(
                    f"确认数量与预留数量不匹配: {quantity} vs {reservation['quantity']}"
                )
                return False
            
            inventory_item = self.inventory[product_id]
            
            # 确认扣减(预留数量转为已扣减)
            inventory_item.reserved_quantity -= quantity
            inventory_item.total_quantity -= quantity
            inventory_item.last_updated = datetime.now()
            
            # 更新预留状态
            reservation["status"] = "confirmed"
            reservation["confirmed_at"] = datetime.now()
            
            self.logger.info(
                f"库存确认扣减: {product_id}, 数量: {quantity}, 订单: {order_id}"
            )
            return True
            
        except Exception as e:
            self.logger.error(f"确认库存扣减失败: {e}")
            return False
    
    async def release_inventory(self, product_id: str, quantity: int, 
                              order_id: str) -> bool:
        """释放预留库存"""
        try:
            reservation_id = f"res_{order_id}_{product_id}"
            
            if reservation_id not in self.reservations:
                self.logger.warning(f"预留记录不存在: {reservation_id}")
                return False
            
            reservation = self.reservations[reservation_id]
            
            if reservation["status"] not in ["reserved", "confirmed"]:
                self.logger.warning(f"预留状态不允许释放: {reservation['status']}")
                return False
            
            inventory_item = self.inventory[product_id]
            
            if reservation["status"] == "reserved":
                # 释放预留库存
                inventory_item.available_quantity += quantity
                inventory_item.reserved_quantity -= quantity
            else:
                # 已确认的库存,需要恢复总量
                inventory_item.available_quantity += quantity
                inventory_item.total_quantity += quantity
            
            inventory_item.last_updated = datetime.now()
            
            # 更新预留状态
            reservation["status"] = "released"
            reservation["released_at"] = datetime.now()
            
            self.logger.info(
                f"库存释放成功: {product_id}, 数量: {quantity}, 订单: {order_id}"
            )
            return True
            
        except Exception as e:
            self.logger.error(f"释放库存失败: {e}")
            return False
    
    async def _handle_inventory_reserve(self, message_data: Dict[str, Any], 
                                      properties: Dict[str, Any]):
        """处理库存预留请求"""
        try:
            product_id = message_data.get("product_id")
            quantity = message_data.get("quantity")
            order_id = message_data.get("order_id")
            
            success = await self.reserve_inventory(product_id, quantity, order_id)
            
            if success:
                # 发布库存预留成功事件
                inventory_event = InventoryEvent(
                    event_id=str(uuid.uuid4()),
                    event_type="inventory.reserved",
                    product_id=product_id,
                    operation=InventoryOperation.RESERVE,
                    quantity=quantity,
                    timestamp=datetime.now(),
                    order_id=order_id
                )
                
                await self.message_broker.publish_message(
                    exchange="ecommerce.inventory",
                    routing_key="inventory.reserved",
                    message=inventory_event
                )
            else:
                # 发布库存不足事件
                inventory_event = InventoryEvent(
                    event_id=str(uuid.uuid4()),
                    event_type="inventory.insufficient",
                    product_id=product_id,
                    operation=InventoryOperation.RESERVE,
                    quantity=quantity,
                    timestamp=datetime.now(),
                    order_id=order_id
                )
                
                await self.message_broker.publish_message(
                    exchange="ecommerce.inventory",
                    routing_key="inventory.insufficient",
                    message=inventory_event
                )
            
        except Exception as e:
            self.logger.error(f"处理库存预留请求失败: {e}")
    
    async def _handle_inventory_confirm(self, message_data: Dict[str, Any], 
                                      properties: Dict[str, Any]):
        """处理库存确认请求"""
        try:
            product_id = message_data.get("product_id")
            quantity = message_data.get("quantity")
            order_id = message_data.get("order_id")
            
            success = await self.confirm_inventory(product_id, quantity, order_id)
            
            if success:
                # 发布库存确认成功事件
                inventory_event = InventoryEvent(
                    event_id=str(uuid.uuid4()),
                    event_type="inventory.confirmed",
                    product_id=product_id,
                    operation=InventoryOperation.CONFIRM,
                    quantity=quantity,
                    timestamp=datetime.now(),
                    order_id=order_id
                )
                
                await self.message_broker.publish_message(
                    exchange="ecommerce.inventory",
                    routing_key="inventory.confirmed",
                    message=inventory_event
                )
            
        except Exception as e:
            self.logger.error(f"处理库存确认请求失败: {e}")
    
    async def _handle_inventory_release(self, message_data: Dict[str, Any], 
                                      properties: Dict[str, Any]):
        """处理库存释放请求"""
        try:
            product_id = message_data.get("product_id")
            quantity = message_data.get("quantity")
            order_id = message_data.get("order_id")
            
            success = await self.release_inventory(product_id, quantity, order_id)
            
            if success:
                # 发布库存释放成功事件
                inventory_event = InventoryEvent(
                    event_id=str(uuid.uuid4()),
                    event_type="inventory.released",
                    product_id=product_id,
                    operation=InventoryOperation.RELEASE,
                    quantity=quantity,
                    timestamp=datetime.now(),
                    order_id=order_id
                )
                
                await self.message_broker.publish_message(
                    exchange="ecommerce.inventory",
                    routing_key="inventory.released",
                    message=inventory_event
                )
            
        except Exception as e:
            self.logger.error(f"处理库存释放请求失败: {e}")
    
    def get_inventory_info(self, product_id: str) -> Optional[InventoryItem]:
        """获取库存信息"""
        return self.inventory.get(product_id)
    
    def get_all_inventory(self) -> Dict[str, InventoryItem]:
        """获取所有库存信息"""
        return self.inventory.copy()
    
    def get_low_stock_products(self, threshold: int = 10) -> List[InventoryItem]:
        """获取低库存商品"""
        return [
            item for item in self.inventory.values()
            if item.available_quantity <= threshold
        ]

10.5 通知服务实现

class NotificationChannel(ABC):
    """通知渠道抽象基类"""
    
    @abstractmethod
    async def send_notification(self, recipient: str, subject: str, 
                              content: str, data: Dict[str, Any] = None) -> bool:
        """发送通知"""
        pass

class EmailNotificationChannel(NotificationChannel):
    """邮件通知渠道"""
    
    def __init__(self, smtp_config: Dict[str, Any] = None):
        self.smtp_config = smtp_config or {}
        self.logger = logging.getLogger(__name__)
    
    async def send_notification(self, recipient: str, subject: str, 
                              content: str, data: Dict[str, Any] = None) -> bool:
        """发送邮件通知"""
        try:
            # 模拟邮件发送
            await asyncio.sleep(0.1)
            
            self.logger.info(
                f"发送邮件通知 - 收件人: {recipient}, 主题: {subject}"
            )
            
            # 在实际项目中,这里会调用SMTP服务发送邮件
            # import smtplib
            # from email.mime.text import MIMEText
            # ...
            
            return True
            
        except Exception as e:
            self.logger.error(f"发送邮件失败: {e}")
            return False

class SMSNotificationChannel(NotificationChannel):
    """短信通知渠道"""
    
    def __init__(self, sms_config: Dict[str, Any] = None):
        self.sms_config = sms_config or {}
        self.logger = logging.getLogger(__name__)
    
    async def send_notification(self, recipient: str, subject: str, 
                              content: str, data: Dict[str, Any] = None) -> bool:
        """发送短信通知"""
        try:
            # 模拟短信发送
            await asyncio.sleep(0.1)
            
            self.logger.info(
                f"发送短信通知 - 收件人: {recipient}, 内容: {content[:50]}..."
            )
            
            # 在实际项目中,这里会调用短信服务API
            # import requests
            # response = requests.post(sms_api_url, data=...)
            # ...
            
            return True
            
        except Exception as e:
            self.logger.error(f"发送短信失败: {e}")
            return False

class PushNotificationChannel(NotificationChannel):
    """推送通知渠道"""
    
    def __init__(self, push_config: Dict[str, Any] = None):
        self.push_config = push_config or {}
        self.logger = logging.getLogger(__name__)
    
    async def send_notification(self, recipient: str, subject: str, 
                              content: str, data: Dict[str, Any] = None) -> bool:
        """发送推送通知"""
        try:
            # 模拟推送发送
            await asyncio.sleep(0.1)
            
            self.logger.info(
                f"发送推送通知 - 收件人: {recipient}, 标题: {subject}"
            )
            
            # 在实际项目中,这里会调用推送服务API
            # 如Firebase Cloud Messaging, Apple Push Notification等
            
            return True
            
        except Exception as e:
            self.logger.error(f"发送推送失败: {e}")
            return False

class NotificationService:
    """通知服务"""
    
    def __init__(self, message_broker: ECommerceMessageBroker):
        self.message_broker = message_broker
        self.logger = logging.getLogger(__name__)
        
        # 通知渠道
        self.channels: Dict[NotificationType, NotificationChannel] = {
            NotificationType.EMAIL: EmailNotificationChannel(),
            NotificationType.SMS: SMSNotificationChannel(),
            NotificationType.PUSH: PushNotificationChannel()
        }
        
        # 通知模板
        self.templates = self._init_templates()
        
        # 通知历史
        self.notification_history: List[Dict[str, Any]] = []
        
        # 注册消息处理器
        self._register_handlers()
    
    def _init_templates(self) -> Dict[str, Dict[str, str]]:
        """初始化通知模板"""
        return {
            "order_created": {
                "email_subject": "订单创建成功 - {order_id}",
                "email_content": "亲爱的 {customer_name},您的订单 {order_id} 已创建成功,金额:¥{amount}。",
                "sms_content": "您的订单 {order_id} 已创建,金额¥{amount}。",
                "push_title": "订单创建成功",
                "push_content": "订单 {order_id} 已创建,金额¥{amount}"
            },
            "payment_success": {
                "email_subject": "支付成功通知 - {order_id}",
                "email_content": "亲爱的 {customer_name},您的订单 {order_id} 支付成功,金额:¥{amount}。我们将尽快为您处理订单。",
                "sms_content": "订单 {order_id} 支付成功,金额¥{amount},正在处理中。",
                "push_title": "支付成功",
                "push_content": "订单 {order_id} 支付成功"
            },
            "payment_failed": {
                "email_subject": "支付失败通知 - {order_id}",
                "email_content": "亲爱的 {customer_name},您的订单 {order_id} 支付失败,原因:{failure_reason}。请重新尝试支付。",
                "sms_content": "订单 {order_id} 支付失败:{failure_reason},请重新支付。",
                "push_title": "支付失败",
                "push_content": "订单 {order_id} 支付失败"
            },
            "order_shipped": {
                "email_subject": "订单已发货 - {order_id}",
                "email_content": "亲爱的 {customer_name},您的订单 {order_id} 已发货,快递单号:{tracking_number}。",
                "sms_content": "订单 {order_id} 已发货,快递单号:{tracking_number}。",
                "push_title": "订单已发货",
                "push_content": "订单 {order_id} 已发货"
            },
            "order_cancelled": {
                "email_subject": "订单取消通知 - {order_id}",
                "email_content": "亲爱的 {customer_name},您的订单 {order_id} 已取消,原因:{reason}。如有疑问请联系客服。",
                "sms_content": "订单 {order_id} 已取消:{reason}。",
                "push_title": "订单已取消",
                "push_content": "订单 {order_id} 已取消"
            }
        }
    
    def _register_handlers(self):
        """注册消息处理器"""
        self.message_broker.register_handler(
            "notification.email", 
            self._handle_email_notification
        )
        self.message_broker.register_handler(
            "notification.sms", 
            self._handle_sms_notification
        )
        self.message_broker.register_handler(
            "notification.push", 
            self._handle_push_notification
        )
        
        # 监听业务事件,自动发送通知
        self.message_broker.register_handler(
            "order.created", 
            self._handle_order_created
        )
        self.message_broker.register_handler(
            "payment.success", 
            self._handle_payment_success
        )
        self.message_broker.register_handler(
            "payment.failed", 
            self._handle_payment_failed
        )
        self.message_broker.register_handler(
            "order.cancelled", 
            self._handle_order_cancelled
        )
    
    async def send_notification(self, notification_type: NotificationType, 
                              recipient: str, template_id: str, 
                              data: Dict[str, Any]) -> bool:
        """发送通知"""
        try:
            if notification_type not in self.channels:
                self.logger.error(f"不支持的通知类型: {notification_type}")
                return False
            
            if template_id not in self.templates:
                self.logger.error(f"模板不存在: {template_id}")
                return False
            
            template = self.templates[template_id]
            channel = self.channels[notification_type]
            
            # 根据通知类型选择模板内容
            if notification_type == NotificationType.EMAIL:
                subject = template["email_subject"].format(**data)
                content = template["email_content"].format(**data)
            elif notification_type == NotificationType.SMS:
                subject = ""
                content = template["sms_content"].format(**data)
            elif notification_type == NotificationType.PUSH:
                subject = template["push_title"].format(**data)
                content = template["push_content"].format(**data)
            else:
                subject = "通知"
                content = str(data)
            
            # 发送通知
            success = await channel.send_notification(recipient, subject, content, data)
            
            # 记录通知历史
            self.notification_history.append({
                "notification_id": str(uuid.uuid4()),
                "type": notification_type.value,
                "recipient": recipient,
                "template_id": template_id,
                "subject": subject,
                "content": content,
                "data": data,
                "success": success,
                "sent_at": datetime.now().isoformat()
            })
            
            return success
            
        except Exception as e:
            self.logger.error(f"发送通知失败: {e}")
            return False
    
    async def _handle_email_notification(self, message_data: Dict[str, Any], 
                                       properties: Dict[str, Any]):
        """处理邮件通知请求"""
        try:
            recipient = message_data.get("recipient")
            subject = message_data.get("subject")
            content = message_data.get("content")
            data = message_data.get("data", {})
            
            channel = self.channels[NotificationType.EMAIL]
            success = await channel.send_notification(recipient, subject, content, data)
            
            self.logger.info(f"处理邮件通知: {recipient}, 成功: {success}")
            
        except Exception as e:
            self.logger.error(f"处理邮件通知失败: {e}")
    
    async def _handle_sms_notification(self, message_data: Dict[str, Any], 
                                     properties: Dict[str, Any]):
        """处理短信通知请求"""
        try:
            recipient = message_data.get("recipient")
            content = message_data.get("content")
            data = message_data.get("data", {})
            
            channel = self.channels[NotificationType.SMS]
            success = await channel.send_notification(recipient, "", content, data)
            
            self.logger.info(f"处理短信通知: {recipient}, 成功: {success}")
            
        except Exception as e:
            self.logger.error(f"处理短信通知失败: {e}")
    
    async def _handle_push_notification(self, message_data: Dict[str, Any], 
                                      properties: Dict[str, Any]):
        """处理推送通知请求"""
        try:
            recipient = message_data.get("recipient")
            subject = message_data.get("subject")
            content = message_data.get("content")
            data = message_data.get("data", {})
            
            channel = self.channels[NotificationType.PUSH]
            success = await channel.send_notification(recipient, subject, content, data)
            
            self.logger.info(f"处理推送通知: {recipient}, 成功: {success}")
            
        except Exception as e:
            self.logger.error(f"处理推送通知失败: {e}")
    
    async def _handle_order_created(self, message_data: Dict[str, Any], 
                                  properties: Dict[str, Any]):
        """处理订单创建事件,发送通知"""
        try:
            # 这里需要获取客户信息,实际项目中会从数据库查询
            # 为了演示,我们使用模拟数据
            customer_email = "customer@example.com"
            customer_phone = "13800138000"
            customer_name = "张三"
            
            notification_data = {
                "order_id": message_data.get("order_id"),
                "customer_name": customer_name,
                "amount": message_data.get("total_amount")
            }
            
            # 发送邮件通知
            await self.send_notification(
                NotificationType.EMAIL,
                customer_email,
                "order_created",
                notification_data
            )
            
            # 发送短信通知
            await self.send_notification(
                NotificationType.SMS,
                customer_phone,
                "order_created",
                notification_data
            )
            
        except Exception as e:
            self.logger.error(f"处理订单创建通知失败: {e}")
    
    async def _handle_payment_success(self, message_data: Dict[str, Any], 
                                    properties: Dict[str, Any]):
        """处理支付成功事件,发送通知"""
        try:
            customer_email = "customer@example.com"
            customer_name = "张三"
            
            notification_data = {
                "order_id": message_data.get("order_id"),
                "customer_name": customer_name,
                "amount": message_data.get("amount")
            }
            
            await self.send_notification(
                NotificationType.EMAIL,
                customer_email,
                "payment_success",
                notification_data
            )
            
        except Exception as e:
            self.logger.error(f"处理支付成功通知失败: {e}")
    
    async def _handle_payment_failed(self, message_data: Dict[str, Any], 
                                   properties: Dict[str, Any]):
        """处理支付失败事件,发送通知"""
        try:
            customer_email = "customer@example.com"
            customer_name = "张三"
            
            notification_data = {
                "order_id": message_data.get("order_id"),
                "customer_name": customer_name,
                "failure_reason": message_data.get("failure_reason", "未知原因")
            }
            
            await self.send_notification(
                NotificationType.EMAIL,
                customer_email,
                "payment_failed",
                notification_data
            )
            
        except Exception as e:
            self.logger.error(f"处理支付失败通知失败: {e}")
    
    async def _handle_order_cancelled(self, message_data: Dict[str, Any], 
                                    properties: Dict[str, Any]):
        """处理订单取消事件,发送通知"""
        try:
            customer_email = "customer@example.com"
            customer_name = "张三"
            
            notification_data = {
                "order_id": message_data.get("order_id"),
                "customer_name": customer_name,
                "reason": message_data.get("reason", "用户取消")
            }
            
            await self.send_notification(
                NotificationType.EMAIL,
                customer_email,
                "order_cancelled",
                notification_data
            )
            
        except Exception as e:
            self.logger.error(f"处理订单取消通知失败: {e}")
    
    def get_notification_history(self, recipient: str = None, 
                               limit: int = 100) -> List[Dict[str, Any]]:
        """获取通知历史"""
        history = self.notification_history
        
        if recipient:
            history = [n for n in history if n["recipient"] == recipient]
        
        return history[-limit:]
    
    def get_notification_stats(self) -> Dict[str, Any]:
        """获取通知统计"""
        total_notifications = len(self.notification_history)
        successful_notifications = len([
            n for n in self.notification_history if n["success"]
        ])
        
        type_stats = {}
        for notification in self.notification_history:
            notification_type = notification["type"]
            if notification_type not in type_stats:
                type_stats[notification_type] = {"total": 0, "success": 0}
            
            type_stats[notification_type]["total"] += 1
            if notification["success"]:
                type_stats[notification_type]["success"] += 1
        
        return {
            "total_notifications": total_notifications,
            "successful_notifications": successful_notifications,
            "success_rate": successful_notifications / total_notifications if total_notifications > 0 else 0,
            "type_stats": type_stats
         }

10.6 完整系统集成示例

class ECommerceSystem:
    """电商系统主类"""
    
    def __init__(self, rabbitmq_url: str = "amqp://localhost"):
        self.logger = logging.getLogger(__name__)
        
        # 初始化消息代理
        self.message_broker = ECommerceMessageBroker(rabbitmq_url)
        
        # 初始化各个服务
        self.order_service = OrderService(self.message_broker)
        
        # 初始化支付网关和支付服务
        self.payment_gateway = MockPaymentGateway(success_rate=0.85)
        self.payment_service = PaymentService(self.message_broker, self.payment_gateway)
        
        # 初始化库存服务
        self.inventory_service = InventoryService(self.message_broker)
        
        # 初始化通知服务
        self.notification_service = NotificationService(self.message_broker)
        
        # 系统状态
        self.is_running = False
    
    async def start(self):
        """启动系统"""
        try:
            self.logger.info("启动电商系统...")
            
            # 启动消息代理
            await self.message_broker.start()
            
            # 设置消息队列拓扑
            await self.message_broker.setup_topology()
            
            # 启动消息消费
            await self.message_broker.start_consuming()
            
            self.is_running = True
            self.logger.info("电商系统启动成功")
            
        except Exception as e:
            self.logger.error(f"启动系统失败: {e}")
            raise
    
    async def stop(self):
        """停止系统"""
        try:
            self.logger.info("停止电商系统...")
            
            # 停止消息消费
            await self.message_broker.stop_consuming()
            
            # 关闭连接
            await self.message_broker.close()
            
            self.is_running = False
            self.logger.info("电商系统已停止")
            
        except Exception as e:
            self.logger.error(f"停止系统失败: {e}")
    
    async def create_order(self, customer_id: str, items: List[Dict[str, Any]]) -> str:
        """创建订单"""
        try:
            # 构建订单项
            order_items = []
            total_amount = 0.0
            
            for item_data in items:
                product_id = item_data["product_id"]
                quantity = item_data["quantity"]
                price = item_data["price"]
                
                # 检查库存
                available = await self.inventory_service.check_availability(product_id, quantity)
                if not available:
                    raise ValueError(f"商品 {product_id} 库存不足")
                
                order_item = OrderItem(
                    product_id=product_id,
                    quantity=quantity,
                    unit_price=price,
                    total_price=price * quantity
                )
                order_items.append(order_item)
                total_amount += order_item.total_price
            
            # 创建订单
            order_id = await self.order_service.create_order(
                customer_id=customer_id,
                items=order_items,
                total_amount=total_amount
            )
            
            self.logger.info(f"订单创建成功: {order_id}")
            return order_id
            
        except Exception as e:
            self.logger.error(f"创建订单失败: {e}")
            raise
    
    async def process_order_workflow(self, order_id: str) -> Dict[str, Any]:
        """处理订单工作流"""
        try:
            # 获取订单信息
            order = self.order_service.get_order(order_id)
            if not order:
                raise ValueError(f"订单不存在: {order_id}")
            
            workflow_result = {
                "order_id": order_id,
                "steps": [],
                "success": False,
                "error": None
            }
            
            # 步骤1: 预留库存
            self.logger.info(f"步骤1: 为订单 {order_id} 预留库存")
            inventory_reserved = True
            
            for item in order.items:
                success = await self.inventory_service.reserve_inventory(
                    item.product_id, item.quantity, order_id
                )
                if not success:
                    inventory_reserved = False
                    break
            
            workflow_result["steps"].append({
                "step": "inventory_reservation",
                "success": inventory_reserved,
                "timestamp": datetime.now().isoformat()
            })
            
            if not inventory_reserved:
                workflow_result["error"] = "库存预留失败"
                await self.order_service.cancel_order(order_id, "库存不足")
                return workflow_result
            
            # 步骤2: 确认订单
            self.logger.info(f"步骤2: 确认订单 {order_id}")
            order_confirmed = await self.order_service.confirm_order(order_id)
            
            workflow_result["steps"].append({
                "step": "order_confirmation",
                "success": order_confirmed,
                "timestamp": datetime.now().isoformat()
            })
            
            if not order_confirmed:
                workflow_result["error"] = "订单确认失败"
                return workflow_result
            
            # 步骤3: 等待支付处理
            self.logger.info(f"步骤3: 等待支付处理 {order_id}")
            
            # 等待支付服务处理(在实际系统中,这会通过消息队列异步处理)
            await asyncio.sleep(2)
            
            # 检查支付状态
            payments = self.payment_service.get_order_payments(order_id)
            payment_success = any(p.status == PaymentStatus.SUCCESS for p in payments)
            
            workflow_result["steps"].append({
                "step": "payment_processing",
                "success": payment_success,
                "timestamp": datetime.now().isoformat()
            })
            
            if payment_success:
                # 步骤4: 确认库存扣减
                self.logger.info(f"步骤4: 确认库存扣减 {order_id}")
                
                for item in order.items:
                    await self.inventory_service.confirm_inventory(
                        item.product_id, item.quantity, order_id
                    )
                
                workflow_result["steps"].append({
                    "step": "inventory_confirmation",
                    "success": True,
                    "timestamp": datetime.now().isoformat()
                })
                
                workflow_result["success"] = True
                self.logger.info(f"订单 {order_id} 处理完成")
            else:
                # 支付失败,释放库存
                self.logger.warning(f"支付失败,释放库存: {order_id}")
                
                for item in order.items:
                    await self.inventory_service.release_inventory(
                        item.product_id, item.quantity, order_id
                    )
                
                workflow_result["error"] = "支付失败"
            
            return workflow_result
            
        except Exception as e:
            self.logger.error(f"处理订单工作流失败: {e}")
            workflow_result["error"] = str(e)
            return workflow_result
    
    def get_system_status(self) -> Dict[str, Any]:
        """获取系统状态"""
        return {
            "system_running": self.is_running,
            "message_broker_connected": self.message_broker.is_connected(),
            "services": {
                "order_service": {
                    "total_orders": len(self.order_service.orders),
                    "pending_orders": len(self.order_service.get_orders_by_status(OrderStatus.PENDING)),
                    "confirmed_orders": len(self.order_service.get_orders_by_status(OrderStatus.CONFIRMED)),
                    "completed_orders": len(self.order_service.get_orders_by_status(OrderStatus.COMPLETED))
                },
                "payment_service": {
                    "total_payments": len(self.payment_service.payment_requests),
                    "successful_payments": len([
                        p for p in self.payment_service.payment_requests.values()
                        if p.status == PaymentStatus.SUCCESS
                    ])
                },
                "inventory_service": {
                    "total_products": len(self.inventory_service.inventory),
                    "low_stock_products": len(self.inventory_service.get_low_stock_products())
                },
                "notification_service": self.notification_service.get_notification_stats()
            },
            "timestamp": datetime.now().isoformat()
        }

# 完整的系统演示示例
async def ecommerce_system_demo():
    """电商系统演示"""
    
    # 配置日志
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    logger = logging.getLogger(__name__)
    
    # 创建电商系统
    system = ECommerceSystem()
    
    try:
        # 启动系统
        logger.info("=== 启动电商系统 ===")
        await system.start()
        
        # 等待系统初始化
        await asyncio.sleep(1)
        
        # 创建测试订单
        logger.info("=== 创建测试订单 ===")
        
        test_orders = [
            {
                "customer_id": "customer_001",
                "items": [
                    {"product_id": "product_001", "quantity": 2, "price": 99.99},
                    {"product_id": "product_002", "quantity": 1, "price": 149.99}
                ]
            },
            {
                "customer_id": "customer_002",
                "items": [
                    {"product_id": "product_001", "quantity": 5, "price": 99.99}
                ]
            },
            {
                "customer_id": "customer_003",
                "items": [
                    {"product_id": "product_002", "quantity": 3, "price": 149.99}
                ]
            }
        ]
        
        created_orders = []
        
        for i, order_data in enumerate(test_orders, 1):
            try:
                logger.info(f"创建订单 {i}...")
                order_id = await system.create_order(
                    customer_id=order_data["customer_id"],
                    items=order_data["items"]
                )
                created_orders.append(order_id)
                logger.info(f"订单 {i} 创建成功: {order_id}")
                
                # 等待消息处理
                await asyncio.sleep(0.5)
                
            except Exception as e:
                logger.error(f"创建订单 {i} 失败: {e}")
        
        # 处理订单工作流
        logger.info("=== 处理订单工作流 ===")
        
        workflow_results = []
        for order_id in created_orders:
            logger.info(f"处理订单工作流: {order_id}")
            result = await system.process_order_workflow(order_id)
            workflow_results.append(result)
            
            logger.info(f"订单 {order_id} 工作流结果: {result['success']}")
            if not result['success']:
                logger.warning(f"订单 {order_id} 处理失败: {result['error']}")
            
            # 等待处理完成
            await asyncio.sleep(1)
        
        # 等待所有异步消息处理完成
        logger.info("=== 等待消息处理完成 ===")
        await asyncio.sleep(3)
        
        # 显示系统状态
        logger.info("=== 系统状态报告 ===")
        status = system.get_system_status()
        
        print("\n" + "="*50)
        print("电商系统状态报告")
        print("="*50)
        print(f"系统运行状态: {status['system_running']}")
        print(f"消息代理连接: {status['message_broker_connected']}")
        print("\n服务状态:")
        
        # 订单服务状态
        order_stats = status['services']['order_service']
        print(f"  订单服务:")
        print(f"    总订单数: {order_stats['total_orders']}")
        print(f"    待处理订单: {order_stats['pending_orders']}")
        print(f"    已确认订单: {order_stats['confirmed_orders']}")
        print(f"    已完成订单: {order_stats['completed_orders']}")
        
        # 支付服务状态
        payment_stats = status['services']['payment_service']
        print(f"  支付服务:")
        print(f"    总支付请求: {payment_stats['total_payments']}")
        print(f"    成功支付: {payment_stats['successful_payments']}")
        
        # 库存服务状态
        inventory_stats = status['services']['inventory_service']
        print(f"  库存服务:")
        print(f"    商品总数: {inventory_stats['total_products']}")
        print(f"    低库存商品: {inventory_stats['low_stock_products']}")
        
        # 通知服务状态
        notification_stats = status['services']['notification_service']
        print(f"  通知服务:")
        print(f"    总通知数: {notification_stats['total_notifications']}")
        print(f"    成功通知: {notification_stats['successful_notifications']}")
        print(f"    成功率: {notification_stats['success_rate']:.2%}")
        
        # 显示详细的订单信息
        print("\n" + "="*50)
        print("订单详细信息")
        print("="*50)
        
        for order_id in created_orders:
            order = system.order_service.get_order(order_id)
            if order:
                print(f"\n订单 {order_id}:")
                print(f"  客户ID: {order.customer_id}")
                print(f"  状态: {order.status.value}")
                print(f"  总金额: ¥{order.total_amount:.2f}")
                print(f"  创建时间: {order.created_at}")
                print(f"  商品数量: {len(order.items)}")
                
                # 显示支付信息
                payments = system.payment_service.get_order_payments(order_id)
                if payments:
                    payment = payments[0]  # 取第一个支付记录
                    print(f"  支付状态: {payment.status.value}")
                    print(f"  支付金额: ¥{payment.amount:.2f}")
        
        # 显示库存信息
        print("\n" + "="*50)
        print("库存信息")
        print("="*50)
        
        all_inventory = system.inventory_service.get_all_inventory()
        for product_id, inventory_item in all_inventory.items():
            print(f"\n商品 {product_id}:")
            print(f"  可用库存: {inventory_item.available_quantity}")
            print(f"  预留库存: {inventory_item.reserved_quantity}")
            print(f"  总库存: {inventory_item.total_quantity}")
            print(f"  仓库ID: {inventory_item.warehouse_id}")
        
        # 显示工作流结果
        print("\n" + "="*50)
        print("订单工作流结果")
        print("="*50)
        
        for result in workflow_results:
            print(f"\n订单 {result['order_id']}:")
            print(f"  处理结果: {'成功' if result['success'] else '失败'}")
            if result['error']:
                print(f"  错误信息: {result['error']}")
            
            print(f"  处理步骤:")
            for step in result['steps']:
                status_text = "✓" if step['success'] else "✗"
                print(f"    {status_text} {step['step']} - {step['timestamp']}")
        
        print("\n" + "="*50)
        print("演示完成")
        print("="*50)
        
    except Exception as e:
        logger.error(f"系统演示失败: {e}")
        raise
    
    finally:
        # 停止系统
        logger.info("=== 停止电商系统 ===")
        await system.stop()

# 运行演示
if __name__ == "__main__":
    asyncio.run(ecommerce_system_demo())

10.7 案例分析与最佳实践

10.7.1 架构设计分析

优点:

  1. 松耦合设计:各服务通过消息队列通信,降低了服务间的直接依赖
  2. 异步处理:支付、库存、通知等操作异步执行,提高系统响应速度
  3. 可扩展性:每个服务可以独立扩展,支持水平扩展
  4. 容错性:消息队列提供重试机制,提高系统可靠性
  5. 事件驱动:基于事件的架构,便于添加新功能和服务

改进建议:

  1. 分布式事务:引入Saga模式或两阶段提交处理分布式事务
  2. 服务发现:添加服务注册与发现机制
  3. 配置管理:使用配置中心管理各服务配置
  4. 监控告警:添加全链路监控和告警机制

10.7.2 消息队列使用最佳实践

1. 消息设计

# 好的消息设计
{
    "event_id": "uuid",
    "event_type": "order.created",
    "timestamp": "2024-01-01T00:00:00Z",
    "version": "1.0",
    "data": {
        "order_id": "order_123",
        "customer_id": "customer_456",
        "total_amount": 299.99
    },
    "metadata": {
        "source": "order-service",
        "correlation_id": "trace_789"
    }
}

2. 错误处理

# 实现重试和死信队列
async def handle_message_with_retry(self, message, max_retries=3):
    for attempt in range(max_retries):
        try:
            await self.process_message(message)
            return
        except Exception as e:
            if attempt == max_retries - 1:
                # 发送到死信队列
                await self.send_to_dlx(message, str(e))
            else:
                await asyncio.sleep(2 ** attempt)  # 指数退避

3. 消息幂等性

class IdempotentMessageHandler:
    def __init__(self):
        self.processed_messages = set()
    
    async def handle_message(self, message):
        message_id = message.get("event_id")
        
        if message_id in self.processed_messages:
            self.logger.info(f"消息已处理,跳过: {message_id}")
            return
        
        try:
            await self.process_message(message)
            self.processed_messages.add(message_id)
        except Exception as e:
            self.logger.error(f"处理消息失败: {e}")
            raise

10.7.3 性能优化建议

1. 连接池管理

class ConnectionPool:
    def __init__(self, max_connections=10):
        self.max_connections = max_connections
        self.connections = asyncio.Queue(maxsize=max_connections)
        self.created_connections = 0
    
    async def get_connection(self):
        if self.connections.empty() and self.created_connections < self.max_connections:
            connection = await self.create_connection()
            self.created_connections += 1
            return connection
        else:
            return await self.connections.get()
    
    async def return_connection(self, connection):
        await self.connections.put(connection)

2. 批量处理

class BatchProcessor:
    def __init__(self, batch_size=100, flush_interval=5.0):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.batch = []
        self.last_flush = time.time()
    
    async def add_message(self, message):
        self.batch.append(message)
        
        if (len(self.batch) >= self.batch_size or 
            time.time() - self.last_flush >= self.flush_interval):
            await self.flush_batch()
    
    async def flush_batch(self):
        if self.batch:
            await self.process_batch(self.batch)
            self.batch.clear()
            self.last_flush = time.time()

3. 消息压缩

import gzip
import json

class MessageCompressor:
    @staticmethod
    def compress_message(message):
        json_str = json.dumps(message)
        compressed = gzip.compress(json_str.encode('utf-8'))
        return compressed
    
    @staticmethod
    def decompress_message(compressed_data):
        decompressed = gzip.decompress(compressed_data)
        json_str = decompressed.decode('utf-8')
        return json.loads(json_str)

10.7.4 安全性考虑

1. 消息加密

from cryptography.fernet import Fernet

class MessageEncryption:
    def __init__(self, key):
        self.cipher = Fernet(key)
    
    def encrypt_message(self, message):
        json_str = json.dumps(message)
        encrypted = self.cipher.encrypt(json_str.encode())
        return encrypted
    
    def decrypt_message(self, encrypted_data):
        decrypted = self.cipher.decrypt(encrypted_data)
        json_str = decrypted.decode()
        return json.loads(json_str)

2. 访问控制

class AccessControl:
    def __init__(self):
        self.permissions = {
            "order-service": ["order.*", "payment.request_created"],
            "payment-service": ["payment.*", "order.confirmed"],
            "inventory-service": ["inventory.*"],
            "notification-service": ["notification.*", "*.success", "*.failed"]
        }
    
    def check_permission(self, service_name, routing_key):
        service_permissions = self.permissions.get(service_name, [])
        
        for permission in service_permissions:
            if self.match_pattern(permission, routing_key):
                return True
        
        return False
    
    def match_pattern(self, pattern, routing_key):
        # 实现通配符匹配逻辑
        import fnmatch
        return fnmatch.fnmatch(routing_key, pattern)

10.8 本章总结

核心知识点

  1. 微服务架构设计

    • 服务拆分原则
    • 消息队列作为服务间通信机制
    • 事件驱动架构模式
  2. 业务流程设计

    • 订单处理流程
    • 支付处理流程
    • 库存管理流程
    • 通知发送流程
  3. 消息队列应用

    • 异步消息处理
    • 事件发布与订阅
    • 消息路由与过滤
    • 错误处理与重试
  4. 系统集成

    • 服务协调
    • 数据一致性
    • 事务处理
    • 监控与运维

最佳实践

  1. 设计原则

    • 单一职责原则
    • 松耦合设计
    • 高内聚低耦合
    • 容错性设计
  2. 消息设计

    • 消息结构标准化
    • 版本兼容性
    • 幂等性保证
    • 消息追踪
  3. 性能优化

    • 连接池管理
    • 批量处理
    • 消息压缩
    • 缓存策略
  4. 安全性

    • 消息加密
    • 访问控制
    • 审计日志
    • 安全传输

实战经验

  1. 开发阶段

    • 先设计消息格式和接口
    • 使用模拟服务进行测试
    • 逐步集成各个服务
    • 完善错误处理机制
  2. 测试阶段

    • 单元测试覆盖核心逻辑
    • 集成测试验证服务协作
    • 压力测试评估性能
    • 故障注入测试容错性
  3. 部署阶段

    • 蓝绿部署减少停机时间
    • 监控关键指标
    • 日志聚合分析
    • 告警机制配置
  4. 运维阶段

    • 性能监控
    • 容量规划
    • 故障排查
    • 持续优化

练习题

  1. 基础练习

    • 实现一个简单的订单服务
    • 添加消息重试机制
    • 实现消息幂等性处理
  2. 进阶练习

    • 实现分布式事务处理
    • 添加服务熔断机制
    • 实现消息压缩和加密
  3. 综合练习

    • 设计完整的电商系统架构
    • 实现系统监控和告警
    • 进行性能测试和优化

通过本章的学习,你应该掌握了如何使用RabbitMQ构建复杂的分布式系统,理解了消息队列在微服务架构中的重要作用,并学会了相关的最佳实践和优化技巧。这些知识将为你在实际项目中应用RabbitMQ提供坚实的基础。