📚 本章概述

本章将深入探讨NumPy的性能优化技术和高级使用技巧。您将学习如何编写高效的NumPy代码,理解底层实现原理,掌握内存管理和并行计算技术,以及学会使用各种高级功能来解决复杂的数值计算问题。

🎯 学习目标

  • 理解NumPy的性能特点和优化原理
  • 掌握向量化编程技术
  • 学会内存管理和缓存优化
  • 了解并行计算和多线程技术
  • 掌握高级索引和广播技巧
  • 学会性能分析和调试方法
  • 理解与其他库的集成优化

1. 性能基础理论

1.1 NumPy性能原理

import numpy as np
import time
import matplotlib.pyplot as plt
import sys
import psutil
import gc
try:
    from memory_profiler import profile  # 可选依赖, 本章示例不直接调用
except ImportError:
    profile = None

# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

print("🚀 NumPy性能基础理论")
print("=" * 50)

# 1. 向量化 vs 循环比较
print("1. 向量化 vs 循环性能比较:")

def python_loop_sum(arr):
    """Python循环求和"""
    total = 0
    for i in range(len(arr)):
        total += arr[i]
    return total

def numpy_sum(arr):
    """NumPy向量化求和"""
    return np.sum(arr)

# 测试不同大小的数组
sizes = [1000, 10000, 100000, 1000000]
python_times = []
numpy_times = []
speedups = []

print("数组大小\tPython循环\tNumPy向量化\t速度提升")
print("-" * 50)

for size in sizes:
    arr = np.random.random(size)
    
    # Python循环计时
    start = time.time()
    python_result = python_loop_sum(arr)
    python_time = time.time() - start
    
    # NumPy向量化计时
    start = time.time()
    numpy_result = numpy_sum(arr)
    numpy_time = time.time() - start
    
    speedup = python_time / numpy_time
    
    python_times.append(python_time)
    numpy_times.append(numpy_time)
    speedups.append(speedup)
    
    print(f"{size:>8}\t{python_time:.6f}s\t{numpy_time:.6f}s\t{speedup:.1f}x")
    
    # 验证结果一致性
    assert np.isclose(python_result, numpy_result)  # 浮点求和顺序不同, 用相对容差比较

# 可视化性能比较
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

ax1.loglog(sizes, python_times, 'o-', label='Python循环', linewidth=2)
ax1.loglog(sizes, numpy_times, 's-', label='NumPy向量化', linewidth=2)
ax1.set_xlabel('数组大小')
ax1.set_ylabel('执行时间 (秒)')
ax1.set_title('执行时间比较')
ax1.legend()
ax1.grid(True)

ax2.semilogx(sizes, speedups, 'o-', color='red', linewidth=2)
ax2.set_xlabel('数组大小')
ax2.set_ylabel('速度提升倍数')
ax2.set_title('NumPy速度提升')
ax2.grid(True)

plt.tight_layout()
plt.savefig('numpy_performance_comparison.png', dpi=300, bbox_inches='tight')
plt.show()

print(f"\n最大速度提升: {max(speedups):.1f}x")

1.2 内存布局和缓存效率

# 2. 内存布局和缓存效率
print("\n💾 内存布局和缓存效率:")

# C-order vs Fortran-order
print("C-order vs Fortran-order性能比较:")

size = 2000
arr_c = np.random.random((size, size))  # C-order (行优先)
arr_f = np.asfortranarray(arr_c)        # Fortran-order (列优先)

print(f"C-order数组: {arr_c.flags}")
print(f"Fortran-order数组: {arr_f.flags}")

# 行遍历性能测试
def row_sum_test(arr):
    """按行求和测试"""
    return np.sum(arr, axis=1)

def col_sum_test(arr):
    """按列求和测试"""
    return np.sum(arr, axis=0)

# 测试C-order数组
print(f"\nC-order数组性能:")
start = time.time()
c_row_result = row_sum_test(arr_c)
c_row_time = time.time() - start

start = time.time()
c_col_result = col_sum_test(arr_c)
c_col_time = time.time() - start

print(f"按行求和: {c_row_time:.6f}s")
print(f"按列求和: {c_col_time:.6f}s")
print(f"行/列时间比: {c_col_time/c_row_time:.2f}")

# 测试Fortran-order数组
print(f"\nFortran-order数组性能:")
start = time.time()
f_row_result = row_sum_test(arr_f)
f_row_time = time.time() - start

start = time.time()
f_col_result = col_sum_test(arr_f)
f_col_time = time.time() - start

print(f"按行求和: {f_row_time:.6f}s")
print(f"按列求和: {f_col_time:.6f}s")
print(f"行/列时间比: {f_row_time/f_col_time:.2f}")

# 内存连续性测试
print(f"\n内存连续性影响:")

# 创建非连续数组
arr_strided = arr_c[::2, ::2]  # 步长为2的子数组
arr_copy = arr_strided.copy()   # 连续的副本

print(f"原数组连续性: {arr_c.flags['C_CONTIGUOUS']}")
print(f"步长数组连续性: {arr_strided.flags['C_CONTIGUOUS']}")
print(f"副本数组连续性: {arr_copy.flags['C_CONTIGUOUS']}")

# 性能比较
start = time.time()
strided_sum = np.sum(arr_strided)
strided_time = time.time() - start

start = time.time()
copy_sum = np.sum(arr_copy)
copy_time = time.time() - start

print(f"步长数组求和时间: {strided_time:.6f}s")
print(f"连续数组求和时间: {copy_time:.6f}s")
print(f"性能差异: {strided_time/copy_time:.2f}x")

1.3 数据类型优化

# 3. 数据类型优化
print("\n🔢 数据类型优化:")

# 不同数据类型的性能和内存使用
dtypes = [np.int8, np.int16, np.int32, np.int64, 
          np.float16, np.float32, np.float64]

size = 1000000
print("数据类型\t内存使用(MB)\t计算时间(s)\t精度损失")
print("-" * 55)

base_array = np.random.random(size) * 1000

for dtype in dtypes:
    # 转换数据类型(注意: 原始值可达1000, 转为int8/int16会截断并溢出, 这也计入下面的"精度损失")
    arr = base_array.astype(dtype)
    
    # 内存使用
    memory_mb = arr.nbytes / 1024 / 1024
    
    # 计算性能测试
    start = time.time()
    result = np.sum(arr ** 2)
    calc_time = time.time() - start
    
    # 精度测试(与float64比较)
    reference = base_array.astype(np.float64)
    precision_loss = np.mean(np.abs(arr.astype(np.float64) - reference))
    
    print(f"{dtype.__name__:>10}\t{memory_mb:>8.2f}\t{calc_time:>10.6f}\t{precision_loss:>10.6f}")

# 混合精度计算示例
print(f"\n混合精度计算示例:")

# 创建大型矩阵
n = 2000
A_f64 = np.random.random((n, n)).astype(np.float64)
B_f64 = np.random.random((n, n)).astype(np.float64)

A_f32 = A_f64.astype(np.float32)
B_f32 = B_f64.astype(np.float32)

# float64矩阵乘法
start = time.time()
C_f64 = np.dot(A_f64, B_f64)
time_f64 = time.time() - start

# float32矩阵乘法
start = time.time()
C_f32 = np.dot(A_f32, B_f32)
time_f32 = time.time() - start

# 混合精度:float32计算,float64结果
start = time.time()
C_mixed = np.dot(A_f32, B_f32).astype(np.float64)
time_mixed = time.time() - start

print(f"float64计算时间: {time_f64:.4f}s")
print(f"float32计算时间: {time_f32:.4f}s")
print(f"混合精度时间: {time_mixed:.4f}s")

print(f"float32速度提升: {time_f64/time_f32:.2f}x")
print(f"混合精度速度提升: {time_f64/time_mixed:.2f}x")

# 精度比较
error_f32 = np.mean(np.abs(C_f64 - C_f32.astype(np.float64)))
error_mixed = np.mean(np.abs(C_f64 - C_mixed))

print(f"float32精度损失: {error_f32:.2e}")
print(f"混合精度损失: {error_mixed:.2e}")

2. 向量化编程技术

2.1 高级向量化技巧

# 4. 高级向量化技巧
print("\n⚡ 高级向量化技巧:")

# 条件向量化
print("1. 条件向量化:")

# 传统方法
def traditional_conditional(arr):
    result = np.zeros_like(arr)
    for i in range(len(arr)):
        if arr[i] > 0:
            result[i] = np.sqrt(arr[i])
        elif arr[i] < 0:
            result[i] = -np.sqrt(-arr[i])
        else:
            result[i] = 0
    return result

# 向量化方法
def vectorized_conditional(arr):
    result = np.zeros_like(arr)
    pos_mask = arr > 0
    neg_mask = arr < 0
    result[pos_mask] = np.sqrt(arr[pos_mask])
    result[neg_mask] = -np.sqrt(-arr[neg_mask])
    return result

# 使用np.where的方法
def where_conditional(arr):
    # 注意: np.where会对两个分支整体求值, 对负数取sqrt会触发无效值警告, 这里显式屏蔽(结果仍然正确)
    with np.errstate(invalid='ignore'):
        return np.where(arr > 0, np.sqrt(arr),
                        np.where(arr < 0, -np.sqrt(-arr), 0))

# 性能测试
test_arr = np.random.uniform(-10, 10, 100000)

methods = [
    ("传统循环", traditional_conditional),
    ("向量化掩码", vectorized_conditional),
    ("np.where", where_conditional)
]

print("方法\t\t执行时间\t速度提升")
print("-" * 40)

baseline_time = None
for name, func in methods:
    start = time.time()
    result = func(test_arr)
    exec_time = time.time() - start
    
    if baseline_time is None:
        baseline_time = exec_time
        speedup = 1.0
    else:
        speedup = baseline_time / exec_time
    
    print(f"{name:>10}\t{exec_time:.6f}s\t{speedup:.1f}x")

# 验证结果一致性
results = [func(test_arr) for _, func in methods]
for i in range(1, len(results)):
    assert np.allclose(results[0], results[i], rtol=1e-10)

print("✅ 所有方法结果一致")

2.2 广播优化技巧

# 5. 广播优化技巧
print("\n📡 广播优化技巧:")

# 矩阵运算优化
print("1. 矩阵运算优化:")

# 创建测试数据
test_array = np.random.random(1000000)

# 比较求和函数
print("求和函数比较:")
sum_functions = [
    ("numpy_sum", numpy_sum),
    ("python_sum", python_sum)
]
benchmark.compare_functions(sum_functions, test_array)

print("\n均值函数比较:")
mean_functions = [
    ("numpy_mean", numpy_mean),
    ("manual_mean", manual_mean)
]
benchmark.compare_functions(mean_functions, test_array)

print("\n标准差函数比较:")
std_functions = [
    ("numpy_std", numpy_std),
    ("manual_std", manual_std)
]
benchmark.compare_functions(std_functions, test_array)

# 绘制结果
benchmark.plot_results()

7. 与其他库的集成优化

7.1 Pandas集成优化

# 17. Pandas集成优化
print("\n🐼 Pandas集成优化:")

try:
    import pandas as pd
    
    # NumPy vs Pandas性能比较
    print("1. NumPy vs Pandas性能比较:")
    
    # 创建测试数据
    n = 1000000
    data_dict = {
        'A': np.random.random(n),
        'B': np.random.random(n),
        'C': np.random.randint(0, 100, n)
    }
    
    # NumPy数组
    np_data = np.column_stack([data_dict['A'], data_dict['B'], data_dict['C']])
    
    # Pandas DataFrame
    df = pd.DataFrame(data_dict)
    
    # 计算操作比较
    print("计算操作性能比较:")
    
    # NumPy计算
    start = time.time()
    np_mean = np.mean(np_data[:, 0])
    np_sum = np.sum(np_data[:, 1])
    np_filtered = np_data[np_data[:, 2] > 50]
    np_time = time.time() - start
    
    # Pandas计算
    start = time.time()
    pd_mean = df['A'].mean()
    pd_sum = df['B'].sum()
    pd_filtered = df[df['C'] > 50]
    pd_time = time.time() - start
    
    print(f"NumPy计算时间: {np_time:.6f}s")
    print(f"Pandas计算时间: {pd_time:.6f}s")
    print(f"NumPy速度优势: {pd_time/np_time:.2f}x")
    
    # 优化的Pandas操作
    print(f"\n2. 优化的Pandas操作:")
    
    # 使用.values(新版本pandas更推荐.to_numpy())访问底层NumPy数组
    start = time.time()
    values_mean = np.mean(df['A'].values)
    values_sum = np.sum(df['B'].values)
    values_time = time.time() - start
    
    # 使用向量化操作
    start = time.time()
    df['D'] = df['A'] * df['B']  # 向量化乘法
    df['E'] = np.where(df['C'] > 50, df['A'], 0)  # 条件赋值
    vectorized_time = time.time() - start
    
    print(f"使用.values: {values_time:.6f}s")
    print(f"向量化操作: {vectorized_time:.6f}s")
    
    # 内存使用比较
    print(f"\n3. 内存使用比较:")
    np_memory = np_data.nbytes / 1024 / 1024
    df_memory = df.memory_usage(deep=True).sum() / 1024 / 1024
    
    print(f"NumPy内存使用: {np_memory:.2f} MB")
    print(f"Pandas内存使用: {df_memory:.2f} MB")
    print(f"Pandas内存开销: {df_memory/np_memory:.2f}x")

except ImportError:
    print("Pandas未安装,跳过Pandas集成示例")

7.2 SciPy集成优化

# 18. SciPy集成优化
print("\n🔬 SciPy集成优化:")

try:
    from scipy import linalg, sparse, optimize, signal
    
    # 线性代数优化
    print("1. 线性代数优化:")
    
    n = 2000
    A = np.random.random((n, n))
    b = np.random.random(n)
    
    # NumPy求解
    start = time.time()
    x_numpy = np.linalg.solve(A, b)
    numpy_time = time.time() - start
    
    # SciPy求解(可能使用更优化的LAPACK)
    start = time.time()
    x_scipy = linalg.solve(A, b)
    scipy_time = time.time() - start
    
    print(f"NumPy求解时间: {numpy_time:.6f}s")
    print(f"SciPy求解时间: {scipy_time:.6f}s")
    print(f"结果一致性: {np.allclose(x_numpy, x_scipy)}")
    
    # 稀疏矩阵优化
    print(f"\n2. 稀疏矩阵优化:")
    
    # 创建稀疏矩阵
    density = 0.01  # 1%的非零元素
    sparse_data = np.random.random((n, n))
    sparse_data[np.random.random((n, n)) > density] = 0
    
    # 密集矩阵操作
    start = time.time()
    dense_result = np.dot(sparse_data, sparse_data.T)
    dense_time = time.time() - start
    dense_memory = sparse_data.nbytes / 1024 / 1024
    
    # 稀疏矩阵操作
    sparse_matrix = sparse.csr_matrix(sparse_data)
    start = time.time()
    sparse_result = sparse_matrix.dot(sparse_matrix.T)
    sparse_time = time.time() - start
    sparse_memory = (sparse_matrix.data.nbytes + 
                    sparse_matrix.indices.nbytes + 
                    sparse_matrix.indptr.nbytes) / 1024 / 1024
    
    print(f"密集矩阵:")
    print(f"  计算时间: {dense_time:.6f}s")
    print(f"  内存使用: {dense_memory:.2f} MB")
    
    print(f"稀疏矩阵:")
    print(f"  计算时间: {sparse_time:.6f}s")
    print(f"  内存使用: {sparse_memory:.2f} MB")
    print(f"  内存节省: {dense_memory/sparse_memory:.2f}x")
    
    # 信号处理优化
    print(f"\n3. 信号处理优化:")
    
    # 创建信号
    fs = 1000  # 采样频率
    t = np.arange(0, 1, 1/fs)
    signal_data = np.sin(2*np.pi*50*t) + 0.5*np.sin(2*np.pi*120*t) + np.random.normal(0, 0.1, len(t))
    
    # FFT比较
    start = time.time()
    fft_numpy = np.fft.fft(signal_data)
    numpy_fft_time = time.time() - start
    
    from scipy.fft import fft as scipy_fft  # 使用SciPy自己的FFT实现进行对比
    start = time.time()
    fft_scipy = scipy_fft(signal_data)
    scipy_fft_time = time.time() - start
    
    # 滤波器设计
    start = time.time()
    b, a = signal.butter(4, 0.2, 'low')
    filtered_signal = signal.filtfilt(b, a, signal_data)
    filter_time = time.time() - start
    
    print(f"FFT计算时间: {numpy_fft_time:.6f}s")
    print(f"滤波器设计和应用: {filter_time:.6f}s")

except ImportError:
    print("SciPy未安装,跳过SciPy集成示例")

7.3 机器学习库集成

# 19. 机器学习库集成
print("\n🤖 机器学习库集成:")

try:
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    
    print("1. Scikit-learn集成:")
    
    # 生成数据集
    X, y = make_classification(n_samples=10000, n_features=20, 
                              n_informative=10, n_redundant=10, 
                              random_state=42)
    
    print(f"数据集形状: {X.shape}")
    print(f"数据类型: {X.dtype}")
    
    # 数据预处理优化
    start = time.time()
    
    # 标准化(NumPy实现)
    X_mean = np.mean(X, axis=0)
    X_std = np.std(X, axis=0)
    X_normalized = (X - X_mean) / X_std
    
    preprocess_time = time.time() - start
    
    # 训练测试分割
    X_train, X_test, y_train, y_test = train_test_split(
        X_normalized, y, test_size=0.2, random_state=42)
    
    # 模型训练
    start = time.time()
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    train_time = time.time() - start
    
    # 预测
    start = time.time()
    y_pred = model.predict(X_test)
    predict_time = time.time() - start
    
    accuracy = accuracy_score(y_test, y_pred)
    
    print(f"预处理时间: {preprocess_time:.6f}s")
    print(f"训练时间: {train_time:.6f}s")
    print(f"预测时间: {predict_time:.6f}s")
    print(f"准确率: {accuracy:.4f}")
    
    # 特征重要性分析
    feature_importance = model.feature_importances_
    top_features = np.argsort(feature_importance)[-5:]
    
    print(f"前5个重要特征: {top_features}")
    print(f"重要性分数: {feature_importance[top_features]}")

except ImportError:
    print("Scikit-learn未安装,跳过机器学习集成示例")

8. 本章小结

8.1 核心知识点

  1. 性能基础理论

    • 向量化编程的重要性
    • 内存布局和缓存效率
    • 数据类型对性能的影响
  2. 向量化编程技术

    • 高级向量化技巧
    • 广播优化策略
    • 通用函数(ufunc)的使用
  3. 内存管理优化

    • 内存使用分析和监控
    • 就地操作和视图优化
    • 缓存友好的编程模式
  4. 并行计算优化

    • 多线程配置和优化
    • 手动并行化技术
    • GPU加速的概念和应用
  5. 高级索引和广播

    • 高效的索引策略
    • 广播性能优化
    • 复杂数据操作技巧
  6. 性能分析和调试

    • 性能分析工具的使用
    • 内存泄漏检测
    • 基准测试方法

8.2 最佳实践

  • 🚀 优先向量化: 避免显式循环,使用NumPy的向量化操作
  • 💾 内存效率: 使用就地操作,避免不必要的数组复制
  • 🔧 数据类型优化: 根据精度需求选择合适的数据类型
  • 📊 性能监控: 定期分析代码性能,识别瓶颈
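
下面用一个极简的代码片段把前几条实践串起来(仅作示意, 变量名为演示用途):

vals = np.random.random(1_000_000).astype(np.float32)  # 按精度需求选择float32
buf = np.empty_like(vals)
np.multiply(vals, 2.0, out=buf)   # 向量化, 并用out参数避免临时数组
buf += 1.0                        # 就地操作, 不产生副本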

8.3 常见陷阱

  • 过度优化: 在性能不是瓶颈时进行不必要的优化
  • 内存泄漏: 忘记清理大型数组和临时变量
  • 缓存不友好: 不考虑内存访问模式的影响
  • 盲目并行: 在不适合的场景下使用并行计算

8.4 下一步学习

  • 🔬 深入学习特定领域的NumPy应用
  • 🌐 探索分布式计算框架
  • 🤖 学习深度学习框架中的NumPy应用
  • 📚 研究数值计算的理论基础

9. 练习题

9.1 基础练习

  1. 向量化优化

    • 将循环代码转换为向量化操作
    • 比较性能差异
    • 分析内存使用
  2. 内存管理

    • 实现内存高效的算法
    • 检测和修复内存泄漏
    • 优化大数据处理
  3. 性能分析

    • 使用profiling工具分析代码
    • 识别性能瓶颈
    • 实施优化策略

9.2 进阶练习

  1. 并行计算

    • 实现多线程数值计算
    • 比较不同并行策略
    • 优化线程配置
  2. 缓存优化

    • 设计缓存友好的算法
    • 分析内存访问模式
    • 实现分块处理
  3. 库集成

    • 优化与其他库的交互
    • 实现高效的数据转换
    • 设计性能测试套件

9.3 挑战练习

  1. 自定义ufunc

    • 实现高性能的自定义函数
    • 使用Cython或Numba加速(可参考本节末尾的Numba示意代码)
    • 与NumPy内置函数比较
  2. 大规模优化

    • 处理超大数据集
    • 实现内存映射算法
    • 设计分布式计算方案
  3. 实时性能

    • 实现实时数据处理
    • 优化延迟和吞吐量
    • 设计性能监控系统
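
针对挑战练习1中的Numba加速, 下面给出一个最小示意(假设已安装numba, 函数与参数仅作演示, 并非标准答案):

try:
    from numba import njit

    @njit(cache=True)
    def sigmoid_numba(x):
        out = np.empty_like(x)
        for i in range(x.shape[0]):
            out[i] = 1.0 / (1.0 + np.exp(-x[i]))
        return out

    x = np.random.uniform(-10, 10, 1_000_000)
    sigmoid_numba(x)  # 首次调用触发JIT编译

    start = time.time()
    numba_result = sigmoid_numba(x)
    numba_time = time.time() - start

    start = time.time()
    np_sigmoid_result = 1 / (1 + np.exp(-x))
    np_sigmoid_time = time.time() - start

    print(f"Numba版本: {numba_time:.6f}s, NumPy版本: {np_sigmoid_time:.6f}s")
    print(f"结果一致性: {np.allclose(numba_result, np_sigmoid_result)}")
except ImportError:
    print("Numba未安装, 跳过该示例")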

恭喜您完成第8章的学习! 🎉

您已经掌握了NumPy的性能优化技术和高级使用技巧,这些知识将帮助您编写高效、可扩展的数值计算代码。

NumPy教程完结! 🎊

通过这8章的学习,您已经全面掌握了NumPy的核心概念、基本操作、高级功能和性能优化技术。现在您可以:

  • 高效地处理大规模数值数据
  • 实现复杂的数学和科学计算
  • 优化代码性能和内存使用
  • 与其他科学计算库无缝集成

继续您的数据科学和科学计算之旅吧! 🚀

# 创建测试数据
n_samples = 10000
n_features = 100
X = np.random.random((n_samples, n_features))
weights = np.random.random(n_features)
bias = np.random.random()

# 方法1: 显式循环
def explicit_loop(X, weights, bias):
    result = np.zeros(X.shape[0])
    for i in range(X.shape[0]):
        result[i] = np.dot(X[i], weights) + bias
    return result

# 方法2: 矩阵乘法 + 广播
def vectorized_matmul(X, weights, bias):
    return np.dot(X, weights) + bias

# 方法3: 使用einsum
def einsum_method(X, weights, bias):
    return np.einsum('ij,j->i', X, weights) + bias

# 性能比较
methods = [
    ("显式循环", explicit_loop),
    ("矩阵乘法+广播", vectorized_matmul),
    ("einsum", einsum_method)
]

print("方法\t\t执行时间\t速度提升")
print("-" * 40)

baseline_time = None
for name, func in methods:
    start = time.time()
    result = func(X, weights, bias)
    exec_time = time.time() - start

    if baseline_time is None:
        baseline_time = exec_time
        speedup = 1.0
    else:
        speedup = baseline_time / exec_time

    print(f"{name:>12}\t{exec_time:.6f}s\t{speedup:.1f}x")

# 复杂广播示例
print(f"\n2. 复杂广播示例:")

# 计算所有样本对之间的欧氏距离
def pairwise_distances_loop(X):
    n = X.shape[0]
    distances = np.zeros((n, n))
    for i in range(n):
        for j in range(n):
            distances[i, j] = np.sqrt(np.sum((X[i] - X[j]) ** 2))
    return distances

def pairwise_distances_broadcast(X):
    # 使用广播计算所有对之间的距离
    X_expanded = X[:, np.newaxis, :]    # (n, 1, d)
    X_transposed = X[np.newaxis, :, :]  # (1, n, d)
    diff = X_expanded - X_transposed    # (n, n, d)
    distances = np.sqrt(np.sum(diff ** 2, axis=2))  # (n, n)
    return distances

def pairwise_distances_cdist(X):
    from scipy.spatial.distance import cdist
    return cdist(X, X)

# 小规模测试
X_small = X[:100]  # 只取100个样本进行测试

print("计算100x100距离矩阵:")
print("方法\t\t执行时间\t速度提升")
print("-" * 40)

methods_dist = [
    ("双重循环", pairwise_distances_loop),
    ("广播向量化", pairwise_distances_broadcast),
]

try:
    from scipy.spatial.distance import cdist
    methods_dist.append(("scipy.cdist", pairwise_distances_cdist))
except ImportError:
    pass

baseline_time = None
for name, func in methods_dist:
    start = time.time()
    result = func(X_small)
    exec_time = time.time() - start

    if baseline_time is None:
        baseline_time = exec_time
        speedup = 1.0
    else:
        speedup = baseline_time / exec_time

    print(f"{name:>12}\t{exec_time:.6f}s\t{speedup:.1f}x")

2.3 通用函数(ufunc)优化

# 6. 通用函数(ufunc)优化
print("\n🔧 通用函数(ufunc)优化:")

# 自定义ufunc
print("1. 自定义ufunc:")

# Python函数
def sigmoid_python(x):
    return 1 / (1 + np.exp(-x))

# 向量化版本
sigmoid_vectorized = np.vectorize(sigmoid_python)

# NumPy优化版本
def sigmoid_numpy(x):
    return 1 / (1 + np.exp(-x))

# 数值稳定版本
def sigmoid_stable(x):
    # np.where会对两个分支整体求值, 大幅度的|x|会触发exp溢出警告, 这里显式屏蔽(结果仍然正确)
    with np.errstate(over='ignore'):
        return np.where(x >= 0,
                        1 / (1 + np.exp(-x)),
                        np.exp(x) / (1 + np.exp(x)))

# 性能测试
test_data = np.random.uniform(-10, 10, 1000000)

methods = [
    ("Python函数", lambda x: [sigmoid_python(xi) for xi in x]),
    ("np.vectorize", sigmoid_vectorized),
    ("NumPy优化", sigmoid_numpy),
    ("数值稳定版", sigmoid_stable)
]

print("方法\t\t执行时间\t速度提升")
print("-" * 40)

baseline_time = None
for name, func in methods:
    start = time.time()
    if name == "Python函数":
        result = np.array(func(test_data[:1000]))  # 只测试1000个元素
        exec_time = (time.time() - start) * 1000  # 按比例放大
    else:
        result = func(test_data)
        exec_time = time.time() - start
    
    if baseline_time is None:
        baseline_time = exec_time
        speedup = 1.0
    else:
        speedup = baseline_time / exec_time
    
    print(f"{name:>12}\t{exec_time:.6f}s\t{speedup:.1f}x")

# ufunc的高级特性
print(f"\n2. ufunc的高级特性:")

# reduce操作
arr = np.random.random(1000000)
print("reduce操作比较:")

start = time.time()
sum_builtin = sum(arr)
time_builtin = time.time() - start

start = time.time()
sum_numpy = np.sum(arr)
time_numpy = time.time() - start

start = time.time()
sum_add_reduce = np.add.reduce(arr)
time_reduce = time.time() - start

print(f"Python内置sum: {time_builtin:.6f}s")
print(f"np.sum: {time_numpy:.6f}s")
print(f"np.add.reduce: {time_reduce:.6f}s")

# accumulate操作
print(f"\naccumulate操作:")
small_arr = np.arange(1000)

start = time.time()
cumsum_numpy = np.cumsum(small_arr)
time_cumsum = time.time() - start

start = time.time()
cumsum_accumulate = np.add.accumulate(small_arr)
time_accumulate = time.time() - start

print(f"np.cumsum: {time_cumsum:.6f}s")
print(f"np.add.accumulate: {time_accumulate:.6f}s")

# outer操作
print(f"\nouter操作:")
x = np.arange(1000)
y = np.arange(1000)

start = time.time()
outer_manual = x[:, np.newaxis] * y[np.newaxis, :]
time_manual = time.time() - start

start = time.time()
outer_ufunc = np.multiply.outer(x, y)
time_outer = time.time() - start

print(f"手动广播: {time_manual:.6f}s")
print(f"np.multiply.outer: {time_outer:.6f}s")

3. 内存管理优化

3.1 内存使用分析

# 7. 内存使用分析
print("\n💾 内存使用分析:")

def get_memory_usage():
    """获取当前内存使用量(MB)"""
    process = psutil.Process()
    return process.memory_info().rss / 1024 / 1024

# 内存使用监控
print("1. 内存使用监控:")

initial_memory = get_memory_usage()
print(f"初始内存使用: {initial_memory:.2f} MB")

# 创建大数组
print(f"\n创建大数组:")
sizes = [1000, 5000, 10000, 20000]
memory_usage = []

for size in sizes:
    arr = np.random.random((size, size))
    current_memory = get_memory_usage()
    array_memory = current_memory - initial_memory
    memory_usage.append(array_memory)
    
    expected_memory = arr.nbytes / 1024 / 1024
    overhead = array_memory - expected_memory
    
    print(f"数组大小: {size}x{size}")
    print(f"  预期内存: {expected_memory:.2f} MB")
    print(f"  实际内存: {array_memory:.2f} MB")
    print(f"  内存开销: {overhead:.2f} MB ({overhead/expected_memory*100:.1f}%)")
    
    del arr
    gc.collect()

# 内存碎片化分析
print(f"\n2. 内存碎片化分析:")

# 创建多个小数组
small_arrays = []
memory_before = get_memory_usage()

for i in range(1000):
    arr = np.random.random((100, 100))
    small_arrays.append(arr)

memory_after = get_memory_usage()
total_array_memory = sum(arr.nbytes for arr in small_arrays) / 1024 / 1024
actual_memory_increase = memory_after - memory_before

print(f"1000个小数组:")
print(f"  数组总内存: {total_array_memory:.2f} MB")
print(f"  实际内存增加: {actual_memory_increase:.2f} MB")
print(f"  碎片化开销: {actual_memory_increase - total_array_memory:.2f} MB")

# 清理内存
del small_arrays
gc.collect()

# 创建一个大数组
large_array = np.random.random((int(np.sqrt(1000) * 100), int(np.sqrt(1000) * 100)))
memory_large = get_memory_usage()
large_array_memory = large_array.nbytes / 1024 / 1024
actual_large_increase = memory_large - memory_before

print(f"\n1个大数组:")
print(f"  数组内存: {large_array_memory:.2f} MB")
print(f"  实际内存增加: {actual_large_increase:.2f} MB")
print(f"  内存效率: {large_array_memory/actual_large_increase*100:.1f}%")

del large_array
gc.collect()

3.2 内存优化技术

# 8. 内存优化技术
print("\n🔧 内存优化技术:")

# 就地操作优化
print("1. 就地操作优化:")

# 创建测试数据
n = 5000
A = np.random.random((n, n))
B = np.random.random((n, n))

memory_before = get_memory_usage()

# 非就地操作
start = time.time()
C1 = A + B
C1 = C1 * 2
C1 = np.sqrt(C1)
time_copy = time.time() - start
memory_copy = get_memory_usage() - memory_before

# 就地操作
A_copy = A.copy()
B_copy = B.copy()
memory_before = get_memory_usage()

start = time.time()
A_copy += B_copy
A_copy *= 2
np.sqrt(A_copy, out=A_copy)
time_inplace = time.time() - start
memory_inplace = get_memory_usage() - memory_before

print(f"非就地操作:")
print(f"  时间: {time_copy:.4f}s")
print(f"  内存增加: {memory_copy:.2f} MB")

print(f"就地操作:")
print(f"  时间: {time_inplace:.4f}s")
print(f"  内存增加: {memory_inplace:.2f} MB")
print(f"  内存节省: {memory_copy - memory_inplace:.2f} MB")

# 验证结果一致性
assert np.allclose(C1, A_copy)

# 视图 vs 副本
print(f"\n2. 视图 vs 副本:")

original = np.random.random((1000, 1000))
memory_before = get_memory_usage()

# 创建视图
view = original[::2, ::2]
memory_view = get_memory_usage() - memory_before

# 创建副本
copy = original[::2, ::2].copy()
memory_copy = get_memory_usage() - memory_view - memory_before

print(f"原数组内存: {original.nbytes / 1024 / 1024:.2f} MB")
print(f"视图额外内存: {memory_view:.2f} MB")
print(f"副本额外内存: {memory_copy:.2f} MB")

# 修改测试
original[0, 0] = 999
print(f"修改原数组后:")
print(f"  视图受影响: {view[0, 0] == 999}")
print(f"  副本不受影响: {copy[0, 0] != 999}")

# 内存映射优化
print(f"\n3. 内存映射优化:")

# 创建大文件
large_data = np.random.random((10000, 1000))
np.save('large_file.npy', large_data)
del large_data
gc.collect()

memory_before = get_memory_usage()

# 普通加载
loaded_array = np.load('large_file.npy')
memory_loaded = get_memory_usage() - memory_before

# 内存映射加载
memory_before = get_memory_usage()
mmap_array = np.load('large_file.npy', mmap_mode='r')
memory_mmap = get_memory_usage() - memory_before

print(f"普通加载内存: {memory_loaded:.2f} MB")
print(f"内存映射内存: {memory_mmap:.2f} MB")
print(f"内存节省: {memory_loaded - memory_mmap:.2f} MB")

# 访问性能比较
start = time.time()
subset_loaded = loaded_array[1000:2000, 100:200]
time_loaded = time.time() - start

start = time.time()
subset_mmap = mmap_array[1000:2000, 100:200]
time_mmap = time.time() - start

print(f"普通数组访问时间: {time_loaded:.6f}s")
print(f"内存映射访问时间: {time_mmap:.6f}s")

del loaded_array, mmap_array
gc.collect()
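
# 除了mmap_mode加载, 也可以用np.memmap直接创建可超出内存的数组(一个最小示意)
mm = np.memmap('large_buffer.dat', dtype='float64', mode='w+', shape=(10000, 1000))
mm[:100, :] = np.random.random((100, 1000))  # 只有写入的部分会占用内存
mm.flush()  # 将修改写回磁盘
del mm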

3.3 缓存优化

# 9. 缓存优化
print("\n🗄️ 缓存优化:")

# 缓存友好的数据访问模式
print("1. 缓存友好的数据访问模式:")

# 创建大矩阵
size = 2000
matrix = np.random.random((size, size))

# 按行访问(缓存友好)
start = time.time()
row_sum = 0
for i in range(size):
    row_sum += np.sum(matrix[i, :])
time_row = time.time() - start

# 按列访问(缓存不友好)
start = time.time()
col_sum = 0
for j in range(size):
    col_sum += np.sum(matrix[:, j])
time_col = time.time() - start

# 向量化访问
start = time.time()
total_sum = np.sum(matrix)
time_vectorized = time.time() - start

print(f"按行访问时间: {time_row:.4f}s")
print(f"按列访问时间: {time_col:.4f}s")
print(f"向量化访问时间: {time_vectorized:.4f}s")
print(f"行/列访问比: {time_col/time_row:.2f}")

# 分块处理优化
print(f"\n2. 分块处理优化:")

def matrix_multiply_naive(A, B):
    """朴素矩阵乘法"""
    n, m, p = A.shape[0], A.shape[1], B.shape[1]
    C = np.zeros((n, p))
    for i in range(n):
        for j in range(p):
            for k in range(m):
                C[i, j] += A[i, k] * B[k, j]
    return C

def matrix_multiply_blocked(A, B, block_size=64):
    """分块矩阵乘法"""
    n, m, p = A.shape[0], A.shape[1], B.shape[1]
    C = np.zeros((n, p))
    
    for i in range(0, n, block_size):
        for j in range(0, p, block_size):
            for k in range(0, m, block_size):
                i_end = min(i + block_size, n)
                j_end = min(j + block_size, p)
                k_end = min(k + block_size, m)
                
                C[i:i_end, j:j_end] += np.dot(A[i:i_end, k:k_end], 
                                             B[k:k_end, j:j_end])
    return C

# 小规模测试
size = 200
A_small = np.random.random((size, size))
B_small = np.random.random((size, size))

# NumPy内置
start = time.time()
C_numpy = np.dot(A_small, B_small)
time_numpy = time.time() - start

# 分块算法
start = time.time()
C_blocked = matrix_multiply_blocked(A_small, B_small, block_size=32)
time_blocked = time.time() - start

print(f"NumPy内置: {time_numpy:.4f}s")
print(f"分块算法: {time_blocked:.4f}s")
print(f"结果一致性: {np.allclose(C_numpy, C_blocked)}")

# 不同分块大小的性能
block_sizes = [16, 32, 64, 128]
print(f"\n不同分块大小的性能:")
print("分块大小\t执行时间\t相对性能")

for block_size in block_sizes:
    start = time.time()
    C_test = matrix_multiply_blocked(A_small, B_small, block_size)
    exec_time = time.time() - start
    relative_perf = time_numpy / exec_time
    print(f"{block_size:>6}\t{exec_time:.4f}s\t{relative_perf:.2f}x")

4. 并行计算优化

4.1 多线程优化

# 10. 多线程优化
print("\n🧵 多线程优化:")

import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp

# NumPy/BLAS线程控制
print("1. NumPy/BLAS线程控制:")

# NumPy本身没有np.set_num_threads这类API; 矩阵乘法等操作的线程数由底层BLAS库决定,
# 通常通过OMP_NUM_THREADS等环境变量或threadpoolctl在运行时控制
try:
    from threadpoolctl import threadpool_limits
except ImportError:
    threadpool_limits = None
    print("未安装threadpoolctl, 将使用默认线程数")

print(f"CPU核心数: {mp.cpu_count()}")

# 矩阵乘法性能测试
size = 1000
A = np.random.random((size, size))
B = np.random.random((size, size))

# 测试不同线程数的性能
thread_counts = [1, 2, 4, 8]
times = []

for num_threads in thread_counts:
    start = time.time()
    if threadpool_limits is not None:
        with threadpool_limits(limits=num_threads):
            for _ in range(5):  # 多次测试取平均
                C = np.dot(A, B)
    else:
        for _ in range(5):
            C = np.dot(A, B)
    avg_time = (time.time() - start) / 5
    times.append(avg_time)

    print(f"线程数: {num_threads}, 平均时间: {avg_time:.4f}s")

# 可视化性能
plt.figure(figsize=(10, 6))
plt.plot(thread_counts, times, 'o-', linewidth=2, markersize=8)
plt.xlabel('线程数')
plt.ylabel('执行时间 (秒)')
plt.title('NumPy多线程性能')
plt.grid(True)
plt.savefig('numpy_threading_performance.png', dpi=300, bbox_inches='tight')
plt.show()

# 手动并行化
print(f"\n2. 手动并行化:")

def parallel_sum_chunks(arr, num_workers=4):
    """并行计算数组和"""
    chunk_size = len(arr) // num_workers
    chunks = [arr[i:i+chunk_size] for i in range(0, len(arr), chunk_size)]
    
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(np.sum, chunk) for chunk in chunks]
        results = [future.result() for future in futures]
    
    return sum(results)

def parallel_sum_processes(arr, num_workers=4):
    """使用进程池并行计算"""
    chunk_size = len(arr) // num_workers
    chunks = [arr[i:i+chunk_size] for i in range(0, len(arr), chunk_size)]
    
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(np.sum, chunk) for chunk in chunks]
        results = [future.result() for future in futures]
    
    return sum(results)

# 性能测试
large_array = np.random.random(10000000)

# 串行计算
start = time.time()
serial_result = np.sum(large_array)
serial_time = time.time() - start

# 线程并行
start = time.time()
thread_result = parallel_sum_chunks(large_array, num_workers=4)
thread_time = time.time() - start

# 进程并行
start = time.time()
process_result = parallel_sum_processes(large_array, num_workers=4)
process_time = time.time() - start

print(f"串行计算: {serial_time:.4f}s")
print(f"线程并行: {thread_time:.4f}s (速度提升: {serial_time/thread_time:.2f}x)")
print(f"进程并行: {process_time:.4f}s (速度提升: {serial_time/process_time:.2f}x)")

# 验证结果(浮点求和顺序不同, 用相对容差比较)
print(f"线程结果一致性: {np.isclose(serial_result, thread_result)}")
print(f"进程结果一致性: {np.isclose(serial_result, process_result)}")

4.2 GPU加速(概念性)

# 11. GPU加速概念
print("\n🚀 GPU加速概念:")

# 模拟GPU加速的概念
print("GPU加速的优势和考虑因素:")

# CPU vs GPU特性比较
cpu_cores = mp.cpu_count()
gpu_cores = 2048  # 假设的GPU核心数

print(f"CPU核心数: {cpu_cores}")
print(f"GPU核心数: {gpu_cores} (假设)")
print(f"并行度差异: {gpu_cores/cpu_cores:.0f}x")

# 数据传输开销模拟
def simulate_gpu_computation(data_size, computation_intensity):
    """模拟GPU计算的开销"""
    # 数据传输时间(模拟)
    transfer_time = data_size * 1e-9  # 假设传输速度
    
    # CPU计算时间
    cpu_time = computation_intensity * data_size * 1e-8
    
    # GPU计算时间(假设100x加速)
    gpu_compute_time = cpu_time / 100
    
    # 总GPU时间 = 传输时间 + 计算时间
    total_gpu_time = 2 * transfer_time + gpu_compute_time  # 双向传输
    
    return cpu_time, total_gpu_time, transfer_time, gpu_compute_time

# 不同数据大小的分析
data_sizes = [1000, 10000, 100000, 1000000, 10000000]
computation_intensities = [1, 10, 100]  # 不同计算强度

print(f"\nGPU加速效果分析:")
print("数据大小\t计算强度\tCPU时间\tGPU时间\t加速比\t传输占比")
print("-" * 70)

for comp_intensity in computation_intensities:
    print(f"\n计算强度: {comp_intensity}")
    for data_size in data_sizes:
        cpu_time, gpu_time, transfer_time, gpu_compute_time = \
            simulate_gpu_computation(data_size, comp_intensity)
        
        speedup = cpu_time / gpu_time
        transfer_ratio = (2 * transfer_time) / gpu_time * 100
        
        print(f"{data_size:>8}\t{comp_intensity:>6}\t{cpu_time:.6f}s\t"
              f"{gpu_time:.6f}s\t{speedup:.2f}x\t{transfer_ratio:.1f}%")

# GPU适用性分析
print(f"\nGPU加速适用性分析:")
print("✅ 适合GPU的场景:")
print("  - 大规模并行计算")
print("  - 计算密集型任务")
print("  - 数据规模大,传输开销相对较小")
print("  - 重复性计算")

print("\n❌ 不适合GPU的场景:")
print("  - 小规模数据")
print("  - 频繁的CPU-GPU数据传输")
print("  - 复杂的控制流")
print("  - 内存访问模式不规则")

# 实际GPU库的使用建议
print(f"\n实际GPU库使用建议:")
print("1. CuPy: NumPy-like GPU库")
print("   - 语法与NumPy几乎相同")
print("   - 适合现有NumPy代码迁移")

print("\n2. JAX: 可编译的NumPy")
print("   - 支持JIT编译")
print("   - 自动微分")
print("   - 支持GPU/TPU")

print("\n3. TensorFlow/PyTorch:")
print("   - 深度学习框架")
print("   - 内置GPU支持")
print("   - 适合机器学习任务")

5. 高级索引和广播技巧

5.1 高级索引优化

# 12. 高级索引优化
print("\n🎯 高级索引优化:")

# 布尔索引优化
print("1. 布尔索引优化:")

# 创建测试数据
n = 1000000
data = np.random.random(n)
threshold = 0.5

# 方法1: 布尔索引
start = time.time()
mask = data > threshold
filtered_bool = data[mask]
time_bool = time.time() - start

# 方法2: np.where
start = time.time()
indices = np.where(data > threshold)[0]
filtered_where = data[indices]
time_where = time.time() - start

# 方法3: np.compress
start = time.time()
filtered_compress = np.compress(data > threshold, data)
time_compress = time.time() - start

# 方法4: 列表推导(对比)
start = time.time()
filtered_list = np.array([x for x in data if x > threshold])
time_list = time.time() - start

print(f"布尔索引: {time_bool:.6f}s")
print(f"np.where: {time_where:.6f}s")
print(f"np.compress: {time_compress:.6f}s")
print(f"列表推导: {time_list:.6f}s")

# 验证结果一致性
assert np.array_equal(filtered_bool, filtered_where)
assert np.array_equal(filtered_bool, filtered_compress)
assert np.allclose(filtered_bool, filtered_list)

# 多条件索引优化
print(f"\n2. 多条件索引优化:")

# 创建2D数据
rows, cols = 10000, 100
data_2d = np.random.random((rows, cols))
condition1 = np.random.random(rows) > 0.3
condition2 = np.random.random(rows) > 0.7

# 方法1: 逐步过滤
start = time.time()
mask1 = condition1
mask2 = condition2
combined_mask = mask1 & mask2
result1 = data_2d[combined_mask]
time_step = time.time() - start

# 方法2: 直接组合
start = time.time()
result2 = data_2d[condition1 & condition2]
time_direct = time.time() - start

# 方法3: np.logical_and
start = time.time()
combined_logical = np.logical_and(condition1, condition2)
result3 = data_2d[combined_logical]
time_logical = time.time() - start

print(f"逐步过滤: {time_step:.6f}s")
print(f"直接组合: {time_direct:.6f}s")
print(f"logical_and: {time_logical:.6f}s")

# 花式索引优化
print(f"\n3. 花式索引优化:")

# 创建测试数据
data_large = np.random.random((10000, 1000))
indices = np.random.randint(0, 10000, 5000)

# 方法1: 花式索引
start = time.time()
result_fancy = data_large[indices]
time_fancy = time.time() - start

# 方法2: np.take
start = time.time()
result_take = np.take(data_large, indices, axis=0)
time_take = time.time() - start

# 方法3: 循环(对比)
start = time.time()
result_loop = np.array([data_large[i] for i in indices])
time_loop = time.time() - start

print(f"花式索引: {time_fancy:.6f}s")
print(f"np.take: {time_take:.6f}s")
print(f"循环方式: {time_loop:.6f}s")

# 验证结果
assert np.array_equal(result_fancy, result_take)
assert np.array_equal(result_fancy, result_loop)

5.2 广播性能优化

# 13. 广播性能优化
print("\n📡 广播性能优化:")

# 广播内存效率
print("1. 广播内存效率:")

# 创建测试数据
n, m = 5000, 1000
matrix = np.random.random((n, m))
vector = np.random.random(m)

# 方法1: 显式扩展
start = time.time()
vector_expanded = np.tile(vector, (n, 1))
result1 = matrix + vector_expanded
time_explicit = time.time() - start
memory_explicit = vector_expanded.nbytes / 1024 / 1024

# 方法2: 广播
start = time.time()
result2 = matrix + vector
time_broadcast = time.time() - start
memory_broadcast = 0  # 广播不创建额外数组

print(f"显式扩展:")
print(f"  时间: {time_explicit:.6f}s")
print(f"  额外内存: {memory_explicit:.2f} MB")

print(f"广播:")
print(f"  时间: {time_broadcast:.6f}s")
print(f"  额外内存: {memory_broadcast:.2f} MB")
print(f"  速度提升: {time_explicit/time_broadcast:.2f}x")

# 验证结果
assert np.allclose(result1, result2)
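
# 广播相当于零拷贝的"虚拟扩展", 可以用np.broadcast_to显式查看这一点
virtual = np.broadcast_to(vector, (n, m))
print(f"broadcast_to形状: {virtual.shape}, strides: {virtual.strides} (第一维步长为0, 不占额外内存)")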

# 复杂广播优化
print(f"\n2. 复杂广播优化:")

# 三维广播示例
shape1 = (100, 1, 50)
shape2 = (1, 200, 50)
shape3 = (100, 200, 1)

arr1 = np.random.random(shape1)
arr2 = np.random.random(shape2)
arr3 = np.random.random(shape3)

# 方法1: 逐步计算
start = time.time()
temp1 = arr1 + arr2  # 广播到 (100, 200, 50)
result_step = temp1 + arr3  # 广播到 (100, 200, 50)
time_step = time.time() - start

# 方法2: 一次性计算
start = time.time()
result_once = arr1 + arr2 + arr3
time_once = time.time() - start

print(f"逐步计算: {time_step:.6f}s")
print(f"一次性计算: {time_once:.6f}s")
print(f"性能差异: {time_step/time_once:.2f}x")

# 验证结果
assert np.allclose(result_step, result_once)

# 广播形状优化
print(f"\n3. 广播形状优化:")

# 不同的广播策略
data = np.random.random((1000, 1000))
weights_row = np.random.random((1, 1000))  # 行权重
weights_col = np.random.random((1000, 1))  # 列权重

# 策略1: 分别应用权重
start = time.time()
weighted1 = data * weights_row
weighted1 = weighted1 * weights_col
time_separate = time.time() - start

# 策略2: 组合权重
start = time.time()
combined_weights = weights_row * weights_col
weighted2 = data * combined_weights
time_combined = time.time() - start

# 策略3: 直接广播
start = time.time()
weighted3 = data * weights_row * weights_col
time_direct = time.time() - start

print(f"分别应用: {time_separate:.6f}s")
print(f"组合权重: {time_combined:.6f}s")
print(f"直接广播: {time_direct:.6f}s")

# 验证结果
assert np.allclose(weighted1, weighted2)
assert np.allclose(weighted1, weighted3)

6. 性能分析和调试

6.1 性能分析工具

# 14. 性能分析工具
print("\n🔍 性能分析工具:")

import cProfile
import pstats
try:
    from line_profiler import LineProfiler  # 可选依赖, 用于逐行分析
except ImportError:
    LineProfiler = None

# 创建测试函数
def matrix_operations():
    """复杂的矩阵操作函数"""
    n = 1000
    
    # 创建矩阵
    A = np.random.random((n, n))
    B = np.random.random((n, n))
    
    # 矩阵乘法
    C = np.dot(A, B)
    
    # 特征值计算
    eigenvals = np.linalg.eigvals(A)
    
    # 统计计算
    mean_vals = np.mean(C, axis=0)
    std_vals = np.std(C, axis=0)
    
    # 条件操作
    result = np.where(C > np.mean(C), C, 0)
    
    return result

# cProfile分析
print("1. cProfile性能分析:")

# 运行性能分析
profiler = cProfile.Profile()
profiler.enable()

result = matrix_operations()

profiler.disable()

# 保存和显示结果
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10)  # 显示前10个最耗时的函数
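
# 逐行分析示意: 上面导入的LineProfiler可给出函数内每一行的耗时(假设已安装line_profiler)
if LineProfiler is not None:
    lp = LineProfiler()
    lp_wrapped = lp(matrix_operations)  # 包装目标函数并记录逐行耗时
    lp_wrapped()
    lp.print_stats()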

# 时间测量装饰器
def timing_decorator(func):
    """计时装饰器"""
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} 执行时间: {end - start:.6f}s")
        return result
    return wrapper

@timing_decorator
def optimized_matrix_ops():
    """优化的矩阵操作"""
    n = 1000
    
    # 使用更高效的数据类型
    A = np.random.random((n, n)).astype(np.float32)
    B = np.random.random((n, n)).astype(np.float32)
    
    # 矩阵乘法
    C = np.dot(A, B)
    
    # 使用就地操作
    mean_val = np.mean(C)
    np.subtract(C, mean_val, out=C)
    np.maximum(C, 0, out=C)
    
    return C

print(f"\n2. 优化前后对比:")
print("原始版本:")
original_result = matrix_operations()

print("优化版本:")
optimized_result = optimized_matrix_ops()

6.2 内存分析

# 15. 内存分析
print("\n💾 内存分析:")

# 内存使用分析函数
def analyze_memory_usage(func, *args, **kwargs):
    """分析函数的内存使用"""
    import tracemalloc
    
    # 开始内存跟踪
    tracemalloc.start()
    
    # 记录初始内存
    initial_memory = get_memory_usage()
    
    # 执行函数
    start_time = time.time()
    result = func(*args, **kwargs)
    execution_time = time.time() - start_time
    
    # 记录最终内存
    final_memory = get_memory_usage()
    
    # 获取内存统计
    current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    
    print(f"函数: {func.__name__}")
    print(f"  执行时间: {execution_time:.6f}s")
    print(f"  内存变化: {final_memory - initial_memory:.2f} MB")
    print(f"  峰值内存: {peak / 1024 / 1024:.2f} MB")
    print(f"  当前内存: {current / 1024 / 1024:.2f} MB")
    
    return result

# 测试不同的内存使用模式
def memory_efficient_function():
    """内存高效的函数"""
    # 使用生成器和就地操作
    n = 5000
    result = 0
    
    # 分块处理
    chunk_size = 1000
    for i in range(0, n, chunk_size):
        chunk = np.random.random((chunk_size, chunk_size))
        result += np.sum(chunk)
        del chunk  # 显式删除
        gc.collect()
    
    return result

def memory_intensive_function():
    """内存密集的函数"""
    # 创建多个大数组
    arrays = []
    for i in range(5):
        arr = np.random.random((2000, 2000))
        arrays.append(arr)
    
    # 计算结果
    result = sum(np.sum(arr) for arr in arrays)
    return result

print("内存高效版本:")
result1 = analyze_memory_usage(memory_efficient_function)

print("\n内存密集版本:")
result2 = analyze_memory_usage(memory_intensive_function)

# 内存泄漏检测
print(f"\n3. 内存泄漏检测:")

def potential_memory_leak():
    """可能存在内存泄漏的函数"""
    arrays = []
    for i in range(100):
        arr = np.random.random((100, 100))
        arrays.append(arr)
        # 忘记清理旧数组
    return arrays[-1]  # 只返回最后一个,但保持了所有数组的引用

# 检测内存泄漏
initial_memory = get_memory_usage()
for i in range(10):
    result = potential_memory_leak()
    current_memory = get_memory_usage()
    print(f"迭代 {i+1}: 内存使用 {current_memory:.2f} MB "
          f"(增加 {current_memory - initial_memory:.2f} MB)")

# 清理内存
gc.collect()
final_memory = get_memory_usage()
print(f"垃圾回收后: {final_memory:.2f} MB")

6.3 性能基准测试

# 16. 性能基准测试
print("\n📊 性能基准测试:")

import timeit
from collections import defaultdict

class PerformanceBenchmark:
    """性能基准测试类"""

    def __init__(self):
        self.results = defaultdict(list)

    def benchmark_function(self, func, *args, name=None, number=10, **kwargs):
        """基准测试函数"""
        if name is None:
            name = func.__name__

        # 使用timeit进行精确计时
        timer = timeit.Timer(lambda: func(*args, **kwargs))
        times = timer.repeat(repeat=5, number=number)

        avg_time = sum(times) / len(times) / number
        min_time = min(times) / number
        max_time = max(times) / number

        self.results[name] = {
            'avg_time': avg_time,
            'min_time': min_time,
            'max_time': max_time,
            'std_time': np.std([t / number for t in times])
        }

        return avg_time

    def compare_functions(self, functions, *args, **kwargs):
        """比较多个函数的性能"""
        print("函数性能比较:")
        print("函数名\t\t平均时间\t最小时间\t最大时间\t标准差\t速度提升")
        print("-" * 70)

        baseline_time = None
        for name, func in functions:
            avg_time = self.benchmark_function(func, *args, name=name, **kwargs)
            result = self.results[name]

            if baseline_time is None:
                baseline_time = avg_time
                speedup = 1.0
            else:
                speedup = baseline_time / avg_time

            print(f"{name:>12}\t{result['avg_time']:.6f}s\t"
                  f"{result['min_time']:.6f}s\t{result['max_time']:.6f}s\t"
                  f"{result['std_time']:.6f}s\t{speedup:.2f}x")

    def plot_results(self):
        """绘制性能比较图"""
        names = list(self.results.keys())
        avg_times = [self.results[name]['avg_time'] for name in names]
        std_times = [self.results[name]['std_time'] for name in names]

        plt.figure(figsize=(12, 6))
        bars = plt.bar(names, avg_times, yerr=std_times, capsize=5)
        plt.ylabel('执行时间 (秒)')
        plt.title('函数性能比较')
        plt.xticks(rotation=45)

        # 添加数值标签
        for bar, avg in zip(bars, avg_times):
            plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.001,
                     f'{avg:.4f}s', ha='center', va='bottom')

        plt.tight_layout()
        plt.savefig('performance_benchmark.png', dpi=300, bbox_inches='tight')
        plt.show()

# 创建基准测试实例
benchmark = PerformanceBenchmark()

# 定义测试函数
def numpy_sum(arr):
    return np.sum(arr)

def python_sum(arr):
    return sum(arr)

def numpy_mean(arr):
    return np.mean(arr)

def manual_mean(arr):
    return np.sum(arr) / len(arr)

def numpy_std(arr):
    return np.std(arr)

def manual_std(arr):
    mean = np.mean(arr)
    return np.sqrt(np.mean((arr - mean) ** 2))

# 创建测试数据