7.1 Overview of Ensemble Learning

7.1.1 The Basic Idea of Ensemble Learning

Ensemble learning is an important family of machine learning methods that combines multiple learners to achieve better predictive performance than any single learner could on its own.

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_classification, make_regression, load_iris
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.ensemble import (RandomForestClassifier, RandomForestRegressor,
                             BaggingClassifier, BaggingRegressor,
                             AdaBoostClassifier, AdaBoostRegressor,
                             GradientBoostingClassifier, GradientBoostingRegressor,
                             VotingClassifier, VotingRegressor,
                             ExtraTreesClassifier, ExtraTreesRegressor)
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.svm import SVC, SVR
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import (accuracy_score, classification_report, 
                           mean_squared_error, r2_score, confusion_matrix)
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings('ignore')

# Font configuration (SimHei retains CJK glyph support; harmless with English labels)
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

class EnsembleLearningIntro:
    def __init__(self):
        self.models = {}
        
    def ensemble_theory(self):
        """Introduce the theory of ensemble learning"""
        print("=== Ensemble Learning Theory ===")
        print("1. Basic idea:")
        print("   - The wisdom of crowds: several weak judges together can beat one expert")
        print("   - Combine multiple weak learners into a strong learner")
        print("   - Reduce the risk of overfitting and improve generalization")
        
        print("\n2. Advantages of ensembles:")
        print("   - Higher predictive accuracy")
        print("   - Greater model robustness")
        print("   - Less overfitting")
        print("   - Can handle different kinds of data")
        
        print("\n3. Conditions for an ensemble to work:")
        print("   - Each base learner must be reasonably accurate (better than random guessing)")
        print("   - The base learners must differ from one another (diversity)")
        
        print("\n4. Main families of methods:")
        print("   - Bagging: parallel training, reduces variance")
        print("   - Boosting: sequential training, reduces bias")
        print("   - Stacking: layered training with a meta-learner")
        
    def diversity_importance_demo(self):
        """Demonstrate why diversity matters"""
        print("\n=== Why Diversity Matters ===")
        
        # Build a dataset
        X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
                                 n_redundant=10, n_clusters_per_class=1, random_state=42)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # Base learners of different types
        base_models = {
            'Decision Tree': DecisionTreeClassifier(random_state=42),
            'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
            'Naive Bayes': GaussianNB(),
            'KNN': KNeighborsClassifier(),
            'SVM': SVC(probability=True, random_state=42)
        }
        
        # Evaluate each model on its own
        print("Individual model performance:")
        individual_scores = {}
        predictions = {}
        
        for name, model in base_models.items():
            if name == 'SVM':
                # SVM needs standardized features
                scaler = StandardScaler()
                X_train_scaled = scaler.fit_transform(X_train)
                X_test_scaled = scaler.transform(X_test)
                model.fit(X_train_scaled, y_train)
                pred = model.predict(X_test_scaled)
            else:
                model.fit(X_train, y_train)
                pred = model.predict(X_test)
            
            score = accuracy_score(y_test, pred)
            individual_scores[name] = score
            predictions[name] = pred
            print(f"  {name}: {score:.3f}")
        
        # Simple voting ensemble
        print("\nEnsemble performance:")
        
        # Hard voting: take the majority class across the base learners
        ensemble_pred_hard = np.array([predictions[name] for name in base_models.keys()])
        final_pred_hard = np.apply_along_axis(lambda x: np.bincount(x).argmax(), 
                                            axis=0, arr=ensemble_pred_hard)
        hard_voting_score = accuracy_score(y_test, final_pred_hard)
        print(f"  Hard-voting ensemble: {hard_voting_score:.3f}")
        
        # Visualize the results
        self.visualize_diversity_effect(individual_scores, hard_voting_score)
        
        return individual_scores, hard_voting_score
        
    def visualize_diversity_effect(self, individual_scores, ensemble_score):
        """Visualize the effect of diversity"""
        fig, axes = plt.subplots(1, 2, figsize=(15, 5))
        
        # 1. Individual models vs the ensemble
        models = list(individual_scores.keys()) + ['Hard-voting ensemble']
        scores = list(individual_scores.values()) + [ensemble_score]
        colors = ['skyblue'] * len(individual_scores) + ['red']
        
        bars = axes[0].bar(models, scores, color=colors, alpha=0.7)
        axes[0].set_title('Individual Models vs Ensemble')
        axes[0].set_ylabel('Accuracy')
        axes[0].tick_params(axis='x', rotation=45)
        axes[0].grid(True, alpha=0.3)
        
        # Annotate the bars with their scores
        for bar, score in zip(bars, scores):
            axes[0].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.005,
                        f'{score:.3f}', ha='center', va='bottom')
        
        # 2. Schematic of the ensemble advantage
        x = np.linspace(0, 1, 100)
        
        # Simulated error rates of individual learners
        error1 = 0.3 + 0.1 * np.sin(10 * x)
        error2 = 0.25 + 0.15 * np.cos(8 * x)
        error3 = 0.35 + 0.1 * np.sin(12 * x + np.pi/4)
        
        # Ensemble error rate (simplified illustration, not a real computation)
        ensemble_error = (error1 + error2 + error3) / 3 - 0.05
        
        axes[1].plot(x, error1, '--', label='Learner 1', alpha=0.7)
        axes[1].plot(x, error2, '--', label='Learner 2', alpha=0.7)
        axes[1].plot(x, error3, '--', label='Learner 3', alpha=0.7)
        axes[1].plot(x, ensemble_error, '-', linewidth=3, label='Ensemble', color='red')
        
        axes[1].set_title('How an Ensemble Lowers the Error Rate')
        axes[1].set_xlabel('Data complexity')
        axes[1].set_ylabel('Error rate')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def bias_variance_analysis(self):
        """Bias-variance analysis"""
        print("\n=== Bias-Variance Analysis ===")
        print("1. Bias:")
        print("   - The gap between the model's predictions and the true values")
        print("   - High bias → underfitting")
        print("   - Boosting mainly reduces bias")
        
        print("\n2. Variance:")
        print("   - How much the predictions change across different training sets")
        print("   - High variance → overfitting")
        print("   - Bagging mainly reduces variance")
        
        print("\n3. What ensembles contribute:")
        print("   - Bagging: averaging reduces variance")
        print("   - Boosting: a weighted combination reduces bias")
        print("   - Ideally: reduce both bias and variance")
        
        # Visualize the bias-variance trade-off
        self.visualize_bias_variance_tradeoff()
        
    def visualize_bias_variance_tradeoff(self):
        """Visualize the bias-variance trade-off"""
        fig, axes = plt.subplots(1, 3, figsize=(18, 5))
        
        # 1. Bias-variance decomposition
        complexity = np.linspace(0.1, 2, 50)
        bias_squared = 1 / (complexity + 0.1)  # bias falls as complexity grows
        variance = complexity ** 1.5  # variance rises with complexity
        noise = np.ones_like(complexity) * 0.1  # irreducible noise is constant
        total_error = bias_squared + variance + noise
        
        axes[0].plot(complexity, bias_squared, label='Bias²', linewidth=2)
        axes[0].plot(complexity, variance, label='Variance', linewidth=2)
        axes[0].plot(complexity, noise, label='Noise', linewidth=2)
        axes[0].plot(complexity, total_error, label='Total error', linewidth=3, linestyle='--')
        
        # Find the optimal complexity
        optimal_idx = np.argmin(total_error)
        axes[0].axvline(x=complexity[optimal_idx], color='red', linestyle=':', 
                       label=f'Optimal complexity: {complexity[optimal_idx]:.2f}')
        
        axes[0].set_xlabel('Model complexity')
        axes[0].set_ylabel('Error')
        axes[0].set_title('Bias-Variance Trade-off')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)
        
        # 2. Illustration of the Bagging effect
        x = np.linspace(0, 10, 100)
        true_function = np.sin(x)
        
        # Simulate several models' predictions
        np.random.seed(42)
        n_models = 5
        predictions = []
        
        for i in range(n_models):
            noise = np.random.normal(0, 0.3, len(x))
            pred = true_function + noise + 0.1 * np.random.randn()
            predictions.append(pred)
            axes[1].plot(x, pred, '--', alpha=0.5, color='blue')
        
        # Bagging: average the predictions
        bagging_pred = np.mean(predictions, axis=0)
        
        axes[1].plot(x, true_function, 'k-', linewidth=3, label='True function')
        axes[1].plot(x, bagging_pred, 'r-', linewidth=3, label='Bagging average')
        axes[1].set_title('Bagging Reduces Variance')
        axes[1].set_xlabel('x')
        axes[1].set_ylabel('y')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        
        # 3. Illustration of the Boosting effect
        # Simulate a (highly simplified) boosting process
        residuals = true_function.copy()
        boosting_pred = np.zeros_like(x)
        
        for i in range(3):
            # Simplified weak learner (a noisy linear fit of the residuals)
            weak_pred = 0.3 * residuals + 0.1 * np.random.randn(len(x))
            boosting_pred += weak_pred
            residuals -= weak_pred
            
            axes[2].plot(x, boosting_pred, '--', alpha=0.7, 
                        label=f'Round {i+1}' if i < 2 else 'Final prediction')
        
        axes[2].plot(x, true_function, 'k-', linewidth=3, label='True function')
        axes[2].set_title('Boosting Reduces Bias')
        axes[2].set_xlabel('x')
        axes[2].set_ylabel('y')
        axes[2].legend()
        axes[2].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()

# Demonstrate the ensemble learning overview
print("=== Ensemble Learning Overview Demo ===")
ensemble_intro = EnsembleLearningIntro()
ensemble_intro.ensemble_theory()
individual_scores, ensemble_score = ensemble_intro.diversity_importance_demo()
ensemble_intro.bias_variance_analysis()
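
The imports above include VotingClassifier, which packages the manual hard vote from the demo into a single estimator. A minimal sketch, reusing the demo's dataset settings (the snippet itself is illustrative, not part of the original demo); voting='soft' averages predicted class probabilities, while voting='hard' reproduces the majority vote:

X_v, y_v = make_classification(n_samples=1000, n_features=20, n_informative=10,
                               n_redundant=10, n_clusters_per_class=1, random_state=42)
Xv_train, Xv_test, yv_train, yv_test = train_test_split(X_v, y_v, test_size=0.3, random_state=42)

# Combine three diverse base learners into one soft-voting ensemble
voting_clf = VotingClassifier(
    estimators=[('dt', DecisionTreeClassifier(random_state=42)),
                ('lr', LogisticRegression(max_iter=1000, random_state=42)),
                ('nb', GaussianNB())],
    voting='soft')
voting_clf.fit(Xv_train, yv_train)
print(f"Soft-voting ensemble accuracy: {voting_clf.score(Xv_test, yv_test):.3f}")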

7.2 Bagging Methods

7.2.1 Fundamentals of Bagging

Bagging (Bootstrap Aggregating) is a parallel ensemble method: it trains multiple models on bootstrap samples of the data and then aggregates their predictions (majority vote for classification, averaging for regression).

class BaggingDemo:
    def __init__(self):
        self.models = {}
        
    def bagging_theory(self):
        """Introduce the theory behind Bagging"""
        print("=== Bagging Theory ===")
        print("1. Basic idea:")
        print("   - Bootstrap sampling: draw with replacement")
        print("   - Aggregating: combine the models' predictions")
        print("   - Training is parallel and the learners are largely independent")
        
        print("\n2. Algorithm outline:")
        print("   - Draw bootstrap samples from the original training set")
        print("   - Train one base learner on each sample")
        print("   - Vote for classification, average for regression")
        
        print("\n3. Main advantages:")
        print("   - Reduces variance and overfitting")
        print("   - Improves model stability")
        print("   - Training can run in parallel")
        print("   - Robust to noisy data")
        
        print("\n4. When to use it:")
        print("   - The base learner overfits easily (e.g., decision trees)")
        print("   - The dataset is small")
        print("   - You need a more stable model")
        
    def bootstrap_sampling_demo(self):
        """Demonstrate bootstrap sampling"""
        print("\n=== Bootstrap Sampling Demo ===")
        
        # Original dataset
        original_data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        print(f"Original dataset: {original_data}")
        
        # Draw several bootstrap samples
        np.random.seed(42)  # fix the seed so the demo is reproducible
        n_samples = len(original_data)
        n_bootstrap = 5
        
        bootstrap_samples = []
        for i in range(n_bootstrap):
            # Sampling with replacement
            indices = np.random.choice(n_samples, size=n_samples, replace=True)
            sample = original_data[indices]
            bootstrap_samples.append(sample)
            print(f"Bootstrap sample {i+1}: {sample}")
        
        # Analyze the sampling behavior
        self.analyze_bootstrap_properties(original_data, bootstrap_samples)
        
        return bootstrap_samples
        
    def analyze_bootstrap_properties(self, original_data, bootstrap_samples):
        """Analyze the properties of bootstrap sampling"""
        print("\n=== Properties of Bootstrap Sampling ===")
        
        n_samples = len(original_data)
        
        # Probability that a given point is never drawn into ONE bootstrap sample:
        # (1 - 1/n)^n, which tends to 1/e ≈ 0.368 as n grows
        prob_not_selected = (1 - 1/n_samples) ** n_samples
        prob_selected = 1 - prob_not_selected
        
        print(f"Theoretical probability that a point appears in one bootstrap sample: {prob_selected:.3f}")
        print(f"So each bootstrap sample contains about {prob_selected * n_samples:.1f} distinct points")
        
        # Empirical check, per bootstrap sample (this is what the theory above predicts)
        for i, sample in enumerate(bootstrap_samples):
            n_unique = len(np.unique(sample))
            print(f"Bootstrap sample {i+1}: {n_unique} distinct points")
        
        # Across all samples combined, almost every point shows up at least once
        all_selected = np.unique(np.concatenate(bootstrap_samples))
        never_selected = set(original_data.tolist()) - set(all_selected.tolist())
        print(f"Points never drawn in any of the {len(bootstrap_samples)} samples: {never_selected or 'none'}")
        
        # Visualize the bootstrap samples
        self.visualize_bootstrap_sampling(original_data, bootstrap_samples)
        
    def visualize_bootstrap_sampling(self, original_data, bootstrap_samples):
        """Visualize bootstrap sampling"""
        fig, axes = plt.subplots(2, 3, figsize=(18, 10))
        
        # 1. Original data
        axes[0, 0].bar(range(len(original_data)), original_data, alpha=0.7)
        axes[0, 0].set_title('Original dataset')
        axes[0, 0].set_xlabel('Index')
        axes[0, 0].set_ylabel('Value')
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2-6. Distribution of each bootstrap sample
        for i, sample in enumerate(bootstrap_samples):
            row = (i + 1) // 3
            col = (i + 1) % 3
            
            # Count how often each value was drawn
            unique_vals, counts = np.unique(sample, return_counts=True)
            
            axes[row, col].bar(unique_vals, counts, alpha=0.7)
            axes[row, col].set_title(f'Bootstrap sample {i+1}')
            axes[row, col].set_xlabel('Value')
            axes[row, col].set_ylabel('Count')
            axes[row, col].grid(True, alpha=0.3)
            axes[row, col].set_xlim(0, 11)
        
        plt.tight_layout()
        plt.show()
        
    def bagging_classifier_demo(self):
        """Bagging classifier demo"""
        print("\n=== Bagging Classifier Demo ===")
        
        # Build a dataset
        X, y = make_classification(n_samples=1000, n_features=2, n_redundant=0,
                                 n_informative=2, n_clusters_per_class=1, random_state=42)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # Base learner: a decision tree
        base_classifier = DecisionTreeClassifier(random_state=42)
        
        # Bagging classifier (`estimator=` was called `base_estimator=` before scikit-learn 1.2)
        bagging_classifier = BaggingClassifier(
            estimator=base_classifier,
            n_estimators=10,
            random_state=42,
            bootstrap=True,
            n_jobs=-1
        )
        
        # Train both models
        base_classifier.fit(X_train, y_train)
        bagging_classifier.fit(X_train, y_train)
        
        # Predict
        base_pred = base_classifier.predict(X_test)
        bagging_pred = bagging_classifier.predict(X_test)
        
        # Evaluate
        base_score = accuracy_score(y_test, base_pred)
        bagging_score = accuracy_score(y_test, bagging_pred)
        
        print(f"Single decision tree accuracy: {base_score:.3f}")
        print(f"Bagging accuracy: {bagging_score:.3f}")
        print(f"Improvement: {bagging_score - base_score:.3f}")
        
        # Visualize the decision boundaries
        self.visualize_bagging_decision_boundary(X_train, y_train, X_test, y_test,
                                               base_classifier, bagging_classifier)
        
        return base_score, bagging_score
        
    def visualize_bagging_decision_boundary(self, X_train, y_train, X_test, y_test,
                                          base_classifier, bagging_classifier):
        """Visualize the Bagging decision boundary"""
        fig, axes = plt.subplots(1, 3, figsize=(18, 5))
        
        # Build a mesh grid
        h = 0.02
        x_min, x_max = X_train[:, 0].min() - 1, X_train[:, 0].max() + 1
        y_min, y_max = X_train[:, 1].min() - 1, X_train[:, 1].max() + 1
        xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                           np.arange(y_min, y_max, h))
        
        # 1. Single decision tree
        Z1 = base_classifier.predict(np.c_[xx.ravel(), yy.ravel()])
        Z1 = Z1.reshape(xx.shape)
        
        axes[0].contourf(xx, yy, Z1, alpha=0.4, cmap=plt.cm.RdYlBu)
        axes[0].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.RdYlBu)
        axes[0].set_title('Single Decision Tree Boundary')
        axes[0].set_xlabel('Feature 1')
        axes[0].set_ylabel('Feature 2')
        
        # 2. Bagging ensemble
        Z2 = bagging_classifier.predict(np.c_[xx.ravel(), yy.ravel()])
        Z2 = Z2.reshape(xx.shape)
        
        axes[1].contourf(xx, yy, Z2, alpha=0.4, cmap=plt.cm.RdYlBu)
        axes[1].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.RdYlBu)
        axes[1].set_title('Bagging Boundary')
        axes[1].set_xlabel('Feature 1')
        axes[1].set_ylabel('Feature 2')
        
        # 3. Diversity of the individual learners
        # Show the decision boundaries of the first five base learners
        colors = ['red', 'blue', 'green', 'orange', 'purple']
        
        for i, estimator in enumerate(bagging_classifier.estimators_[:5]):
            Z_individual = estimator.predict(np.c_[xx.ravel(), yy.ravel()])
            Z_individual = Z_individual.reshape(xx.shape)
            axes[2].contour(xx, yy, Z_individual, colors=[colors[i]], alpha=0.6, linewidths=1)
        
        axes[2].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.RdYlBu)
        axes[2].set_title('Individual Learner Boundaries')
        axes[2].set_xlabel('Feature 1')
        axes[2].set_ylabel('Feature 2')
        
        plt.tight_layout()
        plt.show()
        
    def bagging_regressor_demo(self):
        """Bagging regressor demo"""
        print("\n=== Bagging Regressor Demo ===")
        
        # Build a regression dataset
        X, y = make_regression(n_samples=300, n_features=1, noise=10, random_state=42)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # Base learner: a decision tree regressor
        base_regressor = DecisionTreeRegressor(random_state=42, max_depth=5)
        
        # Bagging regressor (`estimator=` was called `base_estimator=` before scikit-learn 1.2)
        bagging_regressor = BaggingRegressor(
            estimator=base_regressor,
            n_estimators=10,
            random_state=42,
            bootstrap=True,
            n_jobs=-1
        )
        
        # Train both models
        base_regressor.fit(X_train, y_train)
        bagging_regressor.fit(X_train, y_train)
        
        # Predict
        base_pred = base_regressor.predict(X_test)
        bagging_pred = bagging_regressor.predict(X_test)
        
        # Evaluate
        base_mse = mean_squared_error(y_test, base_pred)
        bagging_mse = mean_squared_error(y_test, bagging_pred)
        base_r2 = r2_score(y_test, base_pred)
        bagging_r2 = r2_score(y_test, bagging_pred)
        
        print(f"Single decision tree - MSE: {base_mse:.2f}, R²: {base_r2:.3f}")
        print(f"Bagging - MSE: {bagging_mse:.2f}, R²: {bagging_r2:.3f}")
        print(f"MSE improvement: {base_mse - bagging_mse:.2f}")
        
        # Visualize the regression results
        self.visualize_bagging_regression(X_train, y_train, X_test, y_test,
                                        base_regressor, bagging_regressor)
        
        return base_mse, bagging_mse
        
    def visualize_bagging_regression(self, X_train, y_train, X_test, y_test,
                                   base_regressor, bagging_regressor):
        """Visualize the Bagging regression results"""
        fig, axes = plt.subplots(1, 3, figsize=(18, 5))
        
        # Range of inputs for plotting the fitted curves
        X_range = np.linspace(X_train.min(), X_train.max(), 300).reshape(-1, 1)
        
        # 1. Single decision tree
        base_pred_range = base_regressor.predict(X_range)
        
        axes[0].scatter(X_train, y_train, alpha=0.6, label='Training data')
        axes[0].scatter(X_test, y_test, alpha=0.6, color='red', label='Test data')
        axes[0].plot(X_range, base_pred_range, color='green', linewidth=2, label='Prediction')
        axes[0].set_title('Single Decision Tree Regression')
        axes[0].set_xlabel('X')
        axes[0].set_ylabel('y')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)
        
        # 2. Bagging regression
        bagging_pred_range = bagging_regressor.predict(X_range)
        
        axes[1].scatter(X_train, y_train, alpha=0.6, label='Training data')
        axes[1].scatter(X_test, y_test, alpha=0.6, color='red', label='Test data')
        axes[1].plot(X_range, bagging_pred_range, color='green', linewidth=2, label='Prediction')
        axes[1].set_title('Bagging Regression')
        axes[1].set_xlabel('X')
        axes[1].set_ylabel('y')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        
        # 3. Individual learner predictions
        axes[2].scatter(X_train, y_train, alpha=0.6, label='Training data')
        
        # Show the first five base learners' predictions
        colors = ['red', 'blue', 'green', 'orange', 'purple']
        for i, estimator in enumerate(bagging_regressor.estimators_[:5]):
            individual_pred = estimator.predict(X_range)
            axes[2].plot(X_range, individual_pred, color=colors[i], alpha=0.6, 
                        linewidth=1, label=f'Learner {i+1}')
        
        # Bagging average
        axes[2].plot(X_range, bagging_pred_range, color='black', linewidth=3, 
                    label='Bagging average')
        
        axes[2].set_title('Individual Learners vs Bagging Average')
        axes[2].set_xlabel('X')
        axes[2].set_ylabel('y')
        axes[2].legend()
        axes[2].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def bagging_parameters_analysis(self):
        """Analyze Bagging hyperparameters"""
        print("\n=== Bagging Parameter Analysis ===")
        
        # Build a dataset
        X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
                                 n_redundant=10, n_clusters_per_class=1, random_state=42)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # 1. Effect of the number of base learners
        n_estimators_range = [1, 5, 10, 20, 50, 100, 200]
        scores_vs_n_estimators = []
        
        for n_est in n_estimators_range:
            bagging = BaggingClassifier(
                estimator=DecisionTreeClassifier(random_state=42),
                n_estimators=n_est,
                random_state=42
            )
            bagging.fit(X_train, y_train)
            score = bagging.score(X_test, y_test)
            scores_vs_n_estimators.append(score)
            print(f"n_estimators={n_est:3d}: accuracy={score:.3f}")
        
        # 2. Effect of the sampling fraction
        print("\nEffect of the sampling fraction:")
        max_samples_range = [0.1, 0.3, 0.5, 0.7, 0.9, 1.0]
        scores_vs_max_samples = []
        
        for max_samp in max_samples_range:
            bagging = BaggingClassifier(
                estimator=DecisionTreeClassifier(random_state=42),
                n_estimators=50,
                max_samples=max_samp,
                random_state=42
            )
            bagging.fit(X_train, y_train)
            score = bagging.score(X_test, y_test)
            scores_vs_max_samples.append(score)
            print(f"max_samples={max_samp:.1f}: accuracy={score:.3f}")
        
        # Visualize the parameter effects
        self.visualize_bagging_parameters(n_estimators_range, scores_vs_n_estimators,
                                        max_samples_range, scores_vs_max_samples)
        
    def visualize_bagging_parameters(self, n_estimators_range, scores_vs_n_estimators,
                                   max_samples_range, scores_vs_max_samples):
        """Visualize the effect of Bagging parameters"""
        fig, axes = plt.subplots(1, 2, figsize=(15, 5))
        
        # 1. Number of base learners
        axes[0].plot(n_estimators_range, scores_vs_n_estimators, 'o-', linewidth=2, markersize=8)
        axes[0].set_xlabel('Number of base learners')
        axes[0].set_ylabel('Accuracy')
        axes[0].set_title('Effect of the Number of Base Learners')
        axes[0].grid(True, alpha=0.3)
        axes[0].set_xscale('log')
        
        # Annotate points with their values
        for x, y in zip(n_estimators_range, scores_vs_n_estimators):
            axes[0].annotate(f'{y:.3f}', (x, y), textcoords="offset points", 
                           xytext=(0,10), ha='center')
        
        # 2. Sampling fraction
        axes[1].plot(max_samples_range, scores_vs_max_samples, 'o-', linewidth=2, markersize=8)
        axes[1].set_xlabel('Sampling fraction')
        axes[1].set_ylabel('Accuracy')
        axes[1].set_title('Effect of the Sampling Fraction')
        axes[1].grid(True, alpha=0.3)
        
        # Annotate points with their values
        for x, y in zip(max_samples_range, scores_vs_max_samples):
            axes[1].annotate(f'{y:.3f}', (x, y), textcoords="offset points", 
                           xytext=(0,10), ha='center')
        
        plt.tight_layout()
        plt.show()

# Demonstrate Bagging
print("=== Bagging Demo ===")
bagging_demo = BaggingDemo()
bagging_demo.bagging_theory()
bootstrap_samples = bagging_demo.bootstrap_sampling_demo()
base_score, bagging_score = bagging_demo.bagging_classifier_demo()
base_mse, bagging_mse = bagging_demo.bagging_regressor_demo()
bagging_demo.bagging_parameters_analysis()
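
Because each bootstrap sample misses roughly (1 - 1/n)^n ≈ 36.8% of the training points, Bagging comes with a free validation signal: out-of-bag (OOB) evaluation scores every point using only the learners that never saw it. A minimal sketch, assuming scikit-learn ≥ 1.2 (where the parameter is named estimator) and an illustrative dataset:

X_o, y_o = make_classification(n_samples=1000, n_features=20, n_informative=10,
                               n_redundant=10, random_state=42)
Xo_train, Xo_test, yo_train, yo_test = train_test_split(X_o, y_o, test_size=0.3, random_state=42)

# oob_score=True evaluates each training point with the trees whose bootstrap samples left it out
oob_bagging = BaggingClassifier(estimator=DecisionTreeClassifier(random_state=42),
                                n_estimators=100, oob_score=True, random_state=42)
oob_bagging.fit(Xo_train, yo_train)
print(f"OOB accuracy estimate: {oob_bagging.oob_score_:.3f}")
print(f"Held-out test accuracy: {oob_bagging.score(Xo_test, yo_test):.3f}")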

7.3 Random Forests

7.3.1 How Random Forests Work

A random forest extends Bagging by adding random feature selection during training, which further increases the diversity of the ensemble.

class RandomForestDemo:
    def __init__(self):
        self.models = {}
        
    def random_forest_theory(self):
        """Introduce random forest theory"""
        print("=== Random Forest Theory ===")
        print("1. Basic idea:")
        print("   - Bagging + random feature selection")
        print("   - At each node split, consider only a random subset of features")
        print("   - This further increases ensemble diversity")
        
        print("\n2. Algorithm outline:")
        print("   - Draw a bootstrap sample for each tree")
        print("   - At each node, randomly pick m features (m < total number of features)")
        print("   - Find the best split among those m features")
        print("   - Grow many such trees")
        print("   - Vote (classification) or average (regression) for the final prediction")
        
        print("\n3. Key parameters:")
        print("   - n_estimators: number of trees")
        print("   - max_features: features considered per split")
        print("   - max_depth: maximum tree depth")
        print("   - min_samples_split: minimum samples required to split a node")
        
        print("\n4. Main advantages:")
        print("   - High accuracy and strong generalization")
        print("   - Fairly insensitive to missing values")
        print("   - Scales to large datasets")
        print("   - Provides feature importances")
        print("   - Resistant to overfitting")
        
    def feature_randomness_demo(self):
        """Demonstrate feature randomness"""
        print("\n=== Feature Randomness Demo ===")
        
        # Build a higher-dimensional dataset
        X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
                                 n_redundant=5, n_clusters_per_class=1, random_state=42)
        
        feature_names = [f'feature_{i+1}' for i in range(X.shape[1])]
        
        # Compare different max_features settings
        max_features_options = ['sqrt', 'log2', None, 0.5]
        
        print("Comparison of feature-selection strategies:")
        
        for max_feat in max_features_options:
            rf = RandomForestClassifier(
                n_estimators=100,
                max_features=max_feat,
                random_state=42
            )
            
            # Evaluate with cross-validation
            scores = cross_val_score(rf, X, y, cv=5)
            
            if max_feat == 'sqrt':
                n_features_used = int(np.sqrt(X.shape[1]))
                desc = f"sqrt({X.shape[1]}) = {n_features_used}"
            elif max_feat == 'log2':
                n_features_used = int(np.log2(X.shape[1]))
                desc = f"log2({X.shape[1]}) = {n_features_used}"
            elif max_feat is None:
                n_features_used = X.shape[1]
                desc = f"all features = {n_features_used}"
            else:
                n_features_used = int(max_feat * X.shape[1])
                desc = f"{max_feat} × {X.shape[1]} = {n_features_used}"
            
            print(f"  {desc:20s}: accuracy = {scores.mean():.3f} ± {scores.std():.3f}")
        
        # Visualize the feature importances
        self.visualize_feature_importance(X, y, feature_names)
        
    def visualize_feature_importance(self, X, y, feature_names):
        """Visualize feature importances"""
        # Train a random forest
        rf = RandomForestClassifier(n_estimators=100, random_state=42)
        rf.fit(X, y)
        
        # Extract the feature importances
        importances = rf.feature_importances_
        indices = np.argsort(importances)[::-1]
        
        # Plot
        fig, axes = plt.subplots(1, 2, figsize=(15, 6))
        
        # 1. Bar chart of feature importances
        axes[0].bar(range(len(importances)), importances[indices])
        axes[0].set_title('Random Forest Feature Importances')
        axes[0].set_xlabel('Feature rank')
        axes[0].set_ylabel('Importance')
        axes[0].set_xticks(range(len(importances)))
        axes[0].set_xticklabels([feature_names[i] for i in indices], rotation=45)
        
        # 2. Cumulative importance
        cumulative_importance = np.cumsum(importances[indices])
        axes[1].plot(range(1, len(importances)+1), cumulative_importance, 'o-')
        axes[1].axhline(y=0.8, color='r', linestyle='--', label='80% importance')
        axes[1].axhline(y=0.9, color='orange', linestyle='--', label='90% importance')
        axes[1].set_title('Cumulative Feature Importance')
        axes[1].set_xlabel('Number of features')
        axes[1].set_ylabel('Cumulative importance')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        # Print the most important features
        print(f"\nTop 5 features:")
        for i in range(5):
            idx = indices[i]
            print(f"  {feature_names[idx]}: {importances[idx]:.3f}")
        
    def random_forest_vs_decision_tree(self):
        """Compare a random forest with a single decision tree"""
        print("\n=== Random Forest vs Decision Tree ===")
        
        # Build a dataset
        X, y = make_classification(n_samples=1000, n_features=2, n_redundant=0,
                                 n_informative=2, n_clusters_per_class=1, 
                                 n_classes=3, random_state=42)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # Define the models
        models = {
            'Single Decision Tree': DecisionTreeClassifier(random_state=42),
            'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42)
        }
        
        # Train and evaluate
        results = {}
        for name, model in models.items():
            model.fit(X_train, y_train)
            train_score = model.score(X_train, y_train)
            test_score = model.score(X_test, y_test)
            
            results[name] = {
                'model': model,
                'train_score': train_score,
                'test_score': test_score
            }
            
            print(f"{name}:")
            print(f"  Train accuracy: {train_score:.3f}")
            print(f"  Test accuracy: {test_score:.3f}")
            print(f"  Overfitting gap: {train_score - test_score:.3f}")
        
        # Visualize the comparison
        self.visualize_rf_vs_dt_comparison(X_train, y_train, X_test, y_test, results)
        
        return results
        
    def visualize_rf_vs_dt_comparison(self, X_train, y_train, X_test, y_test, results):
        """Visualize the random forest vs decision tree comparison"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        
        # Build a mesh grid
        h = 0.02
        x_min, x_max = X_train[:, 0].min() - 1, X_train[:, 0].max() + 1
        y_min, y_max = X_train[:, 1].min() - 1, X_train[:, 1].max() + 1
        xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                           np.arange(y_min, y_max, h))
        
        # Decision tree
        dt_model = results['Single Decision Tree']['model']
        Z_dt = dt_model.predict(np.c_[xx.ravel(), yy.ravel()])
        Z_dt = Z_dt.reshape(xx.shape)
        
        axes[0, 0].contourf(xx, yy, Z_dt, alpha=0.4, cmap=plt.cm.Set3)
        axes[0, 0].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.Set3)
        axes[0, 0].set_title('Decision Tree Boundary (train)')
        axes[0, 0].set_xlabel('Feature 1')
        axes[0, 0].set_ylabel('Feature 2')
        
        axes[0, 1].contourf(xx, yy, Z_dt, alpha=0.4, cmap=plt.cm.Set3)
        axes[0, 1].scatter(X_test[:, 0], X_test[:, 1], c=y_test, cmap=plt.cm.Set3)
        axes[0, 1].set_title('Decision Tree Boundary (test)')
        axes[0, 1].set_xlabel('Feature 1')
        axes[0, 1].set_ylabel('Feature 2')
        
        # Random forest
        rf_model = results['Random Forest']['model']
        Z_rf = rf_model.predict(np.c_[xx.ravel(), yy.ravel()])
        Z_rf = Z_rf.reshape(xx.shape)
        
        axes[1, 0].contourf(xx, yy, Z_rf, alpha=0.4, cmap=plt.cm.Set3)
        axes[1, 0].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.Set3)
        axes[1, 0].set_title('Random Forest Boundary (train)')
        axes[1, 0].set_xlabel('Feature 1')
        axes[1, 0].set_ylabel('Feature 2')
        
        axes[1, 1].contourf(xx, yy, Z_rf, alpha=0.4, cmap=plt.cm.Set3)
        axes[1, 1].scatter(X_test[:, 0], X_test[:, 1], c=y_test, cmap=plt.cm.Set3)
        axes[1, 1].set_title('Random Forest Boundary (test)')
        axes[1, 1].set_xlabel('Feature 1')
        axes[1, 1].set_ylabel('Feature 2')
        
        plt.tight_layout()
        plt.show()
        
    def random_forest_regression_demo(self):
        """Random forest regression demo"""
        print("\n=== Random Forest Regression Demo ===")
        
        # Build a nonlinear regression dataset
        np.random.seed(42)
        X = np.linspace(0, 10, 300).reshape(-1, 1)
        y = np.sin(X).ravel() + np.random.normal(0, 0.3, X.shape[0])
        
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # Models to compare
        models = {
            'Decision Tree Regressor': DecisionTreeRegressor(random_state=42),
            'Random Forest Regressor': RandomForestRegressor(n_estimators=100, random_state=42)
        }
        
        # Train and evaluate
        results = {}
        for name, model in models.items():
            model.fit(X_train, y_train)
            
            train_pred = model.predict(X_train)
            test_pred = model.predict(X_test)
            
            train_mse = mean_squared_error(y_train, train_pred)
            test_mse = mean_squared_error(y_test, test_pred)
            train_r2 = r2_score(y_train, train_pred)
            test_r2 = r2_score(y_test, test_pred)
            
            results[name] = {
                'model': model,
                'train_mse': train_mse,
                'test_mse': test_mse,
                'train_r2': train_r2,
                'test_r2': test_r2
            }
            
            print(f"{name}:")
            print(f"  Train MSE: {train_mse:.3f}, R²: {train_r2:.3f}")
            print(f"  Test MSE: {test_mse:.3f}, R²: {test_r2:.3f}")
        
        # Visualize the regression results
        self.visualize_rf_regression(X_train, y_train, X_test, y_test, results)
        
        return results
        
    def visualize_rf_regression(self, X_train, y_train, X_test, y_test, results):
        """Visualize the random forest regression results"""
        fig, axes = plt.subplots(1, 3, figsize=(18, 5))
        
        # Range of inputs for plotting the fitted curves
        X_range = np.linspace(X_train.min(), X_train.max(), 300).reshape(-1, 1)
        
        # 1. Decision tree regression
        dt_model = results['Decision Tree Regressor']['model']
        dt_pred = dt_model.predict(X_range)
        
        axes[0].scatter(X_train, y_train, alpha=0.6, label='Training data')
        axes[0].scatter(X_test, y_test, alpha=0.6, color='red', label='Test data')
        axes[0].plot(X_range, dt_pred, color='green', linewidth=2, label='Prediction')
        axes[0].set_title('Decision Tree Regression')
        axes[0].set_xlabel('X')
        axes[0].set_ylabel('y')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)
        
        # 2. Random forest regression
        rf_model = results['Random Forest Regressor']['model']
        rf_pred = rf_model.predict(X_range)
        
        axes[1].scatter(X_train, y_train, alpha=0.6, label='Training data')
        axes[1].scatter(X_test, y_test, alpha=0.6, color='red', label='Test data')
        axes[1].plot(X_range, rf_pred, color='green', linewidth=2, label='Prediction')
        axes[1].set_title('Random Forest Regression')
        axes[1].set_xlabel('X')
        axes[1].set_ylabel('y')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        
        # 3. Individual trees of the forest
        axes[2].scatter(X_train, y_train, alpha=0.6, label='Training data')
        
        # Show the first ten trees' predictions
        colors = plt.cm.tab10(np.linspace(0, 1, 10))
        for i, tree in enumerate(rf_model.estimators_[:10]):
            tree_pred = tree.predict(X_range)
            axes[2].plot(X_range, tree_pred, color=colors[i], alpha=0.3, linewidth=1)
        
        # Random forest average
        axes[2].plot(X_range, rf_pred, color='red', linewidth=3, label='Forest average')
        axes[2].set_title('Individual Trees vs Forest Average')
        axes[2].set_xlabel('X')
        axes[2].set_ylabel('y')
        axes[2].legend()
        axes[2].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def random_forest_hyperparameter_tuning(self):
        """Random forest hyperparameter tuning"""
        print("\n=== Random Forest Hyperparameter Tuning ===")
        
        # Build a dataset
        X, y = make_classification(n_samples=1000, n_features=20, n_informative=15,
                                 n_redundant=5, n_clusters_per_class=1, random_state=42)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # Parameter grid
        param_grid = {
            'n_estimators': [50, 100, 200],
            'max_depth': [None, 10, 20],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4],
            'max_features': ['sqrt', 'log2']
        }
        
        # Grid search
        rf = RandomForestClassifier(random_state=42)
        grid_search = GridSearchCV(rf, param_grid, cv=5, scoring='accuracy', n_jobs=-1)
        
        print("Running grid search...")
        grid_search.fit(X_train, y_train)
        
        print(f"Best parameters: {grid_search.best_params_}")
        print(f"Best cross-validation score: {grid_search.best_score_:.3f}")
        
        # Performance of the best model on the test set
        best_rf = grid_search.best_estimator_
        test_score = best_rf.score(X_test, y_test)
        print(f"Test accuracy: {test_score:.3f}")
        
        # Analyze the parameter effects
        self.analyze_parameter_importance(grid_search)
        
        return grid_search.best_estimator_
        
    def analyze_parameter_importance(self, grid_search):
        """Analyze the effect of each parameter"""
        print("\n=== Parameter Effect Analysis ===")
        
        # Collect all grid-search results
        results_df = pd.DataFrame(grid_search.cv_results_)
        
        # Analyze the influence of each parameter
        params_to_analyze = ['n_estimators', 'max_depth', 'min_samples_split', 'max_features']
        
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        axes = axes.ravel()
        
        for i, param in enumerate(params_to_analyze):
            param_col = f'param_{param}'
            if param_col in results_df.columns:
                # Group by parameter value and average the scores
                # (cast to str so None values, e.g. max_depth=None, are kept by groupby)
                param_scores = results_df.groupby(results_df[param_col].astype(str))['mean_test_score'].agg(['mean', 'std'])
                
                x_values = param_scores.index
                y_values = param_scores['mean']
                y_errors = param_scores['std']
                
                axes[i].errorbar(range(len(x_values)), y_values, yerr=y_errors, 
                               marker='o', capsize=5, capthick=2)
                axes[i].set_xticks(range(len(x_values)))
                axes[i].set_xticklabels(x_values, rotation=45)
                axes[i].set_title(f'Effect of {param}')
                axes[i].set_ylabel('Cross-validation score')
                axes[i].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()

# Demonstrate random forests
print("=== Random Forest Demo ===")
rf_demo = RandomForestDemo()
rf_demo.random_forest_theory()
rf_demo.feature_randomness_demo()
rf_vs_dt_results = rf_demo.random_forest_vs_decision_tree()
rf_regression_results = rf_demo.random_forest_regression_demo()
best_rf = rf_demo.random_forest_hyperparameter_tuning()
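
The imports also include ExtraTreesClassifier. Extremely randomized trees push the randomization one step further: besides random feature subsets, the split thresholds themselves are drawn at random, typically trading a little bias for lower variance. A brief comparison sketch on an illustrative dataset:

X_e, y_e = make_classification(n_samples=1000, n_features=20, n_informative=10,
                               n_redundant=5, random_state=42)

for et_name, et_model in [('Random Forest', RandomForestClassifier(n_estimators=100, random_state=42)),
                          ('Extra Trees', ExtraTreesClassifier(n_estimators=100, random_state=42))]:
    # 5-fold cross-validation keeps the comparison independent of any one split
    et_scores = cross_val_score(et_model, X_e, y_e, cv=5)
    print(f"{et_name}: accuracy = {et_scores.mean():.3f} ± {et_scores.std():.3f}")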

7.4 Boosting Methods

7.4.1 Fundamentals of Boosting

Boosting is a sequential ensemble method: each new learner is trained to correct the mistakes of the learners before it, gradually improving overall performance.

class BoostingDemo:
    def __init__(self):
        self.models = {}
        
    def boosting_theory(self):
        """Introduce boosting theory"""
        print("=== Boosting Theory ===")
        print("1. Basic idea:")
        print("   - Train sequentially; later learners focus on earlier learners' mistakes")
        print("   - Combine weak learners with weights into a strong learner")
        print("   - Mainly reduces bias by increasing model capacity")
        
        print("\n2. Algorithm outline:")
        print("   - Initialize sample weights (uniform)")
        print("   - Train a weak learner")
        print("   - Compute the learner's weight from its error rate")
        print("   - Update sample weights (upweight misclassified samples)")
        print("   - Repeat for the chosen number of rounds")
        print("   - Combine all weak learners with their weights")
        
        print("\n3. Main advantages:")
        print("   - Can boost weak learners into a strong learner")
        print("   - Solid theoretical foundations")
        print("   - Effective at reducing bias")
        
        print("\n4. Main drawbacks:")
        print("   - Sensitive to noise and outliers")
        print("   - Can overfit")
        print("   - Training is sequential and therefore slower")
        
        print("\n5. Common boosting algorithms:")
        print("   - AdaBoost: adaptive boosting")
        print("   - Gradient Boosting: gradient-based boosting")
        print("   - XGBoost: extreme gradient boosting")
        print("   - LightGBM: lightweight gradient boosting")
        
    def boosting_process_visualization(self):
        """Visualize the boosting process"""
        print("\n=== Boosting Process Visualization ===")
        
        # A simple binary-classification dataset
        np.random.seed(42)
        X = np.random.randn(100, 2)
        y = (X[:, 0] + X[:, 1] > 0).astype(int)
        
        # Flip a few labels to add noise
        noise_indices = np.random.choice(100, 10, replace=False)
        y[noise_indices] = 1 - y[noise_indices]
        
        # Simulate the AdaBoost process
        n_estimators = 4
        sample_weights = np.ones(len(X)) / len(X)
        
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        axes = axes.ravel()
        
        estimator_weights = []
        
        for i in range(n_estimators):
            # Simplified weak learner (threshold on a single feature)
            feature_idx = i % 2
            threshold = np.median(X[:, feature_idx])
            
            # Predict
            predictions = (X[:, feature_idx] > threshold).astype(int)
            
            # Weighted error rate
            errors = (predictions != y)
            weighted_error = np.sum(sample_weights * errors)
            
            # Learner weight
            if weighted_error == 0:
                alpha = 1.0
            else:
                alpha = 0.5 * np.log((1 - weighted_error) / weighted_error)
            
            estimator_weights.append(alpha)
            
            # Update sample weights (map the 0/1 labels and predictions to ±1 first)
            sample_weights *= np.exp(-alpha * (2 * y - 1) * (2 * predictions - 1))
            sample_weights /= np.sum(sample_weights)
            
            # Plot: marker size reflects each sample's current weight
            axes[i].scatter(X[:, 0], X[:, 1], c=y, s=sample_weights*5000, 
                            alpha=0.7, cmap=plt.cm.RdYlBu)
            
            # Draw the decision boundary
            if feature_idx == 0:
                axes[i].axvline(x=threshold, color='red', linestyle='--', linewidth=2)
            else:
                axes[i].axhline(y=threshold, color='red', linestyle='--', linewidth=2)
            
            axes[i].set_title(f'Round {i+1} - feature {feature_idx+1}, α={alpha:.2f}')
            axes[i].set_xlabel('Feature 1')
            axes[i].set_ylabel('Feature 2')
            
        plt.tight_layout()
        plt.show()
        
        print("Learner weights:", [f"{w:.2f}" for w in estimator_weights])
        
    def adaboost_step_by_step(self):
        """AdaBoost step by step"""
        print("\n=== AdaBoost Step by Step ===")
        
        # Build a dataset
        X, y = make_classification(n_samples=300, n_features=2, n_redundant=0,
                                 n_informative=2, n_clusters_per_class=1, random_state=42)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # A manual, simplified AdaBoost implementation
        n_estimators = 5
        sample_weights = np.ones(len(X_train)) / len(X_train)
        estimators = []
        estimator_weights = []
        
        print("AdaBoost training:")
        
        for i in range(n_estimators):
            # Train a weak learner (a decision stump)
            weak_learner = DecisionTreeClassifier(max_depth=1, random_state=i)
            weak_learner.fit(X_train, y_train, sample_weight=sample_weights)
            
            # Predict
            predictions = weak_learner.predict(X_train)
            
            # Weighted error rate
            errors = (predictions != y_train)
            weighted_error = np.sum(sample_weights * errors)
            
            # Learner weight
            if weighted_error == 0:
                alpha = 1.0
            else:
                alpha = 0.5 * np.log((1 - weighted_error) / weighted_error)
            
            estimators.append(weak_learner)
            estimator_weights.append(alpha)
            
            print(f"  Round {i+1}: error={weighted_error:.3f}, weight={alpha:.3f}")
            
            # Update sample weights (labels and predictions mapped to ±1)
            sample_weights *= np.exp(-alpha * (2 * y_train - 1) * (2 * predictions - 1))
            sample_weights /= np.sum(sample_weights)
        
        # Ensemble prediction: sign of the weighted sum of ±1 votes
        def ensemble_predict(X):
            predictions = np.zeros(len(X))
            for estimator, weight in zip(estimators, estimator_weights):
                pred = estimator.predict(X)
                predictions += weight * (2 * pred - 1)
            return (predictions > 0).astype(int)
        
        # Evaluate
        manual_pred = ensemble_predict(X_test)
        manual_score = accuracy_score(y_test, manual_pred)
        
        # Compare with sklearn's AdaBoost
        ada_boost = AdaBoostClassifier(n_estimators=n_estimators, random_state=42)
        ada_boost.fit(X_train, y_train)
        sklearn_score = ada_boost.score(X_test, y_test)
        
        print(f"\nManual AdaBoost accuracy: {manual_score:.3f}")
        print(f"sklearn AdaBoost accuracy: {sklearn_score:.3f}")
        
        # Visualize the AdaBoost process
        self.visualize_adaboost_process(X_train, y_train, estimators, estimator_weights)
        
        return estimators, estimator_weights
        
    def visualize_adaboost_process(self, X_train, y_train, estimators, estimator_weights):
        """Visualize the AdaBoost process"""
        fig, axes = plt.subplots(2, 3, figsize=(18, 10))
        
        # Build a mesh grid
        h = 0.02
        x_min, x_max = X_train[:, 0].min() - 1, X_train[:, 0].max() + 1
        y_min, y_max = X_train[:, 1].min() - 1, X_train[:, 1].max() + 1
        xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                           np.arange(y_min, y_max, h))
        
        # Show the first five weak learners
        for i in range(min(5, len(estimators))):
            row = i // 3
            col = i % 3
            
            # Weak learner decision boundary
            Z = estimators[i].predict(np.c_[xx.ravel(), yy.ravel()])
            Z = Z.reshape(xx.shape)
            
            axes[row, col].contourf(xx, yy, Z, alpha=0.4, cmap=plt.cm.RdYlBu)
            axes[row, col].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.RdYlBu)
            axes[row, col].set_title(f'Weak learner {i+1} (weight={estimator_weights[i]:.2f})')
            axes[row, col].set_xlabel('Feature 1')
            axes[row, col].set_ylabel('Feature 2')
        
        # Final ensemble result
        def ensemble_predict_proba(X):
            predictions = np.zeros(len(X))
            for estimator, weight in zip(estimators, estimator_weights):
                pred = estimator.predict(X)
                predictions += weight * (2 * pred - 1)
            return predictions
        
        Z_ensemble = ensemble_predict_proba(np.c_[xx.ravel(), yy.ravel()])
        Z_ensemble = (Z_ensemble > 0).astype(int).reshape(xx.shape)
        
        axes[1, 2].contourf(xx, yy, Z_ensemble, alpha=0.4, cmap=plt.cm.RdYlBu)
        axes[1, 2].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.RdYlBu)
        axes[1, 2].set_title('AdaBoost Ensemble Result')
        axes[1, 2].set_xlabel('Feature 1')
        axes[1, 2].set_ylabel('Feature 2')
        
        plt.tight_layout()
        plt.show()

# Demonstrate boosting
print("=== Boosting Demo ===")
boosting_demo = BoostingDemo()
boosting_demo.boosting_theory()
boosting_demo.boosting_process_visualization()
estimators, weights = boosting_demo.adaboost_step_by_step()
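
To make the weighting formula concrete, a quick numeric check with an illustrative weighted error rate of ε = 0.3: the learner weight is α = 0.5·ln((1-ε)/ε), a misclassified sample's weight is multiplied by e^α, and a correct one's by e^(-α), before renormalization:

eps = 0.3                              # illustrative weighted error rate
alpha = 0.5 * np.log((1 - eps) / eps)  # learner weight, ≈ 0.424
print(f"alpha = {alpha:.3f}")
print(f"misclassified weight factor e^+alpha ≈ {np.exp(alpha):.3f}")   # ≈ 1.528, weight grows
print(f"correct weight factor e^-alpha ≈ {np.exp(-alpha):.3f}")        # ≈ 0.655, weight shrinks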

7.5 The AdaBoost Algorithm

7.5.1 AdaBoost in Detail

AdaBoost (Adaptive Boosting) is the classic boosting algorithm: it adaptively re-weights the training samples so that each new weak learner concentrates on the examples the ensemble currently gets wrong.

class AdaBoostDemo:
    def __init__(self):
        self.models = {}
        
    def adaboost_theory(self):
        """AdaBoost theory in detail"""
        print("=== AdaBoost Theory ===")
        print("1. Core idea:")
        print("   - After each round, increase the weights of misclassified samples")
        print("   - Decrease the weights of correctly classified samples")
        print("   - Later learners thus focus on the hard cases")
        
        print("\n2. Key formulas (labels y_i and predictions h_t(x) in {-1, +1}):")
        print("   - Learner weight: α_t = 0.5 · ln((1 - ε_t) / ε_t)")
        print("   - Sample-weight update: w_{t+1,i} = w_{t,i} · exp(-α_t · y_i · h_t(x_i)) / Z_t")
        print("     (Z_t is a normalizer so the weights sum to 1)")
        print("   - Final prediction: H(x) = sign(Σ_t α_t · h_t(x))")
        
        print("\n3. Characteristics:")
        print("   - Sensitive to noise")
        print("   - Strong theoretical guarantees")
        print("   - Works well with weak learners")
        print("   - Can overfit")
        
    def adaboost_classification_demo(self):
        """AdaBoost classification demo"""
        print("\n=== AdaBoost Classification Demo ===")
        
        # Build a dataset
        X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
                                 n_redundant=10, n_clusters_per_class=1, random_state=42)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # AdaBoost with different base learners
        base_estimators = {
            'Decision stump': DecisionTreeClassifier(max_depth=1),
            'Shallow decision tree': DecisionTreeClassifier(max_depth=3),
            'Naive Bayes': GaussianNB()
        }
        
        results = {}
        
        for name, base_est in base_estimators.items():
            print(f"\n{name} as the base learner:")
            
            # AdaBoost (`estimator=` was called `base_estimator=` before scikit-learn 1.2)
            ada_boost = AdaBoostClassifier(
                estimator=base_est,
                n_estimators=50,
                learning_rate=1.0,
                random_state=42
            )
            
            ada_boost.fit(X_train, y_train)
            
            # Evaluate
            train_score = ada_boost.score(X_train, y_train)
            test_score = ada_boost.score(X_test, y_test)
            
            results[name] = {
                'model': ada_boost,
                'train_score': train_score,
                'test_score': test_score
            }
            
            print(f"  Train accuracy: {train_score:.3f}")
            print(f"  Test accuracy: {test_score:.3f}")
            print(f"  Overfitting gap: {train_score - test_score:.3f}")
        
        # Learning-curve analysis
        self.plot_adaboost_learning_curves(X_train, y_train, X_test, y_test)
        
        return results
        
    def plot_adaboost_learning_curves(self, X_train, y_train, X_test, y_test):
        """Plot AdaBoost learning curves"""
        print("\n=== AdaBoost Learning Curves ===")
        
        # Performance for different values of n_estimators
        n_estimators_range = range(1, 101, 5)
        train_scores = []
        test_scores = []
        
        for n_est in n_estimators_range:
            ada_boost = AdaBoostClassifier(
                estimator=DecisionTreeClassifier(max_depth=1),
                n_estimators=n_est,
                random_state=42
            )
            
            ada_boost.fit(X_train, y_train)
            
            train_score = ada_boost.score(X_train, y_train)
            test_score = ada_boost.score(X_test, y_test)
            
            train_scores.append(train_score)
            test_scores.append(test_score)
        
        # Plot the curves
        fig, axes = plt.subplots(1, 2, figsize=(15, 5))
        
        # 1. Learning curve
        axes[0].plot(n_estimators_range, train_scores, 'o-', label='Train', linewidth=2)
        axes[0].plot(n_estimators_range, test_scores, 'o-', label='Test', linewidth=2)
        axes[0].set_xlabel('Number of base learners')
        axes[0].set_ylabel('Accuracy')
        axes[0].set_title('AdaBoost Learning Curve')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)
        
        # 2. Overfitting analysis
        overfitting = np.array(train_scores) - np.array(test_scores)
        axes[1].plot(n_estimators_range, overfitting, 'o-', color='red', linewidth=2)
        axes[1].axhline(y=0, color='black', linestyle='--', alpha=0.5)
        axes[1].set_xlabel('Number of base learners')
        axes[1].set_ylabel('Overfitting gap (train - test)')
        axes[1].set_title('AdaBoost Overfitting Analysis')
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def adaboost_regression_demo(self):
        """AdaBoost regression demo"""
        print("\n=== AdaBoost Regression Demo ===")
        
        # Build a regression dataset
        X, y = make_regression(n_samples=500, n_features=1, noise=10, random_state=42)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # AdaBoost regressor (`estimator=` was called `base_estimator=` before scikit-learn 1.2)
        ada_regressor = AdaBoostRegressor(
            estimator=DecisionTreeRegressor(max_depth=3),
            n_estimators=50,
            learning_rate=1.0,
            random_state=42
        )
        
        # Train
        ada_regressor.fit(X_train, y_train)
        
        # Predict
        train_pred = ada_regressor.predict(X_train)
        test_pred = ada_regressor.predict(X_test)
        
        # Evaluate
        train_mse = mean_squared_error(y_train, train_pred)
        test_mse = mean_squared_error(y_test, test_pred)
        train_r2 = r2_score(y_train, train_pred)
        test_r2 = r2_score(y_test, test_pred)
        
        print(f"Train MSE: {train_mse:.2f}, R²: {train_r2:.3f}")
        print(f"Test MSE: {test_mse:.2f}, R²: {test_r2:.3f}")
        
        # Visualize the regression results
        self.visualize_adaboost_regression(X_train, y_train, X_test, y_test, ada_regressor)
        
        return ada_regressor
        
    def visualize_adaboost_regression(self, X_train, y_train, X_test, y_test, ada_regressor):
        """Visualize the AdaBoost regression results"""
        fig, axes = plt.subplots(1, 3, figsize=(18, 5))
        
        # Range of inputs for plotting the fitted curves
        X_range = np.linspace(X_train.min(), X_train.max(), 300).reshape(-1, 1)
        
        # 1. Final prediction
        final_pred = ada_regressor.predict(X_range)
        
        axes[0].scatter(X_train, y_train, alpha=0.6, label='Training data')
        axes[0].scatter(X_test, y_test, alpha=0.6, color='red', label='Test data')
        axes[0].plot(X_range, final_pred, color='green', linewidth=2, label='AdaBoost prediction')
        axes[0].set_title('AdaBoost Regression Result')
        axes[0].set_xlabel('X')
        axes[0].set_ylabel('y')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)
        
        # 2. Individual learner predictions
        axes[1].scatter(X_train, y_train, alpha=0.6, label='Training data')
        
        # Show the first ten base learners' predictions
        colors = plt.cm.tab10(np.linspace(0, 1, 10))
        for i, estimator in enumerate(ada_regressor.estimators_[:10]):
            estimator_pred = estimator.predict(X_range)
            axes[1].plot(X_range, estimator_pred, color=colors[i], alpha=0.5, linewidth=1)
        
        axes[1].plot(X_range, final_pred, color='red', linewidth=3, label='AdaBoost ensemble')
        axes[1].set_title('Individual Learners vs AdaBoost Ensemble')
        axes[1].set_xlabel('X')
        axes[1].set_ylabel('y')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        
        # 3. Distribution of learner weights
        estimator_weights = ada_regressor.estimator_weights_
        axes[2].bar(range(len(estimator_weights)), estimator_weights)
        axes[2].set_title('Base Learner Weights')
        axes[2].set_xlabel('Learner index')
        axes[2].set_ylabel('Weight')
        axes[2].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def adaboost_parameter_analysis(self):
        """Analyze AdaBoost hyperparameters"""
        print("\n=== AdaBoost Parameter Analysis ===")
        
        # Build a dataset
        X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
                                 n_redundant=10, n_clusters_per_class=1, random_state=42)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # 1. Effect of the learning rate
        print("Effect of the learning rate:")
        learning_rates = [0.1, 0.5, 1.0, 1.5, 2.0]
        lr_scores = []
        
        for lr in learning_rates:
            ada_boost = AdaBoostClassifier(
                estimator=DecisionTreeClassifier(max_depth=1),
                n_estimators=50,
                learning_rate=lr,
                random_state=42
            )
            
            ada_boost.fit(X_train, y_train)
            score = ada_boost.score(X_test, y_test)
            lr_scores.append(score)
            print(f"  learning_rate={lr:.1f}: accuracy={score:.3f}")
        
        # 2. Effect of the base learner's depth
        print("\nEffect of the base learner's depth:")
        max_depths = [1, 2, 3, 4, 5]
        depth_scores = []
        
        for depth in max_depths:
            ada_boost = AdaBoostClassifier(
                estimator=DecisionTreeClassifier(max_depth=depth),
                n_estimators=50,
                learning_rate=1.0,
                random_state=42
            )
            
            ada_boost.fit(X_train, y_train)
            score = ada_boost.score(X_test, y_test)
            depth_scores.append(score)
            print(f"  max_depth={depth}: accuracy={score:.3f}")
        
        # Visualize the parameter effects
        self.visualize_adaboost_parameters(learning_rates, lr_scores, max_depths, depth_scores)
        
    def visualize_adaboost_parameters(self, learning_rates, lr_scores, max_depths, depth_scores):
        """Visualize the effect of AdaBoost parameters"""
        fig, axes = plt.subplots(1, 2, figsize=(15, 5))
        
        # 1. Learning rate
        axes[0].plot(learning_rates, lr_scores, 'o-', linewidth=2, markersize=8)
        axes[0].set_xlabel('Learning rate')
        axes[0].set_ylabel('Test accuracy')
        axes[0].set_title('Effect of the Learning Rate on AdaBoost')
        axes[0].grid(True, alpha=0.3)
        
        # Annotate points with their values
        for x, y in zip(learning_rates, lr_scores):
            axes[0].annotate(f'{y:.3f}', (x, y), textcoords="offset points", 
                           xytext=(0,10), ha='center')
        
        # 2. Base learner depth
        axes[1].plot(max_depths, depth_scores, 'o-', linewidth=2, markersize=8)
        axes[1].set_xlabel('Base learner max depth')
        axes[1].set_ylabel('Test accuracy')
        axes[1].set_title('Effect of Base Learner Depth on AdaBoost')
        axes[1].grid(True, alpha=0.3)
        
        # Annotate points with their values
        for x, y in zip(max_depths, depth_scores):
            axes[1].annotate(f'{y:.3f}', (x, y), textcoords="offset points", 
                           xytext=(0,10), ha='center')
        
        plt.tight_layout()
        plt.show()

# 演示AdaBoost算法
print("=== AdaBoost算法演示 ===")
adaboost_demo = AdaBoostDemo()
adaboost_demo.adaboost_theory()
ada_classification_results = adaboost_demo.adaboost_classification_demo()
ada_regressor = adaboost_demo.adaboost_regression_demo()
adaboost_demo.adaboost_parameter_analysis()
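
作为补充,AdaBoostClassifier 还提供 staged_score 方法,可逐步返回由前 k 个基学习器构成的集成在给定数据上的得分,常用于观察集成规模与性能的关系、辅助选择 n_estimators。下面是一个独立的小示意(数据为随机生成,变量名均为示例):

import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import AdaBoostClassifier
from sklearn.model_selection import train_test_split

X_st, y_st = make_classification(n_samples=500, n_features=20, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X_st, y_st, test_size=0.3, random_state=42)

ada = AdaBoostClassifier(n_estimators=100, random_state=42).fit(X_tr, y_tr)

# staged_score 依次返回前1个、前2个……前k个基学习器集成的准确率
staged = list(ada.staged_score(X_te, y_te))
best_k = int(np.argmax(staged)) + 1
print(f"测试集得分最高时的基学习器数量: {best_k}, 准确率: {max(staged):.3f}")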

7.6 梯度提升树

7.6.1 梯度提升基本原理

梯度提升(Gradient Boosting)是一种更加通用的Boosting方法,通过拟合残差来逐步改进模型。
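
在进入实现之前,先用一个简短的数值例子验证"残差即负梯度"这一关键结论:对平方损失 L(y, F) = (y - F)²/2,损失对当前预测 F 的负梯度恰好等于残差 y - F。以下代码用中心差分做数值验证(纯属示意,y_i、F_i 为假设的取值):

import numpy as np

# 数值验证:平方损失的负梯度 = 残差(示意性例子)
y_i, F_i = 3.0, 2.2              # 某个样本的真实值与当前模型预测
eps = 1e-6                        # 数值微分步长

loss = lambda F: 0.5 * (y_i - F) ** 2
neg_grad = -(loss(F_i + eps) - loss(F_i - eps)) / (2 * eps)  # 中心差分求负梯度

print(f"负梯度 ≈ {neg_grad:.6f}")   # 约为 0.8
print(f"残差   = {y_i - F_i:.6f}")  # 恰为 0.8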

class GradientBoostingDemo:
    def __init__(self):
        self.models = {}
        
    def gradient_boosting_theory(self):
        """梯度提升理论介绍"""
        print("=== 梯度提升理论 ===")
        print("1. 基本思想:")
        print("   - 每次拟合前一个模型的残差(负梯度)")
        print("   - 通过梯度下降的思想优化损失函数")
        print("   - 更加通用,可以处理各种损失函数")
        
        print("\n2. 算法流程:")
        print("   - 初始化模型F_0(x)")
        print("   - 计算负梯度(残差)")
        print("   - 训练弱学习器拟合负梯度")
        print("   - 更新模型:F_m(x) = F_{m-1}(x) + γ_m * h_m(x)")
        print("   - 重复直到收敛")
        
        print("\n3. 数学表示:")
        print("   - 损失函数: L(y, F(x))")
        print("   - 负梯度: r_{im} = -[∂L(y_i, F(x_i))/∂F(x_i)]")
        print("   - 模型更新: F_m(x) = F_{m-1}(x) + γ_m * h_m(x)")
        
        print("\n4. 主要优势:")
        print("   - 灵活的损失函数")
        print("   - 强大的预测能力")
        print("   - 处理各种数据类型")
        print("   - 特征重要性评估")
        
    def gradient_boosting_process_demo(self):
        """梯度提升过程演示"""
        print("\n=== 梯度提升过程演示 ===")
        
        # 创建简单的回归数据
        np.random.seed(42)
        X = np.linspace(0, 10, 100).reshape(-1, 1)
        y = np.sin(X).ravel() + np.random.normal(0, 0.3, X.shape[0])
        
        # 手动实现简化的梯度提升过程
        n_estimators = 5
        learning_rate = 0.1
        
        # 初始化预测(均值)
        F = np.full(len(y), np.mean(y))
        predictions_history = [F.copy()]
        
        print("梯度提升训练过程:")
        print(f"初始预测(均值): {np.mean(y):.3f}")
        
        estimators = []
        
        for i in range(n_estimators):
            # 计算残差(负梯度)
            residuals = y - F
            
            # 训练弱学习器拟合残差
            weak_learner = DecisionTreeRegressor(max_depth=3, random_state=i)
            weak_learner.fit(X, residuals)
            
            # 预测残差
            residual_pred = weak_learner.predict(X)
            
            # 更新模型
            F += learning_rate * residual_pred
            
            estimators.append(weak_learner)
            predictions_history.append(F.copy())
            
            # 计算当前MSE
            mse = mean_squared_error(y, F)
            print(f"第{i+1}轮: MSE = {mse:.3f}")
        
        # 可视化梯度提升过程
        self.visualize_gradient_boosting_process(X, y, predictions_history, estimators, learning_rate)
        
        return estimators, predictions_history
        
    def visualize_gradient_boosting_process(self, X, y, predictions_history, estimators, learning_rate):
        """可视化梯度提升过程"""
        fig, axes = plt.subplots(2, 3, figsize=(18, 10))
        
        # 创建预测范围
        X_range = np.linspace(X.min(), X.max(), 200).reshape(-1, 1)
        
        # 显示前5轮的拟合过程
        for i in range(min(5, len(estimators))):
            row = i // 3
            col = i % 3
            
            # 当前轮次的预测
            current_pred = predictions_history[i+1]
            
            axes[row, col].scatter(X, y, alpha=0.6, label='真实数据')
            axes[row, col].plot(X, current_pred, 'r-', linewidth=2, label=f'第{i+1}轮预测')
            
            if i > 0:
                axes[row, col].plot(X, predictions_history[i], 'g--', alpha=0.7, label=f'第{i}轮预测')
            
            axes[row, col].set_title(f'第{i+1}轮梯度提升')
            axes[row, col].set_xlabel('X')
            axes[row, col].set_ylabel('y')
            axes[row, col].legend()
            axes[row, col].grid(True, alpha=0.3)
        
        # 最终结果比较
        final_pred = predictions_history[-1]
        
        # 重构完整预测函数
        def full_predict(X_pred):
            pred = np.full(len(X_pred), np.mean(y))
            for estimator in estimators:
                pred += learning_rate * estimator.predict(X_pred)
            return pred
        
        range_pred = full_predict(X_range)
        
        axes[1, 2].scatter(X, y, alpha=0.6, label='真实数据')
        axes[1, 2].plot(X_range, range_pred, 'r-', linewidth=2, label='梯度提升预测')
        axes[1, 2].set_title('最终梯度提升结果')
        axes[1, 2].set_xlabel('X')
        axes[1, 2].set_ylabel('y')
        axes[1, 2].legend()
        axes[1, 2].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def gradient_boosting_classification_demo(self):
        """梯度提升分类演示"""
        print("\n=== 梯度提升分类演示 ===")
        
        # 创建数据集
        X, y = make_classification(n_samples=1000, n_features=20, n_informative=15,
                                 n_redundant=5, n_clusters_per_class=1, random_state=42)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # 梯度提升分类器
        gb_classifier = GradientBoostingClassifier(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=3,
            random_state=42
        )
        
        # 训练
        gb_classifier.fit(X_train, y_train)
        
        # 预测
        train_pred = gb_classifier.predict(X_train)
        test_pred = gb_classifier.predict(X_test)
        
        # 评估
        train_score = accuracy_score(y_train, train_pred)
        test_score = accuracy_score(y_test, test_pred)
        
        print(f"训练准确率: {train_score:.3f}")
        print(f"测试准确率: {test_score:.3f}")
        print(f"过拟合程度: {train_score - test_score:.3f}")
        
        # 特征重要性
        feature_importance = gb_classifier.feature_importances_
        print(f"\n前5个重要特征:")
        top_features = np.argsort(feature_importance)[::-1][:5]
        for i, idx in enumerate(top_features):
            print(f"  特征{idx}: {feature_importance[idx]:.3f}")
        
        # 学习曲线
        self.plot_gb_learning_curves(X_train, y_train, X_test, y_test)
        
        return gb_classifier
        
    def plot_gb_learning_curves(self, X_train, y_train, X_test, y_test):
        """绘制梯度提升学习曲线"""
        print("\n=== 梯度提升学习曲线 ===")
        
        # 不同参数的比较
        learning_rates = [0.01, 0.1, 0.2]
        max_depths = [3, 5, 7]
        
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        
        # 1. 不同学习率的学习曲线
        for lr in learning_rates:
            gb = GradientBoostingClassifier(
                n_estimators=100,
                learning_rate=lr,
                max_depth=3,
                random_state=42
            )
            gb.fit(X_train, y_train)
            
            # 获取训练过程中的分数
            train_scores = []
            test_scores = []
            
            for i, pred in enumerate(gb.staged_predict(X_train)):
                train_scores.append(accuracy_score(y_train, pred))
            
            for i, pred in enumerate(gb.staged_predict(X_test)):
                test_scores.append(accuracy_score(y_test, pred))
            
            axes[0, 0].plot(train_scores, label=f'训练-lr={lr}', alpha=0.7)
            axes[0, 1].plot(test_scores, label=f'测试-lr={lr}', alpha=0.7)
        
        axes[0, 0].set_title('训练集学习曲线(不同学习率)')
        axes[0, 0].set_xlabel('迭代次数')
        axes[0, 0].set_ylabel('准确率')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)
        
        axes[0, 1].set_title('测试集学习曲线(不同学习率)')
        axes[0, 1].set_xlabel('迭代次数')
        axes[0, 1].set_ylabel('准确率')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        
        # 2. 不同最大深度的比较
        depth_train_scores = []
        depth_test_scores = []
        
        for depth in max_depths:
            gb = GradientBoostingClassifier(
                n_estimators=100,
                learning_rate=0.1,
                max_depth=depth,
                random_state=42
            )
            gb.fit(X_train, y_train)
            
            train_score = gb.score(X_train, y_train)
            test_score = gb.score(X_test, y_test)
            
            depth_train_scores.append(train_score)
            depth_test_scores.append(test_score)
        
        x_pos = np.arange(len(max_depths))
        width = 0.35
        
        axes[1, 0].bar(x_pos - width/2, depth_train_scores, width, label='训练集', alpha=0.7)
        axes[1, 0].bar(x_pos + width/2, depth_test_scores, width, label='测试集', alpha=0.7)
        axes[1, 0].set_xlabel('最大深度')
        axes[1, 0].set_ylabel('准确率')
        axes[1, 0].set_title('不同最大深度的性能比较')
        axes[1, 0].set_xticks(x_pos)
        axes[1, 0].set_xticklabels(max_depths)
        axes[1, 0].legend()
        axes[1, 0].grid(True, alpha=0.3)
        
        # 3. 过拟合分析
        overfitting = np.array(depth_train_scores) - np.array(depth_test_scores)
        axes[1, 1].bar(max_depths, overfitting, alpha=0.7, color='red')
        axes[1, 1].set_xlabel('最大深度')
        axes[1, 1].set_ylabel('过拟合程度')
        axes[1, 1].set_title('过拟合程度分析')
        axes[1, 1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def gradient_boosting_regression_demo(self):
        """梯度提升回归演示"""
        print("\n=== 梯度提升回归演示 ===")
        
        # 创建回归数据集
        X, y = make_regression(n_samples=500, n_features=10, noise=10, random_state=42)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # 梯度提升回归器
        gb_regressor = GradientBoostingRegressor(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=3,
            random_state=42
        )
        
        # 训练
        gb_regressor.fit(X_train, y_train)
        
        # 预测
        train_pred = gb_regressor.predict(X_train)
        test_pred = gb_regressor.predict(X_test)
        
        # 评估
        train_mse = mean_squared_error(y_train, train_pred)
        test_mse = mean_squared_error(y_test, test_pred)
        train_r2 = r2_score(y_train, train_pred)
        test_r2 = r2_score(y_test, test_pred)
        
        print(f"训练 MSE: {train_mse:.2f}, R²: {train_r2:.3f}")
        print(f"测试 MSE: {test_mse:.2f}, R²: {test_r2:.3f}")
        
        # 残差分析
        self.analyze_gb_residuals(y_test, test_pred)
        
        return gb_regressor
        
    def analyze_gb_residuals(self, y_true, y_pred):
        """分析梯度提升回归残差"""
        residuals = y_true - y_pred
        
        fig, axes = plt.subplots(1, 3, figsize=(18, 5))
        
        # 1. 预测值 vs 真实值
        axes[0].scatter(y_true, y_pred, alpha=0.6)
        axes[0].plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--', lw=2)
        axes[0].set_xlabel('真实值')
        axes[0].set_ylabel('预测值')
        axes[0].set_title('预测值 vs 真实值')
        axes[0].grid(True, alpha=0.3)
        
        # 2. 残差分布
        axes[1].hist(residuals, bins=30, alpha=0.7, edgecolor='black')
        axes[1].axvline(x=0, color='red', linestyle='--', linewidth=2)
        axes[1].set_xlabel('残差')
        axes[1].set_ylabel('频数')
        axes[1].set_title('残差分布')
        axes[1].grid(True, alpha=0.3)
        
        # 3. 残差 vs 预测值
        axes[2].scatter(y_pred, residuals, alpha=0.6)
        axes[2].axhline(y=0, color='red', linestyle='--', linewidth=2)
        axes[2].set_xlabel('预测值')
        axes[2].set_ylabel('残差')
        axes[2].set_title('残差 vs 预测值')
        axes[2].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        # 残差统计
        print(f"残差均值: {np.mean(residuals):.3f}")
        print(f"残差标准差: {np.std(residuals):.3f}")
        print(f"残差范围: [{np.min(residuals):.2f}, {np.max(residuals):.2f}]")

# 演示梯度提升树
print("=== 梯度提升树演示 ===")
gb_demo = GradientBoostingDemo()
gb_demo.gradient_boosting_theory()
estimators, predictions = gb_demo.gradient_boosting_process_demo()
gb_classifier = gb_demo.gradient_boosting_classification_demo()
gb_regressor = gb_demo.gradient_boosting_regression_demo()
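
在实际使用 GradientBoostingClassifier 时,还可以利用其内置的早停参数,在内部验证集分数连续多轮不再提升时自动终止训练(validation_fraction 与 n_iter_no_change 为 sklearn 0.20+ 提供的参数;以下为独立示意,具体取值需按任务调整):

from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier

X_es, y_es = make_classification(n_samples=1000, n_features=20, random_state=42)

gb_early = GradientBoostingClassifier(
    n_estimators=500,          # 设一个较大的迭代上限
    learning_rate=0.1,
    validation_fraction=0.2,   # 内部划出20%样本作为验证集
    n_iter_no_change=10,       # 验证分数连续10轮无提升即停止
    random_state=42
)
gb_early.fit(X_es, y_es)
print(f"早停后实际训练的基学习器数量: {gb_early.n_estimators_}")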

7.7 集成方法综合比较

7.7.1 不同集成方法的特点对比

class EnsembleComparison:
    def __init__(self):
        self.models = {}
        
    def ensemble_methods_comparison(self):
        """集成方法理论比较"""
        print("=== 集成方法综合比较 ===")
        
        comparison_data = {
            '方法': ['Bagging', 'Random Forest', 'AdaBoost', 'Gradient Boosting'],
            '训练方式': ['并行', '并行', '串行', '串行'],
            '主要目标': ['降低方差', '降低方差', '降低偏差', '降低偏差'],
            '基学习器': ['任意', '决策树', '弱学习器', '弱学习器'],
            '样本权重': ['均匀', '均匀', '自适应', '无(拟合残差)'],
            '特征选择': ['全部', '随机子集', '全部', '全部'],
            '过拟合倾向': ['低', '低', '中等', '高'],
            '噪声敏感性': ['低', '低', '高', '中等'],
            '解释性': ['中等', '高', '低', '中等'],
            '计算复杂度': ['低', '中等', '中等', '高']
        }
        
        df = pd.DataFrame(comparison_data)  # pandas 已在本章开头导入为 pd
        print(df.to_string(index=False))
        
        print("\n=== 各方法适用场景 ===")
        scenarios = {
            'Bagging': [
                '数据集较大,计算资源充足',
                '希望降低模型方差',
                '基学习器容易过拟合',
                '需要并行训练'
            ],
            'Random Forest': [
                '特征维度较高',
                '需要特征重要性评估',
                '数据包含噪声',
                '需要良好的泛化性能'
            ],
            'AdaBoost': [
                '基学习器性能较弱',
                '数据质量较好(少噪声)',
                '二分类问题',
                '需要理论保证'
            ],
            'Gradient Boosting': [
                '追求最高预测精度',
                '特征工程充分',
                '有足够的调参时间',
                '数据集中等规模'
            ]
        }
        
        for method, uses in scenarios.items():
            print(f"\n{method}适用场景:")
            for use in uses:
                print(f"  • {use}")
                
    def comprehensive_performance_comparison(self):
        """综合性能比较"""
        print("\n=== 综合性能比较 ===")
        
        # 创建多个不同特性的数据集
        datasets = {
            '小样本高维': make_classification(n_samples=200, n_features=50, n_informative=30, 
                                           n_redundant=20, random_state=42),
            '大样本低维': make_classification(n_samples=2000, n_features=10, n_informative=8, 
                                           n_redundant=2, random_state=42),
            '不平衡数据': make_classification(n_samples=1000, n_features=20, n_informative=15,
                                           weights=[0.9, 0.1], random_state=42),
            '含噪声数据': self.create_noisy_dataset()
        }
        
        # 定义集成方法
        ensemble_methods = {
            'Bagging': BaggingClassifier(n_estimators=50, random_state=42),
            'Random Forest': RandomForestClassifier(n_estimators=50, random_state=42),
            'AdaBoost': AdaBoostClassifier(n_estimators=50, random_state=42),
            'Gradient Boosting': GradientBoostingClassifier(n_estimators=50, random_state=42)
        }
        
        # 性能比较结果
        results = {}
        
        for dataset_name, (X, y) in datasets.items():
            print(f"\n{dataset_name}数据集结果:")
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
            
            dataset_results = {}
            
            for method_name, model in ensemble_methods.items():
                # 训练和评估
                start_time = time.time()
                model.fit(X_train, y_train)
                train_time = time.time() - start_time
                
                # 预测
                y_pred = model.predict(X_test)
                y_proba = model.predict_proba(X_test)[:, 1] if hasattr(model, 'predict_proba') else None
                
                # 评估指标
                accuracy = accuracy_score(y_test, y_pred)
                precision = precision_score(y_test, y_pred, average='weighted')
                recall = recall_score(y_test, y_pred, average='weighted')
                f1 = f1_score(y_test, y_pred, average='weighted')
                
                dataset_results[method_name] = {
                    'accuracy': accuracy,
                    'precision': precision,
                    'recall': recall,
                    'f1': f1,
                    'train_time': train_time
                }
                
                print(f"  {method_name:15} - 准确率: {accuracy:.3f}, F1: {f1:.3f}, 训练时间: {train_time:.2f}s")
            
            results[dataset_name] = dataset_results
        
        # 可视化比较结果
        self.visualize_performance_comparison(results)
        
        return results
        
    def create_noisy_dataset(self):
        """创建含噪声的数据集"""
        X, y = make_classification(n_samples=1000, n_features=20, n_informative=15,
                                 n_redundant=5, random_state=42)
        
        # 添加噪声特征(固定随机种子以保证结果可复现)
        rng = np.random.RandomState(42)
        noise_features = rng.randn(X.shape[0], 10)
        X = np.hstack([X, noise_features])
        
        # 随机翻转10%的标签,制造标签噪声
        noise_indices = rng.choice(len(y), int(0.1 * len(y)), replace=False)
        y[noise_indices] = 1 - y[noise_indices]
        
        return X, y
        
    def visualize_performance_comparison(self, results):
        """可视化性能比较结果"""
        
        # 准备数据
        methods = list(results[list(results.keys())[0]].keys())
        datasets = list(results.keys())
        metrics = ['accuracy', 'f1', 'train_time']
        
        fig, axes = plt.subplots(2, 2, figsize=(16, 12))
        
        # 1. 准确率比较
        accuracy_data = []
        for dataset in datasets:
            for method in methods:
                accuracy_data.append({
                    'Dataset': dataset,
                    'Method': method,
                    'Accuracy': results[dataset][method]['accuracy']
                })
        
        df_accuracy = pd.DataFrame(accuracy_data)
        pivot_accuracy = df_accuracy.pivot(index='Dataset', columns='Method', values='Accuracy')
        
        pivot_accuracy.plot(kind='bar', ax=axes[0, 0], width=0.8)
        axes[0, 0].set_title('不同数据集上的准确率比较')
        axes[0, 0].set_ylabel('准确率')
        axes[0, 0].legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. F1分数比较
        f1_data = []
        for dataset in datasets:
            for method in methods:
                f1_data.append({
                    'Dataset': dataset,
                    'Method': method,
                    'F1': results[dataset][method]['f1']
                })
        
        df_f1 = pd.DataFrame(f1_data)
        pivot_f1 = df_f1.pivot(index='Dataset', columns='Method', values='F1')
        
        pivot_f1.plot(kind='bar', ax=axes[0, 1], width=0.8)
        axes[0, 1].set_title('不同数据集上的F1分数比较')
        axes[0, 1].set_ylabel('F1分数')
        axes[0, 1].legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. 训练时间比较
        time_data = []
        for dataset in datasets:
            for method in methods:
                time_data.append({
                    'Dataset': dataset,
                    'Method': method,
                    'Time': results[dataset][method]['train_time']
                })
        
        df_time = pd.DataFrame(time_data)
        pivot_time = df_time.pivot(index='Dataset', columns='Method', values='Time')
        
        pivot_time.plot(kind='bar', ax=axes[1, 0], width=0.8)
        axes[1, 0].set_title('不同数据集上的训练时间比较')
        axes[1, 0].set_ylabel('训练时间 (秒)')
        axes[1, 0].legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        axes[1, 0].grid(True, alpha=0.3)
        
        # 4. 综合性能雷达图
        avg_metrics = {}
        for method in methods:
            avg_accuracy = np.mean([results[dataset][method]['accuracy'] for dataset in datasets])
            avg_f1 = np.mean([results[dataset][method]['f1'] for dataset in datasets])
            avg_time = np.mean([results[dataset][method]['train_time'] for dataset in datasets])
            
            # 将训练时间映射到(0,1]:时间越短,得分越接近1
            normalized_time = 1 / (1 + avg_time)
            
            avg_metrics[method] = [avg_accuracy, avg_f1, normalized_time]
        
        # 雷达图(将右下角子图替换为极坐标子图)
        angles = np.linspace(0, 2 * np.pi, 3, endpoint=False).tolist()
        angles += angles[:1]  # 闭合
        
        axes[1, 1].remove()  # 先移除该位置的直角坐标子图,避免与极坐标子图重叠
        ax_radar = plt.subplot(2, 2, 4, projection='polar')
        
        for method, values in avg_metrics.items():
            values += values[:1]  # 闭合
            ax_radar.plot(angles, values, 'o-', linewidth=2, label=method)
            ax_radar.fill(angles, values, alpha=0.25)
        
        ax_radar.set_xticks(angles[:-1])
        ax_radar.set_xticklabels(['准确率', 'F1分数', '效率'])
        ax_radar.set_ylim(0, 1)
        ax_radar.set_title('综合性能雷达图')
        ax_radar.legend(bbox_to_anchor=(1.3, 1.0))
        
        plt.tight_layout()
        plt.show()
        
    def ensemble_selection_guide(self):
        """集成方法选择指南"""
        print("\n=== 集成方法选择指南 ===")
        
        decision_tree = {
            "数据规模": {
                "小数据集 (<1000样本)": {
                    "推荐": ["Random Forest", "Bagging"],
                    "原因": "避免过拟合,提供稳定性"
                },
                "中等数据集 (1000-10000样本)": {
                    "推荐": ["Random Forest", "Gradient Boosting"],
                    "原因": "平衡性能和计算成本"
                },
                "大数据集 (>10000样本)": {
                    "推荐": ["Random Forest", "Gradient Boosting"],
                    "原因": "充分利用数据,获得最佳性能"
                }
            },
            "特征维度": {
                "低维 (<20特征)": {
                    "推荐": ["AdaBoost", "Gradient Boosting"],
                    "原因": "特征充分,可以深度学习"
                },
                "中维 (20-100特征)": {
                    "推荐": ["Random Forest", "Gradient Boosting"],
                    "原因": "特征选择和深度学习并重"
                },
                "高维 (>100特征)": {
                    "推荐": ["Random Forest", "Bagging"],
                    "原因": "特征选择更重要"
                }
            },
            "数据质量": {
                "高质量(少噪声)": {
                    "推荐": ["AdaBoost", "Gradient Boosting"],
                    "原因": "可以充分利用数据信息"
                },
                "中等质量": {
                    "推荐": ["Random Forest", "Bagging"],
                    "原因": "对噪声有一定鲁棒性"
                },
                "低质量(多噪声)": {
                    "推荐": ["Random Forest", "Bagging"],
                    "原因": "通过平均减少噪声影响"
                }
            },
            "计算资源": {
                "有限": {
                    "推荐": ["Random Forest", "Bagging"],
                    "原因": "可并行训练,效率高"
                },
                "充足": {
                    "推荐": ["Gradient Boosting", "AdaBoost"],
                    "原因": "可以进行精细调参"
                }
            }
        }
        
        for category, subcategories in decision_tree.items():
            print(f"\n{category}:")
            for condition, recommendation in subcategories.items():
                print(f"  {condition}:")
                print(f"    推荐方法: {', '.join(recommendation['推荐'])}")
                print(f"    选择原因: {recommendation['原因']}")

# 演示集成方法比较
print("=== 集成方法综合比较 ===")
import time
from sklearn.metrics import precision_score, recall_score, f1_score

comparison = EnsembleComparison()
comparison.ensemble_methods_comparison()
performance_results = comparison.comprehensive_performance_comparison()
comparison.ensemble_selection_guide()

7.8 综合案例:电商用户行为预测

7.8.1 项目背景与目标

在这个综合案例中,我们将使用多种集成学习方法来预测电商用户的购买行为,比较不同方法的性能。

class EcommercePredictionCase:
    def __init__(self):
        self.data = None
        self.models = {}
        self.results = {}
        
    def create_ecommerce_dataset(self):
        """创建电商用户行为数据集"""
        print("=== 创建电商用户行为数据集 ===")
        
        np.random.seed(42)
        n_samples = 5000
        
        # 用户基本特征
        age = np.random.normal(35, 12, n_samples)
        age = np.clip(age, 18, 70)
        
        income = np.random.lognormal(10, 0.5, n_samples)
        income = np.clip(income, 20000, 200000)
        
        # 行为特征
        page_views = np.random.poisson(15, n_samples)
        session_duration = np.random.exponential(20, n_samples)  # 分钟
        previous_purchases = np.random.poisson(3, n_samples)
        
        # 时间特征
        hour_of_day = np.random.randint(0, 24, n_samples)
        day_of_week = np.random.randint(0, 7, n_samples)
        
        # 商品特征
        product_price = np.random.lognormal(4, 1, n_samples)
        product_rating = np.random.beta(8, 2, n_samples) * 5
        discount_rate = np.random.beta(2, 5, n_samples)
        
        # 设备和渠道特征
        is_mobile = np.random.binomial(1, 0.6, n_samples)
        is_returning_user = np.random.binomial(1, 0.4, n_samples)
        traffic_source = np.random.choice([0, 1, 2, 3], n_samples, p=[0.4, 0.3, 0.2, 0.1])
        
        # 构建特征矩阵
        X = np.column_stack([
            age, income, page_views, session_duration, previous_purchases,
            hour_of_day, day_of_week, product_price, product_rating, discount_rate,
            is_mobile, is_returning_user, traffic_source
        ])
        
        # 生成目标变量(购买概率)
        # 复杂的非线性关系
        purchase_prob = (
            0.1 * (age - 30) / 20 +
            0.15 * np.log(income / 50000) +
            0.2 * np.tanh(page_views / 10) +
            0.1 * np.tanh(session_duration / 30) +
            0.25 * np.tanh(previous_purchases / 5) +
            0.1 * (product_rating - 2.5) / 2.5 +
            0.15 * discount_rate +
            0.1 * is_returning_user +
            -0.05 * is_mobile +
            np.random.normal(0, 0.1, n_samples)
        )
        
        # 转换为二分类标签
        y = (purchase_prob > np.median(purchase_prob)).astype(int)
        
        # 特征名称
        feature_names = [
            'age', 'income', 'page_views', 'session_duration', 'previous_purchases',
            'hour_of_day', 'day_of_week', 'product_price', 'product_rating', 'discount_rate',
            'is_mobile', 'is_returning_user', 'traffic_source'
        ]
        
        print(f"数据集大小: {X.shape}")
        print(f"正样本比例: {np.mean(y):.3f}")
        print(f"特征数量: {len(feature_names)}")
        
        self.data = {
            'X': X,
            'y': y,
            'feature_names': feature_names
        }
        
        # 数据探索性分析
        self.exploratory_data_analysis()
        
        return X, y, feature_names
        
    def exploratory_data_analysis(self):
        """探索性数据分析"""
        print("\n=== 探索性数据分析 ===")
        
        X, y = self.data['X'], self.data['y']
        feature_names = self.data['feature_names']
        
        # 基本统计信息
        print("特征统计信息:")
        for i, name in enumerate(feature_names):
            print(f"  {name:20}: 均值={X[:, i].mean():.2f}, 标准差={X[:, i].std():.2f}")
        
        # 可视化关键特征分布
        fig, axes = plt.subplots(2, 3, figsize=(18, 10))
        axes = axes.ravel()
        
        key_features = [0, 1, 2, 3, 4, 8]  # age, income, page_views, session_duration, previous_purchases, product_rating
        
        for i, feature_idx in enumerate(key_features):
            feature_name = feature_names[feature_idx]
            
            # 按购买行为分组
            purchase_data = X[y == 1, feature_idx]
            no_purchase_data = X[y == 0, feature_idx]
            
            axes[i].hist(no_purchase_data, alpha=0.7, label='未购买', bins=30, density=True)
            axes[i].hist(purchase_data, alpha=0.7, label='购买', bins=30, density=True)
            axes[i].set_title(f'{feature_name}分布')
            axes[i].set_xlabel(feature_name)
            axes[i].set_ylabel('密度')
            axes[i].legend()
            axes[i].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def data_preprocessing(self):
        """数据预处理"""
        print("\n=== 数据预处理 ===")
        
        X, y = self.data['X'], self.data['y']
        
        # 划分训练测试集
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        
        # 特征标准化
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        print(f"训练集大小: {X_train.shape}")
        print(f"测试集大小: {X_test.shape}")
        print(f"训练集正样本比例: {np.mean(y_train):.3f}")
        print(f"测试集正样本比例: {np.mean(y_test):.3f}")
        
        return X_train_scaled, X_test_scaled, y_train, y_test, scaler
        
    def build_ensemble_models(self, X_train, y_train):
        """构建多种集成模型"""
        print("\n=== 构建集成模型 ===")
        
        # 定义集成模型
        models = {
            'Bagging': BaggingClassifier(
                base_estimator=DecisionTreeClassifier(max_depth=10),
                n_estimators=100,
                random_state=42
            ),
            'Random Forest': RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                min_samples_split=5,
                min_samples_leaf=2,
                random_state=42
            ),
            'Extra Trees': ExtraTreesClassifier(
                n_estimators=100,
                max_depth=10,
                min_samples_split=5,
                min_samples_leaf=2,
                random_state=42
            ),
            'AdaBoost': AdaBoostClassifier(
                base_estimator=DecisionTreeClassifier(max_depth=3),
                n_estimators=100,
                learning_rate=1.0,
                random_state=42
            ),
            'Gradient Boosting': GradientBoostingClassifier(
                n_estimators=100,
                learning_rate=0.1,
                max_depth=6,
                min_samples_split=5,
                min_samples_leaf=2,
                random_state=42
            )
        }
        
        # 训练模型
        trained_models = {}
        
        for name, model in models.items():
            print(f"训练 {name}...")
            start_time = time.time()
            model.fit(X_train, y_train)
            train_time = time.time() - start_time
            
            trained_models[name] = {
                'model': model,
                'train_time': train_time
            }
            
            print(f"  训练时间: {train_time:.2f}秒")
        
        self.models = trained_models
        return trained_models
        
    def evaluate_models(self, X_test, y_test):
        """评估模型性能"""
        print("\n=== 模型性能评估 ===")
        
        evaluation_results = {}
        
        for name, model_info in self.models.items():
            model = model_info['model']
            
            # 预测
            y_pred = model.predict(X_test)
            y_proba = model.predict_proba(X_test)[:, 1]
            
            # 计算评估指标
            accuracy = accuracy_score(y_test, y_pred)
            precision = precision_score(y_test, y_pred)
            recall = recall_score(y_test, y_pred)
            f1 = f1_score(y_test, y_pred)
            auc = roc_auc_score(y_test, y_proba)
            
            evaluation_results[name] = {
                'accuracy': accuracy,
                'precision': precision,
                'recall': recall,
                'f1': f1,
                'auc': auc,
                'train_time': model_info['train_time'],
                'y_pred': y_pred,
                'y_proba': y_proba
            }
            
            print(f"{name:20} - 准确率: {accuracy:.3f}, 精确率: {precision:.3f}, "
                  f"召回率: {recall:.3f}, F1: {f1:.3f}, AUC: {auc:.3f}")
        
        self.results = evaluation_results
        
        # 可视化评估结果
        self.visualize_evaluation_results(y_test)
        
        return evaluation_results
        
    def visualize_evaluation_results(self, y_test):
        """可视化评估结果"""
        fig, axes = plt.subplots(2, 3, figsize=(20, 12))
        
        # 1. 性能指标比较
        metrics = ['accuracy', 'precision', 'recall', 'f1', 'auc']
        model_names = list(self.results.keys())
        
        metric_data = {metric: [self.results[model][metric] for model in model_names] 
                      for metric in metrics}
        
        x = np.arange(len(model_names))
        width = 0.15
        
        for i, metric in enumerate(metrics):
            axes[0, 0].bar(x + i * width, metric_data[metric], width, label=metric)
        
        axes[0, 0].set_xlabel('模型')
        axes[0, 0].set_ylabel('分数')
        axes[0, 0].set_title('性能指标比较')
        axes[0, 0].set_xticks(x + width * 2)
        axes[0, 0].set_xticklabels(model_names, rotation=45)
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. ROC曲线
        for name, results in self.results.items():
            fpr, tpr, _ = roc_curve(y_test, results['y_proba'])
            auc_score = results['auc']
            axes[0, 1].plot(fpr, tpr, label=f'{name} (AUC={auc_score:.3f})')
        
        axes[0, 1].plot([0, 1], [0, 1], 'k--', alpha=0.5)
        axes[0, 1].set_xlabel('假正率')
        axes[0, 1].set_ylabel('真正率')
        axes[0, 1].set_title('ROC曲线比较')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. 精确率-召回率曲线
        for name, results in self.results.items():
            precision_curve, recall_curve, _ = precision_recall_curve(y_test, results['y_proba'])
            axes[0, 2].plot(recall_curve, precision_curve, label=name)
        
        axes[0, 2].set_xlabel('召回率')
        axes[0, 2].set_ylabel('精确率')
        axes[0, 2].set_title('精确率-召回率曲线')
        axes[0, 2].legend()
        axes[0, 2].grid(True, alpha=0.3)
        
        # 4. 训练时间比较
        train_times = [self.results[model]['train_time'] for model in model_names]
        axes[1, 0].bar(model_names, train_times)
        axes[1, 0].set_xlabel('模型')
        axes[1, 0].set_ylabel('训练时间 (秒)')
        axes[1, 0].set_title('训练时间比较')
        axes[1, 0].tick_params(axis='x', rotation=45)
        axes[1, 0].grid(True, alpha=0.3)
        
        # 5. 混淆矩阵(选择最佳模型)
        best_model = max(self.results.keys(), key=lambda x: self.results[x]['f1'])
        cm = confusion_matrix(y_test, self.results[best_model]['y_pred'])
        
        im = axes[1, 1].imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
        axes[1, 1].set_title(f'{best_model} 混淆矩阵')
        
        # 添加数值标注
        thresh = cm.max() / 2.
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                axes[1, 1].text(j, i, format(cm[i, j], 'd'),
                               ha="center", va="center",
                               color="white" if cm[i, j] > thresh else "black")
        
        axes[1, 1].set_xlabel('预测标签')
        axes[1, 1].set_ylabel('真实标签')
        
        # 6. 特征重要性(Random Forest)
        if 'Random Forest' in self.results:
            rf_model = self.models['Random Forest']['model']
            feature_importance = rf_model.feature_importances_
            feature_names = self.data['feature_names']
            
            # 排序
            indices = np.argsort(feature_importance)[::-1][:10]
            
            axes[1, 2].bar(range(len(indices)), feature_importance[indices])
            axes[1, 2].set_xlabel('特征')
            axes[1, 2].set_ylabel('重要性')
            axes[1, 2].set_title('特征重要性 (Random Forest)')
            axes[1, 2].set_xticks(range(len(indices)))
            axes[1, 2].set_xticklabels([feature_names[i] for i in indices], rotation=45)
            axes[1, 2].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def model_interpretation(self):
        """模型解释与业务洞察"""
        print("\n=== 模型解释与业务洞察 ===")
        
        # 特征重要性分析
        print("1. 特征重要性分析:")
        
        if 'Random Forest' in self.models:
            rf_model = self.models['Random Forest']['model']
            feature_importance = rf_model.feature_importances_
            feature_names = self.data['feature_names']
            
            # 排序并显示
            importance_pairs = list(zip(feature_names, feature_importance))
            importance_pairs.sort(key=lambda x: x[1], reverse=True)
            
            print("  Random Forest特征重要性排序:")
            for i, (name, importance) in enumerate(importance_pairs[:10]):
                print(f"    {i+1:2d}. {name:20}: {importance:.3f}")
        
        # 业务洞察
        print("\n2. 业务洞察:")
        insights = [
            "历史购买次数是最重要的预测因子,说明用户忠诚度很关键",
            "商品评分对购买决策有重要影响,需要重视商品质量",
            "折扣率是重要因素,合理的促销策略能提高转化率",
            "用户年龄和收入水平影响购买行为,需要精准定位目标用户",
            "页面浏览数和会话时长反映用户兴趣度,可用于实时推荐"
        ]
        
        for i, insight in enumerate(insights, 1):
            print(f"  {i}. {insight}")
        
        # 模型选择建议
        print("\n3. 模型选择建议:")
        
        best_accuracy = max(self.results.keys(), key=lambda x: self.results[x]['accuracy'])
        best_f1 = max(self.results.keys(), key=lambda x: self.results[x]['f1'])
        best_auc = max(self.results.keys(), key=lambda x: self.results[x]['auc'])
        fastest = min(self.results.keys(), key=lambda x: self.results[x]['train_time'])
        
        print(f"  最高准确率: {best_accuracy} ({self.results[best_accuracy]['accuracy']:.3f})")
        print(f"  最高F1分数: {best_f1} ({self.results[best_f1]['f1']:.3f})")
        print(f"  最高AUC: {best_auc} ({self.results[best_auc]['auc']:.3f})")
        print(f"  训练最快: {fastest} ({self.results[fastest]['train_time']:.2f}秒)")
        
        print(f"\n  综合推荐: {best_f1} (平衡了精确率和召回率)")

# 运行电商用户行为预测案例
print("=== 电商用户行为预测综合案例 ===")
from sklearn.metrics import roc_curve, precision_recall_curve, roc_auc_score, confusion_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import ExtraTreesClassifier

case = EcommercePredictionCase()
X, y, feature_names = case.create_ecommerce_dataset()
X_train, X_test, y_train, y_test, scaler = case.data_preprocessing()
trained_models = case.build_ensemble_models(X_train, y_train)
evaluation_results = case.evaluate_models(X_test, y_test)
case.model_interpretation()

7.9 本章小结

7.9.1 集成学习核心概念回顾

本章深入探讨了集成学习的核心思想和主要方法:

1. 集成学习基础
   - 集成学习通过组合多个学习器来提高整体性能
   - 核心在于利用个体学习器的多样性
   - 主要分为Bagging和Boosting两大类方法

2. Bagging方法
   - 通过Bootstrap采样创建多样性
   - 并行训练,主要降低方差
   - 代表算法:Bagging、Random Forest

3. Boosting方法
   - 串行训练,后续学习器关注前面学习器的错误
   - 主要降低偏差,提高模型复杂度
   - 代表算法:AdaBoost、Gradient Boosting

4. 随机森林
   - 结合Bagging和随机特征选择
   - 具有良好的泛化性能和特征重要性评估能力
   - 对噪声和过拟合有较强的鲁棒性

5. 梯度提升树
   - 通过拟合残差逐步改进模型
   - 灵活的损失函数,强大的预测能力
   - 需要仔细调参以避免过拟合

7.9.2 算法选择指导

数据特征导向的选择:
- 小数据集:Random Forest、Bagging(避免过拟合)
- 大数据集:Gradient Boosting、Random Forest(充分利用数据)
- 高维数据:Random Forest(特征选择能力强)
- 含噪声数据:Random Forest、Bagging(鲁棒性好)
- 高质量数据:Gradient Boosting、AdaBoost(精度优先)

性能需求导向的选择:
- 追求最高精度:Gradient Boosting
- 需要快速训练:Random Forest、Bagging
- 需要模型解释:Random Forest(特征重要性)
- 内存受限:AdaBoost、规模较小的Random Forest

7.9.3 最佳实践

1. 数据预处理
   - 处理缺失值和异常值
   - 进行适当的特征工程
   - 对于Boosting方法,数据质量尤为重要

2. 超参数调优(网格搜索示意见下方代码)
   - 使用交叉验证选择最优参数
   - 重点关注:n_estimators、learning_rate、max_depth
   - 平衡模型复杂度和泛化能力
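
下面给出一个用 GridSearchCV 做交叉验证调参的最小示意(参数网格与数据均为示例,实际取值应结合任务规模与计算资源):

from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV

X_cv, y_cv = make_classification(n_samples=500, n_features=20, random_state=42)

param_grid = {
    'n_estimators': [50, 100],
    'learning_rate': [0.05, 0.1],
    'max_depth': [2, 3],
}
search = GridSearchCV(GradientBoostingClassifier(random_state=42),
                      param_grid, cv=3, scoring='f1')
search.fit(X_cv, y_cv)
print(f"最优参数: {search.best_params_}")
print(f"交叉验证最优F1: {search.best_score_:.3f}")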

3. 模型评估
   - 使用多种评估指标(准确率、精确率、召回率、F1、AUC)
   - 关注训练时间和预测时间
   - 进行残差分析和特征重要性分析

4. 实际应用考虑(模型持久化示意见下方代码)
   - 考虑模型的可解释性需求
   - 评估计算资源限制
   - 考虑在线学习和模型更新需求
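
针对部署与模型更新需求,下面是一个用 joblib(sklearn 的依赖库)持久化模型的最小示意(文件名与变量均为示例):

import joblib
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X_p, y_p = make_classification(n_samples=200, n_features=10, random_state=42)
model = RandomForestClassifier(n_estimators=50, random_state=42).fit(X_p, y_p)

joblib.dump(model, 'rf_model.joblib')          # 保存训练好的模型到磁盘
loaded_model = joblib.load('rf_model.joblib')  # 部署或更新时重新加载
print((loaded_model.predict(X_p) == model.predict(X_p)).all())  # True:加载前后行为一致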

7.9.4 常见陷阱

1. 过拟合问题
   - Boosting方法容易过拟合,需要早停或正则化
   - 监控验证集性能,避免过度训练

2. 参数设置不当
   - 学习率过高导致训练不稳定甚至不收敛
   - 基学习器过于复杂导致过拟合
   - n_estimators设置不合理

3. 忽视数据质量
   - 噪声数据对AdaBoost的影响很大
   - 特征工程不充分会影响所有集成方法

4. 评估偏差
   - 仅关注单一指标
   - 忽视计算成本
   - 缺乏对业务场景的考虑

7.9.5 进阶学习方向

1. 高级集成方法(Stacking示意见下方代码)
   - XGBoost、LightGBM、CatBoost
   - Stacking和Blending
   - 多层集成架构
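
sklearn 0.22+ 内置了 StackingClassifier,下面是一个两层Stacking的最小示意(基学习器与元学习器的组合仅作示例):

from sklearn.datasets import make_classification
from sklearn.ensemble import StackingClassifier, RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

X_s, y_s = make_classification(n_samples=500, n_features=20, random_state=42)

stack = StackingClassifier(
    estimators=[
        ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
        ('gb', GradientBoostingClassifier(n_estimators=50, random_state=42)),
    ],
    final_estimator=LogisticRegression(),  # 元学习器:基于基学习器的输出再学习
    cv=5                                    # 用交叉验证生成元特征,降低信息泄漏
)
print(f"Stacking 交叉验证准确率: {cross_val_score(stack, X_s, y_s, cv=3).mean():.3f}")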

2. 深度学习中的集成
   - 深度森林
   - 神经网络集成
   - 知识蒸馏

3. 在线集成学习
   - 增量学习
   - 概念漂移处理
   - 自适应集成

4. 大规模集成学习
   - 分布式训练
   - 模型压缩
   - 边缘计算部署

7.9.6 练习题

理论题:
1. 解释Bagging和Boosting的核心区别,并分析各自的优缺点
2. 为什么Random Forest能够提供特征重要性评估?其计算原理是什么?
3. AdaBoost中样本权重更新的数学原理是什么?为什么这样更新?
4. 梯度提升树如何选择损失函数?不同损失函数适用于什么场景?

实践题:
1. 使用本章学到的方法,在一个真实数据集上比较不同集成方法的性能
2. 实现一个简化版的AdaBoost算法,并与sklearn版本对比
3. 分析Random Forest中n_estimators和max_features参数对性能的影响
4. 设计一个集成方法选择的决策流程,综合考虑数据特征、性能需求和计算资源

项目题:
1. 选择一个实际业务场景,设计完整的集成学习解决方案
2. 实现一个自动化的集成方法选择系统
3. 比较集成学习与深度学习在特定任务上的性能差异


第7章完结

集成学习是机器学习中最实用和有效的方法之一。通过本章的学习,你应该掌握了主要集成方法的原理、实现和应用。在实际项目中,集成学习往往能够显著提升模型性能,是数据科学家必备的重要技能。

下一章我们将学习特征工程,这是提升模型性能的另一个重要方面。