性能优化策略

1. 连接池管理

高效的连接池管理是MCP协议性能优化的关键。通过复用连接、控制并发数量和智能负载均衡,可以显著提升系统性能。

import asyncio
import time
from typing import Dict, List, Optional, Any, Callable, Set
from dataclasses import dataclass, field
from enum import Enum
from abc import ABC, abstractmethod
import weakref
from collections import defaultdict, deque
import threading
import psutil
import json
from datetime import datetime, timedelta

class ConnectionState(Enum):
    """连接状态"""
    IDLE = "idle"
    ACTIVE = "active"
    CONNECTING = "connecting"
    DISCONNECTED = "disconnected"
    ERROR = "error"

@dataclass
class ConnectionMetrics:
    """连接指标"""
    created_at: datetime = field(default_factory=datetime.now)
    last_used: datetime = field(default_factory=datetime.now)
    total_requests: int = 0
    total_errors: int = 0
    total_bytes_sent: int = 0
    total_bytes_received: int = 0
    average_response_time: float = 0.0
    peak_response_time: float = 0.0
    
    def update_request_metrics(self, response_time: float, bytes_sent: int, bytes_received: int, error: bool = False):
        """更新请求指标"""
        self.last_used = datetime.now()
        self.total_requests += 1
        self.total_bytes_sent += bytes_sent
        self.total_bytes_received += bytes_received
        
        if error:
            self.total_errors += 1
        
        # 更新响应时间统计
        if self.total_requests == 1:
            self.average_response_time = response_time
        else:
            self.average_response_time = (
                (self.average_response_time * (self.total_requests - 1) + response_time) / 
                self.total_requests
            )
        
        if response_time > self.peak_response_time:
            self.peak_response_time = response_time

class MCPConnection:
    """MCP连接封装"""
    
    def __init__(self, connection_id: str, endpoint: str):
        self.connection_id = connection_id
        self.endpoint = endpoint
        self.state = ConnectionState.DISCONNECTED
        self.metrics = ConnectionMetrics()
        self.last_heartbeat = datetime.now()
        self.connection_object = None  # 实际的连接对象
        self.lock = asyncio.Lock()
        self.in_use = False
        self.created_at = datetime.now()
    
    async def connect(self):
        """建立连接"""
        async with self.lock:
            if self.state == ConnectionState.ACTIVE:
                return True
            
            try:
                self.state = ConnectionState.CONNECTING
                # 这里应该实现实际的连接逻辑
                await asyncio.sleep(0.1)  # 模拟连接延迟
                self.connection_object = f"connection_to_{self.endpoint}"
                self.state = ConnectionState.ACTIVE
                self.last_heartbeat = datetime.now()
                return True
            except Exception as e:
                self.state = ConnectionState.ERROR
                raise ConnectionError(f"连接失败: {e}")
    
    async def disconnect(self):
        """断开连接"""
        async with self.lock:
            if self.connection_object:
                # 这里应该实现实际的断开逻辑
                self.connection_object = None
            self.state = ConnectionState.DISCONNECTED
    
    async def send_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """发送请求"""
        if self.state != ConnectionState.ACTIVE:
            raise ConnectionError("连接未激活")
        
        start_time = time.time()
        try:
            # 模拟请求处理
            await asyncio.sleep(0.05)  # 模拟网络延迟
            
            request_json = json.dumps(request_data)
            response_json = json.dumps({"result": "success", "data": request_data})
            
            response_time = time.time() - start_time
            
            # 更新指标
            self.metrics.update_request_metrics(
                response_time=response_time,
                bytes_sent=len(request_json.encode()),
                bytes_received=len(response_json.encode())
            )
            
            return {"result": "success", "data": request_data}
        
        except Exception as e:
            response_time = time.time() - start_time
            self.metrics.update_request_metrics(
                response_time=response_time,
                bytes_sent=len(json.dumps(request_data).encode()),
                bytes_received=0,
                error=True
            )
            raise
    
    async def heartbeat(self) -> bool:
        """心跳检测"""
        try:
            if self.state == ConnectionState.ACTIVE:
                # 发送心跳请求
                await self.send_request({"method": "heartbeat"})
                self.last_heartbeat = datetime.now()
                return True
        except Exception:
            self.state = ConnectionState.ERROR
        return False
    
    def is_healthy(self, max_idle_time: timedelta = timedelta(minutes=5)) -> bool:
        """检查连接健康状态"""
        if self.state != ConnectionState.ACTIVE:
            return False
        
        # 检查是否超过最大空闲时间
        if datetime.now() - self.last_heartbeat > max_idle_time:
            return False
        
        return True
    
    def get_metrics_summary(self) -> Dict[str, Any]:
        """获取指标摘要"""
        return {
            "connection_id": self.connection_id,
            "endpoint": self.endpoint,
            "state": self.state.value,
            "created_at": self.created_at.isoformat(),
            "last_used": self.metrics.last_used.isoformat(),
            "total_requests": self.metrics.total_requests,
            "total_errors": self.metrics.total_errors,
            "error_rate": self.metrics.total_errors / max(self.metrics.total_requests, 1),
            "average_response_time": self.metrics.average_response_time,
            "peak_response_time": self.metrics.peak_response_time,
            "total_bytes_sent": self.metrics.total_bytes_sent,
            "total_bytes_received": self.metrics.total_bytes_received
        }

