2.1 RabbitMQ安装
2.1.1 Docker安装(推荐)
# docker-compose.yml
# Single-node RabbitMQ broker with the management plugin enabled.
# NOTE: Docker Compose v2 ignores `version` (and warns about it); it is kept
# only so the file still loads under the legacy docker-compose v1 binary.
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3.12-management
    container_name: rabbitmq
    # Fixed hostname: RabbitMQ derives its node name from the hostname
    hostname: rabbitmq
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # management UI / HTTP API
      - "25672:25672"  # inter-node and CLI communication (clustering)
    environment:
      # Override from the shell or an .env file. The fallbacks are the
      # tutorial's demo credentials and must not be used in production.
      RABBITMQ_DEFAULT_USER: "${RABBITMQ_DEFAULT_USER:-admin}"
      RABBITMQ_DEFAULT_PASS: "${RABBITMQ_DEFAULT_PASS:-admin123}"
      RABBITMQ_DEFAULT_VHOST: "/"
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      # Configuration is mounted read-only so the container cannot alter it
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
      - ./definitions.json:/etc/rabbitmq/definitions.json:ro
    networks:
      - rabbitmq_network
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3
      # Failures during the broker's boot phase do not count against retries
      start_period: 30s

volumes:
  rabbitmq_data:
    driver: local

networks:
  rabbitmq_network:
    driver: bridge
import docker
import time
import requests
from typing import Dict, Any, Optional
import logging
class RabbitMQDockerManager:
    """Start, stop, inspect and remove one RabbitMQ container via the Docker SDK.

    The container name and image are fixed in ``__init__``. Public methods
    report failure through their return value and the logger instead of
    raising.
    """

    def __init__(self):
        self.client = docker.from_env()
        self.container_name = 'rabbitmq'
        self.image = 'rabbitmq:3.12-management'
        self.logger = logging.getLogger(__name__)

    def start_container(self, config: Optional[Dict[str, Any]] = None) -> bool:
        """Start the RabbitMQ container, creating it when it does not exist yet.

        Args:
            config: Optional keyword arguments for ``containers.run``. The
                merge is shallow: each top-level key replaces the matching
                default wholesale.

        Returns:
            True when the container was already running or has been started,
            False when any error occurred.
        """
        try:
            # Defaults for a freshly created container
            run_kwargs = {
                'ports': {
                    '5672/tcp': 5672,
                    '15672/tcp': 15672,
                    '25672/tcp': 25672
                },
                'environment': {
                    'RABBITMQ_DEFAULT_USER': 'admin',
                    'RABBITMQ_DEFAULT_PASS': 'admin123',
                    'RABBITMQ_DEFAULT_VHOST': '/'
                },
                'volumes': {
                    'rabbitmq_data': {'bind': '/var/lib/rabbitmq', 'mode': 'rw'}
                },
                'detach': True,
                'remove': False,
                'restart_policy': {'Name': 'unless-stopped'}
            }
            if config:
                run_kwargs.update(config)

            # Reuse an existing container instead of creating a duplicate
            try:
                existing_container = self.client.containers.get(self.container_name)
                if existing_container.status == 'running':
                    self.logger.info(f"容器 {self.container_name} 已在运行")
                    return True
                existing_container.start()
                self.logger.info(f"启动现有容器 {self.container_name}")
                # A restarted broker needs the same warm-up as a new one, so
                # wait here exactly as the create path below does.
                self.wait_for_service()
                return True
            except docker.errors.NotFound:
                pass

            container = self.client.containers.run(
                self.image,
                name=self.container_name,
                hostname='rabbitmq',
                **run_kwargs
            )
            self.logger.info(f"RabbitMQ容器启动成功: {container.id[:12]}")
            # wait_for_service logs its own timeout; the container itself is up
            self.wait_for_service()
            return True
        except Exception as e:
            self.logger.error(f"启动RabbitMQ容器失败: {e}")
            return False

    def stop_container(self) -> bool:
        """Stop the container.

        Returns:
            True when it was stopped or does not exist, False on any other error.
        """
        try:
            container = self.client.containers.get(self.container_name)
            container.stop()
            self.logger.info(f"RabbitMQ容器已停止: {self.container_name}")
            return True
        except docker.errors.NotFound:
            self.logger.warning(f"容器 {self.container_name} 不存在")
            return True
        except Exception as e:
            self.logger.error(f"停止RabbitMQ容器失败: {e}")
            return False

    def remove_container(self) -> bool:
        """Stop and then delete the container.

        Returns:
            True when it was removed or does not exist, False on any other error.
        """
        try:
            container = self.client.containers.get(self.container_name)
            container.stop()
            container.remove()
            self.logger.info(f"RabbitMQ容器已删除: {self.container_name}")
            return True
        except docker.errors.NotFound:
            self.logger.warning(f"容器 {self.container_name} 不存在")
            return True
        except Exception as e:
            self.logger.error(f"删除RabbitMQ容器失败: {e}")
            return False

    def get_container_status(self) -> Optional[Dict[str, Any]]:
        """Return a summary dict for the container.

        Returns:
            A dict with the keys ``id``, ``name``, ``status``, ``image``,
            ``ports``, ``created`` and ``started``; None when the container
            does not exist or the lookup failed.
        """
        try:
            container = self.client.containers.get(self.container_name)
            tags = container.image.tags
            return {
                'id': container.id,
                'name': container.name,
                'status': container.status,
                'image': tags[0] if tags else 'unknown',
                'ports': container.ports,
                'created': container.attrs['Created'],
                'started': container.attrs['State']['StartedAt']
            }
        except docker.errors.NotFound:
            return None
        except Exception as e:
            self.logger.error(f"获取容器状态失败: {e}")
            return None

    def wait_for_service(self, timeout: int = 60,
                         url: str = 'http://localhost:15672') -> bool:
        """Poll the management UI until it answers HTTP 200 or ``timeout`` elapses.

        Args:
            timeout: Maximum number of seconds to wait.
            url: Address to poll; defaults to the locally published
                management port.

        Returns:
            True once the endpoint answered with status 200, False on timeout.
        """
        # The monotonic clock is immune to wall-clock adjustments while waiting
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                response = requests.get(url, timeout=5)
                if response.status_code == 200:
                    self.logger.info("RabbitMQ服务已启动")
                    return True
            except requests.exceptions.RequestException:
                # Connection refused/reset is expected while the broker boots
                pass
            # Never sleep past the deadline
            time.sleep(max(0.0, min(2.0, deadline - time.monotonic())))
        self.logger.error(f"等待RabbitMQ服务启动超时 ({timeout}秒)")
        return False

    def get_logs(self, tail: int = 100) -> str:
        """Return the last ``tail`` log lines of the container, with timestamps.

        Returns:
            The decoded log text, or an empty string when it cannot be read.
        """
        try:
            container = self.client.containers.get(self.container_name)
            logs = container.logs(tail=tail, timestamps=True)
            # One stray non-UTF-8 byte must not discard the whole log
            return logs.decode('utf-8', errors='replace')
        except Exception as e:
            self.logger.error(f"获取容器日志失败: {e}")
            return ""
