6.1 章节概述

数据分组与聚合是数据分析中的核心操作,它允许我们按照某些条件将数据分组,然后对每个组进行统计计算。本章将详细介绍Pandas中的groupby操作、各种聚合函数以及数据透视表的使用。

6.1.1 学习目标

  • 掌握groupby操作的基本原理和用法
  • 学会使用各种聚合函数进行数据汇总
  • 理解分组后的数据转换和过滤
  • 掌握数据透视表的创建和使用
  • 学习多级分组和复杂聚合操作
  • 了解分组操作的性能优化技巧

6.1.2 分组聚合流程

graph TD
    A[原始数据] --> B[分组 Split]
    B --> C[应用函数 Apply]
    C --> D[合并结果 Combine]
    D --> E[聚合结果]
    
    B --> B1[按单列分组]
    B --> B2[按多列分组]
    B --> B3[按条件分组]
    
    C --> C1[聚合函数]
    C --> C2[转换函数]
    C --> C3[过滤函数]

6.2 GroupBy基础

6.2.1 创建示例数据

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# 创建销售数据示例
np.random.seed(42)
sales_data = pd.DataFrame({
    'date': pd.date_range('2023-01-01', periods=1000, freq='D'),
    'salesperson': np.random.choice(['Alice', 'Bob', 'Charlie', 'David', 'Eve'], 1000),
    'region': np.random.choice(['North', 'South', 'East', 'West'], 1000),
    'product_category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Home'], 1000),
    'sales_amount': np.random.exponential(100, 1000),
    'quantity': np.random.poisson(5, 1000),
    'customer_type': np.random.choice(['New', 'Returning'], 1000, p=[0.3, 0.7])
})

# 添加一些计算列
sales_data['month'] = sales_data['date'].dt.month
sales_data['quarter'] = sales_data['date'].dt.quarter
sales_data['unit_price'] = sales_data['sales_amount'] / sales_data['quantity']

print("销售数据示例:")
print(sales_data.head(10))
print(f"\n数据形状:{sales_data.shape}")
print(f"数据类型:\n{sales_data.dtypes}")

6.2.2 基本分组操作

# 基本groupby操作
print("基本分组操作:")

# 按单列分组
print("1. 按销售人员分组:")
grouped_by_person = sales_data.groupby('salesperson')
print(f"分组对象类型:{type(grouped_by_person)}")
print(f"分组数量:{grouped_by_person.ngroups}")
print(f"分组大小:\n{grouped_by_person.size()}")

# 查看分组的键
print(f"\n分组键:{list(grouped_by_person.groups.keys())}")

# 获取特定组的数据
print("\nAlice的销售数据(前5条):")
alice_data = grouped_by_person.get_group('Alice')
print(alice_data.head())

# 按多列分组
print("\n2. 按地区和产品类别分组:")
grouped_multi = sales_data.groupby(['region', 'product_category'])
print(f"多列分组数量:{grouped_multi.ngroups}")
print(f"多列分组大小:\n{grouped_multi.size()}")

# 遍历分组
print("\n3. 遍历分组(显示前3个组):")
for i, (name, group) in enumerate(grouped_by_person):
    if i < 3:
        print(f"\n组名:{name}")
        print(f"组大小:{len(group)}")
        print(f"平均销售额:{group['sales_amount'].mean():.2f}")

6.2.3 分组对象的属性和方法

# 分组对象的属性和方法
print("分组对象的属性和方法:")

# 基本信息
print(f"分组列:{grouped_by_person.grouper.names}")
print(f"分组轴:{grouped_by_person.grouper.axis}")

# 分组统计信息
print("\n分组统计信息:")
print(f"各组大小:\n{grouped_by_person.size()}")
print(f"\n各组描述统计:")
print(grouped_by_person['sales_amount'].describe())

# 分组索引
print(f"\n分组索引:")
for name, indices in list(grouped_by_person.indices.items())[:2]:
    print(f"{name}: {indices[:10]}...")  # 只显示前10个索引

# 分组应用函数
print("\n分组应用自定义函数:")
def group_summary(group):
    return pd.Series({
        'count': len(group),
        'total_sales': group['sales_amount'].sum(),
        'avg_sales': group['sales_amount'].mean(),
        'max_sales': group['sales_amount'].max()
    })

summary = grouped_by_person.apply(group_summary)
print(summary)

6.3 聚合函数

6.3.1 基本聚合函数

# 基本聚合函数
print("基本聚合函数:")

# 单个聚合函数
print("1. 各销售人员的总销售额:")
total_sales = sales_data.groupby('salesperson')['sales_amount'].sum()
print(total_sales.sort_values(ascending=False))

print("\n2. 各销售人员的平均销售额:")
avg_sales = sales_data.groupby('salesperson')['sales_amount'].mean()
print(avg_sales.sort_values(ascending=False))

print("\n3. 各销售人员的销售次数:")
sales_count = sales_data.groupby('salesperson')['sales_amount'].count()
print(sales_count.sort_values(ascending=False))

