2.1 Overview of Data Preprocessing

Data preprocessing is one of the most important steps in a machine learning project; it is often said to consume up to 80% of total project time. High-quality data is the foundation of a good model, and raw data usually suffers from a variety of problems that preprocessing must address.

2.1.1 Why Data Preprocessing Is Needed

Common problems in raw data:

  • Missing values
  • Outliers
  • Inconsistent formats
  • Duplicate records
  • Noisy data
  • Incorrect data types
  • Mismatched units and scales

Goals of data preprocessing:

  • Improve data quality
  • Reduce the influence of noise and outliers
  • Unify formats, units, and scales
  • Extract useful information
  • Improve model performance

2.1.2 The Data Preprocessing Workflow

Raw data → Data cleaning → Data integration → Data transformation → Data reduction → Clean data

  1. Data cleaning: handle missing values, outliers, and noise
  2. Data integration: merge multiple data sources
  3. Data transformation: standardization, normalization, discretization
  4. Data reduction: dimensionality reduction, feature selection
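As a minimal sketch of these stages (the DataFrames and column names here are made up for illustration), each step maps onto a few lines of pandas/scikit-learn:

import pandas as pd
from sklearn.preprocessing import StandardScaler

# Hypothetical source tables: transactions plus a user lookup table
orders = pd.DataFrame({'user_id': [1, 2, 2, 3], 'amount': [10.0, None, 15.0, 8.0]})
users = pd.DataFrame({'user_id': [1, 2, 3], 'city': ['A', 'B', 'A']})

# 1. Cleaning: impute the missing amount, drop duplicate rows
orders['amount'] = orders['amount'].fillna(orders['amount'].median())
orders = orders.drop_duplicates()

# 2. Integration: merge the two sources
merged = orders.merge(users, on='user_id')

# 3. Transformation: standardize the numeric column
merged['amount_std'] = StandardScaler().fit_transform(merged[['amount']]).ravel()

# 4. Reduction: keep only the columns the model will use
clean_data = merged[['amount_std', 'city']]
print(clean_data)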

2.2 Data Quality Assessment

2.2.1 Data Quality Dimensions

Data quality is usually assessed along several dimensions: completeness, accuracy, consistency, uniqueness, and validity. The code below builds a synthetic dataset and deliberately injects quality problems so these dimensions can be inspected.

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats

# Create a synthetic example dataset
np.random.seed(42)
n_samples = 1000

data = {
    'age': np.random.normal(35, 10, n_samples),
    'income': np.random.lognormal(10, 1, n_samples),
    'education': np.random.choice(['HighSchool', 'Bachelor', 'Master', 'PhD'], n_samples, p=[0.3, 0.4, 0.2, 0.1]),
    'experience': np.random.normal(8, 5, n_samples),
    'score': np.random.normal(75, 15, n_samples)
}

# Inject some data quality problems
data['age'][50:60] = np.nan  # missing values
data['income'][100:110] = data['income'][100:110] * 10  # outliers
data['experience'] = np.maximum(0, data['experience'])  # experience must be non-negative
data['score'] = np.clip(data['score'], 0, 100)  # scores constrained to [0, 100]

df = pd.DataFrame(data)

print("Basic dataset info:")
df.info()
print("\nDescriptive statistics:")
print(df.describe())

2.2.2 A Data Quality Report Function

def data_quality_report(df):
    """
    Generate a data quality report.
    """
    print("=" * 50)
    print("Data Quality Report")
    print("=" * 50)
    
    # Basic info
    print(f"Dataset shape: {df.shape}")
    print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
    
    # Missing value analysis
    print("\nMissing value analysis:")
    missing_data = df.isnull().sum()
    missing_percent = 100 * missing_data / len(df)
    missing_table = pd.DataFrame({
        'missing_count': missing_data,
        'missing_percent': missing_percent
    })
    print(missing_table[missing_table['missing_count'] > 0])
    
    # Duplicate analysis
    duplicates = df.duplicated().sum()
    print(f"\nDuplicate rows: {duplicates}")
    
    # Data type analysis
    print("\nData types:")
    print(df.dtypes.value_counts())
    
    # Outlier detection for numeric columns (IQR method)
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    print("\nOutlier analysis (IQR method):")
    
    for col in numeric_cols:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        
        outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)][col]
        print(f"{col}: {len(outliers)} outliers ({len(outliers)/len(df)*100:.1f}%)")
    
    return missing_table

# Generate the data quality report
quality_report = data_quality_report(df)

2.2.3 Visualizing Data Distributions

def plot_data_distribution(df):
    """
    Plot histograms and box plots for all numeric columns.
    """
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    n_cols = len(numeric_cols)
    
    fig, axes = plt.subplots(2, n_cols, figsize=(4*n_cols, 8))
    
    for i, col in enumerate(numeric_cols):
        # Histogram
        axes[0, i].hist(df[col].dropna(), bins=30, alpha=0.7, edgecolor='black')
        axes[0, i].set_title(f'{col} distribution')
        axes[0, i].set_xlabel(col)
        axes[0, i].set_ylabel('Frequency')
        
        # Box plot
        axes[1, i].boxplot(df[col].dropna())
        axes[1, i].set_title(f'{col} box plot')
        axes[1, i].set_ylabel(col)
    
    plt.tight_layout()
    plt.show()

# Plot the distributions
plot_data_distribution(df)

2.3 Handling Missing Values

2.3.1 Types of Missingness

Missing Completely At Random (MCAR):

  • Missingness is unrelated to any variable
  • The most benign type
  • Safe to delete rows or use simple imputation

Missing At Random (MAR):

  • Missingness depends on observed variables
  • Can be predicted from other variables
  • Requires more sophisticated handling

Missing Not At Random (MNAR):

  • Missingness depends on the unobserved values themselves
  • The hardest type to handle
  • Requires domain knowledge
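The distinction matters in practice: under MCAR the observed rows remain a representative sample, while under MAR they do not. A minimal sketch (synthetic data; the column names are illustrative):

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n = 10000
demo = pd.DataFrame({'age': rng.normal(40, 10, n)})
demo['income'] = 3000 + 80 * demo['age'] + rng.normal(0, 500, n)

# MCAR: every income value has the same 20% chance of being missing
mcar_mask = rng.random(n) < 0.2

# MAR: income is more often missing for older respondents (depends on observed age)
mar_mask = rng.random(n) < np.where(demo['age'] > 50, 0.4, 0.1)

print(f"True mean income:         {demo['income'].mean():.0f}")
print(f"Observed mean under MCAR: {demo['income'][~mcar_mask].mean():.0f}")  # ~unbiased
print(f"Observed mean under MAR:  {demo['income'][~mar_mask].mean():.0f}")   # biased low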

2.3.2 Detecting Missing Values

def missing_value_analysis(df):
    """
    Analyze and visualize missing values.
    """
    # Missing value statistics
    missing_stats = pd.DataFrame({
        'missing_count': df.isnull().sum(),
        'missing_percent': df.isnull().sum() / len(df) * 100,
        'dtype': df.dtypes
    })
    missing_stats = missing_stats[missing_stats['missing_count'] > 0].sort_values('missing_percent', ascending=False)
    
    # Visualize the missing value patterns
    plt.figure(figsize=(12, 8))
    
    # Missing value heatmap
    plt.subplot(2, 2, 1)
    sns.heatmap(df.isnull(), cbar=True, yticklabels=False, cmap='viridis')
    plt.title('Missing value pattern')
    
    # Bar chart of missing percentages
    plt.subplot(2, 2, 2)
    missing_stats['missing_percent'].plot(kind='bar')
    plt.title('Missing percentage by column')
    plt.ylabel('Missing (%)')
    plt.xticks(rotation=45)
    
    # Combinations of missing columns
    plt.subplot(2, 2, 3)
    missing_combinations = df.isnull().value_counts()
    missing_combinations.head(10).plot(kind='bar')
    plt.title('Missing value combinations (top 10)')
    plt.ylabel('Count')
    plt.xticks(rotation=45)
    
    plt.tight_layout()
    plt.show()
    
    return missing_stats

# Analyze the missing values
missing_analysis = missing_value_analysis(df)
print(missing_analysis)

2.3.3 Methods for Handling Missing Values

Deletion

# 1. Drop rows containing missing values
df_dropna_rows = df.dropna()
print(f"Shape after dropping rows: {df_dropna_rows.shape}")

# 2. Drop columns containing missing values
df_dropna_cols = df.dropna(axis=1)
print(f"Shape after dropping columns: {df_dropna_cols.shape}")

# 3. Drop rows whose share of non-missing values falls below a threshold
threshold = 0.8  # keep rows with at least 80% non-missing values
df_threshold = df.dropna(thresh=int(threshold * len(df.columns)))
print(f"Shape after threshold-based dropping: {df_threshold.shape}")

Imputation

from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer  # noqa: F401
from sklearn.impute import IterativeImputer

# 1. Simple imputation
class SimpleImputerMethods:
    def __init__(self, df):
        self.df = df.copy()
    
    def mean_imputation(self, columns=None):
        """Impute with the column mean."""
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        for col in columns:
            self.df[col] = self.df[col].fillna(self.df[col].mean())
        return self.df
    
    def median_imputation(self, columns=None):
        """Impute with the column median."""
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        for col in columns:
            self.df[col] = self.df[col].fillna(self.df[col].median())
        return self.df
    
    def mode_imputation(self, columns=None):
        """Impute with the column mode."""
        if columns is None:
            columns = self.df.select_dtypes(include=['object']).columns
        
        for col in columns:
            mode_value = self.df[col].mode()
            if len(mode_value) > 0:
                self.df[col] = self.df[col].fillna(mode_value[0])
        return self.df
    
    def forward_fill(self, columns=None):
        """Propagate the last valid value forward."""
        if columns is None:
            columns = self.df.columns
        
        for col in columns:
            self.df[col] = self.df[col].ffill()
        return self.df
    
    def backward_fill(self, columns=None):
        """Propagate the next valid value backward."""
        if columns is None:
            columns = self.df.columns
        
        for col in columns:
            self.df[col] = self.df[col].bfill()
        return self.df

# Apply simple imputation
imputer = SimpleImputerMethods(df)
df_mean_imputed = imputer.mean_imputation(['age', 'income', 'experience', 'score'])
print("Missing values after mean imputation:")
print(df_mean_imputed.isnull().sum())

Advanced Imputation

# 2. KNN imputation
def knn_imputation(df, n_neighbors=5):
    """
    Impute numeric columns with KNN; categorical columns with the mode.
    """
    # Separate numeric and categorical columns
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    categorical_cols = df.select_dtypes(include=['object']).columns
    
    df_imputed = df.copy()
    
    # KNN imputation for numeric columns
    if len(numeric_cols) > 0:
        knn_imputer = KNNImputer(n_neighbors=n_neighbors)
        df_imputed[numeric_cols] = knn_imputer.fit_transform(df[numeric_cols])
    
    # Mode imputation for categorical columns
    for col in categorical_cols:
        mode_value = df[col].mode()
        if len(mode_value) > 0:
            df_imputed[col] = df_imputed[col].fillna(mode_value[0])
    
    return df_imputed

# Apply KNN imputation
df_knn_imputed = knn_imputation(df)
print("Missing values after KNN imputation:")
print(df_knn_imputed.isnull().sum())

# 3. Iterative imputation (MICE)
def mice_imputation(df, max_iter=10, random_state=42):
    """
    MICE (Multiple Imputation by Chained Equations) via IterativeImputer.
    """
    # Apply MICE to numeric columns only
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    categorical_cols = df.select_dtypes(include=['object']).columns
    
    df_imputed = df.copy()
    
    if len(numeric_cols) > 0:
        mice_imputer = IterativeImputer(max_iter=max_iter, random_state=random_state)
        df_imputed[numeric_cols] = mice_imputer.fit_transform(df[numeric_cols])
    
    # Mode imputation for categorical columns
    for col in categorical_cols:
        mode_value = df[col].mode()
        if len(mode_value) > 0:
            df_imputed[col] = df_imputed[col].fillna(mode_value[0])
    
    return df_imputed

# Apply MICE imputation
df_mice_imputed = mice_imputation(df)
print("Missing values after MICE imputation:")
print(df_mice_imputed.isnull().sum())

Comparing Imputation Methods

def compare_imputation_methods(df_original, methods_dict):
    """
    Compare imputation methods by masking known values and measuring error.
    """
    # Create artificial missing values for testing
    df_test = df_original.copy()
    
    # Randomly set 20% of each numeric column to missing
    np.random.seed(42)
    for col in df_test.select_dtypes(include=[np.number]).columns:
        missing_idx = np.random.choice(df_test.index, size=int(0.2*len(df_test)), replace=False)
        df_test.loc[missing_idx, col] = np.nan
    
    results = {}
    
    for method_name, df_imputed in methods_dict.items():
        # Measure the gap between imputed and true values
        mse_scores = []
        
        for col in df_original.select_dtypes(include=[np.number]).columns:
            original_values = df_original[col]
            imputed_values = df_imputed[col]
            
            # Only compare positions that were artificially masked
            mask = df_test[col].isnull() & df_original[col].notnull()
            
            if mask.sum() > 0:
                mse = np.mean((original_values[mask] - imputed_values[mask]) ** 2)
                mse_scores.append(mse)
        
        results[method_name] = np.mean(mse_scores) if mse_scores else np.nan
    
    return results

# Compare the imputation methods
methods = {
    'mean': df_mean_imputed,
    'knn': df_knn_imputed,
    'mice': df_mice_imputed
}

# Note: this comparison needs the complete ground-truth data, which a real
# project usually does not have.
# comparison_results = compare_imputation_methods(df_complete, methods)
# print("Imputation comparison (lower MSE is better):")
# for method, mse in comparison_results.items():
#     print(f"{method}: {mse:.4f}")
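As a self-contained sketch of the same idea (entirely synthetic ground truth, so the names and numbers are illustrative): generate complete data, mask 20% of it, impute with each method, and score against the truth.

import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer  # noqa: F401
from sklearn.impute import IterativeImputer

rng = np.random.default_rng(42)
truth = pd.DataFrame(rng.normal(size=(500, 3)), columns=['a', 'b', 'c'])
truth['b'] = 0.8 * truth['a'] + 0.2 * truth['b']  # correlated columns favor KNN/MICE

mask = rng.random(truth.shape) < 0.2
masked = truth.mask(mask)  # NaN wherever mask is True

imputers = {
    'mean': SimpleImputer(strategy='mean'),
    'knn': KNNImputer(n_neighbors=5),
    'mice': IterativeImputer(max_iter=10, random_state=42),
}
for name, imp in imputers.items():
    filled = imp.fit_transform(masked)
    mse = np.mean((filled[mask] - truth.values[mask]) ** 2)
    print(f"{name}: MSE = {mse:.4f}")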

2.4 Outlier Detection and Handling

2.4.1 Outlier Detection Methods

Statistical Methods

class OutlierDetector:
    def __init__(self, df):
        self.df = df.copy()
    
    def z_score_detection(self, columns=None, threshold=3):
        """
        Detect outliers with the Z-score method.
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        outliers = {}
        
        for col in columns:
            z_scores = np.abs(stats.zscore(self.df[col].dropna()))
            outlier_indices = self.df[col].dropna().index[z_scores > threshold]
            outliers[col] = outlier_indices.tolist()
        
        return outliers
    
    def iqr_detection(self, columns=None, factor=1.5):
        """
        Detect outliers with the IQR method.
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        outliers = {}
        
        for col in columns:
            Q1 = self.df[col].quantile(0.25)
            Q3 = self.df[col].quantile(0.75)
            IQR = Q3 - Q1
            
            lower_bound = Q1 - factor * IQR
            upper_bound = Q3 + factor * IQR
            
            outlier_mask = (self.df[col] < lower_bound) | (self.df[col] > upper_bound)
            outliers[col] = self.df[outlier_mask].index.tolist()
        
        return outliers
    
    def modified_z_score_detection(self, columns=None, threshold=3.5):
        """
        Modified Z-score method (median-based, robust to extreme values).
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        outliers = {}
        
        for col in columns:
            median = self.df[col].median()
            mad = np.median(np.abs(self.df[col] - median))
            
            if mad == 0:
                mad = np.std(self.df[col])
            
            modified_z_scores = 0.6745 * (self.df[col] - median) / mad
            outlier_mask = np.abs(modified_z_scores) > threshold
            outliers[col] = self.df[outlier_mask].index.tolist()
        
        return outliers

# Run outlier detection
detector = OutlierDetector(df)

# Z-score detection
z_outliers = detector.z_score_detection()
print("Z-score outlier detection:")
for col, indices in z_outliers.items():
    print(f"{col}: {len(indices)} outliers")

# IQR detection
iqr_outliers = detector.iqr_detection()
print("\nIQR outlier detection:")
for col, indices in iqr_outliers.items():
    print(f"{col}: {len(indices)} outliers")

# Modified Z-score detection
modified_z_outliers = detector.modified_z_score_detection()
print("\nModified Z-score outlier detection:")
for col, indices in modified_z_outliers.items():
    print(f"{col}: {len(indices)} outliers")

Machine Learning Methods

from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler

def ml_outlier_detection(df, method='isolation_forest', contamination=0.1):
    """
    Detect outliers with a machine learning model.
    """
    # Use numeric features only
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    X = df[numeric_cols].dropna()
    
    # Standardize
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    if method == 'isolation_forest':
        detector = IsolationForest(contamination=contamination, random_state=42)
    elif method == 'lof':
        detector = LocalOutlierFactor(contamination=contamination)
    elif method == 'one_class_svm':
        detector = OneClassSVM(nu=contamination)
    else:
        raise ValueError("Unsupported method")
    
    # fit_predict returns -1 for outliers and 1 for inliers
    outlier_labels = detector.fit_predict(X_scaled)
    
    # Return the outlier indices
    outlier_indices = X.index[outlier_labels == -1].tolist()
    
    return outlier_indices, detector

# Detect outliers with each method
methods = ['isolation_forest', 'lof', 'one_class_svm']

for method in methods:
    outlier_indices, model = ml_outlier_detection(df, method=method)
    print(f"{method.upper()} detected {len(outlier_indices)} outliers")

2.4.2 Visualizing Outliers

def visualize_outliers(df, outlier_indices, method_name):
    """
    Plot the detected outliers against the full data.
    """
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    n_cols = len(numeric_cols)
    
    fig, axes = plt.subplots(2, n_cols, figsize=(4*n_cols, 8))
    
    for i, col in enumerate(numeric_cols):
        # Scatter plot
        axes[0, i].scatter(df.index, df[col], alpha=0.6, label='normal')
        axes[0, i].scatter(outlier_indices, df.loc[outlier_indices, col], 
                          color='red', alpha=0.8, label='outlier')
        axes[0, i].set_title(f'{col} - {method_name}')
        axes[0, i].set_xlabel('index')
        axes[0, i].set_ylabel(col)
        axes[0, i].legend()
        
        # Box plots
        box_data = [df[col].dropna(), df.loc[outlier_indices, col].dropna()]
        axes[1, i].boxplot(box_data, labels=['all data', 'outliers'])
        axes[1, i].set_title(f'{col} box plot')
        axes[1, i].set_ylabel(col)
    
    plt.tight_layout()
    plt.show()

# Visualize the outliers found by the IQR method
all_iqr_outliers = set()
for indices in iqr_outliers.values():
    all_iqr_outliers.update(indices)

visualize_outliers(df, list(all_iqr_outliers), 'IQR')

2.4.3 Outlier Handling Methods

class OutlierHandler:
    def __init__(self, df):
        self.df = df.copy()
    
    def remove_outliers(self, outlier_indices):
        """
        Drop the outlier rows.
        """
        return self.df.drop(outlier_indices)
    
    def cap_outliers(self, columns=None, method='iqr', factor=1.5):
        """
        Cap outliers at the boundary values (winsorization).
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        df_capped = self.df.copy()
        
        for col in columns:
            if method == 'iqr':
                Q1 = df_capped[col].quantile(0.25)
                Q3 = df_capped[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - factor * IQR
                upper_bound = Q3 + factor * IQR
            elif method == 'percentile':
                lower_bound = df_capped[col].quantile(0.05)
                upper_bound = df_capped[col].quantile(0.95)
            else:
                raise ValueError("Unsupported method")
            
            df_capped[col] = np.clip(df_capped[col], lower_bound, upper_bound)
        
        return df_capped
    
    def transform_outliers(self, columns=None, method='log'):
        """
        Compress outliers with a variance-stabilizing transform.
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        df_transformed = self.df.copy()
        
        for col in columns:
            if method == 'log':
                # Shift so all values are positive
                min_val = df_transformed[col].min()
                if min_val <= 0:
                    df_transformed[col] = df_transformed[col] - min_val + 1
                df_transformed[col] = np.log(df_transformed[col])
            elif method == 'sqrt':
                # Ensure all values are non-negative
                df_transformed[col] = np.sqrt(np.abs(df_transformed[col]))
            elif method == 'box_cox':
                from scipy.stats import boxcox
                # Shift so all values are positive
                min_val = df_transformed[col].min()
                if min_val <= 0:
                    df_transformed[col] = df_transformed[col] - min_val + 1
                df_transformed[col], _ = boxcox(df_transformed[col])
        
        return df_transformed
    
    def replace_outliers(self, outlier_indices, method='median'):
        """
        Replace outlier values with a robust statistic.
        """
        df_replaced = self.df.copy()
        
        for col in self.df.select_dtypes(include=[np.number]).columns:
            col_outliers = [idx for idx in outlier_indices if idx in df_replaced.index]
            
            if method == 'median':
                replacement_value = df_replaced[col].median()
            elif method == 'mean':
                replacement_value = df_replaced[col].mean()
            elif method == 'mode':
                replacement_value = df_replaced[col].mode()[0]
            
            df_replaced.loc[col_outliers, col] = replacement_value
        
        return df_replaced

# Apply the outlier handlers
handler = OutlierHandler(df)

# 1. Cap outliers
df_capped = handler.cap_outliers(method='iqr')
print("Statistics after capping:")
print(df_capped.describe())

# 2. Log transform
df_log_transformed = handler.transform_outliers(columns=['income'], method='log')
print("\nIncome statistics after log transform:")
print(df_log_transformed['income'].describe())

# 3. Replace outliers
all_outliers = list(all_iqr_outliers)
df_replaced = handler.replace_outliers(all_outliers, method='median')
print("\nStatistics after replacing outliers:")
print(df_replaced.describe())

2.5 Standardization and Normalization

2.5.1 Why Scaling Is Needed

Features often differ widely in units and numeric range, which hurts many machine learning algorithms (a small demonstration follows this list):

  • Distance-based algorithms: KNN, K-Means, SVM, etc.
  • Gradient-descent-based algorithms: neural networks, linear regression, etc.
  • Regularized algorithms: Ridge, Lasso, etc.
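A quick demonstration of the effect (synthetic data, so the exact accuracies are illustrative): the same KNN classifier with and without standardization, where one feature has been inflated to mimic a unit mismatch.

import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=1000, n_features=2, n_informative=2,
                           n_redundant=0, random_state=42)
X[:, 1] *= 1000  # simulate a feature measured on a much larger scale
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

raw = KNeighborsClassifier().fit(X_train, y_train)
scaled = make_pipeline(StandardScaler(), KNeighborsClassifier()).fit(X_train, y_train)

# Without scaling, distances are dominated by the inflated feature,
# so the other informative feature is effectively ignored.
print(f"KNN accuracy without scaling: {raw.score(X_test, y_test):.3f}")
print(f"KNN accuracy with scaling:    {scaled.score(X_test, y_test):.3f}")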

2.5.2 Scaling Methods

from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, Normalizer
from sklearn.preprocessing import QuantileTransformer, PowerTransformer

class DataScaler:
    def __init__(self, df):
        self.df = df.copy()
        self.scalers = {}
    
    def standard_scaling(self, columns=None):
        """
        Z-score standardization: (x - μ) / σ
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        scaler = StandardScaler()
        df_scaled = self.df.copy()
        df_scaled[columns] = scaler.fit_transform(self.df[columns])
        
        self.scalers['standard'] = scaler
        return df_scaled
    
    def min_max_scaling(self, columns=None, feature_range=(0, 1)):
        """
        Min-max scaling: (x - min) / (max - min)
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        scaler = MinMaxScaler(feature_range=feature_range)
        df_scaled = self.df.copy()
        df_scaled[columns] = scaler.fit_transform(self.df[columns])
        
        self.scalers['minmax'] = scaler
        return df_scaled
    
    def robust_scaling(self, columns=None):
        """
        Robust scaling: (x - median) / IQR
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        scaler = RobustScaler()
        df_scaled = self.df.copy()
        df_scaled[columns] = scaler.fit_transform(self.df[columns])
        
        self.scalers['robust'] = scaler
        return df_scaled
    
    def unit_vector_scaling(self, columns=None):
        """
        Unit-vector scaling: x / ||x||
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        scaler = Normalizer()
        df_scaled = self.df.copy()
        df_scaled[columns] = scaler.fit_transform(self.df[columns])
        
        self.scalers['normalizer'] = scaler
        return df_scaled
    
    def quantile_scaling(self, columns=None, n_quantiles=1000):
        """
        Quantile transformation.
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        scaler = QuantileTransformer(n_quantiles=n_quantiles, random_state=42)
        df_scaled = self.df.copy()
        df_scaled[columns] = scaler.fit_transform(self.df[columns])
        
        self.scalers['quantile'] = scaler
        return df_scaled
    
    def power_transform(self, columns=None, method='yeo-johnson'):
        """
        Power transformation.
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        
        scaler = PowerTransformer(method=method, standardize=True)
        df_scaled = self.df.copy()
        df_scaled[columns] = scaler.fit_transform(self.df[columns])
        
        self.scalers['power'] = scaler
        return df_scaled

# Apply the different scaling methods
scaler = DataScaler(df.dropna())

# Scale
df_standard = scaler.standard_scaling()
df_minmax = scaler.min_max_scaling()
df_robust = scaler.robust_scaling()
df_quantile = scaler.quantile_scaling()
df_power = scaler.power_transform()

print("Original statistics:")
print(df.describe())

print("\nAfter Z-score standardization:")
print(df_standard.describe())

print("\nAfter min-max scaling:")
print(df_minmax.describe())

2.5.3 Comparing Scaling Methods

def compare_scaling_methods(df_original, scaling_results):
    """
    Plot histograms to compare the scaling methods.
    """
    numeric_cols = df_original.select_dtypes(include=[np.number]).columns
    n_methods = len(scaling_results)
    n_cols = len(numeric_cols)
    
    fig, axes = plt.subplots(n_methods, n_cols, figsize=(4*n_cols, 3*n_methods))
    
    for i, (method_name, df_scaled) in enumerate(scaling_results.items()):
        for j, col in enumerate(numeric_cols):
            if n_methods == 1:
                ax = axes[j]
            else:
                ax = axes[i, j]
            
            ax.hist(df_scaled[col].dropna(), bins=30, alpha=0.7, edgecolor='black')
            ax.set_title(f'{method_name} - {col}')
            ax.set_xlabel(col)
            ax.set_ylabel('Frequency')
    
    plt.tight_layout()
    plt.show()

# Compare the scaling methods
scaling_results = {
    'Original': df.dropna(),
    'Z-Score': df_standard,
    'Min-Max': df_minmax,
    'Robust': df_robust,
    'Quantile': df_quantile
}

compare_scaling_methods(df.dropna(), scaling_results)

2.6 Feature Engineering

2.6.1 Feature Selection

Filter Methods

from sklearn.feature_selection import SelectKBest, chi2, f_classif, mutual_info_classif
from sklearn.feature_selection import VarianceThreshold
from scipy.stats import pearsonr

class FeatureSelector:
    def __init__(self, X, y=None):
        self.X = X
        self.y = y
        self.selected_features = {}
    
    def variance_threshold_selection(self, threshold=0.0):
        """
        Keep features whose variance exceeds a threshold.
        """
        selector = VarianceThreshold(threshold=threshold)
        X_selected = selector.fit_transform(self.X)
        
        selected_features = self.X.columns[selector.get_support()].tolist()
        self.selected_features['variance_threshold'] = selected_features
        
        return X_selected, selected_features
    
    def correlation_selection(self, threshold=0.95):
        """
        Drop one feature from each highly correlated pair.
        """
        corr_matrix = self.X.corr().abs()
        upper_triangle = corr_matrix.where(
            np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
        )
        
        # Find features involved in highly correlated pairs
        high_corr_features = [column for column in upper_triangle.columns 
                             if any(upper_triangle[column] > threshold)]
        
        selected_features = [col for col in self.X.columns if col not in high_corr_features]
        self.selected_features['correlation'] = selected_features
        
        return self.X[selected_features], selected_features
    
    def univariate_selection(self, score_func=f_classif, k=10):
        """
        Univariate feature selection.
        """
        if self.y is None:
            raise ValueError("Univariate selection requires a target variable")
        
        selector = SelectKBest(score_func=score_func, k=k)
        X_selected = selector.fit_transform(self.X, self.y)
        
        selected_features = self.X.columns[selector.get_support()].tolist()
        feature_scores = selector.scores_
        
        self.selected_features['univariate'] = selected_features
        
        return X_selected, selected_features, feature_scores
    
    def mutual_info_selection(self, k=10):
        """
        Mutual information feature selection.
        """
        if self.y is None:
            raise ValueError("Mutual information selection requires a target variable")
        
        selector = SelectKBest(score_func=mutual_info_classif, k=k)
        X_selected = selector.fit_transform(self.X, self.y)
        
        selected_features = self.X.columns[selector.get_support()].tolist()
        feature_scores = selector.scores_
        
        self.selected_features['mutual_info'] = selected_features
        
        return X_selected, selected_features, feature_scores

# Create an example classification dataset
from sklearn.datasets import make_classification

X_demo, y_demo = make_classification(n_samples=1000, n_features=20, n_informative=10, 
                                    n_redundant=5, n_clusters_per_class=1, random_state=42)
X_demo_df = pd.DataFrame(X_demo, columns=[f'feature_{i}' for i in range(20)])

# Run the feature selectors
selector = FeatureSelector(X_demo_df, y_demo)

# Variance threshold
X_var, features_var = selector.variance_threshold_selection(threshold=0.1)
print(f"Features after variance threshold selection: {len(features_var)}")

# Correlation-based selection
X_corr, features_corr = selector.correlation_selection(threshold=0.9)
print(f"Features after correlation selection: {len(features_corr)}")

# Univariate selection
X_uni, features_uni, scores_uni = selector.univariate_selection(k=10)
print(f"Features after univariate selection: {len(features_uni)}")

# Mutual information selection
X_mi, features_mi, scores_mi = selector.mutual_info_selection(k=10)
print(f"Features after mutual information selection: {len(features_mi)}")

Wrapper Methods

from sklearn.feature_selection import RFE, RFECV
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold

def recursive_feature_elimination(X, y, estimator=None, n_features=10):
    """
    Recursive feature elimination (RFE).
    """
    if estimator is None:
        estimator = LogisticRegression(random_state=42)
    
    # RFE
    rfe = RFE(estimator=estimator, n_features_to_select=n_features)
    X_rfe = rfe.fit_transform(X, y)
    
    selected_features = X.columns[rfe.support_].tolist()
    feature_ranking = rfe.ranking_
    
    return X_rfe, selected_features, feature_ranking

def recursive_feature_elimination_cv(X, y, estimator=None, cv=5):
    """
    RFE with cross-validation to choose the number of features.
    """
    if estimator is None:
        estimator = LogisticRegression(random_state=42)
    
    # RFECV
    rfecv = RFECV(estimator=estimator, cv=StratifiedKFold(cv), scoring='accuracy')
    X_rfecv = rfecv.fit_transform(X, y)
    
    selected_features = X.columns[rfecv.support_].tolist()
    optimal_features = rfecv.n_features_
    
    return X_rfecv, selected_features, optimal_features

# Run the wrapper methods
X_rfe, features_rfe, ranking_rfe = recursive_feature_elimination(X_demo_df, y_demo)
print(f"Features selected by RFE: {features_rfe}")

X_rfecv, features_rfecv, optimal_n = recursive_feature_elimination_cv(X_demo_df, y_demo)
print(f"Number of features selected by RFECV: {optimal_n}")
print(f"Features selected by RFECV: {features_rfecv}")

Embedded Methods

from sklearn.linear_model import Lasso, Ridge
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectFromModel

def lasso_feature_selection(X, y, alpha=0.01):
    """
    Lasso feature selection (here the 0/1 labels are treated as a
    regression target).
    """
    lasso = Lasso(alpha=alpha, random_state=42)
    selector = SelectFromModel(lasso)
    X_lasso = selector.fit_transform(X, y)
    
    selected_features = X.columns[selector.get_support()].tolist()
    
    return X_lasso, selected_features

def random_forest_feature_selection(X, y, threshold='median'):
    """
    Random forest feature selection via impurity-based importances.
    """
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    selector = SelectFromModel(rf, threshold=threshold)
    X_rf = selector.fit_transform(X, y)
    
    selected_features = X.columns[selector.get_support()].tolist()
    
    # Get the feature importances
    rf.fit(X, y)
    feature_importance = rf.feature_importances_
    
    return X_rf, selected_features, feature_importance

# Run the embedded methods
X_lasso, features_lasso = lasso_feature_selection(X_demo_df, y_demo)
print(f"Features selected by Lasso: {len(features_lasso)}")

X_rf, features_rf, importance_rf = random_forest_feature_selection(X_demo_df, y_demo)
print(f"Features selected by random forest: {len(features_rf)}")

# Visualize the feature importances
plt.figure(figsize=(12, 6))
feature_names = [f'feature_{i}' for i in range(20)]
indices = np.argsort(importance_rf)[::-1]

plt.bar(range(len(importance_rf)), importance_rf[indices])
plt.xticks(range(len(importance_rf)), [feature_names[i] for i in indices], rotation=45)
plt.title('Random forest feature importances')
plt.xlabel('Feature')
plt.ylabel('Importance')
plt.tight_layout()
plt.show()

2.6.2 Feature Creation

Creating Numeric Features

class FeatureCreator:
    def __init__(self, df):
        self.df = df.copy()
    
    def create_polynomial_features(self, columns, degree=2):
        """
        Create polynomial features.
        """
        from sklearn.preprocessing import PolynomialFeatures
        
        poly = PolynomialFeatures(degree=degree, include_bias=False)
        poly_features = poly.fit_transform(self.df[columns])
        
        # Build the feature names
        feature_names = poly.get_feature_names_out(columns)
        
        # Append to the DataFrame
        poly_df = pd.DataFrame(poly_features, columns=feature_names, index=self.df.index)
        
        return pd.concat([self.df, poly_df], axis=1)
    
    def create_interaction_features(self, column_pairs):
        """
        Create pairwise interaction features.
        """
        df_new = self.df.copy()
        
        for col1, col2 in column_pairs:
            if col1 in df_new.columns and col2 in df_new.columns:
                # Product
                df_new[f'{col1}_x_{col2}'] = df_new[col1] * df_new[col2]
                
                # Ratio (guarding against division by zero)
                df_new[f'{col1}_div_{col2}'] = df_new[col1] / (df_new[col2] + 1e-8)
                
                # Difference
                df_new[f'{col1}_minus_{col2}'] = df_new[col1] - df_new[col2]
                
                # Sum
                df_new[f'{col1}_plus_{col2}'] = df_new[col1] + df_new[col2]
        
        return df_new
    
    def create_binning_features(self, column, n_bins=5, strategy='quantile'):
        """
        Create a binned (discretized) feature.
        """
        from sklearn.preprocessing import KBinsDiscretizer
        
        df_new = self.df.copy()
        
        discretizer = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy=strategy)
        binned_feature = discretizer.fit_transform(df_new[[column]])
        
        df_new[f'{column}_binned'] = binned_feature.flatten()
        
        return df_new
    
    def create_aggregation_features(self, group_column, agg_columns, agg_functions=['mean', 'std', 'min', 'max']):
        """
        Create group-level aggregation features.
        """
        df_new = self.df.copy()
        
        for agg_col in agg_columns:
            for func in agg_functions:
                agg_feature = df_new.groupby(group_column)[agg_col].transform(func)
                df_new[f'{agg_col}_{func}_by_{group_column}'] = agg_feature
        
        return df_new
    
    def create_time_features(self, date_column):
        """
        Create calendar features from a date column.
        """
        df_new = self.df.copy()
        
        # Ensure datetime dtype
        df_new[date_column] = pd.to_datetime(df_new[date_column])
        
        # Extract calendar features
        df_new[f'{date_column}_year'] = df_new[date_column].dt.year
        df_new[f'{date_column}_month'] = df_new[date_column].dt.month
        df_new[f'{date_column}_day'] = df_new[date_column].dt.day
        df_new[f'{date_column}_dayofweek'] = df_new[date_column].dt.dayofweek
        df_new[f'{date_column}_quarter'] = df_new[date_column].dt.quarter
        df_new[f'{date_column}_is_weekend'] = (df_new[date_column].dt.dayofweek >= 5).astype(int)
        
        return df_new

# Run the feature creators
creator = FeatureCreator(df.dropna())

# Interaction features
interaction_pairs = [('age', 'experience'), ('income', 'age')]
df_with_interactions = creator.create_interaction_features(interaction_pairs)
print(f"Columns after adding interaction features: {df_with_interactions.shape[1]}")

# Binned features
df_with_binning = creator.create_binning_features('age', n_bins=5)
print(f"Age bins: {df_with_binning['age_binned'].value_counts().sort_index()}")

# Aggregation features
df_with_agg = creator.create_aggregation_features('education', ['age', 'income'])
print(f"Columns after adding aggregation features: {df_with_agg.shape[1]}")

Categorical Feature Encoding

from sklearn.preprocessing import LabelEncoder, OneHotEncoder, OrdinalEncoder

class CategoricalEncoder:
    def __init__(self, df):
        self.df = df.copy()
        self.encoders = {}
    
    def label_encoding(self, columns):
        """
        Label encoding.
        """
        df_encoded = self.df.copy()
        
        for col in columns:
            if col in df_encoded.columns:
                le = LabelEncoder()
                df_encoded[f'{col}_label'] = le.fit_transform(df_encoded[col].astype(str))
                self.encoders[f'{col}_label'] = le
        
        return df_encoded
    
    def one_hot_encoding(self, columns, drop_first=True):
        """
        One-hot encoding.
        """
        df_encoded = self.df.copy()
        
        for col in columns:
            if col in df_encoded.columns:
                # Use pandas get_dummies
                dummies = pd.get_dummies(df_encoded[col], prefix=col, drop_first=drop_first)
                df_encoded = pd.concat([df_encoded, dummies], axis=1)
        
        return df_encoded
    
    def ordinal_encoding(self, columns, categories=None):
        """
        Ordinal encoding.
        """
        df_encoded = self.df.copy()
        
        for col in columns:
            if col in df_encoded.columns:
                oe = OrdinalEncoder(categories=categories)
                df_encoded[f'{col}_ordinal'] = oe.fit_transform(df_encoded[[col]])
                self.encoders[f'{col}_ordinal'] = oe
        
        return df_encoded
    
    def target_encoding(self, columns, target):
        """
        Target encoding. Note: computing the means over the full dataset
        leaks the target; in practice fit the encoding on training folds only.
        """
        df_encoded = self.df.copy()
        
        for col in columns:
            if col in df_encoded.columns:
                # Mean of the target within each category
                target_mean = df_encoded.groupby(col)[target].mean()
                df_encoded[f'{col}_target'] = df_encoded[col].map(target_mean)
        
        return df_encoded
    
    def frequency_encoding(self, columns):
        """
        Frequency encoding.
        """
        df_encoded = self.df.copy()
        
        for col in columns:
            if col in df_encoded.columns:
                freq_map = df_encoded[col].value_counts().to_dict()
                df_encoded[f'{col}_freq'] = df_encoded[col].map(freq_map)
        
        return df_encoded

# Run the categorical encoders
encoder = CategoricalEncoder(df.dropna())

# Label encoding
df_label = encoder.label_encoding(['education'])
print("Label encoding result:")
print(df_label[['education', 'education_label']].head())

# One-hot encoding
df_onehot = encoder.one_hot_encoding(['education'])
print(f"\nColumns after one-hot encoding: {df_onehot.shape[1]}")
print("One-hot encoded columns:")
print([col for col in df_onehot.columns if 'education_' in col])

# Frequency encoding
df_freq = encoder.frequency_encoding(['education'])
print("\nFrequency encoding result:")
print(df_freq[['education', 'education_freq']].head())
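The target_encoding method needs a target column, which df does not have, so as a sketch we attach a synthetic binary label (the 'label' column is purely an assumption for illustration):

# Hypothetical target, for demonstration only
df_demo = df.dropna().copy()
df_demo['label'] = np.random.choice([0, 1], size=len(df_demo))

target_enc = CategoricalEncoder(df_demo)
df_target = target_enc.target_encoding(['education'], target='label')
print("\nTarget encoding result:")
print(df_target[['education', 'education_target']].head())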

2.7 Preprocessing Pipelines

2.7.1 Using sklearn Pipeline

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.base import BaseEstimator, TransformerMixin

# Custom transformers
class OutlierRemover(BaseEstimator, TransformerMixin):
    def __init__(self, factor=1.5):
        self.factor = factor
    
    def fit(self, X, y=None):
        Q1 = np.percentile(X, 25, axis=0)
        Q3 = np.percentile(X, 75, axis=0)
        IQR = Q3 - Q1
        self.lower_bound = Q1 - self.factor * IQR
        self.upper_bound = Q3 + self.factor * IQR
        return self
    
    def transform(self, X):
        X_transformed = X.copy()
        for i in range(X.shape[1]):
            X_transformed[:, i] = np.clip(X_transformed[:, i], 
                                        self.lower_bound[i], 
                                        self.upper_bound[i])
        return X_transformed

class FeatureCreatorTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, create_interactions=True):
        self.create_interactions = create_interactions
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X_df = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X
        
        if self.create_interactions and X_df.shape[1] >= 2:
            # Interaction of the first two features
            interaction = X_df.iloc[:, 0] * X_df.iloc[:, 1]
            X_df['interaction_0_1'] = interaction
        
        return X_df.values

# Build the preprocessing pipelines
def create_preprocessing_pipeline():
    # Pipeline for numeric features
    numeric_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('outlier_remover', OutlierRemover(factor=1.5)),
        ('scaler', StandardScaler())
    ])
    
    # Pipeline for categorical features
    categorical_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore'))
    ])
    
    return numeric_pipeline, categorical_pipeline

# Combine the pipelines with ColumnTransformer
def create_full_pipeline(numeric_features, categorical_features):
    numeric_pipeline, categorical_pipeline = create_preprocessing_pipeline()
    
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_pipeline, numeric_features),
            ('cat', categorical_pipeline, categorical_features)
        ]
    )
    
    # Full pipeline (including feature creation)
    full_pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('feature_creator', FeatureCreatorTransformer())
    ])
    
    return full_pipeline

# Example usage
numeric_features = ['age', 'income', 'experience', 'score']
categorical_features = ['education']

full_pipeline = create_full_pipeline(numeric_features, categorical_features)

# Fit and transform the data
X_processed = full_pipeline.fit_transform(df)
print(f"Processed data shape: {X_processed.shape}")

2.7.2 A Custom Preprocessing Class

class DataPreprocessor:
    """
    An end-to-end data preprocessing class.
    """
    def __init__(self, 
                 missing_strategy='auto',
                 outlier_method='iqr',
                 scaling_method='standard',
                 encoding_method='onehot',
                 feature_selection=True,
                 create_features=True):
        
        self.missing_strategy = missing_strategy
        self.outlier_method = outlier_method
        self.scaling_method = scaling_method
        self.encoding_method = encoding_method
        self.feature_selection = feature_selection
        self.create_features = create_features
        
        self.preprocessors = {}
        self.feature_names = None
        self.selected_features = None
    
    def fit(self, X, y=None):
        """
        Fit the preprocessor.
        """
        self.X_original = X.copy()
        self.y = y
        
        # Identify feature types
        self.numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
        self.categorical_features = X.select_dtypes(include=['object']).columns.tolist()
        
        # Missing values
        self._fit_missing_imputers(X)
        
        # Outliers
        self._fit_outlier_detectors(X)
        
        # Feature encoding
        self._fit_encoders(X)
        
        # Feature scaling
        self._fit_scalers(X)
        
        # Feature selection
        if self.feature_selection and y is not None:
            self._fit_feature_selector(X, y)
        
        return self
    
    def transform(self, X):
        """
        Transform the data.
        """
        X_transformed = X.copy()
        
        # Missing values
        X_transformed = self._transform_missing(X_transformed)
        
        # Outliers
        X_transformed = self._transform_outliers(X_transformed)
        
        # Feature encoding
        X_transformed = self._transform_encoding(X_transformed)
        
        # Create new features
        if self.create_features:
            X_transformed = self._create_features(X_transformed)
        
        # Feature scaling
        X_transformed = self._transform_scaling(X_transformed)
        
        # Feature selection (the selector was fitted on the pre-encoding
        # column names, so keep only those that still exist after encoding)
        if self.feature_selection and self.selected_features is not None:
            available = [c for c in self.selected_features if c in X_transformed.columns]
            X_transformed = X_transformed[available]
        
        return X_transformed
    
    def fit_transform(self, X, y=None):
        """
        Fit, then transform.
        """
        return self.fit(X, y).transform(X)
    
    def _fit_missing_imputers(self, X):
        """Fit the missing-value imputers."""
        from sklearn.impute import SimpleImputer, KNNImputer
        
        if self.missing_strategy == 'auto':
            # Choose a strategy per column based on the missing ratio
            for col in self.numeric_features:
                missing_ratio = X[col].isnull().sum() / len(X)
                if missing_ratio < 0.1:
                    strategy = 'median'
                else:
                    strategy = 'knn'
                
                if strategy == 'knn':
                    imputer = KNNImputer(n_neighbors=5)
                else:
                    imputer = SimpleImputer(strategy=strategy)
                
                self.preprocessors[f'{col}_imputer'] = imputer
                imputer.fit(X[[col]])
            
            for col in self.categorical_features:
                imputer = SimpleImputer(strategy='most_frequent')
                self.preprocessors[f'{col}_imputer'] = imputer
                imputer.fit(X[[col]])
    
    def _transform_missing(self, X):
        """Impute missing values."""
        X_transformed = X.copy()
        
        for col in self.numeric_features + self.categorical_features:
            if f'{col}_imputer' in self.preprocessors:
                imputer = self.preprocessors[f'{col}_imputer']
                X_transformed[col] = imputer.transform(X_transformed[[col]]).flatten()
        
        return X_transformed
    
    def _fit_outlier_detectors(self, X):
        """Fit the per-column outlier bounds."""
        if self.outlier_method == 'iqr':
            for col in self.numeric_features:
                Q1 = X[col].quantile(0.25)
                Q3 = X[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                
                self.preprocessors[f'{col}_bounds'] = (lower_bound, upper_bound)
    
    def _transform_outliers(self, X):
        """Clip values to the fitted bounds."""
        X_transformed = X.copy()
        
        if self.outlier_method == 'iqr':
            for col in self.numeric_features:
                if f'{col}_bounds' in self.preprocessors:
                    lower_bound, upper_bound = self.preprocessors[f'{col}_bounds']
                    X_transformed[col] = np.clip(X_transformed[col], lower_bound, upper_bound)
        
        return X_transformed
    
    def _fit_encoders(self, X):
        """Fit the categorical encoders."""
        if self.encoding_method == 'onehot':
            from sklearn.preprocessing import OneHotEncoder
            
            for col in self.categorical_features:
                encoder = OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore')
                self.preprocessors[f'{col}_encoder'] = encoder
                encoder.fit(X[[col]])
    
    def _transform_encoding(self, X):
        """Apply the categorical encoders."""
        X_transformed = X.copy()
        
        if self.encoding_method == 'onehot':
            for col in self.categorical_features:
                if f'{col}_encoder' in self.preprocessors:
                    encoder = self.preprocessors[f'{col}_encoder']
                    encoded_features = encoder.transform(X_transformed[[col]])
                    feature_names = encoder.get_feature_names_out([col])
                    
                    # Append the encoded features
                    encoded_df = pd.DataFrame(encoded_features, 
                                            columns=feature_names, 
                                            index=X_transformed.index)
                    X_transformed = pd.concat([X_transformed, encoded_df], axis=1)
                    
                    # Drop the original categorical column
                    X_transformed = X_transformed.drop(col, axis=1)
        
        return X_transformed
    
    def _create_features(self, X):
        """Create interaction features."""
        X_transformed = X.copy()
        
        # Current numeric features (the set may have changed after encoding)
        current_numeric = X_transformed.select_dtypes(include=[np.number]).columns.tolist()
        
        # Build interactions over the original numeric features only
        original_numeric = [col for col in current_numeric if col in self.numeric_features]
        
        if len(original_numeric) >= 2:
            for i in range(len(original_numeric)):
                for j in range(i+1, len(original_numeric)):
                    col1, col2 = original_numeric[i], original_numeric[j]
                    X_transformed[f'{col1}_x_{col2}'] = X_transformed[col1] * X_transformed[col2]
        
        return X_transformed
    
    def _fit_scalers(self, X):
        """Fit the scaler on the original numeric features."""
        from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
        
        if self.scaling_method == 'standard':
            scaler = StandardScaler()
        elif self.scaling_method == 'minmax':
            scaler = MinMaxScaler()
        elif self.scaling_method == 'robust':
            scaler = RobustScaler()
        else:
            return
        
        self.preprocessors['scaler'] = scaler
        
        # Scale numeric features only, and remember which ones were fitted
        numeric_cols = X.select_dtypes(include=[np.number]).columns
        self.scaled_columns = list(numeric_cols)
        if len(numeric_cols) > 0:
            scaler.fit(X[numeric_cols])
    
    def _transform_scaling(self, X):
        """Apply the scaler to the columns it was fitted on."""
        if 'scaler' not in self.preprocessors:
            return X
        
        X_transformed = X.copy()
        scaler = self.preprocessors['scaler']
        
        # Only the originally fitted columns; encoding and feature creation
        # may have added numeric columns the scaler has never seen
        cols = [c for c in self.scaled_columns if c in X_transformed.columns]
        
        if len(cols) > 0:
            X_transformed[cols] = scaler.transform(X_transformed[cols])
        
        return X_transformed
    
    def _fit_feature_selector(self, X, y):
        """Fit the feature selector."""
        from sklearn.feature_selection import SelectKBest, f_classif
        
        # Run a basic preprocessing pass to obtain an all-numeric matrix
        X_temp = self._basic_preprocess(X)
        
        # Select features
        selector = SelectKBest(score_func=f_classif, k=min(10, X_temp.shape[1]))
        selector.fit(X_temp, y)
        
        self.selected_features = X_temp.columns[selector.get_support()].tolist()
        self.preprocessors['feature_selector'] = selector
    
    def _basic_preprocess(self, X):
        """Basic preprocessing (used only for feature selection)."""
        X_temp = X.copy()
        
        # Simple imputation
        for col in self.numeric_features:
            X_temp[col] = X_temp[col].fillna(X_temp[col].median())
        
        for col in self.categorical_features:
            X_temp[col] = X_temp[col].fillna(X_temp[col].mode()[0] if len(X_temp[col].mode()) > 0 else 'unknown')
        
        # Simple categorical encoding
        for col in self.categorical_features:
            X_temp[col] = pd.Categorical(X_temp[col]).codes
        
        return X_temp
    
    def get_feature_importance(self):
        """Return the feature importance scores from the selector."""
        if 'feature_selector' in self.preprocessors:
            selector = self.preprocessors['feature_selector']
            scores = selector.scores_
            feature_names = self.X_original.columns
            
            importance_df = pd.DataFrame({
                'feature': feature_names,
                'importance': scores
            }).sort_values('importance', ascending=False)
            
            return importance_df
        
        return None

# Use the custom preprocessing class
preprocessor = DataPreprocessor(
    missing_strategy='auto',
    outlier_method='iqr',
    scaling_method='standard',
    encoding_method='onehot',
    feature_selection=True,
    create_features=True
)

# Create an example target variable
y_example = np.random.choice([0, 1], size=len(df.dropna()))

# Fit and transform
X_processed = preprocessor.fit_transform(df.dropna(), y_example)
print(f"Preprocessed data shape: {X_processed.shape}")

# Inspect the feature importances
feature_importance = preprocessor.get_feature_importance()
if feature_importance is not None:
    print("\nFeature importance:")
    print(feature_importance.head())

2.8 Case Study: Preprocessing for House Price Prediction

2.8.1 Background

We will walk through a complete preprocessing workflow on a synthetic house price dataset.

# Create an example house price dataset
np.random.seed(42)
n_samples = 2000

# Generate house features
house_data = {
    'area': np.random.normal(150, 50, n_samples),   # floor area
    'bedrooms': np.random.poisson(3, n_samples),    # number of bedrooms
    'bathrooms': np.random.poisson(2, n_samples),   # number of bathrooms
    'age': np.random.exponential(10, n_samples),    # building age
    'location': np.random.choice(['Downtown', 'Suburb', 'NewDistrict'], n_samples, p=[0.3, 0.4, 0.3]),
    'type': np.random.choice(['Apartment', 'Villa', 'Townhouse'], n_samples, p=[0.5, 0.3, 0.2]),
    'garage': np.random.choice([0, 1, 2], n_samples, p=[0.3, 0.5, 0.2]),
    'garden': np.random.choice([0, 1], n_samples, p=[0.6, 0.4]),
    'condition': np.random.choice(['Poor', 'Fair', 'Good', 'Excellent'], n_samples, p=[0.1, 0.3, 0.4, 0.2])
}

# Generate prices (a linear combination of the features plus noise)
base_price = (house_data['area'] * 100 + 
              house_data['bedrooms'] * 5000 + 
              house_data['bathrooms'] * 3000 - 
              house_data['age'] * 500)

# Location adjustment
location_multiplier = {'Downtown': 1.5, 'Suburb': 1.0, 'NewDistrict': 1.2}
for i, loc in enumerate(house_data['location']):
    base_price[i] *= location_multiplier[loc]

# Add noise
house_data['price'] = base_price + np.random.normal(0, 10000, n_samples)
house_data['price'] = np.maximum(house_data['price'], 50000)  # price floor

# Build the DataFrame
house_df = pd.DataFrame(house_data)
house_df['bathrooms'] = house_df['bathrooms'].astype(float)  # allow NaN injection below

# Inject data quality problems
# 1. Missing values
missing_indices = np.random.choice(house_df.index, size=int(0.1 * len(house_df)), replace=False)
house_df.loc[missing_indices[:50], 'bathrooms'] = np.nan
house_df.loc[missing_indices[50:100], 'age'] = np.nan
house_df.loc[missing_indices[100:], 'condition'] = np.nan

# 2. Outliers
outlier_indices = np.random.choice(house_df.index, size=20, replace=False)
house_df.loc[outlier_indices, 'area'] = house_df.loc[outlier_indices, 'area'] * 3

# 3. Duplicate rows
duplicate_indices = np.random.choice(house_df.index, size=50, replace=False)
house_df = pd.concat([house_df, house_df.loc[duplicate_indices]], ignore_index=True)

print("Basic info for the house price dataset:")
house_df.info()
print("\nDescriptive statistics:")
print(house_df.describe())

2.8.2 The Complete Preprocessing Workflow

class HousePricePreprocessor:
    """
    A preprocessor tailored to the house price task.
    """
    def __init__(self):
        self.preprocessors = {}
        self.feature_names = None
    
    def preprocess(self, df, target_col='price'):
        """
        Run the full preprocessing workflow.
        """
        print("Starting preprocessing...")
        
        # 1. Data cleaning
        df_cleaned = self._data_cleaning(df)
        
        # 2. Feature engineering
        df_featured = self._feature_engineering(df_cleaned)
        
        # 3. Outlier handling
        df_outliers = self._handle_outliers(df_featured)
        
        # 4. Missing value handling
        df_imputed = self._handle_missing_values(df_outliers)
        
        # 5. Feature encoding
        df_encoded = self._encode_features(df_imputed)
        
        # 6. Feature scaling
        df_scaled = self._scale_features(df_encoded, target_col)
        
        # 7. Feature selection
        if target_col in df_scaled.columns:
            X = df_scaled.drop(target_col, axis=1)
            y = df_scaled[target_col]
            X_selected = self._select_features(X, y)
            return X_selected, y
        else:
            return df_scaled, None
    
    def _data_cleaning(self, df):
        """Data cleaning."""
        print("1. Data cleaning...")
        
        df_clean = df.copy()
        
        # Drop duplicate rows
        initial_shape = df_clean.shape
        df_clean = df_clean.drop_duplicates()
        print(f"   Dropped duplicates: {initial_shape[0] - df_clean.shape[0]} rows")
        
        # Type conversions
        df_clean['bedrooms'] = df_clean['bedrooms'].astype('int64')
        df_clean['garage'] = df_clean['garage'].astype('int64')
        df_clean['garden'] = df_clean['garden'].astype('int64')
        
        # Range checks (the checks on columns with missing values are
        # written as negations so NaN rows survive for later imputation)
        df_clean = df_clean[df_clean['area'] > 0]
        df_clean = df_clean[df_clean['bedrooms'] >= 0]
        df_clean = df_clean[~(df_clean['bathrooms'] < 0)]
        df_clean = df_clean[~(df_clean['age'] < 0)]
        
        print(f"   Shape after cleaning: {df_clean.shape}")
        return df_clean
    
    def _feature_engineering(self, df):
        """Feature engineering."""
        print("2. Feature engineering...")
        
        df_featured = df.copy()
        
        # New features
        df_featured['rooms_total'] = df_featured['bedrooms'] + df_featured['bathrooms']
        df_featured['area_per_room'] = df_featured['area'] / (df_featured['rooms_total'] + 1)
        df_featured['is_new'] = (df_featured['age'] < 5).astype(int)
        df_featured['has_garage'] = (df_featured['garage'] > 0).astype(int)
        df_featured['luxury_score'] = (df_featured['bedrooms'] * 0.3 + 
                                     df_featured['bathrooms'] * 0.4 + 
                                     df_featured['garage'] * 0.2 + 
                                     df_featured['garden'] * 0.1)
        
        # Bin the area
        df_featured['area_category'] = pd.cut(df_featured['area'], 
                                            bins=[0, 100, 150, 200, float('inf')],
                                            labels=['small', 'medium', 'large', 'xlarge'])
        
        # Bin the building age
        df_featured['age_category'] = pd.cut(df_featured['age'],
                                           bins=[0, 5, 15, 30, float('inf')],
                                           labels=['new', 'recent', 'old', 'very_old'])
        
        print(f"   Shape after feature engineering: {df_featured.shape}")
        return df_featured
    
    def _handle_outliers(self, df):
        """Outlier handling."""
        print("3. Outlier handling...")
        
        df_outliers = df.copy()
        
        # Cap outliers in numeric features with the IQR method
        numeric_cols = ['area', 'age', 'price', 'area_per_room', 'luxury_score']
        
        for col in numeric_cols:
            if col in df_outliers.columns:
                Q1 = df_outliers[col].quantile(0.25)
                Q3 = df_outliers[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                
                # Cap the outliers
                outliers_count = ((df_outliers[col] < lower_bound) | 
                                (df_outliers[col] > upper_bound)).sum()
                df_outliers[col] = np.clip(df_outliers[col], lower_bound, upper_bound)
                
                print(f"   {col}: capped {outliers_count} outliers")
        
        return df_outliers
    
    def _handle_missing_values(self, df):
        """Missing value handling."""
        print("4. Missing value handling...")
        
        df_imputed = df.copy()
        
        # Numeric features: impute with the median
        numeric_cols = df_imputed.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if df_imputed[col].isnull().sum() > 0:
                median_value = df_imputed[col].median()
                df_imputed[col] = df_imputed[col].fillna(median_value)
                print(f"   {col}: imputed with median {median_value:.2f}")
        
        # Categorical features: impute with the mode
        categorical_cols = df_imputed.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            if df_imputed[col].isnull().sum() > 0:
                mode_value = df_imputed[col].mode()[0] if len(df_imputed[col].mode()) > 0 else 'unknown'
                df_imputed[col] = df_imputed[col].fillna(mode_value)
                print(f"   {col}: imputed with mode '{mode_value}'")
        
        return df_imputed
    
    def _encode_features(self, df):
        """Feature encoding."""
        print("5. Feature encoding...")
        
        df_encoded = df.copy()
        
        # Ordinal encoding (ordered categories)
        condition_mapping = {'Poor': 1, 'Fair': 2, 'Good': 3, 'Excellent': 4}
        if 'condition' in df_encoded.columns:
            df_encoded['condition_encoded'] = df_encoded['condition'].map(condition_mapping)
        
        # One-hot encoding (unordered categories)
        categorical_cols = ['location', 'type', 'area_category', 'age_category']
        
        for col in categorical_cols:
            if col in df_encoded.columns:
                dummies = pd.get_dummies(df_encoded[col], prefix=col, drop_first=True, dtype=int)
                df_encoded = pd.concat([df_encoded, dummies], axis=1)
                df_encoded.drop(col, axis=1, inplace=True)
        
        # Drop the original ordinal column
        cols_to_drop = ['condition']
        for col in cols_to_drop:
            if col in df_encoded.columns:
                df_encoded.drop(col, axis=1, inplace=True)
        
        print(f"   Shape after encoding: {df_encoded.shape}")
        return df_encoded
    
    def _scale_features(self, df, target_col):
        """Feature scaling."""
        print("6. Feature scaling...")
        
        df_scaled = df.copy()
        
        # Scale numeric features only (excluding the target)
        numeric_cols = df_scaled.select_dtypes(include=[np.number]).columns.tolist()
        if target_col in numeric_cols:
            numeric_cols.remove(target_col)
        
        if len(numeric_cols) > 0:
            scaler = StandardScaler()
            df_scaled[numeric_cols] = scaler.fit_transform(df_scaled[numeric_cols])
            self.preprocessors['scaler'] = scaler
            print(f"   Standardized {len(numeric_cols)} numeric features")
        
        return df_scaled
    
    def _select_features(self, X, y):
        """Feature selection."""
        print("7. Feature selection...")
        
        # Use correlation with the target for selection
        correlation_with_target = X.corrwith(y).abs().sort_values(ascending=False)
        
        # Keep features reasonably correlated with the target
        selected_features = correlation_with_target[correlation_with_target > 0.1].index.tolist()
        
        # Drop highly intercorrelated features
        corr_matrix = X[selected_features].corr().abs()
        upper_triangle = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
        
        high_corr_features = [column for column in upper_triangle.columns 
                             if any(upper_triangle[column] > 0.9)]
        
        final_features = [f for f in selected_features if f not in high_corr_features]
        
        print(f"   Selected {len(final_features)} features")
        print(f"   Feature list: {final_features[:10]}...")  # show the first 10
        
        return X[final_features]

# Run the house price preprocessor
house_preprocessor = HousePricePreprocessor()
X_processed, y_processed = house_preprocessor.preprocess(house_df)

print(f"\nFinal result:")
print(f"Feature matrix shape: {X_processed.shape}")
print(f"Target shape: {y_processed.shape}")
print(f"\nFeature list:")
print(X_processed.columns.tolist())

2.8.3 Evaluating the Preprocessing

def evaluate_preprocessing_effect(original_df, processed_X, processed_y):
    """
    Evaluate the effect of preprocessing.
    """
    print("Preprocessing evaluation:")
    print("=" * 50)
    
    # 1. Data quality improvement
    print("1. Data quality improvement:")
    print(f"   Missing values before: {original_df.isnull().sum().sum()}")
    print(f"   Missing values after: {processed_X.isnull().sum().sum()}")
    
    print(f"   Original shape: {original_df.shape}")
    print(f"   Processed shape: {processed_X.shape}")
    
    # 2. Feature distribution improvement
    print("\n2. Feature distribution improvement:")
    
    # Compare a few key features
    key_features = ['area', 'age'] if 'area' in original_df.columns else []
    
    fig, axes = plt.subplots(2, len(key_features), figsize=(6*len(key_features), 8))
    
    for i, feature in enumerate(key_features):
        if feature in original_df.columns:
            # Original distribution
            axes[0, i].hist(original_df[feature].dropna(), bins=30, alpha=0.7, edgecolor='black')
            axes[0, i].set_title(f'Original {feature} distribution')
            axes[0, i].set_xlabel(feature)
            axes[0, i].set_ylabel('Frequency')
            
            # Processed distribution (if the feature survived preprocessing)
            processed_feature = feature if feature in processed_X.columns else None
            if processed_feature:
                axes[1, i].hist(processed_X[processed_feature], bins=30, alpha=0.7, edgecolor='black')
                axes[1, i].set_title(f'Processed {feature} distribution')
                axes[1, i].set_xlabel(feature)
                axes[1, i].set_ylabel('Frequency')
    
    plt.tight_layout()
    plt.show()
    
    # 3. Feature correlation analysis
    print("\n3. Feature correlation analysis:")
    
    # Correlation with the target
    correlations = processed_X.corrwith(processed_y).abs().sort_values(ascending=False)
    
    print("Top 10 features by correlation with the target:")
    print(correlations.head(10))
    
    # 4. Correlation matrix heatmap
    plt.figure(figsize=(12, 10))
    
    # Visualize the most correlated features
    top_features = correlations.head(10).index.tolist()
    corr_matrix = processed_X[top_features].corr()
    
    sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0, 
                square=True, fmt='.2f')
    plt.title('Correlation matrix of the processed features (top 10)')
    plt.tight_layout()
    plt.show()

# Evaluate the preprocessing
evaluate_preprocessing_effect(house_df, X_processed, y_processed)

2.9 Chapter Summary

This chapter covered the core techniques of data preprocessing and feature engineering:

Core Techniques

  • Data quality assessment: detecting and analyzing missing values, outliers, and duplicates
  • Missing value handling: deletion, imputation, and advanced imputation methods (KNN, MICE)
  • Outlier handling: statistical methods (Z-score, IQR) and machine learning methods (Isolation Forest, LOF)
  • Scaling: Z-score, min-max, robust, and quantile transformations
  • Feature engineering: feature selection, feature creation, and feature encoding

Practical Skills

  • Preprocessing pipelines: building reusable workflows with sklearn Pipeline
  • Custom preprocessors: designing preprocessing classes around specific business needs
  • Effect evaluation: assessing preprocessing with visualizations and statistics

Best Practices

  • Be data-driven: choose preprocessing methods based on the data's characteristics
  • Understand the business: let domain knowledge guide feature engineering
  • Iterate: keep refining the preprocessing strategy through experiments
  • Be reproducible: keep the preprocessing workflow consistent and repeatable

Looking Ahead

The next chapter covers supervised learning algorithms, including:

  • Linear and logistic regression
  • Decision trees and random forests
  • Support vector machines
  • Naive Bayes
  • K-nearest neighbors
  • Model evaluation and selection

2.10 Exercises

Basic Exercises

  1. Missing value handling: create a dataset with missing values, apply different imputation methods, and compare the results.
  2. Outlier detection: detect outliers with several different methods and compare how the results differ.
  3. Feature encoding: encode categorical variables in different ways and analyze the impact on model performance.

Advanced Exercises

  1. Feature selection comparison
    • Implement filter, wrapper, and embedded methods
    • Compare the features each method selects
    • Analyze the impact of feature selection on model performance
  2. Pipeline design
    • Design a complete preprocessing pipeline
    • Include every necessary preprocessing step
    • Make the pipeline reusable
  3. Creative feature engineering
    • Create new features based on business understanding
    • Use polynomial and interaction features
    • Evaluate the usefulness of the new features

Project Exercises

  1. End-to-end preprocessing project: pick a real dataset and complete:
    • Data quality assessment
    • Missing value and outlier handling
    • Feature engineering
    • Evaluation of the preprocessing
    • A written preprocessing report
  2. Comparative study of preprocessing methods
    • Apply different preprocessing methods to the same dataset
    • Train the same model on each version
    • Compare the effect of each preprocessing method on model performance
    • Summarize the best practices

Discussion Questions

  1. Choosing a preprocessing strategy
    • How do you choose preprocessing methods based on the characteristics of the data?
    • When should outliers be removed, and when should they be kept?
    • How do you balance the number of features against model complexity?
  2. Applying this to your domain
    • What data quality problems arise in a business domain you know well?
    • How would you design a preprocessing strategy for them?
    • How do you ensure the preprocessing makes business sense?

These exercises will deepen your understanding of data preprocessing and lay a solid foundation for building high-quality machine learning models.