1.1 RabbitMQ简介

1.1.1 什么是RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器,用于在分布式系统中传递消息。它实现了高级消息队列协议(AMQP),提供了可靠、灵活的消息传递机制。

import pika
import json
import time
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass
from datetime import datetime
import logging

@dataclass
class RabbitMQConfig:
    """RabbitMQ配置类"""
    host: str = 'localhost'
    port: int = 5672
    username: str = 'guest'
    password: str = 'guest'
    virtual_host: str = '/'
    heartbeat: int = 600
    blocked_connection_timeout: int = 300
    
class RabbitMQConnection:
    """RabbitMQ连接管理器"""
    
    def __init__(self, config: RabbitMQConfig):
        self.config = config
        self.connection = None
        self.channel = None
        self.logger = logging.getLogger(__name__)
    
    def connect(self):
        """建立连接"""
        try:
            # 设置连接参数
            credentials = pika.PlainCredentials(
                self.config.username, 
                self.config.password
            )
            
            parameters = pika.ConnectionParameters(
                host=self.config.host,
                port=self.config.port,
                virtual_host=self.config.virtual_host,
                credentials=credentials,
                heartbeat=self.config.heartbeat,
                blocked_connection_timeout=self.config.blocked_connection_timeout
            )
            
            # 建立连接
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
            
            self.logger.info(f"连接到RabbitMQ成功: {self.config.host}:{self.config.port}")
            return True
            
        except Exception as e:
            self.logger.error(f"连接RabbitMQ失败: {e}")
            return False
    
    def disconnect(self):
        """断开连接"""
        try:
            if self.channel and not self.channel.is_closed:
                self.channel.close()
            
            if self.connection and not self.connection.is_closed:
                self.connection.close()
            
            self.logger.info("RabbitMQ连接已断开")
            
        except Exception as e:
            self.logger.error(f"断开RabbitMQ连接失败: {e}")
    
    def is_connected(self) -> bool:
        """检查连接状态"""
        return (self.connection and not self.connection.is_closed and 
                self.channel and not self.channel.is_closed)
    
    def get_channel(self):
        """获取通道"""
        if not self.is_connected():
            if not self.connect():
                raise Exception("无法建立RabbitMQ连接")
        return self.channel

# 使用示例
if __name__ == "__main__":
    # 配置日志
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    print("=== RabbitMQ连接示例 ===")
    
    # 创建配置
    config = RabbitMQConfig(
        host='localhost',
        port=5672,
        username='guest',
        password='guest'
    )
    
    # 创建连接
    conn = RabbitMQConnection(config)
    
    try:
        # 连接到RabbitMQ
        if conn.connect():
            print("✅ 连接成功")
            
            # 获取通道
            channel = conn.get_channel()
            print(f"✅ 通道获取成功: {channel}")
            
            # 检查连接状态
            print(f"连接状态: {conn.is_connected()}")
        else:
            print("❌ 连接失败")
    
    except Exception as e:
        print(f"❌ 错误: {e}")
    
    finally:
        # 断开连接
        conn.disconnect()
        print("连接已断开")

1.1.2 RabbitMQ的特点

  1. 可靠性:支持消息持久化、确认机制
  2. 灵活的路由:支持多种交换器类型
  3. 集群支持:支持高可用集群部署
  4. 多协议支持:AMQP、STOMP、MQTT等
  5. 管理界面:提供Web管理控制台
  6. 插件系统:丰富的插件生态

1.2 AMQP协议

1.2.1 AMQP概述

AMQP(Advanced Message Queuing Protocol)是一个开放标准的应用层协议,为面向消息的中间件设计。

