架构设计最佳实践

1. 分层架构设计

# architecture/layers.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
import asyncio
from enum import Enum

class LayerType(Enum):
    """Identifies which tier of the layered architecture a Layer belongs to."""
    PRESENTATION = "presentation"  # presentation layer
    APPLICATION = "application"    # application layer
    DOMAIN = "domain"             # domain layer
    INFRASTRUCTURE = "infrastructure"  # infrastructure layer

@dataclass
class LayerContext:
    """Per-request context object handed to every layer call."""
    # Identifier of the request being processed (required).
    request_id: str
    # Optional identifiers; default to None when not supplied.
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    # Free-form extra data. The default is None (not an empty dict), so
    # readers must handle the None case before indexing.
    metadata: Optional[Dict[str, Any]] = None

class Layer(ABC):
    """Abstract base class for one layer of the layered architecture.

    A layer knows its own type and holds an ordered list of the layers it
    depends on. Subclasses implement ``process``.
    """

    def __init__(self, layer_type: LayerType):
        """Store the layer type and start with no dependencies."""
        self.layer_type = layer_type
        self.dependencies: List['Layer'] = []

    @abstractmethod
    async def process(self, context: LayerContext, data: Any) -> Any:
        """Handle a request; must be implemented by concrete layers."""
        pass

    def add_dependency(self, layer: 'Layer'):
        """Append ``layer`` to the list of layers this one depends on."""
        self.dependencies.append(layer)

    async def validate_dependencies(self, context: LayerContext) -> bool:
        """Return True when every dependency passes its health check.

        Dependencies are checked in insertion order and the first failing
        one short-circuits the result to False. A layer without
        dependencies is trivially valid.
        """
        for dep in self.dependencies:
            if not await dep.health_check(context):
                return False
        # Every dependency reported healthy (or there were none).
        return True

    async def health_check(self, context: LayerContext) -> bool:
        """Report whether this layer is healthy; the base class always is."""
        return True

class PresentationLayer(Layer):
    """Outermost layer: validates a request, delegates it, wraps the result."""

    def __init__(self):
        super().__init__(LayerType.PRESENTATION)

    async def process(self, context: LayerContext, data: Any) -> Any:
        """Validate ``data``, forward it to the application layer and format the reply.

        Raises:
            ValueError: if request validation fails.
            RuntimeError: if no application-layer dependency is registered.
        """
        request_ok = await self._validate_request(data)
        if not request_ok:
            raise ValueError("Invalid request")

        outcome = await self._call_application_layer(context, data)
        return await self._format_response(outcome)

    async def _validate_request(self, data: Any) -> bool:
        """Request validation hook; this implementation accepts everything."""
        return True

    async def _call_application_layer(self, context: LayerContext, data: Any) -> Any:
        """Delegate to the first dependency whose type is APPLICATION."""
        application = next(
            (dep for dep in self.dependencies
             if dep.layer_type == LayerType.APPLICATION),
            None,
        )
        if application is None:
            raise RuntimeError("No application layer found")
        return await application.process(context, data)

    async def _format_response(self, result: Any) -> Dict[str, Any]:
        """Wrap ``result`` in a success envelope stamped with the event-loop clock."""
        loop_time = asyncio.get_event_loop().time()
        return {"status": "success", "data": result, "timestamp": loop_time}

class ApplicationLayer(Layer):
    """Application layer: orchestrates the domain layer and applies cross-cutting rules."""

    def __init__(self):
        super().__init__(LayerType.APPLICATION)

    async def process(self, context: LayerContext, data: Any) -> Any:
        """Run the domain layer on ``data`` and post-process its result.

        Raises:
            RuntimeError: if no domain-layer dependency is registered.
        """
        from_domain = await self._call_domain_layer(context, data)
        return await self._apply_business_rules(context, from_domain)

    async def _call_domain_layer(self, context: LayerContext, data: Any) -> Any:
        """Delegate to the first dependency whose type is DOMAIN."""
        domain = next(
            (dep for dep in self.dependencies
             if dep.layer_type == LayerType.DOMAIN),
            None,
        )
        if domain is None:
            raise RuntimeError("No domain layer found")
        return await domain.process(context, data)

    async def _apply_business_rules(self, context: LayerContext, data: Any) -> Any:
        """Hook for cross-domain business rules; currently returns ``data`` unchanged."""
        return data

class DomainLayer(Layer):
    """Domain layer: hosts the core business logic."""

    def __init__(self):
        super().__init__(LayerType.DOMAIN)

    async def process(self, context: LayerContext, data: Any) -> Any:
        """Run the domain logic on ``data`` and return its result."""
        domain_output = await self._execute_domain_logic(context, data)
        return domain_output

    async def _execute_domain_logic(self, context: LayerContext, data: Any) -> Any:
        """Placeholder domain logic: echo ``data`` back flagged as processed."""
        outcome = {"processed": True, "data": data}
        return outcome

class InfrastructureLayer(Layer):
    """Infrastructure layer: persistence and calls to external services."""

    def __init__(self):
        super().__init__(LayerType.INFRASTRUCTURE)

    async def process(self, context: LayerContext, data: Any) -> Any:
        """Forward the request to the infrastructure handler."""
        handled = await self._handle_infrastructure(context, data)
        return handled

    async def _handle_infrastructure(self, context: LayerContext, data: Any) -> Any:
        """Simulated database write; always reports a save with a fixed id."""
        receipt = {"saved": True, "id": "123"}
        return receipt

class ArchitectureBuilder:
    """Collects layers by type and wires them into a top-down dependency chain."""

    def __init__(self):
        self.layers: Dict[LayerType, Layer] = {}

    def add_layer(self, layer: Layer) -> 'ArchitectureBuilder':
        """Register ``layer`` under its own type (replacing any previous one) and return self."""
        self.layers[layer.layer_type] = layer
        return self

    def build(self) -> Dict[LayerType, Layer]:
        """Link each registered layer to the layer directly beneath it.

        A link is only created when both ends of the pair are registered.
        Returns the mapping of layer type to layer instance.
        """
        # (upper, lower) pairs, listed from the outermost layer inwards.
        dependency_chain = (
            (LayerType.PRESENTATION, LayerType.APPLICATION),
            (LayerType.APPLICATION, LayerType.DOMAIN),
            (LayerType.DOMAIN, LayerType.INFRASTRUCTURE),
        )
        for upper, lower in dependency_chain:
            if upper in self.layers and lower in self.layers:
                self.layers[upper].add_dependency(self.layers[lower])

        return self.layers

2. 领域驱动设计实践

# domain/entities.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
from datetime import datetime
import uuid

@dataclass
class DomainEvent:
    """Base record describing something that happened to an aggregate."""
    # Random UUID4 string generated per event instance.
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Creation time from datetime.now() (naive local time, no tzinfo).
    occurred_at: datetime = field(default_factory=datetime.now)
    # Event name, e.g. "ToolCreated"; empty string when not supplied.
    event_type: str = ""
    # Id of the aggregate the event belongs to; empty string when not supplied.
    aggregate_id: str = ""
    version: int = 1
    # Event payload; a fresh dict per instance.
    data: Dict[str, Any] = field(default_factory=dict)

class Entity(ABC):
    """Base class for domain entities: identity-based equality plus an event buffer."""

    def __init__(self, entity_id: str):
        self._id = entity_id
        self._domain_events: List[DomainEvent] = []

    @property
    def id(self) -> str:
        """The entity's identifier."""
        return self._id

    def add_domain_event(self, event: DomainEvent):
        """Queue ``event`` on this entity."""
        self._domain_events.append(event)

    def clear_domain_events(self):
        """Drop every queued event (the buffer is emptied in place)."""
        del self._domain_events[:]

    def get_domain_events(self) -> List[DomainEvent]:
        """Return a shallow copy of the queued events, oldest first."""
        return list(self._domain_events)

    def __eq__(self, other):
        # Two entities are equal when they carry the same id; anything that
        # is not an Entity compares unequal.
        return isinstance(other, Entity) and self._id == other._id

    def __hash__(self):
        # Hash on the id so equal entities hash alike.
        return hash(self._id)

class ValueObject(ABC):
    """Base class for value objects; subclasses must define equality and hashing."""
    
    @abstractmethod
    def __eq__(self, other):
        """Compare with ``other``; must be implemented by subclasses."""
        pass
    
    @abstractmethod
    def __hash__(self):
        """Return a hash; must be implemented by subclasses."""
        pass

@dataclass(frozen=True)
class ToolId(ValueObject):
    """Immutable value object wrapping a tool identifier."""
    value: str

    def __post_init__(self):
        # Reject empty and whitespace-only identifiers.
        raw = self.value
        if not (raw and raw.strip()):
            raise ValueError("Tool ID cannot be empty")

    def __eq__(self, other):
        if not isinstance(other, ToolId):
            return False
        return self.value == other.value

    def __hash__(self):
        return hash(self.value)

@dataclass(frozen=True)
class ToolName(ValueObject):
    """Immutable value object wrapping a tool's display name."""
    value: str

    def __post_init__(self):
        text = self.value
        # The minimum length is measured on the stripped text ...
        too_short = not text or len(text.strip()) < 2
        if too_short:
            raise ValueError("Tool name must be at least 2 characters")
        # ... while the maximum is measured on the raw text.
        if len(text) > 100:
            raise ValueError("Tool name cannot exceed 100 characters")

    def __eq__(self, other):
        if not isinstance(other, ToolName):
            return False
        return self.value == other.value

    def __hash__(self):
        return hash(self.value)

class Tool(Entity):
    """Tool aggregate root.

    Tracks a tool's name, description and active flag, and queues a domain
    event for creation and for every effective state change.
    """

    def __init__(self, tool_id: ToolId, name: ToolName, description: str):
        super().__init__(tool_id.value)
        self._tool_id = tool_id
        self._name = name
        self._description = description
        self._is_active = True  # new tools start out active
        self._created_at = datetime.now()
        self._updated_at = datetime.now()

        # Announce the creation of the aggregate.
        creation_payload = {"name": name.value, "description": description}
        self.add_domain_event(DomainEvent(
            event_type="ToolCreated",
            aggregate_id=self._id,
            data=creation_payload,
        ))

    @property
    def tool_id(self) -> ToolId:
        return self._tool_id

    @property
    def name(self) -> ToolName:
        return self._name

    @property
    def description(self) -> str:
        return self._description

    @property
    def is_active(self) -> bool:
        return self._is_active

    def _stamp_and_record(self, event_type: str, **event_fields) -> None:
        """Refresh the update timestamp, then queue an event of ``event_type``."""
        self._updated_at = datetime.now()
        self.add_domain_event(DomainEvent(
            event_type=event_type,
            aggregate_id=self._id,
            **event_fields,
        ))

    def update_description(self, new_description: str):
        """Replace the description; a no-op when the text is unchanged."""
        if new_description == self._description:
            return

        previous = self._description
        self._description = new_description
        self._stamp_and_record(
            "ToolDescriptionUpdated",
            data={
                "old_description": previous,
                "new_description": new_description,
            },
        )

    def activate(self):
        """Mark the tool active; a no-op when it already is."""
        if self._is_active:
            return
        self._is_active = True
        self._stamp_and_record("ToolActivated")

    def deactivate(self):
        """Mark the tool inactive; a no-op when it already is."""
        if not self._is_active:
            return
        self._is_active = False
        self._stamp_and_record("ToolDeactivated")

# domain/repositories.py
class Repository(ABC):
    """Abstract persistence interface for entities."""
    
    @abstractmethod
    async def save(self, entity: Entity) -> None:
        """Persist ``entity``."""
        pass
    
    @abstractmethod
    async def find_by_id(self, entity_id: str) -> Optional[Entity]:
        """Look up an entity by id; the Optional return allows for no match."""
        pass
    
    @abstractmethod
    async def delete(self, entity: Entity) -> None:
        """Remove ``entity`` from the store."""
        pass

class ToolRepository(Repository):
    """Repository interface specialised for Tool aggregates."""
    
    @abstractmethod
    async def find_by_name(self, name: ToolName) -> Optional[Tool]:
        """Look up a tool by name; the Optional return allows for no match."""
        pass
    
    @abstractmethod
    async def find_active_tools(self) -> List[Tool]:
        """Return all tools that are currently active."""
        pass

# infrastructure/repositories.py
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from typing import Dict, Any

class SQLAlchemyToolRepository(ToolRepository):
    """ToolRepository skeleton bound to a SQLAlchemy async session.

    Every operation here is a simulation: the session is stored but not yet
    used, and database latency is faked with a short sleep.
    """

    def __init__(self, session: AsyncSession):
        self._session = session

    async def save(self, tool: Tool) -> None:
        """Simulate persisting ``tool``, then publish its queued domain events."""
        # Stand-in for real ORM mapping; the row is built but not written anywhere.
        tool_data = dict(
            id=tool.id,
            name=tool.name.value,
            description=tool.description,
            is_active=tool.is_active,
        )

        await asyncio.sleep(0.01)  # simulated database round trip

        await self._publish_domain_events(tool)

    async def find_by_id(self, tool_id: str) -> Optional[Tool]:
        """Simulated lookup by id; always returns None in this simplified version."""
        await asyncio.sleep(0.01)
        # A real implementation would query the database and rebuild the aggregate.
        return None

    async def find_by_name(self, name: ToolName) -> Optional[Tool]:
        """Simulated lookup by name; always returns None."""
        await asyncio.sleep(0.01)
        return None

    async def find_active_tools(self) -> List[Tool]:
        """Simulated query for active tools; always returns an empty list."""
        await asyncio.sleep(0.01)
        return []

    async def delete(self, tool: Tool) -> None:
        """Simulated delete; only waits."""
        await asyncio.sleep(0.01)

    async def _publish_domain_events(self, entity: Entity):
        """Print each queued event's type, then clear the entity's event buffer."""
        # A real implementation would hand these to an event bus.
        for pending in entity.get_domain_events():
            print(f"Publishing event: {pending.event_type}")
        entity.clear_domain_events()