class ConnectionPool:
    """连接池"""
    
    def __init__(self, endpoint: str, min_connections: int = 2, 
                 max_connections: int = 10, max_idle_time: timedelta = timedelta(minutes=5)):
        self.endpoint = endpoint
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.max_idle_time = max_idle_time
        
        self.connections: Dict[str, MCPConnection] = {}
        self.idle_connections: deque = deque()
        self.active_connections: Set[str] = set()
        
        self.lock = asyncio.Lock()
        self.connection_counter = 0
        self.total_requests = 0
        self.total_wait_time = 0.0
        
        # 启动后台任务
        self.cleanup_task = None
        self.heartbeat_task = None
    
    async def initialize(self):
        """初始化连接池"""
        # 创建最小连接数
        for _ in range(self.min_connections):
            await self._create_connection()
        
        # 启动后台任务
        self.cleanup_task = asyncio.create_task(self._cleanup_loop())
        self.heartbeat_task = asyncio.create_task(self._heartbeat_loop())
    
    async def _create_connection(self) -> MCPConnection:
        """创建新连接"""
        self.connection_counter += 1
        connection_id = f"{self.endpoint}_{self.connection_counter}"
        
        connection = MCPConnection(connection_id, self.endpoint)
        await connection.connect()
        
        self.connections[connection_id] = connection
        self.idle_connections.append(connection_id)
        
        return connection
    
    async def get_connection(self, timeout: float = 30.0) -> MCPConnection:
        """获取连接"""
        start_time = time.time()
        
        async with self.lock:
            # 尝试获取空闲连接
            while self.idle_connections:
                connection_id = self.idle_connections.popleft()
                connection = self.connections.get(connection_id)
                
                if connection and connection.is_healthy(self.max_idle_time):
                    connection.in_use = True
                    self.active_connections.add(connection_id)
                    
                    wait_time = time.time() - start_time
                    self.total_wait_time += wait_time
                    self.total_requests += 1
                    
                    return connection
                elif connection:
                    # 连接不健康,移除它
                    await self._remove_connection(connection_id)
            
            # 没有空闲连接,尝试创建新连接
            if len(self.connections) < self.max_connections:
                connection = await self._create_connection()
                connection_id = connection.connection_id
                
                # 从空闲队列移除并标记为活跃
                if connection_id in [self.idle_connections[i] for i in range(len(self.idle_connections))]:
                    temp_deque = deque()
                    while self.idle_connections:
                        cid = self.idle_connections.popleft()
                        if cid != connection_id:
                            temp_deque.append(cid)
                    self.idle_connections = temp_deque
                
                connection.in_use = True
                self.active_connections.add(connection_id)
                
                wait_time = time.time() - start_time
                self.total_wait_time += wait_time
                self.total_requests += 1
                
                return connection
        
        # 等待连接可用
        while time.time() - start_time < timeout:
            await asyncio.sleep(0.1)
            
            async with self.lock:
                if self.idle_connections:
                    connection_id = self.idle_connections.popleft()
                    connection = self.connections.get(connection_id)
                    
                    if connection and connection.is_healthy(self.max_idle_time):
                        connection.in_use = True
                        self.active_connections.add(connection_id)
                        
                        wait_time = time.time() - start_time
                        self.total_wait_time += wait_time
                        self.total_requests += 1
                        
                        return connection
        
        raise TimeoutError(f"获取连接超时: {timeout}秒")
    
    async def return_connection(self, connection: MCPConnection):
        """归还连接"""
        async with self.lock:
            if connection.connection_id in self.active_connections:
                connection.in_use = False
                self.active_connections.remove(connection.connection_id)
                
                if connection.is_healthy(self.max_idle_time):
                    self.idle_connections.append(connection.connection_id)
                else:
                    await self._remove_connection(connection.connection_id)
    
    async def _remove_connection(self, connection_id: str):
        """移除连接"""
        if connection_id in self.connections:
            connection = self.connections[connection_id]
            await connection.disconnect()
            del self.connections[connection_id]
        
        self.active_connections.discard(connection_id)
        
        # 从空闲队列移除
        temp_deque = deque()
        while self.idle_connections:
            cid = self.idle_connections.popleft()
            if cid != connection_id:
                temp_deque.append(cid)
        self.idle_connections = temp_deque
    
    async def _cleanup_loop(self):
        """清理循环"""
        while True:
            try:
                await asyncio.sleep(60)  # 每分钟清理一次
                
                async with self.lock:
                    # 清理不健康的连接
                    unhealthy_connections = []
                    for connection_id, connection in self.connections.items():
                        if not connection.is_healthy(self.max_idle_time) and not connection.in_use:
                            unhealthy_connections.append(connection_id)
                    
                    for connection_id in unhealthy_connections:
                        await self._remove_connection(connection_id)
                    
                    # 确保最小连接数
                    while len(self.connections) < self.min_connections:
                        await self._create_connection()
            
            except Exception as e:
                print(f"连接池清理错误: {e}")
    
    async def _heartbeat_loop(self):
        """心跳循环"""
        while True:
            try:
                await asyncio.sleep(30)  # 每30秒心跳一次
                
                # 对所有连接进行心跳检测
                heartbeat_tasks = []
                for connection in self.connections.values():
                    if not connection.in_use:
                        heartbeat_tasks.append(connection.heartbeat())
                
                if heartbeat_tasks:
                    await asyncio.gather(*heartbeat_tasks, return_exceptions=True)
            
            except Exception as e:
                print(f"心跳检测错误: {e}")
    
    async def close(self):
        """关闭连接池"""
        # 取消后台任务
        if self.cleanup_task:
            self.cleanup_task.cancel()
        if self.heartbeat_task:
            self.heartbeat_task.cancel()
        
        # 关闭所有连接
        close_tasks = []
        for connection in self.connections.values():
            close_tasks.append(connection.disconnect())
        
        if close_tasks:
            await asyncio.gather(*close_tasks, return_exceptions=True)
        
        self.connections.clear()
        self.idle_connections.clear()
        self.active_connections.clear()
    
    def get_pool_stats(self) -> Dict[str, Any]:
        """获取连接池统计"""
        total_connections = len(self.connections)
        idle_connections = len(self.idle_connections)
        active_connections = len(self.active_connections)
        
        avg_wait_time = self.total_wait_time / max(self.total_requests, 1)
        
        return {
            "endpoint": self.endpoint,
            "total_connections": total_connections,
            "idle_connections": idle_connections,
            "active_connections": active_connections,
            "min_connections": self.min_connections,
            "max_connections": self.max_connections,
            "total_requests": self.total_requests,
            "average_wait_time": avg_wait_time,
            "pool_utilization": active_connections / max(total_connections, 1)
        }

2. 缓存系统

import hashlib
from typing import Any, Optional, Dict, List, Tuple, Callable
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
import pickle
import gzip
import threading
from collections import OrderedDict

class CachePolicy(Enum):
    """缓存策略"""
    LRU = "lru"  # 最近最少使用
    LFU = "lfu"  # 最少使用频率
    FIFO = "fifo"  # 先进先出
    TTL = "ttl"  # 基于时间

@dataclass
class CacheEntry:
    """缓存条目"""
    key: str
    value: Any
    created_at: datetime
    last_accessed: datetime
    access_count: int = 0
    ttl: Optional[timedelta] = None
    size: int = 0
    
    def is_expired(self) -> bool:
        """检查是否过期"""
        if self.ttl is None:
            return False
        return datetime.now() - self.created_at > self.ttl
    
    def touch(self):
        """更新访问时间和次数"""
        self.last_accessed = datetime.now()
        self.access_count += 1

class CacheBackend(ABC):
    """缓存后端接口"""
    
    @abstractmethod
    async def get(self, key: str) -> Optional[Any]:
        """获取缓存值"""
        pass
    
    @abstractmethod
    async def set(self, key: str, value: Any, ttl: Optional[timedelta] = None) -> bool:
        """设置缓存值"""
        pass
    
    @abstractmethod
    async def delete(self, key: str) -> bool:
        """删除缓存值"""
        pass
    
    @abstractmethod
    async def clear(self) -> bool:
        """清空缓存"""
        pass
    
    @abstractmethod
    def get_stats(self) -> Dict[str, Any]:
        """获取缓存统计"""
        pass

