7.1 集成学习概述
7.1.1 集成学习的基本思想
集成学习(Ensemble Learning)是机器学习中的一种重要方法,通过组合多个学习器来获得比单一学习器更好的预测性能。
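在进入完整代码之前,先用一个简化的概率计算体会集成为什么有效:假设 T 个基分类器的错误率均为 0.3 且彼此独立(现实中独立性很难完全满足,此处仅作示意),多数投票出错即超过半数分类器同时出错:
from math import comb

def majority_vote_error(p: float, T: int) -> float:
    """T个独立分类器(各自错误率p)多数投票后的整体错误率"""
    return sum(comb(T, k) * p**k * (1 - p)**(T - k)
               for k in range(T // 2 + 1, T + 1))

for T in [1, 5, 11, 21]:
    print(f"T={T:2d}: 集成错误率 = {majority_vote_error(0.3, T):.4f}")
# 独立性假设下,集成错误率随T增大而显著下降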
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_classification, make_regression, load_iris
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.ensemble import (RandomForestClassifier, RandomForestRegressor,
BaggingClassifier, BaggingRegressor,
AdaBoostClassifier, AdaBoostRegressor,
GradientBoostingClassifier, GradientBoostingRegressor,
VotingClassifier, VotingRegressor,
ExtraTreesClassifier, ExtraTreesRegressor)
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.svm import SVC, SVR
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import (accuracy_score, classification_report,
mean_squared_error, r2_score, confusion_matrix,
precision_score, recall_score, f1_score)
from sklearn.preprocessing import StandardScaler
import time
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
class EnsembleLearningIntro:
def __init__(self):
self.models = {}
def ensemble_theory(self):
"""集成学习理论介绍"""
print("=== 集成学习理论 ===")
print("1. 基本思想:")
print(" - 三个臭皮匠,顶个诸葛亮")
print(" - 通过组合多个弱学习器构建强学习器")
print(" - 降低过拟合风险,提高泛化能力")
print("\n2. 集成学习的优势:")
print(" - 提高预测精度")
print(" - 增强模型鲁棒性")
print(" - 减少过拟合")
print(" - 处理不同类型的数据")
print("\n3. 集成学习的条件:")
print(" - 个体学习器要有一定的准确性(好于随机猜测)")
print(" - 个体学习器之间要有差异性(多样性)")
print("\n4. 主要方法分类:")
print(" - Bagging:并行训练,降低方差")
print(" - Boosting:串行训练,降低偏差")
print(" - Stacking:分层训练,元学习器")
def diversity_importance_demo(self):
"""演示多样性的重要性"""
print("\n=== 多样性重要性演示 ===")
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, n_clusters_per_class=1, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 创建不同类型的基学习器
base_models = {
'决策树': DecisionTreeClassifier(random_state=42),
'逻辑回归': LogisticRegression(random_state=42, max_iter=1000),
'朴素贝叶斯': GaussianNB(),
'KNN': KNeighborsClassifier(),
'SVM': SVC(probability=True, random_state=42)
}
# 评估单个模型
print("单个模型性能:")
individual_scores = {}
predictions = {}
for name, model in base_models.items():
if name == 'SVM':
# SVM需要标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
model.fit(X_train_scaled, y_train)
pred = model.predict(X_test_scaled)
pred_proba = model.predict_proba(X_test_scaled)
else:
model.fit(X_train, y_train)
pred = model.predict(X_test)
pred_proba = model.predict_proba(X_test) if hasattr(model, 'predict_proba') else None
score = accuracy_score(y_test, pred)
individual_scores[name] = score
predictions[name] = pred
print(f" {name}: {score:.3f}")
# 简单投票集成
print("\n集成方法性能:")
# 硬投票
ensemble_pred_hard = np.array([predictions[name] for name in base_models.keys()])
final_pred_hard = np.apply_along_axis(lambda x: np.bincount(x).argmax(),
axis=0, arr=ensemble_pred_hard)
hard_voting_score = accuracy_score(y_test, final_pred_hard)
print(f" 硬投票集成: {hard_voting_score:.3f}")
# 可视化结果
self.visualize_diversity_effect(individual_scores, hard_voting_score)
return individual_scores, hard_voting_score
def visualize_diversity_effect(self, individual_scores, ensemble_score):
"""可视化多样性效果"""
fig, axes = plt.subplots(1, 2, figsize=(15, 5))
# 1. 个体模型vs集成模型性能比较
models = list(individual_scores.keys()) + ['硬投票集成']
scores = list(individual_scores.values()) + [ensemble_score]
colors = ['skyblue'] * len(individual_scores) + ['red']
bars = axes[0].bar(models, scores, color=colors, alpha=0.7)
axes[0].set_title('个体模型 vs 集成模型性能')
axes[0].set_ylabel('准确率')
axes[0].tick_params(axis='x', rotation=45)
axes[0].grid(True, alpha=0.3)
# 添加数值标注
for bar, score in zip(bars, scores):
axes[0].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.005,
f'{score:.3f}', ha='center', va='bottom')
# 2. 集成学习优势示意图
x = np.linspace(0, 1, 100)
# 模拟个体学习器的错误率
error1 = 0.3 + 0.1 * np.sin(10 * x)
error2 = 0.25 + 0.15 * np.cos(8 * x)
error3 = 0.35 + 0.1 * np.sin(12 * x + np.pi/4)
# 集成后的错误率(简化模拟)
ensemble_error = (error1 + error2 + error3) / 3 - 0.05
axes[1].plot(x, error1, '--', label='学习器1', alpha=0.7)
axes[1].plot(x, error2, '--', label='学习器2', alpha=0.7)
axes[1].plot(x, error3, '--', label='学习器3', alpha=0.7)
axes[1].plot(x, ensemble_error, '-', linewidth=3, label='集成学习器', color='red')
axes[1].set_title('集成学习降低错误率示意')
axes[1].set_xlabel('数据复杂度')
axes[1].set_ylabel('错误率')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def bias_variance_analysis(self):
"""偏差-方差分析"""
print("\n=== 偏差-方差分析 ===")
print("1. 偏差(Bias):")
print(" - 模型预测值与真实值之间的差异")
print(" - 高偏差 → 欠拟合")
print(" - Boosting主要降低偏差")
print("\n2. 方差(Variance):")
print(" - 模型在不同训练集上预测的变化程度")
print(" - 高方差 → 过拟合")
print(" - Bagging主要降低方差")
print("\n3. 集成学习的作用:")
print(" - Bagging:通过平均降低方差")
print(" - Boosting:通过加权组合降低偏差")
print(" - 理想情况:同时降低偏差和方差")
# 可视化偏差-方差权衡
self.visualize_bias_variance_tradeoff()
def visualize_bias_variance_tradeoff(self):
"""可视化偏差-方差权衡"""
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
# 1. 偏差-方差分解
complexity = np.linspace(0.1, 2, 50)
bias_squared = 1 / (complexity + 0.1) # 偏差随复杂度降低
variance = complexity ** 1.5 # 方差随复杂度增加
noise = np.ones_like(complexity) * 0.1 # 噪声固定
total_error = bias_squared + variance + noise
axes[0].plot(complexity, bias_squared, label='偏差²', linewidth=2)
axes[0].plot(complexity, variance, label='方差', linewidth=2)
axes[0].plot(complexity, noise, label='噪声', linewidth=2)
axes[0].plot(complexity, total_error, label='总误差', linewidth=3, linestyle='--')
# 找到最优复杂度
optimal_idx = np.argmin(total_error)
axes[0].axvline(x=complexity[optimal_idx], color='red', linestyle=':',
label=f'最优复杂度: {complexity[optimal_idx]:.2f}')
axes[0].set_xlabel('模型复杂度')
axes[0].set_ylabel('误差')
axes[0].set_title('偏差-方差权衡')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 2. Bagging效果示意
x = np.linspace(0, 10, 100)
true_function = np.sin(x)
# 模拟多个模型的预测
np.random.seed(42)
n_models = 5
predictions = []
for i in range(n_models):
noise = np.random.normal(0, 0.3, len(x))
pred = true_function + noise + 0.1 * np.random.randn()
predictions.append(pred)
axes[1].plot(x, pred, '--', alpha=0.5, color='blue')
# Bagging平均
bagging_pred = np.mean(predictions, axis=0)
axes[1].plot(x, true_function, 'k-', linewidth=3, label='真实函数')
axes[1].plot(x, bagging_pred, 'r-', linewidth=3, label='Bagging平均')
axes[1].set_title('Bagging降低方差')
axes[1].set_xlabel('x')
axes[1].set_ylabel('y')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# 3. Boosting效果示意
# 模拟Boosting过程
residuals = true_function.copy()
boosting_pred = np.zeros_like(x)
for i in range(3):
# 简化的弱学习器(线性拟合残差)
weak_pred = 0.3 * residuals + 0.1 * np.random.randn(len(x))
boosting_pred += weak_pred
residuals -= weak_pred
axes[2].plot(x, boosting_pred, '--', alpha=0.7,
label=f'第{i+1}轮' if i < 2 else '最终预测')
axes[2].plot(x, true_function, 'k-', linewidth=3, label='真实函数')
axes[2].set_title('Boosting降低偏差')
axes[2].set_xlabel('x')
axes[2].set_ylabel('y')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 演示集成学习概述
print("=== 集成学习概述演示 ===")
ensemble_intro = EnsembleLearningIntro()
ensemble_intro.ensemble_theory()
individual_scores, ensemble_score = ensemble_intro.diversity_importance_demo()
ensemble_intro.bias_variance_analysis()
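上面手动实现了硬投票;sklearn 也提供了现成的 VotingClassifier。下面是一个可独立运行的最小示意(数据生成方式与前文一致;软投票要求各基学习器支持 predict_proba):
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import VotingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB

X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
                           n_redundant=10, n_clusters_per_class=1, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

estimators = [('dt', DecisionTreeClassifier(random_state=42)),
              ('lr', LogisticRegression(max_iter=1000, random_state=42)),
              ('nb', GaussianNB())]
for voting in ['hard', 'soft']:
    clf = VotingClassifier(estimators=estimators, voting=voting).fit(X_train, y_train)
    print(f"{voting}投票准确率: {clf.score(X_test, y_test):.3f}")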
7.2 Bagging方法
7.2.1 Bagging基本原理
Bagging(Bootstrap Aggregating)是一种并行集成方法:先通过自助采样(有放回抽样)得到多个训练子集并分别训练模型,再对分类问题投票、对回归问题取平均来聚合预测结果。
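在使用 sklearn 的封装之前,下面先给出一个手写 Bagging 的最小草图(以决策树为基学习器、多数投票聚合,仅用于说明原理):
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

rng = np.random.default_rng(42)
all_preds = []
for _ in range(25):
    idx = rng.integers(0, len(X_train), size=len(X_train))  # 有放回的Bootstrap采样
    tree = DecisionTreeClassifier().fit(X_train[idx], y_train[idx])
    all_preds.append(tree.predict(X_test))
votes = np.mean(all_preds, axis=0)   # 每个测试样本被预测为1的比例
y_pred = (votes > 0.5).astype(int)   # 多数投票聚合
print(f"手写Bagging准确率: {(y_pred == y_test).mean():.3f}")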
class BaggingDemo:
def __init__(self):
self.models = {}
def bagging_theory(self):
"""Bagging理论介绍"""
print("=== Bagging理论 ===")
print("1. 基本思想:")
print(" - Bootstrap采样:有放回抽样")
print(" - Aggregating:聚合多个模型的预测")
print(" - 并行训练,独立性强")
print("\n2. 算法流程:")
print(" - 从原始训练集中进行Bootstrap采样")
print(" - 在每个采样集上训练一个基学习器")
print(" - 对分类问题进行投票,对回归问题求平均")
print("\n3. 主要优势:")
print(" - 降低方差,减少过拟合")
print(" - 提高模型稳定性")
print(" - 可以并行训练")
print(" - 对噪声数据鲁棒")
print("\n4. 适用场景:")
print(" - 基学习器容易过拟合(如决策树)")
print(" - 数据集较小")
print(" - 需要提高模型稳定性")
def bootstrap_sampling_demo(self):
"""Bootstrap采样演示"""
print("\n=== Bootstrap采样演示 ===")
# 原始数据集
original_data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print(f"原始数据集: {original_data}")
# 进行多次Bootstrap采样
n_samples = len(original_data)
n_bootstrap = 5
bootstrap_samples = []
for i in range(n_bootstrap):
# 有放回采样
indices = np.random.choice(n_samples, size=n_samples, replace=True)
sample = original_data[indices]
bootstrap_samples.append(sample)
print(f"Bootstrap样本 {i+1}: {sample}")
# 分析采样特性
self.analyze_bootstrap_properties(original_data, bootstrap_samples)
return bootstrap_samples
def analyze_bootstrap_properties(self, original_data, bootstrap_samples):
"""分析Bootstrap采样特性"""
print("\n=== Bootstrap采样特性分析 ===")
# 计算每个样本的出现频率
n_samples = len(original_data)
# 理论上,每个样本不被选中的概率
prob_not_selected = (1 - 1/n_samples) ** n_samples
prob_selected = 1 - prob_not_selected
print(f"理论上每个样本被选中的概率: {prob_selected:.3f}")
print(f"理论上约有 {prob_selected * n_samples:.1f} 个不同样本被选中")
# 实际统计
all_selected = np.concatenate(bootstrap_samples)
unique_selected = np.unique(all_selected)
print(f"实际选中的不同样本数: {len(unique_selected)}")
print(f"未被选中的样本: {set(original_data) - set(unique_selected)}")
# 可视化Bootstrap采样
self.visualize_bootstrap_sampling(original_data, bootstrap_samples)
def visualize_bootstrap_sampling(self, original_data, bootstrap_samples):
"""可视化Bootstrap采样"""
fig, axes = plt.subplots(2, 3, figsize=(18, 10))
# 1. 原始数据分布
axes[0, 0].bar(range(len(original_data)), original_data, alpha=0.7)
axes[0, 0].set_title('原始数据集')
axes[0, 0].set_xlabel('索引')
axes[0, 0].set_ylabel('值')
axes[0, 0].grid(True, alpha=0.3)
# 2-6. Bootstrap样本分布
for i, sample in enumerate(bootstrap_samples):
row = (i + 1) // 3
col = (i + 1) % 3
# 统计每个值的出现次数
unique_vals, counts = np.unique(sample, return_counts=True)
axes[row, col].bar(unique_vals, counts, alpha=0.7)
axes[row, col].set_title(f'Bootstrap样本 {i+1}')
axes[row, col].set_xlabel('值')
axes[row, col].set_ylabel('出现次数')
axes[row, col].grid(True, alpha=0.3)
axes[row, col].set_xlim(0, 11)
plt.tight_layout()
plt.show()
def bagging_classifier_demo(self):
"""Bagging分类器演示"""
print("\n=== Bagging分类器演示 ===")
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=2, n_redundant=0,
n_informative=2, n_clusters_per_class=1, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 基学习器:决策树
base_classifier = DecisionTreeClassifier(random_state=42)
# Bagging分类器
bagging_classifier = BaggingClassifier(
estimator=base_classifier,  # sklearn 1.2起参数名由 base_estimator 改为 estimator
n_estimators=10,
random_state=42,
bootstrap=True,
n_jobs=-1
)
# 训练模型
base_classifier.fit(X_train, y_train)
bagging_classifier.fit(X_train, y_train)
# 预测
base_pred = base_classifier.predict(X_test)
bagging_pred = bagging_classifier.predict(X_test)
# 评估性能
base_score = accuracy_score(y_test, base_pred)
bagging_score = accuracy_score(y_test, bagging_pred)
print(f"单个决策树准确率: {base_score:.3f}")
print(f"Bagging准确率: {bagging_score:.3f}")
print(f"性能提升: {bagging_score - base_score:.3f}")
# 可视化决策边界
self.visualize_bagging_decision_boundary(X_train, y_train, X_test, y_test,
base_classifier, bagging_classifier)
return base_score, bagging_score
def visualize_bagging_decision_boundary(self, X_train, y_train, X_test, y_test,
base_classifier, bagging_classifier):
"""可视化Bagging决策边界"""
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
# 创建网格
h = 0.02
x_min, x_max = X_train[:, 0].min() - 1, X_train[:, 0].max() + 1
y_min, y_max = X_train[:, 1].min() - 1, X_train[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
np.arange(y_min, y_max, h))
# 1. 单个决策树
Z1 = base_classifier.predict(np.c_[xx.ravel(), yy.ravel()])
Z1 = Z1.reshape(xx.shape)
axes[0].contourf(xx, yy, Z1, alpha=0.4, cmap=plt.cm.RdYlBu)
scatter = axes[0].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.RdYlBu)
axes[0].set_title('单个决策树决策边界')
axes[0].set_xlabel('特征1')
axes[0].set_ylabel('特征2')
# 2. Bagging集成
Z2 = bagging_classifier.predict(np.c_[xx.ravel(), yy.ravel()])
Z2 = Z2.reshape(xx.shape)
axes[1].contourf(xx, yy, Z2, alpha=0.4, cmap=plt.cm.RdYlBu)
axes[1].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.RdYlBu)
axes[1].set_title('Bagging决策边界')
axes[1].set_xlabel('特征1')
axes[1].set_ylabel('特征2')
# 3. 个体学习器的多样性展示
# 显示前5个基学习器的决策边界
colors = ['red', 'blue', 'green', 'orange', 'purple']
for i, estimator in enumerate(bagging_classifier.estimators_[:5]):
Z_individual = estimator.predict(np.c_[xx.ravel(), yy.ravel()])
Z_individual = Z_individual.reshape(xx.shape)
axes[2].contour(xx, yy, Z_individual, colors=[colors[i]], alpha=0.6, linewidths=1)
axes[2].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.RdYlBu)
axes[2].set_title('个体学习器决策边界')
axes[2].set_xlabel('特征1')
axes[2].set_ylabel('特征2')
plt.tight_layout()
plt.show()
def bagging_regressor_demo(self):
"""Bagging回归器演示"""
print("\n=== Bagging回归器演示 ===")
# 创建回归数据集
X, y = make_regression(n_samples=300, n_features=1, noise=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 基学习器:决策树回归器
base_regressor = DecisionTreeRegressor(random_state=42, max_depth=5)
# Bagging回归器
bagging_regressor = BaggingRegressor(
estimator=base_regressor,
n_estimators=10,
random_state=42,
bootstrap=True,
n_jobs=-1
)
# 训练模型
base_regressor.fit(X_train, y_train)
bagging_regressor.fit(X_train, y_train)
# 预测
base_pred = base_regressor.predict(X_test)
bagging_pred = bagging_regressor.predict(X_test)
# 评估性能
base_mse = mean_squared_error(y_test, base_pred)
bagging_mse = mean_squared_error(y_test, bagging_pred)
base_r2 = r2_score(y_test, base_pred)
bagging_r2 = r2_score(y_test, bagging_pred)
print(f"单个决策树 - MSE: {base_mse:.2f}, R²: {base_r2:.3f}")
print(f"Bagging - MSE: {bagging_mse:.2f}, R²: {bagging_r2:.3f}")
print(f"MSE改善: {base_mse - bagging_mse:.2f}")
# 可视化回归结果
self.visualize_bagging_regression(X_train, y_train, X_test, y_test,
base_regressor, bagging_regressor)
return base_mse, bagging_mse
def visualize_bagging_regression(self, X_train, y_train, X_test, y_test,
base_regressor, bagging_regressor):
"""可视化Bagging回归结果"""
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
# 创建预测范围
X_range = np.linspace(X_train.min(), X_train.max(), 300).reshape(-1, 1)
# 1. 单个决策树
base_pred_range = base_regressor.predict(X_range)
axes[0].scatter(X_train, y_train, alpha=0.6, label='训练数据')
axes[0].scatter(X_test, y_test, alpha=0.6, color='red', label='测试数据')
axes[0].plot(X_range, base_pred_range, color='green', linewidth=2, label='预测')
axes[0].set_title('单个决策树回归')
axes[0].set_xlabel('X')
axes[0].set_ylabel('y')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 2. Bagging回归
bagging_pred_range = bagging_regressor.predict(X_range)
axes[1].scatter(X_train, y_train, alpha=0.6, label='训练数据')
axes[1].scatter(X_test, y_test, alpha=0.6, color='red', label='测试数据')
axes[1].plot(X_range, bagging_pred_range, color='green', linewidth=2, label='预测')
axes[1].set_title('Bagging回归')
axes[1].set_xlabel('X')
axes[1].set_ylabel('y')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# 3. 个体学习器预测展示
axes[2].scatter(X_train, y_train, alpha=0.6, label='训练数据')
# 显示前5个基学习器的预测
colors = ['red', 'blue', 'green', 'orange', 'purple']
for i, estimator in enumerate(bagging_regressor.estimators_[:5]):
individual_pred = estimator.predict(X_range)
axes[2].plot(X_range, individual_pred, color=colors[i], alpha=0.6,
linewidth=1, label=f'学习器{i+1}')
# Bagging平均预测
axes[2].plot(X_range, bagging_pred_range, color='black', linewidth=3,
label='Bagging平均')
axes[2].set_title('个体学习器 vs Bagging平均')
axes[2].set_xlabel('X')
axes[2].set_ylabel('y')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def bagging_parameters_analysis(self):
"""Bagging参数分析"""
print("\n=== Bagging参数分析 ===")
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, n_clusters_per_class=1, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 1. 基学习器数量的影响
n_estimators_range = [1, 5, 10, 20, 50, 100, 200]
scores_vs_n_estimators = []
for n_est in n_estimators_range:
bagging = BaggingClassifier(
estimator=DecisionTreeClassifier(random_state=42),
n_estimators=n_est,
random_state=42
)
bagging.fit(X_train, y_train)
score = bagging.score(X_test, y_test)
scores_vs_n_estimators.append(score)
print(f"n_estimators={n_est:3d}: 准确率={score:.3f}")
# 2. 样本采样比例的影响
print("\n样本采样比例的影响:")
max_samples_range = [0.1, 0.3, 0.5, 0.7, 0.9, 1.0]
scores_vs_max_samples = []
for max_samp in max_samples_range:
bagging = BaggingClassifier(
estimator=DecisionTreeClassifier(random_state=42),
n_estimators=50,
max_samples=max_samp,
random_state=42
)
bagging.fit(X_train, y_train)
score = bagging.score(X_test, y_test)
scores_vs_max_samples.append(score)
print(f"max_samples={max_samp:.1f}: 准确率={score:.3f}")
# 可视化参数影响
self.visualize_bagging_parameters(n_estimators_range, scores_vs_n_estimators,
max_samples_range, scores_vs_max_samples)
def visualize_bagging_parameters(self, n_estimators_range, scores_vs_n_estimators,
max_samples_range, scores_vs_max_samples):
"""可视化Bagging参数影响"""
fig, axes = plt.subplots(1, 2, figsize=(15, 5))
# 1. 基学习器数量的影响
axes[0].plot(n_estimators_range, scores_vs_n_estimators, 'o-', linewidth=2, markersize=8)
axes[0].set_xlabel('基学习器数量')
axes[0].set_ylabel('准确率')
axes[0].set_title('基学习器数量对性能的影响')
axes[0].grid(True, alpha=0.3)
axes[0].set_xscale('log')
# 添加数值标注
for x, y in zip(n_estimators_range, scores_vs_n_estimators):
axes[0].annotate(f'{y:.3f}', (x, y), textcoords="offset points",
xytext=(0,10), ha='center')
# 2. 样本采样比例的影响
axes[1].plot(max_samples_range, scores_vs_max_samples, 'o-', linewidth=2, markersize=8)
axes[1].set_xlabel('样本采样比例')
axes[1].set_ylabel('准确率')
axes[1].set_title('样本采样比例对性能的影响')
axes[1].grid(True, alpha=0.3)
# 添加数值标注
for x, y in zip(max_samples_range, scores_vs_max_samples):
axes[1].annotate(f'{y:.3f}', (x, y), textcoords="offset points",
xytext=(0,10), ha='center')
plt.tight_layout()
plt.show()
# 演示Bagging方法
print("=== Bagging方法演示 ===")
bagging_demo = BaggingDemo()
bagging_demo.bagging_theory()
bootstrap_samples = bagging_demo.bootstrap_sampling_demo()
base_score, bagging_score = bagging_demo.bagging_classifier_demo()
base_mse, bagging_mse = bagging_demo.bagging_regressor_demo()
bagging_demo.bagging_parameters_analysis()
7.3 随机森林
7.3.1 随机森林原理
随机森林是以决策树为基学习器的Bagging扩展:除了Bootstrap采样,它还在每个节点分裂时随机选择特征子集,进一步增加了基学习器之间的多样性。
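随机森林还有一个与Bootstrap采样直接相关的实用特性:袋外(Out-of-Bag, OOB)估计。每棵树约有36.8%的训练样本未被其Bootstrap采样选中,可以充当"免费"的验证集。下面是一个最小示意:
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
                           random_state=42)
rf = RandomForestClassifier(n_estimators=200, oob_score=True, random_state=42)
rf.fit(X, y)
print(f"OOB估计准确率: {rf.oob_score_:.3f}")  # 无需额外划分验证集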
class RandomForestDemo:
def __init__(self):
self.models = {}
def random_forest_theory(self):
"""随机森林理论介绍"""
print("=== 随机森林理论 ===")
print("1. 基本思想:")
print(" - Bagging + 随机特征选择")
print(" - 每个节点分裂时随机选择特征子集")
print(" - 进一步增加模型多样性")
print("\n2. 算法流程:")
print(" - Bootstrap采样生成训练子集")
print(" - 每个节点随机选择m个特征(m < 总特征数)")
print(" - 在选定特征中找最佳分裂点")
print(" - 构建多棵决策树")
print(" - 投票或平均得到最终预测")
print("\n3. 关键参数:")
print(" - n_estimators: 树的数量")
print(" - max_features: 每次分裂考虑的特征数")
print(" - max_depth: 树的最大深度")
print(" - min_samples_split: 分裂所需最小样本数")
print("\n4. 主要优势:")
print(" - 准确率高,泛化能力强")
print(" - 对缺失值不敏感")
print(" - 可以处理大规模数据")
print(" - 提供特征重要性")
print(" - 不容易过拟合")
def feature_randomness_demo(self):
"""特征随机性演示"""
print("\n=== 特征随机性演示 ===")
# 创建高维数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=5, n_clusters_per_class=1, random_state=42)
feature_names = [f'特征_{i+1}' for i in range(X.shape[1])]
# 不同max_features设置的比较
max_features_options = ['sqrt', 'log2', None, 0.5]
print("不同特征选择策略的比较:")
for max_feat in max_features_options:
rf = RandomForestClassifier(
n_estimators=100,
max_features=max_feat,
random_state=42
)
# 交叉验证评估
scores = cross_val_score(rf, X, y, cv=5)
if max_feat == 'sqrt':
n_features_used = int(np.sqrt(X.shape[1]))
desc = f"sqrt({X.shape[1]}) = {n_features_used}"
elif max_feat == 'log2':
n_features_used = int(np.log2(X.shape[1]))
desc = f"log2({X.shape[1]}) = {n_features_used}"
elif max_feat is None:
n_features_used = X.shape[1]
desc = f"全部特征 = {n_features_used}"
else:
n_features_used = int(max_feat * X.shape[1])
desc = f"{max_feat} × {X.shape[1]} = {n_features_used}"
print(f" {desc:20s}: 准确率 = {scores.mean():.3f} ± {scores.std():.3f}")
# 可视化特征重要性
self.visualize_feature_importance(X, y, feature_names)
def visualize_feature_importance(self, X, y, feature_names):
"""可视化特征重要性"""
# 训练随机森林
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X, y)
# 获取特征重要性
importances = rf.feature_importances_
indices = np.argsort(importances)[::-1]
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# 1. 特征重要性条形图
axes[0].bar(range(len(importances)), importances[indices])
axes[0].set_title('随机森林特征重要性')
axes[0].set_xlabel('特征排名')
axes[0].set_ylabel('重要性')
axes[0].set_xticks(range(len(importances)))
axes[0].set_xticklabels([feature_names[i] for i in indices], rotation=45)
# 2. 累积重要性
cumulative_importance = np.cumsum(importances[indices])
axes[1].plot(range(1, len(importances)+1), cumulative_importance, 'o-')
axes[1].axhline(y=0.8, color='r', linestyle='--', label='80%重要性')
axes[1].axhline(y=0.9, color='orange', linestyle='--', label='90%重要性')
axes[1].set_title('累积特征重要性')
axes[1].set_xlabel('特征数量')
axes[1].set_ylabel('累积重要性')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 输出重要特征
print(f"\n前5个重要特征:")
for i in range(5):
idx = indices[i]
print(f" {feature_names[idx]}: {importances[idx]:.3f}")
def random_forest_vs_decision_tree(self):
"""随机森林与决策树比较"""
print("\n=== 随机森林 vs 决策树比较 ===")
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=2, n_redundant=0,
n_informative=2, n_clusters_per_class=1,
n_classes=3, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 模型定义
models = {
'单个决策树': DecisionTreeClassifier(random_state=42),
'随机森林': RandomForestClassifier(n_estimators=100, random_state=42)
}
# 训练和评估
results = {}
for name, model in models.items():
model.fit(X_train, y_train)
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)
results[name] = {
'model': model,
'train_score': train_score,
'test_score': test_score
}
print(f"{name}:")
print(f" 训练准确率: {train_score:.3f}")
print(f" 测试准确率: {test_score:.3f}")
print(f" 过拟合程度: {train_score - test_score:.3f}")
# 可视化比较
self.visualize_rf_vs_dt_comparison(X_train, y_train, X_test, y_test, results)
return results
def visualize_rf_vs_dt_comparison(self, X_train, y_train, X_test, y_test, results):
"""可视化随机森林与决策树比较"""
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# 创建网格
h = 0.02
x_min, x_max = X_train[:, 0].min() - 1, X_train[:, 0].max() + 1
y_min, y_max = X_train[:, 1].min() - 1, X_train[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
np.arange(y_min, y_max, h))
# 决策树
dt_model = results['单个决策树']['model']
Z_dt = dt_model.predict(np.c_[xx.ravel(), yy.ravel()])
Z_dt = Z_dt.reshape(xx.shape)
axes[0, 0].contourf(xx, yy, Z_dt, alpha=0.4, cmap=plt.cm.Set3)
axes[0, 0].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.Set3)
axes[0, 0].set_title('决策树决策边界(训练集)')
axes[0, 0].set_xlabel('特征1')
axes[0, 0].set_ylabel('特征2')
axes[0, 1].contourf(xx, yy, Z_dt, alpha=0.4, cmap=plt.cm.Set3)
axes[0, 1].scatter(X_test[:, 0], X_test[:, 1], c=y_test, cmap=plt.cm.Set3)
axes[0, 1].set_title('决策树决策边界(测试集)')
axes[0, 1].set_xlabel('特征1')
axes[0, 1].set_ylabel('特征2')
# 随机森林
rf_model = results['随机森林']['model']
Z_rf = rf_model.predict(np.c_[xx.ravel(), yy.ravel()])
Z_rf = Z_rf.reshape(xx.shape)
axes[1, 0].contourf(xx, yy, Z_rf, alpha=0.4, cmap=plt.cm.Set3)
axes[1, 0].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.Set3)
axes[1, 0].set_title('随机森林决策边界(训练集)')
axes[1, 0].set_xlabel('特征1')
axes[1, 0].set_ylabel('特征2')
axes[1, 1].contourf(xx, yy, Z_rf, alpha=0.4, cmap=plt.cm.Set3)
axes[1, 1].scatter(X_test[:, 0], X_test[:, 1], c=y_test, cmap=plt.cm.Set3)
axes[1, 1].set_title('随机森林决策边界(测试集)')
axes[1, 1].set_xlabel('特征1')
axes[1, 1].set_ylabel('特征2')
plt.tight_layout()
plt.show()
def random_forest_regression_demo(self):
"""随机森林回归演示"""
print("\n=== 随机森林回归演示 ===")
# 创建非线性回归数据
np.random.seed(42)
X = np.linspace(0, 10, 300).reshape(-1, 1)
y = np.sin(X).ravel() + np.random.normal(0, 0.3, X.shape[0])
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 模型比较
models = {
'决策树回归': DecisionTreeRegressor(random_state=42),
'随机森林回归': RandomForestRegressor(n_estimators=100, random_state=42)
}
# 训练和评估
results = {}
for name, model in models.items():
model.fit(X_train, y_train)
train_pred = model.predict(X_train)
test_pred = model.predict(X_test)
train_mse = mean_squared_error(y_train, train_pred)
test_mse = mean_squared_error(y_test, test_pred)
train_r2 = r2_score(y_train, train_pred)
test_r2 = r2_score(y_test, test_pred)
results[name] = {
'model': model,
'train_mse': train_mse,
'test_mse': test_mse,
'train_r2': train_r2,
'test_r2': test_r2
}
print(f"{name}:")
print(f" 训练 MSE: {train_mse:.3f}, R²: {train_r2:.3f}")
print(f" 测试 MSE: {test_mse:.3f}, R²: {test_r2:.3f}")
# 可视化回归结果
self.visualize_rf_regression(X_train, y_train, X_test, y_test, results)
return results
def visualize_rf_regression(self, X_train, y_train, X_test, y_test, results):
"""可视化随机森林回归结果"""
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
# 创建预测范围
X_range = np.linspace(X_train.min(), X_train.max(), 300).reshape(-1, 1)
# 1. 决策树回归
dt_model = results['决策树回归']['model']
dt_pred = dt_model.predict(X_range)
axes[0].scatter(X_train, y_train, alpha=0.6, label='训练数据')
axes[0].scatter(X_test, y_test, alpha=0.6, color='red', label='测试数据')
axes[0].plot(X_range, dt_pred, color='green', linewidth=2, label='预测')
axes[0].set_title('决策树回归')
axes[0].set_xlabel('X')
axes[0].set_ylabel('y')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 2. 随机森林回归
rf_model = results['随机森林回归']['model']
rf_pred = rf_model.predict(X_range)
axes[1].scatter(X_train, y_train, alpha=0.6, label='训练数据')
axes[1].scatter(X_test, y_test, alpha=0.6, color='red', label='测试数据')
axes[1].plot(X_range, rf_pred, color='green', linewidth=2, label='预测')
axes[1].set_title('随机森林回归')
axes[1].set_xlabel('X')
axes[1].set_ylabel('y')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# 3. 个体树的预测(随机森林)
axes[2].scatter(X_train, y_train, alpha=0.6, label='训练数据')
# 显示前10棵树的预测
colors = plt.cm.tab10(np.linspace(0, 1, 10))
for i, tree in enumerate(rf_model.estimators_[:10]):
tree_pred = tree.predict(X_range)
axes[2].plot(X_range, tree_pred, color=colors[i], alpha=0.3, linewidth=1)
# 随机森林平均预测
axes[2].plot(X_range, rf_pred, color='red', linewidth=3, label='随机森林平均')
axes[2].set_title('个体树 vs 随机森林平均')
axes[2].set_xlabel('X')
axes[2].set_ylabel('y')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def random_forest_hyperparameter_tuning(self):
"""随机森林超参数调优"""
print("\n=== 随机森林超参数调优 ===")
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=15,
n_redundant=5, n_clusters_per_class=1, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 参数网格
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [None, 10, 20],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4],
'max_features': ['sqrt', 'log2']
}
# 网格搜索
rf = RandomForestClassifier(random_state=42)
grid_search = GridSearchCV(rf, param_grid, cv=5, scoring='accuracy', n_jobs=-1)
print("进行网格搜索...")
grid_search.fit(X_train, y_train)
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳交叉验证分数: {grid_search.best_score_:.3f}")
# 最佳模型在测试集上的性能
best_rf = grid_search.best_estimator_
test_score = best_rf.score(X_test, y_test)
print(f"测试集准确率: {test_score:.3f}")
# 参数重要性分析
self.analyze_parameter_importance(grid_search)
return grid_search.best_estimator_
def analyze_parameter_importance(self, grid_search):
"""分析参数重要性"""
print("\n=== 参数重要性分析 ===")
# 获取所有结果
results_df = pd.DataFrame(grid_search.cv_results_)
# 分析每个参数的影响
params_to_analyze = ['n_estimators', 'max_depth', 'min_samples_split', 'max_features']
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
axes = axes.ravel()
for i, param in enumerate(params_to_analyze):
param_col = f'param_{param}'
if param_col in results_df.columns:
# 按参数值分组,计算平均分数
param_scores = results_df.groupby(param_col)['mean_test_score'].agg(['mean', 'std'])
x_values = param_scores.index
y_values = param_scores['mean']
y_errors = param_scores['std']
axes[i].errorbar(range(len(x_values)), y_values, yerr=y_errors,
marker='o', capsize=5, capthick=2)
axes[i].set_xticks(range(len(x_values)))
axes[i].set_xticklabels(x_values, rotation=45)
axes[i].set_title(f'{param} 对性能的影响')
axes[i].set_ylabel('交叉验证分数')
axes[i].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 演示随机森林
print("=== 随机森林演示 ===")
rf_demo = RandomForestDemo()
rf_demo.random_forest_theory()
rf_demo.feature_randomness_demo()
rf_vs_dt_results = rf_demo.random_forest_vs_decision_tree()
rf_regression_results = rf_demo.random_forest_regression_demo()
best_rf = rf_demo.random_forest_hyperparameter_tuning()
7.4 Boosting方法
7.4.1 Boosting基本原理
Boosting是一种串行集成方法,通过逐步纠正前一个学习器的错误来提高整体性能。
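下面先用一个最小示意直观感受串行纠错的效果:AdaBoostClassifier 的 staged_score 可以逐轮给出集成的准确率(注意 sklearn 1.2 起基学习器参数名由 base_estimator 改为 estimator):
from sklearn.datasets import make_classification
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(n_samples=500, n_features=10, random_state=42)
ada = AdaBoostClassifier(estimator=DecisionTreeClassifier(max_depth=1),
                         n_estimators=50, random_state=42).fit(X, y)
for i, score in enumerate(ada.staged_score(X, y), start=1):
    if i in (1, 10, 25, 50):  # 只打印几个代表性轮次
        print(f"第{i:2d}轮训练准确率: {score:.3f}")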
class BoostingDemo:
def __init__(self):
self.models = {}
def boosting_theory(self):
"""Boosting理论介绍"""
print("=== Boosting理论 ===")
print("1. 基本思想:")
print(" - 串行训练,后续学习器关注前面学习器的错误")
print(" - 通过加权组合弱学习器构建强学习器")
print(" - 主要降低偏差,提高模型复杂度")
print("\n2. 算法流程:")
print(" - 初始化样本权重(均匀分布)")
print(" - 训练弱学习器")
print(" - 计算学习器权重(基于错误率)")
print(" - 更新样本权重(增加错误样本权重)")
print(" - 重复直到达到指定轮数")
print(" - 加权组合所有弱学习器")
print("\n3. 主要优势:")
print(" - 能够将弱学习器提升为强学习器")
print(" - 理论基础扎实")
print(" - 对偏差敏感,能有效降低偏差")
print("\n4. 主要缺点:")
print(" - 对噪声和异常值敏感")
print(" - 容易过拟合")
print(" - 训练时间较长(串行)")
print("\n5. 常见Boosting算法:")
print(" - AdaBoost: 自适应提升")
print(" - Gradient Boosting: 梯度提升")
print(" - XGBoost: 极端梯度提升")
print(" - LightGBM: 轻量级梯度提升")
def boosting_process_visualization(self):
"""Boosting过程可视化"""
print("\n=== Boosting过程可视化 ===")
# 创建简单的二分类数据
np.random.seed(42)
X = np.random.randn(100, 2)
y = (X[:, 0] + X[:, 1] > 0).astype(int)
# 添加一些噪声点
noise_indices = np.random.choice(100, 10, replace=False)
y[noise_indices] = 1 - y[noise_indices]
# 模拟AdaBoost过程
n_estimators = 4
sample_weights = np.ones(len(X)) / len(X)
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
axes = axes.ravel()
estimator_weights = []
for i in range(n_estimators):
# 简化的弱学习器(基于单个特征的阈值)
feature_idx = i % 2
threshold = np.median(X[:, feature_idx])
# 预测
predictions = (X[:, feature_idx] > threshold).astype(int)
# 计算加权错误率
errors = (predictions != y)
weighted_error = np.sum(sample_weights * errors)
# 计算学习器权重
if weighted_error == 0:
alpha = 1.0
else:
alpha = 0.5 * np.log((1 - weighted_error) / weighted_error)
estimator_weights.append(alpha)
# 更新样本权重
sample_weights *= np.exp(-alpha * (2 * y - 1) * (2 * predictions - 1))  # 先将0/1标签与预测映射到±1
sample_weights /= np.sum(sample_weights)
# 可视化
scatter = axes[i].scatter(X[:, 0], X[:, 1], c=y, s=sample_weights*5000,
alpha=0.7, cmap=plt.cm.RdYlBu)
# 绘制决策边界
if feature_idx == 0:
axes[i].axvline(x=threshold, color='red', linestyle='--', linewidth=2)
else:
axes[i].axhline(y=threshold, color='red', linestyle='--', linewidth=2)
axes[i].set_title(f'第{i+1}轮 - 特征{feature_idx+1}, α={alpha:.2f}')
axes[i].set_xlabel('特征1')
axes[i].set_ylabel('特征2')
plt.tight_layout()
plt.show()
print("学习器权重:", [f"{w:.2f}" for w in estimator_weights])
def adaboost_step_by_step(self):
"""AdaBoost逐步演示"""
print("\n=== AdaBoost逐步演示 ===")
# 创建数据集
X, y = make_classification(n_samples=300, n_features=2, n_redundant=0,
n_informative=2, n_clusters_per_class=1, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 手动实现简化的AdaBoost过程
n_estimators = 5
sample_weights = np.ones(len(X_train)) / len(X_train)
estimators = []
estimator_weights = []
print("AdaBoost训练过程:")
for i in range(n_estimators):
# 训练弱学习器(决策树桩)
weak_learner = DecisionTreeClassifier(max_depth=1, random_state=i)
weak_learner.fit(X_train, y_train, sample_weight=sample_weights)
# 预测
predictions = weak_learner.predict(X_train)
# 计算加权错误率
errors = (predictions != y_train)
weighted_error = np.sum(sample_weights * errors)
# 计算学习器权重
if weighted_error == 0:
alpha = 1.0
else:
alpha = 0.5 * np.log((1 - weighted_error) / weighted_error)
estimators.append(weak_learner)
estimator_weights.append(alpha)
print(f" 第{i+1}轮: 错误率={weighted_error:.3f}, 权重={alpha:.3f}")
# 更新样本权重
sample_weights *= np.exp(-alpha * (2 * y_train - 1) * (2 * predictions - 1))
sample_weights /= np.sum(sample_weights)
# 集成预测
def ensemble_predict(X):
predictions = np.zeros(len(X))
for estimator, weight in zip(estimators, estimator_weights):
pred = estimator.predict(X)
predictions += weight * (2 * pred - 1)
return (predictions > 0).astype(int)
# 评估性能
manual_pred = ensemble_predict(X_test)
manual_score = accuracy_score(y_test, manual_pred)
# 与sklearn的AdaBoost比较
ada_boost = AdaBoostClassifier(n_estimators=n_estimators, random_state=42)
ada_boost.fit(X_train, y_train)
sklearn_score = ada_boost.score(X_test, y_test)
print(f"\n手动实现AdaBoost准确率: {manual_score:.3f}")
print(f"sklearn AdaBoost准确率: {sklearn_score:.3f}")
# 可视化AdaBoost过程
self.visualize_adaboost_process(X_train, y_train, estimators, estimator_weights)
return estimators, estimator_weights
def visualize_adaboost_process(self, X_train, y_train, estimators, estimator_weights):
"""可视化AdaBoost过程"""
fig, axes = plt.subplots(2, 3, figsize=(18, 10))
# 创建网格
h = 0.02
x_min, x_max = X_train[:, 0].min() - 1, X_train[:, 0].max() + 1
y_min, y_max = X_train[:, 1].min() - 1, X_train[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
np.arange(y_min, y_max, h))
# 显示前5个弱学习器
for i in range(min(5, len(estimators))):
row = i // 3
col = i % 3
# 弱学习器决策边界
Z = estimators[i].predict(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
axes[row, col].contourf(xx, yy, Z, alpha=0.4, cmap=plt.cm.RdYlBu)
axes[row, col].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.RdYlBu)
axes[row, col].set_title(f'弱学习器{i+1} (权重={estimator_weights[i]:.2f})')
axes[row, col].set_xlabel('特征1')
axes[row, col].set_ylabel('特征2')
# 最终集成结果
def ensemble_predict_proba(X):
predictions = np.zeros(len(X))
for estimator, weight in zip(estimators, estimator_weights):
pred = estimator.predict(X)
predictions += weight * (2 * pred - 1)
return predictions
Z_ensemble = ensemble_predict_proba(np.c_[xx.ravel(), yy.ravel()])
Z_ensemble = (Z_ensemble > 0).astype(int).reshape(xx.shape)
axes[1, 2].contourf(xx, yy, Z_ensemble, alpha=0.4, cmap=plt.cm.RdYlBu)
axes[1, 2].scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=plt.cm.RdYlBu)
axes[1, 2].set_title('AdaBoost集成结果')
axes[1, 2].set_xlabel('特征1')
axes[1, 2].set_ylabel('特征2')
plt.tight_layout()
plt.show()
# 演示Boosting方法
print("=== Boosting方法演示 ===")
boosting_demo = BoostingDemo()
boosting_demo.boosting_theory()
boosting_demo.boosting_process_visualization()
estimators, weights = boosting_demo.adaboost_step_by_step()
7.5 AdaBoost算法
7.5.1 AdaBoost详细实现
AdaBoost(Adaptive Boosting)是最经典的Boosting算法,通过自适应地调整样本权重来提升弱学习器。
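结合下文给出的公式 α_t = 0.5·ln((1-ε_t)/ε_t),可以先做一个数值小实验:错误率 ε 越低的学习器权重 α 越大,一轮更新后错分样本与正确样本的权重比被放大 e^{2α} = (1-ε)/ε 倍:
import numpy as np

for eps in [0.45, 0.30, 0.10, 0.01]:
    alpha = 0.5 * np.log((1 - eps) / eps)   # 学习器权重
    ratio = np.exp(2 * alpha)               # 错分/正确样本的权重比放大倍数
    print(f"ε={eps:.2f} → α={alpha:.3f}, 权重比放大 {ratio:.1f} 倍")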
class AdaBoostDemo:
def __init__(self):
self.models = {}
def adaboost_theory(self):
"""AdaBoost理论详解"""
print("=== AdaBoost理论详解 ===")
print("1. 算法核心思想:")
print(" - 每轮训练后,增加被错误分类样本的权重")
print(" - 减少被正确分类样本的权重")
print(" - 让后续学习器更关注困难样本")
print("\n2. 数学公式:")
print(" - 学习器权重: α_t = 0.5 * ln((1-ε_t)/ε_t)")
print(" - 样本权重更新: w_t+1 = w_t * exp(-α_t * y_i * h_t(x_i))")
print(" - 最终预测: H(x) = sign(Σ α_t * h_t(x))")
print("\n3. 算法特点:")
print(" - 对噪声敏感")
print(" - 理论保证强")
print(" - 适合弱学习器")
print(" - 可能过拟合")
def adaboost_classification_demo(self):
"""AdaBoost分类演示"""
print("\n=== AdaBoost分类演示 ===")
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, n_clusters_per_class=1, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 不同基学习器的AdaBoost
base_estimators = {
'决策树桩': DecisionTreeClassifier(max_depth=1),
'浅层决策树': DecisionTreeClassifier(max_depth=3),
'朴素贝叶斯': GaussianNB()
}
results = {}
for name, base_est in base_estimators.items():
print(f"\n{name}作为基学习器:")
# AdaBoost
ada_boost = AdaBoostClassifier(
estimator=base_est,
n_estimators=50,
learning_rate=1.0,
random_state=42
)
ada_boost.fit(X_train, y_train)
# 评估
train_score = ada_boost.score(X_train, y_train)
test_score = ada_boost.score(X_test, y_test)
results[name] = {
'model': ada_boost,
'train_score': train_score,
'test_score': test_score
}
print(f" 训练准确率: {train_score:.3f}")
print(f" 测试准确率: {test_score:.3f}")
print(f" 过拟合程度: {train_score - test_score:.3f}")
# 学习曲线分析
self.plot_adaboost_learning_curves(X_train, y_train, X_test, y_test)
return results
def plot_adaboost_learning_curves(self, X_train, y_train, X_test, y_test):
"""绘制AdaBoost学习曲线"""
print("\n=== AdaBoost学习曲线 ===")
# 不同n_estimators的性能
n_estimators_range = range(1, 101, 5)
train_scores = []
test_scores = []
for n_est in n_estimators_range:
ada_boost = AdaBoostClassifier(
estimator=DecisionTreeClassifier(max_depth=1),
n_estimators=n_est,
random_state=42
)
ada_boost.fit(X_train, y_train)
train_score = ada_boost.score(X_train, y_train)
test_score = ada_boost.score(X_test, y_test)
train_scores.append(train_score)
test_scores.append(test_score)
# 可视化学习曲线
fig, axes = plt.subplots(1, 2, figsize=(15, 5))
# 1. 学习曲线
axes[0].plot(n_estimators_range, train_scores, 'o-', label='训练集', linewidth=2)
axes[0].plot(n_estimators_range, test_scores, 'o-', label='测试集', linewidth=2)
axes[0].set_xlabel('基学习器数量')
axes[0].set_ylabel('准确率')
axes[0].set_title('AdaBoost学习曲线')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 2. 过拟合分析
overfitting = np.array(train_scores) - np.array(test_scores)
axes[1].plot(n_estimators_range, overfitting, 'o-', color='red', linewidth=2)
axes[1].axhline(y=0, color='black', linestyle='--', alpha=0.5)
axes[1].set_xlabel('基学习器数量')
axes[1].set_ylabel('过拟合程度 (训练-测试)')
axes[1].set_title('AdaBoost过拟合分析')
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def adaboost_regression_demo(self):
"""AdaBoost回归演示"""
print("\n=== AdaBoost回归演示 ===")
# 创建回归数据集
X, y = make_regression(n_samples=500, n_features=1, noise=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# AdaBoost回归器
ada_regressor = AdaBoostRegressor(
estimator=DecisionTreeRegressor(max_depth=3),
n_estimators=50,
learning_rate=1.0,
random_state=42
)
# 训练
ada_regressor.fit(X_train, y_train)
# 预测
train_pred = ada_regressor.predict(X_train)
test_pred = ada_regressor.predict(X_test)
# 评估
train_mse = mean_squared_error(y_train, train_pred)
test_mse = mean_squared_error(y_test, test_pred)
train_r2 = r2_score(y_train, train_pred)
test_r2 = r2_score(y_test, test_pred)
print(f"训练 MSE: {train_mse:.2f}, R²: {train_r2:.3f}")
print(f"测试 MSE: {test_mse:.2f}, R²: {test_r2:.3f}")
# 可视化回归结果
self.visualize_adaboost_regression(X_train, y_train, X_test, y_test, ada_regressor)
return ada_regressor
def visualize_adaboost_regression(self, X_train, y_train, X_test, y_test, ada_regressor):
"""可视化AdaBoost回归结果"""
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
# 创建预测范围
X_range = np.linspace(X_train.min(), X_train.max(), 300).reshape(-1, 1)
# 1. 最终预测结果
final_pred = ada_regressor.predict(X_range)
axes[0].scatter(X_train, y_train, alpha=0.6, label='训练数据')
axes[0].scatter(X_test, y_test, alpha=0.6, color='red', label='测试数据')
axes[0].plot(X_range, final_pred, color='green', linewidth=2, label='AdaBoost预测')
axes[0].set_title('AdaBoost回归结果')
axes[0].set_xlabel('X')
axes[0].set_ylabel('y')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 2. 个体学习器预测
axes[1].scatter(X_train, y_train, alpha=0.6, label='训练数据')
# 显示前10个基学习器的预测
colors = plt.cm.tab10(np.linspace(0, 1, 10))
for i, estimator in enumerate(ada_regressor.estimators_[:10]):
estimator_pred = estimator.predict(X_range)
axes[1].plot(X_range, estimator_pred, color=colors[i], alpha=0.5, linewidth=1)
axes[1].plot(X_range, final_pred, color='red', linewidth=3, label='AdaBoost集成')
axes[1].set_title('个体学习器 vs AdaBoost集成')
axes[1].set_xlabel('X')
axes[1].set_ylabel('y')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# 3. 学习器权重分布
estimator_weights = ada_regressor.estimator_weights_
axes[2].bar(range(len(estimator_weights)), estimator_weights)
axes[2].set_title('基学习器权重分布')
axes[2].set_xlabel('学习器索引')
axes[2].set_ylabel('权重')
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def adaboost_parameter_analysis(self):
"""AdaBoost参数分析"""
print("\n=== AdaBoost参数分析 ===")
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, n_clusters_per_class=1, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 1. 学习率的影响
print("学习率的影响:")
learning_rates = [0.1, 0.5, 1.0, 1.5, 2.0]
lr_scores = []
for lr in learning_rates:
ada_boost = AdaBoostClassifier(
estimator=DecisionTreeClassifier(max_depth=1),
n_estimators=50,
learning_rate=lr,
random_state=42
)
ada_boost.fit(X_train, y_train)
score = ada_boost.score(X_test, y_test)
lr_scores.append(score)
print(f" 学习率={lr:.1f}: 准确率={score:.3f}")
# 2. 基学习器深度的影响
print("\n基学习器深度的影响:")
max_depths = [1, 2, 3, 4, 5]
depth_scores = []
for depth in max_depths:
ada_boost = AdaBoostClassifier(
estimator=DecisionTreeClassifier(max_depth=depth),
n_estimators=50,
learning_rate=1.0,
random_state=42
)
ada_boost.fit(X_train, y_train)
score = ada_boost.score(X_test, y_test)
depth_scores.append(score)
print(f" 最大深度={depth}: 准确率={score:.3f}")
# 可视化参数影响
self.visualize_adaboost_parameters(learning_rates, lr_scores, max_depths, depth_scores)
def visualize_adaboost_parameters(self, learning_rates, lr_scores, max_depths, depth_scores):
"""可视化AdaBoost参数影响"""
fig, axes = plt.subplots(1, 2, figsize=(15, 5))
# 1. 学习率影响
axes[0].plot(learning_rates, lr_scores, 'o-', linewidth=2, markersize=8)
axes[0].set_xlabel('学习率')
axes[0].set_ylabel('测试准确率')
axes[0].set_title('学习率对AdaBoost性能的影响')
axes[0].grid(True, alpha=0.3)
# 添加数值标注
for x, y in zip(learning_rates, lr_scores):
axes[0].annotate(f'{y:.3f}', (x, y), textcoords="offset points",
xytext=(0,10), ha='center')
# 2. 基学习器深度影响
axes[1].plot(max_depths, depth_scores, 'o-', linewidth=2, markersize=8)
axes[1].set_xlabel('基学习器最大深度')
axes[1].set_ylabel('测试准确率')
axes[1].set_title('基学习器深度对AdaBoost性能的影响')
axes[1].grid(True, alpha=0.3)
# 添加数值标注
for x, y in zip(max_depths, depth_scores):
axes[1].annotate(f'{y:.3f}', (x, y), textcoords="offset points",
xytext=(0,10), ha='center')
plt.tight_layout()
plt.show()
# 演示AdaBoost算法
print("=== AdaBoost算法演示 ===")
adaboost_demo = AdaBoostDemo()
adaboost_demo.adaboost_theory()
ada_classification_results = adaboost_demo.adaboost_classification_demo()
ada_regressor = adaboost_demo.adaboost_regression_demo()
adaboost_demo.adaboost_parameter_analysis()
7.6 梯度提升树
7.6.1 梯度提升基本原理
梯度提升(Gradient Boosting)是一种更加通用的Boosting方法,通过拟合残差来逐步改进模型。
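实践中梯度提升容易随轮数增加而过拟合。sklearn 的 GradientBoostingRegressor 支持基于内部验证集的早停(n_iter_no_change 配合 validation_fraction),下面是一个最小示意:
from sklearn.datasets import make_regression
from sklearn.ensemble import GradientBoostingRegressor

X, y = make_regression(n_samples=500, n_features=10, noise=10, random_state=42)
gbr = GradientBoostingRegressor(n_estimators=1000, learning_rate=0.1,
                                validation_fraction=0.2, n_iter_no_change=10,
                                random_state=42).fit(X, y)
print(f"实际训练的树数量: {gbr.n_estimators_}")  # 早停后通常远小于1000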
class GradientBoostingDemo:
def __init__(self):
self.models = {}
def gradient_boosting_theory(self):
"""梯度提升理论介绍"""
print("=== 梯度提升理论 ===")
print("1. 基本思想:")
print(" - 每次拟合前一个模型的残差(负梯度)")
print(" - 通过梯度下降的思想优化损失函数")
print(" - 更加通用,可以处理各种损失函数")
print("\n2. 算法流程:")
print(" - 初始化模型F_0(x)")
print(" - 计算负梯度(残差)")
print(" - 训练弱学习器拟合负梯度")
print(" - 更新模型:F_m(x) = F_{m-1}(x) + γ_m * h_m(x)")
print(" - 重复直到收敛")
print("\n3. 数学表示:")
print(" - 损失函数: L(y, F(x))")
print(" - 负梯度: r_{im} = -[∂L(y_i, F(x_i))/∂F(x_i)]")
print(" - 模型更新: F_m(x) = F_{m-1}(x) + γ_m * h_m(x)")
print("\n4. 主要优势:")
print(" - 灵活的损失函数")
print(" - 强大的预测能力")
print(" - 处理各种数据类型")
print(" - 特征重要性评估")
def gradient_boosting_process_demo(self):
"""梯度提升过程演示"""
print("\n=== 梯度提升过程演示 ===")
# 创建简单的回归数据
np.random.seed(42)
X = np.linspace(0, 10, 100).reshape(-1, 1)
y = np.sin(X).ravel() + np.random.normal(0, 0.3, X.shape[0])
# 手动实现简化的梯度提升过程
n_estimators = 5
learning_rate = 0.1
# 初始化预测(均值)
F = np.full(len(y), np.mean(y))
predictions_history = [F.copy()]
print("梯度提升训练过程:")
print(f"初始预测(均值): {np.mean(y):.3f}")
estimators = []
for i in range(n_estimators):
# 计算残差(负梯度)
residuals = y - F
# 训练弱学习器拟合残差
weak_learner = DecisionTreeRegressor(max_depth=3, random_state=i)
weak_learner.fit(X, residuals)
# 预测残差
residual_pred = weak_learner.predict(X)
# 更新模型
F += learning_rate * residual_pred
estimators.append(weak_learner)
predictions_history.append(F.copy())
# 计算当前MSE
mse = mean_squared_error(y, F)
print(f"第{i+1}轮: MSE = {mse:.3f}")
# 可视化梯度提升过程
self.visualize_gradient_boosting_process(X, y, predictions_history, estimators, learning_rate)
return estimators, predictions_history
def visualize_gradient_boosting_process(self, X, y, predictions_history, estimators, learning_rate):
"""可视化梯度提升过程"""
fig, axes = plt.subplots(2, 3, figsize=(18, 10))
# 创建预测范围
X_range = np.linspace(X.min(), X.max(), 200).reshape(-1, 1)
# 显示前5轮的拟合过程
for i in range(min(5, len(estimators))):
row = i // 3
col = i % 3
# 当前轮次的预测
current_pred = predictions_history[i+1]
axes[row, col].scatter(X, y, alpha=0.6, label='真实数据')
axes[row, col].plot(X, current_pred, 'r-', linewidth=2, label=f'第{i+1}轮预测')
if i > 0:
axes[row, col].plot(X, predictions_history[i], 'g--', alpha=0.7, label=f'第{i}轮预测')
axes[row, col].set_title(f'第{i+1}轮梯度提升')
axes[row, col].set_xlabel('X')
axes[row, col].set_ylabel('y')
axes[row, col].legend()
axes[row, col].grid(True, alpha=0.3)
# 最终结果比较
final_pred = predictions_history[-1]
# 重构完整预测函数
def full_predict(X_pred):
pred = np.full(len(X_pred), np.mean(y))
for estimator in estimators:
pred += learning_rate * estimator.predict(X_pred)
return pred
range_pred = full_predict(X_range)
axes[1, 2].scatter(X, y, alpha=0.6, label='真实数据')
axes[1, 2].plot(X_range, range_pred, 'r-', linewidth=2, label='梯度提升预测')
axes[1, 2].set_title('最终梯度提升结果')
axes[1, 2].set_xlabel('X')
axes[1, 2].set_ylabel('y')
axes[1, 2].legend()
axes[1, 2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def gradient_boosting_classification_demo(self):
"""梯度提升分类演示"""
print("\n=== 梯度提升分类演示 ===")
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=15,
n_redundant=5, n_clusters_per_class=1, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 梯度提升分类器
gb_classifier = GradientBoostingClassifier(
n_estimators=100,
learning_rate=0.1,
max_depth=3,
random_state=42
)
# 训练
gb_classifier.fit(X_train, y_train)
# 预测
train_pred = gb_classifier.predict(X_train)
test_pred = gb_classifier.predict(X_test)
# 评估
train_score = accuracy_score(y_train, train_pred)
test_score = accuracy_score(y_test, test_pred)
print(f"训练准确率: {train_score:.3f}")
print(f"测试准确率: {test_score:.3f}")
print(f"过拟合程度: {train_score - test_score:.3f}")
# 特征重要性
feature_importance = gb_classifier.feature_importances_
print(f"\n前5个重要特征:")
top_features = np.argsort(feature_importance)[::-1][:5]
for i, idx in enumerate(top_features):
print(f" 特征{idx}: {feature_importance[idx]:.3f}")
# 学习曲线
self.plot_gb_learning_curves(X_train, y_train, X_test, y_test)
return gb_classifier
def plot_gb_learning_curves(self, X_train, y_train, X_test, y_test):
"""绘制梯度提升学习曲线"""
print("\n=== 梯度提升学习曲线 ===")
# 不同参数的比较
learning_rates = [0.01, 0.1, 0.2]
max_depths = [3, 5, 7]
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 1. 不同学习率的学习曲线
for lr in learning_rates:
gb = GradientBoostingClassifier(
n_estimators=100,
learning_rate=lr,
max_depth=3,
random_state=42
)
gb.fit(X_train, y_train)
# 获取训练过程中的分数
train_scores = []
test_scores = []
for i, pred in enumerate(gb.staged_predict(X_train)):
train_scores.append(accuracy_score(y_train, pred))
for i, pred in enumerate(gb.staged_predict(X_test)):
test_scores.append(accuracy_score(y_test, pred))
axes[0, 0].plot(train_scores, label=f'训练-lr={lr}', alpha=0.7)
axes[0, 1].plot(test_scores, label=f'测试-lr={lr}', alpha=0.7)
axes[0, 0].set_title('训练集学习曲线(不同学习率)')
axes[0, 0].set_xlabel('迭代次数')
axes[0, 0].set_ylabel('准确率')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
axes[0, 1].set_title('测试集学习曲线(不同学习率)')
axes[0, 1].set_xlabel('迭代次数')
axes[0, 1].set_ylabel('准确率')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 2. 不同最大深度的比较
depth_train_scores = []
depth_test_scores = []
for depth in max_depths:
gb = GradientBoostingClassifier(
n_estimators=100,
learning_rate=0.1,
max_depth=depth,
random_state=42
)
gb.fit(X_train, y_train)
train_score = gb.score(X_train, y_train)
test_score = gb.score(X_test, y_test)
depth_train_scores.append(train_score)
depth_test_scores.append(test_score)
x_pos = np.arange(len(max_depths))
width = 0.35
axes[1, 0].bar(x_pos - width/2, depth_train_scores, width, label='训练集', alpha=0.7)
axes[1, 0].bar(x_pos + width/2, depth_test_scores, width, label='测试集', alpha=0.7)
axes[1, 0].set_xlabel('最大深度')
axes[1, 0].set_ylabel('准确率')
axes[1, 0].set_title('不同最大深度的性能比较')
axes[1, 0].set_xticks(x_pos)
axes[1, 0].set_xticklabels(max_depths)
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 3. 过拟合分析
overfitting = np.array(depth_train_scores) - np.array(depth_test_scores)
axes[1, 1].bar(max_depths, overfitting, alpha=0.7, color='red')
axes[1, 1].set_xlabel('最大深度')
axes[1, 1].set_ylabel('过拟合程度')
axes[1, 1].set_title('过拟合程度分析')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def gradient_boosting_regression_demo(self):
"""梯度提升回归演示"""
print("\n=== 梯度提升回归演示 ===")
# 创建回归数据集
X, y = make_regression(n_samples=500, n_features=10, noise=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 梯度提升回归器
gb_regressor = GradientBoostingRegressor(
n_estimators=100,
learning_rate=0.1,
max_depth=3,
random_state=42
)
# 训练
gb_regressor.fit(X_train, y_train)
# 预测
train_pred = gb_regressor.predict(X_train)
test_pred = gb_regressor.predict(X_test)
# 评估
train_mse = mean_squared_error(y_train, train_pred)
test_mse = mean_squared_error(y_test, test_pred)
train_r2 = r2_score(y_train, train_pred)
test_r2 = r2_score(y_test, test_pred)
print(f"训练 MSE: {train_mse:.2f}, R²: {train_r2:.3f}")
print(f"测试 MSE: {test_mse:.2f}, R²: {test_r2:.3f}")
# 残差分析
self.analyze_gb_residuals(y_test, test_pred)
return gb_regressor
def analyze_gb_residuals(self, y_true, y_pred):
"""分析梯度提升回归残差"""
residuals = y_true - y_pred
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
# 1. 预测值 vs 真实值
axes[0].scatter(y_true, y_pred, alpha=0.6)
axes[0].plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--', lw=2)
axes[0].set_xlabel('真实值')
axes[0].set_ylabel('预测值')
axes[0].set_title('预测值 vs 真实值')
axes[0].grid(True, alpha=0.3)
# 2. 残差分布
axes[1].hist(residuals, bins=30, alpha=0.7, edgecolor='black')
axes[1].axvline(x=0, color='red', linestyle='--', linewidth=2)
axes[1].set_xlabel('残差')
axes[1].set_ylabel('频数')
axes[1].set_title('残差分布')
axes[1].grid(True, alpha=0.3)
# 3. 残差 vs 预测值
axes[2].scatter(y_pred, residuals, alpha=0.6)
axes[2].axhline(y=0, color='red', linestyle='--', linewidth=2)
axes[2].set_xlabel('预测值')
axes[2].set_ylabel('残差')
axes[2].set_title('残差 vs 预测值')
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 残差统计
print(f"残差均值: {np.mean(residuals):.3f}")
print(f"残差标准差: {np.std(residuals):.3f}")
print(f"残差范围: [{np.min(residuals):.2f}, {np.max(residuals):.2f}]")
# 演示梯度提升树
print("=== 梯度提升树演示 ===")
gb_demo = GradientBoostingDemo()
gb_demo.gradient_boosting_theory()
estimators, predictions = gb_demo.gradient_boosting_process_demo()
gb_classifier = gb_demo.gradient_boosting_classification_demo()
gb_regressor = gb_demo.gradient_boosting_regression_demo()
7.7 集成方法综合比较
7.7.1 不同集成方法的特点对比
class EnsembleComparison:
def __init__(self):
self.models = {}
def ensemble_methods_comparison(self):
"""集成方法理论比较"""
print("=== 集成方法综合比较 ===")
comparison_data = {
'方法': ['Bagging', 'Random Forest', 'AdaBoost', 'Gradient Boosting'],
'训练方式': ['并行', '并行', '串行', '串行'],
'主要目标': ['降低方差', '降低方差', '降低偏差', '降低偏差'],
'基学习器': ['任意', '决策树', '弱学习器', '弱学习器'],
'样本权重': ['均匀', '均匀', '自适应', '无'],
'特征选择': ['全部', '随机子集', '全部', '全部'],
'过拟合倾向': ['低', '低', '中等', '高'],
'噪声敏感性': ['低', '低', '高', '中等'],
'解释性': ['中等', '高', '低', '中等'],
'计算复杂度': ['低', '中等', '中等', '高']
}
df = pd.DataFrame(comparison_data)
print(df.to_string(index=False))
print("\n=== 各方法适用场景 ===")
scenarios = {
'Bagging': [
'数据集较大,计算资源充足',
'希望降低模型方差',
'基学习器容易过拟合',
'需要并行训练'
],
'Random Forest': [
'特征维度较高',
'需要特征重要性评估',
'数据包含噪声',
'需要良好的泛化性能'
],
'AdaBoost': [
'基学习器性能较弱',
'数据质量较好(少噪声)',
'二分类问题',
'需要理论保证'
],
'Gradient Boosting': [
'追求最高预测精度',
'特征工程充分',
'有足够的调参时间',
'数据集中等规模'
]
}
for method, uses in scenarios.items():
print(f"\n{method}适用场景:")
for use in uses:
print(f" • {use}")
def comprehensive_performance_comparison(self):
"""综合性能比较"""
print("\n=== 综合性能比较 ===")
# 创建多个不同特性的数据集
datasets = {
'小样本高维': make_classification(n_samples=200, n_features=50, n_informative=30,
n_redundant=20, random_state=42),
'大样本低维': make_classification(n_samples=2000, n_features=10, n_informative=8,
n_redundant=2, random_state=42),
'不平衡数据': make_classification(n_samples=1000, n_features=20, n_informative=15,
weights=[0.9, 0.1], random_state=42),
'含噪声数据': self.create_noisy_dataset()
}
# 定义集成方法
ensemble_methods = {
'Bagging': BaggingClassifier(n_estimators=50, random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=50, random_state=42),
'AdaBoost': AdaBoostClassifier(n_estimators=50, random_state=42),
'Gradient Boosting': GradientBoostingClassifier(n_estimators=50, random_state=42)
}
# 性能比较结果
results = {}
for dataset_name, (X, y) in datasets.items():
print(f"\n{dataset_name}数据集结果:")
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
dataset_results = {}
for method_name, model in ensemble_methods.items():
# 训练和评估
start_time = time.time()
model.fit(X_train, y_train)
train_time = time.time() - start_time
# 预测
y_pred = model.predict(X_test)
y_proba = model.predict_proba(X_test)[:, 1] if hasattr(model, 'predict_proba') else None
# 评估指标
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted')
recall = recall_score(y_test, y_pred, average='weighted')
f1 = f1_score(y_test, y_pred, average='weighted')
dataset_results[method_name] = {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1': f1,
'train_time': train_time
}
print(f" {method_name:15} - 准确率: {accuracy:.3f}, F1: {f1:.3f}, 训练时间: {train_time:.2f}s")
results[dataset_name] = dataset_results
# 可视化比较结果
self.visualize_performance_comparison(results)
return results
def create_noisy_dataset(self):
"""创建含噪声的数据集"""
X, y = make_classification(n_samples=1000, n_features=20, n_informative=15,
n_redundant=5, random_state=42)
# 固定随机种子后添加噪声特征,保证结果可复现
np.random.seed(42)
noise_features = np.random.randn(X.shape[0], 10)
X = np.hstack([X, noise_features])
# 添加标签噪声
noise_indices = np.random.choice(len(y), int(0.1 * len(y)), replace=False)
y[noise_indices] = 1 - y[noise_indices]
return X, y
def visualize_performance_comparison(self, results):
"""可视化性能比较结果"""
# 准备数据
methods = list(results[list(results.keys())[0]].keys())
datasets = list(results.keys())
metrics = ['accuracy', 'f1', 'train_time']
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
# 1. 准确率比较
accuracy_data = []
for dataset in datasets:
for method in methods:
accuracy_data.append({
'Dataset': dataset,
'Method': method,
'Accuracy': results[dataset][method]['accuracy']
})
df_accuracy = pd.DataFrame(accuracy_data)
pivot_accuracy = df_accuracy.pivot(index='Dataset', columns='Method', values='Accuracy')
pivot_accuracy.plot(kind='bar', ax=axes[0, 0], width=0.8)
axes[0, 0].set_title('不同数据集上的准确率比较')
axes[0, 0].set_ylabel('准确率')
axes[0, 0].legend(bbox_to_anchor=(1.05, 1), loc='upper left')
axes[0, 0].grid(True, alpha=0.3)
# 2. F1分数比较
f1_data = []
for dataset in datasets:
for method in methods:
f1_data.append({
'Dataset': dataset,
'Method': method,
'F1': results[dataset][method]['f1']
})
df_f1 = pd.DataFrame(f1_data)
pivot_f1 = df_f1.pivot(index='Dataset', columns='Method', values='F1')
pivot_f1.plot(kind='bar', ax=axes[0, 1], width=0.8)
axes[0, 1].set_title('不同数据集上的F1分数比较')
axes[0, 1].set_ylabel('F1分数')
axes[0, 1].legend(bbox_to_anchor=(1.05, 1), loc='upper left')
axes[0, 1].grid(True, alpha=0.3)
# 3. 训练时间比较
time_data = []
for dataset in datasets:
for method in methods:
time_data.append({
'Dataset': dataset,
'Method': method,
'Time': results[dataset][method]['train_time']
})
df_time = pd.DataFrame(time_data)
pivot_time = df_time.pivot(index='Dataset', columns='Method', values='Time')
pivot_time.plot(kind='bar', ax=axes[1, 0], width=0.8)
axes[1, 0].set_title('不同数据集上的训练时间比较')
axes[1, 0].set_ylabel('训练时间 (秒)')
axes[1, 0].legend(bbox_to_anchor=(1.05, 1), loc='upper left')
axes[1, 0].grid(True, alpha=0.3)
# 4. 综合性能雷达图
avg_metrics = {}
for method in methods:
avg_accuracy = np.mean([results[dataset][method]['accuracy'] for dataset in datasets])
avg_f1 = np.mean([results[dataset][method]['f1'] for dataset in datasets])
avg_time = np.mean([results[dataset][method]['train_time'] for dataset in datasets])
# 标准化时间(越小越好,所以取倒数)
normalized_time = 1 / (1 + avg_time)
avg_metrics[method] = [avg_accuracy, avg_f1, normalized_time]
# 雷达图
angles = np.linspace(0, 2 * np.pi, 3, endpoint=False).tolist()
angles += angles[:1] # 闭合
fig.delaxes(axes[1, 1])  # 先移除原直角坐标轴,再在同一位置创建极坐标轴
ax_radar = fig.add_subplot(2, 2, 4, projection='polar')
for method, values in avg_metrics.items():
values += values[:1] # 闭合
ax_radar.plot(angles, values, 'o-', linewidth=2, label=method)
ax_radar.fill(angles, values, alpha=0.25)
ax_radar.set_xticks(angles[:-1])
ax_radar.set_xticklabels(['准确率', 'F1分数', '效率'])
ax_radar.set_ylim(0, 1)
ax_radar.set_title('综合性能雷达图')
ax_radar.legend(bbox_to_anchor=(1.3, 1.0))
plt.tight_layout()
plt.show()
def ensemble_selection_guide(self):
"""集成方法选择指南"""
print("\n=== 集成方法选择指南 ===")
decision_tree = {
"数据规模": {
"小数据集 (<1000样本)": {
"推荐": ["Random Forest", "Bagging"],
"原因": "避免过拟合,提供稳定性"
},
"中等数据集 (1000-10000样本)": {
"推荐": ["Random Forest", "Gradient Boosting"],
"原因": "平衡性能和计算成本"
},
"大数据集 (>10000样本)": {
"推荐": ["Random Forest", "Gradient Boosting"],
"原因": "充分利用数据,获得最佳性能"
}
},
"特征维度": {
"低维 (<20特征)": {
"推荐": ["AdaBoost", "Gradient Boosting"],
"原因": "特征充分,可以深度学习"
},
"中维 (20-100特征)": {
"推荐": ["Random Forest", "Gradient Boosting"],
"原因": "特征选择和深度学习并重"
},
"高维 (>100特征)": {
"推荐": ["Random Forest", "Bagging"],
"原因": "特征选择更重要"
}
},
"数据质量": {
"高质量(少噪声)": {
"推荐": ["AdaBoost", "Gradient Boosting"],
"原因": "可以充分利用数据信息"
},
"中等质量": {
"推荐": ["Random Forest", "Bagging"],
"原因": "对噪声有一定鲁棒性"
},
"低质量(多噪声)": {
"推荐": ["Random Forest", "Bagging"],
"原因": "通过平均减少噪声影响"
}
},
"计算资源": {
"有限": {
"推荐": ["Random Forest", "Bagging"],
"原因": "可并行训练,效率高"
},
"充足": {
"推荐": ["Gradient Boosting", "AdaBoost"],
"原因": "可以进行精细调参"
}
}
}
for category, subcategories in decision_tree.items():
print(f"\n{category}:")
for condition, recommendation in subcategories.items():
print(f" {condition}:")
print(f" 推荐方法: {', '.join(recommendation['推荐'])}")
print(f" 选择原因: {recommendation['原因']}")
# 演示集成方法比较(先导入方法中用到的依赖)
import time
from sklearn.metrics import precision_score, recall_score, f1_score
print("=== 集成方法综合比较 ===")
comparison = EnsembleComparison()
comparison.ensemble_methods_comparison()
performance_results = comparison.comprehensive_performance_comparison()
comparison.ensemble_selection_guide()
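作为补充,可以把上面打印的选择指南中的部分规则封装成一个简单的查询函数。下面是一个最小示意(函数名 recommend_ensemble 与规则划分均为示例假设,仅覆盖数据规模与特征维度两个维度):
def recommend_ensemble(n_samples, n_features):
    """根据数据规模与特征维度给出粗略的集成方法建议(示例规则)"""
    if n_samples < 1000:
        by_size = ['Random Forest', 'Bagging']  # 小数据:优先稳健、抗过拟合的方法
    else:
        by_size = ['Random Forest', 'Gradient Boosting']  # 数据充足:可以追求精度
    if n_features > 100:
        by_dim = ['Random Forest', 'Bagging']  # 高维:依赖随机特征子集的方法更合适
    else:
        by_dim = ['Random Forest', 'Gradient Boosting']
    # 取两个维度建议的交集;若为空则退回通用性最强的 Random Forest
    return [m for m in by_size if m in by_dim] or ['Random Forest']

print(recommend_ensemble(n_samples=500, n_features=150))  # ['Random Forest', 'Bagging']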
7.8 综合案例:电商用户行为预测
7.8.1 项目背景与目标
在这个综合案例中,我们将使用多种集成学习方法来预测电商用户的购买行为,比较不同方法的性能。
class EcommercePredictionCase:
def __init__(self):
self.data = None
self.models = {}
self.results = {}
def create_ecommerce_dataset(self):
"""创建电商用户行为数据集"""
print("=== 创建电商用户行为数据集 ===")
np.random.seed(42)
n_samples = 5000
# 用户基本特征
age = np.random.normal(35, 12, n_samples)
age = np.clip(age, 18, 70)
income = np.random.lognormal(10, 0.5, n_samples)
income = np.clip(income, 20000, 200000)
# 行为特征
page_views = np.random.poisson(15, n_samples)
session_duration = np.random.exponential(20, n_samples) # 分钟
previous_purchases = np.random.poisson(3, n_samples)
# 时间特征
hour_of_day = np.random.randint(0, 24, n_samples)
day_of_week = np.random.randint(0, 7, n_samples)
# 商品特征
product_price = np.random.lognormal(4, 1, n_samples)
product_rating = np.random.beta(8, 2, n_samples) * 5
discount_rate = np.random.beta(2, 5, n_samples)
# 设备和渠道特征
is_mobile = np.random.binomial(1, 0.6, n_samples)
is_returning_user = np.random.binomial(1, 0.4, n_samples)
traffic_source = np.random.choice([0, 1, 2, 3], n_samples, p=[0.4, 0.3, 0.2, 0.1])
# 构建特征矩阵
X = np.column_stack([
age, income, page_views, session_duration, previous_purchases,
hour_of_day, day_of_week, product_price, product_rating, discount_rate,
is_mobile, is_returning_user, traffic_source
])
# 生成目标变量(购买概率)
# 复杂的非线性关系
purchase_prob = (
0.1 * (age - 30) / 20 +
0.15 * np.log(income / 50000) +
0.2 * np.tanh(page_views / 10) +
0.1 * np.tanh(session_duration / 30) +
0.25 * np.tanh(previous_purchases / 5) +
0.1 * (product_rating - 2.5) / 2.5 +
0.15 * discount_rate +
0.1 * is_returning_user +
-0.05 * is_mobile +
np.random.normal(0, 0.1, n_samples)
)
# 转换为二分类标签
y = (purchase_prob > np.median(purchase_prob)).astype(int)
# 特征名称
feature_names = [
'age', 'income', 'page_views', 'session_duration', 'previous_purchases',
'hour_of_day', 'day_of_week', 'product_price', 'product_rating', 'discount_rate',
'is_mobile', 'is_returning_user', 'traffic_source'
]
print(f"数据集大小: {X.shape}")
print(f"正样本比例: {np.mean(y):.3f}")
print(f"特征数量: {len(feature_names)}")
self.data = {
'X': X,
'y': y,
'feature_names': feature_names
}
# 数据探索性分析
self.exploratory_data_analysis()
return X, y, feature_names
def exploratory_data_analysis(self):
"""探索性数据分析"""
print("\n=== 探索性数据分析 ===")
X, y = self.data['X'], self.data['y']
feature_names = self.data['feature_names']
# 基本统计信息
print("特征统计信息:")
for i, name in enumerate(feature_names):
print(f" {name:20}: 均值={X[:, i].mean():.2f}, 标准差={X[:, i].std():.2f}")
# 可视化关键特征分布
fig, axes = plt.subplots(2, 3, figsize=(18, 10))
axes = axes.ravel()
key_features = [0, 1, 2, 3, 4, 8] # age, income, page_views, session_duration, previous_purchases, product_rating
for i, feature_idx in enumerate(key_features):
feature_name = feature_names[feature_idx]
# 按购买行为分组
purchase_data = X[y == 1, feature_idx]
no_purchase_data = X[y == 0, feature_idx]
axes[i].hist(no_purchase_data, alpha=0.7, label='未购买', bins=30, density=True)
axes[i].hist(purchase_data, alpha=0.7, label='购买', bins=30, density=True)
axes[i].set_title(f'{feature_name}分布')
axes[i].set_xlabel(feature_name)
axes[i].set_ylabel('密度')
axes[i].legend()
axes[i].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def data_preprocessing(self):
"""数据预处理"""
print("\n=== 数据预处理 ===")
X, y = self.data['X'], self.data['y']
# 划分训练测试集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# 特征标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
print(f"训练集大小: {X_train.shape}")
print(f"测试集大小: {X_test.shape}")
print(f"训练集正样本比例: {np.mean(y_train):.3f}")
print(f"测试集正样本比例: {np.mean(y_test):.3f}")
return X_train_scaled, X_test_scaled, y_train, y_test, scaler
def build_ensemble_models(self, X_train, y_train):
"""构建多种集成模型"""
print("\n=== 构建集成模型 ===")
# 定义集成模型
models = {
'Bagging': BaggingClassifier(
base_estimator=DecisionTreeClassifier(max_depth=10),  # 注:sklearn 1.2+ 中该参数更名为 estimator
n_estimators=100,
random_state=42
),
'Random Forest': RandomForestClassifier(
n_estimators=100,
max_depth=10,
min_samples_split=5,
min_samples_leaf=2,
random_state=42
),
'Extra Trees': ExtraTreesClassifier(
n_estimators=100,
max_depth=10,
min_samples_split=5,
min_samples_leaf=2,
random_state=42
),
'AdaBoost': AdaBoostClassifier(
base_estimator=DecisionTreeClassifier(max_depth=3),  # 注:sklearn 1.2+ 中该参数更名为 estimator
n_estimators=100,
learning_rate=1.0,
random_state=42
),
'Gradient Boosting': GradientBoostingClassifier(
n_estimators=100,
learning_rate=0.1,
max_depth=6,
min_samples_split=5,
min_samples_leaf=2,
random_state=42
)
}
# 训练模型
trained_models = {}
for name, model in models.items():
print(f"训练 {name}...")
start_time = time.time()
model.fit(X_train, y_train)
train_time = time.time() - start_time
trained_models[name] = {
'model': model,
'train_time': train_time
}
print(f" 训练时间: {train_time:.2f}秒")
self.models = trained_models
return trained_models
def evaluate_models(self, X_test, y_test):
"""评估模型性能"""
print("\n=== 模型性能评估 ===")
evaluation_results = {}
for name, model_info in self.models.items():
model = model_info['model']
# 预测
y_pred = model.predict(X_test)
y_proba = model.predict_proba(X_test)[:, 1]
# 计算评估指标
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
auc = roc_auc_score(y_test, y_proba)
evaluation_results[name] = {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1': f1,
'auc': auc,
'train_time': model_info['train_time'],
'y_pred': y_pred,
'y_proba': y_proba
}
print(f"{name:20} - 准确率: {accuracy:.3f}, 精确率: {precision:.3f}, "
f"召回率: {recall:.3f}, F1: {f1:.3f}, AUC: {auc:.3f}")
self.results = evaluation_results
# 可视化评估结果
self.visualize_evaluation_results(y_test)
return evaluation_results
def visualize_evaluation_results(self, y_test):
"""可视化评估结果"""
fig, axes = plt.subplots(2, 3, figsize=(20, 12))
# 1. 性能指标比较
metrics = ['accuracy', 'precision', 'recall', 'f1', 'auc']
model_names = list(self.results.keys())
metric_data = {metric: [self.results[model][metric] for model in model_names]
for metric in metrics}
x = np.arange(len(model_names))
width = 0.15
for i, metric in enumerate(metrics):
axes[0, 0].bar(x + i * width, metric_data[metric], width, label=metric)
axes[0, 0].set_xlabel('模型')
axes[0, 0].set_ylabel('分数')
axes[0, 0].set_title('性能指标比较')
axes[0, 0].set_xticks(x + width * 2)
axes[0, 0].set_xticklabels(model_names, rotation=45)
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 2. ROC曲线
for name, results in self.results.items():
fpr, tpr, _ = roc_curve(y_test, results['y_proba'])
auc_score = results['auc']
axes[0, 1].plot(fpr, tpr, label=f'{name} (AUC={auc_score:.3f})')
axes[0, 1].plot([0, 1], [0, 1], 'k--', alpha=0.5)
axes[0, 1].set_xlabel('假正率')
axes[0, 1].set_ylabel('真正率')
axes[0, 1].set_title('ROC曲线比较')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 3. 精确率-召回率曲线
for name, results in self.results.items():
precision_curve, recall_curve, _ = precision_recall_curve(y_test, results['y_proba'])
axes[0, 2].plot(recall_curve, precision_curve, label=name)
axes[0, 2].set_xlabel('召回率')
axes[0, 2].set_ylabel('精确率')
axes[0, 2].set_title('精确率-召回率曲线')
axes[0, 2].legend()
axes[0, 2].grid(True, alpha=0.3)
# 4. 训练时间比较
train_times = [self.results[model]['train_time'] for model in model_names]
axes[1, 0].bar(model_names, train_times)
axes[1, 0].set_xlabel('模型')
axes[1, 0].set_ylabel('训练时间 (秒)')
axes[1, 0].set_title('训练时间比较')
axes[1, 0].tick_params(axis='x', rotation=45)
axes[1, 0].grid(True, alpha=0.3)
# 5. 混淆矩阵(选择最佳模型)
best_model = max(self.results.keys(), key=lambda x: self.results[x]['f1'])
cm = confusion_matrix(y_test, self.results[best_model]['y_pred'])
im = axes[1, 1].imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
axes[1, 1].set_title(f'{best_model} 混淆矩阵')
# 添加数值标注
thresh = cm.max() / 2.
for i in range(cm.shape[0]):
for j in range(cm.shape[1]):
axes[1, 1].text(j, i, format(cm[i, j], 'd'),
ha="center", va="center",
color="white" if cm[i, j] > thresh else "black")
axes[1, 1].set_xlabel('预测标签')
axes[1, 1].set_ylabel('真实标签')
# 6. 特征重要性(Random Forest)
if 'Random Forest' in self.results:
rf_model = self.models['Random Forest']['model']
feature_importance = rf_model.feature_importances_
feature_names = self.data['feature_names']
# 排序
indices = np.argsort(feature_importance)[::-1][:10]
axes[1, 2].bar(range(len(indices)), feature_importance[indices])
axes[1, 2].set_xlabel('特征')
axes[1, 2].set_ylabel('重要性')
axes[1, 2].set_title('特征重要性 (Random Forest)')
axes[1, 2].set_xticks(range(len(indices)))
axes[1, 2].set_xticklabels([feature_names[i] for i in indices], rotation=45)
axes[1, 2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def model_interpretation(self):
"""模型解释与业务洞察"""
print("\n=== 模型解释与业务洞察 ===")
# 特征重要性分析
print("1. 特征重要性分析:")
if 'Random Forest' in self.models:
rf_model = self.models['Random Forest']['model']
feature_importance = rf_model.feature_importances_
feature_names = self.data['feature_names']
# 排序并显示
importance_pairs = list(zip(feature_names, feature_importance))
importance_pairs.sort(key=lambda x: x[1], reverse=True)
print(" Random Forest特征重要性排序:")
for i, (name, importance) in enumerate(importance_pairs[:10]):
print(f" {i+1:2d}. {name:20}: {importance:.3f}")
# 业务洞察
print("\n2. 业务洞察:")
insights = [
"历史购买次数是最重要的预测因子,说明用户忠诚度很关键",
"商品评分对购买决策有重要影响,需要重视商品质量",
"折扣率是重要因素,合理的促销策略能提高转化率",
"用户年龄和收入水平影响购买行为,需要精准定位目标用户",
"页面浏览数和会话时长反映用户兴趣度,可用于实时推荐"
]
for i, insight in enumerate(insights, 1):
print(f" {i}. {insight}")
# 模型选择建议
print("\n3. 模型选择建议:")
best_accuracy = max(self.results.keys(), key=lambda x: self.results[x]['accuracy'])
best_f1 = max(self.results.keys(), key=lambda x: self.results[x]['f1'])
best_auc = max(self.results.keys(), key=lambda x: self.results[x]['auc'])
fastest = min(self.results.keys(), key=lambda x: self.results[x]['train_time'])
print(f" 最高准确率: {best_accuracy} ({self.results[best_accuracy]['accuracy']:.3f})")
print(f" 最高F1分数: {best_f1} ({self.results[best_f1]['f1']:.3f})")
print(f" 最高AUC: {best_auc} ({self.results[best_auc]['auc']:.3f})")
print(f" 训练最快: {fastest} ({self.results[fastest]['train_time']:.2f}秒)")
print(f"\n 综合推荐: {best_f1} (平衡了精确率和召回率)")
# 运行电商用户行为预测案例
print("=== 电商用户行为预测综合案例 ===")
from sklearn.metrics import roc_curve, precision_recall_curve, roc_auc_score  # 其余依赖已在章首导入
case = EcommercePredictionCase()
X, y, feature_names = case.create_ecommerce_dataset()
X_train, X_test, y_train, y_test, scaler = case.data_preprocessing()
trained_models = case.build_ensemble_models(X_train, y_train)
evaluation_results = case.evaluate_models(X_test, y_test)
case.model_interpretation()
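在此基础上,还可以尝试把表现较好的几个模型用软投票组合,观察能否进一步提升。下面是一个示意(沿用章首的导入与上面划分好的 X_train、X_test 等变量;模型配置为简化的假设,未经调参,效果视数据而定):
# 软投票:对各基模型的预测概率取平均后再决策
voting = VotingClassifier(
    estimators=[('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
                ('et', ExtraTreesClassifier(n_estimators=100, random_state=42)),
                ('gb', GradientBoostingClassifier(random_state=42))],
    voting='soft')
voting.fit(X_train, y_train)
voting_pred = voting.predict(X_test)
print(f"软投票集成准确率: {accuracy_score(y_test, voting_pred):.3f}")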
7.9 本章小结
7.9.1 集成学习核心概念回顾
本章深入探讨了集成学习的核心思想和主要方法:
1. 集成学习基础
   - 集成学习通过组合多个学习器来提高整体性能
   - 核心在于利用个体学习器的多样性
   - 主要分为Bagging和Boosting两大类方法
2. Bagging方法(Bootstrap采样见本小节末的示意代码)
   - 通过Bootstrap采样创建多样性
   - 并行训练,主要降低方差
   - 代表算法:Bagging、Random Forest
3. Boosting方法
   - 串行训练,后续学习器关注前面学习器的错误
   - 主要降低偏差,提高模型复杂度
   - 代表算法:AdaBoost、Gradient Boosting
4. 随机森林
   - 结合Bagging和随机特征选择
   - 具有良好的泛化性能和特征重要性评估能力
   - 对噪声和过拟合有较强的鲁棒性
5. 梯度提升树
   - 通过拟合残差逐步改进模型
   - 灵活的损失函数,强大的预测能力
   - 需要仔细调参以避免过拟合
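作为对Bagging核心机制的补充,下面用几行NumPy代码示意Bootstrap有放回采样(最小示例,独立于本章其他代码):
import numpy as np

rng = np.random.default_rng(42)
n = 1000
# 有放回地抽取 n 个样本下标,构成一个 Bootstrap 训练子集
indices = rng.integers(0, n, size=n)
# 至少被抽中一次的样本比例,理论期望约为 1 - 1/e ≈ 0.632
unique_ratio = len(np.unique(indices)) / n
print(f"被抽中的不同样本比例: {unique_ratio:.3f}")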
7.9.2 算法选择指导
数据特征导向的选择:
- 小数据集:Random Forest、Bagging(避免过拟合)
- 大数据集:Gradient Boosting、Random Forest(充分利用数据)
- 高维数据:Random Forest(特征选择能力强)
- 含噪声数据:Random Forest、Bagging(鲁棒性好)
- 高质量数据:Gradient Boosting、AdaBoost(精度优先)
性能需求导向的选择:
- 追求最高精度:Gradient Boosting
- 需要快速训练:Random Forest、Bagging
- 需要模型解释:Random Forest(特征重要性)
- 内存受限:AdaBoost、控制规模的Random Forest(限制n_estimators)
7.9.3 最佳实践
1. 数据预处理
   - 处理缺失值和异常值
   - 适当的特征工程
   - 对于Boosting方法,数据质量尤为重要
2. 超参数调优(调参示例见本小节末)
   - 使用交叉验证选择最优参数
   - 重点关注:n_estimators、learning_rate、max_depth
   - 平衡模型复杂度和泛化能力
3. 模型评估
   - 使用多种评估指标(准确率、精确率、召回率、F1、AUC)
   - 关注训练时间和预测时间
   - 进行残差分析和特征重要性分析
4. 实际应用考虑
   - 考虑模型的可解释性需求
   - 评估计算资源限制
   - 考虑在线学习和模型更新需求
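下面是上述调参建议的一个最小示意,用GridSearchCV在三个关键超参数上做交叉验证搜索(参数网格为示例取值,实际应根据数据调整):
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
param_grid = {
    'n_estimators': [100, 200],
    'learning_rate': [0.05, 0.1],
    'max_depth': [3, 5],
}
search = GridSearchCV(GradientBoostingClassifier(random_state=42),
                      param_grid, cv=5, scoring='f1', n_jobs=-1)
search.fit(X, y)
print("最优参数:", search.best_params_)
print(f"交叉验证F1: {search.best_score_:.3f}")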
7.9.4 常见陷阱
1. 过拟合问题(早停示例见本小节末)
   - Boosting方法容易过拟合,需要早停或正则化
   - 监控验证集性能,避免过度训练
2. 参数设置不当
   - 学习率过高导致不收敛
   - 基学习器过于复杂导致过拟合
   - n_estimators设置不合理
3. 数据质量忽视
   - 对于AdaBoost,噪声数据影响很大
   - 特征工程不充分影响所有集成方法
4. 评估偏差
   - 仅关注单一指标
   - 忽视计算成本
   - 缺乏业务场景考虑
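针对第1点,sklearn的GradientBoostingClassifier内置了基于验证集的早停机制,下面是一个最小示意(参数取值为示例假设):
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier

X, y = make_classification(n_samples=2000, n_features=20, random_state=42)
# n_iter_no_change 开启早停:内部留出 validation_fraction 比例的数据做验证,
# 连续 10 轮验证分数无提升就提前停止,避免过度训练
gb = GradientBoostingClassifier(n_estimators=1000, learning_rate=0.1,
                                validation_fraction=0.2,
                                n_iter_no_change=10, random_state=42)
gb.fit(X, y)
print(f"实际训练的迭代轮数: {gb.n_estimators_}")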
7.9.5 进阶学习方向
1. 高级集成方法(Stacking示例见本小节末)
   - XGBoost、LightGBM、CatBoost
   - Stacking和Blending
   - 多层集成架构
2. 深度学习中的集成
   - 深度森林
   - 神经网络集成
   - 知识蒸馏
3. 在线集成学习
   - 增量学习
   - 概念漂移处理
   - 自适应集成
4. 大规模集成学习
   - 分布式训练
   - 模型压缩
   - 边缘计算部署
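其中Stacking在sklearn中已有现成实现(StackingClassifier),下面给出一个最小示意(基学习器与元学习器的选择仅为示例):
from sklearn.datasets import make_classification
from sklearn.ensemble import (StackingClassifier, RandomForestClassifier,
                              GradientBoostingClassifier)
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
# 第一层:两个异质基学习器;第二层:逻辑回归作为元学习器
stack = StackingClassifier(
    estimators=[('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
                ('gb', GradientBoostingClassifier(random_state=42))],
    final_estimator=LogisticRegression(max_iter=1000),
    cv=5)
scores = cross_val_score(stack, X, y, cv=5)
print(f"Stacking交叉验证准确率: {scores.mean():.3f}")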
7.9.6 练习题
理论题:
1. 解释Bagging和Boosting的核心区别,并分析各自的优缺点
2. 为什么Random Forest能够提供特征重要性评估?其计算原理是什么?
3. AdaBoost中样本权重更新的数学原理是什么?为什么这样更新?
4. 梯度提升树如何选择损失函数?不同损失函数适用于什么场景?
实践题:
1. 使用本章学到的方法,在一个真实数据集上比较不同集成方法的性能
2. 实现一个简化版的AdaBoost算法,并与sklearn版本对比
3. 分析Random Forest中n_estimators和max_features参数对性能的影响
4. 设计一个集成方法选择的决策流程,考虑数据特征、性能需求和计算资源
项目题:
1. 选择一个实际业务场景,设计完整的集成学习解决方案
2. 实现一个自动化的集成方法选择系统
3. 比较集成学习与深度学习在特定任务上的性能差异
第7章完结
集成学习是机器学习中最实用和有效的方法之一。通过本章的学习,你应该掌握了主要集成方法的原理、实现和应用。在实际项目中,集成学习往往能够显著提升模型性能,是数据科学家必备的重要技能。
下一章我们将学习特征工程,这是提升模型性能的另一个重要方面。