资源概念与类型

1. 资源基础概念

在MCP协议中,资源(Resource)是服务器可以提供给客户端的数据或内容。资源具有以下特征:

  • 唯一标识:每个资源都有唯一的URI
  • 类型化内容:资源包含特定类型的数据
  • 动态性:资源内容可能随时间变化
  • 可订阅性:客户端可以订阅资源变更通知
from typing import Optional, Dict, Any, List
from datetime import datetime
from enum import Enum
from pydantic import BaseModel, Field

class ResourceType(Enum):
    """资源类型枚举"""
    TEXT = "text"
    JSON = "application/json"
    XML = "application/xml"
    HTML = "text/html"
    MARKDOWN = "text/markdown"
    CSV = "text/csv"
    BINARY = "application/octet-stream"
    IMAGE = "image/*"
    AUDIO = "audio/*"
    VIDEO = "video/*"

class ResourceMetadata(BaseModel):
    """资源元数据"""
    created_at: datetime = Field(description="创建时间")
    updated_at: datetime = Field(description="更新时间")
    size: Optional[int] = Field(default=None, description="资源大小(字节)")
    checksum: Optional[str] = Field(default=None, description="资源校验和")
    version: Optional[str] = Field(default=None, description="资源版本")
    tags: List[str] = Field(default_factory=list, description="资源标签")
    custom_fields: Dict[str, Any] = Field(default_factory=dict, description="自定义字段")

class Resource(BaseModel):
    """资源模型"""
    uri: str = Field(description="资源URI")
    name: str = Field(description="资源名称")
    description: Optional[str] = Field(default=None, description="资源描述")
    mime_type: str = Field(description="MIME类型")
    content: Any = Field(description="资源内容")
    metadata: ResourceMetadata = Field(description="资源元数据")
    
    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

class ResourceReference(BaseModel):
    """资源引用"""
    uri: str = Field(description="资源URI")
    name: str = Field(description="资源名称")
    description: Optional[str] = Field(default=None, description="资源描述")
    mime_type: str = Field(description="MIME类型")
    metadata: ResourceMetadata = Field(description="资源元数据")

2. 资源URI设计

资源URI应该遵循一致的命名规范:

from urllib.parse import urlparse, parse_qs
import re

class ResourceURIBuilder:
    """资源URI构建器"""
    
    def __init__(self, base_scheme: str = "mcp"):
        self.base_scheme = base_scheme
    
    def build_file_uri(self, file_path: str) -> str:
        """构建文件资源URI"""
        # 规范化路径
        normalized_path = file_path.replace('\\', '/').lstrip('/')
        return f"{self.base_scheme}://file/{normalized_path}"
    
    def build_database_uri(self, database: str, table: str, 
                          record_id: Optional[str] = None) -> str:
        """构建数据库资源URI"""
        uri = f"{self.base_scheme}://database/{database}/{table}"
        if record_id:
            uri += f"/{record_id}"
        return uri
    
    def build_api_uri(self, service: str, endpoint: str, 
                     params: Optional[Dict[str, str]] = None) -> str:
        """构建API资源URI"""
        uri = f"{self.base_scheme}://api/{service}/{endpoint}"
        if params:
            query_string = '&'.join([f"{k}={v}" for k, v in params.items()])
            uri += f"?{query_string}"
        return uri
    
    def build_memory_uri(self, namespace: str, key: str) -> str:
        """构建内存资源URI"""
        return f"{self.base_scheme}://memory/{namespace}/{key}"
    
    def build_stream_uri(self, stream_type: str, stream_id: str) -> str:
        """构建流资源URI"""
        return f"{self.base_scheme}://stream/{stream_type}/{stream_id}"