class AMQPMessage:
    """AMQP消息类"""
    
    def __init__(self, body: bytes, properties: Dict[str, Any] = None):
        self.body = body
        self.properties = properties or {}
        self.delivery_info = {}
    
    def set_property(self, key: str, value: Any):
        """设置消息属性"""
        self.properties[key] = value
    
    def get_property(self, key: str, default: Any = None):
        """获取消息属性"""
        return self.properties.get(key, default)
    
    def set_delivery_info(self, delivery_tag: int, exchange: str, routing_key: str):
        """设置投递信息"""
        self.delivery_info = {
            'delivery_tag': delivery_tag,
            'exchange': exchange,
            'routing_key': routing_key,
            'timestamp': datetime.now().isoformat()
        }
    
    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        return {
            'body': self.body.decode('utf-8') if isinstance(self.body, bytes) else self.body,
            'properties': self.properties,
            'delivery_info': self.delivery_info
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'AMQPMessage':
        """从字典创建消息"""
        body = data['body']
        if isinstance(body, str):
            body = body.encode('utf-8')
        
        message = cls(body, data.get('properties', {}))
        message.delivery_info = data.get('delivery_info', {})
        return message
    
    def __str__(self) -> str:
        return f"AMQPMessage(body={self.body}, properties={self.properties})"

class AMQPMessageProperties:
    """AMQP消息属性常量"""
    
    # 基本属性
    CONTENT_TYPE = 'content_type'
    CONTENT_ENCODING = 'content_encoding'
    HEADERS = 'headers'
    DELIVERY_MODE = 'delivery_mode'  # 1=非持久化, 2=持久化
    PRIORITY = 'priority'
    CORRELATION_ID = 'correlation_id'
    REPLY_TO = 'reply_to'
    EXPIRATION = 'expiration'
    MESSAGE_ID = 'message_id'
    TIMESTAMP = 'timestamp'
    TYPE = 'type'
    USER_ID = 'user_id'
    APP_ID = 'app_id'
    CLUSTER_ID = 'cluster_id'
    
    # 投递模式
    DELIVERY_MODE_TRANSIENT = 1  # 非持久化
    DELIVERY_MODE_PERSISTENT = 2  # 持久化

# 使用示例
if __name__ == "__main__":
    print("=== AMQP消息示例 ===")
    
    # 创建消息
    message_data = {
        'user_id': 12345,
        'action': 'user_login',
        'timestamp': datetime.now().isoformat()
    }
    
    message = AMQPMessage(
        body=json.dumps(message_data).encode('utf-8')
    )
    
    # 设置消息属性
    message.set_property(AMQPMessageProperties.CONTENT_TYPE, 'application/json')
    message.set_property(AMQPMessageProperties.DELIVERY_MODE, AMQPMessageProperties.DELIVERY_MODE_PERSISTENT)
    message.set_property(AMQPMessageProperties.MESSAGE_ID, str(uuid.uuid4()))
    message.set_property(AMQPMessageProperties.TIMESTAMP, int(time.time()))
    
    # 设置投递信息
    message.set_delivery_info(
        delivery_tag=1,
        exchange='user_events',
        routing_key='user.login'
    )
    
    print(f"消息: {message}")
    print(f"消息字典: {json.dumps(message.to_dict(), indent=2, ensure_ascii=False)}")
    
    # 从字典恢复消息
    restored_message = AMQPMessage.from_dict(message.to_dict())
    print(f"恢复的消息: {restored_message}")

1.2.2 AMQP核心概念

  1. Virtual Host(虚拟主机):逻辑隔离单元
  2. Exchange(交换器):消息路由器
  3. Queue(队列):消息存储容器
  4. Binding(绑定):交换器和队列的关联
  5. Routing Key(路由键):消息路由规则
  6. Message(消息):传输的数据单元

1.3 RabbitMQ架构

1.3.1 核心组件

from enum import Enum
from typing import List, Set

class ExchangeType(Enum):
    """交换器类型"""
    DIRECT = 'direct'
    FANOUT = 'fanout'
    TOPIC = 'topic'
    HEADERS = 'headers'

class QueueType(Enum):
    """队列类型"""
    CLASSIC = 'classic'
    QUORUM = 'quorum'
    STREAM = 'stream'

@dataclass
class Exchange:
    """交换器"""
    name: str
    type: ExchangeType
    durable: bool = True
    auto_delete: bool = False
    internal: bool = False
    arguments: Dict[str, Any] = None
    
    def __post_init__(self):
        if self.arguments is None:
            self.arguments = {}

@dataclass
class Queue:
    """队列"""
    name: str
    durable: bool = True
    exclusive: bool = False
    auto_delete: bool = False
    arguments: Dict[str, Any] = None
    
    def __post_init__(self):
        if self.arguments is None:
            self.arguments = {}

@dataclass
class Binding:
    """绑定"""
    exchange: str
    queue: str
    routing_key: str = ''
    arguments: Dict[str, Any] = None
    
    def __post_init__(self):
        if self.arguments is None:
            self.arguments = {}

class RabbitMQTopology:
    """RabbitMQ拓扑管理器"""
    
    def __init__(self, connection: RabbitMQConnection):
        self.connection = connection
        self.exchanges: Dict[str, Exchange] = {}
        self.queues: Dict[str, Queue] = {}
        self.bindings: List[Binding] = []
        self.logger = logging.getLogger(__name__)
    
    def declare_exchange(self, exchange: Exchange) -> bool:
        """声明交换器"""
        try:
            channel = self.connection.get_channel()
            
            channel.exchange_declare(
                exchange=exchange.name,
                exchange_type=exchange.type.value,
                durable=exchange.durable,
                auto_delete=exchange.auto_delete,
                internal=exchange.internal,
                arguments=exchange.arguments
            )
            
            self.exchanges[exchange.name] = exchange
            self.logger.info(f"交换器声明成功: {exchange.name} ({exchange.type.value})")
            return True
            
        except Exception as e:
            self.logger.error(f"声明交换器失败: {e}")
            return False
    
    def declare_queue(self, queue: Queue) -> bool:
        """声明队列"""
        try:
            channel = self.connection.get_channel()
            
            result = channel.queue_declare(
                queue=queue.name,
                durable=queue.durable,
                exclusive=queue.exclusive,
                auto_delete=queue.auto_delete,
                arguments=queue.arguments
            )
            
            # 如果队列名为空,使用服务器生成的名称
            if not queue.name:
                queue.name = result.method.queue
            
            self.queues[queue.name] = queue
            self.logger.info(f"队列声明成功: {queue.name}")
            return True
            
        except Exception as e:
            self.logger.error(f"声明队列失败: {e}")
            return False
    
    def bind_queue(self, binding: Binding) -> bool:
        """绑定队列到交换器"""
        try:
            channel = self.connection.get_channel()
            
            channel.queue_bind(
                exchange=binding.exchange,
                queue=binding.queue,
                routing_key=binding.routing_key,
                arguments=binding.arguments
            )
            
            self.bindings.append(binding)
            self.logger.info(f"队列绑定成功: {binding.queue} -> {binding.exchange} ({binding.routing_key})")
            return True
            
        except Exception as e:
            self.logger.error(f"绑定队列失败: {e}")
            return False
    
    def delete_exchange(self, exchange_name: str, if_unused: bool = False) -> bool:
        """删除交换器"""
        try:
            channel = self.connection.get_channel()
            
            channel.exchange_delete(
                exchange=exchange_name,
                if_unused=if_unused
            )
            
            if exchange_name in self.exchanges:
                del self.exchanges[exchange_name]
            
            self.logger.info(f"交换器删除成功: {exchange_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"删除交换器失败: {e}")
            return False
    
    def delete_queue(self, queue_name: str, if_unused: bool = False, if_empty: bool = False) -> bool:
        """删除队列"""
        try:
            channel = self.connection.get_channel()
            
            channel.queue_delete(
                queue=queue_name,
                if_unused=if_unused,
                if_empty=if_empty
            )
            
            if queue_name in self.queues:
                del self.queues[queue_name]
            
            self.logger.info(f"队列删除成功: {queue_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"删除队列失败: {e}")
            return False
    
    def unbind_queue(self, binding: Binding) -> bool:
        """解绑队列"""
        try:
            channel = self.connection.get_channel()
            
            channel.queue_unbind(
                exchange=binding.exchange,
                queue=binding.queue,
                routing_key=binding.routing_key,
                arguments=binding.arguments
            )
            
            # 从绑定列表中移除
            self.bindings = [b for b in self.bindings if not (
                b.exchange == binding.exchange and
                b.queue == binding.queue and
                b.routing_key == binding.routing_key
            )]
            
            self.logger.info(f"队列解绑成功: {binding.queue} -> {binding.exchange} ({binding.routing_key})")
            return True
            
        except Exception as e:
            self.logger.error(f"解绑队列失败: {e}")
            return False
    
    def get_topology_info(self) -> Dict[str, Any]:
        """获取拓扑信息"""
        return {
            'exchanges': {name: {
                'type': ex.type.value,
                'durable': ex.durable,
                'auto_delete': ex.auto_delete,
                'internal': ex.internal,
                'arguments': ex.arguments
            } for name, ex in self.exchanges.items()},
            'queues': {name: {
                'durable': q.durable,
                'exclusive': q.exclusive,
                'auto_delete': q.auto_delete,
                'arguments': q.arguments
            } for name, q in self.queues.items()},
            'bindings': [{
                'exchange': b.exchange,
                'queue': b.queue,
                'routing_key': b.routing_key,
                'arguments': b.arguments
            } for b in self.bindings]
        }

# 使用示例
if __name__ == "__main__":
    print("=== RabbitMQ拓扑管理示例 ===")
    
    # 创建连接
    config = RabbitMQConfig()
    conn = RabbitMQConnection(config)
    
    try:
        if conn.connect():
            # 创建拓扑管理器
            topology = RabbitMQTopology(conn)
            
            # 声明交换器
            user_exchange = Exchange(
                name='user_events',
                type=ExchangeType.TOPIC,
                durable=True
            )
            topology.declare_exchange(user_exchange)
            
            # 声明队列
            login_queue = Queue(
                name='user_login_queue',
                durable=True
            )
            topology.declare_queue(login_queue)
            
            logout_queue = Queue(
                name='user_logout_queue',
                durable=True
            )
            topology.declare_queue(logout_queue)
            
            # 绑定队列
            login_binding = Binding(
                exchange='user_events',
                queue='user_login_queue',
                routing_key='user.login'
            )
            topology.bind_queue(login_binding)
            
            logout_binding = Binding(
                exchange='user_events',
                queue='user_logout_queue',
                routing_key='user.logout'
            )
            topology.bind_queue(logout_binding)
            
            # 显示拓扑信息
            topology_info = topology.get_topology_info()
            print("\n拓扑信息:")
            print(json.dumps(topology_info, indent=2, ensure_ascii=False))
    
    except Exception as e:
        print(f"❌ 错误: {e}")
    
    finally:
        conn.disconnect()

1.3.2 消息流转过程

  1. 生产者发送消息:消息发送到交换器
  2. 交换器路由消息:根据路由规则分发消息
  3. 队列存储消息:消息存储在匹配的队列中
  4. 消费者接收消息:从队列中获取并处理消息
  5. 消息确认:消费者确认消息处理完成

1.4 章节总结

1.4.1 核心知识点

  1. RabbitMQ基础

    • 消息代理和队列服务器
    • AMQP协议实现
    • 可靠性和灵活性特点
  2. AMQP协议

    • 开放标准的消息协议
    • 消息属性和投递模式
    • 协议层次结构
  3. 核心组件

    • 虚拟主机、交换器、队列
    • 绑定和路由键
    • 消息和连接管理

1.4.2 最佳实践

  1. 连接管理

    • 使用连接池
    • 合理设置心跳和超时
    • 优雅的连接关闭
  2. 拓扑设计

    • 合理的交换器类型选择
    • 持久化配置
    • 命名规范
  3. 错误处理

    • 连接异常处理
    • 重试机制
    • 日志记录

1.4.3 练习题

  1. 基础练习

    • 建立RabbitMQ连接
    • 声明不同类型的交换器
    • 创建和绑定队列
  2. 进阶练习

    • 实现拓扑管理器
    • 设计消息属性管理
    • 实现连接池
  3. 实战练习

    • 设计用户事件系统拓扑
    • 实现动态拓扑管理
    • 构建监控和管理工具

通过本章的学习,我们了解了RabbitMQ的基础概念、AMQP协议和核心架构。这些知识为后续深入学习RabbitMQ的高级特性和实际应用奠定了坚实的基础。