# Usage example: start the container and print how to reach it
if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )
    print("=== RabbitMQ Docker管理示例 ===")
    manager = RabbitMQDockerManager()
    try:
        print("\n1. 启动RabbitMQ容器")
        if not manager.start_container():
            print("❌ 容器启动失败")
        else:
            print("✅ 容器启动成功")
            # Show what Docker reports for the container
            container_info = manager.get_container_status()
            if container_info:
                print(f"容器状态: {container_info['status']}")
                print(f"端口映射: {container_info['ports']}")
            print(
                "\n管理界面: http://localhost:15672",
                "用户名: admin",
                "密码: admin123",
                sep="\n",
            )
    except KeyboardInterrupt:
        # Ctrl+C: shut the container down before exiting
        print("\n收到中断信号,正在停止容器...")
        manager.stop_container()
    except Exception as e:
        print(f"❌ 错误: {e}")
2.1.2 Linux系统安装
#!/bin/bash
# install_rabbitmq.sh - RabbitMQ installation script
# Abort on the first command that exits non-zero
set -o errexit
# ANSI colour escape sequences, stored unexpanded
RED="\033[0;31m"
GREEN="\033[0;32m"
YELLOW="\033[1;33m"
NC="\033[0m" # reset (no colour)
# 日志函数
# Print an informational message to stdout, prefixed with a green [INFO] tag.
#   $1 - message text; printed verbatim (backslashes are not interpreted)
log_info() {
    # %b expands the colour escapes only; %s keeps the message literal
    printf '%b[INFO]%b %s\n' "${GREEN:-}" "${NC:-}" "${1:-}"
}
# Print a warning to stdout, prefixed with a yellow [WARN] tag.
#   $1 - message text; printed verbatim (backslashes are not interpreted)
log_warn() {
    # %b expands the colour escapes only; %s keeps the message literal
    printf '%b[WARN]%b %s\n' "${YELLOW:-}" "${NC:-}" "${1:-}"
}
# Print an error to stderr, prefixed with a red [ERROR] tag.
#   $1 - message text; printed verbatim (backslashes are not interpreted)
log_error() {
    # Errors go to stderr so they survive stdout redirection or capture
    printf '%b[ERROR]%b %s\n' "${RED:-}" "${NC:-}" "${1:-}" >&2
}
# 检测操作系统
# Read /etc/os-release and publish the distribution name and version in the
# globals OS and VER. Exits with status 1 when the file is missing.
detect_os() {
    if [ ! -f /etc/os-release ]; then
        log_error "无法检测操作系统"
        exit 1
    fi

    # Sourcing the file defines NAME and VERSION_ID in this shell
    . /etc/os-release
    OS=$NAME
    VER=$VERSION_ID

    log_info "检测到操作系统: $OS $VER"
}
# Ubuntu/Debian安装
# Install Erlang and RabbitMQ from the RabbitMQ team's apt repositories.
# Handles both Ubuntu and Debian hosts: the repository tree is chosen from
# the ID field of /etc/os-release.
install_ubuntu() {
    local keyring=/usr/share/keyrings/com.rabbitmq.team.gpg
    local repo_base=https://ppa1.novemberain.com/rabbitmq
    local list_file=/etc/apt/sources.list.d/rabbitmq.list
    local distro codename

    log_info "开始在Ubuntu/Debian上安装RabbitMQ"

    sudo apt-get update
    # lsb-release is absent from minimal images but lsb_release is used below
    sudo apt-get install -y curl gnupg apt-transport-https lsb-release

    # The repository keeps separate trees for Debian and Ubuntu; pointing a
    # Debian host at the Ubuntu tree would look up a codename it does not have.
    distro=$(. /etc/os-release && echo "${ID:-ubuntu}")
    if [ "$distro" != "debian" ]; then
        distro=ubuntu
    fi
    codename=$(lsb_release -cs)

    # Import the RabbitMQ team signing key
    curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" \
        | sudo gpg --dearmor \
        | sudo tee "$keyring" > /dev/null

    # Register the Erlang and RabbitMQ server repositories in a single write
    # (printf repeats its format for each group of five arguments)
    printf 'deb [signed-by=%s] %s/%s/deb/%s %s main\n' \
        "$keyring" "$repo_base" rabbitmq-erlang "$distro" "$codename" \
        "$keyring" "$repo_base" rabbitmq-server "$distro" "$codename" \
        | sudo tee "$list_file"

    sudo apt-get update

    # Erlang packages first, then the broker itself
    sudo apt-get install -y \
        erlang-base erlang-asn1 erlang-crypto erlang-eldap erlang-ftp \
        erlang-inets erlang-mnesia erlang-os-mon erlang-parsetools \
        erlang-public-key erlang-runtime-tools erlang-snmp erlang-ssl \
        erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
    sudo apt-get install -y rabbitmq-server

    log_info "RabbitMQ安装完成"
}
# CentOS/RHEL安装
# Install Erlang and RabbitMQ on CentOS/RHEL with yum.
install_centos() {
    local pkg

    log_info "开始在CentOS/RHEL上安装RabbitMQ"

    # EPEL first, then the RabbitMQ repository setup script
    sudo yum install -y epel-release
    curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash

    # Erlang has to be installed before the broker
    for pkg in erlang rabbitmq-server; do
        sudo yum install -y "$pkg"
    done

    log_info "RabbitMQ安装完成"
}
# 配置RabbitMQ
# Enable the management plugin, start the service and create the admin user.
# Environment overrides (the defaults are the tutorial's demo credentials):
#   RABBITMQ_ADMIN_USER - administrator name     (default: admin)
#   RABBITMQ_ADMIN_PASS - administrator password (default: admin123)
configure_rabbitmq() {
    local admin_user="${RABBITMQ_ADMIN_USER:-admin}"
    local admin_pass="${RABBITMQ_ADMIN_PASS:-admin123}"

    log_info "配置RabbitMQ"

    sudo rabbitmq-plugins enable rabbitmq_management

    sudo systemctl enable rabbitmq-server
    sudo systemctl start rabbitmq-server

    # Wait until the node has actually booted; a fixed sleep is too short on
    # slow hosts and wasted time on fast ones.
    sudo rabbitmqctl await_startup

    # add_user fails when the account already exists, which would abort a
    # re-run under `set -e`; fall back to resetting the password instead.
    sudo rabbitmqctl add_user "$admin_user" "$admin_pass" \
        || sudo rabbitmqctl change_password "$admin_user" "$admin_pass"
    sudo rabbitmqctl set_user_tags "$admin_user" administrator
    sudo rabbitmqctl set_permissions -p / "$admin_user" ".*" ".*" ".*"

    # Remove the default guest account (optional; ignore it when already gone)
    sudo rabbitmqctl delete_user guest || true

    log_info "RabbitMQ配置完成"
    log_info "管理界面: http://localhost:15672"
    log_info "用户名: $admin_user"
    log_info "密码: $admin_pass"
}
# 主函数
# Entry point: detect the distribution, run the matching installer, then
# apply the post-install configuration.
main() {
    log_info "开始安装RabbitMQ"
    detect_os

    # OS holds the NAME field of /etc/os-release; match on its prefix
    if [[ $OS == Ubuntu* || $OS == Debian* ]]; then
        install_ubuntu
    elif [[ $OS == CentOS* || $OS == "Red Hat"* ]]; then
        install_centos
    else
        log_error "不支持的操作系统: $OS"
        exit 1
    fi

    configure_rabbitmq
    log_info "RabbitMQ安装和配置完成!"
}
# Run the installer, forwarding the script's arguments
main "$@"
2.2 配置管理
2.2.1 配置文件管理
import os
import json
import yaml
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, asdict
from pathlib import Path
@dataclass
class RabbitMQServerConfig:
    """Values for the main RabbitMQ configuration file (``rabbitmq.conf``)."""

    # --- Listeners ---
    listeners_tcp_default: int = 5672
    listeners_ssl_default: Optional[int] = None

    # --- Management plugin ---
    management_tcp_port: int = 15672
    management_ssl_port: Optional[int] = None

    # --- Cluster formation ---
    cluster_formation_peer_discovery_backend: str = "rabbit_peer_discovery_classic_config"
    cluster_formation_classic_config_nodes: List[str] = None

    # --- Memory ---
    vm_memory_high_watermark: float = 0.4
    vm_memory_high_watermark_paging_ratio: float = 0.5

    # --- Disk ---
    disk_free_limit_absolute: str = "2GB"
    disk_free_limit_relative: float = 1.0

    # --- Logging ---
    log_levels_default: str = "info"
    log_levels_connection: str = "info"
    log_levels_channel: str = "info"

    # --- Queues ---
    queue_master_locator: str = "min-masters"

    # --- Miscellaneous ---
    heartbeat: int = 60
    frame_max: int = 131072
    channel_max: int = 2047

    def __post_init__(self):
        # None stands in for "no nodes given"; each instance gets its own
        # list so instances never share one mutable default.
        if self.cluster_formation_classic_config_nodes is not None:
            return
        self.cluster_formation_classic_config_nodes = []
