本章将深入探讨AI Script的高级特性,包括元编程、插件系统、性能优化、并发编程等内容,帮助开发者构建更加强大和灵活的AI应用。
6.1 元编程与反射
动态代码生成
import ast
import inspect
from typing import Any, Dict, List, Callable
class CodeGenerator {
func __init__() {
self.templates = {}
self.generated_functions = {}
}
func register_template(name: str, template: str) {
"""注册代码模板"""
self.templates[name] = template
}
func generate_function(
template_name: str,
func_name: str,
params: Dict[str, Any]
) -> Callable {
"""根据模板生成函数"""
if template_name not in self.templates {
raise ValueError(f"模板 '{template_name}' 不存在")
}
template = self.templates[template_name]
# 替换模板中的占位符
code = template.format(**params)
# 编译代码
compiled_code = compile(code, f"<generated_{func_name}>", "exec")
# 执行代码并获取函数
namespace = {}
exec(compiled_code, globals(), namespace)
if func_name not in namespace {
raise ValueError(f"生成的代码中未找到函数 '{func_name}'")
}
generated_func = namespace[func_name]
self.generated_functions[func_name] = generated_func
return generated_func
}
func generate_class(
class_name: str,
attributes: Dict[str, Any],
methods: Dict[str, str]
) -> type {
"""动态生成类"""
# 构建类的代码
class_code = f"class {class_name}:\n"
# 添加__init__方法
if attributes {
init_params = ", ".join([f"{name}: {type(value).__name__}" for name, value in attributes.items()])
init_body = "\n".join([f" self.{name} = {name}" for name in attributes.keys()])
class_code += f" def __init__(self, {init_params}):\n{init_body}\n\n"
}
# 添加方法
for method_name, method_code in methods.items() {
# 确保方法代码正确缩进
indented_code = "\n".join([" " + line for line in method_code.split("\n")])
class_code += f"{indented_code}\n\n"
}
# 编译并执行代码
compiled_code = compile(class_code, f"<generated_{class_name}>", "exec")
namespace = {}
exec(compiled_code, globals(), namespace)
return namespace[class_name]
}
func create_data_class(
class_name: str,
fields: Dict[str, type]
) -> type {
"""创建数据类"""
attributes = {}
methods = {}
# 生成__init__方法
init_params = ", ".join([f"{name}: {field_type.__name__}" for name, field_type in fields.items()])
init_body = "\n".join([f" self.{name} = {name}" for name in fields.keys()])
# 生成__repr__方法
repr_fields = ", ".join([f"{name}={{self.{name}!r}}" for name in fields.keys()])
repr_method = f"""def __repr__(self):
return f"{class_name}({repr_fields})"""
# 生成__eq__方法
eq_comparisons = " and ".join([f"self.{name} == other.{name}" for name in fields.keys()])
eq_method = f"""def __eq__(self, other):
if not isinstance(other, {class_name}):
return False
return {eq_comparisons}"""
methods["__repr__"] = repr_method
methods["__eq__"] = eq_method
return self.generate_class(class_name, fields, methods)
}
}
# 使用代码生成器
code_gen = CodeGenerator()
# 注册函数模板
code_gen.register_template("validator", """
def {func_name}(value):
if not isinstance(value, {value_type}):
raise TypeError(f"Expected {value_type.__name__}, got {{type(value).__name__}}")
if {condition}:
raise ValueError("{error_message}")
return True
""")
# 生成验证函数
age_validator = code_gen.generate_function(
"validator",
"validate_age",
{
"value_type": int,
"condition": "value < 0 or value > 150",
"error_message": "Age must be between 0 and 150"
}
)
# 测试生成的函数
try {
age_validator(25) # 正常
print("年龄验证通过")
} catch Exception as e {
print(f"验证失败: {e}")
}
# 创建数据类
Person = code_gen.create_data_class("Person", {
"name": str,
"age": int,
"email": str
})
# 使用生成的数据类
person = Person("Alice", 30, "alice@example.com")
print(person)
print(person == Person("Alice", 30, "alice@example.com")) # True
反射与内省
import inspect
import types
from typing import Any, Dict, List, Callable
class Reflector {
@staticmethod
func get_class_info(cls: type) -> Dict[str, Any] {
"""获取类的详细信息"""
return {
"name": cls.__name__,
"module": cls.__module__,
"bases": [base.__name__ for base in cls.__bases__],
"attributes": [attr for attr in dir(cls) if not attr.startswith("_")],
"methods": [name for name, method in inspect.getmembers(cls, inspect.isfunction)],
"properties": [name for name, prop in inspect.getmembers(cls, lambda x: isinstance(x, property))],
"doc": inspect.getdoc(cls)
}
}
@staticmethod
func get_function_info(func: Callable) -> Dict[str, Any] {
"""获取函数的详细信息"""
signature = inspect.signature(func)
return {
"name": func.__name__,
"module": func.__module__,
"signature": str(signature),
"parameters": {
name: {
"annotation": param.annotation if param.annotation != param.empty else None,
"default": param.default if param.default != param.empty else None,
"kind": param.kind.name
}
for name, param in signature.parameters.items()
},
"return_annotation": signature.return_annotation if signature.return_annotation != signature.empty else None,
"doc": inspect.getdoc(func),
"source": inspect.getsource(func) if inspect.getsourcefile(func) else None
}
}
@staticmethod
func call_with_kwargs(func: Callable, kwargs: Dict[str, Any]) -> Any {
"""使用关键字参数调用函数,自动过滤不需要的参数"""
signature = inspect.signature(func)
filtered_kwargs = {}
for param_name in signature.parameters {
if param_name in kwargs {
filtered_kwargs[param_name] = kwargs[param_name]
}
}
return func(**filtered_kwargs)
}
@staticmethod
func create_proxy(target: Any, interceptor: Callable = null) -> Any {
"""创建对象代理"""
class Proxy {
func __init__(target, interceptor) {
self._target = target
self._interceptor = interceptor
}
func __getattr__(name) {
attr = getattr(self._target, name)
if callable(attr) and self._interceptor {
func wrapper(*args, **kwargs) {
# 调用拦截器
if self._interceptor {
result = self._interceptor("before", name, args, kwargs)
if result is not null {
return result
}
}
# 调用原方法
try {
result = attr(*args, **kwargs)
# 后置拦截
if self._interceptor {
self._interceptor("after", name, args, kwargs, result)
}
return result
} catch Exception as e {
if self._interceptor {
self._interceptor("error", name, args, kwargs, e)
}
raise
}
}
return wrapper
}
return attr
}
func __setattr__(name, value) {
if name.startswith("_") {
super().__setattr__(name, value)
} else {
setattr(self._target, name, value)
}
}
}
return Proxy(target, interceptor)
}
}
# 示例类
class Calculator {
func add(a: int, b: int) -> int {
"""加法运算"""
return a + b
}
func multiply(a: int, b: int) -> int {
"""乘法运算"""
return a * b
}
@property
func version() -> str {
return "1.0.0"
}
}
# 使用反射器
reflector = Reflector()
# 获取类信息
class_info = reflector.get_class_info(Calculator)
print("类信息:")
for key, value in class_info.items() {
print(f" {key}: {value}")
}
# 获取函数信息
func_info = reflector.get_function_info(Calculator.add)
print("\n函数信息:")
for key, value in func_info.items() {
print(f" {key}: {value}")
}
# 动态调用函数
calc = Calculator()
result = reflector.call_with_kwargs(calc.add, {"a": 10, "b": 20, "extra": "ignored"})
print(f"\n动态调用结果: {result}")
# 创建代理对象
func method_interceptor(phase: str, method_name: str, args: tuple, kwargs: dict, result: Any = null) {
if phase == "before" {
print(f"调用方法: {method_name}, 参数: {args}, {kwargs}")
} elif phase == "after" {
print(f"方法 {method_name} 执行完成, 结果: {result}")
} elif phase == "error" {
print(f"方法 {method_name} 执行出错: {result}")
}
}
proxy_calc = reflector.create_proxy(calc, method_interceptor)
result = proxy_calc.add(5, 3)
print(f"代理调用结果: {result}")
6.2 插件系统与扩展机制
插件架构设计
import importlib
import os
import sys
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Type
import json
class Plugin(ABC) {
"""插件基类"""
@property
@abstractmethod
func name() -> str {
"""插件名称"""
pass
}
@property
@abstractmethod
func version() -> str {
"""插件版本"""
pass
}
@property
@abstractmethod
func description() -> str {
"""插件描述"""
pass
}
@abstractmethod
func initialize(config: Dict[str, Any]) -> bool {
"""初始化插件"""
pass
}
@abstractmethod
func execute(context: Dict[str, Any]) -> Any {
"""执行插件功能"""
pass
}
@abstractmethod
func cleanup() {
"""清理资源"""
pass
}
func get_dependencies() -> List[str] {
"""获取依赖的插件列表"""
return []
}
func is_compatible(version: str) -> bool {
"""检查版本兼容性"""
return true
}
}
class PluginManager {
func __init__(plugin_dir: str = "plugins") {
self.plugin_dir = plugin_dir
self.plugins: Dict[str, Plugin] = {}
self.plugin_configs: Dict[str, Dict] = {}
self.load_order: List[str] = []
self.hooks: Dict[str, List[Callable]] = {}
# 确保插件目录存在
os.makedirs(plugin_dir, exist_ok=true)
}
func discover_plugins() -> List[str] {
"""发现可用的插件"""
discovered = []
for item in os.listdir(self.plugin_dir) {
plugin_path = os.path.join(self.plugin_dir, item)
# 检查是否是Python包
if os.path.isdir(plugin_path) {
init_file = os.path.join(plugin_path, "__init__.py")
if os.path.exists(init_file) {
discovered.append(item)
}
# 检查是否是Python模块
elif item.endswith(".py") and not item.startswith("_") {
discovered.append(item[:-3]) # 移除.py扩展名
}
}
return discovered
}
func load_plugin(plugin_name: str) -> bool {
"""加载单个插件"""
try {
# 添加插件目录到Python路径
if self.plugin_dir not in sys.path {
sys.path.insert(0, self.plugin_dir)
}
# 动态导入插件模块
module = importlib.import_module(plugin_name)
# 查找插件类
plugin_class = null
for attr_name in dir(module) {
attr = getattr(module, attr_name)
if (isinstance(attr, type) and
issubclass(attr, Plugin) and
attr != Plugin) {
plugin_class = attr
break
}
}
if not plugin_class {
print(f"在模块 '{plugin_name}' 中未找到插件类")
return false
}
# 实例化插件
plugin_instance = plugin_class()
# 检查依赖
dependencies = plugin_instance.get_dependencies()
for dep in dependencies {
if dep not in self.plugins {
print(f"插件 '{plugin_name}' 依赖的插件 '{dep}' 未加载")
return false
}
}
# 加载配置
config = self.load_plugin_config(plugin_name)
# 初始化插件
if plugin_instance.initialize(config) {
self.plugins[plugin_name] = plugin_instance
self.load_order.append(plugin_name)
print(f"插件 '{plugin_name}' 加载成功")
return true
} else {
print(f"插件 '{plugin_name}' 初始化失败")
return false
}
} catch Exception as e {
print(f"加载插件 '{plugin_name}' 时出错: {e}")
return false
}
}
func load_plugin_config(plugin_name: str) -> Dict[str, Any] {
"""加载插件配置"""
config_file = os.path.join(self.plugin_dir, f"{plugin_name}_config.json")
if os.path.exists(config_file) {
try {
with open(config_file, "r", encoding="utf-8") as f {
config = json.load(f)
self.plugin_configs[plugin_name] = config
return config
}
} catch Exception as e {
print(f"加载插件 '{plugin_name}' 配置失败: {e}")
}
}
return {}
}
func save_plugin_config(plugin_name: str, config: Dict[str, Any]) {
"""保存插件配置"""
config_file = os.path.join(self.plugin_dir, f"{plugin_name}_config.json")
try {
with open(config_file, "w", encoding="utf-8") as f {
json.dump(config, f, indent=2, ensure_ascii=false)
}
self.plugin_configs[plugin_name] = config
} catch Exception as e {
print(f"保存插件 '{plugin_name}' 配置失败: {e}")
}
}
func unload_plugin(plugin_name: str) -> bool {
"""卸载插件"""
if plugin_name not in self.plugins {
print(f"插件 '{plugin_name}' 未加载")
return false
}
try {
# 检查是否有其他插件依赖此插件
dependents = []
for name, plugin in self.plugins.items() {
if plugin_name in plugin.get_dependencies() {
dependents.append(name)
}
}
if dependents {
print(f"无法卸载插件 '{plugin_name}',以下插件依赖它: {dependents}")
return false
}
# 清理插件资源
self.plugins[plugin_name].cleanup()
# 移除插件
del self.plugins[plugin_name]
self.load_order.remove(plugin_name)
print(f"插件 '{plugin_name}' 卸载成功")
return true
} catch Exception as e {
print(f"卸载插件 '{plugin_name}' 时出错: {e}")
return false
}
}
func execute_plugin(plugin_name: str, context: Dict[str, Any]) -> Any {
"""执行插件"""
if plugin_name not in self.plugins {
raise ValueError(f"插件 '{plugin_name}' 未加载")
}
return self.plugins[plugin_name].execute(context)
}
func register_hook(hook_name: str, callback: Callable) {
"""注册钩子函数"""
if hook_name not in self.hooks {
self.hooks[hook_name] = []
}
self.hooks[hook_name].append(callback)
}
func trigger_hook(hook_name: str, *args, **kwargs) -> List[Any] {
"""触发钩子"""
results = []
if hook_name in self.hooks {
for callback in self.hooks[hook_name] {
try {
result = callback(*args, **kwargs)
results.append(result)
} catch Exception as e {
print(f"钩子 '{hook_name}' 执行失败: {e}")
}
}
}
return results
}
func get_plugin_info(plugin_name: str) -> Dict[str, Any] {
"""获取插件信息"""
if plugin_name not in self.plugins {
return null
}
plugin = self.plugins[plugin_name]
return {
"name": plugin.name,
"version": plugin.version,
"description": plugin.description,
"dependencies": plugin.get_dependencies(),
"config": self.plugin_configs.get(plugin_name, {})
}
}
func list_plugins() -> List[Dict[str, Any]] {
"""列出所有已加载的插件"""
return [self.get_plugin_info(name) for name in self.load_order]
}
func reload_plugin(plugin_name: str) -> bool {
"""重新加载插件"""
if plugin_name in self.plugins {
if not self.unload_plugin(plugin_name) {
return false
}
}
# 重新导入模块
if plugin_name in sys.modules {
importlib.reload(sys.modules[plugin_name])
}
return self.load_plugin(plugin_name)
}
}
# 示例插件实现
class TextProcessorPlugin(Plugin) {
@property
func name() -> str {
return "text_processor"
}
@property
func version() -> str {
return "1.0.0"
}
@property
func description() -> str {
return "文本处理插件,提供文本清理和格式化功能"
}
func initialize(config: Dict[str, Any]) -> bool {
self.config = config
self.enabled_features = config.get("features", ["clean", "format"])
print(f"文本处理插件初始化完成,启用功能: {self.enabled_features}")
return true
}
func execute(context: Dict[str, Any]) -> Any {
text = context.get("text", "")
operation = context.get("operation", "clean")
if operation == "clean" and "clean" in self.enabled_features {
# 清理文本
cleaned = text.strip().replace("\n\n", "\n")
return {"result": cleaned, "operation": "clean"}
} elif operation == "format" and "format" in self.enabled_features {
# 格式化文本
formatted = text.title()
return {"result": formatted, "operation": "format"}
} else {
return {"error": f"不支持的操作: {operation}"}
}
}
func cleanup() {
print("文本处理插件清理完成")
}
}
# 使用插件管理器
plugin_manager = PluginManager()
# 手动注册插件(在实际应用中,插件通常放在单独的文件中)
sys.modules["text_processor"] = types.ModuleType("text_processor")
sys.modules["text_processor"].TextProcessorPlugin = TextProcessorPlugin
# 加载插件
if plugin_manager.load_plugin("text_processor") {
# 执行插件
result = plugin_manager.execute_plugin("text_processor", {
"text": " hello world \n\n this is a test ",
"operation": "clean"
})
print(f"插件执行结果: {result}")
# 获取插件信息
info = plugin_manager.get_plugin_info("text_processor")
print(f"插件信息: {info}")
}
# 列出所有插件
print("已加载的插件:")
for plugin_info in plugin_manager.list_plugins() {
print(f" {plugin_info['name']} v{plugin_info['version']}: {plugin_info['description']}")
}
6.3 性能优化与调优
性能分析工具
import time
import functools
import cProfile
import pstats
import io
from typing import Any, Callable, Dict, List
from dataclasses import dataclass
from collections import defaultdict
import threading
import psutil
import gc
@dataclass
class PerformanceMetrics {
execution_time: float
memory_usage: float
cpu_usage: float
function_calls: int
cache_hits: int = 0
cache_misses: int = 0
}
class PerformanceProfiler {
func __init__() {
self.metrics: Dict[str, List[PerformanceMetrics]] = defaultdict(list)
self.active_profiles: Dict[str, Dict] = {}
self.lock = threading.Lock()
}
func profile_function(func_name: str = null) {
"""函数性能分析装饰器"""
func decorator(func: Callable) -> Callable {
name = func_name or f"{func.__module__}.{func.__name__}"
@functools.wraps(func)
func wrapper(*args, **kwargs) {
# 开始性能监控
start_time = time.time()
start_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
start_cpu = psutil.Process().cpu_percent()
# 启用详细分析
profiler = cProfile.Profile()
profiler.enable()
try {
result = func(*args, **kwargs)
# 结束性能监控
end_time = time.time()
end_memory = psutil.Process().memory_info().rss / 1024 / 1024
end_cpu = psutil.Process().cpu_percent()
profiler.disable()
# 分析函数调用统计
stats_stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stats_stream)
stats.sort_stats('cumulative')
# 计算指标
metrics = PerformanceMetrics(
execution_time=end_time - start_time,
memory_usage=end_memory - start_memory,
cpu_usage=max(0, end_cpu - start_cpu),
function_calls=stats.total_calls
)
# 记录指标
with self.lock {
self.metrics[name].append(metrics)
}
return result
} except Exception as e {
profiler.disable()
raise
}
}
return wrapper
}
return decorator
}
func start_profiling(session_name: str) {
"""开始性能分析会话"""
with self.lock {
self.active_profiles[session_name] = {
"start_time": time.time(),
"profiler": cProfile.Profile(),
"initial_memory": psutil.Process().memory_info().rss / 1024 / 1024
}
self.active_profiles[session_name]["profiler"].enable()
}
func stop_profiling(session_name: str) -> Dict[str, Any] {
"""停止性能分析会话"""
with self.lock {
if session_name not in self.active_profiles {
raise ValueError(f"分析会话 '{session_name}' 不存在")
}
session = self.active_profiles[session_name]
session["profiler"].disable()
end_time = time.time()
end_memory = psutil.Process().memory_info().rss / 1024 / 1024
# 生成统计报告
stats_stream = io.StringIO()
stats = pstats.Stats(session["profiler"], stream=stats_stream)
stats.sort_stats('cumulative')
stats.print_stats(20) # 显示前20个函数
report = {
"session_name": session_name,
"duration": end_time - session["start_time"],
"memory_delta": end_memory - session["initial_memory"],
"total_calls": stats.total_calls,
"stats_report": stats_stream.getvalue()
}
del self.active_profiles[session_name]
return report
}
func get_function_stats(func_name: str) -> Dict[str, Any] {
"""获取函数性能统计"""
if func_name not in self.metrics {
return null
}
metrics_list = self.metrics[func_name]
if not metrics_list {
return null
}
# 计算统计信息
execution_times = [m.execution_time for m in metrics_list]
memory_usages = [m.memory_usage for m in metrics_list]
cpu_usages = [m.cpu_usage for m in metrics_list]
return {
"function_name": func_name,
"call_count": len(metrics_list),
"avg_execution_time": sum(execution_times) / len(execution_times),
"max_execution_time": max(execution_times),
"min_execution_time": min(execution_times),
"avg_memory_usage": sum(memory_usages) / len(memory_usages),
"max_memory_usage": max(memory_usages),
"avg_cpu_usage": sum(cpu_usages) / len(cpu_usages),
"total_function_calls": sum(m.function_calls for m in metrics_list)
}
}
func generate_report() -> str {
"""生成性能报告"""
report = ["=== 性能分析报告 ==="]
for func_name in self.metrics.keys() {
stats = self.get_function_stats(func_name)
if stats {
report.append(f"\n函数: {func_name}")
report.append(f" 调用次数: {stats['call_count']}")
report.append(f" 平均执行时间: {stats['avg_execution_time']:.4f}s")
report.append(f" 最大执行时间: {stats['max_execution_time']:.4f}s")
report.append(f" 平均内存使用: {stats['avg_memory_usage']:.2f}MB")
report.append(f" 最大内存使用: {stats['max_memory_usage']:.2f}MB")
report.append(f" 平均CPU使用: {stats['avg_cpu_usage']:.2f}%")
}
}
return "\n".join(report)
}
func clear_metrics() {
"""清除所有性能指标"""
with self.lock {
self.metrics.clear()
}
}
# 缓存优化
class SmartCache {
func __init__(max_size: int = 1000, ttl: int = 3600) {
self.max_size = max_size
self.ttl = ttl
self.cache: Dict[str, Dict] = {}
self.access_times: Dict[str, float] = {}
self.hit_count = 0
self.miss_count = 0
self.lock = threading.RLock()
}
func _is_expired(key: str) -> bool {
"""检查缓存项是否过期"""
if key not in self.cache {
return true
}
return time.time() - self.cache[key]["timestamp"] > self.ttl
}
func _evict_expired() {
"""清除过期的缓存项"""
current_time = time.time()
expired_keys = [
key for key, data in self.cache.items()
if current_time - data["timestamp"] > self.ttl
]
for key in expired_keys {
del self.cache[key]
if key in self.access_times {
del self.access_times[key]
}
}
}
func _evict_lru() {
"""使用LRU策略清除缓存项"""
if len(self.cache) <= self.max_size {
return
}
# 找到最少使用的键
lru_key = min(self.access_times.keys(), key=lambda k: self.access_times[k])
del self.cache[lru_key]
del self.access_times[lru_key]
}
func get(key: str) -> Any {
"""获取缓存值"""
with self.lock {
if key in self.cache and not self._is_expired(key) {
self.access_times[key] = time.time()
self.hit_count += 1
return self.cache[key]["value"]
} else {
self.miss_count += 1
return null
}
}
}
func set(key: str, value: Any) {
"""设置缓存值"""
with self.lock {
# 清除过期项
self._evict_expired()
# 如果达到最大容量,使用LRU清除
if len(self.cache) >= self.max_size {
self._evict_lru()
}
self.cache[key] = {
"value": value,
"timestamp": time.time()
}
self.access_times[key] = time.time()
}
}
func delete(key: str) -> bool {
"""删除缓存项"""
with self.lock {
if key in self.cache {
del self.cache[key]
if key in self.access_times {
del self.access_times[key]
}
return true
}
return false
}
}
func clear() {
"""清空缓存"""
with self.lock {
self.cache.clear()
self.access_times.clear()
self.hit_count = 0
self.miss_count = 0
}
}
func get_stats() -> Dict[str, Any] {
"""获取缓存统计信息"""
with self.lock {
total_requests = self.hit_count + self.miss_count
hit_rate = self.hit_count / total_requests if total_requests > 0 else 0
return {
"size": len(self.cache),
"max_size": self.max_size,
"hit_count": self.hit_count,
"miss_count": self.miss_count,
"hit_rate": hit_rate,
"ttl": self.ttl
}
}
}
}
# 缓存装饰器
func cached(cache: SmartCache = null, key_func: Callable = null) {
"""缓存装饰器"""
if cache is null {
cache = SmartCache()
}
func decorator(func: Callable) -> Callable {
@functools.wraps(func)
func wrapper(*args, **kwargs) {
# 生成缓存键
if key_func {
cache_key = key_func(*args, **kwargs)
} else {
cache_key = f"{func.__name__}:{hash((args, tuple(sorted(kwargs.items()))))}"
}
# 尝试从缓存获取
cached_result = cache.get(cache_key)
if cached_result is not null {
return cached_result
}
# 执行函数并缓存结果
result = func(*args, **kwargs)
cache.set(cache_key, result)
return result
}
# 添加缓存管理方法
wrapper.cache = cache
wrapper.clear_cache = cache.clear
wrapper.cache_stats = cache.get_stats
return wrapper
}
return decorator
}
# 使用示例
profiler = PerformanceProfiler()
cache = SmartCache(max_size=100, ttl=300)
@profiler.profile_function("fibonacci_calculation")
@cached(cache)
func fibonacci(n: int) -> int {
"""计算斐波那契数列"""
if n <= 1 {
return n
}
return fibonacci(n - 1) + fibonacci(n - 2)
}
@profiler.profile_function("expensive_operation")
func expensive_operation(data: List[int]) -> float {
"""模拟耗时操作"""
time.sleep(0.1) # 模拟计算时间
return sum(x * x for x in data) / len(data)
}
# 性能测试
print("开始性能测试...")
# 测试斐波那契计算(带缓存)
for i in range(10, 15) {
result = fibonacci(i)
print(f"fibonacci({i}) = {result}")
}
# 测试耗时操作
test_data = list(range(1000))
for _ in range(5) {
result = expensive_operation(test_data)
}
# 查看性能统计
print("\n=== 性能统计 ===")
print(profiler.generate_report())
# 查看缓存统计
print("\n=== 缓存统计 ===")
stats = fibonacci.cache_stats()
for key, value in stats.items() {
print(f"{key}: {value}")
}
# 内存优化示例
class MemoryOptimizer {
@staticmethod
func optimize_memory() {
"""执行内存优化"""
# 强制垃圾回收
collected = gc.collect()
# 获取内存使用情况
process = psutil.Process()
memory_info = process.memory_info()
return {
"collected_objects": collected,
"memory_rss": memory_info.rss / 1024 / 1024, # MB
"memory_vms": memory_info.vms / 1024 / 1024, # MB
"memory_percent": process.memory_percent()
}
}
@staticmethod
func monitor_memory_usage(func: Callable) {
"""内存使用监控装饰器"""
@functools.wraps(func)
func wrapper(*args, **kwargs) {
# 记录初始内存
initial_memory = psutil.Process().memory_info().rss / 1024 / 1024
try {
result = func(*args, **kwargs)
return result
} finally {
# 记录最终内存
final_memory = psutil.Process().memory_info().rss / 1024 / 1024
memory_delta = final_memory - initial_memory
if memory_delta > 10 { # 如果内存增长超过10MB
print(f"警告: 函数 {func.__name__} 内存使用增长 {memory_delta:.2f}MB")
# 自动执行垃圾回收
collected = gc.collect()
if collected > 0 {
print(f"自动垃圾回收清理了 {collected} 个对象")
}
}
}
}
return wrapper
}
}
@MemoryOptimizer.monitor_memory_usage
func memory_intensive_function() {
"""内存密集型函数示例"""
# 创建大量对象
large_list = [i * i for i in range(100000)]
large_dict = {i: str(i) * 100 for i in range(10000)}
# 模拟一些处理
result = sum(large_list) + len(large_dict)
return result
}
# 测试内存监控
print("\n=== 内存优化测试 ===")
result = memory_intensive_function()
print(f"函数执行结果: {result}")
# 手动内存优化
optimization_result = MemoryOptimizer.optimize_memory()
print(f"内存优化结果: {optimization_result}")
6.4 并发编程与异步处理
异步编程模型
import asyncio
import aiohttp
import aiofiles
import concurrent.futures
import threading
import multiprocessing
from typing import List, Dict, Any, Callable, Awaitable
from dataclasses import dataclass
import time
import queue
import weakref
@dataclass
class TaskResult {
task_id: str
result: Any
error: Exception = null
execution_time: float = 0
status: str = "pending" # pending, running, completed, failed
}
class AsyncTaskManager {
func __init__(max_concurrent_tasks: int = 10) {
self.max_concurrent_tasks = max_concurrent_tasks
self.tasks: Dict[str, TaskResult] = {}
self.semaphore = asyncio.Semaphore(max_concurrent_tasks)
self.task_queue = asyncio.Queue()
self.workers_running = false
self.task_counter = 0
}
async func start_workers() {
"""启动工作协程"""
if self.workers_running {
return
}
self.workers_running = true
workers = []
for i in range(self.max_concurrent_tasks) {
worker = asyncio.create_task(self._worker(f"worker-{i}"))
workers.append(worker)
}
return workers
}
async func _worker(worker_name: str) {
"""工作协程"""
while self.workers_running {
try {
# 从队列获取任务
task_info = await asyncio.wait_for(
self.task_queue.get(),
timeout=1.0
)
task_id, coro, callback = task_info
# 更新任务状态
if task_id in self.tasks {
self.tasks[task_id].status = "running"
}
# 执行任务
start_time = time.time()
try {
async with self.semaphore {
result = await coro
}
execution_time = time.time() - start_time
# 更新任务结果
if task_id in self.tasks {
self.tasks[task_id].result = result
self.tasks[task_id].execution_time = execution_time
self.tasks[task_id].status = "completed"
}
# 调用回调函数
if callback {
try {
if asyncio.iscoroutinefunction(callback) {
await callback(task_id, result, null)
} else {
callback(task_id, result, null)
}
} catch Exception as e {
print(f"回调函数执行失败: {e}")
}
}
} catch Exception as e {
execution_time = time.time() - start_time
# 更新错误信息
if task_id in self.tasks {
self.tasks[task_id].error = e
self.tasks[task_id].execution_time = execution_time
self.tasks[task_id].status = "failed"
}
# 调用错误回调
if callback {
try {
if asyncio.iscoroutinefunction(callback) {
await callback(task_id, null, e)
} else {
callback(task_id, null, e)
}
} catch Exception as cb_error {
print(f"错误回调函数执行失败: {cb_error}")
}
}
# 标记任务完成
self.task_queue.task_done()
} catch asyncio.TimeoutError {
# 队列为空,继续等待
continue
} except Exception as e {
print(f"工作协程 {worker_name} 出错: {e}")
}
}
async func submit_task(
coro: Awaitable,
task_id: str = null,
callback: Callable = null
) -> str {
"""提交异步任务"""
if task_id is null {
self.task_counter += 1
task_id = f"task-{self.task_counter}"
}
# 创建任务结果对象
self.tasks[task_id] = TaskResult(task_id=task_id, result=null)
# 将任务添加到队列
await self.task_queue.put((task_id, coro, callback))
return task_id
}
async func wait_for_task(task_id: str, timeout: float = null) -> TaskResult {
"""等待任务完成"""
start_time = time.time()
while task_id in self.tasks {
task_result = self.tasks[task_id]
if task_result.status in ["completed", "failed"] {
return task_result
}
if timeout and (time.time() - start_time) > timeout {
raise asyncio.TimeoutError(f"任务 {task_id} 超时")
}
await asyncio.sleep(0.1)
}
raise ValueError(f"任务 {task_id} 不存在")
}
async func wait_for_all(task_ids: List[str], timeout: float = null) -> List[TaskResult] {
"""等待多个任务完成"""
results = []
for task_id in task_ids {
result = await self.wait_for_task(task_id, timeout)
results.append(result)
}
return results
}
func get_task_status(task_id: str) -> TaskResult {
"""获取任务状态"""
return self.tasks.get(task_id)
}
func list_tasks() -> List[TaskResult] {
"""列出所有任务"""
return list(self.tasks.values())
}
async func cancel_task(task_id: str) -> bool {
"""取消任务"""
if task_id in self.tasks {
task_result = self.tasks[task_id]
if task_result.status == "pending" {
task_result.status = "cancelled"
return true
}
}
return false
}
async func shutdown() {
"""关闭任务管理器"""
self.workers_running = false
# 等待队列中的任务完成
await self.task_queue.join()
print("异步任务管理器已关闭")
}
# HTTP客户端异步操作
class AsyncHTTPClient {
func __init__(timeout: int = 30, max_connections: int = 100) {
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.connector = aiohttp.TCPConnector(limit=max_connections)
self.session = null
}
async func __aenter__() {
self.session = aiohttp.ClientSession(
timeout=self.timeout,
connector=self.connector
)
return self
}
async func __aexit__(exc_type, exc_val, exc_tb) {
if self.session {
await self.session.close()
}
}
async func get(url: str, headers: Dict = null) -> Dict[str, Any] {
"""异步GET请求"""
async with self.session.get(url, headers=headers) as response {
return {
"status": response.status,
"headers": dict(response.headers),
"content": await response.text(),
"url": str(response.url)
}
}
}
async func post(url: str, data: Any = null, json: Dict = null, headers: Dict = null) -> Dict[str, Any] {
"""异步POST请求"""
async with self.session.post(url, data=data, json=json, headers=headers) as response {
return {
"status": response.status,
"headers": dict(response.headers),
"content": await response.text(),
"url": str(response.url)
}
}
}
async func download_file(url: str, file_path: str, chunk_size: int = 8192) -> Dict[str, Any] {
"""异步下载文件"""
start_time = time.time()
total_size = 0
async with self.session.get(url) as response {
if response.status == 200 {
async with aiofiles.open(file_path, "wb") as file {
async for chunk in response.content.iter_chunked(chunk_size) {
await file.write(chunk)
total_size += len(chunk)
}
}
download_time = time.time() - start_time
return {
"success": true,
"file_path": file_path,
"size": total_size,
"download_time": download_time,
"speed": total_size / download_time if download_time > 0 else 0
}
} else {
return {
"success": false,
"status": response.status,
"error": f"HTTP {response.status}"
}
}
}
}
async func batch_requests(urls: List[str], max_concurrent: int = 10) -> List[Dict[str, Any]] {
"""批量异步请求"""
semaphore = asyncio.Semaphore(max_concurrent)
async func fetch_with_semaphore(url: str) {
async with semaphore {
try {
return await self.get(url)
} catch Exception as e {
return {
"url": url,
"error": str(e),
"status": -1
}
}
}
}
tasks = [fetch_with_semaphore(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=true)
return results
}
# 异步文件操作
class AsyncFileProcessor {
@staticmethod
async func read_file(file_path: str, encoding: str = "utf-8") -> str {
"""异步读取文件"""
async with aiofiles.open(file_path, "r", encoding=encoding) as file {
return await file.read()
}
}
@staticmethod
async func write_file(file_path: str, content: str, encoding: str = "utf-8") {
"""异步写入文件"""
async with aiofiles.open(file_path, "w", encoding=encoding) as file {
await file.write(content)
}
}
@staticmethod
async func process_files_batch(
file_paths: List[str],
processor: Callable[[str], Awaitable[str]],
max_concurrent: int = 5
) -> List[str] {
"""批量处理文件"""
semaphore = asyncio.Semaphore(max_concurrent)
async func process_with_semaphore(file_path: str) {
async with semaphore {
content = await AsyncFileProcessor.read_file(file_path)
processed_content = await processor(content)
return processed_content
}
}
tasks = [process_with_semaphore(path) for path in file_paths]
results = await asyncio.gather(*tasks)
return results
}
# 使用示例
async func example_async_operations() {
"""异步操作示例"""
# 创建任务管理器
task_manager = AsyncTaskManager(max_concurrent_tasks=5)
workers = await task_manager.start_workers()
# 定义一些异步任务
async func slow_calculation(n: int) -> int {
await asyncio.sleep(1) # 模拟耗时计算
return n * n
}
async func fetch_data(url: str) -> str {
await asyncio.sleep(0.5) # 模拟网络请求
return f"Data from {url}"
}
# 提交任务
task_ids = []
for i in range(10) {
task_id = await task_manager.submit_task(
slow_calculation(i),
callback=lambda tid, result, error: print(f"任务 {tid} 完成: {result}")
)
task_ids.append(task_id)
}
# 等待所有任务完成
results = await task_manager.wait_for_all(task_ids, timeout=10)
print("所有计算任务完成:")
for result in results {
print(f" 任务 {result.task_id}: {result.result} (耗时: {result.execution_time:.2f}s)")
}
# HTTP客户端示例
async with AsyncHTTPClient() as client {
# 批量请求
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/json"
]
print("\n开始批量HTTP请求...")
http_results = await client.batch_requests(urls, max_concurrent=3)
for i, result in enumerate(http_results) {
if "error" in result {
print(f"请求 {i+1} 失败: {result['error']}")
} else {
print(f"请求 {i+1} 成功: HTTP {result['status']}")
}
}
# 关闭任务管理器
await task_manager.shutdown()
# 运行异步示例
if __name__ == "__main__" {
print("开始异步操作示例...")
asyncio.run(example_async_operations())
print("异步操作示例完成")
}
6.5 部署与运维
应用打包与分发
import os
import shutil
import zipfile
import tarfile
import json
import subprocess
from pathlib import Path
from typing import List, Dict, Any
from dataclasses import dataclass
import hashlib
import datetime
@dataclass
class PackageInfo {
name: str
version: str
description: str
author: str
dependencies: List[str]
entry_point: str
build_date: str
checksum: str
}
class ApplicationPackager {
func __init__(project_root: str) {
self.project_root = Path(project_root)
self.build_dir = self.project_root / "build"
self.dist_dir = self.project_root / "dist"
self.package_info = null
}
func load_package_config(config_file: str = "package.json") -> PackageInfo {
"""加载包配置信息"""
config_path = self.project_root / config_file
if not config_path.exists() {
raise FileNotFoundError(f"配置文件 {config_file} 不存在")
}
with open(config_path, "r", encoding="utf-8") as f {
config = json.load(f)
}
self.package_info = PackageInfo(
name=config.get("name", "unknown"),
version=config.get("version", "1.0.0"),
description=config.get("description", ""),
author=config.get("author", ""),
dependencies=config.get("dependencies", []),
entry_point=config.get("entry_point", "main.ais"),
build_date=datetime.datetime.now().isoformat(),
checksum=""
)
return self.package_info
}
func prepare_build_environment() {
"""准备构建环境"""
# 清理旧的构建目录
if self.build_dir.exists() {
shutil.rmtree(self.build_dir)
}
if self.dist_dir.exists() {
shutil.rmtree(self.dist_dir)
}
# 创建构建目录
self.build_dir.mkdir(parents=true, exist_ok=true)
self.dist_dir.mkdir(parents=true, exist_ok=true)
print(f"构建环境已准备: {self.build_dir}")
}
func copy_source_files(exclude_patterns: List[str] = null) {
"""复制源文件到构建目录"""
if exclude_patterns is null {
exclude_patterns = [
"__pycache__", "*.pyc", ".git", ".gitignore",
"build", "dist", "*.log", "test_*", "tests"
]
}
func should_exclude(path: Path) -> bool {
for pattern in exclude_patterns {
if pattern in str(path) or path.match(pattern) {
return true
}
}
return false
}
# 复制源文件
for item in self.project_root.iterdir() {
if item.name in ["build", "dist"] {
continue
}
if should_exclude(item) {
continue
}
dest_path = self.build_dir / item.name
if item.is_dir() {
shutil.copytree(item, dest_path, ignore=shutil.ignore_patterns(*exclude_patterns))
} else {
shutil.copy2(item, dest_path)
}
print(f"源文件已复制到构建目录")
}
func install_dependencies() {
"""安装依赖包"""
if not self.package_info or not self.package_info.dependencies {
print("没有需要安装的依赖")
return
}
# 创建依赖目录
deps_dir = self.build_dir / "dependencies"
deps_dir.mkdir(exist_ok=true)
for dep in self.package_info.dependencies {
print(f"安装依赖: {dep}")
# 这里可以实现具体的依赖安装逻辑
# 例如从包管理器下载或复制本地包
try {
# 模拟依赖安装
result = subprocess.run(
["aiscript", "install", dep],
cwd=deps_dir,
capture_output=true,
text=true
)
if result.returncode == 0 {
print(f" {dep} 安装成功")
} else {
print(f" {dep} 安装失败: {result.stderr}")
}
} catch Exception as e {
print(f" {dep} 安装出错: {e}")
}
}
}
func optimize_code() {
"""代码优化"""
print("开始代码优化...")
# 遍历所有 .ais 文件进行优化
for ais_file in self.build_dir.rglob("*.ais") {
try {
with open(ais_file, "r", encoding="utf-8") as f {
content = f.read()
}
# 简单的代码优化:移除注释和空行
optimized_lines = []
for line in content.split("\n") {
stripped = line.strip()
if stripped and not stripped.startswith("#") {
optimized_lines.append(line)
}
optimized_content = "\n".join(optimized_lines)
with open(ais_file, "w", encoding="utf-8") as f {
f.write(optimized_content)
}
print(f" 优化文件: {ais_file.relative_to(self.build_dir)}")
} catch Exception as e {
print(f" 优化文件失败 {ais_file}: {e}")
}
}
print("代码优化完成")
}
func calculate_checksum() -> str {
"""计算包的校验和"""
hasher = hashlib.sha256()
# 遍历所有文件计算校验和
for file_path in sorted(self.build_dir.rglob("*")) {
if file_path.is_file() {
with open(file_path, "rb") as f {
for chunk in iter(lambda: f.read(4096), b"") {
hasher.update(chunk)
}
}
}
}
return hasher.hexdigest()
}
func create_package_manifest() {
"""创建包清单文件"""
if not self.package_info {
raise ValueError("包信息未加载")
}
# 更新校验和
self.package_info.checksum = self.calculate_checksum()
# 创建文件列表
file_list = []
for file_path in self.build_dir.rglob("*") {
if file_path.is_file() {
rel_path = file_path.relative_to(self.build_dir)
file_size = file_path.stat().st_size
file_list.append({
"path": str(rel_path),
"size": file_size
})
}
}
# 创建清单
manifest = {
"package": {
"name": self.package_info.name,
"version": self.package_info.version,
"description": self.package_info.description,
"author": self.package_info.author,
"entry_point": self.package_info.entry_point,
"build_date": self.package_info.build_date,
"checksum": self.package_info.checksum
},
"dependencies": self.package_info.dependencies,
"files": file_list
}
# 写入清单文件
manifest_path = self.build_dir / "MANIFEST.json"
with open(manifest_path, "w", encoding="utf-8") as f {
json.dump(manifest, f, indent=2, ensure_ascii=false)
}
print(f"包清单已创建: {manifest_path}")
}
func create_archive(format: str = "zip") -> str {
"""创建归档文件"""
if not self.package_info {
raise ValueError("包信息未加载")
}
archive_name = f"{self.package_info.name}-{self.package_info.version}"
if format == "zip" {
archive_path = self.dist_dir / f"{archive_name}.zip"
with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zipf {
for file_path in self.build_dir.rglob("*") {
if file_path.is_file() {
arc_name = file_path.relative_to(self.build_dir)
zipf.write(file_path, arc_name)
}
}
}
} elif format == "tar.gz" {
archive_path = self.dist_dir / f"{archive_name}.tar.gz"
with tarfile.open(archive_path, "w:gz") as tarf {
tarf.add(self.build_dir, arcname=archive_name)
}
} else {
raise ValueError(f"不支持的归档格式: {format}")
}
print(f"归档文件已创建: {archive_path}")
return str(archive_path)
}
func build_package(format: str = "zip", optimize: bool = true) -> str {
"""构建完整的包"""
print(f"开始构建包: {self.package_info.name if self.package_info else 'unknown'}")
# 1. 准备构建环境
self.prepare_build_environment()
# 2. 复制源文件
self.copy_source_files()
# 3. 安装依赖
self.install_dependencies()
# 4. 代码优化(可选)
if optimize {
self.optimize_code()
}
# 5. 创建包清单
self.create_package_manifest()
# 6. 创建归档文件
archive_path = self.create_archive(format)
print(f"包构建完成: {archive_path}")
return archive_path
}
# 部署管理器
class DeploymentManager {
func __init__(config_file: str = "deploy.json") {
self.config_file = config_file
self.config = self.load_config()
}
func load_config() -> Dict[str, Any] {
"""加载部署配置"""
if not os.path.exists(self.config_file) {
# 创建默认配置
default_config = {
"environments": {
"development": {
"host": "localhost",
"port": 8000,
"debug": true,
"log_level": "DEBUG"
},
"staging": {
"host": "staging.example.com",
"port": 80,
"debug": false,
"log_level": "INFO"
},
"production": {
"host": "production.example.com",
"port": 80,
"debug": false,
"log_level": "WARNING"
}
},
"deployment": {
"strategy": "rolling",
"max_parallel": 2,
"health_check_url": "/health",
"rollback_on_failure": true
}
}
with open(self.config_file, "w", encoding="utf-8") as f {
json.dump(default_config, f, indent=2)
}
return default_config
}
with open(self.config_file, "r", encoding="utf-8") as f {
return json.load(f)
}
}
func deploy_to_environment(package_path: str, environment: str) {
"""部署到指定环境"""
if environment not in self.config["environments"] {
raise ValueError(f"未知的环境: {environment}")
}
env_config = self.config["environments"][environment]
print(f"开始部署到 {environment} 环境...")
print(f"目标主机: {env_config['host']}:{env_config['port']}")
# 这里实现具体的部署逻辑
# 例如:上传文件、解压、启动服务等
try {
# 模拟部署过程
print("1. 上传包文件...")
# upload_package(package_path, env_config)
print("2. 解压包文件...")
# extract_package(env_config)
print("3. 安装依赖...")
# install_dependencies(env_config)
print("4. 启动服务...")
# start_service(env_config)
print("5. 健康检查...")
# health_check(env_config)
print(f"部署到 {environment} 环境成功!")
} catch Exception as e {
print(f"部署失败: {e}")
if self.config["deployment"]["rollback_on_failure"] {
print("开始回滚...")
# rollback(env_config)
}
raise
}
}
func list_environments() -> List[str] {
"""列出所有环境"""
return list(self.config["environments"].keys())
}
func get_environment_config(environment: str) -> Dict[str, Any] {
"""获取环境配置"""
return self.config["environments"].get(environment)
}
# 使用示例
if __name__ == "__main__" {
# 创建包配置文件
package_config = {
"name": "my-aiscript-app",
"version": "1.0.0",
"description": "My AI Script Application",
"author": "Developer",
"entry_point": "app.ais",
"dependencies": [
"aiscript-http",
"aiscript-db",
"aiscript-ml"
]
}
with open("package.json", "w", encoding="utf-8") as f {
json.dump(package_config, f, indent=2)
}
# 打包应用
packager = ApplicationPackager(".")
packager.load_package_config()
archive_path = packager.build_package(format="zip", optimize=true)
# 部署应用
deployment_manager = DeploymentManager()
print("\n可用的部署环境:")
for env in deployment_manager.list_environments() {
config = deployment_manager.get_environment_config(env)
print(f" {env}: {config['host']}:{config['port']}")
}
# 部署到开发环境
deployment_manager.deploy_to_environment(archive_path, "development")
}
6.6 章节总结
核心要点
元编程与反射
- 动态代码生成和执行
- 反射机制用于运行时检查和修改
- 代码模板和生成器的实现
插件系统
- 可扩展的插件架构设计
- 插件生命周期管理
- 插件间通信和依赖管理
性能优化
- 性能分析和监控工具
- 智能缓存策略
- 内存优化和垃圾回收
并发编程
- 异步编程模型
- 任务管理和调度
- 并发控制和资源管理
部署运维
- 应用打包和分发
- 多环境部署管理
- 自动化部署流程
最佳实践
代码组织
- 模块化设计,职责分离
- 使用抽象基类定义接口
- 合理使用设计模式
性能优化
- 定期进行性能分析
- 合理使用缓存机制
- 避免内存泄漏
并发安全
- 正确使用锁机制
- 避免死锁和竞态条件
- 合理控制并发度
部署管理
- 环境配置标准化
- 自动化部署流程
- 完善的回滚机制
练习题
元编程实践
- 实现一个代码生成器,根据配置文件生成 CRUD 操作代码
- 使用反射机制实现一个简单的 ORM 框架
插件开发
- 设计并实现一个日志插件系统
- 创建多个日志输出插件(文件、数据库、网络等)
性能优化
- 对一个计算密集型函数进行性能分析和优化
- 实现一个多级缓存系统
异步编程
- 使用异步编程实现一个网络爬虫
- 创建一个异步任务队列系统
部署实践
- 为你的 AI Script 应用创建完整的打包和部署流程
- 实现蓝绿部署或滚动部署策略
通过本章的学习,你应该掌握了 AI Script 的高级特性和扩展开发技巧,能够构建高性能、可扩展的 AI Script 应用程序。