2.1 RabbitMQ安装

2.1.1 Docker安装(推荐)

# docker-compose.yml
# Single RabbitMQ broker (management image) with a named data volume, a
# dedicated bridge network and a container health check.
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3.12-management
    container_name: rabbitmq
    # Fixed hostname so the node name does not change between container
    # re-creations — presumably to keep the data directory stable; confirm.
    hostname: rabbitmq
    ports:
      - "5672:5672"    # AMQP port
      - "15672:15672"  # Management UI port
      - "25672:25672"  # Clustering port
    environment:
      # NOTE(review): demo credentials committed in plain text — replace them
      # (or inject them from the environment) before any real deployment.
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123
      RABBITMQ_DEFAULT_VHOST: /
    volumes:
      # Named volume: broker data outlives the container.
      - rabbitmq_data:/var/lib/rabbitmq
      # Both host files must exist next to this compose file.
      # NOTE(review): definitions.json is presumably imported only if
      # rabbitmq.conf points at it — verify against that file.
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./definitions.json:/etc/rabbitmq/definitions.json
    networks:
      - rabbitmq_network
    restart: unless-stopped
    healthcheck:
      # Runs `rabbitmq-diagnostics ping` every 30s (10s timeout per probe);
      # three consecutive failures mark the container unhealthy.
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3

volumes:
  rabbitmq_data:
    driver: local

networks:
  rabbitmq_network:
    driver: bridge
import docker
import time
import requests
from typing import Dict, Any, Optional
import logging

class RabbitMQDockerManager:
    """Manage the lifecycle of one RabbitMQ container through the Docker SDK.

    The container name (``rabbitmq``) and the image
    (``rabbitmq:3.12-management``) are fixed in ``__init__``. Every public
    method logs failures and reports them through its return value instead of
    raising.
    """

    def __init__(self):
        """Create a Docker client from the process environment."""
        self.client = docker.from_env()
        self.container_name = 'rabbitmq'
        self.image = 'rabbitmq:3.12-management'
        self.logger = logging.getLogger(__name__)

    def start_container(self, config: Optional[Dict[str, Any]] = None) -> bool:
        """Start the RabbitMQ container, creating it when it does not exist.

        Args:
            config: Optional keyword arguments for ``containers.run``. They are
                merged into the defaults with ``dict.update``, so a top-level
                key such as ``'environment'`` replaces the default value
                wholesale.

        Returns:
            True when the container was already running, or when it was
            (re)started and the management endpoint answered before the
            ``wait_for_service`` timeout. False on any error or when the
            service never became reachable.
        """
        try:
            # Default run options
            default_config = {
                'ports': {
                    '5672/tcp': 5672,
                    '15672/tcp': 15672,
                    '25672/tcp': 25672
                },
                'environment': {
                    'RABBITMQ_DEFAULT_USER': 'admin',
                    'RABBITMQ_DEFAULT_PASS': 'admin123',
                    'RABBITMQ_DEFAULT_VHOST': '/'
                },
                'volumes': {
                    'rabbitmq_data': {'bind': '/var/lib/rabbitmq', 'mode': 'rw'}
                },
                'detach': True,
                'remove': False,
                'restart_policy': {'Name': 'unless-stopped'}
            }

            # Caller overrides win over the defaults (shallow merge).
            if config:
                default_config.update(config)

            # Reuse an existing container when there is one.
            try:
                existing_container = self.client.containers.get(self.container_name)
                if existing_container.status == 'running':
                    self.logger.info(f"容器 {self.container_name} 已在运行")
                    return True
                else:
                    existing_container.start()
                    self.logger.info(f"启动现有容器 {self.container_name}")
                    # A restarted broker needs the same warm-up time as a new
                    # one, so report success only once it actually answers.
                    return self.wait_for_service()
            except docker.errors.NotFound:
                pass

            # Create and start a new container.
            container = self.client.containers.run(
                self.image,
                name=self.container_name,
                hostname='rabbitmq',
                **default_config
            )

            self.logger.info(f"RabbitMQ容器启动成功: {container.id[:12]}")

            # Propagate the readiness result instead of discarding it.
            return self.wait_for_service()

        except Exception as e:
            self.logger.error(f"启动RabbitMQ容器失败: {e}")
            return False

    def stop_container(self) -> bool:
        """Stop the container.

        Returns:
            True when the container was stopped or does not exist, False when
            stopping failed for another reason.
        """
        try:
            container = self.client.containers.get(self.container_name)
            container.stop()
            self.logger.info(f"RabbitMQ容器已停止: {self.container_name}")
            return True

        except docker.errors.NotFound:
            self.logger.warning(f"容器 {self.container_name} 不存在")
            return True
        except Exception as e:
            self.logger.error(f"停止RabbitMQ容器失败: {e}")
            return False

    def remove_container(self) -> bool:
        """Stop and delete the container.

        Returns:
            True when the container was removed or does not exist, False when
            stopping or removing failed for another reason.
        """
        try:
            container = self.client.containers.get(self.container_name)
            container.stop()
            container.remove()
            self.logger.info(f"RabbitMQ容器已删除: {self.container_name}")
            return True

        except docker.errors.NotFound:
            self.logger.warning(f"容器 {self.container_name} 不存在")
            return True
        except Exception as e:
            self.logger.error(f"删除RabbitMQ容器失败: {e}")
            return False

    def get_container_status(self) -> Optional[Dict[str, Any]]:
        """Return a summary of the container, or None when unavailable.

        Returns:
            A dict with ``id``, ``name``, ``status``, ``image`` (first tag or
            ``'unknown'``), ``ports``, ``created`` and ``started``; None when
            the container does not exist or the lookup failed.
        """
        try:
            container = self.client.containers.get(self.container_name)
            return {
                'id': container.id,
                'name': container.name,
                'status': container.status,
                'image': container.image.tags[0] if container.image.tags else 'unknown',
                'ports': container.ports,
                'created': container.attrs['Created'],
                'started': container.attrs['State']['StartedAt']
            }
        except docker.errors.NotFound:
            return None
        except Exception as e:
            self.logger.error(f"获取容器状态失败: {e}")
            return None

    def wait_for_service(self, timeout: int = 60,
                         url: str = 'http://localhost:15672') -> bool:
        """Poll the management endpoint until it answers with HTTP 200.

        Args:
            timeout: Maximum number of seconds to keep polling.
            url: Endpoint to probe. Override it when the management port is
                published on a different host port.

        Returns:
            True as soon as the endpoint returns status 200, False on timeout.
        """
        # Monotonic clock: the deadline is immune to wall-clock adjustments.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            try:
                response = requests.get(url, timeout=5)
                if response.status_code == 200:
                    self.logger.info("RabbitMQ服务已启动")
                    return True
            except requests.exceptions.RequestException:
                # Not accepting connections yet; retry after the pause below.
                pass

            time.sleep(2)

        self.logger.error(f"等待RabbitMQ服务启动超时 ({timeout}秒)")
        return False

    def get_logs(self, tail: int = 100) -> str:
        """Return the last ``tail`` log lines (with timestamps) as text.

        Bytes that are not valid UTF-8 are replaced rather than causing the
        whole log to be discarded. Returns an empty string on failure.
        """
        try:
            container = self.client.containers.get(self.container_name)
            logs = container.logs(tail=tail, timestamps=True)
            return logs.decode('utf-8', errors='replace')
        except Exception as e:
            self.logger.error(f"获取容器日志失败: {e}")
            return ""

