3.1 连接与通道管理

3.1.1 连接管理

import pika
import time
import threading
import logging
from typing import Dict, Any, Optional, Callable, List
from dataclasses import dataclass
from enum import Enum
import ssl
import json

class ConnectionState(Enum):
    """Lifecycle states tracked by RabbitMQConnectionManager."""
    DISCONNECTED = "disconnected"  # initial state; also set when connect() fails
    CONNECTING = "connecting"      # set by connect() while the connection is being opened
    CONNECTED = "connected"        # set by connect() once the connection is open
    RECONNECTING = "reconnecting"  # set by the auto-reconnect worker before each retry
    CLOSED = "closed"              # set by disconnect()

@dataclass
class ConnectionConfig:
    """Settings used to open a RabbitMQ connection.

    ``ssl_options`` and ``client_properties`` default to ``None`` and are
    replaced in ``__post_init__`` with a fresh dict per instance, so two
    configs never share a mutable default.
    """
    host: str = 'localhost'
    port: int = 5672
    virtual_host: str = '/'
    username: str = 'guest'
    password: str = 'guest'

    # Connection tuning (all passed straight to pika.ConnectionParameters)
    heartbeat: int = 600
    blocked_connection_timeout: int = 300
    connection_attempts: int = 3
    # Also used as the pause between auto-reconnect attempts
    retry_delay: float = 2.0
    socket_timeout: float = 10.0

    # TLS settings; the connection manager reads the optional keys
    # 'ca_certs', 'certfile' and 'keyfile' from ssl_options
    ssl_enabled: bool = False
    ssl_options: Optional[Dict[str, Any]] = None

    # Miscellaneous
    locale: str = 'en_US'
    client_properties: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        """Replace ``None`` placeholders with per-instance default dicts."""
        if self.ssl_options is None:
            self.ssl_options = {}
        if self.client_properties is None:
            self.client_properties = {
                'product': 'RabbitMQ Python Client',
                'version': '1.0.0'
            }

class RabbitMQConnectionManager:
    """Manage one blocking RabbitMQ connection, with optional auto-reconnect.

    ``connect()`` and ``disconnect()`` serialise their state changes with a
    non-reentrant ``threading.Lock``. Three optional callbacks may be assigned
    after construction:

    * ``on_connection_open(connection)`` -- after a successful connect
    * ``on_connection_closed()`` -- at the end of ``disconnect()``
    * ``on_connection_error(exc)`` -- when ``connect()`` fails

    The callbacks run while the internal lock is held, so they must not call
    ``connect()`` or ``disconnect()`` on the same manager.
    """

    def __init__(self, config: ConnectionConfig):
        """Store the configuration; no network activity happens here."""
        self.config = config
        self.connection: Optional[pika.BlockingConnection] = None
        self.state = ConnectionState.DISCONNECTED
        self.logger = logging.getLogger(__name__)
        self._lock = threading.Lock()
        self._reconnect_thread: Optional[threading.Thread] = None
        self._stop_reconnect = threading.Event()

        # User-assignable callbacks
        self.on_connection_open: Optional[Callable] = None
        self.on_connection_closed: Optional[Callable] = None
        self.on_connection_error: Optional[Callable] = None

    def connect(self) -> bool:
        """Open the connection if it is not already open.

        Returns:
            True when the manager is connected on return, False when the
            attempt failed. Failures are logged and reported through
            ``on_connection_error`` instead of being raised.

        Note:
            ``on_connection_open`` runs inside the same ``try`` block, so an
            exception raised by that callback is handled like a connection
            failure.
        """
        with self._lock:
            if self.state == ConnectionState.CONNECTED:
                self.logger.info("连接已存在")
                return True

            try:
                self.state = ConnectionState.CONNECTING
                self.logger.info(f"正在连接到 {self.config.host}:{self.config.port}")

                connection_params = self._build_connection_params()

                self.connection = pika.BlockingConnection(connection_params)
                self.state = ConnectionState.CONNECTED

                self.logger.info("RabbitMQ连接建立成功")

                if self.on_connection_open:
                    self.on_connection_open(self.connection)

                return True

            except Exception as e:
                self.state = ConnectionState.DISCONNECTED
                self.logger.error(f"连接失败: {e}")

                if self.on_connection_error:
                    self.on_connection_error(e)

                return False

    def disconnect(self):
        """Stop auto-reconnect, close the connection and mark the manager CLOSED.

        The reconnect worker is stopped *before* the lock is taken. The worker
        calls ``connect()``, which needs the same non-reentrant lock; joining
        it while holding the lock would deadlock both threads. Stopping it
        first also prevents it from reopening the connection we are closing.
        """
        worker = self._reconnect_thread
        if worker is not None and worker.is_alive():
            self._stop_reconnect.set()
            # A thread cannot join itself (e.g. disconnect() called from a
            # callback running on the worker); the stop flag is enough there.
            if worker is not threading.current_thread():
                worker.join()

        with self._lock:
            if self.connection and not self.connection.is_closed:
                try:
                    self.connection.close()
                    self.logger.info("连接已断开")
                except Exception as e:
                    # Best effort: the manager is marked closed regardless.
                    self.logger.error(f"断开连接时出错: {e}")

            self.state = ConnectionState.CLOSED
            self.connection = None

            if self.on_connection_closed:
                self.on_connection_closed()

    def is_connected(self) -> bool:
        """Return True when a connection object exists, is open, and state is CONNECTED."""
        return (self.connection is not None and
                not self.connection.is_closed and
                self.state == ConnectionState.CONNECTED)

    def get_connection(self) -> Optional[pika.BlockingConnection]:
        """Return the live connection, or None when not connected."""
        if self.is_connected():
            return self.connection
        return None

    def start_auto_reconnect(self, max_attempts: int = -1):
        """Start a daemon thread that reconnects whenever the connection is down.

        Args:
            max_attempts: Maximum number of consecutive failed attempts before
                the worker gives up. Values <= 0 mean "retry forever".
        """
        if self._reconnect_thread and self._reconnect_thread.is_alive():
            self.logger.warning("自动重连已在运行")
            return

        self._stop_reconnect.clear()
        self._reconnect_thread = threading.Thread(
            target=self._auto_reconnect_worker,
            args=(max_attempts,),
            daemon=True
        )
        self._reconnect_thread.start()
        self.logger.info("自动重连已启动")

    def stop_auto_reconnect(self):
        """Signal the reconnect worker to stop and wait for it to exit."""
        self._stop_reconnect.set()
        if self._reconnect_thread and self._reconnect_thread.is_alive():
            self._reconnect_thread.join()
        self.logger.info("自动重连已停止")

    def _build_connection_params(self) -> pika.ConnectionParameters:
        """Translate ``self.config`` into ``pika.ConnectionParameters``."""
        # TLS: start from the default context, then apply optional overrides.
        ssl_context = None
        if self.config.ssl_enabled:
            ssl_context = ssl.create_default_context()
            if self.config.ssl_options:
                if 'ca_certs' in self.config.ssl_options:
                    ssl_context.load_verify_locations(self.config.ssl_options['ca_certs'])
                # A client certificate is loaded only when both halves are given.
                if 'certfile' in self.config.ssl_options and 'keyfile' in self.config.ssl_options:
                    ssl_context.load_cert_chain(
                        self.config.ssl_options['certfile'],
                        self.config.ssl_options['keyfile']
                    )

        credentials = pika.PlainCredentials(
            self.config.username,
            self.config.password
        )

        return pika.ConnectionParameters(
            host=self.config.host,
            port=self.config.port,
            virtual_host=self.config.virtual_host,
            credentials=credentials,
            heartbeat=self.config.heartbeat,
            blocked_connection_timeout=self.config.blocked_connection_timeout,
            connection_attempts=self.config.connection_attempts,
            retry_delay=self.config.retry_delay,
            socket_timeout=self.config.socket_timeout,
            ssl_options=pika.SSLOptions(ssl_context) if ssl_context else None,
            locale=self.config.locale,
            client_properties=self.config.client_properties
        )

    def _auto_reconnect_worker(self, max_attempts: int):
        """Thread body: poll the connection and reconnect until told to stop.

        Args:
            max_attempts: See ``start_auto_reconnect``.
        """
        attempt = 0

        while not self._stop_reconnect.is_set():
            if max_attempts > 0 and attempt >= max_attempts:
                self.logger.error(f"达到最大重连次数 ({max_attempts}),停止重连")
                break

            if not self.is_connected():
                attempt += 1
                self.state = ConnectionState.RECONNECTING
                self.logger.info(f"尝试重连 (第 {attempt} 次)")

                if self.connect():
                    attempt = 0  # only consecutive failures count towards the limit
                    self.logger.info("重连成功")
                else:
                    self.logger.warning(f"重连失败,{self.config.retry_delay}秒后重试")

            # Event.wait doubles as an interruptible sleep between polls.
            self._stop_reconnect.wait(self.config.retry_delay)

