本章概述
本章将详细介绍Scrapy项目的部署与运维,包括生产环境部署、监控系统、日志管理、性能优化、故障排除等内容。通过本章学习,你将掌握如何将Scrapy项目部署到生产环境并进行有效的运维管理。
学习目标
- 掌握Scrapy项目的部署方法
- 学会配置生产环境
- 了解监控和日志管理
- 掌握性能优化技巧
- 学会故障排除和维护
1. 生产环境部署
1.1 部署准备
# 1. 部署准备
print("🚀 Scrapy生产环境部署:")
import os
import sys
import subprocess
import configparser
from pathlib import Path
class DeploymentManager:
"""
部署管理器
"""
def __init__(self, project_path):
self.project_path = Path(project_path)
self.config = configparser.ConfigParser()
self.load_config()
def load_config(self):
"""
加载部署配置
"""
config_file = self.project_path / 'deploy.ini'
if config_file.exists():
self.config.read(config_file)
else:
# 创建默认配置
self.create_default_config(config_file)
def create_default_config(self, config_file):
"""
创建默认部署配置
"""
self.config['DEFAULT'] = {
'project_name': 'myspider',
'python_version': '3.8',
'scrapy_version': '2.5.0'
}
self.config['production'] = {
'host': 'localhost',
'port': '6800',
'log_level': 'INFO',
'concurrent_requests': '16',
'download_delay': '1',
'autothrottle_enabled': 'True'
}
self.config['database'] = {
'host': 'localhost',
'port': '5432',
'name': 'scrapy_db',
'user': 'scrapy_user',
'password': 'your_password'
}
self.config['redis'] = {
'host': 'localhost',
'port': '6379',
'db': '0',
'password': ''
}
with open(config_file, 'w') as f:
self.config.write(f)
print(f"创建默认配置文件: {config_file}")
def check_environment(self):
"""
检查部署环境
"""
print("\n🔍 检查部署环境:")
checks = {
'Python版本': self.check_python_version(),
'Scrapy安装': self.check_scrapy_installation(),
'依赖包': self.check_dependencies(),
'系统资源': self.check_system_resources(),
'网络连接': self.check_network_connectivity()
}
for check_name, result in checks.items():
status = "✅" if result['success'] else "❌"
print(f" {status} {check_name}: {result['message']}")
return all(check['success'] for check in checks.values())
def check_python_version(self):
"""
检查Python版本
"""
try:
version = sys.version_info
required_version = tuple(map(int, self.config['DEFAULT']['python_version'].split('.')))
if version[:2] >= required_version:
return {
'success': True,
'message': f"Python {version.major}.{version.minor}.{version.micro}"
}
else:
return {
'success': False,
'message': f"需要Python {required_version[0]}.{required_version[1]}+"
}
except Exception as e:
return {'success': False, 'message': str(e)}
def check_scrapy_installation(self):
"""
检查Scrapy安装
"""
try:
import scrapy
return {
'success': True,
'message': f"Scrapy {scrapy.__version__}"
}
except ImportError:
return {
'success': False,
'message': "Scrapy未安装"
}
def check_dependencies(self):
"""
检查依赖包
"""
try:
requirements_file = self.project_path / 'requirements.txt'
if not requirements_file.exists():
return {
'success': False,
'message': "requirements.txt文件不存在"
}
# 检查已安装的包
result = subprocess.run(
[sys.executable, '-m', 'pip', 'check'],
capture_output=True,
text=True
)
if result.returncode == 0:
return {
'success': True,
'message': "所有依赖包正常"
}
else:
return {
'success': False,
'message': f"依赖包问题: {result.stderr}"
}
except Exception as e:
return {'success': False, 'message': str(e)}
def check_system_resources(self):
"""
检查系统资源
"""
try:
import psutil
# 检查内存
memory = psutil.virtual_memory()
memory_gb = memory.total / (1024**3)
# 检查磁盘空间
disk = psutil.disk_usage('/')
disk_free_gb = disk.free / (1024**3)
if memory_gb >= 2 and disk_free_gb >= 5:
return {
'success': True,
'message': f"内存: {memory_gb:.1f}GB, 磁盘: {disk_free_gb:.1f}GB可用"
}
else:
return {
'success': False,
'message': f"资源不足 - 内存: {memory_gb:.1f}GB, 磁盘: {disk_free_gb:.1f}GB"
}
except ImportError:
return {
'success': False,
'message': "psutil未安装,无法检查系统资源"
}
except Exception as e:
return {'success': False, 'message': str(e)}
def check_network_connectivity(self):
"""
检查网络连接
"""
try:
import requests
# 测试网络连接
response = requests.get('https://httpbin.org/get', timeout=10)
if response.status_code == 200:
return {
'success': True,
'message': "网络连接正常"
}
else:
return {
'success': False,
'message': f"网络连接异常: {response.status_code}"
}
except Exception as e:
return {'success': False, 'message': f"网络连接失败: {str(e)}"}
def create_requirements_file(self):
"""
创建requirements.txt文件
"""
requirements = [
f"Scrapy=={self.config['DEFAULT']['scrapy_version']}",
"scrapy-redis>=0.6.8",
"pymongo>=3.12.0",
"psycopg2-binary>=2.8.6",
"redis>=3.5.3",
"requests>=2.25.1",
"lxml>=4.6.3",
"Pillow>=8.2.0",
"selenium>=3.141.0",
"psutil>=5.8.0",
"scrapyd>=1.2.1",
"scrapyd-client>=1.2.0"
]
requirements_file = self.project_path / 'requirements.txt'
with open(requirements_file, 'w') as f:
f.write('\n'.join(requirements))
print(f"创建requirements.txt文件: {requirements_file}")
def create_dockerfile(self):
"""
创建Dockerfile
"""
dockerfile_content = f"""
# 使用官方Python镜像
FROM python:{self.config['DEFAULT']['python_version']}-slim
# 设置工作目录
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \\
gcc \\
g++ \\
libxml2-dev \\
libxslt1-dev \\
libffi-dev \\
libssl-dev \\
zlib1g-dev \\
libjpeg-dev \\
libpng-dev \\
&& rm -rf /var/lib/apt/lists/*
# 复制requirements文件
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制项目文件
COPY . .
# 暴露端口
EXPOSE 6800
# 启动命令
CMD ["scrapyd"]
"""
dockerfile_path = self.project_path / 'Dockerfile'
with open(dockerfile_path, 'w') as f:
f.write(dockerfile_content.strip())
print(f"创建Dockerfile: {dockerfile_path}")
def create_docker_compose(self):
"""
创建docker-compose.yml文件
"""
compose_content = f"""
version: '3.8'
services:
scrapy:
build: .
ports:
- "{self.config['production']['port']}:6800"
environment:
- SCRAPY_SETTINGS_MODULE=myproject.settings
volumes:
- ./logs:/app/logs
- ./data:/app/data
depends_on:
- redis
- postgres
restart: unless-stopped
redis:
image: redis:6-alpine
ports:
- "{self.config['redis']['port']}:6379"
volumes:
- redis_data:/data
restart: unless-stopped
postgres:
image: postgres:13-alpine
environment:
- POSTGRES_DB={self.config['database']['name']}
- POSTGRES_USER={self.config['database']['user']}
- POSTGRES_PASSWORD={self.config['database']['password']}
ports:
- "{self.config['database']['port']}:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
scrapyd-web:
image: scrapyd-web:latest
ports:
- "5000:5000"
environment:
- SCRAPYD_SERVERS=scrapy:6800
depends_on:
- scrapy
restart: unless-stopped
volumes:
redis_data:
postgres_data:
"""
compose_path = self.project_path / 'docker-compose.yml'
with open(compose_path, 'w') as f:
f.write(compose_content.strip())
print(f"创建docker-compose.yml: {compose_path}")
# 使用示例
deployment_manager = DeploymentManager('/path/to/your/scrapy/project')
# 检查环境
if deployment_manager.check_environment():
print("\n✅ 环境检查通过,可以开始部署")
# 创建部署文件
deployment_manager.create_requirements_file()
deployment_manager.create_dockerfile()
deployment_manager.create_docker_compose()
else:
print("\n❌ 环境检查失败,请解决问题后重试")
print("部署准备完成!")
1.2 Scrapyd部署
# 2. Scrapyd部署
print("\n🔧 Scrapyd部署配置:")
import os
import sys
import json
import requests
import tarfile
import tempfile
from datetime import datetime
class ScrapydDeployer:
"""
Scrapyd部署器
"""
def __init__(self, scrapyd_url='http://localhost:6800'):
self.scrapyd_url = scrapyd_url.rstrip('/')
self.session = requests.Session()
def check_scrapyd_status(self):
"""
检查Scrapyd状态
"""
try:
response = self.session.get(f"{self.scrapyd_url}/daemonstatus.json")
if response.status_code == 200:
data = response.json()
print(f"✅ Scrapyd运行正常")
print(f" - 状态: {data['status']}")
print(f" - 运行中任务: {data['running']}")
print(f" - 等待中任务: {data['pending']}")
print(f" - 已完成任务: {data['finished']}")
return True
else:
print(f"❌ Scrapyd连接失败: {response.status_code}")
return False
except Exception as e:
print(f"❌ Scrapyd连接错误: {str(e)}")
return False
def list_projects(self):
"""
列出所有项目
"""
try:
response = self.session.get(f"{self.scrapyd_url}/listprojects.json")
if response.status_code == 200:
data = response.json()
projects = data['projects']
print(f"📋 已部署项目 ({len(projects)}个):")
for project in projects:
print(f" - {project}")
return projects
else:
print(f"❌ 获取项目列表失败: {response.status_code}")
return []
except Exception as e:
print(f"❌ 获取项目列表错误: {str(e)}")
return []
def list_spiders(self, project):
"""
列出项目中的爬虫
"""
try:
response = self.session.get(
f"{self.scrapyd_url}/listspiders.json",
params={'project': project}
)
if response.status_code == 200:
data = response.json()
spiders = data['spiders']
print(f"🕷️ 项目 '{project}' 中的爬虫 ({len(spiders)}个):")
for spider in spiders:
print(f" - {spider}")
return spiders
else:
print(f"❌ 获取爬虫列表失败: {response.status_code}")
return []
except Exception as e:
print(f"❌ 获取爬虫列表错误: {str(e)}")
return []
def deploy_project(self, project_name, project_path, version=None):
"""
部署项目到Scrapyd
"""
if version is None:
version = datetime.now().strftime("%Y%m%d_%H%M%S")
print(f"📦 开始部署项目 '{project_name}' 版本 '{version}'")
try:
# 创建项目包
egg_path = self.create_egg(project_path, project_name, version)
# 上传到Scrapyd
with open(egg_path, 'rb') as f:
files = {'egg': f}
data = {
'project': project_name,
'version': version
}
response = self.session.post(
f"{self.scrapyd_url}/addversion.json",
files=files,
data=data
)
if response.status_code == 200:
result = response.json()
if result['status'] == 'ok':
print(f"✅ 项目部署成功")
print(f" - 项目: {project_name}")
print(f" - 版本: {version}")
print(f" - 爬虫数量: {result['spiders']}")
return True
else:
print(f"❌ 部署失败: {result.get('message', '未知错误')}")
return False
else:
print(f"❌ 部署请求失败: {response.status_code}")
return False
except Exception as e:
print(f"❌ 部署过程出错: {str(e)}")
return False
finally:
# 清理临时文件
if 'egg_path' in locals() and os.path.exists(egg_path):
os.remove(egg_path)
def create_egg(self, project_path, project_name, version):
"""
创建项目egg包
"""
import subprocess
import tempfile
# 创建临时目录
with tempfile.TemporaryDirectory() as temp_dir:
egg_path = os.path.join(temp_dir, f"{project_name}-{version}.egg")
# 使用scrapyd-deploy创建egg包
cmd = [
sys.executable, '-m', 'scrapyd_client.deploy',
'--build-egg', egg_path,
'--project', project_name
]
result = subprocess.run(
cmd,
cwd=project_path,
capture_output=True,
text=True
)
if result.returncode == 0:
# 复制到永久位置
final_egg_path = f"{project_name}-{version}.egg"
import shutil
shutil.copy2(egg_path, final_egg_path)
return final_egg_path
else:
raise Exception(f"创建egg包失败: {result.stderr}")
def schedule_spider(self, project, spider, **kwargs):
"""
调度爬虫运行
"""
data = {
'project': project,
'spider': spider
}
data.update(kwargs)
try:
response = self.session.post(
f"{self.scrapyd_url}/schedule.json",
data=data
)
if response.status_code == 200:
result = response.json()
if result['status'] == 'ok':
job_id = result['jobid']
print(f"✅ 爬虫调度成功")
print(f" - 项目: {project}")
print(f" - 爬虫: {spider}")
print(f" - 任务ID: {job_id}")
return job_id
else:
print(f"❌ 调度失败: {result.get('message', '未知错误')}")
return None
else:
print(f"❌ 调度请求失败: {response.status_code}")
return None
except Exception as e:
print(f"❌ 调度过程出错: {str(e)}")
return None
def cancel_job(self, project, job_id):
"""
取消任务
"""
try:
response = self.session.post(
f"{self.scrapyd_url}/cancel.json",
data={
'project': project,
'job': job_id
}
)
if response.status_code == 200:
result = response.json()
if result['status'] == 'ok':
print(f"✅ 任务取消成功: {job_id}")
return True
else:
print(f"❌ 取消失败: {result.get('message', '未知错误')}")
return False
else:
print(f"❌ 取消请求失败: {response.status_code}")
return False
except Exception as e:
print(f"❌ 取消过程出错: {str(e)}")
return False
def get_job_status(self, project, job_id):
"""
获取任务状态
"""
try:
# 获取运行中的任务
response = self.session.get(f"{self.scrapyd_url}/listjobs.json",
params={'project': project})
if response.status_code == 200:
data = response.json()
# 检查各个状态
for status in ['running', 'pending', 'finished']:
for job in data.get(status, []):
if job['id'] == job_id:
return {
'status': status,
'spider': job['spider'],
'start_time': job.get('start_time'),
'end_time': job.get('end_time')
}
return {'status': 'not_found'}
else:
print(f"❌ 获取任务状态失败: {response.status_code}")
return None
except Exception as e:
print(f"❌ 获取任务状态错误: {str(e)}")
return None
def delete_project(self, project):
"""
删除项目
"""
try:
response = self.session.post(
f"{self.scrapyd_url}/delproject.json",
data={'project': project}
)
if response.status_code == 200:
result = response.json()
if result['status'] == 'ok':
print(f"✅ 项目删除成功: {project}")
return True
else:
print(f"❌ 删除失败: {result.get('message', '未知错误')}")
return False
else:
print(f"❌ 删除请求失败: {response.status_code}")
return False
except Exception as e:
print(f"❌ 删除过程出错: {str(e)}")
return False
# 使用示例
deployer = ScrapydDeployer('http://localhost:6800')
# 检查Scrapyd状态
if deployer.check_scrapyd_status():
# 列出现有项目
projects = deployer.list_projects()
# 部署新项目
if deployer.deploy_project('myproject', '/path/to/project'):
# 列出爬虫
spiders = deployer.list_spiders('myproject')
# 调度爬虫
if spiders:
job_id = deployer.schedule_spider('myproject', spiders[0])
if job_id:
# 检查任务状态
import time
time.sleep(2)
status = deployer.get_job_status('myproject', job_id)
print(f"任务状态: {status}")
print("Scrapyd部署完成!")
1.3 Docker部署
# 3. Docker部署
print("\n🐳 Docker部署管理:")
import docker
import yaml
from pathlib import Path
class DockerDeployer:
"""
Docker部署管理器
"""
def __init__(self):
try:
self.client = docker.from_env()
print("✅ Docker客户端连接成功")
except Exception as e:
print(f"❌ Docker客户端连接失败: {str(e)}")
self.client = None
def build_image(self, project_path, image_name, tag='latest'):
"""
构建Docker镜像
"""
if not self.client:
return False
try:
print(f"🔨 开始构建镜像: {image_name}:{tag}")
# 构建镜像
image, logs = self.client.images.build(
path=str(project_path),
tag=f"{image_name}:{tag}",
rm=True,
forcerm=True
)
# 输出构建日志
for log in logs:
if 'stream' in log:
print(log['stream'].strip())
print(f"✅ 镜像构建成功: {image.id[:12]}")
return True
except Exception as e:
print(f"❌ 镜像构建失败: {str(e)}")
return False
def run_container(self, image_name, container_name, ports=None, volumes=None, environment=None):
"""
运行容器
"""
if not self.client:
return None
try:
print(f"🚀 启动容器: {container_name}")
# 停止并删除同名容器
self.stop_container(container_name)
# 运行新容器
container = self.client.containers.run(
image_name,
name=container_name,
ports=ports or {},
volumes=volumes or {},
environment=environment or {},
detach=True,
restart_policy={"Name": "unless-stopped"}
)
print(f"✅ 容器启动成功: {container.id[:12]}")
return container
except Exception as e:
print(f"❌ 容器启动失败: {str(e)}")
return None
def stop_container(self, container_name):
"""
停止并删除容器
"""
if not self.client:
return False
try:
# 查找容器
containers = self.client.containers.list(
all=True,
filters={'name': container_name}
)
for container in containers:
if container.name == container_name:
print(f"🛑 停止容器: {container_name}")
container.stop()
container.remove()
return True
return True
except Exception as e:
print(f"❌ 停止容器失败: {str(e)}")
return False
def list_containers(self):
"""
列出所有容器
"""
if not self.client:
return []
try:
containers = self.client.containers.list(all=True)
print("📋 容器列表:")
for container in containers:
status = container.status
ports = container.ports
print(f" - {container.name}: {status}")
if ports:
for port, bindings in ports.items():
if bindings:
for binding in bindings:
print(f" 端口映射: {binding['HostPort']} -> {port}")
return containers
except Exception as e:
print(f"❌ 获取容器列表失败: {str(e)}")
return []
def get_container_logs(self, container_name, tail=100):
"""
获取容器日志
"""
if not self.client:
return None
try:
container = self.client.containers.get(container_name)
logs = container.logs(tail=tail, timestamps=True)
print(f"📄 容器 '{container_name}' 日志 (最近{tail}行):")
print(logs.decode('utf-8'))
return logs
except Exception as e:
print(f"❌ 获取容器日志失败: {str(e)}")
return None
def deploy_with_compose(self, compose_file_path):
"""
使用Docker Compose部署
"""
try:
import subprocess
print(f"🐙 使用Docker Compose部署: {compose_file_path}")
# 停止现有服务
subprocess.run([
'docker-compose', '-f', str(compose_file_path), 'down'
], check=False)
# 启动服务
result = subprocess.run([
'docker-compose', '-f', str(compose_file_path), 'up', '-d'
], capture_output=True, text=True)
if result.returncode == 0:
print("✅ Docker Compose部署成功")
# 显示服务状态
subprocess.run([
'docker-compose', '-f', str(compose_file_path), 'ps'
])
return True
else:
print(f"❌ Docker Compose部署失败: {result.stderr}")
return False
except Exception as e:
print(f"❌ Docker Compose部署错误: {str(e)}")
return False
def scale_service(self, compose_file_path, service_name, replicas):
"""
扩展服务实例
"""
try:
import subprocess
print(f"📈 扩展服务 '{service_name}' 到 {replicas} 个实例")
result = subprocess.run([
'docker-compose', '-f', str(compose_file_path),
'up', '-d', '--scale', f"{service_name}={replicas}"
], capture_output=True, text=True)
if result.returncode == 0:
print(f"✅ 服务扩展成功")
return True
else:
print(f"❌ 服务扩展失败: {result.stderr}")
return False
except Exception as e:
print(f"❌ 服务扩展错误: {str(e)}")
return False
def monitor_resources(self, container_name):
"""
监控容器资源使用
"""
if not self.client:
return None
try:
container = self.client.containers.get(container_name)
stats = container.stats(stream=False)
# 计算CPU使用率
cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - \
stats['precpu_stats']['cpu_usage']['total_usage']
system_delta = stats['cpu_stats']['system_cpu_usage'] - \
stats['precpu_stats']['system_cpu_usage']
cpu_percent = 0
if system_delta > 0:
cpu_percent = (cpu_delta / system_delta) * 100
# 计算内存使用
memory_usage = stats['memory_stats']['usage']
memory_limit = stats['memory_stats']['limit']
memory_percent = (memory_usage / memory_limit) * 100
print(f"📊 容器 '{container_name}' 资源使用:")
print(f" - CPU: {cpu_percent:.2f}%")
print(f" - 内存: {memory_usage / 1024 / 1024:.1f}MB / {memory_limit / 1024 / 1024:.1f}MB ({memory_percent:.1f}%)")
return {
'cpu_percent': cpu_percent,
'memory_usage': memory_usage,
'memory_limit': memory_limit,
'memory_percent': memory_percent
}
except Exception as e:
print(f"❌ 获取资源使用情况失败: {str(e)}")
return None
# 使用示例
docker_deployer = DockerDeployer()
if docker_deployer.client:
# 构建镜像
if docker_deployer.build_image('/path/to/project', 'myspider'):
# 运行容器
container = docker_deployer.run_container(
'myspider:latest',
'myspider-container',
ports={'6800/tcp': 6800},
volumes={'/path/to/logs': {'bind': '/app/logs', 'mode': 'rw'}},
environment={'SCRAPY_SETTINGS_MODULE': 'myproject.settings'}
)
if container:
# 列出容器
docker_deployer.list_containers()
# 监控资源
import time
time.sleep(5)
docker_deployer.monitor_resources('myspider-container')
# 使用Docker Compose部署
compose_file = Path('/path/to/docker-compose.yml')
if compose_file.exists():
docker_deployer.deploy_with_compose(compose_file)
print("Docker部署完成!")
2. 监控系统
2.1 性能监控
# 4. 性能监控系统
print("\n📊 性能监控系统:")
import time
import psutil
import threading
from collections import deque, defaultdict
from datetime import datetime, timedelta
class PerformanceMonitor:
"""
性能监控器
"""
def __init__(self, max_history=1000):
self.max_history = max_history
self.metrics = defaultdict(lambda: deque(maxlen=max_history))
self.is_monitoring = False
self.monitor_thread = None
# 监控配置
self.monitor_interval = 5 # 秒
self.alert_thresholds = {
'cpu_percent': 80,
'memory_percent': 85,
'disk_percent': 90,
'response_time': 10, # 秒
'error_rate': 5 # 百分比
}
# 统计数据
self.stats = {
'requests_total': 0,
'requests_success': 0,
'requests_failed': 0,
'response_times': deque(maxlen=1000),
'request_timestamps': deque(maxlen=1000),
'errors': deque(maxlen=100)
}
def start_monitoring(self):
"""
开始监控
"""
if self.is_monitoring:
print("⚠️ 监控已在运行中")
return
self.is_monitoring = True
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
print("🔍 性能监控已启动")
def stop_monitoring(self):
"""
停止监控
"""
self.is_monitoring = False
if self.monitor_thread:
self.monitor_thread.join()
print("🛑 性能监控已停止")
def _monitor_loop(self):
"""
监控循环
"""
while self.is_monitoring:
try:
# 收集系统指标
self._collect_system_metrics()
# 收集应用指标
self._collect_application_metrics()
# 检查告警
self._check_alerts()
time.sleep(self.monitor_interval)
except Exception as e:
print(f"❌ 监控过程出错: {str(e)}")
time.sleep(self.monitor_interval)
def _collect_system_metrics(self):
"""
收集系统指标
"""
timestamp = datetime.now()
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
self.metrics['cpu_percent'].append((timestamp, cpu_percent))
# 内存使用率
memory = psutil.virtual_memory()
self.metrics['memory_percent'].append((timestamp, memory.percent))
self.metrics['memory_used'].append((timestamp, memory.used))
self.metrics['memory_available'].append((timestamp, memory.available))
# 磁盘使用率
disk = psutil.disk_usage('/')
disk_percent = (disk.used / disk.total) * 100
self.metrics['disk_percent'].append((timestamp, disk_percent))
self.metrics['disk_used'].append((timestamp, disk.used))
self.metrics['disk_free'].append((timestamp, disk.free))
# 网络IO
net_io = psutil.net_io_counters()
self.metrics['network_bytes_sent'].append((timestamp, net_io.bytes_sent))
self.metrics['network_bytes_recv'].append((timestamp, net_io.bytes_recv))
# 磁盘IO
disk_io = psutil.disk_io_counters()
if disk_io:
self.metrics['disk_read_bytes'].append((timestamp, disk_io.read_bytes))
self.metrics['disk_write_bytes'].append((timestamp, disk_io.write_bytes))
def _collect_application_metrics(self):
"""
收集应用指标
"""
timestamp = datetime.now()
# 计算错误率
if self.stats['requests_total'] > 0:
error_rate = (self.stats['requests_failed'] / self.stats['requests_total']) * 100
self.metrics['error_rate'].append((timestamp, error_rate))
# 计算平均响应时间
if self.stats['response_times']:
avg_response_time = sum(self.stats['response_times']) / len(self.stats['response_times'])
self.metrics['avg_response_time'].append((timestamp, avg_response_time))
# 请求速率
recent_requests = len([t for t in self.stats['request_timestamps']
if timestamp - t < timedelta(minutes=1)])
self.metrics['requests_per_minute'].append((timestamp, recent_requests))
def _check_alerts(self):
"""
检查告警条件
"""
current_time = datetime.now()
for metric_name, threshold in self.alert_thresholds.items():
if metric_name in self.metrics and self.metrics[metric_name]:
latest_value = self.metrics[metric_name][-1][1]
if latest_value > threshold:
self._trigger_alert(metric_name, latest_value, threshold)
def _trigger_alert(self, metric_name, current_value, threshold):
"""
触发告警
"""
alert_message = f"🚨 告警: {metric_name} = {current_value:.2f} (阈值: {threshold})"
print(alert_message)
# 这里可以添加更多告警处理逻辑
# 例如发送邮件、短信、Slack通知等
def record_request(self, response_time, success=True):
"""
记录请求
"""
self.stats['requests_total'] += 1
if success:
self.stats['requests_success'] += 1
else:
self.stats['requests_failed'] += 1
self.stats['response_times'].append(response_time)
self.stats['request_timestamps'].append(datetime.now())
def record_error(self, error_message):
"""
记录错误
"""
timestamp = datetime.now()
self.stats['errors'].append((timestamp, error_message))
def get_current_metrics(self):
"""
获取当前指标
"""
current_metrics = {}
for metric_name, values in self.metrics.items():
if values:
current_metrics[metric_name] = values[-1][1]
return current_metrics
def get_metrics_history(self, metric_name, duration_minutes=60):
"""
获取指标历史数据
"""
if metric_name not in self.metrics:
return []
cutoff_time = datetime.now() - timedelta(minutes=duration_minutes)
return [(timestamp, value) for timestamp, value in self.metrics[metric_name]
if timestamp >= cutoff_time]
def generate_report(self):
"""
生成监控报告
"""
current_metrics = self.get_current_metrics()
print("\n📊 性能监控报告")
print("=" * 50)
print(f"报告时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print()
# 系统指标
print("🖥️ 系统指标:")
if 'cpu_percent' in current_metrics:
print(f" CPU使用率: {current_metrics['cpu_percent']:.1f}%")
if 'memory_percent' in current_metrics:
print(f" 内存使用率: {current_metrics['memory_percent']:.1f}%")
if 'disk_percent' in current_metrics:
print(f" 磁盘使用率: {current_metrics['disk_percent']:.1f}%")
# 应用指标
print("\n🕷️ 应用指标:")
print(f" 总请求数: {self.stats['requests_total']}")
print(f" 成功请求: {self.stats['requests_success']}")
print(f" 失败请求: {self.stats['requests_failed']}")
if 'error_rate' in current_metrics:
print(f" 错误率: {current_metrics['error_rate']:.2f}%")
if 'avg_response_time' in current_metrics:
print(f" 平均响应时间: {current_metrics['avg_response_time']:.2f}秒")
if 'requests_per_minute' in current_metrics:
print(f" 每分钟请求数: {current_metrics['requests_per_minute']:.0f}")
# 最近错误
if self.stats['errors']:
print("\n❌ 最近错误:")
for timestamp, error in list(self.stats['errors'])[-5:]:
print(f" {timestamp.strftime('%H:%M:%S')}: {error}")
print("=" * 50)
# Scrapy监控中间件
class MonitoringMiddleware:
"""
Scrapy监控中间件
"""
def __init__(self):
self.monitor = PerformanceMonitor()
self.monitor.start_monitoring()
def process_request(self, request, spider):
"""
处理请求
"""
request.meta['start_time'] = time.time()
return None
def process_response(self, request, response, spider):
"""
处理响应
"""
start_time = request.meta.get('start_time')
if start_time:
response_time = time.time() - start_time
success = response.status < 400
self.monitor.record_request(response_time, success)
if not success:
self.monitor.record_error(f"HTTP {response.status}: {request.url}")
return response
def process_exception(self, request, exception, spider):
"""
处理异常
"""
self.monitor.record_error(f"Exception: {str(exception)}")
return None
# 使用示例
monitor = PerformanceMonitor()
monitor.start_monitoring()
# 模拟一些请求
import random
for i in range(10):
response_time = random.uniform(0.5, 3.0)
success = random.choice([True, True, True, False]) # 75%成功率
monitor.record_request(response_time, success)
if not success:
monitor.record_error(f"模拟错误 {i}")
time.sleep(0.1)
# 生成报告
time.sleep(2)
monitor.generate_report()
# 获取指标历史
cpu_history = monitor.get_metrics_history('cpu_percent', 5)
print(f"\n📈 CPU使用率历史 (最近5分钟): {len(cpu_history)}个数据点")
monitor.stop_monitoring()
print("性能监控系统演示完成!")
2.2 日志监控
# 5. 日志监控系统
print("\n📝 日志监控系统:")
import time
import logging
import re
import json
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import threading
import queue
class LogMonitor:
"""
日志监控器
"""
def __init__(self, log_file_path=None):
self.log_file_path = log_file_path
self.log_queue = queue.Queue()
self.is_monitoring = False
self.monitor_thread = None
# 日志统计
self.log_stats = {
'total_lines': 0,
'error_count': 0,
'critical_count': 0,
'warning_count': 0,
'info_count': 0,
'debug_count': 0,
'spider_stats': defaultdict(lambda: defaultdict(int)),
'error_patterns': Counter(),
'recent_errors': [],
'performance_metrics': []
}
# 错误模式
self.error_patterns = [
(r'TimeoutError', '请求超时'),
(r'ConnectionError', '连接错误'),
(r'HTTP (\d+)', 'HTTP错误'),
(r'DNSLookupError', 'DNS解析错误'),
(r'TunnelError', '代理隧道错误'),
(r'SSLError', 'SSL证书错误'),
(r'MemoryError', '内存不足'),
(r'KeyError', '键值错误'),
(r'AttributeError', '属性错误'),
(r'ValueError', '值错误')
]
# 性能模式
self.performance_patterns = [
(r'Crawled \((\d+)\) <GET ([^>]+)> \(referer: [^)]*\) \[(\d+)ms\]', 'request_time'),
(r'Spider opened', 'spider_start'),
(r'Spider closed \(([^)]+)\)', 'spider_close'),
(r'Scraped from <(\d+) ([^>]+)>', 'item_scraped')
]
def start_monitoring(self):
"""
开始监控日志
"""
if self.is_monitoring:
print("⚠️ 日志监控已在运行中")
return
self.is_monitoring = True
self.monitor_thread = threading.Thread(target=self._monitor_log_file)
self.monitor_thread.daemon = True
self.monitor_thread.start()
print(f"📝 开始监控日志文件: {self.log_file_path}")
def stop_monitoring(self):
"""
停止监控
"""
self.is_monitoring = False
if self.monitor_thread:
self.monitor_thread.join()
print("🛑 日志监控已停止")
def _monitor_log_file(self):
"""
监控日志文件
"""
if not self.log_file_path:
return
try:
with open(self.log_file_path, 'r', encoding='utf-8') as f:
# 移动到文件末尾
f.seek(0, 2)
while self.is_monitoring:
line = f.readline()
if line:
self._process_log_line(line.strip())
else:
time.sleep(0.1)
except FileNotFoundError:
print(f"❌ 日志文件不存在: {self.log_file_path}")
except Exception as e:
print(f"❌ 监控日志文件出错: {str(e)}")
def _process_log_line(self, line):
"""
处理日志行
"""
self.log_stats['total_lines'] += 1
# 解析日志级别
log_level = self._extract_log_level(line)
if log_level:
self.log_stats[f'{log_level.lower()}_count'] += 1
# 检查错误模式
if log_level in ['ERROR', 'CRITICAL']:
self._analyze_error(line)
# 检查性能模式
self._analyze_performance(line)
# 提取爬虫统计
self._extract_spider_stats(line)
def _extract_log_level(self, line):
"""
提取日志级别
"""
# 匹配日志格式中 "LEVEL:" 形式的级别字段(与下文 formatter 的输出格式一致)
level_pattern = r'\b(DEBUG|INFO|WARNING|ERROR|CRITICAL):'
match = re.search(level_pattern, line)
return match.group(1) if match else None
def _analyze_error(self, line):
"""
分析错误日志
"""
# 记录最近错误
timestamp = datetime.now()
self.log_stats['recent_errors'].append((timestamp, line))
# 保持最近100个错误
if len(self.log_stats['recent_errors']) > 100:
self.log_stats['recent_errors'] = self.log_stats['recent_errors'][-100:]
# 匹配错误模式
for pattern, description in self.error_patterns:
if re.search(pattern, line, re.IGNORECASE):
self.log_stats['error_patterns'][description] += 1
break
def _analyze_performance(self, line):
"""
分析性能日志
"""
for pattern, metric_type in self.performance_patterns:
match = re.search(pattern, line)
if match:
timestamp = datetime.now()
if metric_type == 'request_time':
status_code = int(match.group(1))
url = match.group(2)
response_time = int(match.group(3))
self.log_stats['performance_metrics'].append({
'timestamp': timestamp,
'type': 'request',
'status_code': status_code,
'url': url,
'response_time': response_time
})
elif metric_type == 'spider_start':
self.log_stats['performance_metrics'].append({
'timestamp': timestamp,
'type': 'spider_start'
})
elif metric_type == 'spider_close':
reason = match.group(1)
self.log_stats['performance_metrics'].append({
'timestamp': timestamp,
'type': 'spider_close',
'reason': reason
})
break
def _extract_spider_stats(self, line):
"""
提取爬虫统计信息
"""
# 提取爬虫名称
spider_pattern = r'\[([^\]]+)\]'
spider_match = re.search(spider_pattern, line)
if spider_match:
spider_name = spider_match.group(1)
# 统计不同类型的日志
if 'Scraped from' in line:
self.log_stats['spider_stats'][spider_name]['items_scraped'] += 1
elif 'Crawled' in line:
self.log_stats['spider_stats'][spider_name]['pages_crawled'] += 1
elif 'ERROR' in line:
self.log_stats['spider_stats'][spider_name]['errors'] += 1
def get_error_summary(self, hours=24):
"""
获取错误摘要
"""
cutoff_time = datetime.now() - timedelta(hours=hours)
recent_errors = [
(timestamp, error) for timestamp, error in self.log_stats['recent_errors']
if timestamp >= cutoff_time
]
return {
'total_errors': len(recent_errors),
'error_patterns': dict(self.log_stats['error_patterns']),
'recent_errors': recent_errors[-10:] # 最近10个错误
}
def get_performance_summary(self, hours=24):
"""
获取性能摘要
"""
cutoff_time = datetime.now() - timedelta(hours=hours)
recent_metrics = [
metric for metric in self.log_stats['performance_metrics']
if metric['timestamp'] >= cutoff_time
]
# 计算平均响应时间
request_metrics = [m for m in recent_metrics if m['type'] == 'request']
if request_metrics:
avg_response_time = sum(m['response_time'] for m in request_metrics) / len(request_metrics)
max_response_time = max(m['response_time'] for m in request_metrics)
min_response_time = min(m['response_time'] for m in request_metrics)
else:
avg_response_time = max_response_time = min_response_time = 0
# 统计状态码
status_codes = Counter(m['status_code'] for m in request_metrics)
return {
'total_requests': len(request_metrics),
'avg_response_time': avg_response_time,
'max_response_time': max_response_time,
'min_response_time': min_response_time,
'status_codes': dict(status_codes)
}
def generate_log_report(self):
"""
生成日志报告
"""
print("\n📊 日志监控报告")
print("=" * 50)
print(f"报告时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print()
# 基本统计
print("📈 基本统计:")
print(f" 总日志行数: {self.log_stats['total_lines']}")
print(f" 错误日志: {self.log_stats['error_count']}")
print(f" 警告日志: {self.log_stats['warning_count']}")
print(f" 信息日志: {self.log_stats['info_count']}")
print(f" 调试日志: {self.log_stats['debug_count']}")
# 错误摘要
error_summary = self.get_error_summary()
print(f"\n❌ 错误摘要 (最近24小时):")
print(f" 总错误数: {error_summary['total_errors']}")
if error_summary['error_patterns']:
print(" 错误类型分布:")
for error_type, count in error_summary['error_patterns'].most_common(5):
print(f" - {error_type}: {count}")
# 性能摘要
perf_summary = self.get_performance_summary()
print(f"\n⚡ 性能摘要 (最近24小时):")
print(f" 总请求数: {perf_summary['total_requests']}")
if perf_summary['total_requests'] > 0:
print(f" 平均响应时间: {perf_summary['avg_response_time']:.2f}ms")
print(f" 最大响应时间: {perf_summary['max_response_time']}ms")
print(f" 最小响应时间: {perf_summary['min_response_time']}ms")
if perf_summary['status_codes']:
print(" 状态码分布:")
for status_code, count in perf_summary['status_codes'].items():
print(f" - {status_code}: {count}")
# 爬虫统计
if self.log_stats['spider_stats']:
print(f"\n🕷️ 爬虫统计:")
for spider_name, stats in self.log_stats['spider_stats'].items():
print(f" {spider_name}:")
for stat_name, value in stats.items():
print(f" - {stat_name}: {value}")
print("=" * 50)
# 自定义日志处理器
class ScrapyLogHandler(logging.Handler):
"""
Scrapy日志处理器
"""
def __init__(self, log_monitor):
super().__init__()
self.log_monitor = log_monitor
def emit(self, record):
"""
处理日志记录
"""
try:
log_line = self.format(record)
self.log_monitor._process_log_line(log_line)
except Exception:
self.handleError(record)
# 日志配置
def setup_logging_with_monitoring(log_file_path):
"""
设置带监控的日志配置
"""
# 创建日志监控器
log_monitor = LogMonitor(log_file_path)
# 配置日志格式
formatter = logging.Formatter(
'%(asctime)s [%(name)s] %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 文件处理器
file_handler = logging.FileHandler(log_file_path, encoding='utf-8')
file_handler.setFormatter(formatter)
# 监控处理器
monitor_handler = ScrapyLogHandler(log_monitor)
monitor_handler.setFormatter(formatter)
# 配置根日志器
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(file_handler)
root_logger.addHandler(monitor_handler)
# 启动监控
log_monitor.start_monitoring()
return log_monitor
# 使用示例
log_file = 'scrapy.log'
log_monitor = setup_logging_with_monitoring(log_file)
# 模拟一些日志
logger = logging.getLogger('test_spider')
logger.info("Spider opened")
logger.info("Crawled (200) <GET https://example.com> (referer: None) [150ms]")
logger.info("Scraped from <200 https://example.com>")
logger.warning("Retrying request due to timeout")
logger.error("TimeoutError: Request timed out")
logger.error("ConnectionError: Failed to establish connection")
logger.info("Spider closed (finished)")
# 等待处理
time.sleep(1)
# 生成报告
log_monitor.generate_log_report()
log_monitor.stop_monitoring()
print("日志监控系统演示完成!")
2.3 告警系统
# 6. 告警系统
print("\n🚨 告警系统:")
import time
import smtplib
import json
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
from enum import Enum
class AlertLevel(Enum):
"""告警级别"""
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
class AlertChannel(Enum):
"""告警渠道"""
EMAIL = "email"
SLACK = "slack"
WEBHOOK = "webhook"
SMS = "sms"
class AlertManager:
"""
告警管理器
"""
def __init__(self, config_file='alert_config.json'):
self.config = self.load_config(config_file)
self.alert_history = []
self.suppressed_alerts = set()
# 告警规则
self.alert_rules = {
'high_error_rate': {
'condition': lambda stats: stats.get('error_rate', 0) > 10,
'level': AlertLevel.ERROR,
'message': "错误率过高: {error_rate:.2f}%",
'cooldown': 300 # 5分钟冷却
},
'high_response_time': {
'condition': lambda stats: stats.get('avg_response_time', 0) > 5,
'level': AlertLevel.WARNING,
'message': "响应时间过长: {avg_response_time:.2f}秒",
'cooldown': 180 # 3分钟冷却
},
'spider_failure': {
'condition': lambda stats: stats.get('spider_status') == 'failed',
'level': AlertLevel.CRITICAL,
'message': "爬虫运行失败: {spider_name}",
'cooldown': 60 # 1分钟冷却
},
'memory_usage_high': {
'condition': lambda stats: stats.get('memory_percent', 0) > 90,
'level': AlertLevel.CRITICAL,
'message': "内存使用率过高: {memory_percent:.1f}%",
'cooldown': 300 # 5分钟冷却
},
'disk_space_low': {
'condition': lambda stats: stats.get('disk_percent', 0) > 95,
'level': AlertLevel.CRITICAL,
'message': "磁盘空间不足: {disk_percent:.1f}%",
'cooldown': 600 # 10分钟冷却
}
}
def load_config(self, config_file):
"""
加载告警配置
"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
# 创建默认配置
default_config = {
'email': {
'smtp_server': 'smtp.gmail.com',
'smtp_port': 587,
'username': 'your_email@gmail.com',
'password': 'your_password',
'recipients': ['admin@example.com']
},
'slack': {
'webhook_url': 'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK',
'channel': '#alerts'
},
'webhook': {
'url': 'https://your-webhook-endpoint.com/alerts',
'headers': {'Content-Type': 'application/json'}
},
'enabled_channels': ['email', 'slack'],
'alert_levels': {
'info': ['slack'],
'warning': ['slack'],
'error': ['email', 'slack'],
'critical': ['email', 'slack', 'webhook']
}
}
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(default_config, f, indent=2, ensure_ascii=False)
print(f"创建默认告警配置: {config_file}")
return default_config
def check_alerts(self, stats):
"""
检查告警条件
"""
current_time = datetime.now()
triggered_alerts = []
for rule_name, rule in self.alert_rules.items():
# 检查冷却时间
if self._is_suppressed(rule_name, current_time):
continue
# 检查告警条件
if rule['condition'](stats):
alert = {
'rule_name': rule_name,
'level': rule['level'],
'message': rule['message'].format(**stats),
'timestamp': current_time,
'stats': stats
}
triggered_alerts.append(alert)
# 发送告警
self.send_alert(alert)
# 添加到抑制列表
self._suppress_alert(rule_name, current_time, rule['cooldown'])
return triggered_alerts
def _is_suppressed(self, rule_name, current_time):
"""
检查告警是否被抑制
"""
suppression_key = f"{rule_name}_{current_time.strftime('%Y%m%d%H%M')}"
# 清理过期的抑制
expired_keys = [key for key in self.suppressed_alerts
if self._is_suppression_expired(key, current_time)]
for key in expired_keys:
self.suppressed_alerts.discard(key)
return any(key.startswith(rule_name) for key in self.suppressed_alerts)
def _suppress_alert(self, rule_name, current_time, cooldown_seconds):
"""
抑制告警
"""
expiry_time = current_time + timedelta(seconds=cooldown_seconds)
suppression_key = f"{rule_name}_{expiry_time.strftime('%Y%m%d%H%M%S')}"
self.suppressed_alerts.add(suppression_key)
def _is_suppression_expired(self, suppression_key, current_time):
"""
检查抑制是否过期
"""
try:
expiry_str = suppression_key.split('_')[-1]
expiry_time = datetime.strptime(expiry_str, '%Y%m%d%H%M%S')
return current_time > expiry_time
except (ValueError, IndexError):
return True
def send_alert(self, alert):
"""
发送告警
"""
level = alert['level'].value
enabled_channels = self.config.get('alert_levels', {}).get(level, [])
for channel in enabled_channels:
if channel in self.config.get('enabled_channels', []):
try:
if channel == 'email':
self._send_email_alert(alert)
elif channel == 'slack':
self._send_slack_alert(alert)
elif channel == 'webhook':
self._send_webhook_alert(alert)
print(f"✅ 告警已发送到 {channel}: {alert['message']}")
except Exception as e:
print(f"❌ 发送告警到 {channel} 失败: {str(e)}")
# 记录告警历史
self.alert_history.append(alert)
# 保持最近1000条告警记录
if len(self.alert_history) > 1000:
self.alert_history = self.alert_history[-1000:]
def _send_email_alert(self, alert):
"""
发送邮件告警
"""
email_config = self.config.get('email', {})
if not email_config.get('username') or not email_config.get('recipients'):
raise ValueError("邮件配置不完整")
# 创建邮件
msg = MIMEMultipart()
msg['From'] = email_config['username']
msg['To'] = ', '.join(email_config['recipients'])
msg['Subject'] = f"[{alert['level'].value.upper()}] Scrapy告警: {alert['rule_name']}"
# 邮件内容
body = f"""
告警详情:
- 规则: {alert['rule_name']}
- 级别: {alert['level'].value.upper()}
- 时间: {alert['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}
- 消息: {alert['message']}
系统状态:
{json.dumps(alert['stats'], indent=2, ensure_ascii=False)}
"""
msg.attach(MIMEText(body, 'plain', 'utf-8'))
# 发送邮件
with smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port']) as server:
server.starttls()
server.login(email_config['username'], email_config['password'])
server.send_message(msg)
def _send_slack_alert(self, alert):
"""
发送Slack告警
"""
slack_config = self.config.get('slack', {})
if not slack_config.get('webhook_url'):
raise ValueError("Slack配置不完整")
# 设置颜色
color_map = {
AlertLevel.INFO: 'good',
AlertLevel.WARNING: 'warning',
AlertLevel.ERROR: 'danger',
AlertLevel.CRITICAL: 'danger'
}
# 构建消息
payload = {
'channel': slack_config.get('channel', '#alerts'),
'username': 'Scrapy Monitor',
'icon_emoji': ':spider:',
'attachments': [{
'color': color_map.get(alert['level'], 'danger'),
'title': f"{alert['level'].value.upper()} Alert: {alert['rule_name']}",
'text': alert['message'],
'fields': [
{
'title': '时间',
'value': alert['timestamp'].strftime('%Y-%m-%d %H:%M:%S'),
'short': True
},
{
'title': '级别',
'value': alert['level'].value.upper(),
'short': True
}
],
'footer': 'Scrapy Monitor',
'ts': int(alert['timestamp'].timestamp())
}]
}
# 发送到Slack
response = requests.post(
slack_config['webhook_url'],
json=payload,
timeout=10
)
response.raise_for_status()
def _send_webhook_alert(self, alert):
"""
发送Webhook告警
"""
webhook_config = self.config.get('webhook', {})
if not webhook_config.get('url'):
raise ValueError("Webhook配置不完整")
# 构建载荷
payload = {
'alert_type': 'scrapy_monitor',
'rule_name': alert['rule_name'],
'level': alert['level'].value,
'message': alert['message'],
'timestamp': alert['timestamp'].isoformat(),
'stats': alert['stats']
}
# 发送请求
response = requests.post(
webhook_config['url'],
json=payload,
headers=webhook_config.get('headers', {}),
timeout=10
)
response.raise_for_status()
def get_alert_history(self, hours=24, level=None):
"""
获取告警历史
"""
cutoff_time = datetime.now() - timedelta(hours=hours)
filtered_alerts = [
alert for alert in self.alert_history
if alert['timestamp'] >= cutoff_time
]
if level:
filtered_alerts = [
alert for alert in filtered_alerts
if alert['level'] == level
]
return filtered_alerts
def generate_alert_report(self):
"""
生成告警报告
"""
print("\n🚨 告警系统报告")
print("=" * 50)
print(f"报告时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print()
# 最近24小时告警统计
recent_alerts = self.get_alert_history(24)
if recent_alerts:
print(f"📊 最近24小时告警统计 (共{len(recent_alerts)}条):")
# 按级别统计
level_counts = {}
for alert in recent_alerts:
level = alert['level'].value
level_counts[level] = level_counts.get(level, 0) + 1
for level, count in level_counts.items():
print(f" - {level.upper()}: {count}")
# 最近告警
print(f"\n🔔 最近告警 (最近5条):")
for alert in recent_alerts[-5:]:
print(f" - {alert['timestamp'].strftime('%H:%M:%S')} [{alert['level'].value.upper()}] {alert['message']}")
else:
print("✅ 最近24小时无告警")
# 当前抑制的告警
if self.suppressed_alerts:
print(f"\n🔇 当前抑制的告警规则: {len(self.suppressed_alerts)}个")
print("=" * 50)
# 使用示例
alert_manager = AlertManager()
# 模拟一些统计数据和告警
test_stats = [
{'error_rate': 15, 'avg_response_time': 2, 'memory_percent': 70},
{'error_rate': 5, 'avg_response_time': 8, 'memory_percent': 85},
{'error_rate': 3, 'avg_response_time': 3, 'memory_percent': 95},
{'spider_status': 'failed', 'spider_name': 'test_spider', 'memory_percent': 60}
]
for i, stats in enumerate(test_stats):
print(f"\n检查告警 #{i+1}: {stats}")
alerts = alert_manager.check_alerts(stats)
if alerts:
for alert in alerts:
print(f" 触发告警: {alert['message']}")
else:
print(" 无告警触发")
time.sleep(1)
# 生成告警报告
alert_manager.generate_alert_report()
print("告警系统演示完成!")
3. 故障排除与维护
3.1 常见问题诊断
# 7. 故障排除与维护
print("\n🔧 故障排除与维护:")
import os
import sys
import subprocess
import psutil
import socket
from datetime import datetime
from urllib.parse import urlparse
class DiagnosticTool:
"""
诊断工具
"""
def __init__(self):
self.diagnostic_results = {}
def run_full_diagnosis(self):
"""
运行完整诊断
"""
print("🔍 开始系统诊断...")
diagnostics = [
('系统资源', self.check_system_resources),
('网络连接', self.check_network_connectivity),
('Scrapy环境', self.check_scrapy_environment),
('依赖包', self.check_dependencies),
('配置文件', self.check_configuration),
('日志文件', self.check_log_files),
('数据库连接', self.check_database_connection),
('Redis连接', self.check_redis_connection)
]
for name, diagnostic_func in diagnostics:
print(f"\n📋 检查 {name}:")
try:
result = diagnostic_func()
self.diagnostic_results[name] = result
if result['status'] == 'ok':
print(f" ✅ {result['message']}")
else:
print(f" ❌ {result['message']}")
if 'suggestions' in result:
print(" 💡 建议:")
for suggestion in result['suggestions']:
print(f" - {suggestion}")
except Exception as e:
error_result = {
'status': 'error',
'message': f"诊断过程出错: {str(e)}"
}
self.diagnostic_results[name] = error_result
print(f" ❌ {error_result['message']}")
return self.diagnostic_results
def check_system_resources(self):
"""
检查系统资源
"""
try:
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
# 内存使用
memory = psutil.virtual_memory()
# 磁盘使用
disk = psutil.disk_usage('/')
issues = []
suggestions = []
if cpu_percent > 90:
issues.append(f"CPU使用率过高: {cpu_percent:.1f}%")
suggestions.append("检查是否有异常进程占用CPU")
if memory.percent > 90:
issues.append(f"内存使用率过高: {memory.percent:.1f}%")
suggestions.append("考虑增加内存或优化内存使用")
disk_percent = (disk.used / disk.total) * 100
if disk_percent > 90:
issues.append(f"磁盘使用率过高: {disk_percent:.1f}%")
suggestions.append("清理日志文件和临时文件")
if issues:
return {
'status': 'warning',
'message': '; '.join(issues),
'suggestions': suggestions,
'details': {
'cpu_percent': cpu_percent,
'memory_percent': memory.percent,
'disk_percent': disk_percent
}
}
else:
return {
'status': 'ok',
'message': f"系统资源正常 (CPU: {cpu_percent:.1f}%, 内存: {memory.percent:.1f}%, 磁盘: {disk_percent:.1f}%)",
'details': {
'cpu_percent': cpu_percent,
'memory_percent': memory.percent,
'disk_percent': disk_percent
}
}
except Exception as e:
return {
'status': 'error',
'message': f"无法检查系统资源: {str(e)}"
}
def check_network_connectivity(self):
"""
检查网络连接
"""
test_urls = [
'https://httpbin.org/get',
'https://www.google.com',
'https://www.baidu.com'
]
successful_connections = 0
failed_connections = []
for url in test_urls:
try:
import requests
response = requests.get(url, timeout=10)
if response.status_code == 200:
successful_connections += 1
else:
failed_connections.append(f"{url} (HTTP {response.status_code})")
except Exception as e:
failed_connections.append(f"{url} ({str(e)})")
if successful_connections == len(test_urls):
return {
'status': 'ok',
'message': f"网络连接正常 ({successful_connections}/{len(test_urls)} 成功)"
}
elif successful_connections > 0:
return {
'status': 'warning',
'message': f"部分网络连接失败 ({successful_connections}/{len(test_urls)} 成功)",
'suggestions': [
"检查网络配置",
"检查防火墙设置",
"检查代理配置"
],
'details': {'failed_connections': failed_connections}
}
else:
return {
'status': 'error',
'message': "所有网络连接失败",
'suggestions': [
"检查网络连接",
"检查DNS设置",
"检查防火墙配置"
],
'details': {'failed_connections': failed_connections}
}
def check_scrapy_environment(self):
"""
检查Scrapy环境
"""
try:
import scrapy
import twisted
# 检查版本
scrapy_version = scrapy.__version__
twisted_version = twisted.__version__
# 检查Scrapy命令
result = subprocess.run(
[sys.executable, '-m', 'scrapy', 'version'],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
return {
'status': 'ok',
'message': f"Scrapy环境正常 (Scrapy: {scrapy_version}, Twisted: {twisted_version})",
'details': {
'scrapy_version': scrapy_version,
'twisted_version': twisted_version,
'command_output': result.stdout
}
}
else:
return {
'status': 'error',
'message': f"Scrapy命令执行失败: {result.stderr}",
'suggestions': [
"重新安装Scrapy",
"检查Python环境",
"检查PATH环境变量"
]
}
except ImportError as e:
return {
'status': 'error',
'message': f"Scrapy未正确安装: {str(e)}",
'suggestions': [
"安装Scrapy: pip install scrapy",
"检查虚拟环境",
"检查Python版本兼容性"
]
}
except Exception as e:
return {
'status': 'error',
'message': f"检查Scrapy环境时出错: {str(e)}"
}
def check_dependencies(self):
"""
检查依赖包
"""
try:
# 检查关键依赖
critical_packages = [
'scrapy',
'twisted',
'lxml',
'requests',
'urllib3'
]
missing_packages = []
installed_packages = {}
for package in critical_packages:
try:
module = __import__(package)
version = getattr(module, '__version__', 'unknown')
installed_packages[package] = version
except ImportError:
missing_packages.append(package)
if missing_packages:
return {
'status': 'error',
'message': f"缺少关键依赖包: {', '.join(missing_packages)}",
'suggestions': [
f"安装缺少的包: pip install {' '.join(missing_packages)}",
"检查requirements.txt文件",
"使用虚拟环境管理依赖"
],
'details': {
'missing_packages': missing_packages,
'installed_packages': installed_packages
}
}
else:
return {
'status': 'ok',
'message': f"所有关键依赖包已安装 ({len(installed_packages)}个)",
'details': {'installed_packages': installed_packages}
}
except Exception as e:
return {
'status': 'error',
'message': f"检查依赖包时出错: {str(e)}"
}
def check_configuration(self):
"""
检查配置文件
"""
config_files = [
'scrapy.cfg',
'settings.py',
'deploy.ini'
]
found_configs = []
missing_configs = []
for config_file in config_files:
if os.path.exists(config_file):
found_configs.append(config_file)
else:
missing_configs.append(config_file)
if found_configs:
return {
'status': 'ok',
'message': f"找到配置文件: {', '.join(found_configs)}",
'details': {
'found_configs': found_configs,
'missing_configs': missing_configs
}
}
else:
return {
'status': 'warning',
'message': "未找到配置文件",
'suggestions': [
"检查当前目录是否为Scrapy项目根目录",
"创建必要的配置文件",
"检查文件权限"
],
'details': {'missing_configs': missing_configs}
}
def check_log_files(self):
"""
检查日志文件
"""
log_patterns = [
'*.log',
'logs/*.log',
'scrapy.log'
]
import glob
found_logs = []
for pattern in log_patterns:
found_logs.extend(glob.glob(pattern))
if found_logs:
# 检查日志文件大小
large_logs = []
for log_file in found_logs:
size = os.path.getsize(log_file)
if size > 100 * 1024 * 1024: # 100MB
large_logs.append((log_file, size))
suggestions = []
if large_logs:
suggestions.append("考虑清理或轮转大型日志文件")
suggestions.append("配置日志轮转策略")
return {
'status': 'ok' if not large_logs else 'warning',
'message': f"找到日志文件: {len(found_logs)}个",
'suggestions': suggestions,
'details': {
'log_files': found_logs,
'large_logs': large_logs
}
}
else:
return {
'status': 'warning',
'message': "未找到日志文件",
'suggestions': [
"检查日志配置",
"确认爬虫已运行过",
"检查日志目录权限"
]
}
def check_database_connection(self):
"""
检查数据库连接
"""
try:
# 这里以PostgreSQL为例
import psycopg2
# 从环境变量或配置文件获取连接信息
conn_params = {
'host': os.getenv('DB_HOST', 'localhost'),
'port': os.getenv('DB_PORT', '5432'),
'database': os.getenv('DB_NAME', 'scrapy_db'),
'user': os.getenv('DB_USER', 'scrapy_user'),
'password': os.getenv('DB_PASSWORD', '')
}
if not conn_params['password']:
return {
'status': 'warning',
'message': "数据库密码未配置",
'suggestions': [
"设置DB_PASSWORD环境变量",
"检查数据库配置文件"
]
}
# 尝试连接
conn = psycopg2.connect(**conn_params)
cursor = conn.cursor()
cursor.execute('SELECT version();')
version = cursor.fetchone()[0]
cursor.close()
conn.close()
return {
'status': 'ok',
'message': f"数据库连接正常",
'details': {
'database_version': version,
'connection_params': {k: v for k, v in conn_params.items() if k != 'password'}
}
}
except ImportError:
return {
'status': 'warning',
'message': "数据库驱动未安装",
'suggestions': [
"安装数据库驱动: pip install psycopg2-binary",
"根据使用的数据库安装相应驱动"
]
}
except Exception as e:
return {
'status': 'error',
'message': f"数据库连接失败: {str(e)}",
'suggestions': [
"检查数据库服务是否运行",
"检查连接参数",
"检查网络连接",
"检查数据库用户权限"
]
}
def check_redis_connection(self):
"""
检查Redis连接
"""
try:
import redis
# 从环境变量获取Redis配置
redis_host = os.getenv('REDIS_HOST', 'localhost')
redis_port = int(os.getenv('REDIS_PORT', '6379'))
redis_db = int(os.getenv('REDIS_DB', '0'))
redis_password = os.getenv('REDIS_PASSWORD', None)
# 创建Redis连接
r = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
password=redis_password,
socket_timeout=5
)
# 测试连接
r.ping()
# 获取Redis信息
info = r.info()
return {
'status': 'ok',
'message': f"Redis连接正常",
'details': {
'redis_version': info.get('redis_version'),
'used_memory_human': info.get('used_memory_human'),
'connected_clients': info.get('connected_clients')
}
}
except ImportError:
return {
'status': 'warning',
'message': "Redis客户端未安装",
'suggestions': [
"安装Redis客户端: pip install redis"
]
}
except Exception as e:
return {
'status': 'error',
'message': f"Redis连接失败: {str(e)}",
'suggestions': [
"检查Redis服务是否运行",
"检查Redis配置",
"检查网络连接",
"检查Redis密码"
]
}
def generate_diagnostic_report(self):
"""
生成诊断报告
"""
print("\n🔍 系统诊断报告")
print("=" * 60)
print(f"诊断时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print()
# 统计结果
total_checks = len(self.diagnostic_results)
ok_count = sum(1 for result in self.diagnostic_results.values() if result['status'] == 'ok')
warning_count = sum(1 for result in self.diagnostic_results.values() if result['status'] == 'warning')
error_count = sum(1 for result in self.diagnostic_results.values() if result['status'] == 'error')
print(f"📊 诊断概览:")
print(f" 总检查项: {total_checks}")
print(f" 正常: {ok_count} ✅")
print(f" 警告: {warning_count} ⚠️")
print(f" 错误: {error_count} ❌")
print()
# 详细结果
for check_name, result in self.diagnostic_results.items():
status_icon = {
'ok': '✅',
'warning': '⚠️',
'error': '❌'
}.get(result['status'], '❓')
print(f"{status_icon} {check_name}: {result['message']}")
if 'suggestions' in result and result['suggestions']:
print(" 💡 建议:")
for suggestion in result['suggestions']:
print(f" - {suggestion}")
print()
print("=" * 60)
# 使用示例
diagnostic_tool = DiagnosticTool()
results = diagnostic_tool.run_full_diagnosis()
diagnostic_tool.generate_diagnostic_report()
print("故障排除与维护演示完成!")
3.2 维护脚本
# 8. 维护脚本
print("\n🛠️ 维护脚本:")
import os
import shutil
import tarfile
import zipfile
import psutil
from datetime import datetime, timedelta
class MaintenanceManager:
"""
维护管理器
"""
def __init__(self, config=None):
self.config = config or {
'log_retention_days': 30,
'backup_retention_days': 7,
'max_log_size_mb': 100,
'backup_directory': 'backups',
'log_directory': 'logs',
'data_directory': 'data'
}
def run_daily_maintenance(self):
"""
运行日常维护任务
"""
print("🔄 开始日常维护任务...")
maintenance_tasks = [
('清理旧日志', self.cleanup_old_logs),
('轮转大日志文件', self.rotate_large_logs),
('清理临时文件', self.cleanup_temp_files),
('备份重要数据', self.backup_important_data),
('清理旧备份', self.cleanup_old_backups),
('检查磁盘空间', self.check_disk_space),
('优化数据库', self.optimize_database),
('更新统计信息', self.update_statistics)
]
results = {}
for task_name, task_func in maintenance_tasks:
print(f"\n📋 执行任务: {task_name}")
try:
result = task_func()
results[task_name] = result
if result.get('success', True):
print(f" ✅ {result.get('message', '任务完成')}")
else:
print(f" ❌ {result.get('message', '任务失败')}")
if 'details' in result:
for detail in result['details']:
print(f" - {detail}")
except Exception as e:
error_result = {
'success': False,
'message': f"任务执行出错: {str(e)}"
}
results[task_name] = error_result
print(f" ❌ {error_result['message']}")
# 生成维护报告
self.generate_maintenance_report(results)
return results
def cleanup_old_logs(self):
"""
清理旧日志文件
"""
try:
log_dir = self.config['log_directory']
retention_days = self.config['log_retention_days']
cutoff_date = datetime.now() - timedelta(days=retention_days)
if not os.path.exists(log_dir):
return {
'success': True,
'message': f"日志目录不存在: {log_dir}"
}
deleted_files = []
total_size_freed = 0
for root, dirs, files in os.walk(log_dir):
for file in files:
if file.endswith('.log'):
file_path = os.path.join(root, file)
# 检查文件修改时间
file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_mtime < cutoff_date:
file_size = os.path.getsize(file_path)
os.remove(file_path)
deleted_files.append(file)
total_size_freed += file_size
return {
'success': True,
'message': f"清理了 {len(deleted_files)} 个旧日志文件,释放 {total_size_freed / 1024 / 1024:.2f} MB",
'details': [f"删除文件: {file}" for file in deleted_files[:10]] # 只显示前10个
}
except Exception as e:
return {
'success': False,
'message': f"清理旧日志失败: {str(e)}"
}
def rotate_large_logs(self):
"""
轮转大日志文件
"""
try:
log_dir = self.config['log_directory']
max_size_bytes = self.config['max_log_size_mb'] * 1024 * 1024
if not os.path.exists(log_dir):
return {
'success': True,
'message': f"日志目录不存在: {log_dir}"
}
rotated_files = []
for root, dirs, files in os.walk(log_dir):
for file in files:
if file.endswith('.log'):
file_path = os.path.join(root, file)
if os.path.getsize(file_path) > max_size_bytes:
# 创建轮转文件名
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
rotated_name = f"{file}.{timestamp}"
rotated_path = os.path.join(root, rotated_name)
# 移动文件
shutil.move(file_path, rotated_path)
# 创建新的空日志文件
open(file_path, 'w').close()
rotated_files.append(f"{file} -> {rotated_name}")
return {
'success': True,
'message': f"轮转了 {len(rotated_files)} 个大日志文件",
'details': rotated_files
}
except Exception as e:
return {
'success': False,
'message': f"轮转日志文件失败: {str(e)}"
}
def cleanup_temp_files(self):
"""
清理临时文件
"""
try:
temp_patterns = [
'*.tmp',
'*.temp',
'__pycache__',
'*.pyc',
'.scrapy'
]
deleted_items = []
total_size_freed = 0
for pattern in temp_patterns:
if pattern == '__pycache__':
# 清理Python缓存目录
for root, dirs, files in os.walk('.'):
if '__pycache__' in dirs:
cache_dir = os.path.join(root, '__pycache__')
dir_size = sum(
os.path.getsize(os.path.join(dirpath, filename))
for dirpath, dirnames, filenames in os.walk(cache_dir)
for filename in filenames
)
shutil.rmtree(cache_dir)
deleted_items.append(f"目录: {cache_dir}")
total_size_freed += dir_size
elif pattern == '.scrapy':
# 清理Scrapy临时目录
if os.path.exists('.scrapy'):
dir_size = sum(
os.path.getsize(os.path.join(dirpath, filename))
for dirpath, dirnames, filenames in os.walk('.scrapy')
for filename in filenames
)
shutil.rmtree('.scrapy')
deleted_items.append("目录: .scrapy")
total_size_freed += dir_size
else:
# 清理匹配模式的文件
import glob
for file_path in glob.glob(os.path.join('**', pattern), recursive=True):
if os.path.isfile(file_path):
file_size = os.path.getsize(file_path)
os.remove(file_path)
deleted_items.append(f"文件: {file_path}")
total_size_freed += file_size
return {
'success': True,
'message': f"清理了 {len(deleted_items)} 个临时文件/目录,释放 {total_size_freed / 1024 / 1024:.2f} MB",
'details': deleted_items[:10] # 只显示前10个
}
except Exception as e:
return {
'success': False,
'message': f"清理临时文件失败: {str(e)}"
}
def backup_important_data(self):
"""
备份重要数据
"""
try:
backup_dir = self.config['backup_directory']
data_dir = self.config['data_directory']
# 创建备份目录
os.makedirs(backup_dir, exist_ok=True)
# 创建备份文件名
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_filename = f"scrapy_backup_{timestamp}.tar.gz"
backup_path = os.path.join(backup_dir, backup_filename)
# 要备份的文件和目录
backup_items = [
'scrapy.cfg',
'settings.py',
'deploy.ini'
]
# 添加数据目录(如果存在)
if os.path.exists(data_dir):
backup_items.append(data_dir)
# 添加爬虫代码目录
for item in os.listdir('.'):
if os.path.isdir(item) and item not in ['logs', 'backups', '__pycache__', '.scrapy']:
if any(f.endswith('.py') for f in os.listdir(item) if os.path.isfile(os.path.join(item, f))):
backup_items.append(item)
# 创建tar.gz备份
with tarfile.open(backup_path, 'w:gz') as tar:
for item in backup_items:
if os.path.exists(item):
tar.add(item, arcname=item)
backup_size = os.path.getsize(backup_path)
return {
'success': True,
'message': f"创建备份: {backup_filename} ({backup_size / 1024 / 1024:.2f} MB)",
'details': [f"备份项目: {item}" for item in backup_items if os.path.exists(item)]
}
except Exception as e:
return {
'success': False,
'message': f"备份数据失败: {str(e)}"
}
def cleanup_old_backups(self):
"""
清理旧备份文件
"""
try:
backup_dir = self.config['backup_directory']
retention_days = self.config['backup_retention_days']
cutoff_date = datetime.now() - timedelta(days=retention_days)
if not os.path.exists(backup_dir):
return {
'success': True,
'message': f"备份目录不存在: {backup_dir}"
}
deleted_backups = []
total_size_freed = 0
for file in os.listdir(backup_dir):
if file.startswith('scrapy_backup_') and (file.endswith('.tar.gz') or file.endswith('.zip')):
file_path = os.path.join(backup_dir, file)
file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_mtime < cutoff_date:
file_size = os.path.getsize(file_path)
os.remove(file_path)
deleted_backups.append(file)
total_size_freed += file_size
return {
'success': True,
'message': f"清理了 {len(deleted_backups)} 个旧备份文件,释放 {total_size_freed / 1024 / 1024:.2f} MB",
'details': [f"删除备份: {backup}" for backup in deleted_backups]
}
except Exception as e:
return {
'success': False,
'message': f"清理旧备份失败: {str(e)}"
}
def check_disk_space(self):
"""
检查磁盘空间
"""
try:
import psutil  # imported here in case psutil is not already imported at module level
disk_usage = psutil.disk_usage('.')
total_gb = disk_usage.total / (1024**3)
used_gb = disk_usage.used / (1024**3)
free_gb = disk_usage.free / (1024**3)
used_percent = (disk_usage.used / disk_usage.total) * 100
status = "正常"
if used_percent > 95:
status = "严重不足"
elif used_percent > 90:
status = "不足"
elif used_percent > 80:
status = "偏高"
return {
'success': True,
'message': f"磁盘空间状态: {status} (使用率: {used_percent:.1f}%)",
'details': [
f"总空间: {total_gb:.2f} GB",
f"已使用: {used_gb:.2f} GB",
f"可用空间: {free_gb:.2f} GB"
]
}
except Exception as e:
return {
'success': False,
'message': f"检查磁盘空间失败: {str(e)}"
}
def optimize_database(self):
"""
优化数据库
"""
try:
# 这里以SQLite为例
import sqlite3
db_files = []
for root, dirs, files in os.walk('.'):
for file in files:
if file.endswith('.db') or file.endswith('.sqlite'):
db_files.append(os.path.join(root, file))
if not db_files:
return {
'success': True,
'message': "未找到数据库文件"
}
optimized_dbs = []
for db_file in db_files:
try:
# 获取优化前的大小
original_size = os.path.getsize(db_file)
# 连接数据库并执行VACUUM
conn = sqlite3.connect(db_file)
conn.execute('VACUUM;')
conn.close()
# 获取优化后的大小
new_size = os.path.getsize(db_file)
size_saved = original_size - new_size
optimized_dbs.append(f"{db_file}: 节省 {size_saved / 1024:.2f} KB")
except Exception as e:
optimized_dbs.append(f"{db_file}: 优化失败 - {str(e)}")
return {
'success': True,
'message': f"优化了 {len(db_files)} 个数据库文件",
'details': optimized_dbs
}
except Exception as e:
return {
'success': False,
'message': f"优化数据库失败: {str(e)}"
}
def update_statistics(self):
"""
更新统计信息
"""
try:
stats = {}
# 统计项目文件
python_files = 0
total_lines = 0
for root, dirs, files in os.walk('.'):
# 跳过特定目录
dirs[:] = [d for d in dirs if d not in ['__pycache__', '.git', 'logs', 'backups']]
for file in files:
if file.endswith('.py'):
python_files += 1
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8') as f:
total_lines += len(f.readlines())
except (OSError, UnicodeDecodeError):
pass
stats['python_files'] = python_files
stats['total_lines'] = total_lines
# 统计日志文件
log_dir = self.config['log_directory']
if os.path.exists(log_dir):
log_files = len([f for f in os.listdir(log_dir) if f.endswith('.log')])
log_size = sum(
os.path.getsize(os.path.join(log_dir, f))
for f in os.listdir(log_dir)
if f.endswith('.log')
)
stats['log_files'] = log_files
stats['log_size_mb'] = log_size / 1024 / 1024
# 统计备份文件
backup_dir = self.config['backup_directory']
if os.path.exists(backup_dir):
backup_files = len([f for f in os.listdir(backup_dir)
if f.startswith('scrapy_backup_')])
stats['backup_files'] = backup_files
# 保存统计信息
stats_file = 'project_stats.json'
stats['last_updated'] = datetime.now().isoformat()
with open(stats_file, 'w', encoding='utf-8') as f:
json.dump(stats, f, indent=2, ensure_ascii=False)
return {
'success': True,
'message': f"更新项目统计信息",
'details': [
f"Python文件: {stats.get('python_files', 0)} 个",
f"代码行数: {stats.get('total_lines', 0)} 行",
f"日志文件: {stats.get('log_files', 0)} 个",
f"备份文件: {stats.get('backup_files', 0)} 个"
]
}
except Exception as e:
return {
'success': False,
'message': f"更新统计信息失败: {str(e)}"
}
def generate_maintenance_report(self, results):
"""
生成维护报告
"""
print("\n📊 维护任务报告")
print("=" * 60)
print(f"维护时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print()
# 统计结果
total_tasks = len(results)
successful_tasks = sum(1 for result in results.values() if result.get('success', True))
failed_tasks = total_tasks - successful_tasks
print(f"📈 任务概览:")
print(f" 总任务数: {total_tasks}")
print(f" 成功: {successful_tasks} ✅")
print(f" 失败: {failed_tasks} ❌")
print()
# 详细结果
for task_name, result in results.items():
status_icon = "✅" if result.get('success', True) else "❌"
print(f"{status_icon} {task_name}: {result.get('message', '无消息')}")
if 'details' in result and result['details']:
for detail in result['details'][:3]: # 只显示前3个详情
print(f" - {detail}")
if len(result['details']) > 3:
print(f" ... 还有 {len(result['details']) - 3} 项")
print()
print("=" * 60)
# 使用示例
maintenance_manager = MaintenanceManager()
# 运行日常维护
results = maintenance_manager.run_daily_maintenance()
print("维护脚本演示完成!")
4. Chapter Summary
In this chapter we walked through deploying and operating Scrapy projects, covering:
4.1 Key Topics Reviewed
Production deployment
- Preparing and configuring the deployment environment
- Using a Scrapyd deployment server (a scheduling sketch follows this list)
- Containerized deployment with Docker
- Project packaging and version management
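As a reminder of how a deployed spider is actually triggered, the following is a minimal sketch that calls Scrapyd's schedule.json endpoint with the requests library. The host and port (localhost:6800), the project name myspider, and the spider name example are illustrative assumptions, not values taken from the project above.
# Minimal sketch: schedule a crawl on a Scrapyd server through its HTTP API.
# Assumes Scrapyd listens on localhost:6800 and that a project named
# "myspider" containing a spider called "example" is already deployed.
import requests

def schedule_spider(project="myspider", spider="example",
                    host="http://localhost:6800"):
    # schedule.json starts a crawl and returns a job id on success
    response = requests.post(
        f"{host}/schedule.json",
        data={"project": project, "spider": spider},
        timeout=10,
    )
    response.raise_for_status()
    result = response.json()
    if result.get("status") == "ok":
        print(f"Scheduled job: {result['jobid']}")
    else:
        print(f"Scheduling failed: {result}")
    return result

if __name__ == "__main__":
    schedule_spider()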
Monitoring
- Performance monitoring and metrics collection (a metrics sketch follows this list)
- Log monitoring and analysis
- Alerting and notification mechanisms
- Real-time monitoring dashboards
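For reference, a minimal metrics snapshot can be collected with psutil, as in the sketch below. The metric set and the output file name metrics.jsonl are assumptions for illustration; a real dashboard would feed these values into whatever storage your monitoring panel reads from.
# Minimal sketch: capture a one-off system metrics snapshot with psutil
# and append it to a JSON-lines file. File name and metric set are assumptions.
import json
import time
import psutil

def collect_metrics(output_file="metrics.jsonl"):
    snapshot = {
        "timestamp": time.time(),
        "cpu_percent": psutil.cpu_percent(interval=1),
        "memory_percent": psutil.virtual_memory().percent,
        "disk_percent": psutil.disk_usage('/').percent,
    }
    with open(output_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(snapshot) + "\n")
    return snapshot

if __name__ == "__main__":
    print(collect_metrics())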
Troubleshooting and maintenance
- System diagnostic tools
- Investigating common problems
- Maintenance scripts and automation
- Data backup and restore (a restore sketch follows this list)
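The maintenance script above only creates backups; restoring one is the reverse operation. The following sketch assumes archives named scrapy_backup_*.tar.gz in a backups/ directory, matching the naming used by backup_important_data, and extracts the newest one into a separate restore/ directory.
# Minimal sketch: restore the most recent scrapy_backup_*.tar.gz archive.
# The directory names are assumptions that mirror the backup naming above.
import glob
import os
import tarfile

def restore_latest_backup(backup_dir="backups", target_dir="restore"):
    archives = sorted(glob.glob(os.path.join(backup_dir, "scrapy_backup_*.tar.gz")))
    if not archives:
        print("No backup archives found")
        return None
    latest = archives[-1]
    os.makedirs(target_dir, exist_ok=True)
    with tarfile.open(latest, "r:gz") as tar:
        # Extract into a separate directory first, then copy files back as needed
        tar.extractall(path=target_dir)
    print(f"Restored {latest} into {target_dir}/")
    return latest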
4.2 Best Practices
Deployment best practices
- Keep code under version control
- Isolate environments and manage configuration explicitly
- Automate the deployment process
- Plan for rolling updates and rollbacks
Monitoring best practices
- Choose meaningful monitoring metrics
- Configure multi-level alerting
- Review the monitoring system itself regularly
- Keep monitoring data fresh
Maintenance best practices
- Run maintenance tasks on a regular schedule (a scheduling sketch follows this list)
- Keep enough system resources available
- Clean up unused data promptly
- Establish a reliable backup strategy
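A simple way to run the maintenance tasks on a schedule is a small long-running loop such as the sketch below. The 03:00 run time is an assumption; in production a cron entry (for example "0 3 * * * python maintenance.py") or a proper task scheduler is usually the better choice.
# Minimal sketch: call run_daily_maintenance() once a day at roughly 03:00.
# Assumes a MaintenanceManager instance like the one defined above.
import time
from datetime import datetime, timedelta

def run_forever(manager, hour=3):
    while True:
        now = datetime.now()
        next_run = now.replace(hour=hour, minute=0, second=0, microsecond=0)
        if next_run <= now:
            next_run += timedelta(days=1)
        # Sleep until the next scheduled run, then execute the maintenance tasks
        time.sleep(max(0, (next_run - datetime.now()).total_seconds()))
        manager.run_daily_maintenance()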
4.3 Common Pitfalls
Deployment pitfalls
- Ignoring differences between environments
- Sloppy configuration file management
- Conflicting dependency versions
- Incorrect permission settings
Monitoring pitfalls
- Poorly chosen monitoring metrics
- Alert thresholds that fire too often or too rarely
- Ignoring system resource monitoring
- Oversized log files degrading performance (a rotation sketch follows this list)
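Besides the move-and-truncate rotation in the maintenance script above, Python's standard logging module can rotate files automatically. The sketch below uses logging.handlers.RotatingFileHandler; the logs/spider.log path, the 10 MB size limit, and the five retained backups are illustrative assumptions.
# Minimal sketch: size-based log rotation with the standard library.
# Path, maxBytes and backupCount are example values, not project settings.
import logging
import os
from logging.handlers import RotatingFileHandler

os.makedirs("logs", exist_ok=True)
handler = RotatingFileHandler(
    "logs/spider.log",
    maxBytes=10 * 1024 * 1024,  # rotate once the file exceeds ~10 MB
    backupCount=5,              # keep the five most recent rotated files
    encoding="utf-8",
)
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))

logger = logging.getLogger("scrapy_project")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info("Log rotation configured")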
Maintenance pitfalls
- Maintenance tasks not run on time
- An incomplete backup strategy
- Neglecting disk space management
- Careless database optimization
4.4 Where to Go Next
Container technology in depth
- Advanced Docker usage
- Kubernetes cluster management
- Microservice architecture design
Advanced monitoring
- Prometheus + Grafana (an exporter sketch follows this list)
- The ELK log analysis stack
- APM application performance monitoring
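As a first taste of the Prometheus route, the official prometheus_client package can expose crawler metrics over HTTP for Prometheus to scrape and Grafana to chart. The metric names and port 8000 in the sketch below are assumptions for illustration.
# Minimal sketch: expose two example metrics for Prometheus to scrape.
# Metric names, values and the port are illustrative only.
import random
import time
from prometheus_client import Counter, Gauge, start_http_server

pages_scraped = Counter("pages_scraped", "Pages scraped so far")
queue_size = Gauge("request_queue_size", "Requests waiting in the queue")

if __name__ == "__main__":
    start_http_server(8000)  # metrics served at http://localhost:8000/metrics
    while True:
        pages_scraped.inc()
        queue_size.set(random.randint(0, 100))  # stand-in for a real queue size
        time.sleep(5)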
Automated operations
- CI/CD pipelines
- Infrastructure as Code
- Automated testing and deployment
Cloud deployment
- Deploying on AWS/Azure/GCP
- Cloud-native architectures
- Serverless crawlers
4.5 Exercises
Basic
- Deploy a Scrapy project with Docker
- Set up a Scrapyd server and deploy a spider to it
- Implement basic performance monitoring
Intermediate
- Design a complete monitoring and alerting system
- Implement an automated maintenance script
- Configure log rotation and a backup strategy
Advanced
- Build a distributed monitoring system
- Design an automatic failure-recovery mechanism
- Assemble a complete operations platform
🎉 Congratulations! You have completed Chapter 8 of this Scrapy tutorial!
With this chapter you have learned how to deploy and operate Scrapy projects: you can now ship a crawler to production and surround it with a solid monitoring and maintenance workflow. These skills are essential for running stable, reliable crawling systems.
In the next chapter we will look at extending Scrapy itself, including writing custom extensions and other advanced plugin-development topics.