10.1 章节概述
本章将深入探讨Pandas的高级应用技巧和性能优化策略。我们将学习如何处理大规模数据集、优化内存使用、实现并行处理,以及掌握一些高级的数据处理技术。这些技能对于处理真实世界的大数据项目至关重要。
graph TD
A[高级应用与性能优化] --> B[内存优化技术]
A --> C[性能分析与监控]
A --> D[并行处理与分布式计算]
A --> E[高级数据处理技术]
A --> F[大数据处理策略]
A --> G[生产环境最佳实践]
B --> B1[数据类型优化]
B --> B2[内存映射]
B --> B3[分块处理]
C --> C1[性能分析工具]
C --> C2[瓶颈识别]
C --> C3[基准测试]
D --> D1[多进程处理]
D --> D2[Dask集成]
D --> D3[并行计算模式]
E --> E1[自定义函数优化]
E --> E2[向量化操作]
E --> E3[高级索引技术]
F --> F1[流式处理]
F --> F2[增量处理]
F --> F3[数据管道设计]
G --> G1[错误处理]
G --> G2[日志记录]
G --> G3[监控告警]
10.1.1 学习目标
通过本章学习,你将能够:
掌握内存优化技术
- 理解Pandas内存使用机制
- 学会选择合适的数据类型
- 掌握分块处理大数据的方法
实现性能优化
- 使用性能分析工具识别瓶颈
- 应用向量化操作提升性能
- 优化数据处理流程
应用并行处理技术
- 实现多进程数据处理
- 集成Dask进行分布式计算
- 设计可扩展的数据处理架构
掌握高级数据处理技术
- 实现复杂的数据转换
- 构建高效的数据管道
- 处理流式数据
10.1.2 准备工作
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
# 设置显示选项
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', 50)
# 性能分析工具
import time
import psutil
import memory_profiler
from functools import wraps
# 并行处理工具
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import dask.dataframe as dd
# 创建示例数据
np.random.seed(42)
def create_large_dataset(n_rows=1000000):
"""创建大型示例数据集"""
data = {
'id': range(n_rows),
'timestamp': pd.date_range('2020-01-01', periods=n_rows, freq='1min'),
'category': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
'value1': np.random.randn(n_rows) * 100,
'value2': np.random.randn(n_rows) * 50,
'value3': np.random.randint(1, 1000, n_rows),
'text_data': [f'text_{i%10000}' for i in range(n_rows)],
'flag': np.random.choice([True, False], n_rows)
}
return pd.DataFrame(data)
# 创建示例数据
print("创建示例数据集...")
large_df = create_large_dataset(100000) # 较小的数据集用于演示
print(f"数据集形状: {large_df.shape}")
print(f"内存使用: {large_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
10.2 内存优化技术
10.2.1 数据类型优化
print("=== 数据类型优化 ===")
# 1. 分析当前内存使用
print("1. 当前内存使用分析:")
def analyze_memory_usage(df, name="DataFrame"):
"""分析DataFrame的内存使用情况"""
memory_usage = df.memory_usage(deep=True)
total_memory = memory_usage.sum()
print(f"\n{name} 内存使用分析:")
print(f"总内存使用: {total_memory / 1024**2:.2f} MB")
print("\n各列内存使用:")
for col, usage in memory_usage.items():
if col != 'Index':
dtype = df[col].dtype
print(f" {col:15} ({dtype}): {usage / 1024**2:.2f} MB")
return total_memory
original_memory = analyze_memory_usage(large_df, "原始数据")
# 2. 数据类型优化
print("\n2. 数据类型优化:")
def optimize_dtypes(df):
"""优化DataFrame的数据类型"""
optimized_df = df.copy()
for col in optimized_df.columns:
col_type = optimized_df[col].dtype
if col_type != 'object':
# 数值类型优化
if str(col_type).startswith('int'):
# 整数类型优化
min_val = optimized_df[col].min()
max_val = optimized_df[col].max()
if min_val >= 0:
# 无符号整数
if max_val <= 255:
optimized_df[col] = optimized_df[col].astype(np.uint8)
elif max_val <= 65535:
optimized_df[col] = optimized_df[col].astype(np.uint16)
elif max_val <= 4294967295:
optimized_df[col] = optimized_df[col].astype(np.uint32)
else:
# 有符号整数
if min_val >= -128 and max_val <= 127:
optimized_df[col] = optimized_df[col].astype(np.int8)
elif min_val >= -32768 and max_val <= 32767:
optimized_df[col] = optimized_df[col].astype(np.int16)
elif min_val >= -2147483648 and max_val <= 2147483647:
optimized_df[col] = optimized_df[col].astype(np.int32)
elif str(col_type).startswith('float'):
# 浮点数类型优化
optimized_df[col] = pd.to_numeric(optimized_df[col], downcast='float')
else:
# 对象类型优化
if optimized_df[col].nunique() / len(optimized_df) < 0.5:
# 如果唯一值比例小于50%,转换为category
optimized_df[col] = optimized_df[col].astype('category')
return optimized_df
# 应用优化
optimized_df = optimize_dtypes(large_df)
optimized_memory = analyze_memory_usage(optimized_df, "优化后数据")
# 计算节省的内存
memory_saved = original_memory - optimized_memory
memory_saved_percent = (memory_saved / original_memory) * 100
print(f"\n内存优化结果:")
print(f"原始内存: {original_memory / 1024**2:.2f} MB")
print(f"优化后内存: {optimized_memory / 1024**2:.2f} MB")
print(f"节省内存: {memory_saved / 1024**2:.2f} MB ({memory_saved_percent:.1f}%)")
# 3. 类别数据优化
print("\n3. 类别数据优化:")
# 比较不同类别数据存储方式
text_series = pd.Series(['A', 'B', 'C'] * 10000)
category_series = text_series.astype('category')
print(f"字符串存储: {text_series.memory_usage(deep=True) / 1024:.2f} KB")
print(f"类别存储: {category_series.memory_usage(deep=True) / 1024:.2f} KB")
print(f"节省比例: {(1 - category_series.memory_usage(deep=True) / text_series.memory_usage(deep=True)) * 100:.1f}%")
# 4. 稀疏数据优化
print("\n4. 稀疏数据优化:")
# 创建稀疏数据
sparse_data = np.random.choice([0, 1, 2], size=50000, p=[0.95, 0.03, 0.02])
dense_series = pd.Series(sparse_data)
sparse_series = pd.Series(sparse_data).astype(pd.SparseDtype(int, 0))
print(f"密集存储: {dense_series.memory_usage(deep=True) / 1024:.2f} KB")
print(f"稀疏存储: {sparse_series.memory_usage(deep=True) / 1024:.2f} KB")
print(f"稀疏度: {(sparse_data == 0).mean() * 100:.1f}%")
print(f"节省比例: {(1 - sparse_series.memory_usage(deep=True) / dense_series.memory_usage(deep=True)) * 100:.1f}%")
10.2.2 分块处理技术
print("\n=== 分块处理技术 ===")
# 1. 基础分块处理
print("1. 基础分块处理:")
def process_in_chunks(df, chunk_size=10000, func=None):
"""分块处理大型DataFrame"""
if func is None:
func = lambda x: x.describe()
results = []
n_chunks = len(df) // chunk_size + (1 if len(df) % chunk_size else 0)
print(f"处理 {len(df)} 行数据,分为 {n_chunks} 个块")
for i in range(0, len(df), chunk_size):
chunk = df.iloc[i:i+chunk_size]
result = func(chunk)
results.append(result)
if i // chunk_size % 10 == 0: # 每10个块显示一次进度
print(f"已处理: {min(i + chunk_size, len(df))}/{len(df)} 行")
return results
# 示例:分块计算统计信息
def calculate_stats(chunk):
"""计算块的统计信息"""
return {
'count': len(chunk),
'mean_value1': chunk['value1'].mean(),
'std_value1': chunk['value1'].std(),
'unique_categories': chunk['category'].nunique()
}
chunk_results = process_in_chunks(large_df, chunk_size=20000, func=calculate_stats)
# 汇总结果
total_count = sum(r['count'] for r in chunk_results)
weighted_mean = sum(r['mean_value1'] * r['count'] for r in chunk_results) / total_count
print(f"\n分块处理结果汇总:")
print(f"总行数: {total_count}")
print(f"加权平均值: {weighted_mean:.4f}")
print(f"直接计算平均值: {large_df['value1'].mean():.4f}")
# 2. 分块读取与内存映射处理
print("\n2. 分块读取与内存映射处理(先演示分块读取CSV,内存映射示例见本小节末尾):")
# 保存数据到文件
large_df.to_csv('temp_large_data.csv', index=False)
def process_csv_chunks(filename, chunk_size=10000):
"""使用分块读取处理大型CSV文件"""
chunk_results = []
# 使用chunksize参数分块读取
for chunk_num, chunk in enumerate(pd.read_csv(filename, chunksize=chunk_size)):
# 处理每个块
result = {
'chunk_num': chunk_num,
'rows': len(chunk),
'value1_sum': chunk['value1'].sum(),
'value2_sum': chunk['value2'].sum()
}
chunk_results.append(result)
if chunk_num % 5 == 0:
print(f"处理块 {chunk_num}, 行数: {len(chunk)}")
return chunk_results
csv_results = process_csv_chunks('temp_large_data.csv', chunk_size=15000)
# 汇总CSV处理结果
total_value1_sum = sum(r['value1_sum'] for r in csv_results)
total_value2_sum = sum(r['value2_sum'] for r in csv_results)
print(f"\nCSV分块处理结果:")
print(f"处理了 {len(csv_results)} 个块")
print(f"value1总和: {total_value1_sum:.2f}")
print(f"value2总和: {total_value2_sum:.2f}")
# 3. 生成器模式处理
print("\n3. 生成器模式处理:")
def data_generator(df, batch_size=5000):
"""数据生成器,逐批返回数据"""
for i in range(0, len(df), batch_size):
yield df.iloc[i:i+batch_size]
def streaming_aggregation(data_gen):
"""流式聚合处理"""
running_sum = 0
running_count = 0
running_sum_squares = 0
for batch in data_gen:
batch_sum = batch['value1'].sum()
batch_count = len(batch)
batch_sum_squares = (batch['value1'] ** 2).sum()
running_sum += batch_sum
running_count += batch_count
running_sum_squares += batch_sum_squares
# 计算当前的统计信息
current_mean = running_sum / running_count
current_var = (running_sum_squares / running_count) - (current_mean ** 2)
current_std = np.sqrt(current_var)
yield {
'processed_rows': running_count,
'current_mean': current_mean,
'current_std': current_std
}
# 使用生成器处理数据
gen = data_generator(large_df, batch_size=10000)
streaming_results = list(streaming_aggregation(gen))
print("流式处理结果(最后几个批次):")
for result in streaming_results[-3:]:
print(f"已处理 {result['processed_rows']} 行, "
f"均值: {result['current_mean']:.4f}, "
f"标准差: {result['current_std']:.4f}")
# 4. 内存监控
print("\n4. 内存监控:")
def memory_monitor(func):
"""内存监控装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
# 获取当前进程
process = psutil.Process()
# 执行前的内存使用
mem_before = process.memory_info().rss / 1024**2
# 执行函数
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
# 执行后的内存使用
mem_after = process.memory_info().rss / 1024**2
print(f"\n函数 {func.__name__} 执行监控:")
print(f"执行时间: {end_time - start_time:.2f} 秒")
print(f"执行前内存: {mem_before:.2f} MB")
print(f"执行后内存: {mem_after:.2f} MB")
print(f"内存变化: {mem_after - mem_before:.2f} MB")
return result
return wrapper
@memory_monitor
def memory_intensive_operation(df):
"""内存密集型操作示例"""
# 创建多个副本
df_copy1 = df.copy()
df_copy2 = df.copy()
df_copy3 = df.copy()
# 进行一些计算
result = df_copy1['value1'] + df_copy2['value2'] + df_copy3['value3']
return result.sum()
# 测试内存监控
result = memory_intensive_operation(large_df)
print(f"计算结果: {result:.2f}")
# 清理临时文件
import os
if os.path.exists('temp_large_data.csv'):
os.remove('temp_large_data.csv')
10.2.3 高效数据结构选择
print("\n=== 高效数据结构选择 ===")
# 1. 索引优化
print("1. 索引优化:")
def compare_index_performance():
"""比较不同索引类型的性能"""
# 创建测试数据
n = 100000
df_default = pd.DataFrame({
'id': range(n),
'value': np.random.randn(n)
})
# 设置不同类型的索引
df_int_index = df_default.set_index('id')
df_range_index = df_default.copy()
df_range_index.index = pd.RangeIndex(n)
# 测试查询性能
test_ids = np.random.choice(range(n), 1000)
# 默认索引查询
start_time = time.time()
for idx in test_ids:
_ = df_default.iloc[idx]
default_time = time.time() - start_time
# 整数索引查询
start_time = time.time()
for idx in test_ids:
_ = df_int_index.loc[idx]
int_index_time = time.time() - start_time
# RangeIndex查询
start_time = time.time()
for idx in test_ids:
_ = df_range_index.iloc[idx]
range_index_time = time.time() - start_time
print(f"默认索引查询时间: {default_time:.4f} 秒")
print(f"整数索引查询时间: {int_index_time:.4f} 秒")
print(f"RangeIndex查询时间: {range_index_time:.4f} 秒")
# 内存使用比较
print(f"\n内存使用比较:")
print(f"默认索引: {df_default.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f"整数索引: {df_int_index.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f"RangeIndex: {df_range_index.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
compare_index_performance()
# 2. 数据结构选择策略
print("\n2. 数据结构选择策略:")
def choose_optimal_structure(data_characteristics):
"""根据数据特征选择最优数据结构"""
recommendations = []
# 分析数据特征
for col, char in data_characteristics.items():
if char['type'] == 'categorical':
if char['cardinality'] / char['total_rows'] < 0.5:
recommendations.append(f"{col}: 使用 category 类型")
else:
recommendations.append(f"{col}: 保持 object 类型")
elif char['type'] == 'numeric':
if char['has_nulls']:
recommendations.append(f"{col}: 考虑使用 nullable integer 类型")
else:
if char['is_integer']:
recommendations.append(f"{col}: 使用合适的整数类型")
else:
recommendations.append(f"{col}: 使用 float32 如果精度允许")
elif char['type'] == 'datetime':
recommendations.append(f"{col}: 使用 datetime64[ns] 并考虑时区")
elif char['type'] == 'boolean':
if char['has_nulls']:
recommendations.append(f"{col}: 使用 boolean 类型(支持NA)")
else:
recommendations.append(f"{col}: 使用 bool 类型")
return recommendations
# 分析示例数据特征
data_chars = {
'id': {'type': 'numeric', 'is_integer': True, 'has_nulls': False, 'total_rows': len(large_df)},
'category': {'type': 'categorical', 'cardinality': large_df['category'].nunique(), 'total_rows': len(large_df)},
'timestamp': {'type': 'datetime', 'has_nulls': False, 'total_rows': len(large_df)},
'flag': {'type': 'boolean', 'has_nulls': False, 'total_rows': len(large_df)}
}
recommendations = choose_optimal_structure(data_chars)
print("数据结构优化建议:")
for rec in recommendations:
print(f" - {rec}")
# 3. 向量化操作优化
print("\n3. 向量化操作优化:")
def compare_vectorization():
"""比较向量化操作与循环操作的性能"""
n = 50000
data = pd.DataFrame({
'a': np.random.randn(n),
'b': np.random.randn(n),
'c': np.random.randn(n)
})
# 方法1:循环操作(慢)
start_time = time.time()
result1 = []
for i in range(len(data)):
result1.append(data.iloc[i]['a'] * data.iloc[i]['b'] + data.iloc[i]['c'])
loop_time = time.time() - start_time
# 方法2:向量化操作(快)
start_time = time.time()
result2 = data['a'] * data['b'] + data['c']
vectorized_time = time.time() - start_time
# 方法3:NumPy操作(最快)
start_time = time.time()
result3 = data['a'].values * data['b'].values + data['c'].values
numpy_time = time.time() - start_time
print(f"循环操作时间: {loop_time:.4f} 秒")
print(f"向量化操作时间: {vectorized_time:.4f} 秒")
print(f"NumPy操作时间: {numpy_time:.4f} 秒")
print(f"向量化加速比: {loop_time / vectorized_time:.1f}x")
print(f"NumPy加速比: {loop_time / numpy_time:.1f}x")
compare_vectorization()
# 4. 缓存策略
print("\n4. 缓存策略:")
from functools import lru_cache
class DataFrameCache:
"""DataFrame缓存类"""
def __init__(self, max_size=100):
self.cache = {}
self.max_size = max_size
self.access_count = {}
def get(self, key, compute_func, *args, **kwargs):
"""获取缓存数据或计算新数据"""
if key in self.cache:
self.access_count[key] = self.access_count.get(key, 0) + 1
print(f"缓存命中: {key}")
return self.cache[key]
# 计算新数据
print(f"计算新数据: {key}")
result = compute_func(*args, **kwargs)
# 添加到缓存
if len(self.cache) >= self.max_size:
# 移除最少使用的项
least_used = min(self.access_count.items(), key=lambda x: x[1])[0]
del self.cache[least_used]
del self.access_count[least_used]
self.cache[key] = result
self.access_count[key] = 1
return result
def clear(self):
"""清空缓存"""
self.cache.clear()
self.access_count.clear()
# 使用缓存示例
cache = DataFrameCache(max_size=5)
def expensive_computation(df, operation):
"""模拟昂贵的计算操作"""
time.sleep(0.1) # 模拟计算时间
if operation == 'mean':
return df.mean()
elif operation == 'std':
return df.std()
elif operation == 'corr':
return df.corr()
# 测试缓存效果
start_time = time.time()
result1 = cache.get('mean_calc', expensive_computation, large_df[['value1', 'value2']], 'mean')
first_call_time = time.time() - start_time
start_time = time.time()
result2 = cache.get('mean_calc', expensive_computation, large_df[['value1', 'value2']], 'mean')
second_call_time = time.time() - start_time
print(f"首次计算时间: {first_call_time:.4f} 秒")
print(f"缓存命中时间: {second_call_time:.4f} 秒")
print(f"缓存加速比: {first_call_time / second_call_time:.1f}x")
10.3 性能分析与监控
10.3.1 性能分析工具
print("\n=== 性能分析工具 ===")
# 1. 时间分析工具
print("1. 时间分析工具:")
import cProfile
import pstats
from io import StringIO
def profile_function(func, *args, **kwargs):
"""分析函数性能"""
pr = cProfile.Profile()
pr.enable()
result = func(*args, **kwargs)
pr.disable()
# 获取统计信息
s = StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(10) # 显示前10个最耗时的函数
print(s.getvalue())
return result
def complex_data_operation(df):
"""复杂数据操作示例"""
# 多步骤数据处理
result = df.copy()
result['new_col1'] = result['value1'] * result['value2']
result['new_col2'] = result.groupby('category')['value1'].transform('mean')
result['new_col3'] = result['value1'].rolling(window=100).mean()
result = result.dropna()
return result.groupby('category').agg({
'new_col1': ['mean', 'std'],
'new_col2': 'first',
'new_col3': 'mean'
})
print("性能分析结果:")
result = profile_function(complex_data_operation, large_df)
# 2. 内存分析工具
print("\n2. 内存分析工具:")
def memory_profile_decorator(func):
"""内存分析装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
# 使用memory_profiler采样函数执行期间的内存
# retval=True让memory_usage在采样的同时返回函数结果,避免函数被执行两次
mem_usage, result = memory_profiler.memory_usage((func, args, kwargs), interval=0.1, retval=True)
print(f"\n函数 {func.__name__} 内存分析:")
print(f"最大内存使用: {max(mem_usage):.2f} MB")
print(f"最小内存使用: {min(mem_usage):.2f} MB")
print(f"内存波动: {max(mem_usage) - min(mem_usage):.2f} MB")
return result
return wrapper
@memory_profile_decorator
def memory_intensive_function(df):
"""内存密集型函数"""
# 创建多个大型中间结果
temp1 = df.copy()
temp2 = df.copy()
temp3 = pd.concat([temp1, temp2], ignore_index=True)
# 进行计算
result = temp3.groupby('category').agg({
'value1': ['mean', 'std', 'min', 'max'],
'value2': ['sum', 'count']
})
return result
memory_result = memory_intensive_function(large_df)
# 3. 基准测试框架
print("\n3. 基准测试框架:")
class BenchmarkSuite:
"""基准测试套件"""
def __init__(self):
self.results = {}
def benchmark(self, name, func, *args, **kwargs):
"""执行基准测试"""
times = []
memory_usage = []
# 多次运行取平均值
for _ in range(5):
process = psutil.Process()
mem_before = process.memory_info().rss / 1024**2
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
mem_after = process.memory_info().rss / 1024**2
times.append(end_time - start_time)
memory_usage.append(mem_after - mem_before)
self.results[name] = {
'avg_time': np.mean(times),
'std_time': np.std(times),
'avg_memory': np.mean(memory_usage),
'std_memory': np.std(memory_usage)
}
return result
def compare_methods(self, methods_dict, *args, **kwargs):
"""比较多种方法的性能"""
for name, func in methods_dict.items():
self.benchmark(name, func, *args, **kwargs)
def report(self):
"""生成性能报告"""
print("\n=== 基准测试报告 ===")
print(f"{'方法名':<20} {'平均时间(s)':<12} {'时间标准差':<12} {'内存变化(MB)':<15}")
print("-" * 65)
for name, stats in self.results.items():
print(f"{name:<20} {stats['avg_time']:<12.4f} {stats['std_time']:<12.4f} {stats['avg_memory']:<15.2f}")
# 定义不同的数据处理方法
def method_pandas_groupby(df):
return df.groupby('category')['value1'].mean()
def method_pandas_pivot(df):
return df.pivot_table(values='value1', index='category', aggfunc='mean')
def method_manual_groupby(df):
result = {}
for cat in df['category'].unique():
mask = df['category'] == cat
result[cat] = df.loc[mask, 'value1'].mean()
return pd.Series(result)
# 运行基准测试
benchmark = BenchmarkSuite()
methods = {
'Pandas GroupBy': method_pandas_groupby,
'Pandas Pivot': method_pandas_pivot,
'Manual GroupBy': method_manual_groupby
}
benchmark.compare_methods(methods, large_df)
benchmark.report()
# 4. 性能监控仪表板
print("\n4. 性能监控仪表板:")
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.metrics = []
self.start_time = None
def start_monitoring(self):
"""开始监控"""
self.start_time = time.time()
self.metrics = []
def record_metric(self, operation_name, duration, memory_delta):
"""记录性能指标"""
self.metrics.append({
'timestamp': time.time() - self.start_time,
'operation': operation_name,
'duration': duration,
'memory_delta': memory_delta
})
def context_manager(self, operation_name):
"""上下文管理器用于自动监控"""
return PerformanceContext(self, operation_name)
def generate_report(self):
"""生成性能报告"""
if not self.metrics:
print("没有性能数据")
return
df_metrics = pd.DataFrame(self.metrics)
print("\n=== 性能监控报告 ===")
print(f"总监控时间: {df_metrics['timestamp'].max():.2f} 秒")
print(f"总操作数: {len(df_metrics)}")
# 按操作类型汇总
summary = df_metrics.groupby('operation').agg({
'duration': ['count', 'mean', 'sum'],
'memory_delta': ['mean', 'sum']
}).round(4)
print("\n操作汇总:")
print(summary)
# 可视化性能趋势
fig, axes = plt.subplots(2, 1, figsize=(12, 8))
# 时间趋势
axes[0].plot(df_metrics['timestamp'], df_metrics['duration'], 'o-')
axes[0].set_title('操作耗时趋势')
axes[0].set_xlabel('时间 (秒)')
axes[0].set_ylabel('耗时 (秒)')
# 内存使用趋势
axes[1].plot(df_metrics['timestamp'], df_metrics['memory_delta'], 'o-', color='red')
axes[1].set_title('内存使用变化趋势')
axes[1].set_xlabel('时间 (秒)')
axes[1].set_ylabel('内存变化 (MB)')
plt.tight_layout()
plt.show()
class PerformanceContext:
"""性能监控上下文管理器"""
def __init__(self, monitor, operation_name):
self.monitor = monitor
self.operation_name = operation_name
self.start_time = None
self.start_memory = None
def __enter__(self):
process = psutil.Process()
self.start_time = time.time()
self.start_memory = process.memory_info().rss / 1024**2
return self
def __exit__(self, exc_type, exc_val, exc_tb):
process = psutil.Process()
end_time = time.time()
end_memory = process.memory_info().rss / 1024**2
duration = end_time - self.start_time
memory_delta = end_memory - self.start_memory
self.monitor.record_metric(self.operation_name, duration, memory_delta)
# 使用性能监控器
monitor = PerformanceMonitor()
monitor.start_monitoring()
# 模拟一系列操作
with monitor.context_manager('数据加载'):
time.sleep(0.1) # 模拟加载时间
with monitor.context_manager('数据清洗'):
cleaned_data = large_df.dropna()
with monitor.context_manager('特征工程'):
cleaned_data['new_feature'] = cleaned_data['value1'] * cleaned_data['value2']
with monitor.context_manager('聚合计算'):
result = cleaned_data.groupby('category').agg({
'value1': 'mean',
'value2': 'std',
'new_feature': 'sum'
})
monitor.generate_report()
10.3.2 瓶颈识别与优化
print("\n=== 瓶颈识别与优化 ===")
# 1. 常见性能瓶颈识别
print("1. 常见性能瓶颈识别:")
def identify_bottlenecks(df):
"""识别DataFrame操作的潜在瓶颈"""
bottlenecks = []
# 检查数据大小
memory_usage = df.memory_usage(deep=True).sum() / 1024**2
if memory_usage > 1000: # 大于1GB
bottlenecks.append(f"大数据集警告: {memory_usage:.1f} MB,考虑分块处理")
# 检查数据类型
for col in df.columns:
if df[col].dtype == 'object':
unique_ratio = df[col].nunique() / len(df)
if unique_ratio < 0.5:
bottlenecks.append(f"列 '{col}' 可优化为category类型")
# 检查索引
if not isinstance(df.index, pd.RangeIndex):
bottlenecks.append("考虑使用RangeIndex以提高性能")
# 检查重复数据
duplicate_ratio = df.duplicated().sum() / len(df)
if duplicate_ratio > 0.1:
bottlenecks.append(f"发现 {duplicate_ratio:.1%} 重复数据,考虑去重")
# 检查缺失值
missing_ratio = df.isnull().sum().sum() / (len(df) * len(df.columns))
if missing_ratio > 0.2:
bottlenecks.append(f"发现 {missing_ratio:.1%} 缺失值,影响计算效率")
return bottlenecks
bottlenecks = identify_bottlenecks(large_df)
print("发现的性能瓶颈:")
for bottleneck in bottlenecks:
print(f" - {bottleneck}")
# 2. 操作优化策略
print("\n2. 操作优化策略:")
def optimize_operations():
"""演示各种操作的优化策略"""
# 创建测试数据
test_df = large_df.copy()
print("a) 条件筛选优化:")
# 低效方法:多次筛选
start_time = time.time()
result1 = test_df[test_df['value1'] > 0]
result1 = result1[result1['value2'] < 50]
result1 = result1[result1['category'].isin(['A', 'B'])]
inefficient_time = time.time() - start_time
# 高效方法:组合条件
start_time = time.time()
mask = (test_df['value1'] > 0) & (test_df['value2'] < 50) & (test_df['category'].isin(['A', 'B']))
result2 = test_df[mask]
efficient_time = time.time() - start_time
print(f"多次筛选时间: {inefficient_time:.4f} 秒")
print(f"组合条件时间: {efficient_time:.4f} 秒")
print(f"优化倍数: {inefficient_time / efficient_time:.1f}x")
print("\nb) 字符串操作优化:")
# 创建字符串数据
string_data = pd.Series(['text_' + str(i) for i in range(10000)])
# 低效方法:逐个处理
start_time = time.time()
result1 = string_data.apply(lambda x: x.upper())
apply_time = time.time() - start_time
# 高效方法:向量化操作
start_time = time.time()
result2 = string_data.str.upper()
vectorized_time = time.time() - start_time
print(f"apply方法时间: {apply_time:.4f} 秒")
print(f"向量化时间: {vectorized_time:.4f} 秒")
print(f"优化倍数: {apply_time / vectorized_time:.1f}x")
print("\nc) 聚合操作优化:")
# 低效方法:多次聚合
start_time = time.time()
mean_val = test_df.groupby('category')['value1'].mean()
std_val = test_df.groupby('category')['value1'].std()
count_val = test_df.groupby('category')['value1'].count()
multiple_agg_time = time.time() - start_time
# 高效方法:一次聚合
start_time = time.time()
result = test_df.groupby('category')['value1'].agg(['mean', 'std', 'count'])
single_agg_time = time.time() - start_time
print(f"多次聚合时间: {multiple_agg_time:.4f} 秒")
print(f"一次聚合时间: {single_agg_time:.4f} 秒")
print(f"优化倍数: {multiple_agg_time / single_agg_time:.1f}x")
optimize_operations()
# 3. 查询优化
print("\n3. 查询优化:")
def optimize_queries():
"""查询优化示例"""
# 创建带索引的数据
indexed_df = large_df.set_index('id')
print("a) 索引查询 vs 扫描查询:")
# 随机选择查询ID
query_ids = np.random.choice(large_df['id'], 100)
# 扫描查询(慢)
start_time = time.time()
for qid in query_ids:
result = large_df[large_df['id'] == qid]
scan_time = time.time() - start_time
# 索引查询(快)
start_time = time.time()
for qid in query_ids:
result = indexed_df.loc[qid:qid]
index_time = time.time() - start_time
print(f"扫描查询时间: {scan_time:.4f} 秒")
print(f"索引查询时间: {index_time:.4f} 秒")
print(f"优化倍数: {scan_time / index_time:.1f}x")
print("\nb) 范围查询优化:")
# 排序数据以优化范围查询
sorted_df = large_df.sort_values('value1')
# 普通范围查询
start_time = time.time()
result1 = large_df[(large_df['value1'] >= -50) & (large_df['value1'] <= 50)]
normal_range_time = time.time() - start_time
# 在排序数据上的范围查询
start_time = time.time()
result2 = sorted_df[(sorted_df['value1'] >= -50) & (sorted_df['value1'] <= 50)]
sorted_range_time = time.time() - start_time
print(f"普通范围查询时间: {normal_range_time:.4f} 秒")
print(f"排序数据查询时间: {sorted_range_time:.4f} 秒")
print("\nc) 查询计划优化:")
# 复杂查询的不同执行顺序
# 方法1:先筛选再聚合
start_time = time.time()
filtered = large_df[large_df['value1'] > 0]
result1 = filtered.groupby('category')['value2'].mean()
filter_first_time = time.time() - start_time
# 方法2:先聚合再筛选(如果可能)
start_time = time.time()
grouped = large_df.groupby('category').apply(
lambda x: x[x['value1'] > 0]['value2'].mean()
)
group_first_time = time.time() - start_time
print(f"先筛选后聚合时间: {filter_first_time:.4f} 秒")
print(f"分组内筛选时间: {group_first_time:.4f} 秒")
optimize_queries()
# 4. 自动优化建议系统
print("\n4. 自动优化建议系统:")
class OptimizationAdvisor:
"""优化建议系统"""
def __init__(self):
self.suggestions = []
def analyze_dataframe(self, df):
"""分析DataFrame并提供优化建议"""
self.suggestions = []
# 内存优化建议
self._analyze_memory_usage(df)
# 数据类型优化建议
self._analyze_dtypes(df)
# 索引优化建议
self._analyze_index(df)
# 数据质量建议
self._analyze_data_quality(df)
return self.suggestions
def _analyze_memory_usage(self, df):
"""分析内存使用"""
memory_mb = df.memory_usage(deep=True).sum() / 1024**2
if memory_mb > 500:
self.suggestions.append({
'type': 'memory',
'priority': 'high',
'message': f'数据集较大 ({memory_mb:.1f} MB),建议使用分块处理',
'code': 'pd.read_csv(file, chunksize=10000)'
})
def _analyze_dtypes(self, df):
"""分析数据类型"""
for col in df.columns:
if df[col].dtype == 'object':
unique_ratio = df[col].nunique() / len(df)
if unique_ratio < 0.5:
self.suggestions.append({
'type': 'dtype',
'priority': 'medium',
'message': f'列 {col} 适合转换为category类型',
'code': f"df['{col}'] = df['{col}'].astype('category')"
})
elif str(df[col].dtype).startswith('int64'):
min_val, max_val = df[col].min(), df[col].max()
if min_val >= 0 and max_val < 255:
self.suggestions.append({
'type': 'dtype',
'priority': 'low',
'message': f'列 {col} 可以使用uint8类型',
'code': f"df['{col}'] = df['{col}'].astype('uint8')"
})
def _analyze_index(self, df):
"""分析索引"""
if not isinstance(df.index, pd.RangeIndex):
self.suggestions.append({
'type': 'index',
'priority': 'medium',
'message': '考虑重置为RangeIndex以提高性能',
'code': 'df.reset_index(drop=True, inplace=True)'
})
def _analyze_data_quality(self, df):
"""分析数据质量"""
# 检查重复值
dup_ratio = df.duplicated().sum() / len(df)
if dup_ratio > 0.05:
self.suggestions.append({
'type': 'quality',
'priority': 'medium',
'message': f'发现 {dup_ratio:.1%} 重复数据',
'code': 'df.drop_duplicates(inplace=True)'
})
# 检查缺失值
missing_cols = df.columns[df.isnull().any()].tolist()
if missing_cols:
self.suggestions.append({
'type': 'quality',
'priority': 'medium',
'message': f'列 {missing_cols} 包含缺失值',
'code': 'df.ffill() 或 df.dropna()'
})
def generate_report(self):
"""生成优化报告"""
if not self.suggestions:
print("没有发现优化建议")
return
print("\n=== 优化建议报告 ===")
# 按优先级分组
by_priority = {}
for suggestion in self.suggestions:
priority = suggestion['priority']
if priority not in by_priority:
by_priority[priority] = []
by_priority[priority].append(suggestion)
# 按优先级输出
for priority in ['high', 'medium', 'low']:
if priority in by_priority:
print(f"\n{priority.upper()} 优先级建议:")
for i, suggestion in enumerate(by_priority[priority], 1):
print(f"{i}. {suggestion['message']}")
print(f" 建议代码: {suggestion['code']}")
# 使用优化建议系统
advisor = OptimizationAdvisor()
suggestions = advisor.analyze_dataframe(large_df)
advisor.generate_report()
10.4 并行处理与分布式计算
10.4.1 多进程处理
print("\n=== 多进程处理 ===")
# 1. 基础多进程处理
print("1. 基础多进程处理:")
def process_chunk(chunk_data):
"""处理数据块的函数"""
chunk, chunk_id = chunk_data
# 模拟复杂计算
result = {
'chunk_id': chunk_id,
'rows': len(chunk),
'mean_value1': chunk['value1'].mean(),
'std_value1': chunk['value1'].std(),
'category_counts': chunk['category'].value_counts().to_dict()
}
return result
def parallel_processing_demo():
"""并行处理演示"""
# 准备数据块
chunk_size = 10000
chunks = []
for i in range(0, len(large_df), chunk_size):
chunk = large_df.iloc[i:i+chunk_size]
chunks.append((chunk, i // chunk_size))
print(f"数据分为 {len(chunks)} 个块进行处理")
# 串行处理
start_time = time.time()
serial_results = []
for chunk_data in chunks:
result = process_chunk(chunk_data)
serial_results.append(result)
serial_time = time.time() - start_time
# 并行处理
start_time = time.time()
with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
parallel_results = list(executor.map(process_chunk, chunks))
parallel_time = time.time() - start_time
print(f"串行处理时间: {serial_time:.2f} 秒")
print(f"并行处理时间: {parallel_time:.2f} 秒")
print(f"加速比: {serial_time / parallel_time:.1f}x")
print(f"使用CPU核心数: {mp.cpu_count()}")
return parallel_results
parallel_results = parallel_processing_demo()
# 2. 高级并行模式
print("\n2. 高级并行模式:")
class ParallelDataProcessor:
"""并行数据处理器"""
def __init__(self, n_workers=None):
self.n_workers = n_workers or mp.cpu_count()
def map_reduce(self, df, map_func, reduce_func, chunk_size=10000):
"""Map-Reduce模式处理"""
# Map阶段:并行处理数据块
chunks = self._split_dataframe(df, chunk_size)
with ProcessPoolExecutor(max_workers=self.n_workers) as executor:
map_results = list(executor.map(map_func, chunks))
# Reduce阶段:合并结果
final_result = reduce_func(map_results)
return final_result
def _split_dataframe(self, df, chunk_size):
"""分割DataFrame为块"""
chunks = []
for i in range(0, len(df), chunk_size):
chunks.append(df.iloc[i:i+chunk_size])
return chunks
def parallel_apply(self, df, func, chunk_size=10000):
"""并行应用函数"""
chunks = self._split_dataframe(df, chunk_size)
with ProcessPoolExecutor(max_workers=self.n_workers) as executor:
results = list(executor.map(func, chunks))
# 合并结果
return pd.concat(results, ignore_index=True)
def parallel_groupby(self, df, groupby_cols, agg_dict, chunk_size=10000):
"""并行分组聚合"""
def map_func(chunk):
return chunk.groupby(groupby_cols).agg(agg_dict)
def reduce_func(results):
combined = pd.concat(results)
# 注意:对mean、std等非可加聚合,再次套用同一agg得到的是各分块结果的简单平均,
# 只有在各块大小相同(或聚合本身可加,如sum、count)时才与整体结果一致
return combined.groupby(level=0).agg(agg_dict)
return self.map_reduce(df, map_func, reduce_func, chunk_size)
# 使用并行处理器
processor = ParallelDataProcessor()
# 示例1:并行计算统计信息
def compute_stats(chunk):
"""计算统计信息"""
return pd.DataFrame({
'category': chunk['category'].unique(),
'mean_value1': [chunk[chunk['category'] == cat]['value1'].mean()
for cat in chunk['category'].unique()],
'count': [chunk[chunk['category'] == cat].shape[0]
for cat in chunk['category'].unique()]
})
def combine_stats(results):
"""合并统计结果"""
combined = pd.concat(results, ignore_index=True)
return combined.groupby('category').agg({
'mean_value1': lambda x: np.average(x, weights=combined.loc[x.index, 'count']),
'count': 'sum'
})
print("并行统计计算:")
start_time = time.time()
parallel_stats = processor.map_reduce(large_df, compute_stats, combine_stats)
parallel_time = time.time() - start_time
# 对比串行计算
start_time = time.time()
serial_stats = large_df.groupby('category')['value1'].agg(['mean', 'count'])
serial_time = time.time() - start_time
print(f"并行计算时间: {parallel_time:.4f} 秒")
print(f"串行计算时间: {serial_time:.4f} 秒")
print("并行计算结果:")
print(parallel_stats)
# 3. 内存共享优化
print("\n3. 内存共享优化:")
import multiprocessing.shared_memory as shared_memory
def create_shared_dataframe(df):
"""创建共享内存DataFrame"""
# 将DataFrame转换为numpy数组
numeric_cols = df.select_dtypes(include=[np.number]).columns
numeric_data = df[numeric_cols].values
# 创建共享内存
shm = shared_memory.SharedMemory(create=True, size=numeric_data.nbytes)
# 将数据复制到共享内存
shared_array = np.ndarray(numeric_data.shape, dtype=numeric_data.dtype, buffer=shm.buf)
shared_array[:] = numeric_data[:]
# 返回shm对象以保持共享内存存活,并返回(shm.name, 形状, dtype, 列名)供子进程重新连接
return shm, (shm.name, shared_array.shape, shared_array.dtype, numeric_cols.tolist())
def process_shared_data(shm_info):
"""处理共享内存数据"""
shm_name, shape, dtype, columns = shm_info
# 连接到共享内存
existing_shm = shared_memory.SharedMemory(name=shm_name)
shared_array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
# 创建DataFrame
df = pd.DataFrame(shared_array, columns=columns)
# 进行计算
result = df.mean().to_dict()
# 清理
existing_shm.close()
return result
# 注意:共享内存在某些环境中可能不可用,这里提供概念演示
print("共享内存处理概念演示(可能需要特定环境支持)")
# 4. 异步处理
print("\n4. 异步处理:")
import asyncio
import concurrent.futures
async def async_data_processing():
"""异步数据处理示例"""
def cpu_bound_task(chunk):
"""CPU密集型任务"""
return chunk.groupby('category')['value1'].agg(['mean', 'std', 'count'])
# 获取当前正在运行的事件循环(协程内部应使用get_running_loop,而非已不推荐的get_event_loop)
loop = asyncio.get_running_loop()
# 准备数据块
chunks = [large_df.iloc[i:i+5000] for i in range(0, len(large_df), 5000)]
# 使用线程池执行CPU密集型任务
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# 提交所有任务
futures = [loop.run_in_executor(executor, cpu_bound_task, chunk) for chunk in chunks]
# 等待所有任务完成
results = await asyncio.gather(*futures)
# 合并结果:按各块的count做加权,得到正确的总体均值
combined = pd.concat(results)
final_result = combined.groupby(level=0).apply(lambda g: pd.Series({
'mean': np.average(g['mean'], weights=g['count']),
'std': g['std'].mean(),
'count': g['count'].sum()
}))
return final_result
# 运行异步处理
print("异步处理演示:")
try:
# 在某些环境中可能需要不同的事件循环处理
async_result = asyncio.run(async_data_processing())
print("异步处理完成")
print(async_result.head())
except Exception as e:
print(f"异步处理演示跳过: {e}")
10.4.2 Dask集成
print("\n=== Dask集成 ===")
# 1. Dask DataFrame基础
print("1. Dask DataFrame基础:")
try:
import dask
import dask.dataframe as dd
from dask.distributed import Client
# 创建Dask DataFrame
dask_df = dd.from_pandas(large_df, npartitions=4)
print(f"Pandas DataFrame形状: {large_df.shape}")
print(f"Dask DataFrame分区数: {dask_df.npartitions}")
print(f"每个分区大小: ~{len(large_df) // dask_df.npartitions} 行")
# Dask操作是惰性的
print("\n2. 惰性计算演示:")
# 定义计算图
result = dask_df.groupby('category')['value1'].mean()
print("计算图已创建(未执行)")
# 执行计算
start_time = time.time()
computed_result = result.compute()
dask_time = time.time() - start_time
# 对比Pandas直接计算
start_time = time.time()
pandas_result = large_df.groupby('category')['value1'].mean()
pandas_time = time.time() - start_time
print(f"Dask计算时间: {dask_time:.4f} 秒")
print(f"Pandas计算时间: {pandas_time:.4f} 秒")
print("Dask结果:")
print(computed_result)
# 3. 复杂操作链
print("\n3. 复杂操作链:")
# 构建复杂的计算图
complex_result = (dask_df
.assign(value_ratio=dask_df['value1'] / dask_df['value2'])
.query('value_ratio > 0')
.groupby('category')
.agg({'value1': ['mean', 'std'],
'value_ratio': 'mean'})
.compute())
print("复杂操作结果:")
print(complex_result)
# 4. 分布式计算
print("\n4. 分布式计算设置:")
# 创建本地集群
try:
client = Client(processes=False, threads_per_worker=2, n_workers=2)
print(f"Dask客户端: {client}")
print(f"集群信息: {client.cluster}")
# 使用分布式计算
distributed_result = dask_df.value1.sum().compute()
print(f"分布式计算结果: {distributed_result}")
client.close()
except Exception as e:
print(f"分布式计算演示跳过: {e}")
# 5. 内存外计算
print("\n5. 内存外计算演示:")
# 保存大数据集到多个文件
large_df.to_csv('temp_data_part1.csv', index=False)
large_df.to_csv('temp_data_part2.csv', index=False)
# 从多个文件创建Dask DataFrame
file_pattern = 'temp_data_part*.csv'
dask_from_files = dd.read_csv(file_pattern)
print(f"从文件创建的Dask DataFrame分区数: {dask_from_files.npartitions}")
# 执行内存外计算
out_of_core_result = dask_from_files.groupby('category').value1.mean().compute()
print("内存外计算结果:")
print(out_of_core_result)
# 清理临时文件
import os
for file in ['temp_data_part1.csv', 'temp_data_part2.csv']:
if os.path.exists(file):
os.remove(file)
except ImportError:
print("Dask未安装,跳过Dask演示")
print("安装命令: pip install dask[complete]")
# 6. 自定义并行函数
print("\n6. 自定义并行函数:")
def custom_parallel_function(df, func, n_workers=None):
"""自定义并行函数执行器"""
if n_workers is None:
n_workers = mp.cpu_count()
# 分割数据
chunk_size = len(df) // n_workers
chunks = [df.iloc[i:i+chunk_size] for i in range(0, len(df), chunk_size)]
# 并行执行
with ProcessPoolExecutor(max_workers=n_workers) as executor:
results = list(executor.map(func, chunks))
return results
def sample_function(chunk):
"""示例处理函数"""
return {
'chunk_size': len(chunk),
'mean_value1': chunk['value1'].mean(),
'unique_categories': chunk['category'].nunique()
}
# 测试自定义并行函数
parallel_custom_results = custom_parallel_function(large_df, sample_function, n_workers=4)
print("自定义并行函数结果:")
for i, result in enumerate(parallel_custom_results):
print(f"块 {i}: {result}")
10.4.3 分布式计算架构
print("\n=== 分布式计算架构 ===")
# 1. 数据分片策略
print("1. 数据分片策略:")
class DataSharding:
"""数据分片管理器"""
def __init__(self, df):
self.df = df
self.shards = {}
def shard_by_hash(self, column, n_shards):
"""基于哈希的分片"""
hash_values = pd.util.hash_pandas_object(self.df[column])
shard_ids = hash_values % n_shards
for shard_id in range(n_shards):
mask = shard_ids == shard_id
self.shards[f'hash_shard_{shard_id}'] = self.df[mask]
return self.shards
def shard_by_range(self, column, n_shards):
"""基于范围的分片"""
sorted_df = self.df.sort_values(column)
chunk_size = len(sorted_df) // n_shards
for shard_id in range(n_shards):
start_idx = shard_id * chunk_size
end_idx = start_idx + chunk_size if shard_id < n_shards - 1 else len(sorted_df)
self.shards[f'range_shard_{shard_id}'] = sorted_df.iloc[start_idx:end_idx]
return self.shards
def shard_by_category(self, column):
"""基于类别的分片"""
for category in self.df[column].unique():
mask = self.df[column] == category
self.shards[f'category_shard_{category}'] = self.df[mask]
return self.shards
def get_shard_info(self):
"""获取分片信息"""
info = {}
for shard_name, shard_data in self.shards.items():
info[shard_name] = {
'rows': len(shard_data),
'memory_mb': shard_data.memory_usage(deep=True).sum() / 1024**2,
'columns': list(shard_data.columns)
}
return info
# 演示不同分片策略
sharding = DataSharding(large_df)
# 哈希分片
hash_shards = sharding.shard_by_hash('id', 4)
print("哈希分片信息:")
hash_info = sharding.get_shard_info()
for shard_name, info in hash_info.items():
print(f" {shard_name}: {info['rows']} 行, {info['memory_mb']:.2f} MB")
# 重置分片器
sharding = DataSharding(large_df)
# 类别分片
category_shards = sharding.shard_by_category('category')
print("\n类别分片信息:")
category_info = sharding.get_shard_info()
for shard_name, info in category_info.items():
print(f" {shard_name}: {info['rows']} 行, {info['memory_mb']:.2f} MB")
# 2. 负载均衡
print("\n2. 负载均衡:")
class LoadBalancer:
"""负载均衡器"""
def __init__(self, workers):
self.workers = workers
self.worker_loads = {worker: 0 for worker in workers}
def assign_task(self, task_weight=1):
"""分配任务到负载最小的工作节点"""
min_load_worker = min(self.worker_loads.items(), key=lambda x: x[1])[0]
self.worker_loads[min_load_worker] += task_weight
return min_load_worker
def complete_task(self, worker, task_weight=1):
"""标记任务完成"""
self.worker_loads[worker] -= task_weight
def get_load_distribution(self):
"""获取负载分布"""
return self.worker_loads.copy()
# 模拟负载均衡
workers = ['worker_1', 'worker_2', 'worker_3', 'worker_4']
balancer = LoadBalancer(workers)
# 分配任务
tasks = [('task_A', 3), ('task_B', 1), ('task_C', 2), ('task_D', 4), ('task_E', 1)]
print("任务分配过程:")
task_assignments = {}
for task_name, task_weight in tasks:
assigned_worker = balancer.assign_task(task_weight)
task_assignments[task_name] = assigned_worker
print(f"{task_name} (权重:{task_weight}) -> {assigned_worker}")
print(f"\n最终负载分布: {balancer.get_load_distribution()}")
# 3. 容错机制
print("\n3. 容错机制:")
class FaultTolerantProcessor:
"""容错处理器"""
def __init__(self, max_retries=3):
self.max_retries = max_retries
self.failed_tasks = []
self.completed_tasks = []
def process_with_retry(self, task_func, *args, **kwargs):
"""带重试的任务处理"""
for attempt in range(self.max_retries + 1):
try:
result = task_func(*args, **kwargs)
self.completed_tasks.append({
'task': task_func.__name__,
'attempt': attempt + 1,
'status': 'success'
})
return result
except Exception as e:
if attempt == self.max_retries:
self.failed_tasks.append({
'task': task_func.__name__,
'attempts': attempt + 1,
'error': str(e)
})
raise e
else:
print(f"任务 {task_func.__name__} 第 {attempt + 1} 次尝试失败: {e}")
time.sleep(0.1 * (attempt + 1)) # 指数退避
def get_statistics(self):
"""获取处理统计"""
return {
'completed': len(self.completed_tasks),
'failed': len(self.failed_tasks),
'success_rate': len(self.completed_tasks) / (len(self.completed_tasks) + len(self.failed_tasks)) if (len(self.completed_tasks) + len(self.failed_tasks)) > 0 else 0
}
def unreliable_task(data, fail_probability=0.3):
"""不可靠的任务(模拟失败)"""
if np.random.random() < fail_probability:
raise Exception("随机任务失败")
return data.sum()
# 测试容错机制
processor = FaultTolerantProcessor(max_retries=2)
print("容错处理测试:")
for i in range(5):
try:
result = processor.process_with_retry(unreliable_task, large_df['value1'])
print(f"任务 {i+1} 成功: {result:.2f}")
except Exception as e:
print(f"任务 {i+1} 最终失败: {e}")
stats = processor.get_statistics()
print(f"\n处理统计: {stats}")
10.5 高级数据处理技术
10.5.1 自定义函数优化
print("\n=== 自定义函数优化 ===")
# 1. 向量化自定义函数
print("1. 向量化自定义函数:")
# 低效的逐行处理
def slow_custom_function(row):
"""慢速自定义函数"""
if row['value1'] > 0:
return row['value1'] * row['value2'] + 10
else:
return row['value2'] - 5
# 向量化版本
def fast_custom_function(value1, value2):
"""快速向量化函数"""
return np.where(value1 > 0, value1 * value2 + 10, value2 - 5)
# 性能比较
print("自定义函数性能比较:")
# 测试慢速版本
start_time = time.time()
slow_result = large_df.apply(slow_custom_function, axis=1)
slow_time = time.time() - start_time
# 测试快速版本
start_time = time.time()
fast_result = fast_custom_function(large_df['value1'], large_df['value2'])
fast_time = time.time() - start_time
print(f"逐行处理时间: {slow_time:.4f} 秒")
print(f"向量化处理时间: {fast_time:.4f} 秒")
print(f"加速比: {slow_time / fast_time:.1f}x")
print(f"结果一致性: {np.allclose(slow_result, fast_result)}")
# 2. NumPy集成优化
print("\n2. NumPy集成优化:")
def numpy_optimized_function(df):
"""使用NumPy优化的函数"""
# 直接使用NumPy数组
values1 = df['value1'].values
values2 = df['value2'].values
# NumPy向量化操作
result = np.where(
values1 > values1.mean(),
values1 * values2,
values1 + values2
)
return pd.Series(result, index=df.index)
def pandas_function(df):
"""纯Pandas版本"""
mean_val = df['value1'].mean()
return np.where(
df['value1'] > mean_val,
df['value1'] * df['value2'],
df['value1'] + df['value2']
)
# 性能比较
start_time = time.time()
numpy_result = numpy_optimized_function(large_df)
numpy_time = time.time() - start_time
start_time = time.time()
pandas_result = pandas_function(large_df)
pandas_time = time.time() - start_time
print(f"NumPy优化时间: {numpy_time:.4f} 秒")
print(f"Pandas版本时间: {pandas_time:.4f} 秒")
print(f"NumPy加速比: {pandas_time / numpy_time:.1f}x")
# 3. Numba加速
print("\n3. Numba加速:")
try:
from numba import jit, prange
@jit(nopython=True, parallel=True)  # parallel=True时prange才会真正并行执行
def numba_function(values1, values2):
"""Numba编译的函数"""
result = np.empty(len(values1))
for i in prange(len(values1)):
if values1[i] > 0:
result[i] = values1[i] * values2[i] + 10
else:
result[i] = values2[i] - 5
return result
# 预热Numba函数
_ = numba_function(large_df['value1'].values[:100], large_df['value2'].values[:100])
# 性能测试
start_time = time.time()
numba_result = numba_function(large_df['value1'].values, large_df['value2'].values)
numba_time = time.time() - start_time
print(f"Numba编译时间: {numba_time:.4f} 秒")
print(f"相比向量化加速比: {fast_time / numba_time:.1f}x")
except ImportError:
print("Numba未安装,跳过Numba演示")
print("安装命令: pip install numba")
# 4. 缓存优化
print("\n4. 缓存优化:")
from functools import lru_cache
@lru_cache(maxsize=128)
def expensive_calculation(value):
"""昂贵的计算(带缓存)"""
# 模拟复杂计算
time.sleep(0.001)
return value ** 2 + np.sin(value) * 100
def apply_with_cache(series):
"""应用带缓存的函数"""
return series.apply(expensive_calculation)
def apply_without_cache(series):
"""不带缓存的版本"""
return series.apply(lambda x: x ** 2 + np.sin(x) * 100)
# 创建有重复值的测试数据
test_data = pd.Series(np.random.choice(range(100), 1000))
# 测试缓存效果
start_time = time.time()
cached_result = apply_with_cache(test_data)
cached_time = time.time() - start_time
start_time = time.time()
uncached_result = apply_without_cache(test_data)
uncached_time = time.time() - start_time
print(f"带缓存时间: {cached_time:.4f} 秒")
print(f"不带缓存时间: {uncached_time:.4f} 秒")
print(f"缓存加速比: {uncached_time / cached_time:.1f}x")
print(f"数据重复率: {1 - test_data.nunique() / len(test_data):.1%}")
10.5.2 流式数据处理
print("\n=== 流式数据处理 ===")
# 1. 数据流生成器
print("1. 数据流生成器:")
class DataStreamGenerator:
"""数据流生成器"""
def __init__(self, batch_size=1000, total_batches=10):
self.batch_size = batch_size
self.total_batches = total_batches
self.current_batch = 0
def generate_batch(self):
"""生成一批数据"""
if self.current_batch >= self.total_batches:
return None
data = {
'timestamp': pd.date_range(
start=pd.Timestamp.now() + pd.Timedelta(minutes=self.current_batch),
periods=self.batch_size,
freq='1s'
),
'value': np.random.randn(self.batch_size) * 100,
'category': np.random.choice(['A', 'B', 'C'], self.batch_size),
'batch_id': self.current_batch
}
self.current_batch += 1
return pd.DataFrame(data)
def stream(self):
"""数据流迭代器"""
while True:
batch = self.generate_batch()
if batch is None:
break
yield batch
# 2. 流式处理器
class StreamProcessor:
"""流式数据处理器"""
def __init__(self, window_size=5):
self.window_size = window_size
self.buffer = []
self.processed_count = 0
self.running_stats = {
'count': 0,
'sum': 0,
'sum_squares': 0
}
def process_batch(self, batch):
"""处理单个批次"""
# 更新运行统计
batch_sum = batch['value'].sum()
batch_count = len(batch)
batch_sum_squares = (batch['value'] ** 2).sum()
self.running_stats['count'] += batch_count
self.running_stats['sum'] += batch_sum
self.running_stats['sum_squares'] += batch_sum_squares
# 计算当前统计信息
current_mean = self.running_stats['sum'] / self.running_stats['count']
current_var = (self.running_stats['sum_squares'] / self.running_stats['count']) - (current_mean ** 2)
current_std = np.sqrt(current_var)
# 滑动窗口处理
self.buffer.append(batch)
if len(self.buffer) > self.window_size:
self.buffer.pop(0)
# 窗口内聚合
if self.buffer:
window_data = pd.concat(self.buffer, ignore_index=True)
window_stats = {
'window_mean': window_data['value'].mean(),
'window_std': window_data['value'].std(),
'window_size': len(window_data)
}
else:
window_stats = {}
self.processed_count += 1
return {
'batch_id': self.processed_count,
'batch_stats': {
'mean': batch['value'].mean(),
'std': batch['value'].std(),
'count': len(batch)
},
'running_stats': {
'mean': current_mean,
'std': current_std,
'total_count': self.running_stats['count']
},
'window_stats': window_stats
}
def process_stream(self, data_stream):
"""处理数据流"""
results = []
for batch in data_stream:
result = self.process_batch(batch)
results.append(result)
# 实时输出
print(f"批次 {result['batch_id']}: "
f"批次均值={result['batch_stats']['mean']:.2f}, "
f"运行均值={result['running_stats']['mean']:.2f}, "
f"总计数={result['running_stats']['total_count']}")
return results
# 演示流式处理
print("流式处理演示:")
generator = DataStreamGenerator(batch_size=500, total_batches=5)
processor = StreamProcessor(window_size=3)
stream_results = processor.process_stream(generator.stream())
print(f"\n流式处理完成,共处理 {len(stream_results)} 个批次")
# 3. 实时聚合
print("\n3. 实时聚合:")
class RealTimeAggregator:
"""实时聚合器"""
def __init__(self):
self.aggregates = {}
self.last_update = {}
def update(self, key, value, timestamp=None):
"""更新聚合值"""
if timestamp is None:
timestamp = pd.Timestamp.now()
if key not in self.aggregates:
self.aggregates[key] = {
'count': 0,
'sum': 0,
'min': float('inf'),
'max': float('-inf'),
'values': [] # 用于计算中位数等
}
agg = self.aggregates[key]
agg['count'] += 1
agg['sum'] += value
agg['min'] = min(agg['min'], value)
agg['max'] = max(agg['max'], value)
agg['values'].append(value)
# 保持values列表大小限制
if len(agg['values']) > 1000:
agg['values'] = agg['values'][-1000:]
self.last_update[key] = timestamp
def get_stats(self, key):
"""获取聚合统计"""
if key not in self.aggregates:
return None
agg = self.aggregates[key]
return {
'count': agg['count'],
'mean': agg['sum'] / agg['count'],
'min': agg['min'],
'max': agg['max'],
'median': np.median(agg['values']),
'last_update': self.last_update[key]
}
def get_all_stats(self):
"""获取所有聚合统计"""
return {key: self.get_stats(key) for key in self.aggregates.keys()}
# 演示实时聚合
aggregator = RealTimeAggregator()
# 模拟实时数据流
print("实时聚合演示:")
for i in range(20):
category = np.random.choice(['A', 'B', 'C'])
value = np.random.randn() * 100
aggregator.update(category, value)
if i % 5 == 4: # 每5次更新显示一次统计
print(f"\n第 {i+1} 次更新后的统计:")
for cat, stats in aggregator.get_all_stats().items():
print(f" {cat}: 计数={stats['count']}, 均值={stats['mean']:.2f}, "
f"中位数={stats['median']:.2f}")
# 4. 数据管道设计
print("\n4. 数据管道设计:")
class DataPipeline:
"""数据处理管道"""
def __init__(self):
self.stages = []
self.metrics = {
'processed_records': 0,
'processing_time': 0,
'errors': 0
}
def add_stage(self, stage_func, stage_name=None):
"""添加处理阶段"""
if stage_name is None:
stage_name = stage_func.__name__
self.stages.append({
'func': stage_func,
'name': stage_name
})
return self
def process(self, data):
"""执行管道处理"""
start_time = time.time()
current_data = data
try:
for stage in self.stages:
stage_start = time.time()
current_data = stage['func'](current_data)
stage_time = time.time() - stage_start
print(f"阶段 '{stage['name']}' 完成,耗时: {stage_time:.4f}秒")
self.metrics['processed_records'] += len(data) if hasattr(data, '__len__') else 1
self.metrics['processing_time'] += time.time() - start_time
return current_data
except Exception as e:
self.metrics['errors'] += 1
print(f"管道处理错误: {e}")
raise
def get_metrics(self):
"""获取管道指标"""
return self.metrics.copy()
# 定义管道阶段
def clean_data(df):
"""数据清洗阶段"""
return df.dropna()
def transform_data(df):
"""数据转换阶段"""
df = df.copy()
df['value_squared'] = df['value1'] ** 2
return df
def aggregate_data(df):
"""数据聚合阶段"""
return df.groupby('category').agg({
'value1': ['mean', 'std'],
'value_squared': 'sum'
})
# 构建和执行管道
pipeline = (DataPipeline()
.add_stage(clean_data, "数据清洗")
.add_stage(transform_data, "数据转换")
.add_stage(aggregate_data, "数据聚合"))
print("数据管道执行:")
pipeline_result = pipeline.process(large_df)
print(f"\n管道执行指标: {pipeline.get_metrics()}")
print("管道结果:")
print(pipeline_result)
10.6 生产环境最佳实践
10.6.1 错误处理与日志记录
print("\n=== 错误处理与日志记录 ===")
import logging
from datetime import datetime
# 1. 日志配置
print("1. 日志配置:")
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('pandas_processing.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('PandasProcessor')
class DataProcessorWithLogging:
"""带日志记录的数据处理器"""
def __init__(self, name="DataProcessor"):
self.name = name
self.logger = logging.getLogger(name)
self.processing_stats = {
'start_time': None,
'end_time': None,
'records_processed': 0,
'errors_count': 0
}
def process_data(self, df, operations):
"""处理数据并记录日志"""
self.processing_stats['start_time'] = datetime.now()
self.logger.info(f"开始处理数据,输入形状: {df.shape}")
try:
result = df.copy()
for i, operation in enumerate(operations):
operation_start = time.time()
try:
result = operation(result)
operation_time = time.time() - operation_start
self.logger.info(f"操作 {i+1} ({operation.__name__}) 完成,"
f"耗时: {operation_time:.4f}秒,"
f"结果形状: {result.shape}")
except Exception as e:
self.processing_stats['errors_count'] += 1
self.logger.error(f"操作 {i+1} ({operation.__name__}) 失败: {e}")
raise
self.processing_stats['records_processed'] = len(result)
self.processing_stats['end_time'] = datetime.now()
total_time = (self.processing_stats['end_time'] -
self.processing_stats['start_time']).total_seconds()
self.logger.info(f"数据处理完成,总耗时: {total_time:.2f}秒,"
f"处理记录数: {self.processing_stats['records_processed']}")
return result
except Exception as e:
self.logger.error(f"数据处理失败: {e}")
self.processing_stats['end_time'] = datetime.now()
raise
def get_stats(self):
"""获取处理统计"""
return self.processing_stats.copy()
# 定义处理操作
def remove_outliers(df):
"""移除异常值"""
Q1 = df['value1'].quantile(0.25)
Q3 = df['value1'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
return df[(df['value1'] >= lower_bound) & (df['value1'] <= upper_bound)]
def add_features(df):
"""添加特征"""
df = df.copy()
df['value_ratio'] = df['value1'] / df['value2']
df['value_sum'] = df['value1'] + df['value2']
return df
def normalize_values(df):
"""标准化数值"""
df = df.copy()
df['value1_norm'] = (df['value1'] - df['value1'].mean()) / df['value1'].std()
df['value2_norm'] = (df['value2'] - df['value2'].mean()) / df['value2'].std()
return df
# 使用带日志的处理器
processor = DataProcessorWithLogging("MainProcessor")
operations = [remove_outliers, add_features, normalize_values]
print("执行带日志的数据处理:")
processed_data = processor.process_data(large_df, operations)
print(f"\n处理统计: {processor.get_stats()}")
# 2. 异常处理策略
print("\n2. 异常处理策略:")
class RobustDataProcessor:
"""健壮的数据处理器"""
def __init__(self, max_retries=3, fallback_strategy='skip'):
self.max_retries = max_retries
self.fallback_strategy = fallback_strategy # 'skip', 'default', 'raise'
self.error_log = []
def safe_operation(self, df, operation, operation_name=None):
"""安全执行操作"""
if operation_name is None:
operation_name = operation.__name__
for attempt in range(self.max_retries + 1):
try:
return operation(df)
except Exception as e:
error_info = {
'operation': operation_name,
'attempt': attempt + 1,
'error': str(e),
'timestamp': datetime.now()
}
self.error_log.append(error_info)
if attempt == self.max_retries:
# 最后一次尝试失败,应用回退策略
return self._apply_fallback_strategy(df, operation_name, e)
else:
logger.warning(f"操作 {operation_name} 第 {attempt + 1} 次尝试失败: {e}")
time.sleep(0.1 * (attempt + 1)) # 指数退避
def _apply_fallback_strategy(self, df, operation_name, error):
"""应用回退策略"""
if self.fallback_strategy == 'skip':
logger.warning(f"跳过失败的操作 {operation_name}")
return df
elif self.fallback_strategy == 'default':
logger.warning(f"使用默认值处理 {operation_name}")
return self._get_default_result(df, operation_name)
else: # 'raise'
logger.error(f"操作 {operation_name} 最终失败")
raise error
def _get_default_result(self, df, operation_name):
"""获取默认结果"""
# 根据操作类型返回合理的默认值
if 'normalize' in operation_name.lower():
# 标准化失败时返回原数据
return df
elif 'feature' in operation_name.lower():
# 特征工程失败时添加空列
df = df.copy()
df['default_feature'] = 0
return df
else:
return df
def get_error_summary(self):
"""获取错误摘要"""
if not self.error_log:
return "没有错误记录"
error_summary = {}
for error in self.error_log:
op = error['operation']
if op not in error_summary:
error_summary[op] = {'count': 0, 'last_error': ''}
error_summary[op]['count'] += 1
error_summary[op]['last_error'] = error['error']
return error_summary
# 模拟可能失败的操作
def unreliable_operation(df):
"""不可靠的操作"""
if np.random.random() < 0.7: # 70%概率失败
raise ValueError("模拟操作失败")
return df.copy()
# 测试健壮处理器
robust_processor = RobustDataProcessor(max_retries=2, fallback_strategy='skip')
print("健壮处理器测试:")
for i in range(3):
try:
result = robust_processor.safe_operation(large_df, unreliable_operation, f"测试操作_{i+1}")
print(f"操作 {i+1} 成功")
except Exception as e:
print(f"操作 {i+1} 失败: {e}")
print(f"\n错误摘要: {robust_processor.get_error_summary()}")
# 3. 性能监控
print("\n3. 性能监控:")
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.metrics = []
self.alerts = []
self.thresholds = {
'max_processing_time': 10.0, # 秒
'max_memory_usage': 1000.0, # MB
'min_success_rate': 0.95 # 95%
}
def monitor_operation(self, operation_name):
"""操作监控装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
start_memory = psutil.Process().memory_info().rss / 1024**2
try:
result = func(*args, **kwargs)
success = True
error_msg = None
except Exception as e:
success = False
error_msg = str(e)
result = None
raise
finally:
end_time = time.time()
end_memory = psutil.Process().memory_info().rss / 1024**2
metric = {
'operation': operation_name,
'timestamp': datetime.now(),
'duration': end_time - start_time,
'memory_delta': end_memory - start_memory,
'success': success,
'error': error_msg
}
self.metrics.append(metric)
self._check_alerts(metric)
return result
return wrapper
return decorator
def _check_alerts(self, metric):
"""检查告警条件"""
alerts = []
if metric['duration'] > self.thresholds['max_processing_time']:
alerts.append(f"处理时间过长: {metric['duration']:.2f}秒")
if metric['memory_delta'] > self.thresholds['max_memory_usage']:
alerts.append(f"内存使用过高: {metric['memory_delta']:.2f}MB")
# 检查成功率
recent_metrics = self.metrics[-10:] # 最近10次操作
if len(recent_metrics) >= 5:
success_rate = sum(1 for m in recent_metrics if m['success']) / len(recent_metrics)
if success_rate < self.thresholds['min_success_rate']:
alerts.append(f"成功率过低: {success_rate:.1%}")
for alert in alerts:
alert_info = {
'timestamp': metric['timestamp'],
'operation': metric['operation'],
'message': alert
}
self.alerts.append(alert_info)
logger.warning(f"性能告警: {alert}")
def get_performance_report(self):
"""生成性能报告"""
if not self.metrics:
return "没有性能数据"
df_metrics = pd.DataFrame(self.metrics)
report = {
'total_operations': len(df_metrics),
'success_rate': df_metrics['success'].mean(),
'avg_duration': df_metrics['duration'].mean(),
'max_duration': df_metrics['duration'].max(),
'avg_memory_delta': df_metrics['memory_delta'].mean(),
'total_alerts': len(self.alerts)
}
return report
# 使用性能监控
monitor = PerformanceMonitor()
@monitor.monitor_operation("数据聚合")
def monitored_aggregation(df):
"""被监控的聚合操作"""
time.sleep(0.1) # 模拟处理时间
return df.groupby('category').agg({
'value1': ['mean', 'std'],
'value2': 'sum'
})
@monitor.monitor_operation("数据转换")
def monitored_transformation(df):
"""被监控的转换操作"""
df = df.copy()
df['new_feature'] = df['value1'] * df['value2']
return df
# 执行监控操作
print("执行性能监控:")
for i in range(5):
try:
agg_result = monitored_aggregation(large_df)
trans_result = monitored_transformation(large_df)
print(f"操作组 {i+1} 完成")
except Exception as e:
print(f"操作组 {i+1} 失败: {e}")
performance_report = monitor.get_performance_report()
print(f"\n性能报告: {performance_report}")
if monitor.alerts:
print(f"\n告警记录 ({len(monitor.alerts)} 条):")
for alert in monitor.alerts[-3:]: # 显示最近3条告警
print(f" {alert['timestamp']}: {alert['message']}")
10.7 本章小结
10.7.1 核心知识点回顾
print("\n=== 核心知识点回顾 ===")
# 总结本章学习的关键技术
key_techniques = {
"内存优化": [
"数据类型优化(category, sparse, 数值类型降级)",
"分块处理大数据集",
"内存映射和流式处理",
"缓存策略和内存监控"
],
"性能优化": [
"向量化操作替代循环",
"NumPy集成优化",
"索引优化和查询优化",
"性能分析和瓶颈识别"
],
"并行处理": [
"多进程数据处理",
"Dask分布式计算",
"异步处理模式",
"负载均衡和容错机制"
],
"高级技术": [
"自定义函数优化",
"流式数据处理",
"数据管道设计",
"实时聚合处理"
],
"生产实践": [
"错误处理和重试机制",
"日志记录和监控",
"性能告警系统",
"健壮性设计"
]
}
print("本章核心技术总结:")
for category, techniques in key_techniques.items():
print(f"\n{category}:")
for technique in techniques:
print(f" • {technique}")
10.7.2 性能优化检查清单
print("\n=== 性能优化检查清单 ===")
optimization_checklist = {
"数据加载阶段": [
"☐ 使用合适的数据类型(category, int8/16, float32)",
"☐ 分块读取大文件(chunksize参数)",
"☐ 只加载需要的列(usecols参数)",
"☐ 使用高效的文件格式(Parquet, HDF5)"
],
"数据处理阶段": [
"☐ 优先使用向量化操作",
"☐ 避免不必要的数据复制",
"☐ 使用inplace操作减少内存使用",
"☐ 合并多个操作减少中间结果"
],
"查询和筛选": [
"☐ 使用布尔索引而非循环",
"☐ 设置合适的索引",
"☐ 优化查询顺序(先筛选后聚合)",
"☐ 使用query()方法处理复杂条件"
],
"聚合和分组": [
"☐ 一次性完成多个聚合操作",
"☐ 使用transform()避免重复计算",
"☐ 考虑使用groupby的优化参数",
"☐ 预排序数据提高分组效率"
],
"内存管理": [
"☐ 定期检查内存使用情况",
"☐ 及时删除不需要的变量",
"☐ 使用生成器处理大数据",
"☐ 监控内存泄漏"
]
}
print("性能优化检查清单:")
for phase, items in optimization_checklist.items():
print(f"\n{phase}:")
for item in items:
print(f" {item}")
10.7.3 常见陷阱与解决方案
print("\n=== 常见陷阱与解决方案 ===")
common_pitfalls = {
"内存陷阱": {
"问题": "数据类型选择不当导致内存浪费",
"示例": "使用object类型存储重复的字符串",
"解决方案": "转换为category类型,节省50-90%内存",
"代码": "df['column'] = df['column'].astype('category')"
},
"性能陷阱": {
"问题": "使用apply()处理可向量化的操作",
"示例": "df.apply(lambda x: x['a'] + x['b'], axis=1)",
"解决方案": "使用向量化操作",
"代码": "df['a'] + df['b']"
},
"索引陷阱": {
"问题": "重复设置索引导致性能下降",
"示例": "在循环中多次调用set_index()",
"解决方案": "预先设置索引或使用其他查询方法",
"代码": "df.set_index('key', inplace=True) # 只设置一次"
},
"复制陷阱": {
"问题": "不必要的数据复制消耗内存",
"示例": "df_new = df.copy(); df_new['col'] = value",
"解决方案": "使用inplace操作或视图",
"代码": "df['col'] = value # 直接修改"
},
"聚合陷阱": {
"问题": "多次分组聚合重复计算",
"示例": "多次调用groupby()计算不同统计量",
"解决方案": "一次性计算所有需要的聚合",
"代码": "df.groupby('key').agg({'col1': 'mean', 'col2': 'sum'})"
}
}
print("常见陷阱与解决方案:")
for pitfall_type, details in common_pitfalls.items():
print(f"\n{pitfall_type}:")
print(f" 问题: {details['问题']}")
print(f" 示例: {details['示例']}")
print(f" 解决方案: {details['解决方案']}")
print(f" 推荐代码: {details['代码']}")
10.7.4 下一步学习建议
print("\n=== 下一步学习建议 ===")
next_steps = {
"深入学习方向": [
"学习Dask进行大规模分布式计算",
"掌握Apache Spark与Pandas的集成",
"了解GPU加速计算(cuDF, Rapids)",
"学习时间序列数据库(InfluxDB, TimescaleDB)"
],
"相关技术栈": [
"NumPy高级功能和优化技巧",
"Scikit-learn机器学习管道",
"Matplotlib/Seaborn高级可视化",
"Jupyter Notebook性能优化"
],
"生产环境技能": [
"Docker容器化Pandas应用",
"Kubernetes部署和扩展",
"监控和告警系统集成",
"数据质量和治理实践"
],
"实践项目建议": [
"构建实时数据处理管道",
"开发数据质量监控系统",
"实现自动化数据分析报告",
"创建高性能数据API服务"
]
}
print("下一步学习建议:")
for category, suggestions in next_steps.items():
print(f"\n{category}:")
for suggestion in suggestions:
print(f" • {suggestion}")
10.7.5 练习题
print("\n=== 练习题 ===")
exercises = [
{
"题目": "内存优化挑战",
"描述": "给定一个1GB的CSV文件,包含用户行为数据,要求在内存限制为512MB的环境中完成数据分析",
"要求": [
"实现分块读取和处理",
"优化数据类型减少内存使用",
"计算用户活跃度统计",
"生成每日汇总报告"
],
"提示": "使用chunksize、category类型、生成器模式"
},
{
"题目": "性能优化竞赛",
"描述": "优化一个现有的数据处理脚本,要求将处理时间减少至少50%",
"要求": [
"识别性能瓶颈",
"应用向量化操作",
"优化查询和聚合",
"实现并行处理"
],
"提示": "使用cProfile分析、向量化替代循环、多进程处理"
},
{
"题目": "实时数据流处理",
"描述": "设计一个实时数据流处理系统,处理每秒1000条记录的用户事件",
"要求": [
"实现流式数据接收",
"设计滑动窗口聚合",
"实现异常检测和告警",
"提供实时监控面板"
],
"提示": "使用生成器、滑动窗口、异步处理、实时聚合"
},
{
"题目": "分布式计算项目",
"描述": "使用Dask处理多个大型数据文件,实现分布式机器学习管道",
"要求": [
"设置Dask集群",
"实现数据预处理管道",
"分布式特征工程",
"模型训练和评估"
],
"提示": "Dask DataFrame、分布式计算、机器学习集成"
},
{
"题目": "生产环境部署",
"描述": "将Pandas数据处理应用部署到生产环境,确保高可用性和可扩展性",
"要求": [
"实现错误处理和重试机制",
"添加日志记录和监控",
"设计健康检查和告警",
"实现自动扩缩容"
],
"提示": "容错设计、日志系统、监控告警、容器化部署"
}
]
print("本章练习题:")
for i, exercise in enumerate(exercises, 1):
print(f"\n练习 {i}: {exercise['题目']}")
print(f"描述: {exercise['描述']}")
print("要求:")
for req in exercise['要求']:
print(f" • {req}")
print(f"提示: {exercise['提示']}")
print(f"\n" + "="*50)
print("🎉 恭喜!您已完成Pandas高级应用与性能优化的学习!")
print("通过本章的学习,您掌握了:")
print("• 内存优化和大数据处理技术")
print("• 性能分析和瓶颈识别方法")
print("• 并行处理和分布式计算")
print("• 生产环境最佳实践")
print("现在您已经具备了处理真实世界大规模数据项目的能力!")
print("="*50)