4.1 章节概述
数据清洗是数据分析过程中最重要且耗时的步骤之一。真实世界的数据往往包含缺失值、重复记录、异常值和格式不一致等问题。本章将详细介绍如何使用Pandas进行数据清洗和预处理。
4.1.1 学习目标
- 掌握缺失值的检测和处理方法
- 学会识别和处理重复数据
- 理解数据类型转换的技巧
- 掌握异常值检测和处理
- 学习数据标准化和归一化
- 了解字符串数据的清洗方法
4.1.2 数据清洗流程
graph TD
A[原始数据] --> B[数据探索]
B --> C[缺失值处理]
C --> D[重复值处理]
D --> E[数据类型转换]
E --> F[异常值处理]
F --> G[数据标准化]
G --> H[清洗后数据]
4.2 缺失值处理
4.2.1 缺失值检测
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# 创建包含缺失值的示例数据
data = {
'name': ['Alice', 'Bob', None, 'David', 'Eve'],
'age': [25, 30, np.nan, 35, 28],
'salary': [50000, 60000, 55000, np.nan, 52000],
'department': ['IT', 'HR', 'IT', 'Finance', None]
}
df = pd.DataFrame(data)
# 检测缺失值
print("数据概览:")
print(df)
print("\n缺失值统计:")
print(df.isnull().sum())
# 缺失值比例
print("\n缺失值比例:")
print(df.isnull().mean() * 100)
# 检查每行的缺失值情况
print("\n每行缺失值数量:")
print(df.isnull().sum(axis=1))
# 可视化缺失值模式
plt.figure(figsize=(10, 6))
sns.heatmap(df.isnull(), yticklabels=False, cbar=True, cmap='viridis')
plt.title('缺失值模式可视化')
plt.show()
4.2.2 缺失值处理策略
# 1. 删除缺失值
# 删除包含任何缺失值的行
df_drop_any = df.dropna()
print("删除任何缺失值后:")
print(df_drop_any)
# 删除所有值都为缺失的行
df_drop_all = df.dropna(how='all')
print("\n删除全部缺失值后:")
print(df_drop_all)
# 删除特定列的缺失值
df_drop_subset = df.dropna(subset=['age'])
print("\n删除age列缺失值后:")
print(df_drop_subset)
# 2. 填充缺失值
# 用固定值填充
df_fill_value = df.fillna('Unknown')
print("\n用固定值填充:")
print(df_fill_value)
# 用前一个值填充(前向填充)
df_ffill = df.fillna(method='ffill')
print("\n前向填充:")
print(df_ffill)
# 用后一个值填充(后向填充)
df_bfill = df.fillna(method='bfill')
print("\n后向填充:")
print(df_bfill)
# 用均值填充数值列
df_mean_fill = df.copy()
df_mean_fill['age'].fillna(df['age'].mean(), inplace=True)
df_mean_fill['salary'].fillna(df['salary'].mean(), inplace=True)
print("\n用均值填充数值列:")
print(df_mean_fill)
# 用众数填充分类列
df_mode_fill = df.copy()
df_mode_fill['department'].fillna(df['department'].mode()[0], inplace=True)
print("\n用众数填充分类列:")
print(df_mode_fill)
4.2.3 高级缺失值处理
# 插值填充
dates = pd.date_range('2023-01-01', periods=10, freq='D')
values = [1, 2, np.nan, 4, np.nan, 6, 7, np.nan, 9, 10]
ts = pd.Series(values, index=dates)
print("原始时间序列:")
print(ts)
# 线性插值
ts_linear = ts.interpolate(method='linear')
print("\n线性插值:")
print(ts_linear)
# 时间插值
ts_time = ts.interpolate(method='time')
print("\n时间插值:")
print(ts_time)
# 多项式插值
ts_poly = ts.interpolate(method='polynomial', order=2)
print("\n多项式插值:")
print(ts_poly)
# 基于分组的填充
# 创建更复杂的数据
np.random.seed(42)
data_complex = {
'group': ['A', 'A', 'B', 'B', 'A', 'B', 'A', 'B'],
'value': [1, np.nan, 3, 4, np.nan, 6, 7, np.nan]
}
df_complex = pd.DataFrame(data_complex)
# 按组填充均值
df_group_fill = df_complex.copy()
df_group_fill['value'] = df_group_fill.groupby('group')['value'].transform(
lambda x: x.fillna(x.mean())
)
print("\n按组填充均值:")
print(df_group_fill)
4.3 重复数据处理
4.3.1 重复数据检测
# 创建包含重复数据的示例
data_dup = {
'name': ['Alice', 'Bob', 'Alice', 'David', 'Bob', 'Eve'],
'age': [25, 30, 25, 35, 30, 28],
'city': ['NY', 'LA', 'NY', 'Chicago', 'LA', 'Boston']
}
df_dup = pd.DataFrame(data_dup)
print("原始数据:")
print(df_dup)
# 检测重复行
print("\n重复行检测:")
print(df_dup.duplicated())
# 显示重复的行
print("\n重复的行:")
print(df_dup[df_dup.duplicated()])
# 检测特定列的重复
print("\n基于name列的重复:")
print(df_dup.duplicated(subset=['name']))
# 统计重复数量
print(f"\n总重复行数:{df_dup.duplicated().sum()}")
print(f"基于name的重复数:{df_dup.duplicated(subset=['name']).sum()}")
4.3.2 重复数据处理
# 删除重复行(保留第一个)
df_drop_dup = df_dup.drop_duplicates()
print("删除重复行后:")
print(df_drop_dup)
# 删除重复行(保留最后一个)
df_drop_dup_last = df_dup.drop_duplicates(keep='last')
print("\n删除重复行(保留最后一个):")
print(df_drop_dup_last)
# 基于特定列删除重复
df_drop_dup_subset = df_dup.drop_duplicates(subset=['name'])
print("\n基于name列删除重复:")
print(df_drop_dup_subset)
# 标记重复数据
df_dup['is_duplicate'] = df_dup.duplicated()
print("\n标记重复数据:")
print(df_dup)
4.4 数据类型转换
4.4.1 基本类型转换
# 创建混合类型数据
data_types = {
'id': ['1', '2', '3', '4', '5'],
'score': ['85.5', '92.0', '78.5', '95.0', '88.5'],
'date': ['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04', '2023-01-05'],
'is_active': ['True', 'False', 'True', 'True', 'False']
}
df_types = pd.DataFrame(data_types)
print("原始数据类型:")
print(df_types.dtypes)
print("\n原始数据:")
print(df_types)
# 转换数据类型
df_converted = df_types.copy()
# 转换为数值类型
df_converted['id'] = pd.to_numeric(df_converted['id'])
df_converted['score'] = pd.to_numeric(df_converted['score'])
# 转换为日期类型
df_converted['date'] = pd.to_datetime(df_converted['date'])
# 转换为布尔类型
df_converted['is_active'] = df_converted['is_active'].map({'True': True, 'False': False})
print("\n转换后的数据类型:")
print(df_converted.dtypes)
print("\n转换后的数据:")
print(df_converted)
4.4.2 分类数据处理
# 创建分类数据
categories = ['Low', 'Medium', 'High', 'Medium', 'Low', 'High', 'Medium']
df_cat = pd.DataFrame({'level': categories})
print("原始分类数据:")
print(df_cat)
print(f"数据类型:{df_cat['level'].dtype}")
# 转换为分类类型
df_cat['level'] = df_cat['level'].astype('category')
print(f"\n转换后数据类型:{df_cat['level'].dtype}")
print(f"分类:{df_cat['level'].cat.categories}")
# 有序分类
df_cat['level'] = pd.Categorical(df_cat['level'],
categories=['Low', 'Medium', 'High'],
ordered=True)
print(f"\n有序分类:{df_cat['level'].dtype}")
# 分类编码
df_cat['level_code'] = df_cat['level'].cat.codes
print("\n分类编码:")
print(df_cat)
# 独热编码
df_dummies = pd.get_dummies(df_cat['level'], prefix='level')
print("\n独热编码:")
print(df_dummies)
4.5 异常值检测与处理
4.5.1 统计方法检测异常值
# 创建包含异常值的数据
np.random.seed(42)
normal_data = np.random.normal(50, 10, 100)
outliers = [120, 130, -20, -30] # 人工添加异常值
data_with_outliers = np.concatenate([normal_data, outliers])
df_outliers = pd.DataFrame({'value': data_with_outliers})
print("数据统计信息:")
print(df_outliers.describe())
# 1. 使用IQR方法检测异常值
Q1 = df_outliers['value'].quantile(0.25)
Q3 = df_outliers['value'].quantile(0.75)
IQR = Q3 - Q1
# 定义异常值边界
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
print(f"\nIQR方法:")
print(f"下边界:{lower_bound:.2f}")
print(f"上边界:{upper_bound:.2f}")
# 标记异常值
df_outliers['is_outlier_iqr'] = (df_outliers['value'] < lower_bound) | (df_outliers['value'] > upper_bound)
outliers_iqr = df_outliers[df_outliers['is_outlier_iqr']]
print(f"IQR方法检测到的异常值数量:{len(outliers_iqr)}")
# 2. 使用Z-score方法
from scipy import stats
df_outliers['z_score'] = np.abs(stats.zscore(df_outliers['value']))
df_outliers['is_outlier_zscore'] = df_outliers['z_score'] > 3
outliers_zscore = df_outliers[df_outliers['is_outlier_zscore']]
print(f"Z-score方法检测到的异常值数量:{len(outliers_zscore)}")
# 可视化异常值
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# 箱线图
axes[0].boxplot(df_outliers['value'])
axes[0].set_title('箱线图')
axes[0].set_ylabel('值')
# 散点图
axes[1].scatter(range(len(df_outliers)), df_outliers['value'],
c=df_outliers['is_outlier_iqr'], cmap='coolwarm')
axes[1].set_title('IQR异常值检测')
axes[1].set_xlabel('索引')
axes[1].set_ylabel('值')
# 直方图
axes[2].hist(df_outliers['value'], bins=20, alpha=0.7)
axes[2].axvline(lower_bound, color='r', linestyle='--', label='下边界')
axes[2].axvline(upper_bound, color='r', linestyle='--', label='上边界')
axes[2].set_title('数据分布')
axes[2].set_xlabel('值')
axes[2].set_ylabel('频次')
axes[2].legend()
plt.tight_layout()
plt.show()
4.5.2 异常值处理策略
# 异常值处理方法
# 1. 删除异常值
df_no_outliers = df_outliers[~df_outliers['is_outlier_iqr']].copy()
print(f"删除异常值后数据量:{len(df_no_outliers)}")
# 2. 截断异常值
df_capped = df_outliers.copy()
df_capped['value_capped'] = df_capped['value'].clip(lower=lower_bound, upper=upper_bound)
print("\n截断异常值:")
print(df_capped[['value', 'value_capped']].describe())
# 3. 用边界值替换
df_replaced = df_outliers.copy()
df_replaced.loc[df_replaced['value'] < lower_bound, 'value'] = lower_bound
df_replaced.loc[df_replaced['value'] > upper_bound, 'value'] = upper_bound
# 4. 用中位数替换
df_median_replaced = df_outliers.copy()
median_value = df_median_replaced['value'].median()
df_median_replaced.loc[df_median_replaced['is_outlier_iqr'], 'value'] = median_value
print(f"\n用中位数替换异常值后的统计:")
print(df_median_replaced['value'].describe())
4.6 数据标准化与归一化
4.6.1 标准化方法
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
# 创建示例数据
np.random.seed(42)
data_scale = {
'feature1': np.random.normal(100, 15, 100),
'feature2': np.random.normal(50, 5, 100),
'feature3': np.random.exponential(2, 100)
}
df_scale = pd.DataFrame(data_scale)
print("原始数据统计:")
print(df_scale.describe())
# 1. Z-score标准化(StandardScaler)
scaler_standard = StandardScaler()
df_standard = pd.DataFrame(
scaler_standard.fit_transform(df_scale),
columns=[f'{col}_standard' for col in df_scale.columns]
)
print("\nZ-score标准化后:")
print(df_standard.describe())
# 2. Min-Max归一化
scaler_minmax = MinMaxScaler()
df_minmax = pd.DataFrame(
scaler_minmax.fit_transform(df_scale),
columns=[f'{col}_minmax' for col in df_scale.columns]
)
print("\nMin-Max归一化后:")
print(df_minmax.describe())
# 3. 鲁棒标准化(RobustScaler)
scaler_robust = RobustScaler()
df_robust = pd.DataFrame(
scaler_robust.fit_transform(df_scale),
columns=[f'{col}_robust' for col in df_scale.columns]
)
print("\n鲁棒标准化后:")
print(df_robust.describe())
# 可视化比较
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# 原始数据
df_scale.boxplot(ax=axes[0, 0])
axes[0, 0].set_title('原始数据')
# 标准化数据
df_standard.boxplot(ax=axes[0, 1])
axes[0, 1].set_title('Z-score标准化')
# 归一化数据
df_minmax.boxplot(ax=axes[1, 0])
axes[1, 0].set_title('Min-Max归一化')
# 鲁棒标准化数据
df_robust.boxplot(ax=axes[1, 1])
axes[1, 1].set_title('鲁棒标准化')
plt.tight_layout()
plt.show()
4.6.2 自定义标准化函数
def custom_normalize(series, method='zscore'):
"""
自定义标准化函数
Parameters:
series: pandas Series
method: str, 标准化方法 ('zscore', 'minmax', 'robust')
Returns:
pandas Series: 标准化后的数据
"""
if method == 'zscore':
return (series - series.mean()) / series.std()
elif method == 'minmax':
return (series - series.min()) / (series.max() - series.min())
elif method == 'robust':
median = series.median()
mad = (series - median).abs().median()
return (series - median) / mad
else:
raise ValueError("方法必须是 'zscore', 'minmax', 或 'robust'")
# 应用自定义标准化
df_custom = df_scale.copy()
for col in df_custom.columns:
df_custom[f'{col}_custom_zscore'] = custom_normalize(df_custom[col], 'zscore')
df_custom[f'{col}_custom_minmax'] = custom_normalize(df_custom[col], 'minmax')
print("自定义标准化结果:")
print(df_custom.describe())
4.7 字符串数据清洗
4.7.1 基本字符串清洗
# 创建包含脏字符串数据的示例
dirty_strings = [
" Alice Johnson ",
"bob@email.com",
"CHARLIE BROWN",
"david_smith123",
"Eve O'Connor",
"frank.miller@company.co.uk",
" GRACE LEE ",
None
]
df_strings = pd.DataFrame({'name': dirty_strings})
print("原始字符串数据:")
print(df_strings)
# 基本清洗操作
df_clean = df_strings.copy()
# 去除首尾空格
df_clean['name_stripped'] = df_clean['name'].str.strip()
# 转换为小写
df_clean['name_lower'] = df_clean['name'].str.lower()
# 转换为标题格式
df_clean['name_title'] = df_clean['name'].str.title()
# 替换特殊字符
df_clean['name_clean'] = df_clean['name'].str.replace(r'[^a-zA-Z\s]', '', regex=True)
print("\n字符串清洗结果:")
print(df_clean)
4.7.2 高级字符串处理
import re
# 创建更复杂的字符串数据
complex_data = {
'email': ['alice@gmail.com', 'bob@yahoo.co.uk', 'invalid-email', 'charlie@company.org'],
'phone': ['123-456-7890', '(555) 123-4567', '555.123.4567', '1234567890'],
'address': ['123 Main St, New York, NY 10001', '456 Oak Ave, Los Angeles, CA 90210',
'789 Pine Rd, Chicago, IL 60601', 'Invalid Address']
}
df_complex = pd.DataFrame(complex_data)
print("复杂字符串数据:")
print(df_complex)
# 邮箱验证和清洗
def validate_email(email):
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, str(email)))
df_complex['email_valid'] = df_complex['email'].apply(validate_email)
# 提取邮箱域名
df_complex['email_domain'] = df_complex['email'].str.extract(r'@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})')
# 电话号码标准化
def clean_phone(phone):
# 移除所有非数字字符
digits = re.sub(r'\D', '', str(phone))
# 格式化为 XXX-XXX-XXXX
if len(digits) == 10:
return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}"
else:
return "Invalid"
df_complex['phone_clean'] = df_complex['phone'].apply(clean_phone)
# 地址解析
def parse_address(address):
# 简单的地址解析示例
parts = str(address).split(', ')
if len(parts) >= 3:
return {
'street': parts[0],
'city': parts[1],
'state_zip': parts[2]
}
else:
return {'street': None, 'city': None, 'state_zip': None}
address_parsed = df_complex['address'].apply(parse_address)
df_address = pd.json_normalize(address_parsed)
df_complex = pd.concat([df_complex, df_address], axis=1)
print("\n字符串处理结果:")
print(df_complex)
4.8 数据质量评估
4.8.1 数据质量指标
def data_quality_report(df):
"""
生成数据质量报告
"""
report = {}
# 基本信息
report['总行数'] = len(df)
report['总列数'] = len(df.columns)
# 缺失值分析
missing_count = df.isnull().sum()
missing_percent = (missing_count / len(df)) * 100
report['缺失值统计'] = pd.DataFrame({
'缺失数量': missing_count,
'缺失比例(%)': missing_percent
})
# 重复值分析
report['重复行数'] = df.duplicated().sum()
report['重复比例(%)'] = (df.duplicated().sum() / len(df)) * 100
# 数据类型分析
dtype_counts = df.dtypes.value_counts()
report['数据类型分布'] = dtype_counts
# 唯一值分析
unique_counts = df.nunique()
report['唯一值统计'] = unique_counts
return report
# 创建示例数据进行质量评估
np.random.seed(42)
sample_data = {
'id': range(1000),
'name': ['User_' + str(i) if i % 10 != 0 else None for i in range(1000)],
'age': [np.random.randint(18, 80) if i % 20 != 0 else None for i in range(1000)],
'score': np.random.normal(75, 15, 1000),
'category': np.random.choice(['A', 'B', 'C'], 1000)
}
# 添加一些重复行
sample_df = pd.DataFrame(sample_data)
sample_df = pd.concat([sample_df, sample_df.iloc[:50]], ignore_index=True)
# 生成质量报告
quality_report = data_quality_report(sample_df)
print("数据质量报告:")
print(f"总行数:{quality_report['总行数']}")
print(f"总列数:{quality_report['总列数']}")
print(f"重复行数:{quality_report['重复行数']}")
print(f"重复比例:{quality_report['重复比例(%)']:.2f}%")
print("\n缺失值统计:")
print(quality_report['缺失值统计'])
print("\n数据类型分布:")
print(quality_report['数据类型分布'])
print("\n唯一值统计:")
print(quality_report['唯一值统计'])
4.8.2 数据清洗管道
class DataCleaningPipeline:
"""
数据清洗管道类
"""
def __init__(self):
self.steps = []
self.log = []
def add_step(self, func, name, **kwargs):
"""添加清洗步骤"""
self.steps.append((func, name, kwargs))
return self
def fit_transform(self, df):
"""执行清洗管道"""
result = df.copy()
for func, name, kwargs in self.steps:
before_shape = result.shape
result = func(result, **kwargs)
after_shape = result.shape
log_entry = {
'step': name,
'before_shape': before_shape,
'after_shape': after_shape,
'rows_changed': before_shape[0] - after_shape[0],
'cols_changed': before_shape[1] - after_shape[1]
}
self.log.append(log_entry)
return result
def get_log(self):
"""获取清洗日志"""
return pd.DataFrame(self.log)
# 定义清洗函数
def remove_duplicates(df):
return df.drop_duplicates()
def handle_missing_values(df, strategy='drop'):
if strategy == 'drop':
return df.dropna()
elif strategy == 'fill_mean':
numeric_cols = df.select_dtypes(include=[np.number]).columns
df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].mean())
return df
def remove_outliers(df, columns=None, method='iqr'):
if columns is None:
columns = df.select_dtypes(include=[np.number]).columns
result = df.copy()
for col in columns:
if method == 'iqr':
Q1 = result[col].quantile(0.25)
Q3 = result[col].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR
result = result[(result[col] >= lower) & (result[col] <= upper)]
return result
# 创建和使用清洗管道
pipeline = DataCleaningPipeline()
pipeline.add_step(remove_duplicates, "去除重复值")
pipeline.add_step(handle_missing_values, "处理缺失值", strategy='fill_mean')
pipeline.add_step(remove_outliers, "去除异常值", columns=['score'])
# 应用管道
cleaned_data = pipeline.fit_transform(sample_df)
print("清洗管道执行日志:")
print(pipeline.get_log())
print(f"\n原始数据形状:{sample_df.shape}")
print(f"清洗后数据形状:{cleaned_data.shape}")
4.9 本章小结
4.9.1 核心知识点
缺失值处理
- 检测:
isnull()
,isna()
,info()
- 删除:
dropna()
- 填充:
fillna()
,interpolate()
- 策略:删除、固定值、统计值、插值
- 检测:
重复数据处理
- 检测:
duplicated()
- 删除:
drop_duplicates()
- 保留策略:first, last, False
- 检测:
数据类型转换
- 数值转换:
pd.to_numeric()
- 日期转换:
pd.to_datetime()
- 分类转换:
astype('category')
- 编码:
get_dummies()
,cat.codes
- 数值转换:
异常值处理
- 检测方法:IQR、Z-score、统计方法
- 处理策略:删除、截断、替换
数据标准化
- Z-score标准化:均值0,标准差1
- Min-Max归一化:缩放到[0,1]
- 鲁棒标准化:基于中位数和MAD
字符串清洗
- 基本操作:
str.strip()
,str.lower()
,str.replace()
- 正则表达式:模式匹配和提取
- 验证:邮箱、电话等格式验证
- 基本操作:
4.9.2 最佳实践
- 在清洗前先进行数据探索和质量评估
- 记录清洗步骤和参数,确保可重现性
- 使用管道化方法组织清洗流程
- 保留原始数据的备份
- 验证清洗结果的合理性
4.9.3 下一步学习
在下一章中,我们将学习: - 数据选择和过滤技术 - 条件查询和布尔索引 - 多条件筛选 - 数据切片和采样
练习题
- 设计一个完整的数据清洗流程处理真实数据集
- 比较不同缺失值处理方法的效果
- 实现一个自动化的异常值检测系统
- 创建一个数据质量评估工具
- 处理包含多种格式问题的字符串数据
记住:数据清洗是数据分析成功的基础,投入时间进行彻底的清洗将大大提高后续分析的质量!