class InMemoryToolRepository(ToolRepository):
    """ToolRepository backed by a plain dict keyed by tool id."""

    def __init__(self):
        self._tools: Dict[str, Tool] = {}

    async def save(self, tool: Tool) -> None:
        """Store (or overwrite) ``tool`` and publish its queued domain events."""
        self._tools[tool.id] = tool
        await self._publish_domain_events(tool)

    async def find_by_id(self, tool_id: str) -> Optional[Tool]:
        """Return the tool stored under ``tool_id``, or None."""
        return self._tools.get(tool_id)

    async def find_by_name(self, name: ToolName) -> Optional[Tool]:
        """Return the first stored tool whose name equals ``name``, or None."""
        matches = (candidate for candidate in self._tools.values()
                   if candidate.name == name)
        return next(matches, None)

    async def find_active_tools(self) -> List[Tool]:
        """Return every stored tool whose ``is_active`` flag is truthy."""
        active = []
        for candidate in self._tools.values():
            if candidate.is_active:
                active.append(candidate)
        return active

    async def delete(self, tool: Tool) -> None:
        """Remove ``tool`` by id; silently does nothing when it is not stored."""
        self._tools.pop(tool.id, None)

    async def _publish_domain_events(self, entity: Entity):
        """Print each queued event's type, then clear the entity's event buffer."""
        for pending in entity.get_domain_events():
            print(f"Publishing event: {pending.event_type}")
        entity.clear_domain_events()

实际案例分析

1. 智能客服系统

# 智能客服系统MCP实现
from typing import Dict, List, Any, Optional
import asyncio
from dataclasses import dataclass
from enum import Enum

class IntentType(Enum):
    """Customer intents the service can distinguish."""
    GREETING = "greeting"
    QUESTION = "question"
    COMPLAINT = "complaint"
    ORDER_INQUIRY = "order_inquiry"
    TECHNICAL_SUPPORT = "technical_support"
    UNKNOWN = "unknown"

@dataclass
class CustomerMessage:
    """A single inbound message from a customer."""
    message_id: str
    customer_id: str
    # Raw message text as typed by the customer.
    content: str
    # NOTE(review): presumably seconds since the epoch - confirm with the producer.
    timestamp: float
    channel: str  # web, mobile, wechat, etc.

@dataclass
class IntentAnalysisResult:
    """Outcome of classifying one customer message."""
    intent: IntentType
    # Classifier confidence; the rule-based classifier emits values between 0 and 1.
    confidence: float
    entities: Dict[str, Any]
    # Names of follow-up actions suggested for this intent.
    suggested_actions: List[str]

class CustomerServiceMCPServer:
    """智能客服MCP服务器"""
    
    def __init__(self):
        self.knowledge_base = self._load_knowledge_base()
        self.intent_classifier = self._load_intent_classifier()
    
    async def handle_customer_message(self, message: CustomerMessage) -> Dict[str, Any]:
        """处理客户消息"""
        # 1. 意图识别
        intent_result = await self._analyze_intent(message.content)
        
        # 2. 实体提取
        entities = await self._extract_entities(message.content)
        
        # 3. 生成回复
        response = await self._generate_response(intent_result, entities, message)
        
        # 4. 记录对话历史
        await self._log_conversation(message, response)
        
        return {
            "response": response,
            "intent": intent_result.intent.value,
            "confidence": intent_result.confidence,
            "entities": entities
        }
    
    async def _analyze_intent(self, content: str) -> IntentAnalysisResult:
        """分析用户意图"""
        # 简化的意图分析逻辑
        content_lower = content.lower()
        
        if any(word in content_lower for word in ["你好", "hello", "hi"]):
            return IntentAnalysisResult(
                intent=IntentType.GREETING,
                confidence=0.95,
                entities={},
                suggested_actions=["send_greeting"]
            )
        elif any(word in content_lower for word in ["问题", "怎么", "如何", "?", "?"]):
            return IntentAnalysisResult(
                intent=IntentType.QUESTION,
                confidence=0.85,
                entities={},
                suggested_actions=["search_knowledge_base", "escalate_to_human"]
            )
        elif any(word in content_lower for word in ["投诉", "不满", "问题"]):
            return IntentAnalysisResult(
                intent=IntentType.COMPLAINT,
                confidence=0.90,
                entities={},
                suggested_actions=["escalate_to_human", "collect_feedback"]
            )
        else:
            return IntentAnalysisResult(
                intent=IntentType.UNKNOWN,
                confidence=0.30,
                entities={},
                suggested_actions=["ask_clarification", "escalate_to_human"]
            )
    
    async def _extract_entities(self, content: str) -> Dict[str, Any]:
        """提取实体信息"""
        entities = {}
        
        # 简化的实体提取逻辑
        import re
        
        # 提取订单号
        order_pattern = r'订单号?[::]?\s*([A-Z0-9]{8,})'
        order_match = re.search(order_pattern, content)
        if order_match:
            entities["order_id"] = order_match.group(1)
        
        # 提取电话号码
        phone_pattern = r'1[3-9]\d{9}'
        phone_match = re.search(phone_pattern, content)
        if phone_match:
            entities["phone"] = phone_match.group(0)
        
        return entities
    
    async def _generate_response(self, intent_result: IntentAnalysisResult, 
                               entities: Dict[str, Any], 
                               message: CustomerMessage) -> str:
        """生成回复"""
        if intent_result.intent == IntentType.GREETING:
            return "您好!我是智能客服助手,很高兴为您服务。请问有什么可以帮助您的吗?"
        
        elif intent_result.intent == IntentType.QUESTION:
            # 搜索知识库
            answer = await self._search_knowledge_base(message.content)
            if answer:
                return f"根据您的问题,我找到了以下信息:\n{answer}"
            else:
                return "抱歉,我暂时无法回答您的问题。正在为您转接人工客服..."
        
        elif intent_result.intent == IntentType.COMPLAINT:
            return "非常抱歉给您带来了不便。我已经记录了您的反馈,并将立即为您转接专门的客服人员处理。"
        
        elif intent_result.intent == IntentType.ORDER_INQUIRY:
            if "order_id" in entities:
                order_info = await self._query_order_info(entities["order_id"])
                return f"您的订单信息如下:\n{order_info}"
            else:
                return "请提供您的订单号,我来帮您查询订单状态。"
        
        else:
            return "抱歉,我没有完全理解您的问题。能否请您详细描述一下,或者我为您转接人工客服?"
    
    async def _search_knowledge_base(self, query: str) -> Optional[str]:
        """搜索知识库"""
        # 简化的知识库搜索
        knowledge_items = [
            {"keywords": ["退货", "退款"], "answer": "退货退款流程:1. 登录账户 2. 找到订单 3. 申请退货 4. 等待审核"},
            {"keywords": ["配送", "物流"], "answer": "我们提供多种配送方式,一般3-7个工作日送达。您可以在订单详情中查看物流信息。"},
            {"keywords": ["支付", "付款"], "answer": "我们支持微信支付、支付宝、银行卡等多种支付方式。"}
        ]
        
        query_lower = query.lower()
        for item in knowledge_items:
            if any(keyword in query_lower for keyword in item["keywords"]):
                return item["answer"]
        
        return None
    
    async def _query_order_info(self, order_id: str) -> str:
        """查询订单信息"""
        # 模拟订单查询
        await asyncio.sleep(0.1)
        return f"订单号:{order_id}\n状态:已发货\n预计送达:明天下午\n物流单号:SF1234567890"
    
    async def _log_conversation(self, message: CustomerMessage, response: str):
        """记录对话历史"""
        # 记录到数据库或日志系统
        print(f"Logged conversation: {message.customer_id} -> {response[:50]}...")
    
    def _load_knowledge_base(self) -> Dict[str, Any]:
        """加载知识库"""
        return {}
    
    def _load_intent_classifier(self) -> Any:
        """加载意图分类器"""
        return None

2. 代码助手系统

# 代码助手系统MCP实现
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass
from enum import Enum
import re
import ast

class CodeLanguage(Enum):
    """Programming languages known to the code assistant."""
    PYTHON = "python"
    JAVASCRIPT = "javascript"
    JAVA = "java"
    GO = "go"
    RUST = "rust"
    UNKNOWN = "unknown"

class AnalysisType(Enum):
    """Kinds of analysis a CodeAnalyzer can be asked to run."""
    SYNTAX = "syntax"
    STYLE = "style"
    SECURITY = "security"
    PERFORMANCE = "performance"
    COMPLEXITY = "complexity"

@dataclass
class CodeAnalysisResult:
    """Result of one analysis pass over a piece of code."""
    language: CodeLanguage
    analysis_type: AnalysisType
    # One dict per finding.
    issues: List[Dict[str, Any]]
    # Human-readable remediation hints.
    suggestions: List[str]
    # Analysis-specific counters and measurements.
    metrics: Dict[str, Any]
    score: float  # 0-100

class CodeAnalyzer(ABC):
    """Abstract interface every language-specific code analyzer implements."""
    
    @property
    @abstractmethod
    def supported_language(self) -> CodeLanguage:
        """The programming language this analyzer handles."""
        pass
    
    @property
    @abstractmethod
    def default_test_framework(self) -> str:
        """Name of the test framework used when the caller does not pick one."""
        pass
    
    @abstractmethod
    async def analyze(self, code: str, analysis_type: AnalysisType) -> CodeAnalysisResult:
        """Run the requested kind of analysis on ``code``."""
        pass
    
    @abstractmethod
    async def format_code(self, code: str, style_config: Dict[str, Any]) -> str:
        """Return ``code`` reformatted according to ``style_config``."""
        pass
    
    @abstractmethod
    async def generate_tests(self, code: str, test_framework: Optional[str] = None) -> str:
        """Return test code for ``code``; ``test_framework`` defaults to None."""
        pass
    
    @abstractmethod
    async def explain_code(self, code: str, detail_level: str) -> Dict[str, Any]:
        """Return a structured explanation of ``code``."""
        pass

