📚 本章概述
本章将深入探讨NumPy的文件输入输出功能和数据持久化技术。您将学习如何保存和加载NumPy数组,处理各种文件格式,以及与其他数据格式进行转换。这些技能对于数据分析、机器学习和科学计算项目至关重要。
🎯 学习目标
- 掌握NumPy原生文件格式的读写
- 学会处理文本文件和CSV数据
- 了解二进制文件的高效存储
- 掌握与其他格式的数据交换
- 学会大数据文件的处理技巧
- 理解数据压缩和性能优化
1. NumPy原生文件格式
1.1 .npy格式 - 单数组存储
import numpy as np
import os
import time
import matplotlib.pyplot as plt
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
print("💾 NumPy原生文件格式")
print("=" * 50)
# 创建示例数据
np.random.seed(42)
data_1d = np.random.random(1000)
data_2d = np.random.randint(0, 100, (50, 20))
data_3d = np.random.normal(0, 1, (10, 10, 5))
print("创建示例数据:")
print(f"1D数组形状: {data_1d.shape}, 类型: {data_1d.dtype}")
print(f"2D数组形状: {data_2d.shape}, 类型: {data_2d.dtype}")
print(f"3D数组形状: {data_3d.shape}, 类型: {data_3d.dtype}")
# 保存单个数组到.npy文件
print(f"\n保存数组到.npy文件:")
np.save('data_1d.npy', data_1d)
np.save('data_2d.npy', data_2d)
np.save('data_3d.npy', data_3d)
print("✅ 数组已保存到.npy文件")
# 加载.npy文件
print(f"\n从.npy文件加载数组:")
loaded_1d = np.load('data_1d.npy')
loaded_2d = np.load('data_2d.npy')
loaded_3d = np.load('data_3d.npy')
print(f"加载的1D数组形状: {loaded_1d.shape}")
print(f"加载的2D数组形状: {loaded_2d.shape}")
print(f"加载的3D数组形状: {loaded_3d.shape}")
# 验证数据完整性
print(f"\n数据完整性验证:")
print(f"1D数组相等: {np.array_equal(data_1d, loaded_1d)}")
print(f"2D数组相等: {np.array_equal(data_2d, loaded_2d)}")
print(f"3D数组相等: {np.array_equal(data_3d, loaded_3d)}")
# 文件大小信息
print(f"\n文件大小信息:")
for filename in ['data_1d.npy', 'data_2d.npy', 'data_3d.npy']:
    size = os.path.getsize(filename)
    print(f"{filename}: {size} 字节 ({size/1024:.2f} KB)")
1.2 .npz格式 - 多数组存储
# 2. .npz格式 - 多数组存储
print("\n📦 .npz格式 - 多数组存储:")
# 创建多个相关数组
features = np.random.random((1000, 10))
labels = np.random.randint(0, 3, 1000)
weights = np.random.random(10)
metadata = np.array(['feature_' + str(i) for i in range(10)])
print("创建多个数组:")
print(f"特征矩阵: {features.shape}")
print(f"标签向量: {labels.shape}")
print(f"权重向量: {weights.shape}")
print(f"元数据: {metadata.shape}")
# 保存多个数组到.npz文件(未压缩)
print(f"\n保存到.npz文件(未压缩):")
np.savez('dataset.npz',
         features=features,
         labels=labels,
         weights=weights,
         metadata=metadata)
# 保存多个数组到.npz文件(压缩)
print(f"保存到.npz文件(压缩):")
np.savez_compressed('dataset_compressed.npz',
                    features=features,
                    labels=labels,
                    weights=weights,
                    metadata=metadata)
# 加载.npz文件
print(f"\n从.npz文件加载:")
with np.load('dataset.npz') as data:
    print(f"文件中的数组: {list(data.keys())}")
    loaded_features = data['features']
    loaded_labels = data['labels']
    loaded_weights = data['weights']
    loaded_metadata = data['metadata']
print(f"加载的特征矩阵形状: {loaded_features.shape}")
print(f"加载的标签向量形状: {loaded_labels.shape}")
# 比较文件大小
size_uncompressed = os.path.getsize('dataset.npz')
size_compressed = os.path.getsize('dataset_compressed.npz')
print(f"\n文件大小比较:")
print(f"未压缩: {size_uncompressed} 字节 ({size_uncompressed/1024:.2f} KB)")
print(f"压缩后: {size_compressed} 字节 ({size_compressed/1024:.2f} KB)")
print(f"压缩比: {size_uncompressed/size_compressed:.2f}:1")
print(f"节省空间: {(1-size_compressed/size_uncompressed)*100:.1f}%")
# 验证数据完整性
print(f"\n数据完整性验证:")
print(f"特征矩阵相等: {np.array_equal(features, loaded_features)}")
print(f"标签向量相等: {np.array_equal(labels, loaded_labels)}")
print(f"权重向量相等: {np.array_equal(weights, loaded_weights)}")
print(f"元数据相等: {np.array_equal(metadata, loaded_metadata)}")
1.3 内存映射文件
# 3. 内存映射文件
print("\n🗺️ 内存映射文件:")
# 创建大数组
large_array = np.random.random((1000, 1000))
print(f"大数组形状: {large_array.shape}")
print(f"大数组大小: {large_array.nbytes / 1024 / 1024:.2f} MB")
# 保存为内存映射文件
print(f"\n创建内存映射文件:")
memmap_file = np.memmap('large_data.dat', dtype='float64', mode='w+', shape=(1000, 1000))
memmap_file[:] = large_array[:]
memmap_file.flush()  # 显式将数据刷新到磁盘
del memmap_file      # 释放映射对象
# 加载内存映射文件
print(f"加载内存映射文件:")
loaded_memmap = np.memmap('large_data.dat', dtype='float64', mode='r', shape=(1000, 1000))
print(f"内存映射数组形状: {loaded_memmap.shape}")
print(f"内存映射数组类型: {type(loaded_memmap)}")
# 内存映射的优势:只加载需要的部分
print(f"\n部分数据访问:")
subset = loaded_memmap[100:200, 200:300]
print(f"子集形状: {subset.shape}")
print(f"子集均值: {np.mean(subset):.6f}")
# 性能比较(针对同一个文件)
print(f"\n性能比较:")
# 普通加载:把整个文件一次性读入内存
start_time = time.time()
normal_array = np.fromfile('large_data.dat', dtype='float64').reshape(1000, 1000)
normal_time = time.time() - start_time
# 内存映射加载:只建立映射,实际读取推迟到访问数据时
start_time = time.time()
mmap_array = np.memmap('large_data.dat', dtype='float64', mode='r', shape=(1000, 1000))
mmap_time = time.time() - start_time
print(f"普通加载时间: {normal_time:.6f}秒")
print(f"内存映射建立时间: {mmap_time:.6f}秒")
print(f"建立映射比完整读取快约: {normal_time/mmap_time:.1f}x(因为此时尚未真正读取数据)")
2. 文本文件处理
2.1 基本文本文件读写
# 4. 文本文件读写
print("\n📄 文本文件读写:")
# 创建示例数据
data_matrix = np.array([[1.1, 2.2, 3.3],
                        [4.4, 5.5, 6.6],
                        [7.7, 8.8, 9.9]])
print("原始数据:")
print(data_matrix)
# 保存为文本文件
print(f"\n保存为文本文件:")
np.savetxt('data.txt', data_matrix)
np.savetxt('data_formatted.txt', data_matrix, fmt='%.2f')
np.savetxt('data_delimited.txt', data_matrix, delimiter=',', fmt='%.3f')
print("✅ 数据已保存为文本文件")
# 从文本文件加载
print(f"\n从文本文件加载:")
loaded_data = np.loadtxt('data.txt')
loaded_formatted = np.loadtxt('data_formatted.txt')
loaded_delimited = np.loadtxt('data_delimited.txt', delimiter=',')
print(f"加载的数据形状: {loaded_data.shape}")
print(f"数据相等性验证: {np.allclose(data_matrix, loaded_data)}")
# 查看文件内容
print(f"\n文件内容示例:")
with open('data_formatted.txt', 'r') as f:
    content = f.read()
print("data_formatted.txt内容:")
print(content)
# 带标题和注释的保存
print(f"\n带标题和注释的保存:")
header = "Column1,Column2,Column3"
footer = "Data generated by NumPy"
np.savetxt('data_with_header.csv',
           data_matrix,
           delimiter=',',
           header=header,
           footer=footer,
           comments='# ',
           fmt='%.2f')
print("✅ 带标题的CSV文件已保存")
2.2 复杂文本文件处理
# 5. 复杂文本文件处理
print("\n🔧 复杂文本文件处理:")
# 创建混合数据类型的数组
mixed_data = np.array([
    ('Alice', 25, 85.5, True),
    ('Bob', 30, 92.3, False),
    ('Charlie', 35, 78.9, True),
    ('Diana', 28, 88.7, False)
], dtype=[('name', 'U10'), ('age', 'i4'), ('score', 'f8'), ('passed', '?')])
print("混合数据类型数组:")
print(mixed_data)
print(f"数据类型: {mixed_data.dtype}")
# 保存结构化数组
print(f"\n保存结构化数组:")
np.savetxt('structured_data.txt', mixed_data,
           fmt='%s %d %.1f %s',
           header='Name Age Score Passed')
# 处理缺失值
print(f"\n处理缺失值:")
data_with_nan = np.array([[1.0, 2.0, np.nan],
                          [4.0, np.nan, 6.0],
                          [7.0, 8.0, 9.0]])
print("包含NaN的数据:")
print(data_with_nan)
# 保存时处理NaN
np.savetxt('data_with_nan.txt', data_with_nan,
           fmt='%.2f',
           delimiter=',',
           comments='',
           header='Col1,Col2,Col3')
# 加载时处理缺失值
print(f"\n加载时处理缺失值:")
# 自定义转换函数:无法解析的字段返回NaN(可通过genfromtxt的converters参数按列指定)
def custom_converter(s):
    try:
        return float(s)
    except ValueError:
        return np.nan
# 使用genfromtxt处理复杂情况
loaded_with_nan = np.genfromtxt('data_with_nan.txt',
                                delimiter=',',
                                skip_header=1,
                                filling_values=np.nan)
print("加载的包含NaN的数据:")
print(loaded_with_nan)
print(f"NaN位置相同: {np.isnan(data_with_nan).sum() == np.isnan(loaded_with_nan).sum()}")
2.3 CSV文件高级处理
# 6. CSV文件高级处理
print("\n📊 CSV文件高级处理:")
# 创建复杂的CSV数据
np.random.seed(42)
dates = ['2023-01-' + str(i+1).zfill(2) for i in range(10)]
products = ['Product_' + chr(65+i) for i in range(5)]
sales_data = []
for i in range(50):
    date = np.random.choice(dates)
    product = np.random.choice(products)
    quantity = np.random.randint(1, 100)
    price = np.random.uniform(10, 100)
    sales_data.append((date, product, quantity, price))  # 结构化数组要求每条记录是元组
# 转换为结构化数组
sales_array = np.array(sales_data, dtype=[
    ('date', 'U10'),
    ('product', 'U10'),
    ('quantity', 'i4'),
    ('price', 'f8')
])
print(f"销售数据形状: {sales_array.shape}")
print("前5条记录:")
print(sales_array[:5])
# 保存为CSV
print(f"\n保存为CSV文件:")
with open('sales_data.csv', 'w') as f:
    f.write('Date,Product,Quantity,Price\n')
    for record in sales_array:
        f.write(f"{record['date']},{record['product']},{record['quantity']},{record['price']:.2f}\n")
# 使用genfromtxt加载CSV
print(f"\n使用genfromtxt加载CSV:")
csv_data = np.genfromtxt('sales_data.csv',
                         delimiter=',',
                         dtype=None,
                         encoding='utf-8',
                         names=True)  # names=True会把首行作为列名,不要再用skip_header跳过表头
print(f"加载的CSV数据形状: {csv_data.shape}")
print("前3条记录:")
print(csv_data[:3])
# 数据分析示例
print(f"\n数据分析示例:")
quantities = csv_data['Quantity'].astype(float)
prices = csv_data['Price'].astype(float)
print(f"平均数量: {np.mean(quantities):.2f}")
print(f"平均价格: {np.mean(prices):.2f}")
print(f"总销售额: {np.sum(quantities * prices):.2f}")
# 按产品分组统计
unique_products = np.unique(csv_data['Product'])
print(f"\n按产品统计:")
for product in unique_products:
    mask = csv_data['Product'] == product
    product_quantities = quantities[mask]
    product_prices = prices[mask]
    total_sales = np.sum(product_quantities * product_prices)
    print(f"{product}: 总销售额 {total_sales:.2f}")
3. 二进制文件处理
3.1 原始二进制文件
# 7. 原始二进制文件处理
print("\n🔢 原始二进制文件处理:")
# 创建不同数据类型的数组
int_array = np.arange(1000, dtype=np.int32)
float_array = np.random.random(1000).astype(np.float32)
complex_array = np.random.random(500) + 1j * np.random.random(500)
print("数据类型示例:")
print(f"整数数组: {int_array.dtype}, 大小: {int_array.nbytes} 字节")
print(f"浮点数组: {float_array.dtype}, 大小: {float_array.nbytes} 字节")
print(f"复数数组: {complex_array.dtype}, 大小: {complex_array.nbytes} 字节")
# 保存为原始二进制文件
print(f"\n保存为原始二进制文件:")
int_array.tofile('int_data.bin')
float_array.tofile('float_data.bin')
complex_array.tofile('complex_data.bin')
print("✅ 二进制文件已保存")
# 从二进制文件加载
print(f"\n从二进制文件加载:")
loaded_int = np.fromfile('int_data.bin', dtype=np.int32)
loaded_float = np.fromfile('float_data.bin', dtype=np.float32)
loaded_complex = np.fromfile('complex_data.bin', dtype=np.complex128)
print(f"加载的整数数组形状: {loaded_int.shape}")
print(f"加载的浮点数组形状: {loaded_float.shape}")
print(f"加载的复数数组形状: {loaded_complex.shape}")
# 验证数据完整性
print(f"\n数据完整性验证:")
print(f"整数数组相等: {np.array_equal(int_array, loaded_int)}")
print(f"浮点数组相等: {np.allclose(float_array, loaded_float)}")
print(f"复数数组相等: {np.allclose(complex_array, loaded_complex)}")
# 文件大小比较
print(f"\n文件大小比较:")
formats = [
    ('int_data.bin', 'int_data.npy'),
    ('float_data.bin', 'float_data.npy'),
    ('complex_data.bin', 'complex_data.npy')
]
# 保存为.npy格式用于比较
np.save('int_data.npy', int_array)
np.save('float_data.npy', float_array)
np.save('complex_data.npy', complex_array)
for bin_file, npy_file in formats:
    bin_size = os.path.getsize(bin_file)
    npy_size = os.path.getsize(npy_file)
    print(f"{bin_file}: {bin_size} 字节")
    print(f"{npy_file}: {npy_size} 字节")
    print(f"开销: {npy_size - bin_size} 字节 ({(npy_size/bin_size-1)*100:.1f}%)\n")
3.2 自定义二进制格式
# 8. 自定义二进制格式
print("\n🛠️ 自定义二进制格式:")
import struct
# 创建自定义数据结构
class DataRecord:
    def __init__(self, timestamp, sensor_id, temperature, humidity, pressure):
        self.timestamp = timestamp
        self.sensor_id = sensor_id
        self.temperature = temperature
        self.humidity = humidity
        self.pressure = pressure
    def to_bytes(self):
        # 格式: 双精度时间戳 + 整数ID + 3个浮点数
        return struct.pack('dIfff',
                           self.timestamp,
                           self.sensor_id,
                           self.temperature,
                           self.humidity,
                           self.pressure)
    @classmethod
    def from_bytes(cls, data):
        timestamp, sensor_id, temp, humid, press = struct.unpack('dIfff', data)
        return cls(timestamp, sensor_id, temp, humid, press)
    def __repr__(self):
        return f"DataRecord(ts={self.timestamp:.2f}, id={self.sensor_id}, T={self.temperature:.1f}°C, H={self.humidity:.1f}%, P={self.pressure:.1f}hPa)"
# 生成示例数据
print("生成传感器数据:")
records = []
for i in range(100):
    record = DataRecord(
        timestamp=time.time() + i * 60,  # 每分钟一个记录
        sensor_id=i % 10,                # 10个传感器
        temperature=20 + 10 * np.random.random(),
        humidity=40 + 20 * np.random.random(),
        pressure=1000 + 50 * np.random.random()
    )
    records.append(record)
print(f"生成了 {len(records)} 条记录")
print("前3条记录:")
for record in records[:3]:
    print(record)
# 保存为自定义二进制格式
print(f"\n保存为自定义二进制格式:")
with open('sensor_data.bin', 'wb') as f:
    # 写入记录数量
    f.write(struct.pack('I', len(records)))
    # 写入所有记录
    for record in records:
        f.write(record.to_bytes())
print("✅ 自定义二进制文件已保存")
# 从自定义二进制格式加载
print(f"\n从自定义二进制格式加载:")
loaded_records = []
with open('sensor_data.bin', 'rb') as f:
    # 读取记录数量
    num_records = struct.unpack('I', f.read(4))[0]
    print(f"文件中有 {num_records} 条记录")
    # 读取所有记录
    record_size = struct.calcsize('dIfff')
    for i in range(num_records):
        data = f.read(record_size)
        record = DataRecord.from_bytes(data)
        loaded_records.append(record)
print("前3条加载的记录:")
for record in loaded_records[:3]:
    print(record)
# 验证数据完整性
print(f"\n数据完整性验证:")
all_equal = True
# 注意:温度/湿度/气压经struct打包为float32,比较时应使用与单精度精度相匹配的容差
for orig, loaded in zip(records, loaded_records):
    if (abs(orig.timestamp - loaded.timestamp) > 1e-6 or
            orig.sensor_id != loaded.sensor_id or
            not np.isclose(orig.temperature, loaded.temperature) or
            not np.isclose(orig.humidity, loaded.humidity) or
            not np.isclose(orig.pressure, loaded.pressure)):
        all_equal = False
        break
print(f"所有记录相等: {all_equal}")
# 文件大小分析
file_size = os.path.getsize('sensor_data.bin')
record_size = struct.calcsize('dIfff')
expected_size = 4 + len(records) * record_size # 4字节记录数 + 记录数据
print(f"文件大小: {file_size} 字节")
print(f"预期大小: {expected_size} 字节")
print(f"每条记录: {record_size} 字节")
4. 数据格式转换
4.1 与Pandas的互操作
# 9. 与Pandas的互操作
print("\n🐼 与Pandas的互操作:")
try:
    import pandas as pd
    # NumPy数组转DataFrame
    print("NumPy数组转DataFrame:")
    np_data = np.random.random((100, 5))
    columns = ['Feature_' + str(i) for i in range(5)]
    df = pd.DataFrame(np_data, columns=columns)
    print(f"DataFrame形状: {df.shape}")
    print("前5行:")
    print(df.head())
    # DataFrame转NumPy数组
    print(f"\nDataFrame转NumPy数组:")
    back_to_numpy = df.to_numpy()  # 推荐使用to_numpy(),语义比.values更明确
    print(f"转换后数组形状: {back_to_numpy.shape}")
    print(f"数据类型: {back_to_numpy.dtype}")
    print(f"数据相等: {np.allclose(np_data, back_to_numpy)}")
    # 处理混合数据类型
    print(f"\n处理混合数据类型:")
    mixed_df = pd.DataFrame({
        'name': ['Alice', 'Bob', 'Charlie'],
        'age': [25, 30, 35],
        'salary': [50000.0, 60000.0, 70000.0],
        'active': [True, False, True]
    })
    print("混合类型DataFrame:")
    print(mixed_df)
    print(f"数据类型:\n{mixed_df.dtypes}")
    # 分别提取不同类型的列
    numeric_data = mixed_df.select_dtypes(include=[np.number]).to_numpy()
    string_data = mixed_df.select_dtypes(include=['object']).to_numpy()
    bool_data = mixed_df.select_dtypes(include=['bool']).to_numpy()
    print(f"\n数值数据形状: {numeric_data.shape}")
    print(f"字符串数据形状: {string_data.shape}")
    print(f"布尔数据形状: {bool_data.shape}")
    # 保存和加载
    print(f"\n保存和加载:")
    df.to_csv('pandas_data.csv', index=False)
    mixed_df.to_pickle('mixed_data.pkl')
    # 使用NumPy加载CSV
    csv_array = np.genfromtxt('pandas_data.csv', delimiter=',', skip_header=1)
    print(f"NumPy加载的CSV数据形状: {csv_array.shape}")
except ImportError:
    print("Pandas未安装,跳过Pandas互操作示例")
    np_data = np.random.random((100, 5))  # 后续JSON示例仍需要np_data
4.2 与其他格式的转换
# 10. 与其他格式的转换
print("\n🔄 与其他格式的转换:")
# JSON格式
import json
print("JSON格式转换:")
data_dict = {
    'array_1d': np_data[:5, 0].tolist(),
    'array_2d': np_data[:3, :3].tolist(),
    'metadata': {
        'shape': list(np_data.shape),
        'dtype': str(np_data.dtype),
        'mean': float(np.mean(np_data)),
        'std': float(np.std(np_data))
    }
}
# 保存为JSON
with open('data.json', 'w') as f:
    json.dump(data_dict, f, indent=2)
print("✅ 数据已保存为JSON格式")
# 从JSON加载
with open('data.json', 'r') as f:
    loaded_dict = json.load(f)
# 转换回NumPy数组
loaded_1d = np.array(loaded_dict['array_1d'])
loaded_2d = np.array(loaded_dict['array_2d'])
print(f"从JSON加载的1D数组: {loaded_1d}")
print(f"从JSON加载的2D数组形状: {loaded_2d.shape}")
# XML格式(简单示例)
print(f"\nXML格式转换:")
import xml.etree.ElementTree as ET
def array_to_xml(array, root_name='array'):
    root = ET.Element(root_name)
    root.set('shape', str(array.shape))
    root.set('dtype', str(array.dtype))
    if array.ndim == 1:
        for i, value in enumerate(array):
            elem = ET.SubElement(root, 'element')
            elem.set('index', str(i))
            elem.text = str(value)
    elif array.ndim == 2:
        for i in range(array.shape[0]):
            row = ET.SubElement(root, 'row')
            row.set('index', str(i))
            for j in range(array.shape[1]):
                elem = ET.SubElement(row, 'element')
                elem.set('index', str(j))
                elem.text = str(array[i, j])
    return root
# 转换小数组为XML
small_array = np.array([[1, 2, 3], [4, 5, 6]])
xml_root = array_to_xml(small_array)
# 保存XML
tree = ET.ElementTree(xml_root)
tree.write('data.xml', encoding='utf-8', xml_declaration=True)
print("✅ 数据已保存为XML格式")
# HDF5格式(如果可用)
try:
    import h5py
    print(f"\nHDF5格式转换:")
    # 保存为HDF5
    with h5py.File('data.h5', 'w') as f:
        f.create_dataset('features', data=np_data)
        f.create_dataset('labels', data=np.random.randint(0, 3, 100))
        f.create_dataset('weights', data=np.random.random(5))
        # 添加属性
        f.attrs['created_by'] = 'NumPy tutorial'
        f.attrs['version'] = '1.0'
        f['features'].attrs['description'] = 'Random feature matrix'
    print("✅ 数据已保存为HDF5格式")
    # 从HDF5加载
    with h5py.File('data.h5', 'r') as f:
        print(f"HDF5文件中的数据集: {list(f.keys())}")
        loaded_features = f['features'][:]
        loaded_labels = f['labels'][:]
        print(f"加载的特征形状: {loaded_features.shape}")
        print(f"加载的标签形状: {loaded_labels.shape}")
        print(f"文件属性: {dict(f.attrs)}")
except ImportError:
    print("h5py未安装,跳过HDF5示例")
5. 大数据文件处理
5.1 分块处理
# 11. 大数据文件分块处理
print("\n📊 大数据文件分块处理:")
# 创建大数据文件
print("创建大数据文件:")
large_data = np.random.random((10000, 100))
np.save('large_dataset.npy', large_data)
file_size = os.path.getsize('large_dataset.npy')
print(f"大数据文件大小: {file_size / 1024 / 1024:.2f} MB")
# 分块读取和处理
print(f"\n分块读取和处理:")
chunk_size = 1000
num_chunks = len(large_data) // chunk_size
# 使用内存映射进行分块处理:np.load的mmap_mode会自动解析.npy文件头,无需手动跳过
mmap_data = np.load('large_dataset.npy', mmap_mode='r')
print(f"数据分为 {num_chunks} 个块,每块 {chunk_size} 行")
# 分块计算统计信息
chunk_means = []
chunk_stds = []
for i in range(num_chunks):
    start_idx = i * chunk_size
    end_idx = start_idx + chunk_size
    chunk = mmap_data[start_idx:end_idx]
    chunk_mean = np.mean(chunk)
    chunk_std = np.std(chunk)
    chunk_means.append(chunk_mean)
    chunk_stds.append(chunk_std)
    if i < 3:  # 只打印前3个块的信息
        print(f"块 {i}: 均值={chunk_mean:.6f}, 标准差={chunk_std:.6f}")
# 计算全局统计信息
global_mean = np.mean(chunk_means)  # 各块大小相同,块均值的平均即为全局均值
# 近似合并:对各块方差取平均再开方,忽略了块间均值差异(精确做法见本小节末尾的示例)
global_std = np.sqrt(np.mean([s**2 for s in chunk_stds]))
print(f"\n全局统计信息:")
print(f"全局均值: {global_mean:.6f}")
print(f"全局标准差: {global_std:.6f}")
# 验证与直接计算的结果
direct_mean = np.mean(large_data)
direct_std = np.std(large_data)
print(f"\n验证(直接计算):")
print(f"直接计算均值: {direct_mean:.6f}")
print(f"直接计算标准差: {direct_std:.6f}")
print(f"均值误差: {abs(global_mean - direct_mean):.8f}")
print(f"标准差误差: {abs(global_std - direct_std):.8f}")
5.2 流式处理
# 12. 流式处理
print("\n🌊 流式处理:")
def data_generator(n_samples, n_features, batch_size=100):
    """数据生成器,模拟流式数据"""
    for i in range(0, n_samples, batch_size):
        current_batch_size = min(batch_size, n_samples - i)
        batch = np.random.random((current_batch_size, n_features))
        yield batch, i
# 流式处理示例
print("流式处理示例:")
total_samples = 5000
n_features = 50
batch_size = 200
# 在线统计计算
running_sum = np.zeros(n_features)
running_sum_sq = np.zeros(n_features)
total_processed = 0
print(f"处理 {total_samples} 个样本,批大小 {batch_size}")
for batch, batch_start in data_generator(total_samples, n_features, batch_size):
    # 更新在线统计
    running_sum += np.sum(batch, axis=0)
    running_sum_sq += np.sum(batch**2, axis=0)
    total_processed += len(batch)
    # 计算当前的均值和方差
    current_mean = running_sum / total_processed
    current_var = (running_sum_sq / total_processed) - current_mean**2
    current_std = np.sqrt(current_var)
    if batch_start % (batch_size * 5) == 0:  # 每5个批次打印一次
        print(f"已处理 {total_processed} 样本,当前均值: {np.mean(current_mean):.6f}")
final_mean = running_sum / total_processed
final_var = (running_sum_sq / total_processed) - final_mean**2
final_std = np.sqrt(final_var)
print(f"\n最终统计结果:")
print(f"样本数: {total_processed}")
print(f"特征均值的均值: {np.mean(final_mean):.6f}")
print(f"特征标准差的均值: {np.mean(final_std):.6f}")
# 验证结果
verification_data = np.random.random((total_samples, n_features))
verification_mean = np.mean(verification_data, axis=0)
verification_std = np.std(verification_data, axis=0)
print(f"\n验证(理论值约为0.5和0.289):")
print(f"理论均值: 0.5, 实际: {np.mean(verification_mean):.6f}")
print(f"理论标准差: 0.289, 实际: {np.mean(verification_std):.6f}")
5.3 并行处理
# 13. 并行处理
print("\n⚡ 并行处理:")
from multiprocessing import Pool

def process_chunk(args):
    """处理数据块的函数"""
    chunk_data, chunk_id = args
    # 模拟复杂计算
    result = {
        'chunk_id': chunk_id,
        'mean': np.mean(chunk_data),
        'std': np.std(chunk_data),
        'min': np.min(chunk_data),
        'max': np.max(chunk_data),
        'sum': np.sum(chunk_data),
        'size': chunk_data.size
    }
    return result
# 创建测试数据
print("创建测试数据:")
test_data = np.random.random((8000, 50))
chunk_size = 1000
chunks = [test_data[i:i+chunk_size] for i in range(0, len(test_data), chunk_size)]
print(f"数据分为 {len(chunks)} 个块")
# 串行处理
print(f"\n串行处理:")
start_time = time.time()
serial_results = []
for i, chunk in enumerate(chunks):
    result = process_chunk((chunk, i))
    serial_results.append(result)
serial_time = time.time() - start_time
print(f"串行处理时间: {serial_time:.4f}秒")
# 并行处理(注意:在独立脚本中使用multiprocessing时,这段代码应放在 if __name__ == '__main__': 保护块内)
print(f"\n并行处理:")
start_time = time.time()
with Pool() as pool:
    chunk_args = [(chunk, i) for i, chunk in enumerate(chunks)]
    parallel_results = pool.map(process_chunk, chunk_args)
parallel_time = time.time() - start_time
print(f"并行处理时间: {parallel_time:.4f}秒")
print(f"速度提升: {serial_time/parallel_time:.2f}x")
# 验证结果一致性
print(f"\n结果验证:")
for serial, parallel in zip(serial_results, parallel_results):
    for key in serial:
        if key != 'chunk_id':
            assert abs(serial[key] - parallel[key]) < 1e-10
print("✅ 串行和并行结果一致")
# 汇总结果
total_sum = sum(r['sum'] for r in parallel_results)
total_size = sum(r['size'] for r in parallel_results)
global_mean = total_sum / total_size
print(f"\n汇总统计:")
print(f"全局均值: {global_mean:.6f}")
print(f"全局最小值: {min(r['min'] for r in parallel_results):.6f}")
print(f"全局最大值: {max(r['max'] for r in parallel_results):.6f}")
6. 性能优化和最佳实践
6.1 IO性能比较
# 14. IO性能比较
print("\n🏃 IO性能比较:")
# 创建测试数据
test_sizes = [1000, 5000, 10000]
data_types = [np.float32, np.float64, np.int32, np.int64]
print("不同格式的IO性能比较:")
print("大小\t数据类型\t.npy保存\t.npy加载\t.npz保存\t.npz加载\t文本保存\t文本加载")
for size in test_sizes:
    for dtype in data_types:
        # 创建测试数据(整数类型用randint生成,避免[0,1)随机数取整后全为0)
        if np.issubdtype(dtype, np.integer):
            test_array = np.random.randint(0, 1000, (size, 10)).astype(dtype)
        else:
            test_array = np.random.random((size, 10)).astype(dtype)
        # .npy格式
        start = time.time()
        np.save('test.npy', test_array)
        npy_save_time = time.time() - start
        start = time.time()
        loaded_npy = np.load('test.npy')
        npy_load_time = time.time() - start
        # .npz格式
        start = time.time()
        np.savez('test.npz', data=test_array)
        npz_save_time = time.time() - start
        start = time.time()
        with np.load('test.npz') as npz:
            loaded_npz = npz['data']
        npz_load_time = time.time() - start
        # 文本格式
        start = time.time()
        np.savetxt('test.txt', test_array)
        txt_save_time = time.time() - start
        start = time.time()
        loaded_txt = np.loadtxt('test.txt')
        txt_load_time = time.time() - start
        print(f"{size}\t{dtype.__name__}\t{npy_save_time:.4f}s\t{npy_load_time:.4f}s\t"
              f"{npz_save_time:.4f}s\t{npz_load_time:.4f}s\t{txt_save_time:.4f}s\t{txt_load_time:.4f}s")
# 文件大小比较
print(f"\n文件大小比较:")
test_array = np.random.random((1000, 10))
np.save('size_test.npy', test_array)
np.savez('size_test.npz', data=test_array)
np.savez_compressed('size_test_compressed.npz', data=test_array)
np.savetxt('size_test.txt', test_array)
files = ['size_test.npy', 'size_test.npz', 'size_test_compressed.npz', 'size_test.txt']
for filename in files:
    size = os.path.getsize(filename)
    print(f"{filename}: {size} 字节 ({size/1024:.2f} KB)")
6.2 内存使用优化
# 15. 内存使用优化
print("\n💾 内存使用优化:")
import psutil
import gc
def get_memory_usage():
    """获取当前内存使用量"""
    process = psutil.Process()
    return process.memory_info().rss / 1024 / 1024  # MB
print("内存使用优化示例:")
# 基准内存使用
baseline_memory = get_memory_usage()
print(f"基准内存使用: {baseline_memory:.2f} MB")
# 方法1: 直接加载大文件
print(f"\n方法1: 直接加载大文件")
large_array = np.random.random((5000, 1000))
np.save('memory_test.npy', large_array)
direct_memory_before = get_memory_usage()
loaded_direct = np.load('memory_test.npy')
direct_memory_after = get_memory_usage()
print(f"直接加载前内存: {direct_memory_before:.2f} MB")
print(f"直接加载后内存: {direct_memory_after:.2f} MB")
print(f"内存增加: {direct_memory_after - direct_memory_before:.2f} MB")
# 清理内存
del loaded_direct
gc.collect()
# 方法2: 使用内存映射
print(f"\n方法2: 使用内存映射")
mmap_memory_before = get_memory_usage()
mmap_array = np.load('memory_test.npy', mmap_mode='r')  # mmap_mode自动处理.npy文件头,无需手动跳过
mmap_memory_after = get_memory_usage()
print(f"内存映射前内存: {mmap_memory_before:.2f} MB")
print(f"内存映射后内存: {mmap_memory_after:.2f} MB")
print(f"内存增加: {mmap_memory_after - mmap_memory_before:.2f} MB")
# 访问部分数据
subset = mmap_array[1000:2000, 100:200]
subset_memory = get_memory_usage()
print(f"访问子集后内存: {subset_memory:.2f} MB")
# 方法3: 分块处理
print(f"\n方法3: 分块处理")
chunk_memory_before = get_memory_usage()
chunk_results = []
for i in range(0, 5000, 1000):
    chunk = mmap_array[i:i+1000]
    chunk_mean = np.mean(chunk)
    chunk_results.append(chunk_mean)
    # 强制垃圾回收
    del chunk
    gc.collect()
chunk_memory_after = get_memory_usage()
print(f"分块处理前内存: {chunk_memory_before:.2f} MB")
print(f"分块处理后内存: {chunk_memory_after:.2f} MB")
print(f"内存增加: {chunk_memory_after - chunk_memory_before:.2f} MB")
print(f"\n内存使用总结:")
print(f"直接加载内存开销: {direct_memory_after - direct_memory_before:.2f} MB")
print(f"内存映射内存开销: {mmap_memory_after - mmap_memory_before:.2f} MB")
print(f"分块处理内存开销: {chunk_memory_after - chunk_memory_before:.2f} MB")
7. 实际应用案例
7.1 科学数据处理
# 16. 科学数据处理案例
print("\n🔬 科学数据处理案例:")
# 模拟科学实验数据
print("模拟科学实验数据:")
# 实验参数
n_experiments = 1000
n_measurements = 100
n_conditions = 5
# 生成实验数据
experiment_data = {
    'experiment_id': np.arange(n_experiments),
    'condition': np.random.randint(0, n_conditions, n_experiments),
    'temperature': np.random.normal(25, 5, n_experiments),
    'pressure': np.random.normal(1013, 50, n_experiments),
    'measurements': np.random.normal(100, 10, (n_experiments, n_measurements))
}
print(f"实验数据包含:")
print(f"- {n_experiments} 个实验")
print(f"- {n_conditions} 种条件")
print(f"- 每个实验 {n_measurements} 次测量")
# 保存实验数据
print(f"\n保存实验数据:")
np.savez_compressed('experiment_data.npz', **experiment_data)
# 添加元数据
metadata = {
    'experiment_date': '2023-12-01',
    'researcher': 'Dr. Smith',
    'equipment': 'Spectrometer XYZ-2000',
    'calibration_date': '2023-11-15',
    'notes': 'Temperature controlled environment'
}
with open('experiment_metadata.json', 'w') as f:
    json.dump(metadata, f, indent=2)
print("✅ 实验数据和元数据已保存")
# 数据分析
print(f"\n数据分析:")
measurements = experiment_data['measurements']
conditions = experiment_data['condition']
# 按条件分组分析
condition_stats = {}
for cond in range(n_conditions):
    mask = conditions == cond
    cond_measurements = measurements[mask]
    # 转为Python标量,便于后续json序列化
    condition_stats[cond] = {
        'count': int(np.sum(mask)),
        'mean': float(np.mean(cond_measurements)),
        'std': float(np.std(cond_measurements)),
        'min': float(np.min(cond_measurements)),
        'max': float(np.max(cond_measurements))
    }
print("按条件统计:")
for cond, stats in condition_stats.items():
    print(f"条件 {cond}: 实验数={stats['count']}, "
          f"均值={stats['mean']:.2f}±{stats['std']:.2f}")
# 保存分析结果
analysis_results = {
    'condition_statistics': condition_stats,
    'overall_mean': float(np.mean(measurements)),
    'overall_std': float(np.std(measurements)),
    'total_experiments': int(n_experiments),
    'total_measurements': int(n_experiments * n_measurements)
}
with open('analysis_results.json', 'w') as f:
    json.dump(analysis_results, f, indent=2)
print("✅ 分析结果已保存")
7.2 机器学习数据管道
# 17. 机器学习数据管道
print("\n🤖 机器学习数据管道:")
# 创建机器学习数据集
print("创建机器学习数据集:")
n_samples = 10000
n_features = 50
n_classes = 3
# 生成特征数据
X = np.random.random((n_samples, n_features))
# 添加一些相关性
X[:, 1] = X[:, 0] + 0.1 * np.random.random(n_samples)
X[:, 2] = X[:, 0] - X[:, 1] + 0.1 * np.random.random(n_samples)
# 生成标签
y = np.random.randint(0, n_classes, n_samples)
# 生成样本权重
sample_weights = np.random.uniform(0.5, 1.5, n_samples)
print(f"数据集大小: {X.shape}")
print(f"标签分布: {np.bincount(y)}")
# 数据预处理
print(f"\n数据预处理:")
# 标准化特征
X_mean = np.mean(X, axis=0)
X_std = np.std(X, axis=0)
X_normalized = (X - X_mean) / X_std
print(f"标准化后特征均值: {np.mean(X_normalized, axis=0)[:5]}")
print(f"标准化后特征标准差: {np.std(X_normalized, axis=0)[:5]}")
# 数据分割
print(f"\n数据分割:")
indices = np.random.permutation(n_samples)
train_size = int(0.7 * n_samples)
val_size = int(0.2 * n_samples)
train_indices = indices[:train_size]
val_indices = indices[train_size:train_size + val_size]
test_indices = indices[train_size + val_size:]
X_train = X_normalized[train_indices]
y_train = y[train_indices]
weights_train = sample_weights[train_indices]
X_val = X_normalized[val_indices]
y_val = y[val_indices]
X_test = X_normalized[test_indices]
y_test = y[test_indices]
print(f"训练集: {X_train.shape}")
print(f"验证集: {X_val.shape}")
print(f"测试集: {X_test.shape}")
# 保存数据集
print(f"\n保存数据集:")
dataset = {
    'X_train': X_train,
    'y_train': y_train,
    'weights_train': weights_train,
    'X_val': X_val,
    'y_val': y_val,
    'X_test': X_test,
    'y_test': y_test,
    'feature_names': [f'feature_{i}' for i in range(n_features)],
    'class_names': [f'class_{i}' for i in range(n_classes)],
    # 预处理参数保存为独立数组,避免嵌套dict被存成需要pickle的object数组
    'preprocessing_mean': X_mean,
    'preprocessing_std': X_std
}
np.savez_compressed('ml_dataset.npz', **dataset)
# 保存数据集信息
dataset_info = {
    'name': 'Synthetic Classification Dataset',
    'version': '1.0',
    'created_date': '2023-12-01',
    'n_samples': int(n_samples),
    'n_features': int(n_features),
    'n_classes': int(n_classes),
    'train_size': int(train_size),
    'val_size': int(val_size),
    'test_size': int(len(test_indices)),
    'preprocessing': 'StandardScaler',
    'class_distribution': {f'class_{i}': int(count) for i, count in enumerate(np.bincount(y))}
}
with open('dataset_info.json', 'w') as f:
    json.dump(dataset_info, f, indent=2)
print("✅ 机器学习数据集已保存")
# 数据加载验证
print(f"\n数据加载验证:")
with np.load('ml_dataset.npz') as data:
    print(f"数据集包含的数组: {list(data.keys())}")
    loaded_X_train = data['X_train']
    loaded_y_train = data['y_train']
    print(f"加载的训练集形状: {loaded_X_train.shape}")
    print(f"数据完整性验证: {np.array_equal(X_train, loaded_X_train)}")
# 文件大小分析
file_size = os.path.getsize('ml_dataset.npz')
print(f"数据集文件大小: {file_size / 1024 / 1024:.2f} MB")
8. 本章小结
8.1 核心知识点
NumPy原生格式
- .npy: 单数组存储,高效二进制格式
- .npz: 多数组存储,支持压缩
- 内存映射: 大文件的高效访问
文本文件处理
- savetxt/loadtxt: 基本文本文件操作
- genfromtxt: 复杂文本文件处理
- CSV格式的高级处理
二进制文件
- tofile/fromfile: 原始二进制格式
- 自定义二进制格式设计
- 跨平台兼容性考虑
数据格式转换
- 与Pandas的互操作
- JSON、XML、HDF5等格式
- 数据类型转换和验证
大数据处理
- 分块处理策略
- 流式处理技术
- 并行处理优化
8.2 最佳实践
- 🚀 选择合适的格式: 根据数据特点选择最优存储格式
- 💾 内存效率: 使用内存映射处理大文件
- 🔄 数据完整性: 始终验证保存和加载的数据
- 📊 元数据管理: 保存数据的描述信息和处理参数
8.3 常见陷阱
- ❌ 数据类型丢失: 文本格式可能丢失精度
- ❌ 平台兼容性: 二进制格式的字节序问题(见本节末尾的示例)
- ❌ 内存溢出: 大文件的不当加载
- ❌ 文件损坏: 缺乏数据完整性检查
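下面给出针对字节序与完整性校验这两个陷阱的最小示意(文件名仅作演示):
import hashlib
arr = np.arange(10, dtype='<f8')  # 显式声明小端float64,避免跨平台字节序歧义
arr.tofile('portable.bin')
loaded = np.fromfile('portable.bin', dtype='<f8')
# 用哈希值做字节级完整性校验
print(hashlib.md5(arr.tobytes()).hexdigest() == hashlib.md5(loaded.tobytes()).hexdigest())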
8.4 下一步学习
- ⚡ 学习性能优化技巧
- 🔧 掌握与其他库的集成
- 📚 深入了解数据库连接
- 🌐 学习分布式数据处理
9. 练习题
9.1 基础练习
文件格式比较
- 比较不同格式的存储效率
- 测试加载速度差异
- 分析压缩效果
数据完整性
- 实现数据校验机制
- 处理文件损坏情况
- 设计备份策略
格式转换
- 实现多种格式间的转换
- 处理数据类型转换
- 保持元数据信息
9.2 进阶练习
大数据处理
- 设计分块处理算法
- 实现流式数据处理
- 优化内存使用
自定义格式
- 设计专用数据格式
- 实现版本兼容性
- 添加压缩功能
数据管道
- 构建完整的数据处理管道
- 实现错误处理和恢复
- 添加进度监控
9.3 挑战练习
分布式存储
- 实现数据分片存储
- 设计负载均衡策略
- 处理节点故障
实时数据流
- 处理实时数据流
- 实现增量更新
- 优化延迟和吞吐量
数据库集成
- 与关系数据库集成
- 实现NoSQL数据库连接
- 设计缓存策略
恭喜您完成第7章的学习! 🎉
您已经掌握了NumPy的文件IO和数据持久化技术,这些技能对于实际项目中的数据管理至关重要。在下一章中,我们将学习性能优化和高级技巧。