6.1 集成学习概述
6.1.1 什么是集成学习
集成学习(Ensemble Learning)是机器学习中的一种重要方法,通过组合多个学习器来完成学习任务。集成学习的基本思想是"三个臭皮匠,顶个诸葛亮":合理地组合多个个体学习器(甚至是弱学习器),常常可以获得比任何单一学习器都显著更好的泛化性能。
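这一直觉可以定量化。下面是一个最小的计算草图(假设各基学习器相互独立且错误率相同——实际中独立性通常只近似成立):设每个基学习器的错误率为 ε,T 个学习器做多数投票,则集成出错当且仅当超过半数学习器同时出错,其概率由二项分布给出。
from math import comb

def majority_vote_error(epsilon, T):
    """T个相互独立、错误率均为epsilon的学习器多数投票后的集成错误率"""
    # 集成出错 <=> 至少 T//2 + 1 个学习器同时出错(T取奇数可避免平票)
    return sum(comb(T, k) * epsilon**k * (1 - epsilon)**(T - k)
               for k in range(T // 2 + 1, T + 1))

for T in [1, 5, 11, 21]:
    print(f"T={T:2d}: 单个错误率0.30 -> 集成错误率 {majority_vote_error(0.30, T):.4f}")
只要个体错误率低于0.5且彼此独立,集成错误率就随T单调下降;反之,若各学习器的错误高度相关,这一收益会大打折扣——这正是后文反复强调"多样性"的原因。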
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_classification, make_regression, load_iris
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.ensemble import (
RandomForestClassifier, RandomForestRegressor,
GradientBoostingClassifier, GradientBoostingRegressor,
HistGradientBoostingClassifier,
AdaBoostClassifier, AdaBoostRegressor,
VotingClassifier, VotingRegressor,
BaggingClassifier, BaggingRegressor,
ExtraTreesClassifier, ExtraTreesRegressor,
StackingClassifier, StackingRegressor
)
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.svm import SVC, SVR
from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import (
accuracy_score, classification_report, confusion_matrix,
mean_squared_error, r2_score, mean_absolute_error
)
from sklearn.preprocessing import StandardScaler
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin, clone
import time
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
class EnsembleLearningDemo:
"""
集成学习演示类
"""
def __init__(self):
self.models = {}
self.results = {}
def demonstrate_ensemble_concept(self):
"""
演示集成学习的基本概念
"""
# 创建分类数据集
X, y = make_classification(
n_samples=1000,
n_features=20,
n_informative=15,
n_redundant=5,
n_clusters_per_class=1,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 单个学习器
individual_models = {
'Decision Tree': DecisionTreeClassifier(random_state=42),
'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
'SVM': SVC(probability=True, random_state=42),
'KNN': KNeighborsClassifier(),
'Naive Bayes': GaussianNB()
}
# 训练单个模型并记录性能
individual_scores = []
individual_predictions = []
print("单个学习器性能:")
print("=" * 30)
for name, model in individual_models.items():
model.fit(X_train, y_train)
score = model.score(X_test, y_test)
pred = model.predict(X_test)
individual_scores.append(score)
individual_predictions.append(pred)
print(f"{name}: {score:.4f}")
# 简单投票集成
ensemble_pred = np.array(individual_predictions).T
majority_vote = np.apply_along_axis(
lambda x: np.bincount(x).argmax(), axis=1, arr=ensemble_pred
)
ensemble_score = accuracy_score(y_test, majority_vote)
print(f"\n集成学习性能:")
print("=" * 30)
print(f"简单投票集成: {ensemble_score:.4f}")
print(f"平均单模型性能: {np.mean(individual_scores):.4f}")
print(f"最佳单模型性能: {np.max(individual_scores):.4f}")
# 可视化比较
self.visualize_ensemble_comparison(
individual_scores, ensemble_score, list(individual_models.keys())
)
return individual_scores, ensemble_score
def visualize_ensemble_comparison(self, individual_scores, ensemble_score, model_names):
"""
可视化集成学习与单个学习器的比较
"""
plt.figure(figsize=(12, 6))
# 性能比较柱状图
plt.subplot(1, 2, 1)
colors = ['lightblue'] * len(individual_scores) + ['red']
all_scores = individual_scores + [ensemble_score]
all_names = model_names + ['集成模型']
bars = plt.bar(range(len(all_scores)), all_scores, color=colors, alpha=0.7)
plt.xlabel('模型')
plt.ylabel('准确率')
plt.title('单个学习器 vs 集成学习')
plt.xticks(range(len(all_names)), all_names, rotation=45)
# 添加数值标签
for i, (bar, score) in enumerate(zip(bars, all_scores)):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{score:.3f}', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
# 性能分布箱线图
plt.subplot(1, 2, 2)
plt.boxplot([individual_scores], labels=['单个学习器'])
plt.axhline(y=ensemble_score, color='red', linestyle='--',
label=f'集成模型 ({ensemble_score:.3f})')
plt.ylabel('准确率')
plt.title('性能分布比较')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def ensemble_types_overview(self):
"""
集成学习类型概述
"""
ensemble_types = {
"Bagging (Bootstrap Aggregating)": {
"原理": "通过Bootstrap采样创建多个训练集,训练多个模型,最后平均预测结果",
"代表算法": ["Random Forest", "Extra Trees"],
"优点": ["减少方差", "并行训练", "对噪声鲁棒"],
"缺点": ["可能增加偏差", "模型解释性降低"],
"适用场景": "高方差的强学习器(如决策树)"
},
"Boosting": {
"原理": "串行训练多个弱学习器,每个学习器关注前一个学习器的错误",
"代表算法": ["AdaBoost", "Gradient Boosting", "XGBoost"],
"优点": ["减少偏差", "提高弱学习器性能"],
"缺点": ["容易过拟合", "对噪声敏感", "串行训练"],
"适用场景": "高偏差的弱学习器"
},
"Stacking": {
"原理": "使用元学习器学习如何组合基学习器的预测",
"代表算法": ["Stacked Generalization", "Blending"],
"优点": ["灵活的组合方式", "理论上最优"],
"缺点": ["计算复杂", "容易过拟合", "需要更多数据"],
"适用场景": "有足够数据且计算资源充足"
},
"Voting": {
"原理": "通过投票机制组合多个学习器的预测",
"代表算法": ["Hard Voting", "Soft Voting"],
"优点": ["简单直观", "易于实现"],
"缺点": ["权重固定", "可能被弱学习器拖累"],
"适用场景": "多个性能相近的学习器"
}
}
print("集成学习方法概述:")
print("=" * 50)
for method, details in ensemble_types.items():
print(f"\n{method}:")
print(f" 原理: {details['原理']}")
print(f" 代表算法: {', '.join(details['代表算法'])}")
print(f" 优点: {', '.join(details['优点'])}")
print(f" 缺点: {', '.join(details['缺点'])}")
print(f" 适用场景: {details['适用场景']}")
return ensemble_types
# 集成学习演示
ensemble_demo = EnsembleLearningDemo()
print("集成学习概述演示:")
print("=" * 40)
# 演示集成学习概念
individual_scores, ensemble_score = ensemble_demo.demonstrate_ensemble_concept()
# 集成学习类型概述
ensemble_types = ensemble_demo.ensemble_types_overview()
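下面用一个最小示例把这四类方法在sklearn中的典型入口各实例化一次,便于与上表对照(仅示意构造方式,超参数为随意取值;Bagging的基学习器参数在较新版本的sklearn中名为 estimator,旧版为 base_estimator):
demo_base = [
    ('dt', DecisionTreeClassifier(max_depth=3)),
    ('lr', LogisticRegression(max_iter=1000))
]
four_ensembles = {
    'Bagging': BaggingClassifier(estimator=DecisionTreeClassifier(), n_estimators=10),
    'Boosting': AdaBoostClassifier(n_estimators=10),
    'Voting': VotingClassifier(estimators=demo_base, voting='hard'),
    'Stacking': StackingClassifier(estimators=demo_base, final_estimator=LogisticRegression())
}
for name, model in four_ensembles.items():
    print(f"{name}: {type(model).__name__}")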
6.1.2 集成学习的理论基础
class EnsembleTheory:
"""
集成学习理论分析
"""
@staticmethod
def bias_variance_decomposition():
"""
偏差-方差分解理论
"""
print("偏差-方差分解理论:")
print("=" * 30)
print("对于回归问题,预测误差可以分解为:")
print("Error = Bias² + Variance + Noise")
print("")
print("其中:")
print("- Bias (偏差): 模型预测的期望值与真实值的差异")
print("- Variance (方差): 模型预测值的变异程度")
print("- Noise (噪声): 数据中的不可约误差")
print("")
print("集成学习的作用:")
print("- Bagging: 主要减少方差")
print("- Boosting: 主要减少偏差")
print("- Stacking: 可以同时减少偏差和方差")
@staticmethod
def diversity_importance():
"""
多样性的重要性
"""
print("\n多样性在集成学习中的重要性:")
print("=" * 40)
print("")
print("1. 错误独立性假设:")
print(" 如果各个学习器的错误是独立的,那么集成的错误率会显著降低")
print("")
print("2. 多样性来源:")
print(" - 数据多样性: 不同的训练子集")
print(" - 算法多样性: 不同的学习算法")
print(" - 参数多样性: 相同算法的不同参数")
print(" - 特征多样性: 不同的特征子集")
print("")
print("3. 准确性-多样性权衡:")
print(" 集成性能 = 个体准确性 + 个体多样性")
print(" 需要在准确性和多样性之间找到平衡")
def demonstrate_diversity_effect(self):
"""
演示多样性对集成性能的影响
"""
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 高多样性集成(不同算法)
diverse_models = [
('dt', DecisionTreeClassifier(random_state=42)),
('lr', LogisticRegression(random_state=42, max_iter=1000)),
('svm', SVC(probability=True, random_state=42)),
('nb', GaussianNB()),
('knn', KNeighborsClassifier())
]
# 低多样性集成(相似的决策树)
similar_models = [
('dt1', DecisionTreeClassifier(random_state=42, max_depth=5)),
('dt2', DecisionTreeClassifier(random_state=43, max_depth=5)),
('dt3', DecisionTreeClassifier(random_state=44, max_depth=5)),
('dt4', DecisionTreeClassifier(random_state=45, max_depth=5)),
('dt5', DecisionTreeClassifier(random_state=46, max_depth=5))
]
# 创建投票分类器
diverse_ensemble = VotingClassifier(
estimators=diverse_models, voting='soft'
)
similar_ensemble = VotingClassifier(
estimators=similar_models, voting='soft'
)
# 训练和评估
diverse_ensemble.fit(X_train, y_train)
similar_ensemble.fit(X_train, y_train)
diverse_score = diverse_ensemble.score(X_test, y_test)
similar_score = similar_ensemble.score(X_test, y_test)
# 计算个体模型性能
diverse_individual = []
similar_individual = []
for name, model in diverse_models:
model.fit(X_train, y_train)
diverse_individual.append(model.score(X_test, y_test))
for name, model in similar_models:
model.fit(X_train, y_train)
similar_individual.append(model.score(X_test, y_test))
print("\n多样性对集成性能的影响:")
print("=" * 40)
print(f"高多样性集成: {diverse_score:.4f}")
print(f"高多样性个体平均: {np.mean(diverse_individual):.4f}")
print(f"低多样性集成: {similar_score:.4f}")
print(f"低多样性个体平均: {np.mean(similar_individual):.4f}")
# 可视化
self.visualize_diversity_effect(
diverse_individual, similar_individual,
diverse_score, similar_score
)
return diverse_score, similar_score
def visualize_diversity_effect(self, diverse_scores, similar_scores,
diverse_ensemble, similar_ensemble):
"""
可视化多样性效果
"""
plt.figure(figsize=(12, 5))
# 个体性能比较
plt.subplot(1, 2, 1)
x_pos = np.arange(5)
width = 0.35
plt.bar(x_pos - width/2, diverse_scores, width,
label='高多样性', alpha=0.7)
plt.bar(x_pos + width/2, similar_scores, width,
label='低多样性', alpha=0.7)
plt.xlabel('模型编号')
plt.ylabel('准确率')
plt.title('个体模型性能比较')
plt.legend()
plt.grid(True, alpha=0.3)
# 集成效果比较
plt.subplot(1, 2, 2)
ensemble_scores = [diverse_ensemble, similar_ensemble]
ensemble_names = ['高多样性集成', '低多样性集成']
colors = ['green', 'orange']
bars = plt.bar(ensemble_names, ensemble_scores, color=colors, alpha=0.7)
# 添加个体平均线
plt.axhline(y=np.mean(diverse_scores), color='green',
linestyle='--', alpha=0.5, label='高多样性个体平均')
plt.axhline(y=np.mean(similar_scores), color='orange',
linestyle='--', alpha=0.5, label='低多样性个体平均')
plt.ylabel('准确率')
plt.title('集成性能比较')
plt.legend()
plt.grid(True, alpha=0.3)
# 添加数值标签
for bar, score in zip(bars, ensemble_scores):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{score:.3f}', ha='center', va='bottom')
plt.tight_layout()
plt.show()
# 理论分析演示
theory = EnsembleTheory()
print("\n集成学习理论基础:")
print("=" * 40)
# 偏差-方差分解
theory.bias_variance_decomposition()
# 多样性重要性
theory.diversity_importance()
# 多样性效果演示
diverse_score, similar_score = theory.demonstrate_diversity_effect()
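为了直观验证"Bagging主要减少方差"这一结论,下面给出一个最小的模拟草图(假设数据由带噪正弦函数生成;反复重采样训练集,比较单棵决策树与Bagging在同一查询点上预测值的方差;estimator 参数为新版sklearn写法):
rng = np.random.RandomState(0)
x_query = np.array([[0.25]])  # 固定查询点,真实值为 sin(2*pi*0.25) = 1
single_preds, bagged_preds = [], []
for seed in range(100):
    # 每轮重新生成训练集,模拟"从同一分布反复抽样"
    X_sim = rng.uniform(0, 1, (50, 1))
    y_sim = np.sin(2 * np.pi * X_sim.ravel()) + rng.normal(0, 0.3, 50)
    single_preds.append(
        DecisionTreeRegressor(random_state=seed).fit(X_sim, y_sim).predict(x_query)[0]
    )
    bag = BaggingRegressor(estimator=DecisionTreeRegressor(),
                           n_estimators=20, random_state=seed)
    bagged_preds.append(bag.fit(X_sim, y_sim).predict(x_query)[0])
print(f"单棵树预测方差: {np.var(single_preds):.4f}")
print(f"Bagging预测方差: {np.var(bagged_preds):.4f}")
通常可以观察到Bagging的预测方差明显小于单棵树,而二者的偏差相近。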
6.2 Bagging方法
6.2.1 Bootstrap采样
class BootstrapDemo:
"""
Bootstrap采样演示
"""
def __init__(self):
self.original_data = None
self.bootstrap_samples = []
def demonstrate_bootstrap(self, n_samples=100, n_bootstrap=10):
"""
演示Bootstrap采样过程
"""
# 创建原始数据
np.random.seed(42)
self.original_data = np.random.normal(50, 15, n_samples)
print(f"原始数据统计:")
print(f"样本数量: {len(self.original_data)}")
print(f"均值: {np.mean(self.original_data):.2f}")
print(f"标准差: {np.std(self.original_data):.2f}")
# 生成Bootstrap样本
self.bootstrap_samples = []
bootstrap_means = []
bootstrap_stds = []
for i in range(n_bootstrap):
# Bootstrap采样(有放回采样)
bootstrap_sample = np.random.choice(
self.original_data, size=n_samples, replace=True
)
self.bootstrap_samples.append(bootstrap_sample)
bootstrap_means.append(np.mean(bootstrap_sample))
bootstrap_stds.append(np.std(bootstrap_sample))
print(f"\nBootstrap样本统计:")
print(f"Bootstrap样本数量: {n_bootstrap}")
print(f"均值的均值: {np.mean(bootstrap_means):.2f}")
print(f"均值的标准差: {np.std(bootstrap_means):.2f}")
print(f"标准差的均值: {np.mean(bootstrap_stds):.2f}")
# 可视化Bootstrap采样
self.visualize_bootstrap(bootstrap_means, bootstrap_stds)
return bootstrap_means, bootstrap_stds
def visualize_bootstrap(self, bootstrap_means, bootstrap_stds):
"""
可视化Bootstrap采样结果
"""
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 原始数据分布
axes[0, 0].hist(self.original_data, bins=20, alpha=0.7,
color='blue', edgecolor='black')
axes[0, 0].axvline(np.mean(self.original_data), color='red',
linestyle='--', label=f'均值: {np.mean(self.original_data):.2f}')
axes[0, 0].set_title('原始数据分布')
axes[0, 0].set_xlabel('值')
axes[0, 0].set_ylabel('频次')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 几个Bootstrap样本的分布
colors = ['red', 'green', 'orange']
for i, color in enumerate(colors):
if i < len(self.bootstrap_samples):
axes[0, 1].hist(self.bootstrap_samples[i], bins=20, alpha=0.5,
color=color, label=f'Bootstrap {i+1}',
edgecolor='black')
axes[0, 1].set_title('Bootstrap样本分布示例')
axes[0, 1].set_xlabel('值')
axes[0, 1].set_ylabel('频次')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Bootstrap均值分布
axes[1, 0].hist(bootstrap_means, bins=15, alpha=0.7,
color='green', edgecolor='black')
axes[1, 0].axvline(np.mean(bootstrap_means), color='red',
linestyle='--', label=f'均值: {np.mean(bootstrap_means):.2f}')
axes[1, 0].axvline(np.mean(self.original_data), color='blue',
linestyle=':', label=f'原始均值: {np.mean(self.original_data):.2f}')
axes[1, 0].set_title('Bootstrap均值分布')
axes[1, 0].set_xlabel('均值')
axes[1, 0].set_ylabel('频次')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# Bootstrap标准差分布
axes[1, 1].hist(bootstrap_stds, bins=15, alpha=0.7,
color='orange', edgecolor='black')
axes[1, 1].axvline(np.mean(bootstrap_stds), color='red',
linestyle='--', label=f'均值: {np.mean(bootstrap_stds):.2f}')
axes[1, 1].axvline(np.std(self.original_data), color='blue',
linestyle=':', label=f'原始标准差: {np.std(self.original_data):.2f}')
axes[1, 1].set_title('Bootstrap标准差分布')
axes[1, 1].set_xlabel('标准差')
axes[1, 1].set_ylabel('频次')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def out_of_bag_analysis(self):
"""
Out-of-Bag (OOB) 分析
"""
n_samples = 1000
n_experiments = 1000
oob_ratios = []
for _ in range(n_experiments):
# 生成索引
indices = np.arange(n_samples)
# Bootstrap采样
bootstrap_indices = np.random.choice(
indices, size=n_samples, replace=True
)
# 计算OOB样本
unique_indices = np.unique(bootstrap_indices)
oob_indices = np.setdiff1d(indices, unique_indices)
oob_ratio = len(oob_indices) / n_samples
oob_ratios.append(oob_ratio)
# 某个样本在n次有放回抽样中始终未被抽中的概率为 (1 - 1/n)^n,n增大时趋于 1/e ≈ 0.368
theoretical_oob = (1 - 1/n_samples)**n_samples
empirical_oob = np.mean(oob_ratios)
print(f"\nOut-of-Bag分析:")
print(f"理论OOB比例: {theoretical_oob:.4f}")
print(f"经验OOB比例: {empirical_oob:.4f}")
print(f"近似值 (1/e): {1/np.e:.4f}")
# 可视化OOB比例分布
plt.figure(figsize=(10, 6))
plt.subplot(1, 2, 1)
plt.hist(oob_ratios, bins=30, alpha=0.7, color='skyblue',
edgecolor='black', density=True)
plt.axvline(theoretical_oob, color='red', linestyle='--',
label=f'理论值: {theoretical_oob:.4f}')
plt.axvline(empirical_oob, color='green', linestyle='--',
label=f'经验值: {empirical_oob:.4f}')
plt.xlabel('OOB比例')
plt.ylabel('密度')
plt.title('OOB比例分布')
plt.legend()
plt.grid(True, alpha=0.3)
# 不同样本大小的OOB比例
sample_sizes = np.logspace(1, 4, 20).astype(int)
theoretical_oobs = (1 - 1/sample_sizes)**sample_sizes
plt.subplot(1, 2, 2)
plt.semilogx(sample_sizes, theoretical_oobs, 'o-',
label='理论OOB比例')
plt.axhline(y=1/np.e, color='red', linestyle='--',
label=f'渐近值: {1/np.e:.4f}')
plt.xlabel('样本大小')
plt.ylabel('OOB比例')
plt.title('OOB比例 vs 样本大小')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return oob_ratios
# Bootstrap演示
bootstrap_demo = BootstrapDemo()
print("Bootstrap采样演示:")
print("=" * 40)
# Bootstrap采样演示
bootstrap_means, bootstrap_stds = bootstrap_demo.demonstrate_bootstrap(
n_samples=200, n_bootstrap=50
)
# OOB分析
oob_ratios = bootstrap_demo.out_of_bag_analysis()
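OOB样本还有一个直接的工程用途:"免费"估计泛化误差。sklearn中的随机森林和Bagging都支持 oob_score=True,即用每个样本未参与训练的那部分基学习器对其进行预测。一个最小示例:
X_ob, y_ob = make_classification(n_samples=1000, n_features=20, random_state=42)
rf_ob = RandomForestClassifier(n_estimators=100, oob_score=True, random_state=42)
rf_ob.fit(X_ob, y_ob)
# oob_score_ 基于各棵树的袋外样本计算,无需单独划分验证集
print(f"OOB准确率估计: {rf_ob.oob_score_:.4f}")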
6.2.2 Bagging算法实现
class BaggingAnalyzer:
"""
Bagging算法分析器
"""
def __init__(self):
self.models = {}
self.results = {}
def demonstrate_bagging_process(self):
"""
演示Bagging过程
"""
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=15,
n_redundant=5, random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 手动实现Bagging
n_estimators = 10
base_models = []
predictions = []
print("Bagging过程演示:")
print("=" * 30)
for i in range(n_estimators):
# Bootstrap采样
n_samples = len(X_train)
bootstrap_indices = np.random.choice(
n_samples, size=n_samples, replace=True
)
X_bootstrap = X_train[bootstrap_indices]
y_bootstrap = y_train[bootstrap_indices]
# 训练基学习器
base_model = DecisionTreeClassifier(
random_state=i, max_depth=10
)
base_model.fit(X_bootstrap, y_bootstrap)
base_models.append(base_model)
# 预测
pred = base_model.predict(X_test)
predictions.append(pred)
# 计算个体性能
score = accuracy_score(y_test, pred)
print(f"模型 {i+1}: 准确率 = {score:.4f}")
# 集成预测(多数投票)
predictions_array = np.array(predictions)
ensemble_pred = np.apply_along_axis(
lambda x: np.bincount(x).argmax(),
axis=0, arr=predictions_array
)
ensemble_score = accuracy_score(y_test, ensemble_pred)
individual_scores = [accuracy_score(y_test, pred) for pred in predictions]
print(f"\n集成结果:")
print(f"个体模型平均准确率: {np.mean(individual_scores):.4f}")
print(f"集成模型准确率: {ensemble_score:.4f}")
print(f"性能提升: {ensemble_score - np.mean(individual_scores):.4f}")
# 与sklearn的BaggingClassifier比较
sklearn_bagging = BaggingClassifier(
estimator=DecisionTreeClassifier(max_depth=10),
n_estimators=n_estimators,
random_state=42
)
sklearn_bagging.fit(X_train, y_train)
sklearn_score = sklearn_bagging.score(X_test, y_test)
print(f"sklearn BaggingClassifier: {sklearn_score:.4f}")
# 可视化结果
self.visualize_bagging_results(
individual_scores, ensemble_score, sklearn_score
)
return individual_scores, ensemble_score, sklearn_score
def visualize_bagging_results(self, individual_scores, ensemble_score, sklearn_score):
"""
可视化Bagging结果
"""
plt.figure(figsize=(12, 8))
# 个体模型性能分布
plt.subplot(2, 2, 1)
plt.hist(individual_scores, bins=10, alpha=0.7, color='lightblue',
edgecolor='black')
plt.axvline(np.mean(individual_scores), color='red', linestyle='--',
label=f'平均: {np.mean(individual_scores):.4f}')
plt.axvline(ensemble_score, color='green', linestyle='--',
label=f'集成: {ensemble_score:.4f}')
plt.xlabel('准确率')
plt.ylabel('频次')
plt.title('个体模型性能分布')
plt.legend()
plt.grid(True, alpha=0.3)
# 性能比较
plt.subplot(2, 2, 2)
methods = ['个体平均', '手动集成', 'sklearn集成']
scores = [np.mean(individual_scores), ensemble_score, sklearn_score]
colors = ['lightblue', 'green', 'orange']
bars = plt.bar(methods, scores, color=colors, alpha=0.7)
plt.ylabel('准确率')
plt.title('性能比较')
plt.ylim(0.8, 1.0)
# 添加数值标签
for bar, score in zip(bars, scores):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.005,
f'{score:.4f}', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
# 累积性能
plt.subplot(2, 2, 3)
cumulative_scores = []
for i in range(1, len(individual_scores) + 1):
# 前i个模型的平均准确率(用于近似观察集成规模的影响)
cumulative_scores.append(np.mean(individual_scores[:i]))
plt.plot(range(1, len(individual_scores) + 1), cumulative_scores,
'o-', label='累积平均')
plt.axhline(y=ensemble_score, color='green', linestyle='--',
label=f'最终集成: {ensemble_score:.4f}')
plt.xlabel('模型数量')
plt.ylabel('准确率')
plt.title('累积性能变化')
plt.legend()
plt.grid(True, alpha=0.3)
# 方差分析
plt.subplot(2, 2, 4)
model_numbers = range(1, len(individual_scores) + 1)
variances = []
for i in range(1, len(individual_scores) + 1):
variances.append(np.var(individual_scores[:i]))
plt.plot(model_numbers, variances, 'o-', color='red')
plt.xlabel('模型数量')
plt.ylabel('方差')
plt.title('性能方差变化')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def compare_bagging_variants(self):
"""
比较不同的Bagging变体
"""
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=15,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 不同的Bagging变体
bagging_variants = {
'Standard Bagging': BaggingClassifier(
estimator=DecisionTreeClassifier(),
n_estimators=50, random_state=42
),
'Random Forest': RandomForestClassifier(
n_estimators=50, random_state=42
),
'Extra Trees': ExtraTreesClassifier(
n_estimators=50, random_state=42
),
'Bagging with SVM': BaggingClassifier(
estimator=SVC(probability=True),
n_estimators=10, random_state=42
)
}
results = {}
print("\nBagging变体比较:")
print("=" * 30)
for name, model in bagging_variants.items():
# 交叉验证
cv_scores = cross_val_score(model, X_train, y_train, cv=5)
# 训练和测试
model.fit(X_train, y_train)
test_score = model.score(X_test, y_test)
results[name] = {
'cv_mean': cv_scores.mean(),
'cv_std': cv_scores.std(),
'test_score': test_score
}
print(f"{name}:")
print(f" CV: {cv_scores.mean():.4f} (±{cv_scores.std():.4f})")
print(f" Test: {test_score:.4f}")
# 可视化比较
self.visualize_bagging_variants(results)
return results
def visualize_bagging_variants(self, results):
"""
可视化Bagging变体比较
"""
plt.figure(figsize=(12, 6))
names = list(results.keys())
cv_means = [results[name]['cv_mean'] for name in names]
cv_stds = [results[name]['cv_std'] for name in names]
test_scores = [results[name]['test_score'] for name in names]
x = np.arange(len(names))
width = 0.35
# CV性能
plt.subplot(1, 2, 1)
plt.bar(x - width/2, cv_means, width, yerr=cv_stds,
label='CV', alpha=0.7, capsize=5)
plt.bar(x + width/2, test_scores, width,
label='Test', alpha=0.7)
plt.xlabel('方法')
plt.ylabel('准确率')
plt.title('Bagging变体性能比较')
plt.xticks(x, names, rotation=45)
plt.legend()
plt.grid(True, alpha=0.3)
# 性能排序
plt.subplot(1, 2, 2)
sorted_indices = np.argsort(test_scores)[::-1]
sorted_names = [names[i] for i in sorted_indices]
sorted_scores = [test_scores[i] for i in sorted_indices]
colors = plt.cm.viridis(np.linspace(0, 1, len(sorted_names)))
bars = plt.bar(range(len(sorted_names)), sorted_scores,
color=colors, alpha=0.7)
plt.xlabel('排名')
plt.ylabel('测试准确率')
plt.title('性能排序')
plt.xticks(range(len(sorted_names)),
[f'{i+1}' for i in range(len(sorted_names))])
# 添加方法名标签
for i, (bar, name, score) in enumerate(zip(bars, sorted_names, sorted_scores)):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.005,
f'{name}\n{score:.4f}', ha='center', va='bottom',
fontsize=8, rotation=0)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Bagging分析演示
bagging_analyzer = BaggingAnalyzer()
print("\nBagging方法演示:")
print("=" * 40)
# Bagging过程演示
individual_scores, ensemble_score, sklearn_score = bagging_analyzer.demonstrate_bagging_process()
# Bagging变体比较
bagging_results = bagging_analyzer.compare_bagging_variants()
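在进入随机森林之前,可以顺手看一下集成规模的影响。下面是一个最小草图(数据生成方式与上文一致):随着基学习器数量增加,Bagging的测试准确率通常先快速上升、随后趋于平稳。
X_sz, y_sz = make_classification(n_samples=1000, n_features=20,
                                 n_informative=15, random_state=42)
Xtr_sz, Xte_sz, ytr_sz, yte_sz = train_test_split(
    X_sz, y_sz, test_size=0.3, random_state=42
)
for n in [1, 5, 10, 25, 50, 100]:
    bag_n = BaggingClassifier(estimator=DecisionTreeClassifier(),
                              n_estimators=n, random_state=42)
    bag_n.fit(Xtr_sz, ytr_sz)
    print(f"n_estimators={n:3d}: 测试准确率 {bag_n.score(Xte_sz, yte_sz):.4f}")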
6.3 随机森林
6.3.1 随机森林原理
随机森林是Bagging的一个扩展:除了对训练样本做Bootstrap采样外,它在每个节点分裂时只从随机抽取的特征子集中挑选最优划分特征,从而进一步增加基学习器之间的多样性。
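下面的小实验可以直接观察这一点(一个草图,沿用本章的合成数据设定):在相同数据上分别训练 max_features=None 与 max_features='sqrt' 的森林,在留出集上比较树间预测的平均相关性——特征随机化通常会降低相关性,即提高多样性。
def avg_tree_correlation(forest, X_eval):
    """计算森林中各棵树在X_eval上预测结果的平均两两相关系数"""
    preds = np.array([tree.predict(X_eval) for tree in forest.estimators_])
    corr = np.corrcoef(preds)
    off_diag = ~np.eye(corr.shape[0], dtype=bool)
    return corr[off_diag].mean()

X_div, y_div = make_classification(n_samples=1000, n_features=50,
                                   n_informative=30, random_state=0)
X_fit, X_hold = X_div[:600], X_div[600:]
y_fit = y_div[:600]
for mf in [None, 'sqrt']:
    rf_div = RandomForestClassifier(n_estimators=30, max_features=mf,
                                    random_state=0).fit(X_fit, y_fit)
    print(f"max_features={mf}: 树间平均相关性 "
          f"{avg_tree_correlation(rf_div, X_hold):.4f}")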
class RandomForestAnalyzer:
"""
随机森林分析器
"""
def __init__(self):
self.models = {}
self.results = {}
def demonstrate_feature_randomness(self):
"""
演示特征随机性的作用
"""
# 创建高维数据集
X, y = make_classification(
n_samples=1000,
n_features=50,
n_informative=30,
n_redundant=20,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 比较不同的特征选择策略
feature_strategies = {
'All Features': RandomForestClassifier(
n_estimators=100, max_features=None, random_state=42
),
'Sqrt Features': RandomForestClassifier(
n_estimators=100, max_features='sqrt', random_state=42
),
'Log2 Features': RandomForestClassifier(
n_estimators=100, max_features='log2', random_state=42
),
'Half Features': RandomForestClassifier(
n_estimators=100, max_features=0.5, random_state=42
),
'Fixed 10 Features': RandomForestClassifier(
n_estimators=100, max_features=10, random_state=42
)
}
results = {}
print("特征随机性对随机森林的影响:")
print("=" * 40)
print(f"总特征数: {X.shape[1]}")
print(f"sqrt(特征数): {int(np.sqrt(X.shape[1]))}")
print(f"log2(特征数): {int(np.log2(X.shape[1]))}")
print()
for name, model in feature_strategies.items():
# 交叉验证
cv_scores = cross_val_score(model, X_train, y_train, cv=5)
# 训练和测试
model.fit(X_train, y_train)
test_score = model.score(X_test, y_test)
# 计算实际使用的特征数
if hasattr(model, 'max_features_'):
actual_features = model.max_features_
else:
actual_features = model.max_features
results[name] = {
'cv_mean': cv_scores.mean(),
'cv_std': cv_scores.std(),
'test_score': test_score,
'max_features': actual_features
}
print(f"{name}:")
print(f" 使用特征数: {actual_features}")
print(f" CV: {cv_scores.mean():.4f} (±{cv_scores.std():.4f})")
print(f" Test: {test_score:.4f}")
# 可视化结果
self.visualize_feature_randomness(results)
return results
def visualize_feature_randomness(self, results):
"""
可视化特征随机性效果
"""
plt.figure(figsize=(15, 10))
names = list(results.keys())
cv_means = [results[name]['cv_mean'] for name in names]
cv_stds = [results[name]['cv_std'] for name in names]
test_scores = [results[name]['test_score'] for name in names]
max_features = [results[name]['max_features'] for name in names]
# 性能比较
plt.subplot(2, 2, 1)
x = np.arange(len(names))
width = 0.35
plt.bar(x - width/2, cv_means, width, yerr=cv_stds,
label='CV', alpha=0.7, capsize=5)
plt.bar(x + width/2, test_scores, width,
label='Test', alpha=0.7)
plt.xlabel('特征选择策略')
plt.ylabel('准确率')
plt.title('不同特征选择策略的性能')
plt.xticks(x, names, rotation=45)
plt.legend()
plt.grid(True, alpha=0.3)
# 特征数 vs 性能
plt.subplot(2, 2, 2)
# 处理字符串类型的max_features
numeric_features = []
numeric_scores = []
for i, (feat, score) in enumerate(zip(max_features, test_scores)):
if isinstance(feat, (int, float)):
numeric_features.append(feat)
numeric_scores.append(score)
if numeric_features:
plt.scatter(numeric_features, numeric_scores, s=100, alpha=0.7)
# 添加标签
for feat, score, name in zip(numeric_features, numeric_scores,
[names[i] for i in range(len(names))
if isinstance(max_features[i], (int, float))]):
plt.annotate(name, (feat, score), xytext=(5, 5),
textcoords='offset points', fontsize=8)
plt.xlabel('使用的特征数')
plt.ylabel('测试准确率')
plt.title('特征数 vs 性能')
plt.grid(True, alpha=0.3)
# 性能稳定性
plt.subplot(2, 2, 3)
plt.bar(names, cv_stds, alpha=0.7, color='orange')
plt.xlabel('特征选择策略')
plt.ylabel('CV标准差')
plt.title('性能稳定性比较')
plt.xticks(rotation=45)
plt.grid(True, alpha=0.3)
# 综合评分
plt.subplot(2, 2, 4)
# 计算综合评分:性能 - 稳定性惩罚
composite_scores = [score - 2*std for score, std in zip(test_scores, cv_stds)]
colors = plt.cm.viridis(np.linspace(0, 1, len(names)))
bars = plt.bar(names, composite_scores, color=colors, alpha=0.7)
plt.xlabel('特征选择策略')
plt.ylabel('综合评分')
plt.title('综合评分 (性能 - 2×稳定性)')
plt.xticks(rotation=45)
# 添加数值标签
for bar, score in zip(bars, composite_scores):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.005,
f'{score:.3f}', ha='center', va='bottom', fontsize=8)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def analyze_tree_diversity(self):
"""
分析随机森林中树的多样性
"""
# 创建数据集
X, y = make_classification(
n_samples=500, n_features=20, n_informative=15,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 训练随机森林
rf = RandomForestClassifier(
n_estimators=20, max_depth=10, random_state=42
)
rf.fit(X_train, y_train)
# 分析每棵树的预测
tree_predictions = []
tree_scores = []
for tree in rf.estimators_:
pred = tree.predict(X_test)
tree_predictions.append(pred)
score = accuracy_score(y_test, pred)
tree_scores.append(score)
# 计算树之间的相关性
prediction_matrix = np.array(tree_predictions)
correlation_matrix = np.corrcoef(prediction_matrix)
# 计算平均相关性(排除对角线)
mask = ~np.eye(correlation_matrix.shape[0], dtype=bool)
avg_correlation = correlation_matrix[mask].mean()
print("随机森林树的多样性分析:")
print("=" * 40)
print(f"树的数量: {len(rf.estimators_)}")
print(f"个体树平均准确率: {np.mean(tree_scores):.4f}")
print(f"个体树准确率标准差: {np.std(tree_scores):.4f}")
print(f"树之间平均相关性: {avg_correlation:.4f}")
print(f"随机森林准确率: {rf.score(X_test, y_test):.4f}")
# 可视化多样性
self.visualize_tree_diversity(
tree_scores, correlation_matrix, rf.score(X_test, y_test)
)
return tree_scores, correlation_matrix
def visualize_tree_diversity(self, tree_scores, correlation_matrix, rf_score):
"""
可视化树的多样性
"""
plt.figure(figsize=(15, 10))
# 个体树性能分布
plt.subplot(2, 3, 1)
plt.hist(tree_scores, bins=10, alpha=0.7, color='lightblue',
edgecolor='black')
plt.axvline(np.mean(tree_scores), color='red', linestyle='--',
label=f'平均: {np.mean(tree_scores):.4f}')
plt.axvline(rf_score, color='green', linestyle='--',
label=f'RF: {rf_score:.4f}')
plt.xlabel('准确率')
plt.ylabel('频次')
plt.title('个体树性能分布')
plt.legend()
plt.grid(True, alpha=0.3)
# 相关性矩阵热力图
plt.subplot(2, 3, 2)
sns.heatmap(correlation_matrix, cmap='coolwarm', center=0,
square=True, cbar_kws={'label': '相关性'})
plt.title('树预测相关性矩阵')
# 相关性分布
plt.subplot(2, 3, 3)
# 提取上三角矩阵(排除对角线)
mask = np.triu(np.ones_like(correlation_matrix, dtype=bool), k=1)
correlations = correlation_matrix[mask]
plt.hist(correlations, bins=15, alpha=0.7, color='orange',
edgecolor='black')
plt.axvline(np.mean(correlations), color='red', linestyle='--',
label=f'平均: {np.mean(correlations):.4f}')
plt.xlabel('相关性')
plt.ylabel('频次')
plt.title('树间相关性分布')
plt.legend()
plt.grid(True, alpha=0.3)
# 累积性能
plt.subplot(2, 3, 4)
n_trees = len(tree_scores)
cumulative_scores = []
for i in range(1, n_trees + 1):
# 前i棵树的平均个体准确率(注意:并非真正的集成准确率)
cumulative_scores.append(np.mean(tree_scores[:i]))
plt.plot(range(1, n_trees + 1), cumulative_scores, 'o-')
plt.axhline(y=rf_score, color='green', linestyle='--',
label=f'最终RF: {rf_score:.4f}')
plt.xlabel('树的数量')
plt.ylabel('平均准确率')
plt.title('累积性能变化')
plt.legend()
plt.grid(True, alpha=0.3)
# 性能 vs 多样性
plt.subplot(2, 3, 5)
# 计算每棵树与其他树的平均相关性
avg_correlations = []
for i in range(len(tree_scores)):
other_correlations = [correlation_matrix[i, j]
for j in range(len(tree_scores)) if i != j]
avg_correlations.append(np.mean(other_correlations))
plt.scatter(avg_correlations, tree_scores, alpha=0.7)
plt.xlabel('与其他树的平均相关性')
plt.ylabel('个体准确率')
plt.title('性能 vs 多样性')
# 添加趋势线
z = np.polyfit(avg_correlations, tree_scores, 1)
p = np.poly1d(z)
plt.plot(avg_correlations, p(avg_correlations), "r--", alpha=0.8)
plt.grid(True, alpha=0.3)
# 特征重要性分析
plt.subplot(2, 3, 6)
# 这里需要重新训练一个随机森林来获取特征重要性
X, y = make_classification(
n_samples=500, n_features=20, n_informative=15,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
rf_temp = RandomForestClassifier(
n_estimators=20, max_depth=10, random_state=42
)
rf_temp.fit(X_train, y_train)
feature_importance = rf_temp.feature_importances_
feature_indices = np.argsort(feature_importance)[::-1][:10]
plt.bar(range(10), feature_importance[feature_indices])
plt.xlabel('特征排名')
plt.ylabel('重要性')
plt.title('前10个重要特征')
plt.xticks(range(10), [f'F{i}' for i in feature_indices])
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def parameter_sensitivity_analysis(self):
"""
随机森林参数敏感性分析
"""
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=15,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 参数范围
param_ranges = {
'n_estimators': [10, 25, 50, 100, 200, 500],
'max_depth': [3, 5, 10, 15, 20, None],
'min_samples_split': [2, 5, 10, 20, 50],
'min_samples_leaf': [1, 2, 5, 10, 20],
'max_features': ['sqrt', 'log2', 0.3, 0.5, 0.7, None]
}
results = {}
print("随机森林参数敏感性分析:")
print("=" * 40)
for param_name, param_values in param_ranges.items():
param_scores = []
for param_value in param_values:
# 创建参数字典
params = {'random_state': 42}
params[param_name] = param_value
# 训练模型
rf = RandomForestClassifier(**params)
# 交叉验证
cv_scores = cross_val_score(rf, X_train, y_train, cv=3)
param_scores.append(cv_scores.mean())
results[param_name] = {
'values': param_values,
'scores': param_scores
}
best_idx = np.argmax(param_scores)
print(f"{param_name}:")
print(f" 最佳值: {param_values[best_idx]}")
print(f" 最佳分数: {param_scores[best_idx]:.4f}")
# 可视化参数敏感性
self.visualize_parameter_sensitivity(results)
return results
def visualize_parameter_sensitivity(self, results):
"""
可视化参数敏感性
"""
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
axes = axes.ravel()
for i, (param_name, data) in enumerate(results.items()):
if i >= len(axes):
break
values = data['values']
scores = data['scores']
# 处理不同类型的参数值
if all(isinstance(v, (int, float)) or v is None for v in values):
# 数值参数
x_values = [v if v is not None else max([v for v in values if v is not None]) * 1.1
for v in values]
axes[i].plot(x_values, scores, 'o-', linewidth=2, markersize=8)
axes[i].set_xlabel(param_name)
else:
# 分类参数
x_pos = range(len(values))
axes[i].bar(x_pos, scores, alpha=0.7)
axes[i].set_xlabel(param_name)
axes[i].set_xticks(x_pos)
axes[i].set_xticklabels([str(v) for v in values], rotation=45)
axes[i].set_ylabel('CV准确率')
axes[i].set_title(f'{param_name} 敏感性')
axes[i].grid(True, alpha=0.3)
# 标记最佳值
best_idx = np.argmax(scores)
if all(isinstance(v, (int, float)) or v is None for v in values):
x_best = values[best_idx] if values[best_idx] is not None else max([v for v in values if v is not None]) * 1.1
axes[i].scatter([x_best], [scores[best_idx]],
color='red', s=100, zorder=5)
else:
axes[i].bar(best_idx, scores[best_idx],
color='red', alpha=0.8)
# 删除多余的子图
for i in range(len(results), len(axes)):
fig.delaxes(axes[i])
plt.tight_layout()
plt.show()
# 随机森林分析演示
rf_analyzer = RandomForestAnalyzer()
print("\n随机森林详细分析:")
print("=" * 40)
# 特征随机性演示
feature_results = rf_analyzer.demonstrate_feature_randomness()
# 树多样性分析
tree_scores, correlation_matrix = rf_analyzer.analyze_tree_diversity()
# 参数敏感性分析
param_results = rf_analyzer.parameter_sensitivity_analysis()
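除了上文使用的基于不纯度的 feature_importances_,sklearn 还提供置换重要性(permutation importance):在留出集上逐个打乱特征并观察分数下降幅度,对强相关特征通常更稳健。一个最小示例:
from sklearn.inspection import permutation_importance

X_pi, y_pi = make_classification(n_samples=500, n_features=20,
                                 n_informative=5, random_state=42)
Xtr_pi, Xte_pi, ytr_pi, yte_pi = train_test_split(
    X_pi, y_pi, test_size=0.3, random_state=42
)
rf_pi = RandomForestClassifier(n_estimators=100, random_state=42).fit(Xtr_pi, ytr_pi)
pi = permutation_importance(rf_pi, Xte_pi, yte_pi, n_repeats=10, random_state=42)
for idx in np.argsort(pi.importances_mean)[::-1][:5]:
    print(f"特征F{idx}: {pi.importances_mean[idx]:.4f} (±{pi.importances_std[idx]:.4f})")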
6.4 Boosting方法
6.4.1 AdaBoost算法
class AdaBoostAnalyzer:
"""
AdaBoost算法分析器
"""
def __init__(self):
self.models = {}
self.results = {}
def demonstrate_adaboost_process(self):
"""
演示AdaBoost训练过程
"""
# 创建简单的二分类数据集
X, y = make_classification(
n_samples=200, n_features=2, n_redundant=0,
n_informative=2, n_clusters_per_class=1,
random_state=42
)
# 将标签转换为-1和1
y = np.where(y == 0, -1, 1)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 手动实现AdaBoost
n_estimators = 5
models = []
alphas = []
sample_weights = np.ones(len(X_train)) / len(X_train)
print("AdaBoost训练过程:")
print("=" * 30)
for t in range(n_estimators):
print(f"\n第 {t+1} 轮:")
# 训练弱学习器(决策树桩)
weak_learner = DecisionTreeClassifier(
max_depth=1, random_state=t
)
weak_learner.fit(X_train, y_train, sample_weight=sample_weights)
# 预测
y_pred = weak_learner.predict(X_train)
# 计算加权错误率
incorrect = y_pred != y_train
error_rate = np.sum(sample_weights[incorrect])
print(f" 加权错误率: {error_rate:.4f}")
# 计算学习器权重
if error_rate == 0:
alpha = 10 # 错误率为0时赋予较大的有限权重,避免log发散
elif error_rate >= 0.5:
alpha = 0
else:
alpha = 0.5 * np.log((1 - error_rate) / error_rate)
print(f" 学习器权重: {alpha:.4f}")
# 更新样本权重
sample_weights *= np.exp(-alpha * y_train * y_pred)
sample_weights /= np.sum(sample_weights) # 归一化
print(f" 样本权重范围: [{np.min(sample_weights):.6f}, {np.max(sample_weights):.6f}]")
models.append(weak_learner)
alphas.append(alpha)
# 集成预测
def ensemble_predict(X):
predictions = np.zeros(len(X))
for alpha, model in zip(alphas, models):
predictions += alpha * model.predict(X)
return np.sign(predictions)
# 评估性能
train_pred = ensemble_predict(X_train)
test_pred = ensemble_predict(X_test)
train_accuracy = np.mean(train_pred == y_train)
test_accuracy = np.mean(test_pred == y_test)
print(f"\n最终结果:")
print(f"训练准确率: {train_accuracy:.4f}")
print(f"测试准确率: {test_accuracy:.4f}")
# 与sklearn的AdaBoost比较
sklearn_ada = AdaBoostClassifier(
estimator=DecisionTreeClassifier(max_depth=1),
n_estimators=n_estimators,
random_state=42
)
sklearn_ada.fit(X_train, y_train)
sklearn_score = sklearn_ada.score(X_test, y_test)
print(f"sklearn AdaBoost: {sklearn_score:.4f}")
# 可视化AdaBoost过程
self.visualize_adaboost_process(
X_train, y_train, models, alphas, sample_weights
)
return models, alphas, train_accuracy, test_accuracy
def visualize_adaboost_process(self, X, y, models, alphas, final_weights):
"""
可视化AdaBoost训练过程
"""
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
# 绘制前5个弱学习器的决策边界
for i in range(min(5, len(models))):
row = i // 3
col = i % 3
if row < 2 and col < 3:
ax = axes[row, col]
# 创建网格
h = 0.02
x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
np.arange(y_min, y_max, h))
# 预测网格点
Z = models[i].predict(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
# 绘制决策边界
ax.contourf(xx, yy, Z, alpha=0.3, cmap=plt.cm.RdYlBu)
# 绘制数据点
scatter = ax.scatter(X[:, 0], X[:, 1], c=y,
s=final_weights*1000, alpha=0.7,
cmap=plt.cm.RdYlBu, edgecolors='black')
ax.set_title(f'弱学习器 {i+1}\n权重: {alphas[i]:.3f}')
ax.set_xlabel('特征 1')
ax.set_ylabel('特征 2')
# 删除多余的子图
if len(models) < 6:
for i in range(len(models), 6):
row = i // 3
col = i % 3
if row < 2 and col < 3:
fig.delaxes(axes[row, col])
plt.tight_layout()
plt.show()
# 绘制权重变化和性能变化
self.plot_adaboost_convergence(models, alphas, X, y)
def plot_adaboost_convergence(self, models, alphas, X, y):
"""
绘制AdaBoost收敛过程
"""
plt.figure(figsize=(15, 5))
# 累积性能
plt.subplot(1, 3, 1)
cumulative_scores = []
for i in range(1, len(models) + 1):
# 计算前i个模型的集成性能
predictions = np.zeros(len(X))
for j in range(i):
predictions += alphas[j] * models[j].predict(X)
ensemble_pred = np.sign(predictions)
accuracy = np.mean(ensemble_pred == y)
cumulative_scores.append(accuracy)
plt.plot(range(1, len(models) + 1), cumulative_scores, 'o-')
plt.xlabel('弱学习器数量')
plt.ylabel('训练准确率')
plt.title('AdaBoost收敛过程')
plt.grid(True, alpha=0.3)
# 学习器权重
plt.subplot(1, 3, 2)
plt.bar(range(1, len(alphas) + 1), alphas, alpha=0.7)
plt.xlabel('弱学习器编号')
plt.ylabel('权重 (α)')
plt.title('弱学习器权重分布')
plt.grid(True, alpha=0.3)
# 个体学习器性能
plt.subplot(1, 3, 3)
individual_scores = []
for model in models:
pred = model.predict(X)
accuracy = np.mean(pred == y)
individual_scores.append(accuracy)
plt.bar(range(1, len(individual_scores) + 1), individual_scores,
alpha=0.7, color='orange')
plt.axhline(y=0.5, color='red', linestyle='--', label='随机猜测')
plt.xlabel('弱学习器编号')
plt.ylabel('个体准确率')
plt.title('个体学习器性能')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def compare_boosting_algorithms(self):
"""
比较不同的Boosting算法
"""
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=15,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 不同的Boosting算法
boosting_algorithms = {
'AdaBoost': AdaBoostClassifier(
estimator=DecisionTreeClassifier(max_depth=1),
n_estimators=50, random_state=42
),
'Gradient Boosting': GradientBoostingClassifier(
n_estimators=50, max_depth=3, random_state=42
),
'Random Forest (对比)': RandomForestClassifier(
n_estimators=50, random_state=42
)
}
results = {}
print("\nBoosting算法比较:")
print("=" * 30)
for name, model in boosting_algorithms.items():
# 交叉验证
cv_scores = cross_val_score(model, X_train, y_train, cv=5)
# 训练和测试
model.fit(X_train, y_train)
test_score = model.score(X_test, y_test)
results[name] = {
'cv_mean': cv_scores.mean(),
'cv_std': cv_scores.std(),
'test_score': test_score
}
print(f"{name}:")
print(f" CV: {cv_scores.mean():.4f} (±{cv_scores.std():.4f})")
print(f" Test: {test_score:.4f}")
# 可视化比较
self.visualize_boosting_comparison(results)
return results
def visualize_boosting_comparison(self, results):
"""
可视化Boosting算法比较
"""
plt.figure(figsize=(12, 6))
names = list(results.keys())
cv_means = [results[name]['cv_mean'] for name in names]
cv_stds = [results[name]['cv_std'] for name in names]
test_scores = [results[name]['test_score'] for name in names]
x = np.arange(len(names))
width = 0.35
# 性能比较
plt.subplot(1, 2, 1)
plt.bar(x - width/2, cv_means, width, yerr=cv_stds,
label='CV', alpha=0.7, capsize=5)
plt.bar(x + width/2, test_scores, width,
label='Test', alpha=0.7)
plt.xlabel('算法')
plt.ylabel('准确率')
plt.title('Boosting算法性能比较')
plt.xticks(x, names, rotation=45)
plt.legend()
plt.grid(True, alpha=0.3)
# 稳定性比较
plt.subplot(1, 2, 2)
colors = ['blue', 'orange', 'green']
bars = plt.bar(names, cv_stds, color=colors, alpha=0.7)
plt.xlabel('算法')
plt.ylabel('CV标准差')
plt.title('算法稳定性比较')
plt.xticks(rotation=45)
# 添加数值标签
for bar, std in zip(bars, cv_stds):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.001,
f'{std:.4f}', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# AdaBoost分析演示
ada_analyzer = AdaBoostAnalyzer()
print("\nAdaBoost算法分析:")
print("=" * 40)
# AdaBoost过程演示
models, alphas, train_acc, test_acc = ada_analyzer.demonstrate_adaboost_process()
# Boosting算法比较
boosting_results = ada_analyzer.compare_boosting_algorithms()
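sklearn的Boosting模型都实现了 staged_predict,可以在一次训练后逐轮回放集成的表现,无需反复重新训练。下面的最小示例沿用与上文同分布的数据:
X_st, y_st = make_classification(n_samples=1000, n_features=20,
                                 n_informative=15, random_state=42)
Xtr_st, Xte_st, ytr_st, yte_st = train_test_split(
    X_st, y_st, test_size=0.3, random_state=42
)
ada_st = AdaBoostClassifier(n_estimators=100, random_state=42).fit(Xtr_st, ytr_st)
# staged_predict 依次给出只使用前1,2,...,T个弱学习器时的预测
staged_acc = [accuracy_score(yte_st, pred) for pred in ada_st.staged_predict(Xte_st)]
for t in [1, 10, 50, len(staged_acc)]:
    print(f"前{t:3d}个弱学习器: 测试准确率 {staged_acc[t-1]:.4f}")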
6.4.2 梯度提升算法
class GradientBoostingAnalyzer:
"""
梯度提升算法分析器
"""
def __init__(self):
self.models = {}
self.results = {}
def demonstrate_gradient_boosting(self):
"""
演示梯度提升过程
"""
# 创建回归数据集
X, y = make_regression(
n_samples=200, n_features=1, noise=10,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 手动实现简化的梯度提升
n_estimators = 5
learning_rate = 0.1
models = []
# 初始预测(均值)
initial_prediction = np.mean(y_train)
current_prediction = np.full(len(y_train), initial_prediction)
print("梯度提升训练过程:")
print("=" * 30)
print(f"初始预测(均值): {initial_prediction:.4f}")
for t in range(n_estimators):
print(f"\n第 {t+1} 轮:")
# 计算残差(负梯度)
residuals = y_train - current_prediction
print(f" 残差均值: {np.mean(residuals):.4f}")
print(f" 残差标准差: {np.std(residuals):.4f}")
# 训练弱学习器拟合残差
weak_learner = DecisionTreeRegressor(
max_depth=3, random_state=t
)
weak_learner.fit(X_train, residuals)
# 预测残差
residual_pred = weak_learner.predict(X_train)
# 更新预测
current_prediction += learning_rate * residual_pred
# 计算当前MSE
mse = np.mean((y_train - current_prediction) ** 2)
print(f" 当前MSE: {mse:.4f}")
models.append(weak_learner)
# 集成预测函数
def ensemble_predict(X):
prediction = np.full(len(X), initial_prediction)
for model in models:
prediction += learning_rate * model.predict(X)
return prediction
# 评估性能
train_pred = ensemble_predict(X_train)
test_pred = ensemble_predict(X_test)
train_mse = np.mean((y_train - train_pred) ** 2)
test_mse = np.mean((y_test - test_pred) ** 2)
print(f"\n最终结果:")
print(f"训练MSE: {train_mse:.4f}")
print(f"测试MSE: {test_mse:.4f}")
# 与sklearn的梯度提升比较
sklearn_gb = GradientBoostingRegressor(
n_estimators=n_estimators,
learning_rate=learning_rate,
max_depth=3,
random_state=42
)
sklearn_gb.fit(X_train, y_train)
sklearn_mse = np.mean((y_test - sklearn_gb.predict(X_test)) ** 2)
print(f"sklearn GradientBoosting: {sklearn_mse:.4f}")
# 可视化梯度提升过程
self.visualize_gradient_boosting(
X_train, y_train, X_test, y_test, models,
initial_prediction, learning_rate
)
return models, train_mse, test_mse
def visualize_gradient_boosting(self, X_train, y_train, X_test, y_test,
models, initial_pred, lr):
"""
可视化梯度提升过程
"""
plt.figure(figsize=(15, 10))
# 创建用于绘图的X范围
X_plot = np.linspace(X_train.min(), X_train.max(), 100).reshape(-1, 1)
# 绘制每一步的拟合过程
for i in range(min(4, len(models))):
plt.subplot(2, 2, i+1)
# 计算到第i步的预测
prediction = np.full(len(X_plot), initial_pred)
for j in range(i+1):
prediction += lr * models[j].predict(X_plot)
# 绘制数据点
plt.scatter(X_train, y_train, alpha=0.6, label='训练数据')
plt.scatter(X_test, y_test, alpha=0.6, label='测试数据')
# 绘制拟合曲线
plt.plot(X_plot, prediction, 'r-', linewidth=2,
label=f'第{i+1}步预测')
plt.xlabel('X')
plt.ylabel('y')
plt.title(f'梯度提升第{i+1}步')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 绘制收敛过程
self.plot_gb_convergence(X_train, y_train, X_test, y_test,
models, initial_pred, lr)
def plot_gb_convergence(self, X_train, y_train, X_test, y_test,
models, initial_pred, lr):
"""
绘制梯度提升收敛过程
"""
plt.figure(figsize=(15, 5))
# 计算每一步的MSE
train_mses = []
test_mses = []
for i in range(len(models)):
# 计算到第i步的预测
train_pred = np.full(len(X_train), initial_pred)
test_pred = np.full(len(X_test), initial_pred)
for j in range(i+1):
train_pred += lr * models[j].predict(X_train)
test_pred += lr * models[j].predict(X_test)
train_mse = np.mean((y_train - train_pred) ** 2)
test_mse = np.mean((y_test - test_pred) ** 2)
train_mses.append(train_mse)
test_mses.append(test_mse)
# MSE收敛曲线
plt.subplot(1, 3, 1)
plt.plot(range(1, len(models)+1), train_mses, 'o-', label='训练MSE')
plt.plot(range(1, len(models)+1), test_mses, 'o-', label='测试MSE')
plt.xlabel('迭代次数')
plt.ylabel('MSE')
plt.title('梯度提升收敛过程')
plt.legend()
plt.grid(True, alpha=0.3)
# 残差分析
plt.subplot(1, 3, 2)
final_train_pred = np.full(len(X_train), initial_pred)
for model in models:
final_train_pred += lr * model.predict(X_train)
residuals = y_train - final_train_pred
plt.scatter(final_train_pred, residuals, alpha=0.6)
plt.axhline(y=0, color='red', linestyle='--')
plt.xlabel('预测值')
plt.ylabel('残差')
plt.title('残差分析')
plt.grid(True, alpha=0.3)
# 学习率影响(近似做法:用不同lr缩放已训练好的模型,严格做法应按各lr重新训练)
plt.subplot(1, 3, 3)
learning_rates = [0.01, 0.05, 0.1, 0.2, 0.5]
final_mses = []
for lr_test in learning_rates:
test_pred = np.full(len(X_test), initial_pred)
for model in models:
test_pred += lr_test * model.predict(X_test)
mse = np.mean((y_test - test_pred) ** 2)
final_mses.append(mse)
plt.plot(learning_rates, final_mses, 'o-')
plt.axvline(x=lr, color='red', linestyle='--',
label=f'当前lr={lr}')
plt.xlabel('学习率')
plt.ylabel('测试MSE')
plt.title('学习率对性能的影响')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def compare_gb_variants(self):
"""
比较不同的梯度提升变体
"""
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=15,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 不同的梯度提升变体
gb_variants = {
'Gradient Boosting': GradientBoostingClassifier(
n_estimators=100, learning_rate=0.1, max_depth=3,
random_state=42
),
'XGBoost': None, # 需要安装xgboost
'LightGBM': None, # 需要安装lightgbm
'Histogram GB': HistGradientBoostingClassifier(
max_iter=100, learning_rate=0.1, max_depth=3,
random_state=42
)
}
# 尝试导入XGBoost和LightGBM
try:
import xgboost as xgb
gb_variants['XGBoost'] = xgb.XGBClassifier(
n_estimators=100, learning_rate=0.1, max_depth=3,
random_state=42, eval_metric='logloss'
)
except ImportError:
print("XGBoost未安装,跳过XGBoost比较")
del gb_variants['XGBoost']
try:
import lightgbm as lgb
gb_variants['LightGBM'] = lgb.LGBMClassifier(
n_estimators=100, learning_rate=0.1, max_depth=3,
random_state=42, verbose=-1
)
except ImportError:
print("LightGBM未安装,跳过LightGBM比较")
del gb_variants['LightGBM']
results = {}
print("\n梯度提升变体比较:")
print("=" * 30)
for name, model in gb_variants.items():
if model is None:
continue
# 记录训练时间
start_time = time.time()
# 交叉验证
cv_scores = cross_val_score(model, X_train, y_train, cv=3)
# 训练和测试
model.fit(X_train, y_train)
test_score = model.score(X_test, y_test)
training_time = time.time() - start_time
results[name] = {
'cv_mean': cv_scores.mean(),
'cv_std': cv_scores.std(),
'test_score': test_score,
'training_time': training_time
}
print(f"{name}:")
print(f" CV: {cv_scores.mean():.4f} (±{cv_scores.std():.4f})")
print(f" Test: {test_score:.4f}")
print(f" 训练时间: {training_time:.2f}秒")
# 可视化比较
self.visualize_gb_variants(results)
return results
def visualize_gb_variants(self, results):
"""
可视化梯度提升变体比较
"""
if not results:
return
plt.figure(figsize=(15, 10))
names = list(results.keys())
cv_means = [results[name]['cv_mean'] for name in names]
cv_stds = [results[name]['cv_std'] for name in names]
test_scores = [results[name]['test_score'] for name in names]
training_times = [results[name]['training_time'] for name in names]
# 性能比较
plt.subplot(2, 2, 1)
x = np.arange(len(names))
width = 0.35
plt.bar(x - width/2, cv_means, width, yerr=cv_stds,
label='CV', alpha=0.7, capsize=5)
plt.bar(x + width/2, test_scores, width,
label='Test', alpha=0.7)
plt.xlabel('算法')
plt.ylabel('准确率')
plt.title('梯度提升变体性能比较')
plt.xticks(x, names, rotation=45)
plt.legend()
plt.grid(True, alpha=0.3)
# 训练时间比较
plt.subplot(2, 2, 2)
bars = plt.bar(names, training_times, alpha=0.7, color='orange')
plt.xlabel('算法')
plt.ylabel('训练时间 (秒)')
plt.title('训练时间比较')
plt.xticks(rotation=45)
# 添加数值标签
for bar, time_val in zip(bars, training_times):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.1,
f'{time_val:.2f}', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
# 性能vs时间散点图
plt.subplot(2, 2, 3)
colors = plt.cm.viridis(np.linspace(0, 1, len(names)))
for i, (name, color) in enumerate(zip(names, colors)):
plt.scatter(training_times[i], test_scores[i],
s=100, c=[color], label=name, alpha=0.7)
plt.xlabel('训练时间 (秒)')
plt.ylabel('测试准确率')
plt.title('性能 vs 训练时间')
plt.legend()
plt.grid(True, alpha=0.3)
# 综合评分
plt.subplot(2, 2, 4)
# 归一化指标
norm_scores = np.array(test_scores) / max(test_scores)
norm_times = 1 - (np.array(training_times) / max(training_times)) # 时间越短越好
norm_stability = 1 - (np.array(cv_stds) / max(cv_stds)) # 标准差越小越好
# 综合评分:性能40% + 速度30% + 稳定性30%
composite_scores = 0.4 * norm_scores + 0.3 * norm_times + 0.3 * norm_stability
bars = plt.bar(names, composite_scores, alpha=0.7, color='green')
plt.xlabel('算法')
plt.ylabel('综合评分')
plt.title('综合评分 (性能+速度+稳定性)')
plt.xticks(rotation=45)
# 添加数值标签
for bar, score in zip(bars, composite_scores):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{score:.3f}', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 梯度提升分析演示
gb_analyzer = GradientBoostingAnalyzer()
print("\n梯度提升算法分析:")
print("=" * 40)
# 梯度提升过程演示
models, train_mse, test_mse = gb_analyzer.demonstrate_gradient_boosting()
# 梯度提升变体比较
gb_variants_results = gb_analyzer.compare_gb_variants()
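针对上面观察到的过拟合风险,GradientBoostingClassifier 内置了基于内部验证集的早停机制(validation_fraction 与 n_iter_no_change 参数)。一个最小示例:
X_es, y_es = make_classification(n_samples=1000, n_features=20,
                                 n_informative=15, random_state=42)
Xtr_es, Xte_es, ytr_es, yte_es = train_test_split(
    X_es, y_es, test_size=0.3, random_state=42
)
gb_es = GradientBoostingClassifier(
    n_estimators=500, learning_rate=0.1,
    validation_fraction=0.2,  # 留出20%训练数据做内部验证
    n_iter_no_change=10,      # 验证分数连续10轮无提升则停止
    random_state=42
)
gb_es.fit(Xtr_es, ytr_es)
print(f"实际训练的迭代轮数: {gb_es.n_estimators_} / 500")
print(f"测试准确率: {gb_es.score(Xte_es, yte_es):.4f}")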
6.5 投票方法
6.5.1 硬投票和软投票
class VotingAnalyzer:
"""
投票方法分析器
"""
def __init__(self):
self.models = {}
self.results = {}
def demonstrate_voting_methods(self):
"""
演示硬投票和软投票
"""
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=15,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 基础分类器
base_classifiers = [
('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
('svm', SVC(probability=True, random_state=42)),
('nb', GaussianNB()),
('lr', LogisticRegression(random_state=42, max_iter=1000))
]
# 硬投票
hard_voting = VotingClassifier(
estimators=base_classifiers,
voting='hard'
)
# 软投票
soft_voting = VotingClassifier(
estimators=base_classifiers,
voting='soft'
)
# 训练所有模型
models = {
'Random Forest': base_classifiers[0][1],
'SVM': base_classifiers[1][1],
'Naive Bayes': base_classifiers[2][1],
'Logistic Regression': base_classifiers[3][1],
'Hard Voting': hard_voting,
'Soft Voting': soft_voting
}
results = {}
print("投票方法比较:")
print("=" * 30)
for name, model in models.items():
# 交叉验证
cv_scores = cross_val_score(model, X_train, y_train, cv=5)
# 训练和测试
model.fit(X_train, y_train)
test_score = model.score(X_test, y_test)
results[name] = {
'cv_mean': cv_scores.mean(),
'cv_std': cv_scores.std(),
'test_score': test_score
}
print(f"{name}:")
print(f" CV: {cv_scores.mean():.4f} (±{cv_scores.std():.4f})")
print(f" Test: {test_score:.4f}")
# 分析投票过程
self.analyze_voting_process(X_test, y_test, base_classifiers,
hard_voting, soft_voting)
# 可视化结果
self.visualize_voting_results(results)
return results
def analyze_voting_process(self, X_test, y_test, base_classifiers,
hard_voting, soft_voting):
"""
分析投票过程
"""
print("\n投票过程分析:")
print("=" * 30)
# 获取基础分类器的预测
base_predictions = {}
base_probabilities = {}
for name, model in base_classifiers:
pred = model.predict(X_test)
prob = model.predict_proba(X_test)
base_predictions[name] = pred
base_probabilities[name] = prob
# 硬投票预测
hard_pred = hard_voting.predict(X_test)
# 软投票预测
soft_pred = soft_voting.predict(X_test)
# 分析前10个样本的投票过程
print("前10个样本的投票过程:")
print("-" * 60)
for i in range(min(10, len(X_test))):
print(f"\n样本 {i+1} (真实标签: {y_test[i]}):")
# 基础分类器预测
print(" 基础分类器预测:")
votes_for_class_1 = 0
avg_prob_class_1 = 0
for name, model in base_classifiers:
pred = base_predictions[name][i]
prob = base_probabilities[name][i]
print(f" {name}: {pred} (概率: {prob[1]:.3f})")
if pred == 1:
votes_for_class_1 += 1
avg_prob_class_1 += prob[1]
avg_prob_class_1 /= len(base_classifiers)
print(f" 硬投票: {hard_pred[i]} (类别1得票: {votes_for_class_1}/{len(base_classifiers)})")
print(f" 软投票: {soft_pred[i]} (平均概率: {avg_prob_class_1:.3f})")
def visualize_voting_results(self, results):
"""
可视化投票结果
"""
plt.figure(figsize=(15, 10))
# 分离基础分类器和集成方法
base_names = ['Random Forest', 'SVM', 'Naive Bayes', 'Logistic Regression']
ensemble_names = ['Hard Voting', 'Soft Voting']
base_scores = [results[name]['test_score'] for name in base_names]
ensemble_scores = [results[name]['test_score'] for name in ensemble_names]
# 性能比较
plt.subplot(2, 2, 1)
x_base = np.arange(len(base_names))
x_ensemble = np.arange(len(ensemble_names))
plt.bar(x_base, base_scores, alpha=0.7, label='基础分类器', color='lightblue')
plt.bar(x_ensemble + len(base_names) + 0.5, ensemble_scores,
alpha=0.7, label='集成方法', color='orange')
tick_positions = list(x_base) + list(x_ensemble + len(base_names) + 0.5)
plt.xticks(tick_positions, base_names + ensemble_names, rotation=45)
plt.ylabel('测试准确率')
plt.title('基础分类器 vs 集成方法')
plt.legend()
plt.grid(True, alpha=0.3)
# CV性能和稳定性
plt.subplot(2, 2, 2)
all_names = list(results.keys())
cv_means = [results[name]['cv_mean'] for name in all_names]
cv_stds = [results[name]['cv_std'] for name in all_names]
plt.errorbar(range(len(all_names)), cv_means, yerr=cv_stds,
fmt='o-', capsize=5, capthick=2)
plt.xticks(range(len(all_names)), all_names, rotation=45)
plt.ylabel('CV准确率')
plt.title('交叉验证性能和稳定性')
plt.grid(True, alpha=0.3)
# 硬投票 vs 软投票
plt.subplot(2, 2, 3)
voting_methods = ['Hard Voting', 'Soft Voting']
voting_scores = [results[name]['test_score'] for name in voting_methods]
voting_stds = [results[name]['cv_std'] for name in voting_methods]
bars = plt.bar(voting_methods, voting_scores,
yerr=voting_stds, alpha=0.7, capsize=5)
plt.ylabel('测试准确率')
plt.title('硬投票 vs 软投票')
# 添加数值标签
for bar, score in zip(bars, voting_scores):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.005,
f'{score:.4f}', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
# 性能提升分析
plt.subplot(2, 2, 4)
base_avg = np.mean(base_scores)
hard_improvement = results['Hard Voting']['test_score'] - base_avg
soft_improvement = results['Soft Voting']['test_score'] - base_avg
improvements = [hard_improvement, soft_improvement]
colors = ['green' if imp > 0 else 'red' for imp in improvements]
bars = plt.bar(['Hard Voting', 'Soft Voting'], improvements,
color=colors, alpha=0.7)
plt.axhline(y=0, color='black', linestyle='-', alpha=0.5)
plt.ylabel('性能提升')
plt.title('相对于基础分类器平均性能的提升')
# 添加数值标签
for bar, imp in zip(bars, improvements):
plt.text(bar.get_x() + bar.get_width()/2,
bar.get_height() + (0.002 if imp > 0 else -0.005),
f'{imp:+.4f}', ha='center',
va='bottom' if imp > 0 else 'top')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def weighted_voting_analysis(self):
"""
加权投票分析
"""
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=15,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 基础分类器
base_classifiers = [
('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
('svm', SVC(probability=True, random_state=42)),
('lr', LogisticRegression(random_state=42, max_iter=1000))
]
# 评估基础分类器性能
base_scores = []
for name, model in base_classifiers:
cv_score = cross_val_score(model, X_train, y_train, cv=3).mean()
base_scores.append(cv_score)
print("\n加权投票分析:")
print("=" * 30)
print("基础分类器性能:")
for (name, _), score in zip(base_classifiers, base_scores):
print(f" {name}: {score:.4f}")
# 不同的权重策略
weight_strategies = {
'Equal': [1, 1, 1],
'Performance': base_scores,
'Squared Performance': [s**2 for s in base_scores],
'Custom': [0.5, 0.3, 0.2] # 假设我们更信任随机森林
}
results = {}
for strategy_name, weights in weight_strategies.items():
# 归一化权重
normalized_weights = np.array(weights) / np.sum(weights)
# 手动实现加权软投票
weighted_probs = np.zeros((len(X_test), 2))
for i, (name, model) in enumerate(base_classifiers):
model.fit(X_train, y_train)
probs = model.predict_proba(X_test)
weighted_probs += normalized_weights[i] * probs
# 预测
weighted_pred = np.argmax(weighted_probs, axis=1)
accuracy = np.mean(weighted_pred == y_test)
results[strategy_name] = {
'weights': normalized_weights,
'accuracy': accuracy
}
print(f"\n{strategy_name} 权重策略:")
print(f" 权重: {normalized_weights}")
print(f" 准确率: {accuracy:.4f}")
# 可视化加权投票结果
self.visualize_weighted_voting(results)
return results
def visualize_weighted_voting(self, results):
"""
可视化加权投票结果
"""
plt.figure(figsize=(12, 8))
strategies = list(results.keys())
accuracies = [results[strategy]['accuracy'] for strategy in strategies]
# 准确率比较
plt.subplot(2, 2, 1)
bars = plt.bar(strategies, accuracies, alpha=0.7)
plt.ylabel('测试准确率')
plt.title('不同权重策略的性能')
plt.xticks(rotation=45)
# 添加数值标签
for bar, acc in zip(bars, accuracies):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.002,
f'{acc:.4f}', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
# 权重分布
plt.subplot(2, 2, 2)
classifier_names = ['RF', 'SVM', 'LR']
x = np.arange(len(classifier_names))
width = 0.2
for i, strategy in enumerate(strategies):
weights = results[strategy]['weights']
plt.bar(x + i*width, weights, width, label=strategy, alpha=0.7)
plt.xlabel('基础分类器')
plt.ylabel('权重')
plt.title('不同策略的权重分布')
plt.xticks(x + width*1.5, classifier_names)
plt.legend()
plt.grid(True, alpha=0.3)
# 权重 vs 性能散点图
plt.subplot(2, 2, 3)
for strategy in strategies:
weights = results[strategy]['weights']
accuracy = results[strategy]['accuracy']
# 使用权重的方差作为多样性指标
weight_variance = np.var(weights)
plt.scatter(weight_variance, accuracy, s=100,
label=strategy, alpha=0.7)
plt.xlabel('权重方差 (多样性)')
plt.ylabel('准确率')
plt.title('权重多样性 vs 性能')
plt.legend()
plt.grid(True, alpha=0.3)
# 最佳策略分析
plt.subplot(2, 2, 4)
best_strategy = max(strategies, key=lambda s: results[s]['accuracy'])
best_weights = results[best_strategy]['weights']
plt.pie(best_weights, labels=classifier_names, autopct='%1.2f%%',
startangle=90)
plt.title(f'最佳策略权重分布\n({best_strategy})')
plt.tight_layout()
plt.show()
# 投票方法分析演示
voting_analyzer = VotingAnalyzer()
print("\n投票方法分析:")
print("=" * 40)
# 投票方法演示
voting_results = voting_analyzer.demonstrate_voting_methods()
# 加权投票分析
weighted_results = voting_analyzer.weighted_voting_analysis()
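顺带一提,上面手动实现的加权软投票在sklearn中可以直接用 VotingClassifier 的 weights 参数完成(一个最小示例,权重沿用上文的"Custom"策略,仅作示意):
X_wv, y_wv = make_classification(n_samples=1000, n_features=20,
                                 n_informative=15, random_state=42)
Xtr_wv, Xte_wv, ytr_wv, yte_wv = train_test_split(
    X_wv, y_wv, test_size=0.3, random_state=42
)
weighted_vote = VotingClassifier(
    estimators=[
        ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
        ('svm', SVC(probability=True, random_state=42)),
        ('lr', LogisticRegression(random_state=42, max_iter=1000))
    ],
    voting='soft',
    weights=[0.5, 0.3, 0.2]  # 各分类器预测概率的加权系数
)
weighted_vote.fit(Xtr_wv, ytr_wv)
print(f"加权软投票测试准确率: {weighted_vote.score(Xte_wv, yte_wv):.4f}")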
6.6 堆叠方法 (Stacking)
6.6.1 堆叠原理和实现
class StackingAnalyzer:
"""
堆叠方法分析器
"""
def __init__(self):
self.models = {}
self.results = {}
def demonstrate_stacking(self):
"""
演示堆叠方法
"""
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=15,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 第一层基础学习器
base_learners = [
('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
('svm', SVC(probability=True, random_state=42)),
('nb', GaussianNB()),
('lr', LogisticRegression(random_state=42, max_iter=1000))
]
# 第二层元学习器
meta_learner = LogisticRegression(random_state=42)
# 使用sklearn的StackingClassifier
stacking_clf = StackingClassifier(
estimators=base_learners,
final_estimator=meta_learner,
cv=5, # 交叉验证折数
stack_method='predict_proba' # 使用概率作为特征
)
# 手动实现堆叠过程
print("手动实现堆叠过程:")
print("=" * 30)
# 第一步:生成元特征
meta_features = self.generate_meta_features(
X_train, y_train, base_learners, cv=5
)
print(f"元特征形状: {meta_features.shape}")
print(f"原始特征数: {X_train.shape[1]}")
print(f"元特征数: {meta_features.shape[1]}")
# 第二步:训练元学习器
meta_learner_manual = LogisticRegression(random_state=42)
meta_learner_manual.fit(meta_features, y_train)
# 第三步:在测试集上预测
test_meta_features = self.generate_test_meta_features(
X_train, y_train, X_test, base_learners
)
manual_pred = meta_learner_manual.predict(test_meta_features)
manual_accuracy = np.mean(manual_pred == y_test)
# 比较sklearn实现
stacking_clf.fit(X_train, y_train)
sklearn_pred = stacking_clf.predict(X_test)
sklearn_accuracy = np.mean(sklearn_pred == y_test)
print(f"\n性能比较:")
print(f"手动实现准确率: {manual_accuracy:.4f}")
print(f"sklearn实现准确率: {sklearn_accuracy:.4f}")
# 比较基础学习器性能
base_accuracies = {}
for name, model in base_learners:
model.fit(X_train, y_train)
pred = model.predict(X_test)
accuracy = np.mean(pred == y_test)
base_accuracies[name] = accuracy
print(f"{name}准确率: {accuracy:.4f}")
# 可视化堆叠过程
self.visualize_stacking_process(
meta_features, y_train, base_accuracies,
manual_accuracy, sklearn_accuracy
)
return {
'manual_accuracy': manual_accuracy,
'sklearn_accuracy': sklearn_accuracy,
'base_accuracies': base_accuracies
}
def generate_meta_features(self, X, y, base_learners, cv=5):
"""
生成元特征(交叉验证预测)
"""
from sklearn.model_selection import StratifiedKFold
n_samples = X.shape[0]
n_classes = len(np.unique(y))
meta_features = np.zeros((n_samples, len(base_learners) * n_classes))
skf = StratifiedKFold(n_splits=cv, shuffle=True, random_state=42)
print("生成元特征:")
for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
print(f" 处理第 {fold+1}/{cv} 折...")
X_fold_train, X_fold_val = X[train_idx], X[val_idx]
y_fold_train = y[train_idx]
for i, (name, model) in enumerate(base_learners):
# 训练基础学习器
model_copy = clone(model)
model_copy.fit(X_fold_train, y_fold_train)
# 预测验证集
probs = model_copy.predict_proba(X_fold_val)
# 存储概率作为元特征
start_col = i * n_classes
end_col = (i + 1) * n_classes
meta_features[val_idx, start_col:end_col] = probs
return meta_features
def generate_test_meta_features(self, X_train, y_train, X_test, base_learners):
"""
生成测试集的元特征
"""
n_classes = len(np.unique(y_train))
test_meta_features = np.zeros((X_test.shape[0], len(base_learners) * n_classes))
for i, (name, model) in enumerate(base_learners):
# 在全部训练数据上训练
model_copy = clone(model)
model_copy.fit(X_train, y_train)
# 预测测试集
probs = model_copy.predict_proba(X_test)
# 存储概率作为元特征
start_col = i * n_classes
end_col = (i + 1) * n_classes
test_meta_features[:, start_col:end_col] = probs
return test_meta_features
def visualize_stacking_process(self, meta_features, y_train,
base_accuracies, manual_acc, sklearn_acc):
"""
可视化堆叠过程
"""
plt.figure(figsize=(15, 12))
# 元特征分布
plt.subplot(2, 3, 1)
plt.hist(meta_features.flatten(), bins=50, alpha=0.7, edgecolor='black')
plt.xlabel('元特征值')
plt.ylabel('频次')
plt.title('元特征分布')
plt.grid(True, alpha=0.3)
# 元特征相关性
plt.subplot(2, 3, 2)
correlation_matrix = np.corrcoef(meta_features.T)
im = plt.imshow(correlation_matrix, cmap='coolwarm', vmin=-1, vmax=1)
plt.colorbar(im)
plt.title('元特征相关性矩阵')
# 基础学习器vs堆叠性能
plt.subplot(2, 3, 3)
names = list(base_accuracies.keys()) + ['Manual Stack', 'Sklearn Stack']
accuracies = list(base_accuracies.values()) + [manual_acc, sklearn_acc]
colors = ['lightblue'] * len(base_accuracies) + ['orange', 'red']
bars = plt.bar(names, accuracies, color=colors, alpha=0.7)
plt.ylabel('准确率')
plt.title('基础学习器 vs 堆叠方法')
plt.xticks(rotation=45)
# 添加数值标签
for bar, acc in zip(bars, accuracies):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.005,
f'{acc:.3f}', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
# 元特征的类别分离度
plt.subplot(2, 3, 4)
# 使用PCA降维到2D进行可视化
from sklearn.decomposition import PCA
pca = PCA(n_components=2)
meta_2d = pca.fit_transform(meta_features)
scatter = plt.scatter(meta_2d[:, 0], meta_2d[:, 1], c=y_train,
cmap='viridis', alpha=0.6)
plt.colorbar(scatter)
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%} variance)')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%} variance)')
plt.title('元特征空间中的类别分布')
plt.grid(True, alpha=0.3)
# 性能提升分析
plt.subplot(2, 3, 5)
base_avg = np.mean(list(base_accuracies.values()))
base_max = max(base_accuracies.values())
improvements = {
'vs Average': manual_acc - base_avg,
'vs Best': manual_acc - base_max
}
colors = ['green' if imp > 0 else 'red' for imp in improvements.values()]
bars = plt.bar(improvements.keys(), improvements.values(),
color=colors, alpha=0.7)
plt.axhline(y=0, color='black', linestyle='-', alpha=0.5)
plt.ylabel('性能提升')
plt.title('堆叠方法性能提升')
# 添加数值标签
for bar, imp in zip(bars, improvements.values()):
plt.text(bar.get_x() + bar.get_width()/2,
bar.get_height() + (0.002 if imp > 0 else -0.005),
f'{imp:+.4f}', ha='center',
va='bottom' if imp > 0 else 'top')
plt.grid(True, alpha=0.3)
# 元学习器特征重要性
plt.subplot(2, 3, 6)
# 训练一个简单的元学习器来分析特征重要性
from sklearn.ensemble import RandomForestClassifier
rf_meta = RandomForestClassifier(n_estimators=100, random_state=42)
rf_meta.fit(meta_features, y_train)
# 元特征的列按“学习器 × 类别”排列,与 generate_meta_features 的生成顺序一致
feature_names = []
for name in ['rf', 'svm', 'nb', 'lr']:
feature_names.extend([f'{name}_class0', f'{name}_class1'])
importances = rf_meta.feature_importances_
indices = np.argsort(importances)[::-1]
plt.bar(range(len(importances)), importances[indices], alpha=0.7)
plt.xlabel('元特征索引')
plt.ylabel('重要性')
plt.title('元特征重要性')
plt.xticks(range(len(importances)),
[feature_names[i] for i in indices], rotation=45)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def multi_level_stacking(self):
"""
多层堆叠演示
"""
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=15,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
print("\n多层堆叠演示:")
print("=" * 30)
# 第一层学习器
level1_learners = [
('rf1', RandomForestClassifier(n_estimators=50, max_depth=5, random_state=42)),
('rf2', RandomForestClassifier(n_estimators=100, max_depth=10, random_state=43)),
('svm1', SVC(probability=True, C=0.1, random_state=42)),
('svm2', SVC(probability=True, C=1.0, random_state=42)),
('lr1', LogisticRegression(C=0.1, random_state=42, max_iter=1000)),
('lr2', LogisticRegression(C=1.0, random_state=42, max_iter=1000))
]
# 第二层学习器
level2_learners = [
('rf_meta', RandomForestClassifier(n_estimators=50, random_state=42)),
('svm_meta', SVC(probability=True, random_state=42)),
('lr_meta', LogisticRegression(random_state=42, max_iter=1000))
]
# 第三层学习器(最终元学习器)
final_learner = LogisticRegression(random_state=42)
# 生成第一层元特征
level1_meta_features = self.generate_meta_features(
X_train, y_train, level1_learners, cv=3
)
print(f"第一层元特征形状: {level1_meta_features.shape}")
# 生成第二层元特征
level2_meta_features = self.generate_meta_features(
level1_meta_features, y_train, level2_learners, cv=3
)
print(f"第二层元特征形状: {level2_meta_features.shape}")
# 训练最终元学习器
final_learner.fit(level2_meta_features, y_train)
# 在测试集上预测
# 第一层预测
test_level1_meta = self.generate_test_meta_features(
X_train, y_train, X_test, level1_learners
)
# 第二层预测
test_level2_meta = self.generate_test_meta_features(
level1_meta_features, y_train, test_level1_meta, level2_learners
)
# 最终预测
final_pred = final_learner.predict(test_level2_meta)
final_accuracy = np.mean(final_pred == y_test)
print(f"多层堆叠准确率: {final_accuracy:.4f}")
# 比较单层堆叠
single_stack = StackingClassifier(
estimators=level1_learners[:3], # 使用前3个学习器
final_estimator=LogisticRegression(random_state=42),
cv=3
)
single_stack.fit(X_train, y_train)
single_accuracy = single_stack.score(X_test, y_test)
print(f"单层堆叠准确率: {single_accuracy:.4f}")
# 可视化多层堆叠
self.visualize_multi_level_stacking(
level1_meta_features, level2_meta_features, y_train,
final_accuracy, single_accuracy
)
return {
'multi_level_accuracy': final_accuracy,
'single_level_accuracy': single_accuracy
}
def visualize_multi_level_stacking(self, level1_features, level2_features,
y_train, multi_acc, single_acc):
"""
可视化多层堆叠
"""
plt.figure(figsize=(15, 10))
# 特征维度变化
plt.subplot(2, 3, 1)
dimensions = ['Original', 'Level 1', 'Level 2']
dim_sizes = [20, level1_features.shape[1], level2_features.shape[1]] # 假设原始特征20维
plt.bar(dimensions, dim_sizes, alpha=0.7, color=['blue', 'orange', 'green'])
plt.ylabel('特征维度')
plt.title('多层堆叠特征维度变化')
# 添加数值标签
for i, (dim, size) in enumerate(zip(dimensions, dim_sizes)):
plt.text(i, size + 1, str(size), ha='center', va='bottom')
plt.grid(True, alpha=0.3)
# 第一层特征分布
plt.subplot(2, 3, 2)
plt.hist(level1_features.flatten(), bins=30, alpha=0.7,
label='Level 1', color='orange')
plt.xlabel('特征值')
plt.ylabel('频次')
plt.title('第一层元特征分布')
plt.legend()
plt.grid(True, alpha=0.3)
# 第二层特征分布
plt.subplot(2, 3, 3)
plt.hist(level2_features.flatten(), bins=30, alpha=0.7,
label='Level 2', color='green')
plt.xlabel('特征值')
plt.ylabel('频次')
plt.title('第二层元特征分布')
plt.legend()
plt.grid(True, alpha=0.3)
# 性能比较
plt.subplot(2, 3, 4)
methods = ['Single Level', 'Multi Level']
accuracies = [single_acc, multi_acc]
colors = ['lightblue', 'lightgreen']
bars = plt.bar(methods, accuracies, color=colors, alpha=0.7)
plt.ylabel('准确率')
plt.title('单层 vs 多层堆叠')
# 添加数值标签
for bar, acc in zip(bars, accuracies):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.002,
f'{acc:.4f}', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
# 特征相关性比较
plt.subplot(2, 3, 5)
corr1 = np.corrcoef(level1_features.T)
corr2 = np.corrcoef(level2_features.T)
avg_corr1 = np.mean(np.abs(corr1[np.triu_indices_from(corr1, k=1)]))
avg_corr2 = np.mean(np.abs(corr2[np.triu_indices_from(corr2, k=1)]))
levels = ['Level 1', 'Level 2']
correlations = [avg_corr1, avg_corr2]
bars = plt.bar(levels, correlations, alpha=0.7, color=['orange', 'green'])
plt.ylabel('平均绝对相关性')
plt.title('不同层次特征相关性')
# 添加数值标签
for bar, corr in zip(bars, correlations):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{corr:.3f}', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
# 复杂度分析
plt.subplot(2, 3, 6)
# 模拟训练时间(实际应用中可以测量真实时间)
single_time = 1.0 # 基准时间
multi_time = 2.5 # 多层堆叠通常需要更多时间
methods = ['Single Level', 'Multi Level']
times = [single_time, multi_time]
bars = plt.bar(methods, times, alpha=0.7, color=['lightcoral', 'lightsalmon'])
plt.ylabel('相对训练时间')
plt.title('训练复杂度比较')
# 添加数值标签
for bar, time_val in zip(bars, times):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.05,
f'{time_val:.1f}x', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 堆叠方法分析演示
# 注:clone 与 StackingClassifier 在上面的类方法中被引用,实际项目中这两个导入应置于文件顶部
from sklearn.base import clone
from sklearn.ensemble import StackingClassifier
stacking_analyzer = StackingAnalyzer()
print("\n堆叠方法分析:")
print("=" * 40)
# 基础堆叠演示
stacking_results = stacking_analyzer.demonstrate_stacking()
# 多层堆叠演示
multi_level_results = stacking_analyzer.multi_level_stacking()
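顺带一提,generate_meta_features 中手动按折循环的写法,也可以用 sklearn 的 cross_val_predict 一次性得到袋外概率。下面是一个等价思路的示意实现(折的划分细节,如是否打乱,可能与上面的手动版本略有差异):

from sklearn.base import clone
from sklearn.model_selection import cross_val_predict
import numpy as np

def generate_meta_features_cvp(X, y, base_learners, cv=5):
    # cross_val_predict 返回每个样本的“袋外”概率预测:
    # 每个样本的预测都来自未用该样本训练的模型
    oof_probs = [
        cross_val_predict(clone(model), X, y, cv=cv, method='predict_proba')
        for _, model in base_learners
    ]
    return np.hstack(oof_probs)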
6.7 实际案例:信用评分模型
6.7.1 案例背景和数据准备
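信用评分是集成学习最典型的落地场景之一:金融机构需要根据申请人的年龄、收入、信用历史、负债水平等信息估计违约风险。本案例使用模拟数据完整演示一遍流程:先构造生成机制已知的信用数据集,再系统地比较基础模型、Bagging、Boosting、投票与堆叠等方法,并从 AUC、F1、训练时间等多个维度进行评估。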
class CreditScoringEnsemble:
"""
信用评分集成学习案例
"""
def __init__(self):
self.models = {}
self.results = {}
self.ensemble_results = {}
def create_credit_dataset(self):
"""
创建模拟信用评分数据集
"""
np.random.seed(42)
n_samples = 5000
# 生成特征
# 年龄 (18-80)
age = np.random.normal(40, 12, n_samples)
age = np.clip(age, 18, 80)
# 年收入 (20k-200k)
income = np.random.lognormal(10.5, 0.5, n_samples)
income = np.clip(income, 20000, 200000)
# 信用历史长度 (0-30年)
credit_history = np.random.exponential(8, n_samples)
credit_history = np.clip(credit_history, 0, 30)
# 债务收入比 (0-1)
debt_ratio = np.random.beta(2, 5, n_samples)
# 信用卡数量 (0-10)
num_cards = np.random.poisson(2.5, n_samples)
num_cards = np.clip(num_cards, 0, 10)
# 贷款申请次数 (0-20)
loan_applications = np.random.poisson(1.5, n_samples)
loan_applications = np.clip(loan_applications, 0, 20)
# 就业状态 (0: 失业, 1: 就业)
employment = np.random.binomial(1, 0.85, n_samples)
# 房屋所有权 (0: 租房, 1: 自有)
home_ownership = np.random.binomial(1, 0.65, n_samples)
# 组合特征
X = np.column_stack([
age, income, credit_history, debt_ratio,
num_cards, loan_applications, employment, home_ownership
])
# 生成目标变量(违约概率)
# 用一个预设的逻辑回归模型生成违约概率(系数仅为演示设定:
# 负债比、信用卡数量、申请次数推高违约概率,就业和自有住房降低违约概率)
coefficients = np.array([-0.02, 0.00001, -0.1, 3.0, 0.1, 0.2, -1.5, -0.8])
intercept = 2.0
linear_combination = np.dot(X, coefficients) + intercept
probabilities = 1 / (1 + np.exp(-linear_combination))
# 添加噪声
noise = np.random.normal(0, 0.1, n_samples)
probabilities = np.clip(probabilities + noise, 0, 1)
# 生成二元标签
y = np.random.binomial(1, probabilities, n_samples)
# 特征名称
feature_names = [
'age', 'income', 'credit_history', 'debt_ratio',
'num_cards', 'loan_applications', 'employment', 'home_ownership'
]
print("信用评分数据集统计:")
print("=" * 30)
print(f"样本数量: {n_samples}")
print(f"特征数量: {X.shape[1]}")
print(f"违约率: {np.mean(y):.2%}")
print(f"类别分布: {np.bincount(y)}")
return X, y, feature_names
def comprehensive_ensemble_analysis(self):
"""
综合集成学习分析
"""
# 创建数据集
X, y, feature_names = self.create_credit_dataset()
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# 标准化特征
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
print("\n综合集成学习分析:")
print("=" * 40)
# 定义所有模型
models = {
# 基础模型
'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
'SVM': SVC(probability=True, random_state=42),
'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
# Bagging方法
'Bagging': BaggingClassifier(
base_estimator=DecisionTreeClassifier(random_state=42),  # 注:sklearn 1.2 起该参数更名为 estimator
n_estimators=100, random_state=42
),
'Extra Trees': ExtraTreesClassifier(n_estimators=100, random_state=42),
# Boosting方法
'AdaBoost': AdaBoostClassifier(n_estimators=100, random_state=42),
# 投票方法
'Hard Voting': VotingClassifier(
estimators=[
('lr', LogisticRegression(random_state=42, max_iter=1000)),
('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
('svm', SVC(random_state=42))
],
voting='hard'
),
'Soft Voting': VotingClassifier(
estimators=[
('lr', LogisticRegression(random_state=42, max_iter=1000)),
('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
('svm', SVC(probability=True, random_state=42))
],
voting='soft'
),
# 堆叠方法
'Stacking': StackingClassifier(
estimators=[
('lr', LogisticRegression(random_state=42, max_iter=1000)),
('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
('gb', GradientBoostingClassifier(n_estimators=50, random_state=42))
],
final_estimator=LogisticRegression(random_state=42),
cv=5
)
}
# 评估所有模型
import time  # 用于计时(实际项目中应置于文件顶部)
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score
results = {}
for name, model in models.items():
print(f"\n训练 {name}...")
# 选择合适的数据:对尺度敏感的 SVM 和逻辑回归使用标准化特征;
# 为简化起见,内部含这些学习器的投票/堆叠模型仍使用原始特征
if 'SVM' in name or 'Logistic' in name:
X_train_use = X_train_scaled
X_test_use = X_test_scaled
else:
X_train_use = X_train
X_test_use = X_test
# 交叉验证
cv_scores = cross_val_score(model, X_train_use, y_train,
cv=5, scoring='roc_auc')
# 训练和预测
start_time = time.time()
model.fit(X_train_use, y_train)
training_time = time.time() - start_time
# 预测
y_pred = model.predict(X_test_use)
y_pred_proba = model.predict_proba(X_test_use)[:, 1]
# 计算指标(指标函数已在循环外导入)
results[name] = {
'cv_auc_mean': cv_scores.mean(),
'cv_auc_std': cv_scores.std(),
'test_accuracy': accuracy_score(y_test, y_pred),
'test_precision': precision_score(y_test, y_pred),
'test_recall': recall_score(y_test, y_pred),
'test_f1': f1_score(y_test, y_pred),
'test_auc': roc_auc_score(y_test, y_pred_proba),
'training_time': training_time
}
print(f" CV AUC: {cv_scores.mean():.4f} (±{cv_scores.std():.4f})")
print(f" Test AUC: {results[name]['test_auc']:.4f}")
print(f" Test F1: {results[name]['test_f1']:.4f}")
print(f" 训练时间: {training_time:.2f}秒")
# 可视化结果
self.visualize_comprehensive_results(results)
# 详细分析最佳模型
best_model_name = max(results.keys(), key=lambda k: results[k]['test_auc'])
print(f"\n最佳模型: {best_model_name}")
print(f"AUC: {results[best_model_name]['test_auc']:.4f}")
self.results = results
return results
def visualize_comprehensive_results(self, results):
"""
可视化综合结果
"""
plt.figure(figsize=(20, 15))
model_names = list(results.keys())
# AUC性能比较
plt.subplot(3, 4, 1)
test_aucs = [results[name]['test_auc'] for name in model_names]
cv_aucs = [results[name]['cv_auc_mean'] for name in model_names]
cv_stds = [results[name]['cv_auc_std'] for name in model_names]
x = np.arange(len(model_names))
width = 0.35
plt.bar(x - width/2, cv_aucs, width, yerr=cv_stds,
label='CV AUC', alpha=0.7, capsize=3)
plt.bar(x + width/2, test_aucs, width,
label='Test AUC', alpha=0.7)
plt.xlabel('模型')
plt.ylabel('AUC')
plt.title('AUC性能比较')
plt.xticks(x, model_names, rotation=45, ha='right')
plt.legend()
plt.grid(True, alpha=0.3)
# F1分数比较
plt.subplot(3, 4, 2)
f1_scores = [results[name]['test_f1'] for name in model_names]
bars = plt.bar(model_names, f1_scores, alpha=0.7, color='orange')
plt.ylabel('F1 Score')
plt.title('F1分数比较')
plt.xticks(rotation=45, ha='right')
# 添加数值标签
for bar, score in zip(bars, f1_scores):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{score:.3f}', ha='center', va='bottom', fontsize=8)
plt.grid(True, alpha=0.3)
# 训练时间比较
plt.subplot(3, 4, 3)
training_times = [results[name]['training_time'] for name in model_names]
bars = plt.bar(model_names, training_times, alpha=0.7, color='green')
plt.ylabel('训练时间 (秒)')
plt.title('训练时间比较')
plt.xticks(rotation=45, ha='right')
plt.yscale('log') # 使用对数刻度
plt.grid(True, alpha=0.3)
# 精确率vs召回率
plt.subplot(3, 4, 4)
precisions = [results[name]['test_precision'] for name in model_names]
recalls = [results[name]['test_recall'] for name in model_names]
colors = plt.cm.viridis(np.linspace(0, 1, len(model_names)))
for i, (name, color) in enumerate(zip(model_names, colors)):
plt.scatter(recalls[i], precisions[i], s=100, c=[color],
label=name, alpha=0.7)
plt.xlabel('召回率')
plt.ylabel('精确率')
plt.title('精确率 vs 召回率')
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True, alpha=0.3)
# 性能vs时间散点图
plt.subplot(3, 4, 5)
for i, (name, color) in enumerate(zip(model_names, colors)):
plt.scatter(training_times[i], test_aucs[i], s=100, c=[color],
label=name, alpha=0.7)
plt.xlabel('训练时间 (秒)')
plt.ylabel('Test AUC')
plt.title('性能 vs 训练时间')
plt.xscale('log')
plt.grid(True, alpha=0.3)
# 模型类型分组比较
plt.subplot(3, 4, 6)
model_types = {
'Basic': ['Logistic Regression', 'Random Forest', 'SVM', 'Gradient Boosting'],
'Bagging': ['Bagging', 'Extra Trees'],
'Boosting': ['AdaBoost'],
'Voting': ['Hard Voting', 'Soft Voting'],
'Stacking': ['Stacking']
}
type_aucs = {}
for type_name, models in model_types.items():
type_aucs[type_name] = np.mean([results[model]['test_auc']
for model in models if model in results])
bars = plt.bar(type_aucs.keys(), type_aucs.values(), alpha=0.7)
plt.ylabel('平均 Test AUC')
plt.title('模型类型平均性能')
plt.xticks(rotation=45)
# 添加数值标签
for bar, auc in zip(bars, type_aucs.values()):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.005,
f'{auc:.3f}', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
# 稳定性分析(CV标准差)
plt.subplot(3, 4, 7)
stability_scores = [1 / (1 + results[name]['cv_auc_std']) for name in model_names]
bars = plt.bar(model_names, stability_scores, alpha=0.7, color='purple')
plt.ylabel('稳定性分数')
plt.title('模型稳定性 (1/(1+CV_std))')
plt.xticks(rotation=45, ha='right')
plt.grid(True, alpha=0.3)
# 综合评分
plt.subplot(3, 4, 8)
# 归一化指标
norm_auc = np.array(test_aucs) / max(test_aucs)
norm_time = 1 - (np.array(training_times) / max(training_times))
norm_stability = np.array(stability_scores) / max(stability_scores)
# 综合评分:性能50% + 速度25% + 稳定性25%
composite_scores = 0.5 * norm_auc + 0.25 * norm_time + 0.25 * norm_stability
bars = plt.bar(model_names, composite_scores, alpha=0.7, color='red')
plt.ylabel('综合评分')
plt.title('综合评分 (性能+速度+稳定性)')
plt.xticks(rotation=45, ha='right')
# 添加数值标签
for bar, score in zip(bars, composite_scores):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{score:.3f}', ha='center', va='bottom', fontsize=8)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 信用评分集成学习案例演示
from sklearn.ensemble import ExtraTreesClassifier
credit_ensemble = CreditScoringEnsemble()
print("\n信用评分集成学习案例:")
print("=" * 50)
# 综合集成学习分析
credit_results = credit_ensemble.comprehensive_ensemble_analysis()
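在把模拟数据交给模型之前,也可以先用 pandas 做一次快速的分布检查。下面的示意代码沿用 create_credit_dataset 的返回值:

X, y, feature_names = credit_ensemble.create_credit_dataset()
df = pd.DataFrame(X, columns=feature_names)
df['default'] = y
print(df.describe().T)                # 各特征的均值、标准差、分位数
print(df.groupby('default').mean())   # 违约与非违约样本的特征均值对比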
6.8 集成学习最佳实践
6.8.1 模型选择和组合策略
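掌握了各种集成方法之后,实践中真正的难点在于三个问题:选择哪些基础学习器、采用哪种组合方式、以及如何调参。下面的 EnsembleBestPractices 类依次通过多样性分析、方法选择指南和超参数调优策略来回答这三个问题。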
class EnsembleBestPractices:
"""
集成学习最佳实践指南
"""
def __init__(self):
self.guidelines = {}
self.examples = {}
def diversity_analysis(self):
"""
多样性分析和基础学习器选择
"""
print("集成学习最佳实践:")
print("=" * 40)
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=15,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
print("\n1. 基础学习器多样性分析:")
print("-" * 30)
# 不同类型的基础学习器
diverse_learners = [
('决策树', DecisionTreeClassifier(random_state=42)),
('线性模型', LogisticRegression(random_state=42, max_iter=1000)),
('朴素贝叶斯', GaussianNB()),
('k近邻', KNeighborsClassifier(n_neighbors=5)),
('SVM', SVC(probability=True, random_state=42))
]
# 相似类型的基础学习器
similar_learners = [
('决策树1', DecisionTreeClassifier(max_depth=5, random_state=42)),
('决策树2', DecisionTreeClassifier(max_depth=10, random_state=43)),
('决策树3', DecisionTreeClassifier(max_depth=15, random_state=44)),
('决策树4', DecisionTreeClassifier(max_depth=20, random_state=45)),
('决策树5', DecisionTreeClassifier(max_depth=25, random_state=46))
]
# 比较多样性集成vs相似性集成
diverse_ensemble = VotingClassifier(
estimators=diverse_learners,
voting='soft'
)
similar_ensemble = VotingClassifier(
estimators=similar_learners,
voting='soft'
)
# 评估性能
diverse_scores = cross_val_score(diverse_ensemble, X_train, y_train, cv=5)
similar_scores = cross_val_score(similar_ensemble, X_train, y_train, cv=5)
print(f"多样性集成 CV准确率: {diverse_scores.mean():.4f} (±{diverse_scores.std():.4f})")
print(f"相似性集成 CV准确率: {similar_scores.mean():.4f} (±{similar_scores.std():.4f})")
# 计算预测多样性:单独训练各学习器并比较其测试集预测(无需重新拟合整个集成)
diverse_predictions = self.get_individual_predictions(diverse_learners, X_train, y_train, X_test)
similar_predictions = self.get_individual_predictions(similar_learners, X_train, y_train, X_test)
diverse_disagreement = self.calculate_disagreement(diverse_predictions)
similar_disagreement = self.calculate_disagreement(similar_predictions)
print(f"多样性集成分歧度: {diverse_disagreement:.4f}")
print(f"相似性集成分歧度: {similar_disagreement:.4f}")
# 可视化多样性分析
self.visualize_diversity_analysis(
diverse_scores, similar_scores,
diverse_disagreement, similar_disagreement,
diverse_predictions, similar_predictions
)
return {
'diverse_performance': diverse_scores.mean(),
'similar_performance': similar_scores.mean(),
'diverse_disagreement': diverse_disagreement,
'similar_disagreement': similar_disagreement
}
def get_individual_predictions(self, learners, X_train, y_train, X_test):
"""
获取各个学习器的预测结果
"""
predictions = []
for name, learner in learners:
learner.fit(X_train, y_train)
pred = learner.predict(X_test)
predictions.append(pred)
return np.array(predictions)
def calculate_disagreement(self, predictions):
"""
计算学习器之间的分歧度
"""
n_learners, n_samples = predictions.shape
disagreements = []
for i in range(n_learners):
for j in range(i+1, n_learners):
disagreement = np.mean(predictions[i] != predictions[j])
disagreements.append(disagreement)
return np.mean(disagreements)
def visualize_diversity_analysis(self, diverse_scores, similar_scores,
diverse_disagreement, similar_disagreement,
diverse_predictions, similar_predictions):
"""
可视化多样性分析
"""
plt.figure(figsize=(15, 10))
# 性能比较
plt.subplot(2, 3, 1)
ensemble_types = ['多样性集成', '相似性集成']
performances = [diverse_scores.mean(), similar_scores.mean()]
errors = [diverse_scores.std(), similar_scores.std()]
bars = plt.bar(ensemble_types, performances, yerr=errors,
capsize=5, alpha=0.7, color=['blue', 'orange'])
plt.ylabel('CV准确率')
plt.title('集成性能比较')
# 添加数值标签
for bar, perf in zip(bars, performances):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{perf:.4f}', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
# 分歧度比较
plt.subplot(2, 3, 2)
disagreements = [diverse_disagreement, similar_disagreement]
bars = plt.bar(ensemble_types, disagreements, alpha=0.7,
color=['green', 'red'])
plt.ylabel('分歧度')
plt.title('学习器分歧度比较')
# 添加数值标签
for bar, disagree in zip(bars, disagreements):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{disagree:.4f}', ha='center', va='bottom')
plt.grid(True, alpha=0.3)
# 预测一致性热图(多样性集成)
plt.subplot(2, 3, 3)
diverse_agreement = np.zeros((len(diverse_predictions), len(diverse_predictions)))
for i in range(len(diverse_predictions)):
for j in range(len(diverse_predictions)):
diverse_agreement[i, j] = np.mean(diverse_predictions[i] == diverse_predictions[j])
im1 = plt.imshow(diverse_agreement, cmap='Blues', vmin=0, vmax=1)
plt.colorbar(im1)
plt.title('多样性集成一致性矩阵')
plt.xlabel('学习器')
plt.ylabel('学习器')
# 预测一致性热图(相似性集成)
plt.subplot(2, 3, 4)
similar_agreement = np.zeros((len(similar_predictions), len(similar_predictions)))
for i in range(len(similar_predictions)):
for j in range(len(similar_predictions)):
similar_agreement[i, j] = np.mean(similar_predictions[i] == similar_predictions[j])
im2 = plt.imshow(similar_agreement, cmap='Reds', vmin=0, vmax=1)
plt.colorbar(im2)
plt.title('相似性集成一致性矩阵')
plt.xlabel('学习器')
plt.ylabel('学习器')
# 性能vs多样性散点图
plt.subplot(2, 3, 5)
plt.scatter([diverse_disagreement], [diverse_scores.mean()],
s=100, c='blue', label='多样性集成', alpha=0.7)
plt.scatter([similar_disagreement], [similar_scores.mean()],
s=100, c='orange', label='相似性集成', alpha=0.7)
plt.xlabel('分歧度')
plt.ylabel('CV准确率')
plt.title('性能 vs 多样性')
plt.legend()
plt.grid(True, alpha=0.3)
# 最佳实践建议
plt.subplot(2, 3, 6)
plt.text(0.1, 0.9, '集成学习最佳实践:', fontsize=14, fontweight='bold')
plt.text(0.1, 0.8, '1. 选择不同类型的基础学习器', fontsize=10)
plt.text(0.1, 0.7, '2. 确保学习器之间有足够多样性', fontsize=10)
plt.text(0.1, 0.6, '3. 避免使用过于相似的模型', fontsize=10)
plt.text(0.1, 0.5, '4. 平衡个体性能和多样性', fontsize=10)
plt.text(0.1, 0.4, '5. 考虑计算复杂度和解释性', fontsize=10)
plt.text(0.1, 0.3, '6. 使用交叉验证评估集成性能', fontsize=10)
plt.text(0.1, 0.2, '7. 根据问题特点选择集成方法', fontsize=10)
plt.axis('off')
plt.tight_layout()
plt.show()
def ensemble_selection_guide(self):
"""
集成方法选择指南
"""
print("\n2. 集成方法选择指南:")
print("-" * 30)
selection_guide = {
'Bagging': {
'适用场景': ['高方差模型', '过拟合严重', '数据量充足'],
'优点': ['减少方差', '并行训练', '简单有效'],
'缺点': ['可能增加偏差', '内存消耗大'],
'推荐模型': ['决策树', '神经网络', 'k-NN']
},
'Boosting': {
'适用场景': ['高偏差模型', '欠拟合问题', '需要高精度'],
'优点': ['减少偏差', '提升弱学习器', '理论保证'],
'缺点': ['容易过拟合', '对噪声敏感', '串行训练'],
'推荐模型': ['决策树桩', '线性模型', '简单规则']
},
'Voting': {
'适用场景': ['模型性能相近', '快速集成', '模型解释性'],
'优点': ['简单直观', '计算高效', '易于理解'],
'缺点': ['性能提升有限', '依赖基础模型质量'],
'推荐模型': ['不同类型的强学习器']
},
'Stacking': {
'适用场景': ['复杂问题', '充足计算资源', '追求最优性能'],
'优点': ['性能最优', '充分利用模型', '灵活性高'],
'缺点': ['计算复杂', '容易过拟合', '难以解释'],
'推荐模型': ['多样化的强学习器']
}
}
for method, details in selection_guide.items():
print(f"\n{method}:")
for aspect, items in details.items():
print(f" {aspect}: {', '.join(items)}")
return selection_guide
def hyperparameter_tuning_strategies(self):
"""
集成学习超参数调优策略
"""
print("\n3. 超参数调优策略:")
print("-" * 30)
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=15,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 随机森林超参数调优示例
print("随机森林超参数调优示例:")
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
# 定义参数网格
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [5, 10, 15, None],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4],
'max_features': ['sqrt', 'log2', None]
}
# 网格搜索
rf = RandomForestClassifier(random_state=42)
grid_search = GridSearchCV(
rf, param_grid, cv=3, scoring='accuracy',
n_jobs=-1, verbose=1
)
print("执行网格搜索...")
grid_search.fit(X_train, y_train)
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳CV分数: {grid_search.best_score_:.4f}")
# 随机搜索(更高效)
from scipy.stats import randint, uniform
param_dist = {
'n_estimators': randint(50, 300),
'max_depth': [5, 10, 15, 20, None],
'min_samples_split': randint(2, 20),
'min_samples_leaf': randint(1, 10),
'max_features': ['sqrt', 'log2', None]
}
random_search = RandomizedSearchCV(
rf, param_dist, n_iter=50, cv=3,
scoring='accuracy', n_jobs=-1, random_state=42
)
print("\n执行随机搜索...")
random_search.fit(X_train, y_train)
print(f"最佳参数: {random_search.best_params_}")
print(f"最佳CV分数: {random_search.best_score_:.4f}")
# 比较调优前后性能
rf_default = RandomForestClassifier(random_state=42)
rf_tuned = grid_search.best_estimator_
rf_default.fit(X_train, y_train)
# grid_search 默认 refit=True,best_estimator_ 已在全部训练数据上重训,无需再次 fit
default_score = rf_default.score(X_test, y_test)
tuned_score = rf_tuned.score(X_test, y_test)
print(f"\n性能比较:")
print(f"默认参数准确率: {default_score:.4f}")
print(f"调优后准确率: {tuned_score:.4f}")
print(f"性能提升: {tuned_score - default_score:.4f}")
return {
'grid_search_best': grid_search.best_params_,
'random_search_best': random_search.best_params_,
'performance_improvement': tuned_score - default_score
}
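除了网格搜索和随机搜索,sklearn 还提供逐次减半搜索(Successive Halving):先用较少资源粗筛大量候选组合,再把资源集中到表现好的组合上。下面是一个示意(该 API 在撰写时仍标记为实验性,需要显式启用;假设沿用上面方法中的 param_dist 和训练数据):

# 需显式启用实验性 API
from sklearn.experimental import enable_halving_search_cv  # noqa: F401
from sklearn.model_selection import HalvingRandomSearchCV

halving_search = HalvingRandomSearchCV(
    RandomForestClassifier(random_state=42),
    param_dist,          # 沿用上面定义的参数分布
    factor=3,            # 每轮淘汰约 2/3 的候选组合
    cv=3, random_state=42, n_jobs=-1
)
# halving_search.fit(X_train, y_train)
# print(halving_search.best_params_, halving_search.best_score_)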
# 集成学习最佳实践演示
best_practices = EnsembleBestPractices()
print("\n集成学习最佳实践指南:")
print("=" * 50)
# 多样性分析
diversity_results = best_practices.diversity_analysis()
# 方法选择指南
selection_guide = best_practices.ensemble_selection_guide()
# 超参数调优策略
tuning_results = best_practices.hyperparameter_tuning_strategies()
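补充一点:除了前面 calculate_disagreement 使用的简单分歧度,实践中也常用成对 Cohen's kappa 来度量学习器之间的一致性(kappa 越低表示多样性越高)。下面是一个基于 sklearn 的示意实现,输入格式与 get_individual_predictions 的返回值一致:

from itertools import combinations
from sklearn.metrics import cohen_kappa_score
import numpy as np

def average_pairwise_kappa(predictions):
    # predictions: 形状为 (n_learners, n_samples) 的预测矩阵
    kappas = [
        cohen_kappa_score(predictions[i], predictions[j])
        for i, j in combinations(range(len(predictions)), 2)
    ]
    return np.mean(kappas)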
6.8.2 常见陷阱和解决方案
class EnsembleCommonPitfalls:
"""
集成学习常见陷阱和解决方案
"""
def __init__(self):
self.pitfalls = {}
self.solutions = {}
def demonstrate_overfitting_pitfall(self):
"""
演示过拟合陷阱
"""
print("\n常见陷阱1: 集成过拟合")
print("-" * 30)
# 创建小数据集(容易过拟合)
X, y = make_classification(
n_samples=200, n_features=50, n_informative=10,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 过度复杂的集成(容易过拟合)
complex_ensemble = VotingClassifier(
estimators=[
('rf1', RandomForestClassifier(n_estimators=500, max_depth=None, random_state=42)),
('rf2', RandomForestClassifier(n_estimators=500, max_depth=None, random_state=43)),
('rf3', RandomForestClassifier(n_estimators=500, max_depth=None, random_state=44)),
('rf4', RandomForestClassifier(n_estimators=500, max_depth=None, random_state=45)),
('rf5', RandomForestClassifier(n_estimators=500, max_depth=None, random_state=46))
],
voting='soft'
)
# 简单集成(更好的泛化)
simple_ensemble = VotingClassifier(
estimators=[
('rf', RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)),
('lr', LogisticRegression(random_state=42, max_iter=1000)),
('svm', SVC(probability=True, C=1.0, random_state=42))
],
voting='soft'
)
# 训练和评估
complex_ensemble.fit(X_train, y_train)
simple_ensemble.fit(X_train, y_train)
complex_train_score = complex_ensemble.score(X_train, y_train)
complex_test_score = complex_ensemble.score(X_test, y_test)
simple_train_score = simple_ensemble.score(X_train, y_train)
simple_test_score = simple_ensemble.score(X_test, y_test)
print(f"复杂集成 - 训练准确率: {complex_train_score:.4f}, 测试准确率: {complex_test_score:.4f}")
print(f"简单集成 - 训练准确率: {simple_train_score:.4f}, 测试准确率: {simple_test_score:.4f}")
print(f"复杂集成过拟合程度: {complex_train_score - complex_test_score:.4f}")
print(f"简单集成过拟合程度: {simple_train_score - simple_test_score:.4f}")
return {
'complex_overfitting': complex_train_score - complex_test_score,
'simple_overfitting': simple_train_score - simple_test_score
}
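要更系统地诊断集成是否过拟合,可以用学习曲线观察训练分数与验证分数随样本量的变化。下面是一个示意,假设 X、y 与 simple_ensemble 沿用上面方法中的定义:

from sklearn.model_selection import learning_curve
import numpy as np

train_sizes, train_scores, val_scores = learning_curve(
    simple_ensemble, X, y, cv=5,
    train_sizes=np.linspace(0.2, 1.0, 5)
)
print('训练分数:', train_scores.mean(axis=1))
print('验证分数:', val_scores.mean(axis=1))
# 若训练分数持续远高于验证分数且差距不收敛,即是过拟合的信号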
def demonstrate_data_leakage_pitfall(self):
"""
演示数据泄漏陷阱
"""
print("\n常见陷阱2: 数据泄漏")
print("-" * 30)
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=15,
random_state=42
)
# 错误做法:在分割前进行特征选择
print("错误做法:在分割前进行特征选择")
from sklearn.feature_selection import SelectKBest, f_classif
# 在全部数据上进行特征选择(数据泄漏)
selector = SelectKBest(f_classif, k=10)
X_selected_wrong = selector.fit_transform(X, y)
X_train_wrong, X_test_wrong, y_train, y_test = train_test_split(
X_selected_wrong, y, test_size=0.3, random_state=42
)
rf_wrong = RandomForestClassifier(n_estimators=100, random_state=42)
rf_wrong.fit(X_train_wrong, y_train)
wrong_score = rf_wrong.score(X_test_wrong, y_test)
print(f"错误做法测试准确率: {wrong_score:.4f}")
# 正确做法:在分割后进行特征选择
print("\n正确做法:在分割后进行特征选择")
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 只在训练集上进行特征选择
selector_correct = SelectKBest(f_classif, k=10)
X_train_selected = selector_correct.fit_transform(X_train, y_train)
X_test_selected = selector_correct.transform(X_test)
rf_correct = RandomForestClassifier(n_estimators=100, random_state=42)
rf_correct.fit(X_train_selected, y_train)
correct_score = rf_correct.score(X_test_selected, y_test)
print(f"正确做法测试准确率: {correct_score:.4f}")
print(f"性能差异: {wrong_score - correct_score:.4f}")
return {
'wrong_approach_score': wrong_score,
'correct_approach_score': correct_score,
'performance_difference': wrong_score - correct_score
}
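避免这类泄漏最稳妥的做法,是把特征选择等预处理步骤放进 Pipeline:交叉验证时,每一折的预处理器只会在该折的训练部分上拟合。下面是一个示意(假设 X、y 沿用上面的数据):

from sklearn.pipeline import Pipeline
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.model_selection import cross_val_score

pipe = Pipeline([
    ('select', SelectKBest(f_classif, k=10)),  # 每折只在训练部分上拟合
    ('rf', RandomForestClassifier(n_estimators=100, random_state=42))
])
scores = cross_val_score(pipe, X, y, cv=5)  # 得到无泄漏的性能估计
print(f'Pipeline CV准确率: {scores.mean():.4f}')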
def demonstrate_computational_pitfall(self):
"""
演示计算复杂度陷阱
"""
print("\n常见陷阱3: 计算复杂度过高")
print("-" * 30)
# 创建数据集
X, y = make_classification(
n_samples=1000, n_features=20, n_informative=15,
random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 计算密集型集成
intensive_ensemble = VotingClassifier(
estimators=[
('rf1', RandomForestClassifier(n_estimators=1000, random_state=42)),
('rf2', RandomForestClassifier(n_estimators=1000, random_state=43)),
('rf3', RandomForestClassifier(n_estimators=1000, random_state=44)),
('svm1', SVC(probability=True, gamma='scale', random_state=42)),
('svm2', SVC(probability=True, gamma='scale', random_state=43))
],
voting='soft'
)
# 高效集成
efficient_ensemble = VotingClassifier(
estimators=[
('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
('lr', LogisticRegression(random_state=42, max_iter=1000)),
('nb', GaussianNB())
],
voting='soft'
)
# 测量训练时间
import time
start_time = time.time()
intensive_ensemble.fit(X_train, y_train)
intensive_time = time.time() - start_time
intensive_score = intensive_ensemble.score(X_test, y_test)
start_time = time.time()
efficient_ensemble.fit(X_train, y_train)
efficient_time = time.time() - start_time
efficient_score = efficient_ensemble.score(X_test, y_test)
print(f"计算密集型集成 - 训练时间: {intensive_time:.2f}秒, 准确率: {intensive_score:.4f}")
print(f"高效集成 - 训练时间: {efficient_time:.2f}秒, 准确率: {efficient_score:.4f}")
print(f"时间效率提升: {intensive_time / efficient_time:.1f}倍")
print(f"性能差异: {intensive_score - efficient_score:.4f}")
return {
'intensive_time': intensive_time,
'efficient_time': efficient_time,
'time_ratio': intensive_time / efficient_time,
'performance_difference': intensive_score - efficient_score
}
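控制计算成本还有一个常被忽略的简单手段:并行化。VotingClassifier 和 RandomForestClassifier 等都提供 n_jobs 参数,下面是一个示意配置:

parallel_ensemble = VotingClassifier(
    estimators=[
        ('rf', RandomForestClassifier(n_estimators=100, n_jobs=-1, random_state=42)),
        ('lr', LogisticRegression(random_state=42, max_iter=1000)),
        ('nb', GaussianNB())
    ],
    voting='soft',
    n_jobs=-1  # 并行训练各基础学习器
)
# parallel_ensemble.fit(X_train, y_train)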
# 常见陷阱演示
pitfalls_demo = EnsembleCommonPitfalls()
print("\n集成学习常见陷阱和解决方案:")
print("=" * 50)
# 过拟合陷阱
overfitting_results = pitfalls_demo.demonstrate_overfitting_pitfall()
# 数据泄漏陷阱
leakage_results = pitfalls_demo.demonstrate_data_leakage_pitfall()
# 计算复杂度陷阱
computational_results = pitfalls_demo.demonstrate_computational_pitfall()
6.9 本章小结
通过本章的学习,我们深入了解了集成学习的核心概念、主要方法和实际应用。让我们回顾一下本章的主要内容:
6.9.1 核心概念回顾
集成学习基础
- 集成学习通过组合多个学习器来提升预测性能
- 偏差-方差分解理论解释了集成学习的有效性
- 多样性是集成学习成功的关键因素
主要集成方法
- Bagging: 通过Bootstrap采样减少方差,适用于高方差模型
- Boosting: 通过序列训练减少偏差,适用于高偏差模型
- Voting: 通过投票机制组合预测,简单有效
- Stacking: 通过元学习器学习组合策略,性能最优
具体算法实现
- 随机森林:Bagging + 特征随机性
- AdaBoost:自适应权重调整
- 梯度提升:梯度下降优化
- 投票分类器:硬投票和软投票
- 堆叠分类器:多层学习架构
6.9.2 实践技能总结
模型选择策略
- 根据问题特点选择合适的集成方法
- 平衡模型复杂度和性能提升
- 考虑计算资源和时间约束
超参数优化
- 使用网格搜索和随机搜索
- 交叉验证评估集成性能
- 避免过拟合和数据泄漏
性能评估
- 多指标综合评估
- 稳定性和鲁棒性分析
- 计算效率考量
6.9.3 最佳实践指南
基础学习器选择
- 选择不同类型的学习器增加多样性
- 确保个体学习器有一定的预测能力
- 避免使用过于相似的模型
避免常见陷阱
- 防止集成过拟合
- 避免数据泄漏
- 控制计算复杂度
实际应用建议
- 从简单方法开始(如Voting)
- 逐步尝试复杂方法(如Stacking)
- 根据业务需求平衡性能和解释性
6.9.4 下一章预告
在下一章中,我们将学习深度学习基础,包括:
- 神经网络基本原理
- 反向传播算法
- 深度学习框架使用
- 卷积神经网络和循环神经网络
- 深度学习在实际问题中的应用
6.9.5 练习题
理论题
- 解释偏差-方差分解如何指导集成学习方法的选择
- 比较Bagging和Boosting的优缺点和适用场景
- 分析为什么多样性对集成学习如此重要
编程题
- 实现一个简单的Bagging分类器,并与sklearn版本比较
- 使用不同的集成方法解决一个多分类问题
- 设计实验比较不同集成方法在不平衡数据集上的性能
项目题
- 选择一个实际数据集,设计完整的集成学习解决方案
- 包括数据预处理、模型选择、超参数优化和性能评估
- 分析不同集成方法的性能差异和计算效率
思考题
- 在什么情况下集成学习可能不会带来性能提升?
- 如何在保持高性能的同时提高集成模型的解释性?
- 讨论集成学习在大数据和实时预测场景中的挑战和解决方案
通过本章的学习,你应该能够:
- 理解集成学习的理论基础和工作原理
- 熟练使用各种集成学习方法
- 根据问题特点选择合适的集成策略
- 避免常见的陷阱和错误
- 在实际项目中应用集成学习技术
集成学习是机器学习中最实用和有效的技术之一,掌握这些方法将大大提升你解决实际问题的能力。在下一章中,我们将进入深度学习的世界,探索更加强大的学习算法。