3.1 连接与通道管理
3.1.1 连接管理
import pika
import time
import threading
import logging
from typing import Dict, Any, Optional, Callable, List
from dataclasses import dataclass
from enum import Enum
import ssl
import json
class ConnectionState(Enum):
    """Lifecycle states recorded on the connection manager's ``state`` attribute."""

    DISCONNECTED = "disconnected"  # initial state, and the state after a failed attempt
    CONNECTING = "connecting"      # a connection attempt is in progress
    CONNECTED = "connected"        # the last connection attempt succeeded
    RECONNECTING = "reconnecting"  # set by the auto-reconnect worker before retrying
    CLOSED = "closed"              # set by an explicit disconnect
@dataclass
class ConnectionConfig:
    """Settings used to open a RabbitMQ connection.

    ``ssl_options`` and ``client_properties`` default to ``None`` and are
    replaced in ``__post_init__`` so that every instance gets its own dict.
    """

    host: str = 'localhost'
    port: int = 5672
    virtual_host: str = '/'
    username: str = 'guest'
    password: str = 'guest'

    # Connection tuning
    heartbeat: int = 600
    blocked_connection_timeout: int = 300
    connection_attempts: int = 3
    retry_delay: float = 2.0
    socket_timeout: float = 10.0

    # TLS: recognised ``ssl_options`` keys are 'ca_certs', 'certfile', 'keyfile'
    ssl_enabled: bool = False
    ssl_options: Optional[Dict[str, Any]] = None

    # Miscellaneous
    locale: str = 'en_US'
    client_properties: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        """Fill in the dict-valued fields that were left as ``None``."""
        if self.ssl_options is None:
            self.ssl_options = {}
        if self.client_properties is None:
            self.client_properties = {
                'product': 'RabbitMQ Python Client',
                'version': '1.0.0',
            }
class RabbitMQConnectionManager:
    """Manage one blocking RabbitMQ connection plus an optional reconnect thread.

    ``connect`` and ``disconnect`` serialize on an internal lock.  The user
    callbacks (``on_connection_open``, ``on_connection_closed``,
    ``on_connection_error``) are invoked after that lock has been released, so
    a callback may call back into the manager.  An exception raised by a
    callback is logged and not propagated.
    """

    def __init__(self, config: ConnectionConfig):
        self.config = config
        self.connection: Optional[pika.BlockingConnection] = None
        self.state = ConnectionState.DISCONNECTED
        self.logger = logging.getLogger(__name__)
        self._lock = threading.Lock()
        self._reconnect_thread: Optional[threading.Thread] = None
        self._stop_reconnect = threading.Event()

        # Optional user callbacks
        self.on_connection_open: Optional[Callable] = None    # called with the connection
        self.on_connection_closed: Optional[Callable] = None  # called with no arguments
        self.on_connection_error: Optional[Callable] = None   # called with the exception

    def connect(self) -> bool:
        """Open the connection unless a live one already exists.

        Returns:
            True when a live connection exists on return; False when the
            attempt failed (the error is logged and handed to
            ``on_connection_error``).
        """
        failure: Optional[Exception] = None
        with self._lock:
            # Check the connection object, not just ``state``: after the broker
            # drops the link ``state`` may still read CONNECTED.
            if self.is_connected():
                self.logger.info("连接已存在")
                return True
            try:
                self.state = ConnectionState.CONNECTING
                self.logger.info(f"正在连接到 {self.config.host}:{self.config.port}")
                self.connection = pika.BlockingConnection(self._build_connection_params())
                self.state = ConnectionState.CONNECTED
                self.logger.info("RabbitMQ连接建立成功")
            except Exception as e:
                self.state = ConnectionState.DISCONNECTED
                self.logger.error(f"连接失败: {e}")
                failure = e
            opened = self.connection

        # Callbacks run outside the lock so they cannot deadlock the manager.
        if failure is not None:
            self._fire(self.on_connection_error, failure)
            return False
        self._fire(self.on_connection_open, opened)
        return True

    def disconnect(self):
        """Stop auto-reconnect, close the connection and mark the manager CLOSED."""
        # The worker must be stopped BEFORE taking the lock: it may be blocked
        # inside connect() waiting for that same lock, and joining it while
        # holding the lock would deadlock.
        self._halt_reconnect_thread()

        with self._lock:
            if self.connection and not self.connection.is_closed:
                try:
                    self.connection.close()
                    self.logger.info("连接已断开")
                except Exception as e:
                    self.logger.error(f"断开连接时出错: {e}")
            self.state = ConnectionState.CLOSED
            self.connection = None

        self._fire(self.on_connection_closed)

    def is_connected(self) -> bool:
        """Return True when a connection object exists, is open and state is CONNECTED."""
        return (self.connection is not None and
                not self.connection.is_closed and
                self.state == ConnectionState.CONNECTED)

    def get_connection(self) -> Optional[pika.BlockingConnection]:
        """Return the live connection, or None when not connected."""
        if self.is_connected():
            return self.connection
        return None

    def start_auto_reconnect(self, max_attempts: int = -1):
        """Start the background reconnect worker (no-op if it is already running).

        Args:
            max_attempts: consecutive failed attempts after which the worker
                gives up; zero or a negative value means retry indefinitely.
        """
        if self._reconnect_thread and self._reconnect_thread.is_alive():
            self.logger.warning("自动重连已在运行")
            return
        self._stop_reconnect.clear()
        self._reconnect_thread = threading.Thread(
            target=self._auto_reconnect_worker,
            args=(max_attempts,),
            daemon=True,
        )
        self._reconnect_thread.start()
        self.logger.info("自动重连已启动")

    def stop_auto_reconnect(self):
        """Signal the reconnect worker to stop and wait for it to exit."""
        self._halt_reconnect_thread()
        self.logger.info("自动重连已停止")

    def _halt_reconnect_thread(self) -> None:
        """Set the stop flag and join the worker, unless called from the worker itself."""
        self._stop_reconnect.set()
        worker = self._reconnect_thread
        # A thread cannot join itself (e.g. disconnect() called from a callback
        # that the worker triggered).
        if worker and worker.is_alive() and worker is not threading.current_thread():
            worker.join()

    def _fire(self, callback: Optional[Callable], *args) -> None:
        """Invoke an optional user callback; log, rather than propagate, its errors."""
        if not callback:
            return
        try:
            callback(*args)
        except Exception as e:
            self.logger.error(f"回调执行失败: {e}")

    def _build_connection_params(self) -> pika.ConnectionParameters:
        """Translate ``self.config`` into ``pika.ConnectionParameters``."""
        ssl_context = None
        if self.config.ssl_enabled:
            ssl_context = ssl.create_default_context()
            options = self.config.ssl_options
            if options:
                if 'ca_certs' in options:
                    ssl_context.load_verify_locations(options['ca_certs'])
                # A client certificate is loaded only when both halves are given.
                if 'certfile' in options and 'keyfile' in options:
                    ssl_context.load_cert_chain(
                        options['certfile'],
                        options['keyfile'],
                    )

        credentials = pika.PlainCredentials(
            self.config.username,
            self.config.password,
        )
        return pika.ConnectionParameters(
            host=self.config.host,
            port=self.config.port,
            virtual_host=self.config.virtual_host,
            credentials=credentials,
            heartbeat=self.config.heartbeat,
            blocked_connection_timeout=self.config.blocked_connection_timeout,
            connection_attempts=self.config.connection_attempts,
            retry_delay=self.config.retry_delay,
            socket_timeout=self.config.socket_timeout,
            ssl_options=pika.SSLOptions(ssl_context) if ssl_context else None,
            locale=self.config.locale,
            client_properties=self.config.client_properties,
        )

    def _auto_reconnect_worker(self, max_attempts: int):
        """Thread body: poll the connection and reconnect until stopped.

        ``attempt`` counts consecutive failures and is reset after a
        successful reconnect.
        """
        attempt = 0
        while not self._stop_reconnect.is_set():
            if max_attempts > 0 and attempt >= max_attempts:
                self.logger.error(f"达到最大重连次数 ({max_attempts}),停止重连")
                break
            if not self.is_connected():
                attempt += 1
                self.state = ConnectionState.RECONNECTING
                self.logger.info(f"尝试重连 (第 {attempt} 次)")
                if self.connect():
                    attempt = 0
                    self.logger.info("重连成功")
                else:
                    self.logger.warning(f"重连失败,{self.config.retry_delay}秒后重试")
            # Event.wait doubles as an interruptible sleep between polls.
            self._stop_reconnect.wait(self.config.retry_delay)
