7.1 章节概述

在实际数据分析中,我们经常需要将来自不同数据源的数据进行合并和连接。本章将详细介绍Pandas中的数据合并操作,包括merge、join、concat等方法,以及如何处理各种复杂的数据连接场景。

7.1.1 学习目标

  • 掌握merge操作的各种连接类型
  • 学会使用join方法进行索引连接
  • 理解concat操作的数据拼接
  • 学习处理重复数据和冲突解决
  • 掌握多表连接和复杂合并场景
  • 了解合并操作的性能优化技巧

7.1.2 数据合并类型

graph TD
    A[数据合并] --> B[merge 合并]
    A --> C[join 连接]
    A --> D[concat 拼接]
    
    B --> B1[inner join 内连接]
    B --> B2[left join 左连接]
    B --> B3[right join 右连接]
    B --> B4[outer join 外连接]
    
    C --> C1[基于索引连接]
    C --> C2[多级索引连接]
    
    D --> D1[垂直拼接]
    D --> D2[水平拼接]
    D --> D3[多层拼接]

7.2 数据准备

7.2.1 创建示例数据

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# 创建员工信息表
employees = pd.DataFrame({
    'emp_id': [1, 2, 3, 4, 5, 6, 7, 8],
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve', 'Frank', 'Grace', 'Henry'],
    'department': ['IT', 'HR', 'Finance', 'IT', 'Marketing', 'IT', 'HR', 'Finance'],
    'hire_date': pd.date_range('2020-01-01', periods=8, freq='3M'),
    'salary': [75000, 65000, 70000, 80000, 72000, 78000, 68000, 73000]
})

# 创建部门信息表
departments = pd.DataFrame({
    'dept_name': ['IT', 'HR', 'Finance', 'Marketing', 'Sales'],
    'manager': ['John', 'Sarah', 'Mike', 'Lisa', 'Tom'],
    'budget': [500000, 200000, 300000, 250000, 400000],
    'location': ['Building A', 'Building B', 'Building A', 'Building C', 'Building B']
})

# 创建项目信息表
projects = pd.DataFrame({
    'project_id': ['P001', 'P002', 'P003', 'P004', 'P005'],
    'project_name': ['Website Redesign', 'HR System', 'Budget Analysis', 'Marketing Campaign', 'Sales Portal'],
    'department': ['IT', 'HR', 'Finance', 'Marketing', 'Sales'],
    'budget': [100000, 50000, 30000, 80000, 120000],
    'status': ['Active', 'Completed', 'Active', 'Planning', 'Active']
})

# 创建员工项目分配表
emp_projects = pd.DataFrame({
    'emp_id': [1, 1, 2, 3, 4, 4, 5, 6, 7, 8],
    'project_id': ['P001', 'P003', 'P002', 'P003', 'P001', 'P004', 'P004', 'P001', 'P002', 'P003'],
    'role': ['Lead', 'Analyst', 'Lead', 'Lead', 'Developer', 'Analyst', 'Lead', 'Developer', 'Analyst', 'Analyst'],
    'allocation': [0.8, 0.2, 1.0, 1.0, 0.6, 0.4, 1.0, 0.5, 1.0, 1.0]
})

# 创建销售数据表
sales_q1 = pd.DataFrame({
    'emp_id': [1, 2, 3, 4, 5],
    'q1_sales': [120000, 95000, 110000, 130000, 105000],
    'q1_target': [100000, 90000, 100000, 120000, 100000]
})

sales_q2 = pd.DataFrame({
    'emp_id': [1, 2, 3, 4, 6],
    'q2_sales': [135000, 88000, 125000, 140000, 115000],
    'q2_target': [110000, 95000, 110000, 130000, 110000]
})

print("示例数据创建完成:")
print(f"员工表:{employees.shape}")
print(f"部门表:{departments.shape}")
print(f"项目表:{projects.shape}")
print(f"员工项目表:{emp_projects.shape}")
print(f"Q1销售表:{sales_q1.shape}")
print(f"Q2销售表:{sales_q2.shape}")

# 显示数据样例
print("\n员工信息表:")
print(employees)
print("\n部门信息表:")
print(departments)

7.2.2 数据概览

# 数据概览
print("数据概览:")

print("1. 员工信息表结构:")
print(employees.info())
print(f"员工数量:{len(employees)}")
print(f"部门分布:\n{employees['department'].value_counts()}")

print("\n2. 部门信息表结构:")
print(departments.info())
print(f"部门数量:{len(departments)}")

print("\n3. 项目信息表结构:")
print(projects.info())
print(f"项目状态分布:\n{projects['status'].value_counts()}")

print("\n4. 员工项目分配表结构:")
print(emp_projects.info())
print(f"角色分布:\n{emp_projects['role'].value_counts()}")

# 检查数据质量
print("\n5. 数据质量检查:")
print("员工表缺失值:", employees.isnull().sum().sum())
print("部门表缺失值:", departments.isnull().sum().sum())
print("项目表缺失值:", projects.isnull().sum().sum())
print("员工项目表缺失值:", emp_projects.isnull().sum().sum())

# 检查键的唯一性
print("\n6. 键的唯一性检查:")
print(f"员工ID唯一性:{employees['emp_id'].is_unique}")
print(f"部门名称唯一性:{departments['dept_name'].is_unique}")
print(f"项目ID唯一性:{projects['project_id'].is_unique}")

7.3 Merge操作详解

7.3.1 基本Merge操作

# 基本Merge操作
print("基本Merge操作:")

# 内连接(Inner Join)
print("1. 内连接 - 员工与部门信息:")
inner_merge = pd.merge(employees, departments, left_on='department', right_on='dept_name', how='inner')
print(f"内连接结果行数:{len(inner_merge)}")
print(inner_merge[['name', 'department', 'manager', 'budget', 'location']].head())

# 左连接(Left Join)
print("\n2. 左连接 - 保留所有员工:")
left_merge = pd.merge(employees, departments, left_on='department', right_on='dept_name', how='left')
print(f"左连接结果行数:{len(left_merge)}")
print("缺失部门信息的员工:")
missing_dept = left_merge[left_merge['manager'].isnull()]
if not missing_dept.empty:
    print(missing_dept[['name', 'department']])