class ResourceURIParser:
    """资源URI解析器"""
    
    @staticmethod
    def parse(uri: str) -> Dict[str, Any]:
        """解析资源URI"""
        parsed = urlparse(uri)
        
        result = {
            "scheme": parsed.scheme,
            "netloc": parsed.netloc,
            "path": parsed.path,
            "params": parse_qs(parsed.query),
            "fragment": parsed.fragment
        }
        
        # 解析路径组件
        path_parts = [part for part in parsed.path.split('/') if part]
        if path_parts:
            result["resource_type"] = path_parts[0]
            result["path_components"] = path_parts[1:]
        
        return result
    
    @staticmethod
    def validate_uri(uri: str) -> bool:
        """验证URI格式"""
        try:
            parsed = urlparse(uri)
            # 检查基本格式
            if not parsed.scheme or not parsed.path:
                return False
            
            # 检查路径格式
            path_pattern = r'^/[a-zA-Z0-9_-]+(/[a-zA-Z0-9_.-]+)*$'
            if not re.match(path_pattern, parsed.path):
                return False
            
            return True
        except Exception:
            return False

# 使用示例
uri_builder = ResourceURIBuilder()
uri_parser = ResourceURIParser()

# 构建各种类型的URI
file_uri = uri_builder.build_file_uri("/home/user/documents/report.pdf")
db_uri = uri_builder.build_database_uri("mydb", "users", "123")
api_uri = uri_builder.build_api_uri("weather", "current", {"city": "beijing"})

print(f"文件URI: {file_uri}")
print(f"数据库URI: {db_uri}")
print(f"API URI: {api_uri}")

# 解析URI
parsed = uri_parser.parse(file_uri)
print(f"解析结果: {parsed}")

资源管理器实现

1. 基础资源管理器

import asyncio
import hashlib
import json
from typing import Dict, List, Optional, Callable, Any
from abc import ABC, abstractmethod
from datetime import datetime
import weakref

class ResourceProvider(ABC):
    """资源提供者接口"""
    
    @abstractmethod
    async def get_resource(self, uri: str) -> Optional[Resource]:
        """获取资源"""
        pass
    
    @abstractmethod
    async def list_resources(self, pattern: Optional[str] = None) -> List[ResourceReference]:
        """列出资源"""
        pass
    
    @abstractmethod
    async def resource_exists(self, uri: str) -> bool:
        """检查资源是否存在"""
        pass
    
    @abstractmethod
    async def get_resource_metadata(self, uri: str) -> Optional[ResourceMetadata]:
        """获取资源元数据"""
        pass