# 使用示例
if __name__ == "__main__":
    # Timestamped INFO-level logging so the manager's own messages show up.
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    print("=== RabbitMQ Docker管理示例 ===")

    manager = RabbitMQDockerManager()

    try:
        # Step 1: bring the container up and report the outcome.
        print("\n1. 启动RabbitMQ容器")
        started = manager.start_container()
        if not started:
            print("❌ 容器启动失败")
        else:
            print("✅ 容器启动成功")

            # Show status and port mappings when the lookup succeeds.
            status = manager.get_container_status()
            if status:
                for label, field in (("容器状态", 'status'), ("端口映射", 'ports')):
                    print(f"{label}: {status[field]}")

            # Connection details for the management UI.
            for line in ("\n管理界面: http://localhost:15672",
                         "用户名: admin",
                         "密码: admin123"):
                print(line)

    except KeyboardInterrupt:
        # Ctrl+C: stop the container before leaving.
        print("\n收到中断信号,正在停止容器...")
        manager.stop_container()
    except Exception as e:
        print(f"❌ 错误: {e}")

2.1.2 Linux系统安装

#!/bin/bash
# install_rabbitmq.sh - RabbitMQ installation script

# Abort the script as soon as any command exits with a non-zero status.
set -e

# ANSI colour escape sequences used to tag log output.
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# 日志函数
log_info() {
    # Print message $1 on stdout, prefixed with a green [INFO] tag.
    printf '%b\n' "${GREEN}[INFO]${NC} $1"
}

log_warn() {
    # Print message $1 on stdout, prefixed with a yellow [WARN] tag.
    printf '%b\n' "${YELLOW}[WARN]${NC} $1"
}

log_error() {
    # Print message $1, prefixed with a red [ERROR] tag, on stderr so that
    # errors stay visible when stdout is redirected or captured.
    echo -e "${RED}[ERROR]${NC} $1" >&2
}

# 检测操作系统
detect_os() {
    # Set the globals OS and VER from /etc/os-release (NAME and VERSION_ID).
    # Exits with status 1 when that file does not exist.
    if [ ! -f /etc/os-release ]; then
        log_error "无法检测操作系统"
        exit 1
    fi

    . /etc/os-release
    OS=$NAME
    VER=$VERSION_ID

    log_info "检测到操作系统: $OS $VER"
}

