7.1 Production Deployment
Environment Preparation
import os
import sys
import subprocess
import json
import yaml
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import logging
import time
import psutil
@dataclass
class EnvironmentConfig {
name: str
host: str
port: int
workers: int
memory_limit: str
cpu_limit: str
log_level: str
debug: bool
ssl_enabled: bool
ssl_cert_path: Optional[str] = null
ssl_key_path: Optional[str] = null
environment_variables: Dict[str, str] = null
}
class EnvironmentManager {
func __init__(config_path: str = "environments.yaml") {
self.config_path = config_path
self.environments = self.load_environments()
self.logger = self.setup_logging()
}
func setup_logging() -> logging.Logger {
"""设置日志记录"""
logger = logging.getLogger("EnvironmentManager")
logger.setLevel(logging.INFO)
if not logger.handlers {
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
}
return logger
}
func load_environments() -> Dict[str, EnvironmentConfig] {
"""加载环境配置"""
if not os.path.exists(self.config_path) {
self.create_default_config()
}
with open(self.config_path, "r", encoding="utf-8") as f {
config_data = yaml.safe_load(f)
}
environments = {}
for env_name, env_data in config_data.get("environments", {}).items() {
environments[env_name] = EnvironmentConfig(
name=env_name,
host=env_data.get("host", "localhost"),
port=env_data.get("port", 8000),
workers=env_data.get("workers", 1),
memory_limit=env_data.get("memory_limit", "512M"),
cpu_limit=env_data.get("cpu_limit", "1.0"),
log_level=env_data.get("log_level", "INFO"),
debug=env_data.get("debug", false),
ssl_enabled=env_data.get("ssl_enabled", false),
ssl_cert_path=env_data.get("ssl_cert_path"),
ssl_key_path=env_data.get("ssl_key_path"),
environment_variables=env_data.get("environment_variables", {})
)
}
return environments
}
func create_default_config() {
"""创建默认环境配置"""
default_config = {
"environments": {
"development": {
"host": "localhost",
"port": 8000,
"workers": 1,
"memory_limit": "256M",
"cpu_limit": "0.5",
"log_level": "DEBUG",
"debug": true,
"ssl_enabled": false,
"environment_variables": {
"ENV": "development",
"DEBUG": "true"
}
},
"staging": {
"host": "0.0.0.0",
"port": 8080,
"workers": 2,
"memory_limit": "512M",
"cpu_limit": "1.0",
"log_level": "INFO",
"debug": false,
"ssl_enabled": true,
"ssl_cert_path": "/etc/ssl/certs/staging.crt",
"ssl_key_path": "/etc/ssl/private/staging.key",
"environment_variables": {
"ENV": "staging",
"DEBUG": "false"
}
},
"production": {
"host": "0.0.0.0",
"port": 443,
"workers": 4,
"memory_limit": "1G",
"cpu_limit": "2.0",
"log_level": "WARNING",
"debug": false,
"ssl_enabled": true,
"ssl_cert_path": "/etc/ssl/certs/production.crt",
"ssl_key_path": "/etc/ssl/private/production.key",
"environment_variables": {
"ENV": "production",
"DEBUG": "false"
}
}
}
}
with open(self.config_path, "w", encoding="utf-8") as f {
yaml.dump(default_config, f, default_flow_style=false, allow_unicode=true)
}
self.logger.info(f"默认环境配置已创建: {self.config_path}")
}
func get_environment(name: str) -> EnvironmentConfig {
"""获取环境配置"""
if name not in self.environments {
raise ValueError(f"环境 '{name}' 不存在")
}
return self.environments[name]
}
func list_environments() -> List[str] {
"""列出所有环境"""
return list(self.environments.keys())
}
func validate_environment(env_config: EnvironmentConfig) -> bool {
"""验证环境配置"""
errors = []
# 检查端口是否可用
if not self.is_port_available(env_config.host, env_config.port) {
errors.append(f"端口 {env_config.port} 在 {env_config.host} 上不可用")
}
# 检查SSL证书
if env_config.ssl_enabled {
if not env_config.ssl_cert_path or not os.path.exists(env_config.ssl_cert_path) {
errors.append(f"SSL证书文件不存在: {env_config.ssl_cert_path}")
}
if not env_config.ssl_key_path or not os.path.exists(env_config.ssl_key_path) {
errors.append(f"SSL私钥文件不存在: {env_config.ssl_key_path}")
}
}
# 检查系统资源
if not self.check_system_resources(env_config) {
errors.append("系统资源不足")
}
if errors {
for error in errors {
self.logger.error(error)
}
return false
}
return true
}
func is_port_available(host: str, port: int) -> bool {
"""检查端口是否可用"""
import socket
try {
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock {
sock.settimeout(1)
result = sock.connect_ex((host, port))
return result != 0 # 端口不可连接表示可用
}
} catch Exception {
return false
}
}
func check_system_resources(env_config: EnvironmentConfig) -> bool {
"""检查系统资源"""
# 检查内存
memory_info = psutil.virtual_memory()
required_memory = self.parse_memory_limit(env_config.memory_limit)
available_memory = memory_info.available
if required_memory > available_memory {
self.logger.warning(f"内存不足: 需要 {required_memory}, 可用 {available_memory}")
return false
}
# 检查CPU
cpu_count = psutil.cpu_count()
required_cpu = float(env_config.cpu_limit)
if required_cpu > cpu_count {
self.logger.warning(f"CPU不足: 需要 {required_cpu}, 可用 {cpu_count}")
return false
}
return true
}
func parse_memory_limit(memory_limit: str) -> int {
"""解析内存限制字符串"""
memory_limit = memory_limit.upper()
if memory_limit.endswith('K') {
return int(memory_limit[:-1]) * 1024
} elif memory_limit.endswith('M') {
return int(memory_limit[:-1]) * 1024 * 1024
} elif memory_limit.endswith('G') {
return int(memory_limit[:-1]) * 1024 * 1024 * 1024
} else {
return int(memory_limit)
}
}
}
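Before wiring EnvironmentManager into a server, it can be exercised on its own. A minimal sketch in the same style as the listing above (it assumes an environments.yaml containing a staging entry, such as the default file generated by create_default_config()):
manager = EnvironmentManager()
staging = manager.get_environment("staging")
if manager.validate_environment(staging) {
print(f"staging is deployable on {staging.host}:{staging.port}")
} else {
print("staging failed validation; see the log for details")
}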
# Application server
class ApplicationServer {
func __init__(app_path: str, env_manager: EnvironmentManager) {
self.app_path = app_path
self.env_manager = env_manager
self.logger = logging.getLogger("ApplicationServer")
self.processes = {}
}
func start(environment: str) -> bool {
"""启动应用服务器"""
env_config = self.env_manager.get_environment(environment)
# 验证环境
if not self.env_manager.validate_environment(env_config) {
self.logger.error(f"环境验证失败: {environment}")
return false
}
self.logger.info(f"启动应用服务器 - 环境: {environment}")
# 设置环境变量
env_vars = os.environ.copy()
if env_config.environment_variables {
env_vars.update(env_config.environment_variables)
}
# Build the launch command
cmd = [
"aiscript",
"run",
self.app_path,
"--host", env_config.host,
"--port", str(env_config.port),
"--workers", str(env_config.workers),
"--log-level", env_config.log_level
]
if env_config.debug {
cmd.append("--debug")
}
if env_config.ssl_enabled {
cmd.extend(["--ssl-cert", env_config.ssl_cert_path])
cmd.extend(["--ssl-key", env_config.ssl_key_path])
}
try {
# Start the process
process = subprocess.Popen(
cmd,
env=env_vars,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=true
)
self.processes[environment] = process
# Wait briefly, then check that the process started successfully
time.sleep(2)
if process.poll() is null {
self.logger.info(f"Application server started - PID: {process.pid}")
return true
} else {
stdout, stderr = process.communicate()
self.logger.error(f"Application server failed to start: {stderr}")
return false
}
} catch Exception as e {
self.logger.error(f"Error while starting the application server: {e}")
return false
}
}
func stop(environment: str) -> bool {
"""停止应用服务器"""
if environment not in self.processes {
self.logger.warning(f"环境 {environment} 的服务器未运行")
return false
}
process = self.processes[environment]
try {
# Graceful shutdown
process.terminate()
# Wait for the process to exit
try {
process.wait(timeout=10)
} catch subprocess.TimeoutExpired {
# Force kill
process.kill()
process.wait()
}
del self.processes[environment]
self.logger.info(f"Application server stopped - environment: {environment}")
return true
} catch Exception as e {
self.logger.error(f"Error while stopping the application server: {e}")
return false
}
}
func restart(environment: str) -> bool {
"""重启应用服务器"""
self.logger.info(f"重启应用服务器 - 环境: {environment}")
if environment in self.processes {
if not self.stop(environment) {
return false
}
}
return self.start(environment)
}
func get_status(environment: str) -> Dict[str, Any] {
"""获取服务器状态"""
if environment not in self.processes {
return {
"status": "stopped",
"pid": null,
"memory_usage": 0,
"cpu_usage": 0
}
}
process = self.processes[environment]
try {
# Check whether the process is still running
if process.poll() is not null {
del self.processes[environment]
return {
"status": "stopped",
"pid": null,
"memory_usage": 0,
"cpu_usage": 0
}
}
# Gather process information
ps_process = psutil.Process(process.pid)
return {
"status": "running",
"pid": process.pid,
"memory_usage": ps_process.memory_info().rss / 1024 / 1024, # MB
"cpu_usage": ps_process.cpu_percent(),
"create_time": ps_process.create_time(),
"num_threads": ps_process.num_threads()
}
} catch Exception as e {
self.logger.error(f"获取服务器状态时出错: {e}")
return {
"status": "error",
"error": str(e)
}
}
}
func list_running_servers() -> Dict[str, Dict[str, Any]] {
"""列出所有运行中的服务器"""
running_servers = {}
for environment in list(self.processes.keys()) {
status = self.get_status(environment)
if status["status"] == "running" {
running_servers[environment] = status
}
}
return running_servers
}
}
# Usage example
if __name__ == "__main__" {
# Create the environment manager
env_manager = EnvironmentManager()
# List all environments
print("Available environments:")
for env_name in env_manager.list_environments() {
env_config = env_manager.get_environment(env_name)
print(f" {env_name}: {env_config.host}:{env_config.port}")
}
# Create the application server
app_server = ApplicationServer("app.ais", env_manager)
# Start the development environment
if app_server.start("development") {
print("Development environment started")
# Get status
status = app_server.get_status("development")
print(f"Server status: {status}")
# Wait a while, then stop
time.sleep(5)
app_server.stop("development")
}
}
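For reference, create_default_config() writes an environments.yaml of roughly the following shape (abridged to the development and production entries; yaml.dump sorts keys alphabetically, so the real file orders them differently):
environments:
  development:
    host: localhost
    port: 8000
    workers: 1
    memory_limit: 256M
    cpu_limit: '0.5'
    log_level: DEBUG
    debug: true
    ssl_enabled: false
    environment_variables:
      ENV: development
      DEBUG: 'true'
  production:
    host: 0.0.0.0
    port: 443
    workers: 4
    memory_limit: 1G
    cpu_limit: '2.0'
    log_level: WARNING
    debug: false
    ssl_enabled: true
    ssl_cert_path: /etc/ssl/certs/production.crt
    ssl_key_path: /etc/ssl/private/production.key
    environment_variables:
      ENV: production
      DEBUG: 'false'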
7.2 Containerized Deployment
Docker Support
import docker
import json
import os
import tempfile
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import logging
import time
@dataclass
class DockerConfig {
image_name: str
tag: str
base_image: str
working_dir: str
exposed_ports: List[int]
environment_variables: Dict[str, str]
volumes: Dict[str, str]
command: Optional[str] = null
entrypoint: Optional[str] = null
}
class DockerManager {
func __init__() {
self.client = docker.from_env()
self.logger = logging.getLogger("DockerManager")
}
func create_dockerfile(config: DockerConfig, app_path: str) -> str {
"""创建 Dockerfile"""
dockerfile_content = f"""
# AI Script 应用 Dockerfile
FROM {config.base_image}
# 设置工作目录
WORKDIR {config.working_dir}
# 安装 AI Script 运行时
RUN apt-get update && apt-get install -y \
python3 \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# 安装 AI Script
RUN pip3 install aiscript
# 复制应用文件
COPY . {config.working_dir}/
# 安装应用依赖
RUN if [ -f requirements.txt ]; then pip3 install -r requirements.txt; fi
# 设置环境变量
"""
for key, value in config.environment_variables.items() {
dockerfile_content += f"ENV {key}={value}\n"
}
# Expose ports
for port in config.exposed_ports {
dockerfile_content += f"EXPOSE {port}\n"
}
# Set the entrypoint
if config.entrypoint {
dockerfile_content += f"ENTRYPOINT {config.entrypoint}\n"
}
# Set the default command
if config.command {
dockerfile_content += f"CMD {config.command}\n"
} else {
dockerfile_content += f"CMD [\"aiscript\", \"run\", \"{app_path}\"]\n"
}
return dockerfile_content
}
func build_image(config: DockerConfig, app_directory: str) -> bool {
"""构建 Docker 镜像"""
try {
# 创建 Dockerfile
dockerfile_content = self.create_dockerfile(config, "app.ais")
dockerfile_path = os.path.join(app_directory, "Dockerfile")
with open(dockerfile_path, "w", encoding="utf-8") as f {
f.write(dockerfile_content)
}
self.logger.info(f"开始构建镜像: {config.image_name}:{config.tag}")
# Build the image
image, build_logs = self.client.images.build(
path=app_directory,
tag=f"{config.image_name}:{config.tag}",
rm=true,
forcerm=true
)
# Emit the build log
for log in build_logs {
if 'stream' in log {
self.logger.info(log['stream'].strip())
}
}
self.logger.info(f"Image built successfully: {image.id}")
return true
} catch Exception as e {
self.logger.error(f"Image build failed: {e}")
return false
}
}
func run_container(
config: DockerConfig,
container_name: str,
port_mappings: Dict[int, int] = null,
detach: bool = true
) -> Optional[str] {
"""运行容器"""
try {
# 准备端口映射
ports = {}
if port_mappings {
for container_port, host_port in port_mappings.items() {
ports[f"{container_port}/tcp"] = host_port
} else {
# 默认端口映射
for port in config.exposed_ports {
ports[f"{port}/tcp"] = port
}
}
# Prepare volume mappings
volumes = {}
if config.volumes {
for host_path, container_path in config.volumes.items() {
volumes[host_path] = {
'bind': container_path,
'mode': 'rw'
}
}
}
self.logger.info(f"启动容器: {container_name}")
# Run the container
container = self.client.containers.run(
f"{config.image_name}:{config.tag}",
name=container_name,
ports=ports,
volumes=volumes,
environment=config.environment_variables,
detach=detach,
remove=false
)
if detach {
self.logger.info(f"Container started: {container.id}")
return container.id
} else {
return null
}
} catch Exception as e {
self.logger.error(f"Failed to run container: {e}")
return null
}
}
func stop_container(container_name: str) -> bool {
"""停止容器"""
try {
container = self.client.containers.get(container_name)
container.stop()
self.logger.info(f"容器已停止: {container_name}")
return true
} catch docker.errors.NotFound {
self.logger.warning(f"容器不存在: {container_name}")
return false
} except Exception as e {
self.logger.error(f"停止容器失败: {e}")
return false
}
}
func remove_container(container_name: str, force: bool = false) -> bool {
"""删除容器"""
try {
container = self.client.containers.get(container_name)
container.remove(force=force)
self.logger.info(f"容器已删除: {container_name}")
return true
} catch docker.errors.NotFound {
self.logger.warning(f"容器不存在: {container_name}")
return false
} except Exception as e {
self.logger.error(f"删除容器失败: {e}")
return false
}
}
func get_container_status(container_name: str) -> Dict[str, Any] {
"""获取容器状态"""
try {
container = self.client.containers.get(container_name)
# 获取容器统计信息
stats = container.stats(stream=false)
# 计算CPU使用率
cpu_usage = 0
if 'cpu_stats' in stats and 'precpu_stats' in stats {
cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - \
stats['precpu_stats']['cpu_usage']['total_usage']
system_delta = stats['cpu_stats']['system_cpu_usage'] - \
stats['precpu_stats']['system_cpu_usage']
if system_delta > 0 {
cpu_usage = (cpu_delta / system_delta) * 100
}
}
# 计算内存使用
memory_usage = 0
memory_limit = 0
if 'memory_stats' in stats {
memory_usage = stats['memory_stats'].get('usage', 0)
memory_limit = stats['memory_stats'].get('limit', 0)
}
return {
"id": container.id,
"name": container.name,
"status": container.status,
"image": container.image.tags[0] if container.image.tags else "unknown",
"created": container.attrs['Created'],
"ports": container.ports,
"cpu_usage": cpu_usage,
"memory_usage": memory_usage,
"memory_limit": memory_limit,
"memory_usage_mb": memory_usage / 1024 / 1024 if memory_usage else 0
}
} catch docker.errors.NotFound {
return {"error": f"Container does not exist: {container_name}"}
} catch Exception as e {
return {"error": str(e)}
}
}
func list_containers(all: bool = false) -> List[Dict[str, Any]] {
"""列出容器"""
containers = []
try {
for container in self.client.containers.list(all=all) {
containers.append({
"id": container.id[:12],
"name": container.name,
"image": container.image.tags[0] if container.image.tags else "unknown",
"status": container.status,
"ports": container.ports
})
}
} catch Exception as e {
self.logger.error(f"列出容器失败: {e}")
}
return containers
}
func get_container_logs(container_name: str, tail: int = 100) -> str {
"""获取容器日志"""
try {
container = self.client.containers.get(container_name)
logs = container.logs(tail=tail, timestamps=true)
return logs.decode('utf-8')
} except docker.errors.NotFound {
return f"容器不存在: {container_name}"
} except Exception as e {
return f"获取日志失败: {e}"
}
}
func create_docker_compose(configs: List[DockerConfig], output_file: str = "docker-compose.yml") {
"""创建 Docker Compose 配置"""
compose_config = {
"version": "3.8",
"services": {}
}
for config in configs {
service_name = config.image_name.replace("/", "-")
service_config = {
"build": ".",
"image": f"{config.image_name}:{config.tag}",
"ports": [f"{port}:{port}" for port in config.exposed_ports],
"environment": config.environment_variables,
"working_dir": config.working_dir
}
if config.volumes {
service_config["volumes"] = [
f"{host_path}:{container_path}"
for host_path, container_path in config.volumes.items()
]
}
if config.command {
service_config["command"] = config.command
}
compose_config["services"][service_name] = service_config
}
import yaml
with open(output_file, "w", encoding="utf-8") as f {
yaml.dump(compose_config, f, default_flow_style=false)
}
self.logger.info(f"Docker Compose 配置已创建: {output_file}")
}
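As a concrete reference, for the single-service example configuration used below (image my-aiscript-app:latest, port 8000, two volume mappings), create_docker_compose() emits roughly the following docker-compose.yml (key order may differ, since yaml.dump sorts keys):
version: '3.8'
services:
  my-aiscript-app:
    build: .
    image: my-aiscript-app:latest
    ports:
      - 8000:8000
    environment:
      ENV: production
      DEBUG: 'false'
      LOG_LEVEL: INFO
    working_dir: /app
    volumes:
      - /host/data:/app/data
      - /host/logs:/app/logs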
# Usage example
if __name__ == "__main__" {
# Create the Docker configuration
docker_config = DockerConfig(
image_name="my-aiscript-app",
tag="latest",
base_image="ubuntu:20.04",
working_dir="/app",
exposed_ports=[8000],
environment_variables={
"ENV": "production",
"DEBUG": "false",
"LOG_LEVEL": "INFO"
},
volumes={
"/host/data": "/app/data",
"/host/logs": "/app/logs"
}
)
# Create the Docker manager
docker_manager = DockerManager()
# Build the image
if docker_manager.build_image(docker_config, ".") {
print("Image built successfully")
# Run the container
container_id = docker_manager.run_container(
docker_config,
"my-aiscript-app-container",
port_mappings={8000: 8080}
)
if container_id {
print(f"Container started: {container_id}")
# Wait a moment
time.sleep(5)
# Get container status
status = docker_manager.get_container_status("my-aiscript-app-container")
print(f"Container status: {status}")
# Fetch logs
logs = docker_manager.get_container_logs("my-aiscript-app-container")
print(f"Container logs:\n{logs}")
# Stop and remove the container
docker_manager.stop_container("my-aiscript-app-container")
docker_manager.remove_container("my-aiscript-app-container")
}
}
# Create the Docker Compose configuration
docker_manager.create_docker_compose([docker_config])
}
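Putting the pieces together: for the example DockerConfig above, the Dockerfile that create_dockerfile() generates comes out roughly as follows (comments trimmed):
FROM ubuntu:20.04
WORKDIR /app
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    && rm -rf /var/lib/apt/lists/*
RUN pip3 install aiscript
COPY . /app/
RUN if [ -f requirements.txt ]; then pip3 install -r requirements.txt; fi
ENV ENV=production
ENV DEBUG=false
ENV LOG_LEVEL=INFO
EXPOSE 8000
CMD ["aiscript", "run", "app.ais"]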
7.3 Monitoring and Logging
Application Monitoring
import time
import psutil
import json
import sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
import threading
import queue
import logging
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
@dataclass
class MetricData {
timestamp: datetime
metric_name: str
value: float
tags: Dict[str, str] = null
unit: str = ""
}
@dataclass
class AlertRule {
name: str
metric_name: str
condition: str # ">", "<", ">=", "<=", "==", "!="
threshold: float
duration: int # duration in seconds
severity: str # "low", "medium", "high", "critical"
enabled: bool = true
notification_channels: List[str] = null
}
@dataclass
class Alert {
rule_name: str
metric_name: str
current_value: float
threshold: float
severity: str
message: str
triggered_at: datetime
resolved_at: Optional[datetime] = null
status: str = "active" # "active", "resolved"
}
class MetricsCollector {
func __init__(db_path: str = "metrics.db") {
self.db_path = db_path
self.metrics_queue = queue.Queue()
self.running = false
self.collector_thread = null
self.logger = logging.getLogger("MetricsCollector")
self.init_database()
}
func init_database() {
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
metric_name TEXT NOT NULL,
value REAL NOT NULL,
tags TEXT,
unit TEXT
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_metrics_timestamp
ON metrics(timestamp)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_metrics_name
ON metrics(metric_name)
""")
conn.commit()
conn.close()
}
func start() {
"""启动指标收集"""
if self.running {
return
}
self.running = true
self.collector_thread = threading.Thread(target=self._collect_loop)
self.collector_thread.daemon = true
self.collector_thread.start()
self.logger.info("指标收集器已启动")
}
func stop() {
"""停止指标收集"""
self.running = false
if self.collector_thread {
self.collector_thread.join()
}
self.logger.info("指标收集器已停止")
}
func _collect_loop() {
"""收集循环"""
while self.running {
try {
# 收集系统指标
self._collect_system_metrics()
# 处理队列中的指标
self._process_metrics_queue()
time.sleep(10) # 每10秒收集一次
} catch Exception as e {
self.logger.error(f"收集指标时出错: {e}")
time.sleep(5)
}
}
}
func _collect_system_metrics() {
"""收集系统指标"""
now = datetime.now()
# CPU 使用率
cpu_percent = psutil.cpu_percent(interval=1)
self.record_metric(MetricData(
timestamp=now,
metric_name="system.cpu.usage",
value=cpu_percent,
unit="percent"
))
# Memory usage
memory = psutil.virtual_memory()
self.record_metric(MetricData(
timestamp=now,
metric_name="system.memory.usage",
value=memory.percent,
unit="percent"
))
self.record_metric(MetricData(
timestamp=now,
metric_name="system.memory.available",
value=memory.available / 1024 / 1024, # MB
unit="MB"
))
# Disk usage
disk = psutil.disk_usage('/')
self.record_metric(MetricData(
timestamp=now,
metric_name="system.disk.usage",
value=(disk.used / disk.total) * 100,
unit="percent"
))
# Network IO
net_io = psutil.net_io_counters()
self.record_metric(MetricData(
timestamp=now,
metric_name="system.network.bytes_sent",
value=net_io.bytes_sent,
unit="bytes"
))
self.record_metric(MetricData(
timestamp=now,
metric_name="system.network.bytes_recv",
value=net_io.bytes_recv,
unit="bytes"
))
}
func record_metric(metric: MetricData) {
"""记录指标"""
self.metrics_queue.put(metric)
}
func _process_metrics_queue() {
"""处理指标队列"""
metrics_batch = []
# 批量处理指标
while not self.metrics_queue.empty() and len(metrics_batch) < 100 {
try {
metric = self.metrics_queue.get_nowait()
metrics_batch.append(metric)
} catch queue.Empty {
break
}
}
if metrics_batch {
self._save_metrics_batch(metrics_batch)
}
func _save_metrics_batch(metrics: List[MetricData]) {
"""批量保存指标"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try {
for metric in metrics {
cursor.execute("""
INSERT INTO metrics (timestamp, metric_name, value, tags, unit)
VALUES (?, ?, ?, ?, ?)
""", (
metric.timestamp.isoformat(),
metric.metric_name,
metric.value,
json.dumps(metric.tags) if metric.tags else null,
metric.unit
))
}
conn.commit()
} catch Exception as e {
self.logger.error(f"保存指标失败: {e}")
conn.rollback()
} finally {
conn.close()
}
}
func get_metrics(
metric_name: str,
start_time: datetime,
end_time: datetime
) -> List[MetricData] {
"""获取指标数据"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT timestamp, metric_name, value, tags, unit
FROM metrics
WHERE metric_name = ? AND timestamp BETWEEN ? AND ?
ORDER BY timestamp
""", (
metric_name,
start_time.isoformat(),
end_time.isoformat()
))
metrics = []
for row in cursor.fetchall() {
metrics.append(MetricData(
timestamp=datetime.fromisoformat(row[0]),
metric_name=row[1],
value=row[2],
tags=json.loads(row[3]) if row[3] else null,
unit=row[4] or ""
))
}
conn.close()
return metrics
}
func get_latest_metric(metric_name: str) -> Optional[MetricData] {
"""获取最新指标值"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT timestamp, metric_name, value, tags, unit
FROM metrics
WHERE metric_name = ?
ORDER BY timestamp DESC
LIMIT 1
""", (metric_name,))
row = cursor.fetchone()
conn.close()
if row {
return MetricData(
timestamp=datetime.fromisoformat(row[0]),
metric_name=row[1],
value=row[2],
tags=json.loads(row[3]) if row[3] else null,
unit=row[4] or ""
)
}
return null
}
func cleanup_old_metrics(days: int = 30) {
"""清理旧指标数据"""
cutoff_date = datetime.now() - timedelta(days=days)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
DELETE FROM metrics
WHERE timestamp < ?
""", (cutoff_date.isoformat(),))
deleted_count = cursor.rowcount
conn.commit()
conn.close()
self.logger.info(f"清理了 {deleted_count} 条旧指标数据")
}
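record_metric() is not limited to the built-in system series; application code can push its own measurements through the same queue, and they land in the same metrics table. A small sketch (the metric name, tags, and value here are illustrative):
collector = MetricsCollector()
collector.start()
# Push an application-level latency sample alongside the system metrics
collector.record_metric(MetricData(
timestamp=datetime.now(),
metric_name="app.request.latency",
value=42.0,
tags={"endpoint": "/api/users"},
unit="ms"
))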
class AlertManager {
func __init__(metrics_collector: MetricsCollector) {
self.metrics_collector = metrics_collector
self.alert_rules = []
self.active_alerts = {}
self.alert_history = []
self.running = false
self.alert_thread = null
self.logger = logging.getLogger("AlertManager")
self.notification_channels = {}
}
func add_alert_rule(rule: AlertRule) {
"""添加告警规则"""
self.alert_rules.append(rule)
self.logger.info(f"添加告警规则: {rule.name}")
}
func remove_alert_rule(rule_name: str) {
"""删除告警规则"""
self.alert_rules = [rule for rule in self.alert_rules if rule.name != rule_name]
self.logger.info(f"删除告警规则: {rule_name}")
}
func add_notification_channel(name: str, config: Dict[str, Any]) {
"""添加通知渠道"""
self.notification_channels[name] = config
self.logger.info(f"添加通知渠道: {name}")
}
func start() {
"""启动告警管理"""
if self.running {
return
}
self.running = true
self.alert_thread = threading.Thread(target=self._alert_loop)
self.alert_thread.daemon = true
self.alert_thread.start()
self.logger.info("告警管理器已启动")
}
func stop() {
"""停止告警管理"""
self.running = false
if self.alert_thread {
self.alert_thread.join()
}
self.logger.info("告警管理器已停止")
}
func _alert_loop() {
"""告警检查循环"""
while self.running {
try {
self._check_alert_rules()
time.sleep(30) # 每30秒检查一次
} catch Exception as e {
self.logger.error(f"检查告警规则时出错: {e}")
time.sleep(10)
}
}
}
func _check_alert_rules() {
"""检查告警规则"""
for rule in self.alert_rules {
if not rule.enabled {
continue
}
try {
self._evaluate_rule(rule)
} catch Exception as e {
self.logger.error(f"评估告警规则 {rule.name} 时出错: {e}")
}
}
}
func _evaluate_rule(rule: AlertRule) {
"""评估告警规则"""
# 获取最新指标值
metric = self.metrics_collector.get_latest_metric(rule.metric_name)
if not metric {
return
}
# Check the condition
triggered = self._check_condition(metric.value, rule.condition, rule.threshold)
if triggered {
# Is there already an active alert for this rule?
if rule.name not in self.active_alerts {
# Create a new alert
alert = Alert(
rule_name=rule.name,
metric_name=rule.metric_name,
current_value=metric.value,
threshold=rule.threshold,
severity=rule.severity,
message=f"{rule.metric_name} {rule.condition} {rule.threshold}, 当前值: {metric.value}",
triggered_at=datetime.now()
)
self.active_alerts[rule.name] = alert
self.alert_history.append(alert)
# Send notifications
self._send_alert_notification(alert, rule)
self.logger.warning(f"触发告警: {alert.message}")
}
} else {
# Otherwise, resolve the alert if one is active
if rule.name in self.active_alerts {
alert = self.active_alerts[rule.name]
alert.status = "resolved"
alert.resolved_at = datetime.now()
del self.active_alerts[rule.name]
# Send a resolution notification
self._send_resolution_notification(alert, rule)
self.logger.info(f"Alert resolved: {alert.message}")
}
}
}
func _check_condition(value: float, condition: str, threshold: float) -> bool {
"""检查条件"""
if condition == ">" {
return value > threshold
} elif condition == "<" {
return value < threshold
} elif condition == ">=" {
return value >= threshold
} elif condition == "<=" {
return value <= threshold
} elif condition == "==" {
return value == threshold
} elif condition == "!=" {
return value != threshold
} else {
return false
}
}
func _send_alert_notification(alert: Alert, rule: AlertRule) {
"""发送告警通知"""
if not rule.notification_channels {
return
}
for channel_name in rule.notification_channels {
if channel_name in self.notification_channels {
try {
self._send_notification(channel_name, alert, "alert")
} catch Exception as e {
self.logger.error(f"发送告警通知失败 ({channel_name}): {e}")
}
}
}
}
func _send_resolution_notification(alert: Alert, rule: AlertRule) {
"""发送解决通知"""
if not rule.notification_channels {
return
}
for channel_name in rule.notification_channels {
if channel_name in self.notification_channels {
try {
self._send_notification(channel_name, alert, "resolution")
} catch Exception as e {
self.logger.error(f"发送解决通知失败 ({channel_name}): {e}")
}
}
}
}
func _send_notification(channel_name: str, alert: Alert, notification_type: str) {
"""发送通知"""
channel_config = self.notification_channels[channel_name]
channel_type = channel_config.get("type")
if channel_type == "email" {
self._send_email_notification(channel_config, alert, notification_type)
} elif channel_type == "webhook" {
self._send_webhook_notification(channel_config, alert, notification_type)
} elif channel_type == "slack" {
self._send_slack_notification(channel_config, alert, notification_type)
}
}
func _send_email_notification(config: Dict[str, Any], alert: Alert, notification_type: str) {
"""发送邮件通知"""
smtp_server = config["smtp_server"]
smtp_port = config["smtp_port"]
username = config["username"]
password = config["password"]
to_emails = config["to_emails"]
subject = f"[{alert.severity.upper()}] {notification_type.title()}: {alert.rule_name}"
if notification_type == "alert" {
body = f"""
告警触发:
规则名称: {alert.rule_name}
指标名称: {alert.metric_name}
当前值: {alert.current_value}
阈值: {alert.threshold}
严重程度: {alert.severity}
触发时间: {alert.triggered_at}
消息: {alert.message}
"""
} else {
body = f"""
告警解决:
规则名称: {alert.rule_name}
指标名称: {alert.metric_name}
严重程度: {alert.severity}
触发时间: {alert.triggered_at}
解决时间: {alert.resolved_at}
消息: {alert.message}
"""
}
msg = MIMEMultipart()
msg['From'] = username
msg['To'] = ", ".join(to_emails)
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain', 'utf-8'))
with smtplib.SMTP(smtp_server, smtp_port) as server {
server.starttls()
server.login(username, password)
server.send_message(msg)
}
}
func _send_webhook_notification(config: Dict[str, Any], alert: Alert, notification_type: str) {
"""发送 Webhook 通知"""
webhook_url = config["url"]
payload = {
"type": notification_type,
"alert": asdict(alert),
"timestamp": datetime.now().isoformat()
}
headers = {"Content-Type": "application/json"}
if "headers" in config {
headers.update(config["headers"])
}
# asdict(alert) still contains datetime objects, so serialize with default=str
response = requests.post(webhook_url, data=json.dumps(payload, default=str), headers=headers, timeout=10)
response.raise_for_status()
}
func _send_slack_notification(config: Dict[str, Any], alert: Alert, notification_type: str) {
"""发送 Slack 通知"""
webhook_url = config["webhook_url"]
color = {
"low": "good",
"medium": "warning",
"high": "danger",
"critical": "danger"
}.get(alert.severity, "warning")
if notification_type == "alert" {
title = f"🚨 告警触发: {alert.rule_name}"
text = f"指标 {alert.metric_name} 当前值 {alert.current_value} 超过阈值 {alert.threshold}"
} else {
title = f"✅ 告警解决: {alert.rule_name}"
text = f"指标 {alert.metric_name} 已恢复正常"
color = "good"
}
payload = {
"attachments": [{
"color": color,
"title": title,
"text": text,
"fields": [
{"title": "严重程度", "value": alert.severity, "short": true},
{"title": "当前值", "value": str(alert.current_value), "short": true},
{"title": "阈值", "value": str(alert.threshold), "short": true},
{"title": "时间", "value": alert.triggered_at.strftime("%Y-%m-%d %H:%M:%S"), "short": true}
]
}]
}
response = requests.post(webhook_url, json=payload, timeout=10)
response.raise_for_status()
}
func get_active_alerts() -> List[Alert] {
"""获取活跃告警"""
return list(self.active_alerts.values())
}
func get_alert_history(limit: int = 100) -> List[Alert] {
"""获取告警历史"""
return self.alert_history[-limit:]
}
# 使用示例
if __name__ == "__main__" {
# Create the metrics collector
metrics_collector = MetricsCollector()
metrics_collector.start()
# Create the alert manager
alert_manager = AlertManager(metrics_collector)
# Add notification channels
alert_manager.add_notification_channel("email", {
"type": "email",
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"username": "your-email@gmail.com",
"password": "your-password",
"to_emails": ["admin@example.com"]
})
alert_manager.add_notification_channel("webhook", {
"type": "webhook",
"url": "https://your-webhook-url.com/alerts",
"headers": {"Authorization": "Bearer your-token"}
})
# Add alert rules
alert_manager.add_alert_rule(AlertRule(
name="high_cpu_usage",
metric_name="system.cpu.usage",
condition=">",
threshold=80.0,
duration=300, # 5 minutes
severity="high",
notification_channels=["email", "webhook"]
))
alert_manager.add_alert_rule(AlertRule(
name="high_memory_usage",
metric_name="system.memory.usage",
condition=">",
threshold=90.0,
duration=300,
severity="critical",
notification_channels=["email", "webhook"]
))
alert_manager.add_alert_rule(AlertRule(
name="low_disk_space",
metric_name="system.disk.usage",
condition=">",
threshold=85.0,
duration=600, # 10 minutes
severity="medium",
notification_channels=["email"]
))
# Start the alert manager
alert_manager.start()
try {
# Let it run for a while
time.sleep(60)
# Fetch active alerts
active_alerts = alert_manager.get_active_alerts()
print(f"Active alert count: {len(active_alerts)}")
for alert in active_alerts {
print(f" - {alert.rule_name}: {alert.message}")
}
} finally {
# Shut down
alert_manager.stop()
metrics_collector.stop()
}
}
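For anyone implementing a receiver, the webhook channel posts one JSON object per notification, built from asdict(alert) with datetimes rendered as strings via default=str. A triggered high_cpu_usage alert would arrive roughly as (values illustrative):
{
  "type": "alert",
  "alert": {
    "rule_name": "high_cpu_usage",
    "metric_name": "system.cpu.usage",
    "current_value": 93.5,
    "threshold": 80.0,
    "severity": "high",
    "message": "system.cpu.usage > 80.0, current value: 93.5",
    "triggered_at": "2024-01-01 12:00:00.000000",
    "resolved_at": null,
    "status": "active"
  },
  "timestamp": "2024-01-01T12:00:05"
}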
Log Management
import logging
import logging.handlers
import json
import gzip
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import threading
import queue
import re
@dataclass
class LogEntry {
timestamp: datetime
level: str
logger_name: str
message: str
module: str
function: str
line_number: int
thread_id: int
process_id: int
extra_data: Dict[str, Any] = null
}
class StructuredLogger {
func __init__(
name: str,
log_dir: str = "logs",
max_file_size: int = 10 * 1024 * 1024, # 10MB
backup_count: int = 5,
compress_backups: bool = true
) {
self.name = name
self.log_dir = Path(log_dir)
self.log_dir.mkdir(exist_ok=true)
self.max_file_size = max_file_size
self.backup_count = backup_count
self.compress_backups = compress_backups
self.logger = self._setup_logger()
self.log_queue = queue.Queue()
self.log_processor = null
self.running = false
}
func _setup_logger() -> logging.Logger {
"""设置日志记录器"""
logger = logging.getLogger(self.name)
logger.setLevel(logging.DEBUG)
# 清除现有处理器
logger.handlers.clear()
# 文件处理器
log_file = self.log_dir / f"{self.name}.log"
file_handler = logging.handlers.RotatingFileHandler(
log_file,
maxBytes=self.max_file_size,
backupCount=self.backup_count,
encoding='utf-8'
)
# JSON 格式化器
json_formatter = JsonFormatter()
file_handler.setFormatter(json_formatter)
logger.addHandler(file_handler)
# 控制台处理器
console_handler = logging.StreamHandler()
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
return logger
}
func start_async_logging() {
"""启动异步日志处理"""
if self.running {
return
}
self.running = true
self.log_processor = threading.Thread(target=self._process_log_queue)
self.log_processor.daemon = true
self.log_processor.start()
}
func stop_async_logging() {
"""停止异步日志处理"""
self.running = false
if self.log_processor {
self.log_processor.join()
}
}
func _process_log_queue() {
"""处理日志队列"""
while self.running {
try {
log_entry = self.log_queue.get(timeout=1)
self._write_log_entry(log_entry)
} catch queue.Empty {
continue
} catch Exception as e {
print(f"处理日志队列时出错: {e}")
}
}
func _write_log_entry(log_entry: LogEntry) {
"""写入日志条目"""
level_method = getattr(self.logger, log_entry.level.lower())
extra = {
'module': log_entry.module,
'function': log_entry.function,
'line_number': log_entry.line_number,
'thread_id': log_entry.thread_id,
'process_id': log_entry.process_id
}
if log_entry.extra_data {
extra.update(log_entry.extra_data)
}
level_method(log_entry.message, extra=extra)
}
func log(level: str, message: str, **kwargs) {
"""记录日志"""
import inspect
import threading
import os
frame = inspect.currentframe().f_back
log_entry = LogEntry(
timestamp=datetime.now(),
level=level.upper(),
logger_name=self.name,
message=message,
module=frame.f_globals.get('__name__', 'unknown'),
function=frame.f_code.co_name,
line_number=frame.f_lineno,
thread_id=threading.get_ident(),
process_id=os.getpid(),
extra_data=kwargs if kwargs else null
)
if self.running {
self.log_queue.put(log_entry)
} else {
self._write_log_entry(log_entry)
}
}
func debug(message: str, **kwargs) {
self.log('DEBUG', message, **kwargs)
}
func info(message: str, **kwargs) {
self.log('INFO', message, **kwargs)
}
func warning(message: str, **kwargs) {
self.log('WARNING', message, **kwargs)
}
func error(message: str, **kwargs) {
self.log('ERROR', message, **kwargs)
}
func critical(message: str, **kwargs) {
self.log('CRITICAL', message, **kwargs)
}
func compress_old_logs() {
"""压缩旧日志文件"""
if not self.compress_backups {
return
}
log_files = list(self.log_dir.glob(f"{self.name}.log.*"))
for log_file in log_files {
if not log_file.name.endswith('.gz') {
compressed_file = log_file.with_suffix(log_file.suffix + '.gz')
with open(log_file, 'rb') as f_in {
with gzip.open(compressed_file, 'wb') as f_out {
f_out.writelines(f_in)
}
}
log_file.unlink()
}
func cleanup_old_logs(days: int = 30) {
"""清理旧日志文件"""
cutoff_date = datetime.now() - timedelta(days=days)
log_files = list(self.log_dir.glob(f"{self.name}.log.*"))
for log_file in log_files {
if log_file.stat().st_mtime < cutoff_date.timestamp() {
log_file.unlink()
print(f"删除旧日志文件: {log_file}")
}
}
}
class JsonFormatter(logging.Formatter) {
func format(record: logging.LogRecord) -> str {
"""JSON 格式化器"""
log_data = {
'timestamp': datetime.fromtimestamp(record.created).isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': getattr(record, 'module', 'unknown'),
'function': getattr(record, 'function', record.funcName),
'line_number': getattr(record, 'line_number', record.lineno),
'thread_id': getattr(record, 'thread_id', record.thread),
'process_id': getattr(record, 'process_id', record.process)
}
# Attach extra data
for key, value in record.__dict__.items() {
if key not in ['name', 'msg', 'args', 'levelname', 'levelno', 'pathname',
'filename', 'module', 'lineno', 'funcName', 'created',
'msecs', 'relativeCreated', 'thread', 'threadName',
'processName', 'process', 'getMessage', 'exc_info',
'exc_text', 'stack_info'] {
log_data[key] = value
}
}
return json.dumps(log_data, ensure_ascii=false, default=str)
}
}
class LogAnalyzer {
func __init__(log_dir: str = "logs") {
self.log_dir = Path(log_dir)
}
func parse_log_file(log_file: Path) -> List[LogEntry] {
"""解析日志文件"""
log_entries = []
try {
with open(log_file, 'r', encoding='utf-8') as f {
for line in f {
line = line.strip()
if not line {
continue
}
try {
log_data = json.loads(line)
log_entry = LogEntry(
timestamp=datetime.fromisoformat(log_data['timestamp']),
level=log_data['level'],
logger_name=log_data['logger'],
message=log_data['message'],
module=log_data.get('module', 'unknown'),
function=log_data.get('function', 'unknown'),
line_number=log_data.get('line_number', 0),
thread_id=log_data.get('thread_id', 0),
process_id=log_data.get('process_id', 0),
extra_data={k: v for k, v in log_data.items()
if k not in ['timestamp', 'level', 'logger', 'message',
'module', 'function', 'line_number',
'thread_id', 'process_id']}
)
log_entries.append(log_entry)
} catch json.JSONDecodeError {
# Skip invalid JSON lines
continue
}
}
}
} catch Exception as e {
print(f"解析日志文件 {log_file} 时出错: {e}")
}
return log_entries
}
func search_logs(
pattern: str,
start_time: Optional[datetime] = null,
end_time: Optional[datetime] = null,
level: Optional[str] = null,
logger_name: Optional[str] = null
) -> List[LogEntry] {
"""搜索日志"""
matching_entries = []
log_files = list(self.log_dir.glob("*.log"))
log_files.extend(self.log_dir.glob("*.log.*"))
for log_file in log_files {
if log_file.name.endswith('.gz') {
continue # skip compressed files
}
log_entries = self.parse_log_file(log_file)
for entry in log_entries {
# Time filter
if start_time and entry.timestamp < start_time {
continue
}
if end_time and entry.timestamp > end_time {
continue
}
# Level filter
if level and entry.level != level.upper() {
continue
}
# Logger-name filter
if logger_name and entry.logger_name != logger_name {
continue
}
# Pattern match
if re.search(pattern, entry.message, re.IGNORECASE) {
matching_entries.append(entry)
}
}
}
return sorted(matching_entries, key=lambda x: x.timestamp)
}
func get_log_statistics(
start_time: Optional[datetime] = null,
end_time: Optional[datetime] = null
) -> Dict[str, Any] {
"""获取日志统计信息"""
stats = {
'total_entries': 0,
'level_counts': {},
'logger_counts': {},
'error_patterns': {},
'time_range': {'start': null, 'end': null}
}
log_files = list(self.log_dir.glob("*.log"))
for log_file in log_files {
log_entries = self.parse_log_file(log_file)
for entry in log_entries {
# Time filter
if start_time and entry.timestamp < start_time {
continue
}
if end_time and entry.timestamp > end_time {
continue
}
stats['total_entries'] += 1
# Update the time range
if not stats['time_range']['start'] or entry.timestamp < stats['time_range']['start'] {
stats['time_range']['start'] = entry.timestamp
}
if not stats['time_range']['end'] or entry.timestamp > stats['time_range']['end'] {
stats['time_range']['end'] = entry.timestamp
}
# Per-level counts
stats['level_counts'][entry.level] = stats['level_counts'].get(entry.level, 0) + 1
# Per-logger counts
stats['logger_counts'][entry.logger_name] = stats['logger_counts'].get(entry.logger_name, 0) + 1
# Error-pattern counts
if entry.level in ['ERROR', 'CRITICAL'] {
# Extract error keywords
error_keywords = re.findall(r'\b\w+Error\b|\b\w+Exception\b|\bFailed\b|\bTimeout\b', entry.message)
for keyword in error_keywords {
stats['error_patterns'][keyword] = stats['error_patterns'].get(keyword, 0) + 1
}
}
}
}
return stats
}
}
# Usage example
if __name__ == "__main__" {
# Create a structured logger
logger = StructuredLogger("my_app")
logger.start_async_logging()
try {
# Log at different levels
logger.info("Application started", component="main", version="1.0.0")
logger.debug("Debug info", user_id=12345, action="login")
logger.warning("Warning", resource="database", usage=85.5)
logger.error("Error", error_code="DB001", details="connection timeout")
# Give the async processor time to drain
time.sleep(2)
# Compress old logs
logger.compress_old_logs()
# Analyze the logs
analyzer = LogAnalyzer()
# Search for error logs
error_logs = analyzer.search_logs(
pattern="error|exception|failed",
level="ERROR",
start_time=datetime.now() - timedelta(hours=1)
)
print(f"找到 {len(error_logs)} 条错误日志")
# 获取统计信息
stats = analyzer.get_log_statistics(
start_time=datetime.now() - timedelta(days=1)
)
print(f"日志统计信息:")
print(f" 总条数: {stats['total_entries']}")
print(f" 级别分布: {stats['level_counts']}")
print(f" 错误模式: {stats['error_patterns']}")
} finally {
logger.stop_async_logging()
}
}
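Each line the rotating file handler writes is a single JSON object produced by JsonFormatter, so logs/my_app.log can be processed line by line. The logger.error(...) call above would be stored roughly as (timestamp and ids illustrative):
{"timestamp": "2024-01-01T12:00:00.123456", "level": "ERROR", "logger": "my_app", "message": "Error", "function": "<module>", "line_number": 12, "thread_id": 140133, "process_id": 4242, "error_code": "DB001", "details": "connection timeout"}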
7.4 Performance Optimization
Application Performance Tuning
import time
import psutil
import threading
import multiprocessing
import asyncio
import cProfile
import pstats
import io
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
import gc
import sys
import tracemalloc
import linecache
import os
@dataclass
class PerformanceMetric {
name: str
value: float
unit: str
timestamp: datetime
context: Dict[str, Any] = null
}
@dataclass
class BottleneckInfo {
location: str
function_name: str
execution_time: float
call_count: int
cumulative_time: float
percentage: float
}
class PerformanceProfiler {
func __init__() {
self.profiler = null
self.metrics = []
self.start_time = null
self.memory_snapshots = []
self.cpu_samples = []
self.is_profiling = false
}
func start_profiling() {
"""开始性能分析"""
if self.is_profiling {
return
}
self.is_profiling = true
self.start_time = time.time()
# 启动CPU分析
self.profiler = cProfile.Profile()
self.profiler.enable()
# 启动内存跟踪
tracemalloc.start()
# 记录初始内存快照
self._take_memory_snapshot("start")
print("性能分析已启动")
}
func stop_profiling() -> Dict[str, Any] {
"""停止性能分析并返回结果"""
if not self.is_profiling {
return {}
}
self.is_profiling = false
end_time = time.time()
total_time = end_time - self.start_time
# 停止CPU分析
self.profiler.disable()
# 记录最终内存快照
self._take_memory_snapshot("end")
# 停止内存跟踪
tracemalloc.stop()
# 分析结果
cpu_stats = self._analyze_cpu_profile()
memory_stats = self._analyze_memory_usage()
results = {
'total_execution_time': total_time,
'cpu_analysis': cpu_stats,
'memory_analysis': memory_stats,
'bottlenecks': self._identify_bottlenecks(cpu_stats),
'recommendations': self._generate_recommendations(cpu_stats, memory_stats)
}
print(f"性能分析完成,总执行时间: {total_time:.2f}秒")
return results
}
func _take_memory_snapshot(label: str) {
"""获取内存快照"""
if tracemalloc.is_tracing() {
snapshot = tracemalloc.take_snapshot()
self.memory_snapshots.append({
'label': label,
'timestamp': datetime.now(),
'snapshot': snapshot
})
}
}
func _analyze_cpu_profile() -> Dict[str, Any] {
"""分析CPU性能"""
if not self.profiler {
return {}
}
# 创建统计对象
stats_stream = io.StringIO()
stats = pstats.Stats(self.profiler, stream=stats_stream)
stats.sort_stats('cumulative')
# 获取前20个最耗时的函数
stats.print_stats(20)
# 解析统计信息
stats_output = stats_stream.getvalue()
# 获取详细统计数据
function_stats = []
for func_info, (call_count, total_time, cumulative_time, callers) in stats.stats.items() {
filename, line_number, function_name = func_info
function_stats.append({
'filename': filename,
'line_number': line_number,
'function_name': function_name,
'call_count': call_count,
'total_time': total_time,
'cumulative_time': cumulative_time,
'avg_time_per_call': total_time / call_count if call_count > 0 else 0
})
}
# Sort by cumulative time
function_stats.sort(key=lambda x: x['cumulative_time'], reverse=true)
return {
'total_calls': stats.total_calls,
'total_time': stats.total_tt,
'function_stats': function_stats[:20], # top 20
'raw_output': stats_output
}
}
func _analyze_memory_usage() -> Dict[str, Any] {
"""分析内存使用"""
if len(self.memory_snapshots) < 2 {
return {}
}
start_snapshot = self.memory_snapshots[0]['snapshot']
end_snapshot = self.memory_snapshots[-1]['snapshot']
# Compare memory snapshots
top_stats = end_snapshot.compare_to(start_snapshot, 'lineno')
memory_growth = []
for stat in top_stats[:10] { # the 10 largest sources of memory growth
memory_growth.append({
'filename': stat.traceback.format()[0] if stat.traceback else 'unknown',
'size_diff': stat.size_diff,
'count_diff': stat.count_diff,
'size': stat.size,
'count': stat.count
})
}
# Current memory usage
current_memory = psutil.Process().memory_info()
return {
'memory_growth': memory_growth,
'current_rss': current_memory.rss,
'current_vms': current_memory.vms,
'peak_memory': end_snapshot.statistics('filename')[0].size if end_snapshot.statistics('filename') else 0
}
}
func _identify_bottlenecks(cpu_stats: Dict[str, Any]) -> List[BottleneckInfo] {
"""识别性能瓶颈"""
bottlenecks = []
if 'function_stats' not in cpu_stats {
return bottlenecks
}
total_time = cpu_stats.get('total_time', 1)
for func_stat in cpu_stats['function_stats'][:10] {
if func_stat['cumulative_time'] > total_time * 0.05 { # more than 5% of total time
bottleneck = BottleneckInfo(
location=f"{func_stat['filename']}:{func_stat['line_number']}",
function_name=func_stat['function_name'],
execution_time=func_stat['total_time'],
call_count=func_stat['call_count'],
cumulative_time=func_stat['cumulative_time'],
percentage=(func_stat['cumulative_time'] / total_time) * 100
)
bottlenecks.append(bottleneck)
}
}
return bottlenecks
}
func _generate_recommendations(cpu_stats: Dict[str, Any], memory_stats: Dict[str, Any]) -> List[str] {
"""生成优化建议"""
recommendations = []
# CPU recommendations
if 'function_stats' in cpu_stats {
top_functions = cpu_stats['function_stats'][:5]
for func in top_functions {
if func['call_count'] > 1000 {
recommendations.append(
f"Function {func['function_name']} was called {func['call_count']} times; consider caching results or reducing call counts"
)
}
if func['avg_time_per_call'] > 0.01 { # more than 10 ms per call
recommendations.append(
f"Function {func['function_name']} has a long average execution time; consider algorithmic optimization"
)
}
}
}
# Memory recommendations
if 'memory_growth' in memory_stats {
for growth in memory_stats['memory_growth'][:3] {
if growth['size_diff'] > 1024 * 1024 { # growth above 1 MB
recommendations.append(
f"Location {growth['filename']} grew by {growth['size_diff'] / 1024 / 1024:.2f} MB; check for memory leaks"
)
}
}
}
# Generic advice
if not recommendations {
recommendations.append("Performance looks good; consider further micro-optimizations")
}
return recommendations
}
func profile_function(func: Callable, *args, **kwargs) -> Dict[str, Any] {
"""分析单个函数的性能"""
self.start_profiling()
try {
result = func(*args, **kwargs)
return {
'result': result,
'profile': self.stop_profiling()
}
} catch Exception as e {
self.stop_profiling()
raise e
}
}
}
class ResourceOptimizer {
func __init__() {
self.cpu_count = multiprocessing.cpu_count()
self.memory_info = psutil.virtual_memory()
self.optimization_history = []
}
func optimize_thread_pool_size(workload_type: str = "io_bound") -> int {
"""优化线程池大小"""
if workload_type == "cpu_bound" {
# CPU密集型任务:线程数 = CPU核心数
optimal_size = self.cpu_count
} elif workload_type == "io_bound" {
# IO密集型任务:线程数 = CPU核心数 * 2-4
optimal_size = self.cpu_count * 3
} else {
# 混合型任务:线程数 = CPU核心数 * 1.5
optimal_size = int(self.cpu_count * 1.5)
}
# Respect the memory limit
max_threads_by_memory = self.memory_info.available // (50 * 1024 * 1024) # assume 50 MB per thread
optimal_size = min(optimal_size, max_threads_by_memory)
self.optimization_history.append({
'type': 'thread_pool_size',
'workload_type': workload_type,
'recommended_size': optimal_size,
'timestamp': datetime.now()
})
return max(1, optimal_size)
}
func optimize_batch_size(item_size_mb: float, processing_time_ms: float) -> int {
"""优化批处理大小"""
# 基于内存限制
available_memory_mb = self.memory_info.available / 1024 / 1024
max_batch_by_memory = int(available_memory_mb * 0.1 / item_size_mb) # 使用10%的可用内存
# 基于处理时间(目标:每批处理时间1-5秒)
target_batch_time_ms = 3000 # 3秒
optimal_batch_by_time = int(target_batch_time_ms / processing_time_ms)
# 取较小值
optimal_batch_size = min(max_batch_by_memory, optimal_batch_by_time)
# 确保最小批大小
optimal_batch_size = max(1, optimal_batch_size)
self.optimization_history.append({
'type': 'batch_size',
'item_size_mb': item_size_mb,
'processing_time_ms': processing_time_ms,
'recommended_size': optimal_batch_size,
'timestamp': datetime.now()
})
return optimal_batch_size
}
func optimize_cache_size(hit_rate: float, miss_penalty_ms: float) -> int {
"""优化缓存大小"""
# 基于内存可用性
available_memory_mb = self.memory_info.available / 1024 / 1024
max_cache_memory_mb = available_memory_mb * 0.2 # 使用20%的可用内存
# 基于命中率和惩罚时间
if hit_rate < 0.8 {
# 命中率低,增加缓存大小
recommended_memory_mb = max_cache_memory_mb * 0.8
} elif hit_rate > 0.95 {
# 命中率很高,可以减少缓存大小
recommended_memory_mb = max_cache_memory_mb * 0.3
} else {
# 命中率适中
recommended_memory_mb = max_cache_memory_mb * 0.5
}
# 转换为条目数(假设每个条目平均1KB)
cache_size = int(recommended_memory_mb * 1024)
self.optimization_history.append({
'type': 'cache_size',
'hit_rate': hit_rate,
'miss_penalty_ms': miss_penalty_ms,
'recommended_size': cache_size,
'timestamp': datetime.now()
})
return max(100, cache_size) # at least 100 entries
}
func get_system_recommendations() -> Dict[str, Any] {
"""获取系统级优化建议"""
recommendations = {
'cpu': [],
'memory': [],
'disk': [],
'network': []
}
# CPU recommendations
cpu_percent = psutil.cpu_percent(interval=1)
if cpu_percent > 80 {
recommendations['cpu'].append("CPU usage is high; consider optimizing algorithms or adding parallelism")
} elif cpu_percent < 20 {
recommendations['cpu'].append("CPU usage is low; concurrency could be increased")
}
# Memory recommendations
memory_percent = self.memory_info.percent
if memory_percent > 85 {
recommendations['memory'].append("Memory usage is high; optimize memory use or add RAM")
} elif memory_percent < 30 {
recommendations['memory'].append("Memory usage is low; cache sizes could be increased")
}
# Disk recommendations
disk_usage = psutil.disk_usage('/')
disk_percent = (disk_usage.used / disk_usage.total) * 100
if disk_percent > 90 {
recommendations['disk'].append("Disk space is running out; clean up or expand storage")
}
disk_io = psutil.disk_io_counters()
if hasattr(disk_io, 'read_time') and hasattr(disk_io, 'write_time') {
total_io_time = disk_io.read_time + disk_io.write_time
if total_io_time > 10000 { # 10 seconds
recommendations['disk'].append("Disk IO time is high; consider SSDs or optimizing IO")
}
}
# Network recommendations
net_io = psutil.net_io_counters()
if net_io.errin > 100 or net_io.errout > 100 {
recommendations['network'].append("Frequent network errors; check connectivity")
}
return recommendations
}
}
class AsyncOptimizer {
func __init__() {
self.loop = null
self.semaphore_pool = {}
}
async func optimize_concurrent_requests(max_concurrent: int = 10) {
"""优化并发请求数量"""
if max_concurrent not in self.semaphore_pool {
self.semaphore_pool[max_concurrent] = asyncio.Semaphore(max_concurrent)
}
return self.semaphore_pool[max_concurrent]
}
async func batch_process_async(
items: List[Any],
process_func: Callable,
batch_size: int = 10,
max_concurrent: int = 5
) -> List[Any] {
"""异步批处理"""
semaphore = await self.optimize_concurrent_requests(max_concurrent)
results = []
async func process_batch(batch) {
async with semaphore {
batch_results = []
for item in batch {
if asyncio.iscoroutinefunction(process_func) {
result = await process_func(item)
} else {
result = process_func(item)
}
batch_results.append(result)
}
return batch_results
}
}
# Split the items into batches
batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
# Run the batches concurrently
batch_tasks = [process_batch(batch) for batch in batches]
batch_results = await asyncio.gather(*batch_tasks)
# Merge the results
for batch_result in batch_results {
results.extend(batch_result)
}
return results
}
async func adaptive_retry(
func: Callable,
max_retries: int = 3,
base_delay: float = 1.0,
backoff_factor: float = 2.0
) {
"""自适应重试机制"""
last_exception = null
for attempt in range(max_retries + 1) {
try {
if asyncio.iscoroutinefunction(func) {
return await func()
} else {
return func()
}
} catch Exception as e {
last_exception = e
if attempt < max_retries {
delay = base_delay * (backoff_factor ** attempt)
await asyncio.sleep(delay)
} else {
raise last_exception
}
}
}
}
}
# Usage example
if __name__ == "__main__" {
# Profiling example
profiler = PerformanceProfiler()
func example_function() {
# Simulate some computation
total = 0
for i in range(1000000) {
total += i * i
}
return total
}
# Profile the function
result = profiler.profile_function(example_function)
print(f"Function result: {result['result']}")
print(f"Profile recommendations: {result['profile']['recommendations']}")
# Resource-optimization example
optimizer = ResourceOptimizer()
# Choose a thread-pool size
thread_pool_size = optimizer.optimize_thread_pool_size("io_bound")
print(f"Recommended thread-pool size: {thread_pool_size}")
# Choose a batch size
batch_size = optimizer.optimize_batch_size(item_size_mb=0.1, processing_time_ms=10)
print(f"Recommended batch size: {batch_size}")
# System-level recommendations
system_recommendations = optimizer.get_system_recommendations()
print(f"System recommendations: {system_recommendations}")
}
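AsyncOptimizer is not exercised by the example above; a minimal sketch of batch_process_async and adaptive_retry in the same style (the fetch coroutine is a stand-in for real IO):
async func fetch(item) {
await asyncio.sleep(0.01) # stand-in for a network call
return item * 2
}
async func ping() {
return await fetch(1)
}
async func main() {
async_optimizer = AsyncOptimizer()
results = await async_optimizer.batch_process_async(
list(range(100)),
fetch,
batch_size=10,
max_concurrent=5
)
print(f"Processed {len(results)} items")
# Retry a flaky call with exponential backoff (1s, 2s, 4s)
value = await async_optimizer.adaptive_retry(ping, max_retries=3)
}
asyncio.run(main())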
7.5 Troubleshooting
Fault Diagnosis and Recovery
import os
import sys
import traceback
import logging
import json
import subprocess
import signal
import time
import psutil
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import threading
import queue
import requests
import sqlite3
from pathlib import Path
class HealthStatus(Enum) {
HEALTHY = "healthy"
WARNING = "warning"
CRITICAL = "critical"
UNKNOWN = "unknown"
}
class IncidentSeverity(Enum) {
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
}
@dataclass
class HealthCheck {
name: str
status: HealthStatus
message: str
timestamp: datetime
response_time: float
details: Dict[str, Any] = null
}
@dataclass
class Incident {
id: str
title: str
description: str
severity: IncidentSeverity
status: str # "open", "investigating", "resolved"
created_at: datetime
updated_at: datetime
resolved_at: Optional[datetime] = null
affected_services: List[str] = null
root_cause: Optional[str] = null
resolution_steps: List[str] = null
}
class HealthMonitor {
func __init__(db_path: str = "health.db") {
self.db_path = db_path
self.health_checks = {}
self.running = false
self.monitor_thread = null
self.logger = logging.getLogger("HealthMonitor")
self.init_database()
}
func init_database() {
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS health_checks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
status TEXT NOT NULL,
message TEXT,
timestamp TEXT NOT NULL,
response_time REAL,
details TEXT
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS incidents (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT,
severity TEXT NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
resolved_at TEXT,
affected_services TEXT,
root_cause TEXT,
resolution_steps TEXT
)
""")
conn.commit()
conn.close()
}
func register_health_check(name: str, check_func: Callable) {
"""注册健康检查"""
self.health_checks[name] = check_func
self.logger.info(f"注册健康检查: {name}")
}
func start_monitoring(interval: int = 30) {
"""启动健康监控"""
if self.running {
return
}
self.running = true
self.monitor_thread = threading.Thread(
target=self._monitor_loop,
args=(interval,)
)
self.monitor_thread.daemon = true
self.monitor_thread.start()
self.logger.info(f"健康监控已启动,检查间隔: {interval}秒")
}
func stop_monitoring() {
"""停止健康监控"""
self.running = false
if self.monitor_thread {
self.monitor_thread.join()
}
self.logger.info("健康监控已停止")
}
func _monitor_loop(interval: int) {
"""监控循环"""
while self.running {
try {
self._run_health_checks()
time.sleep(interval)
} catch Exception as e {
self.logger.error(f"健康检查时出错: {e}")
time.sleep(10)
}
}
}
func _run_health_checks() {
"""运行所有健康检查"""
for name, check_func in self.health_checks.items() {
try {
start_time = time.time()
result = check_func()
response_time = time.time() - start_time
if isinstance(result, dict) {
status = HealthStatus(result.get('status', 'unknown'))
message = result.get('message', '')
details = result.get('details')
} else {
status = HealthStatus.HEALTHY if result else HealthStatus.CRITICAL
message = "Check passed" if result else "Check failed"
details = null
}
health_check = HealthCheck(
name=name,
status=status,
message=message,
timestamp=datetime.now(),
response_time=response_time,
details=details
)
self._save_health_check(health_check)
if status in [HealthStatus.CRITICAL, HealthStatus.WARNING] {
self.logger.warning(f"健康检查失败: {name} - {message}")
}
} catch Exception as e {
error_check = HealthCheck(
name=name,
status=HealthStatus.CRITICAL,
message=f"检查执行失败: {str(e)}",
timestamp=datetime.now(),
response_time=0.0
)
self._save_health_check(error_check)
self.logger.error(f"健康检查 {name} 执行失败: {e}")
}
}
}
func _save_health_check(health_check: HealthCheck) {
"""保存健康检查结果"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO health_checks (name, status, message, timestamp, response_time, details)
VALUES (?, ?, ?, ?, ?, ?)
""", (
health_check.name,
health_check.status.value,
health_check.message,
health_check.timestamp.isoformat(),
health_check.response_time,
json.dumps(health_check.details) if health_check.details else null
))
conn.commit()
conn.close()
}
func get_current_health() -> Dict[str, HealthCheck] {
"""获取当前健康状态"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT name, status, message, timestamp, response_time, details
FROM health_checks h1
WHERE timestamp = (
SELECT MAX(timestamp)
FROM health_checks h2
WHERE h2.name = h1.name
)
""")
current_health = {}
for row in cursor.fetchall() {
name, status, message, timestamp, response_time, details = row
health_check = HealthCheck(
name=name,
status=HealthStatus(status),
message=message,
timestamp=datetime.fromisoformat(timestamp),
response_time=response_time,
details=json.loads(details) if details else null
)
current_health[name] = health_check
}
conn.close()
return current_health
}
func get_health_history(
name: str,
start_time: datetime,
end_time: datetime
) -> List[HealthCheck] {
"""获取健康检查历史"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT name, status, message, timestamp, response_time, details
FROM health_checks
WHERE name = ? AND timestamp BETWEEN ? AND ?
ORDER BY timestamp
""", (name, start_time.isoformat(), end_time.isoformat()))
history = []
for row in cursor.fetchall() {
name, status, message, timestamp, response_time, details = row
health_check = HealthCheck(
name=name,
status=HealthStatus(status),
message=message,
timestamp=datetime.fromisoformat(timestamp),
response_time=response_time,
details=json.loads(details) if details else null
)
history.append(health_check)
}
conn.close()
return history
}
}
class IncidentManager {
func __init__(db_path: str = "incidents.db") {
self.db_path = db_path
self.logger = logging.getLogger("IncidentManager")
self.init_database()
}
func init_database() {
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS incidents (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT,
severity TEXT NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
resolved_at TEXT,
affected_services TEXT,
root_cause TEXT,
resolution_steps TEXT
)
""")
conn.commit()
conn.close()
}
func create_incident(
title: str,
description: str,
severity: IncidentSeverity,
affected_services: List[str] = null
) -> str {
"""创建事故"""
incident_id = f"INC-{datetime.now().strftime('%Y%m%d%H%M%S')}"
incident = Incident(
id=incident_id,
title=title,
description=description,
severity=severity,
status="open",
created_at=datetime.now(),
updated_at=datetime.now(),
affected_services=affected_services or []
)
self._save_incident(incident)
self.logger.warning(f"创建事故: {incident_id} - {title}")
return incident_id
}
func update_incident(
incident_id: str,
status: Optional[str] = null,
root_cause: Optional[str] = null,
resolution_steps: Optional[List[str]] = null
) {
"""更新事故"""
incident = self.get_incident(incident_id)
if not incident {
raise ValueError(f"事故不存在: {incident_id}")
}
if status {
incident.status = status
}
if root_cause {
incident.root_cause = root_cause
}
if resolution_steps {
incident.resolution_steps = resolution_steps
}
incident.updated_at = datetime.now()
if status == "resolved" {
incident.resolved_at = datetime.now()
}
self._save_incident(incident)
self.logger.info(f"更新事故: {incident_id}")
}
func _save_incident(incident: Incident) {
"""保存事故"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO incidents (
id, title, description, severity, status,
created_at, updated_at, resolved_at,
affected_services, root_cause, resolution_steps
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
incident.id,
incident.title,
incident.description,
incident.severity.value,
incident.status,
incident.created_at.isoformat(),
incident.updated_at.isoformat(),
incident.resolved_at.isoformat() if incident.resolved_at else null,
json.dumps(incident.affected_services) if incident.affected_services else null,
incident.root_cause,
json.dumps(incident.resolution_steps) if incident.resolution_steps else null
))
conn.commit()
conn.close()
}
func get_incident(incident_id: str) -> Optional[Incident] {
"""获取事故详情"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT id, title, description, severity, status,
created_at, updated_at, resolved_at,
affected_services, root_cause, resolution_steps
FROM incidents
WHERE id = ?
""", (incident_id,))
row = cursor.fetchone()
conn.close()
if not row {
return null
}
(
id, title, description, severity, status,
created_at, updated_at, resolved_at,
affected_services, root_cause, resolution_steps
) = row
return Incident(
id=id,
title=title,
description=description,
severity=IncidentSeverity(severity),
status=status,
created_at=datetime.fromisoformat(created_at),
updated_at=datetime.fromisoformat(updated_at),
resolved_at=datetime.fromisoformat(resolved_at) if resolved_at else null,
affected_services=json.loads(affected_services) if affected_services else [],
root_cause=root_cause,
resolution_steps=json.loads(resolution_steps) if resolution_steps else []
)
}
func get_open_incidents() -> List[Incident] {
"""获取未解决的事故"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT id, title, description, severity, status,
created_at, updated_at, resolved_at,
affected_services, root_cause, resolution_steps
FROM incidents
WHERE status != 'resolved'
ORDER BY created_at DESC
""")
incidents = []
for row in cursor.fetchall() {
(
id, title, description, severity, status,
created_at, updated_at, resolved_at,
affected_services, root_cause, resolution_steps
) = row
incident = Incident(
id=id,
title=title,
description=description,
severity=IncidentSeverity(severity),
status=status,
created_at=datetime.fromisoformat(created_at),
updated_at=datetime.fromisoformat(updated_at),
resolved_at=datetime.fromisoformat(resolved_at) if resolved_at else null,
affected_services=json.loads(affected_services) if affected_services else [],
root_cause=root_cause,
resolution_steps=json.loads(resolution_steps) if resolution_steps else []
)
incidents.append(incident)
}
conn.close()
return incidents
}
}
class AutoRecovery {
func __init__(health_monitor: HealthMonitor, incident_manager: IncidentManager) {
self.health_monitor = health_monitor
self.incident_manager = incident_manager
self.recovery_actions = {}
self.logger = logging.getLogger("AutoRecovery")
}
func register_recovery_action(service_name: str, action: Callable) {
"""注册恢复动作"""
self.recovery_actions[service_name] = action
self.logger.info(f"注册恢复动作: {service_name}")
}
func attempt_recovery(service_name: str) -> bool {
"""尝试自动恢复"""
if service_name not in self.recovery_actions {
self.logger.warning(f"没有为服务 {service_name} 注册恢复动作")
return false
}
try {
self.logger.info(f"尝试恢复服务: {service_name}")
recovery_action = self.recovery_actions[service_name]
result = recovery_action()
if result {
self.logger.info(f"服务 {service_name} 恢复成功")
return true
} else {
self.logger.error(f"服务 {service_name} 恢复失败")
return false
}
} catch Exception as e {
self.logger.error(f"恢复服务 {service_name} 时出错: {e}")
return false
}
}
func check_and_recover() {
"""检查并尝试恢复"""
current_health = self.health_monitor.get_current_health()
for service_name, health_check in current_health.items() {
if health_check.status == HealthStatus.CRITICAL {
self.logger.warning(f"检测到服务 {service_name} 状态异常")
# 尝试自动恢复
recovery_success = self.attempt_recovery(service_name)
if not recovery_success {
# Escalate to an incident when recovery fails
incident_id = self.incident_manager.create_incident(
title=f"Service {service_name} failure",
description=health_check.message,
severity=IncidentSeverity.HIGH,
affected_services=[service_name]
)
self.logger.error(f"自动恢复失败,已创建事故: {incident_id}")
}
}
}
}
}
# Predefined health check functions
class CommonHealthChecks {
@staticmethod
func check_disk_space(threshold: float = 90.0) -> Dict[str, Any] {
"""检查磁盘空间"""
disk_usage = psutil.disk_usage('/')
usage_percent = (disk_usage.used / disk_usage.total) * 100
if usage_percent > threshold {
return {
'status': 'critical',
'message': f'Disk usage {usage_percent:.1f}% exceeds threshold {threshold}%',
'details': {
'usage_percent': usage_percent,
'free_space_gb': disk_usage.free / (1024**3),
'total_space_gb': disk_usage.total / (1024**3)
}
}
} else {
return {
'status': 'healthy',
'message': f'Disk usage normal: {usage_percent:.1f}%',
'details': {
'usage_percent': usage_percent,
'free_space_gb': disk_usage.free / (1024**3)
}
}
}
}
@staticmethod
func check_memory_usage(threshold: float = 85.0) -> Dict[str, Any] {
"""检查内存使用"""
memory = psutil.virtual_memory()
if memory.percent > threshold {
return {
'status': 'critical',
'message': f'Memory usage {memory.percent:.1f}% exceeds threshold {threshold}%',
'details': {
'usage_percent': memory.percent,
'available_gb': memory.available / (1024**3),
'total_gb': memory.total / (1024**3)
}
}
} else {
return {
'status': 'healthy',
'message': f'Memory usage normal: {memory.percent:.1f}%',
'details': {
'usage_percent': memory.percent,
'available_gb': memory.available / (1024**3)
}
}
}
}
@staticmethod
func check_cpu_usage(threshold: float = 80.0) -> Dict[str, Any] {
"""检查CPU使用率"""
cpu_percent = psutil.cpu_percent(interval=1)
if cpu_percent > threshold {
return {
'status': 'warning',
'message': f'CPU usage {cpu_percent:.1f}% exceeds threshold {threshold}%',
'details': {
'usage_percent': cpu_percent,
'cpu_count': psutil.cpu_count()
}
}
} else {
return {
'status': 'healthy',
'message': f'CPU usage normal: {cpu_percent:.1f}%',
'details': {
'usage_percent': cpu_percent
}
}
}
}
@staticmethod
func check_process_running(process_name: str) -> Dict[str, Any] {
"""检查进程是否运行"""
for proc in psutil.process_iter(['pid', 'name']) {
if proc.info['name'] == process_name {
return {
'status': 'healthy',
'message': f'Process {process_name} is running',
'details': {
'pid': proc.info['pid']
}
}
}
}
return {
'status': 'critical',
'message': f'Process {process_name} is not running',
'details': {}
}
}
@staticmethod
func check_http_endpoint(url: str, timeout: int = 10) -> Dict[str, Any] {
"""检查HTTP端点"""
try {
start_time = time.time()
response = requests.get(url, timeout=timeout)
response_time = time.time() - start_time
if response.status_code == 200 {
return {
'status': 'healthy',
'message': f'HTTP endpoint {url} responded normally',
'details': {
'status_code': response.status_code,
'response_time': response_time
}
}
} else {
return {
'status': 'warning',
'message': f'HTTP endpoint {url} returned status code {response.status_code}',
'details': {
'status_code': response.status_code,
'response_time': response_time
}
}
}
} catch requests.exceptions.Timeout {
return {
'status': 'critical',
'message': f'HTTP endpoint {url} timed out',
'details': {'timeout': timeout}
}
} catch Exception as e {
return {
'status': 'critical',
'message': f'HTTP endpoint {url} check failed: {str(e)}',
'details': {'error': str(e)}
}
}
}
}
# Predefined recovery actions
class CommonRecoveryActions {
@staticmethod
func restart_service(service_name: str) -> bool {
"""重启服务"""
try {
# Linux (systemd)
if os.name == 'posix' {
result = subprocess.run(
['systemctl', 'restart', service_name],
capture_output=true,
text=true
)
return result.returncode == 0
}
# Windows (Service Control Manager)
elif os.name == 'nt' {
result = subprocess.run(
['sc', 'stop', service_name],
capture_output=true,
text=true
)
time.sleep(2)
result = subprocess.run(
['sc', 'start', service_name],
capture_output=true,
text=true
)
return result.returncode == 0
}
return false
} catch Exception {
return false
}
}
@staticmethod
func clear_cache_directory(cache_dir: str) -> bool {
"""清理缓存目录"""
try {
cache_path = Path(cache_dir)
if cache_path.exists() {
for file_path in cache_path.iterdir() {
if file_path.is_file() {
file_path.unlink()
} elif file_path.is_dir() {
import shutil
shutil.rmtree(file_path)
}
}
return true
} catch Exception {
return false
}
}
@staticmethod
func restart_application(pid_file: str, start_command: str) -> bool {
"""重启应用程序"""
try {
# 停止应用
if os.path.exists(pid_file) {
with open(pid_file, 'r') as f {
pid = int(f.read().strip())
}
try {
os.kill(pid, signal.SIGTERM)
time.sleep(5)
os.kill(pid, signal.SIGKILL)  # force-kill if SIGTERM was ignored
} catch ProcessLookupError {
pass  # process already exited
}
}
# Start the application
subprocess.Popen(start_command, shell=true)
time.sleep(3)
return true
} catch Exception {
return false
}
}
}
# Usage example
if __name__ == "__main__" {
# Create the health monitor
health_monitor = HealthMonitor()
# Register health checks
health_monitor.register_health_check(
"disk_space",
lambda: CommonHealthChecks.check_disk_space(90.0)
)
health_monitor.register_health_check(
"memory_usage",
lambda: CommonHealthChecks.check_memory_usage(85.0)
)
health_monitor.register_health_check(
"cpu_usage",
lambda: CommonHealthChecks.check_cpu_usage(80.0)
)
health_monitor.register_health_check(
"web_service",
lambda: CommonHealthChecks.check_http_endpoint("http://localhost:8080/health")
)
# Create the incident manager
incident_manager = IncidentManager()
# Create the auto-recovery handler
auto_recovery = AutoRecovery(health_monitor, incident_manager)
# Register recovery actions
auto_recovery.register_recovery_action(
"web_service",
lambda: CommonRecoveryActions.restart_service("nginx")
)
# Start monitoring
health_monitor.start_monitoring(30)  # check every 30 seconds
try {
# Let the monitor run for a while
time.sleep(120)
# Inspect the current health status
current_health = health_monitor.get_current_health()
print("当前健康状态:")
for name, health in current_health.items() {
print(f" {name}: {health.status.value} - {health.message}")
}
# Check and attempt recovery
auto_recovery.check_and_recover()
# List open incidents
open_incidents = incident_manager.get_open_incidents()
print(f"\n未解决的事故数量: {len(open_incidents)}")
for incident in open_incidents {
print(f" {incident.id}: {incident.title} ({incident.severity.value})")
}
} finally {
health_monitor.stop_monitoring()
}
}
7.6 Chapter Summary
Key Takeaways
Production deployment
- Environment configuration management and validation
- Application server lifecycle management
- Security and maintainability of configuration files
Containerized deployment (a minimal Compose sketch follows this list)
- Docker image building and management
- Container runtime configuration
- Docker Compose orchestration
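As a reference point for these items, here is a minimal, hypothetical docker-compose.yml sketch that combines a resource limit with a container health check; the service name, port, and /health endpoint are assumptions rather than the chapter's actual project:
services:
  app:
    build: .
    ports:
      - "8080:8080"
    deploy:
      resources:
        limits:
          cpus: "1.0"
          memory: 512M
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
Note that deploy.resources limits are honored by Compose v2 and Swarm; older Compose file versions used mem_limit and cpus at the service level instead.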
Monitoring and logging (see the history-analysis sketch after this list)
- System metrics collection and storage
- Alert rule configuration and notification
- Structured logging and analysis
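The HealthMonitor built earlier in this chapter already stores everything needed for simple trend analysis; a minimal sketch, assuming the monitor has been running long enough to accumulate history:
from datetime import datetime, timedelta
monitor = HealthMonitor()
# Failure ratio of the "disk_space" check over the last hour
history = monitor.get_health_history(
"disk_space",
start_time=datetime.now() - timedelta(hours=1),
end_time=datetime.now()
)
if history {
failures = [h for h in history if h.status != HealthStatus.HEALTHY]
print(f"disk_space failure ratio: {len(failures) / len(history):.1%}")
}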
Performance optimization (a timing sketch follows this list)
- Performance profiling and bottleneck identification
- Resource usage optimization
- Asynchronous processing optimization
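Bottleneck identification starts with measurement; a minimal timing-wrapper sketch (the timed decorator is an illustration, not part of the chapter's toolkit):
import functools
import time
func timed(fn) {
"""Log how long each call to fn takes; a rough first-pass profiler."""
@functools.wraps(fn)
func wrapper(*args, **kwargs) {
start = time.perf_counter()
try {
return fn(*args, **kwargs)
} finally {
print(f"{fn.__name__} took {time.perf_counter() - start:.4f}s")
}
}
return wrapper
}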
Troubleshooting (an incident-resolution sketch follows this list)
- Health checks and monitoring
- Incident management workflow
- Automatic recovery mechanisms
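The usage example above covers detection and escalation; closing the loop with IncidentManager looks roughly like this (the ID, root cause, and steps are placeholders):
incident_manager.update_incident(
"INC-20240101120000",  # placeholder incident ID
status="resolved",
root_cause="nginx worker pool exhausted",  # hypothetical finding
resolution_steps=["Restarted nginx", "Raised worker_connections"]
)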
Best Practices
Deployment strategy (a blue-green sketch follows this list)
- Use blue-green deployments or rolling updates
- Apply configuration management and version control
- Establish a rollback mechanism
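A minimal blue-green switch sketch built on this section's health checks; switch_upstream and the internal hostnames are hypothetical helpers, not part of the chapter's code:
func blue_green_deploy(new_color: str) -> bool {
"""Route traffic to new_color only after its health endpoint passes."""
result = CommonHealthChecks.check_http_endpoint(
f"http://{new_color}.internal:8080/health"
)
if result['status'] != 'healthy' {
return false  # keep traffic on the currently active color
}
return switch_upstream(new_color)  # hypothetical load-balancer helper
}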
Monitoring strategy (a tiered-threshold sketch follows this list)
- Build a multi-layered monitoring system
- Set sensible alert thresholds
- Implement log aggregation and analysis
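"Sensible thresholds" usually means more than one level; a tiered variant of the memory check from this section, as a sketch:
func check_memory_tiered(warn: float = 75.0, crit: float = 90.0) -> Dict[str, Any] {
"""Report a warning before critical so alerts can escalate gradually."""
memory = psutil.virtual_memory()
if memory.percent > crit {
status = 'critical'
} elif memory.percent > warn {
status = 'warning'
} else {
status = 'healthy'
}
return {
'status': status,
'message': f'Memory usage: {memory.percent:.1f}%',
'details': {'usage_percent': memory.percent}
}
}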
Performance strategy
- Profile performance regularly
- Optimize critical paths and hot code
- Configure resource limits appropriately
Operations strategy
- Establish standardized operations procedures
- Automate routine operations
- Run disaster recovery drills regularly
Exercises
Deployment practice
- Create a complete deployment configuration for an AI Script application
- Implement a blue-green deployment strategy
- Configure load balancing and health checks
Monitoring practice
- Design an application monitoring metrics system
- Implement custom alert rules
- Configure multi-channel notifications
Performance optimization
- Analyze application performance bottlenecks
- Optimize database query performance
- Implement a caching strategy
Troubleshooting
- Design a failure recovery workflow
- Implement automatic failure detection
- Write an incident response runbook
Having worked through this chapter, you now have a complete picture of deploying and operating AI Script applications, and can build a stable, efficient, and observable production environment.