else:
    print("所有员工都有对应的部门信息")

# 右连接(Right Join)
print("\n3. 右连接 - 保留所有部门:")
right_merge = pd.merge(employees, departments, left_on='department', right_on='dept_name', how='right')
print(f"右连接结果行数:{len(right_merge)}")
print("没有员工的部门:")
no_employees = right_merge[right_merge['name'].isnull()]
if not no_employees.empty:
    print(no_employees[['dept_name', 'manager']])
else:
    print("所有部门都有员工")

# 外连接(Outer Join)
print("\n4. 外连接 - 保留所有记录:")
outer_merge = pd.merge(employees, departments, left_on='department', right_on='dept_name', how='outer')
print(f"外连接结果行数:{len(outer_merge)}")
print("外连接缺失值统计:")
print(outer_merge.isnull().sum())

7.3.2 多列连接

# 多列连接
print("多列连接:")

# 创建包含多个连接键的数据
emp_extended = employees.copy()
emp_extended['dept_code'] = emp_extended['department'].map({
    'IT': 'IT01', 'HR': 'HR01', 'Finance': 'FN01', 'Marketing': 'MK01'
})

dept_extended = departments.copy()
dept_extended['dept_code'] = dept_extended['dept_name'].map({
    'IT': 'IT01', 'HR': 'HR01', 'Finance': 'FN01', 'Marketing': 'MK01', 'Sales': 'SL01'
})

print("1. 多列连接示例:")
multi_key_merge = pd.merge(
    emp_extended, 
    dept_extended, 
    left_on=['department', 'dept_code'], 
    right_on=['dept_name', 'dept_code'], 
    how='inner'
)
print(f"多列连接结果:{len(multi_key_merge)}行")
print(multi_key_merge[['name', 'department', 'dept_code', 'manager']].head())

# 处理列名冲突
print("\n2. 处理列名冲突:")
# 创建有冲突列名的数据
emp_with_budget = employees.copy()
emp_with_budget['budget'] = emp_with_budget['salary'] * 0.1  # 个人预算

conflict_merge = pd.merge(
    emp_with_budget, 
    departments, 
    left_on='department', 
    right_on='dept_name', 
    how='inner',
    suffixes=('_personal', '_dept')
)
print("列名冲突处理结果:")
print(conflict_merge[['name', 'budget_personal', 'budget_dept']].head())

7.3.3 复杂连接场景

# 复杂连接场景
print("复杂连接场景:")

# 一对多连接
print("1. 一对多连接 - 员工与项目分配:")
emp_project_merge = pd.merge(employees, emp_projects, on='emp_id', how='left')
print(f"员工项目连接结果:{len(emp_project_merge)}行")
print("员工项目分配情况:")
print(emp_project_merge[['name', 'project_id', 'role', 'allocation']].head(10))

# 多对多连接
print("\n2. 多对多连接 - 完整项目信息:")
full_project_info = pd.merge(
    pd.merge(employees, emp_projects, on='emp_id'),
    projects, on='project_id', how='inner'
)
print(f"完整项目信息:{len(full_project_info)}行")
print("项目团队信息:")
print(full_project_info[['name', 'project_name', 'role', 'allocation', 'status']].head())

# 链式连接
print("\n3. 链式连接 - 员工-部门-项目完整信息:")
complete_info = (employees
                .merge(departments, left_on='department', right_on='dept_name', how='left')
                .merge(emp_projects, on='emp_id', how='left')
                .merge(projects, on='project_id', how='left'))

print(f"完整信息表:{len(complete_info)}行")
print("完整信息样例:")
print(complete_info[['name', 'department', 'manager', 'project_name', 'role']].head())

# 条件连接
print("\n4. 条件连接 - 活跃项目的员工:")
active_projects = projects[projects['status'] == 'Active']
active_emp_projects = pd.merge(
    pd.merge(employees, emp_projects, on='emp_id'),
    active_projects, on='project_id', how='inner'
)
print(f"参与活跃项目的员工:{len(active_emp_projects)}行")
print("活跃项目参与者:")
active_summary = active_emp_projects.groupby('name').agg({
    'project_name': 'count',
    'allocation': 'sum'
}).rename(columns={'project_name': 'active_projects', 'allocation': 'total_allocation'})
print(active_summary)

7.3.4 Merge性能优化

# Merge性能优化
print("Merge性能优化:")

# 创建大数据集进行性能测试
import time

large_left = pd.DataFrame({
    'key': np.random.randint(0, 10000, 100000),
    'value1': np.random.randn(100000)
})

large_right = pd.DataFrame({
    'key': np.random.randint(0, 5000, 50000),
    'value2': np.random.randn(50000)
})

print(f"左表大小:{large_left.shape}")
print(f"右表大小:{large_right.shape}")

# 1. 基本merge性能
print("\n1. 基本merge性能测试:")
start_time = time.time()
basic_merge = pd.merge(large_left, large_right, on='key', how='inner')
basic_time = time.time() - start_time
print(f"基本merge时间:{basic_time:.4f}秒,结果行数:{len(basic_merge)}")

# 2. 排序优化
print("\n2. 排序优化测试:")
large_left_sorted = large_left.sort_values('key')
large_right_sorted = large_right.sort_values('key')

start_time = time.time()
sorted_merge = pd.merge(large_left_sorted, large_right_sorted, on='key', how='inner')
sorted_time = time.time() - start_time
print(f"排序后merge时间:{sorted_time:.4f}秒")
print(f"性能提升:{basic_time/sorted_time:.2f}倍")

# 3. 索引优化
print("\n3. 索引优化测试:")
large_left_indexed = large_left.set_index('key')
large_right_indexed = large_right.set_index('key')

start_time = time.time()
indexed_merge = large_left_indexed.join(large_right_indexed, how='inner')
indexed_time = time.time() - start_time
print(f"索引join时间:{indexed_time:.4f}秒")
print(f"相对基本merge提升:{basic_time/indexed_time:.2f}倍")