class PythonCodeAnalyzer(CodeAnalyzer):
    """Heuristic analyzer for Python source code.

    Syntax and complexity checks parse the code with ``ast``; style,
    security and performance checks are regex or line based and therefore
    approximate.
    """

    @property
    def supported_language(self) -> CodeLanguage:
        return CodeLanguage.PYTHON

    @property
    def default_test_framework(self) -> str:
        return "pytest"

    async def analyze(self, code: str, analysis_type: AnalysisType) -> CodeAnalysisResult:
        """Dispatch to the analysis matching ``analysis_type``.

        Raises:
            ValueError: if ``analysis_type`` is not one of the supported kinds.
        """
        if analysis_type == AnalysisType.SYNTAX:
            return await self._analyze_syntax(code)
        elif analysis_type == AnalysisType.STYLE:
            return await self._analyze_style(code)
        elif analysis_type == AnalysisType.SECURITY:
            return await self._analyze_security(code)
        elif analysis_type == AnalysisType.PERFORMANCE:
            return await self._analyze_performance(code)
        elif analysis_type == AnalysisType.COMPLEXITY:
            return await self._analyze_complexity(code)
        else:
            raise ValueError(f"Unsupported analysis type: {analysis_type}")

    async def _analyze_syntax(self, code: str) -> CodeAnalysisResult:
        """Parse ``code``; the score is 100 when it parses and 0 otherwise."""
        issues = []
        suggestions = []

        try:
            ast.parse(code)
        except SyntaxError as e:
            issues.append({
                "type": "syntax_error",
                "message": str(e),
                "line": e.lineno,
                "severity": "error"
            })
            suggestions.append(f"Fix syntax error at line {e.lineno}: {e.msg}")

        score = 100 if not issues else 0

        return CodeAnalysisResult(
            language=CodeLanguage.PYTHON,
            analysis_type=AnalysisType.SYNTAX,
            issues=issues,
            metrics={"syntax_errors": len(issues)},
            suggestions=suggestions,
            score=score
        )

    async def _analyze_style(self, code: str) -> CodeAnalysisResult:
        """Check line length, import ordering and function naming.

        Each issue costs 10 points; the score never drops below 0.
        """
        issues = []
        suggestions = []

        lines = code.split('\n')

        # Line length: 88 columns is the limit used here.
        for i, line in enumerate(lines, 1):
            if len(line) > 88:
                issues.append({
                    "type": "line_too_long",
                    "message": f"Line {i} is too long ({len(line)} characters)",
                    "line": i,
                    "severity": "warning"
                })

        # Import ordering: the suggestion is only emitted together with the
        # issue, so correctly ordered imports do not produce advice.
        import_lines = [line for line in lines if line.strip().startswith(('import ', 'from '))]
        if len(import_lines) > 1 and not self._check_import_order(import_lines):
            issues.append({
                "type": "import_order",
                "message": "Imports are not properly ordered",
                "severity": "info"
            })
            suggestions.append("Consider organizing imports according to PEP 8 (standard library, third-party, local)")

        # Function naming: any upper-case letter in a function name is
        # flagged, which covers both camelCase and PascalCase.
        function_pattern = r'\bdef\s+\w*[A-Z]\w*\s*\('
        if re.search(function_pattern, code):
            issues.append({
                "type": "naming_convention",
                "message": "Function names should use snake_case, not camelCase",
                "severity": "warning"
            })

        score = max(0, 100 - len(issues) * 10)

        return CodeAnalysisResult(
            language=CodeLanguage.PYTHON,
            analysis_type=AnalysisType.STYLE,
            issues=issues,
            metrics={"style_issues": len(issues), "line_count": len(lines)},
            suggestions=suggestions,
            score=score
        )

    def _check_import_order(self, import_lines: List[str]) -> bool:
        """Import-order check stub; currently accepts every ordering."""
        return True

    async def _analyze_security(self, code: str) -> CodeAnalysisResult:
        """Flag dangerous calls and hard-coded secrets.

        Each issue costs 20 points; the score never drops below 0.
        """
        issues = []
        suggestions = []

        # (pattern, issue message, remediation hint). The leading \b keeps
        # names that merely end in "eval"/"exec" (e.g. literal_eval) from
        # matching.
        dangerous_patterns = [
            (r'\beval\s*\(',
             "Use of eval() is dangerous",
             "Avoid eval(); parse or dispatch on the input explicitly instead"),
            (r'\bexec\s*\(',
             "Use of exec() is dangerous",
             "Avoid exec(); call the required code directly instead of executing strings"),
            (r'subprocess\.(?:call|run|Popen|check_call|check_output)\s*\(.*shell\s*=\s*True',
             "shell=True in subprocess is risky",
             "Pass an argument list to subprocess instead of using shell=True"),
            (r'os\.system\s*\(',
             "Use of os.system() is dangerous",
             "Use subprocess.run() with an argument list instead of os.system()"),
        ]

        for pattern, message, suggestion in dangerous_patterns:
            if re.search(pattern, code):
                issues.append({
                    "type": "security_risk",
                    "message": message,
                    "severity": "high"
                })
                suggestions.append(suggestion)

        # Hard-coded secrets: a quoted, non-empty literal assigned to a
        # secret-looking name. The value may not contain quotes or newlines.
        password_patterns = [
            r'password\s*=\s*["\'][^"\'\n]+["\']',
            r'secret\s*=\s*["\'][^"\'\n]+["\']',
            r'api_key\s*=\s*["\'][^"\'\n]+["\']'
        ]

        for pattern in password_patterns:
            if re.search(pattern, code, re.IGNORECASE):
                issues.append({
                    "type": "hardcoded_secret",
                    "message": "Hardcoded secrets detected",
                    "severity": "high"
                })
                suggestions.append("Use environment variables or secure vaults for secrets")
                break  # one finding is enough, whichever pattern hit

        score = max(0, 100 - len(issues) * 20)

        return CodeAnalysisResult(
            language=CodeLanguage.PYTHON,
            analysis_type=AnalysisType.SECURITY,
            issues=issues,
            metrics={"security_issues": len(issues)},
            suggestions=suggestions,
            score=score
        )

    async def _analyze_performance(self, code: str) -> CodeAnalysisResult:
        """Flag string concatenation in loops and use of ``global``.

        Each issue costs 15 points; the score never drops below 0.
        """
        issues = []
        suggestions = []

        # A "for ...:" line directly followed by a line doing += with a
        # quoted literal.
        if re.search(r'for\s+.*:\s*\n\s*.*\+=.*["\']', code):
            issues.append({
                "type": "string_concatenation_in_loop",
                "message": "String concatenation in loop is inefficient",
                "severity": "medium"
            })
            suggestions.append("Use list.join() or f-strings instead of += in loops")

        global_pattern = r'global\s+\w+'
        if re.search(global_pattern, code):
            issues.append({
                "type": "global_variable_access",
                "message": "Global variable access can impact performance",
                "severity": "low"
            })
            suggestions.append("Consider passing variables as parameters instead of using global")

        score = max(0, 100 - len(issues) * 15)

        return CodeAnalysisResult(
            language=CodeLanguage.PYTHON,
            analysis_type=AnalysisType.PERFORMANCE,
            issues=issues,
            metrics={"performance_issues": len(issues)},
            suggestions=suggestions,
            score=score
        )

    async def _analyze_complexity(self, code: str) -> CodeAnalysisResult:
        """Report functions whose cyclomatic complexity exceeds 10.

        Code that does not parse yields no issues and an average complexity
        of 0. Each issue costs 10 points; the score never drops below 0.
        """
        issues = []
        suggestions = []
        avg_complexity = 0

        try:
            tree = ast.parse(code)
        except SyntaxError:
            tree = None

        if tree is not None:
            complexity_analyzer = ComplexityAnalyzer()
            complexity_analyzer.visit(tree)
            complexities = complexity_analyzer.complexities

            for func_name, complexity in complexities.items():
                if complexity > 10:
                    issues.append({
                        "type": "high_complexity",
                        "message": f"Function '{func_name}' has high cyclomatic complexity ({complexity})",
                        "severity": "medium"
                    })
                    suggestions.append(f"Consider refactoring '{func_name}' to reduce complexity")

            if complexities:
                avg_complexity = sum(complexities.values()) / len(complexities)

        score = max(0, 100 - len(issues) * 10)

        return CodeAnalysisResult(
            language=CodeLanguage.PYTHON,
            analysis_type=AnalysisType.COMPLEXITY,
            issues=issues,
            metrics={"average_complexity": avg_complexity, "complex_functions": len(issues)},
            suggestions=suggestions,
            score=score
        )

    async def format_code(self, code: str, style_config: Optional[Dict[str, Any]] = None) -> str:
        """Strip trailing whitespace and re-indent every line.

        The existing indentation is read as levels of 4 columns (a tab
        advances to the next multiple of 4) and rewritten with
        ``style_config['indent_size']`` spaces per level. A config without
        ``indent_size`` falls back to 4.
        """
        if style_config is None:
            style_config = {"line_length": 88, "indent_size": 4}
        indent_size = style_config.get('indent_size', 4)

        formatted_lines = []
        for raw_line in code.split('\n'):
            content = raw_line.rstrip()
            if not content:
                # Blank and whitespace-only lines become empty lines.
                formatted_lines.append('')
                continue

            body = content.lstrip()
            # Expand tabs in the leading whitespace only, so a tab-indented
            # line keeps its nesting level instead of being flattened.
            leading = content[:len(content) - len(body)].expandtabs(4)
            indent_levels = len(leading) // 4
            formatted_lines.append(' ' * (indent_levels * indent_size) + body)

        return '\n'.join(formatted_lines)

    async def generate_tests(self, code: str, test_framework: Optional[str] = None) -> str:
        """Generate test code for ``code``.

        Uses ``default_test_framework`` when ``test_framework`` is None and
        returns an explanatory comment when ``code`` does not parse.
        """
        if test_framework is None:
            test_framework = self.default_test_framework

        try:
            tree = ast.parse(code)
        except SyntaxError:
            return "# Unable to generate tests due to syntax errors\n# Please fix syntax errors first"

        test_generator = TestGenerator(test_framework)
        test_generator.visit(tree)
        return test_generator.generate_test_code()

    async def explain_code(self, code: str, detail_level: str = "medium") -> Dict[str, Any]:
        """Return a structured explanation of ``code``.

        When ``code`` does not parse, the default (empty) explanation is
        returned with the syntax error described in ``summary``.
        """
        explanation = {
            "summary": "",
            "functions": [],
            "classes": [],
            "imports": [],
            "complexity": "low"
        }

        try:
            tree = ast.parse(code)
            explainer = CodeExplainer(detail_level)
            explainer.visit(tree)
            explanation = explainer.get_explanation()
        except SyntaxError as e:
            explanation["summary"] = f"Code contains syntax errors: {str(e)}"

        return explanation

class ComplexityAnalyzer(ast.NodeVisitor):
    """AST visitor that estimates cyclomatic complexity per function.

    Every function starts at 1 and gains 1 for each ``if``, ``for``,
    ``async for``, ``while`` and ``except`` clause in its own body. Results
    are stored in ``complexities`` keyed by function name, so functions that
    share a name overwrite each other. Nested functions are scored
    separately and do not add to their enclosing function.
    """

    def __init__(self):
        self.complexities = {}
        self.current_function = None
        self.current_complexity = 0

    def visit_FunctionDef(self, node):
        """Score one function, then restore the enclosing function's counters."""
        old_function = self.current_function
        old_complexity = self.current_complexity

        self.current_function = node.name
        self.current_complexity = 1  # base complexity

        self.generic_visit(node)

        self.complexities[self.current_function] = self.current_complexity

        self.current_function = old_function
        self.current_complexity = old_complexity

    # Coroutines are scored exactly like plain functions.
    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_If(self, node):
        self.current_complexity += 1
        self.generic_visit(node)

    def visit_For(self, node):
        self.current_complexity += 1
        self.generic_visit(node)

    # ``async for`` is a loop like any other.
    visit_AsyncFor = visit_For

    def visit_While(self, node):
        self.current_complexity += 1
        self.generic_visit(node)

    def visit_ExceptHandler(self, node):
        self.current_complexity += 1
        self.generic_visit(node)

class TestGenerator(ast.NodeVisitor):
    """Collect public functions from a Python AST and emit test skeletons.

    Visiting a tree records every function whose name does not start with
    an underscore (including methods and ``async def`` functions, which are
    reached through the normal traversal) in ``functions``, and every class
    in ``classes``. The ``methods`` list of a class entry is left empty.
    ``generate_test_code`` then renders a skeleton for the chosen framework.
    """

    # The class name starts with "Test", so pytest would otherwise try to
    # collect it whenever it is imported into a test module.
    __test__ = False

    def __init__(self, framework: str):
        self.framework = framework
        self.functions = []
        self.classes = []

    def visit_FunctionDef(self, node):
        """Record a public function and keep walking into its body."""
        if not node.name.startswith('_'):  # skip private functions
            self.functions.append({
                'name': node.name,
                'args': [arg.arg for arg in node.args.args],
                'returns': node.returns is not None
            })
        self.generic_visit(node)

    # ``async def`` is a distinct node type and was previously skipped.
    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_ClassDef(self, node):
        """Record a class and keep walking so its methods are collected."""
        self.classes.append({
            'name': node.name,
            'methods': []
        })
        self.generic_visit(node)

    def generate_test_code(self) -> str:
        """Return the test skeleton for ``self.framework``.

        Returns:
            Generated source for ``"pytest"`` or ``"unittest"``; for any
            other framework, a one-line comment naming the unsupported value.
        """
        if self.framework == "pytest":
            return self._generate_pytest_code()
        elif self.framework == "unittest":
            return self._generate_unittest_code()
        else:
            return f"# Unsupported test framework: {self.framework}"

    def _generate_pytest_code(self) -> str:
        """Render one placeholder ``test_<name>`` function per recorded function."""
        test_code = ["import pytest", "from your_module import *", "", ""]

        for func in self.functions:
            test_code.append(f"def test_{func['name']}():")
            test_code.append(f"    # TODO: Implement test for {func['name']}")
            if func['args']:
                args_str = ', '.join([f"mock_{arg}" for arg in func['args']])
                test_code.append(f"    # Example: result = {func['name']}({args_str})")
            else:
                test_code.append(f"    # Example: result = {func['name']}()")
            test_code.append("    # assert result == expected_value")
            # A body made only of comments is a syntax error; ``pass`` keeps
            # the generated module importable until the test is written.
            test_code.append("    pass")
            test_code.append("")

        return '\n'.join(test_code)

    def _generate_unittest_code(self) -> str:
        """Render a ``unittest.TestCase`` with one placeholder method per function."""
        test_code = [
            "import unittest",
            "from your_module import *",
            "",
            "class TestYourModule(unittest.TestCase):",
            ""
        ]

        if not self.functions:
            # An empty class body is a syntax error in the generated file.
            test_code.append("    pass")
            test_code.append("")

        for func in self.functions:
            test_code.append(f"    def test_{func['name']}(self):")
            test_code.append(f"        # TODO: Implement test for {func['name']}")
            test_code.append("        pass")
            test_code.append("")

        test_code.extend([
            "if __name__ == '__main__':",
            "    unittest.main()"
        ])

        return '\n'.join(test_code)