class RabbitMQChannelManager:
    """Create, look up and close named channels on a managed connection.

    Mutating operations are serialised with a non-reentrant
    ``threading.Lock``; the actual close logic lives in a private helper that
    expects the lock to be held, so public methods never re-acquire it.
    """

    def __init__(self, connection_manager: RabbitMQConnectionManager):
        """Bind the manager to the connection manager that supplies connections."""
        self.connection_manager = connection_manager
        self.channels: Dict[str, pika.channel.Channel] = {}
        self.logger = logging.getLogger(__name__)
        self._lock = threading.Lock()

    def create_channel(self, channel_id: Optional[str] = None) -> Optional[pika.channel.Channel]:
        """Open a channel and register it under ``channel_id``.

        Args:
            channel_id: Name to register the channel under. When omitted or
                empty, an unused ``channel_<n>`` name is generated.

        Returns:
            The new channel, or None when no connection is available or the
            channel could not be opened.
        """
        with self._lock:
            connection = self.connection_manager.get_connection()
            if not connection:
                self.logger.error("无法创建通道:连接不可用")
                return None

            try:
                channel = connection.channel()

                if not channel_id:
                    channel_id = self._next_channel_id()

                self.channels[channel_id] = channel
                self.logger.info(f"通道创建成功: {channel_id}")

                return channel

            except Exception as e:
                self.logger.error(f"创建通道失败: {e}")
                return None

    def get_channel(self, channel_id: str) -> Optional[pika.channel.Channel]:
        """Return the channel registered under ``channel_id``, or None."""
        return self.channels.get(channel_id)

    def close_channel(self, channel_id: str) -> bool:
        """Close and unregister one channel.

        Returns:
            True when the channel was found and removed, False when it is
            unknown or closing raised (the entry is then kept).
        """
        with self._lock:
            return self._close_channel_locked(channel_id)

    def close_all_channels(self):
        """Close and unregister every channel."""
        with self._lock:
            # Snapshot the keys: the helper deletes entries while we iterate.
            for channel_id in list(self.channels):
                self._close_channel_locked(channel_id)

    def get_channel_count(self) -> int:
        """Return how many channels are currently registered."""
        return len(self.channels)

    def list_channels(self) -> List[str]:
        """Return the ids of all registered channels."""
        return list(self.channels.keys())

    def _next_channel_id(self) -> str:
        """Return an unused ``channel_<n>`` id; caller must hold ``self._lock``.

        Starting from ``len(channels) + 1`` keeps the familiar numbering while
        skipping ids still in use (the count shrinks when channels are closed,
        so the bare count can name a live channel).
        """
        index = len(self.channels) + 1
        while f"channel_{index}" in self.channels:
            index += 1
        return f"channel_{index}"

    def _close_channel_locked(self, channel_id: str) -> bool:
        """Close one channel; caller must hold ``self._lock``."""
        channel = self.channels.get(channel_id)
        if channel is None:
            self.logger.warning(f"通道不存在: {channel_id}")
            return False

        try:
            if not channel.is_closed:
                channel.close()
            del self.channels[channel_id]
            self.logger.info(f"通道已关闭: {channel_id}")
            return True

        except Exception as e:
            self.logger.error(f"关闭通道失败: {e}")
            return False

# Usage example: connect, open two named channels, close them, disconnect.
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    print("=== RabbitMQ连接与通道管理示例 ===")
    
    # Build the connection configuration
    config = ConnectionConfig(
        host='localhost',
        port=5672,
        username='admin',
        password='admin123',
        heartbeat=600,
        connection_attempts=3,
        retry_delay=2.0
    )
    
    # Create the connection manager
    conn_manager = RabbitMQConnectionManager(config)
    
    # Register the lifecycle callbacks
    def on_connection_open(connection):
        print(f"✅ 连接已打开: {connection}")
    
    def on_connection_closed():
        print("❌ 连接已关闭")
    
    def on_connection_error(error):
        print(f"❌ 连接错误: {error}")
    
    conn_manager.on_connection_open = on_connection_open
    conn_manager.on_connection_closed = on_connection_closed
    conn_manager.on_connection_error = on_connection_error
    
    try:
        # Step 1: open the connection
        print("\n1. 建立连接")
        if conn_manager.connect():
            print("✅ 连接建立成功")
            
            # Create the channel manager on top of the connection
            channel_manager = RabbitMQChannelManager(conn_manager)
            
            # Step 2: open two named channels
            print("\n2. 创建通道")
            channel1 = channel_manager.create_channel("main_channel")
            channel2 = channel_manager.create_channel("backup_channel")
            
            if channel1 and channel2:
                print(f"✅ 通道创建成功,总数: {channel_manager.get_channel_count()}")
                print(f"通道列表: {channel_manager.list_channels()}")
                
                # Stand-in for real work on the channels
                time.sleep(2)
                
                # Step 3: close every channel
                print("\n3. 关闭通道")
                channel_manager.close_all_channels()
                print("✅ 所有通道已关闭")
            else:
                print("❌ 通道创建失败")
        else:
            print("❌ 连接建立失败")
    
    except KeyboardInterrupt:
        print("\n收到中断信号")
    finally:
        # Step 4: always disconnect, even after an interrupt
        print("\n4. 断开连接")
        conn_manager.disconnect()
        print("✅ 连接已断开")

3.2 队列操作

3.2.1 队列声明与管理

import pika
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from enum import Enum
import json
import time

class QueueType(Enum):
    """Queue implementations selectable through the ``x-queue-type`` argument."""
    CLASSIC = "classic"
    QUORUM = "quorum"
    STREAM = "stream"

@dataclass
class QueueConfig:
    """Declarative description of a queue.

    The convenience fields below are folded into ``arguments`` by
    ``__post_init__``. A convenience field that is set overrides an entry of
    the same key supplied in ``arguments``. The dict passed by the caller is
    copied, never modified.
    """
    name: str
    durable: bool = True
    exclusive: bool = False
    auto_delete: bool = False
    arguments: Optional[Dict[str, Any]] = None

    # Queue implementation; CLASSIC adds no argument
    queue_type: QueueType = QueueType.CLASSIC

    # TTL settings
    message_ttl: Optional[int] = None  # per-message TTL in milliseconds
    expires: Optional[int] = None      # queue TTL in milliseconds

    # Length limits
    max_length: Optional[int] = None
    max_length_bytes: Optional[int] = None

    # Dead-lettering
    dead_letter_exchange: Optional[str] = None
    dead_letter_routing_key: Optional[str] = None

    # Priority support
    max_priority: Optional[int] = None

    # Delayed delivery
    delayed: bool = False

    def __post_init__(self):
        """Build the effective ``arguments`` dict from the convenience fields."""
        # Copy so the caller's dict (possibly shared between several configs)
        # is not polluted with this queue's x-* entries.
        self.arguments = {} if self.arguments is None else dict(self.arguments)

        if self.queue_type != QueueType.CLASSIC:
            self.arguments['x-queue-type'] = self.queue_type.value

        if self.message_ttl is not None:
            self.arguments['x-message-ttl'] = self.message_ttl
        if self.expires is not None:
            self.arguments['x-expires'] = self.expires

        if self.max_length is not None:
            self.arguments['x-max-length'] = self.max_length
        if self.max_length_bytes is not None:
            self.arguments['x-max-length-bytes'] = self.max_length_bytes

        if self.dead_letter_exchange is not None:
            self.arguments['x-dead-letter-exchange'] = self.dead_letter_exchange
        if self.dead_letter_routing_key is not None:
            self.arguments['x-dead-letter-routing-key'] = self.dead_letter_routing_key

        if self.max_priority is not None:
            self.arguments['x-max-priority'] = self.max_priority

        # NOTE(review): 'x-delayed-type' is presumably an argument of the
        # delayed-message *exchange* rather than of a queue -- confirm that
        # setting it here has the intended effect.
        if self.delayed:
            self.arguments['x-delayed-type'] = 'direct'

