4.1 交换机类型详解

4.1.1 Direct Exchange(直连交换机)

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
import pika
import json
import time

class RoutingStrategy(Enum):
    """路由策略"""
    EXACT_MATCH = "exact"      # 精确匹配
    PREFIX_MATCH = "prefix"    # 前缀匹配
    SUFFIX_MATCH = "suffix"    # 后缀匹配
    PATTERN_MATCH = "pattern"  # 模式匹配

@dataclass
class DirectExchangeConfig:
    """直连交换机配置"""
    name: str
    durable: bool = True
    auto_delete: bool = False
    internal: bool = False
    arguments: Optional[Dict[str, Any]] = None

class DirectExchangeManager:
    """直连交换机管理器"""
    
    def __init__(self, channel):
        self.channel = channel
        self.exchanges = {}
        self.bindings = {}
    
    def declare_exchange(self, config: DirectExchangeConfig) -> bool:
        """声明直连交换机"""
        try:
            self.channel.exchange_declare(
                exchange=config.name,
                exchange_type='direct',
                durable=config.durable,
                auto_delete=config.auto_delete,
                internal=config.internal,
                arguments=config.arguments or {}
            )
            
            self.exchanges[config.name] = config
            print(f"✅ 直连交换机声明成功: {config.name}")
            return True
            
        except Exception as e:
            print(f"❌ 直连交换机声明失败: {e}")
            return False
    
    def bind_queue(self, exchange_name: str, queue_name: str, routing_key: str) -> bool:
        """绑定队列到交换机"""
        try:
            self.channel.queue_bind(
                exchange=exchange_name,
                queue=queue_name,
                routing_key=routing_key
            )
            
            # 记录绑定关系
            if exchange_name not in self.bindings:
                self.bindings[exchange_name] = []
            
            self.bindings[exchange_name].append({
                'queue': queue_name,
                'routing_key': routing_key,
                'type': 'direct'
            })
            
            print(f"✅ 队列绑定成功: {queue_name} -> {exchange_name} (路由键: {routing_key})")
            return True
            
        except Exception as e:
            print(f"❌ 队列绑定失败: {e}")
            return False
    
    def publish_message(self, exchange_name: str, routing_key: str, message: Dict[str, Any], 
                       properties: Optional[pika.BasicProperties] = None) -> bool:
        """发布消息到直连交换机"""
        try:
            body = json.dumps(message, ensure_ascii=False)
            
            self.channel.basic_publish(
                exchange=exchange_name,
                routing_key=routing_key,
                body=body.encode('utf-8'),
                properties=properties or pika.BasicProperties(
                    delivery_mode=2,  # 持久化
                    timestamp=int(time.time())
                )
            )
            
            print(f"✅ 消息发布成功: {exchange_name} (路由键: {routing_key})")
            return True
            
        except Exception as e:
            print(f"❌ 消息发布失败: {e}")
            return False
    
    def get_bindings(self, exchange_name: str) -> List[Dict[str, str]]:
        """获取交换机绑定关系"""
        return self.bindings.get(exchange_name, [])
    
    def create_routing_topology(self, topology_config: Dict[str, Any]) -> bool:
        """创建路由拓扑"""
        try:
            # 声明交换机
            exchange_config = DirectExchangeConfig(
                name=topology_config['exchange_name'],
                durable=topology_config.get('durable', True)
            )
            
            if not self.declare_exchange(exchange_config):
                return False
            
            # 声明队列并绑定
            for queue_config in topology_config.get('queues', []):
                queue_name = queue_config['name']
                routing_keys = queue_config.get('routing_keys', [])
                
                # 声明队列
                self.channel.queue_declare(
                    queue=queue_name,
                    durable=queue_config.get('durable', True)
                )
                
                # 绑定路由键
                for routing_key in routing_keys:
                    self.bind_queue(exchange_config.name, queue_name, routing_key)
            
            print(f"✅ 路由拓扑创建成功: {exchange_config.name}")
            return True
            
        except Exception as e:
            print(f"❌ 路由拓扑创建失败: {e}")
            return False

4.1.2 Topic Exchange(主题交换机)

import re
from typing import Pattern

@dataclass
class TopicExchangeConfig:
    """主题交换机配置"""
    name: str
    durable: bool = True
    auto_delete: bool = False
    internal: bool = False
    arguments: Optional[Dict[str, Any]] = None

