Feature engineering is one of the most important stages of a machine learning project, and it often determines a model's upper bound: good feature engineering can markedly improve model performance, while poor features can keep even state-of-the-art algorithms from working well. This chapter explores feature engineering in depth, from basic feature processing to advanced feature construction techniques.


8.1 Overview of Feature Engineering

8.1.1 What Is Feature Engineering

Feature engineering is the process of using domain knowledge and data science techniques to extract, construct, and select features from raw data. It covers four core operations (a minimal sketch follows the list):

  • Feature extraction: deriving useful information from raw data
  • Feature construction: creating new features from existing ones
  • Feature selection: choosing the most relevant subset of features
  • Feature transformation: changing a feature's distribution or scale
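To make the four operations concrete, here is a minimal, self-contained sketch on a hypothetical toy DataFrame (the columns `income`, `age`, and `signup_date` are invented purely for illustration):

import numpy as np
import pandas as pd

# A hypothetical toy dataset
df = pd.DataFrame({
    'income': [3000.0, 5200.0, 8100.0, 12000.0],
    'age': [22, 35, 41, 58],
    'signup_date': pd.to_datetime(['2023-01-05', '2023-03-17',
                                   '2023-06-02', '2023-09-30'])
})

# Feature extraction: pull the signup month out of the raw timestamp
df['signup_month'] = df['signup_date'].dt.month

# Feature construction: combine existing features into a new one
df['income_per_age'] = df['income'] / df['age']

# Feature transformation: compress the scale of a right-skewed feature
df['log_income'] = np.log1p(df['income'])

# Feature selection: keep only the columns we intend to model on
X = df[['age', 'signup_month', 'log_income']]
print(X)
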
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_classification, fetch_20newsgroups
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, MaxAbsScaler
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, OrdinalEncoder
from sklearn.preprocessing import PolynomialFeatures, PowerTransformer
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from sklearn.feature_selection import RFE, SelectFromModel
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.metrics import accuracy_score, mean_squared_error
import warnings
warnings.filterwarnings('ignore')

# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

class FeatureEngineeringIntro:
    def __init__(self):
        self.data = None
        
    def feature_engineering_importance(self):
        """演示特征工程的重要性"""
        print("=== 特征工程重要性演示 ===")
        
        # 创建一个简单的数据集
        np.random.seed(42)
        n_samples = 1000
        
        # 原始特征
        x1 = np.random.normal(0, 1, n_samples)
        x2 = np.random.normal(0, 1, n_samples)
        
        # 目标变量与特征的非线性关系
        y = (x1**2 + x2**2 + x1*x2 + np.random.normal(0, 0.1, n_samples)) > 1
        y = y.astype(int)
        
        # 原始特征
        X_original = np.column_stack([x1, x2])
        
        # 工程特征
        X_engineered = np.column_stack([
            x1, x2,                    # 原始特征
            x1**2, x2**2,             # 平方特征
            x1*x2,                    # 交互特征
            np.abs(x1), np.abs(x2),   # 绝对值特征
            x1**3, x2**3              # 立方特征
        ])
        
        # 比较模型性能
        models = {
            '原始特征': X_original,
            '工程特征': X_engineered
        }
        
        results = {}
        
        for name, X in models.items():
            # 划分数据集
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.3, random_state=42
            )
            
            # 训练模型
            model = LogisticRegression(random_state=42)
            model.fit(X_train, y_train)
            
            # 评估性能
            train_score = model.score(X_train, y_train)
            test_score = model.score(X_test, y_test)
            
            results[name] = {
                'train_score': train_score,
                'test_score': test_score,
                'n_features': X.shape[1]
            }
            
            print(f"{name}:")
            print(f"  特征数量: {X.shape[1]}")
            print(f"  训练准确率: {train_score:.3f}")
            print(f"  测试准确率: {test_score:.3f}")
            print()
        
        # 可视化结果
        self.visualize_feature_importance_demo(X_original, X_engineered, y, results)
        
        return results
        
    def visualize_feature_importance_demo(self, X_original, X_engineered, y, results):
        """可视化特征工程重要性"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        
        # 1. 原始数据分布
        scatter = axes[0, 0].scatter(X_original[:, 0], X_original[:, 1], 
                                   c=y, cmap='viridis', alpha=0.6)
        axes[0, 0].set_xlabel('特征1')
        axes[0, 0].set_ylabel('特征2')
        axes[0, 0].set_title('原始特征分布')
        plt.colorbar(scatter, ax=axes[0, 0])
        
        # 2. 工程特征相关性
        feature_names = ['x1', 'x2', 'x1²', 'x2²', 'x1*x2', '|x1|', '|x2|', 'x1³', 'x2³']
        corr_matrix = np.corrcoef(X_engineered.T)
        
        im = axes[0, 1].imshow(corr_matrix, cmap='coolwarm', vmin=-1, vmax=1)
        axes[0, 1].set_xticks(range(len(feature_names)))
        axes[0, 1].set_yticks(range(len(feature_names)))
        axes[0, 1].set_xticklabels(feature_names, rotation=45)
        axes[0, 1].set_yticklabels(feature_names)
        axes[0, 1].set_title('工程特征相关性矩阵')
        plt.colorbar(im, ax=axes[0, 1])
        
        # 3. 性能比较
        methods = list(results.keys())
        train_scores = [results[method]['train_score'] for method in methods]
        test_scores = [results[method]['test_score'] for method in methods]
        
        x = np.arange(len(methods))
        width = 0.35
        
        axes[1, 0].bar(x - width/2, train_scores, width, label='训练准确率', alpha=0.8)
        axes[1, 0].bar(x + width/2, test_scores, width, label='测试准确率', alpha=0.8)
        axes[1, 0].set_xlabel('方法')
        axes[1, 0].set_ylabel('准确率')
        axes[1, 0].set_title('性能比较')
        axes[1, 0].set_xticks(x)
        axes[1, 0].set_xticklabels(methods)
        axes[1, 0].legend()
        axes[1, 0].grid(True, alpha=0.3)
        
        # 4. 特征数量对比
        n_features = [results[method]['n_features'] for method in methods]
        axes[1, 1].bar(methods, n_features, color=['skyblue', 'lightcoral'])
        axes[1, 1].set_xlabel('方法')
        axes[1, 1].set_ylabel('特征数量')
        axes[1, 1].set_title('特征数量对比')
        axes[1, 1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def feature_engineering_workflow(self):
        """特征工程工作流程"""
        print("\n=== 特征工程工作流程 ===")
        
        workflow_steps = {
            "1. 数据理解": [
                "探索数据结构和类型",
                "识别缺失值和异常值",
                "理解业务背景和目标",
                "分析特征与目标的关系"
            ],
            "2. 数据清洗": [
                "处理缺失值",
                "检测和处理异常值",
                "数据类型转换",
                "重复数据处理"
            ],
            "3. 特征变换": [
                "数值特征标准化/归一化",
                "类别特征编码",
                "分布变换(对数、Box-Cox等)",
                "离散化处理"
            ],
            "4. 特征构造": [
                "多项式特征",
                "交互特征",
                "聚合特征",
                "时间特征提取"
            ],
            "5. 特征选择": [
                "过滤法(统计检验)",
                "包装法(递归特征消除)",
                "嵌入法(基于模型)",
                "降维技术"
            ],
            "6. 特征验证": [
                "交叉验证评估",
                "特征重要性分析",
                "模型性能对比",
                "业务价值评估"
            ]
        }
        
        for step, details in workflow_steps.items():
            print(f"\n{step}:")
            for detail in details:
                print(f"  • {detail}")
                
        print("\n=== 特征工程最佳实践 ===")
        best_practices = [
            "始终基于业务理解进行特征工程",
            "保持训练集和测试集的一致性",
            "避免数据泄露(不使用未来信息)",
            "记录所有特征工程步骤",
            "使用交叉验证评估特征效果",
            "平衡特征数量和模型复杂度",
            "考虑特征的可解释性",
            "定期重新评估特征的有效性"
        ]
        
        for i, practice in enumerate(best_practices, 1):
            print(f"  {i}. {practice}")

# 演示特征工程概述
print("=== 特征工程概述 ===")
intro = FeatureEngineeringIntro()
importance_results = intro.feature_engineering_importance()
intro.feature_engineering_workflow()

8.2 Numerical Feature Processing

8.2.1 Feature Scaling and Standardization

Different features often come in different units and value ranges, so they need appropriate scaling before modeling.
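As a quick reference before the full comparison, here is a minimal NumPy-only sketch of what the three main scalers compute; the robust variant follows scikit-learn's default of centering on the median and dividing by the interquartile range:

import numpy as np

x = np.array([1.0, 2.0, 2.5, 3.0, 100.0])  # 100.0 acts as an outlier

# StandardScaler: z = (x - mean) / std
z_standard = (x - x.mean()) / x.std()

# MinMaxScaler: rescale to [0, 1]
z_minmax = (x - x.min()) / (x.max() - x.min())

# RobustScaler: (x - median) / IQR, far less sensitive to the outlier
q1, q3 = np.percentile(x, [25, 75])
z_robust = (x - np.median(x)) / (q3 - q1)

print(z_standard.round(2))
print(z_minmax.round(2))
print(z_robust.round(2))
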

class NumericalFeatureProcessing:
    def __init__(self):
        self.scalers = {}
        
    def scaling_methods_comparison(self):
        """比较不同的特征缩放方法"""
        print("=== 特征缩放方法比较 ===")
        
        # 创建具有不同分布的数据
        np.random.seed(42)
        n_samples = 1000
        
        # 不同分布的特征
        normal_feature = np.random.normal(50, 15, n_samples)
        uniform_feature = np.random.uniform(0, 100, n_samples)
        exponential_feature = np.random.exponential(2, n_samples)
        outlier_feature = np.concatenate([
            np.random.normal(10, 2, int(0.95 * n_samples)),
            np.random.normal(50, 5, int(0.05 * n_samples))
        ])
        
        # 组合数据
        X = np.column_stack([normal_feature, uniform_feature, 
                           exponential_feature, outlier_feature])
        
        feature_names = ['正态分布', '均匀分布', '指数分布', '含异常值']
        
        # 不同的缩放方法
        scalers = {
            '原始数据': None,
            '标准化 (StandardScaler)': StandardScaler(),
            '最小-最大缩放 (MinMaxScaler)': MinMaxScaler(),
            '鲁棒缩放 (RobustScaler)': RobustScaler(),
            '最大绝对值缩放 (MaxAbsScaler)': MaxAbsScaler()
        }
        
        # 应用不同的缩放方法
        scaled_data = {}
        
        for name, scaler in scalers.items():
            if scaler is None:
                scaled_data[name] = X
            else:
                scaled_data[name] = scaler.fit_transform(X)
                
        # 可视化结果
        self.visualize_scaling_methods(scaled_data, feature_names)
        
        # 统计信息比较
        self.compare_scaling_statistics(scaled_data, feature_names)
        
        return scaled_data
        
    def visualize_scaling_methods(self, scaled_data, feature_names):
        """可视化不同缩放方法的效果"""
        n_methods = len(scaled_data)
        n_features = len(feature_names)
        
        fig, axes = plt.subplots(n_methods, n_features, figsize=(20, 15))
        
        for i, (method_name, data) in enumerate(scaled_data.items()):
            for j, feature_name in enumerate(feature_names):
                ax = axes[i, j] if n_methods > 1 else axes[j]
                
                # 绘制直方图
                ax.hist(data[:, j], bins=50, alpha=0.7, density=True)
                ax.set_title(f'{method_name}\n{feature_name}')
                ax.grid(True, alpha=0.3)
                
                # 添加统计信息
                mean_val = np.mean(data[:, j])
                std_val = np.std(data[:, j])
                ax.axvline(mean_val, color='red', linestyle='--', 
                          label=f'均值: {mean_val:.2f}')
                ax.axvline(mean_val + std_val, color='orange', linestyle='--', 
                          label=f'±1σ: {std_val:.2f}')
                ax.axvline(mean_val - std_val, color='orange', linestyle='--')
                
                if j == 0:  # 只在第一列显示图例
                    ax.legend(fontsize=8)
        
        plt.tight_layout()
        plt.show()
        
    def compare_scaling_statistics(self, scaled_data, feature_names):
        """比较缩放后的统计信息"""
        print("\n=== 缩放方法统计信息比较 ===")
        
        for method_name, data in scaled_data.items():
            print(f"\n{method_name}:")
            for j, feature_name in enumerate(feature_names):
                feature_data = data[:, j]
                print(f"  {feature_name:10}: "
                      f"均值={np.mean(feature_data):6.2f}, "
                      f"标准差={np.std(feature_data):6.2f}, "
                      f"最小值={np.min(feature_data):6.2f}, "
                      f"最大值={np.max(feature_data):6.2f}")
                      
    def distribution_transformation(self):
        """分布变换技术"""
        print("\n=== 分布变换技术 ===")
        
        # 创建偏态分布数据
        np.random.seed(42)
        n_samples = 1000
        
        # 右偏分布
        right_skewed = np.random.exponential(2, n_samples)
        # 左偏分布  
        left_skewed = 10 - np.random.exponential(2, n_samples)
        # 双峰分布
        bimodal = np.concatenate([
            np.random.normal(2, 0.5, n_samples//2),
            np.random.normal(8, 0.5, n_samples//2)
        ])
        
        X = np.column_stack([right_skewed, left_skewed, bimodal])
        feature_names = ['右偏分布', '左偏分布', '双峰分布']
        
        # 不同的变换方法
        transformers = {
            '原始数据': None,
            '对数变换': lambda x: np.log1p(x - x.min()),
            '平方根变换': lambda x: np.sqrt(x - x.min()),
            # Box-Cox requires strictly positive inputs, so shift each
            # column into the positive range before transforming
            'Box-Cox变换': lambda x: PowerTransformer(method='box-cox').fit_transform(
                (x - x.min() + 1).reshape(-1, 1)).ravel(),
            'Yeo-Johnson变换': PowerTransformer(method='yeo-johnson')
        }
        
        # 应用变换
        transformed_data = {}
        
        for name, transformer in transformers.items():
            if transformer is None:
                transformed_data[name] = X
            elif callable(transformer):
                # 自定义函数变换
                transformed_data[name] = np.column_stack([
                    transformer(X[:, i]) for i in range(X.shape[1])
                ])
            else:
                # sklearn变换器
                transformed_data[name] = transformer.fit_transform(X)
        
        # 可视化变换效果
        self.visualize_distribution_transformation(transformed_data, feature_names)
        
        return transformed_data
        
    def visualize_distribution_transformation(self, transformed_data, feature_names):
        """可视化分布变换效果"""
        n_methods = len(transformed_data)
        n_features = len(feature_names)
        
        fig, axes = plt.subplots(n_features, n_methods, figsize=(20, 12))
        
        for i, feature_name in enumerate(feature_names):
            for j, (method_name, data) in enumerate(transformed_data.items()):
                ax = axes[i, j]
                
                # 绘制直方图和密度曲线
                feature_data = data[:, i]
                ax.hist(feature_data, bins=50, alpha=0.7, density=True, 
                       color='skyblue', edgecolor='black')
                
                # 添加统计信息
                from scipy import stats
                skewness = stats.skew(feature_data)
                kurtosis = stats.kurtosis(feature_data)
                
                ax.set_title(f'{feature_name}\n{method_name}\n'
                           f'偏度: {skewness:.2f}, 峰度: {kurtosis:.2f}')
                ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def discretization_techniques(self):
        """离散化技术"""
        print("\n=== 离散化技术 ===")
        
        # 创建连续数据
        np.random.seed(42)
        n_samples = 1000
        continuous_feature = np.random.normal(50, 15, n_samples)
        
        # 不同的离散化方法
        from sklearn.preprocessing import KBinsDiscretizer
        
        discretizers = {
            '等宽离散化': KBinsDiscretizer(n_bins=5, encode='ordinal', strategy='uniform'),
            '等频离散化': KBinsDiscretizer(n_bins=5, encode='ordinal', strategy='quantile'),
            'K-means离散化': KBinsDiscretizer(n_bins=5, encode='ordinal', strategy='kmeans')
        }
        
        # 应用离散化
        discretized_results = {}
        
        for name, discretizer in discretizers.items():
            discretized = discretizer.fit_transform(continuous_feature.reshape(-1, 1))
            discretized_results[name] = {
                'discretized': discretized.flatten(),
                'bin_edges': discretizer.bin_edges_[0]
            }
        
        # 可视化离散化效果
        self.visualize_discretization(continuous_feature, discretized_results)
        
        return discretized_results
        
    def visualize_discretization(self, original_data, discretized_results):
        """可视化离散化效果"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        
        # 原始数据分布
        axes[0, 0].hist(original_data, bins=50, alpha=0.7, density=True)
        axes[0, 0].set_title('原始连续数据分布')
        axes[0, 0].set_xlabel('数值')
        axes[0, 0].set_ylabel('密度')
        axes[0, 0].grid(True, alpha=0.3)
        
        # 不同离散化方法的结果
        for i, (name, result) in enumerate(discretized_results.items()):
            row = (i + 1) // 2
            col = (i + 1) % 2
            ax = axes[row, col]
            
            # 绘制离散化结果
            discretized = result['discretized']
            bin_edges = result['bin_edges']
            
            # 直方图
            ax.hist(original_data, bins=bin_edges, alpha=0.5, density=True, 
                   label='原始数据')
            
            # 离散化边界
            for edge in bin_edges:
                ax.axvline(edge, color='red', linestyle='--', alpha=0.7)
            
            # 离散化后的分布
            unique_bins, counts = np.unique(discretized, return_counts=True)
            bin_centers = [(bin_edges[int(b)] + bin_edges[int(b)+1]) / 2 
                          for b in unique_bins]
            ax.bar(bin_centers, counts/len(discretized), 
                  width=(bin_edges[1]-bin_edges[0])*0.8, 
                  alpha=0.7, color='orange', label='离散化结果')
            
            ax.set_title(f'{name}')
            ax.set_xlabel('数值')
            ax.set_ylabel('密度/频率')
            ax.legend()
            ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()

# 演示数值特征处理
print("=== 数值特征处理 ===")
numerical_processor = NumericalFeatureProcessing()
scaled_data = numerical_processor.scaling_methods_comparison()
transformed_data = numerical_processor.distribution_transformation()
discretized_data = numerical_processor.discretization_techniques()

8.3 Categorical Feature Processing

8.3.1 Encoding Categorical Features

Categorical features must be converted into numeric form before machine learning algorithms can use them.
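Before the full comparison below, a minimal sketch of the two most common encodings on a toy color column; note that label encoding imposes an arbitrary (here alphabetical) order that linear models can misinterpret:

import pandas as pd
from sklearn.preprocessing import LabelEncoder

colors = pd.Series(['red', 'green', 'blue', 'green'])

# Label encoding: one integer per category (blue=0, green=1, red=2)
print(LabelEncoder().fit_transform(colors))  # [2 1 0 1]

# One-hot encoding: one binary column per category, no implied order
print(pd.get_dummies(colors, prefix='color'))
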

class CategoricalFeatureProcessing:
    def __init__(self):
        self.encoders = {}
        
    def create_categorical_dataset(self):
        """创建包含不同类型类别特征的数据集"""
        print("=== 创建类别特征数据集 ===")
        
        np.random.seed(42)
        n_samples = 1000
        
        # 不同类型的类别特征
        # 1. 无序类别特征(名义变量)
        colors = np.random.choice(['红色', '蓝色', '绿色', '黄色'], n_samples)
        
        # 2. 有序类别特征(序数变量)
        education = np.random.choice(['小学', '初中', '高中', '本科', '研究生'], 
                                   n_samples, p=[0.1, 0.2, 0.3, 0.3, 0.1])
        
        # 3. 高基数类别特征
        cities = np.random.choice([f'城市{i}' for i in range(50)], n_samples)
        
        # 4. 二元类别特征
        gender = np.random.choice(['男', '女'], n_samples)
        
        # 创建目标变量(与某些类别特征相关)
        education_mapping = {'小学': 0, '初中': 1, '高中': 2, '本科': 3, '研究生': 4}
        education_numeric = np.array([education_mapping[e] for e in education])
        
        y = (education_numeric + np.random.normal(0, 1, n_samples) + 
             (colors == '红色').astype(int) + 
             (gender == '男').astype(int) * 0.5) > 2
        y = y.astype(int)
        
        # 组合数据
        data = pd.DataFrame({
            'color': colors,
            'education': education,
            'city': cities,
            'gender': gender,
            'target': y
        })
        
        print(f"数据集大小: {data.shape}")
        print(f"类别特征统计:")
        for col in ['color', 'education', 'city', 'gender']:
            print(f"  {col}: {data[col].nunique()} 个唯一值")
            
        return data
        
    def encoding_methods_comparison(self, data):
        """比较不同的编码方法"""
        print("\n=== 类别特征编码方法比较 ===")
        
        # 准备数据
        categorical_features = ['color', 'education', 'city', 'gender']
        X = data[categorical_features]
        y = data['target']
        
        # 不同的编码方法
        encoding_methods = {}
        
        # 1. 标签编码
        print("1. 标签编码 (Label Encoding)")
        X_label = X.copy()
        label_encoders = {}
        for col in categorical_features:
            le = LabelEncoder()
            X_label[col] = le.fit_transform(X[col])
            label_encoders[col] = le
        encoding_methods['标签编码'] = X_label
        
        # 2. 独热编码
        print("2. 独热编码 (One-Hot Encoding)")
        X_onehot = pd.get_dummies(X, prefix=categorical_features)
        encoding_methods['独热编码'] = X_onehot
        
        # 3. 序数编码(针对有序特征)
        print("3. 序数编码 (Ordinal Encoding)")
        X_ordinal = X.copy()
        # 为education定义顺序
        education_order = ['小学', '初中', '高中', '本科', '研究生']
        ordinal_encoder = OrdinalEncoder(categories=[
            X['color'].unique(),
            education_order,
            X['city'].unique(),
            X['gender'].unique()
        ])
        X_ordinal_encoded = ordinal_encoder.fit_transform(X)
        X_ordinal = pd.DataFrame(X_ordinal_encoded, columns=categorical_features)
        encoding_methods['序数编码'] = X_ordinal
        
        # 4. 目标编码(均值编码)
        print("4. 目标编码 (Target Encoding)")
        X_target = X.copy()
        for col in categorical_features:
            target_mean = data.groupby(col)['target'].mean()
            X_target[col] = X[col].map(target_mean)
        encoding_methods['目标编码'] = X_target
        
        # 5. 频率编码
        print("5. 频率编码 (Frequency Encoding)")
        X_freq = X.copy()
        for col in categorical_features:
            freq_map = X[col].value_counts().to_dict()
            X_freq[col] = X[col].map(freq_map)
        encoding_methods['频率编码'] = X_freq
        
        # 评估不同编码方法的效果
        self.evaluate_encoding_methods(encoding_methods, y)
        
        # 可视化编码效果
        self.visualize_encoding_methods(X, encoding_methods)
        
        return encoding_methods
        
    def evaluate_encoding_methods(self, encoding_methods, y):
        """评估不同编码方法的效果"""
        print("\n=== 编码方法性能评估 ===")
        
        results = {}
        
        for name, X_encoded in encoding_methods.items():
            # 划分数据集
            X_train, X_test, y_train, y_test = train_test_split(
                X_encoded, y, test_size=0.3, random_state=42
            )
            
            # 标准化(除了独热编码)
            if name != '独热编码':
                scaler = StandardScaler()
                X_train = scaler.fit_transform(X_train)
                X_test = scaler.transform(X_test)
            
            # 训练模型
            model = LogisticRegression(random_state=42, max_iter=1000)
            model.fit(X_train, y_train)
            
            # 评估性能
            train_score = model.score(X_train, y_train)
            test_score = model.score(X_test, y_test)
            
            results[name] = {
                'train_score': train_score,
                'test_score': test_score,
                'n_features': X_encoded.shape[1]
            }
            
            print(f"{name:10}: 训练={train_score:.3f}, 测试={test_score:.3f}, "
                  f"特征数={X_encoded.shape[1]}")
        
        return results
        
    def visualize_encoding_methods(self, X_original, encoding_methods):
        """可视化编码方法效果"""
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        axes = axes.ravel()
        
        # 原始数据分布
        axes[0].bar(range(len(X_original.columns)), 
                   [X_original[col].nunique() for col in X_original.columns])
        axes[0].set_title('原始类别特征唯一值数量')
        axes[0].set_xlabel('特征')
        axes[0].set_ylabel('唯一值数量')
        axes[0].set_xticks(range(len(X_original.columns)))
        axes[0].set_xticklabels(X_original.columns, rotation=45)
        axes[0].grid(True, alpha=0.3)
        
        # 不同编码方法的特征数量
        method_names = list(encoding_methods.keys())
        feature_counts = [encoding_methods[name].shape[1] for name in method_names]
        
        axes[1].bar(method_names, feature_counts, color='lightcoral')
        axes[1].set_title('不同编码方法的特征数量')
        axes[1].set_xlabel('编码方法')
        axes[1].set_ylabel('特征数量')
        axes[1].tick_params(axis='x', rotation=45)
        axes[1].grid(True, alpha=0.3)
        
        # 展示部分编码结果
        for i, (name, X_encoded) in enumerate(list(encoding_methods.items())[:4]):
            ax = axes[i + 2]
            
            if name == '独热编码':
                # 独热编码显示前10个特征的分布
                feature_subset = X_encoded.iloc[:, :10]
                im = ax.imshow(feature_subset.head(50).T, cmap='viridis', aspect='auto')
                ax.set_title(f'{name} (前10个特征)')
                ax.set_xlabel('样本')
                ax.set_ylabel('特征')
            else:
                # 其他编码方法显示所有特征的分布
                if X_encoded.shape[1] <= 10:
                    im = ax.imshow(X_encoded.head(50).T, cmap='viridis', aspect='auto')
                    ax.set_title(f'{name}')
                    ax.set_xlabel('样本')
                    ax.set_ylabel('特征')
                else:
                    # 如果特征太多,显示相关性矩阵
                    corr = X_encoded.corr()
                    im = ax.imshow(corr, cmap='coolwarm', vmin=-1, vmax=1)
                    ax.set_title(f'{name} 相关性矩阵')
        
        plt.tight_layout()
        plt.show()
        
    def high_cardinality_encoding(self):
        """高基数类别特征处理"""
        print("\n=== 高基数类别特征处理 ===")
        
        # 创建高基数类别特征
        np.random.seed(42)
        n_samples = 10000
        n_categories = 1000
        
        # 模拟用户ID(高基数)
        user_ids = np.random.choice([f'user_{i}' for i in range(n_categories)], 
                                  n_samples)
        
        # 模拟其他特征
        age = np.random.normal(35, 10, n_samples)
        income = np.random.lognormal(10, 0.5, n_samples)
        
        # 创建目标变量(某些用户有更高的转化率)
        high_value_users = set(np.random.choice([f'user_{i}' for i in range(n_categories)], 
                                              int(0.1 * n_categories), replace=False))
        y = np.array([1 if uid in high_value_users else 0 for uid in user_ids])
        y = (y + np.random.binomial(1, 0.1, n_samples)) > 0  # 添加噪声
        
        data = pd.DataFrame({
            'user_id': user_ids,
            'age': age,
            'income': income,
            'target': y.astype(int)
        })
        
        print(f"数据集大小: {data.shape}")
        print(f"用户ID唯一值数量: {data['user_id'].nunique()}")
        print(f"目标变量分布: {data['target'].value_counts().to_dict()}")
        
        # 不同的高基数处理方法
        methods = {}
        
        # 1. 频率编码
        freq_map = data['user_id'].value_counts().to_dict()
        data['user_id_freq'] = data['user_id'].map(freq_map)
        methods['频率编码'] = data[['user_id_freq', 'age', 'income']]
        
        # 2. 目标编码
        target_mean = data.groupby('user_id')['target'].mean()
        data['user_id_target'] = data['user_id'].map(target_mean)
        methods['目标编码'] = data[['user_id_target', 'age', 'income']]
        
        # 3. 哈希编码
        from sklearn.feature_extraction import FeatureHasher
        hasher = FeatureHasher(n_features=100, input_type='string')
        # With input_type='string', each sample must be an iterable of
        # strings; passing a bare string would hash its individual
        # characters, so wrap each ID in a single-element list
        user_id_hashed = hasher.transform([[uid] for uid in data['user_id']]).toarray()
        
        hash_df = pd.DataFrame(user_id_hashed, 
                              columns=[f'hash_{i}' for i in range(100)])
        hash_df['age'] = data['age'].values
        hash_df['income'] = data['income'].values
        methods['哈希编码'] = hash_df
        
        # 4. 嵌入编码(简化版)
        # 使用目标编码作为嵌入的简化版本
        methods['嵌入编码'] = methods['目标编码']  # 实际应用中会使用神经网络学习嵌入
        
        # 评估不同方法
        self.evaluate_high_cardinality_methods(methods, data['target'])
        
        return methods
        
    def evaluate_high_cardinality_methods(self, methods, y):
        """评估高基数处理方法"""
        print("\n=== 高基数处理方法性能评估 ===")
        
        results = {}
        
        for name, X in methods.items():
            # 划分数据集
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.3, random_state=42
            )
            
            # 标准化
            scaler = StandardScaler()
            X_train_scaled = scaler.fit_transform(X_train)
            X_test_scaled = scaler.transform(X_test)
            
            # 训练模型
            model = LogisticRegression(random_state=42, max_iter=1000)
            model.fit(X_train_scaled, y_train)
            
            # 评估性能
            train_score = model.score(X_train_scaled, y_train)
            test_score = model.score(X_test_scaled, y_test)
            
            results[name] = {
                'train_score': train_score,
                'test_score': test_score,
                'n_features': X.shape[1]
            }
            
            print(f"{name:10}: 训练={train_score:.3f}, 测试={test_score:.3f}, "
                  f"特征数={X.shape[1]}")
        
        # 可视化结果
        self.visualize_high_cardinality_results(results)
        
        return results
        
    def visualize_high_cardinality_results(self, results):
        """可视化高基数处理结果"""
        fig, axes = plt.subplots(1, 2, figsize=(12, 5))
        
        methods = list(results.keys())
        train_scores = [results[method]['train_score'] for method in methods]
        test_scores = [results[method]['test_score'] for method in methods]
        feature_counts = [results[method]['n_features'] for method in methods]
        
        # 性能比较
        x = np.arange(len(methods))
        width = 0.35
        
        axes[0].bar(x - width/2, train_scores, width, label='训练准确率', alpha=0.8)
        axes[0].bar(x + width/2, test_scores, width, label='测试准确率', alpha=0.8)
        axes[0].set_xlabel('方法')
        axes[0].set_ylabel('准确率')
        axes[0].set_title('高基数处理方法性能比较')
        axes[0].set_xticks(x)
        axes[0].set_xticklabels(methods, rotation=45)
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)
        
        # 特征数量比较
        axes[1].bar(methods, feature_counts, color='lightgreen')
        axes[1].set_xlabel('方法')
        axes[1].set_ylabel('特征数量')
        axes[1].set_title('特征数量比较')
        axes[1].tick_params(axis='x', rotation=45)
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()

# 演示类别特征处理
print("=== 类别特征处理 ===")
categorical_processor = CategoricalFeatureProcessing()
categorical_data = categorical_processor.create_categorical_dataset()
encoding_methods = categorical_processor.encoding_methods_comparison(categorical_data)
high_cardinality_methods = categorical_processor.high_cardinality_encoding()

8.4 Feature Construction

8.4.1 Polynomial and Interaction Features

Combining existing features can yield new, more expressive ones.
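A minimal sketch of what `PolynomialFeatures` generates for two inputs at degree 2, before it is applied at scale below:

import numpy as np
from sklearn.preprocessing import PolynomialFeatures

X = np.array([[2.0, 3.0]])
poly = PolynomialFeatures(degree=2, include_bias=False)
print(poly.fit_transform(X))                    # [[2. 3. 4. 6. 9.]]
print(poly.get_feature_names_out(['x1', 'x2']))
# ['x1' 'x2' 'x1^2' 'x1 x2' 'x2^2']
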

class FeatureConstruction:
    def __init__(self):
        self.constructed_features = {}
        
    def polynomial_features_demo(self):
        """多项式特征演示"""
        print("=== 多项式特征构造 ===")
        
        # 创建非线性数据
        np.random.seed(42)
        n_samples = 500
        
        X1 = np.random.uniform(-2, 2, n_samples)
        X2 = np.random.uniform(-2, 2, n_samples)
        
        # 非线性目标函数
        y = (X1**2 + X2**2 + X1*X2 + 0.5*X1**3 + 
             np.random.normal(0, 0.1, n_samples))
        
        X_original = np.column_stack([X1, X2])
        
        # 不同阶数的多项式特征
        polynomial_degrees = [1, 2, 3, 4]
        results = {}
        
        for degree in polynomial_degrees:
            # 生成多项式特征
            poly = PolynomialFeatures(degree=degree, include_bias=False)
            X_poly = poly.fit_transform(X_original)
            
            # 获取特征名称
            feature_names = poly.get_feature_names_out(['X1', 'X2'])
            
            # 划分数据集
            X_train, X_test, y_train, y_test = train_test_split(
                X_poly, y, test_size=0.3, random_state=42
            )
            
            # 标准化
            scaler = StandardScaler()
            X_train_scaled = scaler.fit_transform(X_train)
            X_test_scaled = scaler.transform(X_test)
            
            # 训练模型
            model = LinearRegression()
            model.fit(X_train_scaled, y_train)
            
            # 评估性能
            train_score = model.score(X_train_scaled, y_train)
            test_score = model.score(X_test_scaled, y_test)
            
            results[f'{degree}阶'] = {
                'degree': degree,
                'n_features': X_poly.shape[1],
                'feature_names': feature_names,
                'train_score': train_score,
                'test_score': test_score,
                'X_poly': X_poly,
                'model': model,
                'scaler': scaler
            }
            
            print(f"{degree}阶多项式: 特征数={X_poly.shape[1]}, "
                  f"训练R²={train_score:.3f}, 测试R²={test_score:.3f}")
        
        # 可视化多项式特征效果
        self.visualize_polynomial_features(X_original, y, results)
        
        return results
        
    def visualize_polynomial_features(self, X_original, y, results):
        """可视化多项式特征效果"""
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        
        # 原始数据分布
        scatter = axes[0, 0].scatter(X_original[:, 0], X_original[:, 1], 
                                   c=y, cmap='viridis', alpha=0.6)
        axes[0, 0].set_xlabel('X1')
        axes[0, 0].set_ylabel('X2')
        axes[0, 0].set_title('原始数据分布')
        plt.colorbar(scatter, ax=axes[0, 0])
        
        # 性能比较
        degrees = [result['degree'] for result in results.values()]
        train_scores = [result['train_score'] for result in results.values()]
        test_scores = [result['test_score'] for result in results.values()]
        feature_counts = [result['n_features'] for result in results.values()]
        
        axes[0, 1].plot(degrees, train_scores, 'o-', label='训练R²', linewidth=2)
        axes[0, 1].plot(degrees, test_scores, 's-', label='测试R²', linewidth=2)
        axes[0, 1].set_xlabel('多项式阶数')
        axes[0, 1].set_ylabel('R²分数')
        axes[0, 1].set_title('不同阶数多项式性能')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        
        # 特征数量增长
        axes[0, 2].bar(degrees, feature_counts, color='lightcoral')
        axes[0, 2].set_xlabel('多项式阶数')
        axes[0, 2].set_ylabel('特征数量')
        axes[0, 2].set_title('特征数量增长')
        axes[0, 2].grid(True, alpha=0.3)
        
        # 预测效果可视化(选择最佳模型)
        best_result = max(results.values(), key=lambda x: x['test_score'])
        
        # 创建网格进行预测
        x1_range = np.linspace(X_original[:, 0].min(), X_original[:, 0].max(), 50)
        x2_range = np.linspace(X_original[:, 1].min(), X_original[:, 1].max(), 50)
        X1_grid, X2_grid = np.meshgrid(x1_range, x2_range)
        grid_points = np.column_stack([X1_grid.ravel(), X2_grid.ravel()])
        
        # 生成多项式特征并预测
        poly = PolynomialFeatures(degree=best_result['degree'], include_bias=False)
        grid_poly = poly.fit_transform(grid_points)
        grid_scaled = best_result['scaler'].transform(grid_poly)
        predictions = best_result['model'].predict(grid_scaled)
        
        # 绘制预测表面
        predictions_grid = predictions.reshape(X1_grid.shape)
        contour = axes[1, 0].contourf(X1_grid, X2_grid, predictions_grid, 
                                    levels=20, cmap='viridis', alpha=0.8)
        axes[1, 0].scatter(X_original[:, 0], X_original[:, 1], 
                         c=y, cmap='viridis', edgecolors='black', alpha=0.6)
        axes[1, 0].set_xlabel('X1')
        axes[1, 0].set_ylabel('X2')
        axes[1, 0].set_title(f'最佳模型预测 ({best_result["degree"]}阶)')
        plt.colorbar(contour, ax=axes[1, 0])
        
        # 特征重要性(系数大小)
        if best_result['degree'] <= 3:  # 只显示低阶的特征名称
            feature_names = best_result['feature_names']
            coefficients = np.abs(best_result['model'].coef_)
            
            # 选择最重要的特征
            top_indices = np.argsort(coefficients)[-10:]
            top_features = [feature_names[i] for i in top_indices]
            top_coeffs = coefficients[top_indices]
            
            axes[1, 1].barh(range(len(top_features)), top_coeffs)
            axes[1, 1].set_yticks(range(len(top_features)))
            axes[1, 1].set_yticklabels(top_features)
            axes[1, 1].set_xlabel('系数绝对值')
            axes[1, 1].set_title('最重要的多项式特征')
            axes[1, 1].grid(True, alpha=0.3)
        
        # 残差分析
        X_test_poly = poly.transform(X_original)
        X_test_scaled = best_result['scaler'].transform(X_test_poly)
        y_pred = best_result['model'].predict(X_test_scaled)
        residuals = y - y_pred
        
        axes[1, 2].scatter(y_pred, residuals, alpha=0.6)
        axes[1, 2].axhline(y=0, color='red', linestyle='--')
        axes[1, 2].set_xlabel('预测值')
        axes[1, 2].set_ylabel('残差')
        axes[1, 2].set_title('残差分析')
        axes[1, 2].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def interaction_features_demo(self):
        """交互特征演示"""
        print("\n=== 交互特征构造 ===")
        
        # 创建具有交互效应的数据
        np.random.seed(42)
        n_samples = 1000
        
        # 基础特征
        age = np.random.normal(35, 10, n_samples)
        income = np.random.lognormal(10, 0.5, n_samples)
        education = np.random.choice([0, 1, 2, 3, 4], n_samples)  # 教育水平
        experience = np.random.normal(10, 5, n_samples)  # 工作经验
        
        # 目标变量具有复杂的交互效应
        # Raw score with several interaction effects; thresholding at the
        # median keeps the two classes roughly balanced (a fixed cutoff of 5
        # would make almost every sample positive here)
        score = (0.1 * age + 
                 0.0001 * income + 
                 0.2 * education + 
                 0.15 * experience +
                 0.05 * age * education +         # age x education interaction
                 0.00001 * income * experience +  # income x experience interaction
                 0.02 * age * experience +        # age x experience interaction
                 np.random.normal(0, 1, n_samples))
        
        y = (score > np.median(score)).astype(int)
        
        # 原始特征
        X_original = np.column_stack([age, income, education, experience])
        feature_names = ['age', 'income', 'education', 'experience']
        
        # 构造交互特征
        interaction_features = []
        interaction_names = []
        
        # 两两交互
        for i in range(len(feature_names)):
            for j in range(i+1, len(feature_names)):
                interaction_features.append(X_original[:, i] * X_original[:, j])
                interaction_names.append(f'{feature_names[i]}*{feature_names[j]}')
        
        # 组合原始特征和交互特征
        X_with_interactions = np.column_stack([X_original] + interaction_features)
        all_feature_names = feature_names + interaction_names
        
        # 比较有无交互特征的模型性能
        datasets = {
            '仅原始特征': (X_original, feature_names),
            '包含交互特征': (X_with_interactions, all_feature_names)
        }
        
        results = {}
        
        for name, (X, names) in datasets.items():
            # 划分数据集
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.3, random_state=42
            )
            
            # 标准化
            scaler = StandardScaler()
            X_train_scaled = scaler.fit_transform(X_train)
            X_test_scaled = scaler.transform(X_test)
            
            # 训练模型
            model = LogisticRegression(random_state=42, max_iter=1000)
            model.fit(X_train_scaled, y_train)
            
            # 评估性能
            train_score = model.score(X_train_scaled, y_train)
            test_score = model.score(X_test_scaled, y_test)
            
            results[name] = {
                'train_score': train_score,
                'test_score': test_score,
                'n_features': X.shape[1],
                'feature_names': names,
                'model': model,
                'scaler': scaler
            }
            
            print(f"{name}: 特征数={X.shape[1]}, "
                  f"训练准确率={train_score:.3f}, 测试准确率={test_score:.3f}")
        
        # 可视化交互特征效果
        self.visualize_interaction_features(X_original, y, results, 
                                          feature_names, interaction_names)
        
        return results
        
    def visualize_interaction_features(self, X_original, y, results, 
                                     feature_names, interaction_names):
        """可视化交互特征效果"""
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        
        # 原始特征相关性
        corr_original = np.corrcoef(X_original.T)
        im1 = axes[0, 0].imshow(corr_original, cmap='coolwarm', vmin=-1, vmax=1)
        axes[0, 0].set_xticks(range(len(feature_names)))
        axes[0, 0].set_yticks(range(len(feature_names)))
        axes[0, 0].set_xticklabels(feature_names, rotation=45)
        axes[0, 0].set_yticklabels(feature_names)
        axes[0, 0].set_title('原始特征相关性')
        plt.colorbar(im1, ax=axes[0, 0])
        
        # 性能比较
        methods = list(results.keys())
        train_scores = [results[method]['train_score'] for method in methods]
        test_scores = [results[method]['test_score'] for method in methods]
        feature_counts = [results[method]['n_features'] for method in methods]
        
        x = np.arange(len(methods))
        width = 0.35
        
        axes[0, 1].bar(x - width/2, train_scores, width, label='训练准确率', alpha=0.8)
        axes[0, 1].bar(x + width/2, test_scores, width, label='测试准确率', alpha=0.8)
        axes[0, 1].set_xlabel('方法')
        axes[0, 1].set_ylabel('准确率')
        axes[0, 1].set_title('交互特征效果比较')
        axes[0, 1].set_xticks(x)
        axes[0, 1].set_xticklabels(methods)
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        
        # 特征数量比较
        axes[0, 2].bar(methods, feature_counts, color='lightgreen')
        axes[0, 2].set_xlabel('方法')
        axes[0, 2].set_ylabel('特征数量')
        axes[0, 2].set_title('特征数量比较')
        axes[0, 2].grid(True, alpha=0.3)
        
        # 特征重要性(系数)
        model_with_interactions = results['包含交互特征']['model']
        all_names = results['包含交互特征']['feature_names']
        coefficients = np.abs(model_with_interactions.coef_[0])
        
        # 选择最重要的特征
        top_indices = np.argsort(coefficients)[-10:]
        top_features = [all_names[i] for i in top_indices]
        top_coeffs = coefficients[top_indices]
        
        axes[1, 0].barh(range(len(top_features)), top_coeffs)
        axes[1, 0].set_yticks(range(len(top_features)))
        axes[1, 0].set_yticklabels(top_features)
        axes[1, 0].set_xlabel('系数绝对值')
        axes[1, 0].set_title('最重要的特征(包含交互)')
        axes[1, 0].grid(True, alpha=0.3)
        
        # 交互特征可视化(选择一个交互特征)
        # 年龄与教育的交互
        age_idx = feature_names.index('age')
        education_idx = feature_names.index('education')
        
        # 创建分组
        education_levels = np.unique(X_original[:, education_idx])
        colors = plt.cm.viridis(np.linspace(0, 1, len(education_levels)))
        
        for i, edu_level in enumerate(education_levels):
            mask = X_original[:, education_idx] == edu_level
            axes[1, 1].scatter(X_original[mask, age_idx], y[mask], 
                             c=[colors[i]], label=f'教育水平{int(edu_level)}', 
                             alpha=0.6)
        
        axes[1, 1].set_xlabel('年龄')
        axes[1, 1].set_ylabel('目标变量')
        axes[1, 1].set_title('年龄与教育水平的交互效应')
        axes[1, 1].legend()
        axes[1, 1].grid(True, alpha=0.3)
        
        # 交互特征分布
        age_education_interaction = X_original[:, age_idx] * X_original[:, education_idx]
        axes[1, 2].hist([age_education_interaction[y==0], age_education_interaction[y==1]], 
                       bins=30, alpha=0.7, label=['类别0', '类别1'])
        axes[1, 2].set_xlabel('年龄×教育水平')
        axes[1, 2].set_ylabel('频率')
        axes[1, 2].set_title('交互特征分布')
        axes[1, 2].legend()
        axes[1, 2].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def aggregation_features_demo(self):
        """聚合特征演示"""
        print("\n=== 聚合特征构造 ===")
        
        # 创建分组数据(模拟用户行为数据)
        np.random.seed(42)
        n_users = 1000
        n_transactions = 5000
        
        # 用户基本信息
        user_ids = np.arange(n_users)
        user_ages = np.random.normal(35, 10, n_users)
        user_cities = np.random.choice(['北京', '上海', '广州', '深圳', '杭州'], n_users)
        
        # 交易数据
        transaction_user_ids = np.random.choice(user_ids, n_transactions)
        transaction_amounts = np.random.lognormal(5, 1, n_transactions)
        transaction_categories = np.random.choice(['食品', '服装', '电子', '旅游', '教育'], 
                                                n_transactions)
        transaction_dates = pd.date_range('2023-01-01', periods=n_transactions, freq='h')
        
        # 创建交易数据框
        transactions = pd.DataFrame({
            'user_id': transaction_user_ids,
            'amount': transaction_amounts,
            'category': transaction_categories,
            'date': transaction_dates
        })
        
        # 用户数据框
        users = pd.DataFrame({
            'user_id': user_ids,
            'age': user_ages,
            'city': user_cities
        })
        
        print(f"用户数量: {len(users)}")
        print(f"交易数量: {len(transactions)}")
        
        # 构造聚合特征
        aggregation_features = {}
        
        # 1. 基础统计聚合
        basic_agg = transactions.groupby('user_id').agg({
            'amount': ['count', 'sum', 'mean', 'std', 'min', 'max'],
            'category': 'nunique'
        }).round(2)
        
        # 扁平化列名
        basic_agg.columns = ['_'.join(col).strip() for col in basic_agg.columns]
        basic_agg = basic_agg.reset_index()
        
        # 2. 时间相关聚合
        transactions['hour'] = transactions['date'].dt.hour
        transactions['day_of_week'] = transactions['date'].dt.dayofweek
        transactions['month'] = transactions['date'].dt.month
        
        time_agg = transactions.groupby('user_id').agg({
            'hour': ['mean', 'std'],
            'day_of_week': 'mean',
            'month': 'nunique'
        }).round(2)
        
        time_agg.columns = ['_'.join(col).strip() for col in time_agg.columns]
        time_agg = time_agg.reset_index()
        
        # 3. 类别相关聚合
        category_pivot = transactions.pivot_table(
            index='user_id', 
            columns='category', 
            values='amount', 
            aggfunc=['count', 'sum'], 
            fill_value=0
        )
        
        # 扁平化列名
        category_pivot.columns = ['_'.join(col).strip() for col in category_pivot.columns]
        category_pivot = category_pivot.reset_index()
        
        # 4. 比例特征
        ratio_features = transactions.groupby('user_id').apply(
            lambda x: pd.Series({
                'weekend_ratio': (x['day_of_week'] >= 5).mean(),
                'high_amount_ratio': (x['amount'] > x['amount'].quantile(0.8)).mean(),
                'recent_activity_ratio': (x['date'] > x['date'].quantile(0.7)).mean()
            })
        ).reset_index()
        
        # 合并所有聚合特征
        user_features = users.copy()
        for df in [basic_agg, time_agg, category_pivot, ratio_features]:
            user_features = user_features.merge(df, on='user_id', how='left')
        
        # 填充缺失值
        user_features = user_features.fillna(0)
        
        print(f"聚合后特征数量: {user_features.shape[1]}")
        print(f"聚合特征示例:")
        print(user_features.head())
        
        # 可视化聚合特征
        self.visualize_aggregation_features(transactions, user_features)
        
        return user_features, transactions
        
    def visualize_aggregation_features(self, transactions, user_features):
        """可视化聚合特征"""
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        
        # 1. 用户交易频率分布
        transaction_counts = transactions['user_id'].value_counts()
        axes[0, 0].hist(transaction_counts, bins=30, alpha=0.7, edgecolor='black')
        axes[0, 0].set_xlabel('交易次数')
        axes[0, 0].set_ylabel('用户数量')
        axes[0, 0].set_title('用户交易频率分布')
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. 用户平均交易金额分布
        axes[0, 1].hist(user_features['amount_mean'], bins=30, alpha=0.7, edgecolor='black')
        axes[0, 1].set_xlabel('平均交易金额')
        axes[0, 1].set_ylabel('用户数量')
        axes[0, 1].set_title('用户平均交易金额分布')
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. 交易类别多样性
        axes[0, 2].hist(user_features['category_nunique'], bins=range(1, 7), 
                       alpha=0.7, edgecolor='black')
        axes[0, 2].set_xlabel('交易类别数量')
        axes[0, 2].set_ylabel('用户数量')
        axes[0, 2].set_title('用户交易类别多样性')
        axes[0, 2].grid(True, alpha=0.3)
        
        # 4. 时间模式 - 平均交易时间
        axes[1, 0].hist(user_features['hour_mean'], bins=24, alpha=0.7, edgecolor='black')
        axes[1, 0].set_xlabel('平均交易时间(小时)')
        axes[1, 0].set_ylabel('用户数量')
        axes[1, 0].set_title('用户平均交易时间分布')
        axes[1, 0].grid(True, alpha=0.3)
        
        # 5. 周末交易比例
        axes[1, 1].hist(user_features['weekend_ratio'], bins=20, alpha=0.7, edgecolor='black')
        axes[1, 1].set_xlabel('周末交易比例')
        axes[1, 1].set_ylabel('用户数量')
        axes[1, 1].set_title('用户周末交易比例分布')
        axes[1, 1].grid(True, alpha=0.3)
        
        # 6. 特征相关性热图(选择部分数值特征)
        numeric_features = user_features.select_dtypes(include=[np.number]).columns[:10]
        corr_matrix = user_features[numeric_features].corr()
        
        im = axes[1, 2].imshow(corr_matrix, cmap='coolwarm', vmin=-1, vmax=1)
        axes[1, 2].set_xticks(range(len(numeric_features)))
        axes[1, 2].set_yticks(range(len(numeric_features)))
        axes[1, 2].set_xticklabels(numeric_features, rotation=45, ha='right')
        axes[1, 2].set_yticklabels(numeric_features)
        axes[1, 2].set_title('聚合特征相关性')
        plt.colorbar(im, ax=axes[1, 2])
        
        plt.tight_layout()
        plt.show()

# 演示特征构造
print("=== 特征构造 ===")
feature_constructor = FeatureConstruction()
polynomial_results = feature_constructor.polynomial_features_demo()
interaction_results = feature_constructor.interaction_features_demo()
user_features, transactions = feature_constructor.aggregation_features_demo()

8.5 Text Feature Extraction

8.5.1 Bag-of-Words and TF-IDF

Text data must be converted into numerical features before machine learning algorithms can process it.
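A minimal sketch of the two vectorizers on a three-sentence toy corpus; raw counts treat every term equally, while TF-IDF down-weights terms that appear in many documents:

from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer

corpus = [
    'the cat sat on the mat',
    'the dog sat on the log',
    'cats and dogs are pets',
]

bow = CountVectorizer()
print(bow.fit_transform(corpus).toarray())   # raw term counts
print(bow.get_feature_names_out())

tfidf = TfidfVectorizer()
print(tfidf.fit_transform(corpus).toarray().round(2))  # weighted, L2-normalized rows
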

class TextFeatureExtraction:
    def __init__(self):
        self.vectorizers = {}
        
    def create_text_dataset(self):
        """创建文本数据集"""
        print("=== 创建文本数据集 ===")
        
        # 使用20newsgroups数据集的子集
        categories = ['alt.atheism', 'soc.religion.christian', 
                     'comp.graphics', 'sci.med']
        
        # 加载数据
        newsgroups_train = fetch_20newsgroups(subset='train', 
                                            categories=categories,
                                            shuffle=True, 
                                            random_state=42,
                                            remove=('headers', 'footers', 'quotes'))
        
        # 创建简化的文本数据
        texts = newsgroups_train.data[:1000]  # 使用前1000个文档
        labels = newsgroups_train.target[:1000]
        target_names = newsgroups_train.target_names
        
        print(f"文档数量: {len(texts)}")
        print(f"类别数量: {len(target_names)}")
        print(f"类别名称: {target_names}")
        print(f"类别分布: {np.bincount(labels)}")
        
        # 显示示例文档
        print(f"\n示例文档 (类别: {target_names[labels[0]]}):")
        print(texts[0][:300] + "...")
        
        return texts, labels, target_names
        
    def bag_of_words_demo(self, texts, labels):
        """词袋模型演示"""
        print("\n=== 词袋模型 (Bag of Words) ===")
        
        # 不同的词袋模型配置
        vectorizers = {
            '基础词袋': CountVectorizer(max_features=1000, stop_words='english'),
            '二元词袋': CountVectorizer(max_features=1000, ngram_range=(1, 2), 
                                    stop_words='english'),
            '限制词频': CountVectorizer(max_features=1000, min_df=2, max_df=0.8, 
                                    stop_words='english'),
            '二进制词袋': CountVectorizer(max_features=1000, binary=True, 
                                     stop_words='english')
        }
        
        results = {}
        
        for name, vectorizer in vectorizers.items():
            print(f"\n{name}:")
            
            # 拟合和转换
            X = vectorizer.fit_transform(texts)
            
            # 划分数据集
            X_train, X_test, y_train, y_test = train_test_split(
                X, labels, test_size=0.3, random_state=42, stratify=labels
            )
            
            # 训练分类器
            classifier = LogisticRegression(random_state=42, max_iter=1000)
            classifier.fit(X_train, y_train)
            
            # 评估性能
            train_score = classifier.score(X_train, y_train)
            test_score = classifier.score(X_test, y_test)
            
            # 获取词汇表信息
            vocabulary_size = len(vectorizer.vocabulary_)
            feature_names = vectorizer.get_feature_names_out()
            
            results[name] = {
                'vectorizer': vectorizer,
                'X': X,
                'train_score': train_score,
                'test_score': test_score,
                'vocabulary_size': vocabulary_size,
                'feature_names': feature_names,
                'classifier': classifier
            }
            
            print(f"  词汇表大小: {vocabulary_size}")
            print(f"  特征矩阵形状: {X.shape}")
            print(f"  稀疏度: {1 - X.nnz / (X.shape[0] * X.shape[1]):.3f}")
            print(f"  训练准确率: {train_score:.3f}")
            print(f"  测试准确率: {test_score:.3f}")
        
        # 可视化词袋模型效果
        self.visualize_bow_results(results, texts, labels)
        
        return results
        
    def tfidf_demo(self, texts, labels):
        """TF-IDF演示"""
        print("\n=== TF-IDF特征提取 ===")
        
        # 不同的TF-IDF配置
        tfidf_vectorizers = {
            '基础TF-IDF': TfidfVectorizer(max_features=1000, stop_words='english'),
            'L1正则化': TfidfVectorizer(max_features=1000, norm='l1', 
                                     stop_words='english'),
            '子线性缩放': TfidfVectorizer(max_features=1000, sublinear_tf=True, 
                                      stop_words='english'),
            '二元组合': TfidfVectorizer(max_features=1000, ngram_range=(1, 2), 
                                    stop_words='english')
        }
        
        results = {}
        
        for name, vectorizer in tfidf_vectorizers.items():
            print(f"\n{name}:")
            
            # 拟合和转换
            X = vectorizer.fit_transform(texts)
            
            # 划分数据集
            X_train, X_test, y_train, y_test = train_test_split(
                X, labels, test_size=0.3, random_state=42, stratify=labels
            )
            
            # 训练分类器
            classifier = LogisticRegression(random_state=42, max_iter=1000)
            classifier.fit(X_train, y_train)
            
            # 评估性能
            train_score = classifier.score(X_train, y_train)
            test_score = classifier.score(X_test, y_test)
            
            results[name] = {
                'vectorizer': vectorizer,
                'X': X,
                'train_score': train_score,
                'test_score': test_score,
                'classifier': classifier
            }
            
            print(f"  特征矩阵形状: {X.shape}")
            print(f"  稀疏度: {1 - X.nnz / (X.shape[0] * X.shape[1]):.3f}")
            print(f"  训练准确率: {train_score:.3f}")
            print(f"  测试准确率: {test_score:.3f}")
        
        # 可视化TF-IDF效果
        self.visualize_tfidf_results(results, texts, labels)
        
        return results
        
    def visualize_bow_results(self, results, texts, labels):
        """可视化词袋模型结果"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        
        # 1. 性能比较
        methods = list(results.keys())
        train_scores = [results[method]['train_score'] for method in methods]
        test_scores = [results[method]['test_score'] for method in methods]
        
        x = np.arange(len(methods))
        width = 0.35
        
        axes[0, 0].bar(x - width/2, train_scores, width, label='训练准确率', alpha=0.8)
        axes[0, 0].bar(x + width/2, test_scores, width, label='测试准确率', alpha=0.8)
        axes[0, 0].set_xlabel('方法')
        axes[0, 0].set_ylabel('准确率')
        axes[0, 0].set_title('词袋模型性能比较')
        axes[0, 0].set_xticks(x)
        axes[0, 0].set_xticklabels(methods, rotation=45)
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. 词汇表大小比较
        vocab_sizes = [results[method]['vocabulary_size'] for method in methods]
        axes[0, 1].bar(methods, vocab_sizes, color='lightcoral')
        axes[0, 1].set_xlabel('方法')
        axes[0, 1].set_ylabel('词汇表大小')
        axes[0, 1].set_title('词汇表大小比较')
        axes[0, 1].tick_params(axis='x', rotation=45)
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. 特征重要性(选择基础词袋模型)
        base_result = results['基础词袋']
        feature_names = base_result['feature_names']
        classifier = base_result['classifier']
        
        # 获取每个类别的重要特征
        # fetch_20newsgroups sorts target_names alphabetically, which differs
        # from the order the categories were requested in
        class_names = ['alt.atheism', 'comp.graphics', 'sci.med', 'soc.religion.christian']
        
        for i, class_name in enumerate(class_names):
            if i >= 4:  # 只显示4个类别
                break
                
            # 获取该类别的系数
            coef = classifier.coef_[i]
            top_indices = np.argsort(coef)[-10:]  # 最重要的10个特征
            top_features = [feature_names[idx] for idx in top_indices]
            top_scores = coef[top_indices]
            
            if i < 2:
                ax = axes[1, i]
                ax.barh(range(len(top_features)), top_scores)
                ax.set_yticks(range(len(top_features)))
                ax.set_yticklabels(top_features)
                ax.set_xlabel('重要性分数')
                ax.set_title(f'{class_name}类别重要特征')
                ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def visualize_tfidf_results(self, results, texts, labels):
        """可视化TF-IDF结果"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        
        # 1. 性能比较
        methods = list(results.keys())
        train_scores = [results[method]['train_score'] for method in methods]
        test_scores = [results[method]['test_score'] for method in methods]
        
        x = np.arange(len(methods))
        width = 0.35
        
        axes[0, 0].bar(x - width/2, train_scores, width, label='训练准确率', alpha=0.8)
        axes[0, 0].bar(x + width/2, test_scores, width, label='测试准确率', alpha=0.8)
        axes[0, 0].set_xlabel('方法')
        axes[0, 0].set_ylabel('准确率')
        axes[0, 0].set_title('TF-IDF方法性能比较')
        axes[0, 0].set_xticks(x)
        axes[0, 0].set_xticklabels(methods, rotation=45)
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. 稀疏度比较
        sparsity = []
        for method in methods:
            X = results[method]['X']
            sparsity.append(1 - X.nnz / (X.shape[0] * X.shape[1]))
        
        axes[0, 1].bar(methods, sparsity, color='lightgreen')
        axes[0, 1].set_xlabel('方法')
        axes[0, 1].set_ylabel('稀疏度')
        axes[0, 1].set_title('特征矩阵稀疏度比较')
        axes[0, 1].tick_params(axis='x', rotation=45)
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. TF-IDF值分布
        base_tfidf = results['基础TF-IDF']['X']
        tfidf_values = base_tfidf.data  # 非零TF-IDF值
        
        axes[1, 0].hist(tfidf_values, bins=50, alpha=0.7, edgecolor='black')
        axes[1, 0].set_xlabel('TF-IDF值')
        axes[1, 0].set_ylabel('频率')
        axes[1, 0].set_title('TF-IDF值分布')
        axes[1, 0].grid(True, alpha=0.3)
        
        # 4. 文档长度与TF-IDF的关系
        n_docs = min(100, base_tfidf.shape[0])  # 取前n_docs个文档,保证两组数据长度一致
        doc_lengths = [len(text.split()) for text in texts[:n_docs]]
        avg_tfidf_scores = []
        
        for i in range(n_docs):
            doc_tfidf = base_tfidf[i].data
            avg_score = np.mean(doc_tfidf) if len(doc_tfidf) > 0 else 0
            avg_tfidf_scores.append(avg_score)
        
        axes[1, 1].scatter(doc_lengths, avg_tfidf_scores, alpha=0.6)
        axes[1, 1].set_xlabel('文档长度(词数)')
        axes[1, 1].set_ylabel('平均TF-IDF分数')
        axes[1, 1].set_title('文档长度与TF-IDF关系')
        axes[1, 1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def advanced_text_features(self, texts, labels):
        """高级文本特征"""
        print("\n=== 高级文本特征 ===")
        
        # 1. 哈希向量化
        print("1. 哈希向量化 (Hashing Vectorizer)")
        # HashingVectorizer不存储词汇表,特征维数固定,适合大规模或流式语料
        hash_vectorizer = HashingVectorizer(n_features=1000, stop_words='english')
        X_hash = hash_vectorizer.fit_transform(texts)
        
        # 2. 字符级n-gram
        print("2. 字符级n-gram")
        char_vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4), 
                                        max_features=1000)
        X_char = char_vectorizer.fit_transform(texts)
        
        # 3. 文本统计特征
        print("3. 文本统计特征")
        text_stats = []
        for text in texts:
            words = text.split()
            n_chars = max(len(text), 1)  # 防止空文本导致除零
            stats = {
                'length': len(text),
                'word_count': len(words),
                'avg_word_length': np.mean([len(w) for w in words]) if words else 0,
                'sentence_count': text.count('.') + text.count('!') + text.count('?'),
                'uppercase_ratio': sum(1 for c in text if c.isupper()) / n_chars,
                'digit_ratio': sum(1 for c in text if c.isdigit()) / n_chars
            }
            text_stats.append(list(stats.values()))
        
        X_stats = np.array(text_stats)
        
        # 评估不同特征的效果
        feature_sets = {
            '哈希向量': X_hash,
            '字符n-gram': X_char,
            '统计特征': X_stats
        }
        
        results = {}
        
        for name, X in feature_sets.items():
            # 划分数据集
            X_train, X_test, y_train, y_test = train_test_split(
                X, labels, test_size=0.3, random_state=42, stratify=labels
            )
            
            # 标准化(对统计特征)
            if name == '统计特征':
                scaler = StandardScaler()
                X_train = scaler.fit_transform(X_train)
                X_test = scaler.transform(X_test)
            
            # 训练分类器
            classifier = LogisticRegression(random_state=42, max_iter=1000)
            classifier.fit(X_train, y_train)
            
            # 评估性能
            train_score = classifier.score(X_train, y_train)
            test_score = classifier.score(X_test, y_test)
            
            results[name] = {
                'train_score': train_score,
                'test_score': test_score,
                'n_features': X.shape[1]
            }
            
            print(f"{name}: 特征数={X.shape[1]}, "
                  f"训练准确率={train_score:.3f}, 测试准确率={test_score:.3f}")
        
        # 可视化高级特征效果
        self.visualize_advanced_features(results, X_stats, texts)
        
        return results
        
    def visualize_advanced_features(self, results, X_stats, texts):
        """可视化高级文本特征"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        
        # 1. 性能比较
        methods = list(results.keys())
        train_scores = [results[method]['train_score'] for method in methods]
        test_scores = [results[method]['test_score'] for method in methods]
        feature_counts = [results[method]['n_features'] for method in methods]
        
        x = np.arange(len(methods))
        width = 0.35
        
        axes[0, 0].bar(x - width/2, train_scores, width, label='训练准确率', alpha=0.8)
        axes[0, 0].bar(x + width/2, test_scores, width, label='测试准确率', alpha=0.8)
        axes[0, 0].set_xlabel('方法')
        axes[0, 0].set_ylabel('准确率')
        axes[0, 0].set_title('高级文本特征性能比较')
        axes[0, 0].set_xticks(x)
        axes[0, 0].set_xticklabels(methods, rotation=45)
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. 特征数量比较
        axes[0, 1].bar(methods, feature_counts, color='lightcoral')
        axes[0, 1].set_xlabel('方法')
        axes[0, 1].set_ylabel('特征数量')
        axes[0, 1].set_title('特征数量比较')
        axes[0, 1].tick_params(axis='x', rotation=45)
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. 文本统计特征分布
        stat_names = ['文本长度', '词数', '平均词长', '句子数', '大写比例', '数字比例']
        
        for i in range(min(4, len(stat_names))):
            if i < 2:
                ax = axes[1, i]
                ax.hist(X_stats[:, i], bins=30, alpha=0.7, edgecolor='black')
                ax.set_xlabel(stat_names[i])
                ax.set_ylabel('频率')
                ax.set_title(f'{stat_names[i]}分布')
                ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()

# 演示文本特征提取
print("=== 文本特征提取 ===")
text_extractor = TextFeatureExtraction()
texts, labels, target_names = text_extractor.create_text_dataset()
bow_results = text_extractor.bag_of_words_demo(texts, labels)
tfidf_results = text_extractor.tfidf_demo(texts, labels)
advanced_results = text_extractor.advanced_text_features(texts, labels)
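
在实践中,上述几类文本特征往往组合使用:稀疏的TF-IDF矩阵可以与标准化后的稠密统计特征水平拼接。下面是一个最小示意(假设texts、labels来自上面的演示;拼接方式与参数仅为示例,并非唯一做法):

from scipy.sparse import hstack, csr_matrix

# 稀疏文本特征
tfidf = TfidfVectorizer(max_features=1000, stop_words='english')
X_text = tfidf.fit_transform(texts)

# 稠密统计特征(标准化后再拼接,避免量纲差异主导模型;
# 严格流程中缩放器应只在训练集上拟合,这里为简洁起见省略)
stats = np.array([[len(t), len(t.split())] for t in texts], dtype=float)
stats_scaled = StandardScaler().fit_transform(stats)

# 水平拼接,结果仍是稀疏矩阵
X_combined = hstack([X_text, csr_matrix(stats_scaled)]).tocsr()

X_train, X_test, y_train, y_test = train_test_split(
    X_combined, labels, test_size=0.3, random_state=42, stratify=labels)
clf = LogisticRegression(max_iter=1000, random_state=42).fit(X_train, y_train)
print(f"组合特征测试准确率: {clf.score(X_test, y_test):.3f}")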

8.6 时间序列特征工程

8.6.1 时间特征提取

时间序列数据包含丰富的时间模式信息,需要提取相关特征。
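
在进入完整实现之前,先用一个极简示例说明后文将要用到的循环编码思想:若直接用1到12表示月份,12月与1月在数值上相距最远,而它们在时间上其实相邻;把月份映射到单位圆上的(sin, cos)坐标后,这种邻接关系得以保留(示例可独立运行,数值仅作说明):

import numpy as np

def month_to_circle(month):
    """把月份映射到单位圆上的(sin, cos)坐标"""
    angle = 2 * np.pi * month / 12
    return np.array([np.sin(angle), np.cos(angle)])

dec, jan = month_to_circle(12), month_to_circle(1)
print(f"原始编码下12月与1月的距离: {abs(12 - 1)}")                     # 11
print(f"循环编码下12月与1月的距离: {np.linalg.norm(dec - jan):.3f}")  # 约0.518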

class TimeSeriesFeatureEngineering:
    def __init__(self):
        self.features = {}
        
    def create_time_series_data(self):
        """创建时间序列数据"""
        print("=== 创建时间序列数据 ===")
        
        # 创建日期范围
        date_range = pd.date_range(start='2020-01-01', end='2023-12-31', freq='D')
        
        # 创建模拟销售数据
        np.random.seed(42)
        n_days = len(date_range)
        
        # 基础趋势
        trend = np.linspace(100, 200, n_days)
        
        # 季节性模式(年度)
        yearly_seasonal = 20 * np.sin(2 * np.pi * np.arange(n_days) / 365.25)
        
        # 周期性模式(周)
        weekly_seasonal = 10 * np.sin(2 * np.pi * np.arange(n_days) / 7)
        
        # 随机噪声
        noise = np.random.normal(0, 15, n_days)
        
        # 特殊事件影响
        special_events = np.zeros(n_days)
        # 黑色星期五效应:感恩节(11月第四个星期四)的次日,日期每年不同
        for year in range(2020, 2024):
            thanksgiving = pd.Timestamp(f'{year}-11-22')  # 第四个星期四最早落在11月22日
            while thanksgiving.dayofweek != 3:  # 3表示星期四
                thanksgiving += pd.DateOffset(days=1)
            black_friday = thanksgiving + pd.DateOffset(days=1)
            if black_friday in date_range:
                idx = date_range.get_loc(black_friday)
                special_events[idx:idx+3] = 50  # 3天的促销效应
        
        # 组合所有成分
        sales = trend + yearly_seasonal + weekly_seasonal + noise + special_events
        sales = np.maximum(sales, 0)  # 确保销售额非负
        
        # 创建DataFrame
        df = pd.DataFrame({
            'date': date_range,
            'sales': sales,
            'trend': trend,
            'yearly_seasonal': yearly_seasonal,
            'weekly_seasonal': weekly_seasonal,
            'noise': noise,
            'special_events': special_events
        })
        
        print(f"数据形状: {df.shape}")
        print(f"日期范围: {df['date'].min()} 到 {df['date'].max()}")
        print(f"销售额统计:")
        print(df['sales'].describe())
        
        # 可视化原始数据
        self.visualize_time_series_data(df)
        
        return df
        
    def extract_time_features(self, df):
        """提取时间特征"""
        print("\n=== 提取时间特征 ===")
        
        # 复制数据
        df_features = df.copy()
        
        # 基础时间特征
        df_features['year'] = df_features['date'].dt.year
        df_features['month'] = df_features['date'].dt.month
        df_features['day'] = df_features['date'].dt.day
        df_features['dayofweek'] = df_features['date'].dt.dayofweek
        df_features['dayofyear'] = df_features['date'].dt.dayofyear
        df_features['quarter'] = df_features['date'].dt.quarter
        df_features['week'] = df_features['date'].dt.isocalendar().week
        
        # 周末标识
        df_features['is_weekend'] = (df_features['dayofweek'] >= 5).astype(int)
        
        # 月初月末标识
        df_features['is_month_start'] = df_features['date'].dt.is_month_start.astype(int)
        df_features['is_month_end'] = df_features['date'].dt.is_month_end.astype(int)
        
        # 季节标识
        df_features['is_spring'] = df_features['month'].isin([3, 4, 5]).astype(int)
        df_features['is_summer'] = df_features['month'].isin([6, 7, 8]).astype(int)
        df_features['is_autumn'] = df_features['month'].isin([9, 10, 11]).astype(int)
        df_features['is_winter'] = df_features['month'].isin([12, 1, 2]).astype(int)
        
        # 节假日标识(简化版)
        df_features['is_holiday'] = 0
        # 新年
        df_features.loc[df_features['date'].dt.strftime('%m-%d') == '01-01', 'is_holiday'] = 1
        # 圣诞节
        df_features.loc[df_features['date'].dt.strftime('%m-%d') == '12-25', 'is_holiday'] = 1
        # 感恩节(11月第四个星期四)
        for year in df_features['year'].unique():
            thanksgiving = pd.Timestamp(f'{year}-11-01') + pd.DateOffset(days=21)
            while thanksgiving.dayofweek != 3:  # 星期四
                thanksgiving += pd.DateOffset(days=1)
            df_features.loc[df_features['date'] == thanksgiving, 'is_holiday'] = 1
        
        # 循环编码(处理周期性)
        df_features['month_sin'] = np.sin(2 * np.pi * df_features['month'] / 12)
        df_features['month_cos'] = np.cos(2 * np.pi * df_features['month'] / 12)
        df_features['dayofweek_sin'] = np.sin(2 * np.pi * df_features['dayofweek'] / 7)
        df_features['dayofweek_cos'] = np.cos(2 * np.pi * df_features['dayofweek'] / 7)
        df_features['dayofyear_sin'] = np.sin(2 * np.pi * df_features['dayofyear'] / 365.25)
        df_features['dayofyear_cos'] = np.cos(2 * np.pi * df_features['dayofyear'] / 365.25)
        
        print("提取的时间特征:")
        time_features = [col for col in df_features.columns if col not in ['date', 'sales', 'trend', 'yearly_seasonal', 'weekly_seasonal', 'noise', 'special_events']]
        print(time_features)
        
        # 可视化时间特征
        self.visualize_time_features(df_features, time_features)
        
        return df_features, time_features
        
    def create_lag_features(self, df, target_col='sales', lags=[1, 7, 30]):
        """创建滞后特征"""
        print(f"\n=== 创建滞后特征 (lags: {lags}) ===")
        
        df_lag = df.copy()
        
        # 滞后特征
        for lag in lags:
            df_lag[f'{target_col}_lag_{lag}'] = df_lag[target_col].shift(lag)
        
        # 滚动窗口特征(先shift(1)再滚动,只使用历史值,避免把当天的目标值泄露进特征)
        windows = [7, 14, 30]
        for window in windows:
            rolled = df_lag[target_col].shift(1).rolling(window=window)
            df_lag[f'{target_col}_rolling_mean_{window}'] = rolled.mean()
            df_lag[f'{target_col}_rolling_std_{window}'] = rolled.std()
            df_lag[f'{target_col}_rolling_min_{window}'] = rolled.min()
            df_lag[f'{target_col}_rolling_max_{window}'] = rolled.max()
        
        # 指数加权移动平均(同样先shift(1))
        alphas = [0.1, 0.3, 0.5]
        for alpha in alphas:
            df_lag[f'{target_col}_ewm_{alpha}'] = df_lag[target_col].shift(1).ewm(alpha=alpha).mean()
        
        # 差分特征(注意:diff包含当天的值,用于预测当天时应再shift一位)
        df_lag[f'{target_col}_diff_1'] = df_lag[target_col].diff(1)
        df_lag[f'{target_col}_diff_7'] = df_lag[target_col].diff(7)
        
        # 变化率特征(同上,使用时注意泄露)
        df_lag[f'{target_col}_pct_change_1'] = df_lag[target_col].pct_change(1)
        df_lag[f'{target_col}_pct_change_7'] = df_lag[target_col].pct_change(7)
        
        # 移除包含NaN的行
        df_lag_clean = df_lag.dropna()
        
        lag_keywords = ('lag', 'rolling', 'ewm', 'diff', 'pct_change')
        lag_features = [col for col in df_lag.columns
                        if any(key in col for key in lag_keywords)]
        
        print(f"创建的滞后特征数量: {len(lag_features)}")
        print(f"清理后数据形状: {df_lag_clean.shape}")
        
        # 可视化滞后特征
        self.visualize_lag_features(df_lag_clean, target_col, lag_features)
        
        return df_lag_clean, lag_features
        
    def visualize_time_series_data(self, df):
        """可视化时间序列数据"""
        fig, axes = plt.subplots(3, 2, figsize=(15, 12))
        
        # 1. 原始销售数据
        axes[0, 0].plot(df['date'], df['sales'], linewidth=1)
        axes[0, 0].set_title('原始销售数据')
        axes[0, 0].set_xlabel('日期')
        axes[0, 0].set_ylabel('销售额')
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. 趋势分解
        axes[0, 1].plot(df['date'], df['trend'], label='趋势', linewidth=2)
        axes[0, 1].plot(df['date'], df['yearly_seasonal'], label='年度季节性', alpha=0.7)
        axes[0, 1].plot(df['date'], df['weekly_seasonal'], label='周季节性', alpha=0.7)
        axes[0, 1].set_title('时间序列分解')
        axes[0, 1].set_xlabel('日期')
        axes[0, 1].set_ylabel('值')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. 月度销售模式
        monthly_sales = df.groupby(df['date'].dt.month)['sales'].mean()
        axes[1, 0].bar(monthly_sales.index, monthly_sales.values)
        axes[1, 0].set_title('月度平均销售额')
        axes[1, 0].set_xlabel('月份')
        axes[1, 0].set_ylabel('平均销售额')
        axes[1, 0].grid(True, alpha=0.3)
        
        # 4. 周度销售模式
        weekly_sales = df.groupby(df['date'].dt.dayofweek)['sales'].mean()
        weekdays = ['周一', '周二', '周三', '周四', '周五', '周六', '周日']
        axes[1, 1].bar(range(7), weekly_sales.values)
        axes[1, 1].set_title('周度平均销售额')
        axes[1, 1].set_xlabel('星期')
        axes[1, 1].set_ylabel('平均销售额')
        axes[1, 1].set_xticks(range(7))
        axes[1, 1].set_xticklabels(weekdays, rotation=45)
        axes[1, 1].grid(True, alpha=0.3)
        
        # 5. 年度销售趋势
        yearly_sales = df.groupby(df['date'].dt.year)['sales'].mean()
        axes[2, 0].plot(yearly_sales.index, yearly_sales.values, marker='o', linewidth=2)
        axes[2, 0].set_title('年度平均销售额')
        axes[2, 0].set_xlabel('年份')
        axes[2, 0].set_ylabel('平均销售额')
        axes[2, 0].grid(True, alpha=0.3)
        
        # 6. 销售额分布
        axes[2, 1].hist(df['sales'], bins=50, alpha=0.7, edgecolor='black')
        axes[2, 1].set_title('销售额分布')
        axes[2, 1].set_xlabel('销售额')
        axes[2, 1].set_ylabel('频率')
        axes[2, 1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def visualize_time_features(self, df, time_features):
        """可视化时间特征"""
        fig, axes = plt.subplots(2, 3, figsize=(18, 10))
        
        # 1. 循环编码可视化
        axes[0, 0].scatter(df['month_sin'], df['month_cos'], c=df['month'], cmap='tab20', alpha=0.6)
        axes[0, 0].set_xlabel('Month Sin')
        axes[0, 0].set_ylabel('Month Cos')
        axes[0, 0].set_title('月份循环编码')
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. 周几循环编码
        axes[0, 1].scatter(df['dayofweek_sin'], df['dayofweek_cos'], c=df['dayofweek'], cmap='tab10', alpha=0.6)
        axes[0, 1].set_xlabel('Dayofweek Sin')
        axes[0, 1].set_ylabel('Dayofweek Cos')
        axes[0, 1].set_title('星期循环编码')
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. 季节性特征与销售额关系
        seasonal_features = ['is_spring', 'is_summer', 'is_autumn', 'is_winter']
        seasonal_sales = []
        for feature in seasonal_features:
            seasonal_sales.append(df[df[feature] == 1]['sales'].mean())
        
        axes[0, 2].bar(range(len(seasonal_features)), seasonal_sales)
        axes[0, 2].set_title('季节性销售额')
        axes[0, 2].set_xlabel('季节')
        axes[0, 2].set_ylabel('平均销售额')
        axes[0, 2].set_xticks(range(len(seasonal_features)))
        axes[0, 2].set_xticklabels(['春', '夏', '秋', '冬'])
        axes[0, 2].grid(True, alpha=0.3)
        
        # 4. 周末vs工作日
        weekend_sales = df[df['is_weekend'] == 1]['sales'].mean()
        weekday_sales = df[df['is_weekend'] == 0]['sales'].mean()
        
        axes[1, 0].bar(['工作日', '周末'], [weekday_sales, weekend_sales])
        axes[1, 0].set_title('工作日vs周末销售额')
        axes[1, 0].set_ylabel('平均销售额')
        axes[1, 0].grid(True, alpha=0.3)
        
        # 5. 节假日效应
        holiday_sales = df[df['is_holiday'] == 1]['sales'].mean()
        normal_sales = df[df['is_holiday'] == 0]['sales'].mean()
        
        axes[1, 1].bar(['普通日', '节假日'], [normal_sales, holiday_sales])
        axes[1, 1].set_title('节假日vs普通日销售额')
        axes[1, 1].set_ylabel('平均销售额')
        axes[1, 1].grid(True, alpha=0.3)
        
        # 6. 特征相关性热图
        feature_subset = ['month', 'dayofweek', 'is_weekend', 'is_holiday', 'quarter']
        corr_matrix = df[feature_subset + ['sales']].corr()
        
        im = axes[1, 2].imshow(corr_matrix, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
        axes[1, 2].set_xticks(range(len(corr_matrix.columns)))
        axes[1, 2].set_yticks(range(len(corr_matrix.columns)))
        axes[1, 2].set_xticklabels(corr_matrix.columns, rotation=45)
        axes[1, 2].set_yticklabels(corr_matrix.columns)
        axes[1, 2].set_title('特征相关性热图')
        
        # 添加相关系数文本
        for i in range(len(corr_matrix.columns)):
            for j in range(len(corr_matrix.columns)):
                text = axes[1, 2].text(j, i, f'{corr_matrix.iloc[i, j]:.2f}',
                                     ha="center", va="center", color="black", fontsize=8)
        
        plt.colorbar(im, ax=axes[1, 2])
        plt.tight_layout()
        plt.show()
        
    def visualize_lag_features(self, df, target_col, lag_features):
        """可视化滞后特征"""
        fig, axes = plt.subplots(2, 3, figsize=(18, 10))
        
        # 1. 滞后特征与目标变量的相关性
        lag_only_features = [f for f in lag_features if 'lag' in f and 'rolling' not in f][:5]
        correlations = []
        for feature in lag_only_features:
            corr = df[target_col].corr(df[feature])
            correlations.append(corr)
        
        axes[0, 0].bar(range(len(lag_only_features)), correlations)
        axes[0, 0].set_title('滞后特征相关性')
        axes[0, 0].set_xlabel('滞后特征')
        axes[0, 0].set_ylabel('相关系数')
        axes[0, 0].set_xticks(range(len(lag_only_features)))
        axes[0, 0].set_xticklabels([f.replace(f'{target_col}_', '') for f in lag_only_features], rotation=45)
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. 滚动窗口特征
        rolling_features = [f for f in lag_features if 'rolling_mean' in f]
        
        sample_data = df.tail(100)  # 最后100天
        axes[0, 1].plot(sample_data.index, sample_data[target_col], label='原始数据', linewidth=2)
        for feature in rolling_features:
            axes[0, 1].plot(sample_data.index, sample_data[feature], 
                          label=feature.replace(f'{target_col}_', ''), alpha=0.7)
        axes[0, 1].set_title('滚动平均特征')
        axes[0, 1].set_xlabel('时间')
        axes[0, 1].set_ylabel('值')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. 指数加权移动平均
        ewm_features = [f for f in lag_features if 'ewm' in f]
        
        axes[0, 2].plot(sample_data.index, sample_data[target_col], label='原始数据', linewidth=2)
        for feature in ewm_features:
            axes[0, 2].plot(sample_data.index, sample_data[feature], 
                          label=feature.replace(f'{target_col}_', ''), alpha=0.7)
        axes[0, 2].set_title('指数加权移动平均')
        axes[0, 2].set_xlabel('时间')
        axes[0, 2].set_ylabel('值')
        axes[0, 2].legend()
        axes[0, 2].grid(True, alpha=0.3)
        
        # 4. 差分特征
        diff_features = [f for f in lag_features if 'diff' in f]
        
        for feature in diff_features:
            axes[1, 0].plot(sample_data.index, sample_data[feature], 
                          label=feature.replace(f'{target_col}_', ''), alpha=0.7)
        axes[1, 0].set_title('差分特征')
        axes[1, 0].set_xlabel('时间')
        axes[1, 0].set_ylabel('差分值')
        axes[1, 0].legend()
        axes[1, 0].grid(True, alpha=0.3)
        
        # 5. 变化率特征
        pct_features = [f for f in lag_features if 'pct_change' in f]
        
        for feature in pct_features:
            axes[1, 1].plot(sample_data.index, sample_data[feature], 
                          label=feature.replace(f'{target_col}_', ''), alpha=0.7)
        axes[1, 1].set_title('变化率特征')
        axes[1, 1].set_xlabel('时间')
        axes[1, 1].set_ylabel('变化率')
        axes[1, 1].legend()
        axes[1, 1].grid(True, alpha=0.3)
        
        # 6. 特征重要性(使用随机森林,RandomForestRegressor已在章首导入)
        
        # 准备特征和目标
        feature_cols = lag_features[:10]  # 选择前10个特征
        X = df[feature_cols].fillna(0)
        y = df[target_col]
        
        # 训练随机森林
        rf = RandomForestRegressor(n_estimators=100, random_state=42)
        rf.fit(X, y)
        
        # 获取特征重要性
        importances = rf.feature_importances_
        feature_names = [f.replace(f'{target_col}_', '') for f in feature_cols]
        
        # 排序
        indices = np.argsort(importances)[::-1]
        
        axes[1, 2].bar(range(len(feature_cols)), importances[indices])
        axes[1, 2].set_title('滞后特征重要性')
        axes[1, 2].set_xlabel('特征')
        axes[1, 2].set_ylabel('重要性')
        axes[1, 2].set_xticks(range(len(feature_cols)))
        axes[1, 2].set_xticklabels([feature_names[i] for i in indices], rotation=45)
        axes[1, 2].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()

# 演示时间序列特征工程
print("=== 时间序列特征工程 ===")
ts_engineer = TimeSeriesFeatureEngineering()
ts_data = ts_engineer.create_time_series_data()
ts_features_data, time_features = ts_engineer.extract_time_features(ts_data)
ts_lag_data, lag_features = ts_engineer.create_lag_features(ts_features_data)
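
需要注意,含滞后特征的数据不能用随机划分来评估模型,否则训练集中会混入时间上更晚的样本,造成乐观偏差。下面用TimeSeriesSplit按时间顺序做交叉验证,给出一个最小示意(假设ts_lag_data与lag_features来自上面的演示代码):

from sklearn.model_selection import TimeSeriesSplit

X_ts = ts_lag_data[lag_features]
y_ts = ts_lag_data['sales']

# 每一折都只用过去的数据训练、用紧随其后的数据验证
tscv = TimeSeriesSplit(n_splits=5)
rf_ts = RandomForestRegressor(n_estimators=100, random_state=42)
scores = cross_val_score(rf_ts, X_ts, y_ts, cv=tscv, scoring='r2')
print(f"时间序列交叉验证R²: {scores.mean():.3f} (+/- {scores.std():.3f})")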

8.7 综合案例:房价预测特征工程

8.7.1 项目背景与目标

在这个综合案例中,我们将运用本章学到的所有特征工程技术来解决房价预测问题。
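
动手构造特征之前,先说明一种推荐的工程化组织方式:把各类预处理封装进Pipeline与ColumnTransformer,使所有变换只在训练数据上拟合,从源头避免数据泄露。下面是一个与本案例数据列对应的最小骨架(仅为示意,完整的特征构造见后文代码):

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

numeric_cols = ['area', 'bedrooms', 'bathrooms', 'age', 'garage_size']
categorical_cols = ['neighborhood', 'house_type', 'heating_type']

preprocess = ColumnTransformer([
    ('num', StandardScaler(), numeric_cols),
    ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_cols),
], remainder='passthrough')  # 布尔特征原样通过

pipe = Pipeline([
    ('preprocess', preprocess),
    ('model', RandomForestRegressor(n_estimators=100, random_state=42)),
])
# 用法:pipe.fit(X_train, y_train); pipe.score(X_test, y_test)
# 交叉验证时,每一折的缩放与编码都只在该折的训练部分上拟合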

class HousePricePredictionFeatureEngineering:
    def __init__(self):
        self.feature_transformers = {}
        self.feature_importance = {}
        
    def create_house_dataset(self):
        """创建房价数据集"""
        print("=== 创建房价数据集 ===")
        
        np.random.seed(42)
        n_samples = 5000
        
        # 基础特征
        data = {
            # 数值特征
            'area': np.random.normal(150, 50, n_samples),  # 面积
            'bedrooms': np.random.poisson(3, n_samples),   # 卧室数
            'bathrooms': np.random.poisson(2, n_samples),  # 浴室数
            'age': np.random.exponential(10, n_samples),   # 房龄
            'garage_size': np.random.poisson(1, n_samples), # 车库大小
            
            # 类别特征
            'neighborhood': np.random.choice(['Downtown', 'Suburb', 'Rural', 'Waterfront'], n_samples, 
                                           p=[0.3, 0.4, 0.2, 0.1]),
            'house_type': np.random.choice(['Apartment', 'House', 'Condo', 'Townhouse'], n_samples,
                                         p=[0.25, 0.35, 0.25, 0.15]),
            'heating_type': np.random.choice(['Gas', 'Electric', 'Oil', 'Solar'], n_samples,
                                           p=[0.5, 0.3, 0.15, 0.05]),
            
            # 布尔特征
            'has_pool': np.random.choice([0, 1], n_samples, p=[0.8, 0.2]),
            'has_garden': np.random.choice([0, 1], n_samples, p=[0.6, 0.4]),
            'has_parking': np.random.choice([0, 1], n_samples, p=[0.3, 0.7]),
            'recently_renovated': np.random.choice([0, 1], n_samples, p=[0.7, 0.3]),
        }
        
        # 创建DataFrame
        df = pd.DataFrame(data)
        
        # 确保数值特征的合理性
        df['area'] = np.clip(df['area'], 50, 500)
        df['bedrooms'] = np.clip(df['bedrooms'], 1, 6)
        df['bathrooms'] = np.clip(df['bathrooms'], 1, 4)
        df['age'] = np.clip(df['age'], 0, 100)
        df['garage_size'] = np.clip(df['garage_size'], 0, 3)
        
        # 生成目标变量(房价)
        # 基础价格
        base_price = 200000
        
        # 面积影响
        area_effect = df['area'] * 800
        
        # 卧室和浴室影响
        room_effect = df['bedrooms'] * 15000 + df['bathrooms'] * 10000
        
        # 房龄影响(负相关)
        age_effect = -df['age'] * 1000
        
        # 地区影响
        neighborhood_effect = df['neighborhood'].map({
            'Downtown': 100000, 'Waterfront': 150000, 'Suburb': 50000, 'Rural': 0
        })
        
        # 房型影响
        house_type_effect = df['house_type'].map({
            'House': 50000, 'Condo': 20000, 'Townhouse': 30000, 'Apartment': 0
        })
        
        # 设施影响
        amenity_effect = (df['has_pool'] * 25000 + 
                         df['has_garden'] * 15000 + 
                         df['has_parking'] * 10000 + 
                         df['recently_renovated'] * 20000)
        
        # 随机噪声
        noise = np.random.normal(0, 30000, n_samples)
        
        # 计算最终价格
        df['price'] = (base_price + area_effect + room_effect + age_effect + 
                      neighborhood_effect + house_type_effect + amenity_effect + noise)
        
        # 确保价格为正
        df['price'] = np.maximum(df['price'], 50000)
        
        print(f"数据集形状: {df.shape}")
        print(f"房价统计:")
        print(df['price'].describe())
        
        # 可视化数据集
        self.visualize_house_dataset(df)
        
        return df
        
    def comprehensive_feature_engineering(self, df):
        """综合特征工程"""
        print("\n=== 综合特征工程 ===")
        
        df_features = df.copy()
        
        # 1. 数值特征处理
        print("1. 数值特征处理")
        
        # 对数变换(处理偏态分布)
        df_features['area_log'] = np.log1p(df_features['area'])
        df_features['age_log'] = np.log1p(df_features['age'])
        
        # 标准化
        scaler = StandardScaler()
        numerical_features = ['area', 'bedrooms', 'bathrooms', 'age', 'garage_size']
        df_features[['area_scaled', 'bedrooms_scaled', 'bathrooms_scaled', 
                    'age_scaled', 'garage_size_scaled']] = scaler.fit_transform(df_features[numerical_features])
        
        # 分箱处理
        df_features['area_binned'] = pd.cut(df_features['area'], bins=5, labels=['很小', '小', '中', '大', '很大'])
        df_features['age_binned'] = pd.cut(df_features['age'], bins=[0, 5, 15, 30, 100], 
                                         labels=['新房', '较新', '中等', '老房'])
        
        # 2. 类别特征编码
        print("2. 类别特征编码")
        
        # One-hot编码
        categorical_features = ['neighborhood', 'house_type', 'heating_type']
        df_encoded = pd.get_dummies(df_features, columns=categorical_features, prefix=categorical_features)
        
        # 目标编码(针对高基数类别特征)
        # 注:sklearn.preprocessing.TargetEncoder需要scikit-learn >= 1.3,
        # 其内部使用交叉拟合来降低目标泄露风险
        from sklearn.preprocessing import TargetEncoder
        target_encoder = TargetEncoder()
        df_encoded['neighborhood_target_encoded'] = target_encoder.fit_transform(
            df_features[['neighborhood']], df_features['price']
        ).ravel()
        
        # 3. 特征构造
        print("3. 特征构造")
        
        # 多项式特征
        poly_features = PolynomialFeatures(degree=2, include_bias=False, interaction_only=True)
        area_bedrooms = poly_features.fit_transform(df_features[['area', 'bedrooms']])
        df_encoded['area_bedrooms_interaction'] = area_bedrooms[:, -1]  # 交互项
        
        # 比例特征
        df_encoded['bathroom_bedroom_ratio'] = df_features['bathrooms'] / (df_features['bedrooms'] + 1)
        df_encoded['area_per_bedroom'] = df_features['area'] / (df_features['bedrooms'] + 1)
        
        # 聚合特征
        df_encoded['total_rooms'] = df_features['bedrooms'] + df_features['bathrooms']
        df_encoded['luxury_score'] = (df_features['has_pool'] + df_features['has_garden'] + 
                                    df_features['recently_renovated'])
        
        # 4. 特征选择
        print("4. 特征选择")
        
        # 移除原始类别特征和一些冗余特征
        features_to_drop = ['neighborhood', 'house_type', 'heating_type', 'area_binned', 'age_binned']
        df_final = df_encoded.drop(columns=features_to_drop, errors='ignore')
        
        # 获取特征列表
        feature_columns = [col for col in df_final.columns if col != 'price']
        
        print(f"最终特征数量: {len(feature_columns)}")
        print(f"特征列表: {feature_columns[:10]}...")  # 显示前10个特征
        
        return df_final, feature_columns
        
    def feature_importance_analysis(self, df, feature_columns):
        """特征重要性分析"""
        print("\n=== 特征重要性分析 ===")
        
        X = df[feature_columns]
        y = df['price']
        
        # 划分数据集
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        
        # 1. 单变量特征选择
        print("1. 单变量特征选择")
        from sklearn.feature_selection import f_regression  # 回归任务用F检验(章首只导入了f_classif)
        selector = SelectKBest(score_func=f_regression, k=10)
        X_selected = selector.fit_transform(X_train, y_train)
        selected_features = X.columns[selector.get_support()]
        univariate_scores = selector.scores_
        
        print(f"选择的特征: {list(selected_features)}")
        
        # 2. 基于模型的特征选择
        print("2. 基于模型的特征选择")
        
        # 随机森林特征重要性
        rf = RandomForestRegressor(n_estimators=100, random_state=42)
        rf.fit(X_train, y_train)
        rf_importance = rf.feature_importances_
        
        # Lasso特征选择
        from sklearn.linear_model import LassoCV  # 章首未导入,这里补充
        lasso = LassoCV(cv=5, random_state=42)
        lasso.fit(X_train, y_train)
        lasso_coef = np.abs(lasso.coef_)
        
        # 3. 递归特征消除
        print("3. 递归特征消除")
        rfe = RFE(estimator=LinearRegression(), n_features_to_select=10)
        rfe.fit(X_train, y_train)
        rfe_selected = X.columns[rfe.support_]
        
        print(f"RFE选择的特征: {list(rfe_selected)}")
        
        # 4. 模型性能比较
        print("4. 模型性能比较")
        from sklearn.metrics import mean_absolute_error  # 章首只导入了mean_squared_error
        
        models = {
            '全部特征': (X_train, X_test),
            '单变量选择': (X_train[selected_features], X_test[selected_features]),
            'RFE选择': (X_train[rfe_selected], X_test[rfe_selected])
        }
        
        results = {}
        
        for name, (X_tr, X_te) in models.items():
            # 训练模型
            model = RandomForestRegressor(n_estimators=100, random_state=42)
            model.fit(X_tr, y_train)
            
            # 评估性能
            train_score = model.score(X_tr, y_train)
            test_score = model.score(X_te, y_test)
            
            # 预测
            y_pred = model.predict(X_te)
            mse = mean_squared_error(y_test, y_pred)
            mae = mean_absolute_error(y_test, y_pred)
            
            results[name] = {
                'train_r2': train_score,
                'test_r2': test_score,
                'mse': mse,
                'mae': mae,
                'n_features': X_tr.shape[1]
            }
            
            print(f"{name}: 特征数={X_tr.shape[1]}, 测试R²={test_score:.3f}, MAE={mae:.0f}")
        
        # 可视化特征重要性分析
        self.visualize_feature_importance(X, y, univariate_scores, rf_importance, lasso_coef, results)
        
        return results, selected_features, rfe_selected
        
    def visualize_house_dataset(self, df):
        """可视化房价数据集"""
        fig, axes = plt.subplots(3, 3, figsize=(18, 15))
        
        # 1. 房价分布
        axes[0, 0].hist(df['price'], bins=50, alpha=0.7, edgecolor='black')
        axes[0, 0].set_title('房价分布')
        axes[0, 0].set_xlabel('价格')
        axes[0, 0].set_ylabel('频率')
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. 面积vs价格
        axes[0, 1].scatter(df['area'], df['price'], alpha=0.5)
        axes[0, 1].set_title('面积 vs 价格')
        axes[0, 1].set_xlabel('面积')
        axes[0, 1].set_ylabel('价格')
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. 房龄vs价格
        axes[0, 2].scatter(df['age'], df['price'], alpha=0.5)
        axes[0, 2].set_title('房龄 vs 价格')
        axes[0, 2].set_xlabel('房龄')
        axes[0, 2].set_ylabel('价格')
        axes[0, 2].grid(True, alpha=0.3)
        
        # 4. 地区价格分布
        neighborhood_prices = df.groupby('neighborhood')['price'].mean().sort_values(ascending=False)
        axes[1, 0].bar(range(len(neighborhood_prices)), neighborhood_prices.values)
        axes[1, 0].set_title('不同地区平均房价')
        axes[1, 0].set_xlabel('地区')
        axes[1, 0].set_ylabel('平均价格')
        axes[1, 0].set_xticks(range(len(neighborhood_prices)))
        axes[1, 0].set_xticklabels(neighborhood_prices.index, rotation=45)
        axes[1, 0].grid(True, alpha=0.3)
        
        # 5. 房型价格分布
        house_type_prices = df.groupby('house_type')['price'].mean().sort_values(ascending=False)
        axes[1, 1].bar(range(len(house_type_prices)), house_type_prices.values)
        axes[1, 1].set_title('不同房型平均房价')
        axes[1, 1].set_xlabel('房型')
        axes[1, 1].set_ylabel('平均价格')
        axes[1, 1].set_xticks(range(len(house_type_prices)))
        axes[1, 1].set_xticklabels(house_type_prices.index, rotation=45)
        axes[1, 1].grid(True, alpha=0.3)
        
        # 6. 卧室数量vs价格
        bedroom_prices = df.groupby('bedrooms')['price'].mean()
        axes[1, 2].plot(bedroom_prices.index, bedroom_prices.values, marker='o', linewidth=2)
        axes[1, 2].set_title('卧室数量 vs 平均价格')
        axes[1, 2].set_xlabel('卧室数量')
        axes[1, 2].set_ylabel('平均价格')
        axes[1, 2].grid(True, alpha=0.3)
        
        # 7. 设施对价格的影响
        amenities = ['has_pool', 'has_garden', 'has_parking', 'recently_renovated']
        amenity_effects = []
        for amenity in amenities:
            with_amenity = df[df[amenity] == 1]['price'].mean()
            without_amenity = df[df[amenity] == 0]['price'].mean()
            amenity_effects.append(with_amenity - without_amenity)
        
        axes[2, 0].bar(range(len(amenities)), amenity_effects)
        axes[2, 0].set_title('设施对房价的影响')
        axes[2, 0].set_xlabel('设施')
        axes[2, 0].set_ylabel('价格差异')
        axes[2, 0].set_xticks(range(len(amenities)))
        axes[2, 0].set_xticklabels(['游泳池', '花园', '停车位', '最近装修'], rotation=45)
        axes[2, 0].grid(True, alpha=0.3)
        
        # 8. 相关性热图
        numerical_features = ['area', 'bedrooms', 'bathrooms', 'age', 'garage_size', 'price']
        corr_matrix = df[numerical_features].corr()
        
        im = axes[2, 1].imshow(corr_matrix, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
        axes[2, 1].set_xticks(range(len(corr_matrix.columns)))
        axes[2, 1].set_yticks(range(len(corr_matrix.columns)))
        axes[2, 1].set_xticklabels(corr_matrix.columns, rotation=45)
        axes[2, 1].set_yticklabels(corr_matrix.columns)
        axes[2, 1].set_title('数值特征相关性')
        
        # 添加相关系数文本
        for i in range(len(corr_matrix.columns)):
            for j in range(len(corr_matrix.columns)):
                text = axes[2, 1].text(j, i, f'{corr_matrix.iloc[i, j]:.2f}',
                                     ha="center", va="center", color="black", fontsize=8)
        
        # 9. 价格vs面积(按地区分色)
        neighborhoods = df['neighborhood'].unique()
        colors = plt.cm.Set1(np.linspace(0, 1, len(neighborhoods)))
        
        for i, neighborhood in enumerate(neighborhoods):
            subset = df[df['neighborhood'] == neighborhood]
            axes[2, 2].scatter(subset['area'], subset['price'], 
                             label=neighborhood, alpha=0.6, color=colors[i])
        
        axes[2, 2].set_title('面积 vs 价格(按地区分类)')
        axes[2, 2].set_xlabel('面积')
        axes[2, 2].set_ylabel('价格')
        axes[2, 2].legend()
        axes[2, 2].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
    def visualize_feature_importance(self, X, y, univariate_scores, rf_importance, lasso_coef, results):
        """可视化特征重要性分析"""
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        
        # 1. 单变量特征分数
        top_10_indices = np.argsort(univariate_scores)[-10:]
        top_10_features = X.columns[top_10_indices]
        top_10_scores = univariate_scores[top_10_indices]
        
        axes[0, 0].barh(range(len(top_10_features)), top_10_scores)
        axes[0, 0].set_yticks(range(len(top_10_features)))
        axes[0, 0].set_yticklabels(top_10_features)
        axes[0, 0].set_title('单变量特征重要性(Top 10)')
        axes[0, 0].set_xlabel('F分数')
        axes[0, 0].grid(True, alpha=0.3)
        
        # 2. 随机森林特征重要性
        rf_top_10_indices = np.argsort(rf_importance)[-10:]
        rf_top_10_features = X.columns[rf_top_10_indices]
        rf_top_10_scores = rf_importance[rf_top_10_indices]
        
        axes[0, 1].barh(range(len(rf_top_10_features)), rf_top_10_scores)
        axes[0, 1].set_yticks(range(len(rf_top_10_features)))
        axes[0, 1].set_yticklabels(rf_top_10_features)
        axes[0, 1].set_title('随机森林特征重要性(Top 10)')
        axes[0, 1].set_xlabel('重要性')
        axes[0, 1].grid(True, alpha=0.3)
        
        # 3. Lasso系数
        lasso_top_10_indices = np.argsort(lasso_coef)[-10:]
        lasso_top_10_features = X.columns[lasso_top_10_indices]
        lasso_top_10_scores = lasso_coef[lasso_top_10_indices]
        
        axes[0, 2].barh(range(len(lasso_top_10_features)), lasso_top_10_scores)
        axes[0, 2].set_yticks(range(len(lasso_top_10_features)))
        axes[0, 2].set_yticklabels(lasso_top_10_features)
        axes[0, 2].set_title('Lasso系数绝对值(Top 10)')
        axes[0, 2].set_xlabel('|系数|')
        axes[0, 2].grid(True, alpha=0.3)
        
        # 4. 模型性能比较
        methods = list(results.keys())
        test_r2_scores = [results[method]['test_r2'] for method in methods]
        mae_scores = [results[method]['mae'] for method in methods]
        n_features = [results[method]['n_features'] for method in methods]
        
        x = np.arange(len(methods))
        
        axes[1, 0].bar(x, test_r2_scores)
        axes[1, 0].set_title('不同特征选择方法的R²分数')
        axes[1, 0].set_xlabel('方法')
        axes[1, 0].set_ylabel('测试R²')
        axes[1, 0].set_xticks(x)
        axes[1, 0].set_xticklabels(methods, rotation=45)
        axes[1, 0].grid(True, alpha=0.3)
        
        # 5. MAE比较
        axes[1, 1].bar(x, mae_scores, color='orange')
        axes[1, 1].set_title('不同特征选择方法的MAE')
        axes[1, 1].set_xlabel('方法')
        axes[1, 1].set_ylabel('平均绝对误差')
        axes[1, 1].set_xticks(x)
        axes[1, 1].set_xticklabels(methods, rotation=45)
        axes[1, 1].grid(True, alpha=0.3)
        
        # 6. 特征数量vs性能
        axes[1, 2].scatter(n_features, test_r2_scores, s=100, alpha=0.7)
        for i, method in enumerate(methods):
            axes[1, 2].annotate(method, (n_features[i], test_r2_scores[i]), 
                              xytext=(5, 5), textcoords='offset points')
        axes[1, 2].set_title('特征数量 vs 模型性能')
        axes[1, 2].set_xlabel('特征数量')
        axes[1, 2].set_ylabel('测试R²')
        axes[1, 2].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()

# 演示综合案例
print("=== 房价预测特征工程综合案例 ===")
house_fe = HousePricePredictionFeatureEngineering()
house_data = house_fe.create_house_dataset()
house_features_data, feature_columns = house_fe.comprehensive_feature_engineering(house_data)
importance_results, selected_features, rfe_features = house_fe.feature_importance_analysis(house_features_data, feature_columns)

8.8 本章小结

8.8.1 特征工程核心概念回顾

本章我们深入学习了特征工程的各个方面:

  1. 特征工程概述

    • 特征工程的定义和重要性
    • 特征工程的工作流程
    • 特征工程的最佳实践
  2. 数值特征处理

    • 特征缩放与标准化
    • 分布变换技术
    • 离散化技术
  3. 类别特征处理

    • 类别特征编码方法
    • 高基数类别特征处理
  4. 特征构造

    • 多项式特征
    • 交互特征
    • 聚合特征
  5. 文本特征提取

    • 词袋模型和TF-IDF
    • 高级文本特征
  6. 时间序列特征工程

    • 时间特征提取
    • 滞后特征和滚动窗口特征
  7. 综合案例

    • 房价预测特征工程实战

8.8.2 特征工程最佳实践

  1. 数据理解优先

    • 深入理解业务背景
    • 分析数据分布和特征关系
    • 识别数据质量问题
  2. 特征工程策略

    • 从简单到复杂
    • 保持特征的可解释性
    • 避免数据泄露
  3. 特征验证

    • 使用交叉验证评估特征效果(示例见本小节列表之后)
    • 监控特征重要性
    • 定期更新特征工程流程
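
上面提到的"使用交叉验证评估特征效果",可以落实为一个简单的对比流程:分别在含与不含候选特征的数据上做交叉验证,比较平均分数(下例数据为随机构造,仅作演示):

import numpy as np
from sklearn.model_selection import cross_val_score
from sklearn.linear_model import LinearRegression

rng = np.random.RandomState(42)
X_base = rng.normal(size=(200, 3))
candidate = (X_base[:, 0] * X_base[:, 1]).reshape(-1, 1)  # 候选交互特征
y = X_base[:, 0] * X_base[:, 1] + rng.normal(0, 0.1, 200)

score_without = cross_val_score(LinearRegression(), X_base, y, cv=5).mean()
score_with = cross_val_score(
    LinearRegression(), np.hstack([X_base, candidate]), y, cv=5).mean()
print(f"不含候选特征的R²: {score_without:.3f}")
print(f"含候选特征的R²: {score_with:.3f}")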

8.8.3 常见陷阱与注意事项

  1. 数据泄露

    • 避免使用未来信息(对比示例见本小节列表之后)
    • 正确处理时间序列数据
    • 谨慎使用目标编码
  2. 过度工程

    • 避免创建过多无用特征
    • 平衡模型复杂度和性能
    • 考虑计算成本
  3. 特征选择

    • 避免盲目删除特征
    • 考虑特征间的交互作用
    • 使用多种特征选择方法
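
针对上面"数据泄露"一条,下面给出一个最小对比示意:同样是标准化,先拟合后划分与先划分后拟合,泄露与否完全不同(数据随机生成,仅作演示):

import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

X = np.random.normal(0, 1, (100, 3))

# 错误做法:先在全量数据上拟合缩放器再划分,测试集的统计量泄露进了训练过程
X_leaky = StandardScaler().fit_transform(X)
X_train_bad, X_test_bad = train_test_split(X_leaky, random_state=42)

# 正确做法:先划分,缩放器只在训练集上拟合,再分别变换两部分
X_train, X_test = train_test_split(X, random_state=42)
scaler = StandardScaler().fit(X_train)
X_train_ok = scaler.transform(X_train)
X_test_ok = scaler.transform(X_test)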

8.8.4 进阶学习方向

  1. 自动化特征工程

    • 学习AutoML工具
    • 研究特征生成算法
    • 探索深度特征学习
  2. 领域特定特征工程

    • 图像特征工程
    • 音频特征工程
    • 网络数据特征工程
  3. 大规模特征工程

    • 分布式特征计算
    • 特征存储和管理
    • 实时特征工程

8.8.5 练习题

理论题

  1. 解释为什么特征工程在机器学习中如此重要?
  2. 比较不同的特征缩放方法,说明它们的适用场景。
  3. 什么是数据泄露?如何在特征工程中避免数据泄露?

实践题

  1. 使用本章学到的技术,对一个新的数据集进行完整的特征工程。
  2. 实现一个自动化的特征选择流程,比较不同方法的效果。
  3. 针对时间序列数据,设计并实现一套完整的特征工程方案。

项目题

  1. 选择一个实际的机器学习问题,从数据收集到特征工程,完成整个流程。
  2. 构建一个特征工程工具包,包含常用的特征处理函数。
  3. 研究某个特定领域的特征工程技术,并实现相关算法。

第8章完结

特征工程是机器学习成功的关键因素之一。通过本章的学习,你已经掌握了从基础的数值和类别特征处理,到高级的文本和时间序列特征工程的完整技能体系。记住,好的特征工程往往比复杂的算法更能提升模型性能。在实际项目中,要根据具体的业务场景和数据特点,灵活运用这些技术,持续优化特征工程流程。