2.1 数据预处理概述
数据预处理是机器学习项目中最重要的步骤之一,按业界常见的经验估计,往往占据整个项目60%~80%的时间。高质量的数据是构建优秀模型的基础,而原始数据往往存在各种问题,需要通过预处理来提高数据质量。
2.1.1 为什么需要数据预处理
原始数据的常见问题:
- 缺失值(Missing Values)
- 异常值(Outliers)
- 数据格式不一致
- 重复数据
- 噪声数据
- 数据类型错误
- 量纲不统一
数据预处理的目标:
- 提高数据质量
- 减少噪声和异常值的影响
- 统一数据格式和量纲
- 提取有用信息
- 提高模型性能
2.1.2 数据预处理流程
原始数据 → 数据清洗 → 数据集成 → 数据变换 → 数据规约 → 清洁数据
- 数据清洗:处理缺失值、异常值、噪声数据
- 数据集成:合并多个数据源
- 数据变换:标准化、归一化、离散化
- 数据规约:降维、特征选择
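为直观起见,下面用一个极简的 pandas 草图把这条流程串起来(数据与列名均为虚构,仅作示意):

import pandas as pd
import numpy as np

# 两份假设的数据源
raw = pd.DataFrame({'id': [1, 2, 2, 3], 'a': [1.0, np.nan, np.nan, 100.0]})
extra = pd.DataFrame({'id': [1, 2, 3], 'b': ['x', 'y', 'z']})

clean = raw.drop_duplicates('id')                  # 数据清洗:去除重复记录
clean = clean.fillna({'a': clean['a'].median()})   # 数据清洗:填充缺失值
merged = clean.merge(extra, on='id')               # 数据集成:合并数据源
merged['a'] = (merged['a'] - merged['a'].mean()) / merged['a'].std()  # 数据变换:标准化
reduced = merged[['id', 'a']]                      # 数据规约:只保留建模所需的列
print(reduced)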
2.2 数据质量评估
2.2.1 数据质量维度
数据质量通常从完整性、唯一性、一致性、准确性等维度来评估。下面先构造一个带有典型质量问题(缺失值、异常值)的示例数据集:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
# 创建示例数据集
np.random.seed(42)
n_samples = 1000
data = {
'age': np.random.normal(35, 10, n_samples),
'income': np.random.lognormal(10, 1, n_samples),
'education': np.random.choice(['高中', '本科', '硕士', '博士'], n_samples, p=[0.3, 0.4, 0.2, 0.1]),
'experience': np.random.normal(8, 5, n_samples),
'score': np.random.normal(75, 15, n_samples)
}
# 引入一些数据质量问题
data['age'][50:60] = np.nan # 缺失值
data['income'][100:110] = data['income'][100:110] * 10 # 异常值
data['experience'] = np.maximum(0, data['experience']) # 确保经验年数非负
data['score'] = np.clip(data['score'], 0, 100) # 确保分数在0-100之间
df = pd.DataFrame(data)
print("数据集基本信息:")
print(df.info())
print("\n数据集描述统计:")
print(df.describe())
2.2.2 数据质量检查函数
def data_quality_report(df):
"""
生成数据质量报告
"""
print("=" * 50)
print("数据质量报告")
print("=" * 50)
# 基本信息
print(f"数据集形状: {df.shape}")
print(f"内存使用: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
# 缺失值分析
print("\n缺失值分析:")
missing_data = df.isnull().sum()
missing_percent = 100 * missing_data / len(df)
missing_table = pd.DataFrame({
'缺失数量': missing_data,
'缺失百分比': missing_percent
})
print(missing_table[missing_table['缺失数量'] > 0])
# 重复值分析
duplicates = df.duplicated().sum()
print(f"\n重复行数量: {duplicates}")
# 数据类型分析
print("\n数据类型分析:")
print(df.dtypes.value_counts())
# 数值型变量的异常值检测(使用IQR方法)
numeric_cols = df.select_dtypes(include=[np.number]).columns
print("\n异常值分析(IQR方法):")
for col in numeric_cols:
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)][col]
print(f"{col}: {len(outliers)} 个异常值 ({len(outliers)/len(df)*100:.1f}%)")
return missing_table
# 生成数据质量报告
quality_report = data_quality_report(df)
2.2.3 数据分布可视化
def plot_data_distribution(df):
"""
绘制数据分布图
"""
numeric_cols = df.select_dtypes(include=[np.number]).columns
n_cols = len(numeric_cols)
fig, axes = plt.subplots(2, n_cols, figsize=(4*n_cols, 8))
for i, col in enumerate(numeric_cols):
# 直方图
axes[0, i].hist(df[col].dropna(), bins=30, alpha=0.7, edgecolor='black')
axes[0, i].set_title(f'{col} 分布')
axes[0, i].set_xlabel(col)
axes[0, i].set_ylabel('频率')
# 箱线图
axes[1, i].boxplot(df[col].dropna())
axes[1, i].set_title(f'{col} 箱线图')
axes[1, i].set_ylabel(col)
plt.tight_layout()
plt.show()
# 绘制分布图
plot_data_distribution(df)
2.3 缺失值处理
2.3.1 缺失值类型
完全随机缺失(MCAR - Missing Completely At Random):
- 缺失与任何变量都无关
- 最理想的缺失类型
- 可以安全删除或简单填充

随机缺失(MAR - Missing At Random):
- 缺失与观测到的变量有关
- 可以通过其他变量预测
- 需要更复杂的处理方法

非随机缺失(MNAR - Missing Not At Random):
- 缺失与未观测到的值有关
- 最难处理的缺失类型
- 需要领域知识
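三种缺失机制可以用一个小模拟直观区分:MCAR 按固定概率随机挖缺,而 MAR 的缺失概率依赖于另一个可观测变量(下例中的 age)。以下为示意草图,变量与参数均为假设:

rng = np.random.default_rng(0)
demo = pd.DataFrame({'income': rng.lognormal(10, 1, 1000),
                     'age': rng.normal(35, 10, 1000)})

# MCAR:缺失与任何变量无关,按固定概率随机挖缺
mcar = demo.copy()
mcar.loc[rng.random(1000) < 0.1, 'income'] = np.nan

# MAR:缺失概率随可观测的 age 上升(年龄越大越不愿填收入)
mar = demo.copy()
p_missing = 1 / (1 + np.exp(-(demo['age'] - 45) / 5))
mar.loc[rng.random(1000) < p_missing, 'income'] = np.nan

print('MCAR 缺失率:', mcar['income'].isna().mean())
print('MAR 缺失率:', mar['income'].isna().mean())
# MAR 下,缺失组与非缺失组的 age 均值明显不同,而 MCAR 下几乎一致
print(mar.groupby(mar['income'].isna())['age'].mean())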
2.3.2 缺失值检测
def missing_value_analysis(df):
"""
缺失值分析
"""
# 缺失值统计
missing_stats = pd.DataFrame({
'缺失数量': df.isnull().sum(),
'缺失百分比': df.isnull().sum() / len(df) * 100,
'数据类型': df.dtypes
})
missing_stats = missing_stats[missing_stats['缺失数量'] > 0].sort_values('缺失百分比', ascending=False)
# 缺失值模式可视化
plt.figure(figsize=(12, 8))
# 缺失值热图
plt.subplot(2, 2, 1)
sns.heatmap(df.isnull(), cbar=True, yticklabels=False, cmap='viridis')
plt.title('缺失值模式')
# 缺失值条形图
plt.subplot(2, 2, 2)
missing_stats['缺失百分比'].plot(kind='bar')
plt.title('各变量缺失百分比')
plt.ylabel('缺失百分比 (%)')
plt.xticks(rotation=45)
# 缺失值组合模式
plt.subplot(2, 2, 3)
missing_combinations = df.isnull().value_counts()
missing_combinations.head(10).plot(kind='bar')
plt.title('缺失值组合模式(前10)')
plt.ylabel('频次')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
return missing_stats
# 分析缺失值
missing_analysis = missing_value_analysis(df)
print(missing_analysis)
2.3.3 缺失值处理方法
删除法
# 1. 删除包含缺失值的行
df_dropna_rows = df.dropna()
print(f"删除缺失行后数据形状: {df_dropna_rows.shape}")
# 2. 删除包含缺失值的列
df_dropna_cols = df.dropna(axis=1)
print(f"删除缺失列后数据形状: {df_dropna_cols.shape}")
# 3. 删除缺失值超过阈值的行/列
threshold = 0.8 # 保留至少80%非缺失值的行
df_threshold = df.dropna(thresh=int(threshold * len(df.columns)))
print(f"阈值删除后数据形状: {df_threshold.shape}")
填充法
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
# 1. 简单填充
class SimpleImputerMethods:
def __init__(self, df):
self.df = df.copy()
    def mean_imputation(self, columns=None):
        """均值填充"""
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        for col in columns:
            self.df[col] = self.df[col].fillna(self.df[col].mean())
        return self.df

    def median_imputation(self, columns=None):
        """中位数填充"""
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        for col in columns:
            self.df[col] = self.df[col].fillna(self.df[col].median())
        return self.df

    def mode_imputation(self, columns=None):
        """众数填充"""
        if columns is None:
            columns = self.df.select_dtypes(include=['object']).columns
        for col in columns:
            mode_value = self.df[col].mode()
            if len(mode_value) > 0:
                self.df[col] = self.df[col].fillna(mode_value[0])
        return self.df

    def forward_fill(self, columns=None):
        """前向填充(fillna(method=...) 已在新版 pandas 中弃用,改用 ffill)"""
        if columns is None:
            columns = self.df.columns
        for col in columns:
            self.df[col] = self.df[col].ffill()
        return self.df

    def backward_fill(self, columns=None):
        """后向填充(用后一个有效值填充)"""
        if columns is None:
            columns = self.df.columns
        for col in columns:
            self.df[col] = self.df[col].bfill()
        return self.df
# 使用简单填充方法
imputer = SimpleImputerMethods(df)
df_mean_imputed = imputer.mean_imputation(['age', 'income', 'experience', 'score'])
print("均值填充后的缺失值:")
print(df_mean_imputed.isnull().sum())
高级填充方法
# 2. KNN填充
def knn_imputation(df, n_neighbors=5):
"""
KNN填充
"""
# 分离数值型和类别型变量
numeric_cols = df.select_dtypes(include=[np.number]).columns
categorical_cols = df.select_dtypes(include=['object']).columns
df_imputed = df.copy()
# 对数值型变量使用KNN填充
if len(numeric_cols) > 0:
knn_imputer = KNNImputer(n_neighbors=n_neighbors)
df_imputed[numeric_cols] = knn_imputer.fit_transform(df[numeric_cols])
# 对类别型变量使用众数填充
    for col in categorical_cols:
        mode_value = df[col].mode()
        if len(mode_value) > 0:
            df_imputed[col] = df_imputed[col].fillna(mode_value[0])
return df_imputed
# 使用KNN填充
df_knn_imputed = knn_imputation(df)
print("KNN填充后的缺失值:")
print(df_knn_imputed.isnull().sum())
# 3. 迭代填充(MICE)
def mice_imputation(df, max_iter=10, random_state=42):
"""
MICE(多重插补链式方程)填充
"""
# 只对数值型变量使用MICE
numeric_cols = df.select_dtypes(include=[np.number]).columns
categorical_cols = df.select_dtypes(include=['object']).columns
df_imputed = df.copy()
if len(numeric_cols) > 0:
mice_imputer = IterativeImputer(max_iter=max_iter, random_state=random_state)
df_imputed[numeric_cols] = mice_imputer.fit_transform(df[numeric_cols])
# 类别型变量使用众数填充
    for col in categorical_cols:
        mode_value = df[col].mode()
        if len(mode_value) > 0:
            df_imputed[col] = df_imputed[col].fillna(mode_value[0])
return df_imputed
# 使用MICE填充
df_mice_imputed = mice_imputation(df)
print("MICE填充后的缺失值:")
print(df_mice_imputed.isnull().sum())
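需要注意,IterativeImputer 默认只输出一组插补值;严格意义上的多重插补会生成多份插补数据集来反映不确定性。下面是一个基于 sample_posterior=True 的草图(沿用上文的 df 与导入,仅作示意):

def multiple_imputation(df, m=5, max_iter=10):
    """生成 m 份插补数据集,体现插补的不确定性"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    imputed_sets = []
    for i in range(m):
        imp = IterativeImputer(max_iter=max_iter,
                               sample_posterior=True,  # 从后验分布采样,而非只取均值
                               random_state=i)
        df_i = df.copy()
        df_i[numeric_cols] = imp.fit_transform(df[numeric_cols])
        imputed_sets.append(df_i)
    return imputed_sets

imputed_sets = multiple_imputation(df, m=5)
# 各份插补结果在缺失位置上的差异,反映了估计的不确定性
print(pd.concat([s['age'] for s in imputed_sets], axis=1).std(axis=1).describe())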
填充效果比较
def compare_imputation_methods(df_original, methods_dict):
"""
比较不同填充方法的效果
"""
# 创建人工缺失值用于测试
df_test = df_original.copy()
# 随机设置20%的值为缺失
np.random.seed(42)
for col in df_test.select_dtypes(include=[np.number]).columns:
missing_idx = np.random.choice(df_test.index, size=int(0.2*len(df_test)), replace=False)
df_test.loc[missing_idx, col] = np.nan
results = {}
for method_name, df_imputed in methods_dict.items():
# 计算填充值与原始值的差异
mse_scores = []
for col in df_original.select_dtypes(include=[np.number]).columns:
original_values = df_original[col]
imputed_values = df_imputed[col]
# 只比较原本有值但被人工设为缺失的位置
mask = df_test[col].isnull() & df_original[col].notnull()
if mask.sum() > 0:
mse = np.mean((original_values[mask] - imputed_values[mask]) ** 2)
mse_scores.append(mse)
results[method_name] = np.mean(mse_scores) if mse_scores else np.nan
return results
# 比较填充方法
methods = {
'均值填充': df_mean_imputed,
'KNN填充': df_knn_imputed,
'MICE填充': df_mice_imputed
}
# 注意:这种比较需要一份完整的"真值"数据,且各方法必须在人工挖缺后的数据上重新填充;
# 实际项目中往往没有完整真值(可参考下面的可运行草图)
# comparison_results = compare_imputation_methods(df_complete, methods)
# print("填充方法比较(MSE越小越好):")
# for method, mse in comparison_results.items():
# print(f"{method}: {mse:.4f}")
2.4 异常值检测与处理
2.4.1 异常值检测方法
统计方法
class OutlierDetector:
def __init__(self, df):
self.df = df.copy()
def z_score_detection(self, columns=None, threshold=3):
"""
Z-Score方法检测异常值
"""
if columns is None:
columns = self.df.select_dtypes(include=[np.number]).columns
outliers = {}
for col in columns:
z_scores = np.abs(stats.zscore(self.df[col].dropna()))
outlier_indices = self.df[col].dropna().index[z_scores > threshold]
outliers[col] = outlier_indices.tolist()
return outliers
def iqr_detection(self, columns=None, factor=1.5):
"""
IQR方法检测异常值
"""
if columns is None:
columns = self.df.select_dtypes(include=[np.number]).columns
outliers = {}
for col in columns:
Q1 = self.df[col].quantile(0.25)
Q3 = self.df[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - factor * IQR
upper_bound = Q3 + factor * IQR
outlier_mask = (self.df[col] < lower_bound) | (self.df[col] > upper_bound)
outliers[col] = self.df[outlier_mask].index.tolist()
return outliers
def modified_z_score_detection(self, columns=None, threshold=3.5):
"""
修正Z-Score方法(基于中位数)
"""
if columns is None:
columns = self.df.select_dtypes(include=[np.number]).columns
outliers = {}
for col in columns:
median = self.df[col].median()
mad = np.median(np.abs(self.df[col] - median))
if mad == 0:
mad = np.std(self.df[col])
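            # 0.6745 ≈ 标准正态分布的0.75分位数,乘以它可使 MAD 在正态假设下与标准差可比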
modified_z_scores = 0.6745 * (self.df[col] - median) / mad
outlier_mask = np.abs(modified_z_scores) > threshold
outliers[col] = self.df[outlier_mask].index.tolist()
return outliers
# 使用异常值检测
detector = OutlierDetector(df)
# Z-Score检测
z_outliers = detector.z_score_detection()
print("Z-Score异常值检测结果:")
for col, indices in z_outliers.items():
print(f"{col}: {len(indices)} 个异常值")
# IQR检测
iqr_outliers = detector.iqr_detection()
print("\nIQR异常值检测结果:")
for col, indices in iqr_outliers.items():
print(f"{col}: {len(indices)} 个异常值")
# 修正Z-Score检测
modified_z_outliers = detector.modified_z_score_detection()
print("\n修正Z-Score异常值检测结果:")
for col, indices in modified_z_outliers.items():
print(f"{col}: {len(indices)} 个异常值")
机器学习方法
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
def ml_outlier_detection(df, method='isolation_forest', contamination=0.1):
"""
使用机器学习方法检测异常值
"""
# 只使用数值型特征
numeric_cols = df.select_dtypes(include=[np.number]).columns
X = df[numeric_cols].dropna()
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
if method == 'isolation_forest':
detector = IsolationForest(contamination=contamination, random_state=42)
elif method == 'lof':
detector = LocalOutlierFactor(contamination=contamination)
elif method == 'one_class_svm':
detector = OneClassSVM(nu=contamination)
else:
raise ValueError("Unsupported method")
    # 检测异常值(三种检测器的 fit_predict 都返回 1/-1 标签,-1 表示异常)
    outlier_labels = detector.fit_predict(X_scaled)
# 返回异常值索引
outlier_indices = X.index[outlier_labels == -1].tolist()
return outlier_indices, detector
# 使用不同的机器学习方法检测异常值
methods = ['isolation_forest', 'lof', 'one_class_svm']
for method in methods:
outlier_indices, model = ml_outlier_detection(df, method=method)
print(f"{method.upper()} 检测到 {len(outlier_indices)} 个异常值")
2.4.2 异常值可视化
def visualize_outliers(df, outlier_indices, method_name):
"""
可视化异常值
"""
numeric_cols = df.select_dtypes(include=[np.number]).columns
n_cols = len(numeric_cols)
fig, axes = plt.subplots(2, n_cols, figsize=(4*n_cols, 8))
for i, col in enumerate(numeric_cols):
# 散点图
axes[0, i].scatter(df.index, df[col], alpha=0.6, label='正常值')
axes[0, i].scatter(outlier_indices, df.loc[outlier_indices, col],
color='red', alpha=0.8, label='异常值')
axes[0, i].set_title(f'{col} - {method_name}')
axes[0, i].set_xlabel('索引')
axes[0, i].set_ylabel(col)
axes[0, i].legend()
# 箱线图
box_data = [df[col].dropna(), df.loc[outlier_indices, col].dropna()]
axes[1, i].boxplot(box_data, labels=['全部数据', '异常值'])
axes[1, i].set_title(f'{col} 箱线图')
axes[1, i].set_ylabel(col)
plt.tight_layout()
plt.show()
# 可视化IQR方法检测的异常值
all_iqr_outliers = set()
for indices in iqr_outliers.values():
all_iqr_outliers.update(indices)
visualize_outliers(df, list(all_iqr_outliers), 'IQR方法')
2.4.3 异常值处理方法
class OutlierHandler:
def __init__(self, df):
self.df = df.copy()
def remove_outliers(self, outlier_indices):
"""
删除异常值
"""
return self.df.drop(outlier_indices)
def cap_outliers(self, columns=None, method='iqr', factor=1.5):
"""
截断异常值(Winsorization)
"""
if columns is None:
columns = self.df.select_dtypes(include=[np.number]).columns
df_capped = self.df.copy()
for col in columns:
if method == 'iqr':
Q1 = df_capped[col].quantile(0.25)
Q3 = df_capped[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - factor * IQR
upper_bound = Q3 + factor * IQR
elif method == 'percentile':
lower_bound = df_capped[col].quantile(0.05)
upper_bound = df_capped[col].quantile(0.95)
df_capped[col] = np.clip(df_capped[col], lower_bound, upper_bound)
return df_capped
def transform_outliers(self, columns=None, method='log'):
"""
变换异常值
"""
if columns is None:
columns = self.df.select_dtypes(include=[np.number]).columns
df_transformed = self.df.copy()
for col in columns:
if method == 'log':
# 确保所有值为正
min_val = df_transformed[col].min()
if min_val <= 0:
df_transformed[col] = df_transformed[col] - min_val + 1
df_transformed[col] = np.log(df_transformed[col])
elif method == 'sqrt':
# 确保所有值非负
df_transformed[col] = np.sqrt(np.abs(df_transformed[col]))
elif method == 'box_cox':
from scipy.stats import boxcox
# 确保所有值为正
min_val = df_transformed[col].min()
if min_val <= 0:
df_transformed[col] = df_transformed[col] - min_val + 1
df_transformed[col], _ = boxcox(df_transformed[col])
return df_transformed
def replace_outliers(self, outlier_indices, method='median'):
"""
替换异常值
"""
df_replaced = self.df.copy()
for col in self.df.select_dtypes(include=[np.number]).columns:
col_outliers = [idx for idx in outlier_indices if idx in df_replaced.index]
if method == 'median':
replacement_value = df_replaced[col].median()
elif method == 'mean':
replacement_value = df_replaced[col].mean()
elif method == 'mode':
replacement_value = df_replaced[col].mode()[0]
df_replaced.loc[col_outliers, col] = replacement_value
return df_replaced
# 使用异常值处理方法
handler = OutlierHandler(df)
# 1. 截断异常值
df_capped = handler.cap_outliers(method='iqr')
print("截断后的数据统计:")
print(df_capped.describe())
# 2. 对数变换
df_log_transformed = handler.transform_outliers(columns=['income'], method='log')
print("\n对数变换后的收入统计:")
print(df_log_transformed['income'].describe())
# 3. 替换异常值
all_outliers = list(all_iqr_outliers)
df_replaced = handler.replace_outliers(all_outliers, method='median')
print("\n替换异常值后的数据统计:")
print(df_replaced.describe())
2.5 数据标准化与归一化
2.5.1 为什么需要数据标准化
不同特征的量纲和数值范围可能差异很大,这会影响机器学习算法的性能(下方给出一个直观的小实验):
- 距离敏感算法:KNN、K-Means、SVM等
- 梯度下降算法:神经网络、线性回归等
- 正则化算法:Ridge、Lasso等
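下面用一个小实验直观感受量纲对距离敏感算法的影响:第二个特征被放大1000倍后会主导欧氏距离,标准化后两个特征重新获得同等权重(合成数据,结果仅作示意):

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler

# 构造两个量纲悬殊的特征
X_toy, y_toy = make_classification(n_samples=500, n_features=2, n_informative=2,
                                   n_redundant=0, random_state=42)
X_toy[:, 1] *= 1000
X_tr, X_te, y_tr, y_te = train_test_split(X_toy, y_toy, test_size=0.3, random_state=42)

knn_raw = KNeighborsClassifier().fit(X_tr, y_tr)
print(f"未标准化 KNN 准确率: {knn_raw.score(X_te, y_te):.3f}")

scaler_toy = StandardScaler().fit(X_tr)   # 只在训练集上拟合,避免信息泄漏
knn_scaled = KNeighborsClassifier().fit(scaler_toy.transform(X_tr), y_tr)
print(f"标准化后 KNN 准确率: {knn_scaled.score(scaler_toy.transform(X_te), y_te):.3f}")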
2.5.2 标准化方法
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, Normalizer
from sklearn.preprocessing import QuantileTransformer, PowerTransformer
class DataScaler:
def __init__(self, df):
self.df = df.copy()
self.scalers = {}
def standard_scaling(self, columns=None):
"""
Z-Score标准化:(x - μ) / σ
"""
if columns is None:
columns = self.df.select_dtypes(include=[np.number]).columns
scaler = StandardScaler()
df_scaled = self.df.copy()
df_scaled[columns] = scaler.fit_transform(self.df[columns])
self.scalers['standard'] = scaler
return df_scaled
def min_max_scaling(self, columns=None, feature_range=(0, 1)):
"""
最小-最大标准化:(x - min) / (max - min)
"""
if columns is None:
columns = self.df.select_dtypes(include=[np.number]).columns
scaler = MinMaxScaler(feature_range=feature_range)
df_scaled = self.df.copy()
df_scaled[columns] = scaler.fit_transform(self.df[columns])
self.scalers['minmax'] = scaler
return df_scaled
def robust_scaling(self, columns=None):
"""
鲁棒标准化:(x - median) / IQR
"""
if columns is None:
columns = self.df.select_dtypes(include=[np.number]).columns
scaler = RobustScaler()
df_scaled = self.df.copy()
df_scaled[columns] = scaler.fit_transform(self.df[columns])
self.scalers['robust'] = scaler
return df_scaled
def unit_vector_scaling(self, columns=None):
"""
单位向量标准化:x / ||x||
"""
if columns is None:
columns = self.df.select_dtypes(include=[np.number]).columns
scaler = Normalizer()
df_scaled = self.df.copy()
df_scaled[columns] = scaler.fit_transform(self.df[columns])
self.scalers['normalizer'] = scaler
return df_scaled
def quantile_scaling(self, columns=None, n_quantiles=1000):
"""
分位数标准化
"""
if columns is None:
columns = self.df.select_dtypes(include=[np.number]).columns
scaler = QuantileTransformer(n_quantiles=n_quantiles, random_state=42)
df_scaled = self.df.copy()
df_scaled[columns] = scaler.fit_transform(self.df[columns])
self.scalers['quantile'] = scaler
return df_scaled
def power_transform(self, columns=None, method='yeo-johnson'):
"""
幂变换
"""
if columns is None:
columns = self.df.select_dtypes(include=[np.number]).columns
scaler = PowerTransformer(method=method, standardize=True)
df_scaled = self.df.copy()
df_scaled[columns] = scaler.fit_transform(self.df[columns])
self.scalers['power'] = scaler
return df_scaled
# 使用不同的标准化方法
scaler = DataScaler(df.dropna())
# 标准化
df_standard = scaler.standard_scaling()
df_minmax = scaler.min_max_scaling()
df_robust = scaler.robust_scaling()
df_quantile = scaler.quantile_scaling()
df_power = scaler.power_transform()
print("原始数据统计:")
print(df.describe())
print("\nZ-Score标准化后:")
print(df_standard.describe())
print("\nMin-Max标准化后:")
print(df_minmax.describe())
2.5.3 标准化效果比较
def compare_scaling_methods(df_original, scaling_results):
"""
比较不同标准化方法的效果
"""
numeric_cols = df_original.select_dtypes(include=[np.number]).columns
n_methods = len(scaling_results)
n_cols = len(numeric_cols)
fig, axes = plt.subplots(n_methods, n_cols, figsize=(4*n_cols, 3*n_methods))
for i, (method_name, df_scaled) in enumerate(scaling_results.items()):
for j, col in enumerate(numeric_cols):
if n_methods == 1:
ax = axes[j]
else:
ax = axes[i, j]
ax.hist(df_scaled[col].dropna(), bins=30, alpha=0.7, edgecolor='black')
ax.set_title(f'{method_name} - {col}')
ax.set_xlabel(col)
ax.set_ylabel('频率')
plt.tight_layout()
plt.show()
# 比较标准化效果
scaling_results = {
'原始数据': df.dropna(),
'Z-Score': df_standard,
'Min-Max': df_minmax,
'Robust': df_robust,
'Quantile': df_quantile
}
compare_scaling_methods(df.dropna(), scaling_results)
2.6 特征工程
2.6.1 特征选择
过滤法(Filter Methods)
from sklearn.feature_selection import SelectKBest, chi2, f_classif, mutual_info_classif
from sklearn.feature_selection import VarianceThreshold
from scipy.stats import pearsonr
class FeatureSelector:
def __init__(self, X, y=None):
self.X = X
self.y = y
self.selected_features = {}
def variance_threshold_selection(self, threshold=0.0):
"""
方差阈值选择
"""
selector = VarianceThreshold(threshold=threshold)
X_selected = selector.fit_transform(self.X)
selected_features = self.X.columns[selector.get_support()].tolist()
self.selected_features['variance_threshold'] = selected_features
return X_selected, selected_features
def correlation_selection(self, threshold=0.95):
"""
相关性选择(去除高相关特征)
"""
corr_matrix = self.X.corr().abs()
upper_triangle = corr_matrix.where(
np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
)
# 找到高相关的特征对
high_corr_features = [column for column in upper_triangle.columns
if any(upper_triangle[column] > threshold)]
selected_features = [col for col in self.X.columns if col not in high_corr_features]
self.selected_features['correlation'] = selected_features
return self.X[selected_features], selected_features
def univariate_selection(self, score_func=f_classif, k=10):
"""
单变量特征选择
"""
if self.y is None:
raise ValueError("需要目标变量进行单变量选择")
selector = SelectKBest(score_func=score_func, k=k)
X_selected = selector.fit_transform(self.X, self.y)
selected_features = self.X.columns[selector.get_support()].tolist()
feature_scores = selector.scores_
self.selected_features['univariate'] = selected_features
return X_selected, selected_features, feature_scores
def mutual_info_selection(self, k=10):
"""
互信息特征选择
"""
if self.y is None:
raise ValueError("需要目标变量进行互信息选择")
selector = SelectKBest(score_func=mutual_info_classif, k=k)
X_selected = selector.fit_transform(self.X, self.y)
selected_features = self.X.columns[selector.get_support()].tolist()
feature_scores = selector.scores_
self.selected_features['mutual_info'] = selected_features
return X_selected, selected_features, feature_scores
# 创建示例分类数据
from sklearn.datasets import make_classification
X_demo, y_demo = make_classification(n_samples=1000, n_features=20, n_informative=10,
n_redundant=5, n_clusters_per_class=1, random_state=42)
X_demo_df = pd.DataFrame(X_demo, columns=[f'feature_{i}' for i in range(20)])
# 使用特征选择
selector = FeatureSelector(X_demo_df, y_demo)
# 方差阈值选择
X_var, features_var = selector.variance_threshold_selection(threshold=0.1)
print(f"方差阈值选择后的特征数量: {len(features_var)}")
# 相关性选择
X_corr, features_corr = selector.correlation_selection(threshold=0.9)
print(f"相关性选择后的特征数量: {len(features_corr)}")
# 单变量选择
X_uni, features_uni, scores_uni = selector.univariate_selection(k=10)
print(f"单变量选择后的特征数量: {len(features_uni)}")
# 互信息选择
X_mi, features_mi, scores_mi = selector.mutual_info_selection(k=10)
print(f"互信息选择后的特征数量: {len(features_mi)}")
包装法(Wrapper Methods)
from sklearn.feature_selection import RFE, RFECV
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold
def recursive_feature_elimination(X, y, estimator=None, n_features=10):
"""
递归特征消除
"""
if estimator is None:
estimator = LogisticRegression(random_state=42)
# RFE
rfe = RFE(estimator=estimator, n_features_to_select=n_features)
X_rfe = rfe.fit_transform(X, y)
selected_features = X.columns[rfe.support_].tolist()
feature_ranking = rfe.ranking_
return X_rfe, selected_features, feature_ranking
def recursive_feature_elimination_cv(X, y, estimator=None, cv=5):
"""
交叉验证递归特征消除
"""
if estimator is None:
estimator = LogisticRegression(random_state=42)
# RFECV
rfecv = RFECV(estimator=estimator, cv=StratifiedKFold(cv), scoring='accuracy')
X_rfecv = rfecv.fit_transform(X, y)
selected_features = X.columns[rfecv.support_].tolist()
optimal_features = rfecv.n_features_
return X_rfecv, selected_features, optimal_features
# 使用包装法
X_rfe, features_rfe, ranking_rfe = recursive_feature_elimination(X_demo_df, y_demo)
print(f"RFE选择的特征: {features_rfe}")
X_rfecv, features_rfecv, optimal_n = recursive_feature_elimination_cv(X_demo_df, y_demo)
print(f"RFECV选择的特征数量: {optimal_n}")
print(f"RFECV选择的特征: {features_rfecv}")
嵌入法(Embedded Methods)
from sklearn.linear_model import Lasso, Ridge
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectFromModel
def lasso_feature_selection(X, y, alpha=0.01):
"""
Lasso特征选择
"""
lasso = Lasso(alpha=alpha, random_state=42)
selector = SelectFromModel(lasso)
X_lasso = selector.fit_transform(X, y)
selected_features = X.columns[selector.get_support()].tolist()
return X_lasso, selected_features
def random_forest_feature_selection(X, y, threshold='median'):
"""
随机森林特征选择
"""
rf = RandomForestClassifier(n_estimators=100, random_state=42)
selector = SelectFromModel(rf, threshold=threshold)
X_rf = selector.fit_transform(X, y)
selected_features = X.columns[selector.get_support()].tolist()
# 获取特征重要性
rf.fit(X, y)
feature_importance = rf.feature_importances_
return X_rf, selected_features, feature_importance
# 使用嵌入法
X_lasso, features_lasso = lasso_feature_selection(X_demo_df, y_demo)
print(f"Lasso选择的特征数量: {len(features_lasso)}")
X_rf, features_rf, importance_rf = random_forest_feature_selection(X_demo_df, y_demo)
print(f"随机森林选择的特征数量: {len(features_rf)}")
# 可视化特征重要性
plt.figure(figsize=(12, 6))
feature_names = [f'feature_{i}' for i in range(20)]
indices = np.argsort(importance_rf)[::-1]
plt.bar(range(len(importance_rf)), importance_rf[indices])
plt.xticks(range(len(importance_rf)), [feature_names[i] for i in indices], rotation=45)
plt.title('随机森林特征重要性')
plt.xlabel('特征')
plt.ylabel('重要性')
plt.tight_layout()
plt.show()
2.6.2 特征创建
数值特征创建
class FeatureCreator:
def __init__(self, df):
self.df = df.copy()
def create_polynomial_features(self, columns, degree=2):
"""
创建多项式特征
"""
from sklearn.preprocessing import PolynomialFeatures
poly = PolynomialFeatures(degree=degree, include_bias=False)
poly_features = poly.fit_transform(self.df[columns])
# 创建特征名称
feature_names = poly.get_feature_names_out(columns)
# 添加到数据框
poly_df = pd.DataFrame(poly_features, columns=feature_names, index=self.df.index)
return pd.concat([self.df, poly_df], axis=1)
def create_interaction_features(self, column_pairs):
"""
创建交互特征
"""
df_new = self.df.copy()
for col1, col2 in column_pairs:
if col1 in df_new.columns and col2 in df_new.columns:
# 乘积交互
df_new[f'{col1}_x_{col2}'] = df_new[col1] * df_new[col2]
# 比值交互(避免除零)
df_new[f'{col1}_div_{col2}'] = df_new[col1] / (df_new[col2] + 1e-8)
# 差值交互
df_new[f'{col1}_minus_{col2}'] = df_new[col1] - df_new[col2]
# 和值交互
df_new[f'{col1}_plus_{col2}'] = df_new[col1] + df_new[col2]
return df_new
def create_binning_features(self, column, n_bins=5, strategy='quantile'):
"""
创建分箱特征
"""
from sklearn.preprocessing import KBinsDiscretizer
df_new = self.df.copy()
discretizer = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy=strategy)
binned_feature = discretizer.fit_transform(df_new[[column]])
df_new[f'{column}_binned'] = binned_feature.flatten()
return df_new
def create_aggregation_features(self, group_column, agg_columns, agg_functions=['mean', 'std', 'min', 'max']):
"""
创建聚合特征
"""
df_new = self.df.copy()
for agg_col in agg_columns:
for func in agg_functions:
agg_feature = df_new.groupby(group_column)[agg_col].transform(func)
df_new[f'{agg_col}_{func}_by_{group_column}'] = agg_feature
return df_new
def create_time_features(self, date_column):
"""
创建时间特征
"""
df_new = self.df.copy()
# 确保是datetime类型
df_new[date_column] = pd.to_datetime(df_new[date_column])
# 提取时间特征
df_new[f'{date_column}_year'] = df_new[date_column].dt.year
df_new[f'{date_column}_month'] = df_new[date_column].dt.month
df_new[f'{date_column}_day'] = df_new[date_column].dt.day
df_new[f'{date_column}_dayofweek'] = df_new[date_column].dt.dayofweek
df_new[f'{date_column}_quarter'] = df_new[date_column].dt.quarter
df_new[f'{date_column}_is_weekend'] = (df_new[date_column].dt.dayofweek >= 5).astype(int)
return df_new
# 使用特征创建
creator = FeatureCreator(df.dropna())
# 创建交互特征
interaction_pairs = [('age', 'experience'), ('income', 'age')]
df_with_interactions = creator.create_interaction_features(interaction_pairs)
print(f"添加交互特征后的列数: {df_with_interactions.shape[1]}")
# 创建分箱特征
df_with_binning = creator.create_binning_features('age', n_bins=5)
print(f"年龄分箱: {df_with_binning['age_binned'].value_counts().sort_index()}")
# 创建聚合特征
df_with_agg = creator.create_aggregation_features('education', ['age', 'income'])
print(f"添加聚合特征后的列数: {df_with_agg.shape[1]}")
类别特征编码
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, OrdinalEncoder
from sklearn.preprocessing import TargetEncoder  # 需要 scikit-learn >= 1.3;本节示例自行实现简化版目标编码
class CategoricalEncoder:
def __init__(self, df):
self.df = df.copy()
self.encoders = {}
def label_encoding(self, columns):
"""
标签编码
"""
df_encoded = self.df.copy()
for col in columns:
if col in df_encoded.columns:
le = LabelEncoder()
df_encoded[f'{col}_label'] = le.fit_transform(df_encoded[col].astype(str))
self.encoders[f'{col}_label'] = le
return df_encoded
def one_hot_encoding(self, columns, drop_first=True):
"""
独热编码
"""
df_encoded = self.df.copy()
for col in columns:
if col in df_encoded.columns:
# 使用pandas的get_dummies
dummies = pd.get_dummies(df_encoded[col], prefix=col, drop_first=drop_first)
df_encoded = pd.concat([df_encoded, dummies], axis=1)
return df_encoded
def ordinal_encoding(self, columns, categories=None):
"""
序数编码
"""
df_encoded = self.df.copy()
for col in columns:
if col in df_encoded.columns:
oe = OrdinalEncoder(categories=categories)
df_encoded[f'{col}_ordinal'] = oe.fit_transform(df_encoded[[col]])
self.encoders[f'{col}_ordinal'] = oe
return df_encoded
def target_encoding(self, columns, target):
"""
        目标编码(注意:直接用全量目标均值会混入样本自身标签,存在目标泄漏风险;更稳妥的 K 折做法见下文草图)
"""
df_encoded = self.df.copy()
for col in columns:
if col in df_encoded.columns:
# 计算每个类别的目标变量均值
target_mean = df_encoded.groupby(col)[target].mean()
df_encoded[f'{col}_target'] = df_encoded[col].map(target_mean)
return df_encoded
def frequency_encoding(self, columns):
"""
频率编码
"""
df_encoded = self.df.copy()
for col in columns:
if col in df_encoded.columns:
freq_map = df_encoded[col].value_counts().to_dict()
df_encoded[f'{col}_freq'] = df_encoded[col].map(freq_map)
return df_encoded
# 使用类别特征编码
encoder = CategoricalEncoder(df.dropna())
# 标签编码
df_label = encoder.label_encoding(['education'])
print("标签编码结果:")
print(df_label[['education', 'education_label']].head())
# 独热编码
df_onehot = encoder.one_hot_encoding(['education'])
print(f"\n独热编码后的列数: {df_onehot.shape[1]}")
print("独热编码列名:")
print([col for col in df_onehot.columns if 'education_' in col])
# 频率编码
df_freq = encoder.frequency_encoding(['education'])
print("\n频率编码结果:")
print(df_freq[['education', 'education_freq']].head())
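上面的 target_encoding 直接用全量目标均值,编码中混入了样本自身的标签信息,存在目标泄漏风险。下面给出一个基于 K 折的目标编码草图:每折的编码只由其余折的数据计算。其中 target_demo 是为演示而人工构造的目标列(示意实现,非库函数):

from sklearn.model_selection import KFold

def kfold_target_encoding(df, col, target, n_splits=5, seed=42):
    """K折目标编码:每折的编码只由其余折的数据计算,降低目标泄漏"""
    encoded = pd.Series(np.nan, index=df.index)
    global_mean = df[target].mean()
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
    for fit_idx, enc_idx in kf.split(df):
        fold_means = df.iloc[fit_idx].groupby(col)[target].mean()
        encoded.iloc[enc_idx] = df.iloc[enc_idx][col].map(fold_means).values
    return encoded.fillna(global_mean)  # 训练折中未出现的类别回退到全局均值

df_te = df.dropna().reset_index(drop=True)
df_te['target_demo'] = (df_te['score'] > df_te['score'].median()).astype(int)  # 人工构造的演示目标
df_te['education_te'] = kfold_target_encoding(df_te, 'education', 'target_demo')
print(df_te[['education', 'education_te']].head())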
2.7 数据预处理管道
2.7.1 使用sklearn Pipeline
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.base import BaseEstimator, TransformerMixin
# 自定义变换器
class OutlierRemover(BaseEstimator, TransformerMixin):
def __init__(self, factor=1.5):
self.factor = factor
def fit(self, X, y=None):
Q1 = np.percentile(X, 25, axis=0)
Q3 = np.percentile(X, 75, axis=0)
IQR = Q3 - Q1
self.lower_bound = Q1 - self.factor * IQR
self.upper_bound = Q3 + self.factor * IQR
return self
def transform(self, X):
X_transformed = X.copy()
for i in range(X.shape[1]):
X_transformed[:, i] = np.clip(X_transformed[:, i],
self.lower_bound[i],
self.upper_bound[i])
return X_transformed
class FeatureCreatorTransformer(BaseEstimator, TransformerMixin):
def __init__(self, create_interactions=True):
self.create_interactions = create_interactions
def fit(self, X, y=None):
return self
def transform(self, X):
X_df = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X
if self.create_interactions and X_df.shape[1] >= 2:
# 创建前两个特征的交互项
interaction = X_df.iloc[:, 0] * X_df.iloc[:, 1]
X_df['interaction_0_1'] = interaction
return X_df.values
# 创建预处理管道
def create_preprocessing_pipeline():
# 数值特征管道
numeric_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('outlier_remover', OutlierRemover(factor=1.5)),
('scaler', StandardScaler())
])
# 类别特征管道
categorical_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore'))  # sklearn>=1.2 中 sparse 参数已更名为 sparse_output
])
return numeric_pipeline, categorical_pipeline
# 使用ColumnTransformer组合管道
def create_full_pipeline(numeric_features, categorical_features):
numeric_pipeline, categorical_pipeline = create_preprocessing_pipeline()
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_pipeline, numeric_features),
('cat', categorical_pipeline, categorical_features)
]
)
# 完整管道(包括特征创建)
full_pipeline = Pipeline([
('preprocessor', preprocessor),
('feature_creator', FeatureCreatorTransformer())
])
return full_pipeline
# 示例使用
numeric_features = ['age', 'income', 'experience', 'score']
categorical_features = ['education']
full_pipeline = create_full_pipeline(numeric_features, categorical_features)
# 拟合和变换数据
X_processed = full_pipeline.fit_transform(df)
print(f"处理后的数据形状: {X_processed.shape}")
2.7.2 自定义预处理类
class DataPreprocessor:
"""
完整的数据预处理类
"""
def __init__(self,
missing_strategy='auto',
outlier_method='iqr',
scaling_method='standard',
encoding_method='onehot',
feature_selection=True,
create_features=True):
self.missing_strategy = missing_strategy
self.outlier_method = outlier_method
self.scaling_method = scaling_method
self.encoding_method = encoding_method
self.feature_selection = feature_selection
self.create_features = create_features
self.preprocessors = {}
self.feature_names = None
self.selected_features = None
    def fit(self, X, y=None):
        """
        拟合预处理器(按 transform 的执行顺序逐步拟合,确保每个阶段看到的列一致)
        """
        self.X_original = X.copy()
        self.y = y
        # 识别特征类型
        self.numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
        self.categorical_features = X.select_dtypes(include=['object']).columns.tolist()
        # 处理缺失值
        self._fit_missing_imputers(X)
        X_step = self._transform_missing(X)
        # 处理异常值
        self._fit_outlier_detectors(X_step)
        X_step = self._transform_outliers(X_step)
        # 特征编码
        self._fit_encoders(X_step)
        X_step = self._transform_encoding(X_step)
        # 创建新特征(与 transform 保持同步)
        if self.create_features:
            X_step = self._create_features(X_step)
        # 特征缩放(必须在编码与特征创建之后拟合,否则 transform 时列数不匹配)
        self._fit_scalers(X_step)
        # 特征选择
        if self.feature_selection and y is not None:
            self._fit_feature_selector(X, y)
        return self
def transform(self, X):
"""
变换数据
"""
X_transformed = X.copy()
# 处理缺失值
X_transformed = self._transform_missing(X_transformed)
# 处理异常值
X_transformed = self._transform_outliers(X_transformed)
# 特征编码
X_transformed = self._transform_encoding(X_transformed)
# 创建新特征
if self.create_features:
X_transformed = self._create_features(X_transformed)
# 特征缩放
X_transformed = self._transform_scaling(X_transformed)
        # 特征选择(只保留经过编码等步骤后仍然存在的已选特征)
        if self.feature_selection and self.selected_features is not None:
            available = [c for c in self.selected_features if c in X_transformed.columns]
            X_transformed = X_transformed[available]
return X_transformed
def fit_transform(self, X, y=None):
"""
拟合并变换数据
"""
return self.fit(X, y).transform(X)
def _fit_missing_imputers(self, X):
"""拟合缺失值填充器"""
from sklearn.impute import SimpleImputer, KNNImputer
if self.missing_strategy == 'auto':
# 自动选择策略
for col in self.numeric_features:
missing_ratio = X[col].isnull().sum() / len(X)
if missing_ratio < 0.1:
strategy = 'median'
else:
strategy = 'knn'
if strategy == 'knn':
imputer = KNNImputer(n_neighbors=5)
else:
imputer = SimpleImputer(strategy=strategy)
self.preprocessors[f'{col}_imputer'] = imputer
imputer.fit(X[[col]])
for col in self.categorical_features:
imputer = SimpleImputer(strategy='most_frequent')
self.preprocessors[f'{col}_imputer'] = imputer
imputer.fit(X[[col]])
def _transform_missing(self, X):
"""变换缺失值"""
X_transformed = X.copy()
for col in self.numeric_features + self.categorical_features:
if f'{col}_imputer' in self.preprocessors:
imputer = self.preprocessors[f'{col}_imputer']
X_transformed[col] = imputer.transform(X_transformed[[col]]).flatten()
return X_transformed
def _fit_outlier_detectors(self, X):
"""拟合异常值检测器"""
if self.outlier_method == 'iqr':
for col in self.numeric_features:
Q1 = X[col].quantile(0.25)
Q3 = X[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
self.preprocessors[f'{col}_bounds'] = (lower_bound, upper_bound)
def _transform_outliers(self, X):
"""变换异常值"""
X_transformed = X.copy()
if self.outlier_method == 'iqr':
for col in self.numeric_features:
if f'{col}_bounds' in self.preprocessors:
lower_bound, upper_bound = self.preprocessors[f'{col}_bounds']
X_transformed[col] = np.clip(X_transformed[col], lower_bound, upper_bound)
return X_transformed
def _fit_encoders(self, X):
"""拟合编码器"""
if self.encoding_method == 'onehot':
from sklearn.preprocessing import OneHotEncoder
for col in self.categorical_features:
                encoder = OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore')
self.preprocessors[f'{col}_encoder'] = encoder
encoder.fit(X[[col]])
def _transform_encoding(self, X):
"""变换编码"""
X_transformed = X.copy()
if self.encoding_method == 'onehot':
for col in self.categorical_features:
if f'{col}_encoder' in self.preprocessors:
encoder = self.preprocessors[f'{col}_encoder']
encoded_features = encoder.transform(X_transformed[[col]])
feature_names = encoder.get_feature_names_out([col])
# 添加编码后的特征
encoded_df = pd.DataFrame(encoded_features,
columns=feature_names,
index=X_transformed.index)
X_transformed = pd.concat([X_transformed, encoded_df], axis=1)
# 删除原始类别特征
X_transformed = X_transformed.drop(col, axis=1)
return X_transformed
def _create_features(self, X):
"""创建新特征"""
X_transformed = X.copy()
# 获取数值特征(可能在编码后发生变化)
current_numeric = X_transformed.select_dtypes(include=[np.number]).columns.tolist()
# 创建交互特征(只对原始数值特征)
original_numeric = [col for col in current_numeric if col in self.numeric_features]
if len(original_numeric) >= 2:
for i in range(len(original_numeric)):
for j in range(i+1, len(original_numeric)):
col1, col2 = original_numeric[i], original_numeric[j]
X_transformed[f'{col1}_x_{col2}'] = X_transformed[col1] * X_transformed[col2]
return X_transformed
def _fit_scalers(self, X):
"""拟合缩放器"""
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
if self.scaling_method == 'standard':
scaler = StandardScaler()
elif self.scaling_method == 'minmax':
scaler = MinMaxScaler()
elif self.scaling_method == 'robust':
scaler = RobustScaler()
else:
return
self.preprocessors['scaler'] = scaler
# 只对数值特征进行缩放
numeric_cols = X.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
scaler.fit(X[numeric_cols])
def _transform_scaling(self, X):
"""变换缩放"""
if 'scaler' not in self.preprocessors:
return X
X_transformed = X.copy()
scaler = self.preprocessors['scaler']
# 获取当前的数值特征
numeric_cols = X_transformed.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
X_transformed[numeric_cols] = scaler.transform(X_transformed[numeric_cols])
return X_transformed
def _fit_feature_selector(self, X, y):
"""拟合特征选择器"""
from sklearn.feature_selection import SelectKBest, f_classif
# 先进行基本预处理以获得数值特征
X_temp = self._basic_preprocess(X)
# 特征选择
selector = SelectKBest(score_func=f_classif, k=min(10, X_temp.shape[1]))
selector.fit(X_temp, y)
self.selected_features = X_temp.columns[selector.get_support()].tolist()
self.preprocessors['feature_selector'] = selector
    def _basic_preprocess(self, X):
        """基本预处理(用于特征选择)"""
        X_temp = X.copy()
        # 简单填充(避免对列使用 inplace 的链式赋值)
        for col in self.numeric_features:
            X_temp[col] = X_temp[col].fillna(X_temp[col].median())
        for col in self.categorical_features:
            mode = X_temp[col].mode()
            X_temp[col] = X_temp[col].fillna(mode[0] if len(mode) > 0 else 'unknown')
        # 简单编码
        for col in self.categorical_features:
            X_temp[col] = pd.Categorical(X_temp[col]).codes
        return X_temp
def get_feature_importance(self):
"""获取特征重要性"""
if 'feature_selector' in self.preprocessors:
selector = self.preprocessors['feature_selector']
scores = selector.scores_
feature_names = self.X_original.columns
importance_df = pd.DataFrame({
'feature': feature_names,
'importance': scores
}).sort_values('importance', ascending=False)
return importance_df
return None
# 使用自定义预处理类
preprocessor = DataPreprocessor(
missing_strategy='auto',
outlier_method='iqr',
scaling_method='standard',
encoding_method='onehot',
feature_selection=True,
create_features=True
)
# 创建示例目标变量
y_example = np.random.choice([0, 1], size=len(df.dropna()))
# 拟合和变换
X_processed = preprocessor.fit_transform(df.dropna(), y_example)
print(f"预处理后的数据形状: {X_processed.shape}")
# 查看特征重要性
feature_importance = preprocessor.get_feature_importance()
if feature_importance is not None:
print("\n特征重要性:")
print(feature_importance.head())
2.8 实际案例:房价预测数据预处理
2.8.1 案例背景
我们将使用一个房价预测数据集来演示完整的数据预处理流程。
# 创建房价预测示例数据集
np.random.seed(42)
n_samples = 2000
# 生成房屋特征数据
house_data = {
'area': np.random.normal(150, 50, n_samples), # 面积
'bedrooms': np.random.poisson(3, n_samples), # 卧室数
'bathrooms': np.random.poisson(2, n_samples), # 浴室数
'age': np.random.exponential(10, n_samples), # 房龄
'location': np.random.choice(['市中心', '郊区', '新区'], n_samples, p=[0.3, 0.4, 0.3]),
'type': np.random.choice(['公寓', '别墅', '联排'], n_samples, p=[0.5, 0.3, 0.2]),
'garage': np.random.choice([0, 1, 2], n_samples, p=[0.3, 0.5, 0.2]),
'garden': np.random.choice([0, 1], n_samples, p=[0.6, 0.4]),
'condition': np.random.choice(['差', '一般', '良好', '优秀'], n_samples, p=[0.1, 0.3, 0.4, 0.2])
}
# 生成房价(基于特征的线性组合加噪声)
base_price = (house_data['area'] * 100 +
house_data['bedrooms'] * 5000 +
house_data['bathrooms'] * 3000 -
house_data['age'] * 500)
# 位置调整
location_multiplier = {'市中心': 1.5, '郊区': 1.0, '新区': 1.2}
for i, loc in enumerate(house_data['location']):
base_price[i] *= location_multiplier[loc]
# 添加噪声
house_data['price'] = base_price + np.random.normal(0, 10000, n_samples)
house_data['price'] = np.maximum(house_data['price'], 50000) # 最低价格
# 创建DataFrame
house_df = pd.DataFrame(house_data)
# 引入数据质量问题
# 1. 缺失值
missing_indices = np.random.choice(house_df.index, size=int(0.1 * len(house_df)), replace=False)
house_df.loc[missing_indices[:50], 'bathrooms'] = np.nan
house_df.loc[missing_indices[50:100], 'age'] = np.nan
house_df.loc[missing_indices[100:], 'condition'] = np.nan
# 2. 异常值
outlier_indices = np.random.choice(house_df.index, size=20, replace=False)
house_df.loc[outlier_indices, 'area'] = house_df.loc[outlier_indices, 'area'] * 3
# 3. 重复数据
duplicate_indices = np.random.choice(house_df.index, size=50, replace=False)
house_df = pd.concat([house_df, house_df.loc[duplicate_indices]], ignore_index=True)
print("房价数据集基本信息:")
print(house_df.info())
print("\n数据集描述统计:")
print(house_df.describe())
2.8.2 完整预处理流程
class HousePricePreprocessor:
"""
房价预测专用预处理器
"""
def __init__(self):
self.preprocessors = {}
self.feature_names = None
def preprocess(self, df, target_col='price'):
"""
完整的预处理流程
"""
print("开始数据预处理...")
# 1. 数据清洗
df_cleaned = self._data_cleaning(df)
# 2. 特征工程
df_featured = self._feature_engineering(df_cleaned)
# 3. 异常值处理
df_outliers = self._handle_outliers(df_featured)
# 4. 缺失值处理
df_imputed = self._handle_missing_values(df_outliers)
# 5. 特征编码
df_encoded = self._encode_features(df_imputed)
# 6. 特征缩放
df_scaled = self._scale_features(df_encoded, target_col)
# 7. 特征选择
if target_col in df_scaled.columns:
X = df_scaled.drop(target_col, axis=1)
y = df_scaled[target_col]
X_selected = self._select_features(X, y)
return X_selected, y
else:
return df_scaled, None
def _data_cleaning(self, df):
"""数据清洗"""
print("1. 数据清洗...")
df_clean = df.copy()
# 删除重复数据
initial_shape = df_clean.shape
df_clean = df_clean.drop_duplicates()
print(f" 删除重复数据: {initial_shape[0] - df_clean.shape[0]} 行")
# 数据类型转换
df_clean['bedrooms'] = df_clean['bedrooms'].astype('int64')
df_clean['garage'] = df_clean['garage'].astype('int64')
df_clean['garden'] = df_clean['garden'].astype('int64')
        # 数据范围检查(用 isna() 保留缺失值待后续填充,直接比较会把 NaN 行误删)
        df_clean = df_clean[df_clean['area'] > 0]
        df_clean = df_clean[df_clean['bedrooms'] >= 0]
        df_clean = df_clean[df_clean['bathrooms'].isna() | (df_clean['bathrooms'] >= 0)]
        df_clean = df_clean[df_clean['age'].isna() | (df_clean['age'] >= 0)]
print(f" 清洗后数据形状: {df_clean.shape}")
return df_clean
def _feature_engineering(self, df):
"""特征工程"""
print("2. 特征工程...")
df_featured = df.copy()
# 创建新特征
df_featured['rooms_total'] = df_featured['bedrooms'] + df_featured['bathrooms']
df_featured['area_per_room'] = df_featured['area'] / (df_featured['rooms_total'] + 1)
df_featured['is_new'] = (df_featured['age'] < 5).astype(int)
df_featured['has_garage'] = (df_featured['garage'] > 0).astype(int)
df_featured['luxury_score'] = (df_featured['bedrooms'] * 0.3 +
df_featured['bathrooms'] * 0.4 +
df_featured['garage'] * 0.2 +
df_featured['garden'] * 0.1)
# 面积分箱
df_featured['area_category'] = pd.cut(df_featured['area'],
bins=[0, 100, 150, 200, float('inf')],
labels=['小', '中', '大', '超大'])
# 房龄分箱
df_featured['age_category'] = pd.cut(df_featured['age'],
bins=[0, 5, 15, 30, float('inf')],
labels=['新房', '次新', '老房', '超老'])
print(f" 添加特征后数据形状: {df_featured.shape}")
return df_featured
def _handle_outliers(self, df):
"""异常值处理"""
print("3. 异常值处理...")
df_outliers = df.copy()
# 使用IQR方法处理数值特征的异常值
numeric_cols = ['area', 'age', 'price', 'area_per_room', 'luxury_score']
for col in numeric_cols:
if col in df_outliers.columns:
Q1 = df_outliers[col].quantile(0.25)
Q3 = df_outliers[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# 截断异常值
outliers_count = ((df_outliers[col] < lower_bound) |
(df_outliers[col] > upper_bound)).sum()
df_outliers[col] = np.clip(df_outliers[col], lower_bound, upper_bound)
print(f" {col}: 处理了 {outliers_count} 个异常值")
return df_outliers
def _handle_missing_values(self, df):
"""缺失值处理"""
print("4. 缺失值处理...")
df_imputed = df.copy()
        # 数值特征:使用中位数填充
        numeric_cols = df_imputed.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if df_imputed[col].isnull().sum() > 0:
                median_value = df_imputed[col].median()
                df_imputed[col] = df_imputed[col].fillna(median_value)
                print(f"  {col}: 使用中位数 {median_value:.2f} 填充")
        # 类别特征:使用众数填充
        categorical_cols = df_imputed.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            if df_imputed[col].isnull().sum() > 0:
                mode_value = df_imputed[col].mode()[0] if len(df_imputed[col].mode()) > 0 else 'unknown'
                df_imputed[col] = df_imputed[col].fillna(mode_value)
                print(f"  {col}: 使用众数 '{mode_value}' 填充")
return df_imputed
def _encode_features(self, df):
"""特征编码"""
print("5. 特征编码...")
df_encoded = df.copy()
# 序数编码(有序类别)
condition_mapping = {'差': 1, '一般': 2, '良好': 3, '优秀': 4}
if 'condition' in df_encoded.columns:
df_encoded['condition_encoded'] = df_encoded['condition'].map(condition_mapping)
# 独热编码(无序类别)
categorical_cols = ['location', 'type', 'area_category', 'age_category']
for col in categorical_cols:
if col in df_encoded.columns:
                dummies = pd.get_dummies(df_encoded[col], prefix=col, drop_first=True).astype(int)  # 转为0/1整数,便于后续相关性计算
df_encoded = pd.concat([df_encoded, dummies], axis=1)
df_encoded.drop(col, axis=1, inplace=True)
# 删除原始类别列
cols_to_drop = ['condition']
for col in cols_to_drop:
if col in df_encoded.columns:
df_encoded.drop(col, axis=1, inplace=True)
print(f" 编码后数据形状: {df_encoded.shape}")
return df_encoded
def _scale_features(self, df, target_col):
"""特征缩放"""
print("6. 特征缩放...")
df_scaled = df.copy()
# 只对数值特征进行缩放(排除目标变量)
numeric_cols = df_scaled.select_dtypes(include=[np.number]).columns.tolist()
if target_col in numeric_cols:
numeric_cols.remove(target_col)
if len(numeric_cols) > 0:
scaler = StandardScaler()
df_scaled[numeric_cols] = scaler.fit_transform(df_scaled[numeric_cols])
self.preprocessors['scaler'] = scaler
print(f" 标准化了 {len(numeric_cols)} 个数值特征")
return df_scaled
def _select_features(self, X, y):
"""特征选择"""
print("7. 特征选择...")
# 使用相关性进行特征选择
correlation_with_target = X.corrwith(y).abs().sort_values(ascending=False)
# 选择与目标变量相关性较高的特征
selected_features = correlation_with_target[correlation_with_target > 0.1].index.tolist()
# 去除高相关性特征
corr_matrix = X[selected_features].corr().abs()
upper_triangle = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
high_corr_features = [column for column in upper_triangle.columns
if any(upper_triangle[column] > 0.9)]
final_features = [f for f in selected_features if f not in high_corr_features]
print(f" 选择了 {len(final_features)} 个特征")
print(f" 特征列表: {final_features[:10]}...") # 显示前10个特征
return X[final_features]
# 使用房价预处理器
house_preprocessor = HousePricePreprocessor()
X_processed, y_processed = house_preprocessor.preprocess(house_df)
print(f"\n最终处理结果:")
print(f"特征矩阵形状: {X_processed.shape}")
print(f"目标变量形状: {y_processed.shape}")
print(f"\n特征列表:")
print(X_processed.columns.tolist())
2.8.3 预处理效果评估
def evaluate_preprocessing_effect(original_df, processed_X, processed_y):
"""
评估预处理效果
"""
print("预处理效果评估:")
print("=" * 50)
# 1. 数据质量改善
print("1. 数据质量改善:")
print(f" 原始数据缺失值: {original_df.isnull().sum().sum()}")
print(f" 处理后缺失值: {processed_X.isnull().sum().sum()}")
print(f" 原始数据形状: {original_df.shape}")
print(f" 处理后形状: {processed_X.shape}")
# 2. 特征分布改善
print("\n2. 特征分布改善:")
# 选择几个关键特征进行比较
key_features = ['area', 'age'] if 'area' in original_df.columns else []
fig, axes = plt.subplots(2, len(key_features), figsize=(6*len(key_features), 8))
for i, feature in enumerate(key_features):
if feature in original_df.columns:
# 原始分布
axes[0, i].hist(original_df[feature].dropna(), bins=30, alpha=0.7, edgecolor='black')
axes[0, i].set_title(f'原始 {feature} 分布')
axes[0, i].set_xlabel(feature)
axes[0, i].set_ylabel('频率')
# 处理后分布(如果存在对应特征)
processed_feature = feature if feature in processed_X.columns else None
if processed_feature:
axes[1, i].hist(processed_X[processed_feature], bins=30, alpha=0.7, edgecolor='black')
axes[1, i].set_title(f'处理后 {feature} 分布')
axes[1, i].set_xlabel(feature)
axes[1, i].set_ylabel('频率')
plt.tight_layout()
plt.show()
# 3. 特征相关性分析
print("\n3. 特征相关性分析:")
# 计算与目标变量的相关性
correlations = processed_X.corrwith(processed_y).abs().sort_values(ascending=False)
print("与目标变量相关性最高的前10个特征:")
print(correlations.head(10))
# 4. 可视化相关性矩阵
plt.figure(figsize=(12, 10))
# 选择相关性最高的特征进行可视化
top_features = correlations.head(10).index.tolist()
corr_matrix = processed_X[top_features].corr()
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0,
square=True, fmt='.2f')
plt.title('处理后特征相关性矩阵(前10个特征)')
plt.tight_layout()
plt.show()
# 评估预处理效果
evaluate_preprocessing_effect(house_df, X_processed, y_processed)
2.9 本章小结
本章详细介绍了数据预处理与特征工程的核心技术,包括:
核心技术
- 数据质量评估:缺失值、异常值、重复值的检测和分析
- 缺失值处理:删除法、填充法、高级插补方法(KNN、MICE)
- 异常值处理:统计方法(Z-Score、IQR)、机器学习方法(Isolation Forest、LOF)
- 数据标准化:Z-Score、Min-Max、Robust、分位数标准化
- 特征工程:特征选择、特征创建、特征编码
实践技能
- 预处理管道:使用sklearn Pipeline构建可重用的预处理流程
- 自定义预处理器:根据具体业务需求设计预处理类
- 效果评估:通过可视化和统计指标评估预处理效果
最佳实践
- 数据驱动:根据数据特点选择合适的预处理方法
- 业务理解:结合业务知识进行特征工程
- 迭代优化:通过实验不断改进预处理策略
- 可重现性:确保预处理流程的一致性和可重现性
下一章预告
下一章我们将学习监督学习算法,包括:
- 线性回归和逻辑回归
- 决策树和随机森林
- 支持向量机
- 朴素贝叶斯
- K近邻算法
- 模型评估和选择
2.10 练习题
基础练习
缺失值处理
# 创建包含缺失值的数据集,尝试不同的填充方法
# 比较填充效果
异常值检测
# 使用不同方法检测异常值
# 比较检测结果的差异
特征编码
# 对类别变量进行不同类型的编码
# 分析编码对模型性能的影响
进阶练习
- 特征选择比较
  - 实现过滤法、包装法、嵌入法
  - 比较不同方法的选择结果
  - 分析特征选择对模型性能的影响
- 预处理管道设计
  - 设计一个完整的预处理管道
  - 包含所有必要的预处理步骤
  - 确保管道的可重用性
- 特征工程创新
  - 基于业务理解创建新特征
  - 使用多项式特征和交互特征
  - 评估新特征的有效性
项目练习
- 完整数据预处理项目:选择一个真实数据集,完成:
  - 数据质量评估
  - 缺失值和异常值处理
  - 特征工程
  - 预处理效果评估
  - 撰写预处理报告
- 预处理方法对比研究:
  - 在同一数据集上应用不同预处理方法
  - 训练相同的模型
  - 比较不同预处理方法对模型性能的影响
  - 总结最佳实践
思考题
- 预处理策略选择
  - 如何根据数据特点选择合适的预处理方法?
  - 什么情况下应该删除异常值,什么情况下应该保留?
  - 如何平衡特征数量和模型复杂度?
- 业务场景应用
  - 在你熟悉的业务场景中,有哪些特定的数据质量问题?
  - 如何设计针对性的预处理策略?
  - 如何确保预处理的业务合理性?

这些练习将帮助你深入理解数据预处理的各个方面,为构建高质量的机器学习模型打下坚实基础。