本章概述

本章将详细介绍Scrapy项目的部署与运维,包括生产环境部署、监控系统、日志管理、性能优化、故障排除等内容。通过本章学习,你将掌握如何将Scrapy项目部署到生产环境并进行有效的运维管理。

学习目标

  • 掌握Scrapy项目的部署方法
  • 学会配置生产环境
  • 了解监控和日志管理
  • 掌握性能优化技巧
  • 学会故障排除和维护

1. 生产环境部署

1.1 部署准备

# 1. 部署准备
print("🚀 Scrapy生产环境部署:")

import os
import sys
import subprocess
import configparser
from pathlib import Path

class DeploymentManager:
    """
    部署管理器
    """
    
    def __init__(self, project_path):
        self.project_path = Path(project_path)
        self.config = configparser.ConfigParser()
        self.load_config()
    
    def load_config(self):
        """
        加载部署配置
        """
        config_file = self.project_path / 'deploy.ini'
        
        if config_file.exists():
            self.config.read(config_file)
        else:
            # 创建默认配置
            self.create_default_config(config_file)
    
    def create_default_config(self, config_file):
        """
        创建默认部署配置
        """
        self.config['DEFAULT'] = {
            'project_name': 'myspider',
            'python_version': '3.8',
            'scrapy_version': '2.5.0'
        }
        
        self.config['production'] = {
            'host': 'localhost',
            'port': '6800',
            'log_level': 'INFO',
            'concurrent_requests': '16',
            'download_delay': '1',
            'autothrottle_enabled': 'True'
        }
        
        self.config['database'] = {
            'host': 'localhost',
            'port': '5432',
            'name': 'scrapy_db',
            'user': 'scrapy_user',
            'password': 'your_password'
        }
        
        self.config['redis'] = {
            'host': 'localhost',
            'port': '6379',
            'db': '0',
            'password': ''
        }
        
        with open(config_file, 'w') as f:
            self.config.write(f)
        
        print(f"创建默认配置文件: {config_file}")
    
    def check_environment(self):
        """
        检查部署环境
        """
        print("\n🔍 检查部署环境:")
        
        checks = {
            'Python版本': self.check_python_version(),
            'Scrapy安装': self.check_scrapy_installation(),
            '依赖包': self.check_dependencies(),
            '系统资源': self.check_system_resources(),
            '网络连接': self.check_network_connectivity()
        }
        
        for check_name, result in checks.items():
            status = "✅" if result['success'] else "❌"
            print(f"  {status} {check_name}: {result['message']}")
        
        return all(check['success'] for check in checks.values())
    
    def check_python_version(self):
        """
        检查Python版本
        """
        try:
            version = sys.version_info
            required_version = tuple(map(int, self.config['DEFAULT']['python_version'].split('.')))
            
            if version[:2] >= required_version:
                return {
                    'success': True,
                    'message': f"Python {version.major}.{version.minor}.{version.micro}"
                }
            else:
                return {
                    'success': False,
                    'message': f"需要Python {required_version[0]}.{required_version[1]}+"
                }
        except Exception as e:
            return {'success': False, 'message': str(e)}
    
    def check_scrapy_installation(self):
        """
        检查Scrapy安装
        """
        try:
            import scrapy
            return {
                'success': True,
                'message': f"Scrapy {scrapy.__version__}"
            }
        except ImportError:
            return {
                'success': False,
                'message': "Scrapy未安装"
            }
    
    def check_dependencies(self):
        """
        检查依赖包
        """
        try:
            requirements_file = self.project_path / 'requirements.txt'
            
            if not requirements_file.exists():
                return {
                    'success': False,
                    'message': "requirements.txt文件不存在"
                }
            
            # 检查已安装的包
            result = subprocess.run(
                [sys.executable, '-m', 'pip', 'check'],
                capture_output=True,
                text=True
            )
            
            if result.returncode == 0:
                return {
                    'success': True,
                    'message': "所有依赖包正常"
                }
            else:
                return {
                    'success': False,
                    'message': f"依赖包问题: {result.stderr}"
                }
        except Exception as e:
            return {'success': False, 'message': str(e)}
    
    def check_system_resources(self):
        """
        检查系统资源
        """
        try:
            import psutil
            
            # 检查内存
            memory = psutil.virtual_memory()
            memory_gb = memory.total / (1024**3)
            
            # 检查磁盘空间
            disk = psutil.disk_usage('/')
            disk_free_gb = disk.free / (1024**3)
            
            if memory_gb >= 2 and disk_free_gb >= 5:
                return {
                    'success': True,
                    'message': f"内存: {memory_gb:.1f}GB, 磁盘: {disk_free_gb:.1f}GB可用"
                }
            else:
                return {
                    'success': False,
                    'message': f"资源不足 - 内存: {memory_gb:.1f}GB, 磁盘: {disk_free_gb:.1f}GB"
                }
        except ImportError:
            return {
                'success': False,
                'message': "psutil未安装,无法检查系统资源"
            }
        except Exception as e:
            return {'success': False, 'message': str(e)}
    
    def check_network_connectivity(self):
        """
        检查网络连接
        """
        try:
            import requests
            
            # 测试网络连接
            response = requests.get('https://httpbin.org/get', timeout=10)
            
            if response.status_code == 200:
                return {
                    'success': True,
                    'message': "网络连接正常"
                }
            else:
                return {
                    'success': False,
                    'message': f"网络连接异常: {response.status_code}"
                }
        except Exception as e:
            return {'success': False, 'message': f"网络连接失败: {str(e)}"}
    
    def create_requirements_file(self):
        """
        创建requirements.txt文件
        """
        requirements = [
            f"Scrapy=={self.config['DEFAULT']['scrapy_version']}",
            "scrapy-redis>=0.6.8",
            "pymongo>=3.12.0",
            "psycopg2-binary>=2.8.6",
            "redis>=3.5.3",
            "requests>=2.25.1",
            "lxml>=4.6.3",
            "Pillow>=8.2.0",
            "selenium>=3.141.0",
            "psutil>=5.8.0",
            "scrapyd>=1.2.1",
            "scrapyd-client>=1.2.0"
        ]
        
        requirements_file = self.project_path / 'requirements.txt'
        
        with open(requirements_file, 'w') as f:
            f.write('\n'.join(requirements))
        
        print(f"创建requirements.txt文件: {requirements_file}")
    
    def create_dockerfile(self):
        """
        创建Dockerfile
        """
        dockerfile_content = f"""
# 使用官方Python镜像
FROM python:{self.config['DEFAULT']['python_version']}-slim

# 设置工作目录
WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \\
    gcc \\
    g++ \\
    libxml2-dev \\
    libxslt1-dev \\
    libffi-dev \\
    libssl-dev \\
    zlib1g-dev \\
    libjpeg-dev \\
    libpng-dev \\
    && rm -rf /var/lib/apt/lists/*

# 复制requirements文件
COPY requirements.txt .

# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制项目文件
COPY . .

# 暴露端口
EXPOSE 6800

# 启动命令
CMD ["scrapyd"]
"""
        
        dockerfile_path = self.project_path / 'Dockerfile'
        
        with open(dockerfile_path, 'w') as f:
            f.write(dockerfile_content.strip())
        
        print(f"创建Dockerfile: {dockerfile_path}")
    
    def create_docker_compose(self):
        """
        创建docker-compose.yml文件
        """
        compose_content = f"""
version: '3.8'

services:
  scrapy:
    build: .
    ports:
      - "{self.config['production']['port']}:6800"
    environment:
      - SCRAPY_SETTINGS_MODULE=myproject.settings
    volumes:
      - ./logs:/app/logs
      - ./data:/app/data
    depends_on:
      - redis
      - postgres
    restart: unless-stopped

  redis:
    image: redis:6-alpine
    ports:
      - "{self.config['redis']['port']}:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped

  postgres:
    image: postgres:13-alpine
    environment:
      - POSTGRES_DB={self.config['database']['name']}
      - POSTGRES_USER={self.config['database']['user']}
      - POSTGRES_PASSWORD={self.config['database']['password']}
    ports:
      - "{self.config['database']['port']}:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

  scrapyd-web:
    image: scrapyd-web:latest
    ports:
      - "5000:5000"
    environment:
      - SCRAPYD_SERVERS=scrapy:6800
    depends_on:
      - scrapy
    restart: unless-stopped

volumes:
  redis_data:
  postgres_data:
"""
        
        compose_path = self.project_path / 'docker-compose.yml'
        
        with open(compose_path, 'w') as f:
            f.write(compose_content.strip())
        
        print(f"创建docker-compose.yml: {compose_path}")

# 使用示例
deployment_manager = DeploymentManager('/path/to/your/scrapy/project')

# 检查环境
if deployment_manager.check_environment():
    print("\n✅ 环境检查通过,可以开始部署")
    
    # 创建部署文件
    deployment_manager.create_requirements_file()
    deployment_manager.create_dockerfile()
    deployment_manager.create_docker_compose()
else:
    print("\n❌ 环境检查失败,请解决问题后重试")

print("部署准备完成!")

1.2 Scrapyd部署

# 2. Scrapyd部署
print("\n🔧 Scrapyd部署配置:")

import os
import sys
import json
import requests
import tempfile
from datetime import datetime

class ScrapydDeployer:
    """
    Scrapyd部署器
    """
    
    def __init__(self, scrapyd_url='http://localhost:6800'):
        self.scrapyd_url = scrapyd_url.rstrip('/')
        self.session = requests.Session()
    
    def check_scrapyd_status(self):
        """
        检查Scrapyd状态
        """
        try:
            response = self.session.get(f"{self.scrapyd_url}/daemonstatus.json")
            
            if response.status_code == 200:
                data = response.json()
                print(f"✅ Scrapyd运行正常")
                print(f"   - 状态: {data['status']}")
                print(f"   - 运行中任务: {data['running']}")
                print(f"   - 等待中任务: {data['pending']}")
                print(f"   - 已完成任务: {data['finished']}")
                return True
            else:
                print(f"❌ Scrapyd连接失败: {response.status_code}")
                return False
        except Exception as e:
            print(f"❌ Scrapyd连接错误: {str(e)}")
            return False
    
    def list_projects(self):
        """
        列出所有项目
        """
        try:
            response = self.session.get(f"{self.scrapyd_url}/listprojects.json")
            
            if response.status_code == 200:
                data = response.json()
                projects = data['projects']
                
                print(f"📋 已部署项目 ({len(projects)}个):")
                for project in projects:
                    print(f"   - {project}")
                
                return projects
            else:
                print(f"❌ 获取项目列表失败: {response.status_code}")
                return []
        except Exception as e:
            print(f"❌ 获取项目列表错误: {str(e)}")
            return []
    
    def list_spiders(self, project):
        """
        列出项目中的爬虫
        """
        try:
            response = self.session.get(
                f"{self.scrapyd_url}/listspiders.json",
                params={'project': project}
            )
            
            if response.status_code == 200:
                data = response.json()
                spiders = data['spiders']
                
                print(f"🕷️ 项目 '{project}' 中的爬虫 ({len(spiders)}个):")
                for spider in spiders:
                    print(f"   - {spider}")
                
                return spiders
            else:
                print(f"❌ 获取爬虫列表失败: {response.status_code}")
                return []
        except Exception as e:
            print(f"❌ 获取爬虫列表错误: {str(e)}")
            return []
    
    def deploy_project(self, project_name, project_path, version=None):
        """
        部署项目到Scrapyd
        """
        if version is None:
            version = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        print(f"📦 开始部署项目 '{project_name}' 版本 '{version}'")
        
        try:
            # 创建项目包
            egg_path = self.create_egg(project_path, project_name, version)
            
            # 上传到Scrapyd
            with open(egg_path, 'rb') as f:
                files = {'egg': f}
                data = {
                    'project': project_name,
                    'version': version
                }
                
                response = self.session.post(
                    f"{self.scrapyd_url}/addversion.json",
                    files=files,
                    data=data
                )
            
            if response.status_code == 200:
                result = response.json()
                
                if result['status'] == 'ok':
                    print(f"✅ 项目部署成功")
                    print(f"   - 项目: {project_name}")
                    print(f"   - 版本: {version}")
                    print(f"   - 爬虫数量: {result['spiders']}")
                    return True
                else:
                    print(f"❌ 部署失败: {result.get('message', '未知错误')}")
                    return False
            else:
                print(f"❌ 部署请求失败: {response.status_code}")
                return False
        
        except Exception as e:
            print(f"❌ 部署过程出错: {str(e)}")
            return False
        finally:
            # 清理临时文件
            if 'egg_path' in locals() and os.path.exists(egg_path):
                os.remove(egg_path)
    
    def create_egg(self, project_path, project_name, version):
        """
        创建项目egg包
        """
        import subprocess
        import tempfile
        
        # 创建临时目录
        with tempfile.TemporaryDirectory() as temp_dir:
            egg_path = os.path.join(temp_dir, f"{project_name}-{version}.egg")
            
            # 使用scrapyd-deploy创建egg包
            cmd = [
                sys.executable, '-m', 'scrapyd_client.deploy',
                '--build-egg', egg_path,
                '--project', project_name
            ]
            
            result = subprocess.run(
                cmd,
                cwd=project_path,
                capture_output=True,
                text=True
            )
            
            if result.returncode == 0:
                # 复制到永久位置
                final_egg_path = f"{project_name}-{version}.egg"
                import shutil
                shutil.copy2(egg_path, final_egg_path)
                return final_egg_path
            else:
                raise Exception(f"创建egg包失败: {result.stderr}")
    
    def schedule_spider(self, project, spider, **kwargs):
        """
        调度爬虫运行
        """
        data = {
            'project': project,
            'spider': spider
        }
        data.update(kwargs)
        
        try:
            response = self.session.post(
                f"{self.scrapyd_url}/schedule.json",
                data=data
            )
            
            if response.status_code == 200:
                result = response.json()
                
                if result['status'] == 'ok':
                    job_id = result['jobid']
                    print(f"✅ 爬虫调度成功")
                    print(f"   - 项目: {project}")
                    print(f"   - 爬虫: {spider}")
                    print(f"   - 任务ID: {job_id}")
                    return job_id
                else:
                    print(f"❌ 调度失败: {result.get('message', '未知错误')}")
                    return None
            else:
                print(f"❌ 调度请求失败: {response.status_code}")
                return None
        
        except Exception as e:
            print(f"❌ 调度过程出错: {str(e)}")
            return None
    
    def cancel_job(self, project, job_id):
        """
        取消任务
        """
        try:
            response = self.session.post(
                f"{self.scrapyd_url}/cancel.json",
                data={
                    'project': project,
                    'job': job_id
                }
            )
            
            if response.status_code == 200:
                result = response.json()
                
                if result['status'] == 'ok':
                    print(f"✅ 任务取消成功: {job_id}")
                    return True
                else:
                    print(f"❌ 取消失败: {result.get('message', '未知错误')}")
                    return False
            else:
                print(f"❌ 取消请求失败: {response.status_code}")
                return False
        
        except Exception as e:
            print(f"❌ 取消过程出错: {str(e)}")
            return False
    
    def get_job_status(self, project, job_id):
        """
        获取任务状态
        """
        try:
            # 获取运行中的任务
            response = self.session.get(f"{self.scrapyd_url}/listjobs.json", 
                                      params={'project': project})
            
            if response.status_code == 200:
                data = response.json()
                
                # 检查各个状态
                for status in ['running', 'pending', 'finished']:
                    for job in data.get(status, []):
                        if job['id'] == job_id:
                            return {
                                'status': status,
                                'spider': job['spider'],
                                'start_time': job.get('start_time'),
                                'end_time': job.get('end_time')
                            }
                
                return {'status': 'not_found'}
            else:
                print(f"❌ 获取任务状态失败: {response.status_code}")
                return None
        
        except Exception as e:
            print(f"❌ 获取任务状态错误: {str(e)}")
            return None
    
    def delete_project(self, project):
        """
        删除项目
        """
        try:
            response = self.session.post(
                f"{self.scrapyd_url}/delproject.json",
                data={'project': project}
            )
            
            if response.status_code == 200:
                result = response.json()
                
                if result['status'] == 'ok':
                    print(f"✅ 项目删除成功: {project}")
                    return True
                else:
                    print(f"❌ 删除失败: {result.get('message', '未知错误')}")
                    return False
            else:
                print(f"❌ 删除请求失败: {response.status_code}")
                return False
        
        except Exception as e:
            print(f"❌ 删除过程出错: {str(e)}")
            return False

# 使用示例
deployer = ScrapydDeployer('http://localhost:6800')

# 检查Scrapyd状态
if deployer.check_scrapyd_status():
    # 列出现有项目
    projects = deployer.list_projects()
    
    # 部署新项目
    if deployer.deploy_project('myproject', '/path/to/project'):
        # 列出爬虫
        spiders = deployer.list_spiders('myproject')
        
        # 调度爬虫
        if spiders:
            job_id = deployer.schedule_spider('myproject', spiders[0])
            
            if job_id:
                # 检查任务状态
                import time
                time.sleep(2)
                status = deployer.get_job_status('myproject', job_id)
                print(f"任务状态: {status}")

print("Scrapyd部署完成!")

1.3 Docker部署

# 3. Docker部署
print("\n🐳 Docker部署管理:")

import docker
from pathlib import Path

class DockerDeployer:
    """
    Docker部署管理器
    """
    
    def __init__(self):
        try:
            self.client = docker.from_env()
            print("✅ Docker客户端连接成功")
        except Exception as e:
            print(f"❌ Docker客户端连接失败: {str(e)}")
            self.client = None
    
    def build_image(self, project_path, image_name, tag='latest'):
        """
        构建Docker镜像
        """
        if not self.client:
            return False
        
        try:
            print(f"🔨 开始构建镜像: {image_name}:{tag}")
            
            # 构建镜像
            image, logs = self.client.images.build(
                path=str(project_path),
                tag=f"{image_name}:{tag}",
                rm=True,
                forcerm=True
            )
            
            # 输出构建日志
            for log in logs:
                if 'stream' in log:
                    print(log['stream'].strip())
            
            print(f"✅ 镜像构建成功: {image.id[:12]}")
            return True
        
        except Exception as e:
            print(f"❌ 镜像构建失败: {str(e)}")
            return False
    
    def run_container(self, image_name, container_name, ports=None, volumes=None, environment=None):
        """
        运行容器
        """
        if not self.client:
            return None
        
        try:
            print(f"🚀 启动容器: {container_name}")
            
            # 停止并删除同名容器
            self.stop_container(container_name)
            
            # 运行新容器
            container = self.client.containers.run(
                image_name,
                name=container_name,
                ports=ports or {},
                volumes=volumes or {},
                environment=environment or {},
                detach=True,
                restart_policy={"Name": "unless-stopped"}
            )
            
            print(f"✅ 容器启动成功: {container.id[:12]}")
            return container
        
        except Exception as e:
            print(f"❌ 容器启动失败: {str(e)}")
            return None
    
    def stop_container(self, container_name):
        """
        停止并删除容器
        """
        if not self.client:
            return False
        
        try:
            # 查找容器
            containers = self.client.containers.list(
                all=True,
                filters={'name': container_name}
            )
            
            for container in containers:
                if container.name == container_name:
                    print(f"🛑 停止容器: {container_name}")
                    container.stop()
                    container.remove()
                    return True
            
            return True
        
        except Exception as e:
            print(f"❌ 停止容器失败: {str(e)}")
            return False
    
    def list_containers(self):
        """
        列出所有容器
        """
        if not self.client:
            return []
        
        try:
            containers = self.client.containers.list(all=True)
            
            print("📋 容器列表:")
            for container in containers:
                status = container.status
                ports = container.ports
                
                print(f"   - {container.name}: {status}")
                if ports:
                    for port, bindings in ports.items():
                        if bindings:
                            for binding in bindings:
                                print(f"     端口映射: {binding['HostPort']} -> {port}")
            
            return containers
        
        except Exception as e:
            print(f"❌ 获取容器列表失败: {str(e)}")
            return []
    
    def get_container_logs(self, container_name, tail=100):
        """
        获取容器日志
        """
        if not self.client:
            return None
        
        try:
            container = self.client.containers.get(container_name)
            logs = container.logs(tail=tail, timestamps=True)
            
            print(f"📄 容器 '{container_name}' 日志 (最近{tail}行):")
            print(logs.decode('utf-8'))
            
            return logs
        
        except Exception as e:
            print(f"❌ 获取容器日志失败: {str(e)}")
            return None
    
    def deploy_with_compose(self, compose_file_path):
        """
        使用Docker Compose部署
        """
        try:
            import subprocess
            
            print(f"🐙 使用Docker Compose部署: {compose_file_path}")
            
            # 停止现有服务
            subprocess.run([
                'docker-compose', '-f', str(compose_file_path), 'down'
            ], check=False)
            
            # 启动服务
            result = subprocess.run([
                'docker-compose', '-f', str(compose_file_path), 'up', '-d'
            ], capture_output=True, text=True)
            
            if result.returncode == 0:
                print("✅ Docker Compose部署成功")
                
                # 显示服务状态
                subprocess.run([
                    'docker-compose', '-f', str(compose_file_path), 'ps'
                ])
                
                return True
            else:
                print(f"❌ Docker Compose部署失败: {result.stderr}")
                return False
        
        except Exception as e:
            print(f"❌ Docker Compose部署错误: {str(e)}")
            return False
    
    def scale_service(self, compose_file_path, service_name, replicas):
        """
        扩展服务实例
        """
        try:
            import subprocess
            
            print(f"📈 扩展服务 '{service_name}' 到 {replicas} 个实例")
            
            result = subprocess.run([
                'docker-compose', '-f', str(compose_file_path),
                'up', '-d', '--scale', f"{service_name}={replicas}"
            ], capture_output=True, text=True)
            
            if result.returncode == 0:
                print(f"✅ 服务扩展成功")
                return True
            else:
                print(f"❌ 服务扩展失败: {result.stderr}")
                return False
        
        except Exception as e:
            print(f"❌ 服务扩展错误: {str(e)}")
            return False
    
    def monitor_resources(self, container_name):
        """
        监控容器资源使用
        """
        if not self.client:
            return None
        
        try:
            container = self.client.containers.get(container_name)
            stats = container.stats(stream=False)
            
            # 计算CPU使用率(按Docker官方公式,差值比例乘以在线CPU核数)
            cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - \
                       stats['precpu_stats']['cpu_usage']['total_usage']
            system_delta = stats['cpu_stats']['system_cpu_usage'] - \
                          stats['precpu_stats']['system_cpu_usage']
            online_cpus = stats['cpu_stats'].get('online_cpus', 1)
            
            cpu_percent = 0
            if system_delta > 0:
                cpu_percent = (cpu_delta / system_delta) * online_cpus * 100
            
            # 计算内存使用
            memory_usage = stats['memory_stats']['usage']
            memory_limit = stats['memory_stats']['limit']
            memory_percent = (memory_usage / memory_limit) * 100
            
            print(f"📊 容器 '{container_name}' 资源使用:")
            print(f"   - CPU: {cpu_percent:.2f}%")
            print(f"   - 内存: {memory_usage / 1024 / 1024:.1f}MB / {memory_limit / 1024 / 1024:.1f}MB ({memory_percent:.1f}%)")
            
            return {
                'cpu_percent': cpu_percent,
                'memory_usage': memory_usage,
                'memory_limit': memory_limit,
                'memory_percent': memory_percent
            }
        
        except Exception as e:
            print(f"❌ 获取资源使用情况失败: {str(e)}")
            return None

# 使用示例
docker_deployer = DockerDeployer()

if docker_deployer.client:
    # 构建镜像
    if docker_deployer.build_image('/path/to/project', 'myspider'):
        # 运行容器
        container = docker_deployer.run_container(
            'myspider:latest',
            'myspider-container',
            ports={'6800/tcp': 6800},
            volumes={'/path/to/logs': {'bind': '/app/logs', 'mode': 'rw'}},
            environment={'SCRAPY_SETTINGS_MODULE': 'myproject.settings'}
        )
        
        if container:
            # 列出容器
            docker_deployer.list_containers()
            
            # 监控资源
            import time
            time.sleep(5)
            docker_deployer.monitor_resources('myspider-container')
    
    # 使用Docker Compose部署
    compose_file = Path('/path/to/docker-compose.yml')
    if compose_file.exists():
        docker_deployer.deploy_with_compose(compose_file)

print("Docker部署完成!")

2. 监控系统

2.1 性能监控

# 4. 性能监控系统
print("\n📊 性能监控系统:")

import time
import psutil
import threading
from collections import deque, defaultdict
from datetime import datetime, timedelta

class PerformanceMonitor:
    """
    性能监控器
    """
    
    def __init__(self, max_history=1000):
        self.max_history = max_history
        self.metrics = defaultdict(lambda: deque(maxlen=max_history))
        self.is_monitoring = False
        self.monitor_thread = None
        
        # 监控配置
        self.monitor_interval = 5  # 秒
        self.alert_thresholds = {
            'cpu_percent': 80,
            'memory_percent': 85,
            'disk_percent': 90,
            'avg_response_time': 10,  # 秒,对应下面采集的avg_response_time指标
            'error_rate': 5  # 百分比
        }
        
        # 统计数据
        self.stats = {
            'requests_total': 0,
            'requests_success': 0,
            'requests_failed': 0,
            'response_times': deque(maxlen=1000),
            'request_timestamps': deque(maxlen=1000),
            'errors': deque(maxlen=100)
        }
    
    def start_monitoring(self):
        """
        开始监控
        """
        if self.is_monitoring:
            print("⚠️ 监控已在运行中")
            return
        
        self.is_monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        
        print("🔍 性能监控已启动")
    
    def stop_monitoring(self):
        """
        停止监控
        """
        self.is_monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
        
        print("🛑 性能监控已停止")
    
    def _monitor_loop(self):
        """
        监控循环
        """
        while self.is_monitoring:
            try:
                # 收集系统指标
                self._collect_system_metrics()
                
                # 收集应用指标
                self._collect_application_metrics()
                
                # 检查告警
                self._check_alerts()
                
                time.sleep(self.monitor_interval)
            
            except Exception as e:
                print(f"❌ 监控过程出错: {str(e)}")
                time.sleep(self.monitor_interval)
    
    def _collect_system_metrics(self):
        """
        收集系统指标
        """
        timestamp = datetime.now()
        
        # CPU使用率
        cpu_percent = psutil.cpu_percent(interval=1)
        self.metrics['cpu_percent'].append((timestamp, cpu_percent))
        
        # 内存使用率
        memory = psutil.virtual_memory()
        self.metrics['memory_percent'].append((timestamp, memory.percent))
        self.metrics['memory_used'].append((timestamp, memory.used))
        self.metrics['memory_available'].append((timestamp, memory.available))
        
        # 磁盘使用率
        disk = psutil.disk_usage('/')
        disk_percent = (disk.used / disk.total) * 100
        self.metrics['disk_percent'].append((timestamp, disk_percent))
        self.metrics['disk_used'].append((timestamp, disk.used))
        self.metrics['disk_free'].append((timestamp, disk.free))
        
        # 网络IO
        net_io = psutil.net_io_counters()
        self.metrics['network_bytes_sent'].append((timestamp, net_io.bytes_sent))
        self.metrics['network_bytes_recv'].append((timestamp, net_io.bytes_recv))
        
        # 磁盘IO
        disk_io = psutil.disk_io_counters()
        if disk_io:
            self.metrics['disk_read_bytes'].append((timestamp, disk_io.read_bytes))
            self.metrics['disk_write_bytes'].append((timestamp, disk_io.write_bytes))
    
    def _collect_application_metrics(self):
        """
        收集应用指标
        """
        timestamp = datetime.now()
        
        # 计算错误率
        if self.stats['requests_total'] > 0:
            error_rate = (self.stats['requests_failed'] / self.stats['requests_total']) * 100
            self.metrics['error_rate'].append((timestamp, error_rate))
        
        # 计算平均响应时间
        if self.stats['response_times']:
            avg_response_time = sum(self.stats['response_times']) / len(self.stats['response_times'])
            self.metrics['avg_response_time'].append((timestamp, avg_response_time))
        
        # 请求速率(用记录的请求时间戳统计最近1分钟的请求数)
        recent_requests = len([t for t in self.stats['request_timestamps']
                               if timestamp - t < timedelta(minutes=1)])
        self.metrics['requests_per_minute'].append((timestamp, recent_requests))
    
    def _check_alerts(self):
        """
        检查告警条件
        """
        current_time = datetime.now()
        
        for metric_name, threshold in self.alert_thresholds.items():
            if metric_name in self.metrics and self.metrics[metric_name]:
                latest_value = self.metrics[metric_name][-1][1]
                
                if latest_value > threshold:
                    self._trigger_alert(metric_name, latest_value, threshold)
    
    def _trigger_alert(self, metric_name, current_value, threshold):
        """
        触发告警
        """
        alert_message = f"🚨 告警: {metric_name} = {current_value:.2f} (阈值: {threshold})"
        print(alert_message)
        
        # 这里可以添加更多告警处理逻辑
        # 例如发送邮件、短信、Slack通知等
    
    def record_request(self, response_time, success=True):
        """
        记录请求
        """
        self.stats['requests_total'] += 1
        
        if success:
            self.stats['requests_success'] += 1
        else:
            self.stats['requests_failed'] += 1
        
        self.stats['response_times'].append(response_time)
        self.stats['request_timestamps'].append(datetime.now())
    
    def record_error(self, error_message):
        """
        记录错误
        """
        timestamp = datetime.now()
        self.stats['errors'].append((timestamp, error_message))
    
    def get_current_metrics(self):
        """
        获取当前指标
        """
        current_metrics = {}
        
        for metric_name, values in self.metrics.items():
            if values:
                current_metrics[metric_name] = values[-1][1]
        
        return current_metrics
    
    def get_metrics_history(self, metric_name, duration_minutes=60):
        """
        获取指标历史数据
        """
        if metric_name not in self.metrics:
            return []
        
        cutoff_time = datetime.now() - timedelta(minutes=duration_minutes)
        
        return [(timestamp, value) for timestamp, value in self.metrics[metric_name]
                if timestamp >= cutoff_time]
    
    def generate_report(self):
        """
        生成监控报告
        """
        current_metrics = self.get_current_metrics()
        
        print("\n📊 性能监控报告")
        print("=" * 50)
        print(f"报告时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print()
        
        # 系统指标
        print("🖥️ 系统指标:")
        if 'cpu_percent' in current_metrics:
            print(f"   CPU使用率: {current_metrics['cpu_percent']:.1f}%")
        if 'memory_percent' in current_metrics:
            print(f"   内存使用率: {current_metrics['memory_percent']:.1f}%")
        if 'disk_percent' in current_metrics:
            print(f"   磁盘使用率: {current_metrics['disk_percent']:.1f}%")
        
        # 应用指标
        print("\n🕷️ 应用指标:")
        print(f"   总请求数: {self.stats['requests_total']}")
        print(f"   成功请求: {self.stats['requests_success']}")
        print(f"   失败请求: {self.stats['requests_failed']}")
        
        if 'error_rate' in current_metrics:
            print(f"   错误率: {current_metrics['error_rate']:.2f}%")
        if 'avg_response_time' in current_metrics:
            print(f"   平均响应时间: {current_metrics['avg_response_time']:.2f}秒")
        if 'requests_per_minute' in current_metrics:
            print(f"   每分钟请求数: {current_metrics['requests_per_minute']:.0f}")
        
        # 最近错误
        if self.stats['errors']:
            print("\n❌ 最近错误:")
            for timestamp, error in list(self.stats['errors'])[-5:]:
                print(f"   {timestamp.strftime('%H:%M:%S')}: {error}")
        
        print("=" * 50)

# Scrapy监控中间件
class MonitoringMiddleware:
    """
    Scrapy监控中间件
    """
    
    def __init__(self):
        self.monitor = PerformanceMonitor()
        self.monitor.start_monitoring()
    
    def process_request(self, request, spider):
        """
        处理请求
        """
        request.meta['start_time'] = time.time()
        return None
    
    def process_response(self, request, response, spider):
        """
        处理响应
        """
        start_time = request.meta.get('start_time')
        
        if start_time:
            response_time = time.time() - start_time
            success = response.status < 400
            
            self.monitor.record_request(response_time, success)
            
            if not success:
                self.monitor.record_error(f"HTTP {response.status}: {request.url}")
        
        return response
    
    def process_exception(self, request, exception, spider):
        """
        处理异常
        """
        self.monitor.record_error(f"Exception: {str(exception)}")
        return None
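
要让爬虫真正使用这个中间件,需要在 settings.py 中把它注册为下载器中间件。下面是一个示意,假设中间件定义在 myproject/middlewares.py 中(模块路径按你的项目实际位置调整):

# settings.py(示意)
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.MonitoringMiddleware': 543,  # 数字为中间件执行顺序
}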

# 使用示例
monitor = PerformanceMonitor()
monitor.start_monitoring()

# 模拟一些请求
import random
for i in range(10):
    response_time = random.uniform(0.5, 3.0)
    success = random.choice([True, True, True, False])  # 75%成功率
    
    monitor.record_request(response_time, success)
    
    if not success:
        monitor.record_error(f"模拟错误 {i}")
    
    time.sleep(0.1)

# 生成报告
time.sleep(2)
monitor.generate_report()

# 获取指标历史
cpu_history = monitor.get_metrics_history('cpu_percent', 5)
print(f"\n📈 CPU使用率历史 (最近5分钟): {len(cpu_history)}个数据点")

monitor.stop_monitoring()

print("性能监控系统演示完成!")

2.2 日志监控

# 5. 日志监控系统
print("\n📝 日志监控系统:")

import logging
import re
import time
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import threading

class LogMonitor:
    """
    日志监控器
    """
    
    def __init__(self, log_file_path=None):
        self.log_file_path = log_file_path
        self.is_monitoring = False
        self.monitor_thread = None
        
        # 日志统计
        self.log_stats = {
            'total_lines': 0,
            'error_count': 0,
            'warning_count': 0,
            'info_count': 0,
            'debug_count': 0,
            'spider_stats': defaultdict(lambda: defaultdict(int)),
            'error_patterns': Counter(),
            'recent_errors': [],
            'performance_metrics': []
        }
        
        # 错误模式
        self.error_patterns = [
            (r'TimeoutError', '请求超时'),
            (r'ConnectionError', '连接错误'),
            (r'HTTP (\d+)', 'HTTP错误'),
            (r'DNSLookupError', 'DNS解析错误'),
            (r'TunnelError', '代理隧道错误'),
            (r'SSLError', 'SSL证书错误'),
            (r'MemoryError', '内存不足'),
            (r'KeyError', '键值错误'),
            (r'AttributeError', '属性错误'),
            (r'ValueError', '值错误')
        ]
        
        # 性能模式
        self.performance_patterns = [
            (r'Crawled \((\d+)\) <GET ([^>]+)> \(referer: [^)]*\) \[(\d+)ms\]', 'request_time'),
            (r'Spider opened', 'spider_start'),
            (r'Spider closed \(([^)]+)\)', 'spider_close'),
            (r'Scraped from <(\d+) ([^>]+)>', 'item_scraped')
        ]
    
    def start_monitoring(self):
        """
        开始监控日志
        """
        if self.is_monitoring:
            print("⚠️ 日志监控已在运行中")
            return
        
        self.is_monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_log_file)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        
        print(f"📝 开始监控日志文件: {self.log_file_path}")
    
    def stop_monitoring(self):
        """
        停止监控
        """
        self.is_monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
        
        print("🛑 日志监控已停止")
    
    def _monitor_log_file(self):
        """
        监控日志文件
        """
        if not self.log_file_path:
            return
        
        try:
            with open(self.log_file_path, 'r', encoding='utf-8') as f:
                # 移动到文件末尾
                f.seek(0, 2)
                
                while self.is_monitoring:
                    line = f.readline()
                    
                    if line:
                        self._process_log_line(line.strip())
                    else:
                        time.sleep(0.1)
        
        except FileNotFoundError:
            print(f"❌ 日志文件不存在: {self.log_file_path}")
        except Exception as e:
            print(f"❌ 监控日志文件出错: {str(e)}")
    
    def _process_log_line(self, line):
        """
        处理日志行
        """
        self.log_stats['total_lines'] += 1
        
        # 解析日志级别
        log_level = self._extract_log_level(line)
        if log_level:
            self.log_stats[f'{log_level.lower()}_count'] += 1
        
        # 检查错误模式
        if log_level in ['ERROR', 'CRITICAL']:
            self._analyze_error(line)
        
        # 检查性能模式
        self._analyze_performance(line)
        
        # 提取爬虫统计
        self._extract_spider_stats(line)
    
    def _extract_log_level(self, line):
        """
        提取日志级别
        """
        # 与下文的日志格式 "%(asctime)s [%(name)s] %(levelname)s: %(message)s" 对应
        level_pattern = r'\b(DEBUG|INFO|WARNING|ERROR|CRITICAL):'
        match = re.search(level_pattern, line)
        
        return match.group(1) if match else None
    
    def _analyze_error(self, line):
        """
        分析错误日志
        """
        # 记录最近错误
        timestamp = datetime.now()
        self.log_stats['recent_errors'].append((timestamp, line))
        
        # 保持最近100个错误
        if len(self.log_stats['recent_errors']) > 100:
            self.log_stats['recent_errors'] = self.log_stats['recent_errors'][-100:]
        
        # 匹配错误模式
        for pattern, description in self.error_patterns:
            if re.search(pattern, line, re.IGNORECASE):
                self.log_stats['error_patterns'][description] += 1
                break
    
    def _analyze_performance(self, line):
        """
        分析性能日志
        """
        for pattern, metric_type in self.performance_patterns:
            match = re.search(pattern, line)
            
            if match:
                timestamp = datetime.now()
                
                if metric_type == 'request_time':
                    status_code = int(match.group(1))
                    url = match.group(2)
                    response_time = int(match.group(3))
                    
                    self.log_stats['performance_metrics'].append({
                        'timestamp': timestamp,
                        'type': 'request',
                        'status_code': status_code,
                        'url': url,
                        'response_time': response_time
                    })
                
                elif metric_type == 'spider_start':
                    self.log_stats['performance_metrics'].append({
                        'timestamp': timestamp,
                        'type': 'spider_start'
                    })
                
                elif metric_type == 'spider_close':
                    reason = match.group(1)
                    self.log_stats['performance_metrics'].append({
                        'timestamp': timestamp,
                        'type': 'spider_close',
                        'reason': reason
                    })
                
                break
    
    def _extract_spider_stats(self, line):
        """
        提取爬虫统计信息
        """
        # 提取爬虫名称
        spider_pattern = r'\[([^\]]+)\]'
        spider_match = re.search(spider_pattern, line)
        
        if spider_match:
            spider_name = spider_match.group(1)
            
            # 统计不同类型的日志
            if 'Scraped from' in line:
                self.log_stats['spider_stats'][spider_name]['items_scraped'] += 1
            elif 'Crawled' in line:
                self.log_stats['spider_stats'][spider_name]['pages_crawled'] += 1
            elif 'ERROR' in line:
                self.log_stats['spider_stats'][spider_name]['errors'] += 1
    
    def get_error_summary(self, hours=24):
        """
        获取错误摘要
        """
        cutoff_time = datetime.now() - timedelta(hours=hours)
        
        recent_errors = [
            (timestamp, error) for timestamp, error in self.log_stats['recent_errors']
            if timestamp >= cutoff_time
        ]
        
        return {
            'total_errors': len(recent_errors),
            'error_patterns': self.log_stats['error_patterns'].copy(),  # Counter,支持most_common
            'recent_errors': recent_errors[-10:]  # 最近10个错误
        }
    
    def get_performance_summary(self, hours=24):
        """
        获取性能摘要
        """
        cutoff_time = datetime.now() - timedelta(hours=hours)
        
        recent_metrics = [
            metric for metric in self.log_stats['performance_metrics']
            if metric['timestamp'] >= cutoff_time
        ]
        
        # 计算平均响应时间
        request_metrics = [m for m in recent_metrics if m['type'] == 'request']
        
        if request_metrics:
            avg_response_time = sum(m['response_time'] for m in request_metrics) / len(request_metrics)
            max_response_time = max(m['response_time'] for m in request_metrics)
            min_response_time = min(m['response_time'] for m in request_metrics)
        else:
            avg_response_time = max_response_time = min_response_time = 0
        
        # 统计状态码
        status_codes = Counter(m['status_code'] for m in request_metrics)
        
        return {
            'total_requests': len(request_metrics),
            'avg_response_time': avg_response_time,
            'max_response_time': max_response_time,
            'min_response_time': min_response_time,
            'status_codes': dict(status_codes)
        }
    
    def generate_log_report(self):
        """
        生成日志报告
        """
        print("\n📊 日志监控报告")
        print("=" * 50)
        print(f"报告时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print()
        
        # 基本统计
        print("📈 基本统计:")
        print(f"   总日志行数: {self.log_stats['total_lines']}")
        print(f"   错误日志: {self.log_stats['error_count']}")
        print(f"   警告日志: {self.log_stats['warning_count']}")
        print(f"   信息日志: {self.log_stats['info_count']}")
        print(f"   调试日志: {self.log_stats['debug_count']}")
        
        # 错误摘要
        error_summary = self.get_error_summary()
        print(f"\n❌ 错误摘要 (最近24小时):")
        print(f"   总错误数: {error_summary['total_errors']}")
        
        if error_summary['error_patterns']:
            print("   错误类型分布:")
            for error_type, count in error_summary['error_patterns'].most_common(5):
                print(f"     - {error_type}: {count}")
        
        # 性能摘要
        perf_summary = self.get_performance_summary()
        print(f"\n⚡ 性能摘要 (最近24小时):")
        print(f"   总请求数: {perf_summary['total_requests']}")
        
        if perf_summary['total_requests'] > 0:
            print(f"   平均响应时间: {perf_summary['avg_response_time']:.2f}ms")
            print(f"   最大响应时间: {perf_summary['max_response_time']}ms")
            print(f"   最小响应时间: {perf_summary['min_response_time']}ms")
        
        if perf_summary['status_codes']:
            print("   状态码分布:")
            for status_code, count in perf_summary['status_codes'].items():
                print(f"     - {status_code}: {count}")
        
        # 爬虫统计
        if self.log_stats['spider_stats']:
            print(f"\n🕷️ 爬虫统计:")
            for spider_name, stats in self.log_stats['spider_stats'].items():
                print(f"   {spider_name}:")
                for stat_name, value in stats.items():
                    print(f"     - {stat_name}: {value}")
        
        print("=" * 50)

# 自定义日志处理器
class ScrapyLogHandler(logging.Handler):
    """
    Scrapy日志处理器
    """
    
    def __init__(self, log_monitor):
        super().__init__()
        self.log_monitor = log_monitor
    
    def emit(self, record):
        """
        处理日志记录
        """
        try:
            log_line = self.format(record)
            self.log_monitor._process_log_line(log_line)
        except Exception:
            self.handleError(record)

# 日志配置
def setup_logging_with_monitoring(log_file_path):
    """
    设置带监控的日志配置
    """
    # 创建日志监控器
    log_monitor = LogMonitor(log_file_path)
    
    # 配置日志格式
    formatter = logging.Formatter(
        '%(asctime)s [%(name)s] %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    
    # 文件处理器
    file_handler = logging.FileHandler(log_file_path, encoding='utf-8')
    file_handler.setFormatter(formatter)
    
    # 监控处理器
    monitor_handler = ScrapyLogHandler(log_monitor)
    monitor_handler.setFormatter(formatter)
    
    # 配置根日志器
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(file_handler)
    root_logger.addHandler(monitor_handler)
    
    # 注意: 日志已通过ScrapyLogHandler实时进入监控器;
    # 若再调用start_monitoring()对同一文件做tail,每行日志会被统计两次
    
    return log_monitor

# 使用示例
log_file = 'scrapy.log'
log_monitor = setup_logging_with_monitoring(log_file)

# 模拟一些日志
logger = logging.getLogger('test_spider')

logger.info("Spider opened")
logger.info("Crawled (200) <GET https://example.com> (referer: None) [150ms]")
logger.info("Scraped from <200 https://example.com>")
logger.warning("Retrying request due to timeout")
logger.error("TimeoutError: Request timed out")
logger.error("ConnectionError: Failed to establish connection")
logger.info("Spider closed (finished)")

# 等待处理
time.sleep(1)

# 生成报告
log_monitor.generate_log_report()


print("日志监控系统演示完成!")

2.3 告警系统

# 6. 告警系统
print("\n🚨 告警系统:")

import smtplib
import json
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
from enum import Enum

class AlertLevel(Enum):
    """告警级别"""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class AlertChannel(Enum):
    """告警渠道"""
    EMAIL = "email"
    SLACK = "slack"
    WEBHOOK = "webhook"
    SMS = "sms"

class AlertManager:
    """
    告警管理器
    """
    
    def __init__(self, config_file='alert_config.json'):
        self.config = self.load_config(config_file)
        self.alert_history = []
        self.suppressed_alerts = set()
        
        # 告警规则
        self.alert_rules = {
            'high_error_rate': {
                'condition': lambda stats: stats.get('error_rate', 0) > 10,
                'level': AlertLevel.ERROR,
                'message': "错误率过高: {error_rate:.2f}%",
                'cooldown': 300  # 5分钟冷却
            },
            'high_response_time': {
                'condition': lambda stats: stats.get('avg_response_time', 0) > 5,
                'level': AlertLevel.WARNING,
                'message': "响应时间过长: {avg_response_time:.2f}秒",
                'cooldown': 180  # 3分钟冷却
            },
            'spider_failure': {
                'condition': lambda stats: stats.get('spider_status') == 'failed',
                'level': AlertLevel.CRITICAL,
                'message': "爬虫运行失败: {spider_name}",
                'cooldown': 60  # 1分钟冷却
            },
            'memory_usage_high': {
                'condition': lambda stats: stats.get('memory_percent', 0) > 90,
                'level': AlertLevel.CRITICAL,
                'message': "内存使用率过高: {memory_percent:.1f}%",
                'cooldown': 300  # 5分钟冷却
            },
            'disk_space_low': {
                'condition': lambda stats: stats.get('disk_percent', 0) > 95,
                'level': AlertLevel.CRITICAL,
                'message': "磁盘空间不足: {disk_percent:.1f}%",
                'cooldown': 600  # 10分钟冷却
            }
        }
    
    def load_config(self, config_file):
        """
        加载告警配置
        """
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            # 创建默认配置
            default_config = {
                'email': {
                    'smtp_server': 'smtp.gmail.com',
                    'smtp_port': 587,
                    'username': 'your_email@gmail.com',
                    'password': 'your_password',
                    'recipients': ['admin@example.com']
                },
                'slack': {
                    'webhook_url': 'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK',
                    'channel': '#alerts'
                },
                'webhook': {
                    'url': 'https://your-webhook-endpoint.com/alerts',
                    'headers': {'Content-Type': 'application/json'}
                },
                'enabled_channels': ['email', 'slack'],
                'alert_levels': {
                    'info': ['slack'],
                    'warning': ['slack'],
                    'error': ['email', 'slack'],
                    'critical': ['email', 'slack', 'webhook']
                }
            }
            
            with open(config_file, 'w', encoding='utf-8') as f:
                json.dump(default_config, f, indent=2, ensure_ascii=False)
            
            print(f"创建默认告警配置: {config_file}")
            return default_config
    
    def check_alerts(self, stats):
        """
        检查告警条件
        """
        current_time = datetime.now()
        triggered_alerts = []
        
        for rule_name, rule in self.alert_rules.items():
            # 检查冷却时间
            if self._is_suppressed(rule_name, current_time):
                continue
            
            # 检查告警条件
            if rule['condition'](stats):
                alert = {
                    'rule_name': rule_name,
                    'level': rule['level'],
                    'message': rule['message'].format(**stats),
                    'timestamp': current_time,
                    'stats': stats
                }
                
                triggered_alerts.append(alert)
                
                # 发送告警
                self.send_alert(alert)
                
                # 添加到抑制列表
                self._suppress_alert(rule_name, current_time, rule['cooldown'])
        
        return triggered_alerts
    
    def _is_suppressed(self, rule_name, current_time):
        """
        检查告警是否被抑制
        """
        # 清理过期的抑制记录
        expired_keys = [key for key in self.suppressed_alerts 
                       if self._is_suppression_expired(key, current_time)]
        
        for key in expired_keys:
            self.suppressed_alerts.discard(key)
        
        # 精确匹配规则名,避免规则名互为前缀时误判
        return any(key.rsplit('_', 1)[0] == rule_name for key in self.suppressed_alerts)
    
    def _suppress_alert(self, rule_name, current_time, cooldown_seconds):
        """
        抑制告警
        """
        expiry_time = current_time + timedelta(seconds=cooldown_seconds)
        suppression_key = f"{rule_name}_{expiry_time.strftime('%Y%m%d%H%M%S')}"
        self.suppressed_alerts.add(suppression_key)
    
    def _is_suppression_expired(self, suppression_key, current_time):
        """
        检查抑制是否过期
        """
        try:
            expiry_str = suppression_key.split('_')[-1]
            expiry_time = datetime.strptime(expiry_str, '%Y%m%d%H%M%S')
            return current_time > expiry_time
        except (ValueError, IndexError):
            return True
    
    def send_alert(self, alert):
        """
        发送告警
        """
        level = alert['level'].value
        enabled_channels = self.config.get('alert_levels', {}).get(level, [])
        
        for channel in enabled_channels:
            if channel in self.config.get('enabled_channels', []):
                try:
                    if channel == 'email':
                        self._send_email_alert(alert)
                    elif channel == 'slack':
                        self._send_slack_alert(alert)
                    elif channel == 'webhook':
                        self._send_webhook_alert(alert)
                    
                    print(f"✅ 告警已发送到 {channel}: {alert['message']}")
                
                except Exception as e:
                    print(f"❌ 发送告警到 {channel} 失败: {str(e)}")
        
        # 记录告警历史
        self.alert_history.append(alert)
        
        # 保持最近1000条告警记录
        if len(self.alert_history) > 1000:
            self.alert_history = self.alert_history[-1000:]
    
    def _send_email_alert(self, alert):
        """
        发送邮件告警
        """
        email_config = self.config.get('email', {})
        
        if not email_config.get('username') or not email_config.get('recipients'):
            raise ValueError("邮件配置不完整")
        
        # 创建邮件(标准库中的类名为 MIMEMultipart / MIMEText)
        msg = MIMEMultipart()
        msg['From'] = email_config['username']
        msg['To'] = ', '.join(email_config['recipients'])
        msg['Subject'] = f"[{alert['level'].value.upper()}] Scrapy告警: {alert['rule_name']}"
        
        # 邮件内容
        body = f"""
告警详情:
- 规则: {alert['rule_name']}
- 级别: {alert['level'].value.upper()}
- 时间: {alert['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}
- 消息: {alert['message']}

系统状态:
{json.dumps(alert['stats'], indent=2, ensure_ascii=False)}
"""
        
        msg.attach(MIMEText(body, 'plain', 'utf-8'))
        
        # 发送邮件
        with smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port']) as server:
            server.starttls()
            server.login(email_config['username'], email_config['password'])
            server.send_message(msg)
    
    def _send_slack_alert(self, alert):
        """
        发送Slack告警
        """
        slack_config = self.config.get('slack', {})
        
        if not slack_config.get('webhook_url'):
            raise ValueError("Slack配置不完整")
        
        # 设置颜色
        color_map = {
            AlertLevel.INFO: 'good',
            AlertLevel.WARNING: 'warning',
            AlertLevel.ERROR: 'danger',
            AlertLevel.CRITICAL: 'danger'
        }
        
        # 构建消息
        payload = {
            'channel': slack_config.get('channel', '#alerts'),
            'username': 'Scrapy Monitor',
            'icon_emoji': ':spider:',
            'attachments': [{
                'color': color_map.get(alert['level'], 'danger'),
                'title': f"{alert['level'].value.upper()} Alert: {alert['rule_name']}",
                'text': alert['message'],
                'fields': [
                    {
                        'title': '时间',
                        'value': alert['timestamp'].strftime('%Y-%m-%d %H:%M:%S'),
                        'short': True
                    },
                    {
                        'title': '级别',
                        'value': alert['level'].value.upper(),
                        'short': True
                    }
                ],
                'footer': 'Scrapy Monitor',
                'ts': int(alert['timestamp'].timestamp())
            }]
        }
        
        # 发送到Slack
        response = requests.post(
            slack_config['webhook_url'],
            json=payload,
            timeout=10
        )
        
        response.raise_for_status()
    
    def _send_webhook_alert(self, alert):
        """
        发送Webhook告警
        """
        webhook_config = self.config.get('webhook', {})
        
        if not webhook_config.get('url'):
            raise ValueError("Webhook配置不完整")
        
        # 构建载荷
        payload = {
            'alert_type': 'scrapy_monitor',
            'rule_name': alert['rule_name'],
            'level': alert['level'].value,
            'message': alert['message'],
            'timestamp': alert['timestamp'].isoformat(),
            'stats': alert['stats']
        }
        
        # 发送请求
        response = requests.post(
            webhook_config['url'],
            json=payload,
            headers=webhook_config.get('headers', {}),
            timeout=10
        )
        
        response.raise_for_status()
    
    def get_alert_history(self, hours=24, level=None):
        """
        获取告警历史
        """
        cutoff_time = datetime.now() - timedelta(hours=hours)
        
        filtered_alerts = [
            alert for alert in self.alert_history
            if alert['timestamp'] >= cutoff_time
        ]
        
        if level:
            filtered_alerts = [
                alert for alert in filtered_alerts
                if alert['level'] == level
            ]
        
        return filtered_alerts
    
    def generate_alert_report(self):
        """
        生成告警报告
        """
        print("\n🚨 告警系统报告")
        print("=" * 50)
        print(f"报告时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print()
        
        # 最近24小时告警统计
        recent_alerts = self.get_alert_history(24)
        
        if recent_alerts:
            print(f"📊 最近24小时告警统计 (共{len(recent_alerts)}条):")
            
            # 按级别统计
            level_counts = {}
            for alert in recent_alerts:
                level = alert['level'].value
                level_counts[level] = level_counts.get(level, 0) + 1
            
            for level, count in level_counts.items():
                print(f"   - {level.upper()}: {count}")
            
            # 最近告警
            print(f"\n🔔 最近告警 (最近5条):")
            for alert in recent_alerts[-5:]:
                print(f"   - {alert['timestamp'].strftime('%H:%M:%S')} [{alert['level'].value.upper()}] {alert['message']}")
        else:
            print("✅ 最近24小时无告警")
        
        # 当前抑制的告警
        if self.suppressed_alerts:
            print(f"\n🔇 当前抑制的告警规则: {len(self.suppressed_alerts)}个")
        
        print("=" * 50)

# 使用示例
alert_manager = AlertManager()

# 模拟一些统计数据和告警
test_stats = [
    {'error_rate': 15, 'avg_response_time': 2, 'memory_percent': 70},
    {'error_rate': 5, 'avg_response_time': 8, 'memory_percent': 85},
    {'error_rate': 3, 'avg_response_time': 3, 'memory_percent': 95},
    {'spider_status': 'failed', 'spider_name': 'test_spider', 'memory_percent': 60}
]

for i, stats in enumerate(test_stats):
    print(f"\n检查告警 #{i+1}: {stats}")
    alerts = alert_manager.check_alerts(stats)
    
    if alerts:
        for alert in alerts:
            print(f"  触发告警: {alert['message']}")
    else:
        print("  无告警触发")
    
    time.sleep(1)

# 生成告警报告
alert_manager.generate_alert_report()

print("告警系统演示完成!")

3. 故障排除与维护

3.1 常见问题诊断

# 7. 故障排除与维护
print("\n🔧 故障排除与维护:")

import os
import sys
import subprocess
import psutil
import requests
from datetime import datetime

class DiagnosticTool:
    """
    诊断工具
    """
    
    def __init__(self):
        self.diagnostic_results = {}
    
    def run_full_diagnosis(self):
        """
        运行完整诊断
        """
        print("🔍 开始系统诊断...")
        
        diagnostics = [
            ('系统资源', self.check_system_resources),
            ('网络连接', self.check_network_connectivity),
            ('Scrapy环境', self.check_scrapy_environment),
            ('依赖包', self.check_dependencies),
            ('配置文件', self.check_configuration),
            ('日志文件', self.check_log_files),
            ('数据库连接', self.check_database_connection),
            ('Redis连接', self.check_redis_connection)
        ]
        
        for name, diagnostic_func in diagnostics:
            print(f"\n📋 检查 {name}:")
            try:
                result = diagnostic_func()
                self.diagnostic_results[name] = result
                
                if result['status'] == 'ok':
                    print(f"  ✅ {result['message']}")
                else:
                    print(f"  ❌ {result['message']}")
                    if 'suggestions' in result:
                        print("  💡 建议:")
                        for suggestion in result['suggestions']:
                            print(f"     - {suggestion}")
            
            except Exception as e:
                error_result = {
                    'status': 'error',
                    'message': f"诊断过程出错: {str(e)}"
                }
                self.diagnostic_results[name] = error_result
                print(f"  ❌ {error_result['message']}")
        
        return self.diagnostic_results
    
    def check_system_resources(self):
        """
        检查系统资源
        """
        try:
            # CPU使用率
            cpu_percent = psutil.cpu_percent(interval=1)
            
            # 内存使用
            memory = psutil.virtual_memory()
            
            # 磁盘使用
            disk = psutil.disk_usage('/')
            
            issues = []
            suggestions = []
            
            if cpu_percent > 90:
                issues.append(f"CPU使用率过高: {cpu_percent:.1f}%")
                suggestions.append("检查是否有异常进程占用CPU")
            
            if memory.percent > 90:
                issues.append(f"内存使用率过高: {memory.percent:.1f}%")
                suggestions.append("考虑增加内存或优化内存使用")
            
            disk_percent = (disk.used / disk.total) * 100
            if disk_percent > 90:
                issues.append(f"磁盘使用率过高: {disk_percent:.1f}%")
                suggestions.append("清理日志文件和临时文件")
            
            if issues:
                return {
                    'status': 'warning',
                    'message': '; '.join(issues),
                    'suggestions': suggestions,
                    'details': {
                        'cpu_percent': cpu_percent,
                        'memory_percent': memory.percent,
                        'disk_percent': disk_percent
                    }
                }
            else:
                return {
                    'status': 'ok',
                    'message': f"系统资源正常 (CPU: {cpu_percent:.1f}%, 内存: {memory.percent:.1f}%, 磁盘: {disk_percent:.1f}%)",
                    'details': {
                        'cpu_percent': cpu_percent,
                        'memory_percent': memory.percent,
                        'disk_percent': disk_percent
                    }
                }
        
        except Exception as e:
            return {
                'status': 'error',
                'message': f"无法检查系统资源: {str(e)}"
            }
    
    def check_network_connectivity(self):
        """
        检查网络连接
        """
        test_urls = [
            'https://httpbin.org/get',
            'https://www.google.com',
            'https://www.baidu.com'
        ]
        
        successful_connections = 0
        failed_connections = []
        
        for url in test_urls:
            try:
                response = requests.get(url, timeout=10)
                
                if response.status_code == 200:
                    successful_connections += 1
                else:
                    failed_connections.append(f"{url} (HTTP {response.status_code})")
            
            except Exception as e:
                failed_connections.append(f"{url} ({str(e)})")
        
        if successful_connections == len(test_urls):
            return {
                'status': 'ok',
                'message': f"网络连接正常 ({successful_connections}/{len(test_urls)} 成功)"
            }
        elif successful_connections > 0:
            return {
                'status': 'warning',
                'message': f"部分网络连接失败 ({successful_connections}/{len(test_urls)} 成功)",
                'suggestions': [
                    "检查网络配置",
                    "检查防火墙设置",
                    "检查代理配置"
                ],
                'details': {'failed_connections': failed_connections}
            }
        else:
            return {
                'status': 'error',
                'message': "所有网络连接失败",
                'suggestions': [
                    "检查网络连接",
                    "检查DNS设置",
                    "检查防火墙配置"
                ],
                'details': {'failed_connections': failed_connections}
            }
    
    def check_scrapy_environment(self):
        """
        检查Scrapy环境
        """
        try:
            import scrapy
            import twisted
            
            # 检查版本
            scrapy_version = scrapy.__version__
            twisted_version = twisted.__version__
            
            # 检查Scrapy命令
            result = subprocess.run(
                [sys.executable, '-m', 'scrapy', 'version'],
                capture_output=True,
                text=True,
                timeout=10
            )
            
            if result.returncode == 0:
                return {
                    'status': 'ok',
                    'message': f"Scrapy环境正常 (Scrapy: {scrapy_version}, Twisted: {twisted_version})",
                    'details': {
                        'scrapy_version': scrapy_version,
                        'twisted_version': twisted_version,
                        'command_output': result.stdout
                    }
                }
            else:
                return {
                    'status': 'error',
                    'message': f"Scrapy命令执行失败: {result.stderr}",
                    'suggestions': [
                        "重新安装Scrapy",
                        "检查Python环境",
                        "检查PATH环境变量"
                    ]
                }
        
        except ImportError as e:
            return {
                'status': 'error',
                'message': f"Scrapy未正确安装: {str(e)}",
                'suggestions': [
                    "安装Scrapy: pip install scrapy",
                    "检查虚拟环境",
                    "检查Python版本兼容性"
                ]
            }
        except Exception as e:
            return {
                'status': 'error',
                'message': f"检查Scrapy环境时出错: {str(e)}"
            }
    
    def check_dependencies(self):
        """
        检查依赖包
        """
        try:
            # 检查关键依赖
            critical_packages = [
                'scrapy',
                'twisted',
                'lxml',
                'requests',
                'urllib3'
            ]
            
            missing_packages = []
            installed_packages = {}
            
            for package in critical_packages:
                try:
                    module = __import__(package)
                    version = getattr(module, '__version__', 'unknown')
                    installed_packages[package] = version
                except ImportError:
                    missing_packages.append(package)
            
            if missing_packages:
                return {
                    'status': 'error',
                    'message': f"缺少关键依赖包: {', '.join(missing_packages)}",
                    'suggestions': [
                        f"安装缺少的包: pip install {' '.join(missing_packages)}",
                        "检查requirements.txt文件",
                        "使用虚拟环境管理依赖"
                    ],
                    'details': {
                        'missing_packages': missing_packages,
                        'installed_packages': installed_packages
                    }
                }
            else:
                return {
                    'status': 'ok',
                    'message': f"所有关键依赖包已安装 ({len(installed_packages)}个)",
                    'details': {'installed_packages': installed_packages}
                }
        
        except Exception as e:
            return {
                'status': 'error',
                'message': f"检查依赖包时出错: {str(e)}"
            }
    
    def check_configuration(self):
        """
        检查配置文件
        """
        config_files = [
            'scrapy.cfg',
            'settings.py',
            'deploy.ini'
        ]
        
        found_configs = []
        missing_configs = []
        
        for config_file in config_files:
            if os.path.exists(config_file):
                found_configs.append(config_file)
            else:
                missing_configs.append(config_file)
        
        if found_configs:
            return {
                'status': 'ok',
                'message': f"找到配置文件: {', '.join(found_configs)}",
                'details': {
                    'found_configs': found_configs,
                    'missing_configs': missing_configs
                }
            }
        else:
            return {
                'status': 'warning',
                'message': "未找到配置文件",
                'suggestions': [
                    "检查当前目录是否为Scrapy项目根目录",
                    "创建必要的配置文件",
                    "检查文件权限"
                ],
                'details': {'missing_configs': missing_configs}
            }
    
    def check_log_files(self):
        """
        检查日志文件
        """
        log_patterns = [
            '*.log',
            'logs/*.log',
            'scrapy.log'
        ]
        
        import glob
        
        found_logs = []
        for pattern in log_patterns:
            found_logs.extend(glob.glob(pattern))
        
        if found_logs:
            # 检查日志文件大小
            large_logs = []
            for log_file in found_logs:
                size = os.path.getsize(log_file)
                if size > 100 * 1024 * 1024:  # 100MB
                    large_logs.append((log_file, size))
            
            suggestions = []
            if large_logs:
                suggestions.append("考虑清理或轮转大型日志文件")
                suggestions.append("配置日志轮转策略")
            
            return {
                'status': 'ok' if not large_logs else 'warning',
                'message': f"找到日志文件: {len(found_logs)}个",
                'suggestions': suggestions,
                'details': {
                    'log_files': found_logs,
                    'large_logs': large_logs
                }
            }
        else:
            return {
                'status': 'warning',
                'message': "未找到日志文件",
                'suggestions': [
                    "检查日志配置",
                    "确认爬虫已运行过",
                    "检查日志目录权限"
                ]
            }
    
    def check_database_connection(self):
        """
        检查数据库连接
        """
        try:
            # 这里以PostgreSQL为例
            import psycopg2
            
            # 从环境变量或配置文件获取连接信息
            conn_params = {
                'host': os.getenv('DB_HOST', 'localhost'),
                'port': os.getenv('DB_PORT', '5432'),
                'database': os.getenv('DB_NAME', 'scrapy_db'),
                'user': os.getenv('DB_USER', 'scrapy_user'),
                'password': os.getenv('DB_PASSWORD', '')
            }
            
            if not conn_params['password']:
                return {
                    'status': 'warning',
                    'message': "数据库密码未配置",
                    'suggestions': [
                        "设置DB_PASSWORD环境变量",
                        "检查数据库配置文件"
                    ]
                }
            
            # 尝试连接
            conn = psycopg2.connect(**conn_params)
            cursor = conn.cursor()
            cursor.execute('SELECT version();')
            version = cursor.fetchone()[0]
            
            cursor.close()
            conn.close()
            
            return {
                'status': 'ok',
                'message': f"数据库连接正常",
                'details': {
                    'database_version': version,
                    'connection_params': {k: v for k, v in conn_params.items() if k != 'password'}
                }
            }
        
        except ImportError:
            return {
                'status': 'warning',
                'message': "数据库驱动未安装",
                'suggestions': [
                    "安装数据库驱动: pip install psycopg2-binary",
                    "根据使用的数据库安装相应驱动"
                ]
            }
        except Exception as e:
            return {
                'status': 'error',
                'message': f"数据库连接失败: {str(e)}",
                'suggestions': [
                    "检查数据库服务是否运行",
                    "检查连接参数",
                    "检查网络连接",
                    "检查数据库用户权限"
                ]
            }
    
    def check_redis_connection(self):
        """
        检查Redis连接
        """
        try:
            import redis
            
            # 从环境变量获取Redis配置
            redis_host = os.getenv('REDIS_HOST', 'localhost')
            redis_port = int(os.getenv('REDIS_PORT', '6379'))
            redis_db = int(os.getenv('REDIS_DB', '0'))
            redis_password = os.getenv('REDIS_PASSWORD', None)
            
            # 创建Redis连接
            r = redis.Redis(
                host=redis_host,
                port=redis_port,
                db=redis_db,
                password=redis_password,
                socket_timeout=5
            )
            
            # 测试连接
            r.ping()
            
            # 获取Redis信息
            info = r.info()
            
            return {
                'status': 'ok',
                'message': f"Redis连接正常",
                'details': {
                    'redis_version': info.get('redis_version'),
                    'used_memory_human': info.get('used_memory_human'),
                    'connected_clients': info.get('connected_clients')
                }
            }
        
        except ImportError:
            return {
                'status': 'warning',
                'message': "Redis客户端未安装",
                'suggestions': [
                    "安装Redis客户端: pip install redis"
                ]
            }
        except Exception as e:
            return {
                'status': 'error',
                'message': f"Redis连接失败: {str(e)}",
                'suggestions': [
                    "检查Redis服务是否运行",
                    "检查Redis配置",
                    "检查网络连接",
                    "检查Redis密码"
                ]
            }
    
    def generate_diagnostic_report(self):
        """
        生成诊断报告
        """
        print("\n🔍 系统诊断报告")
        print("=" * 60)
        print(f"诊断时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print()
        
        # 统计结果
        total_checks = len(self.diagnostic_results)
        ok_count = sum(1 for result in self.diagnostic_results.values() if result['status'] == 'ok')
        warning_count = sum(1 for result in self.diagnostic_results.values() if result['status'] == 'warning')
        error_count = sum(1 for result in self.diagnostic_results.values() if result['status'] == 'error')
        
        print(f"📊 诊断概览:")
        print(f"   总检查项: {total_checks}")
        print(f"   正常: {ok_count} ✅")
        print(f"   警告: {warning_count} ⚠️")
        print(f"   错误: {error_count} ❌")
        print()
        
        # 详细结果
        for check_name, result in self.diagnostic_results.items():
            status_icon = {
                'ok': '✅',
                'warning': '⚠️',
                'error': '❌'
            }.get(result['status'], '❓')
            
            print(f"{status_icon} {check_name}: {result['message']}")
            
            if 'suggestions' in result and result['suggestions']:
                print("   💡 建议:")
                for suggestion in result['suggestions']:
                    print(f"      - {suggestion}")
            
            print()
        
        print("=" * 60)

# 使用示例
diagnostic_tool = DiagnosticTool()
results = diagnostic_tool.run_full_diagnosis()
diagnostic_tool.generate_diagnostic_report()

print("故障排除与维护演示完成!")

3.2 维护脚本

# 8. 维护脚本
print("\n🛠️ 维护脚本:")

import os
import json
import shutil
import tarfile
import psutil
from datetime import datetime, timedelta

class MaintenanceManager:
    """
    维护管理器
    """
    
    def __init__(self, config=None):
        self.config = config or {
            'log_retention_days': 30,
            'backup_retention_days': 7,
            'max_log_size_mb': 100,
            'backup_directory': 'backups',
            'log_directory': 'logs',
            'data_directory': 'data'
        }
    
    def run_daily_maintenance(self):
        """
        运行日常维护任务
        """
        print("🔄 开始日常维护任务...")
        
        maintenance_tasks = [
            ('清理旧日志', self.cleanup_old_logs),
            ('轮转大日志文件', self.rotate_large_logs),
            ('清理临时文件', self.cleanup_temp_files),
            ('备份重要数据', self.backup_important_data),
            ('清理旧备份', self.cleanup_old_backups),
            ('检查磁盘空间', self.check_disk_space),
            ('优化数据库', self.optimize_database),
            ('更新统计信息', self.update_statistics)
        ]
        
        results = {}
        
        for task_name, task_func in maintenance_tasks:
            print(f"\n📋 执行任务: {task_name}")
            try:
                result = task_func()
                results[task_name] = result
                
                if result.get('success', True):
                    print(f"  ✅ {result.get('message', '任务完成')}")
                else:
                    print(f"  ❌ {result.get('message', '任务失败')}")
                
                if 'details' in result:
                    for detail in result['details']:
                        print(f"     - {detail}")
            
            except Exception as e:
                error_result = {
                    'success': False,
                    'message': f"任务执行出错: {str(e)}"
                }
                results[task_name] = error_result
                print(f"  ❌ {error_result['message']}")
        
        # 生成维护报告
        self.generate_maintenance_report(results)
        
        return results
    
    def cleanup_old_logs(self):
        """
        清理旧日志文件
        """
        try:
            log_dir = self.config['log_directory']
            retention_days = self.config['log_retention_days']
            cutoff_date = datetime.now() - timedelta(days=retention_days)
            
            if not os.path.exists(log_dir):
                return {
                    'success': True,
                    'message': f"日志目录不存在: {log_dir}"
                }
            
            deleted_files = []
            total_size_freed = 0
            
            for root, dirs, files in os.walk(log_dir):
                for file in files:
                    if file.endswith('.log'):
                        file_path = os.path.join(root, file)
                        
                        # 检查文件修改时间
                        file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
                        
                        if file_mtime < cutoff_date:
                            file_size = os.path.getsize(file_path)
                            os.remove(file_path)
                            
                            deleted_files.append(file)
                            total_size_freed += file_size
            
            return {
                'success': True,
                'message': f"清理了 {len(deleted_files)} 个旧日志文件,释放 {total_size_freed / 1024 / 1024:.2f} MB",
                'details': [f"删除文件: {file}" for file in deleted_files[:10]]  # 只显示前10个
            }
        
        except Exception as e:
            return {
                'success': False,
                'message': f"清理旧日志失败: {str(e)}"
            }
    
    def rotate_large_logs(self):
        """
        轮转大日志文件
        """
        try:
            log_dir = self.config['log_directory']
            max_size_bytes = self.config['max_log_size_mb'] * 1024 * 1024
            
            if not os.path.exists(log_dir):
                return {
                    'success': True,
                    'message': f"日志目录不存在: {log_dir}"
                }
            
            rotated_files = []
            
            for root, dirs, files in os.walk(log_dir):
                for file in files:
                    if file.endswith('.log'):
                        file_path = os.path.join(root, file)
                        
                        if os.path.getsize(file_path) > max_size_bytes:
                            # 创建轮转文件名
                            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                            rotated_name = f"{file}.{timestamp}"
                            rotated_path = os.path.join(root, rotated_name)
                            
                            # 移动文件
                            shutil.move(file_path, rotated_path)
                            
                            # 创建新的空日志文件
                            open(file_path, 'w').close()
                            
                            rotated_files.append(f"{file} -> {rotated_name}")
            
            return {
                'success': True,
                'message': f"轮转了 {len(rotated_files)} 个大日志文件",
                'details': rotated_files
            }
        
        except Exception as e:
            return {
                'success': False,
                'message': f"轮转日志文件失败: {str(e)}"
            }
    
    def cleanup_temp_files(self):
        """
        清理临时文件
        """
        try:
            temp_patterns = [
                '*.tmp',
                '*.temp',
                '__pycache__',
                '*.pyc',
                '.scrapy'
            ]
            
            deleted_items = []
            total_size_freed = 0
            
            for pattern in temp_patterns:
                if pattern == '__pycache__':
                    # 清理Python缓存目录
                    for root, dirs, files in os.walk('.'):
                        if '__pycache__' in dirs:
                            cache_dir = os.path.join(root, '__pycache__')
                            dir_size = sum(
                                os.path.getsize(os.path.join(dirpath, filename))
                                for dirpath, dirnames, filenames in os.walk(cache_dir)
                                for filename in filenames
                            )
                            shutil.rmtree(cache_dir)
                            deleted_items.append(f"目录: {cache_dir}")
                            total_size_freed += dir_size
                
                elif pattern == '.scrapy':
                    # 清理Scrapy临时目录
                    if os.path.exists('.scrapy'):
                        dir_size = sum(
                            os.path.getsize(os.path.join(dirpath, filename))
                            for dirpath, dirnames, filenames in os.walk('.scrapy')
                            for filename in filenames
                        )
                        shutil.rmtree('.scrapy')
                        deleted_items.append("目录: .scrapy")
                        total_size_freed += dir_size
                
                else:
                    # 清理匹配模式的文件
                    import glob
                    for file_path in glob.glob(pattern):
                        if os.path.isfile(file_path):
                            file_size = os.path.getsize(file_path)
                            os.remove(file_path)
                            deleted_items.append(f"文件: {file_path}")
                            total_size_freed += file_size
            
            return {
                'success': True,
                'message': f"清理了 {len(deleted_items)} 个临时文件/目录,释放 {total_size_freed / 1024 / 1024:.2f} MB",
                'details': deleted_items[:10]  # 只显示前10个
            }
        
        except Exception as e:
            return {
                'success': False,
                'message': f"清理临时文件失败: {str(e)}"
            }
    
    def backup_important_data(self):
        """
        备份重要数据
        """
        try:
            backup_dir = self.config['backup_directory']
            data_dir = self.config['data_directory']
            
            # 创建备份目录
            os.makedirs(backup_dir, exist_ok=True)
            
            # 创建备份文件名
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            backup_filename = f"scrapy_backup_{timestamp}.tar.gz"
            backup_path = os.path.join(backup_dir, backup_filename)
            
            # 要备份的文件和目录
            backup_items = [
                'scrapy.cfg',
                'settings.py',
                'deploy.ini'
            ]
            
            # 添加数据目录(如果存在)
            if os.path.exists(data_dir):
                backup_items.append(data_dir)
            
            # 添加爬虫代码目录
            for item in os.listdir('.'):
                if os.path.isdir(item) and item not in ['logs', 'backups', '__pycache__', '.scrapy']:
                    if any(f.endswith('.py') for f in os.listdir(item) if os.path.isfile(os.path.join(item, f))):
                        backup_items.append(item)
            
            # 创建tar.gz备份
            with tarfile.open(backup_path, 'w:gz') as tar:
                for item in backup_items:
                    if os.path.exists(item):
                        tar.add(item, arcname=item)
            
            backup_size = os.path.getsize(backup_path)
            
            return {
                'success': True,
                'message': f"创建备份: {backup_filename} ({backup_size / 1024 / 1024:.2f} MB)",
                'details': [f"备份项目: {item}" for item in backup_items if os.path.exists(item)]
            }
        
        except Exception as e:
            return {
                'success': False,
                'message': f"备份数据失败: {str(e)}"
            }
    
    def cleanup_old_backups(self):
        """
        清理旧备份文件
        """
        try:
            backup_dir = self.config['backup_directory']
            retention_days = self.config['backup_retention_days']
            cutoff_date = datetime.now() - timedelta(days=retention_days)
            
            if not os.path.exists(backup_dir):
                return {
                    'success': True,
                    'message': f"备份目录不存在: {backup_dir}"
                }
            
            deleted_backups = []
            total_size_freed = 0
            
            for file in os.listdir(backup_dir):
                if file.startswith('scrapy_backup_') and (file.endswith('.tar.gz') or file.endswith('.zip')):
                    file_path = os.path.join(backup_dir, file)
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
                    
                    if file_mtime < cutoff_date:
                        file_size = os.path.getsize(file_path)
                        os.remove(file_path)
                        
                        deleted_backups.append(file)
                        total_size_freed += file_size
            
            return {
                'success': True,
                'message': f"清理了 {len(deleted_backups)} 个旧备份文件,释放 {total_size_freed / 1024 / 1024:.2f} MB",
                'details': [f"删除备份: {backup}" for backup in deleted_backups]
            }
        
        except Exception as e:
            return {
                'success': False,
                'message': f"清理旧备份失败: {str(e)}"
            }
    
    def check_disk_space(self):
        """
        检查磁盘空间
        """
        try:
            disk_usage = psutil.disk_usage('.')
            
            total_gb = disk_usage.total / (1024**3)
            used_gb = disk_usage.used / (1024**3)
            free_gb = disk_usage.free / (1024**3)
            used_percent = (disk_usage.used / disk_usage.total) * 100
            
            status = "正常"
            if used_percent > 95:
                status = "严重不足"
            elif used_percent > 90:
                status = "不足"
            elif used_percent > 80:
                status = "偏高"
            
            return {
                'success': True,
                'message': f"磁盘空间状态: {status} (使用率: {used_percent:.1f}%)",
                'details': [
                    f"总空间: {total_gb:.2f} GB",
                    f"已使用: {used_gb:.2f} GB",
                    f"可用空间: {free_gb:.2f} GB"
                ]
            }
        
        except Exception as e:
            return {
                'success': False,
                'message': f"检查磁盘空间失败: {str(e)}"
            }
    
    def optimize_database(self):
        """
        优化数据库
        """
        try:
            # 这里以SQLite为例
            import sqlite3
            
            db_files = []
            for root, dirs, files in os.walk('.'):
                for file in files:
                    if file.endswith('.db') or file.endswith('.sqlite'):
                        db_files.append(os.path.join(root, file))
            
            if not db_files:
                return {
                    'success': True,
                    'message': "未找到数据库文件"
                }
            
            optimized_dbs = []
            
            for db_file in db_files:
                try:
                    # 获取优化前的大小
                    original_size = os.path.getsize(db_file)
                    
                    # 连接数据库并执行VACUUM
                    conn = sqlite3.connect(db_file)
                    conn.execute('VACUUM;')
                    conn.close()
                    
                    # 获取优化后的大小
                    new_size = os.path.getsize(db_file)
                    size_saved = original_size - new_size
                    
                    optimized_dbs.append(f"{db_file}: 节省 {size_saved / 1024:.2f} KB")
                
                except Exception as e:
                    optimized_dbs.append(f"{db_file}: 优化失败 - {str(e)}")
            
            return {
                'success': True,
                'message': f"优化了 {len(db_files)} 个数据库文件",
                'details': optimized_dbs
            }
        
        except Exception as e:
            return {
                'success': False,
                'message': f"优化数据库失败: {str(e)}"
            }
    
    def update_statistics(self):
        """
        更新统计信息
        """
        try:
            stats = {}
            
            # 统计项目文件
            python_files = 0
            total_lines = 0
            
            for root, dirs, files in os.walk('.'):
                # 跳过特定目录
                dirs[:] = [d for d in dirs if d not in ['__pycache__', '.git', 'logs', 'backups']]
                
                for file in files:
                    if file.endswith('.py'):
                        python_files += 1
                        file_path = os.path.join(root, file)
                        
                        try:
                            with open(file_path, 'r', encoding='utf-8') as f:
                                total_lines += len(f.readlines())
                        except (OSError, UnicodeDecodeError):
                            # 跳过无法读取或非UTF-8编码的文件
                            pass
            
            stats['python_files'] = python_files
            stats['total_lines'] = total_lines
            
            # 统计日志文件
            log_dir = self.config['log_directory']
            if os.path.exists(log_dir):
                log_files = len([f for f in os.listdir(log_dir) if f.endswith('.log')])
                log_size = sum(
                    os.path.getsize(os.path.join(log_dir, f))
                    for f in os.listdir(log_dir)
                    if f.endswith('.log')
                )
                stats['log_files'] = log_files
                stats['log_size_mb'] = log_size / 1024 / 1024
            
            # 统计备份文件
            backup_dir = self.config['backup_directory']
            if os.path.exists(backup_dir):
                backup_files = len([f for f in os.listdir(backup_dir) 
                                  if f.startswith('scrapy_backup_')])
                stats['backup_files'] = backup_files
            
            # 保存统计信息
            stats_file = 'project_stats.json'
            stats['last_updated'] = datetime.now().isoformat()
            
            with open(stats_file, 'w', encoding='utf-8') as f:
                json.dump(stats, f, indent=2, ensure_ascii=False)
            
            return {
                'success': True,
                'message': f"更新项目统计信息",
                'details': [
                    f"Python文件: {stats.get('python_files', 0)} 个",
                    f"代码行数: {stats.get('total_lines', 0)} 行",
                    f"日志文件: {stats.get('log_files', 0)} 个",
                    f"备份文件: {stats.get('backup_files', 0)} 个"
                ]
            }
        
        except Exception as e:
            return {
                'success': False,
                'message': f"更新统计信息失败: {str(e)}"
            }
    
    def generate_maintenance_report(self, results):
        """
        生成维护报告
        """
        print("\n📊 维护任务报告")
        print("=" * 60)
        print(f"维护时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print()
        
        # 统计结果
        total_tasks = len(results)
        successful_tasks = sum(1 for result in results.values() if result.get('success', True))
        failed_tasks = total_tasks - successful_tasks
        
        print(f"📈 任务概览:")
        print(f"   总任务数: {total_tasks}")
        print(f"   成功: {successful_tasks} ✅")
        print(f"   失败: {failed_tasks} ❌")
        print()
        
        # 详细结果
        for task_name, result in results.items():
            status_icon = "✅" if result.get('success', True) else "❌"
            print(f"{status_icon} {task_name}: {result.get('message', '无消息')}")
            
            if 'details' in result and result['details']:
                for detail in result['details'][:3]:  # 只显示前3个详情
                    print(f"     - {detail}")
                
                if len(result['details']) > 3:
                    print(f"     ... 还有 {len(result['details']) - 3} 项")
            
            print()
        
        print("=" * 60)

# 使用示例
maintenance_manager = MaintenanceManager()

# 运行日常维护
results = maintenance_manager.run_daily_maintenance()

print("维护脚本演示完成!")

4. 本章小结

在本章中,我们详细介绍了Scrapy项目的部署与运维,包括:

4.1 主要内容回顾

  1. 生产环境部署

    • 部署环境准备和配置
    • Scrapyd部署服务器的使用(本节末尾附有远程调度示例)
    • Docker容器化部署
    • 项目打包和版本管理
  2. 监控系统

    • 性能监控和指标收集
    • 日志监控和分析
    • 告警系统和通知机制
    • 实时监控面板
  3. 故障排除与维护

    • 系统诊断工具
    • 常见问题排查
    • 维护脚本和自动化
    • 数据备份和恢复
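
作为对Scrapyd部分的补充,下面用一小段代码演示如何通过Scrapyd的HTTP API远程调度和查询爬虫(服务地址、项目名与爬虫名均为示例假设):

import requests

SCRAPYD = 'http://localhost:6800'

# 调度一次爬虫运行
resp = requests.post(f'{SCRAPYD}/schedule.json',
                     data={'project': 'myspider', 'spider': 'example'})
print(resp.json())  # 成功时返回 {'status': 'ok', 'jobid': '...'}

# 查询该项目的任务状态
resp = requests.get(f'{SCRAPYD}/listjobs.json', params={'project': 'myspider'})
print(resp.json())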

4.2 最佳实践

  1. 部署最佳实践

    • 使用版本控制管理代码
    • 环境隔离和配置管理
    • 自动化部署流程
    • 滚动更新和回滚策略
  2. 监控最佳实践

    • 设置合理的监控指标
    • 配置多层次告警机制
    • 定期检查监控系统
    • 保持监控数据的时效性
  3. 维护最佳实践

    • 定期执行维护任务
    • 保持系统资源充足
    • 及时清理无用数据
    • 建立完善的备份策略

4.3 常见陷阱

  1. 部署陷阱

    • 忽略环境差异导致的问题
    • 配置文件管理不当
    • 依赖包版本冲突
    • 权限配置错误
  2. 监控陷阱

    • 监控指标设置不合理
    • 告警频率过高或过低
    • 忽略系统资源监控
    • 日志文件过大影响性能(见本节末尾的日志轮转示例)
  3. 维护陷阱

    • 维护任务执行不及时
    • 备份策略不完善
    • 忽略磁盘空间管理
    • 数据库优化不当
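
针对上面提到的"日志文件过大影响性能"问题,除了维护脚本中的事后轮转,也可以在日志写入阶段就启用轮转。下面是基于标准库TimedRotatingFileHandler的一个示意(文件路径与日志格式为示例假设):

# 在爬虫启动脚本中配置按天轮转的日志(示意)
import logging
from logging.handlers import TimedRotatingFileHandler
from scrapy.utils.log import configure_logging

configure_logging(install_root_handler=False)  # 不安装Scrapy默认的根日志处理器

handler = TimedRotatingFileHandler(
    'logs/scrapy.log',   # 日志文件路径(示例)
    when='midnight',     # 每天零点轮转一次
    backupCount=30,      # 最多保留30份历史日志
    encoding='utf-8'
)
handler.setFormatter(logging.Formatter(
    '%(asctime)s [%(name)s] %(levelname)s: %(message)s'
))

root_logger = logging.getLogger()
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)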

4.4 下一步学习建议

  1. 深入学习容器技术

    • Docker进阶使用
    • Kubernetes集群管理
    • 微服务架构设计
  2. 监控系统进阶

    • Prometheus + Grafana
    • ELK日志分析栈
    • APM应用性能监控
  3. 自动化运维

    • CI/CD流水线
    • Infrastructure as Code
    • 自动化测试和部署
  4. 云平台部署

    • AWS/Azure/GCP部署
    • 云原生架构
    • Serverless爬虫

4.5 练习题

  1. 基础练习

    • 使用Docker部署一个Scrapy项目
    • 配置Scrapyd服务器并部署爬虫
    • 实现基本的性能监控
  2. 进阶练习

    • 设计完整的监控告警系统
    • 实现自动化维护脚本
    • 配置日志轮转和备份策略
  3. 高级练习

    • 实现分布式监控系统
    • 设计故障自动恢复机制
    • 构建完整的运维平台

🎉 恭喜!你已经完成了Scrapy教程的第8章!

通过本章的学习,你已经掌握了Scrapy项目的部署与运维技能,能够将爬虫项目成功部署到生产环境,并建立完善的监控和维护体系。这些技能对于构建稳定、可靠的爬虫系统至关重要。

在下一章中,我们将学习Scrapy的扩展开发,包括自定义扩展、插件开发等高级主题。