class TopicExchangeManager:
    """主题交换机管理器"""
    
    def __init__(self, channel):
        self.channel = channel
        self.exchanges = {}
        self.bindings = {}
        self.routing_patterns = {}
    
    def declare_exchange(self, config: TopicExchangeConfig) -> bool:
        """声明主题交换机"""
        try:
            self.channel.exchange_declare(
                exchange=config.name,
                exchange_type='topic',
                durable=config.durable,
                auto_delete=config.auto_delete,
                internal=config.internal,
                arguments=config.arguments or {}
            )
            
            self.exchanges[config.name] = config
            print(f"✅ 主题交换机声明成功: {config.name}")
            return True
            
        except Exception as e:
            print(f"❌ 主题交换机声明失败: {e}")
            return False
    
    def bind_queue_with_pattern(self, exchange_name: str, queue_name: str, 
                               routing_pattern: str) -> bool:
        """使用模式绑定队列"""
        try:
            self.channel.queue_bind(
                exchange=exchange_name,
                queue=queue_name,
                routing_key=routing_pattern
            )
            
            # 记录绑定关系
            if exchange_name not in self.bindings:
                self.bindings[exchange_name] = []
            
            self.bindings[exchange_name].append({
                'queue': queue_name,
                'routing_pattern': routing_pattern,
                'type': 'topic'
            })
            
            # 编译正则表达式用于本地匹配测试
            regex_pattern = self._convert_topic_to_regex(routing_pattern)
            if exchange_name not in self.routing_patterns:
                self.routing_patterns[exchange_name] = []
            
            self.routing_patterns[exchange_name].append({
                'queue': queue_name,
                'pattern': routing_pattern,
                'regex': re.compile(regex_pattern)
            })
            
            print(f"✅ 主题队列绑定成功: {queue_name} -> {exchange_name} (模式: {routing_pattern})")
            return True
            
        except Exception as e:
            print(f"❌ 主题队列绑定失败: {e}")
            return False
    
    def _convert_topic_to_regex(self, topic_pattern: str) -> str:
        """将主题模式转换为正则表达式"""
        # 转义特殊字符
        pattern = re.escape(topic_pattern)
        # 替换通配符
        pattern = pattern.replace(r'\*', r'[^.]*')  # * 匹配一个单词
        pattern = pattern.replace(r'\#', r'.*')     # # 匹配零个或多个单词
        return f'^{pattern}$'
    
    def test_routing(self, exchange_name: str, routing_key: str) -> List[str]:
        """测试路由匹配"""
        matched_queues = []
        
        if exchange_name in self.routing_patterns:
            for binding in self.routing_patterns[exchange_name]:
                if binding['regex'].match(routing_key):
                    matched_queues.append(binding['queue'])
        
        return matched_queues
    
    def publish_message(self, exchange_name: str, routing_key: str, message: Dict[str, Any],
                       properties: Optional[pika.BasicProperties] = None) -> bool:
        """发布消息到主题交换机"""
        try:
            # 测试路由匹配
            matched_queues = self.test_routing(exchange_name, routing_key)
            print(f"📍 路由键 '{routing_key}' 将匹配队列: {matched_queues}")
            
            body = json.dumps(message, ensure_ascii=False)
            
            self.channel.basic_publish(
                exchange=exchange_name,
                routing_key=routing_key,
                body=body.encode('utf-8'),
                properties=properties or pika.BasicProperties(
                    delivery_mode=2,
                    timestamp=int(time.time())
                )
            )
            
            print(f"✅ 主题消息发布成功: {exchange_name} (路由键: {routing_key})")
            return True
            
        except Exception as e:
            print(f"❌ 主题消息发布失败: {e}")
            return False
    
    def create_topic_topology(self, topology_config: Dict[str, Any]) -> bool:
        """创建主题拓扑"""
        try:
            # 声明交换机
            exchange_config = TopicExchangeConfig(
                name=topology_config['exchange_name'],
                durable=topology_config.get('durable', True)
            )
            
            if not self.declare_exchange(exchange_config):
                return False
            
            # 声明队列并绑定
            for queue_config in topology_config.get('queues', []):
                queue_name = queue_config['name']
                routing_patterns = queue_config.get('routing_patterns', [])
                
                # 声明队列
                self.channel.queue_declare(
                    queue=queue_name,
                    durable=queue_config.get('durable', True)
                )
                
                # 绑定路由模式
                for pattern in routing_patterns:
                    self.bind_queue_with_pattern(exchange_config.name, queue_name, pattern)
            
            print(f"✅ 主题拓扑创建成功: {exchange_config.name}")
            return True
            
        except Exception as e:
            print(f"❌ 主题拓扑创建失败: {e}")
            return False

4.1.3 Fanout Exchange(扇出交换机)

@dataclass
class FanoutExchangeConfig:
    """扇出交换机配置"""
    name: str
    durable: bool = True
    auto_delete: bool = False
    internal: bool = False
    arguments: Optional[Dict[str, Any]] = None

class FanoutExchangeManager:
    """扇出交换机管理器"""
    
    def __init__(self, channel):
        self.channel = channel
        self.exchanges = {}
        self.bindings = {}
    
    def declare_exchange(self, config: FanoutExchangeConfig) -> bool:
        """声明扇出交换机"""
        try:
            self.channel.exchange_declare(
                exchange=config.name,
                exchange_type='fanout',
                durable=config.durable,
                auto_delete=config.auto_delete,
                internal=config.internal,
                arguments=config.arguments or {}
            )
            
            self.exchanges[config.name] = config
            print(f"✅ 扇出交换机声明成功: {config.name}")
            return True
            
        except Exception as e:
            print(f"❌ 扇出交换机声明失败: {e}")
            return False
    
    def bind_queue(self, exchange_name: str, queue_name: str) -> bool:
        """绑定队列到扇出交换机"""
        try:
            # 扇出交换机忽略路由键
            self.channel.queue_bind(
                exchange=exchange_name,
                queue=queue_name,
                routing_key=''  # 扇出交换机不使用路由键
            )
            
            # 记录绑定关系
            if exchange_name not in self.bindings:
                self.bindings[exchange_name] = []
            
            self.bindings[exchange_name].append({
                'queue': queue_name,
                'type': 'fanout'
            })
            
            print(f"✅ 扇出队列绑定成功: {queue_name} -> {exchange_name}")
            return True
            
        except Exception as e:
            print(f"❌ 扇出队列绑定失败: {e}")
            return False
    
    def broadcast_message(self, exchange_name: str, message: Dict[str, Any],
                         properties: Optional[pika.BasicProperties] = None) -> bool:
        """广播消息到所有绑定的队列"""
        try:
            # 获取绑定的队列数量
            bound_queues = self.bindings.get(exchange_name, [])
            queue_count = len(bound_queues)
            
            print(f"📢 广播消息到 {queue_count} 个队列")
            
            body = json.dumps(message, ensure_ascii=False)
            
            self.channel.basic_publish(
                exchange=exchange_name,
                routing_key='',  # 扇出交换机忽略路由键
                body=body.encode('utf-8'),
                properties=properties or pika.BasicProperties(
                    delivery_mode=2,
                    timestamp=int(time.time())
                )
            )
            
            print(f"✅ 扇出消息广播成功: {exchange_name}")
            return True
            
        except Exception as e:
            print(f"❌ 扇出消息广播失败: {e}")
            return False
    
    def create_fanout_topology(self, topology_config: Dict[str, Any]) -> bool:
        """创建扇出拓扑"""
        try:
            # 声明交换机
            exchange_config = FanoutExchangeConfig(
                name=topology_config['exchange_name'],
                durable=topology_config.get('durable', True)
            )
            
            if not self.declare_exchange(exchange_config):
                return False
            
            # 声明队列并绑定
            for queue_config in topology_config.get('queues', []):
                queue_name = queue_config['name']
                
                # 声明队列
                self.channel.queue_declare(
                    queue=queue_name,
                    durable=queue_config.get('durable', True)
                )
                
                # 绑定到扇出交换机
                self.bind_queue(exchange_config.name, queue_name)
            
            print(f"✅ 扇出拓扑创建成功: {exchange_config.name}")
            return True
            
        except Exception as e:
            print(f"❌ 扇出拓扑创建失败: {e}")
            return False
    
    def get_bound_queues(self, exchange_name: str) -> List[str]:
        """获取绑定的队列列表"""
        bindings = self.bindings.get(exchange_name, [])
        return [binding['queue'] for binding in bindings]

4.1.4 Headers Exchange(头部交换机)

@dataclass
class HeadersExchangeConfig:
    """头部交换机配置"""
    name: str
    durable: bool = True
    auto_delete: bool = False
    internal: bool = False
    arguments: Optional[Dict[str, Any]] = None

class HeadersExchangeManager:
    """头部交换机管理器"""
    
    def __init__(self, channel):
        self.channel = channel
        self.exchanges = {}
        self.bindings = {}
    
    def declare_exchange(self, config: HeadersExchangeConfig) -> bool:
        """声明头部交换机"""
        try:
            self.channel.exchange_declare(
                exchange=config.name,
                exchange_type='headers',
                durable=config.durable,
                auto_delete=config.auto_delete,
                internal=config.internal,
                arguments=config.arguments or {}
            )
            
            self.exchanges[config.name] = config
            print(f"✅ 头部交换机声明成功: {config.name}")
            return True
            
        except Exception as e:
            print(f"❌ 头部交换机声明失败: {e}")
            return False
    
    def bind_queue_with_headers(self, exchange_name: str, queue_name: str, 
                               headers: Dict[str, Any], match_all: bool = True) -> bool:
        """使用头部匹配绑定队列"""
        try:
            # 设置匹配模式
            arguments = headers.copy()
            arguments['x-match'] = 'all' if match_all else 'any'
            
            self.channel.queue_bind(
                exchange=exchange_name,
                queue=queue_name,
                routing_key='',  # 头部交换机不使用路由键
                arguments=arguments
            )
            
            # 记录绑定关系
            if exchange_name not in self.bindings:
                self.bindings[exchange_name] = []
            
            self.bindings[exchange_name].append({
                'queue': queue_name,
                'headers': headers,
                'match_all': match_all,
                'type': 'headers'
            })
            
            match_mode = "全部匹配" if match_all else "任意匹配"
            print(f"✅ 头部队列绑定成功: {queue_name} -> {exchange_name} ({match_mode}: {headers})")
            return True
            
        except Exception as e:
            print(f"❌ 头部队列绑定失败: {e}")
            return False
    
    def publish_message_with_headers(self, exchange_name: str, message: Dict[str, Any],
                                   headers: Dict[str, Any],
                                   properties: Optional[pika.BasicProperties] = None) -> bool:
        """发布带头部的消息"""
        try:
            # 测试头部匹配
            matched_queues = self.test_header_matching(exchange_name, headers)
            print(f"📍 头部 {headers} 将匹配队列: {matched_queues}")
            
            body = json.dumps(message, ensure_ascii=False)
            
            # 设置消息属性,包含头部
            if properties is None:
                properties = pika.BasicProperties()
            
            properties.headers = headers
            properties.delivery_mode = 2
            properties.timestamp = int(time.time())
            
            self.channel.basic_publish(
                exchange=exchange_name,
                routing_key='',  # 头部交换机不使用路由键
                body=body.encode('utf-8'),
                properties=properties
            )
            
            print(f"✅ 头部消息发布成功: {exchange_name} (头部: {headers})")
            return True
            
        except Exception as e:
            print(f"❌ 头部消息发布失败: {e}")
            return False
    
    def test_header_matching(self, exchange_name: str, message_headers: Dict[str, Any]) -> List[str]:
        """测试头部匹配"""
        matched_queues = []
        
        if exchange_name in self.bindings:
            for binding in self.bindings[exchange_name]:
                if self._match_headers(binding['headers'], message_headers, binding['match_all']):
                    matched_queues.append(binding['queue'])
        
        return matched_queues
    
    def _match_headers(self, binding_headers: Dict[str, Any], 
                      message_headers: Dict[str, Any], match_all: bool) -> bool:
        """匹配头部"""
        if not binding_headers:
            return True
        
        if match_all:
            # 全部匹配:绑定的所有头部都必须在消息头部中存在且值相等
            for key, value in binding_headers.items():
                if key not in message_headers or message_headers[key] != value:
                    return False
            return True
        else:
            # 任意匹配:绑定的任意一个头部在消息头部中存在且值相等即可
            for key, value in binding_headers.items():
                if key in message_headers and message_headers[key] == value:
                    return True
            return False
    
    def create_headers_topology(self, topology_config: Dict[str, Any]) -> bool:
        """创建头部拓扑"""
        try:
            # 声明交换机
            exchange_config = HeadersExchangeConfig(
                name=topology_config['exchange_name'],
                durable=topology_config.get('durable', True)
            )
            
            if not self.declare_exchange(exchange_config):
                return False
            
            # 声明队列并绑定
            for queue_config in topology_config.get('queues', []):
                queue_name = queue_config['name']
                headers_bindings = queue_config.get('headers_bindings', [])
                
                # 声明队列
                self.channel.queue_declare(
                    queue=queue_name,
                    durable=queue_config.get('durable', True)
                )
                
                # 绑定头部匹配
                for binding in headers_bindings:
                    headers = binding.get('headers', {})
                    match_all = binding.get('match_all', True)
                    self.bind_queue_with_headers(exchange_config.name, queue_name, headers, match_all)
            
            print(f"✅ 头部拓扑创建成功: {exchange_config.name}")
            return True
            
        except Exception as e:
            print(f"❌ 头部拓扑创建失败: {e}")
            return False

4.2 路由策略与模式

4.2.1 路由策略管理器

from abc import ABC, abstractmethod
from typing import Union

class RoutingStrategy(ABC):
    """路由策略基类"""
    
    @abstractmethod
    def route_message(self, exchange_manager, message: Dict[str, Any], 
                     routing_info: Dict[str, Any]) -> bool:
        """路由消息"""
        pass

class DirectRoutingStrategy(RoutingStrategy):
    """直连路由策略"""
    
    def route_message(self, exchange_manager: DirectExchangeManager, 
                     message: Dict[str, Any], routing_info: Dict[str, Any]) -> bool:
        """直连路由"""
        exchange_name = routing_info['exchange_name']
        routing_key = routing_info['routing_key']
        
        return exchange_manager.publish_message(exchange_name, routing_key, message)

class TopicRoutingStrategy(RoutingStrategy):
    """主题路由策略"""
    
    def route_message(self, exchange_manager: TopicExchangeManager,
                     message: Dict[str, Any], routing_info: Dict[str, Any]) -> bool:
        """主题路由"""
        exchange_name = routing_info['exchange_name']
        routing_key = routing_info['routing_key']
        
        return exchange_manager.publish_message(exchange_name, routing_key, message)

class FanoutRoutingStrategy(RoutingStrategy):
    """扇出路由策略"""
    
    def route_message(self, exchange_manager: FanoutExchangeManager,
                     message: Dict[str, Any], routing_info: Dict[str, Any]) -> bool:
        """扇出路由"""
        exchange_name = routing_info['exchange_name']
        
        return exchange_manager.broadcast_message(exchange_name, message)

class HeadersRoutingStrategy(RoutingStrategy):
    """头部路由策略"""
    
    def route_message(self, exchange_manager: HeadersExchangeManager,
                     message: Dict[str, Any], routing_info: Dict[str, Any]) -> bool:
        """头部路由"""
        exchange_name = routing_info['exchange_name']
        headers = routing_info['headers']
        
        return exchange_manager.publish_message_with_headers(exchange_name, message, headers)

class UniversalRoutingManager:
    """通用路由管理器"""
    
    def __init__(self, channel):
        self.channel = channel
        self.direct_manager = DirectExchangeManager(channel)
        self.topic_manager = TopicExchangeManager(channel)
        self.fanout_manager = FanoutExchangeManager(channel)
        self.headers_manager = HeadersExchangeManager(channel)
        
        self.strategies = {
            'direct': DirectRoutingStrategy(),
            'topic': TopicRoutingStrategy(),
            'fanout': FanoutRoutingStrategy(),
            'headers': HeadersRoutingStrategy()
        }
        
        self.managers = {
            'direct': self.direct_manager,
            'topic': self.topic_manager,
            'fanout': self.fanout_manager,
            'headers': self.headers_manager
        }
    
    def route_message(self, exchange_type: str, message: Dict[str, Any], 
                     routing_info: Dict[str, Any]) -> bool:
        """通用消息路由"""
        if exchange_type not in self.strategies:
            print(f"❌ 不支持的交换机类型: {exchange_type}")
            return False
        
        strategy = self.strategies[exchange_type]
        manager = self.managers[exchange_type]
        
        return strategy.route_message(manager, message, routing_info)
    
    def create_topology(self, exchange_type: str, topology_config: Dict[str, Any]) -> bool:
        """创建拓扑结构"""
        if exchange_type == 'direct':
            return self.direct_manager.create_routing_topology(topology_config)
        elif exchange_type == 'topic':
            return self.topic_manager.create_topic_topology(topology_config)
        elif exchange_type == 'fanout':
            return self.fanout_manager.create_fanout_topology(topology_config)
        elif exchange_type == 'headers':
            return self.headers_manager.create_headers_topology(topology_config)
        else:
            print(f"❌ 不支持的交换机类型: {exchange_type}")
            return False

4.2.2 路由模式示例

def routing_examples():
    """路由示例"""
    print("=== RabbitMQ路由示例 ===")
    
    # 连接配置
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host='localhost',
            port=5672,
            credentials=pika.PlainCredentials('admin', 'admin123')
        )
    )
    channel = connection.channel()
    
    try:
        # 创建通用路由管理器
        routing_manager = UniversalRoutingManager(channel)
        
        # 1. 直连路由示例
        print("\n1. 直连路由示例")
        direct_topology = {
            'exchange_name': 'order.direct',
            'durable': True,
            'queues': [
                {
                    'name': 'order.created.queue',
                    'routing_keys': ['order.created'],
                    'durable': True
                },
                {
                    'name': 'order.paid.queue',
                    'routing_keys': ['order.paid'],
                    'durable': True
                },
                {
                    'name': 'order.shipped.queue',
                    'routing_keys': ['order.shipped'],
                    'durable': True
                }
            ]
        }
        
        routing_manager.create_topology('direct', direct_topology)
        
        # 发送直连消息
        direct_messages = [
            {
                'message': {'order_id': '12345', 'user_id': '67890', 'amount': 99.99},
                'routing_info': {'exchange_name': 'order.direct', 'routing_key': 'order.created'}
            },
            {
                'message': {'order_id': '12345', 'payment_id': 'pay_123', 'amount': 99.99},
                'routing_info': {'exchange_name': 'order.direct', 'routing_key': 'order.paid'}
            }
        ]
        
        for msg_data in direct_messages:
            routing_manager.route_message('direct', msg_data['message'], msg_data['routing_info'])
        
        # 2. 主题路由示例
        print("\n2. 主题路由示例")
        topic_topology = {
            'exchange_name': 'log.topic',
            'durable': True,
            'queues': [
                {
                    'name': 'error.log.queue',
                    'routing_patterns': ['*.error.*', '*.critical.*'],
                    'durable': True
                },
                {
                    'name': 'app.log.queue',
                    'routing_patterns': ['app.*'],
                    'durable': True
                },
                {
                    'name': 'all.log.queue',
                    'routing_patterns': ['#'],
                    'durable': True
                }
            ]
        }
        
        routing_manager.create_topology('topic', topic_topology)
        
        # 发送主题消息
        topic_messages = [
            {
                'message': {'level': 'error', 'message': '数据库连接失败'},
                'routing_info': {'exchange_name': 'log.topic', 'routing_key': 'app.error.database'}
            },
            {
                'message': {'level': 'info', 'message': '用户登录成功'},
                'routing_info': {'exchange_name': 'log.topic', 'routing_key': 'app.info.auth'}
            },
            {
                'message': {'level': 'critical', 'message': '系统内存不足'},
                'routing_info': {'exchange_name': 'log.topic', 'routing_key': 'system.critical.memory'}
            }
        ]
        
        for msg_data in topic_messages:
            routing_manager.route_message('topic', msg_data['message'], msg_data['routing_info'])
        
        # 3. 扇出路由示例
        print("\n3. 扇出路由示例")
        fanout_topology = {
            'exchange_name': 'notification.fanout',
            'durable': True,
            'queues': [
                {'name': 'email.notification.queue', 'durable': True},
                {'name': 'sms.notification.queue', 'durable': True},
                {'name': 'push.notification.queue', 'durable': True}
            ]
        }
        
        routing_manager.create_topology('fanout', fanout_topology)
        
        # 发送扇出消息
        fanout_message = {
            'message': {
                'type': 'user_registered',
                'user_id': '12345',
                'email': 'user@example.com',
                'phone': '+1234567890'
            },
            'routing_info': {'exchange_name': 'notification.fanout'}
        }
        
        routing_manager.route_message('fanout', fanout_message['message'], fanout_message['routing_info'])
        
        # 4. 头部路由示例
        print("\n4. 头部路由示例")
        headers_topology = {
            'exchange_name': 'task.headers',
            'durable': True,
            'queues': [
                {
                    'name': 'urgent.task.queue',
                    'headers_bindings': [
                        {'headers': {'priority': 'high', 'urgent': True}, 'match_all': True}
                    ],
                    'durable': True
                },
                {
                    'name': 'image.task.queue',
                    'headers_bindings': [
                        {'headers': {'type': 'image'}, 'match_all': False}
                    ],
                    'durable': True
                },
                {
                    'name': 'video.task.queue',
                    'headers_bindings': [
                        {'headers': {'type': 'video'}, 'match_all': False}
                    ],
                    'durable': True
                }
            ]
        }
        
        routing_manager.create_topology('headers', headers_topology)
        
        # 发送头部消息
        headers_messages = [
            {
                'message': {'task_id': '001', 'content': '紧急图片处理任务'},
                'routing_info': {
                    'exchange_name': 'task.headers',
                    'headers': {'priority': 'high', 'urgent': True, 'type': 'image'}
                }
            },
            {
                'message': {'task_id': '002', 'content': '普通视频处理任务'},
                'routing_info': {
                    'exchange_name': 'task.headers',
                    'headers': {'priority': 'normal', 'type': 'video'}
                }
            }
        ]
        
        for msg_data in headers_messages:
            routing_manager.route_message('headers', msg_data['message'], msg_data['routing_info'])
        
        print("\n✅ 所有路由示例执行完成")
        
    except Exception as e:
        print(f"❌ 路由示例执行失败: {e}")
    finally:
        connection.close()

if __name__ == "__main__":
    routing_examples()

4.3 路由性能优化

4.3.1 路由性能监控器

import time
from collections import defaultdict, deque
from threading import Lock
from typing import Deque

@dataclass
class RoutingMetrics:
    """路由指标"""
    total_messages: int = 0
    successful_routes: int = 0
    failed_routes: int = 0
    average_latency: float = 0.0
    peak_latency: float = 0.0
    throughput_per_second: float = 0.0
    last_updated: float = 0.0