# 多个聚合函数
print("\n4. 多个聚合函数:")
multi_agg = sales_data.groupby('salesperson')['sales_amount'].agg(['count', 'sum', 'mean', 'std', 'min', 'max'])
print(multi_agg.round(2))

# 对不同列应用不同聚合函数
print("\n5. 对不同列应用不同聚合函数:")
custom_agg = sales_data.groupby('salesperson').agg({
    'sales_amount': ['sum', 'mean', 'count'],
    'quantity': ['sum', 'mean'],
    'unit_price': 'mean'
})
print(custom_agg.round(2))

6.3.2 自定义聚合函数

# 自定义聚合函数
print("自定义聚合函数:")

# 定义自定义聚合函数
def sales_range(series):
    """计算销售额的范围"""
    return series.max() - series.min()

def coefficient_of_variation(series):
    """计算变异系数"""
    return series.std() / series.mean() if series.mean() != 0 else 0

def top_percentile(series, percentile=90):
    """计算指定百分位数"""
    return series.quantile(percentile/100)

# 应用自定义聚合函数
print("1. 自定义聚合函数结果:")
custom_metrics = sales_data.groupby('salesperson')['sales_amount'].agg([
    'mean',
    sales_range,
    coefficient_of_variation,
    lambda x: top_percentile(x, 90)
])

# 重命名列
custom_metrics.columns = ['平均销售额', '销售额范围', '变异系数', '90分位数']
print(custom_metrics.round(3))

# 使用命名聚合
print("\n2. 使用命名聚合:")
named_agg = sales_data.groupby('salesperson').agg(
    总销售额=('sales_amount', 'sum'),
    平均销售额=('sales_amount', 'mean'),
    销售次数=('sales_amount', 'count'),
    最大单笔=('sales_amount', 'max'),
    销售额标准差=('sales_amount', 'std')
).round(2)
print(named_agg)

# 复杂的自定义聚合
print("\n3. 复杂的自定义聚合:")
def comprehensive_stats(group):
    """综合统计函数"""
    return pd.Series({
        '销售总额': group['sales_amount'].sum(),
        '销售次数': len(group),
        '平均客单价': group['sales_amount'].mean(),
        '销售额中位数': group['sales_amount'].median(),
        '高价值订单数': (group['sales_amount'] > group['sales_amount'].quantile(0.8)).sum(),
        '主要产品类别': group['product_category'].mode().iloc[0] if not group['product_category'].mode().empty else 'N/A',
        '新客户比例': (group['customer_type'] == 'New').mean()
    })

comprehensive_results = sales_data.groupby('salesperson').apply(comprehensive_stats)
print(comprehensive_results.round(3))

6.3.3 分组后的统计分析

# 分组后的统计分析
print("分组后的统计分析:")

# 描述性统计
print("1. 各地区销售额描述性统计:")
region_stats = sales_data.groupby('region')['sales_amount'].describe()
print(region_stats.round(2))

# 分位数分析
print("\n2. 各地区销售额分位数分析:")
quantiles = sales_data.groupby('region')['sales_amount'].quantile([0.25, 0.5, 0.75, 0.9, 0.95])
print(quantiles.round(2))

# 方差分析
print("\n3. 各产品类别的销售额方差分析:")
variance_analysis = sales_data.groupby('product_category')['sales_amount'].agg([
    'count', 'mean', 'std', 'var'
])
variance_analysis['cv'] = variance_analysis['std'] / variance_analysis['mean']  # 变异系数
print(variance_analysis.round(3))

# 相关性分析
print("\n4. 各销售人员的销售额与数量相关性:")
def correlation_analysis(group):
    return group['sales_amount'].corr(group['quantity'])

correlations = sales_data.groupby('salesperson').apply(correlation_analysis)
print(correlations.round(3))

6.4 数据转换

6.4.1 Transform方法

# Transform方法
print("Transform方法:")

# 基本transform操作
print("1. 计算各组的标准化值:")
sales_data['sales_amount_normalized'] = sales_data.groupby('salesperson')['sales_amount'].transform(
    lambda x: (x - x.mean()) / x.std()
)

print("标准化后的数据(前10行):")
print(sales_data[['salesperson', 'sales_amount', 'sales_amount_normalized']].head(10))

# 计算组内排名
print("\n2. 计算组内销售额排名:")
sales_data['sales_rank_in_group'] = sales_data.groupby('salesperson')['sales_amount'].transform('rank', ascending=False)

print("组内排名(前10行):")
print(sales_data[['salesperson', 'sales_amount', 'sales_rank_in_group']].head(10))

# 计算移动平均
print("\n3. 计算7天移动平均:")
sales_data_sorted = sales_data.sort_values(['salesperson', 'date'])
sales_data_sorted['sales_ma7'] = sales_data_sorted.groupby('salesperson')['sales_amount'].transform(
    lambda x: x.rolling(window=7, min_periods=1).mean()
)