# 4. 分类数据类型优化
print("\n4. 分类数据类型优化:")
large_left['key_cat'] = large_left['key'].astype('category')
large_right['key_cat'] = large_right['key'].astype('category')

start_time = time.time()
category_merge = pd.merge(large_left, large_right, on='key_cat', how='inner')
category_time = time.time() - start_time
print(f"分类类型merge时间:{category_time:.4f}秒")
print(f"相对基本merge提升:{basic_time/category_time:.2f}倍")

7.4 Join操作

7.4.1 基本Join操作

# 基本Join操作
print("基本Join操作:")

# 设置索引进行join
emp_indexed = employees.set_index('emp_id')
sales_q1_indexed = sales_q1.set_index('emp_id')
sales_q2_indexed = sales_q2.set_index('emp_id')

print("1. 基本索引join:")
emp_sales_q1 = emp_indexed.join(sales_q1_indexed, how='left')
print(f"员工Q1销售数据:{len(emp_sales_q1)}行")
print(emp_sales_q1[['name', 'department', 'q1_sales', 'q1_target']].head())

print("\n2. 多表join:")
emp_all_sales = emp_indexed.join([sales_q1_indexed, sales_q2_indexed], how='outer')
print(f"员工全年销售数据:{len(emp_all_sales)}行")
print(emp_all_sales[['name', 'q1_sales', 'q2_sales']].head())

# 不同join类型
print("\n3. 不同join类型对比:")
join_types = ['left', 'right', 'outer', 'inner']
for join_type in join_types:
    result = emp_indexed.join(sales_q1_indexed, how=join_type)
    print(f"{join_type} join: {len(result)}行")

# 处理列名冲突
print("\n4. Join中的列名冲突处理:")
emp_with_target = employees.set_index('emp_id')
emp_with_target['target'] = 100000  # 添加目标列

sales_with_target = sales_q1.set_index('emp_id')
sales_with_target['target'] = sales_with_target['q1_target']  # 重命名目标列

conflict_join = emp_with_target.join(sales_with_target, rsuffix='_sales')
print("列名冲突处理结果:")
print(conflict_join[['name', 'target', 'target_sales']].head())

7.4.2 多级索引Join

# 多级索引Join
print("多级索引Join:")

# 创建多级索引数据
emp_multi = employees.set_index(['department', 'emp_id'])
dept_multi = departments.set_index('dept_name')

print("1. 多级索引结构:")
print("员工多级索引:")
print(emp_multi.head())

# 多级索引join
print("\n2. 多级索引join操作:")
# 需要调整索引级别进行join
emp_dept_join = emp_multi.reset_index().set_index('department').join(
    dept_multi, how='left'
)
print("多级索引join结果:")
print(emp_dept_join[['emp_id', 'name', 'manager', 'budget']].head())

# 复杂多级索引场景
print("\n3. 复杂多级索引场景:")
# 创建按部门和项目的多级索引
project_dept = projects.set_index(['department', 'project_id'])
emp_project_multi = emp_projects.merge(
    employees[['emp_id', 'department']], on='emp_id'
).set_index(['department', 'project_id'])

multi_join = emp_project_multi.join(project_dept, how='left')
print("复杂多级索引join:")
print(multi_join[['emp_id', 'role', 'project_name', 'status']].head())

7.4.3 Join性能对比

# Join性能对比
print("Join性能对比:")

# 创建测试数据
test_left = pd.DataFrame({
    'key': range(50000),
    'value1': np.random.randn(50000)
}).set_index('key')

test_right = pd.DataFrame({
    'key': range(25000, 75000),
    'value2': np.random.randn(50000)
}).set_index('key')

print(f"测试数据大小 - 左表:{test_left.shape},右表:{test_right.shape}")

# Join vs Merge性能对比
print("\n1. Join vs Merge性能对比:")

# Join方法
start_time = time.time()
join_result = test_left.join(test_right, how='inner')
join_time = time.time() - start_time

# Merge方法
test_left_reset = test_left.reset_index()
test_right_reset = test_right.reset_index()
start_time = time.time()
merge_result = pd.merge(test_left_reset, test_right_reset, on='key', how='inner')
merge_time = time.time() - start_time

print(f"Join时间:{join_time:.4f}秒,结果:{len(join_result)}行")
print(f"Merge时间:{merge_time:.4f}秒,结果:{len(merge_result)}行")
print(f"Join相对Merge速度提升:{merge_time/join_time:.2f}倍")

# 不同join类型的性能
print("\n2. 不同join类型性能:")
join_types = ['left', 'right', 'outer', 'inner']
for join_type in join_types:
    start_time = time.time()
    result = test_left.join(test_right, how=join_type)
    elapsed_time = time.time() - start_time
    print(f"{join_type} join: {elapsed_time:.4f}秒,{len(result)}行")

7.5 Concat操作

7.5.1 基本Concat操作

# 基本Concat操作
print("基本Concat操作:")

# 垂直拼接(行拼接)
print("1. 垂直拼接 - 合并Q1和Q2销售数据:")
sales_combined = pd.concat([sales_q1, sales_q2], ignore_index=True)
print(f"拼接结果:{len(sales_combined)}行")
print(sales_combined.head(8))

# 处理重复索引
print("\n2. 处理重复索引:")
sales_with_keys = pd.concat([sales_q1, sales_q2], keys=['Q1', 'Q2'])
print("带键的拼接结果:")
print(sales_with_keys.head(8))

# 水平拼接(列拼接)
print("\n3. 水平拼接 - 合并Q1和Q2销售指标:")
sales_horizontal = pd.concat([
    sales_q1.set_index('emp_id'), 
    sales_q2.set_index('emp_id')
], axis=1, join='outer')
print("水平拼接结果:")
print(sales_horizontal)

# 处理缺失值
print("\n4. 处理拼接中的缺失值:")
sales_inner = pd.concat([
    sales_q1.set_index('emp_id'), 
    sales_q2.set_index('emp_id')
], axis=1, join='inner')
print("内连接拼接(无缺失值):")
print(sales_inner)

7.5.2 复杂Concat场景

# 复杂Concat场景
print("复杂Concat场景:")

