本章概述
回归分析是机器学习中的基础任务之一,用于预测连续数值型目标变量。本章将深入介绍Scikit-learn中的主要回归算法,包括线性回归、多项式回归、正则化方法、回归树等,并通过丰富的代码示例和可视化帮助理解。
学习目标
- 理解回归问题的本质和评估指标
- 掌握线性回归的原理和实现
- 学习多项式回归处理非线性关系
- 理解正则化方法(岭回归、Lasso、弹性网络)
- 掌握回归树和集成回归方法
- 学会选择合适的回归算法
- 完成房价预测实战项目
1. 回归基础
1.1 回归问题概述
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.pipeline import Pipeline
from sklearn.datasets import make_regression, fetch_california_housing
import time
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体和图形样式
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
sns.set_style("whitegrid")
class RegressionBasics:
"""回归基础知识演示"""
def __init__(self):
self.datasets = {}
def regression_overview(self):
"""回归问题概述"""
print("=== 回归问题概述 ===")
print("1. 定义: 预测连续数值型目标变量")
print("2. 目标: 找到输入特征与输出之间的函数关系")
print("3. 应用场景:")
print(" - 房价预测")
print(" - 股票价格预测")
print(" - 销售额预测")
print(" - 温度预测")
print("4. 与分类的区别:")
print(" - 分类: 预测离散类别")
print(" - 回归: 预测连续数值")
print("5. 常见算法:")
print(" - 线性回归")
print(" - 多项式回归")
print(" - 岭回归/Lasso回归")
print(" - 回归树")
print(" - 支持向量回归")
def create_sample_datasets(self):
"""创建示例数据集"""
np.random.seed(42)
# 1. 简单线性关系
n_samples = 100
X_linear = np.random.uniform(-3, 3, (n_samples, 1))
y_linear = 2 * X_linear.ravel() + 1 + np.random.normal(0, 0.5, n_samples)
# 2. 非线性关系
X_nonlinear = np.random.uniform(-2, 2, (n_samples, 1))
y_nonlinear = X_nonlinear.ravel()**2 + 0.5 * X_nonlinear.ravel() + np.random.normal(0, 0.3, n_samples)
# 3. 多特征线性关系
X_multi = np.random.randn(n_samples, 3)
y_multi = 2*X_multi[:, 0] - 1.5*X_multi[:, 1] + 0.8*X_multi[:, 2] + np.random.normal(0, 0.2, n_samples)
# 4. 带噪声的复杂关系
X_complex = np.random.uniform(-3, 3, (n_samples, 1))
y_complex = (np.sin(X_complex.ravel()) + 0.3 * X_complex.ravel()**2 +
np.random.normal(0, 0.2, n_samples))
self.datasets = {
'linear': (X_linear, y_linear),
'nonlinear': (X_nonlinear, y_nonlinear),
'multi': (X_multi, y_multi),
'complex': (X_complex, y_complex)
}
# 可视化数据集
self.plot_sample_datasets()
return self.datasets
def plot_sample_datasets(self):
"""可视化示例数据集"""
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# 线性关系
X_linear, y_linear = self.datasets['linear']
axes[0, 0].scatter(X_linear, y_linear, alpha=0.6, color='blue')
axes[0, 0].set_title('线性关系数据')
axes[0, 0].set_xlabel('X')
axes[0, 0].set_ylabel('y')
axes[0, 0].grid(True, alpha=0.3)
# 非线性关系
X_nonlinear, y_nonlinear = self.datasets['nonlinear']
axes[0, 1].scatter(X_nonlinear, y_nonlinear, alpha=0.6, color='green')
axes[0, 1].set_title('非线性关系数据')
axes[0, 1].set_xlabel('X')
axes[0, 1].set_ylabel('y')
axes[0, 1].grid(True, alpha=0.3)
# 多特征关系(显示前两个特征)
X_multi, y_multi = self.datasets['multi']
scatter = axes[1, 0].scatter(X_multi[:, 0], X_multi[:, 1], c=y_multi,
cmap='viridis', alpha=0.6)
axes[1, 0].set_title('多特征关系数据')
axes[1, 0].set_xlabel('特征1')
axes[1, 0].set_ylabel('特征2')
plt.colorbar(scatter, ax=axes[1, 0], label='目标值')
axes[1, 0].grid(True, alpha=0.3)
# 复杂关系
X_complex, y_complex = self.datasets['complex']
axes[1, 1].scatter(X_complex, y_complex, alpha=0.6, color='red')
axes[1, 1].set_title('复杂关系数据')
axes[1, 1].set_xlabel('X')
axes[1, 1].set_ylabel('y')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def regression_metrics_explanation(self):
"""回归评估指标解释"""
print("=== 回归评估指标 ===")
print()
print("1. 均方误差 (MSE - Mean Squared Error)")
print(" 公式: MSE = (1/n) * Σ(y_true - y_pred)²")
print(" 特点: 对大误差敏感,单位是目标变量的平方")
print()
print("2. 均方根误差 (RMSE - Root Mean Squared Error)")
print(" 公式: RMSE = √MSE")
print(" 特点: 与目标变量同单位,易于解释")
print()
print("3. 平均绝对误差 (MAE - Mean Absolute Error)")
print(" 公式: MAE = (1/n) * Σ|y_true - y_pred|")
print(" 特点: 对异常值不敏感,线性惩罚")
print()
print("4. 决定系数 (R² - R-squared)")
print(" 公式: R² = 1 - SS_res/SS_tot")
print(" 特点: 0-1之间,越接近1越好,表示解释的方差比例")
print()
# 可视化不同误差的影响
self.visualize_error_metrics()
def visualize_error_metrics(self):
"""可视化不同误差指标的特点"""
# 创建示例数据
y_true = np.array([1, 2, 3, 4, 5])
predictions = [
np.array([1.1, 2.1, 2.9, 4.1, 4.9]), # 小误差
np.array([1.5, 2.5, 2.5, 4.5, 4.5]), # 中等误差
np.array([2, 3, 2, 5, 3]), # 大误差
np.array([1, 2, 3, 4, 10]) # 包含异常值
]
labels = ['小误差', '中等误差', '大误差', '包含异常值']
# 计算各种指标
metrics_data = []
for i, y_pred in enumerate(predictions):
mse = mean_squared_error(y_true, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)
metrics_data.append({
'scenario': labels[i],
'MSE': mse,
'RMSE': rmse,
'MAE': mae,
'R²': r2
})
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
scenarios = [data['scenario'] for data in metrics_data]
# MSE
mse_values = [data['MSE'] for data in metrics_data]
axes[0, 0].bar(scenarios, mse_values, alpha=0.7, color='skyblue')
axes[0, 0].set_title('均方误差 (MSE)')
axes[0, 0].set_ylabel('MSE')
axes[0, 0].tick_params(axis='x', rotation=45)
# RMSE
rmse_values = [data['RMSE'] for data in metrics_data]
axes[0, 1].bar(scenarios, rmse_values, alpha=0.7, color='lightgreen')
axes[0, 1].set_title('均方根误差 (RMSE)')
axes[0, 1].set_ylabel('RMSE')
axes[0, 1].tick_params(axis='x', rotation=45)
# MAE
mae_values = [data['MAE'] for data in metrics_data]
axes[1, 0].bar(scenarios, mae_values, alpha=0.7, color='lightcoral')
axes[1, 0].set_title('平均绝对误差 (MAE)')
axes[1, 0].set_ylabel('MAE')
axes[1, 0].tick_params(axis='x', rotation=45)
# R²
r2_values = [data['R²'] for data in metrics_data]
axes[1, 1].bar(scenarios, r2_values, alpha=0.7, color='gold')
axes[1, 1].set_title('决定系数 (R²)')
axes[1, 1].set_ylabel('R²')
axes[1, 1].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
# 打印数值结果
print("\n指标比较表:")
df_metrics = pd.DataFrame(metrics_data)
print(df_metrics.round(3))
# 演示回归基础
regression_basics = RegressionBasics()
# 回归概述
regression_basics.regression_overview()
# 创建示例数据集
datasets = regression_basics.create_sample_datasets()
# 评估指标解释
regression_basics.regression_metrics_explanation()
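为了把上面的指标公式与代码对应起来,下面给出一个按定义手工计算各指标、并与sklearn函数对照的最小示例(其中y_true_demo、y_pred_demo是假设的演示数据):

# 按定义手工计算回归指标,并与sklearn的实现对照(演示用的假设数据)
y_true_demo = np.array([3.0, 5.0, 2.5, 7.0, 4.5])
y_pred_demo = np.array([2.8, 5.3, 2.9, 6.5, 4.4])

n = len(y_true_demo)
mse_manual = np.sum((y_true_demo - y_pred_demo) ** 2) / n      # MSE = (1/n)Σ(y-ŷ)²
rmse_manual = np.sqrt(mse_manual)                              # RMSE = √MSE
mae_manual = np.sum(np.abs(y_true_demo - y_pred_demo)) / n     # MAE = (1/n)Σ|y-ŷ|
ss_res = np.sum((y_true_demo - y_pred_demo) ** 2)              # 残差平方和 SS_res
ss_tot = np.sum((y_true_demo - y_true_demo.mean()) ** 2)       # 总平方和 SS_tot
r2_manual = 1 - ss_res / ss_tot                                # R² = 1 - SS_res/SS_tot

print(f"MSE : 手工={mse_manual:.4f}, sklearn={mean_squared_error(y_true_demo, y_pred_demo):.4f}")
print(f"MAE : 手工={mae_manual:.4f}, sklearn={mean_absolute_error(y_true_demo, y_pred_demo):.4f}")
print(f"R²  : 手工={r2_manual:.4f}, sklearn={r2_score(y_true_demo, y_pred_demo):.4f}")
print(f"RMSE: 手工={rmse_manual:.4f}")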
2. 线性回归
2.1 线性回归原理与实现
class LinearRegressionDemo:
"""线性回归演示"""
def __init__(self):
self.models = {}
self.results = {}
def linear_regression_theory(self):
"""线性回归理论解释"""
print("=== 线性回归理论 ===")
print("1. 基本假设:")
print(" - 线性关系: y = β₀ + β₁x₁ + β₂x₂ + ... + βₙxₙ + ε")
print(" - 误差项独立同分布")
print(" - 误差项服从正态分布")
print(" - 特征之间无完全共线性")
print()
print("2. 目标函数:")
print(" - 最小化残差平方和: min Σ(y - ŷ)²")
print(" - 最小二乘法求解")
print()
print("3. 解析解:")
print(" - β = (X'X)⁻¹X'y")
print(" - 其中X是设计矩阵,y是目标向量")
print()
print("4. 优点:")
print(" - 简单易懂,计算快速")
print(" - 有解析解,无需迭代")
print(" - 可解释性强")
print()
print("5. 缺点:")
print(" - 假设线性关系")
print(" - 对异常值敏感")
print(" - 可能过拟合(高维情况)")
# 可视化线性回归原理
self.visualize_linear_regression_principle()
def visualize_linear_regression_principle(self):
"""可视化线性回归原理"""
# 生成示例数据
np.random.seed(42)
X = np.random.uniform(-2, 2, 50)
y = 1.5 * X + 0.5 + np.random.normal(0, 0.3, 50)
# 拟合线性回归
X_reshaped = X.reshape(-1, 1)
lr = LinearRegression()
lr.fit(X_reshaped, y)
# 预测
X_plot = np.linspace(-2, 2, 100).reshape(-1, 1)
y_plot = lr.predict(X_plot)
# 计算残差
y_pred = lr.predict(X_reshaped)
residuals = y - y_pred
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# 原始数据和拟合线
axes[0, 0].scatter(X, y, alpha=0.6, color='blue', label='数据点')
axes[0, 0].plot(X_plot, y_plot, color='red', linewidth=2, label='拟合线')
axes[0, 0].set_title('线性回归拟合')
axes[0, 0].set_xlabel('X')
axes[0, 0].set_ylabel('y')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 残差图
axes[0, 1].scatter(y_pred, residuals, alpha=0.6, color='green')
axes[0, 1].axhline(y=0, color='red', linestyle='--')
axes[0, 1].set_title('残差图')
axes[0, 1].set_xlabel('预测值')
axes[0, 1].set_ylabel('残差')
axes[0, 1].grid(True, alpha=0.3)
# 残差分布
axes[1, 0].hist(residuals, bins=15, alpha=0.7, color='orange', edgecolor='black')
axes[1, 0].set_title('残差分布')
axes[1, 0].set_xlabel('残差')
axes[1, 0].set_ylabel('频次')
axes[1, 0].grid(True, alpha=0.3)
# QQ图检验正态性
from scipy import stats
stats.probplot(residuals, dist="norm", plot=axes[1, 1])
axes[1, 1].set_title('残差QQ图')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 打印模型参数
print(f"\n模型参数:")
print(f"截距 (β₀): {lr.intercept_:.3f}")
print(f"斜率 (β₁): {lr.coef_[0]:.3f}")
print(f"R²分数: {lr.score(X_reshaped, y):.3f}")
def simple_linear_regression(self, X, y):
"""简单线性回归"""
print("=== 简单线性回归 ===")
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 训练模型
lr = LinearRegression()
lr.fit(X_train, y_train)
# 预测
y_train_pred = lr.predict(X_train)
y_test_pred = lr.predict(X_test)
# 评估
train_mse = mean_squared_error(y_train, y_train_pred)
test_mse = mean_squared_error(y_test, y_test_pred)
train_r2 = r2_score(y_train, y_train_pred)
test_r2 = r2_score(y_test, y_test_pred)
print(f"训练集 MSE: {train_mse:.4f}")
print(f"测试集 MSE: {test_mse:.4f}")
print(f"训练集 R²: {train_r2:.4f}")
print(f"测试集 R²: {test_r2:.4f}")
# 可视化结果
if X.shape[1] == 1: # 单特征情况
self.plot_simple_regression_results(X, y, lr, X_train, X_test, y_train, y_test)
self.models['simple_linear'] = lr
return lr
def plot_simple_regression_results(self, X, y, model, X_train, X_test, y_train, y_test):
"""可视化简单线性回归结果"""
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# 拟合结果
X_plot = np.linspace(X.min(), X.max(), 100).reshape(-1, 1)
y_plot = model.predict(X_plot)
axes[0].scatter(X_train, y_train, alpha=0.6, color='blue', label='训练集')
axes[0].scatter(X_test, y_test, alpha=0.6, color='red', label='测试集')
axes[0].plot(X_plot, y_plot, color='green', linewidth=2, label='拟合线')
axes[0].set_title('线性回归拟合结果')
axes[0].set_xlabel('X')
axes[0].set_ylabel('y')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 预测vs真实值
y_pred_all = model.predict(X)
axes[1].scatter(y, y_pred_all, alpha=0.6)
axes[1].plot([y.min(), y.max()], [y.min(), y.max()], 'r--', linewidth=2)
axes[1].set_title('预测值 vs 真实值')
axes[1].set_xlabel('真实值')
axes[1].set_ylabel('预测值')
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def multiple_linear_regression(self, X, y):
"""多元线性回归"""
print("\n=== 多元线性回归 ===")
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 特征标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 训练模型
lr = LinearRegression()
lr.fit(X_train_scaled, y_train)
# 预测
y_train_pred = lr.predict(X_train_scaled)
y_test_pred = lr.predict(X_test_scaled)
# 评估
train_mse = mean_squared_error(y_train, y_train_pred)
test_mse = mean_squared_error(y_test, y_test_pred)
train_r2 = r2_score(y_train, y_train_pred)
test_r2 = r2_score(y_test, y_test_pred)
print(f"训练集 MSE: {train_mse:.4f}")
print(f"测试集 MSE: {test_mse:.4f}")
print(f"训练集 R²: {train_r2:.4f}")
print(f"测试集 R²: {test_r2:.4f}")
# 特征重要性分析
feature_importance = np.abs(lr.coef_)
feature_names = [f'特征{i+1}' for i in range(len(feature_importance))]
print(f"\n特征重要性 (系数绝对值):")
for name, importance in zip(feature_names, feature_importance):
print(f"{name}: {importance:.4f}")
# 可视化特征重要性
plt.figure(figsize=(10, 6))
bars = plt.bar(feature_names, feature_importance, alpha=0.7)
plt.title('特征重要性 (系数绝对值)')
plt.xlabel('特征')
plt.ylabel('重要性')
plt.grid(True, alpha=0.3)
# 添加数值标签
for bar, importance in zip(bars, feature_importance):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{importance:.3f}', ha='center', va='bottom')
plt.tight_layout()
plt.show()
self.models['multiple_linear'] = lr
return lr, scaler
def linear_regression_assumptions_check(self, X, y, model):
"""检验线性回归假设"""
print("\n=== 线性回归假设检验 ===")
# 预测和残差
y_pred = model.predict(X)
residuals = y - y_pred
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
# 1. 线性关系检验
axes[0, 0].scatter(y_pred, y, alpha=0.6)
axes[0, 0].plot([y.min(), y.max()], [y.min(), y.max()], 'r--')
axes[0, 0].set_title('线性关系检验\n预测值 vs 真实值')
axes[0, 0].set_xlabel('预测值')
axes[0, 0].set_ylabel('真实值')
axes[0, 0].grid(True, alpha=0.3)
# 2. 残差独立性检验
axes[0, 1].scatter(y_pred, residuals, alpha=0.6)
axes[0, 1].axhline(y=0, color='red', linestyle='--')
axes[0, 1].set_title('残差独立性检验\n残差 vs 预测值')
axes[0, 1].set_xlabel('预测值')
axes[0, 1].set_ylabel('残差')
axes[0, 1].grid(True, alpha=0.3)
# 3. 残差正态性检验
axes[0, 2].hist(residuals, bins=20, alpha=0.7, edgecolor='black')
axes[0, 2].set_title('残差正态性检验\n残差分布')
axes[0, 2].set_xlabel('残差')
axes[0, 2].set_ylabel('频次')
axes[0, 2].grid(True, alpha=0.3)
# 4. 残差QQ图
from scipy import stats
stats.probplot(residuals, dist="norm", plot=axes[1, 0])
axes[1, 0].set_title('残差QQ图')
axes[1, 0].grid(True, alpha=0.3)
# 5. 残差vs拟合值(检验等方差性)
axes[1, 1].scatter(y_pred, np.abs(residuals), alpha=0.6)
axes[1, 1].set_title('等方差性检验\n|残差| vs 预测值')
axes[1, 1].set_xlabel('预测值')
axes[1, 1].set_ylabel('|残差|')
axes[1, 1].grid(True, alpha=0.3)
# 6. 残差自相关检验(如果有时间序列特性)
if len(residuals) > 1:
axes[1, 2].plot(residuals[:-1], residuals[1:], 'o', alpha=0.6)
axes[1, 2].set_title('残差自相关检验\n残差(t) vs 残差(t-1)')
axes[1, 2].set_xlabel('残差(t-1)')
axes[1, 2].set_ylabel('残差(t)')
axes[1, 2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 统计检验
from scipy.stats import shapiro, jarque_bera
print("统计检验结果:")
# Shapiro-Wilk正态性检验
shapiro_stat, shapiro_p = shapiro(residuals)
print(f"Shapiro-Wilk检验: 统计量={shapiro_stat:.4f}, p值={shapiro_p:.4f}")
print(f"正态性: {'通过' if shapiro_p > 0.05 else '不通过'} (α=0.05)")
# Jarque-Bera正态性检验
jb_stat, jb_p = jarque_bera(residuals)
print(f"Jarque-Bera检验: 统计量={jb_stat:.4f}, p值={jb_p:.4f}")
print(f"正态性: {'通过' if jb_p > 0.05 else '不通过'} (α=0.05)")
# 演示线性回归
lr_demo = LinearRegressionDemo()
# 理论解释
lr_demo.linear_regression_theory()
# 简单线性回归
X_linear, y_linear = datasets['linear']
simple_lr = lr_demo.simple_linear_regression(X_linear, y_linear)
# 多元线性回归
X_multi, y_multi = datasets['multi']
multiple_lr, scaler = lr_demo.multiple_linear_regression(X_multi, y_multi)
# 假设检验
lr_demo.linear_regression_assumptions_check(X_linear, y_linear, simple_lr)
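作为对上文"解析解 β = (X'X)⁻¹X'y"这一结论的最小验证,下面用正规方程直接求解前面的单特征线性数据,并与LinearRegression的结果对照(假设X'X可逆;X_design、beta、lr_check均为此处新引入的演示变量):

# 正规方程求解线性回归: 在设计矩阵前加一列1以同时估计截距β₀
X_design = np.hstack([np.ones((X_linear.shape[0], 1)), X_linear])
# 解线性方程组(X'X)β = X'y,数值上比显式求逆更稳定
beta = np.linalg.solve(X_design.T @ X_design, X_design.T @ y_linear)
print(f"正规方程: 截距={beta[0]:.4f}, 斜率={beta[1]:.4f}")

lr_check = LinearRegression().fit(X_linear, y_linear)
print(f"sklearn : 截距={lr_check.intercept_:.4f}, 斜率={lr_check.coef_[0]:.4f}")  # 两者应一致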
3. 多项式回归
3.1 多项式回归原理与实现
class PolynomialRegressionDemo:
"""多项式回归演示"""
def __init__(self):
self.models = {}
self.scalers = {}
def polynomial_regression_theory(self):
"""多项式回归理论解释"""
print("=== 多项式回归理论 ===")
print("1. 基本思想:")
print(" - 将非线性关系转换为线性关系")
print(" - 通过增加特征的高次项来拟合非线性数据")
print()
print("2. 数学形式:")
print(" - 一元: y = β₀ + β₁x + β₂x² + ... + βₙxⁿ")
print(" - 多元: y = β₀ + Σβᵢxᵢ + Σβᵢⱼxᵢxⱼ + Σβᵢⱼₖxᵢxⱼxₖ + ...")
print()
print("3. 特征变换:")
print(" - 原特征: [x₁, x₂]")
print(" - 2次多项式: [1, x₁, x₂, x₁², x₁x₂, x₂²]")
print(" - 3次多项式: [1, x₁, x₂, x₁², x₁x₂, x₂², x₁³, x₁²x₂, x₁x₂², x₂³]")
print()
print("4. 优点:")
print(" - 可以拟合非线性关系")
print(" - 仍然是线性模型,易于求解")
print(" - 可解释性较好")
print()
print("5. 缺点:")
print(" - 容易过拟合")
print(" - 特征数量快速增长")
print(" - 对异常值敏感")
# 可视化多项式特征
self.visualize_polynomial_features()
def visualize_polynomial_features(self):
"""可视化多项式特征"""
# 生成示例数据
X = np.array([[1, 2], [2, 3], [3, 1]])
print("\n=== 多项式特征变换示例 ===")
print("原始特征:")
print(X)
# 不同阶数的多项式特征
degrees = [1, 2, 3]
for degree in degrees:
poly = PolynomialFeatures(degree=degree, include_bias=True)
X_poly = poly.fit_transform(X)
feature_names = poly.get_feature_names_out(['x1', 'x2'])
print(f"\n{degree}次多项式特征:")
print(f"特征名称: {feature_names}")
print(f"特征矩阵形状: {X_poly.shape}")
print("变换后的特征:")
print(X_poly)
def polynomial_regression_comparison(self, X, y):
"""不同阶数多项式回归比较"""
print("\n=== 不同阶数多项式回归比较 ===")
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
degrees = range(1, 8)
train_scores = []
test_scores = []
train_mse = []
test_mse = []
models = {}
for degree in degrees:
# 创建多项式特征
poly = PolynomialFeatures(degree=degree)
X_train_poly = poly.fit_transform(X_train)
X_test_poly = poly.transform(X_test)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_poly)
X_test_scaled = scaler.transform(X_test_poly)
# 训练模型
lr = LinearRegression()
lr.fit(X_train_scaled, y_train)
# 预测和评估
y_train_pred = lr.predict(X_train_scaled)
y_test_pred = lr.predict(X_test_scaled)
train_score = r2_score(y_train, y_train_pred)
test_score = r2_score(y_test, y_test_pred)
train_mse_score = mean_squared_error(y_train, y_train_pred)
test_mse_score = mean_squared_error(y_test, y_test_pred)
train_scores.append(train_score)
test_scores.append(test_score)
train_mse.append(train_mse_score)
test_mse.append(test_mse_score)
models[degree] = (lr, poly, scaler)
print(f"阶数 {degree}: 训练R²={train_score:.4f}, 测试R²={test_score:.4f}")
# 可视化比较结果
self.plot_polynomial_comparison(degrees, train_scores, test_scores, train_mse, test_mse)
# 选择最佳阶数
best_degree = degrees[np.argmax(test_scores)]
print(f"\n最佳多项式阶数: {best_degree}")
self.models.update(models)
return models[best_degree]
def plot_polynomial_comparison(self, degrees, train_scores, test_scores, train_mse, test_mse):
"""可视化多项式回归比较结果"""
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# R²分数比较
axes[0, 0].plot(degrees, train_scores, 'o-', label='训练集', color='blue')
axes[0, 0].plot(degrees, test_scores, 's-', label='测试集', color='red')
axes[0, 0].set_title('R²分数 vs 多项式阶数')
axes[0, 0].set_xlabel('多项式阶数')
axes[0, 0].set_ylabel('R²分数')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# MSE比较
axes[0, 1].plot(degrees, train_mse, 'o-', label='训练集', color='blue')
axes[0, 1].plot(degrees, test_mse, 's-', label='测试集', color='red')
axes[0, 1].set_title('MSE vs 多项式阶数')
axes[0, 1].set_xlabel('多项式阶数')
axes[0, 1].set_ylabel('MSE')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
axes[0, 1].set_yscale('log')
# 过拟合检测
overfitting = np.array(train_scores) - np.array(test_scores)
axes[1, 0].plot(degrees, overfitting, 'o-', color='purple')
axes[1, 0].axhline(y=0, color='red', linestyle='--')
axes[1, 0].set_title('过拟合程度 (训练R² - 测试R²)')
axes[1, 0].set_xlabel('多项式阶数')
axes[1, 0].set_ylabel('过拟合程度')
axes[1, 0].grid(True, alpha=0.3)
# 模型复杂度
feature_counts = []
for degree in degrees:
poly = PolynomialFeatures(degree=degree)
# 假设单特征输入
X_dummy = np.array([[1]])
X_poly = poly.fit_transform(X_dummy)
feature_counts.append(X_poly.shape[1])
axes[1, 1].plot(degrees, feature_counts, 'o-', color='green')
axes[1, 1].set_title('特征数量 vs 多项式阶数')
axes[1, 1].set_xlabel('多项式阶数')
axes[1, 1].set_ylabel('特征数量')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def polynomial_regression_visualization(self, X, y, degree=3):
"""可视化多项式回归拟合效果"""
if X.shape[1] != 1:
print("可视化仅支持单特征数据")
return
print(f"\n=== {degree}次多项式回归可视化 ===")
# 创建多项式特征
poly = PolynomialFeatures(degree=degree)
X_poly = poly.fit_transform(X)
# 训练模型
lr = LinearRegression()
lr.fit(X_poly, y)
# 生成平滑的预测曲线
X_plot = np.linspace(X.min(), X.max(), 300).reshape(-1, 1)
X_plot_poly = poly.transform(X_plot)
y_plot = lr.predict(X_plot_poly)
# 预测原数据
y_pred = lr.predict(X_poly)
# 计算评估指标
mse = mean_squared_error(y, y_pred)
r2 = r2_score(y, y_pred)
# 可视化
plt.figure(figsize=(12, 8))
plt.scatter(X, y, alpha=0.6, color='blue', s=50, label='原始数据')
plt.plot(X_plot, y_plot, color='red', linewidth=2, label=f'{degree}次多项式拟合')
plt.title(f'{degree}次多项式回归\nMSE: {mse:.4f}, R²: {r2:.4f}')
plt.xlabel('X')
plt.ylabel('y')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 打印模型信息
print(f"模型系数: {lr.coef_}")
print(f"截距: {lr.intercept_:.4f}")
print(f"特征数量: {X_poly.shape[1]}")
return lr, poly
def cross_validation_polynomial(self, X, y, max_degree=10):
"""使用交叉验证选择最佳多项式阶数"""
print("\n=== 交叉验证选择最佳多项式阶数 ===")
degrees = range(1, max_degree + 1)
cv_scores_mean = []
cv_scores_std = []
for degree in degrees:
# 创建管道
pipeline = Pipeline([
('poly', PolynomialFeatures(degree=degree)),
('scaler', StandardScaler()),
('lr', LinearRegression())
])
# 交叉验证
cv_scores = cross_val_score(pipeline, X, y, cv=5, scoring='r2')
cv_scores_mean.append(cv_scores.mean())
cv_scores_std.append(cv_scores.std())
print(f"阶数 {degree}: CV R² = {cv_scores.mean():.4f} ± {cv_scores.std():.4f}")
# 可视化交叉验证结果
plt.figure(figsize=(10, 6))
plt.errorbar(degrees, cv_scores_mean, yerr=cv_scores_std,
marker='o', capsize=5, capthick=2)
plt.title('交叉验证选择最佳多项式阶数')
plt.xlabel('多项式阶数')
plt.ylabel('交叉验证 R² 分数')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 选择最佳阶数
best_degree = degrees[np.argmax(cv_scores_mean)]
best_score = max(cv_scores_mean)
print(f"\n最佳多项式阶数: {best_degree}")
print(f"最佳交叉验证分数: {best_score:.4f}")
return best_degree
# 演示多项式回归
poly_demo = PolynomialRegressionDemo()
# 理论解释
poly_demo.polynomial_regression_theory()
# 不同阶数比较
X_nonlinear, y_nonlinear = datasets['nonlinear']
best_model = poly_demo.polynomial_regression_comparison(X_nonlinear, y_nonlinear)
# 可视化拟合效果
poly_demo.polynomial_regression_visualization(X_nonlinear, y_nonlinear, degree=3)
# 交叉验证选择最佳阶数
best_degree = poly_demo.cross_validation_polynomial(X_nonlinear, y_nonlinear)
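为了说明多项式回归"特征变换 + 线性回归"的本质,下面手工构造二次特征[x, x²]再做普通线性回归,其R²应与PolynomialFeatures管道的结果一致(x_flat、lr_manual、pipe_check为此处新引入的演示变量):

# 手工构造二次多项式特征,验证多项式回归本质上仍是线性回归
x_flat = X_nonlinear.ravel()
X_manual = np.column_stack([x_flat, x_flat ** 2])   # [x, x²],截距由模型自动处理
lr_manual = LinearRegression().fit(X_manual, y_nonlinear)

pipe_check = Pipeline([
    ('poly', PolynomialFeatures(degree=2, include_bias=False)),
    ('lr', LinearRegression())
])
pipe_check.fit(X_nonlinear, y_nonlinear)

print(f"手工特征 R²: {lr_manual.score(X_manual, y_nonlinear):.4f}")
print(f"管道方式 R²: {pipe_check.score(X_nonlinear, y_nonlinear):.4f}")  # 两者应一致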
4. 正则化回归
4.1 正则化原理
class RegularizedRegressionDemo:
"""正则化回归演示"""
def __init__(self):
self.models = {}
def regularization_theory(self):
"""正则化理论解释"""
print("=== 正则化理论 ===")
print("1. 正则化目的:")
print(" - 防止过拟合")
print(" - 提高模型泛化能力")
print(" - 处理多重共线性")
print(" - 特征选择")
print()
print("2. 正则化方法:")
print(" - L1正则化 (Lasso): 促进稀疏性,自动特征选择")
print(" - L2正则化 (Ridge): 防止过拟合,保留所有特征")
print(" - 弹性网络 (Elastic Net): L1和L2的结合")
print()
print("3. 数学形式:")
print(" - Ridge: min ||y - Xβ||² + α||β||²")
print(" - Lasso: min ||y - Xβ||² + α||β||₁")
print(" - Elastic Net: min ||y - Xβ||² + α₁||β||₁ + α₂||β||²")
print()
print("4. 超参数α的作用:")
print(" - α = 0: 普通线性回归")
print(" - α → ∞: 强正则化,系数趋向于0")
print(" - 需要通过交叉验证选择最佳α")
# 可视化正则化效果
self.visualize_regularization_effect()
def visualize_regularization_effect(self):
"""可视化正则化效果"""
# 生成高维数据
np.random.seed(42)
n_samples, n_features = 100, 20
X = np.random.randn(n_samples, n_features)
# 只有前5个特征是真正有用的
true_coef = np.zeros(n_features)
true_coef[:5] = [2, -1.5, 1, -0.5, 0.8]
y = X @ true_coef + 0.1 * np.random.randn(n_samples)
# 不同的正则化强度
alphas = [0, 0.1, 1, 10, 100]
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
axes = axes.ravel()
for i, alpha in enumerate(alphas):
# Ridge回归
ridge = Ridge(alpha=alpha)
ridge.fit(X, y)
# Lasso回归
lasso = Lasso(alpha=alpha, max_iter=1000)
lasso.fit(X, y)
# 绘制系数
x_pos = np.arange(n_features)
width = 0.35
axes[i].bar(x_pos - width/2, ridge.coef_, width,
label='Ridge', alpha=0.7, color='blue')
axes[i].bar(x_pos + width/2, lasso.coef_, width,
label='Lasso', alpha=0.7, color='red')
axes[i].axhline(y=0, color='black', linestyle='-', alpha=0.3)
# 标记真实系数
axes[i].scatter(x_pos[:5], true_coef[:5],
color='green', s=100, marker='*',
label='真实系数', zorder=5)
axes[i].set_title(f'α = {alpha}')
axes[i].set_xlabel('特征索引')
axes[i].set_ylabel('系数值')
axes[i].legend()
axes[i].grid(True, alpha=0.3)
# 最后一个子图显示图例说明
axes[-1].axis('off')
axes[-1].text(0.1, 0.5,
'正则化效果说明:\n\n'
'• α = 0: 无正则化,可能过拟合\n'
'• α 增大: 系数逐渐收缩\n'
'• Ridge: 系数收缩但不为0\n'
'• Lasso: 部分系数变为0\n'
'• 绿色星号: 真实有效特征',
fontsize=12, verticalalignment='center')
plt.tight_layout()
plt.show()
def ridge_regression_demo(self, X, y):
"""岭回归演示"""
print("\n=== 岭回归演示 ===")
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 不同的α值
alphas = np.logspace(-3, 3, 50)
train_scores = []
test_scores = []
for alpha in alphas:
ridge = Ridge(alpha=alpha)
ridge.fit(X_train_scaled, y_train)
train_score = ridge.score(X_train_scaled, y_train)
test_score = ridge.score(X_test_scaled, y_test)
train_scores.append(train_score)
test_scores.append(test_score)
# 可视化α的影响
plt.figure(figsize=(12, 8))
plt.subplot(2, 2, 1)
plt.semilogx(alphas, train_scores, label='训练集', color='blue')
plt.semilogx(alphas, test_scores, label='测试集', color='red')
plt.xlabel('α (正则化强度)')
plt.ylabel('R² 分数')
plt.title('岭回归: α vs 性能')
plt.legend()
plt.grid(True, alpha=0.3)
# 使用交叉验证选择最佳α
ridge_cv = Ridge()
param_grid = {'alpha': alphas}
grid_search = GridSearchCV(ridge_cv, param_grid, cv=5, scoring='r2')
grid_search.fit(X_train_scaled, y_train)
best_alpha = grid_search.best_params_['alpha']
best_ridge = grid_search.best_estimator_
print(f"最佳α: {best_alpha:.4f}")
print(f"最佳交叉验证分数: {grid_search.best_score_:.4f}")
# 最终模型评估
y_train_pred = best_ridge.predict(X_train_scaled)
y_test_pred = best_ridge.predict(X_test_scaled)
train_mse = mean_squared_error(y_train, y_train_pred)
test_mse = mean_squared_error(y_test, y_test_pred)
train_r2 = r2_score(y_train, y_train_pred)
test_r2 = r2_score(y_test, y_test_pred)
print(f"训练集 MSE: {train_mse:.4f}, R²: {train_r2:.4f}")
print(f"测试集 MSE: {test_mse:.4f}, R²: {test_r2:.4f}")
# 系数路径
plt.subplot(2, 2, 2)
coef_path = []
for alpha in alphas:
ridge = Ridge(alpha=alpha)
ridge.fit(X_train_scaled, y_train)
coef_path.append(ridge.coef_)
coef_path = np.array(coef_path)
for i in range(coef_path.shape[1]):
plt.semilogx(alphas, coef_path[:, i], alpha=0.7)
plt.axvline(x=best_alpha, color='red', linestyle='--', label=f'最佳α={best_alpha:.4f}')
plt.xlabel('α (正则化强度)')
plt.ylabel('系数值')
plt.title('岭回归系数路径')
plt.legend()
plt.grid(True, alpha=0.3)
# 预测vs真实值
plt.subplot(2, 2, 3)
plt.scatter(y_test, y_test_pred, alpha=0.6)
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--')
plt.xlabel('真实值')
plt.ylabel('预测值')
plt.title('岭回归: 预测 vs 真实')
plt.grid(True, alpha=0.3)
# 残差图
plt.subplot(2, 2, 4)
residuals = y_test - y_test_pred
plt.scatter(y_test_pred, residuals, alpha=0.6)
plt.axhline(y=0, color='red', linestyle='--')
plt.xlabel('预测值')
plt.ylabel('残差')
plt.title('岭回归残差图')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
self.models['ridge'] = best_ridge
return best_ridge, scaler
def lasso_regression_demo(self, X, y):
"""Lasso回归演示"""
print("\n=== Lasso回归演示 ===")
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 不同的α值
alphas = np.logspace(-4, 1, 50)
train_scores = []
test_scores = []
n_features_selected = []
for alpha in alphas:
lasso = Lasso(alpha=alpha, max_iter=1000)
lasso.fit(X_train_scaled, y_train)
train_score = lasso.score(X_train_scaled, y_train)
test_score = lasso.score(X_test_scaled, y_test)
n_selected = np.sum(lasso.coef_ != 0)
train_scores.append(train_score)
test_scores.append(test_score)
n_features_selected.append(n_selected)
# 可视化α的影响
plt.figure(figsize=(15, 10))
plt.subplot(2, 3, 1)
plt.semilogx(alphas, train_scores, label='训练集', color='blue')
plt.semilogx(alphas, test_scores, label='测试集', color='red')
plt.xlabel('α (正则化强度)')
plt.ylabel('R² 分数')
plt.title('Lasso回归: α vs 性能')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(2, 3, 2)
plt.semilogx(alphas, n_features_selected, color='green')
plt.xlabel('α (正则化强度)')
plt.ylabel('选择的特征数量')
plt.title('Lasso回归: α vs 特征选择')
plt.grid(True, alpha=0.3)
# 使用交叉验证选择最佳α
lasso_cv = Lasso(max_iter=1000)
param_grid = {'alpha': alphas}
grid_search = GridSearchCV(lasso_cv, param_grid, cv=5, scoring='r2')
grid_search.fit(X_train_scaled, y_train)
best_alpha = grid_search.best_params_['alpha']
best_lasso = grid_search.best_estimator_
print(f"最佳α: {best_alpha:.4f}")
print(f"最佳交叉验证分数: {grid_search.best_score_:.4f}")
print(f"选择的特征数量: {np.sum(best_lasso.coef_ != 0)}")
# 最终模型评估
y_train_pred = best_lasso.predict(X_train_scaled)
y_test_pred = best_lasso.predict(X_test_scaled)
train_mse = mean_squared_error(y_train, y_train_pred)
test_mse = mean_squared_error(y_test, y_test_pred)
train_r2 = r2_score(y_train, y_train_pred)
test_r2 = r2_score(y_test, y_test_pred)
print(f"训练集 MSE: {train_mse:.4f}, R²: {train_r2:.4f}")
print(f"测试集 MSE: {test_mse:.4f}, R²: {test_r2:.4f}")
# 系数路径
plt.subplot(2, 3, 3)
coef_path = []
for alpha in alphas:
lasso = Lasso(alpha=alpha, max_iter=1000)
lasso.fit(X_train_scaled, y_train)
coef_path.append(lasso.coef_)
coef_path = np.array(coef_path)
for i in range(coef_path.shape[1]):
plt.semilogx(alphas, coef_path[:, i], alpha=0.7)
plt.axvline(x=best_alpha, color='red', linestyle='--', label=f'最佳α={best_alpha:.4f}')
plt.xlabel('α (正则化强度)')
plt.ylabel('系数值')
plt.title('Lasso回归系数路径')
plt.legend()
plt.grid(True, alpha=0.3)
# 特征重要性
plt.subplot(2, 3, 4)
feature_importance = np.abs(best_lasso.coef_)
selected_features = feature_importance > 0
if np.any(selected_features):
plt.bar(range(len(feature_importance)), feature_importance, alpha=0.7)
plt.xlabel('特征索引')
plt.ylabel('|系数|')
plt.title('Lasso特征重要性')
plt.grid(True, alpha=0.3)
# 预测vs真实值
plt.subplot(2, 3, 5)
plt.scatter(y_test, y_test_pred, alpha=0.6)
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--')
plt.xlabel('真实值')
plt.ylabel('预测值')
plt.title('Lasso: 预测 vs 真实')
plt.grid(True, alpha=0.3)
# 残差图
plt.subplot(2, 3, 6)
residuals = y_test - y_test_pred
plt.scatter(y_test_pred, residuals, alpha=0.6)
plt.axhline(y=0, color='red', linestyle='--')
plt.xlabel('预测值')
plt.ylabel('残差')
plt.title('Lasso残差图')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 打印选择的特征
selected_features = np.where(best_lasso.coef_ != 0)[0]
print(f"\n选择的特征索引: {selected_features}")
print(f"对应的系数: {best_lasso.coef_[selected_features]}")
self.models['lasso'] = best_lasso
return best_lasso, scaler
def elastic_net_demo(self, X, y):
"""弹性网络演示"""
print("\n=== 弹性网络演示 ===")
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 网格搜索最佳参数
param_grid = {
'alpha': np.logspace(-4, 1, 20),
'l1_ratio': np.linspace(0.1, 0.9, 9)
}
elastic_net = ElasticNet(max_iter=1000)
grid_search = GridSearchCV(elastic_net, param_grid, cv=5, scoring='r2')
grid_search.fit(X_train_scaled, y_train)
best_alpha = grid_search.best_params_['alpha']
best_l1_ratio = grid_search.best_params_['l1_ratio']
best_elastic_net = grid_search.best_estimator_
print(f"最佳α: {best_alpha:.4f}")
print(f"最佳l1_ratio: {best_l1_ratio:.4f}")
print(f"最佳交叉验证分数: {grid_search.best_score_:.4f}")
# 最终模型评估
y_train_pred = best_elastic_net.predict(X_train_scaled)
y_test_pred = best_elastic_net.predict(X_test_scaled)
train_mse = mean_squared_error(y_train, y_train_pred)
test_mse = mean_squared_error(y_test, y_test_pred)
train_r2 = r2_score(y_train, y_train_pred)
test_r2 = r2_score(y_test, y_test_pred)
print(f"训练集 MSE: {train_mse:.4f}, R²: {train_r2:.4f}")
print(f"测试集 MSE: {test_mse:.4f}, R²: {test_r2:.4f}")
print(f"选择的特征数量: {np.sum(best_elastic_net.coef_ != 0)}")
# 可视化参数搜索结果
self.plot_elastic_net_grid_search(grid_search)
self.models['elastic_net'] = best_elastic_net
return best_elastic_net, scaler
def plot_elastic_net_grid_search(self, grid_search):
"""可视化弹性网络网格搜索结果"""
results = grid_search.cv_results_
# 重塑结果为网格形状
alphas = np.unique([params['alpha'] for params in results['params']])
l1_ratios = np.unique([params['l1_ratio'] for params in results['params']])
scores = results['mean_test_score'].reshape(len(alphas), len(l1_ratios))
plt.figure(figsize=(12, 8))
# 热力图
plt.subplot(2, 2, 1)
im = plt.imshow(scores, cmap='viridis', aspect='auto')
plt.colorbar(im, label='交叉验证分数')
plt.xlabel('l1_ratio索引')
plt.ylabel('alpha索引')
plt.title('弹性网络参数网格搜索')
# 最佳参数位置
best_alpha_idx = np.where(alphas == grid_search.best_params_['alpha'])[0][0]
best_l1_idx = np.where(l1_ratios == grid_search.best_params_['l1_ratio'])[0][0]
plt.scatter(best_l1_idx, best_alpha_idx, color='red', s=100, marker='*')
# α vs 分数(固定最佳l1_ratio)
plt.subplot(2, 2, 2)
best_l1_idx = np.where(l1_ratios == grid_search.best_params_['l1_ratio'])[0][0]
plt.semilogx(alphas, scores[:, best_l1_idx])
plt.axvline(x=grid_search.best_params_['alpha'], color='red', linestyle='--')
plt.xlabel('α')
plt.ylabel('交叉验证分数')
plt.title(f'α vs 分数 (l1_ratio={grid_search.best_params_["l1_ratio"]:.2f})')
plt.grid(True, alpha=0.3)
# l1_ratio vs 分数(固定最佳α)
plt.subplot(2, 2, 3)
best_alpha_idx = np.where(alphas == grid_search.best_params_['alpha'])[0][0]
plt.plot(l1_ratios, scores[best_alpha_idx, :])
plt.axvline(x=grid_search.best_params_['l1_ratio'], color='red', linestyle='--')
plt.xlabel('l1_ratio')
plt.ylabel('交叉验证分数')
plt.title(f'l1_ratio vs 分数 (α={grid_search.best_params_["alpha"]:.4f})')
plt.grid(True, alpha=0.3)
# 系数比较
plt.subplot(2, 2, 4)
coef = grid_search.best_estimator_.coef_
plt.bar(range(len(coef)), coef, alpha=0.7)
plt.xlabel('特征索引')
plt.ylabel('系数值')
plt.title('弹性网络最终系数')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def regularization_comparison(self, X, y):
"""正则化方法比较"""
print("\n=== 正则化方法比较 ===")
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 定义模型
models = {
'线性回归': LinearRegression(),
'岭回归': Ridge(alpha=1.0),
'Lasso回归': Lasso(alpha=0.1, max_iter=1000),
'弹性网络': ElasticNet(alpha=0.1, l1_ratio=0.5, max_iter=1000)
}
results = {}
for name, model in models.items():
# 训练模型
model.fit(X_train_scaled, y_train)
# 预测
y_train_pred = model.predict(X_train_scaled)
y_test_pred = model.predict(X_test_scaled)
# 评估
train_mse = mean_squared_error(y_train, y_train_pred)
test_mse = mean_squared_error(y_test, y_test_pred)
train_r2 = r2_score(y_train, y_train_pred)
test_r2 = r2_score(y_test, y_test_pred)
# 计算非零系数数量
if hasattr(model, 'coef_'):
n_nonzero = np.sum(np.abs(model.coef_) > 1e-5)
else:
n_nonzero = X.shape[1]
results[name] = {
'train_mse': train_mse,
'test_mse': test_mse,
'train_r2': train_r2,
'test_r2': test_r2,
'n_nonzero': n_nonzero,
'model': model
}
print(f"{name}:")
print(f" 训练 MSE: {train_mse:.4f}, R²: {train_r2:.4f}")
print(f" 测试 MSE: {test_mse:.4f}, R²: {test_r2:.4f}")
print(f" 非零系数: {n_nonzero}")
print()
# 可视化比较结果
self.plot_regularization_comparison(results)
return results
def plot_regularization_comparison(self, results):
"""可视化正则化方法比较结果"""
methods = list(results.keys())
# 提取指标
train_mse = [results[method]['train_mse'] for method in methods]
test_mse = [results[method]['test_mse'] for method in methods]
train_r2 = [results[method]['train_r2'] for method in methods]
test_r2 = [results[method]['test_r2'] for method in methods]
n_nonzero = [results[method]['n_nonzero'] for method in methods]
        fig, axes = plt.subplots(2, 4, figsize=(22, 10))
# MSE比较
x = np.arange(len(methods))
width = 0.35
axes[0, 0].bar(x - width/2, train_mse, width, label='训练集', alpha=0.7)
axes[0, 0].bar(x + width/2, test_mse, width, label='测试集', alpha=0.7)
axes[0, 0].set_xlabel('方法')
axes[0, 0].set_ylabel('MSE')
axes[0, 0].set_title('MSE比较')
axes[0, 0].set_xticks(x)
axes[0, 0].set_xticklabels(methods, rotation=45)
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# R²比较
axes[0, 1].bar(x - width/2, train_r2, width, label='训练集', alpha=0.7)
axes[0, 1].bar(x + width/2, test_r2, width, label='测试集', alpha=0.7)
axes[0, 1].set_xlabel('方法')
axes[0, 1].set_ylabel('R²')
axes[0, 1].set_title('R²比较')
axes[0, 1].set_xticks(x)
axes[0, 1].set_xticklabels(methods, rotation=45)
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 非零系数数量
axes[0, 2].bar(methods, n_nonzero, alpha=0.7, color='green')
axes[0, 2].set_xlabel('方法')
axes[0, 2].set_ylabel('非零系数数量')
axes[0, 2].set_title('模型复杂度')
axes[0, 2].tick_params(axis='x', rotation=45)
        axes[0, 2].grid(True, alpha=0.3)
        axes[0, 3].axis('off')  # 上排第4格留空,使下排4个系数子图与4种方法一一对应
# 系数比较
for i, method in enumerate(methods):
model = results[method]['model']
if hasattr(model, 'coef_'):
axes[1, i].bar(range(len(model.coef_)), model.coef_, alpha=0.7)
axes[1, i].set_title(f'{method}系数')
axes[1, i].set_xlabel('特征索引')
axes[1, i].set_ylabel('系数值')
axes[1, i].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 演示正则化回归
reg_demo = RegularizedRegressionDemo()
# 理论解释
reg_demo.regularization_theory()
# 创建高维数据用于演示
np.random.seed(42)
n_samples, n_features = 100, 15
X_high_dim = np.random.randn(n_samples, n_features)
true_coef = np.zeros(n_features)
true_coef[:5] = [2, -1.5, 1, -0.5, 0.8] # 只有前5个特征有用
y_high_dim = X_high_dim @ true_coef + 0.1 * np.random.randn(n_samples)
# 岭回归演示
ridge_model, ridge_scaler = reg_demo.ridge_regression_demo(X_high_dim, y_high_dim)
# Lasso回归演示
lasso_model, lasso_scaler = reg_demo.lasso_regression_demo(X_high_dim, y_high_dim)
# 弹性网络演示
elastic_model, elastic_scaler = reg_demo.elastic_net_demo(X_high_dim, y_high_dim)
# 正则化方法比较
comparison_results = reg_demo.regularization_comparison(X_high_dim, y_high_dim)
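与最小二乘类似,岭回归也有解析解 β = (X'X + αI)⁻¹X'y。下面是一个验证性的小示例:在不拟合截距的设定下(fit_intercept=False,便于与公式严格对应)按公式求解上面的高维数据,并与sklearn的Ridge对照(alpha_demo、beta_ridge等为此处假设的演示变量):

# 岭回归解析解 β = (X'X + αI)⁻¹X'y 的数值验证(不拟合截距)
alpha_demo = 1.0
n_feat = X_high_dim.shape[1]
beta_ridge = np.linalg.solve(
    X_high_dim.T @ X_high_dim + alpha_demo * np.eye(n_feat),
    X_high_dim.T @ y_high_dim
)
ridge_check = Ridge(alpha=alpha_demo, fit_intercept=False).fit(X_high_dim, y_high_dim)
print(f"解析解前5个系数 : {np.round(beta_ridge[:5], 4)}")
print(f"sklearn前5个系数: {np.round(ridge_check.coef_[:5], 4)}")  # 两者应几乎相同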
5. 回归树
5.1 理论基础
回归树是决策树在回归问题上的应用,通过递归地分割特征空间来预测连续值。
class RegressionTreeDemo:
def __init__(self):
self.models = {}
def theory_explanation(self):
"""回归树理论解释"""
print("=== 回归树理论 ===")
print("1. 分裂准则:最小化均方误差(MSE)")
print("2. 叶节点预测:该节点样本的平均值")
print("3. 剪枝:防止过拟合的重要技术")
print("4. 优点:非线性、可解释性强、处理缺失值")
print("5. 缺点:容易过拟合、不稳定")
def visualize_tree_splits(self):
"""可视化树的分裂过程"""
# 创建简单数据
np.random.seed(42)
X = np.linspace(0, 10, 100).reshape(-1, 1)
y = np.sin(X.ravel()) + np.random.normal(0, 0.1, 100)
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('回归树分裂过程可视化', fontsize=16)
max_depths = [1, 2, 3, 5]
for i, max_depth in enumerate(max_depths):
ax = axes[i//2, i%2]
# 训练模型
tree = DecisionTreeRegressor(max_depth=max_depth, random_state=42)
tree.fit(X, y)
# 预测
X_plot = np.linspace(0, 10, 300).reshape(-1, 1)
y_pred = tree.predict(X_plot)
# 绘图
ax.scatter(X, y, alpha=0.6, color='blue', label='真实数据')
ax.plot(X_plot, y_pred, color='red', linewidth=2, label=f'深度={max_depth}')
ax.set_title(f'最大深度: {max_depth}')
ax.set_xlabel('X')
ax.set_ylabel('y')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def basic_regression_tree(self):
"""基础回归树演示"""
# 生成数据
X, y = make_regression(n_samples=100, n_features=1, noise=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 训练模型
tree = DecisionTreeRegressor(random_state=42)
tree.fit(X_train, y_train)
# 预测
y_pred = tree.predict(X_test)
# 评估
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
print(f"基础回归树:")
print(f"MSE: {mse:.4f}")
print(f"R²: {r2:.4f}")
# 可视化
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
X_plot = np.linspace(X.min(), X.max(), 300).reshape(-1, 1)
y_plot = tree.predict(X_plot)
plt.scatter(X_train, y_train, alpha=0.6, label='训练集')
plt.scatter(X_test, y_test, alpha=0.6, label='测试集')
plt.plot(X_plot, y_plot, color='red', linewidth=2, label='回归树')
plt.xlabel('X')
plt.ylabel('y')
plt.title('回归树拟合结果')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
plt.scatter(y_test, y_pred, alpha=0.6)
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', lw=2)
plt.xlabel('真实值')
plt.ylabel('预测值')
plt.title('预测 vs 真实值')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return tree
def hyperparameter_tuning(self):
"""超参数调优"""
# 生成数据
X, y = make_regression(n_samples=200, n_features=5, noise=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 参数网格
param_grid = {
'max_depth': [3, 5, 7, 10, None],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4],
            'max_features': [None, 'sqrt', 'log2']  # 'auto'已从新版scikit-learn的决策树中移除
}
# 网格搜索
tree = DecisionTreeRegressor(random_state=42)
grid_search = GridSearchCV(tree, param_grid, cv=5, scoring='neg_mean_squared_error', n_jobs=-1)
grid_search.fit(X_train, y_train)
# 最佳模型
best_tree = grid_search.best_estimator_
y_pred = best_tree.predict(X_test)
print("=== 超参数调优结果 ===")
print(f"最佳参数: {grid_search.best_params_}")
print(f"最佳交叉验证分数: {-grid_search.best_score_:.4f}")
print(f"测试集MSE: {mean_squared_error(y_test, y_pred):.4f}")
print(f"测试集R²: {r2_score(y_test, y_pred):.4f}")
return best_tree
# 演示回归树
print("=== 回归树演示 ===")
tree_demo = RegressionTreeDemo()
tree_demo.theory_explanation()
tree_demo.visualize_tree_splits()
tree_demo.basic_regression_tree()
tree_demo.hyperparameter_tuning()
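为了更具体地理解"最小化MSE的分裂准则",下面给出一个简化示意:在单特征上穷举相邻样本的中点作为候选阈值,选出使左右子节点加权MSE最小的分裂点。这只演示一次分裂的过程,并非完整的建树实现(best_split_1d及x_split、y_split演示数据均为此处假设):

# 回归树分裂准则的简化演示: 在单特征上寻找使加权MSE最小的分裂点(仅一次分裂)
def best_split_1d(x, y):
    """穷举相邻样本中点作为候选阈值,返回加权MSE最小的分裂点及其MSE"""
    order = np.argsort(x)
    x_sorted, y_sorted = x[order], y[order]
    best_t, best_mse = None, np.inf
    for i in range(1, len(x_sorted)):
        t = (x_sorted[i - 1] + x_sorted[i]) / 2        # 候选阈值: 相邻点中点
        left, right = y_sorted[:i], y_sorted[i:]
        # 叶节点预测为均值,因此每侧的MSE即该侧目标值的方差
        mse = (len(left) * left.var() + len(right) * right.var()) / len(y_sorted)
        if mse < best_mse:
            best_t, best_mse = t, mse
    return best_t, best_mse

np.random.seed(0)
x_split = np.random.uniform(0, 10, 50)
y_split = np.where(x_split < 4, 1.0, 3.0) + np.random.normal(0, 0.2, 50)
t_best, mse_best = best_split_1d(x_split, y_split)
print(f"最佳分裂点: {t_best:.3f} (预期接近4), 分裂后加权MSE: {mse_best:.4f}")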
6. 算法综合比较
6.1 性能比较
class RegressionAlgorithmComparison:
def __init__(self):
self.algorithms = {
'Linear Regression': LinearRegression(),
'Ridge Regression': Ridge(alpha=1.0),
'Lasso Regression': Lasso(alpha=1.0),
'Elastic Net': ElasticNet(alpha=1.0, l1_ratio=0.5),
'Polynomial Regression': Pipeline([
('poly', PolynomialFeatures(degree=2)),
('linear', LinearRegression())
]),
'Decision Tree': DecisionTreeRegressor(max_depth=5, random_state=42),
'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
'SVR': SVR(kernel='rbf', C=1.0, gamma='scale')
}
self.results = {}
def compare_on_datasets(self):
"""在不同数据集上比较算法"""
datasets = {
'Linear Data': self._generate_linear_data(),
'Polynomial Data': self._generate_polynomial_data(),
'Noisy Data': self._generate_noisy_data(),
            'California Housing': self._load_california_data()
}
results = {}
for dataset_name, (X, y) in datasets.items():
print(f"\n=== {dataset_name} ===")
dataset_results = {}
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 标准化特征
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
for name, model in self.algorithms.items():
try:
# 训练模型
start_time = time.time()
if name in ['SVR']:
model.fit(X_train_scaled, y_train)
y_pred = model.predict(X_test_scaled)
else:
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
training_time = time.time() - start_time
# 评估
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
dataset_results[name] = {
'MSE': mse,
'R²': r2,
'MAE': mae,
'Training Time': training_time
}
print(f"{name:20} - MSE: {mse:.4f}, R²: {r2:.4f}, MAE: {mae:.4f}")
except Exception as e:
print(f"{name:20} - Error: {str(e)}")
results[dataset_name] = dataset_results
return results
def _generate_linear_data(self):
"""生成线性数据"""
X, y = make_regression(n_samples=200, n_features=5, noise=10, random_state=42)
return X, y
def _generate_polynomial_data(self):
"""生成多项式数据"""
np.random.seed(42)
X = np.random.uniform(-2, 2, (200, 1))
y = X.ravel()**3 + 2*X.ravel()**2 + np.random.normal(0, 0.5, 200)
return X, y
def _generate_noisy_data(self):
"""生成噪声数据"""
X, y = make_regression(n_samples=200, n_features=10, noise=50, random_state=42)
return X, y
    def _load_california_data(self):
        """加载加州房价数据(load_boston已从新版scikit-learn中移除,这里改用fetch_california_housing)"""
        housing = fetch_california_housing()
        return housing.data, housing.target
def algorithm_selection_guide(self):
"""算法选择指南"""
guide = {
'Linear Regression': {
'适用场景': ['线性关系', '特征数量适中', '需要可解释性'],
'优点': ['简单快速', '可解释性强', '无超参数'],
'缺点': ['只能处理线性关系', '对异常值敏感'],
'推荐条件': '数据呈线性关系且噪声较小'
},
'Ridge Regression': {
'适用场景': ['多重共线性', '特征数量较多', '防止过拟合'],
'优点': ['处理多重共线性', '防止过拟合', '稳定性好'],
'缺点': ['不能进行特征选择', '需要调参'],
'推荐条件': '特征间存在相关性或特征数量较多'
},
'Lasso Regression': {
'适用场景': ['特征选择', '稀疏解', '高维数据'],
'优点': ['自动特征选择', '产生稀疏解', '防止过拟合'],
'缺点': ['可能删除重要特征', '对相关特征选择不稳定'],
'推荐条件': '需要特征选择或期望稀疏模型'
},
'Decision Tree': {
'适用场景': ['非线性关系', '特征交互', '需要可解释性'],
'优点': ['处理非线性', '可解释性强', '处理缺失值'],
'缺点': ['容易过拟合', '不稳定'],
'推荐条件': '需要可解释的非线性模型'
}
}
print("=== 回归算法选择指南 ===\n")
for alg, info in guide.items():
print(f"【{alg}】")
for key, value in info.items():
if isinstance(value, list):
print(f" {key}: {', '.join(value)}")
else:
print(f" {key}: {value}")
print()
# 演示算法比较
print("=== 回归算法综合比较 ===")
comparison = RegressionAlgorithmComparison()
results = comparison.compare_on_datasets()
comparison.algorithm_selection_guide()
7. 实战案例:房价预测
7.1 项目背景
我们将使用多种回归算法来预测房价,比较不同算法的性能。
class HousePricePrediction:
def __init__(self):
self.data = None
self.models = {}
self.results = {}
def create_dataset(self):
"""创建房价数据集"""
np.random.seed(42)
n_samples = 1000
# 生成特征
area = np.random.normal(150, 50, n_samples) # 面积
bedrooms = np.random.poisson(3, n_samples) # 卧室数
age = np.random.uniform(0, 50, n_samples) # 房龄
location_score = np.random.uniform(1, 10, n_samples) # 位置评分
# 生成目标变量(房价)
price = (area * 100 +
bedrooms * 5000 +
(50 - age) * 200 +
location_score * 3000 +
np.random.normal(0, 5000, n_samples))
# 确保价格为正
price = np.maximum(price, 10000)
# 创建DataFrame
self.data = pd.DataFrame({
'area': area,
'bedrooms': bedrooms,
'age': age,
'location_score': location_score,
'price': price
})
print("=== 数据集信息 ===")
print(self.data.describe())
print(f"\n数据形状: {self.data.shape}")
return self.data
def build_models(self):
"""构建多个回归模型"""
if self.data is None:
self.create_dataset()
# 准备数据
X = self.data.drop('price', axis=1)
y = self.data['price']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 特征缩放
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 定义模型
models = {
'Linear Regression': LinearRegression(),
'Ridge Regression': Ridge(alpha=1.0),
'Lasso Regression': Lasso(alpha=1.0),
'Decision Tree': DecisionTreeRegressor(max_depth=10, random_state=42),
'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42)
}
# 训练和评估模型
results = {}
for name, model in models.items():
print(f"\n训练 {name}...")
# 训练
start_time = time.time()
if name in ['Ridge Regression', 'Lasso Regression']:
model.fit(X_train_scaled, y_train)
y_pred = model.predict(X_test_scaled)
else:
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
training_time = time.time() - start_time
# 评估
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
results[name] = {
'model': model,
'y_pred': y_pred,
'MSE': mse,
'RMSE': rmse,
'MAE': mae,
'R²': r2,
'Training Time': training_time
}
print(f" MSE: {mse:.2f}")
print(f" RMSE: {rmse:.2f}")
print(f" MAE: {mae:.2f}")
print(f" R²: {r2:.4f}")
print(f" 训练时间: {training_time:.4f}秒")
self.models = models
self.results = results
self.X_test = X_test
self.y_test = y_test
return results
# 运行房价预测项目
print("=== 房价预测实战案例 ===")
house_prediction = HousePricePrediction()
house_prediction.create_dataset()
house_prediction.build_models()
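模型训练完成后,就可以对新样本进行预测。下面的示例假设沿用上面build_models中保存的结果,用随机森林模型预测一套假想房源的价格(new_house中的数值纯属演示):

# 用训练好的随机森林模型预测一套假想的新房源(列名和顺序需与训练时一致)
new_house = pd.DataFrame({
    'area': [120.0],           # 面积
    'bedrooms': [3],           # 卧室数
    'age': [5.0],              # 房龄
    'location_score': [8.0]    # 位置评分
})
rf_model = house_prediction.results['Random Forest']['model']
predicted_price = rf_model.predict(new_house)[0]
print(f"预测房价: {predicted_price:,.0f}")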
8. 小结
本章我们深入学习了回归算法的理论和实践:
8.1 主要内容回顾
回归基础
- 回归问题的本质和应用场景
- 重要的评估指标:MSE、RMSE、MAE、R²
- 数据集的创建和可视化
线性回归
- 最小二乘法原理
- 简单线性回归和多元线性回归
- 假设检验和模型诊断
多项式回归
- 非线性关系的处理
- 多项式特征变换
- 过拟合问题和模型选择
正则化回归
- Ridge回归:L2正则化
- Lasso回归:L1正则化和特征选择
- Elastic Net:结合L1和L2正则化
- 正则化参数的选择
回归树
- 决策树在回归问题中的应用
- 分裂准则和剪枝技术
- 特征重要性分析
算法综合比较
- 不同算法在各种数据集上的性能
- 算法选择的指导原则
- 性能可视化和分析
实战案例
- 房价预测完整项目
- 从数据探索到模型部署
- 特征工程和模型优化
8.2 算法选择指导
| 算法 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 线性回归 | 线性关系,需要可解释性 | 简单快速,可解释性强 | 只能处理线性关系 |
| Ridge回归 | 多重共线性,防止过拟合 | 处理共线性,稳定性好 | 不能特征选择 |
| Lasso回归 | 特征选择,稀疏解 | 自动特征选择 | 可能删除重要特征 |
| Elastic Net | 相关特征组,平衡性能 | 结合Ridge和Lasso优点 | 需要调两个参数 |
| 多项式回归 | 非线性关系 | 捕获非线性关系 | 容易过拟合 |
| 回归树 | 非线性,需要可解释性 | 处理非线性,可解释 | 容易过拟合,不稳定 |
8.3 最佳实践
数据预处理
- 检查和处理缺失值
- 异常值检测和处理
- 特征缩放和标准化
模型选择
- 从简单模型开始
- 根据数据特点选择算法
- 使用交叉验证评估性能(综合工作流示例见本小节末尾)
超参数调优
- 使用网格搜索或随机搜索
- 注意过拟合问题
- 在验证集上选择最佳参数
模型评估
- 使用多个评估指标
- 分析残差和预测误差
- 检查模型假设
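综合以上最佳实践,下面给出一个将标准化、多项式特征与岭回归组合进Pipeline、并用GridSearchCV做交叉验证调参的最小工作流示例(X_bp、y_bp为假设的演示数据):

# 最佳实践综合示例: Pipeline串联预处理与模型,GridSearchCV交叉验证调参
np.random.seed(42)
X_bp = np.random.uniform(-3, 3, (150, 1))
y_bp = np.sin(X_bp.ravel()) + 0.1 * np.random.randn(150)

workflow = Pipeline([
    ('poly', PolynomialFeatures()),     # 特征变换
    ('scaler', StandardScaler()),       # 特征缩放
    ('ridge', Ridge())                  # 带正则化的线性模型
])
param_grid = {
    'poly__degree': [2, 3, 4, 5],
    'ridge__alpha': np.logspace(-3, 2, 10)
}
search = GridSearchCV(workflow, param_grid, cv=5, scoring='r2')
search.fit(X_bp, y_bp)
print(f"最佳参数: {search.best_params_}")
print(f"最佳交叉验证R²: {search.best_score_:.4f}")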
8.4 常见陷阱
- 过拟合:模型过于复杂,在训练集上表现好但泛化能力差
- 欠拟合:模型过于简单,无法捕获数据的真实关系
- 多重共线性:特征间高度相关,影响模型稳定性
- 异常值影响:极端值对某些算法影响很大
- 特征缩放:忘记对特征进行标准化
8.5 下一步学习
- 无监督学习:聚类、降维、异常检测
- 集成学习:Bagging、Boosting、随机森林
- 深度学习:神经网络在回归问题中的应用
- 时间序列:时间序列回归和预测
- 贝叶斯方法:贝叶斯线性回归
8.6 练习题
基础练习
- 实现简单线性回归的梯度下降算法
- 比较不同正则化参数对模型的影响
- 分析多项式回归中不同阶数的效果
进阶练习
- 实现自定义的特征选择算法
- 构建回归模型的集成方法
- 分析残差的分布和模式
项目练习
- 使用真实数据集进行回归分析
- 构建端到端的回归预测系统
- 比较不同算法在特定领域的性能
通过本章的学习,你应该能够:
- 理解各种回归算法的原理和适用场景
- 根据数据特点选择合适的回归算法
- 进行完整的回归分析项目
- 评估和优化回归模型的性能
下一章我们将学习无监督学习算法,探索数据中的隐藏模式和结构。