print("移动平均(Alice的前10条记录):")
alice_ma = sales_data_sorted[sales_data_sorted['salesperson'] == 'Alice'][
    ['date', 'sales_amount', 'sales_ma7']
].head(10)
print(alice_ma)

# 计算累计值
print("\n4. 计算累计销售额:")
sales_data['cumulative_sales'] = sales_data.groupby('salesperson')['sales_amount'].transform('cumsum')

print("累计销售额(前10行):")
print(sales_data[['salesperson', 'sales_amount', 'cumulative_sales']].head(10))

6.4.2 高级转换操作

# 高级转换操作
print("高级转换操作:")

# 计算组内百分比
print("1. 计算各销售人员在总销售额中的占比:")
total_sales_by_person = sales_data.groupby('salesperson')['sales_amount'].sum()
sales_data['person_sales_percentage'] = sales_data['salesperson'].map(
    lambda x: total_sales_by_person[x] / total_sales_by_person.sum() * 100
)

print("销售人员占比:")
person_percentage = sales_data.groupby('salesperson')['person_sales_percentage'].first().sort_values(ascending=False)
print(person_percentage.round(2))

# 计算相对于组均值的偏差
print("\n2. 计算相对于组均值的偏差:")
sales_data['deviation_from_group_mean'] = sales_data.groupby('region')['sales_amount'].transform(
    lambda x: x - x.mean()
)

print("偏差统计:")
deviation_stats = sales_data.groupby('region')['deviation_from_group_mean'].agg(['mean', 'std', 'min', 'max'])
print(deviation_stats.round(2))

# 标记异常值
print("\n3. 标记组内异常值:")
def mark_outliers(group, column, threshold=2):
    """标记超过阈值标准差的异常值"""
    mean_val = group[column].mean()
    std_val = group[column].std()
    return np.abs(group[column] - mean_val) > threshold * std_val

sales_data['is_outlier'] = sales_data.groupby('salesperson').apply(
    lambda x: mark_outliers(x, 'sales_amount')
).reset_index(level=0, drop=True)

print("异常值统计:")
outlier_stats = sales_data.groupby('salesperson')['is_outlier'].agg(['sum', 'mean'])
outlier_stats.columns = ['异常值数量', '异常值比例']
print(outlier_stats.round(3))

6.5 数据透视表

6.5.1 基本透视表

# 基本透视表
print("基本透视表:")

# 简单透视表
print("1. 销售人员vs地区的销售额透视表:")
pivot_basic = sales_data.pivot_table(
    values='sales_amount',
    index='salesperson',
    columns='region',
    aggfunc='sum'
)
print(pivot_basic.round(2))

# 添加边际总计
print("\n2. 带总计的透视表:")
pivot_with_margins = sales_data.pivot_table(
    values='sales_amount',
    index='salesperson',
    columns='region',
    aggfunc='sum',
    margins=True,
    margins_name='总计'
)
print(pivot_with_margins.round(2))

# 多个值的透视表
print("\n3. 多个值的透视表:")
pivot_multi_values = sales_data.pivot_table(
    values=['sales_amount', 'quantity'],
    index='salesperson',
    columns='region',
    aggfunc='sum'
)
print(pivot_multi_values.round(2))

# 不同聚合函数
print("\n4. 不同聚合函数的透视表:")
pivot_multi_agg = sales_data.pivot_table(
    values='sales_amount',
    index='salesperson',
    columns='region',
    aggfunc=['sum', 'mean', 'count']
)
print(pivot_multi_agg.round(2))

6.5.2 复杂透视表

# 复杂透视表
print("复杂透视表:")

# 多级索引透视表
print("1. 多级索引透视表:")
pivot_multi_index = sales_data.pivot_table(
    values='sales_amount',
    index=['region', 'salesperson'],
    columns=['product_category', 'customer_type'],
    aggfunc='sum',
    fill_value=0
)
print(pivot_multi_index.head(10))

# 时间序列透视表
print("\n2. 月度销售透视表:")
pivot_monthly = sales_data.pivot_table(
    values='sales_amount',
    index='month',
    columns='product_category',
    aggfunc='sum',
    margins=True
)
print(pivot_monthly.round(2))

# 自定义聚合函数的透视表
print("\n3. 自定义聚合函数透视表:")
def sales_efficiency(series):
    """销售效率:平均销售额/销售次数"""
    return series.mean() / len(series) if len(series) > 0 else 0

pivot_custom = sales_data.pivot_table(
    values='sales_amount',
    index='salesperson',
    columns='quarter',
    aggfunc=sales_efficiency
)
print(pivot_custom.round(4))

# 条件透视表
print("\n4. 高价值订单透视表(销售额>200):")
high_value_sales = sales_data[sales_data['sales_amount'] > 200]
pivot_high_value = high_value_sales.pivot_table(
    values='sales_amount',
    index='salesperson',
    columns='region',
    aggfunc=['count', 'sum'],
    fill_value=0
)
print(pivot_high_value)

6.5.3 透视表可视化

