📚 本章概述

本章将深入探讨NumPy的文件输入输出功能和数据持久化技术。您将学习如何保存和加载NumPy数组,处理各种文件格式,以及与其他数据格式进行转换。这些技能对于数据分析、机器学习和科学计算项目至关重要。

🎯 学习目标

  • 掌握NumPy原生文件格式的读写
  • 学会处理文本文件和CSV数据
  • 了解二进制文件的高效存储
  • 掌握与其他格式的数据交换
  • 学会大数据文件的处理技巧
  • 理解数据压缩和性能优化

1. NumPy原生文件格式

1.1 .npy格式 - 单数组存储

import numpy as np
import os
import time
import matplotlib.pyplot as plt

# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

print("💾 NumPy原生文件格式")
print("=" * 50)

# 创建示例数据
np.random.seed(42)
data_1d = np.random.random(1000)
data_2d = np.random.randint(0, 100, (50, 20))
data_3d = np.random.normal(0, 1, (10, 10, 5))

print("创建示例数据:")
print(f"1D数组形状: {data_1d.shape}, 类型: {data_1d.dtype}")
print(f"2D数组形状: {data_2d.shape}, 类型: {data_2d.dtype}")
print(f"3D数组形状: {data_3d.shape}, 类型: {data_3d.dtype}")

# 保存单个数组到.npy文件
print(f"\n保存数组到.npy文件:")
np.save('data_1d.npy', data_1d)
np.save('data_2d.npy', data_2d)
np.save('data_3d.npy', data_3d)

print("✅ 数组已保存到.npy文件")

# 加载.npy文件
print(f"\n从.npy文件加载数组:")
loaded_1d = np.load('data_1d.npy')
loaded_2d = np.load('data_2d.npy')
loaded_3d = np.load('data_3d.npy')

print(f"加载的1D数组形状: {loaded_1d.shape}")
print(f"加载的2D数组形状: {loaded_2d.shape}")
print(f"加载的3D数组形状: {loaded_3d.shape}")

# 验证数据完整性
print(f"\n数据完整性验证:")
print(f"1D数组相等: {np.array_equal(data_1d, loaded_1d)}")
print(f"2D数组相等: {np.array_equal(data_2d, loaded_2d)}")
print(f"3D数组相等: {np.array_equal(data_3d, loaded_3d)}")

# 文件大小信息
print(f"\n文件大小信息:")
for filename in ['data_1d.npy', 'data_2d.npy', 'data_3d.npy']:
    size = os.path.getsize(filename)
    print(f"{filename}: {size} 字节 ({size/1024:.2f} KB)")

1.2 .npz格式 - 多数组存储

# 2. .npz格式 - 多数组存储
print("\n📦 .npz格式 - 多数组存储:")

# 创建多个相关数组
features = np.random.random((1000, 10))
labels = np.random.randint(0, 3, 1000)
weights = np.random.random(10)
metadata = np.array(['feature_' + str(i) for i in range(10)])

print("创建多个数组:")
print(f"特征矩阵: {features.shape}")
print(f"标签向量: {labels.shape}")
print(f"权重向量: {weights.shape}")
print(f"元数据: {metadata.shape}")

# 保存多个数组到.npz文件(未压缩)
print(f"\n保存到.npz文件(未压缩):")
np.savez('dataset.npz', 
         features=features, 
         labels=labels, 
         weights=weights, 
         metadata=metadata)

# 保存多个数组到.npz文件(压缩)
print(f"保存到.npz文件(压缩):")
np.savez_compressed('dataset_compressed.npz', 
                   features=features, 
                   labels=labels, 
                   weights=weights, 
                   metadata=metadata)

# 加载.npz文件
print(f"\n从.npz文件加载:")
with np.load('dataset.npz') as data:
    print(f"文件中的数组: {list(data.keys())}")
    loaded_features = data['features']
    loaded_labels = data['labels']
    loaded_weights = data['weights']
    loaded_metadata = data['metadata']

print(f"加载的特征矩阵形状: {loaded_features.shape}")
print(f"加载的标签向量形状: {loaded_labels.shape}")

# 比较文件大小
size_uncompressed = os.path.getsize('dataset.npz')
size_compressed = os.path.getsize('dataset_compressed.npz')

print(f"\n文件大小比较:")
print(f"未压缩: {size_uncompressed} 字节 ({size_uncompressed/1024:.2f} KB)")
print(f"压缩后: {size_compressed} 字节 ({size_compressed/1024:.2f} KB)")
print(f"压缩比: {size_uncompressed/size_compressed:.2f}:1")
print(f"节省空间: {(1-size_compressed/size_uncompressed)*100:.1f}%")

# 验证数据完整性
print(f"\n数据完整性验证:")
print(f"特征矩阵相等: {np.array_equal(features, loaded_features)}")
print(f"标签向量相等: {np.array_equal(labels, loaded_labels)}")
print(f"权重向量相等: {np.array_equal(weights, loaded_weights)}")
print(f"元数据相等: {np.array_equal(metadata, loaded_metadata)}")

1.3 内存映射文件

# 3. 内存映射文件
print("\n🗺️ 内存映射文件:")

# 创建大数组
large_array = np.random.random((1000, 1000))
print(f"大数组形状: {large_array.shape}")
print(f"大数组大小: {large_array.nbytes / 1024 / 1024:.2f} MB")

# 保存为内存映射文件
print(f"\n创建内存映射文件:")
memmap_file = np.memmap('large_data.dat', dtype='float64', mode='w+', shape=(1000, 1000))
memmap_file[:] = large_array[:]
memmap_file.flush()  # 将缓冲区中的数据写入磁盘
del memmap_file

# 加载内存映射文件
print(f"加载内存映射文件:")
loaded_memmap = np.memmap('large_data.dat', dtype='float64', mode='r', shape=(1000, 1000))

print(f"内存映射数组形状: {loaded_memmap.shape}")
print(f"内存映射数组类型: {type(loaded_memmap)}")

# 内存映射的优势:只加载需要的部分
print(f"\n部分数据访问:")
subset = loaded_memmap[100:200, 200:300]
print(f"子集形状: {subset.shape}")
print(f"子集均值: {np.mean(subset):.6f}")

# 性能比较
print(f"\n性能比较:")

# 普通加载(先将同一份大数组另存为.npy,保证两种方式比较的是相同数据量)
np.save('large_data.npy', large_array)
start_time = time.time()
normal_array = np.load('large_data.npy')
normal_time = time.time() - start_time

# 内存映射加载
start_time = time.time()
mmap_array = np.memmap('large_data.dat', dtype='float64', mode='r', shape=(1000, 1000))
mmap_time = time.time() - start_time

print(f"普通加载时间: {normal_time:.6f}秒")
print(f"内存映射加载时间: {mmap_time:.6f}秒")
print(f"内存映射速度提升: {normal_time/mmap_time:.1f}x")

2. 文本文件处理

2.1 基本文本文件读写

# 4. 文本文件读写
print("\n📄 文本文件读写:")

# 创建示例数据
data_matrix = np.array([[1.1, 2.2, 3.3], 
                       [4.4, 5.5, 6.6], 
                       [7.7, 8.8, 9.9]])

print("原始数据:")
print(data_matrix)

# 保存为文本文件
print(f"\n保存为文本文件:")
np.savetxt('data.txt', data_matrix)
np.savetxt('data_formatted.txt', data_matrix, fmt='%.2f')
np.savetxt('data_delimited.txt', data_matrix, delimiter=',', fmt='%.3f')

print("✅ 数据已保存为文本文件")

# 从文本文件加载
print(f"\n从文本文件加载:")
loaded_data = np.loadtxt('data.txt')
loaded_formatted = np.loadtxt('data_formatted.txt')
loaded_delimited = np.loadtxt('data_delimited.txt', delimiter=',')

print(f"加载的数据形状: {loaded_data.shape}")
print(f"数据相等性验证: {np.allclose(data_matrix, loaded_data)}")

# 查看文件内容
print(f"\n文件内容示例:")
with open('data_formatted.txt', 'r') as f:
    content = f.read()
    print("data_formatted.txt内容:")
    print(content)

# 带标题和注释的保存
print(f"\n带标题和注释的保存:")
header = "Column1,Column2,Column3"
footer = "Data generated by NumPy"

np.savetxt('data_with_header.csv', 
           data_matrix, 
           delimiter=',', 
           header=header, 
           footer=footer, 
           comments='# ',
           fmt='%.2f')

print("✅ 带标题的CSV文件已保存")

2.2 复杂文本文件处理

# 5. 复杂文本文件处理
print("\n🔧 复杂文本文件处理:")

# 创建混合数据类型的数组
mixed_data = np.array([
    ('Alice', 25, 85.5, True),
    ('Bob', 30, 92.3, False),
    ('Charlie', 35, 78.9, True),
    ('Diana', 28, 88.7, False)
], dtype=[('name', 'U10'), ('age', 'i4'), ('score', 'f8'), ('passed', '?')])

print("混合数据类型数组:")
print(mixed_data)
print(f"数据类型: {mixed_data.dtype}")

# 保存结构化数组
print(f"\n保存结构化数组:")
np.savetxt('structured_data.txt', mixed_data, 
           fmt='%s %d %.1f %s',
           header='Name Age Score Passed')

# 处理缺失值
print(f"\n处理缺失值:")
data_with_nan = np.array([[1.0, 2.0, np.nan], 
                         [4.0, np.nan, 6.0], 
                         [7.0, 8.0, 9.0]])

print("包含NaN的数据:")
print(data_with_nan)

# 保存时处理NaN
np.savetxt('data_with_nan.txt', data_with_nan, 
           fmt='%.2f', 
           delimiter=',',
           comments='',
           header='Col1,Col2,Col3')

# 加载时处理缺失值
print(f"\n加载时处理缺失值:")
def custom_converter(s):
    """把无法解析为浮点数的字段(例如 'NA')转换为 np.nan,可配合genfromtxt的converters参数使用(示例见本小节末尾)"""
    try:
        return float(s)
    except ValueError:
        return np.nan

# 使用genfromtxt处理复杂情况
loaded_with_nan = np.genfromtxt('data_with_nan.txt', 
                               delimiter=',', 
                               skip_header=1,
                               filling_values=np.nan)

print("加载的包含NaN的数据:")
print(loaded_with_nan)
print(f"NaN位置相同: {np.isnan(data_with_nan).sum() == np.isnan(loaded_with_nan).sum()}")

2.3 CSV文件高级处理

# 6. CSV文件高级处理
print("\n📊 CSV文件高级处理:")

# 创建复杂的CSV数据
np.random.seed(42)
dates = ['2023-01-' + str(i+1).zfill(2) for i in range(10)]
products = ['Product_' + chr(65+i) for i in range(5)]
sales_data = []

for i in range(50):
    date = np.random.choice(dates)
    product = np.random.choice(products)
    quantity = np.random.randint(1, 100)
    price = np.random.uniform(10, 100)
    sales_data.append((date, product, quantity, price))  # 结构化数组的记录需要用元组表示

# 转换为结构化数组
sales_array = np.array(sales_data, dtype=[
    ('date', 'U10'), 
    ('product', 'U10'), 
    ('quantity', 'i4'), 
    ('price', 'f8')
])

print(f"销售数据形状: {sales_array.shape}")
print("前5条记录:")
print(sales_array[:5])

# 保存为CSV
print(f"\n保存为CSV文件:")
with open('sales_data.csv', 'w') as f:
    f.write('Date,Product,Quantity,Price\n')
    for record in sales_array:
        f.write(f"{record['date']},{record['product']},{record['quantity']},{record['price']:.2f}\n")

# 使用genfromtxt加载CSV
print(f"\n使用genfromtxt加载CSV:")
csv_data = np.genfromtxt('sales_data.csv', 
                        delimiter=',', 
                        dtype=None,
                        encoding='utf-8',
                        names=True)  # names=True会把第一行作为字段名,因此不要再使用skip_header

print(f"加载的CSV数据形状: {csv_data.shape}")
print("前3条记录:")
print(csv_data[:3])

# 数据分析示例
print(f"\n数据分析示例:")
quantities = csv_data['Quantity'].astype(float)
prices = csv_data['Price'].astype(float)

print(f"平均数量: {np.mean(quantities):.2f}")
print(f"平均价格: {np.mean(prices):.2f}")
print(f"总销售额: {np.sum(quantities * prices):.2f}")

# 按产品分组统计
unique_products = np.unique(csv_data['Product'])
print(f"\n按产品统计:")
for product in unique_products:
    mask = csv_data['Product'] == product
    product_quantities = quantities[mask]
    product_prices = prices[mask]
    total_sales = np.sum(product_quantities * product_prices)
    print(f"{product}: 总销售额 {total_sales:.2f}")

3. 二进制文件处理

3.1 原始二进制文件

# 7. 原始二进制文件处理
print("\n🔢 原始二进制文件处理:")

# 创建不同数据类型的数组
int_array = np.arange(1000, dtype=np.int32)
float_array = np.random.random(1000).astype(np.float32)
complex_array = np.random.random(500) + 1j * np.random.random(500)

print("数据类型示例:")
print(f"整数数组: {int_array.dtype}, 大小: {int_array.nbytes} 字节")
print(f"浮点数组: {float_array.dtype}, 大小: {float_array.nbytes} 字节")
print(f"复数数组: {complex_array.dtype}, 大小: {complex_array.nbytes} 字节")

# 保存为原始二进制文件
print(f"\n保存为原始二进制文件:")
int_array.tofile('int_data.bin')
float_array.tofile('float_data.bin')
complex_array.tofile('complex_data.bin')

print("✅ 二进制文件已保存")

# 从二进制文件加载
print(f"\n从二进制文件加载:")
loaded_int = np.fromfile('int_data.bin', dtype=np.int32)
loaded_float = np.fromfile('float_data.bin', dtype=np.float32)
loaded_complex = np.fromfile('complex_data.bin', dtype=np.complex128)

print(f"加载的整数数组形状: {loaded_int.shape}")
print(f"加载的浮点数组形状: {loaded_float.shape}")
print(f"加载的复数数组形状: {loaded_complex.shape}")

# 验证数据完整性
print(f"\n数据完整性验证:")
print(f"整数数组相等: {np.array_equal(int_array, loaded_int)}")
print(f"浮点数组相等: {np.allclose(float_array, loaded_float)}")
print(f"复数数组相等: {np.allclose(complex_array, loaded_complex)}")

# 文件大小比较
print(f"\n文件大小比较:")
formats = [
    ('int_data.bin', 'int_data.npy'),
    ('float_data.bin', 'float_data.npy'),
    ('complex_data.bin', 'complex_data.npy')
]

# 保存为.npy格式用于比较
np.save('int_data.npy', int_array)
np.save('float_data.npy', float_array)
np.save('complex_data.npy', complex_array)

for bin_file, npy_file in formats:
    bin_size = os.path.getsize(bin_file)
    npy_size = os.path.getsize(npy_file)
    print(f"{bin_file}: {bin_size} 字节")
    print(f"{npy_file}: {npy_size} 字节")
    print(f"开销: {npy_size - bin_size} 字节 ({(npy_size/bin_size-1)*100:.1f}%)\n")

3.2 自定义二进制格式

# 8. 自定义二进制格式
print("\n🛠️ 自定义二进制格式:")

import struct

# 创建自定义数据结构
class DataRecord:
    def __init__(self, timestamp, sensor_id, temperature, humidity, pressure):
        self.timestamp = timestamp
        self.sensor_id = sensor_id
        self.temperature = temperature
        self.humidity = humidity
        self.pressure = pressure
    
    def to_bytes(self):
        # 格式: 双精度时间戳 + 无符号整数ID + 3个单精度浮点数
        # 如需跨平台读取,建议使用显式字节序的格式,例如 '<dIfff'(小端、无填充)
        return struct.pack('dIfff', 
                          self.timestamp, 
                          self.sensor_id, 
                          self.temperature, 
                          self.humidity, 
                          self.pressure)
    
    @classmethod
    def from_bytes(cls, data):
        timestamp, sensor_id, temp, humid, press = struct.unpack('dIfff', data)
        return cls(timestamp, sensor_id, temp, humid, press)
    
    def __repr__(self):
        return f"DataRecord(ts={self.timestamp:.2f}, id={self.sensor_id}, T={self.temperature:.1f}°C, H={self.humidity:.1f}%, P={self.pressure:.1f}hPa)"

# 生成示例数据
print("生成传感器数据:")
records = []
for i in range(100):
    record = DataRecord(
        timestamp=time.time() + i * 60,  # 每分钟一个记录
        sensor_id=i % 10,  # 10个传感器
        temperature=20 + 10 * np.random.random(),
        humidity=40 + 20 * np.random.random(),
        pressure=1000 + 50 * np.random.random()
    )
    records.append(record)

print(f"生成了 {len(records)} 条记录")
print("前3条记录:")
for record in records[:3]:
    print(record)

# 保存为自定义二进制格式
print(f"\n保存为自定义二进制格式:")
with open('sensor_data.bin', 'wb') as f:
    # 写入记录数量
    f.write(struct.pack('I', len(records)))
    # 写入所有记录
    for record in records:
        f.write(record.to_bytes())

print("✅ 自定义二进制文件已保存")

# 从自定义二进制格式加载
print(f"\n从自定义二进制格式加载:")
loaded_records = []
with open('sensor_data.bin', 'rb') as f:
    # 读取记录数量
    num_records = struct.unpack('I', f.read(4))[0]
    print(f"文件中有 {num_records} 条记录")
    
    # 读取所有记录
    record_size = struct.calcsize('dIfff')
    for i in range(num_records):
        data = f.read(record_size)
        record = DataRecord.from_bytes(data)
        loaded_records.append(record)

print("前3条加载的记录:")
for record in loaded_records[:3]:
    print(record)

# 验证数据完整性
print(f"\n数据完整性验证:")
all_equal = True
for orig, loaded in zip(records, loaded_records):
    if (abs(orig.timestamp - loaded.timestamp) > 1e-6 or
        orig.sensor_id != loaded.sensor_id or
        abs(orig.temperature - loaded.temperature) > 1e-6 or
        abs(orig.humidity - loaded.humidity) > 1e-6 or
        abs(orig.pressure - loaded.pressure) > 1e-6):
        all_equal = False
        break

print(f"所有记录相等: {all_equal}")

# 文件大小分析
file_size = os.path.getsize('sensor_data.bin')
record_size = struct.calcsize('dIfff')
expected_size = 4 + len(records) * record_size  # 4字节记录数 + 记录数据
print(f"文件大小: {file_size} 字节")
print(f"预期大小: {expected_size} 字节")
print(f"每条记录: {record_size} 字节")

4. 数据格式转换

4.1 与Pandas的互操作

# 9. 与Pandas的互操作
print("\n🐼 与Pandas的互操作:")

try:
    import pandas as pd
    
    # NumPy数组转DataFrame
    print("NumPy数组转DataFrame:")
    np_data = np.random.random((100, 5))
    columns = ['Feature_' + str(i) for i in range(5)]
    
    df = pd.DataFrame(np_data, columns=columns)
    print(f"DataFrame形状: {df.shape}")
    print("前5行:")
    print(df.head())
    
    # DataFrame转NumPy数组
    print(f"\nDataFrame转NumPy数组:")
    back_to_numpy = df.values
    print(f"转换后数组形状: {back_to_numpy.shape}")
    print(f"数据类型: {back_to_numpy.dtype}")
    print(f"数据相等: {np.allclose(np_data, back_to_numpy)}")
    
    # 处理混合数据类型
    print(f"\n处理混合数据类型:")
    mixed_df = pd.DataFrame({
        'name': ['Alice', 'Bob', 'Charlie'],
        'age': [25, 30, 35],
        'salary': [50000.0, 60000.0, 70000.0],
        'active': [True, False, True]
    })
    
    print("混合类型DataFrame:")
    print(mixed_df)
    print(f"数据类型:\n{mixed_df.dtypes}")
    
    # 分别提取不同类型的列
    numeric_data = mixed_df.select_dtypes(include=[np.number]).values
    string_data = mixed_df.select_dtypes(include=['object']).values
    bool_data = mixed_df.select_dtypes(include=['bool']).values
    
    print(f"\n数值数据形状: {numeric_data.shape}")
    print(f"字符串数据形状: {string_data.shape}")
    print(f"布尔数据形状: {bool_data.shape}")
    
    # 保存和加载
    print(f"\n保存和加载:")
    df.to_csv('pandas_data.csv', index=False)
    mixed_df.to_pickle('mixed_data.pkl')
    
    # 使用NumPy加载CSV
    csv_array = np.genfromtxt('pandas_data.csv', delimiter=',', skip_header=1)
    print(f"NumPy加载的CSV数据形状: {csv_array.shape}")
    
except ImportError:
    print("Pandas未安装,跳过Pandas互操作示例")

4.2 与其他格式的转换

# 10. 与其他格式的转换
print("\n🔄 与其他格式的转换:")

# JSON格式
import json

print("JSON格式转换:")
data_dict = {
    'array_1d': np_data[:5, 0].tolist(),
    'array_2d': np_data[:3, :3].tolist(),
    'metadata': {
        'shape': list(np_data.shape),
        'dtype': str(np_data.dtype),
        'mean': float(np.mean(np_data)),
        'std': float(np.std(np_data))
    }
}

# 保存为JSON
with open('data.json', 'w') as f:
    json.dump(data_dict, f, indent=2)

print("✅ 数据已保存为JSON格式")

# 从JSON加载
with open('data.json', 'r') as f:
    loaded_dict = json.load(f)

# 转换回NumPy数组
loaded_1d = np.array(loaded_dict['array_1d'])
loaded_2d = np.array(loaded_dict['array_2d'])

print(f"从JSON加载的1D数组: {loaded_1d}")
print(f"从JSON加载的2D数组形状: {loaded_2d.shape}")

# XML格式(简单示例)
print(f"\nXML格式转换:")
import xml.etree.ElementTree as ET

def array_to_xml(array, root_name='array'):
    root = ET.Element(root_name)
    root.set('shape', str(array.shape))
    root.set('dtype', str(array.dtype))
    
    if array.ndim == 1:
        for i, value in enumerate(array):
            elem = ET.SubElement(root, 'element')
            elem.set('index', str(i))
            elem.text = str(value)
    elif array.ndim == 2:
        for i in range(array.shape[0]):
            row = ET.SubElement(root, 'row')
            row.set('index', str(i))
            for j in range(array.shape[1]):
                elem = ET.SubElement(row, 'element')
                elem.set('index', str(j))
                elem.text = str(array[i, j])
    
    return root

# 转换小数组为XML
small_array = np.array([[1, 2, 3], [4, 5, 6]])
xml_root = array_to_xml(small_array)

# 保存XML
tree = ET.ElementTree(xml_root)
tree.write('data.xml', encoding='utf-8', xml_declaration=True)

print("✅ 数据已保存为XML格式")

# HDF5格式(如果可用)
try:
    import h5py
    
    print(f"\nHDF5格式转换:")
    
    # 保存为HDF5
    with h5py.File('data.h5', 'w') as f:
        f.create_dataset('features', data=np_data)
        f.create_dataset('labels', data=np.random.randint(0, 3, 100))
        f.create_dataset('weights', data=np.random.random(5))
        
        # 添加属性
        f.attrs['created_by'] = 'NumPy tutorial'
        f.attrs['version'] = '1.0'
        f['features'].attrs['description'] = 'Random feature matrix'
    
    print("✅ 数据已保存为HDF5格式")
    
    # 从HDF5加载
    with h5py.File('data.h5', 'r') as f:
        print(f"HDF5文件中的数据集: {list(f.keys())}")
        loaded_features = f['features'][:]
        loaded_labels = f['labels'][:]
        
        print(f"加载的特征形状: {loaded_features.shape}")
        print(f"加载的标签形状: {loaded_labels.shape}")
        print(f"文件属性: {dict(f.attrs)}")
    
except ImportError:
    print("h5py未安装,跳过HDF5示例")

5. 大数据文件处理

5.1 分块处理

# 11. 大数据文件分块处理
print("\n📊 大数据文件分块处理:")

# 创建大数据文件
print("创建大数据文件:")
large_data = np.random.random((10000, 100))
np.save('large_dataset.npy', large_data)

file_size = os.path.getsize('large_dataset.npy')
print(f"大数据文件大小: {file_size / 1024 / 1024:.2f} MB")

# 分块读取和处理
print(f"\n分块读取和处理:")
chunk_size = 1000
num_chunks = len(large_data) // chunk_size

# 使用内存映射进行分块处理
mmap_data = np.load('large_dataset.npy', mmap_mode='r')
# 注意: mmap_mode会自动解析.npy文件头并还原形状,无需手动计算偏移量

print(f"数据分为 {num_chunks} 个块,每块 {chunk_size} 行")

# 分块计算统计信息
chunk_means = []
chunk_stds = []

for i in range(num_chunks):
    start_idx = i * chunk_size
    end_idx = start_idx + chunk_size
    
    chunk = mmap_data[start_idx:end_idx]
    chunk_mean = np.mean(chunk)
    chunk_std = np.std(chunk)
    
    chunk_means.append(chunk_mean)
    chunk_stds.append(chunk_std)
    
    if i < 3:  # 只打印前3个块的信息
        print(f"块 {i}: 均值={chunk_mean:.6f}, 标准差={chunk_std:.6f}")

# 计算全局统计信息(等大小分块时: 总方差 = 块内方差的均值 + 块均值的方差)
global_mean = np.mean(chunk_means)
global_std = np.sqrt(np.mean([s**2 for s in chunk_stds]) + np.var(chunk_means))

print(f"\n全局统计信息:")
print(f"全局均值: {global_mean:.6f}")
print(f"全局标准差: {global_std:.6f}")

# 验证与直接计算的结果
direct_mean = np.mean(large_data)
direct_std = np.std(large_data)

print(f"\n验证(直接计算):")
print(f"直接计算均值: {direct_mean:.6f}")
print(f"直接计算标准差: {direct_std:.6f}")
print(f"均值误差: {abs(global_mean - direct_mean):.8f}")
print(f"标准差误差: {abs(global_std - direct_std):.8f}")

5.2 流式处理

# 12. 流式处理
print("\n🌊 流式处理:")

def data_generator(n_samples, n_features, batch_size=100):
    """数据生成器,模拟流式数据"""
    for i in range(0, n_samples, batch_size):
        current_batch_size = min(batch_size, n_samples - i)
        batch = np.random.random((current_batch_size, n_features))
        yield batch, i

# 流式处理示例
print("流式处理示例:")
total_samples = 5000
n_features = 50
batch_size = 200

# 在线统计计算
running_sum = np.zeros(n_features)
running_sum_sq = np.zeros(n_features)
total_processed = 0

print(f"处理 {total_samples} 个样本,批大小 {batch_size}")

for batch, batch_start in data_generator(total_samples, n_features, batch_size):
    # 更新在线统计
    running_sum += np.sum(batch, axis=0)
    running_sum_sq += np.sum(batch**2, axis=0)
    total_processed += len(batch)
    
    # 计算当前的均值和方差
    current_mean = running_sum / total_processed
    current_var = (running_sum_sq / total_processed) - current_mean**2
    current_std = np.sqrt(current_var)
    
    if batch_start % (batch_size * 5) == 0:  # 每5个批次打印一次
        print(f"已处理 {total_processed} 样本,当前均值: {np.mean(current_mean):.6f}")

final_mean = running_sum / total_processed
final_var = (running_sum_sq / total_processed) - final_mean**2
final_std = np.sqrt(final_var)

print(f"\n最终统计结果:")
print(f"样本数: {total_processed}")
print(f"特征均值的均值: {np.mean(final_mean):.6f}")
print(f"特征标准差的均值: {np.mean(final_std):.6f}")

# 验证结果
verification_data = np.random.random((total_samples, n_features))
verification_mean = np.mean(verification_data, axis=0)
verification_std = np.std(verification_data, axis=0)

print(f"\n验证(理论值约为0.5和0.289):")
print(f"理论均值: 0.5, 实际: {np.mean(verification_mean):.6f}")
print(f"理论标准差: 0.289, 实际: {np.mean(verification_std):.6f}")

5.3 并行处理

# 13. 并行处理
print("\n⚡ 并行处理:")

from multiprocessing import Pool

def process_chunk(args):
    """处理数据块的函数"""
    chunk_data, chunk_id = args
    
    # 模拟复杂计算
    result = {
        'chunk_id': chunk_id,
        'mean': np.mean(chunk_data),
        'std': np.std(chunk_data),
        'min': np.min(chunk_data),
        'max': np.max(chunk_data),
        'sum': np.sum(chunk_data),
        'size': chunk_data.size
    }
    
    return result

# 创建测试数据
print("创建测试数据:")
test_data = np.random.random((8000, 50))
chunk_size = 1000
chunks = [test_data[i:i+chunk_size] for i in range(0, len(test_data), chunk_size)]

print(f"数据分为 {len(chunks)} 个块")

# 串行处理
print(f"\n串行处理:")
start_time = time.time()
serial_results = []
for i, chunk in enumerate(chunks):
    result = process_chunk((chunk, i))
    serial_results.append(result)
serial_time = time.time() - start_time

print(f"串行处理时间: {serial_time:.4f}秒")

# 并行处理
print(f"\n并行处理:")
start_time = time.time()
# 注意: 在以spawn方式启动子进程的平台(如Windows)上,这段代码需要放在 if __name__ == '__main__': 保护块中
with Pool() as pool:
    chunk_args = [(chunk, i) for i, chunk in enumerate(chunks)]
    parallel_results = pool.map(process_chunk, chunk_args)
parallel_time = time.time() - start_time

print(f"并行处理时间: {parallel_time:.4f}秒")
print(f"速度提升: {serial_time/parallel_time:.2f}x")

# 验证结果一致性
print(f"\n结果验证:")
for serial, parallel in zip(serial_results, parallel_results):
    for key in serial:
        if key != 'chunk_id':
            assert abs(serial[key] - parallel[key]) < 1e-10
print("✅ 串行和并行结果一致")

# 汇总结果
total_sum = sum(r['sum'] for r in parallel_results)
total_size = sum(r['size'] for r in parallel_results)
global_mean = total_sum / total_size

print(f"\n汇总统计:")
print(f"全局均值: {global_mean:.6f}")
print(f"全局最小值: {min(r['min'] for r in parallel_results):.6f}")
print(f"全局最大值: {max(r['max'] for r in parallel_results):.6f}")

6. 性能优化和最佳实践

6.1 IO性能比较

# 14. IO性能比较
print("\n🏃 IO性能比较:")

# 创建测试数据
test_sizes = [1000, 5000, 10000]
data_types = [np.float32, np.float64, np.int32, np.int64]

print("不同格式的IO性能比较:")
print("大小\t数据类型\t.npy保存\t.npy加载\t.npz保存\t.npz加载\t文本保存\t文本加载")

for size in test_sizes:
    for dtype in data_types:
        # 创建测试数据(整数类型用randint生成,避免0~1之间的浮点数被截断成全0)
        if np.issubdtype(dtype, np.integer):
            test_array = np.random.randint(0, 100, (size, 10)).astype(dtype)
        else:
            test_array = np.random.random((size, 10)).astype(dtype)
        
        # .npy格式
        start = time.time()
        np.save('test.npy', test_array)
        npy_save_time = time.time() - start
        
        start = time.time()
        loaded_npy = np.load('test.npy')
        npy_load_time = time.time() - start
        
        # .npz格式
        start = time.time()
        np.savez('test.npz', data=test_array)
        npz_save_time = time.time() - start
        
        start = time.time()
        loaded_npz = np.load('test.npz')['data']
        npz_load_time = time.time() - start
        
        # 文本格式
        start = time.time()
        np.savetxt('test.txt', test_array)
        txt_save_time = time.time() - start
        
        start = time.time()
        loaded_txt = np.loadtxt('test.txt')
        txt_load_time = time.time() - start
        
        print(f"{size}\t{dtype.__name__}\t{npy_save_time:.4f}s\t{npy_load_time:.4f}s\t"
              f"{npz_save_time:.4f}s\t{npz_load_time:.4f}s\t{txt_save_time:.4f}s\t{txt_load_time:.4f}s")

# 文件大小比较
print(f"\n文件大小比较:")
test_array = np.random.random((1000, 10))

np.save('size_test.npy', test_array)
np.savez('size_test.npz', data=test_array)
np.savez_compressed('size_test_compressed.npz', data=test_array)
np.savetxt('size_test.txt', test_array)

files = ['size_test.npy', 'size_test.npz', 'size_test_compressed.npz', 'size_test.txt']
for filename in files:
    size = os.path.getsize(filename)
    print(f"{filename}: {size} 字节 ({size/1024:.2f} KB)")

6.2 内存使用优化

# 15. 内存使用优化
print("\n💾 内存使用优化:")

import psutil
import gc

def get_memory_usage():
    """获取当前内存使用量"""
    process = psutil.Process()
    return process.memory_info().rss / 1024 / 1024  # MB

print("内存使用优化示例:")

# 基准内存使用
baseline_memory = get_memory_usage()
print(f"基准内存使用: {baseline_memory:.2f} MB")

# 方法1: 直接加载大文件
print(f"\n方法1: 直接加载大文件")
large_array = np.random.random((5000, 1000))
np.save('memory_test.npy', large_array)

direct_memory_before = get_memory_usage()
loaded_direct = np.load('memory_test.npy')
direct_memory_after = get_memory_usage()

print(f"直接加载前内存: {direct_memory_before:.2f} MB")
print(f"直接加载后内存: {direct_memory_after:.2f} MB")
print(f"内存增加: {direct_memory_after - direct_memory_before:.2f} MB")

# 清理内存
del loaded_direct
gc.collect()

# 方法2: 使用内存映射
print(f"\n方法2: 使用内存映射")
mmap_memory_before = get_memory_usage()
mmap_array = np.load('memory_test.npy', mmap_mode='r')  # mmap_mode会自动解析.npy文件头
mmap_memory_after = get_memory_usage()

print(f"内存映射前内存: {mmap_memory_before:.2f} MB")
print(f"内存映射后内存: {mmap_memory_after:.2f} MB")
print(f"内存增加: {mmap_memory_after - mmap_memory_before:.2f} MB")

# 访问部分数据
subset = mmap_array[1000:2000, 100:200]
subset_memory = get_memory_usage()
print(f"访问子集后内存: {subset_memory:.2f} MB")

# 方法3: 分块处理
print(f"\n方法3: 分块处理")
chunk_memory_before = get_memory_usage()

chunk_results = []
for i in range(0, 5000, 1000):
    chunk = mmap_array[i:i+1000]
    chunk_mean = np.mean(chunk)
    chunk_results.append(chunk_mean)
    
    # 强制垃圾回收
    del chunk
    gc.collect()

chunk_memory_after = get_memory_usage()
print(f"分块处理前内存: {chunk_memory_before:.2f} MB")
print(f"分块处理后内存: {chunk_memory_after:.2f} MB")
print(f"内存增加: {chunk_memory_after - chunk_memory_before:.2f} MB")

print(f"\n内存使用总结:")
print(f"直接加载内存开销: {direct_memory_after - direct_memory_before:.2f} MB")
print(f"内存映射内存开销: {mmap_memory_after - mmap_memory_before:.2f} MB")
print(f"分块处理内存开销: {chunk_memory_after - chunk_memory_before:.2f} MB")

7. 实际应用案例

7.1 科学数据处理

# 16. 科学数据处理案例
print("\n🔬 科学数据处理案例:")

# 模拟科学实验数据
print("模拟科学实验数据:")

# 实验参数
n_experiments = 1000
n_measurements = 100
n_conditions = 5

# 生成实验数据
experiment_data = {
    'experiment_id': np.arange(n_experiments),
    'condition': np.random.randint(0, n_conditions, n_experiments),
    'temperature': np.random.normal(25, 5, n_experiments),
    'pressure': np.random.normal(1013, 50, n_experiments),
    'measurements': np.random.normal(100, 10, (n_experiments, n_measurements))
}

print(f"实验数据包含:")
print(f"- {n_experiments} 个实验")
print(f"- {n_conditions} 种条件")
print(f"- 每个实验 {n_measurements} 次测量")

# 保存实验数据
print(f"\n保存实验数据:")
np.savez_compressed('experiment_data.npz', **experiment_data)

# 添加元数据
metadata = {
    'experiment_date': '2023-12-01',
    'researcher': 'Dr. Smith',
    'equipment': 'Spectrometer XYZ-2000',
    'calibration_date': '2023-11-15',
    'notes': 'Temperature controlled environment'
}

with open('experiment_metadata.json', 'w') as f:
    json.dump(metadata, f, indent=2)

print("✅ 实验数据和元数据已保存")

# 数据分析
print(f"\n数据分析:")
measurements = experiment_data['measurements']
conditions = experiment_data['condition']

# 按条件分组分析
condition_stats = {}
for cond in range(n_conditions):
    mask = conditions == cond
    cond_measurements = measurements[mask]
    
    condition_stats[cond] = {
        'count': int(np.sum(mask)),  # 转成Python原生类型,方便后面用json.dump序列化
        'mean': float(np.mean(cond_measurements)),
        'std': float(np.std(cond_measurements)),
        'min': float(np.min(cond_measurements)),
        'max': float(np.max(cond_measurements))
    }

print("按条件统计:")
for cond, stats in condition_stats.items():
    print(f"条件 {cond}: 实验数={stats['count']}, "
          f"均值={stats['mean']:.2f}±{stats['std']:.2f}")

# 保存分析结果
analysis_results = {
    'condition_statistics': condition_stats,
    'overall_mean': float(np.mean(measurements)),
    'overall_std': float(np.std(measurements)),
    'total_experiments': int(n_experiments),
    'total_measurements': int(n_experiments * n_measurements)
}

with open('analysis_results.json', 'w') as f:
    json.dump(analysis_results, f, indent=2)

print("✅ 分析结果已保存")

7.2 机器学习数据管道

# 17. 机器学习数据管道
print("\n🤖 机器学习数据管道:")

# 创建机器学习数据集
print("创建机器学习数据集:")

n_samples = 10000
n_features = 50
n_classes = 3

# 生成特征数据
X = np.random.random((n_samples, n_features))
# 添加一些相关性
X[:, 1] = X[:, 0] + 0.1 * np.random.random(n_samples)
X[:, 2] = X[:, 0] - X[:, 1] + 0.1 * np.random.random(n_samples)

# 生成标签
y = np.random.randint(0, n_classes, n_samples)

# 生成样本权重
sample_weights = np.random.uniform(0.5, 1.5, n_samples)

print(f"数据集大小: {X.shape}")
print(f"标签分布: {np.bincount(y)}")

# 数据预处理
print(f"\n数据预处理:")

# 标准化特征
X_mean = np.mean(X, axis=0)
X_std = np.std(X, axis=0)
X_normalized = (X - X_mean) / X_std

print(f"标准化后特征均值: {np.mean(X_normalized, axis=0)[:5]}")
print(f"标准化后特征标准差: {np.std(X_normalized, axis=0)[:5]}")

# 数据分割
print(f"\n数据分割:")
indices = np.random.permutation(n_samples)
train_size = int(0.7 * n_samples)
val_size = int(0.2 * n_samples)

train_indices = indices[:train_size]
val_indices = indices[train_size:train_size + val_size]
test_indices = indices[train_size + val_size:]

X_train = X_normalized[train_indices]
y_train = y[train_indices]
weights_train = sample_weights[train_indices]

X_val = X_normalized[val_indices]
y_val = y[val_indices]

X_test = X_normalized[test_indices]
y_test = y[test_indices]

print(f"训练集: {X_train.shape}")
print(f"验证集: {X_val.shape}")
print(f"测试集: {X_test.shape}")

# 保存数据集
print(f"\n保存数据集:")
dataset = {
    'X_train': X_train,
    'y_train': y_train,
    'weights_train': weights_train,
    'X_val': X_val,
    'y_val': y_val,
    'X_test': X_test,
    'y_test': y_test,
    'feature_names': [f'feature_{i}' for i in range(n_features)],
    'class_names': [f'class_{i}' for i in range(n_classes)],
    # 预处理参数保存为独立数组,避免嵌套dict在npz中被存成需要pickle的object数组
    'preprocess_mean': X_mean,
    'preprocess_std': X_std
}

np.savez_compressed('ml_dataset.npz', **dataset)

# 保存数据集信息
dataset_info = {
    'name': 'Synthetic Classification Dataset',
    'version': '1.0',
    'created_date': '2023-12-01',
    'n_samples': int(n_samples),
    'n_features': int(n_features),
    'n_classes': int(n_classes),
    'train_size': int(train_size),
    'val_size': int(val_size),
    'test_size': int(len(test_indices)),
    'preprocessing': 'StandardScaler',
    'class_distribution': {f'class_{i}': int(count) for i, count in enumerate(np.bincount(y))}
}

with open('dataset_info.json', 'w') as f:
    json.dump(dataset_info, f, indent=2)

print("✅ 机器学习数据集已保存")

# 数据加载验证
print(f"\n数据加载验证:")
with np.load('ml_dataset.npz') as data:
    print(f"数据集包含的数组: {list(data.keys())}")
    loaded_X_train = data['X_train']
    loaded_y_train = data['y_train']
    
print(f"加载的训练集形状: {loaded_X_train.shape}")
print(f"数据完整性验证: {np.array_equal(X_train, loaded_X_train)}")

# 文件大小分析
file_size = os.path.getsize('ml_dataset.npz')
print(f"数据集文件大小: {file_size / 1024 / 1024:.2f} MB")

8. 本章小结

8.1 核心知识点

  1. NumPy原生格式

    • .npy: 单数组存储,高效二进制格式
    • .npz: 多数组存储,支持压缩
    • 内存映射: 大文件的高效访问
  2. 文本文件处理

    • savetxt/loadtxt: 基本文本文件操作
    • genfromtxt: 复杂文本文件处理
    • CSV格式的高级处理
  3. 二进制文件

    • tofile/fromfile: 原始二进制格式
    • 自定义二进制格式设计
    • 跨平台兼容性考虑
  4. 数据格式转换

    • 与Pandas的互操作
    • JSON、XML、HDF5等格式
    • 数据类型转换和验证
  5. 大数据处理

    • 分块处理策略
    • 流式处理技术
    • 并行处理优化

8.2 最佳实践

  • 🚀 选择合适的格式: 根据数据特点选择最优存储格式
  • 💾 内存效率: 使用内存映射处理大文件
  • 🔄 数据完整性: 始终验证保存和加载的数据(示例见下方)
  • 📊 元数据管理: 保存数据的描述信息和处理参数
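
针对上面"数据完整性"一条,下面给出一个最小化的校验示意:保存时为文件生成SHA-256校验和,加载前先验证(save_with_checksum / load_with_checksum 为示例用的假设函数名,假设路径以 .npy 结尾):

import hashlib

def save_with_checksum(path, arr):
    np.save(path, arr)
    with open(path, 'rb') as f:
        digest = hashlib.sha256(f.read()).hexdigest()
    with open(path + '.sha256', 'w') as f:
        f.write(digest)

def load_with_checksum(path):
    with open(path, 'rb') as f:
        digest = hashlib.sha256(f.read()).hexdigest()
    with open(path + '.sha256') as f:
        expected = f.read().strip()
    if digest != expected:
        raise IOError(f"校验失败: {path} 可能已损坏")
    return np.load(path)

# 用法示意
save_with_checksum('checked_data.npy', np.arange(10))
print(load_with_checksum('checked_data.npy'))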

8.3 常见陷阱

  • 数据类型丢失: 文本格式可能丢失精度
  • 平台兼容性: 二进制格式的字节序问题(示例见下方)
  • 内存溢出: 大文件的不当加载
  • 文件损坏: 缺乏数据完整性检查
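
针对"平台兼容性"一条,下面是一个显式指定字节序的简短示意,写入和读取都固定使用小端格式,避免在不同字节序的机器之间交换文件时出错:

# 显式指定字节序,保证二进制文件跨平台可读
arr = np.arange(5, dtype=np.float64)
arr.astype('<f8').tofile('little_endian.bin')         # 统一按小端写出
restored = np.fromfile('little_endian.bin', dtype='<f8')  # 读取时同样显式指定小端
print(f"数据相等: {np.array_equal(arr, restored)}")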

8.4 下一步学习

  • ⚡ 学习性能优化技巧
  • 🔧 掌握与其他库的集成
  • 📚 深入了解数据库连接
  • 🌐 学习分布式数据处理

9. 练习题

9.1 基础练习

  1. 文件格式比较

    • 比较不同格式的存储效率
    • 测试加载速度差异
    • 分析压缩效果
  2. 数据完整性

    • 实现数据校验机制
    • 处理文件损坏情况
    • 设计备份策略
  3. 格式转换

    • 实现多种格式间的转换
    • 处理数据类型转换
    • 保持元数据信息

9.2 进阶练习

  1. 大数据处理

    • 设计分块处理算法
    • 实现流式数据处理
    • 优化内存使用
  2. 自定义格式

    • 设计专用数据格式
    • 实现版本兼容性
    • 添加压缩功能
  3. 数据管道

    • 构建完整的数据处理管道
    • 实现错误处理和恢复
    • 添加进度监控

9.3 挑战练习

  1. 分布式存储

    • 实现数据分片存储
    • 设计负载均衡策略
    • 处理节点故障
  2. 实时数据流

    • 处理实时数据流
    • 实现增量更新
    • 优化延迟和吞吐量
  3. 数据库集成

    • 与关系数据库集成
    • 实现NoSQL数据库连接
    • 设计缓存策略

恭喜您完成第7章的学习! 🎉

您已经掌握了NumPy的文件IO和数据持久化技术,这些技能对于实际项目中的数据管理至关重要。在下一章中,我们将学习性能优化和高级技巧。

👉 继续学习第8章:性能优化与高级技巧