学习目标
通过本章学习,你将掌握:
- 文件的基本读写操作
- 文件路径处理和目录操作
- CSV和JSON文件处理
- 二进制文件操作
- 网络I/O和异步I/O
- 文件系统监控和高级I/O技术
9.1 文件基本操作
文件的打开和关闭
import os
import tempfile
from pathlib import Path
# 基本文件操作
def basic_file_operations():
"""基本文件操作示例"""
print("=== 基本文件操作 ===")
# 1. 创建临时文件用于演示
temp_dir = tempfile.mkdtemp()
file_path = os.path.join(temp_dir, "example.txt")
print(f"临时文件路径: {file_path}")
# 2. 写入文件 - 基本方式
print("\n--- 写入文件 ---")
# 方式1: 手动关闭文件
file_handle = open(file_path, 'w', encoding='utf-8')
file_handle.write("Hello, World!\n")
file_handle.write("这是第二行\n")
file_handle.close()
print("文件写入完成(手动关闭)")
# 方式2: 使用with语句(推荐)
with open(file_path, 'a', encoding='utf-8') as f:
f.write("这是追加的内容\n")
f.write("支持中文字符\n")
print("文件追加完成(自动关闭)")
# 3. 读取文件
print("\n--- 读取文件 ---")
# 读取整个文件
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
print("文件全部内容:")
print(content)
# 按行读取
with open(file_path, 'r', encoding='utf-8') as f:
print("逐行读取:")
for line_num, line in enumerate(f, 1):
print(f"第{line_num}行: {line.rstrip()}")
# 读取所有行到列表
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
print(f"\n总共 {len(lines)} 行")
print(f"第一行: {lines[0].rstrip()}")
print(f"最后一行: {lines[-1].rstrip()}")
# 4. 文件模式详解
print("\n--- 文件模式 ---")
modes = {
'r': '只读模式(默认)',
'w': '写入模式(覆盖)',
'a': '追加模式',
'x': '独占创建模式',
'r+': '读写模式',
'w+': '读写模式(覆盖)',
'a+': '读写模式(追加)',
'rb': '二进制只读',
'wb': '二进制写入',
'ab': '二进制追加'
}
for mode, description in modes.items():
print(f"{mode:3s}: {description}")
# 5. 文件属性和状态
print("\n--- 文件属性 ---")
with open(file_path, 'r', encoding='utf-8') as f:
print(f"文件名: {f.name}")
print(f"文件模式: {f.mode}")
print(f"编码: {f.encoding}")
print(f"是否关闭: {f.closed}")
print(f"是否可读: {f.readable()}")
print(f"是否可写: {f.writable()}")
print(f"是否可定位: {f.seekable()}")
# 6. 文件定位操作
print("\n--- 文件定位 ---")
with open(file_path, 'r', encoding='utf-8') as f:
print(f"初始位置: {f.tell()}")
# 读取前10个字符
content = f.read(10)
print(f"读取内容: {repr(content)}")
print(f"当前位置: {f.tell()}")
# 回到文件开头
f.seek(0)
print(f"重置后位置: {f.tell()}")
# 移动到文件末尾
f.seek(0, 2) # 2表示从文件末尾开始
print(f"文件末尾位置: {f.tell()}")
# 清理临时文件
os.remove(file_path)
os.rmdir(temp_dir)
print("\n临时文件已清理")
basic_file_operations()
# 文件错误处理
def file_error_handling():
"""文件操作错误处理"""
print("\n=== 文件错误处理 ===")
def safe_file_operation(file_path, operation='read'):
"""安全的文件操作"""
try:
if operation == 'read':
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
elif operation == 'write':
with open(file_path, 'w', encoding='utf-8') as f:
f.write("测试内容")
return "写入成功"
elif operation == 'append':
with open(file_path, 'a', encoding='utf-8') as f:
f.write("追加内容\n")
return "追加成功"
except FileNotFoundError:
return f"错误: 文件不存在 - {file_path}"
except PermissionError:
return f"错误: 没有权限访问文件 - {file_path}"
except UnicodeDecodeError:
return f"错误: 文件编码错误 - {file_path}"
except UnicodeEncodeError:
return f"错误: 编码写入错误 - {file_path}"
except OSError as e:
return f"错误: 操作系统错误 - {e}"
except Exception as e:
return f"错误: 未知错误 - {e}"
# 测试各种错误情况
test_cases = [
('nonexistent.txt', 'read'), # 文件不存在
('temp_test.txt', 'write'), # 正常写入
('temp_test.txt', 'read'), # 正常读取
('temp_test.txt', 'append'), # 正常追加
]
for file_path, operation in test_cases:
result = safe_file_operation(file_path, operation)
print(f"{operation:6s} {file_path:15s}: {result}")
# 清理测试文件
try:
os.remove('temp_test.txt')
except FileNotFoundError:
pass
file_error_handling()
# 大文件处理
def large_file_handling():
"""大文件处理技术"""
print("\n=== 大文件处理 ===")
# 创建一个较大的测试文件
large_file = 'large_test.txt'
print("创建大文件...")
with open(large_file, 'w', encoding='utf-8') as f:
for i in range(10000):
f.write(f"这是第 {i+1:05d} 行,包含一些测试数据和中文字符。\n")
file_size = os.path.getsize(large_file)
print(f"文件大小: {file_size:,} 字节 ({file_size/1024:.1f} KB)")
# 1. 逐行处理(内存友好)
print("\n--- 逐行处理 ---")
line_count = 0
word_count = 0
with open(large_file, 'r', encoding='utf-8') as f:
for line in f:
line_count += 1
word_count += len(line.split())
# 只显示前几行和最后几行
if line_count <= 3 or line_count > 9997:
print(f"行 {line_count}: {line.rstrip()}")
elif line_count == 4:
print("... (省略中间行) ...")
print(f"\n总行数: {line_count:,}")
print(f"总词数: {word_count:,}")
# 2. 分块读取
print("\n--- 分块读取 ---")
chunk_size = 1024 # 1KB chunks
chunk_count = 0
total_chars = 0
with open(large_file, 'r', encoding='utf-8') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
chunk_count += 1
total_chars += len(chunk)
if chunk_count <= 3:
print(f"块 {chunk_count}: {len(chunk)} 字符")
print(f" 内容预览: {repr(chunk[:50])}...")
print(f"\n总块数: {chunk_count}")
print(f"总字符数: {total_chars:,}")
# 3. 内存映射(适用于非常大的文件)
print("\n--- 内存映射 ---")
import mmap
with open(large_file, 'rb') as f:  # mmap 基于底层文件句柄,使用二进制模式更可靠
# 注意:mmap在Windows上可能需要特殊处理
try:
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
# 查找特定内容
search_text = b'00100'
pos = mm.find(search_text)
if pos != -1:
print(f"找到 '{search_text.decode()}' 在位置: {pos}")
# 读取周围的内容
start = max(0, pos - 20)
end = min(len(mm), pos + 50)
context = mm[start:end].decode('utf-8', errors='ignore')
print(f"上下文: {repr(context)}")
else:
print(f"未找到 '{search_text.decode()}'")
except Exception as e:
print(f"内存映射失败: {e}")
# 4. 文件迭代器(生成器方式)
print("\n--- 生成器方式处理 ---")
def file_lines_generator(filename, encoding='utf-8'):
"""文件行生成器"""
with open(filename, 'r', encoding=encoding) as f:
for line_num, line in enumerate(f, 1):
yield line_num, line.rstrip()
def file_chunks_generator(filename, chunk_size=1024, encoding='utf-8'):
"""文件块生成器"""
with open(filename, 'r', encoding=encoding) as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
# 使用生成器处理文件
print("使用行生成器:")
line_gen = file_lines_generator(large_file)
for i, (line_num, line) in enumerate(line_gen):
if i < 3: # 只显示前3行
print(f" 行 {line_num}: {line}")
else:
break
print("\n使用块生成器:")
chunk_gen = file_chunks_generator(large_file, 512)
for i, chunk in enumerate(chunk_gen):
if i < 2: # 只显示前2块
print(f" 块 {i+1}: {len(chunk)} 字符")
else:
break
# 清理测试文件
os.remove(large_file)
print("\n大文件已清理")
large_file_handling()
文件路径处理
import os
import glob
from pathlib import Path
import shutil
import tempfile
def path_operations():
"""路径操作示例"""
print("=== 路径操作 ===")
# 1. os.path 模块
print("\n--- os.path 模块 ---")
# 路径拼接
path1 = os.path.join('home', 'user', 'documents', 'file.txt')
print(f"路径拼接: {path1}")
# 路径分解
sample_path = '/home/user/documents/report.pdf'
print(f"\n原路径: {sample_path}")
print(f"目录名: {os.path.dirname(sample_path)}")
print(f"文件名: {os.path.basename(sample_path)}")
print(f"文件名和扩展名: {os.path.splitext(sample_path)}")
print(f"路径分割: {os.path.split(sample_path)}")
# 路径信息
current_dir = os.getcwd()
print(f"\n当前目录: {current_dir}")
print(f"绝对路径: {os.path.abspath('.')}")
print(f"规范路径: {os.path.normpath('/home/user/../user/./documents')}")
# 路径检查
test_paths = [current_dir, 'nonexistent.txt', __file__]
for path in test_paths:
print(f"\n路径: {path}")
print(f" 存在: {os.path.exists(path)}")
print(f" 是文件: {os.path.isfile(path)}")
print(f" 是目录: {os.path.isdir(path)}")
print(f" 是绝对路径: {os.path.isabs(path)}")
if os.path.exists(path):
stat = os.stat(path)
print(f" 大小: {stat.st_size} 字节")
print(f" 修改时间: {stat.st_mtime}")
# 2. pathlib 模块(推荐)
print("\n--- pathlib 模块 ---")
# 创建路径对象
p = Path('home') / 'user' / 'documents' / 'file.txt'
print(f"Path对象: {p}")
print(f"类型: {type(p)}")
# 路径属性
sample_path = Path('/home/user/documents/report.pdf')
print(f"\n路径: {sample_path}")
print(f"父目录: {sample_path.parent}")
print(f"所有父目录: {list(sample_path.parents)}")
print(f"文件名: {sample_path.name}")
print(f"文件主名: {sample_path.stem}")
print(f"文件扩展名: {sample_path.suffix}")
print(f"所有扩展名: {sample_path.suffixes}")
print(f"锚点: {sample_path.anchor}")
print(f"路径部分: {sample_path.parts}")
# 路径操作
current_path = Path.cwd()
print(f"\n当前目录: {current_path}")
print(f"主目录: {Path.home()}")
print(f"绝对路径: {Path('file.txt').resolve()}")
# 路径匹配
print(f"\n路径匹配:")
test_path = Path('documents/report.pdf')
patterns = ['*.pdf', '**/*.pdf', 'documents/*', 'reports/*']
for pattern in patterns:
print(f" {test_path} 匹配 '{pattern}': {test_path.match(pattern)}")
# 3. 文件查找和通配符
print("\n--- 文件查找 ---")
# 创建测试目录结构
test_dir = Path(tempfile.mkdtemp())
print(f"测试目录: {test_dir}")
# 1. 基本二进制读写
print("\n--- 基本二进制读写 ---")
binary_file = test_dir / 'binary_data.bin'
# 写入二进制数据
data = b'\x00\x01\x02\x03\x04\x05\xFF\xFE\xFD'
with open(binary_file, 'wb') as f:
f.write(data)
print(f"写入二进制数据: {len(data)} 字节")
print(f"数据内容: {data.hex()}")
# 读取二进制数据
with open(binary_file, 'rb') as f:
read_data = f.read()
print(f"读取二进制数据: {len(read_data)} 字节")
print(f"数据内容: {read_data.hex()}")
print(f"数据相同: {data == read_data}")
# 2. struct模块 - 结构化二进制数据
print("\n--- struct模块 ---")
# 定义数据结构
# 格式: i=int(4字节), f=float(4字节), 10s=10字节字符串
format_string = 'if10s'
# 打包数据
packed_data = struct.pack(format_string, 42, 3.14159, b'Hello')
print(f"打包数据大小: {len(packed_data)} 字节")
print(f"打包数据: {packed_data.hex()}")
# 解包数据
unpacked_data = struct.unpack(format_string, packed_data)
print(f"解包数据: {unpacked_data}")
print(f"整数: {unpacked_data[0]}")
print(f"浮点数: {unpacked_data[1]}")
print(f"字符串: {unpacked_data[2].rstrip(b'\x00')}")
# 复杂数据结构
struct_file = test_dir / 'structured_data.bin'
# 员工记录结构: ID(int), 薪资(float), 姓名(20字节), 年龄(short)
employee_format = 'if20sH'
employees = [
(1, 8000.0, '张三'.encode('utf-8'), 28),
(2, 6500.0, '李四'.encode('utf-8'), 32),
(3, 7200.0, '王五'.encode('utf-8'), 25)
]
# 写入结构化数据
with open(struct_file, 'wb') as f:
for emp in employees:
packed = struct.pack(employee_format, *emp)
f.write(packed)
print(f"\n写入 {len(employees)} 个员工记录")
# 读取结构化数据
record_size = struct.calcsize(employee_format)
print(f"每条记录大小: {record_size} 字节")
with open(struct_file, 'rb') as f:
print("读取员工记录:")
while True:
data = f.read(record_size)
if not data:
break
emp_id, salary, name_bytes, age = struct.unpack(employee_format, data)
name = name_bytes.rstrip(b'\x00').decode('utf-8')
print(f" ID: {emp_id}, 姓名: {name}, 薪资: {salary}, 年龄: {age}")
# 3. pickle模块 - Python对象序列化
print("\n--- pickle序列化 ---")
# 复杂Python对象
class Employee:
def __init__(self, name, age, skills):
self.name = name
self.age = age
self.skills = skills
self.hire_date = '2023-01-01'
def __repr__(self):
return f"Employee('{self.name}', {self.age}, {self.skills})"
# 创建对象
employees_obj = [
Employee('张三', 28, ['Python', 'Django']),
Employee('李四', 32, ['Java', 'Spring']),
{'department': '技术部', 'budget': 500000}
]
pickle_file = test_dir / 'employees.pickle'
# 序列化到文件
with open(pickle_file, 'wb') as f:
pickle.dump(employees_obj, f)
print(f"序列化对象到文件: {pickle_file.name}")
print(f"文件大小: {pickle_file.stat().st_size} 字节")
# 从文件反序列化
with open(pickle_file, 'rb') as f:
loaded_employees = pickle.load(f)
print("反序列化对象:")
for obj in loaded_employees:
print(f" {obj}")
# pickle字符串操作
data_dict = {'key1': 'value1', 'key2': [1, 2, 3], 'key3': {'nested': True}}
# 序列化到字节串
pickled_bytes = pickle.dumps(data_dict)
print(f"\n序列化字节串长度: {len(pickled_bytes)}")
# 从字节串反序列化
unpickled_dict = pickle.loads(pickled_bytes)
print(f"反序列化结果: {unpickled_dict}")
# 4. 文件哈希计算
print("\n--- 文件哈希计算 ---")
def calculate_file_hash(filename, algorithm='sha256'):
"""计算文件哈希值"""
hash_obj = hashlib.new(algorithm)
with open(filename, 'rb') as f:
# 分块读取,适用于大文件
while chunk := f.read(8192):
hash_obj.update(chunk)
return hash_obj.hexdigest()
# 计算不同文件的哈希
files_to_hash = [binary_file, struct_file, pickle_file]
for file_path in files_to_hash:
if file_path.exists():
sha256_hash = calculate_file_hash(file_path, 'sha256')
md5_hash = calculate_file_hash(file_path, 'md5')
print(f"文件: {file_path.name}")
print(f" SHA256: {sha256_hash}")
print(f" MD5: {md5_hash}")
# 5. 文件分割和合并
print("\n--- 文件分割和合并 ---")
def split_file(filename, chunk_size=1024):
"""分割文件"""
file_path = Path(filename)
base_name = file_path.stem
extension = file_path.suffix
chunk_files = []
with open(file_path, 'rb') as f:
chunk_num = 0
while True:
chunk = f.read(chunk_size)
if not chunk:
break
chunk_filename = file_path.parent / f"{base_name}.part{chunk_num:03d}{extension}"
with open(chunk_filename, 'wb') as chunk_file:
chunk_file.write(chunk)
chunk_files.append(chunk_filename)
chunk_num += 1
return chunk_files
def merge_files(chunk_files, output_filename):
"""合并文件"""
with open(output_filename, 'wb') as output_file:
for chunk_file in sorted(chunk_files):
with open(chunk_file, 'rb') as f:
output_file.write(f.read())
# 创建测试文件
large_binary_file = test_dir / 'large_binary.bin'
test_data = b'A' * 5000 + b'B' * 3000 + b'C' * 2000 # 10KB数据
with open(large_binary_file, 'wb') as f:
f.write(test_data)
print(f"创建测试文件: {large_binary_file.name} ({len(test_data)} 字节)")
# 分割文件
chunk_files = split_file(large_binary_file, 2048) # 2KB chunks
print(f"文件分割为 {len(chunk_files)} 个块")
for chunk_file in chunk_files:
size = chunk_file.stat().st_size
print(f" {chunk_file.name}: {size} 字节")
# 合并文件
merged_file = test_dir / 'merged_binary.bin'
merge_files(chunk_files, merged_file)
# 验证合并结果
with open(merged_file, 'rb') as f:
merged_data = f.read()
print(f"\n合并文件: {merged_file.name} ({len(merged_data)} 字节)")
print(f"数据完整性: {test_data == merged_data}")
# 清理分割文件
for chunk_file in chunk_files:
chunk_file.unlink()
# 6. 内存映射文件
print("\n--- 内存映射文件 ---")
import mmap
# 创建大文件用于内存映射
mmap_file = test_dir / 'mmap_test.bin'
data_size = 1024 * 1024 # 1MB
# 创建文件
with open(mmap_file, 'wb') as f:
f.write(b'\x00' * data_size)
print(f"创建内存映射文件: {mmap_file.name} ({data_size} 字节)")
try:
# 内存映射读写
with open(mmap_file, 'r+b') as f:
with mmap.mmap(f.fileno(), 0) as mm:
print(f"内存映射大小: {len(mm)} 字节")
# 写入数据
mm[0:10] = b'Hello Mmap'
mm[100:110] = b'Test Data!'
# 读取数据
print(f"位置0-10: {mm[0:10]}")
print(f"位置100-110: {mm[100:110]}")
# 查找数据
pos = mm.find(b'Test')
if pos != -1:
print(f"找到'Test'在位置: {pos}")
# 刷新到磁盘
mm.flush()
except Exception as e:
print(f"内存映射操作失败: {e}")
# 清理测试文件
import shutil
shutil.rmtree(test_dir)
print(f"\n测试目录已清理")
binary_file_operations()
# 异步I/O操作
def async_io_operations():
"""异步I/O操作示例"""
print("\n=== 异步I/O操作 ===")
import asyncio
import aiofiles
import time
async def async_file_demo():
"""异步文件操作演示"""
# 创建测试目录
test_dir = Path(tempfile.mkdtemp())
print(f"异步测试目录: {test_dir}")
# 1. 异步文件写入
print("\n--- 异步文件写入 ---")
async def write_file_async(filename, content):
"""异步写入文件"""
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
await f.write(content)
return filename
# 并发写入多个文件
tasks = []
for i in range(5):
filename = test_dir / f'async_file_{i}.txt'
content = f'这是异步文件 {i}\n' * 100
tasks.append(write_file_async(filename, content))
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
print(f"并发写入 {len(results)} 个文件")
print(f"耗时: {end_time - start_time:.3f} 秒")
# 2. 异步文件读取
print("\n--- 异步文件读取 ---")
async def read_file_async(filename):
"""异步读取文件"""
async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
content = await f.read()
return len(content)
# 并发读取文件
read_tasks = [read_file_async(result) for result in results]
start_time = time.time()
file_sizes = await asyncio.gather(*read_tasks)
end_time = time.time()
print(f"并发读取 {len(file_sizes)} 个文件")
print(f"文件大小: {file_sizes}")
print(f"耗时: {end_time - start_time:.3f} 秒")
# 3. 异步文件处理
print("\n--- 异步文件处理 ---")
async def process_file_async(filename):
"""异步处理文件"""
async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
lines = await f.readlines()
# 模拟处理时间
await asyncio.sleep(0.1)
# 统计信息
line_count = len(lines)
word_count = sum(len(line.split()) for line in lines)
char_count = sum(len(line) for line in lines)
return {
'filename': filename.name,
'lines': line_count,
'words': word_count,
'chars': char_count
}
# 并发处理文件
process_tasks = [process_file_async(Path(result)) for result in results]
start_time = time.time()
stats = await asyncio.gather(*process_tasks)
end_time = time.time()
print(f"并发处理 {len(stats)} 个文件")
for stat in stats:
print(f" {stat['filename']}: {stat['lines']}行, {stat['words']}词, {stat['chars']}字符")
print(f"耗时: {end_time - start_time:.3f} 秒")
# 清理测试文件
import shutil
shutil.rmtree(test_dir)
print(f"\n异步测试目录已清理")
# 运行异步演示
try:
# 检查是否安装了aiofiles
import aiofiles
asyncio.run(async_file_demo())
except ImportError:
print("需要安装aiofiles: pip install aiofiles")
print("跳过异步I/O演示")
except Exception as e:
print(f"异步I/O演示失败: {e}")
async_io_operations()
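如果环境中没有安装 aiofiles,也可以用标准库的 asyncio.to_thread(Python 3.9+)把阻塞的文件读写放到线程池中执行,达到类似的并发效果。下面是一个假设性的简化示意(demo_path 仅为演示用参数名):
import asyncio

async def read_in_thread(demo_path):
    """用线程池执行阻塞的文件读取(示意代码)"""
    def blocking_read():
        with open(demo_path, 'r', encoding='utf-8') as f:
            return f.read()
    return await asyncio.to_thread(blocking_read)

# 用法示意: content = asyncio.run(read_in_thread('some_file.txt'))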
运行二进制文件操作示例:
python binary_io.py
9.4 网络I/O和文件监控
网络文件操作
import urllib.request
import urllib.parse
import urllib.error
import requests
import tempfile
from pathlib import Path
import time
def network_io_operations():
"""网络I/O操作示例"""
print("=== 网络I/O操作 ===")
# 创建测试目录
test_dir = Path(tempfile.mkdtemp())
print(f"测试目录: {test_dir}")
# 1. urllib下载文件
print("\n--- urllib下载文件 ---")
def download_with_urllib(url, filename):
"""使用urllib下载文件"""
try:
print(f"下载: {url}")
# 创建请求对象
req = urllib.request.Request(url)
req.add_header('User-Agent', 'Python/urllib')
# 下载文件
with urllib.request.urlopen(req, timeout=30) as response:
# 获取文件信息
content_length = response.headers.get('Content-Length')
content_type = response.headers.get('Content-Type')
print(f" 内容类型: {content_type}")
print(f" 文件大小: {content_length} 字节")
# 保存文件
with open(filename, 'wb') as f:
while True:
chunk = response.read(8192)
if not chunk:
break
f.write(chunk)
return True
except urllib.error.URLError as e:
print(f" 下载失败: {e}")
return False
except Exception as e:
print(f" 错误: {e}")
return False
# 测试下载(使用一个小的测试文件)
test_url = "https://httpbin.org/json"
urllib_file = test_dir / "urllib_download.json"
if download_with_urllib(test_url, urllib_file):
print(f" 下载成功: {urllib_file.name}")
print(f" 文件大小: {urllib_file.stat().st_size} 字节")
# 2. requests下载文件(推荐)
print("\n--- requests下载文件 ---")
def download_with_requests(url, filename, chunk_size=8192):
"""使用requests下载文件"""
try:
print(f"下载: {url}")
# 发送请求
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
# 获取文件信息
content_length = response.headers.get('Content-Length')
content_type = response.headers.get('Content-Type')
print(f" 状态码: {response.status_code}")
print(f" 内容类型: {content_type}")
print(f" 文件大小: {content_length} 字节")
# 下载文件
downloaded = 0
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# 显示进度(如果知道文件大小)
if content_length:
progress = (downloaded / int(content_length)) * 100
print(f"\r 进度: {progress:.1f}%", end='', flush=True)
print() # 换行
return True
except requests.RequestException as e:
print(f" 下载失败: {e}")
return False
except Exception as e:
print(f" 错误: {e}")
return False
# 测试requests下载
requests_file = test_dir / "requests_download.json"
try:
if download_with_requests(test_url, requests_file):
print(f" 下载成功: {requests_file.name}")
print(f" 文件大小: {requests_file.stat().st_size} 字节")
except ImportError:
print(" 需要安装requests: pip install requests")
# 3. 文件上传
print("\n--- 文件上传 ---")
def upload_file(url, filename, field_name='file'):
"""上传文件"""
try:
with open(filename, 'rb') as f:
files = {field_name: f}
response = requests.post(url, files=files)
response.raise_for_status()
print(f"上传成功: {response.status_code}")
return response.json() if response.headers.get('Content-Type', '').startswith('application/json') else response.text
except requests.RequestException as e:
print(f"上传失败: {e}")
return None
except Exception as e:
print(f"错误: {e}")
return None
# 创建测试文件用于上传
upload_test_file = test_dir / "upload_test.txt"
upload_test_file.write_text("这是一个测试上传文件\n包含一些中文内容")
# 测试上传(使用httpbin.org的测试端点)
upload_url = "https://httpbin.org/post"
try:
result = upload_file(upload_url, upload_test_file)
if result:
print(f"上传响应: {type(result)}")
except Exception:
print("上传测试跳过(网络问题)")
# 4. FTP操作
print("\n--- FTP操作 ---")
from ftplib import FTP
def ftp_operations_demo():
"""FTP操作演示"""
print("FTP操作演示(需要FTP服务器)")
# 这里只演示代码结构,实际需要FTP服务器
ftp_config = {
'host': 'ftp.example.com',
'user': 'username',
'password': 'password'
}
print("FTP连接示例代码:")
print(f"""try:
ftp = FTP('{ftp_config['host']}')
ftp.login('{ftp_config['user']}', '{ftp_config['password']}')
# 列出目录
files = ftp.nlst()
print(f"FTP文件列表: {{files}}")
# 下载文件
with open('downloaded_file.txt', 'wb') as f:
ftp.retrbinary('RETR remote_file.txt', f.write)
# 上传文件
with open('local_file.txt', 'rb') as f:
ftp.storbinary('STOR remote_file.txt', f)
ftp.quit()
except Exception as e:
print(f"FTP操作失败: {{e}}")
""")
ftp_operations_demo()
# 清理测试文件
import shutil
shutil.rmtree(test_dir)
print(f"\n测试目录已清理")
network_io_operations()
# 文件系统监控
def file_system_monitoring():
"""文件系统监控示例"""
print("\n=== 文件系统监控 ===")
# 创建测试目录
test_dir = Path(tempfile.mkdtemp())
print(f"监控测试目录: {test_dir}")
# 1. 基本文件监控(轮询方式)
print("\n--- 基本文件监控 ---")
def simple_file_monitor(directory, interval=1, duration=10):
"""简单的文件监控(轮询方式)"""
print(f"开始监控目录: {directory}")
print(f"监控间隔: {interval}秒, 持续时间: {duration}秒")
# 记录初始状态
initial_files = {}
if directory.exists():
for file_path in directory.rglob('*'):
if file_path.is_file():
stat = file_path.stat()
initial_files[str(file_path)] = {
'size': stat.st_size,
'mtime': stat.st_mtime
}
print(f"初始文件数: {len(initial_files)}")
start_time = time.time()
while time.time() - start_time < duration:
time.sleep(interval)
current_files = {}
if directory.exists():
for file_path in directory.rglob('*'):
if file_path.is_file():
stat = file_path.stat()
current_files[str(file_path)] = {
'size': stat.st_size,
'mtime': stat.st_mtime
}
# 检查变化
# 新文件
new_files = set(current_files.keys()) - set(initial_files.keys())
for file_path in new_files:
print(f" [新建] {Path(file_path).name}")
# 删除的文件
deleted_files = set(initial_files.keys()) - set(current_files.keys())
for file_path in deleted_files:
print(f" [删除] {Path(file_path).name}")
# 修改的文件
for file_path in current_files:
if file_path in initial_files:
current = current_files[file_path]
initial = initial_files[file_path]
if (current['size'] != initial['size'] or
current['mtime'] != initial['mtime']):
print(f" [修改] {Path(file_path).name}")
# 更新状态
initial_files = current_files
print("监控结束")
# 2. 使用watchdog库进行高级监控
print("\n--- 高级文件监控 ---")
try:
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class FileChangeHandler(FileSystemEventHandler):
"""文件变化处理器"""
def on_modified(self, event):
if not event.is_directory:
print(f" [修改] {Path(event.src_path).name}")
def on_created(self, event):
if not event.is_directory:
print(f" [创建] {Path(event.src_path).name}")
def on_deleted(self, event):
if not event.is_directory:
print(f" [删除] {Path(event.src_path).name}")
def on_moved(self, event):
if not event.is_directory:
src_name = Path(event.src_path).name
dest_name = Path(event.dest_path).name
print(f" [移动] {src_name} -> {dest_name}")
def advanced_file_monitor(directory, duration=10):
"""高级文件监控"""
print(f"开始高级监控: {directory}")
event_handler = FileChangeHandler()
observer = Observer()
observer.schedule(event_handler, str(directory), recursive=True)
observer.start()
try:
time.sleep(duration)
except KeyboardInterrupt:
pass
finally:
observer.stop()
observer.join()
print("高级监控结束")
# 启动监控(在后台)
import threading
def run_monitor():
advanced_file_monitor(test_dir, 5)
monitor_thread = threading.Thread(target=run_monitor)
monitor_thread.start()
# 模拟文件操作
print("\n模拟文件操作:")
time.sleep(1)
# 创建文件
test_file1 = test_dir / "test1.txt"
test_file1.write_text("测试文件1")
print("创建 test1.txt")
time.sleep(1)
# 修改文件
test_file1.write_text("修改后的测试文件1")
print("修改 test1.txt")
time.sleep(1)
# 创建另一个文件
test_file2 = test_dir / "test2.txt"
test_file2.write_text("测试文件2")
print("创建 test2.txt")
time.sleep(1)
# 移动文件
test_file3 = test_dir / "test3.txt"
test_file2.rename(test_file3)
print("重命名 test2.txt -> test3.txt")
time.sleep(1)
# 删除文件
test_file1.unlink()
print("删除 test1.txt")
# 等待监控线程结束
monitor_thread.join()
except ImportError:
print("需要安装watchdog: pip install watchdog")
print("使用简单监控代替...")
# 启动简单监控
import threading
def run_simple_monitor():
simple_file_monitor(test_dir, 1, 5)
monitor_thread = threading.Thread(target=run_simple_monitor)
monitor_thread.start()
# 模拟文件操作
print("\n模拟文件操作:")
time.sleep(2)
test_file = test_dir / "simple_test.txt"
test_file.write_text("简单测试")
print("创建文件")
time.sleep(2)
test_file.write_text("修改内容")
print("修改文件")
monitor_thread.join()
# 清理测试目录
import shutil
shutil.rmtree(test_dir)
print(f"\n监控测试目录已清理")
file_system_monitoring()
运行网络I/O和文件监控示例:
python network_monitoring.py
本章小结
本章我们全面学习了Python的文件操作和I/O:
- 文件基本操作:文件读写、路径处理、目录操作、错误处理
- 结构化数据:CSV和JSON文件的处理和操作
- 二进制文件:struct模块、pickle序列化、文件哈希、内存映射
- 高级I/O:异步I/O、网络文件操作、文件系统监控
下一章预告
下一章我们将学习《标准库和第三方库》,内容包括:
- 常用标准库详解
- 第三方库的安装和使用
- 包管理和虚拟环境
- 库的选择和最佳实践
练习题
基础练习
文件操作:
- 实现一个文件备份工具
- 创建一个日志文件分析器
- 编写一个文件同步程序
数据处理:
- 实现CSV数据的增删改查
- 创建JSON配置文件管理器
- 编写数据格式转换工具
进阶练习
高级I/O:
- 实现一个文件下载管理器
- 创建一个实时文件监控系统
- 编写一个分布式文件同步工具
性能优化:
- 实现大文件的高效处理
- 创建一个文件压缩工具
- 编写一个文件去重系统(可参考下面的提示草图)
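下面给出文件去重练习的一个提示性草图:按文件内容的 SHA-256 哈希分组,哈希相同即视为重复文件。这只是一个假设性的最小实现,函数名 find_duplicates 与参数均为示例:
import hashlib
from pathlib import Path

def find_duplicates(directory):
    """按内容哈希分组,返回重复文件的分组列表(提示性草图)"""
    groups = {}
    for path in Path(directory).rglob('*'):
        if path.is_file():
            digest = hashlib.sha256(path.read_bytes()).hexdigest()
            groups.setdefault(digest, []).append(path)
    # 只保留出现多于一次的哈希分组
    return [paths for paths in groups.values() if len(paths) > 1]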
提示:文件I/O是程序与外部世界交互的重要方式。掌握各种文件操作技术,能让你的程序更加实用和强大。注意处理异常情况和性能优化。
# 创建测试文件
test_files = [
'file1.txt', 'file2.txt', 'document.pdf',
'subdir/file3.txt', 'subdir/image.jpg',
'subdir/nested/file4.txt'
]
for file_path in test_files:
full_path = test_dir / file_path
full_path.parent.mkdir(parents=True, exist_ok=True)
full_path.write_text(f"内容: {file_path}")
# 使用glob查找文件
print("\nglob查找:")
os.chdir(test_dir)
patterns = ['*.txt', '*.pdf', '**/*.txt', 'subdir/*', '**/file*']
for pattern in patterns:
matches = glob.glob(pattern, recursive=True)
print(f" '{pattern}': {matches}")
# 使用pathlib查找文件
print("\npathlib查找:")
# 查找所有.txt文件
txt_files = list(test_dir.glob('**/*.txt'))
print(f"所有.txt文件: {[str(f.relative_to(test_dir)) for f in txt_files]}")
# 查找特定模式的文件
file_pattern = list(test_dir.glob('**/file*'))
print(f"以'file'开头的文件: {[str(f.relative_to(test_dir)) for f in file_pattern]}")
# 递归查找所有文件
all_files = [f for f in test_dir.rglob('*') if f.is_file()]
print(f"所有文件: {[str(f.relative_to(test_dir)) for f in all_files]}")
# 4. 目录操作
print("\n--- 目录操作 ---")
# 创建目录
new_dir = test_dir / 'new_directory' / 'nested'
new_dir.mkdir(parents=True, exist_ok=True)
print(f"创建目录: {new_dir.relative_to(test_dir)}")
# 列出目录内容
print(f"\n目录内容 ({test_dir}):")
for item in test_dir.iterdir():
item_type = "目录" if item.is_dir() else "文件"
size = item.stat().st_size if item.is_file() else "-"
print(f" {item_type}: {item.name} ({size} 字节)")
# 递归列出所有内容
print(f"\n递归目录内容:")
for item in test_dir.rglob('*'):
level = len(item.relative_to(test_dir).parts)
indent = " " * level
item_type = "📁" if item.is_dir() else "📄"
print(f"{indent}{item_type} {item.name}")
# 目录统计
file_count = sum(1 for f in test_dir.rglob('*') if f.is_file())
dir_count = sum(1 for d in test_dir.rglob('*') if d.is_dir())
total_size = sum(f.stat().st_size for f in test_dir.rglob('*') if f.is_file())
print(f"\n目录统计:")
print(f" 文件数: {file_count}")
print(f" 目录数: {dir_count}")
print(f" 总大小: {total_size} 字节")
# 清理测试目录
os.chdir(Path.cwd().parent) # 退出测试目录
shutil.rmtree(test_dir)
print(f"\n测试目录已清理")
path_operations()
文件和目录管理
def file_directory_management():
"""文件和目录管理"""
print("\n=== 文件和目录管理 ===")
# 创建测试环境
base_dir = Path(tempfile.mkdtemp())
print(f"测试目录: {base_dir}")
# 1. 文件复制和移动
print("\n--- 文件复制和移动 ---")
# 创建源文件
source_file = base_dir / 'source.txt'
source_file.write_text('这是源文件的内容\n包含多行文本\n')
# 复制文件
dest_file = base_dir / 'destination.txt'
shutil.copy2(source_file, dest_file)
print(f"复制文件: {source_file.name} -> {dest_file.name}")
# 验证复制
print(f"源文件大小: {source_file.stat().st_size}")
print(f"目标文件大小: {dest_file.stat().st_size}")
print(f"内容相同: {source_file.read_text() == dest_file.read_text()}")
# 移动文件
moved_file = base_dir / 'moved.txt'
shutil.move(dest_file, moved_file)
print(f"移动文件: {dest_file.name} -> {moved_file.name}")
print(f"原文件存在: {dest_file.exists()}")
print(f"新文件存在: {moved_file.exists()}")
# 2. 目录复制
print("\n--- 目录复制 ---")
# 创建源目录结构
source_dir = base_dir / 'source_dir'
source_dir.mkdir()
(source_dir / 'file1.txt').write_text('文件1内容')
(source_dir / 'file2.txt').write_text('文件2内容')
subdir = source_dir / 'subdir'
subdir.mkdir()
(subdir / 'file3.txt').write_text('文件3内容')
# 复制整个目录
dest_dir = base_dir / 'dest_dir'
shutil.copytree(source_dir, dest_dir)
print(f"复制目录: {source_dir.name} -> {dest_dir.name}")
# 验证目录复制
def count_files(directory):
return sum(1 for f in directory.rglob('*') if f.is_file())
source_files = count_files(source_dir)
dest_files = count_files(dest_dir)
print(f"源目录文件数: {source_files}")
print(f"目标目录文件数: {dest_files}")
# 3. 文件权限和属性
print("\n--- 文件权限和属性 ---")
test_file = base_dir / 'test_permissions.txt'
test_file.write_text('测试权限的文件')
# 获取文件状态
stat = test_file.stat()
print(f"文件: {test_file.name}")
print(f"大小: {stat.st_size} 字节")
print(f"权限: {oct(stat.st_mode)}")
print(f"创建时间: {stat.st_ctime}")
print(f"修改时间: {stat.st_mtime}")
print(f"访问时间: {stat.st_atime}")
# 修改文件权限(Unix/Linux系统)
try:
import stat as stat_module
# 设置为只读
test_file.chmod(stat_module.S_IREAD)
print(f"设置为只读: {oct(test_file.stat().st_mode)}")
# 恢复读写权限
test_file.chmod(stat_module.S_IREAD | stat_module.S_IWRITE)
print(f"恢复读写: {oct(test_file.stat().st_mode)}")
except Exception as e:
print(f"权限修改失败(可能是Windows系统): {e}")
# 4. 文件比较
print("\n--- 文件比较 ---")
import filecmp
# 创建比较文件
file1 = base_dir / 'compare1.txt'
file2 = base_dir / 'compare2.txt'
file3 = base_dir / 'compare3.txt'
content = '相同的内容\n第二行\n'
file1.write_text(content)
file2.write_text(content)
file3.write_text('不同的内容\n')
# 比较文件
print(f"file1 == file2: {filecmp.cmp(file1, file2)}")
print(f"file1 == file3: {filecmp.cmp(file1, file3)}")
# 比较目录
comparison = filecmp.dircmp(source_dir, dest_dir)
print(f"\n目录比较:")
print(f"相同文件: {comparison.same_files}")
print(f"不同文件: {comparison.diff_files}")
print(f"仅在左侧: {comparison.left_only}")
print(f"仅在右侧: {comparison.right_only}")
# 5. 临时文件和目录
print("\n--- 临时文件和目录 ---")
# 临时文件
with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt') as temp_file:
temp_file.write('临时文件内容')
temp_path = temp_file.name
print(f"临时文件: {temp_path}")
# 读取临时文件
with open(temp_path, 'r') as f:
content = f.read()
print(f"临时文件内容: {content}")
# 删除临时文件
os.unlink(temp_path)
print(f"临时文件已删除")
# 临时目录
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
print(f"临时目录: {temp_path}")
# 在临时目录中创建文件
(temp_path / 'temp_file.txt').write_text('临时目录中的文件')
print(f"临时目录文件数: {len(list(temp_path.iterdir()))}")
print("临时目录已自动清理")
# 清理测试环境
shutil.rmtree(base_dir)
print(f"\n测试环境已清理")
file_directory_management()
运行文件基本操作示例:
python file_basics.py
9.2 结构化数据文件处理
CSV文件处理
import csv
import io
from pathlib import Path
import tempfile
import os
def csv_operations():
"""CSV文件操作示例"""
print("=== CSV文件操作 ===")
# 创建测试目录
test_dir = Path(tempfile.mkdtemp())
print(f"测试目录: {test_dir}")
# 1. 基本CSV写入
print("\n--- CSV写入 ---")
csv_file = test_dir / 'employees.csv'
# 准备数据
employees = [
['姓名', '年龄', '部门', '薪资'],
['张三', 28, '技术部', 8000],
['李四', 32, '销售部', 6500],
['王五', 25, '技术部', 7200],
['赵六', 35, '人事部', 7800],
['钱七', 29, '技术部', 8500]
]
# 写入CSV文件
with open(csv_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerows(employees)
print(f"CSV文件已创建: {csv_file.name}")
print(f"文件大小: {csv_file.stat().st_size} 字节")
# 2. 基本CSV读取
print("\n--- CSV读取 ---")
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
print("逐行读取:")
for row_num, row in enumerate(reader, 1):
print(f" 第{row_num}行: {row}")
# 3. 字典方式处理CSV
print("\n--- 字典方式处理 ---")
# 使用DictWriter写入
dict_csv_file = test_dir / 'employees_dict.csv'
employees_dict = [
{'姓名': '张三', '年龄': 28, '部门': '技术部', '薪资': 8000},
{'姓名': '李四', '年龄': 32, '部门': '销售部', '薪资': 6500},
{'姓名': '王五', '年龄': 25, '部门': '技术部', '薪资': 7200},
{'姓名': '赵六', '年龄': 35, '部门': '人事部', '薪资': 7800},
{'姓名': '钱七', '年龄': 29, '部门': '技术部', '薪资': 8500}
]
fieldnames = ['姓名', '年龄', '部门', '薪资']
with open(dict_csv_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader() # 写入表头
writer.writerows(employees_dict)
print(f"字典CSV文件已创建: {dict_csv_file.name}")
# 使用DictReader读取
with open(dict_csv_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
print(f"\n字段名: {reader.fieldnames}")
print("字典方式读取:")
for row_num, row in enumerate(reader, 1):
print(f" 员工{row_num}: {dict(row)}")
# 4. CSV数据分析
print("\n--- CSV数据分析 ---")
def analyze_csv(filename):
"""分析CSV文件"""
with open(filename, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
# 收集数据
data = list(reader)
print(f"数据分析 - {filename.name}:")
print(f" 总记录数: {len(data)}")
print(f" 字段数: {len(reader.fieldnames)}")
print(f" 字段名: {reader.fieldnames}")
# 部门统计
departments = {}
total_salary = 0
ages = []
for row in data:
dept = row['部门']
departments[dept] = departments.get(dept, 0) + 1
total_salary += int(row['薪资'])
ages.append(int(row['年龄']))
print(f" 部门分布: {departments}")
print(f" 平均薪资: {total_salary / len(data):.0f}")
print(f" 平均年龄: {sum(ages) / len(ages):.1f}")
print(f" 年龄范围: {min(ages)} - {max(ages)}")
return data
data = analyze_csv(dict_csv_file)
# 5. CSV数据过滤和处理
print("\n--- CSV数据过滤 ---")
# 过滤技术部员工
tech_employees = [emp for emp in data if emp['部门'] == '技术部']
tech_csv_file = test_dir / 'tech_employees.csv'
with open(tech_csv_file, 'w', newline='', encoding='utf-8') as f:
if tech_employees:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(tech_employees)
print(f"技术部员工文件: {tech_csv_file.name}")
print(f"技术部员工数: {len(tech_employees)}")
# 薪资排序
sorted_employees = sorted(data, key=lambda x: int(x['薪资']), reverse=True)
sorted_csv_file = test_dir / 'employees_sorted.csv'
with open(sorted_csv_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(sorted_employees)
print(f"薪资排序文件: {sorted_csv_file.name}")
print("薪资排序(前3名):")
for i, emp in enumerate(sorted_employees[:3], 1):
print(f" {i}. {emp['姓名']}: {emp['薪资']}")
# 6. 处理特殊CSV格式
print("\n--- 特殊CSV格式 ---")
# 自定义分隔符和引用符
special_csv = test_dir / 'special.csv'
special_data = [
['产品名称', '价格', '描述'],
['iPhone 13', '6999', '苹果手机,包含"高级"功能'],
['MacBook Pro', '12999', '专业笔记本;适合开发'],
['iPad Air', '4399', '平板电脑,轻薄便携']
]
# 使用分号作为分隔符
with open(special_csv, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerows(special_data)
print(f"特殊格式CSV: {special_csv.name}")
# 读取特殊格式
with open(special_csv, 'r', encoding='utf-8') as f:
reader = csv.reader(f, delimiter=';')
print("特殊格式读取:")
for row in reader:
print(f" {row}")
# 7. CSV错误处理
print("\n--- CSV错误处理 ---")
def safe_csv_read(filename, encoding='utf-8'):
"""安全读取CSV文件"""
try:
with open(filename, 'r', encoding=encoding) as f:
# 检测方言
sample = f.read(1024)
f.seek(0)
sniffer = csv.Sniffer()
dialect = sniffer.sniff(sample)
print(f"检测到的CSV方言:")
print(f" 分隔符: {repr(dialect.delimiter)}")
print(f" 引用符: {repr(dialect.quotechar)}")
print(f" 行终止符: {repr(dialect.lineterminator)}")
reader = csv.reader(f, dialect)
data = list(reader)
return data
except UnicodeDecodeError:
print(f"编码错误,尝试其他编码")
for encoding in ['gbk', 'latin1']:
try:
return safe_csv_read(filename, encoding)
except:
continue
raise
except csv.Error as e:
print(f"CSV格式错误: {e}")
return None
except Exception as e:
print(f"读取错误: {e}")
return None
# 测试错误处理
data = safe_csv_read(special_csv)
if data:
print(f"成功读取 {len(data)} 行数据")
# 8. 内存中的CSV操作
print("\n--- 内存CSV操作 ---")
# 使用StringIO在内存中处理CSV
csv_string = """姓名,年龄,城市
张三,25,北京
李四,30,上海
王五,28,广州"""
# 从字符串读取CSV
csv_io = io.StringIO(csv_string)
reader = csv.DictReader(csv_io)
print("从字符串读取CSV:")
for row in reader:
print(f" {dict(row)}")
# 写入到字符串
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=['姓名', '年龄', '城市'])
writer.writeheader()
writer.writerow({'姓名': '赵六', '年龄': 35, '城市': '深圳'})
csv_output = output.getvalue()
print(f"\n写入到字符串:")
print(csv_output)
# 清理测试文件
import shutil
shutil.rmtree(test_dir)
print(f"\n测试目录已清理")
csv_operations()
JSON文件处理
import json
import tempfile
from pathlib import Path
from datetime import datetime, date
from decimal import Decimal
def json_operations():
"""JSON文件操作示例"""
print("=== JSON文件操作 ===")
# 创建测试目录
test_dir = Path(tempfile.mkdtemp())
print(f"测试目录: {test_dir}")
# 1. 基本JSON操作
print("\n--- 基本JSON操作 ---")
# 准备数据
data = {
'company': '科技有限公司',
'employees': [
{
'id': 1,
'name': '张三',
'age': 28,
'department': '技术部',
'salary': 8000,
'skills': ['Python', 'JavaScript', 'SQL'],
'is_active': True,
'hire_date': '2020-01-15'
},
{
'id': 2,
'name': '李四',
'age': 32,
'department': '销售部',
'salary': 6500,
'skills': ['销售', '客户管理'],
'is_active': True,
'hire_date': '2019-03-20'
},
{
'id': 3,
'name': '王五',
'age': 25,
'department': '技术部',
'salary': 7200,
'skills': ['Java', 'Spring', 'MySQL'],
'is_active': False,
'hire_date': '2021-06-10'
}
],
'departments': {
'技术部': {'budget': 500000, 'head': '张三'},
'销售部': {'budget': 300000, 'head': '李四'},
'人事部': {'budget': 200000, 'head': '赵六'}
},
'metadata': {
'created_at': '2023-01-01T00:00:00',
'version': '1.0',
'total_employees': 3
}
}
# 写入JSON文件
json_file = test_dir / 'company.json'
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"JSON文件已创建: {json_file.name}")
print(f"文件大小: {json_file.stat().st_size} 字节")
# 读取JSON文件
with open(json_file, 'r', encoding='utf-8') as f:
loaded_data = json.load(f)
print(f"\n读取的数据类型: {type(loaded_data)}")
print(f"公司名称: {loaded_data['company']}")
print(f"员工数量: {len(loaded_data['employees'])}")
# 2. JSON字符串操作
print("\n--- JSON字符串操作 ---")
# 对象转JSON字符串
employee = loaded_data['employees'][0]
json_string = json.dumps(employee, ensure_ascii=False, indent=2)
print(f"员工JSON字符串:")
print(json_string)
# JSON字符串转对象
parsed_employee = json.loads(json_string)
print(f"\n解析后的员工: {parsed_employee['name']}")
# 3. JSON数据查询和操作
print("\n--- JSON数据查询 ---")
def query_employees(data, **filters):
"""查询员工数据"""
employees = data['employees']
results = []
for emp in employees:
match = True
for key, value in filters.items():
if key not in emp or emp[key] != value:
match = False
break
if match:
results.append(emp)
return results
# 查询技术部员工
tech_employees = query_employees(loaded_data, department='技术部')
print(f"技术部员工数: {len(tech_employees)}")
for emp in tech_employees:
print(f" {emp['name']}: {emp['skills']}")
# 查询在职员工
active_employees = query_employees(loaded_data, is_active=True)
print(f"\n在职员工数: {len(active_employees)}")
# 薪资统计
salaries = [emp['salary'] for emp in loaded_data['employees']]
print(f"\n薪资统计:")
print(f" 平均薪资: {sum(salaries) / len(salaries):.0f}")
print(f" 最高薪资: {max(salaries)}")
print(f" 最低薪资: {min(salaries)}")
# 4. JSON数据修改
print("\n--- JSON数据修改 ---")
# 添加新员工
new_employee = {
'id': 4,
'name': '赵六',
'age': 35,
'department': '人事部',
'salary': 7800,
'skills': ['招聘', '培训', '绩效管理'],
'is_active': True,
'hire_date': '2018-09-01'
}
loaded_data['employees'].append(new_employee)
loaded_data['metadata']['total_employees'] = len(loaded_data['employees'])
print(f"添加新员工: {new_employee['name']}")
print(f"总员工数: {loaded_data['metadata']['total_employees']}")
# 更新员工信息
for emp in loaded_data['employees']:
if emp['name'] == '张三':
emp['salary'] = 8500 # 加薪
emp['skills'].append('Docker') # 新技能
print(f"更新员工 {emp['name']}: 薪资 {emp['salary']}, 技能 {emp['skills']}")
break
# 删除离职员工
loaded_data['employees'] = [emp for emp in loaded_data['employees'] if emp['is_active']]
loaded_data['metadata']['total_employees'] = len(loaded_data['employees'])
print(f"删除离职员工后,总员工数: {loaded_data['metadata']['total_employees']}")
# 保存修改后的数据
updated_json_file = test_dir / 'company_updated.json'
with open(updated_json_file, 'w', encoding='utf-8') as f:
json.dump(loaded_data, f, ensure_ascii=False, indent=2)
print(f"更新后的文件: {updated_json_file.name}")
# 5. 自定义JSON编码器
print("\n--- 自定义JSON编码器 ---")
class CustomJSONEncoder(json.JSONEncoder):
"""自定义JSON编码器"""
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, date):
return obj.isoformat()
elif isinstance(obj, Decimal):
return float(obj)
elif hasattr(obj, '__dict__'):
return obj.__dict__
return super().default(obj)
# 测试自定义编码器
class Employee:
def __init__(self, name, hire_date, salary):
self.name = name
self.hire_date = hire_date
self.salary = salary
emp_obj = Employee('测试员工', datetime.now(), Decimal('8888.88'))
# 使用自定义编码器
custom_json = json.dumps(emp_obj, cls=CustomJSONEncoder, ensure_ascii=False, indent=2)
print(f"自定义编码器结果:")
print(custom_json)
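# 补充示例(示意代码):与自定义编码器对应,反序列化时可以用 object_hook
# 把特定结构的字典还原成 Python 对象。下面假设带有 name、hire_date、salary 字段的字典应还原为 Employee。
def employee_hook(d):
    if 'hire_date' in d and 'name' in d and 'salary' in d:
        return Employee(d['name'], d['hire_date'], d['salary'])
    return d
decoded_obj = json.loads(custom_json, object_hook=employee_hook)
print(f"object_hook还原结果: {decoded_obj.name}, {decoded_obj.salary}")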
# 6. JSON Schema验证(概念演示)
print("\n--- JSON数据验证 ---")
def validate_employee(employee_data):
"""验证员工数据格式"""
required_fields = ['id', 'name', 'age', 'department', 'salary']
errors = []
# 检查必需字段
for field in required_fields:
if field not in employee_data:
errors.append(f"缺少必需字段: {field}")
# 类型检查
if 'id' in employee_data and not isinstance(employee_data['id'], int):
errors.append("id必须是整数")
if 'name' in employee_data and not isinstance(employee_data['name'], str):
errors.append("name必须是字符串")
if 'age' in employee_data:
age = employee_data['age']
if not isinstance(age, int) or age < 18 or age > 65:
errors.append("age必须是18-65之间的整数")
if 'salary' in employee_data:
salary = employee_data['salary']
if not isinstance(salary, (int, float)) or salary < 0:
errors.append("salary必须是非负数")
return errors
# 测试验证
test_employees = [
{'id': 1, 'name': '正常员工', 'age': 30, 'department': '技术部', 'salary': 8000},
{'id': '2', 'name': '错误ID', 'age': 25, 'department': '销售部', 'salary': 6000},
{'name': '缺少ID', 'age': 28, 'department': '人事部', 'salary': 7000},
{'id': 3, 'name': '年龄错误', 'age': 16, 'department': '技术部', 'salary': 5000}
]
for i, emp in enumerate(test_employees, 1):
errors = validate_employee(emp)
if errors:
print(f"员工 {i} 验证失败:")
for error in errors:
print(f" - {error}")
else:
print(f"员工 {i} 验证通过")
# 7. JSON文件合并
print("\n--- JSON文件合并 ---")
# 创建多个JSON文件
dept_tech = {
'department': '技术部',
'employees': [
{'name': '程序员A', 'level': 'senior'},
{'name': '程序员B', 'level': 'junior'}
]
}
dept_sales = {
'department': '销售部',
'employees': [
{'name': '销售员A', 'level': 'manager'},
{'name': '销售员B', 'level': 'staff'}
]
}
# 保存部门文件
tech_file = test_dir / 'tech_dept.json'
sales_file = test_dir / 'sales_dept.json'
with open(tech_file, 'w', encoding='utf-8') as f:
json.dump(dept_tech, f, ensure_ascii=False, indent=2)
with open(sales_file, 'w', encoding='utf-8') as f:
json.dump(dept_sales, f, ensure_ascii=False, indent=2)
# 合并JSON文件
def merge_json_files(*filenames):
"""合并多个JSON文件"""
merged_data = {'departments': []}
for filename in filenames:
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
merged_data['departments'].append(data)
return merged_data
merged_data = merge_json_files(tech_file, sales_file)
merged_file = test_dir / 'merged_departments.json'
with open(merged_file, 'w', encoding='utf-8') as f:
json.dump(merged_data, f, ensure_ascii=False, indent=2)
print(f"合并文件: {merged_file.name}")
print(f"合并后部门数: {len(merged_data['departments'])}")
# 8. JSON性能优化
print("\n--- JSON性能测试 ---")
import time
# 创建大量数据
large_data = {
'users': [
{
'id': i,
'name': f'用户{i}',
'email': f'user{i}@example.com',
'data': list(range(100)) # 每个用户100个数据点
}
for i in range(1000) # 1000个用户
]
}
large_file = test_dir / 'large_data.json'
# 测试写入性能
start_time = time.time()
with open(large_file, 'w', encoding='utf-8') as f:
json.dump(large_data, f)
write_time = time.time() - start_time
file_size = large_file.stat().st_size
print(f"大文件写入:")
print(f" 文件大小: {file_size:,} 字节 ({file_size/1024/1024:.1f} MB)")
print(f" 写入时间: {write_time:.3f} 秒")
# 测试读取性能
start_time = time.time()
with open(large_file, 'r', encoding='utf-8') as f:
loaded_large_data = json.load(f)
read_time = time.time() - start_time
print(f" 读取时间: {read_time:.3f} 秒")
print(f" 用户数量: {len(loaded_large_data['users'])}")
# 清理测试文件
import shutil
shutil.rmtree(test_dir)
print(f"\n测试目录已清理")
json_operations()
运行结构化数据文件处理示例:
python structured_data.py
9.3 二进制文件和高级I/O
二进制文件操作
import struct
import pickle
import tempfile
from pathlib import Path
import os
import hashlib
def binary_file_operations():
"""二进制文件操作示例"""
print("=== 二进制文件操作 ===")
# 创建测试目录
test_dir = Path(tempfile.mkdtemp())
print(f"测试目录: