4.1 交换机类型详解
4.1.1 Direct Exchange(直连交换机)
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
import pika
import json
import time
class RoutingStrategy(Enum):
    """Routing strategy identifiers."""

    # Exact match
    EXACT_MATCH = "exact"
    # Prefix match
    PREFIX_MATCH = "prefix"
    # Suffix match
    SUFFIX_MATCH = "suffix"
    # Pattern match
    PATTERN_MATCH = "pattern"
@dataclass
class DirectExchangeConfig:
    """Settings used when declaring a direct exchange."""

    # Exchange name (required).
    name: str
    # Remaining fields are optional declaration flags with their defaults.
    durable: bool = True
    auto_delete: bool = False
    internal: bool = False
    # Extra declaration arguments; None means "no extra arguments".
    arguments: Optional[Dict[str, Any]] = None
class DirectExchangeManager:
    """Declare direct exchanges, bind queues to them and publish messages.

    Every operation prints a status line and reports its outcome as a bool;
    exceptions raised while talking to the channel are caught and turned into
    ``False``. Bindings made through this object are mirrored locally in
    ``self.bindings`` so they can be inspected without asking the broker.
    """

    def __init__(self, channel):
        """Keep ``channel`` and start with empty local bookkeeping."""
        self.channel = channel
        self.exchanges = {}  # exchange name -> config it was declared with
        self.bindings = {}   # exchange name -> list of binding records

    def declare_exchange(self, config: "DirectExchangeConfig") -> bool:
        """Declare a direct exchange described by ``config``.

        Returns:
            True when the declaration succeeded, False otherwise.
        """
        try:
            self.channel.exchange_declare(
                exchange=config.name,
                exchange_type='direct',
                durable=config.durable,
                auto_delete=config.auto_delete,
                internal=config.internal,
                arguments=config.arguments or {}
            )
            self.exchanges[config.name] = config
            print(f"✅ 直连交换机声明成功: {config.name}")
            return True
        except Exception as e:
            print(f"❌ 直连交换机声明失败: {e}")
            return False

    def bind_queue(self, exchange_name: str, queue_name: str, routing_key: str) -> bool:
        """Bind ``queue_name`` to ``exchange_name`` under ``routing_key``.

        The binding is also recorded locally. Repeating an identical binding
        does not add a second record.

        Returns:
            True when the bind call succeeded, False otherwise.
        """
        try:
            self.channel.queue_bind(
                exchange=exchange_name,
                queue=queue_name,
                routing_key=routing_key
            )
            record = {
                'queue': queue_name,
                'routing_key': routing_key,
                'type': 'direct'
            }
            recorded = self.bindings.setdefault(exchange_name, [])
            if record not in recorded:
                recorded.append(record)
            print(f"✅ 队列绑定成功: {queue_name} -> {exchange_name} (路由键: {routing_key})")
            return True
        except Exception as e:
            print(f"❌ 队列绑定失败: {e}")
            return False

    def publish_message(self, exchange_name: str, routing_key: str, message: Dict[str, Any],
                        properties: Optional["pika.BasicProperties"] = None) -> bool:
        """Publish ``message`` as UTF-8 JSON to a direct exchange.

        Args:
            exchange_name: Target exchange.
            routing_key: Routing key attached to the message.
            message: JSON-serialisable payload.
            properties: Message properties; when omitted the message is sent
                with ``delivery_mode=2`` and the current time as timestamp.

        Returns:
            True when the publish call returned normally, False otherwise.
        """
        try:
            body = json.dumps(message, ensure_ascii=False)
            self.channel.basic_publish(
                exchange=exchange_name,
                routing_key=routing_key,
                body=body.encode('utf-8'),
                properties=properties or pika.BasicProperties(
                    delivery_mode=2,  # persistent
                    timestamp=int(time.time())
                )
            )
            print(f"✅ 消息发布成功: {exchange_name} (路由键: {routing_key})")
            return True
        except Exception as e:
            print(f"❌ 消息发布失败: {e}")
            return False

    def get_bindings(self, exchange_name: str) -> List[Dict[str, str]]:
        """Return the locally recorded bindings of ``exchange_name`` (empty list if none)."""
        return self.bindings.get(exchange_name, [])

    def create_routing_topology(self, topology_config: Dict[str, Any]) -> bool:
        """Declare an exchange, its queues and their bindings from one dict.

        ``topology_config`` provides ``exchange_name``, optional ``durable``
        and a ``queues`` list whose items carry ``name``, optional ``durable``
        and ``routing_keys``.

        Returns:
            True only when the exchange, every queue and every binding were
            set up; False as soon as the exchange fails or if any binding
            failed.
        """
        try:
            exchange_config = DirectExchangeConfig(
                name=topology_config['exchange_name'],
                durable=topology_config.get('durable', True)
            )
            if not self.declare_exchange(exchange_config):
                return False

            failed_bindings = []
            for queue_config in topology_config.get('queues', []):
                queue_name = queue_config['name']
                self.channel.queue_declare(
                    queue=queue_name,
                    durable=queue_config.get('durable', True)
                )
                for routing_key in queue_config.get('routing_keys', []):
                    # bind_queue reports failure through its return value, so
                    # it has to be checked for the overall result to be honest.
                    if not self.bind_queue(exchange_config.name, queue_name, routing_key):
                        failed_bindings.append((queue_name, routing_key))

            if failed_bindings:
                print(f"❌ 路由拓扑创建失败: 以下绑定未成功 {failed_bindings}")
                return False
            print(f"✅ 路由拓扑创建成功: {exchange_config.name}")
            return True
        except Exception as e:
            print(f"❌ 路由拓扑创建失败: {e}")
            return False
4.1.2 Topic Exchange(主题交换机)
import re
from typing import Pattern
@dataclass
class TopicExchangeConfig:
    """Settings used when declaring a topic exchange."""

    # Exchange name (required).
    name: str
    # Remaining fields are optional declaration flags with their defaults.
    durable: bool = True
    auto_delete: bool = False
    internal: bool = False
    # Extra declaration arguments; None means "no extra arguments".
    arguments: Optional[Dict[str, Any]] = None