# 多个DataFrame拼接
print("1. 多个DataFrame垂直拼接:")
# 创建多个月份的数据
monthly_data = []
for month in range(1, 4):
    month_data = pd.DataFrame({
        'emp_id': np.random.choice(employees['emp_id'], 5),
        'month': month,
        'sales': np.random.randint(10000, 50000, 5),
        'expenses': np.random.randint(1000, 5000, 5)
    })
    monthly_data.append(month_data)

quarterly_data = pd.concat(monthly_data, ignore_index=True)
print(f"季度数据:{len(quarterly_data)}行")
print(quarterly_data.head(10))

# 带标识的拼接
print("\n2. 带标识的多DataFrame拼接:")
labeled_data = pd.concat(monthly_data, keys=['Jan', 'Feb', 'Mar'])
print("带月份标识的数据:")
print(labeled_data.head(10))

# 不同结构DataFrame的拼接
print("\n3. 不同结构DataFrame拼接:")
df1 = pd.DataFrame({
    'A': [1, 2, 3],
    'B': [4, 5, 6],
    'C': [7, 8, 9]
})

df2 = pd.DataFrame({
    'A': [10, 11],
    'B': [12, 13],
    'D': [14, 15]  # 不同的列
})

df3 = pd.DataFrame({
    'A': [20],
    'E': [21]  # 完全不同的列
})

mixed_concat = pd.concat([df1, df2, df3], ignore_index=True, sort=False)
print("不同结构拼接结果:")
print(mixed_concat)

# 填充缺失值
print("\n4. 拼接时填充缺失值:")
filled_concat = pd.concat([df1, df2, df3], ignore_index=True).fillna(0)
print("填充缺失值后:")
print(filled_concat)

7.5.3 Concat性能优化

# Concat性能优化
print("Concat性能优化:")

# 创建性能测试数据
test_dfs = []
for i in range(100):
    df = pd.DataFrame({
        'id': range(i*1000, (i+1)*1000),
        'value': np.random.randn(1000),
        'category': np.random.choice(['A', 'B', 'C'], 1000)
    })
    test_dfs.append(df)

print(f"测试数据:{len(test_dfs)}个DataFrame,每个{test_dfs[0].shape[0]}行")

# 1. 基本concat性能
print("\n1. 基本concat性能:")
start_time = time.time()
basic_concat = pd.concat(test_dfs, ignore_index=True)
basic_time = time.time() - start_time
print(f"基本concat时间:{basic_time:.4f}秒,结果:{basic_concat.shape}")

# 2. 预分配内存优化
print("\n2. 预分配内存优化:")
start_time = time.time()
# 预计算总行数
total_rows = sum(len(df) for df in test_dfs)
# 使用copy=False减少内存复制
optimized_concat = pd.concat(test_dfs, ignore_index=True, copy=False)
optimized_time = time.time() - start_time
print(f"优化concat时间:{optimized_time:.4f}秒")
print(f"性能提升:{basic_time/optimized_time:.2f}倍")

# 3. 分批concat
print("\n3. 分批concat策略:")
start_time = time.time()
batch_size = 10
batched_results = []
for i in range(0, len(test_dfs), batch_size):
    batch = test_dfs[i:i+batch_size]
    batch_result = pd.concat(batch, ignore_index=True)
    batched_results.append(batch_result)

final_result = pd.concat(batched_results, ignore_index=True)
batched_time = time.time() - start_time
print(f"分批concat时间:{batched_time:.4f}秒")
print(f"相对基本concat:{basic_time/batched_time:.2f}倍")

