📚 本章概述
本章将深入探讨NumPy的性能优化技术和高级使用技巧。您将学习如何编写高效的NumPy代码,理解底层实现原理,掌握内存管理和并行计算技术,以及学会使用各种高级功能来解决复杂的数值计算问题。
🎯 学习目标
- 理解NumPy的性能特点和优化原理
- 掌握向量化编程技术
- 学会内存管理和缓存优化
- 了解并行计算和多线程技术
- 掌握高级索引和广播技巧
- 学会性能分析和调试方法
- 理解与其他库的集成优化
1. 性能基础理论
1.1 NumPy性能原理
import numpy as np
import time
import matplotlib.pyplot as plt
import sys
import psutil
import gc
# memory_profiler为第三方库,本章未直接用到其@profile装饰器,未安装可保持注释
# from memory_profiler import profile
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
print("🚀 NumPy性能基础理论")
print("=" * 50)
# 1. 向量化 vs 循环比较
print("1. 向量化 vs 循环性能比较:")
def python_loop_sum(arr):
"""Python循环求和"""
total = 0
for i in range(len(arr)):
total += arr[i]
return total
def numpy_sum(arr):
"""NumPy向量化求和"""
return np.sum(arr)
# 测试不同大小的数组
sizes = [1000, 10000, 100000, 1000000]
python_times = []
numpy_times = []
speedups = []
print("数组大小\tPython循环\tNumPy向量化\t速度提升")
print("-" * 50)
for size in sizes:
arr = np.random.random(size)
# Python循环计时
start = time.time()
python_result = python_loop_sum(arr)
python_time = time.time() - start
# NumPy向量化计时
start = time.time()
numpy_result = numpy_sum(arr)
numpy_time = time.time() - start
speedup = python_time / numpy_time
python_times.append(python_time)
numpy_times.append(numpy_time)
speedups.append(speedup)
print(f"{size:>8}\t{python_time:.6f}s\t{numpy_time:.6f}s\t{speedup:.1f}x")
# 验证结果一致性
    assert np.isclose(python_result, numpy_result)  # 求和顺序不同,绝对误差可能超过1e-10,改用相对容差
# 可视化性能比较
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
ax1.loglog(sizes, python_times, 'o-', label='Python循环', linewidth=2)
ax1.loglog(sizes, numpy_times, 's-', label='NumPy向量化', linewidth=2)
ax1.set_xlabel('数组大小')
ax1.set_ylabel('执行时间 (秒)')
ax1.set_title('执行时间比较')
ax1.legend()
ax1.grid(True)
ax2.semilogx(sizes, speedups, 'o-', color='red', linewidth=2)
ax2.set_xlabel('数组大小')
ax2.set_ylabel('速度提升倍数')
ax2.set_title('NumPy速度提升')
ax2.grid(True)
plt.tight_layout()
plt.savefig('numpy_performance_comparison.png', dpi=300, bbox_inches='tight')
plt.show()
print(f"\n最大速度提升: {max(speedups):.1f}x")
1.2 内存布局和缓存效率
# 2. 内存布局和缓存效率
print("\n💾 内存布局和缓存效率:")
# C-order vs Fortran-order
print("C-order vs Fortran-order性能比较:")
size = 2000
arr_c = np.random.random((size, size)) # C-order (行优先)
arr_f = np.asfortranarray(arr_c) # Fortran-order (列优先)
print(f"C-order数组: {arr_c.flags}")
print(f"Fortran-order数组: {arr_f.flags}")
# 行遍历性能测试
def row_sum_test(arr):
"""按行求和测试"""
return np.sum(arr, axis=1)
def col_sum_test(arr):
"""按列求和测试"""
return np.sum(arr, axis=0)
# 测试C-order数组
print(f"\nC-order数组性能:")
start = time.time()
c_row_result = row_sum_test(arr_c)
c_row_time = time.time() - start
start = time.time()
c_col_result = col_sum_test(arr_c)
c_col_time = time.time() - start
print(f"按行求和: {c_row_time:.6f}s")
print(f"按列求和: {c_col_time:.6f}s")
print(f"行/列时间比: {c_col_time/c_row_time:.2f}")
# 测试Fortran-order数组
print(f"\nFortran-order数组性能:")
start = time.time()
f_row_result = row_sum_test(arr_f)
f_row_time = time.time() - start
start = time.time()
f_col_result = col_sum_test(arr_f)
f_col_time = time.time() - start
print(f"按行求和: {f_row_time:.6f}s")
print(f"按列求和: {f_col_time:.6f}s")
print(f"行/列时间比: {f_row_time/f_col_time:.2f}")
# 内存连续性测试
print(f"\n内存连续性影响:")
# 创建非连续数组
arr_strided = arr_c[::2, ::2] # 步长为2的子数组
arr_copy = arr_strided.copy() # 连续的副本
print(f"原数组连续性: {arr_c.flags['C_CONTIGUOUS']}")
print(f"步长数组连续性: {arr_strided.flags['C_CONTIGUOUS']}")
print(f"副本数组连续性: {arr_copy.flags['C_CONTIGUOUS']}")
# 性能比较
start = time.time()
strided_sum = np.sum(arr_strided)
strided_time = time.time() - start
start = time.time()
copy_sum = np.sum(arr_copy)
copy_time = time.time() - start
print(f"步长数组求和时间: {strided_time:.6f}s")
print(f"连续数组求和时间: {copy_time:.6f}s")
print(f"性能差异: {strided_time/copy_time:.2f}x")
1.3 数据类型优化
# 3. 数据类型优化
print("\n🔢 数据类型优化:")
# 不同数据类型的性能和内存使用
dtypes = [np.int8, np.int16, np.int32, np.int64,
np.float16, np.float32, np.float64]
size = 1000000
print("数据类型\t内存使用(MB)\t计算时间(s)\t精度损失")
print("-" * 55)
base_array = np.random.random(size) * 1000
for dtype in dtypes:
    # 转换数据类型(注意: 取值0~1000超出int8的表示范围-128~127,窄整数类型会溢出回绕)
    arr = base_array.astype(dtype)
