学习目标

通过本章学习,您将掌握:

  • 理解Scrapy中间件的工作原理和类型
  • 掌握下载中间件的开发和配置
  • 学会Spider中间件的实现和应用
  • 了解中间件的最佳实践和性能优化
  • 实现常见的中间件功能(代理、用户代理、缓存等)

1. 中间件基础概念

1.1 什么是中间件

中间件是Scrapy框架中的一个重要组件,它允许您在请求和响应的处理过程中插入自定义逻辑。

# 1. 中间件基础概念演示
print("🔧 中间件基础概念:")

# 中间件的作用
middleware_roles = {
    "请求处理": "在发送请求前修改请求",
    "响应处理": "在处理响应前修改响应", 
    "异常处理": "处理请求和响应过程中的异常",
    "统计监控": "收集爬虫运行统计信息",
    "缓存管理": "实现请求和响应的缓存",
    "代理管理": "动态切换代理服务器"
}

for role, description in middleware_roles.items():
    print(f"  {role}: {description}")

# 中间件类型
middleware_types = {
    "下载中间件": "处理请求和响应",
    "Spider中间件": "处理Spider的输入和输出"
}

print("\n中间件类型:")
for mtype, description in middleware_types.items():
    print(f"  {mtype}: {description}")

1.2 中间件工作流程

# 2. 中间件工作流程
print("\n⚡ 中间件工作流程:")

class MiddlewareWorkflow:
    """
    中间件工作流程演示
    """
    
    def __init__(self):
        self.request_middlewares = []
        self.response_middlewares = []
        self.spider_middlewares = []
    
    def demonstrate_workflow(self):
        """
        演示中间件工作流程
        """
        print("\n1. 请求处理流程:")
        request_flow = [
            "1. Spider生成Request",
            "2. 经过Spider中间件处理(process_spider_output)",
            "3. 进入调度器队列",
            "4. 经过下载中间件处理(process_request)",
            "5. 发送到目标服务器"
        ]
        
        for step in request_flow:
            print(f"   {step}")
        
        print("\n2. 响应处理流程:")
        response_flow = [
            "1. 服务器返回响应",
            "2. 经过下载中间件处理(process_response)",
            "3. 经过Spider中间件处理(process_spider_input)",
            "4. 传递给Spider回调处理",
            "5. Spider输出经过Spider中间件处理(process_spider_output)",
            "6. 生成Items和新的Requests"
        ]
        
        for step in response_flow:
            print(f"   {step}")
        
        print("\n3. 中间件执行顺序:")
        execution_order = {
            "请求处理": "按优先级从小到大执行(数字小的离引擎近,先处理请求)",
            "响应处理": "按优先级从大到小执行(数字大的离下载器近,先处理响应)",
            "异常处理": "按优先级从大到小执行(与响应处理方向一致)"
        }
        
        for process, order in execution_order.items():
            print(f"   {process}: {order}")

# 创建工作流程演示实例
workflow = MiddlewareWorkflow()
workflow.demonstrate_workflow()

print("中间件工作流程演示完成!")

2. 下载中间件开发

2.1 基础下载中间件

# 3. 基础下载中间件开发
print("\n🔽 基础下载中间件开发:")

import scrapy
from scrapy.exceptions import IgnoreRequest, NotConfigured
import random
import time
import logging

class BasicDownloaderMiddleware:
    """
    基础下载中间件示例
    """
    
    def __init__(self, settings):
        """
        初始化中间件
        """
        self.settings = settings
        self.enabled = settings.getbool('BASIC_MIDDLEWARE_ENABLED', True)
        
        if not self.enabled:
            raise NotConfigured('BasicDownloaderMiddleware is disabled')
        
        # 配置参数
        self.delay_range = settings.getfloat('DOWNLOAD_DELAY_RANGE', 1.0)
        self.retry_times = settings.getint('RETRY_TIMES', 3)
        
        # 统计信息
        self.stats = {
            'requests_processed': 0,
            'responses_processed': 0,
            'exceptions_handled': 0
        }
        
        self.logger = logging.getLogger(__name__)
    
    @classmethod
    def from_crawler(cls, crawler):
        """
        从crawler创建中间件实例
        """
        return cls(crawler.settings)
    
    def process_request(self, request, spider):
        """
        处理请求
        
        返回值说明:
        - None: 继续处理
        - Response: 直接返回响应,不再发送请求
        - Request: 替换原请求,重新调度
        - 抛出IgnoreRequest异常: 忽略该请求
        """
        self.stats['requests_processed'] += 1
        
        # 1. 添加随机延迟
        # 注意: time.sleep()会阻塞Twisted事件循环,这里仅作演示;
        # 生产环境应使用DOWNLOAD_DELAY / AutoThrottle等内置限速机制
        if self.delay_range > 0:
            delay = random.uniform(0, self.delay_range)
            time.sleep(delay)
            self.logger.debug(f'Request delayed by {delay:.2f}s: {request.url}')
        
        # 2. 添加请求头
        request.headers.setdefault('User-Agent', self._get_random_user_agent())
        request.headers.setdefault('Accept-Language', 'zh-CN,zh;q=0.9,en;q=0.8')
        
        # 3. 添加请求元数据
        request.meta['middleware_processed'] = True
        request.meta['process_time'] = time.time()
        
        self.logger.debug(f'Processing request: {request.url}')
        
        # 返回None继续处理
        return None
    
    def process_response(self, request, response, spider):
        """
        处理响应
        
        返回值说明:
        - Response: 继续传递该响应
        - Request: 重新调度请求
        - 抛出IgnoreRequest异常: 忽略该请求
        """
        self.stats['responses_processed'] += 1
        
        # 1. 记录处理时间
        if 'process_time' in request.meta:
            process_time = time.time() - request.meta['process_time']
            self.logger.debug(f'Response processed in {process_time:.2f}s: {response.url}')
        
        # 2. 检查响应状态
        if response.status in [200, 301, 302]:
            self.logger.debug(f'Successful response: {response.status} {response.url}')
            return response
        
        # 3. 处理错误状态
        elif response.status in [403, 404, 500, 502, 503]:
            self.logger.warning(f'Error response: {response.status} {response.url}')
            
            # 可以选择重试或忽略
            retry_times = request.meta.get('retry_times', 0)
            if retry_times < self.retry_times:
                self.logger.info(f'Retrying request: {request.url} (attempt {retry_times + 1})')
                retry_req = request.copy()
                retry_req.meta['retry_times'] = retry_times + 1
                retry_req.dont_filter = True  # 避免重试请求被去重过滤器丢弃
                return retry_req
            else:
                self.logger.error(f'Max retries exceeded: {request.url}')
                raise IgnoreRequest(f'Max retries exceeded: {response.status}')
        
        return response
    
    def process_exception(self, request, exception, spider):
        """
        处理异常
        
        返回值说明:
        - None: 继续处理异常
        - Response: 返回响应
        - Request: 重新请求
        """
        self.stats['exceptions_handled'] += 1
        
        self.logger.error(f'Exception in request {request.url}: {exception}')
        
        # 记录异常类型
        exception_type = type(exception).__name__
        request.meta['exception_type'] = exception_type
        
        # 根据异常类型决定处理方式(Twisted常见的网络异常类名)
        retryable_exceptions = ['TimeoutError', 'TCPTimedOutError',
                                'ConnectionRefusedError', 'ConnectionLost', 'DNSLookupError']
        if exception_type in retryable_exceptions:
            # 网络异常,可以重试
            retry_times = request.meta.get('retry_times', 0)
            if retry_times < self.retry_times:
                self.logger.info(f'Retrying after {exception_type}: {request.url}')
                retry_req = request.copy()
                retry_req.meta['retry_times'] = retry_times + 1
                retry_req.dont_filter = True  # 避免重试请求被去重过滤器丢弃
                return retry_req
        
        # 其他异常,继续处理
        return None
    
    def _get_random_user_agent(self):
        """
        获取随机User-Agent
        """
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        ]
        return random.choice(user_agents)
    
    def get_stats(self):
        """
        获取统计信息
        """
        return self.stats.copy()

print("基础下载中间件开发完成!")

2.2 代理中间件

# 4. 代理中间件开发
print("\n🌐 代理中间件开发:")

from urllib.parse import urlparse

class ProxyMiddleware:
    """
    代理中间件
    """
    
    def __init__(self, settings):
        self.settings = settings
        self.proxy_list = settings.getlist('PROXY_LIST', [])
        self.proxy_mode = settings.get('PROXY_MODE', 'random')  # random, round_robin, failover
        self.proxy_auth = settings.get('PROXY_AUTH', None)
        
        # 代理状态管理
        self.proxy_stats = {}
        self.current_proxy_index = 0
        self.failed_proxies = set()
        
        # 初始化代理统计
        for proxy in self.proxy_list:
            self.proxy_stats[proxy] = {
                'success_count': 0,
                'failure_count': 0,
                'last_used': None,
                'response_time': []
            }
        
        self.logger = logging.getLogger(__name__)
        
        if not self.proxy_list:
            raise NotConfigured('No proxy list provided')
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)
    
    def process_request(self, request, spider):
        """
        为请求分配代理
        """
        # 跳过已经设置代理的请求
        if 'proxy' in request.meta:
            return None
        
        # 获取可用代理
        proxy = self._get_proxy(request)
        
        if proxy:
            request.meta['proxy'] = proxy
            request.meta['proxy_start_time'] = time.time()
            
            # 设置代理认证
            if self.proxy_auth:
                request.headers['Proxy-Authorization'] = f'Basic {self.proxy_auth}'
            
            self.logger.debug(f'Using proxy {proxy} for {request.url}')
        
        return None
    
    def process_response(self, request, response, spider):
        """
        处理代理响应
        """
        proxy = request.meta.get('proxy')
        
        if proxy:
            # 记录成功统计
            self.proxy_stats[proxy]['success_count'] += 1
            self.proxy_stats[proxy]['last_used'] = time.time()
            
            # 记录响应时间
            if 'proxy_start_time' in request.meta:
                response_time = time.time() - request.meta['proxy_start_time']
                self.proxy_stats[proxy]['response_time'].append(response_time)
                
                # 保持最近100次记录
                if len(self.proxy_stats[proxy]['response_time']) > 100:
                    self.proxy_stats[proxy]['response_time'].pop(0)
            
            # 从失败列表中移除(如果存在)
            self.failed_proxies.discard(proxy)
        
        return response
    
    def process_exception(self, request, exception, spider):
        """
        处理代理异常
        """
        proxy = request.meta.get('proxy')
        
        if proxy:
            # 记录失败统计
            self.proxy_stats[proxy]['failure_count'] += 1
            
            # 检查是否需要标记为失败
            stats = self.proxy_stats[proxy]
            failure_rate = stats['failure_count'] / (stats['success_count'] + stats['failure_count'])
            
            if failure_rate > 0.5 and stats['failure_count'] > 5:
                self.failed_proxies.add(proxy)
                self.logger.warning(f'Proxy {proxy} marked as failed (failure rate: {failure_rate:.2f})')
            
            # 尝试使用其他代理重试
            new_proxy = self._get_proxy(request, exclude=proxy)
            if new_proxy:
                retry_req = request.copy()
                retry_req.meta['proxy'] = new_proxy
                retry_req.meta['proxy_start_time'] = time.time()
                retry_req.dont_filter = True
                
                self.logger.info(f'Retrying with different proxy: {new_proxy}')
                return retry_req
        
        return None
    
    def _get_proxy(self, request, exclude=None):
        """
        获取代理
        """
        available_proxies = [p for p in self.proxy_list 
                           if p not in self.failed_proxies and p != exclude]
        
        if not available_proxies:
            # 重置失败代理列表
            self.failed_proxies.clear()
            available_proxies = self.proxy_list
        
        if not available_proxies:
            return None
        
        if self.proxy_mode == 'random':
            return random.choice(available_proxies)
        
        elif self.proxy_mode == 'round_robin':
            proxy = available_proxies[self.current_proxy_index % len(available_proxies)]
            self.current_proxy_index += 1
            return proxy
        
        elif self.proxy_mode == 'failover':
            # 选择成功率最高的代理
            best_proxy = min(available_proxies, 
                           key=lambda p: self._get_failure_rate(p))
            return best_proxy
        
        return available_proxies[0]
    
    def _get_failure_rate(self, proxy):
        """
        计算代理失败率
        """
        stats = self.proxy_stats[proxy]
        total = stats['success_count'] + stats['failure_count']
        
        if total == 0:
            return 0
        
        return stats['failure_count'] / total
    
    def get_proxy_stats(self):
        """
        获取代理统计信息
        """
        stats = {}
        
        for proxy, data in self.proxy_stats.items():
            total = data['success_count'] + data['failure_count']
            avg_response_time = (sum(data['response_time']) / len(data['response_time']) 
                               if data['response_time'] else 0)
            
            stats[proxy] = {
                'total_requests': total,
                'success_rate': data['success_count'] / total if total > 0 else 0,
                'avg_response_time': avg_response_time,
                'is_failed': proxy in self.failed_proxies
            }
        
        return stats

print("代理中间件开发完成!")

2.3 用户代理中间件

# 5. 用户代理中间件开发
print("\n👤 用户代理中间件开发:")

class UserAgentMiddleware:
    """
    用户代理中间件
    """
    
    def __init__(self, settings):
        self.settings = settings
        self.user_agent_mode = settings.get('USER_AGENT_MODE', 'random')
        self.custom_user_agents = settings.getlist('USER_AGENT_LIST', [])
        
        # 预定义用户代理列表
        self.default_user_agents = {
            'chrome': [
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            ],
            'firefox': [
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',
                'Mozilla/5.0 (X11; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0'
            ],
            'safari': [
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
                'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1'
            ],
            'mobile': [
                'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1',
                'Mozilla/5.0 (Linux; Android 11; SM-G991B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.120 Mobile Safari/537.36',
                'Mozilla/5.0 (Linux; Android 11; Pixel 5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.120 Mobile Safari/537.36'
            ]
        }
        
        # 构建用户代理列表
        self.user_agents = self._build_user_agent_list()
        self.current_index = 0
        
        # 统计信息
        self.usage_stats = {}
        
        self.logger = logging.getLogger(__name__)
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)
    
    def process_request(self, request, spider):
        """
        设置用户代理
        """
        # 跳过已经设置User-Agent的请求
        if 'User-Agent' in request.headers:
            return None
        
        # 获取用户代理
        user_agent = self._get_user_agent(request, spider)
        
        if user_agent:
            request.headers['User-Agent'] = user_agent
            request.meta['user_agent'] = user_agent
            
            # 更新统计
            self.usage_stats[user_agent] = self.usage_stats.get(user_agent, 0) + 1
            
            self.logger.debug(f'Set User-Agent: {user_agent[:50]}...')
        
        return None
    
    def _build_user_agent_list(self):
        """
        构建用户代理列表
        """
        if self.custom_user_agents:
            return self.custom_user_agents
        
        # 使用默认列表
        all_agents = []
        for browser_agents in self.default_user_agents.values():
            all_agents.extend(browser_agents)
        
        return all_agents
    
    def _get_user_agent(self, request, spider):
        """
        获取用户代理
        """
        if not self.user_agents:
            return None
        
        if self.user_agent_mode == 'random':
            return random.choice(self.user_agents)
        
        elif self.user_agent_mode == 'round_robin':
            user_agent = self.user_agents[self.current_index % len(self.user_agents)]
            self.current_index += 1
            return user_agent
        
        elif self.user_agent_mode == 'browser_specific':
            # 根据目标网站选择特定浏览器
            return self._get_browser_specific_agent(request.url)
        
        elif self.user_agent_mode == 'mobile_first':
            # 优先使用移动端用户代理
            mobile_agents = self.default_user_agents.get('mobile', [])
            if mobile_agents:
                return random.choice(mobile_agents)
            return random.choice(self.user_agents)
        
        return self.user_agents[0]
    
    def _get_browser_specific_agent(self, url):
        """
        根据URL获取特定浏览器的用户代理
        """
        domain = urlparse(url).netloc.lower()
        
        # 根据域名特征选择浏览器
        if 'mobile' in domain or domain.startswith('m.'):
            return random.choice(self.default_user_agents['mobile'])
        elif 'google' in domain:
            return random.choice(self.default_user_agents['chrome'])
        elif 'apple' in domain or 'safari' in domain:
            return random.choice(self.default_user_agents['safari'])
        else:
            # 默认使用Chrome
            return random.choice(self.default_user_agents['chrome'])
    
    def get_usage_stats(self):
        """
        获取使用统计
        """
        total_usage = sum(self.usage_stats.values())
        
        stats = {}
        for ua, count in self.usage_stats.items():
            stats[ua[:50] + '...'] = {
                'count': count,
                'percentage': (count / total_usage * 100) if total_usage > 0 else 0
            }
        
        return stats

print("用户代理中间件开发完成!")

3. Spider中间件开发

3.1 基础Spider中间件

# 6. 基础Spider中间件开发
print("\n🕷️ 基础Spider中间件开发:")

from scrapy.http import Request
from scrapy.exceptions import NotConfigured

class BasicSpiderMiddleware:
    """
    基础Spider中间件
    """
    
    def __init__(self, settings):
        self.settings = settings
        self.enabled = settings.getbool('SPIDER_MIDDLEWARE_ENABLED', True)
        
        if not self.enabled:
            raise NotConfigured('BasicSpiderMiddleware is disabled')
        
        # 配置参数
        self.max_depth = settings.getint('DEPTH_LIMIT', 0)
        self.allowed_domains = settings.getlist('ALLOWED_DOMAINS', [])
        
        # 统计信息
        self.stats = {
            'requests_generated': 0,
            'items_scraped': 0,
            'requests_filtered': 0,
            'exceptions_handled': 0
        }
        
        self.logger = logging.getLogger(__name__)
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)
    
    def process_spider_input(self, response, spider):
        """
        处理Spider输入(响应)
        
        在响应传递给Spider之前调用
        """
        # 记录响应信息
        self.logger.debug(f'Processing response: {response.url} (status: {response.status})')
        
        # 可以在这里修改响应或添加元数据
        response.meta['spider_middleware_processed'] = True
        response.meta['process_timestamp'] = time.time()
        
        # 返回None继续处理
        return None
    
    def process_spider_output(self, response, result, spider):
        """
        处理Spider输出(Items和Requests)
        
        处理Spider返回的结果
        """
        processed_results = []
        
        for item in result:
            if isinstance(item, Request):
                # 处理请求
                processed_request = self._process_request(item, response, spider)
                if processed_request:
                    processed_results.append(processed_request)
                    self.stats['requests_generated'] += 1
                else:
                    self.stats['requests_filtered'] += 1
            
            else:
                # 处理Item
                processed_item = self._process_item(item, response, spider)
                if processed_item:
                    processed_results.append(processed_item)
                    self.stats['items_scraped'] += 1
        
        return processed_results
    
    def process_spider_exception(self, response, exception, spider):
        """
        处理Spider异常
        """
        self.stats['exceptions_handled'] += 1
        
        self.logger.error(f'Spider exception for {response.url}: {exception}')
        
        # 记录异常信息,便于排查
        exception_info = {
            'url': response.url,
            'exception_type': type(exception).__name__,
            'exception_message': str(exception),
            'timestamp': time.time()
        }
        self.logger.debug(f'Exception details: {exception_info}')
        
        # 返回空的可迭代对象表示忽略该异常;返回None则交给后续中间件继续处理
        return []
    
    def process_start_requests(self, start_requests, spider):
        """
        处理起始请求
        """
        for request in start_requests:
            # 为起始请求添加元数据
            request.meta['is_start_request'] = True
            request.meta['depth'] = 0
            
            self.logger.debug(f'Processing start request: {request.url}')
            yield request
    
    def _process_request(self, request, response, spider):
        """
        处理单个请求
        """
        # 1. 检查深度限制
        current_depth = request.meta.get('depth', 0)
        if self.max_depth > 0 and current_depth >= self.max_depth:
            self.logger.debug(f'Request filtered by depth: {request.url} (depth: {current_depth})')
            return None
        
        # 2. 检查域名限制
        if self.allowed_domains:
            request_domain = urlparse(request.url).netloc
            if not any(domain in request_domain for domain in self.allowed_domains):
                self.logger.debug(f'Request filtered by domain: {request.url}')
                return None
        
        # 3. 添加元数据
        request.meta['parent_url'] = response.url
        request.meta['spider_middleware_processed'] = True
        
        # 4. 设置深度
        if 'depth' not in request.meta:
            parent_depth = response.meta.get('depth', 0)
            request.meta['depth'] = parent_depth + 1
        
        return request
    
    def _process_item(self, item, response, spider):
        """
        处理单个Item
        """
        # 添加元数据
        if hasattr(item, '__setitem__'):
            item['scraped_url'] = response.url
            item['scraped_timestamp'] = time.time()
            item['spider_name'] = spider.name
        
        return item
    
    def get_stats(self):
        """
        获取统计信息
        """
        return self.stats.copy()

print("基础Spider中间件开发完成!")

3.2 数据验证中间件

# 7. 数据验证中间件
print("\n✅ 数据验证中间件开发:")

import re
from urllib.parse import urljoin, urlparse

class DataValidationMiddleware:
    """
    数据验证中间件
    """
    
    def __init__(self, settings):
        self.settings = settings
        
        # 验证规则配置
        self.validation_rules = settings.getdict('VALIDATION_RULES', {})
        self.required_fields = settings.getlist('REQUIRED_FIELDS', [])
        self.url_validation = settings.getbool('VALIDATE_URLS', True)
        self.duplicate_detection = settings.getbool('DETECT_DUPLICATES', True)
        
        # 重复检测
        self.seen_items = set()
        self.duplicate_threshold = settings.getfloat('DUPLICATE_THRESHOLD', 0.9)
        
        # 统计信息
        self.validation_stats = {
            'total_items': 0,
            'valid_items': 0,
            'invalid_items': 0,
            'duplicate_items': 0,
            'validation_errors': {}
        }
        
        self.logger = logging.getLogger(__name__)
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)
    
    def process_spider_output(self, response, result, spider):
        """
        验证Spider输出
        """
        for item in result:
            if isinstance(item, Request):
                # 验证请求URL
                if self.url_validation:
                    validated_request = self._validate_request(item, response)
                    if validated_request:
                        yield validated_request
                else:
                    yield item
            
            else:
                # 验证Item
                validated_item = self._validate_item(item, response, spider)
                if validated_item:
                    yield validated_item
    
    def _validate_request(self, request, response):
        """
        验证请求
        """
        # 1. URL格式验证
        if not self._is_valid_url(request.url):
            self.logger.warning(f'Invalid URL format: {request.url}')
            return None
        
        # 2. URL补全
        if not request.url.startswith(('http://', 'https://')):
            request = request.replace(url=urljoin(response.url, request.url))
        
        # 3. URL规范化
        request = request.replace(url=self._normalize_url(request.url))
        
        return request
    
    def _validate_item(self, item, response, spider):
        """
        验证Item
        """
        self.validation_stats['total_items'] += 1
        
        # 1. 必填字段验证
        if not self._validate_required_fields(item):
            self.validation_stats['invalid_items'] += 1
            return None
        
        # 2. 字段格式验证
        if not self._validate_field_formats(item):
            self.validation_stats['invalid_items'] += 1
            return None
        
        # 3. 重复检测
        if self.duplicate_detection and self._is_duplicate(item):
            self.validation_stats['duplicate_items'] += 1
            self.logger.debug(f'Duplicate item detected: {item}')
            return None
        
        # 4. 数据清洗
        cleaned_item = self._clean_item(item)
        
        self.validation_stats['valid_items'] += 1
        return cleaned_item
    
    def _validate_required_fields(self, item):
        """
        验证必填字段
        """
        for field in self.required_fields:
            if field not in item or not item[field]:
                error_key = f'missing_{field}'
                self.validation_stats['validation_errors'][error_key] = \
                    self.validation_stats['validation_errors'].get(error_key, 0) + 1
                
                self.logger.warning(f'Missing required field: {field}')
                return False
        
        return True
    
    def _validate_field_formats(self, item):
        """
        验证字段格式
        """
        for field, rules in self.validation_rules.items():
            if field not in item:
                continue
            
            value = item[field]
            
            # 类型验证
            if 'type' in rules:
                expected_type = rules['type']
                if expected_type == 'int':
                    try:
                        int(value)
                    except (ValueError, TypeError):
                        self._record_validation_error(f'{field}_type_error')
                        return False
                
                elif expected_type == 'float':
                    try:
                        float(value)
                    except (ValueError, TypeError):
                        self._record_validation_error(f'{field}_type_error')
                        return False
                
                elif expected_type == 'url':
                    if not self._is_valid_url(value):
                        self._record_validation_error(f'{field}_url_error')
                        return False
                
                elif expected_type == 'email':
                    if not self._is_valid_email(value):
                        self._record_validation_error(f'{field}_email_error')
                        return False
            
            # 长度验证
            if 'min_length' in rules:
                if len(str(value)) < rules['min_length']:
                    self._record_validation_error(f'{field}_min_length_error')
                    return False
            
            if 'max_length' in rules:
                if len(str(value)) > rules['max_length']:
                    self._record_validation_error(f'{field}_max_length_error')
                    return False
            
            # 正则表达式验证
            if 'pattern' in rules:
                if not re.match(rules['pattern'], str(value)):
                    self._record_validation_error(f'{field}_pattern_error')
                    return False
            
            # 值范围验证
            if 'min_value' in rules:
                try:
                    if float(value) < rules['min_value']:
                        self._record_validation_error(f'{field}_min_value_error')
                        return False
                except (ValueError, TypeError):
                    pass
            
            if 'max_value' in rules:
                try:
                    if float(value) > rules['max_value']:
                        self._record_validation_error(f'{field}_max_value_error')
                        return False
                except (ValueError, TypeError):
                    pass
        
        return True
    
    def _is_duplicate(self, item):
        """
        检测重复项
        """
        # 生成项目指纹
        fingerprint = self._generate_fingerprint(item)
        
        # 检查是否已存在
        if fingerprint in self.seen_items:
            return True
        
        # 添加到已见集合
        self.seen_items.add(fingerprint)
        return False
    
    def _generate_fingerprint(self, item):
        """
        生成项目指纹
        """
        # 使用主要字段生成指纹
        key_fields = ['title', 'url', 'name', 'id']
        fingerprint_data = []
        
        for field in key_fields:
            if field in item and item[field]:
                fingerprint_data.append(str(item[field]).strip().lower())
        
        if not fingerprint_data:
            # 如果没有关键字段,使用所有字段
            fingerprint_data = [str(v).strip().lower() for v in item.values() if v]
        
        return hash('|'.join(fingerprint_data))
    
    def _clean_item(self, item):
        """
        清洗Item数据
        """
        cleaned_item = {}
        
        for field, value in item.items():
            if isinstance(value, str):
                # 清理字符串
                cleaned_value = value.strip()
                cleaned_value = re.sub(r'\s+', ' ', cleaned_value)  # 合并多个空格
                cleaned_value = cleaned_value.replace('\n', ' ').replace('\r', ' ')
                cleaned_item[field] = cleaned_value
            else:
                cleaned_item[field] = value
        
        return cleaned_item
    
    def _is_valid_url(self, url):
        """
        验证URL格式
        """
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except (ValueError, AttributeError):
            return False
    
    def _is_valid_email(self, email):
        """
        验证邮箱格式
        """
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return re.match(pattern, email) is not None
    
    def _normalize_url(self, url):
        """
        规范化URL
        """
        # 移除URL片段
        parsed = urlparse(url)
        normalized = parsed._replace(fragment='').geturl()
        
        # 移除尾部斜杠(除了根路径)
        if normalized.endswith('/') and normalized.count('/') > 3:
            normalized = normalized[:-1]
        
        return normalized
    
    def _record_validation_error(self, error_type):
        """
        记录验证错误
        """
        self.validation_stats['validation_errors'][error_type] = \
            self.validation_stats['validation_errors'].get(error_type, 0) + 1
    
    def get_validation_stats(self):
        """
        获取验证统计
        """
        stats = self.validation_stats.copy()
        
        if stats['total_items'] > 0:
            stats['valid_rate'] = stats['valid_items'] / stats['total_items']
            stats['duplicate_rate'] = stats['duplicate_items'] / stats['total_items']
        else:
            stats['valid_rate'] = 0
            stats['duplicate_rate'] = 0
        
        return stats

print("数据验证中间件开发完成!")

4. 中间件配置与管理

4.1 中间件配置

# 8. 中间件配置管理
print("\n⚙️ 中间件配置管理:")

# settings.py 配置示例
MIDDLEWARE_SETTINGS = {
    # 下载中间件配置
    'DOWNLOADER_MIDDLEWARES': {
        'myproject.middlewares.BasicDownloaderMiddleware': 100,
        'myproject.middlewares.ProxyMiddleware': 200,
        'myproject.middlewares.UserAgentMiddleware': 300,
        'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,  # 禁用默认中间件
    },
    
    # Spider中间件配置
    'SPIDER_MIDDLEWARES': {
        'myproject.middlewares.BasicSpiderMiddleware': 100,
        'myproject.middlewares.DataValidationMiddleware': 200,
    },
    
    # 中间件特定配置
    'BASIC_MIDDLEWARE_ENABLED': True,
    'DOWNLOAD_DELAY_RANGE': 2.0,
    'RETRY_TIMES': 3,
    
    # 代理配置
    'PROXY_LIST': [
        'http://proxy1.example.com:8080',
        'http://proxy2.example.com:8080',
        'http://proxy3.example.com:8080',
    ],
    'PROXY_MODE': 'random',  # random, round_robin, failover
    'PROXY_AUTH': 'dXNlcjpwYXNz',  # base64编码的用户名:密码
    
    # 用户代理配置
    'USER_AGENT_MODE': 'random',
    'USER_AGENT_LIST': [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
    ],
    
    # 数据验证配置
    'VALIDATION_RULES': {
        'title': {
            'type': 'str',
            'min_length': 1,
            'max_length': 200,
        },
        'price': {
            'type': 'float',
            'min_value': 0,
            'max_value': 999999,
        },
        'url': {
            'type': 'url',
        },
        'email': {
            'type': 'email',
        }
    },
    'REQUIRED_FIELDS': ['title', 'url'],
    'VALIDATE_URLS': True,
    'DETECT_DUPLICATES': True,
}
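
除了在settings.py中全局配置,也可以在单个Spider里通过custom_settings覆盖上述配置(示意,类路径沿用上面假设的myproject项目名):

# 在单个Spider中覆盖中间件相关配置(示意)
import scrapy

class NewsSpider(scrapy.Spider):
    name = 'news'
    start_urls = ['https://news.example.com']

    custom_settings = {
        'DOWNLOADER_MIDDLEWARES': {
            'myproject.middlewares.ProxyMiddleware': None,  # 该Spider不使用代理
        },
        'USER_AGENT_MODE': 'mobile_first',
        'DOWNLOAD_DELAY_RANGE': 0.5,
    }

    def parse(self, response):
        pass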

class MiddlewareManager:
    """
    中间件管理器
    """
    
    def __init__(self, settings, crawler=None):
        self.settings = settings
        self.crawler = crawler  # 有crawler时优先通过from_crawler创建中间件
        self.downloader_middlewares = {}
        self.spider_middlewares = {}
        self.middleware_stats = {}
        
        self.logger = logging.getLogger(__name__)
    
    def load_middlewares(self):
        """
        加载中间件
        """
        # 加载下载中间件
        downloader_mw_config = self.settings.getdict('DOWNLOADER_MIDDLEWARES', {})
        self.downloader_middlewares = self._load_middleware_dict(downloader_mw_config)
        
        # 加载Spider中间件
        spider_mw_config = self.settings.getdict('SPIDER_MIDDLEWARES', {})
        self.spider_middlewares = self._load_middleware_dict(spider_mw_config)
        
        self.logger.info(f'Loaded {len(self.downloader_middlewares)} downloader middlewares')
        self.logger.info(f'Loaded {len(self.spider_middlewares)} spider middlewares')
    
    def _load_middleware_dict(self, middleware_config):
        """
        加载中间件字典
        """
        middlewares = {}
        
        for middleware_path, priority in middleware_config.items():
            if priority is None:
                # 禁用中间件
                continue
            
            try:
                # 动态导入中间件类
                module_path, class_name = middleware_path.rsplit('.', 1)
                module = __import__(module_path, fromlist=[class_name])
                middleware_class = getattr(module, class_name)
                
                # 创建中间件实例
                if self.crawler is not None and hasattr(middleware_class, 'from_crawler'):
                    # 使用from_crawler方法创建
                    middleware_instance = middleware_class.from_crawler(self.crawler)
                else:
                    # 没有crawler时退回到直接用settings创建实例
                    middleware_instance = middleware_class(self.settings)
                
                middlewares[priority] = {
                    'instance': middleware_instance,
                    'path': middleware_path,
                    'priority': priority
                }
                
                self.logger.debug(f'Loaded middleware: {middleware_path} (priority: {priority})')
                
            except Exception as e:
                self.logger.error(f'Failed to load middleware {middleware_path}: {e}')
        
        # 按优先级排序
        return dict(sorted(middlewares.items()))
    
    def get_middleware_stats(self):
        """
        获取中间件统计信息
        """
        stats = {
            'downloader_middlewares': {},
            'spider_middlewares': {}
        }
        
        # 收集下载中间件统计
        for priority, middleware_info in self.downloader_middlewares.items():
            instance = middleware_info['instance']
            path = middleware_info['path']
            
            if hasattr(instance, 'get_stats'):
                stats['downloader_middlewares'][path] = instance.get_stats()
            elif hasattr(instance, 'get_proxy_stats'):
                stats['downloader_middlewares'][path] = instance.get_proxy_stats()
            elif hasattr(instance, 'get_usage_stats'):
                stats['downloader_middlewares'][path] = instance.get_usage_stats()
        
        # 收集Spider中间件统计
        for priority, middleware_info in self.spider_middlewares.items():
            instance = middleware_info['instance']
            path = middleware_info['path']
            
            if hasattr(instance, 'get_stats'):
                stats['spider_middlewares'][path] = instance.get_stats()
            elif hasattr(instance, 'get_validation_stats'):
                stats['spider_middlewares'][path] = instance.get_validation_stats()
        
        return stats
    
    def print_middleware_info(self):
        """
        打印中间件信息
        """
        print("\n📊 中间件信息:")
        
        print("\n下载中间件:")
        for priority, middleware_info in self.downloader_middlewares.items():
            print(f"  优先级 {priority}: {middleware_info['path']}")
        
        print("\nSpider中间件:")
        for priority, middleware_info in self.spider_middlewares.items():
            print(f"  优先级 {priority}: {middleware_info['path']}")
        
        # 打印统计信息
        stats = self.get_middleware_stats()
        
        print("\n统计信息:")
        for mw_type, mw_stats in stats.items():
            print(f"\n{mw_type}:")
            for mw_path, mw_stat in mw_stats.items():
                print(f"  {mw_path}:")
                for key, value in mw_stat.items():
                    print(f"    {key}: {value}")

# 创建中间件管理器示例
print("中间件配置管理完成!")

5. 本章小结

本章详细介绍了Scrapy中间件的开发与应用,主要内容包括:

5.1 核心概念

  1. 中间件类型

    • 下载中间件:处理请求和响应
    • Spider中间件:处理Spider的输入输出
  2. 工作流程

    • 请求处理流程和响应处理流程
    • 中间件执行顺序和优先级

5.2 实用中间件

  1. 基础功能中间件

    • 请求延迟和重试机制
    • 统计信息收集
  2. 代理中间件

    • 多种代理模式(随机、轮询、故障转移)
    • 代理状态监控和故障检测
  3. 用户代理中间件

    • 多种用户代理策略
    • 浏览器特定和移动端优先模式
  4. 数据验证中间件

    • 字段验证和格式检查
    • 重复检测和数据清洗

5.3 配置管理

  • 中间件配置和优先级设置
  • 动态加载和统计信息收集
  • 中间件管理器的实现

6. 最佳实践

6.1 中间件设计原则

# 中间件设计最佳实践
design_principles = {
    "单一职责": "每个中间件只负责一个特定功能",
    "可配置性": "通过settings进行灵活配置",
    "错误处理": "优雅处理异常和错误情况",
    "性能考虑": "避免阻塞操作和内存泄漏",
    "统计监控": "提供详细的运行统计信息"
}
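
把这些原则落到代码上,大致是下面这种形态:用settings开关控制是否启用(未启用就抛NotConfigured让Scrapy跳过它),统计信息写入crawler.stats而不是自建字典(示意,开关名为虚构):

# 设计原则落地示意:可配置开关 + 复用crawler.stats(开关名为虚构)
from scrapy.exceptions import NotConfigured

class StatsAwareMiddleware:
    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        if not crawler.settings.getbool('STATS_AWARE_MIDDLEWARE_ENABLED', True):
            raise NotConfigured
        return cls(crawler.stats)

    def process_request(self, request, spider):
        # 计数写入crawler.stats,最终会出现在爬虫结束时的统计输出中
        self.stats.inc_value('stats_aware/requests_processed')
        return None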

6.2 性能优化

# 性能优化技巧
performance_tips = {
    "缓存机制": "缓存计算结果和查询结果",
    "批量处理": "批量处理请求和响应",
    "异步操作": "使用异步操作避免阻塞",
    "内存管理": "及时清理不需要的数据",
    "优先级设置": "合理设置中间件优先级"
}
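
以限速为例:与其在中间件里调用time.sleep()阻塞整个事件循环,不如交给Scrapy自带的限速与自动调节机制(示意配置):

# settings.py 限速相关配置示意:替代在中间件中调用time.sleep()
DOWNLOAD_DELAY = 1.0                  # 同一站点相邻请求之间的基础延迟(秒)
RANDOMIZE_DOWNLOAD_DELAY = True       # 在0.5~1.5倍DOWNLOAD_DELAY之间随机抖动
AUTOTHROTTLE_ENABLED = True           # 根据响应延迟自动调节并发与延迟
AUTOTHROTTLE_START_DELAY = 1.0
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0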

7. 常见陷阱

7.1 配置陷阱

  • 优先级冲突:中间件优先级设置不当
  • 循环依赖:中间件之间的循环依赖
  • 配置错误:settings配置错误导致中间件无法加载

7.2 性能陷阱

  • 阻塞操作:在中间件中执行耗时操作
  • 内存泄漏:未及时清理缓存和统计数据
  • 过度处理:对每个请求都执行复杂操作

8. 下一步学习

完成本章后,建议继续学习:

  1. 第6章:分布式爬虫 - 学习Scrapy-Redis分布式方案
  2. 第7章:反爬虫对抗 - 掌握反爬虫技术和对策
  3. 第8章:数据存储 - 了解多种数据存储方案
  4. 第9章:监控与部署 - 学习爬虫监控和生产部署

9. 练习题

9.1 基础练习

  1. 实现一个简单的请求限速中间件
  2. 开发一个响应缓存中间件
  3. 创建一个请求统计中间件

9.2 进阶练习

  1. 实现一个智能代理轮换中间件,支持代理健康检查
  2. 开发一个基于机器学习的反爬虫检测中间件
  3. 创建一个分布式会话管理中间件

9.3 项目练习

  1. 综合中间件系统:构建一个完整的中间件管理系统
  2. 智能爬虫中间件:开发具有自适应能力的智能中间件
  3. 监控中间件:创建实时监控和告警中间件

恭喜! 您已经掌握了Scrapy中间件的开发与应用。中间件是Scrapy框架的核心组件,掌握中间件开发将大大提升您的爬虫开发能力。继续学习下一章,探索分布式爬虫的奥秘!