class ResourceManager:
    """资源管理器"""
    
    def __init__(self):
        self.providers: Dict[str, ResourceProvider] = {}
        self.cache: Dict[str, Resource] = {}
        self.cache_ttl: Dict[str, datetime] = {}
        self.subscribers: Dict[str, List[Callable]] = {}
        self.uri_builder = ResourceURIBuilder()
        self.uri_parser = ResourceURIParser()
    
    def register_provider(self, resource_type: str, provider: ResourceProvider):
        """注册资源提供者"""
        self.providers[resource_type] = provider
    
    def _get_provider(self, uri: str) -> Optional[ResourceProvider]:
        """获取资源提供者"""
        parsed = self.uri_parser.parse(uri)
        resource_type = parsed.get("resource_type")
        return self.providers.get(resource_type)
    
    def _generate_cache_key(self, uri: str) -> str:
        """生成缓存键"""
        return hashlib.md5(uri.encode()).hexdigest()
    
    def _is_cache_valid(self, cache_key: str, ttl_seconds: int = 300) -> bool:
        """检查缓存是否有效"""
        if cache_key not in self.cache_ttl:
            return False
        
        cache_time = self.cache_ttl[cache_key]
        now = datetime.now()
        return (now - cache_time).total_seconds() < ttl_seconds
    
    async def get_resource(self, uri: str, use_cache: bool = True) -> Optional[Resource]:
        """获取资源"""
        # 验证URI
        if not self.uri_parser.validate_uri(uri):
            raise ValueError(f"无效的资源URI: {uri}")
        
        # 检查缓存
        cache_key = self._generate_cache_key(uri)
        if use_cache and self._is_cache_valid(cache_key):
            return self.cache.get(cache_key)
        
        # 获取提供者
        provider = self._get_provider(uri)
        if not provider:
            return None
        
        # 获取资源
        resource = await provider.get_resource(uri)
        
        # 更新缓存
        if resource and use_cache:
            self.cache[cache_key] = resource
            self.cache_ttl[cache_key] = datetime.now()
        
        return resource
    
    async def list_resources(self, resource_type: Optional[str] = None, 
                           pattern: Optional[str] = None) -> List[ResourceReference]:
        """列出资源"""
        all_resources = []
        
        providers_to_query = (
            {resource_type: self.providers[resource_type]} 
            if resource_type and resource_type in self.providers
            else self.providers
        )
        
        for provider in providers_to_query.values():
            try:
                resources = await provider.list_resources(pattern)
                all_resources.extend(resources)
            except Exception as e:
                print(f"提供者查询失败: {e}")
        
        return all_resources
    
    async def resource_exists(self, uri: str) -> bool:
        """检查资源是否存在"""
        provider = self._get_provider(uri)
        if not provider:
            return False
        
        return await provider.resource_exists(uri)
    
    async def get_resource_metadata(self, uri: str) -> Optional[ResourceMetadata]:
        """获取资源元数据"""
        provider = self._get_provider(uri)
        if not provider:
            return None
        
        return await provider.get_resource_metadata(uri)
    
    def clear_cache(self, uri: Optional[str] = None):
        """清理缓存"""
        if uri:
            cache_key = self._generate_cache_key(uri)
            self.cache.pop(cache_key, None)
            self.cache_ttl.pop(cache_key, None)
        else:
            self.cache.clear()
            self.cache_ttl.clear()
    
    def get_cache_stats(self) -> Dict[str, Any]:
        """获取缓存统计"""
        total_entries = len(self.cache)
        valid_entries = sum(
            1 for key in self.cache.keys()
            if self._is_cache_valid(key)
        )
        
        return {
            "total_entries": total_entries,
            "valid_entries": valid_entries,
            "expired_entries": total_entries - valid_entries,
            "cache_hit_ratio": valid_entries / total_entries if total_entries > 0 else 0
        }

2. 文件资源提供者

import os
import aiofiles
import mimetypes
from pathlib import Path