@dataclass
class RabbitMQAdvancedConfig:
    """Advanced RabbitMQ settings: authentication, SSL, plugins and definitions."""

    # Authentication
    auth_mechanisms: List[str] = None
    auth_backends: List[str] = None
    # SSL
    ssl_options: Dict[str, Any] = None
    # Plugins
    enabled_plugins: List[str] = None
    # Policies
    policies: List[Dict[str, Any]] = None
    # Users
    users: List[Dict[str, Any]] = None
    # Virtual hosts
    vhosts: List[Dict[str, Any]] = None

    def __post_init__(self):
        # Every field left at None receives a fresh default container, so
        # instances never share mutable state.
        fallbacks = (
            ('auth_mechanisms', lambda: ['PLAIN', 'AMQPLAIN']),
            ('auth_backends', lambda: ['rabbit_auth_backend_internal']),
            ('ssl_options', dict),
            ('enabled_plugins', lambda: ['rabbitmq_management']),
            ('policies', list),
            ('users', list),
            ('vhosts', list),
        )
        for attr, make_default in fallbacks:
            if getattr(self, attr) is None:
                setattr(self, attr, make_default())
class RabbitMQConfigManager:
    """Render, save and load the RabbitMQ configuration files of one directory.

    Files handled: ``rabbitmq.conf``, ``advanced.config``,
    ``enabled_plugins`` and ``definitions.json``.
    """

    # Keys whose string values are Erlang atoms rather than strings
    _ATOM_VALUED_KEYS = frozenset(
        {'auth_mechanisms', 'auth_backends', 'verify', 'versions'}
    )

    def __init__(self, config_dir: str = "/etc/rabbitmq"):
        """Bind the manager to ``config_dir``; nothing is read or written yet."""
        # Function-scope import: this listing's import block does not
        # import logging.
        import logging

        self.config_dir = Path(config_dir)
        self.config_file = self.config_dir / "rabbitmq.conf"
        self.advanced_config_file = self.config_dir / "advanced.config"
        self.enabled_plugins_file = self.config_dir / "enabled_plugins"
        self.definitions_file = self.config_dir / "definitions.json"
        self.logger = logging.getLogger(__name__)

    def generate_config(self, config: "RabbitMQServerConfig") -> str:
        """Render ``config`` as the text of ``rabbitmq.conf``.

        Optional SSL ports and the cluster section are emitted only when
        set. Returns the file content without a trailing newline.
        """
        lines = [
            "# RabbitMQ Configuration File",
            "# Generated by RabbitMQConfigManager",
            "",
            "# Network Configuration",
            f"listeners.tcp.default = {config.listeners_tcp_default}",
        ]
        if config.listeners_ssl_default:
            lines.append(f"listeners.ssl.default = {config.listeners_ssl_default}")
        lines.append("")

        lines.append("# Management Plugin Configuration")
        lines.append(f"management.tcp.port = {config.management_tcp_port}")
        if config.management_ssl_port:
            lines.append(f"management.ssl.port = {config.management_ssl_port}")
        lines.append("")

        if config.cluster_formation_classic_config_nodes:
            lines.append("# Cluster Configuration")
            lines.append(
                "cluster_formation.peer_discovery_backend = "
                f"{config.cluster_formation_peer_discovery_backend}"
            )
            # Each node needs its own numbered key; a comma-joined value under
            # nodes.1 would be read as one (invalid) node name.
            for index, node in enumerate(config.cluster_formation_classic_config_nodes, start=1):
                node_name = node if '@' in node else f"rabbit@{node}"
                lines.append(f"cluster_formation.classic_config.nodes.{index} = {node_name}")
            lines.append("")

        lines += [
            "# Memory Configuration",
            f"vm_memory_high_watermark.relative = {config.vm_memory_high_watermark}",
            f"vm_memory_high_watermark_paging_ratio = {config.vm_memory_high_watermark_paging_ratio}",
            "",
            "# Disk Configuration",
            f"disk_free_limit.absolute = {config.disk_free_limit_absolute}",
            "",
            "# Logging Configuration",
            f"log.console.level = {config.log_levels_default}",
            f"log.connection.level = {config.log_levels_connection}",
            f"log.channel.level = {config.log_levels_channel}",
            "",
            "# Other Configuration",
            f"heartbeat = {config.heartbeat}",
            f"frame_max = {config.frame_max}",
            f"channel_max = {config.channel_max}",
            f"queue_master_locator = {config.queue_master_locator}",
        ]
        return "\n".join(lines)

    def generate_advanced_config(self, config: "RabbitMQAdvancedConfig") -> str:
        """Render the ``rabbit`` application section of ``advanced.config``."""
        advanced_config = {
            'rabbit': {
                'auth_mechanisms': config.auth_mechanisms,
                'auth_backends': config.auth_backends
            }
        }
        if config.ssl_options:
            advanced_config['rabbit']['ssl_options'] = config.ssl_options
        return self._dict_to_erlang_config(advanced_config)

    def generate_enabled_plugins(self, plugins: List[str]) -> str:
        """Render the ``enabled_plugins`` file: one Erlang list of plugin names."""
        plugins_str = ",".join(plugins)
        return f"[{plugins_str}].\n"

    def generate_definitions(self, config: "RabbitMQAdvancedConfig") -> str:
        """Render ``definitions.json`` from the users, vhosts and policies of ``config``."""
        definitions = {
            'rabbit_version': '3.12.0',
            'rabbitmq_version': '3.12.0',
            'product_name': 'RabbitMQ',
            'product_version': '3.12.0',
            'users': config.users,
            'vhosts': config.vhosts,
            'permissions': [],
            'topic_permissions': [],
            'parameters': [],
            'global_parameters': [],
            'policies': config.policies,
            'queues': [],
            'exchanges': [],
            'bindings': []
        }
        return json.dumps(definitions, indent=2)

    def save_config(self, config: "RabbitMQServerConfig",
                    advanced_config: Optional["RabbitMQAdvancedConfig"] = None):
        """Write the configuration files, creating the directory when needed.

        ``rabbitmq.conf`` is always written; the other three files only when
        ``advanced_config`` is given.

        Raises:
            Exception: any rendering or I/O error is logged and re-raised.
        """
        try:
            self.config_dir.mkdir(parents=True, exist_ok=True)

            with open(self.config_file, 'w', encoding='utf-8') as f:
                f.write(self.generate_config(config))
            self.logger.info(f"主配置文件已保存: {self.config_file}")

            if advanced_config:
                with open(self.advanced_config_file, 'w', encoding='utf-8') as f:
                    f.write(self.generate_advanced_config(advanced_config))
                self.logger.info(f"高级配置文件已保存: {self.advanced_config_file}")

                with open(self.enabled_plugins_file, 'w', encoding='utf-8') as f:
                    f.write(self.generate_enabled_plugins(advanced_config.enabled_plugins))
                self.logger.info(f"插件配置文件已保存: {self.enabled_plugins_file}")

                with open(self.definitions_file, 'w', encoding='utf-8') as f:
                    f.write(self.generate_definitions(advanced_config))
                self.logger.info(f"定义文件已保存: {self.definitions_file}")
        except Exception as e:
            self.logger.error(f"保存配置文件失败: {e}")
            raise

    def load_config(self) -> Optional[Dict[str, Any]]:
        """Read whichever configuration files exist.

        Returns:
            A dict with a subset of the keys ``main_config``,
            ``advanced_config``, ``enabled_plugins`` (raw text) and
            ``definitions`` (parsed JSON); None when reading or parsing failed.
        """
        try:
            config_data = {}
            text_files = (
                ('main_config', self.config_file),
                ('advanced_config', self.advanced_config_file),
                ('enabled_plugins', self.enabled_plugins_file),
            )
            for key, path in text_files:
                if path.exists():
                    with open(path, 'r', encoding='utf-8') as f:
                        config_data[key] = f.read()

            if self.definitions_file.exists():
                with open(self.definitions_file, 'r', encoding='utf-8') as f:
                    config_data['definitions'] = json.load(f)
            return config_data
        except Exception as e:
            self.logger.error(f"加载配置文件失败: {e}")
            return None

    def _dict_to_erlang_config(self, config_dict: Dict[str, Any]) -> str:
        """Convert ``{app: {key: value}}`` into one Erlang term for ``advanced.config``.

        The result is a single list of ``{App, [{Key, Value}, ...]}`` tuples
        terminated by a dot. Nested dicts become property lists; strings
        under ``_ATOM_VALUED_KEYS`` are written as atoms, all other strings
        as double-quoted Erlang strings.
        """
        def atom(text: str) -> str:
            # Bare atoms start with a lowercase ASCII letter and contain only
            # ASCII alphanumerics, '_' or '@'; anything else must be quoted.
            bare = (
                text[:1].isascii() and text[:1].islower()
                and all(ch.isascii() and (ch.isalnum() or ch in '_@') for ch in text)
            )
            if bare:
                return text
            escaped = text.replace('\\', '\\\\').replace("'", "\\'")
            return f"'{escaped}'"

        def term(value, as_atom: bool = False) -> str:
            # bool is tested before the numeric fallback (bool is an int subclass)
            if isinstance(value, bool):
                return 'true' if value else 'false'
            if value is None:
                return 'undefined'
            if isinstance(value, str):
                if as_atom:
                    return atom(value)
                escaped = value.replace('\\', '\\\\').replace('"', '\\"')
                return f'"{escaped}"'
            if isinstance(value, (list, tuple)):
                return "[" + ", ".join(term(item, as_atom) for item in value) + "]"
            if isinstance(value, dict):
                return "[" + ", ".join(pair(k, v) for k, v in value.items()) + "]"
            return str(value)

        def pair(key, value) -> str:
            return f"{{{atom(str(key))}, {term(value, key in self._ATOM_VALUED_KEYS)}}}"

        app_terms = []
        for app, settings in config_dict.items():
            # Joining with ",\n" avoids a trailing comma, which is a syntax
            # error in an Erlang list.
            entries = ",\n".join(f"    {pair(key, value)}" for key, value in settings.items())
            app_terms.append(f"  {{{atom(str(app))}, [\n{entries}\n  ]}}")
        return "[\n" + ",\n".join(app_terms) + "\n]."
