6.1 章节概述
数据分组与聚合是数据分析中的核心操作,它允许我们按照某些条件将数据分组,然后对每个组进行统计计算。本章将详细介绍Pandas中的groupby操作、各种聚合函数以及数据透视表的使用。
6.1.1 学习目标
- 掌握groupby操作的基本原理和用法
- 学会使用各种聚合函数进行数据汇总
- 理解分组后的数据转换和过滤
- 掌握数据透视表的创建和使用
- 学习多级分组和复杂聚合操作
- 了解分组操作的性能优化技巧
6.1.2 分组聚合流程
graph TD
A[原始数据] --> B[分组 Split]
B --> C[应用函数 Apply]
C --> D[合并结果 Combine]
D --> E[聚合结果]
B --> B1[按单列分组]
B --> B2[按多列分组]
B --> B3[按条件分组]
C --> C1[聚合函数]
C --> C2[转换函数]
C --> C3[过滤函数]
6.2 GroupBy基础
6.2.1 创建示例数据
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# 创建销售数据示例
np.random.seed(42)
sales_data = pd.DataFrame({
'date': pd.date_range('2023-01-01', periods=1000, freq='D'),
'salesperson': np.random.choice(['Alice', 'Bob', 'Charlie', 'David', 'Eve'], 1000),
'region': np.random.choice(['North', 'South', 'East', 'West'], 1000),
'product_category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Home'], 1000),
'sales_amount': np.random.exponential(100, 1000),
'quantity': np.random.poisson(5, 1000),
'customer_type': np.random.choice(['New', 'Returning'], 1000, p=[0.3, 0.7])
})
# 添加一些计算列
sales_data['month'] = sales_data['date'].dt.month
sales_data['quarter'] = sales_data['date'].dt.quarter
sales_data['unit_price'] = sales_data['sales_amount'] / sales_data['quantity']
print("销售数据示例:")
print(sales_data.head(10))
print(f"\n数据形状:{sales_data.shape}")
print(f"数据类型:\n{sales_data.dtypes}")
6.2.2 基本分组操作
# 基本groupby操作
print("基本分组操作:")
# 按单列分组
print("1. 按销售人员分组:")
grouped_by_person = sales_data.groupby('salesperson')
print(f"分组对象类型:{type(grouped_by_person)}")
print(f"分组数量:{grouped_by_person.ngroups}")
print(f"分组大小:\n{grouped_by_person.size()}")
# 查看分组的键
print(f"\n分组键:{list(grouped_by_person.groups.keys())}")
# 获取特定组的数据
print("\nAlice的销售数据(前5条):")
alice_data = grouped_by_person.get_group('Alice')
print(alice_data.head())
# 按多列分组
print("\n2. 按地区和产品类别分组:")
grouped_multi = sales_data.groupby(['region', 'product_category'])
print(f"多列分组数量:{grouped_multi.ngroups}")
print(f"多列分组大小:\n{grouped_multi.size()}")
# 遍历分组
print("\n3. 遍历分组(显示前3个组):")
for i, (name, group) in enumerate(grouped_by_person):
if i < 3:
print(f"\n组名:{name}")
print(f"组大小:{len(group)}")
print(f"平均销售额:{group['sales_amount'].mean():.2f}")
6.2.3 分组对象的属性和方法
# 分组对象的属性和方法
print("分组对象的属性和方法:")
# 基本信息
print(f"分组列:{grouped_by_person.grouper.names}")
print(f"分组轴:{grouped_by_person.grouper.axis}")
# 分组统计信息
print("\n分组统计信息:")
print(f"各组大小:\n{grouped_by_person.size()}")
print(f"\n各组描述统计:")
print(grouped_by_person['sales_amount'].describe())
# 分组索引
print(f"\n分组索引:")
for name, indices in list(grouped_by_person.indices.items())[:2]:
print(f"{name}: {indices[:10]}...") # 只显示前10个索引
# 分组应用函数
print("\n分组应用自定义函数:")
def group_summary(group):
return pd.Series({
'count': len(group),
'total_sales': group['sales_amount'].sum(),
'avg_sales': group['sales_amount'].mean(),
'max_sales': group['sales_amount'].max()
})
summary = grouped_by_person.apply(group_summary)
print(summary)
6.3 聚合函数
6.3.1 基本聚合函数
# 基本聚合函数
print("基本聚合函数:")
# 单个聚合函数
print("1. 各销售人员的总销售额:")
total_sales = sales_data.groupby('salesperson')['sales_amount'].sum()
print(total_sales.sort_values(ascending=False))
print("\n2. 各销售人员的平均销售额:")
avg_sales = sales_data.groupby('salesperson')['sales_amount'].mean()
print(avg_sales.sort_values(ascending=False))
print("\n3. 各销售人员的销售次数:")
sales_count = sales_data.groupby('salesperson')['sales_amount'].count()
print(sales_count.sort_values(ascending=False))
# 多个聚合函数
print("\n4. 多个聚合函数:")
multi_agg = sales_data.groupby('salesperson')['sales_amount'].agg(['count', 'sum', 'mean', 'std', 'min', 'max'])
print(multi_agg.round(2))
# 对不同列应用不同聚合函数
print("\n5. 对不同列应用不同聚合函数:")
custom_agg = sales_data.groupby('salesperson').agg({
'sales_amount': ['sum', 'mean', 'count'],
'quantity': ['sum', 'mean'],
'unit_price': 'mean'
})
print(custom_agg.round(2))
6.3.2 自定义聚合函数
# 自定义聚合函数
print("自定义聚合函数:")
# 定义自定义聚合函数
def sales_range(series):
"""计算销售额的范围"""
return series.max() - series.min()
def coefficient_of_variation(series):
"""计算变异系数"""
return series.std() / series.mean() if series.mean() != 0 else 0
def top_percentile(series, percentile=90):
"""计算指定百分位数"""
return series.quantile(percentile/100)
# 应用自定义聚合函数
print("1. 自定义聚合函数结果:")
custom_metrics = sales_data.groupby('salesperson')['sales_amount'].agg([
'mean',
sales_range,
coefficient_of_variation,
lambda x: top_percentile(x, 90)
])
# 重命名列
custom_metrics.columns = ['平均销售额', '销售额范围', '变异系数', '90分位数']
print(custom_metrics.round(3))
# 使用命名聚合
print("\n2. 使用命名聚合:")
named_agg = sales_data.groupby('salesperson').agg(
总销售额=('sales_amount', 'sum'),
平均销售额=('sales_amount', 'mean'),
销售次数=('sales_amount', 'count'),
最大单笔=('sales_amount', 'max'),
销售额标准差=('sales_amount', 'std')
).round(2)
print(named_agg)
# 复杂的自定义聚合
print("\n3. 复杂的自定义聚合:")
def comprehensive_stats(group):
"""综合统计函数"""
return pd.Series({
'销售总额': group['sales_amount'].sum(),
'销售次数': len(group),
'平均客单价': group['sales_amount'].mean(),
'销售额中位数': group['sales_amount'].median(),
'高价值订单数': (group['sales_amount'] > group['sales_amount'].quantile(0.8)).sum(),
'主要产品类别': group['product_category'].mode().iloc[0] if not group['product_category'].mode().empty else 'N/A',
'新客户比例': (group['customer_type'] == 'New').mean()
})
comprehensive_results = sales_data.groupby('salesperson').apply(comprehensive_stats)
print(comprehensive_results.round(3))
6.3.3 分组后的统计分析
# 分组后的统计分析
print("分组后的统计分析:")
# 描述性统计
print("1. 各地区销售额描述性统计:")
region_stats = sales_data.groupby('region')['sales_amount'].describe()
print(region_stats.round(2))
# 分位数分析
print("\n2. 各地区销售额分位数分析:")
quantiles = sales_data.groupby('region')['sales_amount'].quantile([0.25, 0.5, 0.75, 0.9, 0.95])
print(quantiles.round(2))
# 方差分析
print("\n3. 各产品类别的销售额方差分析:")
variance_analysis = sales_data.groupby('product_category')['sales_amount'].agg([
'count', 'mean', 'std', 'var'
])
variance_analysis['cv'] = variance_analysis['std'] / variance_analysis['mean'] # 变异系数
print(variance_analysis.round(3))
# 相关性分析
print("\n4. 各销售人员的销售额与数量相关性:")
def correlation_analysis(group):
return group['sales_amount'].corr(group['quantity'])
correlations = sales_data.groupby('salesperson').apply(correlation_analysis)
print(correlations.round(3))
6.4 数据转换
6.4.1 Transform方法
# Transform方法
print("Transform方法:")
# 基本transform操作
print("1. 计算各组的标准化值:")
sales_data['sales_amount_normalized'] = sales_data.groupby('salesperson')['sales_amount'].transform(
lambda x: (x - x.mean()) / x.std()
)
print("标准化后的数据(前10行):")
print(sales_data[['salesperson', 'sales_amount', 'sales_amount_normalized']].head(10))
# 计算组内排名
print("\n2. 计算组内销售额排名:")
sales_data['sales_rank_in_group'] = sales_data.groupby('salesperson')['sales_amount'].transform('rank', ascending=False)
print("组内排名(前10行):")
print(sales_data[['salesperson', 'sales_amount', 'sales_rank_in_group']].head(10))
# 计算移动平均
print("\n3. 计算7天移动平均:")
sales_data_sorted = sales_data.sort_values(['salesperson', 'date'])
sales_data_sorted['sales_ma7'] = sales_data_sorted.groupby('salesperson')['sales_amount'].transform(
lambda x: x.rolling(window=7, min_periods=1).mean()
)
print("移动平均(Alice的前10条记录):")
alice_ma = sales_data_sorted[sales_data_sorted['salesperson'] == 'Alice'][
['date', 'sales_amount', 'sales_ma7']
].head(10)
print(alice_ma)
# 计算累计值
print("\n4. 计算累计销售额:")
sales_data['cumulative_sales'] = sales_data.groupby('salesperson')['sales_amount'].transform('cumsum')
print("累计销售额(前10行):")
print(sales_data[['salesperson', 'sales_amount', 'cumulative_sales']].head(10))
6.4.2 高级转换操作
# 高级转换操作
print("高级转换操作:")
# 计算组内百分比
print("1. 计算各销售人员在总销售额中的占比:")
total_sales_by_person = sales_data.groupby('salesperson')['sales_amount'].sum()
sales_data['person_sales_percentage'] = sales_data['salesperson'].map(
lambda x: total_sales_by_person[x] / total_sales_by_person.sum() * 100
)
print("销售人员占比:")
person_percentage = sales_data.groupby('salesperson')['person_sales_percentage'].first().sort_values(ascending=False)
print(person_percentage.round(2))
# 计算相对于组均值的偏差
print("\n2. 计算相对于组均值的偏差:")
sales_data['deviation_from_group_mean'] = sales_data.groupby('region')['sales_amount'].transform(
lambda x: x - x.mean()
)
print("偏差统计:")
deviation_stats = sales_data.groupby('region')['deviation_from_group_mean'].agg(['mean', 'std', 'min', 'max'])
print(deviation_stats.round(2))
# 标记异常值
print("\n3. 标记组内异常值:")
def mark_outliers(group, column, threshold=2):
"""标记超过阈值标准差的异常值"""
mean_val = group[column].mean()
std_val = group[column].std()
return np.abs(group[column] - mean_val) > threshold * std_val
sales_data['is_outlier'] = sales_data.groupby('salesperson').apply(
lambda x: mark_outliers(x, 'sales_amount')
).reset_index(level=0, drop=True)
print("异常值统计:")
outlier_stats = sales_data.groupby('salesperson')['is_outlier'].agg(['sum', 'mean'])
outlier_stats.columns = ['异常值数量', '异常值比例']
print(outlier_stats.round(3))
6.5 数据透视表
6.5.1 基本透视表
# 基本透视表
print("基本透视表:")
# 简单透视表
print("1. 销售人员vs地区的销售额透视表:")
pivot_basic = sales_data.pivot_table(
values='sales_amount',
index='salesperson',
columns='region',
aggfunc='sum'
)
print(pivot_basic.round(2))
# 添加边际总计
print("\n2. 带总计的透视表:")
pivot_with_margins = sales_data.pivot_table(
values='sales_amount',
index='salesperson',
columns='region',
aggfunc='sum',
margins=True,
margins_name='总计'
)
print(pivot_with_margins.round(2))
# 多个值的透视表
print("\n3. 多个值的透视表:")
pivot_multi_values = sales_data.pivot_table(
values=['sales_amount', 'quantity'],
index='salesperson',
columns='region',
aggfunc='sum'
)
print(pivot_multi_values.round(2))
# 不同聚合函数
print("\n4. 不同聚合函数的透视表:")
pivot_multi_agg = sales_data.pivot_table(
values='sales_amount',
index='salesperson',
columns='region',
aggfunc=['sum', 'mean', 'count']
)
print(pivot_multi_agg.round(2))
6.5.2 复杂透视表
# 复杂透视表
print("复杂透视表:")
# 多级索引透视表
print("1. 多级索引透视表:")
pivot_multi_index = sales_data.pivot_table(
values='sales_amount',
index=['region', 'salesperson'],
columns=['product_category', 'customer_type'],
aggfunc='sum',
fill_value=0
)
print(pivot_multi_index.head(10))
# 时间序列透视表
print("\n2. 月度销售透视表:")
pivot_monthly = sales_data.pivot_table(
values='sales_amount',
index='month',
columns='product_category',
aggfunc='sum',
margins=True
)
print(pivot_monthly.round(2))
# 自定义聚合函数的透视表
print("\n3. 自定义聚合函数透视表:")
def sales_efficiency(series):
"""销售效率:平均销售额/销售次数"""
return series.mean() / len(series) if len(series) > 0 else 0
pivot_custom = sales_data.pivot_table(
values='sales_amount',
index='salesperson',
columns='quarter',
aggfunc=sales_efficiency
)
print(pivot_custom.round(4))
# 条件透视表
print("\n4. 高价值订单透视表(销售额>200):")
high_value_sales = sales_data[sales_data['sales_amount'] > 200]
pivot_high_value = high_value_sales.pivot_table(
values='sales_amount',
index='salesperson',
columns='region',
aggfunc=['count', 'sum'],
fill_value=0
)
print(pivot_high_value)
6.5.3 透视表可视化
# 透视表可视化
print("透视表可视化:")
# 创建热力图
plt.figure(figsize=(12, 8))
# 销售人员vs地区的销售额热力图
pivot_for_heatmap = sales_data.pivot_table(
values='sales_amount',
index='salesperson',
columns='region',
aggfunc='sum'
)
plt.subplot(2, 2, 1)
sns.heatmap(pivot_for_heatmap, annot=True, fmt='.0f', cmap='YlOrRd')
plt.title('销售人员vs地区销售额热力图')
# 产品类别vs月份的销售量热力图
pivot_product_month = sales_data.pivot_table(
values='quantity',
index='product_category',
columns='month',
aggfunc='sum'
)
plt.subplot(2, 2, 2)
sns.heatmap(pivot_product_month, annot=True, fmt='.0f', cmap='Blues')
plt.title('产品类别vs月份销售量热力图')
# 地区vs客户类型的平均销售额
pivot_region_customer = sales_data.pivot_table(
values='sales_amount',
index='region',
columns='customer_type',
aggfunc='mean'
)
plt.subplot(2, 2, 3)
sns.heatmap(pivot_region_customer, annot=True, fmt='.1f', cmap='Greens')
plt.title('地区vs客户类型平均销售额')
# 销售人员vs季度的销售次数
pivot_person_quarter = sales_data.pivot_table(
values='sales_amount',
index='salesperson',
columns='quarter',
aggfunc='count'
)
plt.subplot(2, 2, 4)
sns.heatmap(pivot_person_quarter, annot=True, fmt='.0f', cmap='Purples')
plt.title('销售人员vs季度销售次数')
plt.tight_layout()
plt.show()
# 透视表条形图
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 各地区总销售额
region_sales = sales_data.groupby('region')['sales_amount'].sum().sort_values(ascending=False)
region_sales.plot(kind='bar', ax=axes[0, 0], color='skyblue')
axes[0, 0].set_title('各地区总销售额')
axes[0, 0].set_ylabel('销售额')
# 各产品类别平均销售额
category_avg = sales_data.groupby('product_category')['sales_amount'].mean().sort_values(ascending=False)
category_avg.plot(kind='bar', ax=axes[0, 1], color='lightgreen')
axes[0, 1].set_title('各产品类别平均销售额')
axes[0, 1].set_ylabel('平均销售额')
# 各销售人员销售次数
person_count = sales_data.groupby('salesperson')['sales_amount'].count().sort_values(ascending=False)
person_count.plot(kind='bar', ax=axes[1, 0], color='orange')
axes[1, 0].set_title('各销售人员销售次数')
axes[1, 0].set_ylabel('销售次数')
# 月度销售趋势
monthly_sales = sales_data.groupby('month')['sales_amount'].sum()
monthly_sales.plot(kind='line', ax=axes[1, 1], marker='o', color='red')
axes[1, 1].set_title('月度销售趋势')
axes[1, 1].set_ylabel('销售额')
axes[1, 1].set_xlabel('月份')
plt.tight_layout()
plt.show()
6.6 多级分组
6.6.1 多级分组基础
# 多级分组基础
print("多级分组基础:")
# 按多个列分组
print("1. 按地区和产品类别分组:")
multi_group = sales_data.groupby(['region', 'product_category'])
# 查看分组结构
print(f"分组数量:{multi_group.ngroups}")
print(f"分组大小:\n{multi_group.size()}")
# 多级分组聚合
print("\n2. 多级分组聚合:")
multi_agg = multi_group['sales_amount'].agg(['count', 'sum', 'mean'])
print(multi_agg.round(2))
# 多级分组的层级操作
print("\n3. 按第一级分组求和:")
level_0_sum = multi_agg.groupby(level=0).sum()
print(level_0_sum)
print("\n4. 按第二级分组求平均:")
level_1_mean = multi_agg.groupby(level=1).mean()
print(level_1_mean.round(2))
# 重置索引
print("\n5. 重置多级索引:")
multi_agg_reset = multi_agg.reset_index()
print(multi_agg_reset.head())
6.6.2 复杂多级分组
# 复杂多级分组
print("复杂多级分组:")
# 三级分组
print("1. 三级分组(地区-产品类别-客户类型):")
triple_group = sales_data.groupby(['region', 'product_category', 'customer_type'])
triple_agg = triple_group['sales_amount'].agg(['count', 'sum', 'mean'])
print(triple_agg.head(10))
# 不同级别的不同聚合
print("\n2. 不同级别的不同聚合:")
complex_agg = sales_data.groupby(['region', 'salesperson']).agg({
'sales_amount': ['sum', 'mean', 'count'],
'quantity': ['sum', 'mean'],
'unit_price': 'mean'
})
# 展平列名
complex_agg.columns = ['_'.join(col).strip() for col in complex_agg.columns.values]
print(complex_agg.head())
# 条件分组
print("\n3. 条件分组:")
# 创建销售额等级
sales_data['sales_level'] = pd.cut(
sales_data['sales_amount'],
bins=[0, 50, 100, 200, float('inf')],
labels=['低', '中', '高', '极高']
)
level_group = sales_data.groupby(['region', 'sales_level'])
level_stats = level_group.size().unstack(fill_value=0)
print("各地区销售额等级分布:")
print(level_stats)
# 计算比例
level_props = level_stats.div(level_stats.sum(axis=1), axis=0)
print("\n各地区销售额等级比例:")
print(level_props.round(3))
6.7 分组过滤
6.7.1 基本分组过滤
# 基本分组过滤
print("基本分组过滤:")
# 过滤组大小
print("1. 过滤销售次数大于50的销售人员:")
frequent_sellers = sales_data.groupby('salesperson').filter(lambda x: len(x) > 50)
print(f"过滤后数据量:{len(frequent_sellers)}")
print("剩余销售人员:", frequent_sellers['salesperson'].unique())
# 过滤组统计值
print("\n2. 过滤平均销售额大于100的地区:")
high_avg_regions = sales_data.groupby('region').filter(lambda x: x['sales_amount'].mean() > 100)
print(f"过滤后数据量:{len(high_avg_regions)}")
print("剩余地区:", high_avg_regions['region'].unique())
# 过滤组内最大值
print("\n3. 过滤最大销售额大于500的产品类别:")
high_max_categories = sales_data.groupby('product_category').filter(
lambda x: x['sales_amount'].max() > 500
)
print(f"过滤后数据量:{len(high_max_categories)}")
print("剩余产品类别:", high_max_categories['product_category'].unique())
# 复合条件过滤
print("\n4. 复合条件过滤:")
complex_filter = sales_data.groupby('salesperson').filter(
lambda x: (len(x) > 30) and (x['sales_amount'].mean() > 80) and (x['sales_amount'].std() < 100)
)
print(f"复合条件过滤后数据量:{len(complex_filter)}")
print("符合条件的销售人员:", complex_filter['salesperson'].unique())
6.7.2 高级分组过滤
# 高级分组过滤
print("高级分组过滤:")
# 基于分位数的过滤
print("1. 过滤销售额中位数在前50%的地区:")
region_medians = sales_data.groupby('region')['sales_amount'].median()
median_threshold = region_medians.quantile(0.5)
top_median_regions = sales_data.groupby('region').filter(
lambda x: x['sales_amount'].median() > median_threshold
)
print(f"过滤后地区:{top_median_regions['region'].unique()}")
# 基于相关性的过滤
print("\n2. 过滤销售额与数量相关性大于0.3的销售人员:")
def high_correlation_filter(group):
if len(group) < 10: # 样本量太小
return False
correlation = group['sales_amount'].corr(group['quantity'])
return correlation > 0.3
high_corr_sellers = sales_data.groupby('salesperson').filter(high_correlation_filter)
print(f"高相关性销售人员:{high_corr_sellers['salesperson'].unique()}")
# 基于趋势的过滤
print("\n3. 过滤销售额呈上升趋势的销售人员:")
def upward_trend_filter(group):
if len(group) < 20: # 需要足够的数据点
return False
# 简单的趋势检测:后半部分平均值 > 前半部分平均值
mid_point = len(group) // 2
first_half_avg = group.iloc[:mid_point]['sales_amount'].mean()
second_half_avg = group.iloc[mid_point:]['sales_amount'].mean()
return second_half_avg > first_half_avg * 1.1 # 至少增长10%
# 按日期排序后应用过滤
sales_sorted = sales_data.sort_values(['salesperson', 'date'])
upward_trend_sellers = sales_sorted.groupby('salesperson').filter(upward_trend_filter)
print(f"上升趋势销售人员:{upward_trend_sellers['salesperson'].unique()}")
# 组合多个过滤条件
print("\n4. 组合过滤条件:")
def comprehensive_filter(group):
conditions = [
len(group) >= 40, # 足够的销售记录
group['sales_amount'].mean() >= 90, # 平均销售额不低
group['sales_amount'].std() <= 120, # 销售额稳定性
(group['customer_type'] == 'Returning').mean() >= 0.6 # 回头客比例高
]
return all(conditions)
elite_sellers = sales_data.groupby('salesperson').filter(comprehensive_filter)
print(f"精英销售人员:{elite_sellers['salesperson'].unique()}")
# 显示精英销售人员的详细统计
if len(elite_sellers) > 0:
elite_stats = elite_sellers.groupby('salesperson').agg({
'sales_amount': ['count', 'sum', 'mean', 'std'],
'customer_type': lambda x: (x == 'Returning').mean()
})
elite_stats.columns = ['销售次数', '总销售额', '平均销售额', '销售额标准差', '回头客比例']
print("\n精英销售人员统计:")
print(elite_stats.round(2))
6.8 窗口函数
6.8.1 滚动窗口
# 滚动窗口
print("滚动窗口:")
# 准备时间序列数据
daily_sales = sales_data.groupby('date')['sales_amount'].sum().reset_index()
daily_sales = daily_sales.sort_values('date')
print("1. 7天滚动平均:")
daily_sales['sales_ma7'] = daily_sales['sales_amount'].rolling(window=7).mean()
print(daily_sales.head(10))
print("\n2. 30天滚动统计:")
daily_sales['sales_ma30'] = daily_sales['sales_amount'].rolling(window=30).mean()
daily_sales['sales_std30'] = daily_sales['sales_amount'].rolling(window=30).std()
daily_sales['sales_min30'] = daily_sales['sales_amount'].rolling(window=30).min()
daily_sales['sales_max30'] = daily_sales['sales_amount'].rolling(window=30).max()
print(daily_sales[['date', 'sales_amount', 'sales_ma30', 'sales_std30']].head(35))
# 分组滚动窗口
print("\n3. 各销售人员的滚动统计:")
sales_person_daily = sales_data.groupby(['salesperson', 'date'])['sales_amount'].sum().reset_index()
sales_person_daily = sales_person_daily.sort_values(['salesperson', 'date'])
sales_person_daily['rolling_avg'] = sales_person_daily.groupby('salesperson')['sales_amount'].transform(
lambda x: x.rolling(window=7, min_periods=1).mean()
)
print("Alice的滚动平均(前10天):")
alice_rolling = sales_person_daily[sales_person_daily['salesperson'] == 'Alice'].head(10)
print(alice_rolling[['date', 'sales_amount', 'rolling_avg']])
6.8.2 扩展窗口
# 扩展窗口
print("扩展窗口:")
print("1. 累计统计:")
daily_sales['cumulative_sum'] = daily_sales['sales_amount'].expanding().sum()
daily_sales['cumulative_mean'] = daily_sales['sales_amount'].expanding().mean()
daily_sales['cumulative_std'] = daily_sales['sales_amount'].expanding().std()
print(daily_sales[['date', 'sales_amount', 'cumulative_sum', 'cumulative_mean']].head(10))
print("\n2. 各销售人员的累计业绩:")
sales_person_daily['cumulative_sales'] = sales_person_daily.groupby('salesperson')['sales_amount'].transform(
lambda x: x.expanding().sum()
)
print("Bob的累计销售额(前10天):")
bob_cumulative = sales_person_daily[sales_person_daily['salesperson'] == 'Bob'].head(10)
print(bob_cumulative[['date', 'sales_amount', 'cumulative_sales']])
# 扩展窗口的应用:计算历史最佳表现
print("\n3. 历史最佳表现追踪:")
sales_person_daily['historical_max'] = sales_person_daily.groupby('salesperson')['sales_amount'].transform(
lambda x: x.expanding().max()
)
sales_person_daily['is_new_record'] = (
sales_person_daily['sales_amount'] == sales_person_daily['historical_max']
)
print("Charlie的新记录情况:")
charlie_records = sales_person_daily[
(sales_person_daily['salesperson'] == 'Charlie') &
(sales_person_daily['is_new_record'])
].head(5)
print(charlie_records[['date', 'sales_amount', 'historical_max']])
6.9 性能优化
6.9.1 分组操作性能优化
# 分组操作性能优化
print("分组操作性能优化:")
# 创建大数据集进行性能测试
import time
large_data = pd.DataFrame({
'group': np.random.choice(['A', 'B', 'C', 'D', 'E'], 1000000),
'value1': np.random.randn(1000000),
'value2': np.random.randn(1000000),
'category': np.random.choice(['X', 'Y', 'Z'], 1000000)
})
print(f"大数据集形状:{large_data.shape}")
# 1. 分类数据类型优化
print("\n1. 分类数据类型优化:")
start_time = time.time()
result1 = large_data.groupby('group')['value1'].sum()
original_time = time.time() - start_time
large_data['group'] = large_data['group'].astype('category')
large_data['category'] = large_data['category'].astype('category')
start_time = time.time()
result2 = large_data.groupby('group')['value1'].sum()
category_time = time.time() - start_time
print(f"原始分组时间:{original_time:.4f}秒")
print(f"分类类型分组时间:{category_time:.4f}秒")
print(f"性能提升:{original_time/category_time:.2f}倍")
# 2. 预排序优化
print("\n2. 预排序优化:")
unsorted_data = large_data.sample(frac=1).reset_index(drop=True) # 打乱顺序
start_time = time.time()
result3 = unsorted_data.groupby('group')['value1'].sum()
unsorted_time = time.time() - start_time
sorted_data = unsorted_data.sort_values('group')
start_time = time.time()
result4 = sorted_data.groupby('group')['value1'].sum()
sorted_time = time.time() - start_time
print(f"未排序分组时间:{unsorted_time:.4f}秒")
print(f"预排序分组时间:{sorted_time:.4f}秒")
print(f"性能提升:{unsorted_time/sorted_time:.2f}倍")
# 3. 使用numba加速自定义聚合函数
try:
from numba import jit
@jit
def fast_custom_agg(values):
"""使用numba加速的自定义聚合函数"""
return np.sum(values ** 2) / len(values)
print("\n3. numba加速测试:")
start_time = time.time()
result5 = large_data.groupby('group')['value1'].agg(lambda x: np.sum(x**2) / len(x))
normal_agg_time = time.time() - start_time
start_time = time.time()
result6 = large_data.groupby('group')['value1'].agg(fast_custom_agg)
numba_agg_time = time.time() - start_time
print(f"普通自定义聚合时间:{normal_agg_time:.4f}秒")
print(f"numba加速聚合时间:{numba_agg_time:.4f}秒")
print(f"性能提升:{normal_agg_time/numba_agg_time:.2f}倍")
except ImportError:
print("\n3. numba未安装,跳过加速测试")
6.9.2 内存优化技巧
# 内存优化技巧
print("内存优化技巧:")
# 1. 分块处理大数据
def chunked_groupby(df, group_col, value_col, agg_func, chunk_size=100000):
"""分块处理大数据的分组操作"""
results = []
for i in range(0, len(df), chunk_size):
chunk = df.iloc[i:i+chunk_size]
chunk_result = chunk.groupby(group_col)[value_col].agg(agg_func)
results.append(chunk_result)
# 合并结果
combined = pd.concat(results)
if agg_func == 'sum':
return combined.groupby(combined.index).sum()
elif agg_func == 'mean':
# 对于均值,需要重新计算
counts = pd.concat([chunk.groupby(group_col)[value_col].count() for chunk in
[df.iloc[i:i+chunk_size] for i in range(0, len(df), chunk_size)]])
counts = counts.groupby(counts.index).sum()
sums = combined.groupby(combined.index).sum()
return sums / counts
else:
return combined.groupby(combined.index).agg(agg_func)
print("1. 分块处理测试:")
start_time = time.time()
normal_result = large_data.groupby('group')['value1'].sum()
normal_time = time.time() - start_time
start_time = time.time()
chunked_result = chunked_groupby(large_data, 'group', 'value1', 'sum', chunk_size=200000)
chunked_time = time.time() - start_time
print(f"普通分组时间:{normal_time:.4f}秒")
print(f"分块分组时间:{chunked_time:.4f}秒")
print(f"结果一致性:{np.allclose(normal_result.sort_index(), chunked_result.sort_index())}")
# 2. 内存使用监控
def memory_usage_mb():
"""获取当前内存使用量(MB)"""
import psutil
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024
try:
import psutil
print(f"\n2. 当前内存使用:{memory_usage_mb():.2f} MB")
# 大数据操作前后的内存对比
before_memory = memory_usage_mb()
large_result = large_data.groupby(['group', 'category']).agg({
'value1': ['sum', 'mean', 'std'],
'value2': ['sum', 'mean', 'std']
})
after_memory = memory_usage_mb()
print(f"操作前内存:{before_memory:.2f} MB")
print(f"操作后内存:{after_memory:.2f} MB")
print(f"内存增长:{after_memory - before_memory:.2f} MB")
# 清理内存
del large_result
import gc
gc.collect()
except ImportError:
print("\n2. psutil未安装,跳过内存监控")
6.10 实际应用案例
6.10.1 销售业绩分析
# 销售业绩分析案例
print("销售业绩分析案例:")
# 综合业绩分析
print("1. 销售人员综合业绩排名:")
performance_metrics = sales_data.groupby('salesperson').agg({
'sales_amount': ['count', 'sum', 'mean', 'std'],
'quantity': 'sum',
'customer_type': lambda x: (x == 'Returning').mean()
}).round(2)
# 展平列名
performance_metrics.columns = ['销售次数', '总销售额', '平均销售额', '销售额标准差', '总销量', '回头客比例']
# 计算综合得分
performance_metrics['效率得分'] = (
performance_metrics['平均销售额'] / performance_metrics['平均销售额'].max() * 0.3 +
performance_metrics['总销售额'] / performance_metrics['总销售额'].max() * 0.4 +
performance_metrics['回头客比例'] * 0.3
)
performance_ranking = performance_metrics.sort_values('效率得分', ascending=False)
print(performance_ranking)
# 地区业绩对比
print("\n2. 地区业绩对比:")
region_performance = sales_data.groupby('region').agg({
'sales_amount': ['count', 'sum', 'mean'],
'salesperson': 'nunique',
'product_category': 'nunique'
}).round(2)
region_performance.columns = ['订单数', '总销售额', '平均订单额', '销售人员数', '产品类别数']
region_performance['人均销售额'] = region_performance['总销售额'] / region_performance['销售人员数']
region_performance = region_performance.sort_values('总销售额', ascending=False)
print(region_performance)
# 产品类别分析
print("\n3. 产品类别盈利能力分析:")
category_analysis = sales_data.groupby('product_category').agg({
'sales_amount': ['count', 'sum', 'mean', 'median'],
'unit_price': ['mean', 'std'],
'quantity': 'sum'
}).round(2)
category_analysis.columns = ['订单数', '总销售额', '平均订单额', '中位订单额', '平均单价', '单价标准差', '总销量']
category_analysis['市场份额'] = category_analysis['总销售额'] / category_analysis['总销售额'].sum()
category_analysis = category_analysis.sort_values('总销售额', ascending=False)
print(category_analysis)
6.10.2 客户行为分析
# 客户行为分析案例
print("\n客户行为分析案例:")
# 创建客户数据
customer_data = sales_data.groupby(['region', 'customer_type']).agg({
'sales_amount': ['count', 'sum', 'mean'],
'quantity': 'sum',
'salesperson': 'nunique'
}).round(2)
customer_data.columns = ['购买次数', '总消费额', '平均消费额', '总购买量', '接触销售人员数']
print("1. 客户类型分析:")
print(customer_data)
# 客户价值分层
print("\n2. 客户价值分层:")
customer_value = sales_data.groupby('customer_type').agg({
'sales_amount': ['count', 'sum', 'mean'],
'quantity': 'mean'
}).round(2)
customer_value.columns = ['购买频次', '总价值', '平均订单价值', '平均购买量']
customer_value['客户生命周期价值'] = customer_value['总价值'] / customer_value.index.map(
{'New': 1, 'Returning': 3} # 假设回头客的生命周期是新客户的3倍
)
print(customer_value)
# 地区客户偏好分析
print("\n3. 地区客户偏好分析:")
region_preference = sales_data.groupby(['region', 'product_category']).agg({
'sales_amount': 'sum',
'quantity': 'sum'
}).round(2)
# 计算各地区各产品类别的占比
region_total = sales_data.groupby('region')['sales_amount'].sum()
region_preference['销售额占比'] = region_preference.groupby(level=0)['sales_amount'].transform(
lambda x: x / x.sum()
)
print("各地区产品类别销售额占比:")
preference_pivot = region_preference['销售额占比'].unstack().round(3)
print(preference_pivot)
# 找出各地区的主要产品类别
print("\n各地区主要产品类别:")
for region in preference_pivot.index:
main_category = preference_pivot.loc[region].idxmax()
main_share = preference_pivot.loc[region].max()
print(f"{region}: {main_category} ({main_share:.1%})")
6.11 本章小结
6.11.1 核心知识点
GroupBy基础
- 分组对象:
df.groupby(column)
- 分组属性:
groups
,size()
,ngroups
- 获取组:
get_group()
, 遍历组
- 分组对象:
聚合函数
- 基本聚合:
sum()
,mean()
,count()
,std()
- 多重聚合:
agg(['func1', 'func2'])
- 自定义聚合:
agg(custom_function)
- 命名聚合:
agg(name=('column', 'function'))
- 基本聚合:
数据转换
- Transform:
transform(function)
- 标准化、排名、移动平均
- 组内计算和标记
- Transform:
数据透视表
- 基本透视:
pivot_table()
- 多级透视:多个index和columns
- 自定义聚合函数
- 边际总计:
margins=True
- 基本透视:
多级分组
- 多列分组:
groupby(['col1', 'col2'])
- 层级操作:
groupby(level=0)
- 复杂聚合组合
- 多列分组:
分组过滤
- 条件过滤:
filter(function)
- 组大小过滤
- 统计值过滤
- 条件过滤:
窗口函数
- 滚动窗口:
rolling(window)
- 扩展窗口:
expanding()
- 分组窗口操作
- 滚动窗口:
6.11.2 最佳实践
- 使用分类数据类型提高分组性能
- 预排序数据可以加速分组操作
- 对于大数据集考虑分块处理
- 合理使用transform避免重复计算
- 透视表适合交叉分析和报表生成
6.11.3 性能优化要点
- 分类数据类型:减少内存使用和提高速度
- 预排序:对于大数据集显著提升性能
- 避免复杂的自定义函数:优先使用内置函数
- 分块处理:处理超大数据集
- 内存管理:及时清理不需要的中间结果
6.11.4 下一步学习
在下一章中,我们将学习: - 数据合并和连接操作 - 不同类型的join操作 - 数据拼接和组合 - 处理重复和冲突数据
练习题
- 创建一个复杂的销售数据分析报告
- 实现自定义的分组聚合函数
- 设计一个多维度的数据透视表
- 优化大数据集的分组操作性能
- 使用窗口函数分析时间序列趋势
记住:分组聚合是数据分析的核心技能,熟练掌握这些操作将大大提高你的数据分析效率!