3.1 Overview of Supervised Learning

3.1.1 What Is Supervised Learning?

Supervised learning is a major branch of machine learning. It uses labeled training data to learn a mapping from inputs to outputs, with the goal of building a model that makes accurate predictions on new, unseen data.
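
To make the idea concrete, here is a minimal sketch (the tiny dataset and the choice of LogisticRegression are illustrative assumptions, not the chapter's running example): fit a model on labeled pairs, then predict the label of an unseen input.

# Minimal supervised-learning sketch: learn a mapping from labeled data,
# then predict on data the model has never seen.
from sklearn.linear_model import LogisticRegression

X_labeled = [[0.0], [1.0], [2.0], [3.0]]   # inputs (features)
y_labeled = [0, 0, 1, 1]                   # known labels (the supervision signal)

clf = LogisticRegression().fit(X_labeled, y_labeled)
print(clf.predict([[1.8]]))                # predict the label of an unseen input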

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.metrics import classification_report, confusion_matrix
import warnings
warnings.filterwarnings('ignore')

# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

print("监督学习算法章节开始")
print("本章将介绍主要的监督学习算法及其应用")

3.1.2 Types of Supervised Learning

Supervised learning falls into two broad categories:

  1. Regression: predicting continuous values

    • House price prediction
    • Stock price prediction
    • Sales forecasting
  2. Classification: predicting discrete categories

    • Spam email filtering
    • Image recognition
    • Disease diagnosis

# 创建示例数据集
np.random.seed(42)

# 回归示例数据
n_samples = 1000
X_regression = np.random.randn(n_samples, 2)
y_regression = 3 * X_regression[:, 0] + 2 * X_regression[:, 1] + np.random.randn(n_samples) * 0.5

# 分类示例数据
from sklearn.datasets import make_classification
X_classification, y_classification = make_classification(
    n_samples=1000, n_features=2, n_redundant=0, n_informative=2,
    random_state=42, n_clusters_per_class=1
)

# 可视化数据
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

# 回归数据可视化
ax1.scatter(X_regression[:, 0], y_regression, alpha=0.6)
ax1.set_xlabel('特征1')
ax1.set_ylabel('目标值')
ax1.set_title('回归问题示例')

# 分类数据可视化
scatter = ax2.scatter(X_classification[:, 0], X_classification[:, 1], 
                     c=y_classification, cmap='viridis', alpha=0.6)
ax2.set_xlabel('特征1')
ax2.set_ylabel('特征2')
ax2.set_title('分类问题示例')
plt.colorbar(scatter, ax=ax2)

plt.tight_layout()
plt.show()

print(f"回归数据形状: {X_regression.shape}, 目标变量范围: [{y_regression.min():.2f}, {y_regression.max():.2f}]")
print(f"分类数据形状: {X_classification.shape}, 类别数量: {len(np.unique(y_classification))}")

3.1.3 The Supervised Learning Workflow

def supervised_learning_workflow(X, y, problem_type='classification'):
    """
    监督学习标准工作流程
    """
    print("监督学习工作流程:")
    print("=" * 40)
    
    # 1. 数据分割
    print("1. 数据分割...")
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y if problem_type == 'classification' else None
    )
    print(f"   训练集大小: {X_train.shape[0]}")
    print(f"   测试集大小: {X_test.shape[0]}")
    
    # 2. 特征缩放
    print("\n2. 特征缩放...")
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    print("   完成标准化")
    
    # 3. 模型选择和训练
    print("\n3. 模型训练...")
    if problem_type == 'classification':
        from sklearn.linear_model import LogisticRegression
        model = LogisticRegression(random_state=42)
        model.fit(X_train_scaled, y_train)
        
        # 预测
        y_pred = model.predict(X_test_scaled)
        
        # 评估
        accuracy = accuracy_score(y_test, y_pred)
        print(f"   模型准确率: {accuracy:.4f}")
        
    else:  # regression
        from sklearn.linear_model import LinearRegression
        model = LinearRegression()
        model.fit(X_train_scaled, y_train)
        
        # 预测
        y_pred = model.predict(X_test_scaled)
        
        # 评估
        mse = mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        print(f"   均方误差: {mse:.4f}")
        print(f"   R²分数: {r2:.4f}")
    
    return model, scaler, (X_train_scaled, X_test_scaled, y_train, y_test, y_pred)

# 演示工作流程
print("分类问题工作流程:")
clf_model, clf_scaler, clf_results = supervised_learning_workflow(
    X_classification, y_classification, 'classification'
)

print("\n" + "="*50)
print("回归问题工作流程:")
reg_model, reg_scaler, reg_results = supervised_learning_workflow(
    X_regression, y_regression, 'regression'
)

3.2 Linear Regression

3.2.1 How Linear Regression Works

Linear regression is the most fundamental regression algorithm; it assumes a linear relationship between the features and the target variable.

Mathematical form: $$y = \beta_0 + \beta_1 x_1 + \beta_2 x_2 + \cdots + \beta_n x_n + \epsilon$$

where:

- $y$ is the target variable
- $x_i$ are the feature variables
- $\beta_i$ are the regression coefficients
- $\epsilon$ is the error term
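
Before moving to the sklearn-based analyzer below, it may help to see how the coefficients can be estimated directly. A minimal sketch using the normal equation $\hat{\beta} = (X^\top X)^{-1} X^\top y$ (an illustrative derivation aid, assuming $X^\top X$ is invertible; sklearn's LinearRegression uses a more robust least-squares solver internally):

# OLS via the normal equation (illustrative sketch; assumes X^T X is invertible)
import numpy as np

rng = np.random.default_rng(0)
X_ols = rng.normal(size=(100, 2))
y_ols = 3 * X_ols[:, 0] + 2 * X_ols[:, 1] + rng.normal(scale=0.5, size=100)

X_design = np.column_stack([np.ones(len(X_ols)), X_ols])   # prepend intercept column
beta_hat = np.linalg.solve(X_design.T @ X_design, X_design.T @ y_ols)
print(beta_hat)   # approximately [0, 3, 2]: intercept plus the two true coefficients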

from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import Pipeline

class LinearRegressionAnalyzer:
    """
    线性回归分析器
    """
    def __init__(self):
        self.models = {}
        self.results = {}
    
    def fit_models(self, X_train, y_train, X_test, y_test):
        """
        训练多种线性回归模型
        """
        # 定义模型
        models = {
            'Linear Regression': LinearRegression(),
            'Ridge Regression': Ridge(alpha=1.0),
            'Lasso Regression': Lasso(alpha=1.0),
            'ElasticNet': ElasticNet(alpha=1.0, l1_ratio=0.5)
        }
        
        print("训练线性回归模型:")
        print("-" * 30)
        
        for name, model in models.items():
            # 训练模型
            model.fit(X_train, y_train)
            
            # 预测
            y_pred_train = model.predict(X_train)
            y_pred_test = model.predict(X_test)
            
            # 评估
            train_mse = mean_squared_error(y_train, y_pred_train)
            test_mse = mean_squared_error(y_test, y_pred_test)
            train_r2 = r2_score(y_train, y_pred_train)
            test_r2 = r2_score(y_test, y_pred_test)
            
            # 保存结果
            self.models[name] = model
            self.results[name] = {
                'train_mse': train_mse,
                'test_mse': test_mse,
                'train_r2': train_r2,
                'test_r2': test_r2,
                'y_pred_test': y_pred_test
            }
            
            print(f"{name}:")
            print(f"  训练MSE: {train_mse:.4f}, R²: {train_r2:.4f}")
            print(f"  测试MSE: {test_mse:.4f}, R²: {test_r2:.4f}")
            print()
    
    def plot_results(self, X_test, y_test):
        """
        可视化结果
        """
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.ravel()
        
        for i, (name, results) in enumerate(self.results.items()):
            ax = axes[i]
            
            # 真实值 vs 预测值
            ax.scatter(y_test, results['y_pred_test'], alpha=0.6)
            
            # 完美预测线
            min_val = min(y_test.min(), results['y_pred_test'].min())
            max_val = max(y_test.max(), results['y_pred_test'].max())
            ax.plot([min_val, max_val], [min_val, max_val], 'r--', lw=2)
            
            ax.set_xlabel('真实值')
            ax.set_ylabel('预测值')
            ax.set_title(f'{name}\nR² = {results["test_r2"]:.4f}')
            
        plt.tight_layout()
        plt.show()
    
    def compare_coefficients(self):
        """
        比较不同模型的系数
        """
        coef_data = []
        
        for name, model in self.models.items():
            if hasattr(model, 'coef_'):
                coef_data.append({
                    'Model': name,
                    'Intercept': model.intercept_,
                    'Coefficients': model.coef_
                })
        
        # 可视化系数
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
        
        # 截距比较
        models = [item['Model'] for item in coef_data]
        intercepts = [item['Intercept'] for item in coef_data]
        
        ax1.bar(models, intercepts)
        ax1.set_title('模型截距比较')
        ax1.set_ylabel('截距值')
        plt.setp(ax1.get_xticklabels(), rotation=45)
        
        # 系数比较
        for i, item in enumerate(coef_data):
            ax2.plot(item['Coefficients'], 'o-', label=item['Model'], markersize=6)
        
        ax2.set_title('模型系数比较')
        ax2.set_xlabel('特征索引')
        ax2.set_ylabel('系数值')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        return coef_data

3.4 Decision Trees

3.4.1 Decision Tree Classification

# 演示决策树
tree_analyzer = DecisionTreeAnalyzer()

# 信息增益演示
entropy_func, gini_func = tree_analyzer.information_gain_demo()

# 使用鸢尾花数据集进行分类
from sklearn.datasets import load_iris
iris = load_iris()
X_iris, y_iris = iris.data, iris.target

print("\n鸢尾花数据集决策树分析:")
print("=" * 40)

# 分割数据
X_train_iris, X_test_iris, y_train_iris, y_test_iris = train_test_split(
    X_iris, y_iris, test_size=0.2, random_state=42, stratify=y_iris
)

# 训练决策树
iris_results = tree_analyzer.train_decision_trees(
    X_train_iris, y_train_iris, X_test_iris, y_test_iris, 'classification'
)

# 可视化决策树
tree_analyzer.visualize_tree(
    'Pruned', 
    feature_names=iris.feature_names,
    class_names=iris.target_names,
    max_depth=3
)

# 特征重要性分析
importance_data = tree_analyzer.feature_importance_analysis(iris.feature_names)

# 绘制复杂度分析
tree_analyzer.plot_tree_complexity(iris_results)

3.4.2 Decision Tree Regression

Decision trees can also be used for regression: each leaf predicts the mean of the target values of the training samples that reach it.
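
A minimal sketch of that behavior on a hypothetical toy dataset: a depth-1 tree makes a single split, and each leaf predicts the mean target of the training samples it contains.

# Depth-1 regression tree: each leaf predicts the mean target of its samples
# (illustrative sketch with a made-up two-cluster dataset)
import numpy as np
from sklearn.tree import DecisionTreeRegressor

X_toy = np.array([[1.0], [2.0], [3.0], [10.0], [11.0], [12.0]])
y_toy = np.array([1.0, 1.2, 0.8, 5.0, 5.2, 4.8])

stump = DecisionTreeRegressor(max_depth=1).fit(X_toy, y_toy)
print(stump.predict([[2.5], [11.5]]))      # the two leaf predictions
print(y_toy[:3].mean(), y_toy[3:].mean())  # 1.0 and 5.0, matching the leaf means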

from sklearn.tree import DecisionTreeRegressor

class DecisionTreeRegressionDemo:
    """
    决策树回归演示
    """
    def __init__(self):
        self.model = None
    
    def create_regression_data(self, n_samples=200):
        """
        创建回归数据
        """
        np.random.seed(42)
        X = np.linspace(0, 10, n_samples).reshape(-1, 1)
        y = np.sin(X.ravel()) + 0.1 * np.random.randn(n_samples)
        return X, y
    
    def compare_tree_depths(self, X, y):
        """
        比较不同深度的决策树回归
        """
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        
        depths = [1, 3, 5, 10, None]
        
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        axes = axes.ravel()
        
        # 用于绘图的密集点
        X_plot = np.linspace(0, 10, 300).reshape(-1, 1)
        
        for i, depth in enumerate(depths):
            if i >= len(axes):
                break
                
            ax = axes[i]
            
            # 训练模型
            model = DecisionTreeRegressor(max_depth=depth, random_state=42)
            model.fit(X_train, y_train)
            
            # 预测
            y_pred_plot = model.predict(X_plot)
            y_pred_test = model.predict(X_test)
            
            # 评估
            train_score = model.score(X_train, y_train)
            test_score = model.score(X_test, y_test)
            
            # 绘制
            ax.scatter(X_train, y_train, alpha=0.6, label='训练数据')
            ax.scatter(X_test, y_test, alpha=0.6, color='red', label='测试数据')
            ax.plot(X_plot, y_pred_plot, color='green', linewidth=2, 
                   label=f'决策树 (深度={depth})')
            
            ax.set_xlabel('X')
            ax.set_ylabel('y')
            ax.set_title(f'深度={depth}\n训练R²={train_score:.3f}, 测试R²={test_score:.3f}')
            ax.legend()
            ax.grid(True, alpha=0.3)
        
        # 隐藏多余的子图
        for i in range(len(depths), len(axes)):
            axes[i].set_visible(False)
        
        plt.tight_layout()
        plt.show()
    
    def pruning_demonstration(self, X, y):
        """
        剪枝演示
        """
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        
        # 训练完整的树
        full_tree = DecisionTreeRegressor(random_state=42)
        full_tree.fit(X_train, y_train)
        
        # 获取剪枝路径
        path = full_tree.cost_complexity_pruning_path(X_train, y_train)
        ccp_alphas, impurities = path.ccp_alphas, path.impurities
        
        # 训练不同alpha值的树
        clfs = []
        for ccp_alpha in ccp_alphas:
            clf = DecisionTreeRegressor(random_state=42, ccp_alpha=ccp_alpha)
            clf.fit(X_train, y_train)
            clfs.append(clf)
        
        # 移除最后一个(只有根节点的树)
        clfs = clfs[:-1]
        ccp_alphas = ccp_alphas[:-1]
        
        # 计算训练和测试分数
        train_scores = [clf.score(X_train, y_train) for clf in clfs]
        test_scores = [clf.score(X_test, y_test) for clf in clfs]
        
        # 可视化剪枝效果
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
        
        # 剪枝路径
        ax1.plot(ccp_alphas, train_scores, marker='o', label='训练集', drawstyle="steps-post")
        ax1.plot(ccp_alphas, test_scores, marker='o', label='测试集', drawstyle="steps-post")
        ax1.set_xlabel('Alpha')
        ax1.set_ylabel('R² Score')
        ax1.set_title('剪枝路径')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 节点数量变化
        node_counts = [clf.tree_.node_count for clf in clfs]
        ax2.plot(ccp_alphas, node_counts, marker='o', drawstyle="steps-post")
        ax2.set_xlabel('Alpha')
        ax2.set_ylabel('节点数量')
        ax2.set_title('剪枝后节点数量')
        ax2.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        # 找到最佳alpha
        best_alpha_idx = np.argmax(test_scores)
        best_alpha = ccp_alphas[best_alpha_idx]
        best_score = test_scores[best_alpha_idx]
        
        print(f"最佳alpha: {best_alpha:.6f}")
        print(f"最佳测试分数: {best_score:.4f}")
        
        return best_alpha, clfs[best_alpha_idx]

# 演示决策树回归
reg_demo = DecisionTreeRegressionDemo()

# 创建回归数据
X_reg_tree, y_reg_tree = reg_demo.create_regression_data()

print("\n决策树回归演示:")
print("=" * 30)

# 比较不同深度
reg_demo.compare_tree_depths(X_reg_tree, y_reg_tree)

# 剪枝演示
best_alpha, best_model = reg_demo.pruning_demonstration(X_reg_tree, y_reg_tree)

3.5 Random Forests

3.5.1 How Random Forests Work

A random forest is an ensemble learning method: it builds many decision trees and combines their predictions to improve accuracy and robustness.
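
The core idea can be sketched "by hand" in a few lines (an illustrative sketch, not how sklearn implements RandomForestClassifier internally, which additionally subsamples features at each split): train trees on bootstrap resamples and majority-vote their predictions. Each bootstrap sample contains, on average, about $1 - 1/e \approx 63.2\%$ of the distinct original samples, a figure the bootstrap demo below verifies empirically.

# Bagging by hand (illustrative sketch): bootstrap-resample, fit trees, majority-vote
import numpy as np
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier

X_bag, y_bag = load_iris(return_X_y=True)
rng = np.random.default_rng(42)

trees = []
for _ in range(25):
    idx = rng.integers(0, len(X_bag), size=len(X_bag))   # sample with replacement
    trees.append(DecisionTreeClassifier().fit(X_bag[idx], y_bag[idx]))

# Majority vote across the ensemble for a few query points
votes = np.stack([t.predict(X_bag[:5]) for t in trees])  # shape (n_trees, n_points)
majority = np.apply_along_axis(lambda v: np.bincount(v).argmax(), 0, votes)
print(majority, "vs true:", y_bag[:5])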

from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.ensemble import ExtraTreesClassifier, ExtraTreesRegressor

class RandomForestAnalyzer:
    """
    随机森林分析器
    """
    def __init__(self):
        self.models = {}
        self.feature_names = None
    
    def demonstrate_bootstrap_sampling(self, X, y, n_samples=100):
        """
        演示Bootstrap采样
        """
        print("Bootstrap采样演示:")
        print("=" * 25)
        
        original_indices = np.arange(len(X))
        
        # 生成几个bootstrap样本
        fig, axes = plt.subplots(2, 2, figsize=(12, 8))
        axes = axes.ravel()
        
        for i in range(4):
            # Bootstrap采样
            bootstrap_indices = np.random.choice(len(X), size=len(X), replace=True)
            unique_indices, counts = np.unique(bootstrap_indices, return_counts=True)
            
            ax = axes[i]
            
            # 可视化采样分布
            ax.bar(unique_indices, counts, alpha=0.7)
            ax.set_xlabel('原始样本索引')
            ax.set_ylabel('采样次数')
            ax.set_title(f'Bootstrap样本 {i+1}\n唯一样本: {len(unique_indices)}/{len(X)}')
            ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        # 统计信息
        bootstrap_stats = []
        for _ in range(1000):
            bootstrap_indices = np.random.choice(len(X), size=len(X), replace=True)
            unique_count = len(np.unique(bootstrap_indices))
            bootstrap_stats.append(unique_count / len(X))
        
        print(f"1000次Bootstrap采样统计:")
        print(f"平均唯一样本比例: {np.mean(bootstrap_stats):.3f}")
        print(f"标准差: {np.std(bootstrap_stats):.3f}")
        print(f"理论值 (1-1/e): {1-1/np.e:.3f}")
    
    def train_random_forest_models(self, X_train, y_train, X_test, y_test, problem_type='classification'):
        """
        训练随机森林模型
        """
        if problem_type == 'classification':
            models = {
                'Random Forest (默认)': RandomForestClassifier(random_state=42),
                'Random Forest (100棵树)': RandomForestClassifier(n_estimators=100, random_state=42),
                'Random Forest (深度限制)': RandomForestClassifier(n_estimators=100, max_depth=5, random_state=42),
                'Extra Trees': ExtraTreesClassifier(n_estimators=100, random_state=42),
                'Random Forest (调优)': RandomForestClassifier(
                    n_estimators=200, max_depth=10, min_samples_split=5, 
                    min_samples_leaf=2, random_state=42
                )
            }
        else:
            models = {
                'Random Forest (默认)': RandomForestRegressor(random_state=42),
                'Random Forest (100棵树)': RandomForestRegressor(n_estimators=100, random_state=42),
                'Random Forest (深度限制)': RandomForestRegressor(n_estimators=100, max_depth=5, random_state=42),
                'Extra Trees': ExtraTreesRegressor(n_estimators=100, random_state=42),
                'Random Forest (调优)': RandomForestRegressor(
                    n_estimators=200, max_depth=10, min_samples_split=5, 
                    min_samples_leaf=2, random_state=42
                )
            }
        
        results = {}
        
        for name, model in models.items():
            print(f"\n训练 {name}...")
            
            # 训练
            model.fit(X_train, y_train)
            
            # 预测
            y_pred_train = model.predict(X_train)
            y_pred_test = model.predict(X_test)
            
            if problem_type == 'classification':
                train_score = accuracy_score(y_train, y_pred_train)
                test_score = accuracy_score(y_test, y_pred_test)
                metric_name = '准确率'
            else:
                train_score = r2_score(y_train, y_pred_train)
                test_score = r2_score(y_test, y_pred_test)
                metric_name = 'R²分数'
            
            print(f"  训练{metric_name}: {train_score:.4f}")
            print(f"  测试{metric_name}: {test_score:.4f}")
            
            self.models[name] = model
            results[name] = {
                'train_score': train_score,
                'test_score': test_score,
                'y_pred_test': y_pred_test
            }
        
        return results
    
    def analyze_feature_importance(self, feature_names=None):
        """
        分析特征重要性
        """
        if not self.models:
            print("没有训练好的模型")
            return
        
        # 收集特征重要性
        importance_data = {}
        
        for name, model in self.models.items():
            if hasattr(model, 'feature_importances_'):
                importance_data[name] = model.feature_importances_
        
        if not importance_data:
            return
        
        # 创建DataFrame便于分析
        n_features = len(list(importance_data.values())[0])
        feature_names = feature_names or [f'特征{i+1}' for i in range(n_features)]
        
        importance_df = pd.DataFrame(importance_data, index=feature_names)
        
        # 可视化
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # 特征重要性热图
        sns.heatmap(importance_df.T, annot=True, fmt='.3f', cmap='viridis', ax=ax1)
        ax1.set_title('特征重要性热图')
        ax1.set_xlabel('特征')
        ax1.set_ylabel('模型')
        
        # 平均特征重要性
        mean_importance = importance_df.mean(axis=1).sort_values(ascending=True)
        mean_importance.plot(kind='barh', ax=ax2)
        ax2.set_title('平均特征重要性')
        ax2.set_xlabel('重要性')
        
        plt.tight_layout()
        plt.show()
        
        return importance_df
    
    def plot_learning_curves_trees(self, X_train, y_train, X_test, y_test, problem_type='classification'):
        """
        绘制树数量的学习曲线
        """
        n_estimators_range = range(10, 201, 20)
        train_scores = []
        test_scores = []
        
        for n_estimators in n_estimators_range:
            if problem_type == 'classification':
                model = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
                model.fit(X_train, y_train)
                train_score = model.score(X_train, y_train)
                test_score = model.score(X_test, y_test)
            else:
                model = RandomForestRegressor(n_estimators=n_estimators, random_state=42)
                model.fit(X_train, y_train)
                train_score = model.score(X_train, y_train)
                test_score = model.score(X_test, y_test)
            
            train_scores.append(train_score)
            test_scores.append(test_score)
        
        plt.figure(figsize=(10, 6))
        plt.plot(n_estimators_range, train_scores, 'o-', label='训练集', linewidth=2)
        plt.plot(n_estimators_range, test_scores, 'o-', label='测试集', linewidth=2)
        
        plt.xlabel('树的数量')
        plt.ylabel('性能分数')
        plt.title('随机森林:树数量 vs 性能')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()
        
        return n_estimators_range, train_scores, test_scores

# 演示随机森林
rf_analyzer = RandomForestAnalyzer()

# Bootstrap采样演示
rf_analyzer.demonstrate_bootstrap_sampling(X_iris, y_iris)

print("\n随机森林分类演示:")
print("=" * 30)

# 训练随机森林分类模型
rf_clf_results = rf_analyzer.train_random_forest_models(
    X_train_iris, y_train_iris, X_test_iris, y_test_iris, 'classification'
)

# 特征重要性分析
importance_df = rf_analyzer.analyze_feature_importance(iris.feature_names)

# 学习曲线
n_est_range, train_scores, test_scores = rf_analyzer.plot_learning_curves_trees(
    X_train_iris, y_train_iris, X_test_iris, y_test_iris, 'classification'
)

print(f"\n最佳树数量: {n_est_range[np.argmax(test_scores)]}")
print(f"最佳测试分数: {max(test_scores):.4f}")

3.6 Support Vector Machines (SVM)

3.6.1 How SVMs Work

Support vector machines are powerful supervised learning algorithms that classify (and regress) by finding an optimal separating hyperplane with the largest margin.
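
For reference, the textbook hard-margin formulation behind this idea: for linearly separable data $\{(x_i, y_i)\}$ with labels $y_i \in \{-1, +1\}$, the SVM solves

$$\min_{w,\,b} \; \tfrac{1}{2}\lVert w \rVert^2 \quad \text{subject to} \quad y_i\,(w^\top x_i + b) \ge 1 \;\; \text{for all } i$$

The margin width is $2 / \lVert w \rVert$, and the points that satisfy the constraint with equality are the support vectors highlighted in the plots below. The soft-margin variant used in practice (controlled by the `C` parameter) allows constraint violations at a cost.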

from sklearn.svm import SVC, SVR
from sklearn.svm import LinearSVC, LinearSVR

class SVMAnalyzer:
    """
    支持向量机分析器
    """
    def __init__(self):
        self.models = {}
        self.scaler = None
    
    def demonstrate_svm_concepts(self):
        """
        演示SVM核心概念
        """
        # 创建线性可分数据
        np.random.seed(42)
        X_linearly_separable = np.array([
            [1, 2], [2, 3], [3, 3], [2, 1], [3, 2],  # 类别0
            [6, 6], [7, 7], [8, 6], [7, 5], [8, 8]   # 类别1
        ])
        y_linearly_separable = np.array([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])
        
        # 训练线性SVM
        svm_linear = SVC(kernel='linear', C=1.0)
        svm_linear.fit(X_linearly_separable, y_linearly_separable)
        
        # 可视化
        plt.figure(figsize=(12, 5))
        
        # 子图1:线性可分情况
        plt.subplot(1, 2, 1)
        
        # 绘制数据点
        colors = ['red', 'blue']
        for i in range(2):
            mask = y_linearly_separable == i
            plt.scatter(X_linearly_separable[mask, 0], X_linearly_separable[mask, 1], 
                       c=colors[i], label=f'类别{i}', s=100, alpha=0.8)
        
        # 绘制决策边界和支持向量
        ax = plt.gca()
        xlim = ax.get_xlim()
        ylim = ax.get_ylim()
        
        # 创建网格
        xx = np.linspace(xlim[0], xlim[1], 30)
        yy = np.linspace(ylim[0], ylim[1], 30)
        YY, XX = np.meshgrid(yy, xx)
        xy = np.vstack([XX.ravel(), YY.ravel()]).T
        Z = svm_linear.decision_function(xy).reshape(XX.shape)
        
        # 绘制决策边界和间隔
        ax.contour(XX, YY, Z, colors='k', levels=[-1, 0, 1], alpha=0.5, 
                  linestyles=['--', '-', '--'])
        
        # 标记支持向量
        ax.scatter(svm_linear.support_vectors_[:, 0], svm_linear.support_vectors_[:, 1], 
                  s=300, linewidth=1, facecolors='none', edgecolors='k', label='支持向量')
        
        plt.xlabel('特征1')
        plt.ylabel('特征2')
        plt.title('线性SVM - 线性可分数据')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 子图2:非线性数据
        plt.subplot(1, 2, 2)
        
        # 创建非线性数据(圆形分布)
        from sklearn.datasets import make_circles
        X_circles, y_circles = make_circles(n_samples=100, noise=0.1, factor=0.3, random_state=42)
        
        # 训练RBF核SVM
        svm_rbf = SVC(kernel='rbf', C=1.0, gamma='scale')
        svm_rbf.fit(X_circles, y_circles)
        
        # 绘制数据点
        for i in range(2):
            mask = y_circles == i
            plt.scatter(X_circles[mask, 0], X_circles[mask, 1], 
                       c=colors[i], label=f'类别{i}', alpha=0.8)
        
        # 绘制决策边界
        ax = plt.gca()
        xlim = ax.get_xlim()
        ylim = ax.get_ylim()
        
        xx = np.linspace(xlim[0], xlim[1], 100)
        yy = np.linspace(ylim[0], ylim[1], 100)
        YY, XX = np.meshgrid(yy, xx)
        xy = np.vstack([XX.ravel(), YY.ravel()]).T
        Z = svm_rbf.decision_function(xy).reshape(XX.shape)
        
        ax.contour(XX, YY, Z, colors='k', levels=[0], alpha=0.5, linestyles=['-'])
        ax.contourf(XX, YY, Z, levels=50, alpha=0.3, cmap='RdYlBu')
        
        plt.xlabel('特征1')
        plt.ylabel('特征2')
        plt.title('RBF核SVM - 非线性数据')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        print("SVM核心概念:")
        print(f"线性SVM支持向量数量: {len(svm_linear.support_vectors_)}")
        print(f"RBF SVM支持向量数量: {len(svm_rbf.support_vectors_)}")
        
        return svm_linear, svm_rbf
    
    def train_svm_models(self, X_train, y_train, X_test, y_test, problem_type='classification'):
        """
        训练不同核函数的SVM模型
        """
        # 标准化数据
        self.scaler = StandardScaler()
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        
        if problem_type == 'classification':
            models = {
                'Linear SVM': SVC(kernel='linear', C=1.0, random_state=42),
                'RBF SVM': SVC(kernel='rbf', C=1.0, gamma='scale', random_state=42),
                'Polynomial SVM': SVC(kernel='poly', degree=3, C=1.0, random_state=42),
                'Sigmoid SVM': SVC(kernel='sigmoid', C=1.0, random_state=42)
            }
        else:
            models = {
                'Linear SVR': SVR(kernel='linear', C=1.0),
                'RBF SVR': SVR(kernel='rbf', C=1.0, gamma='scale'),
                'Polynomial SVR': SVR(kernel='poly', degree=3, C=1.0),
                'Sigmoid SVR': SVR(kernel='sigmoid', C=1.0)
            }
        
        results = {}
        
        for name, model in models.items():
            print(f"\n训练 {name}...")
            
            # 训练
            model.fit(X_train_scaled, y_train)
            
            # 预测
            y_pred_train = model.predict(X_train_scaled)
            y_pred_test = model.predict(X_test_scaled)
            
            if problem_type == 'classification':
                train_score = accuracy_score(y_train, y_pred_train)
                test_score = accuracy_score(y_test, y_pred_test)
                metric_name = '准确率'
            else:
                train_score = r2_score(y_train, y_pred_train)
                test_score = r2_score(y_test, y_pred_test)
                metric_name = 'R²分数'
            
            print(f"  训练{metric_name}: {train_score:.4f}")
            print(f"  测试{metric_name}: {test_score:.4f}")
            print(f"  支持向量数量: {len(model.support_vectors_)}")
            
            self.models[name] = model
            results[name] = {
                'train_score': train_score,
                'test_score': test_score,
                'n_support_vectors': len(model.support_vectors_),
                'y_pred_test': y_pred_test
            }
        
        return results, X_train_scaled, X_test_scaled
    
    def hyperparameter_tuning(self, X_train, y_train, problem_type='classification'):
        """
        SVM超参数调优
        """
        print("\nSVM超参数调优:")
        print("=" * 25)
        
        if problem_type == 'classification':
            # 定义参数网格
            param_grids = [
                {
                    'kernel': ['linear'],
                    'C': [0.1, 1, 10, 100]
                },
                {
                    'kernel': ['rbf'],
                    'C': [0.1, 1, 10, 100],
                    'gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1]
                },
                {
                    'kernel': ['poly'],
                    'C': [0.1, 1, 10],
                    'degree': [2, 3, 4],
                    'gamma': ['scale', 'auto']
                }
            ]
            
            base_model = SVC(random_state=42)
        else:
            param_grids = [
                {
                    'kernel': ['linear'],
                    'C': [0.1, 1, 10, 100]
                },
                {
                    'kernel': ['rbf'],
                    'C': [0.1, 1, 10, 100],
                    'gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1]
                }
            ]
            
            base_model = SVR()
        
        best_models = {}
        
        for i, param_grid in enumerate(param_grids):
            kernel_name = param_grid['kernel'][0]
            print(f"\n调优 {kernel_name} 核...")
            
            grid_search = GridSearchCV(
                base_model, param_grid, cv=5, scoring='accuracy' if problem_type == 'classification' else 'r2',
                n_jobs=-1, verbose=0
            )
            
            grid_search.fit(X_train, y_train)
            
            print(f"最佳参数: {grid_search.best_params_}")
            print(f"最佳交叉验证分数: {grid_search.best_score_:.4f}")
            
            best_models[kernel_name] = grid_search.best_estimator_
        
        return best_models
    
    def plot_svm_decision_boundaries(self, X, y, model_names=None):
        """
        绘制SVM决策边界(仅适用于2D数据)
        """
        if X.shape[1] != 2:
            print("决策边界可视化仅支持2维特征")
            return
        
        model_names = model_names or list(self.models.keys())[:4]
        
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.ravel()
        
        for i, name in enumerate(model_names):
            if i >= len(axes) or name not in self.models:
                continue
                
            ax = axes[i]
            model = self.models[name]
            
            # 标准化数据
            X_scaled = self.scaler.transform(X)
            
            # 创建网格
            h = 0.02
            x_min, x_max = X_scaled[:, 0].min() - 1, X_scaled[:, 0].max() + 1
            y_min, y_max = X_scaled[:, 1].min() - 1, X_scaled[:, 1].max() + 1
            xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                                np.arange(y_min, y_max, h))
            
            # 预测网格点
            grid_points = np.c_[xx.ravel(), yy.ravel()]
            Z = model.predict(grid_points)
            Z = Z.reshape(xx.shape)
            
            # 绘制决策区域
            ax.contourf(xx, yy, Z, alpha=0.6, cmap='viridis')
            
            # 绘制数据点
            scatter = ax.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y, 
                               cmap='viridis', edgecolors='black', alpha=0.8)
            
            # 绘制支持向量
            if hasattr(model, 'support_vectors_'):
                ax.scatter(model.support_vectors_[:, 0], model.support_vectors_[:, 1], 
                          s=300, linewidth=1, facecolors='none', edgecolors='red')
            
            ax.set_xlabel('特征1 (标准化)')
            ax.set_ylabel('特征2 (标准化)')
            ax.set_title(f'{name}')
        
        plt.tight_layout()
        plt.show()

# 演示SVM
svm_analyzer = SVMAnalyzer()

# SVM概念演示
linear_svm, rbf_svm = svm_analyzer.demonstrate_svm_concepts()

print("\nSVM分类演示:")
print("=" * 20)

# 训练SVM模型
svm_results, X_train_svm_scaled, X_test_svm_scaled = svm_analyzer.train_svm_models(
    X_train_iris, y_train_iris, X_test_iris, y_test_iris, 'classification'
)

# 超参数调优
best_svm_models = svm_analyzer.hyperparameter_tuning(
    X_train_svm_scaled, y_train_iris, 'classification'
)

# 绘制决策边界(使用前两个特征)
X_iris_2d = X_iris[:, :2]  # 只使用前两个特征
X_train_2d, X_test_2d, y_train_2d, y_test_2d = train_test_split(
    X_iris_2d, y_iris, test_size=0.2, random_state=42, stratify=y_iris
)

# 重新训练2D模型用于可视化
svm_2d_analyzer = SVMAnalyzer()
svm_2d_results, _, _ = svm_2d_analyzer.train_svm_models(
    X_train_2d, y_train_2d, X_test_2d, y_test_2d, 'classification'
)

svm_2d_analyzer.plot_svm_decision_boundaries(X_iris_2d, y_iris)

3.7 k-Nearest Neighbors (k-NN)

3.7.1 How k-NN Works

k-nearest neighbors is an instance-based learning method: to predict a query point, it finds the k closest training samples and aggregates their labels (majority vote for classification, average for regression).
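
A minimal from-scratch sketch of this procedure (illustrative; exercise 4 at the end of the chapter asks you to implement the distance computation yourself):

# k-NN classification from scratch (illustrative sketch, Euclidean distance)
import numpy as np

def knn_predict(X_train, y_train, x_query, k=3):
    # Euclidean distance from the query to every training point
    dists = np.linalg.norm(X_train - x_query, axis=1)
    nearest = np.argsort(dists)[:k]                # indices of the k closest samples
    return np.bincount(y_train[nearest]).argmax()  # majority vote

X_tiny = np.array([[1, 1], [2, 1], [4, 4], [5, 5]])
y_tiny = np.array([0, 0, 1, 1])
print(knn_predict(X_tiny, y_tiny, np.array([1.5, 1.0])))  # -> 0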

from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
from sklearn.neighbors import RadiusNeighborsClassifier, RadiusNeighborsRegressor
from sklearn.metrics import pairwise_distances

class KNNAnalyzer:
    """
    k近邻算法分析器
    """
    def __init__(self):
        self.models = {}
        self.scaler = None
    
    def demonstrate_knn_concept(self):
        """
        演示k-NN核心概念
        """
        # 创建简单的2D数据
        np.random.seed(42)
        X_demo = np.array([
            [1, 1], [1, 2], [2, 1], [2, 2],  # 类别0
            [4, 4], [4, 5], [5, 4], [5, 5],  # 类别1
            [7, 1], [7, 2], [8, 1], [8, 2]   # 类别2
        ])
        y_demo = np.array([0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2])
        
        # 新的测试点
        test_point = np.array([[3, 3]])
        
        # 计算距离
        distances = pairwise_distances(test_point, X_demo)[0]
        
        # 可视化
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        
        colors = ['red', 'blue', 'green']
        k_values = [1, 3, 5]
        
        for i, k in enumerate(k_values):
            ax = axes[i]
            
            # 绘制训练数据
            for class_idx in range(3):
                mask = y_demo == class_idx
                ax.scatter(X_demo[mask, 0], X_demo[mask, 1], 
                          c=colors[class_idx], label=f'类别{class_idx}', s=100, alpha=0.7)
            
            # 绘制测试点
            ax.scatter(test_point[0, 0], test_point[0, 1], 
                      c='black', marker='x', s=200, linewidth=3, label='测试点')
            
            # 找到k个最近邻
            nearest_indices = np.argsort(distances)[:k]
            
            # 绘制最近邻连线
            for idx in nearest_indices:
                ax.plot([test_point[0, 0], X_demo[idx, 0]], 
                       [test_point[0, 1], X_demo[idx, 1]], 
                       'k--', alpha=0.5, linewidth=1)
            
            # 高亮最近邻
            ax.scatter(X_demo[nearest_indices, 0], X_demo[nearest_indices, 1], 
                      s=300, facecolors='none', edgecolors='black', linewidth=2)
            
            # 预测
            nearest_labels = y_demo[nearest_indices]
            prediction = np.bincount(nearest_labels).argmax()
            
            ax.set_xlabel('特征1')
            ax.set_ylabel('特征2')
            ax.set_title(f'k={k}, 预测类别: {prediction}')
            ax.legend()
            ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        print("k-NN预测过程:")
        for k in k_values:
            nearest_indices = np.argsort(distances)[:k]
            nearest_labels = y_demo[nearest_indices]
            nearest_distances = distances[nearest_indices]
            prediction = np.bincount(nearest_labels).argmax()
            
            print(f"\nk={k}:")
            print(f"  最近邻索引: {nearest_indices}")
            print(f"  最近邻标签: {nearest_labels}")
            print(f"  距离: {nearest_distances}")
            print(f"  预测类别: {prediction}")
    
    def train_knn_models(self, X_train, y_train, X_test, y_test, problem_type='classification'):
        """
        训练不同参数的k-NN模型
        """
        # 标准化数据
        self.scaler = StandardScaler()
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        
        if problem_type == 'classification':
            models = {
                'k-NN (k=1)': KNeighborsClassifier(n_neighbors=1),
                'k-NN (k=3)': KNeighborsClassifier(n_neighbors=3),
                'k-NN (k=5)': KNeighborsClassifier(n_neighbors=5),
                'k-NN (k=7)': KNeighborsClassifier(n_neighbors=7),
                'k-NN (距离权重)': KNeighborsClassifier(n_neighbors=5, weights='distance'),
                'k-NN (曼哈顿距离)': KNeighborsClassifier(n_neighbors=5, metric='manhattan')
            }
        else:
            models = {
                'k-NN (k=1)': KNeighborsRegressor(n_neighbors=1),
                'k-NN (k=3)': KNeighborsRegressor(n_neighbors=3),
                'k-NN (k=5)': KNeighborsRegressor(n_neighbors=5),
                'k-NN (k=7)': KNeighborsRegressor(n_neighbors=7),
                'k-NN (距离权重)': KNeighborsRegressor(n_neighbors=5, weights='distance'),
                'k-NN (曼哈顿距离)': KNeighborsRegressor(n_neighbors=5, metric='manhattan')
            }
        
        results = {}
        
        for name, model in models.items():
            print(f"\n训练 {name}...")
            
            # 训练
            model.fit(X_train_scaled, y_train)
            
            # 预测
            y_pred_train = model.predict(X_train_scaled)
            y_pred_test = model.predict(X_test_scaled)
            
            if problem_type == 'classification':
                train_score = accuracy_score(y_train, y_pred_train)
                test_score = accuracy_score(y_test, y_pred_test)
                metric_name = '准确率'
            else:
                train_score = r2_score(y_train, y_pred_train)
                test_score = r2_score(y_test, y_pred_test)
                metric_name = 'R²分数'
            
            print(f"  训练{metric_name}: {train_score:.4f}")
            print(f"  测试{metric_name}: {test_score:.4f}")
            
            self.models[name] = model
            results[name] = {
                'train_score': train_score,
                'test_score': test_score,
                'y_pred_test': y_pred_test
            }
        
        return results, X_train_scaled, X_test_scaled
    
    def find_optimal_k(self, X_train, y_train, max_k=20, problem_type='classification'):
        """
        寻找最优k值
        """
        k_range = range(1, max_k + 1)
        cv_scores = []
        
        for k in k_range:
            if problem_type == 'classification':
                knn = KNeighborsClassifier(n_neighbors=k)
                scores = cross_val_score(knn, X_train, y_train, cv=5, scoring='accuracy')
            else:
                knn = KNeighborsRegressor(n_neighbors=k)
                scores = cross_val_score(knn, X_train, y_train, cv=5, scoring='r2')
            
            cv_scores.append(scores.mean())
        
        # 可视化
        plt.figure(figsize=(10, 6))
        plt.plot(k_range, cv_scores, 'o-', linewidth=2, markersize=6)
        
        # 标记最优k
        best_k = k_range[np.argmax(cv_scores)]
        best_score = max(cv_scores)
        plt.axvline(x=best_k, color='red', linestyle='--', alpha=0.7, 
                   label=f'最优k={best_k}')
        
        plt.xlabel('k值')
        plt.ylabel('交叉验证分数')
        plt.title('k-NN: k值选择')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()
        
        print(f"最优k值: {best_k}")
        print(f"最佳交叉验证分数: {best_score:.4f}")
        
        return best_k, cv_scores
    
    def plot_knn_decision_boundaries(self, X, y, k_values=[1, 3, 5, 7]):
        """
        绘制不同k值的决策边界
        """
        if X.shape[1] != 2:
            print("决策边界可视化仅支持2维特征")
            return
        
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.ravel()
        
        X_scaled = self.scaler.transform(X)
        
        for i, k in enumerate(k_values):
            if i >= len(axes):
                break
                
            ax = axes[i]
            
            # 训练模型
            knn = KNeighborsClassifier(n_neighbors=k)
            knn.fit(X_scaled, y)
            
            # 创建网格
            h = 0.02
            x_min, x_max = X_scaled[:, 0].min() - 1, X_scaled[:, 0].max() + 1
            y_min, y_max = X_scaled[:, 1].min() - 1, X_scaled[:, 1].max() + 1
            xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                                np.arange(y_min, y_max, h))
            
            # 预测网格点
            grid_points = np.c_[xx.ravel(), yy.ravel()]
            Z = knn.predict(grid_points)
            Z = Z.reshape(xx.shape)
            
            # 绘制决策区域
            ax.contourf(xx, yy, Z, alpha=0.6, cmap='viridis')
            
            # 绘制数据点
            scatter = ax.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y, 
                               cmap='viridis', edgecolors='black', alpha=0.8)
            
            ax.set_xlabel('特征1 (标准化)')
            ax.set_ylabel('特征2 (标准化)')
            ax.set_title(f'k-NN (k={k})')
        
        plt.tight_layout()
        plt.show()

# 演示k-NN
knn_analyzer = KNNAnalyzer()

# k-NN概念演示
knn_analyzer.demonstrate_knn_concept()

print("\nk-NN分类演示:")
print("=" * 20)

# 训练k-NN模型
knn_results, X_train_knn_scaled, X_test_knn_scaled = knn_analyzer.train_knn_models(
    X_train_iris, y_train_iris, X_test_iris, y_test_iris, 'classification'
)

# 寻找最优k值
best_k, cv_scores = knn_analyzer.find_optimal_k(
    X_train_knn_scaled, y_train_iris, max_k=15, problem_type='classification'
)

# 绘制决策边界(使用2D数据)
knn_2d_analyzer = KNNAnalyzer()
knn_2d_results, _, _ = knn_2d_analyzer.train_knn_models(
    X_train_2d, y_train_2d, X_test_2d, y_test_2d, 'classification'
)

knn_2d_analyzer.plot_knn_decision_boundaries(X_iris_2d, y_iris)

3.8 Algorithm Comparison and Selection

3.8.1 Comparing Algorithm Performance

from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.linear_model import LogisticRegression

class AlgorithmComparison:
    """
    监督学习算法比较
    """
    def __init__(self):
        self.results = {}
    
    def compare_all_algorithms(self, X_train, y_train, X_test, y_test, problem_type='classification'):
        """
        比较所有算法
        """
        # 标准化数据
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        if problem_type == 'classification':
            algorithms = {
                'Logistic Regression': LogisticRegression(random_state=42),
                'Decision Tree': DecisionTreeClassifier(random_state=42),
                'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
                'SVM (RBF)': SVC(kernel='rbf', random_state=42),
                'k-NN': KNeighborsClassifier(n_neighbors=5)
            }
            scoring_metrics = ['accuracy', 'precision_weighted', 'recall_weighted', 'f1_weighted']
        else:
            algorithms = {
                'Linear Regression': LinearRegression(),
                'Decision Tree': DecisionTreeRegressor(random_state=42),
                'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
                'SVR (RBF)': SVR(kernel='rbf'),
                'k-NN': KNeighborsRegressor(n_neighbors=5)
            }
            scoring_metrics = ['r2', 'neg_mean_squared_error', 'neg_mean_absolute_error']
        
        comparison_results = {}
        
        for name, algorithm in algorithms.items():
            print(f"\n评估 {name}...")
            
            # 交叉验证
            cv_results = {}
            for metric in scoring_metrics:
                scores = cross_val_score(algorithm, X_train_scaled, y_train, 
                                       cv=5, scoring=metric)
                cv_results[metric] = {
                    'mean': scores.mean(),
                    'std': scores.std()
                }
            
            # 训练和测试
            algorithm.fit(X_train_scaled, y_train)
            y_pred_test = algorithm.predict(X_test_scaled)
            
            if problem_type == 'classification':
                test_accuracy = accuracy_score(y_test, y_pred_test)
                test_precision = precision_score(y_test, y_pred_test, average='weighted')
                test_recall = recall_score(y_test, y_pred_test, average='weighted')
                test_f1 = f1_score(y_test, y_pred_test, average='weighted')
                
                test_scores = {
                    'accuracy': test_accuracy,
                    'precision': test_precision,
                    'recall': test_recall,
                    'f1': test_f1
                }
            else:
                test_r2 = r2_score(y_test, y_pred_test)
                test_mse = mean_squared_error(y_test, y_pred_test)
                test_mae = mean_absolute_error(y_test, y_pred_test)
                
                test_scores = {
                    'r2': test_r2,
                    'mse': test_mse,
                    'mae': test_mae
                }
            
            comparison_results[name] = {
                'cv_results': cv_results,
                'test_scores': test_scores,
                'model': algorithm
            }
        
        self.results = comparison_results
        return comparison_results
    
    def plot_comparison_results(self, problem_type='classification'):
        """
        可视化比较结果
        """
        if not self.results:
            print("没有比较结果")
            return
        
        algorithms = list(self.results.keys())
        
        if problem_type == 'classification':
            metrics = ['accuracy', 'precision', 'recall', 'f1']
            cv_metrics = ['accuracy', 'precision_weighted', 'recall_weighted', 'f1_weighted']
        else:
            metrics = ['r2', 'mse', 'mae']
            cv_metrics = ['r2', 'neg_mean_squared_error', 'neg_mean_absolute_error']
        
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        axes = axes.ravel()
        
        # 交叉验证结果
        for i, (metric, cv_metric) in enumerate(zip(metrics[:2], cv_metrics[:2])):
            ax = axes[i]
            
            means = [self.results[alg]['cv_results'][cv_metric]['mean'] for alg in algorithms]
            stds = [self.results[alg]['cv_results'][cv_metric]['std'] for alg in algorithms]
            
            bars = ax.bar(algorithms, means, yerr=stds, capsize=5, alpha=0.7)
            ax.set_ylabel(f'{metric.upper()}')
            ax.set_title(f'交叉验证 {metric.upper()}')
            ax.tick_params(axis='x', rotation=45)
            
            # 添加数值标签
            for bar, mean in zip(bars, means):
                height = bar.get_height()
                ax.text(bar.get_x() + bar.get_width()/2., height + 0.01,
                       f'{mean:.3f}', ha='center', va='bottom')
        
        # 测试集结果
        for i, metric in enumerate(metrics[:2]):
            ax = axes[i + 2]
            
            scores = [self.results[alg]['test_scores'][metric] for alg in algorithms]
            
            bars = ax.bar(algorithms, scores, alpha=0.7, color='orange')
            ax.set_ylabel(f'{metric.upper()}')
            ax.set_title(f'测试集 {metric.upper()}')
            ax.tick_params(axis='x', rotation=45)
            
            # 添加数值标签
            for bar, score in zip(bars, scores):
                height = bar.get_height()
                ax.text(bar.get_x() + bar.get_width()/2., height + 0.01,
                       f'{score:.3f}', ha='center', va='bottom')
        
        plt.tight_layout()
        plt.show()
    
    def create_comparison_table(self, problem_type='classification'):
        """
        创建比较表格
        """
        if not self.results:
            print("没有比较结果")
            return
        
        data = []
        
        for alg_name, result in self.results.items():
            if problem_type == 'classification':
                row = {
                    '算法': alg_name,
                    'CV准确率': f"{result['cv_results']['accuracy']['mean']:.3f} ± {result['cv_results']['accuracy']['std']:.3f}",
                    '测试准确率': f"{result['test_scores']['accuracy']:.3f}",
                    '测试精确率': f"{result['test_scores']['precision']:.3f}",
                    '测试召回率': f"{result['test_scores']['recall']:.3f}",
                    '测试F1': f"{result['test_scores']['f1']:.3f}"
                }
            else:
                row = {
                    '算法': alg_name,
                    'CV R²': f"{result['cv_results']['r2']['mean']:.3f} ± {result['cv_results']['r2']['std']:.3f}",
                    '测试R²': f"{result['test_scores']['r2']:.3f}",
                    '测试MSE': f"{result['test_scores']['mse']:.3f}",
                    '测试MAE': f"{result['test_scores']['mae']:.3f}"
                }
            
            data.append(row)
        
        df = pd.DataFrame(data)
        print("\n算法性能比较表:")
        print("=" * 60)
        print(df.to_string(index=False))
        
        return df

# 算法比较演示
comparison = AlgorithmComparison()

print("\n监督学习算法比较:")
print("=" * 30)

# 比较所有算法
comparison_results = comparison.compare_all_algorithms(
    X_train_iris, y_train_iris, X_test_iris, y_test_iris, 'classification'
)

# 可视化比较结果
comparison.plot_comparison_results('classification')

# 创建比较表格
comparison_df = comparison.create_comparison_table('classification')

3.9 Chapter Summary

3.9.1 Key Topics Recap

This chapter walked through the main supervised learning algorithms in detail:

  1. Linear regression

    • Basic linear regression
    • Regularized regression (Ridge, Lasso, ElasticNet)
    • Polynomial regression
  2. Logistic regression

    • Binary logistic regression
    • Multiclass classification strategies
    • Decision-boundary visualization
  3. Decision trees

    • Information gain and Gini impurity
    • Decision trees for classification and regression
    • Pruning techniques
  4. Random forests

    • Bootstrap sampling
    • Feature-importance analysis
    • Principles of ensemble learning
  5. Support vector machines

    • Linear and nonlinear SVMs
    • The kernel trick
    • Hyperparameter tuning
  6. k-nearest neighbors

    • Distance metrics
    • Choosing k
    • Weighting strategies

3.9.2 Algorithm Selection Guide

| Algorithm | Strengths | Weaknesses | Typical use cases |
| --- | --- | --- | --- |
| Linear regression | Simple, highly interpretable | Assumes a linear relationship | Regression problems with a clearly linear relationship |
| Logistic regression | Probabilistic output, fast | Assumes linear separability | Binary classification, scenarios that need probabilities |
| Decision tree | Interpretable, handles nonlinearity | Prone to overfitting | Classification problems that require interpretability |
| Random forest | Strong performance, robust | Black-box model | Most classification and regression problems |
| SVM | Handles high-dimensional data well | Sensitive to parameters, slow to train | High-dimensional data, small samples |
| k-NN | Simple, no parametric assumptions | Expensive prediction, high memory use | Problems with strong local structure |

3.9.3 Practical Tips

  1. Data preprocessing: always preprocess your data appropriately
  2. Feature scaling: standardize features for distance-sensitive algorithms (SVM, k-NN)
  3. Cross-validation: evaluate model performance with cross-validation
  4. Hyperparameter tuning: optimize parameters with grid search or randomized search
  5. Ensembles: consider ensemble methods to boost performance

3.10 Next Chapter Preview

In the next chapter we turn to unsupervised learning algorithms, including:

  • Clustering algorithms (K-means, hierarchical clustering, DBSCAN)
  • Dimensionality-reduction techniques (PCA, t-SNE, UMAP)
  • Association rule mining
  • Anomaly detection

3.11 Exercises

Basic exercises

  1. Implement a simple linear regression algorithm from scratch (without sklearn)
  2. Compare how different regularization strengths affect model performance
  3. Compute a decision tree's information gain by hand
  4. Implement the distance computation for k-NN

Advanced exercises

  1. Compare all the supervised learning algorithms on a real dataset
  2. Implement a custom ensemble learning algorithm
  3. Analyze how different kernel functions affect SVM performance
  4. Study how class imbalance affects each algorithm

Project exercises

  1. House price prediction: predict prices with several regression algorithms
  2. Text classification: build a spam-filtering system
  3. Image classification: classify images with classical machine learning methods
  4. Recommender system: a collaborative-filtering recommendation algorithm

Discussion questions

  1. Why does ensemble learning usually outperform a single algorithm?

  2. When is a simple model preferable to a complex one?

  3. How can the curse of dimensionality be addressed for high-dimensional data?

  4. What is the essential difference between supervised and unsupervised learning?

# 使用线性回归分析器
analyzer = LinearRegressionAnalyzer()

# 创建更复杂的回归数据
np.random.seed(42)
n_samples = 500
n_features = 5

X_complex = np.random.randn(n_samples, n_features)

# 创建真实系数
true_coef = np.array([2.5, -1.5, 3.0, 0.5, -2.0])
y_complex = X_complex @ true_coef + np.random.randn(n_samples) * 0.5

# 分割数据
X_train, X_test, y_train, y_test = train_test_split(
    X_complex, y_complex, test_size=0.2, random_state=42
)

# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 训练模型
analyzer.fit_models(X_train_scaled, y_train, X_test_scaled, y_test)

# 可视化结果
analyzer.plot_results(X_test_scaled, y_test)

# 比较系数
coef_comparison = analyzer.compare_coefficients()


3.2.2 Regularized Regression

Regularization helps prevent overfitting by penalizing large coefficients; the main variants are Ridge, Lasso, and ElasticNet.
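
For reference, the objectives behind these penalties, in standard textbook form (sklearn scales the data-fit term slightly differently, e.g. Lasso divides it by $2n$, and its `l1_ratio` parameter plays the role of $\rho$ below):

$$\text{Ridge:}\quad \min_{\beta}\; \lVert y - X\beta \rVert_2^2 + \alpha \lVert \beta \rVert_2^2 \qquad \text{Lasso:}\quad \min_{\beta}\; \lVert y - X\beta \rVert_2^2 + \alpha \lVert \beta \rVert_1$$

$$\text{ElasticNet:}\quad \min_{\beta}\; \lVert y - X\beta \rVert_2^2 + \alpha\big(\rho\,\lVert \beta \rVert_1 + (1-\rho)\,\lVert \beta \rVert_2^2\big)$$

The $\ell_2$ penalty shrinks coefficients smoothly, while the $\ell_1$ penalty can drive them exactly to zero, which is why the Lasso feature-count plot below steps down as $\alpha$ grows.
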
class RegularizationDemo:
    """
    正则化回归演示
    """
    def __init__(self):
        self.alphas = np.logspace(-4, 2, 50)
        
    def regularization_path(self, X_train, y_train, X_test, y_test):
        """
        绘制正则化路径
        """
        ridge_train_scores = []
        ridge_test_scores = []
        lasso_train_scores = []
        lasso_test_scores = []
        lasso_n_features = []
        
        for alpha in self.alphas:
            # Ridge回归
            ridge = Ridge(alpha=alpha)
            ridge.fit(X_train, y_train)
            ridge_train_scores.append(ridge.score(X_train, y_train))
            ridge_test_scores.append(ridge.score(X_test, y_test))
            
            # Lasso回归
            lasso = Lasso(alpha=alpha, max_iter=2000)
            lasso.fit(X_train, y_train)
            lasso_train_scores.append(lasso.score(X_train, y_train))
            lasso_test_scores.append(lasso.score(X_test, y_test))
            lasso_n_features.append(np.sum(lasso.coef_ != 0))
        
        # 可视化
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        
        # Ridge正则化路径
        axes[0, 0].semilogx(self.alphas, ridge_train_scores, 'b-', label='训练集')
        axes[0, 0].semilogx(self.alphas, ridge_test_scores, 'r-', label='测试集')
        axes[0, 0].set_xlabel('Alpha')
        axes[0, 0].set_ylabel('R² Score')
        axes[0, 0].set_title('Ridge回归正则化路径')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)
        
        # Lasso正则化路径
        axes[0, 1].semilogx(self.alphas, lasso_train_scores, 'b-', label='训练集')
        axes[0, 1].semilogx(self.alphas, lasso_test_scores, 'r-', label='测试集')
        axes[0, 1].set_xlabel('Alpha')
        axes[0, 1].set_ylabel('R² Score')
        axes[0, 1].set_title('Lasso回归正则化路径')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        
        # Lasso特征选择
        axes[1, 0].semilogx(self.alphas, lasso_n_features, 'g-', linewidth=2)
        axes[1, 0].set_xlabel('Alpha')
        axes[1, 0].set_ylabel('选择的特征数量')
        axes[1, 0].set_title('Lasso特征选择效果')
        axes[1, 0].grid(True, alpha=0.3)
        
        # 系数路径(Lasso)
        lasso_coefs = []
        for alpha in self.alphas:
            lasso = Lasso(alpha=alpha, max_iter=2000)
            lasso.fit(X_train, y_train)
            lasso_coefs.append(lasso.coef_)
        
        lasso_coefs = np.array(lasso_coefs)
        for i in range(lasso_coefs.shape[1]):
            axes[1, 1].semilogx(self.alphas, lasso_coefs[:, i], label=f'特征{i+1}')
        
        axes[1, 1].set_xlabel('Alpha')
        axes[1, 1].set_ylabel('系数值')
        axes[1, 1].set_title('Lasso系数路径')
        axes[1, 1].legend()
        axes[1, 1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        return {
            'ridge_scores': (ridge_train_scores, ridge_test_scores),
            'lasso_scores': (lasso_train_scores, lasso_test_scores),
            'lasso_features': lasso_n_features
        }
    
    def find_optimal_alpha(self, X_train, y_train):
        """
        使用交叉验证寻找最优alpha
        """
        from sklearn.linear_model import RidgeCV, LassoCV
        
        # Ridge交叉验证
        ridge_cv = RidgeCV(alphas=self.alphas, cv=5)
        ridge_cv.fit(X_train, y_train)
        
        # Lasso交叉验证
        lasso_cv = LassoCV(alphas=self.alphas, cv=5, max_iter=2000)
        lasso_cv.fit(X_train, y_train)
        
        print("交叉验证结果:")
        print(f"Ridge最优alpha: {ridge_cv.alpha_:.6f}")
        print(f"Lasso最优alpha: {lasso_cv.alpha_:.6f}")
        
        return ridge_cv.alpha_, lasso_cv.alpha_

# 演示正则化
reg_demo = RegularizationDemo()

# 创建有噪声和冗余特征的数据
np.random.seed(42)
n_samples = 200
n_features = 20
n_informative = 5

X_reg = np.random.randn(n_samples, n_features)
# 只有前5个特征是有用的
true_coef = np.zeros(n_features)
true_coef[:n_informative] = np.random.randn(n_informative) * 2
y_reg = X_reg @ true_coef + np.random.randn(n_samples) * 0.1

# 分割数据
X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(
    X_reg, y_reg, test_size=0.2, random_state=42
)

# 标准化
scaler_reg = StandardScaler()
X_train_reg_scaled = scaler_reg.fit_transform(X_train_reg)
X_test_reg_scaled = scaler_reg.transform(X_test_reg)

# 绘制正则化路径
reg_results = reg_demo.regularization_path(
    X_train_reg_scaled, y_train_reg, X_test_reg_scaled, y_test_reg
)

# 寻找最优alpha
optimal_alphas = reg_demo.find_optimal_alpha(X_train_reg_scaled, y_train_reg)

3.2.3 Polynomial Regression

Polynomial regression captures nonlinear relationships by adding higher-order terms of the features.
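
A quick illustration of what that expansion does (a minimal sketch using the same `PolynomialFeatures` transformer as the demo below):

# PolynomialFeatures turns x into [1, x, x^2, x^3], so a linear model on the
# expanded features is a cubic model in the original feature
import numpy as np
from sklearn.preprocessing import PolynomialFeatures

x = np.array([[2.0], [3.0]])
print(PolynomialFeatures(degree=3).fit_transform(x))
# [[ 1.  2.  4.  8.]
#  [ 1.  3.  9. 27.]]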

class PolynomialRegressionDemo:
    """
    多项式回归演示
    """
    def __init__(self):
        self.degrees = range(1, 11)
    
    def generate_nonlinear_data(self, n_samples=100):
        """
        生成非线性数据
        """
        np.random.seed(42)
        X = np.linspace(-2, 2, n_samples).reshape(-1, 1)
        y = 0.5 * X.ravel() ** 3 - 2 * X.ravel() ** 2 + X.ravel() + np.random.randn(n_samples) * 0.3
        return X, y
    
    def fit_polynomial_models(self, X, y):
        """
        拟合不同阶数的多项式模型
        """
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        
        train_scores = []
        test_scores = []
        models = {}
        
        for degree in self.degrees:
            # 创建多项式特征
            poly_features = PolynomialFeatures(degree=degree)
            X_train_poly = poly_features.fit_transform(X_train)
            X_test_poly = poly_features.transform(X_test)
            
            # 训练线性回归模型
            model = LinearRegression()
            model.fit(X_train_poly, y_train)
            
            # 评估
            train_score = model.score(X_train_poly, y_train)
            test_score = model.score(X_test_poly, y_test)
            
            train_scores.append(train_score)
            test_scores.append(test_score)
            models[degree] = (model, poly_features)
            
            print(f"阶数 {degree}: 训练R² = {train_score:.4f}, 测试R² = {test_score:.4f}")
        
        return models, train_scores, test_scores, (X_train, X_test, y_train, y_test)
    
    def plot_polynomial_fits(self, models, data, degrees_to_plot=[1, 3, 5, 9]):
        """
        可视化不同阶数的多项式拟合
        """
        X_train, X_test, y_train, y_test = data
        
        # 创建用于绘图的密集点
        X_plot = np.linspace(X_train.min(), X_train.max(), 300).reshape(-1, 1)
        
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.ravel()
        
        for i, degree in enumerate(degrees_to_plot):
            ax = axes[i]
            
            if degree in models:
                model, poly_features = models[degree]
                X_plot_poly = poly_features.transform(X_plot)
                y_plot = model.predict(X_plot_poly)
                
                # 绘制数据点
                ax.scatter(X_train, y_train, alpha=0.6, label='训练数据')
                ax.scatter(X_test, y_test, alpha=0.6, color='red', label='测试数据')
                
                # 绘制拟合曲线
                ax.plot(X_plot, y_plot, color='green', linewidth=2, label=f'{degree}阶多项式')
                
                ax.set_xlabel('X')
                ax.set_ylabel('y')
                ax.set_title(f'{degree}阶多项式回归')
                ax.legend()
                ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
    
    def plot_learning_curves(self, train_scores, test_scores):
        """
        绘制多项式阶数与训练/测试性能的关系曲线(验证曲线)
        """
        plt.figure(figsize=(10, 6))
        
        plt.plot(self.degrees, train_scores, 'o-', label='训练集R²', linewidth=2, markersize=6)
        plt.plot(self.degrees, test_scores, 'o-', label='测试集R²', linewidth=2, markersize=6)
        
        plt.xlabel('多项式阶数')
        plt.ylabel('R² Score')
        plt.title('多项式回归:训练集vs测试集性能')
        plt.grid(True, alpha=0.3)
        
        # 标注测试集R²最高的阶数
        best_test_idx = np.argmax(test_scores)
        plt.axvline(x=self.degrees[best_test_idx], color='red', linestyle='--', alpha=0.7, 
                    label=f'最佳阶数: {self.degrees[best_test_idx]}')
        
        plt.legend()
        plt.show()
        
        return self.degrees[best_test_idx]

# 演示多项式回归
poly_demo = PolynomialRegressionDemo()

# 生成非线性数据
X_nonlinear, y_nonlinear = poly_demo.generate_nonlinear_data(150)

print("多项式回归演示:")
print("=" * 30)

# 拟合多项式模型
poly_models, train_scores, test_scores, poly_data = poly_demo.fit_polynomial_models(
    X_nonlinear, y_nonlinear
)

# 可视化拟合结果
poly_demo.plot_polynomial_fits(poly_models, poly_data)

# 绘制学习曲线
best_degree = poly_demo.plot_learning_curves(train_scores, test_scores)
print(f"\n最佳多项式阶数: {best_degree}")

3.3 逻辑回归

3.3.1 逻辑回归原理

逻辑回归是用于分类问题的线性模型,它使用逻辑函数(sigmoid函数)将线性组合映射到概率。

数学公式: $$P(y=1|x) = \frac{1}{1 + e^{-(\beta_0 + \beta_1 x_1 + \cdots + \beta_n x_n)}}$$
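
对该公式做代数变形可以看出,逻辑回归建模的对数几率(log-odds)仍是特征的线性函数,这正是它被归入线性模型的原因:

$$\log\frac{P(y=1|x)}{1 - P(y=1|x)} = \beta_0 + \beta_1 x_1 + \cdots + \beta_n x_n$$

即某个特征每增加一个单位,对数几率就线性地增加对应的系数。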

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve, auc, roc_auc_score
from sklearn.metrics import precision_recall_curve, average_precision_score

class LogisticRegressionAnalyzer:
    """
    逻辑回归分析器
    """
    def __init__(self):
        self.model = None
        self.scaler = None
    
    def sigmoid(self, z):
        """
        Sigmoid函数(将z裁剪到[-250, 250],防止np.exp数值上溢)
        """
        return 1 / (1 + np.exp(-np.clip(z, -250, 250)))
    
    def plot_sigmoid(self):
        """
        可视化sigmoid函数
        """
        z = np.linspace(-10, 10, 100)
        y = self.sigmoid(z)
        
        plt.figure(figsize=(8, 6))
        plt.plot(z, y, 'b-', linewidth=2, label='Sigmoid函数')
        plt.axhline(y=0.5, color='r', linestyle='--', alpha=0.7, label='决策边界')
        plt.axvline(x=0, color='r', linestyle='--', alpha=0.7)
        
        plt.xlabel('z = β₀ + β₁x₁ + ... + βₙxₙ')
        plt.ylabel('P(y=1|x)')
        plt.title('Sigmoid函数')
        plt.grid(True, alpha=0.3)
        plt.legend()
        plt.show()
    
    def train_and_evaluate(self, X_train, y_train, X_test, y_test):
        """
        训练和评估逻辑回归模型
        """
        # 标准化特征
        self.scaler = StandardScaler()
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        
        # 训练模型
        self.model = LogisticRegression(random_state=42)
        self.model.fit(X_train_scaled, y_train)
        
        # 预测
        y_pred = self.model.predict(X_test_scaled)
        y_pred_proba = self.model.predict_proba(X_test_scaled)[:, 1]
        
        # 评估
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred)
        recall = recall_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)
        auc_score = roc_auc_score(y_test, y_pred_proba)
        
        print("逻辑回归模型评估:")
        print(f"准确率: {accuracy:.4f}")
        print(f"精确率: {precision:.4f}")
        print(f"召回率: {recall:.4f}")
        print(f"F1分数: {f1:.4f}")
        print(f"AUC分数: {auc_score:.4f}")
        
        return {
            'y_pred': y_pred,
            'y_pred_proba': y_pred_proba,
            'metrics': {
                'accuracy': accuracy,
                'precision': precision,
                'recall': recall,
                'f1': f1,
                'auc': auc_score
            }
        }
    
    def plot_decision_boundary(self, X, y, resolution=0.02):
        """
        绘制决策边界(仅适用于2D数据)
        """
        if X.shape[1] != 2:
            print("决策边界可视化仅支持2维特征")
            return
        
        # 标准化数据
        X_scaled = self.scaler.transform(X)
        
        # 创建网格
        x_min, x_max = X_scaled[:, 0].min() - 1, X_scaled[:, 0].max() + 1
        y_min, y_max = X_scaled[:, 1].min() - 1, X_scaled[:, 1].max() + 1
        
        xx, yy = np.meshgrid(np.arange(x_min, x_max, resolution),
                            np.arange(y_min, y_max, resolution))
        
        # 预测网格点
        grid_points = np.c_[xx.ravel(), yy.ravel()]
        Z = self.model.predict_proba(grid_points)[:, 1]
        Z = Z.reshape(xx.shape)
        
        # 绘制
        plt.figure(figsize=(10, 8))
        
        # 绘制概率等高线
        contour = plt.contourf(xx, yy, Z, levels=50, alpha=0.6, cmap='RdYlBu')
        plt.colorbar(contour, label='P(y=1)')
        
        # 绘制决策边界
        plt.contour(xx, yy, Z, levels=[0.5], colors='black', linestyles='--', linewidths=2)
        
        # 绘制数据点
        scatter = plt.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y, 
                            cmap='RdYlBu', edgecolors='black', alpha=0.8)
        
        plt.xlabel('特征1 (标准化)')
        plt.ylabel('特征2 (标准化)')
        plt.title('逻辑回归决策边界')
        plt.show()
    
    def plot_roc_and_pr_curves(self, y_test, y_pred_proba):
        """
        绘制ROC曲线和PR曲线
        """
        # 计算ROC曲线
        fpr, tpr, roc_thresholds = roc_curve(y_test, y_pred_proba)
        roc_auc = auc(fpr, tpr)
        
        # 计算PR曲线
        precision, recall, pr_thresholds = precision_recall_curve(y_test, y_pred_proba)
        avg_precision = average_precision_score(y_test, y_pred_proba)
        
        # 绘制
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
        
        # ROC曲线
        ax1.plot(fpr, tpr, 'b-', linewidth=2, label=f'ROC曲线 (AUC = {roc_auc:.3f})')
        ax1.plot([0, 1], [0, 1], 'r--', linewidth=1, label='随机分类器')
        ax1.set_xlabel('假正率 (FPR)')
        ax1.set_ylabel('真正率 (TPR)')
        ax1.set_title('ROC曲线')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # PR曲线
        ax2.plot(recall, precision, 'g-', linewidth=2, 
                label=f'PR曲线 (AP = {avg_precision:.3f})')
        ax2.axhline(y=y_test.mean(), color='r', linestyle='--', linewidth=1, 
                   label=f'随机分类器 (AP = {y_test.mean():.3f})')
        ax2.set_xlabel('召回率 (Recall)')
        ax2.set_ylabel('精确率 (Precision)')
        ax2.set_title('精确率-召回率曲线')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        return roc_auc, avg_precision

# 演示逻辑回归
logistic_analyzer = LogisticRegressionAnalyzer()

# 可视化sigmoid函数
logistic_analyzer.plot_sigmoid()

# 使用之前创建的分类数据
X_train_clf, X_test_clf, y_train_clf, y_test_clf = train_test_split(
    X_classification, y_classification, test_size=0.2, random_state=42, stratify=y_classification
)

# 训练和评估
results = logistic_analyzer.train_and_evaluate(
    X_train_clf, y_train_clf, X_test_clf, y_test_clf
)

# 绘制决策边界
logistic_analyzer.plot_decision_boundary(X_classification, y_classification)

# 绘制ROC和PR曲线
roc_auc, avg_precision = logistic_analyzer.plot_roc_and_pr_curves(
    y_test_clf, results['y_pred_proba']
)
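
ROC曲线不仅用于评估,也可以辅助选择分类阈值,默认的0.5并不总是最优。下面是一个基于Youden指数(最大化TPR−FPR)的简化示意:

# 在ROC曲线上寻找TPR-FPR最大的点,作为候选分类阈值
fpr, tpr, thresholds = roc_curve(y_test_clf, results['y_pred_proba'])
best_idx = np.argmax(tpr - fpr)
print(f"Youden最优阈值: {thresholds[best_idx]:.3f} "
      f"(TPR={tpr[best_idx]:.3f}, FPR={fpr[best_idx]:.3f})")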

# 混淆矩阵
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(y_test_clf, results['y_pred'])

plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
            xticklabels=['类别0', '类别1'], 
            yticklabels=['类别0', '类别1'])
plt.title('混淆矩阵')
plt.ylabel('真实标签')
plt.xlabel('预测标签')
plt.show()

print(f"\n混淆矩阵:")
print(f"真负例: {cm[0,0]}, 假正例: {cm[0,1]}")
print(f"假负例: {cm[1,0]}, 真正例: {cm[1,1]}")

3.3.2 多类分类

逻辑回归可以通过三种常见策略扩展到多类分类问题:多项(multinomial)回归、一对多(One-vs-Rest)与一对一(One-vs-One)。
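
其中多项逻辑回归把sigmoid推广为softmax函数,直接给出样本属于每个类别k的概率:

$$P(y=k|x) = \frac{e^{\beta_k^T x}}{\sum_{j=1}^{K} e^{\beta_j^T x}}$$

One-vs-Rest为每个类别训练一个"该类对其余"的二分类器;One-vs-One则为每一对类别训练一个分类器(共K(K-1)/2个),预测时投票决定。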

from sklearn.datasets import make_classification
from sklearn.multiclass import OneVsRestClassifier, OneVsOneClassifier

class MultiClassLogisticRegression:
    """
    多类逻辑回归
    """
    def __init__(self):
        self.models = {}
        self.scaler = None
    
    def create_multiclass_data(self, n_samples=1000, n_classes=3):
        """
        创建多类分类数据
        """
        X, y = make_classification(
            n_samples=n_samples,
            n_features=2,
            n_redundant=0,
            n_informative=2,
            n_clusters_per_class=1,
            n_classes=n_classes,
            random_state=42
        )
        return X, y
    
    def train_multiclass_models(self, X_train, y_train, X_test, y_test):
        """
        训练多种多类分类策略
        """
        # 标准化
        self.scaler = StandardScaler()
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        
        # 定义模型
        models = {
            'Multinomial': LogisticRegression(multi_class='multinomial', solver='lbfgs', random_state=42),
            'One-vs-Rest': OneVsRestClassifier(LogisticRegression(random_state=42)),
            'One-vs-One': OneVsOneClassifier(LogisticRegression(random_state=42))
        }
        
        results = {}
        
        for name, model in models.items():
            print(f"\n训练 {name} 模型...")
            
            # 训练
            model.fit(X_train_scaled, y_train)
            
            # 预测(One-vs-One策略不提供predict_proba,先做存在性检查)
            y_pred = model.predict(X_test_scaled)
            y_pred_proba = (model.predict_proba(X_test_scaled)
                            if hasattr(model, 'predict_proba') else None)
            
            # 评估
            accuracy = accuracy_score(y_test, y_pred)
            precision = precision_score(y_test, y_pred, average='weighted')
            recall = recall_score(y_test, y_pred, average='weighted')
            f1 = f1_score(y_test, y_pred, average='weighted')
            
            print(f"准确率: {accuracy:.4f}")
            print(f"精确率: {precision:.4f}")
            print(f"召回率: {recall:.4f}")
            print(f"F1分数: {f1:.4f}")
            
            self.models[name] = model
            results[name] = {
                'y_pred': y_pred,
                'y_pred_proba': y_pred_proba,
                'accuracy': accuracy,
                'precision': precision,
                'recall': recall,
                'f1': f1
            }
        
        return results, X_train_scaled, X_test_scaled
    
    def plot_multiclass_decision_boundaries(self, X, y, resolution=0.02):
        """
        绘制多类决策边界
        """
        X_scaled = self.scaler.transform(X)
        
        # 创建网格
        x_min, x_max = X_scaled[:, 0].min() - 1, X_scaled[:, 0].max() + 1
        y_min, y_max = X_scaled[:, 1].min() - 1, X_scaled[:, 1].max() + 1
        
        xx, yy = np.meshgrid(np.arange(x_min, x_max, resolution),
                            np.arange(y_min, y_max, resolution))
        
        grid_points = np.c_[xx.ravel(), yy.ravel()]
        
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        
        for i, (name, model) in enumerate(self.models.items()):
            ax = axes[i]
            
            # 预测网格点
            Z = model.predict(grid_points)
            Z = Z.reshape(xx.shape)
            
            # 绘制决策区域
            ax.contourf(xx, yy, Z, alpha=0.6, cmap='viridis')
            
            # 绘制数据点
            scatter = ax.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y, 
                               cmap='viridis', edgecolors='black', alpha=0.8)
            
            ax.set_xlabel('特征1 (标准化)')
            ax.set_ylabel('特征2 (标准化)')
            ax.set_title(f'{name} 决策边界')
        
        plt.tight_layout()
        plt.show()
    
    def plot_confusion_matrices(self, y_test, results):
        """
        绘制混淆矩阵
        """
        fig, axes = plt.subplots(1, 3, figsize=(15, 4))
        
        for i, (name, result) in enumerate(results.items()):
            ax = axes[i]
            
            cm = confusion_matrix(y_test, result['y_pred'])
            
            sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax,
                       xticklabels=[f'类别{i}' for i in range(len(np.unique(y_test)))],
                       yticklabels=[f'类别{i}' for i in range(len(np.unique(y_test)))])
            
            ax.set_title(f'{name}\n准确率: {result["accuracy"]:.3f}')
            ax.set_ylabel('真实标签')
            ax.set_xlabel('预测标签')
        
        plt.tight_layout()
        plt.show()

# 演示多类逻辑回归
multi_clf = MultiClassLogisticRegression()

# 创建多类数据
X_multi, y_multi = multi_clf.create_multiclass_data(n_samples=800, n_classes=3)

print(f"多类数据形状: {X_multi.shape}")
print(f"类别分布: {np.bincount(y_multi)}")

# 分割数据
X_train_multi, X_test_multi, y_train_multi, y_test_multi = train_test_split(
    X_multi, y_multi, test_size=0.2, random_state=42, stratify=y_multi
)

# 训练模型
print("多类逻辑回归模型比较:")
print("=" * 40)

multi_results, X_train_multi_scaled, X_test_multi_scaled = multi_clf.train_multiclass_models(
    X_train_multi, y_train_multi, X_test_multi, y_test_multi
)

# 绘制决策边界
multi_clf.plot_multiclass_decision_boundaries(X_multi, y_multi)

# 绘制混淆矩阵
multi_clf.plot_confusion_matrices(y_test_multi, multi_results)

# 详细分类报告
print("\n详细分类报告:")
for name, result in multi_results.items():
    print(f"\n{name}:")
    print(classification_report(y_test_multi, result['y_pred'], 
                              target_names=[f'类别{i}' for i in range(3)]))

3.4 决策树

3.4.1 决策树原理

决策树是一种基于树结构的监督学习算法,通过一系列if-else条件来进行决策。
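
直观地,一棵小决策树就是一组嵌套的if-else规则。下面是一个手写的深度为2的示意(特征名与阈值均为假设值,仅用于说明结构):

# 一棵"手写"的深度为2的决策树(阈值为假设值,仅作原理示意)
def tiny_tree_predict(petal_length, petal_width):
    if petal_length <= 2.5:      # 根节点分裂
        return 0                 # 叶子节点: 类别0
    elif petal_width <= 1.7:     # 第二层分裂
        return 1                 # 叶子节点: 类别1
    else:
        return 2                 # 叶子节点: 类别2

print(tiny_tree_predict(1.4, 0.2))   # 输出: 0
print(tiny_tree_predict(5.1, 2.0))   # 输出: 2

决策树学习算法要解决的核心问题,是在每个节点自动选出"最能降低不纯度"的特征与阈值。常用的不纯度度量是熵 $H(S) = -\sum_k p_k \log_2 p_k$ 与基尼不纯度 $Gini(S) = 1 - \sum_k p_k^2$,信息增益即分裂前后熵的加权下降,下面的代码将对其做数值演示。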

from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.tree import plot_tree, export_text
from sklearn import tree
import graphviz

class DecisionTreeAnalyzer:
    """
    决策树分析器
    """
    def __init__(self):
        self.models = {}
        self.feature_names = None

    def information_gain_demo(self):
        """
        演示信息增益计算
        """
        def entropy(y):
            """计算熵"""
            if len(y) == 0:
                return 0

            _, counts = np.unique(y, return_counts=True)
            probabilities = counts / len(y)
            return -np.sum(probabilities * np.log2(probabilities + 1e-10))

        def gini_impurity(y):
            """计算基尼不纯度"""
            if len(y) == 0:
                return 0

            _, counts = np.unique(y, return_counts=True)
            probabilities = counts / len(y)
            return 1 - np.sum(probabilities ** 2)

        # 示例数据
        y_example = np.array([0, 0, 1, 1, 1, 0, 1, 0, 1, 1])

        print("决策树分裂准则演示:")
        print("=" * 30)
        print(f"示例标签: {y_example}")
        print(f"熵: {entropy(y_example):.4f}")
        print(f"基尼不纯度: {gini_impurity(y_example):.4f}")

        # 模拟分裂
        left_split = y_example[:5]  # [0, 0, 1, 1, 1]
        right_split = y_example[5:]  # [0, 1, 0, 1, 1]

        print(f"\n分裂后:")
        print(f"左子树: {left_split}, 熵: {entropy(left_split):.4f}, 基尼: {gini_impurity(left_split):.4f}")
        print(f"右子树: {right_split}, 熵: {entropy(right_split):.4f}, 基尼: {gini_impurity(right_split):.4f}")

        # 计算信息增益
        original_entropy = entropy(y_example)
        weighted_entropy = (len(left_split) * entropy(left_split) + 
                            len(right_split) * entropy(right_split)) / len(y_example)
        information_gain = original_entropy - weighted_entropy

        print(f"\n信息增益: {information_gain:.4f}")

        return entropy, gini_impurity

    def train_decision_trees(self, X_train, y_train, X_test, y_test, problem_type='classification'):
        """
        训练不同参数的决策树
        """
        if problem_type == 'classification':
            models = {
                'Default': DecisionTreeClassifier(random_state=42),
                'Max Depth 3': DecisionTreeClassifier(max_depth=3, random_state=42),
                'Min Samples Split 20': DecisionTreeClassifier(min_samples_split=20, random_state=42),
                'Min Samples Leaf 10': DecisionTreeClassifier(min_samples_leaf=10, random_state=42),
                'Pruned': DecisionTreeClassifier(max_depth=5, min_samples_split=10, 
                                                 min_samples_leaf=5, random_state=42)
            }
        else:
            models = {
                'Default': DecisionTreeRegressor(random_state=42),
                'Max Depth 3': DecisionTreeRegressor(max_depth=3, random_state=42),
                'Min Samples Split 20': DecisionTreeRegressor(min_samples_split=20, random_state=42),
                'Min Samples Leaf 10': DecisionTreeRegressor(min_samples_leaf=10, random_state=42),
                'Pruned': DecisionTreeRegressor(max_depth=5, min_samples_split=10, 
                                                min_samples_leaf=5, random_state=42)
            }

        results = {}

        for name, model in models.items():
            # 训练
            model.fit(X_train, y_train)

            # 预测
            y_pred_train = model.predict(X_train)
            y_pred_test = model.predict(X_test)

            if problem_type == 'classification':
                train_score = accuracy_score(y_train, y_pred_train)
                test_score = accuracy_score(y_test, y_pred_test)
                metric_name = '准确率'
            else:
                train_score = r2_score(y_train, y_pred_train)
                test_score = r2_score(y_test, y_pred_test)
                metric_name = 'R²分数'

            print(f"{name}:")
            print(f"  训练{metric_name}: {train_score:.4f}")
            print(f"  测试{metric_name}: {test_score:.4f}")
            print(f"  树深度: {model.get_depth()}")
            print(f"  叶子节点数: {model.get_n_leaves()}")
            print()

            self.models[name] = model
            results[name] = {
                'train_score': train_score,
                'test_score': test_score,
                'depth': model.get_depth(),
                'n_leaves': model.get_n_leaves(),
                'y_pred_test': y_pred_test
            }

        return results

    def visualize_tree(self, model_name='Pruned', feature_names=None, class_names=None, max_depth=3):
        """
        可视化决策树
        """
        if model_name not in self.models:
            print(f"模型 {model_name} 不存在")
            return

        model = self.models[model_name]

        # 文本表示
        print(f"\n{model_name} 决策树文本表示:")
        print("=" * 40)
        tree_rules = export_text(model, feature_names=feature_names, max_depth=max_depth)
        print(tree_rules)

        # 图形可视化
        plt.figure(figsize=(15, 10))
        plot_tree(model, 
                  feature_names=feature_names,
                  class_names=class_names,
                  filled=True,
                  rounded=True,
                  fontsize=10,
                  max_depth=max_depth)
        plt.title(f'{model_name} 决策树可视化')
        plt.show()

    def feature_importance_analysis(self, feature_names=None):
        """
        特征重要性分析
        """
        if not self.models:
            print("没有训练好的模型")
            return

        # 收集所有模型的特征重要性
        importance_data = []

        for name, model in self.models.items():
            if hasattr(model, 'feature_importances_'):
                importance_data.append({
                    'model': name,
                    'importances': model.feature_importances_
                })

        if not importance_data:
            print("模型没有特征重要性信息")
            return

        # 可视化
        n_features = len(importance_data[0]['importances'])
        feature_names = feature_names or [f'特征{i+1}' for i in range(n_features)]

        fig, ax = plt.subplots(figsize=(10, 6))

        x = np.arange(n_features)
        width = 0.15

        for i, data in enumerate(importance_data):
            ax.bar(x + i * width, data['importances'], width, 
                   label=data['model'], alpha=0.8)

        ax.set_xlabel('特征')
        ax.set_ylabel('重要性')
        ax.set_title('决策树特征重要性比较')
        ax.set_xticks(x + width * (len(importance_data) - 1) / 2)
        ax.set_xticklabels(feature_names, rotation=45)
        ax.legend()
        ax.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

        return importance_data

    def plot_tree_complexity(self, results):
        """
        绘制树复杂度与性能的关系
        """
        models = list(results.keys())
        train_scores = [results[model]['train_score'] for model in models]
        test_scores = [results[model]['test_score'] for model in models]
        depths = [results[model]['depth'] for model in models]
        n_leaves = [results[model]['n_leaves'] for model in models]

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

        # 深度 vs 性能
        ax1.scatter(depths, train_scores, label='训练集', alpha=0.7, s=60)
        ax1.scatter(depths, test_scores, label='测试集', alpha=0.7, s=60)

        for i, model in enumerate(models):
            ax1.annotate(model, (depths[i], test_scores[i]), 
                         xytext=(5, 5), textcoords='offset points', fontsize=8)

        ax1.set_xlabel('树深度')
        ax1.set_ylabel('性能分数')
        ax1.set_title('树深度 vs 性能')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # 叶子节点数 vs 性能
        ax2.scatter(n_leaves, train_scores, label='训练集', alpha=0.7, s=60)
        ax2.scatter(n_leaves, test_scores, label='测试集', alpha=0.7, s=60)

        for i, model in enumerate(models):
            ax2.annotate(model, (n_leaves[i], test_scores[i]), 
                         xytext=(5, 5), textcoords='offset points', fontsize=8)

        ax2.set_xlabel('叶子节点数')
        ax2.set_ylabel('性能分数')
        ax2.set_title('叶子节点数 vs 性能')
        ax2.legend()
        ax2.grid(True, alpha=