学习目标

通过本章学习,您将掌握:

  • 理解常见的反爬虫技术和原理
  • 掌握反爬虫对抗的策略和方法
  • 学会使用代理、用户代理轮换等技术
  • 了解验证码识别和处理技术
  • 掌握JavaScript渲染和动态内容处理
  • 学会模拟人类行为和智能爬虫技术

1. 反爬虫技术概述

1.1 常见反爬虫技术

# 1. 反爬虫技术分类和原理
print("🛡️ 反爬虫技术概述:")

class AntiSpiderTechniques:
    """
    反爬虫技术分类和分析
    """
    
    def __init__(self):
        self.techniques = {
            "基于请求特征": {
                "User-Agent检测": "检测异常的用户代理字符串",
                "请求频率限制": "限制单位时间内的请求数量",
                "IP地址封禁": "封禁异常IP地址",
                "请求头检测": "检测缺失或异常的HTTP头",
                "会话管理": "要求维持有效的会话状态"
            },
            
            "基于行为特征": {
                "鼠标轨迹检测": "检测鼠标移动和点击行为",
                "键盘输入检测": "检测键盘输入模式",
                "页面停留时间": "检测页面浏览时间",
                "滚动行为": "检测页面滚动模式",
                "交互行为": "检测用户交互行为"
            },
            
            "基于内容保护": {
                "验证码": "图片、滑块、点击验证码",
                "JavaScript挑战": "需要执行JavaScript才能获取内容",
                "动态加载": "内容通过AJAX动态加载",
                "字体反爬": "使用自定义字体混淆文本",
                "CSS反爬": "使用CSS隐藏或混淆内容"
            },
            
            "基于环境检测": {
                "浏览器指纹": "检测浏览器环境特征",
                "WebGL指纹": "检测图形渲染特征",
                "Canvas指纹": "检测Canvas渲染特征",
                "设备指纹": "检测设备硬件特征",
                "网络指纹": "检测网络环境特征"
            }
        }
    
    def analyze_technique(self, category, technique):
        """
        分析特定反爬虫技术
        """
        if category in self.techniques and technique in self.techniques[category]:
            description = self.techniques[category][technique]
            
            analysis = {
                "技术名称": technique,
                "分类": category,
                "描述": description,
                "检测难度": self._get_detection_difficulty(technique),
                "绕过难度": self._get_bypass_difficulty(technique),
                "常见场景": self._get_common_scenarios(technique)
            }
            
            return analysis
        
        return None
    
    def _get_detection_difficulty(self, technique):
        """
        获取检测难度
        """
        difficulty_map = {
            "User-Agent检测": "低",
            "请求频率限制": "低",
            "IP地址封禁": "低",
            "验证码": "中",
            "JavaScript挑战": "中",
            "浏览器指纹": "高",
            "Canvas指纹": "高"
        }
        return difficulty_map.get(technique, "中")
    
    def _get_bypass_difficulty(self, technique):
        """
        获取绕过难度
        """
        difficulty_map = {
            "User-Agent检测": "低",
            "请求频率限制": "中",
            "IP地址封禁": "中",
            "验证码": "高",
            "JavaScript挑战": "中",
            "浏览器指纹": "高",
            "Canvas指纹": "高"
        }
        return difficulty_map.get(technique, "中")
    
    def _get_common_scenarios(self, technique):
        """
        获取常见应用场景
        """
        scenarios_map = {
            "User-Agent检测": ["电商网站", "新闻网站", "论坛"],
            "请求频率限制": ["API接口", "搜索引擎", "社交媒体"],
            "验证码": ["登录页面", "注册页面", "敏感操作"],
            "JavaScript挑战": ["CDN保护", "DDoS防护", "高价值内容"],
            "浏览器指纹": ["金融网站", "广告平台", "安全要求高的网站"]
        }
        return scenarios_map.get(technique, ["通用场景"])
    
    def show_all_techniques(self):
        """
        展示所有反爬虫技术
        """
        print("反爬虫技术分类:")
        for category, techniques in self.techniques.items():
            print(f"\n{category}:")
            for technique, description in techniques.items():
                print(f"  • {technique}: {description}")

# 创建反爬虫技术分析器
anti_spider = AntiSpiderTechniques()
anti_spider.show_all_techniques()

# 分析特定技术
print("\n技术分析示例:")
analysis = anti_spider.analyze_technique("基于内容保护", "验证码")
if analysis:
    for key, value in analysis.items():
        print(f"{key}: {value}")

print("反爬虫技术概述完成!")

1.2 反爬虫检测原理

# 2. 反爬虫检测原理和实现
print("\n🔍 反爬虫检测原理:")

import time
import random
import hashlib
from collections import defaultdict, deque
from datetime import datetime, timedelta

class AntiSpiderDetector:
    """
    反爬虫检测器示例(用于理解检测原理)
    """
    
    def __init__(self):
        # 请求统计
        self.request_stats = defaultdict(lambda: {
            'count': 0,
            'timestamps': deque(maxlen=100),
            'user_agents': set(),
            'patterns': []
        })
        
        # 行为模式
        self.behavior_patterns = defaultdict(list)
        
        # 黑名单
        self.blacklist = set()
        
        # 检测规则
        self.detection_rules = {
            'max_requests_per_minute': 60,
            'max_requests_per_hour': 1000,
            'min_request_interval': 0.5,
            'suspicious_user_agents': [
                'python-requests',
                'scrapy',
                'curl',
                'wget'
            ],
            'required_headers': [
                'User-Agent',
                'Accept',
                'Accept-Language'
            ]
        }
    
    def analyze_request(self, ip, user_agent, headers, timestamp=None):
        """
        分析请求特征
        """
        if timestamp is None:
            timestamp = time.time()
        
        # 记录请求
        stats = self.request_stats[ip]
        stats['count'] += 1
        stats['timestamps'].append(timestamp)
        stats['user_agents'].add(user_agent)
        
        # 检测结果
        detection_result = {
            'ip': ip,
            'timestamp': timestamp,
            'is_suspicious': False,
            'reasons': [],
            'risk_score': 0
        }
        
        # 频率检测
        risk_score = 0
        
        # 1. 请求频率检测
        recent_requests = [t for t in stats['timestamps'] 
                          if timestamp - t < 60]  # 最近1分钟
        
        if len(recent_requests) > self.detection_rules['max_requests_per_minute']:
            detection_result['is_suspicious'] = True
            detection_result['reasons'].append('请求频率过高')
            risk_score += 30
        
        # 2. 请求间隔检测
        if len(stats['timestamps']) >= 2:
            last_timestamp = stats['timestamps'][-2]
            interval = timestamp - last_timestamp
            
            if interval < self.detection_rules['min_request_interval']:
                detection_result['is_suspicious'] = True
                detection_result['reasons'].append('请求间隔过短')
                risk_score += 20
        
        # 3. User-Agent检测
        if any(ua in user_agent.lower() for ua in self.detection_rules['suspicious_user_agents']):
            detection_result['is_suspicious'] = True
            detection_result['reasons'].append('可疑的User-Agent')
            risk_score += 40
        
        # 4. 请求头检测
        missing_headers = []
        for required_header in self.detection_rules['required_headers']:
            if required_header not in headers:
                missing_headers.append(required_header)
        
        if missing_headers:
            detection_result['is_suspicious'] = True
            detection_result['reasons'].append(f'缺失请求头: {missing_headers}')
            risk_score += 15 * len(missing_headers)
        
        # 5. 行为模式检测
        pattern_score = self.analyze_behavior_pattern(ip, timestamp)
        risk_score += pattern_score
        
        if pattern_score > 0:
            detection_result['reasons'].append('异常行为模式')
        
        detection_result['risk_score'] = min(risk_score, 100)
        
        # 更新黑名单
        if risk_score > 70:
            self.blacklist.add(ip)
            detection_result['action'] = 'blocked'
        elif risk_score > 50:
            detection_result['action'] = 'challenge'
        else:
            detection_result['action'] = 'allow'
        
        return detection_result
    
    def analyze_behavior_pattern(self, ip, timestamp):
        """
        分析行为模式
        """
        patterns = self.behavior_patterns[ip]
        patterns.append(timestamp)
        
        # 保留最近的行为记录
        cutoff_time = timestamp - 3600  # 1小时
        patterns[:] = [t for t in patterns if t > cutoff_time]
        
        risk_score = 0
        
        # 检测规律性访问
        if len(patterns) >= 10:
            intervals = [patterns[i] - patterns[i-1] for i in range(1, len(patterns))]
            
            # 计算间隔的标准差
            if intervals:
                mean_interval = sum(intervals) / len(intervals)
                variance = sum((x - mean_interval) ** 2 for x in intervals) / len(intervals)
                std_dev = variance ** 0.5
                
                # 如果间隔非常规律(标准差小),可能是机器人
                if std_dev < 0.1 and mean_interval < 5:
                    risk_score += 25
        
        return risk_score
    
    def generate_challenge(self, challenge_type='captcha'):
        """
        生成挑战
        """
        challenges = {
            'captcha': {
                'type': 'image_captcha',
                'image_url': '/captcha/generate',
                'input_field': 'captcha_code'
            },
            'javascript': {
                'type': 'js_challenge',
                'script': 'var result = Math.pow(2, 10) + Math.sqrt(144);',
                'expected': '1036'
            },
            'slider': {
                'type': 'slider_captcha',
                'background_image': '/captcha/slider_bg.jpg',
                'slider_image': '/captcha/slider_piece.png'
            }
        }
        
        return challenges.get(challenge_type, challenges['captcha'])
    
    def get_detection_stats(self):
        """
        获取检测统计
        """
        total_ips = len(self.request_stats)
        blacklisted_ips = len(self.blacklist)
        
        total_requests = sum(stats['count'] for stats in self.request_stats.values())
        
        return {
            'total_ips': total_ips,
            'blacklisted_ips': blacklisted_ips,
            'total_requests': total_requests,
            'blacklist_rate': blacklisted_ips / total_ips if total_ips > 0 else 0
        }

# 使用示例
print("\n检测器使用示例:")

detector = AntiSpiderDetector()

# 模拟正常用户请求
normal_request = detector.analyze_request(
    ip='192.168.1.100',
    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5'
    }
)

print("正常用户请求分析:")
print(f"  可疑: {normal_request['is_suspicious']}")
print(f"  风险分数: {normal_request['risk_score']}")
print(f"  动作: {normal_request['action']}")

# 模拟爬虫请求
spider_request = detector.analyze_request(
    ip='192.168.1.200',
    user_agent='python-requests/2.25.1',
    headers={
        'User-Agent': 'python-requests/2.25.1'
    }
)

print("\n爬虫请求分析:")
print(f"  可疑: {spider_request['is_suspicious']}")
print(f"  风险分数: {spider_request['risk_score']}")
print(f"  原因: {spider_request['reasons']}")
print(f"  动作: {spider_request['action']}")

print("反爬虫检测原理演示完成!")

2. 基础对抗技术

2.1 请求头和User-Agent管理

# 3. 请求头和User-Agent管理
print("\n🔧 请求头和User-Agent管理:")

import random
import json
from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware

class AdvancedUserAgentMiddleware(UserAgentMiddleware):
    """
    高级User-Agent中间件
    """
    
    def __init__(self, user_agent='Scrapy'):
        super().__init__(user_agent)
        
        # 真实浏览器User-Agent池
        self.user_agent_pool = [
            # Chrome
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            
            # Firefox
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (X11; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0',
            
            # Safari
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
            'Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1',
            
            # Edge
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59'
        ]
        
        # 对应的请求头模板
        self.header_templates = {
            'chrome': {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
                'Accept-Encoding': 'gzip, deflate, br',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'none',
                'Sec-Fetch-User': '?1',
                'Cache-Control': 'max-age=0'
            },
            'firefox': {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate, br',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'none',
                'Sec-Fetch-User': '?1'
            },
            'safari': {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language': 'en-us',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1'
            }
        }
    
    def process_request(self, request, spider):
        """
        处理请求,设置User-Agent和相关头部
        """
        # 随机选择User-Agent
        user_agent = random.choice(self.user_agent_pool)
        request.headers['User-Agent'] = user_agent
        
        # 根据User-Agent类型设置对应的请求头
        browser_type = self.detect_browser_type(user_agent)
        headers = self.header_templates.get(browser_type, self.header_templates['chrome'])
        
        # 设置请求头
        for header_name, header_value in headers.items():
            if header_name not in request.headers:
                request.headers[header_name] = header_value
        
        # 添加随机性
        self.add_randomness(request)
        
        return None
    
    def detect_browser_type(self, user_agent):
        """
        检测浏览器类型
        """
        user_agent_lower = user_agent.lower()
        
        if 'firefox' in user_agent_lower:
            return 'firefox'
        elif 'safari' in user_agent_lower and 'chrome' not in user_agent_lower:
            return 'safari'
        else:
            return 'chrome'
    
    def add_randomness(self, request):
        """
        添加随机性
        """
        # 随机添加一些可选头部
        optional_headers = {
            'X-Requested-With': 'XMLHttpRequest',
            'X-Forwarded-For': self.generate_fake_ip(),
            'X-Real-IP': self.generate_fake_ip(),
        }
        
        # 随机选择是否添加可选头部
        for header_name, header_value in optional_headers.items():
            if random.random() < 0.3:  # 30%概率添加
                request.headers[header_name] = header_value
    
    def generate_fake_ip(self):
        """
        生成虚假IP地址
        """
        return f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}"

# 请求头管理器
class HeaderManager:
    """
    请求头管理器
    """
    
    def __init__(self):
        self.session_headers = {}
        self.common_headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
    
    def get_headers_for_domain(self, domain):
        """
        获取特定域名的请求头
        """
        if domain not in self.session_headers:
            self.session_headers[domain] = self.generate_session_headers()
        
        return self.session_headers[domain]
    
    def generate_session_headers(self):
        """
        生成会话请求头
        """
        headers = self.common_headers.copy()
        
        # 添加随机变化
        languages = [
            'en-US,en;q=0.9',
            'en-US,en;q=0.8,zh-CN;q=0.6',
            'zh-CN,zh;q=0.9,en;q=0.8'
        ]
        
        headers['Accept-Language'] = random.choice(languages)
        
        return headers
    
    def update_headers_with_referer(self, headers, referer):
        """
        更新请求头中的Referer
        """
        if referer:
            headers['Referer'] = referer
        
        return headers
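
在 Scrapy 项目中启用上述中间件时,可以在 settings.py 中加入类似下面的配置(模块路径 myproject.middlewares 为假设值);HeaderManager 则可以在爬虫或中间件中按域名复用同一套请求头:

# settings.py 配置示意(模块路径为假设值)
# DOWNLOADER_MIDDLEWARES = {
#     'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,  # 关闭内置UA中间件
#     'myproject.middlewares.AdvancedUserAgentMiddleware': 400,
# }

# HeaderManager 使用示例
header_manager = HeaderManager()
headers = header_manager.get_headers_for_domain('example.com')
headers = header_manager.update_headers_with_referer(headers, 'https://example.com/')
print(f"会话请求头 Accept-Language: {headers['Accept-Language']}")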

print("请求头和User-Agent管理完成!")

2.2 代理管理系统

# 4. 代理管理系统
print("\n🌐 代理管理系统:")

import requests
import time
import threading
from queue import Queue, Empty
from urllib.parse import urlparse

class ProxyManager:
    """
    代理管理器
    """
    
    def __init__(self):
        self.proxy_pool = Queue()
        self.failed_proxies = set()
        self.proxy_stats = {}
        self.lock = threading.Lock()
        
        # 代理来源配置
        self.proxy_sources = [
            'https://www.proxy-list.download/api/v1/get?type=http',
            'https://api.proxyscrape.com/v2/?request=get&protocol=http',
            # 添加更多代理源
        ]
        
        # 验证配置
        self.test_url = 'http://httpbin.org/ip'
        self.test_timeout = 10
        
    def fetch_proxies_from_source(self, source_url):
        """
        从代理源获取代理列表
        """
        try:
            response = requests.get(source_url, timeout=30)
            if response.status_code == 200:
                # 解析代理列表(根据不同源的格式)
                proxies = self.parse_proxy_response(response.text, source_url)
                return proxies
        except Exception as e:
            print(f"获取代理失败 {source_url}: {e}")
        
        return []
    
    def parse_proxy_response(self, response_text, source_url):
        """
        解析代理响应
        """
        proxies = []
        lines = response_text.strip().split('\n')
        
        for line in lines:
            line = line.strip()
            if ':' in line:
                try:
                    # 简单的IP:PORT格式
                    ip, port = line.split(':')
                    proxy = {
                        'ip': ip.strip(),
                        'port': int(port.strip()),
                        'type': 'http',
                        'source': source_url
                    }
                    proxies.append(proxy)
                except ValueError:
                    continue
        
        return proxies
    
    def validate_proxy(self, proxy):
        """
        验证代理可用性
        """
        proxy_url = f"http://{proxy['ip']}:{proxy['port']}"
        proxies = {
            'http': proxy_url,
            'https': proxy_url
        }
        
        try:
            start_time = time.time()
            response = requests.get(
                self.test_url,
                proxies=proxies,
                timeout=self.test_timeout
            )
            
            if response.status_code == 200:
                response_time = time.time() - start_time
                
                # 更新代理统计
                proxy_key = f"{proxy['ip']}:{proxy['port']}"
                self.proxy_stats[proxy_key] = {
                    'response_time': response_time,
                    'success_count': self.proxy_stats.get(proxy_key, {}).get('success_count', 0) + 1,
                    'last_success': time.time(),
                    'total_requests': self.proxy_stats.get(proxy_key, {}).get('total_requests', 0) + 1
                }
                
                return True
        
        except Exception as e:
            # 记录失败
            proxy_key = f"{proxy['ip']}:{proxy['port']}"
            if proxy_key in self.proxy_stats:
                self.proxy_stats[proxy_key]['total_requests'] += 1
        
        return False
    
    def load_proxies(self):
        """
        加载代理到池中
        """
        print("开始加载代理...")
        
        all_proxies = []
        
        # 从各个源获取代理
        for source in self.proxy_sources:
            proxies = self.fetch_proxies_from_source(source)
            all_proxies.extend(proxies)
            print(f"从 {source} 获取到 {len(proxies)} 个代理")
        
        # 验证代理
        valid_proxies = []
        for proxy in all_proxies:
            if self.validate_proxy(proxy):
                valid_proxies.append(proxy)
                self.proxy_pool.put(proxy)
        
        print(f"验证完成,有效代理: {len(valid_proxies)}")
        return len(valid_proxies)
    
    def get_proxy(self):
        """
        获取一个可用代理
        """
        try:
            proxy = self.proxy_pool.get_nowait()
            
            # 检查是否在失败列表中
            proxy_key = f"{proxy['ip']}:{proxy['port']}"
            if proxy_key in self.failed_proxies:
                return self.get_proxy()  # 递归获取下一个
            
            return proxy
        
        except Empty:
            # 代理池为空,尝试重新加载
            if self.load_proxies() > 0:
                return self.get_proxy()
            else:
                return None
    
    def return_proxy(self, proxy, success=True):
        """
        归还代理到池中
        """
        proxy_key = f"{proxy['ip']}:{proxy['port']}"
        
        if success:
            # 成功使用,归还到池中
            self.proxy_pool.put(proxy)
            
            # 从失败列表中移除(如果存在)
            with self.lock:
                self.failed_proxies.discard(proxy_key)
        else:
            # 使用失败,加入失败列表
            with self.lock:
                self.failed_proxies.add(proxy_key)
    
    def get_proxy_stats(self):
        """
        获取代理统计信息
        """
        total_proxies = len(self.proxy_stats)
        failed_proxies = len(self.failed_proxies)
        available_proxies = self.proxy_pool.qsize()
        
        # 计算平均响应时间
        response_times = [stats.get('response_time', 0) 
                         for stats in self.proxy_stats.values() 
                         if 'response_time' in stats]
        
        avg_response_time = sum(response_times) / len(response_times) if response_times else 0
        
        return {
            'total_proxies': total_proxies,
            'failed_proxies': failed_proxies,
            'available_proxies': available_proxies,
            'average_response_time': avg_response_time
        }

# Scrapy代理中间件
class ProxyMiddleware:
    """
    Scrapy代理中间件
    """
    
    def __init__(self):
        self.proxy_manager = ProxyManager()
        self.proxy_manager.load_proxies()
    
    def process_request(self, request, spider):
        """
        处理请求,设置代理
        """
        proxy = self.proxy_manager.get_proxy()
        
        if proxy:
            proxy_url = f"http://{proxy['ip']}:{proxy['port']}"
            request.meta['proxy'] = proxy_url
            request.meta['proxy_info'] = proxy
            
            spider.logger.info(f"使用代理: {proxy_url}")
        
        return None
    
    def process_response(self, request, response, spider):
        """
        处理响应
        """
        if 'proxy_info' in request.meta:
            proxy = request.meta['proxy_info']
            
            # 根据响应状态判断代理是否成功
            if response.status == 200:
                self.proxy_manager.return_proxy(proxy, success=True)
            else:
                self.proxy_manager.return_proxy(proxy, success=False)
        
        return response
    
    def process_exception(self, request, exception, spider):
        """
        处理异常
        """
        if 'proxy_info' in request.meta:
            proxy = request.meta['proxy_info']
            self.proxy_manager.return_proxy(proxy, success=False)
        
        return None

# 使用示例
print("\n代理管理器使用示例:")

# 创建代理管理器
proxy_manager = ProxyManager()

# 手动添加一些测试代理
test_proxies = [
    {'ip': '127.0.0.1', 'port': 8080, 'type': 'http', 'source': 'manual'},
    {'ip': '192.168.1.1', 'port': 3128, 'type': 'http', 'source': 'manual'}
]

for proxy in test_proxies:
    proxy_manager.proxy_pool.put(proxy)

# 获取统计信息
stats = proxy_manager.get_proxy_stats()
print("代理统计:")
for key, value in stats.items():
    print(f"  {key}: {value}")

print("代理管理系统完成!")

3. 高级对抗技术

3.1 验证码识别与处理

# 5. 验证码识别与处理
print("\n🔐 验证码识别与处理:")

import base64
import io
from PIL import Image
import cv2
import numpy as np

class CaptchaHandler:
    """
    验证码处理器
    """
    
    def __init__(self):
        self.ocr_engines = {
            'tesseract': self.tesseract_ocr,
            'ddddocr': self.ddddocr_ocr,
            'manual': self.manual_solve
        }
        
        # 验证码类型检测规则
        self.captcha_patterns = {
            'simple_text': {
                'description': '简单文本验证码',
                'features': ['4-6位字符', '无干扰线', '字符清晰'],
                'solver': 'tesseract'
            },
            'complex_text': {
                'description': '复杂文本验证码',
                'features': ['扭曲字符', '干扰线', '噪点'],
                'solver': 'ddddocr'
            },
            'math_captcha': {
                'description': '数学运算验证码',
                'features': ['数学表达式', '加减乘除'],
                'solver': 'math_solver'
            },
            'slider_captcha': {
                'description': '滑块验证码',
                'features': ['拼图块', '滑动轨迹'],
                'solver': 'slider_solver'
            },
            'click_captcha': {
                'description': '点击验证码',
                'features': ['点击指定区域', '文字描述'],
                'solver': 'click_solver'
            }
        }
    
    def detect_captcha_type(self, image_data):
        """
        检测验证码类型
        """
        # 这里简化处理,实际应该使用图像分析
        image = Image.open(io.BytesIO(image_data))
        width, height = image.size
        
        # 基于尺寸和特征判断
        if width > 300 and height > 150:
            return 'slider_captcha'
        elif width < 100 and height < 50:
            return 'simple_text'
        else:
            return 'complex_text'
    
    def preprocess_image(self, image_data, captcha_type):
        """
        图像预处理
        """
        image = Image.open(io.BytesIO(image_data))
        
        if captcha_type in ['simple_text', 'complex_text']:
            # 转换为灰度图
            image = image.convert('L')
            
            # 转换为numpy数组
            img_array = np.array(image)
            
            # 二值化
            _, binary = cv2.threshold(img_array, 127, 255, cv2.THRESH_BINARY)
            
            # 去噪
            kernel = np.ones((2, 2), np.uint8)
            cleaned = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
            
            # 转换回PIL图像
            processed_image = Image.fromarray(cleaned)
            
            return processed_image
        
        return image
    
    def tesseract_ocr(self, image):
        """
        使用Tesseract OCR识别
        """
        try:
            import pytesseract
            
            # 配置Tesseract
            config = '--psm 8 -c tessedit_char_whitelist=0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
            
            text = pytesseract.image_to_string(image, config=config)
            return text.strip()
        
        except ImportError:
            print("Tesseract未安装,请安装pytesseract")
            return None
        except Exception as e:
            print(f"Tesseract识别失败: {e}")
            return None
    
    def ddddocr_ocr(self, image):
        """
        使用ddddocr识别
        """
        try:
            import ddddocr
            
            ocr = ddddocr.DdddOcr()
            
            # 转换图像为字节
            img_bytes = io.BytesIO()
            image.save(img_bytes, format='PNG')
            img_bytes = img_bytes.getvalue()
            
            result = ocr.classification(img_bytes)
            return result
        
        except ImportError:
            print("ddddocr未安装,请安装ddddocr")
            return None
        except Exception as e:
            print(f"ddddocr识别失败: {e}")
            return None
    
    def manual_solve(self, image):
        """
        手动解决验证码
        """
        # 保存图像供手动识别
        image.save('captcha_manual.png')
        print("验证码已保存为 captcha_manual.png,请手动识别")
        
        # 在实际应用中,这里可以:
        # 1. 发送到人工识别平台
        # 2. 显示图像让用户输入
        # 3. 集成第三方验证码识别服务
        
        return input("请输入验证码: ")
    
    def solve_math_captcha(self, image):
        """
        解决数学运算验证码
        """
        # 先用OCR识别文本
        text = self.tesseract_ocr(image)
        
        if text:
            # 解析数学表达式
            try:
                # 简单的数学表达式求解
                # 注意:eval有安全风险,实际应用中应该使用更安全的方法
                result = eval(text.replace('=', '').replace('?', ''))
                return str(result)
            except:
                return None
        
        return None
    
    def solve_slider_captcha(self, background_image, slider_image):
        """
        解决滑块验证码
        """
        # 模板匹配找到滑块位置
        bg_array = np.array(background_image)
        slider_array = np.array(slider_image)
        
        # 使用OpenCV模板匹配
        result = cv2.matchTemplate(bg_array, slider_array, cv2.TM_CCOEFF_NORMED)
        min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
        
        # 返回滑动距离
        return max_loc[0]
    
    def generate_slider_trajectory(self, distance):
        """
        生成滑块轨迹(模拟人类行为)
        """
        trajectory = []
        current = 0
        mid = distance * 4 / 5  # 80%处开始减速
        
        t = 0.2
        v = 0
        
        while current < distance:
            if current < mid:
                a = 2  # 加速度
            else:
                a = -3  # 减速度
            
            v0 = v
            v = v0 + a * t
            move = v0 * t + 1 / 2 * a * t * t
            current += move
            
            trajectory.append(round(move))
        
        return trajectory
    
    def solve_captcha(self, image_data, captcha_type=None):
        """
        解决验证码
        """
        if captcha_type is None:
            captcha_type = self.detect_captcha_type(image_data)
        
        print(f"检测到验证码类型: {captcha_type}")
        
        # 预处理图像
        processed_image = self.preprocess_image(image_data, captcha_type)
        
        # 选择合适的解决方案
        if captcha_type == 'math_captcha':
            return self.solve_math_captcha(processed_image)
        elif captcha_type in ['simple_text', 'complex_text']:
            # 尝试多种OCR引擎
            for engine_name, engine_func in self.ocr_engines.items():
                if engine_name == 'manual':
                    continue
                
                result = engine_func(processed_image)
                if result and len(result) >= 3:  # 基本验证
                    return result
            
            # 如果自动识别失败,使用手动识别
            return self.manual_solve(processed_image)
        
        return None
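
下面是 CaptchaHandler 的使用示意:滑块轨迹生成是纯计算,可以直接运行;图片验证码识别部分假设本地存在一张 captcha_sample.png(文件名为假设值),因此以注释形式给出:

# CaptchaHandler 使用示意
captcha_handler = CaptchaHandler()

# 滑块轨迹:给定需要滑动的距离,生成逐步位移序列
trajectory = captcha_handler.generate_slider_trajectory(180)
print(f"滑块轨迹步数: {len(trajectory)}, 总位移: {sum(trajectory)}")

# 图片验证码识别(假设本地存在 captcha_sample.png)
# with open('captcha_sample.png', 'rb') as f:
#     print("识别结果:", captcha_handler.solve_captcha(f.read()))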

# Scrapy验证码中间件
class CaptchaMiddleware:
    """
    Scrapy验证码处理中间件
    """
    
    def __init__(self):
        self.captcha_handler = CaptchaHandler()
        self.captcha_urls = set()  # 记录遇到验证码的URL
    
    def process_response(self, request, response, spider):
        """
        处理响应,检测验证码
        """
        # 检测是否遇到验证码页面
        if self.is_captcha_page(response):
            spider.logger.info(f"检测到验证码页面: {response.url}")
            
            # 提取验证码图像
            captcha_image_url = self.extract_captcha_image_url(response)
            
            if captcha_image_url:
                # 下载验证码图像
                captcha_response = self.download_captcha_image(captcha_image_url)
                
                if captcha_response:
                    # 解决验证码
                    captcha_result = self.captcha_handler.solve_captcha(captcha_response.content)
                    
                    if captcha_result:
                        # 提交验证码
                        return self.submit_captcha(request, response, captcha_result, spider)
        
        return response
    
    def is_captcha_page(self, response):
        """
        检测是否为验证码页面
        """
        # 检测页面特征
        captcha_indicators = [
            'captcha',
            'verification',
            'verify',
            '验证码',
            '人机验证'
        ]
        
        page_content = response.text.lower()
        
        for indicator in captcha_indicators:
            if indicator in page_content:
                return True
        
        return False
    
    def extract_captcha_image_url(self, response):
        """
        提取验证码图像URL
        """
        # 查找验证码图像
        captcha_selectors = [
            'img[src*="captcha"]::attr(src)',
            'img[src*="verify"]::attr(src)',
            'img[id*="captcha"]::attr(src)',
            '.captcha img::attr(src)'
        ]
        
        for selector in captcha_selectors:
            image_url = response.css(selector).get()
            if image_url:
                return response.urljoin(image_url)
        
        return None
    
    def download_captcha_image(self, image_url):
        """
        下载验证码图像
        """
        try:
            import requests
            response = requests.get(image_url, timeout=10)
            if response.status_code == 200:
                return response
        except Exception as e:
            print(f"下载验证码图像失败: {e}")
        
        return None
    
    def submit_captcha(self, request, response, captcha_result, spider):
        """
        提交验证码
        """
        # 这里需要根据具体网站的验证码提交方式来实现
        # 通常需要找到表单并提交验证码
        
        spider.logger.info(f"验证码识别结果: {captcha_result}")
        
        # 返回原响应,实际应用中应该提交验证码并返回新的请求
        return response
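
若要在 Scrapy 项目中启用该中间件,可在 settings.py 中注册(模块路径为假设值):

# settings.py 配置示意(模块路径为假设值)
# DOWNLOADER_MIDDLEWARES = {
#     'myproject.middlewares.CaptchaMiddleware': 550,
# }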

print("验证码识别与处理完成!")

3.2 JavaScript渲染和动态内容处理

# 6. JavaScript渲染和动态内容处理
print("\n🌐 JavaScript渲染和动态内容处理:")

import json
import random
import time
import asyncio
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

class JavaScriptRenderer:
    """
    JavaScript渲染器
    """
    
    def __init__(self, headless=True, proxy=None):
        self.headless = headless
        self.proxy = proxy
        self.driver = None
        
        # 渲染配置
        self.render_config = {
            'page_load_timeout': 30,
            'script_timeout': 30,
            'implicit_wait': 10,
            'window_size': (1920, 1080)
        }
        
        # JavaScript代码库
        self.js_scripts = {
            'scroll_to_bottom': """
                window.scrollTo(0, document.body.scrollHeight);
            """,
            
            'wait_for_ajax': """
                return jQuery.active == 0;
            """,
            
            'get_page_info': """
                return {
                    title: document.title,
                    url: window.location.href,
                    readyState: document.readyState,
                    height: document.body.scrollHeight,
                    width: document.body.scrollWidth
                };
            """,
            
            'simulate_human_behavior': """
                // 模拟鼠标移动
                function simulateMouseMove() {
                    var event = new MouseEvent('mousemove', {
                        view: window,
                        bubbles: true,
                        cancelable: true,
                        clientX: Math.random() * window.innerWidth,
                        clientY: Math.random() * window.innerHeight
                    });
                    document.dispatchEvent(event);
                }
                
                // 模拟滚动
                function simulateScroll() {
                    window.scrollBy(0, Math.random() * 100);
                }
                
                // 执行模拟行为
                simulateMouseMove();
                setTimeout(simulateScroll, 1000);
            """,
            
            'extract_dynamic_content': """
                // 提取动态加载的内容
                var dynamicElements = document.querySelectorAll('[data-loaded="true"], .dynamic-content, .ajax-content');
                var content = [];
                
                dynamicElements.forEach(function(element) {
                    content.push({
                        tag: element.tagName,
                        text: element.textContent.trim(),
                        html: element.innerHTML,
                        attributes: Array.from(element.attributes).map(attr => ({
                            name: attr.name,
                            value: attr.value
                        }))
                    });
                });
                
                return content;
            """
        }
    
    def setup_driver(self):
        """
        设置WebDriver
        """
        chrome_options = Options()
        
        if self.headless:
            chrome_options.add_argument('--headless')
        
        # 基本配置
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--disable-extensions')
        chrome_options.add_argument('--disable-plugins')
        chrome_options.add_argument('--blink-settings=imagesEnabled=false')  # 禁用图片加载以加速渲染
        
        # 设置窗口大小
        chrome_options.add_argument(f'--window-size={self.render_config["window_size"][0]},{self.render_config["window_size"][1]}')
        
        # 设置代理
        if self.proxy:
            chrome_options.add_argument(f'--proxy-server={self.proxy}')
        
        # 设置User-Agent
        chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
        
        # 禁用自动化检测
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        
        # 创建驱动
        self.driver = webdriver.Chrome(options=chrome_options)
        
        # 设置超时
        self.driver.set_page_load_timeout(self.render_config['page_load_timeout'])
        self.driver.set_script_timeout(self.render_config['script_timeout'])
        self.driver.implicitly_wait(self.render_config['implicit_wait'])
        
        # 通过 CDP 在每个新文档加载前注入反检测脚本,隐藏 navigator.webdriver 标志
        self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
        })
    
    def render_page(self, url, wait_conditions=None):
        """
        渲染页面
        """
        if not self.driver:
            self.setup_driver()
        
        try:
            # 访问页面
            self.driver.get(url)
            
            # 等待页面加载完成
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            
            # 执行自定义等待条件
            if wait_conditions:
                self.wait_for_conditions(wait_conditions)
            
            # 模拟人类行为
            self.simulate_human_behavior()
            
            # 等待动态内容加载
            self.wait_for_dynamic_content()
            
            # 获取页面信息
            page_info = self.get_page_info()
            
            # 获取最终的HTML
            html_content = self.driver.page_source
            
            return {
                'url': url,
                'html': html_content,
                'page_info': page_info,
                'screenshot': self.take_screenshot(),
                'dynamic_content': self.extract_dynamic_content()
            }
        
        except Exception as e:
            print(f"页面渲染失败 {url}: {e}")
            return None
    
    def wait_for_conditions(self, conditions):
        """
        等待自定义条件
        """
        for condition in conditions:
            if condition['type'] == 'element_present':
                WebDriverWait(self.driver, condition.get('timeout', 10)).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, condition['selector']))
                )
            
            elif condition['type'] == 'element_clickable':
                WebDriverWait(self.driver, condition.get('timeout', 10)).until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, condition['selector']))
                )
            
            elif condition['type'] == 'text_present':
                WebDriverWait(self.driver, condition.get('timeout', 10)).until(
                    EC.text_to_be_present_in_element((By.CSS_SELECTOR, condition['selector']), condition['text'])
                )
            
            elif condition['type'] == 'javascript':
                WebDriverWait(self.driver, condition.get('timeout', 10)).until(
                    lambda driver: driver.execute_script(condition['script'])
                )
    
    def simulate_human_behavior(self):
        """
        模拟人类行为
        """
        # 随机等待
        time.sleep(random.uniform(1, 3))
        
        # 执行模拟行为脚本
        self.driver.execute_script(self.js_scripts['simulate_human_behavior'])
        
        # 随机滚动
        for _ in range(random.randint(1, 3)):
            self.driver.execute_script(self.js_scripts['scroll_to_bottom'])
            time.sleep(random.uniform(0.5, 1.5))
    
    def wait_for_dynamic_content(self):
        """
        等待动态内容加载
        """
        # 等待jQuery AJAX完成(如果页面使用jQuery)
        try:
            WebDriverWait(self.driver, 5).until(
                lambda driver: driver.execute_script(self.js_scripts['wait_for_ajax'])
            )
        except:
            pass  # 页面可能不使用jQuery
        
        # 等待一段时间让其他异步内容加载
        time.sleep(2)
    
    def get_page_info(self):
        """
        获取页面信息
        """
        return self.driver.execute_script(self.js_scripts['get_page_info'])
    
    def extract_dynamic_content(self):
        """
        提取动态内容
        """
        return self.driver.execute_script(self.js_scripts['extract_dynamic_content'])
    
    def take_screenshot(self):
        """
        截取页面截图
        """
        try:
            screenshot = self.driver.get_screenshot_as_base64()
            return screenshot
        except:
            return None
    
    def close(self):
        """
        关闭浏览器
        """
        if self.driver:
            self.driver.quit()
            self.driver = None
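
JavaScriptRenderer 的典型用法如下。实际渲染需要本机安装 Chrome 和匹配版本的 chromedriver,示例中的 URL 与选择器均为假设值,因此渲染调用以注释形式给出:

# JavaScriptRenderer 使用示意
js_renderer = JavaScriptRenderer(headless=True)

# renderer_result = js_renderer.render_page(
#     'https://example.com',
#     wait_conditions=[{'type': 'element_present', 'selector': '.content', 'timeout': 15}]
# )
# if renderer_result:
#     print(renderer_result['page_info'])
# js_renderer.close()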

# Scrapy JavaScript中间件
class JavaScriptMiddleware:
    """
    Scrapy JavaScript渲染中间件
    """
    
    def __init__(self):
        self.renderer = JavaScriptRenderer(headless=True)
        
        # 需要JavaScript渲染的URL模式
        self.js_required_patterns = [
            r'.*\.spa\..*',  # SPA应用
            r'.*react.*',    # React应用
            r'.*vue.*',      # Vue应用
            r'.*angular.*'   # Angular应用
        ]
    
    def process_request(self, request, spider):
        """
        处理请求
        """
        # 检查是否需要JavaScript渲染
        if self.requires_javascript(request.url):
            spider.logger.info(f"使用JavaScript渲染: {request.url}")
            
            # 渲染页面
            render_result = self.renderer.render_page(request.url)
            
            if render_result:
                # 创建新的响应对象
                from scrapy.http import HtmlResponse
                
                response = HtmlResponse(
                    url=request.url,
                    body=render_result['html'].encode('utf-8'),
                    encoding='utf-8',
                    request=request
                )
                
                # 添加渲染信息到meta
                response.meta['render_info'] = render_result
                
                return response
        
        return None
    
    def requires_javascript(self, url):
        """
        检查URL是否需要JavaScript渲染
        """
        import re
        
        for pattern in self.js_required_patterns:
            if re.match(pattern, url):
                return True
        
        return False
    
    def spider_closed(self, spider):
        """
        爬虫关闭时清理资源
        """
        self.renderer.close()

# 异步JavaScript渲染器(使用Playwright)
class AsyncJavaScriptRenderer:
    """
    异步JavaScript渲染器
    """
    
    def __init__(self):
        self.browser = None
        self.context = None
    
    async def setup(self):
        """
        设置浏览器
        """
        from playwright.async_api import async_playwright
        
        self.playwright = await async_playwright().start()
        
        self.browser = await self.playwright.chromium.launch(
            headless=True,
            args=[
                '--no-sandbox',
                '--disable-dev-shm-usage',
                '--disable-gpu'
            ]
        )
        
        self.context = await self.browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        )
    
    async def render_page(self, url):
        """
        异步渲染页面
        """
        if not self.browser:
            await self.setup()
        
        page = await self.context.new_page()
        
        try:
            # 访问页面
            await page.goto(url, wait_until='networkidle')
            
            # 等待动态内容
            await page.wait_for_timeout(2000)
            
            # 获取HTML内容
            html_content = await page.content()
            
            # 获取页面信息
            page_info = await page.evaluate("""
                () => ({
                    title: document.title,
                    url: window.location.href,
                    height: document.body.scrollHeight,
                    width: document.body.scrollWidth
                })
            """)
            
            return {
                'url': url,
                'html': html_content,
                'page_info': page_info
            }
        
        finally:
            await page.close()
    
    async def close(self):
        """
        关闭浏览器
        """
        if self.context:
            await self.context.close()
        
        if self.browser:
            await self.browser.close()
        
        if hasattr(self, 'playwright'):
            await self.playwright.stop()
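
AsyncJavaScriptRenderer 需要先安装 playwright 并执行 playwright install chromium。下面是一个使用示意(URL 为假设值,实际运行时取消注释):

# AsyncJavaScriptRenderer 使用示意
async def render_with_playwright(url):
    async_renderer = AsyncJavaScriptRenderer()
    try:
        return await async_renderer.render_page(url)
    finally:
        await async_renderer.close()

# result = asyncio.run(render_with_playwright('https://example.com'))
# print(result['page_info'])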

print("JavaScript渲染和动态内容处理完成!")

4. 智能行为模拟

4.1 人类行为模拟

# 7. 人类行为模拟
print("\n🤖 人类行为模拟:")

import random
import time
import math
from datetime import datetime, timedelta

class HumanBehaviorSimulator:
    """
    人类行为模拟器
    """
    
    def __init__(self):
        # 行为模式配置
        self.behavior_patterns = {
            'browsing_speed': {
                'min_page_time': 3,    # 最小页面停留时间(秒)
                'max_page_time': 30,   # 最大页面停留时间(秒)
                'avg_page_time': 8     # 平均页面停留时间(秒)
            },
            
            'request_intervals': {
                'min_interval': 0.5,   # 最小请求间隔(秒)
                'max_interval': 5.0,   # 最大请求间隔(秒)
                'avg_interval': 2.0    # 平均请求间隔(秒)
            },
            
            'session_patterns': {
                'session_duration': (300, 1800),  # 会话持续时间范围(秒)
                'pages_per_session': (5, 50),     # 每个会话的页面数量
                'break_probability': 0.1           # 休息概率
            },
            
            'mouse_behavior': {
                'move_speed': (100, 300),          # 鼠标移动速度(像素/秒)
                'click_delay': (0.1, 0.5),        # 点击延迟(秒)
                'scroll_speed': (50, 200)         # 滚动速度(像素/秒)
            }
        }
        
        # 用户类型配置
        self.user_types = {
            'casual_browser': {
                'description': '休闲浏览者',
                'page_time_multiplier': 1.5,
                'interval_multiplier': 2.0,
                'error_rate': 0.05
            },
            'power_user': {
                'description': '高级用户',
                'page_time_multiplier': 0.8,
                'interval_multiplier': 0.6,
                'error_rate': 0.02
            },
            'researcher': {
                'description': '研究人员',
                'page_time_multiplier': 3.0,
                'interval_multiplier': 1.5,
                'error_rate': 0.01
            }
        }
        
        # 当前会话状态
        self.session_state = {
            'start_time': None,
            'pages_visited': 0,
            'last_request_time': None,
            'user_type': 'casual_browser',
            'fatigue_level': 0.0
        }
    
    def start_session(self, user_type='casual_browser'):
        """
        开始新的会话
        """
        self.session_state = {
            'start_time': time.time(),
            'pages_visited': 0,
            'last_request_time': None,
            'user_type': user_type,
            'fatigue_level': 0.0
        }
        
        print(f"开始新会话,用户类型: {self.user_types[user_type]['description']}")
    
    def calculate_page_time(self):
        """
        计算页面停留时间
        """
        base_time = self.behavior_patterns['browsing_speed']['avg_page_time']
        user_config = self.user_types[self.session_state['user_type']]
        
        # 应用用户类型修正
        adjusted_time = base_time * user_config['page_time_multiplier']
        
        # 添加疲劳因子
        fatigue_factor = 1 + self.session_state['fatigue_level'] * 0.5
        adjusted_time *= fatigue_factor
        
        # 添加随机性
        min_time = self.behavior_patterns['browsing_speed']['min_page_time']
        max_time = self.behavior_patterns['browsing_speed']['max_page_time']
        
        # 使用正态分布生成更自然的时间
        page_time = random.normalvariate(adjusted_time, adjusted_time * 0.3)
        page_time = max(min_time, min(max_time, page_time))
        
        return page_time
    
    def calculate_request_interval(self):
        """
        计算请求间隔
        """
        base_interval = self.behavior_patterns['request_intervals']['avg_interval']
        user_config = self.user_types[self.session_state['user_type']]
        
        # 应用用户类型修正
        adjusted_interval = base_interval * user_config['interval_multiplier']
        
        # 添加疲劳因子
        fatigue_factor = 1 + self.session_state['fatigue_level'] * 0.3
        adjusted_interval *= fatigue_factor
        
        # 添加随机性
        min_interval = self.behavior_patterns['request_intervals']['min_interval']
        max_interval = self.behavior_patterns['request_intervals']['max_interval']
        
        # 使用指数分布生成更真实的间隔
        interval = random.expovariate(1.0 / adjusted_interval)
        interval = max(min_interval, min(max_interval, interval))
        
        return interval
    
    def should_take_break(self):
        """
        判断是否应该休息
        """
        # 基于访问页面数和疲劳程度判断
        pages_visited = self.session_state['pages_visited']
        fatigue_level = self.session_state['fatigue_level']
        
        break_probability = self.behavior_patterns['session_patterns']['break_probability']
        
        # 随着页面访问增加,休息概率增加
        adjusted_probability = break_probability * (1 + pages_visited * 0.01)
        adjusted_probability *= (1 + fatigue_level)
        
        return random.random() < adjusted_probability
    
    def take_break(self):
        """
        模拟休息行为
        """
        break_duration = random.uniform(30, 300)  # 30秒到5分钟
        print(f"用户休息 {break_duration:.1f} 秒")
        
        # 重置疲劳程度
        self.session_state['fatigue_level'] *= 0.5
        
        return break_duration
    
    def update_fatigue(self):
        """
        更新疲劳程度
        """
        # 每访问一个页面增加疲劳
        self.session_state['fatigue_level'] += 0.01
        
        # 疲劳程度上限
        self.session_state['fatigue_level'] = min(1.0, self.session_state['fatigue_level'])
    
    def simulate_mouse_movement(self, start_pos, end_pos):
        """
        模拟鼠标移动轨迹
        """
        x1, y1 = start_pos
        x2, y2 = end_pos
        
        # 计算距离
        distance = math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
        
        # 计算移动时间
        speed = random.uniform(*self.behavior_patterns['mouse_behavior']['move_speed'])
        move_time = distance / speed
        
        # 生成贝塞尔曲线轨迹
        trajectory = []
        steps = max(10, int(move_time * 60))  # 60 FPS
        
        # 控制点(添加随机性)
        cx = (x1 + x2) / 2 + random.uniform(-50, 50)
        cy = (y1 + y2) / 2 + random.uniform(-50, 50)
        
        for i in range(steps + 1):
            t = i / steps
            
            # 二次贝塞尔曲线
            x = (1 - t) ** 2 * x1 + 2 * (1 - t) * t * cx + t ** 2 * x2
            y = (1 - t) ** 2 * y1 + 2 * (1 - t) * t * cy + t ** 2 * y2
            
            trajectory.append((int(x), int(y), move_time / steps))
        
        return trajectory
    
    def simulate_typing(self, text):
        """
        模拟打字行为
        """
        typing_pattern = []
        
        for char in text:
            # 基础打字速度(字符/分钟)
            base_speed = random.uniform(200, 400)
            
            # 字符难度修正
            if char.isupper():
                speed_modifier = 0.8  # 大写字母稍慢
            elif char.isdigit():
                speed_modifier = 0.9  # 数字稍慢
            elif char in '!@#$%^&*()':
                speed_modifier = 0.7  # 特殊字符更慢
            else:
                speed_modifier = 1.0
            
            # 计算按键间隔
            interval = 60.0 / (base_speed * speed_modifier)
            
            # 添加随机性
            interval *= random.uniform(0.7, 1.3)
            
            typing_pattern.append((char, interval))
        
        return typing_pattern
    
    def get_next_action_delay(self):
        """
        获取下一个动作的延迟时间
        """
        current_time = time.time()
        
        if self.session_state['last_request_time']:
            # 计算建议的间隔时间
            suggested_interval = self.calculate_request_interval()
            elapsed_time = current_time - self.session_state['last_request_time']
            
            if elapsed_time < suggested_interval:
                delay = suggested_interval - elapsed_time
            else:
                delay = 0
        else:
            delay = 0
        
        # 更新状态
        self.session_state['last_request_time'] = current_time + delay
        self.session_state['pages_visited'] += 1
        self.update_fatigue()
        
        return delay
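
HumanBehaviorSimulator 的典型用法是在每次请求前询问建议延迟,并在合适的时机模拟休息:

# 人类行为模拟器使用示例
simulator = HumanBehaviorSimulator()
simulator.start_session(user_type='researcher')

for page_index in range(3):
    delay = simulator.get_next_action_delay()
    page_time = simulator.calculate_page_time()
    print(f"第{page_index + 1}个页面: 请求前延迟 {delay:.2f} 秒, 预计停留 {page_time:.1f} 秒")

    if simulator.should_take_break():
        simulator.take_break()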

# 智能请求调度器
class IntelligentRequestScheduler:
    """
    智能请求调度器
    """
    
    def __init__(self):
        self.behavior_simulator = HumanBehaviorSimulator()
        self.request_queue = []
        self.active_sessions = {}
        
        # 调度配置
        self.scheduler_config = {
            'max_concurrent_sessions': 5,
            'session_rotation_interval': 300,  # 5分钟
            'adaptive_delay': True,
            'respect_robots_txt': True
        }
    
    def add_request(self, request, priority=1):
        """
        添加请求到队列
        """
        request_info = {
            'request': request,
            'priority': priority,
            'added_time': time.time(),
            'attempts': 0,
            'session_id': self.get_session_for_domain(request.url)
        }
        
        self.request_queue.append(request_info)
        self.request_queue.sort(key=lambda x: x['priority'], reverse=True)
    
    def get_session_for_domain(self, url):
        """
        为域名获取或创建会话
        """
        from urllib.parse import urlparse
        domain = urlparse(url).netloc
        
        if domain not in self.active_sessions:
            session_id = f"{domain}_{int(time.time())}"
            self.active_sessions[domain] = {
                'session_id': session_id,
                'start_time': time.time(),
                'request_count': 0,
                'last_request_time': None
            }
        
        return self.active_sessions[domain]['session_id']
    
    def get_next_request(self):
        """
        获取下一个要处理的请求
        """
        if not self.request_queue:
            return None
        
        # 选择优先级最高且可以立即处理的请求
        for i, request_info in enumerate(self.request_queue):
            if self.can_process_request(request_info):
                return self.request_queue.pop(i)
        
        # 如果没有可立即处理的请求,返回队首(优先级最高)的请求
        return self.request_queue.pop(0)
    
    def can_process_request(self, request_info):
        """
        检查请求是否可以立即处理
        """
        session_id = request_info['session_id']
        
        # 检查会话是否存在
        for domain, session in self.active_sessions.items():
            if session['session_id'] == session_id:
                # 检查请求间隔
                if session['last_request_time']:
                    elapsed = time.time() - session['last_request_time']
                    min_interval = self.behavior_simulator.calculate_request_interval()
                    
                    return elapsed >= min_interval
                
                return True
        
        return True
    
    def calculate_adaptive_delay(self, response_time, error_rate):
        """
        计算自适应延迟
        """
        base_delay = self.behavior_simulator.calculate_request_interval()
        
        # 根据响应时间调整
        if response_time > 5.0:  # 响应时间超过5秒
            delay_multiplier = 1.5
        elif response_time > 2.0:  # 响应时间超过2秒
            delay_multiplier = 1.2
        else:
            delay_multiplier = 1.0
        
        # 根据错误率调整
        if error_rate > 0.1:  # 错误率超过10%
            delay_multiplier *= 2.0
        elif error_rate > 0.05:  # 错误率超过5%
            delay_multiplier *= 1.5
        
        return base_delay * delay_multiplier
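
下面是调度器的一个简单使用示意,其中 DummyRequest 是为演示虚构的请求对象(只需带有 url 属性),实际使用时传入Scrapy的Request即可:

# 示例:智能请求调度器的基本用法(DummyRequest 为演示用的虚构对象)
from collections import namedtuple

DummyRequest = namedtuple('DummyRequest', ['url'])

scheduler = IntelligentRequestScheduler()
scheduler.behavior_simulator.start_session('researcher')  # 先启动行为模拟会话

scheduler.add_request(DummyRequest('https://example.com/page1'), priority=1)
scheduler.add_request(DummyRequest('https://example.com/page2'), priority=3)

next_request = scheduler.get_next_request()
if next_request:
    print(f"下一个请求: {next_request['request'].url},优先级: {next_request['priority']}")

# 根据响应时间和错误率计算自适应延迟
delay = scheduler.calculate_adaptive_delay(response_time=3.0, error_rate=0.08)
print(f"自适应延迟: {delay:.2f} 秒")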

print("人类行为模拟完成!")

4.2 会话管理和状态保持

# 8. 会话管理和状态保持
print("\n🔄 会话管理和状态保持:")

import json
import pickle
import sqlite3
from http.cookies import SimpleCookie

class SessionManager:
    """
    会话管理器
    """
    
    def __init__(self, storage_type='file'):
        self.storage_type = storage_type
        self.sessions = {}
        
        if storage_type == 'database':
            self.init_database()
    
    def init_database(self):
        """
        初始化数据库
        """
        self.conn = sqlite3.connect('sessions.db')
        cursor = self.conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS sessions (
                session_id TEXT PRIMARY KEY,
                domain TEXT,
                cookies TEXT,
                headers TEXT,
                user_agent TEXT,
                proxy TEXT,
                created_time REAL,
                last_used_time REAL,
                request_count INTEGER,
                success_count INTEGER
            )
        ''')
        
        self.conn.commit()
    
    def create_session(self, domain, user_agent=None, proxy=None):
        """
        创建新会话
        """
        session_id = f"{domain}_{int(time.time())}_{random.randint(1000, 9999)}"
        
        session_data = {
            'session_id': session_id,
            'domain': domain,
            'cookies': {},
            'headers': {},
            'user_agent': user_agent,
            'proxy': proxy,
            'created_time': time.time(),
            'last_used_time': time.time(),
            'request_count': 0,
            'success_count': 0,
            'state': 'active'
        }
        
        self.sessions[session_id] = session_data
        self.save_session(session_data)
        
        return session_id
    
    def get_session(self, session_id):
        """
        获取会话
        """
        if session_id in self.sessions:
            return self.sessions[session_id]
        
        # 从存储中加载
        return self.load_session(session_id)
    
    def update_session(self, session_id, **kwargs):
        """
        更新会话
        """
        if session_id in self.sessions:
            session = self.sessions[session_id]
            
            for key, value in kwargs.items():
                if key in session:
                    session[key] = value
            
            session['last_used_time'] = time.time()
            self.save_session(session)
    
    def add_cookies(self, session_id, cookies):
        """
        添加Cookie到会话
        """
        session = self.get_session(session_id)
        if session:
            if isinstance(cookies, dict):
                session['cookies'].update(cookies)
            elif isinstance(cookies, str):
                # 解析Cookie字符串
                cookie_obj = SimpleCookie()
                cookie_obj.load(cookies)
                
                for key, morsel in cookie_obj.items():
                    session['cookies'][key] = morsel.value
            
            self.save_session(session)
    
    def get_cookies_for_request(self, session_id):
        """
        获取请求的Cookie
        """
        session = self.get_session(session_id)
        if session and session['cookies']:
            return session['cookies']
        
        return {}
    
    def save_session(self, session_data):
        """
        保存会话
        """
        if self.storage_type == 'file':
            filename = f"session_{session_data['session_id']}.pkl"
            with open(filename, 'wb') as f:
                pickle.dump(session_data, f)
        
        elif self.storage_type == 'database':
            cursor = self.conn.cursor()
            
            cursor.execute('''
                INSERT OR REPLACE INTO sessions 
                (session_id, domain, cookies, headers, user_agent, proxy, 
                 created_time, last_used_time, request_count, success_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                session_data['session_id'],
                session_data['domain'],
                json.dumps(session_data['cookies']),
                json.dumps(session_data['headers']),
                session_data['user_agent'],
                session_data['proxy'],
                session_data['created_time'],
                session_data['last_used_time'],
                session_data['request_count'],
                session_data['success_count']
            ))
            
            self.conn.commit()
    
    def load_session(self, session_id):
        """
        加载会话
        """
        if self.storage_type == 'file':
            filename = f"session_{session_id}.pkl"
            try:
                with open(filename, 'rb') as f:
                    session_data = pickle.load(f)
                    self.sessions[session_id] = session_data
                    return session_data
            except FileNotFoundError:
                return None
        
        elif self.storage_type == 'database':
            cursor = self.conn.cursor()
            cursor.execute('SELECT * FROM sessions WHERE session_id = ?', (session_id,))
            row = cursor.fetchone()
            
            if row:
                session_data = {
                    'session_id': row[0],
                    'domain': row[1],
                    'cookies': json.loads(row[2]),
                    'headers': json.loads(row[3]),
                    'user_agent': row[4],
                    'proxy': row[5],
                    'created_time': row[6],
                    'last_used_time': row[7],
                    'request_count': row[8],
                    'success_count': row[9]
                }
                
                self.sessions[session_id] = session_data
                return session_data
        
        return None
    
    def cleanup_expired_sessions(self, max_age=3600):
        """
        清理过期会话
        """
        current_time = time.time()
        expired_sessions = []
        
        for session_id, session in self.sessions.items():
            if current_time - session['last_used_time'] > max_age:
                expired_sessions.append(session_id)
        
        for session_id in expired_sessions:
            del self.sessions[session_id]
            
            if self.storage_type == 'database':
                cursor = self.conn.cursor()
                cursor.execute('DELETE FROM sessions WHERE session_id = ?', (session_id,))
                self.conn.commit()

# Scrapy会话中间件
class SessionMiddleware:
    """
    Scrapy会话管理中间件
    """
    
    def __init__(self):
        self.session_manager = SessionManager(storage_type='database')
        self.domain_sessions = {}
    
    def process_request(self, request, spider):
        """
        处理请求
        """
        from urllib.parse import urlparse
        domain = urlparse(request.url).netloc
        
        # 获取或创建会话
        if domain not in self.domain_sessions:
            session_id = self.session_manager.create_session(domain)
            self.domain_sessions[domain] = session_id
        else:
            session_id = self.domain_sessions[domain]
        
        # 获取会话数据
        session = self.session_manager.get_session(session_id)
        
        if session:
            # 设置Cookie
            cookies = self.session_manager.get_cookies_for_request(session_id)
            if cookies:
                request.cookies.update(cookies)
            
            # 设置User-Agent
            if session['user_agent']:
                request.headers['User-Agent'] = session['user_agent']
            
            # 设置代理
            if session['proxy']:
                request.meta['proxy'] = session['proxy']
            
            # 记录会话ID
            request.meta['session_id'] = session_id
        
        return None
    
    def process_response(self, request, response, spider):
        """
        处理响应
        """
        session_id = request.meta.get('session_id')
        
        if session_id:
            # 更新会话统计
            self.session_manager.update_session(
                session_id,
                request_count=self.session_manager.get_session(session_id)['request_count'] + 1
            )
            
            # 提取并保存Cookie
            if 'Set-Cookie' in response.headers:
                cookies = response.headers.getlist('Set-Cookie')
                for cookie in cookies:
                    self.session_manager.add_cookies(session_id, cookie.decode())
            
            # 如果响应成功,更新成功计数
            if response.status == 200:
                session = self.session_manager.get_session(session_id)
                self.session_manager.update_session(
                    session_id,
                    success_count=session['success_count'] + 1
                )
        
        return response
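
要在Scrapy项目中启用该中间件,需要在 settings.py 中注册(模块路径 myproject.middlewares 为假设值,请按实际项目调整);同时建议禁用Scrapy自带的Cookie中间件,避免与自定义会话管理相互覆盖:

# settings.py(示例,模块路径为假设)
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.SessionMiddleware': 420,
    # 禁用内置Cookie中间件,统一由SessionMiddleware管理Cookie
    'scrapy.downloadermiddlewares.cookies.CookiesMiddleware': None,
}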

print("会话管理和状态保持完成!")

5. 实战案例

5.1 综合反爬虫对抗案例

# 9. 综合反爬虫对抗案例
print("\n🎯 综合反爬虫对抗案例:")

import scrapy
from scrapy.http import Request
import logging

class AdvancedAntiSpiderSpider(scrapy.Spider):
    """
    高级反爬虫对抗爬虫示例
    """
    
    name = 'advanced_anti_spider'
    
    custom_settings = {
        'DOWNLOAD_DELAY': 2,
        'RANDOMIZE_DOWNLOAD_DELAY': True,  # 实际延迟在0.5~1.5倍DOWNLOAD_DELAY之间随机
        'CONCURRENT_REQUESTS': 1,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 1,
        
        'DOWNLOADER_MIDDLEWARES': {
            'myproject.middlewares.AdvancedUserAgentMiddleware': 400,
            'myproject.middlewares.ProxyMiddleware': 410,
            'myproject.middlewares.SessionMiddleware': 420,
            'myproject.middlewares.CaptchaMiddleware': 430,
            'myproject.middlewares.JavaScriptMiddleware': 440,
        },
        
        'ITEM_PIPELINES': {
            'myproject.pipelines.DataValidationPipeline': 300,
            'myproject.pipelines.DuplicateFilterPipeline': 400,
        }
    }
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        # 初始化组件
        self.behavior_simulator = HumanBehaviorSimulator()
        self.behavior_simulator.start_session('researcher')
        
        # 目标网站配置
        self.target_config = {
            'base_url': 'https://example-protected-site.com',
            'login_required': True,
            'has_captcha': True,
            'uses_javascript': True,
            'rate_limited': True
        }
        
        # 爬取统计
        self.stats = {
            'requests_sent': 0,
            'responses_received': 0,
            'captchas_solved': 0,
            'errors_encountered': 0,
            'data_extracted': 0
        }
    
    def start_requests(self):
        """
        生成初始请求
        """
        # 首先访问首页建立会话
        yield Request(
            url=self.target_config['base_url'],
            callback=self.parse_homepage,
            meta={
                'dont_cache': True,
                'handle_httpstatus_list': [403, 429, 503]
            }
        )
    
    def parse_homepage(self, response):
        """
        解析首页
        """
        self.logger.info(f"访问首页: {response.status}")
        
        # 检查是否需要登录
        if self.target_config['login_required']:
            login_url = response.urljoin('/login')
            
            yield Request(
                url=login_url,
                callback=self.parse_login_page,
                meta=response.meta
            )
        else:
            # 直接开始爬取数据
            yield from self.generate_data_requests(response)
    
    def parse_login_page(self, response):
        """
        解析登录页面
        """
        self.logger.info("处理登录页面")
        
        # 检查是否有验证码
        captcha_img = response.css('img[src*="captcha"]::attr(src)').get()
        
        if captcha_img:
            self.logger.info("检测到验证码,开始处理")
            
            # 这里应该调用验证码处理逻辑
            # 为了演示,我们假设验证码已解决
            captcha_solution = "ABCD"  # 实际应该调用验证码识别
            self.stats['captchas_solved'] += 1
        else:
            captcha_solution = None
        
        # 提取登录表单
        form_data = {
            'username': 'your_username',
            'password': 'your_password'
        }
        
        if captcha_solution:
            form_data['captcha'] = captcha_solution
        
        # 提交登录表单
        yield scrapy.FormRequest.from_response(
            response,
            formdata=form_data,
            callback=self.after_login,
            meta=response.meta
        )
    
    def after_login(self, response):
        """
        登录后处理
        """
        if "欢迎" in response.text or "dashboard" in response.url.lower():
            self.logger.info("登录成功")
            
            # 开始爬取数据
            yield from self.generate_data_requests(response)
        else:
            self.logger.error("登录失败")
            self.stats['errors_encountered'] += 1
    
    def generate_data_requests(self, response):
        """
        生成数据爬取请求
        """
        # 模拟人类浏览行为
        delay = self.behavior_simulator.get_next_action_delay()
        
        # 生成目标URL列表
        target_urls = [
            '/products',
            '/categories',
            '/search?q=example'
        ]
        
        for url in target_urls:
            full_url = response.urljoin(url)
            
            yield Request(
                url=full_url,
                callback=self.parse_data_page,
                meta={
                    'delay': delay,
                    'dont_cache': True
                }
            )
    
    def parse_data_page(self, response):
        """
        解析数据页面
        """
        self.logger.info(f"解析数据页面: {response.url}")
        
        # 检查是否遇到反爬虫措施
        if self.is_blocked(response):
            self.logger.warning("检测到反爬虫阻拦")
            yield from self.handle_blocking(response)
            return
        
        # 提取数据
        items = response.css('.item')
        
        for item in items:
            data = {
                'title': item.css('.title::text').get(),
                'price': item.css('.price::text').get(),
                'description': item.css('.description::text').get(),
                'url': response.url,
                'extracted_time': time.time()
            }
            
            if data['title']:  # 基本验证
                self.stats['data_extracted'] += 1
                yield data
        
        # 查找下一页
        next_page = response.css('.pagination .next::attr(href)').get()
        if next_page:
            # 计算延迟时间
            delay = self.behavior_simulator.get_next_action_delay()
            
            yield Request(
                url=response.urljoin(next_page),
                callback=self.parse_data_page,
                meta={
                    'delay': delay,
                    'dont_cache': True
                }
            )
    
    def is_blocked(self, response):
        """
        检查是否被阻拦
        """
        # 检查状态码
        if response.status in [403, 429, 503]:
            return True
        
        # 检查页面内容
        blocking_indicators = [
            '访问被拒绝',
            'access denied',
            '验证码',
            'captcha',
            '请稍后再试',
            'rate limit'
        ]
        
        page_text = response.text.lower()
        
        for indicator in blocking_indicators:
            if indicator in page_text:
                return True
        
        return False
    
    def handle_blocking(self, response):
        """
        处理阻拦情况
        """
        self.logger.info("处理反爬虫阻拦")
        
        # 增加延迟
        extended_delay = self.behavior_simulator.calculate_request_interval() * 3
        
        # 如果是验证码页面
        if 'captcha' in response.text.lower():
            self.logger.info("遇到验证码页面")
            
            # 这里应该调用验证码处理逻辑
            # 为了演示,我们简单地重试
            yield Request(
                url=response.url,
                callback=self.parse_data_page,
                meta={
                    'delay': extended_delay,
                    'dont_cache': True
                },
                dont_filter=True
            )
        
        # 如果是频率限制
        elif response.status == 429:
            self.logger.info("遇到频率限制,等待后重试")
            
            yield Request(
                url=response.url,
                callback=self.parse_data_page,
                meta={
                    'delay': extended_delay * 2,
                    'dont_cache': True
                },
                dont_filter=True
            )
    
    def closed(self, reason):
        """
        爬虫关闭时的清理工作
        """
        self.logger.info("爬虫关闭,统计信息:")
        for key, value in self.stats.items():
            self.logger.info(f"  {key}: {value}")

print("综合反爬虫对抗案例完成!")

6. 本章小结

本章详细介绍了反爬虫对抗的各种技术和策略:

主要内容回顾

  1. 反爬虫技术概述

    • 常见反爬虫技术分类
    • 检测原理和实现机制
    • 风险评估和应对策略
  2. 基础对抗技术

    • 请求头和User-Agent管理
    • 代理管理系统
    • 会话状态保持
  3. 高级对抗技术

    • 验证码识别与处理
    • JavaScript渲染和动态内容
    • 浏览器自动化技术
  4. 智能行为模拟

    • 人类行为模式分析
    • 智能请求调度
    • 自适应延迟策略
  5. 实战案例

    • 综合反爬虫对抗实现
    • 多种技术组合应用
    • 错误处理和恢复机制

最佳实践

  1. 渐进式对抗策略

    • 从简单技术开始
    • 根据需要逐步升级
    • 避免过度工程化
  2. 合理使用资源

    • 平衡效率和隐蔽性
    • 合理配置并发和延迟(配置示例见本列表之后)
    • 监控资源使用情况
  3. 遵守法律法规

    • 尊重robots.txt
    • 避免对服务器造成过大压力
    • 遵守相关法律法规
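
针对上面第2、3点,可以直接用Scrapy内置设置落实,下面是一个参考配置(数值仅为示例):

# settings.py 示例:遵守robots.txt并控制请求节奏(数值仅供参考)
ROBOTSTXT_OBEY = True

DOWNLOAD_DELAY = 2
CONCURRENT_REQUESTS_PER_DOMAIN = 1

# 自动限速:根据服务器响应情况动态调整延迟
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 2
AUTOTHROTTLE_MAX_DELAY = 30
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0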

常见陷阱

  1. 过度依赖单一技术

    • 应该组合使用多种技术
    • 建立多层防护体系
  2. 忽视行为模式

    • 机械化的请求模式容易被识别
    • 需要模拟真实用户行为
  3. 缺乏错误处理

    • 必须处理各种异常情况
    • 建立完善的重试机制

下一步学习建议

  1. 深入学习浏览器自动化

    • Selenium高级用法
    • Playwright和Puppeteer
    • 无头浏览器优化
  2. 机器学习在反爬虫中的应用

    • 验证码识别模型
    • 行为模式学习
    • 异常检测算法
  3. 分布式爬虫架构

    • 大规模爬虫系统设计
    • 负载均衡和容错
    • 监控和运维

练习题

  1. 基础练习

    • 实现一个简单的User-Agent轮换中间件
    • 编写代理验证和管理程序
    • 创建基本的会话管理系统
  2. 进阶练习

    • 实现滑块验证码自动识别
    • 开发JavaScript渲染中间件
    • 设计智能请求调度算法
  3. 综合项目

    • 构建完整的反爬虫对抗系统
    • 实现多网站适配的通用爬虫
    • 开发爬虫监控和管理平台

第7章完结! 🎉

下一章我们将学习Scrapy的部署与运维,包括生产环境部署、监控、日志管理等内容。