学习目标
通过本章学习,您将能够:
- 理解常见的反爬虫技术和原理
- 掌握反爬虫对抗的策略和方法
- 学会使用代理、用户代理轮换等技术
- 了解验证码识别和处理技术
- 掌握JavaScript渲染和动态内容处理
- 学会模拟人类行为和智能爬虫技术
1. 反爬虫技术概述
1.1 常见反爬虫技术
# 1. 反爬虫技术分类和原理
print("🛡️ 反爬虫技术概述:")
class AntiSpiderTechniques:
"""
反爬虫技术分类和分析
"""
def __init__(self):
self.techniques = {
"基于请求特征": {
"User-Agent检测": "检测异常的用户代理字符串",
"请求频率限制": "限制单位时间内的请求数量",
"IP地址封禁": "封禁异常IP地址",
"请求头检测": "检测缺失或异常的HTTP头",
"会话管理": "要求维持有效的会话状态"
},
"基于行为特征": {
"鼠标轨迹检测": "检测鼠标移动和点击行为",
"键盘输入检测": "检测键盘输入模式",
"页面停留时间": "检测页面浏览时间",
"滚动行为": "检测页面滚动模式",
"交互行为": "检测用户交互行为"
},
"基于内容保护": {
"验证码": "图片、滑块、点击验证码",
"JavaScript挑战": "需要执行JavaScript才能获取内容",
"动态加载": "内容通过AJAX动态加载",
"字体反爬": "使用自定义字体混淆文本",
"CSS反爬": "使用CSS隐藏或混淆内容"
},
"基于环境检测": {
"浏览器指纹": "检测浏览器环境特征",
"WebGL指纹": "检测图形渲染特征",
"Canvas指纹": "检测Canvas渲染特征",
"设备指纹": "检测设备硬件特征",
"网络指纹": "检测网络环境特征"
}
}
def analyze_technique(self, category, technique):
"""
分析特定反爬虫技术
"""
if category in self.techniques and technique in self.techniques[category]:
description = self.techniques[category][technique]
analysis = {
"技术名称": technique,
"分类": category,
"描述": description,
"检测难度": self._get_detection_difficulty(technique),
"绕过难度": self._get_bypass_difficulty(technique),
"常见场景": self._get_common_scenarios(technique)
}
return analysis
return None
def _get_detection_difficulty(self, technique):
"""
获取检测难度
"""
difficulty_map = {
"User-Agent检测": "低",
"请求频率限制": "低",
"IP地址封禁": "低",
"验证码": "中",
"JavaScript挑战": "中",
"浏览器指纹": "高",
"Canvas指纹": "高"
}
return difficulty_map.get(technique, "中")
def _get_bypass_difficulty(self, technique):
"""
获取绕过难度
"""
difficulty_map = {
"User-Agent检测": "低",
"请求频率限制": "中",
"IP地址封禁": "中",
"验证码": "高",
"JavaScript挑战": "中",
"浏览器指纹": "高",
"Canvas指纹": "高"
}
return difficulty_map.get(technique, "中")
def _get_common_scenarios(self, technique):
"""
获取常见应用场景
"""
scenarios_map = {
"User-Agent检测": ["电商网站", "新闻网站", "论坛"],
"请求频率限制": ["API接口", "搜索引擎", "社交媒体"],
"验证码": ["登录页面", "注册页面", "敏感操作"],
"JavaScript挑战": ["CDN保护", "DDoS防护", "高价值内容"],
"浏览器指纹": ["金融网站", "广告平台", "安全要求高的网站"]
}
return scenarios_map.get(technique, ["通用场景"])
def show_all_techniques(self):
"""
展示所有反爬虫技术
"""
print("反爬虫技术分类:")
for category, techniques in self.techniques.items():
print(f"\n{category}:")
for technique, description in techniques.items():
print(f" • {technique}: {description}")
# 创建反爬虫技术分析器
anti_spider = AntiSpiderTechniques()
anti_spider.show_all_techniques()
# 分析特定技术
print("\n技术分析示例:")
analysis = anti_spider.analyze_technique("基于内容保护", "验证码")
if analysis:
for key, value in analysis.items():
print(f"{key}: {value}")
print("反爬虫技术概述完成!")
1.2 反爬虫检测原理
# 2. 反爬虫检测原理和实现
print("\n🔍 反爬虫检测原理:")
import time
import random
import hashlib
from collections import defaultdict, deque
from datetime import datetime, timedelta
class AntiSpiderDetector:
"""
反爬虫检测器示例(用于理解检测原理)
"""
def __init__(self):
# 请求统计
self.request_stats = defaultdict(lambda: {
'count': 0,
'timestamps': deque(maxlen=100),
'user_agents': set(),
'patterns': []
})
# 行为模式
self.behavior_patterns = defaultdict(list)
# 黑名单
self.blacklist = set()
# 检测规则
self.detection_rules = {
'max_requests_per_minute': 60,
'max_requests_per_hour': 1000,
'min_request_interval': 0.5,
'suspicious_user_agents': [
'python-requests',
'scrapy',
'curl',
'wget'
],
'required_headers': [
'User-Agent',
'Accept',
'Accept-Language'
]
}
def analyze_request(self, ip, user_agent, headers, timestamp=None):
"""
分析请求特征
"""
if timestamp is None:
timestamp = time.time()
# 记录请求
stats = self.request_stats[ip]
stats['count'] += 1
stats['timestamps'].append(timestamp)
stats['user_agents'].add(user_agent)
# 检测结果
detection_result = {
'ip': ip,
'timestamp': timestamp,
'is_suspicious': False,
'reasons': [],
'risk_score': 0
}
# 频率检测
risk_score = 0
# 1. 请求频率检测
recent_requests = [t for t in stats['timestamps']
if timestamp - t < 60] # 最近1分钟
if len(recent_requests) > self.detection_rules['max_requests_per_minute']:
detection_result['is_suspicious'] = True
detection_result['reasons'].append('请求频率过高')
risk_score += 30
# 2. 请求间隔检测
if len(stats['timestamps']) >= 2:
last_timestamp = stats['timestamps'][-2]
interval = timestamp - last_timestamp
if interval < self.detection_rules['min_request_interval']:
detection_result['is_suspicious'] = True
detection_result['reasons'].append('请求间隔过短')
risk_score += 20
# 3. User-Agent检测
if any(ua in user_agent.lower() for ua in self.detection_rules['suspicious_user_agents']):
detection_result['is_suspicious'] = True
detection_result['reasons'].append('可疑的User-Agent')
risk_score += 40
# 4. 请求头检测
missing_headers = []
for required_header in self.detection_rules['required_headers']:
if required_header not in headers:
missing_headers.append(required_header)
if missing_headers:
detection_result['is_suspicious'] = True
detection_result['reasons'].append(f'缺失请求头: {missing_headers}')
risk_score += 15 * len(missing_headers)
# 5. 行为模式检测
pattern_score = self.analyze_behavior_pattern(ip, timestamp)
risk_score += pattern_score
if pattern_score > 0:
detection_result['reasons'].append('异常行为模式')
detection_result['risk_score'] = min(risk_score, 100)
# 更新黑名单
if risk_score > 70:
self.blacklist.add(ip)
detection_result['action'] = 'blocked'
elif risk_score > 50:
detection_result['action'] = 'challenge'
else:
detection_result['action'] = 'allow'
return detection_result
def analyze_behavior_pattern(self, ip, timestamp):
"""
分析行为模式
"""
patterns = self.behavior_patterns[ip]
patterns.append(timestamp)
# 保留最近的行为记录
cutoff_time = timestamp - 3600 # 1小时
patterns[:] = [t for t in patterns if t > cutoff_time]
risk_score = 0
# 检测规律性访问
if len(patterns) >= 10:
intervals = [patterns[i] - patterns[i-1] for i in range(1, len(patterns))]
# 计算间隔的标准差
if intervals:
mean_interval = sum(intervals) / len(intervals)
variance = sum((x - mean_interval) ** 2 for x in intervals) / len(intervals)
std_dev = variance ** 0.5
# 如果间隔非常规律(标准差小),可能是机器人
if std_dev < 0.1 and mean_interval < 5:
risk_score += 25
return risk_score
def generate_challenge(self, challenge_type='captcha'):
"""
生成挑战
"""
challenges = {
'captcha': {
'type': 'image_captcha',
'image_url': '/captcha/generate',
'input_field': 'captcha_code'
},
'javascript': {
'type': 'js_challenge',
'script': 'var result = Math.pow(2, 10) + Math.sqrt(144);',
'expected': '1036'
},
'slider': {
'type': 'slider_captcha',
'background_image': '/captcha/slider_bg.jpg',
'slider_image': '/captcha/slider_piece.png'
}
}
return challenges.get(challenge_type, challenges['captcha'])
def get_detection_stats(self):
"""
获取检测统计
"""
total_ips = len(self.request_stats)
blacklisted_ips = len(self.blacklist)
total_requests = sum(stats['count'] for stats in self.request_stats.values())
return {
'total_ips': total_ips,
'blacklisted_ips': blacklisted_ips,
'total_requests': total_requests,
'blacklist_rate': blacklisted_ips / total_ips if total_ips > 0 else 0
}
# 使用示例
print("\n检测器使用示例:")
detector = AntiSpiderDetector()
# 模拟正常用户请求
normal_request = detector.analyze_request(
ip='192.168.1.100',
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5'
}
)
print("正常用户请求分析:")
print(f" 可疑: {normal_request['is_suspicious']}")
print(f" 风险分数: {normal_request['risk_score']}")
print(f" 动作: {normal_request['action']}")
# 模拟爬虫请求
spider_request = detector.analyze_request(
ip='192.168.1.200',
user_agent='python-requests/2.25.1',
headers={
'User-Agent': 'python-requests/2.25.1'
}
)
print("\n爬虫请求分析:")
print(f" 可疑: {spider_request['is_suspicious']}")
print(f" 风险分数: {spider_request['risk_score']}")
print(f" 原因: {spider_request['reasons']}")
print(f" 动作: {spider_request['action']}")
print("反爬虫检测原理演示完成!")
2. 基础对抗技术
2.1 请求头和User-Agent管理
# 3. 请求头和User-Agent管理
print("\n🔧 请求头和User-Agent管理:")
import random
import json
from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware
class AdvancedUserAgentMiddleware(UserAgentMiddleware):
"""
高级User-Agent中间件
"""
def __init__(self, user_agent='Scrapy'):
super().__init__(user_agent)
# 真实浏览器User-Agent池
self.user_agent_pool = [
# Chrome
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
# Firefox
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',
'Mozilla/5.0 (X11; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0',
# Safari
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1',
# Edge
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59'
]
# 对应的请求头模板
self.header_templates = {
'chrome': {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'Cache-Control': 'max-age=0'
},
'firefox': {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1'
},
'safari': {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
}
def process_request(self, request, spider):
"""
处理请求,设置User-Agent和相关头部
"""
# 随机选择User-Agent
user_agent = random.choice(self.user_agent_pool)
request.headers['User-Agent'] = user_agent
# 根据User-Agent类型设置对应的请求头
browser_type = self.detect_browser_type(user_agent)
headers = self.header_templates.get(browser_type, self.header_templates['chrome'])
# 设置请求头
for header_name, header_value in headers.items():
if header_name not in request.headers:
request.headers[header_name] = header_value
# 添加随机性
self.add_randomness(request)
return None
def detect_browser_type(self, user_agent):
"""
检测浏览器类型
"""
user_agent_lower = user_agent.lower()
if 'firefox' in user_agent_lower:
return 'firefox'
elif 'safari' in user_agent_lower and 'chrome' not in user_agent_lower:
return 'safari'
else:
return 'chrome'
def add_randomness(self, request):
"""
添加随机性
"""
# 随机添加一些可选头部
optional_headers = {
'X-Requested-With': 'XMLHttpRequest',
'X-Forwarded-For': self.generate_fake_ip(),
'X-Real-IP': self.generate_fake_ip(),
}
# 随机选择是否添加可选头部
for header_name, header_value in optional_headers.items():
if random.random() < 0.3: # 30%概率添加
request.headers[header_name] = header_value
def generate_fake_ip(self):
"""
生成虚假IP地址
"""
return f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}"
# 请求头管理器
class HeaderManager:
"""
请求头管理器
"""
def __init__(self):
self.session_headers = {}
self.common_headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
def get_headers_for_domain(self, domain):
"""
获取特定域名的请求头
"""
if domain not in self.session_headers:
self.session_headers[domain] = self.generate_session_headers()
return self.session_headers[domain]
def generate_session_headers(self):
"""
生成会话请求头
"""
headers = self.common_headers.copy()
# 添加随机变化
languages = [
'en-US,en;q=0.9',
'en-US,en;q=0.8,zh-CN;q=0.6',
'zh-CN,zh;q=0.9,en;q=0.8'
]
headers['Accept-Language'] = random.choice(languages)
return headers
def update_headers_with_referer(self, headers, referer):
"""
更新请求头中的Referer
"""
if referer:
headers['Referer'] = referer
return headers
print("请求头和User-Agent管理完成!")
2.2 代理管理系统
# 4. 代理管理系统
print("\n🌐 代理管理系统:")
import requests
import time
import threading
from queue import Queue, Empty
from urllib.parse import urlparse
class ProxyManager:
"""
代理管理器
"""
def __init__(self):
self.proxy_pool = Queue()
self.failed_proxies = set()
self.proxy_stats = {}
self.lock = threading.Lock()
# 代理来源配置
self.proxy_sources = [
'https://www.proxy-list.download/api/v1/get?type=http',
'https://api.proxyscrape.com/v2/?request=get&protocol=http',
# 添加更多代理源
]
# 验证配置
self.test_url = 'http://httpbin.org/ip'
self.test_timeout = 10
def fetch_proxies_from_source(self, source_url):
"""
从代理源获取代理列表
"""
try:
response = requests.get(source_url, timeout=30)
if response.status_code == 200:
# 解析代理列表(根据不同源的格式)
proxies = self.parse_proxy_response(response.text, source_url)
return proxies
except Exception as e:
print(f"获取代理失败 {source_url}: {e}")
return []
def parse_proxy_response(self, response_text, source_url):
"""
解析代理响应
"""
proxies = []
lines = response_text.strip().split('\n')
for line in lines:
line = line.strip()
if ':' in line:
try:
# 简单的IP:PORT格式
ip, port = line.split(':')
proxy = {
'ip': ip.strip(),
'port': int(port.strip()),
'type': 'http',
'source': source_url
}
proxies.append(proxy)
except ValueError:
continue
return proxies
def validate_proxy(self, proxy):
"""
验证代理可用性
"""
proxy_url = f"http://{proxy['ip']}:{proxy['port']}"
proxies = {
'http': proxy_url,
'https': proxy_url
}
try:
start_time = time.time()
response = requests.get(
self.test_url,
proxies=proxies,
timeout=self.test_timeout
)
if response.status_code == 200:
response_time = time.time() - start_time
# 更新代理统计
proxy_key = f"{proxy['ip']}:{proxy['port']}"
self.proxy_stats[proxy_key] = {
'response_time': response_time,
'success_count': self.proxy_stats.get(proxy_key, {}).get('success_count', 0) + 1,
'last_success': time.time(),
'total_requests': self.proxy_stats.get(proxy_key, {}).get('total_requests', 0) + 1
}
return True
except Exception as e:
# 记录失败
proxy_key = f"{proxy['ip']}:{proxy['port']}"
if proxy_key in self.proxy_stats:
self.proxy_stats[proxy_key]['total_requests'] += 1
return False
def load_proxies(self):
"""
加载代理到池中
"""
print("开始加载代理...")
all_proxies = []
# 从各个源获取代理
for source in self.proxy_sources:
proxies = self.fetch_proxies_from_source(source)
all_proxies.extend(proxies)
print(f"从 {source} 获取到 {len(proxies)} 个代理")
# 验证代理
valid_proxies = []
for proxy in all_proxies:
if self.validate_proxy(proxy):
valid_proxies.append(proxy)
self.proxy_pool.put(proxy)
print(f"验证完成,有效代理: {len(valid_proxies)}")
return len(valid_proxies)
def get_proxy(self):
"""
获取一个可用代理
"""
try:
proxy = self.proxy_pool.get_nowait()
# 检查是否在失败列表中
proxy_key = f"{proxy['ip']}:{proxy['port']}"
if proxy_key in self.failed_proxies:
return self.get_proxy() # 递归获取下一个
return proxy
except Empty:
# 代理池为空,尝试重新加载
if self.load_proxies() > 0:
return self.get_proxy()
else:
return None
def return_proxy(self, proxy, success=True):
"""
归还代理到池中
"""
proxy_key = f"{proxy['ip']}:{proxy['port']}"
if success:
# 成功使用,归还到池中
self.proxy_pool.put(proxy)
# 从失败列表中移除(如果存在)
with self.lock:
self.failed_proxies.discard(proxy_key)
else:
# 使用失败,加入失败列表
with self.lock:
self.failed_proxies.add(proxy_key)
def get_proxy_stats(self):
"""
获取代理统计信息
"""
total_proxies = len(self.proxy_stats)
failed_proxies = len(self.failed_proxies)
available_proxies = self.proxy_pool.qsize()
# 计算平均响应时间
response_times = [stats.get('response_time', 0)
for stats in self.proxy_stats.values()
if 'response_time' in stats]
avg_response_time = sum(response_times) / len(response_times) if response_times else 0
return {
'total_proxies': total_proxies,
'failed_proxies': failed_proxies,
'available_proxies': available_proxies,
'average_response_time': avg_response_time
}
# Scrapy代理中间件
class ProxyMiddleware:
"""
Scrapy代理中间件
"""
def __init__(self):
self.proxy_manager = ProxyManager()
self.proxy_manager.load_proxies()
def process_request(self, request, spider):
"""
处理请求,设置代理
"""
proxy = self.proxy_manager.get_proxy()
if proxy:
proxy_url = f"http://{proxy['ip']}:{proxy['port']}"
request.meta['proxy'] = proxy_url
request.meta['proxy_info'] = proxy
spider.logger.info(f"使用代理: {proxy_url}")
return None
def process_response(self, request, response, spider):
"""
处理响应
"""
if 'proxy_info' in request.meta:
proxy = request.meta['proxy_info']
# 根据响应状态判断代理是否成功
if response.status == 200:
self.proxy_manager.return_proxy(proxy, success=True)
else:
self.proxy_manager.return_proxy(proxy, success=False)
return response
def process_exception(self, request, exception, spider):
"""
处理异常
"""
if 'proxy_info' in request.meta:
proxy = request.meta['proxy_info']
self.proxy_manager.return_proxy(proxy, success=False)
return None
# 使用示例
print("\n代理管理器使用示例:")
# 创建代理管理器
proxy_manager = ProxyManager()
# 手动添加一些测试代理
test_proxies = [
{'ip': '127.0.0.1', 'port': 8080, 'type': 'http', 'source': 'manual'},
{'ip': '192.168.1.1', 'port': 3128, 'type': 'http', 'source': 'manual'}
]
for proxy in test_proxies:
proxy_manager.proxy_pool.put(proxy)
# 获取统计信息
stats = proxy_manager.get_proxy_stats()
print("代理统计:")
for key, value in stats.items():
print(f" {key}: {value}")
print("代理管理系统完成!")
3. 高级对抗技术
3.1 验证码识别与处理
# 5. 验证码识别与处理
print("\n🔐 验证码识别与处理:")
import base64
import io
from PIL import Image
import cv2
import numpy as np
class CaptchaHandler:
"""
验证码处理器
"""
def __init__(self):
self.ocr_engines = {
'tesseract': self.tesseract_ocr,
'ddddocr': self.ddddocr_ocr,
'manual': self.manual_solve
}
# 验证码类型检测规则
self.captcha_patterns = {
'simple_text': {
'description': '简单文本验证码',
'features': ['4-6位字符', '无干扰线', '字符清晰'],
'solver': 'tesseract'
},
'complex_text': {
'description': '复杂文本验证码',
'features': ['扭曲字符', '干扰线', '噪点'],
'solver': 'ddddocr'
},
'math_captcha': {
'description': '数学运算验证码',
'features': ['数学表达式', '加减乘除'],
'solver': 'math_solver'
},
'slider_captcha': {
'description': '滑块验证码',
'features': ['拼图块', '滑动轨迹'],
'solver': 'slider_solver'
},
'click_captcha': {
'description': '点击验证码',
'features': ['点击指定区域', '文字描述'],
'solver': 'click_solver'
}
}
def detect_captcha_type(self, image_data):
"""
检测验证码类型
"""
# 这里简化处理,实际应该使用图像分析
image = Image.open(io.BytesIO(image_data))
width, height = image.size
# 基于尺寸和特征判断
if width > 300 and height > 150:
return 'slider_captcha'
elif width < 100 and height < 50:
return 'simple_text'
else:
return 'complex_text'
def preprocess_image(self, image_data, captcha_type):
"""
图像预处理
"""
image = Image.open(io.BytesIO(image_data))
if captcha_type in ['simple_text', 'complex_text']:
# 转换为灰度图
image = image.convert('L')
# 转换为numpy数组
img_array = np.array(image)
# 二值化
_, binary = cv2.threshold(img_array, 127, 255, cv2.THRESH_BINARY)
# 去噪
kernel = np.ones((2, 2), np.uint8)
cleaned = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
# 转换回PIL图像
processed_image = Image.fromarray(cleaned)
return processed_image
return image
def tesseract_ocr(self, image):
"""
使用Tesseract OCR识别
"""
try:
import pytesseract
# 配置Tesseract
config = '--psm 8 -c tessedit_char_whitelist=0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
text = pytesseract.image_to_string(image, config=config)
return text.strip()
except ImportError:
print("Tesseract未安装,请安装pytesseract")
return None
except Exception as e:
print(f"Tesseract识别失败: {e}")
return None
def ddddocr_ocr(self, image):
"""
使用ddddocr识别
"""
try:
import ddddocr
ocr = ddddocr.DdddOcr()
# 转换图像为字节
img_bytes = io.BytesIO()
image.save(img_bytes, format='PNG')
img_bytes = img_bytes.getvalue()
result = ocr.classification(img_bytes)
return result
except ImportError:
print("ddddocr未安装,请安装ddddocr")
return None
except Exception as e:
print(f"ddddocr识别失败: {e}")
return None
def manual_solve(self, image):
"""
手动解决验证码
"""
# 保存图像供手动识别
image.save('captcha_manual.png')
print("验证码已保存为 captcha_manual.png,请手动识别")
# 在实际应用中,这里可以:
# 1. 发送到人工识别平台
# 2. 显示图像让用户输入
# 3. 集成第三方验证码识别服务
return input("请输入验证码: ")
def solve_math_captcha(self, image):
"""
解决数学运算验证码
"""
# 先用OCR识别文本
text = self.tesseract_ocr(image)
if text:
# 解析数学表达式
try:
# 简单的数学表达式求解
# 注意:eval有安全风险,实际应用中应该使用更安全的方法
result = eval(text.replace('=', '').replace('?', ''))
return str(result)
except:
return None
return None
def solve_slider_captcha(self, background_image, slider_image):
"""
解决滑块验证码
"""
# 模板匹配找到滑块位置
bg_array = np.array(background_image)
slider_array = np.array(slider_image)
# 使用OpenCV模板匹配
result = cv2.matchTemplate(bg_array, slider_array, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
# 返回滑动距离
return max_loc[0]
def generate_slider_trajectory(self, distance):
"""
生成滑块轨迹(模拟人类行为)
"""
trajectory = []
current = 0
mid = distance * 4 / 5 # 80%处开始减速
t = 0.2
v = 0
while current < distance:
if current < mid:
a = 2 # 加速度
else:
a = -3 # 减速度
v0 = v
v = v0 + a * t
move = v0 * t + 1 / 2 * a * t * t
current += move
trajectory.append(round(move))
return trajectory
def solve_captcha(self, image_data, captcha_type=None):
"""
解决验证码
"""
if captcha_type is None:
captcha_type = self.detect_captcha_type(image_data)
print(f"检测到验证码类型: {captcha_type}")
# 预处理图像
processed_image = self.preprocess_image(image_data, captcha_type)
# 选择合适的解决方案
if captcha_type == 'math_captcha':
return self.solve_math_captcha(processed_image)
elif captcha_type in ['simple_text', 'complex_text']:
# 尝试多种OCR引擎
for engine_name, engine_func in self.ocr_engines.items():
if engine_name == 'manual':
continue
result = engine_func(processed_image)
if result and len(result) >= 3: # 基本验证
return result
# 如果自动识别失败,使用手动识别
return self.manual_solve(processed_image)
return None
# Scrapy验证码中间件
class CaptchaMiddleware:
"""
Scrapy验证码处理中间件
"""
def __init__(self):
self.captcha_handler = CaptchaHandler()
self.captcha_urls = set() # 记录遇到验证码的URL
def process_response(self, request, response, spider):
"""
处理响应,检测验证码
"""
# 检测是否遇到验证码页面
if self.is_captcha_page(response):
spider.logger.info(f"检测到验证码页面: {response.url}")
# 提取验证码图像
captcha_image_url = self.extract_captcha_image_url(response)
if captcha_image_url:
# 下载验证码图像
captcha_response = self.download_captcha_image(captcha_image_url)
if captcha_response:
# 解决验证码
captcha_result = self.captcha_handler.solve_captcha(captcha_response.content)
if captcha_result:
# 提交验证码
return self.submit_captcha(request, response, captcha_result, spider)
return response
def is_captcha_page(self, response):
"""
检测是否为验证码页面
"""
# 检测页面特征
captcha_indicators = [
'captcha',
'verification',
'verify',
'验证码',
'人机验证'
]
page_content = response.text.lower()
for indicator in captcha_indicators:
if indicator in page_content:
return True
return False
def extract_captcha_image_url(self, response):
"""
提取验证码图像URL
"""
# 查找验证码图像
captcha_selectors = [
'img[src*="captcha"]::attr(src)',
'img[src*="verify"]::attr(src)',
'img[id*="captcha"]::attr(src)',
'.captcha img::attr(src)'
]
for selector in captcha_selectors:
image_url = response.css(selector).get()
if image_url:
return response.urljoin(image_url)
return None
def download_captcha_image(self, image_url):
"""
下载验证码图像
"""
try:
import requests
response = requests.get(image_url, timeout=10)
if response.status_code == 200:
return response
except Exception as e:
print(f"下载验证码图像失败: {e}")
return None
def submit_captcha(self, request, response, captcha_result, spider):
"""
提交验证码
"""
# 这里需要根据具体网站的验证码提交方式来实现
# 通常需要找到表单并提交验证码
spider.logger.info(f"验证码识别结果: {captcha_result}")
# 返回原响应,实际应用中应该提交验证码并返回新的请求
return response
print("验证码识别与处理完成!")
3.2 JavaScript渲染和动态内容处理
# 6. JavaScript渲染和动态内容处理
print("\n🌐 JavaScript渲染和动态内容处理:")
import json
import time
import asyncio
import random
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
class JavaScriptRenderer:
"""
JavaScript渲染器
"""
def __init__(self, headless=True, proxy=None):
self.headless = headless
self.proxy = proxy
self.driver = None
# 渲染配置
self.render_config = {
'page_load_timeout': 30,
'script_timeout': 30,
'implicit_wait': 10,
'window_size': (1920, 1080)
}
# JavaScript代码库
self.js_scripts = {
'scroll_to_bottom': """
window.scrollTo(0, document.body.scrollHeight);
""",
'wait_for_ajax': """
return jQuery.active == 0;
""",
'get_page_info': """
return {
title: document.title,
url: window.location.href,
readyState: document.readyState,
height: document.body.scrollHeight,
width: document.body.scrollWidth
};
""",
'simulate_human_behavior': """
// 模拟鼠标移动
function simulateMouseMove() {
var event = new MouseEvent('mousemove', {
view: window,
bubbles: true,
cancelable: true,
clientX: Math.random() * window.innerWidth,
clientY: Math.random() * window.innerHeight
});
document.dispatchEvent(event);
}
// 模拟滚动
function simulateScroll() {
window.scrollBy(0, Math.random() * 100);
}
// 执行模拟行为
simulateMouseMove();
setTimeout(simulateScroll, 1000);
""",
'extract_dynamic_content': """
// 提取动态加载的内容
var dynamicElements = document.querySelectorAll('[data-loaded="true"], .dynamic-content, .ajax-content');
var content = [];
dynamicElements.forEach(function(element) {
content.push({
tag: element.tagName,
text: element.textContent.trim(),
html: element.innerHTML,
attributes: Array.from(element.attributes).map(attr => ({
name: attr.name,
value: attr.value
}))
});
});
return content;
"""
}
def setup_driver(self):
"""
设置WebDriver
"""
chrome_options = Options()
if self.headless:
chrome_options.add_argument('--headless')
# 基本配置
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--disable-extensions')
chrome_options.add_argument('--disable-plugins')
chrome_options.add_argument('--disable-images') # 禁用图片加载
# 设置窗口大小
chrome_options.add_argument(f'--window-size={self.render_config["window_size"][0]},{self.render_config["window_size"][1]}')
# 设置代理
if self.proxy:
chrome_options.add_argument(f'--proxy-server={self.proxy}')
# 设置User-Agent
chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
# 禁用自动化检测
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
# 创建驱动
self.driver = webdriver.Chrome(options=chrome_options)
# 设置超时
self.driver.set_page_load_timeout(self.render_config['page_load_timeout'])
self.driver.set_script_timeout(self.render_config['script_timeout'])
self.driver.implicitly_wait(self.render_config['implicit_wait'])
# 执行反检测脚本
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
def render_page(self, url, wait_conditions=None):
"""
渲染页面
"""
if not self.driver:
self.setup_driver()
try:
# 访问页面
self.driver.get(url)
# 等待页面加载完成
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# 执行自定义等待条件
if wait_conditions:
self.wait_for_conditions(wait_conditions)
# 模拟人类行为
self.simulate_human_behavior()
# 等待动态内容加载
self.wait_for_dynamic_content()
# 获取页面信息
page_info = self.get_page_info()
# 获取最终的HTML
html_content = self.driver.page_source
return {
'url': url,
'html': html_content,
'page_info': page_info,
'screenshot': self.take_screenshot(),
'dynamic_content': self.extract_dynamic_content()
}
except Exception as e:
print(f"页面渲染失败 {url}: {e}")
return None
def wait_for_conditions(self, conditions):
"""
等待自定义条件
"""
for condition in conditions:
if condition['type'] == 'element_present':
WebDriverWait(self.driver, condition.get('timeout', 10)).until(
EC.presence_of_element_located((By.CSS_SELECTOR, condition['selector']))
)
elif condition['type'] == 'element_clickable':
WebDriverWait(self.driver, condition.get('timeout', 10)).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, condition['selector']))
)
elif condition['type'] == 'text_present':
WebDriverWait(self.driver, condition.get('timeout', 10)).until(
EC.text_to_be_present_in_element((By.CSS_SELECTOR, condition['selector']), condition['text'])
)
elif condition['type'] == 'javascript':
WebDriverWait(self.driver, condition.get('timeout', 10)).until(
lambda driver: driver.execute_script(condition['script'])
)
def simulate_human_behavior(self):
"""
模拟人类行为
"""
# 随机等待
time.sleep(random.uniform(1, 3))
# 执行模拟行为脚本
self.driver.execute_script(self.js_scripts['simulate_human_behavior'])
# 随机滚动
for _ in range(random.randint(1, 3)):
self.driver.execute_script(self.js_scripts['scroll_to_bottom'])
time.sleep(random.uniform(0.5, 1.5))
def wait_for_dynamic_content(self):
"""
等待动态内容加载
"""
# 等待jQuery AJAX完成(如果页面使用jQuery)
try:
WebDriverWait(self.driver, 5).until(
lambda driver: driver.execute_script(self.js_scripts['wait_for_ajax'])
)
except:
pass # 页面可能不使用jQuery
# 等待一段时间让其他异步内容加载
time.sleep(2)
def get_page_info(self):
"""
获取页面信息
"""
return self.driver.execute_script(self.js_scripts['get_page_info'])
def extract_dynamic_content(self):
"""
提取动态内容
"""
return self.driver.execute_script(self.js_scripts['extract_dynamic_content'])
def take_screenshot(self):
"""
截取页面截图
"""
try:
screenshot = self.driver.get_screenshot_as_base64()
return screenshot
except:
return None
def close(self):
"""
关闭浏览器
"""
if self.driver:
self.driver.quit()
self.driver = None
# Scrapy JavaScript中间件
class JavaScriptMiddleware:
"""
Scrapy JavaScript渲染中间件
"""
def __init__(self):
self.renderer = JavaScriptRenderer(headless=True)
# 需要JavaScript渲染的URL模式
self.js_required_patterns = [
r'.*\.spa\..*', # SPA应用
r'.*react.*', # React应用
r'.*vue.*', # Vue应用
r'.*angular.*' # Angular应用
]
def process_request(self, request, spider):
"""
处理请求
"""
# 检查是否需要JavaScript渲染
if self.requires_javascript(request.url):
spider.logger.info(f"使用JavaScript渲染: {request.url}")
# 渲染页面
render_result = self.renderer.render_page(request.url)
if render_result:
# 创建新的响应对象
from scrapy.http import HtmlResponse
response = HtmlResponse(
url=request.url,
body=render_result['html'].encode('utf-8'),
encoding='utf-8',
request=request
)
# 添加渲染信息到meta
response.meta['render_info'] = render_result
return response
return None
def requires_javascript(self, url):
"""
检查URL是否需要JavaScript渲染
"""
import re
for pattern in self.js_required_patterns:
if re.match(pattern, url):
return True
return False
def spider_closed(self, spider):
"""
爬虫关闭时清理资源
"""
self.renderer.close()
# 异步JavaScript渲染器(使用Playwright)
class AsyncJavaScriptRenderer:
"""
异步JavaScript渲染器
"""
def __init__(self):
self.browser = None
self.context = None
async def setup(self):
"""
设置浏览器
"""
from playwright.async_api import async_playwright
self.playwright = await async_playwright().start()
self.browser = await self.playwright.chromium.launch(
headless=True,
args=[
'--no-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu'
]
)
self.context = await self.browser.new_context(
viewport={'width': 1920, 'height': 1080},
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
)
async def render_page(self, url):
"""
异步渲染页面
"""
if not self.browser:
await self.setup()
page = await self.context.new_page()
try:
# 访问页面
await page.goto(url, wait_until='networkidle')
# 等待动态内容
await page.wait_for_timeout(2000)
# 获取HTML内容
html_content = await page.content()
# 获取页面信息
page_info = await page.evaluate("""
() => ({
title: document.title,
url: window.location.href,
height: document.body.scrollHeight,
width: document.body.scrollWidth
})
""")
return {
'url': url,
'html': html_content,
'page_info': page_info
}
finally:
await page.close()
async def close(self):
"""
关闭浏览器
"""
if self.context:
await self.context.close()
if self.browser:
await self.browser.close()
if hasattr(self, 'playwright'):
await self.playwright.stop()
print("JavaScript渲染和动态内容处理完成!")
4. 智能行为模拟
4.1 人类行为模拟
# 7. 人类行为模拟
print("\n🤖 人类行为模拟:")
import random
import time
import math
from datetime import datetime, timedelta
class HumanBehaviorSimulator:
"""
人类行为模拟器
"""
def __init__(self):
# 行为模式配置
self.behavior_patterns = {
'browsing_speed': {
'min_page_time': 3, # 最小页面停留时间(秒)
'max_page_time': 30, # 最大页面停留时间(秒)
'avg_page_time': 8 # 平均页面停留时间(秒)
},
'request_intervals': {
'min_interval': 0.5, # 最小请求间隔(秒)
'max_interval': 5.0, # 最大请求间隔(秒)
'avg_interval': 2.0 # 平均请求间隔(秒)
},
'session_patterns': {
'session_duration': (300, 1800), # 会话持续时间范围(秒)
'pages_per_session': (5, 50), # 每个会话的页面数量
'break_probability': 0.1 # 休息概率
},
'mouse_behavior': {
'move_speed': (100, 300), # 鼠标移动速度(像素/秒)
'click_delay': (0.1, 0.5), # 点击延迟(秒)
'scroll_speed': (50, 200) # 滚动速度(像素/秒)
}
}
# 用户类型配置
self.user_types = {
'casual_browser': {
'description': '休闲浏览者',
'page_time_multiplier': 1.5,
'interval_multiplier': 2.0,
'error_rate': 0.05
},
'power_user': {
'description': '高级用户',
'page_time_multiplier': 0.8,
'interval_multiplier': 0.6,
'error_rate': 0.02
},
'researcher': {
'description': '研究人员',
'page_time_multiplier': 3.0,
'interval_multiplier': 1.5,
'error_rate': 0.01
}
}
# 当前会话状态
self.session_state = {
'start_time': None,
'pages_visited': 0,
'last_request_time': None,
'user_type': 'casual_browser',
'fatigue_level': 0.0
}
def start_session(self, user_type='casual_browser'):
"""
开始新的会话
"""
self.session_state = {
'start_time': time.time(),
'pages_visited': 0,
'last_request_time': None,
'user_type': user_type,
'fatigue_level': 0.0
}
print(f"开始新会话,用户类型: {self.user_types[user_type]['description']}")
def calculate_page_time(self):
"""
计算页面停留时间
"""
base_time = self.behavior_patterns['browsing_speed']['avg_page_time']
user_config = self.user_types[self.session_state['user_type']]
# 应用用户类型修正
adjusted_time = base_time * user_config['page_time_multiplier']
# 添加疲劳因子
fatigue_factor = 1 + self.session_state['fatigue_level'] * 0.5
adjusted_time *= fatigue_factor
# 添加随机性
min_time = self.behavior_patterns['browsing_speed']['min_page_time']
max_time = self.behavior_patterns['browsing_speed']['max_page_time']
# 使用正态分布生成更自然的时间
page_time = random.normalvariate(adjusted_time, adjusted_time * 0.3)
page_time = max(min_time, min(max_time, page_time))
return page_time
def calculate_request_interval(self):
"""
计算请求间隔
"""
base_interval = self.behavior_patterns['request_intervals']['avg_interval']
user_config = self.user_types[self.session_state['user_type']]
# 应用用户类型修正
adjusted_interval = base_interval * user_config['interval_multiplier']
# 添加疲劳因子
fatigue_factor = 1 + self.session_state['fatigue_level'] * 0.3
adjusted_interval *= fatigue_factor
# 添加随机性
min_interval = self.behavior_patterns['request_intervals']['min_interval']
max_interval = self.behavior_patterns['request_intervals']['max_interval']
# 使用指数分布生成更真实的间隔
interval = random.expovariate(1.0 / adjusted_interval)
interval = max(min_interval, min(max_interval, interval))
return interval
def should_take_break(self):
"""
判断是否应该休息
"""
# 基于访问页面数和疲劳程度判断
pages_visited = self.session_state['pages_visited']
fatigue_level = self.session_state['fatigue_level']
break_probability = self.behavior_patterns['session_patterns']['break_probability']
# 随着页面访问增加,休息概率增加
adjusted_probability = break_probability * (1 + pages_visited * 0.01)
adjusted_probability *= (1 + fatigue_level)
return random.random() < adjusted_probability
def take_break(self):
"""
模拟休息行为
"""
break_duration = random.uniform(30, 300) # 30秒到5分钟
print(f"用户休息 {break_duration:.1f} 秒")
# 重置疲劳程度
self.session_state['fatigue_level'] *= 0.5
return break_duration
def update_fatigue(self):
"""
更新疲劳程度
"""
# 每访问一个页面增加疲劳
self.session_state['fatigue_level'] += 0.01
# 疲劳程度上限
self.session_state['fatigue_level'] = min(1.0, self.session_state['fatigue_level'])
def simulate_mouse_movement(self, start_pos, end_pos):
"""
模拟鼠标移动轨迹
"""
x1, y1 = start_pos
x2, y2 = end_pos
# 计算距离
distance = math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
# 计算移动时间
speed = random.uniform(*self.behavior_patterns['mouse_behavior']['move_speed'])
move_time = distance / speed
# 生成贝塞尔曲线轨迹
trajectory = []
steps = max(10, int(move_time * 60)) # 60 FPS
# 控制点(添加随机性)
cx = (x1 + x2) / 2 + random.uniform(-50, 50)
cy = (y1 + y2) / 2 + random.uniform(-50, 50)
for i in range(steps + 1):
t = i / steps
# 二次贝塞尔曲线
x = (1 - t) ** 2 * x1 + 2 * (1 - t) * t * cx + t ** 2 * x2
y = (1 - t) ** 2 * y1 + 2 * (1 - t) * t * cy + t ** 2 * y2
trajectory.append((int(x), int(y), move_time / steps))
return trajectory
def simulate_typing(self, text):
"""
模拟打字行为
"""
typing_pattern = []
for char in text:
# 基础打字速度(字符/分钟)
base_speed = random.uniform(200, 400)
# 字符难度修正
if char.isupper():
speed_modifier = 0.8 # 大写字母稍慢
elif char.isdigit():
speed_modifier = 0.9 # 数字稍慢
elif char in '!@#$%^&*()':
speed_modifier = 0.7 # 特殊字符更慢
else:
speed_modifier = 1.0
# 计算按键间隔
interval = 60.0 / (base_speed * speed_modifier)
# 添加随机性
interval *= random.uniform(0.7, 1.3)
typing_pattern.append((char, interval))
return typing_pattern
def get_next_action_delay(self):
"""
获取下一个动作的延迟时间
"""
current_time = time.time()
if self.session_state['last_request_time']:
# 计算建议的间隔时间
suggested_interval = self.calculate_request_interval()
elapsed_time = current_time - self.session_state['last_request_time']
if elapsed_time < suggested_interval:
delay = suggested_interval - elapsed_time
else:
delay = 0
else:
delay = 0
# 更新状态
self.session_state['last_request_time'] = current_time + delay
self.session_state['pages_visited'] += 1
self.update_fatigue()
return delay
# 智能请求调度器
class IntelligentRequestScheduler:
"""
智能请求调度器
"""
def __init__(self):
self.behavior_simulator = HumanBehaviorSimulator()
self.request_queue = []
self.active_sessions = {}
# 调度配置
self.scheduler_config = {
'max_concurrent_sessions': 5,
'session_rotation_interval': 300, # 5分钟
'adaptive_delay': True,
'respect_robots_txt': True
}
def add_request(self, request, priority=1):
"""
添加请求到队列
"""
request_info = {
'request': request,
'priority': priority,
'added_time': time.time(),
'attempts': 0,
'session_id': self.get_session_for_domain(request.url)
}
self.request_queue.append(request_info)
self.request_queue.sort(key=lambda x: x['priority'], reverse=True)
def get_session_for_domain(self, url):
"""
为域名获取或创建会话
"""
from urllib.parse import urlparse
domain = urlparse(url).netloc
if domain not in self.active_sessions:
session_id = f"{domain}_{int(time.time())}"
self.active_sessions[domain] = {
'session_id': session_id,
'start_time': time.time(),
'request_count': 0,
'last_request_time': None
}
return self.active_sessions[domain]['session_id']
def get_next_request(self):
"""
获取下一个要处理的请求
"""
if not self.request_queue:
return None
# 选择优先级最高且可以立即处理的请求
for i, request_info in enumerate(self.request_queue):
if self.can_process_request(request_info):
return self.request_queue.pop(i)
# 如果没有可立即处理的请求,则返回队首(当前优先级最高)的请求
return self.request_queue.pop(0)
def can_process_request(self, request_info):
"""
检查请求是否可以立即处理
"""
session_id = request_info['session_id']
# 检查会话是否存在
for domain, session in self.active_sessions.items():
if session['session_id'] == session_id:
# 检查请求间隔
if session['last_request_time']:
elapsed = time.time() - session['last_request_time']
min_interval = self.behavior_simulator.calculate_request_interval()
return elapsed >= min_interval
return True
return True
def calculate_adaptive_delay(self, response_time, error_rate):
"""
计算自适应延迟
"""
base_delay = self.behavior_simulator.calculate_request_interval()
# 根据响应时间调整
if response_time > 5.0: # 响应时间超过5秒
delay_multiplier = 1.5
elif response_time > 2.0: # 响应时间超过2秒
delay_multiplier = 1.2
else:
delay_multiplier = 1.0
# 根据错误率调整
if error_rate > 0.1: # 错误率超过10%
delay_multiplier *= 2.0
elif error_rate > 0.05: # 错误率超过5%
delay_multiplier *= 1.5
return base_delay * delay_multiplier
print("人类行为模拟完成!")
4.2 会话管理和状态保持
# 8. 会话管理和状态保持
print("\n🔄 会话管理和状态保持:")
import json
import pickle
import random
import sqlite3
import time
from http.cookies import SimpleCookie
class SessionManager:
"""
会话管理器
"""
def __init__(self, storage_type='file'):
self.storage_type = storage_type
self.sessions = {}
if storage_type == 'database':
self.init_database()
def init_database(self):
"""
初始化数据库
"""
self.conn = sqlite3.connect('sessions.db')
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
domain TEXT,
cookies TEXT,
headers TEXT,
user_agent TEXT,
proxy TEXT,
created_time REAL,
last_used_time REAL,
request_count INTEGER,
success_count INTEGER
)
''')
self.conn.commit()
def create_session(self, domain, user_agent=None, proxy=None):
"""
创建新会话
"""
session_id = f"{domain}_{int(time.time())}_{random.randint(1000, 9999)}"
session_data = {
'session_id': session_id,
'domain': domain,
'cookies': {},
'headers': {},
'user_agent': user_agent,
'proxy': proxy,
'created_time': time.time(),
'last_used_time': time.time(),
'request_count': 0,
'success_count': 0,
'state': 'active'
}
self.sessions[session_id] = session_data
self.save_session(session_data)
return session_id
def get_session(self, session_id):
"""
获取会话
"""
if session_id in self.sessions:
return self.sessions[session_id]
# 从存储中加载
return self.load_session(session_id)
def update_session(self, session_id, **kwargs):
"""
更新会话
"""
if session_id in self.sessions:
session = self.sessions[session_id]
for key, value in kwargs.items():
if key in session:
session[key] = value
session['last_used_time'] = time.time()
self.save_session(session)
def add_cookies(self, session_id, cookies):
"""
添加Cookie到会话
"""
session = self.get_session(session_id)
if session:
if isinstance(cookies, dict):
session['cookies'].update(cookies)
elif isinstance(cookies, str):
# 解析Cookie字符串
cookie_obj = SimpleCookie()
cookie_obj.load(cookies)
for key, morsel in cookie_obj.items():
session['cookies'][key] = morsel.value
self.save_session(session)
def get_cookies_for_request(self, session_id):
"""
获取请求的Cookie
"""
session = self.get_session(session_id)
if session and session['cookies']:
return session['cookies']
return {}
def save_session(self, session_data):
"""
保存会话
"""
if self.storage_type == 'file':
filename = f"session_{session_data['session_id']}.pkl"
with open(filename, 'wb') as f:
pickle.dump(session_data, f)
elif self.storage_type == 'database':
cursor = self.conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO sessions
(session_id, domain, cookies, headers, user_agent, proxy,
created_time, last_used_time, request_count, success_count)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
session_data['session_id'],
session_data['domain'],
json.dumps(session_data['cookies']),
json.dumps(session_data['headers']),
session_data['user_agent'],
session_data['proxy'],
session_data['created_time'],
session_data['last_used_time'],
session_data['request_count'],
session_data['success_count']
))
self.conn.commit()
def load_session(self, session_id):
"""
加载会话
"""
if self.storage_type == 'file':
filename = f"session_{session_id}.pkl"
try:
with open(filename, 'rb') as f:
session_data = pickle.load(f)
self.sessions[session_id] = session_data
return session_data
except FileNotFoundError:
return None
elif self.storage_type == 'database':
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM sessions WHERE session_id = ?', (session_id,))
row = cursor.fetchone()
if row:
session_data = {
'session_id': row[0],
'domain': row[1],
'cookies': json.loads(row[2]),
'headers': json.loads(row[3]),
'user_agent': row[4],
'proxy': row[5],
'created_time': row[6],
'last_used_time': row[7],
'request_count': row[8],
'success_count': row[9]
}
self.sessions[session_id] = session_data
return session_data
return None
def cleanup_expired_sessions(self, max_age=3600):
"""
清理过期会话
"""
current_time = time.time()
expired_sessions = []
for session_id, session in self.sessions.items():
if current_time - session['last_used_time'] > max_age:
expired_sessions.append(session_id)
for session_id in expired_sessions:
del self.sessions[session_id]
if self.storage_type == 'database':
cursor = self.conn.cursor()
cursor.execute('DELETE FROM sessions WHERE session_id = ?', (session_id,))
self.conn.commit()
# Scrapy会话中间件
class SessionMiddleware:
"""
Scrapy会话管理中间件
"""
def __init__(self):
self.session_manager = SessionManager(storage_type='database')
self.domain_sessions = {}
def process_request(self, request, spider):
"""
处理请求
"""
from urllib.parse import urlparse
domain = urlparse(request.url).netloc
# 获取或创建会话
if domain not in self.domain_sessions:
session_id = self.session_manager.create_session(domain)
self.domain_sessions[domain] = session_id
else:
session_id = self.domain_sessions[domain]
# 获取会话数据
session = self.session_manager.get_session(session_id)
if session:
# 设置Cookie
cookies = self.session_manager.get_cookies_for_request(session_id)
if cookies:
request.cookies.update(cookies)
# 设置User-Agent
if session['user_agent']:
request.headers['User-Agent'] = session['user_agent']
# 设置代理
if session['proxy']:
request.meta['proxy'] = session['proxy']
# 记录会话ID
request.meta['session_id'] = session_id
return None
def process_response(self, request, response, spider):
"""
处理响应
"""
session_id = request.meta.get('session_id')
if session_id:
# 更新会话统计
self.session_manager.update_session(
session_id,
request_count=self.session_manager.get_session(session_id)['request_count'] + 1
)
# 提取并保存Cookie
if 'Set-Cookie' in response.headers:
cookies = response.headers.getlist('Set-Cookie')
for cookie in cookies:
self.session_manager.add_cookies(session_id, cookie.decode())
# 如果响应成功,更新成功计数
if response.status == 200:
session = self.session_manager.get_session(session_id)
self.session_manager.update_session(
session_id,
success_count=session['success_count'] + 1
)
return response
print("会话管理和状态保持完成!")
5. 实战案例
5.1 综合反爬虫对抗案例
# 9. 综合反爬虫对抗案例
print("\n🎯 综合反爬虫对抗案例:")
import scrapy
from scrapy.http import Request
import logging
class AdvancedAntiSpiderSpider(scrapy.Spider):
"""
高级反爬虫对抗爬虫示例
"""
name = 'advanced_anti_spider'
custom_settings = {
'DOWNLOAD_DELAY': 2,
'RANDOMIZE_DOWNLOAD_DELAY': True,  # 在0.5~1.5倍DOWNLOAD_DELAY之间随机化
'CONCURRENT_REQUESTS': 1,
'CONCURRENT_REQUESTS_PER_DOMAIN': 1,
'DOWNLOADER_MIDDLEWARES': {
'myproject.middlewares.AdvancedUserAgentMiddleware': 400,
'myproject.middlewares.ProxyMiddleware': 410,
'myproject.middlewares.SessionMiddleware': 420,
'myproject.middlewares.CaptchaMiddleware': 430,
'myproject.middlewares.JavaScriptMiddleware': 440,
},
'ITEM_PIPELINES': {
'myproject.pipelines.DataValidationPipeline': 300,
'myproject.pipelines.DuplicateFilterPipeline': 400,
}
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 初始化组件
self.behavior_simulator = HumanBehaviorSimulator()
self.behavior_simulator.start_session('researcher')
# 目标网站配置
self.target_config = {
'base_url': 'https://example-protected-site.com',
'login_required': True,
'has_captcha': True,
'uses_javascript': True,
'rate_limited': True
}
# 爬取统计
self.stats = {
'requests_sent': 0,
'responses_received': 0,
'captchas_solved': 0,
'errors_encountered': 0,
'data_extracted': 0
}
def start_requests(self):
"""
生成初始请求
"""
# 首先访问首页建立会话
yield Request(
url=self.target_config['base_url'],
callback=self.parse_homepage,
meta={
'dont_cache': True,
'handle_httpstatus_list': [403, 429, 503]
}
)
def parse_homepage(self, response):
"""
解析首页
"""
self.logger.info(f"访问首页: {response.status}")
# 检查是否需要登录
if self.target_config['login_required']:
login_url = response.urljoin('/login')
yield Request(
url=login_url,
callback=self.parse_login_page,
meta=response.meta
)
else:
# 直接开始爬取数据
yield from self.generate_data_requests(response)
def parse_login_page(self, response):
"""
解析登录页面
"""
self.logger.info("处理登录页面")
# 检查是否有验证码
captcha_img = response.css('img[src*="captcha"]::attr(src)').get()
if captcha_img:
self.logger.info("检测到验证码,开始处理")
# 这里应该调用验证码处理逻辑
# 为了演示,我们假设验证码已解决
captcha_solution = "ABCD" # 实际应该调用验证码识别
self.stats['captchas_solved'] += 1
else:
captcha_solution = None
# 提取登录表单
form_data = {
'username': 'your_username',
'password': 'your_password'
}
if captcha_solution:
form_data['captcha'] = captcha_solution
# 提交登录表单
yield scrapy.FormRequest.from_response(
response,
formdata=form_data,
callback=self.after_login,
meta=response.meta
)
def after_login(self, response):
"""
登录后处理
"""
if "欢迎" in response.text or "dashboard" in response.url.lower():
self.logger.info("登录成功")
# 开始爬取数据
yield from self.generate_data_requests(response)
else:
self.logger.error("登录失败")
self.stats['errors_encountered'] += 1
def generate_data_requests(self, response):
"""
生成数据爬取请求
"""
# 模拟人类浏览行为
delay = self.behavior_simulator.get_next_action_delay()
# 生成目标URL列表
target_urls = [
'/products',
'/categories',
'/search?q=example'
]
for url in target_urls:
full_url = response.urljoin(url)
yield Request(
url=full_url,
callback=self.parse_data_page,
meta={
'delay': delay,
'dont_cache': True
}
)
def parse_data_page(self, response):
"""
解析数据页面
"""
self.logger.info(f"解析数据页面: {response.url}")
# 检查是否遇到反爬虫措施
if self.is_blocked(response):
self.logger.warning("检测到反爬虫阻拦")
yield from self.handle_blocking(response)
return
# 提取数据
items = response.css('.item')
for item in items:
data = {
'title': item.css('.title::text').get(),
'price': item.css('.price::text').get(),
'description': item.css('.description::text').get(),
'url': response.url,
'extracted_time': time.time()
}
if data['title']: # 基本验证
self.stats['data_extracted'] += 1
yield data
# 查找下一页
next_page = response.css('.pagination .next::attr(href)').get()
if next_page:
# 计算延迟时间
delay = self.behavior_simulator.get_next_action_delay()
yield Request(
url=response.urljoin(next_page),
callback=self.parse_data_page,
meta={
'delay': delay,
'dont_cache': True
}
)
def is_blocked(self, response):
"""
检查是否被阻拦
"""
# 检查状态码
if response.status in [403, 429, 503]:
return True
# 检查页面内容
blocking_indicators = [
'访问被拒绝',
'access denied',
'验证码',
'captcha',
'请稍后再试',
'rate limit'
]
page_text = response.text.lower()
for indicator in blocking_indicators:
if indicator in page_text:
return True
return False
def handle_blocking(self, response):
"""
处理阻拦情况
"""
self.logger.info("处理反爬虫阻拦")
# 增加延迟
extended_delay = self.behavior_simulator.calculate_request_interval() * 3
# 如果是验证码页面
if 'captcha' in response.text.lower():
self.logger.info("遇到验证码页面")
# 这里应该调用验证码处理逻辑
# 为了演示,我们简单地重试
yield Request(
url=response.url,
callback=self.parse_data_page,
meta={
'delay': extended_delay,
'dont_cache': True
},
dont_filter=True
)
# 如果是频率限制
elif response.status == 429:
self.logger.info("遇到频率限制,等待后重试")
yield Request(
url=response.url,
callback=self.parse_data_page,
meta={
'delay': extended_delay * 2,
'dont_cache': True
},
dont_filter=True
)
def closed(self, reason):
"""
爬虫关闭时的清理工作
"""
self.logger.info("爬虫关闭,统计信息:")
for key, value in self.stats.items():
self.logger.info(f" {key}: {value}")
print("综合反爬虫对抗案例完成!")
6. 本章小结
本章详细介绍了反爬虫对抗的各种技术和策略:
主要内容回顾
反爬虫技术概述
- 常见反爬虫技术分类
- 检测原理和实现机制
- 风险评估和应对策略
基础对抗技术
- 请求头和User-Agent管理
- 代理管理系统
- 会话状态保持
高级对抗技术
- 验证码识别与处理
- JavaScript渲染和动态内容
- 浏览器自动化技术
智能行为模拟
- 人类行为模式分析
- 智能请求调度
- 自适应延迟策略
实战案例
- 综合反爬虫对抗实现
- 多种技术组合应用
- 错误处理和恢复机制
最佳实践
渐进式对抗策略
- 从简单技术开始
- 根据需要逐步升级
- 避免过度工程化
合理使用资源
- 平衡效率和隐蔽性
- 合理配置并发和延迟(下文附有 settings.py 示例)
- 监控资源使用情况
遵守法律法规
- 尊重robots.txt
- 避免对服务器造成过大压力
- 遵守相关法律法规
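下面给出一段体现"合理配置并发和延迟"与"尊重 robots.txt"的 settings.py 配置草图,数值仅供参考,应结合目标网站的承受能力调整:
# 补充示例:礼貌抓取相关的 Scrapy 设置(数值为示意)
ROBOTSTXT_OBEY = True                 # 遵守 robots.txt
DOWNLOAD_DELAY = 2                    # 基础下载延迟(秒)
RANDOMIZE_DOWNLOAD_DELAY = True       # 在0.5~1.5倍延迟间随机化
CONCURRENT_REQUESTS_PER_DOMAIN = 2    # 限制单域名并发
AUTOTHROTTLE_ENABLED = True           # 根据响应延迟自动调速
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 30
RETRY_ENABLED = True                  # 配合"建立完善的重试机制"
RETRY_TIMES = 2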
常见陷阱
过度依赖单一技术
- 应该组合使用多种技术
- 建立多层防护体系
忽视行为模式
- 机械化的请求模式容易被识别
- 需要模拟真实用户行为
缺乏错误处理
- 必须处理各种异常情况
- 建立完善的重试机制
下一步学习建议
深入学习浏览器自动化
- Selenium高级用法
- Playwright和Puppeteer
- 无头浏览器优化
机器学习在反爬虫中的应用
- 验证码识别模型
- 行为模式学习
- 异常检测算法
分布式爬虫架构
- 大规模爬虫系统设计
- 负载均衡和容错
- 监控和运维
练习题
基础练习
- 实现一个简单的User-Agent轮换中间件
- 编写代理验证和管理程序
- 创建基本的会话管理系统
进阶练习
- 实现滑块验证码自动识别
- 开发JavaScript渲染中间件
- 设计智能请求调度算法
综合项目
- 构建完整的反爬虫对抗系统
- 实现多网站适配的通用爬虫
- 开发爬虫监控和管理平台
第7章完结! 🎉
下一章我们将学习Scrapy的部署与运维,包括生产环境部署、监控、日志管理等内容。