class MemoryCacheBackend(CacheBackend):
    """内存缓存后端"""
    
    def __init__(self, max_size: int = 1000, max_memory: int = 100 * 1024 * 1024,  # 100MB
                 policy: CachePolicy = CachePolicy.LRU):
        self.max_size = max_size
        self.max_memory = max_memory
        self.policy = policy
        
        self.cache: Dict[str, CacheEntry] = {}
        self.access_order: OrderedDict = OrderedDict()  # 用于LRU
        self.frequency: Dict[str, int] = defaultdict(int)  # 用于LFU
        
        self.lock = threading.RLock()
        
        # 统计信息
        self.hits = 0
        self.misses = 0
        self.evictions = 0
        self.current_memory = 0
    
    def _calculate_size(self, value: Any) -> int:
        """计算值的大小"""
        try:
            return len(pickle.dumps(value))
        except Exception:
            return len(str(value).encode('utf-8'))
    
    def _evict_if_needed(self):
        """根据策略驱逐缓存项"""
        while (len(self.cache) >= self.max_size or 
               self.current_memory >= self.max_memory) and self.cache:
            
            if self.policy == CachePolicy.LRU:
                # 移除最近最少使用的项
                if self.access_order:
                    key_to_remove = next(iter(self.access_order))
                else:
                    key_to_remove = next(iter(self.cache))
            
            elif self.policy == CachePolicy.LFU:
                # 移除使用频率最低的项
                key_to_remove = min(self.cache.keys(), 
                                   key=lambda k: self.frequency.get(k, 0))
            
            elif self.policy == CachePolicy.FIFO:
                # 移除最早添加的项
                key_to_remove = next(iter(self.cache))
            
            else:  # TTL
                # 移除最早过期的项
                expired_keys = [k for k, entry in self.cache.items() if entry.is_expired()]
                if expired_keys:
                    key_to_remove = expired_keys[0]
                else:
                    key_to_remove = next(iter(self.cache))
            
            self._remove_entry(key_to_remove)
            self.evictions += 1
    
    def _remove_entry(self, key: str):
        """移除缓存条目"""
        if key in self.cache:
            entry = self.cache[key]
            self.current_memory -= entry.size
            del self.cache[key]
            
            self.access_order.pop(key, None)
            self.frequency.pop(key, None)
    
    async def get(self, key: str) -> Optional[Any]:
        """获取缓存值"""
        with self.lock:
            entry = self.cache.get(key)
            
            if entry is None:
                self.misses += 1
                return None
            
            if entry.is_expired():
                self._remove_entry(key)
                self.misses += 1
                return None
            
            # 更新访问信息
            entry.touch()
            self.frequency[key] += 1
            
            # 更新LRU顺序
            if self.policy == CachePolicy.LRU:
                self.access_order.move_to_end(key)
            
            self.hits += 1
            return entry.value
    
    async def set(self, key: str, value: Any, ttl: Optional[timedelta] = None) -> bool:
        """设置缓存值"""
        with self.lock:
            size = self._calculate_size(value)
            
            # 检查单个值是否超过最大内存限制
            if size > self.max_memory:
                return False
            
            # 如果键已存在,先移除旧值
            if key in self.cache:
                self._remove_entry(key)
            
            # 创建新条目
            entry = CacheEntry(
                key=key,
                value=value,
                created_at=datetime.now(),
                last_accessed=datetime.now(),
                ttl=ttl,
                size=size
            )
            
            # 检查是否需要驱逐
            self.current_memory += size
            self._evict_if_needed()
            
            # 添加新条目
            self.cache[key] = entry
            self.frequency[key] = 1
            
            if self.policy == CachePolicy.LRU:
                self.access_order[key] = True
            
            return True
    
    async def delete(self, key: str) -> bool:
        """删除缓存值"""
        with self.lock:
            if key in self.cache:
                self._remove_entry(key)
                return True
            return False
    
    async def clear(self) -> bool:
        """清空缓存"""
        with self.lock:
            self.cache.clear()
            self.access_order.clear()
            self.frequency.clear()
            self.current_memory = 0
            return True
    
    def get_stats(self) -> Dict[str, Any]:
        """获取缓存统计"""
        with self.lock:
            total_requests = self.hits + self.misses
            hit_rate = self.hits / max(total_requests, 1)
            
            return {
                "backend_type": "memory",
                "policy": self.policy.value,
                "max_size": self.max_size,
                "max_memory": self.max_memory,
                "current_size": len(self.cache),
                "current_memory": self.current_memory,
                "memory_utilization": self.current_memory / self.max_memory,
                "hits": self.hits,
                "misses": self.misses,
                "hit_rate": hit_rate,
                "evictions": self.evictions,
                "total_requests": total_requests
            }

class CompressedCacheBackend(CacheBackend):
    """压缩缓存后端"""
    
    def __init__(self, backend: CacheBackend, compression_level: int = 6,
                 min_compress_size: int = 1024):
        self.backend = backend
        self.compression_level = compression_level
        self.min_compress_size = min_compress_size
        
        self.compression_ratio = 0.0
        self.compressed_count = 0
        self.total_original_size = 0
        self.total_compressed_size = 0
    
    def _should_compress(self, data: bytes) -> bool:
        """判断是否应该压缩"""
        return len(data) >= self.min_compress_size
    
    def _compress_value(self, value: Any) -> Tuple[Any, bool]:
        """压缩值"""
        try:
            serialized = pickle.dumps(value)
            
            if self._should_compress(serialized):
                compressed = gzip.compress(serialized, compresslevel=self.compression_level)
                
                # 更新压缩统计
                self.total_original_size += len(serialized)
                self.total_compressed_size += len(compressed)
                self.compressed_count += 1
                
                if self.compressed_count > 0:
                    self.compression_ratio = (
                        1 - self.total_compressed_size / self.total_original_size
                    )
                
                return {
                    "compressed": True,
                    "data": compressed
                }, True
            else:
                return value, False
        
        except Exception:
            return value, False
    
    def _decompress_value(self, compressed_value: Any) -> Any:
        """解压缩值"""
        try:
            if isinstance(compressed_value, dict) and compressed_value.get("compressed"):
                decompressed = gzip.decompress(compressed_value["data"])
                return pickle.loads(decompressed)
            else:
                return compressed_value
        except Exception:
            return compressed_value
    
    async def get(self, key: str) -> Optional[Any]:
        """获取缓存值"""
        compressed_value = await self.backend.get(key)
        if compressed_value is None:
            return None
        
        return self._decompress_value(compressed_value)
    
    async def set(self, key: str, value: Any, ttl: Optional[timedelta] = None) -> bool:
        """设置缓存值"""
        compressed_value, was_compressed = self._compress_value(value)
        return await self.backend.set(key, compressed_value, ttl)
    
    async def delete(self, key: str) -> bool:
        """删除缓存值"""
        return await self.backend.delete(key)
    
    async def clear(self) -> bool:
        """清空缓存"""
        return await self.backend.clear()
    
    def get_stats(self) -> Dict[str, Any]:
        """获取缓存统计"""
        backend_stats = self.backend.get_stats()
        backend_stats.update({
            "compression_enabled": True,
            "compression_level": self.compression_level,
            "compression_ratio": self.compression_ratio,
            "compressed_count": self.compressed_count,
            "total_original_size": self.total_original_size,
            "total_compressed_size": self.total_compressed_size
        })
        return backend_stats