# 透视表可视化
print("透视表可视化:")

# 创建热力图
plt.figure(figsize=(12, 8))

# 销售人员vs地区的销售额热力图
pivot_for_heatmap = sales_data.pivot_table(
    values='sales_amount',
    index='salesperson',
    columns='region',
    aggfunc='sum'
)

plt.subplot(2, 2, 1)
sns.heatmap(pivot_for_heatmap, annot=True, fmt='.0f', cmap='YlOrRd')
plt.title('销售人员vs地区销售额热力图')

# 产品类别vs月份的销售量热力图
pivot_product_month = sales_data.pivot_table(
    values='quantity',
    index='product_category',
    columns='month',
    aggfunc='sum'
)

plt.subplot(2, 2, 2)
sns.heatmap(pivot_product_month, annot=True, fmt='.0f', cmap='Blues')
plt.title('产品类别vs月份销售量热力图')

# 地区vs客户类型的平均销售额
pivot_region_customer = sales_data.pivot_table(
    values='sales_amount',
    index='region',
    columns='customer_type',
    aggfunc='mean'
)

plt.subplot(2, 2, 3)
sns.heatmap(pivot_region_customer, annot=True, fmt='.1f', cmap='Greens')
plt.title('地区vs客户类型平均销售额')

# 销售人员vs季度的销售次数
pivot_person_quarter = sales_data.pivot_table(
    values='sales_amount',
    index='salesperson',
    columns='quarter',
    aggfunc='count'
)

plt.subplot(2, 2, 4)
sns.heatmap(pivot_person_quarter, annot=True, fmt='.0f', cmap='Purples')
plt.title('销售人员vs季度销售次数')

plt.tight_layout()
plt.show()

# 透视表条形图
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# 各地区总销售额
region_sales = sales_data.groupby('region')['sales_amount'].sum().sort_values(ascending=False)
region_sales.plot(kind='bar', ax=axes[0, 0], color='skyblue')
axes[0, 0].set_title('各地区总销售额')
axes[0, 0].set_ylabel('销售额')

# 各产品类别平均销售额
category_avg = sales_data.groupby('product_category')['sales_amount'].mean().sort_values(ascending=False)
category_avg.plot(kind='bar', ax=axes[0, 1], color='lightgreen')
axes[0, 1].set_title('各产品类别平均销售额')
axes[0, 1].set_ylabel('平均销售额')

# 各销售人员销售次数
person_count = sales_data.groupby('salesperson')['sales_amount'].count().sort_values(ascending=False)
person_count.plot(kind='bar', ax=axes[1, 0], color='orange')
axes[1, 0].set_title('各销售人员销售次数')
axes[1, 0].set_ylabel('销售次数')

# 月度销售趋势
monthly_sales = sales_data.groupby('month')['sales_amount'].sum()
monthly_sales.plot(kind='line', ax=axes[1, 1], marker='o', color='red')
axes[1, 1].set_title('月度销售趋势')
axes[1, 1].set_ylabel('销售额')
axes[1, 1].set_xlabel('月份')

plt.tight_layout()
plt.show()

6.6 多级分组

6.6.1 多级分组基础

# 多级分组基础
print("多级分组基础:")

# 按多个列分组
print("1. 按地区和产品类别分组:")
multi_group = sales_data.groupby(['region', 'product_category'])

# 查看分组结构
print(f"分组数量:{multi_group.ngroups}")
print(f"分组大小:\n{multi_group.size()}")

# 多级分组聚合
print("\n2. 多级分组聚合:")
multi_agg = multi_group['sales_amount'].agg(['count', 'sum', 'mean'])
print(multi_agg.round(2))

# 多级分组的层级操作
print("\n3. 按第一级分组求和:")
level_0_sum = multi_agg.groupby(level=0).sum()
print(level_0_sum)

print("\n4. 按第二级分组求平均:")
level_1_mean = multi_agg.groupby(level=1).mean()
print(level_1_mean.round(2))

# 重置索引
print("\n5. 重置多级索引:")
multi_agg_reset = multi_agg.reset_index()
print(multi_agg_reset.head())

6.6.2 复杂多级分组

# 复杂多级分组
print("复杂多级分组:")

# 三级分组
print("1. 三级分组(地区-产品类别-客户类型):")
triple_group = sales_data.groupby(['region', 'product_category', 'customer_type'])
triple_agg = triple_group['sales_amount'].agg(['count', 'sum', 'mean'])
print(triple_agg.head(10))

# 不同级别的不同聚合
print("\n2. 不同级别的不同聚合:")
complex_agg = sales_data.groupby(['region', 'salesperson']).agg({
    'sales_amount': ['sum', 'mean', 'count'],
    'quantity': ['sum', 'mean'],
    'unit_price': 'mean'
})

# 展平列名
complex_agg.columns = ['_'.join(col).strip() for col in complex_agg.columns.values]
print(complex_agg.head())