class RabbitMQQueueManager:
    """Queue operations (declare, delete, purge, inspect, bind, unbind) on one channel.

    Every operation catches any exception, logs it, and signals failure
    through its return value (``False`` or ``None``) instead of raising.
    Configs of queues declared through this manager are kept in
    ``declared_queues``.
    """

    def __init__(self, channel: pika.channel.Channel):
        """Wrap ``channel``; nothing is declared until a method is called."""
        self.declared_queues: Dict[str, QueueConfig] = {}
        self.logger = logging.getLogger(__name__)
        self.channel = channel

    def declare_queue(self, config: QueueConfig) -> bool:
        """Declare the queue described by ``config`` and remember the config.

        Returns:
            True on success, False when the declaration raised.
        """
        try:
            declare_kwargs = {
                'queue': config.name,
                'durable': config.durable,
                'exclusive': config.exclusive,
                'auto_delete': config.auto_delete,
                'arguments': config.arguments,
            }
            self.channel.queue_declare(**declare_kwargs)
            self.declared_queues[config.name] = config
            self.logger.info(f"队列声明成功: {config.name}")
        except Exception as exc:
            self.logger.error(f"队列声明失败 {config.name}: {exc}")
            return False
        return True

    def delete_queue(self, queue_name: str, if_unused: bool = False, if_empty: bool = False) -> bool:
        """Delete a queue and forget its stored config, if any.

        Args:
            queue_name: Queue to delete.
            if_unused: Forwarded to ``queue_delete``.
            if_empty: Forwarded to ``queue_delete``.

        Returns:
            True on success, False when the deletion raised.
        """
        try:
            self.channel.queue_delete(queue=queue_name, if_unused=if_unused, if_empty=if_empty)
            # pop() with a default covers queues this manager never declared.
            self.declared_queues.pop(queue_name, None)
            self.logger.info(f"队列删除成功: {queue_name}")
        except Exception as exc:
            self.logger.error(f"队列删除失败 {queue_name}: {exc}")
            return False
        return True

    def purge_queue(self, queue_name: str) -> Optional[int]:
        """Remove all messages from a queue.

        Returns:
            The purged message count reported by the broker, or None on failure.
        """
        try:
            purged = self.channel.queue_purge(queue=queue_name).method.message_count
            self.logger.info(f"队列清空成功: {queue_name}, 清除消息数: {purged}")
        except Exception as exc:
            self.logger.error(f"队列清空失败 {queue_name}: {exc}")
            return None
        return purged

    def get_queue_info(self, queue_name: str) -> Optional[Dict[str, Any]]:
        """Return message/consumer counts for an existing queue.

        A passive declare is used so the queue is only looked up, never created.

        Returns:
            A dict with keys ``name``, ``message_count``, ``consumer_count``
            and ``config`` (the stored QueueConfig or None), or None on failure.
        """
        try:
            declare_ok = self.channel.queue_declare(queue=queue_name, passive=True).method
            info = {
                'name': queue_name,
                'message_count': declare_ok.message_count,
                'consumer_count': declare_ok.consumer_count,
                'config': self.declared_queues.get(queue_name)
            }
        except Exception as exc:
            self.logger.error(f"获取队列信息失败 {queue_name}: {exc}")
            return None
        return info

    def bind_queue(self, queue_name: str, exchange_name: str, routing_key: str = "", arguments: Dict[str, Any] = None) -> bool:
        """Bind a queue to an exchange.

        Returns:
            True on success, False when the bind raised.
        """
        try:
            self.channel.queue_bind(
                queue=queue_name,
                exchange=exchange_name,
                routing_key=routing_key,
                arguments=arguments
            )
            self.logger.info(f"队列绑定成功: {queue_name} -> {exchange_name} (routing_key: {routing_key})")
        except Exception as exc:
            self.logger.error(f"队列绑定失败: {exc}")
            return False
        return True

    def unbind_queue(self, queue_name: str, exchange_name: str, routing_key: str = "", arguments: Dict[str, Any] = None) -> bool:
        """Remove a binding between a queue and an exchange.

        Returns:
            True on success, False when the unbind raised.
        """
        try:
            self.channel.queue_unbind(
                queue=queue_name,
                exchange=exchange_name,
                routing_key=routing_key,
                arguments=arguments
            )
            self.logger.info(f"队列解绑成功: {queue_name} -> {exchange_name} (routing_key: {routing_key})")
        except Exception as exc:
            self.logger.error(f"队列解绑失败: {exc}")
            return False
        return True

    def list_declared_queues(self) -> List[str]:
        """Return the names of queues declared through this manager."""
        return list(self.declared_queues)

    def create_standard_queues(self) -> Dict[str, bool]:
        """Declare the tutorial's standard queue set.

        Returns:
            A mapping of queue name to whether its declaration succeeded, in
            declaration order.
        """
        standard_configs = (
            # Plain durable queue
            QueueConfig(name="normal.queue", durable=True),
            # Priority queue
            QueueConfig(name="priority.queue", durable=True, max_priority=10),
            # Per-message TTL of 60 seconds
            QueueConfig(name="ttl.queue", durable=True, message_ttl=60000),
            # Dead-letter queue
            QueueConfig(name="dlx.queue", durable=True),
            # Queue that dead-letters to dlx.exchange
            QueueConfig(
                name="main.queue",
                durable=True,
                dead_letter_exchange="dlx.exchange",
                dead_letter_routing_key="dlx"
            ),
            # Length-limited queue
            QueueConfig(name="limited.queue", durable=True, max_length=1000),
        )
        return {cfg.name: self.declare_queue(cfg) for cfg in standard_configs}

# Usage example: declare the standard queues, inspect them, then exercise
# declare / purge / delete on a throwaway queue.
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    print("=== RabbitMQ队列管理示例 ===")
    
    # Connection configuration
    config = ConnectionConfig(
        host='localhost',
        port=5672,
        username='admin',
        password='admin123'
    )
    
    # Create the connection manager
    conn_manager = RabbitMQConnectionManager(config)
    
    try:
        if conn_manager.connect():
            # Open a dedicated channel for queue management
            channel_manager = RabbitMQChannelManager(conn_manager)
            channel = channel_manager.create_channel("queue_mgmt_channel")
            
            if channel:
                # Create the queue manager on that channel
                queue_manager = RabbitMQQueueManager(channel)
                
                print("\n1. 创建标准队列")
                results = queue_manager.create_standard_queues()
                success_count = sum(1 for success in results.values() if success)
                print(f"✅ 成功创建 {success_count}/{len(results)} 个队列")
                
                print("\n2. 列出已声明的队列")
                declared_queues = queue_manager.list_declared_queues()
                print(f"已声明队列: {declared_queues}")
                
                print("\n3. 获取队列信息")
                for queue_name in declared_queues[:3]:  # show only the first three
                    info = queue_manager.get_queue_info(queue_name)
                    if info:
                        print(f"队列 {queue_name}: 消息数={info['message_count']}, 消费者数={info['consumer_count']}")
                
                print("\n4. 测试队列操作")
                test_queue = QueueConfig(
                    name="test.queue",
                    durable=False,
                    auto_delete=True
                )
                
                if queue_manager.declare_queue(test_queue):
                    print("✅ 测试队列创建成功")
                    
                    # Purge the queue
                    purged_count = queue_manager.purge_queue("test.queue")
                    print(f"清空队列,删除消息数: {purged_count}")
                    
                    # Delete the queue
                    if queue_manager.delete_queue("test.queue"):
                        print("✅ 测试队列删除成功")
                
                # Close every channel before disconnecting
                channel_manager.close_all_channels()
            else:
                print("❌ 通道创建失败")
        else:
            print("❌ 连接失败")
    
    except Exception as e:
        print(f"❌ 错误: {e}")
    finally:
        conn_manager.disconnect()

# NOTE(review): `main` is not defined in the visible part of this file, and the
# example above already runs under its own __main__ guard -- confirm whether
# this second guard is a leftover that should be removed.
if __name__ == "__main__":
    main()

4. 消息消费

4.1 消费者配置

from enum import Enum
from dataclasses import dataclass
from typing import Callable, Optional, Dict, Any
import threading
import time

class AckMode(Enum):
    """Outcome a message callback reports back to the consumer wrapper.

    NOTE(review): despite its name, ``MANUAL`` is handled by
    ``RabbitMQConsumer`` as "reject and requeue", not as "the caller will
    acknowledge later".
    """
    AUTO = "auto"          # acknowledge the delivery (basic_ack)
    MANUAL = "manual"      # reject the delivery with requeue=True
    REJECT = "reject"      # reject the delivery with requeue=False

class QosMode(Enum):
    """Scope choices for a QoS (prefetch) setting."""
    GLOBAL = "global"      # global QoS
    CHANNEL = "channel"    # per-channel QoS

@dataclass
class ConsumerConfig:
    """Settings for one consumer registered through RabbitMQConsumer."""
    queue_name: str
    consumer_tag: str = ""
    # When True the consumer wrapper never acks or rejects on its own
    auto_ack: bool = False
    exclusive: bool = False
    # NOTE(review): not forwarded to basic_consume by RabbitMQConsumer.start_consuming
    no_local: bool = False
    # QoS values applied via set_qos() before consuming starts
    prefetch_count: int = 1
    prefetch_size: int = 0
    qos_global: bool = False
    # Called as callback(ch, method, properties, body); its return value
    # decides how the delivery is settled
    callback: Optional[Callable] = None
    
class RabbitMQConsumer:
    """Register consumers on a channel and settle deliveries on their behalf.

    The user callback's return value decides how a delivery is settled when
    ``auto_ack`` is off:

    * ``AckMode.AUTO`` or ``True`` -- ``basic_ack``
    * ``AckMode.REJECT`` -- ``basic_reject(requeue=False)``
    * ``AckMode.MANUAL`` -- ``basic_reject(requeue=True)``
    * anything else (including ``None`` and ``False``) -- the delivery is
      left unsettled by the wrapper

    An exception anywhere in the wrapper requeues the delivery. Counters for
    each outcome are kept in ``statistics`` and guarded by a lock.
    """

    def __init__(self, channel):
        """Wrap ``channel``; no consumer is registered yet."""
        self.channel = channel
        self.consumers = {}
        self.is_consuming = False
        self.statistics = {
            'consumed': 0,
            'acknowledged': 0,
            'rejected': 0,
            'requeued': 0
        }
        self._lock = threading.Lock()

    def _bump(self, counter: str) -> None:
        """Increment one statistics counter under the lock."""
        with self._lock:
            self.statistics[counter] += 1

    def set_qos(self, prefetch_count: int = 1, prefetch_size: int = 0, global_qos: bool = False) -> bool:
        """Apply prefetch limits to the channel.

        Returns:
            True on success, False when ``basic_qos`` raised (the error is printed).
        """
        try:
            # pika 1.x (the API generation that provides on_message_callback,
            # used below) names this keyword ``global_qos``; passing
            # ``global_`` raises TypeError, so QoS was never applied.
            self.channel.basic_qos(
                prefetch_count=prefetch_count,
                prefetch_size=prefetch_size,
                global_qos=global_qos
            )
            return True
        except Exception as e:
            print(f"设置QoS失败: {e}")
            return False

    def start_consuming(self, config: "ConsumerConfig") -> bool:
        """Register a consumer for ``config.queue_name``.

        This only registers the consumer; the caller still has to drive the
        channel (the example in this file calls ``channel.start_consuming()``).

        Returns:
            True when the consumer was registered, False when registration raised.
        """
        try:
            # Best effort: a QoS failure is printed by set_qos and does not
            # prevent the consumer from being registered.
            self.set_qos(
                prefetch_count=config.prefetch_count,
                prefetch_size=config.prefetch_size,
                global_qos=config.qos_global
            )

            def callback_wrapper(ch, method, properties, body):
                """Run the user callback and settle the delivery from its result."""
                try:
                    self._bump('consumed')

                    if config.callback:
                        result = config.callback(ch, method, properties, body)

                        if not config.auto_ack:
                            if result == AckMode.AUTO or result is True:
                                ch.basic_ack(delivery_tag=method.delivery_tag)
                                self._bump('acknowledged')
                            elif result == AckMode.REJECT:
                                ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
                                self._bump('rejected')
                            elif result == AckMode.MANUAL:
                                ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
                                self._bump('requeued')
                    else:
                        # No callback configured: acknowledge by default.
                        if not config.auto_ack:
                            ch.basic_ack(delivery_tag=method.delivery_tag)
                            self._bump('acknowledged')

                except Exception as e:
                    print(f"消息处理错误: {e}")
                    if not config.auto_ack:
                        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
                        # Keep the counter in step with the requeue above.
                        self._bump('requeued')

            consumer_tag = self.channel.basic_consume(
                queue=config.queue_name,
                on_message_callback=callback_wrapper,
                auto_ack=config.auto_ack,
                exclusive=config.exclusive,
                consumer_tag=config.consumer_tag
            )

            self.consumers[consumer_tag] = config
            self.is_consuming = True

            print(f"✅ 开始消费队列: {config.queue_name}, 消费者标签: {consumer_tag}")
            return True

        except Exception as e:
            print(f"开始消费失败: {e}")
            return False

    def stop_consuming(self, consumer_tag: Optional[str] = None) -> bool:
        """Cancel one consumer, or all of them when no tag is given.

        Returns:
            True on success, False when a cancel raised.
        """
        try:
            if consumer_tag:
                self.channel.basic_cancel(consumer_tag)
                self.consumers.pop(consumer_tag, None)
                # The flag must drop once the last consumer is gone.
                self.is_consuming = bool(self.consumers)
                print(f"✅ 停止消费者: {consumer_tag}")
            else:
                for tag in list(self.consumers):
                    self.channel.basic_cancel(tag)
                self.consumers.clear()
                self.is_consuming = False
                print("✅ 停止所有消费者")

            return True
        except Exception as e:
            print(f"停止消费失败: {e}")
            return False

    def get_statistics(self) -> Dict[str, int]:
        """Return a snapshot copy of the counters."""
        with self._lock:
            return self.statistics.copy()

    def reset_statistics(self):
        """Reset every counter to zero."""
        with self._lock:
            self.statistics = {
                'consumed': 0,
                'acknowledged': 0,
                'rejected': 0,
                'requeued': 0
            }