# 内存使用对比
print(f"\n4. 内存使用对比:")
print(f"基本concat内存:{basic_concat.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f"优化concat内存:{optimized_concat.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

7.6 处理重复和冲突

7.6.1 重复数据处理

# 重复数据处理
print("重复数据处理:")

# 创建包含重复数据的示例
emp_duplicate = pd.concat([employees, employees.iloc[:3]], ignore_index=True)
print("1. 包含重复数据的员工表:")
print(f"原始数据:{len(employees)}行,重复数据:{len(emp_duplicate)}行")

# 检测重复
print("\n2. 检测重复数据:")
print(f"重复行数:{emp_duplicate.duplicated().sum()}")
print("重复的行:")
print(emp_duplicate[emp_duplicate.duplicated()])

# 删除重复
print("\n3. 删除重复数据:")
emp_no_dup = emp_duplicate.drop_duplicates()
print(f"删除重复后:{len(emp_no_dup)}行")

# 基于特定列检测重复
print("\n4. 基于特定列检测重复:")
emp_name_dup = emp_duplicate.drop_duplicates(subset=['name'])
print(f"基于姓名去重:{len(emp_name_dup)}行")

# 保留不同的重复项
print("\n5. 保留不同的重复项:")
emp_keep_last = emp_duplicate.drop_duplicates(keep='last')
print(f"保留最后一个重复项:{len(emp_keep_last)}行")

emp_keep_none = emp_duplicate.drop_duplicates(keep=False)
print(f"删除所有重复项:{len(emp_keep_none)}行")

7.6.2 合并冲突处理

# 合并冲突处理
print("合并冲突处理:")

# 创建有冲突的数据
emp_update1 = pd.DataFrame({
    'emp_id': [1, 2, 3],
    'salary': [80000, 70000, 75000],  # 更新的薪资
    'department': ['IT', 'HR', 'Finance'],
    'update_date': ['2023-01-01', '2023-01-01', '2023-01-01']
})

emp_update2 = pd.DataFrame({
    'emp_id': [1, 2, 4],
    'salary': [82000, 68000, 85000],  # 不同的薪资更新
    'department': ['IT', 'HR', 'IT'],
    'update_date': ['2023-02-01', '2023-02-01', '2023-02-01']
})

print("1. 冲突数据示例:")
print("更新1:")
print(emp_update1)
print("\n更新2:")
print(emp_update2)

# 使用suffixes处理冲突
print("\n2. 使用suffixes处理冲突:")
conflict_merge = pd.merge(emp_update1, emp_update2, on='emp_id', how='outer', suffixes=('_v1', '_v2'))
print(conflict_merge)

# 自定义冲突解决策略
print("\n3. 自定义冲突解决策略:")
def resolve_conflicts(row):
    """解决冲突的自定义函数"""
    # 使用最新的更新日期
    if pd.isna(row['update_date_v2']):
        return row['salary_v1'], row['department_v1'], row['update_date_v1']
    elif pd.isna(row['update_date_v1']):
        return row['salary_v2'], row['department_v2'], row['update_date_v2']
    else:
        # 比较日期,选择最新的
        if row['update_date_v2'] > row['update_date_v1']:
            return row['salary_v2'], row['department_v2'], row['update_date_v2']
        else:
            return row['salary_v1'], row['department_v1'], row['update_date_v1']

# 应用冲突解决策略
resolved_data = conflict_merge.copy()
resolved_data[['salary_final', 'department_final', 'update_date_final']] = resolved_data.apply(
    lambda row: pd.Series(resolve_conflicts(row)), axis=1
)

print("冲突解决后的数据:")
print(resolved_data[['emp_id', 'salary_final', 'department_final', 'update_date_final']])

# 使用combine_first处理冲突
print("\n4. 使用combine_first处理冲突:")
emp_base = employees.set_index('emp_id')[['salary', 'department']]
emp_update1_indexed = emp_update1.set_index('emp_id')[['salary', 'department']]

combined_data = emp_update1_indexed.combine_first(emp_base)
print("combine_first结果:")
print(combined_data)

7.6.3 数据一致性检查

# 数据一致性检查
print("数据一致性检查:")

# 创建测试数据
emp_sales_merge = pd.merge(employees, sales_q1, on='emp_id', how='left')

print("1. 基本一致性检查:")
print(f"员工总数:{len(employees)}")
print(f"有销售数据的员工:{emp_sales_merge['q1_sales'].notna().sum()}")
print(f"缺失销售数据的员工:{emp_sales_merge['q1_sales'].isna().sum()}")

# 检查数据范围一致性
print("\n2. 数据范围一致性检查:")
print("薪资范围检查:")
print(f"薪资最小值:{employees['salary'].min()}")
print(f"薪资最大值:{employees['salary'].max()}")
print(f"薪资平均值:{employees['salary'].mean():.2f}")

# 检查异常值
Q1 = employees['salary'].quantile(0.25)
Q3 = employees['salary'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

outliers = employees[(employees['salary'] < lower_bound) | (employees['salary'] > upper_bound)]
print(f"\n薪资异常值检查:")
print(f"异常值数量:{len(outliers)}")
if len(outliers) > 0:
    print("异常值详情:")
    print(outliers[['name', 'salary']])

# 检查引用完整性
print("\n3. 引用完整性检查:")
# 检查员工表中的部门是否都在部门表中
emp_depts = set(employees['department'].unique())
dept_names = set(departments['dept_name'].unique())

missing_depts = emp_depts - dept_names
extra_depts = dept_names - emp_depts

print(f"员工表中的部门:{emp_depts}")
print(f"部门表中的部门:{dept_names}")
print(f"缺失的部门定义:{missing_depts}")
print(f"多余的部门定义:{extra_depts}")

# 检查项目分配的完整性
print("\n4. 项目分配完整性检查:")
emp_in_projects = set(emp_projects['emp_id'].unique())
all_employees = set(employees['emp_id'].unique())
projects_in_allocation = set(emp_projects['project_id'].unique())
all_projects = set(projects['project_id'].unique())

print(f"参与项目的员工:{len(emp_in_projects)}/{len(all_employees)}")
print(f"有员工分配的项目:{len(projects_in_allocation)}/{len(all_projects)}")

unassigned_employees = all_employees - emp_in_projects
unassigned_projects = all_projects - projects_in_allocation

if unassigned_employees:
    print(f"未分配项目的员工:{unassigned_employees}")
if unassigned_projects:
    print(f"未分配员工的项目:{unassigned_projects}")

7.7 高级合并技巧

7.7.1 条件合并

# 条件合并
print("条件合并:")

# 基于条件的合并
print("1. 基于薪资等级的合并:")
# 创建薪资等级表
salary_grades = pd.DataFrame({
    'grade': ['Junior', 'Mid', 'Senior', 'Lead'],
    'min_salary': [0, 60000, 75000, 90000],
    'max_salary': [59999, 74999, 89999, 999999],
    'bonus_rate': [0.05, 0.10, 0.15, 0.20]
})

print("薪资等级表:")
print(salary_grades)

# 使用pd.cut进行条件分组
employees['salary_grade'] = pd.cut(
    employees['salary'],
    bins=[0, 60000, 75000, 90000, float('inf')],
    labels=['Junior', 'Mid', 'Senior', 'Lead'],
    right=False
)

# 合并薪资等级信息
emp_with_grade = pd.merge(employees, salary_grades, left_on='salary_grade', right_on='grade', how='left')
print("\n员工薪资等级信息:")
print(emp_with_grade[['name', 'salary', 'salary_grade', 'bonus_rate']].head())

# 计算奖金
emp_with_grade['bonus'] = emp_with_grade['salary'] * emp_with_grade['bonus_rate']
print("\n员工奖金计算:")
print(emp_with_grade[['name', 'salary', 'bonus_rate', 'bonus']].head())

7.7.2 时间序列合并

# 时间序列合并
print("时间序列合并:")

# 创建时间序列数据
date_range = pd.date_range('2023-01-01', '2023-12-31', freq='D')
daily_metrics = pd.DataFrame({
    'date': date_range,
    'temperature': np.random.normal(20, 10, len(date_range)),
    'sales_factor': np.random.uniform(0.8, 1.2, len(date_range))
})

# 创建员工日销售数据(稀疏数据)
emp_daily_sales = pd.DataFrame({
    'emp_id': np.random.choice(employees['emp_id'], 1000),
    'date': np.random.choice(date_range, 1000),
    'daily_sales': np.random.exponential(1000, 1000)
})

print("1. 时间序列数据概览:")
print(f"日度指标数据:{len(daily_metrics)}行")
print(f"员工日销售数据:{len(emp_daily_sales)}行")

# 时间序列合并
print("\n2. 时间序列合并:")
emp_sales_with_metrics = pd.merge(
    emp_daily_sales, 
    daily_metrics, 
    on='date', 
    how='left'
)

print("合并后的时间序列数据:")
print(emp_sales_with_metrics.head())

# 分析温度对销售的影响
print("\n3. 温度对销售的影响分析:")
correlation = emp_sales_with_metrics['temperature'].corr(emp_sales_with_metrics['daily_sales'])
print(f"温度与销售额相关性:{correlation:.3f}")

# 按温度区间分析
emp_sales_with_metrics['temp_range'] = pd.cut(
    emp_sales_with_metrics['temperature'],
    bins=[-float('inf'), 10, 20, 30, float('inf')],
    labels=['Cold', 'Cool', 'Warm', 'Hot']
)

temp_sales_analysis = emp_sales_with_metrics.groupby('temp_range')['daily_sales'].agg(['count', 'mean', 'std'])
print("\n不同温度区间的销售分析:")
print(temp_sales_analysis.round(2))

7.7.3 模糊匹配合并

# 模糊匹配合并
print("模糊匹配合并:")

# 创建包含拼写错误的数据
emp_external = pd.DataFrame({
    'employee_name': ['Alice Johnson', 'Bob Smith', 'Charlie Brown', 'David Wilson', 'Eva Green'],
    'external_id': ['EXT001', 'EXT002', 'EXT003', 'EXT004', 'EXT005'],
    'certification': ['AWS', 'Azure', 'GCP', 'AWS', 'Azure']
})

print("1. 外部员工数据(可能有拼写差异):")
print(emp_external)

# 简单的模糊匹配(基于名字的第一个单词)
print("\n2. 基于名字首词的匹配:")
employees['first_name'] = employees['name'].str.split().str[0]
emp_external['first_name'] = emp_external['employee_name'].str.split().str[0]

fuzzy_merge = pd.merge(
    employees, 
    emp_external, 
    on='first_name', 
    how='inner'
)

print("模糊匹配结果:")
print(fuzzy_merge[['name', 'employee_name', 'certification']].head())

# 使用fuzzywuzzy进行更精确的模糊匹配(如果安装了的话)
try:
    from fuzzywuzzy import fuzz, process
    
    print("\n3. 使用fuzzywuzzy进行精确模糊匹配:")
    
    def fuzzy_match(name, choices, threshold=80):
        """模糊匹配函数"""
        match = process.extractOne(name, choices)
        if match and match[1] >= threshold:
            return match[0]
        return None
    
    # 为每个外部员工找到最佳匹配
    emp_names = employees['name'].tolist()
    emp_external['matched_name'] = emp_external['employee_name'].apply(
        lambda x: fuzzy_match(x, emp_names, threshold=70)
    )
    
    print("模糊匹配结果:")
    print(emp_external[['employee_name', 'matched_name', 'certification']])
    
    # 基于模糊匹配结果进行合并
    precise_fuzzy_merge = pd.merge(
        employees,
        emp_external,
        left_on='name',
        right_on='matched_name',
        how='inner'
    )
    
    print("\n精确模糊匹配合并结果:")
    print(precise_fuzzy_merge[['name', 'employee_name', 'certification']])
    
except ImportError:
    print("\n3. fuzzywuzzy未安装,跳过精确模糊匹配")

7.8 实际应用案例

7.8.1 客户360度视图

# 客户360度视图案例
print("客户360度视图案例:")

# 创建客户相关的多个数据表
customers = pd.DataFrame({
    'customer_id': range(1, 11),
    'name': [f'Customer_{i}' for i in range(1, 11)],
    'segment': np.random.choice(['Premium', 'Standard', 'Basic'], 10),
    'registration_date': pd.date_range('2022-01-01', periods=10, freq='30D')
})

orders = pd.DataFrame({
    'order_id': range(1, 21),
    'customer_id': np.random.choice(range(1, 11), 20),
    'order_date': pd.date_range('2023-01-01', periods=20, freq='15D'),
    'amount': np.random.uniform(100, 1000, 20),
    'status': np.random.choice(['Completed', 'Pending', 'Cancelled'], 20)
})

support_tickets = pd.DataFrame({
    'ticket_id': range(1, 16),
    'customer_id': np.random.choice(range(1, 11), 15),
    'issue_type': np.random.choice(['Technical', 'Billing', 'General'], 15),
    'priority': np.random.choice(['High', 'Medium', 'Low'], 15),
    'status': np.random.choice(['Open', 'Closed', 'In Progress'], 15)
})

print("1. 基础数据表:")
print(f"客户表:{customers.shape}")
print(f"订单表:{orders.shape}")
print(f"支持票据表:{support_tickets.shape}")

# 构建360度客户视图
print("\n2. 构建360度客户视图:")

# 订单汇总
order_summary = orders.groupby('customer_id').agg({
    'order_id': 'count',
    'amount': ['sum', 'mean', 'max'],
    'order_date': ['min', 'max']
}).round(2)

order_summary.columns = ['total_orders', 'total_amount', 'avg_amount', 'max_amount', 'first_order', 'last_order']

# 支持票据汇总
support_summary = support_tickets.groupby('customer_id').agg({
    'ticket_id': 'count',
    'priority': lambda x: (x == 'High').sum(),
    'status': lambda x: (x == 'Open').sum()
})

support_summary.columns = ['total_tickets', 'high_priority_tickets', 'open_tickets']

# 合并所有信息
customer_360 = (customers.set_index('customer_id')
                .join(order_summary, how='left')
                .join(support_summary, how='left')
                .fillna(0))

print("客户360度视图:")
print(customer_360.head())

# 客户价值分析
print("\n3. 客户价值分析:")
customer_360['customer_value_score'] = (
    customer_360['total_amount'] * 0.4 +
    customer_360['total_orders'] * 100 * 0.3 +
    (10 - customer_360['total_tickets']) * 50 * 0.2 +
    (customer_360['segment'].map({'Premium': 300, 'Standard': 200, 'Basic': 100})) * 0.1
)

customer_ranking = customer_360.sort_values('customer_value_score', ascending=False)
print("客户价值排名(前5名):")
print(customer_ranking[['name', 'segment', 'total_amount', 'total_orders', 'customer_value_score']].head())

7.8.2 供应链数据整合

# 供应链数据整合案例
print("\n供应链数据整合案例:")

# 创建供应链相关数据
suppliers = pd.DataFrame({
    'supplier_id': ['SUP001', 'SUP002', 'SUP003', 'SUP004'],
    'supplier_name': ['TechCorp', 'MaterialsInc', 'ComponentsLtd', 'PartsWorld'],
    'country': ['USA', 'China', 'Germany', 'Japan'],
    'rating': [4.5, 4.2, 4.8, 4.3]
})

products = pd.DataFrame({
    'product_id': ['PROD001', 'PROD002', 'PROD003', 'PROD004', 'PROD005'],
    'product_name': ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones'],
    'category': ['Electronics', 'Accessories', 'Accessories', 'Electronics', 'Accessories'],
    'unit_cost': [800, 25, 50, 300, 100]
})

procurement = pd.DataFrame({
    'procurement_id': range(1, 21),
    'supplier_id': np.random.choice(suppliers['supplier_id'], 20),
    'product_id': np.random.choice(products['product_id'], 20),
    'quantity': np.random.randint(10, 100, 20),
    'unit_price': np.random.uniform(20, 900, 20),
    'order_date': pd.date_range('2023-01-01', periods=20, freq='10D')
})

inventory = pd.DataFrame({
    'product_id': products['product_id'],
    'current_stock': np.random.randint(50, 500, len(products)),
    'reorder_level': np.random.randint(20, 100, len(products)),
    'last_updated': pd.date_range('2023-12-01', periods=len(products), freq='D')
})

print("1. 供应链数据概览:")
print(f"供应商:{len(suppliers)}家")
print(f"产品:{len(products)}种")
print(f"采购记录:{len(procurement)}条")
print(f"库存记录:{len(inventory)}条")

# 整合供应链数据
print("\n2. 供应链数据整合:")
supply_chain_data = (procurement
                    .merge(suppliers, on='supplier_id', how='left')
                    .merge(products, on='product_id', how='left')
                    .merge(inventory, on='product_id', how='left'))

print("整合后的供应链数据:")
print(supply_chain_data[['supplier_name', 'product_name', 'quantity', 'unit_price', 'current_stock']].head())

# 供应链分析
print("\n3. 供应链分析:")

# 供应商表现分析
supplier_performance = supply_chain_data.groupby('supplier_name').agg({
    'procurement_id': 'count',
    'quantity': 'sum',
    'unit_price': 'mean',
    'rating': 'first'
}).round(2)

supplier_performance.columns = ['total_orders', 'total_quantity', 'avg_price', 'rating']
supplier_performance['price_competitiveness'] = 1 / supplier_performance['avg_price'] * 1000  # 价格竞争力指数

print("供应商表现分析:")
print(supplier_performance.sort_values('rating', ascending=False))

# 产品采购分析
product_analysis = supply_chain_data.groupby('product_name').agg({
    'quantity': 'sum',
    'unit_price': 'mean',
    'current_stock': 'first',
    'reorder_level': 'first'
}).round(2)

product_analysis['stock_status'] = product_analysis.apply(
    lambda row: 'Low Stock' if row['current_stock'] <= row['reorder_level'] else 'Normal',
    axis=1
)

print("\n产品采购分析:")
print(product_analysis)

# 库存预警
print("\n4. 库存预警:")
low_stock_products = product_analysis[product_analysis['stock_status'] == 'Low Stock']
if not low_stock_products.empty:
    print("需要补货的产品:")
    print(low_stock_products[['current_stock', 'reorder_level']])
else:
    print("所有产品库存正常")

7.9 性能优化最佳实践

7.9.1 大数据合并优化

# 大数据合并优化
print("大数据合并优化:")

# 创建大数据集
large_left = pd.DataFrame({
    'key': np.random.randint(0, 100000, 500000),
    'value1': np.random.randn(500000),
    'category': np.random.choice(['A', 'B', 'C', 'D'], 500000)
})

large_right = pd.DataFrame({
    'key': np.random.randint(0, 50000, 200000),
    'value2': np.random.randn(200000),
    'info': np.random.choice(['X', 'Y', 'Z'], 200000)
})

print(f"大数据集 - 左表:{large_left.shape},右表:{large_right.shape}")

# 1. 数据类型优化
print("\n1. 数据类型优化:")
start_time = time.time()
result1 = pd.merge(large_left, large_right, on='key', how='inner')
original_time = time.time() - start_time

# 优化数据类型
large_left_opt = large_left.copy()
large_right_opt = large_right.copy()
large_left_opt['category'] = large_left_opt['category'].astype('category')
large_right_opt['info'] = large_right_opt['info'].astype('category')

start_time = time.time()
result2 = pd.merge(large_left_opt, large_right_opt, on='key', how='inner')
optimized_time = time.time() - start_time

print(f"原始合并时间:{original_time:.4f}秒")
print(f"优化后合并时间:{optimized_time:.4f}秒")
print(f"性能提升:{original_time/optimized_time:.2f}倍")

# 2. 索引优化
print("\n2. 索引优化:")
large_left_indexed = large_left.set_index('key').sort_index()
large_right_indexed = large_right.set_index('key').sort_index()

start_time = time.time()
result3 = large_left_indexed.join(large_right_indexed, how='inner')
indexed_time = time.time() - start_time

print(f"索引join时间:{indexed_time:.4f}秒")
print(f"相对原始merge提升:{original_time/indexed_time:.2f}倍")

# 3. 分块处理
print("\n3. 分块处理:")
def chunked_merge(left_df, right_df, chunk_size=50000):
    """分块合并大数据"""
    results = []
    for i in range(0, len(left_df), chunk_size):
        chunk = left_df.iloc[i:i+chunk_size]
        chunk_result = pd.merge(chunk, right_df, on='key', how='inner')
        results.append(chunk_result)
    return pd.concat(results, ignore_index=True)

start_time = time.time()
result4 = chunked_merge(large_left, large_right, chunk_size=100000)
chunked_time = time.time() - start_time

print(f"分块合并时间:{chunked_time:.4f}秒")
print(f"相对原始merge:{original_time/chunked_time:.2f}倍")

7.9.2 内存优化策略

# 内存优化策略
print("内存优化策略:")

# 内存使用监控
def get_memory_usage():
    """获取当前内存使用情况"""
    try:
        import psutil
        process = psutil.Process()
        return process.memory_info().rss / 1024 / 1024  # MB
    except ImportError:
        return "psutil not available"

print(f"1. 当前内存使用:{get_memory_usage()} MB")

# 创建内存密集型操作
memory_test_left = pd.DataFrame({
    'key': range(200000),
    'data': ['x' * 100] * 200000  # 大字符串数据
})

memory_test_right = pd.DataFrame({
    'key': range(100000, 300000),
    'info': ['y' * 100] * 200000
})

print(f"测试数据内存使用:{memory_test_left.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# 内存优化技巧
print("\n2. 内存优化技巧:")

# 使用更高效的数据类型
memory_test_left_opt = memory_test_left.copy()
memory_test_left_opt['data'] = memory_test_left_opt['data'].astype('string')  # 使用string类型

print(f"优化后内存使用:{memory_test_left_opt.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# 及时删除不需要的数据
del memory_test_left, memory_test_right, memory_test_left_opt
import gc
gc.collect()

print(f"清理后内存使用:{get_memory_usage()} MB")

# 使用生成器处理大数据
print("\n3. 生成器处理大数据:")
def data_generator(size, chunk_size=10000):
    """数据生成器"""
    for i in range(0, size, chunk_size):
        yield pd.DataFrame({
            'key': range(i, min(i + chunk_size, size)),
            'value': np.random.randn(min(chunk_size, size - i))
        })

# 使用生成器处理数据
total_processed = 0
for chunk in data_generator(100000, 10000):
    # 模拟处理
    processed_chunk = chunk.groupby('key')['value'].sum()
    total_processed += len(processed_chunk)

print(f"生成器处理完成,总处理记录:{total_processed}")

7.9.3 合并策略选择

# 合并策略选择
print("合并策略选择:")

# 创建不同大小的测试数据
small_left = pd.DataFrame({'key': range(1000), 'value': range(1000)})
small_right = pd.DataFrame({'key': range(500, 1500), 'info': range(1000)})

medium_left = pd.DataFrame({'key': range(50000), 'value': range(50000)})
medium_right = pd.DataFrame({'key': range(25000, 75000), 'info': range(50000)})

print("1. 不同数据规模的最佳策略:")

# 小数据集
start_time = time.time()
small_merge = pd.merge(small_left, small_right, on='key', how='inner')
small_merge_time = time.time() - start_time

start_time = time.time()
small_join = small_left.set_index('key').join(small_right.set_index('key'), how='inner')
small_join_time = time.time() - start_time

print(f"小数据集 - Merge: {small_merge_time:.6f}秒, Join: {small_join_time:.6f}秒")

# 中等数据集
start_time = time.time()
medium_merge = pd.merge(medium_left, medium_right, on='key', how='inner')
medium_merge_time = time.time() - start_time

start_time = time.time()
medium_join = medium_left.set_index('key').join(medium_right.set_index('key'), how='inner')
medium_join_time = time.time() - start_time

print(f"中等数据集 - Merge: {medium_merge_time:.6f}秒, Join: {medium_join_time:.6f}秒")

# 策略建议
print("\n2. 策略选择建议:")
print("- 小数据集(<10K行): merge和join性能相近,选择语法更清晰的")
print("- 中等数据集(10K-100K行): 预先设置索引的join通常更快")
print("- 大数据集(>100K行): 考虑分块处理或使用数据库")
print("- 多次合并: 优先考虑索引优化")
print("- 内存受限: 使用分块处理或生成器")

7.10 本章小结

7.10.1 核心知识点

  1. Merge操作

    • 连接类型:inner, left, right, outer
    • 连接键:单列、多列连接
    • 冲突处理:suffixes参数
    • 性能优化:排序、索引、数据类型
  2. Join操作

    • 基于索引的连接
    • 多级索引join
    • 性能优势:预设索引的情况下
  3. Concat操作

    • 垂直拼接:行方向合并
    • 水平拼接:列方向合并
    • 多层拼接:keys参数
    • 缺失值处理:join参数
  4. 数据质量管理

    • 重复数据检测和处理
    • 冲突解决策略
    • 数据一致性检查
    • 引用完整性验证
  5. 高级技巧

    • 条件合并
    • 时间序列合并
    • 模糊匹配
    • 链式操作

7.10.2 最佳实践

  • 选择合适的合并方法:根据数据特点选择merge、join或concat
  • 优化数据类型:使用category类型减少内存使用
  • 预处理优化:排序、索引设置提高性能
  • 分块处理:处理超大数据集时的内存管理
  • 数据验证:合并后进行数据质量检查

7.10.3 性能优化要点

  • 索引优化:预先设置索引可显著提升join性能
  • 数据类型优化:使用合适的数据类型减少内存使用
  • 分块处理:对于大数据集采用分块策略
  • 内存管理:及时清理不需要的中间结果
  • 策略选择:根据数据规模选择最优的合并策略

7.10.4 常见陷阱

  • 忘记处理重复键导致的数据膨胀
  • 不同数据类型的键无法正确匹配
  • 大数据集合并时的内存溢出
  • 时间序列数据的时区和格式问题
  • 字符串匹配的大小写敏感性

7.10.5 下一步学习

在下一章中,我们将学习: - 时间序列数据的处理和分析 - 日期时间索引的使用 - 时间序列的重采样和频率转换 - 时间窗口分析和滚动计算


练习题

  1. 实现一个完整的客户数据整合流程
  2. 设计一个处理大数据集合并的优化方案
  3. 创建一个模糊匹配的数据清洗工具
  4. 构建一个多数据源的实时数据合并系统
  5. 分析不同合并策略在各种场景下的性能表现

记住:数据合并是数据分析的基础技能,正确的合并策略能够确保数据的完整性和分析的准确性!