资源概念与类型
1. 资源基础概念
在MCP协议中,资源(Resource)是服务器可以提供给客户端的数据或内容。资源具有以下特征:
- 唯一标识:每个资源都有唯一的URI
- 类型化内容:资源包含特定类型的数据
- 动态性:资源内容可能随时间变化
- 可订阅性:客户端可以订阅资源变更通知
from typing import Optional, Dict, Any, List
from datetime import datetime
from enum import Enum
from pydantic import BaseModel, Field
class ResourceType(Enum):
"""资源类型枚举"""
TEXT = "text"
JSON = "application/json"
XML = "application/xml"
HTML = "text/html"
MARKDOWN = "text/markdown"
CSV = "text/csv"
BINARY = "application/octet-stream"
IMAGE = "image/*"
AUDIO = "audio/*"
VIDEO = "video/*"
class ResourceMetadata(BaseModel):
"""资源元数据"""
created_at: datetime = Field(description="创建时间")
updated_at: datetime = Field(description="更新时间")
size: Optional[int] = Field(default=None, description="资源大小(字节)")
checksum: Optional[str] = Field(default=None, description="资源校验和")
version: Optional[str] = Field(default=None, description="资源版本")
tags: List[str] = Field(default_factory=list, description="资源标签")
custom_fields: Dict[str, Any] = Field(default_factory=dict, description="自定义字段")
class Resource(BaseModel):
"""资源模型"""
uri: str = Field(description="资源URI")
name: str = Field(description="资源名称")
description: Optional[str] = Field(default=None, description="资源描述")
mime_type: str = Field(description="MIME类型")
content: Any = Field(description="资源内容")
metadata: ResourceMetadata = Field(description="资源元数据")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
class ResourceReference(BaseModel):
"""资源引用"""
uri: str = Field(description="资源URI")
name: str = Field(description="资源名称")
description: Optional[str] = Field(default=None, description="资源描述")
mime_type: str = Field(description="MIME类型")
metadata: ResourceMetadata = Field(description="资源元数据")
2. 资源URI设计
资源URI应该遵循一致的命名规范:
from urllib.parse import urlparse, parse_qs
import re
class ResourceURIBuilder:
"""资源URI构建器"""
def __init__(self, base_scheme: str = "mcp"):
self.base_scheme = base_scheme
def build_file_uri(self, file_path: str) -> str:
"""构建文件资源URI"""
# 规范化路径
normalized_path = file_path.replace('\\', '/').lstrip('/')
return f"{self.base_scheme}://file/{normalized_path}"
def build_database_uri(self, database: str, table: str,
record_id: Optional[str] = None) -> str:
"""构建数据库资源URI"""
uri = f"{self.base_scheme}://database/{database}/{table}"
if record_id:
uri += f"/{record_id}"
return uri
def build_api_uri(self, service: str, endpoint: str,
params: Optional[Dict[str, str]] = None) -> str:
"""构建API资源URI"""
uri = f"{self.base_scheme}://api/{service}/{endpoint}"
if params:
query_string = '&'.join([f"{k}={v}" for k, v in params.items()])
uri += f"?{query_string}"
return uri
def build_memory_uri(self, namespace: str, key: str) -> str:
"""构建内存资源URI"""
return f"{self.base_scheme}://memory/{namespace}/{key}"
def build_stream_uri(self, stream_type: str, stream_id: str) -> str:
"""构建流资源URI"""
return f"{self.base_scheme}://stream/{stream_type}/{stream_id}"
class ResourceURIParser:
"""资源URI解析器"""
@staticmethod
def parse(uri: str) -> Dict[str, Any]:
"""解析资源URI"""
parsed = urlparse(uri)
result = {
"scheme": parsed.scheme,
"netloc": parsed.netloc,
"path": parsed.path,
"params": parse_qs(parsed.query),
"fragment": parsed.fragment
}
# 解析路径组件
path_parts = [part for part in parsed.path.split('/') if part]
if path_parts:
result["resource_type"] = path_parts[0]
result["path_components"] = path_parts[1:]
return result
@staticmethod
def validate_uri(uri: str) -> bool:
"""验证URI格式"""
try:
parsed = urlparse(uri)
# 检查基本格式
if not parsed.scheme or not parsed.path:
return False
# 检查路径格式
path_pattern = r'^/[a-zA-Z0-9_-]+(/[a-zA-Z0-9_.-]+)*$'
if not re.match(path_pattern, parsed.path):
return False
return True
except Exception:
return False
# 使用示例
uri_builder = ResourceURIBuilder()
uri_parser = ResourceURIParser()
# 构建各种类型的URI
file_uri = uri_builder.build_file_uri("/home/user/documents/report.pdf")
db_uri = uri_builder.build_database_uri("mydb", "users", "123")
api_uri = uri_builder.build_api_uri("weather", "current", {"city": "beijing"})
print(f"文件URI: {file_uri}")
print(f"数据库URI: {db_uri}")
print(f"API URI: {api_uri}")
# 解析URI
parsed = uri_parser.parse(file_uri)
print(f"解析结果: {parsed}")
资源管理器实现
1. 基础资源管理器
import asyncio
import hashlib
import json
from typing import Dict, List, Optional, Callable, Any
from abc import ABC, abstractmethod
from datetime import datetime
import weakref
class ResourceProvider(ABC):
"""资源提供者接口"""
@abstractmethod
async def get_resource(self, uri: str) -> Optional[Resource]:
"""获取资源"""
pass
@abstractmethod
async def list_resources(self, pattern: Optional[str] = None) -> List[ResourceReference]:
"""列出资源"""
pass
@abstractmethod
async def resource_exists(self, uri: str) -> bool:
"""检查资源是否存在"""
pass
@abstractmethod
async def get_resource_metadata(self, uri: str) -> Optional[ResourceMetadata]:
"""获取资源元数据"""
pass
class ResourceManager:
"""资源管理器"""
def __init__(self):
self.providers: Dict[str, ResourceProvider] = {}
self.cache: Dict[str, Resource] = {}
self.cache_ttl: Dict[str, datetime] = {}
self.subscribers: Dict[str, List[Callable]] = {}
self.uri_builder = ResourceURIBuilder()
self.uri_parser = ResourceURIParser()
def register_provider(self, resource_type: str, provider: ResourceProvider):
"""注册资源提供者"""
self.providers[resource_type] = provider
def _get_provider(self, uri: str) -> Optional[ResourceProvider]:
"""获取资源提供者"""
parsed = self.uri_parser.parse(uri)
resource_type = parsed.get("resource_type")
return self.providers.get(resource_type)
def _generate_cache_key(self, uri: str) -> str:
"""生成缓存键"""
return hashlib.md5(uri.encode()).hexdigest()
def _is_cache_valid(self, cache_key: str, ttl_seconds: int = 300) -> bool:
"""检查缓存是否有效"""
if cache_key not in self.cache_ttl:
return False
cache_time = self.cache_ttl[cache_key]
now = datetime.now()
return (now - cache_time).total_seconds() < ttl_seconds
async def get_resource(self, uri: str, use_cache: bool = True) -> Optional[Resource]:
"""获取资源"""
# 验证URI
if not self.uri_parser.validate_uri(uri):
raise ValueError(f"无效的资源URI: {uri}")
# 检查缓存
cache_key = self._generate_cache_key(uri)
if use_cache and self._is_cache_valid(cache_key):
return self.cache.get(cache_key)
# 获取提供者
provider = self._get_provider(uri)
if not provider:
return None
# 获取资源
resource = await provider.get_resource(uri)
# 更新缓存
if resource and use_cache:
self.cache[cache_key] = resource
self.cache_ttl[cache_key] = datetime.now()
return resource
async def list_resources(self, resource_type: Optional[str] = None,
pattern: Optional[str] = None) -> List[ResourceReference]:
"""列出资源"""
all_resources = []
providers_to_query = (
{resource_type: self.providers[resource_type]}
if resource_type and resource_type in self.providers
else self.providers
)
for provider in providers_to_query.values():
try:
resources = await provider.list_resources(pattern)
all_resources.extend(resources)
except Exception as e:
print(f"提供者查询失败: {e}")
return all_resources
async def resource_exists(self, uri: str) -> bool:
"""检查资源是否存在"""
provider = self._get_provider(uri)
if not provider:
return False
return await provider.resource_exists(uri)
async def get_resource_metadata(self, uri: str) -> Optional[ResourceMetadata]:
"""获取资源元数据"""
provider = self._get_provider(uri)
if not provider:
return None
return await provider.get_resource_metadata(uri)
def clear_cache(self, uri: Optional[str] = None):
"""清理缓存"""
if uri:
cache_key = self._generate_cache_key(uri)
self.cache.pop(cache_key, None)
self.cache_ttl.pop(cache_key, None)
else:
self.cache.clear()
self.cache_ttl.clear()
def get_cache_stats(self) -> Dict[str, Any]:
"""获取缓存统计"""
total_entries = len(self.cache)
valid_entries = sum(
1 for key in self.cache.keys()
if self._is_cache_valid(key)
)
return {
"total_entries": total_entries,
"valid_entries": valid_entries,
"expired_entries": total_entries - valid_entries,
"cache_hit_ratio": valid_entries / total_entries if total_entries > 0 else 0
}
2. 文件资源提供者
import os
import aiofiles
import mimetypes
from pathlib import Path
class FileResourceProvider(ResourceProvider):
"""文件资源提供者"""
def __init__(self, base_path: str = "/", allowed_extensions: Optional[List[str]] = None):
self.base_path = Path(base_path).resolve()
self.allowed_extensions = allowed_extensions or []
def _resolve_path(self, uri: str) -> Path:
"""解析URI到文件路径"""
parsed = ResourceURIParser.parse(uri)
path_components = parsed.get("path_components", [])
# 构建相对路径
relative_path = Path(*path_components) if path_components else Path()
# 解析为绝对路径
full_path = (self.base_path / relative_path).resolve()
# 安全检查:确保路径在基础目录内
if not str(full_path).startswith(str(self.base_path)):
raise ValueError(f"路径访问被拒绝: {full_path}")
return full_path
def _is_allowed_file(self, file_path: Path) -> bool:
"""检查文件是否被允许访问"""
if not self.allowed_extensions:
return True
return file_path.suffix.lower() in self.allowed_extensions
def _calculate_checksum(self, file_path: Path) -> str:
"""计算文件校验和"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
async def get_resource(self, uri: str) -> Optional[Resource]:
"""获取文件资源"""
try:
file_path = self._resolve_path(uri)
if not file_path.exists() or not file_path.is_file():
return None
if not self._is_allowed_file(file_path):
return None
# 获取文件信息
stat = file_path.stat()
mime_type, _ = mimetypes.guess_type(str(file_path))
mime_type = mime_type or "application/octet-stream"
# 读取文件内容
if mime_type.startswith('text/') or mime_type in ['application/json', 'application/xml']:
async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
content = await f.read()
else:
async with aiofiles.open(file_path, 'rb') as f:
content = await f.read()
# 创建元数据
metadata = ResourceMetadata(
created_at=datetime.fromtimestamp(stat.st_ctime),
updated_at=datetime.fromtimestamp(stat.st_mtime),
size=stat.st_size,
checksum=self._calculate_checksum(file_path),
tags=["file", file_path.suffix.lstrip('.') if file_path.suffix else "no-extension"]
)
return Resource(
uri=uri,
name=file_path.name,
description=f"文件: {file_path}",
mime_type=mime_type,
content=content,
metadata=metadata
)
except Exception as e:
print(f"获取文件资源失败: {e}")
return None
async def list_resources(self, pattern: Optional[str] = None) -> List[ResourceReference]:
"""列出文件资源"""
resources = []
try:
for file_path in self.base_path.rglob(pattern or "*"):
if file_path.is_file() and self._is_allowed_file(file_path):
# 构建URI
relative_path = file_path.relative_to(self.base_path)
uri = ResourceURIBuilder().build_file_uri(str(relative_path))
# 获取文件信息
stat = file_path.stat()
mime_type, _ = mimetypes.guess_type(str(file_path))
mime_type = mime_type or "application/octet-stream"
metadata = ResourceMetadata(
created_at=datetime.fromtimestamp(stat.st_ctime),
updated_at=datetime.fromtimestamp(stat.st_mtime),
size=stat.st_size,
tags=["file", file_path.suffix.lstrip('.') if file_path.suffix else "no-extension"]
)
resources.append(ResourceReference(
uri=uri,
name=file_path.name,
description=f"文件: {file_path}",
mime_type=mime_type,
metadata=metadata
))
except Exception as e:
print(f"列出文件资源失败: {e}")
return resources
async def resource_exists(self, uri: str) -> bool:
"""检查文件资源是否存在"""
try:
file_path = self._resolve_path(uri)
return file_path.exists() and file_path.is_file() and self._is_allowed_file(file_path)
except Exception:
return False
async def get_resource_metadata(self, uri: str) -> Optional[ResourceMetadata]:
"""获取文件资源元数据"""
try:
file_path = self._resolve_path(uri)
if not file_path.exists() or not file_path.is_file():
return None
stat = file_path.stat()
return ResourceMetadata(
created_at=datetime.fromtimestamp(stat.st_ctime),
updated_at=datetime.fromtimestamp(stat.st_mtime),
size=stat.st_size,
checksum=self._calculate_checksum(file_path),
tags=["file", file_path.suffix.lstrip('.') if file_path.suffix else "no-extension"]
)
except Exception:
return None
3. 内存资源提供者
from typing import Dict, Any
import json
class MemoryResourceProvider(ResourceProvider):
"""内存资源提供者"""
def __init__(self):
self.storage: Dict[str, Dict[str, Any]] = {}
self.metadata_storage: Dict[str, ResourceMetadata] = {}
def _parse_memory_uri(self, uri: str) -> tuple[str, str]:
"""解析内存URI"""
parsed = ResourceURIParser.parse(uri)
path_components = parsed.get("path_components", [])
if len(path_components) < 2:
raise ValueError(f"无效的内存URI格式: {uri}")
namespace = path_components[0]
key = '/'.join(path_components[1:])
return namespace, key
def _get_storage_key(self, namespace: str, key: str) -> str:
"""获取存储键"""
return f"{namespace}:{key}"
async def set_resource(self, uri: str, content: Any,
mime_type: str = "application/json",
description: Optional[str] = None,
tags: Optional[List[str]] = None) -> None:
"""设置内存资源"""
namespace, key = self._parse_memory_uri(uri)
storage_key = self._get_storage_key(namespace, key)
# 序列化内容
if isinstance(content, (dict, list)):
serialized_content = json.dumps(content, ensure_ascii=False, indent=2)
if mime_type == "application/json":
pass # 保持JSON类型
elif isinstance(content, str):
serialized_content = content
else:
serialized_content = str(content)
mime_type = "text/plain"
# 存储资源
self.storage[storage_key] = {
"uri": uri,
"namespace": namespace,
"key": key,
"content": serialized_content,
"original_content": content,
"mime_type": mime_type,
"description": description or f"内存资源: {namespace}/{key}"
}
# 创建元数据
now = datetime.now()
content_size = len(serialized_content.encode('utf-8'))
self.metadata_storage[storage_key] = ResourceMetadata(
created_at=now,
updated_at=now,
size=content_size,
checksum=hashlib.md5(serialized_content.encode()).hexdigest(),
tags=tags or ["memory", namespace]
)
async def get_resource(self, uri: str) -> Optional[Resource]:
"""获取内存资源"""
try:
namespace, key = self._parse_memory_uri(uri)
storage_key = self._get_storage_key(namespace, key)
if storage_key not in self.storage:
return None
data = self.storage[storage_key]
metadata = self.metadata_storage[storage_key]
return Resource(
uri=uri,
name=key,
description=data["description"],
mime_type=data["mime_type"],
content=data["content"],
metadata=metadata
)
except Exception as e:
print(f"获取内存资源失败: {e}")
return None
async def list_resources(self, pattern: Optional[str] = None) -> List[ResourceReference]:
"""列出内存资源"""
resources = []
for storage_key, data in self.storage.items():
if pattern and pattern not in data["key"]:
continue
metadata = self.metadata_storage[storage_key]
resources.append(ResourceReference(
uri=data["uri"],
name=data["key"],
description=data["description"],
mime_type=data["mime_type"],
metadata=metadata
))
return resources
async def resource_exists(self, uri: str) -> bool:
"""检查内存资源是否存在"""
try:
namespace, key = self._parse_memory_uri(uri)
storage_key = self._get_storage_key(namespace, key)
return storage_key in self.storage
except Exception:
return False
async def get_resource_metadata(self, uri: str) -> Optional[ResourceMetadata]:
"""获取内存资源元数据"""
try:
namespace, key = self._parse_memory_uri(uri)
storage_key = self._get_storage_key(namespace, key)
return self.metadata_storage.get(storage_key)
except Exception:
return None
async def delete_resource(self, uri: str) -> bool:
"""删除内存资源"""
try:
namespace, key = self._parse_memory_uri(uri)
storage_key = self._get_storage_key(namespace, key)
if storage_key in self.storage:
del self.storage[storage_key]
del self.metadata_storage[storage_key]
return True
return False
except Exception:
return False
def clear_namespace(self, namespace: str) -> int:
"""清空命名空间"""
cleared_count = 0
keys_to_remove = []
for storage_key, data in self.storage.items():
if data["namespace"] == namespace:
keys_to_remove.append(storage_key)
for key in keys_to_remove:
del self.storage[key]
del self.metadata_storage[key]
cleared_count += 1
return cleared_count
def get_namespaces(self) -> List[str]:
"""获取所有命名空间"""
namespaces = set()
for data in self.storage.values():
namespaces.add(data["namespace"])
return list(namespaces)
订阅机制实现
1. 资源订阅管理器
import asyncio
from typing import Set, Callable, Dict, List, Any
from dataclasses import dataclass
from enum import Enum
import weakref
class SubscriptionEventType(Enum):
"""订阅事件类型"""
RESOURCE_CREATED = "resource_created"
RESOURCE_UPDATED = "resource_updated"
RESOURCE_DELETED = "resource_deleted"
RESOURCE_ACCESSED = "resource_accessed"
@dataclass
class SubscriptionEvent:
"""订阅事件"""
event_type: SubscriptionEventType
resource_uri: str
timestamp: datetime
data: Optional[Dict[str, Any]] = None
metadata: Optional[ResourceMetadata] = None
class ResourceSubscription:
"""资源订阅"""
def __init__(self,
subscription_id: str,
resource_uri: str,
event_types: Set[SubscriptionEventType],
callback: Callable[[SubscriptionEvent], None]):
self.subscription_id = subscription_id
self.resource_uri = resource_uri
self.event_types = event_types
self.callback = callback
self.created_at = datetime.now()
self.last_event_at: Optional[datetime] = None
self.event_count = 0
self.is_active = True
class SubscriptionManager:
"""订阅管理器"""
def __init__(self):
self.subscriptions: Dict[str, ResourceSubscription] = {}
self.uri_subscriptions: Dict[str, Set[str]] = {} # URI -> subscription_ids
self.event_queue: asyncio.Queue = asyncio.Queue()
self.event_processor_task: Optional[asyncio.Task] = None
self.is_running = False
def start(self):
"""启动订阅管理器"""
if not self.is_running:
self.is_running = True
self.event_processor_task = asyncio.create_task(self._process_events())
async def stop(self):
"""停止订阅管理器"""
self.is_running = False
if self.event_processor_task:
self.event_processor_task.cancel()
try:
await self.event_processor_task
except asyncio.CancelledError:
pass
def subscribe(self,
resource_uri: str,
event_types: Set[SubscriptionEventType],
callback: Callable[[SubscriptionEvent], None]) -> str:
"""订阅资源"""
subscription_id = f"sub_{int(datetime.now().timestamp() * 1000)}_{len(self.subscriptions)}"
subscription = ResourceSubscription(
subscription_id=subscription_id,
resource_uri=resource_uri,
event_types=event_types,
callback=callback
)
self.subscriptions[subscription_id] = subscription
# 更新URI索引
if resource_uri not in self.uri_subscriptions:
self.uri_subscriptions[resource_uri] = set()
self.uri_subscriptions[resource_uri].add(subscription_id)
return subscription_id
def unsubscribe(self, subscription_id: str) -> bool:
"""取消订阅"""
if subscription_id not in self.subscriptions:
return False
subscription = self.subscriptions[subscription_id]
resource_uri = subscription.resource_uri
# 移除订阅
del self.subscriptions[subscription_id]
# 更新URI索引
if resource_uri in self.uri_subscriptions:
self.uri_subscriptions[resource_uri].discard(subscription_id)
if not self.uri_subscriptions[resource_uri]:
del self.uri_subscriptions[resource_uri]
return True
def unsubscribe_all(self, resource_uri: str) -> int:
"""取消资源的所有订阅"""
if resource_uri not in self.uri_subscriptions:
return 0
subscription_ids = list(self.uri_subscriptions[resource_uri])
count = 0
for subscription_id in subscription_ids:
if self.unsubscribe(subscription_id):
count += 1
return count
async def notify(self, event: SubscriptionEvent):
"""通知订阅者"""
await self.event_queue.put(event)
async def _process_events(self):
"""处理事件队列"""
while self.is_running:
try:
# 等待事件,设置超时避免无限阻塞
event = await asyncio.wait_for(self.event_queue.get(), timeout=1.0)
await self._dispatch_event(event)
except asyncio.TimeoutError:
continue
except Exception as e:
print(f"处理事件时出错: {e}")
async def _dispatch_event(self, event: SubscriptionEvent):
"""分发事件给订阅者"""
resource_uri = event.resource_uri
if resource_uri not in self.uri_subscriptions:
return
subscription_ids = list(self.uri_subscriptions[resource_uri])
for subscription_id in subscription_ids:
subscription = self.subscriptions.get(subscription_id)
if not subscription or not subscription.is_active:
continue
if event.event_type not in subscription.event_types:
continue
try:
# 异步调用回调函数
if asyncio.iscoroutinefunction(subscription.callback):
await subscription.callback(event)
else:
subscription.callback(event)
# 更新订阅统计
subscription.last_event_at = datetime.now()
subscription.event_count += 1
except Exception as e:
print(f"订阅回调执行失败 {subscription_id}: {e}")
# 可以选择禁用有问题的订阅
# subscription.is_active = False
def get_subscription_stats(self) -> Dict[str, Any]:
"""获取订阅统计信息"""
active_subscriptions = sum(1 for s in self.subscriptions.values() if s.is_active)
total_events = sum(s.event_count for s in self.subscriptions.values())
# 按资源URI分组统计
uri_stats = {}
for uri, subscription_ids in self.uri_subscriptions.items():
active_count = sum(
1 for sid in subscription_ids
if sid in self.subscriptions and self.subscriptions[sid].is_active
)
uri_stats[uri] = {
"total_subscriptions": len(subscription_ids),
"active_subscriptions": active_count
}
return {
"total_subscriptions": len(self.subscriptions),
"active_subscriptions": active_subscriptions,
"total_events_processed": total_events,
"subscribed_resources": len(self.uri_subscriptions),
"uri_statistics": uri_stats
}
def get_subscriptions_for_resource(self, resource_uri: str) -> List[Dict[str, Any]]:
"""获取资源的所有订阅信息"""
if resource_uri not in self.uri_subscriptions:
return []
subscription_ids = self.uri_subscriptions[resource_uri]
subscriptions_info = []
for subscription_id in subscription_ids:
subscription = self.subscriptions.get(subscription_id)
if subscription:
subscriptions_info.append({
"subscription_id": subscription_id,
"event_types": [et.value for et in subscription.event_types],
"created_at": subscription.created_at.isoformat(),
"last_event_at": subscription.last_event_at.isoformat() if subscription.last_event_at else None,
"event_count": subscription.event_count,
"is_active": subscription.is_active
})
return subscriptions_info
2. 资源变更监控
import asyncio
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from typing import Dict, Set
class FileSystemWatcher(FileSystemEventHandler):
"""文件系统监控器"""
def __init__(self, subscription_manager: SubscriptionManager,
uri_builder: ResourceURIBuilder):
self.subscription_manager = subscription_manager
self.uri_builder = uri_builder
self.watched_paths: Dict[str, str] = {} # path -> base_path
def on_created(self, event):
"""文件创建事件"""
if not event.is_directory:
asyncio.create_task(self._handle_file_event(
event.src_path, SubscriptionEventType.RESOURCE_CREATED
))
def on_modified(self, event):
"""文件修改事件"""
if not event.is_directory:
asyncio.create_task(self._handle_file_event(
event.src_path, SubscriptionEventType.RESOURCE_UPDATED
))
def on_deleted(self, event):
"""文件删除事件"""
if not event.is_directory:
asyncio.create_task(self._handle_file_event(
event.src_path, SubscriptionEventType.RESOURCE_DELETED
))
async def _handle_file_event(self, file_path: str, event_type: SubscriptionEventType):
"""处理文件事件"""
try:
# 找到对应的基础路径
base_path = None
for watched_path, watched_base in self.watched_paths.items():
if file_path.startswith(watched_path):
base_path = watched_base
break
if not base_path:
return
# 构建资源URI
relative_path = os.path.relpath(file_path, base_path)
uri = self.uri_builder.build_file_uri(relative_path)
# 创建事件
event = SubscriptionEvent(
event_type=event_type,
resource_uri=uri,
timestamp=datetime.now(),
data={"file_path": file_path}
)
# 通知订阅者
await self.subscription_manager.notify(event)
except Exception as e:
print(f"处理文件事件失败: {e}")
class ResourceMonitor:
"""资源监控器"""
def __init__(self, subscription_manager: SubscriptionManager):
self.subscription_manager = subscription_manager
self.uri_builder = ResourceURIBuilder()
self.file_observer = Observer()
self.file_watcher = FileSystemWatcher(subscription_manager, self.uri_builder)
self.watched_directories: Set[str] = set()
self.is_monitoring = False
def start_monitoring(self):
"""开始监控"""
if not self.is_monitoring:
self.file_observer.start()
self.is_monitoring = True
def stop_monitoring(self):
"""停止监控"""
if self.is_monitoring:
self.file_observer.stop()
self.file_observer.join()
self.is_monitoring = False
def watch_directory(self, directory_path: str, recursive: bool = True):
"""监控目录"""
if directory_path not in self.watched_directories:
self.file_observer.schedule(
self.file_watcher,
directory_path,
recursive=recursive
)
self.watched_directories.add(directory_path)
self.file_watcher.watched_paths[directory_path] = directory_path
def unwatch_directory(self, directory_path: str):
"""停止监控目录"""
if directory_path in self.watched_directories:
# watchdog 不提供直接的unschedule方法,需要重新创建observer
self.watched_directories.discard(directory_path)
self.file_watcher.watched_paths.pop(directory_path, None)
async def trigger_manual_event(self, resource_uri: str,
event_type: SubscriptionEventType,
data: Optional[Dict[str, Any]] = None):
"""手动触发事件"""
event = SubscriptionEvent(
event_type=event_type,
resource_uri=resource_uri,
timestamp=datetime.now(),
data=data
)
await self.subscription_manager.notify(event)
本章总结
本章详细介绍了MCP协议中的资源管理与订阅机制,包括:
核心内容
资源概念与类型
- 资源基础概念和特征
- 资源URI设计规范
- 资源元数据管理
资源管理器实现
- 基础资源管理器架构
- 文件资源提供者
- 内存资源提供者
- 缓存机制
订阅机制实现
- 资源订阅管理器
- 事件类型和处理
- 异步事件分发
资源变更监控
- 文件系统监控
- 自动事件触发
- 手动事件管理
最佳实践
资源设计
- 使用一致的URI命名规范
- 合理设计资源元数据
- 考虑资源的生命周期
性能优化
- 实现适当的缓存策略
- 使用异步处理机制
- 避免过度订阅
错误处理
- 处理资源不存在的情况
- 优雅处理订阅回调错误
- 实现重试机制
监控和维护
- 监控订阅统计信息
- 定期清理无效订阅
- 记录资源访问日志
下一章我们将学习提示模板与动态生成,了解如何在MCP协议中管理和使用提示模板。