4.2 消息处理器

from abc import ABC, abstractmethod
import json
import logging
from typing import Any, Dict

class MessageProcessor(ABC):
    """Abstract base class for message processors."""
    
    @abstractmethod
    def process(self, ch, method, properties, body) -> AckMode:
        """Handle one delivery and return the AckMode describing its outcome.

        The parameters mirror the arguments the consumer wrapper passes to its
        callback: ``(ch, method, properties, body)``.
        """
        pass

class JSONMessageProcessor(MessageProcessor):
    """Decode a delivery as UTF-8 JSON and hand it to ``handle_message``."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def process(self, ch, method, properties, body) -> AckMode:
        """Decode ``body`` and map the handler's result to an AckMode.

        Returns:
            ``AckMode.AUTO`` when ``handle_message`` returns a truthy value,
            ``AckMode.REJECT`` when it returns a falsy value or the body
            cannot be decoded, and ``AckMode.MANUAL`` (requeue) on any other
            error.
        """
        try:
            message = json.loads(body.decode('utf-8'))

            self.logger.info(f"收到消息: {message}")
            self.logger.info(f"路由键: {method.routing_key}")
            self.logger.info(f"交换机: {method.exchange}")

            result = self.handle_message(message, method, properties)

            return AckMode.AUTO if result else AckMode.REJECT

        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Bytes that are not valid UTF-8 are as permanent a failure as
            # malformed JSON: requeueing them would only redeliver the same
            # poison message forever, so both are rejected.
            self.logger.error(f"JSON解析失败: {e}")
            return AckMode.REJECT
        except Exception as e:
            self.logger.error(f"消息处理失败: {e}")
            return AckMode.MANUAL  # requeue

    def handle_message(self, message: Dict[str, Any], method, properties) -> bool:
        """Process one decoded message; subclasses override this.

        Returns:
            True when the message was handled. This default implementation
            prints the message and returns True.
        """
        print(f"处理消息: {message}")
        return True

class OrderMessageProcessor(JSONMessageProcessor):
    """Route order events to a handler chosen by the message's ``type`` field."""

    def handle_message(self, message: Dict[str, Any], method, properties) -> bool:
        """Dispatch ``message`` by its ``type``.

        Returns:
            The chosen handler's result, or False for an unknown type or when
            a handler raises.
        """
        try:
            message_type = message.get('type')
            routes = (
                ('order_created', self.handle_order_created),
                ('order_paid', self.handle_order_paid),
                ('order_shipped', self.handle_order_shipped),
            )
            for expected_type, handler in routes:
                if message_type == expected_type:
                    return handler(message)

            self.logger.warning(f"未知消息类型: {message_type}")
            return False

        except Exception as exc:
            self.logger.error(f"订单消息处理失败: {exc}")
            return False

    def handle_order_created(self, message: Dict[str, Any]) -> bool:
        """Handle an ``order_created`` event."""
        print(f"📦 处理订单创建: {message.get('order_id')}")
        # Placeholder for the real business logic
        time.sleep(0.1)  # simulated processing time
        return True

    def handle_order_paid(self, message: Dict[str, Any]) -> bool:
        """Handle an ``order_paid`` event."""
        print(f"💰 处理订单支付: {message.get('order_id')}, 金额: {message.get('amount')}")
        time.sleep(0.1)
        return True

    def handle_order_shipped(self, message: Dict[str, Any]) -> bool:
        """Handle an ``order_shipped`` event."""
        print(f"🚚 处理订单发货: {message.get('order_id')}, 快递单号: {message.get('tracking_number')}")
        time.sleep(0.1)
        return True

4.3 消费示例

def consumer_example():
    """Connect, consume ``order.queue`` until Ctrl-C, then print the counters.

    The connection is always closed on the way out, whichever step fails.
    """
    print("=== RabbitMQ消息消费示例 ===")

    conn_manager = RabbitMQConnectionManager(
        ConnectionConfig(
            host='localhost',
            port=5672,
            username='admin',
            password='admin123'
        )
    )

    try:
        if not conn_manager.connect():
            print("❌ 连接失败")
            return

        channel_manager = RabbitMQChannelManager(conn_manager)
        channel = channel_manager.create_channel("consumer_channel")
        if not channel:
            print("❌ 通道创建失败")
            return

        consumer = RabbitMQConsumer(channel)
        order_processor = OrderMessageProcessor()
        consumer_config = ConsumerConfig(
            queue_name="order.queue",
            consumer_tag="order_consumer",
            auto_ack=False,
            prefetch_count=10,
            callback=order_processor.process
        )

        if consumer.start_consuming(consumer_config):
            print("✅ 消费者启动成功,等待消息...")

            try:
                # Blocks until interrupted
                channel.start_consuming()
            except KeyboardInterrupt:
                print("\n⏹️ 收到停止信号")
                consumer.stop_consuming()
                channel.stop_consuming()

                # Report what was processed during this run
                stats = consumer.get_statistics()
                print("\n📊 消费统计:")
                print(f"已消费: {stats['consumed']}")
                print(f"已确认: {stats['acknowledged']}")
                print(f"已拒绝: {stats['rejected']}")
                print(f"重新入队: {stats['requeued']}")

        # Reached whether or not the consumer could be started
        channel_manager.close_all_channels()

    except Exception as e:
        print(f"❌ 错误: {e}")
    finally:
        conn_manager.disconnect()

# Run the consumer example only when this file is executed as a script.
if __name__ == "__main__":
    consumer_example()

5. 监控与管理

5.1 管理API

import requests
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class RabbitMQManagementConfig:
    """Connection settings for the RabbitMQ management HTTP API."""

    host: str = "localhost"
    port: int = 15672
    username: str = "admin"
    password: str = "admin123"
    use_ssl: bool = False

    @property
    def base_url(self) -> str:
        """Return the API root URL, e.g. ``http://localhost:15672/api``."""
        if self.use_ssl:
            scheme = "https"
        else:
            scheme = "http"
        return f"{scheme}://{self.host}:{self.port}/api"