# 条件分组
print("\n3. 条件分组:")
# 创建销售额等级
sales_data['sales_level'] = pd.cut(
    sales_data['sales_amount'],
    bins=[0, 50, 100, 200, float('inf')],
    labels=['低', '中', '高', '极高']
)

level_group = sales_data.groupby(['region', 'sales_level'])
level_stats = level_group.size().unstack(fill_value=0)
print("各地区销售额等级分布:")
print(level_stats)

# 计算比例
level_props = level_stats.div(level_stats.sum(axis=1), axis=0)
print("\n各地区销售额等级比例:")
print(level_props.round(3))

6.7 分组过滤

6.7.1 基本分组过滤

# 基本分组过滤
print("基本分组过滤:")

# 过滤组大小
print("1. 过滤销售次数大于50的销售人员:")
frequent_sellers = sales_data.groupby('salesperson').filter(lambda x: len(x) > 50)
print(f"过滤后数据量:{len(frequent_sellers)}")
print("剩余销售人员:", frequent_sellers['salesperson'].unique())

# 过滤组统计值
print("\n2. 过滤平均销售额大于100的地区:")
high_avg_regions = sales_data.groupby('region').filter(lambda x: x['sales_amount'].mean() > 100)
print(f"过滤后数据量:{len(high_avg_regions)}")
print("剩余地区:", high_avg_regions['region'].unique())

# 过滤组内最大值
print("\n3. 过滤最大销售额大于500的产品类别:")
high_max_categories = sales_data.groupby('product_category').filter(
    lambda x: x['sales_amount'].max() > 500
)
print(f"过滤后数据量:{len(high_max_categories)}")
print("剩余产品类别:", high_max_categories['product_category'].unique())

# 复合条件过滤
print("\n4. 复合条件过滤:")
complex_filter = sales_data.groupby('salesperson').filter(
    lambda x: (len(x) > 30) and (x['sales_amount'].mean() > 80) and (x['sales_amount'].std() < 100)
)
print(f"复合条件过滤后数据量:{len(complex_filter)}")
print("符合条件的销售人员:", complex_filter['salesperson'].unique())

6.7.2 高级分组过滤

# 高级分组过滤
print("高级分组过滤:")

# 基于分位数的过滤
print("1. 过滤销售额中位数在前50%的地区:")
region_medians = sales_data.groupby('region')['sales_amount'].median()
median_threshold = region_medians.quantile(0.5)
top_median_regions = sales_data.groupby('region').filter(
    lambda x: x['sales_amount'].median() > median_threshold
)
print(f"过滤后地区:{top_median_regions['region'].unique()}")

# 基于相关性的过滤
print("\n2. 过滤销售额与数量相关性大于0.3的销售人员:")
def high_correlation_filter(group):
    if len(group) < 10:  # 样本量太小
        return False
    correlation = group['sales_amount'].corr(group['quantity'])
    return correlation > 0.3

high_corr_sellers = sales_data.groupby('salesperson').filter(high_correlation_filter)
print(f"高相关性销售人员:{high_corr_sellers['salesperson'].unique()}")

# 基于趋势的过滤
print("\n3. 过滤销售额呈上升趋势的销售人员:")
def upward_trend_filter(group):
    if len(group) < 20:  # 需要足够的数据点
        return False
    # 简单的趋势检测:后半部分平均值 > 前半部分平均值
    mid_point = len(group) // 2
    first_half_avg = group.iloc[:mid_point]['sales_amount'].mean()
    second_half_avg = group.iloc[mid_point:]['sales_amount'].mean()
    return second_half_avg > first_half_avg * 1.1  # 至少增长10%

# 按日期排序后应用过滤
sales_sorted = sales_data.sort_values(['salesperson', 'date'])
upward_trend_sellers = sales_sorted.groupby('salesperson').filter(upward_trend_filter)
print(f"上升趋势销售人员:{upward_trend_sellers['salesperson'].unique()}")

# 组合多个过滤条件
print("\n4. 组合过滤条件:")
def comprehensive_filter(group):
    conditions = [
        len(group) >= 40,  # 足够的销售记录
        group['sales_amount'].mean() >= 90,  # 平均销售额不低
        group['sales_amount'].std() <= 120,  # 销售额稳定性
        (group['customer_type'] == 'Returning').mean() >= 0.6  # 回头客比例高
    ]
    return all(conditions)

elite_sellers = sales_data.groupby('salesperson').filter(comprehensive_filter)
print(f"精英销售人员:{elite_sellers['salesperson'].unique()}")

# 显示精英销售人员的详细统计
if len(elite_sellers) > 0:
    elite_stats = elite_sellers.groupby('salesperson').agg({
        'sales_amount': ['count', 'sum', 'mean', 'std'],
        'customer_type': lambda x: (x == 'Returning').mean()
    })
    elite_stats.columns = ['销售次数', '总销售额', '平均销售额', '销售额标准差', '回头客比例']
    print("\n精英销售人员统计:")
    print(elite_stats.round(2))

6.8 窗口函数

6.8.1 滚动窗口

