本章概述
本章将通过多个完整的实战项目,展示如何将前面学到的Scrapy知识综合运用到实际场景中。我们将从简单的单页面爬虫开始,逐步深入到复杂的分布式爬虫系统,涵盖电商数据采集、新闻聚合、社交媒体监控等多个领域。
学习目标
- 掌握完整的Scrapy项目开发流程
- 学会处理各种复杂的爬取场景
- 了解项目架构设计和最佳实践
- 掌握性能优化和问题排查技巧
- 学会项目部署和运维管理
1. 项目开发流程与规范
1.1 项目规划与设计
# 1. 项目开发流程演示
print("🚀 Scrapy实战项目开发流程:")
class ProjectPlanningDemo:
"""
项目规划演示
"""
def demonstrate_project_analysis(self):
"""
演示项目需求分析
"""
print("\n📋 项目需求分析:")
analysis_template = '''
# 项目需求分析模板
## 1. 项目背景
- 业务需求:明确爬取数据的用途和价值
- 目标网站:分析目标网站的结构和特点
- 数据规模:估算数据量和更新频率
- 时间要求:确定项目开发和交付时间
## 2. 技术分析
- 网站技术栈:静态页面 vs 动态页面
- 反爬虫机制:验证码、IP限制、用户代理检测等
- 数据格式:HTML、JSON、XML等
- 存储需求:数据库类型和存储方案
## 3. 风险评估
- 法律风险:robots.txt、服务条款
- 技术风险:反爬虫升级、网站改版
- 性能风险:服务器负载、网络稳定性
- 数据风险:数据质量、数据完整性
## 4. 资源规划
- 开发人员:技能要求和人员配置
- 硬件资源:服务器、存储、网络
- 软件资源:开发工具、第三方服务
- 时间资源:开发周期和里程碑
'''
print("📄 需求分析模板:")
print(analysis_template)
def demonstrate_architecture_design(self):
"""
演示架构设计
"""
print("\n🏗️ 系统架构设计:")
architecture_code = '''
import os
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
class ProjectType(Enum):
"""项目类型"""
SIMPLE_CRAWLER = "simple_crawler"
DISTRIBUTED_CRAWLER = "distributed_crawler"
REAL_TIME_CRAWLER = "real_time_crawler"
BATCH_CRAWLER = "batch_crawler"
class DataSource(Enum):
"""数据源类型"""
WEB_PAGE = "web_page"
API = "api"
DATABASE = "database"
FILE = "file"
@dataclass
class ProjectConfig:
"""项目配置"""
name: str
type: ProjectType
description: str
target_urls: List[str]
data_sources: List[DataSource]
output_formats: List[str]
storage_backends: List[str]
# 性能配置
concurrent_requests: int = 16
download_delay: float = 1.0
randomize_download_delay: bool = True
# 反爬虫配置
use_proxy: bool = False
rotate_user_agent: bool = True
respect_robots_txt: bool = True
# 监控配置
enable_monitoring: bool = True
enable_alerting: bool = True
log_level: str = "INFO"
class ProjectArchitect:
"""项目架构师"""
def __init__(self):
self.components = {}
self.dependencies = {}
def design_architecture(self, config: ProjectConfig) -> Dict:
"""
设计项目架构
"""
architecture = {
'project_info': {
'name': config.name,
'type': config.type.value,
'description': config.description
},
'components': self._design_components(config),
'data_flow': self._design_data_flow(config),
'deployment': self._design_deployment(config),
'monitoring': self._design_monitoring(config)
}
return architecture
def _design_components(self, config: ProjectConfig) -> Dict:
"""
设计系统组件
"""
components = {
'spiders': {
'description': '爬虫组件,负责数据采集',
'files': ['spiders/__init__.py', 'spiders/main_spider.py'],
'dependencies': ['scrapy', 'requests']
},
'items': {
'description': '数据模型,定义数据结构',
'files': ['items.py'],
'dependencies': ['scrapy']
},
'pipelines': {
'description': '数据处理管道',
'files': ['pipelines.py'],
'dependencies': ['scrapy', 'pymongo', 'redis']
},
'middlewares': {
'description': '中间件,处理请求和响应',
'files': ['middlewares.py'],
'dependencies': ['scrapy', 'fake-useragent']
},
'extensions': {
'description': '扩展组件,增强功能',
'files': ['extensions.py'],
'dependencies': ['scrapy']
},
'utils': {
'description': '工具函数',
'files': ['utils/__init__.py', 'utils/helpers.py'],
'dependencies': []
}
}
# 根据项目类型添加特定组件
if config.type == ProjectType.DISTRIBUTED_CRAWLER:
components['scheduler'] = {
'description': '分布式调度器',
'files': ['scheduler.py'],
'dependencies': ['scrapy-redis', 'redis']
}
if config.enable_monitoring:
components['monitoring'] = {
'description': '监控组件',
'files': ['monitoring.py'],
'dependencies': ['prometheus-client', 'grafana-api']
}
return components
def _design_data_flow(self, config: ProjectConfig) -> Dict:
"""
设计数据流
"""
data_flow = {
'input': {
'sources': [source.value for source in config.data_sources],
'formats': ['html', 'json', 'xml']
},
'processing': {
'extraction': '使用CSS选择器和XPath提取数据',
'cleaning': '数据清洗和标准化',
'validation': '数据验证和质量检查',
'transformation': '数据转换和格式化'
},
'output': {
'formats': config.output_formats,
'storage': config.storage_backends,
'apis': ['REST API', 'GraphQL']
}
}
return data_flow
def _design_deployment(self, config: ProjectConfig) -> Dict:
"""
设计部署方案
"""
deployment = {
'development': {
'environment': 'local',
'tools': ['Docker', 'docker-compose'],
'services': ['scrapy', 'redis', 'mongodb']
},
'testing': {
'environment': 'staging',
'tools': ['pytest', 'coverage'],
'ci_cd': ['GitHub Actions', 'Jenkins']
},
'production': {
'environment': 'cloud',
'platform': ['AWS', 'GCP', 'Azure'],
'orchestration': ['Kubernetes', 'Docker Swarm'],
'monitoring': ['Prometheus', 'Grafana', 'ELK Stack']
}
}
return deployment
def _design_monitoring(self, config: ProjectConfig) -> Dict:
"""
设计监控方案
"""
monitoring = {
'metrics': {
'system': ['CPU', 'Memory', 'Disk', 'Network'],
'application': ['Requests/sec', 'Response time', 'Error rate'],
'business': ['Items scraped', 'Data quality', 'Coverage']
},
'logging': {
'levels': ['DEBUG', 'INFO', 'WARNING', 'ERROR'],
'destinations': ['File', 'Database', 'Cloud'],
'format': 'JSON structured logging'
},
'alerting': {
'channels': ['Email', 'Slack', 'PagerDuty'],
'rules': ['Error rate > 5%', 'Response time > 10s'],
'escalation': 'Auto-escalation after 15 minutes'
}
}
return monitoring
def generate_project_structure(self, config: ProjectConfig) -> str:
"""
生成项目结构
"""
structure = f'''
{config.name}/
├── scrapy.cfg # Scrapy配置文件
├── requirements.txt # Python依赖
├── Dockerfile # Docker配置
├── docker-compose.yml # Docker Compose配置
├── README.md # 项目文档
├── .gitignore # Git忽略文件
├── .env # 环境变量
├── {config.name}/
│ ├── __init__.py
│ ├── items.py # 数据模型
│ ├── middlewares.py # 中间件
│ ├── pipelines.py # 数据管道
│ ├── settings.py # 项目设置
│ ├── extensions.py # 扩展组件
│ ├── spiders/
│ │ ├── __init__.py
│ │ └── main_spider.py # 主爬虫
│ └── utils/
│ ├── __init__.py
│ ├── helpers.py # 工具函数
│ └── validators.py # 数据验证
├── tests/ # 测试文件
│ ├── __init__.py
│ ├── test_spiders.py
│ ├── test_pipelines.py
│ └── test_utils.py
├── docs/ # 文档
│ ├── api.md
│ ├── deployment.md
│ └── troubleshooting.md
├── scripts/ # 脚本文件
│ ├── deploy.sh
│ ├── backup.sh
│ └── monitor.py
└── data/ # 数据文件
├── raw/
├── processed/
└── exports/
'''
return structure
# 演示项目规划
project_demo = ProjectPlanningDemo()
project_demo.demonstrate_project_analysis()
project_demo.demonstrate_architecture_design()
# 创建示例项目配置
sample_config = ProjectConfig(
name="ecommerce_crawler",
type=ProjectType.DISTRIBUTED_CRAWLER,
description="电商网站数据采集系统",
target_urls=["https://example-shop.com"],
data_sources=[DataSource.WEB_PAGE, DataSource.API],
output_formats=["json", "csv"],
storage_backends=["mongodb", "elasticsearch"],
concurrent_requests=32,
download_delay=0.5,
use_proxy=True,
enable_monitoring=True
)
# 生成项目架构
architect = ProjectArchitect()
architecture = architect.design_architecture(sample_config)
print("\n🏗️ 生成的项目架构:")
for key, value in architecture.items():
print(f"📁 {key}: {value}")
print("\n📂 项目结构:")
print(architect.generate_project_structure(sample_config))
print("项目规划演示完成!")
1.2 开发环境搭建
# 2. 开发环境搭建
print("\n🛠️ 开发环境搭建:")
class DevelopmentEnvironment:
"""
开发环境管理
"""
def __init__(self):
self.tools = {}
self.dependencies = {}
def setup_local_environment(self):
"""
搭建本地开发环境
"""
print("\n💻 本地开发环境搭建:")
setup_script = '''
#!/bin/bash
# 本地开发环境搭建脚本
echo "🚀 开始搭建Scrapy开发环境..."
# 1. 检查Python版本
echo "📋 检查Python版本..."
python_version=$(python3 --version 2>&1)
echo "Python版本: $python_version"
if ! command -v python3 &> /dev/null; then
echo "❌ Python3未安装,请先安装Python3.8+"
exit 1
fi
# 2. 创建虚拟环境
echo "🔧 创建虚拟环境..."
python3 -m venv scrapy_env
source scrapy_env/bin/activate
# 3. 升级pip
echo "⬆️ 升级pip..."
pip install --upgrade pip
# 4. 安装核心依赖
echo "📦 安装核心依赖..."
pip install scrapy
pip install scrapy-redis
pip install scrapy-splash
pip install scrapy-playwright
# 5. 安装数据库驱动
echo "🗄️ 安装数据库驱动..."
pip install pymongo
pip install redis
pip install elasticsearch
pip install psycopg2-binary
# 6. 安装开发工具
echo "🛠️ 安装开发工具..."
pip install pytest
pip install pytest-cov
pip install black
pip install flake8
pip install mypy
# 7. 安装监控工具
echo "📊 安装监控工具..."
pip install prometheus-client
pip install grafana-api
pip install sentry-sdk
# 8. 安装其他工具
echo "🔧 安装其他工具..."
pip install fake-useragent
pip install requests
pip install beautifulsoup4
pip install lxml
pip install pillow
# 9. 生成requirements.txt
echo "📝 生成requirements.txt..."
pip freeze > requirements.txt
echo "✅ 开发环境搭建完成!"
echo "🎯 激活环境: source scrapy_env/bin/activate"
'''
print("📄 环境搭建脚本:")
print(setup_script)
def setup_docker_environment(self):
"""
搭建Docker开发环境
"""
print("\n🐳 Docker开发环境:")
dockerfile = '''
# Dockerfile
FROM python:3.9-slim
# 设置工作目录
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \\
gcc \\
g++ \\
libxml2-dev \\
libxslt-dev \\
libffi-dev \\
libssl-dev \\
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制项目文件
COPY . .
# 设置环境变量
ENV PYTHONPATH=/app
ENV SCRAPY_SETTINGS_MODULE=myproject.settings
# 暴露端口
EXPOSE 6800
# 启动命令
CMD ["scrapyd"]
'''
docker_compose = '''
# docker-compose.yml
version: '3.8'
services:
scrapy:
build: .
ports:
- "6800:6800"
volumes:
- .:/app
- ./data:/app/data
environment:
- REDIS_URL=redis://redis:6379
- MONGO_URL=mongodb://mongo:27017
depends_on:
- redis
- mongo
networks:
- scrapy-network
redis:
image: redis:6-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
networks:
- scrapy-network
mongo:
image: mongo:4.4
ports:
- "27017:27017"
volumes:
- mongo_data:/data/db
environment:
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=password
networks:
- scrapy-network
elasticsearch:
image: elasticsearch:7.14.0
ports:
- "9200:9200"
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- es_data:/usr/share/elasticsearch/data
networks:
- scrapy-network
kibana:
image: kibana:7.14.0
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
networks:
- scrapy-network
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
networks:
- scrapy-network
grafana:
image: grafana/grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- grafana_data:/var/lib/grafana
networks:
- scrapy-network
volumes:
redis_data:
mongo_data:
es_data:
prometheus_data:
grafana_data:
networks:
scrapy-network:
driver: bridge
'''
print("📄 Dockerfile:")
print(dockerfile)
print("\n📄 docker-compose.yml:")
print(docker_compose)
def setup_development_tools(self):
"""
配置开发工具
"""
print("\n🔧 开发工具配置:")
# pytest配置
pytest_ini = '''
# pytest.ini
[tool:pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
--verbose
--tb=short
--cov=myproject
--cov-report=html
--cov-report=term-missing
markers =
unit: Unit tests
integration: Integration tests
e2e: End-to-end tests
slow: Slow running tests
'''
# Black配置
pyproject_toml = """
# pyproject.toml
[tool.black]
line-length = 88
target-version = ['py38']
include = '\\.pyi?$'
extend-exclude = '''
/(
# directories
\\.eggs
| \\.git
| \\.hg
| \\.mypy_cache
| \\.tox
| \\.venv
| build
| dist
)/
'''
[tool.isort]
profile = "black"
multi_line_output = 3
line_length = 88
[tool.mypy]
python_version = "3.8"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
"""
# flake8配置
flake8_cfg = '''
# .flake8
[flake8]
max-line-length = 88
extend-ignore = E203, W503
exclude =
.git,
__pycache__,
.venv,
build,
dist,
*.egg-info
'''
# pre-commit配置
pre_commit_yaml = '''
# .pre-commit-config.yaml
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/psf/black
rev: 22.10.0
hooks:
- id: black
- repo: https://github.com/pycqa/isort
rev: 5.10.1
hooks:
- id: isort
- repo: https://github.com/pycqa/flake8
rev: 5.0.4
hooks:
- id: flake8
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.991
hooks:
- id: mypy
additional_dependencies: [types-requests]
'''
print("📄 pytest.ini:")
print(pytest_ini)
print("\n📄 pyproject.toml:")
print(pyproject_toml)
print("\n📄 .flake8:")
print(flake8_cfg)
print("\n📄 .pre-commit-config.yaml:")
print(pre_commit_yaml)
# 演示开发环境搭建
dev_env = DevelopmentEnvironment()
dev_env.setup_local_environment()
dev_env.setup_docker_environment()
dev_env.setup_development_tools()
print("开发环境搭建演示完成!")
2. 电商数据采集项目
2.1 项目需求分析
# 3. 电商数据采集项目
print("\n🛒 电商数据采集项目:")
class EcommerceProject:
"""
电商数据采集项目
"""
def __init__(self):
self.project_name = "ecommerce_crawler"
self.target_sites = ["amazon", "ebay", "shopify"]
self.data_types = ["products", "reviews", "prices", "sellers"]
def analyze_requirements(self):
"""
需求分析
"""
print("\n📋 项目需求分析:")
requirements = {
'业务需求': [
'采集主流电商平台的商品信息',
'监控商品价格变化趋势',
'分析用户评论和评分',
'跟踪卖家信息和信誉',
'生成市场分析报告'
],
'技术需求': [
'支持多个电商平台',
'处理JavaScript渲染页面',
'应对反爬虫机制',
'实现分布式爬取',
'数据质量保证'
],
'性能需求': [
'每日处理100万+商品',
'响应时间<5秒',
'数据准确率>95%',
'系统可用性>99%',
'支持水平扩展'
],
'合规需求': [
'遵守robots.txt',
'控制访问频率',
'保护用户隐私',
'数据使用合规',
'避免服务器过载'
]
}
for category, items in requirements.items():
print(f"\n📖 {category}:")
for item in items:
print(f" • {item}")
def design_data_model(self):
"""
设计数据模型
"""
print("\n🗄️ 数据模型设计:")
data_model_code = '''
import scrapy
from scrapy import Item, Field
from datetime import datetime
from typing import Optional, List, Dict
class ProductItem(scrapy.Item):
"""
商品数据模型
"""
# 基本信息
product_id = Field() # 商品ID
title = Field() # 商品标题
brand = Field() # 品牌
category = Field() # 分类
subcategory = Field() # 子分类
# 价格信息
current_price = Field() # 当前价格
original_price = Field() # 原价
discount = Field() # 折扣
currency = Field() # 货币
# 详细信息
description = Field() # 商品描述
specifications = Field() # 规格参数
images = Field() # 商品图片
videos = Field() # 商品视频
# 销售信息
seller_id = Field() # 卖家ID
seller_name = Field() # 卖家名称
stock_quantity = Field() # 库存数量
sales_count = Field() # 销量
# 评价信息
rating = Field() # 评分
review_count = Field() # 评论数
rating_distribution = Field() # 评分分布
# 元数据
source_url = Field() # 来源URL
source_site = Field() # 来源网站
crawl_time = Field() # 爬取时间
update_time = Field() # 更新时间
class ReviewItem(scrapy.Item):
"""
评论数据模型
"""
# 基本信息
review_id = Field() # 评论ID
product_id = Field() # 商品ID
user_id = Field() # 用户ID
user_name = Field() # 用户名
# 评论内容
rating = Field() # 评分
title = Field() # 评论标题
content = Field() # 评论内容
pros = Field() # 优点
cons = Field() # 缺点
# 评论属性
verified_purchase = Field() # 是否验证购买
helpful_count = Field() # 有用数
total_votes = Field() # 总投票数
# 时间信息
review_date = Field() # 评论日期
crawl_time = Field() # 爬取时间
class SellerItem(scrapy.Item):
"""
卖家数据模型
"""
# 基本信息
seller_id = Field() # 卖家ID
seller_name = Field() # 卖家名称
company_name = Field() # 公司名称
# 联系信息
email = Field() # 邮箱
phone = Field() # 电话
address = Field() # 地址
website = Field() # 网站
# 业务信息
business_type = Field() # 业务类型
registration_date = Field() # 注册日期
product_count = Field() # 商品数量
category_focus = Field() # 主营类目
# 信誉信息
rating = Field() # 评分
review_count = Field() # 评论数
positive_feedback = Field() # 好评率
response_rate = Field() # 响应率
response_time = Field() # 响应时间
# 元数据
source_url = Field() # 来源URL
source_site = Field() # 来源网站
crawl_time = Field() # 爬取时间
class PriceHistoryItem(scrapy.Item):
"""
价格历史数据模型
"""
product_id = Field() # 商品ID
price = Field() # 价格
currency = Field() # 货币
date = Field() # 日期
source_site = Field() # 来源网站
crawl_time = Field() # 爬取时间
# 数据验证器
class DataValidator:
"""
数据验证器
"""
@staticmethod
def validate_product(item: ProductItem) -> bool:
"""
验证商品数据
"""
required_fields = ['product_id', 'title', 'current_price', 'source_url']
for field in required_fields:
if not item.get(field):
return False
# 验证价格格式
try:
float(item['current_price'])
except (ValueError, TypeError):
return False
# 验证URL格式
if not item['source_url'].startswith(('http://', 'https://')):
return False
return True
@staticmethod
def validate_review(item: ReviewItem) -> bool:
"""
验证评论数据
"""
required_fields = ['review_id', 'product_id', 'rating', 'content']
for field in required_fields:
if not item.get(field):
return False
# 验证评分范围
try:
rating = float(item['rating'])
if not 1 <= rating <= 5:
return False
except (ValueError, TypeError):
return False
return True
@staticmethod
def clean_price(price_str: str) -> Optional[float]:
"""
清洗价格数据
"""
if not price_str:
return None
# 移除货币符号和空格
import re
cleaned = re.sub(r'[^0-9.,]', '', str(price_str))
# 处理千分位分隔符
if ',' in cleaned and '.' in cleaned:
# 假设最后一个是小数点
parts = cleaned.split('.')
if len(parts[-1]) <= 2: # 小数部分
cleaned = ''.join(parts[:-1]).replace(',', '') + '.' + parts[-1]
else:
cleaned = cleaned.replace(',', '')
else:
cleaned = cleaned.replace(',', '')
try:
return float(cleaned)
except ValueError:
return None
@staticmethod
def clean_text(text: str) -> str:
"""
清洗文本数据
"""
if not text:
return ""
import re
# 移除多余空白
text = re.sub(r'\\s+', ' ', text.strip())
# 移除HTML标签
text = re.sub(r'<[^>]+>', '', text)
# 解码HTML实体
import html
text = html.unescape(text)
return text
'''
print("📄 数据模型代码:")
print(data_model_code)
# 演示电商项目
ecommerce_project = EcommerceProject()
ecommerce_project.analyze_requirements()
ecommerce_project.design_data_model()
print("电商项目需求分析完成!")
2.2 爬虫实现
# 4. 电商爬虫实现
print("\n🕷️ 电商爬虫实现:")
class EcommerceSpiderImplementation:
"""
电商爬虫实现
"""
def demonstrate_amazon_spider(self):
"""
演示Amazon爬虫
"""
print("\n🛒 Amazon商品爬虫:")
amazon_spider_code = '''
import scrapy
import json
import re
from urllib.parse import urljoin, urlparse
from scrapy.http import Request
from ..items import ProductItem, ReviewItem
from ..utils.helpers import clean_price, clean_text
class AmazonProductSpider(scrapy.Spider):
"""
Amazon商品爬虫
"""
name = 'amazon_products'
allowed_domains = ['amazon.com']
# 自定义设置
custom_settings = {
'DOWNLOAD_DELAY': 2,
'RANDOMIZE_DOWNLOAD_DELAY': True,
'CONCURRENT_REQUESTS': 8,
'CONCURRENT_REQUESTS_PER_DOMAIN': 4,
'USER_AGENT': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'COOKIES_ENABLED': True,
'RETRY_TIMES': 3,
'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429],
}
def __init__(self, category=None, max_pages=10, *args, **kwargs):
super().__init__(*args, **kwargs)
self.category = category or 'electronics'
self.max_pages = int(max_pages)
self.page_count = 0
def start_requests(self):
"""
生成初始请求
"""
# 分类页面URL
category_urls = {
'electronics': 'https://www.amazon.com/s?k=electronics',
'books': 'https://www.amazon.com/s?k=books',
'clothing': 'https://www.amazon.com/s?k=clothing',
'home': 'https://www.amazon.com/s?k=home+kitchen',
}
start_url = category_urls.get(self.category, category_urls['electronics'])
yield Request(
url=start_url,
callback=self.parse_product_list,
meta={'page': 1}
)
def parse_product_list(self, response):
"""
解析商品列表页
"""
# 提取商品链接
product_links = response.css('[data-component-type="s-search-result"] h2 a::attr(href)').getall()
for link in product_links:
product_url = urljoin(response.url, link)
yield Request(
url=product_url,
callback=self.parse_product,
meta={'source_url': product_url}
)
# 处理分页
current_page = response.meta.get('page', 1)
if current_page < self.max_pages:
next_page_link = response.css('a[aria-label="Go to next page"]::attr(href)').get()
if next_page_link:
next_page_url = urljoin(response.url, next_page_link)
yield Request(
url=next_page_url,
callback=self.parse_product_list,
meta={'page': current_page + 1}
)
def parse_product(self, response):
"""
解析商品详情页
"""
# 检查页面是否被阻止
if self.is_blocked(response):
self.logger.warning(f"Blocked page detected: {response.url}")
return
# 提取商品信息
item = ProductItem()
# 基本信息
item['product_id'] = self.extract_product_id(response.url)
item['title'] = self.extract_title(response)
item['brand'] = self.extract_brand(response)
item['category'] = self.extract_category(response)
# 价格信息
item['current_price'] = self.extract_current_price(response)
item['original_price'] = self.extract_original_price(response)
item['currency'] = 'USD'
# 详细信息
item['description'] = self.extract_description(response)
item['specifications'] = self.extract_specifications(response)
item['images'] = self.extract_images(response)
# 销售信息
item['seller_name'] = self.extract_seller(response)
item['stock_quantity'] = self.extract_stock(response)
# 评价信息
item['rating'] = self.extract_rating(response)
item['review_count'] = self.extract_review_count(response)
# 元数据
item['source_url'] = response.url
item['source_site'] = 'amazon'
item['crawl_time'] = self.get_current_time()
yield item
# 爬取评论
reviews_url = self.get_reviews_url(response)
if reviews_url:
yield Request(
url=reviews_url,
callback=self.parse_reviews,
meta={'product_id': item['product_id']}
)
def parse_reviews(self, response):
"""
解析商品评论
"""
product_id = response.meta['product_id']
# 提取评论列表
review_elements = response.css('[data-hook="review"]')
for review_element in review_elements:
review_item = ReviewItem()
review_item['product_id'] = product_id
review_item['review_id'] = review_element.css('::attr(id)').get()
review_item['user_name'] = review_element.css('[class*="author"] span::text').get()
review_item['rating'] = self.extract_review_rating(review_element)
review_item['title'] = review_element.css('[data-hook="review-title"] span::text').get()
review_item['content'] = review_element.css('[data-hook="review-body"] span::text').get()
review_item['review_date'] = review_element.css('[data-hook="review-date"]::text').get()
review_item['verified_purchase'] = bool(review_element.css('[data-hook="avp-badge"]'))
review_item['helpful_count'] = self.extract_helpful_count(review_element)
review_item['crawl_time'] = self.get_current_time()
# 清洗数据
if review_item['title']:
review_item['title'] = clean_text(review_item['title'])
if review_item['content']:
review_item['content'] = clean_text(review_item['content'])
yield review_item
# 处理评论分页
next_page = response.css('li.a-last a::attr(href)').get()
if next_page:
next_url = urljoin(response.url, next_page)
yield Request(
url=next_url,
callback=self.parse_reviews,
meta={'product_id': product_id}
)
def extract_product_id(self, url):
"""
提取商品ID
"""
# Amazon商品ID通常在URL中
match = re.search(r'/dp/([A-Z0-9]{10})', url)
return match.group(1) if match else None
def extract_title(self, response):
"""
提取商品标题
"""
title_selectors = [
'#productTitle::text',
'.product-title::text',
'h1.a-size-large::text'
]
for selector in title_selectors:
title = response.css(selector).get()
if title:
return clean_text(title)
return None
def extract_brand(self, response):
"""
提取品牌信息
"""
brand_selectors = [
'#bylineInfo::text',
'.a-link-normal[id*="brand"]::text',
'[data-feature-name="bylineInfo"] span::text'
]
for selector in brand_selectors:
brand = response.css(selector).get()
if brand and 'Brand:' in brand:
return brand.replace('Brand:', '').strip()
elif brand:
return clean_text(brand)
return None
def extract_current_price(self, response):
"""
提取当前价格
"""
price_selectors = [
'.a-price-current .a-offscreen::text',
'.a-price .a-offscreen::text',
'#price_inside_buybox::text',
'.a-price-range .a-offscreen::text'
]
for selector in price_selectors:
price = response.css(selector).get()
if price:
return clean_price(price)
return None
def extract_original_price(self, response):
"""
提取原价
"""
original_price_selectors = [
'.a-price-was .a-offscreen::text',
'.a-text-strike .a-offscreen::text'
]
for selector in original_price_selectors:
price = response.css(selector).get()
if price:
return clean_price(price)
return None
def extract_rating(self, response):
"""
提取评分
"""
rating_selectors = [
'.a-icon-alt::text',
'[data-hook="rating-out-of-text"]::text'
]
for selector in rating_selectors:
rating_text = response.css(selector).get()
if rating_text:
match = re.search(r'(\\d+\\.\\d+)', rating_text)
if match:
return float(match.group(1))
return None
def extract_review_count(self, response):
"""
提取评论数量
"""
review_count_selectors = [
'#acrCustomerReviewText::text',
'[data-hook="total-review-count"]::text'
]
for selector in review_count_selectors:
count_text = response.css(selector).get()
if count_text:
match = re.search(r'([\\d,]+)', count_text)
if match:
return int(match.group(1).replace(',', ''))
return None
def is_blocked(self, response):
"""
检查是否被阻止
"""
blocked_indicators = [
'Robot Check',
'Enter the characters you see below',
'Sorry, we just need to make sure you\\'re not a robot',
'captcha'
]
page_text = response.text.lower()
return any(indicator.lower() in page_text for indicator in blocked_indicators)
def get_current_time(self):
"""
获取当前时间
"""
from datetime import datetime
return datetime.now().isoformat()
def get_reviews_url(self, response):
"""
获取评论页面URL
"""
product_id = self.extract_product_id(response.url)
if product_id:
return f"https://www.amazon.com/product-reviews/{product_id}/ref=cm_cr_dp_d_show_all_btm"
return None
”’
print("📄 Amazon爬虫代码:")
print(amazon_spider_code)
def demonstrate_shopify_spider(self):
"""
演示Shopify爬虫
"""
print("\n🏪 Shopify商店爬虫:")
shopify_spider_code = '''
import scrapy
import json
from urllib.parse import urljoin
from ..items import ProductItem
class ShopifySpider(scrapy.Spider):
"""
Shopify商店爬虫
"""
name = 'shopify_products'
def __init__(self, shop_domain=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.shop_domain = shop_domain
if not shop_domain:
raise ValueError("shop_domain is required")
self.allowed_domains = [shop_domain]
self.base_url = f"https://{shop_domain}"
def start_requests(self):
"""
生成初始请求
"""
# 使用Shopify API获取商品
api_url = f"{self.base_url}/products.json?limit=250"  # 每页取最大250条,与下面的翻页判断保持一致
yield scrapy.Request(
url=api_url,
callback=self.parse_products_api,
meta={'page': 1}
)
def parse_products_api(self, response):
"""
解析商品API响应
"""
try:
data = json.loads(response.text)
products = data.get('products', [])
for product_data in products:
item = self.parse_product_data(product_data)
yield item
# 处理分页
if len(products) == 250: # Shopify API每页最多250个商品
page = response.meta['page']
next_page_url = f"{self.base_url}/products.json?limit=250&page={page + 1}"
yield scrapy.Request(
url=next_page_url,
callback=self.parse_products_api,
meta={'page': page + 1}
)
except json.JSONDecodeError:
self.logger.error(f"Failed to parse JSON from {response.url}")
def parse_product_data(self, product_data):
"""
解析商品数据
"""
item = ProductItem()
# 基本信息
item['product_id'] = str(product_data.get('id'))
item['title'] = product_data.get('title')
item['brand'] = product_data.get('vendor')
item['category'] = product_data.get('product_type')
# 描述信息
item['description'] = product_data.get('body_html')
# 图片信息
images = []
for image in product_data.get('images', []):
images.append(image.get('src'))
item['images'] = images
# 变体信息(价格等)
variants = product_data.get('variants', [])
if variants:
# 取第一个变体的价格
first_variant = variants[0]
item['current_price'] = float(first_variant.get('price', 0))
item['original_price'] = float(first_variant.get('compare_at_price', 0)) if first_variant.get('compare_at_price') else None
item['stock_quantity'] = first_variant.get('inventory_quantity')
# 元数据
item['source_url'] = f"{self.base_url}/products/{product_data.get('handle')}"
item['source_site'] = 'shopify'
item['crawl_time'] = self.get_current_time()
return item
def get_current_time(self):
"""
获取当前时间
"""
from datetime import datetime
return datetime.now().isoformat()
”’
print("📄 Shopify爬虫代码:")
print(shopify_spider_code)
# 演示爬虫实现
spider_implementation = EcommerceSpiderImplementation()
spider_implementation.demonstrate_amazon_spider()
spider_implementation.demonstrate_shopify_spider()
print("电商爬虫实现演示完成!")
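上面的爬虫放进项目的 spiders/ 目录后,既可以用命令行 scrapy crawl amazon_products -a category=electronics -a max_pages=2 启动,也可以在脚本中用 CrawlerProcess 启动,便于本地调试。下面是一个最小示意(假设 AmazonProductSpider 可以从项目中导入):
# 脚本方式运行爬虫的最小示意
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())
# 关键字参数会传给 Spider.__init__,对应 category / max_pages
process.crawl(AmazonProductSpider, category="electronics", max_pages=2)
process.start()  # 阻塞直到爬取结束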
2.3 数据处理管道
# 5. 数据处理管道
print("\n🔄 数据处理管道:")
class EcommercePipelines:
"""
电商数据处理管道
"""
def demonstrate_validation_pipeline(self):
"""
演示数据验证管道
"""
print("\n✅ 数据验证管道:")
validation_pipeline_code = '''
import logging
from scrapy.exceptions import DropItem
from ..items import ProductItem, ReviewItem
from ..utils.validators import DataValidator
class ValidationPipeline:
"""
数据验证管道
"""
def __init__(self):
self.validator = DataValidator()
self.stats = {
'total_items': 0,
'valid_items': 0,
'invalid_items': 0,
'validation_errors': {}
}
def process_item(self, item, spider):
"""
处理数据项
"""
self.stats['total_items'] += 1
# 根据数据类型选择验证方法
if isinstance(item, ProductItem):
is_valid, errors = self.validate_product(item)
elif isinstance(item, ReviewItem):
is_valid, errors = self.validate_review(item)
else:
is_valid, errors = True, []
if is_valid:
self.stats['valid_items'] += 1
return item
else:
self.stats['invalid_items'] += 1
self.record_validation_errors(errors)
raise DropItem(f"Invalid item: {errors}")
def validate_product(self, item):
"""
验证商品数据
"""
errors = []
# 必填字段检查
required_fields = ['product_id', 'title', 'source_url']
for field in required_fields:
if not item.get(field):
errors.append(f"Missing required field: {field}")
# 价格验证
if item.get('current_price'):
try:
price = float(item['current_price'])
if price <= 0:
errors.append("Price must be positive")
except (ValueError, TypeError):
errors.append("Invalid price format")
# URL验证
if item.get('source_url'):
if not item['source_url'].startswith(('http://', 'https://')):
errors.append("Invalid URL format")
# 评分验证
if item.get('rating'):
try:
rating = float(item['rating'])
if not 0 <= rating <= 5:
errors.append("Rating must be between 0 and 5")
except (ValueError, TypeError):
errors.append("Invalid rating format")
return len(errors) == 0, errors
def validate_review(self, item):
"""
验证评论数据
"""
errors = []
# 必填字段检查
required_fields = ['review_id', 'product_id', 'content']
for field in required_fields:
if not item.get(field):
errors.append(f"Missing required field: {field}")
# 评分验证
if item.get('rating'):
try:
rating = float(item['rating'])
if not 1 <= rating <= 5:
errors.append("Review rating must be between 1 and 5")
except (ValueError, TypeError):
errors.append("Invalid rating format")
# 内容长度验证
if item.get('content'):
if len(item['content']) < 10:
errors.append("Review content too short")
if len(item['content']) > 10000:
errors.append("Review content too long")
return len(errors) == 0, errors
def record_validation_errors(self, errors):
"""
记录验证错误
"""
for error in errors:
if error not in self.stats['validation_errors']:
self.stats['validation_errors'][error] = 0
self.stats['validation_errors'][error] += 1
def close_spider(self, spider):
"""
爬虫关闭时的统计
"""
spider.logger.info(f"Validation Statistics:")
spider.logger.info(f"Total items: {self.stats['total_items']}")
spider.logger.info(f"Valid items: {self.stats['valid_items']}")
spider.logger.info(f"Invalid items: {self.stats['invalid_items']}")
spider.logger.info(f"Validation errors: {self.stats['validation_errors']}")
”’
print("📄 数据验证管道代码:")
print(validation_pipeline_code)
def demonstrate_cleaning_pipeline(self):
"""
演示数据清洗管道
"""
print("\n🧹 数据清洗管道:")
cleaning_pipeline_code = '''
import re
import html
from datetime import datetime
from scrapy import signals
from ..utils.helpers import clean_text, clean_price
class DataCleaningPipeline:
"""
数据清洗管道
"""
def __init__(self):
self.cleaning_stats = {
'items_processed': 0,
'fields_cleaned': 0,
'cleaning_operations': {}
}
def process_item(self, item, spider):
"""
清洗数据项
"""
self.cleaning_stats['items_processed'] += 1
# 清洗文本字段
text_fields = ['title', 'description', 'brand', 'category']
for field in text_fields:
if item.get(field):
original_value = item[field]
cleaned_value = self.clean_text_field(original_value)
if cleaned_value != original_value:
item[field] = cleaned_value
self.record_cleaning_operation('text_cleaning', field)
# 清洗价格字段
price_fields = ['current_price', 'original_price']
for field in price_fields:
if item.get(field):
original_value = item[field]
cleaned_value = self.clean_price_field(original_value)
if cleaned_value != original_value:
item[field] = cleaned_value
self.record_cleaning_operation('price_cleaning', field)
# 清洗数值字段
numeric_fields = ['rating', 'review_count', 'stock_quantity']
for field in numeric_fields:
if item.get(field):
original_value = item[field]
cleaned_value = self.clean_numeric_field(original_value)
if cleaned_value != original_value:
item[field] = cleaned_value
self.record_cleaning_operation('numeric_cleaning', field)
# 清洗URL字段
url_fields = ['source_url']
for field in url_fields:
if item.get(field):
original_value = item[field]
cleaned_value = self.clean_url_field(original_value)
if cleaned_value != original_value:
item[field] = cleaned_value
self.record_cleaning_operation('url_cleaning', field)
# 清洗列表字段
list_fields = ['images', 'categories']
for field in list_fields:
if item.get(field):
original_value = item[field]
cleaned_value = self.clean_list_field(original_value)
if cleaned_value != original_value:
item[field] = cleaned_value
self.record_cleaning_operation('list_cleaning', field)
return item
def clean_text_field(self, text):
"""
清洗文本字段
"""
if not text:
return ""
# 转换为字符串
text = str(text)
# 解码HTML实体
text = html.unescape(text)
# 移除HTML标签
text = re.sub(r'<[^>]+>', '', text)
# 标准化空白字符
text = re.sub(r'\\s+', ' ', text)
# 移除首尾空白
text = text.strip()
# 移除控制字符
text = re.sub(r'[\\x00-\\x1f\\x7f-\\x9f]', '', text)
return text
def clean_price_field(self, price):
"""
清洗价格字段
"""
if not price:
return None
# 如果已经是数字,直接返回
if isinstance(price, (int, float)):
return float(price) if price >= 0 else None
# 转换为字符串并清洗
price_str = str(price)
# 移除货币符号和其他非数字字符
cleaned = re.sub(r'[^\\d.,]', '', price_str)
if not cleaned:
return None
# 处理千分位分隔符
if ',' in cleaned and '.' in cleaned:
# 判断哪个是小数点
comma_pos = cleaned.rfind(',')
dot_pos = cleaned.rfind('.')
if dot_pos > comma_pos:
# 点是小数点,逗号是千分位
cleaned = cleaned.replace(',', '')
else:
# 逗号是小数点,点是千分位
cleaned = cleaned.replace('.', '').replace(',', '.')
elif ',' in cleaned:
# 只有逗号,判断是否为小数点
parts = cleaned.split(',')
if len(parts) == 2 and len(parts[1]) <= 2:
# 可能是小数点
cleaned = cleaned.replace(',', '.')
else:
# 千分位分隔符
cleaned = cleaned.replace(',', '')
try:
return float(cleaned)
except ValueError:
return None
def clean_numeric_field(self, value):
"""
清洗数值字段
"""
if not value:
return None
# 如果已经是数字,直接返回
if isinstance(value, (int, float)):
return value
# 提取数字
numeric_str = re.sub(r'[^\\d.,]', '', str(value))
if not numeric_str:
return None
try:
# 移除千分位分隔符
numeric_str = numeric_str.replace(',', '')
return float(numeric_str)
except ValueError:
return None
def clean_url_field(self, url):
"""
清洗URL字段
"""
if not url:
return ""
url = str(url).strip()
# 移除多余的空白字符
url = re.sub(r'\\s+', '', url)
# 确保URL有协议
if url and not url.startswith(('http://', 'https://')):
if url.startswith('//'):
url = 'https:' + url
elif url.startswith('/'):
# 相对URL,需要基础URL
pass
else:
url = 'https://' + url
return url
def clean_list_field(self, value):
"""
清洗列表字段
"""
if not value:
return []
# 如果不是列表,尝试转换
if not isinstance(value, list):
if isinstance(value, str):
# 尝试按逗号分割
value = [item.strip() for item in value.split(',')]
else:
value = [value]
# 清洗列表中的每个元素
cleaned_list = []
for item in value:
if item:
cleaned_item = self.clean_text_field(str(item))
if cleaned_item:
cleaned_list.append(cleaned_item)
return cleaned_list
def record_cleaning_operation(self, operation_type, field):
"""
记录清洗操作
"""
self.cleaning_stats['fields_cleaned'] += 1
if operation_type not in self.cleaning_stats['cleaning_operations']:
self.cleaning_stats['cleaning_operations'][operation_type] = {}
if field not in self.cleaning_stats['cleaning_operations'][operation_type]:
self.cleaning_stats['cleaning_operations'][operation_type][field] = 0
self.cleaning_stats['cleaning_operations'][operation_type][field] += 1
def close_spider(self, spider):
"""
爬虫关闭时的统计
"""
spider.logger.info(f"Data Cleaning Statistics:")
spider.logger.info(f"Items processed: {self.cleaning_stats['items_processed']}")
spider.logger.info(f"Fields cleaned: {self.cleaning_stats['fields_cleaned']}")
spider.logger.info(f"Cleaning operations: {self.cleaning_stats['cleaning_operations']}")
”’
print("📄 数据清洗管道代码:")
print(cleaning_pipeline_code)
def demonstrate_storage_pipeline(self):
"""
演示数据存储管道
"""
print("\n💾 数据存储管道:")
storage_pipeline_code = '''
import json
import pymongo
import redis
from elasticsearch import Elasticsearch
from datetime import datetime
from scrapy.exceptions import DropItem
class MultiStoragePipeline:
"""
多存储后端管道
"""
def __init__(self, mongo_uri, mongo_db, redis_host, redis_port, es_host, es_port):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
self.redis_host = redis_host
self.redis_port = redis_port
self.es_host = es_host
self.es_port = es_port
self.storage_stats = {
'mongo_items': 0,
'redis_items': 0,
'es_items': 0,
'storage_errors': 0
}
@classmethod
def from_crawler(cls, crawler):
"""
从爬虫配置创建管道实例
"""
return cls(
mongo_uri=crawler.settings.get("MONGO_URI"),
mongo_db=crawler.settings.get("MONGO_DATABASE"),
redis_host=crawler.settings.get("REDIS_HOST"),
redis_port=crawler.settings.get("REDIS_PORT"),
es_host=crawler.settings.get("ELASTICSEARCH_HOST"),
es_port=crawler.settings.get("ELASTICSEARCH_PORT"),
)
def open_spider(self, spider):
"""
爬虫开始时初始化连接
"""
# MongoDB连接
try:
self.mongo_client = pymongo.MongoClient(self.mongo_uri)
self.mongo_database = self.mongo_client[self.mongo_db]
spider.logger.info("MongoDB connection established")
except Exception as e:
spider.logger.error(f"Failed to connect to MongoDB: {e}")
self.mongo_client = None
# Redis连接
try:
self.redis_client = redis.Redis(
host=self.redis_host,
port=self.redis_port,
decode_responses=True
)
self.redis_client.ping()
spider.logger.info("Redis connection established")
except Exception as e:
spider.logger.error(f"Failed to connect to Redis: {e}")
self.redis_client = None
# Elasticsearch连接
try:
self.es_client = Elasticsearch([{
'host': self.es_host,
'port': self.es_port
}])
spider.logger.info("Elasticsearch connection established")
except Exception as e:
spider.logger.error(f"Failed to connect to Elasticsearch: {e}")
self.es_client = None
def close_spider(self, spider):
"""
爬虫结束时关闭连接
"""
if self.mongo_client:
self.mongo_client.close()
spider.logger.info(f"Storage Statistics:")
spider.logger.info(f"MongoDB items: {self.storage_stats['mongo_items']}")
spider.logger.info(f"Redis items: {self.storage_stats['redis_items']}")
spider.logger.info(f"Elasticsearch items: {self.storage_stats['es_items']}")
spider.logger.info(f"Storage errors: {self.storage_stats['storage_errors']}")
def process_item(self, item, spider):
"""
处理数据项
"""
try:
# 存储到MongoDB
if self.mongo_client:
self.store_to_mongo(item, spider)
# 存储到Redis
if self.redis_client:
self.store_to_redis(item, spider)
# 存储到Elasticsearch
if self.es_client:
self.store_to_elasticsearch(item, spider)
return item
except Exception as e:
self.storage_stats['storage_errors'] += 1
spider.logger.error(f"Storage error: {e}")
raise DropItem(f"Error storing item: {e}")
def store_to_mongo(self, item, spider):
"""
存储到MongoDB
"""
try:
# 根据数据类型选择集合
if hasattr(item, '__class__'):
collection_name = item.__class__.__name__.lower().replace('item', 's')
else:
collection_name = 'items'
collection = self.mongo_database[collection_name]
# 转换为字典
item_dict = dict(item)
item_dict['_created_at'] = datetime.utcnow()
# 使用upsert避免重复
if 'product_id' in item_dict:
filter_dict = {'product_id': item_dict['product_id']}
collection.update_one(
filter_dict,
{'$set': item_dict},
upsert=True
)
else:
collection.insert_one(item_dict)
self.storage_stats['mongo_items'] += 1
except Exception as e:
spider.logger.error(f"MongoDB storage error: {e}")
raise
def store_to_redis(self, item, spider):
"""
存储到Redis
"""
try:
# 生成Redis键
if 'product_id' in item:
key = f"product:{item['product_id']}"
elif 'review_id' in item:
key = f"review:{item['review_id']}"
else:
key = f"item:{hash(str(item))}"
# 存储JSON数据
item_json = json.dumps(dict(item), default=str)
self.redis_client.setex(key, 86400, item_json) # 24小时过期
# 添加到集合
if 'product_id' in item:
self.redis_client.sadd('products', item['product_id'])
self.storage_stats['redis_items'] += 1
except Exception as e:
spider.logger.error(f"Redis storage error: {e}")
raise
def store_to_elasticsearch(self, item, spider):
"""
存储到Elasticsearch
"""
try:
# 确定索引名称
if hasattr(item, '__class__'):
index_name = item.__class__.__name__.lower().replace('item', 's')
else:
index_name = 'items'
# 生成文档ID
if 'product_id' in item:
doc_id = item['product_id']
elif 'review_id' in item:
doc_id = item['review_id']
else:
doc_id = None
# 准备文档
doc = dict(item)
doc['@timestamp'] = datetime.utcnow().isoformat()
# 索引文档
self.es_client.index(
index=index_name,
id=doc_id,
body=doc
)
self.storage_stats['es_items'] += 1
except Exception as e:
spider.logger.error(f"Elasticsearch storage error: {e}")
raise
class FileStoragePipeline:
"""
文件存储管道
"""
def __init__(self, file_path):
self.file_path = file_path
self.files = {}
@classmethod
def from_crawler(cls, crawler):
return cls(
file_path=crawler.settings.get("FILES_STORE", "data")
)
def open_spider(self, spider):
"""
打开文件
"""
import os
os.makedirs(self.file_path, exist_ok=True)
def close_spider(self, spider):
"""
关闭文件
"""
for file_handle in self.files.values():
file_handle.close()
def process_item(self, item, spider):
"""
写入文件
"""
# 根据数据类型确定文件名
if hasattr(item, '__class__'):
filename = f"{item.__class__.__name__.lower()}.jsonl"
else:
filename = "items.jsonl"
filepath = f"{self.file_path}/{filename}"
# 打开文件(如果尚未打开)
if filepath not in self.files:
self.files[filepath] = open(filepath, 'a', encoding='utf-8')
# 写入数据
line = json.dumps(dict(item), ensure_ascii=False, default=str) + '\\n'
self.files[filepath].write(line)
self.files[filepath].flush()
return item
”’
print("📄 数据存储管道代码:")
print(storage_pipeline_code)
# 演示数据处理管道
pipelines_demo = EcommercePipelines()
pipelines_demo.demonstrate_validation_pipeline()
pipelines_demo.demonstrate_cleaning_pipeline()
pipelines_demo.demonstrate_storage_pipeline()
print("数据处理管道演示完成!")
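要让上面的验证、清洗、存储管道真正生效,还需要在 settings.py 中按优先级注册,并提供 MultiStoragePipeline.from_crawler 读取的连接配置。下面是一个可能的配置示意(模块路径 myproject.pipelines 与各连接地址均为示例):
# settings.py 片段(示意):注册数据处理管道,数字越小越先执行
ITEM_PIPELINES = {
    "myproject.pipelines.ValidationPipeline": 100,
    "myproject.pipelines.DataCleaningPipeline": 200,
    "myproject.pipelines.MultiStoragePipeline": 300,
}

# MultiStoragePipeline.from_crawler 读取的连接配置
MONGO_URI = "mongodb://localhost:27017"
MONGO_DATABASE = "ecommerce"
REDIS_HOST = "localhost"
REDIS_PORT = 6379
ELASTICSEARCH_HOST = "localhost"
ELASTICSEARCH_PORT = 9200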
def demonstrate_processing_pipeline(self):
"""
演示数据处理管道
"""
print("\n🔧 数据处理管道:")
processing_code = '''
import re
from textblob import TextBlob
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
import jieba
import jieba.analyse
class NewsProcessingPipeline:
"""
新闻数据处理管道
"""
def __init__(self):
# 初始化分类器
self.category_keywords = {
'technology': ['tech', 'ai', 'software', 'computer', 'digital', 'internet'],
'business': ['business', 'economy', 'finance', 'market', 'company', 'stock'],
'politics': ['politics', 'government', 'election', 'policy', 'law', 'congress'],
'sports': ['sports', 'game', 'team', 'player', 'match', 'championship'],
'health': ['health', 'medical', 'doctor', 'hospital', 'disease', 'treatment'],
'entertainment': ['movie', 'music', 'celebrity', 'entertainment', 'show', 'film']
}
# 初始化TF-IDF向量化器
self.tfidf_vectorizer = TfidfVectorizer(
max_features=1000,
stop_words='english',
ngram_range=(1, 2)
)
# 情感分析缓存
self.sentiment_cache = {}
def process_item(self, item, spider):
"""
处理数据项
"""
try:
# 分类识别
item['category'] = self.classify_article(item)
# 关键词提取
item['keywords'] = self.extract_keywords(item)
# 标签生成
item['tags'] = self.generate_tags(item)
# 情感分析
sentiment_result = self.analyze_sentiment(item.get('content', ''))
item['sentiment'] = sentiment_result['sentiment']
item['sentiment_score'] = sentiment_result['score']
# 语言检测
if not item.get('language'):
item['language'] = self.detect_language(item.get('content', ''))
# 内容质量评分
item['quality_score'] = self.calculate_quality_score(item)
spider.logger.info(f"Processed article: {item.get('title', '')}")
return item
except Exception as e:
spider.logger.error(f"Processing error: {e}")
return item
def classify_article(self, item):
"""
文章分类
"""
title = item.get('title', '').lower()
content = item.get('content', '').lower()
text = f"{title} {content}"
# 基于关键词的分类
category_scores = {}
for category, keywords in self.category_keywords.items():
score = 0
for keyword in keywords:
# 标题中的关键词权重更高
score += title.count(keyword) * 3
score += content.count(keyword)
category_scores[category] = score
# 返回得分最高的分类
if category_scores:
best_category = max(category_scores, key=category_scores.get)
if category_scores[best_category] > 0:
return best_category
return 'general'
def extract_keywords(self, item):
"""
提取关键词
"""
content = item.get('content', '')
language = item.get('language', 'en')
try:
if language == 'zh':
# 中文关键词提取
keywords = jieba.analyse.extract_tags(
content,
topK=10,
withWeight=False
)
else:
# 英文关键词提取
keywords = self.extract_english_keywords(content)
return keywords[:10] # 最多返回10个关键词
except Exception as e:
return []
def extract_english_keywords(self, text):
"""
提取英文关键词
"""
try:
# 清理文本
text = re.sub(r'[^a-zA-Z\\s]', '', text)
text = text.lower()
# 使用TF-IDF提取关键词
tfidf_matrix = self.tfidf_vectorizer.fit_transform([text])
feature_names = self.tfidf_vectorizer.get_feature_names_out()
tfidf_scores = tfidf_matrix.toarray()[0]
# 获取得分最高的词
keyword_scores = list(zip(feature_names, tfidf_scores))
keyword_scores.sort(key=lambda x: x[1], reverse=True)
return [keyword for keyword, score in keyword_scores[:10] if score > 0]
except Exception:
# 简单的关键词提取
words = text.split()
word_freq = {}
for word in words:
if len(word) > 3: # 只考虑长度大于3的词
word_freq[word] = word_freq.get(word, 0) + 1
# 返回频率最高的词
sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
return [word for word, freq in sorted_words[:10]]
def generate_tags(self, item):
"""
生成标签
"""
tags = []
# 基于分类的标签
category = item.get('category', '')
if category and category != 'general':
tags.append(category)
# 基于关键词的标签
keywords = item.get('keywords', [])
tags.extend(keywords[:5]) # 取前5个关键词作为标签
# 基于来源的标签
source = item.get('source', '')
if source:
tags.append(f"source:{source}")
# 基于语言的标签
language = item.get('language', '')
if language:
tags.append(f"lang:{language}")
# 去重并返回
return list(set(tags))
def analyze_sentiment(self, text):
"""
情感分析
"""
if not text:
return {'sentiment': 'neutral', 'score': 0.0}
# 检查缓存
text_hash = hash(text)
if text_hash in self.sentiment_cache:
return self.sentiment_cache[text_hash]
try:
# 使用TextBlob进行情感分析
blob = TextBlob(text)
polarity = blob.sentiment.polarity
# 确定情感类别
if polarity > 0.1:
sentiment = 'positive'
elif polarity < -0.1:
sentiment = 'negative'
else:
sentiment = 'neutral'
result = {
'sentiment': sentiment,
'score': round(polarity, 3)
}
# 缓存结果
self.sentiment_cache[text_hash] = result
return result
except Exception:
return {'sentiment': 'neutral', 'score': 0.0}
def detect_language(self, text):
"""
检测语言
"""
if not text:
return 'unknown'
try:
from langdetect import detect
return detect(text)
except:
# 简单的语言检测
chinese_chars = len([c for c in text if '\\u4e00' <= c <= '\\u9fff'])
if chinese_chars > len(text) * 0.3:
return 'zh'
else:
return 'en'
def calculate_quality_score(self, item):
"""
计算内容质量评分
"""
score = 0.0
# 标题质量 (20%)
title = item.get('title', '')
if title:
if 10 <= len(title) <= 100:
score += 0.2
elif len(title) > 100:
score += 0.1
# 内容长度 (30%)
content = item.get('content', '')
if content:
word_count = len(content.split())
if word_count >= 300:
score += 0.3
elif word_count >= 100:
score += 0.2
elif word_count >= 50:
score += 0.1
# 作者信息 (10%)
if item.get('author'):
score += 0.1
# 图片数量 (10%)
images = item.get('images', [])
if len(images) >= 3:
score += 0.1
elif len(images) >= 1:
score += 0.05
# 关键词数量 (15%)
keywords = item.get('keywords', [])
if len(keywords) >= 5:
score += 0.15
elif len(keywords) >= 3:
score += 0.1
elif len(keywords) >= 1:
score += 0.05
# 分类明确性 (15%)
category = item.get('category', '')
if category and category != 'general':
score += 0.15
return round(min(score, 1.0), 2)
”’
print("📄 数据处理管道代码:")
print(processing_code)
def demonstrate_storage_pipeline(self):
"""
演示存储管道
"""
print("\n💾 存储管道:")
storage_code = '''
import json
import pymongo
import elasticsearch
from datetime import datetime
class NewsStoragePipeline:
"""
新闻存储管道
"""
def __init__(self, mongo_uri='mongodb://localhost:27017/',
mongo_db='news_db', es_host='localhost:9200'):
# MongoDB配置
self.mongo_client = pymongo.MongoClient(mongo_uri)
self.mongo_db = self.mongo_client[mongo_db]
self.news_collection = self.mongo_db['news']
# Elasticsearch配置
try:
self.es_client = elasticsearch.Elasticsearch([es_host])
self.es_index = 'news_index'
self.setup_elasticsearch_mapping()
except Exception as e:
self.es_client = None
print(f"Elasticsearch connection failed: {e}")
@classmethod
def from_crawler(cls, crawler):
"""
从爬虫配置创建实例
"""
settings = crawler.settings
return cls(
mongo_uri=settings.get('MONGO_URI', 'mongodb://localhost:27017/'),
mongo_db=settings.get('MONGO_DB', 'news_db'),
es_host=settings.get('ELASTICSEARCH_HOST', 'localhost:9200')
)
def setup_elasticsearch_mapping(self):
"""
设置Elasticsearch映射
"""
if not self.es_client:
return
mapping = {
"mappings": {
"properties": {
"article_id": {"type": "keyword"},
"title": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {"type": "keyword"}
}
},
"content": {
"type": "text",
"analyzer": "standard"
},
"summary": {
"type": "text",
"analyzer": "standard"
},
"source": {"type": "keyword"},
"source_url": {"type": "keyword"},
"author": {"type": "keyword"},
"category": {"type": "keyword"},
"keywords": {"type": "keyword"},
"tags": {"type": "keyword"},
"language": {"type": "keyword"},
"sentiment": {"type": "keyword"},
"sentiment_score": {"type": "float"},
"quality_score": {"type": "float"},
"publish_time": {"type": "date"},
"crawl_time": {"type": "date"},
"word_count": {"type": "integer"},
"reading_time": {"type": "integer"},
"view_count": {"type": "integer"},
"comment_count": {"type": "integer"},
"share_count": {"type": "integer"}
}
}
}
try:
if not self.es_client.indices.exists(index=self.es_index):
self.es_client.indices.create(index=self.es_index, body=mapping)
except Exception as e:
print(f"Failed to create Elasticsearch mapping: {e}")
def process_item(self, item, spider):
"""
处理数据项
"""
try:
# 转换为字典
item_dict = dict(item)
# 存储到MongoDB
self.store_to_mongodb(item_dict, spider)
# 存储到Elasticsearch
if self.es_client:
self.store_to_elasticsearch(item_dict, spider)
# 存储到文件(可选)
self.store_to_file(item_dict, spider)
spider.logger.info(f"Stored article: {item_dict.get('title', '')}")
return item
except Exception as e:
spider.logger.error(f"Storage error: {e}")
return item
def store_to_mongodb(self, item_dict, spider):
"""
存储到MongoDB
"""
try:
# 检查是否已存在
existing = self.news_collection.find_one({
'article_id': item_dict.get('article_id')
})
if existing:
# 更新现有记录
self.news_collection.update_one(
{'article_id': item_dict.get('article_id')},
{'$set': item_dict}
)
spider.logger.info(f"Updated article in MongoDB: {item_dict.get('title', '')}")
else:
# 插入新记录
self.news_collection.insert_one(item_dict)
spider.logger.info(f"Inserted article to MongoDB: {item_dict.get('title', '')}")
except Exception as e:
spider.logger.error(f"MongoDB storage error: {e}")
def store_to_elasticsearch(self, item_dict, spider):
"""
存储到Elasticsearch
"""
try:
# 准备文档
doc = {
'article_id': item_dict.get('article_id'),
'title': item_dict.get('title'),
'content': item_dict.get('content'),
'summary': item_dict.get('summary'),
'source': item_dict.get('source'),
'source_url': item_dict.get('source_url'),
'author': item_dict.get('author'),
'category': item_dict.get('category'),
'keywords': item_dict.get('keywords', []),
'tags': item_dict.get('tags', []),
'language': item_dict.get('language'),
'sentiment': item_dict.get('sentiment'),
'sentiment_score': item_dict.get('sentiment_score', 0.0),
'quality_score': item_dict.get('quality_score', 0.0),
'publish_time': item_dict.get('publish_time'),
'crawl_time': item_dict.get('crawl_time'),
'word_count': item_dict.get('word_count', 0),
'reading_time': item_dict.get('reading_time', 0),
'view_count': item_dict.get('view_count', 0),
'comment_count': item_dict.get('comment_count', 0),
'share_count': item_dict.get('share_count', 0)
}
# 索引文档
self.es_client.index(
index=self.es_index,
id=item_dict.get('article_id'),
body=doc
)
spider.logger.info(f"Indexed article to Elasticsearch: {item_dict.get('title', '')}")
except Exception as e:
spider.logger.error(f"Elasticsearch storage error: {e}")
def store_to_file(self, item_dict, spider):
"""
存储到文件(JSON格式)
"""
try:
# 准备文件名
date_str = datetime.now().strftime('%Y-%m-%d')
filename = f"news_{date_str}.jsonl"
# 序列化时间对象
serializable_item = {}
for key, value in item_dict.items():
if isinstance(value, datetime):
serializable_item[key] = value.isoformat()
else:
serializable_item[key] = value
# 写入文件
with open(filename, 'a', encoding='utf-8') as f:
f.write(json.dumps(serializable_item, ensure_ascii=False) + '\\n')
except Exception as e:
spider.logger.error(f"File storage error: {e}")
def close_spider(self, spider):
"""
爬虫关闭时的清理工作
"""
try:
if self.mongo_client:
self.mongo_client.close()
spider.logger.info("Storage pipeline closed successfully")
except Exception as e:
spider.logger.error(f"Error closing storage pipeline: {e}")
”’
print("📄 存储管道代码:")
print(storage_code)
# 继续演示数据处理管道
pipeline_demo.demonstrate_processing_pipeline()
pipeline_demo.demonstrate_storage_pipeline()
print("完整数据处理管道演示完成!")
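为了更直观地理解上面 calculate_quality_score 的加权规则,下面手工核算一篇示例文章的质量评分(文章内容为虚构):
# 质量评分的手工核算示意
sample_article = {
    "title": "A" * 60,                 # 标题长度在10~100字符之间 -> +0.2
    "content": "word " * 400,          # 约400词,>=300 -> +0.3
    "author": "Reporter",              # 有作者信息 -> +0.1
    "images": ["a.jpg", "b.jpg"],      # 1~2张图片 -> +0.05
    "keywords": ["ai", "chip", "gpu", "cloud", "model"],  # >=5个关键词 -> +0.15
    "category": "technology",          # 分类明确(非general) -> +0.15
}
# 合计:0.2 + 0.3 + 0.1 + 0.05 + 0.15 + 0.15 = 0.95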
4. 数据分析与可视化
# 10. 数据分析与可视化
print("\n📊 数据分析与可视化:")
import pymongo
class NewsDataAnalysis:
"""
新闻数据分析
"""
def __init__(self, mongo_uri='mongodb://localhost:27017/', mongo_db='news_db'):
self.mongo_client = pymongo.MongoClient(mongo_uri)
self.mongo_db = self.mongo_client[mongo_db]
self.news_collection = self.mongo_db['news']
def demonstrate_basic_analysis(self):
"""
演示基础数据分析
"""
print("\n📈 基础数据分析:")
analysis_code = '''
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from datetime import datetime, timedelta
import numpy as np
class BasicNewsAnalysis:
"""
基础新闻数据分析
"""
def __init__(self, news_collection):
self.collection = news_collection
def get_basic_statistics(self):
"""
获取基础统计信息
"""
print("📊 基础统计信息:")
# 总文章数
total_articles = self.collection.count_documents({})
print(f"总文章数: {total_articles}")
# 按来源统计
source_stats = list(self.collection.aggregate([
{"$group": {"_id": "$source", "count": {"$sum": 1}}},
{"$sort": {"count": -1}},
{"$limit": 10}
]))
print("\\n📰 按来源统计 (Top 10):")
for stat in source_stats:
print(f" {stat['_id']}: {stat['count']} 篇")
# 按分类统计
category_stats = list(self.collection.aggregate([
{"$group": {"_id": "$category", "count": {"$sum": 1}}},
{"$sort": {"count": -1}}
]))
print("\\n🏷️ 按分类统计:")
for stat in category_stats:
print(f" {stat['_id']}: {stat['count']} 篇")
# 按语言统计
language_stats = list(self.collection.aggregate([
{"$group": {"_id": "$language", "count": {"$sum": 1}}},
{"$sort": {"count": -1}}
]))
print("\\n🌍 按语言统计:")
for stat in language_stats:
print(f" {stat['_id']}: {stat['count']} 篇")
return {
'total_articles': total_articles,
'source_stats': source_stats,
'category_stats': category_stats,
'language_stats': language_stats
}
def analyze_time_trends(self):
"""
分析时间趋势
"""
print("\\n⏰ 时间趋势分析:")
# 按日期统计
daily_stats = list(self.collection.aggregate([
{
"$group": {
"_id": {
"$dateToString": {
"format": "%Y-%m-%d",
"date": "$publish_time"
}
},
"count": {"$sum": 1}
}
},
{"$sort": {"_id": 1}},
{"$limit": 30} # 最近30天
]))
print("📅 每日发布量 (最近30天):")
for stat in daily_stats[-10:]: # 显示最近10天
print(f" {stat['_id']}: {stat['count']} 篇")
# 按小时统计
hourly_stats = list(self.collection.aggregate([
{
"$group": {
"_id": {"$hour": "$publish_time"},
"count": {"$sum": 1}
}
},
{"$sort": {"_id": 1}}
]))
print("\\n🕐 按小时统计:")
for stat in hourly_stats:
hour = stat['_id']
count = stat['count']
print(f" {hour:02d}:00 - {count} 篇")
return {
'daily_stats': daily_stats,
'hourly_stats': hourly_stats
}
def analyze_content_quality(self):
"""
分析内容质量
"""
print("\\n⭐ 内容质量分析:")
# 质量评分分布
quality_stats = list(self.collection.aggregate([
{
"$bucket": {
"groupBy": "$quality_score",
"boundaries": [0, 0.2, 0.4, 0.6, 0.8, 1.0],
"default": "其他",
"output": {"count": {"$sum": 1}}
}
}
]))
print("📊 质量评分分布:")
quality_labels = ["0-0.2", "0.2-0.4", "0.4-0.6", "0.6-0.8", "0.8-1.0"]
for i, stat in enumerate(quality_stats):
if i < len(quality_labels):
print(f" {quality_labels[i]}: {stat['count']} 篇")
# 平均质量评分
avg_quality = list(self.collection.aggregate([
{"$group": {"_id": None, "avg_quality": {"$avg": "$quality_score"}}}
]))
if avg_quality:
print(f"\\n📈 平均质量评分: {avg_quality[0]['avg_quality']:.3f}")
# 字数统计
word_count_stats = list(self.collection.aggregate([
{
"$group": {
"_id": None,
"avg_words": {"$avg": "$word_count"},
"min_words": {"$min": "$word_count"},
"max_words": {"$max": "$word_count"}
}
}
]))
if word_count_stats:
stats = word_count_stats[0]
print(f"\\n📝 字数统计:")
print(f" 平均字数: {stats['avg_words']:.0f}")
print(f" 最少字数: {stats['min_words']}")
print(f" 最多字数: {stats['max_words']}")
return {
'quality_stats': quality_stats,
'avg_quality': avg_quality[0]['avg_quality'] if avg_quality else 0,
'word_count_stats': word_count_stats[0] if word_count_stats else {}
}
def analyze_sentiment_distribution(self):
"""
分析情感分布
"""
print("\\n😊 情感分析:")
# 情感分布
sentiment_stats = list(self.collection.aggregate([
{"$group": {"_id": "$sentiment", "count": {"$sum": 1}}},
{"$sort": {"count": -1}}
]))
print("💭 情感分布:")
for stat in sentiment_stats:
print(f" {stat['_id']}: {stat['count']} 篇")
# 按分类的情感分布
category_sentiment = list(self.collection.aggregate([
{
"$group": {
"_id": {"category": "$category", "sentiment": "$sentiment"},
"count": {"$sum": 1}
}
},
{"$sort": {"_id.category": 1, "count": -1}}
]))
print("\\n📊 按分类的情感分布:")
current_category = None
for stat in category_sentiment:
category = stat['_id']['category']
sentiment = stat['_id']['sentiment']
count = stat['count']
if category != current_category:
print(f"\\n {category}:")
current_category = category
print(f" {sentiment}: {count} 篇")
return {
'sentiment_stats': sentiment_stats,
'category_sentiment': category_sentiment
}
def get_top_keywords(self, limit=20):
"""
获取热门关键词
"""
print(f"\\n🔥 热门关键词 (Top {limit}):")
# 聚合所有关键词
pipeline = [
{"$unwind": "$keywords"},
{"$group": {"_id": "$keywords", "count": {"$sum": 1}}},
{"$sort": {"count": -1}},
{"$limit": limit}
]
top_keywords = list(self.collection.aggregate(pipeline))
for i, keyword_stat in enumerate(top_keywords, 1):
keyword = keyword_stat['_id']
count = keyword_stat['count']
print(f" {i:2d}. {keyword}: {count} 次")
return top_keywords
def generate_summary_report(self):
"""
生成汇总报告
"""
print("\\n📋 数据汇总报告:")
print("=" * 50)
# 获取各项统计
basic_stats = self.get_basic_statistics()
time_trends = self.analyze_time_trends()
quality_analysis = self.analyze_content_quality()
sentiment_analysis = self.analyze_sentiment_distribution()
top_keywords = self.get_top_keywords(10)
# 生成报告
report = {
'generated_at': datetime.now().isoformat(),
'basic_statistics': basic_stats,
'time_trends': time_trends,
'quality_analysis': quality_analysis,
'sentiment_analysis': sentiment_analysis,
'top_keywords': top_keywords
}
print("\\n✅ 报告生成完成!")
return report
'''
print("📄 基础数据分析代码:")
print(analysis_code)
# 演示数据分析
analysis_demo = NewsDataAnalysis()
analysis_demo.demonstrate_basic_analysis()
print("数据分析演示完成!")
```
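上面的统计代码默认 `self.collection` 已指向存放新闻数据的 MongoDB 集合。下面补充一个最小的独立示意,演示如何用 pymongo 连接数据库并单独运行其中"按来源统计"的聚合;连接地址、数据库名 `news_db` 与集合名 `news_articles` 均为示例假设,需按实际环境调整。

```python
# 最小示意:连接 MongoDB 并运行"按来源统计"的聚合
# 注意:连接地址、数据库名、集合名均为示例假设,请按实际环境修改
from pymongo import MongoClient

def top_sources(mongo_uri="mongodb://localhost:27017",
                db_name="news_db", coll_name="news_articles", limit=10):
    """统计各新闻来源的文章数量(Top N)"""
    client = MongoClient(mongo_uri)
    collection = client[db_name][coll_name]

    pipeline = [
        {"$group": {"_id": "$source", "count": {"$sum": 1}}},  # 按 source 字段分组计数
        {"$sort": {"count": -1}},                               # 按数量降序排列
        {"$limit": limit},
    ]
    results = list(collection.aggregate(pipeline))

    for stat in results:
        print(f"{stat['_id']}: {stat['count']} 篇")

    client.close()
    return results

if __name__ == "__main__":
    top_sources()
```

其余聚合(按分类、按语言、按时间)只需替换 pipeline 即可复用同一连接。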
2.4 监控与告警系统
```python
# 6. 监控与告警系统
print("\n📊 监控与告警系统:")

class EcommerceMonitoring:
    """
    电商爬虫监控系统
    """
def demonstrate_performance_monitoring(self):
"""
演示性能监控
"""
print("\n📈 性能监控系统:")
monitoring_code = '''
import time
import psutil
import threading
from datetime import datetime, timedelta
from collections import defaultdict, deque
from scrapy import signals
from scrapy.extensions import telnet

class PerformanceMonitor:
    """
    性能监控器
    """
def __init__(self, crawler):
self.crawler = crawler
self.stats = crawler.stats
self.settings = crawler.settings
# 监控数据存储
self.metrics = defaultdict(deque)
self.alerts = []
self.start_time = None
# 监控配置
self.monitor_interval = self.settings.getint('MONITOR_INTERVAL', 60)
self.alert_thresholds = {
'cpu_usage': self.settings.getfloat('CPU_ALERT_THRESHOLD', 80.0),
'memory_usage': self.settings.getfloat('MEMORY_ALERT_THRESHOLD', 80.0),
'error_rate': self.settings.getfloat('ERROR_RATE_THRESHOLD', 5.0),
'response_time': self.settings.getfloat('RESPONSE_TIME_THRESHOLD', 10.0)
}
# 启动监控线程
self.monitoring_thread = None
self.stop_monitoring = threading.Event()
@classmethod
def from_crawler(cls, crawler):
"""
从爬虫创建监控器
"""
ext = cls(crawler)
# 连接信号
crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
crawler.signals.connect(ext.request_scheduled, signal=signals.request_scheduled)
crawler.signals.connect(ext.response_received, signal=signals.response_received)
crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
crawler.signals.connect(ext.item_dropped, signal=signals.item_dropped)
return ext
def spider_opened(self, spider):
"""
爬虫开始时启动监控
"""
self.start_time = datetime.now()
spider.logger.info("Performance monitoring started")
# 启动监控线程
self.monitoring_thread = threading.Thread(target=self.monitor_loop)
self.monitoring_thread.daemon = True
self.monitoring_thread.start()
def spider_closed(self, spider, reason):
"""
爬虫结束时停止监控
"""
self.stop_monitoring.set()
if self.monitoring_thread:
self.monitoring_thread.join(timeout=5)
# 生成最终报告
self.generate_final_report(spider)
def monitor_loop(self):
"""
监控循环
"""
while not self.stop_monitoring.wait(self.monitor_interval):
try:
self.collect_system_metrics()
self.collect_scrapy_metrics()
self.check_alerts()
except Exception as e:
print(f"Monitoring error: {e}")
def collect_system_metrics(self):
"""
收集系统指标
"""
current_time = datetime.now()
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
self.metrics['cpu_usage'].append((current_time, cpu_percent))
# 内存使用率
memory = psutil.virtual_memory()
memory_percent = memory.percent
self.metrics['memory_usage'].append((current_time, memory_percent))
# 磁盘使用率
disk = psutil.disk_usage('/')
disk_percent = (disk.used / disk.total) * 100
self.metrics['disk_usage'].append((current_time, disk_percent))
# 网络IO
network = psutil.net_io_counters()
self.metrics['network_sent'].append((current_time, network.bytes_sent))
self.metrics['network_recv'].append((current_time, network.bytes_recv))
# 保持最近1小时的数据
cutoff_time = current_time - timedelta(hours=1)
for metric_name, metric_data in self.metrics.items():
while metric_data and metric_data[0][0] < cutoff_time:
metric_data.popleft()
def collect_scrapy_metrics(self):
"""
收集Scrapy指标
"""
current_time = datetime.now()
# 请求统计
requests_total = self.stats.get_value('downloader/request_count', 0)
responses_total = self.stats.get_value('downloader/response_count', 0)
# 错误统计
errors_total = sum([
self.stats.get_value('downloader/exception_count', 0),
self.stats.get_value('spider_exceptions', 0),
self.stats.get_value('item_dropped_count', 0)
])
# 计算错误率
error_rate = (errors_total / max(requests_total, 1)) * 100
# 响应时间
response_times = self.stats.get_value('downloader/response_time', [])
avg_response_time = sum(response_times) / len(response_times) if response_times else 0
# 存储指标
self.metrics['requests_total'].append((current_time, requests_total))
self.metrics['responses_total'].append((current_time, responses_total))
self.metrics['errors_total'].append((current_time, errors_total))
self.metrics['error_rate'].append((current_time, error_rate))
self.metrics['avg_response_time'].append((current_time, avg_response_time))
# 数据项统计
items_scraped = self.stats.get_value('item_scraped_count', 0)
items_dropped = self.stats.get_value('item_dropped_count', 0)
self.metrics['items_scraped'].append((current_time, items_scraped))
self.metrics['items_dropped'].append((current_time, items_dropped))
def check_alerts(self):
"""
检查告警条件
"""
current_time = datetime.now()
# 检查CPU使用率
if self.metrics['cpu_usage']:
latest_cpu = self.metrics['cpu_usage'][-1][1]
if latest_cpu > self.alert_thresholds['cpu_usage']:
self.create_alert('HIGH_CPU_USAGE', f"CPU usage: {latest_cpu:.1f}%")
# 检查内存使用率
if self.metrics['memory_usage']:
latest_memory = self.metrics['memory_usage'][-1][1]
if latest_memory > self.alert_thresholds['memory_usage']:
self.create_alert('HIGH_MEMORY_USAGE', f"Memory usage: {latest_memory:.1f}%")
# 检查错误率
if self.metrics['error_rate']:
latest_error_rate = self.metrics['error_rate'][-1][1]
if latest_error_rate > self.alert_thresholds['error_rate']:
self.create_alert('HIGH_ERROR_RATE', f"Error rate: {latest_error_rate:.1f}%")
# 检查响应时间
if self.metrics['avg_response_time']:
latest_response_time = self.metrics['avg_response_time'][-1][1]
if latest_response_time > self.alert_thresholds['response_time']:
self.create_alert('HIGH_RESPONSE_TIME', f"Response time: {latest_response_time:.1f}s")
def create_alert(self, alert_type, message):
"""
创建告警
"""
alert = {
'type': alert_type,
'message': message,
'timestamp': datetime.now(),
'severity': self.get_alert_severity(alert_type)
}
self.alerts.append(alert)
# 发送告警通知
self.send_alert_notification(alert)
def get_alert_severity(self, alert_type):
"""
获取告警严重程度
"""
severity_map = {
'HIGH_CPU_USAGE': 'WARNING',
'HIGH_MEMORY_USAGE': 'WARNING',
'HIGH_ERROR_RATE': 'CRITICAL',
'HIGH_RESPONSE_TIME': 'WARNING'
}
return severity_map.get(alert_type, 'INFO')
def send_alert_notification(self, alert):
"""
发送告警通知
"""
# 这里可以集成邮件、Slack、钉钉等通知方式
print(f"🚨 ALERT [{alert['severity']}]: {alert['message']}")
def request_scheduled(self, request, spider):
"""
请求调度时的回调
"""
pass
def response_received(self, response, request, spider):
"""
响应接收时的回调
"""
# 记录响应时间
if hasattr(request, 'meta') and 'download_start_time' in request.meta:
response_time = time.time() - request.meta['download_start_time']
# 更新统计
response_times = self.stats.get_value('downloader/response_time', [])
response_times.append(response_time)
self.stats.set_value('downloader/response_time', response_times[-100:]) # 保持最近100个
def item_scraped(self, item, response, spider):
"""
数据项抓取时的回调
"""
pass
def item_dropped(self, item, response, exception, spider):
"""
数据项丢弃时的回调
"""
pass
def generate_final_report(self, spider):
"""
生成最终报告
"""
if not self.start_time:
return
end_time = datetime.now()
duration = end_time - self.start_time
# 基本统计
total_requests = self.stats.get_value('downloader/request_count', 0)
total_responses = self.stats.get_value('downloader/response_count', 0)
total_items = self.stats.get_value('item_scraped_count', 0)
total_errors = sum([
self.stats.get_value('downloader/exception_count', 0),
self.stats.get_value('spider_exceptions', 0),
self.stats.get_value('item_dropped_count', 0)
])
# 性能指标
avg_requests_per_minute = (total_requests / duration.total_seconds()) * 60
success_rate = (total_responses / max(total_requests, 1)) * 100
error_rate = (total_errors / max(total_requests, 1)) * 100
# 生成报告
        report = f"""
📊 爬虫性能报告

⏱️ 运行时间: {duration}

📈 基本统计:
  • 总请求数: {total_requests:,}
  • 总响应数: {total_responses:,}
  • 总数据项: {total_items:,}
  • 总错误数: {total_errors:,}

📊 性能指标:
  • 平均请求/分钟: {avg_requests_per_minute:.1f}
  • 成功率: {success_rate:.1f}%
  • 错误率: {error_rate:.1f}%

🚨 告警统计:
  • 总告警数: {len(self.alerts)}
  • 严重告警: {len([a for a in self.alerts if a['severity'] == 'CRITICAL'])}
  • 警告告警: {len([a for a in self.alerts if a['severity'] == 'WARNING'])}
"""
spider.logger.info(report)
# 保存报告到文件
report_file = f"performance_report_{self.start_time.strftime('%Y%m%d_%H%M%S')}.txt"
with open(report_file, 'w', encoding='utf-8') as f:
f.write(report)
spider.logger.info(f"Performance report saved to {report_file}")

# 在settings.py中启用监控扩展
EXTENSIONS = {
    'myproject.extensions.PerformanceMonitor': 500,
}

# 监控配置
MONITOR_INTERVAL = 30               # 监控间隔(秒)
CPU_ALERT_THRESHOLD = 80.0          # CPU告警阈值
MEMORY_ALERT_THRESHOLD = 80.0       # 内存告警阈值
ERROR_RATE_THRESHOLD = 5.0          # 错误率告警阈值
RESPONSE_TIME_THRESHOLD = 10.0      # 响应时间告警阈值
'''
print("📄 性能监控代码:")
print(monitoring_code)
def demonstrate_dashboard_system(self):
"""
演示监控面板
"""
print("\n📊 监控面板系统:")
dashboard_code = '''
import json
import time
from datetime import datetime, timedelta
from threading import Thread

import redis
import pymongo
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit

class ScrapyDashboard:
    """
    Scrapy监控面板
    """
def __init__(self, redis_host='localhost', redis_port=6379, mongo_uri='mongodb://localhost:27017'):
self.app = Flask(__name__)
self.app.config['SECRET_KEY'] = 'scrapy_dashboard_secret'
self.socketio = SocketIO(self.app, cors_allowed_origins="*")
# 数据库连接
self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
self.mongo_client = pymongo.MongoClient(mongo_uri)
self.mongo_db = self.mongo_client['scrapy_monitoring']
# 注册路由
self.register_routes()
self.register_socketio_events()
# 启动数据推送线程
self.start_data_pusher()
def register_routes(self):
"""
注册Web路由
"""
@self.app.route('/')
def index():
return render_template('dashboard.html')
@self.app.route('/api/stats')
def get_stats():
"""
获取统计数据
"""
stats = self.get_current_stats()
return jsonify(stats)
@self.app.route('/api/spiders')
def get_spiders():
"""
获取爬虫列表
"""
spiders = self.get_spider_list()
return jsonify(spiders)
@self.app.route('/api/alerts')
def get_alerts():
"""
获取告警信息
"""
alerts = self.get_recent_alerts()
return jsonify(alerts)
@self.app.route('/api/performance/<spider_name>')
def get_performance(spider_name):
"""
获取爬虫性能数据
"""
performance = self.get_spider_performance(spider_name)
return jsonify(performance)
def register_socketio_events(self):
"""
注册SocketIO事件
"""
@self.socketio.on('connect')
def handle_connect():
print('Client connected')
# 发送初始数据
emit('stats_update', self.get_current_stats())
@self.socketio.on('disconnect')
def handle_disconnect():
print('Client disconnected')
def start_data_pusher(self):
"""
启动数据推送线程
"""
def push_data():
while True:
try:
# 推送实时统计数据
stats = self.get_current_stats()
self.socketio.emit('stats_update', stats)
# 推送告警信息
alerts = self.get_recent_alerts()
self.socketio.emit('alerts_update', alerts)
                    time.sleep(5)  # 每5秒推送一次
except Exception as e:
print(f"Data push error: {e}")
                    time.sleep(10)
thread = Thread(target=push_data)
thread.daemon = True
thread.start()
def get_current_stats(self):
"""
获取当前统计数据
"""
try:
# 从Redis获取实时数据
stats = {
'total_spiders': len(self.get_spider_list()),
'active_spiders': self.redis_client.scard('active_spiders'),
'total_requests': self.redis_client.get('total_requests') or 0,
'total_items': self.redis_client.get('total_items') or 0,
'total_errors': self.redis_client.get('total_errors') or 0,
'system_metrics': self.get_system_metrics(),
'timestamp': datetime.now().isoformat()
}
return stats
except Exception as e:
print(f"Error getting stats: {e}")
return {}
def get_spider_list(self):
"""
获取爬虫列表
"""
try:
# 从MongoDB获取爬虫信息
spiders_collection = self.mongo_db['spiders']
spiders = list(spiders_collection.find({}, {'_id': 0}))
# 添加实时状态
for spider in spiders:
spider_name = spider.get('name')
spider['status'] = 'running' if self.redis_client.sismember('active_spiders', spider_name) else 'stopped'
spider['last_run'] = self.redis_client.get(f'spider:{spider_name}:last_run')
return spiders
except Exception as e:
print(f"Error getting spider list: {e}")
return []
def get_recent_alerts(self):
"""
获取最近的告警
"""
try:
# 从MongoDB获取最近24小时的告警
alerts_collection = self.mongo_db['alerts']
cutoff_time = datetime.now() - timedelta(hours=24)
alerts = list(alerts_collection.find(
{'timestamp': {'$gte': cutoff_time}},
{'_id': 0}
).sort('timestamp', -1).limit(100))
return alerts
except Exception as e:
print(f"Error getting alerts: {e}")
return []
def get_spider_performance(self, spider_name):
"""
获取爬虫性能数据
"""
try:
# 从MongoDB获取性能数据
performance_collection = self.mongo_db['performance']
cutoff_time = datetime.now() - timedelta(hours=24)
performance_data = list(performance_collection.find(
{
'spider_name': spider_name,
'timestamp': {'$gte': cutoff_time}
},
{'_id': 0}
).sort('timestamp', 1))
return performance_data
except Exception as e:
print(f"Error getting performance data: {e}")
return []
def get_system_metrics(self):
"""
获取系统指标
"""
try:
import psutil
metrics = {
'cpu_percent': psutil.cpu_percent(),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent,
'network_io': {
'bytes_sent': psutil.net_io_counters().bytes_sent,
'bytes_recv': psutil.net_io_counters().bytes_recv
}
}
return metrics
except Exception as e:
print(f"Error getting system metrics: {e}")
return {}
def run(self, host='0.0.0.0', port=5000, debug=False):
"""
运行面板服务
"""
self.socketio.run(self.app, host=host, port=port, debug=debug)

# HTML模板 (templates/dashboard.html)
dashboard_html = """
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Scrapy监控面板</title>
    <!-- 页面还需引入 Socket.IO 客户端与 Chart.js 脚本,此处从略 -->
</head>
<body>
<div class="container">
    <h1>🕷️ Scrapy监控面板</h1>
    <p>实时监控爬虫运行状态和性能指标</p>
<div class="stats-grid">
<div class="stat-card">
<div class="stat-value" id="total-spiders">0</div>
<div class="stat-label">总爬虫数</div>
</div>
<div class="stat-card">
<div class="stat-value" id="active-spiders">0</div>
<div class="stat-label">运行中</div>
</div>
<div class="stat-card">
<div class="stat-value" id="total-requests">0</div>
<div class="stat-label">总请求数</div>
</div>
<div class="stat-card">
<div class="stat-value" id="total-items">0</div>
<div class="stat-label">总数据项</div>
</div>
</div>
<div class="chart-container">
<h3>系统性能</h3>
<canvas id="performance-chart" width="400" height="200"></canvas>
</div>
<div class="alerts-container">
<h3>最近告警</h3>
<div id="alerts-list"></div>
</div>
</div>
<script>
// 初始化Socket.IO连接
const socket = io();
// 初始化图表
const ctx = document.getElementById('performance-chart').getContext('2d');
const chart = new Chart(ctx, {
type: 'line',
data: {
labels: [],
datasets: [{
label: 'CPU使用率',
data: [],
borderColor: 'rgb(255, 99, 132)',
tension: 0.1
}, {
label: '内存使用率',
data: [],
borderColor: 'rgb(54, 162, 235)',
tension: 0.1
}]
},
options: {
responsive: true,
scales: {
y: {
beginAtZero: true,
max: 100
}
}
}
});
// 监听统计数据更新
socket.on('stats_update', function(data) {
document.getElementById('total-spiders').textContent = data.total_spiders || 0;
document.getElementById('active-spiders').textContent = data.active_spiders || 0;
document.getElementById('total-requests').textContent = (data.total_requests || 0).toLocaleString();
document.getElementById('total-items').textContent = (data.total_items || 0).toLocaleString();
// 更新图表
if (data.system_metrics) {
const now = new Date().toLocaleTimeString();
chart.data.labels.push(now);
chart.data.datasets[0].data.push(data.system_metrics.cpu_percent);
chart.data.datasets[1].data.push(data.system_metrics.memory_percent);
// 保持最近20个数据点
if (chart.data.labels.length > 20) {
chart.data.labels.shift();
chart.data.datasets[0].data.shift();
chart.data.datasets[1].data.shift();
}
chart.update();
}
});
// 监听告警更新
socket.on('alerts_update', function(alerts) {
const alertsList = document.getElementById('alerts-list');
alertsList.innerHTML = '';
alerts.slice(0, 10).forEach(alert => {
const alertDiv = document.createElement('div');
alertDiv.className = `alert ${alert.severity.toLowerCase()}`;
alertDiv.innerHTML = `
<strong>${alert.type}</strong>: ${alert.message}
<small style="float: right;">${new Date(alert.timestamp).toLocaleString()}</small>
`;
alertsList.appendChild(alertDiv);
});
});
</script>
</body>
</html>
"""
# 启动监控面板
if __name__ == '__main__':
    dashboard = ScrapyDashboard()
    dashboard.run(debug=True)
'''
print("📄 监控面板代码:")
print(dashboard_code)
# 演示监控系统
monitoring_demo = EcommerceMonitoring()
monitoring_demo.demonstrate_performance_monitoring()
monitoring_demo.demonstrate_dashboard_system()
print("监控与告警系统演示完成!")
```
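上面的 `send_alert_notification` 只是把告警打印到控制台,并提到可以接入邮件、Slack、钉钉等渠道。下面是一个按"通用 webhook"思路写的最小示意(基于 requests 库);`WEBHOOK_URL` 为占位假设,消息字段也需按所用平台的格式要求调整。

```python
# 最小示意:把告警以 JSON 形式推送到一个通用 webhook
import requests

WEBHOOK_URL = "https://example.com/webhook/alerts"  # 占位假设,请替换为实际的通知地址

def send_alert_notification(alert: dict) -> bool:
    """把告警字典 POST 到 webhook,返回是否发送成功"""
    payload = {
        "type": alert.get("type"),
        "severity": alert.get("severity"),
        "message": alert.get("message"),
        "timestamp": str(alert.get("timestamp")),
    }
    try:
        resp = requests.post(WEBHOOK_URL, json=payload, timeout=5)
        return resp.status_code == 200
    except requests.RequestException as e:
        # 通知失败不应影响爬虫主流程,这里只记录错误
        print(f"Alert notification failed: {e}")
        return False

if __name__ == "__main__":
    send_alert_notification({
        "type": "HIGH_ERROR_RATE",
        "severity": "CRITICAL",
        "message": "Error rate: 7.5%",
        "timestamp": "2024-01-01 12:00:00",
    })
```

通知发送建议放在独立线程或异步任务里,避免阻塞监控循环。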
3. 新闻聚合项目
3.1 项目概述
```python
# 7. 新闻聚合项目
print("\n📰 新闻聚合项目:")

class NewsAggregationProject:
    """
    新闻聚合项目
    """
def __init__(self):
self.project_name = "news_aggregator"
self.target_sites = ["cnn", "bbc", "reuters", "techcrunch"]
self.categories = ["technology", "business", "politics", "sports"]
def analyze_requirements(self):
"""
需求分析
"""
print("\n📋 新闻聚合项目需求:")
requirements = {
'功能需求': [
'多源新闻采集',
'内容去重和分类',
'情感分析和关键词提取',
'实时更新和推送',
'搜索和推荐功能'
],
'技术需求': [
'支持RSS和网页爬取',
'自然语言处理',
'分布式存储',
'实时数据流处理',
'API接口提供'
],
'性能需求': [
'每小时更新1000+文章',
'去重准确率>90%',
'分类准确率>85%',
'响应时间<2秒',
'支持并发访问'
]
}
for category, items in requirements.items():
print(f"\n📖 {category}:")
for item in items:
print(f" • {item}")
def design_data_model(self):
"""
设计数据模型
"""
print("\n🗄️ 新闻数据模型:")
data_model_code = '''
import scrapy
from scrapy import Item, Field
from datetime import datetime
import hashlib

class NewsItem(scrapy.Item):
    """
    新闻数据模型
    """
    # 基本信息
    article_id = Field()        # 文章ID
    title = Field()             # 标题
    content = Field()           # 内容
    summary = Field()           # 摘要
# 分类信息
category = Field() # 分类
tags = Field() # 标签
keywords = Field() # 关键词
# 来源信息
source = Field() # 来源网站
author = Field() # 作者
source_url = Field() # 原文链接
# 时间信息
publish_time = Field() # 发布时间
crawl_time = Field() # 爬取时间
update_time = Field() # 更新时间
# 媒体信息
images = Field() # 图片
videos = Field() # 视频
# 统计信息
view_count = Field() # 浏览量
comment_count = Field() # 评论数
share_count = Field() # 分享数
# 分析结果
sentiment = Field() # 情感分析
sentiment_score = Field() # 情感得分
language = Field() # 语言
# 元数据
content_hash = Field() # 内容哈希(用于去重)
word_count = Field() # 字数
reading_time = Field() # 阅读时间
class CommentItem(scrapy.Item):
    """
    评论数据模型
    """
    comment_id = Field()        # 评论ID
    article_id = Field()        # 文章ID
    user_name = Field()         # 用户名
    content = Field()           # 评论内容
    publish_time = Field()      # 发布时间
    like_count = Field()        # 点赞数
    reply_count = Field()       # 回复数
    sentiment = Field()         # 情感分析
    crawl_time = Field()        # 爬取时间

class CategoryItem(scrapy.Item):
    """
    分类数据模型
    """
    category_id = Field()       # 分类ID
    name = Field()              # 分类名称
    description = Field()       # 描述
    parent_category = Field()   # 父分类
    article_count = Field()     # 文章数量

# 数据处理工具
class NewsDataProcessor:
    """
    新闻数据处理器
    """
@staticmethod
def generate_article_id(title, source, publish_time):
"""
生成文章ID
"""
content = f"{title}_{source}_{publish_time}"
return hashlib.md5(content.encode()).hexdigest()
@staticmethod
def generate_content_hash(content):
"""
生成内容哈希
"""
if not content:
return None
# 清理内容
cleaned_content = content.strip().lower()
return hashlib.sha256(cleaned_content.encode()).hexdigest()
@staticmethod
def extract_keywords(text, max_keywords=10):
"""
提取关键词
"""
try:
from textrank4zh import TextRank4Keyword
tr4w = TextRank4Keyword()
tr4w.analyze(text=text, lower=True, window=2)
keywords = []
for item in tr4w.get_keywords(max_keywords, word_min_len=2):
keywords.append(item.word)
return keywords
except ImportError:
# 简单的关键词提取
import re
words = re.findall(r'\\b\\w{2,}\\b', text.lower())
word_freq = {}
for word in words:
word_freq[word] = word_freq.get(word, 0) + 1
# 返回频率最高的词
sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
return [word for word, freq in sorted_words[:max_keywords]]
@staticmethod
def analyze_sentiment(text):
"""
情感分析
"""
try:
from snownlp import SnowNLP
s = SnowNLP(text)
sentiment_score = s.sentiments
if sentiment_score > 0.6:
sentiment = 'positive'
elif sentiment_score < 0.4:
sentiment = 'negative'
else:
sentiment = 'neutral'
return sentiment, sentiment_score
except ImportError:
# 简单的情感分析
positive_words = ['好', '棒', '优秀', '成功', '增长', '提升']
negative_words = ['坏', '差', '失败', '下降', '问题', '危机']
positive_count = sum(1 for word in positive_words if word in text)
negative_count = sum(1 for word in negative_words if word in text)
if positive_count > negative_count:
return 'positive', 0.7
elif negative_count > positive_count:
return 'negative', 0.3
else:
return 'neutral', 0.5
@staticmethod
def classify_category(title, content):
"""
分类识别
"""
# 关键词映射
category_keywords = {
'technology': ['科技', '技术', '互联网', 'AI', '人工智能', '区块链', '5G'],
'business': ['经济', '商业', '金融', '股票', '投资', '企业', '市场'],
'politics': ['政治', '政府', '政策', '法律', '选举', '国际'],
'sports': ['体育', '运动', '足球', '篮球', '奥运', '比赛'],
'entertainment': ['娱乐', '电影', '音乐', '明星', '综艺'],
'health': ['健康', '医疗', '疾病', '药物', '医院', '医生']
}
text = f"{title} {content}".lower()
category_scores = {}
for category, keywords in category_keywords.items():
score = sum(1 for keyword in keywords if keyword in text)
if score > 0:
category_scores[category] = score
if category_scores:
return max(category_scores, key=category_scores.get)
else:
return 'general'
@staticmethod
def calculate_reading_time(content):
"""
计算阅读时间
"""
if not content:
return 0
# 假设平均阅读速度为每分钟200字
word_count = len(content)
reading_time = max(1, word_count // 200) # 至少1分钟
return reading_time
@staticmethod
def generate_summary(content, max_sentences=3):
"""
生成摘要
"""
if not content:
return ""
# 简单的摘要生成:取前几句话
import re
sentences = re.split(r'[。!?]', content)
sentences = [s.strip() for s in sentences if s.strip()]
summary_sentences = sentences[:max_sentences]
return '。'.join(summary_sentences) + '。' if summary_sentences else ""
'''
print("📄 新闻数据模型代码:")
print(data_model_code)
# 演示新闻聚合项目
news_project = NewsAggregationProject()
news_project.analyze_requirements()
news_project.design_data_model()
print("新闻聚合项目概述完成!")
```
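`NewsDataProcessor` 中的 ID、内容哈希、阅读时间、摘要等方法在入库前会被逐一调用。下面给出一个只依赖标准库的最小示意,按同样的思路把一条原始文章字典加工成带这些字段的记录,便于单独验证处理逻辑;示例文本与字段取值均为演示假设。

```python
# 最小示意:对一条原始文章做基础加工(ID、内容哈希、阅读时间、摘要)
import hashlib
import re
from datetime import datetime

def enrich_article(raw: dict) -> dict:
    """根据标题、来源、正文生成入库所需的基础字段(纯标准库实现)"""
    title = raw.get("title", "").strip()
    content = raw.get("content", "").strip()
    source = raw.get("source", "unknown")
    publish_time = raw.get("publish_time", datetime.now())

    # 文章ID:标题 + 来源 + 发布时间 的 MD5
    article_id = hashlib.md5(f"{title}_{source}_{publish_time}".encode()).hexdigest()
    # 内容哈希:用于后续去重
    content_hash = hashlib.sha256(content.lower().encode()).hexdigest() if content else None
    # 阅读时间:按每分钟约 200 字估算,至少 1 分钟
    reading_time = max(1, len(content) // 200)
    # 摘要:取前 3 句
    sentences = [s.strip() for s in re.split(r"[。!?]", content) if s.strip()]
    summary = "。".join(sentences[:3]) + "。" if sentences else ""

    return {
        **raw,
        "article_id": article_id,
        "content_hash": content_hash,
        "word_count": len(content),
        "reading_time": reading_time,
        "summary": summary,
        "crawl_time": datetime.now(),
    }

if __name__ == "__main__":
    demo = {
        "title": "示例:人工智能推动新闻自动聚合",
        "content": "这是第一句话。这是第二句话!这是第三句话?这是第四句话。",
        "source": "example_site",
    }
    print(enrich_article(demo))
```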
3.2 新闻爬虫实现
```python
# 8. 新闻爬虫实现
print("\n🕷️ 新闻爬虫实现:")

class NewsSpiderImplementation:
    """
    新闻爬虫实现
    """
def demonstrate_rss_spider(self):
"""
演示RSS爬虫
"""
print("\n📡 RSS新闻爬虫:")
rss_spider_code = '''
import scrapy
import feedparser
from datetime import datetime
from urllib.parse import urljoin
from scrapy.http import Request
from news_aggregator.items import NewsItem
from news_aggregator.utils import NewsDataProcessor

class RSSNewsSpider(scrapy.Spider):
    """
    RSS新闻爬虫
    """
    name = 'rss_news'
# RSS源配置
rss_feeds = {
'techcrunch': 'https://techcrunch.com/feed/',
'bbc_tech': 'http://feeds.bbci.co.uk/news/technology/rss.xml',
'reuters_tech': 'https://www.reuters.com/technology/feed',
'cnn_tech': 'http://rss.cnn.com/rss/edition.rss'
}
custom_settings = {
'DOWNLOAD_DELAY': 2,
'RANDOMIZE_DOWNLOAD_DELAY': True,
'CONCURRENT_REQUESTS': 8,
'ITEM_PIPELINES': {
'news_aggregator.pipelines.NewsValidationPipeline': 300,
'news_aggregator.pipelines.DuplicationPipeline': 400,
'news_aggregator.pipelines.NewsProcessingPipeline': 500,
'news_aggregator.pipelines.NewsStoragePipeline': 600,
}
}
def start_requests(self):
"""
生成初始请求
"""
for source, feed_url in self.rss_feeds.items():
yield Request(
url=feed_url,
callback=self.parse_rss,
meta={'source': source},
headers={'User-Agent': self.get_user_agent()}
)
def get_user_agent(self):
"""
获取User-Agent
"""
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
]
import random
return random.choice(user_agents)
def parse_rss(self, response):
"""
解析RSS源
"""
source = response.meta['source']
try:
# 解析RSS内容
feed = feedparser.parse(response.text)
self.logger.info(f"Found {len(feed.entries)} articles from {source}")
for entry in feed.entries:
# 提取基本信息
article_data = {
'title': entry.get('title', ''),
'link': entry.get('link', ''),
'description': entry.get('description', ''),
'published': entry.get('published', ''),
'author': entry.get('author', ''),
'source': source
}
# 请求完整文章内容
if article_data['link']:
yield Request(
url=article_data['link'],
callback=self.parse_article,
meta={'article_data': article_data},
headers={'User-Agent': self.get_user_agent()}
)
except Exception as e:
self.logger.error(f"Error parsing RSS from {source}: {e}")
def parse_article(self, response):
"""
解析文章详情
"""
article_data = response.meta['article_data']
source = article_data['source']
try:
# 根据不同源使用不同的解析规则
if source == 'techcrunch':
content = self.parse_techcrunch_article(response)
elif source == 'bbc_tech':
content = self.parse_bbc_article(response)
elif source == 'reuters_tech':
content = self.parse_reuters_article(response)
elif source == 'cnn_tech':
content = self.parse_cnn_article(response)
else:
content = self.parse_generic_article(response)
# 创建新闻项
item = NewsItem()
# 基本信息
item['title'] = article_data['title']
item['content'] = content
item['source_url'] = response.url
item['source'] = source
item['author'] = article_data.get('author', '')
# 时间信息
item['publish_time'] = self.parse_publish_time(article_data.get('published'))
item['crawl_time'] = datetime.now()
# 生成ID和哈希
item['article_id'] = NewsDataProcessor.generate_article_id(
item['title'], item['source'], item['publish_time']
)
item['content_hash'] = NewsDataProcessor.generate_content_hash(content)
# 提取图片
item['images'] = self.extract_images(response)
# 基本处理
item['word_count'] = len(content) if content else 0
item['reading_time'] = NewsDataProcessor.calculate_reading_time(content)
item['summary'] = NewsDataProcessor.generate_summary(content)
# 分类和关键词(在pipeline中处理)
item['category'] = ''
item['keywords'] = []
item['tags'] = []
# 情感分析(在pipeline中处理)
item['sentiment'] = ''
item['sentiment_score'] = 0.0
# 语言检测
item['language'] = self.detect_language(content)
yield item
except Exception as e:
self.logger.error(f"Error parsing article from {response.url}: {e}")
def parse_techcrunch_article(self, response):
"""
解析TechCrunch文章
"""
# 提取文章内容
content_selectors = [
'.article-content .entry-content p::text',
'.post-content p::text',
'.entry-content p::text'
]
content = ""
for selector in content_selectors:
paragraphs = response.css(selector).getall()
if paragraphs:
content = '\\n'.join(paragraphs)
break
return content.strip()
def parse_bbc_article(self, response):
"""
解析BBC文章
"""
# BBC文章内容选择器
content_selectors = [
'[data-component="text-block"] p::text',
'.story-body__inner p::text',
'.post__content p::text'
]
content = ""
for selector in content_selectors:
paragraphs = response.css(selector).getall()
if paragraphs:
content = '\\n'.join(paragraphs)
break
return content.strip()
def parse_reuters_article(self, response):
"""
解析Reuters文章
"""
# Reuters文章内容选择器
content_selectors = [
'.ArticleBodyWrapper p::text',
'.StandardArticleBody_body p::text',
'.article-wrap p::text'
]
content = ""
for selector in content_selectors:
paragraphs = response.css(selector).getall()
if paragraphs:
content = '\\n'.join(paragraphs)
break
return content.strip()
def parse_cnn_article(self, response):
"""
解析CNN文章
"""
# CNN文章内容选择器
content_selectors = [
'.zn-body__paragraph::text',
'.l-container p::text',
'.cnn_storypgraphtxt::text'
]
content = ""
for selector in content_selectors:
paragraphs = response.css(selector).getall()
if paragraphs:
content = '\\n'.join(paragraphs)
break
return content.strip()
def parse_generic_article(self, response):
"""
通用文章解析
"""
# 通用内容选择器
content_selectors = [
'article p::text',
'.content p::text',
'.post-content p::text',
'.entry-content p::text',
'main p::text',
'.article-body p::text'
]
content = ""
for selector in content_selectors:
paragraphs = response.css(selector).getall()
if paragraphs:
content = '\\n'.join(paragraphs)
break
return content.strip()
def extract_images(self, response):
"""
提取文章图片
"""
images = []
# 图片选择器
img_selectors = [
'article img::attr(src)',
'.content img::attr(src)',
'.post-content img::attr(src)',
'.entry-content img::attr(src)'
]
for selector in img_selectors:
img_urls = response.css(selector).getall()
for img_url in img_urls:
if img_url:
# 转换为绝对URL
absolute_url = urljoin(response.url, img_url)
if absolute_url not in images:
images.append(absolute_url)
return images[:5] # 最多保存5张图片
def parse_publish_time(self, published_str):
"""
解析发布时间
"""
if not published_str:
return datetime.now()
try:
# 尝试多种时间格式
time_formats = [
'%a, %d %b %Y %H:%M:%S %Z',
'%a, %d %b %Y %H:%M:%S %z',
'%Y-%m-%dT%H:%M:%S%z',
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d'
]
for fmt in time_formats:
try:
return datetime.strptime(published_str, fmt)
except ValueError:
continue
# 如果都失败,返回当前时间
return datetime.now()
except Exception:
return datetime.now()
def detect_language(self, text):
"""
检测语言
"""
if not text:
return 'unknown'
try:
from langdetect import detect
return detect(text)
except:
# 简单的语言检测
chinese_chars = len([c for c in text if '\\u4e00' <= c <= '\\u9fff'])
if chinese_chars > len(text) * 0.3:
return 'zh'
else:
return 'en'
'''
print("📄 RSS爬虫代码:")
print(rss_spider_code)
def demonstrate_web_spider(self):
"""
演示网页爬虫
"""
print("\n🌐 网页新闻爬虫:")
web_spider_code = '''
import scrapy
from datetime import datetime
from scrapy.http import Request
from news_aggregator.items import NewsItem
from news_aggregator.utils import NewsDataProcessor

class WebNewsSpider(scrapy.Spider):
    """
    网页新闻爬虫
    """
    name = 'web_news'
# 目标网站配置
start_urls = [
'https://www.example-news.com/technology',
'https://www.example-news.com/business',
'https://www.example-news.com/politics'
]
custom_settings = {
'DOWNLOAD_DELAY': 3,
'RANDOMIZE_DOWNLOAD_DELAY': True,
'CONCURRENT_REQUESTS': 4,
'DEPTH_LIMIT': 3,
'ITEM_PIPELINES': {
'news_aggregator.pipelines.NewsValidationPipeline': 300,
'news_aggregator.pipelines.DuplicationPipeline': 400,
'news_aggregator.pipelines.NewsProcessingPipeline': 500,
'news_aggregator.pipelines.NewsStoragePipeline': 600,
}
}
def parse(self, response):
"""
解析列表页
"""
# 提取文章链接
article_links = response.css('.article-list .article-item a::attr(href)').getall()
for link in article_links:
if link:
absolute_url = response.urljoin(link)
yield Request(
url=absolute_url,
callback=self.parse_article,
meta={'category': self.extract_category_from_url(response.url)}
)
# 跟进分页
next_page = response.css('.pagination .next::attr(href)').get()
if next_page:
yield Request(
url=response.urljoin(next_page),
callback=self.parse
)
def parse_article(self, response):
"""
解析文章详情
"""
try:
# 提取文章信息
title = response.css('h1.article-title::text').get()
if not title:
title = response.css('title::text').get()
# 提取内容
content_paragraphs = response.css('.article-content p::text').getall()
content = '\\n'.join(content_paragraphs) if content_paragraphs else ''
# 提取作者
author = response.css('.article-author::text').get()
if not author:
author = response.css('[rel="author"]::text').get()
# 提取发布时间
publish_time_str = response.css('.article-date::text').get()
if not publish_time_str:
publish_time_str = response.css('time::attr(datetime)').get()
# 提取图片
images = response.css('.article-content img::attr(src)').getall()
images = [response.urljoin(img) for img in images if img]
# 创建新闻项
item = NewsItem()
# 基本信息
item['title'] = title.strip() if title else ''
item['content'] = content.strip()
item['source_url'] = response.url
item['source'] = self.extract_source_from_url(response.url)
item['author'] = author.strip() if author else ''
# 时间信息
item['publish_time'] = self.parse_publish_time(publish_time_str)
item['crawl_time'] = datetime.now()
# 分类
item['category'] = response.meta.get('category', '')
# 生成ID和哈希
item['article_id'] = NewsDataProcessor.generate_article_id(
item['title'], item['source'], item['publish_time']
)
item['content_hash'] = NewsDataProcessor.generate_content_hash(content)
# 媒体信息
item['images'] = images[:5] # 最多5张图片
item['videos'] = []
# 基本处理
item['word_count'] = len(content) if content else 0
item['reading_time'] = NewsDataProcessor.calculate_reading_time(content)
item['summary'] = NewsDataProcessor.generate_summary(content)
# 初始化其他字段
item['keywords'] = []
item['tags'] = []
item['sentiment'] = ''
item['sentiment_score'] = 0.0
item['language'] = ''
item['view_count'] = 0
item['comment_count'] = 0
item['share_count'] = 0
# 只有标题和内容都存在才返回
if item['title'] and item['content']:
yield item
except Exception as e:
self.logger.error(f"Error parsing article from {response.url}: {e}")
def extract_category_from_url(self, url):
"""
从URL提取分类
"""
category_mapping = {
'technology': 'technology',
'tech': 'technology',
'business': 'business',
'finance': 'business',
'politics': 'politics',
'sports': 'sports',
'entertainment': 'entertainment',
'health': 'health'
}
url_lower = url.lower()
for keyword, category in category_mapping.items():
if keyword in url_lower:
return category
return 'general'
def extract_source_from_url(self, url):
"""
从URL提取来源
"""
from urllib.parse import urlparse
parsed = urlparse(url)
domain = parsed.netloc.lower()
# 移除www前缀
if domain.startswith('www.'):
domain = domain[4:]
# 移除顶级域名
if '.' in domain:
domain = domain.split('.')[0]
return domain
def parse_publish_time(self, time_str):
"""
解析发布时间
"""
if not time_str:
return datetime.now()
try:
# 清理时间字符串
time_str = time_str.strip()
# 尝试多种格式
formats = [
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%dT%H:%M:%SZ',
'%Y-%m-%d',
'%d/%m/%Y %H:%M',
'%d/%m/%Y'
]
for fmt in formats:
try:
return datetime.strptime(time_str, fmt)
except ValueError:
continue
return datetime.now()
except Exception:
return datetime.now()
'''
print("📄 网页爬虫代码:")
print(web_spider_code)
# 演示新闻爬虫实现
spider_demo = NewsSpiderImplementation()
spider_demo.demonstrate_rss_spider()
spider_demo.demonstrate_web_spider()
print("新闻爬虫实现演示完成!")
```
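这两个爬虫通常用 `scrapy crawl rss_news`、`scrapy crawl web_news` 分别启动。下面是一个用 `CrawlerProcess` 在同一进程中调度它们的最小示意;前提是 news_aggregator 项目已按前文结构创建,且脚本在项目根目录(scrapy.cfg 所在目录)下运行。

```python
# 最小示意:在一个进程中同时启动 RSS 爬虫和网页爬虫
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

def main():
    # 读取项目 settings.py(需在项目根目录下运行,否则找不到配置和爬虫)
    process = CrawlerProcess(get_project_settings())

    # 按爬虫的 name 注册,对应前文定义的 rss_news 与 web_news
    process.crawl("rss_news")
    process.crawl("web_news")

    # 阻塞直到所有爬虫运行结束
    process.start()

if __name__ == "__main__":
    main()
```

生产环境中更常见的做法是用 Scrapyd 或定时任务分别调度各爬虫,这里仅用于本地联调。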
3.3 数据处理管道
```python
# 9. 新闻数据处理管道
print("\n⚙️ 新闻数据处理管道:")

class NewsProcessingPipelines:
    """
    新闻数据处理管道
    """
def demonstrate_validation_pipeline(self):
"""
演示数据验证管道
"""
print("\n✅ 数据验证管道:")
validation_code = '''
from scrapy.exceptions import DropItem
from datetime import datetime, timedelta
import re

class NewsValidationPipeline:
    """
    新闻数据验证管道
    """
def __init__(self):
self.min_title_length = 10
self.min_content_length = 100
self.max_age_days = 30
# 垃圾内容关键词
self.spam_keywords = [
'advertisement', '广告', 'sponsored', '赞助',
'click here', '点击这里', 'buy now', '立即购买'
]
def process_item(self, item, spider):
"""
处理数据项
"""
try:
# 验证标题
if not self.validate_title(item.get('title', '')):
raise DropItem(f"Invalid title: {item.get('title', '')}")
# 验证内容
if not self.validate_content(item.get('content', '')):
raise DropItem(f"Invalid content for article: {item.get('title', '')}")
# 验证发布时间
if not self.validate_publish_time(item.get('publish_time')):
raise DropItem(f"Invalid publish time for article: {item.get('title', '')}")
# 验证来源URL
if not self.validate_source_url(item.get('source_url', '')):
raise DropItem(f"Invalid source URL: {item.get('source_url', '')}")
# 检查垃圾内容
if self.is_spam_content(item):
raise DropItem(f"Spam content detected: {item.get('title', '')}")
spider.logger.info(f"Validated article: {item.get('title', '')}")
return item
except DropItem:
raise
except Exception as e:
spider.logger.error(f"Validation error: {e}")
raise DropItem(f"Validation failed: {e}")
def validate_title(self, title):
"""
验证标题
"""
if not title or not isinstance(title, str):
return False
title = title.strip()
# 检查长度
if len(title) < self.min_title_length:
return False
# 检查是否全是特殊字符
if not re.search(r'[a-zA-Z\\u4e00-\\u9fff]', title):
return False
return True
def validate_content(self, content):
"""
验证内容
"""
if not content or not isinstance(content, str):
return False
content = content.strip()
# 检查长度
if len(content) < self.min_content_length:
return False
# 检查是否有实际内容
if not re.search(r'[a-zA-Z\\u4e00-\\u9fff]', content):
return False
return True
def validate_publish_time(self, publish_time):
"""
验证发布时间
"""
if not publish_time:
return False
if not isinstance(publish_time, datetime):
return False
# 检查时间是否在合理范围内
now = datetime.now()
max_age = timedelta(days=self.max_age_days)
if publish_time > now:
return False # 未来时间
if now - publish_time > max_age:
return False # 太旧的文章
return True
def validate_source_url(self, url):
"""
验证来源URL
"""
if not url or not isinstance(url, str):
return False
# 简单的URL格式检查
url_pattern = re.compile(
r'^https?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\\.)+' # domain...
r'(?:[A-Z]{2,6}\\.?|[A-Z0-9-]{2,}\\.?)|' # host...
r'localhost|' # localhost...
r'\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})' # ...or ip
r'(?::\\d+)?' # optional port
r'(?:/?|[/?]\\S+)$', re.IGNORECASE)
return url_pattern.match(url) is not None
def is_spam_content(self, item):
"""
检查是否为垃圾内容
"""
title = item.get('title', '').lower()
content = item.get('content', '').lower()
text = f"{title} {content}"
# 检查垃圾关键词
for keyword in self.spam_keywords:
if keyword.lower() in text:
return True
# 检查重复字符
if self.has_excessive_repetition(text):
return True
return False
def has_excessive_repetition(self, text):
"""
检查是否有过度重复
"""
# 检查连续重复字符
if re.search(r'(.)\\1{10,}', text):
return True
# 检查重复单词
words = text.split()
if len(words) > 10:
word_count = {}
for word in words:
word_count[word] = word_count.get(word, 0) + 1
# 如果某个词出现次数超过总词数的30%
max_count = max(word_count.values())
if max_count > len(words) * 0.3:
return True
return False
'''
print("📄 数据验证管道代码:")
print(validation_code)
def demonstrate_deduplication_pipeline(self):
"""
演示去重管道
"""
print("\n🔄 去重管道:")
dedup_code = '''
import hashlib
import redis
from scrapy.exceptions import DropItem
from scrapy.dupefilters import BaseDupeFilter

class DuplicationPipeline:
    """
    去重管道
    """
def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True
)
self.content_hash_key = 'news:content_hashes'
self.url_hash_key = 'news:url_hashes'
self.title_hash_key = 'news:title_hashes'
# 相似度阈值
self.similarity_threshold = 0.8
@classmethod
def from_crawler(cls, crawler):
"""
从爬虫配置创建实例
"""
settings = crawler.settings
return cls(
redis_host=settings.get('REDIS_HOST', 'localhost'),
redis_port=settings.get('REDIS_PORT', 6379),
redis_db=settings.get('REDIS_DB', 0)
)
def process_item(self, item, spider):
"""
处理数据项
"""
try:
# URL去重
if self.is_duplicate_url(item.get('source_url', '')):
raise DropItem(f"Duplicate URL: {item.get('source_url', '')}")
# 内容去重
content_hash = item.get('content_hash')
if content_hash and self.is_duplicate_content(content_hash):
raise DropItem(f"Duplicate content: {item.get('title', '')}")
# 标题相似度检查
if self.is_similar_title(item.get('title', '')):
raise DropItem(f"Similar title exists: {item.get('title', '')}")
# 记录哈希值
self.record_hashes(item)
spider.logger.info(f"Passed deduplication: {item.get('title', '')}")
return item
except DropItem:
raise
except Exception as e:
spider.logger.error(f"Deduplication error: {e}")
return item
def is_duplicate_url(self, url):
"""
检查URL是否重复
"""
if not url:
return False
url_hash = hashlib.md5(url.encode()).hexdigest()
return self.redis_client.sismember(self.url_hash_key, url_hash)
def is_duplicate_content(self, content_hash):
"""
检查内容是否重复
"""
if not content_hash:
return False
return self.redis_client.sismember(self.content_hash_key, content_hash)
def is_similar_title(self, title):
"""
检查标题相似度
"""
if not title:
return False
title_hash = self.generate_title_hash(title)
# 获取所有已存在的标题哈希
existing_hashes = self.redis_client.smembers(self.title_hash_key)
for existing_hash in existing_hashes:
similarity = self.calculate_similarity(title_hash, existing_hash)
if similarity > self.similarity_threshold:
return True
return False
def generate_title_hash(self, title):
"""
生成标题哈希(用于相似度比较)
"""
# 清理标题
import re
cleaned_title = re.sub(r'[^\\w\\s]', '', title.lower())
words = cleaned_title.split()
# 移除停用词
stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
filtered_words = [word for word in words if word not in stop_words]
# 排序并连接
sorted_words = sorted(filtered_words)
return hashlib.md5(' '.join(sorted_words).encode()).hexdigest()
def calculate_similarity(self, hash1, hash2):
"""
计算哈希相似度
"""
if hash1 == hash2:
return 1.0
# 简单的字符级相似度
common_chars = sum(1 for a, b in zip(hash1, hash2) if a == b)
return common_chars / len(hash1)
def record_hashes(self, item):
"""
记录哈希值
"""
# 记录URL哈希
url = item.get('source_url', '')
if url:
url_hash = hashlib.md5(url.encode()).hexdigest()
self.redis_client.sadd(self.url_hash_key, url_hash)
# 记录内容哈希
content_hash = item.get('content_hash')
if content_hash:
self.redis_client.sadd(self.content_hash_key, content_hash)
# 记录标题哈希
title = item.get('title', '')
if title:
title_hash = self.generate_title_hash(title)
self.redis_client.sadd(self.title_hash_key, title_hash)
# 设置过期时间(30天)
expire_time = 30 * 24 * 3600
self.redis_client.expire(self.url_hash_key, expire_time)
self.redis_client.expire(self.content_hash_key, expire_time)
self.redis_client.expire(self.title_hash_key, expire_time)
def clear_old_hashes(self):
"""
清理旧的哈希值
"""
# 这个方法可以定期调用来清理过期数据
pass
'''
print("📄 去重管道代码:")
print(dedup_code)
# 演示数据处理管道
pipeline_demo = NewsProcessingPipelines()
pipeline_demo.demonstrate_validation_pipeline()
pipeline_demo.demonstrate_deduplication_pipeline()
print("数据处理管道演示完成!")
```
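需要说明的是,上面 `calculate_similarity` 按 MD5 字符逐位比较,只能判断两个标题是否完全一致:哈希对细微改动极其敏感,字符重合度并不能度量相似度。如果要做近似去重,一个常见的替代思路是直接对归一化后的词集合计算 Jaccard 相似度,下面是一个最小示意(阈值 0.8 沿用上文配置,仅作演示):

```python
# 最小示意:用 Jaccard 相似度比较两个标题(替代按哈希字符比较的做法)
import re

STOP_WORDS = {"the", "a", "an", "and", "or", "but", "in", "on", "at",
              "to", "for", "of", "with", "by"}

def normalize_title(title: str) -> frozenset:
    """清洗标题并返回去掉停用词后的词集合"""
    cleaned = re.sub(r"[^\w\s]", "", title.lower())
    return frozenset(w for w in cleaned.split() if w not in STOP_WORDS)

def jaccard_similarity(title_a: str, title_b: str) -> float:
    """两个标题词集合的交并比,取值范围 [0, 1]"""
    a, b = normalize_title(title_a), normalize_title(title_b)
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)

def is_similar_title(new_title: str, existing_titles, threshold: float = 0.8) -> bool:
    """与已有标题逐个比较,超过阈值即视为重复"""
    return any(jaccard_similarity(new_title, t) >= threshold for t in existing_titles)

if __name__ == "__main__":
    existing = ["Apple releases new AI chip for data centers"]
    print(is_similar_title("Apple releases new AI chip for data centers today", existing))
```

实际使用时可以把归一化后的词集合(而不是其 MD5)存入 Redis 再逐一计算;数据量大时可进一步换成 SimHash、MinHash 等近似算法以降低比较成本。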