# 内存使用
memory_mb = arr.nbytes / 1024 / 1024
# 计算性能测试
start = time.time()
result = np.sum(arr ** 2)
calc_time = time.time() - start
# 精度测试(与float64比较)
reference = base_array.astype(np.float64)
precision_loss = np.mean(np.abs(arr.astype(np.float64) - reference))
print(f"{dtype.__name__:>10}\t{memory_mb:>8.2f}\t{calc_time:>10.6f}\t{precision_loss:>10.6f}")
# 混合精度计算示例
print(f"\n混合精度计算示例:")
# 创建大型矩阵
n = 2000
A_f64 = np.random.random((n, n)).astype(np.float64)
B_f64 = np.random.random((n, n)).astype(np.float64)
A_f32 = A_f64.astype(np.float32)
B_f32 = B_f64.astype(np.float32)
# float64矩阵乘法
start = time.time()
C_f64 = np.dot(A_f64, B_f64)
time_f64 = time.time() - start
# float32矩阵乘法
start = time.time()
C_f32 = np.dot(A_f32, B_f32)
time_f32 = time.time() - start
# 混合精度:float32计算,float64结果
start = time.time()
C_mixed = np.dot(A_f32, B_f32).astype(np.float64)
time_mixed = time.time() - start
print(f"float64计算时间: {time_f64:.4f}s")
print(f"float32计算时间: {time_f32:.4f}s")
print(f"混合精度时间: {time_mixed:.4f}s")
print(f"float32速度提升: {time_f64/time_f32:.2f}x")
print(f"混合精度速度提升: {time_f64/time_mixed:.2f}x")
# 精度比较
error_f32 = np.mean(np.abs(C_f64 - C_f32.astype(np.float64)))
error_mixed = np.mean(np.abs(C_f64 - C_mixed))
print(f"float32精度损失: {error_f32:.2e}")
print(f"混合精度损失: {error_mixed:.2e}")
2. 向量化编程技术
2.1 高级向量化技巧
# 4. 高级向量化技巧
print("\n⚡ 高级向量化技巧:")
# 条件向量化
print("1. 条件向量化:")
# 传统方法
def traditional_conditional(arr):
result = np.zeros_like(arr)
for i in range(len(arr)):
if arr[i] > 0:
result[i] = np.sqrt(arr[i])
elif arr[i] < 0:
result[i] = -np.sqrt(-arr[i])
else:
result[i] = 0
return result
# 向量化方法
def vectorized_conditional(arr):
result = np.zeros_like(arr)
pos_mask = arr > 0
neg_mask = arr < 0
result[pos_mask] = np.sqrt(arr[pos_mask])
result[neg_mask] = -np.sqrt(-arr[neg_mask])
return result
# 使用np.where的方法
def where_conditional(arr):
    # 注意: np.where会先完整计算两个分支,负数开方会触发RuntimeWarning,这里显式屏蔽
    with np.errstate(invalid='ignore'):
        return np.where(arr > 0, np.sqrt(arr),
                        np.where(arr < 0, -np.sqrt(-arr), 0))
# 性能测试
test_arr = np.random.uniform(-10, 10, 100000)
methods = [
("传统循环", traditional_conditional),
("向量化掩码", vectorized_conditional),
("np.where", where_conditional)
]
print("方法\t\t执行时间\t速度提升")
print("-" * 40)
baseline_time = None
for name, func in methods:
start = time.time()
result = func(test_arr)
exec_time = time.time() - start
if baseline_time is None:
baseline_time = exec_time
speedup = 1.0
else:
speedup = baseline_time / exec_time
print(f"{name:>10}\t{exec_time:.6f}s\t{speedup:.1f}x")
# 验证结果一致性
results = [func(test_arr) for _, func in methods]
for i in range(1, len(results)):
assert np.allclose(results[0], results[i], rtol=1e-10)
print("✅ 所有方法结果一致")
2.2 广播优化技巧
# 5. 广播优化技巧
print("\n📡 广播优化技巧:")
# 矩阵运算优化
print("1. 矩阵运算优化:")
# 创建测试数据
test_array = np.random.random(1000000)
# 比较求和函数
print("求和函数比较:")
sum_functions = [
("numpy_sum", numpy_sum),
("python_sum", python_sum)
]
benchmark.compare_functions(sum_functions, test_array)
print("\n均值函数比较:")
mean_functions = [
("numpy_mean", numpy_mean),
("manual_mean", manual_mean)
]
benchmark.compare_functions(mean_functions, test_array)
print("\n标准差函数比较:")
std_functions = [
("numpy_std", numpy_std),
("manual_std", manual_std)
]
benchmark.compare_functions(std_functions, test_array)
# 绘制结果
benchmark.plot_results()
7. 与其他库的集成优化
7.1 Pandas集成优化
# 17. Pandas集成优化
print("\n🐼 Pandas集成优化:")
try:
import pandas as pd
# NumPy vs Pandas性能比较
print("1. NumPy vs Pandas性能比较:")
# 创建测试数据
n = 1000000
data_dict = {
'A': np.random.random(n),
'B': np.random.random(n),
'C': np.random.randint(0, 100, n)
}
# NumPy数组
np_data = np.column_stack([data_dict['A'], data_dict['B'], data_dict['C']])
# Pandas DataFrame
df = pd.DataFrame(data_dict)
# 计算操作比较
print("计算操作性能比较:")
# NumPy计算
start = time.time()
np_mean = np.mean(np_data[:, 0])
np_sum = np.sum(np_data[:, 1])
np_filtered = np_data[np_data[:, 2] > 50]
np_time = time.time() - start
# Pandas计算
start = time.time()
pd_mean = df['A'].mean()
pd_sum = df['B'].sum()
pd_filtered = df[df['C'] > 50]
pd_time = time.time() - start
print(f"NumPy计算时间: {np_time:.6f}s")
print(f"Pandas计算时间: {pd_time:.6f}s")
print(f"NumPy速度优势: {pd_time/np_time:.2f}x")
# 优化的Pandas操作
print(f"\n2. 优化的Pandas操作:")
# 使用.values访问底层NumPy数组
start = time.time()
values_mean = np.mean(df['A'].values)
values_sum = np.sum(df['B'].values)
values_time = time.time() - start
# 使用向量化操作
start = time.time()
df['D'] = df['A'] * df['B'] # 向量化乘法
df['E'] = np.where(df['C'] > 50, df['A'], 0) # 条件赋值
vectorized_time = time.time() - start
print(f"使用.values: {values_time:.6f}s")
print(f"向量化操作: {vectorized_time:.6f}s")
# 内存使用比较
print(f"\n3. 内存使用比较:")
np_memory = np_data.nbytes / 1024 / 1024
df_memory = df.memory_usage(deep=True).sum() / 1024 / 1024
print(f"NumPy内存使用: {np_memory:.2f} MB")
print(f"Pandas内存使用: {df_memory:.2f} MB")
print(f"Pandas内存开销: {df_memory/np_memory:.2f}x")
except ImportError:
print("Pandas未安装,跳过Pandas集成示例")
7.2 SciPy集成优化
# 18. SciPy集成优化
print("\n🔬 SciPy集成优化:")
try:
from scipy import linalg, sparse, optimize, signal
# 线性代数优化
print("1. 线性代数优化:")
n = 2000
A = np.random.random((n, n))
b = np.random.random(n)
# NumPy求解
start = time.time()
x_numpy = np.linalg.solve(A, b)
numpy_time = time.time() - start
# SciPy求解(可能使用更优化的LAPACK)
start = time.time()
x_scipy = linalg.solve(A, b)
scipy_time = time.time() - start
print(f"NumPy求解时间: {numpy_time:.6f}s")
print(f"SciPy求解时间: {scipy_time:.6f}s")
print(f"结果一致性: {np.allclose(x_numpy, x_scipy)}")
# 稀疏矩阵优化
print(f"\n2. 稀疏矩阵优化:")
# 创建稀疏矩阵
density = 0.01 # 1%的非零元素
sparse_data = np.random.random((n, n))
sparse_data[np.random.random((n, n)) > density] = 0
# 密集矩阵操作
start = time.time()
dense_result = np.dot(sparse_data, sparse_data.T)
dense_time = time.time() - start
dense_memory = sparse_data.nbytes / 1024 / 1024
# 稀疏矩阵操作
sparse_matrix = sparse.csr_matrix(sparse_data)
start = time.time()
sparse_result = sparse_matrix.dot(sparse_matrix.T)
sparse_time = time.time() - start
sparse_memory = (sparse_matrix.data.nbytes +
sparse_matrix.indices.nbytes +
sparse_matrix.indptr.nbytes) / 1024 / 1024
print(f"密集矩阵:")
print(f" 计算时间: {dense_time:.6f}s")
print(f" 内存使用: {dense_memory:.2f} MB")
print(f"稀疏矩阵:")
print(f" 计算时间: {sparse_time:.6f}s")
print(f" 内存使用: {sparse_memory:.2f} MB")
print(f" 内存节省: {dense_memory/sparse_memory:.2f}x")
# 信号处理优化
print(f"\n3. 信号处理优化:")
# 创建信号
fs = 1000 # 采样频率
t = np.arange(0, 1, 1/fs)
signal_data = np.sin(2*np.pi*50*t) + 0.5*np.sin(2*np.pi*120*t) + np.random.normal(0, 0.1, len(t))
# FFT比较
start = time.time()
fft_numpy = np.fft.fft(signal_data)
numpy_fft_time = time.time() - start
    from scipy.fft import fft as scipy_fft  # SciPy自带的FFT实现(scipy>=1.4)
    start = time.time()
    fft_scipy = scipy_fft(signal_data)
    scipy_fft_time = time.time() - start
    # 滤波器设计
    start = time.time()
    b, a = signal.butter(4, 0.2, 'low')
    filtered_signal = signal.filtfilt(b, a, signal_data)
    filter_time = time.time() - start
    print(f"NumPy FFT时间: {numpy_fft_time:.6f}s")
    print(f"SciPy FFT时间: {scipy_fft_time:.6f}s")