class TopicExchangeManager:
    """Declare topic exchanges, bind queues by pattern and publish messages.

    Besides talking to the channel, the manager keeps a compiled regular
    expression for every binding pattern so that :meth:`test_routing` can
    preview locally which queues a routing key would reach.
    """

    def __init__(self, channel):
        """Keep ``channel`` and start with empty local bookkeeping."""
        self.channel = channel
        self.exchanges = {}         # exchange name -> config it was declared with
        self.bindings = {}          # exchange name -> list of binding records
        self.routing_patterns = {}  # exchange name -> list of compiled patterns

    def declare_exchange(self, config: "TopicExchangeConfig") -> bool:
        """Declare a topic exchange described by ``config``.

        Returns:
            True when the declaration succeeded, False otherwise.
        """
        try:
            self.channel.exchange_declare(
                exchange=config.name,
                exchange_type='topic',
                durable=config.durable,
                auto_delete=config.auto_delete,
                internal=config.internal,
                arguments=config.arguments or {}
            )
            self.exchanges[config.name] = config
            print(f"✅ 主题交换机声明成功: {config.name}")
            return True
        except Exception as e:
            print(f"❌ 主题交换机声明失败: {e}")
            return False

    def bind_queue_with_pattern(self, exchange_name: str, queue_name: str,
                                routing_pattern: str) -> bool:
        """Bind ``queue_name`` to ``exchange_name`` using a topic pattern.

        The binding and its compiled pattern are recorded locally; repeating
        an identical binding does not add duplicate records.

        Returns:
            True when the bind call succeeded, False otherwise.
        """
        try:
            self.channel.queue_bind(
                exchange=exchange_name,
                queue=queue_name,
                routing_key=routing_pattern
            )

            record = {
                'queue': queue_name,
                'routing_pattern': routing_pattern,
                'type': 'topic'
            }
            recorded = self.bindings.setdefault(exchange_name, [])
            if record not in recorded:
                recorded.append(record)

            # Compile once at bind time so local match tests stay cheap.
            compiled = self.routing_patterns.setdefault(exchange_name, [])
            already_compiled = any(
                entry['queue'] == queue_name and entry['pattern'] == routing_pattern
                for entry in compiled
            )
            if not already_compiled:
                compiled.append({
                    'queue': queue_name,
                    'pattern': routing_pattern,
                    'regex': re.compile(self._convert_topic_to_regex(routing_pattern))
                })

            print(f"✅ 主题队列绑定成功: {queue_name} -> {exchange_name} (模式: {routing_pattern})")
            return True
        except Exception as e:
            print(f"❌ 主题队列绑定失败: {e}")
            return False

    def _convert_topic_to_regex(self, topic_pattern: str) -> str:
        """Translate a topic binding pattern into an anchored regex string.

        The pattern is processed word by word (words are separated by ``.``):

        * ``*`` stands for exactly one non-empty word;
        * ``#`` stands for zero or more words, so ``app.#`` matches ``app``
          as well as ``app.a.b`` and ``#.error`` matches a bare ``error``;
        * any other word is matched literally.
        """
        leading_hash = r'(?:[^.]+\.)*'   # words, each followed by its dot
        trailing_hash = r'(?:\.[^.]+)*'  # words, each preceded by its dot
        one_word = r'[^.]+'

        # Consecutive '#' words are equivalent to a single one.
        words = []
        for word in topic_pattern.split('.'):
            if word == '#' and words and words[-1] == '#':
                continue
            words.append(word)

        if words == ['#']:
            return '^.*$'

        parts = []
        seen_fixed_word = False
        for word in words:
            if word == '#':
                # The '#' absorbs the separator next to it, which is what
                # lets it collapse to zero words.
                parts.append(trailing_hash if seen_fixed_word else leading_hash)
                continue
            if seen_fixed_word:
                parts.append(r'\.')
            parts.append(one_word if word == '*' else re.escape(word))
            seen_fixed_word = True
        return '^' + ''.join(parts) + '$'

    def test_routing(self, exchange_name: str, routing_key: str) -> List[str]:
        """Return the queues whose recorded patterns match ``routing_key``.

        Each queue appears at most once, in binding order, even when several
        of its patterns match.
        """
        matched_queues = []
        for binding in self.routing_patterns.get(exchange_name, []):
            queue = binding['queue']
            if queue not in matched_queues and binding['regex'].match(routing_key):
                matched_queues.append(queue)
        return matched_queues

    def publish_message(self, exchange_name: str, routing_key: str, message: Dict[str, Any],
                        properties: Optional["pika.BasicProperties"] = None) -> bool:
        """Publish ``message`` as UTF-8 JSON to a topic exchange.

        Prints the locally predicted target queues first. When ``properties``
        is omitted the message is sent with ``delivery_mode=2`` and the
        current time as timestamp.

        Returns:
            True when the publish call returned normally, False otherwise.
        """
        try:
            matched_queues = self.test_routing(exchange_name, routing_key)
            print(f"📍 路由键 '{routing_key}' 将匹配队列: {matched_queues}")

            body = json.dumps(message, ensure_ascii=False)
            self.channel.basic_publish(
                exchange=exchange_name,
                routing_key=routing_key,
                body=body.encode('utf-8'),
                properties=properties or pika.BasicProperties(
                    delivery_mode=2,
                    timestamp=int(time.time())
                )
            )
            print(f"✅ 主题消息发布成功: {exchange_name} (路由键: {routing_key})")
            return True
        except Exception as e:
            print(f"❌ 主题消息发布失败: {e}")
            return False

    def create_topic_topology(self, topology_config: Dict[str, Any]) -> bool:
        """Declare an exchange, its queues and their pattern bindings.

        ``topology_config`` provides ``exchange_name``, optional ``durable``
        and a ``queues`` list whose items carry ``name``, optional ``durable``
        and ``routing_patterns``.

        Returns:
            True only when everything was set up; False if the exchange
            declaration or any binding failed.
        """
        try:
            exchange_config = TopicExchangeConfig(
                name=topology_config['exchange_name'],
                durable=topology_config.get('durable', True)
            )
            if not self.declare_exchange(exchange_config):
                return False

            failed_bindings = []
            for queue_config in topology_config.get('queues', []):
                queue_name = queue_config['name']
                self.channel.queue_declare(
                    queue=queue_name,
                    durable=queue_config.get('durable', True)
                )
                for pattern in queue_config.get('routing_patterns', []):
                    if not self.bind_queue_with_pattern(exchange_config.name, queue_name, pattern):
                        failed_bindings.append((queue_name, pattern))

            if failed_bindings:
                print(f"❌ 主题拓扑创建失败: 以下绑定未成功 {failed_bindings}")
                return False
            print(f"✅ 主题拓扑创建成功: {exchange_config.name}")
            return True
        except Exception as e:
            print(f"❌ 主题拓扑创建失败: {e}")
            return False
4.1.3 Fanout Exchange(扇出交换机)
@dataclass
class FanoutExchangeConfig:
    """Settings used when declaring a fanout exchange."""

    # Exchange name (required).
    name: str
    # Remaining fields are optional declaration flags with their defaults.
    durable: bool = True
    auto_delete: bool = False
    internal: bool = False
    # Extra declaration arguments; None means "no extra arguments".
    arguments: Optional[Dict[str, Any]] = None
