3.1 文件格式概览
pandas支持多种数据格式的读取和写入,这是数据分析工作流程中的重要环节。
graph TD
A[数据源] --> B[pandas DataFrame]
B --> C[数据分析]
C --> D[结果输出]
A --> A1[CSV文件]
A --> A2[Excel文件]
A --> A3[JSON文件]
A --> A4[数据库]
A --> A5[网络API]
A --> A6[其他格式]
D --> D1[CSV文件]
D --> D2[Excel文件]
D --> D3[JSON文件]
D --> D4[数据库]
D --> D5[图表]
D --> D6[报告]
3.1.1 支持的文件格式
import pandas as pd
import numpy as np
import json
from io import StringIO
from datetime import datetime, timedelta
def supported_formats_overview():
"""pandas支持的文件格式概览"""
formats = {
'格式': ['CSV', 'Excel', 'JSON', 'Parquet', 'HDF5', 'SQL', 'HTML', 'XML', 'Pickle'],
'读取函数': ['read_csv()', 'read_excel()', 'read_json()', 'read_parquet()',
'read_hdf()', 'read_sql()', 'read_html()', 'read_xml()', 'read_pickle()'],
'写入函数': ['to_csv()', 'to_excel()', 'to_json()', 'to_parquet()',
'to_hdf()', 'to_sql()', 'to_html()', 'to_xml()', 'to_pickle()'],
'特点': ['通用文本格式', 'Office格式', 'Web数据格式', '列式存储',
'层次化数据', '关系数据库', '网页格式', '标记语言', 'Python对象']
}
df = pd.DataFrame(formats)
print("pandas支持的主要文件格式:")
print(df.to_string(index=False))
return df
# 运行示例
formats_df = supported_formats_overview()
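上表中的 read_pickle()/to_pickle() 要到 3.7 节的性能对比才会再次用到,这里先给出一个简要的往返示例草图(demo.pkl 为演示用的假设文件名)。Pickle 能完整保留 dtype 信息,但只应加载可信来源的文件:
import pandas as pd

# Pickle 往返示例:写入再读回,dtype 信息完整保留
df_demo = pd.DataFrame({
    'a': [1, 2, 3],
    'b': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03'])
})
df_demo.to_pickle('demo.pkl')
restored = pd.read_pickle('demo.pkl')
print(restored.dtypes)  # 与原 DataFrame 完全一致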
3.2 CSV文件操作
3.2.1 CSV文件读取
CSV(Comma-Separated Values)是最常用的数据交换格式。
def csv_reading_examples():
"""CSV文件读取示例"""
# 首先创建示例CSV数据
sample_data = """name,age,city,salary,department
Alice,25,New York,50000,IT
Bob,30,Los Angeles,60000,HR
Charlie,35,San Francisco,70000,IT
Diana,28,Chicago,55000,Marketing
Eve,32,Boston,65000,IT
Frank,29,Seattle,58000,Finance"""
# 保存为CSV文件
with open('sample_data.csv', 'w') as f:
f.write(sample_data)
print("1. 基础读取:")
df1 = pd.read_csv('sample_data.csv')
print(df1)
# 2. 指定索引列
print(f"\n2. 指定name为索引:")
df2 = pd.read_csv('sample_data.csv', index_col='name')
print(df2)
# 3. 选择特定列
print(f"\n3. 只读取特定列:")
df3 = pd.read_csv('sample_data.csv', usecols=['name', 'age', 'salary'])
print(df3)
# 4. 指定数据类型
print(f"\n4. 指定数据类型:")
dtypes = {'age': 'int8', 'salary': 'int32', 'department': 'category'}
df4 = pd.read_csv('sample_data.csv', dtype=dtypes)
print(df4.dtypes)
# 5. 处理缺失值
sample_data_with_na = """name,age,city,salary,department
Alice,25,New York,50000,IT
Bob,,Los Angeles,60000,HR
Charlie,35,,70000,IT
Diana,28,Chicago,,Marketing
Eve,32,Boston,65000,"""
with open('sample_data_na.csv', 'w') as f:
f.write(sample_data_with_na)
print(f"\n5. 处理缺失值:")
df5 = pd.read_csv('sample_data_na.csv', na_values=['', 'NULL', 'N/A'])
print(df5)
print(f"缺失值统计:\n{df5.isnull().sum()}")
return df1, df2, df3, df4, df5
# 运行示例
csv_dataframes = csv_reading_examples()
3.2.2 高级CSV读取选项
def advanced_csv_reading():
"""高级CSV读取选项"""
# 创建复杂的CSV数据
complex_csv = """# 这是注释行
# 员工数据 - 2024年
"姓名","年龄","城市","薪资","部门","入职日期"
"张三",25,"北京",50000,"IT","2023-01-15"
"李四",30,"上海",60000,"HR","2022-03-20"
"王五",35,"广州",70000,"IT","2021-06-10"
"赵六",28,"深圳",55000,"市场","2023-08-05"
"钱七",32,"杭州",65000,"IT","2022-11-30"
"""
with open('complex_data.csv', 'w', encoding='utf-8') as f:
f.write(complex_csv)
# 1. 跳过注释行
print("1. 跳过注释行:")
df1 = pd.read_csv('complex_data.csv', comment='#', encoding='utf-8')
print(df1)
# 2. 自定义分隔符
pipe_data = "name|age|city|salary\nAlice|25|NY|50000\nBob|30|LA|60000"
with open('pipe_data.txt', 'w') as f:
f.write(pipe_data)
print(f"\n2. 自定义分隔符:")
df2 = pd.read_csv('pipe_data.txt', sep='|')
print(df2)
# 3. 处理日期列
print(f"\n3. 解析日期列:")
df3 = pd.read_csv('complex_data.csv', comment='#', encoding='utf-8',
parse_dates=['入职日期'])
print(df3.dtypes)
print(df3)
# 4. 自定义列名
print(f"\n4. 自定义列名:")
custom_names = ['employee_name', 'employee_age', 'location', 'income', 'dept', 'hire_date']
# header=0 表示文件中原有的表头行被 names 覆盖,数据从下一行开始
df4 = pd.read_csv('complex_data.csv', comment='#', encoding='utf-8',
names=custom_names, header=0)
print(df4)
# 5. 分块读取大文件
print(f"\n5. 分块读取:")
chunk_size = 2
chunks = []
for chunk in pd.read_csv('complex_data.csv', comment='#', encoding='utf-8', chunksize=chunk_size):
print(f"处理块,形状: {chunk.shape}")
chunks.append(chunk)
# 合并所有块
df5 = pd.concat(chunks, ignore_index=True)
print(f"合并后的数据:\n{df5}")
return df1, df2, df3, df4, df5
# 运行示例
advanced_csv_dfs = advanced_csv_reading()
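实际数据中常会混入字段数不一致的"脏行"。下面是一个简要示例(基于 pandas 1.3+ 的 on_bad_lines 参数,dirty.csv 为演示用的假设文件),展示如何跳过无法解析的行:
import pandas as pd

# 构造一个包含脏行的CSV:第3行字段过少,第4行字段过多
with open('dirty.csv', 'w') as f:
    f.write("a,b,c\n1,2,3\n4,5\n6,7,8,9\n10,11,12")

# 字段过多的行被视为坏行并跳过;字段过少的行用NaN补齐
df_dirty = pd.read_csv('dirty.csv', on_bad_lines='skip')
print(df_dirty)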
3.2.3 CSV文件写入
def csv_writing_examples():
"""CSV文件写入示例"""
# 创建示例DataFrame
df = pd.DataFrame({
'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
'age': [25, 30, 35, 28, 32],
'city': ['New York', 'Los Angeles', 'San Francisco', 'Chicago', 'Boston'],
'salary': [50000, 60000, 70000, 55000, 65000],
'department': ['IT', 'HR', 'IT', 'Marketing', 'IT']
})
print("原始DataFrame:")
print(df)
# 1. 基础写入
print(f"\n1. 基础写入到CSV:")
df.to_csv('output_basic.csv')
print("文件已保存为 output_basic.csv")
# 2. 不包含索引
print(f"\n2. 不包含索引:")
df.to_csv('output_no_index.csv', index=False)
print("文件已保存为 output_no_index.csv")
# 3. 选择特定列
print(f"\n3. 只保存特定列:")
df[['name', 'age', 'salary']].to_csv('output_selected.csv', index=False)
print("文件已保存为 output_selected.csv")
# 4. 自定义分隔符
print(f"\n4. 使用自定义分隔符:")
df.to_csv('output_pipe.txt', sep='|', index=False)
print("文件已保存为 output_pipe.txt")
# 5. 处理中文编码
df_chinese = pd.DataFrame({
'姓名': ['张三', '李四', '王五'],
'年龄': [25, 30, 35],
'城市': ['北京', '上海', '广州']
})
print(f"\n5. 中文数据写入:")
df_chinese.to_csv('output_chinese.csv', index=False, encoding='utf-8-sig')
print("中文文件已保存为 output_chinese.csv")
# 6. 追加模式
print(f"\n6. 追加模式:")
new_data = pd.DataFrame({
'name': ['Frank', 'Grace'],
'age': [29, 27],
'city': ['Seattle', 'Portland'],
'salary': [58000, 52000],
'department': ['Finance', 'HR']
})
new_data.to_csv('output_basic.csv', mode='a', header=False, index=False)
print("新数据已追加到 output_basic.csv")
# 验证追加结果
appended_df = pd.read_csv('output_basic.csv')
print(f"追加后的数据:\n{appended_df}")
return df, df_chinese
# 运行示例
csv_write_dfs = csv_writing_examples()
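除上述选项外,to_csv 还支持直接压缩输出;读取时 pandas 会按文件扩展名自动推断压缩方式。下面是一个简要示例(文件名为演示用的假设):
import pandas as pd

df = pd.DataFrame({'name': ['Alice', 'Bob'], 'salary': [50000, 60000]})
# compression 可取 'gzip'、'bz2'、'zip'、'xz' 等;默认 'infer' 按扩展名推断
df.to_csv('output_compressed.csv.gz', index=False, compression='gzip')

# 读取时无需额外参数,扩展名 .gz 会触发自动解压
df_back = pd.read_csv('output_compressed.csv.gz')
print(df_back)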
3.3 Excel文件操作
3.3.1 Excel文件读取
def excel_reading_examples():
"""Excel文件读取示例"""
# 首先创建示例Excel文件
df1 = pd.DataFrame({
'Product': ['A', 'B', 'C', 'D', 'E'],
'Q1_Sales': [100, 150, 200, 120, 180],
'Q2_Sales': [110, 160, 190, 130, 170],
'Q3_Sales': [120, 140, 210, 125, 185],
'Q4_Sales': [130, 155, 205, 135, 175]
})
df2 = pd.DataFrame({
'Employee': ['Alice', 'Bob', 'Charlie', 'Diana'],
'Department': ['IT', 'HR', 'IT', 'Marketing'],
'Salary': [50000, 60000, 70000, 55000],
'Bonus': [5000, 6000, 7000, 5500]
})
# 创建Excel文件
with pd.ExcelWriter('sample_data.xlsx', engine='openpyxl') as writer:
df1.to_excel(writer, sheet_name='Sales', index=False)
df2.to_excel(writer, sheet_name='Employees', index=False)
print("Excel文件已创建")
# 1. 读取默认工作表
print(f"\n1. 读取默认工作表:")
df_default = pd.read_excel('sample_data.xlsx')
print(df_default)
# 2. 读取指定工作表
print(f"\n2. 读取指定工作表:")
df_employees = pd.read_excel('sample_data.xlsx', sheet_name='Employees')
print(df_employees)
# 3. 读取多个工作表
print(f"\n3. 读取多个工作表:")
all_sheets = pd.read_excel('sample_data.xlsx', sheet_name=None)
for sheet_name, sheet_df in all_sheets.items():
print(f"工作表: {sheet_name}")
print(sheet_df.head())
print()
# 4. 指定读取范围
print(f"\n4. 指定读取范围:")
df_range = pd.read_excel('sample_data.xlsx', sheet_name='Sales',
usecols='A:C', nrows=3)
print(df_range)
# 5. 跳过行
print(f"\n5. 跳过前2行:")
df_skip = pd.read_excel('sample_data.xlsx', sheet_name='Sales',
skiprows=2)
print(df_skip)
return df1, df2, all_sheets
# 运行示例
excel_dfs = excel_reading_examples()
3.3.2 Excel文件写入
def excel_writing_examples():
"""Excel文件写入示例"""
# 创建示例数据
sales_data = pd.DataFrame({
'Month': ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun'],
'Product_A': [100, 120, 110, 130, 125, 140],
'Product_B': [80, 90, 85, 95, 88, 100],
'Product_C': [60, 70, 65, 75, 72, 80]
})
employee_data = pd.DataFrame({
'Name': ['Alice', 'Bob', 'Charlie', 'Diana'],
'Department': ['IT', 'HR', 'Finance', 'Marketing'],
'Salary': [50000, 55000, 60000, 52000],
'Performance': [4.5, 4.2, 4.8, 4.3]
})
summary_data = pd.DataFrame({
'Metric': ['Total Sales', 'Average Salary', 'Employee Count'],
'Value': [sales_data.iloc[:, 1:].sum().sum(),
employee_data['Salary'].mean(),
len(employee_data)]
})
print("准备写入的数据:")
print("销售数据:")
print(sales_data)
print("\n员工数据:")
print(employee_data)
print("\n汇总数据:")
print(summary_data)
# 1. 基础写入
print(f"\n1. 基础写入:")
sales_data.to_excel('output_basic.xlsx', index=False)
print("文件已保存为 output_basic.xlsx")
# 2. 多工作表写入
print(f"\n2. 多工作表写入:")
with pd.ExcelWriter('output_multiple_sheets.xlsx', engine='openpyxl') as writer:
sales_data.to_excel(writer, sheet_name='Sales', index=False)
employee_data.to_excel(writer, sheet_name='Employees', index=False)
summary_data.to_excel(writer, sheet_name='Summary', index=False)
print("多工作表文件已保存")
# 3. 格式化写入
print(f"\n3. 格式化写入:")
with pd.ExcelWriter('output_formatted.xlsx', engine='openpyxl') as writer:
# 写入数据
sales_data.to_excel(writer, sheet_name='Sales', index=False, startrow=1)
employee_data.to_excel(writer, sheet_name='Employees', index=False, startrow=1)
# 获取工作簿和工作表
workbook = writer.book
sales_sheet = writer.sheets['Sales']
employee_sheet = writer.sheets['Employees']
# 添加标题
sales_sheet['A1'] = '销售数据报告'
employee_sheet['A1'] = '员工信息表'
# 设置列宽
for sheet in [sales_sheet, employee_sheet]:
for column in sheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except Exception:
pass
adjusted_width = min(max_length + 2, 50)
sheet.column_dimensions[column_letter].width = adjusted_width
print("格式化文件已保存")
# 4. 追加到现有文件
print(f"\n4. 追加到现有文件:")
new_sales = pd.DataFrame({
'Month': ['Jul', 'Aug'],
'Product_A': [145, 150],
'Product_B': [105, 110],
'Product_C': [85, 90]
})
# 读取现有数据
existing_data = pd.read_excel('output_basic.xlsx')
# 合并数据
combined_data = pd.concat([existing_data, new_sales], ignore_index=True)
# 写回文件
combined_data.to_excel('output_basic.xlsx', index=False)
print("数据已追加到现有文件")
return sales_data, employee_data, summary_data
# 运行示例
excel_write_data = excel_writing_examples()
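上面的追加方式需要整表读出再写回。作为补充,pandas 1.3+ 的 ExcelWriter 支持以 mode='a' 直接在现有工作簿中新增或替换工作表。下面是一个简要示例(假设 output_multiple_sheets.xlsx 已由上面的代码生成):
import pandas as pd

notes = pd.DataFrame({'Note': ['Q3数据待更新', '请核对员工名单']})
# mode='a' 以追加方式打开现有工作簿;if_sheet_exists 控制同名工作表的处理方式
with pd.ExcelWriter('output_multiple_sheets.xlsx', engine='openpyxl',
                    mode='a', if_sheet_exists='replace') as writer:
    notes.to_excel(writer, sheet_name='Notes', index=False)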
3.4 JSON文件操作
3.4.1 JSON文件读取
def json_reading_examples():
"""JSON文件读取示例"""
# 创建示例JSON数据
json_data = {
"employees": [
{"name": "Alice", "age": 25, "department": "IT", "salary": 50000},
{"name": "Bob", "age": 30, "department": "HR", "salary": 60000},
{"name": "Charlie", "age": 35, "department": "IT", "salary": 70000}
],
"company": "TechCorp",
"location": "San Francisco"
}
# 保存为JSON文件
with open('sample_data.json', 'w') as f:
json.dump(json_data, f, indent=2)
# 1. 基础JSON读取
print("1. 基础JSON读取:")
df1 = pd.read_json('sample_data.json')
print(df1)
# 2. 指定orient参数
# 创建不同格式的JSON
records_json = [
{"name": "Alice", "age": 25, "city": "NY"},
{"name": "Bob", "age": 30, "city": "LA"},
{"name": "Charlie", "age": 35, "city": "SF"}
]
with open('records.json', 'w') as f:
json.dump(records_json, f, indent=2)
print(f"\n2. records格式JSON:")
df2 = pd.read_json('records.json', orient='records')
print(df2)
# 3. 嵌套JSON处理
nested_json = {
"data": {
"users": [
{"id": 1, "profile": {"name": "Alice", "age": 25}},
{"id": 2, "profile": {"name": "Bob", "age": 30}}
]
}
}
with open('nested.json', 'w') as f:
json.dump(nested_json, f, indent=2)
print(f"\n3. 处理嵌套JSON:")
# 使用json_normalize处理嵌套结构
with open('nested.json', 'r') as f:
data = json.load(f)
df3 = pd.json_normalize(data['data']['users'])
print(df3)
# 4. 从字符串读取JSON
print(f"\n4. 从字符串读取JSON:")
json_string = '''[
{"product": "A", "price": 100, "stock": 50},
{"product": "B", "price": 150, "stock": 30},
{"product": "C", "price": 200, "stock": 20}
]'''
df4 = pd.read_json(StringIO(json_string), orient='records')  # 字面量JSON字符串需包装为StringIO(pandas 2.1+的要求)
print(df4)
# 5. 处理日期时间
datetime_json = [
{"event": "Meeting", "timestamp": "2024-01-15T10:00:00"},
{"event": "Call", "timestamp": "2024-01-15T14:30:00"},
{"event": "Review", "timestamp": "2024-01-15T16:00:00"}
]
with open('datetime.json', 'w') as f:
json.dump(datetime_json, f, indent=2)
print(f"\n5. 处理日期时间:")
df5 = pd.read_json('datetime.json', orient='records')
df5['timestamp'] = pd.to_datetime(df5['timestamp'])
print(df5)
print(f"数据类型:\n{df5.dtypes}")
return df1, df2, df3, df4, df5
# 运行示例
json_dfs = json_reading_examples()
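json_normalize 还支持 record_path 和 meta 参数,可以在展开嵌套列表的同时保留外层字段。下面用一组假设的演示数据说明:
import pandas as pd

data = {
    "teams": [
        {"team": "backend", "members": [{"name": "Alice"}, {"name": "Bob"}]},
        {"team": "frontend", "members": [{"name": "Eve"}]}
    ]
}
# record_path指定要展开的嵌套列表,meta把上层的team字段带到每一行
df_members = pd.json_normalize(data['teams'], record_path='members', meta='team')
print(df_members)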
3.4.2 JSON文件写入
def json_writing_examples():
"""JSON文件写入示例"""
# 创建示例DataFrame
df = pd.DataFrame({
'product_id': [1, 2, 3, 4, 5],
'product_name': ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones'],
'price': [999.99, 29.99, 79.99, 299.99, 149.99],
'category': ['Electronics', 'Accessories', 'Accessories', 'Electronics', 'Accessories'],
'in_stock': [True, True, False, True, True],
'last_updated': pd.date_range('2024-01-01', periods=5, freq='D')
})
print("原始DataFrame:")
print(df)
# 1. 基础JSON写入
print(f"\n1. 基础JSON写入 (records格式):")
df.to_json('output_records.json', orient='records', indent=2)
print("文件已保存为 output_records.json")
# 验证写入结果
with open('output_records.json', 'r') as f:
content = f.read()
print("文件内容预览:")
print(content[:200] + "...")
# 2. 不同的orient选项
print(f"\n2. 不同的orient选项:")
# index格式
df.to_json('output_index.json', orient='index', indent=2)
print("index格式已保存")
# values格式
df.to_json('output_values.json', orient='values', indent=2)
print("values格式已保存")
# columns格式
df.to_json('output_columns.json', orient='columns', indent=2)
print("columns格式已保存")
# 3. 处理日期格式
print(f"\n3. 处理日期格式:")
df.to_json('output_date_iso.json', orient='records', date_format='iso', indent=2)
df.to_json('output_date_epoch.json', orient='records', date_format='epoch', indent=2)
print("不同日期格式已保存")
# 4. 自定义JSON结构
print(f"\n4. 创建自定义JSON结构:")
custom_json = {
"metadata": {
"total_products": len(df),
"categories": df['category'].unique().tolist(),
"last_updated": datetime.now().isoformat()
},
"products": df.to_dict('records')
}
with open('output_custom.json', 'w') as f:
json.dump(custom_json, f, indent=2, default=str)
print("自定义结构JSON已保存")
# 5. 压缩JSON
print(f"\n5. 压缩JSON (无缩进):")
df.to_json('output_compressed.json', orient='records')
print("压缩JSON已保存")
# 比较文件大小
import os
size_formatted = os.path.getsize('output_records.json')
size_compressed = os.path.getsize('output_compressed.json')
print(f"格式化文件大小: {size_formatted} bytes")
print(f"压缩文件大小: {size_compressed} bytes")
print(f"压缩比: {size_compressed/size_formatted:.2%}")
return df, custom_json
# 运行示例
json_write_data = json_writing_examples()
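对日志类或流式数据,JSON Lines(每行一个独立的JSON对象)是常见格式,pandas 通过 lines=True 直接支持。下面是一个简要示例(文件名为演示用的假设):
import pandas as pd

events = pd.DataFrame({'event': ['login', 'click'], 'user': ['alice', 'bob']})
# lines=True 要求 orient='records',每行写出一个JSON对象,便于逐行追加
events.to_json('events.jsonl', orient='records', lines=True)

# 读取时同样指定 lines=True;大文件还可配合 chunksize 分块读取
events_back = pd.read_json('events.jsonl', orient='records', lines=True)
print(events_back)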
3.5 数据库操作
3.5.1 SQLite数据库操作
import sqlite3
def sqlite_operations():
"""SQLite数据库操作示例"""
# 创建示例数据
employees_df = pd.DataFrame({
'employee_id': [1, 2, 3, 4, 5],
'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
'department': ['IT', 'HR', 'Finance', 'Marketing', 'IT'],
'salary': [50000, 55000, 60000, 52000, 58000],
'hire_date': pd.date_range('2020-01-01', periods=5, freq='6M')
})
projects_df = pd.DataFrame({
'project_id': [101, 102, 103, 104],
'project_name': ['Website Redesign', 'Mobile App', 'Data Migration', 'Marketing Campaign'],
'manager_id': [1, 3, 1, 4],
'budget': [50000, 80000, 30000, 25000],
'status': ['Active', 'Planning', 'Completed', 'Active']
})
print("员工数据:")
print(employees_df)
print("\n项目数据:")
print(projects_df)
# 1. 创建数据库连接
print(f"\n1. 创建SQLite数据库:")
conn = sqlite3.connect('company.db')
# 2. 写入数据到数据库
print(f"\n2. 写入数据到数据库:")
employees_df.to_sql('employees', conn, if_exists='replace', index=False)
projects_df.to_sql('projects', conn, if_exists='replace', index=False)
print("数据已写入数据库")
# 3. 从数据库读取数据
print(f"\n3. 从数据库读取数据:")
df_employees = pd.read_sql('SELECT * FROM employees', conn)
print("从数据库读取的员工数据:")
print(df_employees)
# 4. 执行SQL查询
print(f"\n4. 执行SQL查询:")
# 查询IT部门员工
it_employees = pd.read_sql("""
SELECT name, salary
FROM employees
WHERE department = 'IT'
ORDER BY salary DESC
""", conn)
print("IT部门员工:")
print(it_employees)
# 连接查询
project_managers = pd.read_sql("""
SELECT p.project_name, e.name as manager_name, p.budget
FROM projects p
JOIN employees e ON p.manager_id = e.employee_id
WHERE p.status = 'Active'
""", conn)
print("\n活跃项目及其经理:")
print(project_managers)
# 聚合查询
dept_stats = pd.read_sql("""
SELECT department,
COUNT(*) as employee_count,
AVG(salary) as avg_salary,
MAX(salary) as max_salary
FROM employees
GROUP BY department
""", conn)
print("\n部门统计:")
print(dept_stats)
# 5. 参数化查询
print(f"\n5. 参数化查询:")
min_salary = 55000
high_earners = pd.read_sql("""
SELECT name, department, salary
FROM employees
WHERE salary >= ?
ORDER BY salary DESC
""", conn, params=[min_salary])
print(f"薪资大于等于{min_salary}的员工:")
print(high_earners)
# 6. 更新数据
print(f"\n6. 更新数据:")
# 给IT部门员工加薪5%
cursor = conn.cursor()
cursor.execute("""
UPDATE employees
SET salary = salary * 1.05
WHERE department = 'IT'
""")
conn.commit()
# 验证更新
updated_employees = pd.read_sql("SELECT * FROM employees WHERE department = 'IT'", conn)
print("IT部门员工加薪后:")
print(updated_employees[['name', 'salary']])
# 关闭连接
conn.close()
return employees_df, projects_df
# 运行示例
db_data = sqlite_operations()
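处理大表时可以利用 chunksize 分批读写:read_sql 传入 chunksize 后返回迭代器,to_sql 的 chunksize 控制每批插入的行数。下面是基于上面 company.db 的一个简要示例(numbers 表为演示用的假设):
import sqlite3
import pandas as pd

conn = sqlite3.connect('company.db')
# 分批读取:每次产出一个最多2行的小DataFrame,内存占用可控
for chunk in pd.read_sql('SELECT * FROM employees', conn, chunksize=2):
    print(f"取回 {len(chunk)} 行")

# 分批写入:每200行执行一次INSERT,适合大批量数据
big_df = pd.DataFrame({'n': range(1000)})
big_df.to_sql('numbers', conn, if_exists='replace', index=False, chunksize=200)
conn.close()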
3.5.2 其他数据库连接
def database_connection_examples():
"""其他数据库连接示例"""
print("数据库连接字符串示例:")
# 1. MySQL连接
mysql_example = """
# MySQL连接示例 (需要安装 pymysql 或 mysqlclient)
import pymysql
from sqlalchemy import create_engine
# 连接字符串
mysql_url = 'mysql+pymysql://username:password@localhost:3306/database_name'
engine = create_engine(mysql_url)
# 读取数据
df = pd.read_sql('SELECT * FROM table_name', engine)
# 写入数据
df.to_sql('new_table', engine, if_exists='replace', index=False)
"""
# 2. PostgreSQL连接
postgresql_example = """
# PostgreSQL连接示例 (需要安装 psycopg2)
from sqlalchemy import create_engine
# 连接字符串
postgres_url = 'postgresql://username:password@localhost:5432/database_name'
engine = create_engine(postgres_url)
# 读取数据
df = pd.read_sql('SELECT * FROM table_name', engine)
# 写入数据
df.to_sql('new_table', engine, if_exists='replace', index=False)
"""
# 3. SQL Server连接
sqlserver_example = """
# SQL Server连接示例 (需要安装 pyodbc)
from sqlalchemy import create_engine
# 连接字符串
sqlserver_url = 'mssql+pyodbc://username:password@server/database?driver=ODBC+Driver+17+for+SQL+Server'
engine = create_engine(sqlserver_url)
# 读取数据
df = pd.read_sql('SELECT * FROM table_name', engine)
# 写入数据
df.to_sql('new_table', engine, if_exists='replace', index=False)
"""
print("1. MySQL连接:")
print(mysql_example)
print("\n2. PostgreSQL连接:")
print(postgresql_example)
print("\n3. SQL Server连接:")
print(sqlserver_example)
# 4. 通用数据库操作函数
def database_operations(engine):
"""通用数据库操作函数"""
# 创建示例数据
df = pd.DataFrame({
'id': range(1, 6),
'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
'value': [100, 200, 300, 400, 500]
})
# 写入数据
df.to_sql('test_table', engine, if_exists='replace', index=False)
# 读取数据
result = pd.read_sql('SELECT * FROM test_table WHERE value > 200', engine)
return result
print("\n4. 通用数据库操作函数:")
print("""
def database_operations(engine):
# 创建示例数据
df = pd.DataFrame({
'id': range(1, 6),
'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
'value': [100, 200, 300, 400, 500]
})
# 写入数据
df.to_sql('test_table', engine, if_exists='replace', index=False)
# 读取数据
result = pd.read_sql('SELECT * FROM test_table WHERE value > 200', engine)
return result
""")
return mysql_example, postgresql_example, sqlserver_example
# 运行示例
db_examples = database_connection_examples()
3.6 其他格式操作
3.6.1 Parquet文件操作
def parquet_operations():
"""Parquet文件操作示例"""
# 创建示例数据
large_df = pd.DataFrame({
'id': range(10000),
'category': np.random.choice(['A', 'B', 'C', 'D'], 10000),
'value': np.random.randn(10000),
'timestamp': pd.date_range('2024-01-01', periods=10000, freq='1min'),
'flag': np.random.choice([True, False], 10000)
})
print(f"示例数据形状: {large_df.shape}")
print(f"数据类型:\n{large_df.dtypes}")
print(f"前5行:\n{large_df.head()}")
# 1. 写入Parquet文件
print(f"\n1. 写入Parquet文件:")
large_df.to_parquet('data.parquet', index=False)
print("Parquet文件已保存")
# 2. 读取Parquet文件
print(f"\n2. 读取Parquet文件:")
df_parquet = pd.read_parquet('data.parquet')
print(f"读取的数据形状: {df_parquet.shape}")
print(f"数据类型:\n{df_parquet.dtypes}")
# 3. 选择特定列
print(f"\n3. 选择特定列:")
df_selected = pd.read_parquet('data.parquet', columns=['id', 'category', 'value'])
print(f"选择列后的形状: {df_selected.shape}")
print(df_selected.head())
# 4. 压缩选项
print(f"\n4. 不同压缩选项:")
compression_options = ['snappy', 'gzip', 'brotli']
for compression in compression_options:
filename = f'data_{compression}.parquet'
large_df.to_parquet(filename, compression=compression, index=False)
# 检查文件大小
import os
size = os.path.getsize(filename)
print(f"{compression}压缩文件大小: {size:,} bytes")
# 5. 比较CSV和Parquet
print(f"\n5. 比较CSV和Parquet:")
# 保存为CSV
large_df.to_csv('data.csv', index=False)
# 比较文件大小
csv_size = os.path.getsize('data.csv')
parquet_size = os.path.getsize('data.parquet')
print(f"CSV文件大小: {csv_size:,} bytes")
print(f"Parquet文件大小: {parquet_size:,} bytes")
print(f"Parquet相对CSV的大小: {parquet_size/csv_size:.2%}")
# 比较读取速度
import time
# CSV读取时间
start_time = time.time()
df_csv = pd.read_csv('data.csv')
csv_time = time.time() - start_time
# Parquet读取时间
start_time = time.time()
df_parquet = pd.read_parquet('data.parquet')
parquet_time = time.time() - start_time
print(f"CSV读取时间: {csv_time:.3f} 秒")
print(f"Parquet读取时间: {parquet_time:.3f} 秒")
print(f"Parquet相对CSV的读取速度: {csv_time/parquet_time:.2f}x 更快")
return large_df
# 运行示例
parquet_df = parquet_operations()
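Parquet 还支持按列分区存储,分区目录建立后,下游可以只读取需要的分区。下面是一个简要示例(需要 pyarrow,目录名 partitioned_data 为演示用的假设):
import pandas as pd
import numpy as np

df = pd.DataFrame({
    'category': np.random.choice(['A', 'B'], 100),
    'value': np.random.randn(100)
})
# 按category分区,生成 partitioned_data/category=A/、category=B/ 等子目录
df.to_parquet('partitioned_data', partition_cols=['category'])

# 读取整个分区目录;pyarrow引擎下还可用filters参数只读部分分区
df_back = pd.read_parquet('partitioned_data')
print(df_back.shape)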
3.6.2 HDF5文件操作
def hdf5_operations():
"""HDF5文件操作示例"""
# 创建示例数据
df1 = pd.DataFrame({
'date': pd.date_range('2024-01-01', periods=100),
'stock_A': np.random.randn(100).cumsum() + 100,
'stock_B': np.random.randn(100).cumsum() + 50,
'volume': np.random.randint(1000, 10000, 100)
})
df2 = pd.DataFrame({
'employee_id': range(1, 51),
'name': [f'Employee_{i}' for i in range(1, 51)],
'department': np.random.choice(['IT', 'HR', 'Finance', 'Marketing'], 50),
'salary': np.random.normal(60000, 15000, 50)
})
print("股票数据:")
print(df1.head())
print("\n员工数据:")
print(df2.head())
# 1. 写入HDF5文件
print(f"\n1. 写入HDF5文件:")
df1.to_hdf('data.h5', key='stocks', mode='w')
df2.to_hdf('data.h5', key='employees', mode='a')
print("HDF5文件已保存")
# 2. 读取HDF5文件
print(f"\n2. 读取HDF5文件:")
stocks_df = pd.read_hdf('data.h5', key='stocks')
employees_df = pd.read_hdf('data.h5', key='employees')
print(f"股票数据形状: {stocks_df.shape}")
print(f"员工数据形状: {employees_df.shape}")
# 3. 查看HDF5文件信息
print(f"\n3. HDF5文件信息:")
with pd.HDFStore('data.h5', mode='r') as store:
print(f"文件中的键: {list(store.keys())}")
print(f"stocks信息: {store.get_storer('stocks').shape}")
print(f"employees信息: {store.get_storer('employees').shape}")
# 4. 条件查询
print(f"\n4. 条件查询:")
# 注意:条件查询需要使用table格式,且被查询的列要通过data_columns建立索引
df1.to_hdf('data_table.h5', key='stocks', mode='w', format='table', data_columns=['date'])
# 查询特定日期范围的数据
query_result = pd.read_hdf('data_table.h5', key='stocks',
where='(date >= "2024-01-15") & (date <= "2024-01-20")')
print(f"查询结果:")
print(query_result)
# 5. 追加数据
print(f"\n5. 追加数据:")
new_data = pd.DataFrame({
'date': pd.date_range('2024-04-10', periods=10),
'stock_A': np.random.randn(10).cumsum() + 100,
'stock_B': np.random.randn(10).cumsum() + 50,
'volume': np.random.randint(1000, 10000, 10)
})
new_data.to_hdf('data_table.h5', key='stocks', mode='a', format='table', append=True)
# 验证追加结果
updated_df = pd.read_hdf('data_table.h5', key='stocks')
print(f"追加后的数据形状: {updated_df.shape}")
print(f"最后几行:\n{updated_df.tail()}")
return df1, df2
# 运行示例
hdf5_data = hdf5_operations()
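对于 table 格式的数据,HDFStore.select 可以在按行条件过滤的同时只取部分列。下面是基于上面 data_table.h5 的一个简要示例(date 列已通过 data_columns 建立索引):
import pandas as pd

with pd.HDFStore('data_table.h5', mode='r') as store:
    # where按行过滤,columns按列裁剪,二者都在读取阶段完成
    subset = store.select('stocks',
                          where='date >= "2024-01-15"',
                          columns=['date', 'stock_A'])
print(subset.head())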
3.6.3 HTML和XML操作
def html_xml_operations():
"""HTML和XML操作示例"""
# 1. HTML表格操作
print("1. HTML表格操作:")
# 创建示例DataFrame
df = pd.DataFrame({
'Product': ['Laptop', 'Mouse', 'Keyboard', 'Monitor'],
'Price': [999.99, 29.99, 79.99, 299.99],
'Stock': [15, 50, 30, 8],
'Rating': [4.5, 4.2, 4.0, 4.8]
})
print("原始数据:")
print(df)
# 写入HTML
html_string = df.to_html(index=False, table_id='product_table',
classes='table table-striped')
# 创建完整的HTML页面
full_html = f"""
<!DOCTYPE html>
<html>
<head>
<title>产品列表</title>
<style>
.table {{ border-collapse: collapse; width: 100%; }}
.table th, .table td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
.table th {{ background-color: #f2f2f2; }}
.table-striped tr:nth-child(even) {{ background-color: #f9f9f9; }}
</style>
</head>
<body>
<h1>产品信息表</h1>
{html_string}
</body>
</html>
"""
with open('products.html', 'w', encoding='utf-8') as f:
f.write(full_html)
print("HTML文件已保存为 products.html")
# 从HTML读取表格
print(f"\n从HTML读取表格:")
try:
# 注意:需要安装 lxml 或 html5lib
html_tables = pd.read_html('products.html')
print(f"找到 {len(html_tables)} 个表格")
print("第一个表格:")
print(html_tables[0])
except ImportError:
print("需要安装 lxml 或 html5lib 来读取HTML表格")
print("pip install lxml")
# 2. XML操作示例
print(f"\n2. XML操作示例:")
# 创建XML数据
xml_data = """<?xml version="1.0" encoding="UTF-8"?>
<products>
<product id="1">
<name>Laptop</name>
<price>999.99</price>
<stock>15</stock>
<category>Electronics</category>
</product>
<product id="2">
<name>Mouse</name>
<price>29.99</price>
<stock>50</stock>
<category>Accessories</category>
</product>
<product id="3">
<name>Keyboard</name>
<price>79.99</price>
<stock>30</stock>
<category>Accessories</category>
</product>
</products>"""
with open('products.xml', 'w', encoding='utf-8') as f:
f.write(xml_data)
print("XML文件已创建")
# 从XML读取数据
try:
# 使用read_xml读取(pandas 1.3.0+)
xml_df = pd.read_xml('products.xml', xpath='.//product')
print("从XML读取的数据:")
print(xml_df)
except (AttributeError, ImportError):
print("read_xml 需要 pandas 1.3.0 或更高版本,并安装 lxml")
# 使用xml.etree.ElementTree手动解析
import xml.etree.ElementTree as ET
tree = ET.parse('products.xml')
root = tree.getroot()
products = []
for product in root.findall('product'):
products.append({
'id': product.get('id'),
'name': product.find('name').text,
'price': float(product.find('price').text),
'stock': int(product.find('stock').text),
'category': product.find('category').text
})
xml_df = pd.DataFrame(products)
print("手动解析XML的数据:")
print(xml_df)
# 3. 网页表格抓取示例
print(f"\n3. 网页表格抓取示例:")
# 创建示例网页
sample_webpage = """
<!DOCTYPE html>
<html>
<head><title>股票数据</title></head>
<body>
<h1>股票价格</h1>
<table border="1">
<tr><th>股票代码</th><th>公司名称</th><th>当前价格</th><th>涨跌幅</th></tr>
<tr><td>AAPL</td><td>Apple Inc.</td><td>150.25</td><td>+2.5%</td></tr>
<tr><td>GOOGL</td><td>Alphabet Inc.</td><td>2800.50</td><td>+1.2%</td></tr>
<tr><td>MSFT</td><td>Microsoft Corp.</td><td>300.75</td><td>-0.8%</td></tr>
</table>
<h2>财务数据</h2>
<table border="1">
<tr><th>指标</th><th>Q1</th><th>Q2</th><th>Q3</th><th>Q4</th></tr>
<tr><td>收入</td><td>1000</td><td>1200</td><td>1100</td><td>1300</td></tr>
<tr><td>利润</td><td>200</td><td>250</td><td>220</td><td>280</td></tr>
</table>
</body>
</html>
"""
with open('sample_webpage.html', 'w', encoding='utf-8') as f:
f.write(sample_webpage)
try:
# 读取网页中的所有表格
tables = pd.read_html('sample_webpage.html')
print(f"找到 {len(tables)} 个表格")
for i, table in enumerate(tables):
print(f"\n表格 {i+1}:")
print(table)
except ImportError:
print("需要安装相关依赖来读取HTML表格")
return df, xml_df
# 运行示例
html_xml_data = html_xml_operations()
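与 read_xml 对应,pandas 1.3+ 还提供 to_xml 把 DataFrame 写成 XML(同样需要 lxml)。下面是一个简要示例(文件名为演示用的假设):
import pandas as pd

df = pd.DataFrame({
    'name': ['Laptop', 'Mouse'],
    'price': [999.99, 29.99]
})
# root_name/row_name 自定义根节点和行节点的标签
df.to_xml('output_products.xml', index=False,
          root_name='products', row_name='product')
with open('output_products.xml', encoding='utf-8') as f:
    print(f.read())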
3.7 性能优化和最佳实践
3.7.1 读取性能优化
def performance_optimization():
"""读取性能优化技巧"""
# 创建大型测试数据
large_data = pd.DataFrame({
'id': range(100000),
'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], 100000),
'value1': np.random.randn(100000),
'value2': np.random.randn(100000),
'date': pd.date_range('2020-01-01', periods=100000, freq='1min'),
'text': [f'text_{i}' for i in range(100000)]
})
print(f"测试数据形状: {large_data.shape}")
# 保存为不同格式
large_data.to_csv('large_data.csv', index=False)
large_data.to_parquet('large_data.parquet', index=False)
import os
import time
# 1. 数据类型优化
print(f"\n1. 数据类型优化:")
# 默认读取
start_time = time.time()
df_default = pd.read_csv('large_data.csv')
default_time = time.time() - start_time
default_memory = df_default.memory_usage(deep=True).sum()
# 优化数据类型
dtypes = {
'id': 'int32',
'category': 'category',
'value1': 'float32',
'value2': 'float32',
'text': 'category'
}
start_time = time.time()
df_optimized = pd.read_csv('large_data.csv', dtype=dtypes, parse_dates=['date'])
optimized_time = time.time() - start_time
optimized_memory = df_optimized.memory_usage(deep=True).sum()
print(f"默认读取时间: {default_time:.3f}秒, 内存: {default_memory/1024/1024:.1f}MB")
print(f"优化读取时间: {optimized_time:.3f}秒, 内存: {optimized_memory/1024/1024:.1f}MB")
print(f"内存节省: {(1-optimized_memory/default_memory)*100:.1f}%")
# 2. 分块读取
print(f"\n2. 分块读取:")
def process_chunk(chunk):
    """处理数据块:返回每个类别的和与计数,便于跨块精确合并"""
    positive = chunk[chunk['value1'] > 0]
    return positive.groupby('category')['value2'].agg(['sum', 'count'])
start_time = time.time()
chunk_results = []
for chunk in pd.read_csv('large_data.csv', chunksize=10000, dtype=dtypes):
    result = process_chunk(chunk)
    chunk_results.append(result)
# 合并结果:先汇总各块的和与计数再求均值(直接平均各块的均值会因块大小不同而有偏差)
combined = pd.concat(chunk_results).groupby(level=0).sum()
final_result = combined['sum'] / combined['count']
chunk_time = time.time() - start_time
print(f"分块处理时间: {chunk_time:.3f}秒")
print(f"处理结果:\n{final_result}")
# 3. 列选择优化
print(f"\n3. 列选择优化:")
# 只读取需要的列
start_time = time.time()
df_selected = pd.read_csv('large_data.csv', usecols=['id', 'category', 'value1'])
selected_time = time.time() - start_time
print(f"选择列读取时间: {selected_time:.3f}秒")
print(f"相对完整读取的时间: {selected_time/default_time:.2%}")
# 4. 格式比较
print(f"\n4. 不同格式性能比较:")
formats = {
'CSV': 'large_data.csv',
'Parquet': 'large_data.parquet'
}
for format_name, filename in formats.items():
start_time = time.time()
if format_name == 'CSV':
df = pd.read_csv(filename)
else:
df = pd.read_parquet(filename)
read_time = time.time() - start_time
file_size = os.path.getsize(filename) / 1024 / 1024 # MB
print(f"{format_name}: 读取时间 {read_time:.3f}秒, 文件大小 {file_size:.1f}MB")
return large_data
# 运行示例
perf_data = performance_optimization()
3.7.2 写入性能优化
def writing_optimization():
"""写入性能优化技巧"""
# 创建测试数据
test_data = pd.DataFrame({
'id': range(50000),
'category': np.random.choice(['A', 'B', 'C'], 50000),
'value': np.random.randn(50000),
'timestamp': pd.date_range('2024-01-01', periods=50000, freq='1min')
})
print(f"测试数据形状: {test_data.shape}")
import os
import time
# 1. CSV写入优化
print(f"\n1. CSV写入优化:")
# 默认写入
start_time = time.time()
test_data.to_csv('output_default.csv', index=False)
default_time = time.time() - start_time
# 指定日期格式写入(注意:自定义date_format会对日期逐行格式化,通常不会更快)
start_time = time.time()
test_data.to_csv('output_datefmt.csv', index=False, date_format='%Y-%m-%d %H:%M:%S')
datefmt_time = time.time() - start_time
print(f"默认写入时间: {default_time:.3f}秒")
print(f"指定日期格式写入时间: {datefmt_time:.3f}秒")
# 2. 不同格式写入比较
print(f"\n2. 不同格式写入比较:")
formats = {
'CSV': lambda df, filename: df.to_csv(filename, index=False),
'Parquet': lambda df, filename: df.to_parquet(filename, index=False),
'JSON': lambda df, filename: df.to_json(filename, orient='records'),
'Pickle': lambda df, filename: df.to_pickle(filename)
}
for format_name, write_func in formats.items():
filename = f'output_test.{format_name.lower()}'
start_time = time.time()
write_func(test_data, filename)
write_time = time.time() - start_time
file_size = os.path.getsize(filename) / 1024 / 1024 # MB
print(f"{format_name}: 写入时间 {write_time:.3f}秒, 文件大小 {file_size:.1f}MB")
# 3. 批量写入优化
print(f"\n3. 批量写入优化:")
# 模拟分批数据写入
batch_size = 10000
total_rows = 50000
# 方法1:多次写入同一文件
start_time = time.time()
for i in range(0, total_rows, batch_size):
batch_data = test_data.iloc[i:i+batch_size]
mode = 'w' if i == 0 else 'a'
header = i == 0
batch_data.to_csv('batch_output.csv', mode=mode, header=header, index=False)
batch_time = time.time() - start_time
# 方法2:收集后一次性写入
start_time = time.time()
all_batches = []
for i in range(0, total_rows, batch_size):
batch_data = test_data.iloc[i:i+batch_size]
all_batches.append(batch_data)
combined_data = pd.concat(all_batches, ignore_index=True)
combined_data.to_csv('combined_output.csv', index=False)
combined_time = time.time() - start_time
print(f"分批写入时间: {batch_time:.3f}秒")
print(f"合并写入时间: {combined_time:.3f}秒")
return test_data
# 运行示例
write_data = writing_optimization()
3.8 本章小结
3.8.1 核心知识点
文件格式支持
- CSV:最通用的文本格式,易于处理和交换
- Excel:支持多工作表,适合商业环境
- JSON:Web数据交换标准格式
- Parquet:高性能列式存储格式
- HDF5:层次化数据格式,支持大数据
- SQL:关系数据库标准接口
读取优化技巧
- 指定数据类型减少内存使用
- 选择必要列提高读取速度
- 分块读取处理大文件
- 使用高效格式如Parquet
写入最佳实践
- 选择合适的文件格式
- 考虑压缩选项节省空间
- 批量操作提高效率
- 处理编码问题
数据库操作
- 使用参数化查询防止SQL注入
- 合理使用索引提高查询性能
- 批量操作减少数据库连接开销
3.8.2 实践要点
- 根据数据特点选择合适的存储格式
- 在读取大文件时考虑内存限制
- 使用数据类型优化减少内存占用
- 定期备份重要数据
- 注意字符编码问题,特别是中文数据
3.8.3 性能对比总结
| 格式 | 读取速度 | 写入速度 | 文件大小 | 兼容性 | 适用场景 |
| --- | --- | --- | --- | --- | --- |
| CSV | 中等 | 快 | 大 | 极好 | 数据交换、简单分析 |
| Excel | 慢 | 慢 | 中等 | 好 | 商业报告、多表数据 |
| JSON | 中等 | 中等 | 大 | 好 | Web应用、API数据 |
| Parquet | 快 | 快 | 小 | 中等 | 大数据分析、存储 |
| HDF5 | 快 | 快 | 小 | 中等 | 科学计算、时间序列 |
| Pickle | 极快 | 极快 | 中等 | 差 | Python内部使用 |
3.8.4 下一步学习
在下一章中,我们将学习:
- 数据清洗技术
- 缺失值处理
- 重复数据处理
- 数据类型转换
- 异常值检测和处理
练习题
- 比较不同文件格式的读写性能
- 实现一个通用的数据导入导出工具
- 处理包含中文的CSV文件编码问题
- 设计一个数据库批量导入方案
- 优化大文件的读取和处理流程
记住:选择合适的数据格式是数据分析效率的关键!