class RabbitMQManagementAPI:
    """Small client for the RabbitMQ management HTTP API.

    Every call is best-effort: on any failure the error is printed and the
    method returns ``None`` (read calls) or ``False`` (delete/purge calls).
    """

    def __init__(self, config: "RabbitMQManagementConfig", timeout: float = 10.0):
        """Create an authenticated HTTP session.

        Args:
            config: Supplies ``base_url``, ``username`` and ``password``.
            timeout: Seconds to wait for each request. Without a timeout
                ``requests`` may block indefinitely on an unresponsive host.
        """
        self.config = config
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = (config.username, config.password)
        self.session.headers.update({'Content-Type': 'application/json'})

    @staticmethod
    def _quote_segment(segment: str) -> str:
        """Percent-encode one URL path segment (a vhost or a queue name)."""
        # Local import: keeps this class independent of the file-level imports.
        from urllib.parse import quote

        # The management API requires "/" inside a segment to be sent as
        # "%2F" (the default vhost is "/"). "%" is kept as-is so a caller
        # that already passes an encoded value such as "%2F" is not
        # double-encoded.
        return quote(segment, safe="%")

    def _get_json(self, path: str, failure_label: str):
        """GET ``<base_url>/<path>`` and return the decoded JSON, or None on failure."""
        try:
            response = self.session.get(
                f"{self.config.base_url}/{path}", timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"{failure_label}: {e}")
            return None

    def _delete(self, path: str, failure_label: str) -> bool:
        """DELETE ``<base_url>/<path>``; return True when the server accepted it."""
        try:
            response = self.session.delete(
                f"{self.config.base_url}/{path}", timeout=self.timeout
            )
            response.raise_for_status()
            return True
        except Exception as e:
            print(f"{failure_label}: {e}")
            return False

    def get_overview(self) -> Optional[Dict]:
        """Return the cluster overview, or None on failure."""
        return self._get_json("overview", "获取概览失败")

    def get_nodes(self) -> Optional[List[Dict]]:
        """Return the list of cluster nodes, or None on failure."""
        return self._get_json("nodes", "获取节点列表失败")

    def get_queues(self, vhost: str = "/") -> Optional[List[Dict]]:
        """Return the queues of ``vhost``, or None on failure."""
        return self._get_json(
            f"queues/{self._quote_segment(vhost)}", "获取队列列表失败"
        )

    def get_exchanges(self, vhost: str = "/") -> Optional[List[Dict]]:
        """Return the exchanges of ``vhost``, or None on failure."""
        return self._get_json(
            f"exchanges/{self._quote_segment(vhost)}", "获取交换机列表失败"
        )

    def get_connections(self) -> Optional[List[Dict]]:
        """Return the list of client connections, or None on failure."""
        return self._get_json("connections", "获取连接列表失败")

    def get_channels(self) -> Optional[List[Dict]]:
        """Return the list of channels, or None on failure."""
        return self._get_json("channels", "获取通道列表失败")

    def delete_queue(self, queue_name: str, vhost: str = "/") -> bool:
        """Delete ``queue_name`` in ``vhost``; return True on success."""
        path = f"queues/{self._quote_segment(vhost)}/{self._quote_segment(queue_name)}"
        return self._delete(path, "删除队列失败")

    def purge_queue(self, queue_name: str, vhost: str = "/") -> bool:
        """Remove all messages from ``queue_name`` in ``vhost``; return True on success."""
        path = (
            f"queues/{self._quote_segment(vhost)}/"
            f"{self._quote_segment(queue_name)}/contents"
        )
        return self._delete(path, "清空队列失败")

5.2 监控示例

def monitoring_example():
    """Print a snapshot of a local broker obtained through the management API.

    Queries overview, nodes, queues, exchanges and connections in that order.
    A section whose query returns nothing prints only its heading.
    """
    print("=== RabbitMQ监控示例 ===")

    # Management API client pointed at the local broker.
    api = RabbitMQManagementAPI(
        RabbitMQManagementConfig(
            host='localhost',
            port=15672,
            username='admin',
            password='admin123'
        )
    )

    # Cluster overview
    print("\n1. 集群概览")
    overview = api.get_overview()
    if overview:
        print(f"RabbitMQ版本: {overview.get('rabbitmq_version')}")
        print(f"Erlang版本: {overview.get('erlang_version')}")
        print(f"消息总数: {overview.get('queue_totals', {}).get('messages', 0)}")
        print(f"准备就绪消息: {overview.get('queue_totals', {}).get('messages_ready', 0)}")
        print(f"未确认消息: {overview.get('queue_totals', {}).get('messages_unacknowledged', 0)}")

    # Node information (memory shown in MB, free disk in GB)
    print("\n2. 节点信息")
    for node in api.get_nodes() or []:
        print(f"节点: {node.get('name')}")
        print(f"  状态: {node.get('running')}")
        print(f"  内存使用: {node.get('mem_used', 0) / 1024 / 1024:.2f} MB")
        print(f"  磁盘空闲: {node.get('disk_free', 0) / 1024 / 1024 / 1024:.2f} GB")

    # Queue information
    print("\n3. 队列信息")
    for queue in api.get_queues() or []:
        print(f"队列: {queue.get('name')}")
        print(f"  消息数: {queue.get('messages', 0)}")
        print(f"  消费者数: {queue.get('consumers', 0)}")
        print(f"  状态: {queue.get('state')}")

    # Exchange information
    print("\n4. 交换机信息")
    for exchange in api.get_exchanges() or []:
        if not exchange.get('name'):
            # The default exchange has an empty name; skip it.
            continue
        print(f"交换机: {exchange.get('name')}")
        print(f"  类型: {exchange.get('type')}")
        print(f"  持久化: {exchange.get('durable')}")

    # Connection information
    print("\n5. 连接信息")
    for conn in api.get_connections() or []:
        print(f"连接: {conn.get('name')}")
        print(f"  状态: {conn.get('state')}")
        print(f"  用户: {conn.get('user')}")
        print(f"  虚拟主机: {conn.get('vhost')}")

# Run the monitoring example only when this file is executed as a script.
if __name__ == "__main__":
    monitoring_example()

本章总结

核心知识点

  1. 连接与通道管理

    • 连接池管理和复用
    • 通道的创建和生命周期
    • 连接状态监控
  2. 队列操作

    • 队列声明和配置
    • 队列类型和特性
    • 队列绑定和路由
  3. 交换机操作

    • 交换机类型和路由规则
    • 拓扑结构设计
    • 绑定关系管理
  4. 消息发布

    • 消息属性和配置
    • 发布确认机制
    • 批量发布优化
  5. 消息消费

    • 消费者配置和QoS
    • 消息确认机制
    • 消息处理模式
  6. 监控管理

    • 管理API使用
    • 集群状态监控
    • 性能指标收集

最佳实践

  1. 连接管理

    • 使用连接池避免频繁创建连接
    • 合理设置连接超时和心跳
    • 实现连接重连机制
  2. 消息处理

    • 使用手动确认保证消息可靠性
    • 设置合适的QoS(prefetch_count),避免大量未确认消息堆积在单个消费者上
    • 实现幂等性处理
  3. 错误处理

    • 实现重试机制和死信队列
    • 记录详细的错误日志
    • 监控异常指标
  4. 性能优化

    • 批量发布提高吞吐量
    • 合理设置预取数量
    • 使用长连接(复用已建立的连接,避免频繁建连)

练习题

  1. 实现一个支持重连的RabbitMQ连接管理器
  2. 设计一个消息路由系统,支持多种路由策略
  3. 实现一个消息处理框架,支持插件化处理器
  4. 创建一个RabbitMQ监控面板,显示关键指标
  5. 设计一个消息重试和死信处理机制

3.3 交换机操作

3.3.1 交换机声明与管理

import pika
from typing import Dict, Any, Optional, List, Tuple
from dataclasses import dataclass
from enum import Enum
import json

class ExchangeType(Enum):
    """Exchange types; each value is the type string passed to ``exchange_declare``."""
    DIRECT = "direct"
    FANOUT = "fanout"
    TOPIC = "topic"
    HEADERS = "headers"
    # NOTE(review): presumably provided by the delayed-message exchange plugin
    # rather than by core RabbitMQ — confirm the plugin is enabled on the broker.
    DELAYED = "x-delayed-message"

@dataclass
class ExchangeConfig:
    """Declaration settings for a single exchange.

    ``arguments`` is copied on construction, so the dictionary passed in by
    the caller is never modified. For a ``DELAYED`` exchange the
    ``x-delayed-type`` argument is filled in from ``delayed_type``; when
    ``delayed_type`` is not given, a value already present in ``arguments``
    is kept and ``'direct'`` is used only as the fallback.
    """
    name: str
    exchange_type: ExchangeType
    durable: bool = True
    auto_delete: bool = False
    internal: bool = False
    arguments: Optional[Dict[str, Any]] = None

    # Routing behaviour of a DELAYED exchange (sent as 'x-delayed-type').
    delayed_type: Optional[str] = None

    def __post_init__(self):
        # Work on a private copy: the delayed-exchange handling below writes
        # into the dict, and that must not leak into a dict the caller may
        # be sharing between several configs.
        self.arguments = {} if self.arguments is None else dict(self.arguments)

        if self.exchange_type == ExchangeType.DELAYED:
            if self.delayed_type:
                self.arguments['x-delayed-type'] = self.delayed_type
            else:
                # Do not clobber a type the caller supplied via `arguments`.
                self.arguments.setdefault('x-delayed-type', 'direct')

