Chapter Objectives

After completing this chapter, you will be able to:

  • Understand how a Spider works under the hood
  • Work with the different Spider types
  • Handle complex crawling scenarios
  • Apply advanced data extraction techniques
  • Optimize Spider performance

1. Spider Fundamentals

1.1 The Spider Lifecycle

import time

import scrapy
from scrapy import signals

class LifecycleSpider(scrapy.Spider):
    name = 'lifecycle'
    start_urls = ['http://quotes.toscrape.com/']
    
    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        """
        Spider实例化方法
        """
        spider = super().from_crawler(crawler, *args, **kwargs)
        
        # 连接信号
        crawler.signals.connect(spider.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(spider.item_scraped, signal=signals.item_scraped)
        
        return spider
    
    def spider_opened(self, spider):
        """
        Spider开启时调用
        """
        self.logger.info(f'Spider {spider.name} 开始运行')
        self.start_time = time.time()
        self.scraped_count = 0
    
    def spider_closed(self, spider, reason):
        """
        Spider关闭时调用
        """
        end_time = time.time()
        duration = end_time - self.start_time
        self.logger.info(f'Spider {spider.name} 运行结束')
        self.logger.info(f'运行时间: {duration:.2f}秒')
        self.logger.info(f'爬取数量: {self.scraped_count}')
        self.logger.info(f'关闭原因: {reason}')
    
    def item_scraped(self, item, response, spider):
        """
        Item被爬取时调用
        """
        self.scraped_count += 1
        if self.scraped_count % 100 == 0:
            self.logger.info(f'已爬取 {self.scraped_count} 个items')
    
    def parse(self, response):
        """
        解析方法
        """
        quotes = response.css('div.quote')
        
        for quote in quotes:
            yield {
                'text': quote.css('span.text::text').get(),
                'author': quote.css('small.author::text').get(),
                'tags': quote.css('div.tags a.tag::text').getall(),
            }
        
        # 分页
        next_page = response.css('li.next a::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)

print("🔄 Spider生命周期示例完成!")

1.2 Request and Response Handling

# 2. Request和Response详解
print("\n📡 Request和Response处理:")

class RequestResponseSpider(scrapy.Spider):
    name = 'request_response'
    start_urls = ['http://quotes.toscrape.com/']
    
    def start_requests(self):
        """
        自定义初始请求
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }
        
        cookies = {
            'session_id': 'abc123',
            'user_preference': 'dark_mode',
        }
        
        for url in self.start_urls:
            yield scrapy.Request(
                url=url,
                headers=headers,
                cookies=cookies,
                callback=self.parse,
                meta={
                    'page_type': 'listing',
                    'retry_count': 0,
                    'download_timeout': 30,
                },
                dont_filter=False,  # 是否过滤重复请求
                priority=0,         # 请求优先级
            )
    
    def parse(self, response):
        """
        解析响应
        """
        # 检查响应状态
        if response.status != 200:
            self.logger.warning(f'非200状态码: {response.status}')
            return
        
        # 获取请求信息
        request_meta = response.meta
        self.logger.info(f'页面类型: {request_meta.get("page_type")}')
        
        # 检查响应头
        content_type = response.headers.get('Content-Type', b'').decode()
        if 'text/html' not in content_type:
            self.logger.warning(f'非HTML内容: {content_type}')
            return
        
        # 检查页面编码
        encoding = response.encoding
        self.logger.debug(f'页面编码: {encoding}')
        
        # 提取数据
        quotes = response.css('div.quote')
        
        for quote in quotes:
            # 创建详情页请求
            author_url = quote.css('small.author ~ a::attr(href)').get()
            if author_url:
                yield response.follow(
                    author_url,
                    callback=self.parse_author,
                    meta={
                        'quote_data': {
                            'text': quote.css('span.text::text').get(),
                            'author': quote.css('small.author::text').get(),
                            'tags': quote.css('div.tags a.tag::text').getall(),
                        },
                        'page_type': 'detail',
                    },
                    headers={'Referer': response.url},  # 设置引用页
                )
        
        # 处理分页
        next_page = response.css('li.next a::attr(href)').get()
        if next_page:
            yield response.follow(
                next_page,
                callback=self.parse,
                meta={
                    'page_type': 'listing',
                    'page_number': request_meta.get('page_number', 1) + 1,
                }
            )
    
    def parse_author(self, response):
        """
        解析作者详情页
        """
        quote_data = response.meta['quote_data']
        
        # 提取作者信息
        author_info = {
            'name': response.css('h3.author-title::text').get(),
            'birth_date': response.css('span.author-born-date::text').get(),
            'birth_location': response.css('span.author-born-location::text').get(),
            'description': response.css('div.author-description::text').get(),
        }
        
        # 合并数据
        quote_data['author_info'] = author_info
        quote_data['scraped_from'] = response.url
        
        yield quote_data

print("Request和Response处理示例完成!")

2. Spider Types

2.1 CrawlSpider

# 3. CrawlSpider详解
print("\n🕷️ CrawlSpider使用:")

import re

from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor

class NewsCrawlSpider(CrawlSpider):
    name = 'news_crawl'
    allowed_domains = ['example-news.com']
    start_urls = ['http://example-news.com/']
    
    # 定义链接提取规则
    rules = (
        # 提取分类页面链接
        Rule(
            LinkExtractor(
                allow=r'/category/\w+/$',
                deny=r'/category/ads/$',
            ),
            callback='parse_category',
            follow=True,
        ),
        
        # 提取文章页面链接
        Rule(
            LinkExtractor(
                allow=r'/article/\d+/$',
                restrict_css='div.article-list',
            ),
            callback='parse_article',
            follow=False,
        ),
        
        # 提取分页链接
        Rule(
            LinkExtractor(
                allow=r'/page/\d+/$',
                restrict_css='div.pagination',
            ),
            follow=True,
        ),
    )
    
    def parse_category(self, response):
        """
        解析分类页面
        """
        category = self.extract_category_from_url(response.url)
        
        # 提取分类信息
        category_info = {
            'name': response.css('h1.category-title::text').get(),
            'description': response.css('div.category-desc::text').get(),
            'url': response.url,
            'article_count': len(response.css('div.article-item')),
        }
        
        self.logger.info(f'解析分类: {category}')
        yield category_info
    
    def parse_article(self, response):
        """
        解析文章页面
        """
        # 提取文章数据
        article = {
            'title': response.css('h1.article-title::text').get(),
            'content': self.extract_content(response),
            'author': response.css('span.author::text').get(),
            'publish_date': response.css('time.publish-date::attr(datetime)').get(),
            'category': self.extract_category_from_url(response.url),
            'tags': response.css('div.tags a.tag::text').getall(),
            'url': response.url,
        }
        
        # 数据清洗
        article = self.clean_article_data(article)
        
        yield article
    
    def extract_category_from_url(self, url):
        """
        从URL提取分类
        """
        import re
        match = re.search(r'/category/(\w+)/', url)
        return match.group(1) if match else 'unknown'
    
    def extract_content(self, response):
        """
        提取文章内容
        """
        # 提取段落文本
        paragraphs = response.css('div.article-content p::text').getall()
        content = '\n'.join([p.strip() for p in paragraphs if p.strip()])
        
        return content
    
    def clean_article_data(self, article):
        """
        清洗文章数据
        """
        # 清洗标题
        if article['title']:
            article['title'] = article['title'].strip()
        
        # 清洗内容
        if article['content']:
            article['content'] = re.sub(r'\s+', ' ', article['content']).strip()
        
        # 验证必要字段
        if not article['title'] or not article['content']:
            self.logger.warning(f'文章数据不完整: {article["url"]}')
        
        return article

print("CrawlSpider示例完成!")

2.2 XMLFeedSpider

# 4. XMLFeedSpider详解
print("\n📰 XMLFeedSpider使用:")

from scrapy.spiders import XMLFeedSpider

class RSSSpider(XMLFeedSpider):
    name = 'rss'
    allowed_domains = ['example.com']
    start_urls = ['http://example.com/rss.xml']
    
    # XML迭代器设置
    iterator = 'iternodes'  # 可选: 'iternodes', 'xml', 'html'
    itertag = 'item'        # 要迭代的标签
    
    def parse_node(self, response, selector):
        """
        解析XML节点
        """
        # 提取RSS item数据
        item = {
            'title': selector.css('title::text').get(),
            'link': selector.css('link::text').get(),
            'description': selector.css('description::text').get(),
            'pub_date': selector.css('pubDate::text').get(),
            'category': selector.css('category::text').get(),
            'guid': selector.css('guid::text').get(),
        }
        
        # 数据清洗
        item = self.clean_rss_item(item)
        
        # 如果需要获取完整内容,可以发起新请求
        if item['link']:
            yield response.follow(
                item['link'],
                callback=self.parse_full_article,
                meta={'rss_item': item}
            )
        else:
            yield item
    
    def parse_full_article(self, response):
        """
        解析完整文章
        """
        rss_item = response.meta['rss_item']
        
        # 提取完整内容
        full_content = response.css('div.article-content').get()
        if full_content:
            rss_item['full_content'] = full_content
        
        # 提取额外信息
        article_text = ' '.join(response.css('div.article-content ::text').getall())
        rss_item['word_count'] = len(article_text.split())
        rss_item['images'] = response.css('div.article-content img::attr(src)').getall()
        
        yield rss_item
    
    def clean_rss_item(self, item):
        """
        清洗RSS数据
        """
        # 清洗HTML标签
        if item['description']:
            from w3lib.html import remove_tags
            item['description'] = remove_tags(item['description']).strip()
        
        # 解析日期
        if item['pub_date']:
            item['pub_date'] = self.parse_date(item['pub_date'])
        
        return item
    
    def parse_date(self, date_str):
        """
        解析日期字符串
        """
        from dateutil import parser
        try:
            return parser.parse(date_str).isoformat()
        except (ValueError, OverflowError):
            return date_str

print("XMLFeedSpider示例完成!")

2.3 CSVFeedSpider

# 5. CSVFeedSpider详解
print("\n📊 CSVFeedSpider使用:")

import re

from scrapy.spiders import CSVFeedSpider

class ProductCSVSpider(CSVFeedSpider):
    name = 'product_csv'
    allowed_domains = ['example.com']
    start_urls = ['http://example.com/products.csv']
    
    # CSV设置
    delimiter = ','
    quotechar = '"'
    headers = ['id', 'name', 'price', 'category', 'url']
    
    def parse_row(self, response, row):
        """
        解析CSV行
        """
        # 验证数据
        if not row.get('url'):
            self.logger.warning(f'产品 {row.get("id")} 缺少URL')
            return
        
        # 创建产品数据
        product = {
            'csv_id': row['id'],
            'csv_name': row['name'],
            'csv_price': row['price'],
            'csv_category': row['category'],
        }
        
        # 请求产品详情页
        yield response.follow(
            row['url'],
            callback=self.parse_product,
            meta={'csv_product': product}
        )
    
    def parse_product(self, response):
        """
        解析产品详情页
        """
        csv_product = response.meta['csv_product']
        
        # 提取详细信息
        detailed_product = {
            **csv_product,
            'title': response.css('h1.product-title::text').get(),
            'description': response.css('div.product-description::text').get(),
            'images': response.css('div.product-images img::attr(src)').getall(),
            'rating': response.css('div.rating span.score::text').get(),
            'reviews_count': response.css('div.rating span.count::text').get(),
            'availability': response.css('span.availability::text').get(),
            'url': response.url,
        }
        
        # 数据验证和清洗
        detailed_product = self.validate_product(detailed_product)
        
        yield detailed_product
    
    def validate_product(self, product):
        """
        验证产品数据
        """
        # 价格验证
        if product.get('csv_price'):
            try:
                price_value = float(re.sub(r'[^\d.]', '', product['csv_price']))
                product['price_numeric'] = price_value
            except ValueError:
                self.logger.warning(f'无效价格: {product["csv_price"]}')
        
        # 评分验证
        if product.get('rating'):
            try:
                rating_value = float(product['rating'])
                product['rating_numeric'] = rating_value
            except ValueError:
                self.logger.warning(f'无效评分: {product["rating"]}')
        
        return product

print("CSVFeedSpider示例完成!")

3. Advanced Data Extraction Techniques

3.1 Complex Selectors

# 6. 复杂选择器技术
print("\n🎯 复杂选择器技术:")

class AdvancedSelectorSpider(scrapy.Spider):
    name = 'advanced_selector'
    start_urls = ['http://example.com/complex-page']
    
    def parse(self, response):
        """
        使用高级选择器技术
        """
        # 1. 组合选择器
        self.demo_combined_selectors(response)
        
        # 2. 条件选择器
        self.demo_conditional_selectors(response)
        
        # 3. 文本处理选择器
        self.demo_text_processing(response)
        
        # 4. 属性提取技巧
        self.demo_attribute_extraction(response)
    
    def demo_combined_selectors(self, response):
        """
        组合选择器示例
        """
        print("\n1. 组合选择器:")
        
        # CSS + XPath组合
        elements = response.css('div.content').xpath('.//p[contains(@class, "highlight")]')
        
        # 多级选择
        nested_data = response.css('div.article').css('div.meta').css('span.author::text').getall()
        
        # 兄弟元素选择
        next_siblings = response.css('h2.title').xpath('./following-sibling::p[1]/text()').getall()
        
        # 父元素选择
        parent_elements = response.css('span.price').xpath('./parent::div/@class').getall()
        
        print(f"嵌套数据: {nested_data[:3]}")
        print(f"兄弟元素: {next_siblings[:3]}")
        print(f"父元素类名: {parent_elements[:3]}")
    
    def demo_conditional_selectors(self, response):
        """
        条件选择器示例
        """
        print("\n2. 条件选择器:")
        
        # 基于属性条件
        featured_products = response.css('div.product[data-featured="true"]')
        
        # 基于文本内容条件
        sale_items = response.xpath('//div[@class="product"][contains(.//span[@class="price"], "Sale")]')
        
        # 基于位置条件
        first_three = response.css('div.item:nth-child(-n+3)')
        last_item = response.css('div.item:last-child')
        
        # 基于存在性条件(cssselect 不支持 :has(),改用等价的 XPath)
        with_images = response.xpath('//div[contains(@class, "product")][.//img]')
        
        print(f"特色产品: {len(featured_products)}")
        print(f"促销商品: {len(sale_items)}")
        print(f"前三个项目: {len(first_three)}")
    
    def demo_text_processing(self, response):
        """
        文本处理示例
        """
        print("\n3. 文本处理:")
        
        # 提取并清洗文本
        raw_text = response.css('div.description::text').get()
        if raw_text:
            cleaned_text = self.clean_text(raw_text)
            print(f"清洗前: '{raw_text[:50]}...'")
            print(f"清洗后: '{cleaned_text[:50]}...'")
        
        # 提取数字
        price_text = response.css('span.price::text').get()
        if price_text:
            price_value = self.extract_price(price_text)
            print(f"价格文本: {price_text}")
            print(f"价格数值: {price_value}")
        
        # 提取日期
        date_text = response.css('time.publish-date::text').get()
        if date_text:
            parsed_date = self.parse_date(date_text)
            print(f"日期文本: {date_text}")
            print(f"解析日期: {parsed_date}")
    
    def demo_attribute_extraction(self, response):
        """
        属性提取示例
        """
        print("\n4. 属性提取:")
        
        # 提取多个属性
        links = response.css('a')
        for link in links[:3]:
            link_data = {
                'text': link.css('::text').get(),
                'href': link.css('::attr(href)').get(),
                'title': link.css('::attr(title)').get(),
                'class': link.css('::attr(class)').get(),
            }
            print(f"链接数据: {link_data}")
        
        # 提取数据属性
        products = response.css('div.product')
        for product in products[:3]:
            product_data = {
                'id': product.css('::attr(data-id)').get(),
                'category': product.css('::attr(data-category)').get(),
                'price': product.css('::attr(data-price)').get(),
            }
            print(f"产品数据: {product_data}")
    
    def clean_text(self, text):
        """
        清洗文本
        """
        if not text:
            return ""
        
        import re
        # 移除多余空白
        text = re.sub(r'\s+', ' ', text)
        # 移除首尾空白
        text = text.strip()
        # 移除特殊字符
        text = re.sub(r'[^\w\s\-.,!?]', '', text)
        
        return text
    
    def extract_price(self, price_text):
        """
        提取价格
        """
        if not price_text:
            return None
        
        import re
        # 提取数字
        match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
        return float(match.group()) if match else None
    
    def parse_date(self, date_text):
        """
        解析日期
        """
        if not date_text:
            return None
        
        from dateutil import parser
        try:
            return parser.parse(date_text).isoformat()
        except (ValueError, OverflowError):
            return None

print("高级选择器技术示例完成!")

3.2 Handling JavaScript

# 7. JavaScript处理技术
print("\n🌐 JavaScript处理:")

class JavaScriptSpider(scrapy.Spider):
    name = 'javascript'
    start_urls = ['http://example.com/spa-page']
    
    def parse(self, response):
        """
        处理JavaScript渲染的页面
        """
        # 检查是否为SPA页面
        if self.is_spa_page(response):
            self.logger.info('检测到SPA页面,需要JavaScript渲染')
            
            # 方法1: 使用Splash
            yield self.create_splash_request(response.url)
            
            # 方法2: 使用Selenium
            # yield self.create_selenium_request(response.url)
        else:
            # 常规处理
            yield from self.parse_regular_page(response)
    
    def is_spa_page(self, response):
        """
        检测是否为SPA页面
        """
        # 收集页面全部可见文本,文本过少通常意味着内容由JavaScript渲染
        body_text = ' '.join(response.css('body ::text').getall())
        if len(body_text.strip()) < 100:
            return True
        
        # 检查JavaScript框架
        js_frameworks = [
            'react', 'vue', 'angular', 'backbone',
            'ember', 'knockout', 'meteor'
        ]
        
        page_content = response.text.lower()
        for framework in js_frameworks:
            if framework in page_content:
                return True
        
        return False
    
    def create_splash_request(self, url):
        """
        创建Splash请求(调用Splash的render.json接口,返回JSON)
        """
        import json
        
        splash_args = {
            'url': url,
            'html': 1,
            'png': 0,
            'wait': 3,
            'timeout': 30,
            'resource_timeout': 10,
        }
        
        return scrapy.Request(
            url='http://localhost:8050/render.json',
            method='POST',
            body=json.dumps(splash_args),
            headers={'Content-Type': 'application/json'},
            callback=self.parse_splash_response,
            meta={'original_url': url},
        )
    
    def parse_splash_response(self, response):
        """
        解析Splash响应
        """
        import json
        
        try:
            data = json.loads(response.text)
            html_content = data.get('html', '')
            
            if html_content:
                # 创建新的响应对象
                from scrapy.http import HtmlResponse
                
                rendered_response = HtmlResponse(
                    url=response.meta['original_url'],
                    body=html_content.encode('utf-8'),
                    encoding='utf-8'
                )
                
                # 解析渲染后的内容
                yield from self.parse_rendered_page(rendered_response)
            
        except json.JSONDecodeError:
            self.logger.error('Splash响应解析失败')
    
    def parse_rendered_page(self, response):
        """
        解析渲染后的页面
        """
        # 提取动态加载的内容
        dynamic_items = response.css('div.dynamic-item')
        
        for item in dynamic_items:
            yield {
                'title': item.css('h3.title::text').get(),
                'content': item.css('div.content::text').get(),
                'timestamp': item.css('span.timestamp::text').get(),
                'url': response.url,
                'rendered': True,
            }
    
    def parse_regular_page(self, response):
        """
        解析常规页面
        """
        # 常规页面处理逻辑
        items = response.css('div.item')
        
        for item in items:
            yield {
                'title': item.css('h3::text').get(),
                'content': item.css('p::text').get(),
                'url': response.url,
                'rendered': False,
            }

print("JavaScript处理示例完成!")

4. Error Handling and Retry Mechanisms

4.1 Exception Handling

# 8. 异常处理机制
print("\n🛡️ 异常处理:")

from scrapy.exceptions import IgnoreRequest, CloseSpider
from twisted.internet.error import (
    TimeoutError as TwistedTimeoutError,
    TCPTimedOutError,
    ConnectionRefusedError,
    ConnectionLost,
)

class RobustSpider(scrapy.Spider):
    name = 'robust'
    start_urls = ['http://example.com/']
    
    # 自定义设置
    custom_settings = {
        'RETRY_TIMES': 3,
        'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429],
        'DOWNLOAD_TIMEOUT': 30,
    }
    
    def parse(self, response):
        """
        带异常处理的解析方法
        """
        try:
            # 检查响应状态
            self.validate_response(response)
            
            # 提取数据
            items = self.extract_items(response)
            
            for item in items:
                yield item
            
            # 处理分页
            yield from self.handle_pagination(response)
            
        except Exception as e:
            self.handle_parse_error(response, e)
    
    def validate_response(self, response):
        """
        验证响应
        """
        # 检查状态码
        if response.status >= 400:
            raise IgnoreRequest(f'HTTP错误: {response.status}')
        
        # 检查内容类型
        content_type = response.headers.get('Content-Type', b'').decode()
        if 'text/html' not in content_type:
            raise IgnoreRequest(f'非HTML内容: {content_type}')
        
        # 检查页面大小
        if len(response.body) < 100:
            raise IgnoreRequest('页面内容过少')
        
        # 检查错误页面
        if self.is_error_page(response):
            raise IgnoreRequest('检测到错误页面')
    
    def is_error_page(self, response):
        """
        检测错误页面
        """
        error_indicators = [
            'page not found',
            '404 error',
            'access denied',
            'server error',
            'temporarily unavailable'
        ]
        
        page_text = response.text.lower()
        return any(indicator in page_text for indicator in error_indicators)
    
    def extract_items(self, response):
        """
        提取数据项
        """
        items = []
        
        try:
            elements = response.css('div.item')
            
            for element in elements:
                item = self.extract_single_item(element, response)
                if item:
                    items.append(item)
        
        except Exception as e:
            self.logger.error(f'数据提取失败: {e}')
            raise
        
        return items
    
    def extract_single_item(self, element, response):
        """
        提取单个数据项
        """
        try:
            item = {
                'title': element.css('h3::text').get(),
                'description': element.css('p::text').get(),
                'url': response.url,
            }
            
            # 数据验证
            if not item['title']:
                self.logger.warning('标题为空,跳过该项')
                return None
            
            return item
            
        except Exception as e:
            self.logger.error(f'单项提取失败: {e}')
            return None
    
    def handle_pagination(self, response):
        """
        处理分页
        """
        try:
            next_page = response.css('a.next::attr(href)').get()
            
            if next_page:
                # 检查分页限制
                current_page = response.meta.get('page_number', 1)
                max_pages = getattr(self, 'max_pages', 100)
                
                if current_page >= max_pages:
                    self.logger.info(f'达到最大页数限制: {max_pages}')
                    return
                
                yield response.follow(
                    next_page,
                    callback=self.parse,
                    meta={'page_number': current_page + 1},
                    errback=self.handle_request_error
                )
        
        except Exception as e:
            self.logger.error(f'分页处理失败: {e}')
    
    def handle_parse_error(self, response, error):
        """
        处理解析错误
        """
        self.logger.error(f'解析错误 {response.url}: {error}')
        
        # 记录错误统计
        if not hasattr(self, 'error_count'):
            self.error_count = 0
        self.error_count += 1
        
        # 错误过多时停止爬虫
        if self.error_count > 100:
            raise CloseSpider('错误过多,停止爬虫')
    
    def handle_request_error(self, failure):
        """
        处理请求错误
        """
        request = failure.request
        self.logger.error(f'请求失败 {request.url}: {failure.value}')
        
        # 根据错误类型决定是否重试(下载器抛出的是Twisted的异常类型)
        if failure.check(TwistedTimeoutError, TCPTimedOutError):
            self.logger.info('超时错误,将重试')
            return request.replace(dont_filter=True)
        
        elif failure.check(ConnectionRefusedError, ConnectionLost):
            self.logger.info('连接错误,将重试')
            return request.replace(dont_filter=True)
        
        else:
            self.logger.error('未知错误,不重试')

print("异常处理示例完成!")

4.2 A Custom Retry Middleware

# 9. 自定义重试中间件
print("\n🔄 自定义重试中间件:")

from twisted.internet.error import (
    TimeoutError as TwistedTimeoutError,
    TCPTimedOutError,
    ConnectionRefusedError,
    ConnectionLost,
)

class CustomRetryMiddleware:
    """
    自定义重试中间件
    """
    
    def __init__(self, settings):
        self.max_retry_times = settings.getint('RETRY_TIMES')
        self.retry_http_codes = set(int(x) for x in settings.getlist('RETRY_HTTP_CODES'))
        self.priority_adjust = settings.getint('RETRY_PRIORITY_ADJUST', -1)
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)
    
    def process_response(self, request, response, spider):
        """
        处理响应
        """
        if response.status in self.retry_http_codes:
            reason = f'HTTP {response.status}'
            return self._retry(request, reason, spider) or response
        
        # 检查响应内容
        if self._should_retry_content(response, spider):
            reason = 'Invalid content'
            return self._retry(request, reason, spider) or response
        
        return response
    
    def process_exception(self, request, exception, spider):
        """
        处理异常
        """
        if isinstance(exception, (TwistedTimeoutError, TCPTimedOutError, ConnectionRefusedError, ConnectionLost)):
            reason = f'{exception.__class__.__name__}: {exception}'
            return self._retry(request, reason, spider)
        
        return None
    
    def _should_retry_content(self, response, spider):
        """
        检查是否应该基于内容重试
        """
        # 检查页面大小
        if len(response.body) < 100:
            spider.logger.warning(f'页面内容过少: {response.url}')
            return True
        
        # 检查错误页面标识
        error_indicators = [
            'temporarily unavailable',
            'service unavailable',
            'please try again',
            'rate limit exceeded'
        ]
        
        page_text = response.text.lower()
        for indicator in error_indicators:
            if indicator in page_text:
                spider.logger.warning(f'检测到错误页面: {indicator}')
                return True
        
        return False
    
    def _retry(self, request, reason, spider):
        """
        执行重试
        """
        retries = request.meta.get('retry_times', 0) + 1
        
        if retries <= self.max_retry_times:
            spider.logger.info(f'重试 {request.url} (第{retries}次): {reason}')
            
            # 创建重试请求,并跳过去重过滤器
            retry_req = request.copy()
            retry_req.meta['retry_times'] = retries
            retry_req.dont_filter = True
            retry_req.priority = request.priority + self.priority_adjust
            
            # 记录期望的退避延迟(Scrapy 默认不会读取该 meta 键,需要自定义延迟逻辑配合)
            retry_req.meta['download_delay'] = retries * 2
            
            return retry_req
        else:
            spider.logger.error(f'重试次数超限 {request.url}: {reason}')
            return None

print("自定义重试中间件示例完成!")

5. Performance Optimization Techniques

5.1 Concurrency Control

# 10. 并发控制和性能优化
print("\n⚡ 性能优化:")

import time

class OptimizedSpider(scrapy.Spider):
    name = 'optimized'
    start_urls = ['http://example.com/']
    
    # 性能优化设置
    custom_settings = {
        # 并发设置
        'CONCURRENT_REQUESTS': 32,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 16,
        'CONCURRENT_ITEMS': 100,
        
        # 下载延迟
        'DOWNLOAD_DELAY': 0.5,
        'RANDOMIZE_DOWNLOAD_DELAY': True,  # 布尔开关:在 0.5~1.5 倍 DOWNLOAD_DELAY 之间随机取值
        'AUTOTHROTTLE_ENABLED': True,
        'AUTOTHROTTLE_START_DELAY': 1,
        'AUTOTHROTTLE_MAX_DELAY': 10,
        'AUTOTHROTTLE_TARGET_CONCURRENCY': 2.0,
        
        # 内存优化
        'MEMUSAGE_ENABLED': True,
        'MEMUSAGE_LIMIT_MB': 2048,
        'MEMUSAGE_WARNING_MB': 1024,
        
        # 缓存设置
        'HTTPCACHE_ENABLED': True,
        'HTTPCACHE_EXPIRATION_SECS': 3600,
        'HTTPCACHE_DIR': 'httpcache',
        'HTTPCACHE_IGNORE_HTTP_CODES': [503, 504, 505, 500, 403, 404, 408],
        
        # 压缩
        'COMPRESSION_ENABLED': True,
    }
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.processed_count = 0
        self.start_time = time.time()
    
    def parse(self, response):
        """
        优化的解析方法
        """
        # 批量处理
        items = response.css('div.item')
        
        # 使用生成器减少内存使用
        for item in self.process_items_batch(items, response):
            yield item
        
        # 智能分页
        yield from self.smart_pagination(response)
    
    def process_items_batch(self, items, response):
        """
        批量处理数据项
        """
        batch_size = 50
        batch = []
        
        for item_selector in items:
            item_data = self.extract_item_data(item_selector, response)
            if item_data:
                batch.append(item_data)
                
                if len(batch) >= batch_size:
                    # 批量处理
                    yield from self.process_batch(batch)
                    batch = []
        
        # 处理剩余项目
        if batch:
            yield from self.process_batch(batch)
    
    def extract_item_data(self, selector, response):
        """
        提取单项数据
        """
        try:
            return {
                'title': selector.css('h3::text').get(),
                'price': selector.css('span.price::text').get(),
                'url': response.url,
            }
        except Exception as e:
            self.logger.error(f'数据提取失败: {e}')
            return None
    
    def process_batch(self, batch):
        """
        处理数据批次
        """
        # 批量数据清洗和验证
        cleaned_batch = []
        
        for item in batch:
            cleaned_item = self.clean_item(item)
            if cleaned_item:
                cleaned_batch.append(cleaned_item)
        
        # 更新统计
        self.processed_count += len(cleaned_batch)
        
        # 记录进度
        if self.processed_count % 1000 == 0:
            elapsed = time.time() - self.start_time
            rate = self.processed_count / elapsed
            self.logger.info(f'已处理 {self.processed_count} 项,速率: {rate:.2f} 项/秒')
        
        yield from cleaned_batch
    
    def clean_item(self, item):
        """
        清洗数据项
        """
        if not item.get('title'):
            return None
        
        # 清洗价格
        if item.get('price'):
            price_text = item['price']
            price_value = self.extract_price(price_text)
            item['price_numeric'] = price_value
        
        return item
    
    def extract_price(self, price_text):
        """
        提取价格数值
        """
        import re
        match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
        return float(match.group()) if match else None
    
    def smart_pagination(self, response):
        """
        智能分页处理
        """
        # 检查是否还有更多页面
        next_page = response.css('a.next::attr(href)').get()
        
        if next_page:
            current_page = response.meta.get('page_number', 1)
            
            # 深层页面降低请求优先级,放慢深翻页的节奏(实际下载间隔由 AutoThrottle 控制)
            if current_page > 10:
                yield scrapy.Request(
                    response.urljoin(next_page),
                    callback=self.parse,
                    priority=-10,
                    meta={'page_number': current_page + 1},
                )
            else:
                yield response.follow(
                    next_page,
                    callback=self.parse,
                    meta={'page_number': current_page + 1}
                )

print("性能优化示例完成!")

6. Chapter Summary

6.1 Key Takeaways

  1. Spider principles and lifecycle

    • How a Spider works
    • Lifecycle management
    • Signal handling
  2. The different Spider types

    • Rule configuration for CrawlSpider
    • XML handling with XMLFeedSpider
    • CSV handling with CSVFeedSpider
  3. Advanced data extraction

    • Complex selector techniques
    • Handling JavaScript-rendered pages
    • Data cleaning and validation
  4. Error handling and retries

    • Exception handling mechanisms
    • Custom retry strategies
    • Error monitoring and statistics
  5. Performance optimization

    • Concurrency control strategies
    • Memory usage optimization
    • Batch processing techniques

6.2 Best Practices

  • 🏗️ Architecture: choose the Spider type that fits the requirements
  • 🎯 Selector optimization: use efficient, robust selectors
  • 🛡️ Error handling: build a thorough exception handling mechanism
  • Performance tuning: configure concurrency and delay parameters sensibly

6.3 Common Pitfalls

  • Excessive concurrency: too much concurrency gets your IP banned
  • Memory leaks: poor memory management when processing large volumes of data
  • Ignored errors: skipping error handling leads to lost data
  • Fragile selectors: selectors that depend too tightly on page structure

6.4 Next Steps

  • 📦 Learn how to use Items and Pipelines
  • 🔧 Master middleware development and configuration
  • 🌐 Understand how distributed crawling is implemented
  • 🛡️ Study anti-bot techniques and how to handle them

7. Exercises

7.1 Basic Exercises

  1. Choosing a Spider type

    • Pick the right Spider type for different scenarios
    • Configure CrawlSpider rules
    • Handle XML and CSV data sources
  2. Extraction optimization

    • Write complex selectors
    • Implement data cleaning
    • Handle JavaScript-rendered pages
  3. Error handling

    • Implement an exception handling mechanism
    • Configure a retry strategy
    • Add error monitoring

7.2 Intermediate Exercises

  1. Performance optimization

    • Tune the Spider's concurrency settings
    • Implement batch data processing
    • Monitor memory usage
  2. Complex scenarios

    • Handle multiple data formats
    • Implement smart pagination
    • Handle dynamic content
  3. Custom middleware

    • Build a custom retry middleware
    • Implement a request filtering middleware
    • Add a performance monitoring middleware

7.3 Challenge Exercises

  1. Large-scale crawling

    • Design a high-performance crawler architecture
    • Implement distributed crawling
    • Optimize resource usage
  2. Crawling complex sites

    • Handle SPA applications
    • Deal with anti-bot measures
    • Implement smart retries
  3. Real-time monitoring

    • Monitor the crawler's status
    • Collect performance metrics
    • Design an alerting mechanism

Congratulations on completing Chapter 3! 🎉

You now have a solid grasp of advanced Spider features and optimization techniques. In the next chapter, we will explore more techniques for data extraction and processing.