class RabbitMQChannelManager:
    """Registry of named channels opened on one managed connection.

    Mutating operations serialize on ``self._lock``.  That lock is a plain
    (non-reentrant) ``threading.Lock``, so public methods never call each
    other while holding it; shared work lives in ``_close_channel_locked``.
    """

    def __init__(self, connection_manager: RabbitMQConnectionManager):
        self.connection_manager = connection_manager
        self.channels: Dict[str, pika.channel.Channel] = {}
        self.logger = logging.getLogger(__name__)
        self._lock = threading.Lock()

    def create_channel(self, channel_id: Optional[str] = None) -> Optional[pika.channel.Channel]:
        """Open a channel and register it under ``channel_id``.

        Args:
            channel_id: registry key; when empty or None an unused
                ``channel_<n>`` id is generated.

        Returns:
            The new channel, or None when no connection is available or the
            channel could not be opened.
        """
        with self._lock:
            connection = self.connection_manager.get_connection()
            if not connection:
                self.logger.error("无法创建通道:连接不可用")
                return None
            try:
                channel = connection.channel()
                if not channel_id:
                    channel_id = self._next_channel_id()
                self.channels[channel_id] = channel
                self.logger.info(f"通道创建成功: {channel_id}")
                return channel
            except Exception as e:
                self.logger.error(f"创建通道失败: {e}")
                return None

    def _next_channel_id(self) -> str:
        """Return the first ``channel_<n>`` id, n > number of channels, not in use.

        Counting from ``len + 1`` alone is not enough: after a channel is
        closed the count shrinks and the generated id could overwrite a live one.
        """
        n = len(self.channels) + 1
        while f"channel_{n}" in self.channels:
            n += 1
        return f"channel_{n}"

    def get_channel(self, channel_id: str) -> Optional[pika.channel.Channel]:
        """Return the registered channel, or None for an unknown id."""
        return self.channels.get(channel_id)

    def close_channel(self, channel_id: str) -> bool:
        """Close and unregister one channel; return True on success."""
        with self._lock:
            return self._close_channel_locked(channel_id)

    def close_all_channels(self):
        """Close and unregister every registered channel."""
        with self._lock:
            # Iterate over a snapshot: the helper removes entries as it goes.
            for channel_id in list(self.channels):
                self._close_channel_locked(channel_id)

    def _close_channel_locked(self, channel_id: str) -> bool:
        """Close one channel. The caller must already hold ``self._lock``."""
        channel = self.channels.get(channel_id)
        if not channel:
            self.logger.warning(f"通道不存在: {channel_id}")
            return False
        try:
            if not channel.is_closed:
                channel.close()
            del self.channels[channel_id]
            self.logger.info(f"通道已关闭: {channel_id}")
            return True
        except Exception as e:
            self.logger.error(f"关闭通道失败: {e}")
            return False

    def get_channel_count(self) -> int:
        """Return the number of registered channels."""
        return len(self.channels)

    def list_channels(self) -> List[str]:
        """Return the ids of all registered channels."""
        return list(self.channels.keys())
# Usage example: open a connection, create two channels, then tear everything down.
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    print("=== RabbitMQ连接与通道管理示例 ===")

    # Connection settings
    config = ConnectionConfig(
        host='localhost',
        port=5672,
        username='admin',
        password='admin123',
        heartbeat=600,
        connection_attempts=3,
        retry_delay=2.0
    )
    conn_manager = RabbitMQConnectionManager(config)

    # Lifecycle callbacks
    def on_connection_open(connection):
        print(f"✅ 连接已打开: {connection}")

    def on_connection_closed():
        print("❌ 连接已关闭")

    def on_connection_error(error):
        print(f"❌ 连接错误: {error}")

    conn_manager.on_connection_open = on_connection_open
    conn_manager.on_connection_closed = on_connection_closed
    conn_manager.on_connection_error = on_connection_error

    try:
        print("\n1. 建立连接")
        if conn_manager.connect():
            print("✅ 连接建立成功")
            channel_manager = RabbitMQChannelManager(conn_manager)

            print("\n2. 创建通道")
            channel1 = channel_manager.create_channel("main_channel")
            channel2 = channel_manager.create_channel("backup_channel")
            if channel1 and channel2:
                print(f"✅ 通道创建成功,总数: {channel_manager.get_channel_count()}")
                print(f"通道列表: {channel_manager.list_channels()}")
                # Stand-in for real work on the channels
                time.sleep(2)

                print("\n3. 关闭通道")
                channel_manager.close_all_channels()
                print("✅ 所有通道已关闭")
            else:
                print("❌ 通道创建失败")
        else:
            print("❌ 连接建立失败")
    except KeyboardInterrupt:
        print("\n收到中断信号")
    finally:
        # Always release the connection, including after Ctrl-C.
        print("\n4. 断开连接")
        conn_manager.disconnect()
        print("✅ 连接已断开")
3.2 队列操作
3.2.1 队列声明与管理
import pika
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from enum import Enum
import json
import time
class QueueType(Enum):
    """Queue implementations selectable through the ``x-queue-type`` argument."""

    CLASSIC = "classic"
    QUORUM = "quorum"
    STREAM = "stream"
@dataclass
class QueueConfig:
    """Declarative description of a queue.

    The convenience fields (TTL, length limits, dead-lettering, priority, ...)
    are folded into ``arguments`` as ``x-*`` entries by ``__post_init__``.
    """

    name: str
    durable: bool = True
    exclusive: bool = False
    auto_delete: bool = False
    arguments: Optional[Dict[str, Any]] = None

    # Queue implementation
    queue_type: QueueType = QueueType.CLASSIC

    # TTL settings
    message_ttl: Optional[int] = None  # per-message TTL, milliseconds
    expires: Optional[int] = None      # queue TTL, milliseconds

    # Length limits
    max_length: Optional[int] = None
    max_length_bytes: Optional[int] = None

    # Dead-lettering
    dead_letter_exchange: Optional[str] = None
    dead_letter_routing_key: Optional[str] = None

    # Priority
    max_priority: Optional[int] = None

    # Delay
    delayed: bool = False

    def __post_init__(self):
        """Build the final ``arguments`` dict from the convenience fields."""
        # Work on a copy so the dict passed in by the caller is never mutated
        # (it may be shared between several configs).
        self.arguments = dict(self.arguments) if self.arguments else {}

        # CLASSIC is left implicit: no x-queue-type entry is written for it.
        if self.queue_type != QueueType.CLASSIC:
            self.arguments['x-queue-type'] = self.queue_type.value

        optional_arguments = (
            ('x-message-ttl', self.message_ttl),
            ('x-expires', self.expires),
            ('x-max-length', self.max_length),
            ('x-max-length-bytes', self.max_length_bytes),
            ('x-dead-letter-exchange', self.dead_letter_exchange),
            ('x-dead-letter-routing-key', self.dead_letter_routing_key),
            ('x-max-priority', self.max_priority),
        )
        for key, value in optional_arguments:
            if value is not None:
                self.arguments[key] = value

        # NOTE(review): 'x-delayed-type' is normally an argument of
        # x-delayed-message exchanges; confirm it has any effect on a queue.
        if self.delayed:
            self.arguments['x-delayed-type'] = 'direct'