# Ubuntu/Debian安装
install_ubuntu() {
    # Install Erlang and RabbitMQ on Debian or Ubuntu from the apt mirrors.
    #
    # The repository path is derived from /etc/os-release (ID and
    # VERSION_CODENAME), so a Debian host gets the `deb/debian` repository
    # rather than the Ubuntu one paired with a Debian codename. `lsb_release`
    # is only a fallback and its package is installed explicitly.
    log_info "开始在Ubuntu/Debian上安装RabbitMQ"

    local keyring="/usr/share/keyrings/com.rabbitmq.team.gpg"
    local repo_base="https://ppa1.novemberain.com/rabbitmq"
    local repo_list="/etc/apt/sources.list.d/rabbitmq.list"
    local distro codename

    # Refresh the package index
    sudo apt-get update

    # Install prerequisites
    sudo apt-get install -y curl gnupg apt-transport-https lsb-release

    # Pick the repository flavour; anything that is not Debian keeps the
    # previous behaviour and uses the Ubuntu repository.
    distro=$(. /etc/os-release 2>/dev/null && echo "${ID:-}") || distro=""
    case "$distro" in
        debian|ubuntu) ;;
        *) distro="ubuntu" ;;
    esac

    codename=$(. /etc/os-release 2>/dev/null && echo "${VERSION_CODENAME:-}") || codename=""
    if [ -z "$codename" ]; then
        codename=$(lsb_release -cs)
    fi

    # Add the RabbitMQ signing key
    # NOTE(review): confirm this key is the one both repositories below are
    # signed with; otherwise `apt-get update` rejects them.
    curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee "$keyring" > /dev/null

    # Add the Erlang and RabbitMQ repositories
    echo "deb [signed-by=$keyring] $repo_base/rabbitmq-erlang/deb/$distro $codename main" | sudo tee "$repo_list"
    echo "deb [signed-by=$keyring] $repo_base/rabbitmq-server/deb/$distro $codename main" | sudo tee -a "$repo_list"

    # Refresh the package index with the new repositories
    sudo apt-get update

    # Install Erlang and RabbitMQ
    sudo apt-get install -y erlang-base erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key erlang-runtime-tools erlang-snmp erlang-ssl erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
    sudo apt-get install -y rabbitmq-server

    log_info "RabbitMQ安装完成"
}

# CentOS/RHEL安装
install_centos() {
    # Install Erlang and RabbitMQ on CentOS/RHEL via yum.
    #
    # The repository setup script is downloaded to a temporary file with
    # `curl -f` first: an HTTP error or a truncated download now aborts the
    # script (set -e) instead of being piped into a root shell.
    log_info "开始在CentOS/RHEL上安装RabbitMQ"

    local repo_script

    # Install the EPEL repository
    sudo yum install -y epel-release

    # Add the RabbitMQ repository
    repo_script=$(mktemp)
    curl -fsSL -o "$repo_script" https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh
    sudo bash "$repo_script"
    rm -f "$repo_script"

    # Install Erlang
    sudo yum install -y erlang

    # Install RabbitMQ
    sudo yum install -y rabbitmq-server

    log_info "RabbitMQ安装完成"
}

# 配置RabbitMQ
configure_rabbitmq() {
    # Enable the management plugin, start the service and create an
    # administrator account.
    #
    # The account can be overridden through RABBITMQ_ADMIN_USER and
    # RABBITMQ_ADMIN_PASS; the defaults keep the previous admin/admin123.
    # The function is safe to re-run: an existing user gets its password
    # updated instead of aborting the script.
    local admin_user="${RABBITMQ_ADMIN_USER:-admin}"
    local admin_pass="${RABBITMQ_ADMIN_PASS:-admin123}"

    log_info "配置RabbitMQ"

    # Enable the management plugin
    sudo rabbitmq-plugins enable rabbitmq_management

    # Enable and start the RabbitMQ service
    sudo systemctl enable rabbitmq-server
    sudo systemctl start rabbitmq-server

    # Block until the node has finished booting rather than guessing a delay.
    sudo rabbitmqctl await_startup

    # Create the administrator (or reset its password when it already exists)
    if ! sudo rabbitmqctl add_user "$admin_user" "$admin_pass"; then
        log_warn "用户 $admin_user 已存在,更新其密码"
        sudo rabbitmqctl change_password "$admin_user" "$admin_pass"
    fi
    sudo rabbitmqctl set_user_tags "$admin_user" administrator
    sudo rabbitmqctl set_permissions -p / "$admin_user" ".*" ".*" ".*"

    # Remove the default guest user (optional; ignore failure if it is gone)
    sudo rabbitmqctl delete_user guest || true

    log_info "RabbitMQ配置完成"
    log_info "管理界面: http://localhost:15672"
    log_info "用户名: $admin_user"
    log_info "密码: $admin_pass"
}

# 主函数
main() {
    # Entry point: detect the distribution, run the matching installer, then
    # configure the broker. Exits with status 1 on an unsupported system.
    log_info "开始安装RabbitMQ"

    detect_os

    # Dispatch on the distribution name prefix set by detect_os.
    if [[ "$OS" == Ubuntu* || "$OS" == Debian* ]]; then
        install_ubuntu
    elif [[ "$OS" == CentOS* || "$OS" == "Red Hat"* ]]; then
        install_centos
    else
        log_error "不支持的操作系统: $OS"
        exit 1
    fi

    configure_rabbitmq

    log_info "RabbitMQ安装和配置完成!"
}

# Run the installer, forwarding all command-line arguments.
main "$@"

2.2 配置管理

2.2.1 配置文件管理

import os
import json
import yaml
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, asdict
from pathlib import Path