class RabbitMQExchangeManager:
    """Declare, delete and (un)bind exchanges on one channel.

    Every operation is best-effort: failures are logged and reported through
    a ``False`` return value instead of an exception. Exchanges declared
    successfully through this manager are remembered in
    ``declared_exchanges``.
    """

    def __init__(self, channel: "pika.channel.Channel"):
        self.channel = channel
        self.logger = logging.getLogger(__name__)
        # name -> config of each exchange declared through this manager
        self.declared_exchanges: Dict[str, "ExchangeConfig"] = {}

    def declare_exchange(self, config: "ExchangeConfig") -> bool:
        """Declare the exchange described by ``config``; return True on success."""
        try:
            self.channel.exchange_declare(
                exchange=config.name,
                exchange_type=config.exchange_type.value,
                durable=config.durable,
                auto_delete=config.auto_delete,
                internal=config.internal,
                arguments=config.arguments
            )
        except Exception as e:
            self.logger.error(f"交换机声明失败 {config.name}: {e}")
            return False

        self.declared_exchanges[config.name] = config
        self.logger.info(f"交换机声明成功: {config.name} (类型: {config.exchange_type.value})")
        return True

    def delete_exchange(self, exchange_name: str, if_unused: bool = False) -> bool:
        """Delete ``exchange_name`` and forget it; return True on success.

        Args:
            exchange_name: Exchange to delete.
            if_unused: Forwarded to ``exchange_delete``.
        """
        try:
            self.channel.exchange_delete(
                exchange=exchange_name,
                if_unused=if_unused
            )
        except Exception as e:
            self.logger.error(f"交换机删除失败 {exchange_name}: {e}")
            return False

        # The exchange may not have been declared through this manager.
        self.declared_exchanges.pop(exchange_name, None)
        self.logger.info(f"交换机删除成功: {exchange_name}")
        return True

    def bind_exchange(self, destination: str, source: str, routing_key: str = "",
                      arguments: Optional[Dict[str, Any]] = None) -> bool:
        """Bind exchange ``destination`` to exchange ``source``; return True on success."""
        try:
            self.channel.exchange_bind(
                destination=destination,
                source=source,
                routing_key=routing_key,
                arguments=arguments
            )
        except Exception as e:
            self.logger.error(f"交换机绑定失败: {e}")
            return False

        self.logger.info(f"交换机绑定成功: {source} -> {destination} (routing_key: {routing_key})")
        return True

    def unbind_exchange(self, destination: str, source: str, routing_key: str = "",
                        arguments: Optional[Dict[str, Any]] = None) -> bool:
        """Remove the binding from ``source`` to ``destination``; return True on success."""
        try:
            self.channel.exchange_unbind(
                destination=destination,
                source=source,
                routing_key=routing_key,
                arguments=arguments
            )
        except Exception as e:
            self.logger.error(f"交换机解绑失败: {e}")
            return False

        self.logger.info(f"交换机解绑成功: {source} -> {destination} (routing_key: {routing_key})")
        return True

    def list_declared_exchanges(self) -> List[str]:
        """Return the names of the exchanges declared through this manager (a new list)."""
        return list(self.declared_exchanges)

    def get_exchange_config(self, exchange_name: str) -> Optional["ExchangeConfig"]:
        """Return the config recorded for ``exchange_name``, or None if unknown."""
        return self.declared_exchanges.get(exchange_name)

    def create_standard_exchanges(self) -> Dict[str, bool]:
        """Declare the tutorial's standard durable exchanges.

        Returns:
            Mapping of exchange name to whether its declaration succeeded.
        """
        standard = [
            ("direct.exchange", ExchangeType.DIRECT),
            ("fanout.exchange", ExchangeType.FANOUT),
            ("topic.exchange", ExchangeType.TOPIC),
            ("headers.exchange", ExchangeType.HEADERS),
            ("dlx.exchange", ExchangeType.DIRECT),  # dead-letter exchange
        ]
        return {
            name: self.declare_exchange(
                ExchangeConfig(name=name, exchange_type=kind, durable=True)
            )
            for name, kind in standard
        }

    def create_routing_topology(self) -> bool:
        """Declare the standard exchanges and queues and bind them together.

        Returns:
            True only when every exchange declaration, every queue
            declaration and every binding succeeded.
        """
        try:
            exchange_results = self.create_standard_exchanges()

            # The queue manager works on the same channel as this manager.
            queue_manager = RabbitMQQueueManager(self.channel)
            queue_results = queue_manager.create_standard_queues()

            # (queue, exchange, routing key)
            bindings = [
                ("normal.queue", "direct.exchange", "normal"),
                ("priority.queue", "direct.exchange", "priority"),
                ("ttl.queue", "topic.exchange", "ttl.*"),
                ("dlx.queue", "dlx.exchange", "dlx"),
                ("limited.queue", "fanout.exchange", "")
            ]
            binding_results = [
                queue_manager.bind_queue(queue_name, exchange_name, routing_key)
                for queue_name, exchange_name, routing_key in bindings
            ]

            success_count = sum(1 for ok in binding_results if ok)
            self.logger.info(f"路由拓扑创建完成: {success_count}/{len(bindings)} 个绑定成功")

            # Declarations that failed must fail the whole topology as well,
            # not only failed bindings.
            failed_declarations = [
                name
                for outcome in (exchange_results, queue_results)
                for name, ok in outcome.items()
                if not ok
            ]
            if failed_declarations:
                self.logger.warning(f"路由拓扑存在声明失败的交换机/队列: {failed_declarations}")

            return not failed_declarations and all(binding_results)

        except Exception as e:
            self.logger.error(f"创建路由拓扑失败: {e}")
            return False

class RabbitMQTopologyManager:
    """Create, summarise and tear down the standard exchange/queue topology.

    Combines an exchange manager and a queue manager that share one channel.
    """

    def __init__(self, channel: "pika.channel.Channel"):
        self.channel = channel
        self.exchange_manager = RabbitMQExchangeManager(channel)
        self.queue_manager = RabbitMQQueueManager(channel)
        self.logger = logging.getLogger(__name__)

    def create_complete_topology(self) -> Dict[str, Any]:
        """Declare the standard exchanges and queues, then bind them.

        Returns:
            A dict with keys ``exchanges`` and ``queues`` (name -> success),
            ``bindings`` (list of success flags), ``success`` (True only when
            every operation succeeded) and ``success_rate`` (0..1). When an
            exception interrupts the process an ``error`` key holds its text;
            ``success`` and ``success_rate`` are still present.
        """
        result = {
            'exchanges': {},
            'queues': {},
            'bindings': [],
            'success': False,
            # Always present so callers can read it even on the error path.
            'success_rate': 0.0
        }

        try:
            self.logger.info("创建交换机...")
            result['exchanges'] = self.exchange_manager.create_standard_exchanges()

            self.logger.info("创建队列...")
            result['queues'] = self.queue_manager.create_standard_queues()

            self.logger.info("创建绑定...")
            bindings = self._create_bindings()
            result['bindings'] = bindings

            # Overall success: every single operation must have succeeded.
            outcomes = (
                list(result['exchanges'].values())
                + list(result['queues'].values())
                + list(bindings)
            )
            total_operations = len(outcomes)
            successful_operations = sum(1 for ok in outcomes if ok)

            result['success'] = successful_operations == total_operations
            if total_operations > 0:
                result['success_rate'] = successful_operations / total_operations

            self.logger.info(f"拓扑创建完成: {successful_operations}/{total_operations} 操作成功")

        except Exception as e:
            self.logger.error(f"创建拓扑失败: {e}")
            result['error'] = str(e)

        return result

    def _create_bindings(self) -> List[bool]:
        """Create the standard bindings; return one success flag per binding."""
        # (kind, target, source exchange, routing key)
        bindings = [
            # Queue bindings
            ("queue", "normal.queue", "direct.exchange", "normal"),
            ("queue", "priority.queue", "direct.exchange", "priority"),
            ("queue", "ttl.queue", "topic.exchange", "ttl.*"),
            ("queue", "dlx.queue", "dlx.exchange", "dlx"),
            ("queue", "limited.queue", "fanout.exchange", ""),

            # Exchange-to-exchange binding (optional)
            # ("exchange", "topic.exchange", "direct.exchange", "topic.route")
        ]

        results = []
        for binding_type, target, source, routing_key in bindings:
            try:
                if binding_type == "queue":
                    success = self.queue_manager.bind_queue(target, source, routing_key)
                elif binding_type == "exchange":
                    success = self.exchange_manager.bind_exchange(target, source, routing_key)
                else:
                    success = False
            except Exception as e:
                self.logger.error(f"绑定失败 {target} -> {source}: {e}")
                success = False
            results.append(success)

        return results

    def cleanup_topology(self) -> bool:
        """Delete every queue and exchange known to the two managers.

        Returns:
            True when no deletion reported failure; False when a deletion
            returned ``False`` or an exception was raised.
        """
        try:
            # Snapshot the names first: deleting may shrink the managers'
            # bookkeeping while we iterate.
            queue_outcomes = [
                (name, self.queue_manager.delete_queue(name))
                for name in list(self.queue_manager.list_declared_queues())
            ]
            exchange_outcomes = [
                (name, self.exchange_manager.delete_exchange(name))
                for name in list(self.exchange_manager.list_declared_exchanges())
            ]

            # Only an explicit False counts as a failure.
            failed = [
                name
                for name, outcome in queue_outcomes + exchange_outcomes
                if outcome is False
            ]
            if failed:
                self.logger.warning(f"拓扑清理未完全成功,删除失败: {failed}")
                return False

            self.logger.info("拓扑清理完成")
            return True

        except Exception as e:
            self.logger.error(f"拓扑清理失败: {e}")
            return False

    def get_topology_summary(self) -> Dict[str, Any]:
        """Return the count and names of the declared exchanges and queues."""
        exchange_names = self.exchange_manager.list_declared_exchanges()
        queue_names = self.queue_manager.list_declared_queues()
        return {
            'exchanges': {
                'count': len(exchange_names),
                'names': exchange_names
            },
            'queues': {
                'count': len(queue_names),
                'names': queue_names
            }
        }

