学习目标
通过本章学习,您将掌握:
- 理解Scrapy中间件的工作原理和类型
- 掌握下载中间件的开发和配置
- 学会Spider中间件的实现和应用
- 了解中间件的最佳实践和性能优化
- 实现常见的中间件功能(代理、用户代理、缓存等)
1. 中间件基础概念
1.1 什么是中间件
中间件(Middleware)是Scrapy框架中位于引擎与下载器、引擎与Spider之间的钩子组件,它允许您在请求和响应的处理过程中插入自定义逻辑。
# 1. 中间件基础概念演示
print("🔧 中间件基础概念:")
# 中间件的作用
middleware_roles = {
"请求处理": "在发送请求前修改请求",
"响应处理": "在处理响应前修改响应",
"异常处理": "处理请求和响应过程中的异常",
"统计监控": "收集爬虫运行统计信息",
"缓存管理": "实现请求和响应的缓存",
"代理管理": "动态切换代理服务器"
}
for role, description in middleware_roles.items():
print(f" {role}: {description}")
# 中间件类型
middleware_types = {
"下载中间件": "处理请求和响应",
"Spider中间件": "处理Spider的输入和输出"
}
print("\n中间件类型:")
for mtype, description in middleware_types.items():
print(f" {mtype}: {description}")
1.2 中间件工作流程
# 2. 中间件工作流程
print("\n⚡ 中间件工作流程:")
class MiddlewareWorkflow:
"""
中间件工作流程演示
"""
def __init__(self):
self.request_middlewares = []
self.response_middlewares = []
self.spider_middlewares = []
def demonstrate_workflow(self):
"""
演示中间件工作流程
"""
print("\n1. 请求处理流程:")
        request_flow = [
            "1. Spider生成Request",
            "2. 经过Spider中间件处理(process_spider_output)",
            "3. 进入调度器队列等待调度",
            "4. 出队后经过下载中间件处理(process_request)",
            "5. 由下载器发送到目标服务器"
        ]
for step in request_flow:
print(f" {step}")
print("\n2. 响应处理流程:")
        response_flow = [
            "1. 服务器返回响应",
            "2. 经过下载中间件处理(process_response)",
            "3. 经过Spider中间件处理(process_spider_input)",
            "4. 传递给Spider的回调函数解析",
            "5. Spider产出的Items和新Requests再经过Spider中间件(process_spider_output)"
        ]
for step in response_flow:
print(f" {step}")
print("\n3. 中间件执行顺序:")
        execution_order = {
            "请求处理(process_request)": "按优先级数值从小到大执行,数值小的更靠近引擎",
            "响应处理(process_response)": "按优先级数值从大到小执行,数值大的更靠近下载器",
            "异常处理(process_exception)": "与响应处理相同,按优先级数值从大到小执行"
        }
for process, order in execution_order.items():
print(f" {process}: {order}")
# 创建工作流程演示实例
workflow = MiddlewareWorkflow()
workflow.demonstrate_workflow()
print("中间件工作流程演示完成!")
2. 下载中间件开发
2.1 基础下载中间件
# 3. 基础下载中间件开发
print("\n🔽 基础下载中间件开发:")
import random
import time
import logging

from scrapy.exceptions import IgnoreRequest, NotConfigured
class BasicDownloaderMiddleware:
"""
基础下载中间件示例
"""
def __init__(self, settings):
"""
初始化中间件
"""
self.settings = settings
self.enabled = settings.getbool('BASIC_MIDDLEWARE_ENABLED', True)
if not self.enabled:
raise NotConfigured('BasicDownloaderMiddleware is disabled')
# 配置参数
self.delay_range = settings.getfloat('DOWNLOAD_DELAY_RANGE', 1.0)
self.retry_times = settings.getint('RETRY_TIMES', 3)
# 统计信息
self.stats = {
'requests_processed': 0,
'responses_processed': 0,
'exceptions_handled': 0
}
self.logger = logging.getLogger(__name__)
@classmethod
def from_crawler(cls, crawler):
"""
从crawler创建中间件实例
"""
return cls(crawler.settings)
def process_request(self, request, spider):
"""
处理请求
返回值说明:
- None: 继续处理
- Response: 直接返回响应,不发送请求
- Request: 替换原请求
- IgnoreRequest: 忽略请求
"""
self.stats['requests_processed'] += 1
        # 1. 添加随机延迟
        # 注意:time.sleep 会阻塞 Twisted reactor,仅适合演示;
        # 生产环境建议改用 DOWNLOAD_DELAY / AUTOTHROTTLE,或本章"最佳实践"一节中的异步延迟写法
        if self.delay_range > 0:
            delay = random.uniform(0, self.delay_range)
            time.sleep(delay)
            self.logger.debug(f'Request delayed by {delay:.2f}s: {request.url}')
# 2. 添加请求头
request.headers.setdefault('User-Agent', self._get_random_user_agent())
request.headers.setdefault('Accept-Language', 'zh-CN,zh;q=0.9,en;q=0.8')
# 3. 添加请求元数据
request.meta['middleware_processed'] = True
request.meta['process_time'] = time.time()
self.logger.debug(f'Processing request: {request.url}')
# 返回None继续处理
return None
def process_response(self, request, response, spider):
"""
处理响应
返回值说明:
- Response: 返回响应
- Request: 重新请求
- IgnoreRequest: 忽略请求
"""
self.stats['responses_processed'] += 1
# 1. 记录处理时间
if 'process_time' in request.meta:
process_time = time.time() - request.meta['process_time']
self.logger.debug(f'Response processed in {process_time:.2f}s: {response.url}')
# 2. 检查响应状态
if response.status in [200, 301, 302]:
self.logger.debug(f'Successful response: {response.status} {response.url}')
return response
# 3. 处理错误状态
elif response.status in [403, 404, 500, 502, 503]:
self.logger.warning(f'Error response: {response.status} {response.url}')
# 可以选择重试或忽略
            retry_times = request.meta.get('retry_times', 0)
            if retry_times < self.retry_times:
                self.logger.info(f'Retrying request: {request.url} (attempt {retry_times + 1})')
                retry_req = request.copy()
                retry_req.meta['retry_times'] = retry_times + 1
                # 重试请求需绕过去重过滤器,否则会被调度器当作重复请求丢弃
                retry_req.dont_filter = True
                return retry_req
else:
self.logger.error(f'Max retries exceeded: {request.url}')
raise IgnoreRequest(f'Max retries exceeded: {response.status}')
return response
def process_exception(self, request, exception, spider):
"""
处理异常
返回值说明:
- None: 继续处理异常
- Response: 返回响应
- Request: 重新请求
"""
self.stats['exceptions_handled'] += 1
self.logger.error(f'Exception in request {request.url}: {exception}')
# 记录异常类型
exception_type = type(exception).__name__
request.meta['exception_type'] = exception_type
# 根据异常类型决定处理方式
if exception_type in ['TimeoutError', 'ConnectionError']:
# 网络异常,可以重试
retry_times = request.meta.get('retry_times', 0)
if retry_times < self.retry_times:
                self.logger.info(f'Retrying after {exception_type}: {request.url}')
                retry_req = request.copy()
                retry_req.meta['retry_times'] = retry_times + 1
                retry_req.dont_filter = True  # 重试请求同样需要绕过去重过滤器
                return retry_req
# 其他异常,继续处理
return None
def _get_random_user_agent(self):
"""
获取随机User-Agent
"""
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
]
return random.choice(user_agents)
def get_stats(self):
"""
获取统计信息
"""
return self.stats.copy()
print("基础下载中间件开发完成!")
2.2 代理中间件
# 4. 代理中间件开发
print("\n🌐 代理中间件开发:")
from urllib.parse import urlparse
class ProxyMiddleware:
"""
代理中间件
"""
def __init__(self, settings):
self.settings = settings
self.proxy_list = settings.getlist('PROXY_LIST', [])
self.proxy_mode = settings.get('PROXY_MODE', 'random') # random, round_robin, failover
self.proxy_auth = settings.get('PROXY_AUTH', None)
# 代理状态管理
self.proxy_stats = {}
self.current_proxy_index = 0
self.failed_proxies = set()
# 初始化代理统计
for proxy in self.proxy_list:
self.proxy_stats[proxy] = {
'success_count': 0,
'failure_count': 0,
'last_used': None,
'response_time': []
}
self.logger = logging.getLogger(__name__)
if not self.proxy_list:
raise NotConfigured('No proxy list provided')
@classmethod
def from_crawler(cls, crawler):
return cls(crawler.settings)
def process_request(self, request, spider):
"""
为请求分配代理
"""
# 跳过已经设置代理的请求
if 'proxy' in request.meta:
return None
# 获取可用代理
proxy = self._get_proxy(request)
if proxy:
request.meta['proxy'] = proxy
request.meta['proxy_start_time'] = time.time()
# 设置代理认证
if self.proxy_auth:
request.headers['Proxy-Authorization'] = f'Basic {self.proxy_auth}'
self.logger.debug(f'Using proxy {proxy} for {request.url}')
return None
def process_response(self, request, response, spider):
"""
处理代理响应
"""
proxy = request.meta.get('proxy')
if proxy:
# 记录成功统计
self.proxy_stats[proxy]['success_count'] += 1
self.proxy_stats[proxy]['last_used'] = time.time()
# 记录响应时间
if 'proxy_start_time' in request.meta:
response_time = time.time() - request.meta['proxy_start_time']
self.proxy_stats[proxy]['response_time'].append(response_time)
# 保持最近100次记录
if len(self.proxy_stats[proxy]['response_time']) > 100:
self.proxy_stats[proxy]['response_time'].pop(0)
# 从失败列表中移除(如果存在)
self.failed_proxies.discard(proxy)
return response
def process_exception(self, request, exception, spider):
"""
处理代理异常
"""
proxy = request.meta.get('proxy')
if proxy:
# 记录失败统计
self.proxy_stats[proxy]['failure_count'] += 1
# 检查是否需要标记为失败
stats = self.proxy_stats[proxy]
failure_rate = stats['failure_count'] / (stats['success_count'] + stats['failure_count'])
if failure_rate > 0.5 and stats['failure_count'] > 5:
self.failed_proxies.add(proxy)
self.logger.warning(f'Proxy {proxy} marked as failed (failure rate: {failure_rate:.2f})')
# 尝试使用其他代理重试
new_proxy = self._get_proxy(request, exclude=proxy)
if new_proxy:
retry_req = request.copy()
retry_req.meta['proxy'] = new_proxy
retry_req.meta['proxy_start_time'] = time.time()
retry_req.dont_filter = True
self.logger.info(f'Retrying with different proxy: {new_proxy}')
return retry_req
return None
def _get_proxy(self, request, exclude=None):
"""
获取代理
"""
available_proxies = [p for p in self.proxy_list
if p not in self.failed_proxies and p != exclude]
if not available_proxies:
# 重置失败代理列表
self.failed_proxies.clear()
available_proxies = self.proxy_list
if not available_proxies:
return None
if self.proxy_mode == 'random':
return random.choice(available_proxies)
elif self.proxy_mode == 'round_robin':
proxy = available_proxies[self.current_proxy_index % len(available_proxies)]
self.current_proxy_index += 1
return proxy
elif self.proxy_mode == 'failover':
# 选择成功率最高的代理
best_proxy = min(available_proxies,
key=lambda p: self._get_failure_rate(p))
return best_proxy
return available_proxies[0]
def _get_failure_rate(self, proxy):
"""
计算代理失败率
"""
stats = self.proxy_stats[proxy]
total = stats['success_count'] + stats['failure_count']
if total == 0:
return 0
return stats['failure_count'] / total
def get_proxy_stats(self):
"""
获取代理统计信息
"""
stats = {}
for proxy, data in self.proxy_stats.items():
total = data['success_count'] + data['failure_count']
avg_response_time = (sum(data['response_time']) / len(data['response_time'])
if data['response_time'] else 0)
stats[proxy] = {
'total_requests': total,
'success_rate': data['success_count'] / total if total > 0 else 0,
'avg_response_time': avg_response_time,
'is_failed': proxy in self.failed_proxies
}
return stats
print("代理中间件开发完成!")
2.3 用户代理中间件
# 5. 用户代理中间件开发
print("\n👤 用户代理中间件开发:")
class UserAgentMiddleware:
"""
用户代理中间件
"""
def __init__(self, settings):
self.settings = settings
self.user_agent_mode = settings.get('USER_AGENT_MODE', 'random')
self.custom_user_agents = settings.getlist('USER_AGENT_LIST', [])
# 预定义用户代理列表
self.default_user_agents = {
'chrome': [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
],
'firefox': [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',
'Mozilla/5.0 (X11; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0'
],
'safari': [
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1'
],
'mobile': [
'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1',
'Mozilla/5.0 (Linux; Android 11; SM-G991B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.120 Mobile Safari/537.36',
'Mozilla/5.0 (Linux; Android 11; Pixel 5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.120 Mobile Safari/537.36'
]
}
# 构建用户代理列表
self.user_agents = self._build_user_agent_list()
self.current_index = 0
# 统计信息
self.usage_stats = {}
self.logger = logging.getLogger(__name__)
@classmethod
def from_crawler(cls, crawler):
return cls(crawler.settings)
def process_request(self, request, spider):
"""
设置用户代理
"""
# 跳过已经设置User-Agent的请求
if 'User-Agent' in request.headers:
return None
# 获取用户代理
user_agent = self._get_user_agent(request, spider)
if user_agent:
request.headers['User-Agent'] = user_agent
request.meta['user_agent'] = user_agent
# 更新统计
self.usage_stats[user_agent] = self.usage_stats.get(user_agent, 0) + 1
self.logger.debug(f'Set User-Agent: {user_agent[:50]}...')
return None
def _build_user_agent_list(self):
"""
构建用户代理列表
"""
if self.custom_user_agents:
return self.custom_user_agents
# 使用默认列表
all_agents = []
for browser_agents in self.default_user_agents.values():
all_agents.extend(browser_agents)
return all_agents
def _get_user_agent(self, request, spider):
"""
获取用户代理
"""
if not self.user_agents:
return None
if self.user_agent_mode == 'random':
return random.choice(self.user_agents)
elif self.user_agent_mode == 'round_robin':
user_agent = self.user_agents[self.current_index % len(self.user_agents)]
self.current_index += 1
return user_agent
elif self.user_agent_mode == 'browser_specific':
# 根据目标网站选择特定浏览器
return self._get_browser_specific_agent(request.url)
elif self.user_agent_mode == 'mobile_first':
# 优先使用移动端用户代理
mobile_agents = self.default_user_agents.get('mobile', [])
if mobile_agents:
return random.choice(mobile_agents)
return random.choice(self.user_agents)
return self.user_agents[0]
def _get_browser_specific_agent(self, url):
"""
根据URL获取特定浏览器的用户代理
"""
domain = urlparse(url).netloc.lower()
        # 根据域名特征选择浏览器
        if 'mobile' in domain or domain.startswith('m.'):
            return random.choice(self.default_user_agents['mobile'])
elif 'google' in domain:
return random.choice(self.default_user_agents['chrome'])
elif 'apple' in domain or 'safari' in domain:
return random.choice(self.default_user_agents['safari'])
else:
# 默认使用Chrome
return random.choice(self.default_user_agents['chrome'])
def get_usage_stats(self):
"""
获取使用统计
"""
total_usage = sum(self.usage_stats.values())
stats = {}
for ua, count in self.usage_stats.items():
stats[ua[:50] + '...'] = {
'count': count,
'percentage': (count / total_usage * 100) if total_usage > 0 else 0
}
return stats
print("用户代理中间件开发完成!")
3. Spider中间件开发
3.1 基础Spider中间件
# 6. 基础Spider中间件开发
print("\n🕷️ 基础Spider中间件开发:")
from scrapy.http import Request
from scrapy.exceptions import NotConfigured
class BasicSpiderMiddleware:
"""
基础Spider中间件
"""
def __init__(self, settings):
self.settings = settings
self.enabled = settings.getbool('SPIDER_MIDDLEWARE_ENABLED', True)
if not self.enabled:
raise NotConfigured('BasicSpiderMiddleware is disabled')
# 配置参数
self.max_depth = settings.getint('DEPTH_LIMIT', 0)
self.allowed_domains = settings.getlist('ALLOWED_DOMAINS', [])
# 统计信息
self.stats = {
'requests_generated': 0,
'items_scraped': 0,
'requests_filtered': 0,
'exceptions_handled': 0
}
self.logger = logging.getLogger(__name__)
@classmethod
def from_crawler(cls, crawler):
return cls(crawler.settings)
def process_spider_input(self, response, spider):
"""
处理Spider输入(响应)
在响应传递给Spider之前调用
"""
# 记录响应信息
self.logger.debug(f'Processing response: {response.url} (status: {response.status})')
# 可以在这里修改响应或添加元数据
response.meta['spider_middleware_processed'] = True
response.meta['process_timestamp'] = time.time()
# 返回None继续处理
return None
def process_spider_output(self, response, result, spider):
"""
处理Spider输出(Items和Requests)
处理Spider返回的结果
"""
processed_results = []
for item in result:
if isinstance(item, Request):
# 处理请求
processed_request = self._process_request(item, response, spider)
if processed_request:
processed_results.append(processed_request)
self.stats['requests_generated'] += 1
else:
self.stats['requests_filtered'] += 1
else:
# 处理Item
processed_item = self._process_item(item, response, spider)
if processed_item:
processed_results.append(processed_item)
self.stats['items_scraped'] += 1
return processed_results
def process_spider_exception(self, response, exception, spider):
"""
处理Spider异常
"""
self.stats['exceptions_handled'] += 1
self.logger.error(f'Spider exception for {response.url}: {exception}')
        # 记录异常信息
        exception_info = {
            'url': response.url,
            'exception_type': type(exception).__name__,
            'exception_message': str(exception),
            'timestamp': time.time()
        }
        self.logger.debug(f'Exception details: {exception_info}')
        # 返回空列表表示异常已处理且不再产出结果;返回None则交给后续中间件继续处理
        return []
def process_start_requests(self, start_requests, spider):
"""
处理起始请求
"""
for request in start_requests:
# 为起始请求添加元数据
request.meta['is_start_request'] = True
request.meta['depth'] = 0
self.logger.debug(f'Processing start request: {request.url}')
yield request
def _process_request(self, request, response, spider):
"""
处理单个请求
"""
# 1. 检查深度限制
current_depth = request.meta.get('depth', 0)
if self.max_depth > 0 and current_depth >= self.max_depth:
self.logger.debug(f'Request filtered by depth: {request.url} (depth: {current_depth})')
return None
        # 2. 检查域名限制(匹配允许域名本身或其子域名)
        if self.allowed_domains:
            request_domain = urlparse(request.url).netloc
            if not any(request_domain == domain or request_domain.endswith('.' + domain)
                       for domain in self.allowed_domains):
                self.logger.debug(f'Request filtered by domain: {request.url}')
                return None
# 3. 添加元数据
request.meta['parent_url'] = response.url
request.meta['spider_middleware_processed'] = True
# 4. 设置深度
if 'depth' not in request.meta:
parent_depth = response.meta.get('depth', 0)
request.meta['depth'] = parent_depth + 1
return request
def _process_item(self, item, response, spider):
"""
处理单个Item
"""
# 添加元数据
if hasattr(item, '__setitem__'):
item['scraped_url'] = response.url
item['scraped_timestamp'] = time.time()
item['spider_name'] = spider.name
return item
def get_stats(self):
"""
获取统计信息
"""
return self.stats.copy()
print("基础Spider中间件开发完成!")
3.2 数据验证中间件
# 7. 数据验证中间件
print("\n✅ 数据验证中间件开发:")
import re
from urllib.parse import urljoin, urlparse
class DataValidationMiddleware:
"""
数据验证中间件
"""
def __init__(self, settings):
self.settings = settings
# 验证规则配置
self.validation_rules = settings.getdict('VALIDATION_RULES', {})
self.required_fields = settings.getlist('REQUIRED_FIELDS', [])
self.url_validation = settings.getbool('VALIDATE_URLS', True)
self.duplicate_detection = settings.getbool('DETECT_DUPLICATES', True)
# 重复检测
self.seen_items = set()
self.duplicate_threshold = settings.getfloat('DUPLICATE_THRESHOLD', 0.9)
# 统计信息
self.validation_stats = {
'total_items': 0,
'valid_items': 0,
'invalid_items': 0,
'duplicate_items': 0,
'validation_errors': {}
}
self.logger = logging.getLogger(__name__)
@classmethod
def from_crawler(cls, crawler):
return cls(crawler.settings)
def process_spider_output(self, response, result, spider):
"""
验证Spider输出
"""
for item in result:
if isinstance(item, Request):
# 验证请求URL
if self.url_validation:
validated_request = self._validate_request(item, response)
if validated_request:
yield validated_request
else:
yield item
else:
# 验证Item
validated_item = self._validate_item(item, response, spider)
if validated_item:
yield validated_item
    def _validate_request(self, request, response):
        """
        验证请求
        """
        # 1. URL补全:相对URL先基于当前响应的URL补全为绝对URL
        if not request.url.startswith(('http://', 'https://')):
            request = request.replace(url=urljoin(response.url, request.url))
        # 2. URL格式验证
        if not self._is_valid_url(request.url):
            self.logger.warning(f'Invalid URL format: {request.url}')
            return None
        # 3. URL规范化
        request = request.replace(url=self._normalize_url(request.url))
        return request
def _validate_item(self, item, response, spider):
"""
验证Item
"""
self.validation_stats['total_items'] += 1
# 1. 必填字段验证
if not self._validate_required_fields(item):
self.validation_stats['invalid_items'] += 1
return None
# 2. 字段格式验证
if not self._validate_field_formats(item):
self.validation_stats['invalid_items'] += 1
return None
# 3. 重复检测
if self.duplicate_detection and self._is_duplicate(item):
self.validation_stats['duplicate_items'] += 1
self.logger.debug(f'Duplicate item detected: {item}')
return None
# 4. 数据清洗
cleaned_item = self._clean_item(item)
self.validation_stats['valid_items'] += 1
return cleaned_item
def _validate_required_fields(self, item):
"""
验证必填字段
"""
for field in self.required_fields:
if field not in item or not item[field]:
error_key = f'missing_{field}'
self.validation_stats['validation_errors'][error_key] = \
self.validation_stats['validation_errors'].get(error_key, 0) + 1
self.logger.warning(f'Missing required field: {field}')
return False
return True
def _validate_field_formats(self, item):
"""
验证字段格式
"""
for field, rules in self.validation_rules.items():
if field not in item:
continue
value = item[field]
# 类型验证
if 'type' in rules:
expected_type = rules['type']
if expected_type == 'int':
try:
int(value)
except (ValueError, TypeError):
self._record_validation_error(f'{field}_type_error')
return False
elif expected_type == 'float':
try:
float(value)
except (ValueError, TypeError):
self._record_validation_error(f'{field}_type_error')
return False
elif expected_type == 'url':
if not self._is_valid_url(value):
self._record_validation_error(f'{field}_url_error')
return False
elif expected_type == 'email':
if not self._is_valid_email(value):
self._record_validation_error(f'{field}_email_error')
return False
# 长度验证
if 'min_length' in rules:
if len(str(value)) < rules['min_length']:
self._record_validation_error(f'{field}_min_length_error')
return False
if 'max_length' in rules:
if len(str(value)) > rules['max_length']:
self._record_validation_error(f'{field}_max_length_error')
return False
# 正则表达式验证
if 'pattern' in rules:
if not re.match(rules['pattern'], str(value)):
self._record_validation_error(f'{field}_pattern_error')
return False
# 值范围验证
if 'min_value' in rules:
try:
if float(value) < rules['min_value']:
self._record_validation_error(f'{field}_min_value_error')
return False
except (ValueError, TypeError):
pass
if 'max_value' in rules:
try:
if float(value) > rules['max_value']:
self._record_validation_error(f'{field}_max_value_error')
return False
except (ValueError, TypeError):
pass
return True
def _is_duplicate(self, item):
"""
检测重复项
"""
# 生成项目指纹
fingerprint = self._generate_fingerprint(item)
# 检查是否已存在
if fingerprint in self.seen_items:
return True
# 添加到已见集合
self.seen_items.add(fingerprint)
return False
def _generate_fingerprint(self, item):
"""
生成项目指纹
"""
# 使用主要字段生成指纹
key_fields = ['title', 'url', 'name', 'id']
fingerprint_data = []
for field in key_fields:
if field in item and item[field]:
fingerprint_data.append(str(item[field]).strip().lower())
if not fingerprint_data:
# 如果没有关键字段,使用所有字段
fingerprint_data = [str(v).strip().lower() for v in item.values() if v]
return hash('|'.join(fingerprint_data))
def _clean_item(self, item):
"""
清洗Item数据
"""
cleaned_item = {}
for field, value in item.items():
if isinstance(value, str):
# 清理字符串
cleaned_value = value.strip()
cleaned_value = re.sub(r'\s+', ' ', cleaned_value) # 合并多个空格
cleaned_value = cleaned_value.replace('\n', ' ').replace('\r', ' ')
cleaned_item[field] = cleaned_value
else:
cleaned_item[field] = value
return cleaned_item
def _is_valid_url(self, url):
"""
验证URL格式
"""
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except (TypeError, ValueError, AttributeError):
            return False
def _is_valid_email(self, email):
"""
验证邮箱格式
"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def _normalize_url(self, url):
"""
规范化URL
"""
# 移除URL片段
parsed = urlparse(url)
normalized = parsed._replace(fragment='').geturl()
# 移除尾部斜杠(除了根路径)
if normalized.endswith('/') and normalized.count('/') > 3:
normalized = normalized[:-1]
return normalized
def _record_validation_error(self, error_type):
"""
记录验证错误
"""
self.validation_stats['validation_errors'][error_type] = \
self.validation_stats['validation_errors'].get(error_type, 0) + 1
def get_validation_stats(self):
"""
获取验证统计
"""
stats = self.validation_stats.copy()
if stats['total_items'] > 0:
stats['valid_rate'] = stats['valid_items'] / stats['total_items']
stats['duplicate_rate'] = stats['duplicate_items'] / stats['total_items']
else:
stats['valid_rate'] = 0
stats['duplicate_rate'] = 0
return stats
print("数据验证中间件开发完成!")
4. 中间件配置与管理
4.1 中间件配置
# 8. 中间件配置管理
print("\n⚙️ 中间件配置管理:")
# settings.py 配置示例
MIDDLEWARE_SETTINGS = {
# 下载中间件配置
'DOWNLOADER_MIDDLEWARES': {
'myproject.middlewares.BasicDownloaderMiddleware': 100,
'myproject.middlewares.ProxyMiddleware': 200,
'myproject.middlewares.UserAgentMiddleware': 300,
'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None, # 禁用默认中间件
},
# Spider中间件配置
'SPIDER_MIDDLEWARES': {
'myproject.middlewares.BasicSpiderMiddleware': 100,
'myproject.middlewares.DataValidationMiddleware': 200,
},
# 中间件特定配置
'BASIC_MIDDLEWARE_ENABLED': True,
'DOWNLOAD_DELAY_RANGE': 2.0,
'RETRY_TIMES': 3,
# 代理配置
'PROXY_LIST': [
'http://proxy1.example.com:8080',
'http://proxy2.example.com:8080',
'http://proxy3.example.com:8080',
],
'PROXY_MODE': 'random', # random, round_robin, failover
'PROXY_AUTH': 'dXNlcjpwYXNz', # base64编码的用户名:密码
# 用户代理配置
'USER_AGENT_MODE': 'random',
'USER_AGENT_LIST': [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
],
# 数据验证配置
'VALIDATION_RULES': {
'title': {
'type': 'str',
'min_length': 1,
'max_length': 200,
},
'price': {
'type': 'float',
'min_value': 0,
'max_value': 999999,
},
'url': {
'type': 'url',
},
'email': {
'type': 'email',
}
},
'REQUIRED_FIELDS': ['title', 'url'],
'VALIDATE_URLS': True,
'DETECT_DUPLICATES': True,
}
class MiddlewareManager:
"""
中间件管理器
"""
    def __init__(self, settings, crawler=None):
        self.settings = settings
        self.crawler = crawler  # 使用from_crawler风格的中间件需要crawler对象
        self.downloader_middlewares = {}
        self.spider_middlewares = {}
        self.middleware_stats = {}
        self.logger = logging.getLogger(__name__)
def load_middlewares(self):
"""
加载中间件
"""
# 加载下载中间件
downloader_mw_config = self.settings.getdict('DOWNLOADER_MIDDLEWARES', {})
self.downloader_middlewares = self._load_middleware_dict(downloader_mw_config)
# 加载Spider中间件
spider_mw_config = self.settings.getdict('SPIDER_MIDDLEWARES', {})
self.spider_middlewares = self._load_middleware_dict(spider_mw_config)
self.logger.info(f'Loaded {len(self.downloader_middlewares)} downloader middlewares')
self.logger.info(f'Loaded {len(self.spider_middlewares)} spider middlewares')
def _load_middleware_dict(self, middleware_config):
"""
加载中间件字典
"""
middlewares = {}
for middleware_path, priority in middleware_config.items():
if priority is None:
# 禁用中间件
continue
try:
# 动态导入中间件类
module_path, class_name = middleware_path.rsplit('.', 1)
module = __import__(module_path, fromlist=[class_name])
middleware_class = getattr(module, class_name)
                # 创建中间件实例
                if hasattr(middleware_class, 'from_crawler') and self.crawler is not None:
                    # 优先使用from_crawler方法创建(可访问crawler的信号、统计等)
                    middleware_instance = middleware_class.from_crawler(self.crawler)
                else:
                    # 没有crawler时退回到直接用settings创建实例
                    middleware_instance = middleware_class(self.settings)
middlewares[priority] = {
'instance': middleware_instance,
'path': middleware_path,
'priority': priority
}
self.logger.debug(f'Loaded middleware: {middleware_path} (priority: {priority})')
except Exception as e:
self.logger.error(f'Failed to load middleware {middleware_path}: {e}')
# 按优先级排序
return dict(sorted(middlewares.items()))
def get_middleware_stats(self):
"""
获取中间件统计信息
"""
stats = {
'downloader_middlewares': {},
'spider_middlewares': {}
}
# 收集下载中间件统计
for priority, middleware_info in self.downloader_middlewares.items():
instance = middleware_info['instance']
path = middleware_info['path']
if hasattr(instance, 'get_stats'):
stats['downloader_middlewares'][path] = instance.get_stats()
elif hasattr(instance, 'get_proxy_stats'):
stats['downloader_middlewares'][path] = instance.get_proxy_stats()
elif hasattr(instance, 'get_usage_stats'):
stats['downloader_middlewares'][path] = instance.get_usage_stats()
# 收集Spider中间件统计
for priority, middleware_info in self.spider_middlewares.items():
instance = middleware_info['instance']
path = middleware_info['path']
if hasattr(instance, 'get_stats'):
stats['spider_middlewares'][path] = instance.get_stats()
elif hasattr(instance, 'get_validation_stats'):
stats['spider_middlewares'][path] = instance.get_validation_stats()
return stats
def print_middleware_info(self):
"""
打印中间件信息
"""
print("\n📊 中间件信息:")
print("\n下载中间件:")
for priority, middleware_info in self.downloader_middlewares.items():
print(f" 优先级 {priority}: {middleware_info['path']}")
print("\nSpider中间件:")
for priority, middleware_info in self.spider_middlewares.items():
print(f" 优先级 {priority}: {middleware_info['path']}")
# 打印统计信息
stats = self.get_middleware_stats()
print("\n统计信息:")
for mw_type, mw_stats in stats.items():
print(f"\n{mw_type}:")
for mw_path, mw_stat in mw_stats.items():
print(f" {mw_path}:")
for key, value in mw_stat.items():
print(f" {key}: {value}")
# MiddlewareManager 为示例实现;实际项目中,Scrapy会在启动时按上述配置自动加载中间件
print("中间件配置管理完成!")
5. 本章小结
本章详细介绍了Scrapy中间件的开发与应用,主要内容包括:
5.1 核心概念
中间件类型
- 下载中间件:处理请求和响应
- Spider中间件:处理Spider的输入输出
工作流程
- 请求处理流程和响应处理流程
- 中间件执行顺序和优先级
5.2 实用中间件
基础功能中间件
- 请求延迟和重试机制
- 统计信息收集
代理中间件
- 多种代理模式(随机、轮询、故障转移)
- 代理状态监控和故障检测
用户代理中间件
- 多种用户代理策略
- 浏览器特定和移动端优先模式
数据验证中间件
- 字段验证和格式检查
- 重复检测和数据清洗
5.3 配置管理
- 中间件配置和优先级设置
- 动态加载和统计信息收集
- 中间件管理器的实现
6. 最佳实践
6.1 中间件设计原则
# 中间件设计最佳实践
design_principles = {
"单一职责": "每个中间件只负责一个特定功能",
"可配置性": "通过settings进行灵活配置",
"错误处理": "优雅处理异常和错误情况",
"性能考虑": "避免阻塞操作和内存泄漏",
"统计监控": "提供详细的运行统计信息"
}
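以"统计监控"为例:相比在中间件内部自建 stats 字典,更推荐接入Scrapy自带的统计收集器 crawler.stats,统计结果会在爬虫结束时随日志统一输出。以下为一个示意实现(类名为假设):
from scrapy import signals

class StatsAwareMiddleware:
    """示意:把统计写入Scrapy自带的StatsCollector"""

    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        mw = cls(crawler.stats)
        # 订阅spider_closed信号,在爬虫关闭时做收尾工作
        crawler.signals.connect(mw.spider_closed, signal=signals.spider_closed)
        return mw

    def process_request(self, request, spider):
        self.stats.inc_value('stats_aware/requests')
        return None

    def process_response(self, request, response, spider):
        self.stats.inc_value(f'stats_aware/status_{response.status}')
        return response

    def spider_closed(self, spider):
        spider.logger.info('requests seen: %s',
                           self.stats.get_value('stats_aware/requests', 0))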
6.2 性能优化
# 性能优化技巧
performance_tips = {
"缓存机制": "缓存计算结果和查询结果",
"批量处理": "批量处理请求和响应",
"异步操作": "使用异步操作避免阻塞",
"内存管理": "及时清理不需要的数据",
"优先级设置": "合理设置中间件优先级"
}
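针对"异步操作"一条:前文 BasicDownloaderMiddleware 中的 time.sleep 会阻塞 Twisted reactor,从而拖慢整个爬虫。在支持协程的较新版本Scrapy(2.x)中,下载中间件的 process_request 可以定义为 async def,用 asyncio.sleep 实现非阻塞延迟。以下为示意,假设已在settings中启用asyncio reactor:
import asyncio
import random

class AsyncDelayMiddleware:
    """示意:用协程实现非阻塞的随机延迟"""

    async def process_request(self, request, spider):
        # asyncio.sleep只挂起当前请求的处理,不会阻塞整个事件循环
        await asyncio.sleep(random.uniform(0.5, 1.5))
        return None

# settings.py 中需要启用asyncio reactor(示意):
# TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'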
7. 常见陷阱
7.1 配置陷阱
- 优先级冲突:中间件优先级设置不当
- 循环依赖:中间件之间的循环依赖
- 配置错误:settings配置错误导致中间件无法加载
7.2 性能陷阱
- 阻塞操作:在中间件中执行耗时操作
- 内存泄漏:未及时清理缓存和统计数据
- 过度处理:对每个请求都执行复杂操作
8. 下一步学习
完成本章后,建议继续学习:
- 第6章:分布式爬虫 - 学习Scrapy-Redis分布式方案
- 第7章:反爬虫对抗 - 掌握反爬虫技术和对策
- 第8章:数据存储 - 了解多种数据存储方案
- 第9章:监控与部署 - 学习爬虫监控和生产部署
9. 练习题
9.1 基础练习
- 实现一个简单的请求限速中间件
- 开发一个响应缓存中间件
- 创建一个请求统计中间件
9.2 进阶练习
- 实现一个智能代理轮换中间件,支持代理健康检查
- 开发一个基于机器学习的反爬虫检测中间件
- 创建一个分布式会话管理中间件
9.3 项目练习
- 综合中间件系统:构建一个完整的中间件管理系统
- 智能爬虫中间件:开发具有自适应能力的智能中间件
- 监控中间件:创建实时监控和告警中间件
恭喜! 您已经掌握了Scrapy中间件的开发与应用。中间件是Scrapy框架的核心组件,掌握中间件开发将大大提升您的爬虫开发能力。继续学习下一章,探索分布式爬虫的奥秘!