Chapter Objectives

By the end of this chapter, you will be able to:

- Master advanced selector techniques
- Handle complex data structures
- Implement data cleaning and validation
- Process multimedia content
- Optimize data extraction performance

1. Advanced Selector Techniques

1.1 Advanced CSS Selectors

import scrapy
import re
from urllib.parse import urljoin

class AdvancedCSSSpider(scrapy.Spider):
    name = 'advanced_css'
    start_urls = ['http://example.com/products']
    
    def parse(self, response):
        """
        Advanced CSS selector examples
        """
        print("🎯 Advanced CSS selector techniques:")
        
        # 1. Attribute selectors
        self.demo_attribute_selectors(response)
        
        # 2. Pseudo-class selectors
        self.demo_pseudo_selectors(response)
        
        # 3. Combinator selectors
        self.demo_combinator_selectors(response)
        
        # 4. Advanced text processing
        self.demo_text_processing(response)
    
    def demo_attribute_selectors(self, response):
        """
        Attribute selector examples
        """
        print("\n1. Attribute selectors:")
        
        # Exact match
        featured_products = response.css('div[data-featured="true"]')
        print(f"Featured products: {len(featured_products)}")
        
        # Substring match
        sale_products = response.css('div[class*="sale"]')
        print(f"Sale products: {len(sale_products)}")
        
        # Prefix match
        category_electronics = response.css('div[data-category^="electronics"]')
        print(f"Electronics products: {len(category_electronics)}")
        
        # Suffix match
        image_jpg = response.css('img[src$=".jpg"]')
        print(f"JPG images: {len(image_jpg)}")
        
        # Combining multiple attributes
        premium_featured = response.css('div[data-featured="true"][data-premium="true"]')
        print(f"Premium featured products: {len(premium_featured)}")
        
        # Extracting attribute values
        for product in featured_products[:3]:
            product_data = {
                'id': product.css('::attr(data-id)').get(),
                'name': product.css('h3.product-name::text').get(),
                'price': product.css('span.price::attr(data-price)').get(),
                'category': product.css('::attr(data-category)').get(),
                'rating': product.css('div.rating::attr(data-rating)').get(),
            }
            print(f"Product data: {product_data}")
    
    def demo_pseudo_selectors(self, response):
        """
        Pseudo-class selector examples
        """
        print("\n2. Pseudo-class selectors:")
        
        # Positional selectors
        first_product = response.css('div.product:first-child')
        last_product = response.css('div.product:last-child')
        third_product = response.css('div.product:nth-child(3)')
        even_products = response.css('div.product:nth-child(even)')
        odd_products = response.css('div.product:nth-child(odd)')
        
        print(f"First product: {len(first_product)}")
        print(f"Last product: {len(last_product)}")
        print(f"Third product: {len(third_product)}")
        print(f"Even-position products: {len(even_products)}")
        print(f"Odd-position products: {len(odd_products)}")
        
        # Type selectors
        first_h3 = response.css('h3:first-of-type')
        last_p = response.css('p:last-of-type')
        
        print(f"First h3: {len(first_h3)}")
        print(f"Last p: {len(last_p)}")
        
        # Content selectors
        empty_divs = response.css('div:empty')
        non_empty_divs = response.css('div:not(:empty)')
        
        print(f"Empty divs: {len(empty_divs)}")
        print(f"Non-empty divs: {len(non_empty_divs)}")
    
    def demo_combinator_selectors(self, response):
        """
        Combinator selector examples
        """
        print("\n3. Combinator selectors:")
        
        # Descendant combinator
        product_titles = response.css('div.product h3.title')
        print(f"Product titles: {len(product_titles)}")
        
        # Child combinator
        direct_children = response.css('div.container > div.product')
        print(f"Direct child products: {len(direct_children)}")
        
        # Adjacent sibling combinator
        price_after_title = response.css('h3.title + span.price')
        print(f"Prices directly after titles: {len(price_after_title)}")
        
        # General sibling combinator
        elements_after_title = response.css('h3.title ~ span')
        print(f"Spans after titles: {len(elements_after_title)}")
        
        # Complex combination
        featured_sale_products = response.css('div.product[data-featured="true"] span.price.sale')
        print(f"Featured sale products: {len(featured_sale_products)}")
    
    def demo_text_processing(self, response):
        """
        Text processing examples
        """
        print("\n4. Text processing:")
        
        # Extract and clean text
        descriptions = response.css('div.product-description::text').getall()
        cleaned_descriptions = [self.clean_text(desc) for desc in descriptions]
        
        print(f"Raw descriptions: {len(descriptions)}")
        print(f"Cleaned descriptions: {len([d for d in cleaned_descriptions if d])}")
        
        # Extract numbers
        prices = response.css('span.price::text').getall()
        price_values = [self.extract_price(price) for price in prices]
        valid_prices = [p for p in price_values if p is not None]
        
        print(f"Price strings: {len(prices)}")
        print(f"Valid prices: {len(valid_prices)}")
        if valid_prices:
            print(f"Average price: ${sum(valid_prices)/len(valid_prices):.2f}")
    
    def clean_text(self, text):
        """
        Clean a text fragment
        """
        if not text:
            return ""
        
        # Collapse runs of whitespace
        text = re.sub(r'\s+', ' ', text)
        # Strip leading/trailing whitespace
        text = text.strip()
        # Remove special characters
        text = re.sub(r'[^\w\s\-.,!?()$%]', '', text)
        
        return text if len(text) > 5 else ""
    
    def extract_price(self, price_text):
        """
        Extract a price value
        """
        if not price_text:
            return None
        
        # Remove currency symbols and commas
        cleaned = re.sub(r'[^\d.]', '', price_text)
        
        try:
            return float(cleaned)
        except ValueError:
            return None

print("Advanced CSS selector examples complete!")

1.2 Advanced XPath Techniques

# 2. Advanced XPath techniques
print("\n🔍 Advanced XPath techniques:")

class AdvancedXPathSpider(scrapy.Spider):
    name = 'advanced_xpath'
    start_urls = ['http://example.com/complex-page']
    
    def parse(self, response):
        """
        Advanced XPath technique examples
        """
        # 1. Axes
        self.demo_xpath_axes(response)
        
        # 2. Functions
        self.demo_xpath_functions(response)
        
        # 3. Conditional expressions
        self.demo_xpath_conditions(response)
        
        # 4. Complex queries
        self.demo_complex_queries(response)
    
    def demo_xpath_axes(self, response):
        """
        XPath axis examples
        """
        print("\n1. XPath axes:")
        
        # ancestor axis -- ancestor elements
        price_ancestors = response.xpath('//span[@class="price"]/ancestor::div')
        print(f"Ancestor divs of price elements: {len(price_ancestors)}")
        
        # descendant axis -- descendant elements
        container_descendants = response.xpath('//div[@class="container"]/descendant::span')
        print(f"Descendant spans of containers: {len(container_descendants)}")
        
        # following-sibling axis -- later siblings
        title_siblings = response.xpath('//h3[@class="title"]/following-sibling::*')
        print(f"Siblings after titles: {len(title_siblings)}")
        
        # preceding-sibling axis -- earlier siblings
        price_preceding = response.xpath('//span[@class="price"]/preceding-sibling::*')
        print(f"Siblings before prices: {len(price_preceding)}")
        
        # parent axis -- parent elements
        image_parents = response.xpath('//img/parent::*')
        print(f"Image parents: {len(image_parents)}")
        
        # child axis -- child elements
        product_children = response.xpath('//div[@class="product"]/child::*')
        print(f"Product children: {len(product_children)}")
    
    def demo_xpath_functions(self, response):
        """
        XPath function examples
        """
        print("\n2. XPath functions:")
        
        # Text functions
        # contains() -- text containment
        sale_elements = response.xpath('//*[contains(text(), "Sale")]')
        print(f"Elements containing 'Sale': {len(sale_elements)}")
        
        # starts-with() -- text prefix
        product_ids = response.xpath('//div[starts-with(@id, "product-")]')
        print(f"Divs with IDs starting with 'product-': {len(product_ids)}")
        
        # normalize-space() -- whitespace normalization
        clean_texts = response.xpath('//p[normalize-space(text())!=""]')
        print(f"Non-empty paragraphs: {len(clean_texts)}")
        
        # string-length() -- string length
        long_titles = response.xpath('//h3[string-length(text()) > 20]')
        print(f"Long titles (>20 chars): {len(long_titles)}")
        
        # Numeric functions
        # position() -- node position
        first_three = response.xpath('//div[@class="product"][position() <= 3]')
        print(f"First three products: {len(first_three)}")
        
        # last() -- last node
        last_product = response.xpath('//div[@class="product"][position() = last()]')
        print(f"Last product: {len(last_product)}")
        
        # count() -- node count; XPath numbers come back as strings (e.g. '12.0'),
        # so convert before using the value
        product_count = float(response.xpath('count(//div[@class="product"])').get())
        print(f"Total products: {product_count:.0f}")
    
    def demo_xpath_conditions(self, response):
        """
        XPath conditional expression examples
        """
        print("\n3. XPath conditional expressions:")
        
        # Attribute conditions
        featured_products = response.xpath('//div[@data-featured="true"]')
        print(f"Featured products: {len(featured_products)}")
        
        # Combining conditions with 'and'
        premium_featured = response.xpath('//div[@data-featured="true" and @data-premium="true"]')
        print(f"Premium featured products: {len(premium_featured)}")
        
        # 'or' conditions
        sale_or_featured = response.xpath('//div[@data-sale="true" or @data-featured="true"]')
        print(f"Sale or featured products: {len(sale_or_featured)}")
        
        # Negation
        not_featured = response.xpath('//div[@class="product" and not(@data-featured)]')
        print(f"Non-featured products: {len(not_featured)}")
        
        # Numeric comparison
        expensive_products = response.xpath('//div[@data-price > 100]')
        print(f"Expensive products (>100): {len(expensive_products)}")
        
        # Text conditions
        electronics = response.xpath('//div[contains(@data-category, "electronics")]')
        print(f"Electronics products: {len(electronics)}")
    
    def demo_complex_queries(self, response):
        """
        Complex query examples
        """
        print("\n4. Complex queries:")
        
        # Products whose subtree contains specific text
        sale_products = response.xpath('//div[@class="product"][.//text()[contains(., "Sale")]]')
        print(f"Products containing 'Sale' text: {len(sale_products)}")
        
        # Products that contain an image
        products_with_images = response.xpath('//div[@class="product"][.//img]')
        print(f"Products with images: {len(products_with_images)}")
        
        # Products within a price range
        mid_range_products = response.xpath('//div[@class="product"][@data-price >= 50 and @data-price <= 200]')
        print(f"Mid-range products (50-200): {len(mid_range_products)}")
        
        # Highly rated products
        high_rated = response.xpath('//div[@class="product"][.//div[@class="rating"]/@data-rating >= 4]')
        print(f"Highly rated products (>=4): {len(high_rated)}")
        
        # A complex hierarchical query
        category_products = response.xpath('''
            //div[@class="category"]
                [h2[contains(text(), "Electronics")]]
                //div[@class="product"]
                    [@data-featured="true"]
        ''')
        print(f"Featured products in the Electronics category: {len(category_products)}")

print("Advanced XPath examples complete!")

2. Handling Complex Data Structures

2.1 Extracting Nested Data

# 3. Handling nested data structures
print("\n🏗️ Handling nested data structures:")

class NestedDataSpider(scrapy.Spider):
    name = 'nested_data'
    start_urls = ['http://example.com/nested-content']
    
    def parse(self, response):
        """
        Handle nested data structures
        """
        # The extractors below are written as generators, so delegate with
        # `yield from` -- a bare call would create the generator object
        # without ever running it.
        
        # 1. Multi-level product data
        yield from self.extract_product_hierarchy(response)
        
        # 2. Comments and replies
        yield from self.extract_comments_with_replies(response)
        
        # 3. Categories and subcategories
        yield from self.extract_category_tree(response)
        
        # 4. Table data
        yield from self.extract_table_data(response)
    
    def extract_product_hierarchy(self, response):
        """
        Extract hierarchical product data
        """
        print("\n1. Product hierarchy data:")
        
        categories = response.css('div.category')
        
        for category in categories:
            category_data = {
                'name': category.css('h2.category-title::text').get(),
                'description': category.css('p.category-desc::text').get(),
                'subcategories': [],
                'products': []
            }
            
            # Extract subcategories
            subcategories = category.css('div.subcategory')
            for subcategory in subcategories:
                subcategory_data = {
                    'name': subcategory.css('h3.subcategory-title::text').get(),
                    'products': []
                }
                
                # Extract products within the subcategory
                products = subcategory.css('div.product')
                for product in products:
                    product_data = self.extract_detailed_product(product, response)
                    if product_data:
                        subcategory_data['products'].append(product_data)
                
                category_data['subcategories'].append(subcategory_data)
            
            # Products directly under the category. cssselect rejects selectors
            # that start with a child combinator ('> ...'), so use a relative
            # XPath instead.
            direct_products = category.xpath('./div[@class="products"]/div[@class="product"]')
            for product in direct_products:
                product_data = self.extract_detailed_product(product, response)
                if product_data:
                    category_data['products'].append(product_data)
            
            yield category_data
    
    def extract_detailed_product(self, product_selector, response):
        """
        Extract detailed product information
        """
        try:
            # Basic information
            product_data = {
                'id': product_selector.css('::attr(data-id)').get(),
                'name': product_selector.css('h4.product-name::text').get(),
                'price': self.extract_price_info(product_selector),
                'images': self.extract_image_info(product_selector, response),
                'specifications': self.extract_specifications(product_selector),
                'reviews': self.extract_review_summary(product_selector),
                'availability': self.extract_availability(product_selector),
            }
            
            return product_data
            
        except Exception as e:
            self.logger.error(f'Product extraction failed: {e}')
            return None
    
    def extract_price_info(self, selector):
        """
        Extract price information
        """
        price_info = {}
        
        # Current price
        current_price = selector.css('span.current-price::text').get()
        if current_price:
            price_info['current'] = self.parse_price(current_price)
        
        # Original price
        original_price = selector.css('span.original-price::text').get()
        if original_price:
            price_info['original'] = self.parse_price(original_price)
        
        # Discount
        discount = selector.css('span.discount::text').get()
        if discount:
            price_info['discount'] = discount
        
        return price_info
    
    def extract_image_info(self, selector, response):
        """
        Extract image information
        """
        images = []
        
        # Main image
        main_image = selector.css('img.main-image::attr(src)').get()
        if main_image:
            images.append({
                'type': 'main',
                'url': response.urljoin(main_image),
                'alt': selector.css('img.main-image::attr(alt)').get()
            })
        
        # Thumbnails
        thumbnails = selector.css('div.thumbnails img')
        for thumb in thumbnails:
            thumb_url = thumb.css('::attr(src)').get()
            if thumb_url:
                images.append({
                    'type': 'thumbnail',
                    'url': response.urljoin(thumb_url),
                    'alt': thumb.css('::attr(alt)').get()
                })
        
        return images
    
    def extract_specifications(self, selector):
        """
        Extract specification data
        """
        specs = {}
        
        spec_rows = selector.css('table.specifications tr')
        for row in spec_rows:
            key = row.css('td.spec-key::text').get()
            value = row.css('td.spec-value::text').get()
            
            if key and value:
                specs[key.strip()] = value.strip()
        
        return specs
    
    def extract_review_summary(self, selector):
        """
        Extract a review summary
        """
        review_info = {}
        
        # Average rating
        rating = selector.css('div.rating span.score::text').get()
        if rating:
            try:
                review_info['average_rating'] = float(rating)
            except ValueError:
                pass
        
        # Review count
        count = selector.css('div.rating span.count::text').get()
        if count:
            review_info['review_count'] = self.extract_number(count)
        
        # Rating distribution
        rating_bars = selector.css('div.rating-distribution div.rating-bar')
        distribution = {}
        for bar in rating_bars:
            stars = bar.css('::attr(data-stars)').get()
            percentage = bar.css('::attr(data-percentage)').get()
            if stars and percentage:
                distribution[f'{stars}_stars'] = float(percentage)
        
        if distribution:
            review_info['rating_distribution'] = distribution
        
        return review_info
    
    def extract_availability(self, selector):
        """
        Extract availability information
        """
        availability = {}
        
        # Stock status
        status = selector.css('span.stock-status::text').get()
        if status:
            availability['status'] = status.strip()
        
        # Stock quantity
        quantity = selector.css('span.stock-quantity::text').get()
        if quantity:
            availability['quantity'] = self.extract_number(quantity)
        
        # Estimated shipping time
        shipping = selector.css('span.shipping-time::text').get()
        if shipping:
            availability['shipping_time'] = shipping.strip()
        
        return availability
    
    def parse_price(self, price_text):
        """
        Parse a price string
        """
        if not price_text:
            return None
        
        # Remove currency symbols and whitespace
        cleaned = re.sub(r'[^\d.]', '', price_text)
        
        try:
            return float(cleaned)
        except ValueError:
            return None
    
    def extract_number(self, text):
        """
        Extract the first integer from a string
        """
        if not text:
            return None
        
        match = re.search(r'\d+', text)
        return int(match.group()) if match else None

print("Nested data structure examples complete!")

2.2 Handling Comments and Replies

# 4. Handling comments and replies
print("\n💬 Handling comments and replies:")

# The functions below continue the NestedDataSpider class from Section 2.1;
# they are shown unindented here for readability but belong in the class body.

def extract_comments_with_replies(self, response):
    """
    Extract comments together with their replies
    """
    print("\n2. Comment and reply data:")
    
    comment_sections = response.css('div.comments-section')
    
    for section in comment_sections:
        comments_data = {
            'product_id': section.css('::attr(data-product-id)').get(),
            'total_comments': len(section.css('div.comment')),
            'comments': []
        }
        
        # Extract top-level comments
        top_level_comments = section.css('div.comment.top-level')
        
        for comment in top_level_comments:
            comment_data = self.extract_single_comment(comment, response)
            
            # Extract replies
            replies = comment.css('div.replies div.comment.reply')
            comment_data['replies'] = []
            
            for reply in replies:
                reply_data = self.extract_single_comment(reply, response, is_reply=True)
                
                # Extract nested replies
                nested_replies = reply.css('div.nested-replies div.comment.nested-reply')
                reply_data['nested_replies'] = []
                
                for nested_reply in nested_replies:
                    nested_data = self.extract_single_comment(nested_reply, response, is_nested=True)
                    reply_data['nested_replies'].append(nested_data)
                
                comment_data['replies'].append(reply_data)
            
            comments_data['comments'].append(comment_data)
        
        yield comments_data

def extract_single_comment(self, comment_selector, response, is_reply=False, is_nested=False):
    """
    Extract a single comment
    """
    comment_data = {
        'id': comment_selector.css('::attr(data-comment-id)').get(),
        'author': self.extract_author_info(comment_selector),
        'content': self.extract_comment_content(comment_selector),
        'timestamp': comment_selector.css('time.comment-time::attr(datetime)').get(),
        'rating': self.extract_comment_rating(comment_selector),
        'helpful_votes': self.extract_helpful_votes(comment_selector),
        'is_reply': is_reply,
        'is_nested': is_nested,
    }
    
    # Extract attachments
    attachments = self.extract_comment_attachments(comment_selector, response)
    if attachments:
        comment_data['attachments'] = attachments
    
    return comment_data

def extract_author_info(self, selector):
    """
    Extract author information
    """
    author_info = {
        'name': selector.css('span.author-name::text').get(),
        'avatar': selector.css('img.author-avatar::attr(src)').get(),
        'verified': selector.css('span.verified-badge').get() is not None,
        'level': selector.css('span.author-level::text').get(),
    }
    
    return author_info

def extract_comment_content(self, selector):
    """
    Extract comment content
    """
    # Extract the text content
    text_content = selector.css('div.comment-text::text').getall()
    content = ' '.join([text.strip() for text in text_content if text.strip()])
    
    # Extract mentioned users
    mentions = selector.css('div.comment-text a.mention::text').getall()
    
    # Extract tags
    tags = selector.css('div.comment-text span.tag::text').getall()
    
    return {
        'text': content,
        'mentions': mentions,
        'tags': tags,
        'length': len(content)
    }

def extract_comment_rating(self, selector):
    """
    Extract a comment's rating
    """
    rating_element = selector.css('div.comment-rating')
    if not rating_element:
        return None
    
    # Star rating
    stars = len(rating_element.css('span.star.filled'))
    
    # Numeric rating
    rating_value = rating_element.css('span.rating-value::text').get()
    
    return {
        'stars': stars,
        'value': float(rating_value) if rating_value else None
    }

def extract_helpful_votes(self, selector):
    """
    Extract helpfulness votes
    """
    helpful_section = selector.css('div.helpful-votes')
    if not helpful_section:
        return None
    
    helpful_count = helpful_section.css('span.helpful-count::text').get()
    total_votes = helpful_section.css('span.total-votes::text').get()
    
    return {
        'helpful': self.extract_number(helpful_count) if helpful_count else 0,
        'total': self.extract_number(total_votes) if total_votes else 0
    }

def extract_comment_attachments(self, selector, response):
    """
    Extract comment attachments
    """
    attachments = []
    
    # Image attachments
    images = selector.css('div.comment-images img')
    for img in images:
        img_url = img.css('::attr(src)').get()
        if img_url:
            attachments.append({
                'type': 'image',
                'url': response.urljoin(img_url),
                'alt': img.css('::attr(alt)').get(),
                'thumbnail': img.css('::attr(data-thumbnail)').get()
            })
    
    # Video attachments
    videos = selector.css('div.comment-videos video')
    for video in videos:
        video_url = video.css('::attr(src)').get()
        if video_url:
            attachments.append({
                'type': 'video',
                'url': response.urljoin(video_url),
                'poster': video.css('::attr(poster)').get(),
                'duration': video.css('::attr(data-duration)').get()
            })
    
    return attachments

print("Comment and reply examples complete!")

3. Data Cleaning and Validation

3.1 Data Cleaning Techniques

# 5. Data cleaning techniques
print("\n🧹 Data cleaning techniques:")

import re
from datetime import datetime
from urllib.parse import urljoin, urlparse
import html

from w3lib.html import remove_tags

class DataCleaningSpider(scrapy.Spider):
    name = 'data_cleaning'
    start_urls = ['http://example.com/messy-data']
    
    def parse(self, response):
        """
        Data cleaning example
        """
        raw_items = response.css('div.item')
        
        for item in raw_items:
            # Extract the raw data
            raw_data = self.extract_raw_data(item, response)
            
            # Clean the data
            cleaned_data = self.clean_item_data(raw_data)
            
            # Validate the data
            if self.validate_item_data(cleaned_data):
                yield cleaned_data
            else:
                self.logger.warning(f'Validation failed: {cleaned_data.get("id", "unknown")}')
    
    def extract_raw_data(self, selector, response):
        """
        Extract raw data
        """
        return {
            'id': selector.css('::attr(data-id)').get(),
            'title': selector.css('h3.title::text').get(),
            'description': selector.css('div.description').get(),
            'price': selector.css('span.price::text').get(),
            'date': selector.css('time.date::text').get(),
            'url': selector.css('a.link::attr(href)').get(),
            'email': selector.css('span.email::text').get(),
            'phone': selector.css('span.phone::text').get(),
            'tags': selector.css('div.tags span.tag::text').getall(),
            'rating': selector.css('div.rating::attr(data-rating)').get(),
            'images': selector.css('div.images img::attr(src)').getall(),
        }
    
    def clean_item_data(self, raw_data):
        """
        Clean an item's fields
        """
        cleaned = {}
        
        # Clean the ID
        cleaned['id'] = self.clean_id(raw_data.get('id'))
        
        # Clean the title
        cleaned['title'] = self.clean_text(raw_data.get('title'))
        
        # Clean the description
        cleaned['description'] = self.clean_html_content(raw_data.get('description'))
        
        # Clean the price
        cleaned['price'] = self.clean_price(raw_data.get('price'))
        
        # Clean the date
        cleaned['date'] = self.clean_date(raw_data.get('date'))
        
        # Clean the URL
        cleaned['url'] = self.clean_url(raw_data.get('url'))
        
        # Clean the email address
        cleaned['email'] = self.clean_email(raw_data.get('email'))
        
        # Clean the phone number
        cleaned['phone'] = self.clean_phone(raw_data.get('phone'))
        
        # Clean the tags
        cleaned['tags'] = self.clean_tags(raw_data.get('tags', []))
        
        # Clean the rating
        cleaned['rating'] = self.clean_rating(raw_data.get('rating'))
        
        # Clean the image URLs
        cleaned['images'] = self.clean_image_urls(raw_data.get('images', []))
        
        return cleaned
    
    def clean_id(self, id_value):
        """
        Clean an ID
        """
        if not id_value:
            return None
        
        # Remove everything except word characters and hyphens
        cleaned = re.sub(r'[^\w\-]', '', str(id_value))
        
        return cleaned if cleaned else None
    
    def clean_text(self, text):
        """
        Clean a text fragment
        """
        if not text:
            return ""
        
        # Decode HTML entities
        text = html.unescape(text)
        
        # Collapse runs of whitespace
        text = re.sub(r'\s+', ' ', text)
        
        # Strip leading/trailing whitespace
        text = text.strip()
        
        # Remove control characters
        text = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', text)
        
        return text
    
    def clean_html_content(self, html_content):
        """
        Clean HTML content
        """
        if not html_content:
            return ""
        
        # Remove scripts and styles
        html_content = re.sub(r'<script[^>]*>.*?</script>', '', html_content, flags=re.DOTALL | re.IGNORECASE)
        html_content = re.sub(r'<style[^>]*>.*?</style>', '', html_content, flags=re.DOTALL | re.IGNORECASE)
        
        # Strip the remaining HTML tags
        text = remove_tags(html_content)
        
        # Clean the resulting text
        return self.clean_text(text)
    
    def clean_price(self, price_text):
        """
        Clean a price string
        """
        if not price_text:
            return None
        
        # Remove currency symbols and whitespace
        cleaned = re.sub(r'[^\d.,]', '', price_text)
        
        # Handle thousands separators
        if ',' in cleaned and '.' in cleaned:
            # Whichever separator appears last is the decimal point;
            # the other is a thousands separator.
            if cleaned.rfind(',') > cleaned.rfind('.'):
                cleaned = cleaned.replace('.', '').replace(',', '.')
            else:
                cleaned = cleaned.replace(',', '')
        elif ',' in cleaned:
            # A trailing group of exactly three digits suggests a thousands separator
            if len(cleaned.split(',')[-1]) == 3:
                cleaned = cleaned.replace(',', '')
            else:
                cleaned = cleaned.replace(',', '.')
        
        try:
            return float(cleaned)
        except ValueError:
            return None
    
    def clean_date(self, date_text):
        """
        Normalize a date string to YYYY-MM-DD
        """
        if not date_text:
            return None
        
        # Common date formats
        date_patterns = [
            r'(\d{4})-(\d{1,2})-(\d{1,2})',  # YYYY-MM-DD
            r'(\d{1,2})/(\d{1,2})/(\d{4})',  # MM/DD/YYYY
            r'(\d{1,2})-(\d{1,2})-(\d{4})',  # MM-DD-YYYY
            r'(\d{4})年(\d{1,2})月(\d{1,2})日',  # Chinese format
        ]
        
        for pattern in date_patterns:
            match = re.search(pattern, date_text)
            if match:
                try:
                    if '年' in pattern:
                        # Chinese format
                        year, month, day = match.groups()
                        return f'{year}-{month.zfill(2)}-{day.zfill(2)}'
                    elif pattern.startswith(r'(\d{4})'):
                        # YYYY-MM-DD
                        year, month, day = match.groups()
                        return f'{year}-{month.zfill(2)}-{day.zfill(2)}'
                    else:
                        # MM/DD/YYYY or MM-DD-YYYY
                        month, day, year = match.groups()
                        return f'{year}-{month.zfill(2)}-{day.zfill(2)}'
                except Exception:
                    continue
        
        # Fall back to dateutil parsing
        try:
            from dateutil import parser
            parsed_date = parser.parse(date_text)
            return parsed_date.strftime('%Y-%m-%d')
        except Exception:
            return None
    
    def clean_url(self, url):
        """
        Clean a URL
        """
        if not url:
            return None
        
        # Strip surrounding whitespace
        url = url.strip()
        
        # Add a scheme if missing
        if url.startswith('//'):
            url = 'http:' + url
        elif not url.startswith(('http://', 'https://')):
            url = 'http://' + url
        
        # Validate the URL structure
        try:
            parsed = urlparse(url)
            if parsed.netloc:
                return url
        except ValueError:
            pass
        
        return None
    
    def clean_email(self, email):
        """
        Clean an email address
        """
        if not email:
            return None
        
        # Strip whitespace
        email = email.strip()
        
        # Validate the email format
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        
        if re.match(email_pattern, email):
            return email.lower()
        
        return None
    
    def clean_phone(self, phone):
        """
        Clean a phone number
        """
        if not phone:
            return None
        
        # Keep digits only
        digits = re.sub(r'[^\d]', '', phone)
        
        # Validate the length
        if 10 <= len(digits) <= 15:
            return digits
        
        return None
    
    def clean_tags(self, tags):
        """
        Clean a list of tags
        """
        if not tags:
            return []
        
        cleaned_tags = []
        
        for tag in tags:
            if tag:
                # Clean the individual tag
                cleaned_tag = self.clean_text(tag)
                
                # Validate the tag
                if cleaned_tag and len(cleaned_tag) <= 50:
                    cleaned_tags.append(cleaned_tag.lower())
        
        # Deduplicate and sort
        return sorted(set(cleaned_tags))
    
    def clean_rating(self, rating):
        """
        Clean a rating value
        """
        if not rating:
            return None
        
        try:
            rating_value = float(rating)
            
            # Validate the rating range
            if 0 <= rating_value <= 5:
                return round(rating_value, 1)
        except ValueError:
            pass
        
        return None
    
    def clean_image_urls(self, image_urls):
        """
        Clean a list of image URLs
        """
        if not image_urls:
            return []
        
        cleaned_urls = []
        
        for url in image_urls:
            cleaned_url = self.clean_url(url)
            
            if cleaned_url:
                # Keep only URLs that look like images
                if self.is_image_url(cleaned_url):
                    cleaned_urls.append(cleaned_url)
        
        return cleaned_urls
    
    def is_image_url(self, url):
        """
        Check whether a URL points to an image
        """
        image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg']
        
        parsed_url = urlparse(url.lower())
        path = parsed_url.path
        
        return any(path.endswith(ext) for ext in image_extensions)
    
    def validate_item_data(self, data):
        """
        Validate data completeness
        """
        # Required fields
        required_fields = ['id', 'title']
        
        for field in required_fields:
            if not data.get(field):
                self.logger.warning(f'Missing required field: {field}')
                return False
        
        # Type checks
        if data.get('price') is not None and not isinstance(data['price'], (int, float)):
            self.logger.warning('Price has the wrong type')
            return False
        
        if data.get('rating') is not None and not isinstance(data['rating'], (int, float)):
            self.logger.warning('Rating has the wrong type')
            return False
        
        # Range checks
        if data.get('rating') is not None and not (0 <= data['rating'] <= 5):
            self.logger.warning('Rating out of range')
            return False
        
        return True

print("Data cleaning examples complete!")

3.2 Data Validation and Quality Control

# 6. Data validation and quality control
print("\n✅ Data validation and quality control:")

class DataValidationSpider(scrapy.Spider):
    name = 'data_validation'
    start_urls = ['http://example.com/products']
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.validation_stats = {
            'total_items': 0,
            'valid_items': 0,
            'invalid_items': 0,
            'validation_errors': {}
        }
    
    def parse(self, response):
        """
        Extraction with validation
        """
        items = response.css('div.product')
        
        for item in items:
            self.validation_stats['total_items'] += 1
            
            # Extract the data
            product_data = self.extract_product_data(item, response)
            
            # Validate the data
            validation_result = self.validate_product_data(product_data)
            
            if validation_result['is_valid']:
                self.validation_stats['valid_items'] += 1
                yield product_data
            else:
                self.validation_stats['invalid_items'] += 1
                self.record_validation_errors(validation_result['errors'])
                
                # Log the invalid record
                self.logger.warning(f'Validation failed: {product_data.get("id", "unknown")}')
                self.logger.warning(f'Errors: {validation_result["errors"]}')
    
    def extract_product_data(self, selector, response):
        """
        Extract product data
        """
        return {
            'id': selector.css('::attr(data-id)').get(),
            'name': selector.css('h3.product-name::text').get(),
            'price': selector.css('span.price::text').get(),
            'description': selector.css('div.description::text').get(),
            'category': selector.css('span.category::text').get(),
            'brand': selector.css('span.brand::text').get(),
            'rating': selector.css('div.rating::attr(data-rating)').get(),
            'review_count': selector.css('span.review-count::text').get(),
            'availability': selector.css('span.availability::text').get(),
            'images': selector.css('div.images img::attr(src)').getall(),
            'url': response.url,
            'scraped_at': datetime.now().isoformat(),
        }
    
    def validate_product_data(self, data):
        """
        Validate product data
        """
        errors = []
        
        # 1. Required fields
        required_fields = ['id', 'name', 'price']
        for field in required_fields:
            if not data.get(field):
                errors.append(f'Missing required field: {field}')
        
        # 2. Data types
        errors.extend(self.validate_data_types(data))
        
        # 3. Data formats
        errors.extend(self.validate_data_formats(data))
        
        # 4. Data ranges
        errors.extend(self.validate_data_ranges(data))
        
        # 5. Business logic
        errors.extend(self.validate_business_logic(data))
        
        return {
            'is_valid': len(errors) == 0,
            'errors': errors
        }
    
    def validate_data_types(self, data):
        """
        Validate data types
        """
        errors = []
        
        # Price should be numeric
        if data.get('price'):
            try:
                price_value = float(re.sub(r'[^\d.]', '', data['price']))
                data['price_numeric'] = price_value
            except (ValueError, TypeError):
                errors.append('Invalid price format')
        
        # Rating should be numeric
        if data.get('rating'):
            try:
                rating_value = float(data['rating'])
                data['rating_numeric'] = rating_value
            except (ValueError, TypeError):
                errors.append('Invalid rating format')
        
        # Review count should be an integer
        if data.get('review_count'):
            try:
                count_value = int(re.sub(r'[^\d]', '', data['review_count']))
                data['review_count_numeric'] = count_value
            except (ValueError, TypeError):
                errors.append('Invalid review count format')
        
        # Images should be a list
        if data.get('images') and not isinstance(data['images'], list):
            errors.append('Images should be a list')
        
        return errors
    
    def validate_data_formats(self, data):
        """
        Validate data formats
        """
        errors = []
        
        # ID format
        if data.get('id'):
            if not re.match(r'^[a-zA-Z0-9\-_]+$', data['id']):
                errors.append('Invalid ID format')
        
        # URL format
        if data.get('url'):
            try:
                parsed = urlparse(data['url'])
                if not parsed.netloc:
                    errors.append('Invalid URL format')
            except ValueError:
                errors.append('URL parsing failed')
        
        # Image URL format
        if data.get('images'):
            for img_url in data['images']:
                if not self.is_valid_image_url(img_url):
                    errors.append(f'Invalid image URL: {img_url}')
        
        return errors
    
    def validate_data_ranges(self, data):
        """
        Validate data ranges
        """
        errors = []
        
        # Price range
        if data.get('price_numeric'):
            if data['price_numeric'] < 0:
                errors.append('Price cannot be negative')
            elif data['price_numeric'] > 1000000:
                errors.append('Price suspiciously high')
        
        # Rating range
        if data.get('rating_numeric'):
            if not (0 <= data['rating_numeric'] <= 5):
                errors.append('Rating should be between 0 and 5')
        
        # Review count range
        if data.get('review_count_numeric'):
            if data['review_count_numeric'] < 0:
                errors.append('Review count cannot be negative')
            elif data['review_count_numeric'] > 1000000:
                errors.append('Review count suspiciously high')
        
        # Text length limits
        if data.get('name') and len(data['name']) > 200:
            errors.append('Product name too long')
        
        if data.get('description') and len(data['description']) > 5000:
            errors.append('Product description too long')
        
        return errors
    
    def validate_business_logic(self, data):
        """
        Validate business logic
        """
        errors = []
        
        # A rating implies at least one review
        if (data.get('rating_numeric') and 
            data.get('review_count_numeric') == 0):
            errors.append('Rating present but review count is 0')
        
        # Availability status
        if data.get('availability'):
            valid_statuses = ['in_stock', 'out_of_stock', 'limited', 'pre_order']
            if data['availability'].lower() not in valid_statuses:
                errors.append('Invalid availability status')
        
        # Category
        if data.get('category'):
            valid_categories = [
                'electronics', 'clothing', 'books', 'home', 'sports',
                'beauty', 'toys', 'automotive', 'health', 'food'
            ]
            if data['category'].lower() not in valid_categories:
                errors.append('Invalid product category')
        
        return errors
    
    def is_valid_image_url(self, url):
        """
        Validate an image URL
        """
        if not url:
            return False
        
        try:
            parsed = urlparse(url)
            if not parsed.netloc:
                return False
            
            # Check the file extension
            image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']
            return any(parsed.path.lower().endswith(ext) for ext in image_extensions)
        except ValueError:
            return False
    
    def record_validation_errors(self, errors):
        """
        Record validation error statistics
        """
        for error in errors:
            if error in self.validation_stats['validation_errors']:
                self.validation_stats['validation_errors'][error] += 1
            else:
                self.validation_stats['validation_errors'][error] = 1
    
    def closed(self, reason):
        """
        Log statistics when the spider closes
        """
        stats = self.validation_stats
        
        self.logger.info("=== Validation statistics ===")
        self.logger.info(f"Total items: {stats['total_items']}")
        self.logger.info(f"Valid items: {stats['valid_items']}")
        self.logger.info(f"Invalid items: {stats['invalid_items']}")
        
        if stats['total_items'] > 0:
            valid_rate = (stats['valid_items'] / stats['total_items']) * 100
            self.logger.info(f"Validity rate: {valid_rate:.2f}%")
        
        if stats['validation_errors']:
            self.logger.info("=== Error breakdown ===")
            for error, count in sorted(stats['validation_errors'].items(), 
                                     key=lambda x: x[1], reverse=True):
                self.logger.info(f"{error}: {count}x")

print("Data validation and quality control examples complete!")

4. Multimedia Content Processing

4.1 Image Processing

# 7. Image processing techniques
print("\n🖼️ Image processing techniques:")

import base64
from PIL import Image
import io
import hashlib

class ImageProcessingSpider(scrapy.Spider):
    name = 'image_processing'
    start_urls = ['http://example.com/gallery']
    
    def parse(self, response):
        """
        Image processing example
        """
        # Collect image metadata
        images = response.css('div.gallery img')
        
        for img in images:
            image_data = self.extract_image_data(img, response)
            
            if image_data:
                # Download and process the image
                yield scrapy.Request(
                    image_data['url'],
                    callback=self.process_image,
                    meta={'image_data': image_data}
                )
    
    def extract_image_data(self, img_selector, response):
        """
        Extract image metadata
        """
        img_url = img_selector.css('::attr(src)').get()
        if not img_url:
            return None
        
        return {
            'url': response.urljoin(img_url),
            'alt': img_selector.css('::attr(alt)').get(),
            'title': img_selector.css('::attr(title)').get(),
            'width': img_selector.css('::attr(width)').get(),
            'height': img_selector.css('::attr(height)').get(),
            'class': img_selector.css('::attr(class)').get(),
            'data_attributes': self.extract_data_attributes(img_selector),
            'parent_context': self.extract_parent_context(img_selector),
        }
    
    def extract_data_attributes(self, selector):
        """
        Extract data-* attributes
        """
        data_attrs = {}
        
        # selector.attrib exposes all attributes of the underlying element;
        # there is no '::attr(*)' wildcard in parsel's CSS support.
        for name, value in selector.attrib.items():
            if name.startswith('data-'):
                data_attrs[name[5:]] = value  # strip the 'data-' prefix
        
        return data_attrs
    
    def extract_parent_context(self, selector):
        """
        Extract parent-element context
        """
        # Get the parent element
        parent = selector.xpath('./parent::*')
        
        if parent:
            return {
                'tag': parent.xpath('name()').get(),
                'class': parent.css('::attr(class)').get(),
                'id': parent.css('::attr(id)').get(),
            }
        
        return None
    
    def process_image(self, response):
        """
        Process the downloaded image
        """
        image_data = response.meta['image_data']
        
        try:
            # Validate the image
            if not self.is_valid_image(response):
                self.logger.warning(f'Invalid image: {response.url}')
                return
            
            # Analyze the image
            image_info = self.analyze_image(response.body)
            
            # Merge metadata and analysis results
            complete_data = {
                **image_data,
                **image_info,
                'file_size': len(response.body),
                'content_type': response.headers.get('Content-Type', b'').decode(),
                'last_modified': response.headers.get('Last-Modified', b'').decode(),
                'etag': response.headers.get('ETag', b'').decode(),
            }
            
            # Generate a thumbnail
            thumbnail_data = self.generate_thumbnail(response.body)
            if thumbnail_data:
                complete_data['thumbnail'] = thumbnail_data
            
            yield complete_data
            
        except Exception as e:
            self.logger.error(f'Image processing failed {response.url}: {e}')
    
    def is_valid_image(self, response):
        """
        Check image validity
        """
        # Check the Content-Type
        content_type = response.headers.get('Content-Type', b'').decode().lower()
        valid_types = ['image/jpeg', 'image/png', 'image/gif', 'image/webp', 'image/bmp']
        
        if not any(vtype in content_type for vtype in valid_types):
            return False
        
        # Check the file size
        if len(response.body) < 100:  # too small
            return False
        
        if len(response.body) > 10 * 1024 * 1024:  # too large (10MB)
            return False
        
        # Try to open and verify the image data
        try:
            Image.open(io.BytesIO(response.body)).verify()
            return True
        except Exception:
            return False
    
    def analyze_image(self, image_data):
        """
        Analyze an image
        """
        try:
            img = Image.open(io.BytesIO(image_data))
            
            return {
                'format': img.format,
                'mode': img.mode,
                'size': img.size,
                'width': img.width,
                'height': img.height,
                'aspect_ratio': round(img.width / img.height, 2),
                'has_transparency': img.mode in ('RGBA', 'LA') or 'transparency' in img.info,
                'color_count': len(img.getcolors(maxcolors=256)) if img.mode == 'P' else None,
                'file_hash': hashlib.md5(image_data).hexdigest(),
            }
        except Exception as e:
            self.logger.error(f'Image analysis failed: {e}')
            return {}
    
    def generate_thumbnail(self, image_data, size=(150, 150)):
        """
        Generate a thumbnail
        """
        try:
            img = Image.open(io.BytesIO(image_data))
            
            # Resize in place, preserving the aspect ratio
            img.thumbnail(size, Image.Resampling.LANCZOS)
            
            # JPEG has no alpha channel, so normalize the mode first
            if img.mode not in ('RGB', 'L'):
                img = img.convert('RGB')
            
            # Encode as base64
            buffer = io.BytesIO()
            img.save(buffer, format='JPEG', quality=85)
            thumbnail_b64 = base64.b64encode(buffer.getvalue()).decode()
            
            return {
                'data': thumbnail_b64,
                'size': img.size,
                'format': 'JPEG',
            }
        except Exception as e:
            self.logger.error(f'Thumbnail generation failed: {e}')
            return None

print("Image processing examples complete!")

4.2 Video and Audio Processing

# 8. Video and audio processing
print("\n🎥 Video and audio processing:")

class MediaProcessingSpider(scrapy.Spider):
    name = 'media_processing'
    start_urls = ['http://example.com/media']
    
    def parse(self, response):
        """
        媒体处理示例
        """
        # 处理视频
        videos = response.css('video, div.video-container')
        for video in videos:
            video_data = self.extract_video_data(video, response)
            if video_data:
                yield video_data
        
        # 处理音频
        audios = response.css('audio, div.audio-container')
        for audio in audios:
            audio_data = self.extract_audio_data(audio, response)
            if audio_data:
                yield audio_data
        
        # 处理嵌入式媒体
        embeds = response.css('iframe, embed, object')
        for embed in embeds:
            embed_data = self.extract_embed_data(embed, response)
            if embed_data:
                yield embed_data
    
    def extract_video_data(self, video_selector, response):
        """
        提取视频数据
        """
        video_data = {
            'type': 'video',
            'sources': [],
            'poster': None,
            'duration': None,
            'controls': False,
            'autoplay': False,
            'loop': False,
            'muted': False,
        }
        
        # 检查是否为video标签
        if video_selector.css('video'):
            video_element = video_selector.css('video')
            
            # 基本属性
            video_data.update({
                'poster': video_element.css('::attr(poster)').get(),
                'duration': video_element.css('::attr(duration)').get(),
                'controls': video_element.css('::attr(controls)').get() is not None,
                'autoplay': video_element.css('::attr(autoplay)').get() is not None,
                'loop': video_element.css('::attr(loop)').get() is not None,
                'muted': video_element.css('::attr(muted)').get() is not None,
                'width': video_element.css('::attr(width)').get(),
                'height': video_element.css('::attr(height)').get(),
            })
            
            # 提取视频源
            sources = video_element.css('source')
            for source in sources:
                source_data = {
                    'src': response.urljoin(source.css('::attr(src)').get() or ''),
                    'type': source.css('::attr(type)').get(),
                    'media': source.css('::attr(media)').get(),
                    'sizes': source.css('::attr(sizes)').get(),
                }
                video_data['sources'].append(source_data)
            
            # 如果没有source标签,检查src属性
            if not video_data['sources']:
                src = video_element.css('::attr(src)').get()
                if src:
                    video_data['sources'].append({
                        'src': response.urljoin(src),
                        'type': None,
                        'media': None,
                        'sizes': None,
                    })
        
        # 检查容器中的视频信息
        else:
            # 查找data属性中的视频信息
            video_url = video_selector.css('::attr(data-video-url)').get()
            if video_url:
                video_data['sources'].append({
                    'src': response.urljoin(video_url),
                    'type': video_selector.css('::attr(data-video-type)').get(),
                    'media': None,
                    'sizes': None,
                })
            
            # 其他视频相关属性
            video_data.update({
                'poster': video_selector.css('::attr(data-poster)').get(),
                'duration': video_selector.css('::attr(data-duration)').get(),
                'title': video_selector.css('::attr(data-title)').get(),
                'description': video_selector.css('::attr(data-description)').get(),
            })
        
        return video_data if video_data['sources'] else None
    
    def extract_audio_data(self, audio_selector, response):
        """
        提取音频数据
        """
        audio_data = {
            'type': 'audio',
            'sources': [],
            'controls': False,
            'autoplay': False,
            'loop': False,
            'muted': False,
            'preload': None,
        }
        
        # 检查是否为audio标签
        if audio_selector.css('audio'):
            audio_element = audio_selector.css('audio')
            
            # 基本属性
            audio_data.update({
                'controls': audio_element.css('::attr(controls)').get() is not None,
                'autoplay': audio_element.css('::attr(autoplay)').get() is not None,
                'loop': audio_element.css('::attr(loop)').get() is not None,
                'muted': audio_element.css('::attr(muted)').get() is not None,
                'preload': audio_element.css('::attr(preload)').get(),
                'duration': audio_element.css('::attr(duration)').get(),
            })
            
            # 提取音频源
            sources = audio_element.css('source')
            for source in sources:
                source_data = {
                    'src': response.urljoin(source.css('::attr(src)').get() or ''),
                    'type': source.css('::attr(type)').get(),
                }
                audio_data['sources'].append(source_data)
            
            # 如果没有source标签,检查src属性
            if not audio_data['sources']:
                src = audio_element.css('::attr(src)').get()
                if src:
                    audio_data['sources'].append({
                        'src': response.urljoin(src),
                        'type': None,
                    })
        
        # 检查容器中的音频信息
        else:
            audio_url = audio_selector.css('::attr(data-audio-url)').get()
            if audio_url:
                audio_data['sources'].append({
                    'src': response.urljoin(audio_url),
                    'type': audio_selector.css('::attr(data-audio-type)').get(),
                })
            
            # 其他音频相关属性
            audio_data.update({
                'title': audio_selector.css('::attr(data-title)').get(),
                'artist': audio_selector.css('::attr(data-artist)').get(),
                'album': audio_selector.css('::attr(data-album)').get(),
                'duration': audio_selector.css('::attr(data-duration)').get(),
            })
        
        return audio_data if audio_data['sources'] else None
    
    def extract_embed_data(self, embed_selector, response):
        """
        提取嵌入式媒体数据
        """
        embed_data = {
            'type': 'embed',
            'platform': None,
            'embed_id': None,
            'url': None,
            'width': None,
            'height': None,
        }
        
        # iframe处理
        if embed_selector.css('iframe'):
            iframe = embed_selector.css('iframe')
            src = iframe.css('::attr(src)').get()
            
            if src:
                embed_data['url'] = src
                embed_data['platform'] = self.detect_platform(src)
                embed_data['embed_id'] = self.extract_embed_id(src, embed_data['platform'])
                embed_data['width'] = iframe.css('::attr(width)').get()
                embed_data['height'] = iframe.css('::attr(height)').get()
        
        # embed标签处理
        elif embed_selector.css('embed'):
            embed = embed_selector.css('embed')
            src = embed.css('::attr(src)').get()
            
            if src:
                embed_data['url'] = src
                embed_data['platform'] = self.detect_platform(src)
                embed_data['embed_id'] = self.extract_embed_id(src, embed_data['platform'])
                embed_data['width'] = embed.css('::attr(width)').get()
                embed_data['height'] = embed.css('::attr(height)').get()
        
        # object标签处理
        elif embed_selector.css('object'):
            obj = embed_selector.css('object')
            data = obj.css('::attr(data)').get()
            
            if data:
                embed_data['url'] = data
                embed_data['platform'] = self.detect_platform(data)
                embed_data['embed_id'] = self.extract_embed_id(data, embed_data['platform'])
                embed_data['width'] = obj.css('::attr(width)').get()
                embed_data['height'] = obj.css('::attr(height)').get()
        
        return embed_data if embed_data['url'] else None
    
    def detect_platform(self, url):
        """
        检测媒体平台
        """
        if not url:
            return None
        
        url_lower = url.lower()
        
        if 'youtube.com' in url_lower or 'youtu.be' in url_lower:
            return 'youtube'
        elif 'vimeo.com' in url_lower:
            return 'vimeo'
        elif 'bilibili.com' in url_lower:
            return 'bilibili'
        elif 'tiktok.com' in url_lower:
            return 'tiktok'
        elif 'twitter.com' in url_lower or 'x.com' in url_lower:
            return 'twitter'
        elif 'instagram.com' in url_lower:
            return 'instagram'
        elif 'facebook.com' in url_lower:
            return 'facebook'
        elif 'soundcloud.com' in url_lower:
            return 'soundcloud'
        elif 'spotify.com' in url_lower:
            return 'spotify'
        else:
            return 'unknown'
    
    def extract_embed_id(self, url, platform):
        """
        Extract the platform-specific embed ID
        """
        if not url or not platform:
            return None
        
        try:
            if platform == 'youtube':
                # Extract the YouTube video ID
                patterns = [
                    r'(?:youtube\.com/watch\?v=|youtu\.be/)([a-zA-Z0-9_-]+)',
                    r'youtube\.com/embed/([a-zA-Z0-9_-]+)',
                ]
                for pattern in patterns:
                    match = re.search(pattern, url)
                    if match:
                        return match.group(1)
            
            elif platform == 'vimeo':
                # Extract the Vimeo video ID
                match = re.search(r'vimeo\.com/(?:video/)?(\d+)', url)
                if match:
                    return match.group(1)
            
            elif platform == 'bilibili':
                # Extract the Bilibili video ID
                match = re.search(r'bilibili\.com/video/(BV[a-zA-Z0-9]+)', url)
                if match:
                    return match.group(1)
            
            # ID extraction logic for other platforms...
            
        except Exception as e:
            self.logger.error(f'Embed ID extraction failed: {e}')
        
        return None

print("视频和音频处理示例完成!")

5. Performance Optimization Techniques

5.1 Selector Performance Optimization

# 9. Selector performance optimization
print("\n⚡ Selector performance optimization:")

class OptimizedSelectorSpider(scrapy.Spider):
    name = 'optimized_selector'
    start_urls = ['http://example.com/large-page']
    
    def parse(self, response):
        """
        Examples of optimized selector usage
        """
        # 1. Cache selector results
        self.demo_selector_caching(response)
        
        # 2. Use efficient selectors
        self.demo_efficient_selectors(response)
        
        # 3. Avoid duplicate queries (this demo yields items, so delegate)
        yield from self.demo_avoid_duplicate_queries(response)
        
        # 4. Batch processing (also a generator)
        yield from self.demo_batch_processing(response)
    
    def demo_selector_caching(self, response):
        """
        选择器缓存示例
        """
        print("\n1. 选择器缓存:")
        
        # ❌ 低效:重复查询
        # for i in range(100):
        #     products = response.css('div.product')  # 每次都重新查询
        #     if products:
        #         print(f"产品 {i}: {len(products)}")
        
        # ✅ 高效:缓存结果
        products = response.css('div.product')  # 只查询一次
        for i in range(100):
            if products:
                print(f"产品 {i}: {len(products)}")
        
        # 缓存复杂选择器
        featured_products = products.css('[data-featured="true"]')
        sale_products = products.css('[data-sale="true"]')
        
        print(f"特色产品: {len(featured_products)}")
        print(f"促销产品: {len(sale_products)}")
    
    def demo_efficient_selectors(self, response):
        """
        高效选择器示例
        """
        print("\n2. 高效选择器:")
        
        # ✅ 使用ID选择器(最快)
        header = response.css('#header')
        
        # ✅ 使用类选择器
        products = response.css('.product')
        
        # ✅ 使用属性选择器
        featured = response.css('[data-featured="true"]')
        
        # ❌ 避免复杂的后代选择器
        # slow_selector = response.css('div div div span.price')
        
        # ✅ 使用更直接的选择器
        prices = response.css('.product .price')
        
        # ✅ 使用XPath的优势
        # 当需要复杂逻辑时,XPath可能更高效
        expensive_products = response.xpath('//div[@class="product"][@data-price > 100]')
        
        print(f"头部元素: {len(header)}")
        print(f"产品数量: {len(products)}")
        print(f"特色产品: {len(featured)}")
        print(f"价格元素: {len(prices)}")
        print(f"高价产品: {len(expensive_products)}")
    
    def demo_avoid_duplicate_queries(self, response):
        """
        避免重复查询示例
        """
        print("\n3. 避免重复查询:")
        
        # ❌ 低效:重复查询相同元素
        # for product in response.css('div.product'):
        #     name = product.css('h3.name::text').get()
        #     price = product.css('span.price::text').get()
        #     rating = product.css('div.rating span::text').get()
        #     # 每次都重新查询product
        
        # ✅ 高效:一次性提取所有需要的数据
        products = response.css('div.product')
        
        for product in products:
            # 一次性提取所有数据
            product_data = {
                'name': product.css('h3.name::text').get(),
                'price': product.css('span.price::text').get(),
                'rating': product.css('div.rating span::text').get(),
                'image': product.css('img::attr(src)').get(),
                'url': product.css('a::attr(href)').get(),
            }
            
            # 处理数据
            if product_data['name'] and product_data['price']:
                yield product_data
    
    def demo_batch_processing(self, response):
        """
        批量处理示例
        """
        print("\n4. 批量处理:")
        
        # ✅ 批量提取文本
        names = response.css('div.product h3.name::text').getall()
        prices = response.css('div.product span.price::text').getall()
        ratings = response.css('div.product div.rating span::text').getall()
        
        # 批量处理
        for i, (name, price, rating) in enumerate(zip(names, prices, ratings)):
            if name and price:
                yield {
                    'id': i,
                    'name': name.strip(),
                    'price': self.parse_price(price),
                    'rating': self.parse_rating(rating),
                }
    
    def parse_price(self, price_text):
        """Parse a price string into a float"""
        if not price_text:
            return None
        try:
            return float(re.sub(r'[^\d.]', '', price_text))
        except ValueError:
            return None
    
    def parse_rating(self, rating_text):
        """解析评分"""
        if not rating_text:
            return None
        try:
            return float(rating_text)
        except ValueError:
            return None

print("选择器性能优化示例完成!")

5.2 Memory Optimization

# 10. Memory optimization techniques
print("\n💾 Memory optimization techniques:")

class MemoryOptimizedSpider(scrapy.Spider):
    name = 'memory_optimized'
    start_urls = ['http://example.com/large-dataset']
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.processed_count = 0
        self.batch_size = 100
        self.current_batch = []
    
    def parse(self, response):
        """
        内存优化的数据处理
        """
        # 使用生成器而不是列表
        for item in self.extract_items_generator(response):
            self.current_batch.append(item)
            
            # 批量处理,避免内存积累
            if len(self.current_batch) >= self.batch_size:
                yield from self.process_batch()
                self.current_batch = []
        
        # 处理剩余项目
        if self.current_batch:
            yield from self.process_batch()
    
    def extract_items_generator(self, response):
        """
        使用生成器提取数据,节省内存
        """
        products = response.css('div.product')
        
        for product in products:
            # 立即处理,不存储在内存中
            item_data = self.extract_single_item(product, response)
            
            if item_data:
                yield item_data
            
            # 清理不需要的引用
            del product
    
    def extract_single_item(self, selector, response):
        """
        提取单个项目,最小化内存使用
        """
        try:
            # 只提取必要的数据
            item = {
                'id': selector.css('::attr(data-id)').get(),
                'name': selector.css('h3::text').get(),
                'price': selector.css('.price::text').get(),
            }
            
            # 立即清洗数据,减少存储
            if item['name']:
                item['name'] = item['name'].strip()[:100]  # 限制长度
            
            if item['price']:
                item['price'] = self.parse_price(item['price'])
            
            return item if item['id'] else None
            
        except Exception as e:
            self.logger.error(f'项目提取失败: {e}')
            return None
    
    def process_batch(self):
        """
        批量处理数据
        """
        # 批量验证和清洗
        valid_items = []
        
        for item in self.current_batch:
            if self.validate_item(item):
                valid_items.append(item)
        
        # 批量输出
        for item in valid_items:
            self.processed_count += 1
            yield item
        
        # 清理内存
        del valid_items
        
        # 记录进度
        if self.processed_count % 1000 == 0:
            self.logger.info(f'已处理 {self.processed_count} 个项目')
    
    def validate_item(self, item):
        """
        轻量级验证
        """
        return (item.get('id') and 
                item.get('name') and 
                item.get('price') is not None)
    
    def parse_price(self, price_text):
        """
        解析价格
        """
        if not price_text:
            return None
        
        try:
            return float(re.sub(r'[^\d.]', '', price_text))
        except ValueError:
            return None

print("内存优化技术示例完成!")

6. Chapter Summary

This chapter took a deep dive into advanced data extraction and processing techniques in Scrapy. The main topics covered:

6.1 Core Technical Points

  1. Advanced selector techniques

    • Attribute, pseudo-class, and combinator usage in CSS selectors
    • XPath axes, functions, and complex queries
    • Selector performance optimization tips
  2. Complex data structure handling

    • Hierarchical extraction of nested data
    • Recursive processing of comments and replies
    • Structured extraction of category trees and tabular data
  3. Data cleaning and validation

    • Cleaning of text, HTML, prices, dates, and other fields
    • Validation of data types, formats, ranges, and business logic
    • Data quality control and statistics
  4. Multimedia content handling

    • Image extraction, analysis, and thumbnail generation
    • Metadata extraction for video and audio
    • Platform detection and ID extraction for embedded media
  5. Performance optimization techniques

    • Selector caching and batch processing
    • Memory optimization and generator usage
    • Avoiding duplicate queries and unnecessary computation

6.2 Practical Application Scenarios

  • E-commerce sites: complete extraction of product information, prices, and reviews
  • News media: multimedia scraping of article content, images, and video
  • Social platforms: hierarchical handling of user-generated content and threaded replies
  • Category directories: recursive extraction of complex category structures

7. Best Practices

7.1 Selector Usage Principles

# Selector best practices
best_practices = {
    "Performance first": "Prefer ID and class selectors",
    "Cache results": "Avoid re-querying the same elements",
    "Batch processing": "Extract related data in a single pass",
    "Error handling": "Always validate selector results",
    "Readability": "Use clear selector expressions"
}
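
A compact sketch applying several of these principles together; the markup and selectors are hypothetical:

from scrapy.selector import Selector

html = '<div id="header"><h1 class="title"> My Shop </h1></div>'
sel = Selector(text=html)

# Performance first: ID selector, queried once and cached
header = sel.css('#header')

# Error handling: validate the result before using it
title = header.css('h1.title::text').get()
title = title.strip() if title else 'unknown'
print(title)  # My Shop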

7.2 Data Processing Principles

# Data processing best practices
data_practices = {
    "Clean immediately": "Do basic cleaning at extraction time",
    "Layered validation": "Apply multiple levels of data validation",
    "Memory management": "Use generators and batch processing",
    "Error logging": "Record processing errors and statistics in detail",
    "Performance monitoring": "Track processing speed and resource usage"
}
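
A small sketch of the first two principles, with hypothetical field names: cleaning happens at extraction time, and validation is split into a type layer and a business-rule layer:

def clean_item(raw):
    """Clean immediately: normalize fields as soon as they are extracted."""
    return {
        'name': (raw.get('name') or '').strip()[:100],
        'price': raw.get('price'),
    }

def validate_item(item):
    """Layered validation: type check first, then the business rule."""
    if not isinstance(item.get('price'), (int, float)):
        return False
    return bool(item['name']) and item['price'] >= 0

item = clean_item({'name': '  Widget  ', 'price': 9.99})
print(validate_item(item))  # True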

8. Common Pitfalls

8.1 Performance Pitfalls

  • Duplicate queries: running the same selector repeatedly inside a loop
  • Memory leaks: large volumes of data not released in time
  • Over-complex selectors: needlessly convoluted CSS or XPath expressions (see the sketch below)
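
A minimal illustration of the over-complex selector pitfall, using hypothetical markup; both queries return the same node, but the second expresses the intent directly:

from scrapy.selector import Selector

html = '<div><div><div><span class="price">9.99</span></div></div></div>'
sel = Selector(text=html)

# ❌ Deep descendant chain that mirrors the DOM structure
slow = sel.css('div div div span.price::text').get()

# ✅ Target the distinguishing class directly
fast = sel.css('.price::text').get()

print(slow == fast)  # True: same result, simpler and more robust query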

8.2 Data Quality Pitfalls

  • Over-cleaning: scrubbing away useful information
  • Insufficient validation: lack of thorough data checks
  • Encoding problems: mishandling character encodings (see the sketch below)
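
For the encoding pitfall in particular, a minimal defensive-decoding sketch; the byte string and the candidate encodings are illustrative:

# Bytes that arrive in an unexpected encoding (GBK here, for illustration)
raw = '价格:99元'.encode('gbk')

text = None
for encoding in ('utf-8', 'gbk', 'latin-1'):
    try:
        text = raw.decode(encoding)
        break
    except UnicodeDecodeError:
        continue  # latin-1 never fails, so it serves as a lossy last resort

print(text)  # decoded correctly once the right codec is tried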

9. Next Steps

After completing this chapter, we recommend continuing with:

  1. Chapter 5: Middleware Development - learn to build custom middleware
  2. Chapter 6: Distributed Crawling - get to know Scrapy-Redis
  3. Chapter 7: Anti-Scraping Countermeasures - study anti-bot techniques
  4. Chapter 8: Data Storage - master multiple storage options

10. Exercises

10.1 Basic Exercises

  1. Write a Spider that extracts product information from an e-commerce site, including name, price, image, and rating
  2. Implement data extraction for a comment system, with support for multi-level replies
  3. Build a crawler for a news site that extracts article content and multimedia resources

10.2 Advanced Exercises

  1. Design a general-purpose data cleaning framework that supports multiple data types
  2. Implement a performance-optimized system for large-scale data extraction
  3. Develop an intelligent analysis and processing tool for multimedia content

10.3 Project Exercises

  1. Full e-commerce crawler: build a complete e-commerce data extraction system
  2. Social media analytics: develop a tool for analyzing platform content and user behavior
  3. News aggregation platform: create a system that aggregates and processes news content from multiple sources

Congratulations! You have now mastered Scrapy's advanced data extraction and processing techniques. These skills will help you handle all kinds of complex extraction scenarios and build efficient, reliable crawlers. Continue to the next chapter to explore the power of middleware!