# Usage example: render the configuration files into ./rabbitmq_config
if __name__ == "__main__":
    # Script-local imports: this listing's import block does not import
    # logging, and the hash helper needs base64/hashlib.
    import base64
    import hashlib
    import logging
    import os

    def rabbit_password_hash(password: str) -> str:
        """Hash ``password`` the way RabbitMQ's SHA-256 scheme stores it.

        Layout: base64(salt + sha256(salt + utf8(password))) with a random
        32-bit salt. A plain-text value in ``password_hash`` would never
        match at login.
        """
        salt = os.urandom(4)
        digest = hashlib.sha256(salt + password.encode('utf-8')).digest()
        return base64.b64encode(salt + digest).decode('ascii')

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    print("=== RabbitMQ配置管理示例 ===")

    # Write into a local directory instead of /etc/rabbitmq
    config_manager = RabbitMQConfigManager("./rabbitmq_config")

    # Main server settings
    server_config = RabbitMQServerConfig(
        listeners_tcp_default=5672,
        management_tcp_port=15672,
        vm_memory_high_watermark=0.4,
        disk_free_limit_absolute="2GB",
        heartbeat=60
    )

    # Advanced settings: plugins, users, vhosts and policies
    advanced_config = RabbitMQAdvancedConfig(
        enabled_plugins=['rabbitmq_management', 'rabbitmq_prometheus'],
        users=[
            {
                'name': 'admin',
                # Demo password; the definitions file must carry its hash
                'password_hash': rabbit_password_hash('admin123'),
                'hashing_algorithm': 'rabbit_password_hashing_sha256',
                'tags': 'administrator'
            }
        ],
        vhosts=[
            {'name': '/'},
            {'name': '/test'}
        ],
        policies=[
            {
                # NOTE(review): ha-mode is classic queue mirroring, which is
                # deprecated in RabbitMQ 3.x; confirm it is still wanted here.
                'vhost': '/',
                'name': 'ha-all',
                'pattern': '.*',
                'apply-to': 'queues',
                'definition': {
                    'ha-mode': 'all',
                    'ha-sync-mode': 'automatic'
                },
                'priority': 0
            }
        ]
    )

    try:
        # Save
        print("\n1. 保存配置文件")
        config_manager.save_config(server_config, advanced_config)
        print("✅ 配置文件保存成功")

        # Load back what was written
        print("\n2. 加载配置文件")
        loaded_config = config_manager.load_config()
        if loaded_config:
            print("✅ 配置文件加载成功")
            print(f"配置文件数量: {len(loaded_config)}")
        else:
            print("❌ 配置文件加载失败")
    except Exception as e:
        print(f"❌ 错误: {e}")