@dataclass
class RabbitMQServerConfig:
    """Settings that are rendered into the main ``rabbitmq.conf`` file."""
    # Listener ports (the TLS listener is optional)
    listeners_tcp_default: int = 5672
    listeners_ssl_default: Optional[int] = None

    # Management UI ports (the TLS port is optional)
    management_tcp_port: int = 15672
    management_ssl_port: Optional[int] = None

    # Cluster formation; the node list defaults to empty (see __post_init__)
    cluster_formation_peer_discovery_backend: str = "rabbit_peer_discovery_classic_config"
    cluster_formation_classic_config_nodes: List[str] = None

    # Memory watermark settings
    vm_memory_high_watermark: float = 0.4
    vm_memory_high_watermark_paging_ratio: float = 0.5

    # Free disk space limits
    disk_free_limit_absolute: str = "2GB"
    disk_free_limit_relative: float = 1.0

    # Log levels
    log_levels_default: str = "info"
    log_levels_connection: str = "info"
    log_levels_channel: str = "info"

    # Queue placement
    queue_master_locator: str = "min-masters"

    # Protocol limits
    heartbeat: int = 60
    frame_max: int = 131072
    channel_max: int = 2047

    def __post_init__(self):
        """Replace the ``None`` placeholder with a fresh per-instance list."""
        nodes = self.cluster_formation_classic_config_nodes
        self.cluster_formation_classic_config_nodes = [] if nodes is None else nodes

@dataclass
class RabbitMQAdvancedConfig:
    """Settings for ``advanced.config``, plugins and the definitions file.

    Every field defaults to ``None`` and is given a fresh default value in
    ``__post_init__``.
    """
    # Authentication
    auth_mechanisms: List[str] = None
    auth_backends: List[str] = None

    # TLS options
    ssl_options: Dict[str, Any] = None

    # Plugins
    enabled_plugins: List[str] = None

    # Policies
    policies: List[Dict[str, Any]] = None

    # Users
    users: List[Dict[str, Any]] = None

    # Virtual hosts
    vhosts: List[Dict[str, Any]] = None

    def __post_init__(self):
        """Fill in a default for each field that was left as ``None``."""
        # Each factory builds a new object, so instances never share state.
        fallbacks = (
            ('auth_mechanisms', lambda: ['PLAIN', 'AMQPLAIN']),
            ('auth_backends', lambda: ['rabbit_auth_backend_internal']),
            ('ssl_options', dict),
            ('enabled_plugins', lambda: ['rabbitmq_management']),
            ('policies', list),
            ('users', list),
            ('vhosts', list),
        )
        for attr_name, make_default in fallbacks:
            if getattr(self, attr_name) is None:
                setattr(self, attr_name, make_default())