class CacheManager:
    """缓存管理器"""
    
    def __init__(self, default_backend: CacheBackend):
        self.default_backend = default_backend
        self.backends: Dict[str, CacheBackend] = {"default": default_backend}
        self.key_hasher = hashlib.sha256
        
        # 缓存命中率监控
        self.hit_rate_threshold = 0.8
        self.monitoring_enabled = True
    
    def register_backend(self, name: str, backend: CacheBackend):
        """注册缓存后端"""
        self.backends[name] = backend
    
    def _hash_key(self, key: str) -> str:
        """哈希键名"""
        return self.key_hasher(key.encode()).hexdigest()
    
    def _get_backend(self, backend_name: Optional[str] = None) -> CacheBackend:
        """获取缓存后端"""
        if backend_name and backend_name in self.backends:
            return self.backends[backend_name]
        return self.default_backend
    
    async def get(self, key: str, backend_name: Optional[str] = None) -> Optional[Any]:
        """获取缓存值"""
        backend = self._get_backend(backend_name)
        hashed_key = self._hash_key(key)
        return await backend.get(hashed_key)
    
    async def set(self, key: str, value: Any, ttl: Optional[timedelta] = None,
                 backend_name: Optional[str] = None) -> bool:
        """设置缓存值"""
        backend = self._get_backend(backend_name)
        hashed_key = self._hash_key(key)
        return await backend.set(hashed_key, value, ttl)
    
    async def delete(self, key: str, backend_name: Optional[str] = None) -> bool:
        """删除缓存值"""
        backend = self._get_backend(backend_name)
        hashed_key = self._hash_key(key)
        return await backend.delete(hashed_key)
    
    async def get_or_set(self, key: str, factory: Callable[[], Any],
                        ttl: Optional[timedelta] = None,
                        backend_name: Optional[str] = None) -> Any:
        """获取或设置缓存值"""
        # 尝试获取缓存值
        cached_value = await self.get(key, backend_name)
        if cached_value is not None:
            return cached_value
        
        # 缓存未命中,调用工厂函数生成值
        value = factory() if not asyncio.iscoroutinefunction(factory) else await factory()
        
        # 设置缓存
        await self.set(key, value, ttl, backend_name)
        
        return value
    
    async def clear_all(self) -> Dict[str, bool]:
        """清空所有缓存后端"""
        results = {}
        for name, backend in self.backends.items():
            results[name] = await backend.clear()
        return results
    
    def get_all_stats(self) -> Dict[str, Dict[str, Any]]:
        """获取所有后端统计"""
        stats = {}
        for name, backend in self.backends.items():
            stats[name] = backend.get_stats()
        return stats
    
    def check_performance(self) -> Dict[str, Any]:
        """检查缓存性能"""
        all_stats = self.get_all_stats()
        performance_report = {
            "overall_health": "good",
            "recommendations": [],
            "backend_performance": {}
        }
        
        for name, stats in all_stats.items():
            hit_rate = stats.get("hit_rate", 0)
            memory_utilization = stats.get("memory_utilization", 0)
            
            backend_health = "good"
            recommendations = []
            
            if hit_rate < self.hit_rate_threshold:
                backend_health = "poor"
                recommendations.append(f"命中率过低 ({hit_rate:.2%}),考虑调整缓存策略")
            
            if memory_utilization > 0.9:
                backend_health = "warning"
                recommendations.append(f"内存使用率过高 ({memory_utilization:.2%}),考虑增加内存或调整驱逐策略")
            
            performance_report["backend_performance"][name] = {
                "health": backend_health,
                "hit_rate": hit_rate,
                "memory_utilization": memory_utilization,
                "recommendations": recommendations
            }
            
            if backend_health == "poor":
                performance_report["overall_health"] = "poor"
            elif backend_health == "warning" and performance_report["overall_health"] == "good":
                performance_report["overall_health"] = "warning"
        
        return performance_report

3. 异步处理优化

import asyncio
from typing import List, Dict, Any, Optional, Callable, Awaitable, Union
from dataclasses import dataclass
from enum import Enum
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing

class TaskPriority(Enum):
    """任务优先级"""
    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4

@dataclass
class Task:
    """异步任务"""
    task_id: str
    coro: Awaitable[Any]
    priority: TaskPriority = TaskPriority.NORMAL
    timeout: Optional[float] = None
    retry_count: int = 0
    max_retries: int = 3
    created_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    result: Any = None
    error: Optional[Exception] = None
    
    def __lt__(self, other):
        """用于优先级队列排序"""
        return self.priority.value > other.priority.value

