6.1 Performance Optimization Strategies
6.1.1 Connection and Channel Optimization
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Callable
import pika
import threading
import time
import logging
from concurrent.futures import ThreadPoolExecutor
import queue
import weakref
class ConnectionPoolStrategy(Enum):
    """Connection pool strategy"""
    FIXED = "fixed"        # Fixed size
    DYNAMIC = "dynamic"    # Dynamically adjusted
    ELASTIC = "elastic"    # Elastic scaling

@dataclass
class ConnectionPoolConfig:
    """Connection pool configuration"""
    strategy: ConnectionPoolStrategy = ConnectionPoolStrategy.DYNAMIC
    min_connections: int = 2
    max_connections: int = 10
    initial_connections: int = 3
    connection_timeout: int = 30      # seconds
    idle_timeout: int = 300           # seconds
    max_retries: int = 3
    retry_delay: int = 1000
    health_check_interval: int = 60   # seconds
    enable_heartbeat: bool = True
    heartbeat_interval: int = 600     # seconds
class OptimizedConnection:
    """Optimized connection object"""
    def __init__(self, connection_params: pika.ConnectionParameters, pool_id: str):
        self.connection_params = connection_params
        self.pool_id = pool_id
        self.connection = None
        self.channels = {}
        self.created_at = time.time()
        self.last_used = time.time()
        self.is_healthy = True
        # pika's BlockingConnection is not thread-safe, so access to the
        # connection and its channels is serialized through this lock
        self.lock = threading.Lock()
        self.logger = logging.getLogger(__name__)
        # Establish the connection
        self._establish_connection()

    def _establish_connection(self):
        """Establish the connection"""
        try:
            self.connection = pika.BlockingConnection(self.connection_params)
            self.is_healthy = True
            self.logger.info(f"✅ Connection established: {self.pool_id}")
        except Exception as e:
            self.is_healthy = False
            self.logger.error(f"❌ Failed to establish connection: {e}")
            raise

    def get_channel(self, channel_id: Optional[str] = None) -> pika.channel.Channel:
        """Get a channel, reconnecting and creating the channel if necessary"""
        with self.lock:
            if not self.is_healthy or not self.connection or self.connection.is_closed:
                self._establish_connection()
            if channel_id is None:
                channel_id = f"ch_{int(time.time() * 1000000)}"
            if channel_id not in self.channels or self.channels[channel_id].is_closed:
                self.channels[channel_id] = self.connection.channel()
                self.logger.debug(f"📡 Created new channel: {channel_id}")
            self.last_used = time.time()
            return self.channels[channel_id]

    def close_channel(self, channel_id: str):
        """Close a channel"""
        with self.lock:
            if channel_id in self.channels:
                try:
                    if not self.channels[channel_id].is_closed:
                        self.channels[channel_id].close()
                    del self.channels[channel_id]
                    self.logger.debug(f"🔌 Channel closed: {channel_id}")
                except Exception as e:
                    self.logger.error(f"❌ Failed to close channel: {e}")

    def health_check(self) -> bool:
        """Health check"""
        with self.lock:
            try:
                if not self.connection or self.connection.is_closed:
                    self.is_healthy = False
                    return False
                # Lightweight heartbeat: let pika process any pending frames
                self.connection.process_data_events(time_limit=1)
                self.is_healthy = True
                return True
            except Exception as e:
                self.logger.error(f"❌ Health check failed: {e}")
                self.is_healthy = False
                return False

    def is_idle(self, idle_timeout: int) -> bool:
        """Check whether the connection has been idle longer than idle_timeout"""
        return time.time() - self.last_used > idle_timeout

    def close(self):
        """Close the connection and all of its channels"""
        with self.lock:
            # Close all channels inline rather than via close_channel,
            # which would try to re-acquire the non-reentrant lock
            for channel_id in list(self.channels.keys()):
                try:
                    if not self.channels[channel_id].is_closed:
                        self.channels[channel_id].close()
                except Exception as e:
                    self.logger.error(f"❌ Failed to close channel: {e}")
                del self.channels[channel_id]
            # Close the connection itself
            if self.connection and not self.connection.is_closed:
                try:
                    self.connection.close()
                    self.logger.info(f"🔌 Connection closed: {self.pool_id}")
                except Exception as e:
                    self.logger.error(f"❌ Failed to close connection: {e}")
            self.is_healthy = False
class ConnectionPool:
    """Connection pool"""
    def __init__(self, connection_params: pika.ConnectionParameters,
                 config: ConnectionPoolConfig = None):
        self.connection_params = connection_params
        self.config = config or ConnectionPoolConfig()
        self.connections: Dict[str, OptimizedConnection] = {}
        self.available_connections = queue.Queue()
        self.lock = threading.Lock()
        self.logger = logging.getLogger(__name__)
        self.stats = {
            'total_connections': 0,
            'active_connections': 0,
            'idle_connections': 0,
            'failed_connections': 0,
            'connection_requests': 0,
            'connection_hits': 0
        }
        # Initialize the pool
        self._initialize_pool()
        # Start the health-check thread
        self._start_health_check_thread()
    def _initialize_pool(self):
        """Initialize the connection pool"""
        for i in range(self.config.initial_connections):
            try:
                conn_id = f"conn_{i+1}"
                connection = OptimizedConnection(self.connection_params, conn_id)
                with self.lock:
                    self.connections[conn_id] = connection
                    self.available_connections.put(conn_id)
                    self.stats['total_connections'] += 1
                self.logger.info(f"✅ Initialized connection: {conn_id}")
            except Exception as e:
                self.logger.error(f"❌ Failed to initialize connection: {e}")

    def _start_health_check_thread(self):
        """Start the health-check thread"""
        def health_check_worker():
            while True:
                try:
                    self._perform_health_check()
                    time.sleep(self.config.health_check_interval)
                except Exception as e:
                    self.logger.error(f"Health check error: {e}")
        health_thread = threading.Thread(target=health_check_worker, daemon=True)
        health_thread.start()

    def _perform_health_check(self):
        """Run one health-check pass over all connections"""
        with self.lock:
            unhealthy_connections = []
            for conn_id, connection in self.connections.items():
                if not connection.health_check():
                    unhealthy_connections.append(conn_id)
                elif connection.is_idle(self.config.idle_timeout):
                    # Reap idle connections, but never shrink below the minimum
                    if len(self.connections) > self.config.min_connections:
                        unhealthy_connections.append(conn_id)
            # Remove unhealthy (or surplus idle) connections
            for conn_id in unhealthy_connections:
                self._remove_connection(conn_id)
            # Refresh statistics
            self._update_stats()

    def _remove_connection(self, conn_id: str):
        """Remove a connection (caller must hold self.lock)"""
        if conn_id in self.connections:
            try:
                self.connections[conn_id].close()
                del self.connections[conn_id]
                # Drop the id from the availability queue by rebuilding it
                temp_queue = queue.Queue()
                while not self.available_connections.empty():
                    available_id = self.available_connections.get()
                    if available_id != conn_id:
                        temp_queue.put(available_id)
                self.available_connections = temp_queue
                self.stats['total_connections'] -= 1
                self.logger.info(f"🗑️ Removed connection: {conn_id}")
            except Exception as e:
                self.logger.error(f"❌ Failed to remove connection: {e}")

    def _create_new_connection(self) -> Optional[str]:
        """Create a new connection (caller must hold self.lock)"""
        if len(self.connections) >= self.config.max_connections:
            return None
        try:
            conn_id = f"conn_{int(time.time() * 1000000)}"
            connection = OptimizedConnection(self.connection_params, conn_id)
            self.connections[conn_id] = connection
            self.stats['total_connections'] += 1
            self.logger.info(f"✅ Created new connection: {conn_id}")
            return conn_id
        except Exception as e:
            self.stats['failed_connections'] += 1
            self.logger.error(f"❌ Failed to create connection: {e}")
            return None
    def get_connection(self, timeout: int = 30) -> Optional[OptimizedConnection]:
        """Get a connection from the pool"""
        with self.lock:
            self.stats['connection_requests'] += 1
        # Try to take an id from the availability queue
        try:
            conn_id = self.available_connections.get(timeout=timeout)
            with self.lock:
                if conn_id in self.connections and self.connections[conn_id].is_healthy:
                    self.stats['connection_hits'] += 1
                    return self.connections[conn_id]
                else:
                    # The connection is gone or unhealthy; try to create a new one
                    new_conn_id = self._create_new_connection()
                    if new_conn_id:
                        return self.connections[new_conn_id]
        except queue.Empty:
            # No available connection; try to create a new one
            with self.lock:
                new_conn_id = self._create_new_connection()
                if new_conn_id:
                    return self.connections[new_conn_id]
        return None

    def return_connection(self, connection: OptimizedConnection):
        """Return a connection to the pool"""
        if connection.pool_id in self.connections and connection.is_healthy:
            self.available_connections.put(connection.pool_id)

    def _update_stats(self):
        """Refresh the active/idle counters (caller must hold self.lock)"""
        active_count = 0
        idle_count = 0
        for connection in self.connections.values():
            if connection.is_healthy:
                if connection.is_idle(self.config.idle_timeout):
                    idle_count += 1
                else:
                    active_count += 1
        self.stats['active_connections'] = active_count
        self.stats['idle_connections'] = idle_count

    def get_stats(self) -> Dict[str, Any]:
        """Get pool statistics"""
        with self.lock:
            self._update_stats()
            return {
                **self.stats,
                'pool_size': len(self.connections),
                'available_connections': self.available_connections.qsize(),
                'hit_rate': (self.stats['connection_hits'] / max(self.stats['connection_requests'], 1)) * 100
            }

    def close_all(self):
        """Close all connections"""
        with self.lock:
            for connection in self.connections.values():
                connection.close()
            self.connections.clear()
            # Drain the availability queue
            while not self.available_connections.empty():
                self.available_connections.get()
        self.logger.info("🔌 All connections closed")
6.1.2 Message Batching Optimization
class BatchMode(Enum):
    """Batching mode"""
    SIZE_BASED = "size"      # Flush when the batch reaches a size
    TIME_BASED = "time"      # Flush when the batch reaches an age
    HYBRID = "hybrid"        # Flush on whichever comes first
    ADAPTIVE = "adaptive"    # Adjust the size threshold to load

@dataclass
class BatchConfig:
    """Batching configuration"""
    mode: BatchMode = BatchMode.HYBRID
    batch_size: int = 100
    batch_timeout: int = 1000          # milliseconds
    max_batch_size: int = 1000
    min_batch_size: int = 10
    adaptive_threshold: float = 0.8    # Threshold for the adaptive mode
    compression_enabled: bool = True
    compression_algorithm: str = "gzip"  # gzip, lz4, snappy
class MessageBatch:
    """A batch of messages"""
    def __init__(self, batch_id: str):
        self.batch_id = batch_id
        self.messages = []
        self.created_at = time.time()
        self.total_size = 0
        self.lock = threading.Lock()

    def add_message(self, exchange: str, routing_key: str, body: bytes,
                    properties: pika.BasicProperties = None) -> bool:
        """Add a message to the batch"""
        with self.lock:
            message = {
                'exchange': exchange,
                'routing_key': routing_key,
                'body': body,
                'properties': properties,
                'timestamp': time.time()
            }
            self.messages.append(message)
            self.total_size += len(body)
            return True

    def get_messages(self) -> List[Dict[str, Any]]:
        """Get a copy of all messages in the batch"""
        with self.lock:
            return self.messages.copy()

    def size(self) -> int:
        """Number of messages in the batch"""
        with self.lock:
            return len(self.messages)

    def age(self) -> float:
        """Age of the batch in seconds"""
        return time.time() - self.created_at

    def clear(self):
        """Empty the batch"""
        with self.lock:
            self.messages.clear()
            self.total_size = 0
class BatchProcessor:
    """Batch processor"""
    def __init__(self, connection_pool: ConnectionPool, config: BatchConfig = None):
        self.connection_pool = connection_pool
        self.config = config or BatchConfig()
        self.current_batch = None
        self.batch_queue = queue.Queue()
        self.lock = threading.Lock()
        self.logger = logging.getLogger(__name__)
        self.stats = {
            'total_batches': 0,
            'total_messages': 0,
            'avg_batch_size': 0,
            'processing_time': 0,
            'compression_ratio': 0
        }
        # Start the batch-processing worker thread
        self._start_batch_processing_thread()

    def _start_batch_processing_thread(self):
        """Start the batch-processing worker thread"""
        def batch_worker():
            while True:
                try:
                    # Check whether the current batch is ready to flush
                    self._check_and_process_current_batch()
                    # Process batches waiting in the queue
                    try:
                        batch = self.batch_queue.get(timeout=0.1)
                        self._process_batch(batch)
                        self.batch_queue.task_done()
                    except queue.Empty:
                        pass
                except Exception as e:
                    self.logger.error(f"Batch worker error: {e}")
                    time.sleep(0.1)
        batch_thread = threading.Thread(target=batch_worker, daemon=True)
        batch_thread.start()
    def _check_and_process_current_batch(self):
        """Decide whether the current batch should be flushed"""
        with self.lock:
            if self.current_batch is None:
                return
            should_process = False
            if self.config.mode == BatchMode.SIZE_BASED:
                should_process = self.current_batch.size() >= self.config.batch_size
            elif self.config.mode == BatchMode.TIME_BASED:
                should_process = self.current_batch.age() >= (self.config.batch_timeout / 1000)
            elif self.config.mode == BatchMode.HYBRID:
                should_process = (self.current_batch.size() >= self.config.batch_size or
                                  self.current_batch.age() >= (self.config.batch_timeout / 1000))
            elif self.config.mode == BatchMode.ADAPTIVE:
                # Adaptive mode: grow the size threshold with system load
                load_factor = self._calculate_load_factor()
                adaptive_size = int(self.config.batch_size * (1 + load_factor))
                adaptive_size = min(adaptive_size, self.config.max_batch_size)
                should_process = (self.current_batch.size() >= adaptive_size or
                                  self.current_batch.age() >= (self.config.batch_timeout / 1000))
            if should_process and self.current_batch.size() > 0:
                self.batch_queue.put(self.current_batch)
                self.current_batch = None

    def _calculate_load_factor(self) -> float:
        """Compute a crude load factor"""
        # Simplified load estimate; a real system would also consider
        # CPU, memory, and network pressure
        queue_size = self.batch_queue.qsize()
        max_queue_size = 100  # assumed maximum queue size
        return min(queue_size / max_queue_size, 1.0)

    def add_message(self, exchange: str, routing_key: str, body: bytes,
                    properties: pika.BasicProperties = None) -> bool:
        """Add a message to the batcher"""
        with self.lock:
            if self.current_batch is None:
                batch_id = f"batch_{int(time.time() * 1000000)}"
                self.current_batch = MessageBatch(batch_id)
            return self.current_batch.add_message(exchange, routing_key, body, properties)
    def _compress_batch(self, messages: List[Dict[str, Any]]) -> Optional[bytes]:
        """Compress the batch payload"""
        if not self.config.compression_enabled:
            return None
        try:
            import json
            import gzip
            # Serialize the messages (assumes UTF-8 text bodies; binary
            # payloads would need a different encoding such as base64)
            serialized_data = json.dumps([
                {
                    'exchange': msg['exchange'],
                    'routing_key': msg['routing_key'],
                    'body': msg['body'].decode('utf-8', errors='ignore'),
                    'properties': {
                        'delivery_mode': msg['properties'].delivery_mode if msg['properties'] else None,
                        'message_id': msg['properties'].message_id if msg['properties'] else None,
                        'timestamp': msg['properties'].timestamp if msg['properties'] else None
                    }
                }
                for msg in messages
            ], ensure_ascii=False).encode('utf-8')
            # Compress the data
            if self.config.compression_algorithm == "gzip":
                compressed_data = gzip.compress(serialized_data)
            elif self.config.compression_algorithm == "lz4":
                import lz4.frame  # imported lazily so gzip users don't need lz4 installed
                compressed_data = lz4.frame.compress(serialized_data)
            else:
                # Unsupported algorithms (e.g. snappy) fall back to no compression
                compressed_data = serialized_data
            # Record the compression ratio
            compression_ratio = len(compressed_data) / len(serialized_data)
            self.stats['compression_ratio'] = compression_ratio
            return compressed_data
        except Exception as e:
            self.logger.error(f"Failed to compress batch data: {e}")
            return None
    def _process_batch(self, batch: MessageBatch):
        """Publish one batch"""
        start_time = time.time()
        try:
            messages = batch.get_messages()
            if not messages:
                return
            # Get a connection
            connection = self.connection_pool.get_connection()
            if not connection:
                self.logger.error("Could not get a connection to process the batch")
                return
            try:
                # Get a channel
                channel = connection.get_channel()
                # Enable publisher confirms; on a BlockingChannel each
                # basic_publish below then blocks until the broker confirms
                # (raising on nack/unroutable), so no separate wait is needed
                channel.confirm_delivery()
                # Publish the messages one by one
                successful_count = 0
                failed_count = 0
                for message in messages:
                    try:
                        channel.basic_publish(
                            exchange=message['exchange'],
                            routing_key=message['routing_key'],
                            body=message['body'],
                            properties=message['properties'],
                            mandatory=True
                        )
                        successful_count += 1
                    except Exception as e:
                        failed_count += 1
                        self.logger.error(f"Failed to publish batch message: {e}")
                # Update statistics
                with self.lock:
                    self.stats['total_batches'] += 1
                    self.stats['total_messages'] += len(messages)
                    self.stats['avg_batch_size'] = (self.stats['total_messages'] /
                                                    max(self.stats['total_batches'], 1))
                    processing_time = time.time() - start_time
                    self.stats['processing_time'] = processing_time
                self.logger.info(f"✅ Batch processed: {batch.batch_id}, "
                                 f"messages: {len(messages)}, "
                                 f"succeeded: {successful_count}, "
                                 f"failed: {failed_count}, "
                                 f"took: {processing_time:.3f}s")
            finally:
                # Return the connection to the pool
                self.connection_pool.return_connection(connection)
        except Exception as e:
            self.logger.error(f"❌ Batch processing failed: {e}")
        finally:
            # Clear the batch
            batch.clear()

    def flush(self):
        """Force the current batch to be processed"""
        with self.lock:
            if self.current_batch and self.current_batch.size() > 0:
                self.batch_queue.put(self.current_batch)
                self.current_batch = None

    def get_stats(self) -> Dict[str, Any]:
        """Get batching statistics"""
        with self.lock:
            return {
                **self.stats,
                'pending_batches': self.batch_queue.qsize(),
                'current_batch_size': self.current_batch.size() if self.current_batch else 0
            }
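To see the batcher end to end, here is a short hedged sketch that reuses the pool from the previous sketch. The orders exchange and routing key are illustrative assumptions. Note that the worker thread starts in the BatchProcessor constructor, so only flush() is needed to push out a final partial batch.

# Hedged usage sketch; 'orders' exchange/routing key are illustrative,
# and `pool` is the ConnectionPool created in the earlier sketch
processor = BatchProcessor(pool, BatchConfig(mode=BatchMode.HYBRID,
                                             batch_size=50,
                                             batch_timeout=500))
for i in range(200):
    processor.add_message(
        exchange='orders',
        routing_key='orders.created',
        body=f'{{"order_id": {i}}}'.encode('utf-8'),
        properties=pika.BasicProperties(delivery_mode=2)  # persistent
    )
processor.flush()   # push the partially filled last batch through
time.sleep(2)       # give the background worker time to publish
print(processor.get_stats())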
6.1.3 Memory and Cache Optimization
import weakref
from collections import OrderedDict
import psutil
import gc

class CacheStrategy(Enum):
    """Cache eviction strategy"""
    LRU = "lru"            # Least recently used
    LFU = "lfu"            # Least frequently used
    TTL = "ttl"            # Time-to-live based
    ADAPTIVE = "adaptive"  # Adaptive

@dataclass
class CacheConfig:
    """Cache configuration"""
    strategy: CacheStrategy = CacheStrategy.LRU
    max_size: int = 10000
    ttl_seconds: int = 3600
    cleanup_interval: int = 300
    memory_threshold: float = 0.8  # System memory usage threshold
    enable_compression: bool = True
    enable_weak_references: bool = True
class MemoryOptimizedCache:
    """Memory-optimized cache"""
    def __init__(self, config: CacheConfig = None):
        self.config = config or CacheConfig()
        self.cache = OrderedDict()
        self.access_count = {}
        self.access_time = {}
        self.lock = threading.Lock()
        self.logger = logging.getLogger(__name__)
        self.stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
            'memory_usage': 0
        }
        # Start the cleanup thread
        self._start_cleanup_thread()

    def _start_cleanup_thread(self):
        """Start the background cleanup thread"""
        def cleanup_worker():
            while True:
                try:
                    self._cleanup_expired_entries()
                    self._check_memory_usage()
                    time.sleep(self.config.cleanup_interval)
                except Exception as e:
                    self.logger.error(f"Cache cleanup error: {e}")
        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        cleanup_thread.start()
    def _cleanup_expired_entries(self):
        """Evict expired entries (TTL strategy only)"""
        if self.config.strategy != CacheStrategy.TTL:
            return
        current_time = time.time()
        expired_keys = []
        with self.lock:
            for key, access_time in self.access_time.items():
                if current_time - access_time > self.config.ttl_seconds:
                    expired_keys.append(key)
            for key in expired_keys:
                self._remove_entry(key)
        if expired_keys:
            self.logger.info(f"Evicted expired cache entries: {len(expired_keys)}")

    def _check_memory_usage(self):
        """Check system memory pressure"""
        try:
            memory_percent = psutil.virtual_memory().percent / 100
            if memory_percent > self.config.memory_threshold:
                # Memory pressure is high: force a cleanup
                self._force_cleanup()
                # Trigger garbage collection
                gc.collect()
                self.logger.warning(f"High memory usage ({memory_percent:.1%}); forced a cache cleanup")
            self.stats['memory_usage'] = memory_percent
        except Exception as e:
            self.logger.error(f"Failed to check memory usage: {e}")

    def _force_cleanup(self):
        """Forcefully evict half of the cache"""
        with self.lock:
            cleanup_count = len(self.cache) // 2
            if self.config.strategy == CacheStrategy.LRU:
                # Evict the least recently used entries
                for _ in range(cleanup_count):
                    if self.cache:
                        key = next(iter(self.cache))
                        self._remove_entry(key)
            elif self.config.strategy == CacheStrategy.LFU:
                # Evict the least frequently used entries
                sorted_keys = sorted(self.access_count.keys(),
                                     key=lambda k: self.access_count[k])
                for key in sorted_keys[:cleanup_count]:
                    self._remove_entry(key)
            self.stats['evictions'] += cleanup_count

    def _remove_entry(self, key: str):
        """Remove a cache entry (caller must hold self.lock)"""
        if key in self.cache:
            del self.cache[key]
        if key in self.access_count:
            del self.access_count[key]
        if key in self.access_time:
            del self.access_time[key]
    def _compress_value(self, value: Any) -> Any:
        """Compress a value (pickle + gzip); returns the value unchanged on failure"""
        if not self.config.enable_compression:
            return value
        try:
            import pickle
            import gzip
            pickled_data = pickle.dumps(value)
            compressed_data = gzip.compress(pickled_data)
            return compressed_data
        except Exception as e:
            self.logger.error(f"Failed to compress value: {e}")
            return value

    def _decompress_value(self, compressed_value: Any) -> Any:
        """Decompress a value; returns the input unchanged on failure"""
        if not self.config.enable_compression:
            return compressed_value
        try:
            import pickle
            import gzip
            decompressed_data = gzip.decompress(compressed_value)
            value = pickle.loads(decompressed_data)
            return value
        except Exception as e:
            self.logger.error(f"Failed to decompress value: {e}")
            return compressed_value
    def get(self, key: str) -> Optional[Any]:
        """Look up a cached value"""
        with self.lock:
            if key in self.cache:
                # Update access bookkeeping
                self.access_count[key] = self.access_count.get(key, 0) + 1
                self.access_time[key] = time.time()
                # LRU strategy: move the entry to the end
                if self.config.strategy == CacheStrategy.LRU:
                    self.cache.move_to_end(key)
                self.stats['hits'] += 1
                # Decompress and return the value
                compressed_value = self.cache[key]
                return self._decompress_value(compressed_value)
            else:
                self.stats['misses'] += 1
                return None

    def put(self, key: str, value: Any):
        """Store a value in the cache"""
        with self.lock:
            # Enforce the size limit
            if len(self.cache) >= self.config.max_size and key not in self.cache:
                self._evict_entry()
            # Compress and store the value
            compressed_value = self._compress_value(value)
            self.cache[key] = compressed_value
            # Update access bookkeeping
            self.access_count[key] = self.access_count.get(key, 0) + 1
            self.access_time[key] = time.time()
            # LRU strategy: move the entry to the end
            if self.config.strategy == CacheStrategy.LRU:
                self.cache.move_to_end(key)

    def _evict_entry(self):
        """Evict one entry per the configured strategy (caller must hold self.lock)"""
        if not self.cache:
            return
        if self.config.strategy == CacheStrategy.LRU:
            # Evict the least recently used entry
            key = next(iter(self.cache))
            self._remove_entry(key)
        elif self.config.strategy == CacheStrategy.LFU:
            # Evict the least frequently used entry
            min_key = min(self.access_count.keys(),
                          key=lambda k: self.access_count[k])
            self._remove_entry(min_key)
        elif self.config.strategy == CacheStrategy.TTL:
            # Evict the oldest entry
            oldest_key = min(self.access_time.keys(),
                             key=lambda k: self.access_time[k])
            self._remove_entry(oldest_key)
        self.stats['evictions'] += 1

    def clear(self):
        """Empty the cache"""
        with self.lock:
            self.cache.clear()
            self.access_count.clear()
            self.access_time.clear()

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        with self.lock:
            total_requests = self.stats['hits'] + self.stats['misses']
            hit_rate = (self.stats['hits'] / max(total_requests, 1)) * 100
            return {
                **self.stats,
                'cache_size': len(self.cache),
                'hit_rate': hit_rate,
                'total_requests': total_requests
            }
class MessageCache:
    """Message cache manager"""
    def __init__(self, config: CacheConfig = None):
        self.config = config or CacheConfig()
        self.message_cache = MemoryOptimizedCache(self.config)
        self.routing_cache = MemoryOptimizedCache(self.config)
        self.metadata_cache = MemoryOptimizedCache(self.config)
        self.logger = logging.getLogger(__name__)

    def cache_message(self, message_id: str, message_data: Dict[str, Any]):
        """Cache a message"""
        self.message_cache.put(message_id, message_data)

    def get_cached_message(self, message_id: str) -> Optional[Dict[str, Any]]:
        """Get a cached message"""
        return self.message_cache.get(message_id)

    def cache_routing_result(self, routing_key: str, queue_list: List[str]):
        """Cache a routing result"""
        self.routing_cache.put(routing_key, queue_list)

    def get_cached_routing(self, routing_key: str) -> Optional[List[str]]:
        """Get a cached routing result"""
        return self.routing_cache.get(routing_key)

    def cache_metadata(self, key: str, metadata: Dict[str, Any]):
        """Cache metadata"""
        self.metadata_cache.put(key, metadata)

    def get_cached_metadata(self, key: str) -> Optional[Dict[str, Any]]:
        """Get cached metadata"""
        return self.metadata_cache.get(key)

    def get_overall_stats(self) -> Dict[str, Any]:
        """Get statistics across all caches"""
        return {
            'message_cache': self.message_cache.get_stats(),
            'routing_cache': self.routing_cache.get_stats(),
            'metadata_cache': self.metadata_cache.get_stats()
        }

    def clear_all_caches(self):
        """Clear all caches"""
        self.message_cache.clear()
        self.routing_cache.clear()
        self.metadata_cache.clear()
        self.logger.info("All caches cleared")
6.4 Performance Monitoring and Metrics Collection
6.4.1 Monitoring Metric Definitions
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Callable
import time
import threading
import json
from collections import defaultdict, deque

class MetricType(Enum):
    """Metric type"""
    COUNTER = "counter"      # Monotonically increasing counter
    GAUGE = "gauge"          # Point-in-time value
    HISTOGRAM = "histogram"  # Distribution of values
    SUMMARY = "summary"      # Summary statistics

class AlertLevel(Enum):
    """Alert level"""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

@dataclass
class MetricValue:
    """A single metric sample"""
    name: str
    value: float
    timestamp: float
    labels: Dict[str, str]
    metric_type: MetricType

@dataclass
class AlertRule:
    """Alert rule"""
    name: str
    metric_name: str
    condition: str   # Comparison operator only (">", "<", ">=", "<=", "=="), applied against threshold
    threshold: float
    level: AlertLevel
    duration: int    # Intended hold time in seconds (not enforced by the simple monitor below)
    enabled: bool = True

@dataclass
class Alert:
    """A fired alert"""
    rule_name: str
    metric_name: str
    current_value: float
    threshold: float
    level: AlertLevel
    message: str
    timestamp: float
    resolved: bool = False
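Note that condition holds only the comparison operator; the right-hand side comes from threshold. A quick illustrative rule (an assumption for demonstration, not one of the defaults registered in 6.4.3):

# Illustrative only: fires whenever the sampled value of
# "publish_latency_ms" is greater than 250
latency_rule = AlertRule(
    name="slow_publish",
    metric_name="publish_latency_ms",
    condition=">",      # operator only; the right-hand side is `threshold`
    threshold=250.0,
    level=AlertLevel.WARNING,
    duration=60
)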
6.4.2 Performance Monitoring Manager
class PerformanceMonitor:
    """Performance monitoring manager"""
    def __init__(self):
        self.metrics: Dict[str, List[MetricValue]] = defaultdict(list)
        self.alert_rules: Dict[str, AlertRule] = {}
        self.active_alerts: Dict[str, Alert] = {}
        self.alert_callbacks: List[Callable[[Alert], None]] = []
        self.lock = threading.Lock()
        self.max_metric_history = 1000

    def record_metric(self, name: str, value: float,
                      labels: Optional[Dict[str, str]] = None,
                      metric_type: MetricType = MetricType.GAUGE):
        """Record a metric sample"""
        with self.lock:
            metric = MetricValue(
                name=name,
                value=value,
                timestamp=time.time(),
                labels=labels or {},
                metric_type=metric_type
            )
            self.metrics[name].append(metric)
            # Cap the per-metric history
            if len(self.metrics[name]) > self.max_metric_history:
                self.metrics[name] = self.metrics[name][-self.max_metric_history:]
            # Evaluate alert rules against the new sample
            self._check_alert_rules(name, value)

    def add_alert_rule(self, rule: AlertRule):
        """Add an alert rule"""
        self.alert_rules[rule.name] = rule

    def remove_alert_rule(self, rule_name: str):
        """Remove an alert rule"""
        if rule_name in self.alert_rules:
            del self.alert_rules[rule_name]

    def add_alert_callback(self, callback: Callable[[Alert], None]):
        """Register an alert callback"""
        self.alert_callbacks.append(callback)
    def _check_alert_rules(self, metric_name: str, value: float):
        """Evaluate all rules that watch this metric"""
        for rule in self.alert_rules.values():
            if not rule.enabled or rule.metric_name != metric_name:
                continue
            triggered = self._evaluate_condition(value, rule.condition, rule.threshold)
            if triggered:
                self._trigger_alert(rule, value)
            else:
                self._resolve_alert(rule.name)

    def _evaluate_condition(self, value: float, condition: str, threshold: float) -> bool:
        """Evaluate a single comparison"""
        if condition == ">":
            return value > threshold
        elif condition == "<":
            return value < threshold
        elif condition == ">=":
            return value >= threshold
        elif condition == "<=":
            return value <= threshold
        elif condition == "==":
            return abs(value - threshold) < 0.001
        return False

    def _trigger_alert(self, rule: AlertRule, current_value: float):
        """Fire an alert (at most once per rule until it resolves)"""
        if rule.name not in self.active_alerts:
            alert = Alert(
                rule_name=rule.name,
                metric_name=rule.metric_name,
                current_value=current_value,
                threshold=rule.threshold,
                level=rule.level,
                message=f"Metric {rule.metric_name} {rule.condition} {rule.threshold}, current: {current_value}",
                timestamp=time.time()
            )
            self.active_alerts[rule.name] = alert
            # Run the alert callbacks
            for callback in self.alert_callbacks:
                try:
                    callback(alert)
                except Exception as e:
                    print(f"Alert callback error: {e}")

    def _resolve_alert(self, rule_name: str):
        """Resolve an alert once its condition clears"""
        if rule_name in self.active_alerts:
            alert = self.active_alerts[rule_name]
            alert.resolved = True
            del self.active_alerts[rule_name]
    def get_metric_history(self, name: str, limit: int = 100) -> List[MetricValue]:
        """Get the most recent samples of a metric"""
        with self.lock:
            return self.metrics.get(name, [])[-limit:]

    def get_current_metrics(self) -> Dict[str, float]:
        """Get the latest value of every metric"""
        with self.lock:
            current = {}
            for name, values in self.metrics.items():
                if values:
                    current[name] = values[-1].value
            return current

    def get_active_alerts(self) -> List[Alert]:
        """Get the currently active alerts"""
        return list(self.active_alerts.values())

    def export_metrics(self, format_type: str = "json") -> str:
        """Export metrics"""
        if format_type == "json":
            return self._export_json()
        elif format_type == "prometheus":
            return self._export_prometheus()
        else:
            raise ValueError(f"Unsupported format: {format_type}")

    def _export_json(self) -> str:
        """Export in JSON format"""
        data = {
            "timestamp": time.time(),
            "metrics": self.get_current_metrics(),
            "alerts": [{
                "rule_name": alert.rule_name,
                "metric_name": alert.metric_name,
                "current_value": alert.current_value,
                "threshold": alert.threshold,
                "level": alert.level.value,
                "message": alert.message,
                "timestamp": alert.timestamp
            } for alert in self.get_active_alerts()]
        }
        return json.dumps(data, indent=2)

    def _export_prometheus(self) -> str:
        """Export in (simplified) Prometheus exposition format"""
        lines = []
        current_metrics = self.get_current_metrics()
        for name, value in current_metrics.items():
            # Simplified: every metric is exposed as a gauge
            lines.append(f"# TYPE {name} gauge")
            lines.append(f"{name} {value}")
        return "\n".join(lines)
6.4.3 RabbitMQ Performance Monitoring Integration
class RabbitMQPerformanceMonitor:
    """RabbitMQ performance monitoring integration"""
    def __init__(self, connection_pool, management_api):
        self.connection_pool = connection_pool
        self.management_api = management_api
        self.monitor = PerformanceMonitor()
        self.monitoring = False
        self.monitor_thread = None
        self.monitor_interval = 30  # collection interval in seconds
        # Register the default alert rules
        self._setup_default_alerts()

    def _setup_default_alerts(self):
        """Register the default alert rules"""
        default_rules = [
            AlertRule(
                name="high_memory_usage",
                metric_name="memory_usage_percent",
                condition=">",
                threshold=85.0,
                level=AlertLevel.WARNING,
                duration=300
            ),
            AlertRule(
                name="critical_memory_usage",
                metric_name="memory_usage_percent",
                condition=">",
                threshold=95.0,
                level=AlertLevel.CRITICAL,
                duration=60
            ),
            AlertRule(
                name="high_queue_length",
                metric_name="max_queue_length",
                condition=">",
                threshold=10000,
                level=AlertLevel.WARNING,
                duration=300
            ),
            AlertRule(
                name="low_connection_success_rate",
                metric_name="connection_success_rate",
                condition="<",
                threshold=0.95,
                level=AlertLevel.ERROR,
                duration=180
            )
        ]
        for rule in default_rules:
            self.monitor.add_alert_rule(rule)
    def start_monitoring(self):
        """Start monitoring"""
        if self.monitoring:
            return
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        print("RabbitMQ performance monitoring started")

    def stop_monitoring(self):
        """Stop monitoring"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5)
        print("RabbitMQ performance monitoring stopped")

    def _monitor_loop(self):
        """Monitoring loop"""
        while self.monitoring:
            try:
                self._collect_metrics()
                time.sleep(self.monitor_interval)
            except Exception as e:
                print(f"Monitoring error: {e}")
                time.sleep(self.monitor_interval)

    def _collect_metrics(self):
        """Collect metrics"""
        # Connection pool metrics
        pool_stats = self.connection_pool.get_stats()
        self.monitor.record_metric("active_connections", pool_stats["active_connections"])
        self.monitor.record_metric("idle_connections", pool_stats["idle_connections"])
        # hit_rate is a percentage (0-100); the default alert rule expects a ratio (0-1)
        self.monitor.record_metric("connection_success_rate", pool_stats["hit_rate"] / 100)
        # RabbitMQ server metrics via the management API
        try:
            overview = self.management_api.get_overview()
            if overview:
                # Memory usage
                memory_used = overview.get("memory", {}).get("used", 0)
                memory_limit = overview.get("memory", {}).get("limit", 1)
                memory_percent = (memory_used / memory_limit) * 100 if memory_limit > 0 else 0
                self.monitor.record_metric("memory_usage_percent", memory_percent)
                # Message statistics
                message_stats = overview.get("message_stats", {})
                self.monitor.record_metric("messages_published",
                                           message_stats.get("publish", 0),
                                           metric_type=MetricType.COUNTER)
                self.monitor.record_metric("messages_delivered",
                                           message_stats.get("deliver_get", 0),
                                           metric_type=MetricType.COUNTER)
            # Queue metrics
            queues = self.management_api.get_queues()
            if queues:
                total_messages = sum(q.get("messages", 0) for q in queues)
                max_queue_length = max((q.get("messages", 0) for q in queues), default=0)
                self.monitor.record_metric("total_queue_messages", total_messages)
                self.monitor.record_metric("max_queue_length", max_queue_length)
                self.monitor.record_metric("queue_count", len(queues))
        except Exception as e:
            print(f"Failed to collect RabbitMQ metrics: {e}")
    def get_performance_report(self) -> Dict:
        """Build a performance report"""
        current_metrics = self.monitor.get_current_metrics()
        active_alerts = self.monitor.get_active_alerts()
        return {
            "timestamp": time.time(),
            "metrics": current_metrics,
            "alerts": [{
                "rule_name": alert.rule_name,
                "level": alert.level.value,
                "message": alert.message,
                "timestamp": alert.timestamp
            } for alert in active_alerts],
            "health_status": self._calculate_health_status(current_metrics, active_alerts)
        }

    def _calculate_health_status(self, metrics: Dict[str, float], alerts: List[Alert]) -> str:
        """Derive an overall health status from the active alerts"""
        if any(alert.level == AlertLevel.CRITICAL for alert in alerts):
            return "critical"
        elif any(alert.level == AlertLevel.ERROR for alert in alerts):
            return "error"
        elif any(alert.level == AlertLevel.WARNING for alert in alerts):
            return "warning"
        else:
            return "healthy"
6.5 Performance Optimization and Monitoring Examples
6.5.1 Complete Performance Optimization Example
def performance_optimization_example():
    """End-to-end performance optimization example"""
    # 1. Configure the connection pool
    pool_config = ConnectionPoolConfig(
        strategy=ConnectionPoolStrategy.DYNAMIC,
        min_connections=3,
        max_connections=15,
        initial_connections=5,
        connection_timeout=30,
        idle_timeout=300,
        health_check_interval=60
    )
    # 2. Create the connection pool
    connection_params = pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=pika.PlainCredentials('guest', 'guest'),
        heartbeat=600,
        blocked_connection_timeout=300
    )
    pool = ConnectionPool(connection_params, pool_config)
    # 3. Configure batching (fields match BatchConfig above)
    batch_config = BatchConfig(
        mode=BatchMode.HYBRID,
        batch_size=100,
        batch_timeout=1000,  # milliseconds
        compression_enabled=True,
        compression_algorithm="gzip"
    )
    batch_processor = BatchProcessor(pool, batch_config)
    # 4. Configure the cache (fields match CacheConfig above)
    cache_config = CacheConfig(
        strategy=CacheStrategy.LRU,
        max_size=10000,
        ttl_seconds=3600,
        memory_threshold=0.8,
        enable_compression=True,
        enable_weak_references=True
    )
    message_cache = MessageCache(cache_config)
    # 5. Create the management API client
    #    (RabbitMQManagementConfig/RabbitMQManagementAPI are assumed to be
    #    defined in the management-API section earlier in this chapter)
    management_config = RabbitMQManagementConfig(
        host="localhost",
        port=15672,
        username="guest",
        password="guest"
    )
    management_api = RabbitMQManagementAPI(management_config)
    # 6. Create the performance monitor
    perf_monitor = RabbitMQPerformanceMonitor(pool, management_api)
    # 7. Register an alert callback
    def alert_handler(alert: Alert):
        print(f"🚨 Alert fired: {alert.message}")
        if alert.level == AlertLevel.CRITICAL:
            print(f"💥 Critical alert: {alert.rule_name}")
            # Hook up email/SMS/etc. notifications here
    perf_monitor.monitor.add_alert_callback(alert_handler)
    try:
        # Start monitoring (the batch processor's worker thread is already
        # running; it starts in the BatchProcessor constructor)
        perf_monitor.start_monitoring()
        print("🚀 Performance optimization stack is up")
        # Simulate sending messages
        for i in range(1000):
            message = {
                "id": i,
                "content": f"Test message {i}",
                "timestamp": time.time()
            }
            # Send through the batch processor
            batch_processor.add_message(
                exchange="test_exchange",
                routing_key="test.key",
                body=json.dumps(message).encode('utf-8'),
                properties=pika.BasicProperties(
                    delivery_mode=2,  # persistent
                    timestamp=int(time.time())
                )
            )
            # Cache the message
            message_cache.cache_message(f"msg_{i}", message)
            if i % 100 == 0:
                print(f"📊 Sent {i} messages")
        # Fetch a performance report
        report = perf_monitor.get_performance_report()
        print(f"📈 Health status: {report['health_status']}")
        print(f"📊 Active connections: {report['metrics'].get('active_connections', 0)}")
        print(f"📊 Queued messages: {report['metrics'].get('total_queue_messages', 0)}")
        # Flush the last partial batch and let the worker drain the queue
        batch_processor.flush()
        time.sleep(10)
        # Final statistics
        batch_stats = batch_processor.get_stats()
        cache_stats = message_cache.get_overall_stats()
        pool_stats = pool.get_stats()
        print("\n📊 Final statistics report:")
        print(f"Batching stats: {json.dumps(batch_stats, indent=2)}")
        print(f"Cache stats: {json.dumps(cache_stats, indent=2)}")
        print(f"Connection pool stats: {json.dumps(pool_stats, indent=2)}")
        # Export monitoring metrics
        metrics_json = perf_monitor.monitor.export_metrics("json")
        print(f"\n📊 Metrics (JSON):\n{metrics_json}")
        metrics_prometheus = perf_monitor.monitor.export_metrics("prometheus")
        print(f"\n📊 Metrics (Prometheus):\n{metrics_prometheus}")
    except Exception as e:
        print(f"❌ Performance optimization example failed: {e}")
    finally:
        # Clean up resources
        perf_monitor.stop_monitoring()
        pool.close_all()
        print("🔄 Cleanup complete")

if __name__ == "__main__":
    performance_optimization_example()
6.6 Chapter Summary
6.6.1 Core Knowledge Points
Connection and channel optimization
- Connection pool management strategies (fixed, dynamic, elastic)
- Channel reuse and lifecycle management
- Health checks and failure recovery
Message batching optimization
- Batching modes (size-based, time-based, hybrid)
- Message compression and serialization optimization
- Bulk publishing and statistics tracking
Memory and cache optimization
- Cache eviction strategies (LRU, LFU, TTL)
- Memory threshold management and compression
- Weak references and garbage collection tuning
Performance monitoring and metrics collection
- Metric types and data structures
- Alert rules and callback mechanisms
- Metric export (JSON, Prometheus)
6.6.2 Best Practices
Connection management
- Use a connection pool to avoid repeatedly creating and destroying connections
- Set connection parameters and timeouts sensibly
- Implement health checks and automatic recovery
Performance optimization
- Enable message batching to raise throughput
- Use caching to avoid repeated computation
- Tune memory limits and compression settings appropriately
Monitoring and alerting
- Monitor and alert on the key metrics
- Build a multi-level alerting scheme
- Export and analyze performance data regularly
Resource management
- Release unused resources promptly
- Watch memory usage
- Implement a graceful shutdown sequence
6.6.3 Exercises
Basic exercises
- Implement a simple connection pool manager
- Build a message batch processor
- Design a cache eviction strategy
Advanced exercises
- Integrate a performance monitoring system
- Implement adaptive batch sizing
- Develop an alert notification mechanism
Hands-on exercises
- Build a complete performance optimization solution
- Design a monitoring dashboard
- Run performance benchmarks
Having worked through this chapter, you should have a solid grasp of RabbitMQ performance optimization strategies and monitoring techniques, and be able to build high-performance, observable message queue systems. These skills are essential for deploying and operating RabbitMQ in production.