class FanoutExchangeManager:
    """Declare fanout exchanges, bind queues to them and broadcast messages.

    Every operation prints a status line and reports its outcome as a bool;
    exceptions raised while talking to the channel are caught and turned into
    ``False``. Bindings made through this object are mirrored locally.
    """

    def __init__(self, channel):
        """Keep ``channel`` and start with empty local bookkeeping."""
        self.channel = channel
        self.exchanges = {}  # exchange name -> config it was declared with
        self.bindings = {}   # exchange name -> list of binding records

    def declare_exchange(self, config: "FanoutExchangeConfig") -> bool:
        """Declare a fanout exchange described by ``config``.

        Returns:
            True when the declaration succeeded, False otherwise.
        """
        try:
            self.channel.exchange_declare(
                exchange=config.name,
                exchange_type='fanout',
                durable=config.durable,
                auto_delete=config.auto_delete,
                internal=config.internal,
                arguments=config.arguments or {}
            )
            self.exchanges[config.name] = config
            print(f"✅ 扇出交换机声明成功: {config.name}")
            return True
        except Exception as e:
            print(f"❌ 扇出交换机声明失败: {e}")
            return False

    def bind_queue(self, exchange_name: str, queue_name: str) -> bool:
        """Bind ``queue_name`` to a fanout exchange with an empty routing key.

        The binding is recorded locally once; binding the same queue again
        does not add a second record, so queue counts stay accurate.

        Returns:
            True when the bind call succeeded, False otherwise.
        """
        try:
            self.channel.queue_bind(
                exchange=exchange_name,
                queue=queue_name,
                routing_key=''  # fanout bindings do not use a routing key
            )
            record = {
                'queue': queue_name,
                'type': 'fanout'
            }
            recorded = self.bindings.setdefault(exchange_name, [])
            if record not in recorded:
                recorded.append(record)
            print(f"✅ 扇出队列绑定成功: {queue_name} -> {exchange_name}")
            return True
        except Exception as e:
            print(f"❌ 扇出队列绑定失败: {e}")
            return False

    def broadcast_message(self, exchange_name: str, message: Dict[str, Any],
                          properties: Optional["pika.BasicProperties"] = None) -> bool:
        """Publish ``message`` as UTF-8 JSON to a fanout exchange.

        Prints how many queues are recorded locally for the exchange. When
        ``properties`` is omitted the message is sent with ``delivery_mode=2``
        and the current time as timestamp.

        Returns:
            True when the publish call returned normally, False otherwise.
        """
        try:
            queue_count = len(self.bindings.get(exchange_name, []))
            print(f"📢 广播消息到 {queue_count} 个队列")

            body = json.dumps(message, ensure_ascii=False)
            self.channel.basic_publish(
                exchange=exchange_name,
                routing_key='',  # not used by fanout exchanges
                body=body.encode('utf-8'),
                properties=properties or pika.BasicProperties(
                    delivery_mode=2,
                    timestamp=int(time.time())
                )
            )
            print(f"✅ 扇出消息广播成功: {exchange_name}")
            return True
        except Exception as e:
            print(f"❌ 扇出消息广播失败: {e}")
            return False

    def create_fanout_topology(self, topology_config: Dict[str, Any]) -> bool:
        """Declare an exchange and its queues and bind every queue to it.

        ``topology_config`` provides ``exchange_name``, optional ``durable``
        and a ``queues`` list whose items carry ``name`` and optional
        ``durable``.

        Returns:
            True only when everything was set up; False if the exchange
            declaration or any binding failed.
        """
        try:
            exchange_config = FanoutExchangeConfig(
                name=topology_config['exchange_name'],
                durable=topology_config.get('durable', True)
            )
            if not self.declare_exchange(exchange_config):
                return False

            failed_queues = []
            for queue_config in topology_config.get('queues', []):
                queue_name = queue_config['name']
                self.channel.queue_declare(
                    queue=queue_name,
                    durable=queue_config.get('durable', True)
                )
                if not self.bind_queue(exchange_config.name, queue_name):
                    failed_queues.append(queue_name)

            if failed_queues:
                print(f"❌ 扇出拓扑创建失败: 以下队列绑定未成功 {failed_queues}")
                return False
            print(f"✅ 扇出拓扑创建成功: {exchange_config.name}")
            return True
        except Exception as e:
            print(f"❌ 扇出拓扑创建失败: {e}")
            return False

    def get_bound_queues(self, exchange_name: str) -> List[str]:
        """Return the names of the queues recorded as bound to ``exchange_name``."""
        return [binding['queue'] for binding in self.bindings.get(exchange_name, [])]
4.1.4 Headers Exchange(头部交换机)
@dataclass
class HeadersExchangeConfig:
    """Settings used when declaring a headers exchange."""

    # Exchange name (required).
    name: str
    # Remaining fields are optional declaration flags with their defaults.
    durable: bool = True
    auto_delete: bool = False
    internal: bool = False
    # Extra declaration arguments; None means "no extra arguments".
    arguments: Optional[Dict[str, Any]] = None