# 滚动窗口
print("滚动窗口:")

# 准备时间序列数据
daily_sales = sales_data.groupby('date')['sales_amount'].sum().reset_index()
daily_sales = daily_sales.sort_values('date')

print("1. 7天滚动平均:")
daily_sales['sales_ma7'] = daily_sales['sales_amount'].rolling(window=7).mean()
print(daily_sales.head(10))

print("\n2. 30天滚动统计:")
daily_sales['sales_ma30'] = daily_sales['sales_amount'].rolling(window=30).mean()
daily_sales['sales_std30'] = daily_sales['sales_amount'].rolling(window=30).std()
daily_sales['sales_min30'] = daily_sales['sales_amount'].rolling(window=30).min()
daily_sales['sales_max30'] = daily_sales['sales_amount'].rolling(window=30).max()

print(daily_sales[['date', 'sales_amount', 'sales_ma30', 'sales_std30']].head(35))

# 分组滚动窗口
print("\n3. 各销售人员的滚动统计:")
sales_person_daily = sales_data.groupby(['salesperson', 'date'])['sales_amount'].sum().reset_index()
sales_person_daily = sales_person_daily.sort_values(['salesperson', 'date'])

sales_person_daily['rolling_avg'] = sales_person_daily.groupby('salesperson')['sales_amount'].transform(
    lambda x: x.rolling(window=7, min_periods=1).mean()
)

print("Alice的滚动平均(前10天):")
alice_rolling = sales_person_daily[sales_person_daily['salesperson'] == 'Alice'].head(10)
print(alice_rolling[['date', 'sales_amount', 'rolling_avg']])

6.8.2 扩展窗口

# 扩展窗口
print("扩展窗口:")

print("1. 累计统计:")
daily_sales['cumulative_sum'] = daily_sales['sales_amount'].expanding().sum()
daily_sales['cumulative_mean'] = daily_sales['sales_amount'].expanding().mean()
daily_sales['cumulative_std'] = daily_sales['sales_amount'].expanding().std()

print(daily_sales[['date', 'sales_amount', 'cumulative_sum', 'cumulative_mean']].head(10))

print("\n2. 各销售人员的累计业绩:")
sales_person_daily['cumulative_sales'] = sales_person_daily.groupby('salesperson')['sales_amount'].transform(
    lambda x: x.expanding().sum()
)

print("Bob的累计销售额(前10天):")
bob_cumulative = sales_person_daily[sales_person_daily['salesperson'] == 'Bob'].head(10)
print(bob_cumulative[['date', 'sales_amount', 'cumulative_sales']])

# 扩展窗口的应用:计算历史最佳表现
print("\n3. 历史最佳表现追踪:")
sales_person_daily['historical_max'] = sales_person_daily.groupby('salesperson')['sales_amount'].transform(
    lambda x: x.expanding().max()
)
sales_person_daily['is_new_record'] = (
    sales_person_daily['sales_amount'] == sales_person_daily['historical_max']
)

print("Charlie的新记录情况:")
charlie_records = sales_person_daily[
    (sales_person_daily['salesperson'] == 'Charlie') & 
    (sales_person_daily['is_new_record'])
].head(5)
print(charlie_records[['date', 'sales_amount', 'historical_max']])

6.9 性能优化

6.9.1 分组操作性能优化

# 分组操作性能优化
print("分组操作性能优化:")

# 创建大数据集进行性能测试
import time

large_data = pd.DataFrame({
    'group': np.random.choice(['A', 'B', 'C', 'D', 'E'], 1000000),
    'value1': np.random.randn(1000000),
    'value2': np.random.randn(1000000),
    'category': np.random.choice(['X', 'Y', 'Z'], 1000000)
})

print(f"大数据集形状:{large_data.shape}")

# 1. 分类数据类型优化
print("\n1. 分类数据类型优化:")
start_time = time.time()
result1 = large_data.groupby('group')['value1'].sum()
original_time = time.time() - start_time

large_data['group'] = large_data['group'].astype('category')
large_data['category'] = large_data['category'].astype('category')

start_time = time.time()
result2 = large_data.groupby('group')['value1'].sum()
category_time = time.time() - start_time

print(f"原始分组时间:{original_time:.4f}秒")
print(f"分类类型分组时间:{category_time:.4f}秒")
print(f"性能提升:{original_time/category_time:.2f}倍")

# 2. 预排序优化
print("\n2. 预排序优化:")
unsorted_data = large_data.sample(frac=1).reset_index(drop=True)  # 打乱顺序

start_time = time.time()
result3 = unsorted_data.groupby('group')['value1'].sum()
unsorted_time = time.time() - start_time

sorted_data = unsorted_data.sort_values('group')
start_time = time.time()
result4 = sorted_data.groupby('group')['value1'].sum()
sorted_time = time.time() - start_time

print(f"未排序分组时间:{unsorted_time:.4f}秒")
print(f"预排序分组时间:{sorted_time:.4f}秒")
print(f"性能提升:{unsorted_time/sorted_time:.2f}倍")