# 使用示例
# Usage example: build the standard topology, print a summary, then declare
# and delete a throw-away test exchange.
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    print("=== RabbitMQ交换机与拓扑管理示例 ===")

    # Connection settings
    config = ConnectionConfig(
        host='localhost',
        port=5672,
        username='admin',
        password='admin123'
    )

    # Connection manager; the connection itself is opened inside the try block
    conn_manager = RabbitMQConnectionManager(config)

    try:
        if conn_manager.connect():
            # Open a channel
            channel_manager = RabbitMQChannelManager(conn_manager)
            channel = channel_manager.create_channel("topology_mgmt_channel")

            if channel:
                # Topology manager working on that channel
                topology_manager = RabbitMQTopologyManager(channel)

                print("\n1. 创建完整拓扑")
                result = topology_manager.create_complete_topology()

                # 'success_rate' is absent when topology creation aborted with
                # an exception, so read it defensively instead of indexing.
                success_rate = result.get('success_rate', 0.0)

                if result['success']:
                    print(f"✅ 拓扑创建成功 (成功率: {success_rate:.2%})")
                else:
                    print(f"❌ 拓扑创建失败 (成功率: {success_rate:.2%})")
                    if 'error' in result:
                        print(f"错误: {result['error']}")

                print("\n2. 拓扑摘要")
                summary = topology_manager.get_topology_summary()
                print(f"交换机数量: {summary['exchanges']['count']}")
                print(f"队列数量: {summary['queues']['count']}")
                print(f"交换机列表: {summary['exchanges']['names']}")
                print(f"队列列表: {summary['queues']['names']}")

                print("\n3. 测试交换机操作")
                exchange_manager = topology_manager.exchange_manager

                # Non-durable, auto-delete exchange used only for this test
                test_exchange = ExchangeConfig(
                    name="test.exchange",
                    exchange_type=ExchangeType.TOPIC,
                    durable=False,
                    auto_delete=True
                )

                if exchange_manager.declare_exchange(test_exchange):
                    print("✅ 测试交换机创建成功")

                    # Remove the test exchange again
                    if exchange_manager.delete_exchange("test.exchange"):
                        print("✅ 测试交换机删除成功")

                # Optional: tear the whole topology down
                # print("\n4. 清理拓扑")
                # if topology_manager.cleanup_topology():
                #     print("✅ 拓扑清理成功")

                # Close the channel(s)
                channel_manager.close_all_channels()
            else:
                print("❌ 通道创建失败")
        else:
            print("❌ 连接失败")

    except Exception as e:
        print(f"❌ 错误: {e}")
    finally:
        # Always release the connection, whatever happened above
        conn_manager.disconnect()

3.4 消息发布与消费

3.4.1 消息发布

import pika
import json
import time
import uuid
from typing import Dict, Any, Optional, List, Union
from dataclasses import dataclass, asdict
from enum import Enum
import threading
from datetime import datetime, timedelta

class DeliveryMode(Enum):
    """Message persistence mode; the value is sent as the ``delivery_mode`` property."""
    NON_PERSISTENT = 1
    PERSISTENT = 2

class Priority(Enum):
    """Named message priority levels; the value is sent as the ``priority`` property."""
    LOW = 1
    NORMAL = 5
    HIGH = 10

@dataclass
class MessageProperties:
    """Message properties with per-instance defaults.

    On construction, missing ``headers`` become an empty dict, a missing
    ``message_id`` becomes a fresh UUID4 string and a missing ``timestamp``
    becomes the current Unix time in whole seconds.
    """
    # Basic properties
    content_type: str = "application/json"
    content_encoding: str = "utf-8"
    delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT
    priority: int = Priority.NORMAL.value

    # Identification
    correlation_id: Optional[str] = None
    reply_to: Optional[str] = None
    message_id: Optional[str] = None

    # Timing
    timestamp: Optional[int] = None
    expiration: Optional[str] = None  # TTL in milliseconds

    # Application
    app_id: Optional[str] = None
    user_id: Optional[str] = None
    type: Optional[str] = None

    # Custom headers
    headers: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        """Fill in the defaults that have to be computed per instance."""
        self.headers = {} if self.headers is None else self.headers
        # Generate a message id when the caller did not provide one.
        self.message_id = str(uuid.uuid4()) if self.message_id is None else self.message_id
        # Stamp the message with the current time when none was provided.
        self.timestamp = int(time.time()) if self.timestamp is None else self.timestamp

    def to_pika_properties(self) -> pika.BasicProperties:
        """Build the ``pika.BasicProperties`` object carrying these values."""
        property_kwargs = {
            'content_type': self.content_type,
            'content_encoding': self.content_encoding,
            # pika receives the numeric mode, not the enum member
            'delivery_mode': self.delivery_mode.value,
            'priority': self.priority,
            'correlation_id': self.correlation_id,
            'reply_to': self.reply_to,
            'message_id': self.message_id,
            'timestamp': self.timestamp,
            'expiration': self.expiration,
            'app_id': self.app_id,
            'user_id': self.user_id,
            'type': self.type,
            'headers': self.headers,
        }
        return pika.BasicProperties(**property_kwargs)

@dataclass
class Message:
    """A message body plus its properties and routing information.

    On construction a ``dict`` body is serialised to a JSON string and a
    ``str`` body is encoded to UTF-8 bytes; in both cases the properties'
    ``content_type`` is overwritten to match. A ``bytes`` body is left
    untouched, as is its content type.
    """
    body: Union[str, bytes, Dict[str, Any]]
    properties: MessageProperties = None
    exchange: str = ""
    routing_key: str = ""
    mandatory: bool = False
    immediate: bool = False

    def __post_init__(self):
        """Create default properties if needed and normalise the body."""
        if self.properties is None:
            self.properties = MessageProperties()

        payload = self.body
        if isinstance(payload, dict):
            # Kept as text here; get_body_bytes() performs the final encoding.
            self.body = json.dumps(payload, ensure_ascii=False)
            self.properties.content_type = "application/json"
            return
        if isinstance(payload, str):
            self.body = payload.encode('utf-8')
            self.properties.content_type = "text/plain"

    def get_body_bytes(self) -> bytes:
        """Return the body as bytes, encoding or serialising it when necessary."""
        payload = self.body
        if isinstance(payload, bytes):
            return payload
        if not isinstance(payload, str):
            # Neither bytes nor text: serialise to JSON text first.
            payload = json.dumps(payload, ensure_ascii=False)
        return payload.encode('utf-8')

class RabbitMQPublisher:
    """Publish messages on one channel and keep simple delivery counters.

    Publishing is best-effort: failures are logged and reported through a
    ``False`` return value instead of an exception. The counters are guarded
    by an internal lock.
    """

    def __init__(self, channel: "pika.channel.Channel"):
        self.channel = channel
        self.logger = logging.getLogger(__name__)
        self._confirm_delivery = False
        self._published_messages = 0   # publish attempts
        self._confirmed_messages = 0   # attempts confirmed (confirm mode only)
        self._failed_messages = 0      # attempts rejected or raising
        self._lock = threading.Lock()

    def enable_confirm_delivery(self) -> bool:
        """Switch the channel to publisher-confirm mode; return True on success."""
        try:
            self.channel.confirm_delivery()
        except Exception as e:
            self.logger.error(f"启用发布确认失败: {e}")
            return False

        self._confirm_delivery = True
        self.logger.info("发布确认已启用")
        return True

    def publish_message(self, message: "Message") -> bool:
        """Publish a single message.

        Returns:
            True when the message was handed to the broker (and confirmed,
            in confirm mode); False when the publish was rejected or raised.
        """
        try:
            with self._lock:
                # Count the attempt up front so that 'failed' can never exceed
                # 'published', even when the publish call raises.
                self._published_messages += 1

                publish_kwargs = {
                    'exchange': message.exchange,
                    'routing_key': message.routing_key,
                    'body': message.get_body_bytes(),
                    'properties': message.properties.to_pika_properties(),
                    'mandatory': message.mandatory,
                }
                # pika >= 1.0 has no `immediate` parameter; forward the flag
                # only when it was explicitly requested.
                if message.immediate:
                    publish_kwargs['immediate'] = True

                outcome = self.channel.basic_publish(**publish_kwargs)

                # pika >= 1.0 returns None and signals failure by raising;
                # older versions return a bool.
                success = True if outcome is None else bool(outcome)

                if self._confirm_delivery:
                    if success:
                        self._confirmed_messages += 1
                        self.logger.debug(f"消息发布确认: {message.properties.message_id}")
                    else:
                        self._failed_messages += 1
                        self.logger.warning(f"消息发布失败: {message.properties.message_id}")

                return success

        except Exception as e:
            # The lock was released when the `with` block exited.
            self.logger.error(f"发布消息失败: {e}")
            with self._lock:
                self._failed_messages += 1
            return False

    def publish_batch(self, messages: List["Message"]) -> Dict[str, int]:
        """Publish messages one by one; return ``total``/``success``/``failed`` counts."""
        results = {
            'total': len(messages),
            'success': 0,
            'failed': 0
        }

        for message in messages:
            outcome_key = 'success' if self.publish_message(message) else 'failed'
            results[outcome_key] += 1

        self.logger.info(f"批量发布完成: {results['success']}/{results['total']} 成功")
        return results

    def publish_with_retry(self, message: "Message", max_retries: int = 3, retry_delay: float = 1.0) -> bool:
        """Publish, retrying up to ``max_retries`` extra times on failure.

        Args:
            message: Message to publish.
            max_retries: Number of retries after the first attempt.
            retry_delay: Seconds to sleep between attempts.
        """
        for attempt in range(max_retries + 1):
            if self.publish_message(message):
                if attempt > 0:
                    self.logger.info(f"消息发布成功 (重试 {attempt} 次): {message.properties.message_id}")
                return True

            if attempt < max_retries:
                self.logger.warning(f"消息发布失败,{retry_delay}秒后重试 (第 {attempt + 1} 次): {message.properties.message_id}")
                time.sleep(retry_delay)

        self.logger.error(f"消息发布最终失败 (重试 {max_retries} 次): {message.properties.message_id}")
        return False

    def publish_delayed_message(self, message: "Message", delay_seconds: int) -> bool:
        """Publish with an ``x-delay`` header of ``delay_seconds`` (in milliseconds).

        NOTE(review): the header presumably only takes effect when the target
        is an ``x-delayed-message`` exchange — confirm against the broker setup.
        """
        if message.properties.headers is None:
            message.properties.headers = {}

        # The header is sent as whole milliseconds, also for fractional seconds.
        message.properties.headers['x-delay'] = int(delay_seconds * 1000)

        self.logger.info(f"发布延迟消息 (延迟 {delay_seconds} 秒): {message.properties.message_id}")
        return self.publish_message(message)

    def publish_priority_message(self, message: "Message", priority: "Priority") -> bool:
        """Set the message priority to ``priority.value`` and publish it."""
        message.properties.priority = priority.value

        self.logger.info(f"发布优先级消息 (优先级 {priority.value}): {message.properties.message_id}")
        return self.publish_message(message)

    def publish_rpc_request(self, message: "Message", reply_queue: str,
                            correlation_id: Optional[str] = None) -> bool:
        """Publish an RPC request.

        Sets ``reply_to`` to ``reply_queue`` and ``correlation_id`` to the
        given id, generating a UUID4 string when none is supplied.
        """
        if correlation_id is None:
            correlation_id = str(uuid.uuid4())

        message.properties.reply_to = reply_queue
        message.properties.correlation_id = correlation_id

        self.logger.info(f"发布RPC请求: {correlation_id} -> {reply_queue}")
        return self.publish_message(message)

    def get_statistics(self) -> Dict[str, int]:
        """Return a snapshot of the counters.

        ``pending`` is the number of attempts neither confirmed nor failed.
        """
        with self._lock:
            return {
                'published': self._published_messages,
                'confirmed': self._confirmed_messages,
                'failed': self._failed_messages,
                'pending': self._published_messages - self._confirmed_messages - self._failed_messages
            }

    def reset_statistics(self):
        """Reset all counters to zero."""
        with self._lock:
            self._published_messages = 0
            self._confirmed_messages = 0
            self._failed_messages = 0