class HeadersExchangeManager:
    """Declare headers exchanges, bind queues by header rules and publish.

    Every operation prints a status line and reports its outcome as a bool;
    exceptions raised while talking to the channel are caught and turned into
    ``False``. Binding rules are mirrored locally so that
    :meth:`test_header_matching` can preview which queues a set of message
    headers would reach.
    """

    def __init__(self, channel):
        """Keep ``channel`` and start with empty local bookkeeping."""
        self.channel = channel
        self.exchanges = {}  # exchange name -> config it was declared with
        self.bindings = {}   # exchange name -> list of binding records

    def declare_exchange(self, config: "HeadersExchangeConfig") -> bool:
        """Declare a headers exchange described by ``config``.

        Returns:
            True when the declaration succeeded, False otherwise.
        """
        try:
            self.channel.exchange_declare(
                exchange=config.name,
                exchange_type='headers',
                durable=config.durable,
                auto_delete=config.auto_delete,
                internal=config.internal,
                arguments=config.arguments or {}
            )
            self.exchanges[config.name] = config
            print(f"✅ 头部交换机声明成功: {config.name}")
            return True
        except Exception as e:
            print(f"❌ 头部交换机声明失败: {e}")
            return False

    def bind_queue_with_headers(self, exchange_name: str, queue_name: str,
                                headers: Dict[str, Any], match_all: bool = True) -> bool:
        """Bind ``queue_name`` to ``exchange_name`` with a header rule.

        Args:
            exchange_name: Exchange to bind to.
            queue_name: Queue being bound.
            headers: Header names/values the rule is built from. The dict is
                copied, never modified or retained.
            match_all: True sends ``x-match=all``, False sends ``x-match=any``.

        Returns:
            True when the bind call succeeded, False otherwise.
        """
        try:
            arguments = headers.copy()
            arguments['x-match'] = 'all' if match_all else 'any'
            self.channel.queue_bind(
                exchange=exchange_name,
                queue=queue_name,
                routing_key='',  # headers bindings do not use a routing key
                arguments=arguments
            )

            # Record a private copy so later changes to the caller's dict
            # cannot silently alter the local view of the binding.
            record = {
                'queue': queue_name,
                'headers': dict(headers),
                'match_all': match_all,
                'type': 'headers'
            }
            recorded = self.bindings.setdefault(exchange_name, [])
            if record not in recorded:
                recorded.append(record)

            match_mode = "全部匹配" if match_all else "任意匹配"
            print(f"✅ 头部队列绑定成功: {queue_name} -> {exchange_name} ({match_mode}: {headers})")
            return True
        except Exception as e:
            print(f"❌ 头部队列绑定失败: {e}")
            return False

    def publish_message_with_headers(self, exchange_name: str, message: Dict[str, Any],
                                     headers: Dict[str, Any],
                                     properties: Optional["pika.BasicProperties"] = None) -> bool:
        """Publish ``message`` as UTF-8 JSON with routing ``headers``.

        Prints the locally predicted target queues first. ``headers`` is
        always placed on the message properties. ``delivery_mode`` (2) and
        ``timestamp`` (now) are only filled in when the supplied properties
        leave them unset, so explicit caller choices are kept.

        Returns:
            True when the publish call returned normally, False otherwise.
        """
        try:
            matched_queues = self.test_header_matching(exchange_name, headers)
            print(f"📍 头部 {headers} 将匹配队列: {matched_queues}")

            body = json.dumps(message, ensure_ascii=False)

            if properties is None:
                properties = pika.BasicProperties()
            properties.headers = headers
            if properties.delivery_mode is None:
                properties.delivery_mode = 2
            if properties.timestamp is None:
                properties.timestamp = int(time.time())

            self.channel.basic_publish(
                exchange=exchange_name,
                routing_key='',  # headers exchanges do not use a routing key
                body=body.encode('utf-8'),
                properties=properties
            )
            print(f"✅ 头部消息发布成功: {exchange_name} (头部: {headers})")
            return True
        except Exception as e:
            print(f"❌ 头部消息发布失败: {e}")
            return False

    def test_header_matching(self, exchange_name: str, message_headers: Dict[str, Any]) -> List[str]:
        """Return the queues whose recorded header rules accept ``message_headers``.

        ``None`` is treated as "no headers". Queues are listed in binding
        order, each at most once.
        """
        message_headers = message_headers or {}
        matched_queues = []
        for binding in self.bindings.get(exchange_name, []):
            queue = binding['queue']
            if queue in matched_queues:
                continue
            if self._match_headers(binding['headers'], message_headers, binding['match_all']):
                matched_queues.append(queue)
        return matched_queues

    def _match_headers(self, binding_headers: Dict[str, Any],
                       message_headers: Dict[str, Any], match_all: bool) -> bool:
        """Evaluate one header rule against a message's headers.

        An empty rule accepts everything. Otherwise ``match_all=True``
        requires every rule header to be present with an equal value, while
        ``match_all=False`` is satisfied by a single such header.
        """
        if not binding_headers:
            return True
        hits = (
            key in message_headers and message_headers[key] == value
            for key, value in binding_headers.items()
        )
        return all(hits) if match_all else any(hits)

    def create_headers_topology(self, topology_config: Dict[str, Any]) -> bool:
        """Declare an exchange, its queues and their header bindings.

        ``topology_config`` provides ``exchange_name``, optional ``durable``
        and a ``queues`` list whose items carry ``name``, optional ``durable``
        and ``headers_bindings`` (dicts with ``headers`` and optional
        ``match_all``).

        Returns:
            True only when everything was set up; False if the exchange
            declaration or any binding failed.
        """
        try:
            exchange_config = HeadersExchangeConfig(
                name=topology_config['exchange_name'],
                durable=topology_config.get('durable', True)
            )
            if not self.declare_exchange(exchange_config):
                return False

            failed_bindings = []
            for queue_config in topology_config.get('queues', []):
                queue_name = queue_config['name']
                self.channel.queue_declare(
                    queue=queue_name,
                    durable=queue_config.get('durable', True)
                )
                for binding in queue_config.get('headers_bindings', []):
                    headers = binding.get('headers', {})
                    match_all = binding.get('match_all', True)
                    if not self.bind_queue_with_headers(exchange_config.name, queue_name,
                                                        headers, match_all):
                        failed_bindings.append((queue_name, headers))

            if failed_bindings:
                print(f"❌ 头部拓扑创建失败: 以下绑定未成功 {failed_bindings}")
                return False
            print(f"✅ 头部拓扑创建成功: {exchange_config.name}")
            return True
        except Exception as e:
            print(f"❌ 头部拓扑创建失败: {e}")
            return False
4.2 路由策略与模式
4.2.1 路由策略管理器
from abc import ABC, abstractmethod
from typing import Union
class RoutingStrategy(ABC):
    """Abstract base class for routing strategies."""

    @abstractmethod
    def route_message(self, exchange_manager, message: Dict[str, Any],
                      routing_info: Dict[str, Any]) -> bool:
        """Route ``message`` through ``exchange_manager`` using ``routing_info``.

        Concrete strategies must override this method.
        """
        return None
class DirectRoutingStrategy(RoutingStrategy):
    """Routing strategy for direct exchanges."""

    def route_message(self, exchange_manager: DirectExchangeManager,
                      message: Dict[str, Any], routing_info: Dict[str, Any]) -> bool:
        """Publish ``message`` via the manager's ``publish_message``.

        ``routing_info`` must contain ``exchange_name`` and ``routing_key``.
        """
        target_exchange, key = routing_info['exchange_name'], routing_info['routing_key']
        return exchange_manager.publish_message(target_exchange, key, message)
class TopicRoutingStrategy(RoutingStrategy):
    """Routing strategy for topic exchanges."""

    def route_message(self, exchange_manager: TopicExchangeManager,
                      message: Dict[str, Any], routing_info: Dict[str, Any]) -> bool:
        """Publish ``message`` via the manager's ``publish_message``.

        ``routing_info`` must contain ``exchange_name`` and ``routing_key``.
        """
        target_exchange, key = routing_info['exchange_name'], routing_info['routing_key']
        return exchange_manager.publish_message(target_exchange, key, message)
class FanoutRoutingStrategy(RoutingStrategy):
    """Routing strategy for fanout exchanges."""

    def route_message(self, exchange_manager: FanoutExchangeManager,
                      message: Dict[str, Any], routing_info: Dict[str, Any]) -> bool:
        """Broadcast ``message`` via the manager's ``broadcast_message``.

        ``routing_info`` must contain ``exchange_name``.
        """
        return exchange_manager.broadcast_message(routing_info['exchange_name'], message)
class HeadersRoutingStrategy(RoutingStrategy):
    """Routing strategy for headers exchanges."""

    def route_message(self, exchange_manager: HeadersExchangeManager,
                      message: Dict[str, Any], routing_info: Dict[str, Any]) -> bool:
        """Publish ``message`` via the manager's ``publish_message_with_headers``.

        ``routing_info`` must contain ``exchange_name`` and ``headers``.
        """
        target_exchange, message_headers = routing_info['exchange_name'], routing_info['headers']
        return exchange_manager.publish_message_with_headers(
            target_exchange, message, message_headers)
class UniversalRoutingManager:
    """Single entry point over the four exchange-type managers."""

    def __init__(self, channel):
        """Create one manager and one strategy per exchange type on ``channel``."""
        self.channel = channel

        self.direct_manager = DirectExchangeManager(channel)
        self.topic_manager = TopicExchangeManager(channel)
        self.fanout_manager = FanoutExchangeManager(channel)
        self.headers_manager = HeadersExchangeManager(channel)

        # Both lookup tables are keyed by the exchange type name.
        self.strategies = {
            'direct': DirectRoutingStrategy(),
            'topic': TopicRoutingStrategy(),
            'fanout': FanoutRoutingStrategy(),
            'headers': HeadersRoutingStrategy()
        }
        self.managers = {
            'direct': self.direct_manager,
            'topic': self.topic_manager,
            'fanout': self.fanout_manager,
            'headers': self.headers_manager
        }

    def route_message(self, exchange_type: str, message: Dict[str, Any],
                      routing_info: Dict[str, Any]) -> bool:
        """Route ``message`` with the strategy registered for ``exchange_type``.

        Returns False (after printing a notice) for an unregistered type,
        otherwise whatever the strategy returns.
        """
        if exchange_type in self.strategies:
            chosen_strategy = self.strategies[exchange_type]
            chosen_manager = self.managers[exchange_type]
            return chosen_strategy.route_message(chosen_manager, message, routing_info)

        print(f"❌ 不支持的交换机类型: {exchange_type}")
        return False

    def create_topology(self, exchange_type: str, topology_config: Dict[str, Any]) -> bool:
        """Build a topology with the manager matching ``exchange_type``.

        Returns False (after printing a notice) for an unsupported type,
        otherwise the result of the manager's topology builder.
        """
        builders = {
            'direct': self.direct_manager.create_routing_topology,
            'topic': self.topic_manager.create_topic_topology,
            'fanout': self.fanout_manager.create_fanout_topology,
            'headers': self.headers_manager.create_headers_topology,
        }
        build = builders.get(exchange_type)
        if build is None:
            print(f"❌ 不支持的交换机类型: {exchange_type}")
            return False
        return build(topology_config)