class RoutingPerformanceMonitor:
    """路由性能监控器"""
    
    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.metrics = defaultdict(RoutingMetrics)
        self.latency_history: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=window_size))
        self.throughput_history: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=60))  # 60秒窗口
        self.lock = Lock()
        self.start_time = time.time()
    
    def record_routing_start(self, exchange_name: str) -> float:
        """记录路由开始时间"""
        return time.time()
    
    def record_routing_end(self, exchange_name: str, start_time: float, success: bool = True):
        """记录路由结束"""
        end_time = time.time()
        latency = end_time - start_time
        
        with self.lock:
            metrics = self.metrics[exchange_name]
            metrics.total_messages += 1
            
            if success:
                metrics.successful_routes += 1
            else:
                metrics.failed_routes += 1
            
            # 更新延迟统计
            self.latency_history[exchange_name].append(latency)
            if latency > metrics.peak_latency:
                metrics.peak_latency = latency
            
            # 计算平均延迟
            latencies = list(self.latency_history[exchange_name])
            metrics.average_latency = sum(latencies) / len(latencies) if latencies else 0.0
            
            # 更新吞吐量
            current_time = time.time()
            self.throughput_history[exchange_name].append(current_time)
            
            # 计算每秒吞吐量
            recent_timestamps = [t for t in self.throughput_history[exchange_name] 
                               if current_time - t <= 1.0]
            metrics.throughput_per_second = len(recent_timestamps)
            
            metrics.last_updated = current_time
    
    def get_metrics(self, exchange_name: str) -> RoutingMetrics:
        """获取路由指标"""
        with self.lock:
            return self.metrics[exchange_name]
    
    def get_all_metrics(self) -> Dict[str, RoutingMetrics]:
        """获取所有路由指标"""
        with self.lock:
            return dict(self.metrics)
    
    def reset_metrics(self, exchange_name: str = None):
        """重置指标"""
        with self.lock:
            if exchange_name:
                self.metrics[exchange_name] = RoutingMetrics()
                self.latency_history[exchange_name].clear()
                self.throughput_history[exchange_name].clear()
            else:
                self.metrics.clear()
                self.latency_history.clear()
                self.throughput_history.clear()
    
    def print_metrics_report(self):
        """打印指标报告"""
        print("\n=== 路由性能报告 ===")
        
        with self.lock:
            for exchange_name, metrics in self.metrics.items():
                success_rate = (metrics.successful_routes / metrics.total_messages * 100) if metrics.total_messages > 0 else 0
                
                print(f"\n交换机: {exchange_name}")
                print(f"  总消息数: {metrics.total_messages}")
                print(f"  成功路由: {metrics.successful_routes}")
                print(f"  失败路由: {metrics.failed_routes}")
                print(f"  成功率: {success_rate:.2f}%")
                print(f"  平均延迟: {metrics.average_latency*1000:.2f}ms")
                print(f"  峰值延迟: {metrics.peak_latency*1000:.2f}ms")
                print(f"  吞吐量: {metrics.throughput_per_second:.2f} msg/s")
                print(f"  最后更新: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(metrics.last_updated))}")

class OptimizedRoutingManager(UniversalRoutingManager):
    """优化的路由管理器"""
    
    def __init__(self, channel):
        super().__init__(channel)
        self.performance_monitor = RoutingPerformanceMonitor()
        self.connection_pool = {}
        self.channel_pool = {}
    
    def route_message_with_monitoring(self, exchange_type: str, message: Dict[str, Any], 
                                    routing_info: Dict[str, Any]) -> bool:
        """带监控的消息路由"""
        exchange_name = routing_info.get('exchange_name', 'unknown')
        start_time = self.performance_monitor.record_routing_start(exchange_name)
        
        try:
            success = self.route_message(exchange_type, message, routing_info)
            self.performance_monitor.record_routing_end(exchange_name, start_time, success)
            return success
        except Exception as e:
            self.performance_monitor.record_routing_end(exchange_name, start_time, False)
            print(f"❌ 路由失败: {e}")
            return False
    
    def batch_route_messages(self, messages: List[Dict[str, Any]]) -> Dict[str, int]:
        """批量路由消息"""
        results = {'success': 0, 'failed': 0}
        
        for msg_data in messages:
            exchange_type = msg_data['exchange_type']
            message = msg_data['message']
            routing_info = msg_data['routing_info']
            
            if self.route_message_with_monitoring(exchange_type, message, routing_info):
                results['success'] += 1
            else:
                results['failed'] += 1
        
        return results
    
    def get_performance_report(self) -> Dict[str, Any]:
        """获取性能报告"""
        return {
            'metrics': self.performance_monitor.get_all_metrics(),
            'timestamp': time.time()
        }

4.3.2 路由缓存优化

from functools import lru_cache
from threading import RLock

class RoutingCache:
    """路由缓存"""
    
    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        self.cache = {}
        self.access_count = defaultdict(int)
        self.lock = RLock()
    
    def get_cached_route(self, cache_key: str) -> Optional[List[str]]:
        """获取缓存的路由"""
        with self.lock:
            if cache_key in self.cache:
                self.access_count[cache_key] += 1
                return self.cache[cache_key]
            return None
    
    def cache_route(self, cache_key: str, matched_queues: List[str]):
        """缓存路由结果"""
        with self.lock:
            if len(self.cache) >= self.max_size:
                # LRU淘汰策略
                least_used_key = min(self.access_count.keys(), key=lambda k: self.access_count[k])
                del self.cache[least_used_key]
                del self.access_count[least_used_key]
            
            self.cache[cache_key] = matched_queues
            self.access_count[cache_key] = 1
    
    def clear_cache(self):
        """清空缓存"""
        with self.lock:
            self.cache.clear()
            self.access_count.clear()
    
    def get_cache_stats(self) -> Dict[str, Any]:
        """获取缓存统计"""
        with self.lock:
            return {
                'cache_size': len(self.cache),
                'max_size': self.max_size,
                'hit_rate': sum(self.access_count.values()) / len(self.access_count) if self.access_count else 0
            }