# 3. 使用numba加速自定义聚合函数
try:
    from numba import jit
    
    @jit
    def fast_custom_agg(values):
        """使用numba加速的自定义聚合函数"""
        return np.sum(values ** 2) / len(values)
    
    print("\n3. numba加速测试:")
    start_time = time.time()
    result5 = large_data.groupby('group')['value1'].agg(lambda x: np.sum(x**2) / len(x))
    normal_agg_time = time.time() - start_time
    
    start_time = time.time()
    result6 = large_data.groupby('group')['value1'].agg(fast_custom_agg)
    numba_agg_time = time.time() - start_time
    
    print(f"普通自定义聚合时间:{normal_agg_time:.4f}秒")
    print(f"numba加速聚合时间:{numba_agg_time:.4f}秒")
    print(f"性能提升:{normal_agg_time/numba_agg_time:.2f}倍")
    
except ImportError:
    print("\n3. numba未安装,跳过加速测试")

6.9.2 内存优化技巧

# 内存优化技巧
print("内存优化技巧:")

# 1. 分块处理大数据
def chunked_groupby(df, group_col, value_col, agg_func, chunk_size=100000):
    """分块处理大数据的分组操作"""
    results = []
    for i in range(0, len(df), chunk_size):
        chunk = df.iloc[i:i+chunk_size]
        chunk_result = chunk.groupby(group_col)[value_col].agg(agg_func)
        results.append(chunk_result)
    
    # 合并结果
    combined = pd.concat(results)
    if agg_func == 'sum':
        return combined.groupby(combined.index).sum()
    elif agg_func == 'mean':
        # 对于均值,需要重新计算
        counts = pd.concat([chunk.groupby(group_col)[value_col].count() for chunk in 
                           [df.iloc[i:i+chunk_size] for i in range(0, len(df), chunk_size)]])
        counts = counts.groupby(counts.index).sum()
        sums = combined.groupby(combined.index).sum()
        return sums / counts
    else:
        return combined.groupby(combined.index).agg(agg_func)

print("1. 分块处理测试:")
start_time = time.time()
normal_result = large_data.groupby('group')['value1'].sum()
normal_time = time.time() - start_time

start_time = time.time()
chunked_result = chunked_groupby(large_data, 'group', 'value1', 'sum', chunk_size=200000)
chunked_time = time.time() - start_time

print(f"普通分组时间:{normal_time:.4f}秒")
print(f"分块分组时间:{chunked_time:.4f}秒")
print(f"结果一致性:{np.allclose(normal_result.sort_index(), chunked_result.sort_index())}")

# 2. 内存使用监控
def memory_usage_mb():
    """获取当前内存使用量(MB)"""
    import psutil
    process = psutil.Process()
    return process.memory_info().rss / 1024 / 1024

try:
    import psutil
    print(f"\n2. 当前内存使用:{memory_usage_mb():.2f} MB")
    
    # 大数据操作前后的内存对比
    before_memory = memory_usage_mb()
    large_result = large_data.groupby(['group', 'category']).agg({
        'value1': ['sum', 'mean', 'std'],
        'value2': ['sum', 'mean', 'std']
    })
    after_memory = memory_usage_mb()
    
    print(f"操作前内存:{before_memory:.2f} MB")
    print(f"操作后内存:{after_memory:.2f} MB")
    print(f"内存增长:{after_memory - before_memory:.2f} MB")
    
    # 清理内存
    del large_result
    import gc
    gc.collect()
    
except ImportError:
    print("\n2. psutil未安装,跳过内存监控")

6.10 实际应用案例

6.10.1 销售业绩分析

# 销售业绩分析案例
print("销售业绩分析案例:")

# 综合业绩分析
print("1. 销售人员综合业绩排名:")
performance_metrics = sales_data.groupby('salesperson').agg({
    'sales_amount': ['count', 'sum', 'mean', 'std'],
    'quantity': 'sum',
    'customer_type': lambda x: (x == 'Returning').mean()
}).round(2)

# 展平列名
performance_metrics.columns = ['销售次数', '总销售额', '平均销售额', '销售额标准差', '总销量', '回头客比例']

# 计算综合得分
performance_metrics['效率得分'] = (
    performance_metrics['平均销售额'] / performance_metrics['平均销售额'].max() * 0.3 +
    performance_metrics['总销售额'] / performance_metrics['总销售额'].max() * 0.4 +
    performance_metrics['回头客比例'] * 0.3
)

performance_ranking = performance_metrics.sort_values('效率得分', ascending=False)
print(performance_ranking)

# 地区业绩对比
print("\n2. 地区业绩对比:")
region_performance = sales_data.groupby('region').agg({
    'sales_amount': ['count', 'sum', 'mean'],
    'salesperson': 'nunique',
    'product_category': 'nunique'
}).round(2)