class RabbitMQQueueManager:
    """Declare, inspect, bind and delete queues on a single channel.

    Every operation logs its outcome and reports failure through its return
    value (False / None) instead of raising.

    NOTE(review): in AMQP a failed channel operation (for example a passive
    declare of a missing queue) normally closes the channel, after which
    later calls on this manager would fail too - confirm against pika.
    """

    def __init__(self, channel: pika.channel.Channel):
        self.channel = channel
        self.logger = logging.getLogger(__name__)
        # Configs of the queues successfully declared through this manager
        self.declared_queues: Dict[str, QueueConfig] = {}

    def declare_queue(self, config: QueueConfig) -> bool:
        """Declare the queue described by ``config``; return True on success."""
        try:
            self.channel.queue_declare(
                queue=config.name,
                durable=config.durable,
                exclusive=config.exclusive,
                auto_delete=config.auto_delete,
                arguments=config.arguments,
            )
            self.declared_queues[config.name] = config
            self.logger.info(f"队列声明成功: {config.name}")
            return True
        except Exception as e:
            self.logger.error(f"队列声明失败 {config.name}: {e}")
            return False

    def delete_queue(self, queue_name: str, if_unused: bool = False, if_empty: bool = False) -> bool:
        """Delete a queue and forget its config; return True on success."""
        try:
            self.channel.queue_delete(
                queue=queue_name,
                if_unused=if_unused,
                if_empty=if_empty,
            )
            self.declared_queues.pop(queue_name, None)
            self.logger.info(f"队列删除成功: {queue_name}")
            return True
        except Exception as e:
            self.logger.error(f"队列删除失败 {queue_name}: {e}")
            return False

    def purge_queue(self, queue_name: str) -> Optional[int]:
        """Purge a queue; return the purged message count, or None on failure."""
        try:
            result = self.channel.queue_purge(queue=queue_name)
            message_count = result.method.message_count
            self.logger.info(f"队列清空成功: {queue_name}, 清除消息数: {message_count}")
            return message_count
        except Exception as e:
            self.logger.error(f"队列清空失败 {queue_name}: {e}")
            return None

    def get_queue_info(self, queue_name: str) -> Optional[Dict[str, Any]]:
        """Return name, message/consumer counts and the stored config of a queue.

        Returns None when the lookup fails.
        """
        try:
            # passive=True only checks for the queue; it never creates it.
            result = self.channel.queue_declare(queue=queue_name, passive=True)
            return {
                'name': queue_name,
                'message_count': result.method.message_count,
                'consumer_count': result.method.consumer_count,
                'config': self.declared_queues.get(queue_name),
            }
        except Exception as e:
            self.logger.error(f"获取队列信息失败 {queue_name}: {e}")
            return None

    def bind_queue(self, queue_name: str, exchange_name: str, routing_key: str = "",
                   arguments: Optional[Dict[str, Any]] = None) -> bool:
        """Bind a queue to an exchange; return True on success."""
        try:
            self.channel.queue_bind(
                exchange=exchange_name,
                queue=queue_name,
                routing_key=routing_key,
                arguments=arguments,
            )
            self.logger.info(f"队列绑定成功: {queue_name} -> {exchange_name} (routing_key: {routing_key})")
            return True
        except Exception as e:
            self.logger.error(f"队列绑定失败: {e}")
            return False

    def unbind_queue(self, queue_name: str, exchange_name: str, routing_key: str = "",
                     arguments: Optional[Dict[str, Any]] = None) -> bool:
        """Remove a queue-to-exchange binding; return True on success."""
        try:
            self.channel.queue_unbind(
                exchange=exchange_name,
                queue=queue_name,
                routing_key=routing_key,
                arguments=arguments,
            )
            self.logger.info(f"队列解绑成功: {queue_name} -> {exchange_name} (routing_key: {routing_key})")
            return True
        except Exception as e:
            self.logger.error(f"队列解绑失败: {e}")
            return False

    def list_declared_queues(self) -> List[str]:
        """Return the names of the queues declared through this manager."""
        return list(self.declared_queues.keys())

    def create_standard_queues(self) -> Dict[str, bool]:
        """Declare the tutorial's standard queue set.

        Returns:
            A mapping of queue name to whether its declaration succeeded.
        """
        standard_queues = [
            # Plain queue
            QueueConfig(name="normal.queue", durable=True),
            # Priority queue
            QueueConfig(name="priority.queue", durable=True, max_priority=10),
            # TTL queue (60 seconds)
            QueueConfig(name="ttl.queue", durable=True, message_ttl=60000),
            # Dead-letter queue
            QueueConfig(name="dlx.queue", durable=True),
            # Queue that dead-letters into dlx.exchange
            QueueConfig(
                name="main.queue",
                durable=True,
                dead_letter_exchange="dlx.exchange",
                dead_letter_routing_key="dlx",
            ),
            # Length-limited queue
            QueueConfig(name="limited.queue", durable=True, max_length=1000),
        ]
        results = {}
        for queue_config in standard_queues:
            results[queue_config.name] = self.declare_queue(queue_config)
        return results
# Usage example
def main():
    """Run the queue-management walkthrough against a local broker."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    print("=== RabbitMQ队列管理示例 ===")

    # Connection settings
    config = ConnectionConfig(
        host='localhost',
        port=5672,
        username='admin',
        password='admin123'
    )
    conn_manager = RabbitMQConnectionManager(config)
    try:
        if conn_manager.connect():
            channel_manager = RabbitMQChannelManager(conn_manager)
            channel = channel_manager.create_channel("queue_mgmt_channel")
            if channel:
                queue_manager = RabbitMQQueueManager(channel)

                print("\n1. 创建标准队列")
                results = queue_manager.create_standard_queues()
                success_count = sum(1 for success in results.values() if success)
                print(f"✅ 成功创建 {success_count}/{len(results)} 个队列")

                print("\n2. 列出已声明的队列")
                declared_queues = queue_manager.list_declared_queues()
                print(f"已声明队列: {declared_queues}")

                print("\n3. 获取队列信息")
                for queue_name in declared_queues[:3]:  # show only the first three
                    info = queue_manager.get_queue_info(queue_name)
                    if info:
                        print(f"队列 {queue_name}: 消息数={info['message_count']}, 消费者数={info['consumer_count']}")

                print("\n4. 测试队列操作")
                test_queue = QueueConfig(
                    name="test.queue",
                    durable=False,
                    auto_delete=True
                )
                if queue_manager.declare_queue(test_queue):
                    print("✅ 测试队列创建成功")
                    purged_count = queue_manager.purge_queue("test.queue")
                    print(f"清空队列,删除消息数: {purged_count}")
                    if queue_manager.delete_queue("test.queue"):
                        print("✅ 测试队列删除成功")

                channel_manager.close_all_channels()
            else:
                print("❌ 通道创建失败")
        else:
            print("❌ 连接失败")
    except Exception as e:
        print(f"❌ 错误: {e}")
    finally:
        conn_manager.disconnect()


if __name__ == "__main__":
    main()
4. 消息消费
4.1 消费者配置
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Optional, Dict, Any
import threading
import time
class AckMode(Enum):
    """Outcome a message callback reports back to ``RabbitMQConsumer``.

    The trailing comments describe what the consumer's callback wrapper does
    with each value when ``auto_ack`` is off.
    """

    AUTO = "auto"      # the wrapper acknowledges the message
    MANUAL = "manual"  # the wrapper rejects the message with requeue=True
    REJECT = "reject"  # the wrapper rejects the message with requeue=False
class QosMode(Enum):
    """Scope labels for a QoS (prefetch) setting."""

    GLOBAL = "global"    # global QoS
    CHANNEL = "channel"  # per-channel QoS
@dataclass
class ConsumerConfig:
    """Settings for one consumer subscription."""

    queue_name: str
    consumer_tag: str = ""
    auto_ack: bool = False    # when True the consumer never acks/rejects itself
    exclusive: bool = False
    no_local: bool = False    # NOTE(review): not read by the consumer code in this file
    prefetch_count: int = 1
    prefetch_size: int = 0
    qos_global: bool = False
    # Invoked as callback(ch, method, properties, body)
    callback: Optional[Callable] = None
class RabbitMQConsumer:
    """Register consumers on a channel and keep ack/reject statistics.

    Failures are printed and reported through the boolean return values.
    """

    _STAT_KEYS = ('consumed', 'acknowledged', 'rejected', 'requeued')

    def __init__(self, channel):
        self.channel = channel
        self.consumers = {}        # consumer tag -> ConsumerConfig
        self.is_consuming = False
        self.statistics = dict.fromkeys(self._STAT_KEYS, 0)
        self._lock = threading.Lock()  # guards ``statistics``

    def set_qos(self, prefetch_count: int = 1, prefetch_size: int = 0, global_qos: bool = False) -> bool:
        """Apply prefetch limits to the channel; return True on success."""
        try:
            # pika 1.x names this keyword ``global_qos``.  An unknown keyword
            # raises TypeError, which the handler below would swallow, leaving
            # QoS silently unset.
            self.channel.basic_qos(
                prefetch_count=prefetch_count,
                prefetch_size=prefetch_size,
                global_qos=global_qos,
            )
            return True
        except Exception as e:
            print(f"设置QoS失败: {e}")
            return False

    def start_consuming(self, config: ConsumerConfig) -> bool:
        """Apply QoS and register a consumer for ``config.queue_name``.

        This only registers the consumer; driving the channel's consume loop
        is left to the caller.

        Returns:
            True when the consumer was registered, False otherwise.
        """
        try:
            self.set_qos(
                prefetch_count=config.prefetch_count,
                prefetch_size=config.prefetch_size,
                global_qos=config.qos_global,
            )
            consumer_tag = self.channel.basic_consume(
                queue=config.queue_name,
                on_message_callback=self._build_callback(config),
                auto_ack=config.auto_ack,
                exclusive=config.exclusive,
                consumer_tag=config.consumer_tag,
            )
            self.consumers[consumer_tag] = config
            self.is_consuming = True
            print(f"✅ 开始消费队列: {config.queue_name}, 消费者标签: {consumer_tag}")
            return True
        except Exception as e:
            print(f"开始消费失败: {e}")
            return False

    def _build_callback(self, config: ConsumerConfig):
        """Wrap ``config.callback`` with statistics and ack/reject handling."""

        def on_message(ch, method, properties, body):
            try:
                self._count('consumed')
                if config.callback:
                    outcome = config.callback(ch, method, properties, body)
                else:
                    # No user callback: acknowledge by default.
                    outcome = AckMode.AUTO
                if not config.auto_ack:
                    self._settle(ch, method.delivery_tag, outcome)
            except Exception as e:
                print(f"消息处理错误: {e}")
                if not config.auto_ack:
                    ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
                    self._count('requeued')

        return on_message

    def _settle(self, ch, delivery_tag, outcome) -> None:
        """Ack or reject one delivery according to the callback's outcome."""
        if outcome == AckMode.AUTO or outcome is True:
            ch.basic_ack(delivery_tag=delivery_tag)
            self._count('acknowledged')
        elif outcome == AckMode.REJECT or outcome is False:
            # ``False`` mirrors ``True`` -> ack; without this branch the
            # message would stay unacknowledged indefinitely.
            ch.basic_reject(delivery_tag=delivery_tag, requeue=False)
            self._count('rejected')
        elif outcome == AckMode.MANUAL:
            ch.basic_reject(delivery_tag=delivery_tag, requeue=True)
            self._count('requeued')
        # Any other value (e.g. None) leaves the delivery untouched; the
        # callback is then responsible for settling it itself.

    def _count(self, key: str) -> None:
        """Increment one statistics counter under the lock."""
        with self._lock:
            self.statistics[key] += 1

    def stop_consuming(self, consumer_tag: str = None) -> bool:
        """Cancel one consumer, or all of them when ``consumer_tag`` is empty."""
        try:
            if consumer_tag:
                self.channel.basic_cancel(consumer_tag)
                self.consumers.pop(consumer_tag, None)
                print(f"✅ 停止消费者: {consumer_tag}")
            else:
                for tag in list(self.consumers):
                    self.channel.basic_cancel(tag)
                self.consumers.clear()
                print("✅ 停止所有消费者")
            # Stay "consuming" only while at least one consumer is registered.
            self.is_consuming = bool(self.consumers)
            return True
        except Exception as e:
            print(f"停止消费失败: {e}")
            return False

    def get_statistics(self) -> Dict[str, int]:
        """Return a snapshot copy of the counters."""
        with self._lock:
            return self.statistics.copy()

    def reset_statistics(self):
        """Reset every counter to zero."""
        with self._lock:
            self.statistics = dict.fromkeys(self._STAT_KEYS, 0)
