7.1 章节概述
在实际数据分析中,我们经常需要将来自不同数据源的数据进行合并和连接。本章将详细介绍Pandas中的数据合并操作,包括merge、join、concat等方法,以及如何处理各种复杂的数据连接场景。
7.1.1 学习目标
- 掌握merge操作的各种连接类型
- 学会使用join方法进行索引连接
- 理解concat操作的数据拼接
- 学习处理重复数据和冲突解决
- 掌握多表连接和复杂合并场景
- 了解合并操作的性能优化技巧
7.1.2 数据合并类型
graph TD
A[数据合并] --> B[merge 合并]
A --> C[join 连接]
A --> D[concat 拼接]
B --> B1[inner join 内连接]
B --> B2[left join 左连接]
B --> B3[right join 右连接]
B --> B4[outer join 外连接]
C --> C1[基于索引连接]
C --> C2[多级索引连接]
D --> D1[垂直拼接]
D --> D2[水平拼接]
D --> D3[多层拼接]
7.2 数据准备
7.2.1 创建示例数据
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# 创建员工信息表
employees = pd.DataFrame({
'emp_id': [1, 2, 3, 4, 5, 6, 7, 8],
'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve', 'Frank', 'Grace', 'Henry'],
'department': ['IT', 'HR', 'Finance', 'IT', 'Marketing', 'IT', 'HR', 'Finance'],
'hire_date': pd.date_range('2020-01-01', periods=8, freq='3M'),
'salary': [75000, 65000, 70000, 80000, 72000, 78000, 68000, 73000]
})
# 创建部门信息表
departments = pd.DataFrame({
'dept_name': ['IT', 'HR', 'Finance', 'Marketing', 'Sales'],
'manager': ['John', 'Sarah', 'Mike', 'Lisa', 'Tom'],
'budget': [500000, 200000, 300000, 250000, 400000],
'location': ['Building A', 'Building B', 'Building A', 'Building C', 'Building B']
})
# 创建项目信息表
projects = pd.DataFrame({
'project_id': ['P001', 'P002', 'P003', 'P004', 'P005'],
'project_name': ['Website Redesign', 'HR System', 'Budget Analysis', 'Marketing Campaign', 'Sales Portal'],
'department': ['IT', 'HR', 'Finance', 'Marketing', 'Sales'],
'budget': [100000, 50000, 30000, 80000, 120000],
'status': ['Active', 'Completed', 'Active', 'Planning', 'Active']
})
# 创建员工项目分配表
emp_projects = pd.DataFrame({
'emp_id': [1, 1, 2, 3, 4, 4, 5, 6, 7, 8],
'project_id': ['P001', 'P003', 'P002', 'P003', 'P001', 'P004', 'P004', 'P001', 'P002', 'P003'],
'role': ['Lead', 'Analyst', 'Lead', 'Lead', 'Developer', 'Analyst', 'Lead', 'Developer', 'Analyst', 'Analyst'],
'allocation': [0.8, 0.2, 1.0, 1.0, 0.6, 0.4, 1.0, 0.5, 1.0, 1.0]
})
# 创建销售数据表
sales_q1 = pd.DataFrame({
'emp_id': [1, 2, 3, 4, 5],
'q1_sales': [120000, 95000, 110000, 130000, 105000],
'q1_target': [100000, 90000, 100000, 120000, 100000]
})
sales_q2 = pd.DataFrame({
'emp_id': [1, 2, 3, 4, 6],
'q2_sales': [135000, 88000, 125000, 140000, 115000],
'q2_target': [110000, 95000, 110000, 130000, 110000]
})
print("示例数据创建完成:")
print(f"员工表:{employees.shape}")
print(f"部门表:{departments.shape}")
print(f"项目表:{projects.shape}")
print(f"员工项目表:{emp_projects.shape}")
print(f"Q1销售表:{sales_q1.shape}")
print(f"Q2销售表:{sales_q2.shape}")
# 显示数据样例
print("\n员工信息表:")
print(employees)
print("\n部门信息表:")
print(departments)
7.2.2 数据概览
# 数据概览
print("数据概览:")
print("1. 员工信息表结构:")
print(employees.info())
print(f"员工数量:{len(employees)}")
print(f"部门分布:\n{employees['department'].value_counts()}")
print("\n2. 部门信息表结构:")
print(departments.info())
print(f"部门数量:{len(departments)}")
print("\n3. 项目信息表结构:")
print(projects.info())
print(f"项目状态分布:\n{projects['status'].value_counts()}")
print("\n4. 员工项目分配表结构:")
print(emp_projects.info())
print(f"角色分布:\n{emp_projects['role'].value_counts()}")
# 检查数据质量
print("\n5. 数据质量检查:")
print("员工表缺失值:", employees.isnull().sum().sum())
print("部门表缺失值:", departments.isnull().sum().sum())
print("项目表缺失值:", projects.isnull().sum().sum())
print("员工项目表缺失值:", emp_projects.isnull().sum().sum())
# 检查键的唯一性
print("\n6. 键的唯一性检查:")
print(f"员工ID唯一性:{employees['emp_id'].is_unique}")
print(f"部门名称唯一性:{departments['dept_name'].is_unique}")
print(f"项目ID唯一性:{projects['project_id'].is_unique}")
7.3 Merge操作详解
7.3.1 基本Merge操作
# 基本Merge操作
print("基本Merge操作:")
# 内连接(Inner Join)
print("1. 内连接 - 员工与部门信息:")
inner_merge = pd.merge(employees, departments, left_on='department', right_on='dept_name', how='inner')
print(f"内连接结果行数:{len(inner_merge)}")
print(inner_merge[['name', 'department', 'manager', 'budget', 'location']].head())
# 左连接(Left Join)
print("\n2. 左连接 - 保留所有员工:")
left_merge = pd.merge(employees, departments, left_on='department', right_on='dept_name', how='left')
print(f"左连接结果行数:{len(left_merge)}")
print("缺失部门信息的员工:")
missing_dept = left_merge[left_merge['manager'].isnull()]
if not missing_dept.empty:
print(missing_dept[['name', 'department']])
else:
print("所有员工都有对应的部门信息")
# 右连接(Right Join)
print("\n3. 右连接 - 保留所有部门:")
right_merge = pd.merge(employees, departments, left_on='department', right_on='dept_name', how='right')
print(f"右连接结果行数:{len(right_merge)}")
print("没有员工的部门:")
no_employees = right_merge[right_merge['name'].isnull()]
if not no_employees.empty:
print(no_employees[['dept_name', 'manager']])
else:
print("所有部门都有员工")
# 外连接(Outer Join)
print("\n4. 外连接 - 保留所有记录:")
outer_merge = pd.merge(employees, departments, left_on='department', right_on='dept_name', how='outer')
print(f"外连接结果行数:{len(outer_merge)}")
print("外连接缺失值统计:")
print(outer_merge.isnull().sum())
7.3.2 多列连接
# 多列连接
print("多列连接:")
# 创建包含多个连接键的数据
emp_extended = employees.copy()
emp_extended['dept_code'] = emp_extended['department'].map({
'IT': 'IT01', 'HR': 'HR01', 'Finance': 'FN01', 'Marketing': 'MK01'
})
dept_extended = departments.copy()
dept_extended['dept_code'] = dept_extended['dept_name'].map({
'IT': 'IT01', 'HR': 'HR01', 'Finance': 'FN01', 'Marketing': 'MK01', 'Sales': 'SL01'
})
print("1. 多列连接示例:")
multi_key_merge = pd.merge(
emp_extended,
dept_extended,
left_on=['department', 'dept_code'],
right_on=['dept_name', 'dept_code'],
how='inner'
)
print(f"多列连接结果:{len(multi_key_merge)}行")
print(multi_key_merge[['name', 'department', 'dept_code', 'manager']].head())
# 处理列名冲突
print("\n2. 处理列名冲突:")
# 创建有冲突列名的数据
emp_with_budget = employees.copy()
emp_with_budget['budget'] = emp_with_budget['salary'] * 0.1 # 个人预算
conflict_merge = pd.merge(
emp_with_budget,
departments,
left_on='department',
right_on='dept_name',
how='inner',
suffixes=('_personal', '_dept')
)
print("列名冲突处理结果:")
print(conflict_merge[['name', 'budget_personal', 'budget_dept']].head())
7.3.3 复杂连接场景
# 复杂连接场景
print("复杂连接场景:")
# 一对多连接
print("1. 一对多连接 - 员工与项目分配:")
emp_project_merge = pd.merge(employees, emp_projects, on='emp_id', how='left')
print(f"员工项目连接结果:{len(emp_project_merge)}行")
print("员工项目分配情况:")
print(emp_project_merge[['name', 'project_id', 'role', 'allocation']].head(10))
# 多对多连接
print("\n2. 多对多连接 - 完整项目信息:")
full_project_info = pd.merge(
pd.merge(employees, emp_projects, on='emp_id'),
projects, on='project_id', how='inner'
)
print(f"完整项目信息:{len(full_project_info)}行")
print("项目团队信息:")
print(full_project_info[['name', 'project_name', 'role', 'allocation', 'status']].head())
# 链式连接
print("\n3. 链式连接 - 员工-部门-项目完整信息:")
complete_info = (employees
.merge(departments, left_on='department', right_on='dept_name', how='left')
.merge(emp_projects, on='emp_id', how='left')
.merge(projects, on='project_id', how='left'))
print(f"完整信息表:{len(complete_info)}行")
print("完整信息样例:")
print(complete_info[['name', 'department', 'manager', 'project_name', 'role']].head())
# 条件连接
print("\n4. 条件连接 - 活跃项目的员工:")
active_projects = projects[projects['status'] == 'Active']
active_emp_projects = pd.merge(
pd.merge(employees, emp_projects, on='emp_id'),
active_projects, on='project_id', how='inner'
)
print(f"参与活跃项目的员工:{len(active_emp_projects)}行")
print("活跃项目参与者:")
active_summary = active_emp_projects.groupby('name').agg({
'project_name': 'count',
'allocation': 'sum'
}).rename(columns={'project_name': 'active_projects', 'allocation': 'total_allocation'})
print(active_summary)
7.3.4 Merge性能优化
# Merge性能优化
print("Merge性能优化:")
# 创建大数据集进行性能测试
import time
large_left = pd.DataFrame({
'key': np.random.randint(0, 10000, 100000),
'value1': np.random.randn(100000)
})
large_right = pd.DataFrame({
'key': np.random.randint(0, 5000, 50000),
'value2': np.random.randn(50000)
})
print(f"左表大小:{large_left.shape}")
print(f"右表大小:{large_right.shape}")
# 1. 基本merge性能
print("\n1. 基本merge性能测试:")
start_time = time.time()
basic_merge = pd.merge(large_left, large_right, on='key', how='inner')
basic_time = time.time() - start_time
print(f"基本merge时间:{basic_time:.4f}秒,结果行数:{len(basic_merge)}")
# 2. 排序优化
print("\n2. 排序优化测试:")
large_left_sorted = large_left.sort_values('key')
large_right_sorted = large_right.sort_values('key')
start_time = time.time()
sorted_merge = pd.merge(large_left_sorted, large_right_sorted, on='key', how='inner')
sorted_time = time.time() - start_time
print(f"排序后merge时间:{sorted_time:.4f}秒")
print(f"性能提升:{basic_time/sorted_time:.2f}倍")
# 3. 索引优化
print("\n3. 索引优化测试:")
large_left_indexed = large_left.set_index('key')
large_right_indexed = large_right.set_index('key')
start_time = time.time()
indexed_merge = large_left_indexed.join(large_right_indexed, how='inner')
indexed_time = time.time() - start_time
print(f"索引join时间:{indexed_time:.4f}秒")
print(f"相对基本merge提升:{basic_time/indexed_time:.2f}倍")
# 4. 分类数据类型优化
print("\n4. 分类数据类型优化:")
large_left['key_cat'] = large_left['key'].astype('category')
large_right['key_cat'] = large_right['key'].astype('category')
start_time = time.time()
category_merge = pd.merge(large_left, large_right, on='key_cat', how='inner')
category_time = time.time() - start_time
print(f"分类类型merge时间:{category_time:.4f}秒")
print(f"相对基本merge提升:{basic_time/category_time:.2f}倍")
7.4 Join操作
7.4.1 基本Join操作
# 基本Join操作
print("基本Join操作:")
# 设置索引进行join
emp_indexed = employees.set_index('emp_id')
sales_q1_indexed = sales_q1.set_index('emp_id')
sales_q2_indexed = sales_q2.set_index('emp_id')
print("1. 基本索引join:")
emp_sales_q1 = emp_indexed.join(sales_q1_indexed, how='left')
print(f"员工Q1销售数据:{len(emp_sales_q1)}行")
print(emp_sales_q1[['name', 'department', 'q1_sales', 'q1_target']].head())
print("\n2. 多表join:")
emp_all_sales = emp_indexed.join([sales_q1_indexed, sales_q2_indexed], how='outer')
print(f"员工全年销售数据:{len(emp_all_sales)}行")
print(emp_all_sales[['name', 'q1_sales', 'q2_sales']].head())
# 不同join类型
print("\n3. 不同join类型对比:")
join_types = ['left', 'right', 'outer', 'inner']
for join_type in join_types:
result = emp_indexed.join(sales_q1_indexed, how=join_type)
print(f"{join_type} join: {len(result)}行")
# 处理列名冲突
print("\n4. Join中的列名冲突处理:")
emp_with_target = employees.set_index('emp_id')
emp_with_target['target'] = 100000 # 添加目标列
sales_with_target = sales_q1.set_index('emp_id')
sales_with_target['target'] = sales_with_target['q1_target'] # 重命名目标列
conflict_join = emp_with_target.join(sales_with_target, rsuffix='_sales')
print("列名冲突处理结果:")
print(conflict_join[['name', 'target', 'target_sales']].head())
7.4.2 多级索引Join
# 多级索引Join
print("多级索引Join:")
# 创建多级索引数据
emp_multi = employees.set_index(['department', 'emp_id'])
dept_multi = departments.set_index('dept_name')
print("1. 多级索引结构:")
print("员工多级索引:")
print(emp_multi.head())
# 多级索引join
print("\n2. 多级索引join操作:")
# 需要调整索引级别进行join
emp_dept_join = emp_multi.reset_index().set_index('department').join(
dept_multi, how='left'
)
print("多级索引join结果:")
print(emp_dept_join[['emp_id', 'name', 'manager', 'budget']].head())
# 复杂多级索引场景
print("\n3. 复杂多级索引场景:")
# 创建按部门和项目的多级索引
project_dept = projects.set_index(['department', 'project_id'])
emp_project_multi = emp_projects.merge(
employees[['emp_id', 'department']], on='emp_id'
).set_index(['department', 'project_id'])
multi_join = emp_project_multi.join(project_dept, how='left')
print("复杂多级索引join:")
print(multi_join[['emp_id', 'role', 'project_name', 'status']].head())
7.4.3 Join性能对比
# Join性能对比
print("Join性能对比:")
# 创建测试数据
test_left = pd.DataFrame({
'key': range(50000),
'value1': np.random.randn(50000)
}).set_index('key')
test_right = pd.DataFrame({
'key': range(25000, 75000),
'value2': np.random.randn(50000)
}).set_index('key')
print(f"测试数据大小 - 左表:{test_left.shape},右表:{test_right.shape}")
# Join vs Merge性能对比
print("\n1. Join vs Merge性能对比:")
# Join方法
start_time = time.time()
join_result = test_left.join(test_right, how='inner')
join_time = time.time() - start_time
# Merge方法
test_left_reset = test_left.reset_index()
test_right_reset = test_right.reset_index()
start_time = time.time()
merge_result = pd.merge(test_left_reset, test_right_reset, on='key', how='inner')
merge_time = time.time() - start_time
print(f"Join时间:{join_time:.4f}秒,结果:{len(join_result)}行")
print(f"Merge时间:{merge_time:.4f}秒,结果:{len(merge_result)}行")
print(f"Join相对Merge速度提升:{merge_time/join_time:.2f}倍")
# 不同join类型的性能
print("\n2. 不同join类型性能:")
join_types = ['left', 'right', 'outer', 'inner']
for join_type in join_types:
start_time = time.time()
result = test_left.join(test_right, how=join_type)
elapsed_time = time.time() - start_time
print(f"{join_type} join: {elapsed_time:.4f}秒,{len(result)}行")
7.5 Concat操作
7.5.1 基本Concat操作
# 基本Concat操作
print("基本Concat操作:")
# 垂直拼接(行拼接)
print("1. 垂直拼接 - 合并Q1和Q2销售数据:")
sales_combined = pd.concat([sales_q1, sales_q2], ignore_index=True)
print(f"拼接结果:{len(sales_combined)}行")
print(sales_combined.head(8))
# 处理重复索引
print("\n2. 处理重复索引:")
sales_with_keys = pd.concat([sales_q1, sales_q2], keys=['Q1', 'Q2'])
print("带键的拼接结果:")
print(sales_with_keys.head(8))
# 水平拼接(列拼接)
print("\n3. 水平拼接 - 合并Q1和Q2销售指标:")
sales_horizontal = pd.concat([
sales_q1.set_index('emp_id'),
sales_q2.set_index('emp_id')
], axis=1, join='outer')
print("水平拼接结果:")
print(sales_horizontal)
# 处理缺失值
print("\n4. 处理拼接中的缺失值:")
sales_inner = pd.concat([
sales_q1.set_index('emp_id'),
sales_q2.set_index('emp_id')
], axis=1, join='inner')
print("内连接拼接(无缺失值):")
print(sales_inner)
7.5.2 复杂Concat场景
# 复杂Concat场景
print("复杂Concat场景:")
# 多个DataFrame拼接
print("1. 多个DataFrame垂直拼接:")
# 创建多个月份的数据
monthly_data = []
for month in range(1, 4):
month_data = pd.DataFrame({
'emp_id': np.random.choice(employees['emp_id'], 5),
'month': month,
'sales': np.random.randint(10000, 50000, 5),
'expenses': np.random.randint(1000, 5000, 5)
})
monthly_data.append(month_data)
quarterly_data = pd.concat(monthly_data, ignore_index=True)
print(f"季度数据:{len(quarterly_data)}行")
print(quarterly_data.head(10))
# 带标识的拼接
print("\n2. 带标识的多DataFrame拼接:")
labeled_data = pd.concat(monthly_data, keys=['Jan', 'Feb', 'Mar'])
print("带月份标识的数据:")
print(labeled_data.head(10))
# 不同结构DataFrame的拼接
print("\n3. 不同结构DataFrame拼接:")
df1 = pd.DataFrame({
'A': [1, 2, 3],
'B': [4, 5, 6],
'C': [7, 8, 9]
})
df2 = pd.DataFrame({
'A': [10, 11],
'B': [12, 13],
'D': [14, 15] # 不同的列
})
df3 = pd.DataFrame({
'A': [20],
'E': [21] # 完全不同的列
})
mixed_concat = pd.concat([df1, df2, df3], ignore_index=True, sort=False)
print("不同结构拼接结果:")
print(mixed_concat)
# 填充缺失值
print("\n4. 拼接时填充缺失值:")
filled_concat = pd.concat([df1, df2, df3], ignore_index=True).fillna(0)
print("填充缺失值后:")
print(filled_concat)
7.5.3 Concat性能优化
# Concat性能优化
print("Concat性能优化:")
# 创建性能测试数据
test_dfs = []
for i in range(100):
df = pd.DataFrame({
'id': range(i*1000, (i+1)*1000),
'value': np.random.randn(1000),
'category': np.random.choice(['A', 'B', 'C'], 1000)
})
test_dfs.append(df)
print(f"测试数据:{len(test_dfs)}个DataFrame,每个{test_dfs[0].shape[0]}行")
# 1. 基本concat性能
print("\n1. 基本concat性能:")
start_time = time.time()
basic_concat = pd.concat(test_dfs, ignore_index=True)
basic_time = time.time() - start_time
print(f"基本concat时间:{basic_time:.4f}秒,结果:{basic_concat.shape}")
# 2. 预分配内存优化
print("\n2. 预分配内存优化:")
start_time = time.time()
# 预计算总行数
total_rows = sum(len(df) for df in test_dfs)
# 使用copy=False减少内存复制
optimized_concat = pd.concat(test_dfs, ignore_index=True, copy=False)
optimized_time = time.time() - start_time
print(f"优化concat时间:{optimized_time:.4f}秒")
print(f"性能提升:{basic_time/optimized_time:.2f}倍")
# 3. 分批concat
print("\n3. 分批concat策略:")
start_time = time.time()
batch_size = 10
batched_results = []
for i in range(0, len(test_dfs), batch_size):
batch = test_dfs[i:i+batch_size]
batch_result = pd.concat(batch, ignore_index=True)
batched_results.append(batch_result)
final_result = pd.concat(batched_results, ignore_index=True)
batched_time = time.time() - start_time
print(f"分批concat时间:{batched_time:.4f}秒")
print(f"相对基本concat:{basic_time/batched_time:.2f}倍")
# 内存使用对比
print(f"\n4. 内存使用对比:")
print(f"基本concat内存:{basic_concat.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f"优化concat内存:{optimized_concat.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
7.6 处理重复和冲突
7.6.1 重复数据处理
# 重复数据处理
print("重复数据处理:")
# 创建包含重复数据的示例
emp_duplicate = pd.concat([employees, employees.iloc[:3]], ignore_index=True)
print("1. 包含重复数据的员工表:")
print(f"原始数据:{len(employees)}行,重复数据:{len(emp_duplicate)}行")
# 检测重复
print("\n2. 检测重复数据:")
print(f"重复行数:{emp_duplicate.duplicated().sum()}")
print("重复的行:")
print(emp_duplicate[emp_duplicate.duplicated()])
# 删除重复
print("\n3. 删除重复数据:")
emp_no_dup = emp_duplicate.drop_duplicates()
print(f"删除重复后:{len(emp_no_dup)}行")
# 基于特定列检测重复
print("\n4. 基于特定列检测重复:")
emp_name_dup = emp_duplicate.drop_duplicates(subset=['name'])
print(f"基于姓名去重:{len(emp_name_dup)}行")
# 保留不同的重复项
print("\n5. 保留不同的重复项:")
emp_keep_last = emp_duplicate.drop_duplicates(keep='last')
print(f"保留最后一个重复项:{len(emp_keep_last)}行")
emp_keep_none = emp_duplicate.drop_duplicates(keep=False)
print(f"删除所有重复项:{len(emp_keep_none)}行")
7.6.2 合并冲突处理
# 合并冲突处理
print("合并冲突处理:")
# 创建有冲突的数据
emp_update1 = pd.DataFrame({
'emp_id': [1, 2, 3],
'salary': [80000, 70000, 75000], # 更新的薪资
'department': ['IT', 'HR', 'Finance'],
'update_date': ['2023-01-01', '2023-01-01', '2023-01-01']
})
emp_update2 = pd.DataFrame({
'emp_id': [1, 2, 4],
'salary': [82000, 68000, 85000], # 不同的薪资更新
'department': ['IT', 'HR', 'IT'],
'update_date': ['2023-02-01', '2023-02-01', '2023-02-01']
})
print("1. 冲突数据示例:")
print("更新1:")
print(emp_update1)
print("\n更新2:")
print(emp_update2)
# 使用suffixes处理冲突
print("\n2. 使用suffixes处理冲突:")
conflict_merge = pd.merge(emp_update1, emp_update2, on='emp_id', how='outer', suffixes=('_v1', '_v2'))
print(conflict_merge)
# 自定义冲突解决策略
print("\n3. 自定义冲突解决策略:")
def resolve_conflicts(row):
"""解决冲突的自定义函数"""
# 使用最新的更新日期
if pd.isna(row['update_date_v2']):
return row['salary_v1'], row['department_v1'], row['update_date_v1']
elif pd.isna(row['update_date_v1']):
return row['salary_v2'], row['department_v2'], row['update_date_v2']
else:
# 比较日期,选择最新的
if row['update_date_v2'] > row['update_date_v1']:
return row['salary_v2'], row['department_v2'], row['update_date_v2']
else:
return row['salary_v1'], row['department_v1'], row['update_date_v1']
# 应用冲突解决策略
resolved_data = conflict_merge.copy()
resolved_data[['salary_final', 'department_final', 'update_date_final']] = resolved_data.apply(
lambda row: pd.Series(resolve_conflicts(row)), axis=1
)
print("冲突解决后的数据:")
print(resolved_data[['emp_id', 'salary_final', 'department_final', 'update_date_final']])
# 使用combine_first处理冲突
print("\n4. 使用combine_first处理冲突:")
emp_base = employees.set_index('emp_id')[['salary', 'department']]
emp_update1_indexed = emp_update1.set_index('emp_id')[['salary', 'department']]
combined_data = emp_update1_indexed.combine_first(emp_base)
print("combine_first结果:")
print(combined_data)
7.6.3 数据一致性检查
# 数据一致性检查
print("数据一致性检查:")
# 创建测试数据
emp_sales_merge = pd.merge(employees, sales_q1, on='emp_id', how='left')
print("1. 基本一致性检查:")
print(f"员工总数:{len(employees)}")
print(f"有销售数据的员工:{emp_sales_merge['q1_sales'].notna().sum()}")
print(f"缺失销售数据的员工:{emp_sales_merge['q1_sales'].isna().sum()}")
# 检查数据范围一致性
print("\n2. 数据范围一致性检查:")
print("薪资范围检查:")
print(f"薪资最小值:{employees['salary'].min()}")
print(f"薪资最大值:{employees['salary'].max()}")
print(f"薪资平均值:{employees['salary'].mean():.2f}")
# 检查异常值
Q1 = employees['salary'].quantile(0.25)
Q3 = employees['salary'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = employees[(employees['salary'] < lower_bound) | (employees['salary'] > upper_bound)]
print(f"\n薪资异常值检查:")
print(f"异常值数量:{len(outliers)}")
if len(outliers) > 0:
print("异常值详情:")
print(outliers[['name', 'salary']])
# 检查引用完整性
print("\n3. 引用完整性检查:")
# 检查员工表中的部门是否都在部门表中
emp_depts = set(employees['department'].unique())
dept_names = set(departments['dept_name'].unique())
missing_depts = emp_depts - dept_names
extra_depts = dept_names - emp_depts
print(f"员工表中的部门:{emp_depts}")
print(f"部门表中的部门:{dept_names}")
print(f"缺失的部门定义:{missing_depts}")
print(f"多余的部门定义:{extra_depts}")
# 检查项目分配的完整性
print("\n4. 项目分配完整性检查:")
emp_in_projects = set(emp_projects['emp_id'].unique())
all_employees = set(employees['emp_id'].unique())
projects_in_allocation = set(emp_projects['project_id'].unique())
all_projects = set(projects['project_id'].unique())
print(f"参与项目的员工:{len(emp_in_projects)}/{len(all_employees)}")
print(f"有员工分配的项目:{len(projects_in_allocation)}/{len(all_projects)}")
unassigned_employees = all_employees - emp_in_projects
unassigned_projects = all_projects - projects_in_allocation
if unassigned_employees:
print(f"未分配项目的员工:{unassigned_employees}")
if unassigned_projects:
print(f"未分配员工的项目:{unassigned_projects}")
7.7 高级合并技巧
7.7.1 条件合并
# 条件合并
print("条件合并:")
# 基于条件的合并
print("1. 基于薪资等级的合并:")
# 创建薪资等级表
salary_grades = pd.DataFrame({
'grade': ['Junior', 'Mid', 'Senior', 'Lead'],
'min_salary': [0, 60000, 75000, 90000],
'max_salary': [59999, 74999, 89999, 999999],
'bonus_rate': [0.05, 0.10, 0.15, 0.20]
})
print("薪资等级表:")
print(salary_grades)
# 使用pd.cut进行条件分组
employees['salary_grade'] = pd.cut(
employees['salary'],
bins=[0, 60000, 75000, 90000, float('inf')],
labels=['Junior', 'Mid', 'Senior', 'Lead'],
right=False
)
# 合并薪资等级信息
emp_with_grade = pd.merge(employees, salary_grades, left_on='salary_grade', right_on='grade', how='left')
print("\n员工薪资等级信息:")
print(emp_with_grade[['name', 'salary', 'salary_grade', 'bonus_rate']].head())
# 计算奖金
emp_with_grade['bonus'] = emp_with_grade['salary'] * emp_with_grade['bonus_rate']
print("\n员工奖金计算:")
print(emp_with_grade[['name', 'salary', 'bonus_rate', 'bonus']].head())
7.7.2 时间序列合并
# 时间序列合并
print("时间序列合并:")
# 创建时间序列数据
date_range = pd.date_range('2023-01-01', '2023-12-31', freq='D')
daily_metrics = pd.DataFrame({
'date': date_range,
'temperature': np.random.normal(20, 10, len(date_range)),
'sales_factor': np.random.uniform(0.8, 1.2, len(date_range))
})
# 创建员工日销售数据(稀疏数据)
emp_daily_sales = pd.DataFrame({
'emp_id': np.random.choice(employees['emp_id'], 1000),
'date': np.random.choice(date_range, 1000),
'daily_sales': np.random.exponential(1000, 1000)
})
print("1. 时间序列数据概览:")
print(f"日度指标数据:{len(daily_metrics)}行")
print(f"员工日销售数据:{len(emp_daily_sales)}行")
# 时间序列合并
print("\n2. 时间序列合并:")
emp_sales_with_metrics = pd.merge(
emp_daily_sales,
daily_metrics,
on='date',
how='left'
)
print("合并后的时间序列数据:")
print(emp_sales_with_metrics.head())
# 分析温度对销售的影响
print("\n3. 温度对销售的影响分析:")
correlation = emp_sales_with_metrics['temperature'].corr(emp_sales_with_metrics['daily_sales'])
print(f"温度与销售额相关性:{correlation:.3f}")
# 按温度区间分析
emp_sales_with_metrics['temp_range'] = pd.cut(
emp_sales_with_metrics['temperature'],
bins=[-float('inf'), 10, 20, 30, float('inf')],
labels=['Cold', 'Cool', 'Warm', 'Hot']
)
temp_sales_analysis = emp_sales_with_metrics.groupby('temp_range')['daily_sales'].agg(['count', 'mean', 'std'])
print("\n不同温度区间的销售分析:")
print(temp_sales_analysis.round(2))
7.7.3 模糊匹配合并
# 模糊匹配合并
print("模糊匹配合并:")
# 创建包含拼写错误的数据
emp_external = pd.DataFrame({
'employee_name': ['Alice Johnson', 'Bob Smith', 'Charlie Brown', 'David Wilson', 'Eva Green'],
'external_id': ['EXT001', 'EXT002', 'EXT003', 'EXT004', 'EXT005'],
'certification': ['AWS', 'Azure', 'GCP', 'AWS', 'Azure']
})
print("1. 外部员工数据(可能有拼写差异):")
print(emp_external)
# 简单的模糊匹配(基于名字的第一个单词)
print("\n2. 基于名字首词的匹配:")
employees['first_name'] = employees['name'].str.split().str[0]
emp_external['first_name'] = emp_external['employee_name'].str.split().str[0]
fuzzy_merge = pd.merge(
employees,
emp_external,
on='first_name',
how='inner'
)
print("模糊匹配结果:")
print(fuzzy_merge[['name', 'employee_name', 'certification']].head())
# 使用fuzzywuzzy进行更精确的模糊匹配(如果安装了的话)
try:
from fuzzywuzzy import fuzz, process
print("\n3. 使用fuzzywuzzy进行精确模糊匹配:")
def fuzzy_match(name, choices, threshold=80):
"""模糊匹配函数"""
match = process.extractOne(name, choices)
if match and match[1] >= threshold:
return match[0]
return None
# 为每个外部员工找到最佳匹配
emp_names = employees['name'].tolist()
emp_external['matched_name'] = emp_external['employee_name'].apply(
lambda x: fuzzy_match(x, emp_names, threshold=70)
)
print("模糊匹配结果:")
print(emp_external[['employee_name', 'matched_name', 'certification']])
# 基于模糊匹配结果进行合并
precise_fuzzy_merge = pd.merge(
employees,
emp_external,
left_on='name',
right_on='matched_name',
how='inner'
)
print("\n精确模糊匹配合并结果:")
print(precise_fuzzy_merge[['name', 'employee_name', 'certification']])
except ImportError:
print("\n3. fuzzywuzzy未安装,跳过精确模糊匹配")
7.8 实际应用案例
7.8.1 客户360度视图
# 客户360度视图案例
print("客户360度视图案例:")
# 创建客户相关的多个数据表
customers = pd.DataFrame({
'customer_id': range(1, 11),
'name': [f'Customer_{i}' for i in range(1, 11)],
'segment': np.random.choice(['Premium', 'Standard', 'Basic'], 10),
'registration_date': pd.date_range('2022-01-01', periods=10, freq='30D')
})
orders = pd.DataFrame({
'order_id': range(1, 21),
'customer_id': np.random.choice(range(1, 11), 20),
'order_date': pd.date_range('2023-01-01', periods=20, freq='15D'),
'amount': np.random.uniform(100, 1000, 20),
'status': np.random.choice(['Completed', 'Pending', 'Cancelled'], 20)
})
support_tickets = pd.DataFrame({
'ticket_id': range(1, 16),
'customer_id': np.random.choice(range(1, 11), 15),
'issue_type': np.random.choice(['Technical', 'Billing', 'General'], 15),
'priority': np.random.choice(['High', 'Medium', 'Low'], 15),
'status': np.random.choice(['Open', 'Closed', 'In Progress'], 15)
})
print("1. 基础数据表:")
print(f"客户表:{customers.shape}")
print(f"订单表:{orders.shape}")
print(f"支持票据表:{support_tickets.shape}")
# 构建360度客户视图
print("\n2. 构建360度客户视图:")
# 订单汇总
order_summary = orders.groupby('customer_id').agg({
'order_id': 'count',
'amount': ['sum', 'mean', 'max'],
'order_date': ['min', 'max']
}).round(2)
order_summary.columns = ['total_orders', 'total_amount', 'avg_amount', 'max_amount', 'first_order', 'last_order']
# 支持票据汇总
support_summary = support_tickets.groupby('customer_id').agg({
'ticket_id': 'count',
'priority': lambda x: (x == 'High').sum(),
'status': lambda x: (x == 'Open').sum()
})
support_summary.columns = ['total_tickets', 'high_priority_tickets', 'open_tickets']
# 合并所有信息
customer_360 = (customers.set_index('customer_id')
.join(order_summary, how='left')
.join(support_summary, how='left')
.fillna(0))
print("客户360度视图:")
print(customer_360.head())
# 客户价值分析
print("\n3. 客户价值分析:")
customer_360['customer_value_score'] = (
customer_360['total_amount'] * 0.4 +
customer_360['total_orders'] * 100 * 0.3 +
(10 - customer_360['total_tickets']) * 50 * 0.2 +
(customer_360['segment'].map({'Premium': 300, 'Standard': 200, 'Basic': 100})) * 0.1
)
customer_ranking = customer_360.sort_values('customer_value_score', ascending=False)
print("客户价值排名(前5名):")
print(customer_ranking[['name', 'segment', 'total_amount', 'total_orders', 'customer_value_score']].head())
7.8.2 供应链数据整合
# 供应链数据整合案例
print("\n供应链数据整合案例:")
# 创建供应链相关数据
suppliers = pd.DataFrame({
'supplier_id': ['SUP001', 'SUP002', 'SUP003', 'SUP004'],
'supplier_name': ['TechCorp', 'MaterialsInc', 'ComponentsLtd', 'PartsWorld'],
'country': ['USA', 'China', 'Germany', 'Japan'],
'rating': [4.5, 4.2, 4.8, 4.3]
})
products = pd.DataFrame({
'product_id': ['PROD001', 'PROD002', 'PROD003', 'PROD004', 'PROD005'],
'product_name': ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones'],
'category': ['Electronics', 'Accessories', 'Accessories', 'Electronics', 'Accessories'],
'unit_cost': [800, 25, 50, 300, 100]
})
procurement = pd.DataFrame({
'procurement_id': range(1, 21),
'supplier_id': np.random.choice(suppliers['supplier_id'], 20),
'product_id': np.random.choice(products['product_id'], 20),
'quantity': np.random.randint(10, 100, 20),
'unit_price': np.random.uniform(20, 900, 20),
'order_date': pd.date_range('2023-01-01', periods=20, freq='10D')
})
inventory = pd.DataFrame({
'product_id': products['product_id'],
'current_stock': np.random.randint(50, 500, len(products)),
'reorder_level': np.random.randint(20, 100, len(products)),
'last_updated': pd.date_range('2023-12-01', periods=len(products), freq='D')
})
print("1. 供应链数据概览:")
print(f"供应商:{len(suppliers)}家")
print(f"产品:{len(products)}种")
print(f"采购记录:{len(procurement)}条")
print(f"库存记录:{len(inventory)}条")
# 整合供应链数据
print("\n2. 供应链数据整合:")
supply_chain_data = (procurement
.merge(suppliers, on='supplier_id', how='left')
.merge(products, on='product_id', how='left')
.merge(inventory, on='product_id', how='left'))
print("整合后的供应链数据:")
print(supply_chain_data[['supplier_name', 'product_name', 'quantity', 'unit_price', 'current_stock']].head())
# 供应链分析
print("\n3. 供应链分析:")
# 供应商表现分析
supplier_performance = supply_chain_data.groupby('supplier_name').agg({
'procurement_id': 'count',
'quantity': 'sum',
'unit_price': 'mean',
'rating': 'first'
}).round(2)
supplier_performance.columns = ['total_orders', 'total_quantity', 'avg_price', 'rating']
supplier_performance['price_competitiveness'] = 1 / supplier_performance['avg_price'] * 1000 # 价格竞争力指数
print("供应商表现分析:")
print(supplier_performance.sort_values('rating', ascending=False))
# 产品采购分析
product_analysis = supply_chain_data.groupby('product_name').agg({
'quantity': 'sum',
'unit_price': 'mean',
'current_stock': 'first',
'reorder_level': 'first'
}).round(2)
product_analysis['stock_status'] = product_analysis.apply(
lambda row: 'Low Stock' if row['current_stock'] <= row['reorder_level'] else 'Normal',
axis=1
)
print("\n产品采购分析:")
print(product_analysis)
# 库存预警
print("\n4. 库存预警:")
low_stock_products = product_analysis[product_analysis['stock_status'] == 'Low Stock']
if not low_stock_products.empty:
print("需要补货的产品:")
print(low_stock_products[['current_stock', 'reorder_level']])
else:
print("所有产品库存正常")
7.9 性能优化最佳实践
7.9.1 大数据合并优化
# 大数据合并优化
print("大数据合并优化:")
# 创建大数据集
large_left = pd.DataFrame({
'key': np.random.randint(0, 100000, 500000),
'value1': np.random.randn(500000),
'category': np.random.choice(['A', 'B', 'C', 'D'], 500000)
})
large_right = pd.DataFrame({
'key': np.random.randint(0, 50000, 200000),
'value2': np.random.randn(200000),
'info': np.random.choice(['X', 'Y', 'Z'], 200000)
})
print(f"大数据集 - 左表:{large_left.shape},右表:{large_right.shape}")
# 1. 数据类型优化
print("\n1. 数据类型优化:")
start_time = time.time()
result1 = pd.merge(large_left, large_right, on='key', how='inner')
original_time = time.time() - start_time
# 优化数据类型
large_left_opt = large_left.copy()
large_right_opt = large_right.copy()
large_left_opt['category'] = large_left_opt['category'].astype('category')
large_right_opt['info'] = large_right_opt['info'].astype('category')
start_time = time.time()
result2 = pd.merge(large_left_opt, large_right_opt, on='key', how='inner')
optimized_time = time.time() - start_time
print(f"原始合并时间:{original_time:.4f}秒")
print(f"优化后合并时间:{optimized_time:.4f}秒")
print(f"性能提升:{original_time/optimized_time:.2f}倍")
# 2. 索引优化
print("\n2. 索引优化:")
large_left_indexed = large_left.set_index('key').sort_index()
large_right_indexed = large_right.set_index('key').sort_index()
start_time = time.time()
result3 = large_left_indexed.join(large_right_indexed, how='inner')
indexed_time = time.time() - start_time
print(f"索引join时间:{indexed_time:.4f}秒")
print(f"相对原始merge提升:{original_time/indexed_time:.2f}倍")
# 3. 分块处理
print("\n3. 分块处理:")
def chunked_merge(left_df, right_df, chunk_size=50000):
"""分块合并大数据"""
results = []
for i in range(0, len(left_df), chunk_size):
chunk = left_df.iloc[i:i+chunk_size]
chunk_result = pd.merge(chunk, right_df, on='key', how='inner')
results.append(chunk_result)
return pd.concat(results, ignore_index=True)
start_time = time.time()
result4 = chunked_merge(large_left, large_right, chunk_size=100000)
chunked_time = time.time() - start_time
print(f"分块合并时间:{chunked_time:.4f}秒")
print(f"相对原始merge:{original_time/chunked_time:.2f}倍")
7.9.2 内存优化策略
# 内存优化策略
print("内存优化策略:")
# 内存使用监控
def get_memory_usage():
"""获取当前内存使用情况"""
try:
import psutil
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024 # MB
except ImportError:
return "psutil not available"
print(f"1. 当前内存使用:{get_memory_usage()} MB")
# 创建内存密集型操作
memory_test_left = pd.DataFrame({
'key': range(200000),
'data': ['x' * 100] * 200000 # 大字符串数据
})
memory_test_right = pd.DataFrame({
'key': range(100000, 300000),
'info': ['y' * 100] * 200000
})
print(f"测试数据内存使用:{memory_test_left.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
# 内存优化技巧
print("\n2. 内存优化技巧:")
# 使用更高效的数据类型
memory_test_left_opt = memory_test_left.copy()
memory_test_left_opt['data'] = memory_test_left_opt['data'].astype('string') # 使用string类型
print(f"优化后内存使用:{memory_test_left_opt.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
# 及时删除不需要的数据
del memory_test_left, memory_test_right, memory_test_left_opt
import gc
gc.collect()
print(f"清理后内存使用:{get_memory_usage()} MB")
# 使用生成器处理大数据
print("\n3. 生成器处理大数据:")
def data_generator(size, chunk_size=10000):
"""数据生成器"""
for i in range(0, size, chunk_size):
yield pd.DataFrame({
'key': range(i, min(i + chunk_size, size)),
'value': np.random.randn(min(chunk_size, size - i))
})
# 使用生成器处理数据
total_processed = 0
for chunk in data_generator(100000, 10000):
# 模拟处理
processed_chunk = chunk.groupby('key')['value'].sum()
total_processed += len(processed_chunk)
print(f"生成器处理完成,总处理记录:{total_processed}")
7.9.3 合并策略选择
# 合并策略选择
print("合并策略选择:")
# 创建不同大小的测试数据
small_left = pd.DataFrame({'key': range(1000), 'value': range(1000)})
small_right = pd.DataFrame({'key': range(500, 1500), 'info': range(1000)})
medium_left = pd.DataFrame({'key': range(50000), 'value': range(50000)})
medium_right = pd.DataFrame({'key': range(25000, 75000), 'info': range(50000)})
print("1. 不同数据规模的最佳策略:")
# 小数据集
start_time = time.time()
small_merge = pd.merge(small_left, small_right, on='key', how='inner')
small_merge_time = time.time() - start_time
start_time = time.time()
small_join = small_left.set_index('key').join(small_right.set_index('key'), how='inner')
small_join_time = time.time() - start_time
print(f"小数据集 - Merge: {small_merge_time:.6f}秒, Join: {small_join_time:.6f}秒")
# 中等数据集
start_time = time.time()
medium_merge = pd.merge(medium_left, medium_right, on='key', how='inner')
medium_merge_time = time.time() - start_time
start_time = time.time()
medium_join = medium_left.set_index('key').join(medium_right.set_index('key'), how='inner')
medium_join_time = time.time() - start_time
print(f"中等数据集 - Merge: {medium_merge_time:.6f}秒, Join: {medium_join_time:.6f}秒")
# 策略建议
print("\n2. 策略选择建议:")
print("- 小数据集(<10K行): merge和join性能相近,选择语法更清晰的")
print("- 中等数据集(10K-100K行): 预先设置索引的join通常更快")
print("- 大数据集(>100K行): 考虑分块处理或使用数据库")
print("- 多次合并: 优先考虑索引优化")
print("- 内存受限: 使用分块处理或生成器")
7.10 本章小结
7.10.1 核心知识点
Merge操作
- 连接类型:inner, left, right, outer
- 连接键:单列、多列连接
- 冲突处理:suffixes参数
- 性能优化:排序、索引、数据类型
Join操作
- 基于索引的连接
- 多级索引join
- 性能优势:预设索引的情况下
Concat操作
- 垂直拼接:行方向合并
- 水平拼接:列方向合并
- 多层拼接:keys参数
- 缺失值处理:join参数
数据质量管理
- 重复数据检测和处理
- 冲突解决策略
- 数据一致性检查
- 引用完整性验证
高级技巧
- 条件合并
- 时间序列合并
- 模糊匹配
- 链式操作
7.10.2 最佳实践
- 选择合适的合并方法:根据数据特点选择merge、join或concat
- 优化数据类型:使用category类型减少内存使用
- 预处理优化:排序、索引设置提高性能
- 分块处理:处理超大数据集时的内存管理
- 数据验证:合并后进行数据质量检查
7.10.3 性能优化要点
- 索引优化:预先设置索引可显著提升join性能
- 数据类型优化:使用合适的数据类型减少内存使用
- 分块处理:对于大数据集采用分块策略
- 内存管理:及时清理不需要的中间结果
- 策略选择:根据数据规模选择最优的合并策略
7.10.4 常见陷阱
- 忘记处理重复键导致的数据膨胀
- 不同数据类型的键无法正确匹配
- 大数据集合并时的内存溢出
- 时间序列数据的时区和格式问题
- 字符串匹配的大小写敏感性
7.10.5 下一步学习
在下一章中,我们将学习: - 时间序列数据的处理和分析 - 日期时间索引的使用 - 时间序列的重采样和频率转换 - 时间窗口分析和滚动计算
练习题
- 实现一个完整的客户数据整合流程
- 设计一个处理大数据集合并的优化方案
- 创建一个模糊匹配的数据清洗工具
- 构建一个多数据源的实时数据合并系统
- 分析不同合并策略在各种场景下的性能表现
记住:数据合并是数据分析的基础技能,正确的合并策略能够确保数据的完整性和分析的准确性!