class FileResourceProvider(ResourceProvider):
    """文件资源提供者"""
    
    def __init__(self, base_path: str = "/", allowed_extensions: Optional[List[str]] = None):
        self.base_path = Path(base_path).resolve()
        self.allowed_extensions = allowed_extensions or []
    
    def _resolve_path(self, uri: str) -> Path:
        """解析URI到文件路径"""
        parsed = ResourceURIParser.parse(uri)
        path_components = parsed.get("path_components", [])
        
        # 构建相对路径
        relative_path = Path(*path_components) if path_components else Path()
        
        # 解析为绝对路径
        full_path = (self.base_path / relative_path).resolve()
        
        # 安全检查:确保路径在基础目录内
        if not str(full_path).startswith(str(self.base_path)):
            raise ValueError(f"路径访问被拒绝: {full_path}")
        
        return full_path
    
    def _is_allowed_file(self, file_path: Path) -> bool:
        """检查文件是否被允许访问"""
        if not self.allowed_extensions:
            return True
        
        return file_path.suffix.lower() in self.allowed_extensions
    
    def _calculate_checksum(self, file_path: Path) -> str:
        """计算文件校验和"""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    
    async def get_resource(self, uri: str) -> Optional[Resource]:
        """获取文件资源"""
        try:
            file_path = self._resolve_path(uri)
            
            if not file_path.exists() or not file_path.is_file():
                return None
            
            if not self._is_allowed_file(file_path):
                return None
            
            # 获取文件信息
            stat = file_path.stat()
            mime_type, _ = mimetypes.guess_type(str(file_path))
            mime_type = mime_type or "application/octet-stream"
            
            # 读取文件内容
            if mime_type.startswith('text/') or mime_type in ['application/json', 'application/xml']:
                async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
                    content = await f.read()
            else:
                async with aiofiles.open(file_path, 'rb') as f:
                    content = await f.read()
            
            # 创建元数据
            metadata = ResourceMetadata(
                created_at=datetime.fromtimestamp(stat.st_ctime),
                updated_at=datetime.fromtimestamp(stat.st_mtime),
                size=stat.st_size,
                checksum=self._calculate_checksum(file_path),
                tags=["file", file_path.suffix.lstrip('.') if file_path.suffix else "no-extension"]
            )
            
            return Resource(
                uri=uri,
                name=file_path.name,
                description=f"文件: {file_path}",
                mime_type=mime_type,
                content=content,
                metadata=metadata
            )
        
        except Exception as e:
            print(f"获取文件资源失败: {e}")
            return None
    
    async def list_resources(self, pattern: Optional[str] = None) -> List[ResourceReference]:
        """列出文件资源"""
        resources = []
        
        try:
            for file_path in self.base_path.rglob(pattern or "*"):
                if file_path.is_file() and self._is_allowed_file(file_path):
                    # 构建URI
                    relative_path = file_path.relative_to(self.base_path)
                    uri = ResourceURIBuilder().build_file_uri(str(relative_path))
                    
                    # 获取文件信息
                    stat = file_path.stat()
                    mime_type, _ = mimetypes.guess_type(str(file_path))
                    mime_type = mime_type or "application/octet-stream"
                    
                    metadata = ResourceMetadata(
                        created_at=datetime.fromtimestamp(stat.st_ctime),
                        updated_at=datetime.fromtimestamp(stat.st_mtime),
                        size=stat.st_size,
                        tags=["file", file_path.suffix.lstrip('.') if file_path.suffix else "no-extension"]
                    )
                    
                    resources.append(ResourceReference(
                        uri=uri,
                        name=file_path.name,
                        description=f"文件: {file_path}",
                        mime_type=mime_type,
                        metadata=metadata
                    ))
        
        except Exception as e:
            print(f"列出文件资源失败: {e}")
        
        return resources
    
    async def resource_exists(self, uri: str) -> bool:
        """检查文件资源是否存在"""
        try:
            file_path = self._resolve_path(uri)
            return file_path.exists() and file_path.is_file() and self._is_allowed_file(file_path)
        except Exception:
            return False
    
    async def get_resource_metadata(self, uri: str) -> Optional[ResourceMetadata]:
        """获取文件资源元数据"""
        try:
            file_path = self._resolve_path(uri)
            
            if not file_path.exists() or not file_path.is_file():
                return None
            
            stat = file_path.stat()
            
            return ResourceMetadata(
                created_at=datetime.fromtimestamp(stat.st_ctime),
                updated_at=datetime.fromtimestamp(stat.st_mtime),
                size=stat.st_size,
                checksum=self._calculate_checksum(file_path),
                tags=["file", file_path.suffix.lstrip('.') if file_path.suffix else "no-extension"]
            )
        
        except Exception:
            return None

3. 内存资源提供者

from typing import Dict, Any
import json

