特征工程是机器学习项目中最重要的环节之一,往往决定了模型的上限。好的特征工程能够显著提升模型性能,而糟糕的特征可能让最先进的算法也无法发挥作用。本章将深入探讨特征工程的各个方面,从基础的特征处理到高级的特征构造技术。
8.1 特征工程概述
8.1.1 什么是特征工程
特征工程是利用领域知识和数据科学技术,从原始数据中提取、构造和选择特征的过程。它包括以下四类操作(列表后附一个极简代码示例):
- 特征提取:从原始数据中提取有用信息
- 特征构造:基于现有特征创建新特征
- 特征选择:选择最相关的特征子集
- 特征变换:改变特征的分布或尺度
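下面用一个极简示意(数据为虚构的两行记录,仅作演示)把上述四类操作串联起来:

import numpy as np
import pandas as pd

df = pd.DataFrame({
    'timestamp': pd.to_datetime(['2024-01-01 08:00', '2024-01-06 21:30']),
    'price': [120.0, 95.0],
})
df['hour'] = df['timestamp'].dt.hour                                 # 特征提取: 从时间戳取小时
df['is_weekend'] = (df['timestamp'].dt.dayofweek >= 5).astype(int)   # 特征构造: 衍生周末标识
df['log_price'] = np.log1p(df['price'])                              # 特征变换: 对数缩放
X = df[['hour', 'is_weekend', 'log_price']]                          # 特征选择: 保留相关子集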
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_classification, fetch_20newsgroups
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, MaxAbsScaler
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, OrdinalEncoder
from sklearn.preprocessing import PolynomialFeatures, PowerTransformer
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from sklearn.feature_selection import RFE, SelectFromModel
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.metrics import accuracy_score, mean_squared_error
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
class FeatureEngineeringIntro:
def __init__(self):
self.data = None
def feature_engineering_importance(self):
"""演示特征工程的重要性"""
print("=== 特征工程重要性演示 ===")
# 创建一个简单的数据集
np.random.seed(42)
n_samples = 1000
# 原始特征
x1 = np.random.normal(0, 1, n_samples)
x2 = np.random.normal(0, 1, n_samples)
# 目标变量与特征的非线性关系
y = (x1**2 + x2**2 + x1*x2 + np.random.normal(0, 0.1, n_samples)) > 1
y = y.astype(int)
# 原始特征
X_original = np.column_stack([x1, x2])
# 工程特征
X_engineered = np.column_stack([
x1, x2, # 原始特征
x1**2, x2**2, # 平方特征
x1*x2, # 交互特征
np.abs(x1), np.abs(x2), # 绝对值特征
x1**3, x2**3 # 立方特征
])
# 比较模型性能
models = {
'原始特征': X_original,
'工程特征': X_engineered
}
results = {}
for name, X in models.items():
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 训练模型
model = LogisticRegression(random_state=42)
model.fit(X_train, y_train)
# 评估性能
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)
results[name] = {
'train_score': train_score,
'test_score': test_score,
'n_features': X.shape[1]
}
print(f"{name}:")
print(f" 特征数量: {X.shape[1]}")
print(f" 训练准确率: {train_score:.3f}")
print(f" 测试准确率: {test_score:.3f}")
print()
# 可视化结果
self.visualize_feature_importance_demo(X_original, X_engineered, y, results)
return results
def visualize_feature_importance_demo(self, X_original, X_engineered, y, results):
"""可视化特征工程重要性"""
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# 1. 原始数据分布
scatter = axes[0, 0].scatter(X_original[:, 0], X_original[:, 1],
c=y, cmap='viridis', alpha=0.6)
axes[0, 0].set_xlabel('特征1')
axes[0, 0].set_ylabel('特征2')
axes[0, 0].set_title('原始特征分布')
plt.colorbar(scatter, ax=axes[0, 0])
# 2. 工程特征相关性
feature_names = ['x1', 'x2', 'x1²', 'x2²', 'x1*x2', '|x1|', '|x2|', 'x1³', 'x2³']
corr_matrix = np.corrcoef(X_engineered.T)
im = axes[0, 1].imshow(corr_matrix, cmap='coolwarm', vmin=-1, vmax=1)
axes[0, 1].set_xticks(range(len(feature_names)))
axes[0, 1].set_yticks(range(len(feature_names)))
axes[0, 1].set_xticklabels(feature_names, rotation=45)
axes[0, 1].set_yticklabels(feature_names)
axes[0, 1].set_title('工程特征相关性矩阵')
plt.colorbar(im, ax=axes[0, 1])
# 3. 性能比较
methods = list(results.keys())
train_scores = [results[method]['train_score'] for method in methods]
test_scores = [results[method]['test_score'] for method in methods]
x = np.arange(len(methods))
width = 0.35
axes[1, 0].bar(x - width/2, train_scores, width, label='训练准确率', alpha=0.8)
axes[1, 0].bar(x + width/2, test_scores, width, label='测试准确率', alpha=0.8)
axes[1, 0].set_xlabel('方法')
axes[1, 0].set_ylabel('准确率')
axes[1, 0].set_title('性能比较')
axes[1, 0].set_xticks(x)
axes[1, 0].set_xticklabels(methods)
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 4. 特征数量对比
n_features = [results[method]['n_features'] for method in methods]
axes[1, 1].bar(methods, n_features, color=['skyblue', 'lightcoral'])
axes[1, 1].set_xlabel('方法')
axes[1, 1].set_ylabel('特征数量')
axes[1, 1].set_title('特征数量对比')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def feature_engineering_workflow(self):
"""特征工程工作流程"""
print("\n=== 特征工程工作流程 ===")
workflow_steps = {
"1. 数据理解": [
"探索数据结构和类型",
"识别缺失值和异常值",
"理解业务背景和目标",
"分析特征与目标的关系"
],
"2. 数据清洗": [
"处理缺失值",
"检测和处理异常值",
"数据类型转换",
"重复数据处理"
],
"3. 特征变换": [
"数值特征标准化/归一化",
"类别特征编码",
"分布变换(对数、Box-Cox等)",
"离散化处理"
],
"4. 特征构造": [
"多项式特征",
"交互特征",
"聚合特征",
"时间特征提取"
],
"5. 特征选择": [
"过滤法(统计检验)",
"包装法(递归特征消除)",
"嵌入法(基于模型)",
"降维技术"
],
"6. 特征验证": [
"交叉验证评估",
"特征重要性分析",
"模型性能对比",
"业务价值评估"
]
}
for step, details in workflow_steps.items():
print(f"\n{step}:")
for detail in details:
print(f" • {detail}")
print("\n=== 特征工程最佳实践 ===")
best_practices = [
"始终基于业务理解进行特征工程",
"保持训练集和测试集的一致性",
"避免数据泄露(不使用未来信息)",
"记录所有特征工程步骤",
"使用交叉验证评估特征效果",
"平衡特征数量和模型复杂度",
"考虑特征的可解释性",
"定期重新评估特征的有效性"
]
for i, practice in enumerate(best_practices, 1):
print(f" {i}. {practice}")
# 演示特征工程概述
print("=== 特征工程概述 ===")
intro = FeatureEngineeringIntro()
importance_results = intro.feature_engineering_importance()
intro.feature_engineering_workflow()
8.2 数值特征处理
8.2.1 特征缩放与标准化
不同的特征往往具有不同的量纲和数值范围,需要进行适当的缩放处理。
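在进入完整示例之前,先看一个最小的缩放示意(X 为虚构的小数组,仅作演示)。三种常用缩放器分别对应 z = (x-均值)/标准差、(x-min)/(max-min) 和 (x-中位数)/IQR:

import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler

X = np.array([[1.0, 100.0], [2.0, 200.0], [3.0, 10000.0]])  # 第二列含一个极端值
print(StandardScaler().fit_transform(X))  # 每列变为均值0、方差1
print(MinMaxScaler().fit_transform(X))    # 每列压缩到[0, 1]
print(RobustScaler().fit_transform(X))    # 基于中位数与IQR, 受极端值影响更小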
class NumericalFeatureProcessing:
def __init__(self):
self.scalers = {}
def scaling_methods_comparison(self):
"""比较不同的特征缩放方法"""
print("=== 特征缩放方法比较 ===")
# 创建具有不同分布的数据
np.random.seed(42)
n_samples = 1000
# 不同分布的特征
normal_feature = np.random.normal(50, 15, n_samples)
uniform_feature = np.random.uniform(0, 100, n_samples)
exponential_feature = np.random.exponential(2, n_samples)
outlier_feature = np.concatenate([
np.random.normal(10, 2, int(0.95 * n_samples)),
np.random.normal(50, 5, int(0.05 * n_samples))
])
# 组合数据
X = np.column_stack([normal_feature, uniform_feature,
exponential_feature, outlier_feature])
feature_names = ['正态分布', '均匀分布', '指数分布', '含异常值']
# 不同的缩放方法
scalers = {
'原始数据': None,
'标准化 (StandardScaler)': StandardScaler(),
'最小-最大缩放 (MinMaxScaler)': MinMaxScaler(),
'鲁棒缩放 (RobustScaler)': RobustScaler(),
'最大绝对值缩放 (MaxAbsScaler)': MaxAbsScaler()
}
# 应用不同的缩放方法
scaled_data = {}
for name, scaler in scalers.items():
if scaler is None:
scaled_data[name] = X
else:
scaled_data[name] = scaler.fit_transform(X)
# 可视化结果
self.visualize_scaling_methods(scaled_data, feature_names)
# 统计信息比较
self.compare_scaling_statistics(scaled_data, feature_names)
return scaled_data
def visualize_scaling_methods(self, scaled_data, feature_names):
"""可视化不同缩放方法的效果"""
n_methods = len(scaled_data)
n_features = len(feature_names)
fig, axes = plt.subplots(n_methods, n_features, figsize=(20, 15))
for i, (method_name, data) in enumerate(scaled_data.items()):
for j, feature_name in enumerate(feature_names):
ax = axes[i, j] if n_methods > 1 else axes[j]
# 绘制直方图
ax.hist(data[:, j], bins=50, alpha=0.7, density=True)
ax.set_title(f'{method_name}\n{feature_name}')
ax.grid(True, alpha=0.3)
# 添加统计信息
mean_val = np.mean(data[:, j])
std_val = np.std(data[:, j])
ax.axvline(mean_val, color='red', linestyle='--',
label=f'均值: {mean_val:.2f}')
ax.axvline(mean_val + std_val, color='orange', linestyle='--',
label=f'±1σ: {std_val:.2f}')
ax.axvline(mean_val - std_val, color='orange', linestyle='--')
if j == 0: # 只在第一列显示图例
ax.legend(fontsize=8)
plt.tight_layout()
plt.show()
def compare_scaling_statistics(self, scaled_data, feature_names):
"""比较缩放后的统计信息"""
print("\n=== 缩放方法统计信息比较 ===")
for method_name, data in scaled_data.items():
print(f"\n{method_name}:")
for j, feature_name in enumerate(feature_names):
feature_data = data[:, j]
print(f" {feature_name:10}: "
f"均值={np.mean(feature_data):6.2f}, "
f"标准差={np.std(feature_data):6.2f}, "
f"最小值={np.min(feature_data):6.2f}, "
f"最大值={np.max(feature_data):6.2f}")
def distribution_transformation(self):
"""分布变换技术"""
print("\n=== 分布变换技术 ===")
# 创建偏态分布数据
np.random.seed(42)
n_samples = 1000
# 右偏分布
right_skewed = np.random.exponential(2, n_samples)
# 左偏分布
left_skewed = 10 - np.random.exponential(2, n_samples)
# 双峰分布
bimodal = np.concatenate([
np.random.normal(2, 0.5, n_samples//2),
np.random.normal(8, 0.5, n_samples//2)
])
X = np.column_stack([right_skewed, left_skewed, bimodal])
feature_names = ['右偏分布', '左偏分布', '双峰分布']
# 不同的变换方法
transformers = {
'原始数据': None,
'对数变换': lambda x: np.log1p(x - x.min()),
'平方根变换': lambda x: np.sqrt(x - x.min()),
# Box-Cox要求输入严格为正, 先平移再逐列变换
'Box-Cox变换': lambda x: PowerTransformer(method='box-cox').fit_transform(
    (x - x.min() + 1).reshape(-1, 1)).ravel(),
'Yeo-Johnson变换': PowerTransformer(method='yeo-johnson')
}
# 应用变换
transformed_data = {}
for name, transformer in transformers.items():
if transformer is None:
transformed_data[name] = X
elif callable(transformer):
# 自定义函数变换
transformed_data[name] = np.column_stack([
transformer(X[:, i]) for i in range(X.shape[1])
])
else:
# sklearn变换器
transformed_data[name] = transformer.fit_transform(X)
# 可视化变换效果
self.visualize_distribution_transformation(transformed_data, feature_names)
return transformed_data
def visualize_distribution_transformation(self, transformed_data, feature_names):
"""可视化分布变换效果"""
n_methods = len(transformed_data)
n_features = len(feature_names)
fig, axes = plt.subplots(n_features, n_methods, figsize=(20, 12))
for i, feature_name in enumerate(feature_names):
for j, (method_name, data) in enumerate(transformed_data.items()):
ax = axes[i, j]
# 绘制直方图和密度曲线
feature_data = data[:, i]
ax.hist(feature_data, bins=50, alpha=0.7, density=True,
color='skyblue', edgecolor='black')
# 添加统计信息
from scipy import stats
skewness = stats.skew(feature_data)
kurtosis = stats.kurtosis(feature_data)
ax.set_title(f'{feature_name}\n{method_name}\n'
f'偏度: {skewness:.2f}, 峰度: {kurtosis:.2f}')
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def discretization_techniques(self):
"""离散化技术"""
print("\n=== 离散化技术 ===")
# 创建连续数据
np.random.seed(42)
n_samples = 1000
continuous_feature = np.random.normal(50, 15, n_samples)
# 不同的离散化方法
from sklearn.preprocessing import KBinsDiscretizer
discretizers = {
'等宽离散化': KBinsDiscretizer(n_bins=5, encode='ordinal', strategy='uniform'),
'等频离散化': KBinsDiscretizer(n_bins=5, encode='ordinal', strategy='quantile'),
'K-means离散化': KBinsDiscretizer(n_bins=5, encode='ordinal', strategy='kmeans')
}
# 应用离散化
discretized_results = {}
for name, discretizer in discretizers.items():
discretized = discretizer.fit_transform(continuous_feature.reshape(-1, 1))
discretized_results[name] = {
'discretized': discretized.flatten(),
'bin_edges': discretizer.bin_edges_[0]
}
# 可视化离散化效果
self.visualize_discretization(continuous_feature, discretized_results)
return discretized_results
def visualize_discretization(self, original_data, discretized_results):
"""可视化离散化效果"""
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 原始数据分布
axes[0, 0].hist(original_data, bins=50, alpha=0.7, density=True)
axes[0, 0].set_title('原始连续数据分布')
axes[0, 0].set_xlabel('数值')
axes[0, 0].set_ylabel('密度')
axes[0, 0].grid(True, alpha=0.3)
# 不同离散化方法的结果
for i, (name, result) in enumerate(discretized_results.items()):
row = (i + 1) // 2
col = (i + 1) % 2
ax = axes[row, col]
# 绘制离散化结果
discretized = result['discretized']
bin_edges = result['bin_edges']
# 直方图
ax.hist(original_data, bins=bin_edges, alpha=0.5, density=True,
label='原始数据')
# 离散化边界
for edge in bin_edges:
ax.axvline(edge, color='red', linestyle='--', alpha=0.7)
# 离散化后的分布
unique_bins, counts = np.unique(discretized, return_counts=True)
bin_centers = [(bin_edges[int(b)] + bin_edges[int(b)+1]) / 2
for b in unique_bins]
ax.bar(bin_centers, counts/len(discretized),
width=(bin_edges[1]-bin_edges[0])*0.8,
alpha=0.7, color='orange', label='离散化结果')
ax.set_title(f'{name}')
ax.set_xlabel('数值')
ax.set_ylabel('密度/频率')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 演示数值特征处理
print("=== 数值特征处理 ===")
numerical_processor = NumericalFeatureProcessing()
scaled_data = numerical_processor.scaling_methods_comparison()
transformed_data = numerical_processor.distribution_transformation()
discretized_data = numerical_processor.discretization_techniques()
8.3 类别特征处理
8.3.1 类别特征编码
类别特征需要转换为数值形式才能被机器学习算法使用。
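先用一个玩具例子(仅作示意)对比两种最基本的编码:标签编码输出一列整数,独热编码为每个类别生成一列0/1指示变量:

import pandas as pd
from sklearn.preprocessing import LabelEncoder

colors = pd.Series(['红', '蓝', '绿', '红'])
print(LabelEncoder().fit_transform(colors))    # 单列整数, 编号按类别排序分配
print(pd.get_dummies(colors, prefix='color'))  # 三列0/1指示变量, 维度随类别数增长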
class CategoricalFeatureProcessing:
def __init__(self):
self.encoders = {}
def create_categorical_dataset(self):
"""创建包含不同类型类别特征的数据集"""
print("=== 创建类别特征数据集 ===")
np.random.seed(42)
n_samples = 1000
# 不同类型的类别特征
# 1. 无序类别特征(名义变量)
colors = np.random.choice(['红色', '蓝色', '绿色', '黄色'], n_samples)
# 2. 有序类别特征(序数变量)
education = np.random.choice(['小学', '初中', '高中', '本科', '研究生'],
n_samples, p=[0.1, 0.2, 0.3, 0.3, 0.1])
# 3. 高基数类别特征
cities = np.random.choice([f'城市{i}' for i in range(50)], n_samples)
# 4. 二元类别特征
gender = np.random.choice(['男', '女'], n_samples)
# 创建目标变量(与某些类别特征相关)
education_mapping = {'小学': 0, '初中': 1, '高中': 2, '本科': 3, '研究生': 4}
education_numeric = np.array([education_mapping[e] for e in education])
y = (education_numeric + np.random.normal(0, 1, n_samples) +
(colors == '红色').astype(int) +
(gender == '男').astype(int) * 0.5) > 2
y = y.astype(int)
# 组合数据
data = pd.DataFrame({
'color': colors,
'education': education,
'city': cities,
'gender': gender,
'target': y
})
print(f"数据集大小: {data.shape}")
print(f"类别特征统计:")
for col in ['color', 'education', 'city', 'gender']:
print(f" {col}: {data[col].nunique()} 个唯一值")
return data
def encoding_methods_comparison(self, data):
"""比较不同的编码方法"""
print("\n=== 类别特征编码方法比较 ===")
# 准备数据
categorical_features = ['color', 'education', 'city', 'gender']
X = data[categorical_features]
y = data['target']
# 不同的编码方法
encoding_methods = {}
# 1. 标签编码
print("1. 标签编码 (Label Encoding)")
X_label = X.copy()
label_encoders = {}
for col in categorical_features:
le = LabelEncoder()
X_label[col] = le.fit_transform(X[col])
label_encoders[col] = le
encoding_methods['标签编码'] = X_label
# 2. 独热编码
print("2. 独热编码 (One-Hot Encoding)")
X_onehot = pd.get_dummies(X, prefix=categorical_features)
encoding_methods['独热编码'] = X_onehot
# 3. 序数编码(针对有序特征)
print("3. 序数编码 (Ordinal Encoding)")
X_ordinal = X.copy()
# 为education定义顺序
education_order = ['小学', '初中', '高中', '本科', '研究生']
ordinal_encoder = OrdinalEncoder(categories=[
X['color'].unique(),
education_order,
X['city'].unique(),
X['gender'].unique()
])
X_ordinal_encoded = ordinal_encoder.fit_transform(X)
X_ordinal = pd.DataFrame(X_ordinal_encoded, columns=categorical_features)
encoding_methods['序数编码'] = X_ordinal
# 4. 目标编码(均值编码)
print("4. 目标编码 (Target Encoding)")
# 注意: 为演示方便, 此处在全量数据上计算类别均值, 存在数据泄露;
# 实际应用中应仅用训练集统计, 并配合交叉拟合或平滑正则化
X_target = X.copy()
for col in categorical_features:
target_mean = data.groupby(col)['target'].mean()
X_target[col] = X[col].map(target_mean)
encoding_methods['目标编码'] = X_target
# 5. 频率编码
print("5. 频率编码 (Frequency Encoding)")
X_freq = X.copy()
for col in categorical_features:
freq_map = X[col].value_counts().to_dict()
X_freq[col] = X[col].map(freq_map)
encoding_methods['频率编码'] = X_freq
# 评估不同编码方法的效果
self.evaluate_encoding_methods(encoding_methods, y)
# 可视化编码效果
self.visualize_encoding_methods(X, encoding_methods)
return encoding_methods
def evaluate_encoding_methods(self, encoding_methods, y):
"""评估不同编码方法的效果"""
print("\n=== 编码方法性能评估 ===")
results = {}
for name, X_encoded in encoding_methods.items():
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
X_encoded, y, test_size=0.3, random_state=42
)
# 标准化(除了独热编码)
if name != '独热编码':
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# 训练模型
model = LogisticRegression(random_state=42, max_iter=1000)
model.fit(X_train, y_train)
# 评估性能
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)
results[name] = {
'train_score': train_score,
'test_score': test_score,
'n_features': X_encoded.shape[1]
}
print(f"{name:10}: 训练={train_score:.3f}, 测试={test_score:.3f}, "
f"特征数={X_encoded.shape[1]}")
return results
def visualize_encoding_methods(self, X_original, encoding_methods):
"""可视化编码方法效果"""
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
axes = axes.ravel()
# 原始数据分布
axes[0].bar(range(len(X_original.columns)),
[X_original[col].nunique() for col in X_original.columns])
axes[0].set_title('原始类别特征唯一值数量')
axes[0].set_xlabel('特征')
axes[0].set_ylabel('唯一值数量')
axes[0].set_xticks(range(len(X_original.columns)))
axes[0].set_xticklabels(X_original.columns, rotation=45)
axes[0].grid(True, alpha=0.3)
# 不同编码方法的特征数量
method_names = list(encoding_methods.keys())
feature_counts = [encoding_methods[name].shape[1] for name in method_names]
axes[1].bar(method_names, feature_counts, color='lightcoral')
axes[1].set_title('不同编码方法的特征数量')
axes[1].set_xlabel('编码方法')
axes[1].set_ylabel('特征数量')
axes[1].tick_params(axis='x', rotation=45)
axes[1].grid(True, alpha=0.3)
# 展示部分编码结果
for i, (name, X_encoded) in enumerate(list(encoding_methods.items())[:4]):
ax = axes[i + 2]
if name == '独热编码':
# 独热编码显示前10个特征的分布
feature_subset = X_encoded.iloc[:, :10]
im = ax.imshow(feature_subset.head(50).T, cmap='viridis', aspect='auto')
ax.set_title(f'{name} (前10个特征)')
ax.set_xlabel('样本')
ax.set_ylabel('特征')
else:
# 其他编码方法显示所有特征的分布
if X_encoded.shape[1] <= 10:
im = ax.imshow(X_encoded.head(50).T, cmap='viridis', aspect='auto')
ax.set_title(f'{name}')
ax.set_xlabel('样本')
ax.set_ylabel('特征')
else:
# 如果特征太多,显示相关性矩阵
corr = X_encoded.corr()
im = ax.imshow(corr, cmap='coolwarm', vmin=-1, vmax=1)
ax.set_title(f'{name} 相关性矩阵')
plt.tight_layout()
plt.show()
def high_cardinality_encoding(self):
"""高基数类别特征处理"""
print("\n=== 高基数类别特征处理 ===")
# 创建高基数类别特征
np.random.seed(42)
n_samples = 10000
n_categories = 1000
# 模拟用户ID(高基数)
user_ids = np.random.choice([f'user_{i}' for i in range(n_categories)],
n_samples)
# 模拟其他特征
age = np.random.normal(35, 10, n_samples)
income = np.random.lognormal(10, 0.5, n_samples)
# 创建目标变量(某些用户有更高的转化率)
high_value_users = set(np.random.choice([f'user_{i}' for i in range(n_categories)],
int(0.1 * n_categories), replace=False))
y = np.array([1 if uid in high_value_users else 0 for uid in user_ids])
y = (y + np.random.binomial(1, 0.1, n_samples)) > 0 # 添加噪声
data = pd.DataFrame({
'user_id': user_ids,
'age': age,
'income': income,
'target': y.astype(int)
})
print(f"数据集大小: {data.shape}")
print(f"用户ID唯一值数量: {data['user_id'].nunique()}")
print(f"目标变量分布: {data['target'].value_counts().to_dict()}")
# 不同的高基数处理方法
methods = {}
# 1. 频率编码
freq_map = data['user_id'].value_counts().to_dict()
data['user_id_freq'] = data['user_id'].map(freq_map)
methods['频率编码'] = data[['user_id_freq', 'age', 'income']]
# 2. 目标编码(同样注意: 实际中应仅用训练集统计以避免泄露, 此处为演示简化)
target_mean = data.groupby('user_id')['target'].mean()
data['user_id_target'] = data['user_id'].map(target_mean)
methods['目标编码'] = data[['user_id_target', 'age', 'income']]
# 3. 哈希编码
from sklearn.feature_extraction import FeatureHasher
hasher = FeatureHasher(n_features=100, input_type='string')
# input_type='string'要求每个样本是字符串序列; 直接传入整列字符串会被按字符迭代
user_id_hashed = hasher.transform([[uid] for uid in data['user_id']]).toarray()
hash_df = pd.DataFrame(user_id_hashed,
columns=[f'hash_{i}' for i in range(100)])
hash_df['age'] = data['age'].values
hash_df['income'] = data['income'].values
methods['哈希编码'] = hash_df
# 4. 嵌入编码(简化版)
# 使用目标编码作为嵌入的简化版本
methods['嵌入编码'] = methods['目标编码'] # 实际应用中会使用神经网络学习嵌入
# 评估不同方法
self.evaluate_high_cardinality_methods(methods, data['target'])
return methods
def evaluate_high_cardinality_methods(self, methods, y):
"""评估高基数处理方法"""
print("\n=== 高基数处理方法性能评估 ===")
results = {}
for name, X in methods.items():
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 训练模型
model = LogisticRegression(random_state=42, max_iter=1000)
model.fit(X_train_scaled, y_train)
# 评估性能
train_score = model.score(X_train_scaled, y_train)
test_score = model.score(X_test_scaled, y_test)
results[name] = {
'train_score': train_score,
'test_score': test_score,
'n_features': X.shape[1]
}
print(f"{name:10}: 训练={train_score:.3f}, 测试={test_score:.3f}, "
f"特征数={X.shape[1]}")
# 可视化结果
self.visualize_high_cardinality_results(results)
return results
def visualize_high_cardinality_results(self, results):
"""可视化高基数处理结果"""
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
methods = list(results.keys())
train_scores = [results[method]['train_score'] for method in methods]
test_scores = [results[method]['test_score'] for method in methods]
feature_counts = [results[method]['n_features'] for method in methods]
# 性能比较
x = np.arange(len(methods))
width = 0.35
axes[0].bar(x - width/2, train_scores, width, label='训练准确率', alpha=0.8)
axes[0].bar(x + width/2, test_scores, width, label='测试准确率', alpha=0.8)
axes[0].set_xlabel('方法')
axes[0].set_ylabel('准确率')
axes[0].set_title('高基数处理方法性能比较')
axes[0].set_xticks(x)
axes[0].set_xticklabels(methods, rotation=45)
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 特征数量比较
axes[1].bar(methods, feature_counts, color='lightgreen')
axes[1].set_xlabel('方法')
axes[1].set_ylabel('特征数量')
axes[1].set_title('特征数量比较')
axes[1].tick_params(axis='x', rotation=45)
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 演示类别特征处理
print("=== 类别特征处理 ===")
categorical_processor = CategoricalFeatureProcessing()
categorical_data = categorical_processor.create_categorical_dataset()
encoding_methods = categorical_processor.encoding_methods_comparison(categorical_data)
high_cardinality_methods = categorical_processor.high_cardinality_encoding()
8.4 特征构造
8.4.1 多项式特征和交互特征
通过组合现有特征可以创建更有表达力的特征。
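PolynomialFeatures 的效果可以用一个单样本的小例子直观说明(仅作示意):degree=2 时由 x1、x2 生成 x1, x2, x1², x1·x2, x2²:

import numpy as np
from sklearn.preprocessing import PolynomialFeatures

X = np.array([[2.0, 3.0]])
poly = PolynomialFeatures(degree=2, include_bias=False)
print(poly.fit_transform(X))                     # [[2. 3. 4. 6. 9.]]
print(poly.get_feature_names_out(['x1', 'x2']))  # ['x1' 'x2' 'x1^2' 'x1 x2' 'x2^2']
# 若只需交互项而不要平方项, 可设 interaction_only=True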
class FeatureConstruction:
def __init__(self):
self.constructed_features = {}
def polynomial_features_demo(self):
"""多项式特征演示"""
print("=== 多项式特征构造 ===")
# 创建非线性数据
np.random.seed(42)
n_samples = 500
X1 = np.random.uniform(-2, 2, n_samples)
X2 = np.random.uniform(-2, 2, n_samples)
# 非线性目标函数
y = (X1**2 + X2**2 + X1*X2 + 0.5*X1**3 +
np.random.normal(0, 0.1, n_samples))
X_original = np.column_stack([X1, X2])
# 不同阶数的多项式特征
polynomial_degrees = [1, 2, 3, 4]
results = {}
for degree in polynomial_degrees:
# 生成多项式特征
poly = PolynomialFeatures(degree=degree, include_bias=False)
X_poly = poly.fit_transform(X_original)
# 获取特征名称
feature_names = poly.get_feature_names_out(['X1', 'X2'])
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
X_poly, y, test_size=0.3, random_state=42
)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 训练模型
model = LinearRegression()
model.fit(X_train_scaled, y_train)
# 评估性能
train_score = model.score(X_train_scaled, y_train)
test_score = model.score(X_test_scaled, y_test)
results[f'{degree}阶'] = {
'degree': degree,
'n_features': X_poly.shape[1],
'feature_names': feature_names,
'train_score': train_score,
'test_score': test_score,
'X_poly': X_poly,
'model': model,
'scaler': scaler
}
print(f"{degree}阶多项式: 特征数={X_poly.shape[1]}, "
f"训练R²={train_score:.3f}, 测试R²={test_score:.3f}")
# 可视化多项式特征效果
self.visualize_polynomial_features(X_original, y, results)
return results
def visualize_polynomial_features(self, X_original, y, results):
"""可视化多项式特征效果"""
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
# 原始数据分布
scatter = axes[0, 0].scatter(X_original[:, 0], X_original[:, 1],
c=y, cmap='viridis', alpha=0.6)
axes[0, 0].set_xlabel('X1')
axes[0, 0].set_ylabel('X2')
axes[0, 0].set_title('原始数据分布')
plt.colorbar(scatter, ax=axes[0, 0])
# 性能比较
degrees = [result['degree'] for result in results.values()]
train_scores = [result['train_score'] for result in results.values()]
test_scores = [result['test_score'] for result in results.values()]
feature_counts = [result['n_features'] for result in results.values()]
axes[0, 1].plot(degrees, train_scores, 'o-', label='训练R²', linewidth=2)
axes[0, 1].plot(degrees, test_scores, 's-', label='测试R²', linewidth=2)
axes[0, 1].set_xlabel('多项式阶数')
axes[0, 1].set_ylabel('R²分数')
axes[0, 1].set_title('不同阶数多项式性能')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 特征数量增长
axes[0, 2].bar(degrees, feature_counts, color='lightcoral')
axes[0, 2].set_xlabel('多项式阶数')
axes[0, 2].set_ylabel('特征数量')
axes[0, 2].set_title('特征数量增长')
axes[0, 2].grid(True, alpha=0.3)
# 预测效果可视化(选择最佳模型)
best_result = max(results.values(), key=lambda x: x['test_score'])
# 创建网格进行预测
x1_range = np.linspace(X_original[:, 0].min(), X_original[:, 0].max(), 50)
x2_range = np.linspace(X_original[:, 1].min(), X_original[:, 1].max(), 50)
X1_grid, X2_grid = np.meshgrid(x1_range, x2_range)
grid_points = np.column_stack([X1_grid.ravel(), X2_grid.ravel()])
# 生成多项式特征并预测
poly = PolynomialFeatures(degree=best_result['degree'], include_bias=False)
grid_poly = poly.fit_transform(grid_points)
grid_scaled = best_result['scaler'].transform(grid_poly)
predictions = best_result['model'].predict(grid_scaled)
# 绘制预测表面
predictions_grid = predictions.reshape(X1_grid.shape)
contour = axes[1, 0].contourf(X1_grid, X2_grid, predictions_grid,
levels=20, cmap='viridis', alpha=0.8)
axes[1, 0].scatter(X_original[:, 0], X_original[:, 1],
c=y, cmap='viridis', edgecolors='black', alpha=0.6)
axes[1, 0].set_xlabel('X1')
axes[1, 0].set_ylabel('X2')
axes[1, 0].set_title(f'最佳模型预测 ({best_result["degree"]}阶)')
plt.colorbar(contour, ax=axes[1, 0])
# 特征重要性(系数大小)
if best_result['degree'] <= 3: # 只显示低阶的特征名称
feature_names = best_result['feature_names']
coefficients = np.abs(best_result['model'].coef_)
# 选择最重要的特征
top_indices = np.argsort(coefficients)[-10:]
top_features = [feature_names[i] for i in top_indices]
top_coeffs = coefficients[top_indices]
axes[1, 1].barh(range(len(top_features)), top_coeffs)
axes[1, 1].set_yticks(range(len(top_features)))
axes[1, 1].set_yticklabels(top_features)
axes[1, 1].set_xlabel('系数绝对值')
axes[1, 1].set_title('最重要的多项式特征')
axes[1, 1].grid(True, alpha=0.3)
# 残差分析
X_test_poly = poly.transform(X_original)
X_test_scaled = best_result['scaler'].transform(X_test_poly)
y_pred = best_result['model'].predict(X_test_scaled)
residuals = y - y_pred
axes[1, 2].scatter(y_pred, residuals, alpha=0.6)
axes[1, 2].axhline(y=0, color='red', linestyle='--')
axes[1, 2].set_xlabel('预测值')
axes[1, 2].set_ylabel('残差')
axes[1, 2].set_title('残差分析')
axes[1, 2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def interaction_features_demo(self):
"""交互特征演示"""
print("\n=== 交互特征构造 ===")
# 创建具有交互效应的数据
np.random.seed(42)
n_samples = 1000
# 基础特征
age = np.random.normal(35, 10, n_samples)
income = np.random.lognormal(10, 0.5, n_samples)
education = np.random.choice([0, 1, 2, 3, 4], n_samples) # 教育水平
experience = np.random.normal(10, 5, n_samples) # 工作经验
# 目标变量具有复杂的交互效应
y = (0.1 * age +
0.0001 * income +
0.2 * education +
0.15 * experience +
0.05 * age * education + # 年龄与教育的交互
0.00001 * income * experience + # 收入与经验的交互
0.02 * age * experience + # 年龄与经验的交互
np.random.normal(0, 1, n_samples)) > 5
y = y.astype(int)
# 原始特征
X_original = np.column_stack([age, income, education, experience])
feature_names = ['age', 'income', 'education', 'experience']
# 构造交互特征
interaction_features = []
interaction_names = []
# 两两交互
for i in range(len(feature_names)):
for j in range(i+1, len(feature_names)):
interaction_features.append(X_original[:, i] * X_original[:, j])
interaction_names.append(f'{feature_names[i]}*{feature_names[j]}')
# 组合原始特征和交互特征
X_with_interactions = np.column_stack([X_original] + interaction_features)
all_feature_names = feature_names + interaction_names
# 比较有无交互特征的模型性能
datasets = {
'仅原始特征': (X_original, feature_names),
'包含交互特征': (X_with_interactions, all_feature_names)
}
results = {}
for name, (X, names) in datasets.items():
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 训练模型
model = LogisticRegression(random_state=42, max_iter=1000)
model.fit(X_train_scaled, y_train)
# 评估性能
train_score = model.score(X_train_scaled, y_train)
test_score = model.score(X_test_scaled, y_test)
results[name] = {
'train_score': train_score,
'test_score': test_score,
'n_features': X.shape[1],
'feature_names': names,
'model': model,
'scaler': scaler
}
print(f"{name}: 特征数={X.shape[1]}, "
f"训练准确率={train_score:.3f}, 测试准确率={test_score:.3f}")
# 可视化交互特征效果
self.visualize_interaction_features(X_original, y, results,
feature_names, interaction_names)
return results
def visualize_interaction_features(self, X_original, y, results,
feature_names, interaction_names):
"""可视化交互特征效果"""
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
# 原始特征相关性
corr_original = np.corrcoef(X_original.T)
im1 = axes[0, 0].imshow(corr_original, cmap='coolwarm', vmin=-1, vmax=1)
axes[0, 0].set_xticks(range(len(feature_names)))
axes[0, 0].set_yticks(range(len(feature_names)))
axes[0, 0].set_xticklabels(feature_names, rotation=45)
axes[0, 0].set_yticklabels(feature_names)
axes[0, 0].set_title('原始特征相关性')
plt.colorbar(im1, ax=axes[0, 0])
# 性能比较
methods = list(results.keys())
train_scores = [results[method]['train_score'] for method in methods]
test_scores = [results[method]['test_score'] for method in methods]
feature_counts = [results[method]['n_features'] for method in methods]
x = np.arange(len(methods))
width = 0.35
axes[0, 1].bar(x - width/2, train_scores, width, label='训练准确率', alpha=0.8)
axes[0, 1].bar(x + width/2, test_scores, width, label='测试准确率', alpha=0.8)
axes[0, 1].set_xlabel('方法')
axes[0, 1].set_ylabel('准确率')
axes[0, 1].set_title('交互特征效果比较')
axes[0, 1].set_xticks(x)
axes[0, 1].set_xticklabels(methods)
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 特征数量比较
axes[0, 2].bar(methods, feature_counts, color='lightgreen')
axes[0, 2].set_xlabel('方法')
axes[0, 2].set_ylabel('特征数量')
axes[0, 2].set_title('特征数量比较')
axes[0, 2].grid(True, alpha=0.3)
# 特征重要性(系数)
model_with_interactions = results['包含交互特征']['model']
all_names = results['包含交互特征']['feature_names']
coefficients = np.abs(model_with_interactions.coef_[0])
# 选择最重要的特征
top_indices = np.argsort(coefficients)[-10:]
top_features = [all_names[i] for i in top_indices]
top_coeffs = coefficients[top_indices]
axes[1, 0].barh(range(len(top_features)), top_coeffs)
axes[1, 0].set_yticks(range(len(top_features)))
axes[1, 0].set_yticklabels(top_features)
axes[1, 0].set_xlabel('系数绝对值')
axes[1, 0].set_title('最重要的特征(包含交互)')
axes[1, 0].grid(True, alpha=0.3)
# 交互特征可视化(选择一个交互特征)
# 年龄与教育的交互
age_idx = feature_names.index('age')
education_idx = feature_names.index('education')
# 创建分组
education_levels = np.unique(X_original[:, education_idx])
colors = plt.cm.viridis(np.linspace(0, 1, len(education_levels)))
for i, edu_level in enumerate(education_levels):
mask = X_original[:, education_idx] == edu_level
axes[1, 1].scatter(X_original[mask, age_idx], y[mask],
c=[colors[i]], label=f'教育水平{int(edu_level)}',
alpha=0.6)
axes[1, 1].set_xlabel('年龄')
axes[1, 1].set_ylabel('目标变量')
axes[1, 1].set_title('年龄与教育水平的交互效应')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
# 交互特征分布
age_education_interaction = X_original[:, age_idx] * X_original[:, education_idx]
axes[1, 2].hist([age_education_interaction[y==0], age_education_interaction[y==1]],
bins=30, alpha=0.7, label=['类别0', '类别1'])
axes[1, 2].set_xlabel('年龄×教育水平')
axes[1, 2].set_ylabel('频率')
axes[1, 2].set_title('交互特征分布')
axes[1, 2].legend()
axes[1, 2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def aggregation_features_demo(self):
"""聚合特征演示"""
print("\n=== 聚合特征构造 ===")
# 创建分组数据(模拟用户行为数据)
np.random.seed(42)
n_users = 1000
n_transactions = 5000
# 用户基本信息
user_ids = np.arange(n_users)
user_ages = np.random.normal(35, 10, n_users)
user_cities = np.random.choice(['北京', '上海', '广州', '深圳', '杭州'], n_users)
# 交易数据
transaction_user_ids = np.random.choice(user_ids, n_transactions)
transaction_amounts = np.random.lognormal(5, 1, n_transactions)
transaction_categories = np.random.choice(['食品', '服装', '电子', '旅游', '教育'],
n_transactions)
transaction_dates = pd.date_range('2023-01-01', periods=n_transactions, freq='H')
# 创建交易数据框
transactions = pd.DataFrame({
'user_id': transaction_user_ids,
'amount': transaction_amounts,
'category': transaction_categories,
'date': transaction_dates
})
# 用户数据框
users = pd.DataFrame({
'user_id': user_ids,
'age': user_ages,
'city': user_cities
})
print(f"用户数量: {len(users)}")
print(f"交易数量: {len(transactions)}")
# 构造聚合特征
aggregation_features = {}
# 1. 基础统计聚合
basic_agg = transactions.groupby('user_id').agg({
'amount': ['count', 'sum', 'mean', 'std', 'min', 'max'],
'category': 'nunique'
}).round(2)
# 扁平化列名
basic_agg.columns = ['_'.join(col).strip() for col in basic_agg.columns]
basic_agg = basic_agg.reset_index()
# 2. 时间相关聚合
transactions['hour'] = transactions['date'].dt.hour
transactions['day_of_week'] = transactions['date'].dt.dayofweek
transactions['month'] = transactions['date'].dt.month
time_agg = transactions.groupby('user_id').agg({
'hour': ['mean', 'std'],
'day_of_week': 'mean',
'month': 'nunique'
}).round(2)
time_agg.columns = ['_'.join(col).strip() for col in time_agg.columns]
time_agg = time_agg.reset_index()
# 3. 类别相关聚合
category_pivot = transactions.pivot_table(
index='user_id',
columns='category',
values='amount',
aggfunc=['count', 'sum'],
fill_value=0
)
# 扁平化列名
category_pivot.columns = ['_'.join(col).strip() for col in category_pivot.columns]
category_pivot = category_pivot.reset_index()
# 4. 比例特征
ratio_features = transactions.groupby('user_id').apply(
lambda x: pd.Series({
'weekend_ratio': (x['day_of_week'] >= 5).mean(),
'high_amount_ratio': (x['amount'] > x['amount'].quantile(0.8)).mean(),
'recent_activity_ratio': (x['date'] > x['date'].quantile(0.7)).mean()
})
).reset_index()
# 合并所有聚合特征
user_features = users.copy()
for df in [basic_agg, time_agg, category_pivot, ratio_features]:
user_features = user_features.merge(df, on='user_id', how='left')
# 填充缺失值
user_features = user_features.fillna(0)
print(f"聚合后特征数量: {user_features.shape[1]}")
print(f"聚合特征示例:")
print(user_features.head())
# 可视化聚合特征
self.visualize_aggregation_features(transactions, user_features)
return user_features, transactions
def visualize_aggregation_features(self, transactions, user_features):
"""可视化聚合特征"""
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
# 1. 用户交易频率分布
transaction_counts = transactions['user_id'].value_counts()
axes[0, 0].hist(transaction_counts, bins=30, alpha=0.7, edgecolor='black')
axes[0, 0].set_xlabel('交易次数')
axes[0, 0].set_ylabel('用户数量')
axes[0, 0].set_title('用户交易频率分布')
axes[0, 0].grid(True, alpha=0.3)
# 2. 用户平均交易金额分布
axes[0, 1].hist(user_features['amount_mean'], bins=30, alpha=0.7, edgecolor='black')
axes[0, 1].set_xlabel('平均交易金额')
axes[0, 1].set_ylabel('用户数量')
axes[0, 1].set_title('用户平均交易金额分布')
axes[0, 1].grid(True, alpha=0.3)
# 3. 交易类别多样性
axes[0, 2].hist(user_features['category_nunique'], bins=range(1, 7),
alpha=0.7, edgecolor='black')
axes[0, 2].set_xlabel('交易类别数量')
axes[0, 2].set_ylabel('用户数量')
axes[0, 2].set_title('用户交易类别多样性')
axes[0, 2].grid(True, alpha=0.3)
# 4. 时间模式 - 平均交易时间
axes[1, 0].hist(user_features['hour_mean'], bins=24, alpha=0.7, edgecolor='black')
axes[1, 0].set_xlabel('平均交易时间(小时)')
axes[1, 0].set_ylabel('用户数量')
axes[1, 0].set_title('用户平均交易时间分布')
axes[1, 0].grid(True, alpha=0.3)
# 5. 周末交易比例
axes[1, 1].hist(user_features['weekend_ratio'], bins=20, alpha=0.7, edgecolor='black')
axes[1, 1].set_xlabel('周末交易比例')
axes[1, 1].set_ylabel('用户数量')
axes[1, 1].set_title('用户周末交易比例分布')
axes[1, 1].grid(True, alpha=0.3)
# 6. 特征相关性热图(选择部分数值特征)
numeric_features = user_features.select_dtypes(include=[np.number]).columns[:10]
corr_matrix = user_features[numeric_features].corr()
im = axes[1, 2].imshow(corr_matrix, cmap='coolwarm', vmin=-1, vmax=1)
axes[1, 2].set_xticks(range(len(numeric_features)))
axes[1, 2].set_yticks(range(len(numeric_features)))
axes[1, 2].set_xticklabels(numeric_features, rotation=45, ha='right')
axes[1, 2].set_yticklabels(numeric_features)
axes[1, 2].set_title('聚合特征相关性')
plt.colorbar(im, ax=axes[1, 2])
plt.tight_layout()
plt.show()
# 演示特征构造
print("=== 特征构造 ===")
feature_constructor = FeatureConstruction()
polynomial_results = feature_constructor.polynomial_features_demo()
interaction_results = feature_constructor.interaction_features_demo()
user_features, transactions = feature_constructor.aggregation_features_demo()
8.5 文本特征提取
8.5.1 词袋模型和TF-IDF
文本数据需要转换为数值特征才能被机器学习算法处理。
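先看一个两句话的玩具语料(仅作示意),对比词频计数与TF-IDF权重:在所有文档中都出现的词(如'the')会被TF-IDF降权:

from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer

corpus = ['the cat sat on the mat', 'the dog ate my homework']
bow = CountVectorizer()
print(bow.fit_transform(corpus).toarray())   # 词频矩阵, 每行对应一个文档
print(bow.get_feature_names_out())           # 词汇表
tfidf = TfidfVectorizer()
print(tfidf.fit_transform(corpus).toarray()) # 'the'出现在所有文档中, 权重被压低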
class TextFeatureExtraction:
def __init__(self):
self.vectorizers = {}
def create_text_dataset(self):
"""创建文本数据集"""
print("=== 创建文本数据集 ===")
# 使用20newsgroups数据集的子集
categories = ['alt.atheism', 'soc.religion.christian',
'comp.graphics', 'sci.med']
# 加载数据
newsgroups_train = fetch_20newsgroups(subset='train',
categories=categories,
shuffle=True,
random_state=42,
remove=('headers', 'footers', 'quotes'))
# 创建简化的文本数据
texts = newsgroups_train.data[:1000] # 使用前1000个文档
labels = newsgroups_train.target[:1000]
target_names = newsgroups_train.target_names
print(f"文档数量: {len(texts)}")
print(f"类别数量: {len(target_names)}")
print(f"类别名称: {target_names}")
print(f"类别分布: {np.bincount(labels)}")
# 显示示例文档
print(f"\n示例文档 (类别: {target_names[labels[0]]}):")
print(texts[0][:300] + "...")
return texts, labels, target_names
def bag_of_words_demo(self, texts, labels):
"""词袋模型演示"""
print("\n=== 词袋模型 (Bag of Words) ===")
# 不同的词袋模型配置
vectorizers = {
'基础词袋': CountVectorizer(max_features=1000, stop_words='english'),
'二元词袋': CountVectorizer(max_features=1000, ngram_range=(1, 2),
stop_words='english'),
'限制词频': CountVectorizer(max_features=1000, min_df=2, max_df=0.8,
stop_words='english'),
'二进制词袋': CountVectorizer(max_features=1000, binary=True,
stop_words='english')
}
results = {}
for name, vectorizer in vectorizers.items():
print(f"\n{name}:")
# 拟合和转换
X = vectorizer.fit_transform(texts)
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
X, labels, test_size=0.3, random_state=42, stratify=labels
)
# 训练分类器
classifier = LogisticRegression(random_state=42, max_iter=1000)
classifier.fit(X_train, y_train)
# 评估性能
train_score = classifier.score(X_train, y_train)
test_score = classifier.score(X_test, y_test)
# 获取词汇表信息
vocabulary_size = len(vectorizer.vocabulary_)
feature_names = vectorizer.get_feature_names_out()
results[name] = {
'vectorizer': vectorizer,
'X': X,
'train_score': train_score,
'test_score': test_score,
'vocabulary_size': vocabulary_size,
'feature_names': feature_names,
'classifier': classifier
}
print(f" 词汇表大小: {vocabulary_size}")
print(f" 特征矩阵形状: {X.shape}")
print(f" 稀疏度: {1 - X.nnz / (X.shape[0] * X.shape[1]):.3f}")
print(f" 训练准确率: {train_score:.3f}")
print(f" 测试准确率: {test_score:.3f}")
# 可视化词袋模型效果
self.visualize_bow_results(results, texts, labels)
return results
def tfidf_demo(self, texts, labels):
"""TF-IDF演示"""
print("\n=== TF-IDF特征提取 ===")
# 不同的TF-IDF配置
tfidf_vectorizers = {
'基础TF-IDF': TfidfVectorizer(max_features=1000, stop_words='english'),
'L1正则化': TfidfVectorizer(max_features=1000, norm='l1',
stop_words='english'),
'子线性缩放': TfidfVectorizer(max_features=1000, sublinear_tf=True,
stop_words='english'),
'二元组合': TfidfVectorizer(max_features=1000, ngram_range=(1, 2),
stop_words='english')
}
results = {}
for name, vectorizer in tfidf_vectorizers.items():
print(f"\n{name}:")
# 拟合和转换
X = vectorizer.fit_transform(texts)
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
X, labels, test_size=0.3, random_state=42, stratify=labels
)
# 训练分类器
classifier = LogisticRegression(random_state=42, max_iter=1000)
classifier.fit(X_train, y_train)
# 评估性能
train_score = classifier.score(X_train, y_train)
test_score = classifier.score(X_test, y_test)
results[name] = {
'vectorizer': vectorizer,
'X': X,
'train_score': train_score,
'test_score': test_score,
'classifier': classifier
}
print(f" 特征矩阵形状: {X.shape}")
print(f" 稀疏度: {1 - X.nnz / (X.shape[0] * X.shape[1]):.3f}")
print(f" 训练准确率: {train_score:.3f}")
print(f" 测试准确率: {test_score:.3f}")
# 可视化TF-IDF效果
self.visualize_tfidf_results(results, texts, labels)
return results
def visualize_bow_results(self, results, texts, labels):
"""可视化词袋模型结果"""
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# 1. 性能比较
methods = list(results.keys())
train_scores = [results[method]['train_score'] for method in methods]
test_scores = [results[method]['test_score'] for method in methods]
x = np.arange(len(methods))
width = 0.35
axes[0, 0].bar(x - width/2, train_scores, width, label='训练准确率', alpha=0.8)
axes[0, 0].bar(x + width/2, test_scores, width, label='测试准确率', alpha=0.8)
axes[0, 0].set_xlabel('方法')
axes[0, 0].set_ylabel('准确率')
axes[0, 0].set_title('词袋模型性能比较')
axes[0, 0].set_xticks(x)
axes[0, 0].set_xticklabels(methods, rotation=45)
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 2. 词汇表大小比较
vocab_sizes = [results[method]['vocabulary_size'] for method in methods]
axes[0, 1].bar(methods, vocab_sizes, color='lightcoral')
axes[0, 1].set_xlabel('方法')
axes[0, 1].set_ylabel('词汇表大小')
axes[0, 1].set_title('词汇表大小比较')
axes[0, 1].tick_params(axis='x', rotation=45)
axes[0, 1].grid(True, alpha=0.3)
# 3. 特征重要性(选择基础词袋模型)
base_result = results['基础词袋']
feature_names = base_result['feature_names']
classifier = base_result['classifier']
# 获取每个类别的重要特征(fetch_20newsgroups按类别名字母序排列标签)
class_names = ['atheism', 'graphics', 'med', 'christian']
for i, class_name in enumerate(class_names):
if i >= 4: # 只显示4个类别
break
# 获取该类别的系数
coef = classifier.coef_[i]
top_indices = np.argsort(coef)[-10:] # 最重要的10个特征
top_features = [feature_names[idx] for idx in top_indices]
top_scores = coef[top_indices]
if i < 2:
ax = axes[1, i]
ax.barh(range(len(top_features)), top_scores)
ax.set_yticks(range(len(top_features)))
ax.set_yticklabels(top_features)
ax.set_xlabel('重要性分数')
ax.set_title(f'{class_name}类别重要特征')
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def visualize_tfidf_results(self, results, texts, labels):
"""可视化TF-IDF结果"""
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# 1. 性能比较
methods = list(results.keys())
train_scores = [results[method]['train_score'] for method in methods]
test_scores = [results[method]['test_score'] for method in methods]
x = np.arange(len(methods))
width = 0.35
axes[0, 0].bar(x - width/2, train_scores, width, label='训练准确率', alpha=0.8)
axes[0, 0].bar(x + width/2, test_scores, width, label='测试准确率', alpha=0.8)
axes[0, 0].set_xlabel('方法')
axes[0, 0].set_ylabel('准确率')
axes[0, 0].set_title('TF-IDF方法性能比较')
axes[0, 0].set_xticks(x)
axes[0, 0].set_xticklabels(methods, rotation=45)
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 2. 稀疏度比较
sparsity = []
for method in methods:
X = results[method]['X']
sparsity.append(1 - X.nnz / (X.shape[0] * X.shape[1]))
axes[0, 1].bar(methods, sparsity, color='lightgreen')
axes[0, 1].set_xlabel('方法')
axes[0, 1].set_ylabel('稀疏度')
axes[0, 1].set_title('特征矩阵稀疏度比较')
axes[0, 1].tick_params(axis='x', rotation=45)
axes[0, 1].grid(True, alpha=0.3)
# 3. TF-IDF值分布
base_tfidf = results['基础TF-IDF']['X']
tfidf_values = base_tfidf.data # 非零TF-IDF值
axes[1, 0].hist(tfidf_values, bins=50, alpha=0.7, edgecolor='black')
axes[1, 0].set_xlabel('TF-IDF值')
axes[1, 0].set_ylabel('频率')
axes[1, 0].set_title('TF-IDF值分布')
axes[1, 0].grid(True, alpha=0.3)
# 4. 文档长度与TF-IDF的关系
doc_lengths = [len(text.split()) for text in texts[:100]] # 前100个文档
avg_tfidf_scores = []
for i in range(min(100, base_tfidf.shape[0])):
doc_tfidf = base_tfidf[i].data
avg_score = np.mean(doc_tfidf) if len(doc_tfidf) > 0 else 0
avg_tfidf_scores.append(avg_score)
axes[1, 1].scatter(doc_lengths, avg_tfidf_scores, alpha=0.6)
axes[1, 1].set_xlabel('文档长度(词数)')
axes[1, 1].set_ylabel('平均TF-IDF分数')
axes[1, 1].set_title('文档长度与TF-IDF关系')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def advanced_text_features(self, texts, labels):
"""高级文本特征"""
print("\n=== 高级文本特征 ===")
# 1. 哈希向量化
print("1. 哈希向量化 (Hashing Vectorizer)")
hash_vectorizer = HashingVectorizer(n_features=1000, stop_words='english')
X_hash = hash_vectorizer.fit_transform(texts)
# 2. 字符级n-gram
print("2. 字符级n-gram")
char_vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4),
max_features=1000)
X_char = char_vectorizer.fit_transform(texts)
# 3. 文本统计特征
print("3. 文本统计特征")
text_stats = []
for text in texts:
    words = text.split()
    n_chars = max(len(text), 1)  # 防止空文档导致除零
    stats = {
        'length': len(text),
        'word_count': len(words),
        'avg_word_length': np.mean([len(w) for w in words]) if words else 0,
        'sentence_count': text.count('.') + text.count('!') + text.count('?'),
        'uppercase_ratio': sum(1 for c in text if c.isupper()) / n_chars,
        'digit_ratio': sum(1 for c in text if c.isdigit()) / n_chars
    }
    text_stats.append(list(stats.values()))
X_stats = np.array(text_stats)
# 评估不同特征的效果
feature_sets = {
'哈希向量': X_hash,
'字符n-gram': X_char,
'统计特征': X_stats
}
results = {}
for name, X in feature_sets.items():
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(
X, labels, test_size=0.3, random_state=42, stratify=labels
)
# 标准化(对统计特征)
if name == '统计特征':
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# 训练分类器
classifier = LogisticRegression(random_state=42, max_iter=1000)
classifier.fit(X_train, y_train)
# 评估性能
train_score = classifier.score(X_train, y_train)
test_score = classifier.score(X_test, y_test)
results[name] = {
'train_score': train_score,
'test_score': test_score,
'n_features': X.shape[1]
}
print(f"{name}: 特征数={X.shape[1]}, "
f"训练准确率={train_score:.3f}, 测试准确率={test_score:.3f}")
# 可视化高级特征效果
self.visualize_advanced_features(results, X_stats, texts)
return results
def visualize_advanced_features(self, results, X_stats, texts):
"""可视化高级文本特征"""
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# 1. 性能比较
methods = list(results.keys())
train_scores = [results[method]['train_score'] for method in methods]
test_scores = [results[method]['test_score'] for method in methods]
feature_counts = [results[method]['n_features'] for method in methods]
x = np.arange(len(methods))
width = 0.35
axes[0, 0].bar(x - width/2, train_scores, width, label='训练准确率', alpha=0.8)
axes[0, 0].bar(x + width/2, test_scores, width, label='测试准确率', alpha=0.8)
axes[0, 0].set_xlabel('方法')
axes[0, 0].set_ylabel('准确率')
axes[0, 0].set_title('高级文本特征性能比较')
axes[0, 0].set_xticks(x)
axes[0, 0].set_xticklabels(methods, rotation=45)
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 2. 特征数量比较
axes[0, 1].bar(methods, feature_counts, color='lightcoral')
axes[0, 1].set_xlabel('方法')
axes[0, 1].set_ylabel('特征数量')
axes[0, 1].set_title('特征数量比较')
axes[0, 1].tick_params(axis='x', rotation=45)
axes[0, 1].grid(True, alpha=0.3)
# 3. 文本统计特征分布
stat_names = ['文本长度', '词数', '平均词长', '句子数', '大写比例', '数字比例']
for i in range(2):  # 底行只有两个空位, 展示前两个统计特征
    ax = axes[1, i]
    ax.hist(X_stats[:, i], bins=30, alpha=0.7, edgecolor='black')
    ax.set_xlabel(stat_names[i])
    ax.set_ylabel('频率')
    ax.set_title(f'{stat_names[i]}分布')
    ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 演示文本特征提取
print("=== 文本特征提取 ===")
text_extractor = TextFeatureExtraction()
texts, labels, target_names = text_extractor.create_text_dataset()
bow_results = text_extractor.bag_of_words_demo(texts, labels)
tfidf_results = text_extractor.tfidf_demo(texts, labels)
advanced_results = text_extractor.advanced_text_features(texts, labels)
8.6 时间序列特征工程
8.6.1 时间特征提取
时间序列数据包含丰富的时间模式信息,需要提取相关特征。
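核心思路是把时间戳拆解成模型可用的数值特征,并对周期性变量做正弦/余弦循环编码,使12月与1月在特征空间中相邻。一个最小示意(日期为虚构):

import numpy as np
import pandas as pd

s = pd.Series(pd.to_datetime(['2023-01-15', '2023-12-31']))
month = s.dt.month
# 直接用月份数值时, 12与1的距离最远; 循环编码后两者在(sin, cos)平面上相邻
month_sin = np.sin(2 * np.pi * month / 12)
month_cos = np.cos(2 * np.pi * month / 12)
print(pd.DataFrame({'month': month, 'sin': month_sin, 'cos': month_cos}))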
class TimeSeriesFeatureEngineering:
def __init__(self):
self.features = {}
def create_time_series_data(self):
"""创建时间序列数据"""
print("=== 创建时间序列数据 ===")
# 创建日期范围
date_range = pd.date_range(start='2020-01-01', end='2023-12-31', freq='D')
# 创建模拟销售数据
np.random.seed(42)
n_days = len(date_range)
# 基础趋势
trend = np.linspace(100, 200, n_days)
# 季节性模式(年度)
yearly_seasonal = 20 * np.sin(2 * np.pi * np.arange(n_days) / 365.25)
# 周期性模式(周)
weekly_seasonal = 10 * np.sin(2 * np.pi * np.arange(n_days) / 7)
# 随机噪声
noise = np.random.normal(0, 15, n_days)
# 特殊事件影响
special_events = np.zeros(n_days)
# 黑色星期五效应(简化处理: 固定取11月25日, 实际为11月第四个星期四的次日)
for year in range(2020, 2024):
black_friday = pd.Timestamp(f'{year}-11-25')
if black_friday in date_range:
idx = date_range.get_loc(black_friday)
special_events[idx:idx+3] = 50 # 3天的促销效应
# 组合所有成分
sales = trend + yearly_seasonal + weekly_seasonal + noise + special_events
sales = np.maximum(sales, 0) # 确保销售额非负
# 创建DataFrame
df = pd.DataFrame({
'date': date_range,
'sales': sales,
'trend': trend,
'yearly_seasonal': yearly_seasonal,
'weekly_seasonal': weekly_seasonal,
'noise': noise,
'special_events': special_events
})
print(f"数据形状: {df.shape}")
print(f"日期范围: {df['date'].min()} 到 {df['date'].max()}")
print(f"销售额统计:")
print(df['sales'].describe())
# 可视化原始数据
self.visualize_time_series_data(df)
return df
def extract_time_features(self, df):
"""提取时间特征"""
print("\n=== 提取时间特征 ===")
# 复制数据
df_features = df.copy()
# 基础时间特征
df_features['year'] = df_features['date'].dt.year
df_features['month'] = df_features['date'].dt.month
df_features['day'] = df_features['date'].dt.day
df_features['dayofweek'] = df_features['date'].dt.dayofweek
df_features['dayofyear'] = df_features['date'].dt.dayofyear
df_features['quarter'] = df_features['date'].dt.quarter
df_features['week'] = df_features['date'].dt.isocalendar().week
# 周末标识
df_features['is_weekend'] = (df_features['dayofweek'] >= 5).astype(int)
# 月初月末标识
df_features['is_month_start'] = df_features['date'].dt.is_month_start.astype(int)
df_features['is_month_end'] = df_features['date'].dt.is_month_end.astype(int)
# 季节标识
df_features['is_spring'] = df_features['month'].isin([3, 4, 5]).astype(int)
df_features['is_summer'] = df_features['month'].isin([6, 7, 8]).astype(int)
df_features['is_autumn'] = df_features['month'].isin([9, 10, 11]).astype(int)
df_features['is_winter'] = df_features['month'].isin([12, 1, 2]).astype(int)
# 节假日标识(简化版)
df_features['is_holiday'] = 0
# 新年
df_features.loc[df_features['date'].dt.strftime('%m-%d') == '01-01', 'is_holiday'] = 1
# 圣诞节
df_features.loc[df_features['date'].dt.strftime('%m-%d') == '12-25', 'is_holiday'] = 1
# 感恩节(11月第四个星期四)
for year in df_features['year'].unique():
thanksgiving = pd.Timestamp(f'{year}-11-01') + pd.DateOffset(days=21)
while thanksgiving.dayofweek != 3: # 星期四
thanksgiving += pd.DateOffset(days=1)
df_features.loc[df_features['date'] == thanksgiving, 'is_holiday'] = 1
# 循环编码(处理周期性)
df_features['month_sin'] = np.sin(2 * np.pi * df_features['month'] / 12)
df_features['month_cos'] = np.cos(2 * np.pi * df_features['month'] / 12)
df_features['dayofweek_sin'] = np.sin(2 * np.pi * df_features['dayofweek'] / 7)
df_features['dayofweek_cos'] = np.cos(2 * np.pi * df_features['dayofweek'] / 7)
df_features['dayofyear_sin'] = np.sin(2 * np.pi * df_features['dayofyear'] / 365.25)
df_features['dayofyear_cos'] = np.cos(2 * np.pi * df_features['dayofyear'] / 365.25)
print("提取的时间特征:")
time_features = [col for col in df_features.columns if col not in ['date', 'sales', 'trend', 'yearly_seasonal', 'weekly_seasonal', 'noise', 'special_events']]
print(time_features)
# 可视化时间特征
self.visualize_time_features(df_features, time_features)
return df_features, time_features
def create_lag_features(self, df, target_col='sales', lags=[1, 7, 30]):
"""创建滞后特征"""
print(f"\n=== 创建滞后特征 (lags: {lags}) ===")
df_lag = df.copy()
# 滞后特征
for lag in lags:
df_lag[f'{target_col}_lag_{lag}'] = df_lag[target_col].shift(lag)
# 滚动窗口特征
windows = [7, 14, 30]
for window in windows:
df_lag[f'{target_col}_rolling_mean_{window}'] = df_lag[target_col].rolling(window=window).mean()
df_lag[f'{target_col}_rolling_std_{window}'] = df_lag[target_col].rolling(window=window).std()
df_lag[f'{target_col}_rolling_min_{window}'] = df_lag[target_col].rolling(window=window).min()
df_lag[f'{target_col}_rolling_max_{window}'] = df_lag[target_col].rolling(window=window).max()
# 指数加权移动平均
alphas = [0.1, 0.3, 0.5]
for alpha in alphas:
df_lag[f'{target_col}_ewm_{alpha}'] = df_lag[target_col].ewm(alpha=alpha).mean()
# 差分特征
df_lag[f'{target_col}_diff_1'] = df_lag[target_col].diff(1)
df_lag[f'{target_col}_diff_7'] = df_lag[target_col].diff(7)
# 变化率特征
df_lag[f'{target_col}_pct_change_1'] = df_lag[target_col].pct_change(1)
df_lag[f'{target_col}_pct_change_7'] = df_lag[target_col].pct_change(7)
# 移除包含NaN的行
df_lag_clean = df_lag.dropna()
lag_features = [col for col in df_lag.columns if 'lag' in col or 'rolling' in col or 'ewm' in col or 'diff' in col or 'pct_change' in col]
print(f"创建的滞后特征数量: {len(lag_features)}")
print(f"清理后数据形状: {df_lag_clean.shape}")
# 可视化滞后特征
self.visualize_lag_features(df_lag_clean, target_col, lag_features)
return df_lag_clean, lag_features
def visualize_time_series_data(self, df):
"""可视化时间序列数据"""
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
# 1. 原始销售数据
axes[0, 0].plot(df['date'], df['sales'], linewidth=1)
axes[0, 0].set_title('原始销售数据')
axes[0, 0].set_xlabel('日期')
axes[0, 0].set_ylabel('销售额')
axes[0, 0].grid(True, alpha=0.3)
# 2. 趋势分解
axes[0, 1].plot(df['date'], df['trend'], label='趋势', linewidth=2)
axes[0, 1].plot(df['date'], df['yearly_seasonal'], label='年度季节性', alpha=0.7)
axes[0, 1].plot(df['date'], df['weekly_seasonal'], label='周季节性', alpha=0.7)
axes[0, 1].set_title('时间序列分解')
axes[0, 1].set_xlabel('日期')
axes[0, 1].set_ylabel('值')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 3. 月度销售模式
monthly_sales = df.groupby(df['date'].dt.month)['sales'].mean()
axes[1, 0].bar(monthly_sales.index, monthly_sales.values)
axes[1, 0].set_title('月度平均销售额')
axes[1, 0].set_xlabel('月份')
axes[1, 0].set_ylabel('平均销售额')
axes[1, 0].grid(True, alpha=0.3)
# 4. 周度销售模式
weekly_sales = df.groupby(df['date'].dt.dayofweek)['sales'].mean()
weekdays = ['周一', '周二', '周三', '周四', '周五', '周六', '周日']
axes[1, 1].bar(range(7), weekly_sales.values)
axes[1, 1].set_title('周度平均销售额')
axes[1, 1].set_xlabel('星期')
axes[1, 1].set_ylabel('平均销售额')
axes[1, 1].set_xticks(range(7))
axes[1, 1].set_xticklabels(weekdays, rotation=45)
axes[1, 1].grid(True, alpha=0.3)
# 5. 年度销售趋势
yearly_sales = df.groupby(df['date'].dt.year)['sales'].mean()
axes[2, 0].plot(yearly_sales.index, yearly_sales.values, marker='o', linewidth=2)
axes[2, 0].set_title('年度平均销售额')
axes[2, 0].set_xlabel('年份')
axes[2, 0].set_ylabel('平均销售额')
axes[2, 0].grid(True, alpha=0.3)
# 6. 销售额分布
axes[2, 1].hist(df['sales'], bins=50, alpha=0.7, edgecolor='black')
axes[2, 1].set_title('销售额分布')
axes[2, 1].set_xlabel('销售额')
axes[2, 1].set_ylabel('频率')
axes[2, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def visualize_time_features(self, df, time_features):
"""可视化时间特征"""
fig, axes = plt.subplots(2, 3, figsize=(18, 10))
# 1. 循环编码可视化
axes[0, 0].scatter(df['month_sin'], df['month_cos'], c=df['month'], cmap='tab20', alpha=0.6)
axes[0, 0].set_xlabel('Month Sin')
axes[0, 0].set_ylabel('Month Cos')
axes[0, 0].set_title('月份循环编码')
axes[0, 0].grid(True, alpha=0.3)
# 2. 周几循环编码
axes[0, 1].scatter(df['dayofweek_sin'], df['dayofweek_cos'], c=df['dayofweek'], cmap='tab10', alpha=0.6)
axes[0, 1].set_xlabel('Dayofweek Sin')
axes[0, 1].set_ylabel('Dayofweek Cos')
axes[0, 1].set_title('星期循环编码')
axes[0, 1].grid(True, alpha=0.3)
# 3. 季节性特征与销售额关系
seasonal_features = ['is_spring', 'is_summer', 'is_autumn', 'is_winter']
seasonal_sales = []
for feature in seasonal_features:
seasonal_sales.append(df[df[feature] == 1]['sales'].mean())
axes[0, 2].bar(range(len(seasonal_features)), seasonal_sales)
axes[0, 2].set_title('季节性销售额')
axes[0, 2].set_xlabel('季节')
axes[0, 2].set_ylabel('平均销售额')
axes[0, 2].set_xticks(range(len(seasonal_features)))
axes[0, 2].set_xticklabels(['春', '夏', '秋', '冬'])
axes[0, 2].grid(True, alpha=0.3)
# 4. 周末vs工作日
weekend_sales = df[df['is_weekend'] == 1]['sales'].mean()
weekday_sales = df[df['is_weekend'] == 0]['sales'].mean()
axes[1, 0].bar(['工作日', '周末'], [weekday_sales, weekend_sales])
axes[1, 0].set_title('工作日vs周末销售额')
axes[1, 0].set_ylabel('平均销售额')
axes[1, 0].grid(True, alpha=0.3)
# 5. 节假日效应
holiday_sales = df[df['is_holiday'] == 1]['sales'].mean()
normal_sales = df[df['is_holiday'] == 0]['sales'].mean()
axes[1, 1].bar(['普通日', '节假日'], [normal_sales, holiday_sales])
axes[1, 1].set_title('节假日vs普通日销售额')
axes[1, 1].set_ylabel('平均销售额')
axes[1, 1].grid(True, alpha=0.3)
# 6. 特征相关性热图
feature_subset = ['month', 'dayofweek', 'is_weekend', 'is_holiday', 'quarter']
corr_matrix = df[feature_subset + ['sales']].corr()
im = axes[1, 2].imshow(corr_matrix, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
axes[1, 2].set_xticks(range(len(corr_matrix.columns)))
axes[1, 2].set_yticks(range(len(corr_matrix.columns)))
axes[1, 2].set_xticklabels(corr_matrix.columns, rotation=45)
axes[1, 2].set_yticklabels(corr_matrix.columns)
axes[1, 2].set_title('特征相关性热图')
# 添加相关系数文本
for i in range(len(corr_matrix.columns)):
for j in range(len(corr_matrix.columns)):
text = axes[1, 2].text(j, i, f'{corr_matrix.iloc[i, j]:.2f}',
ha="center", va="center", color="black", fontsize=8)
plt.colorbar(im, ax=axes[1, 2])
plt.tight_layout()
plt.show()
def visualize_lag_features(self, df, target_col, lag_features):
"""可视化滞后特征"""
fig, axes = plt.subplots(2, 3, figsize=(18, 10))
# 1. 滞后特征与目标变量的相关性
lag_only_features = [f for f in lag_features if 'lag' in f and 'rolling' not in f][:5]
correlations = []
for feature in lag_only_features:
corr = df[target_col].corr(df[feature])
correlations.append(corr)
axes[0, 0].bar(range(len(lag_only_features)), correlations)
axes[0, 0].set_title('滞后特征相关性')
axes[0, 0].set_xlabel('滞后特征')
axes[0, 0].set_ylabel('相关系数')
axes[0, 0].set_xticks(range(len(lag_only_features)))
axes[0, 0].set_xticklabels([f.replace(f'{target_col}_', '') for f in lag_only_features], rotation=45)
axes[0, 0].grid(True, alpha=0.3)
# 2. 滚动窗口特征
rolling_features = [f for f in lag_features if 'rolling_mean' in f]
sample_data = df.tail(100) # 最后100天
axes[0, 1].plot(sample_data.index, sample_data[target_col], label='原始数据', linewidth=2)
for feature in rolling_features:
axes[0, 1].plot(sample_data.index, sample_data[feature],
label=feature.replace(f'{target_col}_', ''), alpha=0.7)
axes[0, 1].set_title('滚动平均特征')
axes[0, 1].set_xlabel('时间')
axes[0, 1].set_ylabel('值')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 3. 指数加权移动平均
ewm_features = [f for f in lag_features if 'ewm' in f]
axes[0, 2].plot(sample_data.index, sample_data[target_col], label='原始数据', linewidth=2)
for feature in ewm_features:
axes[0, 2].plot(sample_data.index, sample_data[feature],
label=feature.replace(f'{target_col}_', ''), alpha=0.7)
axes[0, 2].set_title('指数加权移动平均')
axes[0, 2].set_xlabel('时间')
axes[0, 2].set_ylabel('值')
axes[0, 2].legend()
axes[0, 2].grid(True, alpha=0.3)
# 4. 差分特征
diff_features = [f for f in lag_features if 'diff' in f]
for feature in diff_features:
axes[1, 0].plot(sample_data.index, sample_data[feature],
label=feature.replace(f'{target_col}_', ''), alpha=0.7)
axes[1, 0].set_title('差分特征')
axes[1, 0].set_xlabel('时间')
axes[1, 0].set_ylabel('差分值')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 5. 变化率特征
pct_features = [f for f in lag_features if 'pct_change' in f]
for feature in pct_features:
axes[1, 1].plot(sample_data.index, sample_data[feature],
label=feature.replace(f'{target_col}_', ''), alpha=0.7)
axes[1, 1].set_title('变化率特征')
axes[1, 1].set_xlabel('时间')
axes[1, 1].set_ylabel('变化率')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
# 6. 特征重要性(使用随机森林)
from sklearn.ensemble import RandomForestRegressor
# 准备特征和目标
feature_cols = lag_features[:10] # 选择前10个特征
X = df[feature_cols].fillna(0)
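# 说明:滞后/滚动特征在序列开头会产生NaN,这里用0填充只是演示上的简化,
# 更稳妥的做法是用dropna()丢弃这些行,避免引入人为的0值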
y = df[target_col]
# 训练随机森林
rf = RandomForestRegressor(n_estimators=100, random_state=42)
rf.fit(X, y)
# 获取特征重要性
importances = rf.feature_importances_
feature_names = [f.replace(f'{target_col}_', '') for f in feature_cols]
# 排序
indices = np.argsort(importances)[::-1]
axes[1, 2].bar(range(len(feature_cols)), importances[indices])
axes[1, 2].set_title('滞后特征重要性')
axes[1, 2].set_xlabel('特征')
axes[1, 2].set_ylabel('重要性')
axes[1, 2].set_xticks(range(len(feature_cols)))
axes[1, 2].set_xticklabels([feature_names[i] for i in indices], rotation=45)
axes[1, 2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 演示时间序列特征工程
print("=== 时间序列特征工程 ===")
ts_engineer = TimeSeriesFeatureEngineering()
ts_data = ts_engineer.create_time_series_data()
ts_features_data, time_features = ts_engineer.extract_time_features(ts_data)
ts_lag_data, lag_features = ts_engineer.create_lag_features(ts_features_data)
8.7 综合案例:房价预测特征工程
8.7.1 项目背景与目标
在这个综合案例中,我们将运用本章学到的所有特征工程技术来解决房价预测问题。
class HousePricePredictionFeatureEngineering:
def __init__(self):
self.feature_transformers = {}
self.feature_importance = {}
def create_house_dataset(self):
"""创建房价数据集"""
print("=== 创建房价数据集 ===")
np.random.seed(42)
n_samples = 5000
# 基础特征
data = {
# 数值特征
'area': np.random.normal(150, 50, n_samples), # 面积
'bedrooms': np.random.poisson(3, n_samples), # 卧室数
'bathrooms': np.random.poisson(2, n_samples), # 浴室数
'age': np.random.exponential(10, n_samples), # 房龄
'garage_size': np.random.poisson(1, n_samples), # 车库大小
# 类别特征
'neighborhood': np.random.choice(['Downtown', 'Suburb', 'Rural', 'Waterfront'], n_samples,
p=[0.3, 0.4, 0.2, 0.1]),
'house_type': np.random.choice(['Apartment', 'House', 'Condo', 'Townhouse'], n_samples,
p=[0.25, 0.35, 0.25, 0.15]),
'heating_type': np.random.choice(['Gas', 'Electric', 'Oil', 'Solar'], n_samples,
p=[0.5, 0.3, 0.15, 0.05]),
# 布尔特征
'has_pool': np.random.choice([0, 1], n_samples, p=[0.8, 0.2]),
'has_garden': np.random.choice([0, 1], n_samples, p=[0.6, 0.4]),
'has_parking': np.random.choice([0, 1], n_samples, p=[0.3, 0.7]),
'recently_renovated': np.random.choice([0, 1], n_samples, p=[0.7, 0.3]),
}
# 创建DataFrame
df = pd.DataFrame(data)
# 确保数值特征的合理性
df['area'] = np.clip(df['area'], 50, 500)
df['bedrooms'] = np.clip(df['bedrooms'], 1, 6)
df['bathrooms'] = np.clip(df['bathrooms'], 1, 4)
df['age'] = np.clip(df['age'], 0, 100)
df['garage_size'] = np.clip(df['garage_size'], 0, 3)
# 生成目标变量(房价)
# 基础价格
base_price = 200000
# 面积影响
area_effect = df['area'] * 800
# 卧室和浴室影响
room_effect = df['bedrooms'] * 15000 + df['bathrooms'] * 10000
# 房龄影响(负相关)
age_effect = -df['age'] * 1000
# 地区影响
neighborhood_effect = df['neighborhood'].map({
'Downtown': 100000, 'Waterfront': 150000, 'Suburb': 50000, 'Rural': 0
})
# 房型影响
house_type_effect = df['house_type'].map({
'House': 50000, 'Condo': 20000, 'Townhouse': 30000, 'Apartment': 0
})
# 设施影响
amenity_effect = (df['has_pool'] * 25000 +
df['has_garden'] * 15000 +
df['has_parking'] * 10000 +
df['recently_renovated'] * 20000)
# 随机噪声
noise = np.random.normal(0, 30000, n_samples)
# 计算最终价格
df['price'] = (base_price + area_effect + room_effect + age_effect +
neighborhood_effect + house_type_effect + amenity_effect + noise)
# 确保价格为正
df['price'] = np.maximum(df['price'], 50000)
print(f"数据集形状: {df.shape}")
print(f"房价统计:")
print(df['price'].describe())
# 可视化数据集
self.visualize_house_dataset(df)
return df
def comprehensive_feature_engineering(self, df):
"""综合特征工程"""
print("\n=== 综合特征工程 ===")
df_features = df.copy()
# 1. 数值特征处理
print("1. 数值特征处理")
# 对数变换(处理偏态分布)
df_features['area_log'] = np.log1p(df_features['area'])
df_features['age_log'] = np.log1p(df_features['age'])
# 标准化
scaler = StandardScaler()
numerical_features = ['area', 'bedrooms', 'bathrooms', 'age', 'garage_size']
df_features[['area_scaled', 'bedrooms_scaled', 'bathrooms_scaled',
'age_scaled', 'garage_size_scaled']] = scaler.fit_transform(df_features[numerical_features])
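# 注意:此处为演示方便在全量数据上拟合缩放器,
# 实际项目中应只在训练集上fit、再对测试集transform,以避免数据泄露(参见8.8.3)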
# 分箱处理
df_features['area_binned'] = pd.cut(df_features['area'], bins=5, labels=['很小', '小', '中', '大', '很大'])
df_features['age_binned'] = pd.cut(df_features['age'], bins=[0, 5, 15, 30, 100],
labels=['新房', '较新', '中等', '老房'])
# 2. 类别特征编码
print("2. 类别特征编码")
# One-hot编码
categorical_features = ['neighborhood', 'house_type', 'heating_type']
df_encoded = pd.get_dummies(df_features, columns=categorical_features, prefix=categorical_features)
# 目标编码(针对高基数类别特征)
# 说明:TargetEncoder需要额外导入;这里假设使用scikit-learn 1.3+自带的实现,
# 也可以换成category_encoders库中的同名类
from sklearn.preprocessing import TargetEncoder
target_encoder = TargetEncoder()
df_encoded['neighborhood_target_encoded'] = target_encoder.fit_transform(
df_features[['neighborhood']], df_features['price']
).ravel()
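# 注意:sklearn的TargetEncoder在fit_transform中默认使用交叉拟合来缓解目标泄露;
# 更严格的做法仍是先划分训练/测试集,只用训练集拟合编码器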
# 3. 特征构造
print("3. 特征构造")
# 多项式特征
poly_features = PolynomialFeatures(degree=2, include_bias=False, interaction_only=True)
area_bedrooms = poly_features.fit_transform(df_features[['area', 'bedrooms']])
df_encoded['area_bedrooms_interaction'] = area_bedrooms[:, -1] # 交互项
# 比例特征
df_encoded['bathroom_bedroom_ratio'] = df_features['bathrooms'] / (df_features['bedrooms'] + 1)
df_encoded['area_per_bedroom'] = df_features['area'] / (df_features['bedrooms'] + 1)
# 聚合特征
df_encoded['total_rooms'] = df_features['bedrooms'] + df_features['bathrooms']
df_encoded['luxury_score'] = (df_features['has_pool'] + df_features['has_garden'] +
df_features['recently_renovated'])
# 4. 特征选择
print("4. 特征选择")
# 移除原始类别特征和一些冗余特征
features_to_drop = ['neighborhood', 'house_type', 'heating_type', 'area_binned', 'age_binned']
df_final = df_encoded.drop(columns=features_to_drop, errors='ignore')
# 获取特征列表
feature_columns = [col for col in df_final.columns if col != 'price']
print(f"最终特征数量: {len(feature_columns)}")
print(f"特征列表: {feature_columns[:10]}...") # 显示前10个特征
return df_final, feature_columns
def feature_importance_analysis(self, df, feature_columns):
"""特征重要性分析"""
print("\n=== 特征重要性分析 ===")
X = df[feature_columns]
y = df['price']
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 1. 单变量特征选择
print("1. 单变量特征选择")
selector = SelectKBest(score_func=f_regression, k=10)
X_selected = selector.fit_transform(X_train, y_train)
selected_features = X.columns[selector.get_support()]
univariate_scores = selector.scores_
print(f"选择的特征: {list(selected_features)}")
# 2. 基于模型的特征选择
print("2. 基于模型的特征选择")
# 随机森林特征重要性
rf = RandomForestRegressor(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)
rf_importance = rf.feature_importances_
# Lasso特征选择
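# 注意:特征尺度不一致时,Lasso系数绝对值之间的可比性有限,此处仅作粗略参考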
lasso = LassoCV(cv=5, random_state=42)
lasso.fit(X_train, y_train)
lasso_coef = np.abs(lasso.coef_)
# 3. 递归特征消除
print("3. 递归特征消除")
rfe = RFE(estimator=LinearRegression(), n_features_to_select=10)
rfe.fit(X_train, y_train)
rfe_selected = X.columns[rfe.support_]
print(f"RFE选择的特征: {list(rfe_selected)}")
# 4. 模型性能比较
print("4. 模型性能比较")
models = {
'全部特征': (X_train, X_test),
'单变量选择': (X_train[selected_features], X_test[selected_features]),
'RFE选择': (X_train[rfe_selected], X_test[rfe_selected])
}
results = {}
for name, (X_tr, X_te) in models.items():
# 训练模型
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_tr, y_train)
# 评估性能
train_score = model.score(X_tr, y_train)
test_score = model.score(X_te, y_test)
# 预测
y_pred = model.predict(X_te)
mse = mean_squared_error(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
results[name] = {
'train_r2': train_score,
'test_r2': test_score,
'mse': mse,
'mae': mae,
'n_features': X_tr.shape[1]
}
print(f"{name}: 特征数={X_tr.shape[1]}, 测试R²={test_score:.3f}, MAE={mae:.0f}")
# 可视化特征重要性分析
self.visualize_feature_importance(X, y, univariate_scores, rf_importance, lasso_coef, results)
return results, selected_features, rfe_selected
def visualize_house_dataset(self, df):
"""可视化房价数据集"""
fig, axes = plt.subplots(3, 3, figsize=(18, 15))
# 1. 房价分布
axes[0, 0].hist(df['price'], bins=50, alpha=0.7, edgecolor='black')
axes[0, 0].set_title('房价分布')
axes[0, 0].set_xlabel('价格')
axes[0, 0].set_ylabel('频率')
axes[0, 0].grid(True, alpha=0.3)
# 2. 面积vs价格
axes[0, 1].scatter(df['area'], df['price'], alpha=0.5)
axes[0, 1].set_title('面积 vs 价格')
axes[0, 1].set_xlabel('面积')
axes[0, 1].set_ylabel('价格')
axes[0, 1].grid(True, alpha=0.3)
# 3. 房龄vs价格
axes[0, 2].scatter(df['age'], df['price'], alpha=0.5)
axes[0, 2].set_title('房龄 vs 价格')
axes[0, 2].set_xlabel('房龄')
axes[0, 2].set_ylabel('价格')
axes[0, 2].grid(True, alpha=0.3)
# 4. 地区价格分布
neighborhood_prices = df.groupby('neighborhood')['price'].mean().sort_values(ascending=False)
axes[1, 0].bar(range(len(neighborhood_prices)), neighborhood_prices.values)
axes[1, 0].set_title('不同地区平均房价')
axes[1, 0].set_xlabel('地区')
axes[1, 0].set_ylabel('平均价格')
axes[1, 0].set_xticks(range(len(neighborhood_prices)))
axes[1, 0].set_xticklabels(neighborhood_prices.index, rotation=45)
axes[1, 0].grid(True, alpha=0.3)
# 5. 房型价格分布
house_type_prices = df.groupby('house_type')['price'].mean().sort_values(ascending=False)
axes[1, 1].bar(range(len(house_type_prices)), house_type_prices.values)
axes[1, 1].set_title('不同房型平均房价')
axes[1, 1].set_xlabel('房型')
axes[1, 1].set_ylabel('平均价格')
axes[1, 1].set_xticks(range(len(house_type_prices)))
axes[1, 1].set_xticklabels(house_type_prices.index, rotation=45)
axes[1, 1].grid(True, alpha=0.3)
# 6. 卧室数量vs价格
bedroom_prices = df.groupby('bedrooms')['price'].mean()
axes[1, 2].plot(bedroom_prices.index, bedroom_prices.values, marker='o', linewidth=2)
axes[1, 2].set_title('卧室数量 vs 平均价格')
axes[1, 2].set_xlabel('卧室数量')
axes[1, 2].set_ylabel('平均价格')
axes[1, 2].grid(True, alpha=0.3)
# 7. 设施对价格的影响
amenities = ['has_pool', 'has_garden', 'has_parking', 'recently_renovated']
amenity_effects = []
for amenity in amenities:
with_amenity = df[df[amenity] == 1]['price'].mean()
without_amenity = df[df[amenity] == 0]['price'].mean()
amenity_effects.append(with_amenity - without_amenity)
axes[2, 0].bar(range(len(amenities)), amenity_effects)
axes[2, 0].set_title('设施对房价的影响')
axes[2, 0].set_xlabel('设施')
axes[2, 0].set_ylabel('价格差异')
axes[2, 0].set_xticks(range(len(amenities)))
axes[2, 0].set_xticklabels(['游泳池', '花园', '停车位', '最近装修'], rotation=45)
axes[2, 0].grid(True, alpha=0.3)
# 8. 相关性热图
numerical_features = ['area', 'bedrooms', 'bathrooms', 'age', 'garage_size', 'price']
corr_matrix = df[numerical_features].corr()
im = axes[2, 1].imshow(corr_matrix, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
axes[2, 1].set_xticks(range(len(corr_matrix.columns)))
axes[2, 1].set_yticks(range(len(corr_matrix.columns)))
axes[2, 1].set_xticklabels(corr_matrix.columns, rotation=45)
axes[2, 1].set_yticklabels(corr_matrix.columns)
axes[2, 1].set_title('数值特征相关性')
# 添加相关系数文本
for i in range(len(corr_matrix.columns)):
for j in range(len(corr_matrix.columns)):
text = axes[2, 1].text(j, i, f'{corr_matrix.iloc[i, j]:.2f}',
ha="center", va="center", color="black", fontsize=8)
# 9. 价格vs面积(按地区分色)
neighborhoods = df['neighborhood'].unique()
colors = plt.cm.Set1(np.linspace(0, 1, len(neighborhoods)))
for i, neighborhood in enumerate(neighborhoods):
subset = df[df['neighborhood'] == neighborhood]
axes[2, 2].scatter(subset['area'], subset['price'],
label=neighborhood, alpha=0.6, color=colors[i])
axes[2, 2].set_title('面积 vs 价格(按地区分类)')
axes[2, 2].set_xlabel('面积')
axes[2, 2].set_ylabel('价格')
axes[2, 2].legend()
axes[2, 2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def visualize_feature_importance(self, X, y, univariate_scores, rf_importance, lasso_coef, results):
"""可视化特征重要性分析"""
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
# 1. 单变量特征分数
top_10_indices = np.argsort(univariate_scores)[-10:]
top_10_features = X.columns[top_10_indices]
top_10_scores = univariate_scores[top_10_indices]
axes[0, 0].barh(range(len(top_10_features)), top_10_scores)
axes[0, 0].set_yticks(range(len(top_10_features)))
axes[0, 0].set_yticklabels(top_10_features)
axes[0, 0].set_title('单变量特征重要性(Top 10)')
axes[0, 0].set_xlabel('F分数')
axes[0, 0].grid(True, alpha=0.3)
# 2. 随机森林特征重要性
rf_top_10_indices = np.argsort(rf_importance)[-10:]
rf_top_10_features = X.columns[rf_top_10_indices]
rf_top_10_scores = rf_importance[rf_top_10_indices]
axes[0, 1].barh(range(len(rf_top_10_features)), rf_top_10_scores)
axes[0, 1].set_yticks(range(len(rf_top_10_features)))
axes[0, 1].set_yticklabels(rf_top_10_features)
axes[0, 1].set_title('随机森林特征重要性(Top 10)')
axes[0, 1].set_xlabel('重要性')
axes[0, 1].grid(True, alpha=0.3)
# 3. Lasso系数
lasso_top_10_indices = np.argsort(lasso_coef)[-10:]
lasso_top_10_features = X.columns[lasso_top_10_indices]
lasso_top_10_scores = lasso_coef[lasso_top_10_indices]
axes[0, 2].barh(range(len(lasso_top_10_features)), lasso_top_10_scores)
axes[0, 2].set_yticks(range(len(lasso_top_10_features)))
axes[0, 2].set_yticklabels(lasso_top_10_features)
axes[0, 2].set_title('Lasso系数绝对值(Top 10)')
axes[0, 2].set_xlabel('|系数|')
axes[0, 2].grid(True, alpha=0.3)
# 4. 模型性能比较
methods = list(results.keys())
test_r2_scores = [results[method]['test_r2'] for method in methods]
mae_scores = [results[method]['mae'] for method in methods]
n_features = [results[method]['n_features'] for method in methods]
x = np.arange(len(methods))
width = 0.35
axes[1, 0].bar(x, test_r2_scores)
axes[1, 0].set_title('不同特征选择方法的R²分数')
axes[1, 0].set_xlabel('方法')
axes[1, 0].set_ylabel('测试R²')
axes[1, 0].set_xticks(x)
axes[1, 0].set_xticklabels(methods, rotation=45)
axes[1, 0].grid(True, alpha=0.3)
# 5. MAE比较
axes[1, 1].bar(x, mae_scores, color='orange')
axes[1, 1].set_title('不同特征选择方法的MAE')
axes[1, 1].set_xlabel('方法')
axes[1, 1].set_ylabel('平均绝对误差')
axes[1, 1].set_xticks(x)
axes[1, 1].set_xticklabels(methods, rotation=45)
axes[1, 1].grid(True, alpha=0.3)
# 6. 特征数量vs性能
axes[1, 2].scatter(n_features, test_r2_scores, s=100, alpha=0.7)
for i, method in enumerate(methods):
axes[1, 2].annotate(method, (n_features[i], test_r2_scores[i]),
xytext=(5, 5), textcoords='offset points')
axes[1, 2].set_title('特征数量 vs 模型性能')
axes[1, 2].set_xlabel('特征数量')
axes[1, 2].set_ylabel('测试R²')
axes[1, 2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 演示综合案例
print("=== 房价预测特征工程综合案例 ===")
house_fe = HousePricePredictionFeatureEngineering()
house_data = house_fe.create_house_dataset()
house_features_data, feature_columns = house_fe.comprehensive_feature_engineering(house_data)
importance_results, selected_features, rfe_features = house_fe.feature_importance_analysis(house_features_data, feature_columns)
8.8 本章小结
8.8.1 特征工程核心概念回顾
本章我们深入学习了特征工程的各个方面:
特征工程概述
- 特征工程的定义和重要性
- 特征工程的工作流程
- 特征工程的最佳实践
数值特征处理(示例见下)
- 特征缩放与标准化
- 分布变换技术
- 离散化技术
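下面用一段最小示意代码串联这三项技术;数据与列名均为虚构示例,仅用于回顾用法:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, PowerTransformer
# 构造一个右偏分布的数值特征x(虚构数据)
df_demo = pd.DataFrame({'x': np.random.exponential(scale=2.0, size=1000)})
# 标准化:均值0、方差1
df_demo['x_scaled'] = StandardScaler().fit_transform(df_demo[['x']]).ravel()
# 分布变换:Yeo-Johnson让偏态分布更接近正态
df_demo['x_power'] = PowerTransformer(method='yeo-johnson').fit_transform(df_demo[['x']]).ravel()
# 离散化:等频分箱为4段,返回箱编号
df_demo['x_binned'] = pd.qcut(df_demo['x'], q=4, labels=False)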
类别特征处理(示例见下)
- 类别特征编码方法
- 高基数类别特征处理
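下面的示意代码回顾两种典型做法:低基数特征用One-hot编码,高基数特征用频数编码(数据为虚构示例):
import pandas as pd
df_demo = pd.DataFrame({
    'color': ['red', 'blue', 'red', 'green'],               # 低基数特征
    'city': ['Beijing', 'Shanghai', 'Beijing', 'Shenzhen']  # 假设为高基数特征
})
# 低基数:One-hot编码
df_demo = pd.get_dummies(df_demo, columns=['color'], prefix='color')
# 高基数:频数编码,用类别出现频率代替类别本身
freq = df_demo['city'].value_counts(normalize=True)
df_demo['city_freq'] = df_demo['city'].map(freq)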
特征构造(示例见下)
- 多项式特征
- 交互特征
- 聚合特征
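以下最小示例依次演示多项式、交互与聚合特征(数据为虚构):
import pandas as pd
from sklearn.preprocessing import PolynomialFeatures
df_demo = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [2, 3, 4, 5], 'group': ['x', 'x', 'y', 'y']})
# 多项式与交互特征:输出列依次为 [a, b, a^2, a*b, b^2]
poly = PolynomialFeatures(degree=2, include_bias=False)
poly_feats = poly.fit_transform(df_demo[['a', 'b']])
# 聚合特征:按组统计均值,再映射回每一行
df_demo['a_group_mean'] = df_demo.groupby('group')['a'].transform('mean')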
文本特征提取(示例见下)
- 词袋模型和TF-IDF
- 高级文本特征
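一个TF-IDF特征提取的最小示例(语料为虚构;get_feature_names_out假设使用scikit-learn 1.0+):
from sklearn.feature_extraction.text import TfidfVectorizer
corpus = ['machine learning is fun',
          'feature engineering matters',
          'machine learning needs good features']
# 一元词+二元词组,限制词表规模
vectorizer = TfidfVectorizer(ngram_range=(1, 2), max_features=50)
X_text = vectorizer.fit_transform(corpus)  # 稀疏矩阵,形状为 (文档数, 词项数)
print(vectorizer.get_feature_names_out()[:5])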
时间序列特征工程(示例见下)
- 时间特征提取
- 滞后特征和滚动窗口特征
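以下示意代码回顾时间特征提取与滞后/滚动窗口特征(序列为虚构示例):
import pandas as pd
# 带日期索引的销量序列(虚构数据)
sales = pd.Series(range(10), index=pd.date_range('2024-01-01', periods=10, freq='D'))
df_demo = sales.to_frame('sales')
# 时间特征提取
df_demo['dayofweek'] = df_demo.index.dayofweek
df_demo['month'] = df_demo.index.month
# 滞后特征与滚动窗口特征(开头若干行会是NaN)
df_demo['sales_lag_1'] = df_demo['sales'].shift(1)
df_demo['sales_rolling_mean_3'] = df_demo['sales'].rolling(window=3).mean()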
综合案例
- 房价预测特征工程实战
8.8.2 特征工程最佳实践
数据理解优先
- 深入理解业务背景
- 分析数据分布和特征关系
- 识别数据质量问题
特征工程策略
- 从简单到复杂
- 保持特征的可解释性
- 避免数据泄露
特征验证(示例见下)
- 使用交叉验证评估特征效果
- 监控特征重要性
- 定期更新特征工程流程
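把缩放、特征选择等步骤放进Pipeline,再做交叉验证,可以保证每一折都只在训练折上拟合预处理器,从而在评估特征效果时避免信息泄露。下面是一个最小示意(数据为随机生成的示例):
from sklearn.datasets import make_classification
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
X_demo, y_demo = make_classification(n_samples=500, n_features=20, random_state=42)
# 预处理与特征选择都在Pipeline内部,交叉验证时逐折独立拟合
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('select', SelectKBest(f_classif, k=10)),
    ('clf', LogisticRegression(max_iter=1000)),
])
scores = cross_val_score(pipe, X_demo, y_demo, cv=5)
print(f"交叉验证准确率: {scores.mean():.3f} ± {scores.std():.3f}")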
8.8.3 常见陷阱与注意事项
数据泄露(示例见下)
- 避免使用未来信息
- 正确处理时间序列数据
- 谨慎使用目标编码
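对于时间序列数据,可以用TimeSeriesSplit划分数据,保证训练集始终位于测试集之前,不会用到"未来"的信息。最小示意如下(数据为虚构):
import numpy as np
from sklearn.model_selection import TimeSeriesSplit
X_demo = np.arange(20).reshape(-1, 1)  # 按时间顺序排列的样本
tscv = TimeSeriesSplit(n_splits=3)
for train_idx, test_idx in tscv.split(X_demo):
    # 每一折的训练索引都位于测试索引之前
    print(f"train: {train_idx[0]}~{train_idx[-1]}, test: {test_idx[0]}~{test_idx[-1]}")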
过度工程
- 避免创建过多无用特征
- 平衡模型复杂度和性能
- 考虑计算成本
特征选择
- 避免盲目删除特征
- 考虑特征间的交互作用
- 使用多种特征选择方法
8.8.4 进阶学习方向
自动化特征工程
- 学习AutoML工具
- 研究特征生成算法
- 探索深度特征学习
领域特定特征工程
- 图像特征工程
- 音频特征工程
- 网络数据特征工程
大规模特征工程
- 分布式特征计算
- 特征存储和管理
- 实时特征工程
8.8.5 练习题
理论题
- 解释为什么特征工程在机器学习中如此重要。
- 比较不同的特征缩放方法,说明它们的适用场景。
- 什么是数据泄露?如何在特征工程中避免数据泄露?
实践题
- 使用本章学到的技术,对一个新的数据集进行完整的特征工程。
- 实现一个自动化的特征选择流程,比较不同方法的效果。
- 针对时间序列数据,设计并实现一套完整的特征工程方案。
项目题
- 选择一个实际的机器学习问题,从数据收集到特征工程,完成整个流程。
- 构建一个特征工程工具包,包含常用的特征处理函数。
- 研究某个特定领域的特征工程技术,并实现相关算法。
第8章完结
特征工程是机器学习成功的关键因素之一。通过本章的学习,你已经掌握了从基础的数值和类别特征处理,到高级的文本和时间序列特征工程的完整技能体系。记住,好的特征工程往往比复杂的算法更能提升模型性能。在实际项目中,要根据具体的业务场景和数据特点,灵活运用这些技术,持续优化特征工程流程。