4.2.2 路由模式示例
def routing_examples():
    """Walk through one example per exchange type against a local broker.

    Connects to ``localhost:5672`` with the demo credentials below, builds a
    direct, topic, fanout and headers topology in turn and publishes sample
    messages to each. Failures are printed, not raised, and the connection is
    always closed once it has been opened.
    """
    print("=== RabbitMQ路由示例 ===")

    # Connection settings (demo credentials for a local broker).
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host='localhost',
            port=5672,
            credentials=pika.PlainCredentials('admin', 'admin123')
        )
    )

    try:
        # Opened inside the try block so that a failure to open the channel
        # still reaches the finally clause and closes the connection.
        channel = connection.channel()
        routing_manager = UniversalRoutingManager(channel)

        # 1. Direct routing
        print("\n1. 直连路由示例")
        direct_topology = {
            'exchange_name': 'order.direct',
            'durable': True,
            'queues': [
                {
                    'name': 'order.created.queue',
                    'routing_keys': ['order.created'],
                    'durable': True
                },
                {
                    'name': 'order.paid.queue',
                    'routing_keys': ['order.paid'],
                    'durable': True
                },
                {
                    'name': 'order.shipped.queue',
                    'routing_keys': ['order.shipped'],
                    'durable': True
                }
            ]
        }
        routing_manager.create_topology('direct', direct_topology)

        direct_messages = [
            {
                'message': {'order_id': '12345', 'user_id': '67890', 'amount': 99.99},
                'routing_info': {'exchange_name': 'order.direct', 'routing_key': 'order.created'}
            },
            {
                'message': {'order_id': '12345', 'payment_id': 'pay_123', 'amount': 99.99},
                'routing_info': {'exchange_name': 'order.direct', 'routing_key': 'order.paid'}
            }
        ]
        for msg_data in direct_messages:
            routing_manager.route_message('direct', msg_data['message'], msg_data['routing_info'])

        # 2. Topic routing
        print("\n2. 主题路由示例")
        topic_topology = {
            'exchange_name': 'log.topic',
            'durable': True,
            'queues': [
                {
                    'name': 'error.log.queue',
                    'routing_patterns': ['*.error.*', '*.critical.*'],
                    'durable': True
                },
                {
                    'name': 'app.log.queue',
                    'routing_patterns': ['app.*'],
                    'durable': True
                },
                {
                    'name': 'all.log.queue',
                    'routing_patterns': ['#'],
                    'durable': True
                }
            ]
        }
        routing_manager.create_topology('topic', topic_topology)

        topic_messages = [
            {
                'message': {'level': 'error', 'message': '数据库连接失败'},
                'routing_info': {'exchange_name': 'log.topic', 'routing_key': 'app.error.database'}
            },
            {
                'message': {'level': 'info', 'message': '用户登录成功'},
                'routing_info': {'exchange_name': 'log.topic', 'routing_key': 'app.info.auth'}
            },
            {
                'message': {'level': 'critical', 'message': '系统内存不足'},
                'routing_info': {'exchange_name': 'log.topic', 'routing_key': 'system.critical.memory'}
            }
        ]
        for msg_data in topic_messages:
            routing_manager.route_message('topic', msg_data['message'], msg_data['routing_info'])

        # 3. Fanout routing
        print("\n3. 扇出路由示例")
        fanout_topology = {
            'exchange_name': 'notification.fanout',
            'durable': True,
            'queues': [
                {'name': 'email.notification.queue', 'durable': True},
                {'name': 'sms.notification.queue', 'durable': True},
                {'name': 'push.notification.queue', 'durable': True}
            ]
        }
        routing_manager.create_topology('fanout', fanout_topology)

        fanout_message = {
            'message': {
                'type': 'user_registered',
                'user_id': '12345',
                'email': 'user@example.com',
                'phone': '+1234567890'
            },
            'routing_info': {'exchange_name': 'notification.fanout'}
        }
        routing_manager.route_message('fanout', fanout_message['message'], fanout_message['routing_info'])

        # 4. Headers routing
        print("\n4. 头部路由示例")
        headers_topology = {
            'exchange_name': 'task.headers',
            'durable': True,
            'queues': [
                {
                    'name': 'urgent.task.queue',
                    'headers_bindings': [
                        {'headers': {'priority': 'high', 'urgent': True}, 'match_all': True}
                    ],
                    'durable': True
                },
                {
                    'name': 'image.task.queue',
                    'headers_bindings': [
                        {'headers': {'type': 'image'}, 'match_all': False}
                    ],
                    'durable': True
                },
                {
                    'name': 'video.task.queue',
                    'headers_bindings': [
                        {'headers': {'type': 'video'}, 'match_all': False}
                    ],
                    'durable': True
                }
            ]
        }
        routing_manager.create_topology('headers', headers_topology)

        headers_messages = [
            {
                'message': {'task_id': '001', 'content': '紧急图片处理任务'},
                'routing_info': {
                    'exchange_name': 'task.headers',
                    'headers': {'priority': 'high', 'urgent': True, 'type': 'image'}
                }
            },
            {
                'message': {'task_id': '002', 'content': '普通视频处理任务'},
                'routing_info': {
                    'exchange_name': 'task.headers',
                    'headers': {'priority': 'normal', 'type': 'video'}
                }
            }
        ]
        for msg_data in headers_messages:
            routing_manager.route_message('headers', msg_data['message'], msg_data['routing_info'])

        print("\n✅ 所有路由示例执行完成")
    except Exception as e:
        print(f"❌ 路由示例执行失败: {e}")
    finally:
        connection.close()
# Script entry point: run the routing walkthrough only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    routing_examples()
4.3 路由性能优化
4.3.1 路由性能监控器
import time
from collections import defaultdict, deque
from threading import Lock
from typing import Deque
@dataclass
class RoutingMetrics:
    """Routing metrics; every field starts at zero."""

    # Counters
    total_messages: int = 0
    successful_routes: int = 0
    failed_routes: int = 0
    # Latency figures
    average_latency: float = 0.0
    peak_latency: float = 0.0
    # Throughput and time of the last update
    throughput_per_second: float = 0.0
    last_updated: float = 0.0