region_performance.columns = ['订单数', '总销售额', '平均订单额', '销售人员数', '产品类别数']
region_performance['人均销售额'] = region_performance['总销售额'] / region_performance['销售人员数']
region_performance = region_performance.sort_values('总销售额', ascending=False)
print(region_performance)

# 产品类别分析
print("\n3. 产品类别盈利能力分析:")
category_analysis = sales_data.groupby('product_category').agg({
    'sales_amount': ['count', 'sum', 'mean', 'median'],
    'unit_price': ['mean', 'std'],
    'quantity': 'sum'
}).round(2)

category_analysis.columns = ['订单数', '总销售额', '平均订单额', '中位订单额', '平均单价', '单价标准差', '总销量']
category_analysis['市场份额'] = category_analysis['总销售额'] / category_analysis['总销售额'].sum()
category_analysis = category_analysis.sort_values('总销售额', ascending=False)
print(category_analysis)

6.10.2 客户行为分析

# 客户行为分析案例
print("\n客户行为分析案例:")

# 创建客户数据
customer_data = sales_data.groupby(['region', 'customer_type']).agg({
    'sales_amount': ['count', 'sum', 'mean'],
    'quantity': 'sum',
    'salesperson': 'nunique'
}).round(2)

customer_data.columns = ['购买次数', '总消费额', '平均消费额', '总购买量', '接触销售人员数']

print("1. 客户类型分析:")
print(customer_data)

# 客户价值分层
print("\n2. 客户价值分层:")
customer_value = sales_data.groupby('customer_type').agg({
    'sales_amount': ['count', 'sum', 'mean'],
    'quantity': 'mean'
}).round(2)

customer_value.columns = ['购买频次', '总价值', '平均订单价值', '平均购买量']
customer_value['客户生命周期价值'] = customer_value['总价值'] / customer_value.index.map(
    {'New': 1, 'Returning': 3}  # 假设回头客的生命周期是新客户的3倍
)

print(customer_value)

# 地区客户偏好分析
print("\n3. 地区客户偏好分析:")
region_preference = sales_data.groupby(['region', 'product_category']).agg({
    'sales_amount': 'sum',
    'quantity': 'sum'
}).round(2)

# 计算各地区各产品类别的占比
region_total = sales_data.groupby('region')['sales_amount'].sum()
region_preference['销售额占比'] = region_preference.groupby(level=0)['sales_amount'].transform(
    lambda x: x / x.sum()
)

print("各地区产品类别销售额占比:")
preference_pivot = region_preference['销售额占比'].unstack().round(3)
print(preference_pivot)

# 找出各地区的主要产品类别
print("\n各地区主要产品类别:")
for region in preference_pivot.index:
    main_category = preference_pivot.loc[region].idxmax()
    main_share = preference_pivot.loc[region].max()
    print(f"{region}: {main_category} ({main_share:.1%})")

6.11 本章小结

6.11.1 核心知识点

  1. GroupBy基础

    • 分组对象:df.groupby(column)
    • 分组属性:groups, size(), ngroups
    • 获取组:get_group(), 遍历组
  2. 聚合函数

    • 基本聚合:sum(), mean(), count(), std()
    • 多重聚合:agg(['func1', 'func2'])
    • 自定义聚合:agg(custom_function)
    • 命名聚合:agg(name=('column', 'function'))
  3. 数据转换

    • Transform:transform(function)
    • 标准化、排名、移动平均
    • 组内计算和标记
  4. 数据透视表

    • 基本透视:pivot_table()
    • 多级透视:多个index和columns
    • 自定义聚合函数
    • 边际总计:margins=True
  5. 多级分组

    • 多列分组:groupby(['col1', 'col2'])
    • 层级操作:groupby(level=0)
    • 复杂聚合组合
  6. 分组过滤

    • 条件过滤:filter(function)
    • 组大小过滤
    • 统计值过滤
  7. 窗口函数

    • 滚动窗口:rolling(window)
    • 扩展窗口:expanding()
    • 分组窗口操作

6.11.2 最佳实践

  • 使用分类数据类型提高分组性能
  • 预排序数据可以加速分组操作
  • 对于大数据集考虑分块处理
  • 合理使用transform避免重复计算
  • 透视表适合交叉分析和报表生成

6.11.3 性能优化要点

  • 分类数据类型:减少内存使用和提高速度
  • 预排序:对于大数据集显著提升性能
  • 避免复杂的自定义函数:优先使用内置函数
  • 分块处理:处理超大数据集
  • 内存管理:及时清理不需要的中间结果

6.11.4 下一步学习

在下一章中,我们将学习: - 数据合并和连接操作 - 不同类型的join操作 - 数据拼接和组合 - 处理重复和冲突数据


练习题

  1. 创建一个复杂的销售数据分析报告
  2. 实现自定义的分组聚合函数
  3. 设计一个多维度的数据透视表
  4. 优化大数据集的分组操作性能
  5. 使用窗口函数分析时间序列趋势

记住:分组聚合是数据分析的核心技能,熟练掌握这些操作将大大提高你的数据分析效率!