1.1 RabbitMQ简介
1.1.1 什么是RabbitMQ
RabbitMQ是一个开源的消息代理和队列服务器,用于在分布式系统中传递消息。它实现了高级消息队列协议(AMQP),提供了可靠、灵活的消息传递机制。
import pika
import json
import time
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass
from datetime import datetime
import logging
@dataclass
class RabbitMQConfig:
"""RabbitMQ配置类"""
host: str = 'localhost'
port: int = 5672
username: str = 'guest'
password: str = 'guest'
virtual_host: str = '/'
heartbeat: int = 600
blocked_connection_timeout: int = 300
class RabbitMQConnection:
"""RabbitMQ连接管理器"""
def __init__(self, config: RabbitMQConfig):
self.config = config
self.connection = None
self.channel = None
self.logger = logging.getLogger(__name__)
def connect(self):
"""建立连接"""
try:
# 设置连接参数
credentials = pika.PlainCredentials(
self.config.username,
self.config.password
)
parameters = pika.ConnectionParameters(
host=self.config.host,
port=self.config.port,
virtual_host=self.config.virtual_host,
credentials=credentials,
heartbeat=self.config.heartbeat,
blocked_connection_timeout=self.config.blocked_connection_timeout
)
# 建立连接
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.logger.info(f"连接到RabbitMQ成功: {self.config.host}:{self.config.port}")
return True
except Exception as e:
self.logger.error(f"连接RabbitMQ失败: {e}")
return False
def disconnect(self):
"""断开连接"""
try:
if self.channel and not self.channel.is_closed:
self.channel.close()
if self.connection and not self.connection.is_closed:
self.connection.close()
self.logger.info("RabbitMQ连接已断开")
except Exception as e:
self.logger.error(f"断开RabbitMQ连接失败: {e}")
def is_connected(self) -> bool:
"""检查连接状态"""
return (self.connection and not self.connection.is_closed and
self.channel and not self.channel.is_closed)
def get_channel(self):
"""获取通道"""
if not self.is_connected():
if not self.connect():
raise Exception("无法建立RabbitMQ连接")
return self.channel
# 使用示例
if __name__ == "__main__":
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
print("=== RabbitMQ连接示例 ===")
# 创建配置
config = RabbitMQConfig(
host='localhost',
port=5672,
username='guest',
password='guest'
)
# 创建连接
conn = RabbitMQConnection(config)
try:
# 连接到RabbitMQ
if conn.connect():
print("✅ 连接成功")
# 获取通道
channel = conn.get_channel()
print(f"✅ 通道获取成功: {channel}")
# 检查连接状态
print(f"连接状态: {conn.is_connected()}")
else:
print("❌ 连接失败")
except Exception as e:
print(f"❌ 错误: {e}")
finally:
# 断开连接
conn.disconnect()
print("连接已断开")
1.1.2 RabbitMQ的特点
- 可靠性:支持消息持久化、确认机制
- 灵活的路由:支持多种交换器类型
- 集群支持:支持高可用集群部署
- 多协议支持:AMQP、STOMP、MQTT等
- 管理界面:提供Web管理控制台
- 插件系统:丰富的插件生态
1.2 AMQP协议
1.2.1 AMQP概述
AMQP(Advanced Message Queuing Protocol)是一个开放标准的应用层协议,为面向消息的中间件设计。
class AMQPMessage:
"""AMQP消息类"""
def __init__(self, body: bytes, properties: Dict[str, Any] = None):
self.body = body
self.properties = properties or {}
self.delivery_info = {}
def set_property(self, key: str, value: Any):
"""设置消息属性"""
self.properties[key] = value
def get_property(self, key: str, default: Any = None):
"""获取消息属性"""
return self.properties.get(key, default)
def set_delivery_info(self, delivery_tag: int, exchange: str, routing_key: str):
"""设置投递信息"""
self.delivery_info = {
'delivery_tag': delivery_tag,
'exchange': exchange,
'routing_key': routing_key,
'timestamp': datetime.now().isoformat()
}
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
'body': self.body.decode('utf-8') if isinstance(self.body, bytes) else self.body,
'properties': self.properties,
'delivery_info': self.delivery_info
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'AMQPMessage':
"""从字典创建消息"""
body = data['body']
if isinstance(body, str):
body = body.encode('utf-8')
message = cls(body, data.get('properties', {}))
message.delivery_info = data.get('delivery_info', {})
return message
def __str__(self) -> str:
return f"AMQPMessage(body={self.body}, properties={self.properties})"
class AMQPMessageProperties:
"""AMQP消息属性常量"""
# 基本属性
CONTENT_TYPE = 'content_type'
CONTENT_ENCODING = 'content_encoding'
HEADERS = 'headers'
DELIVERY_MODE = 'delivery_mode' # 1=非持久化, 2=持久化
PRIORITY = 'priority'
CORRELATION_ID = 'correlation_id'
REPLY_TO = 'reply_to'
EXPIRATION = 'expiration'
MESSAGE_ID = 'message_id'
TIMESTAMP = 'timestamp'
TYPE = 'type'
USER_ID = 'user_id'
APP_ID = 'app_id'
CLUSTER_ID = 'cluster_id'
# 投递模式
DELIVERY_MODE_TRANSIENT = 1 # 非持久化
DELIVERY_MODE_PERSISTENT = 2 # 持久化
# 使用示例
if __name__ == "__main__":
print("=== AMQP消息示例 ===")
# 创建消息
message_data = {
'user_id': 12345,
'action': 'user_login',
'timestamp': datetime.now().isoformat()
}
message = AMQPMessage(
body=json.dumps(message_data).encode('utf-8')
)
# 设置消息属性
message.set_property(AMQPMessageProperties.CONTENT_TYPE, 'application/json')
message.set_property(AMQPMessageProperties.DELIVERY_MODE, AMQPMessageProperties.DELIVERY_MODE_PERSISTENT)
message.set_property(AMQPMessageProperties.MESSAGE_ID, str(uuid.uuid4()))
message.set_property(AMQPMessageProperties.TIMESTAMP, int(time.time()))
# 设置投递信息
message.set_delivery_info(
delivery_tag=1,
exchange='user_events',
routing_key='user.login'
)
print(f"消息: {message}")
print(f"消息字典: {json.dumps(message.to_dict(), indent=2, ensure_ascii=False)}")
# 从字典恢复消息
restored_message = AMQPMessage.from_dict(message.to_dict())
print(f"恢复的消息: {restored_message}")
1.2.2 AMQP核心概念
- Virtual Host(虚拟主机):逻辑隔离单元
- Exchange(交换器):消息路由器
- Queue(队列):消息存储容器
- Binding(绑定):交换器和队列的关联
- Routing Key(路由键):消息路由规则
- Message(消息):传输的数据单元
1.3 RabbitMQ架构
1.3.1 核心组件
from enum import Enum
from typing import List, Set
class ExchangeType(Enum):
"""交换器类型"""
DIRECT = 'direct'
FANOUT = 'fanout'
TOPIC = 'topic'
HEADERS = 'headers'
class QueueType(Enum):
"""队列类型"""
CLASSIC = 'classic'
QUORUM = 'quorum'
STREAM = 'stream'
@dataclass
class Exchange:
"""交换器"""
name: str
type: ExchangeType
durable: bool = True
auto_delete: bool = False
internal: bool = False
arguments: Dict[str, Any] = None
def __post_init__(self):
if self.arguments is None:
self.arguments = {}
@dataclass
class Queue:
"""队列"""
name: str
durable: bool = True
exclusive: bool = False
auto_delete: bool = False
arguments: Dict[str, Any] = None
def __post_init__(self):
if self.arguments is None:
self.arguments = {}
@dataclass
class Binding:
"""绑定"""
exchange: str
queue: str
routing_key: str = ''
arguments: Dict[str, Any] = None
def __post_init__(self):
if self.arguments is None:
self.arguments = {}
class RabbitMQTopology:
"""RabbitMQ拓扑管理器"""
def __init__(self, connection: RabbitMQConnection):
self.connection = connection
self.exchanges: Dict[str, Exchange] = {}
self.queues: Dict[str, Queue] = {}
self.bindings: List[Binding] = []
self.logger = logging.getLogger(__name__)
def declare_exchange(self, exchange: Exchange) -> bool:
"""声明交换器"""
try:
channel = self.connection.get_channel()
channel.exchange_declare(
exchange=exchange.name,
exchange_type=exchange.type.value,
durable=exchange.durable,
auto_delete=exchange.auto_delete,
internal=exchange.internal,
arguments=exchange.arguments
)
self.exchanges[exchange.name] = exchange
self.logger.info(f"交换器声明成功: {exchange.name} ({exchange.type.value})")
return True
except Exception as e:
self.logger.error(f"声明交换器失败: {e}")
return False
def declare_queue(self, queue: Queue) -> bool:
"""声明队列"""
try:
channel = self.connection.get_channel()
result = channel.queue_declare(
queue=queue.name,
durable=queue.durable,
exclusive=queue.exclusive,
auto_delete=queue.auto_delete,
arguments=queue.arguments
)
# 如果队列名为空,使用服务器生成的名称
if not queue.name:
queue.name = result.method.queue
self.queues[queue.name] = queue
self.logger.info(f"队列声明成功: {queue.name}")
return True
except Exception as e:
self.logger.error(f"声明队列失败: {e}")
return False
def bind_queue(self, binding: Binding) -> bool:
"""绑定队列到交换器"""
try:
channel = self.connection.get_channel()
channel.queue_bind(
exchange=binding.exchange,
queue=binding.queue,
routing_key=binding.routing_key,
arguments=binding.arguments
)
self.bindings.append(binding)
self.logger.info(f"队列绑定成功: {binding.queue} -> {binding.exchange} ({binding.routing_key})")
return True
except Exception as e:
self.logger.error(f"绑定队列失败: {e}")
return False
def delete_exchange(self, exchange_name: str, if_unused: bool = False) -> bool:
"""删除交换器"""
try:
channel = self.connection.get_channel()
channel.exchange_delete(
exchange=exchange_name,
if_unused=if_unused
)
if exchange_name in self.exchanges:
del self.exchanges[exchange_name]
self.logger.info(f"交换器删除成功: {exchange_name}")
return True
except Exception as e:
self.logger.error(f"删除交换器失败: {e}")
return False
def delete_queue(self, queue_name: str, if_unused: bool = False, if_empty: bool = False) -> bool:
"""删除队列"""
try:
channel = self.connection.get_channel()
channel.queue_delete(
queue=queue_name,
if_unused=if_unused,
if_empty=if_empty
)
if queue_name in self.queues:
del self.queues[queue_name]
self.logger.info(f"队列删除成功: {queue_name}")
return True
except Exception as e:
self.logger.error(f"删除队列失败: {e}")
return False
def unbind_queue(self, binding: Binding) -> bool:
"""解绑队列"""
try:
channel = self.connection.get_channel()
channel.queue_unbind(
exchange=binding.exchange,
queue=binding.queue,
routing_key=binding.routing_key,
arguments=binding.arguments
)
# 从绑定列表中移除
self.bindings = [b for b in self.bindings if not (
b.exchange == binding.exchange and
b.queue == binding.queue and
b.routing_key == binding.routing_key
)]
self.logger.info(f"队列解绑成功: {binding.queue} -> {binding.exchange} ({binding.routing_key})")
return True
except Exception as e:
self.logger.error(f"解绑队列失败: {e}")
return False
def get_topology_info(self) -> Dict[str, Any]:
"""获取拓扑信息"""
return {
'exchanges': {name: {
'type': ex.type.value,
'durable': ex.durable,
'auto_delete': ex.auto_delete,
'internal': ex.internal,
'arguments': ex.arguments
} for name, ex in self.exchanges.items()},
'queues': {name: {
'durable': q.durable,
'exclusive': q.exclusive,
'auto_delete': q.auto_delete,
'arguments': q.arguments
} for name, q in self.queues.items()},
'bindings': [{
'exchange': b.exchange,
'queue': b.queue,
'routing_key': b.routing_key,
'arguments': b.arguments
} for b in self.bindings]
}
# 使用示例
if __name__ == "__main__":
print("=== RabbitMQ拓扑管理示例 ===")
# 创建连接
config = RabbitMQConfig()
conn = RabbitMQConnection(config)
try:
if conn.connect():
# 创建拓扑管理器
topology = RabbitMQTopology(conn)
# 声明交换器
user_exchange = Exchange(
name='user_events',
type=ExchangeType.TOPIC,
durable=True
)
topology.declare_exchange(user_exchange)
# 声明队列
login_queue = Queue(
name='user_login_queue',
durable=True
)
topology.declare_queue(login_queue)
logout_queue = Queue(
name='user_logout_queue',
durable=True
)
topology.declare_queue(logout_queue)
# 绑定队列
login_binding = Binding(
exchange='user_events',
queue='user_login_queue',
routing_key='user.login'
)
topology.bind_queue(login_binding)
logout_binding = Binding(
exchange='user_events',
queue='user_logout_queue',
routing_key='user.logout'
)
topology.bind_queue(logout_binding)
# 显示拓扑信息
topology_info = topology.get_topology_info()
print("\n拓扑信息:")
print(json.dumps(topology_info, indent=2, ensure_ascii=False))
except Exception as e:
print(f"❌ 错误: {e}")
finally:
conn.disconnect()
1.3.2 消息流转过程
- 生产者发送消息:消息发送到交换器
- 交换器路由消息:根据路由规则分发消息
- 队列存储消息:消息存储在匹配的队列中
- 消费者接收消息:从队列中获取并处理消息
- 消息确认:消费者确认消息处理完成
1.4 章节总结
1.4.1 核心知识点
RabbitMQ基础
- 消息代理和队列服务器
- AMQP协议实现
- 可靠性和灵活性特点
AMQP协议
- 开放标准的消息协议
- 消息属性和投递模式
- 协议层次结构
核心组件
- 虚拟主机、交换器、队列
- 绑定和路由键
- 消息和连接管理
1.4.2 最佳实践
连接管理
- 使用连接池
- 合理设置心跳和超时
- 优雅的连接关闭
拓扑设计
- 合理的交换器类型选择
- 持久化配置
- 命名规范
错误处理
- 连接异常处理
- 重试机制
- 日志记录
1.4.3 练习题
基础练习
- 建立RabbitMQ连接
- 声明不同类型的交换器
- 创建和绑定队列
进阶练习
- 实现拓扑管理器
- 设计消息属性管理
- 实现连接池
实战练习
- 设计用户事件系统拓扑
- 实现动态拓扑管理
- 构建监控和管理工具
通过本章的学习,我们了解了RabbitMQ的基础概念、AMQP协议和核心架构。这些知识为后续深入学习RabbitMQ的高级特性和实际应用奠定了坚实的基础。