class RoutingPerformanceMonitor:
    """Collect per-exchange routing counters, latency and throughput.

    All state is guarded by ``self.lock``. Latency statistics are computed
    over the last ``window_size`` samples of each exchange; throughput is the
    number of messages recorded during the most recent second.
    """

    def __init__(self, window_size: int = 1000):
        """Create an empty monitor.

        Args:
            window_size: Number of latency samples kept per exchange.
        """
        self.window_size = window_size
        self.metrics = defaultdict(RoutingMetrics)
        self.latency_history: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=window_size))
        # Timestamps of recent messages. The deque is pruned to the last
        # second on every record instead of being capped by length, because a
        # length cap would also cap the throughput that can be reported.
        self.throughput_history: Dict[str, Deque[float]] = defaultdict(deque)
        self.lock = Lock()
        self.start_time = time.time()

    def record_routing_start(self, exchange_name: str) -> float:
        """Return the current time, to be passed to :meth:`record_routing_end`."""
        return time.time()

    def record_routing_end(self, exchange_name: str, start_time: float, success: bool = True):
        """Record one finished routing attempt for ``exchange_name``.

        Args:
            exchange_name: Exchange the message was routed through.
            start_time: Value obtained from :meth:`record_routing_start`.
            success: Whether the attempt succeeded.
        """
        end_time = time.time()
        latency = end_time - start_time

        with self.lock:
            metrics = self.metrics[exchange_name]
            metrics.total_messages += 1
            if success:
                metrics.successful_routes += 1
            else:
                metrics.failed_routes += 1

            # Latency statistics over the sliding sample window.
            latencies = self.latency_history[exchange_name]
            latencies.append(latency)
            metrics.peak_latency = max(metrics.peak_latency, latency)
            metrics.average_latency = sum(latencies) / len(latencies)

            # Throughput: messages recorded within the last second.
            current_time = time.time()
            timestamps = self.throughput_history[exchange_name]
            timestamps.append(current_time)
            while timestamps and current_time - timestamps[0] > 1.0:
                timestamps.popleft()
            metrics.throughput_per_second = len(timestamps)
            metrics.last_updated = current_time

    def get_metrics(self, exchange_name: str) -> "RoutingMetrics":
        """Return a snapshot of the metrics of ``exchange_name``.

        The result is a copy, so it does not change while the caller reads
        it. Asking for an exchange that has no records returns zeroed metrics
        without registering that exchange.
        """
        import copy

        with self.lock:
            if exchange_name in self.metrics:
                return copy.copy(self.metrics[exchange_name])
            return RoutingMetrics()

    def get_all_metrics(self) -> Dict[str, "RoutingMetrics"]:
        """Return a snapshot of the metrics of every recorded exchange."""
        import copy

        with self.lock:
            return {name: copy.copy(metrics) for name, metrics in self.metrics.items()}

    def reset_metrics(self, exchange_name: Optional[str] = None):
        """Reset the metrics of one exchange, or of all when no name is given."""
        with self.lock:
            if exchange_name:
                self.metrics[exchange_name] = RoutingMetrics()
                self.latency_history[exchange_name].clear()
                self.throughput_history[exchange_name].clear()
            else:
                self.metrics.clear()
                self.latency_history.clear()
                self.throughput_history.clear()

    def print_metrics_report(self):
        """Print a human-readable report for every recorded exchange."""
        print("\n=== 路由性能报告 ===")
        with self.lock:
            for exchange_name, metrics in self.metrics.items():
                success_rate = (metrics.successful_routes / metrics.total_messages * 100) if metrics.total_messages > 0 else 0
                print(f"\n交换机: {exchange_name}")
                print(f" 总消息数: {metrics.total_messages}")
                print(f" 成功路由: {metrics.successful_routes}")
                print(f" 失败路由: {metrics.failed_routes}")
                print(f" 成功率: {success_rate:.2f}%")
                print(f" 平均延迟: {metrics.average_latency*1000:.2f}ms")
                print(f" 峰值延迟: {metrics.peak_latency*1000:.2f}ms")
                print(f" 吞吐量: {metrics.throughput_per_second:.2f} msg/s")
                print(f" 最后更新: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(metrics.last_updated))}")
class OptimizedRoutingManager(UniversalRoutingManager):
    """Routing manager that records metrics for every routed message."""

    def __init__(self, channel):
        """Set up the base manager plus a performance monitor and empty pools."""
        super().__init__(channel)
        self.performance_monitor = RoutingPerformanceMonitor()
        self.connection_pool = {}
        self.channel_pool = {}

    def route_message_with_monitoring(self, exchange_type: str, message: Dict[str, Any],
                                      routing_info: Dict[str, Any]) -> bool:
        """Route one message and record its outcome in the monitor.

        Metrics are filed under ``routing_info['exchange_name']`` or
        ``'unknown'`` when that key is absent. An exception during routing is
        recorded as a failure, printed, and reported as False.
        """
        monitor = self.performance_monitor
        metric_name = routing_info.get('exchange_name', 'unknown')
        began_at = monitor.record_routing_start(metric_name)

        try:
            routed = self.route_message(exchange_type, message, routing_info)
            monitor.record_routing_end(metric_name, began_at, routed)
            return routed
        except Exception as e:
            monitor.record_routing_end(metric_name, began_at, False)
            print(f"❌ 路由失败: {e}")
            return False

    def batch_route_messages(self, messages: List[Dict[str, Any]]) -> Dict[str, int]:
        """Route each item of ``messages`` in turn and count the outcomes.

        Every item must provide ``exchange_type``, ``message`` and
        ``routing_info``. Returns ``{'success': n, 'failed': m}``.
        """
        tally = {'success': 0, 'failed': 0}
        for item in messages:
            kind = item['exchange_type']
            payload = item['message']
            info = item['routing_info']
            outcome = 'success' if self.route_message_with_monitoring(kind, payload, info) else 'failed'
            tally[outcome] += 1
        return tally

    def get_performance_report(self) -> Dict[str, Any]:
        """Return all monitor metrics together with the current timestamp."""
        report = {'metrics': self.performance_monitor.get_all_metrics()}
        report['timestamp'] = time.time()
        return report
