学习目标
通过本章学习,您将掌握:
- 理解分布式爬虫的概念和优势
- 掌握Scrapy-Redis的安装和配置
- 学会实现分布式爬虫架构
- 了解Redis在爬虫中的应用场景
- 掌握分布式爬虫的监控和管理
1. 分布式爬虫基础
1.1 什么是分布式爬虫
# 1. 分布式爬虫概念演示
print("🌐 分布式爬虫基础概念:")
# 分布式爬虫的优势
distributed_advantages = {
"性能提升": "多台机器并行处理,提高爬取速度",
"容错能力": "单台机器故障不影响整体爬取",
"资源利用": "充分利用多台机器的计算资源",
"扩展性": "可以根据需要动态增加爬虫节点",
"负载分担": "将爬取任务分散到多个节点"
}
print("\n分布式爬虫的优势:")
for advantage, description in distributed_advantages.items():
print(f" {advantage}: {description}")
# 分布式爬虫的挑战
distributed_challenges = {
"任务调度": "如何合理分配爬取任务",
"数据一致性": "确保数据不重复、不丢失",
"状态同步": "多个节点间的状态同步",
"故障处理": "节点故障的检测和恢复",
"监控管理": "分布式系统的监控和管理"
}
print("\n分布式爬虫的挑战:")
for challenge, description in distributed_challenges.items():
print(f" {challenge}: {description}")
1.2 Scrapy-Redis架构
# 2. Scrapy-Redis架构介绍
print("\n🏗️ Scrapy-Redis架构:")
class ScrapyRedisArchitecture:
"""
Scrapy-Redis架构演示
"""
def __init__(self):
self.components = {
"Redis服务器": "中央调度器,存储请求队列和去重集合",
"Master节点": "负责生成初始请求和监控",
"Worker节点": "执行爬取任务的工作节点",
"数据存储": "存储爬取结果的数据库"
}
self.workflow = [
"1. Master节点将起始URL推送到Redis队列",
"2. Worker节点从Redis队列获取请求",
"3. Worker节点执行爬取并生成新请求",
"4. 新请求经过去重后推送到Redis队列",
"5. 爬取结果存储到数据库",
"6. 重复步骤2-5直到队列为空"
]
def show_architecture(self):
"""
展示架构组件
"""
print("\n架构组件:")
for component, description in self.components.items():
print(f" {component}: {description}")
print("\n工作流程:")
for step in self.workflow:
print(f" {step}")
def show_redis_data_structures(self):
"""
展示Redis数据结构
"""
        redis_structures = {
            "请求队列": {
                "类型": "List或Sorted Set(PriorityQueue使用有序集合,Fifo/LifoQueue使用列表)",
                "键名": "spider_name:requests",
                "用途": "存储待处理的请求"
            },
"去重集合": {
"类型": "Set",
"键名": "spider_name:dupefilter",
"用途": "存储请求指纹,用于去重"
},
"统计信息": {
"类型": "Hash",
"键名": "spider_name:stats",
"用途": "存储爬虫运行统计"
},
"Items队列": {
"类型": "List",
"键名": "spider_name:items",
"用途": "存储爬取的数据项"
}
}
print("\nRedis数据结构:")
for name, info in redis_structures.items():
print(f" {name}:")
for key, value in info.items():
print(f" {key}: {value}")
# 创建架构演示实例
architecture = ScrapyRedisArchitecture()
architecture.show_architecture()
architecture.show_redis_data_structures()
print("Scrapy-Redis架构介绍完成!")
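去重环节的核心机制是"请求指纹 + Redis集合":SADD在指纹已存在时返回0,据此即可判断请求是否重复。下面用一个极简的演示说明这一思路(假设本机Redis已启动,redis-py的安装见下一节;真实的Scrapy-Redis使用Scrapy的请求指纹算法,这里仅用URL哈希模拟):
import hashlib
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def is_duplicate(url, key='demo_spider:dupefilter'):
    # 用URL的SHA1哈希模拟请求指纹
    fingerprint = hashlib.sha1(url.encode()).hexdigest()
    # sadd返回1表示新指纹,返回0表示指纹已存在(即重复请求)
    return r.sadd(key, fingerprint) == 0

print(is_duplicate('https://example.com/a'))  # 第一次出现,输出 False
print(is_duplicate('https://example.com/a'))  # 重复请求,输出 True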
2. Scrapy-Redis安装与配置
2.1 环境准备
# 3. 环境准备和安装
print("\n📦 环境准备和安装:")
# 安装命令
installation_commands = [
"# 安装Redis服务器",
"# Ubuntu/Debian:",
"sudo apt-get update",
"sudo apt-get install redis-server",
"",
"# CentOS/RHEL:",
"sudo yum install redis",
"",
"# macOS:",
"brew install redis",
"",
"# Windows:",
"# 下载Redis for Windows或使用Docker",
"",
"# 安装Python依赖",
"pip install scrapy-redis",
"pip install redis",
"",
"# 启动Redis服务",
"redis-server",
"",
"# 测试Redis连接",
"redis-cli ping"
]
print("安装步骤:")
for command in installation_commands:
print(command)
# Redis配置示例
redis_config = """
# redis.conf 配置示例(redis.conf不支持行尾注释,注释需单独成行)

# 允许外部连接(生产环境务必配合密码/防火墙使用)
bind 0.0.0.0
# 默认端口
port 6379
# 最大内存限制
maxmemory 2gb
# 内存淘汰策略
maxmemory-policy allkeys-lru
# RDB持久化配置
save 900 1
save 300 10
save 60 10000
"""
print(f"\nRedis配置示例:\n{redis_config}")
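安装并启动Redis后,除了redis-cli ping,也可以用下面的小脚本从Python侧验证连接(假设Redis运行在本机默认端口、未设置密码):
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
try:
    # ping() 等价于 redis-cli ping,连接正常时返回 True
    print("Redis连接正常:", r.ping())
    print("Redis版本:", r.info('server')['redis_version'])
except redis.ConnectionError as e:
    print("Redis连接失败:", e)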
2.2 Scrapy-Redis配置
# 4. Scrapy-Redis项目配置
print("\n⚙️ Scrapy-Redis项目配置:")
# settings.py 配置示例
scrapy_redis_settings = """
# settings.py
# 启用Scrapy-Redis组件
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
ITEM_PIPELINES = {
'scrapy_redis.pipelines.RedisPipeline': 300,
}
# Redis连接配置
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None  # 如果Redis设置了密码;也可通过下方REDIS_PARAMS的'password'键或REDIS_URL传入
# 或者使用Redis URL
# REDIS_URL = 'redis://user:password@hostname:port/db'
# 调度器配置
SCHEDULER_PERSIST = True # 持久化调度器状态
SCHEDULER_QUEUE_KEY = '%(spider)s:requests' # 请求队列键名
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue' # 队列类型
# 去重过滤器配置
DUPEFILTER_KEY = '%(spider)s:dupefilter' # 去重集合键名
DUPEFILTER_DEBUG = True # 调试模式
# 统计信息配置
STATS_CLASS = 'scrapy_redis.stats.RedisStatsCollector'
# 其他配置
REDIS_START_URLS_KEY = '%(name)s:start_urls' # 起始URL键名
REDIS_START_URLS_AS_SET = False # 起始URL是否使用集合
# 连接池配置
REDIS_PARAMS = {
'socket_connect_timeout': 30,
'socket_timeout': 30,
'retry_on_timeout': True,
'health_check_interval': 30,
}
# 日志配置
LOG_LEVEL = 'INFO'
"""
print("Scrapy-Redis配置:")
print(scrapy_redis_settings)
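完成上述配置后,分布式运行的启动方式与单机略有不同:各Worker节点照常执行 scrapy crawl <spider名>,爬虫会空闲等待Redis中的起始URL;再由任意一台机器向 REDIS_START_URLS_KEY 对应的键推入起始URL,即可触发整个集群开始爬取。下面是一个最小示例(假设爬虫名为distributed_news、Redis在本机,URL仅作演示):
import redis

# 假设各Worker节点已执行 `scrapy crawl distributed_news` 并处于等待状态
r = redis.Redis(host='localhost', port=6379, db=0)

# REDIS_START_URLS_AS_SET = False 时,起始URL存放在列表中,用 lpush 推入
r.lpush('distributed_news:start_urls', 'https://example.com/news/page/1')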
3. 分布式Spider开发
3.1 基础分布式Spider
# 5. 基础分布式Spider开发
print("\n🕷️ 基础分布式Spider开发:")
import scrapy
from scrapy_redis.spiders import RedisSpider
import json
import time
class DistributedNewsSpider(RedisSpider):
"""
分布式新闻爬虫示例
"""
name = 'distributed_news'
redis_key = 'distributed_news:start_urls'
# 自定义设置
custom_settings = {
'DOWNLOAD_DELAY': 1,
'RANDOMIZE_DOWNLOAD_DELAY': True,
'CONCURRENT_REQUESTS': 16,
'CONCURRENT_REQUESTS_PER_DOMAIN': 8,
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.start_time = time.time()
self.processed_count = 0
def parse(self, response):
"""
解析新闻列表页
"""
self.logger.info(f'Processing: {response.url}')
# 提取新闻链接
news_links = response.css('a.news-link::attr(href)').getall()
for link in news_links:
# 生成新闻详情页请求
yield response.follow(
link,
callback=self.parse_news,
meta={'source_url': response.url}
)
# 提取分页链接
next_page = response.css('a.next-page::attr(href)').get()
if next_page:
yield response.follow(
next_page,
callback=self.parse,
meta={'page_type': 'list'}
)
def parse_news(self, response):
"""
解析新闻详情页
"""
self.processed_count += 1
# 提取新闻数据
item = {
'url': response.url,
'title': response.css('h1.title::text').get(),
'content': ' '.join(response.css('div.content p::text').getall()),
'author': response.css('span.author::text').get(),
'publish_time': response.css('time::attr(datetime)').get(),
'category': response.css('span.category::text').get(),
'tags': response.css('span.tag::text').getall(),
'source_url': response.meta.get('source_url'),
'crawl_time': time.time(),
'spider_name': self.name,
}
# 数据清洗
item = self.clean_item(item)
if item['title'] and item['content']:
yield item
# 记录进度
if self.processed_count % 100 == 0:
elapsed_time = time.time() - self.start_time
rate = self.processed_count / elapsed_time
self.logger.info(f'Processed {self.processed_count} items, rate: {rate:.2f} items/sec')
def clean_item(self, item):
"""
清洗数据项
"""
# 清理标题
if item['title']:
item['title'] = item['title'].strip()
# 清理内容
if item['content']:
item['content'] = item['content'].strip()
# 移除多余空格
item['content'] = ' '.join(item['content'].split())
# 清理作者
if item['author']:
item['author'] = item['author'].strip()
# 清理标签
if item['tags']:
item['tags'] = [tag.strip() for tag in item['tags'] if tag.strip()]
return item
print("基础分布式Spider开发完成!")
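需要注意,默认配置下RedisPipeline只负责把Item序列化为JSON后推入Redis的 <spider名>:items 列表,真正入库通常由独立的消费进程完成(对应架构中的"数据存储"环节)。下面是一个消费进程的最小草图(假设Item存放在distributed_news:items中,这里以写入本地JSON Lines文件代替真实数据库):
import json
import redis

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
items_key = 'distributed_news:items'

with open('news_items.jsonl', 'a', encoding='utf-8') as f:
    while True:
        # blpop阻塞等待新Item,超时返回None以便退出循环
        result = r.blpop(items_key, timeout=30)
        if result is None:
            break
        _, raw_item = result
        item = json.loads(raw_item)
        f.write(json.dumps(item, ensure_ascii=False) + '\n')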
3.2 高级分布式Spider
# 6. 高级分布式Spider开发
print("\n🚀 高级分布式Spider开发:")
from scrapy_redis.spiders import RedisCrawlSpider
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import Rule
import hashlib
import redis
class AdvancedDistributedSpider(RedisCrawlSpider):
"""
高级分布式爬虫示例
"""
name = 'advanced_distributed'
redis_key = 'advanced_distributed:start_urls'
# 链接提取规则
rules = (
# 提取列表页链接
Rule(
LinkExtractor(
allow=r'/category/\w+/page/\d+',
deny=r'/admin|/login|/register'
),
callback='parse_list',
follow=True
),
# 提取详情页链接
Rule(
LinkExtractor(
allow=r'/article/\d+',
deny=r'/comment|/share'
),
callback='parse_item',
follow=False
),
)
    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        # Spider的settings在__init__阶段尚不可用,因此在from_crawler中初始化Redis连接
        spider = super().from_crawler(crawler, *args, **kwargs)
        spider.redis_conn = redis.Redis(
            host=crawler.settings.get('REDIS_HOST', 'localhost'),
            port=crawler.settings.getint('REDIS_PORT', 6379),
            db=crawler.settings.getint('REDIS_DB', 0),
            decode_responses=True
        )
        # 统计信息与错误记录使用的Redis键
        spider.stats_key = f'{spider.name}:stats'
        spider.error_key = f'{spider.name}:errors'
        # 初始化统计
        spider.init_stats()
        return spider
def init_stats(self):
"""
初始化统计信息
"""
stats = {
'start_time': time.time(),
'requests_count': 0,
'items_count': 0,
'errors_count': 0,
'spider_name': self.name
}
        # hmset在redis-py 4.x中已移除,使用hset的mapping参数
        self.redis_conn.hset(self.stats_key, mapping=stats)
def parse_list(self, response):
"""
解析列表页
"""
self.update_stats('requests_count')
# 提取文章链接
article_links = response.css('a.article-link::attr(href)').getall()
for link in article_links:
yield response.follow(
link,
callback=self.parse_item,
meta={
'category': self.extract_category(response.url),
'list_url': response.url
}
)
def parse_item(self, response):
"""
解析详情页
"""
self.update_stats('requests_count')
try:
# 提取数据
item = self.extract_item_data(response)
# 验证数据
if self.validate_item(item):
self.update_stats('items_count')
yield item
else:
self.log_error('validation_failed', response.url, 'Item validation failed')
except Exception as e:
self.log_error('parse_error', response.url, str(e))
def extract_item_data(self, response):
"""
提取项目数据
"""
item = {
'url': response.url,
'title': response.css('h1::text').get(),
'content': self.extract_content(response),
'author': response.css('.author::text').get(),
'publish_time': response.css('time::attr(datetime)').get(),
'category': response.meta.get('category'),
'tags': response.css('.tag::text').getall(),
'views': self.extract_views(response),
'comments_count': self.extract_comments_count(response),
'images': response.css('img::attr(src)').getall(),
'external_links': self.extract_external_links(response),
'word_count': 0, # 将在后续计算
'fingerprint': '', # 将在后续生成
'crawl_time': time.time(),
'spider_name': self.name,
'list_url': response.meta.get('list_url'),
}
# 计算字数
if item['content']:
item['word_count'] = len(item['content'].split())
# 生成指纹
item['fingerprint'] = self.generate_fingerprint(item)
return item
def extract_content(self, response):
"""
提取文章内容
"""
# 尝试多种选择器
content_selectors = [
'div.content p::text',
'article p::text',
'.article-body p::text',
'.post-content p::text'
]
for selector in content_selectors:
content_parts = response.css(selector).getall()
if content_parts:
return ' '.join(content_parts).strip()
return ''
def extract_views(self, response):
"""
提取浏览量
"""
views_text = response.css('.views::text').get()
if views_text:
import re
match = re.search(r'(\d+)', views_text)
if match:
return int(match.group(1))
return 0
def extract_comments_count(self, response):
"""
提取评论数
"""
comments_text = response.css('.comments-count::text').get()
if comments_text:
import re
match = re.search(r'(\d+)', comments_text)
if match:
return int(match.group(1))
return 0
def extract_external_links(self, response):
"""
提取外部链接
"""
        # 只有与当前页面域名不同的绝对链接才算外部链接
        from urllib.parse import urlparse
        current_domain = urlparse(response.url).netloc
        external_links = []
        for link in response.css('a::attr(href)').getall():
            absolute_url = response.urljoin(link)
            if urlparse(absolute_url).netloc not in ('', current_domain):
                external_links.append(absolute_url)
        return external_links
def extract_category(self, url):
"""
从URL提取分类
"""
import re
match = re.search(r'/category/(\w+)', url)
return match.group(1) if match else 'unknown'
def generate_fingerprint(self, item):
"""
生成项目指纹
"""
content = f"{item['title']}{item['content']}{item['url']}"
return hashlib.md5(content.encode()).hexdigest()
def validate_item(self, item):
"""
验证项目数据
"""
# 必填字段检查
required_fields = ['title', 'content', 'url']
for field in required_fields:
if not item.get(field):
return False
# 内容长度检查
if len(item['content']) < 100:
return False
# 标题长度检查
if len(item['title']) < 5:
return False
return True
def update_stats(self, stat_name, increment=1):
"""
更新统计信息
"""
self.redis_conn.hincrby(self.stats_key, stat_name, increment)
def log_error(self, error_type, url, message):
"""
记录错误信息
"""
self.update_stats('errors_count')
error_info = {
'type': error_type,
'url': url,
'message': message,
'timestamp': time.time(),
'spider_name': self.name
}
self.redis_conn.lpush(self.error_key, json.dumps(error_info))
self.logger.error(f'{error_type}: {url} - {message}')
def get_stats(self):
"""
获取统计信息
"""
return self.redis_conn.hgetall(self.stats_key)
print("高级分布式Spider开发完成!")
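log_error会把错误信息以JSON形式推入 advanced_distributed:errors 列表,排查问题时可以在任意一台机器上读取。一个简单的查看示例如下(假设Redis在本机):
import json
import redis

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# 查看最近10条错误记录(lpush写入,索引0为最新)
for raw in r.lrange('advanced_distributed:errors', 0, 9):
    error = json.loads(raw)
    print(f"[{error['type']}] {error['url']} - {error['message']}")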
4. 分布式任务管理
4.1 任务分发器
# 7. 分布式任务分发器
print("\n📋 分布式任务分发器:")
import redis
import json
import time
from urllib.parse import urljoin, urlparse
class DistributedTaskManager:
"""
分布式任务管理器
"""
def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
self.redis_conn = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True
)
self.task_queue_key = 'distributed_tasks:queue'
self.worker_status_key = 'distributed_tasks:workers'
self.task_status_key = 'distributed_tasks:status'
self.logger = self._setup_logger()
def _setup_logger(self):
"""
设置日志记录器
"""
import logging
logger = logging.getLogger('TaskManager')
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def add_start_urls(self, spider_name, urls):
"""
添加起始URL到Redis队列
"""
redis_key = f'{spider_name}:start_urls'
for url in urls:
self.redis_conn.lpush(redis_key, url)
self.logger.info(f'Added {len(urls)} start URLs for spider: {spider_name}')
# 更新任务状态
self.update_task_status(spider_name, 'urls_added', len(urls))
def add_seed_urls(self, spider_name, seed_data):
"""
添加种子URL和元数据
"""
redis_key = f'{spider_name}:start_urls'
for seed in seed_data:
if isinstance(seed, dict):
# 包含元数据的种子
seed_json = json.dumps(seed)
self.redis_conn.lpush(redis_key, seed_json)
else:
# 简单URL
self.redis_conn.lpush(redis_key, seed)
self.logger.info(f'Added {len(seed_data)} seed URLs for spider: {spider_name}')
def generate_url_list(self, base_url, patterns):
"""
生成URL列表
"""
urls = []
for pattern in patterns:
if 'range' in pattern:
# 数字范围模式
start, end = pattern['range']
url_template = pattern['template']
for i in range(start, end + 1):
url = url_template.format(i)
full_url = urljoin(base_url, url)
urls.append(full_url)
elif 'list' in pattern:
# 列表模式
url_template = pattern['template']
for item in pattern['list']:
url = url_template.format(item)
full_url = urljoin(base_url, url)
urls.append(full_url)
return urls
def distribute_tasks(self, spider_name, task_config):
"""
分发任务
"""
# 生成URL列表
if 'base_url' in task_config and 'patterns' in task_config:
urls = self.generate_url_list(
task_config['base_url'],
task_config['patterns']
)
else:
urls = task_config.get('urls', [])
# 分批添加URL
batch_size = task_config.get('batch_size', 100)
for i in range(0, len(urls), batch_size):
batch_urls = urls[i:i + batch_size]
self.add_start_urls(spider_name, batch_urls)
# 添加延迟避免Redis压力过大
time.sleep(0.1)
self.logger.info(f'Distributed {len(urls)} tasks for spider: {spider_name}')
return len(urls)
def monitor_workers(self):
"""
监控工作节点状态
"""
workers = self.redis_conn.hgetall(self.worker_status_key)
active_workers = []
inactive_workers = []
current_time = time.time()
for worker_id, last_seen in workers.items():
last_seen_time = float(last_seen)
if current_time - last_seen_time < 60: # 1分钟内活跃
active_workers.append(worker_id)
else:
inactive_workers.append(worker_id)
return {
'active_workers': active_workers,
'inactive_workers': inactive_workers,
'total_workers': len(workers)
}
def get_queue_status(self, spider_name):
"""
获取队列状态
"""
queue_key = f'{spider_name}:requests'
start_urls_key = f'{spider_name}:start_urls'
dupefilter_key = f'{spider_name}:dupefilter'
        # PriorityQueue(本章配置)把请求存为有序集合,Fifo/LifoQueue存为列表
        queue_type = self.redis_conn.type(queue_key)
        if queue_type == 'zset':
            pending_requests = self.redis_conn.zcard(queue_key)
        else:
            pending_requests = self.redis_conn.llen(queue_key)
        status = {
            'pending_requests': pending_requests,
            'start_urls': self.redis_conn.llen(start_urls_key),
            'processed_urls': self.redis_conn.scard(dupefilter_key),
            'spider_name': spider_name
        }
return status
def update_task_status(self, spider_name, status_type, value):
"""
更新任务状态
"""
status_key = f'{self.task_status_key}:{spider_name}'
self.redis_conn.hset(status_key, status_type, value)
self.redis_conn.hset(status_key, 'last_update', time.time())
def clear_spider_data(self, spider_name):
"""
清理爬虫数据
"""
keys_to_clear = [
f'{spider_name}:requests',
f'{spider_name}:start_urls',
f'{spider_name}:dupefilter',
f'{spider_name}:items',
f'{spider_name}:stats',
f'{self.task_status_key}:{spider_name}'
]
for key in keys_to_clear:
self.redis_conn.delete(key)
self.logger.info(f'Cleared data for spider: {spider_name}')
def get_spider_stats(self, spider_name):
"""
获取爬虫统计信息
"""
stats_key = f'{spider_name}:stats'
stats = self.redis_conn.hgetall(stats_key)
# 添加队列状态
queue_status = self.get_queue_status(spider_name)
stats.update(queue_status)
return stats
# 使用示例
print("\n使用示例:")
# 创建任务管理器
task_manager = DistributedTaskManager()
# 示例任务配置
example_task_config = {
'base_url': 'https://example.com',
'patterns': [
{
            'template': '/category/news/page/{}',  # range模式的模板只使用一个占位符
            'range': (1, 100),
},
{
'template': '/tag/{}',
'list': ['python', 'scrapy', 'redis', 'distributed']
}
],
'batch_size': 50
}
print("任务分发器开发完成!")
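把上面的示例配置真正分发出去并查看运行状态,大致如下(假设目标爬虫为advanced_distributed;monitor_workers读取的distributed_tasks:workers哈希需要各Worker自行定期写入心跳,这里仅示意写入一条):
# 分发任务(按batch_size分批推入Redis)
total = task_manager.distribute_tasks('advanced_distributed', example_task_config)
print(f"已分发 {total} 个起始URL")

# Worker节点侧的心跳示例(worker_id由部署时自行约定)
task_manager.redis_conn.hset('distributed_tasks:workers', 'worker-001', time.time())

# 查看队列与工作节点状态
print(task_manager.get_queue_status('advanced_distributed'))
print(task_manager.monitor_workers())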
4.2 监控面板
# 8. 分布式爬虫监控面板
print("\n📊 分布式爬虫监控面板:")
import time
import json
from datetime import datetime, timedelta
class DistributedMonitor:
"""
分布式爬虫监控器
"""
def __init__(self, redis_conn):
self.redis_conn = redis_conn
self.monitor_key = 'distributed_monitor'
def collect_system_stats(self):
"""
收集系统统计信息
"""
stats = {
'timestamp': time.time(),
'redis_info': self.get_redis_info(),
'spider_stats': self.get_all_spider_stats(),
'worker_stats': self.get_worker_stats(),
'queue_stats': self.get_queue_stats(),
'performance_stats': self.get_performance_stats()
}
return stats
def get_redis_info(self):
"""
获取Redis信息
"""
info = self.redis_conn.info()
return {
'version': info.get('redis_version'),
'memory_used': info.get('used_memory_human'),
'memory_peak': info.get('used_memory_peak_human'),
'connected_clients': info.get('connected_clients'),
'total_commands_processed': info.get('total_commands_processed'),
'keyspace_hits': info.get('keyspace_hits'),
'keyspace_misses': info.get('keyspace_misses'),
'uptime_in_seconds': info.get('uptime_in_seconds')
}
def get_all_spider_stats(self):
"""
获取所有爬虫统计
"""
spider_stats = {}
# 查找所有爬虫的统计键
stats_keys = self.redis_conn.keys('*:stats')
for key in stats_keys:
spider_name = key.replace(':stats', '')
stats = self.redis_conn.hgetall(key)
# 转换数值类型
for stat_key, value in stats.items():
try:
if '.' in value:
stats[stat_key] = float(value)
else:
stats[stat_key] = int(value)
except (ValueError, TypeError):
pass
spider_stats[spider_name] = stats
return spider_stats
def get_worker_stats(self):
"""
获取工作节点统计
"""
workers = self.redis_conn.hgetall('distributed_tasks:workers')
current_time = time.time()
active_count = 0
inactive_count = 0
for worker_id, last_seen in workers.items():
try:
last_seen_time = float(last_seen)
if current_time - last_seen_time < 60:
active_count += 1
else:
inactive_count += 1
except (ValueError, TypeError):
inactive_count += 1
return {
'total_workers': len(workers),
'active_workers': active_count,
'inactive_workers': inactive_count,
'worker_list': list(workers.keys())
}
def get_queue_stats(self):
"""
获取队列统计
"""
queue_stats = {}
# 查找所有请求队列
request_keys = self.redis_conn.keys('*:requests')
start_url_keys = self.redis_conn.keys('*:start_urls')
dupefilter_keys = self.redis_conn.keys('*:dupefilter')
        for key in request_keys:
            spider_name = key.replace(':requests', '')
            # 请求队列可能是列表(Fifo/LifoQueue)或有序集合(PriorityQueue)
            key_type = self.redis_conn.type(key)
            if key_type in ('zset', b'zset'):
                pending = self.redis_conn.zcard(key)
            else:
                pending = self.redis_conn.llen(key)
            queue_stats[spider_name] = {
                'pending_requests': pending,
                'start_urls': 0,
                'processed_urls': 0
            }
for key in start_url_keys:
spider_name = key.replace(':start_urls', '')
if spider_name in queue_stats:
queue_stats[spider_name]['start_urls'] = self.redis_conn.llen(key)
for key in dupefilter_keys:
spider_name = key.replace(':dupefilter', '')
if spider_name in queue_stats:
queue_stats[spider_name]['processed_urls'] = self.redis_conn.scard(key)
return queue_stats
def get_performance_stats(self):
"""
获取性能统计
"""
# 计算总体性能指标
spider_stats = self.get_all_spider_stats()
total_requests = 0
total_items = 0
total_errors = 0
for spider_name, stats in spider_stats.items():
total_requests += stats.get('requests_count', 0)
total_items += stats.get('items_count', 0)
total_errors += stats.get('errors_count', 0)
# 计算成功率
success_rate = 0
if total_requests > 0:
success_rate = (total_requests - total_errors) / total_requests
return {
'total_requests': total_requests,
'total_items': total_items,
'total_errors': total_errors,
'success_rate': success_rate,
'error_rate': 1 - success_rate if total_requests > 0 else 0
}
def generate_report(self):
"""
生成监控报告
"""
stats = self.collect_system_stats()
report = f"""
=== 分布式爬虫监控报告 ===
时间: {datetime.fromtimestamp(stats['timestamp']).strftime('%Y-%m-%d %H:%M:%S')}
Redis状态:
版本: {stats['redis_info']['version']}
内存使用: {stats['redis_info']['memory_used']}
连接客户端: {stats['redis_info']['connected_clients']}
运行时间: {stats['redis_info']['uptime_in_seconds']}秒
工作节点:
总节点数: {stats['worker_stats']['total_workers']}
活跃节点: {stats['worker_stats']['active_workers']}
非活跃节点: {stats['worker_stats']['inactive_workers']}
性能统计:
总请求数: {stats['performance_stats']['total_requests']}
总数据项: {stats['performance_stats']['total_items']}
总错误数: {stats['performance_stats']['total_errors']}
成功率: {stats['performance_stats']['success_rate']:.2%}
爬虫状态:
"""
for spider_name, spider_stat in stats['spider_stats'].items():
report += f" {spider_name}:\n"
report += f" 请求数: {spider_stat.get('requests_count', 0)}\n"
report += f" 数据项: {spider_stat.get('items_count', 0)}\n"
report += f" 错误数: {spider_stat.get('errors_count', 0)}\n"
queue_stats = stats['queue_stats']
if queue_stats:
report += "\n队列状态:\n"
for spider_name, queue_stat in queue_stats.items():
report += f" {spider_name}:\n"
report += f" 待处理请求: {queue_stat['pending_requests']}\n"
report += f" 起始URL: {queue_stat['start_urls']}\n"
report += f" 已处理URL: {queue_stat['processed_urls']}\n"
return report
def save_stats_history(self):
"""
保存统计历史
"""
stats = self.collect_system_stats()
# 保存到Redis列表,保留最近24小时的数据
history_key = f'{self.monitor_key}:history'
self.redis_conn.lpush(history_key, json.dumps(stats))
# 保留最近1440条记录(24小时,每分钟一条)
self.redis_conn.ltrim(history_key, 0, 1439)
def get_stats_history(self, hours=1):
"""
获取统计历史
"""
history_key = f'{self.monitor_key}:history'
# 计算需要获取的记录数(每分钟一条)
records_count = hours * 60
history_data = self.redis_conn.lrange(history_key, 0, records_count - 1)
parsed_history = []
for data in history_data:
try:
parsed_history.append(json.loads(data))
except json.JSONDecodeError:
continue
return parsed_history
def alert_check(self):
"""
检查告警条件
"""
stats = self.collect_system_stats()
alerts = []
# 检查错误率
error_rate = stats['performance_stats']['error_rate']
if error_rate > 0.1: # 错误率超过10%
alerts.append({
'type': 'high_error_rate',
'message': f'错误率过高: {error_rate:.2%}',
'severity': 'warning'
})
# 检查活跃工作节点
active_workers = stats['worker_stats']['active_workers']
if active_workers == 0:
alerts.append({
'type': 'no_active_workers',
'message': '没有活跃的工作节点',
'severity': 'critical'
})
# 检查Redis内存使用
redis_info = stats['redis_info']
# 这里需要解析内存使用量,简化处理
return alerts
print("分布式爬虫监控面板开发完成!")
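监控器通常作为独立进程周期性运行:每分钟采集一次、保存历史并检查告警,与save_stats_history"每分钟一条"的假设保持一致。一个最小的运行循环示例如下(假设Redis在本机,告警此处仅打印,演示只跑3轮):
import time
import redis

redis_conn = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
monitor = DistributedMonitor(redis_conn)

for _ in range(3):  # 实际部署可改为 while True
    print(monitor.generate_report())
    monitor.save_stats_history()
    for alert in monitor.alert_check():
        print(f"[{alert['severity']}] {alert['message']}")
    time.sleep(60)  # 每分钟采集一次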
5. 本章小结
本章详细介绍了分布式爬虫和Scrapy-Redis的使用,主要内容包括:
5.1 核心概念
分布式爬虫优势
- 性能提升和容错能力
- 资源利用和扩展性
Scrapy-Redis架构
- Redis作为中央调度器
- Master-Worker模式
- 数据结构设计
5.2 实现技术
环境配置
- Redis安装和配置
- Scrapy-Redis组件配置
分布式Spider开发
- RedisSpider和RedisCrawlSpider
- 高级功能和错误处理
任务管理
- 任务分发和监控
- 队列状态管理
5.3 监控管理
- 系统状态监控
- 性能统计分析
- 告警机制设计
6. 最佳实践
6.1 架构设计
# 分布式架构最佳实践
architecture_best_practices = {
"Redis配置": "合理配置内存和持久化策略",
"负载均衡": "合理分配爬虫节点和任务",
"容错设计": "实现节点故障自动恢复",
"监控告警": "建立完善的监控和告警机制",
"数据一致性": "确保数据不重复不丢失"
}
6.2 性能优化
# 性能优化技巧
performance_optimization = {
    "Redis优化": "使用合适的数据结构和过期策略",
    "网络优化": "减少Redis网络延迟",
    "内存管理": "控制队列大小和内存使用",
    "并发控制": "合理设置并发数和延迟",
    "批量操作": "使用批量操作减少Redis调用(见下方pipeline示例)"
}
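例如,用redis-py的pipeline把多条LPUSH打包成一次网络往返,即可显著减少Redis调用次数。下面是一个批量推入起始URL的小示例(假设键名与前文任务管理器一致,URL仅作演示):
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
urls = [f'https://example.com/news/page/{i}' for i in range(1, 101)]

# pipeline把多条命令一次性提交,减少网络往返
pipe = r.pipeline()
for url in urls:
    pipe.lpush('distributed_news:start_urls', url)
pipe.execute()
print(f"批量推入 {len(urls)} 个起始URL")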
7. 常见陷阱
7.1 配置陷阱
- Redis连接:网络配置和连接池设置
- 队列管理:队列大小和内存限制
- 数据持久化:Redis持久化策略配置
7.2 运维陷阱
- 单点故障:Redis服务器故障
- 数据丢失:不当的清理操作
- 性能瓶颈:Redis成为性能瓶颈
8. 下一步学习
完成本章后,建议继续学习:
- 第7章:反爬虫对抗 - 学习反爬虫技术和对策
- 第8章:数据存储 - 掌握多种数据存储方案
- 第9章:监控与部署 - 学习生产环境部署
- 第10章:高级应用 - 探索高级爬虫技术
9. 练习题
9.1 基础练习
- 搭建一个简单的分布式爬虫系统
- 实现基于Redis的任务分发机制
- 开发一个爬虫状态监控工具
9.2 进阶练习
- 实现一个支持动态扩容的分布式爬虫系统
- 开发一个智能任务调度器
- 创建一个分布式爬虫管理平台
9.3 项目练习
- 大规模分布式爬虫:构建支持数百个节点的分布式系统
- 智能监控系统:开发具有预测能力的监控系统
- 云原生爬虫:基于Kubernetes的分布式爬虫系统
恭喜! 您已经掌握了分布式爬虫和Scrapy-Redis的核心技术。分布式爬虫是大规模数据采集的关键技术,掌握这些技能将使您能够构建高性能、高可用的爬虫系统。继续学习下一章,探索反爬虫对抗技术!