class AsyncTaskManager:
    """异步任务管理器"""
    
    def __init__(self, max_concurrent_tasks: int = 100, 
                 thread_pool_size: int = 10,
                 process_pool_size: Optional[int] = None):
        self.max_concurrent_tasks = max_concurrent_tasks
        self.semaphore = asyncio.Semaphore(max_concurrent_tasks)
        
        # 线程池和进程池
        self.thread_pool = ThreadPoolExecutor(max_workers=thread_pool_size)
        self.process_pool_size = process_pool_size or multiprocessing.cpu_count()
        self.process_pool = ProcessPoolExecutor(max_workers=self.process_pool_size)
        
        # 任务队列和状态跟踪
        self.pending_tasks: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.running_tasks: Dict[str, asyncio.Task] = {}
        self.completed_tasks: Dict[str, Task] = {}
        
        # 统计信息
        self.total_tasks = 0
        self.completed_count = 0
        self.failed_count = 0
        self.total_execution_time = 0.0
        
        # 工作协程
        self.workers: List[asyncio.Task] = []
        self.worker_count = min(max_concurrent_tasks, 50)
        self.running = False
    
    async def start(self):
        """启动任务管理器"""
        if self.running:
            return
        
        self.running = True
        
        # 启动工作协程
        for i in range(self.worker_count):
            worker = asyncio.create_task(self._worker(f"worker_{i}"))
            self.workers.append(worker)
    
    async def stop(self):
        """停止任务管理器"""
        self.running = False
        
        # 等待所有工作协程完成
        if self.workers:
            await asyncio.gather(*self.workers, return_exceptions=True)
        
        # 关闭线程池和进程池
        self.thread_pool.shutdown(wait=True)
        self.process_pool.shutdown(wait=True)
    
    async def _worker(self, worker_name: str):
        """工作协程"""
        while self.running:
            try:
                # 获取任务(带超时,避免无限等待)
                try:
                    _, task = await asyncio.wait_for(
                        self.pending_tasks.get(), timeout=1.0
                    )
                except asyncio.TimeoutError:
                    continue
                
                # 执行任务
                await self._execute_task(task)
                
                # 标记任务完成
                self.pending_tasks.task_done()
            
            except Exception as e:
                print(f"工作协程 {worker_name} 错误: {e}")
    
    async def _execute_task(self, task: Task):
        """执行任务"""
        task.started_at = datetime.now()
        
        try:
            async with self.semaphore:
                # 添加到运行任务列表
                running_task = asyncio.create_task(task.coro)
                self.running_tasks[task.task_id] = running_task
                
                try:
                    # 执行任务(带超时)
                    if task.timeout:
                        task.result = await asyncio.wait_for(running_task, timeout=task.timeout)
                    else:
                        task.result = await running_task
                    
                    # 任务成功完成
                    task.completed_at = datetime.now()
                    self.completed_count += 1
                    
                    execution_time = (task.completed_at - task.started_at).total_seconds()
                    self.total_execution_time += execution_time
                
                except asyncio.TimeoutError:
                    task.error = TimeoutError(f"任务超时: {task.timeout}秒")
                    self.failed_count += 1
                
                except Exception as e:
                    task.error = e
                    
                    # 重试逻辑
                    if task.retry_count < task.max_retries:
                        task.retry_count += 1
                        task.started_at = None
                        task.error = None
                        
                        # 重新加入队列
                        await self.pending_tasks.put((task.priority.value, task))
                        return
                    else:
                        self.failed_count += 1
                
                finally:
                    # 从运行任务列表移除
                    self.running_tasks.pop(task.task_id, None)
        
        finally:
            # 移动到完成任务列表
            self.completed_tasks[task.task_id] = task
    
    async def submit_task(self, task_id: str, coro: Awaitable[Any],
                         priority: TaskPriority = TaskPriority.NORMAL,
                         timeout: Optional[float] = None,
                         max_retries: int = 3) -> str:
        """提交任务"""
        task = Task(
            task_id=task_id,
            coro=coro,
            priority=priority,
            timeout=timeout,
            max_retries=max_retries
        )
        
        await self.pending_tasks.put((priority.value, task))
        self.total_tasks += 1
        
        return task_id
    
    async def submit_cpu_bound_task(self, task_id: str, func: Callable, *args, **kwargs) -> str:
        """提交CPU密集型任务到进程池"""
        loop = asyncio.get_event_loop()
        coro = loop.run_in_executor(self.process_pool, func, *args, **kwargs)
        
        return await self.submit_task(task_id, coro, TaskPriority.NORMAL)
    
    async def submit_io_bound_task(self, task_id: str, func: Callable, *args, **kwargs) -> str:
        """提交IO密集型任务到线程池"""
        loop = asyncio.get_event_loop()
        coro = loop.run_in_executor(self.thread_pool, func, *args, **kwargs)
        
        return await self.submit_task(task_id, coro, TaskPriority.NORMAL)
    
    async def wait_for_task(self, task_id: str, timeout: Optional[float] = None) -> Any:
        """等待任务完成"""
        start_time = time.time()
        
        while True:
            # 检查任务是否完成
            if task_id in self.completed_tasks:
                task = self.completed_tasks[task_id]
                if task.error:
                    raise task.error
                return task.result
            
            # 检查超时
            if timeout and time.time() - start_time > timeout:
                raise TimeoutError(f"等待任务 {task_id} 超时")
            
            # 短暂等待
            await asyncio.sleep(0.1)
    
    async def cancel_task(self, task_id: str) -> bool:
        """取消任务"""
        # 取消运行中的任务
        if task_id in self.running_tasks:
            running_task = self.running_tasks[task_id]
            running_task.cancel()
            return True
        
        # 从待处理队列中移除(这个比较复杂,简化处理)
        return False
    
    def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
        """获取任务状态"""
        # 检查完成的任务
        if task_id in self.completed_tasks:
            task = self.completed_tasks[task_id]
            return {
                "task_id": task.task_id,
                "status": "completed" if task.error is None else "failed",
                "priority": task.priority.value,
                "created_at": task.created_at.isoformat(),
                "started_at": task.started_at.isoformat() if task.started_at else None,
                "completed_at": task.completed_at.isoformat() if task.completed_at else None,
                "retry_count": task.retry_count,
                "error": str(task.error) if task.error else None
            }
        
        # 检查运行中的任务
        if task_id in self.running_tasks:
            return {
                "task_id": task_id,
                "status": "running"
            }
        
        return None
    
    def get_manager_stats(self) -> Dict[str, Any]:
        """获取管理器统计"""
        pending_count = self.pending_tasks.qsize()
        running_count = len(self.running_tasks)
        
        avg_execution_time = (
            self.total_execution_time / max(self.completed_count, 1)
        )
        
        success_rate = self.completed_count / max(self.total_tasks, 1)
        
        return {
            "total_tasks": self.total_tasks,
            "pending_tasks": pending_count,
            "running_tasks": running_count,
            "completed_tasks": self.completed_count,
            "failed_tasks": self.failed_count,
            "success_rate": success_rate,
            "average_execution_time": avg_execution_time,
            "max_concurrent_tasks": self.max_concurrent_tasks,
            "worker_count": self.worker_count,
            "thread_pool_size": self.thread_pool._max_workers,
            "process_pool_size": self.process_pool_size
        }

class BatchProcessor:
    """批处理器"""
    
    def __init__(self, batch_size: int = 100, max_wait_time: float = 1.0):
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        self.pending_items: List[Any] = []
        self.batch_processors: Dict[str, Callable] = {}
        self.last_batch_time = time.time()
        
        # 启动批处理循环
        self.processing_task = None
        self.running = False
    
    def register_processor(self, name: str, processor: Callable[[List[Any]], Awaitable[List[Any]]]):
        """注册批处理器"""
        self.batch_processors[name] = processor
    
    async def start(self):
        """启动批处理"""
        if self.running:
            return
        
        self.running = True
        self.processing_task = asyncio.create_task(self._processing_loop())
    
    async def stop(self):
        """停止批处理"""
        self.running = False
        if self.processing_task:
            self.processing_task.cancel()
            try:
                await self.processing_task
            except asyncio.CancelledError:
                pass
    
    async def add_item(self, item: Any) -> bool:
        """添加项目到批处理队列"""
        self.pending_items.append(item)
        
        # 检查是否需要立即处理
        if len(self.pending_items) >= self.batch_size:
            await self._process_batch()
            return True
        
        return False
    
    async def _processing_loop(self):
        """批处理循环"""
        while self.running:
            try:
                await asyncio.sleep(0.1)
                
                # 检查是否需要处理批次
                current_time = time.time()
                if (self.pending_items and 
                    (len(self.pending_items) >= self.batch_size or
                     current_time - self.last_batch_time >= self.max_wait_time)):
                    await self._process_batch()
            
            except Exception as e:
                print(f"批处理循环错误: {e}")
    
    async def _process_batch(self):
        """处理批次"""
        if not self.pending_items:
            return
        
        batch = self.pending_items[:self.batch_size]
        self.pending_items = self.pending_items[self.batch_size:]
        self.last_batch_time = time.time()
        
        # 并行处理所有注册的处理器
        tasks = []
        for name, processor in self.batch_processors.items():
            task = asyncio.create_task(processor(batch.copy()))
            tasks.append(task)
        
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