print(f"滤波器设计和应用: {filter_time:.6f}s")
except ImportError:
print("SciPy未安装,跳过SciPy集成示例")
7.3 机器学习库集成
# 19. 机器学习库集成
print("\n🤖 机器学习库集成:")
try:
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
print("1. Scikit-learn集成:")
# 生成数据集
X, y = make_classification(n_samples=10000, n_features=20,
n_informative=10, n_redundant=10,
random_state=42)
print(f"数据集形状: {X.shape}")
print(f"数据类型: {X.dtype}")
# 数据预处理优化
start = time.time()
# 标准化(NumPy实现)
X_mean = np.mean(X, axis=0)
X_std = np.std(X, axis=0)
X_normalized = (X - X_mean) / X_std
preprocess_time = time.time() - start
# 训练测试分割
X_train, X_test, y_train, y_test = train_test_split(
X_normalized, y, test_size=0.2, random_state=42)
# 模型训练
start = time.time()
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
train_time = time.time() - start
# 预测
start = time.time()
y_pred = model.predict(X_test)
predict_time = time.time() - start
accuracy = accuracy_score(y_test, y_pred)
print(f"预处理时间: {preprocess_time:.6f}s")
print(f"训练时间: {train_time:.6f}s")
print(f"预测时间: {predict_time:.6f}s")
print(f"准确率: {accuracy:.4f}")
# 特征重要性分析
feature_importance = model.feature_importances_
top_features = np.argsort(feature_importance)[-5:]
print(f"前5个重要特征: {top_features}")
print(f"重要性分数: {feature_importance[top_features]}")
except ImportError:
print("Scikit-learn未安装,跳过机器学习集成示例")
8. 本章小结
8.1 核心知识点
性能基础理论
- 向量化编程的重要性
- 内存布局和缓存效率
- 数据类型对性能的影响
向量化编程技术
- 高级向量化技巧
- 广播优化策略
- 通用函数(ufunc)的使用
内存管理优化
- 内存使用分析和监控
- 就地操作和视图优化
- 缓存友好的编程模式
并行计算优化
- 多线程配置和优化
- 手动并行化技术
- GPU加速的概念和应用
高级索引和广播
- 高效的索引策略
- 广播性能优化
- 复杂数据操作技巧
性能分析和调试
- 性能分析工具的使用
- 内存泄漏检测
- 基准测试方法
8.2 最佳实践
- 🚀 优先向量化: 避免显式循环,使用NumPy的向量化操作
- 💾 内存效率: 使用就地操作,避免不必要的数组复制
- 🔧 数据类型优化: 根据精度需求选择合适的数据类型
- 📊 性能监控: 定期分析代码性能,识别瓶颈
8.3 常见陷阱
- ❌ 过度优化: 在性能不是瓶颈时进行不必要的优化
- ❌ 内存泄漏: 忘记清理大型数组和临时变量
- ❌ 缓存不友好: 不考虑内存访问模式的影响
- ❌ 盲目并行: 在不适合的场景下使用并行计算
8.4 下一步学习
- 🔬 深入学习特定领域的NumPy应用
- 🌐 探索分布式计算框架
- 🤖 学习深度学习框架中的NumPy应用
- 📚 研究数值计算的理论基础
9. 练习题
9.1 基础练习
向量化优化
- 将循环代码转换为向量化操作
- 比较性能差异
- 分析内存使用
内存管理
- 实现内存高效的算法
- 检测和修复内存泄漏
- 优化大数据处理
性能分析
- 使用profiling工具分析代码
- 识别性能瓶颈
- 实施优化策略
9.2 进阶练习
并行计算
- 实现多线程数值计算
- 比较不同并行策略
- 优化线程配置
缓存优化
- 设计缓存友好的算法
- 分析内存访问模式
- 实现分块处理
库集成
- 优化与其他库的交互
- 实现高效的数据转换
- 设计性能测试套件
9.3 挑战练习
自定义ufunc
- 实现高性能的自定义函数
- 使用Cython或Numba加速
- 与NumPy内置函数比较
大规模优化
- 处理超大数据集
- 实现内存映射算法
- 设计分布式计算方案
实时性能
- 实现实时数据处理
- 优化延迟和吞吐量
- 设计性能监控系统
恭喜您完成第8章的学习! 🎉
您已经掌握了NumPy的性能优化技术和高级使用技巧,这些知识将帮助您编写高效、可扩展的数值计算代码。
NumPy教程完结! 🎊
通过这8章的学习,您已经全面掌握了NumPy的核心概念、基本操作、高级功能和性能优化技术。现在您可以:
- 高效地处理大规模数值数据
- 实现复杂的数学和科学计算
- 优化代码性能和内存使用
- 与其他科学计算库无缝集成
继续您的数据科学和科学计算之旅吧! 🚀
2.3 通用函数(ufunc)优化
# 6. 通用函数(ufunc)优化
print("\n🔧 通用函数(ufunc)优化:")
# 自定义ufunc
print("1. 自定义ufunc:")
# Python函数
def sigmoid_python(x):
return 1 / (1 + np.exp(-x))
# 向量化版本
sigmoid_vectorized = np.vectorize(sigmoid_python)
# NumPy优化版本
def sigmoid_numpy(x):
return 1 / (1 + np.exp(-x))
# 数值稳定版本
def sigmoid_stable(x):
    # 分段公式避免大负数时exp(-x)上溢; 注意np.where仍会对两个分支整体求值,
    # 可能触发overflow警告,但最终选取的结果是正确的
    return np.where(x >= 0,
                    1 / (1 + np.exp(-x)),
                    np.exp(x) / (1 + np.exp(x)))