class RabbitMQConfigManager:
    """Generate, save and load the RabbitMQ configuration files.

    Four files under ``config_dir`` are managed: ``rabbitmq.conf``,
    ``advanced.config``, ``enabled_plugins`` and ``definitions.json``.
    """

    class _Atom(str):
        """A string that must be rendered as an Erlang atom, not a string.

        Wrap a value in it (for example inside ``ssl_options``) when the
        setting expects an atom.
        """

    def __init__(self, config_dir: str = "/etc/rabbitmq"):
        """Derive the four managed file paths from ``config_dir``."""
        self.config_dir = Path(config_dir)
        self.config_file = self.config_dir / "rabbitmq.conf"
        self.advanced_config_file = self.config_dir / "advanced.config"
        self.enabled_plugins_file = self.config_dir / "enabled_plugins"
        self.definitions_file = self.config_dir / "definitions.json"
        self.logger = logging.getLogger(__name__)

    def generate_config(self, config: "RabbitMQServerConfig") -> str:
        """Render ``config`` as the text of ``rabbitmq.conf``.

        Optional TLS ports are emitted only when set, and the cluster section
        only when at least one node is configured. Each cluster node gets its
        own numbered ``cluster_formation.classic_config.nodes.N`` key.
        """
        config_lines = [
            "# RabbitMQ Configuration File",
            "# Generated by RabbitMQConfigManager",
            "",
        ]

        # Network
        config_lines.append("# Network Configuration")
        config_lines.append(f"listeners.tcp.default = {config.listeners_tcp_default}")
        if config.listeners_ssl_default:
            config_lines.append(f"listeners.ssl.default = {config.listeners_ssl_default}")
        config_lines.append("")

        # Management plugin
        config_lines.append("# Management Plugin Configuration")
        config_lines.append(f"management.tcp.port = {config.management_tcp_port}")
        if config.management_ssl_port:
            config_lines.append(f"management.ssl.port = {config.management_ssl_port}")
        config_lines.append("")

        # Cluster: one numbered key per node (a comma-joined value on a single
        # key would be read as one bogus node name).
        if config.cluster_formation_classic_config_nodes:
            config_lines.append("# Cluster Configuration")
            config_lines.append(f"cluster_formation.peer_discovery_backend = {config.cluster_formation_peer_discovery_backend}")
            for index, node in enumerate(config.cluster_formation_classic_config_nodes, start=1):
                config_lines.append(f"cluster_formation.classic_config.nodes.{index} = rabbit@{node}")
            config_lines.append("")

        # Memory
        config_lines.append("# Memory Configuration")
        config_lines.append(f"vm_memory_high_watermark.relative = {config.vm_memory_high_watermark}")
        config_lines.append(f"vm_memory_high_watermark_paging_ratio = {config.vm_memory_high_watermark_paging_ratio}")
        config_lines.append("")

        # Disk
        config_lines.append("# Disk Configuration")
        config_lines.append(f"disk_free_limit.absolute = {config.disk_free_limit_absolute}")
        config_lines.append("")

        # Logging
        config_lines.append("# Logging Configuration")
        config_lines.append(f"log.console.level = {config.log_levels_default}")
        config_lines.append(f"log.connection.level = {config.log_levels_connection}")
        config_lines.append(f"log.channel.level = {config.log_levels_channel}")
        config_lines.append("")

        # Miscellaneous
        config_lines.append("# Other Configuration")
        config_lines.append(f"heartbeat = {config.heartbeat}")
        config_lines.append(f"frame_max = {config.frame_max}")
        config_lines.append(f"channel_max = {config.channel_max}")
        config_lines.append(f"queue_master_locator = {config.queue_master_locator}")

        return "\n".join(config_lines)

    def generate_advanced_config(self, config: "RabbitMQAdvancedConfig") -> str:
        """Render the ``rabbit`` application section of ``advanced.config``.

        Authentication mechanisms and backends are emitted as Erlang atoms.
        ``ssl_options`` is included only when non-empty and is rendered as a
        property list.
        """
        atom = self._Atom
        rabbit_settings: Dict[str, Any] = {
            'auth_mechanisms': [atom(name) for name in config.auth_mechanisms],
            'auth_backends': [atom(name) for name in config.auth_backends],
        }

        if config.ssl_options:
            rabbit_settings['ssl_options'] = config.ssl_options

        # Convert to Erlang term syntax
        return self._dict_to_erlang_config({'rabbit': rabbit_settings})

    def generate_enabled_plugins(self, plugins: List[str]) -> str:
        """Render the ``enabled_plugins`` file: an Erlang list of plugin names."""
        plugins_str = ",".join(plugins)
        return f"[{plugins_str}].\n"

    def generate_definitions(self, config: "RabbitMQAdvancedConfig") -> str:
        """Render users, vhosts and policies as ``definitions.json`` text.

        All other sections (permissions, queues, exchanges, ...) are emitted
        as empty lists.
        """
        definitions = {
            'rabbit_version': '3.12.0',
            'rabbitmq_version': '3.12.0',
            'product_name': 'RabbitMQ',
            'product_version': '3.12.0',
            'users': config.users,
            'vhosts': config.vhosts,
            'permissions': [],
            'topic_permissions': [],
            'parameters': [],
            'global_parameters': [],
            'policies': config.policies,
            'queues': [],
            'exchanges': [],
            'bindings': []
        }

        return json.dumps(definitions, indent=2)

    def _write_text(self, path: Path, content: str, label: str) -> None:
        """Write ``content`` to ``path`` as UTF-8 and log ``label`` with the path."""
        with open(path, 'w', encoding='utf-8') as f:
            f.write(content)
        self.logger.info(f"{label}: {path}")

    def save_config(self, config: "RabbitMQServerConfig",
                    advanced_config: Optional["RabbitMQAdvancedConfig"] = None):
        """Write the configuration files, creating ``config_dir`` if needed.

        ``rabbitmq.conf`` is always written. The advanced config, plugin list
        and definitions are written only when ``advanced_config`` is given.

        Raises:
            Exception: any error is logged and then re-raised unchanged.
        """
        try:
            # Create the configuration directory
            self.config_dir.mkdir(parents=True, exist_ok=True)

            # Main configuration file
            self._write_text(self.config_file, self.generate_config(config),
                             "主配置文件已保存")

            if advanced_config:
                # Advanced configuration file
                self._write_text(self.advanced_config_file,
                                 self.generate_advanced_config(advanced_config),
                                 "高级配置文件已保存")

                # Enabled plugins file
                self._write_text(self.enabled_plugins_file,
                                 self.generate_enabled_plugins(advanced_config.enabled_plugins),
                                 "插件配置文件已保存")

                # Definitions file
                self._write_text(self.definitions_file,
                                 self.generate_definitions(advanced_config),
                                 "定义文件已保存")

        except Exception as e:
            self.logger.error(f"保存配置文件失败: {e}")
            raise

    def load_config(self) -> Optional[Dict[str, Any]]:
        """Read whichever managed files exist.

        Returns:
            A dict with the keys ``main_config``, ``advanced_config`` and
            ``enabled_plugins`` (raw text) and ``definitions`` (parsed JSON)
            for the files that exist, or None when reading or parsing fails.
        """
        try:
            config_data: Dict[str, Any] = {}

            # Raw-text files
            text_files = (
                ('main_config', self.config_file),
                ('advanced_config', self.advanced_config_file),
                ('enabled_plugins', self.enabled_plugins_file),
            )
            for key, path in text_files:
                if path.exists():
                    with open(path, 'r', encoding='utf-8') as f:
                        config_data[key] = f.read()

            # Definitions file (JSON)
            if self.definitions_file.exists():
                with open(self.definitions_file, 'r', encoding='utf-8') as f:
                    config_data['definitions'] = json.load(f)

            return config_data

        except Exception as e:
            self.logger.error(f"加载配置文件失败: {e}")
            return None

    def _dict_to_erlang_config(self, config_dict: Dict[str, Any]) -> str:
        """Convert ``{app: {key: value}}`` into ``advanced.config`` text.

        The result is one Erlang term: a list of ``{App, [{Key, Value}, ...]}``
        tuples terminated by a full stop. Nested dicts become property lists,
        ``_Atom`` values become atoms, other strings become quoted strings.
        """
        atom_cls = self._Atom

        def format_atom(name) -> str:
            text = str(name)
            # Bare atoms start with a lowercase letter and contain only
            # alphanumerics, "_" or "@"; anything else needs single quotes.
            is_bare = (
                text[:1].islower()
                and text.isascii()
                and all(ch.isalnum() or ch in '_@' for ch in text)
            )
            if is_bare:
                return text
            escaped = text.replace('\\', '\\\\').replace("'", "\\'")
            return f"'{escaped}'"

        def format_value(value) -> str:
            if value is None:
                return 'undefined'
            if isinstance(value, atom_cls):
                return format_atom(value)
            if isinstance(value, str):
                escaped = value.replace('\\', '\\\\').replace('"', '\\"')
                return f'"{escaped}"'
            # bool must be tested before falling through to str(value),
            # which would yield Python's "True"/"False".
            if isinstance(value, bool):
                return 'true' if value else 'false'
            if isinstance(value, (list, tuple)):
                return f"[{', '.join(format_value(item) for item in value)}]"
            if isinstance(value, dict):
                pairs = [f"{{{format_atom(key)}, {format_value(val)}}}"
                         for key, val in value.items()]
                return f"[{', '.join(pairs)}]"
            return str(value)

        app_terms = []
        for app, settings in config_dict.items():
            # Join with ",\n" so no entry is followed by a trailing comma.
            entries = ",\n".join(
                f"    {{{format_atom(key)}, {format_value(value)}}}"
                for key, value in settings.items()
            )
            app_terms.append(f"  {{{format_atom(app)}, [\n{entries}\n  ]}}")

        return "[\n" + ",\n".join(app_terms) + "\n].\n"

