📚 本章概述

数据预处理是机器学习流程中最重要的步骤之一,按照业界的普遍经验,往往占据整个项目约80%的时间。高质量的数据预处理在很大程度上决定了模型性能的上限。本章将深入讲解Scikit-learn中的数据预处理技术,包括数据清洗、特征缩放与标准化、特征选择和降维等核心内容。

🎯 学习目标

  • 掌握数据预处理的基本概念和重要性
  • 学会使用Scikit-learn进行数据清洗和转换
  • 理解特征缩放和标准化的原理及应用
  • 掌握特征选择和降维技术
  • 学会构建完整的数据预处理管道

📖 目录

  1. 数据预处理概述
  2. 数据清洗与转换
  3. 特征缩放与标准化
  4. 特征选择
  5. 降维技术
  6. 数据预处理管道
  7. 实战案例

1. 数据预处理概述

1.1 什么是数据预处理

数据预处理是指在将数据输入机器学习算法之前,对原始数据进行清洗、转换和优化的过程。
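
在Scikit-learn中,几乎所有预处理操作都遵循统一的转换器(Transformer)接口:先在训练数据上调用 fit 学习参数,再用 transform 将同一套参数应用到任意数据。下面是一个最小示意(数组取值为假设的演示数据),用于说明这一约定:

import numpy as np
from sklearn.preprocessing import StandardScaler

# 假设的训练/新数据,仅用于演示 fit / transform 约定
X_train = np.array([[1.0], [2.0], [3.0], [4.0]])
X_new = np.array([[2.5], [10.0]])

scaler = StandardScaler()
scaler.fit(X_train)                         # 仅在训练集上学习均值和标准差
X_train_scaled = scaler.transform(X_train)
X_new_scaled = scaler.transform(X_new)      # 新数据复用训练集统计量,避免数据泄漏

print(scaler.mean_, scaler.scale_)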

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.feature_selection import SelectKBest, chi2, f_classif
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris, make_classification  # 注:load_boston 已从新版 scikit-learn 中移除
import warnings
warnings.filterwarnings('ignore')

# 设置中文字体和图形样式
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
sns.set_style("whitegrid")

class DataPreprocessingDemo:
    """数据预处理演示类"""
    
    def __init__(self):
        self.data = None
        self.processed_data = None
        
    def load_sample_data(self):
        """加载示例数据"""
        # 创建包含各种数据问题的示例数据集
        np.random.seed(42)
        n_samples = 1000
        
        # 生成基础特征
        age = np.random.normal(35, 10, n_samples)
        income = np.random.exponential(50000, n_samples)
        education_years = np.random.poisson(12, n_samples)
        
        # 添加缺失值
        age[np.random.choice(n_samples, 50, replace=False)] = np.nan
        income[np.random.choice(n_samples, 30, replace=False)] = np.nan
        
        # 添加异常值
        income[np.random.choice(n_samples, 20, replace=False)] *= 10
        
        # 分类特征
        gender = np.random.choice(['Male', 'Female'], n_samples)
        city = np.random.choice(['Beijing', 'Shanghai', 'Guangzhou', 'Shenzhen'], n_samples)
        
        # 创建DataFrame
        self.data = pd.DataFrame({
            'age': age,
            'income': income,
            'education_years': education_years,
            'gender': gender,
            'city': city
        })
        
        return self.data
    
    def analyze_data_quality(self):
        """分析数据质量"""
        if self.data is None:
            self.load_sample_data()
            
        print("=== 数据质量分析 ===")
        print(f"数据形状: {self.data.shape}")
        print(f"\n数据类型:")
        print(self.data.dtypes)
        
        print(f"\n缺失值统计:")
        missing_stats = self.data.isnull().sum()
        missing_percent = (missing_stats / len(self.data)) * 100
        missing_df = pd.DataFrame({
            '缺失数量': missing_stats,
            '缺失比例(%)': missing_percent
        })
        print(missing_df[missing_df['缺失数量'] > 0])
        
        print(f"\n数值特征统计:")
        print(self.data.describe())
        
        # 可视化数据分布
        self.visualize_data_distribution()
        
    def visualize_data_distribution(self):
        """可视化数据分布"""
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        fig.suptitle('数据分布分析', fontsize=16)
        
        # 数值特征分布
        numeric_cols = ['age', 'income', 'education_years']
        for i, col in enumerate(numeric_cols):
            axes[0, i].hist(self.data[col].dropna(), bins=30, alpha=0.7)
            axes[0, i].set_title(f'{col} 分布')
            axes[0, i].set_xlabel(col)
            axes[0, i].set_ylabel('频次')
        
        # 分类特征分布
        categorical_cols = ['gender', 'city']
        for i, col in enumerate(categorical_cols):
            self.data[col].value_counts().plot(kind='bar', ax=axes[1, i])
            axes[1, i].set_title(f'{col} 分布')
            axes[1, i].set_xlabel(col)
            axes[1, i].set_ylabel('计数')
            axes[1, i].tick_params(axis='x', rotation=45)
        
        # 缺失值热图
        axes[1, 2].imshow(self.data.isnull(), cmap='viridis', aspect='auto')
        axes[1, 2].set_title('缺失值模式')
        axes[1, 2].set_xlabel('特征')
        axes[1, 2].set_ylabel('样本')
        
        plt.tight_layout()
        plt.show()

# 创建演示实例
demo = DataPreprocessingDemo()
data = demo.load_sample_data()
demo.analyze_data_quality()

1.2 数据预处理的重要性

class PreprocessingImportanceDemo:
    """数据预处理重要性演示"""
    
    def __init__(self):
        self.raw_data = None
        self.processed_data = None
        
    def demonstrate_scaling_importance(self):
        """演示特征缩放的重要性"""
        from sklearn.neighbors import KNeighborsClassifier
        from sklearn.metrics import accuracy_score
        
        # 创建具有不同尺度特征的数据
        np.random.seed(42)
        X1 = np.random.normal(0, 1, (1000, 1))  # 标准正态分布
        X2 = np.random.normal(0, 1000, (1000, 1))  # 大尺度特征
        y = (X1.ravel() + X2.ravel()/1000 > 0).astype(int)
        
        X = np.hstack([X1, X2])
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        
        # 不进行缩放的模型
        knn_raw = KNeighborsClassifier(n_neighbors=5)
        knn_raw.fit(X_train, y_train)
        y_pred_raw = knn_raw.predict(X_test)
        acc_raw = accuracy_score(y_test, y_pred_raw)
        
        # 进行标准化的模型
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        knn_scaled = KNeighborsClassifier(n_neighbors=5)
        knn_scaled.fit(X_train_scaled, y_train)
        y_pred_scaled = knn_scaled.predict(X_test_scaled)
        acc_scaled = accuracy_score(y_test, y_pred_scaled)
        
        print("=== 特征缩放重要性演示 ===")
        print(f"原始数据准确率: {acc_raw:.4f}")
        print(f"标准化后准确率: {acc_scaled:.4f}")
        print(f"性能提升: {((acc_scaled - acc_raw) / acc_raw * 100):.2f}%")
        
        # 可视化决策边界
        self.plot_decision_boundary(X, y, knn_raw, knn_scaled, scaler)
        
    def plot_decision_boundary(self, X, y, model_raw, model_scaled, scaler):
        """绘制决策边界"""
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # 创建网格
        h = 0.02
        x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
        y_min, y_max = X[:, 1].min() - 100, X[:, 1].max() + 100
        xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                            np.arange(y_min, y_max, h*100))
        
        # 原始数据决策边界
        mesh_points = np.c_[xx.ravel(), yy.ravel()]
        Z_raw = model_raw.predict(mesh_points)
        Z_raw = Z_raw.reshape(xx.shape)
        
        ax1.contourf(xx, yy, Z_raw, alpha=0.8, cmap=plt.cm.RdYlBu)
        scatter = ax1.scatter(X[:, 0], X[:, 1], c=y, cmap=plt.cm.RdYlBu, edgecolors='black')
        ax1.set_title('原始数据 (未缩放)')
        ax1.set_xlabel('特征1 (小尺度)')
        ax1.set_ylabel('特征2 (大尺度)')
        
        # 标准化数据决策边界:先变换原始样本,再在缩放后的空间重新构建网格
        X_scaled = scaler.transform(X)
        
        # 重新创建缩放后的网格
        x_min_s, x_max_s = X_scaled[:, 0].min() - 1, X_scaled[:, 0].max() + 1
        y_min_s, y_max_s = X_scaled[:, 1].min() - 1, X_scaled[:, 1].max() + 1
        xx_s, yy_s = np.meshgrid(np.arange(x_min_s, x_max_s, 0.02),
                                np.arange(y_min_s, y_max_s, 0.02))
        
        mesh_scaled = np.c_[xx_s.ravel(), yy_s.ravel()]
        Z_s = model_scaled.predict(mesh_scaled)
        Z_s = Z_s.reshape(xx_s.shape)
        
        ax2.contourf(xx_s, yy_s, Z_s, alpha=0.8, cmap=plt.cm.RdYlBu)
        ax2.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y, cmap=plt.cm.RdYlBu, edgecolors='black')
        ax2.set_title('标准化数据')
        ax2.set_xlabel('特征1 (标准化)')
        ax2.set_ylabel('特征2 (标准化)')
        
        plt.tight_layout()
        plt.show()

# 演示特征缩放的重要性
importance_demo = PreprocessingImportanceDemo()
importance_demo.demonstrate_scaling_importance()

2. 数据清洗与转换

2.1 处理缺失值

class MissingValueHandler:
    """缺失值处理类"""
    
    def __init__(self):
        self.strategies = {}
        
    def analyze_missing_patterns(self, data):
        """分析缺失值模式"""
        missing_info = pd.DataFrame({
            '缺失数量': data.isnull().sum(),
            '缺失比例': data.isnull().sum() / len(data),
            '数据类型': data.dtypes
        })
        
        print("=== 缺失值分析 ===")
        print(missing_info[missing_info['缺失数量'] > 0])
        
        # 可视化缺失值模式
        self.visualize_missing_patterns(data)
        
        return missing_info
    
    def visualize_missing_patterns(self, data):
        """可视化缺失值模式"""
        fig, axes = plt.subplots(1, 3, figsize=(18, 6))
        
        # 缺失值热图
        sns.heatmap(data.isnull(), cbar=True, ax=axes[0], cmap='viridis')
        axes[0].set_title('缺失值热图')
        
        # 缺失值条形图
        missing_counts = data.isnull().sum()
        missing_counts[missing_counts > 0].plot(kind='bar', ax=axes[1])
        axes[1].set_title('各特征缺失值数量')
        axes[1].set_ylabel('缺失值数量')
        axes[1].tick_params(axis='x', rotation=45)
        
        # 缺失值相关性
        missing_corr = data.isnull().corr()
        sns.heatmap(missing_corr, annot=True, cmap='coolwarm', center=0, ax=axes[2])
        axes[2].set_title('缺失值相关性')
        
        plt.tight_layout()
        plt.show()
    
    def handle_missing_values(self, data, strategies=None):
        """处理缺失值"""
        from sklearn.impute import SimpleImputer, KNNImputer
        from sklearn.experimental import enable_iterative_imputer
        from sklearn.impute import IterativeImputer
        
        if strategies is None:
            strategies = {
                'age': 'median',
                'income': 'knn',
                'education_years': 'mode',
                'gender': 'mode',
                'city': 'mode'
            }
        
        data_filled = data.copy()
        
        for column, strategy in strategies.items():
            if column not in data.columns:
                continue
                
            if data[column].isnull().sum() == 0:
                continue
                
            print(f"处理 {column} 的缺失值,策略: {strategy}")
            
            if strategy == 'mean':
                imputer = SimpleImputer(strategy='mean')
                data_filled[[column]] = imputer.fit_transform(data_filled[[column]])
                
            elif strategy == 'median':
                imputer = SimpleImputer(strategy='median')
                data_filled[[column]] = imputer.fit_transform(data_filled[[column]])
                
            elif strategy == 'mode':
                imputer = SimpleImputer(strategy='most_frequent')
                data_filled[[column]] = imputer.fit_transform(data_filled[[column]])
                
            elif strategy == 'knn':
                # 只对数值列使用KNN填充
                numeric_cols = data_filled.select_dtypes(include=[np.number]).columns
                knn_imputer = KNNImputer(n_neighbors=5)
                data_filled[numeric_cols] = knn_imputer.fit_transform(data_filled[numeric_cols])
                
            elif strategy == 'iterative':
                # 迭代填充
                numeric_cols = data_filled.select_dtypes(include=[np.number]).columns
                iterative_imputer = IterativeImputer(random_state=42)
                data_filled[numeric_cols] = iterative_imputer.fit_transform(data_filled[numeric_cols])
                
            elif strategy == 'drop':
                data_filled = data_filled.dropna(subset=[column])
                
        return data_filled
    
    def compare_imputation_methods(self, data, target_column='income'):
        """比较不同填充方法的效果"""
        from sklearn.metrics import mean_squared_error
        from sklearn.impute import SimpleImputer, KNNImputer
        from sklearn.experimental import enable_iterative_imputer  # noqa: F401
        from sklearn.impute import IterativeImputer
        
        # 创建人工缺失值用于测试
        data_test = data.copy()
        complete_mask = ~data_test[target_column].isnull()
        complete_data = data_test[complete_mask].copy()
        
        # 随机创建缺失值
        np.random.seed(42)
        missing_indices = np.random.choice(len(complete_data), 
                                         size=int(0.2 * len(complete_data)), 
                                         replace=False)
        
        test_data = complete_data.copy()
        true_values = test_data.iloc[missing_indices][target_column].values
        test_data.iloc[missing_indices, test_data.columns.get_loc(target_column)] = np.nan
        
        methods = {
            'mean': SimpleImputer(strategy='mean'),
            'median': SimpleImputer(strategy='median'),
            'knn': KNNImputer(n_neighbors=5),
            'iterative': IterativeImputer(random_state=42)
        }
        
        results = {}
        
        for method_name, imputer in methods.items():
            # 只对数值列进行填充
            numeric_cols = test_data.select_dtypes(include=[np.number]).columns
            test_filled = test_data.copy()
            test_filled[numeric_cols] = imputer.fit_transform(test_filled[numeric_cols])
            
            predicted_values = test_filled.iloc[missing_indices][target_column].values
            mse = mean_squared_error(true_values, predicted_values)
            results[method_name] = mse
            
            print(f"{method_name} MSE: {mse:.2f}")
        
        # 可视化比较结果
        self.plot_imputation_comparison(results)
        
        return results
    
    def plot_imputation_comparison(self, results):
        """绘制填充方法比较图"""
        methods = list(results.keys())
        mse_values = list(results.values())
        
        plt.figure(figsize=(10, 6))
        bars = plt.bar(methods, mse_values, color=['skyblue', 'lightgreen', 'lightcoral', 'gold'])
        plt.title('不同填充方法的MSE比较')
        plt.xlabel('填充方法')
        plt.ylabel('均方误差 (MSE)')
        
        # 添加数值标签
        for bar, value in zip(bars, mse_values):
            plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(mse_values)*0.01,
                    f'{value:.2f}', ha='center', va='bottom')
        
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

# 演示缺失值处理
missing_handler = MissingValueHandler()
missing_info = missing_handler.analyze_missing_patterns(data)
data_filled = missing_handler.handle_missing_values(data)
comparison_results = missing_handler.compare_imputation_methods(data)
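
上面的 handle_missing_values 直接在整份数据上拟合并填充;在真实建模流程中,更稳妥的做法是只在训练集上拟合填充统计量,再用同一个填充器转换测试集,以避免数据泄漏。下面是一个基于 ColumnTransformer 的简要示意(列名沿用本节示例数据,划分比例为假设值):

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer

# 先划分训练/测试集,再拟合填充器
train_df, test_df = train_test_split(data, test_size=0.3, random_state=42)

imputer_ct = ColumnTransformer(transformers=[
    ('num', SimpleImputer(strategy='median'), ['age', 'income', 'education_years']),
    ('cat', SimpleImputer(strategy='most_frequent'), ['gender', 'city'])
])

train_imputed = imputer_ct.fit_transform(train_df)   # 仅在训练集上学习中位数/众数
test_imputed = imputer_ct.transform(test_df)         # 测试集复用训练集的统计量
print(train_imputed.shape, test_imputed.shape)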

2.2 异常值检测与处理

class OutlierDetector:
    """异常值检测与处理类"""
    
    def __init__(self):
        self.outlier_indices = {}
        
    def detect_outliers_statistical(self, data, columns=None, method='iqr'):
        """统计方法检测异常值"""
        if columns is None:
            columns = data.select_dtypes(include=[np.number]).columns
            
        outliers = {}
        
        for column in columns:
            if method == 'iqr':
                Q1 = data[column].quantile(0.25)
                Q3 = data[column].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                outliers[column] = data[(data[column] < lower_bound) | 
                                      (data[column] > upper_bound)].index.tolist()
                
            elif method == 'zscore':
                z_scores = np.abs((data[column] - data[column].mean()) / data[column].std())
                outliers[column] = data[z_scores > 3].index.tolist()
                
            elif method == 'modified_zscore':
                median = data[column].median()
                mad = np.median(np.abs(data[column].dropna() - median))
                if mad == 0:
                    # MAD 为 0 时无法计算修正 z 分数,跳过该列
                    outliers[column] = []
                    continue
                modified_z_scores = 0.6745 * (data[column] - median) / mad
                outliers[column] = data[np.abs(modified_z_scores) > 3.5].index.tolist()
        
        return outliers
    
    def detect_outliers_ml(self, data, columns=None):
        """机器学习方法检测异常值"""
        from sklearn.ensemble import IsolationForest
        from sklearn.neighbors import LocalOutlierFactor
        from sklearn.svm import OneClassSVM
        
        if columns is None:
            columns = data.select_dtypes(include=[np.number]).columns
            
        X = data[columns].fillna(data[columns].median())
        
        methods = {
            'isolation_forest': IsolationForest(contamination=0.1, random_state=42),
            'local_outlier_factor': LocalOutlierFactor(contamination=0.1),
            'one_class_svm': OneClassSVM(nu=0.1)
        }
        
        outlier_results = {}
        
        for method_name, detector in methods.items():
            # 三种检测器都支持 fit_predict,返回 1(正常)/ -1(异常)标签
            outlier_labels = detector.fit_predict(X)
            
            outlier_indices = np.where(outlier_labels == -1)[0]
            outlier_results[method_name] = outlier_indices.tolist()
            
            print(f"{method_name} 检测到 {len(outlier_indices)} 个异常值")
        
        return outlier_results
    
    def visualize_outliers(self, data, column, outlier_indices):
        """可视化异常值"""
        fig, axes = plt.subplots(1, 3, figsize=(18, 6))
        
        # 箱线图
        data[column].plot(kind='box', ax=axes[0])
        axes[0].set_title(f'{column} 箱线图')
        axes[0].set_ylabel(column)
        
        # 直方图
        axes[1].hist(data[column].dropna(), bins=50, alpha=0.7, color='skyblue', label='正常值')
        if outlier_indices:
            axes[1].hist(data.loc[outlier_indices, column].dropna(), 
                        bins=20, alpha=0.7, color='red', label='异常值')
        axes[1].set_title(f'{column} 分布')
        axes[1].set_xlabel(column)
        axes[1].set_ylabel('频次')
        axes[1].legend()
        
        # 散点图(如果有索引)
        normal_indices = data.index.difference(outlier_indices)
        axes[2].scatter(normal_indices, data.loc[normal_indices, column], 
                       alpha=0.6, color='skyblue', label='正常值')
        if outlier_indices:
            axes[2].scatter(outlier_indices, data.loc[outlier_indices, column], 
                           alpha=0.8, color='red', label='异常值')
        axes[2].set_title(f'{column} 散点图')
        axes[2].set_xlabel('索引')
        axes[2].set_ylabel(column)
        axes[2].legend()
        
        plt.tight_layout()
        plt.show()
    
    def handle_outliers(self, data, outlier_indices, method='cap'):
        """处理异常值"""
        data_processed = data.copy()
        
        for column, indices in outlier_indices.items():
            if not indices:
                continue
                
            print(f"处理 {column} 的 {len(indices)} 个异常值,方法: {method}")
            
            if method == 'remove':
                # errors='ignore' 防止同一行已被前面的列删除时再次删除报错
                data_processed = data_processed.drop(index=indices, errors='ignore')
                
            elif method == 'cap':
                Q1 = data[column].quantile(0.25)
                Q3 = data[column].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                
                data_processed.loc[data_processed[column] < lower_bound, column] = lower_bound
                data_processed.loc[data_processed[column] > upper_bound, column] = upper_bound
                
            elif method == 'transform':
                # 对数变换
                if (data_processed[column] > 0).all():
                    data_processed[column] = np.log1p(data_processed[column])
                    
            elif method == 'winsorize':
                from scipy.stats import mstats
                data_processed[column] = mstats.winsorize(data_processed[column], 
                                                        limits=[0.05, 0.05])
        
        return data_processed

# 演示异常值检测与处理
outlier_detector = OutlierDetector()

# 统计方法检测异常值
statistical_outliers = outlier_detector.detect_outliers_statistical(data_filled, 
                                                                   columns=['income'], 
                                                                   method='iqr')
print("=== 统计方法检测异常值 ===")
for column, indices in statistical_outliers.items():
    print(f"{column}: {len(indices)} 个异常值")

# 可视化异常值
outlier_detector.visualize_outliers(data_filled, 'income', statistical_outliers['income'])

# 机器学习方法检测异常值
ml_outliers = outlier_detector.detect_outliers_ml(data_filled)

# 处理异常值
data_no_outliers = outlier_detector.handle_outliers(data_filled, statistical_outliers, method='cap')
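
除了按列套用统计规则,也可以在建模前用异常检测器一次性过滤训练样本。下面给出一个基于 IsolationForest 的简要示意(contamination=0.05 为假设取值,实际应结合业务判断):

from sklearn.ensemble import IsolationForest

numeric_cols = ['age', 'income', 'education_years']
iso = IsolationForest(contamination=0.05, random_state=42)
labels = iso.fit_predict(data_filled[numeric_cols])   # 1 表示正常, -1 表示异常

data_inliers = data_filled[labels == 1]
print(f"过滤前样本数: {len(data_filled)}, 过滤后样本数: {len(data_inliers)}")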

2.3 数据类型转换

class DataTypeConverter:
    """数据类型转换类"""
    
    def __init__(self):
        self.encoders = {}
        
    def analyze_data_types(self, data):
        """分析数据类型"""
        type_info = pd.DataFrame({
            '数据类型': data.dtypes,
            '唯一值数量': data.nunique(),
            '缺失值数量': data.isnull().sum(),
            '内存使用(MB)': data.memory_usage(deep=True) / 1024**2
        })
        
        print("=== 数据类型分析 ===")
        print(type_info)
        
        return type_info
    
    def optimize_data_types(self, data):
        """优化数据类型以节省内存"""
        data_optimized = data.copy()
        
        for column in data.columns:
            col_type = data[column].dtype
            
            if col_type != 'object':
                c_min = data[column].min()
                c_max = data[column].max()
                
                if str(col_type)[:3] == 'int':
                    if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                        data_optimized[column] = data_optimized[column].astype(np.int8)
                    elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                        data_optimized[column] = data_optimized[column].astype(np.int16)
                    elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                        data_optimized[column] = data_optimized[column].astype(np.int32)
                        
                elif str(col_type)[:5] == 'float':
                    if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                        data_optimized[column] = data_optimized[column].astype(np.float32)
            else:
                # 对于字符串类型,检查是否可以转换为category
                if data[column].nunique() / len(data) < 0.5:
                    data_optimized[column] = data_optimized[column].astype('category')
        
        # 显示内存节省情况
        original_memory = data.memory_usage(deep=True).sum() / 1024**2
        optimized_memory = data_optimized.memory_usage(deep=True).sum() / 1024**2
        
        print(f"原始内存使用: {original_memory:.2f} MB")
        print(f"优化后内存使用: {optimized_memory:.2f} MB")
        print(f"内存节省: {((original_memory - optimized_memory) / original_memory * 100):.2f}%")
        
        return data_optimized
    
    def encode_categorical_features(self, data, encoding_method='onehot'):
        """编码分类特征"""
        from sklearn.preprocessing import LabelEncoder, OneHotEncoder, OrdinalEncoder
        
        categorical_columns = data.select_dtypes(include=['object', 'category']).columns
        data_encoded = data.copy()
        
        if encoding_method == 'label':
            for column in categorical_columns:
                le = LabelEncoder()
                data_encoded[column] = le.fit_transform(data_encoded[column].astype(str))
                self.encoders[column] = le
                
        elif encoding_method == 'onehot':
            # 使用pandas的get_dummies进行独热编码
            data_encoded = pd.get_dummies(data_encoded, columns=categorical_columns, 
                                        prefix=categorical_columns)
            
        elif encoding_method == 'ordinal':
            for column in categorical_columns:
                oe = OrdinalEncoder()
                data_encoded[column] = oe.fit_transform(data_encoded[[column]])
                self.encoders[column] = oe
        
        print(f"使用 {encoding_method} 编码后的数据形状: {data_encoded.shape}")
        return data_encoded
    
    def handle_datetime_features(self, data, datetime_columns):
        """处理日期时间特征"""
        data_processed = data.copy()
        
        for column in datetime_columns:
            if column not in data.columns:
                continue
                
            # 转换为datetime类型
            data_processed[column] = pd.to_datetime(data_processed[column])
            
            # 提取时间特征
            data_processed[f'{column}_year'] = data_processed[column].dt.year
            data_processed[f'{column}_month'] = data_processed[column].dt.month
            data_processed[f'{column}_day'] = data_processed[column].dt.day
            data_processed[f'{column}_dayofweek'] = data_processed[column].dt.dayofweek
            data_processed[f'{column}_quarter'] = data_processed[column].dt.quarter
            data_processed[f'{column}_is_weekend'] = (data_processed[column].dt.dayofweek >= 5).astype(int)
            
            # 删除原始日期列
            data_processed = data_processed.drop(column, axis=1)
        
        return data_processed

# 演示数据类型转换
converter = DataTypeConverter()
type_info = converter.analyze_data_types(data_no_outliers)
data_optimized = converter.optimize_data_types(data_no_outliers)
data_encoded = converter.encode_categorical_features(data_optimized, encoding_method='onehot')
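
需要注意,pd.get_dummies 是一次性的转换:如果预测阶段出现了训练时未见过的类别,编码出的列就会对不上。常见的补救办法是改用 OneHotEncoder 并设置 handle_unknown='ignore',下面是一个简要示意(其中 'Hangzhou' 为假设的新类别):

from sklearn.preprocessing import OneHotEncoder

ohe = OneHotEncoder(handle_unknown='ignore')
ohe.fit(data_optimized[['city']])                      # 在训练数据上学习类别集合

new_city = pd.DataFrame({'city': ['Hangzhou']})        # 训练时未出现过的类别
print(ohe.transform(new_city).toarray())               # 被编码为全零向量而不是报错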

3. 特征缩放与标准化

3.1 标准化 (Standardization)

class FeatureScalingDemo:
    """特征缩放演示类"""
    
    def __init__(self):
        self.scalers = {}
        
    def demonstrate_standardization(self, data):
        """演示标准化"""
        from sklearn.preprocessing import StandardScaler
        
        # 选择数值特征
        numeric_columns = data.select_dtypes(include=[np.number]).columns
        X = data[numeric_columns]
        
        # 标准化
        scaler = StandardScaler()
        X_standardized = scaler.fit_transform(X)
        X_standardized_df = pd.DataFrame(X_standardized, columns=numeric_columns)
        
        # 比较标准化前后的统计信息
        print("=== 标准化前后对比 ===")
        print("原始数据统计:")
        print(X.describe())
        print("\n标准化后统计:")
        print(X_standardized_df.describe())
        
        # 可视化对比
        self.plot_scaling_comparison(X, X_standardized_df, "标准化")
        
        return X_standardized_df, scaler
    
    def demonstrate_normalization(self, data):
        """演示归一化"""
        from sklearn.preprocessing import MinMaxScaler
        
        numeric_columns = data.select_dtypes(include=[np.number]).columns
        X = data[numeric_columns]
        
        # 归一化到[0,1]
        scaler = MinMaxScaler()
        X_normalized = scaler.fit_transform(X)
        X_normalized_df = pd.DataFrame(X_normalized, columns=numeric_columns)
        
        print("=== 归一化前后对比 ===")
        print("原始数据范围:")
        print(f"最小值: {X.min()}")
        print(f"最大值: {X.max()}")
        print("\n归一化后范围:")
        print(f"最小值: {X_normalized_df.min()}")
        print(f"最大值: {X_normalized_df.max()}")
        
        # 可视化对比
        self.plot_scaling_comparison(X, X_normalized_df, "归一化")
        
        return X_normalized_df, scaler
    
    def demonstrate_robust_scaling(self, data):
        """演示鲁棒缩放"""
        from sklearn.preprocessing import RobustScaler
        
        numeric_columns = data.select_dtypes(include=[np.number]).columns
        X = data[numeric_columns]
        
        # 鲁棒缩放
        scaler = RobustScaler()
        X_robust = scaler.fit_transform(X)
        X_robust_df = pd.DataFrame(X_robust, columns=numeric_columns)
        
        print("=== 鲁棒缩放前后对比 ===")
        print("原始数据中位数和IQR:")
        print(f"中位数: {X.median()}")
        print(f"IQR: {X.quantile(0.75) - X.quantile(0.25)}")
        print("\n鲁棒缩放后中位数和IQR:")
        print(f"中位数: {X_robust_df.median()}")
        print(f"IQR: {X_robust_df.quantile(0.75) - X_robust_df.quantile(0.25)}")
        
        # 可视化对比
        self.plot_scaling_comparison(X, X_robust_df, "鲁棒缩放")
        
        return X_robust_df, scaler
    
    def plot_scaling_comparison(self, original, scaled, method_name):
        """绘制缩放前后对比图"""
        fig, axes = plt.subplots(2, len(original.columns), figsize=(15, 8))
        fig.suptitle(f'{method_name}前后对比', fontsize=16)
        
        for i, column in enumerate(original.columns):
            # 原始数据分布
            axes[0, i].hist(original[column].dropna(), bins=30, alpha=0.7, color='skyblue')
            axes[0, i].set_title(f'原始 {column}')
            axes[0, i].set_ylabel('频次')
            
            # 缩放后数据分布
            axes[1, i].hist(scaled[column].dropna(), bins=30, alpha=0.7, color='lightgreen')
            axes[1, i].set_title(f'{method_name}后 {column}')
            axes[1, i].set_xlabel(column)
            axes[1, i].set_ylabel('频次')
        
        plt.tight_layout()
        plt.show()
    
    def compare_scaling_methods(self, data):
        """比较不同缩放方法"""
        from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, MaxAbsScaler
        
        numeric_columns = data.select_dtypes(include=[np.number]).columns
        X = data[numeric_columns].iloc[:, 0]  # 选择第一个数值列进行演示
        
        scalers = {
            '原始数据': None,
            '标准化': StandardScaler(),
            '归一化': MinMaxScaler(),
            '鲁棒缩放': RobustScaler(),
            '最大绝对值缩放': MaxAbsScaler()
        }
        
        fig, axes = plt.subplots(1, len(scalers), figsize=(20, 4))
        fig.suptitle('不同缩放方法比较', fontsize=16)
        
        for i, (method_name, scaler) in enumerate(scalers.items()):
            if scaler is None:
                data_scaled = X
            else:
                data_scaled = scaler.fit_transform(X.values.reshape(-1, 1)).flatten()
            
            axes[i].hist(data_scaled, bins=30, alpha=0.7, color=plt.cm.Set3(i))
            axes[i].set_title(method_name)
            axes[i].set_xlabel('值')
            axes[i].set_ylabel('频次')
            
            # 添加统计信息
            axes[i].text(0.05, 0.95, f'均值: {np.mean(data_scaled):.2f}\n标准差: {np.std(data_scaled):.2f}',
                        transform=axes[i].transAxes, verticalalignment='top',
                        bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
        
        plt.tight_layout()
        plt.show()

# 演示特征缩放
scaling_demo = FeatureScalingDemo()

# 标准化
X_standardized, std_scaler = scaling_demo.demonstrate_standardization(data_encoded)

# 归一化
X_normalized, minmax_scaler = scaling_demo.demonstrate_normalization(data_encoded)

# 鲁棒缩放
X_robust, robust_scaler = scaling_demo.demonstrate_robust_scaling(data_encoded)

# 比较不同缩放方法
scaling_demo.compare_scaling_methods(data_encoded)

3.2 缩放方法选择指南

class ScalingMethodSelector:
    """缩放方法选择指南"""
    
    def __init__(self):
        self.recommendations = {}
        
    def analyze_data_distribution(self, data):
        """分析数据分布特征"""
        numeric_columns = data.select_dtypes(include=[np.number]).columns
        analysis_results = {}
        
        for column in numeric_columns:
            col_data = data[column].dropna()
            
            # 计算分布特征
            skewness = col_data.skew()
            kurtosis = col_data.kurtosis()
            outlier_ratio = self.calculate_outlier_ratio(col_data)
            
            analysis_results[column] = {
                'skewness': skewness,
                'kurtosis': kurtosis,
                'outlier_ratio': outlier_ratio,
                'min': col_data.min(),
                'max': col_data.max(),
                'std': col_data.std()
            }
        
        return analysis_results
    
    def calculate_outlier_ratio(self, data):
        """计算异常值比例"""
        Q1 = data.quantile(0.25)
        Q3 = data.quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        
        outliers = data[(data < lower_bound) | (data > upper_bound)]
        return len(outliers) / len(data)
    
    def recommend_scaling_method(self, analysis_results):
        """推荐缩放方法"""
        recommendations = {}
        
        for column, stats in analysis_results.items():
            recommendation = []
            
            # 基于偏度推荐
            if abs(stats['skewness']) < 0.5:
                recommendation.append("数据接近正态分布,推荐标准化")
            elif abs(stats['skewness']) > 2:
                recommendation.append("数据严重偏斜,考虑对数变换后标准化")
            
            # 基于异常值比例推荐
            if stats['outlier_ratio'] > 0.1:
                recommendation.append("异常值较多,推荐鲁棒缩放")
            elif stats['outlier_ratio'] < 0.05:
                recommendation.append("异常值较少,可使用标准化或归一化")
            
            # 基于数据范围推荐
            if stats['max'] - stats['min'] > 1000:
                recommendation.append("数据范围很大,推荐标准化")
            elif stats['min'] >= 0:
                recommendation.append("数据非负,可考虑归一化")
            
            recommendations[column] = recommendation
        
        return recommendations
    
    def create_scaling_decision_tree(self):
        """创建缩放方法决策树"""
        decision_tree = """
        缩放方法选择决策树:
        
        1. 数据是否包含异常值?
           ├─ 是 (>10%) → 使用鲁棒缩放 (RobustScaler)
           └─ 否 (<5%) → 继续下一步
        
        2. 数据分布是否接近正态分布?
           ├─ 是 (|偏度| < 0.5) → 使用标准化 (StandardScaler)
           └─ 否 → 继续下一步
        
        3. 数据是否严重偏斜?
           ├─ 是 (|偏度| > 2) → 先对数变换,再标准化
           └─ 否 → 继续下一步
        
        4. 需要保持数据在特定范围内?
           ├─ 是 [0,1] → 使用归一化 (MinMaxScaler)
           ├─ 是 [-1,1] → 使用最大绝对值缩放 (MaxAbsScaler)
           └─ 否 → 使用标准化 (StandardScaler)
        
        5. 特殊情况:
           - 稀疏数据 → MaxAbsScaler 或不缩放
           - 神经网络 → StandardScaler 或 MinMaxScaler
           - 树模型 → 通常不需要缩放
           - 距离算法 → 必须缩放,推荐 StandardScaler
        """
        
        print(decision_tree)
        return decision_tree

# 演示缩放方法选择
selector = ScalingMethodSelector()
analysis_results = selector.analyze_data_distribution(data_encoded)
recommendations = selector.recommend_scaling_method(analysis_results)

print("=== 缩放方法推荐 ===")
for column, recs in recommendations.items():
    print(f"\n{column}:")
    for rec in recs:
        print(f"  - {rec}")

selector.create_scaling_decision_tree()
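
作为参考,也可以把上面的决策树写成一个简单的选择函数。下面是一个简要示意(阈值均为经验取值,并复用了上文 selector 的异常值比例计算):

from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler

def choose_scaler(col_data, outlier_threshold=0.1, skew_threshold=0.5):
    """按照决策树挑选缩放器的简化示意"""
    col_data = col_data.dropna()
    if selector.calculate_outlier_ratio(col_data) > outlier_threshold:
        return RobustScaler()          # 异常值较多 → 鲁棒缩放
    if abs(col_data.skew()) < skew_threshold:
        return StandardScaler()        # 近似正态 → 标准化
    if col_data.min() >= 0:
        return MinMaxScaler()          # 非负且偏斜 → 归一化
    return StandardScaler()

for col in ['age', 'income', 'education_years']:
    print(col, '->', type(choose_scaler(data_encoded[col])).__name__)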

4. 特征选择

4.1 过滤法特征选择

class FilterFeatureSelector:
    """过滤法特征选择"""
    
    def __init__(self):
        self.selected_features = {}
        self.scores = {}
        
    def univariate_selection(self, X, y, k=10, score_func=None):
        """单变量特征选择"""
        from sklearn.feature_selection import SelectKBest, chi2, f_classif, f_regression, mutual_info_classif
        
        if score_func is None:
            # 根据目标变量类型选择评分函数
            if len(np.unique(y)) < 10:  # 分类问题
                score_func = f_classif
            else:  # 回归问题
                score_func = f_regression
        
        selector = SelectKBest(score_func=score_func, k=k)
        X_selected = selector.fit_transform(X, y)
        
        # 获取特征分数和选中的特征
        feature_scores = selector.scores_
        selected_features = selector.get_support(indices=True)
        
        self.scores['univariate'] = feature_scores
        self.selected_features['univariate'] = selected_features
        
        print(f"=== 单变量特征选择 ===")
        print(f"原始特征数: {X.shape[1]}")
        print(f"选择特征数: {k}")
        print(f"选中的特征索引: {selected_features}")
        
        # 可视化特征分数
        self.plot_feature_scores(feature_scores, selected_features, "单变量特征选择")
        
        return X_selected, selected_features
    
    def correlation_selection(self, X, threshold=0.95):
        """基于相关性的特征选择"""
        # 计算特征间相关性
        corr_matrix = np.corrcoef(X.T)
        
        # 找到高相关性的特征对
        high_corr_pairs = []
        features_to_remove = set()
        
        for i in range(len(corr_matrix)):
            for j in range(i+1, len(corr_matrix)):
                if abs(corr_matrix[i, j]) > threshold:
                    high_corr_pairs.append((i, j, corr_matrix[i, j]))
                    # 移除其中一个特征(保留索引较小的)
                    features_to_remove.add(j)
        
        selected_features = [i for i in range(X.shape[1]) if i not in features_to_remove]
        X_selected = X[:, selected_features]
        
        self.selected_features['correlation'] = selected_features
        
        print(f"=== 相关性特征选择 ===")
        print(f"相关性阈值: {threshold}")
        print(f"发现 {len(high_corr_pairs)} 对高相关特征")
        print(f"移除 {len(features_to_remove)} 个特征")
        print(f"保留 {len(selected_features)} 个特征")
        
        # 可视化相关性矩阵
        self.plot_correlation_matrix(corr_matrix, features_to_remove)
        
        return X_selected, selected_features
    
    def variance_selection(self, X, threshold=0.01):
        """基于方差的特征选择"""
        from sklearn.feature_selection import VarianceThreshold
        
        selector = VarianceThreshold(threshold=threshold)
        X_selected = selector.fit_transform(X)
        
        selected_features = selector.get_support(indices=True)
        feature_variances = np.var(X, axis=0)
        
        self.selected_features['variance'] = selected_features
        self.scores['variance'] = feature_variances
        
        print(f"=== 方差特征选择 ===")
        print(f"方差阈值: {threshold}")
        print(f"原始特征数: {X.shape[1]}")
        print(f"选择特征数: {X_selected.shape[1]}")
        print(f"移除的低方差特征数: {X.shape[1] - X_selected.shape[1]}")
        
        # 可视化特征方差
        self.plot_feature_variances(feature_variances, threshold)
        
        return X_selected, selected_features
    
    def plot_feature_scores(self, scores, selected_features, title):
        """绘制特征分数"""
        plt.figure(figsize=(12, 6))
        
        feature_indices = range(len(scores))
        colors = ['red' if i in selected_features else 'lightblue' for i in feature_indices]
        
        bars = plt.bar(feature_indices, scores, color=colors, alpha=0.7)
        plt.title(f'{title} - 特征分数')
        plt.xlabel('特征索引')
        plt.ylabel('分数')
        
        # 添加图例
        red_patch = plt.Rectangle((0, 0), 1, 1, fc="red", alpha=0.7)
        blue_patch = plt.Rectangle((0, 0), 1, 1, fc="lightblue", alpha=0.7)
        plt.legend([red_patch, blue_patch], ['选中特征', '未选中特征'])
        
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()
    
    def plot_correlation_matrix(self, corr_matrix, removed_features):
        """绘制相关性矩阵"""
        plt.figure(figsize=(10, 8))
        
        # 创建掩码来突出显示被移除的特征
        mask = np.zeros_like(corr_matrix, dtype=bool)
        for feature in removed_features:
            mask[feature, :] = True
            mask[:, feature] = True
        
        sns.heatmap(corr_matrix, annot=False, cmap='coolwarm', center=0,
                   mask=mask, square=True, cbar_kws={"shrink": .8})
        plt.title('特征相关性矩阵 (灰色区域为被移除的特征)')
        plt.tight_layout()
        plt.show()
    
    def plot_feature_variances(self, variances, threshold):
        """绘制特征方差"""
        plt.figure(figsize=(12, 6))
        
        feature_indices = range(len(variances))
        colors = ['red' if var > threshold else 'lightblue' for var in variances]
        
        plt.bar(feature_indices, variances, color=colors, alpha=0.7)
        plt.axhline(y=threshold, color='black', linestyle='--', label=f'阈值 = {threshold}')
        plt.title('特征方差分布')
        plt.xlabel('特征索引')
        plt.ylabel('方差')
        plt.legend()
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

# 创建示例数据进行特征选择演示
from sklearn.datasets import make_classification

# 生成分类数据
X_demo, y_demo = make_classification(n_samples=1000, n_features=20, n_informative=10, 
                                   n_redundant=5, n_clusters_per_class=1, random_state=42)

# 演示过滤法特征选择
filter_selector = FilterFeatureSelector()

# 单变量特征选择
X_univariate, selected_univariate = filter_selector.univariate_selection(X_demo, y_demo, k=10)

# 相关性特征选择
X_correlation, selected_correlation = filter_selector.correlation_selection(X_demo, threshold=0.8)

# 方差特征选择
X_variance, selected_variance = filter_selector.variance_selection(X_demo, threshold=0.1)
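
除了 f_classif,也可以用互信息(mutual information)作为评分函数;它能够捕捉非线性相关,但计算开销更大。下面是一个简要示意:

from sklearn.feature_selection import SelectKBest, mutual_info_classif

mi_selector = SelectKBest(score_func=mutual_info_classif, k=10)
X_mi = mi_selector.fit_transform(X_demo, y_demo)

print("互信息选中的特征索引:", mi_selector.get_support(indices=True))
print("前5个特征的互信息分数:", np.round(mi_selector.scores_[:5], 3))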

4.2 包装法特征选择

class WrapperFeatureSelector:
    """包装法特征选择"""
    
    def __init__(self):
        self.selected_features = {}
        self.feature_importance = {}
        
    def recursive_feature_elimination(self, X, y, estimator=None, n_features=10):
        """递归特征消除"""
        from sklearn.feature_selection import RFE
        from sklearn.linear_model import LogisticRegression
        from sklearn.ensemble import RandomForestClassifier
        
        if estimator is None:
            estimator = RandomForestClassifier(n_estimators=100, random_state=42)
        
        selector = RFE(estimator=estimator, n_features_to_select=n_features)
        X_selected = selector.fit_transform(X, y)
        
        selected_features = selector.get_support(indices=True)
        feature_ranking = selector.ranking_
        
        self.selected_features['rfe'] = selected_features
        self.feature_importance['rfe'] = feature_ranking
        
        print(f"=== 递归特征消除 ===")
        print(f"原始特征数: {X.shape[1]}")
        print(f"选择特征数: {n_features}")
        print(f"选中的特征索引: {selected_features}")
        
        # 可视化特征排名
        self.plot_feature_ranking(feature_ranking, selected_features, "递归特征消除")
        
        return X_selected, selected_features
    
    def sequential_feature_selection(self, X, y, direction='forward', n_features=10):
        """序列特征选择"""
        from sklearn.feature_selection import SequentialFeatureSelector
        from sklearn.ensemble import RandomForestClassifier
        
        estimator = RandomForestClassifier(n_estimators=100, random_state=42)
        selector = SequentialFeatureSelector(estimator, n_features_to_select=n_features, 
                                           direction=direction, cv=5)
        
        X_selected = selector.fit_transform(X, y)
        selected_features = selector.get_support(indices=True)
        
        self.selected_features[f'sfs_{direction}'] = selected_features
        
        print(f"=== 序列特征选择 ({direction}) ===")
        print(f"原始特征数: {X.shape[1]}")
        print(f"选择特征数: {n_features}")
        print(f"选中的特征索引: {selected_features}")
        
        return X_selected, selected_features
    
    def genetic_algorithm_selection(self, X, y, n_features=10, population_size=50, generations=20):
        """遗传算法特征选择"""
        from sklearn.ensemble import RandomForestClassifier
        from sklearn.model_selection import cross_val_score
        
        # 简化的遗传算法实现
        def fitness_function(features_mask):
            if np.sum(features_mask) == 0:
                return 0
            X_subset = X[:, features_mask]
            estimator = RandomForestClassifier(n_estimators=50, random_state=42)
            scores = cross_val_score(estimator, X_subset, y, cv=3, scoring='accuracy')
            return np.mean(scores)
        
        # 初始化种群
        population = []
        for _ in range(population_size):
            individual = np.random.choice([True, False], size=X.shape[1], 
                                        p=[n_features/X.shape[1], 1-n_features/X.shape[1]])
            population.append(individual)
        
        best_fitness = 0
        best_individual = None
        
        for generation in range(generations):
            # 计算适应度
            fitness_scores = [fitness_function(individual) for individual in population]
            
            # 记录最佳个体
            max_fitness_idx = np.argmax(fitness_scores)
            if fitness_scores[max_fitness_idx] > best_fitness:
                best_fitness = fitness_scores[max_fitness_idx]
                best_individual = population[max_fitness_idx].copy()
            
            # 简化实现:此处仅做锦标赛选择,省略了交叉与变异步骤
            new_population = []
            for _ in range(population_size):
                # 锦标赛选择
                tournament_size = 3
                tournament_indices = np.random.choice(population_size, tournament_size)
                tournament_fitness = [fitness_scores[i] for i in tournament_indices]
                winner_idx = tournament_indices[np.argmax(tournament_fitness)]
                new_population.append(population[winner_idx].copy())
            
            population = new_population
            
            if generation % 5 == 0:
                print(f"第 {generation} 代,最佳适应度: {best_fitness:.4f}")
        
        selected_features = np.where(best_individual)[0]
        X_selected = X[:, best_individual]
        
        self.selected_features['genetic'] = selected_features
        
        print(f"=== 遗传算法特征选择 ===")
        print(f"最终适应度: {best_fitness:.4f}")
        print(f"选中的特征索引: {selected_features}")
        
        return X_selected, selected_features
    
    def plot_feature_ranking(self, ranking, selected_features, title):
        """绘制特征排名"""
        plt.figure(figsize=(12, 6))
        
        feature_indices = range(len(ranking))
        colors = ['red' if i in selected_features else 'lightblue' for i in feature_indices]
        
        plt.bar(feature_indices, ranking, color=colors, alpha=0.7)
        plt.title(f'{title} - 特征排名 (1=最重要)')
        plt.xlabel('特征索引')
        plt.ylabel('排名')
        
        # 添加图例
        red_patch = plt.Rectangle((0, 0), 1, 1, fc="red", alpha=0.7)
        blue_patch = plt.Rectangle((0, 0), 1, 1, fc="lightblue", alpha=0.7)
        plt.legend([red_patch, blue_patch], ['选中特征', '未选中特征'])
        
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

# 演示包装法特征选择
wrapper_selector = WrapperFeatureSelector()

# 递归特征消除
X_rfe, selected_rfe = wrapper_selector.recursive_feature_elimination(X_demo, y_demo, n_features=10)

# 前向序列特征选择
X_sfs_forward, selected_sfs_forward = wrapper_selector.sequential_feature_selection(
    X_demo, y_demo, direction='forward', n_features=10)

# 后向序列特征选择
X_sfs_backward, selected_sfs_backward = wrapper_selector.sequential_feature_selection(
    X_demo, y_demo, direction='backward', n_features=10)

# 遗传算法特征选择
X_genetic, selected_genetic = wrapper_selector.genetic_algorithm_selection(
    X_demo, y_demo, n_features=10, population_size=30, generations=10)
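
包装法中还有一个常用变体:RFECV 在递归消除的同时做交叉验证,自动确定保留多少特征,无需事先指定 n_features。下面是一个简要示意(为了加快速度,这里换用逻辑回归作为基模型):

from sklearn.feature_selection import RFECV
from sklearn.linear_model import LogisticRegression

rfecv = RFECV(estimator=LogisticRegression(max_iter=1000), step=1, cv=5, scoring='accuracy')
rfecv.fit(X_demo, y_demo)

print("RFECV 自动选择的特征数:", rfecv.n_features_)
print("选中的特征索引:", np.where(rfecv.support_)[0])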

4.3 嵌入法特征选择

class EmbeddedFeatureSelector:
    """嵌入法特征选择"""
    
    def __init__(self):
        self.selected_features = {}
        self.feature_importance = {}
        
    def lasso_selection(self, X, y, alpha=0.01):
        """基于Lasso的特征选择"""
        from sklearn.linear_model import LassoCV, Lasso
        from sklearn.preprocessing import StandardScaler
        
        # 标准化数据
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        # 使用交叉验证选择最佳alpha
        lasso_cv = LassoCV(cv=5, random_state=42)
        lasso_cv.fit(X_scaled, y)
        
        # 使用最佳alpha训练Lasso模型
        lasso = Lasso(alpha=lasso_cv.alpha_)
        lasso.fit(X_scaled, y)
        
        # 选择非零系数的特征
        selected_features = np.where(lasso.coef_ != 0)[0]
        feature_importance = np.abs(lasso.coef_)
        
        self.selected_features['lasso'] = selected_features
        self.feature_importance['lasso'] = feature_importance
        
        print(f"=== Lasso特征选择 ===")
        print(f"最佳alpha: {lasso_cv.alpha_:.6f}")
        print(f"原始特征数: {X.shape[1]}")
        print(f"选择特征数: {len(selected_features)}")
        print(f"选中的特征索引: {selected_features}")
        
        # 可视化特征重要性
        self.plot_feature_importance(feature_importance, selected_features, "Lasso特征选择")
        
        return X[:, selected_features], selected_features
    
    def tree_based_selection(self, X, y, estimator_type='random_forest', threshold='median'):
        """基于树模型的特征选择"""
        from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier
        from sklearn.tree import DecisionTreeClassifier
        from sklearn.feature_selection import SelectFromModel
        
        # 选择估计器
        if estimator_type == 'random_forest':
            estimator = RandomForestClassifier(n_estimators=100, random_state=42)
        elif estimator_type == 'extra_trees':
            estimator = ExtraTreesClassifier(n_estimators=100, random_state=42)
        elif estimator_type == 'decision_tree':
            estimator = DecisionTreeClassifier(random_state=42)
        
        # 训练模型并选择特征
        selector = SelectFromModel(estimator, threshold=threshold)
        X_selected = selector.fit_transform(X, y)
        
        selected_features = selector.get_support(indices=True)
        feature_importance = selector.estimator_.feature_importances_  # 复用 SelectFromModel 内部已训练的模型,避免重复拟合
        
        self.selected_features[f'tree_{estimator_type}'] = selected_features
        self.feature_importance[f'tree_{estimator_type}'] = feature_importance
        
        print(f"=== {estimator_type}特征选择 ===")
        print(f"阈值: {threshold}")
        print(f"原始特征数: {X.shape[1]}")
        print(f"选择特征数: {len(selected_features)}")
        print(f"选中的特征索引: {selected_features}")
        
        # 可视化特征重要性
        self.plot_feature_importance(feature_importance, selected_features, 
                                   f"{estimator_type}特征选择")
        
        return X_selected, selected_features
    
    def elastic_net_selection(self, X, y, l1_ratio=0.5):
        """基于弹性网络的特征选择"""
        from sklearn.linear_model import ElasticNetCV, ElasticNet
        from sklearn.preprocessing import StandardScaler
        
        # 标准化数据
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        # 使用交叉验证选择最佳参数
        elastic_cv = ElasticNetCV(l1_ratio=l1_ratio, cv=5, random_state=42)
        elastic_cv.fit(X_scaled, y)
        
        # 使用最佳参数训练模型
        elastic = ElasticNet(alpha=elastic_cv.alpha_, l1_ratio=l1_ratio)
        elastic.fit(X_scaled, y)
        
        # 选择非零系数的特征
        selected_features = np.where(elastic.coef_ != 0)[0]
        feature_importance = np.abs(elastic.coef_)
        
        self.selected_features['elastic_net'] = selected_features
        self.feature_importance['elastic_net'] = feature_importance
        
        print(f"=== 弹性网络特征选择 ===")
        print(f"最佳alpha: {elastic_cv.alpha_:.6f}")
        print(f"L1比例: {l1_ratio}")
        print(f"原始特征数: {X.shape[1]}")
        print(f"选择特征数: {len(selected_features)}")
        print(f"选中的特征索引: {selected_features}")
        
        # 可视化特征重要性
        self.plot_feature_importance(feature_importance, selected_features, "弹性网络特征选择")
        
        return X[:, selected_features], selected_features
    
    def plot_feature_importance(self, importance, selected_features, title):
        """绘制特征重要性"""
        plt.figure(figsize=(12, 6))
        
        feature_indices = range(len(importance))
        colors = ['red' if i in selected_features else 'lightblue' for i in feature_indices]
        
        plt.bar(feature_indices, importance, color=colors, alpha=0.7)
        plt.title(f'{title} - 特征重要性')
        plt.xlabel('特征索引')
        plt.ylabel('重要性')
        
        # 添加图例
        red_patch = plt.Rectangle((0, 0), 1, 1, fc="red", alpha=0.7)
        blue_patch = plt.Rectangle((0, 0), 1, 1, fc="lightblue", alpha=0.7)
        plt.legend([red_patch, blue_patch], ['选中特征', '未选中特征'])
        
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()
    
    def compare_selection_methods(self, X, y):
        """比较不同特征选择方法"""
        from sklearn.model_selection import cross_val_score
        from sklearn.ensemble import RandomForestClassifier
        
        methods_results = {}
        
        # 原始数据
        estimator = RandomForestClassifier(n_estimators=100, random_state=42)
        original_scores = cross_val_score(estimator, X, y, cv=5, scoring='accuracy')
        methods_results['原始数据'] = {
            'n_features': X.shape[1],
            'cv_score': np.mean(original_scores),
            'cv_std': np.std(original_scores)
        }
        
        # 各种特征选择方法
        for method_name, features in self.selected_features.items():
            if len(features) > 0:
                X_subset = X[:, features]
                scores = cross_val_score(estimator, X_subset, y, cv=5, scoring='accuracy')
                methods_results[method_name] = {
                    'n_features': len(features),
                    'cv_score': np.mean(scores),
                    'cv_std': np.std(scores)
                }
        
        # 可视化比较结果
        self.plot_methods_comparison(methods_results)
        
        return methods_results
    
    def plot_methods_comparison(self, results):
        """绘制方法比较图"""
        methods = list(results.keys())
        scores = [results[method]['cv_score'] for method in methods]
        stds = [results[method]['cv_std'] for method in methods]
        n_features = [results[method]['n_features'] for method in methods]
        
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # 准确率比较
        bars1 = ax1.bar(methods, scores, yerr=stds, capsize=5, alpha=0.7, 
                       color=plt.cm.Set3(range(len(methods))))
        ax1.set_title('不同特征选择方法的准确率比较')
        ax1.set_ylabel('交叉验证准确率')
        ax1.tick_params(axis='x', rotation=45)
        
        # 添加数值标签
        for bar, score in zip(bars1, scores):
            ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                    f'{score:.3f}', ha='center', va='bottom')
        
        # 特征数量比较
        bars2 = ax2.bar(methods, n_features, alpha=0.7, 
                       color=plt.cm.Set3(range(len(methods))))
        ax2.set_title('不同方法选择的特征数量')
        ax2.set_ylabel('特征数量')
        ax2.tick_params(axis='x', rotation=45)
        
        # 添加数值标签
        for bar, n_feat in zip(bars2, n_features):
            ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.5,
                    f'{n_feat}', ha='center', va='bottom')
        
        plt.tight_layout()
        plt.show()

# 演示嵌入法特征选择
embedded_selector = EmbeddedFeatureSelector()

# Lasso特征选择
X_lasso, selected_lasso = embedded_selector.lasso_selection(X_demo, y_demo)

# 随机森林特征选择
X_rf, selected_rf = embedded_selector.tree_based_selection(X_demo, y_demo, 'random_forest')

# 极端随机树特征选择
X_et, selected_et = embedded_selector.tree_based_selection(X_demo, y_demo, 'extra_trees')

# 弹性网络特征选择
X_elastic, selected_elastic = embedded_selector.elastic_net_selection(X_demo, y_demo)

# 比较不同方法
comparison_results = embedded_selector.compare_selection_methods(X_demo, y_demo)
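
需要说明的是,上面的 Lasso 和弹性网络本质上是把 0/1 标签当作回归目标来拟合;对于分类任务,更贴切的嵌入法是带 L1 正则化的逻辑回归配合 SelectFromModel。下面是一个简要示意(C=0.1 为假设的正则化强度):

from sklearn.linear_model import LogisticRegression
from sklearn.feature_selection import SelectFromModel
from sklearn.preprocessing import StandardScaler

X_demo_scaled = StandardScaler().fit_transform(X_demo)
l1_logreg = LogisticRegression(penalty='l1', solver='liblinear', C=0.1)

sfm = SelectFromModel(l1_logreg)
X_l1 = sfm.fit_transform(X_demo_scaled, y_demo)        # 保留系数非零的特征

print("L1 逻辑回归选中的特征索引:", sfm.get_support(indices=True))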

5. 降维技术

5.1 主成分分析 (PCA)

class DimensionalityReductionDemo:
    """降维技术演示"""
    
    def __init__(self):
        self.reducers = {}
        self.explained_variance = {}
        
    def pca_analysis(self, X, n_components=None):
        """主成分分析"""
        from sklearn.decomposition import PCA
        from sklearn.preprocessing import StandardScaler
        
        # 标准化数据
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        # 如果未指定组件数,使用所有组件进行分析
        if n_components is None:
            n_components = min(X.shape[0], X.shape[1])
        
        pca = PCA(n_components=n_components)
        X_pca = pca.fit_transform(X_scaled)
        
        self.reducers['pca'] = pca
        self.explained_variance['pca'] = pca.explained_variance_ratio_
        
        print(f"=== PCA分析 ===")
        print(f"原始维度: {X.shape[1]}")
        print(f"降维后维度: {X_pca.shape[1]}")
        print(f"累积解释方差比: {np.cumsum(pca.explained_variance_ratio_)[:5]}")
        
        # 可视化PCA结果
        self.plot_pca_analysis(pca, X_pca)
        
        return X_pca, pca
    
    def plot_pca_analysis(self, pca, X_pca):
        """可视化PCA分析结果"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        
        # 解释方差比
        axes[0, 0].bar(range(1, len(pca.explained_variance_ratio_) + 1), 
                      pca.explained_variance_ratio_, alpha=0.7)
        axes[0, 0].set_title('各主成分解释方差比')
        axes[0, 0].set_xlabel('主成分')
        axes[0, 0].set_ylabel('解释方差比')
        
        # 累积解释方差比
        cumsum_var = np.cumsum(pca.explained_variance_ratio_)
        axes[0, 1].plot(range(1, len(cumsum_var) + 1), cumsum_var, 'bo-')
        axes[0, 1].axhline(y=0.95, color='r', linestyle='--', label='95%阈值')
        axes[0, 1].set_title('累积解释方差比')
        axes[0, 1].set_xlabel('主成分数量')
        axes[0, 1].set_ylabel('累积解释方差比')
        axes[0, 1].legend()
        axes[0, 1].grid(True)
        
        # 前两个主成分的散点图
        if X_pca.shape[1] >= 2:
            axes[1, 0].scatter(X_pca[:, 0], X_pca[:, 1], alpha=0.6)
            axes[1, 0].set_title('前两个主成分散点图')
            axes[1, 0].set_xlabel('第一主成分')
            axes[1, 0].set_ylabel('第二主成分')
        
        # 主成分载荷图(如果原始特征数不太多)
        if pca.components_.shape[1] <= 20:
            im = axes[1, 1].imshow(pca.components_[:5], cmap='coolwarm', aspect='auto')
            axes[1, 1].set_title('前5个主成分载荷')
            axes[1, 1].set_xlabel('原始特征')
            axes[1, 1].set_ylabel('主成分')
            plt.colorbar(im, ax=axes[1, 1])
        
        plt.tight_layout()
        plt.show()
    
    def determine_optimal_components(self, X, variance_threshold=0.95):
        """确定最优主成分数量"""
        from sklearn.decomposition import PCA
        from sklearn.preprocessing import StandardScaler
        
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        # 计算所有主成分
        pca_full = PCA()
        pca_full.fit(X_scaled)
        
        # 找到达到方差阈值的组件数
        cumsum_var = np.cumsum(pca_full.explained_variance_ratio_)
        n_components_95 = np.argmax(cumsum_var >= variance_threshold) + 1
        
        # 使用肘部法则
        n_components_elbow = self.find_elbow_point(pca_full.explained_variance_ratio_)
        
        print(f"=== 最优主成分数量分析 ===")
        print(f"达到{variance_threshold*100}%方差的组件数: {n_components_95}")
        print(f"肘部法则建议的组件数: {n_components_elbow}")
        
        # 可视化选择过程
        self.plot_component_selection(pca_full.explained_variance_ratio_, 
                                    n_components_95, n_components_elbow, variance_threshold)
        
        return n_components_95, n_components_elbow
    
    def find_elbow_point(self, explained_variance_ratio):
        """使用肘部法则找到最优组件数"""
        # 计算二阶导数来找到肘部点
        n_components = len(explained_variance_ratio)
        if n_components < 3:
            return 1
        
        # 计算差分
        first_diff = np.diff(explained_variance_ratio)
        second_diff = np.diff(first_diff)
        
        # 找到二阶导数最大的点(肘部)
        elbow_point = np.argmax(second_diff) + 2  # +2因为两次差分
        
        return min(elbow_point, n_components)
    
    def plot_component_selection(self, explained_variance_ratio, n_95, n_elbow, threshold):
        """可视化组件选择过程"""
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # 解释方差比图
        ax1.bar(range(1, len(explained_variance_ratio) + 1), explained_variance_ratio, alpha=0.7)
        ax1.axvline(x=n_elbow, color='red', linestyle='--', label=f'肘部法则: {n_elbow}')
        ax1.set_title('各主成分解释方差比')
        ax1.set_xlabel('主成分')
        ax1.set_ylabel('解释方差比')
        ax1.legend()
        
        # 累积方差图
        cumsum_var = np.cumsum(explained_variance_ratio)
        ax2.plot(range(1, len(cumsum_var) + 1), cumsum_var, 'bo-')
        ax2.axhline(y=threshold, color='red', linestyle='--', label=f'{threshold*100}%阈值')
        ax2.axvline(x=n_95, color='green', linestyle='--', label=f'{threshold*100:.0f}%方差: {n_95}')
        ax2.set_title('累积解释方差比')
        ax2.set_xlabel('主成分数量')
        ax2.set_ylabel('累积解释方差比')
        ax2.legend()
        ax2.grid(True)
        
        plt.tight_layout()
        plt.show()

# 演示PCA降维
dim_reduction_demo = DimensionalityReductionDemo()

# PCA分析
X_pca, pca_model = dim_reduction_demo.pca_analysis(X_demo, n_components=10)

# 确定最优组件数
n_95, n_elbow = dim_reduction_demo.determine_optimal_components(X_demo)
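
确定最优组件数之后,也可以直接把0到1之间的小数传给n_components,让PCA自动保留达到该累积解释方差比所需的主成分。下面是一个最小示例(沿用上文的X_demo,0.95阈值仅为示意):

from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

# 标准化后,让PCA自动选择解释95%方差所需的主成分数
X_demo_scaled = StandardScaler().fit_transform(X_demo)
pca_95 = PCA(n_components=0.95, svd_solver='full')
X_demo_pca95 = pca_95.fit_transform(X_demo_scaled)

print(f"自动选择的主成分数: {pca_95.n_components_}")
print(f"累积解释方差比: {pca_95.explained_variance_ratio_.sum():.4f}")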

5.2 其他降维技术

class AdvancedDimensionalityReduction:
    """高级降维技术"""
    
    def __init__(self):
        self.reducers = {}
        
    def t_sne_analysis(self, X, n_components=2, perplexity=30):
        """t-SNE降维分析"""
        from sklearn.manifold import TSNE
        from sklearn.preprocessing import StandardScaler
        
        # 标准化数据
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        # 如果数据维度太高,先用PCA降维
        if X.shape[1] > 50:
            from sklearn.decomposition import PCA
            pca = PCA(n_components=50)
            X_scaled = pca.fit_transform(X_scaled)
        
        # t-SNE降维
        tsne = TSNE(n_components=n_components, perplexity=perplexity, random_state=42)
        X_tsne = tsne.fit_transform(X_scaled)
        
        self.reducers['tsne'] = tsne
        
        print(f"=== t-SNE分析 ===")
        print(f"原始维度: {X.shape[1]}")
        print(f"降维后维度: {n_components}")
        print(f"困惑度: {perplexity}")
        
        return X_tsne, tsne
    
    def umap_analysis(self, X, n_components=2, n_neighbors=15):
        """UMAP降维分析"""
        try:
            import umap
            from sklearn.preprocessing import StandardScaler
            
            # 标准化数据
            scaler = StandardScaler()
            X_scaled = scaler.fit_transform(X)
            
            # UMAP降维
            reducer = umap.UMAP(n_components=n_components, n_neighbors=n_neighbors, random_state=42)
            X_umap = reducer.fit_transform(X_scaled)
            
            self.reducers['umap'] = reducer
            
            print(f"=== UMAP分析 ===")
            print(f"原始维度: {X.shape[1]}")
            print(f"降维后维度: {n_components}")
            print(f"邻居数: {n_neighbors}")
            
            return X_umap, reducer
            
        except ImportError:
            print("UMAP未安装,请使用 'pip install umap-learn' 安装")
            return None, None
    
    def lda_analysis(self, X, y, n_components=None):
        """线性判别分析降维"""
        from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
        from sklearn.preprocessing import StandardScaler
        
        # 标准化数据
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        # LDA降维
        if n_components is None:
            n_components = len(np.unique(y)) - 1
        
        lda = LinearDiscriminantAnalysis(n_components=n_components)
        X_lda = lda.fit_transform(X_scaled, y)
        
        self.reducers['lda'] = lda
        
        print(f"=== LDA分析 ===")
        print(f"原始维度: {X.shape[1]}")
        print(f"降维后维度: {X_lda.shape[1]}")
        print(f"解释方差比: {lda.explained_variance_ratio_}")
        
        return X_lda, lda
    
    def compare_dimensionality_reduction(self, X, y=None):
        """比较不同降维方法"""
        from sklearn.preprocessing import StandardScaler
        from sklearn.decomposition import PCA
        from sklearn.manifold import TSNE
        
        # 标准化数据
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        # 如果数据太大,先采样
        if X.shape[0] > 1000:
            indices = np.random.choice(X.shape[0], 1000, replace=False)
            X_sample = X_scaled[indices]
            y_sample = y[indices] if y is not None else None
        else:
            X_sample = X_scaled
            y_sample = y
        
        methods = {}
        
        # PCA
        pca = PCA(n_components=2)
        X_pca = pca.fit_transform(X_sample)
        methods['PCA'] = X_pca
        
        # t-SNE
        tsne = TSNE(n_components=2, random_state=42)
        X_tsne = tsne.fit_transform(X_sample)
        methods['t-SNE'] = X_tsne
        
        # LDA (如果有标签);LDA最多只有 类别数-1 个判别方向
        if y_sample is not None:
            from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
            n_lda = min(2, len(np.unique(y_sample)) - 1)
            if n_lda >= 2:  # 二分类时LDA只有1维,无法绘制二维散点图,跳过
                lda = LinearDiscriminantAnalysis(n_components=n_lda)
                X_lda = lda.fit_transform(X_sample, y_sample)
                methods['LDA'] = X_lda
        
        # 可视化比较
        self.plot_dimensionality_reduction_comparison(methods, y_sample)
        
        return methods
    
    def plot_dimensionality_reduction_comparison(self, methods, y=None):
        """可视化降维方法比较"""
        n_methods = len(methods)
        fig, axes = plt.subplots(1, n_methods, figsize=(5*n_methods, 5))
        
        if n_methods == 1:
            axes = [axes]
        
        for i, (method_name, X_reduced) in enumerate(methods.items()):
            if y is not None:
                scatter = axes[i].scatter(X_reduced[:, 0], X_reduced[:, 1], 
                                        c=y, cmap='tab10', alpha=0.6)
                plt.colorbar(scatter, ax=axes[i])
            else:
                axes[i].scatter(X_reduced[:, 0], X_reduced[:, 1], alpha=0.6)
            
            axes[i].set_title(f'{method_name}降维结果')
            axes[i].set_xlabel('第一维')
            axes[i].set_ylabel('第二维')
        
        plt.tight_layout()
        plt.show()

# 演示高级降维技术
advanced_reduction = AdvancedDimensionalityReduction()

# t-SNE降维
X_tsne, tsne_model = advanced_reduction.t_sne_analysis(X_demo)

# LDA降维(需要标签)
X_lda, lda_model = advanced_reduction.lda_analysis(X_demo, y_demo)

# 比较不同降维方法
reduction_comparison = advanced_reduction.compare_dimensionality_reduction(X_demo, y_demo)

6. 数据预处理管道

6.1 构建预处理管道

class PreprocessingPipeline:
    """数据预处理管道"""
    
    def __init__(self):
        self.pipeline = None
        self.feature_names = None
        
    def create_basic_pipeline(self, numeric_features, categorical_features):
        """创建基础预处理管道"""
        from sklearn.pipeline import Pipeline
        from sklearn.compose import ColumnTransformer
        from sklearn.preprocessing import StandardScaler, OneHotEncoder
        from sklearn.impute import SimpleImputer
        
        # 数值特征预处理
        numeric_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())
        ])
        
        # 分类特征预处理
        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
            ('onehot', OneHotEncoder(handle_unknown='ignore'))
        ])
        
        # 组合预处理器
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features),
                ('cat', categorical_transformer, categorical_features)
            ])
        
        self.pipeline = preprocessor
        return preprocessor
    
    def create_advanced_pipeline(self, numeric_features, categorical_features, 
                               feature_selection=True, dimensionality_reduction=True):
        """创建高级预处理管道"""
        from sklearn.pipeline import Pipeline
        from sklearn.compose import ColumnTransformer
        from sklearn.preprocessing import StandardScaler, OneHotEncoder, RobustScaler
        from sklearn.impute import KNNImputer, SimpleImputer
        from sklearn.feature_selection import SelectKBest, f_classif
        from sklearn.decomposition import PCA
        
        # 数值特征预处理(使用更高级的方法)
        numeric_transformer = Pipeline(steps=[
            ('imputer', KNNImputer(n_neighbors=5)),
            ('scaler', RobustScaler())  # 对异常值更鲁棒
        ])
        
        # 分类特征预处理
        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='most_frequent')),
            ('onehot', OneHotEncoder(handle_unknown='ignore', sparse=False))  # 注:sklearn>=1.2 请改用 sparse_output=False
        ])
        
        # 组合预处理器
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features),
                ('cat', categorical_transformer, categorical_features)
            ])
        
        # 构建完整管道
        pipeline_steps = [('preprocessor', preprocessor)]
        
        if feature_selection:
            pipeline_steps.append(('feature_selection', SelectKBest(f_classif, k=10)))
        
        if dimensionality_reduction:
            pipeline_steps.append(('pca', PCA(n_components=0.95)))
        
        self.pipeline = Pipeline(pipeline_steps)
        return self.pipeline
    
    def create_custom_pipeline(self, config):
        """根据配置创建自定义管道"""
        from sklearn.pipeline import Pipeline
        from sklearn.compose import ColumnTransformer
        
        transformers = []
        
        for feature_type, settings in config.items():
            if feature_type == 'numeric':
                transformer = self._create_numeric_transformer(settings)
                transformers.append(('num', transformer, settings['features']))
            elif feature_type == 'categorical':
                transformer = self._create_categorical_transformer(settings)
                transformers.append(('cat', transformer, settings['features']))
        
        preprocessor = ColumnTransformer(transformers=transformers)
        
        # 添加后处理步骤
        pipeline_steps = [('preprocessor', preprocessor)]
        
        if 'feature_selection' in config:
            fs_config = config['feature_selection']
            pipeline_steps.append(('feature_selection', 
                                 self._create_feature_selector(fs_config)))
        
        if 'dimensionality_reduction' in config:
            dr_config = config['dimensionality_reduction']
            pipeline_steps.append(('dim_reduction', 
                                 self._create_dim_reducer(dr_config)))
        
        self.pipeline = Pipeline(pipeline_steps)
        return self.pipeline
    
    def _create_numeric_transformer(self, settings):
        """创建数值特征转换器"""
        from sklearn.pipeline import Pipeline
        from sklearn.impute import SimpleImputer, KNNImputer
        from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
        
        steps = []
        
        # 缺失值处理
        if settings.get('imputation') == 'mean':
            steps.append(('imputer', SimpleImputer(strategy='mean')))
        elif settings.get('imputation') == 'median':
            steps.append(('imputer', SimpleImputer(strategy='median')))
        elif settings.get('imputation') == 'knn':
            steps.append(('imputer', KNNImputer(n_neighbors=5)))
        
        # 缩放
        if settings.get('scaling') == 'standard':
            steps.append(('scaler', StandardScaler()))
        elif settings.get('scaling') == 'minmax':
            steps.append(('scaler', MinMaxScaler()))
        elif settings.get('scaling') == 'robust':
            steps.append(('scaler', RobustScaler()))
        
        return Pipeline(steps)
    
    def _create_categorical_transformer(self, settings):
        """创建分类特征转换器"""
        from sklearn.pipeline import Pipeline
        from sklearn.impute import SimpleImputer
        from sklearn.preprocessing import OneHotEncoder, LabelEncoder, OrdinalEncoder
        
        steps = []
        
        # 缺失值处理
        if settings.get('imputation') == 'mode':
            steps.append(('imputer', SimpleImputer(strategy='most_frequent')))
        elif settings.get('imputation') == 'constant':
            steps.append(('imputer', SimpleImputer(strategy='constant', fill_value='missing')))
        
        # 编码
        if settings.get('encoding') == 'onehot':
            steps.append(('encoder', OneHotEncoder(handle_unknown='ignore', sparse=False)))  # 注:sklearn>=1.2 请改用 sparse_output=False
        elif settings.get('encoding') == 'ordinal':
            steps.append(('encoder', OrdinalEncoder(handle_unknown='use_encoded_value', 
                                                  unknown_value=-1)))
        
        return Pipeline(steps)
    
    def _create_feature_selector(self, config):
        """创建特征选择器"""
        from sklearn.feature_selection import SelectKBest, f_classif, chi2, RFE
        from sklearn.ensemble import RandomForestClassifier
        
        if config['method'] == 'univariate':
            return SelectKBest(f_classif, k=config.get('k', 10))
        elif config['method'] == 'rfe':
            estimator = RandomForestClassifier(n_estimators=100, random_state=42)
            return RFE(estimator, n_features_to_select=config.get('k', 10))
    
    def _create_dim_reducer(self, config):
        """创建降维器"""
        from sklearn.decomposition import PCA
        from sklearn.manifold import TSNE
        
        if config['method'] == 'pca':
            return PCA(n_components=config.get('n_components', 0.95))
        elif config['method'] == 'tsne':
            # 注意:TSNE只有fit_transform而没有transform方法,放入Pipeline后无法转换新数据,更适合单独用于可视化
            return TSNE(n_components=config.get('n_components', 2), random_state=42)
    
    def fit_transform_pipeline(self, X, y=None):
        """拟合并转换数据"""
        if self.pipeline is None:
            raise ValueError("管道未创建,请先调用create_*_pipeline方法")
        
        X_transformed = self.pipeline.fit_transform(X, y)
        
        # 尝试获取特征名称
        try:
            if hasattr(self.pipeline, 'get_feature_names_out'):
                self.feature_names = self.pipeline.get_feature_names_out()
            else:
                # 对于较老版本的sklearn
                self.feature_names = [f'feature_{i}' for i in range(X_transformed.shape[1])]
        except Exception:
            self.feature_names = [f'feature_{i}' for i in range(X_transformed.shape[1])]
        
        print(f"=== 管道处理结果 ===")
        print(f"原始数据形状: {X.shape}")
        print(f"处理后数据形状: {X_transformed.shape}")
        print(f"特征数量变化: {X.shape[1]} -> {X_transformed.shape[1]}")
        
        return X_transformed
    
    def transform_pipeline(self, X):
        """使用已拟合的管道转换新数据"""
        if self.pipeline is None:
            raise ValueError("管道未创建或未拟合")
        
        return self.pipeline.transform(X)
    
    def get_pipeline_info(self):
        """获取管道信息"""
        if self.pipeline is None:
            return "管道未创建"
        
        info = "=== 管道信息 ===\n"
        
        if hasattr(self.pipeline, 'steps'):
            for step_name, step_transformer in self.pipeline.steps:
                info += f"步骤: {step_name}\n"
                info += f"  转换器: {type(step_transformer).__name__}\n"
                
                if hasattr(step_transformer, 'transformers'):
                    for name, transformer, features in step_transformer.transformers:
                        info += f"    {name}: {type(transformer).__name__}\n"
                        info += f"      特征: {features}\n"
        
        return info

# 演示预处理管道
pipeline_demo = PreprocessingPipeline()

# 准备特征列表
numeric_features = ['age', 'income', 'education_years']
categorical_features = ['gender', 'city']

# 创建基础管道
basic_pipeline = pipeline_demo.create_basic_pipeline(numeric_features, categorical_features)

# 使用管道处理数据
X_processed = pipeline_demo.fit_transform_pipeline(data_encoded[numeric_features + categorical_features])

# 显示管道信息
print(pipeline_demo.get_pipeline_info())
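
# 补充示例(假设沿用上文的 data_encoded):拟合好的管道可以直接转换新数据(如测试集),
# 保证训练集和测试集经过完全一致的处理流程
from sklearn.model_selection import train_test_split

X_all = data_encoded[numeric_features + categorical_features]
X_train_df, X_test_df = train_test_split(X_all, test_size=0.2, random_state=42)

X_train_proc = pipeline_demo.fit_transform_pipeline(X_train_df)   # 只在训练集上拟合
X_test_proc = pipeline_demo.transform_pipeline(X_test_df)         # 测试集仅transform,不再fit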

# 创建高级管道配置
advanced_config = {
    'numeric': {
        'features': numeric_features,
        'imputation': 'knn',
        'scaling': 'robust'
    },
    'categorical': {
        'features': categorical_features,
        'imputation': 'mode',
        'encoding': 'onehot'
    },
    'feature_selection': {
        'method': 'univariate',
        'k': 8
    },
    'dimensionality_reduction': {
        'method': 'pca',
        'n_components': 0.95
    }
}

# 创建自定义管道
custom_pipeline = pipeline_demo.create_custom_pipeline(advanced_config)
# 注意:配置中启用了SelectKBest(f_classif)等监督式特征选择,拟合时必须同时传入目标变量y,
# 否则会报错;此处的 target 表示与样本对齐的标签列(占位名称)
X_custom_processed = pipeline_demo.fit_transform_pipeline(
    data_encoded[numeric_features + categorical_features], y=target)

7. 实战案例

7.1 房价预测数据预处理

class HousePricePredictionPreprocessing:
    """房价预测数据预处理案例"""
    
    def __init__(self):
        self.pipeline = None
        self.feature_info = {}
        
    def create_house_price_dataset(self):
        """创建房价数据集"""
        np.random.seed(42)
        n_samples = 2000
        
        # 基础特征
        area = np.random.normal(120, 40, n_samples)  # 面积
        bedrooms = np.random.poisson(3, n_samples)   # 卧室数
        bathrooms = np.random.poisson(2, n_samples)  # 浴室数
        age = np.random.exponential(10, n_samples)   # 房龄
        
        # 分类特征
        location = np.random.choice(['市中心', '郊区', '新区'], n_samples, p=[0.3, 0.5, 0.2])
        house_type = np.random.choice(['公寓', '别墅', '联排'], n_samples, p=[0.6, 0.2, 0.2])
        
        # 添加异常值(在计算房价之前,让异常面积也体现在价格中)
        area[np.random.choice(n_samples, 20, replace=False)] *= 3
        
        # 计算目标变量(房价)
        price = (area * 8000 + 
                bedrooms * 50000 + 
                bathrooms * 30000 - 
                age * 2000 +
                np.where(location == '市中心', 200000, 
                        np.where(location == '新区', 100000, 0)) +
                np.where(house_type == '别墅', 300000,
                        np.where(house_type == '联排', 100000, 0)) +
                np.random.normal(0, 50000, n_samples))
        
        # 确保价格为正
        price = np.maximum(price, 100000)
        
        # 添加缺失值(在计算房价之后再注入,避免目标变量price出现NaN)
        area[np.random.choice(n_samples, 100, replace=False)] = np.nan
        age[np.random.choice(n_samples, 50, replace=False)] = np.nan
        
        data = pd.DataFrame({
            'area': area,
            'bedrooms': bedrooms,
            'bathrooms': bathrooms,
            'age': age,
            'location': location,
            'house_type': house_type,
            'price': price
        })
        
        return data
    
    def analyze_house_data(self, data):
        """分析房价数据"""
        print("=== 房价数据分析 ===")
        print(f"数据形状: {data.shape}")
        print(f"\n基本统计信息:")
        print(data.describe())
        
        print(f"\n缺失值情况:")
        print(data.isnull().sum())
        
        print(f"\n分类特征分布:")
        for col in ['location', 'house_type']:
            print(f"{col}:")
            print(data[col].value_counts())
            print()
        
        # 可视化数据分布
        self.visualize_house_data(data)
        
    def visualize_house_data(self, data):
        """可视化房价数据"""
        fig, axes = plt.subplots(3, 3, figsize=(18, 15))
        fig.suptitle('房价数据分析', fontsize=16)
        
        # 数值特征分布
        numeric_cols = ['area', 'bedrooms', 'bathrooms', 'age', 'price']
        for i, col in enumerate(numeric_cols):
            row, col_idx = i // 3, i % 3
            axes[row, col_idx].hist(data[col].dropna(), bins=30, alpha=0.7)
            axes[row, col_idx].set_title(f'{col} 分布')
            axes[row, col_idx].set_xlabel(col)
            axes[row, col_idx].set_ylabel('频次')
        
        # 分类特征分布
        data['location'].value_counts().plot(kind='bar', ax=axes[1, 2])
        axes[1, 2].set_title('位置分布')
        axes[1, 2].tick_params(axis='x', rotation=45)
        
        data['house_type'].value_counts().plot(kind='bar', ax=axes[2, 0])
        axes[2, 0].set_title('房型分布')
        axes[2, 0].tick_params(axis='x', rotation=45)
        
        # 价格与面积关系
        axes[2, 1].scatter(data['area'], data['price'], alpha=0.6)
        axes[2, 1].set_title('价格 vs 面积')
        axes[2, 1].set_xlabel('面积')
        axes[2, 1].set_ylabel('价格')
        
        # 不同位置的价格分布
        for location in data['location'].unique():
            subset = data[data['location'] == location]['price']
            axes[2, 2].hist(subset, alpha=0.6, label=location, bins=20)
        axes[2, 2].set_title('不同位置的价格分布')
        axes[2, 2].set_xlabel('价格')
        axes[2, 2].set_ylabel('频次')
        axes[2, 2].legend()
        
        plt.tight_layout()
        plt.show()
    
    def create_house_price_pipeline(self):
        """创建房价预测预处理管道"""
        from sklearn.pipeline import Pipeline
        from sklearn.compose import ColumnTransformer
        from sklearn.preprocessing import StandardScaler, OneHotEncoder
        from sklearn.impute import KNNImputer, SimpleImputer
        from sklearn.feature_selection import SelectKBest, f_regression
        
        # 数值特征预处理
        numeric_features = ['area', 'bedrooms', 'bathrooms', 'age']
        numeric_transformer = Pipeline(steps=[
            ('imputer', KNNImputer(n_neighbors=5)),
            ('scaler', StandardScaler())
        ])
        
        # 分类特征预处理
        categorical_features = ['location', 'house_type']
        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='most_frequent')),
            ('onehot', OneHotEncoder(handle_unknown='ignore', sparse=False))  # 注:sklearn>=1.2 请改用 sparse_output=False
        ])
        
        # 组合预处理器
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features),
                ('cat', categorical_transformer, categorical_features)
            ])
        
        # 完整管道
        self.pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('feature_selection', SelectKBest(f_regression, k=8))
        ])
        
        return self.pipeline
    
    def process_house_data(self, data):
        """处理房价数据"""
        # 分离特征和目标
        X = data.drop('price', axis=1)
        y = data['price']
        
        # 创建并应用管道
        pipeline = self.create_house_price_pipeline()
        X_processed = pipeline.fit_transform(X, y)
        
        print(f"=== 房价数据预处理结果 ===")
        print(f"原始特征数: {X.shape[1]}")
        print(f"处理后特征数: {X_processed.shape[1]}")
        
        # 评估预处理效果
        self.evaluate_preprocessing_effect(X, y, X_processed)
        
        return X_processed, y
    
    def evaluate_preprocessing_effect(self, X_original, y, X_processed):
        """评估预处理效果"""
        from sklearn.model_selection import cross_val_score
        from sklearn.ensemble import RandomForestRegressor
        from sklearn.linear_model import LinearRegression
        from sklearn.preprocessing import StandardScaler
        
        # 准备原始数据(简单处理)
        X_simple = X_original.copy()
        
        # 简单填充缺失值
        for col in X_simple.select_dtypes(include=[np.number]).columns:
            X_simple[col].fillna(X_simple[col].median(), inplace=True)
        
        # 简单编码分类变量
        X_simple_encoded = pd.get_dummies(X_simple, drop_first=True)
        
        # 标准化
        scaler = StandardScaler()
        X_simple_scaled = scaler.fit_transform(X_simple_encoded)
        
        # 比较模型性能
        # 注意:这里的缩放/特征选择是在全量数据上拟合后再交叉验证,仅用于粗略对比;
        # 严格评估应把预处理放进Pipeline,在每折内部拟合,避免数据泄露
        models = {
            'LinearRegression': LinearRegression(),
            'RandomForest': RandomForestRegressor(n_estimators=100, random_state=42)
        }
        
        results = {}
        
        for model_name, model in models.items():
            # 简单预处理的结果
            scores_simple = cross_val_score(model, X_simple_scaled, y, cv=5, 
                                          scoring='neg_mean_squared_error')
            
            # 高级预处理的结果
            scores_advanced = cross_val_score(model, X_processed, y, cv=5, 
                                            scoring='neg_mean_squared_error')
            
            results[model_name] = {
                'simple': -scores_simple.mean(),
                'advanced': -scores_advanced.mean(),
                'improvement': ((-scores_simple.mean()) - (-scores_advanced.mean())) / (-scores_simple.mean()) * 100
            }
        
        print("\n=== 预处理效果比较 (MSE) ===")
        for model_name, result in results.items():
            print(f"{model_name}:")
            print(f"  简单预处理: {result['simple']:.2f}")
            print(f"  高级预处理: {result['advanced']:.2f}")
            print(f"  改进幅度: {result['improvement']:.2f}%")
            print()

# 演示房价预测数据预处理
house_demo = HousePricePredictionPreprocessing()

# 创建数据
house_data = house_demo.create_house_price_dataset()

# 分析数据
house_demo.analyze_house_data(house_data)

# 处理数据
X_house_processed, y_house = house_demo.process_house_data(house_data)
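
案例最后,通常还会把拟合好的预处理管道持久化,供后续推理或部署时复用。下面是一个最小示意(文件名为假设值):

import joblib

# 保存拟合好的房价预处理管道
joblib.dump(house_demo.pipeline, 'house_price_pipeline.joblib')

# 部署/推理时重新加载,并用同一管道转换新样本
loaded_pipeline = joblib.load('house_price_pipeline.joblib')
# X_new_processed = loaded_pipeline.transform(X_new)  # X_new 为新的房屋特征数据(占位)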

本章小结

🎯 核心概念回顾

  1. 数据预处理的重要性

    • 占据机器学习项目80%的时间
    • 直接影响模型性能
    • 包括数据清洗、转换、特征工程等
  2. 数据清洗技术

    • 缺失值处理:删除、填充、插值
    • 异常值检测:统计方法、机器学习方法
    • 数据类型优化:内存使用、类型转换
  3. 特征缩放方法(最小代码示例见本列表之后)

    • 标准化:零均值单位方差
    • 归一化:缩放到[0,1]区间
    • 鲁棒缩放:对异常值鲁棒
  4. 特征选择技术

    • 过滤法:统计检验、相关性分析
    • 包装法:递归特征消除、序列选择
    • 嵌入法:L1正则化、树模型重要性
  5. 降维技术

    • PCA:线性降维、保持方差
    • t-SNE:非线性降维、可视化
    • LDA:监督降维、类别分离
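
作为对第3点"特征缩放方法"的补充,下面用一个最小示例直观对比标准化与归一化对同一列数据的效果(示例数组为假设数据):

import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler

x = np.array([[1.0], [2.0], [3.0], [100.0]])  # 假设数据,最后一个值明显偏大

# 标准化: z = (x - 均值) / 标准差,结果零均值、单位方差
print(StandardScaler().fit_transform(x).ravel())

# 归一化: x' = (x - 最小值) / (最大值 - 最小值),结果落在[0, 1]区间
print(MinMaxScaler().fit_transform(x).ravel())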

💡 最佳实践

  1. 数据探索优先

    • 始终先进行数据探索分析
    • 理解数据分布和质量问题
    • 可视化数据特征
  2. 管道化处理(示例见本列表之后)

    • 使用Pipeline组织预处理步骤
    • 确保训练和测试数据一致性
    • 便于参数调优和模型部署
  3. 方法选择指南

    • 根据数据特征选择合适方法
    • 考虑算法对预处理的要求
    • 平衡处理效果和计算成本
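
针对第2点"管道化处理",下面是一个把预处理与模型串联后一起参与交叉验证调参的最小示意(特征列与参数取值均为假设):

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV

preprocessor = ColumnTransformer(transformers=[
    ('num', Pipeline([('imputer', SimpleImputer(strategy='median')),
                      ('scaler', StandardScaler())]), ['age', 'income']),
    ('cat', Pipeline([('imputer', SimpleImputer(strategy='most_frequent')),
                      ('onehot', OneHotEncoder(handle_unknown='ignore'))]), ['gender', 'city'])
])

model_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('clf', LogisticRegression(max_iter=1000))
])

# 预处理与模型参数可一起调优;交叉验证的每一折都会在折内重新拟合预处理步骤,避免数据泄露
param_grid = {'clf__C': [0.1, 1.0, 10.0]}
search = GridSearchCV(model_pipeline, param_grid, cv=5)
# search.fit(X_train_df, y_train)  # X_train_df/y_train 为假设的训练数据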

⚠️ 常见陷阱

  1. 数据泄露(正确写法示例见本列表之后)

    • 在分割数据前进行预处理
    • 使用训练集统计量处理测试集
  2. 过度预处理

    • 不是所有算法都需要缩放
    • 避免不必要的特征变换
  3. 忽视业务逻辑

    • 结合领域知识进行预处理
    • 保持特征的可解释性
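
针对第1点"数据泄露",正确的顺序是:先划分数据集,只在训练集上拟合预处理器,再用同一个已拟合的预处理器转换测试集。下面是一个最小示意(数据由make_classification生成,仅作演示):

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=200, n_features=5, random_state=42)

# 正确做法:先划分,再在训练集上拟合缩放器
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # 只使用训练集的均值/标准差
X_test_scaled = scaler.transform(X_test)        # 测试集沿用训练集统计量,不再重新fit

# 错误做法(数据泄露):scaler.fit_transform(X) 之后再划分训练/测试集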

🚀 下一步学习

📝 练习题

  1. 基础练习:使用不同的缺失值填充方法处理数据,比较效果
  2. 进阶练习:构建一个完整的预处理管道,包含所有主要步骤
  3. 实战练习:在真实数据集上应用本章学到的技术

恭喜! 🎉 您已经完成了数据预处理章节的学习。数据预处理是机器学习成功的基础,掌握这些技术将为后续的模型训练打下坚实基础。


第2章完结