class MemoryResourceProvider(ResourceProvider):
    """内存资源提供者"""
    
    def __init__(self):
        self.storage: Dict[str, Dict[str, Any]] = {}
        self.metadata_storage: Dict[str, ResourceMetadata] = {}
    
    def _parse_memory_uri(self, uri: str) -> tuple[str, str]:
        """解析内存URI"""
        parsed = ResourceURIParser.parse(uri)
        path_components = parsed.get("path_components", [])
        
        if len(path_components) < 2:
            raise ValueError(f"无效的内存URI格式: {uri}")
        
        namespace = path_components[0]
        key = '/'.join(path_components[1:])
        
        return namespace, key
    
    def _get_storage_key(self, namespace: str, key: str) -> str:
        """获取存储键"""
        return f"{namespace}:{key}"
    
    async def set_resource(self, uri: str, content: Any, 
                          mime_type: str = "application/json",
                          description: Optional[str] = None,
                          tags: Optional[List[str]] = None) -> None:
        """设置内存资源"""
        namespace, key = self._parse_memory_uri(uri)
        storage_key = self._get_storage_key(namespace, key)
        
        # 序列化内容
        if isinstance(content, (dict, list)):
            serialized_content = json.dumps(content, ensure_ascii=False, indent=2)
            if mime_type == "application/json":
                pass  # 保持JSON类型
        elif isinstance(content, str):
            serialized_content = content
        else:
            serialized_content = str(content)
            mime_type = "text/plain"
        
        # 存储资源
        self.storage[storage_key] = {
            "uri": uri,
            "namespace": namespace,
            "key": key,
            "content": serialized_content,
            "original_content": content,
            "mime_type": mime_type,
            "description": description or f"内存资源: {namespace}/{key}"
        }
        
        # 创建元数据
        now = datetime.now()
        content_size = len(serialized_content.encode('utf-8'))
        
        self.metadata_storage[storage_key] = ResourceMetadata(
            created_at=now,
            updated_at=now,
            size=content_size,
            checksum=hashlib.md5(serialized_content.encode()).hexdigest(),
            tags=tags or ["memory", namespace]
        )
    
    async def get_resource(self, uri: str) -> Optional[Resource]:
        """获取内存资源"""
        try:
            namespace, key = self._parse_memory_uri(uri)
            storage_key = self._get_storage_key(namespace, key)
            
            if storage_key not in self.storage:
                return None
            
            data = self.storage[storage_key]
            metadata = self.metadata_storage[storage_key]
            
            return Resource(
                uri=uri,
                name=key,
                description=data["description"],
                mime_type=data["mime_type"],
                content=data["content"],
                metadata=metadata
            )
        
        except Exception as e:
            print(f"获取内存资源失败: {e}")
            return None
    
    async def list_resources(self, pattern: Optional[str] = None) -> List[ResourceReference]:
        """列出内存资源"""
        resources = []
        
        for storage_key, data in self.storage.items():
            if pattern and pattern not in data["key"]:
                continue
            
            metadata = self.metadata_storage[storage_key]
            
            resources.append(ResourceReference(
                uri=data["uri"],
                name=data["key"],
                description=data["description"],
                mime_type=data["mime_type"],
                metadata=metadata
            ))
        
        return resources
    
    async def resource_exists(self, uri: str) -> bool:
        """检查内存资源是否存在"""
        try:
            namespace, key = self._parse_memory_uri(uri)
            storage_key = self._get_storage_key(namespace, key)
            return storage_key in self.storage
        except Exception:
            return False
    
    async def get_resource_metadata(self, uri: str) -> Optional[ResourceMetadata]:
        """获取内存资源元数据"""
        try:
            namespace, key = self._parse_memory_uri(uri)
            storage_key = self._get_storage_key(namespace, key)
            return self.metadata_storage.get(storage_key)
        except Exception:
            return None
    
    async def delete_resource(self, uri: str) -> bool:
        """删除内存资源"""
        try:
            namespace, key = self._parse_memory_uri(uri)
            storage_key = self._get_storage_key(namespace, key)
            
            if storage_key in self.storage:
                del self.storage[storage_key]
                del self.metadata_storage[storage_key]
                return True
            
            return False
        except Exception:
            return False
    
    def clear_namespace(self, namespace: str) -> int:
        """清空命名空间"""
        cleared_count = 0
        keys_to_remove = []
        
        for storage_key, data in self.storage.items():
            if data["namespace"] == namespace:
                keys_to_remove.append(storage_key)
        
        for key in keys_to_remove:
            del self.storage[key]
            del self.metadata_storage[key]
            cleared_count += 1
        
        return cleared_count
    
    def get_namespaces(self) -> List[str]:
        """获取所有命名空间"""
        namespaces = set()
        for data in self.storage.values():
            namespaces.add(data["namespace"])
        return list(namespaces)