class CachedTopicExchangeManager(TopicExchangeManager):
    """带缓存的主题交换机管理器"""
    
    def __init__(self, channel):
        super().__init__(channel)
        self.routing_cache = RoutingCache()
    
    def test_routing_with_cache(self, exchange_name: str, routing_key: str) -> List[str]:
        """带缓存的路由测试"""
        cache_key = f"{exchange_name}:{routing_key}"
        
        # 尝试从缓存获取
        cached_result = self.routing_cache.get_cached_route(cache_key)
        if cached_result is not None:
            return cached_result
        
        # 缓存未命中,计算路由
        matched_queues = self.test_routing(exchange_name, routing_key)
        
        # 缓存结果
        self.routing_cache.cache_route(cache_key, matched_queues)
        
        return matched_queues
    
    def publish_message(self, exchange_name: str, routing_key: str, message: Dict[str, Any],
                       properties: Optional[pika.BasicProperties] = None) -> bool:
        """发布消息(使用缓存)"""
        try:
            # 使用缓存的路由测试
            matched_queues = self.test_routing_with_cache(exchange_name, routing_key)
            print(f"📍 路由键 '{routing_key}' 将匹配队列: {matched_queues} (缓存)")
            
            body = json.dumps(message, ensure_ascii=False)
            
            self.channel.basic_publish(
                exchange=exchange_name,
                routing_key=routing_key,
                body=body.encode('utf-8'),
                properties=properties or pika.BasicProperties(
                    delivery_mode=2,
                    timestamp=int(time.time())
                )
            )
            
            print(f"✅ 主题消息发布成功: {exchange_name} (路由键: {routing_key})")
            return True
            
        except Exception as e:
            print(f"❌ 主题消息发布失败: {e}")
            return False

4.4 路由监控与诊断

4.4.1 路由诊断工具

class RoutingDiagnostics:
    """路由诊断工具"""
    
    def __init__(self, routing_manager: UniversalRoutingManager):
        self.routing_manager = routing_manager
        self.diagnostic_history = []
    
    def diagnose_routing(self, exchange_type: str, exchange_name: str, 
                        routing_key: str = None, headers: Dict[str, Any] = None) -> Dict[str, Any]:
        """诊断路由配置"""
        diagnosis = {
            'exchange_type': exchange_type,
            'exchange_name': exchange_name,
            'routing_key': routing_key,
            'headers': headers,
            'timestamp': time.time(),
            'issues': [],
            'recommendations': [],
            'matched_queues': []
        }
        
        try:
            if exchange_type == 'direct':
                manager = self.routing_manager.direct_manager
                if exchange_name in manager.bindings:
                    bindings = manager.bindings[exchange_name]
                    matched_queues = [b['queue'] for b in bindings if b['routing_key'] == routing_key]
                    diagnosis['matched_queues'] = matched_queues
                    
                    if not matched_queues:
                        diagnosis['issues'].append(f"没有队列绑定到路由键 '{routing_key}'")
                        diagnosis['recommendations'].append("检查队列绑定配置")
                else:
                    diagnosis['issues'].append(f"交换机 '{exchange_name}' 不存在")
            
            elif exchange_type == 'topic':
                manager = self.routing_manager.topic_manager
                if exchange_name in manager.routing_patterns:
                    matched_queues = manager.test_routing(exchange_name, routing_key)
                    diagnosis['matched_queues'] = matched_queues
                    
                    if not matched_queues:
                        diagnosis['issues'].append(f"路由键 '{routing_key}' 没有匹配任何模式")
                        diagnosis['recommendations'].append("检查路由模式配置")
                else:
                    diagnosis['issues'].append(f"交换机 '{exchange_name}' 不存在")
            
            elif exchange_type == 'fanout':
                manager = self.routing_manager.fanout_manager
                if exchange_name in manager.bindings:
                    matched_queues = manager.get_bound_queues(exchange_name)
                    diagnosis['matched_queues'] = matched_queues
                    
                    if not matched_queues:
                        diagnosis['issues'].append("没有队列绑定到扇出交换机")
                        diagnosis['recommendations'].append("添加队列绑定")
                else:
                    diagnosis['issues'].append(f"交换机 '{exchange_name}' 不存在")
            
            elif exchange_type == 'headers':
                manager = self.routing_manager.headers_manager
                if headers and exchange_name in manager.bindings:
                    matched_queues = manager.test_header_matching(exchange_name, headers)
                    diagnosis['matched_queues'] = matched_queues
                    
                    if not matched_queues:
                        diagnosis['issues'].append(f"头部 {headers} 没有匹配任何绑定")
                        diagnosis['recommendations'].append("检查头部匹配规则")
                else:
                    if not headers:
                        diagnosis['issues'].append("头部交换机需要提供头部信息")
                    else:
                        diagnosis['issues'].append(f"交换机 '{exchange_name}' 不存在")
            
            # 通用检查
            if len(diagnosis['matched_queues']) > 10:
                diagnosis['recommendations'].append("匹配的队列过多,考虑优化路由规则")
            
            if not diagnosis['issues']:
                diagnosis['status'] = 'healthy'
            else:
                diagnosis['status'] = 'issues_found'
        
        except Exception as e:
            diagnosis['issues'].append(f"诊断过程中发生错误: {e}")
            diagnosis['status'] = 'error'
        
        self.diagnostic_history.append(diagnosis)
        return diagnosis
    
    def generate_routing_report(self) -> str:
        """生成路由报告"""
        report = ["\n=== 路由诊断报告 ==="]
        
        for diagnosis in self.diagnostic_history[-10:]:  # 最近10次诊断
            report.append(f"\n时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(diagnosis['timestamp']))}")
            report.append(f"交换机: {diagnosis['exchange_name']} ({diagnosis['exchange_type']})")
            
            if diagnosis['routing_key']:
                report.append(f"路由键: {diagnosis['routing_key']}")
            if diagnosis['headers']:
                report.append(f"头部: {diagnosis['headers']}")
            
            report.append(f"状态: {diagnosis['status']}")
            report.append(f"匹配队列: {diagnosis['matched_queues']}")
            
            if diagnosis['issues']:
                report.append("问题:")
                for issue in diagnosis['issues']:
                    report.append(f"  - {issue}")
            
            if diagnosis['recommendations']:
                report.append("建议:")
                for rec in diagnosis['recommendations']:
                    report.append(f"  - {rec}")
            
            report.append("-" * 50)
        
        return "\n".join(report)
    
    def clear_history(self):
        """清空诊断历史"""
        self.diagnostic_history.clear()