2.2.2 环境变量配置
import logging
import os
from dataclasses import asdict, dataclass
from typing import Dict, Any, Optional
@dataclass
class RabbitMQEnvironment:
    """RabbitMQ environment-variable settings.

    Each field name is used verbatim as an environment-variable name when
    an instance is passed through ``asdict`` by RabbitMQEnvironmentManager.
    """
    # Basic node settings
    # NOTE(review): both RABBITMQ_NODE_NAME and RABBITMQ_NODENAME are declared;
    # the validator in this file requires the former. RabbitMQ's documented
    # variable appears to be RABBITMQ_NODENAME - confirm which one is intended.
    RABBITMQ_NODE_NAME: str = "rabbit@localhost"
    RABBITMQ_NODENAME: str = "rabbit@localhost"
    RABBITMQ_NODE_PORT: int = 5672
    RABBITMQ_DIST_PORT: int = 25672
    # Data directories
    RABBITMQ_MNESIA_BASE: str = "/var/lib/rabbitmq/mnesia"
    RABBITMQ_LOG_BASE: str = "/var/log/rabbitmq"
    RABBITMQ_PLUGINS_DIR: str = "/usr/lib/rabbitmq/plugins"
    # Configuration files
    RABBITMQ_CONFIG_FILE: str = "/etc/rabbitmq/rabbitmq"
    RABBITMQ_ADVANCED_CONFIG_FILE: str = "/etc/rabbitmq/advanced.config"
    RABBITMQ_ENABLED_PLUGINS_FILE: str = "/etc/rabbitmq/enabled_plugins"
    # Default user
    RABBITMQ_DEFAULT_USER: str = "guest"
    RABBITMQ_DEFAULT_PASS: str = "guest"
    RABBITMQ_DEFAULT_VHOST: str = "/"
    # Clustering (empty strings mean "not set")
    RABBITMQ_USE_LONGNAME: bool = False
    RABBITMQ_CLUSTER_NODES: str = ""
    RABBITMQ_ERLANG_COOKIE: str = ""
    # Memory and disk limits
    RABBITMQ_VM_MEMORY_HIGH_WATERMARK: float = 0.4
    RABBITMQ_DISK_FREE_LIMIT: str = "2GB"
    # SSL (empty strings mean "not set")
    RABBITMQ_SSL_CERTFILE: str = ""
    RABBITMQ_SSL_KEYFILE: str = ""
    RABBITMQ_SSL_CACERTFILE: str = ""
    # Management plugin
    RABBITMQ_MANAGEMENT_SSL_PORT: int = 15671
    RABBITMQ_MANAGEMENT_SSL_CERTFILE: str = ""
    RABBITMQ_MANAGEMENT_SSL_KEYFILE: str = ""
    RABBITMQ_MANAGEMENT_SSL_CACERTFILE: str = ""