class CodeExplainer(ast.NodeVisitor):
    """Walk a Python AST and collect a structural summary of the code.

    Imports, functions (``def`` and ``async def``) and classes are recorded
    in ``explanation`` while visiting; ``get_explanation`` then fills in the
    ``summary`` text and a coarse ``complexity`` label. Methods are reached
    through the normal traversal and therefore appear in ``functions``; the
    ``methods`` list of a class entry is left empty.
    """

    def __init__(self, detail_level: str):
        # Stored for callers; nothing in this class reads it.
        self.detail_level = detail_level
        self.explanation = {
            "summary": "",
            "functions": [],
            "classes": [],
            "imports": [],
            "complexity": "low"
        }

    @staticmethod
    def _line_count(node) -> int:
        """Return the number of source lines ``node`` spans, or 0 if unknown."""
        last_line = getattr(node, 'end_lineno', None)
        if last_line is None:
            return 0
        # Both bounds are inclusive, so a one-line definition counts as 1.
        return last_line - node.lineno + 1

    def visit_Import(self, node):
        """Record each name of an ``import x`` statement."""
        for alias in node.names:
            self.explanation["imports"].append({
                "name": alias.name,
                "alias": alias.asname,
                "type": "import"
            })

    def visit_ImportFrom(self, node):
        """Record each name of a ``from m import x`` statement as ``m.x``."""
        for alias in node.names:
            self.explanation["imports"].append({
                # node.module is None for ``from . import x``.
                "name": f"{node.module}.{alias.name}" if node.module else alias.name,
                "alias": alias.asname,
                "type": "from_import"
            })

    def visit_FunctionDef(self, node):
        """Record a function's name, positional args, docstring and length."""
        self.explanation["functions"].append({
            "name": node.name,
            "args": [arg.arg for arg in node.args.args],
            "docstring": ast.get_docstring(node),
            "line_count": self._line_count(node)
        })
        self.generic_visit(node)

    # ``async def`` is a distinct node type and was previously skipped.
    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_ClassDef(self, node):
        """Record a class's name, docstring and length, then visit its body."""
        self.explanation["classes"].append({
            "name": node.name,
            "docstring": ast.get_docstring(node),
            "methods": [],
            "line_count": self._line_count(node)
        })
        self.generic_visit(node)

    def get_explanation(self) -> Dict[str, Any]:
        """Finalize and return the explanation dictionary.

        Returns:
            ``self.explanation`` with ``summary`` built from the import,
            class and function counts, and ``complexity`` set to ``"high"``
            for more than 10 functions plus classes, ``"medium"`` for more
            than 5, otherwise ``"low"``.
        """
        summary_parts = []

        if self.explanation["imports"]:
            summary_parts.append(f"导入了 {len(self.explanation['imports'])} 个模块")

        if self.explanation["classes"]:
            summary_parts.append(f"定义了 {len(self.explanation['classes'])} 个类")

        if self.explanation["functions"]:
            summary_parts.append(f"定义了 {len(self.explanation['functions'])} 个函数")

        if summary_parts:
            self.explanation["summary"] = "这段代码" + ",".join(summary_parts)
        else:
            self.explanation["summary"] = "这段代码没有明显的结构"

        definition_count = len(self.explanation["functions"]) + len(self.explanation["classes"])

        if definition_count > 10:
            self.explanation["complexity"] = "high"
        elif definition_count > 5:
            self.explanation["complexity"] = "medium"
        else:
            self.explanation["complexity"] = "low"

        return self.explanation

class JavaScriptCodeAnalyzer(CodeAnalyzer):
    """Simplified analyzer for JavaScript source code.

    Only the style analysis does real work (a line-based missing-semicolon
    heuristic); formatting, test generation and explanation are stubs.
    """

    @property
    def supported_language(self) -> CodeLanguage:
        return CodeLanguage.JAVASCRIPT

    @property
    def default_test_framework(self) -> str:
        return "jest"

    async def analyze(self, code: str, analysis_type: AnalysisType) -> CodeAnalysisResult:
        """Analyze JavaScript code.

        For ``AnalysisType.STYLE`` every non-blank line that neither ends
        with ``;``, ``{``, ``}``, ``)`` or ``]`` nor starts with ``if``,
        ``for``, ``while``, ``function`` or ``class`` is reported as a
        missing-semicolon warning. Other analysis types report no issues.

        Returns:
            A ``CodeAnalysisResult`` whose score is 100 minus 10 per issue,
            never below 0.
        """
        accepted_endings = (';', '{', '}', ')', ']')
        statement_keywords = ('if', 'for', 'while', 'function', 'class')

        issues = []
        suggestions = []

        if analysis_type == AnalysisType.STYLE:
            for line_no, raw_line in enumerate(code.split('\n'), 1):
                text = raw_line.strip()
                if not text:
                    continue
                if text.endswith(accepted_endings) or text.startswith(statement_keywords):
                    continue
                issues.append({
                    "type": "missing_semicolon",
                    "message": f"Missing semicolon at line {line_no}",
                    "line": line_no,
                    "severity": "warning"
                })

        issue_total = len(issues)

        return CodeAnalysisResult(
            language=CodeLanguage.JAVASCRIPT,
            analysis_type=analysis_type,
            issues=issues,
            metrics={"issues_count": issue_total},
            suggestions=suggestions,
            score=max(0, 100 - issue_total * 10)
        )

    async def format_code(self, code: str, style_config: Dict[str, Any] = None) -> str:
        """Return ``code`` unchanged; formatting is not implemented."""
        return code

    async def generate_tests(self, code: str, test_framework: str = None) -> str:
        """Return a two-line placeholder test template.

        Args:
            code: Source to generate tests for (not inspected).
            test_framework: Framework name; ``None`` selects
                ``default_test_framework``.
        """
        framework = self.default_test_framework if test_framework is None else test_framework
        return f"// {framework} test template\n// TODO: Implement tests"

    async def explain_code(self, code: str, detail_level: str = "medium") -> Dict[str, Any]:
        """Return a fixed placeholder explanation; ``code`` is not inspected."""
        placeholder = {
            "summary": "JavaScript代码分析",
            "functions": [],
            "classes": [],
            "complexity": "medium"
        }
        return placeholder

class CodeAssistantMCPServer:
    """Facade that routes code-assistant requests to a per-language analyzer.

    Every public method takes the language as a string and returns a plain
    dictionary; invalid or unsupported input yields ``{"error": ...}``
    instead of raising.
    """

    def __init__(self):
        self.analyzers = {
            CodeLanguage.PYTHON: PythonCodeAnalyzer(),
            CodeLanguage.JAVASCRIPT: JavaScriptCodeAnalyzer()
        }

    def _supported_language(self, language: str):
        """Return the ``CodeLanguage`` for ``language`` if an analyzer is
        registered for it, otherwise ``None``."""
        try:
            candidate = CodeLanguage(language.lower())
        except ValueError:
            return None
        return candidate if candidate in self.analyzers else None

    async def analyze_code(self, code: str, language: str, analysis_type: str) -> Dict[str, Any]:
        """Run one analysis and return its result as a dictionary.

        Args:
            code: Source text to analyze.
            language: Language name, matched case-insensitively.
            analysis_type: Analysis type name, matched case-insensitively.
        """
        try:
            lang_enum = CodeLanguage(language.lower())
            analysis_enum = AnalysisType(analysis_type.lower())
        except ValueError as e:
            return {"error": f"Invalid parameter: {str(e)}"}

        if lang_enum not in self.analyzers:
            return {"error": f"Unsupported language: {language}"}

        outcome = await self.analyzers[lang_enum].analyze(code, analysis_enum)

        return {
            "language": outcome.language.value,
            "analysis_type": outcome.analysis_type.value,
            "issues": outcome.issues,
            "suggestions": outcome.suggestions,
            "metrics": outcome.metrics,
            "score": outcome.score
        }

    async def format_code(self, code: str, language: str, style_config: Dict[str, Any] = None) -> Dict[str, Any]:
        """Format ``code`` and return both the original and formatted text."""
        lang_enum = self._supported_language(language)
        if lang_enum is None:
            return {"error": f"Unsupported language: {language}"}

        formatted_code = await self.analyzers[lang_enum].format_code(code, style_config)

        return {
            "original_code": code,
            "formatted_code": formatted_code,
            "language": language
        }

    async def generate_tests(self, code: str, language: str, test_framework: str = None) -> Dict[str, Any]:
        """Generate test code; the analyzer's default framework is used and
        reported when ``test_framework`` is falsy."""
        lang_enum = self._supported_language(language)
        if lang_enum is None:
            return {"error": f"Unsupported language: {language}"}

        analyzer = self.analyzers[lang_enum]
        test_code = await analyzer.generate_tests(code, test_framework)

        return {
            "original_code": code,
            "test_code": test_code,
            "language": language,
            "test_framework": test_framework or analyzer.default_test_framework
        }

    async def explain_code(self, code: str, language: str, detail_level: str = "medium") -> Dict[str, Any]:
        """Explain ``code`` and echo the request parameters with the result."""
        lang_enum = self._supported_language(language)
        if lang_enum is None:
            return {"error": f"Unsupported language: {language}"}

        explanation = await self.analyzers[lang_enum].explain_code(code, detail_level)

        return {
            "code": code,
            "language": language,
            "explanation": explanation,
            "detail_level": detail_level
        }

最佳实践总结

1. 架构设计原则

  • 单一职责原则:每个组件只负责一个明确的功能
  • 开放封闭原则:对扩展开放,对修改封闭
  • 依赖倒置原则:依赖抽象而不是具体实现
  • 接口隔离原则:使用小而专一的接口
  • 里氏替换原则:子类可以替换父类

2. 代码质量保证

# 代码质量检查清单
class CodeQualityChecklist:
    """Checklist that runs a fixed set of quality checks over a code sample.

    ``checks`` maps each check name to whether it passed in the most recent
    ``run_all_checks`` call (all ``False`` before the first run). The
    individual ``_check_*`` methods are placeholders returning fixed values.
    """

    def __init__(self):
        self.checks = {
            "syntax": False,
            "style": False,
            "security": False,
            "performance": False,
            "tests": False,
            "documentation": False
        }

    def run_all_checks(self, code: str, language: str) -> Dict[str, Any]:
        """Run every check and return the per-check results plus a score.

        Args:
            code: Source text to check.
            language: Language name passed to the language-aware checks.

        Returns:
            A dictionary with one result per check name and an
            ``overall_score`` entry holding the mean of the check scores.
        """
        results = {
            "syntax": self._check_syntax(code, language),
            "style": self._check_style(code, language),
            "security": self._check_security(code, language),
            "performance": self._check_performance(code, language),
            "tests": self._check_test_coverage(code),
            "documentation": self._check_documentation(code),
        }

        # Mirror the outcome into the checklist, which was previously
        # initialised but never updated.
        for check_name, outcome in results.items():
            self.checks[check_name] = bool(outcome.get("passed", False))

        results["overall_score"] = self._calculate_overall_score(results)

        return results

    def _check_syntax(self, code: str, language: str) -> Dict[str, Any]:
        """Syntax check (placeholder: always passes with score 100)."""
        return {"passed": True, "issues": [], "score": 100}

    def _check_style(self, code: str, language: str) -> Dict[str, Any]:
        """Style check (placeholder: always passes with score 95)."""
        return {"passed": True, "issues": [], "score": 95}

    def _check_security(self, code: str, language: str) -> Dict[str, Any]:
        """Security check (placeholder: always passes with score 90)."""
        return {"passed": True, "issues": [], "score": 90}

    def _check_performance(self, code: str, language: str) -> Dict[str, Any]:
        """Performance check (placeholder: always passes with score 85)."""
        return {"passed": True, "issues": [], "score": 85}

    def _check_test_coverage(self, code: str) -> Dict[str, Any]:
        """Test-coverage check (placeholder: fixed coverage and score of 80)."""
        return {"passed": True, "coverage": 80, "score": 80}

    def _check_documentation(self, code: str) -> Dict[str, Any]:
        """Documentation check (placeholder: fixed coverage and score of 75)."""
        return {"passed": True, "coverage": 75, "score": 75}

    def _calculate_overall_score(self, results: Dict[str, Any]) -> float:
        """Return the mean ``score`` over all dict-valued results, or 0 if none."""
        scores = [
            result["score"]
            for result in results.values()
            if isinstance(result, dict) and "score" in result
        ]
        return sum(scores) / len(scores) if scores else 0

3. 性能优化建议

  • 缓存策略:合理使用缓存减少重复计算
  • 异步处理:使用异步编程提高并发性能
  • 连接池:复用数据库和网络连接
  • 批量处理:减少网络往返次数
  • 内存管理:及时释放不需要的资源

4. 安全最佳实践

  • 输入验证:严格验证所有用户输入
  • 权限控制:实施最小权限原则
  • 数据加密:敏感数据必须加密存储和传输
  • 审计日志:记录所有重要操作
  • 定期更新:及时更新依赖库和框架

5. 监控和运维

# 运维最佳实践
class OperationsBestPractices:
    """Factory methods returning example operations configuration dictionaries."""

    @staticmethod
    def setup_monitoring():
        """Return the monitoring configuration: metric thresholds, alert
        targets and dashboard settings."""
        metrics = {
            "response_time": {"threshold": 1000, "unit": "ms"},
            "error_rate": {"threshold": 0.01, "unit": "percentage"},
            "cpu_usage": {"threshold": 80, "unit": "percentage"},
            "memory_usage": {"threshold": 85, "unit": "percentage"}
        }
        alerts = {
            "email": ["ops@company.com"],
            "slack": "#alerts",
            "escalation_time": 300  # 5 minutes, if the value is in seconds
        }
        dashboards = {
            "grafana_url": "https://grafana.company.com",
            "panels": ["system", "application", "business"]
        }
        return {"metrics": metrics, "alerts": alerts, "dashboards": dashboards}

    @staticmethod
    def setup_logging():
        """Return the logging configuration: level, format, fields, outputs,
        retention and rotation."""
        fields = [
            "timestamp", "level", "message",
            "request_id", "user_id", "session_id"
        ]
        outputs = [
            {"type": "file", "path": "/var/log/mcp/app.log"},
            {"type": "elasticsearch", "index": "mcp-logs"},
            {"type": "stdout", "format": "human"}
        ]
        return {
            "level": "INFO",
            "format": "json",
            "fields": fields,
            "outputs": outputs,
            "retention": "30d",
            "rotation": "daily"
        }

    @staticmethod
    def setup_backup():
        """Return the backup configuration: schedule, retention, storage and
        encryption settings."""
        schedule = {
            "full_backup": "0 2 * * 0",  # 02:00 every Sunday
            "incremental_backup": "0 2 * * 1-6"  # 02:00 Monday through Saturday
        }
        retention = {"daily": 7, "weekly": 4, "monthly": 12}
        storage = {
            "primary": "s3://backup-bucket/mcp/",
            "secondary": "gs://backup-bucket-secondary/mcp/"
        }
        encryption = {
            "enabled": True,
            "algorithm": "AES-256",
            "key_rotation": "quarterly"
        }
        return {
            "schedule": schedule,
            "retention": retention,
            "storage": storage,
            "encryption": encryption
        }

