学习目标

通过本章学习,你将掌握:

  • 文件的基本读写操作
  • 文件路径处理和目录操作
  • CSV和JSON文件处理
  • 二进制文件操作
  • 网络I/O和异步I/O
  • 文件系统监控和高级I/O技术

9.1 文件基本操作

文件的打开和关闭

import os
import tempfile
from pathlib import Path

# 基本文件操作
def basic_file_operations():
    """基本文件操作示例"""
    
    print("=== 基本文件操作 ===")
    
    # 1. 创建临时文件用于演示
    temp_dir = tempfile.mkdtemp()
    file_path = os.path.join(temp_dir, "example.txt")
    
    print(f"临时文件路径: {file_path}")
    
    # 2. 写入文件 - 基本方式
    print("\n--- 写入文件 ---")
    
    # 方式1: 手动关闭文件
    file_handle = open(file_path, 'w', encoding='utf-8')
    file_handle.write("Hello, World!\n")
    file_handle.write("这是第二行\n")
    file_handle.close()
    print("文件写入完成(手动关闭)")
    
    # 方式2: 使用with语句(推荐)
    with open(file_path, 'a', encoding='utf-8') as f:
        f.write("这是追加的内容\n")
        f.write("支持中文字符\n")
    print("文件追加完成(自动关闭)")
    
    # 3. 读取文件
    print("\n--- 读取文件 ---")
    
    # 读取整个文件
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
        print("文件全部内容:")
        print(content)
    
    # 按行读取
    with open(file_path, 'r', encoding='utf-8') as f:
        print("逐行读取:")
        for line_num, line in enumerate(f, 1):
            print(f"第{line_num}行: {line.rstrip()}")
    
    # 读取所有行到列表
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
        print(f"\n总共 {len(lines)} 行")
        print(f"第一行: {lines[0].rstrip()}")
        print(f"最后一行: {lines[-1].rstrip()}")
    
    # 4. 文件模式详解
    print("\n--- 文件模式 ---")
    
    modes = {
        'r': '只读模式(默认)',
        'w': '写入模式(覆盖)',
        'a': '追加模式',
        'x': '独占创建模式',
        'r+': '读写模式',
        'w+': '读写模式(覆盖)',
        'a+': '读写模式(追加)',
        'rb': '二进制只读',
        'wb': '二进制写入',
        'ab': '二进制追加'
    }
    
    for mode, description in modes.items():
        print(f"{mode:3s}: {description}")
    
    # 5. 文件属性和状态
    print("\n--- 文件属性 ---")
    
    with open(file_path, 'r', encoding='utf-8') as f:
        print(f"文件名: {f.name}")
        print(f"文件模式: {f.mode}")
        print(f"编码: {f.encoding}")
        print(f"是否关闭: {f.closed}")
        print(f"是否可读: {f.readable()}")
        print(f"是否可写: {f.writable()}")
        print(f"是否可定位: {f.seekable()}")
    
    # 6. 文件定位操作
    print("\n--- 文件定位 ---")
    
    with open(file_path, 'r', encoding='utf-8') as f:
        print(f"初始位置: {f.tell()}")
        
        # 读取前10个字符
        content = f.read(10)
        print(f"读取内容: {repr(content)}")
        print(f"当前位置: {f.tell()}")
        
        # 回到文件开头
        f.seek(0)
        print(f"重置后位置: {f.tell()}")
        
        # 移动到文件末尾
        f.seek(0, 2)  # 2表示从文件末尾开始
        print(f"文件末尾位置: {f.tell()}")
    
    # 清理临时文件
    os.remove(file_path)
    os.rmdir(temp_dir)
    print("\n临时文件已清理")

basic_file_operations()
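上表中的 'x'(独占创建)模式平时用得少,但在避免意外覆盖已有文件时很有用。下面是一个最小示意(文件名是为演示假设的):

import os

def exclusive_create_demo(path='exclusive_demo.txt'):
    """演示 'x' 模式:仅当文件不存在时才创建,否则抛出 FileExistsError"""
    try:
        with open(path, 'x', encoding='utf-8') as f:
            f.write('首次创建成功\n')
        print(f"创建成功: {path}")
    except FileExistsError:
        print(f"文件已存在,未覆盖: {path}")
    finally:
        if os.path.exists(path):
            os.remove(path)  # 清理演示文件

exclusive_create_demo()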

# 文件错误处理
def file_error_handling():
    """文件操作错误处理"""
    
    print("\n=== 文件错误处理 ===")
    
    def safe_file_operation(file_path, operation='read'):
        """安全的文件操作"""
        try:
            if operation == 'read':
                with open(file_path, 'r', encoding='utf-8') as f:
                    return f.read()
            
            elif operation == 'write':
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write("测试内容")
                    return "写入成功"
            
            elif operation == 'append':
                with open(file_path, 'a', encoding='utf-8') as f:
                    f.write("追加内容\n")
                    return "追加成功"
        
        except FileNotFoundError:
            return f"错误: 文件不存在 - {file_path}"
        
        except PermissionError:
            return f"错误: 没有权限访问文件 - {file_path}"
        
        except UnicodeDecodeError:
            return f"错误: 文件编码错误 - {file_path}"
        
        except UnicodeEncodeError:
            return f"错误: 编码写入错误 - {file_path}"
        
        except OSError as e:
            return f"错误: 操作系统错误 - {e}"
        
        except Exception as e:
            return f"错误: 未知错误 - {e}"
    
    # 测试各种错误情况
    test_cases = [
        ('nonexistent.txt', 'read'),      # 文件不存在
        ('temp_test.txt', 'write'),       # 正常写入
        ('temp_test.txt', 'read'),        # 正常读取
        ('temp_test.txt', 'append'),      # 正常追加
    ]
    
    for file_path, operation in test_cases:
        result = safe_file_operation(file_path, operation)
        print(f"{operation:6s} {file_path:15s}: {result}")
    
    # 清理测试文件
    try:
        os.remove('temp_test.txt')
    except FileNotFoundError:
        pass

file_error_handling()
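顺带一提,上面"删除可能不存在的文件"的 try/except 写法,也可以用标准库的 contextlib.suppress 更简洁地表达,两者效果等价(示意):

import os
import contextlib

# 与 try/except FileNotFoundError: pass 等价
with contextlib.suppress(FileNotFoundError):
    os.remove('temp_test.txt')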

# 大文件处理
def large_file_handling():
    """大文件处理技术"""
    
    print("\n=== 大文件处理 ===")
    
    # 创建一个较大的测试文件
    large_file = 'large_test.txt'
    
    print("创建大文件...")
    with open(large_file, 'w', encoding='utf-8') as f:
        for i in range(10000):
            f.write(f"这是第 {i+1:05d} 行,包含一些测试数据和中文字符。\n")
    
    file_size = os.path.getsize(large_file)
    print(f"文件大小: {file_size:,} 字节 ({file_size/1024:.1f} KB)")
    
    # 1. 逐行处理(内存友好)
    print("\n--- 逐行处理 ---")
    
    line_count = 0
    word_count = 0
    
    with open(large_file, 'r', encoding='utf-8') as f:
        for line in f:
            line_count += 1
            word_count += len(line.split())
            
            # 只显示前几行和最后几行
            if line_count <= 3 or line_count > 9997:
                print(f"行 {line_count}: {line.rstrip()}")
            elif line_count == 4:
                print("... (省略中间行) ...")
    
    print(f"\n总行数: {line_count:,}")
    print(f"总词数: {word_count:,}")
    
    # 2. 分块读取
    print("\n--- 分块读取 ---")
    
    chunk_size = 1024  # 1KB chunks
    chunk_count = 0
    total_chars = 0
    
    with open(large_file, 'r', encoding='utf-8') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            
            chunk_count += 1
            total_chars += len(chunk)
            
            if chunk_count <= 3:
                print(f"块 {chunk_count}: {len(chunk)} 字符")
                print(f"  内容预览: {repr(chunk[:50])}...")
    
    print(f"\n总块数: {chunk_count}")
    print(f"总字符数: {total_chars:,}")
    
    # 3. 内存映射(适用于非常大的文件)
    print("\n--- 内存映射 ---")
    
    import mmap
    
    with open(large_file, 'rb') as f:  # mmap直接操作字节,用二进制模式打开更稳妥
        # 注意:mmap在Windows上可能需要特殊处理
        try:
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                # 查找特定内容
                search_text = b'00100'
                pos = mm.find(search_text)
                if pos != -1:
                    print(f"找到 '{search_text.decode()}' 在位置: {pos}")
                    # 读取周围的内容
                    start = max(0, pos - 20)
                    end = min(len(mm), pos + 50)
                    context = mm[start:end].decode('utf-8', errors='ignore')
                    print(f"上下文: {repr(context)}")
                else:
                    print(f"未找到 '{search_text.decode()}'")
        
        except Exception as e:
            print(f"内存映射失败: {e}")
    
    # 4. 文件迭代器(生成器方式)
    print("\n--- 生成器方式处理 ---")
    
    def file_lines_generator(filename, encoding='utf-8'):
        """文件行生成器"""
        with open(filename, 'r', encoding=encoding) as f:
            for line_num, line in enumerate(f, 1):
                yield line_num, line.rstrip()
    
    def file_chunks_generator(filename, chunk_size=1024, encoding='utf-8'):
        """文件块生成器"""
        with open(filename, 'r', encoding=encoding) as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                yield chunk
    
    # 使用生成器处理文件
    print("使用行生成器:")
    line_gen = file_lines_generator(large_file)
    for i, (line_num, line) in enumerate(line_gen):
        if i < 3:  # 只显示前3行
            print(f"  行 {line_num}: {line}")
        else:
            break
    
    print("\n使用块生成器:")
    chunk_gen = file_chunks_generator(large_file, 512)
    for i, chunk in enumerate(chunk_gen):
        if i < 2:  # 只显示前2块
            print(f"  块 {i+1}: {len(chunk)} 字符")
        else:
            break
    
    # 清理测试文件
    os.remove(large_file)
    print("\n大文件已清理")

large_file_handling()
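逐行遍历已经足够内存友好;如果只需要统计行数,按二进制块计数换行符通常更快,因为省去了逐行解码的开销。下面是一个示意实现(函数名为本例自拟):

def count_lines_fast(filename, chunk_size=1024 * 1024):
    """按1MB的二进制块统计换行符个数,即文件行数"""
    count = 0
    with open(filename, 'rb') as f:
        while chunk := f.read(chunk_size):
            count += chunk.count(b'\n')
    return count

# 用法: print(count_lines_fast('large_test.txt'))  # 需要文件存在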

文件路径处理

import os
import glob
from pathlib import Path
import shutil
import tempfile

def path_operations():
    """路径操作示例"""
    
    print("=== 路径操作 ===")
    
    # 1. os.path 模块
    print("\n--- os.path 模块 ---")
    
    # 路径拼接
    path1 = os.path.join('home', 'user', 'documents', 'file.txt')
    print(f"路径拼接: {path1}")
    
    # 路径分解
    sample_path = '/home/user/documents/report.pdf'
    print(f"\n原路径: {sample_path}")
    print(f"目录名: {os.path.dirname(sample_path)}")
    print(f"文件名: {os.path.basename(sample_path)}")
    print(f"文件名和扩展名: {os.path.splitext(sample_path)}")
    print(f"路径分割: {os.path.split(sample_path)}")
    
    # 路径信息
    current_dir = os.getcwd()
    print(f"\n当前目录: {current_dir}")
    print(f"绝对路径: {os.path.abspath('.')}")
    print(f"规范路径: {os.path.normpath('/home/user/../user/./documents')}")
    
    # 路径检查
    test_paths = [current_dir, 'nonexistent.txt', __file__]
    for path in test_paths:
        print(f"\n路径: {path}")
        print(f"  存在: {os.path.exists(path)}")
        print(f"  是文件: {os.path.isfile(path)}")
        print(f"  是目录: {os.path.isdir(path)}")
        print(f"  是绝对路径: {os.path.isabs(path)}")
        
        if os.path.exists(path):
            stat = os.stat(path)
            print(f"  大小: {stat.st_size} 字节")
            print(f"  修改时间: {stat.st_mtime}")
    
    # 2. pathlib 模块(推荐)
    print("\n--- pathlib 模块 ---")
    
    # 创建路径对象
    p = Path('home') / 'user' / 'documents' / 'file.txt'
    print(f"Path对象: {p}")
    print(f"类型: {type(p)}")
    
    # 路径属性
    sample_path = Path('/home/user/documents/report.pdf')
    print(f"\n路径: {sample_path}")
    print(f"父目录: {sample_path.parent}")
    print(f"所有父目录: {list(sample_path.parents)}")
    print(f"文件名: {sample_path.name}")
    print(f"文件主名: {sample_path.stem}")
    print(f"文件扩展名: {sample_path.suffix}")
    print(f"所有扩展名: {sample_path.suffixes}")
    print(f"锚点: {sample_path.anchor}")
    print(f"路径部分: {sample_path.parts}")
    
    # 路径操作
    current_path = Path.cwd()
    print(f"\n当前目录: {current_path}")
    print(f"主目录: {Path.home()}")
    print(f"绝对路径: {Path('file.txt').resolve()}")
    
    # 路径匹配
    print(f"\n路径匹配:")
    test_path = Path('documents/report.pdf')
    patterns = ['*.pdf', '**/*.pdf', 'documents/*', 'reports/*']
    for pattern in patterns:
        print(f"  {test_path} 匹配 '{pattern}': {test_path.match(pattern)}")
    
    # 文件查找与目录操作的演示见后文"文件查找和目录操作"一节

path_operations()

9.3 二进制文件和高级I/O

二进制文件操作

import struct
import pickle
import hashlib

def binary_file_operations():
    """二进制文件操作示例"""

    print("=== 二进制文件操作 ===")

    # 创建测试目录
    test_dir = Path(tempfile.mkdtemp())
    print(f"测试目录: {test_dir}")

    # 1. 基本二进制读写
    print("\n--- 基本二进制读写 ---")
    
    binary_file = test_dir / 'binary_data.bin'
    
    # 写入二进制数据
    data = b'\x00\x01\x02\x03\x04\x05\xFF\xFE\xFD'
    with open(binary_file, 'wb') as f:
        f.write(data)
    
    print(f"写入二进制数据: {len(data)} 字节")
    print(f"数据内容: {data.hex()}")
    
    # 读取二进制数据
    with open(binary_file, 'rb') as f:
        read_data = f.read()
    
    print(f"读取二进制数据: {len(read_data)} 字节")
    print(f"数据内容: {read_data.hex()}")
    print(f"数据相同: {data == read_data}")
    
    # 2. struct模块 - 结构化二进制数据
    print("\n--- struct模块 ---")
    
    # 定义数据结构
    # 格式: i=int(4字节), f=float(4字节), 10s=10字节字符串
    format_string = 'if10s'
    
    # 打包数据
    packed_data = struct.pack(format_string, 42, 3.14159, b'Hello')
    print(f"打包数据大小: {len(packed_data)} 字节")
    print(f"打包数据: {packed_data.hex()}")
    
    # 解包数据
    unpacked_data = struct.unpack(format_string, packed_data)
    print(f"解包数据: {unpacked_data}")
    print(f"整数: {unpacked_data[0]}")
    print(f"浮点数: {unpacked_data[1]}")
    print(f"字符串: {unpacked_data[2].rstrip(b'\x00')}")
    
    # 复杂数据结构
    struct_file = test_dir / 'structured_data.bin'
    
    # 员工记录结构: ID(int), 薪资(float), 姓名(20字节), 年龄(short)
    employee_format = 'if20sH'
    
    employees = [
        (1, 8000.0, '张三'.encode('utf-8'), 28),
        (2, 6500.0, '李四'.encode('utf-8'), 32),
        (3, 7200.0, '王五'.encode('utf-8'), 25)
    ]
    
    # 写入结构化数据
    with open(struct_file, 'wb') as f:
        for emp in employees:
            packed = struct.pack(employee_format, *emp)
            f.write(packed)
    
    print(f"\n写入 {len(employees)} 个员工记录")
    
    # 读取结构化数据
    record_size = struct.calcsize(employee_format)
    print(f"每条记录大小: {record_size} 字节")
    
    with open(struct_file, 'rb') as f:
        print("读取员工记录:")
        while True:
            data = f.read(record_size)
            if not data:
                break
            
            emp_id, salary, name_bytes, age = struct.unpack(employee_format, data)
            name = name_bytes.rstrip(b'\x00').decode('utf-8')
            print(f"  ID: {emp_id}, 姓名: {name}, 薪资: {salary}, 年龄: {age}")
    
    # 3. pickle模块 - Python对象序列化
    print("\n--- pickle序列化 ---")
    
    # 复杂Python对象
    class Employee:
        def __init__(self, name, age, skills):
            self.name = name
            self.age = age
            self.skills = skills
            self.hire_date = '2023-01-01'
        
        def __repr__(self):
            return f"Employee('{self.name}', {self.age}, {self.skills})"
    
    # 创建对象
    employees_obj = [
        Employee('张三', 28, ['Python', 'Django']),
        Employee('李四', 32, ['Java', 'Spring']),
        {'department': '技术部', 'budget': 500000}
    ]
    
    pickle_file = test_dir / 'employees.pickle'
    
    # 序列化到文件
    with open(pickle_file, 'wb') as f:
        pickle.dump(employees_obj, f)
    
    print(f"序列化对象到文件: {pickle_file.name}")
    print(f"文件大小: {pickle_file.stat().st_size} 字节")
    
    # 从文件反序列化
    with open(pickle_file, 'rb') as f:
        loaded_employees = pickle.load(f)
    
    print("反序列化对象:")
    for obj in loaded_employees:
        print(f"  {obj}")
    
    # pickle字符串操作
    data_dict = {'key1': 'value1', 'key2': [1, 2, 3], 'key3': {'nested': True}}
    
    # 序列化到字节串
    pickled_bytes = pickle.dumps(data_dict)
    print(f"\n序列化字节串长度: {len(pickled_bytes)}")
    
    # 从字节串反序列化
    unpickled_dict = pickle.loads(pickled_bytes)
    print(f"反序列化结果: {unpickled_dict}")
    
    # 4. 文件哈希计算
    print("\n--- 文件哈希计算 ---")
    
    def calculate_file_hash(filename, algorithm='sha256'):
        """计算文件哈希值"""
        hash_obj = hashlib.new(algorithm)
        
        with open(filename, 'rb') as f:
            # 分块读取,适用于大文件
            while chunk := f.read(8192):
                hash_obj.update(chunk)
        
        return hash_obj.hexdigest()
    
    # 计算不同文件的哈希
    files_to_hash = [binary_file, struct_file, pickle_file]
    
    for file_path in files_to_hash:
        if file_path.exists():
            sha256_hash = calculate_file_hash(file_path, 'sha256')
            md5_hash = calculate_file_hash(file_path, 'md5')
            print(f"文件: {file_path.name}")
            print(f"  SHA256: {sha256_hash}")
            print(f"  MD5: {md5_hash}")
    
    # 5. 文件分割和合并
    print("\n--- 文件分割和合并 ---")
    
    def split_file(filename, chunk_size=1024):
        """分割文件"""
        file_path = Path(filename)
        base_name = file_path.stem
        extension = file_path.suffix
        
        chunk_files = []
        
        with open(file_path, 'rb') as f:
            chunk_num = 0
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                
                chunk_filename = file_path.parent / f"{base_name}.part{chunk_num:03d}{extension}"
                with open(chunk_filename, 'wb') as chunk_file:
                    chunk_file.write(chunk)
                
                chunk_files.append(chunk_filename)
                chunk_num += 1
        
        return chunk_files
    
    def merge_files(chunk_files, output_filename):
        """合并文件"""
        with open(output_filename, 'wb') as output_file:
            for chunk_file in sorted(chunk_files):
                with open(chunk_file, 'rb') as f:
                    output_file.write(f.read())
    
    # 创建测试文件
    large_binary_file = test_dir / 'large_binary.bin'
    test_data = b'A' * 5000 + b'B' * 3000 + b'C' * 2000  # 10KB数据
    
    with open(large_binary_file, 'wb') as f:
        f.write(test_data)
    
    print(f"创建测试文件: {large_binary_file.name} ({len(test_data)} 字节)")
    
    # 分割文件
    chunk_files = split_file(large_binary_file, 2048)  # 2KB chunks
    print(f"文件分割为 {len(chunk_files)} 个块")
    
    for chunk_file in chunk_files:
        size = chunk_file.stat().st_size
        print(f"  {chunk_file.name}: {size} 字节")
    
    # 合并文件
    merged_file = test_dir / 'merged_binary.bin'
    merge_files(chunk_files, merged_file)
    
    # 验证合并结果
    with open(merged_file, 'rb') as f:
        merged_data = f.read()
    
    print(f"\n合并文件: {merged_file.name} ({len(merged_data)} 字节)")
    print(f"数据完整性: {test_data == merged_data}")
    
    # 清理分割文件
    for chunk_file in chunk_files:
        chunk_file.unlink()
    
    # 6. 内存映射文件
    print("\n--- 内存映射文件 ---")
    
    import mmap
    
    # 创建大文件用于内存映射
    mmap_file = test_dir / 'mmap_test.bin'
    data_size = 1024 * 1024  # 1MB
    
    # 创建文件
    with open(mmap_file, 'wb') as f:
        f.write(b'\x00' * data_size)
    
    print(f"创建内存映射文件: {mmap_file.name} ({data_size} 字节)")
    
    try:
        # 内存映射读写
        with open(mmap_file, 'r+b') as f:
            with mmap.mmap(f.fileno(), 0) as mm:
                print(f"内存映射大小: {len(mm)} 字节")
                
                # 写入数据
                mm[0:10] = b'Hello Mmap'
                mm[100:110] = b'Test Data!'
                
                # 读取数据
                print(f"位置0-10: {mm[0:10]}")
                print(f"位置100-110: {mm[100:110]}")
                
                # 查找数据
                pos = mm.find(b'Test')
                if pos != -1:
                    print(f"找到'Test'在位置: {pos}")
                
                # 刷新到磁盘
                mm.flush()
    
    except Exception as e:
        print(f"内存映射操作失败: {e}")
    
    # 清理测试文件
    import shutil
    shutil.rmtree(test_dir)
    print(f"\n测试目录已清理")

binary_file_operations()
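struct 的格式串还支持字节序前缀:'<' 表示小端、'>' 表示大端、'!' 表示网络字节序(大端)。跨平台交换数据或解析网络协议、文件格式时应显式指定字节序。一个小验证:

import struct

value = 0x12345678
print(struct.pack('<I', value).hex())  # 小端: 78563412
print(struct.pack('>I', value).hex())  # 大端: 12345678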

# 异步I/O操作
def async_io_operations():
    """异步I/O操作示例"""
    
    print("\n=== 异步I/O操作 ===")
    
    import asyncio
    import time
    # aiofiles为第三方库,放到下方的try块中按需导入,避免未安装时直接崩溃
    
    async def async_file_demo():
        """异步文件操作演示"""
        
        # 创建测试目录
        test_dir = Path(tempfile.mkdtemp())
        print(f"异步测试目录: {test_dir}")
        
        # 1. 异步文件写入
        print("\n--- 异步文件写入 ---")
        
        async def write_file_async(filename, content):
            """异步写入文件"""
            async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
                await f.write(content)
            return filename
        
        # 并发写入多个文件
        tasks = []
        for i in range(5):
            filename = test_dir / f'async_file_{i}.txt'
            content = f'这是异步文件 {i}\n' * 100
            tasks.append(write_file_async(filename, content))
        
        start_time = time.time()
        results = await asyncio.gather(*tasks)
        end_time = time.time()
        
        print(f"并发写入 {len(results)} 个文件")
        print(f"耗时: {end_time - start_time:.3f} 秒")
        
        # 2. 异步文件读取
        print("\n--- 异步文件读取 ---")
        
        async def read_file_async(filename):
            """异步读取文件"""
            async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
                content = await f.read()
            return len(content)
        
        # 并发读取文件
        read_tasks = [read_file_async(result) for result in results]
        
        start_time = time.time()
        file_sizes = await asyncio.gather(*read_tasks)
        end_time = time.time()
        
        print(f"并发读取 {len(file_sizes)} 个文件")
        print(f"文件大小: {file_sizes}")
        print(f"耗时: {end_time - start_time:.3f} 秒")
        
        # 3. 异步文件处理
        print("\n--- 异步文件处理 ---")
        
        async def process_file_async(filename):
            """异步处理文件"""
            async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
                lines = await f.readlines()
            
            # 模拟处理时间
            await asyncio.sleep(0.1)
            
            # 统计信息
            line_count = len(lines)
            word_count = sum(len(line.split()) for line in lines)
            char_count = sum(len(line) for line in lines)
            
            return {
                'filename': filename.name,
                'lines': line_count,
                'words': word_count,
                'chars': char_count
            }
        
        # 并发处理文件
        process_tasks = [process_file_async(Path(result)) for result in results]
        
        start_time = time.time()
        stats = await asyncio.gather(*process_tasks)
        end_time = time.time()
        
        print(f"并发处理 {len(stats)} 个文件")
        for stat in stats:
            print(f"  {stat['filename']}: {stat['lines']}行, {stat['words']}词, {stat['chars']}字符")
        print(f"耗时: {end_time - start_time:.3f} 秒")
        
        # 清理测试文件
        import shutil
        shutil.rmtree(test_dir)
        print(f"\n异步测试目录已清理")
    
    # 运行异步演示
    try:
        # 检查是否安装了aiofiles
        import aiofiles
        asyncio.run(async_file_demo())
    except ImportError:
        print("需要安装aiofiles: pip install aiofiles")
        print("跳过异步I/O演示")
    except Exception as e:
        print(f"异步I/O演示失败: {e}")

async_io_operations()
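如果不想依赖 aiofiles,Python 3.9+ 的 asyncio.to_thread 也能把阻塞的文件读写放进线程池,从而在协程中并发执行。下面是一个示意(a.txt、b.txt 为假设的文件名):

import asyncio

def read_sync(path):
    """普通的阻塞式读取"""
    with open(path, 'r', encoding='utf-8') as f:
        return f.read()

async def read_many():
    # 两个阻塞读取在线程池中并发进行
    contents = await asyncio.gather(
        asyncio.to_thread(read_sync, 'a.txt'),
        asyncio.to_thread(read_sync, 'b.txt'),
    )
    return [len(c) for c in contents]

# asyncio.run(read_many())  # 需要a.txt、b.txt存在才能运行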

运行二进制文件操作示例:

```bash
python binary_io.py
```

9.4 网络I/O和文件监控

网络文件操作

import urllib.request
import urllib.parse
import urllib.error
try:
    import requests
except ImportError:  # requests为第三方库,可能未安装
    requests = None
import tempfile
from pathlib import Path
import time

def network_io_operations():
    """网络I/O操作示例"""
    
    print("=== 网络I/O操作 ===")
    
    # 创建测试目录
    test_dir = Path(tempfile.mkdtemp())
    print(f"测试目录: {test_dir}")
    
    # 1. urllib下载文件
    print("\n--- urllib下载文件 ---")
    
    def download_with_urllib(url, filename):
        """使用urllib下载文件"""
        try:
            print(f"下载: {url}")
            
            # 创建请求对象
            req = urllib.request.Request(url)
            req.add_header('User-Agent', 'Python/urllib')
            
            # 下载文件
            with urllib.request.urlopen(req) as response:
                # 获取文件信息
                content_length = response.headers.get('Content-Length')
                content_type = response.headers.get('Content-Type')
                
                print(f"  内容类型: {content_type}")
                print(f"  文件大小: {content_length} 字节")
                
                # 保存文件
                with open(filename, 'wb') as f:
                    while True:
                        chunk = response.read(8192)
                        if not chunk:
                            break
                        f.write(chunk)
                
                return True
        
        except urllib.error.URLError as e:
            print(f"  下载失败: {e}")
            return False
        except Exception as e:
            print(f"  错误: {e}")
            return False
    
    # 测试下载(使用一个小的测试文件)
    test_url = "https://httpbin.org/json"
    urllib_file = test_dir / "urllib_download.json"
    
    if download_with_urllib(test_url, urllib_file):
        print(f"  下载成功: {urllib_file.name}")
        print(f"  文件大小: {urllib_file.stat().st_size} 字节")
    
    # 2. requests下载文件(推荐)
    print("\n--- requests下载文件 ---")
    
    def download_with_requests(url, filename, chunk_size=8192):
        """使用requests下载文件"""
        try:
            print(f"下载: {url}")
            
            # 发送请求
            response = requests.get(url, stream=True)
            response.raise_for_status()
            
            # 获取文件信息
            content_length = response.headers.get('Content-Length')
            content_type = response.headers.get('Content-Type')
            
            print(f"  状态码: {response.status_code}")
            print(f"  内容类型: {content_type}")
            print(f"  文件大小: {content_length} 字节")
            
            # 下载文件
            downloaded = 0
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:
                        f.write(chunk)
                        downloaded += len(chunk)
                        
                        # 显示进度(如果知道文件大小)
                        if content_length:
                            progress = (downloaded / int(content_length)) * 100
                            print(f"\r  进度: {progress:.1f}%", end='', flush=True)
            
            print()  # 换行
            return True
        
        except requests.RequestException as e:
            print(f"  下载失败: {e}")
            return False
        except Exception as e:
            print(f"  错误: {e}")
            return False
    
    # 测试requests下载
    requests_file = test_dir / "requests_download.json"
    
    if requests is None:
        print("  需要安装requests: pip install requests")
    elif download_with_requests(test_url, requests_file):
        print(f"  下载成功: {requests_file.name}")
        print(f"  文件大小: {requests_file.stat().st_size} 字节")
    
    # 3. 文件上传
    print("\n--- 文件上传 ---")
    
    def upload_file(url, filename, field_name='file'):
        """上传文件"""
        try:
            with open(filename, 'rb') as f:
                files = {field_name: f}
                response = requests.post(url, files=files)
                response.raise_for_status()
                
                print(f"上传成功: {response.status_code}")
                return response.json() if response.headers.get('Content-Type', '').startswith('application/json') else response.text
        
        except requests.RequestException as e:
            print(f"上传失败: {e}")
            return None
        except Exception as e:
            print(f"错误: {e}")
            return None
    
    # 创建测试文件用于上传
    upload_test_file = test_dir / "upload_test.txt"
    upload_test_file.write_text("这是一个测试上传文件\n包含一些中文内容")
    
    # 测试上传(使用httpbin.org的测试端点)
    upload_url = "https://httpbin.org/post"
    
    if requests is not None:
        result = upload_file(upload_url, upload_test_file)
        if result:
            print(f"上传响应: {type(result)}")
    else:
        print("上传测试跳过(未安装requests)")
    
    # 4. FTP操作
    print("\n--- FTP操作 ---")
    
    from ftplib import FTP
    
    def ftp_operations_demo():
        """FTP操作演示"""
        print("FTP操作演示(需要FTP服务器)")
        
        # 这里只演示代码结构,实际需要FTP服务器
        ftp_config = {
            'host': 'ftp.example.com',
            'user': 'username',
            'password': 'password'
        }
        
        print("FTP连接示例代码:")
        print(f"""try:
    ftp = FTP('{ftp_config['host']}')
    ftp.login('{ftp_config['user']}', '{ftp_config['password']}')
    
    # 列出目录
    files = ftp.nlst()
    print(f"FTP文件列表: {{files}}")
    
    # 下载文件
    with open('downloaded_file.txt', 'wb') as f:
        ftp.retrbinary('RETR remote_file.txt', f.write)
    
    # 上传文件
    with open('local_file.txt', 'rb') as f:
        ftp.storbinary('STOR remote_file.txt', f)
    
    ftp.quit()
except Exception as e:
    print(f"FTP操作失败: {{e}}")
""")
    
    ftp_operations_demo()
    
    # 清理测试文件
    import shutil
    shutil.rmtree(test_dir)
    print(f"\n测试目录已清理")

network_io_operations()
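实际项目中下载文件时,建议总是给请求设置超时,否则网络异常会让程序无限挂起。一个带超时的最小示意(假设已安装 requests,URL 与文件名为演示用):

import requests

def download_with_timeout(url, filename, timeout=10):
    """stream=True边下边写,timeout同时限制连接与读取阶段"""
    resp = requests.get(url, stream=True, timeout=timeout)
    resp.raise_for_status()
    with open(filename, 'wb') as f:
        for chunk in resp.iter_content(chunk_size=8192):
            f.write(chunk)

# download_with_timeout('https://httpbin.org/json', 'demo.json')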

# 文件系统监控
def file_system_monitoring():
    """文件系统监控示例"""
    
    print("\n=== 文件系统监控 ===")
    
    # 创建测试目录
    test_dir = Path(tempfile.mkdtemp())
    print(f"监控测试目录: {test_dir}")
    
    # 1. 基本文件监控(轮询方式)
    print("\n--- 基本文件监控 ---")
    
    def simple_file_monitor(directory, interval=1, duration=10):
        """简单的文件监控(轮询方式)"""
        print(f"开始监控目录: {directory}")
        print(f"监控间隔: {interval}秒, 持续时间: {duration}秒")
        
        # 记录初始状态
        initial_files = {}
        if directory.exists():
            for file_path in directory.rglob('*'):
                if file_path.is_file():
                    stat = file_path.stat()
                    initial_files[str(file_path)] = {
                        'size': stat.st_size,
                        'mtime': stat.st_mtime
                    }
        
        print(f"初始文件数: {len(initial_files)}")
        
        start_time = time.time()
        while time.time() - start_time < duration:
            time.sleep(interval)
            
            current_files = {}
            if directory.exists():
                for file_path in directory.rglob('*'):
                    if file_path.is_file():
                        stat = file_path.stat()
                        current_files[str(file_path)] = {
                            'size': stat.st_size,
                            'mtime': stat.st_mtime
                        }
            
            # 检查变化
            # 新文件
            new_files = set(current_files.keys()) - set(initial_files.keys())
            for file_path in new_files:
                print(f"  [新建] {Path(file_path).name}")
            
            # 删除的文件
            deleted_files = set(initial_files.keys()) - set(current_files.keys())
            for file_path in deleted_files:
                print(f"  [删除] {Path(file_path).name}")
            
            # 修改的文件
            for file_path in current_files:
                if file_path in initial_files:
                    current = current_files[file_path]
                    initial = initial_files[file_path]
                    
                    if (current['size'] != initial['size'] or 
                        current['mtime'] != initial['mtime']):
                        print(f"  [修改] {Path(file_path).name}")
            
            # 更新状态
            initial_files = current_files
        
        print("监控结束")
    
    # 2. 使用watchdog库进行高级监控
    print("\n--- 高级文件监控 ---")
    
    try:
        from watchdog.observers import Observer
        from watchdog.events import FileSystemEventHandler
        
        class FileChangeHandler(FileSystemEventHandler):
            """文件变化处理器"""
            
            def on_modified(self, event):
                if not event.is_directory:
                    print(f"  [修改] {Path(event.src_path).name}")
            
            def on_created(self, event):
                if not event.is_directory:
                    print(f"  [创建] {Path(event.src_path).name}")
            
            def on_deleted(self, event):
                if not event.is_directory:
                    print(f"  [删除] {Path(event.src_path).name}")
            
            def on_moved(self, event):
                if not event.is_directory:
                    src_name = Path(event.src_path).name
                    dest_name = Path(event.dest_path).name
                    print(f"  [移动] {src_name} -> {dest_name}")
        
        def advanced_file_monitor(directory, duration=10):
            """高级文件监控"""
            print(f"开始高级监控: {directory}")
            
            event_handler = FileChangeHandler()
            observer = Observer()
            observer.schedule(event_handler, str(directory), recursive=True)
            
            observer.start()
            
            try:
                time.sleep(duration)
            except KeyboardInterrupt:
                pass
            finally:
                observer.stop()
                observer.join()
            
            print("高级监控结束")
        
        # 启动监控(在后台)
        import threading
        
        def run_monitor():
            advanced_file_monitor(test_dir, 5)
        
        monitor_thread = threading.Thread(target=run_monitor)
        monitor_thread.start()
        
        # 模拟文件操作
        print("\n模拟文件操作:")
        time.sleep(1)
        
        # 创建文件
        test_file1 = test_dir / "test1.txt"
        test_file1.write_text("测试文件1")
        print("创建 test1.txt")
        time.sleep(1)
        
        # 修改文件
        test_file1.write_text("修改后的测试文件1")
        print("修改 test1.txt")
        time.sleep(1)
        
        # 创建另一个文件
        test_file2 = test_dir / "test2.txt"
        test_file2.write_text("测试文件2")
        print("创建 test2.txt")
        time.sleep(1)
        
        # 移动文件
        test_file3 = test_dir / "test3.txt"
        test_file2.rename(test_file3)
        print("重命名 test2.txt -> test3.txt")
        time.sleep(1)
        
        # 删除文件
        test_file1.unlink()
        print("删除 test1.txt")
        
        # 等待监控线程结束
        monitor_thread.join()
        
    except ImportError:
        print("需要安装watchdog: pip install watchdog")
        print("使用简单监控代替...")
        
        # 启动简单监控
        import threading
        
        def run_simple_monitor():
            simple_file_monitor(test_dir, 1, 5)
        
        monitor_thread = threading.Thread(target=run_simple_monitor)
        monitor_thread.start()
        
        # 模拟文件操作
        print("\n模拟文件操作:")
        time.sleep(2)
        
        test_file = test_dir / "simple_test.txt"
        test_file.write_text("简单测试")
        print("创建文件")
        
        time.sleep(2)
        test_file.write_text("修改内容")
        print("修改文件")
        
        monitor_thread.join()
    
    # 清理测试目录
    import shutil
    shutil.rmtree(test_dir)
    print(f"\n监控测试目录已清理")

file_system_monitoring()
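watchdog 还提供 PatternMatchingEventHandler,可以按通配符只关注特定文件,省去在回调里手工过滤。假设已安装 watchdog,一个处理器的示意如下:

from watchdog.events import PatternMatchingEventHandler

class TxtChangeHandler(PatternMatchingEventHandler):
    """只响应.txt文件的事件,忽略目录事件"""

    def __init__(self):
        super().__init__(patterns=['*.txt'], ignore_directories=True)

    def on_modified(self, event):
        print(f"txt文件被修改: {event.src_path}")

# 用法与上文相同: observer.schedule(TxtChangeHandler(), path, recursive=True)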

运行网络I/O和文件监控示例:

```bash
python network_monitoring.py
```


文件查找和目录操作

# 3. 文件查找和通配符
print("\n--- 文件查找 ---")

# 创建测试目录结构
test_dir = Path(tempfile.mkdtemp())
print(f"测试目录: {test_dir}")

# 创建测试文件
test_files = [
    'file1.txt', 'file2.txt', 'document.pdf',
    'subdir/file3.txt', 'subdir/image.jpg',
    'subdir/nested/file4.txt'
]

for file_path in test_files:
    full_path = test_dir / file_path
    full_path.parent.mkdir(parents=True, exist_ok=True)
    full_path.write_text(f"内容: {file_path}")

# 使用glob查找文件
print("\nglob查找:")
os.chdir(test_dir)

patterns = ['*.txt', '*.pdf', '**/*.txt', 'subdir/*', '**/file*']
for pattern in patterns:
    matches = glob.glob(pattern, recursive=True)
    print(f"  '{pattern}': {matches}")

# 使用pathlib查找文件
print("\npathlib查找:")

# 查找所有.txt文件
txt_files = list(test_dir.glob('**/*.txt'))
print(f"所有.txt文件: {[str(f.relative_to(test_dir)) for f in txt_files]}")

# 查找特定模式的文件
file_pattern = list(test_dir.glob('**/file*'))
print(f"以'file'开头的文件: {[str(f.relative_to(test_dir)) for f in file_pattern]}")

# 递归查找所有文件
all_files = [f for f in test_dir.rglob('*') if f.is_file()]
print(f"所有文件: {[str(f.relative_to(test_dir)) for f in all_files]}")

# 4. 目录操作
print("\n--- 目录操作 ---")

# 创建目录
new_dir = test_dir / 'new_directory' / 'nested'
new_dir.mkdir(parents=True, exist_ok=True)
print(f"创建目录: {new_dir.relative_to(test_dir)}")

# 列出目录内容
print(f"\n目录内容 ({test_dir}):")
for item in test_dir.iterdir():
    item_type = "目录" if item.is_dir() else "文件"
    size = item.stat().st_size if item.is_file() else "-"
    print(f"  {item_type}: {item.name} ({size} 字节)")

# 递归列出所有内容
print(f"\n递归目录内容:")
for item in test_dir.rglob('*'):
    level = len(item.relative_to(test_dir).parts)
    indent = "  " * level
    item_type = "📁" if item.is_dir() else "📄"
    print(f"{indent}{item_type} {item.name}")

# 目录统计
file_count = sum(1 for f in test_dir.rglob('*') if f.is_file())
dir_count = sum(1 for d in test_dir.rglob('*') if d.is_dir())
total_size = sum(f.stat().st_size for f in test_dir.rglob('*') if f.is_file())

print(f"\n目录统计:")
print(f"  文件数: {file_count}")
print(f"  目录数: {dir_count}")
print(f"  总大小: {total_size} 字节")

# 清理测试目录
os.chdir(Path.cwd().parent)  # 退出测试目录
shutil.rmtree(test_dir)
print(f"\n测试目录已清理")

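在实际的目录遍历任务里,经常需要按扩展名做汇总统计。结合 rglob 和 collections.Counter 可以写得很紧凑(示意实现,函数名为本例自拟):

from collections import Counter
from pathlib import Path

def count_by_suffix(directory):
    """统计目录下各扩展名的文件数量"""
    return Counter(
        p.suffix or '<无扩展名>'
        for p in Path(directory).rglob('*')
        if p.is_file()
    )

# print(count_by_suffix('.'))  # 例如: Counter({'.txt': 4, '.pdf': 1, '.jpg': 1})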

文件和目录管理


print("\n=== 文件和目录管理 ===")

# 创建测试环境
base_dir = Path(tempfile.mkdtemp())
print(f"测试目录: {base_dir}")

# 1. 文件复制和移动
print("\n--- 文件复制和移动 ---")

# 创建源文件
source_file = base_dir / 'source.txt'
source_file.write_text('这是源文件的内容\n包含多行文本\n')

# 复制文件
dest_file = base_dir / 'destination.txt'
shutil.copy2(source_file, dest_file)
print(f"复制文件: {source_file.name} -> {dest_file.name}")

# 验证复制
print(f"源文件大小: {source_file.stat().st_size}")
print(f"目标文件大小: {dest_file.stat().st_size}")
print(f"内容相同: {source_file.read_text() == dest_file.read_text()}")

# 移动文件
moved_file = base_dir / 'moved.txt'
shutil.move(dest_file, moved_file)
print(f"移动文件: {dest_file.name} -> {moved_file.name}")
print(f"原文件存在: {dest_file.exists()}")
print(f"新文件存在: {moved_file.exists()}")

# 2. 目录复制
print("\n--- 目录复制 ---")

# 创建源目录结构
source_dir = base_dir / 'source_dir'
source_dir.mkdir()

(source_dir / 'file1.txt').write_text('文件1内容')
(source_dir / 'file2.txt').write_text('文件2内容')

subdir = source_dir / 'subdir'
subdir.mkdir()
(subdir / 'file3.txt').write_text('文件3内容')

# 复制整个目录
dest_dir = base_dir / 'dest_dir'
shutil.copytree(source_dir, dest_dir)
print(f"复制目录: {source_dir.name} -> {dest_dir.name}")

# 验证目录复制
def count_files(directory):
    return sum(1 for f in directory.rglob('*') if f.is_file())

source_files = count_files(source_dir)
dest_files = count_files(dest_dir)
print(f"源目录文件数: {source_files}")
print(f"目标目录文件数: {dest_files}")

# 3. 文件权限和属性
print("\n--- 文件权限和属性 ---")

test_file = base_dir / 'test_permissions.txt'
test_file.write_text('测试权限的文件')

# 获取文件状态
stat = test_file.stat()
print(f"文件: {test_file.name}")
print(f"大小: {stat.st_size} 字节")
print(f"权限: {oct(stat.st_mode)}")
print(f"创建时间: {stat.st_ctime}")
print(f"修改时间: {stat.st_mtime}")
print(f"访问时间: {stat.st_atime}")

# 修改文件权限(Unix/Linux系统)
try:
    import stat as stat_module
    # 设置为只读
    test_file.chmod(stat_module.S_IREAD)
    print(f"设置为只读: {oct(test_file.stat().st_mode)}")

    # 恢复读写权限
    test_file.chmod(stat_module.S_IREAD | stat_module.S_IWRITE)
    print(f"恢复读写: {oct(test_file.stat().st_mode)}")
except Exception as e:
    print(f"权限修改失败(可能是Windows系统): {e}")

# 4. 文件比较
print("\n--- 文件比较 ---")

import filecmp

# 创建比较文件
file1 = base_dir / 'compare1.txt'
file2 = base_dir / 'compare2.txt'
file3 = base_dir / 'compare3.txt'

content = '相同的内容\n第二行\n'
file1.write_text(content)
file2.write_text(content)
file3.write_text('不同的内容\n')

# 比较文件
print(f"file1 == file2: {filecmp.cmp(file1, file2)}")
print(f"file1 == file3: {filecmp.cmp(file1, file3)}")

# 比较目录
comparison = filecmp.dircmp(source_dir, dest_dir)
print(f"\n目录比较:")
print(f"相同文件: {comparison.same_files}")
print(f"不同文件: {comparison.diff_files}")
print(f"仅在左侧: {comparison.left_only}")
print(f"仅在右侧: {comparison.right_only}")

# 5. 临时文件和目录
print("\n--- 临时文件和目录 ---")

# 临时文件
with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt') as temp_file:
    temp_file.write('临时文件内容')
    temp_path = temp_file.name
    print(f"临时文件: {temp_path}")

# 读取临时文件
with open(temp_path, 'r') as f:
    content = f.read()
    print(f"临时文件内容: {content}")

# 删除临时文件
os.unlink(temp_path)
print(f"临时文件已删除")

# 临时目录
with tempfile.TemporaryDirectory() as temp_dir:
    temp_path = Path(temp_dir)
    print(f"临时目录: {temp_path}")

    # 在临时目录中创建文件
    (temp_path / 'temp_file.txt').write_text('临时目录中的文件')
    print(f"临时目录文件数: {len(list(temp_path.iterdir()))}")

print("临时目录已自动清理")

# 清理测试环境
shutil.rmtree(base_dir)
print(f"\n测试环境已清理")

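copytree 还接受 ignore 参数,配合 shutil.ignore_patterns 可以在复制整个目录时跳过匹配的文件。一个可运行的小示意:

import shutil
import tempfile
from pathlib import Path

src = Path(tempfile.mkdtemp())
(src / 'keep.txt').write_text('保留')
(src / 'skip.pyc').write_text('跳过')

dst = src.parent / (src.name + '_copy')
shutil.copytree(src, dst, ignore=shutil.ignore_patterns('*.pyc'))
print([p.name for p in dst.iterdir()])  # 只剩keep.txt

shutil.rmtree(src)
shutil.rmtree(dst)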


运行文件基本操作示例:
```bash
python file_basics.py
```

9.2 结构化数据文件处理

CSV文件处理

import csv
import io
from pathlib import Path
import tempfile
import os

def csv_operations():
    """CSV文件操作示例"""
    
    print("=== CSV文件操作 ===")
    
    # 创建测试目录
    test_dir = Path(tempfile.mkdtemp())
    print(f"测试目录: {test_dir}")
    
    # 1. 基本CSV写入
    print("\n--- CSV写入 ---")
    
    csv_file = test_dir / 'employees.csv'
    
    # 准备数据
    employees = [
        ['姓名', '年龄', '部门', '薪资'],
        ['张三', 28, '技术部', 8000],
        ['李四', 32, '销售部', 6500],
        ['王五', 25, '技术部', 7200],
        ['赵六', 35, '人事部', 7800],
        ['钱七', 29, '技术部', 8500]
    ]
    
    # 写入CSV文件
    with open(csv_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerows(employees)
    
    print(f"CSV文件已创建: {csv_file.name}")
    print(f"文件大小: {csv_file.stat().st_size} 字节")
    
    # 2. 基本CSV读取
    print("\n--- CSV读取 ---")
    
    with open(csv_file, 'r', encoding='utf-8') as f:
        reader = csv.reader(f)
        print("逐行读取:")
        for row_num, row in enumerate(reader, 1):
            print(f"  第{row_num}行: {row}")
    
    # 3. 字典方式处理CSV
    print("\n--- 字典方式处理 ---")
    
    # 使用DictWriter写入
    dict_csv_file = test_dir / 'employees_dict.csv'
    
    employees_dict = [
        {'姓名': '张三', '年龄': 28, '部门': '技术部', '薪资': 8000},
        {'姓名': '李四', '年龄': 32, '部门': '销售部', '薪资': 6500},
        {'姓名': '王五', '年龄': 25, '部门': '技术部', '薪资': 7200},
        {'姓名': '赵六', '年龄': 35, '部门': '人事部', '薪资': 7800},
        {'姓名': '钱七', '年龄': 29, '部门': '技术部', '薪资': 8500}
    ]
    
    fieldnames = ['姓名', '年龄', '部门', '薪资']
    
    with open(dict_csv_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()  # 写入表头
        writer.writerows(employees_dict)
    
    print(f"字典CSV文件已创建: {dict_csv_file.name}")
    
    # 使用DictReader读取
    with open(dict_csv_file, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        print(f"\n字段名: {reader.fieldnames}")
        print("字典方式读取:")
        for row_num, row in enumerate(reader, 1):
            print(f"  员工{row_num}: {dict(row)}")
    
    # 4. CSV数据分析
    print("\n--- CSV数据分析 ---")
    
    def analyze_csv(filename):
        """分析CSV文件"""
        with open(filename, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            
            # 收集数据
            data = list(reader)
            
            print(f"数据分析 - {filename.name}:")
            print(f"  总记录数: {len(data)}")
            print(f"  字段数: {len(reader.fieldnames)}")
            print(f"  字段名: {reader.fieldnames}")
            
            # 部门统计
            departments = {}
            total_salary = 0
            ages = []
            
            for row in data:
                dept = row['部门']
                departments[dept] = departments.get(dept, 0) + 1
                total_salary += int(row['薪资'])
                ages.append(int(row['年龄']))
            
            print(f"  部门分布: {departments}")
            print(f"  平均薪资: {total_salary / len(data):.0f}")
            print(f"  平均年龄: {sum(ages) / len(ages):.1f}")
            print(f"  年龄范围: {min(ages)} - {max(ages)}")
            
            return data
    
    data = analyze_csv(dict_csv_file)
    
    # 5. CSV数据过滤和处理
    print("\n--- CSV数据过滤 ---")
    
    # 过滤技术部员工
    tech_employees = [emp for emp in data if emp['部门'] == '技术部']
    
    tech_csv_file = test_dir / 'tech_employees.csv'
    with open(tech_csv_file, 'w', newline='', encoding='utf-8') as f:
        if tech_employees:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(tech_employees)
    
    print(f"技术部员工文件: {tech_csv_file.name}")
    print(f"技术部员工数: {len(tech_employees)}")
    
    # 薪资排序
    sorted_employees = sorted(data, key=lambda x: int(x['薪资']), reverse=True)
    
    sorted_csv_file = test_dir / 'employees_sorted.csv'
    with open(sorted_csv_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(sorted_employees)
    
    print(f"薪资排序文件: {sorted_csv_file.name}")
    print("薪资排序(前3名):")
    for i, emp in enumerate(sorted_employees[:3], 1):
        print(f"  {i}. {emp['姓名']}: {emp['薪资']}")
    
    # 6. 处理特殊CSV格式
    print("\n--- 特殊CSV格式 ---")
    
    # 自定义分隔符和引用符
    special_csv = test_dir / 'special.csv'
    
    special_data = [
        ['产品名称', '价格', '描述'],
        ['iPhone 13', '6999', '苹果手机,包含"高级"功能'],
        ['MacBook Pro', '12999', '专业笔记本;适合开发'],
        ['iPad Air', '4399', '平板电脑,轻薄便携']
    ]
    
    # 使用分号作为分隔符
    with open(special_csv, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        writer.writerows(special_data)
    
    print(f"特殊格式CSV: {special_csv.name}")
    
    # 读取特殊格式
    with open(special_csv, 'r', encoding='utf-8') as f:
        reader = csv.reader(f, delimiter=';')
        print("特殊格式读取:")
        for row in reader:
            print(f"  {row}")
    
    # 7. CSV错误处理
    print("\n--- CSV错误处理 ---")
    
    def safe_csv_read(filename, encoding='utf-8'):
        """安全读取CSV文件"""
        try:
            with open(filename, 'r', encoding=encoding) as f:
                # 检测方言
                sample = f.read(1024)
                f.seek(0)
                
                sniffer = csv.Sniffer()
                dialect = sniffer.sniff(sample)
                
                print(f"检测到的CSV方言:")
                print(f"  分隔符: {repr(dialect.delimiter)}")
                print(f"  引用符: {repr(dialect.quotechar)}")
                print(f"  行终止符: {repr(dialect.lineterminator)}")
                
                reader = csv.reader(f, dialect)
                data = list(reader)
                return data
        
        except UnicodeDecodeError:
            print(f"编码错误,尝试其他编码")
            for fallback in ['gbk', 'latin1']:
                try:
                    return safe_csv_read(filename, fallback)
                except Exception:
                    continue
            raise
        
        except csv.Error as e:
            print(f"CSV格式错误: {e}")
            return None
        
        except Exception as e:
            print(f"读取错误: {e}")
            return None
    
    # 测试错误处理
    data = safe_csv_read(special_csv)
    if data:
        print(f"成功读取 {len(data)} 行数据")
    
    # 8. 内存中的CSV操作
    print("\n--- 内存CSV操作 ---")
    
    # 使用StringIO在内存中处理CSV
    csv_string = """姓名,年龄,城市
张三,25,北京
李四,30,上海
王五,28,广州"""
    
    # 从字符串读取CSV
    csv_io = io.StringIO(csv_string)
    reader = csv.DictReader(csv_io)
    
    print("从字符串读取CSV:")
    for row in reader:
        print(f"  {dict(row)}")
    
    # 写入到字符串
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=['姓名', '年龄', '城市'])
    writer.writeheader()
    writer.writerow({'姓名': '赵六', '年龄': 35, '城市': '深圳'})
    
    csv_output = output.getvalue()
    print(f"\n写入到字符串:")
    print(csv_output)
    
    # 清理测试文件
    import shutil
    shutil.rmtree(test_dir)
    print(f"\n测试目录已清理")

csv_operations()
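值得强调的是,csv 模块对带引号、内嵌换行的字段有完整支持,这正是不应该用简单的 split(',') 手工解析CSV的原因。一个小验证:

import csv
import io

raw = '名称,备注\n"产品A","第一行\n第二行"\n'
for row in csv.reader(io.StringIO(raw)):
    print(row)
# ['名称', '备注']
# ['产品A', '第一行\n第二行']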

JSON文件处理

import json
import tempfile
from pathlib import Path
from datetime import datetime, date
from decimal import Decimal

def json_operations():
    """JSON文件操作示例"""
    
    print("=== JSON文件操作 ===")
    
    # 创建测试目录
    test_dir = Path(tempfile.mkdtemp())
    print(f"测试目录: {test_dir}")
    
    # 1. 基本JSON操作
    print("\n--- 基本JSON操作 ---")
    
    # 准备数据
    data = {
        'company': '科技有限公司',
        'employees': [
            {
                'id': 1,
                'name': '张三',
                'age': 28,
                'department': '技术部',
                'salary': 8000,
                'skills': ['Python', 'JavaScript', 'SQL'],
                'is_active': True,
                'hire_date': '2020-01-15'
            },
            {
                'id': 2,
                'name': '李四',
                'age': 32,
                'department': '销售部',
                'salary': 6500,
                'skills': ['销售', '客户管理'],
                'is_active': True,
                'hire_date': '2019-03-20'
            },
            {
                'id': 3,
                'name': '王五',
                'age': 25,
                'department': '技术部',
                'salary': 7200,
                'skills': ['Java', 'Spring', 'MySQL'],
                'is_active': False,
                'hire_date': '2021-06-10'
            }
        ],
        'departments': {
            '技术部': {'budget': 500000, 'head': '张三'},
            '销售部': {'budget': 300000, 'head': '李四'},
            '人事部': {'budget': 200000, 'head': '赵六'}
        },
        'metadata': {
            'created_at': '2023-01-01T00:00:00',
            'version': '1.0',
            'total_employees': 3
        }
    }
    
    # 写入JSON文件
    json_file = test_dir / 'company.json'
    
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    
    print(f"JSON文件已创建: {json_file.name}")
    print(f"文件大小: {json_file.stat().st_size} 字节")
    
    # 读取JSON文件
    with open(json_file, 'r', encoding='utf-8') as f:
        loaded_data = json.load(f)
    
    print(f"\n读取的数据类型: {type(loaded_data)}")
    print(f"公司名称: {loaded_data['company']}")
    print(f"员工数量: {len(loaded_data['employees'])}")
    
    # 2. JSON字符串操作
    print("\n--- JSON字符串操作 ---")
    
    # 对象转JSON字符串
    employee = loaded_data['employees'][0]
    json_string = json.dumps(employee, ensure_ascii=False, indent=2)
    print(f"员工JSON字符串:")
    print(json_string)
    
    # JSON字符串转对象
    parsed_employee = json.loads(json_string)
    print(f"\n解析后的员工: {parsed_employee['name']}")
    
    # 3. JSON数据查询和操作
    print("\n--- JSON数据查询 ---")
    
    def query_employees(data, **filters):
        """查询员工数据"""
        employees = data['employees']
        results = []
        
        for emp in employees:
            match = True
            for key, value in filters.items():
                if key not in emp or emp[key] != value:
                    match = False
                    break
            if match:
                results.append(emp)
        
        return results
    
    # 查询技术部员工
    tech_employees = query_employees(loaded_data, department='技术部')
    print(f"技术部员工数: {len(tech_employees)}")
    for emp in tech_employees:
        print(f"  {emp['name']}: {emp['skills']}")
    
    # 查询在职员工
    active_employees = query_employees(loaded_data, is_active=True)
    print(f"\n在职员工数: {len(active_employees)}")
    
    # 薪资统计
    salaries = [emp['salary'] for emp in loaded_data['employees']]
    print(f"\n薪资统计:")
    print(f"  平均薪资: {sum(salaries) / len(salaries):.0f}")
    print(f"  最高薪资: {max(salaries)}")
    print(f"  最低薪资: {min(salaries)}")
    
    # 4. JSON数据修改
    print("\n--- JSON数据修改 ---")
    
    # 添加新员工
    new_employee = {
        'id': 4,
        'name': '赵六',
        'age': 35,
        'department': '人事部',
        'salary': 7800,
        'skills': ['招聘', '培训', '绩效管理'],
        'is_active': True,
        'hire_date': '2018-09-01'
    }
    
    loaded_data['employees'].append(new_employee)
    loaded_data['metadata']['total_employees'] = len(loaded_data['employees'])
    
    print(f"添加新员工: {new_employee['name']}")
    print(f"总员工数: {loaded_data['metadata']['total_employees']}")
    
    # 更新员工信息
    for emp in loaded_data['employees']:
        if emp['name'] == '张三':
            emp['salary'] = 8500  # 加薪
            emp['skills'].append('Docker')  # 新技能
            print(f"更新员工 {emp['name']}: 薪资 {emp['salary']}, 技能 {emp['skills']}")
            break
    
    # 删除离职员工
    loaded_data['employees'] = [emp for emp in loaded_data['employees'] if emp['is_active']]
    loaded_data['metadata']['total_employees'] = len(loaded_data['employees'])
    
    print(f"删除离职员工后,总员工数: {loaded_data['metadata']['total_employees']}")
    
    # 保存修改后的数据
    updated_json_file = test_dir / 'company_updated.json'
    with open(updated_json_file, 'w', encoding='utf-8') as f:
        json.dump(loaded_data, f, ensure_ascii=False, indent=2)
    
    print(f"更新后的文件: {updated_json_file.name}")
    
    # 5. 自定义JSON编码器
    print("\n--- 自定义JSON编码器 ---")
    
    class CustomJSONEncoder(json.JSONEncoder):
        """自定义JSON编码器"""
        
        def default(self, obj):
            if isinstance(obj, datetime):
                return obj.isoformat()
            elif isinstance(obj, date):
                return obj.isoformat()
            elif isinstance(obj, Decimal):
                return float(obj)
            elif hasattr(obj, '__dict__'):
                return obj.__dict__
            return super().default(obj)
    
    # 测试自定义编码器
    class Employee:
        def __init__(self, name, hire_date, salary):
            self.name = name
            self.hire_date = hire_date
            self.salary = salary
    
    emp_obj = Employee('测试员工', datetime.now(), Decimal('8888.88'))
    
    # 使用自定义编码器
    custom_json = json.dumps(emp_obj, cls=CustomJSONEncoder, ensure_ascii=False, indent=2)
    print(f"自定义编码器结果:")
    print(custom_json)
    
    # 6. JSON Schema验证(概念演示)
    print("\n--- JSON数据验证 ---")
    
    def validate_employee(employee_data):
        """验证员工数据格式"""
        required_fields = ['id', 'name', 'age', 'department', 'salary']
        errors = []
        
        # 检查必需字段
        for field in required_fields:
            if field not in employee_data:
                errors.append(f"缺少必需字段: {field}")
        
        # 类型检查
        if 'id' in employee_data and not isinstance(employee_data['id'], int):
            errors.append("id必须是整数")
        
        if 'name' in employee_data and not isinstance(employee_data['name'], str):
            errors.append("name必须是字符串")
        
        if 'age' in employee_data:
            age = employee_data['age']
            if not isinstance(age, int) or age < 18 or age > 65:
                errors.append("age必须是18-65之间的整数")
        
        if 'salary' in employee_data:
            salary = employee_data['salary']
            if not isinstance(salary, (int, float)) or salary < 0:
                errors.append("salary必须是非负数")
        
        return errors
    
    # 测试验证
    test_employees = [
        {'id': 1, 'name': '正常员工', 'age': 30, 'department': '技术部', 'salary': 8000},
        {'id': '2', 'name': '错误ID', 'age': 25, 'department': '销售部', 'salary': 6000},
        {'name': '缺少ID', 'age': 28, 'department': '人事部', 'salary': 7000},
        {'id': 3, 'name': '年龄错误', 'age': 16, 'department': '技术部', 'salary': 5000}
    ]
    
    for i, emp in enumerate(test_employees, 1):
        errors = validate_employee(emp)
        if errors:
            print(f"员工 {i} 验证失败:")
            for error in errors:
                print(f"  - {error}")
        else:
            print(f"员工 {i} 验证通过")
    
    # 7. JSON文件合并
    print("\n--- JSON文件合并 ---")
    
    # 创建多个JSON文件
    dept_tech = {
        'department': '技术部',
        'employees': [
            {'name': '程序员A', 'level': 'senior'},
            {'name': '程序员B', 'level': 'junior'}
        ]
    }
    
    dept_sales = {
        'department': '销售部',
        'employees': [
            {'name': '销售员A', 'level': 'manager'},
            {'name': '销售员B', 'level': 'staff'}
        ]
    }
    
    # 保存部门文件
    tech_file = test_dir / 'tech_dept.json'
    sales_file = test_dir / 'sales_dept.json'
    
    with open(tech_file, 'w', encoding='utf-8') as f:
        json.dump(dept_tech, f, ensure_ascii=False, indent=2)
    
    with open(sales_file, 'w', encoding='utf-8') as f:
        json.dump(dept_sales, f, ensure_ascii=False, indent=2)
    
    # 合并JSON文件
    def merge_json_files(*filenames):
        """合并多个JSON文件"""
        merged_data = {'departments': []}
        
        for filename in filenames:
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
                merged_data['departments'].append(data)
        
        return merged_data
    
    merged_data = merge_json_files(tech_file, sales_file)
    
    merged_file = test_dir / 'merged_departments.json'
    with open(merged_file, 'w', encoding='utf-8') as f:
        json.dump(merged_data, f, ensure_ascii=False, indent=2)
    
    print(f"合并文件: {merged_file.name}")
    print(f"合并后部门数: {len(merged_data['departments'])}")
    
    # 8. JSON性能优化
    print("\n--- JSON性能测试 ---")
    
    import time
    
    # 创建大量数据
    large_data = {
        'users': [
            {
                'id': i,
                'name': f'用户{i}',
                'email': f'user{i}@example.com',
                'data': list(range(100))  # 每个用户100个数据点
            }
            for i in range(1000)  # 1000个用户
        ]
    }
    
    large_file = test_dir / 'large_data.json'
    
    # 测试写入性能
    start_time = time.time()
    with open(large_file, 'w', encoding='utf-8') as f:
        json.dump(large_data, f)
    write_time = time.time() - start_time
    
    file_size = large_file.stat().st_size
    print(f"大文件写入:")
    print(f"  文件大小: {file_size:,} 字节 ({file_size/1024/1024:.1f} MB)")
    print(f"  写入时间: {write_time:.3f} 秒")
    
    # 测试读取性能
    start_time = time.time()
    with open(large_file, 'r', encoding='utf-8') as f:
        loaded_large_data = json.load(f)
    read_time = time.time() - start_time
    
    print(f"  读取时间: {read_time:.3f} 秒")
    print(f"  用户数量: {len(loaded_large_data['users'])}")
    
    # 清理测试文件
    import shutil
    shutil.rmtree(test_dir)
    print(f"\n测试目录已清理")

json_operations()
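对于百万级记录,一次性 json.dump/json.load 会占用大量内存。一种常见的替代方案是 JSON Lines 格式(每行一个独立的JSON对象),可以流式逐行处理。最小示意:

import json
import os

records = [{'id': i, 'name': f'用户{i}'} for i in range(3)]

# 写入:每行一个JSON对象
with open('data.jsonl', 'w', encoding='utf-8') as f:
    for rec in records:
        f.write(json.dumps(rec, ensure_ascii=False) + '\n')

# 读取:逐行解析,内存占用只与单行大小有关
with open('data.jsonl', 'r', encoding='utf-8') as f:
    for line in f:
        print(json.loads(line))

os.remove('data.jsonl')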

运行结构化数据文件处理示例:

```bash
python structured_data.py
```

本章小结

本章我们全面学习了Python的文件操作和I/O:

  1. 文件基本操作:文件读写、路径处理、目录操作、错误处理
  2. 结构化数据:CSV和JSON文件的处理和操作
  3. 二进制文件:struct模块、pickle序列化、文件哈希、内存映射
  4. 高级I/O:异步I/O、网络文件操作、文件系统监控

下一章预告

下一章我们将学习《标准库和第三方库》,内容包括:

  • 常用标准库详解
  • 第三方库的安装和使用
  • 包管理和虚拟环境
  • 库的选择和最佳实践

练习题

基础练习

  1. 文件操作

    • 实现一个文件备份工具
    • 创建一个日志文件分析器
    • 编写一个文件同步程序
  2. 数据处理

    • 实现CSV数据的增删改查
    • 创建JSON配置文件管理器
    • 编写数据格式转换工具

进阶练习

  1. 高级I/O

    • 实现一个文件下载管理器
    • 创建一个实时文件监控系统
    • 编写一个分布式文件同步工具
  2. 性能优化

    • 实现大文件的高效处理
    • 创建一个文件压缩工具
    • 编写一个文件去重系统

提示:文件I/O是程序与外部世界交互的重要方式。掌握各种文件操作技术,能让你的程序更加实用和强大。注意处理异常情况和性能优化。