Chapter Objectives
After completing this chapter, you will be able to:
- Master advanced selector techniques
- Handle complex nested data structures
- Implement data cleaning and validation
- Process multimedia content
- Optimize data extraction performance
1. Advanced Selector Techniques
1.1 Advanced CSS Selectors
import scrapy
import re
from urllib.parse import urljoin
class AdvancedCSSSpider(scrapy.Spider):
name = 'advanced_css'
start_urls = ['http://example.com/products']
def parse(self, response):
"""
高级CSS选择器示例
"""
print("🎯 高级CSS选择器技术:")
# 1. 属性选择器
self.demo_attribute_selectors(response)
# 2. 伪类选择器
self.demo_pseudo_selectors(response)
# 3. 组合选择器
self.demo_combinator_selectors(response)
# 4. 高级文本处理
self.demo_text_processing(response)
def demo_attribute_selectors(self, response):
"""
属性选择器示例
"""
print("\n1. 属性选择器:")
# 精确匹配
featured_products = response.css('div[data-featured="true"]')
print(f"特色产品数量: {len(featured_products)}")
# 包含匹配
sale_products = response.css('div[class*="sale"]')
print(f"促销产品数量: {len(sale_products)}")
# 开始匹配
category_electronics = response.css('div[data-category^="electronics"]')
print(f"电子产品数量: {len(category_electronics)}")
# 结束匹配
image_jpg = response.css('img[src$=".jpg"]')
print(f"JPG图片数量: {len(image_jpg)}")
# 多属性组合
premium_featured = response.css('div[data-featured="true"][data-premium="true"]')
print(f"高级特色产品: {len(premium_featured)}")
# 提取属性值
for product in featured_products[:3]:
product_data = {
'id': product.css('::attr(data-id)').get(),
'name': product.css('h3.product-name::text').get(),
'price': product.css('span.price::attr(data-price)').get(),
'category': product.css('::attr(data-category)').get(),
'rating': product.css('div.rating::attr(data-rating)').get(),
}
print(f"产品数据: {product_data}")
def demo_pseudo_selectors(self, response):
"""
伪类选择器示例
"""
print("\n2. 伪类选择器:")
# 位置选择器
first_product = response.css('div.product:first-child')
last_product = response.css('div.product:last-child')
third_product = response.css('div.product:nth-child(3)')
even_products = response.css('div.product:nth-child(even)')
odd_products = response.css('div.product:nth-child(odd)')
print(f"第一个产品: {len(first_product)}")
print(f"最后一个产品: {len(last_product)}")
print(f"第三个产品: {len(third_product)}")
print(f"偶数位产品: {len(even_products)}")
print(f"奇数位产品: {len(odd_products)}")
# 类型选择器
first_h3 = response.css('h3:first-of-type')
last_p = response.css('p:last-of-type')
print(f"第一个h3: {len(first_h3)}")
print(f"最后一个p: {len(last_p)}")
# 内容选择器
empty_divs = response.css('div:empty')
non_empty_divs = response.css('div:not(:empty)')
print(f"空div数量: {len(empty_divs)}")
print(f"非空div数量: {len(non_empty_divs)}")
def demo_combinator_selectors(self, response):
"""
组合选择器示例
"""
print("\n3. 组合选择器:")
# 后代选择器
product_titles = response.css('div.product h3.title')
print(f"产品标题数量: {len(product_titles)}")
# 子选择器
direct_children = response.css('div.container > div.product')
print(f"直接子产品: {len(direct_children)}")
# 相邻兄弟选择器
price_after_title = response.css('h3.title + span.price')
print(f"标题后的价格: {len(price_after_title)}")
# 通用兄弟选择器
elements_after_title = response.css('h3.title ~ span')
print(f"标题后的span: {len(elements_after_title)}")
# 复杂组合
featured_sale_products = response.css('div.product[data-featured="true"] span.price.sale')
print(f"特色促销产品: {len(featured_sale_products)}")
def demo_text_processing(self, response):
"""
文本处理示例
"""
print("\n4. 文本处理:")
# 提取并清洗文本
descriptions = response.css('div.product-description::text').getall()
cleaned_descriptions = [self.clean_text(desc) for desc in descriptions]
print(f"原始描述数量: {len(descriptions)}")
print(f"清洗后描述数量: {len([d for d in cleaned_descriptions if d])}")
# 提取数字
prices = response.css('span.price::text').getall()
price_values = [self.extract_price(price) for price in prices]
valid_prices = [p for p in price_values if p is not None]
print(f"价格文本数量: {len(prices)}")
print(f"有效价格数量: {len(valid_prices)}")
if valid_prices:
print(f"平均价格: ${sum(valid_prices)/len(valid_prices):.2f}")
def clean_text(self, text):
"""
清洗文本
"""
if not text:
return ""
# 移除多余空白
text = re.sub(r'\s+', ' ', text)
# 移除首尾空白
text = text.strip()
# 移除特殊字符
text = re.sub(r'[^\w\s\-.,!?()$%]', '', text)
return text if len(text) > 5 else ""
def extract_price(self, price_text):
"""
提取价格
"""
if not price_text:
return None
# 移除货币符号和逗号
cleaned = re.sub(r'[^\d.]', '', price_text)
try:
return float(cleaned)
except ValueError:
return None
print("高级CSS选择器示例完成!")
1.2 Advanced XPath Techniques
# 2. Advanced XPath techniques
print("\n🔍 Advanced XPath techniques:")
class AdvancedXPathSpider(scrapy.Spider):
name = 'advanced_xpath'
start_urls = ['http://example.com/complex-page']
def parse(self, response):
"""
XPath高级技术示例
"""
# 1. 轴操作
self.demo_xpath_axes(response)
# 2. 函数使用
self.demo_xpath_functions(response)
# 3. 条件表达式
self.demo_xpath_conditions(response)
# 4. 复杂查询
self.demo_complex_queries(response)
def demo_xpath_axes(self, response):
"""
XPath轴操作示例
"""
print("\n1. XPath轴操作:")
# ancestor轴 - 祖先元素
price_ancestors = response.xpath('//span[@class="price"]/ancestor::div')
print(f"价格元素的祖先div: {len(price_ancestors)}")
# descendant轴 - 后代元素
container_descendants = response.xpath('//div[@class="container"]/descendant::span')
print(f"容器的后代span: {len(container_descendants)}")
# following-sibling轴 - 后续兄弟元素
title_siblings = response.xpath('//h3[@class="title"]/following-sibling::*')
print(f"标题的后续兄弟: {len(title_siblings)}")
# preceding-sibling轴 - 前面兄弟元素
price_preceding = response.xpath('//span[@class="price"]/preceding-sibling::*')
print(f"价格的前面兄弟: {len(price_preceding)}")
# parent轴 - 父元素
image_parents = response.xpath('//img/parent::*')
print(f"图片的父元素: {len(image_parents)}")
# child轴 - 子元素
product_children = response.xpath('//div[@class="product"]/child::*')
print(f"产品的子元素: {len(product_children)}")
def demo_xpath_functions(self, response):
"""
XPath函数示例
"""
print("\n2. XPath函数:")
# 文本函数
# contains() - 包含文本
sale_elements = response.xpath('//*[contains(text(), "Sale")]')
print(f"包含'Sale'的元素: {len(sale_elements)}")
# starts-with() - 开始文本
product_ids = response.xpath('//div[starts-with(@id, "product-")]')
print(f"ID以'product-'开始的div: {len(product_ids)}")
# normalize-space() - 标准化空白
clean_texts = response.xpath('//p[normalize-space(text())!=""]')
print(f"非空段落: {len(clean_texts)}")
# string-length() - 字符串长度
long_titles = response.xpath('//h3[string-length(text()) > 20]')
print(f"长标题(>20字符): {len(long_titles)}")
# 数值函数
# position() - 位置
first_three = response.xpath('//div[@class="product"][position() <= 3]')
print(f"前三个产品: {len(first_three)}")
# last() - 最后一个
last_product = response.xpath('//div[@class="product"][position() = last()]')
print(f"最后一个产品: {len(last_product)}")
# count() - 计数
product_count = response.xpath('count(//div[@class="product"])').get()
print(f"产品总数: {product_count}")
def demo_xpath_conditions(self, response):
"""
XPath条件表达式示例
"""
print("\n3. XPath条件表达式:")
# 属性条件
featured_products = response.xpath('//div[@data-featured="true"]')
print(f"特色产品: {len(featured_products)}")
# 多条件组合
premium_featured = response.xpath('//div[@data-featured="true" and @data-premium="true"]')
print(f"高级特色产品: {len(premium_featured)}")
# 或条件
sale_or_featured = response.xpath('//div[@data-sale="true" or @data-featured="true"]')
print(f"促销或特色产品: {len(sale_or_featured)}")
# 否定条件
not_featured = response.xpath('//div[@class="product" and not(@data-featured)]')
print(f"非特色产品: {len(not_featured)}")
# 数值比较
expensive_products = response.xpath('//div[@data-price > 100]')
print(f"高价产品(>100): {len(expensive_products)}")
# 文本条件
electronics = response.xpath('//div[contains(@data-category, "electronics")]')
print(f"电子产品: {len(electronics)}")
def demo_complex_queries(self, response):
"""
复杂查询示例
"""
print("\n4. 复杂查询:")
# 查找包含特定文本的元素的父级产品
sale_products = response.xpath('//div[@class="product"][.//text()[contains(., "Sale")]]')
print(f"包含'Sale'文本的产品: {len(sale_products)}")
# 查找有图片的产品
products_with_images = response.xpath('//div[@class="product"][.//img]')
print(f"有图片的产品: {len(products_with_images)}")
# 查找价格在特定范围的产品
mid_range_products = response.xpath('//div[@class="product"][@data-price >= 50 and @data-price <= 200]')
print(f"中等价位产品(50-200): {len(mid_range_products)}")
# 查找评分高的产品
high_rated = response.xpath('//div[@class="product"][.//div[@class="rating"]/@data-rating >= 4]')
print(f"高评分产品(>=4): {len(high_rated)}")
# 复杂的层级查询
category_products = response.xpath('''
//div[@class="category"]
[h2[contains(text(), "Electronics")]]
//div[@class="product"]
[@data-featured="true"]
''')
print(f"电子类别的特色产品: {len(category_products)}")
print("XPath高级技术示例完成!")
2. Handling Complex Data Structures
2.1 Nested Data Extraction
# 3. Handling nested data structures
print("\n🏗️ Handling nested data structures:")
class NestedDataSpider(scrapy.Spider):
name = 'nested_data'
start_urls = ['http://example.com/nested-content']
def parse(self, response):
"""
处理嵌套数据结构
"""
# 1. 多层级产品数据
self.extract_product_hierarchy(response)
# 2. 评论和回复
self.extract_comments_with_replies(response)
# 3. 分类和子分类
self.extract_category_tree(response)
# 4. 表格数据
self.extract_table_data(response)
def extract_product_hierarchy(self, response):
"""
提取产品层级数据
"""
print("\n1. 产品层级数据:")
categories = response.css('div.category')
for category in categories:
category_data = {
'name': category.css('h2.category-title::text').get(),
'description': category.css('p.category-desc::text').get(),
'subcategories': [],
'products': []
}
# 提取子分类
subcategories = category.css('div.subcategory')
for subcategory in subcategories:
subcategory_data = {
'name': subcategory.css('h3.subcategory-title::text').get(),
'products': []
}
# 提取子分类下的产品
products = subcategory.css('div.product')
for product in products:
product_data = self.extract_detailed_product(product, response)
if product_data:
subcategory_data['products'].append(product_data)
category_data['subcategories'].append(subcategory_data)
# 提取分类直属产品
direct_products = category.css('> div.products > div.product')
for product in direct_products:
product_data = self.extract_detailed_product(product, response)
if product_data:
category_data['products'].append(product_data)
yield category_data
def extract_detailed_product(self, product_selector, response):
"""
提取详细产品信息
"""
try:
# 基本信息
product_data = {
'id': product_selector.css('::attr(data-id)').get(),
'name': product_selector.css('h4.product-name::text').get(),
'price': self.extract_price_info(product_selector),
'images': self.extract_image_info(product_selector, response),
'specifications': self.extract_specifications(product_selector),
'reviews': self.extract_review_summary(product_selector),
'availability': self.extract_availability(product_selector),
}
return product_data
except Exception as e:
self.logger.error(f'产品数据提取失败: {e}')
return None
def extract_price_info(self, selector):
"""
提取价格信息
"""
price_info = {}
# 当前价格
current_price = selector.css('span.current-price::text').get()
if current_price:
price_info['current'] = self.parse_price(current_price)
# 原价
original_price = selector.css('span.original-price::text').get()
if original_price:
price_info['original'] = self.parse_price(original_price)
# 折扣
discount = selector.css('span.discount::text').get()
if discount:
price_info['discount'] = discount
return price_info
def extract_image_info(self, selector, response):
"""
提取图片信息
"""
images = []
# 主图
main_image = selector.css('img.main-image::attr(src)').get()
if main_image:
images.append({
'type': 'main',
'url': response.urljoin(main_image),
'alt': selector.css('img.main-image::attr(alt)').get()
})
# 缩略图
thumbnails = selector.css('div.thumbnails img')
for thumb in thumbnails:
thumb_url = thumb.css('::attr(src)').get()
if thumb_url:
images.append({
'type': 'thumbnail',
'url': response.urljoin(thumb_url),
'alt': thumb.css('::attr(alt)').get()
})
return images
def extract_specifications(self, selector):
"""
提取规格信息
"""
specs = {}
spec_rows = selector.css('table.specifications tr')
for row in spec_rows:
key = row.css('td.spec-key::text').get()
value = row.css('td.spec-value::text').get()
if key and value:
specs[key.strip()] = value.strip()
return specs
def extract_review_summary(self, selector):
"""
提取评论摘要
"""
review_info = {}
# 平均评分
rating = selector.css('div.rating span.score::text').get()
if rating:
try:
review_info['average_rating'] = float(rating)
except ValueError:
pass
# 评论数量
count = selector.css('div.rating span.count::text').get()
if count:
review_info['review_count'] = self.extract_number(count)
# 评分分布
rating_bars = selector.css('div.rating-distribution div.rating-bar')
distribution = {}
for bar in rating_bars:
stars = bar.css('::attr(data-stars)').get()
percentage = bar.css('::attr(data-percentage)').get()
if stars and percentage:
distribution[f'{stars}_stars'] = float(percentage)
if distribution:
review_info['rating_distribution'] = distribution
return review_info
def extract_availability(self, selector):
"""
提取库存信息
"""
availability = {}
# 库存状态
status = selector.css('span.stock-status::text').get()
if status:
availability['status'] = status.strip()
# 库存数量
quantity = selector.css('span.stock-quantity::text').get()
if quantity:
availability['quantity'] = self.extract_number(quantity)
# 预计发货时间
shipping = selector.css('span.shipping-time::text').get()
if shipping:
availability['shipping_time'] = shipping.strip()
return availability
def parse_price(self, price_text):
"""
解析价格
"""
if not price_text:
return None
# 移除货币符号和空格
cleaned = re.sub(r'[^\d.]', '', price_text)
try:
return float(cleaned)
except ValueError:
return None
def extract_number(self, text):
"""
提取数字
"""
if not text:
return None
match = re.search(r'\d+', text)
return int(match.group()) if match else None
print("嵌套数据结构处理示例完成!")
2.2 Comments and Replies
# 4. Handling comments and replies
print("\n💬 Handling comments and replies:")
def extract_comments_with_replies(self, response):
"""
提取评论和回复
"""
print("\n2. 评论和回复数据:")
comment_sections = response.css('div.comments-section')
for section in comment_sections:
comments_data = {
'product_id': section.css('::attr(data-product-id)').get(),
'total_comments': len(section.css('div.comment')),
'comments': []
}
# 提取顶级评论
top_level_comments = section.css('div.comment.top-level')
for comment in top_level_comments:
comment_data = self.extract_single_comment(comment, response)
# 提取回复
replies = comment.css('div.replies div.comment.reply')
comment_data['replies'] = []
for reply in replies:
reply_data = self.extract_single_comment(reply, response, is_reply=True)
# 提取嵌套回复
nested_replies = reply.css('div.nested-replies div.comment.nested-reply')
reply_data['nested_replies'] = []
for nested_reply in nested_replies:
nested_data = self.extract_single_comment(nested_reply, response, is_nested=True)
reply_data['nested_replies'].append(nested_data)
comment_data['replies'].append(reply_data)
comments_data['comments'].append(comment_data)
yield comments_data
def extract_single_comment(self, comment_selector, response, is_reply=False, is_nested=False):
"""
提取单个评论
"""
comment_data = {
'id': comment_selector.css('::attr(data-comment-id)').get(),
'author': self.extract_author_info(comment_selector),
'content': self.extract_comment_content(comment_selector),
'timestamp': comment_selector.css('time.comment-time::attr(datetime)').get(),
'rating': self.extract_comment_rating(comment_selector),
'helpful_votes': self.extract_helpful_votes(comment_selector),
'is_reply': is_reply,
'is_nested': is_nested,
}
# 提取附件
attachments = self.extract_comment_attachments(comment_selector, response)
if attachments:
comment_data['attachments'] = attachments
return comment_data
def extract_author_info(self, selector):
"""
提取作者信息
"""
author_info = {
'name': selector.css('span.author-name::text').get(),
'avatar': selector.css('img.author-avatar::attr(src)').get(),
'verified': selector.css('span.verified-badge').get() is not None,
'level': selector.css('span.author-level::text').get(),
}
return author_info
def extract_comment_content(self, selector):
"""
提取评论内容
"""
# 提取文本内容
text_content = selector.css('div.comment-text::text').getall()
content = ' '.join([text.strip() for text in text_content if text.strip()])
# 提取提及的用户
mentions = selector.css('div.comment-text a.mention::text').getall()
# 提取标签
tags = selector.css('div.comment-text span.tag::text').getall()
return {
'text': content,
'mentions': mentions,
'tags': tags,
'length': len(content)
}
def extract_comment_rating(self, selector):
"""
提取评论评分
"""
rating_element = selector.css('div.comment-rating')
if not rating_element:
return None
# 星级评分
stars = len(rating_element.css('span.star.filled'))
# 数值评分
rating_value = rating_element.css('span.rating-value::text').get()
return {
'stars': stars,
'value': float(rating_value) if rating_value else None
}
def extract_helpful_votes(self, selector):
"""
提取有用投票
"""
helpful_section = selector.css('div.helpful-votes')
if not helpful_section:
return None
helpful_count = helpful_section.css('span.helpful-count::text').get()
total_votes = helpful_section.css('span.total-votes::text').get()
return {
'helpful': self.extract_number(helpful_count) if helpful_count else 0,
'total': self.extract_number(total_votes) if total_votes else 0
}
def extract_comment_attachments(self, selector, response):
"""
提取评论附件
"""
attachments = []
# 图片附件
images = selector.css('div.comment-images img')
for img in images:
img_url = img.css('::attr(src)').get()
if img_url:
attachments.append({
'type': 'image',
'url': response.urljoin(img_url),
'alt': img.css('::attr(alt)').get(),
'thumbnail': img.css('::attr(data-thumbnail)').get()
})
# 视频附件
videos = selector.css('div.comment-videos video')
for video in videos:
video_url = video.css('::attr(src)').get()
if video_url:
attachments.append({
'type': 'video',
'url': response.urljoin(video_url),
'poster': video.css('::attr(poster)').get(),
'duration': video.css('::attr(data-duration)').get()
})
return attachments
print("评论和回复处理示例完成!")
3. Data Cleaning and Validation
3.1 Data Cleaning Techniques
# 5. Data cleaning techniques
print("\n🧹 Data cleaning techniques:")
import re
from datetime import datetime
from urllib.parse import urljoin, urlparse
import html
class DataCleaningSpider(scrapy.Spider):
name = 'data_cleaning'
start_urls = ['http://example.com/messy-data']
def parse(self, response):
"""
数据清洗示例
"""
raw_items = response.css('div.item')
for item in raw_items:
# 提取原始数据
raw_data = self.extract_raw_data(item, response)
# 清洗数据
cleaned_data = self.clean_item_data(raw_data)
# 验证数据
if self.validate_item_data(cleaned_data):
yield cleaned_data
else:
self.logger.warning(f'数据验证失败: {cleaned_data.get("id", "unknown")}')
def extract_raw_data(self, selector, response):
"""
提取原始数据
"""
return {
'id': selector.css('::attr(data-id)').get(),
'title': selector.css('h3.title::text').get(),
'description': selector.css('div.description').get(),
'price': selector.css('span.price::text').get(),
'date': selector.css('time.date::text').get(),
'url': selector.css('a.link::attr(href)').get(),
'email': selector.css('span.email::text').get(),
'phone': selector.css('span.phone::text').get(),
'tags': selector.css('div.tags span.tag::text').getall(),
'rating': selector.css('div.rating::attr(data-rating)').get(),
'images': selector.css('div.images img::attr(src)').getall(),
}
def clean_item_data(self, raw_data):
"""
清洗数据
"""
cleaned = {}
# 清洗ID
cleaned['id'] = self.clean_id(raw_data.get('id'))
# 清洗标题
cleaned['title'] = self.clean_text(raw_data.get('title'))
# 清洗描述
cleaned['description'] = self.clean_html_content(raw_data.get('description'))
# 清洗价格
cleaned['price'] = self.clean_price(raw_data.get('price'))
# 清洗日期
cleaned['date'] = self.clean_date(raw_data.get('date'))
# 清洗URL
cleaned['url'] = self.clean_url(raw_data.get('url'))
# 清洗邮箱
cleaned['email'] = self.clean_email(raw_data.get('email'))
# 清洗电话
cleaned['phone'] = self.clean_phone(raw_data.get('phone'))
# 清洗标签
cleaned['tags'] = self.clean_tags(raw_data.get('tags', []))
# 清洗评分
cleaned['rating'] = self.clean_rating(raw_data.get('rating'))
# 清洗图片URL
cleaned['images'] = self.clean_image_urls(raw_data.get('images', []))
return cleaned
def clean_id(self, id_value):
"""
清洗ID
"""
if not id_value:
return None
# 移除非字母数字字符
cleaned = re.sub(r'[^\w\-]', '', str(id_value))
return cleaned if cleaned else None
def clean_text(self, text):
"""
清洗文本
"""
if not text:
return ""
# HTML解码
text = html.unescape(text)
# 移除多余空白
text = re.sub(r'\s+', ' ', text)
# 移除首尾空白
text = text.strip()
# 移除控制字符
text = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', text)
return text
def clean_html_content(self, html_content):
"""
清洗HTML内容
"""
if not html_content:
return ""
# 移除脚本和样式
html_content = re.sub(r'<script[^>]*>.*?</script>', '', html_content, flags=re.DOTALL | re.IGNORECASE)
html_content = re.sub(r'<style[^>]*>.*?</style>', '', html_content, flags=re.DOTALL | re.IGNORECASE)
# 移除HTML标签
from w3lib.html import remove_tags
text = remove_tags(html_content)
# 清洗文本
return self.clean_text(text)
def clean_price(self, price_text):
"""
清洗价格
"""
if not price_text:
return None
# 移除货币符号和空格
cleaned = re.sub(r'[^\d.,]', '', price_text)
# 处理千分位分隔符
if ',' in cleaned and '.' in cleaned:
# 假设最后一个点是小数点
parts = cleaned.split('.')
if len(parts) == 2 and len(parts[1]) <= 2:
# 移除逗号
cleaned = cleaned.replace(',', '')
else:
# 逗号是小数点
cleaned = cleaned.replace(',', '.')
elif ',' in cleaned:
# 检查是否为千分位分隔符
if len(cleaned.split(',')[-1]) == 3:
cleaned = cleaned.replace(',', '')
else:
cleaned = cleaned.replace(',', '.')
try:
return float(cleaned)
except ValueError:
return None
def clean_date(self, date_text):
"""
清洗日期
"""
if not date_text:
return None
# 常见日期格式
date_patterns = [
r'(\d{4})-(\d{1,2})-(\d{1,2})', # YYYY-MM-DD
r'(\d{1,2})/(\d{1,2})/(\d{4})', # MM/DD/YYYY
r'(\d{1,2})-(\d{1,2})-(\d{4})', # MM-DD-YYYY
r'(\d{4})年(\d{1,2})月(\d{1,2})日', # 中文格式
]
for pattern in date_patterns:
match = re.search(pattern, date_text)
if match:
try:
if '年' in pattern:
# 中文格式
year, month, day = match.groups()
return f'{year}-{month.zfill(2)}-{day.zfill(2)}'
elif pattern.startswith(r'(\d{4})'):
# YYYY-MM-DD
year, month, day = match.groups()
return f'{year}-{month.zfill(2)}-{day.zfill(2)}'
else:
# MM/DD/YYYY or MM-DD-YYYY
month, day, year = match.groups()
return f'{year}-{month.zfill(2)}-{day.zfill(2)}'
except:
continue
# 尝试使用dateutil解析
try:
from dateutil import parser
parsed_date = parser.parse(date_text)
return parsed_date.strftime('%Y-%m-%d')
except:
return None
def clean_url(self, url):
"""
清洗URL
"""
if not url:
return None
# 移除首尾空白
url = url.strip()
# 添加协议
if url.startswith('//'):
url = 'http:' + url
elif not url.startswith(('http://', 'https://')):
url = 'http://' + url
# 验证URL格式
try:
parsed = urlparse(url)
if parsed.netloc:
return url
except:
pass
return None
def clean_email(self, email):
"""
清洗邮箱
"""
if not email:
return None
# 移除空白
email = email.strip()
# 邮箱格式验证
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if re.match(email_pattern, email):
return email.lower()
return None
def clean_phone(self, phone):
"""
清洗电话号码
"""
if not phone:
return None
# 移除非数字字符
digits = re.sub(r'[^\d]', '', phone)
# 验证长度
if 10 <= len(digits) <= 15:
return digits
return None
def clean_tags(self, tags):
"""
清洗标签
"""
if not tags:
return []
cleaned_tags = []
for tag in tags:
if tag:
# 清洗单个标签
cleaned_tag = self.clean_text(tag)
# 验证标签
if cleaned_tag and len(cleaned_tag) <= 50:
cleaned_tags.append(cleaned_tag.lower())
# 去重并排序
return sorted(list(set(cleaned_tags)))
def clean_rating(self, rating):
"""
清洗评分
"""
if not rating:
return None
try:
rating_value = float(rating)
# 验证评分范围
if 0 <= rating_value <= 5:
return round(rating_value, 1)
except ValueError:
pass
return None
def clean_image_urls(self, image_urls):
"""
清洗图片URL
"""
if not image_urls:
return []
cleaned_urls = []
for url in image_urls:
cleaned_url = self.clean_url(url)
if cleaned_url:
# 验证是否为图片URL
if self.is_image_url(cleaned_url):
cleaned_urls.append(cleaned_url)
return cleaned_urls
def is_image_url(self, url):
"""
验证是否为图片URL
"""
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg']
parsed_url = urlparse(url.lower())
path = parsed_url.path
return any(path.endswith(ext) for ext in image_extensions)
def validate_item_data(self, data):
"""
验证数据完整性
"""
# 必需字段验证
required_fields = ['id', 'title']
for field in required_fields:
if not data.get(field):
self.logger.warning(f'缺少必需字段: {field}')
return False
# 数据类型验证
if data.get('price') is not None and not isinstance(data['price'], (int, float)):
self.logger.warning('价格数据类型错误')
return False
if data.get('rating') is not None and not isinstance(data['rating'], (int, float)):
self.logger.warning('评分数据类型错误')
return False
# 数据范围验证
if data.get('rating') is not None and not (0 <= data['rating'] <= 5):
self.logger.warning('评分超出范围')
return False
return True
print("数据清洗技术示例完成!")
3.2 Data Validation and Quality Control
# 6. Data validation and quality control
print("\n✅ Data validation and quality control:")
class DataValidationSpider(scrapy.Spider):
name = 'data_validation'
start_urls = ['http://example.com/products']
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.validation_stats = {
'total_items': 0,
'valid_items': 0,
'invalid_items': 0,
'validation_errors': {}
}
def parse(self, response):
"""
带验证的数据提取
"""
items = response.css('div.product')
for item in items:
self.validation_stats['total_items'] += 1
# 提取数据
product_data = self.extract_product_data(item, response)
# 验证数据
validation_result = self.validate_product_data(product_data)
if validation_result['is_valid']:
self.validation_stats['valid_items'] += 1
yield product_data
else:
self.validation_stats['invalid_items'] += 1
self.record_validation_errors(validation_result['errors'])
# 记录无效数据
self.logger.warning(f'数据验证失败: {product_data.get("id", "unknown")}')
self.logger.warning(f'错误: {validation_result["errors"]}')
def extract_product_data(self, selector, response):
"""
提取产品数据
"""
return {
'id': selector.css('::attr(data-id)').get(),
'name': selector.css('h3.product-name::text').get(),
'price': selector.css('span.price::text').get(),
'description': selector.css('div.description::text').get(),
'category': selector.css('span.category::text').get(),
'brand': selector.css('span.brand::text').get(),
'rating': selector.css('div.rating::attr(data-rating)').get(),
'review_count': selector.css('span.review-count::text').get(),
'availability': selector.css('span.availability::text').get(),
'images': selector.css('div.images img::attr(src)').getall(),
'url': response.url,
'scraped_at': datetime.now().isoformat(),
}
def validate_product_data(self, data):
"""
验证产品数据
"""
errors = []
# 1. 必需字段验证
required_fields = ['id', 'name', 'price']
for field in required_fields:
if not data.get(field):
errors.append(f'缺少必需字段: {field}')
# 2. 数据类型验证
errors.extend(self.validate_data_types(data))
# 3. 数据格式验证
errors.extend(self.validate_data_formats(data))
# 4. 数据范围验证
errors.extend(self.validate_data_ranges(data))
# 5. 业务逻辑验证
errors.extend(self.validate_business_logic(data))
return {
'is_valid': len(errors) == 0,
'errors': errors
}
def validate_data_types(self, data):
"""
验证数据类型
"""
errors = []
# 价格应为数字
if data.get('price'):
try:
price_value = float(re.sub(r'[^\d.]', '', data['price']))
data['price_numeric'] = price_value
except (ValueError, TypeError):
errors.append('价格格式无效')
# 评分应为数字
if data.get('rating'):
try:
rating_value = float(data['rating'])
data['rating_numeric'] = rating_value
except (ValueError, TypeError):
errors.append('评分格式无效')
# 评论数量应为整数
if data.get('review_count'):
try:
count_value = int(re.sub(r'[^\d]', '', data['review_count']))
data['review_count_numeric'] = count_value
except (ValueError, TypeError):
errors.append('评论数量格式无效')
# 图片应为列表
if data.get('images') and not isinstance(data['images'], list):
errors.append('图片数据应为列表')
return errors
def validate_data_formats(self, data):
"""
验证数据格式
"""
errors = []
# ID格式验证
if data.get('id'):
if not re.match(r'^[a-zA-Z0-9\-_]+$', data['id']):
errors.append('ID格式无效')
# URL格式验证
if data.get('url'):
try:
parsed = urlparse(data['url'])
if not parsed.netloc:
errors.append('URL格式无效')
except:
errors.append('URL解析失败')
# 图片URL验证
if data.get('images'):
for img_url in data['images']:
if not self.is_valid_image_url(img_url):
errors.append(f'无效图片URL: {img_url}')
return errors
def validate_data_ranges(self, data):
"""
验证数据范围
"""
errors = []
# 价格范围验证
if data.get('price_numeric'):
if data['price_numeric'] < 0:
errors.append('价格不能为负数')
elif data['price_numeric'] > 1000000:
errors.append('价格过高,可能有误')
# 评分范围验证
if data.get('rating_numeric'):
if not (0 <= data['rating_numeric'] <= 5):
errors.append('评分应在0-5之间')
# 评论数量验证
if data.get('review_count_numeric'):
if data['review_count_numeric'] < 0:
errors.append('评论数量不能为负数')
elif data['review_count_numeric'] > 1000000:
errors.append('评论数量过多,可能有误')
# 文本长度验证
if data.get('name') and len(data['name']) > 200:
errors.append('产品名称过长')
if data.get('description') and len(data['description']) > 5000:
errors.append('产品描述过长')
return errors
def validate_business_logic(self, data):
"""
验证业务逻辑
"""
errors = []
# 评分和评论数量的逻辑关系
if (data.get('rating_numeric') and
data.get('review_count_numeric') == 0):
errors.append('有评分但评论数量为0,逻辑不符')
# 库存状态验证
if data.get('availability'):
valid_statuses = ['in_stock', 'out_of_stock', 'limited', 'pre_order']
if data['availability'].lower() not in valid_statuses:
errors.append('库存状态无效')
# 分类验证
if data.get('category'):
valid_categories = [
'electronics', 'clothing', 'books', 'home', 'sports',
'beauty', 'toys', 'automotive', 'health', 'food'
]
if data['category'].lower() not in valid_categories:
errors.append('产品分类无效')
return errors
def is_valid_image_url(self, url):
"""
验证图片URL
"""
if not url:
return False
try:
parsed = urlparse(url)
if not parsed.netloc:
return False
# 检查文件扩展名
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']
return any(parsed.path.lower().endswith(ext) for ext in image_extensions)
except:
return False
def record_validation_errors(self, errors):
"""
记录验证错误统计
"""
for error in errors:
if error in self.validation_stats['validation_errors']:
self.validation_stats['validation_errors'][error] += 1
else:
self.validation_stats['validation_errors'][error] = 1
def closed(self, reason):
"""
爬虫关闭时输出统计信息
"""
stats = self.validation_stats
self.logger.info("=== 数据验证统计 ===")
self.logger.info(f"总数据量: {stats['total_items']}")
self.logger.info(f"有效数据: {stats['valid_items']}")
self.logger.info(f"无效数据: {stats['invalid_items']}")
if stats['total_items'] > 0:
valid_rate = (stats['valid_items'] / stats['total_items']) * 100
self.logger.info(f"数据有效率: {valid_rate:.2f}%")
if stats['validation_errors']:
self.logger.info("=== 验证错误统计 ===")
for error, count in sorted(stats['validation_errors'].items(),
key=lambda x: x[1], reverse=True):
self.logger.info(f"{error}: {count}次")
print("数据验证和质量控制示例完成!")
4. Multimedia Content Processing
4.1 Image Processing
# 7. Image processing techniques
print("\n🖼️ Image processing techniques:")
import base64
from PIL import Image
import io
import hashlib
class ImageProcessingSpider(scrapy.Spider):
name = 'image_processing'
start_urls = ['http://example.com/gallery']
def parse(self, response):
"""
图片处理示例
"""
# 提取图片信息
images = response.css('div.gallery img')
for img in images:
image_data = self.extract_image_data(img, response)
if image_data:
# 下载和处理图片
yield scrapy.Request(
image_data['url'],
callback=self.process_image,
meta={'image_data': image_data}
)
def extract_image_data(self, img_selector, response):
"""
提取图片数据
"""
img_url = img_selector.css('::attr(src)').get()
if not img_url:
return None
return {
'url': response.urljoin(img_url),
'alt': img_selector.css('::attr(alt)').get(),
'title': img_selector.css('::attr(title)').get(),
'width': img_selector.css('::attr(width)').get(),
'height': img_selector.css('::attr(height)').get(),
'class': img_selector.css('::attr(class)').get(),
'data_attributes': self.extract_data_attributes(img_selector),
'parent_context': self.extract_parent_context(img_selector),
}
def extract_data_attributes(self, selector):
"""
提取data属性
"""
data_attrs = {}
# 获取所有属性
attributes = selector.css('::attr(*)').getall()
for attr in attributes:
if attr.startswith('data-'):
key = attr[5:] # 移除'data-'前缀
value = selector.css(f'::attr({attr})').get()
data_attrs[key] = value
return data_attrs
def extract_parent_context(self, selector):
"""
提取父级上下文
"""
# 获取父级元素信息
parent = selector.xpath('./parent::*')
if parent:
return {
'tag': parent.xpath('name()').get(),
'class': parent.css('::attr(class)').get(),
'id': parent.css('::attr(id)').get(),
}
return None
def process_image(self, response):
"""
处理图片响应
"""
image_data = response.meta['image_data']
try:
# 验证图片
if not self.is_valid_image(response):
self.logger.warning(f'无效图片: {response.url}')
return
# 分析图片
image_info = self.analyze_image(response.body)
# 合并数据
complete_data = {
**image_data,
**image_info,
'file_size': len(response.body),
'content_type': response.headers.get('Content-Type', b'').decode(),
'last_modified': response.headers.get('Last-Modified', b'').decode(),
'etag': response.headers.get('ETag', b'').decode(),
}
# 生成缩略图
thumbnail_data = self.generate_thumbnail(response.body)
if thumbnail_data:
complete_data['thumbnail'] = thumbnail_data
yield complete_data
except Exception as e:
self.logger.error(f'图片处理失败 {response.url}: {e}')
def is_valid_image(self, response):
"""
验证图片有效性
"""
# 检查Content-Type
content_type = response.headers.get('Content-Type', b'').decode().lower()
valid_types = ['image/jpeg', 'image/png', 'image/gif', 'image/webp', 'image/bmp']
if not any(vtype in content_type for vtype in valid_types):
return False
# 检查文件大小
if len(response.body) < 100: # 太小
return False
if len(response.body) > 10 * 1024 * 1024: # 太大 (10MB)
return False
# 尝试打开图片
try:
Image.open(io.BytesIO(response.body))
return True
except:
return False
def analyze_image(self, image_data):
"""
分析图片
"""
try:
img = Image.open(io.BytesIO(image_data))
return {
'format': img.format,
'mode': img.mode,
'size': img.size,
'width': img.width,
'height': img.height,
'aspect_ratio': round(img.width / img.height, 2),
'has_transparency': img.mode in ('RGBA', 'LA') or 'transparency' in img.info,
'color_count': len(img.getcolors(maxcolors=256)) if img.mode == 'P' else None,
'file_hash': hashlib.md5(image_data).hexdigest(),
}
except Exception as e:
self.logger.error(f'图片分析失败: {e}')
return {}
def generate_thumbnail(self, image_data, size=(150, 150)):
"""
生成缩略图
"""
try:
img = Image.open(io.BytesIO(image_data))
# 创建缩略图
img.thumbnail(size, Image.Resampling.LANCZOS)
# 转换为base64
buffer = io.BytesIO()
img.save(buffer, format='JPEG', quality=85)
thumbnail_b64 = base64.b64encode(buffer.getvalue()).decode()
return {
'data': thumbnail_b64,
'size': img.size,
'format': 'JPEG',
}
except Exception as e:
self.logger.error(f'缩略图生成失败: {e}')
return None
print("图片处理技术示例完成!")
4.2 Video and Audio Processing
# 8. Video and audio processing
print("\n🎥 Video and audio processing:")
class MediaProcessingSpider(scrapy.Spider):
name = 'media_processing'
start_urls = ['http://example.com/media']
def parse(self, response):
"""
媒体处理示例
"""
# 处理视频
videos = response.css('video, div.video-container')
for video in videos:
video_data = self.extract_video_data(video, response)
if video_data:
yield video_data
# 处理音频
audios = response.css('audio, div.audio-container')
for audio in audios:
audio_data = self.extract_audio_data(audio, response)
if audio_data:
yield audio_data
# 处理嵌入式媒体
embeds = response.css('iframe, embed, object')
for embed in embeds:
embed_data = self.extract_embed_data(embed, response)
if embed_data:
yield embed_data
def extract_video_data(self, video_selector, response):
"""
提取视频数据
"""
video_data = {
'type': 'video',
'sources': [],
'poster': None,
'duration': None,
'controls': False,
'autoplay': False,
'loop': False,
'muted': False,
}
# 检查是否为video标签
if video_selector.css('video'):
video_element = video_selector.css('video')
# 基本属性
video_data.update({
'poster': video_element.css('::attr(poster)').get(),
'duration': video_element.css('::attr(duration)').get(),
'controls': video_element.css('::attr(controls)').get() is not None,
'autoplay': video_element.css('::attr(autoplay)').get() is not None,
'loop': video_element.css('::attr(loop)').get() is not None,
'muted': video_element.css('::attr(muted)').get() is not None,
'width': video_element.css('::attr(width)').get(),
'height': video_element.css('::attr(height)').get(),
})
# 提取视频源
sources = video_element.css('source')
for source in sources:
source_data = {
'src': response.urljoin(source.css('::attr(src)').get() or ''),
'type': source.css('::attr(type)').get(),
'media': source.css('::attr(media)').get(),
'sizes': source.css('::attr(sizes)').get(),
}
video_data['sources'].append(source_data)
# 如果没有source标签,检查src属性
if not video_data['sources']:
src = video_element.css('::attr(src)').get()
if src:
video_data['sources'].append({
'src': response.urljoin(src),
'type': None,
'media': None,
'sizes': None,
})
# 检查容器中的视频信息
else:
# 查找data属性中的视频信息
video_url = video_selector.css('::attr(data-video-url)').get()
if video_url:
video_data['sources'].append({
'src': response.urljoin(video_url),
'type': video_selector.css('::attr(data-video-type)').get(),
'media': None,
'sizes': None,
})
# 其他视频相关属性
video_data.update({
'poster': video_selector.css('::attr(data-poster)').get(),
'duration': video_selector.css('::attr(data-duration)').get(),
'title': video_selector.css('::attr(data-title)').get(),
'description': video_selector.css('::attr(data-description)').get(),
})
return video_data if video_data['sources'] else None
def extract_audio_data(self, audio_selector, response):
"""
提取音频数据
"""
audio_data = {
'type': 'audio',
'sources': [],
'controls': False,
'autoplay': False,
'loop': False,
'muted': False,
'preload': None,
}
# 检查是否为audio标签
if audio_selector.css('audio'):
audio_element = audio_selector.css('audio')
# 基本属性
audio_data.update({
'controls': audio_element.css('::attr(controls)').get() is not None,
'autoplay': audio_element.css('::attr(autoplay)').get() is not None,
'loop': audio_element.css('::attr(loop)').get() is not None,
'muted': audio_element.css('::attr(muted)').get() is not None,
'preload': audio_element.css('::attr(preload)').get(),
'duration': audio_element.css('::attr(duration)').get(),
})
# 提取音频源
sources = audio_element.css('source')
for source in sources:
source_data = {
'src': response.urljoin(source.css('::attr(src)').get() or ''),
'type': source.css('::attr(type)').get(),
}
audio_data['sources'].append(source_data)
# 如果没有source标签,检查src属性
if not audio_data['sources']:
src = audio_element.css('::attr(src)').get()
if src:
audio_data['sources'].append({
'src': response.urljoin(src),
'type': None,
})
# 检查容器中的音频信息
else:
audio_url = audio_selector.css('::attr(data-audio-url)').get()
if audio_url:
audio_data['sources'].append({
'src': response.urljoin(audio_url),
'type': audio_selector.css('::attr(data-audio-type)').get(),
})
# 其他音频相关属性
audio_data.update({
'title': audio_selector.css('::attr(data-title)').get(),
'artist': audio_selector.css('::attr(data-artist)').get(),
'album': audio_selector.css('::attr(data-album)').get(),
'duration': audio_selector.css('::attr(data-duration)').get(),
})
return audio_data if audio_data['sources'] else None
def extract_embed_data(self, embed_selector, response):
"""
提取嵌入式媒体数据
"""
embed_data = {
'type': 'embed',
'platform': None,
'embed_id': None,
'url': None,
'width': None,
'height': None,
}
# iframe处理
if embed_selector.css('iframe'):
iframe = embed_selector.css('iframe')
src = iframe.css('::attr(src)').get()
if src:
embed_data['url'] = src
embed_data['platform'] = self.detect_platform(src)
embed_data['embed_id'] = self.extract_embed_id(src, embed_data['platform'])
embed_data['width'] = iframe.css('::attr(width)').get()
embed_data['height'] = iframe.css('::attr(height)').get()
# embed标签处理
elif embed_selector.css('embed'):
embed = embed_selector.css('embed')
src = embed.css('::attr(src)').get()
if src:
embed_data['url'] = src
embed_data['platform'] = self.detect_platform(src)
embed_data['embed_id'] = self.extract_embed_id(src, embed_data['platform'])
embed_data['width'] = embed.css('::attr(width)').get()
embed_data['height'] = embed.css('::attr(height)').get()
# object标签处理
elif embed_selector.css('object'):
obj = embed_selector.css('object')
data = obj.css('::attr(data)').get()
if data:
embed_data['url'] = data
embed_data['platform'] = self.detect_platform(data)
embed_data['embed_id'] = self.extract_embed_id(data, embed_data['platform'])
embed_data['width'] = obj.css('::attr(width)').get()
embed_data['height'] = obj.css('::attr(height)').get()
return embed_data if embed_data['url'] else None
def detect_platform(self, url):
"""
检测媒体平台
"""
if not url:
return None
url_lower = url.lower()
if 'youtube.com' in url_lower or 'youtu.be' in url_lower:
return 'youtube'
elif 'vimeo.com' in url_lower:
return 'vimeo'
elif 'bilibili.com' in url_lower:
return 'bilibili'
elif 'tiktok.com' in url_lower:
return 'tiktok'
elif 'twitter.com' in url_lower or 'x.com' in url_lower:
return 'twitter'
elif 'instagram.com' in url_lower:
return 'instagram'
elif 'facebook.com' in url_lower:
return 'facebook'
elif 'soundcloud.com' in url_lower:
return 'soundcloud'
elif 'spotify.com' in url_lower:
return 'spotify'
else:
return 'unknown'
def extract_embed_id(self, url, platform):
"""
提取嵌入ID
"""
if not url or not platform:
return None
try:
if platform == 'youtube':
# YouTube视频ID提取
patterns = [
r'(?:youtube\.com/watch\?v=|youtu\.be/)([a-zA-Z0-9_-]+)',
r'youtube\.com/embed/([a-zA-Z0-9_-]+)',
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
elif platform == 'vimeo':
# Vimeo视频ID提取
match = re.search(r'vimeo\.com/(?:video/)?(\d+)', url)
if match:
return match.group(1)
elif platform == 'bilibili':
# B站视频ID提取
match = re.search(r'bilibili\.com/video/(BV[a-zA-Z0-9]+)', url)
if match:
return match.group(1)
# 其他平台的ID提取逻辑...
except Exception as e:
self.logger.error(f'嵌入ID提取失败: {e}')
return None
print("视频和音频处理示例完成!")
5. Performance Optimization
5.1 Selector Performance
# 9. Selector performance optimization
print("\n⚡ Selector performance optimization:")
class OptimizedSelectorSpider(scrapy.Spider):
name = 'optimized_selector'
start_urls = ['http://example.com/large-page']
def parse(self, response):
"""
优化的选择器使用示例
"""
# 1. 缓存选择器结果
self.demo_selector_caching(response)
# 2. 使用高效的选择器
self.demo_efficient_selectors(response)
# 3. 避免重复查询
self.demo_avoid_duplicate_queries(response)
# 4. 批量处理
self.demo_batch_processing(response)
def demo_selector_caching(self, response):
"""
选择器缓存示例
"""
print("\n1. 选择器缓存:")
# ❌ 低效:重复查询
# for i in range(100):
# products = response.css('div.product') # 每次都重新查询
# if products:
# print(f"产品 {i}: {len(products)}")
# ✅ 高效:缓存结果
products = response.css('div.product') # 只查询一次
for i in range(100):
if products:
print(f"产品 {i}: {len(products)}")
# 缓存复杂选择器
featured_products = products.css('[data-featured="true"]')
sale_products = products.css('[data-sale="true"]')
print(f"特色产品: {len(featured_products)}")
print(f"促销产品: {len(sale_products)}")
def demo_efficient_selectors(self, response):
"""
高效选择器示例
"""
print("\n2. 高效选择器:")
# ✅ 使用ID选择器(最快)
header = response.css('#header')
# ✅ 使用类选择器
products = response.css('.product')
# ✅ 使用属性选择器
featured = response.css('[data-featured="true"]')
# ❌ 避免复杂的后代选择器
# slow_selector = response.css('div div div span.price')
# ✅ 使用更直接的选择器
prices = response.css('.product .price')
# ✅ 使用XPath的优势
# 当需要复杂逻辑时,XPath可能更高效
expensive_products = response.xpath('//div[@class="product"][@data-price > 100]')
print(f"头部元素: {len(header)}")
print(f"产品数量: {len(products)}")
print(f"特色产品: {len(featured)}")
print(f"价格元素: {len(prices)}")
print(f"高价产品: {len(expensive_products)}")
def demo_avoid_duplicate_queries(self, response):
"""
避免重复查询示例
"""
print("\n3. 避免重复查询:")
# ❌ 低效:重复查询相同元素
# for product in response.css('div.product'):
# name = product.css('h3.name::text').get()
# price = product.css('span.price::text').get()
# rating = product.css('div.rating span::text').get()
# # 每次都重新查询product
# ✅ 高效:一次性提取所有需要的数据
products = response.css('div.product')
for product in products:
# 一次性提取所有数据
product_data = {
'name': product.css('h3.name::text').get(),
'price': product.css('span.price::text').get(),
'rating': product.css('div.rating span::text').get(),
'image': product.css('img::attr(src)').get(),
'url': product.css('a::attr(href)').get(),
}
# 处理数据
if product_data['name'] and product_data['price']:
yield product_data
def demo_batch_processing(self, response):
"""
批量处理示例
"""
print("\n4. 批量处理:")
# ✅ 批量提取文本
names = response.css('div.product h3.name::text').getall()
prices = response.css('div.product span.price::text').getall()
ratings = response.css('div.product div.rating span::text').getall()
# 批量处理
for i, (name, price, rating) in enumerate(zip(names, prices, ratings)):
if name and price:
yield {
'id': i,
'name': name.strip(),
'price': self.parse_price(price),
'rating': self.parse_rating(rating),
}
def parse_price(self, price_text):
"""解析价格"""
if not price_text:
return None
return float(re.sub(r'[^\d.]', '', price_text))
def parse_rating(self, rating_text):
"""解析评分"""
if not rating_text:
return None
try:
return float(rating_text)
except ValueError:
return None
print("选择器性能优化示例完成!")
5.2 Memory Optimization
# 10. Memory optimization techniques
print("\n💾 Memory optimization techniques:")
class MemoryOptimizedSpider(scrapy.Spider):
name = 'memory_optimized'
start_urls = ['http://example.com/large-dataset']
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.processed_count = 0
self.batch_size = 100
self.current_batch = []
def parse(self, response):
"""
内存优化的数据处理
"""
# 使用生成器而不是列表
for item in self.extract_items_generator(response):
self.current_batch.append(item)
# 批量处理,避免内存积累
if len(self.current_batch) >= self.batch_size:
yield from self.process_batch()
self.current_batch = []
# 处理剩余项目
if self.current_batch:
yield from self.process_batch()
def extract_items_generator(self, response):
"""
使用生成器提取数据,节省内存
"""
products = response.css('div.product')
for product in products:
# 立即处理,不存储在内存中
item_data = self.extract_single_item(product, response)
if item_data:
yield item_data
# 清理不需要的引用
del product
def extract_single_item(self, selector, response):
"""
提取单个项目,最小化内存使用
"""
try:
# 只提取必要的数据
item = {
'id': selector.css('::attr(data-id)').get(),
'name': selector.css('h3::text').get(),
'price': selector.css('.price::text').get(),
}
# 立即清洗数据,减少存储
if item['name']:
item['name'] = item['name'].strip()[:100] # 限制长度
if item['price']:
item['price'] = self.parse_price(item['price'])
return item if item['id'] else None
except Exception as e:
self.logger.error(f'项目提取失败: {e}')
return None
def process_batch(self):
"""
批量处理数据
"""
# 批量验证和清洗
valid_items = []
for item in self.current_batch:
if self.validate_item(item):
valid_items.append(item)
# 批量输出
for item in valid_items:
self.processed_count += 1
yield item
# 清理内存
del valid_items
# 记录进度
if self.processed_count % 1000 == 0:
self.logger.info(f'已处理 {self.processed_count} 个项目')
def validate_item(self, item):
"""
轻量级验证
"""
return (item.get('id') and
item.get('name') and
item.get('price') is not None)
def parse_price(self, price_text):
"""
解析价格
"""
if not price_text:
return None
try:
return float(re.sub(r'[^\d.]', '', price_text))
except ValueError:
return None
print("内存优化技术示例完成!")
6. Chapter Summary
This chapter covered advanced data extraction and processing techniques in Scrapy. The main topics:
6.1 Core Techniques
Advanced selectors
- CSS attribute, pseudo-class, and combinator selectors
- XPath axes, functions, and complex queries
- Selector performance optimization
Complex data structures
- Hierarchical extraction of nested data
- Recursive handling of comments and replies
- Structured extraction of category trees and tables
Data cleaning and validation
- Cleaning text, HTML, prices, dates, and other field types
- Validating types, formats, ranges, and business logic
- Data quality control and statistics
Multimedia content
- Image extraction, analysis, and thumbnail generation
- Video and audio metadata extraction
- Platform detection and ID extraction for embedded media
Performance optimization
- Selector caching and batch processing
- Memory optimization with generators
- Avoiding duplicate queries and unnecessary work
6.2 Practical Applications
- E-commerce sites: complete extraction of products, prices, and reviews
- News media: multimedia scraping of articles, images, and video
- Social platforms: hierarchical handling of user-generated content and reply threads
- Directories: recursive extraction of complex category structures
7. Best Practices
7.1 Selector Guidelines
# Selector best practices
best_practices = {
    "Performance first": "Prefer ID and class selectors",
    "Cache results": "Never re-query the same elements",
    "Batch processing": "Extract related data in one pass",
    "Error handling": "Always validate selector results",
    "Readability": "Keep selector expressions clear",
}
7.2 Data Processing Guidelines
# Data processing best practices
data_practices = {
    "Clean early": "Do basic cleaning at extraction time",
    "Layered validation": "Validate on multiple levels",
    "Memory management": "Use generators and batching",
    "Error logging": "Record processing errors and statistics in detail",
    "Performance monitoring": "Watch throughput and resource usage",
}
8. Common Pitfalls
8.1 Performance Pitfalls
- Repeated queries: rerunning the same selector inside a loop
- Memory leaks: large intermediate data never released
- Over-complex selectors: needlessly convoluted CSS or XPath expressions
8.2 Data Quality Pitfalls
- Over-cleaning: scrubbing away useful information
- Under-validation: insufficient data checks
- Encoding issues: mishandled character encodings (see the sketch below)
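On the encoding point: Scrapy's response.text already decodes the body using the detected charset, so prefer it over decoding response.body by hand; residual entity and normalization issues can be handled with the standard library. A small sketch:
import html
import unicodedata
def normalize_text(raw):
    # Decode HTML entities ('&amp;' -> '&') and apply NFKC normalization
    # so visually identical strings compare equal
    return unicodedata.normalize('NFKC', html.unescape(raw))
print(normalize_text('Caf&eacute;\u00a0menu'))  # 'Café menu' (NBSP becomes a plain space)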
9. Next Steps
After finishing this chapter, we suggest continuing with:
- Chapter 5: Middleware Development - writing custom middleware
- Chapter 6: Distributed Crawling - an introduction to Scrapy-Redis
- Chapter 7: Anti-Scraping Countermeasures - dealing with anti-bot techniques
- Chapter 8: Data Storage - working with multiple storage backends
10. Exercises
10.1 Basics
- Write a spider that extracts product name, price, images, and rating from an e-commerce site
- Implement comment extraction that supports multi-level replies
- Build a news-site crawler that extracts article content and multimedia resources
10.2 Advanced
- Design a general-purpose data cleaning framework supporting multiple data types
- Implement a performance-optimized large-scale extraction system
- Develop an intelligent analysis and processing tool for multimedia content
10.3 Projects
- Comprehensive e-commerce crawler: build a complete product data extraction system
- Social media analysis: develop tooling for analyzing platform content and user behavior
- News aggregation platform: create a multi-source news aggregation and processing system
Congratulations! You have now mastered Scrapy's advanced data extraction and processing techniques. These skills will help you handle a wide range of complex extraction scenarios and build efficient, reliable crawlers. Continue to the next chapter to explore the power of middleware!