9.1 章节概述
数据可视化和统计分析是数据科学工作流程中的关键环节。Pandas提供了强大的内置可视化功能,并与matplotlib、seaborn等可视化库无缝集成。本章将深入探讨如何使用Pandas进行数据可视化和统计分析,帮助你从数据中提取有价值的洞察。
graph TD
A[数据可视化与统计分析] --> B[Pandas内置可视化]
A --> C[统计描述分析]
A --> D[分布分析]
A --> E[相关性分析]
A --> F[假设检验]
A --> G[高级可视化]
B --> B1[基础图表]
B --> B2[时间序列图]
B --> B3[分布图]
B --> B4[多变量图]
C --> C1[描述性统计]
C --> C2[分组统计]
C --> C3[交叉表分析]
D --> D1[正态性检验]
D --> D2[分布拟合]
D --> D3[异常值分析]
E --> E1[相关系数]
E --> E2[协方差分析]
E --> E3[因子分析]
F --> F1[t检验]
F --> F2[卡方检验]
F --> F3[方差分析]
G --> G1[交互式图表]
G --> G2[仪表板]
G --> G3[地理可视化]
9.1.1 学习目标
- 掌握Pandas内置的可视化功能
- 学会进行全面的统计描述分析
- 理解数据分布特征和异常值检测
- 掌握相关性分析和假设检验方法
- 学习高级可视化技巧和最佳实践
- 了解交互式可视化的实现方法
9.1.2 应用场景
- 探索性数据分析:快速了解数据特征和分布
- 业务报告:创建清晰的数据可视化报告
- 假设验证:通过统计检验验证业务假设
- 异常检测:识别数据中的异常模式
- 趋势分析:分析时间序列数据的趋势和季节性
9.2 Pandas内置可视化
9.2.1 基础图表
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体和样式
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.figsize'] = (12, 8)
sns.set_style("whitegrid")
print("=== Pandas内置可视化 ===")
# 1. 创建示例数据
print("1. 创建示例数据:")
np.random.seed(42)
# 销售数据
dates = pd.date_range('2023-01-01', periods=365, freq='D')
sales_data = pd.DataFrame({
'date': dates,
'product_A': 1000 + np.cumsum(np.random.randn(365) * 50) + 200 * np.sin(np.arange(365) * 2 * np.pi / 365),
'product_B': 800 + np.cumsum(np.random.randn(365) * 30) + 150 * np.cos(np.arange(365) * 2 * np.pi / 365),
'product_C': 600 + np.cumsum(np.random.randn(365) * 40),
'region': np.random.choice(['北区', '南区', '东区', '西区'], 365),
'channel': np.random.choice(['线上', '线下'], 365, p=[0.6, 0.4])
})
sales_data.set_index('date', inplace=True)
print("销售数据示例:")
print(sales_data.head())
# 2. 线图
print("\n2. 线图:")
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 基本线图
sales_data[['product_A', 'product_B', 'product_C']].plot(ax=axes[0,0], title='产品销售趋势')
axes[0,0].set_ylabel('销售额')
# 子图
sales_data['product_A'].plot(ax=axes[0,1], title='产品A销售趋势', color='red')
axes[0,1].set_ylabel('销售额')
# 滚动平均
sales_data[['product_A', 'product_B']].rolling(30).mean().plot(ax=axes[1,0], title='30天滚动平均')
axes[1,0].set_ylabel('销售额')
# 累积图
sales_data[['product_A', 'product_B', 'product_C']].cumsum().plot(ax=axes[1,1], title='累积销售额')
axes[1,1].set_ylabel('累积销售额')
plt.tight_layout()
plt.show()
# 3. 柱状图
print("\n3. 柱状图:")
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 月度销售柱状图
monthly_sales = sales_data.resample('M').sum()
monthly_sales[['product_A', 'product_B', 'product_C']].plot(kind='bar', ax=axes[0,0], title='月度销售额')
axes[0,0].set_ylabel('销售额')
axes[0,0].tick_params(axis='x', rotation=45)
# 水平柱状图
monthly_sales.iloc[-6:][['product_A', 'product_B', 'product_C']].plot(kind='barh', ax=axes[0,1], title='最近6个月销售额')
axes[0,1].set_xlabel('销售额')
# 堆叠柱状图
monthly_sales[['product_A', 'product_B', 'product_C']].plot(kind='bar', stacked=True, ax=axes[1,0], title='堆叠月度销售额')
axes[1,0].set_ylabel('销售额')
axes[1,0].tick_params(axis='x', rotation=45)
# 分组柱状图
region_sales = sales_data.groupby('region')[['product_A', 'product_B', 'product_C']].sum()
region_sales.plot(kind='bar', ax=axes[1,1], title='各区域销售额')
axes[1,1].set_ylabel('销售额')
axes[1,1].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
# 4. 散点图
print("\n4. 散点图:")
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 基本散点图
sales_data.plot(kind='scatter', x='product_A', y='product_B', ax=axes[0,0], title='产品A vs 产品B', alpha=0.6)
# 带颜色映射的散点图
sales_data.plot(kind='scatter', x='product_A', y='product_C', c='product_B',
colormap='viridis', ax=axes[0,1], title='产品A vs 产品C (颜色=产品B)')
# 分类散点图
for region in sales_data['region'].unique():
region_data = sales_data[sales_data['region'] == region]
axes[1,0].scatter(region_data['product_A'], region_data['product_B'],
label=region, alpha=0.6)
axes[1,0].set_xlabel('产品A')
axes[1,0].set_ylabel('产品B')
axes[1,0].set_title('按区域分类的散点图')
axes[1,0].legend()
# 气泡图
sales_data.plot(kind='scatter', x='product_A', y='product_B', s=sales_data['product_C']/10,
ax=axes[1,1], title='气泡图 (大小=产品C)', alpha=0.6)
plt.tight_layout()
plt.show()
9.2.2 分布图
print("\n=== 分布图 ===")
# 1. 直方图
print("1. 直方图:")
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 基本直方图
sales_data['product_A'].plot(kind='hist', bins=30, ax=axes[0,0], title='产品A销售分布', alpha=0.7)
axes[0,0].set_xlabel('销售额')
axes[0,0].set_ylabel('频次')
# 多变量直方图
sales_data[['product_A', 'product_B', 'product_C']].plot(kind='hist', bins=30, alpha=0.7,
ax=axes[0,1], title='产品销售分布对比')
axes[0,1].set_xlabel('销售额')
axes[0,1].set_ylabel('频次')
# 密度图
sales_data['product_A'].plot(kind='density', ax=axes[1,0], title='产品A销售密度分布')
axes[1,0].set_xlabel('销售额')
axes[1,0].set_ylabel('密度')
# 多变量密度图
sales_data[['product_A', 'product_B', 'product_C']].plot(kind='density', ax=axes[1,1], title='产品销售密度对比')
axes[1,1].set_xlabel('销售额')
axes[1,1].set_ylabel('密度')
plt.tight_layout()
plt.show()
# 2. 箱线图
print("\n2. 箱线图:")
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 基本箱线图
sales_data[['product_A', 'product_B', 'product_C']].plot(kind='box', ax=axes[0,0], title='产品销售箱线图')
axes[0,0].set_ylabel('销售额')
# 分组箱线图
region_data = []
for region in sales_data['region'].unique():
region_sales = sales_data[sales_data['region'] == region]['product_A']
region_data.append(region_sales)
axes[0,1].boxplot(region_data, labels=sales_data['region'].unique())
axes[0,1].set_title('各区域产品A销售箱线图')
axes[0,1].set_ylabel('销售额')
# 小提琴图(使用seaborn)
import seaborn as sns
sns.violinplot(data=sales_data, y='product_A', ax=axes[1,0])
axes[1,0].set_title('产品A销售小提琴图')
# 分组小提琴图
sns.violinplot(data=sales_data, x='region', y='product_A', ax=axes[1,1])
axes[1,1].set_title('各区域产品A销售小提琴图')
axes[1,1].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
# 3. 饼图
print("\n3. 饼图:")
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 区域销售占比
region_total = sales_data.groupby('region')['product_A'].sum()
region_total.plot(kind='pie', ax=axes[0,0], title='各区域产品A销售占比', autopct='%1.1f%%')
# 渠道销售占比
channel_total = sales_data.groupby('channel')['product_A'].sum()
channel_total.plot(kind='pie', ax=axes[0,1], title='各渠道产品A销售占比', autopct='%1.1f%%')
# 产品销售占比
product_total = sales_data[['product_A', 'product_B', 'product_C']].sum()
product_total.plot(kind='pie', ax=axes[1,0], title='产品销售总额占比', autopct='%1.1f%%')
# 环形图(使用matplotlib)
sizes = product_total.values
labels = product_total.index
colors = ['#ff9999', '#66b3ff', '#99ff99']
wedges, texts, autotexts = axes[1,1].pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%',
startangle=90, pctdistance=0.85)
# 创建环形效果
centre_circle = plt.Circle((0,0), 0.70, fc='white')
axes[1,1].add_artist(centre_circle)
axes[1,1].set_title('产品销售环形图')
plt.tight_layout()
plt.show()
9.2.3 时间序列可视化
print("\n=== 时间序列可视化 ===")
# 1. 时间序列基础图
print("1. 时间序列基础图:")
fig, axes = plt.subplots(3, 1, figsize=(15, 12))
# 原始时间序列
sales_data['product_A'].plot(ax=axes[0], title='产品A日销售趋势', color='blue')
axes[0].set_ylabel('销售额')
# 移动平均
sales_data['product_A'].rolling(7).mean().plot(ax=axes[1], title='产品A周移动平均', color='red')
sales_data['product_A'].rolling(30).mean().plot(ax=axes[1], color='green')
axes[1].legend(['7天移动平均', '30天移动平均'])
axes[1].set_ylabel('销售额')
# 季节性分解
from statsmodels.tsa.seasonal import seasonal_decompose
# 确保数据没有缺失值
product_a_clean = sales_data['product_A'].fillna(method='ffill')
decomposition = seasonal_decompose(product_a_clean, model='additive', period=30)
decomposition.trend.plot(ax=axes[2], title='趋势分量', color='orange')
axes[2].set_ylabel('趋势')
plt.tight_layout()
plt.show()
# 2. 季节性分析
print("\n2. 季节性分析:")
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 月度模式
monthly_pattern = sales_data.groupby(sales_data.index.month)['product_A'].mean()
monthly_pattern.plot(kind='bar', ax=axes[0,0], title='月度销售模式')
axes[0,0].set_xlabel('月份')
axes[0,0].set_ylabel('平均销售额')
# 星期模式
weekly_pattern = sales_data.groupby(sales_data.index.dayofweek)['product_A'].mean()
weekly_pattern.index = ['周一', '周二', '周三', '周四', '周五', '周六', '周日']
weekly_pattern.plot(kind='bar', ax=axes[0,1], title='星期销售模式')
axes[0,1].set_xlabel('星期')
axes[0,1].set_ylabel('平均销售额')
axes[0,1].tick_params(axis='x', rotation=45)
# 热力图 - 月份vs星期
pivot_data = sales_data.pivot_table(values='product_A',
index=sales_data.index.month,
columns=sales_data.index.dayofweek,
aggfunc='mean')
im = axes[1,0].imshow(pivot_data.values, cmap='YlOrRd', aspect='auto')
axes[1,0].set_title('月份-星期销售热力图')
axes[1,0].set_xlabel('星期')
axes[1,0].set_ylabel('月份')
axes[1,0].set_xticks(range(7))
axes[1,0].set_xticklabels(['周一', '周二', '周三', '周四', '周五', '周六', '周日'])
axes[1,0].set_yticks(range(12))
axes[1,0].set_yticklabels(range(1, 13))
plt.colorbar(im, ax=axes[1,0])
# 累积分布
sales_data['product_A'].cumsum().plot(ax=axes[1,1], title='产品A累积销售额')
axes[1,1].set_ylabel('累积销售额')
plt.tight_layout()
plt.show()
# 3. 多变量时间序列
print("\n3. 多变量时间序列:")
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 多产品对比
sales_data[['product_A', 'product_B', 'product_C']].plot(ax=axes[0,0], title='多产品销售对比')
axes[0,0].set_ylabel('销售额')
# 标准化对比
normalized_data = sales_data[['product_A', 'product_B', 'product_C']].apply(lambda x: (x - x.mean()) / x.std())
normalized_data.plot(ax=axes[0,1], title='标准化销售对比')
axes[0,1].set_ylabel('标准化销售额')
# 相关性滚动图
rolling_corr = sales_data['product_A'].rolling(30).corr(sales_data['product_B'])
rolling_corr.plot(ax=axes[1,0], title='产品A与产品B滚动相关性')
axes[1,0].set_ylabel('相关系数')
axes[1,0].axhline(y=0, color='red', linestyle='--', alpha=0.5)
# 差值图
diff_data = sales_data['product_A'] - sales_data['product_B']
diff_data.plot(ax=axes[1,1], title='产品A与产品B销售差值')
axes[1,1].set_ylabel('销售差值')
axes[1,1].axhline(y=0, color='red', linestyle='--', alpha=0.5)
plt.tight_layout()
plt.show()
9.2.4 多变量可视化
print("\n=== 多变量可视化 ===")
# 1. 相关性矩阵
print("1. 相关性矩阵:")
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 计算相关性矩阵
numeric_cols = ['product_A', 'product_B', 'product_C']
corr_matrix = sales_data[numeric_cols].corr()
# 热力图
im1 = axes[0,0].imshow(corr_matrix, cmap='coolwarm', vmin=-1, vmax=1)
axes[0,0].set_title('产品销售相关性矩阵')
axes[0,0].set_xticks(range(len(numeric_cols)))
axes[0,0].set_yticks(range(len(numeric_cols)))
axes[0,0].set_xticklabels(numeric_cols)
axes[0,0].set_yticklabels(numeric_cols)
# 添加数值标签
for i in range(len(numeric_cols)):
for j in range(len(numeric_cols)):
axes[0,0].text(j, i, f'{corr_matrix.iloc[i, j]:.2f}',
ha='center', va='center', color='white' if abs(corr_matrix.iloc[i, j]) > 0.5 else 'black')
plt.colorbar(im1, ax=axes[0,0])
# 使用seaborn绘制更美观的热力图
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0,
square=True, ax=axes[0,1], cbar_kws={'shrink': 0.8})
axes[0,1].set_title('Seaborn相关性热力图')
# 散点图矩阵
from pandas.plotting import scatter_matrix
scatter_matrix(sales_data[numeric_cols], ax=axes[1,0], figsize=(8, 8), alpha=0.6)
axes[1,0].set_title('散点图矩阵')
# 平行坐标图
from pandas.plotting import parallel_coordinates
# 准备数据
plot_data = sales_data[numeric_cols + ['region']].sample(100) # 随机采样100个点
parallel_coordinates(plot_data, 'region', ax=axes[1,1], alpha=0.6)
axes[1,1].set_title('平行坐标图')
axes[1,1].legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
plt.show()
# 2. 分组可视化
print("\n2. 分组可视化:")
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 分组箱线图
sales_data.boxplot(column='product_A', by='region', ax=axes[0,0])
axes[0,0].set_title('各区域产品A销售分布')
axes[0,0].set_xlabel('区域')
axes[0,0].set_ylabel('销售额')
# 分组直方图
for i, region in enumerate(sales_data['region'].unique()):
region_data = sales_data[sales_data['region'] == region]['product_A']
axes[0,1].hist(region_data, alpha=0.6, label=region, bins=20)
axes[0,1].set_title('各区域产品A销售分布')
axes[0,1].set_xlabel('销售额')
axes[0,1].set_ylabel('频次')
axes[0,1].legend()
# 分组条形图
region_channel = sales_data.groupby(['region', 'channel'])['product_A'].sum().unstack()
region_channel.plot(kind='bar', ax=axes[1,0], title='区域-渠道销售额')
axes[1,0].set_ylabel('销售额')
axes[1,0].tick_params(axis='x', rotation=45)
# 堆叠面积图
monthly_region = sales_data.groupby([sales_data.index.to_period('M'), 'region'])['product_A'].sum().unstack()
monthly_region.plot(kind='area', stacked=True, ax=axes[1,1], title='月度区域销售趋势')
axes[1,1].set_ylabel('销售额')
plt.tight_layout()
plt.show()
# 3. 高维数据可视化
print("\n3. 高维数据可视化:")
# 创建更多维度的数据
sales_data['total_sales'] = sales_data[['product_A', 'product_B', 'product_C']].sum(axis=1)
sales_data['sales_ratio'] = sales_data['product_A'] / sales_data['total_sales']
sales_data['volatility'] = sales_data['product_A'].rolling(7).std()
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 气泡图 - 三维数据
scatter = axes[0,0].scatter(sales_data['product_A'], sales_data['product_B'],
s=sales_data['product_C']/10, c=sales_data['total_sales'],
cmap='viridis', alpha=0.6)
axes[0,0].set_xlabel('产品A')
axes[0,0].set_ylabel('产品B')
axes[0,0].set_title('多维气泡图 (大小=产品C, 颜色=总销售)')
plt.colorbar(scatter, ax=axes[0,0])
# 分面散点图
regions = sales_data['region'].unique()
colors = ['red', 'blue', 'green', 'orange']
for i, region in enumerate(regions):
region_data = sales_data[sales_data['region'] == region]
axes[0,1].scatter(region_data['sales_ratio'], region_data['volatility'],
c=colors[i], label=region, alpha=0.6)
axes[0,1].set_xlabel('销售占比')
axes[0,1].set_ylabel('波动率')
axes[0,1].set_title('销售占比 vs 波动率')
axes[0,1].legend()
# 雷达图
def create_radar_chart(data, labels, ax, title):
angles = np.linspace(0, 2 * np.pi, len(labels), endpoint=False)
angles = np.concatenate((angles, [angles[0]]))
data = np.concatenate((data, [data[0]]))
ax.plot(angles, data, 'o-', linewidth=2)
ax.fill(angles, data, alpha=0.25)
ax.set_xticks(angles[:-1])
ax.set_xticklabels(labels)
ax.set_title(title)
ax.grid(True)
# 为雷达图准备数据
radar_data = []
radar_labels = ['产品A', '产品B', '产品C']
for region in sales_data['region'].unique()[:2]: # 只显示前两个区域
region_avg = sales_data[sales_data['region'] == region][numeric_cols].mean()
# 标准化到0-1范围
normalized = (region_avg - sales_data[numeric_cols].min()) / (sales_data[numeric_cols].max() - sales_data[numeric_cols].min())
radar_data.append(normalized.values)
# 创建雷达图
ax1 = plt.subplot(2, 2, 3, projection='polar')
create_radar_chart(radar_data[0], radar_labels, ax1, f'{sales_data["region"].unique()[0]}区域销售雷达图')
ax2 = plt.subplot(2, 2, 4, projection='polar')
create_radar_chart(radar_data[1], radar_labels, ax2, f'{sales_data["region"].unique()[1]}区域销售雷达图')
plt.tight_layout()
plt.show()
9.3 统计描述分析
9.3.1 描述性统计
print("\n=== 描述性统计 ===")
# 1. 基本描述性统计
print("1. 基本描述性统计:")
# 基本统计信息
basic_stats = sales_data[numeric_cols].describe()
print("基本统计信息:")
print(basic_stats)
# 扩展统计信息
def extended_describe(df):
"""扩展的描述性统计"""
stats = df.describe()
# 添加额外统计量
stats.loc['skewness'] = df.skew() # 偏度
stats.loc['kurtosis'] = df.kurtosis() # 峰度
stats.loc['variance'] = df.var() # 方差
stats.loc['range'] = df.max() - df.min() # 极差
stats.loc['iqr'] = df.quantile(0.75) - df.quantile(0.25) # 四分位距
stats.loc['cv'] = df.std() / df.mean() # 变异系数
return stats
extended_stats = extended_describe(sales_data[numeric_cols])
print("\n扩展统计信息:")
print(extended_stats.round(4))
# 2. 分位数分析
print("\n2. 分位数分析:")
# 自定义分位数
quantiles = [0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99]
quantile_stats = sales_data[numeric_cols].quantile(quantiles)
print("分位数统计:")
print(quantile_stats.round(2))
# 可视化分位数
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for i, col in enumerate(numeric_cols):
quantile_stats[col].plot(kind='bar', ax=axes[i], title=f'{col}分位数分布')
axes[i].set_ylabel('销售额')
axes[i].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
# 3. 分组统计
print("\n3. 分组统计:")
# 按区域分组统计
region_stats = sales_data.groupby('region')[numeric_cols].agg([
'count', 'mean', 'median', 'std', 'min', 'max'
])
print("区域分组统计:")
print(region_stats.round(2))
# 按渠道分组统计
channel_stats = sales_data.groupby('channel')[numeric_cols].describe()
print("\n渠道分组统计:")
print(channel_stats.round(2))
# 多维分组统计
multi_group_stats = sales_data.groupby(['region', 'channel'])[numeric_cols].agg({
'product_A': ['mean', 'std'],
'product_B': ['mean', 'std'],
'product_C': ['mean', 'std']
})
print("\n区域-渠道分组统计:")
print(multi_group_stats.round(2))
# 4. 时间序列统计
print("\n4. 时间序列统计:")
# 月度统计
monthly_stats = sales_data.resample('M')[numeric_cols].agg([
'mean', 'std', 'min', 'max', 'sum'
])
print("月度统计:")
print(monthly_stats.round(2))
# 季度统计
quarterly_stats = sales_data.resample('Q')[numeric_cols].agg([
'mean', 'median', 'std'
])
print("\n季度统计:")
print(quarterly_stats.round(2))
# 滚动统计
rolling_stats = pd.DataFrame({
'product_A_mean_30d': sales_data['product_A'].rolling(30).mean(),
'product_A_std_30d': sales_data['product_A'].rolling(30).std(),
'product_A_min_30d': sales_data['product_A'].rolling(30).min(),
'product_A_max_30d': sales_data['product_A'].rolling(30).max()
})
print("\n30天滚动统计示例:")
print(rolling_stats.tail(10).round(2))
9.3.2 交叉表分析
print("\n=== 交叉表分析 ===")
# 1. 基本交叉表
print("1. 基本交叉表:")
# 区域-渠道交叉表
cross_tab1 = pd.crosstab(sales_data['region'], sales_data['channel'])
print("区域-渠道交叉表:")
print(cross_tab1)
# 带边际总计的交叉表
cross_tab2 = pd.crosstab(sales_data['region'], sales_data['channel'], margins=True)
print("\n带边际总计的交叉表:")
print(cross_tab2)
# 2. 数值型交叉表
print("\n2. 数值型交叉表:")
# 创建分类变量
sales_data['product_A_category'] = pd.cut(sales_data['product_A'],
bins=3,
labels=['低', '中', '高'])
sales_data['product_B_category'] = pd.cut(sales_data['product_B'],
bins=3,
labels=['低', '中', '高'])
# 产品分类交叉表
product_cross = pd.crosstab(sales_data['product_A_category'],
sales_data['product_B_category'],
margins=True)
print("产品A-产品B分类交叉表:")
print(product_cross)
# 3. 带数值的交叉表
print("\n3. 带数值的交叉表:")
# 平均值交叉表
avg_cross = pd.crosstab(sales_data['region'],
sales_data['channel'],
values=sales_data['product_A'],
aggfunc='mean')
print("区域-渠道平均销售额交叉表:")
print(avg_cross.round(2))
# 多个聚合函数
multi_agg_cross = pd.crosstab(sales_data['region'],
sales_data['channel'],
values=sales_data['product_A'],
aggfunc=['mean', 'sum', 'count'])
print("\n多聚合函数交叉表:")
print(multi_agg_cross.round(2))
# 4. 比例交叉表
print("\n4. 比例交叉表:")
# 行比例
row_prop = pd.crosstab(sales_data['region'], sales_data['channel'], normalize='index')
print("行比例交叉表:")
print(row_prop.round(3))
# 列比例
col_prop = pd.crosstab(sales_data['region'], sales_data['channel'], normalize='columns')
print("\n列比例交叉表:")
print(col_prop.round(3))
# 总比例
total_prop = pd.crosstab(sales_data['region'], sales_data['channel'], normalize='all')
print("\n总比例交叉表:")
print(total_prop.round(3))
# 5. 交叉表可视化
print("\n5. 交叉表可视化:")
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 热力图
sns.heatmap(cross_tab1, annot=True, fmt='d', cmap='Blues', ax=axes[0,0])
axes[0,0].set_title('区域-渠道交叉表热力图')
# 堆叠柱状图
cross_tab1.plot(kind='bar', stacked=True, ax=axes[0,1], title='区域-渠道堆叠柱状图')
axes[0,1].tick_params(axis='x', rotation=45)
# 比例堆叠图
row_prop.plot(kind='bar', stacked=True, ax=axes[1,0], title='区域-渠道比例图')
axes[1,0].set_ylabel('比例')
axes[1,0].tick_params(axis='x', rotation=45)
# 分组柱状图
cross_tab1.plot(kind='bar', ax=axes[1,1], title='区域-渠道分组柱状图')
axes[1,1].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
9.3.3 高级统计分析
print("\n=== 高级统计分析 ===")
# 1. 偏度和峰度分析
print("1. 偏度和峰度分析:")
from scipy.stats import skew, kurtosis, jarque_bera
# 计算偏度和峰度
skewness_stats = pd.DataFrame({
'skewness': sales_data[numeric_cols].skew(),
'kurtosis': sales_data[numeric_cols].kurtosis(),
'jarque_bera_stat': [jarque_bera(sales_data[col].dropna())[0] for col in numeric_cols],
'jarque_bera_pvalue': [jarque_bera(sales_data[col].dropna())[1] for col in numeric_cols]
})
print("偏度和峰度统计:")
print(skewness_stats.round(4))
# 解释偏度和峰度
def interpret_skewness(skew_val):
if abs(skew_val) < 0.5:
return "近似对称"
elif skew_val > 0.5:
return "右偏"
else:
return "左偏"
def interpret_kurtosis(kurt_val):
if abs(kurt_val) < 0.5:
return "近似正态"
elif kurt_val > 0.5:
return "尖峰"
else:
return "平峰"
interpretation = pd.DataFrame({
'skewness_interpretation': [interpret_skewness(s) for s in skewness_stats['skewness']],
'kurtosis_interpretation': [interpret_kurtosis(k) for k in skewness_stats['kurtosis']],
'normal_test': ['正态' if p > 0.05 else '非正态' for p in skewness_stats['jarque_bera_pvalue']]
}, index=numeric_cols)
print("\n分布特征解释:")
print(interpretation)
# 2. 异常值检测
print("\n2. 异常值检测:")
def detect_outliers_multiple_methods(data):
"""多种方法检测异常值"""
results = pd.DataFrame(index=data.index)
# Z-score方法
z_scores = np.abs(stats.zscore(data.dropna()))
results['z_score_outlier'] = z_scores > 3
# IQR方法
Q1 = data.quantile(0.25)
Q3 = data.quantile(0.75)
IQR = Q3 - Q1
results['iqr_outlier'] = (data < Q1 - 1.5 * IQR) | (data > Q3 + 1.5 * IQR)
# 修正Z-score方法
median = data.median()
mad = np.median(np.abs(data - median))
modified_z_scores = 0.6745 * (data - median) / mad
results['modified_z_outlier'] = np.abs(modified_z_scores) > 3.5
return results
# 对每个产品检测异常值
outlier_results = {}
for col in numeric_cols:
outlier_results[col] = detect_outliers_multiple_methods(sales_data[col])
# 汇总异常值统计
outlier_summary = pd.DataFrame({
col: {
'z_score_outliers': outlier_results[col]['z_score_outlier'].sum(),
'iqr_outliers': outlier_results[col]['iqr_outlier'].sum(),
'modified_z_outliers': outlier_results[col]['modified_z_outlier'].sum()
} for col in numeric_cols
}).T
print("异常值检测汇总:")
print(outlier_summary)
# 3. 相关性深度分析
print("\n3. 相关性深度分析:")
# Pearson相关系数
pearson_corr = sales_data[numeric_cols].corr(method='pearson')
# Spearman相关系数
spearman_corr = sales_data[numeric_cols].corr(method='spearman')
# Kendall相关系数
kendall_corr = sales_data[numeric_cols].corr(method='kendall')
print("Pearson相关系数:")
print(pearson_corr.round(3))
print("\nSpearman相关系数:")
print(spearman_corr.round(3))
print("\nKendall相关系数:")
print(kendall_corr.round(3))
# 相关性显著性检验
def correlation_significance_test(data):
"""相关性显著性检验"""
from scipy.stats import pearsonr
cols = data.columns
n = len(cols)
corr_matrix = np.zeros((n, n))
p_matrix = np.zeros((n, n))
for i in range(n):
for j in range(n):
if i != j:
corr, p_val = pearsonr(data.iloc[:, i].dropna(), data.iloc[:, j].dropna())
corr_matrix[i, j] = corr
p_matrix[i, j] = p_val
else:
corr_matrix[i, j] = 1.0
p_matrix[i, j] = 0.0
return pd.DataFrame(corr_matrix, index=cols, columns=cols), pd.DataFrame(p_matrix, index=cols, columns=cols)
corr_matrix, p_matrix = correlation_significance_test(sales_data[numeric_cols])
print("\n相关性P值矩阵:")
print(p_matrix.round(4))
# 4. 时间序列统计特性
print("\n4. 时间序列统计特性:")
# 平稳性检验
from statsmodels.tsa.stattools import adfuller
def stationarity_test(timeseries):
"""平稳性检验"""
result = adfuller(timeseries.dropna())
return {
'ADF_Statistic': result[0],
'p_value': result[1],
'Critical_Values': result[4],
'is_stationary': result[1] < 0.05
}
stationarity_results = {}
for col in numeric_cols:
stationarity_results[col] = stationarity_test(sales_data[col])
stationarity_df = pd.DataFrame(stationarity_results).T
print("平稳性检验结果:")
print(stationarity_df[['ADF_Statistic', 'p_value', 'is_stationary']])
# 自相关分析
from statsmodels.tsa.stattools import acf, pacf
def autocorrelation_analysis(data, lags=20):
"""自相关分析"""
autocorr = acf(data.dropna(), nlags=lags)
partial_autocorr = pacf(data.dropna(), nlags=lags)
return pd.DataFrame({
'autocorrelation': autocorr,
'partial_autocorrelation': partial_autocorr
})
# 对产品A进行自相关分析
autocorr_results = autocorrelation_analysis(sales_data['product_A'])
print("\n产品A自相关分析(前10期):")
print(autocorr_results.head(10).round(4))
# 可视化自相关
fig, axes = plt.subplots(2, 1, figsize=(12, 8))
# 自相关图
axes[0].plot(autocorr_results.index, autocorr_results['autocorrelation'], 'o-')
axes[0].axhline(y=0, color='red', linestyle='--')
axes[0].axhline(y=0.05, color='red', linestyle='--', alpha=0.5)
axes[0].axhline(y=-0.05, color='red', linestyle='--', alpha=0.5)
axes[0].set_title('自相关函数')
axes[0].set_xlabel('滞后期')
axes[0].set_ylabel('自相关系数')
# 偏自相关图
axes[1].plot(autocorr_results.index, autocorr_results['partial_autocorrelation'], 'o-')
axes[1].axhline(y=0, color='red', linestyle='--')
axes[1].axhline(y=0.05, color='red', linestyle='--', alpha=0.5)
axes[1].axhline(y=-0.05, color='red', linestyle='--', alpha=0.5)
axes[1].set_title('偏自相关函数')
axes[1].set_xlabel('滞后期')
axes[1].set_ylabel('偏自相关系数')
plt.tight_layout()
plt.show()
9.4 分布分析
9.4.1 正态性检验
print("\n=== 正态性检验 ===")
from scipy import stats
import scipy.stats as stats
# 1. 多种正态性检验方法
print("1. 多种正态性检验方法:")
def comprehensive_normality_test(data):
"""综合正态性检验"""
data_clean = data.dropna()
tests = {}
# Shapiro-Wilk检验
if len(data_clean) <= 5000: # Shapiro-Wilk对大样本不适用
shapiro_stat, shapiro_p = stats.shapiro(data_clean)
tests['Shapiro-Wilk'] = {'statistic': shapiro_stat, 'p_value': shapiro_p}
# Kolmogorov-Smirnov检验
ks_stat, ks_p = stats.kstest(data_clean, 'norm', args=(data_clean.mean(), data_clean.std()))
tests['Kolmogorov-Smirnov'] = {'statistic': ks_stat, 'p_value': ks_p}
# Anderson-Darling检验
ad_stat, ad_critical, ad_significance = stats.anderson(data_clean, dist='norm')
tests['Anderson-Darling'] = {'statistic': ad_stat, 'critical_values': ad_critical}
# Jarque-Bera检验
jb_stat, jb_p = stats.jarque_bera(data_clean)
tests['Jarque-Bera'] = {'statistic': jb_stat, 'p_value': jb_p}
# D'Agostino检验
dag_stat, dag_p = stats.normaltest(data_clean)
tests["D'Agostino"] = {'statistic': dag_stat, 'p_value': dag_p}
return tests
# 对每个产品进行正态性检验
normality_results = {}
for col in numeric_cols:
normality_results[col] = comprehensive_normality_test(sales_data[col])
# 整理结果
normality_summary = []
for col in numeric_cols:
for test_name, result in normality_results[col].items():
if 'p_value' in result:
normality_summary.append({
'Product': col,
'Test': test_name,
'Statistic': result['statistic'],
'P_value': result['p_value'],
'Is_Normal': 'Yes' if result['p_value'] > 0.05 else 'No'
})
normality_df = pd.DataFrame(normality_summary)
print("正态性检验结果:")
print(normality_df.round(4))
# 2. Q-Q图
print("\n2. Q-Q图:")
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
axes = axes.ravel()
for i, col in enumerate(numeric_cols):
stats.probplot(sales_data[col].dropna(), dist="norm", plot=axes[i])
axes[i].set_title(f'{col} Q-Q图')
axes[i].grid(True)
# 添加一个综合Q-Q图
stats.probplot(sales_data[numeric_cols].stack().dropna(), dist="norm", plot=axes[3])
axes[3].set_title('所有产品综合 Q-Q图')
axes[3].grid(True)
plt.tight_layout()
plt.show()
# 3. 正态性可视化对比
print("\n3. 正态性可视化对比:")
fig, axes = plt.subplots(3, 3, figsize=(18, 12))
for i, col in enumerate(numeric_cols):
data = sales_data[col].dropna()
# 直方图 + 正态分布拟合
axes[i, 0].hist(data, bins=30, density=True, alpha=0.7, color='skyblue')
mu, sigma = data.mean(), data.std()
x = np.linspace(data.min(), data.max(), 100)
axes[i, 0].plot(x, stats.norm.pdf(x, mu, sigma), 'r-', linewidth=2, label='正态分布拟合')
axes[i, 0].set_title(f'{col} 分布 vs 正态分布')
axes[i, 0].legend()
# 密度图对比
data.plot(kind='density', ax=axes[i, 1], label='实际分布')
pd.Series(np.random.normal(mu, sigma, len(data))).plot(kind='density', ax=axes[i, 1], label='正态分布')
axes[i, 1].set_title(f'{col} 密度对比')
axes[i, 1].legend()
# 累积分布函数对比
sorted_data = np.sort(data)
y_empirical = np.arange(1, len(sorted_data) + 1) / len(sorted_data)
y_theoretical = stats.norm.cdf(sorted_data, mu, sigma)
axes[i, 2].plot(sorted_data, y_empirical, label='经验分布')
axes[i, 2].plot(sorted_data, y_theoretical, label='理论正态分布')
axes[i, 2].set_title(f'{col} 累积分布对比')
axes[i, 2].legend()
plt.tight_layout()
plt.show()
9.4.2 分布拟合
print("\n=== 分布拟合 ===")
from scipy.stats import norm, lognorm, gamma, beta, expon, weibull_min
# 1. 多种分布拟合
print("1. 多种分布拟合:")
def fit_distributions(data):
"""拟合多种分布"""
data_clean = data.dropna()
distributions = {
'normal': stats.norm,
'lognormal': stats.lognorm,
'gamma': stats.gamma,
'exponential': stats.expon,
'weibull': stats.weibull_min,
'beta': stats.beta
}
results = {}
for dist_name, distribution in distributions.items():
try:
# 拟合参数
if dist_name == 'beta':
# Beta分布需要数据在[0,1]范围内
normalized_data = (data_clean - data_clean.min()) / (data_clean.max() - data_clean.min())
params = distribution.fit(normalized_data)
# 计算AIC和BIC
log_likelihood = np.sum(distribution.logpdf(normalized_data, *params))
else:
params = distribution.fit(data_clean)
log_likelihood = np.sum(distribution.logpdf(data_clean, *params))
k = len(params) # 参数个数
n = len(data_clean) # 样本大小
aic = 2 * k - 2 * log_likelihood
bic = k * np.log(n) - 2 * log_likelihood
# Kolmogorov-Smirnov检验
if dist_name == 'beta':
ks_stat, ks_p = stats.kstest(normalized_data, lambda x: distribution.cdf(x, *params))
else:
ks_stat, ks_p = stats.kstest(data_clean, lambda x: distribution.cdf(x, *params))
results[dist_name] = {
'params': params,
'aic': aic,
'bic': bic,
'ks_statistic': ks_stat,
'ks_p_value': ks_p,
'log_likelihood': log_likelihood
}
except:
results[dist_name] = None
return results
# 对产品A进行分布拟合
dist_results = fit_distributions(sales_data['product_A'])
# 整理拟合结果
fit_summary = []
for dist_name, result in dist_results.items():
if result is not None:
fit_summary.append({
'Distribution': dist_name,
'AIC': result['aic'],
'BIC': result['bic'],
'KS_Statistic': result['ks_statistic'],
'KS_P_value': result['ks_p_value'],
'Log_Likelihood': result['log_likelihood']
})
fit_df = pd.DataFrame(fit_summary)
fit_df = fit_df.sort_values('AIC') # 按AIC排序,越小越好
print("产品A分布拟合结果(按AIC排序):")
print(fit_df.round(4))
# 2. 最佳分布可视化
print("\n2. 最佳分布可视化:")
best_dist_name = fit_df.iloc[0]['Distribution']
best_dist_params = dist_results[best_dist_name]['params']
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
data = sales_data['product_A'].dropna()
# 直方图 + 拟合分布
axes[0, 0].hist(data, bins=30, density=True, alpha=0.7, color='skyblue', label='实际数据')
x = np.linspace(data.min(), data.max(), 100)
if best_dist_name == 'normal':
y = stats.norm.pdf(x, *best_dist_params)
elif best_dist_name == 'lognormal':
y = stats.lognorm.pdf(x, *best_dist_params)
elif best_dist_name == 'gamma':
y = stats.gamma.pdf(x, *best_dist_params)
elif best_dist_name == 'exponential':
y = stats.expon.pdf(x, *best_dist_params)
elif best_dist_name == 'weibull':
y = stats.weibull_min.pdf(x, *best_dist_params)
axes[0, 0].plot(x, y, 'r-', linewidth=2, label=f'{best_dist_name}分布拟合')
axes[0, 0].set_title(f'产品A最佳拟合分布: {best_dist_name}')
axes[0, 0].legend()
# Q-Q图
if best_dist_name == 'normal':
stats.probplot(data, dist=stats.norm, sparams=best_dist_params, plot=axes[0, 1])
elif best_dist_name == 'lognormal':
stats.probplot(data, dist=stats.lognorm, sparams=best_dist_params, plot=axes[0, 1])
axes[0, 1].set_title(f'{best_dist_name}分布 Q-Q图')
# 累积分布函数对比
sorted_data = np.sort(data)
empirical_cdf = np.arange(1, len(sorted_data) + 1) / len(sorted_data)
if best_dist_name == 'normal':
theoretical_cdf = stats.norm.cdf(sorted_data, *best_dist_params)
elif best_dist_name == 'lognormal':
theoretical_cdf = stats.lognorm.cdf(sorted_data, *best_dist_params)
elif best_dist_name == 'gamma':
theoretical_cdf = stats.gamma.cdf(sorted_data, *best_dist_params)
axes[1, 0].plot(sorted_data, empirical_cdf, label='经验CDF')
axes[1, 0].plot(sorted_data, theoretical_cdf, label=f'{best_dist_name} CDF')
axes[1, 0].set_title('累积分布函数对比')
axes[1, 0].legend()
# 残差图
residuals = empirical_cdf - theoretical_cdf
axes[1, 1].plot(sorted_data, residuals, 'o-', alpha=0.6)
axes[1, 1].axhline(y=0, color='red', linestyle='--')
axes[1, 1].set_title('CDF残差图')
axes[1, 1].set_xlabel('数据值')
axes[1, 1].set_ylabel('残差')
plt.tight_layout()
plt.show()
# 3. 分布参数解释
print("\n3. 分布参数解释:")
def interpret_distribution_params(dist_name, params):
"""解释分布参数"""
if dist_name == 'normal':
return f"均值: {params[0]:.2f}, 标准差: {params[1]:.2f}"
elif dist_name == 'lognormal':
return f"形状参数: {params[0]:.2f}, 位置参数: {params[1]:.2f}, 尺度参数: {params[2]:.2f}"
elif dist_name == 'gamma':
return f"形状参数: {params[0]:.2f}, 位置参数: {params[1]:.2f}, 尺度参数: {params[2]:.2f}"
elif dist_name == 'exponential':
return f"位置参数: {params[0]:.2f}, 尺度参数: {params[1]:.2f}"
elif dist_name == 'weibull':
return f"形状参数: {params[0]:.2f}, 位置参数: {params[1]:.2f}, 尺度参数: {params[2]:.2f}"
else:
return f"参数: {params}"
print(f"最佳分布 ({best_dist_name}) 参数解释:")
print(interpret_distribution_params(best_dist_name, best_dist_params))
# 4. 分布比较
print("\n4. 分布比较:")
# 选择前3个最佳分布进行比较
top_3_dists = fit_df.head(3)
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 概率密度函数对比
axes[0, 0].hist(data, bins=30, density=True, alpha=0.7, color='lightgray', label='实际数据')
x = np.linspace(data.min(), data.max(), 100)
colors = ['red', 'blue', 'green']
for i, (_, row) in enumerate(top_3_dists.iterrows()):
dist_name = row['Distribution']
params = dist_results[dist_name]['params']
if dist_name == 'normal':
y = stats.norm.pdf(x, *params)
elif dist_name == 'lognormal':
y = stats.lognorm.pdf(x, *params)
elif dist_name == 'gamma':
y = stats.gamma.pdf(x, *params)
elif dist_name == 'exponential':
y = stats.expon.pdf(x, *params)
elif dist_name == 'weibull':
y = stats.weibull_min.pdf(x, *params)
axes[0, 0].plot(x, y, color=colors[i], linewidth=2, label=f'{dist_name} (AIC: {row["AIC"]:.1f})')
axes[0, 0].set_title('前3个最佳分布对比')
axes[0, 0].legend()
# AIC/BIC对比
top_3_dists.set_index('Distribution')[['AIC', 'BIC']].plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('AIC/BIC对比')
axes[0, 1].tick_params(axis='x', rotation=45)
# KS检验结果
top_3_dists.plot(x='Distribution', y='KS_P_value', kind='bar', ax=axes[1, 0])
axes[1, 0].axhline(y=0.05, color='red', linestyle='--', label='显著性水平')
axes[1, 0].set_title('KS检验P值')
axes[1, 0].set_ylabel('P值')
axes[1, 0].legend()
axes[1, 0].tick_params(axis='x', rotation=45)
# 对数似然值对比
top_3_dists.plot(x='Distribution', y='Log_Likelihood', kind='bar', ax=axes[1, 1])
axes[1, 1].set_title('对数似然值对比')
axes[1, 1].set_ylabel('对数似然值')
axes[1, 1].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
9.4.3 异常值深度分析
print("\n=== 异常值深度分析 ===")
# 1. 多维异常值检测
print("1. 多维异常值检测:")
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.covariance import EllipticEnvelope
# 准备多维数据
multi_data = sales_data[numeric_cols].dropna()
# Isolation Forest
iso_forest = IsolationForest(contamination=0.1, random_state=42)
iso_outliers = iso_forest.fit_predict(multi_data)
# Local Outlier Factor
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.1)
lof_outliers = lof.fit_predict(multi_data)
# Elliptic Envelope
elliptic = EllipticEnvelope(contamination=0.1, random_state=42)
elliptic_outliers = elliptic.fit_predict(multi_data)
# 汇总结果
outlier_comparison = pd.DataFrame({
'Isolation_Forest': iso_outliers,
'Local_Outlier_Factor': lof_outliers,
'Elliptic_Envelope': elliptic_outliers
}, index=multi_data.index)
# 统计异常值数量
outlier_counts = pd.DataFrame({
'Method': ['Isolation Forest', 'Local Outlier Factor', 'Elliptic Envelope'],
'Outlier_Count': [
(iso_outliers == -1).sum(),
(lof_outliers == -1).sum(),
(elliptic_outliers == -1).sum()
],
'Outlier_Percentage': [
(iso_outliers == -1).mean() * 100,
(lof_outliers == -1).mean() * 100,
(elliptic_outliers == -1).mean() * 100
]
})
print("多维异常值检测结果:")
print(outlier_counts)
# 2. 异常值可视化
print("\n2. 异常值可视化:")
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# Isolation Forest结果
normal_points = multi_data[iso_outliers == 1]
outlier_points = multi_data[iso_outliers == -1]
axes[0, 0].scatter(normal_points['product_A'], normal_points['product_B'],
c='blue', alpha=0.6, label='正常点')
axes[0, 0].scatter(outlier_points['product_A'], outlier_points['product_B'],
c='red', alpha=0.8, label='异常点')
axes[0, 0].set_title('Isolation Forest异常检测')
axes[0, 0].set_xlabel('产品A')
axes[0, 0].set_ylabel('产品B')
axes[0, 0].legend()
# LOF结果
normal_points_lof = multi_data[lof_outliers == 1]
outlier_points_lof = multi_data[lof_outliers == -1]
axes[0, 1].scatter(normal_points_lof['product_A'], normal_points_lof['product_C'],
c='blue', alpha=0.6, label='正常点')
axes[0, 1].scatter(outlier_points_lof['product_A'], outlier_points_lof['product_C'],
c='red', alpha=0.8, label='异常点')
axes[0, 1].set_title('Local Outlier Factor异常检测')
axes[0, 1].set_xlabel('产品A')
axes[0, 1].set_ylabel('产品C')
axes[0, 1].legend()
# 异常值时间分布
outlier_dates = multi_data.index[iso_outliers == -1]
outlier_time_dist = pd.Series(outlier_dates).dt.month.value_counts().sort_index()
outlier_time_dist.plot(kind='bar', ax=axes[1, 0], title='异常值月度分布')
axes[1, 0].set_xlabel('月份')
axes[1, 0].set_ylabel('异常值数量')
# 异常值方法一致性
consistency = pd.DataFrame({
'All_Methods': (iso_outliers == -1) & (lof_outliers == -1) & (elliptic_outliers == -1),
'Two_Methods': ((iso_outliers == -1) & (lof_outliers == -1)) |
((iso_outliers == -1) & (elliptic_outliers == -1)) |
((lof_outliers == -1) & (elliptic_outliers == -1)),
'One_Method': (iso_outliers == -1) | (lof_outliers == -1) | (elliptic_outliers == -1)
})
consistency_counts = consistency.sum()
consistency_counts.plot(kind='bar', ax=axes[1, 1], title='异常值检测方法一致性')
axes[1, 1].set_ylabel('异常值数量')
axes[1, 1].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
# 3. 异常值影响分析
print("\n3. 异常值影响分析:")
# 计算有无异常值的统计差异
original_stats = multi_data.describe()
cleaned_data = multi_data[iso_outliers == 1] # 移除异常值
cleaned_stats = cleaned_data.describe()
# 统计差异对比
stats_comparison = pd.DataFrame({
'Original_Mean': original_stats.loc['mean'],
'Cleaned_Mean': cleaned_stats.loc['mean'],
'Original_Std': original_stats.loc['std'],
'Cleaned_Std': cleaned_stats.loc['std'],
'Mean_Change_%': ((cleaned_stats.loc['mean'] - original_stats.loc['mean']) / original_stats.loc['mean'] * 100),
'Std_Change_%': ((cleaned_stats.loc['std'] - original_stats.loc['std']) / original_stats.loc['std'] * 100)
})
print("异常值对统计量的影响:")
print(stats_comparison.round(3))
# 相关性变化
original_corr = multi_data.corr()
cleaned_corr = cleaned_data.corr()
corr_change = cleaned_corr - original_corr
print("\n相关性变化矩阵:")
print(corr_change.round(4))
# 4. 异常值处理策略
print("\n4. 异常值处理策略:")
def handle_outliers_multiple_methods(data, outlier_mask):
"""多种异常值处理方法"""
results = {}
# 方法1:删除异常值
results['删除'] = data[~outlier_mask]
# 方法2:用中位数替换
data_median = data.copy()
for col in data.columns:
data_median.loc[outlier_mask, col] = data[col].median()
results['中位数替换'] = data_median
# 方法3:用分位数替换
data_quantile = data.copy()
for col in data.columns:
Q1 = data[col].quantile(0.25)
Q3 = data[col].quantile(0.75)
data_quantile.loc[outlier_mask & (data[col] < Q1), col] = Q1
data_quantile.loc[outlier_mask & (data[col] > Q3), col] = Q3
results['分位数替换'] = data_quantile
# 方法4:Winsorizing
from scipy.stats.mstats import winsorize
data_winsor = data.copy()
for col in data.columns:
data_winsor[col] = winsorize(data[col], limits=[0.05, 0.05])
results['Winsorizing'] = data_winsor
return results
# 应用不同处理方法
outlier_mask = iso_outliers == -1
processed_data = handle_outliers_multiple_methods(multi_data, outlier_mask)
# 比较处理效果
processing_comparison = pd.DataFrame({
method: {
'Mean_A': data['product_A'].mean(),
'Std_A': data['product_A'].std(),
'Skew_A': data['product_A'].skew(),
'Kurt_A': data['product_A'].kurtosis(),
'Sample_Size': len(data)
} for method, data in processed_data.items()
}).T
# 添加原始数据
processing_comparison.loc['原始数据'] = {
'Mean_A': multi_data['product_A'].mean(),
'Std_A': multi_data['product_A'].std(),
'Skew_A': multi_data['product_A'].skew(),
'Kurt_A': multi_data['product_A'].kurtosis(),
'Sample_Size': len(multi_data)
}
print("异常值处理方法效果对比:")
print(processing_comparison.round(4))
# 可视化处理效果
fig, axes = plt.subplots(2, 3, figsize=(18, 10))
axes = axes.ravel()
methods = ['原始数据'] + list(processed_data.keys())
colors = ['black', 'red', 'blue', 'green', 'orange']
for i, method in enumerate(methods):
if method == '原始数据':
data_to_plot = multi_data['product_A']
else:
data_to_plot = processed_data[method]['product_A']
axes[i].hist(data_to_plot, bins=30, alpha=0.7, color=colors[i])
axes[i].set_title(f'{method}\n均值: {data_to_plot.mean():.1f}, 标准差: {data_to_plot.std():.1f}')
axes[i].set_xlabel('产品A销售额')
axes[i].set_ylabel('频次')
# 最后一个子图显示所有方法的密度对比
for i, method in enumerate(methods):
if method == '原始数据':
data_to_plot = multi_data['product_A']
else:
data_to_plot = processed_data[method]['product_A']
data_to_plot.plot(kind='density', ax=axes[5], label=method, alpha=0.7)
axes[5].set_title('所有处理方法密度对比')
axes[5].legend()
plt.tight_layout()
plt.show()
9.5 相关性分析
9.5.1 相关系数深度分析
print("\n=== 相关系数深度分析 ===")
# 1. 多种相关系数计算
print("1. 多种相关系数计算:")
# Pearson相关系数(线性相关)
pearson_corr = sales_data[numeric_cols].corr(method='pearson')
# Spearman相关系数(单调相关)
spearman_corr = sales_data[numeric_cols].corr(method='spearman')
# Kendall相关系数(秩相关)
kendall_corr = sales_data[numeric_cols].corr(method='kendall')
# 距离相关系数(非线性相关)
def distance_correlation(x, y):
"""计算距离相关系数"""
from scipy.spatial.distance import pdist, squareform
def _distance_covariance(x, y):
n = len(x)
a = squareform(pdist(x.reshape(-1, 1)))
b = squareform(pdist(y.reshape(-1, 1)))
A = a - a.mean(axis=0)[None, :] - a.mean(axis=1)[:, None] + a.mean()
B = b - b.mean(axis=0)[None, :] - b.mean(axis=1)[:, None] + b.mean()
dcov_xy = (A * B).sum() / (n * n)
dcov_xx = (A * A).sum() / (n * n)
dcov_yy = (B * B).sum() / (n * n)
return dcov_xy, dcov_xx, dcov_yy
dcov_xy, dcov_xx, dcov_yy = _distance_covariance(x, y)
if dcov_xx > 0 and dcov_yy > 0:
return dcov_xy / np.sqrt(dcov_xx * dcov_yy)
else:
return 0
# 计算距离相关矩阵
distance_corr = pd.DataFrame(index=numeric_cols, columns=numeric_cols)
for i, col1 in enumerate(numeric_cols):
for j, col2 in enumerate(numeric_cols):
if i == j:
distance_corr.loc[col1, col2] = 1.0
else:
x = sales_data[col1].dropna().values
y = sales_data[col2].dropna().values
# 确保两个序列长度相同
min_len = min(len(x), len(y))
distance_corr.loc[col1, col2] = distance_correlation(x[:min_len], y[:min_len])
distance_corr = distance_corr.astype(float)
print("Pearson相关系数矩阵:")
print(pearson_corr.round(4))
print("\nSpearman相关系数矩阵:")
print(spearman_corr.round(4))
print("\nKendall相关系数矩阵:")
print(kendall_corr.round(4))
print("\n距离相关系数矩阵:")
print(distance_corr.round(4))
# 2. 相关系数可视化对比
print("\n2. 相关系数可视化对比:")
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
# Pearson相关热力图
sns.heatmap(pearson_corr, annot=True, cmap='coolwarm', center=0,
square=True, ax=axes[0,0], cbar_kws={'shrink': 0.8})
axes[0,0].set_title('Pearson相关系数')
# Spearman相关热力图
sns.heatmap(spearman_corr, annot=True, cmap='coolwarm', center=0,
square=True, ax=axes[0,1], cbar_kws={'shrink': 0.8})
axes[0,1].set_title('Spearman相关系数')
# Kendall相关热力图
sns.heatmap(kendall_corr, annot=True, cmap='coolwarm', center=0,
square=True, ax=axes[1,0], cbar_kws={'shrink': 0.8})
axes[1,0].set_title('Kendall相关系数')
# 距离相关热力图
sns.heatmap(distance_corr, annot=True, cmap='coolwarm', center=0,
square=True, ax=axes[1,1], cbar_kws={'shrink': 0.8})
axes[1,1].set_title('距离相关系数')
plt.tight_layout()
plt.show()
# 3. 相关系数差异分析
print("\n3. 相关系数差异分析:")
# 计算不同相关系数之间的差异
corr_diff_analysis = pd.DataFrame({
'Pearson_vs_Spearman': (pearson_corr - spearman_corr).abs().values.flatten(),
'Pearson_vs_Kendall': (pearson_corr - kendall_corr).abs().values.flatten(),
'Spearman_vs_Kendall': (spearman_corr - kendall_corr).abs().values.flatten(),
'Pearson_vs_Distance': (pearson_corr - distance_corr).abs().values.flatten()
})
# 移除对角线元素(自相关)
mask = np.eye(len(numeric_cols), dtype=bool).flatten()
corr_diff_analysis = corr_diff_analysis[~mask]
print("相关系数差异统计:")
print(corr_diff_analysis.describe().round(4))
# 可视化差异分布
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
for i, col in enumerate(corr_diff_analysis.columns):
ax = axes[i//2, i%2]
corr_diff_analysis[col].hist(bins=20, ax=ax, alpha=0.7)
ax.set_title(f'{col}差异分布')
ax.set_xlabel('绝对差异')
ax.set_ylabel('频次')
plt.tight_layout()
plt.show()
9.5.2 偏相关分析
print("\n=== 偏相关分析 ===")
# 1. 偏相关系数计算
print("1. 偏相关系数计算:")
def partial_correlation(data, x, y, control_vars):
"""计算偏相关系数"""
from sklearn.linear_model import LinearRegression
# 准备数据
X = data[control_vars].values
x_vals = data[x].values
y_vals = data[y].values
# 对x和y分别进行回归,控制其他变量
reg_x = LinearRegression().fit(X, x_vals)
reg_y = LinearRegression().fit(X, y_vals)
# 计算残差
residual_x = x_vals - reg_x.predict(X)
residual_y = y_vals - reg_y.predict(X)
# 计算残差的相关系数
return np.corrcoef(residual_x, residual_y)[0, 1]
# 计算所有变量对的偏相关系数
partial_corr_matrix = pd.DataFrame(index=numeric_cols, columns=numeric_cols)
for i, var1 in enumerate(numeric_cols):
for j, var2 in enumerate(numeric_cols):
if i == j:
partial_corr_matrix.loc[var1, var2] = 1.0
else:
# 控制变量为除了var1和var2之外的所有变量
control_vars = [v for v in numeric_cols if v not in [var1, var2]]
if control_vars: # 确保有控制变量
partial_corr = partial_correlation(sales_data.dropna(), var1, var2, control_vars)
partial_corr_matrix.loc[var1, var2] = partial_corr
else:
partial_corr_matrix.loc[var1, var2] = pearson_corr.loc[var1, var2]
partial_corr_matrix = partial_corr_matrix.astype(float)
print("偏相关系数矩阵:")
print(partial_corr_matrix.round(4))
# 2. 偏相关与简单相关对比
print("\n2. 偏相关与简单相关对比:")
# 创建对比表
correlation_comparison = pd.DataFrame({
'Simple_Correlation': pearson_corr.values.flatten(),
'Partial_Correlation': partial_corr_matrix.values.flatten()
})
# 移除对角线元素
mask = np.eye(len(numeric_cols), dtype=bool).flatten()
correlation_comparison = correlation_comparison[~mask]
# 计算差异
correlation_comparison['Difference'] = (correlation_comparison['Simple_Correlation'] -
correlation_comparison['Partial_Correlation'])
print("简单相关与偏相关对比:")
print(correlation_comparison.describe().round(4))
# 可视化对比
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 散点图对比
axes[0, 0].scatter(correlation_comparison['Simple_Correlation'],
correlation_comparison['Partial_Correlation'], alpha=0.7)
axes[0, 0].plot([-1, 1], [-1, 1], 'r--', alpha=0.8)
axes[0, 0].set_xlabel('简单相关系数')
axes[0, 0].set_ylabel('偏相关系数')
axes[0, 0].set_title('简单相关 vs 偏相关')
# 差异分布
correlation_comparison['Difference'].hist(bins=20, ax=axes[0, 1], alpha=0.7)
axes[0, 1].set_xlabel('差异 (简单相关 - 偏相关)')
axes[0, 1].set_ylabel('频次')
axes[0, 1].set_title('相关系数差异分布')
# 偏相关热力图
sns.heatmap(partial_corr_matrix, annot=True, cmap='coolwarm', center=0,
square=True, ax=axes[1, 0], cbar_kws={'shrink': 0.8})
axes[1, 0].set_title('偏相关系数热力图')
# 相关系数变化
change_matrix = partial_corr_matrix - pearson_corr
sns.heatmap(change_matrix, annot=True, cmap='RdBu', center=0,
square=True, ax=axes[1, 1], cbar_kws={'shrink': 0.8})
axes[1, 1].set_title('相关系数变化 (偏相关 - 简单相关)')
plt.tight_layout()
plt.show()
9.5.3 动态相关分析
print("\n=== 动态相关分析 ===")
# 1. 滚动相关分析
print("1. 滚动相关分析:")
# 计算滚动相关系数
window_sizes = [30, 60, 90]
rolling_correlations = {}
for window in window_sizes:
rolling_corr = sales_data['product_A'].rolling(window).corr(sales_data['product_B'])
rolling_correlations[f'{window}天'] = rolling_corr
rolling_corr_df = pd.DataFrame(rolling_correlations, index=sales_data.index)
print("滚动相关统计:")
print(rolling_corr_df.describe().round(4))
# 可视化滚动相关
fig, axes = plt.subplots(3, 1, figsize=(15, 12))
# 原始数据
sales_data[['product_A', 'product_B']].plot(ax=axes[0], title='产品A和产品B销售趋势')
axes[0].set_ylabel('销售额')
# 滚动相关
rolling_corr_df.plot(ax=axes[1], title='产品A与产品B滚动相关系数')
axes[1].axhline(y=0, color='red', linestyle='--', alpha=0.5)
axes[1].set_ylabel('相关系数')
# 相关系数分布
for window in window_sizes:
rolling_corr_df[f'{window}天'].dropna().hist(bins=30, alpha=0.6,
label=f'{window}天', ax=axes[2])
axes[2].set_xlabel('相关系数')
axes[2].set_ylabel('频次')
axes[2].set_title('滚动相关系数分布')
axes[2].legend()
plt.tight_layout()
plt.show()
# 2. 分段相关分析
print("\n2. 分段相关分析:")
# 按季度分段
quarterly_correlations = []
for quarter in sales_data.index.to_period('Q').unique():
quarter_data = sales_data[sales_data.index.to_period('Q') == quarter]
if len(quarter_data) > 10: # 确保有足够的数据点
corr_matrix = quarter_data[numeric_cols].corr()
quarterly_correlations.append({
'Quarter': str(quarter),
'A_B_Corr': corr_matrix.loc['product_A', 'product_B'],
'A_C_Corr': corr_matrix.loc['product_A', 'product_C'],
'B_C_Corr': corr_matrix.loc['product_B', 'product_C']
})
quarterly_corr_df = pd.DataFrame(quarterly_correlations)
print("季度相关系数:")
print(quarterly_corr_df.round(4))
# 按月份分段
monthly_correlations = []
for month in range(1, 13):
month_data = sales_data[sales_data.index.month == month]
if len(month_data) > 10:
corr_matrix = month_data[numeric_cols].corr()
monthly_correlations.append({
'Month': month,
'A_B_Corr': corr_matrix.loc['product_A', 'product_B'],
'A_C_Corr': corr_matrix.loc['product_A', 'product_C'],
'B_C_Corr': corr_matrix.loc['product_B', 'product_C']
})
monthly_corr_df = pd.DataFrame(monthly_correlations)
print("\n月度相关系数:")
print(monthly_corr_df.round(4))
# 可视化分段相关
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 季度相关趋势
quarterly_corr_df.set_index('Quarter')[['A_B_Corr', 'A_C_Corr', 'B_C_Corr']].plot(
kind='bar', ax=axes[0, 0], title='季度相关系数')
axes[0, 0].tick_params(axis='x', rotation=45)
axes[0, 0].axhline(y=0, color='red', linestyle='--', alpha=0.5)
# 月度相关趋势
monthly_corr_df.set_index('Month')[['A_B_Corr', 'A_C_Corr', 'B_C_Corr']].plot(
ax=axes[0, 1], title='月度相关系数')
axes[0, 1].set_xlabel('月份')
axes[0, 1].axhline(y=0, color='red', linestyle='--', alpha=0.5)
# 相关系数变异性
corr_variability = pd.DataFrame({
'Quarterly_Std': quarterly_corr_df[['A_B_Corr', 'A_C_Corr', 'B_C_Corr']].std(),
'Monthly_Std': monthly_corr_df[['A_B_Corr', 'A_C_Corr', 'B_C_Corr']].std()
})
corr_variability.plot(kind='bar', ax=axes[1, 0], title='相关系数变异性')
axes[1, 0].set_ylabel('标准差')
axes[1, 0].tick_params(axis='x', rotation=45)
# 相关系数稳定性分析
stability_analysis = pd.DataFrame({
'Mean': monthly_corr_df[['A_B_Corr', 'A_C_Corr', 'B_C_Corr']].mean(),
'Std': monthly_corr_df[['A_B_Corr', 'A_C_Corr', 'B_C_Corr']].std(),
'CV': monthly_corr_df[['A_B_Corr', 'A_C_Corr', 'B_C_Corr']].std() /
monthly_corr_df[['A_B_Corr', 'A_C_Corr', 'B_C_Corr']].mean().abs()
})
stability_analysis['CV'].plot(kind='bar', ax=axes[1, 1], title='相关系数变异系数')
axes[1, 1].set_ylabel('变异系数')
axes[1, 1].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
# 3. 条件相关分析
print("\n3. 条件相关分析:")
# 按区域分组的相关分析
region_correlations = []
for region in sales_data['region'].unique():
region_data = sales_data[sales_data['region'] == region]
corr_matrix = region_data[numeric_cols].corr()
region_correlations.append({
'Region': region,
'A_B_Corr': corr_matrix.loc['product_A', 'product_B'],
'A_C_Corr': corr_matrix.loc['product_A', 'product_C'],
'B_C_Corr': corr_matrix.loc['product_B', 'product_C'],
'Sample_Size': len(region_data)
})
region_corr_df = pd.DataFrame(region_correlations)
print("区域相关系数:")
print(region_corr_df.round(4))
# 按渠道分组的相关分析
channel_correlations = []
for channel in sales_data['channel'].unique():
channel_data = sales_data[sales_data['channel'] == channel]
corr_matrix = channel_data[numeric_cols].corr()
channel_correlations.append({
'Channel': channel,
'A_B_Corr': corr_matrix.loc['product_A', 'product_B'],
'A_C_Corr': corr_matrix.loc['product_A', 'product_C'],
'B_C_Corr': corr_matrix.loc['product_B', 'product_C'],
'Sample_Size': len(channel_data)
})
channel_corr_df = pd.DataFrame(channel_correlations)
print("\n渠道相关系数:")
print(channel_corr_df.round(4))
# 可视化条件相关
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 区域相关对比
region_corr_df.set_index('Region')[['A_B_Corr', 'A_C_Corr', 'B_C_Corr']].plot(
kind='bar', ax=axes[0, 0], title='各区域相关系数')
axes[0, 0].tick_params(axis='x', rotation=45)
axes[0, 0].axhline(y=0, color='red', linestyle='--', alpha=0.5)
# 渠道相关对比
channel_corr_df.set_index('Channel')[['A_B_Corr', 'A_C_Corr', 'B_C_Corr']].plot(
kind='bar', ax=axes[0, 1], title='各渠道相关系数')
axes[0, 1].tick_params(axis='x', rotation=45)
axes[0, 1].axhline(y=0, color='red', linestyle='--', alpha=0.5)
# 相关系数差异分析
overall_corr = sales_data[numeric_cols].corr()
region_diff = pd.DataFrame({
region: {
'A_B_Diff': region_corr_df[region_corr_df['Region']==region]['A_B_Corr'].iloc[0] - overall_corr.loc['product_A', 'product_B'],
'A_C_Diff': region_corr_df[region_corr_df['Region']==region]['A_C_Corr'].iloc[0] - overall_corr.loc['product_A', 'product_C'],
'B_C_Diff': region_corr_df[region_corr_df['Region']==region]['B_C_Corr'].iloc[0] - overall_corr.loc['product_B', 'product_C']
} for region in sales_data['region'].unique()
}).T
region_diff.plot(kind='bar', ax=axes[1, 0], title='区域相关系数与整体差异')
axes[1, 0].tick_params(axis='x', rotation=45)
axes[1, 0].axhline(y=0, color='red', linestyle='--', alpha=0.5)
# 样本量与相关系数关系
axes[1, 1].scatter(region_corr_df['Sample_Size'], region_corr_df['A_B_Corr'],
label='A-B相关', alpha=0.7)
axes[1, 1].scatter(region_corr_df['Sample_Size'], region_corr_df['A_C_Corr'],
label='A-C相关', alpha=0.7)
axes[1, 1].scatter(region_corr_df['Sample_Size'], region_corr_df['B_C_Corr'],
label='B-C相关', alpha=0.7)
axes[1, 1].set_xlabel('样本量')
axes[1, 1].set_ylabel('相关系数')
axes[1, 1].set_title('样本量与相关系数关系')
axes[1, 1].legend()
plt.tight_layout()
plt.show()
9.6 本章小结
9.6.1 核心知识点
Pandas内置可视化
- 基础图表:线图、柱状图、散点图、饼图
- 分布图:直方图、箱线图、密度图
- 时间序列可视化:趋势图、季节性分析
- 多变量可视化:相关性矩阵、散点图矩阵
统计描述分析
- 描述性统计:均值、中位数、标准差、偏度、峰度
- 交叉表分析:分类变量关系分析
- 分组统计:按不同维度进行统计汇总
分布分析
- 正态性检验:多种检验方法的应用
- 分布拟合:最佳分布选择和参数估计
- 异常值检测:多维异常值检测和处理策略
相关性分析
- 多种相关系数:Pearson、Spearman、Kendall、距离相关
- 偏相关分析:控制其他变量的相关性
- 动态相关分析:时间变化的相关性模式
9.6.2 最佳实践
可视化设计原则
- 选择合适的图表类型
- 保持图表简洁清晰
- 使用一致的颜色和样式
- 添加必要的标签和说明
统计分析流程
- 先进行探索性数据分析
- 检查数据质量和分布特征
- 选择合适的统计方法
- 验证分析结果的可靠性
异常值处理策略
- 使用多种方法检测异常值
- 分析异常值的成因
- 根据业务场景选择处理方法
- 评估处理效果
9.6.3 性能优化要点
大数据可视化
- 使用采样技术减少数据量
- 选择高效的绘图库
- 避免过度复杂的图表
统计计算优化
- 利用向量化操作
- 使用适当的数据类型
- 缓存重复计算结果
9.6.4 常见陷阱
相关性误解
- 相关性不等于因果性
- 注意虚假相关
- 考虑非线性关系
统计检验误用
- 检查假设条件
- 注意多重比较问题
- 理解p值的含义
9.6.5 下一步学习
- 学习高级机器学习算法
- 掌握大数据处理技术
- 深入时间序列分析
- 探索因果推断方法
9.6.6 练习题
基础练习
- 创建一个包含多种图表类型的数据分析报告
- 对给定数据集进行全面的描述性统计分析
- 使用多种方法检测和处理异常值
进阶练习
- 实现自定义的相关性分析函数
- 构建交互式数据可视化仪表板
- 进行多变量分布拟合和模型选择
实战项目
- 分析真实业务数据的统计特征
- 构建数据质量监控系统
- 开发自动化的异常检测算法
通过本章的学习,你已经掌握了Pandas数据可视化和统计分析的核心技能。这些技能将帮助你更好地理解数据特征,发现数据中的模式和异常,为后续的机器学习和深度分析奠定坚实基础。