本章概述
分类是监督学习中最重要的任务之一,目标是根据输入特征预测样本的类别标签。本章将深入介绍Scikit-learn中的主要分类算法,包括它们的原理、实现、参数调优和应用场景。
学习目标
- 理解分类问题的基本概念和评估指标
- 掌握主要分类算法的原理和实现
- 学会选择合适的分类算法
- 掌握分类模型的调优技巧
- 了解分类算法的优缺点和适用场景
1. 分类基础
1.1 分类问题概述
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_classification, load_iris, load_wine, load_breast_cancer
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
class ClassificationBasics:
"""分类基础知识演示"""
def __init__(self):
self.datasets = {}
self.models = {}
def create_sample_datasets(self):
"""创建示例数据集"""
# 二分类数据集
X_binary, y_binary = make_classification(
n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, n_classes=2, random_state=42
)
# 多分类数据集
X_multi, y_multi = make_classification(
n_samples=1000, n_features=20, n_informative=15,
n_redundant=5, n_classes=3, random_state=42
)
# 不平衡数据集
X_imbalanced, y_imbalanced = make_classification(
n_samples=1000, n_features=20, n_informative=10,
n_redundant=10, n_classes=2, weights=[0.9, 0.1], random_state=42
)
self.datasets = {
'binary': (X_binary, y_binary),
'multi': (X_multi, y_multi),
'imbalanced': (X_imbalanced, y_imbalanced)
}
print("=== 数据集创建完成 ===")
for name, (X, y) in self.datasets.items():
print(f"{name}: 样本数={X.shape[0]}, 特征数={X.shape[1]}, 类别数={len(np.unique(y))}")
print(f" 类别分布: {np.bincount(y)}")
return self.datasets
def load_real_datasets(self):
"""加载真实数据集"""
# 鸢尾花数据集(多分类)
iris = load_iris()
# 红酒数据集(多分类)
wine = load_wine()
# 乳腺癌数据集(二分类)
cancer = load_breast_cancer()
real_datasets = {
'iris': (iris.data, iris.target, iris.feature_names, iris.target_names),
'wine': (wine.data, wine.target, wine.feature_names, wine.target_names),
'cancer': (cancer.data, cancer.target, cancer.feature_names, cancer.target_names)
}
print("\n=== 真实数据集信息 ===")
for name, (X, y, feature_names, target_names) in real_datasets.items():
print(f"{name}: 样本数={X.shape[0]}, 特征数={X.shape[1]}")
print(f" 类别: {target_names}")
print(f" 类别分布: {np.bincount(y)}")
return real_datasets
def visualize_classification_data(self, X, y, title="分类数据可视化"):
"""可视化分类数据"""
# 使用PCA降维到2D进行可视化
from sklearn.decomposition import PCA
if X.shape[1] > 2:
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X)
explained_var = pca.explained_variance_ratio_
else:
X_pca = X
            explained_var = [1.0, 1.0]  # 数据本身已是二维,未经PCA,此处仅作坐标轴标签占位
plt.figure(figsize=(10, 8))
# 绘制不同类别的点
unique_labels = np.unique(y)
colors = plt.cm.Set1(np.linspace(0, 1, len(unique_labels)))
for i, label in enumerate(unique_labels):
mask = y == label
plt.scatter(X_pca[mask, 0], X_pca[mask, 1],
c=[colors[i]], label=f'类别 {label}', alpha=0.7, s=50)
plt.xlabel(f'第一主成分 (解释方差: {explained_var[0]:.2%})')
plt.ylabel(f'第二主成分 (解释方差: {explained_var[1]:.2%})')
plt.title(title)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return X_pca
# 演示分类基础
classification_demo = ClassificationBasics()
# 创建示例数据集
sample_datasets = classification_demo.create_sample_datasets()
# 加载真实数据集
real_datasets = classification_demo.load_real_datasets()
# 可视化数据
X_binary, y_binary = sample_datasets['binary']
classification_demo.visualize_classification_data(X_binary, y_binary, "二分类数据可视化")
X_multi, y_multi = sample_datasets['multi']
classification_demo.visualize_classification_data(X_multi, y_multi, "多分类数据可视化")
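# 补充示例(演示性质的最小代码): 在动手建模前,可先用 DummyClassifier 建立一个
# "多数类"基线;后续任何模型的准确率都应明显高于该基线才有意义。
# 这里沿用上面创建的不平衡数据集 sample_datasets['imbalanced']
from sklearn.dummy import DummyClassifier
X_imb, y_imb = sample_datasets['imbalanced']
dummy = DummyClassifier(strategy='most_frequent')
dummy.fit(X_imb, y_imb)
print(f"不平衡数据集上的多数类基线准确率: {dummy.score(X_imb, y_imb):.4f}")  # 约0.9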
1.2 分类评估指标
class ClassificationMetrics:
"""分类评估指标详解"""
def __init__(self):
self.metrics_results = {}
def calculate_basic_metrics(self, y_true, y_pred, average='weighted'):
"""计算基础分类指标"""
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
f1_score, classification_report, confusion_matrix)
# 基础指标
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average=average, zero_division=0)
recall = recall_score(y_true, y_pred, average=average, zero_division=0)
f1 = f1_score(y_true, y_pred, average=average, zero_division=0)
metrics = {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1_score': f1
}
print(f"=== 基础分类指标 (average={average}) ===")
for metric, value in metrics.items():
print(f"{metric}: {value:.4f}")
# 详细分类报告
print(f"\n=== 详细分类报告 ===")
print(classification_report(y_true, y_pred))
# 混淆矩阵
cm = confusion_matrix(y_true, y_pred)
self.plot_confusion_matrix(cm, np.unique(y_true))
return metrics, cm
def plot_confusion_matrix(self, cm, class_names):
"""绘制混淆矩阵"""
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=class_names, yticklabels=class_names)
plt.title('混淆矩阵')
plt.xlabel('预测标签')
plt.ylabel('真实标签')
plt.tight_layout()
plt.show()
def calculate_advanced_metrics(self, y_true, y_pred_proba, y_pred):
"""计算高级分类指标"""
from sklearn.metrics import roc_auc_score, roc_curve, precision_recall_curve, average_precision_score
n_classes = len(np.unique(y_true))
if n_classes == 2:
# 二分类指标
# ROC-AUC
if y_pred_proba.ndim > 1:
auc = roc_auc_score(y_true, y_pred_proba[:, 1])
fpr, tpr, _ = roc_curve(y_true, y_pred_proba[:, 1])
else:
auc = roc_auc_score(y_true, y_pred_proba)
fpr, tpr, _ = roc_curve(y_true, y_pred_proba)
# Precision-Recall AUC
if y_pred_proba.ndim > 1:
ap = average_precision_score(y_true, y_pred_proba[:, 1])
precision_curve, recall_curve, _ = precision_recall_curve(y_true, y_pred_proba[:, 1])
else:
ap = average_precision_score(y_true, y_pred_proba)
precision_curve, recall_curve, _ = precision_recall_curve(y_true, y_pred_proba)
print(f"=== 二分类高级指标 ===")
print(f"ROC-AUC: {auc:.4f}")
print(f"Average Precision: {ap:.4f}")
# 绘制ROC和PR曲线
self.plot_roc_pr_curves(fpr, tpr, auc, precision_curve, recall_curve, ap)
else:
# 多分类指标
try:
auc_ovr = roc_auc_score(y_true, y_pred_proba, multi_class='ovr', average='weighted')
auc_ovo = roc_auc_score(y_true, y_pred_proba, multi_class='ovo', average='weighted')
print(f"=== 多分类高级指标 ===")
print(f"ROC-AUC (OvR): {auc_ovr:.4f}")
print(f"ROC-AUC (OvO): {auc_ovo:.4f}")
except Exception as e:
print(f"多分类AUC计算失败: {e}")
def plot_roc_pr_curves(self, fpr, tpr, auc, precision, recall, ap):
"""绘制ROC和PR曲线"""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# ROC曲线
ax1.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC曲线 (AUC = {auc:.2f})')
ax1.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='随机分类器')
ax1.set_xlim([0.0, 1.0])
ax1.set_ylim([0.0, 1.05])
ax1.set_xlabel('假正率 (FPR)')
ax1.set_ylabel('真正率 (TPR)')
ax1.set_title('ROC曲线')
ax1.legend(loc="lower right")
ax1.grid(True, alpha=0.3)
# PR曲线
ax2.plot(recall, precision, color='darkorange', lw=2, label=f'PR曲线 (AP = {ap:.2f})')
ax2.set_xlim([0.0, 1.0])
ax2.set_ylim([0.0, 1.05])
ax2.set_xlabel('召回率 (Recall)')
ax2.set_ylabel('精确率 (Precision)')
ax2.set_title('Precision-Recall曲线')
ax2.legend(loc="lower left")
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def compare_models_metrics(self, models_results):
"""比较多个模型的指标"""
metrics_df = pd.DataFrame(models_results).T
print("=== 模型性能比较 ===")
print(metrics_df.round(4))
# 可视化比较
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
metrics = ['accuracy', 'precision', 'recall', 'f1_score']
for i, metric in enumerate(metrics):
ax = axes[i//2, i%2]
if metric in metrics_df.columns:
metrics_df[metric].plot(kind='bar', ax=ax, color='skyblue', alpha=0.7)
ax.set_title(f'{metric.upper()} 比较')
ax.set_ylabel(metric.upper())
ax.tick_params(axis='x', rotation=45)
ax.grid(True, alpha=0.3)
# 添加数值标签
for j, v in enumerate(metrics_df[metric]):
ax.text(j, v + 0.01, f'{v:.3f}', ha='center', va='bottom')
plt.tight_layout()
plt.show()
return metrics_df
# 演示评估指标
metrics_demo = ClassificationMetrics()
# 使用示例数据演示指标计算
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
X, y = sample_datasets['binary']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 训练一个简单模型
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
y_pred_proba = rf.predict_proba(X_test)
# 计算指标
basic_metrics, cm = metrics_demo.calculate_basic_metrics(y_test, y_pred)
metrics_demo.calculate_advanced_metrics(y_test, y_pred_proba, y_pred)
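# 补充示例(演示性质): 在前面创建的不平衡数据集上,单看准确率会高估模型表现,
# 应结合F1、balanced_accuracy等对少数类更敏感的指标综合判断
from sklearn.metrics import balanced_accuracy_score
X_imb, y_imb = sample_datasets['imbalanced']
X_tr_i, X_te_i, y_tr_i, y_te_i = train_test_split(
    X_imb, y_imb, test_size=0.3, random_state=42, stratify=y_imb)
rf_imb = RandomForestClassifier(n_estimators=100, random_state=42).fit(X_tr_i, y_tr_i)
y_pred_i = rf_imb.predict(X_te_i)
print(f"不平衡数据 - 准确率: {accuracy_score(y_te_i, y_pred_i):.4f}")
print(f"不平衡数据 - F1(少数类): {f1_score(y_te_i, y_pred_i):.4f}")
print(f"不平衡数据 - balanced_accuracy: {balanced_accuracy_score(y_te_i, y_pred_i):.4f}")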
2. 逻辑回归
2.1 逻辑回归原理与实现
class LogisticRegressionDemo:
"""逻辑回归演示"""
def __init__(self):
self.models = {}
self.results = {}
def logistic_regression_theory(self):
"""逻辑回归理论解释"""
print("=== 逻辑回归理论 ===")
print("1. Sigmoid函数: σ(z) = 1 / (1 + e^(-z))")
print("2. 线性组合: z = w₀ + w₁x₁ + w₂x₂ + ... + wₙxₙ")
print("3. 概率预测: P(y=1|x) = σ(w^T x)")
print("4. 损失函数: Log-likelihood (交叉熵)")
print("5. 优化方法: 梯度下降、牛顿法等")
# 可视化Sigmoid函数
self.plot_sigmoid_function()
def plot_sigmoid_function(self):
"""绘制Sigmoid函数"""
z = np.linspace(-10, 10, 100)
sigmoid = 1 / (1 + np.exp(-z))
plt.figure(figsize=(10, 6))
plt.plot(z, sigmoid, 'b-', linewidth=2, label='Sigmoid函数')
plt.axhline(y=0.5, color='r', linestyle='--', alpha=0.7, label='决策边界 (0.5)')
plt.axvline(x=0, color='g', linestyle='--', alpha=0.7, label='z=0')
plt.xlabel('z (线性组合)')
plt.ylabel('σ(z) (概率)')
plt.title('Sigmoid函数')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def basic_logistic_regression(self, X, y):
"""基础逻辑回归"""
from sklearn.linear_model import LogisticRegression
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 特征标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 训练模型
lr = LogisticRegression(random_state=42)
lr.fit(X_train_scaled, y_train)
# 预测
y_pred = lr.predict(X_test_scaled)
y_pred_proba = lr.predict_proba(X_test_scaled)
# 评估
accuracy = accuracy_score(y_test, y_pred)
print(f"=== 基础逻辑回归结果 ===")
print(f"准确率: {accuracy:.4f}")
print(f"模型系数: {lr.coef_[0][:5]}...") # 显示前5个系数
print(f"截距: {lr.intercept_[0]:.4f}")
self.models['basic_lr'] = lr
self.results['basic_lr'] = {
'accuracy': accuracy,
'y_test': y_test,
'y_pred': y_pred,
'y_pred_proba': y_pred_proba
}
return lr, accuracy
def regularized_logistic_regression(self, X, y):
"""正则化逻辑回归"""
from sklearn.linear_model import LogisticRegression
# 数据分割和标准化
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 不同正则化方法
regularization_methods = {
'L1 (Lasso)': LogisticRegression(penalty='l1', solver='liblinear', C=1.0, random_state=42),
'L2 (Ridge)': LogisticRegression(penalty='l2', solver='liblinear', C=1.0, random_state=42),
'Elastic Net': LogisticRegression(penalty='elasticnet', solver='saga', l1_ratio=0.5, C=1.0, random_state=42, max_iter=1000)
}
results = {}
print("=== 正则化逻辑回归比较 ===")
for name, model in regularization_methods.items():
try:
model.fit(X_train_scaled, y_train)
y_pred = model.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)
# 计算非零系数数量(特征选择效果)
non_zero_coefs = np.sum(np.abs(model.coef_[0]) > 1e-5)
results[name] = {
'accuracy': accuracy,
'non_zero_features': non_zero_coefs,
'model': model
}
print(f"{name}: 准确率={accuracy:.4f}, 非零特征数={non_zero_coefs}")
except Exception as e:
print(f"{name}: 训练失败 - {e}")
# 可视化系数比较
self.plot_regularization_coefficients(results, X.shape[1])
return results
def plot_regularization_coefficients(self, results, n_features):
"""可视化正则化系数"""
fig, axes = plt.subplots(1, len(results), figsize=(15, 5))
if len(results) == 1:
axes = [axes]
for i, (name, result) in enumerate(results.items()):
if 'model' in result:
coefs = result['model'].coef_[0]
feature_indices = range(len(coefs))
axes[i].bar(feature_indices, coefs, alpha=0.7)
axes[i].set_title(f'{name}\n非零特征: {result["non_zero_features"]}')
axes[i].set_xlabel('特征索引')
axes[i].set_ylabel('系数值')
axes[i].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def hyperparameter_tuning(self, X, y):
"""超参数调优"""
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
# 数据分割和标准化
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 参数网格
param_grid = {
'C': [0.001, 0.01, 0.1, 1, 10, 100],
'penalty': ['l1', 'l2'],
'solver': ['liblinear']
}
# 网格搜索
lr = LogisticRegression(random_state=42, max_iter=1000)
grid_search = GridSearchCV(lr, param_grid, cv=5, scoring='accuracy', n_jobs=-1)
grid_search.fit(X_train_scaled, y_train)
# 最佳模型
best_model = grid_search.best_estimator_
y_pred = best_model.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)
print(f"=== 超参数调优结果 ===")
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳交叉验证分数: {grid_search.best_score_:.4f}")
print(f"测试集准确率: {accuracy:.4f}")
# 可视化参数影响
self.plot_hyperparameter_effects(grid_search)
return best_model, grid_search.best_params_
def plot_hyperparameter_effects(self, grid_search):
"""可视化超参数效果"""
results_df = pd.DataFrame(grid_search.cv_results_)
# C参数的影响
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
for penalty in ['l1', 'l2']:
mask = results_df['param_penalty'] == penalty
subset = results_df[mask]
plt.semilogx(subset['param_C'], subset['mean_test_score'],
'o-', label=f'{penalty} 正则化')
plt.xlabel('C (正则化强度的倒数)')
plt.ylabel('交叉验证准确率')
plt.title('C参数对模型性能的影响')
plt.legend()
plt.grid(True, alpha=0.3)
# 热力图显示参数组合效果
plt.subplot(1, 2, 2)
pivot_table = results_df.pivot_table(values='mean_test_score',
index='param_penalty',
columns='param_C')
sns.heatmap(pivot_table, annot=True, fmt='.3f', cmap='viridis')
plt.title('参数组合效果热力图')
plt.tight_layout()
plt.show()
def decision_boundary_visualization(self, X, y):
"""决策边界可视化(仅适用于2D数据)"""
if X.shape[1] != 2:
# 使用PCA降维到2D
from sklearn.decomposition import PCA
pca = PCA(n_components=2)
X_2d = pca.fit_transform(X)
else:
X_2d = X
# 训练模型
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_2d)
        from sklearn.linear_model import LogisticRegression  # 方法内使用,需单独导入
        lr = LogisticRegression(random_state=42)
lr.fit(X_scaled, y)
# 创建网格
h = 0.02
x_min, x_max = X_scaled[:, 0].min() - 1, X_scaled[:, 0].max() + 1
y_min, y_max = X_scaled[:, 1].min() - 1, X_scaled[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
np.arange(y_min, y_max, h))
# 预测网格点
Z = lr.predict_proba(np.c_[xx.ravel(), yy.ravel()])[:, 1]
Z = Z.reshape(xx.shape)
# 绘制决策边界
plt.figure(figsize=(10, 8))
plt.contourf(xx, yy, Z, levels=50, alpha=0.8, cmap='RdYlBu')
plt.colorbar(label='预测概率')
# 绘制数据点
scatter = plt.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y, cmap='RdYlBu', edgecolors='black')
plt.xlabel('特征1 (标准化)')
plt.ylabel('特征2 (标准化)')
plt.title('逻辑回归决策边界')
plt.tight_layout()
plt.show()
# 演示逻辑回归
lr_demo = LogisticRegressionDemo()
# 理论解释
lr_demo.logistic_regression_theory()
# 基础逻辑回归
X_binary, y_binary = sample_datasets['binary']
basic_lr, basic_accuracy = lr_demo.basic_logistic_regression(X_binary, y_binary)
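# 补充示例(演示性质): 手动按 P(y=1|x) = σ(w^T x + b) 复现 predict_proba,
# 验证上面理论部分给出的公式;X_chk/y_chk 是从前文数据中取出的小样本
from sklearn.linear_model import LogisticRegression
X_chk, y_chk = X_binary[:200], y_binary[:200]
lr_chk = LogisticRegression(max_iter=1000).fit(X_chk, y_chk)
z = X_chk @ lr_chk.coef_.ravel() + lr_chk.intercept_[0]  # 线性组合 z = w^T x + b
manual_proba = 1 / (1 + np.exp(-z))                      # sigmoid
print("手动sigmoid结果与predict_proba是否一致:",
      np.allclose(manual_proba, lr_chk.predict_proba(X_chk)[:, 1]))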
# 正则化逻辑回归
regularization_results = lr_demo.regularized_logistic_regression(X_binary, y_binary)
# 超参数调优
best_lr, best_params = lr_demo.hyperparameter_tuning(X_binary, y_binary)
# 决策边界可视化
lr_demo.decision_boundary_visualization(X_binary, y_binary)
3. 决策树
3.1 决策树原理与实现
class DecisionTreeDemo:
"""决策树演示"""
def __init__(self):
self.models = {}
self.results = {}
def decision_tree_theory(self):
"""决策树理论解释"""
print("=== 决策树理论 ===")
print("1. 基本思想: 通过一系列if-else规则进行分类")
print("2. 分裂准则:")
print(" - 基尼不纯度: Gini = 1 - Σ(pi²)")
print(" - 信息熵: Entropy = -Σ(pi * log2(pi))")
print(" - 信息增益: IG = Entropy(parent) - Σ(weighted_entropy(children))")
print("3. 停止条件: 最大深度、最小样本数、最小信息增益等")
print("4. 剪枝: 预剪枝(训练时)和后剪枝(训练后)")
# 可视化分裂准则
self.plot_splitting_criteria()
def plot_splitting_criteria(self):
"""可视化分裂准则"""
p = np.linspace(0.01, 0.99, 100)
# 基尼不纯度
gini = 1 - p**2 - (1-p)**2
# 信息熵
entropy = -p * np.log2(p) - (1-p) * np.log2(1-p)
# 分类错误率
misclass = 1 - np.maximum(p, 1-p)
plt.figure(figsize=(12, 8))
plt.subplot(2, 2, 1)
plt.plot(p, gini, 'b-', linewidth=2, label='基尼不纯度')
plt.xlabel('正类比例 (p)')
plt.ylabel('基尼不纯度')
plt.title('基尼不纯度')
plt.grid(True, alpha=0.3)
plt.legend()
plt.subplot(2, 2, 2)
plt.plot(p, entropy, 'r-', linewidth=2, label='信息熵')
plt.xlabel('正类比例 (p)')
plt.ylabel('信息熵')
plt.title('信息熵')
plt.grid(True, alpha=0.3)
plt.legend()
plt.subplot(2, 2, 3)
plt.plot(p, misclass, 'g-', linewidth=2, label='分类错误率')
plt.xlabel('正类比例 (p)')
plt.ylabel('分类错误率')
plt.title('分类错误率')
plt.grid(True, alpha=0.3)
plt.legend()
plt.subplot(2, 2, 4)
plt.plot(p, gini, 'b-', linewidth=2, label='基尼不纯度')
plt.plot(p, entropy, 'r-', linewidth=2, label='信息熵')
plt.plot(p, misclass, 'g-', linewidth=2, label='分类错误率')
plt.xlabel('正类比例 (p)')
plt.ylabel('不纯度')
plt.title('分裂准则比较')
plt.grid(True, alpha=0.3)
plt.legend()
plt.tight_layout()
plt.show()
def basic_decision_tree(self, X, y):
"""基础决策树"""
from sklearn.tree import DecisionTreeClassifier
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 训练模型
dt = DecisionTreeClassifier(random_state=42)
dt.fit(X_train, y_train)
# 预测
y_pred = dt.predict(X_test)
y_pred_proba = dt.predict_proba(X_test)
# 评估
accuracy = accuracy_score(y_test, y_pred)
print(f"=== 基础决策树结果 ===")
print(f"准确率: {accuracy:.4f}")
print(f"树的深度: {dt.get_depth()}")
print(f"叶子节点数: {dt.get_n_leaves()}")
print(f"特征重要性前5: {sorted(zip(range(len(dt.feature_importances_)), dt.feature_importances_), key=lambda x: x[1], reverse=True)[:5]}")
self.models['basic_dt'] = dt
self.results['basic_dt'] = {
'accuracy': accuracy,
'y_test': y_test,
'y_pred': y_pred,
'y_pred_proba': y_pred_proba
}
return dt, accuracy
def compare_splitting_criteria(self, X, y):
"""比较不同分裂准则"""
from sklearn.tree import DecisionTreeClassifier
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 不同分裂准则
criteria = ['gini', 'entropy']
results = {}
print("=== 分裂准则比较 ===")
for criterion in criteria:
dt = DecisionTreeClassifier(criterion=criterion, random_state=42)
dt.fit(X_train, y_train)
y_pred = dt.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
results[criterion] = {
'model': dt,
'accuracy': accuracy,
'depth': dt.get_depth(),
'n_leaves': dt.get_n_leaves()
}
print(f"{criterion}: 准确率={accuracy:.4f}, 深度={dt.get_depth()}, 叶子数={dt.get_n_leaves()}")
return results
def pruning_demonstration(self, X, y):
"""剪枝演示"""
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import validation_curve
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 不同的剪枝参数
max_depths = range(1, 21)
min_samples_splits = [2, 5, 10, 20, 50]
min_samples_leafs = [1, 2, 5, 10, 20]
# 最大深度的影响
train_scores, val_scores = validation_curve(
DecisionTreeClassifier(random_state=42), X_train, y_train,
param_name='max_depth', param_range=max_depths,
cv=5, scoring='accuracy'
)
# 可视化剪枝效果
self.plot_pruning_effects(max_depths, train_scores, val_scores, 'max_depth', '最大深度')
# 找到最佳深度
val_mean = np.mean(val_scores, axis=1)
best_depth = max_depths[np.argmax(val_mean)]
print(f"=== 剪枝结果 ===")
print(f"最佳最大深度: {best_depth}")
print(f"最佳验证分数: {np.max(val_mean):.4f}")
# 使用最佳参数训练最终模型
best_dt = DecisionTreeClassifier(max_depth=best_depth, random_state=42)
best_dt.fit(X_train, y_train)
y_pred = best_dt.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"剪枝后测试准确率: {accuracy:.4f}")
return best_dt, best_depth
def plot_pruning_effects(self, param_range, train_scores, val_scores, param_name, param_label):
"""绘制剪枝效果"""
train_mean = np.mean(train_scores, axis=1)
train_std = np.std(train_scores, axis=1)
val_mean = np.mean(val_scores, axis=1)
val_std = np.std(val_scores, axis=1)
plt.figure(figsize=(10, 6))
plt.plot(param_range, train_mean, 'o-', color='blue', label='训练分数')
plt.fill_between(param_range, train_mean - train_std, train_mean + train_std, alpha=0.1, color='blue')
plt.plot(param_range, val_mean, 'o-', color='red', label='验证分数')
plt.fill_between(param_range, val_mean - val_std, val_mean + val_std, alpha=0.1, color='red')
plt.xlabel(param_label)
plt.ylabel('准确率')
plt.title(f'{param_label}对模型性能的影响')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def feature_importance_analysis(self, X, y, feature_names=None):
"""特征重要性分析"""
from sklearn.tree import DecisionTreeClassifier
# 训练模型
dt = DecisionTreeClassifier(random_state=42)
dt.fit(X, y)
# 获取特征重要性
importances = dt.feature_importances_
if feature_names is None:
feature_names = [f'特征_{i}' for i in range(X.shape[1])]
# 排序
indices = np.argsort(importances)[::-1]
print("=== 特征重要性排序 ===")
for i in range(min(10, len(indices))):
print(f"{i+1}. {feature_names[indices[i]]}: {importances[indices[i]]:.4f}")
# 可视化特征重要性
self.plot_feature_importance(importances, feature_names)
return importances, indices
def plot_feature_importance(self, importances, feature_names):
"""绘制特征重要性"""
# 选择前15个最重要的特征
indices = np.argsort(importances)[::-1][:15]
plt.figure(figsize=(12, 8))
plt.bar(range(len(indices)), importances[indices], alpha=0.7)
plt.xlabel('特征')
plt.ylabel('重要性')
plt.title('决策树特征重要性')
plt.xticks(range(len(indices)), [feature_names[i] for i in indices], rotation=45)
plt.tight_layout()
plt.show()
def visualize_tree_structure(self, X, y, max_depth=3):
"""可视化决策树结构"""
from sklearn.tree import DecisionTreeClassifier, plot_tree
# 训练一个浅层树便于可视化
dt = DecisionTreeClassifier(max_depth=max_depth, random_state=42)
dt.fit(X, y)
# 绘制树结构
plt.figure(figsize=(20, 10))
plot_tree(dt, filled=True, feature_names=[f'特征_{i}' for i in range(X.shape[1])],
class_names=[f'类别_{i}' for i in range(len(np.unique(y)))],
rounded=True, fontsize=10)
plt.title(f'决策树结构 (最大深度={max_depth})')
plt.tight_layout()
plt.show()
return dt
# 演示决策树
dt_demo = DecisionTreeDemo()
# 理论解释
dt_demo.decision_tree_theory()
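# 补充示例(演示性质): 按上面打印的公式,手动计算一个小样本的基尼不纯度与信息熵
def gini_impurity(labels):
    _, counts = np.unique(labels, return_counts=True)
    p = counts / counts.sum()
    return 1 - np.sum(p ** 2)
def entropy_impurity(labels):
    _, counts = np.unique(labels, return_counts=True)
    p = counts / counts.sum()
    return -np.sum(p * np.log2(p))
toy_labels = np.array([0, 0, 0, 1, 1])  # 3个负类、2个正类的玩具标签
print(f"基尼不纯度: {gini_impurity(toy_labels):.4f}")   # 1 - (0.6² + 0.4²) = 0.48
print(f"信息熵: {entropy_impurity(toy_labels):.4f}")    # ≈ 0.9710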
# 基础决策树
basic_dt, basic_dt_accuracy = dt_demo.basic_decision_tree(X_binary, y_binary)
# 比较分裂准则
criteria_results = dt_demo.compare_splitting_criteria(X_binary, y_binary)
# 剪枝演示
pruned_dt, best_depth = dt_demo.pruning_demonstration(X_binary, y_binary)
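# 补充示例(演示性质): 上面的演示属于预剪枝(限制max_depth);
# sklearn还支持基于ccp_alpha的代价复杂度后剪枝,最小示例如下
from sklearn.tree import DecisionTreeClassifier
X_tr_p, X_te_p, y_tr_p, y_te_p = train_test_split(X_binary, y_binary, test_size=0.3, random_state=42)
path = DecisionTreeClassifier(random_state=42).cost_complexity_pruning_path(X_tr_p, y_tr_p)
ccp_scores = [DecisionTreeClassifier(random_state=42, ccp_alpha=a).fit(X_tr_p, y_tr_p).score(X_te_p, y_te_p)
              for a in path.ccp_alphas[:-1]]  # 最后一个alpha会把树剪成单节点,跳过
best_alpha = path.ccp_alphas[int(np.argmax(ccp_scores))]
print(f"最佳ccp_alpha: {best_alpha:.5f}, 对应测试准确率: {max(ccp_scores):.4f}")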
# 特征重要性分析
importances, indices = dt_demo.feature_importance_analysis(X_binary, y_binary)
# 可视化树结构
tree_viz = dt_demo.visualize_tree_structure(X_binary, y_binary, max_depth=3)
4. 随机森林
4.1 随机森林原理与实现
class RandomForestDemo:
"""随机森林演示"""
def __init__(self):
self.models = {}
self.results = {}
def random_forest_theory(self):
"""随机森林理论解释"""
print("=== 随机森林理论 ===")
print("1. 集成学习: 组合多个决策树的预测结果")
print("2. Bootstrap采样: 每棵树使用不同的训练子集")
print("3. 随机特征选择: 每次分裂时随机选择特征子集")
print("4. 投票机制: 分类用多数投票,回归用平均值")
print("5. 优点: 减少过拟合、提高泛化能力、特征重要性")
print("6. 参数: n_estimators, max_features, max_depth等")
# 可视化随机森林概念
self.plot_random_forest_concept()
def plot_random_forest_concept(self):
"""可视化随机森林概念"""
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# Bootstrap采样示意图
np.random.seed(42)
original_data = np.arange(1, 11)
axes[0, 0].bar(range(len(original_data)), original_data, alpha=0.7, label='原始数据')
axes[0, 0].set_title('原始训练数据')
axes[0, 0].set_xlabel('样本索引')
axes[0, 0].set_ylabel('样本值')
axes[0, 0].legend()
# Bootstrap样本
bootstrap_samples = []
for i in range(3):
bootstrap_sample = np.random.choice(original_data, size=len(original_data), replace=True)
bootstrap_samples.append(bootstrap_sample)
axes[0, 1].bar(range(len(bootstrap_sample)), bootstrap_sample,
alpha=0.5, label=f'Bootstrap样本{i+1}')
axes[0, 1].set_title('Bootstrap采样')
axes[0, 1].set_xlabel('样本索引')
axes[0, 1].set_ylabel('样本值')
axes[0, 1].legend()
# 特征随机选择示意图
n_features = 10
max_features_options = [1, 3, 5, 'sqrt', 'log2']
feature_counts = []
for max_features in max_features_options:
if max_features == 'sqrt':
count = int(np.sqrt(n_features))
elif max_features == 'log2':
count = int(np.log2(n_features))
else:
count = max_features
feature_counts.append(count)
axes[1, 0].bar(range(len(max_features_options)), feature_counts, alpha=0.7)
axes[1, 0].set_title('不同max_features设置下的特征数量')
axes[1, 0].set_xlabel('max_features设置')
axes[1, 0].set_ylabel('选择的特征数量')
axes[1, 0].set_xticks(range(len(max_features_options)))
axes[1, 0].set_xticklabels(max_features_options)
# 投票机制示意图
tree_predictions = np.array([[0, 1, 1, 0, 1],
[1, 1, 0, 0, 1],
[0, 1, 1, 1, 1],
[1, 0, 1, 0, 1]])
final_predictions = np.mean(tree_predictions, axis=0) > 0.5
im = axes[1, 1].imshow(tree_predictions, cmap='RdYlBu', aspect='auto')
axes[1, 1].set_title('随机森林投票机制')
axes[1, 1].set_xlabel('样本')
axes[1, 1].set_ylabel('决策树')
# 添加文本标注
for i in range(tree_predictions.shape[0]):
for j in range(tree_predictions.shape[1]):
axes[1, 1].text(j, i, str(tree_predictions[i, j]),
ha='center', va='center', color='white', fontweight='bold')
# 显示最终预测
for j, pred in enumerate(final_predictions):
axes[1, 1].text(j, -0.7, f'最终: {int(pred)}',
ha='center', va='center', fontweight='bold')
plt.tight_layout()
plt.show()
def basic_random_forest(self, X, y):
"""基础随机森林"""
from sklearn.ensemble import RandomForestClassifier
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 训练模型
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)
# 预测
y_pred = rf.predict(X_test)
y_pred_proba = rf.predict_proba(X_test)
# 评估
accuracy = accuracy_score(y_test, y_pred)
print(f"=== 基础随机森林结果 ===")
print(f"准确率: {accuracy:.4f}")
print(f"树的数量: {rf.n_estimators}")
print(f"特征重要性前5: {sorted(zip(range(len(rf.feature_importances_)), rf.feature_importances_), key=lambda x: x[1], reverse=True)[:5]}")
self.models['basic_rf'] = rf
self.results['basic_rf'] = {
'accuracy': accuracy,
'y_test': y_test,
'y_pred': y_pred,
'y_pred_proba': y_pred_proba
}
return rf, accuracy
def hyperparameter_optimization(self, X, y):
"""超参数优化"""
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 参数分布
param_distributions = {
'n_estimators': [50, 100, 200, 300],
'max_depth': [None, 10, 20, 30],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4],
'max_features': ['sqrt', 'log2', None],
'bootstrap': [True, False]
}
# 随机搜索
rf = RandomForestClassifier(random_state=42)
random_search = RandomizedSearchCV(
rf, param_distributions, n_iter=50, cv=5,
scoring='accuracy', n_jobs=-1, random_state=42
)
random_search.fit(X_train, y_train)
# 最佳模型
best_rf = random_search.best_estimator_
y_pred = best_rf.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"=== 随机森林超参数优化结果 ===")
print(f"最佳参数: {random_search.best_params_}")
print(f"最佳交叉验证分数: {random_search.best_score_:.4f}")
print(f"测试集准确率: {accuracy:.4f}")
# 可视化参数重要性
self.plot_parameter_importance(random_search)
return best_rf, random_search.best_params_
def plot_parameter_importance(self, random_search):
"""可视化参数重要性"""
results_df = pd.DataFrame(random_search.cv_results_)
# 分析不同参数的影响
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
axes = axes.ravel()
params_to_plot = ['param_n_estimators', 'param_max_depth', 'param_min_samples_split',
'param_min_samples_leaf', 'param_max_features', 'param_bootstrap']
for i, param in enumerate(params_to_plot):
if param in results_df.columns:
# 计算每个参数值的平均性能
param_performance = results_df.groupby(param)['mean_test_score'].agg(['mean', 'std'])
if len(param_performance) > 1:
param_performance.plot(kind='bar', y='mean', yerr='std',
ax=axes[i], alpha=0.7, capsize=4)
axes[i].set_title(f'{param.replace("param_", "")} 的影响')
axes[i].set_ylabel('交叉验证分数')
axes[i].tick_params(axis='x', rotation=45)
axes[i].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def feature_importance_comparison(self, X, y, feature_names=None):
"""特征重要性比较"""
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
if feature_names is None:
feature_names = [f'特征_{i}' for i in range(X.shape[1])]
# 训练模型
dt = DecisionTreeClassifier(random_state=42)
rf = RandomForestClassifier(n_estimators=100, random_state=42)
dt.fit(X, y)
rf.fit(X, y)
# 获取特征重要性
dt_importance = dt.feature_importances_
rf_importance = rf.feature_importances_
# 比较可视化
self.plot_importance_comparison(dt_importance, rf_importance, feature_names)
return dt_importance, rf_importance
def plot_importance_comparison(self, dt_importance, rf_importance, feature_names):
"""绘制特征重要性比较"""
# 选择前10个最重要的特征(基于随机森林)
top_indices = np.argsort(rf_importance)[::-1][:10]
x = np.arange(len(top_indices))
width = 0.35
plt.figure(figsize=(12, 8))
plt.bar(x - width/2, dt_importance[top_indices], width, label='决策树', alpha=0.7)
plt.bar(x + width/2, rf_importance[top_indices], width, label='随机森林', alpha=0.7)
plt.xlabel('特征')
plt.ylabel('重要性')
plt.title('决策树 vs 随机森林特征重要性比较')
plt.xticks(x, [feature_names[i] for i in top_indices], rotation=45)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def out_of_bag_analysis(self, X, y):
"""袋外误差分析"""
from sklearn.ensemble import RandomForestClassifier
# 启用OOB评估
rf_oob = RandomForestClassifier(n_estimators=100, oob_score=True, random_state=42)
rf_oob.fit(X, y)
print(f"=== 袋外误差分析 ===")
print(f"OOB分数: {rf_oob.oob_score_:.4f}")
# 分析不同树数量对OOB分数的影响
n_estimators_range = range(10, 201, 10)
oob_scores = []
for n_est in n_estimators_range:
rf_temp = RandomForestClassifier(n_estimators=n_est, oob_score=True, random_state=42)
rf_temp.fit(X, y)
oob_scores.append(rf_temp.oob_score_)
# 可视化OOB分数变化
plt.figure(figsize=(10, 6))
plt.plot(n_estimators_range, oob_scores, 'b-o', linewidth=2, markersize=4)
plt.xlabel('树的数量')
plt.ylabel('OOB分数')
plt.title('树的数量对OOB分数的影响')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return rf_oob, oob_scores
# 演示随机森林
rf_demo = RandomForestDemo()
# 理论解释
rf_demo.random_forest_theory()
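# 补充示例(演示性质): 验证Bootstrap采样的一个经典结论,
# 即每棵树平均只"见到"约63.2%的训练样本,其余约36.8%构成袋外(OOB)样本
n = 10000
bootstrap_idx = np.random.RandomState(42).choice(n, size=n, replace=True)
in_bag_ratio = len(np.unique(bootstrap_idx)) / n
print(f"单次Bootstrap采样覆盖的样本比例: {in_bag_ratio:.4f} (理论值约 1 - 1/e ≈ 0.632)")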
# 基础随机森林
basic_rf, basic_rf_accuracy = rf_demo.basic_random_forest(X_binary, y_binary)
# 超参数优化
best_rf, best_rf_params = rf_demo.hyperparameter_optimization(X_binary, y_binary)
# 特征重要性比较
dt_imp, rf_imp = rf_demo.feature_importance_comparison(X_binary, y_binary)
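# 补充示例(演示性质): 基于不纯度的feature_importances_可能偏向取值多的特征,
# 可以用置换重要性(permutation importance)作为对照;这里直接复用上面训练好的basic_rf
from sklearn.inspection import permutation_importance
perm = permutation_importance(basic_rf, X_binary, y_binary, n_repeats=5, random_state=42)
top5_perm = np.argsort(perm.importances_mean)[::-1][:5]
print("置换重要性前5个特征(索引, 重要性):",
      [(int(i), round(float(perm.importances_mean[i]), 4)) for i in top5_perm])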
# 袋外误差分析
rf_oob, oob_scores = rf_demo.out_of_bag_analysis(X_binary, y_binary)
5. 支持向量机 (SVM)
5.1 SVM原理与实现
class SVMDemo:
"""支持向量机演示"""
def __init__(self):
self.models = {}
self.results = {}
def svm_theory(self):
"""SVM理论解释"""
print("=== SVM理论 ===")
print("1. 基本思想: 找到最优分离超平面,最大化间隔")
print("2. 支持向量: 距离分离超平面最近的样本点")
print("3. 核技巧: 将数据映射到高维空间进行线性分离")
print("4. 常用核函数:")
print(" - 线性核: K(x,y) = x^T * y")
print(" - 多项式核: K(x,y) = (γ*x^T*y + r)^d")
print(" - RBF核: K(x,y) = exp(-γ*||x-y||²)")
print(" - Sigmoid核: K(x,y) = tanh(γ*x^T*y + r)")
print("5. 参数: C(正则化), γ(核参数), kernel(核函数)")
# 可视化核函数
self.plot_kernel_functions()
def plot_kernel_functions(self):
"""可视化核函数"""
# 生成示例数据
x = np.linspace(-3, 3, 100)
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# RBF核函数
gamma_values = [0.1, 1, 10]
for gamma in gamma_values:
rbf_values = np.exp(-gamma * x**2)
axes[0, 0].plot(x, rbf_values, label=f'γ={gamma}')
axes[0, 0].set_title('RBF核函数')
axes[0, 0].set_xlabel('距离')
axes[0, 0].set_ylabel('核值')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 多项式核函数
degrees = [1, 2, 3]
        for degree in degrees:
            poly_values = (x + 1)**degree  # 取γ=1、r=1的一维切片作示意
axes[0, 1].plot(x, poly_values, label=f'度数={degree}')
axes[0, 1].set_title('多项式核函数')
axes[0, 1].set_xlabel('x')
axes[0, 1].set_ylabel('核值')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Sigmoid核函数
gamma_values = [0.1, 1, 10]
for gamma in gamma_values:
sigmoid_values = np.tanh(gamma * x)
axes[1, 0].plot(x, sigmoid_values, label=f'γ={gamma}')
axes[1, 0].set_title('Sigmoid核函数')
axes[1, 0].set_xlabel('x')
axes[1, 0].set_ylabel('核值')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 线性核函数(简单的线性关系)
axes[1, 1].plot(x, x, 'b-', linewidth=2, label='线性核')
axes[1, 1].set_title('线性核函数')
axes[1, 1].set_xlabel('x')
axes[1, 1].set_ylabel('核值')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def basic_svm(self, X, y):
"""基础SVM"""
from sklearn.svm import SVC
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 特征标准化(SVM对特征尺度敏感)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 训练模型
svm = SVC(kernel='rbf', random_state=42, probability=True)
svm.fit(X_train_scaled, y_train)
# 预测
y_pred = svm.predict(X_test_scaled)
y_pred_proba = svm.predict_proba(X_test_scaled)
# 评估
accuracy = accuracy_score(y_test, y_pred)
print(f"=== 基础SVM结果 ===")
print(f"准确率: {accuracy:.4f}")
print(f"支持向量数量: {svm.n_support_}")
print(f"支持向量总数: {np.sum(svm.n_support_)}")
self.models['basic_svm'] = svm
self.results['basic_svm'] = {
'accuracy': accuracy,
'y_test': y_test,
'y_pred': y_pred,
'y_pred_proba': y_pred_proba,
'scaler': scaler
}
return svm, accuracy
def compare_kernels(self, X, y):
"""比较不同核函数"""
from sklearn.svm import SVC
# 数据分割和标准化
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 不同核函数
kernels = {
'linear': SVC(kernel='linear', random_state=42),
'poly': SVC(kernel='poly', degree=3, random_state=42),
'rbf': SVC(kernel='rbf', random_state=42),
'sigmoid': SVC(kernel='sigmoid', random_state=42)
}
results = {}
print("=== 核函数比较 ===")
for name, model in kernels.items():
model.fit(X_train_scaled, y_train)
y_pred = model.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)
results[name] = {
'model': model,
'accuracy': accuracy,
'n_support': np.sum(model.n_support_)
}
print(f"{name}: 准确率={accuracy:.4f}, 支持向量数={np.sum(model.n_support_)}")
# 可视化比较
self.plot_kernel_comparison(results)
return results
def plot_kernel_comparison(self, results):
"""可视化核函数比较"""
kernels = list(results.keys())
accuracies = [results[k]['accuracy'] for k in kernels]
n_supports = [results[k]['n_support'] for k in kernels]
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# 准确率比较
bars1 = ax1.bar(kernels, accuracies, alpha=0.7, color='skyblue')
ax1.set_title('不同核函数的准确率比较')
ax1.set_ylabel('准确率')
ax1.set_ylim(0, 1)
ax1.grid(True, alpha=0.3)
# 添加数值标签
for bar, acc in zip(bars1, accuracies):
ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{acc:.3f}', ha='center', va='bottom')
# 支持向量数量比较
bars2 = ax2.bar(kernels, n_supports, alpha=0.7, color='lightcoral')
ax2.set_title('不同核函数的支持向量数量')
ax2.set_ylabel('支持向量数量')
ax2.grid(True, alpha=0.3)
# 添加数值标签
for bar, n_sup in zip(bars2, n_supports):
ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1,
str(n_sup), ha='center', va='bottom')
plt.tight_layout()
plt.show()
def hyperparameter_tuning(self, X, y):
"""SVM超参数调优"""
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
# 数据分割和标准化
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 参数网格
param_grid = [
{
'kernel': ['linear'],
'C': [0.1, 1, 10, 100]
},
{
'kernel': ['rbf'],
'C': [0.1, 1, 10, 100],
'gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1]
},
{
'kernel': ['poly'],
'C': [0.1, 1, 10],
'degree': [2, 3, 4],
'gamma': ['scale', 'auto']
}
]
# 网格搜索
svm = SVC(random_state=42)
grid_search = GridSearchCV(svm, param_grid, cv=5, scoring='accuracy', n_jobs=-1)
grid_search.fit(X_train_scaled, y_train)
# 最佳模型
best_svm = grid_search.best_estimator_
y_pred = best_svm.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)
print(f"=== SVM超参数调优结果 ===")
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳交叉验证分数: {grid_search.best_score_:.4f}")
print(f"测试集准确率: {accuracy:.4f}")
# 可视化参数效果
self.plot_svm_parameter_effects(grid_search)
return best_svm, grid_search.best_params_
def plot_svm_parameter_effects(self, grid_search):
"""可视化SVM参数效果"""
results_df = pd.DataFrame(grid_search.cv_results_)
# 分析RBF核的C和gamma参数
rbf_results = results_df[results_df['param_kernel'] == 'rbf']
if len(rbf_results) > 0:
# 创建C和gamma的组合效果热力图
pivot_table = rbf_results.pivot_table(
values='mean_test_score',
index='param_gamma',
columns='param_C'
)
plt.figure(figsize=(10, 8))
sns.heatmap(pivot_table, annot=True, fmt='.3f', cmap='viridis')
plt.title('RBF核:C和γ参数组合效果')
plt.xlabel('C参数')
plt.ylabel('γ参数')
plt.tight_layout()
plt.show()
def decision_boundary_visualization(self, X, y):
"""SVM决策边界可视化"""
if X.shape[1] != 2:
# 使用PCA降维到2D
from sklearn.decomposition import PCA
pca = PCA(n_components=2)
X_2d = pca.fit_transform(X)
else:
X_2d = X
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_2d)
# 训练不同核函数的SVM
        from sklearn.svm import SVC  # 方法内使用,需单独导入
        kernels = ['linear', 'rbf', 'poly']
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
for i, kernel in enumerate(kernels):
svm = SVC(kernel=kernel, random_state=42)
svm.fit(X_scaled, y)
# 创建网格
h = 0.02
x_min, x_max = X_scaled[:, 0].min() - 1, X_scaled[:, 0].max() + 1
y_min, y_max = X_scaled[:, 1].min() - 1, X_scaled[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
np.arange(y_min, y_max, h))
# 预测网格点
Z = svm.predict(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
# 绘制决策边界
axes[i].contourf(xx, yy, Z, alpha=0.8, cmap='RdYlBu')
# 绘制数据点
scatter = axes[i].scatter(X_scaled[:, 0], X_scaled[:, 1], c=y,
cmap='RdYlBu', edgecolors='black')
# 绘制支持向量
axes[i].scatter(svm.support_vectors_[:, 0], svm.support_vectors_[:, 1],
s=100, facecolors='none', edgecolors='red', linewidth=2,
label='支持向量')
axes[i].set_title(f'{kernel.upper()}核决策边界')
axes[i].set_xlabel('特征1 (标准化)')
axes[i].set_ylabel('特征2 (标准化)')
axes[i].legend()
plt.tight_layout()
plt.show()
# 演示SVM
svm_demo = SVMDemo()
# 理论解释
svm_demo.svm_theory()
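# 补充示例(演示性质): 手动实现RBF核 K(x,y) = exp(-γ||x-y||²),
# 并与sklearn内置实现对照,验证上面理论部分的公式
from sklearn.metrics.pairwise import rbf_kernel
A = np.random.RandomState(42).randn(5, 3)  # 假设的5个三维样本
gamma = 0.5
manual_K = np.exp(-gamma * np.sum((A[:, None, :] - A[None, :, :]) ** 2, axis=-1))
print("手动RBF核与rbf_kernel是否一致:", np.allclose(manual_K, rbf_kernel(A, A, gamma=gamma)))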
# 基础SVM
basic_svm, basic_svm_accuracy = svm_demo.basic_svm(X_binary, y_binary)
# 比较核函数
kernel_results = svm_demo.compare_kernels(X_binary, y_binary)
# 超参数调优
best_svm, best_svm_params = svm_demo.hyperparameter_tuning(X_binary, y_binary)
# 决策边界可视化
svm_demo.decision_boundary_visualization(X_binary, y_binary)
6. 朴素贝叶斯
6.1 朴素贝叶斯原理与实现
class NaiveBayesDemo:
"""朴素贝叶斯演示"""
def __init__(self):
self.models = {}
self.results = {}
def naive_bayes_theory(self):
"""朴素贝叶斯理论解释"""
print("=== 朴素贝叶斯理论 ===")
print("1. 基于贝叶斯定理: P(A|B) = P(B|A) * P(A) / P(B)")
print("2. 朴素假设: 特征之间相互独立")
print("3. 分类公式: P(y|x₁,x₂,...,xₙ) ∝ P(y) * ∏P(xᵢ|y)")
print("4. 常见类型:")
print(" - 高斯朴素贝叶斯: 连续特征,假设正态分布")
print(" - 多项式朴素贝叶斯: 离散特征,如文本分类")
print(" - 伯努利朴素贝叶斯: 二元特征")
print("5. 优点: 简单快速、需要数据少、对噪声不敏感")
print("6. 缺点: 特征独立假设往往不现实")
# 可视化贝叶斯定理
self.plot_bayes_theorem()
def plot_bayes_theorem(self):
"""可视化贝叶斯定理"""
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# 先验概率示例
classes = ['类别A', '类别B', '类别C']
prior_probs = [0.5, 0.3, 0.2]
axes[0, 0].pie(prior_probs, labels=classes, autopct='%1.1f%%', startangle=90)
axes[0, 0].set_title('先验概率 P(y)')
# 似然概率示例
x = np.linspace(-4, 4, 100)
likelihood_A = np.exp(-(x-1)**2/2) / np.sqrt(2*np.pi)
likelihood_B = np.exp(-(x+1)**2/2) / np.sqrt(2*np.pi)
axes[0, 1].plot(x, likelihood_A, label='P(x|类别A)', linewidth=2)
axes[0, 1].plot(x, likelihood_B, label='P(x|类别B)', linewidth=2)
axes[0, 1].set_title('似然概率 P(x|y)')
axes[0, 1].set_xlabel('特征值 x')
axes[0, 1].set_ylabel('概率密度')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 后验概率计算
evidence = 0.5 * likelihood_A + 0.5 * likelihood_B
posterior_A = (0.5 * likelihood_A) / evidence
posterior_B = (0.5 * likelihood_B) / evidence
axes[1, 0].plot(x, posterior_A, label='P(类别A|x)', linewidth=2)
axes[1, 0].plot(x, posterior_B, label='P(类别B|x)', linewidth=2)
axes[1, 0].set_title('后验概率 P(y|x)')
axes[1, 0].set_xlabel('特征值 x')
axes[1, 0].set_ylabel('后验概率')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 决策边界
decision_boundary = x[np.argmin(np.abs(posterior_A - posterior_B))]
axes[1, 1].plot(x, posterior_A, label='P(类别A|x)', linewidth=2)
axes[1, 1].plot(x, posterior_B, label='P(类别B|x)', linewidth=2)
axes[1, 1].axvline(x=decision_boundary, color='red', linestyle='--',
label=f'决策边界 (x={decision_boundary:.2f})')
axes[1, 1].set_title('朴素贝叶斯决策边界')
axes[1, 1].set_xlabel('特征值 x')
axes[1, 1].set_ylabel('后验概率')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def compare_naive_bayes_types(self, X, y):
"""比较不同类型的朴素贝叶斯"""
from sklearn.naive_bayes import GaussianNB, MultinomialNB, BernoulliNB
from sklearn.preprocessing import MinMaxScaler
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 为MultinomialNB准备非负数据
scaler = MinMaxScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 为BernoulliNB准备二元数据
X_train_binary = (X_train_scaled > 0.5).astype(int)
X_test_binary = (X_test_scaled > 0.5).astype(int)
# 不同类型的朴素贝叶斯
models = {
'Gaussian': (GaussianNB(), X_train, X_test),
'Multinomial': (MultinomialNB(), X_train_scaled, X_test_scaled),
'Bernoulli': (BernoulliNB(), X_train_binary, X_test_binary)
}
results = {}
print("=== 朴素贝叶斯类型比较 ===")
for name, (model, X_tr, X_te) in models.items():
model.fit(X_tr, y_train)
y_pred = model.predict(X_te)
y_pred_proba = model.predict_proba(X_te)
accuracy = accuracy_score(y_test, y_pred)
results[name] = {
'model': model,
'accuracy': accuracy,
'y_pred': y_pred,
'y_pred_proba': y_pred_proba
}
print(f"{name}NB: 准确率={accuracy:.4f}")
# 可视化比较
self.plot_nb_comparison(results)
return results
def plot_nb_comparison(self, results):
"""可视化朴素贝叶斯比较"""
models = list(results.keys())
accuracies = [results[model]['accuracy'] for model in models]
plt.figure(figsize=(10, 6))
bars = plt.bar(models, accuracies, alpha=0.7, color=['skyblue', 'lightgreen', 'lightcoral'])
plt.title('不同类型朴素贝叶斯准确率比较')
plt.ylabel('准确率')
plt.ylim(0, 1)
plt.grid(True, alpha=0.3)
# 添加数值标签
for bar, acc in zip(bars, accuracies):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{acc:.3f}', ha='center', va='bottom')
plt.tight_layout()
plt.show()
def feature_probability_analysis(self, X, y):
"""特征概率分析"""
from sklearn.naive_bayes import GaussianNB
# 训练高斯朴素贝叶斯
nb = GaussianNB()
nb.fit(X, y)
# 获取每个类别的特征均值和方差
print("=== 特征概率分析 ===")
for i, class_label in enumerate(nb.classes_):
print(f"\n类别 {class_label}:")
print(f" 先验概率: {np.exp(nb.class_log_prior_[i]):.4f}")
print(f" 特征均值前5个: {nb.theta_[i][:5]}")
print(f" 特征方差前5个: {nb.var_[i][:5]}")
# 可视化特征分布
self.plot_feature_distributions(X, y, nb)
return nb
def plot_feature_distributions(self, X, y, nb_model):
"""可视化特征分布"""
# 选择前4个特征进行可视化
n_features_to_plot = min(4, X.shape[1])
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
axes = axes.ravel()
for i in range(n_features_to_plot):
ax = axes[i]
# 为每个类别绘制特征分布
for j, class_label in enumerate(nb_model.classes_):
class_data = X[y == class_label, i]
# 绘制直方图
ax.hist(class_data, alpha=0.6, label=f'类别 {class_label}', bins=20)
# 绘制高斯分布拟合
x_range = np.linspace(class_data.min(), class_data.max(), 100)
mean = nb_model.theta_[j, i]
var = nb_model.var_[j, i]
gaussian = np.exp(-(x_range - mean)**2 / (2 * var)) / np.sqrt(2 * np.pi * var)
# 缩放高斯曲线以匹配直方图
scale_factor = len(class_data) * (class_data.max() - class_data.min()) / 20
ax.plot(x_range, gaussian * scale_factor, '--', linewidth=2)
ax.set_title(f'特征 {i+1} 的分布')
ax.set_xlabel('特征值')
ax.set_ylabel('频次')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def text_classification_example(self):
"""文本分类示例"""
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
# 示例文本数据
texts = [
"机器学习是人工智能的重要分支",
"深度学习神经网络模型效果很好",
"今天天气很好适合出门散步",
"明天可能会下雨记得带伞",
"Python是数据科学的重要工具",
"算法优化可以提高模型性能",
"春天来了花儿都开了",
"夏天很热需要开空调",
"数据预处理是机器学习的第一步",
"特征工程对模型效果很重要"
]
# 标签:0-技术,1-天气
labels = [0, 0, 1, 1, 0, 0, 1, 1, 0, 0]
# 创建管道
text_pipeline = Pipeline([
('vectorizer', CountVectorizer()),
('classifier', MultinomialNB())
])
# 训练模型
text_pipeline.fit(texts, labels)
# 测试预测
test_texts = [
"神经网络算法很复杂",
"今天阳光明媚",
"数据挖掘技术应用广泛"
]
predictions = text_pipeline.predict(test_texts)
probabilities = text_pipeline.predict_proba(test_texts)
print("=== 文本分类示例 ===")
class_names = ['技术', '天气']
for i, text in enumerate(test_texts):
pred_class = class_names[predictions[i]]
confidence = probabilities[i].max()
print(f"文本: '{text}'")
print(f"预测类别: {pred_class} (置信度: {confidence:.3f})")
print()
return text_pipeline
# 演示朴素贝叶斯
nb_demo = NaiveBayesDemo()
# 理论解释
nb_demo.naive_bayes_theory()
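# 补充示例(演示性质): 用GaussianNB学到的参数(theta_, var_, class_log_prior_)
# 手动按贝叶斯公式复现一个样本的后验概率;X_nb/y_nb为前文数据的小样本
from sklearn.naive_bayes import GaussianNB
X_nb, y_nb = X_binary[:300], y_binary[:300]
gnb = GaussianNB().fit(X_nb, y_nb)
x0 = X_nb[0]
log_post = []
for k in range(len(gnb.classes_)):
    # log P(y=k) + Σᵢ log N(x0ᵢ; μ_ki, σ²_ki)
    log_lik = -0.5 * np.sum(np.log(2 * np.pi * gnb.var_[k]) + (x0 - gnb.theta_[k]) ** 2 / gnb.var_[k])
    log_post.append(gnb.class_log_prior_[k] + log_lik)
log_post = np.array(log_post)
manual_post = np.exp(log_post - log_post.max())
manual_post /= manual_post.sum()
print("手动后验:", np.round(manual_post, 4),
      " sklearn后验:", np.round(gnb.predict_proba(x0.reshape(1, -1))[0], 4))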
# 比较不同类型
nb_results = nb_demo.compare_naive_bayes_types(X_binary, y_binary)
# 特征概率分析
nb_model = nb_demo.feature_probability_analysis(X_binary, y_binary)
# 文本分类示例
text_classifier = nb_demo.text_classification_example()
7. 算法综合比较与选择
7.1 算法性能比较
class AlgorithmComparison:
"""算法综合比较"""
def __init__(self):
self.results = {}
self.models = {}
def comprehensive_comparison(self, X, y):
"""综合比较所有算法"""
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
        from sklearn.model_selection import cross_val_score
        import time  # 用于计时,在方法内导入以保证独立可运行
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 特征标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 定义算法
algorithms = {
'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
'Decision Tree': DecisionTreeClassifier(random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
'SVM (RBF)': SVC(kernel='rbf', random_state=42, probability=True),
'Naive Bayes': GaussianNB(),
'K-Nearest Neighbors': KNeighborsClassifier(n_neighbors=5)
}
results = {}
print("=== 算法综合比较 ===")
for name, model in algorithms.items():
# 选择合适的数据
if name in ['Logistic Regression', 'SVM (RBF)', 'K-Nearest Neighbors']:
X_tr, X_te = X_train_scaled, X_test_scaled
else:
X_tr, X_te = X_train, X_test
# 训练和评估
start_time = time.time()
model.fit(X_tr, y_train)
training_time = time.time() - start_time
start_time = time.time()
y_pred = model.predict(X_te)
prediction_time = time.time() - start_time
# 计算指标
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_test, y_pred, average='weighted', zero_division=0)
f1 = f1_score(y_test, y_pred, average='weighted', zero_division=0)
# 交叉验证
cv_scores = cross_val_score(model, X_tr, y_train, cv=5, scoring='accuracy')
results[name] = {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1_score': f1,
'cv_mean': cv_scores.mean(),
'cv_std': cv_scores.std(),
'training_time': training_time,
'prediction_time': prediction_time,
'model': model
}
print(f"{name}:")
print(f" 准确率: {accuracy:.4f}")
print(f" 交叉验证: {cv_scores.mean():.4f} (±{cv_scores.std():.4f})")
print(f" 训练时间: {training_time:.4f}s")
print(f" 预测时间: {prediction_time:.4f}s")
print()
self.results = results
# 可视化比较
self.plot_algorithm_comparison(results)
return results
def plot_algorithm_comparison(self, results):
"""可视化算法比较"""
algorithms = list(results.keys())
metrics = ['accuracy', 'precision', 'recall', 'f1_score']
# 性能指标比较
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
for i, metric in enumerate(metrics):
ax = axes[i//2, i%2]
values = [results[alg][metric] for alg in algorithms]
bars = ax.bar(range(len(algorithms)), values, alpha=0.7)
ax.set_title(f'{metric.upper()} 比较')
ax.set_ylabel(metric.upper())
ax.set_xticks(range(len(algorithms)))
ax.set_xticklabels(algorithms, rotation=45, ha='right')
ax.grid(True, alpha=0.3)
# 添加数值标签
for j, (bar, val) in enumerate(zip(bars, values)):
ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{val:.3f}', ha='center', va='bottom', fontsize=9)
plt.tight_layout()
plt.show()
# 时间性能比较
self.plot_time_comparison(results)
# 综合性能雷达图
self.plot_radar_chart(results)
def plot_time_comparison(self, results):
"""时间性能比较"""
algorithms = list(results.keys())
training_times = [results[alg]['training_time'] for alg in algorithms]
prediction_times = [results[alg]['prediction_time'] for alg in algorithms]
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# 训练时间
bars1 = ax1.bar(range(len(algorithms)), training_times, alpha=0.7, color='lightblue')
ax1.set_title('训练时间比较')
ax1.set_ylabel('时间 (秒)')
ax1.set_xticks(range(len(algorithms)))
ax1.set_xticklabels(algorithms, rotation=45, ha='right')
ax1.grid(True, alpha=0.3)
for bar, time_val in zip(bars1, training_times):
ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.001,
f'{time_val:.3f}', ha='center', va='bottom', fontsize=9)
# 预测时间
bars2 = ax2.bar(range(len(algorithms)), prediction_times, alpha=0.7, color='lightcoral')
ax2.set_title('预测时间比较')
ax2.set_ylabel('时间 (秒)')
ax2.set_xticks(range(len(algorithms)))
ax2.set_xticklabels(algorithms, rotation=45, ha='right')
ax2.grid(True, alpha=0.3)
for bar, time_val in zip(bars2, prediction_times):
ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.0001,
f'{time_val:.4f}', ha='center', va='bottom', fontsize=9)
plt.tight_layout()
plt.show()
def plot_radar_chart(self, results):
"""绘制综合性能雷达图"""
import math
# 选择前4个算法进行雷达图比较
top_algorithms = sorted(results.keys(),
key=lambda x: results[x]['accuracy'], reverse=True)[:4]
# 指标
metrics = ['accuracy', 'precision', 'recall', 'f1_score']
# 设置雷达图
angles = [n / float(len(metrics)) * 2 * math.pi for n in range(len(metrics))]
angles += angles[:1] # 闭合图形
fig, ax = plt.subplots(figsize=(10, 10), subplot_kw=dict(projection='polar'))
colors = ['red', 'blue', 'green', 'orange']
for i, alg in enumerate(top_algorithms):
values = [results[alg][metric] for metric in metrics]
values += values[:1] # 闭合图形
ax.plot(angles, values, 'o-', linewidth=2, label=alg, color=colors[i])
ax.fill(angles, values, alpha=0.25, color=colors[i])
ax.set_xticks(angles[:-1])
ax.set_xticklabels(metrics)
ax.set_ylim(0, 1)
ax.set_title('算法综合性能雷达图', size=16, y=1.1)
ax.legend(loc='upper right', bbox_to_anchor=(1.3, 1.0))
ax.grid(True)
plt.tight_layout()
plt.show()
def algorithm_selection_guide(self):
"""算法选择指南"""
print("=== 分类算法选择指南 ===")
print()
guide = {
"数据量": {
"小数据集 (<1000样本)": ["朴素贝叶斯", "K近邻", "决策树"],
"中等数据集 (1000-100000样本)": ["逻辑回归", "SVM", "随机森林"],
"大数据集 (>100000样本)": ["逻辑回归", "随机森林", "梯度提升"]
},
"特征数量": {
"低维 (<100特征)": ["所有算法都适用"],
"高维 (>100特征)": ["逻辑回归", "SVM", "朴素贝叶斯"],
"超高维 (>10000特征)": ["逻辑回归", "朴素贝叶斯"]
},
"数据类型": {
"数值特征": ["逻辑回归", "SVM", "随机森林"],
"类别特征": ["决策树", "随机森林", "朴素贝叶斯"],
"文本数据": ["朴素贝叶斯", "逻辑回归", "SVM"],
"混合类型": ["随机森林", "梯度提升"]
},
"可解释性要求": {
"高可解释性": ["逻辑回归", "决策树", "朴素贝叶斯"],
"中等可解释性": ["随机森林", "K近邻"],
"低可解释性": ["SVM", "神经网络"]
},
"训练速度要求": {
"快速训练": ["朴素贝叶斯", "逻辑回归", "K近邻"],
"中等速度": ["决策树", "SVM"],
"可接受较慢": ["随机森林", "梯度提升"]
}
}
for category, subcategories in guide.items():
print(f"### {category}")
for condition, algorithms in subcategories.items():
print(f" {condition}: {', '.join(algorithms)}")
print()
# 演示算法比较
import time
comparison = AlgorithmComparison()
# 综合比较
comparison_results = comparison.comprehensive_comparison(X_binary, y_binary)
# 算法选择指南
comparison.algorithm_selection_guide()
8. 实战案例:客户流失预测
8.1 完整的分类项目流程
class CustomerChurnPrediction:
"""客户流失预测实战案例"""
def __init__(self):
self.models = {}
self.results = {}
self.best_model = None
def create_customer_dataset(self):
"""创建客户流失数据集"""
np.random.seed(42)
n_samples = 2000
# 生成特征
age = np.random.normal(40, 15, n_samples)
tenure = np.random.exponential(2, n_samples)
monthly_charges = np.random.normal(70, 20, n_samples)
total_charges = monthly_charges * tenure + np.random.normal(0, 100, n_samples)
# 服务相关特征
internet_service = np.random.choice([0, 1, 2], n_samples, p=[0.3, 0.4, 0.3]) # 0:无, 1:DSL, 2:光纤
online_security = np.random.choice([0, 1], n_samples, p=[0.6, 0.4])
tech_support = np.random.choice([0, 1], n_samples, p=[0.7, 0.3])
# 合同相关特征
contract_type = np.random.choice([0, 1, 2], n_samples, p=[0.5, 0.3, 0.2]) # 0:月付, 1:年付, 2:两年
paperless_billing = np.random.choice([0, 1], n_samples, p=[0.4, 0.6])
payment_method = np.random.choice([0, 1, 2, 3], n_samples, p=[0.25, 0.25, 0.25, 0.25])
# 生成流失标签(基于特征的逻辑关系)
churn_prob = (
0.1 + # 基础流失率
0.3 * (monthly_charges > 80) + # 高费用客户更容易流失
0.2 * (tenure < 1) + # 新客户更容易流失
0.15 * (contract_type == 0) + # 月付客户更容易流失
0.1 * (internet_service == 2) + # 光纤客户可能因为价格流失
-0.2 * online_security + # 有安全服务的客户不容易流失
-0.15 * tech_support + # 有技术支持的客户不容易流失
0.1 * paperless_billing # 无纸化账单客户稍微容易流失
)
# 添加噪声并生成二元标签
churn_prob += np.random.normal(0, 0.1, n_samples)
churn = (churn_prob > 0.5).astype(int)
# 组合特征
X = np.column_stack([
age, tenure, monthly_charges, total_charges,
internet_service, online_security, tech_support,
contract_type, paperless_billing, payment_method
])
# 特征名称
feature_names = [
'age', 'tenure', 'monthly_charges', 'total_charges',
'internet_service', 'online_security', 'tech_support',
'contract_type', 'paperless_billing', 'payment_method'
]
return X, churn, feature_names
def exploratory_data_analysis(self, X, y, feature_names):
"""探索性数据分析"""
import pandas as pd
# 创建DataFrame
df = pd.DataFrame(X, columns=feature_names)
df['churn'] = y
print("=== 数据集基本信息 ===")
print(f"样本数量: {len(df)}")
print(f"特征数量: {len(feature_names)}")
print(f"流失率: {y.mean():.2%}")
print()
# 基本统计
print("=== 数值特征统计 ===")
numeric_features = ['age', 'tenure', 'monthly_charges', 'total_charges']
print(df[numeric_features].describe())
print()
# 类别特征分布
print("=== 类别特征分布 ===")
categorical_features = ['internet_service', 'online_security', 'tech_support',
'contract_type', 'paperless_billing', 'payment_method']
for feature in categorical_features:
print(f"{feature}: {df[feature].value_counts().to_dict()}")
print()
# 可视化
self.plot_eda(df, feature_names)
return df
def plot_eda(self, df, feature_names):
"""可视化探索性数据分析"""
fig, axes = plt.subplots(3, 3, figsize=(18, 15))
axes = axes.ravel()
# 数值特征分布
numeric_features = ['age', 'tenure', 'monthly_charges', 'total_charges']
for i, feature in enumerate(numeric_features):
ax = axes[i]
# 按流失情况分组绘制直方图
df[df['churn'] == 0][feature].hist(alpha=0.6, label='未流失', bins=30, ax=ax)
df[df['churn'] == 1][feature].hist(alpha=0.6, label='流失', bins=30, ax=ax)
ax.set_title(f'{feature} 分布')
ax.set_xlabel(feature)
ax.set_ylabel('频次')
ax.legend()
ax.grid(True, alpha=0.3)
# 类别特征与流失关系
categorical_features = ['internet_service', 'contract_type', 'online_security',
'tech_support', 'paperless_billing']
for i, feature in enumerate(categorical_features):
ax = axes[i + 4]
# 计算流失率
churn_rate = df.groupby(feature)['churn'].mean()
bars = ax.bar(range(len(churn_rate)), churn_rate.values, alpha=0.7)
ax.set_title(f'{feature} 流失率')
ax.set_xlabel(feature)
ax.set_ylabel('流失率')
ax.set_xticks(range(len(churn_rate)))
ax.set_xticklabels(churn_rate.index)
ax.grid(True, alpha=0.3)
# 添加数值标签
for bar, rate in zip(bars, churn_rate.values):
ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{rate:.2%}', ha='center', va='bottom')
plt.tight_layout()
plt.show()
def build_and_compare_models(self, X, y, feature_names):
"""构建和比较多个模型"""
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import classification_report, confusion_matrix
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# 特征标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 定义模型和参数网格
models_params = {
'Logistic Regression': {
                'model': LogisticRegression(random_state=42, max_iter=1000, solver='liblinear'),  # liblinear同时支持l1和l2
                'params': {'C': [0.1, 1, 10], 'penalty': ['l1', 'l2']},
'data': (X_train_scaled, X_test_scaled)
},
'Decision Tree': {
'model': DecisionTreeClassifier(random_state=42),
'params': {'max_depth': [3, 5, 10, None], 'min_samples_split': [2, 5, 10]},
'data': (X_train, X_test)
},
'Random Forest': {
'model': RandomForestClassifier(random_state=42),
'params': {'n_estimators': [50, 100, 200], 'max_depth': [3, 5, 10]},
'data': (X_train, X_test)
},
'SVM': {
'model': SVC(random_state=42, probability=True),
'params': {'C': [0.1, 1, 10], 'kernel': ['rbf', 'linear']},
'data': (X_train_scaled, X_test_scaled)
},
'Naive Bayes': {
'model': GaussianNB(),
'params': {},
'data': (X_train, X_test)
}
}
results = {}
best_score = 0
print("=== 模型训练和评估 ===")
for name, config in models_params.items():
print(f"\n训练 {name}...")
model = config['model']
params = config['params']
X_tr, X_te = config['data']
if params:
# 网格搜索
grid_search = GridSearchCV(
model, params, cv=5, scoring='f1', n_jobs=-1
)
grid_search.fit(X_tr, y_train)
best_model = grid_search.best_estimator_
print(f"最佳参数: {grid_search.best_params_}")
else:
best_model = model
best_model.fit(X_tr, y_train)
# 预测和评估
y_pred = best_model.predict(X_te)
y_pred_proba = best_model.predict_proba(X_te)[:, 1] if hasattr(best_model, 'predict_proba') else None
# 计算指标
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
# 交叉验证
cv_scores = cross_val_score(best_model, X_tr, y_train, cv=5, scoring='f1')
results[name] = {
'model': best_model,
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1_score': f1,
'cv_mean': cv_scores.mean(),
'cv_std': cv_scores.std(),
'y_pred': y_pred,
'y_pred_proba': y_pred_proba
}
print(f"准确率: {accuracy:.4f}")
print(f"精确率: {precision:.4f}")
print(f"召回率: {recall:.4f}")
print(f"F1分数: {f1:.4f}")
print(f"交叉验证F1: {cv_scores.mean():.4f} (±{cv_scores.std():.4f})")
# 更新最佳模型
if f1 > best_score:
best_score = f1
self.best_model = best_model
self.results = results
# 可视化比较
self.plot_model_comparison(results)
# 详细评估最佳模型
self.detailed_evaluation(y_test, results)
return results
def plot_model_comparison(self, results):
"""可视化模型比较"""
models = list(results.keys())
metrics = ['accuracy', 'precision', 'recall', 'f1_score']
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
for i, metric in enumerate(metrics):
ax = axes[i//2, i%2]
values = [results[model][metric] for model in models]
bars = ax.bar(range(len(models)), values, alpha=0.7)
ax.set_title(f'{metric.upper()} 比较')
ax.set_ylabel(metric.upper())
ax.set_xticks(range(len(models)))
ax.set_xticklabels(models, rotation=45, ha='right')
ax.grid(True, alpha=0.3)
# 添加数值标签
for bar, val in zip(bars, values):
ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{val:.3f}', ha='center', va='bottom', fontsize=9)
plt.tight_layout()
plt.show()
def detailed_evaluation(self, y_test, results):
"""详细评估"""
# 找到最佳模型
best_model_name = max(results.keys(), key=lambda x: results[x]['f1_score'])
best_result = results[best_model_name]
print(f"\n=== 最佳模型: {best_model_name} ===")
# 分类报告
print("\n分类报告:")
print(classification_report(y_test, best_result['y_pred'],
target_names=['未流失', '流失']))
# 混淆矩阵
cm = confusion_matrix(y_test, best_result['y_pred'])
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=['未流失', '流失'],
yticklabels=['未流失', '流失'])
plt.title(f'{best_model_name} 混淆矩阵')
plt.ylabel('真实标签')
plt.xlabel('预测标签')
plt.show()
# ROC曲线
if best_result['y_pred_proba'] is not None:
from sklearn.metrics import roc_curve, auc
fpr, tpr, _ = roc_curve(y_test, best_result['y_pred_proba'])
roc_auc = auc(fpr, tpr)
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2,
label=f'ROC曲线 (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('假正率')
plt.ylabel('真正率')
plt.title(f'{best_model_name} ROC曲线')
plt.legend(loc="lower right")
plt.grid(True, alpha=0.3)
plt.show()
# 运行客户流失预测案例
churn_predictor = CustomerChurnPrediction()
# 创建数据集
X_churn, y_churn, feature_names = churn_predictor.create_customer_dataset()
# 探索性数据分析
df_churn = churn_predictor.exploratory_data_analysis(X_churn, y_churn, feature_names)
# 构建和比较模型
churn_results = churn_predictor.build_and_compare_models(X_churn, y_churn, feature_names)
本章小结
主要内容回顾
本章深入介绍了Scikit-learn中的主要分类算法:
逻辑回归
- 基于线性模型的概率分类器
- 适用于线性可分问题
- 具有良好的可解释性
决策树
- 基于规则的分类器
- 易于理解和解释
- 容易过拟合,需要剪枝
随机森林
- 集成学习方法
- 减少过拟合,提高泛化能力
- 能处理大量特征
支持向量机(SVM)
- 基于最大间隔的分类器
- 通过核函数处理非线性问题
- 在高维空间表现优秀
朴素贝叶斯
- 基于概率的分类器
- 训练速度快,所需训练数据少
- 适用于文本分类
算法选择指导原则
- 数据量小: 朴素贝叶斯、K近邻、决策树
- 高维数据: 逻辑回归、SVM、朴素贝叶斯
- 需要可解释性: 逻辑回归、决策树、朴素贝叶斯
- 追求高精度: 随机森林、SVM、集成方法
- 文本分类: 朴素贝叶斯、逻辑回归、SVM
最佳实践
数据预处理
- 处理缺失值和异常值
- 特征标准化(对SVM、逻辑回归重要)
- 特征选择和降维
模型选择
- 使用交叉验证评估模型
- 网格搜索优化超参数
- 考虑模型的复杂度和可解释性
评估指标
- 准确率:整体性能
- 精确率:减少误报
- 召回率:减少漏报
- F1分数:平衡精确率和召回率(下面用一个小例子手工验证这些指标)
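下面是对上述指标定义的手工验证(演示性质的补充代码,y_true_demo/y_pred_demo 为假设的玩具标签):
from sklearn.metrics import precision_score, recall_score, f1_score
y_true_demo = [1, 1, 1, 0, 0, 0, 0, 0]
y_pred_demo = [1, 1, 0, 1, 0, 0, 0, 0]  # TP=2, FN=1, FP=1, TN=4
# 精确率 = TP/(TP+FP) = 2/3, 召回率 = TP/(TP+FN) = 2/3, F1 = 2/3
print(precision_score(y_true_demo, y_pred_demo),
      recall_score(y_true_demo, y_pred_demo),
      f1_score(y_true_demo, y_pred_demo))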
常见陷阱
- 数据泄露: 测试集或未来的信息不当进入训练过程(如在划分数据前就做标准化)
- 过拟合: 模型在训练集上表现好,测试集差
- 类别不平衡: 需要使用适当的评估指标和采样方法
- 特征缩放: 忘记对尺度敏感的算法(如SVM、KNN、逻辑回归)做标准化(防护示例见下)
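针对其中"数据泄露"和"特征缩放"两个陷阱,常用的防护手段是把预处理步骤放进 Pipeline,让交叉验证的每一折只用训练折拟合标准化器。下面是一个最小示例(演示性质的补充代码,沿用前文的 X_binary/y_binary):
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
leak_free = Pipeline([
    ('scaler', StandardScaler()),        # 每一折内单独拟合,避免验证折信息泄露
    ('clf', LogisticRegression(max_iter=1000))
])
scores = cross_val_score(leak_free, X_binary, y_binary, cv=5, scoring='accuracy')
print(f"无泄露的交叉验证准确率: {scores.mean():.4f} (±{scores.std():.4f})")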
下一步学习
- 第4章:监督学习-回归算法
- 深入学习集成学习方法
- 了解深度学习在分类中的应用
- 学习处理不平衡数据的技术
练习题
基础练习
- 使用iris数据集比较不同分类算法的性能
- 实现一个简单的逻辑回归分类器
- 可视化决策树的分裂过程
进阶练习
- 在wine数据集上进行特征选择和模型优化
- 实现一个文本分类系统
- 比较不同核函数的SVM性能
项目练习
- 完成一个完整的分类项目(如垃圾邮件检测)
- 处理真实世界的不平衡数据集
- 构建一个模型选择和评估的自动化流程
第3章完结 ✅
下一章我们将学习监督学习中的回归算法,包括线性回归、多项式回归、正则化方法等内容。