性能监控系统

1. 指标收集器

import psutil
import gc
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from collections import deque
import threading
import time
from datetime import datetime, timedelta

@dataclass
class MetricPoint:
    """指标数据点"""
    timestamp: datetime
    value: float
    tags: Dict[str, str] = field(default_factory=dict)

@dataclass
class SystemMetrics:
    """系统指标"""
    cpu_percent: float
    memory_percent: float
    memory_used: int
    memory_available: int
    disk_usage_percent: float
    network_bytes_sent: int
    network_bytes_recv: int
    open_files: int
    thread_count: int
    process_count: int

class MetricsCollector:
    """指标收集器"""
    
    def __init__(self, max_points: int = 1000, collection_interval: float = 1.0):
        self.max_points = max_points
        self.collection_interval = collection_interval
        
        # 指标存储
        self.metrics: Dict[str, deque] = defaultdict(lambda: deque(maxlen=max_points))
        self.custom_metrics: Dict[str, Callable[[], float]] = {}
        
        # 收集线程
        self.collection_thread = None
        self.running = False
        self.lock = threading.RLock()
        
        # 系统信息缓存
        self.process = psutil.Process()
        self.last_network_stats = psutil.net_io_counters()
        self.last_collection_time = time.time()
    
    def start(self):
        """启动指标收集"""
        if self.running:
            return
        
        self.running = True
        self.collection_thread = threading.Thread(target=self._collection_loop, daemon=True)
        self.collection_thread.start()
    
    def stop(self):
        """停止指标收集"""
        self.running = False
        if self.collection_thread:
            self.collection_thread.join(timeout=5)
    
    def register_custom_metric(self, name: str, collector: Callable[[], float]):
        """注册自定义指标"""
        self.custom_metrics[name] = collector
    
    def _collection_loop(self):
        """收集循环"""
        while self.running:
            try:
                self._collect_system_metrics()
                self._collect_custom_metrics()
                time.sleep(self.collection_interval)
            except Exception as e:
                print(f"指标收集错误: {e}")
    
    def _collect_system_metrics(self):
        """收集系统指标"""
        current_time = datetime.now()
        
        try:
            # CPU使用率
            cpu_percent = psutil.cpu_percent()
            self._add_metric("system.cpu.percent", cpu_percent, current_time)
            
            # 内存使用情况
            memory = psutil.virtual_memory()
            self._add_metric("system.memory.percent", memory.percent, current_time)
            self._add_metric("system.memory.used", memory.used, current_time)
            self._add_metric("system.memory.available", memory.available, current_time)
            
            # 磁盘使用情况
            disk = psutil.disk_usage('/')
            disk_percent = (disk.used / disk.total) * 100
            self._add_metric("system.disk.percent", disk_percent, current_time)
            
            # 网络IO
            network = psutil.net_io_counters()
            current_collection_time = time.time()
            time_delta = current_collection_time - self.last_collection_time
            
            if time_delta > 0:
                bytes_sent_rate = (network.bytes_sent - self.last_network_stats.bytes_sent) / time_delta
                bytes_recv_rate = (network.bytes_recv - self.last_network_stats.bytes_recv) / time_delta
                
                self._add_metric("system.network.bytes_sent_rate", bytes_sent_rate, current_time)
                self._add_metric("system.network.bytes_recv_rate", bytes_recv_rate, current_time)
            
            self.last_network_stats = network
            self.last_collection_time = current_collection_time
            
            # 进程相关指标
            self._add_metric("process.open_files", len(self.process.open_files()), current_time)
            self._add_metric("process.threads", self.process.num_threads(), current_time)
            self._add_metric("process.memory_percent", self.process.memory_percent(), current_time)
            
            # Python GC统计
            gc_stats = gc.get_stats()
            for i, stat in enumerate(gc_stats):
                self._add_metric(f"python.gc.generation_{i}.collections", stat['collections'], current_time)
                self._add_metric(f"python.gc.generation_{i}.collected", stat['collected'], current_time)
        
        except Exception as e:
            print(f"系统指标收集错误: {e}")
    
    def _collect_custom_metrics(self):
        """收集自定义指标"""
        current_time = datetime.now()
        
        for name, collector in self.custom_metrics.items():
            try:
                value = collector()
                self._add_metric(name, value, current_time)
            except Exception as e:
                print(f"自定义指标 {name} 收集错误: {e}")
    
    def _add_metric(self, name: str, value: float, timestamp: datetime, tags: Optional[Dict[str, str]] = None):
        """添加指标数据点"""
        with self.lock:
            point = MetricPoint(timestamp=timestamp, value=value, tags=tags or {})
            self.metrics[name].append(point)
    
    def get_metric_values(self, name: str, start_time: Optional[datetime] = None,
                         end_time: Optional[datetime] = None) -> List[MetricPoint]:
        """获取指标值"""
        with self.lock:
            points = list(self.metrics.get(name, []))
            
            if start_time or end_time:
                filtered_points = []
                for point in points:
                    if start_time and point.timestamp < start_time:
                        continue
                    if end_time and point.timestamp > end_time:
                        continue
                    filtered_points.append(point)
                return filtered_points
            
            return points
    
    def get_latest_value(self, name: str) -> Optional[float]:
        """获取最新指标值"""
        with self.lock:
            points = self.metrics.get(name)
            if points:
                return points[-1].value
            return None
    
    def get_average_value(self, name: str, duration: timedelta) -> Optional[float]:
        """获取指定时间段内的平均值"""
        end_time = datetime.now()
        start_time = end_time - duration
        
        points = self.get_metric_values(name, start_time, end_time)
        if not points:
            return None
        
        return sum(point.value for point in points) / len(points)
    
    def get_max_value(self, name: str, duration: timedelta) -> Optional[float]:
        """获取指定时间段内的最大值"""
        end_time = datetime.now()
        start_time = end_time - duration
        
        points = self.get_metric_values(name, start_time, end_time)
        if not points:
            return None
        
        return max(point.value for point in points)
    
    def get_all_metric_names(self) -> List[str]:
        """获取所有指标名称"""
        with self.lock:
            return list(self.metrics.keys())
    
    def get_metrics_summary(self) -> Dict[str, Dict[str, Any]]:
        """获取指标摘要"""
        summary = {}
        
        for name in self.get_all_metric_names():
            latest = self.get_latest_value(name)
            avg_1min = self.get_average_value(name, timedelta(minutes=1))
            avg_5min = self.get_average_value(name, timedelta(minutes=5))
            max_5min = self.get_max_value(name, timedelta(minutes=5))
            
            summary[name] = {
                "latest": latest,
                "avg_1min": avg_1min,
                "avg_5min": avg_5min,
                "max_5min": max_5min,
                "point_count": len(self.metrics[name])
            }
        
        return summary

