本章概述

本章将通过多个完整的实战项目,展示如何将前面学到的Scrapy知识综合运用到实际场景中。我们将从简单的单页面爬虫开始,逐步深入到复杂的分布式爬虫系统,涵盖电商数据采集、新闻聚合、社交媒体监控等多个领域。

学习目标

  • 掌握完整的Scrapy项目开发流程
  • 学会处理各种复杂的爬取场景
  • 了解项目架构设计和最佳实践
  • 掌握性能优化和问题排查技巧
  • 学会项目部署和运维管理

1. 项目开发流程与规范

1.1 项目规划与设计

# 1. 项目开发流程演示
print("🚀 Scrapy实战项目开发流程:")

class ProjectPlanningDemo:
    """
    项目规划演示
    """
    
    def demonstrate_project_analysis(self):
        """
        演示项目需求分析
        """
        print("\n📋 项目需求分析:")
        
        analysis_template = '''
# 项目需求分析模板

## 1. 项目背景
- 业务需求:明确爬取数据的用途和价值
- 目标网站:分析目标网站的结构和特点
- 数据规模:估算数据量和更新频率
- 时间要求:确定项目开发和交付时间

## 2. 技术分析
- 网站技术栈:静态页面 vs 动态页面
- 反爬虫机制:验证码、IP限制、用户代理检测等
- 数据格式:HTML、JSON、XML等
- 存储需求:数据库类型和存储方案

## 3. 风险评估
- 法律风险:robots.txt、服务条款
- 技术风险:反爬虫升级、网站改版
- 性能风险:服务器负载、网络稳定性
- 数据风险:数据质量、数据完整性

## 4. 资源规划
- 开发人员:技能要求和人员配置
- 硬件资源:服务器、存储、网络
- 软件资源:开发工具、第三方服务
- 时间资源:开发周期和里程碑
'''
        
        print("📄 需求分析模板:")
        print(analysis_template)
    
    def demonstrate_architecture_design(self):
        """
        演示架构设计
        """
        print("\n🏗️ 系统架构设计:")
        
# 以下为可直接执行的架构设计相关定义,供本节末尾的模块级演示直接使用
import os
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum

class ProjectType(Enum):
    """项目类型"""
    SIMPLE_CRAWLER = "simple_crawler"
    DISTRIBUTED_CRAWLER = "distributed_crawler"
    REAL_TIME_CRAWLER = "real_time_crawler"
    BATCH_CRAWLER = "batch_crawler"

class DataSource(Enum):
    """数据源类型"""
    WEB_PAGE = "web_page"
    API = "api"
    DATABASE = "database"
    FILE = "file"

@dataclass
class ProjectConfig:
    """项目配置"""
    name: str
    type: ProjectType
    description: str
    target_urls: List[str]
    data_sources: List[DataSource]
    output_formats: List[str]
    storage_backends: List[str]
    
    # 性能配置
    concurrent_requests: int = 16
    download_delay: float = 1.0
    randomize_download_delay: bool = True
    
    # 反爬虫配置
    use_proxy: bool = False
    rotate_user_agent: bool = True
    respect_robots_txt: bool = True
    
    # 监控配置
    enable_monitoring: bool = True
    enable_alerting: bool = True
    log_level: str = "INFO"

class ProjectArchitect:
    """项目架构师"""
    
    def __init__(self):
        self.components = {}
        self.dependencies = {}
    
    def design_architecture(self, config: ProjectConfig) -> Dict:
        """
        设计项目架构
        """
        architecture = {
            'project_info': {
                'name': config.name,
                'type': config.type.value,
                'description': config.description
            },
            'components': self._design_components(config),
            'data_flow': self._design_data_flow(config),
            'deployment': self._design_deployment(config),
            'monitoring': self._design_monitoring(config)
        }
        
        return architecture
    
    def _design_components(self, config: ProjectConfig) -> Dict:
        """
        设计系统组件
        """
        components = {
            'spiders': {
                'description': '爬虫组件,负责数据采集',
                'files': ['spiders/__init__.py', 'spiders/main_spider.py'],
                'dependencies': ['scrapy', 'requests']
            },
            'items': {
                'description': '数据模型,定义数据结构',
                'files': ['items.py'],
                'dependencies': ['scrapy']
            },
            'pipelines': {
                'description': '数据处理管道',
                'files': ['pipelines.py'],
                'dependencies': ['scrapy', 'pymongo', 'redis']
            },
            'middlewares': {
                'description': '中间件,处理请求和响应',
                'files': ['middlewares.py'],
                'dependencies': ['scrapy', 'fake-useragent']
            },
            'extensions': {
                'description': '扩展组件,增强功能',
                'files': ['extensions.py'],
                'dependencies': ['scrapy']
            },
            'utils': {
                'description': '工具函数',
                'files': ['utils/__init__.py', 'utils/helpers.py'],
                'dependencies': []
            }
        }
        
        # 根据项目类型添加特定组件
        if config.type == ProjectType.DISTRIBUTED_CRAWLER:
            components['scheduler'] = {
                'description': '分布式调度器',
                'files': ['scheduler.py'],
                'dependencies': ['scrapy-redis', 'redis']
            }
        
        if config.enable_monitoring:
            components['monitoring'] = {
                'description': '监控组件',
                'files': ['monitoring.py'],
                'dependencies': ['prometheus-client', 'grafana-api']
            }
        
        return components
    
    def _design_data_flow(self, config: ProjectConfig) -> Dict:
        """
        设计数据流
        """
        data_flow = {
            'input': {
                'sources': [source.value for source in config.data_sources],
                'formats': ['html', 'json', 'xml']
            },
            'processing': {
                'extraction': '使用CSS选择器和XPath提取数据',
                'cleaning': '数据清洗和标准化',
                'validation': '数据验证和质量检查',
                'transformation': '数据转换和格式化'
            },
            'output': {
                'formats': config.output_formats,
                'storage': config.storage_backends,
                'apis': ['REST API', 'GraphQL']
            }
        }
        
        return data_flow
    
    def _design_deployment(self, config: ProjectConfig) -> Dict:
        """
        设计部署方案
        """
        deployment = {
            'development': {
                'environment': 'local',
                'tools': ['Docker', 'docker-compose'],
                'services': ['scrapy', 'redis', 'mongodb']
            },
            'testing': {
                'environment': 'staging',
                'tools': ['pytest', 'coverage'],
                'ci_cd': ['GitHub Actions', 'Jenkins']
            },
            'production': {
                'environment': 'cloud',
                'platform': ['AWS', 'GCP', 'Azure'],
                'orchestration': ['Kubernetes', 'Docker Swarm'],
                'monitoring': ['Prometheus', 'Grafana', 'ELK Stack']
            }
        }
        
        return deployment
    
    def _design_monitoring(self, config: ProjectConfig) -> Dict:
        """
        设计监控方案
        """
        monitoring = {
            'metrics': {
                'system': ['CPU', 'Memory', 'Disk', 'Network'],
                'application': ['Requests/sec', 'Response time', 'Error rate'],
                'business': ['Items scraped', 'Data quality', 'Coverage']
            },
            'logging': {
                'levels': ['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                'destinations': ['File', 'Database', 'Cloud'],
                'format': 'JSON structured logging'
            },
            'alerting': {
                'channels': ['Email', 'Slack', 'PagerDuty'],
                'rules': ['Error rate > 5%', 'Response time > 10s'],
                'escalation': 'Auto-escalation after 15 minutes'
            }
        }
        
        return monitoring
    
    def generate_project_structure(self, config: ProjectConfig) -> str:
        """
        生成项目结构
        """
        structure = f'''
{config.name}/
├── scrapy.cfg                 # Scrapy配置文件
├── requirements.txt           # Python依赖
├── Dockerfile                # Docker配置
├── docker-compose.yml        # Docker Compose配置
├── README.md                 # 项目文档
├── .gitignore               # Git忽略文件
├── .env                     # 环境变量
├── {config.name}/
│   ├── __init__.py
│   ├── items.py             # 数据模型
│   ├── middlewares.py       # 中间件
│   ├── pipelines.py         # 数据管道
│   ├── settings.py          # 项目设置
│   ├── extensions.py        # 扩展组件
│   ├── spiders/
│   │   ├── __init__.py
│   │   └── main_spider.py   # 主爬虫
│   └── utils/
│       ├── __init__.py
│       ├── helpers.py       # 工具函数
│       └── validators.py    # 数据验证
├── tests/                   # 测试文件
│   ├── __init__.py
│   ├── test_spiders.py
│   ├── test_pipelines.py
│   └── test_utils.py
├── docs/                    # 文档
│   ├── api.md
│   ├── deployment.md
│   └── troubleshooting.md
├── scripts/                 # 脚本文件
│   ├── deploy.sh
│   ├── backup.sh
│   └── monitor.py
└── data/                    # 数据文件
    ├── raw/
    ├── processed/
    └── exports/
'''
        
        return structure

# 演示项目规划
project_demo = ProjectPlanningDemo()
project_demo.demonstrate_project_analysis()
project_demo.demonstrate_architecture_design()

# 创建示例项目配置
sample_config = ProjectConfig(
    name="ecommerce_crawler",
    type=ProjectType.DISTRIBUTED_CRAWLER,
    description="电商网站数据采集系统",
    target_urls=["https://example-shop.com"],
    data_sources=[DataSource.WEB_PAGE, DataSource.API],
    output_formats=["json", "csv"],
    storage_backends=["mongodb", "elasticsearch"],
    concurrent_requests=32,
    download_delay=0.5,
    use_proxy=True,
    enable_monitoring=True
)

# 生成项目架构
architect = ProjectArchitect()
architecture = architect.design_architecture(sample_config)

print("\n🏗️ 生成的项目架构:")
for key, value in architecture.items():
    print(f"📁 {key}: {value}")

print("\n📂 项目结构:")
print(architect.generate_project_structure(sample_config))

print("项目规划演示完成!")

1.2 开发环境搭建

# 2. 开发环境搭建
print("\n🛠️ 开发环境搭建:")

class DevelopmentEnvironment:
    """
    开发环境管理
    """
    
    def __init__(self):
        self.tools = {}
        self.dependencies = {}
    
    def setup_local_environment(self):
        """
        搭建本地开发环境
        """
        print("\n💻 本地开发环境搭建:")
        
        setup_script = '''
#!/bin/bash
# 本地开发环境搭建脚本

echo "🚀 开始搭建Scrapy开发环境..."

# 1. 检查Python版本
echo "📋 检查Python版本..."
python_version=$(python3 --version 2>&1)
echo "Python版本: $python_version"

if ! command -v python3 &> /dev/null; then
    echo "❌ Python3未安装,请先安装Python3.8+"
    exit 1
fi

# 2. 创建虚拟环境
echo "🔧 创建虚拟环境..."
python3 -m venv scrapy_env
source scrapy_env/bin/activate

# 3. 升级pip
echo "⬆️ 升级pip..."
pip install --upgrade pip

# 4. 安装核心依赖
echo "📦 安装核心依赖..."
pip install scrapy
pip install scrapy-redis
pip install scrapy-splash
pip install scrapy-playwright

# 5. 安装数据库驱动
echo "🗄️ 安装数据库驱动..."
pip install pymongo
pip install redis
pip install elasticsearch
pip install psycopg2-binary

# 6. 安装开发工具
echo "🛠️ 安装开发工具..."
pip install pytest
pip install pytest-cov
pip install black
pip install flake8
pip install mypy

# 7. 安装监控工具
echo "📊 安装监控工具..."
pip install prometheus-client
pip install grafana-api
pip install sentry-sdk

# 8. 安装其他工具
echo "🔧 安装其他工具..."
pip install fake-useragent
pip install requests
pip install beautifulsoup4
pip install lxml
pip install pillow

# 9. 生成requirements.txt
echo "📝 生成requirements.txt..."
pip freeze > requirements.txt

echo "✅ 开发环境搭建完成!"
echo "🎯 激活环境: source scrapy_env/bin/activate"
'''
        
        print("📄 环境搭建脚本:")
        print(setup_script)
    
    def setup_docker_environment(self):
        """
        搭建Docker开发环境
        """
        print("\n🐳 Docker开发环境:")
        
        dockerfile = '''
# Dockerfile
FROM python:3.9-slim

# 设置工作目录
WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \\
    gcc \\
    g++ \\
    libxml2-dev \\
    libxslt-dev \\
    libffi-dev \\
    libssl-dev \\
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .

# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制项目文件
COPY . .

# 设置环境变量
ENV PYTHONPATH=/app
ENV SCRAPY_SETTINGS_MODULE=myproject.settings

# 暴露端口
EXPOSE 6800

# 启动命令
CMD ["scrapyd"]
'''
        
        docker_compose = '''
# docker-compose.yml
version: '3.8'

services:
  scrapy:
    build: .
    ports:
      - "6800:6800"
    volumes:
      - .:/app
      - ./data:/app/data
    environment:
      - REDIS_URL=redis://redis:6379
      - MONGO_URL=mongodb://mongo:27017
    depends_on:
      - redis
      - mongo
    networks:
      - scrapy-network

  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    networks:
      - scrapy-network

  mongo:
    image: mongo:4.4
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db
    environment:
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=password
    networks:
      - scrapy-network

  elasticsearch:
    image: elasticsearch:7.14.0
    ports:
      - "9200:9200"
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    networks:
      - scrapy-network

  kibana:
    image: kibana:7.14.0
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch
    networks:
      - scrapy-network

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - scrapy-network

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - scrapy-network

volumes:
  redis_data:
  mongo_data:
  es_data:
  prometheus_data:
  grafana_data:

networks:
  scrapy-network:
    driver: bridge
'''
        
        print("📄 Dockerfile:")
        print(dockerfile)
        print("\n📄 docker-compose.yml:")
        print(docker_compose)
    
    def setup_development_tools(self):
        """
        配置开发工具
        """
        print("\n🔧 开发工具配置:")
        
        # pytest配置
        pytest_ini = '''
# pytest.ini
[tool:pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts = 
    --verbose
    --tb=short
    --cov=myproject
    --cov-report=html
    --cov-report=term-missing
markers =
    unit: Unit tests
    integration: Integration tests
    e2e: End-to-end tests
    slow: Slow running tests
'''
        
        # Black配置
        pyproject_toml = """
# pyproject.toml
[tool.black]
line-length = 88
target-version = ['py38']
include = '\\.pyi?$'
extend-exclude = '''
/(
  # directories
  \\.eggs
  | \\.git
  | \\.hg
  | \\.mypy_cache
  | \\.tox
  | \\.venv
  | build
  | dist
)/
'''

[tool.isort]
profile = "black"
multi_line_output = 3
line_length = 88

[tool.mypy]
python_version = "3.8"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
"""
        
        # flake8配置
        flake8_cfg = '''
# .flake8
[flake8]
max-line-length = 88
extend-ignore = E203, W503
exclude = 
    .git,
    __pycache__,
    .venv,
    build,
    dist,
    *.egg-info
'''
        
        # pre-commit配置
        pre_commit_yaml = '''
# .pre-commit-config.yaml
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files

  - repo: https://github.com/psf/black
    rev: 22.10.0
    hooks:
      - id: black

  - repo: https://github.com/pycqa/isort
    rev: 5.10.1
    hooks:
      - id: isort

  - repo: https://github.com/pycqa/flake8
    rev: 5.0.4
    hooks:
      - id: flake8

  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v0.991
    hooks:
      - id: mypy
        additional_dependencies: [types-requests]
'''
        
        print("📄 pytest.ini:")
        print(pytest_ini)
        print("\n📄 pyproject.toml:")
        print(pyproject_toml)
        print("\n📄 .flake8:")
        print(flake8_cfg)
        print("\n📄 .pre-commit-config.yaml:")
        print(pre_commit_yaml)

# 演示开发环境搭建
dev_env = DevelopmentEnvironment()
dev_env.setup_local_environment()
dev_env.setup_docker_environment()
dev_env.setup_development_tools()

print("开发环境搭建演示完成!")

2. 电商数据采集项目

2.1 项目需求分析

# 3. 电商数据采集项目
print("\n🛒 电商数据采集项目:")

class EcommerceProject:
    """
    电商数据采集项目
    """
    
    def __init__(self):
        self.project_name = "ecommerce_crawler"
        self.target_sites = ["amazon", "ebay", "shopify"]
        self.data_types = ["products", "reviews", "prices", "sellers"]
    
    def analyze_requirements(self):
        """
        需求分析
        """
        print("\n📋 项目需求分析:")
        
        requirements = {
            '业务需求': [
                '采集主流电商平台的商品信息',
                '监控商品价格变化趋势',
                '分析用户评论和评分',
                '跟踪卖家信息和信誉',
                '生成市场分析报告'
            ],
            '技术需求': [
                '支持多个电商平台',
                '处理JavaScript渲染页面',
                '应对反爬虫机制',
                '实现分布式爬取',
                '数据质量保证'
            ],
            '性能需求': [
                '每日处理100万+商品',
                '响应时间<5秒',
                '数据准确率>95%',
                '系统可用性>99%',
                '支持水平扩展'
            ],
            '合规需求': [
                '遵守robots.txt',
                '控制访问频率',
                '保护用户隐私',
                '数据使用合规',
                '避免服务器过载'
            ]
        }
        
        for category, items in requirements.items():
            print(f"\n📖 {category}:")
            for item in items:
                print(f"   • {item}")
    
    def design_data_model(self):
        """
        设计数据模型
        """
        print("\n🗄️ 数据模型设计:")
        
        data_model_code = '''
import scrapy
from scrapy import Item, Field
from datetime import datetime
from typing import Optional, List, Dict

class ProductItem(scrapy.Item):
    """
    商品数据模型
    """
    # 基本信息
    product_id = Field()          # 商品ID
    title = Field()               # 商品标题
    brand = Field()               # 品牌
    category = Field()            # 分类
    subcategory = Field()         # 子分类
    
    # 价格信息
    current_price = Field()       # 当前价格
    original_price = Field()      # 原价
    discount = Field()            # 折扣
    currency = Field()            # 货币
    
    # 详细信息
    description = Field()         # 商品描述
    specifications = Field()      # 规格参数
    images = Field()              # 商品图片
    videos = Field()              # 商品视频
    
    # 销售信息
    seller_id = Field()           # 卖家ID
    seller_name = Field()         # 卖家名称
    stock_quantity = Field()      # 库存数量
    sales_count = Field()         # 销量
    
    # 评价信息
    rating = Field()              # 评分
    review_count = Field()        # 评论数
    rating_distribution = Field() # 评分分布
    
    # 元数据
    source_url = Field()          # 来源URL
    source_site = Field()         # 来源网站
    crawl_time = Field()          # 爬取时间
    update_time = Field()         # 更新时间

class ReviewItem(scrapy.Item):
    """
    评论数据模型
    """
    # 基本信息
    review_id = Field()           # 评论ID
    product_id = Field()          # 商品ID
    user_id = Field()             # 用户ID
    user_name = Field()           # 用户名
    
    # 评论内容
    rating = Field()              # 评分
    title = Field()               # 评论标题
    content = Field()             # 评论内容
    pros = Field()                # 优点
    cons = Field()                # 缺点
    
    # 评论属性
    verified_purchase = Field()   # 是否验证购买
    helpful_count = Field()       # 有用数
    total_votes = Field()         # 总投票数
    
    # 时间信息
    review_date = Field()         # 评论日期
    crawl_time = Field()          # 爬取时间

class SellerItem(scrapy.Item):
    """
    卖家数据模型
    """
    # 基本信息
    seller_id = Field()           # 卖家ID
    seller_name = Field()         # 卖家名称
    company_name = Field()        # 公司名称
    
    # 联系信息
    email = Field()               # 邮箱
    phone = Field()               # 电话
    address = Field()             # 地址
    website = Field()             # 网站
    
    # 业务信息
    business_type = Field()       # 业务类型
    registration_date = Field()   # 注册日期
    product_count = Field()       # 商品数量
    category_focus = Field()      # 主营类目
    
    # 信誉信息
    rating = Field()              # 评分
    review_count = Field()        # 评论数
    positive_feedback = Field()   # 好评率
    response_rate = Field()       # 响应率
    response_time = Field()       # 响应时间
    
    # 元数据
    source_url = Field()          # 来源URL
    source_site = Field()         # 来源网站
    crawl_time = Field()          # 爬取时间

class PriceHistoryItem(scrapy.Item):
    """
    价格历史数据模型
    """
    product_id = Field()          # 商品ID
    price = Field()               # 价格
    currency = Field()            # 货币
    date = Field()                # 日期
    source_site = Field()         # 来源网站
    crawl_time = Field()          # 爬取时间

# 数据验证器
class DataValidator:
    """
    数据验证器
    """
    
    @staticmethod
    def validate_product(item: ProductItem) -> bool:
        """
        验证商品数据
        """
        required_fields = ['product_id', 'title', 'current_price', 'source_url']
        
        for field in required_fields:
            if not item.get(field):
                return False
        
        # 验证价格格式
        try:
            float(item['current_price'])
        except (ValueError, TypeError):
            return False
        
        # 验证URL格式
        if not item['source_url'].startswith(('http://', 'https://')):
            return False
        
        return True
    
    @staticmethod
    def validate_review(item: ReviewItem) -> bool:
        """
        验证评论数据
        """
        required_fields = ['review_id', 'product_id', 'rating', 'content']
        
        for field in required_fields:
            if not item.get(field):
                return False
        
        # 验证评分范围
        try:
            rating = float(item['rating'])
            if not 1 <= rating <= 5:
                return False
        except (ValueError, TypeError):
            return False
        
        return True
    
    @staticmethod
    def clean_price(price_str: str) -> Optional[float]:
        """
        清洗价格数据
        """
        if not price_str:
            return None
        
        # 移除货币符号和空格
        import re
        cleaned = re.sub(r'[^0-9.,]', '', str(price_str))
        
        # 处理千分位分隔符
        if ',' in cleaned and '.' in cleaned:
            # 假设最后一个是小数点
            parts = cleaned.split('.')
            if len(parts[-1]) <= 2:  # 小数部分
                cleaned = ''.join(parts[:-1]).replace(',', '') + '.' + parts[-1]
            else:
                cleaned = cleaned.replace(',', '')
        else:
            cleaned = cleaned.replace(',', '')
        
        try:
            return float(cleaned)
        except ValueError:
            return None
    
    @staticmethod
    def clean_text(text: str) -> str:
        """
        清洗文本数据
        """
        if not text:
            return ""
        
        import re
        
        # 移除多余空白
        text = re.sub(r'\\s+', ' ', text.strip())
        
        # 移除HTML标签
        text = re.sub(r'<[^>]+>', '', text)
        
        # 解码HTML实体
        import html
        text = html.unescape(text)
        
        return text
'''
        
        print("📄 数据模型代码:")
        print(data_model_code)

# 演示电商项目
ecommerce_project = EcommerceProject()
ecommerce_project.analyze_requirements()
ecommerce_project.design_data_model()

print("电商项目需求分析完成!")

2.2 爬虫实现

# 4. 电商爬虫实现
print("\n🕷️ 电商爬虫实现:")

class EcommerceSpiderImplementation:
    """
    电商爬虫实现
    """

    def demonstrate_amazon_spider(self):
        """
        演示Amazon爬虫
        """
        print("\n🛒 Amazon商品爬虫:")

        amazon_spider_code = '''

import scrapy
import json
import re
from urllib.parse import urljoin, urlparse
from scrapy.http import Request
from ..items import ProductItem, ReviewItem
from ..utils.helpers import clean_price, clean_text

class AmazonProductSpider(scrapy.Spider):
    """
    Amazon商品爬虫
    """
    name = 'amazon_products'
    allowed_domains = ['amazon.com']

# 自定义设置
custom_settings = {
    'DOWNLOAD_DELAY': 2,
    'RANDOMIZE_DOWNLOAD_DELAY': True,
    'CONCURRENT_REQUESTS': 8,
    'CONCURRENT_REQUESTS_PER_DOMAIN': 4,
    'USER_AGENT': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'COOKIES_ENABLED': True,
    'RETRY_TIMES': 3,
    'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429],
}

def __init__(self, category=None, max_pages=10, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.category = category or 'electronics'
    self.max_pages = int(max_pages)
    self.page_count = 0

def start_requests(self):
    """
    生成初始请求
    """
    # 分类页面URL
    category_urls = {
        'electronics': 'https://www.amazon.com/s?k=electronics',
        'books': 'https://www.amazon.com/s?k=books',
        'clothing': 'https://www.amazon.com/s?k=clothing',
        'home': 'https://www.amazon.com/s?k=home+kitchen',
    }

    start_url = category_urls.get(self.category, category_urls['electronics'])

    yield Request(
        url=start_url,
        callback=self.parse_product_list,
        meta={'page': 1}
    )

def parse_product_list(self, response):
    """
    解析商品列表页
    """
    # 提取商品链接
    product_links = response.css('[data-component-type="s-search-result"] h2 a::attr(href)').getall()

    for link in product_links:
        product_url = urljoin(response.url, link)
        yield Request(
            url=product_url,
            callback=self.parse_product,
            meta={'source_url': product_url}
        )

    # 处理分页
    current_page = response.meta.get('page', 1)
    if current_page < self.max_pages:
        next_page_link = response.css('a[aria-label="Go to next page"]::attr(href)').get()
        if next_page_link:
            next_page_url = urljoin(response.url, next_page_link)
            yield Request(
                url=next_page_url,
                callback=self.parse_product_list,
                meta={'page': current_page + 1}
            )

def parse_product(self, response):
    """
    解析商品详情页
    """
    # 检查页面是否被阻止
    if self.is_blocked(response):
        self.logger.warning(f"Blocked page detected: {response.url}")
        return

    # 提取商品信息
    item = ProductItem()

    # 基本信息
    item['product_id'] = self.extract_product_id(response.url)
    item['title'] = self.extract_title(response)
    item['brand'] = self.extract_brand(response)
    item['category'] = self.extract_category(response)

    # 价格信息
    item['current_price'] = self.extract_current_price(response)
    item['original_price'] = self.extract_original_price(response)
    item['currency'] = 'USD'

    # 详细信息
    item['description'] = self.extract_description(response)
    item['specifications'] = self.extract_specifications(response)
    item['images'] = self.extract_images(response)

    # 销售信息
    item['seller_name'] = self.extract_seller(response)
    item['stock_quantity'] = self.extract_stock(response)

    # 评价信息
    item['rating'] = self.extract_rating(response)
    item['review_count'] = self.extract_review_count(response)

    # 元数据
    item['source_url'] = response.url
    item['source_site'] = 'amazon'
    item['crawl_time'] = self.get_current_time()

    yield item

    # 爬取评论
    reviews_url = self.get_reviews_url(response)
    if reviews_url:
        yield Request(
            url=reviews_url,
            callback=self.parse_reviews,
            meta={'product_id': item['product_id']}
        )

def parse_reviews(self, response):
    """
    解析商品评论
    """
    product_id = response.meta['product_id']

    # 提取评论列表
    review_elements = response.css('[data-hook="review"]')

    for review_element in review_elements:
        review_item = ReviewItem()

        review_item['product_id'] = product_id
        review_item['review_id'] = review_element.css('::attr(id)').get()
        review_item['user_name'] = review_element.css('[class*="author"] span::text').get()
        review_item['rating'] = self.extract_review_rating(review_element)
        review_item['title'] = review_element.css('[data-hook="review-title"] span::text').get()
        review_item['content'] = review_element.css('[data-hook="review-body"] span::text').get()
        review_item['review_date'] = review_element.css('[data-hook="review-date"]::text').get()
        review_item['verified_purchase'] = bool(review_element.css('[data-hook="avp-badge"]'))
        review_item['helpful_count'] = self.extract_helpful_count(review_element)
        review_item['crawl_time'] = self.get_current_time()

        # 清洗数据
        if review_item['title']:
            review_item['title'] = clean_text(review_item['title'])
        if review_item['content']:
            review_item['content'] = clean_text(review_item['content'])

        yield review_item

    # 处理评论分页
    next_page = response.css('li.a-last a::attr(href)').get()
    if next_page:
        next_url = urljoin(response.url, next_page)
        yield Request(
            url=next_url,
            callback=self.parse_reviews,
            meta={'product_id': product_id}
        )

def extract_product_id(self, url):
    """
    提取商品ID
    """
    # Amazon商品ID通常在URL中
    match = re.search(r'/dp/([A-Z0-9]{10})', url)
    return match.group(1) if match else None

def extract_title(self, response):
    """
    提取商品标题
    """
    title_selectors = [
        '#productTitle::text',
        '.product-title::text',
        'h1.a-size-large::text'
    ]

    for selector in title_selectors:
        title = response.css(selector).get()
        if title:
            return clean_text(title)

    return None

def extract_brand(self, response):
    """
    提取品牌信息
    """
    brand_selectors = [
        '#bylineInfo::text',
        '.a-link-normal[id*="brand"]::text',
        '[data-feature-name="bylineInfo"] span::text'
    ]

    for selector in brand_selectors:
        brand = response.css(selector).get()
        if brand and 'Brand:' in brand:
            return brand.replace('Brand:', '').strip()
        elif brand:
            return clean_text(brand)

    return None

def extract_current_price(self, response):
    """
    提取当前价格
    """
    price_selectors = [
        '.a-price-current .a-offscreen::text',
        '.a-price .a-offscreen::text',
        '#price_inside_buybox::text',
        '.a-price-range .a-offscreen::text'
    ]

    for selector in price_selectors:
        price = response.css(selector).get()
        if price:
            return clean_price(price)

    return None

def extract_original_price(self, response):
    """
    提取原价
    """
    original_price_selectors = [
        '.a-price-was .a-offscreen::text',
        '.a-text-strike .a-offscreen::text'
    ]

    for selector in original_price_selectors:
        price = response.css(selector).get()
        if price:
            return clean_price(price)

    return None

def extract_rating(self, response):
    """
    提取评分
    """
    rating_selectors = [
        '.a-icon-alt::text',
        '[data-hook="rating-out-of-text"]::text'
    ]

    for selector in rating_selectors:
        rating_text = response.css(selector).get()
        if rating_text:
            match = re.search(r'(\\d+\\.\\d+)', rating_text)
            if match:
                return float(match.group(1))

    return None

def extract_review_count(self, response):
    """
    提取评论数量
    """
    review_count_selectors = [
        '#acrCustomerReviewText::text',
        '[data-hook="total-review-count"]::text'
    ]

    for selector in review_count_selectors:
        count_text = response.css(selector).get()
        if count_text:
            match = re.search(r'([\\d,]+)', count_text)
            if match:
                return int(match.group(1).replace(',', ''))

    return None

def is_blocked(self, response):
    """
    检查是否被阻止
    """
    blocked_indicators = [
        'Robot Check',
        'Enter the characters you see below',
        'Sorry, we just need to make sure you\\'re not a robot',
        'captcha'
    ]

    page_text = response.text.lower()
    return any(indicator.lower() in page_text for indicator in blocked_indicators)

def get_current_time(self):
    """
    获取当前时间
    """
    from datetime import datetime
    return datetime.now().isoformat()

def get_reviews_url(self, response):
    """
    获取评论页面URL
    """
    product_id = self.extract_product_id(response.url)
    if product_id:
        return f"https://www.amazon.com/product-reviews/{product_id}/ref=cm_cr_dp_d_show_all_btm"
    return None

”’

    print("📄 Amazon爬虫代码:")
    print(amazon_spider_code)

def demonstrate_shopify_spider(self):
    """
    演示Shopify爬虫
    """
    print("\n🏪 Shopify商店爬虫:")

    shopify_spider_code = '''

import scrapy
import json
from urllib.parse import urljoin
from ..items import ProductItem

class ShopifySpider(scrapy.Spider):
    """
    Shopify商店爬虫
    """
    name = 'shopify_products'

def __init__(self, shop_domain=None, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.shop_domain = shop_domain
    if not shop_domain:
        raise ValueError("shop_domain is required")

    self.allowed_domains = [shop_domain]
    self.base_url = f"https://{shop_domain}"

def start_requests(self):
    """
    生成初始请求
    """
    # 使用Shopify API获取商品
    api_url = f"{self.base_url}/products.json"

    yield scrapy.Request(
        url=api_url,
        callback=self.parse_products_api,
        meta={'page': 1}
    )

def parse_products_api(self, response):
    """
    解析商品API响应
    """
    try:
        data = json.loads(response.text)
        products = data.get('products', [])

        for product_data in products:
            item = self.parse_product_data(product_data)
            yield item

        # 处理分页
        if len(products) == 250:  # Shopify API每页最多250个商品
            page = response.meta['page']
            next_page_url = f"{self.base_url}/products.json?page={page + 1}"
            yield scrapy.Request(
                url=next_page_url,
                callback=self.parse_products_api,
                meta={'page': page + 1}
            )

    except json.JSONDecodeError:
        self.logger.error(f"Failed to parse JSON from {response.url}")

def parse_product_data(self, product_data):
    """
    解析商品数据
    """
    item = ProductItem()

    # 基本信息
    item['product_id'] = str(product_data.get('id'))
    item['title'] = product_data.get('title')
    item['brand'] = product_data.get('vendor')
    item['category'] = product_data.get('product_type')

    # 描述信息
    item['description'] = product_data.get('body_html')

    # 图片信息
    images = []
    for image in product_data.get('images', []):
        images.append(image.get('src'))
    item['images'] = images

    # 变体信息(价格等)
    variants = product_data.get('variants', [])
    if variants:
        # 取第一个变体的价格
        first_variant = variants[0]
        item['current_price'] = float(first_variant.get('price', 0))
        item['original_price'] = float(first_variant.get('compare_at_price', 0)) if first_variant.get('compare_at_price') else None
        item['stock_quantity'] = first_variant.get('inventory_quantity')

    # 元数据
    item['source_url'] = f"{self.base_url}/products/{product_data.get('handle')}"
    item['source_site'] = 'shopify'
    item['crawl_time'] = self.get_current_time()

    return item

def get_current_time(self):
    """
    获取当前时间
    """
    from datetime import datetime
    return datetime.now().isoformat()

”’

    print("📄 Shopify爬虫代码:")
    print(shopify_spider_code)

演示爬虫实现

spider_implementation = EcommerceSpiderImplementation() spider_implementation.demonstrate_amazon_spider() spider_implementation.demonstrate_shopify_spider()

print(“电商爬虫实现演示完成!”)
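
上面两个爬虫既可以用 scrapy crawl 命令配合 -a 参数启动,也可以在一个脚本里统一调度。下面是一个调度脚本草稿,假设两个Spider已放入项目的 spiders 目录,其中 shop_domain 的取值仅为示例:

# 在同一进程中运行两个爬虫的示意脚本
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())

# 参数名与上文Spider __init__ 的签名保持一致
process.crawl("amazon_products", category="electronics", max_pages=3)
process.crawl("shopify_products", shop_domain="example-shop.myshopify.com")

process.start()  # 阻塞直到所有爬虫运行结束

等价的命令行方式是 scrapy crawl amazon_products -a category=electronics -a max_pages=3。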

2.3 数据处理管道

# 5. 数据处理管道
print("\n🔄 数据处理管道:")

class EcommercePipelines:
    """
    电商数据处理管道
    """

    def demonstrate_validation_pipeline(self):
        """
        演示数据验证管道
        """
        print("\n✅ 数据验证管道:")

        validation_pipeline_code = '''

import logging
from scrapy.exceptions import DropItem
from ..items import ProductItem, ReviewItem
from ..utils.validators import DataValidator

class ValidationPipeline:
    """
    数据验证管道
    """

def __init__(self):
    self.validator = DataValidator()
    self.stats = {
        'total_items': 0,
        'valid_items': 0,
        'invalid_items': 0,
        'validation_errors': {}
    }

def process_item(self, item, spider):
    """
    处理数据项
    """
    self.stats['total_items'] += 1

    # 根据数据类型选择验证方法
    if isinstance(item, ProductItem):
        is_valid, errors = self.validate_product(item)
    elif isinstance(item, ReviewItem):
        is_valid, errors = self.validate_review(item)
    else:
        is_valid, errors = True, []

    if is_valid:
        self.stats['valid_items'] += 1
        return item
    else:
        self.stats['invalid_items'] += 1
        self.record_validation_errors(errors)
        raise DropItem(f"Invalid item: {errors}")

def validate_product(self, item):
    """
    验证商品数据
    """
    errors = []

    # 必填字段检查
    required_fields = ['product_id', 'title', 'source_url']
    for field in required_fields:
        if not item.get(field):
            errors.append(f"Missing required field: {field}")

    # 价格验证
    if item.get('current_price'):
        try:
            price = float(item['current_price'])
            if price <= 0:
                errors.append("Price must be positive")
        except (ValueError, TypeError):
            errors.append("Invalid price format")

    # URL验证
    if item.get('source_url'):
        if not item['source_url'].startswith(('http://', 'https://')):
            errors.append("Invalid URL format")

    # 评分验证
    if item.get('rating'):
        try:
            rating = float(item['rating'])
            if not 0 <= rating <= 5:
                errors.append("Rating must be between 0 and 5")
        except (ValueError, TypeError):
            errors.append("Invalid rating format")

    return len(errors) == 0, errors

def validate_review(self, item):
    """
    验证评论数据
    """
    errors = []

    # 必填字段检查
    required_fields = ['review_id', 'product_id', 'content']
    for field in required_fields:
        if not item.get(field):
            errors.append(f"Missing required field: {field}")

    # 评分验证
    if item.get('rating'):
        try:
            rating = float(item['rating'])
            if not 1 <= rating <= 5:
                errors.append("Review rating must be between 1 and 5")
        except (ValueError, TypeError):
            errors.append("Invalid rating format")

    # 内容长度验证
    if item.get('content'):
        if len(item['content']) < 10:
            errors.append("Review content too short")
        if len(item['content']) > 10000:
            errors.append("Review content too long")

    return len(errors) == 0, errors

def record_validation_errors(self, errors):
    """
    记录验证错误
    """
    for error in errors:
        if error not in self.stats['validation_errors']:
            self.stats['validation_errors'][error] = 0
        self.stats['validation_errors'][error] += 1

def close_spider(self, spider):
    """
    爬虫关闭时的统计
    """
    spider.logger.info(f"Validation Statistics:")
    spider.logger.info(f"Total items: {self.stats['total_items']}")
    spider.logger.info(f"Valid items: {self.stats['valid_items']}")
    spider.logger.info(f"Invalid items: {self.stats['invalid_items']}")
    spider.logger.info(f"Validation errors: {self.stats['validation_errors']}")

”’

    print("📄 数据验证管道代码:")
    print(validation_pipeline_code)

def demonstrate_cleaning_pipeline(self):
    """
    演示数据清洗管道
    """
    print("\n🧹 数据清洗管道:")

    cleaning_pipeline_code = '''

import re
import html
from datetime import datetime
from scrapy import signals
from ..utils.helpers import clean_text, clean_price

class DataCleaningPipeline:
    """
    数据清洗管道
    """

def __init__(self):
    self.cleaning_stats = {
        'items_processed': 0,
        'fields_cleaned': 0,
        'cleaning_operations': {}
    }

def process_item(self, item, spider):
    """
    清洗数据项
    """
    self.cleaning_stats['items_processed'] += 1

    # 清洗文本字段
    text_fields = ['title', 'description', 'brand', 'category']
    for field in text_fields:
        if item.get(field):
            original_value = item[field]
            cleaned_value = self.clean_text_field(original_value)
            if cleaned_value != original_value:
                item[field] = cleaned_value
                self.record_cleaning_operation('text_cleaning', field)

    # 清洗价格字段
    price_fields = ['current_price', 'original_price']
    for field in price_fields:
        if item.get(field):
            original_value = item[field]
            cleaned_value = self.clean_price_field(original_value)
            if cleaned_value != original_value:
                item[field] = cleaned_value
                self.record_cleaning_operation('price_cleaning', field)

    # 清洗数值字段
    numeric_fields = ['rating', 'review_count', 'stock_quantity']
    for field in numeric_fields:
        if item.get(field):
            original_value = item[field]
            cleaned_value = self.clean_numeric_field(original_value)
            if cleaned_value != original_value:
                item[field] = cleaned_value
                self.record_cleaning_operation('numeric_cleaning', field)

    # 清洗URL字段
    url_fields = ['source_url']
    for field in url_fields:
        if item.get(field):
            original_value = item[field]
            cleaned_value = self.clean_url_field(original_value)
            if cleaned_value != original_value:
                item[field] = cleaned_value
                self.record_cleaning_operation('url_cleaning', field)

    # 清洗列表字段
    list_fields = ['images', 'categories']
    for field in list_fields:
        if item.get(field):
            original_value = item[field]
            cleaned_value = self.clean_list_field(original_value)
            if cleaned_value != original_value:
                item[field] = cleaned_value
                self.record_cleaning_operation('list_cleaning', field)

    return item

def clean_text_field(self, text):
    """
    清洗文本字段
    """
    if not text:
        return ""

    # 转换为字符串
    text = str(text)

    # 解码HTML实体
    text = html.unescape(text)

    # 移除HTML标签
    text = re.sub(r'<[^>]+>', '', text)

    # 标准化空白字符
    text = re.sub(r'\\s+', ' ', text)

    # 移除首尾空白
    text = text.strip()

    # 移除控制字符
    text = re.sub(r'[\\x00-\\x1f\\x7f-\\x9f]', '', text)

    return text

def clean_price_field(self, price):
    """
    清洗价格字段
    """
    if not price:
        return None

    # 如果已经是数字,直接返回
    if isinstance(price, (int, float)):
        return float(price) if price >= 0 else None

    # 转换为字符串并清洗
    price_str = str(price)

    # 移除货币符号和其他非数字字符
    cleaned = re.sub(r'[^\\d.,]', '', price_str)

    if not cleaned:
        return None

    # 处理千分位分隔符
    if ',' in cleaned and '.' in cleaned:
        # 判断哪个是小数点
        comma_pos = cleaned.rfind(',')
        dot_pos = cleaned.rfind('.')

        if dot_pos > comma_pos:
            # 点是小数点,逗号是千分位
            cleaned = cleaned.replace(',', '')
        else:
            # 逗号是小数点,点是千分位
            cleaned = cleaned.replace('.', '').replace(',', '.')
    elif ',' in cleaned:
        # 只有逗号,判断是否为小数点
        parts = cleaned.split(',')
        if len(parts) == 2 and len(parts[1]) <= 2:
            # 可能是小数点
            cleaned = cleaned.replace(',', '.')
        else:
            # 千分位分隔符
            cleaned = cleaned.replace(',', '')

    try:
        return float(cleaned)
    except ValueError:
        return None

def clean_numeric_field(self, value):
    """
    清洗数值字段
    """
    if not value:
        return None

    # 如果已经是数字,直接返回
    if isinstance(value, (int, float)):
        return value

    # 提取数字
    numeric_str = re.sub(r'[^\\d.,]', '', str(value))

    if not numeric_str:
        return None

    try:
        # 移除千分位分隔符
        numeric_str = numeric_str.replace(',', '')
        return float(numeric_str)
    except ValueError:
        return None

def clean_url_field(self, url):
    """
    清洗URL字段
    """
    if not url:
        return ""

    url = str(url).strip()

    # 移除多余的空白字符
    url = re.sub(r'\\s+', '', url)

    # 确保URL有协议
    if url and not url.startswith(('http://', 'https://')):
        if url.startswith('//'):
            url = 'https:' + url
        elif url.startswith('/'):
            # 相对URL,需要基础URL
            pass
        else:
            url = 'https://' + url

    return url

def clean_list_field(self, value):
    """
    清洗列表字段
    """
    if not value:
        return []

    # 如果不是列表,尝试转换
    if not isinstance(value, list):
        if isinstance(value, str):
            # 尝试按逗号分割
            value = [item.strip() for item in value.split(',')]
        else:
            value = [value]

    # 清洗列表中的每个元素
    cleaned_list = []
    for item in value:
        if item:
            cleaned_item = self.clean_text_field(str(item))
            if cleaned_item:
                cleaned_list.append(cleaned_item)

    return cleaned_list

def record_cleaning_operation(self, operation_type, field):
    """
    记录清洗操作
    """
    self.cleaning_stats['fields_cleaned'] += 1

    if operation_type not in self.cleaning_stats['cleaning_operations']:
        self.cleaning_stats['cleaning_operations'][operation_type] = {}

    if field not in self.cleaning_stats['cleaning_operations'][operation_type]:
        self.cleaning_stats['cleaning_operations'][operation_type][field] = 0

    self.cleaning_stats['cleaning_operations'][operation_type][field] += 1

def close_spider(self, spider):
    """
    爬虫关闭时的统计
    """
    spider.logger.info(f"Data Cleaning Statistics:")
    spider.logger.info(f"Items processed: {self.cleaning_stats['items_processed']}")
    spider.logger.info(f"Fields cleaned: {self.cleaning_stats['fields_cleaned']}")
    spider.logger.info(f"Cleaning operations: {self.cleaning_stats['cleaning_operations']}")

”’

    print("📄 数据清洗管道代码:")
    print(cleaning_pipeline_code)

def demonstrate_storage_pipeline(self):
    """
    演示数据存储管道
    """
    print("\n💾 数据存储管道:")

    storage_pipeline_code = '''

import json
import pymongo
import redis
from elasticsearch import Elasticsearch
from datetime import datetime
from scrapy.exceptions import DropItem

class MultiStoragePipeline:
    """
    多存储后端管道
    """

def __init__(self, mongo_uri, mongo_db, redis_host, redis_port, es_host, es_port):
    self.mongo_uri = mongo_uri
    self.mongo_db = mongo_db
    self.redis_host = redis_host
    self.redis_port = redis_port
    self.es_host = es_host
    self.es_port = es_port

    self.storage_stats = {
        'mongo_items': 0,
        'redis_items': 0,
        'es_items': 0,
        'storage_errors': 0
    }

@classmethod
def from_crawler(cls, crawler):
    """
    从爬虫配置创建管道实例
    """
    return cls(
        mongo_uri=crawler.settings.get("MONGO_URI"),
        mongo_db=crawler.settings.get("MONGO_DATABASE"),
        redis_host=crawler.settings.get("REDIS_HOST"),
        redis_port=crawler.settings.get("REDIS_PORT"),
        es_host=crawler.settings.get("ELASTICSEARCH_HOST"),
        es_port=crawler.settings.get("ELASTICSEARCH_PORT"),
    )

def open_spider(self, spider):
    """
    爬虫开始时初始化连接
    """
    # MongoDB连接
    try:
        self.mongo_client = pymongo.MongoClient(self.mongo_uri)
        self.mongo_database = self.mongo_client[self.mongo_db]
        spider.logger.info("MongoDB connection established")
    except Exception as e:
        spider.logger.error(f"Failed to connect to MongoDB: {e}")
        self.mongo_client = None

    # Redis连接
    try:
        self.redis_client = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            decode_responses=True
        )
        self.redis_client.ping()
        spider.logger.info("Redis connection established")
    except Exception as e:
        spider.logger.error(f"Failed to connect to Redis: {e}")
        self.redis_client = None

    # Elasticsearch连接
    try:
        self.es_client = Elasticsearch([{
            'host': self.es_host,
            'port': self.es_port
        }])
        spider.logger.info("Elasticsearch connection established")
    except Exception as e:
        spider.logger.error(f"Failed to connect to Elasticsearch: {e}")
        self.es_client = None

def close_spider(self, spider):
    """
    爬虫结束时关闭连接
    """
    if self.mongo_client:
        self.mongo_client.close()

    spider.logger.info(f"Storage Statistics:")
    spider.logger.info(f"MongoDB items: {self.storage_stats['mongo_items']}")
    spider.logger.info(f"Redis items: {self.storage_stats['redis_items']}")
    spider.logger.info(f"Elasticsearch items: {self.storage_stats['es_items']}")
    spider.logger.info(f"Storage errors: {self.storage_stats['storage_errors']}")

def process_item(self, item, spider):
    """
    处理数据项
    """
    try:
        # 存储到MongoDB
        if self.mongo_client:
            self.store_to_mongo(item, spider)

        # 存储到Redis
        if self.redis_client:
            self.store_to_redis(item, spider)

        # 存储到Elasticsearch
        if self.es_client:
            self.store_to_elasticsearch(item, spider)

        return item

    except Exception as e:
        self.storage_stats['storage_errors'] += 1
        spider.logger.error(f"Storage error: {e}")
        raise DropItem(f"Error storing item: {e}")

def store_to_mongo(self, item, spider):
    """
    存储到MongoDB
    """
    try:
        # 根据数据类型选择集合
        if hasattr(item, '__class__'):
            collection_name = item.__class__.__name__.lower().replace('item', 's')
        else:
            collection_name = 'items'

        collection = self.mongo_database[collection_name]

        # 转换为字典
        item_dict = dict(item)
        item_dict['_created_at'] = datetime.utcnow()

        # 使用upsert避免重复
        if 'product_id' in item_dict:
            filter_dict = {'product_id': item_dict['product_id']}
            collection.update_one(
                filter_dict,
                {'$set': item_dict},
                upsert=True
            )
        else:
            collection.insert_one(item_dict)

        self.storage_stats['mongo_items'] += 1

    except Exception as e:
        spider.logger.error(f"MongoDB storage error: {e}")
        raise

def store_to_redis(self, item, spider):
    """
    存储到Redis
    """
    try:
        # 生成Redis键
        if 'product_id' in item:
            key = f"product:{item['product_id']}"
        elif 'review_id' in item:
            key = f"review:{item['review_id']}"
        else:
            key = f"item:{hash(str(item))}"

        # 存储JSON数据
        item_json = json.dumps(dict(item), default=str)
        self.redis_client.setex(key, 86400, item_json)  # 24小时过期

        # 添加到集合
        if 'product_id' in item:
            self.redis_client.sadd('products', item['product_id'])

        self.storage_stats['redis_items'] += 1

    except Exception as e:
        spider.logger.error(f"Redis storage error: {e}")
        raise

def store_to_elasticsearch(self, item, spider):
    """
    存储到Elasticsearch
    """
    try:
        # 确定索引名称
        if hasattr(item, '__class__'):
            index_name = item.__class__.__name__.lower().replace('item', 's')
        else:
            index_name = 'items'

        # 生成文档ID
        if 'product_id' in item:
            doc_id = item['product_id']
        elif 'review_id' in item:
            doc_id = item['review_id']
        else:
            doc_id = None

        # 准备文档
        doc = dict(item)
        doc['@timestamp'] = datetime.utcnow().isoformat()

        # 索引文档
        self.es_client.index(
            index=index_name,
            id=doc_id,
            body=doc
        )

        self.storage_stats['es_items'] += 1

    except Exception as e:
        spider.logger.error(f"Elasticsearch storage error: {e}")
        raise

class FileStoragePipeline:
    """
    文件存储管道
    """

def __init__(self, file_path):
    self.file_path = file_path
    self.files = {}

@classmethod
def from_crawler(cls, crawler):
    return cls(
        file_path=crawler.settings.get("FILES_STORE", "data")
    )

def open_spider(self, spider):
    """
    打开文件
    """
    import os
    os.makedirs(self.file_path, exist_ok=True)

def close_spider(self, spider):
    """
    关闭文件
    """
    for file_handle in self.files.values():
        file_handle.close()

def process_item(self, item, spider):
    """
    写入文件
    """
    # 根据数据类型确定文件名
    if hasattr(item, '__class__'):
        filename = f"{item.__class__.__name__.lower()}.jsonl"
    else:
        filename = "items.jsonl"

    filepath = f"{self.file_path}/{filename}"

    # 打开文件(如果尚未打开)
    if filepath not in self.files:
        self.files[filepath] = open(filepath, 'a', encoding='utf-8')

    # 写入数据
    line = json.dumps(dict(item), ensure_ascii=False, default=str) + '\\n'
    self.files[filepath].write(line)
    self.files[filepath].flush()

    return item

”’

    print("📄 数据存储管道代码:")
    print(storage_pipeline_code)

演示数据处理管道

pipelines_demo = EcommercePipelines() pipelines_demo.demonstrate_validation_pipeline() pipelines_demo.demonstrate_cleaning_pipeline() pipelines_demo.demonstrate_storage_pipeline()

print(“数据处理管道演示完成!”)
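
要让这些管道真正生效,还需要在 settings.py 中注册并提供存储连接参数。下面是一段示意配置草稿,管道的模块路径按上文项目结构假设为 ecommerce_crawler.pipelines,各连接地址均为示例值:

# settings.py 中的管道与存储配置(示意)
ITEM_PIPELINES = {
    "ecommerce_crawler.pipelines.ValidationPipeline": 100,    # 数值越小越先执行
    "ecommerce_crawler.pipelines.DataCleaningPipeline": 200,
    "ecommerce_crawler.pipelines.MultiStoragePipeline": 300,
    "ecommerce_crawler.pipelines.FileStoragePipeline": 400,
}

# MultiStoragePipeline.from_crawler 读取的连接配置(地址为示例值)
MONGO_URI = "mongodb://localhost:27017"
MONGO_DATABASE = "ecommerce"
REDIS_HOST = "localhost"
REDIS_PORT = 6379
ELASTICSEARCH_HOST = "localhost"
ELASTICSEARCH_PORT = 9200

# FileStoragePipeline 使用的输出目录
FILES_STORE = "data"

优先级数字越小越先执行;如果希望先清洗再验证,可以对调前两项的数字。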

def demonstrate_processing_pipeline(self):
    """
    演示数据处理管道
    """
    print("\n🔧 数据处理管道:")

    processing_code = '''

import re
from textblob import TextBlob
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
import jieba
import jieba.analyse

class NewsProcessingPipeline:
    """
    新闻数据处理管道
    """

def __init__(self):
    # 初始化分类器
    self.category_keywords = {
        'technology': ['tech', 'ai', 'software', 'computer', 'digital', 'internet'],
        'business': ['business', 'economy', 'finance', 'market', 'company', 'stock'],
        'politics': ['politics', 'government', 'election', 'policy', 'law', 'congress'],
        'sports': ['sports', 'game', 'team', 'player', 'match', 'championship'],
        'health': ['health', 'medical', 'doctor', 'hospital', 'disease', 'treatment'],
        'entertainment': ['movie', 'music', 'celebrity', 'entertainment', 'show', 'film']
    }

    # 初始化TF-IDF向量化器
    self.tfidf_vectorizer = TfidfVectorizer(
        max_features=1000,
        stop_words='english',
        ngram_range=(1, 2)
    )

    # 情感分析缓存
    self.sentiment_cache = {}

def process_item(self, item, spider):
    """
    处理数据项
    """
    try:
        # 分类识别
        item['category'] = self.classify_article(item)

        # 关键词提取
        item['keywords'] = self.extract_keywords(item)

        # 标签生成
        item['tags'] = self.generate_tags(item)

        # 情感分析
        sentiment_result = self.analyze_sentiment(item.get('content', ''))
        item['sentiment'] = sentiment_result['sentiment']
        item['sentiment_score'] = sentiment_result['score']

        # 语言检测
        if not item.get('language'):
            item['language'] = self.detect_language(item.get('content', ''))

        # 内容质量评分
        item['quality_score'] = self.calculate_quality_score(item)

        spider.logger.info(f"Processed article: {item.get('title', '')}")
        return item

    except Exception as e:
        spider.logger.error(f"Processing error: {e}")
        return item

def classify_article(self, item):
    """
    文章分类
    """
    title = item.get('title', '').lower()
    content = item.get('content', '').lower()
    text = f"{title} {content}"

    # 基于关键词的分类
    category_scores = {}

    for category, keywords in self.category_keywords.items():
        score = 0
        for keyword in keywords:
            # 标题中的关键词权重更高
            score += title.count(keyword) * 3
            score += content.count(keyword)

        category_scores[category] = score

    # 返回得分最高的分类
    if category_scores:
        best_category = max(category_scores, key=category_scores.get)
        if category_scores[best_category] > 0:
            return best_category

    return 'general'

def extract_keywords(self, item):
    """
    提取关键词
    """
    content = item.get('content', '')
    language = item.get('language', 'en')

    try:
        if language == 'zh':
            # 中文关键词提取
            keywords = jieba.analyse.extract_tags(
                content, 
                topK=10, 
                withWeight=False
            )
        else:
            # 英文关键词提取
            keywords = self.extract_english_keywords(content)

        return keywords[:10]  # 最多返回10个关键词

    except Exception as e:
        return []

def extract_english_keywords(self, text):
    """
    提取英文关键词
    """
    try:
        # 清理文本
        text = re.sub(r'[^a-zA-Z\\s]', '', text)
        text = text.lower()

        # 使用TF-IDF提取关键词
        tfidf_matrix = self.tfidf_vectorizer.fit_transform([text])
        feature_names = self.tfidf_vectorizer.get_feature_names_out()
        tfidf_scores = tfidf_matrix.toarray()[0]

        # 获取得分最高的词
        keyword_scores = list(zip(feature_names, tfidf_scores))
        keyword_scores.sort(key=lambda x: x[1], reverse=True)

        return [keyword for keyword, score in keyword_scores[:10] if score > 0]

    except Exception:
        # 简单的关键词提取
        words = text.split()
        word_freq = {}
        for word in words:
            if len(word) > 3:  # 只考虑长度大于3的词
                word_freq[word] = word_freq.get(word, 0) + 1

        # 返回频率最高的词
        sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
        return [word for word, freq in sorted_words[:10]]

def generate_tags(self, item):
    """
    生成标签
    """
    tags = []

    # 基于分类的标签
    category = item.get('category', '')
    if category and category != 'general':
        tags.append(category)

    # 基于关键词的标签
    keywords = item.get('keywords', [])
    tags.extend(keywords[:5])  # 取前5个关键词作为标签

    # 基于来源的标签
    source = item.get('source', '')
    if source:
        tags.append(f"source:{source}")

    # 基于语言的标签
    language = item.get('language', '')
    if language:
        tags.append(f"lang:{language}")

    # 去重并返回
    return list(set(tags))

def analyze_sentiment(self, text):
    """
    情感分析
    """
    if not text:
        return {'sentiment': 'neutral', 'score': 0.0}

    # 检查缓存
    text_hash = hash(text)
    if text_hash in self.sentiment_cache:
        return self.sentiment_cache[text_hash]

    try:
        # 使用TextBlob进行情感分析
        blob = TextBlob(text)
        polarity = blob.sentiment.polarity

        # 确定情感类别
        if polarity > 0.1:
            sentiment = 'positive'
        elif polarity < -0.1:
            sentiment = 'negative'
        else:
            sentiment = 'neutral'

        result = {
            'sentiment': sentiment,
            'score': round(polarity, 3)
        }

        # 缓存结果
        self.sentiment_cache[text_hash] = result
        return result

    except Exception:
        return {'sentiment': 'neutral', 'score': 0.0}

def detect_language(self, text):
    """
    检测语言
    """
    if not text:
        return 'unknown'

    try:
        from langdetect import detect
        return detect(text)
    except:
        # 简单的语言检测
        chinese_chars = len([c for c in text if '\\u4e00' <= c <= '\\u9fff'])
        if chinese_chars > len(text) * 0.3:
            return 'zh'
        else:
            return 'en'

def calculate_quality_score(self, item):
    """
    计算内容质量评分
    """
    score = 0.0

    # 标题质量 (20%)
    title = item.get('title', '')
    if title:
        if 10 <= len(title) <= 100:
            score += 0.2
        elif len(title) > 100:
            score += 0.1

    # 内容长度 (30%)
    content = item.get('content', '')
    if content:
        word_count = len(content.split())
        if word_count >= 300:
            score += 0.3
        elif word_count >= 100:
            score += 0.2
        elif word_count >= 50:
            score += 0.1

    # 作者信息 (10%)
    if item.get('author'):
        score += 0.1

    # 图片数量 (10%)
    images = item.get('images', [])
    if len(images) >= 3:
        score += 0.1
    elif len(images) >= 1:
        score += 0.05

    # 关键词数量 (15%)
    keywords = item.get('keywords', [])
    if len(keywords) >= 5:
        score += 0.15
    elif len(keywords) >= 3:
        score += 0.1
    elif len(keywords) >= 1:
        score += 0.05

    # 分类明确性 (15%)
    category = item.get('category', '')
    if category and category != 'general':
        score += 0.15

    return round(min(score, 1.0), 2)

'''

    print("📄 数据处理管道代码:")
    print(processing_code)

def demonstrate_storage_pipeline(self):
    """
    演示存储管道
    """
    print("\n💾 存储管道:")

    storage_code = '''

import json
import pymongo
import elasticsearch
from datetime import datetime

class NewsStoragePipeline:
    """新闻存储管道"""

def __init__(self, mongo_uri='mongodb://localhost:27017/', 
             mongo_db='news_db', es_host='localhost:9200'):
    # MongoDB配置
    self.mongo_client = pymongo.MongoClient(mongo_uri)
    self.mongo_db = self.mongo_client[mongo_db]
    self.news_collection = self.mongo_db['news']

    # Elasticsearch配置
    try:
        self.es_client = elasticsearch.Elasticsearch([es_host])
        self.es_index = 'news_index'
        self.setup_elasticsearch_mapping()
    except Exception as e:
        self.es_client = None
        print(f"Elasticsearch connection failed: {e}")

@classmethod
def from_crawler(cls, crawler):
    """
    从爬虫配置创建实例
    """
    settings = crawler.settings
    return cls(
        mongo_uri=settings.get('MONGO_URI', 'mongodb://localhost:27017/'),
        mongo_db=settings.get('MONGO_DB', 'news_db'),
        es_host=settings.get('ELASTICSEARCH_HOST', 'localhost:9200')
    )

def setup_elasticsearch_mapping(self):
    """
    设置Elasticsearch映射
    """
    if not self.es_client:
        return

    mapping = {
        "mappings": {
            "properties": {
                "article_id": {"type": "keyword"},
                "title": {
                    "type": "text",
                    "analyzer": "standard",
                    "fields": {
                        "keyword": {"type": "keyword"}
                    }
                },
                "content": {
                    "type": "text",
                    "analyzer": "standard"
                },
                "summary": {
                    "type": "text",
                    "analyzer": "standard"
                },
                "source": {"type": "keyword"},
                "source_url": {"type": "keyword"},
                "author": {"type": "keyword"},
                "category": {"type": "keyword"},
                "keywords": {"type": "keyword"},
                "tags": {"type": "keyword"},
                "language": {"type": "keyword"},
                "sentiment": {"type": "keyword"},
                "sentiment_score": {"type": "float"},
                "quality_score": {"type": "float"},
                "publish_time": {"type": "date"},
                "crawl_time": {"type": "date"},
                "word_count": {"type": "integer"},
                "reading_time": {"type": "integer"},
                "view_count": {"type": "integer"},
                "comment_count": {"type": "integer"},
                "share_count": {"type": "integer"}
            }
        }
    }

    try:
        if not self.es_client.indices.exists(index=self.es_index):
            self.es_client.indices.create(index=self.es_index, body=mapping)
    except Exception as e:
        print(f"Failed to create Elasticsearch mapping: {e}")

def process_item(self, item, spider):
    """
    处理数据项
    """
    try:
        # 转换为字典
        item_dict = dict(item)

        # 存储到MongoDB
        self.store_to_mongodb(item_dict, spider)

        # 存储到Elasticsearch
        if self.es_client:
            self.store_to_elasticsearch(item_dict, spider)

        # 存储到文件(可选)
        self.store_to_file(item_dict, spider)

        spider.logger.info(f"Stored article: {item_dict.get('title', '')}")
        return item

    except Exception as e:
        spider.logger.error(f"Storage error: {e}")
        return item

def store_to_mongodb(self, item_dict, spider):
    """
    存储到MongoDB
    """
    try:
        # 检查是否已存在
        existing = self.news_collection.find_one({
            'article_id': item_dict.get('article_id')
        })

        if existing:
            # 更新现有记录
            self.news_collection.update_one(
                {'article_id': item_dict.get('article_id')},
                {'$set': item_dict}
            )
            spider.logger.info(f"Updated article in MongoDB: {item_dict.get('title', '')}")
        else:
            # 插入新记录
            self.news_collection.insert_one(item_dict)
            spider.logger.info(f"Inserted article to MongoDB: {item_dict.get('title', '')}")

    except Exception as e:
        spider.logger.error(f"MongoDB storage error: {e}")

def store_to_elasticsearch(self, item_dict, spider):
    """
    存储到Elasticsearch
    """
    try:
        # 准备文档
        doc = {
            'article_id': item_dict.get('article_id'),
            'title': item_dict.get('title'),
            'content': item_dict.get('content'),
            'summary': item_dict.get('summary'),
            'source': item_dict.get('source'),
            'source_url': item_dict.get('source_url'),
            'author': item_dict.get('author'),
            'category': item_dict.get('category'),
            'keywords': item_dict.get('keywords', []),
            'tags': item_dict.get('tags', []),
            'language': item_dict.get('language'),
            'sentiment': item_dict.get('sentiment'),
            'sentiment_score': item_dict.get('sentiment_score', 0.0),
            'quality_score': item_dict.get('quality_score', 0.0),
            'publish_time': item_dict.get('publish_time'),
            'crawl_time': item_dict.get('crawl_time'),
            'word_count': item_dict.get('word_count', 0),
            'reading_time': item_dict.get('reading_time', 0),
            'view_count': item_dict.get('view_count', 0),
            'comment_count': item_dict.get('comment_count', 0),
            'share_count': item_dict.get('share_count', 0)
        }

        # 索引文档
        self.es_client.index(
            index=self.es_index,
            id=item_dict.get('article_id'),
            body=doc
        )

        spider.logger.info(f"Indexed article to Elasticsearch: {item_dict.get('title', '')}")

    except Exception as e:
        spider.logger.error(f"Elasticsearch storage error: {e}")

def store_to_file(self, item_dict, spider):
    """
    存储到文件(JSON格式)
    """
    try:
        # 准备文件名
        date_str = datetime.now().strftime('%Y-%m-%d')
        filename = f"news_{date_str}.jsonl"

        # 序列化时间对象
        serializable_item = {}
        for key, value in item_dict.items():
            if isinstance(value, datetime):
                serializable_item[key] = value.isoformat()
            else:
                serializable_item[key] = value

        # 写入文件
        with open(filename, 'a', encoding='utf-8') as f:
            f.write(json.dumps(serializable_item, ensure_ascii=False) + '\\n')

    except Exception as e:
        spider.logger.error(f"File storage error: {e}")

def close_spider(self, spider):
    """
    爬虫关闭时的清理工作
    """
    try:
        if self.mongo_client:
            self.mongo_client.close()

        spider.logger.info("Storage pipeline closed successfully")

    except Exception as e:
        spider.logger.error(f"Error closing storage pipeline: {e}")

'''

    print("📄 存储管道代码:")
    print(storage_code)

# 继续演示数据处理管道
pipeline_demo.demonstrate_processing_pipeline()
pipeline_demo.demonstrate_storage_pipeline()

print("完整数据处理管道演示完成!")
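在把上述管道接入Scrapy之前,可以先用一个独立的小脚本验证其中的核心逻辑。下面是一个示意性片段(其中的关键词表与样例文章均为假设数据),演示基于关键词计数的分类和简化版的质量评分是如何计算的:

# 示意性片段:独立验证关键词分类与简化质量评分(关键词表与样例文章均为假设数据)
CATEGORY_KEYWORDS = {
    'technology': ['tech', 'ai', 'software'],
    'business': ['business', 'finance', 'market'],
}

def classify(title, content):
    """标题中的关键词按3倍权重计分,返回得分最高的分类"""
    title, content = title.lower(), content.lower()
    scores = {
        cat: sum(title.count(kw) * 3 + content.count(kw) for kw in kws)
        for cat, kws in CATEGORY_KEYWORDS.items()
    }
    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else 'general'

def quality_score(item):
    """按标题长度、正文字数、作者信息等维度累加得分,上限1.0"""
    score = 0.0
    if 10 <= len(item.get('title', '')) <= 100:
        score += 0.2
    words = len(item.get('content', '').split())
    if words >= 300:
        score += 0.3
    elif words >= 100:
        score += 0.2
    elif words >= 50:
        score += 0.1
    if item.get('author'):
        score += 0.1
    return round(min(score, 1.0), 2)

sample = {
    'title': 'New AI software boosts market analysis',
    'content': 'The new AI software helps finance teams analyze the market. ' * 20,
    'author': 'Jane Doe',
}
print(classify(sample['title'], sample['content']))   # -> technology
print(quality_score(sample))                          # -> 0.5

实际项目中应直接复用管道类里的 classify_article 与 calculate_quality_score,这里只是便于单独调试的简化版本。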

4. 数据分析与可视化


# 10. 数据分析与可视化

print("\n📊 数据分析与可视化:")

class NewsDataAnalysis:
    """新闻数据分析"""

def __init__(self, mongo_uri='mongodb://localhost:27017/', mongo_db='news_db'):
    self.mongo_client = pymongo.MongoClient(mongo_uri)
    self.mongo_db = self.mongo_client[mongo_db]
    self.news_collection = self.mongo_db['news']

def demonstrate_basic_analysis(self):
    """
    演示基础数据分析
    """
    print("\n📈 基础数据分析:")

    analysis_code = '''

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from datetime import datetime, timedelta
import numpy as np

class BasicNewsAnalysis:
    """基础新闻数据分析"""

def __init__(self, news_collection):
    self.collection = news_collection

def get_basic_statistics(self):
    """
    获取基础统计信息
    """
    print("📊 基础统计信息:")

    # 总文章数
    total_articles = self.collection.count_documents({})
    print(f"总文章数: {total_articles}")

    # 按来源统计
    source_stats = list(self.collection.aggregate([
        {"$group": {"_id": "$source", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
        {"$limit": 10}
    ]))

    print("\\n📰 按来源统计 (Top 10):")
    for stat in source_stats:
        print(f"  {stat['_id']}: {stat['count']} 篇")

    # 按分类统计
    category_stats = list(self.collection.aggregate([
        {"$group": {"_id": "$category", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}}
    ]))

    print("\\n🏷️ 按分类统计:")
    for stat in category_stats:
        print(f"  {stat['_id']}: {stat['count']} 篇")

    # 按语言统计
    language_stats = list(self.collection.aggregate([
        {"$group": {"_id": "$language", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}}
    ]))

    print("\\n🌍 按语言统计:")
    for stat in language_stats:
        print(f"  {stat['_id']}: {stat['count']} 篇")

    return {
        'total_articles': total_articles,
        'source_stats': source_stats,
        'category_stats': category_stats,
        'language_stats': language_stats
    }

def analyze_time_trends(self):
    """
    分析时间趋势
    """
    print("\\n⏰ 时间趋势分析:")

    # 按日期统计
    daily_stats = list(self.collection.aggregate([
        {
            "$group": {
                "_id": {
                    "$dateToString": {
                        "format": "%Y-%m-%d",
                        "date": "$publish_time"
                    }
                },
                "count": {"$sum": 1}
            }
        },
        {"$sort": {"_id": 1}},
        {"$limit": 30}  # 最近30天
    ]))

    print("📅 每日发布量 (最近30天):")
    for stat in daily_stats[-10:]:  # 显示最近10天
        print(f"  {stat['_id']}: {stat['count']} 篇")

    # 按小时统计
    hourly_stats = list(self.collection.aggregate([
        {
            "$group": {
                "_id": {"$hour": "$publish_time"},
                "count": {"$sum": 1}
            }
        },
        {"$sort": {"_id": 1}}
    ]))

    print("\\n🕐 按小时统计:")
    for stat in hourly_stats:
        hour = stat['_id']
        count = stat['count']
        print(f"  {hour:02d}:00 - {count} 篇")

    return {
        'daily_stats': daily_stats,
        'hourly_stats': hourly_stats
    }

def analyze_content_quality(self):
    """
    分析内容质量
    """
    print("\\n⭐ 内容质量分析:")

    # 质量评分分布
    quality_stats = list(self.collection.aggregate([
        {
            "$bucket": {
                "groupBy": "$quality_score",
                "boundaries": [0, 0.2, 0.4, 0.6, 0.8, 1.0],
                "default": "其他",
                "output": {"count": {"$sum": 1}}
            }
        }
    ]))

    print("📊 质量评分分布:")
    quality_labels = ["0-0.2", "0.2-0.4", "0.4-0.6", "0.6-0.8", "0.8-1.0"]
    for i, stat in enumerate(quality_stats):
        if i < len(quality_labels):
            print(f"  {quality_labels[i]}: {stat['count']} 篇")

    # 平均质量评分
    avg_quality = list(self.collection.aggregate([
        {"$group": {"_id": None, "avg_quality": {"$avg": "$quality_score"}}}
    ]))

    if avg_quality:
        print(f"\\n📈 平均质量评分: {avg_quality[0]['avg_quality']:.3f}")

    # 字数统计
    word_count_stats = list(self.collection.aggregate([
        {
            "$group": {
                "_id": None,
                "avg_words": {"$avg": "$word_count"},
                "min_words": {"$min": "$word_count"},
                "max_words": {"$max": "$word_count"}
            }
        }
    ]))

    if word_count_stats:
        stats = word_count_stats[0]
        print(f"\\n📝 字数统计:")
        print(f"  平均字数: {stats['avg_words']:.0f}")
        print(f"  最少字数: {stats['min_words']}")
        print(f"  最多字数: {stats['max_words']}")

    return {
        'quality_stats': quality_stats,
        'avg_quality': avg_quality[0]['avg_quality'] if avg_quality else 0,
        'word_count_stats': word_count_stats[0] if word_count_stats else {}
    }

def analyze_sentiment_distribution(self):
    """
    分析情感分布
    """
    print("\\n😊 情感分析:")

    # 情感分布
    sentiment_stats = list(self.collection.aggregate([
        {"$group": {"_id": "$sentiment", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}}
    ]))

    print("💭 情感分布:")
    for stat in sentiment_stats:
        print(f"  {stat['_id']}: {stat['count']} 篇")

    # 按分类的情感分布
    category_sentiment = list(self.collection.aggregate([
        {
            "$group": {
                "_id": {"category": "$category", "sentiment": "$sentiment"},
                "count": {"$sum": 1}
            }
        },
        {"$sort": {"_id.category": 1, "count": -1}}
    ]))

    print("\\n📊 按分类的情感分布:")
    current_category = None
    for stat in category_sentiment:
        category = stat['_id']['category']
        sentiment = stat['_id']['sentiment']
        count = stat['count']

        if category != current_category:
            print(f"\\n  {category}:")
            current_category = category

        print(f"    {sentiment}: {count} 篇")

    return {
        'sentiment_stats': sentiment_stats,
        'category_sentiment': category_sentiment
    }

def get_top_keywords(self, limit=20):
    """
    获取热门关键词
    """
    print(f"\\n🔥 热门关键词 (Top {limit}):")

    # 聚合所有关键词
    pipeline = [
        {"$unwind": "$keywords"},
        {"$group": {"_id": "$keywords", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
        {"$limit": limit}
    ]

    top_keywords = list(self.collection.aggregate(pipeline))

    for i, keyword_stat in enumerate(top_keywords, 1):
        keyword = keyword_stat['_id']
        count = keyword_stat['count']
        print(f"  {i:2d}. {keyword}: {count} 次")

    return top_keywords

def generate_summary_report(self):
    """
    生成汇总报告
    """
    print("\\n📋 数据汇总报告:")
    print("=" * 50)

    # 获取各项统计
    basic_stats = self.get_basic_statistics()
    time_trends = self.analyze_time_trends()
    quality_analysis = self.analyze_content_quality()
    sentiment_analysis = self.analyze_sentiment_distribution()
    top_keywords = self.get_top_keywords(10)

    # 生成报告
    report = {
        'generated_at': datetime.now().isoformat(),
        'basic_statistics': basic_stats,
        'time_trends': time_trends,
        'quality_analysis': quality_analysis,
        'sentiment_analysis': sentiment_analysis,
        'top_keywords': top_keywords
    }

    print("\\n✅ 报告生成完成!")
    return report

'''

    print("📄 基础数据分析代码:")
    print(analysis_code)

# 演示数据分析
analysis_demo = NewsDataAnalysis()
analysis_demo.demonstrate_basic_analysis()

print("数据分析演示完成!")
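上面的分析代码以文本形式输出统计结果;若需要可视化,可以直接在聚合结果的基础上用 matplotlib 绘图。下面是一个示意性片段,其中 category_stats 使用硬编码的样例数据代替真实的 MongoDB 聚合结果(实际使用时可替换为 get_basic_statistics 返回的 category_stats):

# 示意性片段:把分类统计画成柱状图(样例数据代替真实的MongoDB聚合结果)
import matplotlib.pyplot as plt

# 实际使用时可替换为 collection.aggregate 按 category 分组后的结果
category_stats = [
    {'_id': 'technology', 'count': 320},
    {'_id': 'business', 'count': 210},
    {'_id': 'politics', 'count': 150},
    {'_id': 'sports', 'count': 90},
    {'_id': 'general', 'count': 60},
]

labels = [s['_id'] for s in category_stats]
counts = [s['count'] for s in category_stats]

plt.figure(figsize=(8, 4))
plt.bar(labels, counts)
plt.title('Articles per Category')
plt.xlabel('category')
plt.ylabel('article count')
plt.tight_layout()
plt.savefig('category_distribution.png')   # 也可改用 plt.show() 直接显示

同样的思路也适用于情感分布、每日发布量等其他统计维度。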

2.4 监控与告警系统


# 6. 监控与告警系统

print("\n📊 监控与告警系统:")

class EcommerceMonitoring:
    """电商爬虫监控系统"""

def demonstrate_performance_monitoring(self):
    """
    演示性能监控
    """
    print("\n📈 性能监控系统:")

    monitoring_code = '''

import time
import psutil
import threading
from datetime import datetime, timedelta
from collections import defaultdict, deque
from scrapy import signals
from scrapy.extensions import telnet

class PerformanceMonitor:
    """性能监控器"""

def __init__(self, crawler):
    self.crawler = crawler
    self.stats = crawler.stats
    self.settings = crawler.settings

    # 监控数据存储
    self.metrics = defaultdict(deque)
    self.alerts = []
    self.start_time = None

    # 监控配置
    self.monitor_interval = self.settings.getint('MONITOR_INTERVAL', 60)
    self.alert_thresholds = {
        'cpu_usage': self.settings.getfloat('CPU_ALERT_THRESHOLD', 80.0),
        'memory_usage': self.settings.getfloat('MEMORY_ALERT_THRESHOLD', 80.0),
        'error_rate': self.settings.getfloat('ERROR_RATE_THRESHOLD', 5.0),
        'response_time': self.settings.getfloat('RESPONSE_TIME_THRESHOLD', 10.0)
    }

    # 启动监控线程
    self.monitoring_thread = None
    self.stop_monitoring = threading.Event()

@classmethod
def from_crawler(cls, crawler):
    """
    从爬虫创建监控器
    """
    ext = cls(crawler)

    # 连接信号
    crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
    crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
    crawler.signals.connect(ext.request_scheduled, signal=signals.request_scheduled)
    crawler.signals.connect(ext.response_received, signal=signals.response_received)
    crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
    crawler.signals.connect(ext.item_dropped, signal=signals.item_dropped)

    return ext

def spider_opened(self, spider):
    """
    爬虫开始时启动监控
    """
    self.start_time = datetime.now()
    spider.logger.info("Performance monitoring started")

    # 启动监控线程
    self.monitoring_thread = threading.Thread(target=self.monitor_loop)
    self.monitoring_thread.daemon = True
    self.monitoring_thread.start()

def spider_closed(self, spider, reason):
    """
    爬虫结束时停止监控
    """
    self.stop_monitoring.set()
    if self.monitoring_thread:
        self.monitoring_thread.join(timeout=5)

    # 生成最终报告
    self.generate_final_report(spider)

def monitor_loop(self):
    """
    监控循环
    """
    while not self.stop_monitoring.wait(self.monitor_interval):
        try:
            self.collect_system_metrics()
            self.collect_scrapy_metrics()
            self.check_alerts()
        except Exception as e:
            print(f"Monitoring error: {e}")

def collect_system_metrics(self):
    """
    收集系统指标
    """
    current_time = datetime.now()

    # CPU使用率
    cpu_percent = psutil.cpu_percent(interval=1)
    self.metrics['cpu_usage'].append((current_time, cpu_percent))

    # 内存使用率
    memory = psutil.virtual_memory()
    memory_percent = memory.percent
    self.metrics['memory_usage'].append((current_time, memory_percent))

    # 磁盘使用率
    disk = psutil.disk_usage('/')
    disk_percent = (disk.used / disk.total) * 100
    self.metrics['disk_usage'].append((current_time, disk_percent))

    # 网络IO
    network = psutil.net_io_counters()
    self.metrics['network_sent'].append((current_time, network.bytes_sent))
    self.metrics['network_recv'].append((current_time, network.bytes_recv))

    # 保持最近1小时的数据
    cutoff_time = current_time - timedelta(hours=1)
    for metric_name, metric_data in self.metrics.items():
        while metric_data and metric_data[0][0] < cutoff_time:
            metric_data.popleft()

def collect_scrapy_metrics(self):
    """
    收集Scrapy指标
    """
    current_time = datetime.now()

    # 请求统计
    requests_total = self.stats.get_value('downloader/request_count', 0)
    responses_total = self.stats.get_value('downloader/response_count', 0)

    # 错误统计
    errors_total = sum([
        self.stats.get_value('downloader/exception_count', 0),
        self.stats.get_value('spider_exceptions', 0),
        self.stats.get_value('item_dropped_count', 0)
    ])

    # 计算错误率
    error_rate = (errors_total / max(requests_total, 1)) * 100

    # 响应时间
    response_times = self.stats.get_value('downloader/response_time', [])
    avg_response_time = sum(response_times) / len(response_times) if response_times else 0

    # 存储指标
    self.metrics['requests_total'].append((current_time, requests_total))
    self.metrics['responses_total'].append((current_time, responses_total))
    self.metrics['errors_total'].append((current_time, errors_total))
    self.metrics['error_rate'].append((current_time, error_rate))
    self.metrics['avg_response_time'].append((current_time, avg_response_time))

    # 数据项统计
    items_scraped = self.stats.get_value('item_scraped_count', 0)
    items_dropped = self.stats.get_value('item_dropped_count', 0)

    self.metrics['items_scraped'].append((current_time, items_scraped))
    self.metrics['items_dropped'].append((current_time, items_dropped))

def check_alerts(self):
    """
    检查告警条件
    """
    current_time = datetime.now()

    # 检查CPU使用率
    if self.metrics['cpu_usage']:
        latest_cpu = self.metrics['cpu_usage'][-1][1]
        if latest_cpu > self.alert_thresholds['cpu_usage']:
            self.create_alert('HIGH_CPU_USAGE', f"CPU usage: {latest_cpu:.1f}%")

    # 检查内存使用率
    if self.metrics['memory_usage']:
        latest_memory = self.metrics['memory_usage'][-1][1]
        if latest_memory > self.alert_thresholds['memory_usage']:
            self.create_alert('HIGH_MEMORY_USAGE', f"Memory usage: {latest_memory:.1f}%")

    # 检查错误率
    if self.metrics['error_rate']:
        latest_error_rate = self.metrics['error_rate'][-1][1]
        if latest_error_rate > self.alert_thresholds['error_rate']:
            self.create_alert('HIGH_ERROR_RATE', f"Error rate: {latest_error_rate:.1f}%")

    # 检查响应时间
    if self.metrics['avg_response_time']:
        latest_response_time = self.metrics['avg_response_time'][-1][1]
        if latest_response_time > self.alert_thresholds['response_time']:
            self.create_alert('HIGH_RESPONSE_TIME', f"Response time: {latest_response_time:.1f}s")

def create_alert(self, alert_type, message):
    """
    创建告警
    """
    alert = {
        'type': alert_type,
        'message': message,
        'timestamp': datetime.now(),
        'severity': self.get_alert_severity(alert_type)
    }

    self.alerts.append(alert)

    # 发送告警通知
    self.send_alert_notification(alert)

def get_alert_severity(self, alert_type):
    """
    获取告警严重程度
    """
    severity_map = {
        'HIGH_CPU_USAGE': 'WARNING',
        'HIGH_MEMORY_USAGE': 'WARNING',
        'HIGH_ERROR_RATE': 'CRITICAL',
        'HIGH_RESPONSE_TIME': 'WARNING'
    }
    return severity_map.get(alert_type, 'INFO')

def send_alert_notification(self, alert):
    """
    发送告警通知
    """
    # 这里可以集成邮件、Slack、钉钉等通知方式
    print(f"🚨 ALERT [{alert['severity']}]: {alert['message']}")

def request_scheduled(self, request, spider):
    """
    请求调度时的回调
    """
    # 记录调度时间,供 response_received 计算响应耗时
    request.meta['download_start_time'] = time.time()

def response_received(self, response, request, spider):
    """
    响应接收时的回调
    """
    # 记录响应时间
    if hasattr(request, 'meta') and 'download_start_time' in request.meta:
        response_time = time.time() - request.meta['download_start_time']

        # 更新统计
        response_times = self.stats.get_value('downloader/response_time', [])
        response_times.append(response_time)
        self.stats.set_value('downloader/response_time', response_times[-100:])  # 保持最近100个

def item_scraped(self, item, response, spider):
    """
    数据项抓取时的回调
    """
    pass

def item_dropped(self, item, response, exception, spider):
    """
    数据项丢弃时的回调
    """
    pass

def generate_final_report(self, spider):
    """
    生成最终报告
    """
    if not self.start_time:
        return

    end_time = datetime.now()
    duration = end_time - self.start_time

    # 基本统计
    total_requests = self.stats.get_value('downloader/request_count', 0)
    total_responses = self.stats.get_value('downloader/response_count', 0)
    total_items = self.stats.get_value('item_scraped_count', 0)
    total_errors = sum([
        self.stats.get_value('downloader/exception_count', 0),
        self.stats.get_value('spider_exceptions', 0),
        self.stats.get_value('item_dropped_count', 0)
    ])

    # 性能指标
    avg_requests_per_minute = (total_requests / duration.total_seconds()) * 60
    success_rate = (total_responses / max(total_requests, 1)) * 100
    error_rate = (total_errors / max(total_requests, 1)) * 100

    # 生成报告
    report = f"""
📊 爬虫性能报告

⏱️ 运行时间: {duration}

📈 基本统计:
  • 总请求数: {total_requests:,}
  • 总响应数: {total_responses:,}
  • 总数据项: {total_items:,}
  • 总错误数: {total_errors:,}

📊 性能指标:
  • 平均请求/分钟: {avg_requests_per_minute:.1f}
  • 成功率: {success_rate:.1f}%
  • 错误率: {error_rate:.1f}%

🚨 告警统计:
  • 总告警数: {len(self.alerts)}
  • 严重告警: {len([a for a in self.alerts if a['severity'] == 'CRITICAL'])}
  • 警告告警: {len([a for a in self.alerts if a['severity'] == 'WARNING'])}
"""

    spider.logger.info(report)

    # 保存报告到文件
    report_file = f"performance_report_{self.start_time.strftime('%Y%m%d_%H%M%S')}.txt"
    with open(report_file, 'w', encoding='utf-8') as f:
        f.write(report)

    spider.logger.info(f"Performance report saved to {report_file}")

# 在settings.py中启用监控扩展
EXTENSIONS = {
    'myproject.extensions.PerformanceMonitor': 500,
}

# 监控配置
MONITOR_INTERVAL = 30              # 监控间隔(秒)
CPU_ALERT_THRESHOLD = 80.0         # CPU告警阈值
MEMORY_ALERT_THRESHOLD = 80.0      # 内存告警阈值
ERROR_RATE_THRESHOLD = 5.0         # 错误率告警阈值
RESPONSE_TIME_THRESHOLD = 10.0     # 响应时间告警阈值
'''

    print("📄 性能监控代码:")
    print(monitoring_code)

def demonstrate_dashboard_system(self):
    """
    演示监控面板
    """
    print("\n📊 监控面板系统:")

    dashboard_code = '''

import json
import time
from datetime import datetime, timedelta
from threading import Thread

from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit
import redis
import pymongo

class ScrapyDashboard:
    """Scrapy监控面板"""

def __init__(self, redis_host='localhost', redis_port=6379, mongo_uri='mongodb://localhost:27017'):
    self.app = Flask(__name__)
    self.app.config['SECRET_KEY'] = 'scrapy_dashboard_secret'
    self.socketio = SocketIO(self.app, cors_allowed_origins="*")

    # 数据库连接
    self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
    self.mongo_client = pymongo.MongoClient(mongo_uri)
    self.mongo_db = self.mongo_client['scrapy_monitoring']

    # 注册路由
    self.register_routes()
    self.register_socketio_events()

    # 启动数据推送线程
    self.start_data_pusher()

def register_routes(self):
    """
    注册Web路由
    """
    @self.app.route('/')
    def index():
        return render_template('dashboard.html')

    @self.app.route('/api/stats')
    def get_stats():
        """
        获取统计数据
        """
        stats = self.get_current_stats()
        return jsonify(stats)

    @self.app.route('/api/spiders')
    def get_spiders():
        """
        获取爬虫列表
        """
        spiders = self.get_spider_list()
        return jsonify(spiders)

    @self.app.route('/api/alerts')
    def get_alerts():
        """
        获取告警信息
        """
        alerts = self.get_recent_alerts()
        return jsonify(alerts)

    @self.app.route('/api/performance/<spider_name>')
    def get_performance(spider_name):
        """
        获取爬虫性能数据
        """
        performance = self.get_spider_performance(spider_name)
        return jsonify(performance)

def register_socketio_events(self):
    """
    注册SocketIO事件
    """
    @self.socketio.on('connect')
    def handle_connect():
        print('Client connected')
        # 发送初始数据
        emit('stats_update', self.get_current_stats())

    @self.socketio.on('disconnect')
    def handle_disconnect():
        print('Client disconnected')

def start_data_pusher(self):
    """
    启动数据推送线程
    """
    def push_data():
        while True:
            try:
                # 推送实时统计数据
                stats = self.get_current_stats()
                self.socketio.emit('stats_update', stats)

                # 推送告警信息
                alerts = self.get_recent_alerts()
                self.socketio.emit('alerts_update', alerts)

                time.sleep(5)  # 每5秒推送一次
            except Exception as e:
                print(f"Data push error: {e}")
                time.sleep(10)

    thread = Thread(target=push_data)
    thread.daemon = True
    thread.start()

def get_current_stats(self):
    """
    获取当前统计数据
    """
    try:
        # 从Redis获取实时数据
        stats = {
            'total_spiders': len(self.get_spider_list()),
            'active_spiders': self.redis_client.scard('active_spiders'),
            'total_requests': self.redis_client.get('total_requests') or 0,
            'total_items': self.redis_client.get('total_items') or 0,
            'total_errors': self.redis_client.get('total_errors') or 0,
            'system_metrics': self.get_system_metrics(),
            'timestamp': datetime.now().isoformat()
        }

        return stats
    except Exception as e:
        print(f"Error getting stats: {e}")
        return {}

def get_spider_list(self):
    """
    获取爬虫列表
    """
    try:
        # 从MongoDB获取爬虫信息
        spiders_collection = self.mongo_db['spiders']
        spiders = list(spiders_collection.find({}, {'_id': 0}))

        # 添加实时状态
        for spider in spiders:
            spider_name = spider.get('name')
            spider['status'] = 'running' if self.redis_client.sismember('active_spiders', spider_name) else 'stopped'
            spider['last_run'] = self.redis_client.get(f'spider:{spider_name}:last_run')

        return spiders
    except Exception as e:
        print(f"Error getting spider list: {e}")
        return []

def get_recent_alerts(self):
    """
    获取最近的告警
    """
    try:
        # 从MongoDB获取最近24小时的告警
        alerts_collection = self.mongo_db['alerts']
        cutoff_time = datetime.now() - timedelta(hours=24)

        alerts = list(alerts_collection.find(
            {'timestamp': {'$gte': cutoff_time}},
            {'_id': 0}
        ).sort('timestamp', -1).limit(100))

        return alerts
    except Exception as e:
        print(f"Error getting alerts: {e}")
        return []

def get_spider_performance(self, spider_name):
    """
    获取爬虫性能数据
    """
    try:
        # 从MongoDB获取性能数据
        performance_collection = self.mongo_db['performance']
        cutoff_time = datetime.now() - timedelta(hours=24)

        performance_data = list(performance_collection.find(
            {
                'spider_name': spider_name,
                'timestamp': {'$gte': cutoff_time}
            },
            {'_id': 0}
        ).sort('timestamp', 1))

        return performance_data
    except Exception as e:
        print(f"Error getting performance data: {e}")
        return []

def get_system_metrics(self):
    """
    获取系统指标
    """
    try:
        import psutil

        metrics = {
            'cpu_percent': psutil.cpu_percent(),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_percent': psutil.disk_usage('/').percent,
            'network_io': {
                'bytes_sent': psutil.net_io_counters().bytes_sent,
                'bytes_recv': psutil.net_io_counters().bytes_recv
            }
        }

        return metrics
    except Exception as e:
        print(f"Error getting system metrics: {e}")
        return {}

def run(self, host='0.0.0.0', port=5000, debug=False):
    """
    运行面板服务
    """
    self.socketio.run(self.app, host=host, port=port, debug=debug)

# HTML模板 (templates/dashboard.html)

dashboard_html = """
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Scrapy监控面板</title>
    <!-- 此处省略页面样式以及 Socket.IO、Chart.js 的 <script> 引用 -->
</head>
<body>
<div class="container">
    <h1>🕷️ Scrapy监控面板</h1>
    <p>实时监控爬虫运行状态和性能指标</p>

    <div class="stats-grid">
        <div class="stat-card">
            <div class="stat-value" id="total-spiders">0</div>
            <div class="stat-label">总爬虫数</div>
        </div>
        <div class="stat-card">
            <div class="stat-value" id="active-spiders">0</div>
            <div class="stat-label">运行中</div>
        </div>
        <div class="stat-card">
            <div class="stat-value" id="total-requests">0</div>
            <div class="stat-label">总请求数</div>
        </div>
        <div class="stat-card">
            <div class="stat-value" id="total-items">0</div>
            <div class="stat-label">总数据项</div>
        </div>
    </div>

    <div class="chart-container">
        <h3>系统性能</h3>
        <canvas id="performance-chart" width="400" height="200"></canvas>
    </div>

    <div class="alerts-container">
        <h3>最近告警</h3>
        <div id="alerts-list"></div>
    </div>
</div>

<script>
    // 初始化Socket.IO连接
    const socket = io();

    // 初始化图表
    const ctx = document.getElementById('performance-chart').getContext('2d');
    const chart = new Chart(ctx, {
        type: 'line',
        data: {
            labels: [],
            datasets: [{
                label: 'CPU使用率',
                data: [],
                borderColor: 'rgb(255, 99, 132)',
                tension: 0.1
            }, {
                label: '内存使用率',
                data: [],
                borderColor: 'rgb(54, 162, 235)',
                tension: 0.1
            }]
        },
        options: {
            responsive: true,
            scales: {
                y: {
                    beginAtZero: true,
                    max: 100
                }
            }
        }
    });

    // 监听统计数据更新
    socket.on('stats_update', function(data) {
        document.getElementById('total-spiders').textContent = data.total_spiders || 0;
        document.getElementById('active-spiders').textContent = data.active_spiders || 0;
        document.getElementById('total-requests').textContent = (data.total_requests || 0).toLocaleString();
        document.getElementById('total-items').textContent = (data.total_items || 0).toLocaleString();

        // 更新图表
        if (data.system_metrics) {
            const now = new Date().toLocaleTimeString();
            chart.data.labels.push(now);
            chart.data.datasets[0].data.push(data.system_metrics.cpu_percent);
            chart.data.datasets[1].data.push(data.system_metrics.memory_percent);

            // 保持最近20个数据点
            if (chart.data.labels.length > 20) {
                chart.data.labels.shift();
                chart.data.datasets[0].data.shift();
                chart.data.datasets[1].data.shift();
            }

            chart.update();
        }
    });

    // 监听告警更新
    socket.on('alerts_update', function(alerts) {
        const alertsList = document.getElementById('alerts-list');
        alertsList.innerHTML = '';

        alerts.slice(0, 10).forEach(alert => {
            const alertDiv = document.createElement('div');
            alertDiv.className = `alert ${alert.severity.toLowerCase()}`;
            alertDiv.innerHTML = `
                <strong>${alert.type}</strong>: ${alert.message}
                <small style="float: right;">${new Date(alert.timestamp).toLocaleString()}</small>
            `;
            alertsList.appendChild(alertDiv);
        });
    });
</script>

</body>
</html>
"""

# 启动监控面板
if __name__ == '__main__':
    dashboard = ScrapyDashboard()
    dashboard.run(debug=True)
'''

    print("📄 监控面板代码:")
    print(dashboard_code)

# 演示监控系统
monitoring_demo = EcommerceMonitoring()
monitoring_demo.demonstrate_performance_monitoring()
monitoring_demo.demonstrate_dashboard_system()

print("监控与告警系统演示完成!")
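PerformanceMonitor 的告警逻辑可以概括为:把各项指标追加到一个滑动窗口,再拿窗口中的最新值与阈值比较。下面是一个脱离Scrapy的最小示意(指标数值为随机样例,阈值为假设值),便于单独理解这一模式:

# 示意性片段:滑动窗口指标 + 阈值比较的最小告警实现(数值为随机样例,阈值为假设值)
import random
from collections import deque
from datetime import datetime

THRESHOLDS = {'cpu_usage': 80.0, 'error_rate': 5.0}
metrics = {name: deque(maxlen=60) for name in THRESHOLDS}   # 每个指标只保留最近60个采样

def record(name, value):
    metrics[name].append((datetime.now(), value))

def check_alerts():
    alerts = []
    for name, threshold in THRESHOLDS.items():
        if metrics[name]:
            latest = metrics[name][-1][1]     # 取窗口中的最新值
            if latest > threshold:
                alerts.append(f"{name}={latest:.1f} 超过阈值 {threshold}")
    return alerts

# 模拟若干次采样后检查告警
for _ in range(10):
    record('cpu_usage', random.uniform(50, 95))
    record('error_rate', random.uniform(0, 8))

for alert in check_alerts():
    print("🚨", alert)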

3. 新闻聚合项目

3.1 项目概述


# 7. 新闻聚合项目

print("\n📰 新闻聚合项目:")

class NewsAggregationProject:
    """新闻聚合项目"""

def __init__(self):
    self.project_name = "news_aggregator"
    self.target_sites = ["cnn", "bbc", "reuters", "techcrunch"]
    self.categories = ["technology", "business", "politics", "sports"]

def analyze_requirements(self):
    """
    需求分析
    """
    print("\n📋 新闻聚合项目需求:")

    requirements = {
        '功能需求': [
            '多源新闻采集',
            '内容去重和分类',
            '情感分析和关键词提取',
            '实时更新和推送',
            '搜索和推荐功能'
        ],
        '技术需求': [
            '支持RSS和网页爬取',
            '自然语言处理',
            '分布式存储',
            '实时数据流处理',
            'API接口提供'
        ],
        '性能需求': [
            '每小时更新1000+文章',
            '去重准确率>90%',
            '分类准确率>85%',
            '响应时间<2秒',
            '支持并发访问'
        ]
    }

    for category, items in requirements.items():
        print(f"\n📖 {category}:")
        for item in items:
            print(f"   • {item}")

def design_data_model(self):
    """
    设计数据模型
    """
    print("\n🗄️ 新闻数据模型:")

    data_model_code = '''

import scrapy
from scrapy import Item, Field
from datetime import datetime
import hashlib

class NewsItem(scrapy.Item):
    """新闻数据模型"""
    # 基本信息
    article_id = Field()          # 文章ID
    title = Field()               # 标题
    content = Field()             # 内容
    summary = Field()             # 摘要

    # 分类信息
    category = Field()            # 分类
    tags = Field()                # 标签
    keywords = Field()            # 关键词

    # 来源信息
    source = Field()              # 来源网站
    author = Field()              # 作者
    source_url = Field()          # 原文链接

    # 时间信息
    publish_time = Field()        # 发布时间
    crawl_time = Field()          # 爬取时间
    update_time = Field()         # 更新时间

    # 媒体信息
    images = Field()              # 图片
    videos = Field()              # 视频

    # 统计信息
    view_count = Field()          # 浏览量
    comment_count = Field()       # 评论数
    share_count = Field()         # 分享数

    # 分析结果
    sentiment = Field()           # 情感分析
    sentiment_score = Field()     # 情感得分
    language = Field()            # 语言

    # 元数据
    content_hash = Field()        # 内容哈希(用于去重)
    word_count = Field()          # 字数
    reading_time = Field()        # 阅读时间

class CommentItem(scrapy.Item):
    """评论数据模型"""
    comment_id = Field()          # 评论ID
    article_id = Field()          # 文章ID
    user_name = Field()           # 用户名
    content = Field()             # 评论内容
    publish_time = Field()        # 发布时间
    like_count = Field()          # 点赞数
    reply_count = Field()         # 回复数
    sentiment = Field()           # 情感分析
    crawl_time = Field()          # 爬取时间

class CategoryItem(scrapy.Item):
    """分类数据模型"""
    category_id = Field()         # 分类ID
    name = Field()                # 分类名称
    description = Field()         # 描述
    parent_category = Field()     # 父分类
    article_count = Field()       # 文章数量

# 数据处理工具
class NewsDataProcessor:
    """新闻数据处理器"""

@staticmethod
def generate_article_id(title, source, publish_time):
    """
    生成文章ID
    """
    content = f"{title}_{source}_{publish_time}"
    return hashlib.md5(content.encode()).hexdigest()

@staticmethod
def generate_content_hash(content):
    """
    生成内容哈希
    """
    if not content:
        return None

    # 清理内容
    cleaned_content = content.strip().lower()
    return hashlib.sha256(cleaned_content.encode()).hexdigest()

@staticmethod
def extract_keywords(text, max_keywords=10):
    """
    提取关键词
    """
    try:
        from textrank4zh import TextRank4Keyword

        tr4w = TextRank4Keyword()
        tr4w.analyze(text=text, lower=True, window=2)

        keywords = []
        for item in tr4w.get_keywords(max_keywords, word_min_len=2):
            keywords.append(item.word)

        return keywords
    except ImportError:
        # 简单的关键词提取
        import re
        words = re.findall(r'\\b\\w{2,}\\b', text.lower())
        word_freq = {}
        for word in words:
            word_freq[word] = word_freq.get(word, 0) + 1

        # 返回频率最高的词
        sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
        return [word for word, freq in sorted_words[:max_keywords]]

@staticmethod
def analyze_sentiment(text):
    """
    情感分析
    """
    try:
        from snownlp import SnowNLP

        s = SnowNLP(text)
        sentiment_score = s.sentiments

        if sentiment_score > 0.6:
            sentiment = 'positive'
        elif sentiment_score < 0.4:
            sentiment = 'negative'
        else:
            sentiment = 'neutral'

        return sentiment, sentiment_score
    except ImportError:
        # 简单的情感分析
        positive_words = ['好', '棒', '优秀', '成功', '增长', '提升']
        negative_words = ['坏', '差', '失败', '下降', '问题', '危机']

        positive_count = sum(1 for word in positive_words if word in text)
        negative_count = sum(1 for word in negative_words if word in text)

        if positive_count > negative_count:
            return 'positive', 0.7
        elif negative_count > positive_count:
            return 'negative', 0.3
        else:
            return 'neutral', 0.5

@staticmethod
def classify_category(title, content):
    """
    分类识别
    """
    # 关键词映射
    category_keywords = {
        'technology': ['科技', '技术', '互联网', 'AI', '人工智能', '区块链', '5G'],
        'business': ['经济', '商业', '金融', '股票', '投资', '企业', '市场'],
        'politics': ['政治', '政府', '政策', '法律', '选举', '国际'],
        'sports': ['体育', '运动', '足球', '篮球', '奥运', '比赛'],
        'entertainment': ['娱乐', '电影', '音乐', '明星', '综艺'],
        'health': ['健康', '医疗', '疾病', '药物', '医院', '医生']
    }

    text = f"{title} {content}".lower()
    category_scores = {}

    for category, keywords in category_keywords.items():
        score = sum(1 for keyword in keywords if keyword in text)
        if score > 0:
            category_scores[category] = score

    if category_scores:
        return max(category_scores, key=category_scores.get)
    else:
        return 'general'

@staticmethod
def calculate_reading_time(content):
    """
    计算阅读时间
    """
    if not content:
        return 0

    # 假设平均阅读速度为每分钟200字
    word_count = len(content)
    reading_time = max(1, word_count // 200)  # 至少1分钟

    return reading_time

@staticmethod
def generate_summary(content, max_sentences=3):
    """
    生成摘要
    """
    if not content:
        return ""

    # 简单的摘要生成:取前几句话
    import re
    sentences = re.split(r'[。!?]', content)
    sentences = [s.strip() for s in sentences if s.strip()]

    summary_sentences = sentences[:max_sentences]
    return '。'.join(summary_sentences) + '。' if summary_sentences else ""

'''

    print("📄 新闻数据模型代码:")
    print(data_model_code)

# 演示新闻聚合项目
news_project = NewsAggregationProject()
news_project.analyze_requirements()
news_project.design_data_model()

print("新闻聚合项目概述完成!")
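数据模型中 article_id、content_hash 和 reading_time 的生成方式,可以用下面的独立片段快速验证(与上文 NewsDataProcessor 的实现思路一致,示例中的标题和正文均为假设数据):

# 示意性片段:文章ID、内容哈希与阅读时间的计算方式(标题与正文为假设数据)
import hashlib
from datetime import datetime

def generate_article_id(title, source, publish_time):
    """标题 + 来源 + 发布时间拼接后取MD5,得到稳定的文章ID"""
    raw = f"{title}_{source}_{publish_time}"
    return hashlib.md5(raw.encode('utf-8')).hexdigest()

def generate_content_hash(content):
    """清理后的正文取SHA-256,用于内容级去重"""
    return hashlib.sha256(content.strip().lower().encode('utf-8')).hexdigest()

def calculate_reading_time(content, chars_per_minute=200):
    """按每分钟约200字估算阅读时间,至少1分钟"""
    return max(1, len(content) // chars_per_minute)

title = "AI芯片市场迎来新一轮增长"
content = "据报道,多家厂商在本周发布了新一代AI芯片产品。" * 30
publish_time = datetime(2024, 5, 1, 8, 30)

print(generate_article_id(title, "example_news", publish_time))
print(generate_content_hash(content)[:16] + "...")
print(calculate_reading_time(content), "分钟")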

3.2 新闻爬虫实现


# 8. 新闻爬虫实现

print("\n🕷️ 新闻爬虫实现:")

class NewsSpiderImplementation:
    """新闻爬虫实现"""

def demonstrate_rss_spider(self):
    """
    演示RSS爬虫
    """
    print("\n📡 RSS新闻爬虫:")

    rss_spider_code = '''

import scrapy
import feedparser
from datetime import datetime
from urllib.parse import urljoin
from scrapy.http import Request
from news_aggregator.items import NewsItem
from news_aggregator.utils import NewsDataProcessor

class RSSNewsSpider(scrapy.Spider):
    """RSS新闻爬虫"""
    name = 'rss_news'

# RSS源配置
rss_feeds = {
    'techcrunch': 'https://techcrunch.com/feed/',
    'bbc_tech': 'http://feeds.bbci.co.uk/news/technology/rss.xml',
    'reuters_tech': 'https://www.reuters.com/technology/feed',
    'cnn_tech': 'http://rss.cnn.com/rss/edition.rss'
}

custom_settings = {
    'DOWNLOAD_DELAY': 2,
    'RANDOMIZE_DOWNLOAD_DELAY': True,
    'CONCURRENT_REQUESTS': 8,
    'ITEM_PIPELINES': {
        'news_aggregator.pipelines.NewsValidationPipeline': 300,
        'news_aggregator.pipelines.DuplicationPipeline': 400,
        'news_aggregator.pipelines.NewsProcessingPipeline': 500,
        'news_aggregator.pipelines.NewsStoragePipeline': 600,
    }
}

def start_requests(self):
    """
    生成初始请求
    """
    for source, feed_url in self.rss_feeds.items():
        yield Request(
            url=feed_url,
            callback=self.parse_rss,
            meta={'source': source},
            headers={'User-Agent': self.get_user_agent()}
        )

def get_user_agent(self):
    """
    获取User-Agent
    """
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
    ]
    import random
    return random.choice(user_agents)

def parse_rss(self, response):
    """
    解析RSS源
    """
    source = response.meta['source']

    try:
        # 解析RSS内容
        feed = feedparser.parse(response.text)

        self.logger.info(f"Found {len(feed.entries)} articles from {source}")

        for entry in feed.entries:
            # 提取基本信息
            article_data = {
                'title': entry.get('title', ''),
                'link': entry.get('link', ''),
                'description': entry.get('description', ''),
                'published': entry.get('published', ''),
                'author': entry.get('author', ''),
                'source': source
            }

            # 请求完整文章内容
            if article_data['link']:
                yield Request(
                    url=article_data['link'],
                    callback=self.parse_article,
                    meta={'article_data': article_data},
                    headers={'User-Agent': self.get_user_agent()}
                )

    except Exception as e:
        self.logger.error(f"Error parsing RSS from {source}: {e}")

def parse_article(self, response):
    """
    解析文章详情
    """
    article_data = response.meta['article_data']
    source = article_data['source']

    try:
        # 根据不同源使用不同的解析规则
        if source == 'techcrunch':
            content = self.parse_techcrunch_article(response)
        elif source == 'bbc_tech':
            content = self.parse_bbc_article(response)
        elif source == 'reuters_tech':
            content = self.parse_reuters_article(response)
        elif source == 'cnn_tech':
            content = self.parse_cnn_article(response)
        else:
            content = self.parse_generic_article(response)

        # 创建新闻项
        item = NewsItem()

        # 基本信息
        item['title'] = article_data['title']
        item['content'] = content
        item['source_url'] = response.url
        item['source'] = source
        item['author'] = article_data.get('author', '')

        # 时间信息
        item['publish_time'] = self.parse_publish_time(article_data.get('published'))
        item['crawl_time'] = datetime.now()

        # 生成ID和哈希
        item['article_id'] = NewsDataProcessor.generate_article_id(
            item['title'], item['source'], item['publish_time']
        )
        item['content_hash'] = NewsDataProcessor.generate_content_hash(content)

        # 提取图片
        item['images'] = self.extract_images(response)

        # 基本处理
        item['word_count'] = len(content) if content else 0
        item['reading_time'] = NewsDataProcessor.calculate_reading_time(content)
        item['summary'] = NewsDataProcessor.generate_summary(content)

        # 分类和关键词(在pipeline中处理)
        item['category'] = ''
        item['keywords'] = []
        item['tags'] = []

        # 情感分析(在pipeline中处理)
        item['sentiment'] = ''
        item['sentiment_score'] = 0.0

        # 语言检测
        item['language'] = self.detect_language(content)

        yield item

    except Exception as e:
        self.logger.error(f"Error parsing article from {response.url}: {e}")

def parse_techcrunch_article(self, response):
    """
    解析TechCrunch文章
    """
    # 提取文章内容
    content_selectors = [
        '.article-content .entry-content p::text',
        '.post-content p::text',
        '.entry-content p::text'
    ]

    content = ""
    for selector in content_selectors:
        paragraphs = response.css(selector).getall()
        if paragraphs:
            content = '\\n'.join(paragraphs)
            break

    return content.strip()

def parse_bbc_article(self, response):
    """
    解析BBC文章
    """
    # BBC文章内容选择器
    content_selectors = [
        '[data-component="text-block"] p::text',
        '.story-body__inner p::text',
        '.post__content p::text'
    ]

    content = ""
    for selector in content_selectors:
        paragraphs = response.css(selector).getall()
        if paragraphs:
            content = '\\n'.join(paragraphs)
            break

    return content.strip()

def parse_reuters_article(self, response):
    """
    解析Reuters文章
    """
    # Reuters文章内容选择器
    content_selectors = [
        '.ArticleBodyWrapper p::text',
        '.StandardArticleBody_body p::text',
        '.article-wrap p::text'
    ]

    content = ""
    for selector in content_selectors:
        paragraphs = response.css(selector).getall()
        if paragraphs:
            content = '\\n'.join(paragraphs)
            break

    return content.strip()

def parse_cnn_article(self, response):
    """
    解析CNN文章
    """
    # CNN文章内容选择器
    content_selectors = [
        '.zn-body__paragraph::text',
        '.l-container p::text',
        '.cnn_storypgraphtxt::text'
    ]

    content = ""
    for selector in content_selectors:
        paragraphs = response.css(selector).getall()
        if paragraphs:
            content = '\\n'.join(paragraphs)
            break

    return content.strip()

def parse_generic_article(self, response):
    """
    通用文章解析
    """
    # 通用内容选择器
    content_selectors = [
        'article p::text',
        '.content p::text',
        '.post-content p::text',
        '.entry-content p::text',
        'main p::text',
        '.article-body p::text'
    ]

    content = ""
    for selector in content_selectors:
        paragraphs = response.css(selector).getall()
        if paragraphs:
            content = '\\n'.join(paragraphs)
            break

    return content.strip()

def extract_images(self, response):
    """
    提取文章图片
    """
    images = []

    # 图片选择器
    img_selectors = [
        'article img::attr(src)',
        '.content img::attr(src)',
        '.post-content img::attr(src)',
        '.entry-content img::attr(src)'
    ]

    for selector in img_selectors:
        img_urls = response.css(selector).getall()
        for img_url in img_urls:
            if img_url:
                # 转换为绝对URL
                absolute_url = urljoin(response.url, img_url)
                if absolute_url not in images:
                    images.append(absolute_url)

    return images[:5]  # 最多保存5张图片

def parse_publish_time(self, published_str):
    """
    解析发布时间
    """
    if not published_str:
        return datetime.now()

    try:
        # 尝试多种时间格式
        time_formats = [
            '%a, %d %b %Y %H:%M:%S %Z',
            '%a, %d %b %Y %H:%M:%S %z',
            '%Y-%m-%dT%H:%M:%S%z',
            '%Y-%m-%d %H:%M:%S',
            '%Y-%m-%d'
        ]

        for fmt in time_formats:
            try:
                return datetime.strptime(published_str, fmt)
            except ValueError:
                continue

        # 如果都失败,返回当前时间
        return datetime.now()

    except Exception:
        return datetime.now()

def detect_language(self, text):
    """
    检测语言
    """
    if not text:
        return 'unknown'

    try:
        from langdetect import detect
        return detect(text)
    except:
        # 简单的语言检测
        chinese_chars = len([c for c in text if '\\u4e00' <= c <= '\\u9fff'])
        if chinese_chars > len(text) * 0.3:
            return 'zh'
        else:
            return 'en'

'''

    print("📄 RSS爬虫代码:")
    print(rss_spider_code)

def demonstrate_web_spider(self):
    """
    演示网页爬虫
    """
    print("\n🌐 网页新闻爬虫:")

    web_spider_code = '''

import scrapy
from datetime import datetime
from scrapy.http import Request
from news_aggregator.items import NewsItem
from news_aggregator.utils import NewsDataProcessor

class WebNewsSpider(scrapy.Spider):
    """网页新闻爬虫"""
    name = 'web_news'

# 目标网站配置
start_urls = [
    'https://www.example-news.com/technology',
    'https://www.example-news.com/business',
    'https://www.example-news.com/politics'
]

custom_settings = {
    'DOWNLOAD_DELAY': 3,
    'RANDOMIZE_DOWNLOAD_DELAY': True,
    'CONCURRENT_REQUESTS': 4,
    'DEPTH_LIMIT': 3,
    'ITEM_PIPELINES': {
        'news_aggregator.pipelines.NewsValidationPipeline': 300,
        'news_aggregator.pipelines.DuplicationPipeline': 400,
        'news_aggregator.pipelines.NewsProcessingPipeline': 500,
        'news_aggregator.pipelines.NewsStoragePipeline': 600,
    }
}

def parse(self, response):
    """
    解析列表页
    """
    # 提取文章链接
    article_links = response.css('.article-list .article-item a::attr(href)').getall()

    for link in article_links:
        if link:
            absolute_url = response.urljoin(link)
            yield Request(
                url=absolute_url,
                callback=self.parse_article,
                meta={'category': self.extract_category_from_url(response.url)}
            )

    # 跟进分页
    next_page = response.css('.pagination .next::attr(href)').get()
    if next_page:
        yield Request(
            url=response.urljoin(next_page),
            callback=self.parse
        )

def parse_article(self, response):
    """
    解析文章详情
    """
    try:
        # 提取文章信息
        title = response.css('h1.article-title::text').get()
        if not title:
            title = response.css('title::text').get()

        # 提取内容
        content_paragraphs = response.css('.article-content p::text').getall()
        content = '\\n'.join(content_paragraphs) if content_paragraphs else ''

        # 提取作者
        author = response.css('.article-author::text').get()
        if not author:
            author = response.css('[rel="author"]::text').get()

        # 提取发布时间
        publish_time_str = response.css('.article-date::text').get()
        if not publish_time_str:
            publish_time_str = response.css('time::attr(datetime)').get()

        # 提取图片
        images = response.css('.article-content img::attr(src)').getall()
        images = [response.urljoin(img) for img in images if img]

        # 创建新闻项
        item = NewsItem()

        # 基本信息
        item['title'] = title.strip() if title else ''
        item['content'] = content.strip()
        item['source_url'] = response.url
        item['source'] = self.extract_source_from_url(response.url)
        item['author'] = author.strip() if author else ''

        # 时间信息
        item['publish_time'] = self.parse_publish_time(publish_time_str)
        item['crawl_time'] = datetime.now()

        # 分类
        item['category'] = response.meta.get('category', '')

        # 生成ID和哈希
        item['article_id'] = NewsDataProcessor.generate_article_id(
            item['title'], item['source'], item['publish_time']
        )
        item['content_hash'] = NewsDataProcessor.generate_content_hash(content)

        # 媒体信息
        item['images'] = images[:5]  # 最多5张图片
        item['videos'] = []

        # 基本处理
        item['word_count'] = len(content) if content else 0
        item['reading_time'] = NewsDataProcessor.calculate_reading_time(content)
        item['summary'] = NewsDataProcessor.generate_summary(content)

        # 初始化其他字段
        item['keywords'] = []
        item['tags'] = []
        item['sentiment'] = ''
        item['sentiment_score'] = 0.0
        item['language'] = ''
        item['view_count'] = 0
        item['comment_count'] = 0
        item['share_count'] = 0

        # 只有标题和内容都存在才返回
        if item['title'] and item['content']:
            yield item

    except Exception as e:
        self.logger.error(f"Error parsing article from {response.url}: {e}")

def extract_category_from_url(self, url):
    """
    从URL提取分类
    """
    category_mapping = {
        'technology': 'technology',
        'tech': 'technology',
        'business': 'business',
        'finance': 'business',
        'politics': 'politics',
        'sports': 'sports',
        'entertainment': 'entertainment',
        'health': 'health'
    }

    url_lower = url.lower()
    for keyword, category in category_mapping.items():
        if keyword in url_lower:
            return category

    return 'general'

def extract_source_from_url(self, url):
    """
    从URL提取来源
    """
    from urllib.parse import urlparse

    parsed = urlparse(url)
    domain = parsed.netloc.lower()

    # 移除www前缀
    if domain.startswith('www.'):
        domain = domain[4:]

    # 移除顶级域名
    if '.' in domain:
        domain = domain.split('.')[0]

    return domain
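    # 补充说明:上面按 "." 简单切分对多级域名不可靠,例如 news.bbc.co.uk 会得到 "news";
    # 若允许引入第三方库,可考虑使用 tldextract(假设已安装):
    #     import tldextract
    #     return tldextract.extract(url).domain  # 对 news.bbc.co.uk 返回 "bbc"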

def parse_publish_time(self, time_str):
    """
    解析发布时间
    """
    if not time_str:
        return datetime.now()

    try:
        # 清理时间字符串
        time_str = time_str.strip()

        # 尝试多种格式
        formats = [
            '%Y-%m-%d %H:%M:%S',
            '%Y-%m-%dT%H:%M:%S',
            '%Y-%m-%dT%H:%M:%SZ',
            '%Y-%m-%d',
            '%d/%m/%Y %H:%M',
            '%d/%m/%Y'
        ]

        for fmt in formats:
            try:
                return datetime.strptime(time_str, fmt)
            except ValueError:
                continue

        return datetime.now()

    except Exception:
        return datetime.now()

”’

    print("📄 网页爬虫代码:")
    print(web_spider_code)

# 演示新闻爬虫实现
spider_demo = NewsSpiderImplementation()
spider_demo.demonstrate_rss_spider()
spider_demo.demonstrate_web_spider()

print("新闻爬虫实现演示完成!")
```
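
上面的 `parse_publish_time` 只尝试固定的几种 `strptime` 格式。下面给出一个基于 python-dateutil 的可选实现草图(假设项目允许引入该第三方库,函数名 `parse_publish_time_flexible` 为示意命名),可以覆盖更多日期写法,解析失败时与原实现一样回退到当前时间:

```python
from datetime import datetime

from dateutil import parser as date_parser  # 需要先安装 python-dateutil


def parse_publish_time_flexible(time_str, dayfirst=True):
    """使用 dateutil 解析发布时间的示意实现"""
    if not time_str:
        return datetime.now()

    try:
        # dateutil 可以自动识别 ISO 8601 等常见格式;
        # dayfirst=True 使 01/05/2024 按“日/月/年”解析,与原实现的 %d/%m/%Y 一致
        return date_parser.parse(time_str.strip(), dayfirst=dayfirst)
    except (ValueError, OverflowError):
        # 与原实现保持一致:无法解析时回退到当前时间
        return datetime.now()


# 简单验证几种写法
for s in ['2024-05-01 08:30:00', '2024-05-01T08:30:00Z', '01/05/2024']:
    print(s, '->', parse_publish_time_flexible(s))
```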

3.3 数据处理管道

```python

# 9. 新闻数据处理管道
print("\n⚙️ 新闻数据处理管道:")

class NewsProcessingPipelines:
    """
    新闻数据处理管道
    """

def demonstrate_validation_pipeline(self):
    """
    演示数据验证管道
    """
    print("\n✅ 数据验证管道:")

    validation_code = '''

from scrapy.exceptions import DropItem
from datetime import datetime, timedelta
import re

class NewsValidationPipeline:
    """
    新闻数据验证管道
    """

def __init__(self):
    self.min_title_length = 10
    self.min_content_length = 100
    self.max_age_days = 30

    # 垃圾内容关键词
    self.spam_keywords = [
        'advertisement', '广告', 'sponsored', '赞助',
        'click here', '点击这里', 'buy now', '立即购买'
    ]

def process_item(self, item, spider):
    """
    处理数据项
    """
    try:
        # 验证标题
        if not self.validate_title(item.get('title', '')):
            raise DropItem(f"Invalid title: {item.get('title', '')}")

        # 验证内容
        if not self.validate_content(item.get('content', '')):
            raise DropItem(f"Invalid content for article: {item.get('title', '')}")

        # 验证发布时间
        if not self.validate_publish_time(item.get('publish_time')):
            raise DropItem(f"Invalid publish time for article: {item.get('title', '')}")

        # 验证来源URL
        if not self.validate_source_url(item.get('source_url', '')):
            raise DropItem(f"Invalid source URL: {item.get('source_url', '')}")

        # 检查垃圾内容
        if self.is_spam_content(item):
            raise DropItem(f"Spam content detected: {item.get('title', '')}")

        spider.logger.info(f"Validated article: {item.get('title', '')}")
        return item

    except DropItem:
        raise
    except Exception as e:
        spider.logger.error(f"Validation error: {e}")
        raise DropItem(f"Validation failed: {e}")

def validate_title(self, title):
    """
    验证标题
    """
    if not title or not isinstance(title, str):
        return False

    title = title.strip()

    # 检查长度
    if len(title) < self.min_title_length:
        return False

    # 检查是否全是特殊字符
    if not re.search(r'[a-zA-Z\\u4e00-\\u9fff]', title):
        return False

    return True

def validate_content(self, content):
    """
    验证内容
    """
    if not content or not isinstance(content, str):
        return False

    content = content.strip()

    # 检查长度
    if len(content) < self.min_content_length:
        return False

    # 检查是否有实际内容
    if not re.search(r'[a-zA-Z\\u4e00-\\u9fff]', content):
        return False

    return True

def validate_publish_time(self, publish_time):
    """
    验证发布时间
    """
    if not publish_time:
        return False

    if not isinstance(publish_time, datetime):
        return False

    # 检查时间是否在合理范围内
    now = datetime.now()
    max_age = timedelta(days=self.max_age_days)

    if publish_time > now:
        return False  # 未来时间

    if now - publish_time > max_age:
        return False  # 太旧的文章

    return True

def validate_source_url(self, url):
    """
    验证来源URL
    """
    if not url or not isinstance(url, str):
        return False

    # 简单的URL格式检查
    url_pattern = re.compile(
        r'^https?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\\.)+'  # domain...
        r'(?:[A-Z]{2,6}\\.?|[A-Z0-9-]{2,}\\.?)|'  # host...
        r'localhost|'  # localhost...
        r'\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})'  # ...or ip
        r'(?::\\d+)?'  # optional port
        r'(?:/?|[/?]\\S+)$', re.IGNORECASE)

    return url_pattern.match(url) is not None

def is_spam_content(self, item):
    """
    检查是否为垃圾内容
    """
    title = item.get('title', '').lower()
    content = item.get('content', '').lower()

    text = f"{title} {content}"

    # 检查垃圾关键词
    for keyword in self.spam_keywords:
        if keyword.lower() in text:
            return True

    # 检查重复字符
    if self.has_excessive_repetition(text):
        return True

    return False

def has_excessive_repetition(self, text):
    """
    检查是否有过度重复
    """
    # 检查连续重复字符
    if re.search(r'(.)\\1{10,}', text):
        return True

    # 检查重复单词
    words = text.split()
    if len(words) > 10:
        word_count = {}
        for word in words:
            word_count[word] = word_count.get(word, 0) + 1

        # 如果某个词出现次数超过总词数的30%
        max_count = max(word_count.values())
        if max_count > len(words) * 0.3:
            return True

    return False

”’

    print("📄 数据验证管道代码:")
    print(validation_code)

def demonstrate_deduplication_pipeline(self):
    """
    演示去重管道
    """
    print("\n🔄 去重管道:")

    dedup_code = '''

import hashlib
import redis
from scrapy.exceptions import DropItem

class DuplicationPipeline:
    """
    去重管道
    """

def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
    self.redis_client = redis.Redis(
        host=redis_host, 
        port=redis_port, 
        db=redis_db,
        decode_responses=True
    )
    self.content_hash_key = 'news:content_hashes'
    self.url_hash_key = 'news:url_hashes'
    self.title_hash_key = 'news:title_hashes'

    # 相似度阈值
    self.similarity_threshold = 0.8

@classmethod
def from_crawler(cls, crawler):
    """
    从爬虫配置创建实例
    """
    settings = crawler.settings
    return cls(
        redis_host=settings.get('REDIS_HOST', 'localhost'),
        redis_port=settings.get('REDIS_PORT', 6379),
        redis_db=settings.get('REDIS_DB', 0)
    )

def process_item(self, item, spider):
    """
    处理数据项
    """
    try:
        # URL去重
        if self.is_duplicate_url(item.get('source_url', '')):
            raise DropItem(f"Duplicate URL: {item.get('source_url', '')}")

        # 内容去重
        content_hash = item.get('content_hash')
        if content_hash and self.is_duplicate_content(content_hash):
            raise DropItem(f"Duplicate content: {item.get('title', '')}")

        # 标题相似度检查
        if self.is_similar_title(item.get('title', '')):
            raise DropItem(f"Similar title exists: {item.get('title', '')}")

        # 记录哈希值
        self.record_hashes(item)

        spider.logger.info(f"Passed deduplication: {item.get('title', '')}")
        return item

    except DropItem:
        raise
    except Exception as e:
        spider.logger.error(f"Deduplication error: {e}")
        return item

def is_duplicate_url(self, url):
    """
    检查URL是否重复
    """
    if not url:
        return False

    url_hash = hashlib.md5(url.encode()).hexdigest()
    return self.redis_client.sismember(self.url_hash_key, url_hash)

def is_duplicate_content(self, content_hash):
    """
    检查内容是否重复
    """
    if not content_hash:
        return False

    return self.redis_client.sismember(self.content_hash_key, content_hash)

def is_similar_title(self, title):
    """
    检查标题相似度
    """
    if not title:
        return False

    title_hash = self.generate_title_hash(title)

    # 获取所有已存在的标题哈希
    existing_hashes = self.redis_client.smembers(self.title_hash_key)

    for existing_hash in existing_hashes:
        similarity = self.calculate_similarity(title_hash, existing_hash)
        if similarity > self.similarity_threshold:
            return True

    return False

def generate_title_hash(self, title):
    """
    生成标题哈希(用于相似度比较)
    """
    # 清理标题
    import re
    cleaned_title = re.sub(r'[^\\w\\s]', '', title.lower())
    words = cleaned_title.split()

    # 移除停用词
    stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
    filtered_words = [word for word in words if word not in stop_words]

    # 排序并连接
    sorted_words = sorted(filtered_words)
    return hashlib.md5(' '.join(sorted_words).encode()).hexdigest()

def calculate_similarity(self, hash1, hash2):
    """
    计算哈希相似度
    """
    if hash1 == hash2:
        return 1.0

    # 简单的字符级相似度
    common_chars = sum(1 for a, b in zip(hash1, hash2) if a == b)
    return common_chars / len(hash1)

def record_hashes(self, item):
    """
    记录哈希值
    """
    # 记录URL哈希
    url = item.get('source_url', '')
    if url:
        url_hash = hashlib.md5(url.encode()).hexdigest()
        self.redis_client.sadd(self.url_hash_key, url_hash)

    # 记录内容哈希
    content_hash = item.get('content_hash')
    if content_hash:
        self.redis_client.sadd(self.content_hash_key, content_hash)

    # 记录标题哈希
    title = item.get('title', '')
    if title:
        title_hash = self.generate_title_hash(title)
        self.redis_client.sadd(self.title_hash_key, title_hash)

    # 设置过期时间(30天)
    expire_time = 30 * 24 * 3600
    self.redis_client.expire(self.url_hash_key, expire_time)
    self.redis_client.expire(self.content_hash_key, expire_time)
    self.redis_client.expire(self.title_hash_key, expire_time)
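    # 补充说明:expire 作用于整个集合键,且每次写入都会刷新 TTL,
    # 集合中的单个哈希并不会单独过期;若需要按条目过期,可以把每个哈希
    # 存成独立键并用 SET key value EX <秒数>,或定期运行清理任务。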

def clear_old_hashes(self):
    """
    清理旧的哈希值
    """
    # 这个方法可以定期调用来清理过期数据
    pass

”’

    print("📄 去重管道代码:")
    print(dedup_code)

# 演示数据处理管道
pipeline_demo = NewsProcessingPipelines()
pipeline_demo.demonstrate_validation_pipeline()
pipeline_demo.demonstrate_deduplication_pipeline()

print("数据处理管道演示完成!")
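
# 补充示例(示意草图):上面 DuplicationPipeline.calculate_similarity 对两个 MD5 值做逐字符比较,
# 但哈希字符的重合度并不能反映标题本身的相似程度。若需要真正的标题相似度,
# 更直接的做法是对分词后的词集合计算 Jaccard 相似度;中文标题可先用 jieba 等分词库切词。

import re


def title_word_set(title):
    """把标题转成小写词集合(去掉标点,按空白切分)"""
    cleaned = re.sub(r'[^\w\s]', '', title.lower())
    return set(cleaned.split())


def jaccard_similarity(title_a, title_b):
    """计算两个标题的 Jaccard 相似度,取值范围 [0, 1]"""
    set_a, set_b = title_word_set(title_a), title_word_set(title_b)
    if not set_a or not set_b:
        return 0.0
    return len(set_a & set_b) / len(set_a | set_b)


# 示例:两个英文标题的相似度约为 0.75
print(jaccard_similarity(
    'Scrapy tutorial: building a news aggregator spider',
    'Building a news aggregator spider with Scrapy'
))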