# 性能测试
test_data = np.random.uniform(-10, 10, 1000000)
methods = [
("Python函数", lambda x: [sigmoid_python(xi) for xi in x]),
("np.vectorize", sigmoid_vectorized),
("NumPy优化", sigmoid_numpy),
("数值稳定版", sigmoid_stable)
]
print("方法\t\t执行时间\t速度提升")
print("-" * 40)
baseline_time = None
for name, func in methods:
start = time.time()
if name == "Python函数":
result = np.array(func(test_data[:1000])) # 只测试1000个元素
exec_time = (time.time() - start) * 1000 # 按比例放大
else:
result = func(test_data)
exec_time = time.time() - start
if baseline_time is None:
baseline_time = exec_time
speedup = 1.0
else:
speedup = baseline_time / exec_time
print(f"{name:>12}\t{exec_time:.6f}s\t{speedup:.1f}x")
# ufunc的高级特性
print(f"\n2. ufunc的高级特性:")
# reduce操作
arr = np.random.random(1000000)
print("reduce操作比较:")
start = time.time()
sum_builtin = sum(arr)
time_builtin = time.time() - start
start = time.time()
sum_numpy = np.sum(arr)
time_numpy = time.time() - start
start = time.time()
sum_add_reduce = np.add.reduce(arr)
time_reduce = time.time() - start
print(f"Python内置sum: {time_builtin:.6f}s")
print(f"np.sum: {time_numpy:.6f}s")
print(f"np.add.reduce: {time_reduce:.6f}s")
# accumulate操作
print(f"\naccumulate操作:")
small_arr = np.arange(1000)
start = time.time()
cumsum_numpy = np.cumsum(small_arr)
time_cumsum = time.time() - start
start = time.time()
cumsum_accumulate = np.add.accumulate(small_arr)
time_accumulate = time.time() - start
print(f"np.cumsum: {time_cumsum:.6f}s")
print(f"np.add.accumulate: {time_accumulate:.6f}s")
# outer操作
print(f"\nouter操作:")
x = np.arange(1000)
y = np.arange(1000)
start = time.time()
outer_manual = x[:, np.newaxis] * y[np.newaxis, :]
time_manual = time.time() - start
start = time.time()
outer_ufunc = np.multiply.outer(x, y)
time_outer = time.time() - start
print(f"手动广播: {time_manual:.6f}s")
print(f"np.multiply.outer: {time_outer:.6f}s")
3. 内存管理优化
3.1 内存使用分析
# 7. 内存使用分析
print("\n💾 内存使用分析:")
def get_memory_usage():
"""获取当前内存使用量(MB)"""
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024
# 内存使用监控
print("1. 内存使用监控:")
initial_memory = get_memory_usage()
print(f"初始内存使用: {initial_memory:.2f} MB")
# 创建大数组
print(f"\n创建大数组:")
sizes = [1000, 5000, 10000, 20000]
memory_usage = []
for size in sizes:
arr = np.random.random((size, size))
current_memory = get_memory_usage()
array_memory = current_memory - initial_memory
memory_usage.append(array_memory)
expected_memory = arr.nbytes / 1024 / 1024
overhead = array_memory - expected_memory
print(f"数组大小: {size}x{size}")
print(f" 预期内存: {expected_memory:.2f} MB")
print(f" 实际内存: {array_memory:.2f} MB")
print(f" 内存开销: {overhead:.2f} MB ({overhead/expected_memory*100:.1f}%)")
del arr
gc.collect()
# 内存碎片化分析
print(f"\n2. 内存碎片化分析:")
# 创建多个小数组
small_arrays = []
memory_before = get_memory_usage()
for i in range(1000):
arr = np.random.random((100, 100))
small_arrays.append(arr)
memory_after = get_memory_usage()
total_array_memory = sum(arr.nbytes for arr in small_arrays) / 1024 / 1024
actual_memory_increase = memory_after - memory_before
print(f"1000个小数组:")
print(f" 数组总内存: {total_array_memory:.2f} MB")
print(f" 实际内存增加: {actual_memory_increase:.2f} MB")
print(f" 碎片化开销: {actual_memory_increase - total_array_memory:.2f} MB")
# 清理内存
del small_arrays
gc.collect()
# 创建一个大数组
large_array = np.random.random((int(np.sqrt(1000) * 100), int(np.sqrt(1000) * 100)))
memory_large = get_memory_usage()
large_array_memory = large_array.nbytes / 1024 / 1024
actual_large_increase = memory_large - memory_before
print(f"\n1个大数组:")
print(f" 数组内存: {large_array_memory:.2f} MB")
print(f" 实际内存增加: {actual_large_increase:.2f} MB")
print(f" 内存效率: {large_array_memory/actual_large_increase*100:.1f}%")
del large_array
gc.collect()
3.2 内存优化技术
# 8. 内存优化技术
print("\n🔧 内存优化技术:")
# 就地操作优化
print("1. 就地操作优化:")
# 创建测试数据
n = 5000
A = np.random.random((n, n))
B = np.random.random((n, n))
memory_before = get_memory_usage()
# 非就地操作
start = time.time()
C1 = A + B
C1 = C1 * 2
C1 = np.sqrt(C1)
time_copy = time.time() - start
memory_copy = get_memory_usage() - memory_before
# 就地操作
A_copy = A.copy()
B_copy = B.copy()
memory_before = get_memory_usage()
start = time.time()
A_copy += B_copy
A_copy *= 2
np.sqrt(A_copy, out=A_copy)
time_inplace = time.time() - start
memory_inplace = get_memory_usage() - memory_before
print(f"非就地操作:")
print(f" 时间: {time_copy:.4f}s")
print(f" 内存增加: {memory_copy:.2f} MB")
print(f"就地操作:")
print(f" 时间: {time_inplace:.4f}s")
print(f" 内存增加: {memory_inplace:.2f} MB")
print(f" 内存节省: {memory_copy - memory_inplace:.2f} MB")
# 验证结果一致性
assert np.allclose(C1, A_copy)
# 视图 vs 副本
print(f"\n2. 视图 vs 副本:")
original = np.random.random((1000, 1000))
memory_before = get_memory_usage()
# 创建视图
view = original[::2, ::2]
memory_view = get_memory_usage() - memory_before
# 创建副本
copy = original[::2, ::2].copy()
memory_copy = get_memory_usage() - memory_view - memory_before
print(f"原数组内存: {original.nbytes / 1024 / 1024:.2f} MB")
print(f"视图额外内存: {memory_view:.2f} MB")
print(f"副本额外内存: {memory_copy:.2f} MB")
# 修改测试
original[0, 0] = 999
print(f"修改原数组后:")
print(f" 视图受影响: {view[0, 0] == 999}")
print(f" 副本不受影响: {copy[0, 0] != 999}")
# 内存映射优化
print(f"\n3. 内存映射优化:")
# 创建大文件
large_data = np.random.random((10000, 1000))
np.save('large_file.npy', large_data)
del large_data
gc.collect()
memory_before = get_memory_usage()
# 普通加载
loaded_array = np.load('large_file.npy')
memory_loaded = get_memory_usage() - memory_before
# 内存映射加载
memory_before = get_memory_usage()
mmap_array = np.load('large_file.npy', mmap_mode='r')
memory_mmap = get_memory_usage() - memory_before
print(f"普通加载内存: {memory_loaded:.2f} MB")
print(f"内存映射内存: {memory_mmap:.2f} MB")
print(f"内存节省: {memory_loaded - memory_mmap:.2f} MB")
# 访问性能比较
start = time.time()
subset_loaded = loaded_array[1000:2000, 100:200]
time_loaded = time.time() - start
start = time.time()
subset_mmap = mmap_array[1000:2000, 100:200]
time_mmap = time.time() - start
print(f"普通数组访问时间: {time_loaded:.6f}s")
print(f"内存映射访问时间: {time_mmap:.6f}s")
del loaded_array, mmap_array
gc.collect()
3.3 缓存优化
# 9. 缓存优化
print("\n🗄️ 缓存优化:")
# 缓存友好的数据访问模式
print("1. 缓存友好的数据访问模式:")
# 创建大矩阵
size = 2000
matrix = np.random.random((size, size))
# 按行访问(缓存友好)
start = time.time()
row_sum = 0
for i in range(size):
row_sum += np.sum(matrix[i, :])
time_row = time.time() - start
# 按列访问(缓存不友好)
start = time.time()
col_sum = 0
for j in range(size):
col_sum += np.sum(matrix[:, j])
time_col = time.time() - start
# 向量化访问
start = time.time()
total_sum = np.sum(matrix)
time_vectorized = time.time() - start
print(f"按行访问时间: {time_row:.4f}s")
print(f"按列访问时间: {time_col:.4f}s")
print(f"向量化访问时间: {time_vectorized:.4f}s")
print(f"行/列访问比: {time_col/time_row:.2f}")
# 分块处理优化
print(f"\n2. 分块处理优化:")
def matrix_multiply_naive(A, B):
"""朴素矩阵乘法"""
n, m, p = A.shape[0], A.shape[1], B.shape[1]
C = np.zeros((n, p))
for i in range(n):
for j in range(p):
for k in range(m):
C[i, j] += A[i, k] * B[k, j]
return C
def matrix_multiply_blocked(A, B, block_size=64):
"""分块矩阵乘法"""
n, m, p = A.shape[0], A.shape[1], B.shape[1]
C = np.zeros((n, p))
for i in range(0, n, block_size):
for j in range(0, p, block_size):
for k in range(0, m, block_size):
i_end = min(i + block_size, n)
j_end = min(j + block_size, p)
k_end = min(k + block_size, m)
C[i:i_end, j:j_end] += np.dot(A[i:i_end, k:k_end],
B[k:k_end, j:j_end])
return C
# 小规模测试
size = 200
A_small = np.random.random((size, size))
B_small = np.random.random((size, size))
# NumPy内置
start = time.time()
C_numpy = np.dot(A_small, B_small)
time_numpy = time.time() - start
# 分块算法
start = time.time()
C_blocked = matrix_multiply_blocked(A_small, B_small, block_size=32)
time_blocked = time.time() - start
print(f"NumPy内置: {time_numpy:.4f}s")
print(f"分块算法: {time_blocked:.4f}s")
print(f"结果一致性: {np.allclose(C_numpy, C_blocked)}")
# 不同分块大小的性能
block_sizes = [16, 32, 64, 128]
print(f"\n不同分块大小的性能:")
print("分块大小\t执行时间\t相对性能")
for block_size in block_sizes:
start = time.time()
C_test = matrix_multiply_blocked(A_small, B_small, block_size)
exec_time = time.time() - start
relative_perf = time_numpy / exec_time
print(f"{block_size:>6}\t{exec_time:.4f}s\t{relative_perf:.2f}x")
4. 并行计算优化
4.1 多线程优化
# 10. 多线程优化
print("\n🧵 多线程优化:")
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
# NumPy线程控制
print("1. NumPy线程控制:")
# 注意: NumPy本身并没有np.get_num_threads/np.set_num_threads这类API,
# 底层BLAS的线程数需通过threadpoolctl(第三方库, pip install threadpoolctl)
# 或OMP_NUM_THREADS等环境变量控制
from threadpoolctl import threadpool_limits
print(f"CPU核心数: {mp.cpu_count()}")
# 矩阵乘法性能测试
size = 1000
A = np.random.random((size, size))
B = np.random.random((size, size))
# 测试不同线程数的性能
thread_counts = [1, 2, 4, 8]
times = []
for num_threads in thread_counts:
    with threadpool_limits(limits=num_threads):
        start = time.time()
        for _ in range(5):  # 多次测试取平均
            C = np.dot(A, B)
        avg_time = (time.time() - start) / 5
    times.append(avg_time)
    print(f"线程数: {num_threads}, 平均时间: {avg_time:.4f}s")