# 使用示例
if __name__ == "__main__":
    # Local imports: only this demo needs them.
    import base64
    import hashlib
    import os

    def _rabbitmq_password_hash(password: str) -> str:
        """Return a salted SHA-256 hash for the ``password_hash`` field.

        Layout: base64(salt + sha256(salt + utf8(password))) with a 4-byte
        random salt, matching ``rabbit_password_hashing_sha256``.
        """
        salt = os.urandom(4)
        digest = hashlib.sha256(salt + password.encode('utf-8')).digest()
        return base64.b64encode(salt + digest).decode('ascii')

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    print("=== RabbitMQ配置管理示例 ===")

    # Create the configuration manager
    config_manager = RabbitMQConfigManager("./rabbitmq_config")

    # Create the server configuration
    server_config = RabbitMQServerConfig(
        listeners_tcp_default=5672,
        management_tcp_port=15672,
        vm_memory_high_watermark=0.4,
        disk_free_limit_absolute="2GB",
        heartbeat=60
    )

    # Create the advanced configuration
    advanced_config = RabbitMQAdvancedConfig(
        enabled_plugins=['rabbitmq_management', 'rabbitmq_prometheus'],
        users=[
            {
                'name': 'admin',
                # Store a real hash: a plaintext value in password_hash
                # would never match the password at login.
                'password_hash': _rabbitmq_password_hash('admin123'),
                'hashing_algorithm': 'rabbit_password_hashing_sha256',
                'tags': 'administrator'
            }
        ],
        vhosts=[
            {'name': '/'},
            {'name': '/test'}
        ],
        policies=[
            {
                'vhost': '/',
                'name': 'ha-all',
                'pattern': '.*',
                'apply-to': 'queues',
                'definition': {
                    'ha-mode': 'all',
                    'ha-sync-mode': 'automatic'
                },
                'priority': 0
            }
        ]
    )

    try:
        # Save the configuration
        print("\n1. 保存配置文件")
        config_manager.save_config(server_config, advanced_config)
        print("✅ 配置文件保存成功")

        # Load the configuration back
        print("\n2. 加载配置文件")
        loaded_config = config_manager.load_config()
        if loaded_config:
            print("✅ 配置文件加载成功")
            print(f"配置文件数量: {len(loaded_config)}")
        else:
            print("❌ 配置文件加载失败")

    except Exception as e:
        print(f"❌ 错误: {e}")

2.2.2 环境变量配置

import os
from typing import Dict, Any, Optional
from dataclasses import dataclass

