5.1 模型评估概述
模型评估是机器学习项目中的关键环节:它帮助我们了解模型的真实性能、在候选模型之间做出选择、指导超参数调优,并衡量模型的泛化能力。
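在进入正式内容之前,先看一个最小的留出法(hold-out)评估示意:模型只有在未参与训练的数据上的表现,才能反映其泛化能力。以下代码仅作示意,数据与参数均为任意选取:

# 最小示意: 留出法评估的基本流程
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

# 构造一份演示数据,留出30%作为测试集
X, y = make_classification(n_samples=300, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)
clf = LogisticRegression(max_iter=1000).fit(X_train, y_train)
# 只用未参与训练的数据衡量泛化能力
print(f"测试集准确率: {clf.score(X_test, y_test):.3f}")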
5.1.1 评估的重要性
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_classification, make_regression, load_breast_cancer  # load_boston已从新版sklearn中移除,且本章未用到
from sklearn.model_selection import train_test_split, cross_val_score, validation_curve, learning_curve
from sklearn.linear_model import LogisticRegression, LinearRegression, Ridge, Lasso
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor, GradientBoostingClassifier
from sklearn.svm import SVC, SVR
from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score, roc_auc_score,
confusion_matrix, classification_report, roc_curve, precision_recall_curve,
mean_squared_error, mean_absolute_error, r2_score
)
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.pipeline import Pipeline
import time  # 5.6节统计搜索耗时会用到
import warnings
warnings.filterwarnings('ignore')  # 仅为保持演示输出整洁,实际项目中不建议全局屏蔽警告
class ModelEvaluationDemo:
"""
模型评估演示类
"""
def __init__(self):
self.results = {}
def demonstrate_evaluation_importance(self):
"""
演示模型评估的重要性
"""
# 创建分类数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, n_clusters_per_class=1, random_state=42)
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 不同复杂度的模型
models = {
'简单模型': LogisticRegression(random_state=42),
'中等复杂度': DecisionTreeClassifier(max_depth=10, random_state=42),
'复杂模型': DecisionTreeClassifier(random_state=42), # 无深度限制
'过拟合模型': DecisionTreeClassifier(min_samples_split=2, min_samples_leaf=1, random_state=42)  # 显式写出的取值正是sklearn默认值,与上一行等价,仅作对照
}
results = []
for name, model in models.items():
# 训练模型
model.fit(X_train, y_train)
# 计算训练和测试准确率
train_acc = model.score(X_train, y_train)
test_acc = model.score(X_test, y_test)
results.append({
'模型': name,
'训练准确率': train_acc,
'测试准确率': test_acc,
'差异': train_acc - test_acc
})
# 可视化结果
results_df = pd.DataFrame(results)
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# 训练vs测试准确率
x = np.arange(len(results_df))
width = 0.35
axes[0].bar(x - width/2, results_df['训练准确率'], width, label='训练准确率', alpha=0.8)
axes[0].bar(x + width/2, results_df['测试准确率'], width, label='测试准确率', alpha=0.8)
axes[0].set_xlabel('模型')
axes[0].set_ylabel('准确率')
axes[0].set_title('训练 vs 测试准确率')
axes[0].set_xticks(x)
axes[0].set_xticklabels(results_df['模型'], rotation=45)
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 过拟合程度
axes[1].bar(results_df['模型'], results_df['差异'], alpha=0.8, color='red')
axes[1].set_xlabel('模型')
axes[1].set_ylabel('过拟合程度 (训练-测试)')
axes[1].set_title('模型过拟合程度')
axes[1].tick_params(axis='x', rotation=45)
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("模型评估结果:")
print(results_df.round(4))
return results_df
def demonstrate_train_test_split_importance(self):
"""
演示训练测试分割的重要性
"""
# 创建数据
X, y = make_classification(n_samples=500, n_features=10, n_informative=5,
n_redundant=5, random_state=42)
# 不同的测试集比例
test_sizes = [0.1, 0.2, 0.3, 0.4, 0.5]
results = []
for test_size in test_sizes:
scores = []
# 多次随机分割
for random_state in range(10):
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state
)
model = LogisticRegression(random_state=42)
model.fit(X_train, y_train)
score = model.score(X_test, y_test)
scores.append(score)
results.append({
'测试集比例': test_size,
'平均准确率': np.mean(scores),
'标准差': np.std(scores),
'训练集大小': int(len(X) * (1 - test_size)),
'测试集大小': int(len(X) * test_size)
})
results_df = pd.DataFrame(results)
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# 准确率 vs 测试集比例
axes[0].errorbar(results_df['测试集比例'], results_df['平均准确率'],
yerr=results_df['标准差'], marker='o', capsize=5)
axes[0].set_xlabel('测试集比例')
axes[0].set_ylabel('平均准确率')
axes[0].set_title('测试集比例对评估的影响')
axes[0].grid(True, alpha=0.3)
# 数据集大小
axes[1].plot(results_df['测试集比例'], results_df['训练集大小'],
marker='o', label='训练集大小')
axes[1].plot(results_df['测试集比例'], results_df['测试集大小'],
marker='s', label='测试集大小')
axes[1].set_xlabel('测试集比例')
axes[1].set_ylabel('样本数量')
axes[1].set_title('数据集大小变化')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("测试集比例影响分析:")
print(results_df.round(4))
return results_df
# 演示模型评估的重要性
evaluation_demo = ModelEvaluationDemo()
print("模型评估重要性演示:")
print("=" * 30)
# 演示评估重要性
eval_results = evaluation_demo.demonstrate_evaluation_importance()
# 演示训练测试分割的重要性
split_results = evaluation_demo.demonstrate_train_test_split_importance()
5.2 分类模型评估指标
5.2.1 基础评估指标
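在阅读下面的演示代码之前,先回顾基础指标与混淆矩阵四个计数(TP真正例、FP假正例、FN假反例、TN真反例)的关系:准确率 = (TP+TN)/(TP+FP+FN+TN),精确率 = TP/(TP+FP),召回率 = TP/(TP+FN),F1 = 2×精确率×召回率/(精确率+召回率)。下面用一组虚构的计数手工验证这些公式,便于与后文sklearn的输出对照:

# 用虚构的混淆矩阵计数手工验证各指标(数值仅作演示)
tp, fp, fn, tn = 80, 10, 20, 90
precision = tp / (tp + fp)  # 预测为正的样本中,真正为正的比例
recall = tp / (tp + fn)     # 真实为正的样本中,被成功找回的比例
f1 = 2 * precision * recall / (precision + recall)
accuracy = (tp + tn) / (tp + fp + fn + tn)
print(f"precision={precision:.3f}, recall={recall:.3f}, f1={f1:.3f}, accuracy={accuracy:.3f}")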
class ClassificationMetrics:
"""
分类模型评估指标分析
"""
def __init__(self):
self.metrics_results = {}
def demonstrate_basic_metrics(self):
"""
演示基础分类指标
"""
# 加载乳腺癌数据集
data = load_breast_cancer()
X, y = data.data, data.target
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 训练不同模型
models = {
'Logistic Regression': LogisticRegression(random_state=42),
'Decision Tree': DecisionTreeClassifier(random_state=42),
'Random Forest': RandomForestClassifier(random_state=42),
'SVM': SVC(probability=True, random_state=42)
}
results = []
predictions = {}
for name, model in models.items():
# 训练模型
model.fit(X_train_scaled, y_train)
# 预测
y_pred = model.predict(X_test_scaled)
y_pred_proba = model.predict_proba(X_test_scaled)[:, 1]
predictions[name] = {
'y_pred': y_pred,
'y_pred_proba': y_pred_proba
}
# 计算指标
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
auc = roc_auc_score(y_test, y_pred_proba)
results.append({
'模型': name,
'准确率': accuracy,
'精确率': precision,
'召回率': recall,
'F1分数': f1,
'AUC': auc
})
# 结果可视化
results_df = pd.DataFrame(results)
# 指标比较热力图
metrics_data = results_df.set_index('模型')[['准确率', '精确率', '召回率', 'F1分数', 'AUC']]
plt.figure(figsize=(10, 6))
sns.heatmap(metrics_data.T, annot=True, fmt='.3f', cmap='viridis',
cbar_kws={'label': '分数'})
plt.title('分类模型评估指标比较')
plt.xlabel('模型')
plt.ylabel('评估指标')
plt.tight_layout()
plt.show()
print("分类模型评估结果:")
print(results_df.round(4))
return results_df, predictions, y_test
def analyze_confusion_matrices(self, predictions, y_test):
"""
分析混淆矩阵
"""
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.ravel()
for i, (name, pred_data) in enumerate(predictions.items()):
y_pred = pred_data['y_pred']
# 计算混淆矩阵
cm = confusion_matrix(y_test, y_pred)
# 绘制混淆矩阵
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[i],
xticklabels=['恶性', '良性'], yticklabels=['恶性', '良性'])
axes[i].set_title(f'{name} 混淆矩阵')
axes[i].set_xlabel('预测标签')
axes[i].set_ylabel('真实标签')
# 计算各类指标
tn, fp, fn, tp = cm.ravel()
# 添加文本说明
axes[i].text(0.02, 0.98, f'TN: {tn}\nFP: {fp}\nFN: {fn}\nTP: {tp}',
transform=axes[i].transAxes, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
plt.tight_layout()
plt.show()
# 详细分析每个模型的混淆矩阵
for name, pred_data in predictions.items():
y_pred = pred_data['y_pred']
print(f"\n{name} 详细分析:")
print("=" * 20)
print(classification_report(y_test, y_pred, target_names=['恶性', '良性']))
def plot_roc_curves(self, predictions, y_test):
"""
绘制ROC曲线
"""
plt.figure(figsize=(10, 8))
for name, pred_data in predictions.items():
y_pred_proba = pred_data['y_pred_proba']
# 计算ROC曲线
fpr, tpr, _ = roc_curve(y_test, y_pred_proba)
auc = roc_auc_score(y_test, y_pred_proba)
plt.plot(fpr, tpr, label=f'{name} (AUC = {auc:.3f})', linewidth=2)
# 绘制对角线
plt.plot([0, 1], [0, 1], 'k--', alpha=0.5, label='随机分类器')
plt.xlabel('假正率 (FPR)')
plt.ylabel('真正率 (TPR)')
plt.title('ROC曲线比较')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
def plot_precision_recall_curves(self, predictions, y_test):
"""
绘制精确率-召回率曲线
"""
plt.figure(figsize=(10, 8))
for name, pred_data in predictions.items():
y_pred_proba = pred_data['y_pred_proba']
# 计算PR曲线
precision, recall, _ = precision_recall_curve(y_test, y_pred_proba)
plt.plot(recall, precision, label=f'{name}', linewidth=2)
plt.xlabel('召回率 (Recall)')
plt.ylabel('精确率 (Precision)')
plt.title('精确率-召回率曲线比较')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
def analyze_threshold_effects(self, predictions, y_test):
"""
分析阈值对分类结果的影响
"""
# 选择一个模型进行分析
model_name = 'Logistic Regression'
y_pred_proba = predictions[model_name]['y_pred_proba']
# 不同阈值
thresholds = np.arange(0.1, 1.0, 0.05)
metrics_by_threshold = []
for threshold in thresholds:
y_pred_thresh = (y_pred_proba >= threshold).astype(int)
accuracy = accuracy_score(y_test, y_pred_thresh)
precision = precision_score(y_test, y_pred_thresh, zero_division=0)
recall = recall_score(y_test, y_pred_thresh, zero_division=0)
f1 = f1_score(y_test, y_pred_thresh, zero_division=0)
metrics_by_threshold.append({
'阈值': threshold,
'准确率': accuracy,
'精确率': precision,
'召回率': recall,
'F1分数': f1
})
metrics_df = pd.DataFrame(metrics_by_threshold)
# 可视化
plt.figure(figsize=(12, 8))
plt.plot(metrics_df['阈值'], metrics_df['准确率'], 'o-', label='准确率')
plt.plot(metrics_df['阈值'], metrics_df['精确率'], 's-', label='精确率')
plt.plot(metrics_df['阈值'], metrics_df['召回率'], '^-', label='召回率')
plt.plot(metrics_df['阈值'], metrics_df['F1分数'], 'd-', label='F1分数')
plt.xlabel('分类阈值')
plt.ylabel('指标值')
plt.title(f'{model_name} - 阈值对评估指标的影响')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
# 找到最佳F1分数对应的阈值
best_f1_idx = metrics_df['F1分数'].idxmax()
best_threshold = metrics_df.loc[best_f1_idx, '阈值']
best_f1 = metrics_df.loc[best_f1_idx, 'F1分数']
print(f"最佳F1分数阈值: {best_threshold:.3f}")
print(f"最佳F1分数: {best_f1:.3f}")
return metrics_df
# 分类指标演示
classification_metrics = ClassificationMetrics()
print("\n分类模型评估指标演示:")
print("=" * 30)
# 基础指标演示
metrics_results, predictions, y_test = classification_metrics.demonstrate_basic_metrics()
# 混淆矩阵分析
classification_metrics.analyze_confusion_matrices(predictions, y_test)
# ROC曲线
classification_metrics.plot_roc_curves(predictions, y_test)
# PR曲线
classification_metrics.plot_precision_recall_curves(predictions, y_test)
# 阈值分析
threshold_analysis = classification_metrics.analyze_threshold_effects(predictions, y_test)
5.2.2 多分类评估
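三种平均方式的含义:macro对各类别的指标做简单平均,每个类别权重相同;micro先汇总所有类别的TP/FP/FN再整体计算,因而受大类别主导(在单标签多分类下micro-F1等于准确率);weighted按各类别样本数加权平均。下面用一个刻意不平衡的小例子展示三者的差异(数据随机构造,仅作演示):

# macro/micro/weighted 在不平衡数据上的差异(随机构造的小例子)
import numpy as np
from sklearn.metrics import f1_score

rng = np.random.RandomState(0)
y_true = np.array([0] * 90 + [1] * 9 + [2] * 1)
y_pred = y_true.copy()
y_pred[rng.choice(100, 10, replace=False)] = rng.randint(0, 3, 10)  # 注入少量随机错误
for avg in ('macro', 'micro', 'weighted'):
    print(avg, round(f1_score(y_true, y_pred, average=avg, zero_division=0), 3))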
class MultiClassEvaluation:
"""
多分类模型评估
"""
def __init__(self):
self.results = {}
def demonstrate_multiclass_metrics(self):
"""
演示多分类评估指标
"""
# 创建多分类数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, n_classes=4, n_clusters_per_class=1,
random_state=42)
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 训练模型
models = {
'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
'Random Forest': RandomForestClassifier(random_state=42),
'SVM': SVC(random_state=42)
}
results = []
predictions = {}
for name, model in models.items():
model.fit(X_train_scaled, y_train)
y_pred = model.predict(X_test_scaled)
predictions[name] = y_pred
# 计算不同平均方式的指标
accuracy = accuracy_score(y_test, y_pred)
precision_macro = precision_score(y_test, y_pred, average='macro')
precision_micro = precision_score(y_test, y_pred, average='micro')
precision_weighted = precision_score(y_test, y_pred, average='weighted')
recall_macro = recall_score(y_test, y_pred, average='macro')
recall_micro = recall_score(y_test, y_pred, average='micro')
recall_weighted = recall_score(y_test, y_pred, average='weighted')
f1_macro = f1_score(y_test, y_pred, average='macro')
f1_micro = f1_score(y_test, y_pred, average='micro')
f1_weighted = f1_score(y_test, y_pred, average='weighted')
results.append({
'模型': name,
'准确率': accuracy,
'精确率(macro)': precision_macro,
'精确率(micro)': precision_micro,
'精确率(weighted)': precision_weighted,
'召回率(macro)': recall_macro,
'召回率(micro)': recall_micro,
'召回率(weighted)': recall_weighted,
'F1(macro)': f1_macro,
'F1(micro)': f1_micro,
'F1(weighted)': f1_weighted
})
results_df = pd.DataFrame(results)
# 可视化不同平均方式的比较
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
metrics = ['精确率', '召回率', 'F1']
averages = ['macro', 'micro', 'weighted']
for i, metric in enumerate(metrics):
metric_data = []
for avg in averages:
col_name = f'{metric}({avg})'
metric_data.append(results_df[col_name].values)
x = np.arange(len(results_df))
width = 0.25
for j, (avg, data) in enumerate(zip(averages, metric_data)):
axes[i].bar(x + j*width, data, width, label=avg, alpha=0.8)
axes[i].set_xlabel('模型')
axes[i].set_ylabel(f'{metric}分数')
axes[i].set_title(f'{metric}不同平均方式比较')
axes[i].set_xticks(x + width)
axes[i].set_xticklabels(results_df['模型'], rotation=45)
axes[i].legend()
axes[i].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("多分类评估结果:")
print(results_df.round(4))
return results_df, predictions, y_test
def plot_multiclass_confusion_matrices(self, predictions, y_test):
"""
绘制多分类混淆矩阵
"""
n_models = len(predictions)
fig, axes = plt.subplots(1, n_models, figsize=(5*n_models, 4))
if n_models == 1:
axes = [axes]
for i, (name, y_pred) in enumerate(predictions.items()):
cm = confusion_matrix(y_test, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[i])
axes[i].set_title(f'{name} 混淆矩阵')
axes[i].set_xlabel('预测标签')
axes[i].set_ylabel('真实标签')
plt.tight_layout()
plt.show()
def analyze_per_class_performance(self, predictions, y_test):
"""
分析每个类别的性能
"""
# 选择一个模型进行详细分析
model_name = list(predictions.keys())[0]
y_pred = predictions[model_name]
# 计算每个类别的指标
precision_per_class = precision_score(y_test, y_pred, average=None)
recall_per_class = recall_score(y_test, y_pred, average=None)
f1_per_class = f1_score(y_test, y_pred, average=None)
# 每个类别的样本数量
class_counts = np.bincount(y_test)
# 创建DataFrame
per_class_df = pd.DataFrame({
'类别': range(len(precision_per_class)),
'样本数量': class_counts,
'精确率': precision_per_class,
'召回率': recall_per_class,
'F1分数': f1_per_class
})
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# 每个类别的指标
x = per_class_df['类别']
width = 0.25
axes[0, 0].bar(x - width, per_class_df['精确率'], width, label='精确率', alpha=0.8)
axes[0, 0].bar(x, per_class_df['召回率'], width, label='召回率', alpha=0.8)
axes[0, 0].bar(x + width, per_class_df['F1分数'], width, label='F1分数', alpha=0.8)
axes[0, 0].set_xlabel('类别')
axes[0, 0].set_ylabel('分数')
axes[0, 0].set_title('每个类别的性能指标')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 样本数量分布
axes[0, 1].bar(per_class_df['类别'], per_class_df['样本数量'], alpha=0.8)
axes[0, 1].set_xlabel('类别')
axes[0, 1].set_ylabel('样本数量')
axes[0, 1].set_title('每个类别的样本数量')
axes[0, 1].grid(True, alpha=0.3)
# 性能vs样本数量的关系
axes[1, 0].scatter(per_class_df['样本数量'], per_class_df['F1分数'], alpha=0.8)
axes[1, 0].set_xlabel('样本数量')
axes[1, 0].set_ylabel('F1分数')
axes[1, 0].set_title('样本数量 vs F1分数')
axes[1, 0].grid(True, alpha=0.3)
# 精确率vs召回率
axes[1, 1].scatter(per_class_df['召回率'], per_class_df['精确率'], alpha=0.8)
for i, txt in enumerate(per_class_df['类别']):
axes[1, 1].annotate(f'类别{txt}',
(per_class_df['召回率'].iloc[i], per_class_df['精确率'].iloc[i]))
axes[1, 1].set_xlabel('召回率')
axes[1, 1].set_ylabel('精确率')
axes[1, 1].set_title('精确率 vs 召回率')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print(f"\n{model_name} 每个类别的详细性能:")
print(per_class_df.round(4))
return per_class_df
# 多分类评估演示
multiclass_eval = MultiClassEvaluation()
print("\n多分类模型评估演示:")
print("=" * 30)
# 多分类指标演示
multiclass_results, multiclass_predictions, y_test_multi = multiclass_eval.demonstrate_multiclass_metrics()
# 多分类混淆矩阵
multiclass_eval.plot_multiclass_confusion_matrices(multiclass_predictions, y_test_multi)
# 每个类别的性能分析
per_class_results = multiclass_eval.analyze_per_class_performance(multiclass_predictions, y_test_multi)
5.3 回归模型评估指标
5.3.1 基础回归指标
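基础回归指标的定义如下:MSE = (1/n)Σ(yᵢ − ŷᵢ)²;RMSE = √MSE,与目标同量纲;MAE = (1/n)Σ|yᵢ − ŷᵢ|,对离群点更稳健;R² = 1 − Σ(yᵢ − ŷᵢ)² / Σ(yᵢ − ȳ)²,即相对于"恒预测均值"基线的改进程度。下面用一组玩具数据手工验证公式与sklearn实现一致:

# 手工公式与sklearn结果对照(玩具数据,仅作演示)
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

y_true = np.array([3.0, 5.0, 2.0, 7.0])
y_pred = np.array([2.5, 5.0, 3.0, 6.0])
mse = np.mean((y_true - y_pred) ** 2)
r2 = 1 - np.sum((y_true - y_pred) ** 2) / np.sum((y_true - y_true.mean()) ** 2)
assert np.isclose(mse, mean_squared_error(y_true, y_pred))
assert np.isclose(r2, r2_score(y_true, y_pred))
print(f"MSE={mse:.4f}, RMSE={np.sqrt(mse):.4f}, "
      f"MAE={mean_absolute_error(y_true, y_pred):.4f}, R²={r2:.4f}")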
class RegressionMetrics:
"""
回归模型评估指标分析
"""
def __init__(self):
self.results = {}
def demonstrate_regression_metrics(self):
"""
演示回归评估指标
"""
# 创建回归数据集
X, y = make_regression(n_samples=1000, n_features=10, noise=10, random_state=42)
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 训练不同模型
models = {
'Linear Regression': LinearRegression(),
'Ridge Regression': Ridge(alpha=1.0),
'Lasso Regression': Lasso(alpha=1.0),
'Decision Tree': DecisionTreeRegressor(random_state=42),
'Random Forest': RandomForestRegressor(random_state=42),
'SVR': SVR()
}
results = []
predictions = {}
for name, model in models.items():
# 训练模型
model.fit(X_train_scaled, y_train)
# 预测
y_pred = model.predict(X_test_scaled)
predictions[name] = y_pred
# 计算指标
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
# 计算MAPE (Mean Absolute Percentage Error)
# 注意: make_regression 的目标值围绕0分布,真实值接近0时MAPE会被严重放大,此处数值仅用于模型间的相对比较
mape = np.mean(np.abs((y_test - y_pred) / y_test)) * 100
results.append({
'模型': name,
'MSE': mse,
'RMSE': rmse,
'MAE': mae,
'R²': r2,
'MAPE(%)': mape
})
results_df = pd.DataFrame(results)
# 可视化结果
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.ravel()
metrics = ['MSE', 'RMSE', 'MAE', 'R²', 'MAPE(%)']
for i, metric in enumerate(metrics):
if i < len(axes):
axes[i].bar(results_df['模型'], results_df[metric], alpha=0.8)
axes[i].set_title(f'{metric} 比较')
axes[i].set_ylabel(metric)
axes[i].tick_params(axis='x', rotation=45)
axes[i].grid(True, alpha=0.3)
# 最后一个子图显示所有指标的标准化比较
# 标准化指标(除了R²)
normalized_data = results_df.copy()
for metric in ['MSE', 'RMSE', 'MAE', 'MAPE(%)']:
normalized_data[metric] = (normalized_data[metric] - normalized_data[metric].min()) / \
(normalized_data[metric].max() - normalized_data[metric].min())
# R²本身越大越好且量级已可比,这里不做min-max标准化,也不绘入下方比较图
# 绘制雷达图风格的比较
x = np.arange(len(results_df))
width = 0.15
for i, metric in enumerate(['MSE', 'RMSE', 'MAE', 'MAPE(%)']):
axes[5].bar(x + i*width, normalized_data[metric], width,
label=metric, alpha=0.8)
axes[5].set_xlabel('模型')
axes[5].set_ylabel('标准化分数')
axes[5].set_title('标准化指标比较')
axes[5].set_xticks(x + width * 1.5)
axes[5].set_xticklabels(results_df['模型'], rotation=45)
axes[5].legend()
axes[5].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("回归模型评估结果:")
print(results_df.round(4))
return results_df, predictions, y_test
def analyze_prediction_quality(self, predictions, y_test):
"""
分析预测质量
"""
n_models = len(predictions)
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.ravel()
for i, (name, y_pred) in enumerate(predictions.items()):
if i >= len(axes):
break
# 真实值vs预测值散点图
axes[i].scatter(y_test, y_pred, alpha=0.6)
# 绘制完美预测线
min_val = min(y_test.min(), y_pred.min())
max_val = max(y_test.max(), y_pred.max())
axes[i].plot([min_val, max_val], [min_val, max_val], 'r--', alpha=0.8)
axes[i].set_xlabel('真实值')
axes[i].set_ylabel('预测值')
axes[i].set_title(f'{name}\n真实值 vs 预测值')
axes[i].grid(True, alpha=0.3)
# 计算并显示R²
r2 = r2_score(y_test, y_pred)
axes[i].text(0.05, 0.95, f'R² = {r2:.3f}',
transform=axes[i].transAxes,
bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
plt.tight_layout()
plt.show()
def analyze_residuals(self, predictions, y_test):
"""
分析残差
"""
n_models = len(predictions)
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.ravel()
for i, (name, y_pred) in enumerate(predictions.items()):
if i >= len(axes):
break
# 计算残差
residuals = y_test - y_pred
# 残差vs预测值
axes[i].scatter(y_pred, residuals, alpha=0.6)
axes[i].axhline(y=0, color='r', linestyle='--', alpha=0.8)
axes[i].set_xlabel('预测值')
axes[i].set_ylabel('残差')
axes[i].set_title(f'{name}\n残差图')
axes[i].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 残差分布分析
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.ravel()
for i, (name, y_pred) in enumerate(predictions.items()):
if i >= len(axes):
break
residuals = y_test - y_pred
# 残差直方图
axes[i].hist(residuals, bins=30, alpha=0.7, density=True)
axes[i].set_xlabel('残差')
axes[i].set_ylabel('密度')
axes[i].set_title(f'{name}\n残差分布')
axes[i].grid(True, alpha=0.3)
# 添加正态分布曲线
mu, sigma = residuals.mean(), residuals.std()
x = np.linspace(residuals.min(), residuals.max(), 100)
y = ((1 / (sigma * np.sqrt(2 * np.pi))) *
np.exp(-0.5 * ((x - mu) / sigma) ** 2))
axes[i].plot(x, y, 'r-', alpha=0.8, label='正态分布')
axes[i].legend()
plt.tight_layout()
plt.show()
def compare_model_performance(self, results_df):
"""
综合比较模型性能
"""
# 创建性能排名
ranking_df = results_df.copy()
# 对于MSE, RMSE, MAE, MAPE,越小越好
for metric in ['MSE', 'RMSE', 'MAE', 'MAPE(%)']:
ranking_df[f'{metric}_rank'] = ranking_df[metric].rank()
# 对于R²,越大越好
ranking_df['R²_rank'] = ranking_df['R²'].rank(ascending=False)
# 计算平均排名
rank_cols = [col for col in ranking_df.columns if col.endswith('_rank')]
ranking_df['平均排名'] = ranking_df[rank_cols].mean(axis=1)
# 按平均排名排序
ranking_df = ranking_df.sort_values('平均排名')
# 可视化排名
plt.figure(figsize=(12, 8))
# 绘制每个指标的排名
x = np.arange(len(ranking_df))
width = 0.15
metrics = ['MSE', 'RMSE', 'MAE', 'R²', 'MAPE(%)']
colors = plt.cm.Set3(np.linspace(0, 1, len(metrics)))
for i, metric in enumerate(metrics):
rank_col = f'{metric}_rank'
plt.bar(x + i*width, ranking_df[rank_col], width,
label=metric, alpha=0.8, color=colors[i])
plt.xlabel('模型')
plt.ylabel('排名 (越小越好)')
plt.title('模型性能排名比较')
plt.xticks(x + width * 2, ranking_df['模型'], rotation=45)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("模型性能排名:")
print(ranking_df[['模型', '平均排名'] + rank_cols].round(2))
return ranking_df
# 回归指标演示
regression_metrics = RegressionMetrics()
print("\n回归模型评估指标演示:")
print("=" * 30)
# 回归指标演示
regression_results, regression_predictions, y_test_reg = regression_metrics.demonstrate_regression_metrics()
# 预测质量分析
regression_metrics.analyze_prediction_quality(regression_predictions, y_test_reg)
# 残差分析
regression_metrics.analyze_residuals(regression_predictions, y_test_reg)
# 模型性能比较
ranking_results = regression_metrics.compare_model_performance(regression_results)
5.4 交叉验证
5.4.1 交叉验证基础
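k折交叉验证把数据划分成k份,轮流以其中1份作验证集、其余k−1份训练,最后取k次分数的均值和标准差,比单次留出法的估计更稳定。最小用法示意(参数为任意选取):

# 最小示意: 5折交叉验证
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.linear_model import LogisticRegression

X, y = make_classification(n_samples=300, random_state=0)
scores = cross_val_score(LogisticRegression(max_iter=1000), X, y, cv=5)
print(f"准确率: {scores.mean():.3f} ± {scores.std():.3f}")  # k次分数的均值与波动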
from sklearn.model_selection import (
KFold, StratifiedKFold, LeaveOneOut, LeavePOut,
ShuffleSplit, StratifiedShuffleSplit, TimeSeriesSplit,
cross_validate, GridSearchCV, RandomizedSearchCV
)
from sklearn.metrics import make_scorer
class CrossValidationDemo:
"""
交叉验证演示类
"""
def __init__(self):
self.cv_results = {}
def demonstrate_cv_methods(self):
"""
演示不同的交叉验证方法
"""
# 创建数据集
X, y = make_classification(n_samples=200, n_features=10, n_informative=5,
n_redundant=5, n_classes=3, random_state=42)
# 不同的交叉验证策略
cv_strategies = {
'K-Fold (k=5)': KFold(n_splits=5, shuffle=True, random_state=42),
'Stratified K-Fold (k=5)': StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
'Leave-One-Out': LeaveOneOut(),
'Shuffle Split': ShuffleSplit(n_splits=5, test_size=0.3, random_state=42),
'Stratified Shuffle Split': StratifiedShuffleSplit(n_splits=5, test_size=0.3, random_state=42)
}
# 模型
model = LogisticRegression(random_state=42, max_iter=1000)
results = []
for cv_name, cv_strategy in cv_strategies.items():
if cv_name == 'Leave-One-Out' and len(X) > 50:
# LOO对大数据集太慢,跳过
continue
try:
# 执行交叉验证
cv_scores = cross_val_score(model, X, y, cv=cv_strategy, scoring='accuracy')
results.append({
'交叉验证方法': cv_name,
'平均准确率': cv_scores.mean(),
'标准差': cv_scores.std(),
'最小值': cv_scores.min(),
'最大值': cv_scores.max(),
'折数': len(cv_scores)
})
except Exception as e:
print(f"{cv_name} 执行失败: {e}")
results_df = pd.DataFrame(results)
# 可视化结果
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# 平均准确率比较
axes[0].bar(range(len(results_df)), results_df['平均准确率'],
yerr=results_df['标准差'], capsize=5, alpha=0.8)
axes[0].set_xlabel('交叉验证方法')
axes[0].set_ylabel('准确率')
axes[0].set_title('不同交叉验证方法的性能比较')
axes[0].set_xticks(range(len(results_df)))
axes[0].set_xticklabels(results_df['交叉验证方法'], rotation=45)
axes[0].grid(True, alpha=0.3)
# 标准差比较
axes[1].bar(range(len(results_df)), results_df['标准差'], alpha=0.8, color='orange')
axes[1].set_xlabel('交叉验证方法')
axes[1].set_ylabel('标准差')
axes[1].set_title('不同交叉验证方法的稳定性比较')
axes[1].set_xticks(range(len(results_df)))
axes[1].set_xticklabels(results_df['交叉验证方法'], rotation=45)
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("交叉验证方法比较:")
print(results_df.round(4))
return results_df
def demonstrate_cv_visualization(self):
"""
可视化交叉验证过程
"""
# 创建小数据集便于可视化
X, y = make_classification(n_samples=50, n_features=2, n_informative=2,
n_redundant=0, n_clusters_per_class=1, random_state=42)
# K-Fold交叉验证
kf = KFold(n_splits=5, shuffle=True, random_state=42)
fig, axes = plt.subplots(1, 5, figsize=(20, 4))
for i, (train_idx, test_idx) in enumerate(kf.split(X)):
# 绘制训练集和测试集
axes[i].scatter(X[train_idx, 0], X[train_idx, 1],
c=y[train_idx], cmap='viridis', alpha=0.8,
s=50, label='训练集', marker='o')
axes[i].scatter(X[test_idx, 0], X[test_idx, 1],
c=y[test_idx], cmap='viridis', alpha=0.8,
s=100, label='测试集', marker='s', edgecolors='red', linewidth=2)
axes[i].set_title(f'Fold {i+1}\n训练: {len(train_idx)}, 测试: {len(test_idx)}')
axes[i].set_xlabel('特征1')
if i == 0:
axes[i].set_ylabel('特征2')
axes[i].legend()
axes[i].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def compare_cv_strategies_on_imbalanced_data(self):
"""
在不平衡数据上比较交叉验证策略
"""
# 创建不平衡数据集
X, y = make_classification(n_samples=1000, n_features=10, n_informative=5,
n_redundant=5, n_classes=2, weights=[0.9, 0.1],
random_state=42)
print(f"类别分布: {np.bincount(y)}")
# 比较普通K-Fold和分层K-Fold
cv_strategies = {
'K-Fold': KFold(n_splits=5, shuffle=True, random_state=42),
'Stratified K-Fold': StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
}
model = LogisticRegression(random_state=42, max_iter=1000)
results = []
fold_distributions = {}
for cv_name, cv_strategy in cv_strategies.items():
cv_scores = cross_val_score(model, X, y, cv=cv_strategy, scoring='f1')
results.append({
'交叉验证方法': cv_name,
'平均F1分数': cv_scores.mean(),
'标准差': cv_scores.std()
})
# 记录每折的类别分布
fold_dist = []
for train_idx, test_idx in cv_strategy.split(X, y):
test_dist = np.bincount(y[test_idx], minlength=2)
fold_dist.append(test_dist[1] / len(test_idx)) # 少数类比例
fold_distributions[cv_name] = fold_dist
# 可视化结果
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# F1分数比较
results_df = pd.DataFrame(results)
axes[0].bar(range(len(results_df)), results_df['平均F1分数'],
yerr=results_df['标准差'], capsize=5, alpha=0.8)
axes[0].set_xlabel('交叉验证方法')
axes[0].set_ylabel('F1分数')
axes[0].set_title('不平衡数据上的交叉验证比较')
axes[0].set_xticks(range(len(results_df)))
axes[0].set_xticklabels(results_df['交叉验证方法'])
axes[0].grid(True, alpha=0.3)
# 每折的类别分布
for i, (cv_name, dist) in enumerate(fold_distributions.items()):
axes[1].plot(range(1, len(dist)+1), dist, 'o-', label=cv_name, alpha=0.8)
axes[1].axhline(y=0.1, color='red', linestyle='--', alpha=0.8, label='总体少数类比例')
axes[1].set_xlabel('折数')
axes[1].set_ylabel('测试集中少数类比例')
axes[1].set_title('每折测试集的类别分布')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("不平衡数据交叉验证结果:")
print(results_df.round(4))
return results_df, fold_distributions
def demonstrate_time_series_cv(self):
"""
演示时间序列交叉验证
"""
# 创建时间序列数据
np.random.seed(42)
n_samples = 100
time = np.arange(n_samples)
# 创建带趋势和季节性的时间序列
trend = 0.1 * time
seasonal = 2 * np.sin(2 * np.pi * time / 12)
noise = np.random.normal(0, 0.5, n_samples)
y = trend + seasonal + noise
# 创建特征(滞后特征)
X = np.column_stack([
np.roll(y, 1), # lag 1
np.roll(y, 2), # lag 2
np.roll(y, 3), # lag 3
])[3:] # 去掉前3个样本(因为滞后)
y = y[3:]
# 时间序列交叉验证
tscv = TimeSeriesSplit(n_splits=5)
# 可视化时间序列分割
fig, axes = plt.subplots(2, 3, figsize=(15, 8))
axes = axes.ravel()
# 原始时间序列
axes[0].plot(range(len(y)), y, 'b-', alpha=0.8)
axes[0].set_title('原始时间序列')
axes[0].set_xlabel('时间')
axes[0].set_ylabel('值')
axes[0].grid(True, alpha=0.3)
# 每个分割的可视化
for i, (train_idx, test_idx) in enumerate(tscv.split(X)):
if i >= 5: # 只显示前5个分割
break
axes[i+1].plot(train_idx, y[train_idx], 'b-', alpha=0.8, label='训练集')
axes[i+1].plot(test_idx, y[test_idx], 'r-', alpha=0.8, label='测试集')
axes[i+1].set_title(f'分割 {i+1}\n训练: {len(train_idx)}, 测试: {len(test_idx)}')
axes[i+1].set_xlabel('时间')
axes[i+1].set_ylabel('值')
axes[i+1].legend()
axes[i+1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 比较时间序列CV和普通CV
model = LinearRegression()
# 时间序列交叉验证
ts_scores = cross_val_score(model, X, y, cv=tscv, scoring='r2')
# 普通K-Fold(错误的做法)
kf = KFold(n_splits=5, shuffle=True, random_state=42)
kf_scores = cross_val_score(model, X, y, cv=kf, scoring='r2')
# 比较结果
comparison_df = pd.DataFrame({
'交叉验证方法': ['时间序列CV', '普通K-Fold'],
'平均R²': [ts_scores.mean(), kf_scores.mean()],
'标准差': [ts_scores.std(), kf_scores.std()]
})
print("时间序列交叉验证比较:")
print(comparison_df.round(4))
print("\n注意: 普通K-Fold在时间序列数据上会产生数据泄露,导致过于乐观的结果")
return comparison_df
# 交叉验证演示
cv_demo = CrossValidationDemo()
print("\n交叉验证演示:")
print("=" * 30)
# 不同交叉验证方法比较
cv_methods_results = cv_demo.demonstrate_cv_methods()
# 交叉验证可视化
cv_demo.demonstrate_cv_visualization()
# 不平衡数据上的交叉验证
imbalanced_results, fold_dist = cv_demo.compare_cv_strategies_on_imbalanced_data()
# 时间序列交叉验证
ts_cv_results = cv_demo.demonstrate_time_series_cv()
5.4.2 高级交叉验证技术
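嵌套交叉验证用外层CV估计泛化性能、内层CV做超参数搜索,避免"用同一份数据既调参又报告分数"带来的乐观偏差。下面的类手工实现了这一流程;等价地,也可以把GridSearchCV当作一个普通模型直接交给cross_val_score,写法更紧凑(示意,参数任意):

# 嵌套CV的紧凑写法: 外层评估"自带内层调参的模型"
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV, cross_val_score
from sklearn.svm import SVC

X, y = make_classification(n_samples=300, random_state=0)
inner = GridSearchCV(SVC(), {'C': [0.1, 1, 10]}, cv=3)  # 内层: 超参数搜索
scores = cross_val_score(inner, X, y, cv=5)             # 外层: 性能估计
print(f"嵌套CV准确率: {scores.mean():.3f} ± {scores.std():.3f}")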
class AdvancedCrossValidation:
"""
高级交叉验证技术
"""
def __init__(self):
self.results = {}
def nested_cross_validation(self):
"""
嵌套交叉验证演示
"""
# 创建数据集
X, y = make_classification(n_samples=500, n_features=20, n_informative=10,
n_redundant=10, random_state=42)
# 模型和参数网格
models_params = {
'SVM': {
'model': SVC(random_state=42),
'params': {
'C': [0.1, 1, 10],
'gamma': ['scale', 'auto', 0.1, 1]
}
},
'Random Forest': {
'model': RandomForestClassifier(random_state=42),
'params': {
'n_estimators': [50, 100, 200],
'max_depth': [None, 5, 10]
}
}
}
# 外层交叉验证
outer_cv = KFold(n_splits=5, shuffle=True, random_state=42)
# 内层交叉验证
inner_cv = KFold(n_splits=3, shuffle=True, random_state=42)
nested_scores = {}
for model_name, model_config in models_params.items():
print(f"\n执行 {model_name} 的嵌套交叉验证...")
outer_scores = []
best_params_list = []
for train_idx, test_idx in outer_cv.split(X):
X_train, X_test = X[train_idx], X[test_idx]
y_train, y_test = y[train_idx], y[test_idx]
# 内层交叉验证进行超参数优化
grid_search = GridSearchCV(
model_config['model'],
model_config['params'],
cv=inner_cv,
scoring='accuracy',
n_jobs=-1
)
grid_search.fit(X_train, y_train)
# 在外层测试集上评估最佳模型
best_score = grid_search.score(X_test, y_test)
outer_scores.append(best_score)
best_params_list.append(grid_search.best_params_)
nested_scores[model_name] = {
'scores': outer_scores,
'mean': np.mean(outer_scores),
'std': np.std(outer_scores),
'best_params': best_params_list
}
# 可视化结果
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# 嵌套CV分数比较
model_names = list(nested_scores.keys())
means = [nested_scores[name]['mean'] for name in model_names]
stds = [nested_scores[name]['std'] for name in model_names]
axes[0].bar(range(len(model_names)), means, yerr=stds,
capsize=5, alpha=0.8)
axes[0].set_xlabel('模型')
axes[0].set_ylabel('准确率')
axes[0].set_title('嵌套交叉验证结果')
axes[0].set_xticks(range(len(model_names)))
axes[0].set_xticklabels(model_names)
axes[0].grid(True, alpha=0.3)
# 每折的分数分布
for i, (name, results) in enumerate(nested_scores.items()):
axes[1].scatter([i] * len(results['scores']), results['scores'],
alpha=0.7, s=50, label=name)
axes[1].set_xlabel('模型')
axes[1].set_ylabel('准确率')
axes[1].set_title('每折分数分布')
axes[1].set_xticks(range(len(model_names)))
axes[1].set_xticklabels(model_names)
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 打印详细结果
for model_name, results in nested_scores.items():
print(f"\n{model_name} 嵌套交叉验证结果:")
print(f"平均准确率: {results['mean']:.4f} ± {results['std']:.4f}")
print("每折最佳参数:")
for i, params in enumerate(results['best_params']):
print(f" Fold {i+1}: {params}")
return nested_scores
def cross_validation_with_multiple_metrics(self):
"""
多指标交叉验证
"""
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, random_state=42)
# 模型
model = RandomForestClassifier(random_state=42)
# 多个评估指标
scoring = {
'accuracy': 'accuracy',
'precision': 'precision_macro',
'recall': 'recall_macro',
'f1': 'f1_macro',
'roc_auc': 'roc_auc_ovr_weighted'
}
# 执行多指标交叉验证
cv_results = cross_validate(model, X, y, cv=5, scoring=scoring,
return_train_score=True)
# 整理结果
results_summary = []
for metric in scoring.keys():
test_scores = cv_results[f'test_{metric}']
train_scores = cv_results[f'train_{metric}']
results_summary.append({
'指标': metric,
'测试_平均': test_scores.mean(),
'测试_标准差': test_scores.std(),
'训练_平均': train_scores.mean(),
'训练_标准差': train_scores.std(),
'过拟合程度': train_scores.mean() - test_scores.mean()
})
results_df = pd.DataFrame(results_summary)
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# 训练vs测试性能
x = np.arange(len(results_df))
width = 0.35
axes[0].bar(x - width/2, results_df['训练_平均'], width,
yerr=results_df['训练_标准差'], label='训练', alpha=0.8)
axes[0].bar(x + width/2, results_df['测试_平均'], width,
yerr=results_df['测试_标准差'], label='测试', alpha=0.8)
axes[0].set_xlabel('指标')
axes[0].set_ylabel('分数')
axes[0].set_title('训练 vs 测试性能')
axes[0].set_xticks(x)
axes[0].set_xticklabels(results_df['指标'])
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 过拟合程度
axes[1].bar(results_df['指标'], results_df['过拟合程度'], alpha=0.8, color='red')
axes[1].set_xlabel('指标')
axes[1].set_ylabel('过拟合程度 (训练-测试)')
axes[1].set_title('各指标的过拟合程度')
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("多指标交叉验证结果:")
print(results_df.round(4))
return results_df, cv_results
def custom_cross_validation_splitter(self):
"""
自定义交叉验证分割器
"""
# 创建带有组信息的数据集
X, y = make_classification(n_samples=300, n_features=10, n_informative=5,
n_redundant=5, random_state=42)
# 创建组信息(模拟不同的患者、实验等)
n_groups = 10
groups = np.repeat(np.arange(n_groups), len(X) // n_groups)
groups = np.concatenate([groups, np.full(len(X) - len(groups), n_groups-1)])
from sklearn.model_selection import GroupKFold, LeaveOneGroupOut
# 不同的分组交叉验证策略
cv_strategies = {
'Group K-Fold': GroupKFold(n_splits=5),
'Leave-One-Group-Out': LeaveOneGroupOut()
}
model = LogisticRegression(random_state=42, max_iter=1000)
results = []
for cv_name, cv_strategy in cv_strategies.items():
if cv_name == 'Leave-One-Group-Out' and n_groups > 10:
# LOGO对太多组会很慢
continue
try:
cv_scores = cross_val_score(model, X, y, groups=groups,
cv=cv_strategy, scoring='accuracy')
results.append({
'交叉验证方法': cv_name,
'平均准确率': cv_scores.mean(),
'标准差': cv_scores.std(),
'折数': len(cv_scores)
})
except Exception as e:
print(f"{cv_name} 执行失败: {e}")
# 比较普通CV和分组CV
normal_kf = KFold(n_splits=5, shuffle=True, random_state=42)
normal_scores = cross_val_score(model, X, y, cv=normal_kf, scoring='accuracy')
results.append({
'交叉验证方法': 'Normal K-Fold',
'平均准确率': normal_scores.mean(),
'标准差': normal_scores.std(),
'折数': len(normal_scores)
})
results_df = pd.DataFrame(results)
# 可视化组分布
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# 组分布
group_counts = np.bincount(groups)
axes[0].bar(range(len(group_counts)), group_counts, alpha=0.8)
axes[0].set_xlabel('组ID')
axes[0].set_ylabel('样本数量')
axes[0].set_title('每组的样本数量分布')
axes[0].grid(True, alpha=0.3)
# CV结果比较
axes[1].bar(range(len(results_df)), results_df['平均准确率'],
yerr=results_df['标准差'], capsize=5, alpha=0.8)
axes[1].set_xlabel('交叉验证方法')
axes[1].set_ylabel('准确率')
axes[1].set_title('不同交叉验证方法比较')
axes[1].set_xticks(range(len(results_df)))
axes[1].set_xticklabels(results_df['交叉验证方法'], rotation=45)
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("分组交叉验证结果:")
print(results_df.round(4))
return results_df
# 高级交叉验证演示
advanced_cv = AdvancedCrossValidation()
print("\n高级交叉验证技术演示:")
print("=" * 30)
# 嵌套交叉验证
nested_cv_results = advanced_cv.nested_cross_validation()
# 多指标交叉验证
multi_metric_results, cv_detailed = advanced_cv.cross_validation_with_multiple_metrics()
# 自定义分组交叉验证
group_cv_results = advanced_cv.custom_cross_validation_splitter()
5.5 学习曲线与验证曲线
5.5.1 学习曲线分析
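阅读学习曲线的基本规律:训练分数与验证分数之间持续存在的大间隙提示高方差(过拟合),此时增加训练数据通常有效;两条曲线都收敛到较低水平则提示高偏差(欠拟合),需要更强的模型或更好的特征。这对应期望误差的经典分解:期望误差 = 偏差² + 方差 + 不可约噪声。本节末尾的偏差-方差分析会用重复实验近似这一分解。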
class LearningCurveAnalysis:
"""
学习曲线分析
"""
def __init__(self):
self.results = {}
def plot_learning_curves(self):
"""
绘制学习曲线
"""
# 创建数据集
X, y = make_classification(n_samples=2000, n_features=20, n_informative=10,
n_redundant=10, random_state=42)
# 不同复杂度的模型
models = {
'简单模型': LogisticRegression(random_state=42, max_iter=1000),
'中等复杂度': DecisionTreeClassifier(max_depth=10, random_state=42),
'复杂模型': DecisionTreeClassifier(random_state=42),
'集成模型': RandomForestClassifier(n_estimators=100, random_state=42)
}
# 训练集大小范围
train_sizes = np.linspace(0.1, 1.0, 10)
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
axes = axes.ravel()
for i, (model_name, model) in enumerate(models.items()):
print(f"计算 {model_name} 的学习曲线...")
# 计算学习曲线
train_sizes_abs, train_scores, val_scores = learning_curve(
model, X, y, train_sizes=train_sizes, cv=5,
scoring='accuracy', n_jobs=-1, random_state=42
)
# 计算均值和标准差
train_mean = np.mean(train_scores, axis=1)
train_std = np.std(train_scores, axis=1)
val_mean = np.mean(val_scores, axis=1)
val_std = np.std(val_scores, axis=1)
# 绘制学习曲线
axes[i].plot(train_sizes_abs, train_mean, 'o-', color='blue',
label='训练分数')
axes[i].fill_between(train_sizes_abs, train_mean - train_std,
train_mean + train_std, alpha=0.1, color='blue')
axes[i].plot(train_sizes_abs, val_mean, 'o-', color='red',
label='验证分数')
axes[i].fill_between(train_sizes_abs, val_mean - val_std,
val_mean + val_std, alpha=0.1, color='red')
axes[i].set_xlabel('训练集大小')
axes[i].set_ylabel('准确率')
axes[i].set_title(f'{model_name} 学习曲线')
axes[i].legend()
axes[i].grid(True, alpha=0.3)
# 分析过拟合程度
final_gap = train_mean[-1] - val_mean[-1]
axes[i].text(0.02, 0.98, f'最终差距: {final_gap:.3f}',
transform=axes[i].transAxes, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
plt.tight_layout()
plt.show()
return train_sizes_abs, models
def analyze_learning_curve_patterns(self):
"""
分析不同的学习曲线模式
"""
# 创建不同特征的数据集
datasets = {
'简单数据': make_classification(n_samples=1000, n_features=5, n_informative=3,
n_redundant=2, random_state=42),
'复杂数据': make_classification(n_samples=1000, n_features=50, n_informative=25,
n_redundant=25, random_state=42),
'噪声数据': make_classification(n_samples=1000, n_features=20, n_informative=5,
n_redundant=15, random_state=42)
}
model = RandomForestClassifier(n_estimators=100, random_state=42)
train_sizes = np.linspace(0.1, 1.0, 10)
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
patterns = {}
for i, (data_name, (X, y)) in enumerate(datasets.items()):
print(f"分析 {data_name} 的学习曲线模式...")
train_sizes_abs, train_scores, val_scores = learning_curve(
model, X, y, train_sizes=train_sizes, cv=5,
scoring='accuracy', n_jobs=-1, random_state=42
)
train_mean = np.mean(train_scores, axis=1)
val_mean = np.mean(val_scores, axis=1)
# 绘制学习曲线
axes[i].plot(train_sizes_abs, train_mean, 'o-', color='blue',
label='训练分数')
axes[i].plot(train_sizes_abs, val_mean, 'o-', color='red',
label='验证分数')
axes[i].set_xlabel('训练集大小')
axes[i].set_ylabel('准确率')
axes[i].set_title(f'{data_name} 学习曲线')
axes[i].legend()
axes[i].grid(True, alpha=0.3)
# 分析模式
final_gap = train_mean[-1] - val_mean[-1]
convergence = np.std(val_mean[-3:]) # 最后3个点的标准差
if final_gap > 0.1:
pattern = "过拟合"
elif final_gap < 0.02:
pattern = "良好拟合"
else:
pattern = "轻微过拟合"
if convergence > 0.01:
pattern += " + 未收敛"
else:
pattern += " + 已收敛"
patterns[data_name] = {
'pattern': pattern,
'final_gap': final_gap,
'convergence': convergence,
'final_val_score': val_mean[-1]
}
axes[i].text(0.02, 0.02, pattern, transform=axes[i].transAxes,
bbox=dict(boxstyle='round', facecolor='yellow', alpha=0.8))
plt.tight_layout()
plt.show()
print("\n学习曲线模式分析:")
for data_name, analysis in patterns.items():
print(f"{data_name}:")
print(f" 模式: {analysis['pattern']}")
print(f" 最终差距: {analysis['final_gap']:.4f}")
print(f" 收敛性: {analysis['convergence']:.4f}")
print(f" 最终验证分数: {analysis['final_val_score']:.4f}")
return patterns
def plot_validation_curves(self):
"""
绘制验证曲线
"""
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, random_state=42)
# 不同模型的超参数
model_params = {
'Decision Tree': {
'model': DecisionTreeClassifier(random_state=42),
'param_name': 'max_depth',
'param_range': range(1, 21)
},
'Random Forest': {
'model': RandomForestClassifier(random_state=42),
'param_name': 'n_estimators',
'param_range': [10, 25, 50, 75, 100, 150, 200]
},
'SVM': {
'model': SVC(random_state=42),
'param_name': 'C',
'param_range': np.logspace(-3, 2, 6)
},
'Logistic Regression': {
'model': LogisticRegression(random_state=42, max_iter=1000),
'param_name': 'C',
'param_range': np.logspace(-3, 2, 6)
}
}
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
axes = axes.ravel()
for i, (model_name, config) in enumerate(model_params.items()):
print(f"计算 {model_name} 的验证曲线...")
# 计算验证曲线
train_scores, val_scores = validation_curve(
config['model'], X, y,
param_name=config['param_name'],
param_range=config['param_range'],
cv=5, scoring='accuracy', n_jobs=-1
)
# 计算均值和标准差
train_mean = np.mean(train_scores, axis=1)
train_std = np.std(train_scores, axis=1)
val_mean = np.mean(val_scores, axis=1)
val_std = np.std(val_scores, axis=1)
# 绘制验证曲线
axes[i].plot(config['param_range'], train_mean, 'o-', color='blue',
label='训练分数')
axes[i].fill_between(config['param_range'], train_mean - train_std,
train_mean + train_std, alpha=0.1, color='blue')
axes[i].plot(config['param_range'], val_mean, 'o-', color='red',
label='验证分数')
axes[i].fill_between(config['param_range'], val_mean - val_std,
val_mean + val_std, alpha=0.1, color='red')
# 找到最佳参数
best_idx = np.argmax(val_mean)
best_param = config['param_range'][best_idx]
best_score = val_mean[best_idx]
axes[i].axvline(x=best_param, color='green', linestyle='--', alpha=0.8,
label=f'最佳: {best_param}')
axes[i].set_xlabel(config['param_name'])
axes[i].set_ylabel('准确率')
axes[i].set_title(f'{model_name} 验证曲线')
axes[i].legend()
axes[i].grid(True, alpha=0.3)
# 对数尺度(如果参数跨度很大)
if config['param_name'] == 'C':
axes[i].set_xscale('log')
# 添加最佳参数信息
axes[i].text(0.02, 0.98, f'最佳{config["param_name"]}: {best_param}\n最佳分数: {best_score:.3f}',
transform=axes[i].transAxes, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
plt.tight_layout()
plt.show()
return model_params
def bias_variance_analysis(self):
"""
偏差-方差分析
"""
# 创建数据集
X, y = make_classification(n_samples=500, n_features=10, n_informative=5,
n_redundant=5, random_state=42)
# 不同复杂度的模型
models = {
'高偏差低方差': DecisionTreeClassifier(max_depth=3, random_state=42),
'中等偏差方差': DecisionTreeClassifier(max_depth=8, random_state=42),
'低偏差高方差': DecisionTreeClassifier(random_state=42)
}
n_experiments = 50
train_size = 0.7
# 先固定一份测试集;各次实验只对训练集做自助重采样,
# 使不同实验的预测针对同一批样本,偏差与方差才具有可比性
X_train_full, X_test, y_train_full, y_test = train_test_split(
X, y, train_size=train_size, random_state=42
)
results = {}
for model_name, model in models.items():
print(f"分析 {model_name} 的偏差-方差...")
predictions = []
rng = np.random.RandomState(42)
for i in range(n_experiments):
# 自助采样训练集
idx = rng.choice(len(X_train_full), size=len(X_train_full), replace=True)
# 训练模型并在固定测试集上预测
model.fit(X_train_full[idx], y_train_full[idx])
y_pred = model.predict(X_test)
predictions.append(y_pred)
# 计算偏差和方差
predictions = np.array(predictions)
# 对每个测试样本计算预测在多次实验间的方差
prediction_variance = np.var(predictions, axis=0)
avg_variance = np.mean(prediction_variance)
# 计算平均预测
avg_predictions = np.mean(predictions, axis=0)
# 偏差²: 平均预测与固定测试集真实标签的均方差
bias_squared = np.mean((avg_predictions - y_test) ** 2)
results[model_name] = {
'bias_squared': bias_squared,
'variance': avg_variance,
'total_error': bias_squared + avg_variance
}
# 可视化偏差-方差权衡
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# 偏差vs方差
model_names = list(results.keys())
bias_values = [results[name]['bias_squared'] for name in model_names]
variance_values = [results[name]['variance'] for name in model_names]
total_errors = [results[name]['total_error'] for name in model_names]
x = np.arange(len(model_names))
width = 0.25
axes[0].bar(x - width, bias_values, width, label='偏差²', alpha=0.8)
axes[0].bar(x, variance_values, width, label='方差', alpha=0.8)
axes[0].bar(x + width, total_errors, width, label='总误差', alpha=0.8)
axes[0].set_xlabel('模型')
axes[0].set_ylabel('误差')
axes[0].set_title('偏差-方差分解')
axes[0].set_xticks(x)
axes[0].set_xticklabels(model_names, rotation=45)
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 偏差vs方差散点图
axes[1].scatter(bias_values, variance_values, s=100, alpha=0.8)
for i, name in enumerate(model_names):
axes[1].annotate(name, (bias_values[i], variance_values[i]),
xytext=(5, 5), textcoords='offset points')
axes[1].set_xlabel('偏差²')
axes[1].set_ylabel('方差')
axes[1].set_title('偏差-方差权衡')
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("\n偏差-方差分析结果:")
for model_name, analysis in results.items():
print(f"{model_name}:")
print(f" 偏差²: {analysis['bias_squared']:.4f}")
print(f" 方差: {analysis['variance']:.4f}")
print(f" 总误差: {analysis['total_error']:.4f}")
return results
# 学习曲线分析演示
learning_analysis = LearningCurveAnalysis()
print("\n学习曲线与验证曲线分析:")
print("=" * 30)
# 学习曲线
train_sizes, models = learning_analysis.plot_learning_curves()
# 学习曲线模式分析
patterns = learning_analysis.analyze_learning_curve_patterns()
# 验证曲线
validation_curves = learning_analysis.plot_validation_curves()
# 偏差-方差分析
bias_variance_results = learning_analysis.bias_variance_analysis()
5.6 超参数优化
5.6.1 网格搜索与随机搜索
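网格搜索的代价随参数维度组合式增长:以下文Random Forest的网格为例,3×4×3×3 = 108种参数组合,再乘以5折交叉验证即540次模型拟合;而随机搜索的预算由n_iter固定,常能以更少的拟合次数探索更大的空间。可以先粗算一下预算:

# 粗算网格搜索的拟合次数(各维度取值个数与下文Random Forest网格一致)
from math import prod
n_values = {'n_estimators': 3, 'max_depth': 4, 'min_samples_split': 3, 'min_samples_leaf': 3}
print(prod(n_values.values()) * 5)  # 组合数 × CV折数 = 540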
class HyperparameterOptimization:
"""
超参数优化技术
"""
def __init__(self):
self.results = {}
def grid_search_demo(self):
"""
网格搜索演示
"""
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, random_state=42)
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 定义模型和参数网格
models_params = {
'Random Forest': {
'model': RandomForestClassifier(random_state=42),
'params': {
'n_estimators': [50, 100, 200],
'max_depth': [None, 5, 10, 15],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4]
}
},
'SVM': {
'model': SVC(random_state=42),
'params': {
'C': [0.1, 1, 10, 100],
'gamma': ['scale', 'auto', 0.1, 1],
'kernel': ['rbf', 'poly', 'sigmoid']
}
}
}
grid_results = {}
for model_name, config in models_params.items():
print(f"\n执行 {model_name} 的网格搜索...")
# 网格搜索
grid_search = GridSearchCV(
config['model'],
config['params'],
cv=5,
scoring='accuracy',
n_jobs=-1,
verbose=1
)
# 记录搜索时间
start_time = time.time()
grid_search.fit(X_train, y_train)
search_time = time.time() - start_time
# 在测试集上评估
test_score = grid_search.score(X_test, y_test)
grid_results[model_name] = {
'best_params': grid_search.best_params_,
'best_cv_score': grid_search.best_score_,
'test_score': test_score,
'search_time': search_time,
'n_combinations': len(grid_search.cv_results_['params'])
}
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳CV分数: {grid_search.best_score_:.4f}")
print(f"测试分数: {test_score:.4f}")
print(f"搜索时间: {search_time:.2f}秒")
print(f"参数组合数: {len(grid_search.cv_results_['params'])}")
return grid_results
def random_search_demo(self):
"""
随机搜索演示
"""
from scipy.stats import randint, uniform
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, random_state=42)
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 定义随机搜索参数分布
models_params = {
'Random Forest': {
'model': RandomForestClassifier(random_state=42),
'params': {
'n_estimators': randint(50, 300),
'max_depth': [None] + list(range(5, 21)),
'min_samples_split': randint(2, 11),
'min_samples_leaf': randint(1, 5),
'max_features': ['sqrt', 'log2', None]
}
},
'SVM': {
'model': SVC(random_state=42),
'params': {
'C': uniform(0.1, 100),
'gamma': uniform(0.001, 1),
'kernel': ['rbf', 'poly', 'sigmoid']
}
}
}
random_results = {}
n_iter = 100 # 随机搜索迭代次数
for model_name, config in models_params.items():
print(f"\n执行 {model_name} 的随机搜索...")
# 随机搜索
random_search = RandomizedSearchCV(
config['model'],
config['params'],
n_iter=n_iter,
cv=5,
scoring='accuracy',
n_jobs=-1,
random_state=42,
verbose=1
)
# 记录搜索时间
start_time = time.time()
random_search.fit(X_train, y_train)
search_time = time.time() - start_time
# 在测试集上评估
test_score = random_search.score(X_test, y_test)
random_results[model_name] = {
'best_params': random_search.best_params_,
'best_cv_score': random_search.best_score_,
'test_score': test_score,
'search_time': search_time,
'n_combinations': n_iter
}
print(f"最佳参数: {random_search.best_params_}")
print(f"最佳CV分数: {random_search.best_score_:.4f}")
print(f"测试分数: {test_score:.4f}")
print(f"搜索时间: {search_time:.2f}秒")
return random_results
def compare_search_methods(self):
"""
比较网格搜索和随机搜索
"""
print("比较网格搜索和随机搜索...")
# 执行两种搜索方法
grid_results = self.grid_search_demo()
random_results = self.random_search_demo()
# 整理比较结果
comparison_data = []
for model_name in grid_results.keys():
if model_name in random_results:
comparison_data.append({
'模型': model_name,
'网格搜索_CV分数': grid_results[model_name]['best_cv_score'],
'网格搜索_测试分数': grid_results[model_name]['test_score'],
'网格搜索_时间': grid_results[model_name]['search_time'],
'网格搜索_组合数': grid_results[model_name]['n_combinations'],
'随机搜索_CV分数': random_results[model_name]['best_cv_score'],
'随机搜索_测试分数': random_results[model_name]['test_score'],
'随机搜索_时间': random_results[model_name]['search_time'],
'随机搜索_组合数': random_results[model_name]['n_combinations']
})
comparison_df = pd.DataFrame(comparison_data)
# 可视化比较
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
models = comparison_df['模型']
x = np.arange(len(models))
width = 0.35
# CV分数比较
axes[0].bar(x - width/2, comparison_df['网格搜索_CV分数'], width,
label='网格搜索', alpha=0.8)
axes[0].bar(x + width/2, comparison_df['随机搜索_CV分数'], width,
label='随机搜索', alpha=0.8)
axes[0].set_xlabel('模型')
axes[0].set_ylabel('CV分数')
axes[0].set_title('交叉验证分数比较')
axes[0].set_xticks(x)
axes[0].set_xticklabels(models)
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 测试分数比较
axes[1].bar(x - width/2, comparison_df['网格搜索_测试分数'], width,
label='网格搜索', alpha=0.8)
axes[1].bar(x + width/2, comparison_df['随机搜索_测试分数'], width,
label='随机搜索', alpha=0.8)
axes[1].set_xlabel('模型')
axes[1].set_ylabel('测试分数')
axes[1].set_title('测试分数比较')
axes[1].set_xticks(x)
axes[1].set_xticklabels(models)
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# 搜索时间比较
axes[2].bar(x - width/2, comparison_df['网格搜索_时间'], width,
label='网格搜索', alpha=0.8)
axes[2].bar(x + width/2, comparison_df['随机搜索_时间'], width,
label='随机搜索', alpha=0.8)
axes[2].set_xlabel('模型')
axes[2].set_ylabel('搜索时间 (秒)')
axes[2].set_title('搜索时间比较')
axes[2].set_xticks(x)
axes[2].set_xticklabels(models)
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("\n搜索方法比较结果:")
print(comparison_df.round(4))
return comparison_df
def bayesian_optimization_demo(self):
"""
贝叶斯优化演示(使用scikit-optimize)
"""
try:
from skopt import gp_minimize
from skopt.space import Real, Integer, Categorical
from skopt.utils import use_named_args
except ImportError:
print("需要安装 scikit-optimize: pip install scikit-optimize")
return None
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, random_state=42)
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 定义搜索空间
dimensions = [
Integer(50, 300, name='n_estimators'),
Integer(5, 20, name='max_depth'),
Integer(2, 10, name='min_samples_split'),
Integer(1, 5, name='min_samples_leaf'),
Real(0.1, 1.0, name='max_features_ratio')
]
# 定义目标函数
@use_named_args(dimensions)
def objective(**params):
# 处理max_features参数
max_features = int(params['max_features_ratio'] * X_train.shape[1])
model = RandomForestClassifier(
n_estimators=params['n_estimators'],
max_depth=params['max_depth'],
min_samples_split=params['min_samples_split'],
min_samples_leaf=params['min_samples_leaf'],
max_features=max_features,
random_state=42
)
# 交叉验证
cv_scores = cross_val_score(model, X_train, y_train, cv=3, scoring='accuracy')
# 返回负分数(因为gp_minimize是最小化)
return -cv_scores.mean()
print("执行贝叶斯优化...")
# 执行贝叶斯优化
start_time = time.time()
result = gp_minimize(
func=objective,
dimensions=dimensions,
n_calls=50,
random_state=42,
verbose=True
)
search_time = time.time() - start_time
# 获取最佳参数
best_params = {
'n_estimators': result.x[0],
'max_depth': result.x[1],
'min_samples_split': result.x[2],
'min_samples_leaf': result.x[3],
'max_features': int(result.x[4] * X_train.shape[1])
}
# 用最佳参数训练模型
best_model = RandomForestClassifier(**best_params, random_state=42)
best_model.fit(X_train, y_train)
test_score = best_model.score(X_test, y_test)
print(f"\n贝叶斯优化结果:")
print(f"最佳参数: {best_params}")
print(f"最佳CV分数: {-result.fun:.4f}")
print(f"测试分数: {test_score:.4f}")
print(f"搜索时间: {search_time:.2f}秒")
print(f"函数评估次数: {len(result.func_vals)}")
# 可视化优化过程
plt.figure(figsize=(10, 6))
# func_vals记录的是每次评估的(负)分数;取运行最优值才与纵轴标签一致
best_so_far = np.maximum.accumulate(-np.array(result.func_vals))
plt.plot(range(1, len(best_so_far) + 1), best_so_far, 'b-o')
plt.xlabel('迭代次数')
plt.ylabel('迄今最佳CV分数')
plt.title('贝叶斯优化过程')
plt.grid(True, alpha=0.3)
plt.show()
return {
'best_params': best_params,
'best_cv_score': -result.fun,
'test_score': test_score,
'search_time': search_time,
'n_evaluations': len(result.func_vals)
}
# 超参数优化演示
hyperopt = HyperparameterOptimization()
print("\n超参数优化演示:")
print("=" * 30)
# 比较不同搜索方法
search_comparison = hyperopt.compare_search_methods()
# 贝叶斯优化
bayesian_results = hyperopt.bayesian_optimization_demo()
5.6.2 自动化超参数优化
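顺带一提,本章开头导入的Pipeline在自动化调参中尤其重要:如果在交叉验证之前就对全量数据拟合标准化器,验证折会"看到"训练折的统计量,造成轻微的数据泄露;把预处理步骤放进Pipeline再交给GridSearchCV,则每一折内都会独立拟合缩放器。最小示意(参数任意):

# 用Pipeline把标准化纳入交叉验证流程,避免预处理泄露
from sklearn.datasets import make_classification
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV

X, y = make_classification(n_samples=300, random_state=0)
pipe = Pipeline([('scaler', StandardScaler()), ('svc', SVC())])
search = GridSearchCV(pipe, {'svc__C': [0.1, 1, 10]}, cv=5)  # 参数名格式: 步骤名__参数名
search.fit(X, y)
print(search.best_params_, f"{search.best_score_:.3f}")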
class AutoMLDemo:
"""
自动化机器学习演示
"""
def __init__(self):
self.results = {}
def automated_model_selection(self):
"""
自动化模型选择和超参数优化
"""
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, random_state=42)
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 定义候选模型和参数
models_params = {
'Logistic Regression': {
'model': LogisticRegression(random_state=42, max_iter=1000),
'params': {
'C': [0.1, 1, 10],
'penalty': ['l1', 'l2'],
'solver': ['liblinear', 'saga']
}
},
'Random Forest': {
'model': RandomForestClassifier(random_state=42),
'params': {
'n_estimators': [50, 100, 200],
'max_depth': [None, 5, 10],
'min_samples_split': [2, 5]
}
},
'SVM': {
'model': SVC(random_state=42),
'params': {
'C': [0.1, 1, 10],
'gamma': ['scale', 'auto'],
'kernel': ['rbf', 'linear']
}
},
'Gradient Boosting': {
'model': GradientBoostingClassifier(random_state=42),
'params': {
'n_estimators': [50, 100, 200],
'learning_rate': [0.01, 0.1, 0.2],
'max_depth': [3, 5, 7]
}
}
}
best_models = {}
print("自动化模型选择和超参数优化...")
for model_name, config in models_params.items():
print(f"\n优化 {model_name}...")
# 网格搜索
grid_search = GridSearchCV(
config['model'],
config['params'],
cv=5,
scoring='accuracy',
n_jobs=-1
)
grid_search.fit(X_train, y_train)
# 在测试集上评估
test_score = grid_search.score(X_test, y_test)
best_models[model_name] = {
'model': grid_search.best_estimator_,
'best_params': grid_search.best_params_,
'cv_score': grid_search.best_score_,
'test_score': test_score
}
print(f"最佳参数: {grid_search.best_params_}")
print(f"CV分数: {grid_search.best_score_:.4f}")
print(f"测试分数: {test_score:.4f}")
# 选择最佳模型(按CV分数选择;若按测试分数挑选,测试集就参与了决策,报告的分数会偏乐观)
best_model_name = max(best_models.keys(),
key=lambda x: best_models[x]['cv_score'])
print(f"\n最佳模型: {best_model_name}")
print(f"CV分数: {best_models[best_model_name]['cv_score']:.4f}")
print(f"测试分数: {best_models[best_model_name]['test_score']:.4f}")
# 可视化模型比较
self.visualize_model_comparison(best_models)
return best_models, best_model_name
def visualize_model_comparison(self, best_models):
"""
可视化模型比较结果
"""
# 准备数据
model_names = list(best_models.keys())
cv_scores = [best_models[name]['cv_score'] for name in model_names]
test_scores = [best_models[name]['test_score'] for name in model_names]
# 创建图表
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# CV分数 vs 测试分数
axes[0].scatter(cv_scores, test_scores, s=100, alpha=0.7)
for i, name in enumerate(model_names):
axes[0].annotate(name, (cv_scores[i], test_scores[i]),
xytext=(5, 5), textcoords='offset points')
# 添加对角线(理想情况)
min_score = min(min(cv_scores), min(test_scores))
max_score = max(max(cv_scores), max(test_scores))
axes[0].plot([min_score, max_score], [min_score, max_score],
'r--', alpha=0.5, label='理想线')
axes[0].set_xlabel('交叉验证分数')
axes[0].set_ylabel('测试分数')
axes[0].set_title('CV分数 vs 测试分数')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 模型性能排名
x = np.arange(len(model_names))
width = 0.35
axes[1].bar(x - width/2, cv_scores, width, label='CV分数', alpha=0.8)
axes[1].bar(x + width/2, test_scores, width, label='测试分数', alpha=0.8)
axes[1].set_xlabel('模型')
axes[1].set_ylabel('分数')
axes[1].set_title('模型性能比较')
axes[1].set_xticks(x)
axes[1].set_xticklabels(model_names, rotation=45)
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def ensemble_model_optimization(self):
"""
集成模型优化
"""
from sklearn.ensemble import VotingClassifier
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, random_state=42)
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 基础模型
base_models = [
('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
('svm', SVC(probability=True, random_state=42)),
('lr', LogisticRegression(random_state=42, max_iter=1000))
]
# 创建投票分类器
voting_clf = VotingClassifier(
estimators=base_models,
voting='soft' # 使用概率投票
)
# 优化集成模型的参数
param_grid = {
'rf__n_estimators': [50, 100, 200],
'rf__max_depth': [None, 5, 10],
'svm__C': [0.1, 1, 10],
'svm__gamma': ['scale', 'auto'],
'lr__C': [0.1, 1, 10]
}
print("优化集成模型...")
# 网格搜索
grid_search = GridSearchCV(
voting_clf,
param_grid,
cv=3, # 减少CV折数以加快速度
scoring='accuracy',
n_jobs=-1,
verbose=1
)
grid_search.fit(X_train, y_train)
# 评估结果
test_score = grid_search.score(X_test, y_test)
print(f"\n集成模型优化结果:")
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳CV分数: {grid_search.best_score_:.4f}")
print(f"测试分数: {test_score:.4f}")
# 比较单个模型(默认参数)和调优后的集成模型
# 注意:集成经过了网格搜索调优,这一对比对单模型并不完全公平,仅作演示
individual_scores = []
for name, model in base_models:
model.fit(X_train, y_train)
score = model.score(X_test, y_test)
individual_scores.append(score)
print(f"{name} 测试分数: {score:.4f}")
# 可视化比较
plt.figure(figsize=(10, 6))
model_names = [name for name, _ in base_models] + ['Ensemble']
scores = individual_scores + [test_score]
bars = plt.bar(model_names, scores, alpha=0.8)
bars[-1].set_color('red') # 突出显示集成模型
plt.xlabel('模型')
plt.ylabel('测试分数')
plt.title('单个模型 vs 集成模型性能比较')
plt.grid(True, alpha=0.3)
# 添加数值标签
for i, score in enumerate(scores):
plt.text(i, score + 0.005, f'{score:.3f}',
ha='center', va='bottom')
plt.tight_layout()
plt.show()
return {
'best_ensemble': grid_search.best_estimator_,
'best_params': grid_search.best_params_,
'ensemble_score': test_score,
'individual_scores': dict(zip([name for name, _ in base_models], individual_scores))
}
# 自动化ML演示
automl = AutoMLDemo()
print("\n自动化机器学习演示:")
print("=" * 30)
# 自动化模型选择
best_models, best_model_name = automl.automated_model_selection()
# 集成模型优化
ensemble_results = automl.ensemble_model_optimization()
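在参数组合较多时,网格搜索的代价随维度指数增长;在相同计算预算下,随机搜索往往能以少得多的迭代接近最优,尤其适合连续超参数。下面给出一个与上文网格搜索流程对应的最小草图(参数分布与迭代次数均为示例假设,并非唯一合理取值):
from scipy.stats import loguniform
from sklearn.datasets import make_classification
from sklearn.model_selection import RandomizedSearchCV, train_test_split
from sklearn.svm import SVC

X, y = make_classification(n_samples=500, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 连续参数用分布而非离散网格采样,n_iter 控制总预算
param_dist = {
    'C': loguniform(1e-2, 1e2),       # 正则化强度按对数均匀分布采样
    'gamma': loguniform(1e-4, 1e0),
    'kernel': ['rbf', 'linear']
}
rand_search = RandomizedSearchCV(
    SVC(random_state=42), param_dist,
    n_iter=20,                        # 只评估20个随机组合,远少于完整网格
    cv=5, scoring='accuracy', n_jobs=-1, random_state=42
)
rand_search.fit(X_train, y_train)
print("随机搜索最佳参数:", rand_search.best_params_)
print("测试分数:", rand_search.score(X_test, y_test))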
5.7 模型选择策略
5.7.1 模型选择准则
class ModelSelectionCriteria:
"""
模型选择准则演示
"""
def __init__(self):
self.results = {}
def information_criteria_demo(self):
"""
信息准则演示(AIC, BIC)
"""
from sklearn.preprocessing import PolynomialFeatures  # 补充导入
# 创建回归数据集(特征数取小:多项式特征数随阶数组合式增长,
# 特征过多时高阶模型的参数数量会超过样本量,导致拟合退化)
X, y = make_regression(n_samples=200, n_features=3, noise=0.1, random_state=42)
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 不同复杂度的多项式特征
degrees = range(1, 8)
results = []
for degree in degrees:
# 创建多项式特征
poly_features = PolynomialFeatures(degree=degree, include_bias=False)
X_poly = poly_features.fit_transform(X_train)
X_test_poly = poly_features.transform(X_test)
# 训练线性回归模型
model = LinearRegression()
model.fit(X_poly, y_train)
# 预测
y_pred_train = model.predict(X_poly)
y_pred_test = model.predict(X_test_poly)
# 计算误差
mse_train = mean_squared_error(y_train, y_pred_train)
mse_test = mean_squared_error(y_test, y_pred_test)
# 计算信息准则(简化版本)
n = len(y_train)
k = X_poly.shape[1] # 参数数量
# AIC = n * log(MSE) + 2 * k
aic = n * np.log(mse_train) + 2 * k
# BIC = n * log(MSE) + k * log(n)
bic = n * np.log(mse_train) + k * np.log(n)
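# 说明:上述简化式来自高斯误差假设下的对数似然:-2*ln(L) ≈ n*ln(MSE) + 常数。
# 代入 AIC = 2k - 2*ln(L) 与 BIC = k*ln(n) - 2*ln(L) 并略去对所有模型相同的常数项,
# 即得到这里的公式;k 取特征数而未计截距与噪声方差,对不同阶数间的比较没有影响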
results.append({
'多项式阶数': degree,
'参数数量': k,
'训练MSE': mse_train,
'测试MSE': mse_test,
'AIC': aic,
'BIC': bic
})
results_df = pd.DataFrame(results)
# 找到最佳模型
best_aic_idx = results_df['AIC'].idxmin()
best_bic_idx = results_df['BIC'].idxmin()
best_test_idx = results_df['测试MSE'].idxmin()
print("信息准则模型选择结果:")
print(results_df.round(4))
print(f"\nAIC最佳模型: 阶数 {results_df.loc[best_aic_idx, '多项式阶数']}")
print(f"BIC最佳模型: 阶数 {results_df.loc[best_bic_idx, '多项式阶数']}")
print(f"测试MSE最佳模型: 阶数 {results_df.loc[best_test_idx, '多项式阶数']}")
# 可视化
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
# MSE比较
axes[0].plot(results_df['多项式阶数'], results_df['训练MSE'], 'o-', label='训练MSE')
axes[0].plot(results_df['多项式阶数'], results_df['测试MSE'], 'o-', label='测试MSE')
axes[0].axvline(x=results_df.loc[best_test_idx, '多项式阶数'],
color='red', linestyle='--', alpha=0.7, label='最佳测试MSE')
axes[0].set_xlabel('多项式阶数')
axes[0].set_ylabel('MSE')
axes[0].set_title('训练 vs 测试 MSE')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# AIC
axes[1].plot(results_df['多项式阶数'], results_df['AIC'], 'o-', color='green')
axes[1].axvline(x=results_df.loc[best_aic_idx, '多项式阶数'],
color='red', linestyle='--', alpha=0.7, label='最佳AIC')
axes[1].set_xlabel('多项式阶数')
axes[1].set_ylabel('AIC')
axes[1].set_title('AIC vs 模型复杂度')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# BIC
axes[2].plot(results_df['多项式阶数'], results_df['BIC'], 'o-', color='orange')
axes[2].axvline(x=results_df.loc[best_bic_idx, '多项式阶数'],
color='red', linestyle='--', alpha=0.7, label='最佳BIC')
axes[2].set_xlabel('多项式阶数')
axes[2].set_ylabel('BIC')
axes[2].set_title('BIC vs 模型复杂度')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return results_df
def statistical_significance_testing(self):
"""
统计显著性检验
"""
from scipy import stats
from sklearn.naive_bayes import GaussianNB  # 补充导入:本章开头未导入
# 创建数据集
X, y = make_classification(n_samples=500, n_features=20, n_informative=10,
n_redundant=10, random_state=42)
# 模型列表
models = {
'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
'SVM': SVC(random_state=42),
'Naive Bayes': GaussianNB()
}
# 交叉验证获取分数
cv_scores = {}
for name, model in models.items():
scores = cross_val_score(model, X, y, cv=10, scoring='accuracy')
cv_scores[name] = scores
# 两两配对t检验(注意:K折分数来自重叠的训练集,并非独立,p值仅作近似参考)
model_names = list(models.keys())
n_models = len(model_names)
# 创建p值矩阵
p_values = np.ones((n_models, n_models))
for i in range(n_models):
for j in range(i+1, n_models):
# 配对t检验
t_stat, p_val = stats.ttest_rel(cv_scores[model_names[i]],
cv_scores[model_names[j]])
p_values[i, j] = p_val
p_values[j, i] = p_val
# 可视化结果
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# 模型性能比较
means = [cv_scores[name].mean() for name in model_names]
stds = [cv_scores[name].std() for name in model_names]
axes[0].bar(range(len(model_names)), means, yerr=stds,
capsize=5, alpha=0.8)
axes[0].set_xlabel('模型')
axes[0].set_ylabel('准确率')
axes[0].set_title('模型性能比较(均值±标准差)')
axes[0].set_xticks(range(len(model_names)))
axes[0].set_xticklabels(model_names, rotation=45)
axes[0].grid(True, alpha=0.3)
# p值热力图
im = axes[1].imshow(p_values, cmap='RdYlBu_r', vmin=0, vmax=0.1)
axes[1].set_xticks(range(n_models))
axes[1].set_yticks(range(n_models))
axes[1].set_xticklabels(model_names, rotation=45)
axes[1].set_yticklabels(model_names)
axes[1].set_title('配对t检验 p值矩阵')
# 添加p值文本
for i in range(n_models):
for j in range(n_models):
if i != j:
text = axes[1].text(j, i, f'{p_values[i, j]:.3f}',
ha="center", va="center", color="black")
plt.colorbar(im, ax=axes[1])
plt.tight_layout()
plt.show()
# 打印显著性结果
print("统计显著性检验结果 (α = 0.05):")
print("=" * 40)
for i in range(n_models):
for j in range(i+1, n_models):
p_val = p_values[i, j]
significance = "显著" if p_val < 0.05 else "不显著"
print(f"{model_names[i]} vs {model_names[j]}: p = {p_val:.4f} ({significance})")
return cv_scores, p_values
def model_complexity_analysis(self):
"""
模型复杂度分析
"""
import time  # 补充导入:用于计时
# 创建数据集
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, random_state=42)
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 不同复杂度的模型
models = {
'线性模型': LogisticRegression(random_state=42, max_iter=1000),
'浅层树': DecisionTreeClassifier(max_depth=3, random_state=42),
'中等树': DecisionTreeClassifier(max_depth=8, random_state=42),
'深层树': DecisionTreeClassifier(max_depth=15, random_state=42),
'随机森林': RandomForestClassifier(n_estimators=100, random_state=42),
'SVM-RBF': SVC(kernel='rbf', random_state=42)
}
results = []
for name, model in models.items():
# 训练模型
start_time = time.time()
model.fit(X_train, y_train)
train_time = time.time() - start_time
# 预测时间
start_time = time.time()
y_pred = model.predict(X_test)
predict_time = time.time() - start_time
# 性能指标
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)
# 模型复杂度指标
if hasattr(model, 'coef_'):
# 线性模型:非零系数数量
complexity = np.sum(np.abs(model.coef_) > 1e-6)
elif hasattr(model, 'tree_'):
# 决策树:叶子节点数量
complexity = model.tree_.n_leaves
elif hasattr(model, 'estimators_'):
# 随机森林:总叶子节点数量
complexity = sum(tree.tree_.n_leaves for tree in model.estimators_)
else:
# 其他模型:支持向量数量(SVM)
if hasattr(model, 'n_support_'):
complexity = np.sum(model.n_support_)
else:
complexity = 0
results.append({
'模型': name,
'训练分数': train_score,
'测试分数': test_score,
'过拟合程度': train_score - test_score,
'模型复杂度': complexity,
'训练时间': train_time,
'预测时间': predict_time
})
results_df = pd.DataFrame(results)
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# 复杂度 vs 性能
axes[0, 0].scatter(results_df['模型复杂度'], results_df['测试分数'],
s=100, alpha=0.7)
for i, row in results_df.iterrows():
axes[0, 0].annotate(row['模型'],
(row['模型复杂度'], row['测试分数']),
xytext=(5, 5), textcoords='offset points')
axes[0, 0].set_xlabel('模型复杂度')
axes[0, 0].set_ylabel('测试分数')
axes[0, 0].set_title('模型复杂度 vs 测试性能')
axes[0, 0].grid(True, alpha=0.3)
# 复杂度 vs 过拟合
axes[0, 1].scatter(results_df['模型复杂度'], results_df['过拟合程度'],
s=100, alpha=0.7, color='red')
for i, row in results_df.iterrows():
axes[0, 1].annotate(row['模型'],
(row['模型复杂度'], row['过拟合程度']),
xytext=(5, 5), textcoords='offset points')
axes[0, 1].set_xlabel('模型复杂度')
axes[0, 1].set_ylabel('过拟合程度')
axes[0, 1].set_title('模型复杂度 vs 过拟合程度')
axes[0, 1].grid(True, alpha=0.3)
# 训练时间比较
axes[1, 0].bar(range(len(results_df)), results_df['训练时间'], alpha=0.8)
axes[1, 0].set_xlabel('模型')
axes[1, 0].set_ylabel('训练时间 (秒)')
axes[1, 0].set_title('模型训练时间比较')
axes[1, 0].set_xticks(range(len(results_df)))
axes[1, 0].set_xticklabels(results_df['模型'], rotation=45)
axes[1, 0].grid(True, alpha=0.3)
# 预测时间比较
axes[1, 1].bar(range(len(results_df)), results_df['预测时间'],
alpha=0.8, color='orange')
axes[1, 1].set_xlabel('模型')
axes[1, 1].set_ylabel('预测时间 (秒)')
axes[1, 1].set_title('模型预测时间比较')
axes[1, 1].set_xticks(range(len(results_df)))
axes[1, 1].set_xticklabels(results_df['模型'], rotation=45)
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("模型复杂度分析结果:")
print(results_df.round(4))
return results_df
# 模型选择准则演示
model_selection = ModelSelectionCriteria()
print("\n模型选择准则演示:")
print("=" * 30)
# 信息准则
info_criteria_results = model_selection.information_criteria_demo()
# 统计显著性检验
cv_scores, p_values = model_selection.statistical_significance_testing()
# 模型复杂度分析
complexity_results = model_selection.model_complexity_analysis()
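需要补充的是,K折交叉验证各折的训练集高度重叠,折间分数并不独立,标准配对t检验会低估方差、夸大显著性。一个常见的替代是 Nadeau-Bengio 校正的重采样t检验。下面是一个最小草图(数据与折数沿用上文设定,校正项按每折近似的训练/测试样本数计算,属于简化假设):
import numpy as np
from scipy import stats
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

X, y = make_classification(n_samples=500, n_features=20, n_informative=10,
                           n_redundant=10, random_state=42)
k = 10
scores_a = cross_val_score(LogisticRegression(max_iter=1000, random_state=42), X, y, cv=k)
scores_b = cross_val_score(RandomForestClassifier(n_estimators=100, random_state=42), X, y, cv=k)

d = scores_a - scores_b            # 每折的分数差
n_test = len(y) // k               # 每折测试样本数(近似)
n_train = len(y) - n_test          # 每折训练样本数(近似)
# 校正方差:在朴素的 1/k 之外加入 n_test/n_train 项,补偿折间训练集重叠带来的相关性
corrected_var = (1 / k + n_test / n_train) * np.var(d, ddof=1)
t_stat = d.mean() / np.sqrt(corrected_var)
p_val = 2 * stats.t.sf(np.abs(t_stat), df=k - 1)
print(f"校正后 t = {t_stat:.3f}, p = {p_val:.4f}")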
5.8 实际案例:信用卡欺诈检测
5.8.1 案例背景
class CreditCardFraudDetection:
"""
信用卡欺诈检测完整案例
"""
def __init__(self):
self.models = {}
self.results = {}
self.best_model = None
def create_imbalanced_dataset(self):
"""
创建不平衡的信用卡欺诈数据集
"""
# 创建不平衡数据集(模拟信用卡欺诈检测)
X, y = make_classification(
n_samples=10000,
n_features=30,
n_informative=20,
n_redundant=10,
n_clusters_per_class=1,
weights=[0.99, 0.01], # 99%正常,1%欺诈
flip_y=0.01,
random_state=42
)
# 添加特征名称
feature_names = [f'feature_{i}' for i in range(X.shape[1])]
print(f"数据集信息:")
print(f"样本数量: {X.shape[0]}")
print(f"特征数量: {X.shape[1]}")
print(f"正常交易: {np.sum(y == 0)} ({np.sum(y == 0)/len(y)*100:.2f}%)")
print(f"欺诈交易: {np.sum(y == 1)} ({np.sum(y == 1)/len(y)*100:.2f}%)")
return X, y, feature_names
def comprehensive_evaluation(self):
"""
综合评估流程
"""
# 创建数据集
X, y, feature_names = self.create_imbalanced_dataset()
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# 标准化特征
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 定义候选模型(补充本章开头未导入的类)
from sklearn.ensemble import GradientBoostingClassifier
from xgboost import XGBClassifier  # 第三方库,需先安装 xgboost
models = {
'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
'SVM': SVC(probability=True, random_state=42),
'Gradient Boosting': GradientBoostingClassifier(random_state=42),
'XGBoost': XGBClassifier(random_state=42, eval_metric='logloss')
}
# 评估指标
scoring_metrics = ['accuracy', 'precision', 'recall', 'f1', 'roc_auc']
evaluation_results = []
print("\n开始模型评估...")
print("=" * 50)
for name, model in models.items():
print(f"\n评估 {name}...")
# 交叉验证(注意:scaler 已在整个训练集上拟合,各折内部存在轻微的预处理泄露;
# 更严格的做法是把 StandardScaler 放进 Pipeline,见5.9节的示例)
cv_results = {}
for metric in scoring_metrics:
scores = cross_val_score(model, X_train_scaled, y_train,
cv=5, scoring=metric)
cv_results[f'cv_{metric}'] = scores.mean()
cv_results[f'cv_{metric}_std'] = scores.std()
# 训练模型
model.fit(X_train_scaled, y_train)
# 预测
y_pred = model.predict(X_test_scaled)
y_pred_proba = model.predict_proba(X_test_scaled)[:, 1]
# 计算测试集指标
test_results = {
'test_accuracy': accuracy_score(y_test, y_pred),
'test_precision': precision_score(y_test, y_pred),
'test_recall': recall_score(y_test, y_pred),
'test_f1': f1_score(y_test, y_pred),
'test_roc_auc': roc_auc_score(y_test, y_pred_proba)
}
# 合并结果
model_results = {'model': name, **cv_results, **test_results}
evaluation_results.append(model_results)
# 存储模型
self.models[name] = model
print(f"CV AUC: {cv_results['cv_roc_auc']:.4f} (±{cv_results['cv_roc_auc_std']:.4f})")
print(f"Test AUC: {test_results['test_roc_auc']:.4f}")
print(f"Test F1: {test_results['test_f1']:.4f}")
# 转换为DataFrame
results_df = pd.DataFrame(evaluation_results)
# 可视化结果
self.visualize_model_comparison(results_df)
# 选择最佳模型(基于测试集AUC;严格流程中应基于CV分数选择,见5.9节)
best_model_idx = results_df['test_roc_auc'].idxmax()
best_model_name = results_df.loc[best_model_idx, 'model']
self.best_model = self.models[best_model_name]
print(f"\n最佳模型: {best_model_name}")
print(f"最佳AUC: {results_df.loc[best_model_idx, 'test_roc_auc']:.4f}")
return results_df, X_test_scaled, y_test, scaler
def visualize_model_comparison(self, results_df):
"""
可视化模型比较
"""
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
metrics = ['accuracy', 'precision', 'recall', 'f1', 'roc_auc']
for i, metric in enumerate(metrics):
row = i // 3
col = i % 3
cv_scores = results_df[f'cv_{metric}']
test_scores = results_df[f'test_{metric}']
x = np.arange(len(results_df))
width = 0.35
axes[row, col].bar(x - width/2, cv_scores, width,
label='CV', alpha=0.8)
axes[row, col].bar(x + width/2, test_scores, width,
label='Test', alpha=0.8)
axes[row, col].set_xlabel('模型')
axes[row, col].set_ylabel(metric.upper())
axes[row, col].set_title(f'{metric.upper()} 比较')
axes[row, col].set_xticks(x)
axes[row, col].set_xticklabels(results_df['model'], rotation=45)
axes[row, col].legend()
axes[row, col].grid(True, alpha=0.3)
# 删除多余的子图
axes[1, 2].remove()
plt.tight_layout()
plt.show()
def detailed_analysis(self, X_test, y_test):
"""
详细分析最佳模型
"""
if self.best_model is None:
print("请先运行 comprehensive_evaluation()")
return
from sklearn.metrics import auc  # 补充导入:通用曲线下面积积分函数
# 预测
y_pred = self.best_model.predict(X_test)
y_pred_proba = self.best_model.predict_proba(X_test)[:, 1]
# 详细分类报告
print("\n详细分类报告:")
print("=" * 30)
print(classification_report(y_test, y_pred,
target_names=['正常', '欺诈']))
# 混淆矩阵
cm = confusion_matrix(y_test, y_pred)
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
# 混淆矩阵热力图
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=['正常', '欺诈'],
yticklabels=['正常', '欺诈'], ax=axes[0])
axes[0].set_title('混淆矩阵')
axes[0].set_xlabel('预测标签')
axes[0].set_ylabel('真实标签')
# ROC曲线
fpr, tpr, _ = roc_curve(y_test, y_pred_proba)
roc_auc = auc(fpr, tpr)
axes[1].plot(fpr, tpr, color='darkorange', lw=2,
label=f'ROC curve (AUC = {roc_auc:.4f})')
axes[1].plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
axes[1].set_xlim([0.0, 1.0])
axes[1].set_ylim([0.0, 1.05])
axes[1].set_xlabel('假正率 (FPR)')
axes[1].set_ylabel('真正率 (TPR)')
axes[1].set_title('ROC曲线')
axes[1].legend(loc="lower right")
axes[1].grid(True, alpha=0.3)
# 精确率-召回率曲线
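# 注:对PR曲线做梯形积分在采样点稀疏时可能偏乐观,
# sklearn 提供的 average_precision_score 是更常用的PR-AUC估计方式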
precision, recall, _ = precision_recall_curve(y_test, y_pred_proba)
pr_auc = auc(recall, precision)
axes[2].plot(recall, precision, color='blue', lw=2,
label=f'PR curve (AUC = {pr_auc:.4f})')
axes[2].set_xlabel('召回率')
axes[2].set_ylabel('精确率')
axes[2].set_title('精确率-召回率曲线')
axes[2].legend(loc="lower left")
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 阈值分析
self.threshold_analysis(y_test, y_pred_proba)
def threshold_analysis(self, y_test, y_pred_proba):
"""
阈值分析
"""
thresholds = np.arange(0.1, 1.0, 0.05)
threshold_results = []
for threshold in thresholds:
y_pred_thresh = (y_pred_proba >= threshold).astype(int)
precision = precision_score(y_test, y_pred_thresh)
recall = recall_score(y_test, y_pred_thresh)
f1 = f1_score(y_test, y_pred_thresh)
threshold_results.append({
'threshold': threshold,
'precision': precision,
'recall': recall,
'f1': f1
})
threshold_df = pd.DataFrame(threshold_results)
# 可视化阈值分析
plt.figure(figsize=(12, 8))
plt.subplot(2, 2, 1)
plt.plot(threshold_df['threshold'], threshold_df['precision'], 'o-', label='精确率')
plt.plot(threshold_df['threshold'], threshold_df['recall'], 'o-', label='召回率')
plt.plot(threshold_df['threshold'], threshold_df['f1'], 'o-', label='F1分数')
plt.xlabel('阈值')
plt.ylabel('分数')
plt.title('阈值 vs 性能指标')
plt.legend()
plt.grid(True, alpha=0.3)
# 找到最佳F1阈值
best_f1_idx = threshold_df['f1'].idxmax()
best_threshold = threshold_df.loc[best_f1_idx, 'threshold']
best_f1 = threshold_df.loc[best_f1_idx, 'f1']
plt.subplot(2, 2, 2)
plt.plot(threshold_df['threshold'], threshold_df['f1'], 'o-', color='red')
plt.axvline(x=best_threshold, color='blue', linestyle='--',
label=f'最佳阈值: {best_threshold:.2f}')
plt.xlabel('阈值')
plt.ylabel('F1分数')
plt.title('F1分数 vs 阈值')
plt.legend()
plt.grid(True, alpha=0.3)
# 精确率-召回率权衡
plt.subplot(2, 2, 3)
plt.plot(threshold_df['recall'], threshold_df['precision'], 'o-')
plt.xlabel('召回率')
plt.ylabel('精确率')
plt.title('精确率-召回率权衡')
plt.grid(True, alpha=0.3)
# 业务成本分析(假设)
plt.subplot(2, 2, 4)
# 假设:漏检一个欺诈的成本是误报的10倍
fn_cost = 1000 # 漏检成本
fp_cost = 100 # 误报成本
costs = []
for _, row in threshold_df.iterrows():
y_pred_thresh = (y_pred_proba >= row['threshold']).astype(int)
cm = confusion_matrix(y_test, y_pred_thresh)
if cm.shape == (2, 2):
tn, fp, fn, tp = cm.ravel()
total_cost = fn * fn_cost + fp * fp_cost
costs.append(total_cost)
else:
costs.append(float('inf'))
plt.plot(threshold_df['threshold'], costs, 'o-', color='purple')
plt.xlabel('阈值')
plt.ylabel('总成本')
plt.title('业务成本 vs 阈值')
plt.grid(True, alpha=0.3)
# 找到最低成本阈值
min_cost_idx = int(np.argmin(costs))
min_cost_threshold = threshold_df['threshold'].iloc[min_cost_idx]  # 用位置索引,避免标签索引歧义
plt.axvline(x=min_cost_threshold, color='red', linestyle='--',
label=f'最低成本阈值: {min_cost_threshold:.2f}')
plt.legend()
plt.tight_layout()
plt.show()
print(f"\n阈值分析结果:")
print(f"最佳F1阈值: {best_threshold:.3f} (F1 = {best_f1:.4f})")
print(f"最低成本阈值: {min_cost_threshold:.3f} (成本 = {min(costs):.0f})")
return threshold_df
def feature_importance_analysis(self, X_test, feature_names):
"""
特征重要性分析
"""
if self.best_model is None:
print("请先运行 comprehensive_evaluation()")
return
# 获取特征重要性
if hasattr(self.best_model, 'feature_importances_'):
importances = self.best_model.feature_importances_
elif hasattr(self.best_model, 'coef_'):
importances = np.abs(self.best_model.coef_[0])
else:
print("该模型不支持特征重要性分析")
return
# 创建特征重要性DataFrame
feature_importance_df = pd.DataFrame({
'feature': feature_names,
'importance': importances
}).sort_values('importance', ascending=False)
# 可视化特征重要性
plt.figure(figsize=(12, 8))
# 前20个最重要特征
top_features = feature_importance_df.head(20)
plt.subplot(1, 2, 1)
plt.barh(range(len(top_features)), top_features['importance'])
plt.yticks(range(len(top_features)), top_features['feature'])
plt.xlabel('重要性')
plt.title('前20个最重要特征')
plt.gca().invert_yaxis()
# 累积重要性
plt.subplot(1, 2, 2)
cumsum_importance = np.cumsum(feature_importance_df['importance'].values)  # 转为数组,保证下文的 [-1] 取末元素正确
plt.plot(range(1, len(cumsum_importance) + 1),
cumsum_importance / cumsum_importance[-1], 'o-')
plt.axhline(y=0.8, color='red', linestyle='--', label='80%重要性')
plt.axhline(y=0.9, color='orange', linestyle='--', label='90%重要性')
plt.xlabel('特征数量')
plt.ylabel('累积重要性比例')
plt.title('累积特征重要性')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 找到达到80%重要性的特征数量
n_features_80 = np.argmax(cumsum_importance / cumsum_importance[-1] >= 0.8) + 1
n_features_90 = np.argmax(cumsum_importance / cumsum_importance[-1] >= 0.9) + 1
print(f"\n特征重要性分析:")
print(f"达到80%重要性需要: {n_features_80} 个特征")
print(f"达到90%重要性需要: {n_features_90} 个特征")
print(f"\n前10个最重要特征:")
for i, row in feature_importance_df.head(10).iterrows():
print(f"{row['feature']}: {row['importance']:.4f}")
return feature_importance_df
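当最佳模型既没有 feature_importances_ 也没有 coef_(例如核SVM)时,上面的方法会直接放弃。一个与模型无关的替代是置换重要性:在测试集上逐列打乱某个特征,看分数平均下降多少。下面是一个最小草图(数据为示例构造,不依赖上文的欺诈数据):
from sklearn.datasets import make_classification
from sklearn.inspection import permutation_importance
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC

X, y = make_classification(n_samples=500, n_features=10, n_informative=5,
                           n_redundant=5, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.3, random_state=42)
model = SVC(random_state=42).fit(X_tr, y_tr)

# 每个特征独立打乱10次,记录准确率的平均下降量及波动
result = permutation_importance(model, X_te, y_te, n_repeats=10, random_state=42)
for i in result.importances_mean.argsort()[::-1][:5]:
    print(f"feature_{i}: {result.importances_mean[i]:.4f} (±{result.importances_std[i]:.4f})")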
# 信用卡欺诈检测案例演示
fraud_detection = CreditCardFraudDetection()
print("\n信用卡欺诈检测案例:")
print("=" * 40)
# 综合评估
results_df, X_test, y_test, scaler = fraud_detection.comprehensive_evaluation()
# 详细分析
fraud_detection.detailed_analysis(X_test, y_test)
# 特征重要性分析
feature_names = [f'feature_{i}' for i in range(X_test.shape[1])]
feature_importance = fraud_detection.feature_importance_analysis(X_test, feature_names)
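在类别极不平衡的场景下,除了选对指标,还常用代价敏感学习把不平衡直接写进损失函数。下面是一个最小草图,比较默认逻辑回归与 class_weight='balanced'(按类别频率的倒数加权)在召回率上的差异;数据生成参数沿用上文案例,加权只是众多处理手段之一:
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_score, recall_score
from sklearn.model_selection import train_test_split

X, y = make_classification(n_samples=10000, n_features=30, n_informative=20,
                           n_redundant=10, weights=[0.99, 0.01],
                           flip_y=0.01, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2,
                                          random_state=42, stratify=y)
for cw in [None, 'balanced']:
    clf = LogisticRegression(max_iter=1000, class_weight=cw, random_state=42)
    clf.fit(X_tr, y_tr)
    y_pred = clf.predict(X_te)
    print(f"class_weight={cw}: 召回率={recall_score(y_te, y_pred):.3f}, "
          f"精确率={precision_score(y_te, y_pred, zero_division=0):.3f}")
通常可以看到加权后召回率明显上升而精确率下降;上文的阈值分析同样适用于加权后的模型。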
5.9 最佳实践与建议
5.9.1 模型评估最佳实践
class ModelEvaluationBestPractices:
"""
模型评估最佳实践指南
"""
@staticmethod
def evaluation_checklist():
"""
模型评估检查清单
"""
checklist = {
"数据准备": [
"✓ 检查数据质量和完整性",
"✓ 处理缺失值和异常值",
"✓ 确保训练/验证/测试集的代表性",
"✓ 检查数据泄露问题",
"✓ 考虑时间序列数据的时间顺序"
],
"评估策略": [
"✓ 选择合适的交叉验证策略",
"✓ 使用分层采样处理不平衡数据",
"✓ 设置合适的随机种子确保可重复性",
"✓ 考虑计算资源和时间限制",
"✓ 使用多个评估指标"
],
"指标选择": [
"✓ 根据业务目标选择主要指标",
"✓ 考虑类别不平衡的影响",
"✓ 使用置信区间评估指标稳定性",
"✓ 比较多个模型的统计显著性",
"✓ 考虑模型的解释性需求"
],
"模型选择": [
"✓ 避免在测试集上进行模型选择",
"✓ 使用嵌套交叉验证进行无偏估计",
"✓ 考虑模型复杂度和泛化能力",
"✓ 评估模型的鲁棒性",
"✓ 考虑部署和维护成本"
],
"结果报告": [
"✓ 报告所有相关指标和置信区间",
"✓ 提供详细的实验设置",
"✓ 分析模型的失败案例",
"✓ 讨论模型的局限性",
"✓ 提供可重现的代码和数据"
]
}
print("模型评估最佳实践检查清单:")
print("=" * 40)
for category, items in checklist.items():
print(f"\n{category}:")
for item in items:
print(f" {item}")
return checklist
@staticmethod
def common_pitfalls():
"""
常见陷阱和错误
"""
pitfalls = {
"数据泄露": {
"描述": "训练数据中包含了未来信息或目标变量的直接信息",
"例子": "在预测客户流失时使用了流失后的行为数据",
"解决方案": "严格按时间顺序分割数据,仔细检查特征工程过程"
},
"过度拟合验证集": {
"描述": "在验证集上反复调参导致模型过拟合验证集",
"例子": "多次在验证集上测试不同超参数组合",
"解决方案": "使用嵌套交叉验证或保留独立的测试集"
},
"不当的交叉验证": {
"描述": "在时间序列或聚类数据上使用随机交叉验证",
"例子": "对股价预测使用K-Fold而非时间序列分割",
"解决方案": "根据数据特性选择合适的交叉验证策略"
},
"忽略类别不平衡": {
"描述": "在不平衡数据上只关注准确率",
"例子": "在欺诈检测中只看准确率而忽略召回率",
"解决方案": "使用F1、AUC等适合不平衡数据的指标"
},
"统计显著性忽视": {
"描述": "比较模型时不考虑性能差异的统计显著性",
"例子": "认为0.85和0.84的AUC有实质性差异",
"解决方案": "使用配对t检验等统计方法验证差异显著性"
}
}
print("\n常见陷阱和解决方案:")
print("=" * 40)
for pitfall, details in pitfalls.items():
print(f"\n{pitfall}:")
print(f" 描述: {details['描述']}")
print(f" 例子: {details['例子']}")
print(f" 解决方案: {details['解决方案']}")
return pitfalls
@staticmethod
def metric_selection_guide():
"""
指标选择指南
"""
guide = {
"分类问题": {
"平衡数据": {
"主要指标": ["准确率", "F1分数"],
"辅助指标": ["精确率", "召回率", "AUC"],
"适用场景": "各类别样本数量相近"
},
"不平衡数据": {
"主要指标": ["F1分数", "AUC", "平均精确率"],
"辅助指标": ["精确率", "召回率", "特异性"],
"适用场景": "少数类别样本稀少但重要"
},
"多分类": {
"主要指标": ["宏平均F1", "加权平均F1"],
"辅助指标": ["微平均F1", "每类别F1"],
"适用场景": "多个类别需要同等重视"
}
},
"回归问题": {
"一般回归": {
"主要指标": ["RMSE", "MAE"],
"辅助指标": ["R²", "MAPE"],
"适用场景": "预测连续数值"
},
"比例预测": {
"主要指标": ["MAPE", "SMAPE"],
"辅助指标": ["MAE", "RMSE"],
"适用场景": "预测比例、百分比等"
},
"时间序列": {
"主要指标": ["MASE", "sMAPE"],
"辅助指标": ["MAE", "RMSE"],
"适用场景": "时间序列预测"
}
}
}
print("\n指标选择指南:")
print("=" * 40)
for problem_type, scenarios in guide.items():
print(f"\n{problem_type}:")
for scenario, details in scenarios.items():
print(f" {scenario}:")
print(f" 主要指标: {', '.join(details['主要指标'])}")
print(f" 辅助指标: {', '.join(details['辅助指标'])}")
print(f" 适用场景: {details['适用场景']}")
return guide
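作为上面多分类部分的直观补充:宏平均先对每个类别分别计算F1再取简单平均(各类同权),微平均先汇总所有样本的TP/FP/FN再计算F1(各样本同权),加权平均按各类样本数加权。下面用一组为演示构造的标签展示三者的差别:
from sklearn.metrics import f1_score

y_true = [0, 0, 0, 0, 0, 0, 1, 1, 2]   # 类别0占多数,类别2非常稀少
y_pred = [0, 0, 0, 0, 0, 1, 1, 0, 1]   # 类别2完全没有被预测出来

for avg in ['macro', 'micro', 'weighted']:
    print(f"{avg:>8} F1 = {f1_score(y_true, y_pred, average=avg):.3f}")
# 宏平均被稀少类别的0分显著拉低,微平均则主要反映多数类的表现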
# 最佳实践演示
best_practices = ModelEvaluationBestPractices()
print("\n模型评估最佳实践:")
print("=" * 40)
# 检查清单
checklist = best_practices.evaluation_checklist()
# 常见陷阱
pitfalls = best_practices.common_pitfalls()
# 指标选择指南
metric_guide = best_practices.metric_selection_guide()
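上面"数据泄露"这一条可以落到一段很短的代码上:把标准化等预处理放进 Pipeline 再交给交叉验证,预处理就只会在每折的训练部分上拟合,从源头避免"先全量标准化、再分折"的泄露。下面是一个最小草图:
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=500, n_features=20, random_state=42)

# 错误做法(示意):先 scaler.fit_transform(X) 再做CV,验证折的统计量会泄露进训练
# 正确做法:预处理作为管道的一环,随每折重新拟合
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression(max_iter=1000, random_state=42))
])
scores = cross_val_score(pipe, X, y, cv=5, scoring='accuracy')
print(f"无泄露的CV准确率: {scores.mean():.4f} (±{scores.std():.4f})")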
5.10 本章小结
核心内容回顾
本章深入探讨了机器学习中的模型评估与选择,涵盖了以下核心内容:
1. 评估指标体系
   - 分类指标:准确率、精确率、召回率、F1分数、AUC-ROC、AUC-PR
   - 回归指标:MSE、RMSE、MAE、R²、MAPE
   - 多分类指标:宏平均、微平均、加权平均
2. 交叉验证技术
   - 基础方法:K-Fold、Stratified K-Fold、Leave-One-Out
   - 高级技术:时间序列分割、分组交叉验证、嵌套交叉验证(示意代码见下)
   - 应用场景:不平衡数据、时间序列、聚类数据
3. 学习曲线分析
   - 学习曲线:诊断过拟合和欠拟合
   - 验证曲线:超参数对性能的影响
   - 偏差-方差分析:理解模型误差来源
4. 超参数优化
   - 搜索策略:网格搜索、随机搜索、贝叶斯优化
   - 自动化ML:模型选择、集成优化
   - 效率考虑:计算成本与性能平衡
5. 模型选择准则
   - 信息准则:AIC、BIC
   - 统计检验:配对t检验、显著性分析
   - 复杂度分析:模型复杂度与性能权衡
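其中最容易被忽略的是嵌套交叉验证:外层循环只负责估计泛化性能,内层循环只负责选超参数,两者互不污染。下面是一个最小草图(参数网格与折数均为示例假设):
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV, cross_val_score
from sklearn.svm import SVC

X, y = make_classification(n_samples=500, n_features=20, random_state=42)

inner_search = GridSearchCV(
    SVC(random_state=42),
    param_grid={'C': [0.1, 1, 10], 'gamma': ['scale', 'auto']},
    cv=3   # 内层:只用于超参数选择
)
outer_scores = cross_val_score(inner_search, X, y, cv=5)   # 外层:近似无偏的性能估计
print(f"嵌套CV准确率: {outer_scores.mean():.4f} (±{outer_scores.std():.4f})")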
实践技能
通过本章学习,你应该掌握:
- 评估策略设计:根据问题特点选择合适的评估方法
- 指标解释能力:理解不同指标的含义和适用场景
- 交叉验证实施:正确实施各种交叉验证技术
- 超参数调优:高效地搜索最优超参数
- 模型比较分析:科学地比较不同模型的性能
- 结果可视化:清晰地展示评估结果
最佳实践
- 避免数据泄露:严格控制信息流向
- 合理分割数据:确保训练/验证/测试集的代表性
- 多指标评估:不依赖单一指标做决策
- 统计显著性:验证模型差异的可靠性
- 业务导向:将技术指标与业务目标对齐
下一章预告
下一章我们将学习集成学习方法,包括:
- Bagging和Boosting算法
- 随机森林和梯度提升
- 投票和堆叠集成
- 集成学习的理论基础
- 实际应用案例
练习题
基础题
1. 解释精确率和召回率的区别,什么情况下应该优化哪个指标?
2. 为什么在不平衡数据集上准确率可能不是好的评估指标?
3. K-Fold交叉验证和留一法交叉验证各有什么优缺点?
进阶题
4. 设计一个实验比较网格搜索和随机搜索的效率和效果
5. 实现一个自定义的交叉验证策略用于时间序列数据
6. 分析学习曲线,判断模型是否存在过拟合或欠拟合问题
项目题
7. 选择一个实际数据集,实施完整的模型评估流程
8. 比较至少5种不同算法在你选择的数据集上的性能
9. 进行超参数优化,并分析不同超参数对模型性能的影响
思考题
10. 在什么情况下应该使用嵌套交叉验证?
11. 如何在模型性能和计算效率之间找到平衡?
12. 讨论模型评估中可能遇到的伦理和公平性问题