📚 本章概述
数据预处理是机器学习流程中最重要的步骤之一,通常占据整个项目的大部分时间(常见估计约为80%)。高质量的数据预处理在很大程度上决定了模型性能的上限。本章将深入讲解Scikit-learn中的数据预处理技术,包括数据清洗、特征缩放与标准化、特征选择和降维等核心内容。
🎯 学习目标
- 掌握数据预处理的基本概念和重要性
- 学会使用Scikit-learn进行数据清洗和转换
- 理解特征缩放和标准化的原理及应用
- 掌握特征选择和降维技术
- 学会构建完整的数据预处理管道
📖 目录
- 1. 数据预处理概述
- 2. 数据清洗与转换
- 3. 特征缩放与标准化
- 4. 特征选择
- 5. 降维技术
- 6. 数据预处理管道
- 7. 实战案例
1. 数据预处理概述
1.1 什么是数据预处理
数据预处理是指在将数据输入机器学习算法之前,对原始数据进行清洗、转换和优化的过程。
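在进入完整的演示类之前,先看一个极简示意(数据为假设的玩具数据,仅用于说明"清洗 → 转换"的基本套路):

```python
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler

# 假设的玩具数据:收入列含一个缺失值
toy = pd.DataFrame({'income': [5000.0, 8000.0, np.nan, 12000.0]})

# 清洗:用中位数填充缺失值
toy_filled = SimpleImputer(strategy='median').fit_transform(toy)

# 转换:标准化为零均值、单位方差
toy_scaled = StandardScaler().fit_transform(toy_filled)
print(toy_scaled.ravel())
```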
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.feature_selection import SelectKBest, chi2, f_classif
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris, make_classification  # load_boston 已从新版 sklearn 中移除
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体和图形样式
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
sns.set_style("whitegrid")
class DataPreprocessingDemo:
"""数据预处理演示类"""
def __init__(self):
self.data = None
self.processed_data = None
def load_sample_data(self):
"""加载示例数据"""
# 创建包含各种数据问题的示例数据集
np.random.seed(42)
n_samples = 1000
# 生成基础特征
age = np.random.normal(35, 10, n_samples)
income = np.random.exponential(50000, n_samples)
education_years = np.random.poisson(12, n_samples)
# 添加缺失值
age[np.random.choice(n_samples, 50, replace=False)] = np.nan
income[np.random.choice(n_samples, 30, replace=False)] = np.nan
# 添加异常值
income[np.random.choice(n_samples, 20, replace=False)] *= 10
# 分类特征
gender = np.random.choice(['Male', 'Female'], n_samples)
city = np.random.choice(['Beijing', 'Shanghai', 'Guangzhou', 'Shenzhen'], n_samples)
# 创建DataFrame
self.data = pd.DataFrame({
'age': age,
'income': income,
'education_years': education_years,
'gender': gender,
'city': city
})
return self.data
def analyze_data_quality(self):
"""分析数据质量"""
if self.data is None:
self.load_sample_data()
print("=== 数据质量分析 ===")
print(f"数据形状: {self.data.shape}")
print(f"\n数据类型:")
print(self.data.dtypes)
print(f"\n缺失值统计:")
missing_stats = self.data.isnull().sum()
missing_percent = (missing_stats / len(self.data)) * 100
missing_df = pd.DataFrame({
'缺失数量': missing_stats,
'缺失比例(%)': missing_percent
})
print(missing_df[missing_df['缺失数量'] > 0])
print(f"\n数值特征统计:")
print(self.data.describe())
# 可视化数据分布
self.visualize_data_distribution()
def visualize_data_distribution(self):
"""可视化数据分布"""
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
fig.suptitle('数据分布分析', fontsize=16)
# 数值特征分布
numeric_cols = ['age', 'income', 'education_years']
for i, col in enumerate(numeric_cols):
axes[0, i].hist(self.data[col].dropna(), bins=30, alpha=0.7)
axes[0, i].set_title(f'{col} 分布')
axes[0, i].set_xlabel(col)
axes[0, i].set_ylabel('频次')
# 分类特征分布
categorical_cols = ['gender', 'city']
for i, col in enumerate(categorical_cols):
self.data[col].value_counts().plot(kind='bar', ax=axes[1, i])
axes[1, i].set_title(f'{col} 分布')
axes[1, i].set_xlabel(col)
axes[1, i].set_ylabel('计数')
axes[1, i].tick_params(axis='x', rotation=45)
# 缺失值热图
axes[1, 2].imshow(self.data.isnull(), cmap='viridis', aspect='auto')
axes[1, 2].set_title('缺失值模式')
axes[1, 2].set_xlabel('特征')
axes[1, 2].set_ylabel('样本')
plt.tight_layout()
plt.show()
# 创建演示实例
demo = DataPreprocessingDemo()
data = demo.load_sample_data()
demo.analyze_data_quality()
1.2 数据预处理的重要性
class PreprocessingImportanceDemo:
"""数据预处理重要性演示"""
def __init__(self):
self.raw_data = None
self.processed_data = None
def demonstrate_scaling_importance(self):
"""演示特征缩放的重要性"""
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
# 创建具有不同尺度特征的数据
np.random.seed(42)
X1 = np.random.normal(0, 1, (1000, 1)) # 标准正态分布
X2 = np.random.normal(0, 1000, (1000, 1)) # 大尺度特征
y = (X1.ravel() + X2.ravel()/1000 > 0).astype(int)
X = np.hstack([X1, X2])
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# 不进行缩放的模型
knn_raw = KNeighborsClassifier(n_neighbors=5)
knn_raw.fit(X_train, y_train)
y_pred_raw = knn_raw.predict(X_test)
acc_raw = accuracy_score(y_test, y_pred_raw)
# 进行标准化的模型
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
knn_scaled = KNeighborsClassifier(n_neighbors=5)
knn_scaled.fit(X_train_scaled, y_train)
y_pred_scaled = knn_scaled.predict(X_test_scaled)
acc_scaled = accuracy_score(y_test, y_pred_scaled)
print("=== 特征缩放重要性演示 ===")
print(f"原始数据准确率: {acc_raw:.4f}")
print(f"标准化后准确率: {acc_scaled:.4f}")
print(f"性能提升: {((acc_scaled - acc_raw) / acc_raw * 100):.2f}%")
# 可视化决策边界
self.plot_decision_boundary(X, y, knn_raw, knn_scaled, scaler)
def plot_decision_boundary(self, X, y, model_raw, model_scaled, scaler):
"""绘制决策边界"""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# 创建网格
h = 0.02
x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
y_min, y_max = X[:, 1].min() - 100, X[:, 1].max() + 100
xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
np.arange(y_min, y_max, h*100))
# 原始数据决策边界
mesh_points = np.c_[xx.ravel(), yy.ravel()]
Z_raw = model_raw.predict(mesh_points)
Z_raw = Z_raw.reshape(xx.shape)
ax1.contourf(xx, yy, Z_raw, alpha=0.8, cmap=plt.cm.RdYlBu)
scatter = ax1.scatter(X[:, 0], X[:, 1], c=y, cmap=plt.cm.RdYlBu, edgecolors='black')
ax1.set_title('原始数据 (未缩放)')
ax1.set_xlabel('特征1 (小尺度)')
ax1.set_ylabel('特征2 (大尺度)')
# 标准化数据决策边界
X_scaled = scaler.transform(X)
# 重新创建缩放后的网格
x_min_s, x_max_s = X_scaled[:, 0].min() - 1, X_scaled[:, 0].max() + 1
y_min_s, y_max_s = X_scaled[:, 1].min() - 1, X_scaled[:, 1].max() + 1
xx_s, yy_s = np.meshgrid(np.arange(x_min_s, x_max_s, 0.02),
np.arange(y_min_s, y_max_s, 0.02))
mesh_scaled = np.c_[xx_s.ravel(), yy_s.ravel()]
Z_s = model_scaled.predict(mesh_scaled)
Z_s = Z_s.reshape(xx_s.shape)
ax2.contourf(xx_s, yy_s, Z_s, alpha=0.8, cmap=plt.cm.RdYlBu)
ax2.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y, cmap=plt.cm.RdYlBu, edgecolors='black')
ax2.set_title('标准化数据')
ax2.set_xlabel('特征1 (标准化)')
ax2.set_ylabel('特征2 (标准化)')
plt.tight_layout()
plt.show()
# 演示特征缩放的重要性
importance_demo = PreprocessingImportanceDemo()
importance_demo.demonstrate_scaling_importance()
2. 数据清洗与转换
2.1 处理缺失值
class MissingValueHandler:
"""缺失值处理类"""
def __init__(self):
self.strategies = {}
def analyze_missing_patterns(self, data):
"""分析缺失值模式"""
missing_info = pd.DataFrame({
'缺失数量': data.isnull().sum(),
'缺失比例': data.isnull().sum() / len(data),
'数据类型': data.dtypes
})
print("=== 缺失值分析 ===")
print(missing_info[missing_info['缺失数量'] > 0])
# 可视化缺失值模式
self.visualize_missing_patterns(data)
return missing_info
def visualize_missing_patterns(self, data):
"""可视化缺失值模式"""
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
# 缺失值热图
sns.heatmap(data.isnull(), cbar=True, ax=axes[0], cmap='viridis')
axes[0].set_title('缺失值热图')
# 缺失值条形图
missing_counts = data.isnull().sum()
missing_counts[missing_counts > 0].plot(kind='bar', ax=axes[1])
axes[1].set_title('各特征缺失值数量')
axes[1].set_ylabel('缺失值数量')
axes[1].tick_params(axis='x', rotation=45)
# 缺失值相关性
missing_corr = data.isnull().corr()
sns.heatmap(missing_corr, annot=True, cmap='coolwarm', center=0, ax=axes[2])
axes[2].set_title('缺失值相关性')
plt.tight_layout()
plt.show()
def handle_missing_values(self, data, strategies=None):
"""处理缺失值"""
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
if strategies is None:
strategies = {
'age': 'median',
'income': 'knn',
'education_years': 'mode',
'gender': 'mode',
'city': 'mode'
}
data_filled = data.copy()
for column, strategy in strategies.items():
if column not in data.columns:
continue
if data[column].isnull().sum() == 0:
continue
print(f"处理 {column} 的缺失值,策略: {strategy}")
if strategy == 'mean':
imputer = SimpleImputer(strategy='mean')
data_filled[[column]] = imputer.fit_transform(data_filled[[column]])
elif strategy == 'median':
imputer = SimpleImputer(strategy='median')
data_filled[[column]] = imputer.fit_transform(data_filled[[column]])
elif strategy == 'mode':
imputer = SimpleImputer(strategy='most_frequent')
data_filled[[column]] = imputer.fit_transform(data_filled[[column]])
elif strategy == 'knn':
# 只对数值列使用KNN填充
numeric_cols = data_filled.select_dtypes(include=[np.number]).columns
knn_imputer = KNNImputer(n_neighbors=5)
data_filled[numeric_cols] = knn_imputer.fit_transform(data_filled[numeric_cols])
elif strategy == 'iterative':
# 迭代填充
numeric_cols = data_filled.select_dtypes(include=[np.number]).columns
iterative_imputer = IterativeImputer(random_state=42)
data_filled[numeric_cols] = iterative_imputer.fit_transform(data_filled[numeric_cols])
elif strategy == 'drop':
data_filled = data_filled.dropna(subset=[column])
return data_filled
def compare_imputation_methods(self, data, target_column='income'):
"""比较不同填充方法的效果"""
from sklearn.metrics import mean_squared_error
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer  # noqa: F401,启用 IterativeImputer
from sklearn.impute import IterativeImputer
# 创建人工缺失值用于测试
data_test = data.copy()
complete_mask = ~data_test[target_column].isnull()
complete_data = data_test[complete_mask].copy()
# 随机创建缺失值
np.random.seed(42)
missing_indices = np.random.choice(len(complete_data),
size=int(0.2 * len(complete_data)),
replace=False)
test_data = complete_data.copy()
true_values = test_data.iloc[missing_indices][target_column].values
test_data.iloc[missing_indices, test_data.columns.get_loc(target_column)] = np.nan
methods = {
'mean': SimpleImputer(strategy='mean'),
'median': SimpleImputer(strategy='median'),
'knn': KNNImputer(n_neighbors=5),
'iterative': IterativeImputer(random_state=42)
}
results = {}
for method_name, imputer in methods.items():
# 只对数值列进行填充
numeric_cols = test_data.select_dtypes(include=[np.number]).columns
test_filled = test_data.copy()
test_filled[numeric_cols] = imputer.fit_transform(test_filled[numeric_cols])
predicted_values = test_filled.iloc[missing_indices][target_column].values
mse = mean_squared_error(true_values, predicted_values)
results[method_name] = mse
print(f"{method_name} MSE: {mse:.2f}")
# 可视化比较结果
self.plot_imputation_comparison(results)
return results
def plot_imputation_comparison(self, results):
"""绘制填充方法比较图"""
methods = list(results.keys())
mse_values = list(results.values())
plt.figure(figsize=(10, 6))
bars = plt.bar(methods, mse_values, color=['skyblue', 'lightgreen', 'lightcoral', 'gold'])
plt.title('不同填充方法的MSE比较')
plt.xlabel('填充方法')
plt.ylabel('均方误差 (MSE)')
# 添加数值标签
for bar, value in zip(bars, mse_values):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(mse_values)*0.01,
f'{value:.2f}', ha='center', va='bottom')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
# 演示缺失值处理
missing_handler = MissingValueHandler()
missing_info = missing_handler.analyze_missing_patterns(data)
data_filled = missing_handler.handle_missing_values(data)
comparison_results = missing_handler.compare_imputation_methods(data)
2.2 异常值检测与处理
class OutlierDetector:
"""异常值检测与处理类"""
def __init__(self):
self.outlier_indices = {}
def detect_outliers_statistical(self, data, columns=None, method='iqr'):
"""统计方法检测异常值"""
if columns is None:
columns = data.select_dtypes(include=[np.number]).columns
outliers = {}
for column in columns:
if method == 'iqr':
Q1 = data[column].quantile(0.25)
Q3 = data[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers[column] = data[(data[column] < lower_bound) |
(data[column] > upper_bound)].index.tolist()
elif method == 'zscore':
z_scores = np.abs((data[column] - data[column].mean()) / data[column].std())
outliers[column] = data[z_scores > 3].index.tolist()
elif method == 'modified_zscore':
median = data[column].median()
mad = np.median(np.abs(data[column] - median))
modified_z_scores = 0.6745 * (data[column] - median) / mad
outliers[column] = data[np.abs(modified_z_scores) > 3.5].index.tolist()
return outliers
def detect_outliers_ml(self, data, columns=None):
"""机器学习方法检测异常值"""
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
if columns is None:
columns = data.select_dtypes(include=[np.number]).columns
X = data[columns].fillna(data[columns].median())
methods = {
'isolation_forest': IsolationForest(contamination=0.1, random_state=42),
'local_outlier_factor': LocalOutlierFactor(contamination=0.1),
'one_class_svm': OneClassSVM(nu=0.1)
}
outlier_results = {}
for method_name, detector in methods.items():
# 三种检测器均通过 fit_predict 返回标签,-1 表示异常值
outlier_labels = detector.fit_predict(X)
outlier_indices = np.where(outlier_labels == -1)[0]
outlier_results[method_name] = outlier_indices.tolist()
print(f"{method_name} 检测到 {len(outlier_indices)} 个异常值")
return outlier_results
def visualize_outliers(self, data, column, outlier_indices):
"""可视化异常值"""
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
# 箱线图
data[column].plot(kind='box', ax=axes[0])
axes[0].set_title(f'{column} 箱线图')
axes[0].set_ylabel(column)
# 直方图
axes[1].hist(data[column].dropna(), bins=50, alpha=0.7, color='skyblue', label='正常值')
if outlier_indices:
axes[1].hist(data.loc[outlier_indices, column].dropna(),
bins=20, alpha=0.7, color='red', label='异常值')
axes[1].set_title(f'{column} 分布')
axes[1].set_xlabel(column)
axes[1].set_ylabel('频次')
axes[1].legend()
# 散点图(如果有索引)
normal_indices = data.index.difference(outlier_indices)
axes[2].scatter(normal_indices, data.loc[normal_indices, column],
alpha=0.6, color='skyblue', label='正常值')
if outlier_indices:
axes[2].scatter(outlier_indices, data.loc[outlier_indices, column],
alpha=0.8, color='red', label='异常值')
axes[2].set_title(f'{column} 散点图')
axes[2].set_xlabel('索引')
axes[2].set_ylabel(column)
axes[2].legend()
plt.tight_layout()
plt.show()
def handle_outliers(self, data, outlier_indices, method='cap'):
"""处理异常值"""
data_processed = data.copy()
for column, indices in outlier_indices.items():
if not indices:
continue
print(f"处理 {column} 的 {len(indices)} 个异常值,方法: {method}")
if method == 'remove':
data_processed = data_processed.drop(indices)
elif method == 'cap':
Q1 = data[column].quantile(0.25)
Q3 = data[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
data_processed.loc[data_processed[column] < lower_bound, column] = lower_bound
data_processed.loc[data_processed[column] > upper_bound, column] = upper_bound
elif method == 'transform':
# 对数变换
if (data_processed[column] > 0).all():
data_processed[column] = np.log1p(data_processed[column])
elif method == 'winsorize':
from scipy.stats import mstats
data_processed[column] = mstats.winsorize(data_processed[column],
limits=[0.05, 0.05])
return data_processed
# 演示异常值检测与处理
outlier_detector = OutlierDetector()
# 统计方法检测异常值
statistical_outliers = outlier_detector.detect_outliers_statistical(data_filled,
columns=['income'],
method='iqr')
print("=== 统计方法检测异常值 ===")
for column, indices in statistical_outliers.items():
print(f"{column}: {len(indices)} 个异常值")
# 可视化异常值
outlier_detector.visualize_outliers(data_filled, 'income', statistical_outliers['income'])
# 机器学习方法检测异常值
ml_outliers = outlier_detector.detect_outliers_ml(data_filled)
# 处理异常值
data_no_outliers = outlier_detector.handle_outliers(data_filled, statistical_outliers, method='cap')
2.3 数据类型转换
class DataTypeConverter:
"""数据类型转换类"""
def __init__(self):
self.encoders = {}
def analyze_data_types(self, data):
"""分析数据类型"""
type_info = pd.DataFrame({
'数据类型': data.dtypes,
'唯一值数量': data.nunique(),
'缺失值数量': data.isnull().sum(),
'内存使用(MB)': data.memory_usage(deep=True) / 1024**2
})
print("=== 数据类型分析 ===")
print(type_info)
return type_info
def optimize_data_types(self, data):
"""优化数据类型以节省内存"""
data_optimized = data.copy()
for column in data.columns:
col_type = data[column].dtype
if col_type != 'object':
c_min = data[column].min()
c_max = data[column].max()
if str(col_type)[:3] == 'int':
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
data_optimized[column] = data_optimized[column].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
data_optimized[column] = data_optimized[column].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
data_optimized[column] = data_optimized[column].astype(np.int32)
elif str(col_type)[:5] == 'float':
if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
data_optimized[column] = data_optimized[column].astype(np.float32)
else:
# 对于字符串类型,检查是否可以转换为category
if data[column].nunique() / len(data) < 0.5:
data_optimized[column] = data_optimized[column].astype('category')
# 显示内存节省情况
original_memory = data.memory_usage(deep=True).sum() / 1024**2
optimized_memory = data_optimized.memory_usage(deep=True).sum() / 1024**2
print(f"原始内存使用: {original_memory:.2f} MB")
print(f"优化后内存使用: {optimized_memory:.2f} MB")
print(f"内存节省: {((original_memory - optimized_memory) / original_memory * 100):.2f}%")
return data_optimized
def encode_categorical_features(self, data, encoding_method='onehot'):
"""编码分类特征"""
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, OrdinalEncoder
categorical_columns = data.select_dtypes(include=['object', 'category']).columns
data_encoded = data.copy()
if encoding_method == 'label':
for column in categorical_columns:
le = LabelEncoder()
data_encoded[column] = le.fit_transform(data_encoded[column].astype(str))
self.encoders[column] = le
elif encoding_method == 'onehot':
# 使用pandas的get_dummies进行独热编码
data_encoded = pd.get_dummies(data_encoded, columns=categorical_columns,
prefix=categorical_columns)
elif encoding_method == 'ordinal':
for column in categorical_columns:
oe = OrdinalEncoder()
data_encoded[column] = oe.fit_transform(data_encoded[[column]])
self.encoders[column] = oe
print(f"使用 {encoding_method} 编码后的数据形状: {data_encoded.shape}")
return data_encoded
def handle_datetime_features(self, data, datetime_columns):
"""处理日期时间特征"""
data_processed = data.copy()
for column in datetime_columns:
if column not in data.columns:
continue
# 转换为datetime类型
data_processed[column] = pd.to_datetime(data_processed[column])
# 提取时间特征
data_processed[f'{column}_year'] = data_processed[column].dt.year
data_processed[f'{column}_month'] = data_processed[column].dt.month
data_processed[f'{column}_day'] = data_processed[column].dt.day
data_processed[f'{column}_dayofweek'] = data_processed[column].dt.dayofweek
data_processed[f'{column}_quarter'] = data_processed[column].dt.quarter
data_processed[f'{column}_is_weekend'] = (data_processed[column].dt.dayofweek >= 5).astype(int)
# 删除原始日期列
data_processed = data_processed.drop(column, axis=1)
return data_processed
# 演示数据类型转换
converter = DataTypeConverter()
type_info = converter.analyze_data_types(data_no_outliers)
data_optimized = converter.optimize_data_types(data_no_outliers)
data_encoded = converter.encode_categorical_features(data_optimized, encoding_method='onehot')
3. 特征缩放与标准化
3.1 标准化 (Standardization)
class FeatureScalingDemo:
"""特征缩放演示类"""
def __init__(self):
self.scalers = {}
def demonstrate_standardization(self, data):
"""演示标准化"""
from sklearn.preprocessing import StandardScaler
# 选择数值特征
numeric_columns = data.select_dtypes(include=[np.number]).columns
X = data[numeric_columns]
# 标准化
scaler = StandardScaler()
X_standardized = scaler.fit_transform(X)
X_standardized_df = pd.DataFrame(X_standardized, columns=numeric_columns)
# 比较标准化前后的统计信息
print("=== 标准化前后对比 ===")
print("原始数据统计:")
print(X.describe())
print("\n标准化后统计:")
print(X_standardized_df.describe())
# 可视化对比
self.plot_scaling_comparison(X, X_standardized_df, "标准化")
return X_standardized_df, scaler
def demonstrate_normalization(self, data):
"""演示归一化"""
from sklearn.preprocessing import MinMaxScaler
numeric_columns = data.select_dtypes(include=[np.number]).columns
X = data[numeric_columns]
# 归一化到[0,1]
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)
X_normalized_df = pd.DataFrame(X_normalized, columns=numeric_columns)
print("=== 归一化前后对比 ===")
print("原始数据范围:")
print(f"最小值: {X.min()}")
print(f"最大值: {X.max()}")
print("\n归一化后范围:")
print(f"最小值: {X_normalized_df.min()}")
print(f"最大值: {X_normalized_df.max()}")
# 可视化对比
self.plot_scaling_comparison(X, X_normalized_df, "归一化")
return X_normalized_df, scaler
def demonstrate_robust_scaling(self, data):
"""演示鲁棒缩放"""
from sklearn.preprocessing import RobustScaler
numeric_columns = data.select_dtypes(include=[np.number]).columns
X = data[numeric_columns]
# 鲁棒缩放
scaler = RobustScaler()
X_robust = scaler.fit_transform(X)
X_robust_df = pd.DataFrame(X_robust, columns=numeric_columns)
print("=== 鲁棒缩放前后对比 ===")
print("原始数据中位数和IQR:")
print(f"中位数: {X.median()}")
print(f"IQR: {X.quantile(0.75) - X.quantile(0.25)}")
print("\n鲁棒缩放后中位数和IQR:")
print(f"中位数: {X_robust_df.median()}")
print(f"IQR: {X_robust_df.quantile(0.75) - X_robust_df.quantile(0.25)}")
# 可视化对比
self.plot_scaling_comparison(X, X_robust_df, "鲁棒缩放")
return X_robust_df, scaler
def plot_scaling_comparison(self, original, scaled, method_name):
"""绘制缩放前后对比图"""
fig, axes = plt.subplots(2, len(original.columns), figsize=(15, 8))
fig.suptitle(f'{method_name}前后对比', fontsize=16)
for i, column in enumerate(original.columns):
# 原始数据分布
axes[0, i].hist(original[column].dropna(), bins=30, alpha=0.7, color='skyblue')
axes[0, i].set_title(f'原始 {column}')
axes[0, i].set_ylabel('频次')
# 缩放后数据分布
axes[1, i].hist(scaled[column].dropna(), bins=30, alpha=0.7, color='lightgreen')
axes[1, i].set_title(f'{method_name}后 {column}')
axes[1, i].set_xlabel(column)
axes[1, i].set_ylabel('频次')
plt.tight_layout()
plt.show()
def compare_scaling_methods(self, data):
"""比较不同缩放方法"""
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, MaxAbsScaler
numeric_columns = data.select_dtypes(include=[np.number]).columns
X = data[numeric_columns].iloc[:, 0] # 选择第一个数值列进行演示
scalers = {
'原始数据': None,
'标准化': StandardScaler(),
'归一化': MinMaxScaler(),
'鲁棒缩放': RobustScaler(),
'最大绝对值缩放': MaxAbsScaler()
}
fig, axes = plt.subplots(1, len(scalers), figsize=(20, 4))
fig.suptitle('不同缩放方法比较', fontsize=16)
for i, (method_name, scaler) in enumerate(scalers.items()):
if scaler is None:
data_scaled = X
else:
data_scaled = scaler.fit_transform(X.values.reshape(-1, 1)).flatten()
axes[i].hist(data_scaled, bins=30, alpha=0.7, color=plt.cm.Set3(i))
axes[i].set_title(method_name)
axes[i].set_xlabel('值')
axes[i].set_ylabel('频次')
# 添加统计信息
axes[i].text(0.05, 0.95, f'均值: {np.mean(data_scaled):.2f}\n标准差: {np.std(data_scaled):.2f}',
transform=axes[i].transAxes, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
plt.tight_layout()
plt.show()
# 演示特征缩放
scaling_demo = FeatureScalingDemo()
# 标准化
X_standardized, std_scaler = scaling_demo.demonstrate_standardization(data_encoded)
# 归一化
X_normalized, minmax_scaler = scaling_demo.demonstrate_normalization(data_encoded)
# 鲁棒缩放
X_robust, robust_scaler = scaling_demo.demonstrate_robust_scaling(data_encoded)
# 比较不同缩放方法
scaling_demo.compare_scaling_methods(data_encoded)
3.2 缩放方法选择指南
class ScalingMethodSelector:
"""缩放方法选择指南"""
def __init__(self):
self.recommendations = {}
def analyze_data_distribution(self, data):
"""分析数据分布特征"""
numeric_columns = data.select_dtypes(include=[np.number]).columns
analysis_results = {}
for column in numeric_columns:
col_data = data[column].dropna()
# 计算分布特征
skewness = col_data.skew()
kurtosis = col_data.kurtosis()
outlier_ratio = self.calculate_outlier_ratio(col_data)
analysis_results[column] = {
'skewness': skewness,
'kurtosis': kurtosis,
'outlier_ratio': outlier_ratio,
'min': col_data.min(),
'max': col_data.max(),
'std': col_data.std()
}
return analysis_results
def calculate_outlier_ratio(self, data):
"""计算异常值比例"""
Q1 = data.quantile(0.25)
Q3 = data.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = data[(data < lower_bound) | (data > upper_bound)]
return len(outliers) / len(data)
def recommend_scaling_method(self, analysis_results):
"""推荐缩放方法"""
recommendations = {}
for column, stats in analysis_results.items():
recommendation = []
# 基于偏度推荐
if abs(stats['skewness']) < 0.5:
recommendation.append("数据接近正态分布,推荐标准化")
elif abs(stats['skewness']) > 2:
recommendation.append("数据严重偏斜,考虑对数变换后标准化")
# 基于异常值比例推荐
if stats['outlier_ratio'] > 0.1:
recommendation.append("异常值较多,推荐鲁棒缩放")
elif stats['outlier_ratio'] < 0.05:
recommendation.append("异常值较少,可使用标准化或归一化")
# 基于数据范围推荐
if stats['max'] - stats['min'] > 1000:
recommendation.append("数据范围很大,推荐标准化")
elif stats['min'] >= 0:
recommendation.append("数据非负,可考虑归一化")
recommendations[column] = recommendation
return recommendations
def create_scaling_decision_tree(self):
"""创建缩放方法决策树"""
decision_tree = """
缩放方法选择决策树:
1. 数据是否包含异常值?
├─ 是 (>10%) → 使用鲁棒缩放 (RobustScaler)
└─ 否 (<5%) → 继续下一步
2. 数据分布是否接近正态分布?
├─ 是 (|偏度| < 0.5) → 使用标准化 (StandardScaler)
└─ 否 → 继续下一步
3. 数据是否严重偏斜?
├─ 是 (|偏度| > 2) → 先对数变换,再标准化
└─ 否 → 继续下一步
4. 需要保持数据在特定范围内?
├─ 是 [0,1] → 使用归一化 (MinMaxScaler)
├─ 是 [-1,1] → 使用最大绝对值缩放 (MaxAbsScaler)
└─ 否 → 使用标准化 (StandardScaler)
5. 特殊情况:
- 稀疏数据 → MaxAbsScaler 或不缩放
- 神经网络 → StandardScaler 或 MinMaxScaler
- 树模型 → 通常不需要缩放
- 距离算法 → 必须缩放,推荐 StandardScaler
"""
print(decision_tree)
return decision_tree
# 演示缩放方法选择
selector = ScalingMethodSelector()
analysis_results = selector.analyze_data_distribution(data_encoded)
recommendations = selector.recommend_scaling_method(analysis_results)
print("=== 缩放方法推荐 ===")
for column, recs in recommendations.items():
print(f"\n{column}:")
for rec in recs:
print(f" - {rec}")
selector.create_scaling_decision_tree()
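下面是一个按照上述决策思路封装的简单示例函数(其中 0.1、0.5 等阈值为本文建议值,并非 Scikit-learn 的规定):

```python
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler

def choose_scaler(series: pd.Series):
    """按照上文决策树的思路,为单个数值特征推荐一个缩放器(示意实现)"""
    s = series.dropna()
    q1, q3 = s.quantile(0.25), s.quantile(0.75)
    iqr = q3 - q1
    outlier_ratio = ((s < q1 - 1.5 * iqr) | (s > q3 + 1.5 * iqr)).mean()
    if outlier_ratio > 0.1:      # 异常值较多 -> 鲁棒缩放
        return RobustScaler()
    if abs(s.skew()) < 0.5:      # 接近正态分布 -> 标准化
        return StandardScaler()
    if s.min() >= 0:             # 非负且偏斜 -> 归一化
        return MinMaxScaler()
    return StandardScaler()

# 用法示例:为 income 列推荐缩放器
print(type(choose_scaler(data_filled['income'])).__name__)
```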
4. 特征选择
4.1 过滤法特征选择
class FilterFeatureSelector:
"""过滤法特征选择"""
def __init__(self):
self.selected_features = {}
self.scores = {}
def univariate_selection(self, X, y, k=10, score_func=None):
"""单变量特征选择"""
from sklearn.feature_selection import SelectKBest, chi2, f_classif, f_regression, mutual_info_classif
if score_func is None:
# 根据目标变量类型选择评分函数
if len(np.unique(y)) < 10: # 分类问题
score_func = f_classif
else: # 回归问题
score_func = f_regression
selector = SelectKBest(score_func=score_func, k=k)
X_selected = selector.fit_transform(X, y)
# 获取特征分数和选中的特征
feature_scores = selector.scores_
selected_features = selector.get_support(indices=True)
self.scores['univariate'] = feature_scores
self.selected_features['univariate'] = selected_features
print(f"=== 单变量特征选择 ===")
print(f"原始特征数: {X.shape[1]}")
print(f"选择特征数: {k}")
print(f"选中的特征索引: {selected_features}")
# 可视化特征分数
self.plot_feature_scores(feature_scores, selected_features, "单变量特征选择")
return X_selected, selected_features
def correlation_selection(self, X, threshold=0.95):
"""基于相关性的特征选择"""
# 计算特征间相关性
corr_matrix = np.corrcoef(X.T)
# 找到高相关性的特征对
high_corr_pairs = []
features_to_remove = set()
for i in range(len(corr_matrix)):
for j in range(i+1, len(corr_matrix)):
if abs(corr_matrix[i, j]) > threshold:
high_corr_pairs.append((i, j, corr_matrix[i, j]))
# 移除其中一个特征(保留索引较小的)
features_to_remove.add(j)
selected_features = [i for i in range(X.shape[1]) if i not in features_to_remove]
X_selected = X[:, selected_features]
self.selected_features['correlation'] = selected_features
print(f"=== 相关性特征选择 ===")
print(f"相关性阈值: {threshold}")
print(f"发现 {len(high_corr_pairs)} 对高相关特征")
print(f"移除 {len(features_to_remove)} 个特征")
print(f"保留 {len(selected_features)} 个特征")
# 可视化相关性矩阵
self.plot_correlation_matrix(corr_matrix, features_to_remove)
return X_selected, selected_features
def variance_selection(self, X, threshold=0.01):
"""基于方差的特征选择"""
from sklearn.feature_selection import VarianceThreshold
selector = VarianceThreshold(threshold=threshold)
X_selected = selector.fit_transform(X)
selected_features = selector.get_support(indices=True)
feature_variances = np.var(X, axis=0)
self.selected_features['variance'] = selected_features
self.scores['variance'] = feature_variances
print(f"=== 方差特征选择 ===")
print(f"方差阈值: {threshold}")
print(f"原始特征数: {X.shape[1]}")
print(f"选择特征数: {X_selected.shape[1]}")
print(f"移除的低方差特征数: {X.shape[1] - X_selected.shape[1]}")
# 可视化特征方差
self.plot_feature_variances(feature_variances, threshold)
return X_selected, selected_features
def plot_feature_scores(self, scores, selected_features, title):
"""绘制特征分数"""
plt.figure(figsize=(12, 6))
feature_indices = range(len(scores))
colors = ['red' if i in selected_features else 'lightblue' for i in feature_indices]
bars = plt.bar(feature_indices, scores, color=colors, alpha=0.7)
plt.title(f'{title} - 特征分数')
plt.xlabel('特征索引')
plt.ylabel('分数')
# 添加图例
red_patch = plt.Rectangle((0, 0), 1, 1, fc="red", alpha=0.7)
blue_patch = plt.Rectangle((0, 0), 1, 1, fc="lightblue", alpha=0.7)
plt.legend([red_patch, blue_patch], ['选中特征', '未选中特征'])
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
def plot_correlation_matrix(self, corr_matrix, removed_features):
"""绘制相关性矩阵"""
plt.figure(figsize=(10, 8))
# 创建掩码来突出显示被移除的特征
mask = np.zeros_like(corr_matrix, dtype=bool)
for feature in removed_features:
mask[feature, :] = True
mask[:, feature] = True
sns.heatmap(corr_matrix, annot=False, cmap='coolwarm', center=0,
mask=mask, square=True, cbar_kws={"shrink": .8})
plt.title('特征相关性矩阵 (空白区域为被移除的特征)')
plt.tight_layout()
plt.show()
def plot_feature_variances(self, variances, threshold):
"""绘制特征方差"""
plt.figure(figsize=(12, 6))
feature_indices = range(len(variances))
colors = ['red' if var > threshold else 'lightblue' for var in variances]
plt.bar(feature_indices, variances, color=colors, alpha=0.7)
plt.axhline(y=threshold, color='black', linestyle='--', label=f'阈值 = {threshold}')
plt.title('特征方差分布')
plt.xlabel('特征索引')
plt.ylabel('方差')
plt.legend()
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
# 创建示例数据进行特征选择演示
from sklearn.datasets import make_classification
# 生成分类数据
X_demo, y_demo = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=5, n_clusters_per_class=1, random_state=42)
# 演示过滤法特征选择
filter_selector = FilterFeatureSelector()
# 单变量特征选择
X_univariate, selected_univariate = filter_selector.univariate_selection(X_demo, y_demo, k=10)
# 相关性特征选择
X_correlation, selected_correlation = filter_selector.correlation_selection(X_demo, threshold=0.8)
# 方差特征选择
X_variance, selected_variance = filter_selector.variance_selection(X_demo, threshold=0.1)
4.2 包装法特征选择
class WrapperFeatureSelector:
"""包装法特征选择"""
def __init__(self):
self.selected_features = {}
self.feature_importance = {}
def recursive_feature_elimination(self, X, y, estimator=None, n_features=10):
"""递归特征消除"""
from sklearn.feature_selection import RFE
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
if estimator is None:
estimator = RandomForestClassifier(n_estimators=100, random_state=42)
selector = RFE(estimator=estimator, n_features_to_select=n_features)
X_selected = selector.fit_transform(X, y)
selected_features = selector.get_support(indices=True)
feature_ranking = selector.ranking_
self.selected_features['rfe'] = selected_features
self.feature_importance['rfe'] = feature_ranking
print(f"=== 递归特征消除 ===")
print(f"原始特征数: {X.shape[1]}")
print(f"选择特征数: {n_features}")
print(f"选中的特征索引: {selected_features}")
# 可视化特征排名
self.plot_feature_ranking(feature_ranking, selected_features, "递归特征消除")
return X_selected, selected_features
def sequential_feature_selection(self, X, y, direction='forward', n_features=10):
"""序列特征选择"""
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.ensemble import RandomForestClassifier
estimator = RandomForestClassifier(n_estimators=100, random_state=42)
selector = SequentialFeatureSelector(estimator, n_features_to_select=n_features,
direction=direction, cv=5)
X_selected = selector.fit_transform(X, y)
selected_features = selector.get_support(indices=True)
self.selected_features[f'sfs_{direction}'] = selected_features
print(f"=== 序列特征选择 ({direction}) ===")
print(f"原始特征数: {X.shape[1]}")
print(f"选择特征数: {n_features}")
print(f"选中的特征索引: {selected_features}")
return X_selected, selected_features
def genetic_algorithm_selection(self, X, y, n_features=10, population_size=50, generations=20):
"""遗传算法特征选择"""
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
# 简化的遗传算法实现
def fitness_function(features_mask):
if np.sum(features_mask) == 0:
return 0
X_subset = X[:, features_mask]
estimator = RandomForestClassifier(n_estimators=50, random_state=42)
scores = cross_val_score(estimator, X_subset, y, cv=3, scoring='accuracy')
return np.mean(scores)
# 初始化种群
population = []
for _ in range(population_size):
individual = np.random.choice([True, False], size=X.shape[1],
p=[n_features/X.shape[1], 1-n_features/X.shape[1]])
population.append(individual)
best_fitness = 0
best_individual = None
for generation in range(generations):
# 计算适应度
fitness_scores = [fitness_function(individual) for individual in population]
# 记录最佳个体
max_fitness_idx = np.argmax(fitness_scores)
if fitness_scores[max_fitness_idx] > best_fitness:
best_fitness = fitness_scores[max_fitness_idx]
best_individual = population[max_fitness_idx].copy()
# 简化实现:仅使用锦标赛选择,未实现交叉与变异
new_population = []
for _ in range(population_size):
# 锦标赛选择
tournament_size = 3
tournament_indices = np.random.choice(population_size, tournament_size)
tournament_fitness = [fitness_scores[i] for i in tournament_indices]
winner_idx = tournament_indices[np.argmax(tournament_fitness)]
new_population.append(population[winner_idx].copy())
population = new_population
if generation % 5 == 0:
print(f"第 {generation} 代,最佳适应度: {best_fitness:.4f}")
selected_features = np.where(best_individual)[0]
X_selected = X[:, best_individual]
self.selected_features['genetic'] = selected_features
print(f"=== 遗传算法特征选择 ===")
print(f"最终适应度: {best_fitness:.4f}")
print(f"选中的特征索引: {selected_features}")
return X_selected, selected_features
def plot_feature_ranking(self, ranking, selected_features, title):
"""绘制特征排名"""
plt.figure(figsize=(12, 6))
feature_indices = range(len(ranking))
colors = ['red' if i in selected_features else 'lightblue' for i in feature_indices]
plt.bar(feature_indices, ranking, color=colors, alpha=0.7)
plt.title(f'{title} - 特征排名 (1=最重要)')
plt.xlabel('特征索引')
plt.ylabel('排名')
# 添加图例
red_patch = plt.Rectangle((0, 0), 1, 1, fc="red", alpha=0.7)
blue_patch = plt.Rectangle((0, 0), 1, 1, fc="lightblue", alpha=0.7)
plt.legend([red_patch, blue_patch], ['选中特征', '未选中特征'])
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
# 演示包装法特征选择
wrapper_selector = WrapperFeatureSelector()
# 递归特征消除
X_rfe, selected_rfe = wrapper_selector.recursive_feature_elimination(X_demo, y_demo, n_features=10)
# 前向序列特征选择
X_sfs_forward, selected_sfs_forward = wrapper_selector.sequential_feature_selection(
X_demo, y_demo, direction='forward', n_features=10)
# 后向序列特征选择
X_sfs_backward, selected_sfs_backward = wrapper_selector.sequential_feature_selection(
X_demo, y_demo, direction='backward', n_features=10)
# 遗传算法特征选择
X_genetic, selected_genetic = wrapper_selector.genetic_algorithm_selection(
X_demo, y_demo, n_features=10, population_size=30, generations=10)
4.3 嵌入法特征选择
class EmbeddedFeatureSelector:
"""嵌入法特征选择"""
def __init__(self):
self.selected_features = {}
self.feature_importance = {}
def lasso_selection(self, X, y, alpha=0.01):
"""基于Lasso的特征选择"""
from sklearn.linear_model import LassoCV, Lasso
from sklearn.preprocessing import StandardScaler
# 标准化数据
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 使用交叉验证选择最佳alpha
lasso_cv = LassoCV(cv=5, random_state=42)
lasso_cv.fit(X_scaled, y)
# 使用最佳alpha训练Lasso模型
lasso = Lasso(alpha=lasso_cv.alpha_)
lasso.fit(X_scaled, y)
# 选择非零系数的特征
selected_features = np.where(lasso.coef_ != 0)[0]
feature_importance = np.abs(lasso.coef_)
self.selected_features['lasso'] = selected_features
self.feature_importance['lasso'] = feature_importance
print(f"=== Lasso特征选择 ===")
print(f"最佳alpha: {lasso_cv.alpha_:.6f}")
print(f"原始特征数: {X.shape[1]}")
print(f"选择特征数: {len(selected_features)}")
print(f"选中的特征索引: {selected_features}")
# 可视化特征重要性
self.plot_feature_importance(feature_importance, selected_features, "Lasso特征选择")
return X[:, selected_features], selected_features
def tree_based_selection(self, X, y, estimator_type='random_forest', threshold='median'):
"""基于树模型的特征选择"""
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.feature_selection import SelectFromModel
# 选择估计器
if estimator_type == 'random_forest':
estimator = RandomForestClassifier(n_estimators=100, random_state=42)
elif estimator_type == 'extra_trees':
estimator = ExtraTreesClassifier(n_estimators=100, random_state=42)
elif estimator_type == 'decision_tree':
estimator = DecisionTreeClassifier(random_state=42)
# 训练模型并选择特征
selector = SelectFromModel(estimator, threshold=threshold)
X_selected = selector.fit_transform(X, y)
selected_features = selector.get_support(indices=True)
# SelectFromModel 在拟合时已训练了内部估计器,直接读取其特征重要性即可
feature_importance = selector.estimator_.feature_importances_
self.selected_features[f'tree_{estimator_type}'] = selected_features
self.feature_importance[f'tree_{estimator_type}'] = feature_importance
print(f"=== {estimator_type}特征选择 ===")
print(f"阈值: {threshold}")
print(f"原始特征数: {X.shape[1]}")
print(f"选择特征数: {len(selected_features)}")
print(f"选中的特征索引: {selected_features}")
# 可视化特征重要性
self.plot_feature_importance(feature_importance, selected_features,
f"{estimator_type}特征选择")
return X_selected, selected_features
def elastic_net_selection(self, X, y, l1_ratio=0.5):
"""基于弹性网络的特征选择"""
from sklearn.linear_model import ElasticNetCV, ElasticNet
from sklearn.preprocessing import StandardScaler
# 标准化数据
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 使用交叉验证选择最佳参数
elastic_cv = ElasticNetCV(l1_ratio=l1_ratio, cv=5, random_state=42)
elastic_cv.fit(X_scaled, y)
# 使用最佳参数训练模型
elastic = ElasticNet(alpha=elastic_cv.alpha_, l1_ratio=l1_ratio)
elastic.fit(X_scaled, y)
# 选择非零系数的特征
selected_features = np.where(elastic.coef_ != 0)[0]
feature_importance = np.abs(elastic.coef_)
self.selected_features['elastic_net'] = selected_features
self.feature_importance['elastic_net'] = feature_importance
print(f"=== 弹性网络特征选择 ===")
print(f"最佳alpha: {elastic_cv.alpha_:.6f}")
print(f"L1比例: {l1_ratio}")
print(f"原始特征数: {X.shape[1]}")
print(f"选择特征数: {len(selected_features)}")
print(f"选中的特征索引: {selected_features}")
# 可视化特征重要性
self.plot_feature_importance(feature_importance, selected_features, "弹性网络特征选择")
return X[:, selected_features], selected_features
def plot_feature_importance(self, importance, selected_features, title):
"""绘制特征重要性"""
plt.figure(figsize=(12, 6))
feature_indices = range(len(importance))
colors = ['red' if i in selected_features else 'lightblue' for i in feature_indices]
plt.bar(feature_indices, importance, color=colors, alpha=0.7)
plt.title(f'{title} - 特征重要性')
plt.xlabel('特征索引')
plt.ylabel('重要性')
# 添加图例
red_patch = plt.Rectangle((0, 0), 1, 1, fc="red", alpha=0.7)
blue_patch = plt.Rectangle((0, 0), 1, 1, fc="lightblue", alpha=0.7)
plt.legend([red_patch, blue_patch], ['选中特征', '未选中特征'])
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
def compare_selection_methods(self, X, y):
"""比较不同特征选择方法"""
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier
methods_results = {}
# 原始数据
estimator = RandomForestClassifier(n_estimators=100, random_state=42)
original_scores = cross_val_score(estimator, X, y, cv=5, scoring='accuracy')
methods_results['原始数据'] = {
'n_features': X.shape[1],
'cv_score': np.mean(original_scores),
'cv_std': np.std(original_scores)
}
# 各种特征选择方法
for method_name, features in self.selected_features.items():
if len(features) > 0:
X_subset = X[:, features]
scores = cross_val_score(estimator, X_subset, y, cv=5, scoring='accuracy')
methods_results[method_name] = {
'n_features': len(features),
'cv_score': np.mean(scores),
'cv_std': np.std(scores)
}
# 可视化比较结果
self.plot_methods_comparison(methods_results)
return methods_results
def plot_methods_comparison(self, results):
"""绘制方法比较图"""
methods = list(results.keys())
scores = [results[method]['cv_score'] for method in methods]
stds = [results[method]['cv_std'] for method in methods]
n_features = [results[method]['n_features'] for method in methods]
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# 准确率比较
bars1 = ax1.bar(methods, scores, yerr=stds, capsize=5, alpha=0.7,
color=plt.cm.Set3(range(len(methods))))
ax1.set_title('不同特征选择方法的准确率比较')
ax1.set_ylabel('交叉验证准确率')
ax1.tick_params(axis='x', rotation=45)
# 添加数值标签
for bar, score in zip(bars1, scores):
ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{score:.3f}', ha='center', va='bottom')
# 特征数量比较
bars2 = ax2.bar(methods, n_features, alpha=0.7,
color=plt.cm.Set3(range(len(methods))))
ax2.set_title('不同方法选择的特征数量')
ax2.set_ylabel('特征数量')
ax2.tick_params(axis='x', rotation=45)
# 添加数值标签
for bar, n_feat in zip(bars2, n_features):
ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.5,
f'{n_feat}', ha='center', va='bottom')
plt.tight_layout()
plt.show()
# 演示嵌入法特征选择
embedded_selector = EmbeddedFeatureSelector()
# Lasso特征选择
X_lasso, selected_lasso = embedded_selector.lasso_selection(X_demo, y_demo)
# 随机森林特征选择
X_rf, selected_rf = embedded_selector.tree_based_selection(X_demo, y_demo, 'random_forest')
# 极端随机树特征选择
X_et, selected_et = embedded_selector.tree_based_selection(X_demo, y_demo, 'extra_trees')
# 弹性网络特征选择
X_elastic, selected_elastic = embedded_selector.elastic_net_selection(X_demo, y_demo)
# 比较不同方法
comparison_results = embedded_selector.compare_selection_methods(X_demo, y_demo)
5. 降维技术
5.1 主成分分析 (PCA)
class DimensionalityReductionDemo:
"""降维技术演示"""
def __init__(self):
self.reducers = {}
self.explained_variance = {}
def pca_analysis(self, X, n_components=None):
"""主成分分析"""
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
# 标准化数据
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 如果未指定组件数,使用所有组件进行分析
if n_components is None:
n_components = min(X.shape[0], X.shape[1])
pca = PCA(n_components=n_components)
X_pca = pca.fit_transform(X_scaled)
self.reducers['pca'] = pca
self.explained_variance['pca'] = pca.explained_variance_ratio_
print(f"=== PCA分析 ===")
print(f"原始维度: {X.shape[1]}")
print(f"降维后维度: {X_pca.shape[1]}")
print(f"累积解释方差比: {np.cumsum(pca.explained_variance_ratio_)[:5]}")
# 可视化PCA结果
self.plot_pca_analysis(pca, X_pca)
return X_pca, pca
def plot_pca_analysis(self, pca, X_pca):
"""可视化PCA分析结果"""
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# 解释方差比
axes[0, 0].bar(range(1, len(pca.explained_variance_ratio_) + 1),
pca.explained_variance_ratio_, alpha=0.7)
axes[0, 0].set_title('各主成分解释方差比')
axes[0, 0].set_xlabel('主成分')
axes[0, 0].set_ylabel('解释方差比')
# 累积解释方差比
cumsum_var = np.cumsum(pca.explained_variance_ratio_)
axes[0, 1].plot(range(1, len(cumsum_var) + 1), cumsum_var, 'bo-')
axes[0, 1].axhline(y=0.95, color='r', linestyle='--', label='95%阈值')
axes[0, 1].set_title('累积解释方差比')
axes[0, 1].set_xlabel('主成分数量')
axes[0, 1].set_ylabel('累积解释方差比')
axes[0, 1].legend()
axes[0, 1].grid(True)
# 前两个主成分的散点图
if X_pca.shape[1] >= 2:
axes[1, 0].scatter(X_pca[:, 0], X_pca[:, 1], alpha=0.6)
axes[1, 0].set_title('前两个主成分散点图')
axes[1, 0].set_xlabel('第一主成分')
axes[1, 0].set_ylabel('第二主成分')
# 主成分载荷图(如果原始特征数不太多)
if pca.components_.shape[1] <= 20:
im = axes[1, 1].imshow(pca.components_[:5], cmap='coolwarm', aspect='auto')
axes[1, 1].set_title('前5个主成分载荷')
axes[1, 1].set_xlabel('原始特征')
axes[1, 1].set_ylabel('主成分')
plt.colorbar(im, ax=axes[1, 1])
plt.tight_layout()
plt.show()
def determine_optimal_components(self, X, variance_threshold=0.95):
"""确定最优主成分数量"""
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 计算所有主成分
pca_full = PCA()
pca_full.fit(X_scaled)
# 找到达到方差阈值的组件数
cumsum_var = np.cumsum(pca_full.explained_variance_ratio_)
n_components_95 = np.argmax(cumsum_var >= variance_threshold) + 1
# 使用肘部法则
n_components_elbow = self.find_elbow_point(pca_full.explained_variance_ratio_)
print(f"=== 最优主成分数量分析 ===")
print(f"达到{variance_threshold*100}%方差的组件数: {n_components_95}")
print(f"肘部法则建议的组件数: {n_components_elbow}")
# 可视化选择过程
self.plot_component_selection(pca_full.explained_variance_ratio_,
n_components_95, n_components_elbow, variance_threshold)
return n_components_95, n_components_elbow
def find_elbow_point(self, explained_variance_ratio):
"""使用肘部法则找到最优组件数"""
# 计算二阶导数来找到肘部点
n_components = len(explained_variance_ratio)
if n_components < 3:
return 1
# 计算差分
first_diff = np.diff(explained_variance_ratio)
second_diff = np.diff(first_diff)
# 找到二阶导数最大的点(肘部)
elbow_point = np.argmax(second_diff) + 2 # +2因为两次差分
return min(elbow_point, n_components)
def plot_component_selection(self, explained_variance_ratio, n_95, n_elbow, threshold):
"""可视化组件选择过程"""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# 解释方差比图
ax1.bar(range(1, len(explained_variance_ratio) + 1), explained_variance_ratio, alpha=0.7)
ax1.axvline(x=n_elbow, color='red', linestyle='--', label=f'肘部法则: {n_elbow}')
ax1.set_title('各主成分解释方差比')
ax1.set_xlabel('主成分')
ax1.set_ylabel('解释方差比')
ax1.legend()
# 累积方差图
cumsum_var = np.cumsum(explained_variance_ratio)
ax2.plot(range(1, len(cumsum_var) + 1), cumsum_var, 'bo-')
ax2.axhline(y=threshold, color='red', linestyle='--', label=f'{threshold*100}%阈值')
ax2.axvline(x=n_95, color='green', linestyle='--', label=f'95%方差: {n_95}')
ax2.set_title('累积解释方差比')
ax2.set_xlabel('主成分数量')
ax2.set_ylabel('累积解释方差比')
ax2.legend()
ax2.grid(True)
plt.tight_layout()
plt.show()
# 演示PCA降维
dim_reduction_demo = DimensionalityReductionDemo()
# PCA分析
X_pca, pca_model = dim_reduction_demo.pca_analysis(X_demo, n_components=10)
# 确定最优组件数
n_95, n_elbow = dim_reduction_demo.determine_optimal_components(X_demo)
5.2 其他降维技术
class AdvancedDimensionalityReduction:
"""高级降维技术"""
def __init__(self):
self.reducers = {}
def t_sne_analysis(self, X, n_components=2, perplexity=30):
"""t-SNE降维分析"""
from sklearn.manifold import TSNE
from sklearn.preprocessing import StandardScaler
# 标准化数据
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 如果数据维度太高,先用PCA降维
if X.shape[1] > 50:
from sklearn.decomposition import PCA
pca = PCA(n_components=50)
X_scaled = pca.fit_transform(X_scaled)
# t-SNE降维
tsne = TSNE(n_components=n_components, perplexity=perplexity, random_state=42)
X_tsne = tsne.fit_transform(X_scaled)
self.reducers['tsne'] = tsne
print(f"=== t-SNE分析 ===")
print(f"原始维度: {X.shape[1]}")
print(f"降维后维度: {n_components}")
print(f"困惑度: {perplexity}")
return X_tsne, tsne
def umap_analysis(self, X, n_components=2, n_neighbors=15):
"""UMAP降维分析"""
try:
import umap
from sklearn.preprocessing import StandardScaler
# 标准化数据
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# UMAP降维
reducer = umap.UMAP(n_components=n_components, n_neighbors=n_neighbors, random_state=42)
X_umap = reducer.fit_transform(X_scaled)
self.reducers['umap'] = reducer
print(f"=== UMAP分析 ===")
print(f"原始维度: {X.shape[1]}")
print(f"降维后维度: {n_components}")
print(f"邻居数: {n_neighbors}")
return X_umap, reducer
except ImportError:
print("UMAP未安装,请使用 'pip install umap-learn' 安装")
return None, None
def lda_analysis(self, X, y, n_components=None):
"""线性判别分析降维"""
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.preprocessing import StandardScaler
# 标准化数据
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# LDA降维
if n_components is None:
n_components = len(np.unique(y)) - 1
lda = LinearDiscriminantAnalysis(n_components=n_components)
X_lda = lda.fit_transform(X_scaled, y)
self.reducers['lda'] = lda
print(f"=== LDA分析 ===")
print(f"原始维度: {X.shape[1]}")
print(f"降维后维度: {X_lda.shape[1]}")
print(f"解释方差比: {lda.explained_variance_ratio_}")
return X_lda, lda
def compare_dimensionality_reduction(self, X, y=None):
"""比较不同降维方法"""
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
# 标准化数据
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 如果数据太大,先采样
if X.shape[0] > 1000:
indices = np.random.choice(X.shape[0], 1000, replace=False)
X_sample = X_scaled[indices]
y_sample = y[indices] if y is not None else None
else:
X_sample = X_scaled
y_sample = y
methods = {}
# PCA
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_sample)
methods['PCA'] = X_pca
# t-SNE
tsne = TSNE(n_components=2, random_state=42)
X_tsne = tsne.fit_transform(X_sample)
methods['t-SNE'] = X_tsne
# LDA (如果有标签)
if y_sample is not None:
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
# LDA 的成分数不能超过 类别数-1;二分类时只有 1 个判别方向
n_lda = min(2, len(np.unique(y_sample)) - 1)
lda = LinearDiscriminantAnalysis(n_components=n_lda)
X_lda = lda.fit_transform(X_sample, y_sample)
methods['LDA'] = X_lda
# 可视化比较
self.plot_dimensionality_reduction_comparison(methods, y_sample)
return methods
def plot_dimensionality_reduction_comparison(self, methods, y=None):
"""可视化降维方法比较"""
n_methods = len(methods)
fig, axes = plt.subplots(1, n_methods, figsize=(5*n_methods, 5))
if n_methods == 1:
axes = [axes]
for i, (method_name, X_reduced) in enumerate(methods.items()):
# 若降维结果只有 1 维(例如二分类的 LDA),用 0 作为第二维占位以便绘图
second_dim = X_reduced[:, 1] if X_reduced.shape[1] > 1 else np.zeros(len(X_reduced))
if y is not None:
scatter = axes[i].scatter(X_reduced[:, 0], second_dim,
c=y, cmap='tab10', alpha=0.6)
plt.colorbar(scatter, ax=axes[i])
else:
axes[i].scatter(X_reduced[:, 0], second_dim, alpha=0.6)
axes[i].set_title(f'{method_name}降维结果')
axes[i].set_xlabel('第一维')
axes[i].set_ylabel('第二维')
plt.tight_layout()
plt.show()
# 演示高级降维技术
advanced_reduction = AdvancedDimensionalityReduction()
# t-SNE降维
X_tsne, tsne_model = advanced_reduction.t_sne_analysis(X_demo)
# LDA降维(需要标签)
X_lda, lda_model = advanced_reduction.lda_analysis(X_demo, y_demo)
# 比较不同降维方法
reduction_comparison = advanced_reduction.compare_dimensionality_reduction(X_demo, y_demo)
6. 数据预处理管道
6.1 构建预处理管道
class PreprocessingPipeline:
"""数据预处理管道"""
def __init__(self):
self.pipeline = None
self.feature_names = None
def create_basic_pipeline(self, numeric_features, categorical_features):
"""创建基础预处理管道"""
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
# 数值特征预处理
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
# 分类特征预处理
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
# 组合预处理器
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
self.pipeline = preprocessor
return preprocessor
def create_advanced_pipeline(self, numeric_features, categorical_features,
feature_selection=True, dimensionality_reduction=True):
"""创建高级预处理管道"""
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, RobustScaler
from sklearn.impute import KNNImputer, SimpleImputer
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.decomposition import PCA
# 数值特征预处理(使用更高级的方法)
numeric_transformer = Pipeline(steps=[
('imputer', KNNImputer(n_neighbors=5)),
('scaler', RobustScaler()) # 对异常值更鲁棒
])
# 分类特征预处理
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='most_frequent')),
('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))  # sklearn 1.2+ 参数名为 sparse_output
])
# 组合预处理器
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
# 构建完整管道
pipeline_steps = [('preprocessor', preprocessor)]
if feature_selection:
pipeline_steps.append(('feature_selection', SelectKBest(f_classif, k=10)))
if dimensionality_reduction:
pipeline_steps.append(('pca', PCA(n_components=0.95)))
self.pipeline = Pipeline(pipeline_steps)
return self.pipeline
def create_custom_pipeline(self, config):
"""根据配置创建自定义管道"""
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
transformers = []
for feature_type, settings in config.items():
if feature_type == 'numeric':
transformer = self._create_numeric_transformer(settings)
transformers.append(('num', transformer, settings['features']))
elif feature_type == 'categorical':
transformer = self._create_categorical_transformer(settings)
transformers.append(('cat', transformer, settings['features']))
preprocessor = ColumnTransformer(transformers=transformers)
# 添加后处理步骤
pipeline_steps = [('preprocessor', preprocessor)]
if 'feature_selection' in config:
fs_config = config['feature_selection']
pipeline_steps.append(('feature_selection',
self._create_feature_selector(fs_config)))
if 'dimensionality_reduction' in config:
dr_config = config['dimensionality_reduction']
pipeline_steps.append(('dim_reduction',
self._create_dim_reducer(dr_config)))
self.pipeline = Pipeline(pipeline_steps)
return self.pipeline
def _create_numeric_transformer(self, settings):
"""创建数值特征转换器"""
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
steps = []
# 缺失值处理
if settings.get('imputation') == 'mean':
steps.append(('imputer', SimpleImputer(strategy='mean')))
elif settings.get('imputation') == 'median':
steps.append(('imputer', SimpleImputer(strategy='median')))
elif settings.get('imputation') == 'knn':
steps.append(('imputer', KNNImputer(n_neighbors=5)))
# 缩放
if settings.get('scaling') == 'standard':
steps.append(('scaler', StandardScaler()))
elif settings.get('scaling') == 'minmax':
steps.append(('scaler', MinMaxScaler()))
elif settings.get('scaling') == 'robust':
steps.append(('scaler', RobustScaler()))
return Pipeline(steps)
def _create_categorical_transformer(self, settings):
"""创建分类特征转换器"""
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, OrdinalEncoder
steps = []
# 缺失值处理
if settings.get('imputation') == 'mode':
steps.append(('imputer', SimpleImputer(strategy='most_frequent')))
elif settings.get('imputation') == 'constant':
steps.append(('imputer', SimpleImputer(strategy='constant', fill_value='missing')))
# 编码
if settings.get('encoding') == 'onehot':
steps.append(('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)))  # sklearn 1.2+ 参数名为 sparse_output
elif settings.get('encoding') == 'ordinal':
steps.append(('encoder', OrdinalEncoder(handle_unknown='use_encoded_value',
unknown_value=-1)))
return Pipeline(steps)
def _create_feature_selector(self, config):
"""创建特征选择器"""
from sklearn.feature_selection import SelectKBest, f_classif, chi2, RFE
from sklearn.ensemble import RandomForestClassifier
if config['method'] == 'univariate':
return SelectKBest(f_classif, k=config.get('k', 10))
elif config['method'] == 'rfe':
estimator = RandomForestClassifier(n_estimators=100, random_state=42)
return RFE(estimator, n_features_to_select=config.get('k', 10))
def _create_dim_reducer(self, config):
"""创建降维器"""
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
if config['method'] == 'pca':
return PCA(n_components=config.get('n_components', 0.95))
elif config['method'] == 'tsne':
return TSNE(n_components=config.get('n_components', 2), random_state=42)
def fit_transform_pipeline(self, X, y=None):
"""拟合并转换数据"""
if self.pipeline is None:
raise ValueError("管道未创建,请先调用create_*_pipeline方法")
X_transformed = self.pipeline.fit_transform(X, y)
# 尝试获取特征名称
try:
if hasattr(self.pipeline, 'get_feature_names_out'):
self.feature_names = self.pipeline.get_feature_names_out()
else:
# 对于较老版本的sklearn
self.feature_names = [f'feature_{i}' for i in range(X_transformed.shape[1])]
except:
self.feature_names = [f'feature_{i}' for i in range(X_transformed.shape[1])]
print(f"=== 管道处理结果 ===")
print(f"原始数据形状: {X.shape}")
print(f"处理后数据形状: {X_transformed.shape}")
print(f"特征数量变化: {X.shape[1]} -> {X_transformed.shape[1]}")
return X_transformed
def transform_pipeline(self, X):
"""使用已拟合的管道转换新数据"""
if self.pipeline is None:
raise ValueError("管道未创建或未拟合")
return self.pipeline.transform(X)
def get_pipeline_info(self):
"""获取管道信息"""
if self.pipeline is None:
return "管道未创建"
info = "=== 管道信息 ===\n"
if hasattr(self.pipeline, 'steps'):
for step_name, step_transformer in self.pipeline.steps:
info += f"步骤: {step_name}\n"
info += f" 转换器: {type(step_transformer).__name__}\n"
if hasattr(step_transformer, 'transformers'):
for name, transformer, features in step_transformer.transformers:
info += f" {name}: {type(transformer).__name__}\n"
info += f" 特征: {features}\n"
return info
# 演示预处理管道
pipeline_demo = PreprocessingPipeline()
# 准备特征列表
numeric_features = ['age', 'income', 'education_years']
categorical_features = ['gender', 'city']
# 创建基础管道
basic_pipeline = pipeline_demo.create_basic_pipeline(numeric_features, categorical_features)
# 使用管道处理数据
# 注意:data_encoded 已经过独热编码,不再包含原始的 gender/city 列,这里使用编码前的数据
X_processed = pipeline_demo.fit_transform_pipeline(data_no_outliers[numeric_features + categorical_features])
# 显示管道信息
print(pipeline_demo.get_pipeline_info())
# 创建高级管道配置
advanced_config = {
'numeric': {
'features': numeric_features,
'imputation': 'knn',
'scaling': 'robust'
},
'categorical': {
'features': categorical_features,
'imputation': 'mode',
'encoding': 'onehot'
},
'feature_selection': {
'method': 'univariate',
'k': 8
},
'dimensionality_reduction': {
'method': 'pca',
'n_components': 0.95
}
}
# 创建自定义管道
custom_pipeline = pipeline_demo.create_custom_pipeline(advanced_config)
# 自定义管道包含有监督的特征选择步骤,需要传入标签;这里以"收入是否高于中位数"作为示例标签
y_income_label = (data_no_outliers['income'] > data_no_outliers['income'].median()).astype(int)
X_custom_processed = pipeline_demo.fit_transform_pipeline(
data_no_outliers[numeric_features + categorical_features], y_income_label)
7. 实战案例
7.1 房价预测数据预处理
class HousePricePredictionPreprocessing:
"""房价预测数据预处理案例"""
def __init__(self):
self.pipeline = None
self.feature_info = {}
def create_house_price_dataset(self):
"""创建房价数据集"""
np.random.seed(42)
n_samples = 2000
# 基础特征
area = np.random.normal(120, 40, n_samples) # 面积
bedrooms = np.random.poisson(3, n_samples) # 卧室数
bathrooms = np.random.poisson(2, n_samples) # 浴室数
age = np.random.exponential(10, n_samples) # 房龄
# 分类特征
location = np.random.choice(['市中心', '郊区', '新区'], n_samples, p=[0.3, 0.5, 0.2])
house_type = np.random.choice(['公寓', '别墅', '联排'], n_samples, p=[0.6, 0.2, 0.2])
# 先添加异常值;缺失值在计算房价之后再注入,避免目标变量 price 出现 NaN
area[np.random.choice(n_samples, 20, replace=False)] *= 3 # 异常值
# 计算目标变量(房价)
price = (area * 8000 +
bedrooms * 50000 +
bathrooms * 30000 -
age * 2000 +
np.where(location == '市中心', 200000,
np.where(location == '新区', 100000, 0)) +
np.where(house_type == '别墅', 300000,
np.where(house_type == '联排', 100000, 0)) +
np.random.normal(0, 50000, n_samples))
# 确保价格为正
price = np.maximum(price, 100000)
# 在特征中添加缺失值
area[np.random.choice(n_samples, 100, replace=False)] = np.nan
age[np.random.choice(n_samples, 50, replace=False)] = np.nan
data = pd.DataFrame({
'area': area,
'bedrooms': bedrooms,
'bathrooms': bathrooms,
'age': age,
'location': location,
'house_type': house_type,
'price': price
})
return data
def analyze_house_data(self, data):
"""分析房价数据"""
print("=== 房价数据分析 ===")
print(f"数据形状: {data.shape}")
print(f"\n基本统计信息:")
print(data.describe())
print(f"\n缺失值情况:")
print(data.isnull().sum())
print(f"\n分类特征分布:")
for col in ['location', 'house_type']:
print(f"{col}:")
print(data[col].value_counts())
print()
# 可视化数据分布
self.visualize_house_data(data)
def visualize_house_data(self, data):
"""可视化房价数据"""
fig, axes = plt.subplots(3, 3, figsize=(18, 15))
fig.suptitle('房价数据分析', fontsize=16)
# 数值特征分布
numeric_cols = ['area', 'bedrooms', 'bathrooms', 'age', 'price']
for i, col in enumerate(numeric_cols):
row, col_idx = i // 3, i % 3
axes[row, col_idx].hist(data[col].dropna(), bins=30, alpha=0.7)
axes[row, col_idx].set_title(f'{col} 分布')
axes[row, col_idx].set_xlabel(col)
axes[row, col_idx].set_ylabel('频次')
# 分类特征分布
data['location'].value_counts().plot(kind='bar', ax=axes[1, 2])
axes[1, 2].set_title('位置分布')
axes[1, 2].tick_params(axis='x', rotation=45)
data['house_type'].value_counts().plot(kind='bar', ax=axes[2, 0])
axes[2, 0].set_title('房型分布')
axes[2, 0].tick_params(axis='x', rotation=45)
# 价格与面积关系
axes[2, 1].scatter(data['area'], data['price'], alpha=0.6)
axes[2, 1].set_title('价格 vs 面积')
axes[2, 1].set_xlabel('面积')
axes[2, 1].set_ylabel('价格')
# 不同位置的价格分布
for location in data['location'].unique():
subset = data[data['location'] == location]['price']
axes[2, 2].hist(subset, alpha=0.6, label=location, bins=20)
axes[2, 2].set_title('不同位置的价格分布')
axes[2, 2].set_xlabel('价格')
axes[2, 2].set_ylabel('频次')
axes[2, 2].legend()
plt.tight_layout()
plt.show()
def create_house_price_pipeline(self):
"""创建房价预测预处理管道"""
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import KNNImputer, SimpleImputer
from sklearn.feature_selection import SelectKBest, f_regression
# 数值特征预处理
numeric_features = ['area', 'bedrooms', 'bathrooms', 'age']
numeric_transformer = Pipeline(steps=[
('imputer', KNNImputer(n_neighbors=5)),
('scaler', StandardScaler())
])
# 分类特征预处理
categorical_features = ['location', 'house_type']
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='most_frequent')),
('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))  # sklearn 1.2+ 参数名为 sparse_output
])
# 组合预处理器
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
# 完整管道
self.pipeline = Pipeline([
('preprocessor', preprocessor),
('feature_selection', SelectKBest(f_regression, k=8))
])
return self.pipeline
def process_house_data(self, data):
"""处理房价数据"""
# 分离特征和目标
X = data.drop('price', axis=1)
y = data['price']
# 创建并应用管道
pipeline = self.create_house_price_pipeline()
X_processed = pipeline.fit_transform(X, y)
print(f"=== 房价数据预处理结果 ===")
print(f"原始特征数: {X.shape[1]}")
print(f"处理后特征数: {X_processed.shape[1]}")
# 评估预处理效果
self.evaluate_preprocessing_effect(X, y, X_processed)
return X_processed, y
def evaluate_preprocessing_effect(self, X_original, y, X_processed):
"""评估预处理效果"""
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
# 准备原始数据(简单处理)
X_simple = X_original.copy()
# 简单填充缺失值
for col in X_simple.select_dtypes(include=[np.number]).columns:
X_simple[col] = X_simple[col].fillna(X_simple[col].median())
# 简单编码分类变量
X_simple_encoded = pd.get_dummies(X_simple, drop_first=True)
# 标准化
scaler = StandardScaler()
X_simple_scaled = scaler.fit_transform(X_simple_encoded)
# 比较模型性能
models = {
'LinearRegression': LinearRegression(),
'RandomForest': RandomForestRegressor(n_estimators=100, random_state=42)
}
results = {}
for model_name, model in models.items():
# 简单预处理的结果
scores_simple = cross_val_score(model, X_simple_scaled, y, cv=5,
scoring='neg_mean_squared_error')
# 高级预处理的结果
scores_advanced = cross_val_score(model, X_processed, y, cv=5,
scoring='neg_mean_squared_error')
results[model_name] = {
'simple': -scores_simple.mean(),
'advanced': -scores_advanced.mean(),
'improvement': ((-scores_simple.mean()) - (-scores_advanced.mean())) / (-scores_simple.mean()) * 100
}
print("\n=== 预处理效果比较 (MSE) ===")
for model_name, result in results.items():
print(f"{model_name}:")
print(f" 简单预处理: {result['simple']:.2f}")
print(f" 高级预处理: {result['advanced']:.2f}")
print(f" 改进幅度: {result['improvement']:.2f}%")
print()
# 演示房价预测数据预处理
house_demo = HousePricePredictionPreprocessing()
# 创建数据
house_data = house_demo.create_house_price_dataset()
# 分析数据
house_demo.analyze_house_data(house_data)
# 处理数据
X_house_processed, y_house = house_demo.process_house_data(house_data)
本章小结
🎯 核心概念回顾
数据预处理的重要性
- 占据机器学习项目80%的时间
- 直接影响模型性能
- 包括数据清洗、转换、特征工程等
数据清洗技术
- 缺失值处理:删除、填充、插值
- 异常值检测:统计方法、机器学习方法
- 数据类型优化:内存使用、类型转换
特征缩放方法
- 标准化:零均值单位方差
- 归一化:缩放到[0,1]区间
- 鲁棒缩放:对异常值鲁棒
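三种缩放对应的数学形式(x 为原始特征值,以下为标准定义):
- 标准化:$z = \frac{x - \mu}{\sigma}$,其中 $\mu$ 为均值、$\sigma$ 为标准差
- 归一化:$x' = \frac{x - x_{\min}}{x_{\max} - x_{\min}}$
- 鲁棒缩放:$x' = \frac{x - \mathrm{median}(x)}{\mathrm{IQR}}$,其中 $\mathrm{IQR} = Q_3 - Q_1$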
特征选择技术
- 过滤法:统计检验、相关性分析
- 包装法:递归特征消除、序列选择
- 嵌入法:L1正则化、树模型重要性
降维技术
- PCA:线性降维、保持方差
- t-SNE:非线性降维、可视化
- LDA:监督降维、类别分离
💡 最佳实践
数据探索优先
- 始终先进行数据探索分析
- 理解数据分布和质量问题
- 可视化数据特征
管道化处理
- 使用Pipeline组织预处理步骤
- 确保训练和测试数据一致性
- 便于参数调优和模型部署
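一个把预处理与模型放进同一个 Pipeline、再统一调参的最小示意(参数网格中的取值仅为演示用的假设):

```python
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV

# 预处理与模型封装在同一管道中,交叉验证的每一折都只在训练部分拟合预处理器
clf_pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('select', SelectKBest(f_classif)),
    ('knn', KNeighborsClassifier())
])

# 同时调优预处理参数与模型参数(k、n_neighbors 的取值仅为示例)
param_grid = {'select__k': [5, 10, 15], 'knn__n_neighbors': [3, 5, 7]}
search = GridSearchCV(clf_pipe, param_grid, cv=5, scoring='accuracy')
search.fit(X_demo, y_demo)
print(search.best_params_, f"{search.best_score_:.3f}")
```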
方法选择指南
- 根据数据特征选择合适方法
- 考虑算法对预处理的要求
- 平衡处理效果和计算成本
⚠️ 常见陷阱
数据泄露
- 避免在划分训练/测试集之前拟合缩放器、填充器等预处理器
- 测试集只能用训练集上得到的统计量进行转换(见下方示例)
过度预处理
- 不是所有算法都需要缩放
- 避免不必要的特征变换
忽视业务逻辑
- 结合领域知识进行预处理
- 保持特征的可解释性
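避免数据泄露的最小示意:先划分数据,预处理器只在训练集上拟合(列名沿用本章示例数据,标签为演示用的假设目标):

```python
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler

# 仅演示数值列;以"收入是否高于中位数"作为假设的二分类标签
X_num = data_filled[['age', 'education_years']]
y_bin = (data_filled['income'] > data_filled['income'].median()).astype(int)

# 先划分,再拟合:缩放器只见过训练集的统计量
X_tr, X_te, y_tr, y_te = train_test_split(X_num, y_bin, test_size=0.3, random_state=42)
prep = Pipeline([('imputer', SimpleImputer(strategy='median')),
                 ('scaler', StandardScaler())])
X_tr_p = prep.fit_transform(X_tr)   # 训练集:fit + transform
X_te_p = prep.transform(X_te)       # 测试集:只 transform,避免泄露
```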
🚀 下一步学习
- 第3章:监督学习-分类算法
- 深入学习特征工程技术
- 了解自动化机器学习(AutoML)
📝 练习题
- 基础练习:使用不同的缺失值填充方法处理数据,比较效果
- 进阶练习:构建一个完整的预处理管道,包含所有主要步骤
- 实战练习:在真实数据集上应用本章学到的技术
恭喜! 🎉 您已经完成了数据预处理章节的学习。数据预处理是机器学习成功的基础,掌握这些技术将为后续的模型训练打下坚实基础。
第2章完结