4.2 消息处理器
from abc import ABC, abstractmethod
import json
import logging
from typing import Any, Dict
class MessageProcessor(ABC):
    """Abstract base class for message processors."""

    @abstractmethod
    def process(self, ch, method, properties, body) -> AckMode:
        """Handle one delivery and return the ``AckMode`` describing its outcome."""
        pass
class JSONMessageProcessor(MessageProcessor):
    """Processor that decodes the body as UTF-8 JSON before handling it."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def process(self, ch, method, properties, body) -> AckMode:
        """Decode ``body`` and delegate to ``handle_message``.

        Returns:
            AckMode.AUTO when ``handle_message`` returns a truthy value,
            AckMode.REJECT when it returns a falsy value or the body cannot be
            decoded or parsed, AckMode.MANUAL (requeue) on any other error.
        """
        try:
            message = json.loads(body.decode('utf-8'))

            self.logger.info(f"收到消息: {message}")
            self.logger.info(f"路由键: {method.routing_key}")
            self.logger.info(f"交换机: {method.exchange}")

            handled = self.handle_message(message, method, properties)
            return AckMode.AUTO if handled else AckMode.REJECT
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # A body that is not valid UTF-8 JSON will never become valid on
            # redelivery, so it is rejected instead of requeued.
            self.logger.error(f"JSON解析失败: {e}")
            return AckMode.REJECT
        except Exception as e:
            self.logger.error(f"消息处理失败: {e}")
            return AckMode.MANUAL  # requeue

    def handle_message(self, message: Dict[str, Any], method, properties) -> bool:
        """Handle one decoded message; subclasses override this with real logic."""
        print(f"处理消息: {message}")
        return True
class OrderMessageProcessor(JSONMessageProcessor):
    """Processor for order events, dispatched on the message's ``type`` field."""

    def handle_message(self, message: Dict[str, Any], method, properties) -> bool:
        """Route an order message to its handler.

        Returns:
            The handler's result, or False for an unknown type or when the
            handler raises.
        """
        try:
            message_type = message.get('type')
            handlers = {
                'order_created': self.handle_order_created,
                'order_paid': self.handle_order_paid,
                'order_shipped': self.handle_order_shipped,
            }
            handler = handlers.get(message_type)
            if handler is None:
                self.logger.warning(f"未知消息类型: {message_type}")
                return False
            return handler(message)
        except Exception as e:
            self.logger.error(f"订单消息处理失败: {e}")
            return False

    def handle_order_created(self, message: Dict[str, Any]) -> bool:
        """Handle an ``order_created`` event."""
        order_id = message.get('order_id')
        print(f"📦 处理订单创建: {order_id}")
        # Placeholder for the real business logic
        time.sleep(0.1)  # simulated processing time
        return True

    def handle_order_paid(self, message: Dict[str, Any]) -> bool:
        """Handle an ``order_paid`` event."""
        order_id = message.get('order_id')
        amount = message.get('amount')
        print(f"💰 处理订单支付: {order_id}, 金额: {amount}")
        time.sleep(0.1)
        return True

    def handle_order_shipped(self, message: Dict[str, Any]) -> bool:
        """Handle an ``order_shipped`` event."""
        order_id = message.get('order_id')
        tracking_number = message.get('tracking_number')
        print(f"🚚 处理订单发货: {order_id}, 快递单号: {tracking_number}")
        time.sleep(0.1)
        return True
4.3 消费示例
def consumer_example():
    """Consume ``order.queue`` with an ``OrderMessageProcessor`` until interrupted."""
    print("=== RabbitMQ消息消费示例 ===")

    # Connection settings
    config = ConnectionConfig(
        host='localhost',
        port=5672,
        username='admin',
        password='admin123'
    )
    conn_manager = RabbitMQConnectionManager(config)
    try:
        if conn_manager.connect():
            channel_manager = RabbitMQChannelManager(conn_manager)
            channel = channel_manager.create_channel("consumer_channel")
            if channel:
                consumer = RabbitMQConsumer(channel)
                order_processor = OrderMessageProcessor()

                consumer_config = ConsumerConfig(
                    queue_name="order.queue",
                    consumer_tag="order_consumer",
                    auto_ack=False,
                    prefetch_count=10,
                    callback=order_processor.process
                )

                if consumer.start_consuming(consumer_config):
                    print("✅ 消费者启动成功,等待消息...")
                    # Blocks in the channel's consume loop until Ctrl-C.
                    try:
                        channel.start_consuming()
                    except KeyboardInterrupt:
                        print("\n⏹️ 收到停止信号")
                        consumer.stop_consuming()
                        channel.stop_consuming()

                # Report the counters
                stats = consumer.get_statistics()
                print("\n📊 消费统计:")
                print(f"已消费: {stats['consumed']}")
                print(f"已确认: {stats['acknowledged']}")
                print(f"已拒绝: {stats['rejected']}")
                print(f"重新入队: {stats['requeued']}")

                channel_manager.close_all_channels()
            else:
                print("❌ 通道创建失败")
        else:
            print("❌ 连接失败")
    except Exception as e:
        print(f"❌ 错误: {e}")
    finally:
        conn_manager.disconnect()


if __name__ == "__main__":
    consumer_example()
5. 监控与管理
5.1 管理API
import requests
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class RabbitMQManagementConfig:
    """Connection settings for the management HTTP API."""

    host: str = "localhost"
    port: int = 15672
    username: str = "admin"
    password: str = "admin123"
    use_ssl: bool = False

    @property
    def base_url(self) -> str:
        """Return the API root, ``<scheme>://<host>:<port>/api``."""
        protocol = "https" if self.use_ssl else "http"
        return f"{protocol}://{self.host}:{self.port}/api"