class PerformanceMonitor:
    """性能监控器"""
    
    def __init__(self, metrics_collector: MetricsCollector):
        self.metrics_collector = metrics_collector
        self.alert_rules: List[AlertRule] = []
        self.alert_handlers: List[Callable[[Alert], None]] = []
        
        # 性能阈值
        self.cpu_threshold = 80.0
        self.memory_threshold = 85.0
        self.disk_threshold = 90.0
        self.response_time_threshold = 5.0
        
        # 监控状态
        self.monitoring_enabled = True
        self.last_alert_time: Dict[str, datetime] = {}
        self.alert_cooldown = timedelta(minutes=5)
    
    def add_alert_rule(self, rule: 'AlertRule'):
        """添加告警规则"""
        self.alert_rules.append(rule)
    
    def add_alert_handler(self, handler: Callable[['Alert'], None]):
        """添加告警处理器"""
        self.alert_handlers.append(handler)
    
    def check_performance(self) -> List['Alert']:
        """检查性能并生成告警"""
        if not self.monitoring_enabled:
            return []
        
        alerts = []
        current_time = datetime.now()
        
        # 检查系统指标
        alerts.extend(self._check_system_metrics(current_time))
        
        # 检查自定义规则
        for rule in self.alert_rules:
            try:
                if rule.should_alert(self.metrics_collector, current_time):
                    # 检查告警冷却时间
                    last_alert = self.last_alert_time.get(rule.name)
                    if not last_alert or current_time - last_alert > self.alert_cooldown:
                        alert = Alert(
                            rule_name=rule.name,
                            message=rule.get_alert_message(self.metrics_collector),
                            severity=rule.severity,
                            timestamp=current_time,
                            metric_name=rule.metric_name,
                            current_value=self.metrics_collector.get_latest_value(rule.metric_name)
                        )
                        alerts.append(alert)
                        self.last_alert_time[rule.name] = current_time
            except Exception as e:
                print(f"告警规则 {rule.name} 检查错误: {e}")
        
        # 处理告警
        for alert in alerts:
            self._handle_alert(alert)
        
        return alerts
    
    def _check_system_metrics(self, current_time: datetime) -> List['Alert']:
        """检查系统指标"""
        alerts = []
        
        # CPU使用率检查
        cpu_usage = self.metrics_collector.get_latest_value("system.cpu.percent")
        if cpu_usage and cpu_usage > self.cpu_threshold:
            alerts.append(Alert(
                rule_name="high_cpu_usage",
                message=f"CPU使用率过高: {cpu_usage:.1f}%",
                severity=AlertSeverity.WARNING,
                timestamp=current_time,
                metric_name="system.cpu.percent",
                current_value=cpu_usage
            ))
        
        # 内存使用率检查
        memory_usage = self.metrics_collector.get_latest_value("system.memory.percent")
        if memory_usage and memory_usage > self.memory_threshold:
            alerts.append(Alert(
                rule_name="high_memory_usage",
                message=f"内存使用率过高: {memory_usage:.1f}%",
                severity=AlertSeverity.WARNING,
                timestamp=current_time,
                metric_name="system.memory.percent",
                current_value=memory_usage
            ))
        
        # 磁盘使用率检查
        disk_usage = self.metrics_collector.get_latest_value("system.disk.percent")
        if disk_usage and disk_usage > self.disk_threshold:
            alerts.append(Alert(
                rule_name="high_disk_usage",
                message=f"磁盘使用率过高: {disk_usage:.1f}%",
                severity=AlertSeverity.CRITICAL,
                timestamp=current_time,
                metric_name="system.disk.percent",
                current_value=disk_usage
            ))
        
        return alerts
    
    def _handle_alert(self, alert: 'Alert'):
        """处理告警"""
        for handler in self.alert_handlers:
            try:
                handler(alert)
            except Exception as e:
                print(f"告警处理器错误: {e}")
    
    def get_performance_report(self) -> Dict[str, Any]:
        """获取性能报告"""
        metrics_summary = self.metrics_collector.get_metrics_summary()
        
        # 计算性能评分
        performance_score = self._calculate_performance_score(metrics_summary)
        
        # 生成建议
        recommendations = self._generate_recommendations(metrics_summary)
        
        return {
            "timestamp": datetime.now().isoformat(),
            "performance_score": performance_score,
            "metrics_summary": metrics_summary,
            "recommendations": recommendations,
            "alert_rules_count": len(self.alert_rules),
            "monitoring_enabled": self.monitoring_enabled
        }
    
    def _calculate_performance_score(self, metrics_summary: Dict[str, Dict[str, Any]]) -> float:
        """计算性能评分 (0-100)"""
        score = 100.0
        
        # CPU评分
        cpu_usage = metrics_summary.get("system.cpu.percent", {}).get("latest", 0)
        if cpu_usage > 90:
            score -= 30
        elif cpu_usage > 70:
            score -= 15
        elif cpu_usage > 50:
            score -= 5
        
        # 内存评分
        memory_usage = metrics_summary.get("system.memory.percent", {}).get("latest", 0)
        if memory_usage > 95:
            score -= 25
        elif memory_usage > 80:
            score -= 10
        elif memory_usage > 60:
            score -= 3
        
        # 磁盘评分
        disk_usage = metrics_summary.get("system.disk.percent", {}).get("latest", 0)
        if disk_usage > 95:
            score -= 20
        elif disk_usage > 85:
            score -= 8
        elif disk_usage > 70:
            score -= 2
        
        return max(0, score)
    
    def _generate_recommendations(self, metrics_summary: Dict[str, Dict[str, Any]]) -> List[str]:
        """生成性能优化建议"""
        recommendations = []
        
        cpu_usage = metrics_summary.get("system.cpu.percent", {}).get("latest", 0)
        memory_usage = metrics_summary.get("system.memory.percent", {}).get("latest", 0)
        disk_usage = metrics_summary.get("system.disk.percent", {}).get("latest", 0)
        
        if cpu_usage > 80:
            recommendations.append("CPU使用率过高,考虑优化算法或增加并发控制")
        
        if memory_usage > 85:
            recommendations.append("内存使用率过高,检查内存泄漏或增加缓存清理")
        
        if disk_usage > 90:
            recommendations.append("磁盘空间不足,清理临时文件或扩展存储")
        
        # 检查网络IO
        network_sent = metrics_summary.get("system.network.bytes_sent_rate", {}).get("latest", 0)
        if network_sent > 100 * 1024 * 1024:  # 100MB/s
            recommendations.append("网络发送流量过高,考虑数据压缩或批量处理")
        
        if not recommendations:
            recommendations.append("系统性能良好,继续保持")
        
        return recommendations

class AlertSeverity(Enum):
    """告警严重级别"""
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class Alert:
    """告警"""
    rule_name: str
    message: str
    severity: AlertSeverity
    timestamp: datetime
    metric_name: str
    current_value: Optional[float] = None
    threshold_value: Optional[float] = None