class RabbitMQEnvironmentManager:
    """Apply, export, import and validate ``RABBITMQ_*`` environment variables."""

    # Substrings marking variables whose values must never reach the log
    _SECRET_MARKERS = ('PASS', 'COOKIE', 'SECRET', 'TOKEN')

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _render(value) -> Optional[str]:
        """Return the text to export for ``value``, or None to skip it.

        None and empty strings are skipped. Booleans become ``true`` or
        ``false`` rather than Python's ``True``/``False``.
        """
        if value is None or value == "":
            return None
        if isinstance(value, bool):
            return 'true' if value else 'false'
        return str(value)

    def _loggable(self, key: str, text: str) -> str:
        """Return ``text``, or a mask when ``key`` names a secret."""
        if any(marker in key.upper() for marker in self._SECRET_MARKERS):
            return '***'
        return text

    def set_environment(self, env_config: "RabbitMQEnvironment"):
        """Copy every non-empty field of ``env_config`` into ``os.environ``.

        Raises:
            Exception: any failure is logged and re-raised.
        """
        try:
            for key, value in asdict(env_config).items():
                text = self._render(value)
                if text is None:
                    continue
                os.environ[key] = text
                self.logger.debug(f"设置环境变量: {key}={self._loggable(key, text)}")
            self.logger.info("RabbitMQ环境变量设置完成")
        except Exception as e:
            self.logger.error(f"设置环境变量失败: {e}")
            raise

    def get_environment(self) -> Dict[str, str]:
        """Return all current environment variables whose name starts with ``RABBITMQ_``."""
        return {
            key: value
            for key, value in os.environ.items()
            if key.startswith('RABBITMQ_')
        }

    def generate_env_file(self, env_config: "RabbitMQEnvironment", file_path: str = ".env"):
        """Write the non-empty fields of ``env_config`` as ``KEY=value`` lines.

        Raises:
            Exception: any failure is logged and re-raised.
        """
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write("# RabbitMQ Environment Variables\n")
                f.write("# Generated by RabbitMQEnvironmentManager\n\n")
                for key, value in asdict(env_config).items():
                    text = self._render(value)
                    if text is not None:
                        f.write(f"{key}={text}\n")
            self.logger.info(f"环境变量文件已生成: {file_path}")
        except Exception as e:
            self.logger.error(f"生成环境变量文件失败: {e}")
            raise

    def load_env_file(self, file_path: str = ".env") -> bool:
        """Load ``KEY=value`` lines from ``file_path`` into ``os.environ``.

        Blank lines, ``#`` comments and lines without ``=`` are ignored. A
        leading ``export``, surrounding whitespace and one pair of matching
        quotes around the value are stripped.

        Returns:
            True when the file was read, False when it is missing or unreadable.
        """
        try:
            if not os.path.exists(file_path):
                self.logger.warning(f"环境变量文件不存在: {file_path}")
                return False
            with open(file_path, 'r', encoding='utf-8') as f:
                for raw_line in f:
                    line = raw_line.strip()
                    if not line or line.startswith('#') or '=' not in line:
                        continue
                    if line.startswith('export '):
                        line = line[len('export '):].lstrip()
                    key, value = line.split('=', 1)
                    key, value = key.strip(), value.strip()
                    if not key:
                        # os.environ rejects an empty name; skip the line
                        # instead of aborting the whole load.
                        self.logger.warning("忽略缺少变量名的行")
                        continue
                    if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                        value = value[1:-1]
                    os.environ[key] = value
                    self.logger.debug(f"加载环境变量: {key}={self._loggable(key, value)}")
            self.logger.info(f"环境变量文件加载完成: {file_path}")
            return True
        except Exception as e:
            self.logger.error(f"加载环境变量文件失败: {e}")
            return False

    def validate_environment(self) -> Dict[str, Any]:
        """Check the RabbitMQ variables currently present in ``os.environ``.

        Returns:
            A dict with ``valid`` (bool), ``errors`` and ``warnings`` (lists
            of messages). Any error makes ``valid`` False.
        """
        validation_result = {
            'valid': True,
            'errors': [],
            'warnings': []
        }

        def fail(message: str) -> None:
            validation_result['errors'].append(message)
            validation_result['valid'] = False

        # Variables that must be present and non-empty
        required_vars = [
            'RABBITMQ_NODE_NAME',
            'RABBITMQ_MNESIA_BASE',
            'RABBITMQ_LOG_BASE'
        ]
        for var in required_vars:
            if not os.environ.get(var):
                fail(f"缺少必需的环境变量: {var}")

        # Port: must be a number in 1-65535; below 1024 is only a warning
        node_port = os.environ.get('RABBITMQ_NODE_PORT')
        if node_port:
            try:
                port = int(node_port)
            except ValueError:
                fail(f"无效的端口号: {node_port}")
            else:
                if not 1 <= port <= 65535:
                    fail(f"无效的端口号: {node_port}")
                elif port < 1024:
                    validation_result['warnings'].append(f"端口号可能不合适: {port}")

        # Memory watermark: must be a number; outside 0.1-1.0 is a warning
        memory_watermark = os.environ.get('RABBITMQ_VM_MEMORY_HIGH_WATERMARK')
        if memory_watermark:
            try:
                watermark = float(memory_watermark)
            except ValueError:
                fail(f"无效的内存水位: {memory_watermark}")
            else:
                if watermark < 0.1 or watermark > 1.0:
                    validation_result['warnings'].append(f"内存水位可能不合适: {watermark}")

        return validation_result