订阅机制实现

1. 资源订阅管理器

import asyncio
from typing import Set, Callable, Dict, List, Any
from dataclasses import dataclass
from enum import Enum
import weakref

class SubscriptionEventType(Enum):
    """订阅事件类型"""
    RESOURCE_CREATED = "resource_created"
    RESOURCE_UPDATED = "resource_updated"
    RESOURCE_DELETED = "resource_deleted"
    RESOURCE_ACCESSED = "resource_accessed"

@dataclass
class SubscriptionEvent:
    """订阅事件"""
    event_type: SubscriptionEventType
    resource_uri: str
    timestamp: datetime
    data: Optional[Dict[str, Any]] = None
    metadata: Optional[ResourceMetadata] = None

class ResourceSubscription:
    """资源订阅"""
    
    def __init__(self, 
                 subscription_id: str,
                 resource_uri: str,
                 event_types: Set[SubscriptionEventType],
                 callback: Callable[[SubscriptionEvent], None]):
        self.subscription_id = subscription_id
        self.resource_uri = resource_uri
        self.event_types = event_types
        self.callback = callback
        self.created_at = datetime.now()
        self.last_event_at: Optional[datetime] = None
        self.event_count = 0
        self.is_active = True

class SubscriptionManager:
    """订阅管理器"""
    
    def __init__(self):
        self.subscriptions: Dict[str, ResourceSubscription] = {}
        self.uri_subscriptions: Dict[str, Set[str]] = {}  # URI -> subscription_ids
        self.event_queue: asyncio.Queue = asyncio.Queue()
        self.event_processor_task: Optional[asyncio.Task] = None
        self.is_running = False
    
    def start(self):
        """启动订阅管理器"""
        if not self.is_running:
            self.is_running = True
            self.event_processor_task = asyncio.create_task(self._process_events())
    
    async def stop(self):
        """停止订阅管理器"""
        self.is_running = False
        if self.event_processor_task:
            self.event_processor_task.cancel()
            try:
                await self.event_processor_task
            except asyncio.CancelledError:
                pass
    
    def subscribe(self, 
                 resource_uri: str,
                 event_types: Set[SubscriptionEventType],
                 callback: Callable[[SubscriptionEvent], None]) -> str:
        """订阅资源"""
        subscription_id = f"sub_{int(datetime.now().timestamp() * 1000)}_{len(self.subscriptions)}"
        
        subscription = ResourceSubscription(
            subscription_id=subscription_id,
            resource_uri=resource_uri,
            event_types=event_types,
            callback=callback
        )
        
        self.subscriptions[subscription_id] = subscription
        
        # 更新URI索引
        if resource_uri not in self.uri_subscriptions:
            self.uri_subscriptions[resource_uri] = set()
        self.uri_subscriptions[resource_uri].add(subscription_id)
        
        return subscription_id
    
    def unsubscribe(self, subscription_id: str) -> bool:
        """取消订阅"""
        if subscription_id not in self.subscriptions:
            return False
        
        subscription = self.subscriptions[subscription_id]
        resource_uri = subscription.resource_uri
        
        # 移除订阅
        del self.subscriptions[subscription_id]
        
        # 更新URI索引
        if resource_uri in self.uri_subscriptions:
            self.uri_subscriptions[resource_uri].discard(subscription_id)
            if not self.uri_subscriptions[resource_uri]:
                del self.uri_subscriptions[resource_uri]
        
        return True
    
    def unsubscribe_all(self, resource_uri: str) -> int:
        """取消资源的所有订阅"""
        if resource_uri not in self.uri_subscriptions:
            return 0
        
        subscription_ids = list(self.uri_subscriptions[resource_uri])
        count = 0
        
        for subscription_id in subscription_ids:
            if self.unsubscribe(subscription_id):
                count += 1
        
        return count
    
    async def notify(self, event: SubscriptionEvent):
        """通知订阅者"""
        await self.event_queue.put(event)
    
    async def _process_events(self):
        """处理事件队列"""
        while self.is_running:
            try:
                # 等待事件,设置超时避免无限阻塞
                event = await asyncio.wait_for(self.event_queue.get(), timeout=1.0)
                await self._dispatch_event(event)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"处理事件时出错: {e}")
    
    async def _dispatch_event(self, event: SubscriptionEvent):
        """分发事件给订阅者"""
        resource_uri = event.resource_uri
        
        if resource_uri not in self.uri_subscriptions:
            return
        
        subscription_ids = list(self.uri_subscriptions[resource_uri])
        
        for subscription_id in subscription_ids:
            subscription = self.subscriptions.get(subscription_id)
            
            if not subscription or not subscription.is_active:
                continue
            
            if event.event_type not in subscription.event_types:
                continue
            
            try:
                # 异步调用回调函数
                if asyncio.iscoroutinefunction(subscription.callback):
                    await subscription.callback(event)
                else:
                    subscription.callback(event)
                
                # 更新订阅统计
                subscription.last_event_at = datetime.now()
                subscription.event_count += 1
                
            except Exception as e:
                print(f"订阅回调执行失败 {subscription_id}: {e}")
                # 可以选择禁用有问题的订阅
                # subscription.is_active = False
    
    def get_subscription_stats(self) -> Dict[str, Any]:
        """获取订阅统计信息"""
        active_subscriptions = sum(1 for s in self.subscriptions.values() if s.is_active)
        total_events = sum(s.event_count for s in self.subscriptions.values())
        
        # 按资源URI分组统计
        uri_stats = {}
        for uri, subscription_ids in self.uri_subscriptions.items():
            active_count = sum(
                1 for sid in subscription_ids 
                if sid in self.subscriptions and self.subscriptions[sid].is_active
            )
            uri_stats[uri] = {
                "total_subscriptions": len(subscription_ids),
                "active_subscriptions": active_count
            }
        
        return {
            "total_subscriptions": len(self.subscriptions),
            "active_subscriptions": active_subscriptions,
            "total_events_processed": total_events,
            "subscribed_resources": len(self.uri_subscriptions),
            "uri_statistics": uri_stats
        }
    
    def get_subscriptions_for_resource(self, resource_uri: str) -> List[Dict[str, Any]]:
        """获取资源的所有订阅信息"""
        if resource_uri not in self.uri_subscriptions:
            return []
        
        subscription_ids = self.uri_subscriptions[resource_uri]
        subscriptions_info = []
        
        for subscription_id in subscription_ids:
            subscription = self.subscriptions.get(subscription_id)
            if subscription:
                subscriptions_info.append({
                    "subscription_id": subscription_id,
                    "event_types": [et.value for et in subscription.event_types],
                    "created_at": subscription.created_at.isoformat(),
                    "last_event_at": subscription.last_event_at.isoformat() if subscription.last_event_at else None,
                    "event_count": subscription.event_count,
                    "is_active": subscription.is_active
                })
        
        return subscriptions_info