class RabbitMQManagementAPI:
    """Thin client for the RabbitMQ management HTTP API.

    Read methods return the decoded JSON payload, or None on failure; delete
    methods return True/False.  Failures are printed, not raised.
    """

    def __init__(self, config: RabbitMQManagementConfig, timeout: float = 10.0):
        """Create the HTTP session.

        Args:
            config: host, port, credentials and scheme of the API.
            timeout: per-request timeout in seconds, so an unreachable broker
                cannot block the caller indefinitely.
        """
        self.config = config
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = (config.username, config.password)
        self.session.headers.update({'Content-Type': 'application/json'})

    @staticmethod
    def _segment(value: str) -> str:
        """Percent-encode one URL path segment (the vhost "/" becomes "%2F")."""
        from urllib.parse import quote
        return quote(value, safe="")

    def _get_json(self, path: str, failure_label: str):
        """GET ``<base_url>/<path>``; return the JSON body, or None on failure."""
        try:
            response = self.session.get(
                f"{self.config.base_url}/{path}", timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            print(f"{failure_label}: {e}")
            return None

    def _delete(self, path: str, failure_label: str) -> bool:
        """DELETE ``<base_url>/<path>``; return True on a success status."""
        try:
            response = self.session.delete(
                f"{self.config.base_url}/{path}", timeout=self.timeout
            )
            response.raise_for_status()
            return True
        except requests.RequestException as e:
            print(f"{failure_label}: {e}")
            return False

    def get_overview(self) -> Optional[Dict]:
        """Return the cluster overview."""
        return self._get_json("overview", "获取概览失败")

    def get_nodes(self) -> Optional[List[Dict]]:
        """Return the list of nodes."""
        return self._get_json("nodes", "获取节点列表失败")

    def get_queues(self, vhost: str = "/") -> Optional[List[Dict]]:
        """Return the queues of ``vhost``."""
        return self._get_json(f"queues/{self._segment(vhost)}", "获取队列列表失败")

    def get_exchanges(self, vhost: str = "/") -> Optional[List[Dict]]:
        """Return the exchanges of ``vhost``."""
        return self._get_json(f"exchanges/{self._segment(vhost)}", "获取交换机列表失败")

    def get_connections(self) -> Optional[List[Dict]]:
        """Return the list of connections."""
        return self._get_json("connections", "获取连接列表失败")

    def get_channels(self) -> Optional[List[Dict]]:
        """Return the list of channels."""
        return self._get_json("channels", "获取通道列表失败")

    def delete_queue(self, queue_name: str, vhost: str = "/") -> bool:
        """Delete a queue; return True on success."""
        return self._delete(
            f"queues/{self._segment(vhost)}/{self._segment(queue_name)}",
            "删除队列失败",
        )

    def purge_queue(self, queue_name: str, vhost: str = "/") -> bool:
        """Remove all messages from a queue; return True on success."""
        return self._delete(
            f"queues/{self._segment(vhost)}/{self._segment(queue_name)}/contents",
            "清空队列失败",
        )
5.2 监控示例
def monitoring_example():
    """Print an overview of the broker using the management API."""
    print("=== RabbitMQ监控示例 ===")

    # Management API settings
    mgmt_config = RabbitMQManagementConfig(
        host='localhost',
        port=15672,
        username='admin',
        password='admin123'
    )
    api = RabbitMQManagementAPI(mgmt_config)

    # Each section is skipped when its request failed or returned nothing.
    print("\n1. 集群概览")
    overview = api.get_overview()
    if overview:
        queue_totals = overview.get('queue_totals', {})
        print(f"RabbitMQ版本: {overview.get('rabbitmq_version')}")
        print(f"Erlang版本: {overview.get('erlang_version')}")
        print(f"消息总数: {queue_totals.get('messages', 0)}")
        print(f"准备就绪消息: {queue_totals.get('messages_ready', 0)}")
        print(f"未确认消息: {queue_totals.get('messages_unacknowledged', 0)}")

    print("\n2. 节点信息")
    nodes = api.get_nodes()
    if nodes:
        for node in nodes:
            print(f"节点: {node.get('name')}")
            print(f" 状态: {node.get('running')}")
            # The divisions convert the reported values to MB and GB.
            print(f" 内存使用: {node.get('mem_used', 0) / 1024 / 1024:.2f} MB")
            print(f" 磁盘空闲: {node.get('disk_free', 0) / 1024 / 1024 / 1024:.2f} GB")

    print("\n3. 队列信息")
    queues = api.get_queues()
    if queues:
        for queue in queues:
            print(f"队列: {queue.get('name')}")
            print(f" 消息数: {queue.get('messages', 0)}")
            print(f" 消费者数: {queue.get('consumers', 0)}")
            print(f" 状态: {queue.get('state')}")

    print("\n4. 交换机信息")
    exchanges = api.get_exchanges()
    if exchanges:
        for exchange in exchanges:
            if exchange.get('name'):  # skip the default (unnamed) exchange
                print(f"交换机: {exchange.get('name')}")
                print(f" 类型: {exchange.get('type')}")
                print(f" 持久化: {exchange.get('durable')}")

    print("\n5. 连接信息")
    connections = api.get_connections()
    if connections:
        for conn in connections:
            print(f"连接: {conn.get('name')}")
            print(f" 状态: {conn.get('state')}")
            print(f" 用户: {conn.get('user')}")
            print(f" 虚拟主机: {conn.get('vhost')}")


if __name__ == "__main__":
    monitoring_example()
本章总结
核心知识点
连接与通道管理
- 连接池管理和复用
- 通道的创建和生命周期
- 连接状态监控
队列操作
- 队列声明和配置
- 队列类型和特性
- 队列绑定和路由
交换机操作
- 交换机类型和路由规则
- 拓扑结构设计
- 绑定关系管理
消息发布
- 消息属性和配置
- 发布确认机制
- 批量发布优化
消息消费
- 消费者配置和QoS
- 消息确认机制
- 消息处理模式
监控管理
- 管理API使用
- 集群状态监控
- 性能指标收集
最佳实践
连接管理
- 使用连接池避免频繁创建连接
- 合理设置连接超时和心跳
- 实现连接重连机制
消息处理
- 使用手动确认保证消息可靠性
- 设置合适的QoS避免消息堆积
- 实现幂等性处理
错误处理
- 实现重试机制和死信队列
- 记录详细的错误日志
- 监控异常指标
性能优化
- 批量发布提高吞吐量
- 合理设置预取数量
- 使用长连接(持久连接),避免频繁建立和断开连接
练习题
- 实现一个支持重连的RabbitMQ连接管理器
- 设计一个消息路由系统,支持多种路由策略
- 实现一个消息处理框架,支持插件化处理器
- 创建一个RabbitMQ监控面板,显示关键指标
- 设计一个消息重试和死信处理机制
3.3 交换机操作
3.3.1 交换机声明与管理
import pika
from typing import Dict, Any, Optional, List, Tuple
from dataclasses import dataclass
from enum import Enum
import json
class ExchangeType(Enum):
    """Exchange types; each value is the type string passed to exchange_declare."""

    DIRECT = "direct"
    FANOUT = "fanout"
    TOPIC = "topic"
    HEADERS = "headers"
    DELAYED = "x-delayed-message"
@dataclass
class ExchangeConfig:
    """Declarative description of an exchange."""

    name: str
    exchange_type: ExchangeType
    durable: bool = True
    auto_delete: bool = False
    internal: bool = False
    arguments: Optional[Dict[str, Any]] = None

    # Only used for DELAYED exchanges: the value written to 'x-delayed-type'
    delayed_type: Optional[str] = None

    def __post_init__(self):
        """Build the final ``arguments`` dict."""
        # Work on a copy so the dict passed in by the caller is never mutated.
        self.arguments = dict(self.arguments) if self.arguments else {}

        # A delayed exchange always gets 'x-delayed-type'; 'direct' is the fallback.
        if self.exchange_type == ExchangeType.DELAYED:
            self.arguments['x-delayed-type'] = self.delayed_type or 'direct'
class RabbitMQExchangeManager:
    """Declare, bind and delete exchanges on a single channel.

    Every operation logs its outcome and reports failure through its boolean
    return value instead of raising.
    """

    def __init__(self, channel: pika.channel.Channel):
        self.channel = channel
        self.logger = logging.getLogger(__name__)
        # Configs of the exchanges successfully declared through this manager
        self.declared_exchanges: Dict[str, ExchangeConfig] = {}

    def declare_exchange(self, config: ExchangeConfig) -> bool:
        """Declare the exchange described by ``config``; return True on success."""
        try:
            self.channel.exchange_declare(
                exchange=config.name,
                exchange_type=config.exchange_type.value,
                durable=config.durable,
                auto_delete=config.auto_delete,
                internal=config.internal,
                arguments=config.arguments,
            )
            self.declared_exchanges[config.name] = config
            self.logger.info(f"交换机声明成功: {config.name} (类型: {config.exchange_type.value})")
            return True
        except Exception as e:
            self.logger.error(f"交换机声明失败 {config.name}: {e}")
            return False

    def delete_exchange(self, exchange_name: str, if_unused: bool = False) -> bool:
        """Delete an exchange and forget its config; return True on success."""
        try:
            self.channel.exchange_delete(
                exchange=exchange_name,
                if_unused=if_unused,
            )
            self.declared_exchanges.pop(exchange_name, None)
            self.logger.info(f"交换机删除成功: {exchange_name}")
            return True
        except Exception as e:
            self.logger.error(f"交换机删除失败 {exchange_name}: {e}")
            return False

    def bind_exchange(self, destination: str, source: str, routing_key: str = "",
                      arguments: Optional[Dict[str, Any]] = None) -> bool:
        """Bind exchange ``destination`` to exchange ``source``; return True on success."""
        try:
            self.channel.exchange_bind(
                destination=destination,
                source=source,
                routing_key=routing_key,
                arguments=arguments,
            )
            self.logger.info(f"交换机绑定成功: {source} -> {destination} (routing_key: {routing_key})")
            return True
        except Exception as e:
            self.logger.error(f"交换机绑定失败: {e}")
            return False

    def unbind_exchange(self, destination: str, source: str, routing_key: str = "",
                        arguments: Optional[Dict[str, Any]] = None) -> bool:
        """Remove an exchange-to-exchange binding; return True on success."""
        try:
            self.channel.exchange_unbind(
                destination=destination,
                source=source,
                routing_key=routing_key,
                arguments=arguments,
            )
            self.logger.info(f"交换机解绑成功: {source} -> {destination} (routing_key: {routing_key})")
            return True
        except Exception as e:
            self.logger.error(f"交换机解绑失败: {e}")
            return False

    def list_declared_exchanges(self) -> List[str]:
        """Return the names of the exchanges declared through this manager."""
        return list(self.declared_exchanges.keys())

    def get_exchange_config(self, exchange_name: str) -> Optional[ExchangeConfig]:
        """Return the stored config of an exchange, or None if it is unknown."""
        return self.declared_exchanges.get(exchange_name)

    def create_standard_exchanges(self) -> Dict[str, bool]:
        """Declare the tutorial's standard exchange set.

        Returns:
            A mapping of exchange name to whether its declaration succeeded.
        """
        standard_exchanges = [
            ("direct.exchange", ExchangeType.DIRECT),
            ("fanout.exchange", ExchangeType.FANOUT),
            ("topic.exchange", ExchangeType.TOPIC),
            ("headers.exchange", ExchangeType.HEADERS),
            # Dead-letter exchange
            ("dlx.exchange", ExchangeType.DIRECT),
        ]
        results = {}
        for exchange_name, exchange_type in standard_exchanges:
            exchange_config = ExchangeConfig(
                name=exchange_name,
                exchange_type=exchange_type,
                durable=True,
            )
            results[exchange_name] = self.declare_exchange(exchange_config)
        return results

    def create_routing_topology(self) -> bool:
        """Create the standard exchanges and queues and bind them together.

        Returns:
            True only when every exchange declaration, queue declaration and
            binding succeeded.
        """
        try:
            exchange_results = self.create_standard_exchanges()

            # The queue manager shares this manager's channel.
            queue_manager = RabbitMQQueueManager(self.channel)
            queue_results = queue_manager.create_standard_queues()

            # (queue, exchange, routing key)
            bindings = [
                ("normal.queue", "direct.exchange", "normal"),
                ("priority.queue", "direct.exchange", "priority"),
                ("ttl.queue", "topic.exchange", "ttl.*"),
                ("dlx.queue", "dlx.exchange", "dlx"),
                ("limited.queue", "fanout.exchange", ""),
            ]
            binding_results = [
                queue_manager.bind_queue(queue_name, exchange_name, routing_key)
                for queue_name, exchange_name, routing_key in bindings
            ]
            success_count = sum(1 for success in binding_results if success)
            self.logger.info(f"路由拓扑创建完成: {success_count}/{len(bindings)} 个绑定成功")

            # Failed declarations count as failure too, not only failed bindings.
            return (all(exchange_results.values())
                    and all(queue_results.values())
                    and all(binding_results))
        except Exception as e:
            self.logger.error(f"创建路由拓扑失败: {e}")
            return False
class RabbitMQTopologyManager:
    """Build, summarise and tear down a RabbitMQ topology on one channel.

    The manager owns a ``RabbitMQExchangeManager`` and a
    ``RabbitMQQueueManager`` that both work on the channel passed to the
    constructor, and delegates every broker operation to them.
    """

    def __init__(self, channel: "pika.channel.Channel"):
        """Create the exchange and queue managers for ``channel``."""
        self.channel = channel
        self.exchange_manager = RabbitMQExchangeManager(channel)
        self.queue_manager = RabbitMQQueueManager(channel)
        self.logger = logging.getLogger(__name__)

    def create_complete_topology(self) -> Dict[str, Any]:
        """Create the standard exchanges, queues and bindings.

        Returns:
            A dict with these keys:

            * ``exchanges`` / ``queues``: per-name results returned by the
              respective manager.
            * ``bindings``: one boolean per attempted binding.
            * ``success``: True when every operation succeeded.
            * ``success_rate``: successful operations divided by attempted
              operations; stays 0.0 when nothing was attempted or when an
              exception aborted the run.
            * ``error``: the exception text, present only when an
              exception was raised.
        """
        result = {
            'exchanges': {},
            'queues': {},
            'bindings': [],
            'success': False,
            # Pre-seeded so callers can always read it, even when an
            # exception below prevents the rate from being computed.
            'success_rate': 0.0,
        }
        try:
            self.logger.info("创建交换机...")
            result['exchanges'] = self.exchange_manager.create_standard_exchanges()

            self.logger.info("创建队列...")
            result['queues'] = self.queue_manager.create_standard_queues()

            self.logger.info("创建绑定...")
            bindings = self._create_bindings()
            result['bindings'] = bindings

            # Overall success requires every single operation to succeed.
            exchange_success = sum(1 for success in result['exchanges'].values() if success)
            queue_success = sum(1 for success in result['queues'].values() if success)
            binding_success = sum(1 for success in bindings if success)
            total_operations = len(result['exchanges']) + len(result['queues']) + len(bindings)
            successful_operations = exchange_success + queue_success + binding_success

            result['success'] = successful_operations == total_operations
            result['success_rate'] = successful_operations / total_operations if total_operations > 0 else 0
            self.logger.info(f"拓扑创建完成: {successful_operations}/{total_operations} 操作成功")
        except Exception as e:
            self.logger.error(f"创建拓扑失败: {e}")
            result['error'] = str(e)
        return result

    def _create_bindings(self) -> List[bool]:
        """Bind the standard queues to the standard exchanges.

        Returns:
            One entry per binding, in declaration order: the value returned
            by the manager's bind call, or False when the bind call raised
            or the binding type is unknown.
        """
        # (binding type, target, source exchange, routing key)
        bindings = [
            ("queue", "normal.queue", "direct.exchange", "normal"),
            ("queue", "priority.queue", "direct.exchange", "priority"),
            ("queue", "ttl.queue", "topic.exchange", "ttl.*"),
            ("queue", "dlx.queue", "dlx.exchange", "dlx"),
            ("queue", "limited.queue", "fanout.exchange", ""),
            # Optional exchange-to-exchange binding:
            # ("exchange", "topic.exchange", "direct.exchange", "topic.route")
        ]
        results = []
        for binding_type, target, source, routing_key in bindings:
            try:
                if binding_type == "queue":
                    success = self.queue_manager.bind_queue(target, source, routing_key)
                elif binding_type == "exchange":
                    success = self.exchange_manager.bind_exchange(target, source, routing_key)
                else:
                    success = False
                results.append(success)
            except Exception as e:
                # One failed binding must not prevent the remaining ones.
                self.logger.error(f"绑定失败 {target} -> {source}: {e}")
                results.append(False)
        return results

    def cleanup_topology(self) -> bool:
        """Delete every queue and exchange the managers report as declared.

        Queues are deleted before exchanges.

        Returns:
            True when the whole sweep ran without raising, False otherwise.
            The return values of the individual delete calls are not
            inspected.
        """
        try:
            # Snapshot the names first: the delete calls may shrink the
            # collections these listings are derived from, and mutating a
            # collection while iterating it is an error.
            for queue_name in list(self.queue_manager.list_declared_queues()):
                self.queue_manager.delete_queue(queue_name)
            for exchange_name in list(self.exchange_manager.list_declared_exchanges()):
                self.exchange_manager.delete_exchange(exchange_name)
            self.logger.info("拓扑清理完成")
            return True
        except Exception as e:
            self.logger.error(f"拓扑清理失败: {e}")
            return False

    def get_topology_summary(self) -> Dict[str, Any]:
        """Return the count and names of the declared exchanges and queues."""
        # Query each manager once so count and names describe the same state.
        exchange_names = self.exchange_manager.list_declared_exchanges()
        queue_names = self.queue_manager.list_declared_queues()
        return {
            'exchanges': {
                'count': len(exchange_names),
                'names': exchange_names,
            },
            'queues': {
                'count': len(queue_names),
                'names': queue_names,
            },
        }
# 使用示例
if __name__ == "__main__":
    # Demo: build the complete topology, print a summary, then declare and
    # delete a throw-away exchange. Requires a reachable RabbitMQ broker.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    print("=== RabbitMQ交换机与拓扑管理示例 ===")

    # Connection settings
    config = ConnectionConfig(
        host='localhost',
        port=5672,
        username='admin',
        password='admin123'
    )

    # Open the connection
    conn_manager = RabbitMQConnectionManager(config)
    try:
        if conn_manager.connect():
            # Open a channel
            channel_manager = RabbitMQChannelManager(conn_manager)
            channel = channel_manager.create_channel("topology_mgmt_channel")
            if channel:
                topology_manager = RabbitMQTopologyManager(channel)

                print("\n1. 创建完整拓扑")
                result = topology_manager.create_complete_topology()
                # 'success_rate' is missing from the result when topology
                # creation was aborted by an exception; indexing it directly
                # would raise KeyError here and hide the real error below.
                success_rate = result.get('success_rate', 0.0)
                if result['success']:
                    print(f"✅ 拓扑创建成功 (成功率: {success_rate:.2%})")
                else:
                    print(f"❌ 拓扑创建失败 (成功率: {success_rate:.2%})")
                    if 'error' in result:
                        print(f"错误: {result['error']}")

                print("\n2. 拓扑摘要")
                summary = topology_manager.get_topology_summary()
                print(f"交换机数量: {summary['exchanges']['count']}")
                print(f"队列数量: {summary['queues']['count']}")
                print(f"交换机列表: {summary['exchanges']['names']}")
                print(f"队列列表: {summary['queues']['names']}")

                print("\n3. 测试交换机操作")
                exchange_manager = topology_manager.exchange_manager
                # Declare a temporary, non-durable test exchange
                test_exchange = ExchangeConfig(
                    name="test.exchange",
                    exchange_type=ExchangeType.TOPIC,
                    durable=False,
                    auto_delete=True
                )
                if exchange_manager.declare_exchange(test_exchange):
                    print("✅ 测试交换机创建成功")
                    # ...and delete it again
                    if exchange_manager.delete_exchange("test.exchange"):
                        print("✅ 测试交换机删除成功")

                # Optional: tear the whole topology down again
                # print("\n4. 清理拓扑")
                # if topology_manager.cleanup_topology():
                #     print("✅ 拓扑清理成功")

                # Close the channel
                channel_manager.close_all_channels()
            else:
                print("❌ 通道创建失败")
        else:
            print("❌ 连接失败")
    except Exception as e:
        print(f"❌ 错误: {e}")
    finally:
        # Always release the connection, even after an error.
        conn_manager.disconnect()
3.4 消息发布与消费
3.4.1 消息发布
import pika
import json
import time
import uuid
from typing import Dict, Any, Optional, List, Union
from dataclasses import dataclass, asdict
from enum import Enum
import threading
from datetime import datetime, timedelta
class DeliveryMode(Enum):
    """Message persistence mode.

    The member value is what ends up in the ``delivery_mode`` message
    property.
    """

    NON_PERSISTENT = 1  # message is not persisted
    PERSISTENT = 2  # message is persisted
class Priority(Enum):
    """Named message priority levels; a larger value means higher priority."""

    LOW = 1
    NORMAL = 5  # default priority for new messages
    HIGH = 10
@dataclass
class MessageProperties:
    """Message properties with automatically filled defaults.

    After construction, a missing ``headers`` becomes an empty dict, a
    missing ``message_id`` becomes a random UUID4 string and a missing
    ``timestamp`` becomes the current Unix time in whole seconds.
    """

    # Payload description
    content_type: str = "application/json"
    content_encoding: str = "utf-8"
    delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT
    priority: int = Priority.NORMAL.value
    # Identification
    correlation_id: Optional[str] = None
    reply_to: Optional[str] = None
    message_id: Optional[str] = None
    # Timing
    timestamp: Optional[int] = None
    expiration: Optional[str] = None  # TTL in milliseconds
    # Application-level attributes
    app_id: Optional[str] = None
    user_id: Optional[str] = None
    type: Optional[str] = None
    # Custom headers
    headers: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        """Fill in headers, message id and timestamp when not supplied."""
        self.headers = {} if self.headers is None else self.headers
        if self.message_id is None:
            # Every message gets its own identifier by default.
            self.message_id = str(uuid.uuid4())
        if self.timestamp is None:
            self.timestamp = int(time.time())

    def to_pika_properties(self) -> pika.BasicProperties:
        """Return a ``pika.BasicProperties`` carrying these values.

        ``delivery_mode`` is passed as the enum member's integer value;
        every other field is passed through unchanged.
        """
        property_values = {
            'content_type': self.content_type,
            'content_encoding': self.content_encoding,
            'delivery_mode': self.delivery_mode.value,
            'priority': self.priority,
            'correlation_id': self.correlation_id,
            'reply_to': self.reply_to,
            'message_id': self.message_id,
            'timestamp': self.timestamp,
            'expiration': self.expiration,
            'app_id': self.app_id,
            'user_id': self.user_id,
            'type': self.type,
            'headers': self.headers,
        }
        return pika.BasicProperties(**property_values)
@dataclass
class Message:
    """A message together with its routing information.

    Construction normalises the body and adjusts the content type:

    * a ``dict`` body is serialised to a JSON string and the content type
      is set to ``application/json``;
    * a ``str`` body is encoded to UTF-8 bytes and the content type is set
      to ``text/plain``;
    * a ``bytes`` body is kept as is and the content type is not touched.

    When no ``properties`` object is given a default one is created. A
    supplied ``properties`` object is modified in place by the content-type
    adjustment above.
    """

    body: Union[str, bytes, Dict[str, Any]]
    properties: MessageProperties = None
    exchange: str = ""
    routing_key: str = ""
    mandatory: bool = False
    immediate: bool = False

    def __post_init__(self):
        """Create default properties if needed and normalise the body."""
        if self.properties is None:
            self.properties = MessageProperties()

        payload = self.body
        if isinstance(payload, dict):
            self.body = json.dumps(payload, ensure_ascii=False)
            self.properties.content_type = "application/json"
        elif isinstance(payload, str):
            self.body = payload.encode('utf-8')
            self.properties.content_type = "text/plain"

    def get_body_bytes(self) -> bytes:
        """Return the body as bytes.

        Bytes are returned unchanged, strings are UTF-8 encoded, and any
        other value is JSON-serialised first and then UTF-8 encoded.
        """
        payload = self.body
        if isinstance(payload, bytes):
            return payload
        if not isinstance(payload, str):
            payload = json.dumps(payload, ensure_ascii=False)
        return payload.encode('utf-8')
class RabbitMQPublisher:
    """Publish messages on one channel and keep simple delivery counters.

    ``self._lock`` guards the counters and also serialises the calls to
    ``channel.basic_publish`` made through this publisher.

    Counters reported by ``get_statistics``:

    * ``published``: publishes the channel accepted without raising.
    * ``confirmed``: accepted publishes counted while confirm mode was on.
    * ``failed``: negative publish results plus publishes that raised.
    * ``pending``: accepted publishes that are neither confirmed nor
      rejected. Without confirm mode nothing is ever confirmed, so this
      equals ``published``.
    """

    def __init__(self, channel: "pika.channel.Channel"):
        """Bind the publisher to ``channel`` with all counters at zero."""
        self.channel = channel
        self.logger = logging.getLogger(__name__)
        self._confirm_delivery = False
        self._published_messages = 0
        self._confirmed_messages = 0
        self._failed_messages = 0
        # Failures that raised before the publish was counted as published.
        # Tracked separately so they cannot drive ``pending`` negative.
        self._errored_messages = 0
        self._lock = threading.Lock()

    def enable_confirm_delivery(self) -> bool:
        """Switch the channel to publisher-confirm mode.

        Returns:
            True on success, False when the channel call raised.
        """
        try:
            self.channel.confirm_delivery()
        except Exception as e:
            self.logger.error(f"启用发布确认失败: {e}")
            return False
        self._confirm_delivery = True
        self.logger.info("发布确认已启用")
        return True

    def publish_message(self, message: "Message") -> bool:
        """Publish a single message.

        Works with both pika API generations: older releases return a
        boolean from ``basic_publish`` and accept ``immediate``; pika >= 1.0
        returns ``None``, reports rejected/unroutable messages by raising,
        and no longer accepts ``immediate``.

        Returns:
            True when the channel accepted the message, False when it
            reported failure or any exception was raised.
        """
        try:
            publish_kwargs = {
                'exchange': message.exchange,
                'routing_key': message.routing_key,
                'body': message.get_body_bytes(),
                'properties': message.properties.to_pika_properties(),
                'mandatory': message.mandatory,
            }
            # Forward ``immediate`` only on explicit request: passing it
            # unconditionally makes every publish fail with TypeError on
            # pika >= 1.0, where the parameter was removed.
            if message.immediate:
                publish_kwargs['immediate'] = True

            with self._lock:
                outcome = self.channel.basic_publish(**publish_kwargs)
                # ``None`` means "no exception raised", i.e. success on
                # pika >= 1.0; older releases return an explicit boolean.
                success = outcome is None or bool(outcome)
                self._published_messages += 1
                if self._confirm_delivery:
                    if success:
                        self._confirmed_messages += 1
                        self.logger.debug(f"消息发布确认: {message.properties.message_id}")
                    else:
                        self._failed_messages += 1
                        self.logger.warning(f"消息发布失败: {message.properties.message_id}")
            return success
        except Exception as e:
            self.logger.error(f"发布消息失败: {e}")
            with self._lock:
                self._failed_messages += 1
                self._errored_messages += 1
            return False

    def publish_batch(self, messages: List["Message"]) -> Dict[str, int]:
        """Publish messages one by one.

        Returns:
            A dict with the ``total``, ``success`` and ``failed`` counts.
        """
        results = {
            'total': len(messages),
            'success': 0,
            'failed': 0
        }
        for message in messages:
            outcome_key = 'success' if self.publish_message(message) else 'failed'
            results[outcome_key] += 1
        self.logger.info(f"批量发布完成: {results['success']}/{results['total']} 成功")
        return results

    def publish_with_retry(self, message: "Message", max_retries: int = 3, retry_delay: float = 1.0) -> bool:
        """Publish a message, retrying on failure.

        Args:
            message: The message to publish.
            max_retries: Extra attempts after the first one. Negative
                values are treated as 0, so one attempt is always made.
            retry_delay: Seconds to sleep between attempts.

        Returns:
            True as soon as one attempt succeeds, False when all fail.
        """
        retries = max(0, max_retries)
        for attempt in range(retries + 1):
            if self.publish_message(message):
                if attempt > 0:
                    self.logger.info(f"消息发布成功 (重试 {attempt} 次): {message.properties.message_id}")
                return True
            if attempt < retries:
                self.logger.warning(f"消息发布失败,{retry_delay}秒后重试 (第 {attempt + 1} 次): {message.properties.message_id}")
                time.sleep(retry_delay)
        self.logger.error(f"消息发布最终失败 (重试 {max_retries} 次): {message.properties.message_id}")
        return False

    def publish_delayed_message(self, message: "Message", delay_seconds: int) -> bool:
        """Publish a message carrying an ``x-delay`` header.

        The delay is written to the header in whole milliseconds; the
        message's properties are modified in place.

        NOTE(review): the header presumably only takes effect on an
        exchange that understands ``x-delay`` (e.g. a delayed-message
        exchange); confirm against the broker setup.
        """
        if message.properties.headers is None:
            message.properties.headers = {}
        # int() keeps the header an integer even for fractional delays.
        message.properties.headers['x-delay'] = int(delay_seconds * 1000)
        self.logger.info(f"发布延迟消息 (延迟 {delay_seconds} 秒): {message.properties.message_id}")
        return self.publish_message(message)

    def publish_priority_message(self, message: "Message", priority: "Priority") -> bool:
        """Set the message priority (in place) and publish the message."""
        message.properties.priority = priority.value
        self.logger.info(f"发布优先级消息 (优先级 {priority.value}): {message.properties.message_id}")
        return self.publish_message(message)

    def publish_rpc_request(self, message: "Message", reply_queue: str, correlation_id: Optional[str] = None) -> bool:
        """Publish an RPC request.

        Sets ``reply_to`` and ``correlation_id`` on the message's
        properties (in place); a random UUID4 is generated when no
        correlation id is given.
        """
        if correlation_id is None:
            correlation_id = str(uuid.uuid4())
        message.properties.reply_to = reply_queue
        message.properties.correlation_id = correlation_id
        self.logger.info(f"发布RPC请求: {correlation_id} -> {reply_queue}")
        return self.publish_message(message)

    def get_statistics(self) -> Dict[str, int]:
        """Return a snapshot of the ``published``, ``confirmed``, ``failed``
        and ``pending`` counters."""
        with self._lock:
            # Only failures of accepted publishes reduce the pending count;
            # publishes that raised were never counted as published.
            rejected = self._failed_messages - self._errored_messages
            return {
                'published': self._published_messages,
                'confirmed': self._confirmed_messages,
                'failed': self._failed_messages,
                'pending': self._published_messages - self._confirmed_messages - rejected
            }

    def reset_statistics(self):
        """Reset all counters to zero."""
        with self._lock:
            self._published_messages = 0
            self._confirmed_messages = 0
            self._failed_messages = 0
            self._errored_messages = 0
class MessageBuilder:
    """Fluent builder for ``Message`` objects.

    Every setter returns the builder so calls can be chained. ``build()``
    hands the builder's current properties object to the new message
    without copying it, so call ``reset()`` before assembling the next
    message; otherwise later setter calls also change the message that was
    already built.
    """

    def __init__(self):
        """Start with an empty builder state."""
        self.reset()

    def reset(self) -> "MessageBuilder":
        """Discard all collected values and start from fresh defaults."""
        self._body = None
        self._properties = MessageProperties()
        self._exchange = ""
        self._routing_key = ""
        self._mandatory = False
        self._immediate = False
        return self

    def body(self, body: Union[str, bytes, Dict[str, Any]]) -> "MessageBuilder":
        """Set the message body."""
        self._body = body
        return self

    def exchange(self, exchange: str) -> "MessageBuilder":
        """Set the target exchange."""
        self._exchange = exchange
        return self

    def routing_key(self, routing_key: str) -> "MessageBuilder":
        """Set the routing key."""
        self._routing_key = routing_key
        return self

    def content_type(self, content_type: str) -> "MessageBuilder":
        """Set the content type."""
        self._properties.content_type = content_type
        return self

    def priority(self, priority: Union[int, "Priority"]) -> "MessageBuilder":
        """Set the priority from a ``Priority`` member or a plain integer."""
        if isinstance(priority, Priority):
            self._properties.priority = priority.value
        else:
            self._properties.priority = priority
        return self

    def persistent(self, persistent: bool = True) -> "MessageBuilder":
        """Choose the persistent or non-persistent delivery mode."""
        self._properties.delivery_mode = DeliveryMode.PERSISTENT if persistent else DeliveryMode.NON_PERSISTENT
        return self

    def ttl(self, ttl_seconds: Union[int, float]) -> "MessageBuilder":
        """Set the per-message TTL.

        The value is given in seconds and stored as a string of whole
        milliseconds; fractional milliseconds are truncated so the string
        is always an integer (``1.5`` becomes ``"1500"``, not ``"1500.0"``).
        """
        self._properties.expiration = str(int(ttl_seconds * 1000))
        return self

    def correlation_id(self, correlation_id: str) -> "MessageBuilder":
        """Set the correlation id."""
        self._properties.correlation_id = correlation_id
        return self

    def reply_to(self, reply_to: str) -> "MessageBuilder":
        """Set the reply queue."""
        self._properties.reply_to = reply_to
        return self

    def header(self, key: str, value: Any) -> "MessageBuilder":
        """Add or overwrite a single header."""
        if self._properties.headers is None:
            self._properties.headers = {}
        self._properties.headers[key] = value
        return self

    def headers(self, headers: Dict[str, Any]) -> "MessageBuilder":
        """Replace all headers.

        The mapping is copied so that later ``header()`` calls do not
        modify the caller's dict. ``None`` clears the headers.
        """
        self._properties.headers = dict(headers) if headers is not None else None
        return self

    def mandatory(self, mandatory: bool = True) -> "MessageBuilder":
        """Set the mandatory-routing flag."""
        self._mandatory = mandatory
        return self

    def build(self) -> "Message":
        """Create the message from the collected values.

        Raises:
            ValueError: If no body has been set.
        """
        if self._body is None:
            raise ValueError("消息体不能为空")
        return Message(
            body=self._body,
            properties=self._properties,
            exchange=self._exchange,
            routing_key=self._routing_key,
            mandatory=self._mandatory,
            immediate=self._immediate
        )
# 使用示例
if __name__ == "__main__":
    # Demo: publish normal, priority, TTL and batch messages and print the
    # publisher statistics. Requires a reachable RabbitMQ broker.
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )
    print("=== RabbitMQ消息发布示例 ===")

    # Connection settings
    config = ConnectionConfig(host='localhost', port=5672, username='admin', password='admin123')
    conn_manager = RabbitMQConnectionManager(config)
    try:
        if not conn_manager.connect():
            print("❌ 连接失败")
        else:
            channel_manager = RabbitMQChannelManager(conn_manager)
            channel = channel_manager.create_channel("publisher_channel")
            if not channel:
                print("❌ 通道创建失败")
            else:
                # Publisher with confirm mode switched on
                publisher = RabbitMQPublisher(channel)
                publisher.enable_confirm_delivery()

                # The exchanges and queues must exist before publishing
                topology_manager = RabbitMQTopologyManager(channel)
                topology_result = topology_manager.create_complete_topology()
                if not topology_result['success']:
                    print("❌ 拓扑创建失败")
                else:
                    print("✅ 拓扑创建成功")
                    builder = MessageBuilder()

                    # Plain persistent message via the direct exchange
                    print("\n1. 发布普通消息")
                    builder.reset()
                    builder.body({"type": "normal", "content": "这是一条普通消息", "timestamp": time.time()})
                    builder.exchange("direct.exchange").routing_key("normal")
                    normal_message = builder.persistent(True).build()
                    if publisher.publish_message(normal_message):
                        print("✅ 普通消息发布成功")

                    # High-priority message
                    print("\n2. 发布优先级消息")
                    builder.reset()
                    builder.body({"type": "priority", "content": "这是一条高优先级消息"})
                    builder.exchange("direct.exchange").routing_key("priority")
                    priority_message = builder.priority(Priority.HIGH).build()
                    if publisher.publish_priority_message(priority_message, Priority.HIGH):
                        print("✅ 优先级消息发布成功")

                    # Message that expires after 30 seconds
                    print("\n3. 发布TTL消息")
                    builder.reset()
                    builder.body({"type": "ttl", "content": "这是一条TTL消息"})
                    builder.exchange("topic.exchange").routing_key("ttl.test")
                    ttl_message = builder.ttl(30).build()
                    if publisher.publish_message(ttl_message):
                        print("✅ TTL消息发布成功")

                    # Five messages through the fanout exchange
                    print("\n4. 批量发布消息")
                    batch_messages = [
                        builder.reset()
                        .body({"type": "batch", "index": i, "content": f"批量消息 {i}"})
                        .exchange("fanout.exchange")
                        .routing_key("")
                        .build()
                        for i in range(5)
                    ]
                    batch_result = publisher.publish_batch(batch_messages)
                    print(f"✅ 批量发布完成: {batch_result['success']}/{batch_result['total']} 成功")

                    # Publisher counters
                    print("\n5. 发布统计信息")
                    stats = publisher.get_statistics()
                    print(f"已发布: {stats['published']}")
                    print(f"已确认: {stats['confirmed']}")
                    print(f"失败: {stats['failed']}")
                    print(f"待确认: {stats['pending']}")

                # Close the channel whether or not the topology was created
                channel_manager.close_all_channels()
    except Exception as e:
        print(f"❌ 错误: {e}")
    finally:
        # Always release the connection, even after an error.
        conn_manager.disconnect()