# Usage example: export, persist and validate a RabbitMQ environment
if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )
    print("=== RabbitMQ环境变量管理示例 ===")

    env_manager = RabbitMQEnvironmentManager()

    # Only the overridden fields are listed; the rest keep their defaults
    env_config = RabbitMQEnvironment(
        RABBITMQ_NODE_NAME="rabbit@myhost",
        RABBITMQ_NODE_PORT=5672,
        RABBITMQ_DEFAULT_USER="admin",
        RABBITMQ_DEFAULT_PASS="admin123",
        RABBITMQ_VM_MEMORY_HIGH_WATERMARK=0.4,
        RABBITMQ_DISK_FREE_LIMIT="2GB"
    )

    try:
        # Step 1: push the values into this process's environment
        print("\n1. 设置环境变量")
        env_manager.set_environment(env_config)
        print("✅ 环境变量设置成功")

        # Step 2: persist them as an env file
        print("\n2. 生成环境变量文件")
        env_manager.generate_env_file(env_config, "rabbitmq.env")
        print("✅ 环境变量文件生成成功")

        # Step 3: read back what is currently set
        print("\n3. 获取RabbitMQ环境变量")
        active_vars = env_manager.get_environment()
        print(f"当前RabbitMQ环境变量数量: {len(active_vars)}")

        # Step 4: validate and report
        print("\n4. 验证环境变量配置")
        report = env_manager.validate_environment()
        print("✅ 环境变量配置有效" if report['valid'] else "❌ 环境变量配置无效")
        # The error list is empty whenever the report is valid
        for error in report['errors']:
            print(f"  错误: {error}")
        for warning in report['warnings']:
            print(f"  警告: {warning}")
    except Exception as e:
        print(f"❌ 错误: {e}")
2.3 章节总结
2.3.1 核心知识点
安装方式
- Docker容器化部署(推荐)
- Linux系统原生安装
- 自动化安装脚本
配置管理
- 主配置文件(rabbitmq.conf)
- 高级配置文件(advanced.config)
- 插件配置和定义文件
环境变量
- 系统级环境变量配置
- 容器环境变量管理
- 配置验证和优化
2.3.2 最佳实践
部署建议
- 优先使用Docker部署
- 配置持久化存储
- 设置合理的资源限制
配置管理
- 版本化配置文件
- 环境隔离配置
- 配置验证机制
安全配置
- 修改默认用户密码
- 启用SSL/TLS
- 配置防火墙规则
2.3.3 练习题
基础练习
- 使用Docker部署RabbitMQ
- 配置管理界面访问
- 创建自定义用户和虚拟主机
进阶练习
- 编写自动化安装脚本
- 实现配置文件生成器
- 设计环境变量管理系统
实战练习
- 部署RabbitMQ集群
- 配置SSL/TLS加密
- 实现配置热更新机制
通过本章的学习,我们掌握了RabbitMQ的安装部署和配置管理方法。这些技能为后续学习RabbitMQ的核心功能和高级特性提供了坚实的基础。