4.3.2 路由缓存优化
from functools import lru_cache
from threading import RLock
class RoutingCache:
    """Bounded, thread-safe cache of routing results.

    When the cache is full, the entry with the lowest access count is evicted
    (least-frequently-used). Lookups are counted as hits or misses so that
    :meth:`get_cache_stats` can report a real hit rate.
    """

    def __init__(self, max_size: int = 1000):
        """Create an empty cache holding at most ``max_size`` entries.

        A ``max_size`` of zero or less disables storing altogether.
        """
        self.max_size = max_size
        self.cache = {}
        self.access_count = defaultdict(int)
        self.lock = RLock()
        self._hits = 0
        self._misses = 0

    def get_cached_route(self, cache_key: str) -> Optional[List[str]]:
        """Return a copy of the cached queues for ``cache_key``, or None on a miss."""
        with self.lock:
            if cache_key in self.cache:
                self.access_count[cache_key] += 1
                self._hits += 1
                # A copy keeps callers from mutating the cached entry.
                return list(self.cache[cache_key])
            self._misses += 1
            return None

    def cache_route(self, cache_key: str, matched_queues: List[str]):
        """Store ``matched_queues`` under ``cache_key``.

        Replacing an existing key never evicts anything; adding a new key to
        a full cache first evicts the least-frequently-used entry.
        """
        with self.lock:
            if self.max_size <= 0:
                return
            if cache_key not in self.cache:
                while len(self.cache) >= self.max_size:
                    least_used_key = min(self.cache, key=lambda k: self.access_count.get(k, 0))
                    del self.cache[least_used_key]
                    self.access_count.pop(least_used_key, None)
            self.cache[cache_key] = list(matched_queues)
            self.access_count[cache_key] = 1

    def clear_cache(self):
        """Drop every entry and reset the access and hit/miss counters."""
        with self.lock:
            self.cache.clear()
            self.access_count.clear()
            self._hits = 0
            self._misses = 0

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return size, capacity, hit/miss counts and the hit rate.

        ``hit_rate`` is ``hits / (hits + misses)`` and 0 before any lookup.
        """
        with self.lock:
            lookups = self._hits + self._misses
            return {
                'cache_size': len(self.cache),
                'max_size': self.max_size,
                'hit_rate': self._hits / lookups if lookups else 0,
                'hits': self._hits,
                'misses': self._misses
            }
class CachedTopicExchangeManager(TopicExchangeManager):
    """Topic exchange manager that caches local routing previews.

    Cached previews are discarded whenever a binding is added, since a new
    binding can change which queues a routing key reaches.
    """

    def __init__(self, channel):
        """Set up the base manager and an empty routing cache."""
        super().__init__(channel)
        self.routing_cache = RoutingCache()

    def bind_queue_with_pattern(self, exchange_name: str, queue_name: str,
                                routing_pattern: str) -> bool:
        """Bind as the base class does, then invalidate the cached previews."""
        bound = super().bind_queue_with_pattern(exchange_name, queue_name, routing_pattern)
        # Cleared even on failure: it is cheap and can never leave stale data.
        self.routing_cache.clear_cache()
        return bound

    def test_routing_with_cache(self, exchange_name: str, routing_key: str) -> List[str]:
        """Return the queues matching ``routing_key``, using the cache when possible."""
        cache_key = f"{exchange_name}:{routing_key}"

        cached_result = self.routing_cache.get_cached_route(cache_key)
        if cached_result is not None:
            return cached_result

        # Cache miss: compute the matches and remember them.
        matched_queues = self.test_routing(exchange_name, routing_key)
        self.routing_cache.cache_route(cache_key, matched_queues)
        return matched_queues

    def publish_message(self, exchange_name: str, routing_key: str, message: Dict[str, Any],
                        properties: Optional["pika.BasicProperties"] = None) -> bool:
        """Publish ``message`` as UTF-8 JSON, previewing targets via the cache.

        When ``properties`` is omitted the message is sent with
        ``delivery_mode=2`` and the current time as timestamp.

        Returns:
            True when the publish call returned normally, False otherwise.
        """
        try:
            matched_queues = self.test_routing_with_cache(exchange_name, routing_key)
            print(f"📍 路由键 '{routing_key}' 将匹配队列: {matched_queues} (缓存)")

            body = json.dumps(message, ensure_ascii=False)
            self.channel.basic_publish(
                exchange=exchange_name,
                routing_key=routing_key,
                body=body.encode('utf-8'),
                properties=properties or pika.BasicProperties(
                    delivery_mode=2,
                    timestamp=int(time.time())
                )
            )
            print(f"✅ 主题消息发布成功: {exchange_name} (路由键: {routing_key})")
            return True
        except Exception as e:
            print(f"❌ 主题消息发布失败: {e}")
            return False
4.4 路由监控与诊断
4.4.1 路由诊断工具
class RoutingDiagnostics:
    """Check routing configurations against a routing manager's local records.

    Diagnoses are based only on what the managers recorded locally (declared
    exchanges and bindings); the broker itself is not queried. Every
    diagnosis is appended to ``self.diagnostic_history``.
    """

    _SUPPORTED_TYPES = ('direct', 'topic', 'fanout', 'headers')

    def __init__(self, routing_manager: "UniversalRoutingManager"):
        """Keep ``routing_manager`` and start with an empty history."""
        self.routing_manager = routing_manager
        self.diagnostic_history = []

    def diagnose_routing(self, exchange_type: str, exchange_name: str,
                         routing_key: Optional[str] = None,
                         headers: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Diagnose how a message would be routed.

        Args:
            exchange_type: ``direct``, ``topic``, ``fanout`` or ``headers``.
            exchange_name: Exchange to inspect.
            routing_key: Routing key (needed for direct and topic exchanges).
            headers: Message headers (needed for headers exchanges).

        Returns:
            A dict with the inputs, ``timestamp``, ``issues``,
            ``recommendations``, ``matched_queues`` and a ``status`` of
            ``healthy``, ``issues_found`` or ``error``.
        """
        diagnosis = {
            'exchange_type': exchange_type,
            'exchange_name': exchange_name,
            'routing_key': routing_key,
            'headers': headers,
            'timestamp': time.time(),
            'issues': [],
            'recommendations': [],
            'matched_queues': []
        }

        try:
            self._inspect(diagnosis)

            # Generic check that applies to every exchange type.
            if len(diagnosis['matched_queues']) > 10:
                diagnosis['recommendations'].append("匹配的队列过多,考虑优化路由规则")

            diagnosis['status'] = 'issues_found' if diagnosis['issues'] else 'healthy'
        except Exception as e:
            diagnosis['issues'].append(f"诊断过程中发生错误: {e}")
            diagnosis['status'] = 'error'

        self.diagnostic_history.append(diagnosis)
        return diagnosis

    def _inspect(self, diagnosis: Dict[str, Any]) -> None:
        """Fill ``matched_queues``, ``issues`` and ``recommendations`` in place."""
        exchange_type = diagnosis['exchange_type']
        exchange_name = diagnosis['exchange_name']
        routing_key = diagnosis['routing_key']
        headers = diagnosis['headers']
        issues = diagnosis['issues']

        # An unknown type must be reported, not silently treated as healthy.
        if exchange_type not in self._SUPPORTED_TYPES:
            issues.append(f"不支持的交换机类型: {exchange_type}")
            return

        if exchange_type == 'headers' and not headers:
            issues.append("头部交换机需要提供头部信息")
            return
        if exchange_type in ('direct', 'topic') and routing_key is None:
            issues.append(f"{exchange_type} 交换机需要提供路由键")
            return

        manager = getattr(self.routing_manager, f"{exchange_type}_manager")

        # A declared exchange exists even if nothing has been bound to it yet.
        if exchange_name not in manager.exchanges and exchange_name not in manager.bindings:
            issues.append(f"交换机 '{exchange_name}' 不存在")
            return

        if exchange_type == 'direct':
            bindings = manager.bindings.get(exchange_name, [])
            # dict.fromkeys removes duplicates while keeping binding order.
            matched_queues = list(dict.fromkeys(
                b['queue'] for b in bindings if b['routing_key'] == routing_key
            ))
            issue = f"没有队列绑定到路由键 '{routing_key}'"
            recommendation = "检查队列绑定配置"
        elif exchange_type == 'topic':
            matched_queues = manager.test_routing(exchange_name, routing_key)
            issue = f"路由键 '{routing_key}' 没有匹配任何模式"
            recommendation = "检查路由模式配置"
        elif exchange_type == 'fanout':
            matched_queues = manager.get_bound_queues(exchange_name)
            issue = "没有队列绑定到扇出交换机"
            recommendation = "添加队列绑定"
        else:
            matched_queues = manager.test_header_matching(exchange_name, headers)
            issue = f"头部 {headers} 没有匹配任何绑定"
            recommendation = "检查头部匹配规则"

        diagnosis['matched_queues'] = matched_queues
        if not matched_queues:
            issues.append(issue)
            diagnosis['recommendations'].append(recommendation)

    def generate_routing_report(self) -> str:
        """Return a text report covering the ten most recent diagnoses."""
        report = ["\n=== 路由诊断报告 ==="]

        for diagnosis in self.diagnostic_history[-10:]:
            report.append(f"\n时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(diagnosis['timestamp']))}")
            report.append(f"交换机: {diagnosis['exchange_name']} ({diagnosis['exchange_type']})")

            if diagnosis['routing_key']:
                report.append(f"路由键: {diagnosis['routing_key']}")
            if diagnosis['headers']:
                report.append(f"头部: {diagnosis['headers']}")

            report.append(f"状态: {diagnosis['status']}")
            report.append(f"匹配队列: {diagnosis['matched_queues']}")

            if diagnosis['issues']:
                report.append("问题:")
                report.extend(f" - {issue}" for issue in diagnosis['issues'])
            if diagnosis['recommendations']:
                report.append("建议:")
                report.extend(f" - {rec}" for rec in diagnosis['recommendations'])

            report.append("-" * 50)

        return "\n".join(report)

    def clear_history(self):
        """Forget every recorded diagnosis."""
        self.diagnostic_history.clear()