class MessageBuilder:
    """Fluent builder that assembles a ``Message`` step by step.

    Every setter returns the builder itself so calls can be chained. Call
    ``build()`` to obtain the message and ``reset()`` to start a new one.
    """

    def __init__(self):
        self.reset()

    def reset(self) -> 'MessageBuilder':
        """Discard all accumulated state and restore the defaults."""
        self._body = None
        self._properties = MessageProperties()
        self._exchange = ""
        self._routing_key = ""
        self._mandatory = False
        # No setter in this class changes this flag; it is always passed
        # to Message as False.
        self._immediate = False
        return self

    def body(self, body: Union[str, bytes, Dict[str, Any]]) -> 'MessageBuilder':
        """Set the message body."""
        self._body = body
        return self

    def exchange(self, exchange: str) -> 'MessageBuilder':
        """Set the target exchange name."""
        self._exchange = exchange
        return self

    def routing_key(self, routing_key: str) -> 'MessageBuilder':
        """Set the routing key."""
        self._routing_key = routing_key
        return self

    def content_type(self, content_type: str) -> 'MessageBuilder':
        """Set the ``content_type`` property."""
        self._properties.content_type = content_type
        return self

    def priority(self, priority: Union[int, Priority]) -> 'MessageBuilder':
        """Set the priority from either a ``Priority`` member or a raw int."""
        if isinstance(priority, Priority):
            self._properties.priority = priority.value
        else:
            self._properties.priority = priority
        return self

    def persistent(self, persistent: bool = True) -> 'MessageBuilder':
        """Select the persistent or non-persistent delivery mode."""
        self._properties.delivery_mode = (
            DeliveryMode.PERSISTENT if persistent else DeliveryMode.NON_PERSISTENT
        )
        return self

    def ttl(self, ttl_seconds: int) -> 'MessageBuilder':
        """Set the per-message TTL.

        The value is stored in ``expiration`` as a string holding a whole
        number of milliseconds.

        Args:
            ttl_seconds: Time to live in seconds. Fractional values are
                rounded to the nearest millisecond.
        """
        # Coerce to int so a fractional TTL yields "500" rather than "500.0",
        # which is not an integer millisecond count.
        self._properties.expiration = str(int(round(ttl_seconds * 1000)))
        return self

    def correlation_id(self, correlation_id: str) -> 'MessageBuilder':
        """Set the ``correlation_id`` property."""
        self._properties.correlation_id = correlation_id
        return self

    def reply_to(self, reply_to: str) -> 'MessageBuilder':
        """Set the ``reply_to`` property."""
        self._properties.reply_to = reply_to
        return self

    def header(self, key: str, value: Any) -> 'MessageBuilder':
        """Add or overwrite a single header entry."""
        if self._properties.headers is None:
            self._properties.headers = {}
        self._properties.headers[key] = value
        return self

    def headers(self, headers: Dict[str, Any]) -> 'MessageBuilder':
        """Replace all headers with a copy of *headers*.

        Passing ``None`` clears the headers.
        """
        # Copy so that later header() calls cannot mutate the caller's dict.
        self._properties.headers = dict(headers) if headers is not None else None
        return self

    def mandatory(self, mandatory: bool = True) -> 'MessageBuilder':
        """Set the mandatory-routing flag."""
        self._mandatory = mandatory
        return self

    def build(self) -> Message:
        """Create the message from the current builder state.

        Returns:
            A new ``Message`` carrying an independent copy of the properties.

        Raises:
            ValueError: If no body has been set.
        """
        if self._body is None:
            raise ValueError("消息体不能为空")

        import copy

        # Hand out a copy of the properties: if the builder is reused without
        # reset(), further setter calls must not alter messages already built.
        return Message(
            body=self._body,
            properties=copy.deepcopy(self._properties),
            exchange=self._exchange,
            routing_key=self._routing_key,
            mandatory=self._mandatory,
            immediate=self._immediate,
        )

# Usage example
def _publish_demo_messages(publisher) -> None:
    """Publish the sample messages and print the publisher statistics.

    Sends one plain persistent message, one priority message, one message
    with a TTL and a batch of five messages, then prints the counters
    returned by ``publisher.get_statistics()``.

    Args:
        publisher: Object providing ``publish_message``,
            ``publish_priority_message``, ``publish_batch`` and
            ``get_statistics``.
    """
    # One builder is reused for every message; reset() is called before each.
    print("\n1. 发布普通消息")
    builder = MessageBuilder()

    normal_message = (builder
        .reset()
        .body({"type": "normal", "content": "这是一条普通消息", "timestamp": time.time()})
        .exchange("direct.exchange")
        .routing_key("normal")
        .persistent(True)
        .build())

    if publisher.publish_message(normal_message):
        print("✅ 普通消息发布成功")

    # Priority message
    print("\n2. 发布优先级消息")
    priority_message = (builder
        .reset()
        .body({"type": "priority", "content": "这是一条高优先级消息"})
        .exchange("direct.exchange")
        .routing_key("priority")
        .priority(Priority.HIGH)
        .build())

    if publisher.publish_priority_message(priority_message, Priority.HIGH):
        print("✅ 优先级消息发布成功")

    # Message with a TTL
    print("\n3. 发布TTL消息")
    ttl_message = (builder
        .reset()
        .body({"type": "ttl", "content": "这是一条TTL消息"})
        .exchange("topic.exchange")
        .routing_key("ttl.test")
        .ttl(30)  # 30-second TTL
        .build())

    if publisher.publish_message(ttl_message):
        print("✅ TTL消息发布成功")

    # Batch publish
    print("\n4. 批量发布消息")
    batch_messages = [
        (builder
            .reset()
            .body({"type": "batch", "index": i, "content": f"批量消息 {i}"})
            .exchange("fanout.exchange")
            .routing_key("")
            .build())
        for i in range(5)
    ]

    batch_result = publisher.publish_batch(batch_messages)
    print(f"✅ 批量发布完成: {batch_result['success']}/{batch_result['total']} 成功")

    # Statistics
    print("\n5. 发布统计信息")
    stats = publisher.get_statistics()
    print(f"已发布: {stats['published']}")
    print(f"已确认: {stats['confirmed']}")
    print(f"失败: {stats['failed']}")
    print(f"待确认: {stats['pending']}")


def _run_publisher_demo(conn_manager) -> None:
    """Connect, set up a channel and topology, and run the publishing demo.

    Once a channel has been created, ``close_all_channels()`` is always
    called, even if creating the topology or publishing raises.

    Args:
        conn_manager: Connection manager used to connect and to create the
            channel manager.
    """
    if not conn_manager.connect():
        print("❌ 连接失败")
        return

    # Create the channel
    channel_manager = RabbitMQChannelManager(conn_manager)
    channel = channel_manager.create_channel("publisher_channel")
    if not channel:
        print("❌ 通道创建失败")
        return

    try:
        # Create the publisher
        publisher = RabbitMQPublisher(channel)
        publisher.enable_confirm_delivery()

        # Create the topology
        topology_manager = RabbitMQTopologyManager(channel)
        topology_result = topology_manager.create_complete_topology()
        if not topology_result['success']:
            print("❌ 拓扑创建失败")
            return

        print("✅ 拓扑创建成功")
        _publish_demo_messages(publisher)
    finally:
        # Close the channels on every exit path, including exceptions.
        channel_manager.close_all_channels()


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    print("=== RabbitMQ消息发布示例 ===")

    # Connection settings
    config = ConnectionConfig(
        host='localhost',
        port=5672,
        username='admin',
        password='admin123'
    )

    # Establish the connection
    conn_manager = RabbitMQConnectionManager(config)

    try:
        _run_publisher_demo(conn_manager)
    except Exception as e:
        # Top-level boundary of the demo script: report and fall through to cleanup.
        print(f"❌ 错误: {e}")
    finally:
        conn_manager.disconnect()