3.1 监督学习概述
3.1.1 什么是监督学习
监督学习是机器学习的一个重要分支,它使用带有标签的训练数据来学习输入和输出之间的映射关系。监督学习的目标是构建一个模型,能够对新的、未见过的数据进行准确的预测。
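监督学习"从带标签样本中学习映射、再对新样本做预测"的过程,用 sklearn 的统一接口可以概括为 fit/predict 两步。下面是一个最小示意(数据集与模型选择仅为示例):

```python
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

X, y = load_iris(return_X_y=True)                   # 带标签的训练数据:X 是输入,y 是标签
model = LogisticRegression(max_iter=200).fit(X, y)  # 学习映射 f: X -> y
print(model.predict(X[:2]))                         # 对样本进行预测(此处用前两行数据示意"新样本")
```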
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.metrics import classification_report, confusion_matrix
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
print("监督学习算法章节开始")
print("本章将介绍主要的监督学习算法及其应用")
3.1.2 监督学习的类型
监督学习主要分为两大类:
回归(Regression):预测连续数值
- 房价预测
- 股票价格预测
- 销售额预测
分类(Classification):预测离散类别
- 邮件垃圾分类
- 图像识别
- 疾病诊断
# 创建示例数据集
np.random.seed(42)
# 回归示例数据
n_samples = 1000
X_regression = np.random.randn(n_samples, 2)
y_regression = 3 * X_regression[:, 0] + 2 * X_regression[:, 1] + np.random.randn(n_samples) * 0.5
# 分类示例数据
from sklearn.datasets import make_classification
X_classification, y_classification = make_classification(
n_samples=1000, n_features=2, n_redundant=0, n_informative=2,
random_state=42, n_clusters_per_class=1
)
# 可视化数据
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
# 回归数据可视化
ax1.scatter(X_regression[:, 0], y_regression, alpha=0.6)
ax1.set_xlabel('特征1')
ax1.set_ylabel('目标值')
ax1.set_title('回归问题示例')
# 分类数据可视化
scatter = ax2.scatter(X_classification[:, 0], X_classification[:, 1],
c=y_classification, cmap='viridis', alpha=0.6)
ax2.set_xlabel('特征1')
ax2.set_ylabel('特征2')
ax2.set_title('分类问题示例')
plt.colorbar(scatter, ax=ax2)
plt.tight_layout()
plt.show()
print(f"回归数据形状: {X_regression.shape}, 目标变量范围: [{y_regression.min():.2f}, {y_regression.max():.2f}]")
print(f"分类数据形状: {X_classification.shape}, 类别数量: {len(np.unique(y_classification))}")
3.1.3 监督学习的工作流程
def supervised_learning_workflow(X, y, problem_type='classification'):
"""
监督学习标准工作流程
"""
print("监督学习工作流程:")
print("=" * 40)
# 1. 数据分割
print("1. 数据分割...")
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y if problem_type == 'classification' else None
)
print(f" 训练集大小: {X_train.shape[0]}")
print(f" 测试集大小: {X_test.shape[0]}")
# 2. 特征缩放
print("\n2. 特征缩放...")
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
print(" 完成标准化")
# 3. 模型选择和训练
print("\n3. 模型训练...")
if problem_type == 'classification':
from sklearn.linear_model import LogisticRegression
model = LogisticRegression(random_state=42)
model.fit(X_train_scaled, y_train)
# 预测
y_pred = model.predict(X_test_scaled)
# 评估
accuracy = accuracy_score(y_test, y_pred)
print(f" 模型准确率: {accuracy:.4f}")
else: # regression
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X_train_scaled, y_train)
# 预测
y_pred = model.predict(X_test_scaled)
# 评估
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
print(f" 均方误差: {mse:.4f}")
print(f" R²分数: {r2:.4f}")
return model, scaler, (X_train_scaled, X_test_scaled, y_train, y_test, y_pred)
# 演示工作流程
print("分类问题工作流程:")
clf_model, clf_scaler, clf_results = supervised_learning_workflow(
X_classification, y_classification, 'classification'
)
print("\n" + "="*50)
print("回归问题工作流程:")
reg_model, reg_scaler, reg_results = supervised_learning_workflow(
X_regression, y_regression, 'regression'
)
3.2 线性回归
3.2.1 线性回归原理
线性回归是最基础的回归算法,它假设目标变量与特征之间存在线性关系。
数学公式: $$y = \beta_0 + \beta_1 x_1 + \beta_2 x_2 + \cdots + \beta_n x_n + \epsilon$$
其中:
- $y$ 是目标变量
- $x_i$ 是特征变量
- $\beta_i$ 是回归系数
- $\epsilon$ 是误差项
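为了直观理解上述公式,下面给出一个用最小二乘(正规方程)直接求解系数的小例子;其中的数据和"真实系数"均为虚构示意:

```python
import numpy as np

# 构造玩具数据:y = 1 + 2*x1 + 3*x2 + 噪声(系数为示意值)
rng = np.random.default_rng(0)
X = rng.normal(size=(100, 2))
y = 1 + 2 * X[:, 0] + 3 * X[:, 1] + rng.normal(scale=0.1, size=100)

# 在特征矩阵左侧拼接一列 1,对应截距项 beta_0
X_b = np.hstack([np.ones((100, 1)), X])

# 最小二乘解 beta = (X^T X)^{-1} X^T y,用 lstsq 求解在数值上更稳定
beta, *_ = np.linalg.lstsq(X_b, y, rcond=None)
print("估计系数 [beta_0, beta_1, beta_2]:", np.round(beta, 3))  # 约为 [1, 2, 3]
```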
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import Pipeline
class LinearRegressionAnalyzer:
"""
线性回归分析器
"""
def __init__(self):
self.models = {}
self.results = {}
def fit_models(self, X_train, y_train, X_test, y_test):
"""
训练多种线性回归模型
"""
# 定义模型
models = {
'Linear Regression': LinearRegression(),
'Ridge Regression': Ridge(alpha=1.0),
'Lasso Regression': Lasso(alpha=1.0),
'ElasticNet': ElasticNet(alpha=1.0, l1_ratio=0.5)
}
print("训练线性回归模型:")
print("-" * 30)
for name, model in models.items():
# 训练模型
model.fit(X_train, y_train)
# 预测
y_pred_train = model.predict(X_train)
y_pred_test = model.predict(X_test)
# 评估
train_mse = mean_squared_error(y_train, y_pred_train)
test_mse = mean_squared_error(y_test, y_pred_test)
train_r2 = r2_score(y_train, y_pred_train)
test_r2 = r2_score(y_test, y_pred_test)
# 保存结果
self.models[name] = model
self.results[name] = {
'train_mse': train_mse,
'test_mse': test_mse,
'train_r2': train_r2,
'test_r2': test_r2,
'y_pred_test': y_pred_test
}
print(f"{name}:")
print(f" 训练MSE: {train_mse:.4f}, R²: {train_r2:.4f}")
print(f" 测试MSE: {test_mse:.4f}, R²: {test_r2:.4f}")
print()
def plot_results(self, X_test, y_test):
"""
可视化结果
"""
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.ravel()
for i, (name, results) in enumerate(self.results.items()):
ax = axes[i]
# 真实值 vs 预测值
ax.scatter(y_test, results['y_pred_test'], alpha=0.6)
# 完美预测线
min_val = min(y_test.min(), results['y_pred_test'].min())
max_val = max(y_test.max(), results['y_pred_test'].max())
ax.plot([min_val, max_val], [min_val, max_val], 'r--', lw=2)
ax.set_xlabel('真实值')
ax.set_ylabel('预测值')
ax.set_title(f'{name}\nR² = {results["test_r2"]:.4f}')
plt.tight_layout()
plt.show()
def compare_coefficients(self):
"""
比较不同模型的系数
"""
coef_data = []
for name, model in self.models.items():
if hasattr(model, 'coef_'):
coef_data.append({
'Model': name,
'Intercept': model.intercept_,
'Coefficients': model.coef_
})
# 可视化系数
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
# 截距比较
models = [item['Model'] for item in coef_data]
intercepts = [item['Intercept'] for item in coef_data]
ax1.bar(models, intercepts)
ax1.set_title('模型截距比较')
ax1.set_ylabel('截距值')
plt.setp(ax1.get_xticklabels(), rotation=45)
# 系数比较
for i, item in enumerate(coef_data):
ax2.plot(item['Coefficients'], 'o-', label=item['Model'], markersize=6)
ax2.set_title('模型系数比较')
ax2.set_xlabel('特征索引')
ax2.set_ylabel('系数值')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return coef_data
# 演示决策树
tree_analyzer = DecisionTreeAnalyzer()
# 信息增益演示
entropy_func, gini_func = tree_analyzer.information_gain_demo()
# 使用鸢尾花数据集进行分类
from sklearn.datasets import load_iris
iris = load_iris()
X_iris, y_iris = iris.data, iris.target
print("\n鸢尾花数据集决策树分析:")
print("=" * 40)
# 分割数据
X_train_iris, X_test_iris, y_train_iris, y_test_iris = train_test_split(
X_iris, y_iris, test_size=0.2, random_state=42, stratify=y_iris
)
# 训练决策树
iris_results = tree_analyzer.train_decision_trees(
X_train_iris, y_train_iris, X_test_iris, y_test_iris, 'classification'
)
# 可视化决策树
tree_analyzer.visualize_tree(
'Pruned',
feature_names=iris.feature_names,
class_names=iris.target_names,
max_depth=3
)
# 特征重要性分析
importance_data = tree_analyzer.feature_importance_analysis(iris.feature_names)
# 绘制复杂度分析
tree_analyzer.plot_tree_complexity(iris_results)
3.4.2 决策树回归
决策树也可以用于回归问题,通过预测叶子节点中目标值的平均值。
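下面用一棵深度为 1 的回归树(决策树桩)做最小演示:树把样本划分成两个叶子,每个叶子的预测值正是该叶子内目标值的平均值(数据为虚构示意):

```python
import numpy as np
from sklearn.tree import DecisionTreeRegressor

X_toy = np.array([[0.1], [0.2], [0.3], [0.7], [0.8], [0.9]])
y_toy = np.array([0.0, 0.1, -0.1, 1.0, 1.1, 0.9])

stump = DecisionTreeRegressor(max_depth=1, random_state=0)
stump.fit(X_toy, y_toy)

print("左叶子均值(手动计算):", y_toy[:3].mean())   # 0.0
print("右叶子均值(手动计算):", y_toy[3:].mean())   # 1.0
print("模型预测:", stump.predict([[0.2], [0.8]]))    # 应与上面的叶子均值一致
```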
class DecisionTreeRegressionDemo:
"""
决策树回归演示
"""
def __init__(self):
self.model = None
def create_regression_data(self, n_samples=200):
"""
创建回归数据
"""
np.random.seed(42)
X = np.linspace(0, 10, n_samples).reshape(-1, 1)
y = np.sin(X.ravel()) + 0.1 * np.random.randn(n_samples)
return X, y
def compare_tree_depths(self, X, y):
"""
比较不同深度的决策树回归
"""
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
depths = [1, 3, 5, 10, None]
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.ravel()
# 用于绘图的密集点
X_plot = np.linspace(0, 10, 300).reshape(-1, 1)
for i, depth in enumerate(depths):
if i >= len(axes):
break
ax = axes[i]
# 训练模型
model = DecisionTreeRegressor(max_depth=depth, random_state=42)
model.fit(X_train, y_train)
# 预测
y_pred_plot = model.predict(X_plot)
y_pred_test = model.predict(X_test)
# 评估
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)
# 绘制
ax.scatter(X_train, y_train, alpha=0.6, label='训练数据')
ax.scatter(X_test, y_test, alpha=0.6, color='red', label='测试数据')
ax.plot(X_plot, y_pred_plot, color='green', linewidth=2,
label=f'决策树 (深度={depth})')
ax.set_xlabel('X')
ax.set_ylabel('y')
ax.set_title(f'深度={depth}\n训练R²={train_score:.3f}, 测试R²={test_score:.3f}')
ax.legend()
ax.grid(True, alpha=0.3)
# 隐藏多余的子图
for i in range(len(depths), len(axes)):
axes[i].set_visible(False)
plt.tight_layout()
plt.show()
def pruning_demonstration(self, X, y):
"""
剪枝演示
"""
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 训练完整的树
full_tree = DecisionTreeRegressor(random_state=42)
full_tree.fit(X_train, y_train)
# 获取剪枝路径
path = full_tree.cost_complexity_pruning_path(X_train, y_train)
ccp_alphas, impurities = path.ccp_alphas, path.impurities
# 训练不同alpha值的树
clfs = []
for ccp_alpha in ccp_alphas:
clf = DecisionTreeRegressor(random_state=42, ccp_alpha=ccp_alpha)
clf.fit(X_train, y_train)
clfs.append(clf)
# 移除最后一个(只有根节点的树)
clfs = clfs[:-1]
ccp_alphas = ccp_alphas[:-1]
# 计算训练和测试分数
train_scores = [clf.score(X_train, y_train) for clf in clfs]
test_scores = [clf.score(X_test, y_test) for clf in clfs]
# 可视化剪枝效果
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
# 剪枝路径
ax1.plot(ccp_alphas, train_scores, marker='o', label='训练集', drawstyle="steps-post")
ax1.plot(ccp_alphas, test_scores, marker='o', label='测试集', drawstyle="steps-post")
ax1.set_xlabel('Alpha')
ax1.set_ylabel('R² Score')
ax1.set_title('剪枝路径')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 节点数量变化
node_counts = [clf.tree_.node_count for clf in clfs]
ax2.plot(ccp_alphas, node_counts, marker='o', drawstyle="steps-post")
ax2.set_xlabel('Alpha')
ax2.set_ylabel('节点数量')
ax2.set_title('剪枝后节点数量')
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 找到最佳alpha
best_alpha_idx = np.argmax(test_scores)
best_alpha = ccp_alphas[best_alpha_idx]
best_score = test_scores[best_alpha_idx]
print(f"最佳alpha: {best_alpha:.6f}")
print(f"最佳测试分数: {best_score:.4f}")
return best_alpha, clfs[best_alpha_idx]
# 演示决策树回归
reg_demo = DecisionTreeRegressionDemo()
# 创建回归数据
X_reg_tree, y_reg_tree = reg_demo.create_regression_data()
print("\n决策树回归演示:")
print("=" * 30)
# 比较不同深度
reg_demo.compare_tree_depths(X_reg_tree, y_reg_tree)
# 剪枝演示
best_alpha, best_model = reg_demo.pruning_demonstration(X_reg_tree, y_reg_tree)
3.5 随机森林
3.5.1 随机森林原理
随机森林是一种集成学习方法,通过构建多个决策树并结合它们的预测来提高性能。
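在使用 sklearn 之前,下面先用几行代码勾勒"自助采样 + 多数投票"这一基本思想(仅为原理示意:真正的随机森林在每次分裂时还会随机抽取特征子集):

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier

X, y = load_iris(return_X_y=True)
rng = np.random.default_rng(42)

# 手工 bagging:在每个 bootstrap 样本上训练一棵树
trees = []
for _ in range(25):
    idx = rng.integers(0, len(X), size=len(X))   # 有放回抽样
    trees.append(DecisionTreeClassifier(random_state=0).fit(X[idx], y[idx]))

# 多数投票:对每个样本取所有树预测的众数
all_preds = np.stack([t.predict(X) for t in trees])   # 形状 (25, n_samples)
votes = np.apply_along_axis(lambda c: np.bincount(c).argmax(), 0, all_preds)
print("训练集投票准确率:", (votes == y).mean())
```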
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.ensemble import ExtraTreesClassifier, ExtraTreesRegressor
class RandomForestAnalyzer:
"""
随机森林分析器
"""
def __init__(self):
self.models = {}
self.feature_names = None
def demonstrate_bootstrap_sampling(self, X, y, n_samples=100):
"""
演示Bootstrap采样
"""
print("Bootstrap采样演示:")
print("=" * 25)
original_indices = np.arange(len(X))
# 生成几个bootstrap样本
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
axes = axes.ravel()
for i in range(4):
# Bootstrap采样
bootstrap_indices = np.random.choice(len(X), size=len(X), replace=True)
unique_indices, counts = np.unique(bootstrap_indices, return_counts=True)
ax = axes[i]
# 可视化采样分布
ax.bar(unique_indices, counts, alpha=0.7)
ax.set_xlabel('原始样本索引')
ax.set_ylabel('采样次数')
ax.set_title(f'Bootstrap样本 {i+1}\n唯一样本: {len(unique_indices)}/{len(X)}')
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 统计信息
bootstrap_stats = []
for _ in range(1000):
bootstrap_indices = np.random.choice(len(X), size=len(X), replace=True)
unique_count = len(np.unique(bootstrap_indices))
bootstrap_stats.append(unique_count / len(X))
print(f"1000次Bootstrap采样统计:")
print(f"平均唯一样本比例: {np.mean(bootstrap_stats):.3f}")
print(f"标准差: {np.std(bootstrap_stats):.3f}")
print(f"理论值 (1-1/e): {1-1/np.e:.3f}")
def train_random_forest_models(self, X_train, y_train, X_test, y_test, problem_type='classification'):
"""
训练随机森林模型
"""
if problem_type == 'classification':
models = {
'Random Forest (默认)': RandomForestClassifier(random_state=42),
'Random Forest (100棵树)': RandomForestClassifier(n_estimators=100, random_state=42),
'Random Forest (深度限制)': RandomForestClassifier(n_estimators=100, max_depth=5, random_state=42),
'Extra Trees': ExtraTreesClassifier(n_estimators=100, random_state=42),
'Random Forest (调优)': RandomForestClassifier(
n_estimators=200, max_depth=10, min_samples_split=5,
min_samples_leaf=2, random_state=42
)
}
else:
models = {
'Random Forest (默认)': RandomForestRegressor(random_state=42),
'Random Forest (100棵树)': RandomForestRegressor(n_estimators=100, random_state=42),
'Random Forest (深度限制)': RandomForestRegressor(n_estimators=100, max_depth=5, random_state=42),
'Extra Trees': ExtraTreesRegressor(n_estimators=100, random_state=42),
'Random Forest (调优)': RandomForestRegressor(
n_estimators=200, max_depth=10, min_samples_split=5,
min_samples_leaf=2, random_state=42
)
}
results = {}
for name, model in models.items():
print(f"\n训练 {name}...")
# 训练
model.fit(X_train, y_train)
# 预测
y_pred_train = model.predict(X_train)
y_pred_test = model.predict(X_test)
if problem_type == 'classification':
train_score = accuracy_score(y_train, y_pred_train)
test_score = accuracy_score(y_test, y_pred_test)
metric_name = '准确率'
else:
train_score = r2_score(y_train, y_pred_train)
test_score = r2_score(y_test, y_pred_test)
metric_name = 'R²分数'
print(f" 训练{metric_name}: {train_score:.4f}")
print(f" 测试{metric_name}: {test_score:.4f}")
self.models[name] = model
results[name] = {
'train_score': train_score,
'test_score': test_score,
'y_pred_test': y_pred_test
}
return results
def analyze_feature_importance(self, feature_names=None):
"""
分析特征重要性
"""
if not self.models:
print("没有训练好的模型")
return
# 收集特征重要性
importance_data = {}
for name, model in self.models.items():
if hasattr(model, 'feature_importances_'):
importance_data[name] = model.feature_importances_
if not importance_data:
return
# 创建DataFrame便于分析
n_features = len(list(importance_data.values())[0])
feature_names = feature_names or [f'特征{i+1}' for i in range(n_features)]
importance_df = pd.DataFrame(importance_data, index=feature_names)
# 可视化
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# 特征重要性热图
sns.heatmap(importance_df.T, annot=True, fmt='.3f', cmap='viridis', ax=ax1)
ax1.set_title('特征重要性热图')
ax1.set_xlabel('特征')
ax1.set_ylabel('模型')
# 平均特征重要性
mean_importance = importance_df.mean(axis=1).sort_values(ascending=True)
mean_importance.plot(kind='barh', ax=ax2)
ax2.set_title('平均特征重要性')
ax2.set_xlabel('重要性')
plt.tight_layout()
plt.show()
return importance_df
def plot_learning_curves_trees(self, X_train, y_train, X_test, y_test, problem_type='classification'):
"""
绘制树数量的学习曲线
"""
n_estimators_range = range(10, 201, 20)
train_scores = []
test_scores = []
for n_estimators in n_estimators_range:
if problem_type == 'classification':
model = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
model.fit(X_train, y_train)
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)
else:
model = RandomForestRegressor(n_estimators=n_estimators, random_state=42)
model.fit(X_train, y_train)
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)
train_scores.append(train_score)
test_scores.append(test_score)
plt.figure(figsize=(10, 6))
plt.plot(n_estimators_range, train_scores, 'o-', label='训练集', linewidth=2)
plt.plot(n_estimators_range, test_scores, 'o-', label='测试集', linewidth=2)
plt.xlabel('树的数量')
plt.ylabel('性能分数')
plt.title('随机森林:树数量 vs 性能')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
return n_estimators_range, train_scores, test_scores
# 演示随机森林
rf_analyzer = RandomForestAnalyzer()
# Bootstrap采样演示
rf_analyzer.demonstrate_bootstrap_sampling(X_iris, y_iris)
print("\n随机森林分类演示:")
print("=" * 30)
# 训练随机森林分类模型
rf_clf_results = rf_analyzer.train_random_forest_models(
X_train_iris, y_train_iris, X_test_iris, y_test_iris, 'classification'
)
# 特征重要性分析
importance_df = rf_analyzer.analyze_feature_importance(iris.feature_names)
# 学习曲线
n_est_range, train_scores, test_scores = rf_analyzer.plot_learning_curves_trees(
X_train_iris, y_train_iris, X_test_iris, y_test_iris, 'classification'
)
print(f"\n最佳树数量: {n_est_range[np.argmax(test_scores)]}")
print(f"最佳测试分数: {max(test_scores):.4f}")
3.6 支持向量机 (SVM)
3.6.1 SVM原理
支持向量机是一种强大的监督学习算法,通过寻找最优分离超平面来进行分类和回归。
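以线性可分的硬间隔情形为例,SVM 求解的是"在保证所有样本被正确分类的前提下最大化间隔 $2/\|w\|$"的优化问题:

$$\min_{w, b} \frac{1}{2}\|w\|^2 \quad \text{s.t.} \quad y_i(w^\top x_i + b) \ge 1, \quad i = 1, \ldots, n$$

其中 $w$ 是超平面的法向量,$b$ 是偏置;恰好满足 $y_i(w^\top x_i + b) = 1$ 的样本就是支持向量。实际中常用软间隔形式,通过参数 $C$ 允许部分样本违反约束。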
from sklearn.svm import SVC, SVR
from sklearn.svm import LinearSVC, LinearSVR
class SVMAnalyzer:
"""
支持向量机分析器
"""
def __init__(self):
self.models = {}
self.scaler = None
def demonstrate_svm_concepts(self):
"""
演示SVM核心概念
"""
# 创建线性可分数据
np.random.seed(42)
X_linearly_separable = np.array([
[1, 2], [2, 3], [3, 3], [2, 1], [3, 2], # 类别0
[6, 6], [7, 7], [8, 6], [7, 5], [8, 8] # 类别1
])
y_linearly_separable = np.array([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
# 训练线性SVM
svm_linear = SVC(kernel='linear', C=1.0)
svm_linear.fit(X_linearly_separable, y_linearly_separable)
# 可视化
plt.figure(figsize=(12, 5))
# 子图1:线性可分情况
plt.subplot(1, 2, 1)
# 绘制数据点
colors = ['red', 'blue']
for i in range(2):
mask = y_linearly_separable == i
plt.scatter(X_linearly_separable[mask, 0], X_linearly_separable[mask, 1],
c=colors[i], label=f'类别{i}', s=100, alpha=0.8)
# 绘制决策边界和支持向量
ax = plt.gca()
xlim = ax.get_xlim()
ylim = ax.get_ylim()
# 创建网格
xx = np.linspace(xlim[0], xlim[1], 30)
yy = np.linspace(ylim[0], ylim[1], 30)
YY, XX = np.meshgrid(yy, xx)
xy = np.vstack([XX.ravel(), YY.ravel()]).T
Z = svm_linear.decision_function(xy).reshape(XX.shape)
# 绘制决策边界和间隔
ax.contour(XX, YY, Z, colors='k', levels=[-1, 0, 1], alpha=0.5,
linestyles=['--', '-', '--'])
# 标记支持向量
ax.scatter(svm_linear.support_vectors_[:, 0], svm_linear.support_vectors_[:, 1],
s=300, linewidth=1, facecolors='none', edgecolors='k', label='支持向量')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('线性SVM - 线性可分数据')
plt.legend()
plt.grid(True, alpha=0.3)
# 子图2:非线性数据
plt.subplot(1, 2, 2)
# 创建非线性数据(圆形分布)
from sklearn.datasets import make_circles
X_circles, y_circles = make_circles(n_samples=100, noise=0.1, factor=0.3, random_state=42)
# 训练RBF核SVM
svm_rbf = SVC(kernel='rbf', C=1.0, gamma='scale')
svm_rbf.fit(X_circles, y_circles)
# 绘制数据点
for i in range(2):
mask = y_circles == i
plt.scatter(X_circles[mask, 0], X_circles[mask, 1],
c=colors[i], label=f'类别{i}', alpha=0.8)
# 绘制决策边界
ax = plt.gca()
xlim = ax.get_xlim()
ylim = ax.get_ylim()
xx = np.linspace(xlim[0], xlim[1], 100)
yy = np.linspace(ylim[0], ylim[1], 100)
YY, XX = np.meshgrid(yy, xx)
xy = np.vstack([XX.ravel(), YY.ravel()]).T
Z = svm_rbf.decision_function(xy).reshape(XX.shape)
ax.contour(XX, YY, Z, colors='k', levels=[0], alpha=0.5, linestyles=['-'])
ax.contourf(XX, YY, Z, levels=50, alpha=0.3, cmap='RdYlBu')
plt.xlabel('特征1')
plt.ylabel('特征2')
plt.title('RBF核SVM - 非线性数据')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("SVM核心概念:")
print(f"线性SVM支持向量数量: {len(svm_linear.support_vectors_)}")
print(f"RBF SVM支持向量数量: {len(svm_rbf.support_vectors_)}")
return svm_linear, svm_rbf
def train_svm_models(self, X_train, y_train, X_test, y_test, problem_type='classification'):
"""
训练不同核函数的SVM模型
"""
# 标准化数据
self.scaler = StandardScaler()
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
if problem_type == 'classification':
models = {
'Linear SVM': SVC(kernel='linear', C=1.0, random_state=42),
'RBF SVM': SVC(kernel='rbf', C=1.0, gamma='scale', random_state=42),
'Polynomial SVM': SVC(kernel='poly', degree=3, C=1.0, random_state=42),
'Sigmoid SVM': SVC(kernel='sigmoid', C=1.0, random_state=42)
}
else:
models = {
'Linear SVR': SVR(kernel='linear', C=1.0),
'RBF SVR': SVR(kernel='rbf', C=1.0, gamma='scale'),
'Polynomial SVR': SVR(kernel='poly', degree=3, C=1.0),
'Sigmoid SVR': SVR(kernel='sigmoid', C=1.0)
}
results = {}
for name, model in models.items():
print(f"\n训练 {name}...")
# 训练
model.fit(X_train_scaled, y_train)
# 预测
y_pred_train = model.predict(X_train_scaled)
y_pred_test = model.predict(X_test_scaled)
if problem_type == 'classification':
train_score = accuracy_score(y_train, y_pred_train)
test_score = accuracy_score(y_test, y_pred_test)
metric_name = '准确率'
else:
train_score = r2_score(y_train, y_pred_train)
test_score = r2_score(y_test, y_pred_test)
metric_name = 'R²分数'
print(f" 训练{metric_name}: {train_score:.4f}")
print(f" 测试{metric_name}: {test_score:.4f}")
print(f" 支持向量数量: {len(model.support_vectors_)}")
self.models[name] = model
results[name] = {
'train_score': train_score,
'test_score': test_score,
'n_support_vectors': len(model.support_vectors_),
'y_pred_test': y_pred_test
}
return results, X_train_scaled, X_test_scaled
def hyperparameter_tuning(self, X_train, y_train, problem_type='classification'):
"""
SVM超参数调优
"""
print("\nSVM超参数调优:")
print("=" * 25)
if problem_type == 'classification':
# 定义参数网格
param_grids = [
{
'kernel': ['linear'],
'C': [0.1, 1, 10, 100]
},
{
'kernel': ['rbf'],
'C': [0.1, 1, 10, 100],
'gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1]
},
{
'kernel': ['poly'],
'C': [0.1, 1, 10],
'degree': [2, 3, 4],
'gamma': ['scale', 'auto']
}
]
base_model = SVC(random_state=42)
else:
param_grids = [
{
'kernel': ['linear'],
'C': [0.1, 1, 10, 100]
},
{
'kernel': ['rbf'],
'C': [0.1, 1, 10, 100],
'gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1]
}
]
base_model = SVR()
best_models = {}
for i, param_grid in enumerate(param_grids):
kernel_name = param_grid['kernel'][0]
print(f"\n调优 {kernel_name} 核...")
grid_search = GridSearchCV(
base_model, param_grid, cv=5, scoring='accuracy' if problem_type == 'classification' else 'r2',
n_jobs=-1, verbose=0
)
grid_search.fit(X_train, y_train)
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳交叉验证分数: {grid_search.best_score_:.4f}")
best_models[kernel_name] = grid_search.best_estimator_
return best_models
def plot_svm_decision_boundaries(self, X, y, model_names=None):
"""
绘制SVM决策边界(仅适用于2D数据)
"""
if X.shape[1] != 2:
print("决策边界可视化仅支持2维特征")
return
model_names = model_names or list(self.models.keys())[:4]
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.ravel()
for i, name in enumerate(model_names):
if i >= len(axes) or name not in self.models:
continue
ax = axes[i]
model = self.models[name]
# 标准化数据
X_scaled = self.scaler.transform(X)
# 创建网格
h = 0.02
x_min, x_max = X_scaled[:, 0].min() - 1, X_scaled[:, 0].max() + 1
y_min, y_max = X_scaled[:, 1].min() - 1, X_scaled[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
np.arange(y_min, y_max, h))
# 预测网格点
grid_points = np.c_[xx.ravel(), yy.ravel()]
Z = model.predict(grid_points)
Z = Z.reshape(xx.shape)
# 绘制决策区域
ax.contourf(xx, yy, Z, alpha=0.6, cmap='viridis')
# 绘制数据点
scatter = ax.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y,
cmap='viridis', edgecolors='black', alpha=0.8)
# 绘制支持向量
if hasattr(model, 'support_vectors_'):
ax.scatter(model.support_vectors_[:, 0], model.support_vectors_[:, 1],
s=300, linewidth=1, facecolors='none', edgecolors='red')
ax.set_xlabel('特征1 (标准化)')
ax.set_ylabel('特征2 (标准化)')
ax.set_title(f'{name}')
plt.tight_layout()
plt.show()
# 演示SVM
svm_analyzer = SVMAnalyzer()
# SVM概念演示
linear_svm, rbf_svm = svm_analyzer.demonstrate_svm_concepts()
print("\nSVM分类演示:")
print("=" * 20)
# 训练SVM模型
svm_results, X_train_svm_scaled, X_test_svm_scaled = svm_analyzer.train_svm_models(
X_train_iris, y_train_iris, X_test_iris, y_test_iris, 'classification'
)
# 超参数调优
best_svm_models = svm_analyzer.hyperparameter_tuning(
X_train_svm_scaled, y_train_iris, 'classification'
)
# 绘制决策边界(使用前两个特征)
X_iris_2d = X_iris[:, :2] # 只使用前两个特征
X_train_2d, X_test_2d, y_train_2d, y_test_2d = train_test_split(
X_iris_2d, y_iris, test_size=0.2, random_state=42, stratify=y_iris
)
# 重新训练2D模型用于可视化
svm_2d_analyzer = SVMAnalyzer()
svm_2d_results, _, _ = svm_2d_analyzer.train_svm_models(
X_train_2d, y_train_2d, X_test_2d, y_test_2d, 'classification'
)
svm_2d_analyzer.plot_svm_decision_boundaries(X_iris_2d, y_iris)
3.7 k近邻算法 (k-NN)
3.7.1 k-NN原理
k近邻算法是一种基于实例的学习方法,通过寻找k个最近邻居来进行预测。
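其预测过程可以用几行 NumPy 勾勒出来(欧氏距离 + 多数投票的最小示意,函数与数据均为虚构示例):

```python
import numpy as np

def knn_predict(X_train, y_train, x_query, k=3):
    """对单个查询点做 k-NN 多数投票分类(原理示意)"""
    dists = np.linalg.norm(X_train - x_query, axis=1)   # 到所有训练点的欧氏距离
    nearest = np.argsort(dists)[:k]                     # k 个最近邻的索引
    return np.bincount(y_train[nearest]).argmax()       # 多数投票

X_train = np.array([[0, 0], [0, 1], [1, 0], [5, 5], [5, 6], [6, 5]])
y_train = np.array([0, 0, 0, 1, 1, 1])
print(knn_predict(X_train, y_train, np.array([0.5, 0.5])))   # 预期输出 0
print(knn_predict(X_train, y_train, np.array([5.5, 5.5])))   # 预期输出 1
```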
from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
from sklearn.neighbors import RadiusNeighborsClassifier, RadiusNeighborsRegressor
from sklearn.metrics import pairwise_distances
class KNNAnalyzer:
"""
k近邻算法分析器
"""
def __init__(self):
self.models = {}
self.scaler = None
def demonstrate_knn_concept(self):
"""
演示k-NN核心概念
"""
# 创建简单的2D数据
np.random.seed(42)
X_demo = np.array([
[1, 1], [1, 2], [2, 1], [2, 2], # 类别0
[4, 4], [4, 5], [5, 4], [5, 5], # 类别1
[7, 1], [7, 2], [8, 1], [8, 2] # 类别2
])
y_demo = np.array([0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2])
# 新的测试点
test_point = np.array([[3, 3]])
# 计算距离
distances = pairwise_distances(test_point, X_demo)[0]
# 可视化
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
colors = ['red', 'blue', 'green']
k_values = [1, 3, 5]
for i, k in enumerate(k_values):
ax = axes[i]
# 绘制训练数据
for class_idx in range(3):
mask = y_demo == class_idx
ax.scatter(X_demo[mask, 0], X_demo[mask, 1],
c=colors[class_idx], label=f'类别{class_idx}', s=100, alpha=0.7)
# 绘制测试点
ax.scatter(test_point[0, 0], test_point[0, 1],
c='black', marker='x', s=200, linewidth=3, label='测试点')
# 找到k个最近邻
nearest_indices = np.argsort(distances)[:k]
# 绘制最近邻连线
for idx in nearest_indices:
ax.plot([test_point[0, 0], X_demo[idx, 0]],
[test_point[0, 1], X_demo[idx, 1]],
'k--', alpha=0.5, linewidth=1)
# 高亮最近邻
ax.scatter(X_demo[nearest_indices, 0], X_demo[nearest_indices, 1],
s=300, facecolors='none', edgecolors='black', linewidth=2)
# 预测
nearest_labels = y_demo[nearest_indices]
prediction = np.bincount(nearest_labels).argmax()
ax.set_xlabel('特征1')
ax.set_ylabel('特征2')
ax.set_title(f'k={k}, 预测类别: {prediction}')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("k-NN预测过程:")
for k in k_values:
nearest_indices = np.argsort(distances)[:k]
nearest_labels = y_demo[nearest_indices]
nearest_distances = distances[nearest_indices]
prediction = np.bincount(nearest_labels).argmax()
print(f"\nk={k}:")
print(f" 最近邻索引: {nearest_indices}")
print(f" 最近邻标签: {nearest_labels}")
print(f" 距离: {nearest_distances}")
print(f" 预测类别: {prediction}")
def train_knn_models(self, X_train, y_train, X_test, y_test, problem_type='classification'):
"""
训练不同参数的k-NN模型
"""
# 标准化数据
self.scaler = StandardScaler()
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
if problem_type == 'classification':
models = {
'k-NN (k=1)': KNeighborsClassifier(n_neighbors=1),
'k-NN (k=3)': KNeighborsClassifier(n_neighbors=3),
'k-NN (k=5)': KNeighborsClassifier(n_neighbors=5),
'k-NN (k=7)': KNeighborsClassifier(n_neighbors=7),
'k-NN (距离权重)': KNeighborsClassifier(n_neighbors=5, weights='distance'),
'k-NN (曼哈顿距离)': KNeighborsClassifier(n_neighbors=5, metric='manhattan')
}
else:
models = {
'k-NN (k=1)': KNeighborsRegressor(n_neighbors=1),
'k-NN (k=3)': KNeighborsRegressor(n_neighbors=3),
'k-NN (k=5)': KNeighborsRegressor(n_neighbors=5),
'k-NN (k=7)': KNeighborsRegressor(n_neighbors=7),
'k-NN (距离权重)': KNeighborsRegressor(n_neighbors=5, weights='distance'),
'k-NN (曼哈顿距离)': KNeighborsRegressor(n_neighbors=5, metric='manhattan')
}
results = {}
for name, model in models.items():
print(f"\n训练 {name}...")
# 训练
model.fit(X_train_scaled, y_train)
# 预测
y_pred_train = model.predict(X_train_scaled)
y_pred_test = model.predict(X_test_scaled)
if problem_type == 'classification':
train_score = accuracy_score(y_train, y_pred_train)
test_score = accuracy_score(y_test, y_pred_test)
metric_name = '准确率'
else:
train_score = r2_score(y_train, y_pred_train)
test_score = r2_score(y_test, y_pred_test)
metric_name = 'R²分数'
print(f" 训练{metric_name}: {train_score:.4f}")
print(f" 测试{metric_name}: {test_score:.4f}")
self.models[name] = model
results[name] = {
'train_score': train_score,
'test_score': test_score,
'y_pred_test': y_pred_test
}
return results, X_train_scaled, X_test_scaled
def find_optimal_k(self, X_train, y_train, max_k=20, problem_type='classification'):
"""
寻找最优k值
"""
k_range = range(1, max_k + 1)
cv_scores = []
for k in k_range:
if problem_type == 'classification':
knn = KNeighborsClassifier(n_neighbors=k)
scores = cross_val_score(knn, X_train, y_train, cv=5, scoring='accuracy')
else:
knn = KNeighborsRegressor(n_neighbors=k)
scores = cross_val_score(knn, X_train, y_train, cv=5, scoring='r2')
cv_scores.append(scores.mean())
# 可视化
plt.figure(figsize=(10, 6))
plt.plot(k_range, cv_scores, 'o-', linewidth=2, markersize=6)
# 标记最优k
best_k = k_range[np.argmax(cv_scores)]
best_score = max(cv_scores)
plt.axvline(x=best_k, color='red', linestyle='--', alpha=0.7,
label=f'最优k={best_k}')
plt.xlabel('k值')
plt.ylabel('交叉验证分数')
plt.title('k-NN: k值选择')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
print(f"最优k值: {best_k}")
print(f"最佳交叉验证分数: {best_score:.4f}")
return best_k, cv_scores
def plot_knn_decision_boundaries(self, X, y, k_values=[1, 3, 5, 7]):
"""
绘制不同k值的决策边界
"""
if X.shape[1] != 2:
print("决策边界可视化仅支持2维特征")
return
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.ravel()
X_scaled = self.scaler.transform(X)
for i, k in enumerate(k_values):
if i >= len(axes):
break
ax = axes[i]
# 训练模型
knn = KNeighborsClassifier(n_neighbors=k)
knn.fit(X_scaled, y)
# 创建网格
h = 0.02
x_min, x_max = X_scaled[:, 0].min() - 1, X_scaled[:, 0].max() + 1
y_min, y_max = X_scaled[:, 1].min() - 1, X_scaled[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
np.arange(y_min, y_max, h))
# 预测网格点
grid_points = np.c_[xx.ravel(), yy.ravel()]
Z = knn.predict(grid_points)
Z = Z.reshape(xx.shape)
# 绘制决策区域
ax.contourf(xx, yy, Z, alpha=0.6, cmap='viridis')
# 绘制数据点
scatter = ax.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y,
cmap='viridis', edgecolors='black', alpha=0.8)
ax.set_xlabel('特征1 (标准化)')
ax.set_ylabel('特征2 (标准化)')
ax.set_title(f'k-NN (k={k})')
plt.tight_layout()
plt.show()
# 演示k-NN
knn_analyzer = KNNAnalyzer()
# k-NN概念演示
knn_analyzer.demonstrate_knn_concept()
print("\nk-NN分类演示:")
print("=" * 20)
# 训练k-NN模型
knn_results, X_train_knn_scaled, X_test_knn_scaled = knn_analyzer.train_knn_models(
X_train_iris, y_train_iris, X_test_iris, y_test_iris, 'classification'
)
# 寻找最优k值
best_k, cv_scores = knn_analyzer.find_optimal_k(
X_train_knn_scaled, y_train_iris, max_k=15, problem_type='classification'
)
# 绘制决策边界(使用2D数据)
knn_2d_analyzer = KNNAnalyzer()
knn_2d_results, _, _ = knn_2d_analyzer.train_knn_models(
X_train_2d, y_train_2d, X_test_2d, y_test_2d, 'classification'
)
knn_2d_analyzer.plot_knn_decision_boundaries(X_iris_2d, y_iris)
3.8 算法比较与选择
3.8.1 算法性能比较
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor

class AlgorithmComparison:
"""
监督学习算法比较
"""
def __init__(self):
self.results = {}
def compare_all_algorithms(self, X_train, y_train, X_test, y_test, problem_type='classification'):
"""
比较所有算法
"""
# 标准化数据
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
if problem_type == 'classification':
algorithms = {
'Logistic Regression': LogisticRegression(random_state=42),
'Decision Tree': DecisionTreeClassifier(random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
'SVM (RBF)': SVC(kernel='rbf', random_state=42),
'k-NN': KNeighborsClassifier(n_neighbors=5)
}
scoring_metrics = ['accuracy', 'precision_weighted', 'recall_weighted', 'f1_weighted']
else:
algorithms = {
'Linear Regression': LinearRegression(),
'Decision Tree': DecisionTreeRegressor(random_state=42),
'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
'SVR (RBF)': SVR(kernel='rbf'),
'k-NN': KNeighborsRegressor(n_neighbors=5)
}
scoring_metrics = ['r2', 'neg_mean_squared_error', 'neg_mean_absolute_error']
comparison_results = {}
for name, algorithm in algorithms.items():
print(f"\n评估 {name}...")
# 交叉验证
cv_results = {}
for metric in scoring_metrics:
scores = cross_val_score(algorithm, X_train_scaled, y_train,
cv=5, scoring=metric)
cv_results[metric] = {
'mean': scores.mean(),
'std': scores.std()
}
# 训练和测试
algorithm.fit(X_train_scaled, y_train)
y_pred_test = algorithm.predict(X_test_scaled)
if problem_type == 'classification':
test_accuracy = accuracy_score(y_test, y_pred_test)
test_precision = precision_score(y_test, y_pred_test, average='weighted')
test_recall = recall_score(y_test, y_pred_test, average='weighted')
test_f1 = f1_score(y_test, y_pred_test, average='weighted')
test_scores = {
'accuracy': test_accuracy,
'precision': test_precision,
'recall': test_recall,
'f1': test_f1
}
else:
test_r2 = r2_score(y_test, y_pred_test)
test_mse = mean_squared_error(y_test, y_pred_test)
test_mae = mean_absolute_error(y_test, y_pred_test)
test_scores = {
'r2': test_r2,
'mse': test_mse,
'mae': test_mae
}
comparison_results[name] = {
'cv_results': cv_results,
'test_scores': test_scores,
'model': algorithm
}
self.results = comparison_results
return comparison_results
def plot_comparison_results(self, problem_type='classification'):
"""
可视化比较结果
"""
if not self.results:
print("没有比较结果")
return
algorithms = list(self.results.keys())
if problem_type == 'classification':
metrics = ['accuracy', 'precision', 'recall', 'f1']
cv_metrics = ['accuracy', 'precision_weighted', 'recall_weighted', 'f1_weighted']
else:
metrics = ['r2', 'mse', 'mae']
cv_metrics = ['r2', 'neg_mean_squared_error', 'neg_mean_absolute_error']
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
axes = axes.ravel()
# 交叉验证结果
for i, (metric, cv_metric) in enumerate(zip(metrics[:2], cv_metrics[:2])):
ax = axes[i]
means = [self.results[alg]['cv_results'][cv_metric]['mean'] for alg in algorithms]
stds = [self.results[alg]['cv_results'][cv_metric]['std'] for alg in algorithms]
bars = ax.bar(algorithms, means, yerr=stds, capsize=5, alpha=0.7)
ax.set_ylabel(f'{metric.upper()}')
ax.set_title(f'交叉验证 {metric.upper()}')
ax.tick_params(axis='x', rotation=45)
# 添加数值标签
for bar, mean in zip(bars, means):
height = bar.get_height()
ax.text(bar.get_x() + bar.get_width()/2., height + 0.01,
f'{mean:.3f}', ha='center', va='bottom')
# 测试集结果
for i, metric in enumerate(metrics[:2]):
ax = axes[i + 2]
scores = [self.results[alg]['test_scores'][metric] for alg in algorithms]
bars = ax.bar(algorithms, scores, alpha=0.7, color='orange')
ax.set_ylabel(f'{metric.upper()}')
ax.set_title(f'测试集 {metric.upper()}')
ax.tick_params(axis='x', rotation=45)
# 添加数值标签
for bar, score in zip(bars, scores):
height = bar.get_height()
ax.text(bar.get_x() + bar.get_width()/2., height + 0.01,
f'{score:.3f}', ha='center', va='bottom')
plt.tight_layout()
plt.show()
def create_comparison_table(self, problem_type='classification'):
"""
创建比较表格
"""
if not self.results:
print("没有比较结果")
return
data = []
for alg_name, result in self.results.items():
if problem_type == 'classification':
row = {
'算法': alg_name,
'CV准确率': f"{result['cv_results']['accuracy']['mean']:.3f} ± {result['cv_results']['accuracy']['std']:.3f}",
'测试准确率': f"{result['test_scores']['accuracy']:.3f}",
'测试精确率': f"{result['test_scores']['precision']:.3f}",
'测试召回率': f"{result['test_scores']['recall']:.3f}",
'测试F1': f"{result['test_scores']['f1']:.3f}"
}
else:
row = {
'算法': alg_name,
'CV R²': f"{result['cv_results']['r2']['mean']:.3f} ± {result['cv_results']['r2']['std']:.3f}",
'测试R²': f"{result['test_scores']['r2']:.3f}",
'测试MSE': f"{result['test_scores']['mse']:.3f}",
'测试MAE': f"{result['test_scores']['mae']:.3f}"
}
data.append(row)
df = pd.DataFrame(data)
print("\n算法性能比较表:")
print("=" * 60)
print(df.to_string(index=False))
return df
# 算法比较演示
comparison = AlgorithmComparison()
print("\n监督学习算法比较:")
print("=" * 30)
# 比较所有算法
comparison_results = comparison.compare_all_algorithms(
X_train_iris, y_train_iris, X_test_iris, y_test_iris, 'classification'
)
# 可视化比较结果
comparison.plot_comparison_results('classification')
# 创建比较表格
comparison_df = comparison.create_comparison_table('classification')
3.9 本章小结
3.9.1 核心内容回顾
本章详细介绍了监督学习的主要算法:
线性回归
- 基础线性回归
- 正则化回归(Ridge、Lasso、ElasticNet)
- 多项式回归
逻辑回归
- 二分类逻辑回归
- 多类分类策略
- 决策边界可视化
决策树
- 信息增益和基尼不纯度
- 决策树分类和回归
- 剪枝技术
随机森林
- Bootstrap采样
- 特征重要性分析
- 集成学习原理
支持向量机
- 线性和非线性SVM
- 核函数技巧
- 超参数调优
k近邻算法
- 距离度量
- k值选择
- 权重策略
3.9.2 算法选择指南
| 算法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 线性回归 | 简单、可解释性强 | 假设线性关系 | 线性关系明显的回归问题 |
| 逻辑回归 | 概率输出、快速 | 假设线性可分 | 二分类、需要概率的场景 |
| 决策树 | 可解释性强、处理非线性 | 容易过拟合 | 需要可解释性的分类问题 |
| 随机森林 | 性能好、鲁棒性强 | 黑盒模型 | 大多数分类和回归问题 |
| SVM | 处理高维数据好 | 参数敏感、训练慢 | 高维数据、小样本 |
| k-NN | 简单、无参数假设 | 计算量大、存储需求高 | 局部模式明显的问题 |
3.9.3 实践建议
- 数据预处理:建模前妥善处理缺失值、异常值与类别特征编码
- 特征缩放:对距离敏感的算法(SVM、k-NN)先进行标准化
- 交叉验证:使用交叉验证评估模型性能,避免依赖单次数据划分
- 超参数调优:使用网格搜索或随机搜索优化参数
- 集成方法:考虑使用集成方法进一步提高性能(以上建议的综合示例见下方代码)
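下面的代码把这些建议串成一个最小工作流示意:把标准化放进 Pipeline,可以避免交叉验证时测试折的信息泄漏到缩放参数中(数据集与参数取值仅为示例):

```python
from sklearn.datasets import load_iris
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

X, y = load_iris(return_X_y=True)

# Pipeline 保证每一折中 StandardScaler 只在训练部分上拟合
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', SVC(random_state=42)),
])
param_grid = {'clf__C': [0.1, 1, 10], 'clf__gamma': ['scale', 0.1]}

search = GridSearchCV(pipe, param_grid, cv=5, scoring='accuracy')
search.fit(X, y)
print("最佳参数:", search.best_params_)
print("最佳交叉验证准确率:", round(search.best_score_, 4))
```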
3.10 下一章预告
下一章我们将学习无监督学习算法,包括:
- 聚类算法(K-means、层次聚类、DBSCAN)
- 降维技术(PCA、t-SNE、UMAP)
- 关联规则挖掘
- 异常检测
3.11 练习题
基础练习
- 实现一个简单的线性回归算法(不使用sklearn)
- 比较不同正则化参数对模型性能的影响
- 手动计算决策树的信息增益
- 实现k-NN算法的距离计算
进阶练习
- 在真实数据集上比较所有监督学习算法
- 实现自定义的集成学习算法
- 分析不同核函数对SVM性能的影响
- 研究样本不平衡对各算法的影响
项目练习
- 房价预测项目:使用多种回归算法预测房价
- 文本分类项目:实现垃圾邮件分类系统
- 图像分类项目:使用传统机器学习方法进行图像分类
- 推荐系统项目:基于协同过滤的推荐算法
思考题
- 为什么集成学习通常比单一算法性能更好?
- 在什么情况下简单模型比复杂模型更好?
- 如何处理高维数据的维度诅咒问题?
- 监督学习和无监督学习的本质区别是什么?
# 使用线性回归分析器
analyzer = LinearRegressionAnalyzer()
# 创建更复杂的回归数据
np.random.seed(42)
n_samples = 500
n_features = 5
X_complex = np.random.randn(n_samples, n_features)
# 创建真实系数
true_coef = np.array([2.5, -1.5, 3.0, 0.5, -2.0])
y_complex = X_complex @ true_coef + np.random.randn(n_samples) * 0.5
# 分割数据
X_train, X_test, y_train, y_test = train_test_split(
    X_complex, y_complex, test_size=0.2, random_state=42
)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 训练模型
analyzer.fit_models(X_train_scaled, y_train, X_test_scaled, y_test)
# 可视化结果
analyzer.plot_results(X_test_scaled, y_test)
# 比较系数
coef_comparison = analyzer.compare_coefficients()
3.2.2 正则化回归
正则化技术用于防止过拟合,主要包括Ridge、Lasso和ElasticNet。
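三者的区别在于目标函数中惩罚项的形式(下式以残差平方和为基础,省略了 sklearn 实现中的样本数归一化常数):

$$\text{Ridge:}\; \min_\beta \|y - X\beta\|_2^2 + \alpha \|\beta\|_2^2 \qquad \text{Lasso:}\; \min_\beta \|y - X\beta\|_2^2 + \alpha \|\beta\|_1$$

$$\text{ElasticNet:}\; \min_\beta \|y - X\beta\|_2^2 + \alpha \left( \rho \|\beta\|_1 + (1 - \rho) \|\beta\|_2^2 \right)$$

其中 $\alpha$ 控制惩罚强度,$\rho$(对应代码中的 `l1_ratio`)控制 L1 与 L2 惩罚的混合比例。L2 惩罚使系数整体向 0 收缩,L1 惩罚则可以把部分系数压缩到恰好为 0,从而实现特征选择。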
class RegularizationDemo:
"""
正则化回归演示
"""
def __init__(self):
self.alphas = np.logspace(-4, 2, 50)
def regularization_path(self, X_train, y_train, X_test, y_test):
"""
绘制正则化路径
"""
ridge_train_scores = []
ridge_test_scores = []
lasso_train_scores = []
lasso_test_scores = []
lasso_n_features = []
for alpha in self.alphas:
# Ridge回归
ridge = Ridge(alpha=alpha)
ridge.fit(X_train, y_train)
ridge_train_scores.append(ridge.score(X_train, y_train))
ridge_test_scores.append(ridge.score(X_test, y_test))
# Lasso回归
lasso = Lasso(alpha=alpha, max_iter=2000)
lasso.fit(X_train, y_train)
lasso_train_scores.append(lasso.score(X_train, y_train))
lasso_test_scores.append(lasso.score(X_test, y_test))
lasso_n_features.append(np.sum(lasso.coef_ != 0))
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# Ridge正则化路径
axes[0, 0].semilogx(self.alphas, ridge_train_scores, 'b-', label='训练集')
axes[0, 0].semilogx(self.alphas, ridge_test_scores, 'r-', label='测试集')
axes[0, 0].set_xlabel('Alpha')
axes[0, 0].set_ylabel('R² Score')
axes[0, 0].set_title('Ridge回归正则化路径')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Lasso正则化路径
axes[0, 1].semilogx(self.alphas, lasso_train_scores, 'b-', label='训练集')
axes[0, 1].semilogx(self.alphas, lasso_test_scores, 'r-', label='测试集')
axes[0, 1].set_xlabel('Alpha')
axes[0, 1].set_ylabel('R² Score')
axes[0, 1].set_title('Lasso回归正则化路径')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Lasso特征选择
axes[1, 0].semilogx(self.alphas, lasso_n_features, 'g-', linewidth=2)
axes[1, 0].set_xlabel('Alpha')
axes[1, 0].set_ylabel('选择的特征数量')
axes[1, 0].set_title('Lasso特征选择效果')
axes[1, 0].grid(True, alpha=0.3)
# 系数路径(Lasso)
lasso_coefs = []
for alpha in self.alphas:
lasso = Lasso(alpha=alpha, max_iter=2000)
lasso.fit(X_train, y_train)
lasso_coefs.append(lasso.coef_)
lasso_coefs = np.array(lasso_coefs)
for i in range(lasso_coefs.shape[1]):
axes[1, 1].semilogx(self.alphas, lasso_coefs[:, i], label=f'特征{i+1}')
axes[1, 1].set_xlabel('Alpha')
axes[1, 1].set_ylabel('系数值')
axes[1, 1].set_title('Lasso系数路径')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return {
'ridge_scores': (ridge_train_scores, ridge_test_scores),
'lasso_scores': (lasso_train_scores, lasso_test_scores),
'lasso_features': lasso_n_features
}
def find_optimal_alpha(self, X_train, y_train):
"""
使用交叉验证寻找最优alpha
"""
from sklearn.linear_model import RidgeCV, LassoCV
# Ridge交叉验证
ridge_cv = RidgeCV(alphas=self.alphas, cv=5)
ridge_cv.fit(X_train, y_train)
# Lasso交叉验证
lasso_cv = LassoCV(alphas=self.alphas, cv=5, max_iter=2000)
lasso_cv.fit(X_train, y_train)
print("交叉验证结果:")
print(f"Ridge最优alpha: {ridge_cv.alpha_:.6f}")
print(f"Lasso最优alpha: {lasso_cv.alpha_:.6f}")
return ridge_cv.alpha_, lasso_cv.alpha_
# 演示正则化
reg_demo = RegularizationDemo()
# 创建有噪声和冗余特征的数据
np.random.seed(42)
n_samples = 200
n_features = 20
n_informative = 5
X_reg = np.random.randn(n_samples, n_features)
# 只有前5个特征是有用的
true_coef = np.zeros(n_features)
true_coef[:n_informative] = np.random.randn(n_informative) * 2
y_reg = X_reg @ true_coef + np.random.randn(n_samples) * 0.1
# 分割数据
X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(
X_reg, y_reg, test_size=0.2, random_state=42
)
# 标准化
scaler_reg = StandardScaler()
X_train_reg_scaled = scaler_reg.fit_transform(X_train_reg)
X_test_reg_scaled = scaler_reg.transform(X_test_reg)
# 绘制正则化路径
reg_results = reg_demo.regularization_path(
X_train_reg_scaled, y_train_reg, X_test_reg_scaled, y_test_reg
)
# 寻找最优alpha
optimal_alphas = reg_demo.find_optimal_alpha(X_train_reg_scaled, y_train_reg)
3.2.3 多项式回归
多项式回归通过增加特征的高次项来捕捉非线性关系。
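以单特征、二阶为例,`PolynomialFeatures` 会把输入 $x$ 扩展成 $[1, x, x^2]$,之后仍用线性回归去拟合这些新特征:

```python
import numpy as np
from sklearn.preprocessing import PolynomialFeatures

X = np.array([[1.0], [2.0], [3.0]])
poly = PolynomialFeatures(degree=2)      # 默认 include_bias=True,包含常数列
print(poly.fit_transform(X))
# [[1. 1. 1.]
#  [1. 2. 4.]
#  [1. 3. 9.]]
print(poly.get_feature_names_out())      # ['1' 'x0' 'x0^2']
```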
class PolynomialRegressionDemo:
"""
多项式回归演示
"""
def __init__(self):
self.degrees = range(1, 11)
def generate_nonlinear_data(self, n_samples=100):
"""
生成非线性数据
"""
np.random.seed(42)
X = np.linspace(-2, 2, n_samples).reshape(-1, 1)
y = 0.5 * X.ravel() ** 3 - 2 * X.ravel() ** 2 + X.ravel() + np.random.randn(n_samples) * 0.3
return X, y
def fit_polynomial_models(self, X, y):
"""
拟合不同阶数的多项式模型
"""
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
train_scores = []
test_scores = []
models = {}
for degree in self.degrees:
# 创建多项式特征
poly_features = PolynomialFeatures(degree=degree)
X_train_poly = poly_features.fit_transform(X_train)
X_test_poly = poly_features.transform(X_test)
# 训练线性回归模型
model = LinearRegression()
model.fit(X_train_poly, y_train)
# 评估
train_score = model.score(X_train_poly, y_train)
test_score = model.score(X_test_poly, y_test)
train_scores.append(train_score)
test_scores.append(test_score)
models[degree] = (model, poly_features)
print(f"阶数 {degree}: 训练R² = {train_score:.4f}, 测试R² = {test_score:.4f}")
return models, train_scores, test_scores, (X_train, X_test, y_train, y_test)
def plot_polynomial_fits(self, models, data, degrees_to_plot=[1, 3, 5, 9]):
"""
可视化不同阶数的多项式拟合
"""
X_train, X_test, y_train, y_test = data
# 创建用于绘图的密集点
X_plot = np.linspace(X_train.min(), X_train.max(), 300).reshape(-1, 1)
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.ravel()
for i, degree in enumerate(degrees_to_plot):
ax = axes[i]
if degree in models:
model, poly_features = models[degree]
X_plot_poly = poly_features.transform(X_plot)
y_plot = model.predict(X_plot_poly)
# 绘制数据点
ax.scatter(X_train, y_train, alpha=0.6, label='训练数据')
ax.scatter(X_test, y_test, alpha=0.6, color='red', label='测试数据')
# 绘制拟合曲线
ax.plot(X_plot, y_plot, color='green', linewidth=2, label=f'{degree}阶多项式')
ax.set_xlabel('X')
ax.set_ylabel('y')
ax.set_title(f'{degree}阶多项式回归')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def plot_learning_curves(self, train_scores, test_scores):
"""
绘制学习曲线
"""
plt.figure(figsize=(10, 6))
plt.plot(self.degrees, train_scores, 'o-', label='训练集R²', linewidth=2, markersize=6)
plt.plot(self.degrees, test_scores, 'o-', label='测试集R²', linewidth=2, markersize=6)
plt.xlabel('多项式阶数')
plt.ylabel('R² Score')
plt.title('多项式回归:训练集vs测试集性能')
plt.legend()
plt.grid(True, alpha=0.3)
# 标注过拟合区域
best_test_idx = np.argmax(test_scores)
plt.axvline(x=self.degrees[best_test_idx], color='red', linestyle='--', alpha=0.7,
label=f'最佳阶数: {self.degrees[best_test_idx]}')
plt.legend()
plt.show()
return self.degrees[best_test_idx]
# 演示多项式回归
poly_demo = PolynomialRegressionDemo()
# 生成非线性数据
X_nonlinear, y_nonlinear = poly_demo.generate_nonlinear_data(150)
print("多项式回归演示:")
print("=" * 30)
# 拟合多项式模型
poly_models, train_scores, test_scores, poly_data = poly_demo.fit_polynomial_models(
X_nonlinear, y_nonlinear
)
# 可视化拟合结果
poly_demo.plot_polynomial_fits(poly_models, poly_data)
# 绘制学习曲线
best_degree = poly_demo.plot_learning_curves(train_scores, test_scores)
print(f"\n最佳多项式阶数: {best_degree}")
3.3 逻辑回归
3.3.1 逻辑回归原理
逻辑回归是用于分类问题的线性模型,它使用逻辑函数(sigmoid函数)将线性组合映射到概率。
数学公式: $$P(y=1|x) = \frac{1}{1 + e^{-(\beta_0 + \beta_1 x_1 + \cdots + \beta_n x_n)}}$$
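对上式两边取对数几率(logit)可以看出,逻辑回归本质上是对"对数几率"做线性建模:

$$\ln \frac{P(y=1|x)}{1 - P(y=1|x)} = \beta_0 + \beta_1 x_1 + \cdots + \beta_n x_n$$

预测时通常以 $P(y=1|x) \ge 0.5$(等价于线性部分 $\ge 0$)作为判为正类的默认阈值。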
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve, auc, roc_auc_score
from sklearn.metrics import precision_recall_curve, average_precision_score
class LogisticRegressionAnalyzer:
"""
逻辑回归分析器
"""
def __init__(self):
self.model = None
self.scaler = None
def sigmoid(self, z):
"""
Sigmoid函数
"""
return 1 / (1 + np.exp(-np.clip(z, -250, 250)))
def plot_sigmoid(self):
"""
可视化sigmoid函数
"""
z = np.linspace(-10, 10, 100)
y = self.sigmoid(z)
plt.figure(figsize=(8, 6))
plt.plot(z, y, 'b-', linewidth=2, label='Sigmoid函数')
plt.axhline(y=0.5, color='r', linestyle='--', alpha=0.7, label='决策边界')
plt.axvline(x=0, color='r', linestyle='--', alpha=0.7)
plt.xlabel('z = β₀ + β₁x₁ + ... + βₙxₙ')
plt.ylabel('P(y=1|x)')
plt.title('Sigmoid函数')
plt.grid(True, alpha=0.3)
plt.legend()
plt.show()
def train_and_evaluate(self, X_train, y_train, X_test, y_test):
"""
训练和评估逻辑回归模型
"""
# 标准化特征
self.scaler = StandardScaler()
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
# 训练模型
self.model = LogisticRegression(random_state=42)
self.model.fit(X_train_scaled, y_train)
# 预测
y_pred = self.model.predict(X_test_scaled)
y_pred_proba = self.model.predict_proba(X_test_scaled)[:, 1]
# 评估
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
auc_score = roc_auc_score(y_test, y_pred_proba)
print("逻辑回归模型评估:")
print(f"准确率: {accuracy:.4f}")
print(f"精确率: {precision:.4f}")
print(f"召回率: {recall:.4f}")
print(f"F1分数: {f1:.4f}")
print(f"AUC分数: {auc_score:.4f}")
return {
'y_pred': y_pred,
'y_pred_proba': y_pred_proba,
'metrics': {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1': f1,
'auc': auc_score
}
}
def plot_decision_boundary(self, X, y, resolution=0.02):
"""
绘制决策边界(仅适用于2D数据)
"""
if X.shape[1] != 2:
print("决策边界可视化仅支持2维特征")
return
# 标准化数据
X_scaled = self.scaler.transform(X)
# 创建网格
x_min, x_max = X_scaled[:, 0].min() - 1, X_scaled[:, 0].max() + 1
y_min, y_max = X_scaled[:, 1].min() - 1, X_scaled[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, resolution),
np.arange(y_min, y_max, resolution))
# 预测网格点
grid_points = np.c_[xx.ravel(), yy.ravel()]
Z = self.model.predict_proba(grid_points)[:, 1]
Z = Z.reshape(xx.shape)
# 绘制
plt.figure(figsize=(10, 8))
# 绘制概率等高线
contour = plt.contourf(xx, yy, Z, levels=50, alpha=0.6, cmap='RdYlBu')
plt.colorbar(contour, label='P(y=1)')
# 绘制决策边界
plt.contour(xx, yy, Z, levels=[0.5], colors='black', linestyles='--', linewidths=2)
# 绘制数据点
scatter = plt.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y,
cmap='RdYlBu', edgecolors='black', alpha=0.8)
plt.xlabel('特征1 (标准化)')
plt.ylabel('特征2 (标准化)')
plt.title('逻辑回归决策边界')
plt.show()
def plot_roc_and_pr_curves(self, y_test, y_pred_proba):
"""
绘制ROC曲线和PR曲线
"""
# 计算ROC曲线
fpr, tpr, roc_thresholds = roc_curve(y_test, y_pred_proba)
roc_auc = auc(fpr, tpr)
# 计算PR曲线
precision, recall, pr_thresholds = precision_recall_curve(y_test, y_pred_proba)
avg_precision = average_precision_score(y_test, y_pred_proba)
# 绘制
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
# ROC曲线
ax1.plot(fpr, tpr, 'b-', linewidth=2, label=f'ROC曲线 (AUC = {roc_auc:.3f})')
ax1.plot([0, 1], [0, 1], 'r--', linewidth=1, label='随机分类器')
ax1.set_xlabel('假正率 (FPR)')
ax1.set_ylabel('真正率 (TPR)')
ax1.set_title('ROC曲线')
ax1.legend()
ax1.grid(True, alpha=0.3)
# PR曲线
ax2.plot(recall, precision, 'g-', linewidth=2,
label=f'PR曲线 (AP = {avg_precision:.3f})')
ax2.axhline(y=y_test.mean(), color='r', linestyle='--', linewidth=1,
label=f'随机分类器 (AP = {y_test.mean():.3f})')
ax2.set_xlabel('召回率 (Recall)')
ax2.set_ylabel('精确率 (Precision)')
ax2.set_title('精确率-召回率曲线')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return roc_auc, avg_precision
# 演示逻辑回归
logistic_analyzer = LogisticRegressionAnalyzer()
# 可视化sigmoid函数
logistic_analyzer.plot_sigmoid()
# 使用之前创建的分类数据
X_train_clf, X_test_clf, y_train_clf, y_test_clf = train_test_split(
X_classification, y_classification, test_size=0.2, random_state=42, stratify=y_classification
)
# 训练和评估
results = logistic_analyzer.train_and_evaluate(
X_train_clf, y_train_clf, X_test_clf, y_test_clf
)
# 绘制决策边界
logistic_analyzer.plot_decision_boundary(X_classification, y_classification)
# 绘制ROC和PR曲线
roc_auc, avg_precision = logistic_analyzer.plot_roc_and_pr_curves(
y_test_clf, results['y_pred_proba']
)
# 混淆矩阵
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(y_test_clf, results['y_pred'])
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=['类别0', '类别1'],
yticklabels=['类别0', '类别1'])
plt.title('混淆矩阵')
plt.ylabel('真实标签')
plt.xlabel('预测标签')
plt.show()
print(f"\n混淆矩阵:")
print(f"真负例: {cm[0,0]}, 假正例: {cm[0,1]}")
print(f"假负例: {cm[1,0]}, 真正例: {cm[1,1]}")
3.3.2 多类分类
逻辑回归可以扩展到多类分类问题。
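除了 One-vs-Rest、One-vs-One 等拆分策略,还可以用 softmax(multinomial)形式把 sigmoid 直接推广到 $K$ 个类别,每个类别拥有自己的一组系数 $\beta_k$:

$$P(y = k \mid x) = \frac{e^{\beta_k^\top x}}{\sum_{j=1}^{K} e^{\beta_j^\top x}}, \quad k = 1, \ldots, K$$

预测时取概率最大的类别,这正对应下面代码中 `multi_class='multinomial'` 的设置。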
from sklearn.datasets import make_classification
from sklearn.multiclass import OneVsRestClassifier, OneVsOneClassifier
class MultiClassLogisticRegression:
"""
多类逻辑回归
"""
def __init__(self):
self.models = {}
self.scaler = None
def create_multiclass_data(self, n_samples=1000, n_classes=3):
"""
创建多类分类数据
"""
X, y = make_classification(
n_samples=n_samples,
n_features=2,
n_redundant=0,
n_informative=2,
n_clusters_per_class=1,
n_classes=n_classes,
random_state=42
)
return X, y
def train_multiclass_models(self, X_train, y_train, X_test, y_test):
"""
训练多种多类分类策略
"""
# 标准化
self.scaler = StandardScaler()
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
# 定义模型
models = {
'Multinomial': LogisticRegression(multi_class='multinomial', solver='lbfgs', random_state=42),
'One-vs-Rest': OneVsRestClassifier(LogisticRegression(random_state=42)),
'One-vs-One': OneVsOneClassifier(LogisticRegression(random_state=42))
}
results = {}
for name, model in models.items():
print(f"\n训练 {name} 模型...")
# 训练
model.fit(X_train_scaled, y_train)
# 预测
y_pred = model.predict(X_test_scaled)
# One-vs-One 策略不提供 predict_proba,这里做兼容处理
y_pred_proba = model.predict_proba(X_test_scaled) if hasattr(model, 'predict_proba') else None
# 评估
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='weighted')
recall = recall_score(y_test, y_pred, average='weighted')
f1 = f1_score(y_test, y_pred, average='weighted')
print(f"准确率: {accuracy:.4f}")
print(f"精确率: {precision:.4f}")
print(f"召回率: {recall:.4f}")
print(f"F1分数: {f1:.4f}")
self.models[name] = model
results[name] = {
'y_pred': y_pred,
'y_pred_proba': y_pred_proba,
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1': f1
}
return results, X_train_scaled, X_test_scaled
def plot_multiclass_decision_boundaries(self, X, y, resolution=0.02):
"""
绘制多类决策边界
"""
X_scaled = self.scaler.transform(X)
# 创建网格
x_min, x_max = X_scaled[:, 0].min() - 1, X_scaled[:, 0].max() + 1
y_min, y_max = X_scaled[:, 1].min() - 1, X_scaled[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, resolution),
np.arange(y_min, y_max, resolution))
grid_points = np.c_[xx.ravel(), yy.ravel()]
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for i, (name, model) in enumerate(self.models.items()):
ax = axes[i]
# 预测网格点
Z = model.predict(grid_points)
Z = Z.reshape(xx.shape)
# 绘制决策区域
ax.contourf(xx, yy, Z, alpha=0.6, cmap='viridis')
# 绘制数据点
scatter = ax.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y,
cmap='viridis', edgecolors='black', alpha=0.8)
ax.set_xlabel('特征1 (标准化)')
ax.set_ylabel('特征2 (标准化)')
ax.set_title(f'{name} 决策边界')
plt.tight_layout()
plt.show()
def plot_confusion_matrices(self, y_test, results):
"""
绘制混淆矩阵
"""
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
for i, (name, result) in enumerate(results.items()):
ax = axes[i]
cm = confusion_matrix(y_test, result['y_pred'])
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax,
xticklabels=[f'类别{i}' for i in range(len(np.unique(y_test)))],
yticklabels=[f'类别{i}' for i in range(len(np.unique(y_test)))])
ax.set_title(f'{name}\n准确率: {result["accuracy"]:.3f}')
ax.set_ylabel('真实标签')
ax.set_xlabel('预测标签')
plt.tight_layout()
plt.show()
# 演示多类逻辑回归
multi_clf = MultiClassLogisticRegression()
# 创建多类数据
X_multi, y_multi = multi_clf.create_multiclass_data(n_samples=800, n_classes=3)
print(f"多类数据形状: {X_multi.shape}")
print(f"类别分布: {np.bincount(y_multi)}")
# 分割数据
X_train_multi, X_test_multi, y_train_multi, y_test_multi = train_test_split(
X_multi, y_multi, test_size=0.2, random_state=42, stratify=y_multi
)
# 训练模型
print("多类逻辑回归模型比较:")
print("=" * 40)
multi_results, X_train_multi_scaled, X_test_multi_scaled = multi_clf.train_multiclass_models(
X_train_multi, y_train_multi, X_test_multi, y_test_multi
)
# 绘制决策边界
multi_clf.plot_multiclass_decision_boundaries(X_multi, y_multi)
# 绘制混淆矩阵
multi_clf.plot_confusion_matrices(y_test_multi, multi_results)
# 详细分类报告
print("\n详细分类报告:")
for name, result in multi_results.items():
print(f"\n{name}:")
print(classification_report(y_test_multi, result['y_pred'],
target_names=[f'类别{i}' for i in range(3)]))
3.4 决策树
3.4.1 决策树原理
决策树是一种基于树结构的监督学习算法,通过一系列if-else条件来进行决策。
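为了体会"一系列 if-else"的含义,下面手写一个与深度为 2 的分类树等价的判定函数(阈值为针对鸢尾花数据的示意值):

```python
def tiny_tree_predict(petal_length, petal_width):
    """一个手写的"决策树":每个 if-else 对应树中一次特征与阈值的比较(阈值为示意值)"""
    if petal_length < 2.45:           # 根节点:按花瓣长度划分
        return 'setosa'
    else:
        if petal_width < 1.75:        # 内部节点:按花瓣宽度划分
            return 'versicolor'
        else:
            return 'virginica'

print(tiny_tree_predict(1.4, 0.2))    # setosa
print(tiny_tree_predict(4.5, 1.3))    # versicolor
```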
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.tree import plot_tree, export_text
from sklearn import tree
import graphviz

class DecisionTreeAnalyzer:
    """
    决策树分析器
    """
    def __init__(self):
        self.models = {}
        self.feature_names = None
def information_gain_demo(self):
"""
演示信息增益计算
"""
def entropy(y):
"""计算熵"""
if len(y) == 0:
return 0
_, counts = np.unique(y, return_counts=True)
probabilities = counts / len(y)
return -np.sum(probabilities * np.log2(probabilities + 1e-10))
def gini_impurity(y):
"""计算基尼不纯度"""
if len(y) == 0:
return 0
_, counts = np.unique(y, return_counts=True)
probabilities = counts / len(y)
return 1 - np.sum(probabilities ** 2)
# 示例数据
y_example = np.array([0, 0, 1, 1, 1, 0, 1, 0, 1, 1])
print("决策树分裂准则演示:")
print("=" * 30)
print(f"示例标签: {y_example}")
print(f"熵: {entropy(y_example):.4f}")
print(f"基尼不纯度: {gini_impurity(y_example):.4f}")
# 模拟分裂
left_split = y_example[:5] # [0, 0, 1, 1, 1]
right_split = y_example[5:] # [0, 1, 0, 1, 1]
print(f"\n分裂后:")
print(f"左子树: {left_split}, 熵: {entropy(left_split):.4f}, 基尼: {gini_impurity(left_split):.4f}")
print(f"右子树: {right_split}, 熵: {entropy(right_split):.4f}, 基尼: {gini_impurity(right_split):.4f}")
# 计算信息增益
original_entropy = entropy(y_example)
weighted_entropy = (len(left_split) * entropy(left_split) +
len(right_split) * entropy(right_split)) / len(y_example)
information_gain = original_entropy - weighted_entropy
print(f"\n信息增益: {information_gain:.4f}")
return entropy, gini_impurity
def train_decision_trees(self, X_train, y_train, X_test, y_test, problem_type='classification'):
"""
训练不同参数的决策树
"""
if problem_type == 'classification':
models = {
'Default': DecisionTreeClassifier(random_state=42),
'Max Depth 3': DecisionTreeClassifier(max_depth=3, random_state=42),
'Min Samples Split 20': DecisionTreeClassifier(min_samples_split=20, random_state=42),
'Min Samples Leaf 10': DecisionTreeClassifier(min_samples_leaf=10, random_state=42),
'Pruned': DecisionTreeClassifier(max_depth=5, min_samples_split=10,
min_samples_leaf=5, random_state=42)
}
else:
models = {
'Default': DecisionTreeRegressor(random_state=42),
'Max Depth 3': DecisionTreeRegressor(max_depth=3, random_state=42),
'Min Samples Split 20': DecisionTreeRegressor(min_samples_split=20, random_state=42),
'Min Samples Leaf 10': DecisionTreeRegressor(min_samples_leaf=10, random_state=42),
'Pruned': DecisionTreeRegressor(max_depth=5, min_samples_split=10,
min_samples_leaf=5, random_state=42)
}
results = {}
for name, model in models.items():
# 训练
model.fit(X_train, y_train)
# 预测
y_pred_train = model.predict(X_train)
y_pred_test = model.predict(X_test)
if problem_type == 'classification':
train_score = accuracy_score(y_train, y_pred_train)
test_score = accuracy_score(y_test, y_pred_test)
metric_name = '准确率'
else:
train_score = r2_score(y_train, y_pred_train)
test_score = r2_score(y_test, y_pred_test)
metric_name = 'R²分数'
print(f"{name}:")
print(f" 训练{metric_name}: {train_score:.4f}")
print(f" 测试{metric_name}: {test_score:.4f}")
print(f" 树深度: {model.get_depth()}")
print(f" 叶子节点数: {model.get_n_leaves()}")
print()
self.models[name] = model
results[name] = {
'train_score': train_score,
'test_score': test_score,
'depth': model.get_depth(),
'n_leaves': model.get_n_leaves(),
'y_pred_test': y_pred_test
}
return results
def visualize_tree(self, model_name='Pruned', feature_names=None, class_names=None, max_depth=3):
"""
可视化决策树
"""
if model_name not in self.models:
print(f"模型 {model_name} 不存在")
return
model = self.models[model_name]
# 文本表示
print(f"\n{model_name} 决策树文本表示:")
print("=" * 40)
tree_rules = export_text(model, feature_names=feature_names, max_depth=max_depth)
print(tree_rules)
# 图形可视化
plt.figure(figsize=(15, 10))
plot_tree(model,
feature_names=feature_names,
class_names=class_names,
filled=True,
rounded=True,
fontsize=10,
max_depth=max_depth)
plt.title(f'{model_name} 决策树可视化')
plt.show()
def feature_importance_analysis(self, feature_names=None):
"""
特征重要性分析
"""
if not self.models:
print("没有训练好的模型")
return
# 收集所有模型的特征重要性
importance_data = []
for name, model in self.models.items():
if hasattr(model, 'feature_importances_'):
importance_data.append({
'model': name,
'importances': model.feature_importances_
})
if not importance_data:
print("模型没有特征重要性信息")
return
# 可视化
n_features = len(importance_data[0]['importances'])
feature_names = feature_names or [f'特征{i+1}' for i in range(n_features)]
fig, ax = plt.subplots(figsize=(10, 6))
x = np.arange(n_features)
width = 0.15
for i, data in enumerate(importance_data):
ax.bar(x + i * width, data['importances'], width,
label=data['model'], alpha=0.8)
ax.set_xlabel('特征')
ax.set_ylabel('重要性')
ax.set_title('决策树特征重要性比较')
ax.set_xticks(x + width * (len(importance_data) - 1) / 2)
ax.set_xticklabels(feature_names, rotation=45)
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return importance_data
def plot_tree_complexity(self, results):
"""
绘制树复杂度与性能的关系
"""
models = list(results.keys())
train_scores = [results[model]['train_score'] for model in models]
test_scores = [results[model]['test_score'] for model in models]
depths = [results[model]['depth'] for model in models]
n_leaves = [results[model]['n_leaves'] for model in models]
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
# 深度 vs 性能
ax1.scatter(depths, train_scores, label='训练集', alpha=0.7, s=60)
ax1.scatter(depths, test_scores, label='测试集', alpha=0.7, s=60)
for i, model in enumerate(models):
ax1.annotate(model, (depths[i], test_scores[i]),
xytext=(5, 5), textcoords='offset points', fontsize=8)
ax1.set_xlabel('树深度')
ax1.set_ylabel('性能分数')
ax1.set_title('树深度 vs 性能')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 叶子节点数 vs 性能
ax2.scatter(n_leaves, train_scores, label='训练集', alpha=0.7, s=60)
ax2.scatter(n_leaves, test_scores, label='测试集', alpha=0.7, s=60)
for i, model in enumerate(models):
ax2.annotate(model, (n_leaves[i], test_scores[i]),
xytext=(5, 5), textcoords='offset points', fontsize=8)
ax2.set_xlabel('叶子节点数')
ax2.set_ylabel('性能分数')
ax2.set_title('叶子节点数 vs 性能')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()