4.4.2 路由监控示例

def routing_monitoring_example():
    """路由监控示例"""
    print("=== 路由监控示例 ===")
    
    # 连接配置
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host='localhost',
            port=5672,
            credentials=pika.PlainCredentials('admin', 'admin123')
        )
    )
    channel = connection.channel()
    
    try:
        # 创建优化的路由管理器
        routing_manager = OptimizedRoutingManager(channel)
        
        # 创建诊断工具
        diagnostics = RoutingDiagnostics(routing_manager)
        
        # 创建测试拓扑
        topic_topology = {
            'exchange_name': 'monitor.topic',
            'durable': True,
            'queues': [
                {
                    'name': 'error.monitor.queue',
                    'routing_patterns': ['*.error.*'],
                    'durable': True
                },
                {
                    'name': 'info.monitor.queue',
                    'routing_patterns': ['*.info.*'],
                    'durable': True
                }
            ]
        }
        
        routing_manager.create_topology('topic', topic_topology)
        
        # 测试消息路由
        test_messages = [
            {
                'exchange_type': 'topic',
                'message': {'level': 'error', 'message': '测试错误消息'},
                'routing_info': {'exchange_name': 'monitor.topic', 'routing_key': 'app.error.test'}
            },
            {
                'exchange_type': 'topic',
                'message': {'level': 'info', 'message': '测试信息消息'},
                'routing_info': {'exchange_name': 'monitor.topic', 'routing_key': 'app.info.test'}
            },
            {
                'exchange_type': 'topic',
                'message': {'level': 'debug', 'message': '测试调试消息'},
                'routing_info': {'exchange_name': 'monitor.topic', 'routing_key': 'app.debug.test'}
            }
        ]
        
        # 批量路由消息
        print("\n批量路由消息...")
        results = routing_manager.batch_route_messages(test_messages)
        print(f"路由结果: 成功 {results['success']}, 失败 {results['failed']}")
        
        # 性能监控
        print("\n性能监控报告:")
        routing_manager.performance_monitor.print_metrics_report()
        
        # 路由诊断
        print("\n路由诊断:")
        
        # 诊断正常路由
        diagnosis1 = diagnostics.diagnose_routing('topic', 'monitor.topic', 'app.error.database')
        print(f"诊断结果1: {diagnosis1['status']}, 匹配队列: {diagnosis1['matched_queues']}")
        
        # 诊断异常路由
        diagnosis2 = diagnostics.diagnose_routing('topic', 'monitor.topic', 'system.warning.memory')
        print(f"诊断结果2: {diagnosis2['status']}, 问题: {diagnosis2['issues']}")
        
        # 生成诊断报告
        print(diagnostics.generate_routing_report())
        
        print("\n✅ 路由监控示例完成")
        
    except Exception as e:
        print(f"❌ 路由监控示例失败: {e}")
    finally:
        connection.close()

if __name__ == "__main__":
    routing_examples()
    routing_monitoring_example()

4.5 本章总结

4.5.1 核心知识点

  1. 交换机类型

    • Direct Exchange:基于精确路由键匹配
    • Topic Exchange:基于模式匹配(* 和 # 通配符)
    • Fanout Exchange:广播到所有绑定队列
    • Headers Exchange:基于消息头部属性匹配
  2. 路由策略

    • 策略模式设计:不同交换机类型使用不同路由策略
    • 通用路由管理器:统一管理多种交换机类型
    • 路由拓扑管理:自动化创建和配置路由结构
  3. 性能优化

    • 路由缓存:缓存路由计算结果,提高性能
    • 性能监控:实时监控路由延迟和吞吐量
    • 批量处理:批量路由消息,减少网络开销
  4. 监控诊断

    • 路由诊断工具:自动检测路由配置问题
    • 性能指标收集:延迟、吞吐量、成功率统计
    • 问题排查:提供具体的问题描述和解决建议

4.5.2 最佳实践

  1. 交换机选择

    • 简单路由使用 Direct Exchange
    • 复杂模式匹配使用 Topic Exchange
    • 广播场景使用 Fanout Exchange
    • 复杂条件匹配使用 Headers Exchange
  2. 路由设计

    • 合理设计路由键命名规范
    • 避免过度复杂的路由模式
    • 考虑路由的可扩展性和维护性
  3. 性能优化

    • 使用路由缓存减少计算开销
    • 监控路由性能指标
    • 合理配置队列和交换机参数
  4. 运维监控

    • 定期检查路由配置
    • 监控路由性能指标
    • 建立路由问题排查流程

4.5.3 练习题

  1. 基础练习

    • 实现一个日志系统的路由配置,支持按级别和模块分发
    • 设计一个订单处理系统的路由策略
    • 创建一个通知系统的扇出路由
  2. 进阶练习

    • 实现一个智能路由系统,根据消息内容动态选择路由策略
    • 设计一个高性能的路由缓存系统
    • 创建一个路由监控和告警系统
  3. 实战练习

    • 构建一个微服务间的事件路由系统
    • 实现一个支持多租户的消息路由
    • 设计一个容错的路由系统,支持路由失败重试

通过本章学习,你应该掌握了 RabbitMQ 中各种交换机类型的使用方法、路由策略的设计原则,以及路由性能优化和监控的技巧。这些知识将为构建高效、可靠的消息路由系统奠定坚实基础。 “`