学习目标

通过本章学习,您将掌握:

  • 理解分布式爬虫的概念和优势
  • 掌握Scrapy-Redis的安装和配置
  • 学会实现分布式爬虫架构
  • 了解Redis在爬虫中的应用场景
  • 掌握分布式爬虫的监控和管理

1. 分布式爬虫基础

1.1 什么是分布式爬虫

# 1. 分布式爬虫概念演示
print("🌐 分布式爬虫基础概念:")

# 分布式爬虫的优势
distributed_advantages = {
    "性能提升": "多台机器并行处理,提高爬取速度",
    "容错能力": "单台机器故障不影响整体爬取",
    "资源利用": "充分利用多台机器的计算资源",
    "扩展性": "可以根据需要动态增加爬虫节点",
    "负载分担": "将爬取任务分散到多个节点"
}

for advantage, description in distributed_advantages.items():
    print(f"  {advantage}: {description}")

# 分布式爬虫的挑战
distributed_challenges = {
    "任务调度": "如何合理分配爬取任务",
    "数据一致性": "确保数据不重复、不丢失",
    "状态同步": "多个节点间的状态同步",
    "故障处理": "节点故障的检测和恢复",
    "监控管理": "分布式系统的监控和管理"
}

print("\n分布式爬虫的挑战:")
for challenge, description in distributed_challenges.items():
    print(f"  {challenge}: {description}")

1.2 Scrapy-Redis架构

# 2. Scrapy-Redis架构介绍
print("\n🏗️ Scrapy-Redis架构:")

class ScrapyRedisArchitecture:
    """
    Scrapy-Redis架构演示
    """
    
    def __init__(self):
        self.components = {
            "Redis服务器": "中央调度器,存储请求队列和去重集合",
            "Master节点": "负责生成初始请求和监控",
            "Worker节点": "执行爬取任务的工作节点",
            "数据存储": "存储爬取结果的数据库"
        }
        
        self.workflow = [
            "1. Master节点将起始URL推送到Redis队列",
            "2. Worker节点从Redis队列获取请求",
            "3. Worker节点执行爬取并生成新请求",
            "4. 新请求经过去重后推送到Redis队列",
            "5. 爬取结果存储到数据库",
            "6. 重复步骤2-5直到队列为空"
        ]
    
    def show_architecture(self):
        """
        展示架构组件
        """
        print("\n架构组件:")
        for component, description in self.components.items():
            print(f"  {component}: {description}")
        
        print("\n工作流程:")
        for step in self.workflow:
            print(f"  {step}")
    
    def show_redis_data_structures(self):
        """
        展示Redis数据结构
        """
        redis_structures = {
            "请求队列": {
                "类型": "List",
                "键名": "spider_name:requests",
                "用途": "存储待处理的请求"
            },
            "去重集合": {
                "类型": "Set",
                "键名": "spider_name:dupefilter",
                "用途": "存储请求指纹,用于去重"
            },
            "统计信息": {
                "类型": "Hash",
                "键名": "spider_name:stats",
                "用途": "存储爬虫运行统计"
            },
            "Items队列": {
                "类型": "List",
                "键名": "spider_name:items",
                "用途": "存储爬取的数据项"
            }
        }
        
        print("\nRedis数据结构:")
        for name, info in redis_structures.items():
            print(f"  {name}:")
            for key, value in info.items():
                print(f"    {key}: {value}")

# 创建架构演示实例
architecture = ScrapyRedisArchitecture()
architecture.show_architecture()
architecture.show_redis_data_structures()

print("Scrapy-Redis架构介绍完成!")

2. Scrapy-Redis安装与配置

2.1 环境准备

# 3. 环境准备和安装
print("\n📦 环境准备和安装:")

# 安装命令
installation_commands = [
    "# 安装Redis服务器",
    "# Ubuntu/Debian:",
    "sudo apt-get update",
    "sudo apt-get install redis-server",
    "",
    "# CentOS/RHEL:",
    "sudo yum install redis",
    "",
    "# macOS:",
    "brew install redis",
    "",
    "# Windows:",
    "# 下载Redis for Windows或使用Docker",
    "",
    "# 安装Python依赖",
    "pip install scrapy-redis",
    "pip install redis",
    "",
    "# 启动Redis服务",
    "redis-server",
    "",
    "# 测试Redis连接",
    "redis-cli ping"
]

print("安装步骤:")
for command in installation_commands:
    print(command)

# Redis配置示例
redis_config = """
# redis.conf 配置示例
bind 0.0.0.0  # 允许外部连接
port 6379     # 默认端口
maxmemory 2gb # 最大内存限制
maxmemory-policy allkeys-lru  # 内存淘汰策略
save 900 1    # 持久化配置
save 300 10
save 60 10000
"""

print(f"\nRedis配置示例:\n{redis_config}")

2.2 Scrapy-Redis配置

# 4. Scrapy-Redis项目配置
print("\n⚙️ Scrapy-Redis项目配置:")

# settings.py 配置示例
scrapy_redis_settings = """
# settings.py

# 启用Scrapy-Redis组件
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
ITEM_PIPELINES = {
    'scrapy_redis.pipelines.RedisPipeline': 300,
}

# Redis连接配置
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
# db、密码等其他连接参数建议通过下面的 REDIS_PARAMS 或 REDIS_URL 指定

# 或者使用Redis URL(可以携带密码和db)
# REDIS_URL = 'redis://user:password@hostname:port/db'

# 调度器配置
SCHEDULER_PERSIST = True  # 持久化调度器状态
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'  # 请求队列键名
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'  # 队列类型

# 去重过滤器配置
DUPEFILTER_KEY = '%(spider)s:dupefilter'  # 去重集合键名
DUPEFILTER_DEBUG = True  # 调试模式

# 统计信息配置
STATS_CLASS = 'scrapy_redis.stats.RedisStatsCollector'

# 其他配置
REDIS_START_URLS_KEY = '%(name)s:start_urls'  # 起始URL键名
REDIS_START_URLS_AS_SET = False  # 起始URL是否使用集合

# 连接池配置
REDIS_PARAMS = {
    'socket_connect_timeout': 30,
    'socket_timeout': 30,
    'retry_on_timeout': True,
    'health_check_interval': 30,
}

# 日志配置
LOG_LEVEL = 'INFO'
"""

print("Scrapy-Redis配置:")
print(scrapy_redis_settings)

3. 分布式Spider开发

3.1 基础分布式Spider

# 5. 基础分布式Spider开发
print("\n🕷️ 基础分布式Spider开发:")

import scrapy
from scrapy_redis.spiders import RedisSpider
import json
import time

class DistributedNewsSpider(RedisSpider):
    """
    分布式新闻爬虫示例
    """
    name = 'distributed_news'
    redis_key = 'distributed_news:start_urls'
    
    # 自定义设置
    custom_settings = {
        'DOWNLOAD_DELAY': 1,
        'RANDOMIZE_DOWNLOAD_DELAY': True,
        'CONCURRENT_REQUESTS': 16,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 8,
    }
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_time = time.time()
        self.processed_count = 0
    
    def parse(self, response):
        """
        解析新闻列表页
        """
        self.logger.info(f'Processing: {response.url}')
        
        # 提取新闻链接
        news_links = response.css('a.news-link::attr(href)').getall()
        
        for link in news_links:
            # 生成新闻详情页请求
            yield response.follow(
                link,
                callback=self.parse_news,
                meta={'source_url': response.url}
            )
        
        # 提取分页链接
        next_page = response.css('a.next-page::attr(href)').get()
        if next_page:
            yield response.follow(
                next_page,
                callback=self.parse,
                meta={'page_type': 'list'}
            )
    
    def parse_news(self, response):
        """
        解析新闻详情页
        """
        self.processed_count += 1
        
        # 提取新闻数据
        item = {
            'url': response.url,
            'title': response.css('h1.title::text').get(),
            'content': ' '.join(response.css('div.content p::text').getall()),
            'author': response.css('span.author::text').get(),
            'publish_time': response.css('time::attr(datetime)').get(),
            'category': response.css('span.category::text').get(),
            'tags': response.css('span.tag::text').getall(),
            'source_url': response.meta.get('source_url'),
            'crawl_time': time.time(),
            'spider_name': self.name,
        }
        
        # 数据清洗
        item = self.clean_item(item)
        
        if item['title'] and item['content']:
            yield item
        
        # 记录进度
        if self.processed_count % 100 == 0:
            elapsed_time = time.time() - self.start_time
            rate = self.processed_count / elapsed_time
            self.logger.info(f'Processed {self.processed_count} items, rate: {rate:.2f} items/sec')
    
    def clean_item(self, item):
        """
        清洗数据项
        """
        # 清理标题
        if item['title']:
            item['title'] = item['title'].strip()
        
        # 清理内容
        if item['content']:
            item['content'] = item['content'].strip()
            # 移除多余空格
            item['content'] = ' '.join(item['content'].split())
        
        # 清理作者
        if item['author']:
            item['author'] = item['author'].strip()
        
        # 清理标签
        if item['tags']:
            item['tags'] = [tag.strip() for tag in item['tags'] if tag.strip()]
        
        return item

print("基础分布式Spider开发完成!")

3.2 高级分布式Spider

# 6. 高级分布式Spider开发
print("\n🚀 高级分布式Spider开发:")

from scrapy_redis.spiders import RedisCrawlSpider
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import Rule
import hashlib
import re
import redis
from urllib.parse import urlparse

class AdvancedDistributedSpider(RedisCrawlSpider):
    """
    高级分布式爬虫示例
    """
    name = 'advanced_distributed'
    redis_key = 'advanced_distributed:start_urls'
    
    # 链接提取规则
    rules = (
        # 提取列表页链接
        Rule(
            LinkExtractor(
                allow=r'/category/\w+/page/\d+',
                deny=r'/admin|/login|/register'
            ),
            callback='parse_list',
            follow=True
        ),
        
        # 提取详情页链接
        Rule(
            LinkExtractor(
                allow=r'/article/\d+',
                deny=r'/comment|/share'
            ),
            callback='parse_item',
            follow=False
        ),
    )
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        # 统计信息键名(Redis连接在from_crawler中建立,__init__阶段还拿不到settings)
        self.redis_conn = None
        self.stats_key = f'{self.name}:stats'
        self.error_key = f'{self.name}:errors'
    
    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super().from_crawler(crawler, *args, **kwargs)
        
        # 初始化用于统计的Redis连接(settings此时已经可用)
        spider.redis_conn = redis.Redis(
            host=crawler.settings.get('REDIS_HOST', 'localhost'),
            port=crawler.settings.getint('REDIS_PORT', 6379),
            db=crawler.settings.getint('REDIS_DB', 0),
            decode_responses=True
        )
        
        # 初始化统计
        spider.init_stats()
        return spider
    
    def init_stats(self):
        """
        初始化统计信息
        """
        stats = {
            'start_time': time.time(),
            'requests_count': 0,
            'items_count': 0,
            'errors_count': 0,
            'spider_name': self.name
        }
        
        # redis-py中hmset已弃用,改用hset的mapping参数
        self.redis_conn.hset(self.stats_key, mapping=stats)
    
    def parse_list(self, response):
        """
        解析列表页
        """
        self.update_stats('requests_count')
        
        # 提取文章链接
        article_links = response.css('a.article-link::attr(href)').getall()
        
        for link in article_links:
            yield response.follow(
                link,
                callback=self.parse_item,
                meta={
                    'category': self.extract_category(response.url),
                    'list_url': response.url
                }
            )
    
    def parse_item(self, response):
        """
        解析详情页
        """
        self.update_stats('requests_count')
        
        try:
            # 提取数据
            item = self.extract_item_data(response)
            
            # 验证数据
            if self.validate_item(item):
                self.update_stats('items_count')
                yield item
            else:
                self.log_error('validation_failed', response.url, 'Item validation failed')
        
        except Exception as e:
            self.log_error('parse_error', response.url, str(e))
    
    def extract_item_data(self, response):
        """
        提取项目数据
        """
        item = {
            'url': response.url,
            'title': response.css('h1::text').get(),
            'content': self.extract_content(response),
            'author': response.css('.author::text').get(),
            'publish_time': response.css('time::attr(datetime)').get(),
            'category': response.meta.get('category'),
            'tags': response.css('.tag::text').getall(),
            'views': self.extract_views(response),
            'comments_count': self.extract_comments_count(response),
            'images': response.css('img::attr(src)').getall(),
            'external_links': self.extract_external_links(response),
            'word_count': 0,  # 将在后续计算
            'fingerprint': '',  # 将在后续生成
            'crawl_time': time.time(),
            'spider_name': self.name,
            'list_url': response.meta.get('list_url'),
        }
        
        # 计算字数
        if item['content']:
            item['word_count'] = len(item['content'].split())
        
        # 生成指纹
        item['fingerprint'] = self.generate_fingerprint(item)
        
        return item
    
    def extract_content(self, response):
        """
        提取文章内容
        """
        # 尝试多种选择器
        content_selectors = [
            'div.content p::text',
            'article p::text',
            '.article-body p::text',
            '.post-content p::text'
        ]
        
        for selector in content_selectors:
            content_parts = response.css(selector).getall()
            if content_parts:
                return ' '.join(content_parts).strip()
        
        return ''
    
    def extract_views(self, response):
        """
        提取浏览量
        """
        views_text = response.css('.views::text').get()
        if views_text:
            match = re.search(r'(\d+)', views_text)
            if match:
                return int(match.group(1))
        return 0
    
    def extract_comments_count(self, response):
        """
        提取评论数
        """
        comments_text = response.css('.comments-count::text').get()
        if comments_text:
            match = re.search(r'(\d+)', comments_text)
            if match:
                return int(match.group(1))
        return 0
    
    def extract_external_links(self, response):
        """
        提取外部链接(域名与当前页面不同的绝对链接)
        """
        current_domain = urlparse(response.url).netloc
        external_links = []
        
        for link in response.css('a::attr(href)').getall():
            absolute_url = response.urljoin(link)
            # 过滤掉站内链接以及mailto:、javascript:等没有域名的链接
            if urlparse(absolute_url).netloc not in ('', current_domain):
                external_links.append(absolute_url)
        
        return external_links
    
    def extract_category(self, url):
        """
        从URL提取分类
        """
        match = re.search(r'/category/(\w+)', url)
        return match.group(1) if match else 'unknown'
    
    def generate_fingerprint(self, item):
        """
        生成项目指纹
        """
        content = f"{item['title']}{item['content']}{item['url']}"
        return hashlib.md5(content.encode()).hexdigest()
    
    def validate_item(self, item):
        """
        验证项目数据
        """
        # 必填字段检查
        required_fields = ['title', 'content', 'url']
        for field in required_fields:
            if not item.get(field):
                return False
        
        # 内容长度检查
        if len(item['content']) < 100:
            return False
        
        # 标题长度检查
        if len(item['title']) < 5:
            return False
        
        return True
    
    def update_stats(self, stat_name, increment=1):
        """
        更新统计信息
        """
        self.redis_conn.hincrby(self.stats_key, stat_name, increment)
    
    def log_error(self, error_type, url, message):
        """
        记录错误信息
        """
        self.update_stats('errors_count')
        
        error_info = {
            'type': error_type,
            'url': url,
            'message': message,
            'timestamp': time.time(),
            'spider_name': self.name
        }
        
        self.redis_conn.lpush(self.error_key, json.dumps(error_info))
        self.logger.error(f'{error_type}: {url} - {message}')
    
    def get_stats(self):
        """
        获取统计信息
        """
        return self.redis_conn.hgetall(self.stats_key)

print("高级分布式Spider开发完成!")

4. 分布式任务管理

4.1 任务分发器

# 7. 分布式任务分发器
print("\n📋 分布式任务分发器:")

import redis
import json
import time
from urllib.parse import urljoin, urlparse

class DistributedTaskManager:
    """
    分布式任务管理器
    """
    
    def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
        self.redis_conn = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=redis_db,
            decode_responses=True
        )
        
        self.task_queue_key = 'distributed_tasks:queue'
        self.worker_status_key = 'distributed_tasks:workers'
        self.task_status_key = 'distributed_tasks:status'
        
        self.logger = self._setup_logger()
    
    def _setup_logger(self):
        """
        设置日志记录器
        """
        import logging
        logger = logging.getLogger('TaskManager')
        logger.setLevel(logging.INFO)
        
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        
        return logger
    
    def add_start_urls(self, spider_name, urls):
        """
        添加起始URL到Redis队列
        """
        redis_key = f'{spider_name}:start_urls'
        
        for url in urls:
            self.redis_conn.lpush(redis_key, url)
        
        self.logger.info(f'Added {len(urls)} start URLs for spider: {spider_name}')
        
        # 更新任务状态
        self.update_task_status(spider_name, 'urls_added', len(urls))
    
    def add_seed_urls(self, spider_name, seed_data):
        """
        添加种子URL和元数据
        """
        redis_key = f'{spider_name}:start_urls'
        
        for seed in seed_data:
            if isinstance(seed, dict):
                # 包含元数据的种子
                seed_json = json.dumps(seed)
                self.redis_conn.lpush(redis_key, seed_json)
            else:
                # 简单URL
                self.redis_conn.lpush(redis_key, seed)
        
        self.logger.info(f'Added {len(seed_data)} seed URLs for spider: {spider_name}')
    
    def generate_url_list(self, base_url, patterns):
        """
        生成URL列表
        """
        urls = []
        
        for pattern in patterns:
            if 'range' in pattern:
                # 数字范围模式
                start, end = pattern['range']
                url_template = pattern['template']
                
                for i in range(start, end + 1):
                    url = url_template.format(i)
                    full_url = urljoin(base_url, url)
                    urls.append(full_url)
            
            elif 'list' in pattern:
                # 列表模式
                url_template = pattern['template']
                
                for item in pattern['list']:
                    url = url_template.format(item)
                    full_url = urljoin(base_url, url)
                    urls.append(full_url)
        
        return urls
    
    def distribute_tasks(self, spider_name, task_config):
        """
        分发任务
        """
        # 生成URL列表
        if 'base_url' in task_config and 'patterns' in task_config:
            urls = self.generate_url_list(
                task_config['base_url'],
                task_config['patterns']
            )
        else:
            urls = task_config.get('urls', [])
        
        # 分批添加URL
        batch_size = task_config.get('batch_size', 100)
        
        for i in range(0, len(urls), batch_size):
            batch_urls = urls[i:i + batch_size]
            self.add_start_urls(spider_name, batch_urls)
            
            # 添加延迟避免Redis压力过大
            time.sleep(0.1)
        
        self.logger.info(f'Distributed {len(urls)} tasks for spider: {spider_name}')
        
        return len(urls)
    
    def monitor_workers(self):
        """
        监控工作节点状态
        """
        workers = self.redis_conn.hgetall(self.worker_status_key)
        
        active_workers = []
        inactive_workers = []
        
        current_time = time.time()
        
        for worker_id, last_seen in workers.items():
            last_seen_time = float(last_seen)
            
            if current_time - last_seen_time < 60:  # 1分钟内活跃
                active_workers.append(worker_id)
            else:
                inactive_workers.append(worker_id)
        
        return {
            'active_workers': active_workers,
            'inactive_workers': inactive_workers,
            'total_workers': len(workers)
        }
    
    def get_queue_status(self, spider_name):
        """
        获取队列状态
        """
        queue_key = f'{spider_name}:requests'
        start_urls_key = f'{spider_name}:start_urls'
        dupefilter_key = f'{spider_name}:dupefilter'
        
        # 请求队列可能是List(FIFO/LIFO队列),也可能是默认PriorityQueue使用的Sorted Set
        if self.redis_conn.type(queue_key) == 'zset':
            pending_requests = self.redis_conn.zcard(queue_key)
        else:
            pending_requests = self.redis_conn.llen(queue_key)
        
        status = {
            'pending_requests': pending_requests,
            'start_urls': self.redis_conn.llen(start_urls_key),
            'processed_urls': self.redis_conn.scard(dupefilter_key),
            'spider_name': spider_name
        }
        
        return status
    
    def update_task_status(self, spider_name, status_type, value):
        """
        更新任务状态
        """
        status_key = f'{self.task_status_key}:{spider_name}'
        
        self.redis_conn.hset(status_key, status_type, value)
        self.redis_conn.hset(status_key, 'last_update', time.time())
    
    def clear_spider_data(self, spider_name):
        """
        清理爬虫数据
        """
        keys_to_clear = [
            f'{spider_name}:requests',
            f'{spider_name}:start_urls',
            f'{spider_name}:dupefilter',
            f'{spider_name}:items',
            f'{spider_name}:stats',
            f'{self.task_status_key}:{spider_name}'
        ]
        
        for key in keys_to_clear:
            self.redis_conn.delete(key)
        
        self.logger.info(f'Cleared data for spider: {spider_name}')
    
    def get_spider_stats(self, spider_name):
        """
        获取爬虫统计信息
        """
        stats_key = f'{spider_name}:stats'
        stats = self.redis_conn.hgetall(stats_key)
        
        # 添加队列状态
        queue_status = self.get_queue_status(spider_name)
        stats.update(queue_status)
        
        return stats

# 使用示例
print("\n使用示例:")

# 创建任务管理器
task_manager = DistributedTaskManager()

# 示例任务配置
example_task_config = {
    'base_url': 'https://example.com',
    'patterns': [
        {
            # 数字范围模式:模板只含一个占位符,与generate_url_list中的format(i)对应
            'template': '/category/news/page/{}',
            'range': (1, 100),
        },
        {
            'template': '/tag/{}',
            'list': ['python', 'scrapy', 'redis', 'distributed']
        }
    ],
    'batch_size': 50
}

print("任务分发器开发完成!")

4.2 监控面板

# 8. 分布式爬虫监控面板
print("\n📊 分布式爬虫监控面板:")

import time
import json
from datetime import datetime, timedelta

class DistributedMonitor:
    """
    分布式爬虫监控器
    """
    
    def __init__(self, redis_conn):
        self.redis_conn = redis_conn
        self.monitor_key = 'distributed_monitor'
        
    def collect_system_stats(self):
        """
        收集系统统计信息
        """
        stats = {
            'timestamp': time.time(),
            'redis_info': self.get_redis_info(),
            'spider_stats': self.get_all_spider_stats(),
            'worker_stats': self.get_worker_stats(),
            'queue_stats': self.get_queue_stats(),
            'performance_stats': self.get_performance_stats()
        }
        
        return stats
    
    def get_redis_info(self):
        """
        获取Redis信息
        """
        info = self.redis_conn.info()
        
        return {
            'version': info.get('redis_version'),
            'memory_used': info.get('used_memory_human'),
            'memory_peak': info.get('used_memory_peak_human'),
            'connected_clients': info.get('connected_clients'),
            'total_commands_processed': info.get('total_commands_processed'),
            'keyspace_hits': info.get('keyspace_hits'),
            'keyspace_misses': info.get('keyspace_misses'),
            'uptime_in_seconds': info.get('uptime_in_seconds')
        }
    
    def get_all_spider_stats(self):
        """
        获取所有爬虫统计
        """
        spider_stats = {}
        
        # 查找所有爬虫的统计键
        stats_keys = self.redis_conn.keys('*:stats')
        
        for key in stats_keys:
            spider_name = key.replace(':stats', '')
            stats = self.redis_conn.hgetall(key)
            
            # 转换数值类型
            for stat_key, value in stats.items():
                try:
                    if '.' in value:
                        stats[stat_key] = float(value)
                    else:
                        stats[stat_key] = int(value)
                except (ValueError, TypeError):
                    pass
            
            spider_stats[spider_name] = stats
        
        return spider_stats
    
    def get_worker_stats(self):
        """
        获取工作节点统计
        """
        workers = self.redis_conn.hgetall('distributed_tasks:workers')
        current_time = time.time()
        
        active_count = 0
        inactive_count = 0
        
        for worker_id, last_seen in workers.items():
            try:
                last_seen_time = float(last_seen)
                if current_time - last_seen_time < 60:
                    active_count += 1
                else:
                    inactive_count += 1
            except (ValueError, TypeError):
                inactive_count += 1
        
        return {
            'total_workers': len(workers),
            'active_workers': active_count,
            'inactive_workers': inactive_count,
            'worker_list': list(workers.keys())
        }
    
    def get_queue_stats(self):
        """
        获取队列统计
        """
        queue_stats = {}
        
        # 查找所有请求队列
        request_keys = self.redis_conn.keys('*:requests')
        start_url_keys = self.redis_conn.keys('*:start_urls')
        dupefilter_keys = self.redis_conn.keys('*:dupefilter')
        
        for key in request_keys:
            spider_name = key.replace(':requests', '')
            # 请求队列可能是List,也可能是默认PriorityQueue使用的Sorted Set
            if self.redis_conn.type(key) == 'zset':
                pending = self.redis_conn.zcard(key)
            else:
                pending = self.redis_conn.llen(key)
            queue_stats[spider_name] = {
                'pending_requests': pending,
                'start_urls': 0,
                'processed_urls': 0
            }
        
        for key in start_url_keys:
            spider_name = key.replace(':start_urls', '')
            if spider_name in queue_stats:
                queue_stats[spider_name]['start_urls'] = self.redis_conn.llen(key)
        
        for key in dupefilter_keys:
            spider_name = key.replace(':dupefilter', '')
            if spider_name in queue_stats:
                queue_stats[spider_name]['processed_urls'] = self.redis_conn.scard(key)
        
        return queue_stats
    
    def get_performance_stats(self):
        """
        获取性能统计
        """
        # 计算总体性能指标
        spider_stats = self.get_all_spider_stats()
        
        total_requests = 0
        total_items = 0
        total_errors = 0
        
        for spider_name, stats in spider_stats.items():
            total_requests += stats.get('requests_count', 0)
            total_items += stats.get('items_count', 0)
            total_errors += stats.get('errors_count', 0)
        
        # 计算成功率
        success_rate = 0
        if total_requests > 0:
            success_rate = (total_requests - total_errors) / total_requests
        
        return {
            'total_requests': total_requests,
            'total_items': total_items,
            'total_errors': total_errors,
            'success_rate': success_rate,
            'error_rate': 1 - success_rate if total_requests > 0 else 0
        }
    
    def generate_report(self):
        """
        生成监控报告
        """
        stats = self.collect_system_stats()
        
        report = f"""
=== 分布式爬虫监控报告 ===
时间: {datetime.fromtimestamp(stats['timestamp']).strftime('%Y-%m-%d %H:%M:%S')}

Redis状态:
  版本: {stats['redis_info']['version']}
  内存使用: {stats['redis_info']['memory_used']}
  连接客户端: {stats['redis_info']['connected_clients']}
  运行时间: {stats['redis_info']['uptime_in_seconds']}秒

工作节点:
  总节点数: {stats['worker_stats']['total_workers']}
  活跃节点: {stats['worker_stats']['active_workers']}
  非活跃节点: {stats['worker_stats']['inactive_workers']}

性能统计:
  总请求数: {stats['performance_stats']['total_requests']}
  总数据项: {stats['performance_stats']['total_items']}
  总错误数: {stats['performance_stats']['total_errors']}
  成功率: {stats['performance_stats']['success_rate']:.2%}

爬虫状态:
"""
        
        for spider_name, spider_stat in stats['spider_stats'].items():
            report += f"  {spider_name}:\n"
            report += f"    请求数: {spider_stat.get('requests_count', 0)}\n"
            report += f"    数据项: {spider_stat.get('items_count', 0)}\n"
            report += f"    错误数: {spider_stat.get('errors_count', 0)}\n"
        
        queue_stats = stats['queue_stats']
        if queue_stats:
            report += "\n队列状态:\n"
            for spider_name, queue_stat in queue_stats.items():
                report += f"  {spider_name}:\n"
                report += f"    待处理请求: {queue_stat['pending_requests']}\n"
                report += f"    起始URL: {queue_stat['start_urls']}\n"
                report += f"    已处理URL: {queue_stat['processed_urls']}\n"
        
        return report
    
    def save_stats_history(self):
        """
        保存统计历史
        """
        stats = self.collect_system_stats()
        
        # 保存到Redis列表,保留最近24小时的数据
        history_key = f'{self.monitor_key}:history'
        
        self.redis_conn.lpush(history_key, json.dumps(stats))
        
        # 保留最近1440条记录(24小时,每分钟一条)
        self.redis_conn.ltrim(history_key, 0, 1439)
    
    def get_stats_history(self, hours=1):
        """
        获取统计历史
        """
        history_key = f'{self.monitor_key}:history'
        
        # 计算需要获取的记录数(每分钟一条)
        records_count = hours * 60
        
        history_data = self.redis_conn.lrange(history_key, 0, records_count - 1)
        
        parsed_history = []
        for data in history_data:
            try:
                parsed_history.append(json.loads(data))
            except json.JSONDecodeError:
                continue
        
        return parsed_history
    
    def alert_check(self):
        """
        检查告警条件
        """
        stats = self.collect_system_stats()
        alerts = []
        
        # 检查错误率
        error_rate = stats['performance_stats']['error_rate']
        if error_rate > 0.1:  # 错误率超过10%
            alerts.append({
                'type': 'high_error_rate',
                'message': f'错误率过高: {error_rate:.2%}',
                'severity': 'warning'
            })
        
        # 检查活跃工作节点
        active_workers = stats['worker_stats']['active_workers']
        if active_workers == 0:
            alerts.append({
                'type': 'no_active_workers',
                'message': '没有活跃的工作节点',
                'severity': 'critical'
            })
        
        # 检查Redis内存使用
        redis_info = stats['redis_info']
        # 这里需要解析内存使用量,简化处理
        
        return alerts

print("分布式爬虫监控面板开发完成!")

5. 本章小结

本章详细介绍了分布式爬虫和Scrapy-Redis的使用,主要内容包括:

5.1 核心概念

  1. 分布式爬虫优势

    • 性能提升和容错能力
    • 资源利用和扩展性
  2. Scrapy-Redis架构

    • Redis作为中央调度器
    • Master-Worker模式
    • 数据结构设计

5.2 实现技术

  1. 环境配置

    • Redis安装和配置
    • Scrapy-Redis组件配置
  2. 分布式Spider开发

    • RedisSpider和RedisCrawlSpider
    • 高级功能和错误处理
  3. 任务管理

    • 任务分发和监控
    • 队列状态管理

5.3 监控管理

  • 系统状态监控
  • 性能统计分析
  • 告警机制设计

6. 最佳实践

6.1 架构设计

# 分布式架构最佳实践
architecture_best_practices = {
    "Redis配置": "合理配置内存和持久化策略",
    "负载均衡": "合理分配爬虫节点和任务",
    "容错设计": "实现节点故障自动恢复",
    "监控告警": "建立完善的监控和告警机制",
    "数据一致性": "确保数据不重复不丢失"
}

6.2 性能优化

# 性能优化技巧
performance_optimization = {
    "Redis优化": "使用合适的数据结构和过期策略",
    "网络优化": "减少Redis网络延迟",
    "内存管理": "控制队列大小和内存使用",
    "并发控制": "合理设置并发数和延迟",
    "批量操作": "使用批量操作减少Redis调用"
}
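
以"批量操作"为例,redis-py的pipeline可以把多条命令合并为一次网络往返,批量推送URL时比逐条lpush开销小得多。下面是一个示意(假设Redis在本机,URL列表仅为演示数据):

import redis

def push_urls_in_batch(spider_name, urls, batch_size=500):
    """使用pipeline批量推送起始URL,减少网络往返次数(示意)"""
    conn = redis.Redis(decode_responses=True)
    key = f'{spider_name}:start_urls'
    
    for i in range(0, len(urls), batch_size):
        pipe = conn.pipeline()
        for url in urls[i:i + batch_size]:
            pipe.lpush(key, url)
        pipe.execute()  # 一次性提交本批次的所有命令

# demo_urls = [f'https://example.com/page/{i}' for i in range(1, 1001)]
# push_urls_in_batch('distributed_news', demo_urls)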

7. 常见陷阱

7.1 配置陷阱

  • Redis连接:网络配置和连接池设置
  • 队列管理:队列大小和内存限制
  • 数据持久化:Redis持久化策略配置

7.2 运维陷阱

  • 单点故障:Redis服务器故障
  • 数据丢失:不当的清理操作(安全清理的示意见下文)
  • 性能瓶颈:Redis成为性能瓶颈
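
针对"数据丢失:不当的清理操作"这一点,清理前可以先把相关键改名备份,确认无误后再删除,而不是直接DEL。下面是一个简化的安全清理示意(假设Redis在本机,备份键加backup:前缀;实际还可以配合RDB快照等手段):

import redis

def backup_then_clear(spider_name):
    """先把爬虫相关键重命名为backup:前缀,避免误删导致数据丢失(示意)"""
    conn = redis.Redis(decode_responses=True)
    
    for key in conn.scan_iter(match=f'{spider_name}:*'):
        conn.rename(key, f'backup:{key}')
        print(f'已备份: {key} -> backup:{key}')
    
    # 确认备份数据无误后,再统一删除backup:前缀的键
    # for key in conn.scan_iter(match=f'backup:{spider_name}:*'):
    #     conn.delete(key)

# backup_then_clear('distributed_news')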

8. 下一步学习

完成本章后,建议继续学习:

  1. 第7章:反爬虫对抗 - 学习反爬虫技术和对策
  2. 第8章:数据存储 - 掌握多种数据存储方案
  3. 第9章:监控与部署 - 学习生产环境部署
  4. 第10章:高级应用 - 探索高级爬虫技术

9. 练习题

9.1 基础练习

  1. 搭建一个简单的分布式爬虫系统
  2. 实现基于Redis的任务分发机制
  3. 开发一个爬虫状态监控工具

9.2 进阶练习

  1. 实现一个支持动态扩容的分布式爬虫系统
  2. 开发一个智能任务调度器
  3. 创建一个分布式爬虫管理平台

9.3 项目练习

  1. 大规模分布式爬虫:构建支持数百个节点的分布式系统
  2. 智能监控系统:开发具有预测能力的监控系统
  3. 云原生爬虫:基于Kubernetes的分布式爬虫系统

恭喜! 您已经掌握了分布式爬虫和Scrapy-Redis的核心技术。分布式爬虫是大规模数据采集的关键技术,掌握这些技能将使您能够构建高性能、高可用的爬虫系统。继续学习下一章,探索反爬虫对抗技术!