# 退出with块后线程数自动恢复,无需手动重置
# 可视化性能
plt.figure(figsize=(10, 6))
plt.plot(thread_counts, times, 'o-', linewidth=2, markersize=8)
plt.xlabel('线程数')
plt.ylabel('执行时间 (秒)')
plt.title('NumPy多线程性能')
plt.grid(True)
plt.savefig('numpy_threading_performance.png', dpi=300, bbox_inches='tight')
plt.show()
# 手动并行化
print(f"\n2. 手动并行化:")
def parallel_sum_chunks(arr, num_workers=4):
"""并行计算数组和"""
chunk_size = len(arr) // num_workers
chunks = [arr[i:i+chunk_size] for i in range(0, len(arr), chunk_size)]
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(np.sum, chunk) for chunk in chunks]
results = [future.result() for future in futures]
return sum(results)
def parallel_sum_processes(arr, num_workers=4):
"""使用进程池并行计算"""
chunk_size = len(arr) // num_workers
chunks = [arr[i:i+chunk_size] for i in range(0, len(arr), chunk_size)]
with ProcessPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(np.sum, chunk) for chunk in chunks]
results = [future.result() for future in futures]
return sum(results)
# 性能测试
large_array = np.random.random(10000000)
# 串行计算
start = time.time()
serial_result = np.sum(large_array)
serial_time = time.time() - start
# 线程并行
start = time.time()
thread_result = parallel_sum_chunks(large_array, num_workers=4)
thread_time = time.time() - start
# 进程并行
start = time.time()
process_result = parallel_sum_processes(large_array, num_workers=4)
process_time = time.time() - start
print(f"串行计算: {serial_time:.4f}s")
print(f"线程并行: {thread_time:.4f}s (速度提升: {serial_time/thread_time:.2f}x)")
print(f"进程并行: {process_time:.4f}s (速度提升: {serial_time/process_time:.2f}x)")
# 验证结果
print(f"结果一致性: {abs(serial_result - thread_result) < 1e-10}")
print(f"结果一致性: {abs(serial_result - process_result) < 1e-10}")
4.2 GPU加速(概念性)
# 11. GPU加速概念
print("\n🚀 GPU加速概念:")
# 模拟GPU加速的概念
print("GPU加速的优势和考虑因素:")
# CPU vs GPU特性比较
cpu_cores = mp.cpu_count()
gpu_cores = 2048 # 假设的GPU核心数
print(f"CPU核心数: {cpu_cores}")
print(f"GPU核心数: {gpu_cores} (假设)")
print(f"并行度差异: {gpu_cores/cpu_cores:.0f}x")
# 数据传输开销模拟
def simulate_gpu_computation(data_size, computation_intensity):
"""模拟GPU计算的开销"""
# 数据传输时间(模拟)
transfer_time = data_size * 1e-9 # 假设传输速度
# CPU计算时间
cpu_time = computation_intensity * data_size * 1e-8
# GPU计算时间(假设100x加速)
gpu_compute_time = cpu_time / 100
# 总GPU时间 = 传输时间 + 计算时间
total_gpu_time = 2 * transfer_time + gpu_compute_time # 双向传输
return cpu_time, total_gpu_time, transfer_time, gpu_compute_time
# 不同数据大小的分析
data_sizes = [1000, 10000, 100000, 1000000, 10000000]
computation_intensities = [1, 10, 100] # 不同计算强度
print(f"\nGPU加速效果分析:")
print("数据大小\t计算强度\tCPU时间\tGPU时间\t加速比\t传输占比")
print("-" * 70)
for comp_intensity in computation_intensities:
print(f"\n计算强度: {comp_intensity}")
for data_size in data_sizes:
cpu_time, gpu_time, transfer_time, gpu_compute_time = \
simulate_gpu_computation(data_size, comp_intensity)
speedup = cpu_time / gpu_time
transfer_ratio = (2 * transfer_time) / gpu_time * 100
print(f"{data_size:>8}\t{comp_intensity:>6}\t{cpu_time:.6f}s\t"
f"{gpu_time:.6f}s\t{speedup:.2f}x\t{transfer_ratio:.1f}%")
# GPU适用性分析
print(f"\nGPU加速适用性分析:")
print("✅ 适合GPU的场景:")
print(" - 大规模并行计算")
print(" - 计算密集型任务")
print(" - 数据规模大,传输开销相对较小")
print(" - 重复性计算")
print("\n❌ 不适合GPU的场景:")
print(" - 小规模数据")
print(" - 频繁的CPU-GPU数据传输")
print(" - 复杂的控制流")
print(" - 内存访问模式不规则")
# 实际GPU库的使用建议
print(f"\n实际GPU库使用建议:")
print("1. CuPy: NumPy-like GPU库")
print(" - 语法与NumPy几乎相同")
print(" - 适合现有NumPy代码迁移")
print("\n2. JAX: 可编译的NumPy")
print(" - 支持JIT编译")
print(" - 自动微分")
print(" - 支持GPU/TPU")
print("\n3. TensorFlow/PyTorch:")
print(" - 深度学习框架")
print(" - 内置GPU支持")
print(" - 适合机器学习任务")
5. 高级索引和广播技巧
5.1 高级索引优化
# 12. 高级索引优化
print("\n🎯 高级索引优化:")
# 布尔索引优化
print("1. 布尔索引优化:")
# 创建测试数据
n = 1000000
data = np.random.random(n)
threshold = 0.5
# 方法1: 布尔索引
start = time.time()
mask = data > threshold
filtered_bool = data[mask]
time_bool = time.time() - start
# 方法2: np.where
start = time.time()
indices = np.where(data > threshold)[0]
filtered_where = data[indices]
time_where = time.time() - start
# 方法3: np.compress
start = time.time()
filtered_compress = np.compress(data > threshold, data)
time_compress = time.time() - start
# 方法4: 列表推导(对比)
start = time.time()
filtered_list = np.array([x for x in data if x > threshold])
time_list = time.time() - start
print(f"布尔索引: {time_bool:.6f}s")
print(f"np.where: {time_where:.6f}s")
print(f"np.compress: {time_compress:.6f}s")
print(f"列表推导: {time_list:.6f}s")
# 验证结果一致性
assert np.array_equal(filtered_bool, filtered_where)
assert np.array_equal(filtered_bool, filtered_compress)
assert np.allclose(filtered_bool, filtered_list)
# 多条件索引优化
print(f"\n2. 多条件索引优化:")
# 创建2D数据
rows, cols = 10000, 100
data_2d = np.random.random((rows, cols))
condition1 = np.random.random(rows) > 0.3
condition2 = np.random.random(rows) > 0.7
# 方法1: 逐步过滤
start = time.time()
mask1 = condition1
mask2 = condition2
combined_mask = mask1 & mask2
result1 = data_2d[combined_mask]
time_step = time.time() - start
# 方法2: 直接组合
start = time.time()
result2 = data_2d[condition1 & condition2]
time_direct = time.time() - start
# 方法3: np.logical_and
start = time.time()
combined_logical = np.logical_and(condition1, condition2)
result3 = data_2d[combined_logical]
time_logical = time.time() - start
print(f"逐步过滤: {time_step:.6f}s")
print(f"直接组合: {time_direct:.6f}s")
print(f"logical_and: {time_logical:.6f}s")
# 花式索引优化
print(f"\n3. 花式索引优化:")
# 创建测试数据
data_large = np.random.random((10000, 1000))
indices = np.random.randint(0, 10000, 5000)
# 方法1: 花式索引
start = time.time()
result_fancy = data_large[indices]
time_fancy = time.time() - start
# 方法2: np.take
start = time.time()
result_take = np.take(data_large, indices, axis=0)
time_take = time.time() - start
# 方法3: 循环(对比)
start = time.time()
result_loop = np.array([data_large[i] for i in indices])
time_loop = time.time() - start
print(f"花式索引: {time_fancy:.6f}s")
print(f"np.take: {time_take:.6f}s")
print(f"循环方式: {time_loop:.6f}s")
# 验证结果
assert np.array_equal(result_fancy, result_take)
assert np.array_equal(result_fancy, result_loop)
5.2 广播性能优化
# 13. 广播性能优化
print("\n📡 广播性能优化:")
# 广播内存效率
print("1. 广播内存效率:")
# 创建测试数据
n, m = 5000, 1000
matrix = np.random.random((n, m))
vector = np.random.random(m)
# 方法1: 显式扩展
start = time.time()
vector_expanded = np.tile(vector, (n, 1))
result1 = matrix + vector_expanded
time_explicit = time.time() - start
memory_explicit = vector_expanded.nbytes / 1024 / 1024
# 方法2: 广播
start = time.time()
result2 = matrix + vector
time_broadcast = time.time() - start
memory_broadcast = 0 # 广播不创建额外数组
print(f"显式扩展:")
print(f" 时间: {time_explicit:.6f}s")
print(f" 额外内存: {memory_explicit:.2f} MB")
print(f"广播:")
print(f" 时间: {time_broadcast:.6f}s")
print(f" 额外内存: {memory_broadcast:.2f} MB")
print(f" 速度提升: {time_explicit/time_broadcast:.2f}x")
# 验证结果
assert np.allclose(result1, result2)
# 复杂广播优化
print(f"\n2. 复杂广播优化:")
# 三维广播示例
shape1 = (100, 1, 50)
shape2 = (1, 200, 50)
shape3 = (100, 200, 1)
arr1 = np.random.random(shape1)
arr2 = np.random.random(shape2)
arr3 = np.random.random(shape3)
# 方法1: 逐步计算
start = time.time()
temp1 = arr1 + arr2 # 广播到 (100, 200, 50)
result_step = temp1 + arr3 # 广播到 (100, 200, 50)
time_step = time.time() - start
# 方法2: 一次性计算
start = time.time()
result_once = arr1 + arr2 + arr3
time_once = time.time() - start
print(f"逐步计算: {time_step:.6f}s")
print(f"一次性计算: {time_once:.6f}s")
print(f"性能差异: {time_step/time_once:.2f}x")
# 验证结果
assert np.allclose(result_step, result_once)
# 广播形状优化
print(f"\n3. 广播形状优化:")
# 不同的广播策略
data = np.random.random((1000, 1000))
weights_row = np.random.random((1, 1000)) # 行权重
weights_col = np.random.random((1000, 1)) # 列权重
# 策略1: 分别应用权重
start = time.time()
weighted1 = data * weights_row
weighted1 = weighted1 * weights_col
time_separate = time.time() - start
# 策略2: 组合权重
start = time.time()
combined_weights = weights_row * weights_col
weighted2 = data * combined_weights
time_combined = time.time() - start
# 策略3: 直接广播
start = time.time()
weighted3 = data * weights_row * weights_col
time_direct = time.time() - start
print(f"分别应用: {time_separate:.6f}s")
print(f"组合权重: {time_combined:.6f}s")
print(f"直接广播: {time_direct:.6f}s")
# 验证结果
assert np.allclose(weighted1, weighted2)
assert np.allclose(weighted1, weighted3)
6. 性能分析和调试
6.1 性能分析工具
# 14. 性能分析工具
print("\n🔍 性能分析工具:")
import cProfile
import pstats
# line_profiler为第三方库(pip install line_profiler),未安装时跳过逐行分析
try:
    from line_profiler import LineProfiler
