10.1 章节概述

本章将深入探讨Pandas的高级应用技巧和性能优化策略。我们将学习如何处理大规模数据集、优化内存使用、实现并行处理,以及掌握一些高级的数据处理技术。这些技能对于处理真实世界的大数据项目至关重要。

graph TD
    A[高级应用与性能优化] --> B[内存优化技术]
    A --> C[性能分析与监控]
    A --> D[并行处理与分布式计算]
    A --> E[高级数据处理技术]
    A --> F[大数据处理策略]
    A --> G[生产环境最佳实践]
    
    B --> B1[数据类型优化]
    B --> B2[内存映射]
    B --> B3[分块处理]
    
    C --> C1[性能分析工具]
    C --> C2[瓶颈识别]
    C --> C3[基准测试]
    
    D --> D1[多进程处理]
    D --> D2[Dask集成]
    D --> D3[并行计算模式]
    
    E --> E1[自定义函数优化]
    E --> E2[向量化操作]
    E --> E3[高级索引技术]
    
    F --> F1[流式处理]
    F --> F2[增量处理]
    F --> F3[数据管道设计]
    
    G --> G1[错误处理]
    G --> G2[日志记录]
    G --> G3[监控告警]

10.1.1 学习目标

通过本章学习,你将能够:

  1. 掌握内存优化技术

    • 理解Pandas内存使用机制
    • 学会选择合适的数据类型
    • 掌握分块处理大数据的方法
  2. 实现性能优化

    • 使用性能分析工具识别瓶颈
    • 应用向量化操作提升性能
    • 优化数据处理流程
  3. 应用并行处理技术

    • 实现多进程数据处理
    • 集成Dask进行分布式计算
    • 设计可扩展的数据处理架构
  4. 掌握高级数据处理技术

    • 实现复杂的数据转换
    • 构建高效的数据管道
    • 处理流式数据

10.1.2 准备工作

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

# 设置显示选项
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', 50)

# 性能分析工具
import time
import psutil
import memory_profiler
from functools import wraps

# 并行处理工具
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import dask.dataframe as dd

# 创建示例数据
np.random.seed(42)

def create_large_dataset(n_rows=1000000):
    """创建大型示例数据集"""
    data = {
        'id': range(n_rows),
        'timestamp': pd.date_range('2020-01-01', periods=n_rows, freq='1min'),
        'category': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
        'value1': np.random.randn(n_rows) * 100,
        'value2': np.random.randn(n_rows) * 50,
        'value3': np.random.randint(1, 1000, n_rows),
        'text_data': [f'text_{i%10000}' for i in range(n_rows)],
        'flag': np.random.choice([True, False], n_rows)
    }
    return pd.DataFrame(data)

# 创建示例数据
print("创建示例数据集...")
large_df = create_large_dataset(100000)  # 较小的数据集用于演示
print(f"数据集形状: {large_df.shape}")
print(f"内存使用: {large_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

10.2 内存优化技术

10.2.1 数据类型优化

print("=== 数据类型优化 ===")

# 1. 分析当前内存使用
print("1. 当前内存使用分析:")

def analyze_memory_usage(df, name="DataFrame"):
    """分析DataFrame的内存使用情况"""
    memory_usage = df.memory_usage(deep=True)
    total_memory = memory_usage.sum()
    
    print(f"\n{name} 内存使用分析:")
    print(f"总内存使用: {total_memory / 1024**2:.2f} MB")
    print("\n各列内存使用:")
    for col, usage in memory_usage.items():
        if col != 'Index':
            dtype = df[col].dtype
            print(f"  {col:15} ({dtype}): {usage / 1024**2:.2f} MB")
    
    return total_memory

original_memory = analyze_memory_usage(large_df, "原始数据")

# 2. 数据类型优化
print("\n2. 数据类型优化:")

def optimize_dtypes(df):
    """优化DataFrame的数据类型"""
    optimized_df = df.copy()
    
    for col in optimized_df.columns:
        col_type = optimized_df[col].dtype
        
        if col_type != 'object':
            # 数值类型优化
            if str(col_type).startswith('int'):
                # 整数类型优化
                min_val = optimized_df[col].min()
                max_val = optimized_df[col].max()
                
                if min_val >= 0:
                    # 无符号整数(使用 np.iinfo 的上下界,避免漏掉边界值)
                    if max_val <= np.iinfo(np.uint8).max:
                        optimized_df[col] = optimized_df[col].astype(np.uint8)
                    elif max_val <= np.iinfo(np.uint16).max:
                        optimized_df[col] = optimized_df[col].astype(np.uint16)
                    elif max_val <= np.iinfo(np.uint32).max:
                        optimized_df[col] = optimized_df[col].astype(np.uint32)
                else:
                    # 有符号整数
                    if min_val >= np.iinfo(np.int8).min and max_val <= np.iinfo(np.int8).max:
                        optimized_df[col] = optimized_df[col].astype(np.int8)
                    elif min_val >= np.iinfo(np.int16).min and max_val <= np.iinfo(np.int16).max:
                        optimized_df[col] = optimized_df[col].astype(np.int16)
                    elif min_val >= np.iinfo(np.int32).min and max_val <= np.iinfo(np.int32).max:
                        optimized_df[col] = optimized_df[col].astype(np.int32)
            
            elif str(col_type).startswith('float'):
                # 浮点数类型优化
                optimized_df[col] = pd.to_numeric(optimized_df[col], downcast='float')
        
        else:
            # 对象类型优化
            if optimized_df[col].nunique() / len(optimized_df) < 0.5:
                # 如果唯一值比例小于50%,转换为category
                optimized_df[col] = optimized_df[col].astype('category')
    
    return optimized_df

# 应用优化
optimized_df = optimize_dtypes(large_df)
optimized_memory = analyze_memory_usage(optimized_df, "优化后数据")

# 计算节省的内存
memory_saved = original_memory - optimized_memory
memory_saved_percent = (memory_saved / original_memory) * 100

print(f"\n内存优化结果:")
print(f"原始内存: {original_memory / 1024**2:.2f} MB")
print(f"优化后内存: {optimized_memory / 1024**2:.2f} MB")
print(f"节省内存: {memory_saved / 1024**2:.2f} MB ({memory_saved_percent:.1f}%)")

# 3. 类别数据优化
print("\n3. 类别数据优化:")

# 比较不同类别数据存储方式
text_series = pd.Series(['A', 'B', 'C'] * 10000)
category_series = text_series.astype('category')

print(f"字符串存储: {text_series.memory_usage(deep=True) / 1024:.2f} KB")
print(f"类别存储: {category_series.memory_usage(deep=True) / 1024:.2f} KB")
print(f"节省比例: {(1 - category_series.memory_usage(deep=True) / text_series.memory_usage(deep=True)) * 100:.1f}%")

# 4. 稀疏数据优化
print("\n4. 稀疏数据优化:")

# 创建稀疏数据
sparse_data = np.random.choice([0, 1, 2], size=50000, p=[0.95, 0.03, 0.02])
dense_series = pd.Series(sparse_data)
sparse_series = pd.Series(sparse_data).astype(pd.SparseDtype(int, 0))

print(f"密集存储: {dense_series.memory_usage(deep=True) / 1024:.2f} KB")
print(f"稀疏存储: {sparse_series.memory_usage(deep=True) / 1024:.2f} KB")
print(f"稀疏度: {(sparse_data == 0).mean() * 100:.1f}%")
print(f"节省比例: {(1 - sparse_series.memory_usage(deep=True) / dense_series.memory_usage(deep=True)) * 100:.1f}%")

10.2.2 分块处理技术

print("\n=== 分块处理技术 ===")

# 1. 基础分块处理
print("1. 基础分块处理:")

def process_in_chunks(df, chunk_size=10000, func=None):
    """分块处理大型DataFrame"""
    if func is None:
        func = lambda x: x.describe()
    
    results = []
    n_chunks = len(df) // chunk_size + (1 if len(df) % chunk_size else 0)
    
    print(f"处理 {len(df)} 行数据,分为 {n_chunks} 个块")
    
    for i in range(0, len(df), chunk_size):
        chunk = df.iloc[i:i+chunk_size]
        result = func(chunk)
        results.append(result)
        
        if i // chunk_size % 10 == 0:  # 每10个块显示一次进度
            print(f"已处理: {min(i + chunk_size, len(df))}/{len(df)} 行")
    
    return results

# 示例:分块计算统计信息
def calculate_stats(chunk):
    """计算块的统计信息"""
    return {
        'count': len(chunk),
        'mean_value1': chunk['value1'].mean(),
        'std_value1': chunk['value1'].std(),
        'unique_categories': chunk['category'].nunique()
    }

chunk_results = process_in_chunks(large_df, chunk_size=20000, func=calculate_stats)

# 汇总结果
total_count = sum(r['count'] for r in chunk_results)
weighted_mean = sum(r['mean_value1'] * r['count'] for r in chunk_results) / total_count

print(f"\n分块处理结果汇总:")
print(f"总行数: {total_count}")
print(f"加权平均值: {weighted_mean:.4f}")
print(f"直接计算平均值: {large_df['value1'].mean():.4f}")

# 2. 分块读取CSV与内存映射
print("\n2. 分块读取CSV与内存映射:")

# 保存数据到文件
large_df.to_csv('temp_large_data.csv', index=False)

def process_csv_chunks(filename, chunk_size=10000):
    """使用分块读取处理大型CSV文件"""
    chunk_results = []
    
    # 使用chunksize参数分块读取
    for chunk_num, chunk in enumerate(pd.read_csv(filename, chunksize=chunk_size)):
        # 处理每个块
        result = {
            'chunk_num': chunk_num,
            'rows': len(chunk),
            'value1_sum': chunk['value1'].sum(),
            'value2_sum': chunk['value2'].sum()
        }
        chunk_results.append(result)
        
        if chunk_num % 5 == 0:
            print(f"处理块 {chunk_num}, 行数: {len(chunk)}")
    
    return chunk_results

csv_results = process_csv_chunks('temp_large_data.csv', chunk_size=15000)

# 汇总CSV处理结果
total_value1_sum = sum(r['value1_sum'] for r in csv_results)
total_value2_sum = sum(r['value2_sum'] for r in csv_results)

print(f"\nCSV分块处理结果:")
print(f"处理了 {len(csv_results)} 个块")
print(f"value1总和: {total_value1_sum:.2f}")
print(f"value2总和: {total_value2_sum:.2f}")

# 3. 生成器模式处理
print("\n3. 生成器模式处理:")

def data_generator(df, batch_size=5000):
    """数据生成器,逐批返回数据"""
    for i in range(0, len(df), batch_size):
        yield df.iloc[i:i+batch_size]

def streaming_aggregation(data_gen):
    """流式聚合处理"""
    running_sum = 0
    running_count = 0
    running_sum_squares = 0
    
    for batch in data_gen:
        batch_sum = batch['value1'].sum()
        batch_count = len(batch)
        batch_sum_squares = (batch['value1'] ** 2).sum()
        
        running_sum += batch_sum
        running_count += batch_count
        running_sum_squares += batch_sum_squares
        
        # 计算当前的统计信息
        current_mean = running_sum / running_count
        current_var = (running_sum_squares / running_count) - (current_mean ** 2)
        current_std = np.sqrt(current_var)
        
        yield {
            'processed_rows': running_count,
            'current_mean': current_mean,
            'current_std': current_std
        }

# 使用生成器处理数据
gen = data_generator(large_df, batch_size=10000)
streaming_results = list(streaming_aggregation(gen))

print("流式处理结果(最后几个批次):")
for result in streaming_results[-3:]:
    print(f"已处理 {result['processed_rows']} 行, "
          f"均值: {result['current_mean']:.4f}, "
          f"标准差: {result['current_std']:.4f}")

# 4. 内存监控
print("\n4. 内存监控:")

def memory_monitor(func):
    """内存监控装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 获取当前进程
        process = psutil.Process()
        
        # 执行前的内存使用
        mem_before = process.memory_info().rss / 1024**2
        
        # 执行函数
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        
        # 执行后的内存使用
        mem_after = process.memory_info().rss / 1024**2
        
        print(f"\n函数 {func.__name__} 执行监控:")
        print(f"执行时间: {end_time - start_time:.2f} 秒")
        print(f"执行前内存: {mem_before:.2f} MB")
        print(f"执行后内存: {mem_after:.2f} MB")
        print(f"内存变化: {mem_after - mem_before:.2f} MB")
        
        return result
    return wrapper

@memory_monitor
def memory_intensive_operation(df):
    """内存密集型操作示例"""
    # 创建多个副本
    df_copy1 = df.copy()
    df_copy2 = df.copy()
    df_copy3 = df.copy()
    
    # 进行一些计算
    result = df_copy1['value1'] + df_copy2['value2'] + df_copy3['value3']
    
    return result.sum()

# 测试内存监控
result = memory_intensive_operation(large_df)
print(f"计算结果: {result:.2f}")

# 清理临时文件
import os
if os.path.exists('temp_large_data.csv'):
    os.remove('temp_large_data.csv')

10.2.3 高效数据结构选择

print("\n=== 高效数据结构选择 ===")

# 1. 索引优化
print("1. 索引优化:")

def compare_index_performance():
    """比较不同索引类型的性能"""
    # 创建测试数据
    n = 100000
    df_default = pd.DataFrame({
        'id': range(n),
        'value': np.random.randn(n)
    })
    
    # 设置不同类型的索引
    df_int_index = df_default.set_index('id')
    df_range_index = df_default.copy()
    # 注意:DataFrame 构造时的默认索引本身就是 RangeIndex,此处显式设置仅作对照
    df_range_index.index = pd.RangeIndex(n)
    
    # 测试查询性能
    test_ids = np.random.choice(range(n), 1000)
    
    # 默认索引查询
    start_time = time.time()
    for idx in test_ids:
        _ = df_default.iloc[idx]
    default_time = time.time() - start_time
    
    # 整数索引查询
    start_time = time.time()
    for idx in test_ids:
        _ = df_int_index.loc[idx]
    int_index_time = time.time() - start_time
    
    # RangeIndex查询
    start_time = time.time()
    for idx in test_ids:
        _ = df_range_index.iloc[idx]
    range_index_time = time.time() - start_time
    
    print(f"默认索引查询时间: {default_time:.4f} 秒")
    print(f"整数索引查询时间: {int_index_time:.4f} 秒")
    print(f"RangeIndex查询时间: {range_index_time:.4f} 秒")
    
    # 内存使用比较
    print(f"\n内存使用比较:")
    print(f"默认索引: {df_default.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
    print(f"整数索引: {df_int_index.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
    print(f"RangeIndex: {df_range_index.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

compare_index_performance()

# 2. 数据结构选择策略
print("\n2. 数据结构选择策略:")

def choose_optimal_structure(data_characteristics):
    """根据数据特征选择最优数据结构"""
    recommendations = []
    
    # 分析数据特征
    for col, char in data_characteristics.items():
        if char['type'] == 'categorical':
            if char['cardinality'] / char['total_rows'] < 0.5:
                recommendations.append(f"{col}: 使用 category 类型")
            else:
                recommendations.append(f"{col}: 保持 object 类型")
        
        elif char['type'] == 'numeric':
            if char['has_nulls']:
                recommendations.append(f"{col}: 考虑使用 nullable integer 类型")
            else:
                if char['is_integer']:
                    recommendations.append(f"{col}: 使用合适的整数类型")
                else:
                    recommendations.append(f"{col}: 使用 float32 如果精度允许")
        
        elif char['type'] == 'datetime':
            recommendations.append(f"{col}: 使用 datetime64[ns] 并考虑时区")
        
        elif char['type'] == 'boolean':
            if char['has_nulls']:
                recommendations.append(f"{col}: 使用 boolean 类型(支持NA)")
            else:
                recommendations.append(f"{col}: 使用 bool 类型")
    
    return recommendations

# 分析示例数据特征
data_chars = {
    'id': {'type': 'numeric', 'is_integer': True, 'has_nulls': False, 'total_rows': len(large_df)},
    'category': {'type': 'categorical', 'cardinality': large_df['category'].nunique(), 'total_rows': len(large_df)},
    'timestamp': {'type': 'datetime', 'has_nulls': False, 'total_rows': len(large_df)},
    'flag': {'type': 'boolean', 'has_nulls': False, 'total_rows': len(large_df)}
}

recommendations = choose_optimal_structure(data_chars)
print("数据结构优化建议:")
for rec in recommendations:
    print(f"  - {rec}")

# 3. 向量化操作优化
print("\n3. 向量化操作优化:")

def compare_vectorization():
    """比较向量化操作与循环操作的性能"""
    n = 50000
    data = pd.DataFrame({
        'a': np.random.randn(n),
        'b': np.random.randn(n),
        'c': np.random.randn(n)
    })
    
    # 方法1:循环操作(慢)
    start_time = time.time()
    result1 = []
    for i in range(len(data)):
        result1.append(data.iloc[i]['a'] * data.iloc[i]['b'] + data.iloc[i]['c'])
    loop_time = time.time() - start_time
    
    # 方法2:向量化操作(快)
    start_time = time.time()
    result2 = data['a'] * data['b'] + data['c']
    vectorized_time = time.time() - start_time
    
    # 方法3:NumPy操作(最快)
    start_time = time.time()
    result3 = data['a'].values * data['b'].values + data['c'].values
    numpy_time = time.time() - start_time
    
    print(f"循环操作时间: {loop_time:.4f} 秒")
    print(f"向量化操作时间: {vectorized_time:.4f} 秒")
    print(f"NumPy操作时间: {numpy_time:.4f} 秒")
    print(f"向量化加速比: {loop_time / vectorized_time:.1f}x")
    print(f"NumPy加速比: {loop_time / numpy_time:.1f}x")

compare_vectorization()

# 4. 缓存策略
print("\n4. 缓存策略:")

from functools import lru_cache

class DataFrameCache:
    """DataFrame缓存类"""
    
    def __init__(self, max_size=100):
        self.cache = {}
        self.max_size = max_size
        self.access_count = {}
    
    def get(self, key, compute_func, *args, **kwargs):
        """获取缓存数据或计算新数据"""
        if key in self.cache:
            self.access_count[key] = self.access_count.get(key, 0) + 1
            print(f"缓存命中: {key}")
            return self.cache[key]
        
        # 计算新数据
        print(f"计算新数据: {key}")
        result = compute_func(*args, **kwargs)
        
        # 添加到缓存
        if len(self.cache) >= self.max_size:
            # 移除最少使用的项
            least_used = min(self.access_count.items(), key=lambda x: x[1])[0]
            del self.cache[least_used]
            del self.access_count[least_used]
        
        self.cache[key] = result
        self.access_count[key] = 1
        
        return result
    
    def clear(self):
        """清空缓存"""
        self.cache.clear()
        self.access_count.clear()

# 使用缓存示例
cache = DataFrameCache(max_size=5)

def expensive_computation(df, operation):
    """模拟昂贵的计算操作"""
    time.sleep(0.1)  # 模拟计算时间
    if operation == 'mean':
        return df.mean()
    elif operation == 'std':
        return df.std()
    elif operation == 'corr':
        return df.corr()

# 测试缓存效果
start_time = time.time()
result1 = cache.get('mean_calc', expensive_computation, large_df[['value1', 'value2']], 'mean')
first_call_time = time.time() - start_time

start_time = time.time()
result2 = cache.get('mean_calc', expensive_computation, large_df[['value1', 'value2']], 'mean')
second_call_time = time.time() - start_time

print(f"首次计算时间: {first_call_time:.4f} 秒")
print(f"缓存命中时间: {second_call_time:.4f} 秒")
print(f"缓存加速比: {first_call_time / second_call_time:.1f}x")

10.3 性能分析与监控

10.3.1 性能分析工具

print("\n=== 性能分析工具 ===")

# 1. 时间分析工具
print("1. 时间分析工具:")

import cProfile
import pstats
from io import StringIO

def profile_function(func, *args, **kwargs):
    """分析函数性能"""
    pr = cProfile.Profile()
    pr.enable()
    
    result = func(*args, **kwargs)
    
    pr.disable()
    
    # 获取统计信息
    s = StringIO()
    ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
    ps.print_stats(10)  # 显示前10个最耗时的函数
    
    print(s.getvalue())
    return result

def complex_data_operation(df):
    """复杂数据操作示例"""
    # 多步骤数据处理
    result = df.copy()
    result['new_col1'] = result['value1'] * result['value2']
    result['new_col2'] = result.groupby('category')['value1'].transform('mean')
    result['new_col3'] = result['value1'].rolling(window=100).mean()
    result = result.dropna()
    return result.groupby('category').agg({
        'new_col1': ['mean', 'std'],
        'new_col2': 'first',
        'new_col3': 'mean'
    })

print("性能分析结果:")
result = profile_function(complex_data_operation, large_df)

# 2. 内存分析工具
print("\n2. 内存分析工具:")

def memory_profile_decorator(func):
    """内存分析装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 使用memory_profiler采样函数执行期间的内存变化
        # retval=True 使函数只执行一次,同时返回内存采样序列和函数返回值
        mem_usage, result = memory_profiler.memory_usage(
            (func, args, kwargs), interval=0.1, retval=True
        )
        
        print(f"\n函数 {func.__name__} 内存分析:")
        print(f"最大内存使用: {max(mem_usage):.2f} MB")
        print(f"最小内存使用: {min(mem_usage):.2f} MB")
        print(f"内存波动: {max(mem_usage) - min(mem_usage):.2f} MB")
        
        return result
    return wrapper

@memory_profile_decorator
def memory_intensive_function(df):
    """内存密集型函数"""
    # 创建多个大型中间结果
    temp1 = df.copy()
    temp2 = df.copy()
    temp3 = pd.concat([temp1, temp2], ignore_index=True)
    
    # 进行计算
    result = temp3.groupby('category').agg({
        'value1': ['mean', 'std', 'min', 'max'],
        'value2': ['sum', 'count']
    })
    
    return result

memory_result = memory_intensive_function(large_df)

# 3. 基准测试框架
print("\n3. 基准测试框架:")

class BenchmarkSuite:
    """基准测试套件"""
    
    def __init__(self):
        self.results = {}
    
    def benchmark(self, name, func, *args, **kwargs):
        """执行基准测试"""
        times = []
        memory_usage = []
        
        # 多次运行取平均值
        for _ in range(5):
            process = psutil.Process()
            mem_before = process.memory_info().rss / 1024**2
            
            start_time = time.time()
            result = func(*args, **kwargs)
            end_time = time.time()
            
            mem_after = process.memory_info().rss / 1024**2
            
            times.append(end_time - start_time)
            memory_usage.append(mem_after - mem_before)
        
        self.results[name] = {
            'avg_time': np.mean(times),
            'std_time': np.std(times),
            'avg_memory': np.mean(memory_usage),
            'std_memory': np.std(memory_usage)
        }
        
        return result
    
    def compare_methods(self, methods_dict, *args, **kwargs):
        """比较多种方法的性能"""
        for name, func in methods_dict.items():
            self.benchmark(name, func, *args, **kwargs)
    
    def report(self):
        """生成性能报告"""
        print("\n=== 基准测试报告 ===")
        print(f"{'方法名':<20} {'平均时间(s)':<12} {'时间标准差':<12} {'内存变化(MB)':<15}")
        print("-" * 65)
        
        for name, stats in self.results.items():
            print(f"{name:<20} {stats['avg_time']:<12.4f} {stats['std_time']:<12.4f} {stats['avg_memory']:<15.2f}")

# 定义不同的数据处理方法
def method_pandas_groupby(df):
    return df.groupby('category')['value1'].mean()

def method_pandas_pivot(df):
    return df.pivot_table(values='value1', index='category', aggfunc='mean')

def method_manual_groupby(df):
    result = {}
    for cat in df['category'].unique():
        mask = df['category'] == cat
        result[cat] = df.loc[mask, 'value1'].mean()
    return pd.Series(result)

# 运行基准测试
benchmark = BenchmarkSuite()
methods = {
    'Pandas GroupBy': method_pandas_groupby,
    'Pandas Pivot': method_pandas_pivot,
    'Manual GroupBy': method_manual_groupby
}

benchmark.compare_methods(methods, large_df)
benchmark.report()

# 4. 性能监控仪表板
print("\n4. 性能监控仪表板:")

class PerformanceMonitor:
    """性能监控器"""
    
    def __init__(self):
        self.metrics = []
        self.start_time = None
    
    def start_monitoring(self):
        """开始监控"""
        self.start_time = time.time()
        self.metrics = []
    
    def record_metric(self, operation_name, duration, memory_delta):
        """记录性能指标"""
        self.metrics.append({
            'timestamp': time.time() - self.start_time,
            'operation': operation_name,
            'duration': duration,
            'memory_delta': memory_delta
        })
    
    def context_manager(self, operation_name):
        """上下文管理器用于自动监控"""
        return PerformanceContext(self, operation_name)
    
    def generate_report(self):
        """生成性能报告"""
        if not self.metrics:
            print("没有性能数据")
            return
        
        df_metrics = pd.DataFrame(self.metrics)
        
        print("\n=== 性能监控报告 ===")
        print(f"总监控时间: {df_metrics['timestamp'].max():.2f} 秒")
        print(f"总操作数: {len(df_metrics)}")
        
        # 按操作类型汇总
        summary = df_metrics.groupby('operation').agg({
            'duration': ['count', 'mean', 'sum'],
            'memory_delta': ['mean', 'sum']
        }).round(4)
        
        print("\n操作汇总:")
        print(summary)
        
        # 可视化性能趋势
        fig, axes = plt.subplots(2, 1, figsize=(12, 8))
        
        # 时间趋势
        axes[0].plot(df_metrics['timestamp'], df_metrics['duration'], 'o-')
        axes[0].set_title('操作耗时趋势')
        axes[0].set_xlabel('时间 (秒)')
        axes[0].set_ylabel('耗时 (秒)')
        
        # 内存使用趋势
        axes[1].plot(df_metrics['timestamp'], df_metrics['memory_delta'], 'o-', color='red')
        axes[1].set_title('内存使用变化趋势')
        axes[1].set_xlabel('时间 (秒)')
        axes[1].set_ylabel('内存变化 (MB)')
        
        plt.tight_layout()
        plt.show()

class PerformanceContext:
    """性能监控上下文管理器"""
    
    def __init__(self, monitor, operation_name):
        self.monitor = monitor
        self.operation_name = operation_name
        self.start_time = None
        self.start_memory = None
    
    def __enter__(self):
        process = psutil.Process()
        self.start_time = time.time()
        self.start_memory = process.memory_info().rss / 1024**2
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        process = psutil.Process()
        end_time = time.time()
        end_memory = process.memory_info().rss / 1024**2
        
        duration = end_time - self.start_time
        memory_delta = end_memory - self.start_memory
        
        self.monitor.record_metric(self.operation_name, duration, memory_delta)

# 使用性能监控器
monitor = PerformanceMonitor()
monitor.start_monitoring()

# 模拟一系列操作
with monitor.context_manager('数据加载'):
    time.sleep(0.1)  # 模拟加载时间

with monitor.context_manager('数据清洗'):
    cleaned_data = large_df.dropna()

with monitor.context_manager('特征工程'):
    cleaned_data['new_feature'] = cleaned_data['value1'] * cleaned_data['value2']

with monitor.context_manager('聚合计算'):
    result = cleaned_data.groupby('category').agg({
        'value1': 'mean',
        'value2': 'std',
        'new_feature': 'sum'
    })

monitor.generate_report()

10.3.2 瓶颈识别与优化

print("\n=== 瓶颈识别与优化 ===")

# 1. 常见性能瓶颈识别
print("1. 常见性能瓶颈识别:")

def identify_bottlenecks(df):
    """识别DataFrame操作的潜在瓶颈"""
    bottlenecks = []
    
    # 检查数据大小
    memory_usage = df.memory_usage(deep=True).sum() / 1024**2
    if memory_usage > 1000:  # 大于1GB
        bottlenecks.append(f"大数据集警告: {memory_usage:.1f} MB,考虑分块处理")
    
    # 检查数据类型
    for col in df.columns:
        if df[col].dtype == 'object':
            unique_ratio = df[col].nunique() / len(df)
            if unique_ratio < 0.5:
                bottlenecks.append(f"列 '{col}' 可优化为category类型")
    
    # 检查索引
    if not isinstance(df.index, pd.RangeIndex):
        bottlenecks.append("考虑使用RangeIndex以提高性能")
    
    # 检查重复数据
    duplicate_ratio = df.duplicated().sum() / len(df)
    if duplicate_ratio > 0.1:
        bottlenecks.append(f"发现 {duplicate_ratio:.1%} 重复数据,考虑去重")
    
    # 检查缺失值
    missing_ratio = df.isnull().sum().sum() / (len(df) * len(df.columns))
    if missing_ratio > 0.2:
        bottlenecks.append(f"发现 {missing_ratio:.1%} 缺失值,影响计算效率")
    
    return bottlenecks

bottlenecks = identify_bottlenecks(large_df)
print("发现的性能瓶颈:")
for bottleneck in bottlenecks:
    print(f"  - {bottleneck}")

# 2. 操作优化策略
print("\n2. 操作优化策略:")

def optimize_operations():
    """演示各种操作的优化策略"""
    
    # 创建测试数据
    test_df = large_df.copy()
    
    print("a) 条件筛选优化:")
    
    # 低效方法:多次筛选
    start_time = time.time()
    result1 = test_df[test_df['value1'] > 0]
    result1 = result1[result1['value2'] < 50]
    result1 = result1[result1['category'].isin(['A', 'B'])]
    inefficient_time = time.time() - start_time
    
    # 高效方法:组合条件
    start_time = time.time()
    mask = (test_df['value1'] > 0) & (test_df['value2'] < 50) & (test_df['category'].isin(['A', 'B']))
    result2 = test_df[mask]
    efficient_time = time.time() - start_time
    
    print(f"多次筛选时间: {inefficient_time:.4f} 秒")
    print(f"组合条件时间: {efficient_time:.4f} 秒")
    print(f"优化倍数: {inefficient_time / efficient_time:.1f}x")
    
    print("\nb) 字符串操作优化:")
    
    # 创建字符串数据
    string_data = pd.Series(['text_' + str(i) for i in range(10000)])
    
    # 低效方法:逐个处理
    start_time = time.time()
    result1 = string_data.apply(lambda x: x.upper())
    apply_time = time.time() - start_time
    
    # 高效方法:向量化操作
    start_time = time.time()
    result2 = string_data.str.upper()
    vectorized_time = time.time() - start_time
    
    print(f"apply方法时间: {apply_time:.4f} 秒")
    print(f"向量化时间: {vectorized_time:.4f} 秒")
    print(f"优化倍数: {apply_time / vectorized_time:.1f}x")
    
    print("\nc) 聚合操作优化:")
    
    # 低效方法:多次聚合
    start_time = time.time()
    mean_val = test_df.groupby('category')['value1'].mean()
    std_val = test_df.groupby('category')['value1'].std()
    count_val = test_df.groupby('category')['value1'].count()
    multiple_agg_time = time.time() - start_time
    
    # 高效方法:一次聚合
    start_time = time.time()
    result = test_df.groupby('category')['value1'].agg(['mean', 'std', 'count'])
    single_agg_time = time.time() - start_time
    
    print(f"多次聚合时间: {multiple_agg_time:.4f} 秒")
    print(f"一次聚合时间: {single_agg_time:.4f} 秒")
    print(f"优化倍数: {multiple_agg_time / single_agg_time:.1f}x")

optimize_operations()
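# 补充:组合条件也可以用 DataFrame.query 表达,语义与上面的布尔掩码等价;
# 若安装了 numexpr,在大数据集上通常还能进一步受益(简要示意)
start_time = time.time()
query_result = large_df.query("value1 > 0 and value2 < 50 and category in ['A', 'B']")
query_time = time.time() - start_time
print(f"query方法时间: {query_time:.4f} 秒, 结果行数: {len(query_result)}")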

# 3. 查询优化
print("\n3. 查询优化:")

def optimize_queries():
    """查询优化示例"""
    
    # 创建带索引的数据
    indexed_df = large_df.set_index('id')
    
    print("a) 索引查询 vs 扫描查询:")
    
    # 随机选择查询ID
    query_ids = np.random.choice(large_df['id'], 100)
    
    # 扫描查询(慢)
    start_time = time.time()
    for qid in query_ids:
        result = large_df[large_df['id'] == qid]
    scan_time = time.time() - start_time
    
    # 索引查询(快)
    start_time = time.time()
    for qid in query_ids:
        result = indexed_df.loc[qid:qid]
    index_time = time.time() - start_time
    
    print(f"扫描查询时间: {scan_time:.4f} 秒")
    print(f"索引查询时间: {index_time:.4f} 秒")
    print(f"优化倍数: {scan_time / index_time:.1f}x")
    
    print("\nb) 范围查询优化:")
    
    # 排序数据以优化范围查询
    sorted_df = large_df.sort_values('value1')
    
    # 普通范围查询
    start_time = time.time()
    result1 = large_df[(large_df['value1'] >= -50) & (large_df['value1'] <= 50)]
    normal_range_time = time.time() - start_time
    
    # 在排序数据上的范围查询
    start_time = time.time()
    result2 = sorted_df[(sorted_df['value1'] >= -50) & (sorted_df['value1'] <= 50)]
    sorted_range_time = time.time() - start_time
    
    print(f"普通范围查询时间: {normal_range_time:.4f} 秒")
    print(f"排序数据查询时间: {sorted_range_time:.4f} 秒")
    
    print("\nc) 查询计划优化:")
    
    # 复杂查询的不同执行顺序
    
    # 方法1:先筛选再聚合
    start_time = time.time()
    filtered = large_df[large_df['value1'] > 0]
    result1 = filtered.groupby('category')['value2'].mean()
    filter_first_time = time.time() - start_time
    
    # 方法2:先聚合再筛选(如果可能)
    start_time = time.time()
    grouped = large_df.groupby('category').apply(
        lambda x: x[x['value1'] > 0]['value2'].mean()
    )
    group_first_time = time.time() - start_time
    
    print(f"先筛选后聚合时间: {filter_first_time:.4f} 秒")
    print(f"分组内筛选时间: {group_first_time:.4f} 秒")

optimize_queries()
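# 补充:上面的布尔筛选无论数据是否排序都要扫描全表;若数据已按查询列排序,
# 可以用 np.searchsorted 做二分定位。下面在函数外重新构造排序数据做简要示意
sorted_by_value1 = large_df.sort_values('value1').reset_index(drop=True)
values_sorted = sorted_by_value1['value1'].to_numpy()

start_time = time.time()
lo = np.searchsorted(values_sorted, -50, side='left')   # 范围下界位置
hi = np.searchsorted(values_sorted, 50, side='right')   # 范围上界位置
range_result = sorted_by_value1.iloc[lo:hi]
searchsorted_time = time.time() - start_time

print(f"searchsorted范围查询时间: {searchsorted_time:.4f} 秒, 命中 {len(range_result)} 行")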

# 4. 自动优化建议系统
print("\n4. 自动优化建议系统:")

class OptimizationAdvisor:
    """优化建议系统"""
    
    def __init__(self):
        self.suggestions = []
    
    def analyze_dataframe(self, df):
        """分析DataFrame并提供优化建议"""
        self.suggestions = []
        
        # 内存优化建议
        self._analyze_memory_usage(df)
        
        # 数据类型优化建议
        self._analyze_dtypes(df)
        
        # 索引优化建议
        self._analyze_index(df)
        
        # 数据质量建议
        self._analyze_data_quality(df)
        
        return self.suggestions
    
    def _analyze_memory_usage(self, df):
        """分析内存使用"""
        memory_mb = df.memory_usage(deep=True).sum() / 1024**2
        
        if memory_mb > 500:
            self.suggestions.append({
                'type': 'memory',
                'priority': 'high',
                'message': f'数据集较大 ({memory_mb:.1f} MB),建议使用分块处理',
                'code': 'pd.read_csv(file, chunksize=10000)'
            })
    
    def _analyze_dtypes(self, df):
        """分析数据类型"""
        for col in df.columns:
            if df[col].dtype == 'object':
                unique_ratio = df[col].nunique() / len(df)
                if unique_ratio < 0.5:
                    self.suggestions.append({
                        'type': 'dtype',
                        'priority': 'medium',
                        'message': f'列 {col} 适合转换为category类型',
                        'code': f"df['{col}'] = df['{col}'].astype('category')"
                    })
            
            elif str(df[col].dtype) == 'int64':
                min_val, max_val = df[col].min(), df[col].max()
                if min_val >= 0 and max_val < 255:
                    self.suggestions.append({
                        'type': 'dtype',
                        'priority': 'low',
                        'message': f'列 {col} 可以使用uint8类型',
                        'code': f"df['{col}'] = df['{col}'].astype('uint8')"
                    })
    
    def _analyze_index(self, df):
        """分析索引"""
        if not isinstance(df.index, pd.RangeIndex):
            self.suggestions.append({
                'type': 'index',
                'priority': 'medium',
                'message': '考虑重置为RangeIndex以提高性能',
                'code': 'df.reset_index(drop=True, inplace=True)'
            })
    
    def _analyze_data_quality(self, df):
        """分析数据质量"""
        # 检查重复值
        dup_ratio = df.duplicated().sum() / len(df)
        if dup_ratio > 0.05:
            self.suggestions.append({
                'type': 'quality',
                'priority': 'medium',
                'message': f'发现 {dup_ratio:.1%} 重复数据',
                'code': 'df.drop_duplicates(inplace=True)'
            })
        
        # 检查缺失值
        missing_cols = df.columns[df.isnull().any()].tolist()
        if missing_cols:
            self.suggestions.append({
                'type': 'quality',
                'priority': 'medium',
                'message': f'列 {missing_cols} 包含缺失值',
                'code': 'df.ffill() 或 df.dropna()'
            })
    
    def generate_report(self):
        """生成优化报告"""
        if not self.suggestions:
            print("没有发现优化建议")
            return
        
        print("\n=== 优化建议报告 ===")
        
        # 按优先级分组
        by_priority = {}
        for suggestion in self.suggestions:
            priority = suggestion['priority']
            if priority not in by_priority:
                by_priority[priority] = []
            by_priority[priority].append(suggestion)
        
        # 按优先级输出
        for priority in ['high', 'medium', 'low']:
            if priority in by_priority:
                print(f"\n{priority.upper()} 优先级建议:")
                for i, suggestion in enumerate(by_priority[priority], 1):
                    print(f"{i}. {suggestion['message']}")
                    print(f"   建议代码: {suggestion['code']}")

# 使用优化建议系统
advisor = OptimizationAdvisor()
suggestions = advisor.analyze_dataframe(large_df)
advisor.generate_report()

10.4 并行处理与分布式计算

10.4.1 多进程处理

print("\n=== 多进程处理 ===")

# 1. 基础多进程处理
print("1. 基础多进程处理:")

def process_chunk(chunk_data):
    """处理数据块的函数"""
    chunk, chunk_id = chunk_data
    
    # 模拟复杂计算
    result = {
        'chunk_id': chunk_id,
        'rows': len(chunk),
        'mean_value1': chunk['value1'].mean(),
        'std_value1': chunk['value1'].std(),
        'category_counts': chunk['category'].value_counts().to_dict()
    }
    
    return result

def parallel_processing_demo():
    """并行处理演示"""
    
    # 准备数据块
    chunk_size = 10000
    chunks = []
    for i in range(0, len(large_df), chunk_size):
        chunk = large_df.iloc[i:i+chunk_size]
        chunks.append((chunk, i // chunk_size))
    
    print(f"数据分为 {len(chunks)} 个块进行处理")
    
    # 串行处理
    start_time = time.time()
    serial_results = []
    for chunk_data in chunks:
        result = process_chunk(chunk_data)
        serial_results.append(result)
    serial_time = time.time() - start_time
    
    # 并行处理
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
        parallel_results = list(executor.map(process_chunk, chunks))
    parallel_time = time.time() - start_time
    
    print(f"串行处理时间: {serial_time:.2f} 秒")
    print(f"并行处理时间: {parallel_time:.2f} 秒")
    print(f"加速比: {serial_time / parallel_time:.1f}x")
    print(f"使用CPU核心数: {mp.cpu_count()}")
    
    return parallel_results

parallel_results = parallel_processing_demo()
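# 补充:在使用 spawn 启动方式的平台(Windows、新版 macOS 的默认行为)上,
# 多进程代码需要放在 if __name__ == '__main__': 保护块中,且工作函数应定义在模块顶层以便被pickle。
# 一个简要的脚本结构示意如下
def square_sum(chunk):
    """模块顶层定义、可被pickle的工作函数"""
    return (chunk['value1'] ** 2).sum()

if __name__ == '__main__':
    demo_chunks = [large_df.iloc[i:i + 20000] for i in range(0, len(large_df), 20000)]
    with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
        partial_sums = list(executor.map(square_sum, demo_chunks))
    print(f"value1平方和(多进程计算): {sum(partial_sums):.2f}")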

# 2. 高级并行模式
print("\n2. 高级并行模式:")

class ParallelDataProcessor:
    """并行数据处理器"""
    
    def __init__(self, n_workers=None):
        self.n_workers = n_workers or mp.cpu_count()
    
    def map_reduce(self, df, map_func, reduce_func, chunk_size=10000):
        """Map-Reduce模式处理"""
        
        # Map阶段:并行处理数据块
        chunks = self._split_dataframe(df, chunk_size)
        
        with ProcessPoolExecutor(max_workers=self.n_workers) as executor:
            map_results = list(executor.map(map_func, chunks))
        
        # Reduce阶段:合并结果
        final_result = reduce_func(map_results)
        
        return final_result
    
    def _split_dataframe(self, df, chunk_size):
        """分割DataFrame为块"""
        chunks = []
        for i in range(0, len(df), chunk_size):
            chunks.append(df.iloc[i:i+chunk_size])
        return chunks
    
    def parallel_apply(self, df, func, chunk_size=10000):
        """并行应用函数"""
        chunks = self._split_dataframe(df, chunk_size)
        
        with ProcessPoolExecutor(max_workers=self.n_workers) as executor:
            results = list(executor.map(func, chunks))
        
        # 合并结果
        return pd.concat(results, ignore_index=True)
    
    def parallel_groupby(self, df, groupby_cols, agg_dict, chunk_size=10000):
        """并行分组聚合"""
        
        def map_func(chunk):
            return chunk.groupby(groupby_cols).agg(agg_dict)
        
        def reduce_func(results):
            combined = pd.concat(results)
            # 注意:对 'mean'/'std' 这类聚合,二次聚合是对各块结果再聚合,得到的是近似值
            return combined.groupby(level=0).agg(agg_dict)
        
        return self.map_reduce(df, map_func, reduce_func, chunk_size)

# 使用并行处理器
processor = ParallelDataProcessor()

# 示例1:并行计算统计信息
def compute_stats(chunk):
    """计算统计信息"""
    return pd.DataFrame({
        'category': chunk['category'].unique(),
        'mean_value1': [chunk[chunk['category'] == cat]['value1'].mean() 
                       for cat in chunk['category'].unique()],
        'count': [chunk[chunk['category'] == cat].shape[0] 
                 for cat in chunk['category'].unique()]
    })

def combine_stats(results):
    """合并统计结果"""
    combined = pd.concat(results, ignore_index=True)
    return combined.groupby('category').agg({
        'mean_value1': lambda x: np.average(x, weights=combined.loc[x.index, 'count']),
        'count': 'sum'
    })

print("并行统计计算:")
start_time = time.time()
parallel_stats = processor.map_reduce(large_df, compute_stats, combine_stats)
parallel_time = time.time() - start_time

# 对比串行计算
start_time = time.time()
serial_stats = large_df.groupby('category')['value1'].agg(['mean', 'count'])
serial_time = time.time() - start_time

print(f"并行计算时间: {parallel_time:.4f} 秒")
print(f"串行计算时间: {serial_time:.4f} 秒")
print("并行计算结果:")
print(parallel_stats)

# 3. 内存共享优化
print("\n3. 内存共享优化:")

import multiprocessing.shared_memory as shared_memory

def create_shared_dataframe(df):
    """创建共享内存DataFrame"""
    # 将DataFrame转换为numpy数组
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    numeric_data = df[numeric_cols].values
    
    # 创建共享内存
    shm = shared_memory.SharedMemory(create=True, size=numeric_data.nbytes)
    
    # 将数据复制到共享内存
    shared_array = np.ndarray(numeric_data.shape, dtype=numeric_data.dtype, buffer=shm.buf)
    shared_array[:] = numeric_data[:]
    
    return shm, shared_array.shape, shared_array.dtype, numeric_cols.tolist()

def process_shared_data(shm_info):
    """处理共享内存数据"""
    shm_name, shape, dtype, columns = shm_info
    
    # 连接到共享内存
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    shared_array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    
    # 创建DataFrame
    df = pd.DataFrame(shared_array, columns=columns)
    
    # 进行计算
    result = df.mean().to_dict()
    
    # 清理:先删除对共享内存缓冲区的引用,再关闭,避免 BufferError
    del df, shared_array
    existing_shm.close()
    
    return result

# 注意:共享内存在某些环境中可能不可用,这里提供概念演示
print("共享内存处理概念演示(可能需要特定环境支持)")

# 4. 异步处理
print("\n4. 异步处理:")

import asyncio
import concurrent.futures

async def async_data_processing():
    """异步数据处理示例"""
    
    def cpu_bound_task(chunk):
        """CPU密集型任务"""
        return chunk.groupby('category')['value1'].agg(['mean', 'std', 'count'])
    
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    
    # 准备数据块
    chunks = [large_df.iloc[i:i+5000] for i in range(0, len(large_df), 5000)]
    
    # 使用线程池执行任务(注:受GIL限制,线程池对纯Python计算加速有限,但Pandas/NumPy的部分操作会释放GIL)
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        # 提交所有任务
        futures = [loop.run_in_executor(executor, cpu_bound_task, chunk) for chunk in chunks]
        
        # 等待所有任务完成
        results = await asyncio.gather(*futures)
    
    # 合并结果:按类别做计数加权的合并
    combined = pd.concat(results)
    final_result = combined.groupby(level=0).apply(
        lambda g: pd.Series({
            'mean': np.average(g['mean'], weights=g['count']),
            'std': g['std'].mean(),
            'count': g['count'].sum()
        })
    )
    
    return final_result

# 运行异步处理
print("异步处理演示:")
try:
    # 在某些环境中可能需要不同的事件循环处理
    async_result = asyncio.run(async_data_processing())
    print("异步处理完成")
    print(async_result.head())
except Exception as e:
    print(f"异步处理演示跳过: {e}")

10.4.2 Dask集成

print("\n=== Dask集成 ===")

# 1. Dask DataFrame基础
print("1. Dask DataFrame基础:")

try:
    import dask
    import dask.dataframe as dd
    from dask.distributed import Client
    
    # 创建Dask DataFrame
    dask_df = dd.from_pandas(large_df, npartitions=4)
    
    print(f"Pandas DataFrame形状: {large_df.shape}")
    print(f"Dask DataFrame分区数: {dask_df.npartitions}")
    print(f"每个分区大小: ~{len(large_df) // dask_df.npartitions} 行")
    
    # Dask操作是惰性的
    print("\n2. 惰性计算演示:")
    
    # 定义计算图
    result = dask_df.groupby('category')['value1'].mean()
    print("计算图已创建(未执行)")
    
    # 执行计算
    start_time = time.time()
    computed_result = result.compute()
    dask_time = time.time() - start_time
    
    # 对比Pandas直接计算
    start_time = time.time()
    pandas_result = large_df.groupby('category')['value1'].mean()
    pandas_time = time.time() - start_time
    
    print(f"Dask计算时间: {dask_time:.4f} 秒")
    print(f"Pandas计算时间: {pandas_time:.4f} 秒")
    print("Dask结果:")
    print(computed_result)
    
    # 3. 复杂操作链
    print("\n3. 复杂操作链:")
    
    # 构建复杂的计算图
    complex_result = (dask_df
                     .assign(value_ratio=dask_df['value1'] / dask_df['value2'])
                     .query('value_ratio > 0')
                     .groupby('category')
                     .agg({'value1': ['mean', 'std'], 
                           'value_ratio': 'mean'})
                     .compute())
    
    print("复杂操作结果:")
    print(complex_result)
    
    # 4. 分布式计算
    print("\n4. 分布式计算设置:")
    
    # 创建本地集群
    try:
        client = Client(processes=False, threads_per_worker=2, n_workers=2)
        print(f"Dask客户端: {client}")
        print(f"集群信息: {client.cluster}")
        
        # 使用分布式计算
        distributed_result = dask_df.value1.sum().compute()
        print(f"分布式计算结果: {distributed_result}")
        
        client.close()
        
    except Exception as e:
        print(f"分布式计算演示跳过: {e}")
    
    # 5. 内存外计算
    print("\n5. 内存外计算演示:")
    
    # 保存大数据集到多个文件
    large_df.to_csv('temp_data_part1.csv', index=False)
    large_df.to_csv('temp_data_part2.csv', index=False)
    
    # 从多个文件创建Dask DataFrame
    file_pattern = 'temp_data_part*.csv'
    dask_from_files = dd.read_csv(file_pattern)
    
    print(f"从文件创建的Dask DataFrame分区数: {dask_from_files.npartitions}")
    
    # 执行内存外计算
    out_of_core_result = dask_from_files.groupby('category').value1.mean().compute()
    print("内存外计算结果:")
    print(out_of_core_result)
    
    # 清理临时文件
    import os
    for file in ['temp_data_part1.csv', 'temp_data_part2.csv']:
        if os.path.exists(file):
            os.remove(file)

except ImportError:
    print("Dask未安装,跳过Dask演示")
    print("安装命令: pip install dask[complete]")

# 6. 自定义并行函数
print("\n6. 自定义并行函数:")

def custom_parallel_function(df, func, n_workers=None):
    """自定义并行函数执行器"""
    if n_workers is None:
        n_workers = mp.cpu_count()
    
    # 分割数据(向上取整,保证 chunk_size 至少为1,块数不超过 n_workers)
    chunk_size = max(1, -(-len(df) // n_workers))
    chunks = [df.iloc[i:i+chunk_size] for i in range(0, len(df), chunk_size)]
    
    # 并行执行
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        results = list(executor.map(func, chunks))
    
    return results

def sample_function(chunk):
    """示例处理函数"""
    return {
        'chunk_size': len(chunk),
        'mean_value1': chunk['value1'].mean(),
        'unique_categories': chunk['category'].nunique()
    }

# 测试自定义并行函数
parallel_custom_results = custom_parallel_function(large_df, sample_function, n_workers=4)

print("自定义并行函数结果:")
for i, result in enumerate(parallel_custom_results):
    print(f"块 {i}: {result}")

10.4.3 分布式计算架构

print("\n=== 分布式计算架构 ===")

# 1. 数据分片策略
print("1. 数据分片策略:")

class DataSharding:
    """数据分片管理器"""
    
    def __init__(self, df):
        self.df = df
        self.shards = {}
    
    def shard_by_hash(self, column, n_shards):
        """基于哈希的分片"""
        hash_values = pd.util.hash_pandas_object(self.df[column])
        shard_ids = hash_values % n_shards
        
        for shard_id in range(n_shards):
            mask = shard_ids == shard_id
            self.shards[f'hash_shard_{shard_id}'] = self.df[mask]
        
        return self.shards
    
    def shard_by_range(self, column, n_shards):
        """基于范围的分片"""
        sorted_df = self.df.sort_values(column)
        chunk_size = len(sorted_df) // n_shards
        
        for shard_id in range(n_shards):
            start_idx = shard_id * chunk_size
            end_idx = start_idx + chunk_size if shard_id < n_shards - 1 else len(sorted_df)
            self.shards[f'range_shard_{shard_id}'] = sorted_df.iloc[start_idx:end_idx]
        
        return self.shards
    
    def shard_by_category(self, column):
        """基于类别的分片"""
        for category in self.df[column].unique():
            mask = self.df[column] == category
            self.shards[f'category_shard_{category}'] = self.df[mask]
        
        return self.shards
    
    def get_shard_info(self):
        """获取分片信息"""
        info = {}
        for shard_name, shard_data in self.shards.items():
            info[shard_name] = {
                'rows': len(shard_data),
                'memory_mb': shard_data.memory_usage(deep=True).sum() / 1024**2,
                'columns': list(shard_data.columns)
            }
        return info

# 演示不同分片策略
sharding = DataSharding(large_df)

# 哈希分片
hash_shards = sharding.shard_by_hash('id', 4)
print("哈希分片信息:")
hash_info = sharding.get_shard_info()
for shard_name, info in hash_info.items():
    print(f"  {shard_name}: {info['rows']} 行, {info['memory_mb']:.2f} MB")

# 重置分片器
sharding = DataSharding(large_df)

# 类别分片
category_shards = sharding.shard_by_category('category')
print("\n类别分片信息:")
category_info = sharding.get_shard_info()
for shard_name, info in category_info.items():
    print(f"  {shard_name}: {info['rows']} 行, {info['memory_mb']:.2f} MB")

# 2. 负载均衡
print("\n2. 负载均衡:")

class LoadBalancer:
    """负载均衡器"""
    
    def __init__(self, workers):
        self.workers = workers
        self.worker_loads = {worker: 0 for worker in workers}
    
    def assign_task(self, task_weight=1):
        """分配任务到负载最小的工作节点"""
        min_load_worker = min(self.worker_loads.items(), key=lambda x: x[1])[0]
        self.worker_loads[min_load_worker] += task_weight
        return min_load_worker
    
    def complete_task(self, worker, task_weight=1):
        """标记任务完成"""
        self.worker_loads[worker] -= task_weight
    
    def get_load_distribution(self):
        """获取负载分布"""
        return self.worker_loads.copy()

# 模拟负载均衡
workers = ['worker_1', 'worker_2', 'worker_3', 'worker_4']
balancer = LoadBalancer(workers)

# 分配任务
tasks = [('task_A', 3), ('task_B', 1), ('task_C', 2), ('task_D', 4), ('task_E', 1)]

print("任务分配过程:")
task_assignments = {}
for task_name, task_weight in tasks:
    assigned_worker = balancer.assign_task(task_weight)
    task_assignments[task_name] = assigned_worker
    print(f"{task_name} (权重:{task_weight}) -> {assigned_worker}")

print(f"\n最终负载分布: {balancer.get_load_distribution()}")

# 3. 容错机制
print("\n3. 容错机制:")

class FaultTolerantProcessor:
    """容错处理器"""
    
    def __init__(self, max_retries=3):
        self.max_retries = max_retries
        self.failed_tasks = []
        self.completed_tasks = []
    
    def process_with_retry(self, task_func, *args, **kwargs):
        """带重试的任务处理"""
        for attempt in range(self.max_retries + 1):
            try:
                result = task_func(*args, **kwargs)
                self.completed_tasks.append({
                    'task': task_func.__name__,
                    'attempt': attempt + 1,
                    'status': 'success'
                })
                return result
            
            except Exception as e:
                if attempt == self.max_retries:
                    self.failed_tasks.append({
                        'task': task_func.__name__,
                        'attempts': attempt + 1,
                        'error': str(e)
                    })
                    raise e
                else:
                    print(f"任务 {task_func.__name__} 第 {attempt + 1} 次尝试失败: {e}")
                    time.sleep(0.1 * 2 ** attempt)  # 指数退避
    
    def get_statistics(self):
        """获取处理统计"""
        return {
            'completed': len(self.completed_tasks),
            'failed': len(self.failed_tasks),
            'success_rate': len(self.completed_tasks) / (len(self.completed_tasks) + len(self.failed_tasks)) if (len(self.completed_tasks) + len(self.failed_tasks)) > 0 else 0
        }

def unreliable_task(data, fail_probability=0.3):
    """不可靠的任务(模拟失败)"""
    if np.random.random() < fail_probability:
        raise Exception("随机任务失败")
    return data.sum()

# 测试容错机制
processor = FaultTolerantProcessor(max_retries=2)

print("容错处理测试:")
for i in range(5):
    try:
        result = processor.process_with_retry(unreliable_task, large_df['value1'])
        print(f"任务 {i+1} 成功: {result:.2f}")
    except Exception as e:
        print(f"任务 {i+1} 最终失败: {e}")

stats = processor.get_statistics()
print(f"\n处理统计: {stats}")

10.5 高级数据处理技术

10.5.1 自定义函数优化

print("\n=== 自定义函数优化 ===")

# 1. 向量化自定义函数
print("1. 向量化自定义函数:")

# 低效的逐行处理
def slow_custom_function(row):
    """慢速自定义函数"""
    if row['value1'] > 0:
        return row['value1'] * row['value2'] + 10
    else:
        return row['value2'] - 5

# 向量化版本
def fast_custom_function(value1, value2):
    """快速向量化函数"""
    return np.where(value1 > 0, value1 * value2 + 10, value2 - 5)

# 性能比较
print("自定义函数性能比较:")

# 测试慢速版本
start_time = time.time()
slow_result = large_df.apply(slow_custom_function, axis=1)
slow_time = time.time() - start_time

# 测试快速版本
start_time = time.time()
fast_result = fast_custom_function(large_df['value1'], large_df['value2'])
fast_time = time.time() - start_time

print(f"逐行处理时间: {slow_time:.4f} 秒")
print(f"向量化处理时间: {fast_time:.4f} 秒")
print(f"加速比: {slow_time / fast_time:.1f}x")
print(f"结果一致性: {np.allclose(slow_result, fast_result)}")

# 2. NumPy集成优化
print("\n2. NumPy集成优化:")

def numpy_optimized_function(df):
    """使用NumPy优化的函数"""
    # 直接使用NumPy数组
    values1 = df['value1'].values
    values2 = df['value2'].values
    
    # NumPy向量化操作
    result = np.where(
        values1 > values1.mean(),
        values1 * values2,
        values1 + values2
    )
    
    return pd.Series(result, index=df.index)

def pandas_function(df):
    """纯Pandas版本"""
    mean_val = df['value1'].mean()
    return np.where(
        df['value1'] > mean_val,
        df['value1'] * df['value2'],
        df['value1'] + df['value2']
    )

# 性能比较
start_time = time.time()
numpy_result = numpy_optimized_function(large_df)
numpy_time = time.time() - start_time

start_time = time.time()
pandas_result = pandas_function(large_df)
pandas_time = time.time() - start_time

print(f"NumPy优化时间: {numpy_time:.4f} 秒")
print(f"Pandas版本时间: {pandas_time:.4f} 秒")
print(f"NumPy加速比: {pandas_time / numpy_time:.1f}x")

# 3. Numba加速
print("\n3. Numba加速:")

try:
    from numba import jit, prange
    
    @jit(nopython=True, parallel=True)
    def numba_function(values1, values2):
        """Numba编译的函数"""
        result = np.empty(len(values1))
        for i in prange(len(values1)):
            if values1[i] > 0:
                result[i] = values1[i] * values2[i] + 10
            else:
                result[i] = values2[i] - 5
        return result
    
    # 预热Numba函数
    _ = numba_function(large_df['value1'].values[:100], large_df['value2'].values[:100])
    
    # 性能测试
    start_time = time.time()
    numba_result = numba_function(large_df['value1'].values, large_df['value2'].values)
    numba_time = time.time() - start_time
    
    print(f"Numba编译时间: {numba_time:.4f} 秒")
    print(f"相比向量化加速比: {fast_time / numba_time:.1f}x")
    
except ImportError:
    print("Numba未安装,跳过Numba演示")
    print("安装命令: pip install numba")

# 4. 缓存优化
print("\n4. 缓存优化:")

from functools import lru_cache

@lru_cache(maxsize=128)
def expensive_calculation(value):
    """昂贵的计算(带缓存)"""
    # 模拟复杂计算
    time.sleep(0.001)
    return value ** 2 + np.sin(value) * 100

def apply_with_cache(series):
    """应用带缓存的函数"""
    return series.apply(expensive_calculation)

def apply_without_cache(series):
    """不带缓存的版本"""
    return series.apply(lambda x: x ** 2 + np.sin(x) * 100)

# 创建有重复值的测试数据
test_data = pd.Series(np.random.choice(range(100), 1000))

# 测试缓存效果
start_time = time.time()
cached_result = apply_with_cache(test_data)
cached_time = time.time() - start_time

start_time = time.time()
uncached_result = apply_without_cache(test_data)
uncached_time = time.time() - start_time

print(f"带缓存时间: {cached_time:.4f} 秒")
print(f"不带缓存时间: {uncached_time:.4f} 秒")
print(f"缓存加速比: {uncached_time / cached_time:.1f}x")
print(f"数据重复率: {1 - test_data.nunique() / len(test_data):.1%}")

10.5.2 流式数据处理

print("\n=== 流式数据处理 ===")

# 1. 数据流生成器
print("1. 数据流生成器:")

class DataStreamGenerator:
    """数据流生成器"""
    
    def __init__(self, batch_size=1000, total_batches=10):
        self.batch_size = batch_size
        self.total_batches = total_batches
        self.current_batch = 0
    
    def generate_batch(self):
        """生成一批数据"""
        if self.current_batch >= self.total_batches:
            return None
        
        data = {
            'timestamp': pd.date_range(
                start=pd.Timestamp.now() + pd.Timedelta(minutes=self.current_batch),
                periods=self.batch_size,
                freq='1s'
            ),
            'value': np.random.randn(self.batch_size) * 100,
            'category': np.random.choice(['A', 'B', 'C'], self.batch_size),
            'batch_id': self.current_batch
        }
        
        self.current_batch += 1
        return pd.DataFrame(data)
    
    def stream(self):
        """数据流迭代器"""
        while True:
            batch = self.generate_batch()
            if batch is None:
                break
            yield batch

# 2. 流式处理器
class StreamProcessor:
    """流式数据处理器"""
    
    def __init__(self, window_size=5):
        self.window_size = window_size
        self.buffer = []
        self.processed_count = 0
        self.running_stats = {
            'count': 0,
            'sum': 0,
            'sum_squares': 0
        }
    
    def process_batch(self, batch):
        """处理单个批次"""
        # 更新运行统计
        batch_sum = batch['value'].sum()
        batch_count = len(batch)
        batch_sum_squares = (batch['value'] ** 2).sum()
        
        self.running_stats['count'] += batch_count
        self.running_stats['sum'] += batch_sum
        self.running_stats['sum_squares'] += batch_sum_squares
        
        # 计算当前统计信息
        current_mean = self.running_stats['sum'] / self.running_stats['count']
        current_var = (self.running_stats['sum_squares'] / self.running_stats['count']) - (current_mean ** 2)
        current_std = np.sqrt(current_var)
        
        # 滑动窗口处理
        self.buffer.append(batch)
        if len(self.buffer) > self.window_size:
            self.buffer.pop(0)
        
        # 窗口内聚合
        if self.buffer:
            window_data = pd.concat(self.buffer, ignore_index=True)
            window_stats = {
                'window_mean': window_data['value'].mean(),
                'window_std': window_data['value'].std(),
                'window_size': len(window_data)
            }
        else:
            window_stats = {}
        
        self.processed_count += 1
        
        return {
            'batch_id': self.processed_count,
            'batch_stats': {
                'mean': batch['value'].mean(),
                'std': batch['value'].std(),
                'count': len(batch)
            },
            'running_stats': {
                'mean': current_mean,
                'std': current_std,
                'total_count': self.running_stats['count']
            },
            'window_stats': window_stats
        }
    
    def process_stream(self, data_stream):
        """处理数据流"""
        results = []
        
        for batch in data_stream:
            result = self.process_batch(batch)
            results.append(result)
            
            # 实时输出
            print(f"批次 {result['batch_id']}: "
                  f"批次均值={result['batch_stats']['mean']:.2f}, "
                  f"运行均值={result['running_stats']['mean']:.2f}, "
                  f"总计数={result['running_stats']['total_count']}")
        
        return results

# 演示流式处理
print("流式处理演示:")
generator = DataStreamGenerator(batch_size=500, total_batches=5)
processor = StreamProcessor(window_size=3)

stream_results = processor.process_stream(generator.stream())

print(f"\n流式处理完成,共处理 {len(stream_results)} 个批次")

# 3. 实时聚合
print("\n3. 实时聚合:")

class RealTimeAggregator:
    """实时聚合器"""
    
    def __init__(self):
        self.aggregates = {}
        self.last_update = {}
    
    def update(self, key, value, timestamp=None):
        """更新聚合值"""
        if timestamp is None:
            timestamp = pd.Timestamp.now()
        
        if key not in self.aggregates:
            self.aggregates[key] = {
                'count': 0,
                'sum': 0,
                'min': float('inf'),
                'max': float('-inf'),
                'values': []  # 用于计算中位数等
            }
        
        agg = self.aggregates[key]
        agg['count'] += 1
        agg['sum'] += value
        agg['min'] = min(agg['min'], value)
        agg['max'] = max(agg['max'], value)
        agg['values'].append(value)
        
        # 只保留最近1000个值以限制内存(中位数等统计因此基于最近的数据)
        if len(agg['values']) > 1000:
            agg['values'] = agg['values'][-1000:]
        
        self.last_update[key] = timestamp
    
    def get_stats(self, key):
        """获取聚合统计"""
        if key not in self.aggregates:
            return None
        
        agg = self.aggregates[key]
        return {
            'count': agg['count'],
            'mean': agg['sum'] / agg['count'],
            'min': agg['min'],
            'max': agg['max'],
            'median': np.median(agg['values']),
            'last_update': self.last_update[key]
        }
    
    def get_all_stats(self):
        """获取所有聚合统计"""
        return {key: self.get_stats(key) for key in self.aggregates.keys()}

# 演示实时聚合
aggregator = RealTimeAggregator()

# 模拟实时数据流
print("实时聚合演示:")
for i in range(20):
    category = np.random.choice(['A', 'B', 'C'])
    value = np.random.randn() * 100
    aggregator.update(category, value)
    
    if i % 5 == 4:  # 每5次更新显示一次统计
        print(f"\n第 {i+1} 次更新后的统计:")
        for cat, stats in aggregator.get_all_stats().items():
            print(f"  {cat}: 计数={stats['count']}, 均值={stats['mean']:.2f}, "
                  f"中位数={stats['median']:.2f}")
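
作为补充,可以把 get_all_stats() 的结果整理成 DataFrame,方便打印或导出。以下是一个简单的用法示意,沿用上面的 aggregator 对象:

# 补充示例:将实时聚合结果整理为DataFrame,便于展示或导出
stats_df = pd.DataFrame.from_dict(aggregator.get_all_stats(), orient='index')
stats_df.index.name = 'category'
print("\n聚合结果汇总表:")
print(stats_df[['count', 'mean', 'median', 'min', 'max']].round(2))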

# 4. 数据管道设计
print("\n4. 数据管道设计:")

class DataPipeline:
    """数据处理管道"""
    
    def __init__(self):
        self.stages = []
        self.metrics = {
            'processed_records': 0,
            'processing_time': 0,
            'errors': 0
        }
    
    def add_stage(self, stage_func, stage_name=None):
        """添加处理阶段"""
        if stage_name is None:
            stage_name = stage_func.__name__
        
        self.stages.append({
            'func': stage_func,
            'name': stage_name
        })
        return self
    
    def process(self, data):
        """执行管道处理"""
        start_time = time.time()
        current_data = data
        
        try:
            for stage in self.stages:
                stage_start = time.time()
                current_data = stage['func'](current_data)
                stage_time = time.time() - stage_start
                
                print(f"阶段 '{stage['name']}' 完成,耗时: {stage_time:.4f}秒")
            
            self.metrics['processed_records'] += len(data) if hasattr(data, '__len__') else 1
            self.metrics['processing_time'] += time.time() - start_time
            
            return current_data
            
        except Exception as e:
            self.metrics['errors'] += 1
            print(f"管道处理错误: {e}")
            raise
    
    def get_metrics(self):
        """获取管道指标"""
        return self.metrics.copy()

# 定义管道阶段
def clean_data(df):
    """数据清洗阶段"""
    return df.dropna()

def transform_data(df):
    """数据转换阶段"""
    df = df.copy()
    df['value_squared'] = df['value1'] ** 2
    return df

def aggregate_data(df):
    """数据聚合阶段"""
    return df.groupby('category').agg({
        'value1': ['mean', 'std'],
        'value_squared': 'sum'
    })

# 构建和执行管道
pipeline = (DataPipeline()
           .add_stage(clean_data, "数据清洗")
           .add_stage(transform_data, "数据转换")
           .add_stage(aggregate_data, "数据聚合"))

print("数据管道执行:")
pipeline_result = pipeline.process(large_df)

print(f"\n管道执行指标: {pipeline.get_metrics()}")
print("管道结果:")
print(pipeline_result)

10.6 生产环境最佳实践

10.6.1 错误处理与日志记录

print("\n=== 错误处理与日志记录 ===")

import logging
from datetime import datetime

# 1. 日志配置
print("1. 日志配置:")

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pandas_processing.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger('PandasProcessor')
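
上面的 basicConfig 足以支撑演示;在生产环境中,日志文件会持续增长,通常还会配置日志轮转。下面是一个可选的配置示意(使用标准库的 RotatingFileHandler,文件名与大小参数均为示例值):

# 可选:日志轮转,避免日志文件无限增长(参数仅供参考)
from logging.handlers import RotatingFileHandler

rotating_handler = RotatingFileHandler(
    'pandas_processing_rotating.log',  # 示例文件名
    maxBytes=10 * 1024 * 1024,         # 单个文件最大10MB
    backupCount=5                      # 最多保留5个历史文件
)
rotating_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
# 需要时再挂载到logger上:logger.addHandler(rotating_handler)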

class DataProcessorWithLogging:
    """带日志记录的数据处理器"""
    
    def __init__(self, name="DataProcessor"):
        self.name = name
        self.logger = logging.getLogger(name)
        self.processing_stats = {
            'start_time': None,
            'end_time': None,
            'records_processed': 0,
            'errors_count': 0
        }
    
    def process_data(self, df, operations):
        """处理数据并记录日志"""
        self.processing_stats['start_time'] = datetime.now()
        self.logger.info(f"开始处理数据,输入形状: {df.shape}")
        
        try:
            result = df.copy()
            
            for i, operation in enumerate(operations):
                operation_start = time.time()
                
                try:
                    result = operation(result)
                    operation_time = time.time() - operation_start
                    
                    self.logger.info(f"操作 {i+1} ({operation.__name__}) 完成,"
                                   f"耗时: {operation_time:.4f}秒,"
                                   f"结果形状: {result.shape}")
                
                except Exception as e:
                    self.processing_stats['errors_count'] += 1
                    self.logger.error(f"操作 {i+1} ({operation.__name__}) 失败: {e}")
                    raise
            
            self.processing_stats['records_processed'] = len(result)
            self.processing_stats['end_time'] = datetime.now()
            
            total_time = (self.processing_stats['end_time'] - 
                         self.processing_stats['start_time']).total_seconds()
            
            self.logger.info(f"数据处理完成,总耗时: {total_time:.2f}秒,"
                           f"处理记录数: {self.processing_stats['records_processed']}")
            
            return result
            
        except Exception as e:
            self.logger.error(f"数据处理失败: {e}")
            self.processing_stats['end_time'] = datetime.now()
            raise
    
    def get_stats(self):
        """获取处理统计"""
        return self.processing_stats.copy()

# 定义处理操作
def remove_outliers(df):
    """移除异常值"""
    Q1 = df['value1'].quantile(0.25)
    Q3 = df['value1'].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    return df[(df['value1'] >= lower_bound) & (df['value1'] <= upper_bound)]

def add_features(df):
    """添加特征"""
    df = df.copy()
    df['value_ratio'] = df['value1'] / df['value2']
    df['value_sum'] = df['value1'] + df['value2']
    return df

def normalize_values(df):
    """标准化数值"""
    df = df.copy()
    df['value1_norm'] = (df['value1'] - df['value1'].mean()) / df['value1'].std()
    df['value2_norm'] = (df['value2'] - df['value2'].mean()) / df['value2'].std()
    return df

# 使用带日志的处理器
processor = DataProcessorWithLogging("MainProcessor")
operations = [remove_outliers, add_features, normalize_values]

print("执行带日志的数据处理:")
processed_data = processor.process_data(large_df, operations)

print(f"\n处理统计: {processor.get_stats()}")

# 2. 异常处理策略
print("\n2. 异常处理策略:")

class RobustDataProcessor:
    """健壮的数据处理器"""
    
    def __init__(self, max_retries=3, fallback_strategy='skip'):
        self.max_retries = max_retries
        self.fallback_strategy = fallback_strategy  # 'skip', 'default', 'raise'
        self.error_log = []
    
    def safe_operation(self, df, operation, operation_name=None):
        """安全执行操作"""
        if operation_name is None:
            operation_name = operation.__name__
        
        for attempt in range(self.max_retries + 1):
            try:
                return operation(df)
            
            except Exception as e:
                error_info = {
                    'operation': operation_name,
                    'attempt': attempt + 1,
                    'error': str(e),
                    'timestamp': datetime.now()
                }
                self.error_log.append(error_info)
                
                if attempt == self.max_retries:
                    # 最后一次尝试失败,应用回退策略
                    return self._apply_fallback_strategy(df, operation_name, e)
                else:
                    logger.warning(f"操作 {operation_name} 第 {attempt + 1} 次尝试失败: {e}")
                    time.sleep(0.1 * (2 ** attempt))  # 指数退避: 0.1s、0.2s、0.4s...
    
    def _apply_fallback_strategy(self, df, operation_name, error):
        """应用回退策略"""
        if self.fallback_strategy == 'skip':
            logger.warning(f"跳过失败的操作 {operation_name}")
            return df
        elif self.fallback_strategy == 'default':
            logger.warning(f"使用默认值处理 {operation_name}")
            return self._get_default_result(df, operation_name)
        else:  # 'raise'
            logger.error(f"操作 {operation_name} 最终失败")
            raise error
    
    def _get_default_result(self, df, operation_name):
        """获取默认结果"""
        # 根据操作类型返回合理的默认值
        if 'normalize' in operation_name.lower():
            # 标准化失败时返回原数据
            return df
        elif 'feature' in operation_name.lower():
            # 特征工程失败时添加空列
            df = df.copy()
            df['default_feature'] = 0
            return df
        else:
            return df
    
    def get_error_summary(self):
        """获取错误摘要"""
        if not self.error_log:
            return "没有错误记录"
        
        error_summary = {}
        for error in self.error_log:
            op = error['operation']
            if op not in error_summary:
                error_summary[op] = {'count': 0, 'last_error': ''}
            error_summary[op]['count'] += 1
            error_summary[op]['last_error'] = error['error']
        
        return error_summary

# 模拟可能失败的操作
def unreliable_operation(df):
    """不可靠的操作"""
    if np.random.random() < 0.7:  # 70%概率失败
        raise ValueError("模拟操作失败")
    return df.copy()

# 测试健壮处理器
robust_processor = RobustDataProcessor(max_retries=2, fallback_strategy='skip')

print("健壮处理器测试:")
for i in range(3):
    try:
        result = robust_processor.safe_operation(large_df, unreliable_operation, f"测试操作_{i+1}")
        print(f"操作 {i+1} 成功")
    except Exception as e:
        print(f"操作 {i+1} 失败: {e}")

print(f"\n错误摘要: {robust_processor.get_error_summary()}")

# 3. 性能监控
print("\n3. 性能监控:")

class PerformanceMonitor:
    """性能监控器"""
    
    def __init__(self):
        self.metrics = []
        self.alerts = []
        self.thresholds = {
            'max_processing_time': 10.0,  # 秒
            'max_memory_usage': 1000.0,   # MB
            'min_success_rate': 0.95      # 95%
        }
    
    def monitor_operation(self, operation_name):
        """操作监控装饰器"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                start_memory = psutil.Process().memory_info().rss / 1024**2
                
                try:
                    result = func(*args, **kwargs)
                    success = True
                    error_msg = None
                except Exception as e:
                    success = False
                    error_msg = str(e)
                    result = None
                    raise
                finally:
                    end_time = time.time()
                    end_memory = psutil.Process().memory_info().rss / 1024**2
                    
                    metric = {
                        'operation': operation_name,
                        'timestamp': datetime.now(),
                        'duration': end_time - start_time,
                        'memory_delta': end_memory - start_memory,
                        'success': success,
                        'error': error_msg
                    }
                    
                    self.metrics.append(metric)
                    self._check_alerts(metric)
                
                return result
            return wrapper
        return decorator
    
    def _check_alerts(self, metric):
        """检查告警条件"""
        alerts = []
        
        if metric['duration'] > self.thresholds['max_processing_time']:
            alerts.append(f"处理时间过长: {metric['duration']:.2f}秒")
        
        if metric['memory_delta'] > self.thresholds['max_memory_usage']:
            alerts.append(f"内存使用过高: {metric['memory_delta']:.2f}MB")
        
        # 检查成功率
        recent_metrics = self.metrics[-10:]  # 最近10次操作
        if len(recent_metrics) >= 5:
            success_rate = sum(1 for m in recent_metrics if m['success']) / len(recent_metrics)
            if success_rate < self.thresholds['min_success_rate']:
                alerts.append(f"成功率过低: {success_rate:.1%}")
        
        for alert in alerts:
            alert_info = {
                'timestamp': metric['timestamp'],
                'operation': metric['operation'],
                'message': alert
            }
            self.alerts.append(alert_info)
            logger.warning(f"性能告警: {alert}")
    
    def get_performance_report(self):
        """生成性能报告"""
        if not self.metrics:
            return "没有性能数据"
        
        df_metrics = pd.DataFrame(self.metrics)
        
        report = {
            'total_operations': len(df_metrics),
            'success_rate': df_metrics['success'].mean(),
            'avg_duration': df_metrics['duration'].mean(),
            'max_duration': df_metrics['duration'].max(),
            'avg_memory_delta': df_metrics['memory_delta'].mean(),
            'total_alerts': len(self.alerts)
        }
        
        return report

# 使用性能监控
monitor = PerformanceMonitor()

@monitor.monitor_operation("数据聚合")
def monitored_aggregation(df):
    """被监控的聚合操作"""
    time.sleep(0.1)  # 模拟处理时间
    return df.groupby('category').agg({
        'value1': ['mean', 'std'],
        'value2': 'sum'
    })

@monitor.monitor_operation("数据转换")
def monitored_transformation(df):
    """被监控的转换操作"""
    df = df.copy()
    df['new_feature'] = df['value1'] * df['value2']
    return df

# 执行监控操作
print("执行性能监控:")
for i in range(5):
    try:
        agg_result = monitored_aggregation(large_df)
        trans_result = monitored_transformation(large_df)
        print(f"操作组 {i+1} 完成")
    except Exception as e:
        print(f"操作组 {i+1} 失败: {e}")

performance_report = monitor.get_performance_report()
print(f"\n性能报告: {performance_report}")

if monitor.alerts:
    print(f"\n告警记录 ({len(monitor.alerts)} 条):")
    for alert in monitor.alerts[-3:]:  # 显示最近3条告警
        print(f"  {alert['timestamp']}: {alert['message']}")

10.7 本章小结

10.7.1 核心知识点回顾

print("\n=== 核心知识点回顾 ===")

# 总结本章学习的关键技术
key_techniques = {
    "内存优化": [
        "数据类型优化(category, sparse, 数值类型降级)",
        "分块处理大数据集",
        "内存映射和流式处理",
        "缓存策略和内存监控"
    ],
    "性能优化": [
        "向量化操作替代循环",
        "NumPy集成优化",
        "索引优化和查询优化",
        "性能分析和瓶颈识别"
    ],
    "并行处理": [
        "多进程数据处理",
        "Dask分布式计算",
        "异步处理模式",
        "负载均衡和容错机制"
    ],
    "高级技术": [
        "自定义函数优化",
        "流式数据处理",
        "数据管道设计",
        "实时聚合处理"
    ],
    "生产实践": [
        "错误处理和重试机制",
        "日志记录和监控",
        "性能告警系统",
        "健壮性设计"
    ]
}

print("本章核心技术总结:")
for category, techniques in key_techniques.items():
    print(f"\n{category}:")
    for technique in techniques:
        print(f"  • {technique}")

10.7.2 性能优化检查清单

print("\n=== 性能优化检查清单 ===")

optimization_checklist = {
    "数据加载阶段": [
        "☐ 使用合适的数据类型(category, int8/16, float32)",
        "☐ 分块读取大文件(chunksize参数)",
        "☐ 只加载需要的列(usecols参数)",
        "☐ 使用高效的文件格式(Parquet, HDF5)"
    ],
    "数据处理阶段": [
        "☐ 优先使用向量化操作",
        "☐ 避免不必要的数据复制",
        "☐ 使用inplace操作减少内存使用",
        "☐ 合并多个操作减少中间结果"
    ],
    "查询和筛选": [
        "☐ 使用布尔索引而非循环",
        "☐ 设置合适的索引",
        "☐ 优化查询顺序(先筛选后聚合)",
        "☐ 使用query()方法处理复杂条件"
    ],
    "聚合和分组": [
        "☐ 一次性完成多个聚合操作",
        "☐ 使用transform()避免重复计算",
        "☐ 考虑使用groupby的优化参数",
        "☐ 预排序数据提高分组效率"
    ],
    "内存管理": [
        "☐ 定期检查内存使用情况",
        "☐ 及时删除不需要的变量",
        "☐ 使用生成器处理大数据",
        "☐ 监控内存泄漏"
    ]
}

print("性能优化检查清单:")
for phase, items in optimization_checklist.items():
    print(f"\n{phase}:")
    for item in items:
        print(f"  {item}")
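
下面用一小段示意代码演示清单中“数据加载阶段”的几个要点:分块读取、只加载需要的列、指定精简的数据类型(文件路径与列名均为假设,仅作示意):

# 补充示例:按清单建议分块读取大CSV(文件名和列名为假设)
def load_large_csv(path, chunksize=100000):
    """分块读取CSV,只加载需要的列并指定精简的数据类型,边读边做部分聚合"""
    partials = []
    for chunk in pd.read_csv(
        path,
        usecols=['category', 'value1', 'value3'],   # 只加载需要的列
        dtype={'category': 'category',              # 低基数字符串用category
               'value1': 'float32',                 # 降低浮点精度
               'value3': 'int32'},
        chunksize=chunksize                         # 分块读取
    ):
        # 每个块先做部分聚合,避免把全部原始数据留在内存中
        partials.append(
            chunk.groupby('category', observed=True)['value1'].agg(['sum', 'count'])
        )
    # 合并各块的部分聚合结果,再计算总体均值
    result = pd.concat(partials).groupby(level=0).sum()
    result['mean'] = result['sum'] / result['count']
    return result

# 用法(假设存在该文件):
# print(load_large_csv('user_events.csv'))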

10.7.3 常见陷阱与解决方案

print("\n=== 常见陷阱与解决方案 ===")

common_pitfalls = {
    "内存陷阱": {
        "问题": "数据类型选择不当导致内存浪费",
        "示例": "使用object类型存储重复的字符串",
        "解决方案": "转换为category类型,节省50-90%内存",
        "代码": "df['column'] = df['column'].astype('category')"
    },
    "性能陷阱": {
        "问题": "使用apply()处理可向量化的操作",
        "示例": "df.apply(lambda x: x['a'] + x['b'], axis=1)",
        "解决方案": "使用向量化操作",
        "代码": "df['a'] + df['b']"
    },
    "索引陷阱": {
        "问题": "重复设置索引导致性能下降",
        "示例": "在循环中多次调用set_index()",
        "解决方案": "预先设置索引或使用其他查询方法",
        "代码": "df.set_index('key', inplace=True)  # 只设置一次"
    },
    "复制陷阱": {
        "问题": "不必要的数据复制消耗内存",
        "示例": "df_new = df.copy(); df_new['col'] = value",
        "解决方案": "使用inplace操作或视图",
        "代码": "df['col'] = value  # 直接修改"
    },
    "聚合陷阱": {
        "问题": "多次分组聚合重复计算",
        "示例": "多次调用groupby()计算不同统计量",
        "解决方案": "一次性计算所有需要的聚合",
        "代码": "df.groupby('key').agg({'col1': 'mean', 'col2': 'sum'})"
    }
}

print("常见陷阱与解决方案:")
for pitfall_type, details in common_pitfalls.items():
    print(f"\n{pitfall_type}:")
    print(f"  问题: {details['问题']}")
    print(f"  示例: {details['示例']}")
    print(f"  解决方案: {details['解决方案']}")
    print(f"  推荐代码: {details['代码']}")
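
以上面的“性能陷阱”为例,可以用一小段计时代码直观验证向量化的优势(具体耗时因机器而异,仅作示意):

# 补充示例:对比apply(axis=1)与向量化操作的耗时
demo_df = large_df[['value1', 'value2']].copy()

start = time.time()
apply_result = demo_df.apply(lambda row: row['value1'] + row['value2'], axis=1)
apply_time = time.time() - start

start = time.time()
vector_result = demo_df['value1'] + demo_df['value2']
vector_time = time.time() - start

print(f"apply(axis=1) 耗时: {apply_time:.4f}秒")
print(f"向量化操作耗时: {vector_time:.4f}秒")
print(f"结果一致: {np.allclose(apply_result, vector_result)}")
print(f"加速比约为 {apply_time / max(vector_time, 1e-9):.0f} 倍")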

10.7.4 下一步学习建议

print("\n=== 下一步学习建议 ===")

next_steps = {
    "深入学习方向": [
        "学习Dask进行大规模分布式计算",
        "掌握Apache Spark与Pandas的集成",
        "了解GPU加速计算(cuDF, Rapids)",
        "学习时间序列数据库(InfluxDB, TimescaleDB)"
    ],
    "相关技术栈": [
        "NumPy高级功能和优化技巧",
        "Scikit-learn机器学习管道",
        "Matplotlib/Seaborn高级可视化",
        "Jupyter Notebook性能优化"
    ],
    "生产环境技能": [
        "Docker容器化Pandas应用",
        "Kubernetes部署和扩展",
        "监控和告警系统集成",
        "数据质量和治理实践"
    ],
    "实践项目建议": [
        "构建实时数据处理管道",
        "开发数据质量监控系统",
        "实现自动化数据分析报告",
        "创建高性能数据API服务"
    ]
}

print("下一步学习建议:")
for category, suggestions in next_steps.items():
    print(f"\n{category}:")
    for suggestion in suggestions:
        print(f"  • {suggestion}")

10.7.5 练习题

print("\n=== 练习题 ===")

exercises = [
    {
        "题目": "内存优化挑战",
        "描述": "给定一个1GB的CSV文件,包含用户行为数据,要求在内存限制为512MB的环境中完成数据分析",
        "要求": [
            "实现分块读取和处理",
            "优化数据类型减少内存使用",
            "计算用户活跃度统计",
            "生成每日汇总报告"
        ],
        "提示": "使用chunksize、category类型、生成器模式"
    },
    {
        "题目": "性能优化竞赛",
        "描述": "优化一个现有的数据处理脚本,要求将处理时间减少至少50%",
        "要求": [
            "识别性能瓶颈",
            "应用向量化操作",
            "优化查询和聚合",
            "实现并行处理"
        ],
        "提示": "使用cProfile分析、向量化替代循环、多进程处理"
    },
    {
        "题目": "实时数据流处理",
        "描述": "设计一个实时数据流处理系统,处理每秒1000条记录的用户事件",
        "要求": [
            "实现流式数据接收",
            "设计滑动窗口聚合",
            "实现异常检测和告警",
            "提供实时监控面板"
        ],
        "提示": "使用生成器、滑动窗口、异步处理、实时聚合"
    },
    {
        "题目": "分布式计算项目",
        "描述": "使用Dask处理多个大型数据文件,实现分布式机器学习管道",
        "要求": [
            "设置Dask集群",
            "实现数据预处理管道",
            "分布式特征工程",
            "模型训练和评估"
        ],
        "提示": "Dask DataFrame、分布式计算、机器学习集成"
    },
    {
        "题目": "生产环境部署",
        "描述": "将Pandas数据处理应用部署到生产环境,确保高可用性和可扩展性",
        "要求": [
            "实现错误处理和重试机制",
            "添加日志记录和监控",
            "设计健康检查和告警",
            "实现自动扩缩容"
        ],
        "提示": "容错设计、日志系统、监控告警、容器化部署"
    }
]

print("本章练习题:")
for i, exercise in enumerate(exercises, 1):
    print(f"\n练习 {i}: {exercise['题目']}")
    print(f"描述: {exercise['描述']}")
    print("要求:")
    for req in exercise['要求']:
        print(f"  • {req}")
    print(f"提示: {exercise['提示']}")

print("\n" + "="*50)
print("🎉 恭喜!您已完成Pandas高级应用与性能优化的学习!")
print("通过本章的学习,您掌握了:")
print("• 内存优化和大数据处理技术")
print("• 性能分析和瓶颈识别方法") 
print("• 并行处理和分布式计算")
print("• 生产环境最佳实践")
print("现在您已经具备了处理真实世界大规模数据项目的能力!")
print("="*50)