2. 资源变更监控

import asyncio
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from typing import Dict, Set

class FileSystemWatcher(FileSystemEventHandler):
    """文件系统监控器"""
    
    def __init__(self, subscription_manager: SubscriptionManager, 
                 uri_builder: ResourceURIBuilder):
        self.subscription_manager = subscription_manager
        self.uri_builder = uri_builder
        self.watched_paths: Dict[str, str] = {}  # path -> base_path
    
    def on_created(self, event):
        """文件创建事件"""
        if not event.is_directory:
            asyncio.create_task(self._handle_file_event(
                event.src_path, SubscriptionEventType.RESOURCE_CREATED
            ))
    
    def on_modified(self, event):
        """文件修改事件"""
        if not event.is_directory:
            asyncio.create_task(self._handle_file_event(
                event.src_path, SubscriptionEventType.RESOURCE_UPDATED
            ))
    
    def on_deleted(self, event):
        """文件删除事件"""
        if not event.is_directory:
            asyncio.create_task(self._handle_file_event(
                event.src_path, SubscriptionEventType.RESOURCE_DELETED
            ))
    
    async def _handle_file_event(self, file_path: str, event_type: SubscriptionEventType):
        """处理文件事件"""
        try:
            # 找到对应的基础路径
            base_path = None
            for watched_path, watched_base in self.watched_paths.items():
                if file_path.startswith(watched_path):
                    base_path = watched_base
                    break
            
            if not base_path:
                return
            
            # 构建资源URI
            relative_path = os.path.relpath(file_path, base_path)
            uri = self.uri_builder.build_file_uri(relative_path)
            
            # 创建事件
            event = SubscriptionEvent(
                event_type=event_type,
                resource_uri=uri,
                timestamp=datetime.now(),
                data={"file_path": file_path}
            )
            
            # 通知订阅者
            await self.subscription_manager.notify(event)
            
        except Exception as e:
            print(f"处理文件事件失败: {e}")