@dataclass
class RabbitMQEnvironment:
    """Default values for RabbitMQ-related environment variables.

    Each field name is used verbatim as the environment variable name by
    RabbitMQEnvironmentManager.
    """

    # Node identity and ports
    # NOTE(review): both RABBITMQ_NODE_NAME and RABBITMQ_NODENAME are defined;
    # validate_environment requires the former — confirm which one the broker
    # actually reads.
    RABBITMQ_NODE_NAME: str = "rabbit@localhost"
    RABBITMQ_NODENAME: str = "rabbit@localhost"
    RABBITMQ_NODE_PORT: int = 5672
    RABBITMQ_DIST_PORT: int = 25672

    # Data, log and plugin directories
    RABBITMQ_MNESIA_BASE: str = "/var/lib/rabbitmq/mnesia"
    RABBITMQ_LOG_BASE: str = "/var/log/rabbitmq"
    RABBITMQ_PLUGINS_DIR: str = "/usr/lib/rabbitmq/plugins"

    # Configuration file locations
    # NOTE(review): RABBITMQ_CONFIG_FILE has no file extension, unlike the
    # two paths below — presumably intentional; verify.
    RABBITMQ_CONFIG_FILE: str = "/etc/rabbitmq/rabbitmq"
    RABBITMQ_ADVANCED_CONFIG_FILE: str = "/etc/rabbitmq/advanced.config"
    RABBITMQ_ENABLED_PLUGINS_FILE: str = "/etc/rabbitmq/enabled_plugins"

    # Default user and virtual host
    RABBITMQ_DEFAULT_USER: str = "guest"
    RABBITMQ_DEFAULT_PASS: str = "guest"
    RABBITMQ_DEFAULT_VHOST: str = "/"

    # Clustering
    RABBITMQ_USE_LONGNAME: bool = False
    RABBITMQ_CLUSTER_NODES: str = ""
    RABBITMQ_ERLANG_COOKIE: str = ""

    # Memory and disk limits
    RABBITMQ_VM_MEMORY_HIGH_WATERMARK: float = 0.4
    RABBITMQ_DISK_FREE_LIMIT: str = "2GB"

    # TLS certificate paths (empty string means unset)
    RABBITMQ_SSL_CERTFILE: str = ""
    RABBITMQ_SSL_KEYFILE: str = ""
    RABBITMQ_SSL_CACERTFILE: str = ""

    # Management plugin TLS settings (empty string means unset)
    RABBITMQ_MANAGEMENT_SSL_PORT: int = 15671
    RABBITMQ_MANAGEMENT_SSL_CERTFILE: str = ""
    RABBITMQ_MANAGEMENT_SSL_KEYFILE: str = ""
    RABBITMQ_MANAGEMENT_SSL_CACERTFILE: str = ""