6. 团队协作规范

  • 代码审查:所有代码必须经过同行审查
  • 版本控制:使用Git进行版本控制,遵循Git Flow
  • 文档维护:及时更新技术文档和API文档
  • 知识分享:定期进行技术分享和培训
  • 持续集成:自动化构建、测试和部署流程

本章总结

本章深入探讨了MCP协议的最佳实践与案例分析,主要内容包括:

  1. 架构设计最佳实践

    • 分层架构设计模式
    • 领域驱动设计实践
    • 依赖注入和控制反转
  2. 实际案例分析

    • 智能客服系统的MCP实现
    • 代码助手系统的设计与开发
    • 多语言代码分析器的架构
  3. 代码质量保证

    • 自动化代码检查
    • 多维度质量评估
    • 持续改进机制
  4. 性能优化策略

    • 缓存和异步处理
    • 资源管理和连接池
    • 监控和性能调优
  5. 安全和运维实践

    • 安全设计原则
    • 监控和日志管理
    • 备份和灾难恢复

通过这些最佳实践的应用,可以构建出高质量、高性能、高可用的MCP协议应用系统。在实际项目中,应该根据具体需求和约束条件,灵活运用这些实践经验,持续优化和改进系统架构。

至此,我们已经完成了MCP协议的完整学习之旅,从基础概念到高级应用,从理论知识到实践案例,希望这个教程能够帮助您深入理解和掌握MCP协议的精髓,在实际项目中发挥重要作用。