class ResourceMonitor:
    """资源监控器"""
    
    def __init__(self, subscription_manager: SubscriptionManager):
        self.subscription_manager = subscription_manager
        self.uri_builder = ResourceURIBuilder()
        self.file_observer = Observer()
        self.file_watcher = FileSystemWatcher(subscription_manager, self.uri_builder)
        self.watched_directories: Set[str] = set()
        self.is_monitoring = False
    
    def start_monitoring(self):
        """开始监控"""
        if not self.is_monitoring:
            self.file_observer.start()
            self.is_monitoring = True
    
    def stop_monitoring(self):
        """停止监控"""
        if self.is_monitoring:
            self.file_observer.stop()
            self.file_observer.join()
            self.is_monitoring = False
    
    def watch_directory(self, directory_path: str, recursive: bool = True):
        """监控目录"""
        if directory_path not in self.watched_directories:
            self.file_observer.schedule(
                self.file_watcher, 
                directory_path, 
                recursive=recursive
            )
            self.watched_directories.add(directory_path)
            self.file_watcher.watched_paths[directory_path] = directory_path
    
    def unwatch_directory(self, directory_path: str):
        """停止监控目录"""
        if directory_path in self.watched_directories:
            # watchdog 不提供直接的unschedule方法,需要重新创建observer
            self.watched_directories.discard(directory_path)
            self.file_watcher.watched_paths.pop(directory_path, None)
    
    async def trigger_manual_event(self, resource_uri: str, 
                                  event_type: SubscriptionEventType,
                                  data: Optional[Dict[str, Any]] = None):
        """手动触发事件"""
        event = SubscriptionEvent(
            event_type=event_type,
            resource_uri=resource_uri,
            timestamp=datetime.now(),
            data=data
        )
        
        await self.subscription_manager.notify(event)

本章总结

本章详细介绍了MCP协议中的资源管理与订阅机制,包括:

核心内容

  1. 资源概念与类型

    • 资源基础概念和特征
    • 资源URI设计规范
    • 资源元数据管理
  2. 资源管理器实现

    • 基础资源管理器架构
    • 文件资源提供者
    • 内存资源提供者
    • 缓存机制
  3. 订阅机制实现

    • 资源订阅管理器
    • 事件类型和处理
    • 异步事件分发
  4. 资源变更监控

    • 文件系统监控
    • 自动事件触发
    • 手动事件管理

最佳实践

  1. 资源设计

    • 使用一致的URI命名规范
    • 合理设计资源元数据
    • 考虑资源的生命周期
  2. 性能优化

    • 实现适当的缓存策略
    • 使用异步处理机制
    • 避免过度订阅
  3. 错误处理

    • 处理资源不存在的情况
    • 优雅处理订阅回调错误
    • 实现重试机制
  4. 监控和维护

    • 监控订阅统计信息
    • 定期清理无效订阅
    • 记录资源访问日志

下一章我们将学习提示模板与动态生成,了解如何在MCP协议中管理和使用提示模板。