4.4.2 路由监控示例
def routing_monitoring_example():
    """Demonstrate monitored routing and diagnostics against a local broker.

    Connects to ``localhost:5672`` with the demo credentials below, builds a
    topic topology, routes a small batch through the monitoring manager and
    prints the performance report and two diagnoses. Failures are printed,
    not raised, and the connection is always closed once it has been opened.
    """
    print("=== 路由监控示例 ===")

    # Connection settings (demo credentials for a local broker).
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host='localhost',
            port=5672,
            credentials=pika.PlainCredentials('admin', 'admin123')
        )
    )

    try:
        # Opened inside the try block so that a failure to open the channel
        # still reaches the finally clause and closes the connection.
        channel = connection.channel()
        routing_manager = OptimizedRoutingManager(channel)
        diagnostics = RoutingDiagnostics(routing_manager)

        # Test topology
        topic_topology = {
            'exchange_name': 'monitor.topic',
            'durable': True,
            'queues': [
                {
                    'name': 'error.monitor.queue',
                    'routing_patterns': ['*.error.*'],
                    'durable': True
                },
                {
                    'name': 'info.monitor.queue',
                    'routing_patterns': ['*.info.*'],
                    'durable': True
                }
            ]
        }
        routing_manager.create_topology('topic', topic_topology)

        # Messages to route
        test_messages = [
            {
                'exchange_type': 'topic',
                'message': {'level': 'error', 'message': '测试错误消息'},
                'routing_info': {'exchange_name': 'monitor.topic', 'routing_key': 'app.error.test'}
            },
            {
                'exchange_type': 'topic',
                'message': {'level': 'info', 'message': '测试信息消息'},
                'routing_info': {'exchange_name': 'monitor.topic', 'routing_key': 'app.info.test'}
            },
            {
                'exchange_type': 'topic',
                'message': {'level': 'debug', 'message': '测试调试消息'},
                'routing_info': {'exchange_name': 'monitor.topic', 'routing_key': 'app.debug.test'}
            }
        ]

        # Route the batch
        print("\n批量路由消息...")
        results = routing_manager.batch_route_messages(test_messages)
        print(f"路由结果: 成功 {results['success']}, 失败 {results['failed']}")

        # Performance report
        print("\n性能监控报告:")
        routing_manager.performance_monitor.print_metrics_report()

        # Diagnostics
        print("\n路由诊断:")

        # A routing key that matches a bound pattern
        diagnosis1 = diagnostics.diagnose_routing('topic', 'monitor.topic', 'app.error.database')
        print(f"诊断结果1: {diagnosis1['status']}, 匹配队列: {diagnosis1['matched_queues']}")

        # A routing key that matches nothing
        diagnosis2 = diagnostics.diagnose_routing('topic', 'monitor.topic', 'system.warning.memory')
        print(f"诊断结果2: {diagnosis2['status']}, 问题: {diagnosis2['issues']}")

        print(diagnostics.generate_routing_report())

        print("\n✅ 路由监控示例完成")
    except Exception as e:
        print(f"❌ 路由监控示例失败: {e}")
    finally:
        connection.close()
# Script entry point: run both walkthroughs, in this order, only when this
# file is executed directly, not when it is imported.
if __name__ == "__main__":
    routing_examples()
    routing_monitoring_example()
4.5 本章总结
4.5.1 核心知识点
交换机类型
- Direct Exchange:基于精确路由键匹配
- Topic Exchange:基于模式匹配(* 和 # 通配符)
- Fanout Exchange:广播到所有绑定队列
- Headers Exchange:基于消息头部属性匹配
路由策略
- 策略模式设计:不同交换机类型使用不同路由策略
- 通用路由管理器:统一管理多种交换机类型
- 路由拓扑管理:自动化创建和配置路由结构
性能优化
- 路由缓存:缓存路由计算结果,提高性能
- 性能监控:实时监控路由延迟和吞吐量
- 批量处理:通过统一入口依次路由一批消息,并汇总成功与失败的数量(示例实现仍是逐条发布)
监控诊断
- 路由诊断工具:自动检测路由配置问题
- 性能指标收集:延迟、吞吐量、成功率统计
- 问题排查:提供具体的问题描述和解决建议
4.5.2 最佳实践
交换机选择
- 简单路由使用 Direct Exchange
- 复杂模式匹配使用 Topic Exchange
- 广播场景使用 Fanout Exchange
- 复杂条件匹配使用 Headers Exchange
路由设计
- 合理设计路由键命名规范
- 避免过度复杂的路由模式
- 考虑路由的可扩展性和维护性
性能优化
- 使用路由缓存减少计算开销
- 监控路由性能指标
- 合理配置队列和交换机参数
运维监控
- 定期检查路由配置
- 监控路由性能指标
- 建立路由问题排查流程
4.5.3 练习题
基础练习
- 实现一个日志系统的路由配置,支持按级别和模块分发
- 设计一个订单处理系统的路由策略
- 创建一个通知系统的扇出路由
进阶练习
- 实现一个智能路由系统,根据消息内容动态选择路由策略
- 设计一个高性能的路由缓存系统
- 创建一个路由监控和告警系统
实战练习
- 构建一个微服务间的事件路由系统
- 实现一个支持多租户的消息路由
- 设计一个容错的路由系统,支持路由失败重试
通过本章学习,你应该掌握了 RabbitMQ 中各种交换机类型的使用方法、路由策略的设计原则,以及路由性能优化和监控的技巧。这些知识将为构建高效、可靠的消息路由系统奠定坚实基础。