except ImportError:
    LineProfiler = None
# 创建测试函数
def matrix_operations():
"""复杂的矩阵操作函数"""
n = 1000
# 创建矩阵
A = np.random.random((n, n))
B = np.random.random((n, n))
# 矩阵乘法
C = np.dot(A, B)
# 特征值计算
eigenvals = np.linalg.eigvals(A)
# 统计计算
mean_vals = np.mean(C, axis=0)
std_vals = np.std(C, axis=0)
# 条件操作
result = np.where(C > np.mean(C), C, 0)
return result
# cProfile分析
print("1. cProfile性能分析:")
# 运行性能分析
profiler = cProfile.Profile()
profiler.enable()
result = matrix_operations()
profiler.disable()
# 保存和显示结果
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10) # 显示前10个最耗时的函数
# 时间测量装饰器
def timing_decorator(func):
"""计时装饰器"""
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
print(f"{func.__name__} 执行时间: {end - start:.6f}s")
return result
return wrapper
@timing_decorator
def optimized_matrix_ops():
"""优化的矩阵操作"""
n = 1000
# 使用更高效的数据类型
A = np.random.random((n, n)).astype(np.float32)
B = np.random.random((n, n)).astype(np.float32)
# 矩阵乘法
C = np.dot(A, B)
# 使用就地操作
mean_val = np.mean(C)
np.subtract(C, mean_val, out=C)
np.maximum(C, 0, out=C)
return C
print(f"\n2. 优化前后对比:")
print("原始版本:")
original_result = matrix_operations()
print("优化版本:")
optimized_result = optimized_matrix_ops()
6.2 内存分析
# 15. 内存分析
print("\n💾 内存分析:")
# 内存使用分析函数
def analyze_memory_usage(func, *args, **kwargs):
"""分析函数的内存使用"""
import tracemalloc
# 开始内存跟踪
tracemalloc.start()
# 记录初始内存
initial_memory = get_memory_usage()
# 执行函数
start_time = time.time()
result = func(*args, **kwargs)
execution_time = time.time() - start_time
# 记录最终内存
final_memory = get_memory_usage()
# 获取内存统计
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"函数: {func.__name__}")
print(f" 执行时间: {execution_time:.6f}s")
print(f" 内存变化: {final_memory - initial_memory:.2f} MB")
print(f" 峰值内存: {peak / 1024 / 1024:.2f} MB")
print(f" 当前内存: {current / 1024 / 1024:.2f} MB")
return result
# 测试不同的内存使用模式
def memory_efficient_function():
"""内存高效的函数"""
# 使用生成器和就地操作
n = 5000
result = 0
# 分块处理
chunk_size = 1000
for i in range(0, n, chunk_size):
chunk = np.random.random((chunk_size, chunk_size))
result += np.sum(chunk)
del chunk # 显式删除
gc.collect()
return result
def memory_intensive_function():
"""内存密集的函数"""
# 创建多个大数组
arrays = []
for i in range(5):
arr = np.random.random((2000, 2000))
arrays.append(arr)
# 计算结果
result = sum(np.sum(arr) for arr in arrays)
return result
print("内存高效版本:")
result1 = analyze_memory_usage(memory_efficient_function)
print("\n内存密集版本:")
result2 = analyze_memory_usage(memory_intensive_function)
# 内存泄漏检测
print(f"\n3. 内存泄漏检测:")
def potential_memory_leak():
"""可能存在内存泄漏的函数"""
arrays = []
for i in range(100):
arr = np.random.random((100, 100))
arrays.append(arr)
# 忘记清理旧数组
return arrays[-1] # 只返回最后一个,但保持了所有数组的引用
# 检测内存泄漏
initial_memory = get_memory_usage()
for i in range(10):
result = potential_memory_leak()
current_memory = get_memory_usage()
print(f"迭代 {i+1}: 内存使用 {current_memory:.2f} MB "
f"(增加 {current_memory - initial_memory:.2f} MB)")
# 清理内存
gc.collect()
final_memory = get_memory_usage()
print(f"垃圾回收后: {final_memory:.2f} MB")
6.3 性能基准测试
# 16. 性能基准测试
print("\n📊 性能基准测试:")
import timeit
from collections import defaultdict

