本章概述
在前面的章节中,我们学习了Scrapy的基础使用、高级功能和部署运维。本章将深入探讨Scrapy的扩展开发和插件系统,学习如何创建自定义扩展、开发插件、以及构建可复用的组件。
本章学习目标
- 理解Scrapy扩展系统的架构和原理
- 掌握自定义扩展的开发方法
- 学习插件开发的最佳实践
- 了解扩展的分发和管理
- 掌握高级扩展开发技巧
1. Scrapy扩展系统概述
1.1 扩展系统架构
# 1. Scrapy扩展系统架构
print("🏗️ Scrapy扩展系统架构:")
import scrapy
from scrapy import signals
from scrapy.extension import ExtensionManager  # 注意: ExtensionManager 位于 scrapy.extension(单数)模块
from scrapy.utils.misc import load_object
class ExtensionSystemDemo:
"""
扩展系统演示
"""
def __init__(self):
self.extensions = {}
self.signals = {}
def demonstrate_extension_architecture(self):
"""
演示扩展架构
"""
print("\n📋 扩展系统组件:")
# 扩展系统组件
components = {
'Extension Manager': {
'description': '扩展管理器,负责加载和管理所有扩展',
'responsibilities': [
'扩展的加载和初始化',
'扩展生命周期管理',
'信号连接和断开',
'扩展配置管理'
]
},
'Signal System': {
'description': '信号系统,用于扩展间通信',
'responsibilities': [
'事件通知机制',
'异步消息传递',
'扩展解耦',
'生命周期事件'
]
},
'Extension Base': {
'description': '扩展基类,定义扩展接口',
'responsibilities': [
'扩展标准接口',
'生命周期方法',
'配置访问',
'信号处理'
]
},
'Plugin System': {
'description': '插件系统,支持动态加载',
'responsibilities': [
'插件发现和加载',
'依赖管理',
'版本控制',
'插件配置'
]
}
}
for component, info in components.items():
print(f"\n🔧 {component}:")
print(f" 描述: {info['description']}")
print(f" 职责:")
for responsibility in info['responsibilities']:
print(f" - {responsibility}")
def demonstrate_extension_lifecycle(self):
"""
演示扩展生命周期
"""
print("\n🔄 扩展生命周期:")
lifecycle_stages = [
('加载阶段', 'Extension Loading', [
'从配置中读取扩展列表',
'动态导入扩展模块',
'检查扩展依赖',
'验证扩展接口'
]),
('初始化阶段', 'Extension Initialization', [
'创建扩展实例',
'传递配置参数',
'连接信号处理器',
'初始化扩展状态'
]),
('运行阶段', 'Extension Runtime', [
'响应信号事件',
'执行扩展逻辑',
'与其他组件交互',
'处理异常情况'
]),
('关闭阶段', 'Extension Shutdown', [
'断开信号连接',
'清理资源',
'保存状态数据',
'执行清理逻辑'
])
]
for stage_name, stage_key, tasks in lifecycle_stages:
print(f"\n📋 {stage_name} ({stage_key}):")
for task in tasks:
print(f" ✓ {task}")
def demonstrate_signal_system(self):
"""
演示信号系统
"""
print("\n📡 信号系统:")
# 常用信号
common_signals = {
'engine_started': '引擎启动信号',
'engine_stopped': '引擎停止信号',
'spider_opened': '爬虫开启信号',
'spider_closed': '爬虫关闭信号',
'spider_idle': '爬虫空闲信号',
'request_scheduled': '请求调度信号',
'request_dropped': '请求丢弃信号',
'response_received': '响应接收信号',
'response_downloaded': '响应下载信号',
'item_scraped': '数据项抓取信号',
'item_dropped': '数据项丢弃信号'
}
print("🔔 常用信号:")
for signal_name, description in common_signals.items():
print(f" • {signal_name}: {description}")
# 信号处理示例
print("\n📝 信号处理示例:")
signal_examples = [
"连接信号: crawler.signals.connect(handler, signal=signals.spider_opened)",
"发送信号: crawler.signals.send_catch_log(signal=signals.item_scraped, item=item)",
"断开信号: crawler.signals.disconnect(handler, signal=signals.spider_closed)",
"一次性信号: crawler.signals.connect(handler, signal=signals.engine_started, sender=spider)"
]
for example in signal_examples:
print(f" 📌 {example}")
# 演示扩展系统
demo = ExtensionSystemDemo()
demo.demonstrate_extension_architecture()
demo.demonstrate_extension_lifecycle()
demo.demonstrate_signal_system()
print("扩展系统概述演示完成!")
1.2 扩展类型和分类
# 2. 扩展类型和分类
print("\n🏷️ 扩展类型和分类:")
class ExtensionTypesDemo:
"""
扩展类型演示
"""
def demonstrate_extension_types(self):
"""
演示扩展类型
"""
print("\n📂 扩展分类:")
extension_categories = {
'核心扩展': {
'description': 'Scrapy内置的核心扩展',
'examples': [
'LogStats - 日志统计扩展',
'MemoryUsage - 内存使用监控',
'CloseSpider - 爬虫关闭条件',
'AutoThrottle - 自动限速',
'CoreStats - 核心统计收集',
'TelnetConsole - Telnet控制台'
],
'characteristics': [
'默认启用或可选启用',
'经过充分测试',
'性能优化',
'文档完善'
]
},
'第三方扩展': {
'description': '社区开发的扩展',
'examples': [
'scrapy-splash - JavaScript渲染',
'scrapy-redis - Redis集成',
'scrapy-deltafetch - 增量抓取',
'scrapy-rotating-proxies - 代理轮换',
'scrapy-user-agents - User-Agent轮换',
'scrapy-fake-useragent - 随机伪装User-Agent'
],
'characteristics': [
'功能特化',
'社区维护',
'安装简单',
'配置灵活'
]
},
'自定义扩展': {
'description': '根据特定需求开发的扩展',
'examples': [
'业务逻辑扩展',
'数据处理扩展',
'监控告警扩展',
'性能优化扩展',
'安全防护扩展',
'集成扩展'
],
'characteristics': [
'针对性强',
'业务相关',
'可定制化',
'维护成本相对较高'
]
}
}
for category, info in extension_categories.items():
print(f"\n🏷️ {category}:")
print(f" 描述: {info['description']}")
print(f" 示例:")
for example in info['examples']:
print(f" • {example}")
print(f" 特点:")
for characteristic in info['characteristics']:
print(f" ✓ {characteristic}")
def demonstrate_extension_patterns(self):
"""
演示扩展模式
"""
print("\n🎨 扩展设计模式:")
patterns = {
'监听器模式': {
'description': '监听特定信号并响应',
'use_cases': [
'统计信息收集',
'日志记录',
'性能监控',
'错误处理'
],
'example': 'LogStats扩展监听各种信号来收集统计信息'
},
'拦截器模式': {
'description': '拦截请求或响应进行处理',
'use_cases': [
'请求修改',
'响应处理',
'缓存控制',
'重试逻辑'
],
'example': 'HttpCacheMiddleware通过拦截请求和响应实现缓存(在Scrapy中以下载器中间件形式提供,体现的正是拦截器思路)'
},
'装饰器模式': {
'description': '为现有功能添加额外行为',
'use_cases': [
'功能增强',
'行为修改',
'性能优化',
'安全加固'
],
'example': 'AutoThrottle扩展为下载器添加自动限速功能'
},
'策略模式': {
'description': '根据条件选择不同的处理策略',
'use_cases': [
'条件处理',
'算法选择',
'配置驱动',
'动态行为'
],
'example': 'CloseSpider扩展根据不同条件关闭爬虫'
}
}
for pattern, info in patterns.items():
print(f"\n🎯 {pattern}:")
print(f" 描述: {info['description']}")
print(f" 适用场景:")
for use_case in info['use_cases']:
print(f" • {use_case}")
print(f" 示例: {info['example']}")
# 演示扩展类型
types_demo = ExtensionTypesDemo()
types_demo.demonstrate_extension_types()
types_demo.demonstrate_extension_patterns()
print("扩展类型和分类演示完成!")
2. 自定义扩展开发
2.1 基础扩展开发
# 3. 基础扩展开发
print("\n🛠️ 基础扩展开发:")
import time
import json
import logging
from datetime import datetime
from scrapy import signals
from scrapy.exceptions import NotConfigured
class BasicExtensionDemo:
"""
基础扩展开发演示
"""
def create_simple_extension(self):
"""
创建简单扩展
"""
print("\n📝 简单扩展示例:")
# 简单的统计扩展
simple_extension_code = '''
class SimpleStatsExtension:
"""
简单统计扩展
"""
def __init__(self, crawler):
self.crawler = crawler
self.stats = {
'requests_count': 0,
'responses_count': 0,
'items_count': 0,
'errors_count': 0,
'start_time': None,
'end_time': None
}
# 连接信号
crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)
crawler.signals.connect(self.request_scheduled, signal=signals.request_scheduled)
crawler.signals.connect(self.response_received, signal=signals.response_received)
crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)
crawler.signals.connect(self.spider_error, signal=signals.spider_error)
@classmethod
def from_crawler(cls, crawler):
"""
从crawler创建扩展实例
"""
# 检查扩展是否启用
if not crawler.settings.getbool('SIMPLE_STATS_ENABLED', True):
raise NotConfigured('SimpleStats extension is disabled')
return cls(crawler)
def spider_opened(self, spider):
"""
爬虫开启时调用
"""
self.stats['start_time'] = datetime.now()
spider.logger.info(f"SimpleStats: Spider {spider.name} opened")
def spider_closed(self, spider, reason):
"""
爬虫关闭时调用
"""
self.stats['end_time'] = datetime.now()
duration = self.stats['end_time'] - self.stats['start_time']
# 输出统计信息
spider.logger.info(f"SimpleStats: Spider {spider.name} closed")
spider.logger.info(f"SimpleStats: Duration: {duration}")
spider.logger.info(f"SimpleStats: Requests: {self.stats['requests_count']}")
spider.logger.info(f"SimpleStats: Responses: {self.stats['responses_count']}")
spider.logger.info(f"SimpleStats: Items: {self.stats['items_count']}")
spider.logger.info(f"SimpleStats: Errors: {self.stats['errors_count']}")
def request_scheduled(self, request, spider):
"""
请求调度时调用
"""
self.stats['requests_count'] += 1
def response_received(self, response, request, spider):
"""
响应接收时调用
"""
self.stats['responses_count'] += 1
def item_scraped(self, item, response, spider):
"""
数据项抓取时调用
"""
self.stats['items_count'] += 1
def spider_error(self, failure, response, spider):
"""
爬虫错误时调用
"""
self.stats['errors_count'] += 1
'''
print("📄 简单统计扩展代码:")
print(simple_extension_code)
def create_configurable_extension(self):
"""
创建可配置扩展
"""
print("\n⚙️ 可配置扩展示例:")
configurable_extension_code = '''
class ConfigurableLogExtension:
"""
可配置日志扩展
"""
def __init__(self, crawler, log_level='INFO', log_file=None,
log_format=None, max_file_size=None):
self.crawler = crawler
self.log_level = getattr(logging, log_level.upper())
self.log_file = log_file
self.log_format = log_format or '%(asctime)s [%(name)s] %(levelname)s: %(message)s'
self.max_file_size = max_file_size or 10 * 1024 * 1024 # 10MB
# 设置日志记录器
# 注意: 扩展实例化时 crawler.spider 通常尚未创建,这里改用 spidercls 获取名称
self.logger = logging.getLogger(f'scrapy.{crawler.spidercls.name}')
self.setup_logger()
# 连接信号
crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)
crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)
crawler.signals.connect(self.spider_error, signal=signals.spider_error)
@classmethod
def from_crawler(cls, crawler):
"""
从crawler创建扩展实例
"""
settings = crawler.settings
# 检查扩展是否启用
if not settings.getbool('CONFIGURABLE_LOG_ENABLED', False):
raise NotConfigured('ConfigurableLog extension is disabled')
# 获取配置参数
log_level = settings.get('CONFIGURABLE_LOG_LEVEL', 'INFO')
log_file = settings.get('CONFIGURABLE_LOG_FILE')
log_format = settings.get('CONFIGURABLE_LOG_FORMAT')
max_file_size = settings.getint('CONFIGURABLE_LOG_MAX_SIZE', 10 * 1024 * 1024)
return cls(crawler, log_level, log_file, log_format, max_file_size)
def setup_logger(self):
"""
设置日志记录器
"""
# 创建格式化器
formatter = logging.Formatter(self.log_format)
# 如果指定了日志文件,添加文件处理器
if self.log_file:
from logging.handlers import RotatingFileHandler
file_handler = RotatingFileHandler(
self.log_file,
maxBytes=self.max_file_size,
backupCount=5
)
file_handler.setFormatter(formatter)
file_handler.setLevel(self.log_level)
self.logger.addHandler(file_handler)
# 设置日志级别
self.logger.setLevel(self.log_level)
def spider_opened(self, spider):
"""
爬虫开启时记录日志
"""
self.logger.info(f"Spider {spider.name} started with ConfigurableLog extension")
def spider_closed(self, spider, reason):
"""
爬虫关闭时记录日志
"""
self.logger.info(f"Spider {spider.name} closed. Reason: {reason}")
def item_scraped(self, item, response, spider):
"""
数据项抓取时记录日志
"""
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(f"Item scraped from {response.url}: {dict(item)}")
def spider_error(self, failure, response, spider):
"""
爬虫错误时记录日志
"""
self.logger.error(f"Spider error: {failure.getErrorMessage()}")
if response:
self.logger.error(f"Error URL: {response.url}")
'''
print("📄 可配置日志扩展代码:")
print(configurable_extension_code)
def create_advanced_extension(self):
"""
创建高级扩展
"""
print("\n🚀 高级扩展示例:")
advanced_extension_code = '''
class AdvancedMonitoringExtension:
"""
高级监控扩展
"""
def __init__(self, crawler, monitoring_interval=60, alert_threshold=None,
webhook_url=None, metrics_file=None):
self.crawler = crawler
self.monitoring_interval = monitoring_interval
self.alert_threshold = alert_threshold or {}
self.webhook_url = webhook_url
self.metrics_file = metrics_file
# 监控数据
self.metrics = {
'requests_per_minute': [],
'response_times': [],
'error_rates': [],
'memory_usage': [],
'cpu_usage': []
}
# 定时器
self.monitoring_timer = None
# 连接信号
crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)
crawler.signals.connect(self.response_received, signal=signals.response_received)
crawler.signals.connect(self.spider_error, signal=signals.spider_error)
@classmethod
def from_crawler(cls, crawler):
"""
从crawler创建扩展实例
"""
settings = crawler.settings
if not settings.getbool('ADVANCED_MONITORING_ENABLED', False):
raise NotConfigured('AdvancedMonitoring extension is disabled')
monitoring_interval = settings.getint('MONITORING_INTERVAL', 60)
alert_threshold = settings.getdict('ALERT_THRESHOLD', {})
webhook_url = settings.get('MONITORING_WEBHOOK_URL')
metrics_file = settings.get('MONITORING_METRICS_FILE')
return cls(crawler, monitoring_interval, alert_threshold,
webhook_url, metrics_file)
def spider_opened(self, spider):
"""
爬虫开启时启动监控
"""
spider.logger.info("AdvancedMonitoring: Starting monitoring")
self.start_monitoring()
def spider_closed(self, spider, reason):
"""
爬虫关闭时停止监控
"""
spider.logger.info("AdvancedMonitoring: Stopping monitoring")
self.stop_monitoring()
self.save_metrics()
def start_monitoring(self):
"""
启动监控
"""
from twisted.internet import reactor
def monitor():
self.collect_metrics()
self.check_alerts()
# 安排下次监控
self.monitoring_timer = reactor.callLater(
self.monitoring_interval, monitor
)
# 开始监控
monitor()
def stop_monitoring(self):
"""
停止监控
"""
if self.monitoring_timer and self.monitoring_timer.active():
self.monitoring_timer.cancel()
def collect_metrics(self):
"""
收集监控指标
"""
import psutil
# 收集系统指标
memory_usage = psutil.virtual_memory().percent
cpu_usage = psutil.cpu_percent()
# 收集爬虫指标
stats = self.crawler.stats
requests_count = stats.get_value('downloader/request_count', 0)
response_count = stats.get_value('downloader/response_count', 0)
error_count = stats.get_value('downloader/exception_count', 0)
# 计算错误率
error_rate = (error_count / max(response_count, 1)) * 100
# 存储指标
self.metrics['memory_usage'].append(memory_usage)
self.metrics['cpu_usage'].append(cpu_usage)
self.metrics['error_rates'].append(error_rate)
# 限制历史数据长度
max_history = 100
for key in self.metrics:
if len(self.metrics[key]) > max_history:
self.metrics[key] = self.metrics[key][-max_history:]
def check_alerts(self):
"""
检查告警条件
"""
alerts = []
# 检查内存使用率
if self.metrics['memory_usage']:
current_memory = self.metrics['memory_usage'][-1]
threshold = self.alert_threshold.get('memory_usage', 90)
if current_memory > threshold:
alerts.append(f"High memory usage: {current_memory:.1f}%")
# 检查CPU使用率
if self.metrics['cpu_usage']:
current_cpu = self.metrics['cpu_usage'][-1]
threshold = self.alert_threshold.get('cpu_usage', 90)
if current_cpu > threshold:
alerts.append(f"High CPU usage: {current_cpu:.1f}%")
# 检查错误率
if self.metrics['error_rates']:
current_error_rate = self.metrics['error_rates'][-1]
threshold = self.alert_threshold.get('error_rate', 10)
if current_error_rate > threshold:
alerts.append(f"High error rate: {current_error_rate:.1f}%")
# 发送告警
if alerts:
self.send_alerts(alerts)
def send_alerts(self, alerts):
"""
发送告警
"""
if self.webhook_url:
import requests
alert_data = {
'timestamp': datetime.now().isoformat(),
'spider': self.crawler.spider.name,
'alerts': alerts,
'metrics': {
'memory_usage': self.metrics['memory_usage'][-1] if self.metrics['memory_usage'] else 0,
'cpu_usage': self.metrics['cpu_usage'][-1] if self.metrics['cpu_usage'] else 0,
'error_rate': self.metrics['error_rates'][-1] if self.metrics['error_rates'] else 0
}
}
try:
requests.post(self.webhook_url, json=alert_data, timeout=10)
except Exception as e:
self.crawler.spider.logger.error(f"Failed to send alert: {e}")
def save_metrics(self):
"""
保存监控指标
"""
if self.metrics_file:
try:
with open(self.metrics_file, 'w') as f:
json.dump(self.metrics, f, indent=2)
except Exception as e:
self.crawler.spider.logger.error(f"Failed to save metrics: {e}")
def response_received(self, response, request, spider):
"""
响应接收时记录响应时间
"""
if hasattr(request, 'meta') and 'download_start_time' in request.meta:
response_time = time.time() - request.meta['download_start_time']
self.metrics['response_times'].append(response_time)
def spider_error(self, failure, response, spider):
"""
爬虫错误时记录
"""
spider.logger.warning(f"AdvancedMonitoring: Error detected - {failure.getErrorMessage()}")
'''
print("📄 高级监控扩展代码:")
print(advanced_extension_code)
# 演示基础扩展开发
basic_demo = BasicExtensionDemo()
basic_demo.create_simple_extension()
basic_demo.create_configurable_extension()
basic_demo.create_advanced_extension()
print("基础扩展开发演示完成!")
2.2 扩展配置和注册
# 4. 扩展配置和注册
print("\n⚙️ 扩展配置和注册:")
class ExtensionConfigurationDemo:
"""
扩展配置演示
"""
def demonstrate_extension_registration(self):
"""
演示扩展注册
"""
print("\n📋 扩展注册方法:")
registration_methods = {
'settings.py配置': {
'description': '在项目settings.py中配置扩展',
'example': '''
# settings.py
EXTENSIONS = {
'myproject.extensions.SimpleStatsExtension': 500,
'myproject.extensions.ConfigurableLogExtension': 600,
'myproject.extensions.AdvancedMonitoringExtension': 700,
}
# 扩展配置
SIMPLE_STATS_ENABLED = True
CONFIGURABLE_LOG_ENABLED = True
CONFIGURABLE_LOG_LEVEL = 'DEBUG'
CONFIGURABLE_LOG_FILE = 'scrapy.log'
ADVANCED_MONITORING_ENABLED = True
MONITORING_INTERVAL = 30
ALERT_THRESHOLD = {
'memory_usage': 85,
'cpu_usage': 80,
'error_rate': 5
}
''',
'priority': '字典的值表示加载顺序,数值越小越先加载;扩展之间通常互不依赖,顺序影响不大'
},
'命令行配置': {
'description': '通过命令行参数配置扩展',
'example': '''
# 启用扩展
scrapy crawl myspider -s EXTENSIONS='{"myproject.extensions.SimpleStatsExtension": 500}'
# 配置扩展参数
scrapy crawl myspider -s SIMPLE_STATS_ENABLED=True -s MONITORING_INTERVAL=60
# 禁用默认扩展
scrapy crawl myspider -s EXTENSIONS='{"scrapy.extensions.logstats.LogStats": None}'
''',
'priority': '命令行参数优先级最高'
},
'自定义设置': {
'description': '在Spider中自定义设置',
'example': '''
class MySpider(scrapy.Spider):
name = 'myspider'
custom_settings = {
'EXTENSIONS': {
'myproject.extensions.SimpleStatsExtension': 500,
},
'SIMPLE_STATS_ENABLED': True,
'MONITORING_INTERVAL': 45,
}
''',
'priority': 'custom_settings 会覆盖项目 settings.py,但会被命令行参数覆盖'
}
}
for method, info in registration_methods.items():
print(f"\n🔧 {method}:")
print(f" 描述: {info['description']}")
print(f" 优先级: {info['priority']}")
print(f" 示例:")
print(info['example'])
def demonstrate_extension_configuration(self):
"""
演示扩展配置
"""
print("\n⚙️ 扩展配置最佳实践:")
configuration_example = '''
# extensions.py - 扩展配置示例
class ConfigurableExtension:
"""
可配置扩展示例
"""
# 默认配置
DEFAULT_CONFIG = {
'enabled': True,
'interval': 60,
'threshold': 10,
'output_file': None,
'debug_mode': False
}
def __init__(self, crawler, **kwargs):
self.crawler = crawler
self.config = self.get_config(crawler.settings, **kwargs)
# 验证配置
self.validate_config()
# 根据配置初始化
self.setup_extension()
@classmethod
def from_crawler(cls, crawler):
"""
从crawler创建扩展实例
"""
# 检查扩展是否启用
if not crawler.settings.getbool('MY_EXTENSION_ENABLED', True):
raise NotConfigured('MyExtension is disabled')
return cls(crawler)
def get_config(self, settings, **kwargs):
"""
获取扩展配置
"""
config = self.DEFAULT_CONFIG.copy()
# 从settings中读取配置
config.update({
'enabled': settings.getbool('MY_EXTENSION_ENABLED', config['enabled']),
'interval': settings.getint('MY_EXTENSION_INTERVAL', config['interval']),
'threshold': settings.getfloat('MY_EXTENSION_THRESHOLD', config['threshold']),
'output_file': settings.get('MY_EXTENSION_OUTPUT_FILE', config['output_file']),
'debug_mode': settings.getbool('MY_EXTENSION_DEBUG', config['debug_mode'])
})
# 从kwargs中读取配置(优先级最高)
config.update(kwargs)
return config
def validate_config(self):
"""
验证配置参数
"""
if self.config['interval'] <= 0:
raise ValueError('Interval must be positive')
if self.config['threshold'] < 0:
raise ValueError('Threshold must be non-negative')
if self.config['output_file']:
import os
output_dir = os.path.dirname(self.config['output_file'])
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
def setup_extension(self):
"""
根据配置设置扩展
"""
if self.config['debug_mode']:
logging.getLogger().setLevel(logging.DEBUG)
# 其他初始化逻辑...
'''
print("📄 配置示例代码:")
print(configuration_example)
def demonstrate_extension_dependencies(self):
"""
演示扩展依赖管理
"""
print("\n🔗 扩展依赖管理:")
dependency_example = '''
class DependentExtension:
"""
有依赖的扩展示例
"""
# 声明依赖
REQUIRED_EXTENSIONS = [
'scrapy.extensions.logstats.LogStats',
'myproject.extensions.SimpleStatsExtension'
]
REQUIRED_PACKAGES = [
'requests>=2.25.0',
'psutil>=5.8.0'
]
def __init__(self, crawler):
self.crawler = crawler
# 检查依赖
self.check_dependencies()
# 获取依赖的扩展
self.stats_extension = self.get_extension('myproject.extensions.SimpleStatsExtension')
@classmethod
def from_crawler(cls, crawler):
"""
从crawler创建扩展实例
"""
return cls(crawler)
def check_dependencies(self):
"""
检查扩展依赖
"""
# 检查扩展依赖
enabled_extensions = self.crawler.settings.get('EXTENSIONS', {})
for required_ext in self.REQUIRED_EXTENSIONS:
if required_ext not in enabled_extensions:
raise NotConfigured(f'Required extension not enabled: {required_ext}')
# 检查包依赖
for package in self.REQUIRED_PACKAGES:
try:
import pkg_resources  # pkg_resources 已逐步弃用,新项目可考虑 importlib.metadata
pkg_resources.require(package)
except pkg_resources.DistributionNotFound:
raise NotConfigured(f'Required package not found: {package}')
except pkg_resources.VersionConflict:
raise NotConfigured(f'Package version conflict: {package}')
def get_extension(self, extension_path):
"""
获取其他扩展实例
"""
# 这里需要通过crawler的扩展管理器获取
# 实际实现可能需要更复杂的逻辑
return None
'''
print("📄 依赖管理示例:")
print(dependency_example)
# 演示扩展配置
config_demo = ExtensionConfigurationDemo()
config_demo.demonstrate_extension_registration()
config_demo.demonstrate_extension_configuration()
config_demo.demonstrate_extension_dependencies()
print("扩展配置和注册演示完成!")
3. 插件开发与管理
3.1 插件架构设计
# 5. 插件架构设计
print("\n🏗️ 插件架构设计:")
import importlib
import inspect
from abc import ABC, abstractmethod
class PluginSystemDemo:
"""
插件系统演示
"""
def demonstrate_plugin_architecture(self):
"""
演示插件架构
"""
print("\n🏛️ 插件架构组件:")
# 插件接口定义
plugin_interface_code = '''
class PluginInterface(ABC):
"""
插件接口基类
"""
@property
@abstractmethod
def name(self):
"""插件名称"""
pass
@property
@abstractmethod
def version(self):
"""插件版本"""
pass
@property
@abstractmethod
def description(self):
"""插件描述"""
pass
@property
def dependencies(self):
"""插件依赖"""
return []
@property
def priority(self):
"""插件优先级"""
return 500
@abstractmethod
def initialize(self, crawler):
"""初始化插件"""
pass
@abstractmethod
def finalize(self, crawler):
"""清理插件"""
pass
def is_enabled(self, settings):
"""检查插件是否启用"""
return settings.getbool(f'{self.name.upper()}_ENABLED', True)
class DataProcessorPlugin(PluginInterface):
"""
数据处理插件接口
"""
@abstractmethod
def process_item(self, item, spider):
"""处理数据项"""
pass
class MiddlewarePlugin(PluginInterface):
"""
中间件插件接口
"""
@abstractmethod
def process_request(self, request, spider):
"""处理请求"""
pass
@abstractmethod
def process_response(self, request, response, spider):
"""处理响应"""
pass
class MonitoringPlugin(PluginInterface):
"""
监控插件接口
"""
@abstractmethod
def collect_metrics(self, spider):
"""收集监控指标"""
pass
@abstractmethod
def send_alert(self, alert_data):
"""发送告警"""
pass
'''
print("📄 插件接口定义:")
print(plugin_interface_code)
def demonstrate_plugin_manager(self):
"""
演示插件管理器
"""
print("\n🎛️ 插件管理器:")
plugin_manager_code = '''
class PluginManager:
"""
插件管理器
"""
def __init__(self, crawler):
self.crawler = crawler
self.plugins = {}
self.plugin_instances = {}
self.plugin_order = []
def discover_plugins(self, plugin_paths=None):
"""
发现插件
"""
plugin_paths = plugin_paths or [
'scrapy_plugins',
'myproject.plugins',
'third_party_plugins'
]
discovered_plugins = {}
for plugin_path in plugin_paths:
try:
# 动态导入插件模块
module = importlib.import_module(plugin_path)
# 查找插件类
for name, obj in inspect.getmembers(module, inspect.isclass):
if (issubclass(obj, PluginInterface) and
obj != PluginInterface and
not inspect.isabstract(obj)):
plugin_instance = obj()
discovered_plugins[plugin_instance.name] = {
'class': obj,
'instance': plugin_instance,
'module': plugin_path
}
self.crawler.spider.logger.info(
f"Discovered plugin: {plugin_instance.name} "
f"v{plugin_instance.version}"
)
except ImportError as e:
self.crawler.spider.logger.warning(
f"Failed to import plugin module {plugin_path}: {e}"
)
return discovered_plugins
def load_plugins(self, plugin_configs=None):
"""
加载插件
"""
# 发现所有可用插件
available_plugins = self.discover_plugins()
# 获取插件配置
plugin_configs = plugin_configs or self.crawler.settings.get('PLUGINS', {})
# 加载启用的插件
for plugin_name, config in plugin_configs.items():
if plugin_name in available_plugins:
plugin_info = available_plugins[plugin_name]
plugin_instance = plugin_info['instance']
# 检查插件是否启用
if plugin_instance.is_enabled(self.crawler.settings):
# 检查依赖
if self.check_dependencies(plugin_instance):
try:
# 初始化插件
plugin_instance.initialize(self.crawler)
# 存储插件
self.plugins[plugin_name] = plugin_info
self.plugin_instances[plugin_name] = plugin_instance
self.crawler.spider.logger.info(
f"Loaded plugin: {plugin_name}"
)
except Exception as e:
self.crawler.spider.logger.error(
f"Failed to initialize plugin {plugin_name}: {e}"
)
else:
self.crawler.spider.logger.warning(
f"Plugin {plugin_name} dependencies not met"
)
else:
self.crawler.spider.logger.info(
f"Plugin {plugin_name} is disabled"
)
else:
self.crawler.spider.logger.warning(
f"Plugin {plugin_name} not found"
)
# 按优先级排序
self.plugin_order = sorted(
self.plugin_instances.keys(),
key=lambda name: self.plugin_instances[name].priority
)
def check_dependencies(self, plugin):
"""
检查插件依赖
"""
for dependency in plugin.dependencies:
if dependency not in self.plugin_instances:
return False
return True
def get_plugin(self, plugin_name):
"""
获取插件实例
"""
return self.plugin_instances.get(plugin_name)
def get_plugins_by_type(self, plugin_type):
"""
按类型获取插件
"""
plugins = []
for plugin in self.plugin_instances.values():
if isinstance(plugin, plugin_type):
plugins.append(plugin)
return plugins
def unload_plugins(self):
"""
卸载所有插件
"""
for plugin_name, plugin in self.plugin_instances.items():
try:
plugin.finalize(self.crawler)
self.crawler.spider.logger.info(f"Unloaded plugin: {plugin_name}")
except Exception as e:
self.crawler.spider.logger.error(
f"Failed to finalize plugin {plugin_name}: {e}"
)
self.plugins.clear()
self.plugin_instances.clear()
self.plugin_order.clear()
'''
print("📄 插件管理器代码:")
print(plugin_manager_code)
def demonstrate_plugin_examples(self):
"""
演示插件示例
"""
print("\n🔌 插件实现示例:")
# 数据清洗插件
data_cleaner_plugin = '''
class DataCleanerPlugin(DataProcessorPlugin):
"""
数据清洗插件
"""
@property
def name(self):
return 'data_cleaner'
@property
def version(self):
return '1.0.0'
@property
def description(self):
return 'Clean and validate scraped data'
def __init__(self):
self.cleaning_rules = {}
self.validation_rules = {}
def initialize(self, crawler):
"""
初始化插件
"""
settings = crawler.settings
# 加载清洗规则
self.cleaning_rules = settings.getdict('DATA_CLEANER_RULES', {})
self.validation_rules = settings.getdict('DATA_VALIDATOR_RULES', {})
crawler.spider.logger.info("DataCleanerPlugin initialized")
def finalize(self, crawler):
"""
清理插件
"""
crawler.spider.logger.info("DataCleanerPlugin finalized")
def process_item(self, item, spider):
"""
处理数据项
"""
# 数据清洗
cleaned_item = self.clean_item(item)
# 数据验证
if self.validate_item(cleaned_item):
return cleaned_item
else:
spider.logger.warning(f"Item validation failed: {dict(item)}")
return None
def clean_item(self, item):
"""
清洗数据项
"""
cleaned_item = item.copy()
for field, rules in self.cleaning_rules.items():
if field in cleaned_item:
value = cleaned_item[field]
# 应用清洗规则
for rule in rules:
if rule == 'strip':
value = value.strip() if isinstance(value, str) else value
elif rule == 'lower':
value = value.lower() if isinstance(value, str) else value
elif rule == 'remove_html':
import re
value = re.sub(r'<[^>]+>', '', value) if isinstance(value, str) else value
elif rule.startswith('regex:'):
import re
pattern = rule[6:]
value = re.sub(pattern, '', value) if isinstance(value, str) else value
cleaned_item[field] = value
return cleaned_item
def validate_item(self, item):
"""
验证数据项
"""
for field, rules in self.validation_rules.items():
if field in item:
value = item[field]
for rule in rules:
if rule == 'required' and not value:
return False
elif rule == 'not_empty' and not str(value).strip():
return False
elif rule.startswith('min_length:'):
min_len = int(rule[11:])
if len(str(value)) < min_len:
return False
elif rule.startswith('max_length:'):
max_len = int(rule[11:])
if len(str(value)) > max_len:
return False
return True
'''
# 缓存插件
cache_plugin = '''
class CachePlugin(MiddlewarePlugin):
"""
缓存插件
"""
@property
def name(self):
return 'cache'
@property
def version(self):
return '1.0.0'
@property
def description(self):
return 'Cache responses to improve performance'
def __init__(self):
self.cache = {}
self.cache_enabled = True
self.cache_ttl = 3600 # 1小时
def initialize(self, crawler):
"""
初始化插件
"""
settings = crawler.settings
self.cache_enabled = settings.getbool('CACHE_ENABLED', True)
self.cache_ttl = settings.getint('CACHE_TTL', 3600)
# 可以集成Redis等外部缓存
cache_backend = settings.get('CACHE_BACKEND', 'memory')
if cache_backend == 'redis':
self.setup_redis_cache(settings)
crawler.spider.logger.info("CachePlugin initialized")
def finalize(self, crawler):
"""
清理插件
"""
self.cache.clear()
crawler.spider.logger.info("CachePlugin finalized")
def process_request(self, request, spider):
"""
处理请求 - 检查缓存
"""
if not self.cache_enabled:
return None
cache_key = self.get_cache_key(request)
cached_response = self.get_cached_response(cache_key)
if cached_response:
spider.logger.debug(f"Cache hit for {request.url}")
return cached_response
return None
def process_response(self, request, response, spider):
"""
处理响应 - 存储到缓存
"""
if not self.cache_enabled:
return response
if response.status == 200:
cache_key = self.get_cache_key(request)
self.cache_response(cache_key, response)
spider.logger.debug(f"Cached response for {request.url}")
return response
def get_cache_key(self, request):
"""
生成缓存键
"""
import hashlib
# 使用URL和请求头生成缓存键
key_data = f"{request.url}:{request.headers}"
return hashlib.md5(key_data.encode()).hexdigest()
def get_cached_response(self, cache_key):
"""
获取缓存的响应
"""
if cache_key in self.cache:
cached_data = self.cache[cache_key]
# 检查是否过期
import time
if time.time() - cached_data['timestamp'] < self.cache_ttl:
return cached_data['response']
else:
# 删除过期缓存
del self.cache[cache_key]
return None
def cache_response(self, cache_key, response):
"""
缓存响应
"""
import time
self.cache[cache_key] = {
'response': response,
'timestamp': time.time()
}
def setup_redis_cache(self, settings):
"""
设置Redis缓存
"""
try:
import redis
redis_host = settings.get('REDIS_HOST', 'localhost')
redis_port = settings.getint('REDIS_PORT', 6379)
redis_db = settings.getint('REDIS_DB', 0)
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db
)
except ImportError:
raise NotConfigured('Redis not available for caching')
'''
print("📄 数据清洗插件:")
print(data_cleaner_plugin)
print("\n📄 缓存插件:")
print(cache_plugin)
# 演示插件系统
plugin_demo = PluginSystemDemo()
plugin_demo.demonstrate_plugin_architecture()
plugin_demo.demonstrate_plugin_manager()
plugin_demo.demonstrate_plugin_examples()
print("插件架构设计演示完成!")
3.2 插件分发与管理
# 6. 插件分发与管理
print("\n📦 插件分发与管理:")
class PluginDistributionDemo:
"""
插件分发演示
"""
def demonstrate_plugin_packaging(self):
"""
演示插件打包
"""
print("\n📦 插件打包:")
# setup.py示例
setup_py_example = '''
# setup.py - 插件打包配置
from setuptools import setup, find_packages
setup(
name='scrapy-data-cleaner',
version='1.0.0',
description='A Scrapy plugin for data cleaning and validation',
long_description=open('README.md').read(),
long_description_content_type='text/markdown',
author='Your Name',
author_email='your.email@example.com',
url='https://github.com/yourusername/scrapy-data-cleaner',
packages=find_packages(),
classifiers=[
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Framework :: Scrapy',
],
python_requires='>=3.7',
install_requires=[
'scrapy>=2.0.0',
'lxml>=4.6.0',
],
extras_require={
'dev': [
'pytest>=6.0.0',
'pytest-cov>=2.10.0',
'black>=21.0.0',
'flake8>=3.8.0',
],
'redis': [
'redis>=3.5.0',
],
},
entry_points={
'scrapy.plugins': [
'data_cleaner = scrapy_data_cleaner.plugins:DataCleanerPlugin',
'cache = scrapy_data_cleaner.plugins:CachePlugin',
],
},
include_package_data=True,
package_data={
'scrapy_data_cleaner': ['config/*.yaml', 'templates/*.html'],
},
)
'''
# 项目结构
project_structure = '''
scrapy-data-cleaner/
├── scrapy_data_cleaner/
│ ├── __init__.py
│ ├── plugins/
│ │ ├── __init__.py
│ │ ├── data_cleaner.py
│ │ └── cache.py
│ ├── config/
│ │ └── default.yaml
│ └── utils/
│ ├── __init__.py
│ └── validators.py
├── tests/
│ ├── __init__.py
│ ├── test_data_cleaner.py
│ └── test_cache.py
├── docs/
│ ├── index.md
│ └── api.md
├── setup.py
├── README.md
├── LICENSE
├── MANIFEST.in
└── requirements.txt
'''
# MANIFEST.in示例
manifest_example = '''
# MANIFEST.in - 包含额外文件
include README.md
include LICENSE
include requirements.txt
recursive-include scrapy_data_cleaner/config *.yaml
recursive-include scrapy_data_cleaner/templates *.html
recursive-include docs *.md
recursive-exclude * __pycache__
recursive-exclude * *.py[co]
'''
print("📄 setup.py配置:")
print(setup_py_example)
print("\n📁 项目结构:")
print(project_structure)
print("\n📄 MANIFEST.in:")
print(manifest_example)
def demonstrate_plugin_registry(self):
"""
演示插件注册表
"""
print("\n📋 插件注册表:")
registry_code = '''
class PluginRegistry:
"""
插件注册表
"""
def __init__(self):
self.plugins = {}
self.categories = {}
self.dependencies = {}
def register_plugin(self, plugin_class, category=None):
"""
注册插件
"""
plugin_instance = plugin_class()
plugin_name = plugin_instance.name
# 检查插件是否已注册
if plugin_name in self.plugins:
raise ValueError(f"Plugin {plugin_name} already registered")
# 注册插件
self.plugins[plugin_name] = {
'class': plugin_class,
'instance': plugin_instance,
'category': category or 'general',
'version': plugin_instance.version,
'description': plugin_instance.description,
'dependencies': plugin_instance.dependencies
}
# 按类别分组
if category:
if category not in self.categories:
self.categories[category] = []
self.categories[category].append(plugin_name)
# 记录依赖关系
self.dependencies[plugin_name] = plugin_instance.dependencies
def get_plugin(self, plugin_name):
"""
获取插件信息
"""
return self.plugins.get(plugin_name)
def get_plugins_by_category(self, category):
"""
按类别获取插件
"""
return self.categories.get(category, [])
def list_plugins(self):
"""
列出所有插件
"""
return list(self.plugins.keys())
def check_dependencies(self, plugin_name):
"""
检查插件依赖
"""
if plugin_name not in self.plugins:
return False, f"Plugin {plugin_name} not found"
dependencies = self.dependencies.get(plugin_name, [])
missing_deps = []
for dep in dependencies:
if dep not in self.plugins:
missing_deps.append(dep)
if missing_deps:
return False, f"Missing dependencies: {missing_deps}"
return True, "All dependencies satisfied"
def get_dependency_order(self, plugin_names):
"""
获取依赖顺序
"""
# 拓扑排序算法
visited = set()
temp_visited = set()
order = []
def visit(plugin_name):
if plugin_name in temp_visited:
raise ValueError(f"Circular dependency detected: {plugin_name}")
if plugin_name not in visited:
temp_visited.add(plugin_name)
# 访问依赖
for dep in self.dependencies.get(plugin_name, []):
if dep in plugin_names:
visit(dep)
temp_visited.remove(plugin_name)
visited.add(plugin_name)
order.append(plugin_name)
for plugin_name in plugin_names:
if plugin_name not in visited:
visit(plugin_name)
return order
# 全局插件注册表
plugin_registry = PluginRegistry()
# 插件注册装饰器
def register_plugin(category=None):
"""
插件注册装饰器
"""
def decorator(plugin_class):
plugin_registry.register_plugin(plugin_class, category)
return plugin_class
return decorator
# 使用示例
@register_plugin(category='data_processing')
class DataCleanerPlugin(DataProcessorPlugin):
# 插件实现...
pass
@register_plugin(category='middleware')
class CachePlugin(MiddlewarePlugin):
# 插件实现...
pass
'''
print("📄 插件注册表代码:")
print(registry_code)
def demonstrate_plugin_installation(self):
"""
演示插件安装
"""
print("\n💾 插件安装方法:")
installation_methods = {
'pip安装': {
'description': '通过pip从PyPI安装',
'commands': [
'pip install scrapy-data-cleaner',
'pip install scrapy-data-cleaner[redis] # 安装额外依赖',
'pip install scrapy-data-cleaner==1.0.0 # 指定版本'
]
},
'开发安装': {
'description': '从源码安装用于开发',
'commands': [
'git clone https://github.com/user/scrapy-data-cleaner.git',
'cd scrapy-data-cleaner',
'pip install -e . # 可编辑安装',
'pip install -e .[dev] # 安装开发依赖'
]
},
'本地安装': {
'description': '从本地文件安装',
'commands': [
'pip install ./scrapy-data-cleaner',
'pip install scrapy-data-cleaner-1.0.0.tar.gz',
'pip install scrapy-data-cleaner-1.0.0-py3-none-any.whl'
]
}
}
for method, info in installation_methods.items():
print(f"\n🔧 {method}:")
print(f" 描述: {info['description']}")
print(f" 命令:")
for command in info['commands']:
print(f" $ {command}")
# 配置示例
configuration_example = '''
# settings.py - 插件配置
PLUGINS = {
'data_cleaner': {
'enabled': True,
'priority': 100,
},
'cache': {
'enabled': True,
'priority': 200,
},
}
# 插件特定配置
DATA_CLEANER_RULES = {
'title': ['strip', 'remove_html'],
'description': ['strip', 'regex:\\s+'],
}
DATA_VALIDATOR_RULES = {
'title': ['required', 'min_length:5'],
'price': ['required'],
}
CACHE_ENABLED = True
CACHE_TTL = 3600
CACHE_BACKEND = 'redis'
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
'''
print("\n⚙️ 插件配置示例:")
print(configuration_example)
# 演示插件分发
distribution_demo = PluginDistributionDemo()
distribution_demo.demonstrate_plugin_packaging()
distribution_demo.demonstrate_plugin_registry()
distribution_demo.demonstrate_plugin_installation()
print("插件分发与管理演示完成!")
4. 高级扩展开发技巧
4.1 性能优化技巧
# 7. 性能优化技巧
print("\n⚡ 性能优化技巧:")
class PerformanceOptimizationDemo:
"""
性能优化演示
"""
def demonstrate_async_extensions(self):
"""
演示异步扩展
"""
print("\n🔄 异步扩展开发:")
async_extension_code = '''
from twisted.internet import defer, reactor
from twisted.internet.defer import inlineCallbacks
class AsyncMonitoringExtension:
"""
异步监控扩展
"""
def __init__(self, crawler):
self.crawler = crawler
self.monitoring_tasks = []
self.is_running = False
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def spider_opened(self, spider):
"""
爬虫开启时启动异步监控
"""
self.is_running = True
spider.logger.info("Starting async monitoring")
# 启动多个异步监控任务(调用 inlineCallbacks 方法会立即返回 Deferred)
self.monitoring_tasks = [
self.monitor_memory(),
self.monitor_network(),
self.monitor_performance()
]
# 注意: 不要在这里 yield 等待这些长期任务,否则 spider_opened 会一直阻塞爬虫启动
defer.DeferredList(self.monitoring_tasks)
def spider_closed(self, spider, reason):
"""
爬虫关闭时停止监控
"""
self.is_running = False
spider.logger.info("Stopping async monitoring")
# 取消所有监控任务
for task in self.monitoring_tasks:
if not task.called:
task.cancel()
@inlineCallbacks
def monitor_memory(self):
"""
异步内存监控
"""
import psutil
while self.is_running:
try:
memory_usage = psutil.virtual_memory().percent
if memory_usage > 90:
yield self.send_alert({
'type': 'memory',
'value': memory_usage,
'message': f'High memory usage: {memory_usage:.1f}%'
})
# 等待30秒后继续监控
yield self.sleep(30)
except Exception as e:
self.crawler.spider.logger.error(f"Memory monitoring error: {e}")
yield self.sleep(60) # 错误时等待更长时间
@inlineCallbacks
def monitor_network(self):
"""
异步网络监控
"""
while self.is_running:
try:
# 检查网络连接
yield self.check_network_connectivity()
# 监控下载速度
yield self.monitor_download_speed()
yield self.sleep(60)
except Exception as e:
self.crawler.spider.logger.error(f"Network monitoring error: {e}")
yield self.sleep(120)
@inlineCallbacks
def monitor_performance(self):
"""
异步性能监控
"""
while self.is_running:
try:
# 收集性能指标
stats = self.crawler.stats
requests_count = stats.get_value('downloader/request_count', 0)
response_count = stats.get_value('downloader/response_count', 0)
if requests_count > 0:
success_rate = (response_count / requests_count) * 100
if success_rate < 80:
yield self.send_alert({
'type': 'performance',
'value': success_rate,
'message': f'Low success rate: {success_rate:.1f}%'
})
yield self.sleep(120)
except Exception as e:
self.crawler.spider.logger.error(f"Performance monitoring error: {e}")
yield self.sleep(180)
@inlineCallbacks
def check_network_connectivity(self):
"""
检查网络连接
"""
# 注: getPage 在较新的 Twisted 版本中已被弃用,建议改用 treq 或 twisted.web.client.Agent
from twisted.web.client import getPage
try:
yield getPage(b'http://httpbin.org/status/200', timeout=10)
except Exception as e:
yield self.send_alert({
'type': 'network',
'message': f'Network connectivity issue: {e}'
})
def monitor_download_speed(self):
"""
监控下载速度(只是同步读取统计,无需 inlineCallbacks)
"""
stats = self.crawler.stats
# 获取下载统计
bytes_downloaded = stats.get_value('downloader/response_bytes', 0)
# 这里可以实现更复杂的速度计算逻辑
# ...
return defer.succeed(bytes_downloaded)
@inlineCallbacks
def send_alert(self, alert_data):
"""
发送告警
"""
from twisted.web.client import getPage
import json
webhook_url = self.crawler.settings.get('MONITORING_WEBHOOK_URL')
if webhook_url:
try:
alert_json = json.dumps(alert_data).encode('utf-8')
yield getPage(
webhook_url.encode('utf-8'),
method=b'POST',
postdata=alert_json,
headers={b'Content-Type': [b'application/json']},
timeout=10
)
except Exception as e:
self.crawler.spider.logger.error(f"Failed to send alert: {e}")
def sleep(self, seconds):
"""
异步睡眠
"""
d = defer.Deferred()
reactor.callLater(seconds, d.callback, None)
return d
'''
print("📄 异步扩展代码:")
print(async_extension_code)
def demonstrate_memory_optimization(self):
"""
演示内存优化
"""
print("\n💾 内存优化技巧:")
memory_optimization_code = '''
import time
import weakref
from collections import deque
class MemoryOptimizedExtension:
"""
内存优化扩展
"""
def __init__(self, crawler):
self.crawler = crawler
# 使用弱引用避免循环引用
self._spider_ref = None
# 使用deque限制内存中的数据量
self.recent_requests = deque(maxlen=1000)
self.recent_responses = deque(maxlen=1000)
# 注意: __slots__ 必须声明在类级别(class 体内)才能减少内存占用,例如:
# __slots__ = ('crawler', '_spider_ref', 'recent_requests', 'recent_responses')
# 在 __init__ 中给 self.__slots__ 赋值不会生效
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def spider_opened(self, spider):
"""
使用弱引用存储spider
"""
self._spider_ref = weakref.ref(spider)
def request_scheduled(self, request, spider):
"""
高效存储请求信息
"""
# 只存储必要信息,避免存储整个request对象
request_info = {
'url': request.url,
'method': request.method,
'timestamp': time.time()
}
self.recent_requests.append(request_info)
def response_received(self, response, request, spider):
"""
高效存储响应信息
"""
# 只存储关键信息
response_info = {
'url': response.url,
'status': response.status,
'size': len(response.body),
'timestamp': time.time()
}
self.recent_responses.append(response_info)
def get_spider(self):
"""
安全获取spider引用
"""
if self._spider_ref:
return self._spider_ref()
return None
def cleanup_old_data(self):
"""
清理旧数据
"""
current_time = time.time()
cutoff_time = current_time - 3600 # 1小时前
# 清理旧请求
while (self.recent_requests and
self.recent_requests[0]['timestamp'] < cutoff_time):
self.recent_requests.popleft()
# 清理旧响应
while (self.recent_responses and
self.recent_responses[0]['timestamp'] < cutoff_time):
self.recent_responses.popleft()
class EfficientDataProcessor:
"""
高效数据处理器
"""
def __init__(self):
# 使用生成器处理大量数据
self.data_generator = None
# 使用对象池重用对象
self.object_pool = []
def process_large_dataset(self, data_source):
"""
使用生成器处理大数据集
"""
def data_generator():
for chunk in self.read_data_chunks(data_source):
# 逐块处理数据,避免一次性加载到内存
processed_chunk = self.process_chunk(chunk)
yield processed_chunk
return data_generator()
def read_data_chunks(self, data_source, chunk_size=1000):
"""
分块读取数据
"""
chunk = []
for item in data_source:
chunk.append(item)
if len(chunk) >= chunk_size:
yield chunk
chunk = []
# 处理最后一块
if chunk:
yield chunk
def process_chunk(self, chunk):
"""
处理数据块
"""
processed_items = []
for item in chunk:
# 从对象池获取处理器
processor = self.get_processor()
try:
processed_item = processor.process(item)
processed_items.append(processed_item)
finally:
# 归还处理器到对象池
self.return_processor(processor)
return processed_items
def get_processor(self):
"""
从对象池获取处理器
"""
if self.object_pool:
return self.object_pool.pop()
else:
return ItemProcessor() # 创建新的处理器
def return_processor(self, processor):
"""
归还处理器到对象池
"""
processor.reset() # 重置处理器状态
self.object_pool.append(processor)
class ItemProcessor:
"""
数据项处理器
"""
def __init__(self):
self.reset()
def process(self, item):
"""
处理数据项
"""
# 处理逻辑...
return item
def reset(self):
"""
重置处理器状态
"""
# 清理状态...
pass
'''
print("📄 内存优化代码:")
print(memory_optimization_code)
def demonstrate_caching_strategies(self):
"""
演示缓存策略
"""
print("\n🗄️ 缓存策略:")
caching_strategies_code = '''
import time
import hashlib
from functools import lru_cache, wraps
class CachingStrategiesExtension:
"""
缓存策略扩展
"""
def __init__(self, crawler):
self.crawler = crawler
# 多级缓存
self.l1_cache = {} # 内存缓存
self.l2_cache = None # Redis缓存
self.l3_cache = None # 文件缓存
# 缓存统计
self.cache_stats = {
'l1_hits': 0,
'l2_hits': 0,
'l3_hits': 0,
'misses': 0
}
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def spider_opened(self, spider):
"""
初始化缓存后端
"""
settings = self.crawler.settings  # 信号处理器中没有 crawler 局部变量,需通过 self 访问
# 初始化Redis缓存
if settings.getbool('CACHE_REDIS_ENABLED', False):
self.setup_redis_cache(settings)
# 初始化文件缓存
if settings.getbool('CACHE_FILE_ENABLED', False):
self.setup_file_cache(settings)
def setup_redis_cache(self, settings):
"""
设置Redis缓存
"""
try:
import redis
self.l2_cache = redis.Redis(
host=settings.get('REDIS_HOST', 'localhost'),
port=settings.getint('REDIS_PORT', 6379),
db=settings.getint('REDIS_DB', 0),
decode_responses=True
)
except ImportError:
self.crawler.spider.logger.warning("Redis not available")
def setup_file_cache(self, settings):
"""
设置文件缓存
"""
import os
cache_dir = settings.get('CACHE_FILE_DIR', '.cache')
os.makedirs(cache_dir, exist_ok=True)
self.l3_cache = FileCacheBackend(cache_dir)
def get_cache_key(self, request):
"""
生成缓存键
"""
# 使用URL、方法和关键头部生成缓存键
key_data = f"{request.url}:{request.method}"
# 包含重要的请求头
important_headers = ['User-Agent', 'Accept', 'Authorization']
for header in important_headers:
if header in request.headers:
key_data += f":{request.headers[header]}"
return hashlib.md5(key_data.encode()).hexdigest()
def get_cached_response(self, cache_key):
"""
多级缓存查找
"""
# L1缓存查找(内存)
if cache_key in self.l1_cache:
cached_data = self.l1_cache[cache_key]
if not self.is_expired(cached_data):
self.cache_stats['l1_hits'] += 1
return cached_data['response']
else:
del self.l1_cache[cache_key]
# L2缓存查找(Redis)
if self.l2_cache:
try:
cached_data = self.l2_cache.get(cache_key)
if cached_data:
import pickle
cached_data = pickle.loads(cached_data.encode('latin1'))
if not self.is_expired(cached_data):
# 提升到L1缓存
self.l1_cache[cache_key] = cached_data
self.cache_stats['l2_hits'] += 1
return cached_data['response']
else:
self.l2_cache.delete(cache_key)
except Exception as e:
self.crawler.spider.logger.warning(f"Redis cache error: {e}")
# L3缓存查找(文件)
if self.l3_cache:
cached_data = self.l3_cache.get(cache_key)
if cached_data and not self.is_expired(cached_data):
# 提升到上级缓存
self.l1_cache[cache_key] = cached_data
if self.l2_cache:
try:
import pickle
self.l2_cache.setex(
cache_key,
3600, # 1小时TTL
pickle.dumps(cached_data).decode('latin1')
)
except Exception:
pass
self.cache_stats['l3_hits'] += 1
return cached_data['response']
# 缓存未命中
self.cache_stats['misses'] += 1
return None
def cache_response(self, cache_key, response, ttl=3600):
"""
多级缓存存储
"""
cached_data = {
'response': response,
'timestamp': time.time(),
'ttl': ttl
}
# 存储到L1缓存
self.l1_cache[cache_key] = cached_data
# 存储到L2缓存
if self.l2_cache:
try:
import pickle
self.l2_cache.setex(
cache_key,
ttl,
pickle.dumps(cached_data).decode('latin1')
)
except Exception as e:
self.crawler.spider.logger.warning(f"Redis cache store error: {e}")
# 存储到L3缓存
if self.l3_cache:
self.l3_cache.set(cache_key, cached_data)
def is_expired(self, cached_data):
"""
检查缓存是否过期
"""
return (time.time() - cached_data['timestamp']) > cached_data['ttl']
@lru_cache(maxsize=1000)
def expensive_computation(self, data):
"""
使用LRU缓存装饰器缓存计算结果
"""
# 模拟昂贵的计算
import hashlib
return hashlib.sha256(data.encode()).hexdigest()
def cache_with_timeout(self, timeout=300):
"""
带超时的缓存装饰器
"""
def decorator(func):
cache = {}
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存键
key = str(args) + str(sorted(kwargs.items()))
# 检查缓存
if key in cache:
result, timestamp = cache[key]
if time.time() - timestamp < timeout:
return result
else:
del cache[key]
# 执行函数并缓存结果
result = func(*args, **kwargs)
cache[key] = (result, time.time())
return result
return wrapper
return decorator
class FileCacheBackend:
"""
文件缓存后端
"""
def __init__(self, cache_dir):
self.cache_dir = cache_dir
def get(self, key):
"""
获取缓存
"""
import os
import pickle
cache_file = os.path.join(self.cache_dir, f"{key}.cache")
if os.path.exists(cache_file):
try:
with open(cache_file, 'rb') as f:
return pickle.load(f)
except Exception:
# 删除损坏的缓存文件
os.remove(cache_file)
return None
def set(self, key, data):
"""
设置缓存
"""
import os
import pickle
cache_file = os.path.join(self.cache_dir, f"{key}.cache")
try:
with open(cache_file, 'wb') as f:
pickle.dump(data, f)
except Exception as e:
# 记录错误但不抛出异常
pass
'''
print("📄 缓存策略代码:")
print(caching_strategies_code)
# 演示性能优化
perf_demo = PerformanceOptimizationDemo()
perf_demo.demonstrate_async_extensions()
perf_demo.demonstrate_memory_optimization()
perf_demo.demonstrate_caching_strategies()
print("性能优化技巧演示完成!")
4.2 调试与测试技巧
# 8. 调试与测试技巧
print("\n🐛 调试与测试技巧:")
class DebuggingTestingDemo:
"""
调试与测试演示
"""
def demonstrate_extension_debugging(self):
"""
演示扩展调试
"""
print("\n🔍 扩展调试技巧:")
debugging_code = '''
import logging
import traceback
import functools
from scrapy.utils.log import configure_logging
class DebuggableExtension:
"""
可调试的扩展基类
"""
def __init__(self, crawler):
self.crawler = crawler
self.logger = logging.getLogger(self.__class__.__name__)
# 设置调试模式
self.debug_mode = crawler.settings.getbool('DEBUG_EXTENSIONS', False)
if self.debug_mode:
self.setup_debug_logging()
def setup_debug_logging(self):
"""
设置调试日志
"""
# 创建详细的日志格式
formatter = logging.Formatter(
'%(asctime)s [%(name)s] %(levelname)s: %(message)s'
)
# 创建文件处理器
file_handler = logging.FileHandler(f'{self.__class__.__name__}.debug.log')
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.DEBUG)
self.logger.addHandler(file_handler)
self.logger.setLevel(logging.DEBUG)
def debug_method(self, method_name):
"""
方法调试装饰器
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
if self.debug_mode:
self.logger.debug(f"Entering {method_name} with args: {args[1:]}, kwargs: {kwargs}")
try:
result = func(*args, **kwargs)
if self.debug_mode:
self.logger.debug(f"Exiting {method_name} with result: {result}")
return result
except Exception as e:
self.logger.error(f"Error in {method_name}: {e}")
self.logger.error(f"Traceback: {traceback.format_exc()}")
raise
return wrapper
return decorator
def log_stats(self):
"""
记录统计信息
"""
if self.debug_mode:
stats = self.crawler.stats
self.logger.debug("Current stats:")
for key, value in stats.get_stats().items():
self.logger.debug(f" {key}: {value}")
class DebugMonitoringExtension(DebuggableExtension):
"""
调试监控扩展示例
"""
def __init__(self, crawler):
super().__init__(crawler)
self.request_count = 0
self.response_count = 0
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
def spider_opened(self, spider):
"""
爬虫开启
"""
self.debug_method('spider_opened')(self._spider_opened)(spider)
def _spider_opened(self, spider):
self.logger.info(f"Spider {spider.name} opened")
self.log_stats()
def request_scheduled(self, request, spider):
"""
请求调度
"""
self.debug_method('request_scheduled')(self._request_scheduled)(request, spider)
def _request_scheduled(self, request, spider):
self.request_count += 1
if self.debug_mode:
self.logger.debug(f"Request scheduled: {request.url}")
self.logger.debug(f"Total requests: {self.request_count}")
def response_received(self, response, request, spider):
"""
响应接收
"""
self.debug_method('response_received')(self._response_received)(response, request, spider)
def _response_received(self, response, request, spider):
self.response_count += 1
if self.debug_mode:
self.logger.debug(f"Response received: {response.url} (status: {response.status})")
self.logger.debug(f"Total responses: {self.response_count}")
# 记录响应头
for header, value in response.headers.items():
self.logger.debug(f" {header.decode()}: {value[0].decode()}")
def spider_closed(self, spider, reason):
"""
爬虫关闭
"""
self.debug_method('spider_closed')(self._spider_closed)(spider, reason)
def _spider_closed(self, spider, reason):
self.logger.info(f"Spider {spider.name} closed: {reason}")
self.logger.info(f"Final stats - Requests: {self.request_count}, Responses: {self.response_count}")
self.log_stats()
# 性能分析装饰器
def profile_method(func):
"""
性能分析装饰器
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
import time
import psutil
import os
# 记录开始时间和内存
start_time = time.time()
process = psutil.Process(os.getpid())
start_memory = process.memory_info().rss / 1024 / 1024 # MB
try:
result = func(*args, **kwargs)
# 记录结束时间和内存
end_time = time.time()
end_memory = process.memory_info().rss / 1024 / 1024 # MB
# 计算性能指标
execution_time = end_time - start_time
memory_delta = end_memory - start_memory
# 记录性能信息
logger = logging.getLogger(func.__module__)
logger.info(f"Performance - {func.__name__}:")
logger.info(f" Execution time: {execution_time:.3f}s")
logger.info(f" Memory usage: {memory_delta:+.2f}MB")
return result
except Exception as e:
logger = logging.getLogger(func.__module__)
logger.error(f"Error in {func.__name__}: {e}")
raise
return wrapper
'''
print("📄 调试技巧代码:")
print(debugging_code)
def demonstrate_unit_testing(self):
"""
演示单元测试
"""
print("\n🧪 单元测试:")
testing_code = '''
import unittest
from unittest.mock import Mock, patch, MagicMock
from scrapy.http import Request, Response
from scrapy.utils.test import get_crawler
class TestExtensionBase(unittest.TestCase):
"""
扩展测试基类
"""
def setUp(self):
"""
测试设置
"""
self.crawler = get_crawler()
self.spider = Mock()
self.spider.name = 'test_spider'
self.spider.logger = Mock()
def create_request(self, url='http://example.com', **kwargs):
"""
创建测试请求
"""
return Request(url, **kwargs)
def create_response(self, url='http://example.com', status=200, body=b'', **kwargs):
"""
创建测试响应
"""
return Response(url, status=status, body=body, **kwargs)
class TestMonitoringExtension(TestExtensionBase):
"""
监控扩展测试
"""
def setUp(self):
super().setUp()
self.extension = MonitoringExtension.from_crawler(self.crawler)
def test_spider_opened(self):
"""
测试爬虫开启
"""
# 执行方法
self.extension.spider_opened(self.spider)
# 验证结果
self.assertTrue(self.extension.is_monitoring)
self.spider.logger.info.assert_called_with('Monitoring started')
def test_request_scheduled(self):
"""
测试请求调度
"""
request = self.create_request()
# 执行方法
self.extension.request_scheduled(request, self.spider)
# 验证结果
self.assertEqual(self.extension.request_count, 1)
def test_response_received(self):
"""
测试响应接收
"""
request = self.create_request()
response = self.create_response()
# 执行方法
self.extension.response_received(response, request, self.spider)
# 验证结果
self.assertEqual(self.extension.response_count, 1)
@patch('time.time')
def test_performance_monitoring(self, mock_time):
"""
测试性能监控
"""
# 模拟时间
mock_time.side_effect = [1000, 1005] # 5秒间隔
request = self.create_request()
response = self.create_response()
# 执行方法
self.extension.request_scheduled(request, self.spider)
self.extension.response_received(response, request, self.spider)
# 验证性能计算
performance_data = self.extension.get_performance_data()
self.assertEqual(performance_data['avg_response_time'], 5.0)
def test_error_handling(self):
"""
测试错误处理
"""
# 模拟错误情况
with patch.object(self.extension, 'send_alert') as mock_alert:
# 触发错误条件
self.extension.check_error_rate(error_rate=0.5) # 50%错误率
# 验证告警被发送
mock_alert.assert_called_once()
class TestDataProcessingExtension(TestExtensionBase):
"""
数据处理扩展测试
"""
def setUp(self):
super().setUp()
# 设置测试配置
self.crawler.settings.set('DATA_CLEANER_RULES', {
'title': ['strip', 'remove_html'],
'price': ['strip']
})
self.extension = DataCleanerPlugin.from_crawler(self.crawler)
def test_data_cleaning(self):
"""
测试数据清洗
"""
# 创建测试数据
item = {
'title': ' <h1>Test Title</h1> ',
'price': ' $19.99 '
}
# 执行清洗
cleaned_item = self.extension.clean_item(item)
# 验证结果
self.assertEqual(cleaned_item['title'], 'Test Title')
self.assertEqual(cleaned_item['price'], '$19.99')
def test_data_validation(self):
"""
测试数据验证
"""
# 设置验证规则
self.extension.validation_rules = {
'title': ['required', 'min_length:5'],
'price': ['required']
}
# 测试有效数据
valid_item = {'title': 'Valid Title', 'price': '$19.99'}
self.assertTrue(self.extension.validate_item(valid_item))
# 测试无效数据
invalid_item = {'title': 'Hi', 'price': ''}
self.assertFalse(self.extension.validate_item(invalid_item))
def test_process_item(self):
"""
测试数据项处理
"""
item = {
'title': ' <h1>Test Title</h1> ',
'price': ' $19.99 '
}
# 执行处理
result = self.extension.process_item(item, self.spider)
# 验证结果
self.assertIsNotNone(result)
self.assertEqual(result['title'], 'Test Title')
# 集成测试
class TestExtensionIntegration(unittest.TestCase):
"""
扩展集成测试
"""
def test_extension_loading(self):
"""
测试扩展加载
"""
from scrapy.crawler import CrawlerProcess
# 创建爬虫进程
process = CrawlerProcess({
'EXTENSIONS': {
'myproject.extensions.MonitoringExtension': 500,
}
})
# 验证扩展被正确加载
crawler = process.create_crawler('test_spider')
self.assertIn('myproject.extensions.MonitoringExtension',
crawler.extensions.middlewares)
@patch('scrapy.crawler.CrawlerRunner.crawl')
def test_extension_lifecycle(self, mock_crawl):
"""
测试扩展生命周期
"""
from scrapy.crawler import CrawlerRunner
# 创建爬虫运行器
runner = CrawlerRunner({
'EXTENSIONS': {
'myproject.extensions.MonitoringExtension': 500,
}
})
# 模拟爬虫运行
mock_crawl.return_value = Mock()
# 验证扩展生命周期方法被调用
# 这里需要根据具体的扩展实现来验证
# 测试运行器
def run_tests():
"""
运行所有测试
"""
# 创建测试套件
test_suite = unittest.TestSuite()
# 添加测试用例(unittest.makeSuite 已弃用,改用 TestLoader 加载)
loader = unittest.TestLoader()
test_suite.addTest(loader.loadTestsFromTestCase(TestMonitoringExtension))
test_suite.addTest(loader.loadTestsFromTestCase(TestDataProcessingExtension))
test_suite.addTest(loader.loadTestsFromTestCase(TestExtensionIntegration))
# 运行测试
runner = unittest.TextTestRunner(verbosity=2)
result = runner.run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
success = run_tests()
exit(0 if success else 1)
'''
print("📄 单元测试代码:")
print(testing_code)
def demonstrate_integration_testing(self):
"""
演示集成测试
"""
print("\n🔗 集成测试:")
integration_testing_code = '''
import scrapy
import pytest
from scrapy.crawler import CrawlerProcess
from scrapy.utils.test import get_crawler
from scrapy.http import Request, Response
from twisted.internet import defer
from twisted.trial import unittest
# 下面的测试会直接引用这两个扩展类(与settings中EXTENSIONS配置的路径一致)
from myproject.extensions import MonitoringExtension, DataCleanerPlugin
class IntegrationTestCase(unittest.TestCase):
"""
集成测试基类
"""
def setUp(self):
"""
设置测试环境
"""
# 把spider类直接绑定到Crawler上,后面调用crawl()时无需再传spider
self.crawler = get_crawler(self.create_test_spider(), settings_dict={
'EXTENSIONS': {
'myproject.extensions.MonitoringExtension': 500,
'myproject.extensions.DataCleanerPlugin': 600,
},
'DEBUG_EXTENSIONS': True,
})
@defer.inlineCallbacks
def test_full_crawling_pipeline(self):
"""
测试完整的爬取流程
"""
# 启动爬虫(Crawler.crawl()会根据setUp中绑定的spider类自行创建spider实例)
yield self.crawler.crawl()
# 验证扩展工作正常
stats = self.crawler.stats
# 检查监控扩展统计
self.assertGreater(stats.get_value('monitoring/requests_monitored', 0), 0)
# 检查数据清洗扩展统计
self.assertGreater(stats.get_value('data_cleaner/items_processed', 0), 0)
def create_test_spider(self):
"""
创建测试爬虫类(返回类本身,供get_crawler绑定)
"""
from scrapy import Spider
class TestSpider(Spider):
name = 'test_spider'
start_urls = ['http://httpbin.org/html']
def parse(self, response):
yield {
'title': response.css('title::text').get(),
'url': response.url
}
return TestSpider
@defer.inlineCallbacks
def test_extension_interaction(self):
"""
测试扩展间交互
"""
# 模拟扩展间的数据传递(middlewares是扩展实例列表,需按类型查找)
monitoring_ext = next(
ext for ext in self.crawler.extensions.middlewares
if isinstance(ext, MonitoringExtension)
)
data_cleaner_ext = next(
ext for ext in self.crawler.extensions.middlewares
if isinstance(ext, DataCleanerPlugin)
)
# 测试扩展间的协作
test_item = {'title': ' Test Title ', 'url': 'http://example.com'}
# 数据清洗
cleaned_item = data_cleaner_ext.process_item(test_item, None)
# 监控记录
monitoring_ext.record_item_processed(cleaned_item)
# 验证结果
self.assertEqual(cleaned_item['title'], 'Test Title')
self.assertEqual(monitoring_ext.items_processed, 1)
# 性能测试
class PerformanceTestCase(unittest.TestCase):
"""
性能测试
"""
def test_extension_performance(self):
"""
测试扩展性能
"""
import time
# 创建大量测试数据
test_items = [
{'title': f'Title {i}', 'content': f'Content {i}' * 100}
for i in range(1000)
]
# 测试数据处理性能
extension = DataCleanerPlugin.from_crawler(get_crawler())
start_time = time.time()
for item in test_items:
extension.process_item(item, None)
end_time = time.time()
processing_time = end_time - start_time
# 验证性能要求(每秒处理至少100个项目)
items_per_second = len(test_items) / processing_time
self.assertGreater(items_per_second, 100)
def test_memory_usage(self):
"""
测试内存使用
"""
import psutil
import os
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss
# 创建扩展并处理大量数据
extension = MonitoringExtension.from_crawler(get_crawler())
# 模拟大量请求
for i in range(10000):
request = Request(f'http://example.com/{i}')
extension.request_scheduled(request, None)
final_memory = process.memory_info().rss
memory_increase = final_memory - initial_memory
# 验证内存增长在合理范围内(小于100MB)
self.assertLess(memory_increase, 100 * 1024 * 1024)
# 端到端测试
@pytest.mark.e2e
class TestEndToEnd:  # 类名以Test开头,才能匹配pytest配置中的python_classes = Test*
"""
端到端测试
"""
def test_real_website_crawling(self):
"""
测试真实网站爬取
"""
from scrapy.crawler import CrawlerProcess
# 配置爬虫进程
process = CrawlerProcess({
'USER_AGENT': 'Test Spider',
'ROBOTSTXT_OBEY': True,
'EXTENSIONS': {
'myproject.extensions.MonitoringExtension': 500,
},
'DOWNLOAD_DELAY': 1,
})
# 创建测试爬虫,并把抓取结果收集到列表中,便于运行结束后断言
collected_items = []
class E2ETestSpider(scrapy.Spider):
name = 'e2e_test'
start_urls = ['http://quotes.toscrape.com/']
def parse(self, response):
for quote in response.css('div.quote'):
item = {
'text': quote.css('span.text::text').get(),
'author': quote.css('small.author::text').get(),
}
collected_items.append(item)
yield item
# 运行爬虫(process.start()会阻塞,直到爬取结束)
process.crawl(E2ETestSpider)
process.start()
# 验证结果: 至少抓取到一条名言
assert len(collected_items) > 0
# 测试配置
pytest_config = '''
# pytest.ini(pytest.ini使用[pytest]节;[tool:pytest]只适用于setup.cfg)
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
markers =
unit: Unit tests
integration: Integration tests
e2e: End-to-end tests
slow: Slow running tests
addopts =
--verbose
--tb=short
--strict-markers
'''
# 运行测试的脚本
test_runner_script = '''
#!/usr/bin/env python
"""
测试运行脚本
"""
import sys
import pytest
def run_unit_tests():
"""运行单元测试"""
return pytest.main(['-m', 'unit', '-v'])
def run_integration_tests():
"""运行集成测试"""
return pytest.main(['-m', 'integration', '-v'])
def run_e2e_tests():
"""运行端到端测试"""
return pytest.main(['-m', 'e2e', '-v'])
def run_all_tests():
"""运行所有测试"""
return pytest.main(['-v'])
if __name__ == '__main__':
test_type = sys.argv[1] if len(sys.argv) > 1 else 'all'
if test_type == 'unit':
exit_code = run_unit_tests()
elif test_type == 'integration':
exit_code = run_integration_tests()
elif test_type == 'e2e':
exit_code = run_e2e_tests()
else:
exit_code = run_all_tests()
sys.exit(exit_code)
'''
print("📄 集成测试代码:")
print(integration_testing_code)
print("\n📄 pytest配置:")
print(pytest_config)
print("\n📄 测试运行脚本:")
print(test_runner_script)
# 演示调试与测试
debug_test_demo = DebuggingTestingDemo()
debug_test_demo.demonstrate_extension_debugging()
debug_test_demo.demonstrate_unit_testing()
debug_test_demo.demonstrate_integration_testing()
print("调试与测试技巧演示完成!")
5. 本章小结
5.1 核心概念回顾
# 9. 本章小结
print("\n📚 本章小结:")
class ChapterSummary:
"""
第9章总结
"""
def summarize_key_concepts(self):
"""
总结核心概念
"""
print("\n🎯 核心概念:")
key_concepts = {
'扩展系统': [
'扩展是Scrapy的核心组件,用于增强爬虫功能',
'扩展通过信号系统与爬虫生命周期集成',
'扩展可以监控、修改和控制爬虫行为'
],
'扩展类型': [
'统计扩展:收集和报告爬虫统计信息',
'监控扩展:实时监控爬虫状态和性能',
'日志扩展:增强日志记录和分析功能',
'缓存扩展:提供响应缓存功能'
],
'插件系统': [
'插件是可重用的扩展组件',
'插件支持依赖管理和版本控制',
'插件可以通过包管理器分发和安装'
],
'性能优化': [
'异步编程提高扩展性能',
'内存优化减少资源消耗',
'多级缓存提升响应速度',
'对象池重用减少创建开销'
],
'调试测试': [
'单元测试验证扩展功能',
'集成测试确保扩展协作',
'性能测试保证扩展效率',
'调试工具帮助问题定位'
]
}
for concept, points in key_concepts.items():
print(f"\n📖 {concept}:")
for point in points:
print(f" • {point}")
def summarize_best_practices(self):
"""
总结最佳实践
"""
print("\n✅ 最佳实践:")
best_practices = [
"设计原则:",
" • 单一职责:每个扩展只负责一个特定功能",
" • 松耦合:扩展间通过信号系统通信",
" • 可配置:通过settings提供配置选项",
" • 可测试:编写完整的单元测试和集成测试",
"",
"开发规范:",
" • 使用from_crawler类方法创建扩展实例",
" • 正确处理异常,避免影响爬虫运行",
" • 实现适当的清理逻辑,防止资源泄漏",
" • 提供详细的日志记录和错误信息",
"",
"性能优化:",
" • 使用异步编程处理I/O密集型操作",
" • 实现合理的缓存策略",
" • 避免在热路径中进行昂贵操作",
" • 使用内存优化技术减少资源消耗",
"",
"部署管理:",
" • 使用版本控制管理扩展代码",
" • 提供完整的文档和使用示例",
" • 实现适当的监控和告警机制",
" • 支持热重载和动态配置更新"
]
for practice in best_practices:
print(f" {practice}")
def summarize_common_pitfalls(self):
"""
总结常见陷阱
"""
print("\n⚠️ 常见陷阱:")
pitfalls = [
"内存泄漏:",
" • 未正确清理事件监听器",
" • 循环引用导致对象无法回收",
" • 缓存数据无限增长",
"",
"性能问题:",
" • 在信号处理器中执行阻塞操作",
" • 频繁的数据库或网络访问",
" • 不必要的数据序列化和反序列化",
"",
"并发问题:",
" • 共享状态的竞态条件",
" • 不正确的异步编程模式",
" • 死锁和资源竞争",
"",
"配置错误:",
" • 扩展优先级设置不当",
" • 依赖关系配置错误",
" • 环境特定配置硬编码"
]
for pitfall in pitfalls:
print(f" {pitfall}")
# 生成总结
summary = ChapterSummary()
summary.summarize_key_concepts()
summary.summarize_best_practices()
summary.summarize_common_pitfalls()
print("\n🎓 第9章学习完成!")
5.2 下一步学习建议
# 10. 下一步学习建议
print("\n🚀 下一步学习建议:")
next_steps = [
"深入学习:",
" • 研究Scrapy源码,理解扩展系统实现原理",
" • 学习Twisted框架,掌握异步编程模式",
" • 探索更多第三方扩展,了解最佳实践",
"",
"实践项目:",
" • 开发自定义监控扩展,集成到实际项目",
" • 创建数据处理插件,提高数据质量",
" • 构建分布式爬虫扩展,支持大规模爬取",
"",
"进阶主题:",
" • 学习微服务架构,将扩展服务化",
" • 研究机器学习集成,智能化数据处理",
" • 探索云原生部署,容器化扩展管理",
"",
"社区参与:",
" • 贡献开源扩展项目",
" • 分享扩展开发经验",
" • 参与Scrapy社区讨论"
]
for step in next_steps:
print(f" {step}")
print("\n📖 推荐阅读:")
print(" • Scrapy官方文档 - 扩展开发指南")
print(" • Twisted文档 - 异步编程教程")
print(" • Python包装和分发指南")
print(" • 软件测试最佳实践")
print("\n🎯 学习目标检查清单:")
checklist = [
"□ 理解Scrapy扩展系统架构",
"□ 能够开发自定义扩展",
"□ 掌握插件开发和分发",
"□ 了解性能优化技巧",
"□ 能够进行扩展测试和调试",
"□ 熟悉扩展部署和管理"
]
for item in checklist:
print(f" {item}")
5.3 练习题
# 11. 练习题
print("\n📝 练习题:")
exercises = [
"基础练习:",
"1. 开发一个简单的统计扩展,记录爬虫的请求数、响应数和错误数",
"2. 创建一个日志扩展,将不同级别的日志输出到不同文件",
"3. 实现一个缓存扩展,支持内存和文件两种缓存方式",
"",
"进阶练习:",
"4. 开发一个监控扩展,实时监控爬虫性能并发送告警",
"5. 创建一个数据验证插件,支持自定义验证规则",
"6. 实现一个分布式任务分发扩展,支持多节点协作",
"",
"高级练习:",
"7. 开发一个智能重试扩展,根据错误类型调整重试策略",
"8. 创建一个数据去重插件,支持多种去重算法",
"9. 实现一个性能分析扩展,生成详细的性能报告",
"",
"项目练习:",
"10. 设计并实现一个完整的爬虫监控系统",
"11. 开发一个可视化扩展,提供Web界面监控爬虫状态",
"12. 创建一个扩展市场,支持扩展的发布、下载和管理"
]
for exercise in exercises:
print(f" {exercise}")
print("\n✨ 第9章《扩展开发与插件系统》学习完成!")
print("🎉 恭喜你掌握了Scrapy扩展开发的核心技能!")