8.1 课程回顾

学习路径总结

通过前面七章的学习,我们完整地掌握了 AI Script 编程语言的核心知识和实践技能:

第1章:AI Script基础概念

  • 核心收获:理解了 AI Script 的设计理念和核心特性
  • 关键技能:掌握了开发环境搭建和基本配置
  • 实践能力:能够编写第一个 AI Script 程序

第2章:语法基础与数据类型

  • 核心收获:掌握了 AI Script 的语法规则和类型系统
  • 关键技能:熟练使用各种数据类型和运算符
  • 实践能力:能够进行基本的数据处理和类型转换

第3章:控制结构与函数

  • 核心收获:理解了程序控制流和函数式编程特性
  • 关键技能:掌握了条件判断、循环、函数定义和异常处理
  • 实践能力:能够编写结构化的程序逻辑

第4章:AI集成与机器学习

  • 核心收获:学会了 AI 模型的集成和使用
  • 关键技能:掌握了数据预处理、模型训练和推理
  • 实践能力:能够构建完整的机器学习流水线

第5章:自动化脚本开发

  • 核心收获:掌握了系统自动化和任务调度
  • 关键技能:学会了文件操作、网络请求和数据库管理
  • 实践能力:能够开发复杂的自动化工作流

第6章:高级特性与扩展开发

  • 核心收获:理解了元编程、插件系统和性能优化
  • 关键技能:掌握了并发编程和异步处理
  • 实践能力:能够开发高性能的扩展应用

第7章:部署与运维

  • 核心收获:学会了生产环境部署和运维管理
  • 关键技能:掌握了容器化、监控和故障排除
  • 实践能力:能够构建稳定可靠的生产系统

技能矩阵评估

技能领域 基础水平 中级水平 高级水平 专家水平
语法基础 🎯
AI集成 🎯
自动化开发 🎯
性能优化 🎯
部署运维 🎯

图例说明:
  • ✅ 已掌握
  • 🎯 目标水平
  • ⭐ 需要持续实践

8.2 实战项目案例

案例1:智能数据分析平台

# 智能数据分析平台
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import asyncio
import aiohttp
import json

@dataclass
class AnalysisResult {
    model_name: str
    accuracy: float
    precision: float
    recall: float
    f1_score: float
    feature_importance: Dict[str, float]
    predictions: List[Any]
    timestamp: datetime
}

class IntelligentDataAnalyzer {
    func __init__(config: Dict[str, Any]) {
        self.config = config
        self.models = {}
        self.data_cache = {}
        self.analysis_history = []
        self.logger = self._setup_logger()
    }
    
    func _setup_logger() {
        import logging
        logger = logging.getLogger("DataAnalyzer")
        logger.setLevel(logging.INFO)
        
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        
        return logger
    }
    
    async func load_data_async(source: str, source_type: str = "csv") -> pd.DataFrame {
        """异步加载数据"""
        self.logger.info(f"开始加载数据: {source}")
        
        if source_type == "csv" {
            # 模拟异步文件读取
            await asyncio.sleep(0.1)
            data = pd.read_csv(source)
        } elif source_type == "api" {
            # 异步API调用
            async with aiohttp.ClientSession() as session {
                async with session.get(source) as response {
                    json_data = await response.json()
                    data = pd.DataFrame(json_data)
                }
            }
        } elif source_type == "database" {
            # 模拟异步数据库查询
            await asyncio.sleep(0.2)
            import sqlite3
            conn = sqlite3.connect(source)
            data = pd.read_sql_query("SELECT * FROM data", conn)
            conn.close()
        } else {
            raise ValueError(f"不支持的数据源类型: {source_type}")
        }
        
        # 缓存数据
        cache_key = f"{source}_{source_type}"
        self.data_cache[cache_key] = {
            'data': data,
            'timestamp': datetime.now(),
            'source': source,
            'type': source_type
        }
        
        self.logger.info(f"数据加载完成,形状: {data.shape}")
        return data
    }
    
    func preprocess_data(
        data: pd.DataFrame,
        target_column: str,
        feature_columns: Optional[List[str]] = null,
        handle_missing: str = "drop",
        normalize: bool = true
    ) -> tuple[pd.DataFrame, pd.Series] {
        """数据预处理"""
        self.logger.info("开始数据预处理")
        
        # 选择特征列
        if feature_columns is null {
            feature_columns = [col for col in data.columns if col != target_column]
        }
        
        # 提取特征和目标
        X = data[feature_columns].copy()
        y = data[target_column].copy()
        
        # 处理缺失值
        if handle_missing == "drop" {
            # 删除包含缺失值的行
            mask = ~(X.isnull().any(axis=1) | y.isnull())
            X = X[mask]
            y = y[mask]
        } elif handle_missing == "fill" {
            # 填充缺失值
            for col in X.columns {
                if X[col].dtype in ['int64', 'float64'] {
                    X[col].fillna(X[col].mean(), inplace=true)
                } else {
                    X[col].fillna(X[col].mode()[0], inplace=true)
                }
            }
            
            if y.dtype in ['int64', 'float64'] {
                y.fillna(y.mean(), inplace=true)
            } else {
                y.fillna(y.mode()[0], inplace=true)
            }
        }
        
        # 编码分类变量
        for col in X.columns {
            if X[col].dtype == 'object' {
                from sklearn.preprocessing import LabelEncoder
                le = LabelEncoder()
                X[col] = le.fit_transform(X[col].astype(str))
            }
        }
        
        # 标准化数值特征
        if normalize {
            from sklearn.preprocessing import StandardScaler
            scaler = StandardScaler()
            numeric_columns = X.select_dtypes(include=[np.number]).columns
            X[numeric_columns] = scaler.fit_transform(X[numeric_columns])
        }
        
        self.logger.info(f"预处理完成,特征数量: {X.shape[1]},样本数量: {X.shape[0]}")
        return X, y
    }
    
    func train_model(
        X: pd.DataFrame,
        y: pd.Series,
        model_type: str = "random_forest",
        test_size: float = 0.2,
        random_state: int = 42
    ) -> AnalysisResult {
        """训练模型"""
        self.logger.info(f"开始训练模型: {model_type}")
        
        # 分割数据
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )
        
        # 选择模型
        if model_type == "random_forest" {
            from sklearn.ensemble import RandomForestClassifier
            model = RandomForestClassifier(
                n_estimators=100,
                random_state=random_state,
                n_jobs=-1
            )
        } elif model_type == "gradient_boosting" {
            from sklearn.ensemble import GradientBoostingClassifier
            model = GradientBoostingClassifier(
                n_estimators=100,
                random_state=random_state
            )
        } elif model_type == "svm" {
            from sklearn.svm import SVC
            model = SVC(random_state=random_state, probability=true)
        } elif model_type == "neural_network" {
            from sklearn.neural_network import MLPClassifier
            model = MLPClassifier(
                hidden_layer_sizes=(100, 50),
                random_state=random_state,
                max_iter=1000
            )
        } else {
            raise ValueError(f"不支持的模型类型: {model_type}")
        }
        
        # 训练模型
        model.fit(X_train, y_train)
        
        # 预测
        y_pred = model.predict(X_test)
        
        # 计算指标
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
        
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred, average='weighted')
        recall = recall_score(y_test, y_pred, average='weighted')
        f1 = f1_score(y_test, y_pred, average='weighted')
        
        # 特征重要性
        feature_importance = {}
        if hasattr(model, 'feature_importances_') {
            for i, importance in enumerate(model.feature_importances_) {
                feature_importance[X.columns[i]] = float(importance)
            }
        }
        
        # 保存模型
        self.models[model_type] = {
            'model': model,
            'feature_columns': list(X.columns),
            'trained_at': datetime.now()
        }
        
        # 创建结果
        result = AnalysisResult(
            model_name=model_type,
            accuracy=accuracy,
            precision=precision,
            recall=recall,
            f1_score=f1,
            feature_importance=feature_importance,
            predictions=y_pred.tolist(),
            timestamp=datetime.now()
        )
        
        self.analysis_history.append(result)
        
        self.logger.info(f"模型训练完成,准确率: {accuracy:.4f}")
        return result
    }
    
    func generate_insights(result: AnalysisResult) -> Dict[str, Any] {
        """生成分析洞察"""
        insights = {
            'model_performance': {
                'overall_rating': self._rate_performance(result.accuracy),
                'strengths': [],
                'weaknesses': [],
                'recommendations': []
            },
            'feature_analysis': {
                'top_features': [],
                'feature_insights': []
            },
            'business_impact': {
                'potential_value': self._estimate_business_value(result),
                'implementation_complexity': self._assess_complexity(result),
                'risk_factors': []
            }
        }
        
        # 性能分析
        if result.accuracy > 0.9 {
            insights['model_performance']['strengths'].append("模型准确率优秀")
        } elif result.accuracy > 0.8 {
            insights['model_performance']['strengths'].append("模型准确率良好")
        } else {
            insights['model_performance']['weaknesses'].append("模型准确率需要改进")
            insights['model_performance']['recommendations'].append("考虑特征工程或模型调优")
        }
        
        if result.precision > 0.85 {
            insights['model_performance']['strengths'].append("精确率表现良好")
        } else {
            insights['model_performance']['weaknesses'].append("存在较多误报")
            insights['model_performance']['recommendations'].append("调整分类阈值或平衡数据集")
        }
        
        if result.recall > 0.85 {
            insights['model_performance']['strengths'].append("召回率表现良好")
        } else {
            insights['model_performance']['weaknesses'].append("存在较多漏报")
            insights['model_performance']['recommendations'].append("增加训练数据或调整模型参数")
        }
        
        # 特征分析
        if result.feature_importance {
            sorted_features = sorted(
                result.feature_importance.items(),
                key=lambda x: x[1],
                reverse=true
            )
            
            insights['feature_analysis']['top_features'] = sorted_features[:5]
            
            # 特征洞察
            top_feature = sorted_features[0]
            insights['feature_analysis']['feature_insights'].append(
                f"最重要的特征是 '{top_feature[0]}',重要性为 {top_feature[1]:.3f}"
            )
            
            if len(sorted_features) > 1 {
                second_feature = sorted_features[1]
                ratio = top_feature[1] / second_feature[1]
                if ratio > 2 {
                    insights['feature_analysis']['feature_insights'].append(
                        f"特征 '{top_feature[0]}' 的重要性显著高于其他特征"
                    )
                }
            }
        }
        
        return insights
    }
    
    func _rate_performance(accuracy: float) -> str {
        """评估性能等级"""
        if accuracy >= 0.95 {
            return "优秀"
        } elif accuracy >= 0.9 {
            return "良好"
        } elif accuracy >= 0.8 {
            return "一般"
        } elif accuracy >= 0.7 {
            return "较差"
        } else {
            return "很差"
        }
    }
    
    func _estimate_business_value(result: AnalysisResult) -> str {
        """估算商业价值"""
        if result.accuracy > 0.9 and result.f1_score > 0.9 {
            return "高价值 - 可直接用于生产环境"
        } elif result.accuracy > 0.8 {
            return "中等价值 - 需要进一步优化"
        } else {
            return "低价值 - 需要重新设计"
        }
    }
    
    func _assess_complexity(result: AnalysisResult) -> str {
        """评估实施复杂度"""
        feature_count = len(result.feature_importance)
        
        if feature_count <= 5 {
            return "低复杂度 - 易于实施和维护"
        } elif feature_count <= 15 {
            return "中等复杂度 - 需要适当的数据管道"
        } else {
            return "高复杂度 - 需要完善的特征工程流程"
        }
    }
    
    func create_visualization(result: AnalysisResult, output_dir: str = "./plots") {
        """创建可视化图表"""
        import os
        os.makedirs(output_dir, exist_ok=true)
        
        # 特征重要性图
        if result.feature_importance {
            plt.figure(figsize=(10, 6))
            
            # 按重要性降序排列,取前10个特征
            sorted_items = sorted(result.feature_importance.items(), key=lambda x: x[1], reverse=true)[:10]
            features = [name for name, _ in sorted_items]
            importances = [value for _, value in sorted_items]
            
            plt.barh(features, importances)
            plt.title(f'{result.model_name} - 特征重要性')
            plt.xlabel('重要性')
            plt.tight_layout()
            plt.savefig(f'{output_dir}/feature_importance_{result.model_name}.png')
            plt.close()
        }
        
        # 性能指标雷达图
        plt.figure(figsize=(8, 8))
        
        metrics = ['准确率', '精确率', '召回率', 'F1分数']
        values = [result.accuracy, result.precision, result.recall, result.f1_score]
        
        angles = np.linspace(0, 2 * np.pi, len(metrics), endpoint=false).tolist()
        values += values[:1]  # 闭合图形
        angles += angles[:1]
        
        ax = plt.subplot(111, projection='polar')
        ax.plot(angles, values, 'o-', linewidth=2)
        ax.fill(angles, values, alpha=0.25)
        ax.set_xticks(angles[:-1])
        ax.set_xticklabels(metrics)
        ax.set_ylim(0, 1)
        ax.set_title(f'{result.model_name} - 性能指标', pad=20)
        
        plt.savefig(f'{output_dir}/performance_radar_{result.model_name}.png')
        plt.close()
        
        self.logger.info(f"可视化图表已保存到 {output_dir}")
    }
    
    func generate_report(result: AnalysisResult, insights: Dict[str, Any]) -> str {
        """生成分析报告"""
        report = f"""
# 智能数据分析报告

## 模型信息
- **模型类型**: {result.model_name}
- **分析时间**: {result.timestamp.strftime('%Y-%m-%d %H:%M:%S')}

## 性能指标
- **准确率**: {result.accuracy:.4f}
- **精确率**: {result.precision:.4f}
- **召回率**: {result.recall:.4f}
- **F1分数**: {result.f1_score:.4f}
- **整体评级**: {insights['model_performance']['overall_rating']}

## 特征分析
### 重要特征排序
"""
        
        if insights['feature_analysis']['top_features'] {
            for i, (feature, importance) in enumerate(insights['feature_analysis']['top_features'], 1) {
                report += f"{i}. {feature}: {importance:.4f}\n"
            }
        }
        
        report += f"""

### 特征洞察
"""
        for insight in insights['feature_analysis']['feature_insights'] {
            report += f"- {insight}\n"
        }
        
        report += f"""

## 模型优势
"""
        for strength in insights['model_performance']['strengths'] {
            report += f"- {strength}\n"
        }
        
        if insights['model_performance']['weaknesses'] {
            report += f"""

## 改进建议
"""
            for weakness in insights['model_performance']['weaknesses'] {
                report += f"- {weakness}\n"
            }
            
            for recommendation in insights['model_performance']['recommendations'] {
                report += f"- {recommendation}\n"
            }
        }
        
        report += f"""

## 商业价值评估
- **潜在价值**: {insights['business_impact']['potential_value']}
- **实施复杂度**: {insights['business_impact']['implementation_complexity']}

## 结论
基于当前分析结果,该模型{insights['model_performance']['overall_rating']},
建议根据上述改进建议进行优化后投入使用。
"""
        
        return report
    }
    
    async func run_complete_analysis(
        data_source: str,
        target_column: str,
        source_type: str = "csv",
        model_types: List[str] = ["random_forest"],
        output_dir: str = "./analysis_output"
    ) -> Dict[str, Any] {
        """运行完整的分析流程"""
        import os
        os.makedirs(output_dir, exist_ok=true)
        
        results = {}
        
        try {
            # 1. 加载数据
            data = await self.load_data_async(data_source, source_type)
            
            # 2. 数据预处理
            X, y = self.preprocess_data(data, target_column)
            
            # 3. 训练多个模型
            for model_type in model_types {
                self.logger.info(f"开始分析模型: {model_type}")
                
                # 训练模型
                result = self.train_model(X, y, model_type)
                
                # 生成洞察
                insights = self.generate_insights(result)
                
                # 创建可视化
                self.create_visualization(result, f"{output_dir}/plots")
                
                # 生成报告
                report = self.generate_report(result, insights)
                
                # 保存报告
                report_path = f"{output_dir}/report_{model_type}.md"
                with open(report_path, 'w', encoding='utf-8') as f {
                    f.write(report)
                }
                
                results[model_type] = {
                    'result': result,
                    'insights': insights,
                    'report_path': report_path
                }
                
                self.logger.info(f"模型 {model_type} 分析完成")
            }
            
            # 4. 比较模型
            if len(model_types) > 1 {
                best_model = max(
                    results.keys(),
                    key=lambda k: results[k]['result'].f1_score
                )
                
                self.logger.info(f"最佳模型: {best_model}")
                results['best_model'] = best_model
            }
            
            return results
            
        } catch Exception as e {
            self.logger.error(f"分析过程中出错: {e}")
            raise e
        }
    }
}

# 使用示例
if __name__ == "__main__" {
    import asyncio
    
    async func main() {
        # 配置
        config = {
            'log_level': 'INFO',
            'cache_size': 1000,
            'model_timeout': 300
        }
        
        # 创建分析器
        analyzer = IntelligentDataAnalyzer(config)
        
        # 运行完整分析
        results = await analyzer.run_complete_analysis(
            data_source="data/customer_data.csv",
            target_column="churn",
            source_type="csv",
            model_types=["random_forest", "gradient_boosting", "neural_network"],
            output_dir="./customer_analysis"
        )
        
        # 输出结果摘要
        print("\n=== 分析结果摘要 ===")
        for model_type, result_data in results.items() {
            if model_type != 'best_model' {
                result = result_data['result']
                print(f"\n{model_type}:")
                print(f"  准确率: {result.accuracy:.4f}")
                print(f"  F1分数: {result.f1_score:.4f}")
            }
        }
        
        if 'best_model' in results {
            print(f"\n最佳模型: {results['best_model']}")
        }
    }
    
    # 运行分析
    asyncio.run(main())
}
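
除了调用 run_complete_analysis 一次性跑完整流程,也可以按步骤单独使用分析器的各个方法。下面是一个示意性的分步调用片段(其中 quick_demo 为演示而临时命名的函数,数据路径沿用上文示例,仅供参考):

# 分步调用示例(示意):手动组合加载、预处理、训练与报告
async func quick_demo() {
    analyzer = IntelligentDataAnalyzer({'log_level': 'INFO'})
    
    # 1. 加载数据并预处理(目标列沿用上文的 churn)
    data = await analyzer.load_data_async("data/customer_data.csv", "csv")
    X, y = analyzer.preprocess_data(data, target_column="churn")
    
    # 2. 训练单个模型,生成洞察与报告
    result = analyzer.train_model(X, y, model_type="random_forest")
    insights = analyzer.generate_insights(result)
    print(analyzer.generate_report(result, insights))
}

asyncio.run(quick_demo())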

案例2:智能运维监控系统

# 智能运维监控系统
import asyncio
import aiohttp
import psutil
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import logging
import sqlite3
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import threading
import queue
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

class AlertLevel(Enum) {
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
}

class SystemStatus(Enum) {
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    CRITICAL = "critical"
    UNKNOWN = "unknown"
}

@dataclass
class MetricData {
    name: str
    value: float
    unit: str
    timestamp: datetime
    tags: Dict[str, str] = null
    metadata: Dict[str, Any] = null
}

@dataclass
class Alert {
    id: str
    level: AlertLevel
    title: str
    message: str
    source: str
    timestamp: datetime
    resolved: bool = false
    resolved_at: Optional[datetime] = null
    metadata: Dict[str, Any] = null
}

@dataclass
class SystemHealth {
    overall_status: SystemStatus
    cpu_usage: float
    memory_usage: float
    disk_usage: float
    network_latency: float
    active_alerts: int
    timestamp: datetime
    details: Dict[str, Any] = null
}

class IntelligentMonitoringSystem {
    func __init__(config: Dict[str, Any]) {
        self.config = config
        self.metrics_queue = queue.Queue(maxsize=10000)
        self.alerts_queue = queue.Queue(maxsize=1000)
        self.metric_collectors = {}
        self.alert_handlers = {}
        self.anomaly_detector = null
        self.running = false
        self.threads = []
        
        # 数据库连接
        self.db_path = config.get('db_path', 'monitoring.db')
        self.init_database()
        
        # 日志配置
        self.logger = self._setup_logger()
        
        # 初始化异常检测
        self._init_anomaly_detection()
    }
    
    func _setup_logger() -> logging.Logger {
        logger = logging.getLogger("MonitoringSystem")
        logger.setLevel(logging.INFO)
        
        # 文件处理器
        file_handler = logging.FileHandler('monitoring.log')
        file_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(file_formatter)
        logger.addHandler(file_handler)
        
        # 控制台处理器
        console_handler = logging.StreamHandler()
        console_formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        )
        console_handler.setFormatter(console_formatter)
        logger.addHandler(console_handler)
        
        return logger
    }
    
    func init_database() {
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 指标表
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                value REAL NOT NULL,
                unit TEXT,
                timestamp TEXT NOT NULL,
                tags TEXT,
                metadata TEXT
            )
        """)
        
        # 告警表
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS alerts (
                id TEXT PRIMARY KEY,
                level TEXT NOT NULL,
                title TEXT NOT NULL,
                message TEXT,
                source TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                resolved BOOLEAN DEFAULT FALSE,
                resolved_at TEXT,
                metadata TEXT
            )
        """)
        
        # 系统健康表
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS system_health (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                overall_status TEXT NOT NULL,
                cpu_usage REAL,
                memory_usage REAL,
                disk_usage REAL,
                network_latency REAL,
                active_alerts INTEGER,
                timestamp TEXT NOT NULL,
                details TEXT
            )
        """)
        
        conn.commit()
        conn.close()
    }
    
    func _init_anomaly_detection() {
        """初始化异常检测模型"""
        self.anomaly_detector = IsolationForest(
            contamination=0.1,  # 假设10%的数据是异常
            random_state=42
        )
        self.scaler = StandardScaler()
        self.anomaly_history = []
    }
    
    func register_metric_collector(name: str, collector: Callable) {
        """注册指标收集器"""
        self.metric_collectors[name] = collector
        self.logger.info(f"注册指标收集器: {name}")
    }
    
    func register_alert_handler(level: AlertLevel, handler: Callable) {
        """注册告警处理器"""
        self.alert_handlers[level] = handler
        self.logger.info(f"注册告警处理器: {level.value}")
    }
    
    func start_monitoring() {
        """启动监控系统"""
        if self.running {
            return
        }
        
        self.running = true
        
        # 启动指标收集线程
        metrics_thread = threading.Thread(target=self._metrics_collection_loop)
        metrics_thread.daemon = true
        metrics_thread.start()
        self.threads.append(metrics_thread)
        
        # 启动指标处理线程
        processing_thread = threading.Thread(target=self._metrics_processing_loop)
        processing_thread.daemon = true
        processing_thread.start()
        self.threads.append(processing_thread)
        
        # 启动告警处理线程
        alerts_thread = threading.Thread(target=self._alerts_processing_loop)
        alerts_thread.daemon = true
        alerts_thread.start()
        self.threads.append(alerts_thread)
        
        # 启动健康检查线程
        health_thread = threading.Thread(target=self._health_check_loop)
        health_thread.daemon = true
        health_thread.start()
        self.threads.append(health_thread)
        
        self.logger.info("监控系统已启动")
    }
    
    func stop_monitoring() {
        """停止监控系统"""
        self.running = false
        
        # 等待所有线程结束
        for thread in self.threads {
            thread.join(timeout=5)
        }
        
        self.logger.info("监控系统已停止")
    }
    
    func _metrics_collection_loop() {
        """指标收集循环"""
        while self.running {
            try {
                # 收集所有注册的指标
                for name, collector in self.metric_collectors.items() {
                    try {
                        metrics = collector()
                        if isinstance(metrics, list) {
                            for metric in metrics {
                                self.metrics_queue.put(metric, timeout=1)
                            }
                        } else {
                            self.metrics_queue.put(metrics, timeout=1)
                        }
                    } catch Exception as e {
                        self.logger.error(f"收集指标 {name} 时出错: {e}")
                    }
                }
                
                # 等待下次收集
                time.sleep(self.config.get('collection_interval', 30))
                
            } catch Exception as e {
                self.logger.error(f"指标收集循环出错: {e}")
                time.sleep(10)
            }
        }
    }
    
    func _metrics_processing_loop() {
        """指标处理循环"""
        batch_size = self.config.get('batch_size', 100)
        batch = []
        
        while self.running {
            try {
                # 收集批量指标
                while len(batch) < batch_size and self.running {
                    try {
                        metric = self.metrics_queue.get(timeout=1)
                        batch.append(metric)
                    } catch queue.Empty {
                        break
                    }
                }
                
                if batch {
                    # 处理批量指标
                    self._process_metrics_batch(batch)
                    batch.clear()
                }
                
            } catch Exception as e {
                self.logger.error(f"指标处理循环出错: {e}")
                time.sleep(5)
            }
        }
    }
    
    func _process_metrics_batch(metrics: List[MetricData]) {
        """处理指标批次"""
        # 保存到数据库
        self._save_metrics_to_db(metrics)
        
        # 异常检测
        self._detect_anomalies(metrics)
        
        # 阈值检查
        self._check_thresholds(metrics)
    }
    
    func _save_metrics_to_db(metrics: List[MetricData]) {
        """保存指标到数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        for metric in metrics {
            cursor.execute("""
                INSERT INTO metrics (name, value, unit, timestamp, tags, metadata)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                metric.name,
                metric.value,
                metric.unit,
                metric.timestamp.isoformat(),
                json.dumps(metric.tags) if metric.tags else null,
                json.dumps(metric.metadata) if metric.metadata else null
            ))
        }
        
        conn.commit()
        conn.close()
    }
    
    func _detect_anomalies(metrics: List[MetricData]) {
        """异常检测"""
        # 准备数据
        grouped_metrics = {}
        for metric in metrics {
            if metric.name not in grouped_metrics {
                grouped_metrics[metric.name] = []
            }
            grouped_metrics[metric.name].append(metric)
        }
        
        # 对每个指标进行异常检测
        for metric_name, metric_list in grouped_metrics.items() {
            if len(metric_list) < 10 {  # 需要足够的数据点
                continue
            }
            
            try {
                # 标准化数据
                values = [m.value for m in metric_list]
                values_array = np.array(values).reshape(-1, 1)
                scaled_values = self.scaler.fit_transform(values_array)
                
                # 异常检测
                anomalies = self.anomaly_detector.fit_predict(scaled_values)
                
                # 处理异常
                for i, is_anomaly in enumerate(anomalies) {
                    if is_anomaly == -1 {  # 异常值
                        self._create_anomaly_alert(
                            metric_name,
                            metric_list[i].value,
                            metric_list[i].timestamp
                        )
                    }
                }
                
            } catch Exception as e {
                self.logger.error(f"异常检测失败 {metric_name}: {e}")
            }
        }
    }
    
    func _check_thresholds(metrics: List[MetricData]) {
        """检查阈值"""
        thresholds = self.config.get('thresholds', {})
        
        for metric in metrics {
            if metric.name in thresholds {
                threshold_config = thresholds[metric.name]
                
                # 优先检查临界阈值,未触发时再检查警告阈值
                critical_threshold = threshold_config.get('critical')
                warning_threshold = threshold_config.get('warning')
                
                if critical_threshold and self._check_threshold_condition(metric.value, critical_threshold) {
                    self._create_threshold_alert(
                        metric,
                        AlertLevel.CRITICAL,
                        critical_threshold
                    )
                } elif warning_threshold and self._check_threshold_condition(metric.value, warning_threshold) {
                    self._create_threshold_alert(
                        metric,
                        AlertLevel.WARNING,
                        warning_threshold
                    )
                }
            }
        }
    }
    
    func _check_threshold_condition(value: float, threshold: Dict[str, Any]) -> bool {
        """检查阈值条件"""
        operator = threshold.get('operator', '>')
        threshold_value = threshold.get('value', 0)
        
        if operator == '>' {
            return value > threshold_value
        } elif operator == '<' {
            return value < threshold_value
        } elif operator == '>=' {
            return value >= threshold_value
        } elif operator == '<=' {
            return value <= threshold_value
        } elif operator == '==' {
            return value == threshold_value
        } elif operator == '!=' {
            return value != threshold_value
        }
        
        return false
    }
    
    func _create_anomaly_alert(metric_name: str, value: float, timestamp: datetime) {
        """创建异常告警"""
        alert = Alert(
            id=f"anomaly_{metric_name}_{int(timestamp.timestamp())}",
            level=AlertLevel.WARNING,
            title=f"指标异常: {metric_name}",
            message=f"指标 {metric_name} 的值 {value} 被检测为异常",
            source="anomaly_detector",
            timestamp=timestamp,
            metadata={
                'metric_name': metric_name,
                'metric_value': value,
                'detection_method': 'isolation_forest'
            }
        )
        
        self.alerts_queue.put(alert)
    }
    
    func _create_threshold_alert(
        metric: MetricData,
        level: AlertLevel,
        threshold: Dict[str, Any]
    ) {
        """创建阈值告警"""
        alert = Alert(
            id=f"threshold_{metric.name}_{int(metric.timestamp.timestamp())}",
            level=level,
            title=f"阈值告警: {metric.name}",
            message=f"指标 {metric.name} 的值 {metric.value} {threshold['operator']} {threshold['value']}",
            source="threshold_monitor",
            timestamp=metric.timestamp,
            metadata={
                'metric_name': metric.name,
                'metric_value': metric.value,
                'threshold': threshold
            }
        )
        
        self.alerts_queue.put(alert)
    }
    
    func _alerts_processing_loop() {
        """告警处理循环"""
        while self.running {
            try {
                alert = self.alerts_queue.get(timeout=1)
                self._process_alert(alert)
            } catch queue.Empty {
                continue
            } catch Exception as e {
                self.logger.error(f"告警处理循环出错: {e}")
                time.sleep(5)
            }
        }
    }
    
    func _process_alert(alert: Alert) {
        """处理告警"""
        # 保存到数据库
        self._save_alert_to_db(alert)
        
        # 调用告警处理器
        if alert.level in self.alert_handlers {
            try {
                self.alert_handlers[alert.level](alert)
            } catch Exception as e {
                self.logger.error(f"告警处理器执行失败: {e}")
            }
        }
        
        self.logger.warning(f"告警: {alert.title} - {alert.message}")
    }
    
    func _save_alert_to_db(alert: Alert) {
        """保存告警到数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            INSERT OR REPLACE INTO alerts (
                id, level, title, message, source, timestamp, resolved, resolved_at, metadata
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            alert.id,
            alert.level.value,
            alert.title,
            alert.message,
            alert.source,
            alert.timestamp.isoformat(),
            alert.resolved,
            alert.resolved_at.isoformat() if alert.resolved_at else null,
            json.dumps(alert.metadata) if alert.metadata else null
        ))
        
        conn.commit()
        conn.close()
    }
    
    func _health_check_loop() {
        """健康检查循环"""
        while self.running {
            try {
                health = self._calculate_system_health()
                self._save_health_to_db(health)
                
                # 根据健康状态创建告警
                if health.overall_status == SystemStatus.CRITICAL {
                    self._create_health_alert(health, AlertLevel.CRITICAL)
                } elif health.overall_status == SystemStatus.DEGRADED {
                    self._create_health_alert(health, AlertLevel.WARNING)
                }
                
                time.sleep(self.config.get('health_check_interval', 60))
                
            } catch Exception as e {
                self.logger.error(f"健康检查循环出错: {e}")
                time.sleep(30)
            }
        }
    }
    
    func _calculate_system_health() -> SystemHealth {
        """计算系统健康状态"""
        # 获取系统指标
        cpu_usage = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        
        # 网络延迟(模拟)
        network_latency = self._measure_network_latency()
        
        # 活跃告警数量
        active_alerts = self._count_active_alerts()
        
        # 计算整体状态
        overall_status = self._determine_overall_status(
            cpu_usage, memory.percent, disk.percent, network_latency, active_alerts
        )
        
        return SystemHealth(
            overall_status=overall_status,
            cpu_usage=cpu_usage,
            memory_usage=memory.percent,
            disk_usage=disk.percent,
            network_latency=network_latency,
            active_alerts=active_alerts,
            timestamp=datetime.now(),
            details={
                'memory_available_gb': memory.available / (1024**3),
                'disk_free_gb': disk.free / (1024**3),
                'cpu_count': psutil.cpu_count()
            }
        )
    }
    
    func _measure_network_latency() -> float {
        """测量网络延迟"""
        import subprocess
        import platform
        
        try {
            # 根据操作系统选择ping命令
            if platform.system().lower() == "windows" {
                result = subprocess.run(
                    ['ping', '-n', '1', '8.8.8.8'],
                    capture_output=true,
                    text=true,
                    timeout=5
                )
            } else {
                result = subprocess.run(
                    ['ping', '-c', '1', '8.8.8.8'],
                    capture_output=true,
                    text=true,
                    timeout=5
                )
            }
            
            if result.returncode == 0 {
                # 解析延迟时间(简化实现)
                output = result.stdout
                if 'time=' in output {
                    import re
                    match = re.search(r'time[=<](\d+(?:\.\d+)?)\s*ms', output)
                    if match {
                        return float(match.group(1))
                    }
                }
            }
            
            return 999.0  # 超时或失败
            
        } catch Exception {
            return 999.0
        }
    }
    
    func _count_active_alerts() -> int {
        """统计活跃告警数量"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            SELECT COUNT(*) FROM alerts
            WHERE resolved = FALSE
            AND timestamp > datetime('now', '-1 hour')
        """)
        
        count = cursor.fetchone()[0]
        conn.close()
        
        return count
    }
    
    func _determine_overall_status(
        cpu_usage: float,
        memory_usage: float,
        disk_usage: float,
        network_latency: float,
        active_alerts: int
    ) -> SystemStatus {
        """确定整体状态"""
        critical_conditions = [
            cpu_usage > 90,
            memory_usage > 95,
            disk_usage > 95,
            network_latency > 1000,
            active_alerts > 10
        ]
        
        warning_conditions = [
            cpu_usage > 80,
            memory_usage > 85,
            disk_usage > 85,
            network_latency > 500,
            active_alerts > 5
        ]
        
        if any(critical_conditions) {
            return SystemStatus.CRITICAL
        } elif any(warning_conditions) {
            return SystemStatus.DEGRADED
        } else {
            return SystemStatus.HEALTHY
        }
    }
    
    func _save_health_to_db(health: SystemHealth) {
        """保存健康状态到数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            INSERT INTO system_health (
                overall_status, cpu_usage, memory_usage, disk_usage,
                network_latency, active_alerts, timestamp, details
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            health.overall_status.value,
            health.cpu_usage,
            health.memory_usage,
            health.disk_usage,
            health.network_latency,
            health.active_alerts,
            health.timestamp.isoformat(),
            json.dumps(health.details) if health.details else null
        ))
        
        conn.commit()
        conn.close()
    }
    
    func _create_health_alert(health: SystemHealth, level: AlertLevel) {
        """创建健康状态告警"""
        alert = Alert(
            id=f"health_{health.overall_status.value}_{int(health.timestamp.timestamp())}",
            level=level,
            title=f"系统健康状态: {health.overall_status.value}",
            message=f"系统状态为 {health.overall_status.value},CPU: {health.cpu_usage:.1f}%, 内存: {health.memory_usage:.1f}%, 磁盘: {health.disk_usage:.1f}%",
            source="health_monitor",
            timestamp=health.timestamp,
            metadata=asdict(health)
        )
        
        self.alerts_queue.put(alert)
    }
    
    func get_system_metrics(
        start_time: datetime,
        end_time: datetime,
        metric_names: Optional[List[str]] = null
    ) -> List[MetricData] {
        """获取系统指标"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        query = """
            SELECT name, value, unit, timestamp, tags, metadata
            FROM metrics
            WHERE timestamp BETWEEN ? AND ?
        """
        
        params = [start_time.isoformat(), end_time.isoformat()]
        
        if metric_names {
            placeholders = ','.join(['?' for _ in metric_names])
            query += f" AND name IN ({placeholders})"
            params.extend(metric_names)
        }
        
        query += " ORDER BY timestamp"
        
        cursor.execute(query, params)
        
        metrics = []
        for row in cursor.fetchall() {
            name, value, unit, timestamp, tags, metadata = row
            
            metric = MetricData(
                name=name,
                value=value,
                unit=unit,
                timestamp=datetime.fromisoformat(timestamp),
                tags=json.loads(tags) if tags else null,
                metadata=json.loads(metadata) if metadata else null
            )
            
            metrics.append(metric)
        }
        
        conn.close()
        return metrics
    }
    
    func get_alerts(
        start_time: datetime,
        end_time: datetime,
        levels: Optional[List[AlertLevel]] = null,
        resolved: Optional[bool] = null
    ) -> List[Alert] {
        """获取告警"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        query = """
            SELECT id, level, title, message, source, timestamp, resolved, resolved_at, metadata
            FROM alerts
            WHERE timestamp BETWEEN ? AND ?
        """
        
        params = [start_time.isoformat(), end_time.isoformat()]
        
        if levels {
            level_values = [level.value for level in levels]
            placeholders = ','.join(['?' for _ in level_values])
            query += f" AND level IN ({placeholders})"
            params.extend(level_values)
        }
        
        if resolved is not null {
            query += " AND resolved = ?"
            params.append(resolved)
        }
        
        query += " ORDER BY timestamp DESC"
        
        cursor.execute(query, params)
        
        alerts = []
        for row in cursor.fetchall() {
            (
                id, level, title, message, source, timestamp,
                resolved, resolved_at, metadata
            ) = row
            
            alert = Alert(
                id=id,
                level=AlertLevel(level),
                title=title,
                message=message,
                source=source,
                timestamp=datetime.fromisoformat(timestamp),
                resolved=bool(resolved),
                resolved_at=datetime.fromisoformat(resolved_at) if resolved_at else null,
                metadata=json.loads(metadata) if metadata else null
            )
            
            alerts.append(alert)
        }
        
        conn.close()
        return alerts
    }
    
    func resolve_alert(alert_id: str) {
        """解决告警"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            UPDATE alerts
            SET resolved = TRUE, resolved_at = ?
            WHERE id = ?
        """, (datetime.now().isoformat(), alert_id))
        
        conn.commit()
        conn.close()
        
        self.logger.info(f"告警已解决: {alert_id}")
    }
    
    func generate_dashboard_data() -> Dict[str, Any] {
        """生成仪表板数据"""
        now = datetime.now()
        one_hour_ago = now - timedelta(hours=1)
        
        # 获取最新的系统健康状态
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            SELECT overall_status, cpu_usage, memory_usage, disk_usage,
                   network_latency, active_alerts, timestamp
            FROM system_health
            ORDER BY timestamp DESC
            LIMIT 1
        """)
        
        health_row = cursor.fetchone()
        current_health = null
        
        if health_row {
            current_health = {
                'overall_status': health_row[0],
                'cpu_usage': health_row[1],
                'memory_usage': health_row[2],
                'disk_usage': health_row[3],
                'network_latency': health_row[4],
                'active_alerts': health_row[5],
                'timestamp': health_row[6]
            }
        }
        
        # 获取最近的告警
        recent_alerts = self.get_alerts(
            start_time=one_hour_ago,
            end_time=now,
            resolved=false
        )
        
        # 获取指标趋势
        metrics = self.get_system_metrics(
            start_time=one_hour_ago,
            end_time=now,
            metric_names=['cpu_usage', 'memory_usage', 'disk_usage']
        )
        
        # 按指标名称分组
        metrics_by_name = {}
        for metric in metrics {
            if metric.name not in metrics_by_name {
                metrics_by_name[metric.name] = []
            }
            metrics_by_name[metric.name].append({
                'timestamp': metric.timestamp.isoformat(),
                'value': metric.value
            })
        }
        
        conn.close()
        
        return {
            'current_health': current_health,
            'recent_alerts': [{
                'id': alert.id,
                'level': alert.level.value,
                'title': alert.title,
                'message': alert.message,
                'timestamp': alert.timestamp.isoformat()
            } for alert in recent_alerts],
            'metrics_trends': metrics_by_name,
            'summary': {
                'total_alerts_last_hour': len(recent_alerts),
                'critical_alerts': len([a for a in recent_alerts if a.level == AlertLevel.CRITICAL]),
                'warning_alerts': len([a for a in recent_alerts if a.level == AlertLevel.WARNING]),
                'system_uptime': self._get_system_uptime()
            }
        }
    }
    
    func _get_system_uptime() -> str {
        """获取系统运行时间"""
        import uptime
        try {
            uptime_seconds = uptime.uptime()
            days = uptime_seconds // 86400
            hours = (uptime_seconds % 86400) // 3600
            minutes = (uptime_seconds % 3600) // 60
            return f"{int(days)}天 {int(hours)}小时 {int(minutes)}分钟"
        } catch Exception {
            return "未知"
        }
    }
}

# 预定义的指标收集器
class SystemMetricsCollector {
    @staticmethod
    func collect_cpu_metrics() -> List[MetricData] {
        """收集CPU指标"""
        cpu_percent = psutil.cpu_percent(interval=1)
        cpu_count = psutil.cpu_count()
        
        return [
            MetricData(
                name="cpu_usage",
                value=cpu_percent,
                unit="percent",
                timestamp=datetime.now(),
                tags={'type': 'system'},
                metadata={'cpu_count': cpu_count}
            )
        ]
    }
    
    @staticmethod
    func collect_memory_metrics() -> List[MetricData] {
        """收集内存指标"""
        memory = psutil.virtual_memory()
        
        return [
            MetricData(
                name="memory_usage",
                value=memory.percent,
                unit="percent",
                timestamp=datetime.now(),
                tags={'type': 'system'},
                metadata={
                    'total_gb': memory.total / (1024**3),
                    'available_gb': memory.available / (1024**3)
                }
            )
        ]
    }
    
    @staticmethod
    func collect_disk_metrics() -> List[MetricData] {
        """收集磁盘指标"""
        disk = psutil.disk_usage('/')
        
        return [
            MetricData(
                name="disk_usage",
                value=(disk.used / disk.total) * 100,
                unit="percent",
                timestamp=datetime.now(),
                tags={'type': 'system', 'mount': '/'},
                metadata={
                    'total_gb': disk.total / (1024**3),
                    'free_gb': disk.free / (1024**3)
                }
            )
        ]
    }
    
    @staticmethod
    func collect_network_metrics() -> List[MetricData] {
        """收集网络指标"""
        network = psutil.net_io_counters()
        
        return [
            MetricData(
                name="network_bytes_sent",
                value=network.bytes_sent,
                unit="bytes",
                timestamp=datetime.now(),
                tags={'type': 'network', 'direction': 'sent'}
            ),
            MetricData(
                name="network_bytes_recv",
                value=network.bytes_recv,
                unit="bytes",
                timestamp=datetime.now(),
                tags={'type': 'network', 'direction': 'received'}
            )
        ]
    }
}

# 预定义的告警处理器
class AlertHandlers {
    @staticmethod
    func email_handler(alert: Alert) {
        """邮件告警处理器"""
        # 这里是邮件发送的示例实现
        print(f"发送邮件告警: {alert.title} - {alert.message}")
    }
    
    @staticmethod
    func webhook_handler(alert: Alert) {
        """Webhook告警处理器"""
        import requests
        
        webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
        
        payload = {
            "text": f"告警: {alert.title}",
            "attachments": [{
                "color": "danger" if alert.level == AlertLevel.CRITICAL else "warning",
                "fields": [
                    {"title": "级别", "value": alert.level.value, "short": true},
                    {"title": "来源", "value": alert.source, "short": true},
                    {"title": "消息", "value": alert.message, "short": false},
                    {"title": "时间", "value": alert.timestamp.strftime('%Y-%m-%d %H:%M:%S'), "short": true}
                ]
            }]
        }
        
        try {
            requests.post(webhook_url, json=payload, timeout=10)
        } catch Exception as e {
            print(f"Webhook发送失败: {e}")
        }
    }
    
    @staticmethod
    func log_handler(alert: Alert) {
        """日志告警处理器"""
        logger = logging.getLogger("AlertHandler")
        
        if alert.level == AlertLevel.CRITICAL {
            logger.critical(f"{alert.title}: {alert.message}")
        } elif alert.level == AlertLevel.ERROR {
            logger.error(f"{alert.title}: {alert.message}")
        } elif alert.level == AlertLevel.WARNING {
            logger.warning(f"{alert.title}: {alert.message}")
        } else {
            logger.info(f"{alert.title}: {alert.message}")
        }
    }
}

# 使用示例
if __name__ == "__main__" {
    # 配置
    config = {
        'db_path': 'monitoring.db',
        'collection_interval': 30,  # 30秒收集一次
        'health_check_interval': 60,  # 60秒检查一次健康状态
        'batch_size': 50,
        'thresholds': {
            'cpu_usage': {
                'warning': {'operator': '>', 'value': 80},
                'critical': {'operator': '>', 'value': 90}
            },
            'memory_usage': {
                'warning': {'operator': '>', 'value': 85},
                'critical': {'operator': '>', 'value': 95}
            },
            'disk_usage': {
                'warning': {'operator': '>', 'value': 85},
                'critical': {'operator': '>', 'value': 95}
            }
        }
    }
    
    # 创建监控系统
    monitoring = IntelligentMonitoringSystem(config)
    
    # 注册指标收集器
    monitoring.register_metric_collector(
        'cpu_metrics',
        SystemMetricsCollector.collect_cpu_metrics
    )
    monitoring.register_metric_collector(
        'memory_metrics',
        SystemMetricsCollector.collect_memory_metrics
    )
    monitoring.register_metric_collector(
        'disk_metrics',
        SystemMetricsCollector.collect_disk_metrics
    )
    monitoring.register_metric_collector(
        'network_metrics',
        SystemMetricsCollector.collect_network_metrics
    )
    
    # 注册告警处理器
    monitoring.register_alert_handler(AlertLevel.CRITICAL, AlertHandlers.email_handler)
    monitoring.register_alert_handler(AlertLevel.WARNING, AlertHandlers.webhook_handler)
    monitoring.register_alert_handler(AlertLevel.INFO, AlertHandlers.log_handler)
    
    try {
        # 启动监控
        monitoring.start_monitoring()
        
        print("监控系统已启动,按 Ctrl+C 停止...")
        
        # 保持运行
        while true {
            time.sleep(10)
            
            # 生成仪表板数据示例
            dashboard_data = monitoring.generate_dashboard_data()
            print(f"\n当前系统状态: {dashboard_data['current_health']['overall_status'] if dashboard_data['current_health'] else '未知'}")
            print(f"活跃告警数量: {dashboard_data['summary']['total_alerts_last_hour']}")
        }
        
    } catch KeyboardInterrupt {
        print("\n正在停止监控系统...")
        monitoring.stop_monitoring()
        print("监控系统已停止")
    }
}
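
监控系统的两个主要扩展点是 register_metric_collector 和 register_alert_handler。下面是一个示意性的扩展片段,沿用上文示例中创建的 monitoring 实例(collect_process_count 为演示而临时命名的收集器,用于统计当前系统进程数,仅供参考):

# 自定义指标收集器示例(示意):统计当前进程数量
func collect_process_count() -> List[MetricData] {
    return [
        MetricData(
            name="process_count",
            value=float(len(psutil.pids())),
            unit="count",
            timestamp=datetime.now(),
            tags={'type': 'system'}
        )
    ]
}

# 注册自定义收集器,并为 ERROR 级别补充一个日志处理器
monitoring.register_metric_collector('process_metrics', collect_process_count)
monitoring.register_alert_handler(AlertLevel.ERROR, AlertHandlers.log_handler)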

案例3:智能代码生成器

# 智能代码生成器
import ast
import inspect
import textwrap
from typing import Dict, List, Any, Optional, Union, Callable
from dataclasses import dataclass
from enum import Enum
import json
import re
from pathlib import Path
from datetime import datetime

class CodeLanguage(Enum) {
    PYTHON = "python"
    JAVASCRIPT = "javascript"
    TYPESCRIPT = "typescript"
    JAVA = "java"
    CSHARP = "csharp"
    GO = "go"
    RUST = "rust"
}

class CodePattern(Enum) {
    CLASS = "class"
    FUNCTION = "function"
    API_ENDPOINT = "api_endpoint"
    DATABASE_MODEL = "database_model"
    TEST_CASE = "test_case"
    CONFIGURATION = "configuration"
    DOCUMENTATION = "documentation"
}

@dataclass
class CodeTemplate {
    name: str
    language: CodeLanguage
    pattern: CodePattern
    template: str
    variables: List[str]
    description: str
    examples: List[Dict[str, Any]] = null
    dependencies: List[str] = null
}

@dataclass
class GenerationRequest {
    pattern: CodePattern
    language: CodeLanguage
    parameters: Dict[str, Any]
    template_name: Optional[str] = null
    custom_template: Optional[str] = null
    output_format: str = "code"
}

@dataclass
class GenerationResult {
    code: str
    language: CodeLanguage
    pattern: CodePattern
    metadata: Dict[str, Any]
    suggestions: List[str] = null
    warnings: List[str] = null
}

class IntelligentCodeGenerator {
    func __init__() {
        self.templates = {}
        self.custom_generators = {}
        self.ai_models = {}
        self.code_analyzers = {}
        
        # 初始化内置模板
        self._init_builtin_templates()
        
        # 初始化代码分析器
        self._init_code_analyzers()
    }
    
    func _init_builtin_templates() {
        """初始化内置模板"""
        
        # Python类模板
        python_class_template = CodeTemplate(
            name="python_class",
            language=CodeLanguage.PYTHON,
            pattern=CodePattern.CLASS,
            template="""
class {class_name}:
    '''{class_description}'''
    
    def __init__(self{init_params}):
{init_body}
    
{methods}
""",
            variables=["class_name", "class_description", "init_params", "init_body", "methods"],
            description="Python类生成模板",
            examples=[
                {
                    "class_name": "UserManager",
                    "class_description": "用户管理类",
                    "init_params": ", database_url: str",
                    "init_body": "        self.database_url = database_url\n        self.connection = None",
                    "methods": "    def connect(self):\n        pass\n    \n    def create_user(self, user_data):\n        pass"
                }
            ]
        )
        
        # Python函数模板
        python_function_template = CodeTemplate(
            name="python_function",
            language=CodeLanguage.PYTHON,
            pattern=CodePattern.FUNCTION,
            template="""
def {function_name}({parameters}) -> {return_type}:
    '''{function_description}
    
    Args:
{args_docs}
    
    Returns:
        {return_description}
    '''
{function_body}
""",
            variables=["function_name", "parameters", "return_type", "function_description", "args_docs", "return_description", "function_body"],
            description="Python函数生成模板"
        )
        
        # API端点模板
        api_endpoint_template = CodeTemplate(
            name="fastapi_endpoint",
            language=CodeLanguage.PYTHON,
            pattern=CodePattern.API_ENDPOINT,
            template="""
@app.{http_method}("{endpoint_path}")
async def {endpoint_name}({parameters}):
    """{{endpoint_description}}"""
    try:
{{endpoint_body}}
        return {{success_response}}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
""",
            variables=["http_method", "endpoint_path", "endpoint_name", "parameters", "endpoint_description", "endpoint_body", "success_response"],
            description="FastAPI端点生成模板",
            dependencies=["fastapi", "pydantic"]
        )
        
        # 数据库模型模板
        database_model_template = CodeTemplate(
            name="sqlalchemy_model",
            language=CodeLanguage.PYTHON,
            pattern=CodePattern.DATABASE_MODEL,
            template="""
class {model_name}(Base):
    """{{model_description}}"""
    
    __tablename__ = "{table_name}"
    
{{fields}}
    
    def __repr__(self):
        return f"<{model_name}({{repr_fields}})>"
    
    def to_dict(self):
        return {
{{to_dict_fields}}
        }
""",
            variables=["model_name", "model_description", "table_name", "fields", "repr_fields", "to_dict_fields"],
            description="SQLAlchemy模型生成模板",
            dependencies=["sqlalchemy"]
        )
        
        # 注册模板
        self.register_template(python_class_template)
        self.register_template(python_function_template)
        self.register_template(api_endpoint_template)
        self.register_template(database_model_template)
    }
    
    func _init_code_analyzers() {
        """初始化代码分析器"""
        self.code_analyzers[CodeLanguage.PYTHON] = PythonCodeAnalyzer()
    }
    
    func register_template(template: CodeTemplate) {
        """注册代码模板"""
        key = f"{template.language.value}_{template.pattern.value}_{template.name}"
        self.templates[key] = template
    }
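    
    # 说明:注册后的键形如 "语言_模式_模板名",
    # 例如上面的 python_class 模板注册后对应键 "python_class_python_class"。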
    
    func register_custom_generator(pattern: CodePattern, language: CodeLanguage, generator: Callable) {
        """注册自定义生成器"""
        key = f"{language.value}_{pattern.value}"
        self.custom_generators[key] = generator
    }
    
    func generate_code(request: GenerationRequest) -> GenerationResult {
        """生成代码"""
        try {
            # 选择生成方法
            if request.custom_template {
                code = self._generate_from_custom_template(request)
            } elif request.template_name {
                code = self._generate_from_named_template(request)
            } else {
                code = self._generate_from_pattern(request)
            }
            
            # 代码分析和优化
            analyzed_code, suggestions, warnings = self._analyze_generated_code(
                code, request.language
            )
            
            # 格式化代码
            formatted_code = self._format_code(analyzed_code, request.language)
            
            return GenerationResult(
                code=formatted_code,
                language=request.language,
                pattern=request.pattern,
                metadata={
                    'template_used': request.template_name,
                    'generation_time': datetime.now().isoformat(),
                    'parameters': request.parameters
                },
                suggestions=suggestions,
                warnings=warnings
            )
            
        } except Exception as e {
            raise ValueError(f"代码生成失败: {e}")
        }
    }
    
    func _generate_from_custom_template(request: GenerationRequest) -> str {
        """从自定义模板生成代码"""
        template = request.custom_template
        
        # 替换模板变量
        for key, value in request.parameters.items() {
            placeholder = f"{{{key}}}"
            template = template.replace(placeholder, str(value))
        }
        
        return template
    }
    
    func _generate_from_named_template(request: GenerationRequest) -> str {
        """从命名模板生成代码"""
        key = f"{request.language.value}_{request.pattern.value}_{request.template_name}"
        
        if key not in self.templates {
            raise ValueError(f"模板不存在: {request.template_name}")
        }
        
        template = self.templates[key]
        return self._apply_template(template, request.parameters)
    }
    
    func _generate_from_pattern(request: GenerationRequest) -> str {
        """从模式生成代码"""
        # 查找匹配的模板
        pattern_key = f"{request.language.value}_{request.pattern.value}"
        
        # 首先尝试自定义生成器
        if pattern_key in self.custom_generators {
            return self.custom_generators[pattern_key](request.parameters)
        }
        
        # 查找默认模板
        matching_templates = [
            template for key, template in self.templates.items()
            if key.startswith(pattern_key)
        ]
        
        if not matching_templates {
            raise ValueError(f"没有找到匹配的模板: {request.language.value} {request.pattern.value}")
        }
        
        # 使用第一个匹配的模板
        template = matching_templates[0]
        return self._apply_template(template, request.parameters)
    }
    
    func _apply_template(template: CodeTemplate, parameters: Dict[str, Any]) -> str {
        """应用模板"""
        code = template.template
        
        # 替换模板变量
        for variable in template.variables {
            if variable in parameters {
                placeholder = f"{{{variable}}}"
                double_placeholder = "{{" + variable + "}}"
                value = parameters[variable]
                
                # 处理特殊格式
                if isinstance(value, list) {
                    if variable.endswith('_list') {
                        value = '\n'.join(str(item) for item in value)
                    } else {
                        value = ', '.join(str(item) for item in value)
                    }
                } elif isinstance(value, dict) {
                    value = json.dumps(value, indent=2)
                }
                
                # 同时支持 {variable} 与 {{variable}} 两种占位符写法
                code = code.replace(double_placeholder, str(value))
                code = code.replace(placeholder, str(value))
            }
        }
        
        # 处理条件块
        code = self._process_conditional_blocks(code, parameters)
        
        return code
    }
    
    func _process_conditional_blocks(code: str, parameters: Dict[str, Any]) -> str {
        """处理条件块"""
        # 处理 {{if condition}} ... {{endif}} 块
        pattern = r'\{\{if\s+([^}]+)\}\}(.*?)\{\{endif\}\}'
        
        def replace_conditional(match) {
            condition = match.group(1).strip()
            content = match.group(2)
            
            # 简单的条件评估
            if condition in parameters and parameters[condition] {
                return content
            } else {
                return ""
            }
        }
        
        return re.sub(pattern, replace_conditional, code, flags=re.DOTALL)
    }
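    
    # 条件块使用示例(参数名 use_logging 为假设,仅作说明):
    # 模板片段 "{{if use_logging}}import logging{{endif}}"
    # 在 parameters 中 use_logging 为真值时会保留 "import logging",
    # 否则整个条件块会被替换为空字符串。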
    
    func _analyze_generated_code(
        code: str,
        language: CodeLanguage
    ) -> tuple[str, List[str], List[str]] {
        """分析生成的代码"""
        suggestions = []
        warnings = []
        
        if language in self.code_analyzers {
            analyzer = self.code_analyzers[language]
            analyzed_code, analysis_suggestions, analysis_warnings = analyzer.analyze(code)
            suggestions.extend(analysis_suggestions)
            warnings.extend(analysis_warnings)
            return analyzed_code, suggestions, warnings
        }
        
        return code, suggestions, warnings
    }
    
    func _format_code(code: str, language: CodeLanguage) -> str {
        """格式化代码"""
        if language == CodeLanguage.PYTHON {
            try {
                import black
                return black.format_str(code, mode=black.FileMode())
            } except ImportError {
                # 如果没有安装black,使用简单的格式化
                return textwrap.dedent(code).strip()
            }
        }
        
        return textwrap.dedent(code).strip()
    }
    
    func generate_batch(
        requests: List[GenerationRequest],
        output_dir: Optional[Path] = null
    ) -> List[GenerationResult] {
        """批量生成代码"""
        results = []
        
        for i, request in enumerate(requests) {
            try {
                result = self.generate_code(request)
                results.append(result)
                
                # 保存到文件
                if output_dir {
                    filename = f"generated_{request.pattern.value}_{i}.{self._get_file_extension(request.language)}"
                    file_path = output_dir / filename
                    
                    with open(file_path, 'w', encoding='utf-8') as f {
                        f.write(result.code)
                    }
                    
                    result.metadata['output_file'] = str(file_path)
                }
                
            } except Exception as e {
                # 创建错误结果
                error_result = GenerationResult(
                    code="",
                    language=request.language,
                    pattern=request.pattern,
                    metadata={'error': str(e)},
                    warnings=[f"生成失败: {e}"]
                )
                results.append(error_result)
            }
        }
        
        return results
    }
    
    func _get_file_extension(language: CodeLanguage) -> str {
        """获取文件扩展名"""
        extensions = {
            CodeLanguage.PYTHON: "py",
            CodeLanguage.JAVASCRIPT: "js",
            CodeLanguage.TYPESCRIPT: "ts",
            CodeLanguage.JAVA: "java",
            CodeLanguage.CSHARP: "cs",
            CodeLanguage.GO: "go",
            CodeLanguage.RUST: "rs"
        }
        return extensions.get(language, "txt")
    }
    
    func get_available_templates(
        language: Optional[CodeLanguage] = null,
        pattern: Optional[CodePattern] = null
    ) -> List[CodeTemplate] {
        """获取可用模板"""
        templates = list(self.templates.values())
        
        if language {
            templates = [t for t in templates if t.language == language]
        }
        
        if pattern {
            templates = [t for t in templates if t.pattern == pattern]
        }
        
        return templates
    }
    
    func create_project_structure(
        project_name: str,
        language: CodeLanguage,
        components: List[str],
        output_dir: Path
    ) -> Dict[str, Any] {
        """创建项目结构"""
        project_dir = output_dir / project_name
        project_dir.mkdir(parents=true, exist_ok=true)
        
        generated_files = []
        
        # 根据语言和组件生成项目结构
        if language == CodeLanguage.PYTHON {
            # Python项目结构
            structure = self._create_python_project_structure(
                project_name, components, project_dir
            )
            generated_files.extend(structure)
        }
        
        return {
            'project_dir': str(project_dir),
            'generated_files': generated_files,
            'language': language.value,
            'components': components
        }
    }
    
    func _create_python_project_structure(
        project_name: str,
        components: List[str],
        project_dir: Path
    ) -> List[str] {
        """创建Python项目结构"""
        generated_files = []
        
        # 创建主包目录
        package_dir = project_dir / project_name
        package_dir.mkdir(exist_ok=true)
        
        # __init__.py
        init_file = package_dir / "__init__.py"
        init_file.write_text(f'"""\n{project_name} package\n"""\n\n__version__ = "0.1.0"\n')
        generated_files.append(str(init_file))
        
        # 根据组件生成文件
        for component in components {
            if component == "api" {
                api_file = package_dir / "api.py"
                api_code = self.generate_code(GenerationRequest(
                    pattern=CodePattern.API_ENDPOINT,
                    language=CodeLanguage.PYTHON,
                    parameters={
                        'http_method': 'get',
                        'endpoint_path': '/health',
                        'endpoint_name': 'health_check',
                        'parameters': '',
                        'endpoint_description': 'Health check endpoint',
                        'endpoint_body': '        # Health check logic\n        return {"status": "healthy"}',
                        'success_response': '{"status": "healthy"}'
                    }
                ))
                api_file.write_text(api_code.code)
                generated_files.append(str(api_file))
            } elif component == "models" {
                models_file = package_dir / "models.py"
                models_code = self.generate_code(GenerationRequest(
                    pattern=CodePattern.DATABASE_MODEL,
                    language=CodeLanguage.PYTHON,
                    parameters={
                        'model_name': 'BaseModel',
                        'model_description': 'Base model class',
                        'table_name': 'base_models',
                        'fields': '    id = Column(Integer, primary_key=True)\n    created_at = Column(DateTime, default=datetime.utcnow)',
                        'repr_fields': 'id={self.id}',
                        'to_dict_fields': '            "id": self.id,\n            "created_at": self.created_at.isoformat() if self.created_at else None'
                    }
                ))
                models_file.write_text(models_code.code)
                generated_files.append(str(models_file))
            }
        }
        
        # requirements.txt
        requirements_file = project_dir / "requirements.txt"
        requirements = ["fastapi", "uvicorn", "sqlalchemy", "pydantic"]
        requirements_file.write_text('\n'.join(requirements))
        generated_files.append(str(requirements_file))
        
        # README.md
        readme_file = project_dir / "README.md"
        readme_content = f"""
# {project_name}

## 安装

```bash
pip install -r requirements.txt

运行

uvicorn {project_name}.main:app --reload

”“” readme_file.write_text(readme_content.strip()) generated_files.append(str(readme_file))

    return generated_files
}

}

class PythonCodeAnalyzer {
    """Python代码分析器"""

func analyze(code: str) -> tuple[str, List[str], List[str]] {
    """分析Python代码"""
    suggestions = []
    warnings = []

    try {
        # 解析AST
        tree = ast.parse(code)

        # 分析代码
        suggestions.extend(self._analyze_imports(tree))
        suggestions.extend(self._analyze_functions(tree))
        suggestions.extend(self._analyze_classes(tree))

        # 检查潜在问题
        warnings.extend(self._check_common_issues(code))

    } except SyntaxError as e {
        warnings.append(f"语法错误: {e}")
    }

    return code, suggestions, warnings
}

func _analyze_imports(tree: ast.AST) -> List[str] {
    """分析导入语句"""
    suggestions = []

    imports = []
    for node in ast.walk(tree) {
        if isinstance(node, ast.Import) {
            for alias in node.names {
                imports.append(alias.name)
            }
        } elif isinstance(node, ast.ImportFrom) {
            if node.module {
                imports.append(node.module)
            }
        }
    }

    # 检查未使用的导入
    # 这里是简化实现
    if len(imports) > 10 {
        suggestions.append("考虑减少导入数量或使用延迟导入")
    }

    return suggestions
}

func _analyze_functions(tree: ast.AST) -> List[str] {
    """分析函数"""
    suggestions = []

    for node in ast.walk(tree) {
        if isinstance(node, ast.FunctionDef) {
            # 检查函数长度
            if len(node.body) > 20 {
                suggestions.append(f"函数 {node.name} 过长,考虑拆分")
            }

            # 检查参数数量
            if len(node.args.args) > 5 {
                suggestions.append(f"函数 {node.name} 参数过多,考虑使用数据类")
            }

            # 检查文档字符串
            if not ast.get_docstring(node) {
                suggestions.append(f"函数 {node.name} 缺少文档字符串")
            }
        }
    }

    return suggestions
}

func _analyze_classes(tree: ast.AST) -> List[str] {
    """分析类"""
    suggestions = []

    for node in ast.walk(tree) {
        if isinstance(node, ast.ClassDef) {
            # 检查类的方法数量
            methods = [n for n in node.body if isinstance(n, ast.FunctionDef)]
            if len(methods) > 15 {
                suggestions.append(f"类 {node.name} 方法过多,考虑拆分")
            }

            # 检查文档字符串
            if not ast.get_docstring(node) {
                suggestions.append(f"类 {node.name} 缺少文档字符串")
            }
        }
    }

    return suggestions
}

func _check_common_issues(code: str) -> List[str] {
    """检查常见问题"""
    warnings = []

    lines = code.split('\n')

    for i, line in enumerate(lines, 1) {
        # 检查行长度
        if len(line) > 100 {
            warnings.append(f"第{i}行过长 ({len(line)}字符)")
        }

        # 检查TODO注释
        if 'TODO' in line or 'FIXME' in line {
            warnings.append(f"第{i}行包含TODO/FIXME注释")
        }
    }

    return warnings
}

}

使用示例

if __name__ == "__main__" {
    # 创建代码生成器
    generator = IntelligentCodeGenerator()

# 生成单个代码文件
request = GenerationRequest(
    pattern=CodePattern.CLASS,
    language=CodeLanguage.PYTHON,
    parameters={
        'class_name': 'UserService',
        'class_description': '用户服务类,处理用户相关的业务逻辑',
        'init_params': ', database: Database',
        'init_body': '        self.database = database\n        self.logger = logging.getLogger(__name__)',
        'methods': '''    def create_user(self, user_data: dict) -> User:
        """创建新用户"""
        # 验证用户数据
        if not user_data.get('email'):
            raise ValueError("邮箱不能为空")

        # 创建用户
        user = User(**user_data)
        self.database.add(user)
        self.database.commit()

        self.logger.info(f"创建用户成功: {user.email}")
        return user

    def get_user_by_id(self, user_id: int) -> Optional[User]:
        """根据ID获取用户"""
        return self.database.query(User).filter(User.id == user_id).first()

    def update_user(self, user_id: int, update_data: dict) -> User:
        """更新用户信息"""
        user = self.get_user_by_id(user_id)
        if not user:
            raise ValueError(f"用户不存在: {user_id}")

        for key, value in update_data.items():
            setattr(user, key, value)

        self.database.commit()
        self.logger.info(f"更新用户成功: {user.email}")
        return user'''
    }
)

result = generator.generate_code(request)
print("生成的代码:")
print(result.code)

if result.suggestions {
    print("\n建议:")
    for suggestion in result.suggestions {
        print(f"- {suggestion}")
    }
}

if result.warnings {
    print("\n警告:")
    for warning in result.warnings {
        print(f"- {warning}")
    }
}

# 创建项目结构
from pathlib import Path

project_info = generator.create_project_structure(
    project_name="my_api",
    language=CodeLanguage.PYTHON,
    components=["api", "models"],
    output_dir=Path("./generated_projects")
)

print(f"\n项目已创建: {project_info['project_dir']}")
print("生成的文件:")
for file_path in project_info['generated_files'] {
    print(f"- {file_path}")
}

}
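
上面的使用示例只覆盖了 generate_code 和 create_project_structure。下面再给出一个补充性的示意片段,演示如何用 register_custom_generator 注册自定义生成器,并用 generate_batch 批量生成代码;其中的 Go 结构体生成函数、字段名和输出目录都是为说明接口而假设的,并非内置功能。

# 假设的自定义生成器:接收参数字典,返回生成的代码字符串
func generate_go_struct(parameters: Dict[str, Any]) -> str {
    fields = '\n'.join(
        f"    {name} {type_}" for name, type_ in parameters.get('fields', {}).items()
    )
    return f"type {parameters['struct_name']} struct {{\n{fields}\n}}\n"
}

generator.register_custom_generator(
    CodePattern.DATABASE_MODEL,
    CodeLanguage.GO,
    generate_go_struct
)

# 批量生成:第一个请求走自定义生成器,第二个走内置的 Python 函数模板
requests = [
    GenerationRequest(
        pattern=CodePattern.DATABASE_MODEL,
        language=CodeLanguage.GO,
        parameters={'struct_name': 'User', 'fields': {'ID': 'int64', 'Email': 'string'}}
    ),
    GenerationRequest(
        pattern=CodePattern.FUNCTION,
        language=CodeLanguage.PYTHON,
        parameters={
            'function_name': 'add',
            'parameters': 'a: int, b: int',
            'return_type': 'int',
            'function_description': '返回两数之和',
            'args_docs': '        a: 第一个加数\n        b: 第二个加数',
            'return_description': '两数之和',
            'function_body': '    return a + b'
        }
    )
]

output_dir = Path("./generated_batch")
output_dir.mkdir(parents=true, exist_ok=true)
results = generator.generate_batch(requests, output_dir=output_dir)

for r in results {
    status = "失败" if 'error' in r.metadata else "成功"
    print(f"[{r.language.value}/{r.pattern.value}] {status}")
}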

8.3 技术发展趋势

AI Script 的未来发展方向

1. 更强的AI集成能力

  • 多模态AI支持:集成视觉、语音、文本等多种AI模型
  • 自适应学习:根据使用模式自动优化脚本性能
  • 智能代码补全:基于上下文的智能代码建议

2. 云原生特性增强

  • 无服务器架构:原生支持Serverless部署
  • 容器化优化:更好的容器运行时支持
  • 微服务集成:简化微服务架构开发

3. 性能优化技术

  • JIT编译:即时编译提升执行效率
  • 并行计算:自动并行化处理
  • 内存优化:智能内存管理和垃圾回收

4. 开发体验改进

  • 可视化编程:图形化脚本编辑器
  • 实时调试:增强的调试和分析工具
  • 智能错误处理:自动错误诊断和修复建议

行业应用趋势

1. 企业级应用

  • 数字化转型:助力企业自动化流程
  • 智能运维:AI驱动的运维管理
  • 业务流程优化:智能化业务流程设计

2. 科研与教育

  • 科学计算:简化复杂科学计算流程
  • 教育工具:编程教育和AI教学
  • 研究辅助:自动化研究数据处理

3. 创新应用领域

  • IoT集成:物联网设备智能控制
  • 边缘计算:边缘设备AI应用开发
  • 区块链:智能合约和DApp开发

8.4 学习建议与资源

持续学习路径

阶段1:基础巩固(1-2个月)

  1. 语法熟练

    • 每日编写小型脚本练习
    • 完成所有章节的练习题
    • 参与开源项目贡献
  2. 工具掌握

    • 熟练使用开发环境
    • 掌握调试技巧
    • 学习版本控制

阶段2:实践应用(2-3个月)

  1. 项目实战

    • 完成3-5个实际项目
    • 涵盖不同应用领域
    • 注重代码质量和文档
  2. 技术深化

    • 深入学习AI集成
    • 掌握性能优化技巧
    • 了解部署和运维

阶段3:专业发展(3-6个月)

  1. 专业化方向

    • 选择专业领域深入
    • 成为某个方向的专家
    • 分享经验和知识
  2. 社区参与

    • 参与技术社区
    • 贡献开源项目
    • 指导新手学习

推荐学习资源

官方资源

社区资源

学习平台

  • 在线课程:Coursera、edX上的AI Script课程
  • 视频教程:YouTube、B站相关教学视频
  • 编程挑战:LeetCode、HackerRank的AI Script题目

书籍推荐

  1. 《AI Script编程实战》 - 深入实践指南
  2. 《人工智能脚本开发》 - 理论与应用结合
  3. 《现代脚本语言设计》 - 语言设计原理

实践项目建议

初级项目

  1. 个人任务管理器

    • 功能:任务创建、编辑、删除、提醒
    • 技术:基础语法、文件操作、定时任务
  2. 网站监控工具

    • 功能:网站可用性检查、性能监控、告警
    • 技术:网络请求、数据存储、通知系统(最小示意代码见本列表之后)
  3. 日志分析器

    • 功能:日志解析、统计分析、可视化
    • 技术:文本处理、数据分析、图表生成
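
针对上面初级项目中提到的“网站监控工具”,下面给出一个最小化的示意脚本,演示“可用性检查 + 简单输出/告警”的基本思路;其中的目标 URL、检查间隔和超时时间都是假设值,通知部分仅以注释占位。

# 网站监控最小示例(URL、间隔、超时均为假设值)
import asyncio
import aiohttp
from datetime import datetime
from typing import List

async func check_site(session, url: str) -> bool {
    try {
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp {
            return resp.status < 400
        }
    } except Exception {
        return false
    }
}

async func monitor(urls: List[str], interval: int = 60) {
    async with aiohttp.ClientSession() as session {
        while true {
            for url in urls {
                ok = await check_site(session, url)
                status = "正常" if ok else "异常"
                print(f"[{datetime.now().isoformat()}] {url} -> {status}")
                # 异常时可在此处接入邮件 / IM 等通知系统
            }
            await asyncio.sleep(interval)
        }
    }
}

# asyncio.run(monitor(["https://example.com"], interval=60))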

中级项目

  1. 智能客服机器人

    • 功能:自然语言理解、智能回复、学习优化
    • 技术:AI模型集成、对话管理、知识库
  2. 自动化测试框架

    • 功能:测试用例生成、执行、报告
    • 技术:测试自动化、CI/CD集成、报告生成
  3. 数据ETL管道

    • 功能:数据提取、转换、加载、调度
    • 技术:数据处理、工作流管理、错误处理(示意代码见本列表之后)
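
同样,针对中级项目中的“数据ETL管道”,下面是一个极简的示意流程(输入/输出路径、字段名和汇率都是假设的),展示“提取、转换、加载”加基本错误处理的骨架:

# 数据ETL管道最小示例(路径与字段均为假设)
import pandas as pd

func extract(path: str) -> pd.DataFrame {
    return pd.read_csv(path)
}

func transform(df: pd.DataFrame) -> pd.DataFrame {
    # 去重、填充缺失值,并派生一个示例字段
    df = df.drop_duplicates().fillna(0)
    if "amount" in df.columns {
        df["amount_cny"] = df["amount"] * 7.2   # 假设的汇率,仅作演示
    }
    return df
}

func load(df: pd.DataFrame, path: str) {
    df.to_csv(path, index=false)
}

func run_pipeline(src: str, dst: str) {
    try {
        load(transform(extract(src)), dst)
        print(f"ETL 完成: {src} -> {dst}")
    } except Exception as e {
        print(f"ETL 失败: {e}")
    }
}

# run_pipeline("raw_orders.csv", "clean_orders.csv")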

高级项目

  1. 分布式爬虫系统

    • 功能:大规模数据采集、分布式处理、反爬虫
    • 技术:分布式架构、消息队列、数据存储
  2. AI驱动的代码审查工具

    • 功能:代码质量检查、安全漏洞检测、优化建议
    • 技术:代码分析、机器学习、规则引擎
  3. 智能运维平台

    • 功能:监控、告警、自动修复、容量规划
    • 技术:系统监控、AI预测、自动化运维

8.5 总结

通过本教程的学习,我们完整地掌握了AI Script编程语言的核心知识和实践技能。从基础语法到高级特性,从简单脚本到复杂系统,AI Script为我们提供了强大而灵活的编程工具。

核心收获

  1. 语言掌握:深入理解AI Script的语法特性和设计理念
  2. AI集成:学会如何将AI能力融入到实际应用中
  3. 自动化开发:掌握构建高效自动化系统的方法
  4. 工程实践:了解从开发到部署的完整工程流程
  5. 性能优化:学会识别和解决性能瓶颈
  6. 运维管理:掌握生产环境的部署和维护技能

发展建议

  1. 持续实践:通过实际项目不断提升编程技能
  2. 关注趋势:跟上AI和编程语言的发展趋势
  3. 社区参与:积极参与技术社区,分享和学习
  4. 知识更新:定期学习新技术和最佳实践
  5. 经验总结:及时总结项目经验和教训

未来展望

AI Script作为新一代编程语言,将在以下方面发挥重要作用:

  • AI民主化:让更多开发者能够轻松使用AI技术
  • 开发效率:显著提升软件开发和部署效率
  • 创新应用:催生更多创新的AI应用场景
  • 技术融合:促进AI与传统软件开发的深度融合

希望本教程能够为你的AI Script学习之旅提供坚实的基础,祝你在AI编程的道路上取得成功!


教程完结

感谢你完成了AI Script完整教程的学习。如果你有任何问题或建议,欢迎随时与我们联系。

继续你的编程之旅,用AI Script创造更美好的未来!