class PresentationLayer(Layer):
    """Presentation layer: validates requests, delegates to the application
    layer and transforms the result into a response.

    The methods below were left at module level without their class header;
    they are regrouped under the class that ``LayerType.PRESENTATION`` and
    the ``self`` parameters indicate.
    """

    def __init__(self):
        super().__init__(LayerType.PRESENTATION)
        self.request_validators = []
        self.response_transformers = []

    async def process(self, context: LayerContext, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle one request.

        Args:
            context: Per-request context passed to every collaborator.
            request_data: Raw request payload.

        Returns:
            The application layer's result after all response transformers.

        Raises:
            RuntimeError: If no ``ApplicationLayer`` dependency is registered.
        """
        # Validate the request
        validated_data = await self._validate_request(context, request_data)

        # Delegate to the application layer
        app_layer = self._get_application_layer()
        result = await app_layer.process(context, validated_data)

        # Transform the response
        return await self._transform_response(context, result)

    async def _validate_request(self, context: LayerContext, data: Dict[str, Any]) -> Dict[str, Any]:
        """Pass ``data`` through every validator in registration order;
        each validator's return value feeds the next one."""
        for validator in self.request_validators:
            data = await validator.validate(context, data)
        return data

    async def _transform_response(self, context: LayerContext, data: Any) -> Dict[str, Any]:
        """Pass ``data`` through every transformer in registration order."""
        for transformer in self.response_transformers:
            data = await transformer.transform(context, data)
        return data

    def _get_application_layer(self) -> 'ApplicationLayer':
        """Return the first dependency that is an ``ApplicationLayer``.

        Raises:
            RuntimeError: If no such dependency has been added.
        """
        for dep in self.dependencies:
            if isinstance(dep, ApplicationLayer):
                return dep
        raise RuntimeError("Application layer not found")

class ApplicationLayer(Layer):
    """Application layer: orchestrates business workflows.

    Dispatches each command to the use case registered for its ``type`` and
    forwards the events that use case reports to the registered handlers.
    """

    def __init__(self):
        super().__init__(LayerType.APPLICATION)
        self.use_cases = {}
        self.event_handlers = {}
        # Strong references to in-flight handler tasks. The event loop keeps
        # only weak references, so an unreferenced task may be collected
        # before it finishes.
        self._background_tasks = set()

    async def process(self, context: LayerContext, command: Dict[str, Any]) -> Any:
        """Execute the use case registered for ``command['type']``.

        Returns:
            Whatever the use case's ``execute`` returns.

        Raises:
            ValueError: If no use case is registered for the command type.
        """
        command_type = command.get('type')

        if command_type not in self.use_cases:
            raise ValueError(f"Unknown command type: {command_type}")

        use_case = self.use_cases[command_type]

        # Execute the use case
        result = await use_case.execute(context, command)

        # Publish the events it produced
        await self._publish_events(context, use_case.get_events())

        return result

    def register_use_case(self, command_type: str, use_case: 'UseCase'):
        """Register (or replace) the use case for ``command_type``."""
        self.use_cases[command_type] = use_case

    def register_event_handler(self, event_type: str, handler: 'EventHandler'):
        """Append ``handler`` to the handlers for ``event_type``."""
        self.event_handlers.setdefault(event_type, []).append(handler)

    async def _publish_events(self, context: LayerContext, events: List['DomainEvent']):
        """Schedule every matching handler for each event without awaiting it."""
        for event in events:
            for handler in self.event_handlers.get(event.event_type, []):
                task = asyncio.create_task(handler.handle(context, event))
                self._background_tasks.add(task)
                # Drop the reference once the handler has finished.
                task.add_done_callback(self._background_tasks.discard)

class DomainLayer(Layer):
    """Domain layer: core business logic.

    Holds registries of entity classes, value-object classes and domain
    services, and routes domain commands to ``_execute_domain_logic``.
    """

    def __init__(self):
        super().__init__(LayerType.DOMAIN)
        self.entities = {}
        self.value_objects = {}
        self.domain_services = {}

    async def process(self, context: LayerContext, domain_command: Any) -> Any:
        """Handle a domain command by delegating to ``_execute_domain_logic``."""
        return await self._execute_domain_logic(context, domain_command)

    async def _execute_domain_logic(self, context: LayerContext, command: Any) -> Any:
        """Run the domain logic for ``command``.

        Placeholder: no logic is implemented here, so it returns ``None``.
        """
        pass

    def register_entity(self, name: str, entity_class: type):
        """Register (or replace) an entity class under ``name``."""
        self.entities[name] = entity_class

    def register_value_object(self, name: str, vo_class: type):
        """Register (or replace) a value-object class under ``name``."""
        self.value_objects[name] = vo_class

    def register_domain_service(self, name: str, service: 'DomainService'):
        """Register (or replace) a domain service under ``name``."""
        self.domain_services[name] = service

class InfrastructureLayer(Layer):
    """Infrastructure layer: technical implementations.

    Holds registries of repositories, external services and message brokers,
    and routes requests to ``_execute_infrastructure_operation``.
    """

    def __init__(self):
        super().__init__(LayerType.INFRASTRUCTURE)
        self.repositories = {}
        self.external_services = {}
        self.message_brokers = {}

    async def process(self, context: LayerContext, infrastructure_request: Any) -> Any:
        """Handle a request by delegating to ``_execute_infrastructure_operation``."""
        return await self._execute_infrastructure_operation(context, infrastructure_request)

    async def _execute_infrastructure_operation(self, context: LayerContext, request: Any) -> Any:
        """Run the infrastructure operation for ``request``.

        Placeholder: no operation is implemented here, so it returns ``None``.
        """
        pass

    def register_repository(self, name: str, repository: 'Repository'):
        """Register (or replace) a repository under ``name``."""
        self.repositories[name] = repository

    def register_external_service(self, name: str, service: 'ExternalService'):
        """Register (or replace) an external service under ``name``."""
        self.external_services[name] = service

    def register_message_broker(self, name: str, broker: 'MessageBroker'):
        """Register (or replace) a message broker under ``name``."""
        self.message_brokers[name] = broker

class ArchitectureBuilder:
    """Fluent builder that collects layers and wires their dependencies."""

    def __init__(self):
        self.layers = {}

    def add_layer(self, layer: Layer) -> 'ArchitectureBuilder':
        """Store ``layer`` under its ``layer_type`` (replacing any previous
        layer of that type) and return the builder for chaining."""
        self.layers[layer.layer_type] = layer
        return self

    def build_dependencies(self) -> 'ArchitectureBuilder':
        """Wire each layer to the one directly below it.

        Presentation depends on application, application on domain, and
        domain on infrastructure. A link is added only when both layers of
        the pair have been registered.
        """
        dependency_chain = (
            (LayerType.PRESENTATION, LayerType.APPLICATION),
            (LayerType.APPLICATION, LayerType.DOMAIN),
            (LayerType.DOMAIN, LayerType.INFRASTRUCTURE),
        )
        for upper, lower in dependency_chain:
            if upper in self.layers and lower in self.layers:
                self.layers[upper].add_dependency(self.layers[lower])

        return self

    def build(self) -> Dict[LayerType, Layer]:
        """Return the registered layers keyed by layer type."""
        return self.layers

2. 领域驱动设计(DDD)实践

# domain/entities.py
from abc import ABC, abstractmethod
from typing import List, Optional, Any, Dict
from dataclasses import dataclass, field
from datetime import datetime
import uuid

class DomainEvent(ABC):
    """Base class for domain events.

    Each event gets a freshly generated UUID string as ``event_id`` and a
    creation timestamp in ``occurred_at``; callers supply the event type,
    the id of the aggregate it belongs to, and a version number.
    """

    def __init__(self, event_type: str, aggregate_id: str, version: int):
        self.event_id = str(uuid.uuid4())
        self.event_type, self.aggregate_id, self.version = (
            event_type,
            aggregate_id,
            version,
        )
        self.occurred_at = datetime.utcnow()
        self.metadata = {}

    def add_metadata(self, key: str, value: Any):
        """Attach (or overwrite) one metadata entry on the event."""
        self.metadata.update({key: value})

class AggregateRoot(ABC):
    """Base class for aggregate roots.

    Tracks an id, a version counter, creation/update timestamps and the
    list of domain events recorded since the last ``clear_events`` call.
    """

    def __init__(self, aggregate_id: str):
        self.id = aggregate_id
        self.version = 0
        self._events: List[DomainEvent] = []
        self.created_at = datetime.utcnow()
        self.updated_at = datetime.utcnow()

    def add_event(self, event: DomainEvent):
        """Record ``event``, bump the version by one and refresh ``updated_at``."""
        self._events.append(event)
        self.version = self.version + 1
        self.updated_at = datetime.utcnow()

    def get_events(self) -> List[DomainEvent]:
        """Return a shallow copy of the recorded events, so callers cannot
        modify the internal list."""
        return list(self._events)

    def clear_events(self):
        """Remove all recorded events in place."""
        del self._events[:]

    @abstractmethod
    def validate(self) -> bool:
        """Report whether the aggregate is in a valid state."""

class ValueObject(ABC):
    """Base class for value objects, compared and hashed by attribute values."""

    def __eq__(self, other):
        # Equal only to an instance of the same class (or a subclass of it)
        # whose instance attributes all match.
        same_kind = isinstance(other, self.__class__)
        return same_kind and vars(self) == vars(other)

    def __hash__(self):
        # Sorting the items makes the hash independent of attribute
        # assignment order.
        ordered_items = sorted(vars(self).items())
        return hash(tuple(ordered_items))

    @abstractmethod
    def validate(self) -> bool:
        """Report whether the value object holds a valid value."""

# MCP特定的领域模型
# eq=False keeps the __eq__/__hash__ inherited from ValueObject. A plain
# @dataclass generates its own __eq__ and sets __hash__ to None, which makes
# instances unhashable.
@dataclass(eq=False)
class ToolId(ValueObject):
    """Tool identifier value object: a non-empty string of at most 100 characters.

    Raises:
        ValueError: On construction, if ``value`` fails ``validate``.
    """
    value: str

    def __post_init__(self):
        if not self.validate():
            raise ValueError(f"Invalid tool ID: {self.value}")

    def validate(self) -> bool:
        """Return True when ``value`` is non-empty and no longer than 100."""
        return bool(self.value) and len(self.value) <= 100

# eq=False keeps the __eq__/__hash__ inherited from ValueObject. A plain
# @dataclass generates its own __eq__ and sets __hash__ to None, which makes
# instances unhashable.
@dataclass(eq=False)
class ToolName(ValueObject):
    """Tool name value object: a non-empty string of at most 50 characters.

    Raises:
        ValueError: On construction, if ``value`` fails ``validate``.
    """
    value: str

    def __post_init__(self):
        if not self.validate():
            raise ValueError(f"Invalid tool name: {self.value}")

    def validate(self) -> bool:
        """Return True when ``value`` is non-empty and no longer than 50."""
        return bool(self.value) and len(self.value) <= 50

# eq=False keeps the __eq__/__hash__ inherited from ValueObject. A plain
# @dataclass generates its own __eq__ and sets __hash__ to None, which makes
# instances unhashable.
@dataclass(eq=False)
class ToolDescription(ValueObject):
    """Tool description value object: a string of at most 500 characters.

    An empty description is allowed.

    Raises:
        ValueError: On construction, if ``value`` fails ``validate``.
    """
    value: str

    def __post_init__(self):
        if not self.validate():
            raise ValueError(f"Invalid tool description: {self.value}")

    def validate(self) -> bool:
        """Return True when ``value`` is a string no longer than 500."""
        # The type check turns a None/non-string value into the ValueError
        # above instead of a TypeError from len().
        return isinstance(self.value, str) and len(self.value) <= 500

class ToolCreatedEvent(DomainEvent):
    """Event recorded when a tool is created; carries the tool's name."""

    def __init__(self, tool_id: str, tool_name: str, version: int):
        super().__init__(
            event_type="tool_created",
            aggregate_id=tool_id,
            version=version,
        )
        self.tool_name = tool_name

class ToolUpdatedEvent(DomainEvent):
    """Event recorded when a tool changes; carries the change description."""

    def __init__(self, tool_id: str, changes: Dict[str, Any], version: int):
        super().__init__(
            event_type="tool_updated",
            aggregate_id=tool_id,
            version=version,
        )
        self.changes = changes

class ToolDeletedEvent(DomainEvent):
    """Event recorded when a tool is deleted."""

    def __init__(self, tool_id: str, version: int):
        super().__init__(
            event_type="tool_deleted",
            aggregate_id=tool_id,
            version=version,
        )

class Tool(AggregateRoot):
    """Tool aggregate root.

    Holds a tool's identity, name, description, activation flag and
    execution counters, and records a domain event for creation, every
    effective update, and deletion. Each event is built with the
    aggregate's version as it is before ``add_event`` increments it.
    """

    def __init__(self, tool_id: ToolId, name: ToolName, description: ToolDescription):
        super().__init__(tool_id.value)
        self.tool_id = tool_id
        self.name = name
        self.description = description
        self.is_active = True
        self.parameters = {}
        self.execution_count = 0
        self.last_executed_at: Optional[datetime] = None

        # Record the creation event
        self.add_event(ToolCreatedEvent(self.id, self.name.value, self.version))

    def _record_update(self, changes):
        """Record a ``ToolUpdatedEvent`` describing ``changes``."""
        self.add_event(ToolUpdatedEvent(self.id, changes, self.version))

    def update_name(self, new_name: ToolName):
        """Rename the tool; does nothing when the name is unchanged."""
        if self.name == new_name:
            return
        previous = self.name.value
        self.name = new_name
        self._record_update({"name": {"old": previous, "new": new_name.value}})

    def update_description(self, new_description: ToolDescription):
        """Replace the description; does nothing when it is unchanged."""
        if self.description == new_description:
            return
        previous = self.description.value
        self.description = new_description
        self._record_update(
            {"description": {"old": previous, "new": new_description.value}}
        )

    def execute(self) -> 'ToolExecutionResult':
        """Count one execution and return a success result.

        Raises:
            ValueError: If the tool is inactive.
        """
        if not self.is_active:
            raise ValueError("Cannot execute inactive tool")

        self.execution_count = self.execution_count + 1
        self.last_executed_at = datetime.utcnow()

        # The actual work should be delegated to a domain service
        return ToolExecutionResult(self.id, True, "Execution successful")

    def deactivate(self):
        """Deactivate the tool; does nothing when it is already inactive."""
        if not self.is_active:
            return
        self.is_active = False
        self._record_update({"is_active": {"old": True, "new": False}})

    def activate(self):
        """Activate the tool; does nothing when it is already active."""
        if self.is_active:
            return
        self.is_active = True
        self._record_update({"is_active": {"old": False, "new": True}})

    def delete(self):
        """Record a ``ToolDeletedEvent``; no other state is changed."""
        self.add_event(ToolDeletedEvent(self.id, self.version))

    def validate(self) -> bool:
        """Return True when id, name and description all validate."""
        parts = (self.tool_id, self.name, self.description)
        return all(part.validate() for part in parts)

@dataclass
class ToolExecutionResult:
    """Outcome of a single tool execution."""
    # Identifier of the executed tool.
    # NOTE(review): Tool.execute passes the aggregate's string id here rather
    # than a ToolId instance — confirm which type is intended.
    tool_id: ToolId
    # Whether the execution succeeded.
    success: bool
    # Status message describing the outcome.
    message: str
    # Optional payload produced by the execution.
    data: Optional[Any] = None
    # Optional duration of the execution; the unit is not specified here.
    execution_time: Optional[float] = None

class ToolDomainService:
    """Domain service for creating, executing and inspecting tools through
    a tool repository."""

    def __init__(self, tool_repository: 'ToolRepository'):
        self.tool_repository = tool_repository

    async def _require_tool(self, tool_id: str) -> Tool:
        """Load the tool with ``tool_id`` or raise ``ValueError`` if the
        repository returns nothing for it."""
        found = await self.tool_repository.find_by_id(tool_id)
        if not found:
            raise ValueError(f"Tool with ID '{tool_id}' not found")
        return found

    async def create_tool(self, name: str, description: str) -> Tool:
        """Create and persist a new tool with a generated UUID as its id.

        Raises:
            ValueError: If a tool with ``name`` already exists, or if a
                value object rejects ``name`` or ``description``.
        """
        # Reject duplicate names before building anything
        if await self.tool_repository.find_by_name(name):
            raise ValueError(f"Tool with name '{name}' already exists")

        new_tool = Tool(
            ToolId(str(uuid.uuid4())),
            ToolName(name),
            ToolDescription(description),
        )

        await self.tool_repository.save(new_tool)

        return new_tool

    async def execute_tool(self, tool_id: str, parameters: Dict[str, Any]) -> ToolExecutionResult:
        """Execute a tool and persist its updated counters.

        ``parameters`` is accepted but not forwarded to ``Tool.execute``.

        Raises:
            ValueError: If the tool does not exist or is inactive.
        """
        target = await self._require_tool(tool_id)

        outcome = target.execute()

        # Persist the updated execution count and timestamp
        await self.tool_repository.save(target)

        return outcome

    async def get_tool_statistics(self, tool_id: str) -> Dict[str, Any]:
        """Return execution and lifecycle statistics for a tool.

        Raises:
            ValueError: If the tool does not exist.
        """
        target = await self._require_tool(tool_id)

        return {
            "execution_count": target.execution_count,
            "last_executed_at": target.last_executed_at,
            "is_active": target.is_active,
            "created_at": target.created_at,
            "updated_at": target.updated_at
        }

3. 仓储模式实现

# infrastructure/repositories.py
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from sqlalchemy.orm import selectinload

class Repository(ABC):
    """Abstract base class for repositories.

    Declares the four asynchronous operations every repository must
    implement; the base implementations do nothing and return ``None``.
    """

    @abstractmethod
    async def find_by_id(self, entity_id: str) -> Optional[Any]:
        """Look up an entity by its id."""

    @abstractmethod
    async def save(self, entity: Any) -> Any:
        """Persist an entity."""

    @abstractmethod
    async def delete(self, entity: Any) -> bool:
        """Delete an entity."""

    @abstractmethod
    async def find_all(self) -> List[Any]:
        """Return all entities."""

class ToolRepository(Repository):
    """Repository interface for tools.

    Adds tool-specific queries to the base repository operations; the
    base implementations here do nothing and return ``None``.
    """

    @abstractmethod
    async def find_by_name(self, name: str) -> Optional[Tool]:
        """Look up a tool by its name."""

    @abstractmethod
    async def find_active_tools(self) -> List[Tool]:
        """Return the active tools."""

    @abstractmethod
    async def find_by_execution_count_range(self, min_count: int, max_count: int) -> List[Tool]:
        """Return the tools whose execution count lies in the given range."""

class SQLAlchemyToolRepository(ToolRepository):
    """``ToolRepository`` implementation backed by a SQLAlchemy ``AsyncSession``.

    Rows are read and written through ``ToolModel`` and converted to and from
    ``Tool`` domain entities at the repository boundary.
    """

    def __init__(self, session: AsyncSession):
        self.session = session

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    async def find_by_id(self, tool_id: str) -> Optional[Tool]:
        """Return the tool whose ``ToolModel.id`` equals ``tool_id``, or ``None``."""
        return await self._fetch_one(select(ToolModel).where(ToolModel.id == tool_id))

    async def find_by_name(self, name: str) -> Optional[Tool]:
        """Return the tool whose ``ToolModel.name`` equals ``name``, or ``None``."""
        return await self._fetch_one(select(ToolModel).where(ToolModel.name == name))

    async def find_active_tools(self) -> List[Tool]:
        """Return every tool whose ``is_active`` column is true."""
        # ``== True`` is intentional: SQLAlchemy turns it into a SQL comparison.
        active_only = select(ToolModel).where(ToolModel.is_active == True)  # noqa: E712
        return await self._fetch_many(active_only)

    async def find_by_execution_count_range(self, min_count: int, max_count: int) -> List[Tool]:
        """Return tools with ``min_count <= execution_count <= max_count``."""
        in_range = select(ToolModel).where(
            ToolModel.execution_count >= min_count,
            ToolModel.execution_count <= max_count
        )
        return await self._fetch_many(in_range)

    async def find_all(self) -> List[Tool]:
        """Return every tool row as a domain entity."""
        return await self._fetch_many(select(ToolModel))

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------

    async def save(self, tool: Tool) -> Tool:
        """Insert or update the row for ``tool``, commit, and return ``tool``."""
        persisted = await self.session.get(ToolModel, tool.id)

        if not persisted:
            # No row yet: create one from the entity.
            self.session.add(self._to_data_model(tool))
        else:
            # Row exists: copy the entity's state onto it.
            self._update_model_from_entity(persisted, tool)

        await self.session.commit()
        return tool

    async def delete(self, tool: Tool) -> bool:
        """Delete the row for ``tool`` and commit.

        Returns:
            ``True`` when the DELETE statement reported at least one row.
        """
        outcome = await self.session.execute(
            delete(ToolModel).where(ToolModel.id == tool.id)
        )
        await self.session.commit()

        return outcome.rowcount > 0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _fetch_one(self, stmt) -> Optional[Tool]:
        """Execute ``stmt`` and convert its single optional row to an entity."""
        row = (await self.session.execute(stmt)).scalar_one_or_none()
        return self._to_domain_entity(row) if row else None

    async def _fetch_many(self, stmt) -> List[Tool]:
        """Execute ``stmt`` and convert every returned row to an entity."""
        rows = (await self.session.execute(stmt)).scalars().all()
        return [self._to_domain_entity(row) for row in rows]

    def _to_domain_entity(self, model: 'ToolModel') -> Tool:
        """Convert a data model row into a ``Tool`` domain entity."""
        entity = Tool(
            ToolId(model.id),
            ToolName(model.name),
            ToolDescription(model.description),
        )
        for attr in (
            "is_active",
            "execution_count",
            "last_executed_at",
            "created_at",
            "updated_at",
            "version",
        ):
            setattr(entity, attr, getattr(model, attr))

        # An entity loaded from the database should not carry uncommitted events.
        entity.clear_events()

        return entity

    def _to_data_model(self, tool: Tool) -> 'ToolModel':
        """Convert a ``Tool`` domain entity into a new ``ToolModel`` row."""
        columns = {
            "id": tool.id,
            "name": tool.name.value,
            "description": tool.description.value,
            "is_active": tool.is_active,
            "execution_count": tool.execution_count,
            "last_executed_at": tool.last_executed_at,
            "created_at": tool.created_at,
            "updated_at": tool.updated_at,
            "version": tool.version,
        }
        return ToolModel(**columns)

    def _update_model_from_entity(self, model: 'ToolModel', tool: Tool):
        """Copy the mutable state of ``tool`` onto an existing ``model`` row.

        ``id`` and ``created_at`` are left untouched.
        """
        # Value objects are unwrapped; the remaining fields are copied as-is.
        model.name = tool.name.value
        model.description = tool.description.value
        for attr in (
            "is_active",
            "execution_count",
            "last_executed_at",
            "updated_at",
            "version",
        ):
            setattr(model, attr, getattr(tool, attr))

class InMemoryToolRepository(ToolRepository):
    """In-memory ``ToolRepository`` implementation (intended for tests).

    Tools are kept in a plain dict keyed by ``tool.id``; the stored objects
    themselves are returned, not copies.
    """

    def __init__(self):
        self._tools: Dict[str, Tool] = {}

    async def find_by_id(self, tool_id: str) -> Optional[Tool]:
        """Return the tool stored under ``tool_id``, or ``None``."""
        return self._tools.get(tool_id)

    async def find_by_name(self, name: str) -> Optional[Tool]:
        """Return the first stored tool whose ``name.value`` equals ``name``."""
        matches = (
            candidate for candidate in self._tools.values()
            if candidate.name.value == name
        )
        return next(matches, None)

    async def find_active_tools(self) -> List[Tool]:
        """Return the stored tools whose ``is_active`` is truthy."""
        active: List[Tool] = []
        for candidate in self._tools.values():
            if candidate.is_active:
                active.append(candidate)
        return active

    async def find_by_execution_count_range(self, min_count: int, max_count: int) -> List[Tool]:
        """Return tools with ``min_count <= execution_count <= max_count``."""
        selected: List[Tool] = []
        for candidate in self._tools.values():
            if min_count <= candidate.execution_count <= max_count:
                selected.append(candidate)
        return selected

    async def save(self, tool: Tool) -> Tool:
        """Store ``tool`` under ``tool.id`` (overwriting any previous entry)."""
        self._tools[tool.id] = tool
        return tool

    async def delete(self, tool: Tool) -> bool:
        """Remove ``tool``; return whether an entry was actually removed."""
        if tool.id not in self._tools:
            return False
        del self._tools[tool.id]
        return True

    async def find_all(self) -> List[Tool]:
        """Return a new list containing every stored tool."""
        return [*self._tools.values()]

    def clear(self):
        """Empty the repository (intended for tests)."""
        self._tools.clear()

实际案例分析

案例1:智能客服系统

# cases/customer_service.py
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import asyncio
from datetime import datetime

class ConversationStatus(Enum):
    """Lifecycle states of a customer-service conversation."""

    # New conversations are created in this state.
    ACTIVE = "active"
    # Set when no automatic reply could be produced.
    WAITING = "waiting"
    RESOLVED = "resolved"
    # Set by the escalation manager.
    ESCALATED = "escalated"

@dataclass
class CustomerServiceConfig:
    """Configuration for the customer-service system."""

    max_concurrent_conversations: int = 100
    # Unit is not stated in this file; presumably seconds - TODO confirm.
    response_timeout: int = 30
    # Unresolved-message count at which a conversation is escalated.
    escalation_threshold: int = 3
    # URL handed to the knowledge-base client.
    knowledge_base_url: str = "https://kb.example.com"
    # When False, no sentiment analyzer is created.
    sentiment_analysis_enabled: bool = True
    # When False, replies are never generated automatically.
    auto_resolution_enabled: bool = True

class CustomerServiceMCPServer:
    """MCP server for an intelligent customer-service assistant.

    Conversations are kept in an in-process dict keyed by conversation id.
    The server delegates to a ``KnowledgeBase``, an optional
    ``SentimentAnalyzer`` and an ``EscalationManager``.
    """

    def __init__(self, config: CustomerServiceConfig):
        self.config = config
        self.conversations: Dict[str, 'Conversation'] = {}
        self.knowledge_base = KnowledgeBase(config.knowledge_base_url)
        # The analyzer only exists when sentiment analysis is switched on.
        if config.sentiment_analysis_enabled:
            self.sentiment_analyzer = SentimentAnalyzer()
        else:
            self.sentiment_analyzer = None
        self.escalation_manager = EscalationManager(config.escalation_threshold)

    async def handle_customer_message(self, conversation_id: str, message: str, customer_id: str) -> Dict[str, Any]:
        """Process one customer message and return the reply payload.

        The message is recorded on the conversation (created on first use),
        optionally scored for sentiment, answered, and finally checked
        against the escalation rules. When escalation happens the returned
        dict additionally carries ``"escalated": True``.
        """
        conversation = await self._get_or_create_conversation(conversation_id, customer_id)
        conversation.add_message("customer", message)

        # Sentiment analysis is optional.
        analyzer = self.sentiment_analyzer
        if analyzer:
            conversation.update_sentiment(await analyzer.analyze(message))

        reply = await self._generate_response(conversation, message)

        if not await self._should_escalate(conversation):
            return reply

        await self.escalation_manager.escalate(conversation)
        reply["escalated"] = True
        return reply

    async def _get_or_create_conversation(self, conversation_id: str, customer_id: str) -> 'Conversation':
        """Return the conversation for ``conversation_id``, creating it if absent."""
        registry = self.conversations
        if conversation_id in registry:
            return registry[conversation_id]

        registry[conversation_id] = Conversation(
            conversation_id=conversation_id,
            customer_id=customer_id,
            status=ConversationStatus.ACTIVE
        )
        return registry[conversation_id]

    async def _generate_response(self, conversation: 'Conversation', message: str) -> Dict[str, Any]:
        """Build the reply for ``message``.

        Returns an ``"auto"`` reply when the knowledge base yields results and
        auto resolution is enabled; otherwise marks the conversation as
        waiting and returns a ``"waiting"`` reply that requires a human.
        """
        # 1. Search the knowledge base; 2. collect the conversation context.
        hits = await self.knowledge_base.search(message)
        history = conversation.get_context()

        # 3. Decide between an automatic reply and a hand-off.
        if not (hits and self.config.auto_resolution_enabled):
            # Needs human handling.
            conversation.status = ConversationStatus.WAITING
            return {
                "response": "我正在为您查找相关信息,请稍等片刻。",
                "type": "waiting",
                "requires_human": True
            }

        reply_text = await self._generate_auto_response(hits, history)
        conversation.add_message("assistant", reply_text)

        return {
            "response": reply_text,
            "type": "auto",
            "confidence": hits[0].get("confidence", 0.0),
            "sources": [result["source"] for result in hits[:3]]
        }

    async def _generate_auto_response(self, knowledge_results: List[Dict], context: Dict) -> str:
        """Render the reply text from the first knowledge-base result.

        ``context`` is accepted but not used yet; a more sophisticated NLP
        model could be integrated here.
        """
        top_hit = knowledge_results[0]
        template = top_hit.get("response_template", "根据您的问题,{answer}")
        return template.format(answer=top_hit.get("answer", "我会尽快为您处理。"))

    async def _should_escalate(self, conversation: 'Conversation') -> bool:
        """Return True when any escalation rule fires.

        Rules, checked in order: unresolved count at or above the configured
        threshold; sentiment score below -0.7; duration above 30 minutes.
        """
        return bool(
            conversation.unresolved_count >= self.config.escalation_threshold
            or (conversation.sentiment and conversation.sentiment.get("score", 0) < -0.7)
            or conversation.duration_minutes > 30
        )

    # MCP tool definitions
    async def search_knowledge_base(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Tool: search the knowledge base."""
        hits = await self.knowledge_base.search(query, limit)
        return hits

    async def analyze_sentiment(self, text: str) -> Dict[str, Any]:
        """Tool: analyse the sentiment of ``text``.

        Returns an ``{"error": ...}`` dict when no analyzer is configured.
        """
        analyzer = self.sentiment_analyzer
        if analyzer:
            return await analyzer.analyze(text)
        return {"error": "Sentiment analysis not enabled"}

    async def get_conversation_summary(self, conversation_id: str) -> Dict[str, Any]:
        """Tool: summarise a conversation, or return an error dict if unknown."""
        conversation = self.conversations.get(conversation_id)
        if conversation:
            return {
                "conversation_id": conversation_id,
                "customer_id": conversation.customer_id,
                "status": conversation.status.value,
                "message_count": len(conversation.messages),
                "duration_minutes": conversation.duration_minutes,
                "sentiment": conversation.sentiment,
                "unresolved_count": conversation.unresolved_count,
                "last_activity": conversation.last_activity.isoformat()
            }
        return {"error": "Conversation not found"}

    async def escalate_conversation(self, conversation_id: str, reason: str) -> Dict[str, Any]:
        """Tool: escalate a conversation, or return an error dict if unknown."""
        conversation = self.conversations.get(conversation_id)
        if conversation:
            return await self.escalation_manager.escalate(conversation, reason)
        return {"error": "Conversation not found"}

class Conversation:
    """State of one customer conversation: messages, sentiment and counters."""

    def __init__(self, conversation_id: str, customer_id: str, status: ConversationStatus):
        self.conversation_id = conversation_id
        self.customer_id = customer_id
        self.status = status
        self.messages: List[Dict[str, Any]] = []
        self.sentiment: Optional[Dict[str, Any]] = None
        # Raised by customer messages, lowered by assistant messages.
        self.unresolved_count = 0
        # Both timestamps are naive datetimes taken from ``datetime.utcnow()``.
        self.created_at = datetime.utcnow()
        self.last_activity = datetime.utcnow()

    def add_message(self, role: str, content: str):
        """Append a message and update activity time and unresolved counter.

        A ``"customer"`` message increments ``unresolved_count``; an
        ``"assistant"`` message decrements it, never below zero. Other roles
        leave the counter unchanged.
        """
        entry = {
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat()
        }
        self.messages.append(entry)
        self.last_activity = datetime.utcnow()

        if role == "assistant":
            self.unresolved_count = max(0, self.unresolved_count - 1)
        elif role == "customer":
            self.unresolved_count += 1

    def update_sentiment(self, sentiment: Dict[str, Any]):
        """Replace the stored sentiment-analysis result."""
        self.sentiment = sentiment

    def get_context(self) -> Dict[str, Any]:
        """Return a snapshot of the conversation used when generating replies."""
        total = len(self.messages)
        latest = self.messages[-5:]  # the five most recent messages
        return {
            "message_count": total,
            "recent_messages": latest,
            "sentiment": self.sentiment,
            "duration_minutes": self.duration_minutes
        }

    @property
    def duration_minutes(self) -> int:
        """Whole minutes elapsed since the conversation was created."""
        elapsed = datetime.utcnow() - self.created_at
        return int(elapsed.total_seconds() / 60)

class KnowledgeBase:
    """Knowledge-base client.

    The search is currently a stub that returns canned results; a real
    implementation could integrate Elasticsearch, a vector database, etc.
    """

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.cache: Dict[str, Any] = {}

    async def search(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Search the knowledge base.

        Args:
            query: Search text (ignored by the current stub).
            limit: Maximum number of results to return. Values of zero or
                less yield an empty list.

        Returns:
            At most ``limit`` result dicts.
        """
        # Simulated search results; replace with a real backend query.
        results = [
            {
                "id": "kb_001",
                "title": "常见问题解答",
                "answer": "这是一个常见问题的答案。",
                "confidence": 0.85,
                "source": "FAQ",
                "response_template": "根据我们的FAQ,{answer}"
            }
        ]
        # Honour the caller's limit; clamp so a negative value cannot be
        # interpreted as a "drop from the end" slice.
        return results[:max(limit, 0)]

class SentimentAnalyzer:
    """Sentiment analyzer (currently a stub returning a fixed result)."""

    async def analyze(self, text: str) -> Dict[str, Any]:
        """Analyse the sentiment of ``text``.

        A real implementation should integrate an actual model (for example a
        pre-trained BERT or RoBERTa); for now ``text`` is ignored and a
        simulated result is returned.
        """
        simulated = dict(
            score=0.2,  # between -1 and 1; negative means negative sentiment
            label="neutral",
            confidence=0.75,
        )
        return simulated

class EscalationManager:
    """Escalation manager: marks conversations as escalated."""

    def __init__(self, threshold: int):
        self.threshold = threshold

    async def escalate(self, conversation: 'Conversation', reason: str = None) -> Dict[str, Any]:
        """Escalate ``conversation`` and return a record of the escalation.

        Sets the conversation status to ``ESCALATED``. The real escalation
        work (notifying a human agent, opening a ticket, ...) is still to be
        implemented here.

        Args:
            conversation: Conversation to escalate.
            reason: Optional reason; a falsy value is reported as
                ``"Automatic escalation"``.
        """
        conversation.status = ConversationStatus.ESCALATED

        record: Dict[str, Any] = {"escalated": True}
        record["conversation_id"] = conversation.conversation_id
        record["reason"] = reason if reason else "Automatic escalation"
        record["escalated_at"] = datetime.utcnow().isoformat()
        return record

案例2:代码助手系统

```python

# cases/code_assistant.py

import ast
import asyncio
import os
import re
import subprocess
import sys
import tempfile
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

class CodeLanguage(Enum):
    """Programming languages known to the code assistant."""

    PYTHON = "python"
    JAVASCRIPT = "javascript"
    TYPESCRIPT = "typescript"
    JAVA = "java"
    GO = "go"
    RUST = "rust"
    CPP = "cpp"

class AnalysisType(Enum):
    """Kinds of analysis that can be requested for a piece of code."""

    SYNTAX = "syntax"
    STYLE = "style"
    SECURITY = "security"
    PERFORMANCE = "performance"
    COMPLEXITY = "complexity"

@dataclass
class CodeAnalysisResult:
    """Result of one analysis pass over a piece of code."""

    language: CodeLanguage
    analysis_type: AnalysisType
    issues: List[Dict[str, Any]]
    metrics: Dict[str, Any]
    suggestions: List[str]
    score: float  # 0-100 points

class CodeAssistantMCPServer:
    """MCP server exposing code-assistant tools.

    The tools cover analysis, formatting, improvement suggestions, execution,
    test generation and explanation. Language-specific work is delegated to
    the analyzers registered in ``self.analyzers``.
    """

    def __init__(self):
        self.analyzers = {
            CodeLanguage.PYTHON: PythonCodeAnalyzer(),
            CodeLanguage.JAVASCRIPT: JavaScriptCodeAnalyzer(),
            # Analyzers for more languages can be added here.
        }
        self.code_cache = {}

    # ------------------------------------------------------------------
    # MCP tool definitions
    # ------------------------------------------------------------------

    async def analyze_code(self, code: str, language: str, analysis_types: Optional[List[str]] = None) -> Dict[str, Any]:
        """Tool: analyse ``code`` and score it.

        Args:
            code: Source text to analyse.
            language: Language name (case-insensitive ``CodeLanguage`` value).
            analysis_types: ``AnalysisType`` values to run; defaults to
                ``["syntax", "style", "security"]`` when empty or ``None``.

        Returns:
            A dict with the language, the overall score, the per-type results
            and a summary; or an ``{"error": ...}`` dict when the language is
            unknown or has no analyzer. An unrecognised analysis type gets an
            ``{"error": ...}`` entry in the per-type results.
        """
        try:
            lang_enum = CodeLanguage(language.lower())
        except ValueError:
            return {"error": f"Unsupported language: {language}"}

        if lang_enum not in self.analyzers:
            return {"error": f"No analyzer available for {language}"}

        analyzer = self.analyzers[lang_enum]

        # Default analysis types.
        if not analysis_types:
            analysis_types = ["syntax", "style", "security"]

        results = {}
        score_total = 0
        scored_count = 0

        for analysis_type in analysis_types:
            try:
                analysis_enum = AnalysisType(analysis_type.lower())
                result = await analyzer.analyze(code, analysis_enum)
            except ValueError:
                results[analysis_type] = {"error": f"Unknown analysis type: {analysis_type}"}
                continue

            results[analysis_type] = {
                "issues": result.issues,
                "metrics": result.metrics,
                "suggestions": result.suggestions,
                "score": result.score
            }
            score_total += result.score
            scored_count += 1

        # Average only over analyses that actually produced a score, so an
        # unrecognised analysis type does not drag the overall score down.
        overall_score = score_total / scored_count if scored_count else 0

        return {
            "language": language,
            "overall_score": overall_score,
            "analysis_results": results,
            "summary": self._generate_summary(results, overall_score)
        }

    async def format_code(self, code: str, language: str, style_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Tool: format ``code`` with the analyzer registered for ``language``.

        Returns the original code, the formatted code and whether they differ,
        or an ``{"error": ...}`` dict for an unknown or unsupported language.
        """
        try:
            lang_enum = CodeLanguage(language.lower())
        except ValueError:
            return {"error": f"Unsupported language: {language}"}

        if lang_enum not in self.analyzers:
            return {"error": f"No formatter available for {language}"}

        analyzer = self.analyzers[lang_enum]
        formatted_code = await analyzer.format_code(code, style_config or {})

        return {
            "original_code": code,
            "formatted_code": formatted_code,
            "changes_made": code != formatted_code
        }

    async def suggest_improvements(self, code: str, language: str, context: Optional[str] = None) -> Dict[str, Any]:
        """Tool: collect improvement suggestions for ``code``.

        Runs the style, performance and security analyses, gathers their
        suggestions, and adds context-based suggestions when ``context`` is
        given. An error from the analysis step is returned unchanged.
        """
        # Analyse the code first.
        analysis_result = await self.analyze_code(code, language, ["style", "performance", "security"])

        if "error" in analysis_result:
            return analysis_result

        # Gather the suggestions of every analysis that produced some.
        suggestions = []
        for result in analysis_result["analysis_results"].values():
            if "suggestions" in result:
                suggestions.extend(result["suggestions"])

        # Extra suggestions derived from the caller-supplied context.
        if context:
            context_suggestions = await self._generate_context_suggestions(code, language, context)
            suggestions.extend(context_suggestions)

        return {
            "suggestions": suggestions,
            "priority_suggestions": self._prioritize_suggestions(suggestions),
            "estimated_impact": self._estimate_impact(suggestions)
        }

    async def execute_code(self, code: str, language: str, inputs: Optional[List[str]] = None) -> Dict[str, Any]:
        """Tool: run ``code`` after a pattern-based security screen.

        Returns the execution result, or an ``{"error": ...}`` dict when the
        language is unknown or the screen reports issues.
        """
        try:
            lang_enum = CodeLanguage(language.lower())
        except ValueError:
            return {"error": f"Unsupported language: {language}"}

        # Security screen.
        security_issues = await self._check_security(code, lang_enum)
        if security_issues:
            return {
                "error": "Security issues detected",
                "issues": security_issues
            }

        # Run the code.
        result = await self._execute_in_sandbox(code, lang_enum, inputs or [])

        return result

    async def generate_tests(self, code: str, language: str, test_framework: Optional[str] = None) -> Dict[str, Any]:
        """Tool: generate tests for ``code``.

        Falls back to the analyzer's ``default_test_framework`` when
        ``test_framework`` is not given.
        """
        try:
            lang_enum = CodeLanguage(language.lower())
        except ValueError:
            return {"error": f"Unsupported language: {language}"}

        if lang_enum not in self.analyzers:
            return {"error": f"No test generator available for {language}"}

        analyzer = self.analyzers[lang_enum]
        tests = await analyzer.generate_tests(code, test_framework)

        return {
            "test_code": tests,
            "test_framework": test_framework or analyzer.default_test_framework,
            "coverage_estimate": await self._estimate_coverage(code, tests, lang_enum)
        }

    async def explain_code(self, code: str, language: str, detail_level: str = "medium") -> Dict[str, Any]:
        """Tool: explain ``code`` using the analyzer registered for ``language``."""
        try:
            lang_enum = CodeLanguage(language.lower())
        except ValueError:
            return {"error": f"Unsupported language: {language}"}

        if lang_enum not in self.analyzers:
            return {"error": f"No explainer available for {language}"}

        analyzer = self.analyzers[lang_enum]
        explanation = await analyzer.explain_code(code, detail_level)

        return explanation

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _generate_summary(self, results: Dict[str, Any], overall_score: float) -> str:
        """Build the one-line summary: quality label, score and issue count."""
        if overall_score >= 80:
            quality = "excellent"
        elif overall_score >= 60:
            quality = "good"
        elif overall_score >= 40:
            quality = "fair"
        else:
            quality = "poor"

        total_issues = sum(
            len(result.get("issues", []))
            for result in results.values()
            if isinstance(result, dict) and "issues" in result
        )

        return f"Code quality: {quality} (score: {overall_score:.1f}/100). Found {total_issues} issues."

    async def _generate_context_suggestions(self, code: str, language: str, context: str) -> List[str]:
        """Return canned suggestions for keywords found in ``context``.

        Recognised keywords (case-insensitive substring match): ``web``,
        ``api`` and ``database``.
        """
        suggestions = []
        lowered = context.lower()

        if "web" in lowered:
            suggestions.append("Consider adding input validation for web security")
            suggestions.append("Implement proper error handling for HTTP requests")

        if "api" in lowered:
            suggestions.append("Add rate limiting to prevent abuse")
            suggestions.append("Implement proper authentication and authorization")

        if "database" in lowered:
            suggestions.append("Use parameterized queries to prevent SQL injection")
            suggestions.append("Implement connection pooling for better performance")

        return suggestions

    def _prioritize_suggestions(self, suggestions: List[str]) -> List[str]:
        """Order suggestions: security first, then performance, then the rest."""
        security_keywords = ["security", "injection", "validation", "authentication"]
        performance_keywords = ["performance", "optimization", "caching", "pooling"]

        prioritized = []

        # Security suggestions have the highest priority.
        for suggestion in suggestions:
            if any(keyword in suggestion.lower() for keyword in security_keywords):
                prioritized.append(suggestion)

        # Performance suggestions that were not already picked up.
        for suggestion in suggestions:
            if (any(keyword in suggestion.lower() for keyword in performance_keywords) and
                    suggestion not in prioritized):
                prioritized.append(suggestion)

        # Everything else.
        for suggestion in suggestions:
            if suggestion not in prioritized:
                prioritized.append(suggestion)

        return prioritized

    def _estimate_impact(self, suggestions: List[str]) -> Dict[str, int]:
        """Count suggestions per impact area using keyword matching."""
        return {
            "security_impact": len([s for s in suggestions if "security" in s.lower()]),
            "performance_impact": len([s for s in suggestions if "performance" in s.lower()]),
            "maintainability_impact": len([s for s in suggestions if any(word in s.lower() for word in ["readable", "maintainable", "clean"])])
        }

    async def _check_security(self, code: str, language: CodeLanguage) -> List[str]:
        """Return a message for every dangerous-looking pattern found in ``code``.

        NOTE(review): this is a regex screen over the source text only; it is
        a best-effort filter, not a security boundary.
        """
        issues = []

        # Dangerous function calls, per language.
        dangerous_patterns = {
            CodeLanguage.PYTHON: [
                r"eval\(",
                r"exec\(",
                r"__import__\(",
                r"open\([^)]*['\"]w",  # file writes
                r"subprocess\.",
                r"os\.system\("
            ],
            CodeLanguage.JAVASCRIPT: [
                r"eval\(",
                r"Function\(",
                r"document\.write\(",
                r"innerHTML\s*="
            ]
        }

        if language in dangerous_patterns:
            for pattern in dangerous_patterns[language]:
                if re.search(pattern, code):
                    issues.append(f"Potentially dangerous pattern detected: {pattern}")

        return issues

    async def _execute_in_sandbox(self, code: str, language: CodeLanguage, inputs: List[str]) -> Dict[str, Any]:
        """Dispatch execution by language; only Python is implemented."""
        if language == CodeLanguage.PYTHON:
            return await self._execute_python_sandbox(code, inputs)
        else:
            return {"error": f"Sandbox execution not implemented for {language.value}"}

    async def _execute_python_sandbox(self, code: str, inputs: List[str]) -> Dict[str, Any]:
        """Run Python ``code`` in a child interpreter with a 10-second timeout.

        The code is written to a temporary ``.py`` file, executed from the
        system temp directory with ``inputs`` joined by newlines on stdin, and
        the temporary file is removed afterwards on every path.

        NOTE(review): the child is an ordinary process with the server's
        privileges; apart from the timeout no resource or filesystem
        isolation is applied here.

        Returns:
            ``stdout``, ``stderr`` and ``return_code`` of the child, or an
            ``{"error": ...}`` dict on timeout or failure.
        """
        temp_file = None
        try:
            # Write the script as UTF-8, the encoding Python assumes for
            # source files, instead of the locale's default encoding.
            with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f:
                temp_file = f.name
                f.write(code)

            # Prepare stdin.
            input_str = '\n'.join(inputs) if inputs else ''

            def run_script():
                # Use the interpreter running this server rather than whatever
                # ``python`` happens to be on PATH.
                return subprocess.run(
                    [sys.executable, temp_file],
                    input=input_str,
                    capture_output=True,
                    text=True,
                    timeout=10,  # 10-second timeout
                    cwd=tempfile.gettempdir()  # run inside the temp directory
                )

            # subprocess.run blocks; run it in the default executor so the
            # event loop stays responsive while the script executes.
            result = await asyncio.get_running_loop().run_in_executor(None, run_script)

            return {
                "stdout": result.stdout,
                "stderr": result.stderr,
                "return_code": result.returncode,
                "execution_time": "< 10s"
            }

        except subprocess.TimeoutExpired:
            return {"error": "Code execution timed out"}
        except Exception as e:
            return {"error": f"Execution failed: {str(e)}"}
        finally:
            # Always remove the script, including after a timeout or failure.
            if temp_file is not None:
                try:
                    os.unlink(temp_file)
                except OSError:
                    # Best-effort cleanup: never mask the execution result.
                    pass

    async def _estimate_coverage(self, code: str, tests: str, language: CodeLanguage) -> Dict[str, Any]:
        """Return a placeholder coverage estimate.

        The line and branch figures are fixed; only ``test_count`` is derived
        from ``tests`` (number of ``def test_`` occurrences). A real
        implementation could integrate a tool such as coverage.py.
        """
        return {
            "estimated_line_coverage": 85,
            "estimated_branch_coverage": 70,
            "test_count": len(re.findall(r'def test_', tests))
        }

class CodeAnalyzer(ABC):
    """Abstract base class for language-specific code analyzers."""

    @abstractmethod
    async def analyze(self, code: str, analysis_type: AnalysisType) -> CodeAnalysisResult:
        """Run the requested kind of analysis on ``code``."""
        pass

    @abstractmethod
    async def format_code(self, code: str, style_config: Dict[str, Any]) -> str:
        """Return ``code`` formatted according to ``style_config``."""
        pass

    @abstractmethod
    async def generate_tests(self, code: str, test_framework: str = None) -> str:
        """Return test code for ``code``."""
        pass

    @abstractmethod
    async def explain_code(self, code: str, detail_level: str) -> Dict[str, Any]:
        """Return an explanation of ``code`` at the given detail level."""
        pass

    @property
    @abstractmethod
    def default_test_framework(self) -> str:
        """Name of the test framework used when none is requested."""
        pass

class PythonCodeAnalyzer(CodeAnalyzer):
    """Python code analyzer."""

@property
def default_test_framework(self) -> str:
    """Test framework used for Python when the caller does not name one."""
    framework_name = "pytest"
    return framework_name

async def analyze(self, code: str, analysis_type: AnalysisType) -> CodeAnalysisResult:
    """Analyse Python ``code`` with the handler for ``analysis_type``.

    Raises:
        ValueError: If ``analysis_type`` matches none of the known types.
    """
    # Handlers are stored by name and resolved only for the matching type.
    dispatch = (
        (AnalysisType.SYNTAX, "_analyze_syntax"),
        (AnalysisType.STYLE, "_analyze_style"),
        (AnalysisType.SECURITY, "_analyze_security"),
        (AnalysisType.PERFORMANCE, "_analyze_performance"),
        (AnalysisType.COMPLEXITY, "_analyze_complexity"),
    )
    for known_type, handler_name in dispatch:
        if analysis_type == known_type:
            return await getattr(self, handler_name)(code)

    raise ValueError(f"Unknown analysis type: {analysis_type}")

async def _analyze_syntax(self, code: str) -> CodeAnalysisResult:
    """Check whether ``code`` parses as Python.

    The score is 100 when ``ast.parse`` succeeds and 0 when it raises
    ``SyntaxError``; in the latter case a single issue describing the error
    is reported.
    """
    try:
        ast.parse(code)
    except SyntaxError as e:
        found = [{
            "type": "syntax_error",
            "line": e.lineno,
            "message": str(e),
            "severity": "error"
        }]
    else:
        found = []

    return CodeAnalysisResult(
        language=CodeLanguage.PYTHON,
        analysis_type=AnalysisType.SYNTAX,
        issues=found,
        metrics={"syntax_errors": len(found)},
        suggestions=["Fix syntax errors before proceeding"] if found else [],
        score=0 if found else 100
    )

async def _analyze_style(self, code: str) -> CodeAnalysisResult:
    """代码风格分析"""
    issues = []
    suggestions = []

    lines = code.split('\n')

    # 检查行长度
    for i, line in enumerate(lines, 1):
        if len(line) > 88:  # PEP 8建议
            issues.append({
                "type": "line_too_long",
                "line": i,
                "message": f"Line too long ({len(line)} > 88 characters)",
                "severity": "warning"
            })