class PerformanceBenchmark:
    """性能基准测试类"""
def __init__(self):
self.results = defaultdict(list)
def benchmark_function(self, func, *args, name=None, number=10, **kwargs):
"""基准测试函数"""
if name is None:
name = func.__name__
# 使用timeit进行精确计时
timer = timeit.Timer(lambda: func(*args, **kwargs))
times = timer.repeat(repeat=5, number=number)
avg_time = sum(times) / len(times) / number
min_time = min(times) / number
max_time = max(times) / number
self.results[name] = {
'avg_time': avg_time,
'min_time': min_time,
'max_time': max_time,
'std_time': np.std([t/number for t in times])
}
return avg_time
def compare_functions(self, functions, *args, **kwargs):
"""比较多个函数的性能"""
print("函数性能比较:")
print("函数名\t\t平均时间\t最小时间\t最大时间\t标准差")
print("-" * 70)
baseline_time = None
for name, func in functions:
avg_time = self.benchmark_function(func, *args, name=name, **kwargs)
result = self.results[name]
if baseline_time is None:
baseline_time = avg_time
speedup = 1.0
else:
speedup = baseline_time / avg_time
print(f"{name:>12}\t{result['avg_time']:.6f}s\t"
f"{result['min_time']:.6f}s\t{result['max_time']:.6f}s\t"
f"{result['std_time']:.6f}s\t{speedup:.2f}x")
def plot_results(self):
"""绘制性能比较图"""
names = list(self.results.keys())
avg_times = [self.results[name]['avg_time'] for name in names]
std_times = [self.results[name]['std_time'] for name in names]
plt.figure(figsize=(12, 6))
bars = plt.bar(names, avg_times, yerr=std_times, capsize=5)
plt.ylabel('执行时间 (秒)')
plt.title('函数性能比较')
plt.xticks(rotation=45)
# 添加数值标签
for bar, time in zip(bars, avg_times):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.001,
f'{time:.4f}s', ha='center', va='bottom')
plt.tight_layout()
plt.savefig('performance_benchmark.png', dpi=300, bbox_inches='tight')
plt.show()
# 创建基准测试实例
benchmark = PerformanceBenchmark()

# 定义测试函数
def numpy_sum(arr):
    return np.sum(arr)

def python_sum(arr):
    return sum(arr)

def numpy_mean(arr):
    return np.mean(arr)

def manual_mean(arr):
    return np.sum(arr) / len(arr)

def numpy_std(arr):
    return np.std(arr)

def manual_std(arr):
    mean = np.mean(arr)
    return np.sqrt(np.mean((arr - mean) ** 2))

# 创建测试数据
test_array = np.random.random(1000000)

# 比较求和函数
print("求和函数比较:")
sum_functions = [
    ("numpy_sum", numpy_sum),
    ("python_sum", python_sum)
]
benchmark.compare_functions(sum_functions, test_array)

print("\n均值函数比较:")
mean_functions = [
    ("numpy_mean", numpy_mean),
    ("manual_mean", manual_mean)
]
benchmark.compare_functions(mean_functions, test_array)

print("\n标准差函数比较:")
std_functions = [
    ("numpy_std", numpy_std),
    ("manual_std", manual_std)
]
benchmark.compare_functions(std_functions, test_array)

# 绘制结果
benchmark.plot_results()