Chapter Objectives
By the end of this chapter, you will be able to:
- Understand in depth how Spiders work
- Master the different Spider types
- Handle complex crawling scenarios
- Apply advanced data extraction techniques
- Optimize Spider performance
1. Spider Fundamentals
1.1 The Spider Lifecycle
import time

import scrapy
from scrapy import signals

class LifecycleSpider(scrapy.Spider):
    name = 'lifecycle'
    start_urls = ['http://quotes.toscrape.com/']

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        """
        Spider instantiation hook
        """
        spider = super().from_crawler(crawler, *args, **kwargs)
        # Connect signals
        crawler.signals.connect(spider.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(spider.item_scraped, signal=signals.item_scraped)
        return spider

    def spider_opened(self, spider):
        """
        Called when the spider is opened
        """
        self.logger.info(f'Spider {spider.name} started')
        self.start_time = time.time()
        self.scraped_count = 0

    def spider_closed(self, spider, reason):
        """
        Called when the spider is closed
        """
        end_time = time.time()
        duration = end_time - self.start_time
        self.logger.info(f'Spider {spider.name} finished')
        self.logger.info(f'Running time: {duration:.2f}s')
        self.logger.info(f'Items scraped: {self.scraped_count}')
        self.logger.info(f'Close reason: {reason}')

    def item_scraped(self, item, response, spider):
        """
        Called every time an item is scraped
        """
        self.scraped_count += 1
        if self.scraped_count % 100 == 0:
            self.logger.info(f'{self.scraped_count} items scraped so far')

    def parse(self, response):
        """
        Default parse callback
        """
        quotes = response.css('div.quote')
        for quote in quotes:
            yield {
                'text': quote.css('span.text::text').get(),
                'author': quote.css('small.author::text').get(),
                'tags': quote.css('div.tags a.tag::text').getall(),
            }
        # Pagination
        next_page = response.css('li.next a::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)

print("🔄 Spider lifecycle example complete!")
1.2 Request and Response Handling
# 2. Requests and Responses in depth
print("\n📡 Request and Response handling:")

class RequestResponseSpider(scrapy.Spider):
    name = 'request_response'
    start_urls = ['http://quotes.toscrape.com/']

    def start_requests(self):
        """
        Build the initial requests by hand
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }
        cookies = {
            'session_id': 'abc123',
            'user_preference': 'dark_mode',
        }
        for url in self.start_urls:
            yield scrapy.Request(
                url=url,
                headers=headers,
                cookies=cookies,
                callback=self.parse,
                meta={
                    'page_type': 'listing',
                    'retry_count': 0,
                    'download_timeout': 30,
                },
                dont_filter=False,  # whether to bypass the duplicate filter
                priority=0,  # request priority
            )

    def parse(self, response):
        """
        Parse the response
        """
        # Check the response status
        if response.status != 200:
            self.logger.warning(f'Non-200 status code: {response.status}')
            return
        # Read request metadata
        request_meta = response.meta
        self.logger.info(f'Page type: {request_meta.get("page_type")}')
        # Inspect response headers
        content_type = response.headers.get('Content-Type', b'').decode()
        if 'text/html' not in content_type:
            self.logger.warning(f'Non-HTML content: {content_type}')
            return
        # Check the page encoding
        encoding = response.encoding
        self.logger.debug(f'Page encoding: {encoding}')
        # Extract data
        quotes = response.css('div.quote')
        for quote in quotes:
            # Build a request for the detail page
            author_url = quote.css('small.author ~ a::attr(href)').get()
            if author_url:
                yield response.follow(
                    author_url,
                    callback=self.parse_author,
                    meta={
                        'quote_data': {
                            'text': quote.css('span.text::text').get(),
                            'author': quote.css('small.author::text').get(),
                            'tags': quote.css('div.tags a.tag::text').getall(),
                        },
                        'page_type': 'detail',
                    },
                    headers={'Referer': response.url},  # set the referring page
                )
        # Handle pagination
        next_page = response.css('li.next a::attr(href)').get()
        if next_page:
            yield response.follow(
                next_page,
                callback=self.parse,
                meta={
                    'page_type': 'listing',
                    'page_number': request_meta.get('page_number', 1) + 1,
                }
            )

    def parse_author(self, response):
        """
        Parse the author detail page
        """
        quote_data = response.meta['quote_data']
        # Extract author information
        author_info = {
            'name': response.css('h3.author-title::text').get(),
            'birth_date': response.css('span.author-born-date::text').get(),
            'birth_location': response.css('span.author-born-location::text').get(),
            'description': response.css('div.author-description::text').get(),
        }
        # Merge the data
        quote_data['author_info'] = author_info
        quote_data['scraped_from'] = response.url
        yield quote_data

print("Request and Response handling example complete!")
2. Spider Types
2.1 CrawlSpider
# 3. CrawlSpider in depth
print("\n🕷️ Using CrawlSpider:")

import re

from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor

class NewsCrawlSpider(CrawlSpider):
    name = 'news_crawl'
    allowed_domains = ['example-news.com']
    start_urls = ['http://example-news.com/']

    # Link extraction rules
    rules = (
        # Extract category page links
        Rule(
            LinkExtractor(
                allow=r'/category/\w+/$',
                deny=r'/category/ads/$',
            ),
            callback='parse_category',
            follow=True,
        ),
        # Extract article page links
        Rule(
            LinkExtractor(
                allow=r'/article/\d+/$',
                restrict_css='div.article-list',
            ),
            callback='parse_article',
            follow=False,
        ),
        # Extract pagination links
        Rule(
            LinkExtractor(
                allow=r'/page/\d+/$',
                restrict_css='div.pagination',
            ),
            follow=True,
        ),
    )

    def parse_category(self, response):
        """
        Parse a category page
        """
        category = self.extract_category_from_url(response.url)
        # Extract category information
        category_info = {
            'name': response.css('h1.category-title::text').get(),
            'description': response.css('div.category-desc::text').get(),
            'url': response.url,
            'article_count': len(response.css('div.article-item')),
        }
        self.logger.info(f'Parsed category: {category}')
        yield category_info

    def parse_article(self, response):
        """
        Parse an article page
        """
        # Extract article data
        article = {
            'title': response.css('h1.article-title::text').get(),
            'content': self.extract_content(response),
            'author': response.css('span.author::text').get(),
            'publish_date': response.css('time.publish-date::attr(datetime)').get(),
            'category': self.extract_category_from_url(response.url),
            'tags': response.css('div.tags a.tag::text').getall(),
            'url': response.url,
        }
        # Clean the data
        article = self.clean_article_data(article)
        yield article

    def extract_category_from_url(self, url):
        """
        Extract the category from a URL
        """
        match = re.search(r'/category/(\w+)/', url)
        return match.group(1) if match else 'unknown'

    def extract_content(self, response):
        """
        Extract the article body
        """
        # Extract paragraph text
        paragraphs = response.css('div.article-content p::text').getall()
        content = '\n'.join([p.strip() for p in paragraphs if p.strip()])
        return content

    def clean_article_data(self, article):
        """
        Clean article data
        """
        # Clean the title
        if article['title']:
            article['title'] = article['title'].strip()
        # Clean the content
        if article['content']:
            article['content'] = re.sub(r'\s+', ' ', article['content']).strip()
        # Validate required fields
        if not article['title'] or not article['content']:
            self.logger.warning(f'Incomplete article data: {article["url"]}')
        return article

print("CrawlSpider example complete!")
2.2 XMLFeedSpider
# 4. XMLFeedSpider in depth
print("\n📰 Using XMLFeedSpider:")

from scrapy.spiders import XMLFeedSpider

class RSSSpider(XMLFeedSpider):
    name = 'rss'
    allowed_domains = ['example.com']
    start_urls = ['http://example.com/rss.xml']

    # XML iterator settings
    iterator = 'iternodes'  # options: 'iternodes', 'xml', 'html'
    itertag = 'item'  # the tag to iterate over

    def parse_node(self, response, selector):
        """
        Parse one XML node
        """
        # Extract RSS item data
        item = {
            'title': selector.css('title::text').get(),
            'link': selector.css('link::text').get(),
            'description': selector.css('description::text').get(),
            'pub_date': selector.css('pubDate::text').get(),
            'category': selector.css('category::text').get(),
            'guid': selector.css('guid::text').get(),
        }
        # Clean the data
        item = self.clean_rss_item(item)
        # If the full content is needed, follow the link with a new request
        if item['link']:
            yield response.follow(
                item['link'],
                callback=self.parse_full_article,
                meta={'rss_item': item}
            )
        else:
            yield item

    def parse_full_article(self, response):
        """
        Parse the full article page
        """
        rss_item = response.meta['rss_item']
        # Extract the full content
        full_content = response.css('div.article-content').get()
        if full_content:
            rss_item['full_content'] = full_content
            # Extract extra information (guard against missing text nodes)
            body_text = response.css('div.article-content::text').get() or ''
            rss_item['word_count'] = len(body_text.split())
            rss_item['images'] = response.css('div.article-content img::attr(src)').getall()
        yield rss_item

    def clean_rss_item(self, item):
        """
        Clean RSS data
        """
        # Strip HTML tags
        if item['description']:
            from w3lib.html import remove_tags
            item['description'] = remove_tags(item['description']).strip()
        # Parse the date
        if item['pub_date']:
            item['pub_date'] = self.parse_date(item['pub_date'])
        return item

    def parse_date(self, date_str):
        """
        Parse a date string
        """
        from dateutil import parser
        try:
            return parser.parse(date_str).isoformat()
        except (ValueError, OverflowError):
            return date_str

print("XMLFeedSpider example complete!")
2.3 CSVFeedSpider
# 5. CSVFeedSpider in depth
print("\n📊 Using CSVFeedSpider:")

from scrapy.spiders import CSVFeedSpider

class ProductCSVSpider(CSVFeedSpider):
    name = 'product_csv'
    allowed_domains = ['example.com']
    start_urls = ['http://example.com/products.csv']

    # CSV settings
    delimiter = ','
    quotechar = '"'
    headers = ['id', 'name', 'price', 'category', 'url']

    def parse_row(self, response, row):
        """
        Parse one CSV row
        """
        # Validate the data
        if not row.get('url'):
            self.logger.warning(f'Product {row.get("id")} is missing a URL')
            return
        # Build the product record
        product = {
            'csv_id': row['id'],
            'csv_name': row['name'],
            'csv_price': row['price'],
            'csv_category': row['category'],
        }
        # Request the product detail page
        yield response.follow(
            row['url'],
            callback=self.parse_product,
            meta={'csv_product': product}
        )

    def parse_product(self, response):
        """
        Parse a product detail page
        """
        csv_product = response.meta['csv_product']
        # Extract detailed information
        detailed_product = {
            **csv_product,
            'title': response.css('h1.product-title::text').get(),
            'description': response.css('div.product-description::text').get(),
            'images': response.css('div.product-images img::attr(src)').getall(),
            'rating': response.css('div.rating span.score::text').get(),
            'reviews_count': response.css('div.rating span.count::text').get(),
            'availability': response.css('span.availability::text').get(),
            'url': response.url,
        }
        # Validate and clean the data
        detailed_product = self.validate_product(detailed_product)
        yield detailed_product

    def validate_product(self, product):
        """
        Validate product data
        """
        import re
        # Validate the price
        if product.get('csv_price'):
            try:
                price_value = float(re.sub(r'[^\d.]', '', product['csv_price']))
                product['price_numeric'] = price_value
            except ValueError:
                self.logger.warning(f'Invalid price: {product["csv_price"]}')
        # Validate the rating
        if product.get('rating'):
            try:
                rating_value = float(product['rating'])
                product['rating_numeric'] = rating_value
            except ValueError:
                self.logger.warning(f'Invalid rating: {product["rating"]}')
        return product

print("CSVFeedSpider example complete!")
3. Advanced Data Extraction Techniques
3.1 Complex Selectors
# 6. Complex selector techniques
print("\n🎯 Complex selector techniques:")

class AdvancedSelectorSpider(scrapy.Spider):
    name = 'advanced_selector'
    start_urls = ['http://example.com/complex-page']

    def parse(self, response):
        """
        Demonstrate advanced selector techniques
        """
        # 1. Combined selectors
        self.demo_combined_selectors(response)
        # 2. Conditional selectors
        self.demo_conditional_selectors(response)
        # 3. Text processing
        self.demo_text_processing(response)
        # 4. Attribute extraction
        self.demo_attribute_extraction(response)

    def demo_combined_selectors(self, response):
        """
        Combined selector examples
        """
        print("\n1. Combined selectors:")
        # CSS + XPath combined
        elements = response.css('div.content').xpath('.//p[contains(@class, "highlight")]')
        # Chained selection
        nested_data = response.css('div.article').css('div.meta').css('span.author::text').getall()
        # Following siblings
        next_siblings = response.css('h2.title').xpath('./following-sibling::p[1]/text()').getall()
        # Parent elements
        parent_elements = response.css('span.price').xpath('./parent::div/@class').getall()
        print(f"Nested data: {nested_data[:3]}")
        print(f"Sibling text: {next_siblings[:3]}")
        print(f"Parent class names: {parent_elements[:3]}")

    def demo_conditional_selectors(self, response):
        """
        Conditional selector examples
        """
        print("\n2. Conditional selectors:")
        # Filter by attribute value
        featured_products = response.css('div.product[data-featured="true"]')
        # Filter by text content
        sale_items = response.xpath('//div[@class="product"][contains(.//span[@class="price"], "Sale")]')
        # Filter by position
        first_three = response.css('div.item:nth-child(-n+3)')
        last_item = response.css('div.item:last-child')
        # Filter by presence of a child (cssselect does not support :has(), so use XPath)
        with_images = response.xpath('//div[contains(@class, "product")][.//img]')
        print(f"Featured products: {len(featured_products)}")
        print(f"Sale items: {len(sale_items)}")
        print(f"First three items: {len(first_three)}")

    def demo_text_processing(self, response):
        """
        Text processing examples
        """
        print("\n3. Text processing:")
        # Extract and clean text
        raw_text = response.css('div.description::text').get()
        if raw_text:
            cleaned_text = self.clean_text(raw_text)
            print(f"Before cleaning: '{raw_text[:50]}...'")
            print(f"After cleaning: '{cleaned_text[:50]}...'")
        # Extract a number
        price_text = response.css('span.price::text').get()
        if price_text:
            price_value = self.extract_price(price_text)
            print(f"Price text: {price_text}")
            print(f"Price value: {price_value}")
        # Extract a date
        date_text = response.css('time.publish-date::text').get()
        if date_text:
            parsed_date = self.parse_date(date_text)
            print(f"Date text: {date_text}")
            print(f"Parsed date: {parsed_date}")

    def demo_attribute_extraction(self, response):
        """
        Attribute extraction examples
        """
        print("\n4. Attribute extraction:")
        # Extract several attributes at once
        links = response.css('a')
        for link in links[:3]:
            link_data = {
                'text': link.css('::text').get(),
                'href': link.css('::attr(href)').get(),
                'title': link.css('::attr(title)').get(),
                'class': link.css('::attr(class)').get(),
            }
            print(f"Link data: {link_data}")
        # Extract data-* attributes
        products = response.css('div.product')
        for product in products[:3]:
            product_data = {
                'id': product.css('::attr(data-id)').get(),
                'category': product.css('::attr(data-category)').get(),
                'price': product.css('::attr(data-price)').get(),
            }
            print(f"Product data: {product_data}")

    def clean_text(self, text):
        """
        Clean a text string
        """
        if not text:
            return ""
        import re
        # Collapse whitespace
        text = re.sub(r'\s+', ' ', text)
        # Strip leading/trailing whitespace
        text = text.strip()
        # Remove special characters
        text = re.sub(r'[^\w\s\-.,!?]', '', text)
        return text

    def extract_price(self, price_text):
        """
        Extract a price value
        """
        if not price_text:
            return None
        import re
        # Extract the number
        match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
        return float(match.group()) if match else None

    def parse_date(self, date_text):
        """
        Parse a date
        """
        if not date_text:
            return None
        from dateutil import parser
        try:
            return parser.parse(date_text).isoformat()
        except (ValueError, OverflowError):
            return None

print("Complex selector techniques example complete!")
3.2 Handling JavaScript
# 7. Handling JavaScript-rendered pages
print("\n🌐 Handling JavaScript:")

import json

class JavaScriptSpider(scrapy.Spider):
    name = 'javascript'
    start_urls = ['http://example.com/spa-page']

    def parse(self, response):
        """
        Handle pages rendered by JavaScript
        """
        # Check whether this is a SPA page
        if self.is_spa_page(response):
            self.logger.info('SPA page detected, JavaScript rendering required')
            # Option 1: use Splash
            yield self.create_splash_request(response.url)
            # Option 2: use Selenium
            # yield self.create_selenium_request(response.url)
        else:
            # Regular handling
            yield from self.parse_regular_page(response)

    def is_spa_page(self, response):
        """
        Detect whether a page is a SPA
        """
        # Check the visible page content
        body_text = response.css('body::text').get()
        if not body_text or len(body_text.strip()) < 100:
            return True
        # Check for JavaScript frameworks
        js_frameworks = [
            'react', 'vue', 'angular', 'backbone',
            'ember', 'knockout', 'meteor'
        ]
        page_content = response.text.lower()
        for framework in js_frameworks:
            if framework in page_content:
                return True
        return False

    def create_splash_request(self, url):
        """
        Build a request to the Splash HTTP API
        """
        # render.json returns a JSON object; with 'html': 1 it includes the rendered HTML
        return scrapy.Request(
            url='http://localhost:8050/render.json',
            method='POST',
            body=json.dumps({
                'url': url,
                'wait': 3,
                'timeout': 30,
                'resource_timeout': 10,
                'html': 1,
                'png': 0,
            }),
            headers={'Content-Type': 'application/json'},
            callback=self.parse_splash_response,
            meta={'original_url': url}
        )

    def parse_splash_response(self, response):
        """
        Parse the Splash response
        """
        try:
            data = json.loads(response.text)
            html_content = data.get('html', '')
            if html_content:
                # Build a new response object from the rendered HTML
                from scrapy.http import HtmlResponse
                rendered_response = HtmlResponse(
                    url=response.meta['original_url'],
                    body=html_content.encode('utf-8'),
                    encoding='utf-8'
                )
                # Parse the rendered content
                yield from self.parse_rendered_page(rendered_response)
        except json.JSONDecodeError:
            self.logger.error('Failed to parse the Splash response')

    def parse_rendered_page(self, response):
        """
        Parse the rendered page
        """
        # Extract dynamically loaded content
        dynamic_items = response.css('div.dynamic-item')
        for item in dynamic_items:
            yield {
                'title': item.css('h3.title::text').get(),
                'content': item.css('div.content::text').get(),
                'timestamp': item.css('span.timestamp::text').get(),
                'url': response.url,
                'rendered': True,
            }

    def parse_regular_page(self, response):
        """
        Parse a regular page
        """
        # Standard page handling
        items = response.css('div.item')
        for item in items:
            yield {
                'title': item.css('h3::text').get(),
                'content': item.css('p::text').get(),
                'url': response.url,
                'rendered': False,
            }

print("JavaScript handling example complete!")
4. Error Handling and Retry Mechanisms
4.1 Exception Handling
# 8. Exception handling
print("\n🛡️ Exception handling:")

from scrapy.exceptions import IgnoreRequest, CloseSpider
from twisted.internet.error import ConnectError, TimeoutError

class RobustSpider(scrapy.Spider):
    name = 'robust'
    start_urls = ['http://example.com/']

    # Custom settings
    custom_settings = {
        'RETRY_TIMES': 3,
        'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429],
        'DOWNLOAD_TIMEOUT': 30,
    }

    def parse(self, response):
        """
        Parse callback with exception handling
        """
        try:
            # Validate the response
            self.validate_response(response)
            # Extract the items
            items = self.extract_items(response)
            for item in items:
                yield item
            # Handle pagination
            yield from self.handle_pagination(response)
        except Exception as e:
            self.handle_parse_error(response, e)

    def validate_response(self, response):
        """
        Validate a response
        """
        # Check the status code
        if response.status >= 400:
            raise IgnoreRequest(f'HTTP error: {response.status}')
        # Check the content type
        content_type = response.headers.get('Content-Type', b'').decode()
        if 'text/html' not in content_type:
            raise IgnoreRequest(f'Non-HTML content: {content_type}')
        # Check the page size
        if len(response.body) < 100:
            raise IgnoreRequest('Page body too small')
        # Check for error pages
        if self.is_error_page(response):
            raise IgnoreRequest('Error page detected')

    def is_error_page(self, response):
        """
        Detect error pages
        """
        error_indicators = [
            'page not found',
            '404 error',
            'access denied',
            'server error',
            'temporarily unavailable'
        ]
        page_text = response.text.lower()
        return any(indicator in page_text for indicator in error_indicators)

    def extract_items(self, response):
        """
        Extract items
        """
        items = []
        try:
            elements = response.css('div.item')
            for element in elements:
                item = self.extract_single_item(element, response)
                if item:
                    items.append(item)
        except Exception as e:
            self.logger.error(f'Item extraction failed: {e}')
            raise
        return items

    def extract_single_item(self, element, response):
        """
        Extract a single item
        """
        try:
            item = {
                'title': element.css('h3::text').get(),
                'description': element.css('p::text').get(),
                'url': response.url,
            }
            # Validate the data
            if not item['title']:
                self.logger.warning('Empty title, skipping item')
                return None
            return item
        except Exception as e:
            self.logger.error(f'Single item extraction failed: {e}')
            return None

    def handle_pagination(self, response):
        """
        Handle pagination
        """
        try:
            next_page = response.css('a.next::attr(href)').get()
            if next_page:
                # Enforce the page limit
                current_page = response.meta.get('page_number', 1)
                max_pages = getattr(self, 'max_pages', 100)
                if current_page >= max_pages:
                    self.logger.info(f'Reached the page limit: {max_pages}')
                    return
                yield response.follow(
                    next_page,
                    callback=self.parse,
                    meta={'page_number': current_page + 1},
                    errback=self.handle_request_error
                )
        except Exception as e:
            self.logger.error(f'Pagination handling failed: {e}')

    def handle_parse_error(self, response, error):
        """
        Handle parse errors
        """
        self.logger.error(f'Parse error {response.url}: {error}')
        # Track error statistics
        if not hasattr(self, 'error_count'):
            self.error_count = 0
        self.error_count += 1
        # Stop the spider if there are too many errors
        if self.error_count > 100:
            raise CloseSpider('Too many errors, closing spider')

    def handle_request_error(self, failure):
        """
        Handle request errors (errback)
        """
        request = failure.request
        self.logger.error(f'Request failed {request.url}: {failure.value}')
        # Decide whether to retry based on the error type (Twisted exceptions)
        if failure.check(TimeoutError):
            self.logger.info('Timeout error, retrying')
            return request.replace(dont_filter=True)
        elif failure.check(ConnectError):
            self.logger.info('Connection error, retrying')
            return request.replace(dont_filter=True)
        else:
            self.logger.error('Unknown error, not retrying')

print("Exception handling example complete!")
4.2 Custom Retry Middleware
# 9. Custom retry middleware
print("\n🔄 Custom retry middleware:")

from twisted.internet.error import ConnectError, TimeoutError

class CustomRetryMiddleware:
    """
    Custom retry downloader middleware
    """

    def __init__(self, settings):
        self.max_retry_times = settings.getint('RETRY_TIMES')
        self.retry_http_codes = set(int(x) for x in settings.getlist('RETRY_HTTP_CODES'))
        self.priority_adjust = settings.getint('RETRY_PRIORITY_ADJUST', -1)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)

    def process_response(self, request, response, spider):
        """
        Handle responses
        """
        if response.status in self.retry_http_codes:
            reason = f'HTTP {response.status}'
            return self._retry(request, reason, spider) or response
        # Inspect the response body
        if self._should_retry_content(response, spider):
            reason = 'Invalid content'
            return self._retry(request, reason, spider) or response
        return response

    def process_exception(self, request, exception, spider):
        """
        Handle exceptions
        """
        if isinstance(exception, (TimeoutError, ConnectError)):
            reason = f'{exception.__class__.__name__}: {exception}'
            return self._retry(request, reason, spider)
        return None

    def _should_retry_content(self, response, spider):
        """
        Decide whether to retry based on the response content
        """
        # Check the page size
        if len(response.body) < 100:
            spider.logger.warning(f'Page body too small: {response.url}')
            return True
        # Check for error page markers
        error_indicators = [
            'temporarily unavailable',
            'service unavailable',
            'please try again',
            'rate limit exceeded'
        ]
        page_text = response.text.lower()
        for indicator in error_indicators:
            if indicator in page_text:
                spider.logger.warning(f'Error page detected: {indicator}')
                return True
        return False

    def _retry(self, request, reason, spider):
        """
        Build the retry request
        """
        retries = request.meta.get('retry_times', 0) + 1
        if retries <= self.max_retry_times:
            spider.logger.info(f'Retrying {request.url} (attempt {retries}): {reason}')
            # Create the retry request
            retry_req = request.copy()
            retry_req.meta['retry_times'] = retries
            retry_req.priority = request.priority + self.priority_adjust
            # Back off between attempts (note: this meta key is only a hint for
            # custom delay handling; Scrapy core does not read it)
            retry_req.meta['download_delay'] = retries * 2
            return retry_req
        else:
            spider.logger.error(f'Retry limit exceeded {request.url}: {reason}')
            return None

print("Custom retry middleware example complete!")
5. Performance Optimization
5.1 Concurrency Control
# 10. Concurrency control and performance optimization
print("\n⚡ Performance optimization:")

import time

class OptimizedSpider(scrapy.Spider):
    name = 'optimized'
    start_urls = ['http://example.com/']

    # Performance-oriented settings
    custom_settings = {
        # Concurrency
        'CONCURRENT_REQUESTS': 32,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 16,
        'CONCURRENT_ITEMS': 100,
        # Download delay
        'DOWNLOAD_DELAY': 0.5,
        'RANDOMIZE_DOWNLOAD_DELAY': True,
        'AUTOTHROTTLE_ENABLED': True,
        'AUTOTHROTTLE_START_DELAY': 1,
        'AUTOTHROTTLE_MAX_DELAY': 10,
        'AUTOTHROTTLE_TARGET_CONCURRENCY': 2.0,
        # Memory control
        'MEMUSAGE_ENABLED': True,
        'MEMUSAGE_LIMIT_MB': 2048,
        'MEMUSAGE_WARNING_MB': 1024,
        # HTTP cache
        'HTTPCACHE_ENABLED': True,
        'HTTPCACHE_EXPIRATION_SECS': 3600,
        'HTTPCACHE_DIR': 'httpcache',
        'HTTPCACHE_IGNORE_HTTP_CODES': [503, 504, 505, 500, 403, 404, 408],
        # Compression
        'COMPRESSION_ENABLED': True,
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.processed_count = 0
        self.start_time = time.time()

    def parse(self, response):
        """
        Optimized parse callback
        """
        # Batch processing
        items = response.css('div.item')
        # Use generators to keep memory usage low
        for item in self.process_items_batch(items, response):
            yield item
        # Smart pagination
        yield from self.smart_pagination(response)

    def process_items_batch(self, items, response):
        """
        Process items in batches
        """
        batch_size = 50
        batch = []
        for item_selector in items:
            item_data = self.extract_item_data(item_selector, response)
            if item_data:
                batch.append(item_data)
                if len(batch) >= batch_size:
                    # Process one full batch
                    yield from self.process_batch(batch)
                    batch = []
        # Process the remaining items
        if batch:
            yield from self.process_batch(batch)

    def extract_item_data(self, selector, response):
        """
        Extract one item
        """
        try:
            return {
                'title': selector.css('h3::text').get(),
                'price': selector.css('span.price::text').get(),
                'url': response.url,
            }
        except Exception as e:
            self.logger.error(f'Item extraction failed: {e}')
            return None

    def process_batch(self, batch):
        """
        Process one batch of items
        """
        # Clean and validate the batch
        cleaned_batch = []
        for item in batch:
            cleaned_item = self.clean_item(item)
            if cleaned_item:
                cleaned_batch.append(cleaned_item)
        # Update statistics
        self.processed_count += len(cleaned_batch)
        # Log progress
        if self.processed_count % 1000 == 0:
            elapsed = time.time() - self.start_time
            rate = self.processed_count / elapsed
            self.logger.info(f'Processed {self.processed_count} items, rate: {rate:.2f} items/s')
        yield from cleaned_batch

    def clean_item(self, item):
        """
        Clean one item
        """
        if not item.get('title'):
            return None
        # Clean the price
        if item.get('price'):
            price_text = item['price']
            price_value = self.extract_price(price_text)
            item['price_numeric'] = price_value
        return item

    def extract_price(self, price_text):
        """
        Extract the numeric price
        """
        import re
        match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
        return float(match.group()) if match else None

    def smart_pagination(self, response):
        """
        Smart pagination handling
        """
        # Check whether there are more pages
        next_page = response.css('a.next::attr(href)').get()
        if next_page:
            current_page = response.meta.get('page_number', 1)
            # Slow down on deep pages
            if current_page > 10:
                # Ease off to avoid getting banned (the 'download_delay' meta key
                # is only a hint for custom middleware; Scrapy core does not read it)
                yield scrapy.Request(
                    response.urljoin(next_page),
                    callback=self.parse,
                    meta={
                        'page_number': current_page + 1,
                        'download_delay': 2,
                    }
                )
            else:
                yield response.follow(
                    next_page,
                    callback=self.parse,
                    meta={'page_number': current_page + 1}
                )

print("Performance optimization example complete!")
6. Chapter Summary
6.1 Key Takeaways
Spider fundamentals and lifecycle
- How Spiders work
- Lifecycle management
- Signal handling
Spider types
- Rule configuration with CrawlSpider
- XML handling with XMLFeedSpider
- CSV handling with CSVFeedSpider
Advanced data extraction
- Complex selector techniques
- Handling JavaScript-rendered pages
- Data cleaning and validation
Error handling and retries
- Exception handling mechanisms
- Custom retry strategies
- Error monitoring and statistics
Performance optimization
- Concurrency control strategies
- Memory usage optimization
- Batch processing techniques
6.2 Best Practices
- 🏗️ Architecture: choose the Spider type that fits the job
- 🎯 Selector optimization: use efficient, robust selectors
- 🛡️ Error handling: build thorough exception handling
- ⚡ Performance tuning: configure concurrency and delays sensibly
6.3 Common Pitfalls
- ❌ Excessive concurrency: too many parallel requests gets your IP banned
- ❌ Memory leaks: poor memory management when processing large volumes of data
- ❌ Ignored errors: skipping error handling leads to data loss
- ❌ Fragile selectors: selectors that depend too tightly on page structure
6.4 What to Learn Next
- 📦 Learn how to use Items and Pipelines
- 🔧 Master middleware development and configuration
- 🌐 Understand distributed crawling
- 🛡️ Study anti-bot techniques and how to handle them
7. Exercises
7.1 Basics
Choosing a Spider type
- Pick the right Spider type for different scenarios
- Configure CrawlSpider rules
- Handle XML and CSV data sources
Data extraction
- Write complex selectors
- Implement data cleaning
- Handle JavaScript-rendered pages
Error handling
- Implement an exception handling mechanism
- Configure a retry strategy
- Add error monitoring
7.2 Intermediate
Performance optimization
- Tune the Spider's concurrency settings
- Implement batch data processing
- Monitor memory usage
Complex scenarios
- Handle multiple data formats
- Implement smart pagination
- Handle dynamic content
Custom middleware
- Build a custom retry middleware
- Implement a request-filtering middleware
- Add a performance-monitoring middleware
7.3 Challenges
Large-scale crawling
- Design a high-performance crawler architecture
- Implement distributed crawling
- Optimize resource usage
Complex websites
- Handle SPA applications
- Deal with anti-bot measures
- Implement smart retries
Real-time monitoring
- Monitor the crawler's status
- Collect performance metrics
- Design an alerting mechanism
Congratulations on completing Chapter 3! 🎉
You now have a solid grasp of advanced Spider features and optimization techniques. In the next chapter, we will cover more techniques for data extraction and processing.