class RabbitMQEnvironmentManager:
    """Apply, export, load and validate RabbitMQ environment variables."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _format_value(value: Any) -> Optional[str]:
        """Return the text to store for ``value``, or None to skip it.

        Only ``None`` and the empty string count as "unset". ``False`` and
        ``0`` are real values, and booleans are written as lowercase
        ``true``/``false`` rather than Python's ``True``/``False``.
        """
        if value is None or value == "":
            return None
        if isinstance(value, bool):
            return 'true' if value else 'false'
        return str(value)

    def set_environment(self, env_config: "RabbitMQEnvironment"):
        """Copy every set field of ``env_config`` into ``os.environ``.

        Raises:
            Exception: any error is logged and then re-raised unchanged.
        """
        try:
            env_vars = asdict(env_config)

            for key, value in env_vars.items():
                text = self._format_value(value)
                if text is not None:  # skip unset values only
                    os.environ[key] = text
                    self.logger.debug(f"设置环境变量: {key}={text}")

            self.logger.info("RabbitMQ环境变量设置完成")

        except Exception as e:
            self.logger.error(f"设置环境变量失败: {e}")
            raise

    def get_environment(self) -> Dict[str, str]:
        """Return all environment variables whose name starts with RABBITMQ_."""
        return {
            key: value
            for key, value in os.environ.items()
            if key.startswith('RABBITMQ_')
        }

    def generate_env_file(self, env_config: "RabbitMQEnvironment", file_path: str = ".env"):
        """Write the set fields of ``env_config`` to ``file_path`` as KEY=value.

        Raises:
            Exception: any error is logged and then re-raised unchanged.
        """
        try:
            env_vars = asdict(env_config)

            with open(file_path, 'w', encoding='utf-8') as f:
                f.write("# RabbitMQ Environment Variables\n")
                f.write("# Generated by RabbitMQEnvironmentManager\n\n")

                for key, value in env_vars.items():
                    text = self._format_value(value)
                    if text is not None:  # skip unset values only
                        f.write(f"{key}={text}\n")

            self.logger.info(f"环境变量文件已生成: {file_path}")

        except Exception as e:
            self.logger.error(f"生成环境变量文件失败: {e}")
            raise

    def load_env_file(self, file_path: str = ".env") -> bool:
        """Load KEY=value lines from ``file_path`` into ``os.environ``.

        Blank lines, ``#`` comments and lines without ``=`` are ignored. A
        leading ``export `` is accepted, whitespace around the key and value
        is stripped, and one pair of matching surrounding quotes is removed.

        Returns:
            True when the file was read, False when it is missing or
            unreadable.
        """
        try:
            if not os.path.exists(file_path):
                self.logger.warning(f"环境变量文件不存在: {file_path}")
                return False

            with open(file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith('#') or '=' not in line:
                        continue
                    if line.startswith('export '):
                        line = line[len('export '):].lstrip()

                    key, value = line.split('=', 1)
                    key = key.strip()
                    value = value.strip()
                    if not key:
                        continue
                    # Drop one pair of matching quotes: KEY="value" / KEY='value'
                    if len(value) >= 2 and value[0] == value[-1] and value[0] in '"\'':
                        value = value[1:-1]

                    os.environ[key] = value
                    self.logger.debug(f"加载环境变量: {key}={value}")

            self.logger.info(f"环境变量文件加载完成: {file_path}")
            return True

        except Exception as e:
            self.logger.error(f"加载环境变量文件失败: {e}")
            return False

    def validate_environment(self) -> Dict[str, Any]:
        """Check the RabbitMQ variables currently in ``os.environ``.

        Returns:
            A dict with ``valid`` (bool), ``errors`` and ``warnings`` (lists
            of messages). Missing required variables, non-numeric values and
            ports outside 1-65535 are errors. Ports below 1024 and watermarks
            outside 0.1-1.0 are warnings.
        """
        validation_result = {
            'valid': True,
            'errors': [],
            'warnings': []
        }

        # Required variables
        required_vars = [
            'RABBITMQ_NODE_NAME',
            'RABBITMQ_MNESIA_BASE',
            'RABBITMQ_LOG_BASE'
        ]

        for var in required_vars:
            if not os.environ.get(var):
                validation_result['errors'].append(f"缺少必需的环境变量: {var}")
                validation_result['valid'] = False

        # Port
        node_port = os.environ.get('RABBITMQ_NODE_PORT')
        if node_port:
            try:
                port = int(node_port)
                if port < 1 or port > 65535:
                    # Not a usable TCP port at all.
                    validation_result['errors'].append(f"无效的端口号: {node_port}")
                    validation_result['valid'] = False
                elif port < 1024:
                    validation_result['warnings'].append(f"端口号可能不合适: {port}")
            except ValueError:
                validation_result['errors'].append(f"无效的端口号: {node_port}")
                validation_result['valid'] = False

        # Memory watermark
        memory_watermark = os.environ.get('RABBITMQ_VM_MEMORY_HIGH_WATERMARK')
        if memory_watermark:
            try:
                watermark = float(memory_watermark)
                if watermark < 0.1 or watermark > 1.0:
                    validation_result['warnings'].append(f"内存水位可能不合适: {watermark}")
            except ValueError:
                validation_result['errors'].append(f"无效的内存水位: {memory_watermark}")
                validation_result['valid'] = False

        return validation_result

# 使用示例
if __name__ == "__main__":
    # Timestamped INFO-level logging so the manager's own messages show up.
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    print("=== RabbitMQ环境变量管理示例 ===")

    # Manager plus a configuration overriding a few of the defaults
    env_manager = RabbitMQEnvironmentManager()
    env_config = RabbitMQEnvironment(
        RABBITMQ_NODE_NAME="rabbit@myhost",
        RABBITMQ_NODE_PORT=5672,
        RABBITMQ_DEFAULT_USER="admin",
        RABBITMQ_DEFAULT_PASS="admin123",
        RABBITMQ_VM_MEMORY_HIGH_WATERMARK=0.4,
        RABBITMQ_DISK_FREE_LIMIT="2GB"
    )

    try:
        # Step 1: export the configuration into this process's environment
        print("\n1. 设置环境变量")
        env_manager.set_environment(env_config)
        print("✅ 环境变量设置成功")

        # Step 2: write the same values to an env file
        print("\n2. 生成环境变量文件")
        env_manager.generate_env_file(env_config, "rabbitmq.env")
        print("✅ 环境变量文件生成成功")

        # Step 3: read back every RABBITMQ_* variable
        print("\n3. 获取RabbitMQ环境变量")
        current_env = env_manager.get_environment()
        print(f"当前RabbitMQ环境变量数量: {len(current_env)}")

        # Step 4: validate and report errors, then warnings
        print("\n4. 验证环境变量配置")
        validation = env_manager.validate_environment()
        if not validation['valid']:
            print("❌ 环境变量配置无效")
            for error in validation['errors']:
                print(f"  错误: {error}")
        else:
            print("✅ 环境变量配置有效")

        for warning in validation['warnings']:
            print(f"  警告: {warning}")

    except Exception as e:
        print(f"❌ 错误: {e}")

2.3 章节总结

2.3.1 核心知识点

  1. 安装方式

    • Docker容器化部署(推荐)
    • Linux系统原生安装
    • 自动化安装脚本
  2. 配置管理

    • 主配置文件(rabbitmq.conf)
    • 高级配置文件(advanced.config)
    • 插件配置和定义文件
  3. 环境变量

    • 系统级环境变量配置
    • 容器环境变量管理
    • 配置验证和优化

2.3.2 最佳实践

  1. 部署建议

    • 优先使用Docker部署
    • 配置持久化存储
    • 设置合理的资源限制
  2. 配置管理

    • 版本化配置文件
    • 环境隔离配置
    • 配置验证机制
  3. 安全配置

    • 修改默认用户密码
    • 启用SSL/TLS
    • 配置防火墙规则

2.3.3 练习题

  1. 基础练习

    • 使用Docker部署RabbitMQ
    • 配置管理界面访问
    • 创建自定义用户和虚拟主机
  2. 进阶练习

    • 编写自动化安装脚本
    • 实现配置文件生成器
    • 设计环境变量管理系统
  3. 实战练习

    • 部署RabbitMQ集群
    • 配置SSL/TLS加密
    • 实现配置热更新机制

通过本章的学习，我们掌握了RabbitMQ的安装部署和配置管理方法。这些技能为后续学习RabbitMQ的核心功能和高级特性提供了坚实的基础。