class AlertRule:
    """告警规则"""
    
    def __init__(self, name: str, metric_name: str, threshold: float,
                 comparison: str = ">", severity: AlertSeverity = AlertSeverity.WARNING,
                 duration: Optional[timedelta] = None):
        self.name = name
        self.metric_name = metric_name
        self.threshold = threshold
        self.comparison = comparison  # ">", "<", ">=", "<=", "=="
        self.severity = severity
        self.duration = duration  # 持续时间阈值
    
    def should_alert(self, metrics_collector: MetricsCollector, current_time: datetime) -> bool:
        """判断是否应该告警"""
        if self.duration:
            # 检查持续时间内的所有值
            start_time = current_time - self.duration
            points = metrics_collector.get_metric_values(self.metric_name, start_time, current_time)
            
            if not points:
                return False
            
            # 检查所有点是否都满足条件
            for point in points:
                if not self._check_threshold(point.value):
                    return False
            return True
        else:
            # 只检查最新值
            latest_value = metrics_collector.get_latest_value(self.metric_name)
            return latest_value is not None and self._check_threshold(latest_value)
    
    def _check_threshold(self, value: float) -> bool:
        """检查阈值"""
        if self.comparison == ">":
            return value > self.threshold
        elif self.comparison == "<":
            return value < self.threshold
        elif self.comparison == ">=":
            return value >= self.threshold
        elif self.comparison == "<=":
            return value <= self.threshold
        elif self.comparison == "==":
            return abs(value - self.threshold) < 0.001
        return False
    
    def get_alert_message(self, metrics_collector: MetricsCollector) -> str:
        """获取告警消息"""
        current_value = metrics_collector.get_latest_value(self.metric_name)
        return f"{self.metric_name} {self.comparison} {self.threshold} (当前值: {current_value})"

2. 性能分析工具

import cProfile
import pstats
import io
from typing import Dict, List, Any, Optional, Callable
from functools import wraps
import time
import tracemalloc
import linecache
import os

class ProfilerManager:
    """性能分析管理器"""
    
    def __init__(self):
        self.profilers: Dict[str, cProfile.Profile] = {}
        self.memory_snapshots: Dict[str, Any] = {}
        self.function_timings: Dict[str, List[float]] = defaultdict(list)
    
    def start_profiling(self, name: str = "default"):
        """开始性能分析"""
        if name in self.profilers:
            self.profilers[name].enable()
        else:
            profiler = cProfile.Profile()
            profiler.enable()
            self.profilers[name] = profiler
    
    def stop_profiling(self, name: str = "default") -> Optional[pstats.Stats]:
        """停止性能分析"""
        if name in self.profilers:
            profiler = self.profilers[name]
            profiler.disable()
            
            # 生成统计信息
            s = io.StringIO()
            stats = pstats.Stats(profiler, stream=s)
            return stats
        return None
    
    def get_profile_report(self, name: str = "default", sort_by: str = "cumulative",
                          limit: int = 20) -> str:
        """获取性能分析报告"""
        stats = self.stop_profiling(name)
        if not stats:
            return "没有找到性能分析数据"
        
        s = io.StringIO()
        stats.stream = s
        stats.sort_stats(sort_by)
        stats.print_stats(limit)
        
        return s.getvalue()
    
    def start_memory_profiling(self, name: str = "default"):
        """开始内存分析"""
        tracemalloc.start()
        snapshot = tracemalloc.take_snapshot()
        self.memory_snapshots[f"{name}_start"] = snapshot
    
    def stop_memory_profiling(self, name: str = "default") -> Dict[str, Any]:
        """停止内存分析"""
        if not tracemalloc.is_tracing():
            return {"error": "内存追踪未启动"}
        
        current_snapshot = tracemalloc.take_snapshot()
        start_snapshot = self.memory_snapshots.get(f"{name}_start")
        
        if start_snapshot:
            # 比较快照
            top_stats = current_snapshot.compare_to(start_snapshot, 'lineno')
            
            memory_report = {
                "total_memory_diff": sum(stat.size_diff for stat in top_stats),
                "top_differences": []
            }
            
            for stat in top_stats[:10]:
                memory_report["top_differences"].append({
                    "filename": stat.traceback.format()[0] if stat.traceback.format() else "unknown",
                    "size_diff": stat.size_diff,
                    "count_diff": stat.count_diff
                })
            
            return memory_report
        
        tracemalloc.stop()
        return {"error": "没有找到起始快照"}

def profile_function(profiler_manager: ProfilerManager, name: Optional[str] = None):
    """函数性能分析装饰器"""
    def decorator(func: Callable) -> Callable:
        profile_name = name or f"{func.__module__}.{func.__name__}"
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            
            try:
                profiler_manager.start_profiling(profile_name)
                result = func(*args, **kwargs)
                return result
            finally:
                execution_time = time.time() - start_time
                profiler_manager.function_timings[profile_name].append(execution_time)
                profiler_manager.stop_profiling(profile_name)
        
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start_time = time.time()
            
            try:
                profiler_manager.start_profiling(profile_name)
                result = await func(*args, **kwargs)
                return result
            finally:
                execution_time = time.time() - start_time
                profiler_manager.function_timings[profile_name].append(execution_time)
                profiler_manager.stop_profiling(profile_name)
        
        return async_wrapper if asyncio.iscoroutinefunction(func) else wrapper
    
    return decorator

def memory_profile(profiler_manager: ProfilerManager, name: Optional[str] = None):
    """内存分析装饰器"""
    def decorator(func: Callable) -> Callable:
        profile_name = name or f"{func.__module__}.{func.__name__}"
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            profiler_manager.start_memory_profiling(profile_name)
            try:
                result = func(*args, **kwargs)
                return result
            finally:
                memory_report = profiler_manager.stop_memory_profiling(profile_name)
                print(f"内存分析报告 - {profile_name}: {memory_report}")
        
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            profiler_manager.start_memory_profiling(profile_name)
            try:
                result = await func(*args, **kwargs)
                return result
            finally:
                memory_report = profiler_manager.stop_memory_profiling(profile_name)
                print(f"内存分析报告 - {profile_name}: {memory_report}")
        
        return async_wrapper if asyncio.iscoroutinefunction(func) else wrapper
    
    return decorator

本章总结

本章深入介绍了MCP协议的性能优化与监控策略:

性能优化方面:

  1. 连接池管理 - 实现了智能连接复用、健康检查和负载均衡
  2. 缓存系统 - 提供了多种缓存策略和压缩优化
  3. 异步处理 - 实现了任务队列、批处理和并发控制

监控系统方面:

  1. 指标收集 - 自动收集系统和自定义指标
  2. 性能监控 - 实时监控和智能告警
  3. 性能分析 - 提供详细的性能分析工具

这些优化和监控机制确保MCP服务器能够: - 高效处理大量并发请求 - 及时发现和解决性能瓶颈 - 提供稳定可靠的服务质量

下一章我们将学习MCP协议的部署与运维,包括容器化部署、集群管理和故障恢复等内容。