3.1 Overview of File Formats

pandas can read and write a wide range of data formats, which makes it a key link in any data-analysis workflow.

graph TD
    A[Data sources] --> B[pandas DataFrame]
    B --> C[Data analysis]
    C --> D[Output]

    A --> A1[CSV files]
    A --> A2[Excel files]
    A --> A3[JSON files]
    A --> A4[Databases]
    A --> A5[Web APIs]
    A --> A6[Other formats]

    D --> D1[CSV files]
    D --> D2[Excel files]
    D --> D3[JSON files]
    D --> D4[Databases]
    D --> D5[Charts]
    D --> D6[Reports]

3.1.1 Supported File Formats

import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta

def supported_formats_overview():
    """Overview of the file formats pandas supports"""
    
    formats = {
        'Format': ['CSV', 'Excel', 'JSON', 'Parquet', 'HDF5', 'SQL', 'HTML', 'XML', 'Pickle'],
        'Read function': ['read_csv()', 'read_excel()', 'read_json()', 'read_parquet()', 
                          'read_hdf()', 'read_sql()', 'read_html()', 'read_xml()', 'read_pickle()'],
        'Write function': ['to_csv()', 'to_excel()', 'to_json()', 'to_parquet()', 
                           'to_hdf()', 'to_sql()', 'to_html()', 'to_xml()', 'to_pickle()'],
        'Notes': ['Plain-text interchange', 'Office format', 'Web data format', 'Columnar storage', 
                  'Hierarchical data', 'Relational databases', 'Web pages', 'Markup language', 'Python objects']
    }
    
    df = pd.DataFrame(formats)
    print("Main file formats supported by pandas:")
    print(df.to_string(index=False))
    
    return df

# Run the example
formats_df = supported_formats_overview()
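
All of these pairs are symmetric: whatever a to_* method writes, the matching read_* function can load back. A minimal round-trip sketch (the file name roundtrip.csv is arbitrary):

import pandas as pd

df = pd.DataFrame({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})

# Write without the index, read back, and confirm nothing was lost
df.to_csv('roundtrip.csv', index=False)
restored = pd.read_csv('roundtrip.csv')
assert restored.equals(df)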

3.2 CSV File Operations

3.2.1 Reading CSV Files

CSV (Comma-Separated Values) is the most widely used data-interchange format.

def csv_reading_examples():
    """Examples of reading CSV files"""
    
    # First create some sample CSV data
    sample_data = """name,age,city,salary,department
Alice,25,New York,50000,IT
Bob,30,Los Angeles,60000,HR
Charlie,35,San Francisco,70000,IT
Diana,28,Chicago,55000,Marketing
Eve,32,Boston,65000,IT
Frank,29,Seattle,58000,Finance"""
    
    # Save it as a CSV file
    with open('sample_data.csv', 'w') as f:
        f.write(sample_data)
    
    print("1. Basic read:")
    df1 = pd.read_csv('sample_data.csv')
    print(df1)
    
    # 2. Use a column as the index
    print(f"\n2. Use name as the index:")
    df2 = pd.read_csv('sample_data.csv', index_col='name')
    print(df2)
    
    # 3. Read only selected columns
    print(f"\n3. Read specific columns only:")
    df3 = pd.read_csv('sample_data.csv', usecols=['name', 'age', 'salary'])
    print(df3)
    
    # 4. Specify dtypes
    print(f"\n4. Specify data types:")
    dtypes = {'age': 'int8', 'salary': 'int32', 'department': 'category'}
    df4 = pd.read_csv('sample_data.csv', dtype=dtypes)
    print(df4.dtypes)
    
    # 5. Handle missing values
    sample_data_with_na = """name,age,city,salary,department
Alice,25,New York,50000,IT
Bob,,Los Angeles,60000,HR
Charlie,35,,70000,IT
Diana,28,Chicago,,Marketing
Eve,32,Boston,65000,"""
    
    with open('sample_data_na.csv', 'w') as f:
        f.write(sample_data_with_na)
    
    print(f"\n5. Handle missing values:")
    df5 = pd.read_csv('sample_data_na.csv', na_values=['', 'NULL', 'N/A'])
    print(df5)
    print(f"Missing-value counts:\n{df5.isnull().sum()}")
    
    return df1, df2, df3, df4, df5

# Run the example
csv_dataframes = csv_reading_examples()

3.2.2 Advanced CSV Reading Options

def advanced_csv_reading():
    """Advanced CSV reading options"""
    
    # Create a more complex CSV file; the data is deliberately UTF-8
    # Chinese, which also exercises the encoding parameter
    complex_csv = """# 这是注释行
# 员工数据 - 2024年
"姓名","年龄","城市","薪资","部门","入职日期"
"张三",25,"北京",50000,"IT","2023-01-15"
"李四",30,"上海",60000,"HR","2022-03-20"
"王五",35,"广州",70000,"IT","2021-06-10"
"赵六",28,"深圳",55000,"市场","2023-08-05"
"钱七",32,"杭州",65000,"IT","2022-11-30"
"""
    
    with open('complex_data.csv', 'w', encoding='utf-8') as f:
        f.write(complex_csv)
    
    # 1. Skip comment lines
    print("1. Skip comment lines:")
    df1 = pd.read_csv('complex_data.csv', comment='#', encoding='utf-8')
    print(df1)
    
    # 2. Custom delimiter
    pipe_data = "name|age|city|salary\nAlice|25|NY|50000\nBob|30|LA|60000"
    with open('pipe_data.txt', 'w') as f:
        f.write(pipe_data)
    
    print(f"\n2. Custom delimiter:")
    df2 = pd.read_csv('pipe_data.txt', sep='|')
    print(df2)
    
    # 3. Parse date columns
    print(f"\n3. Parse date columns:")
    df3 = pd.read_csv('complex_data.csv', comment='#', encoding='utf-8',
                      parse_dates=['入职日期'])
    print(df3.dtypes)
    print(df3)
    
    # 4. Custom column names
    print(f"\n4. Custom column names:")
    custom_names = ['employee_name', 'employee_age', 'location', 'income', 'dept', 'hire_date']
    # header=0 discards the file's own header row so the custom names replace it
    df4 = pd.read_csv('complex_data.csv', comment='#', encoding='utf-8',
                      names=custom_names, header=0)
    print(df4)
    
    # 5. Read a large file in chunks
    print(f"\n5. Chunked reading:")
    chunk_size = 2
    chunks = []
    for chunk in pd.read_csv('complex_data.csv', comment='#', encoding='utf-8', chunksize=chunk_size):
        print(f"Processing chunk with shape: {chunk.shape}")
        chunks.append(chunk)
    
    # Concatenate all chunks
    df5 = pd.concat(chunks, ignore_index=True)
    print(f"Combined data:\n{df5}")
    
    return df1, df2, df3, df4, df5

# Run the example
advanced_csv_dfs = advanced_csv_reading()

3.2.3 Writing CSV Files

def csv_writing_examples():
    """Examples of writing CSV files"""
    
    # Create a sample DataFrame
    df = pd.DataFrame({
        'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
        'age': [25, 30, 35, 28, 32],
        'city': ['New York', 'Los Angeles', 'San Francisco', 'Chicago', 'Boston'],
        'salary': [50000, 60000, 70000, 55000, 65000],
        'department': ['IT', 'HR', 'IT', 'Marketing', 'IT']
    })
    
    print("Original DataFrame:")
    print(df)
    
    # 1. Basic write
    print(f"\n1. Basic write to CSV:")
    df.to_csv('output_basic.csv')
    print("Saved as output_basic.csv")
    
    # 2. Without the index
    print(f"\n2. Without the index:")
    df.to_csv('output_no_index.csv', index=False)
    print("Saved as output_no_index.csv")
    
    # 3. Selected columns only
    print(f"\n3. Save specific columns only:")
    df[['name', 'age', 'salary']].to_csv('output_selected.csv', index=False)
    print("Saved as output_selected.csv")
    
    # 4. Custom delimiter
    print(f"\n4. Custom delimiter:")
    df.to_csv('output_pipe.txt', sep='|', index=False)
    print("Saved as output_pipe.txt")
    
    # 5. Chinese text and encodings (utf-8-sig adds a BOM so that Excel
    # detects the encoding correctly)
    df_chinese = pd.DataFrame({
        '姓名': ['张三', '李四', '王五'],
        '年龄': [25, 30, 35],
        '城市': ['北京', '上海', '广州']
    })
    
    print(f"\n5. Writing Chinese data:")
    df_chinese.to_csv('output_chinese.csv', index=False, encoding='utf-8-sig')
    print("Chinese data saved as output_chinese.csv")
    
    # 6. Append mode
    print(f"\n6. Append mode:")
    new_data = pd.DataFrame({
        'name': ['Frank', 'Grace'],
        'age': [29, 27],
        'city': ['Seattle', 'Portland'],
        'salary': [58000, 52000],
        'department': ['Finance', 'HR']
    })
    
    # Append to the file written without an index, so the appended rows
    # line up with the existing columns
    new_data.to_csv('output_no_index.csv', mode='a', header=False, index=False)
    print("New rows appended to output_no_index.csv")
    
    # Verify the append
    appended_df = pd.read_csv('output_no_index.csv')
    print(f"Data after appending:\n{appended_df}")
    
    return df, df_chinese

# Run the example
csv_write_dfs = csv_writing_examples()
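
One option the examples above do not show: to_csv and read_csv handle compressed files transparently, with the compression scheme inferred from the file extension. A small sketch (the file name salaries.csv.gz is arbitrary):

import pandas as pd

df = pd.DataFrame({'name': ['Alice', 'Bob'], 'salary': [50000, 60000]})

# '.gz' triggers gzip compression on write; read_csv decompresses it back
df.to_csv('salaries.csv.gz', index=False)
df_back = pd.read_csv('salaries.csv.gz')
print(df_back)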

3.3 Excel File Operations

3.3.1 Reading Excel Files

def excel_reading_examples():
    """Examples of reading Excel files"""
    
    # First create a sample Excel file
    df1 = pd.DataFrame({
        'Product': ['A', 'B', 'C', 'D', 'E'],
        'Q1_Sales': [100, 150, 200, 120, 180],
        'Q2_Sales': [110, 160, 190, 130, 170],
        'Q3_Sales': [120, 140, 210, 125, 185],
        'Q4_Sales': [130, 155, 205, 135, 175]
    })
    
    df2 = pd.DataFrame({
        'Employee': ['Alice', 'Bob', 'Charlie', 'Diana'],
        'Department': ['IT', 'HR', 'IT', 'Marketing'],
        'Salary': [50000, 60000, 70000, 55000],
        'Bonus': [5000, 6000, 7000, 5500]
    })
    
    # Create the Excel file (requires the openpyxl package)
    with pd.ExcelWriter('sample_data.xlsx', engine='openpyxl') as writer:
        df1.to_excel(writer, sheet_name='Sales', index=False)
        df2.to_excel(writer, sheet_name='Employees', index=False)
    
    print("Excel file created")
    
    # 1. Read the default (first) sheet
    print(f"\n1. Read the default sheet:")
    df_default = pd.read_excel('sample_data.xlsx')
    print(df_default)
    
    # 2. Read a specific sheet
    print(f"\n2. Read a specific sheet:")
    df_employees = pd.read_excel('sample_data.xlsx', sheet_name='Employees')
    print(df_employees)
    
    # 3. Read all sheets (sheet_name=None returns a dict of DataFrames)
    print(f"\n3. Read multiple sheets:")
    all_sheets = pd.read_excel('sample_data.xlsx', sheet_name=None)
    for sheet_name, sheet_df in all_sheets.items():
        print(f"Sheet: {sheet_name}")
        print(sheet_df.head())
        print()
    
    # 4. Restrict the range that is read
    print(f"\n4. Restrict the read range:")
    df_range = pd.read_excel('sample_data.xlsx', sheet_name='Sales', 
                             usecols='A:C', nrows=3)
    print(df_range)
    
    # 5. Skip rows
    print(f"\n5. Skip the first 2 rows:")
    df_skip = pd.read_excel('sample_data.xlsx', sheet_name='Sales', 
                            skiprows=2)
    print(df_skip)
    
    return df1, df2, all_sheets

# Run the example
excel_dfs = excel_reading_examples()

3.3.2 Writing Excel Files

def excel_writing_examples():
    """Examples of writing Excel files"""
    
    # Create sample data
    sales_data = pd.DataFrame({
        'Month': ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun'],
        'Product_A': [100, 120, 110, 130, 125, 140],
        'Product_B': [80, 90, 85, 95, 88, 100],
        'Product_C': [60, 70, 65, 75, 72, 80]
    })
    
    employee_data = pd.DataFrame({
        'Name': ['Alice', 'Bob', 'Charlie', 'Diana'],
        'Department': ['IT', 'HR', 'Finance', 'Marketing'],
        'Salary': [50000, 55000, 60000, 52000],
        'Performance': [4.5, 4.2, 4.8, 4.3]
    })
    
    summary_data = pd.DataFrame({
        'Metric': ['Total Sales', 'Average Salary', 'Employee Count'],
        'Value': [sales_data.iloc[:, 1:].sum().sum(), 
                 employee_data['Salary'].mean(),
                 len(employee_data)]
    })
    
    print("Data to be written:")
    print("Sales data:")
    print(sales_data)
    print("\nEmployee data:")
    print(employee_data)
    print("\nSummary data:")
    print(summary_data)
    
    # 1. Basic write
    print(f"\n1. Basic write:")
    sales_data.to_excel('output_basic.xlsx', index=False)
    print("Saved as output_basic.xlsx")
    
    # 2. Multiple sheets
    print(f"\n2. Writing multiple sheets:")
    with pd.ExcelWriter('output_multiple_sheets.xlsx', engine='openpyxl') as writer:
        sales_data.to_excel(writer, sheet_name='Sales', index=False)
        employee_data.to_excel(writer, sheet_name='Employees', index=False)
        summary_data.to_excel(writer, sheet_name='Summary', index=False)
    print("Multi-sheet file saved")
    
    # 3. Write with formatting
    print(f"\n3. Write with formatting:")
    with pd.ExcelWriter('output_formatted.xlsx', engine='openpyxl') as writer:
        # Write the data, leaving row 1 free for a title
        sales_data.to_excel(writer, sheet_name='Sales', index=False, startrow=1)
        employee_data.to_excel(writer, sheet_name='Employees', index=False, startrow=1)
        
        # Grab the underlying openpyxl workbook and worksheets
        workbook = writer.book
        sales_sheet = writer.sheets['Sales']
        employee_sheet = writer.sheets['Employees']
        
        # Add titles
        sales_sheet['A1'] = 'Sales Report'
        employee_sheet['A1'] = 'Employee Directory'
        
        # Auto-fit column widths
        for sheet in [sales_sheet, employee_sheet]:
            for column in sheet.columns:
                max_length = 0
                column_letter = column[0].column_letter
                for cell in column:
                    try:
                        if len(str(cell.value)) > max_length:
                            max_length = len(str(cell.value))
                    except Exception:
                        pass
                adjusted_width = min(max_length + 2, 50)
                sheet.column_dimensions[column_letter].width = adjusted_width
    
    print("Formatted file saved")
    
    # 4. Append to an existing file (read, concatenate, write back)
    print(f"\n4. Append to an existing file:")
    new_sales = pd.DataFrame({
        'Month': ['Jul', 'Aug'],
        'Product_A': [145, 150],
        'Product_B': [105, 110],
        'Product_C': [85, 90]
    })
    
    # Read the existing data
    existing_data = pd.read_excel('output_basic.xlsx')
    # Concatenate
    combined_data = pd.concat([existing_data, new_sales], ignore_index=True)
    # Write back
    combined_data.to_excel('output_basic.xlsx', index=False)
    print("Data appended to the existing file")
    
    return sales_data, employee_data, summary_data

# Run the example
excel_write_data = excel_writing_examples()
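
The read-modify-rewrite pattern in step 4 rewrites the whole workbook. To add a new sheet to an existing file without touching the others, ExcelWriter can be opened in append mode (pandas 1.3+ with the openpyxl engine; the Notes sheet here is illustrative):

import pandas as pd

notes = pd.DataFrame({'Note': ['Figures are preliminary']})

# mode='a' opens the existing workbook; if_sheet_exists decides what to do
# when a sheet of the same name is already present
with pd.ExcelWriter('output_multiple_sheets.xlsx', mode='a',
                    engine='openpyxl', if_sheet_exists='replace') as writer:
    notes.to_excel(writer, sheet_name='Notes', index=False)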

3.4 JSON File Operations

3.4.1 Reading JSON Files

def json_reading_examples():
    """Examples of reading JSON files"""
    
    # Create sample JSON data
    json_data = {
        "employees": [
            {"name": "Alice", "age": 25, "department": "IT", "salary": 50000},
            {"name": "Bob", "age": 30, "department": "HR", "salary": 60000},
            {"name": "Charlie", "age": 35, "department": "IT", "salary": 70000}
        ],
        "company": "TechCorp",
        "location": "San Francisco"
    }
    
    # Save as a JSON file
    with open('sample_data.json', 'w') as f:
        json.dump(json_data, f, indent=2)
    
    # 1. Basic JSON read
    print("1. Basic JSON read:")
    df1 = pd.read_json('sample_data.json')
    print(df1)
    
    # 2. The orient parameter
    # Create JSON in a different layout
    records_json = [
        {"name": "Alice", "age": 25, "city": "NY"},
        {"name": "Bob", "age": 30, "city": "LA"},
        {"name": "Charlie", "age": 35, "city": "SF"}
    ]
    
    with open('records.json', 'w') as f:
        json.dump(records_json, f, indent=2)
    
    print(f"\n2. records-oriented JSON:")
    df2 = pd.read_json('records.json', orient='records')
    print(df2)
    
    # 3. Nested JSON
    nested_json = {
        "data": {
            "users": [
                {"id": 1, "profile": {"name": "Alice", "age": 25}},
                {"id": 2, "profile": {"name": "Bob", "age": 30}}
            ]
        }
    }
    
    with open('nested.json', 'w') as f:
        json.dump(nested_json, f, indent=2)
    
    print(f"\n3. Handling nested JSON:")
    # Use json_normalize to flatten the nested structure
    with open('nested.json', 'r') as f:
        data = json.load(f)
    
    df3 = pd.json_normalize(data['data']['users'])
    print(df3)
    
    # 4. Read JSON from a string
    print(f"\n4. Read JSON from a string:")
    json_string = '''[
        {"product": "A", "price": 100, "stock": 50},
        {"product": "B", "price": 150, "stock": 30},
        {"product": "C", "price": 200, "stock": 20}
    ]'''
    
    # Recent pandas versions expect a path or file-like object rather than
    # a raw JSON literal, so wrap the string in StringIO
    from io import StringIO
    df4 = pd.read_json(StringIO(json_string), orient='records')
    print(df4)
    
    # 5. Timestamps
    datetime_json = [
        {"event": "Meeting", "timestamp": "2024-01-15T10:00:00"},
        {"event": "Call", "timestamp": "2024-01-15T14:30:00"},
        {"event": "Review", "timestamp": "2024-01-15T16:00:00"}
    ]
    
    with open('datetime.json', 'w') as f:
        json.dump(datetime_json, f, indent=2)
    
    print(f"\n5. Handling timestamps:")
    df5 = pd.read_json('datetime.json', orient='records')
    df5['timestamp'] = pd.to_datetime(df5['timestamp'])
    print(df5)
    print(f"Data types:\n{df5.dtypes}")
    
    return df1, df2, df3, df4, df5

# Run the example
json_dfs = json_reading_examples()
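
When the nested structure also carries fields you want to keep from the outer levels, pd.json_normalize accepts record_path and meta arguments. A sketch with a small inline payload (the payload itself is made up for illustration):

import pandas as pd

payload = {
    "company": "TechCorp",
    "teams": [
        {"team": "IT", "members": [{"name": "Alice"}, {"name": "Bob"}]},
        {"team": "HR", "members": [{"name": "Eve"}]}
    ]
}

# record_path descends into the nested lists; meta copies fields from the
# enclosing level onto each flattened row
df = pd.json_normalize(payload["teams"], record_path="members", meta="team")
print(df)
#     name team
# 0  Alice   IT
# 1    Bob   IT
# 2    Eve   HR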

3.4.2 Writing JSON Files

def json_writing_examples():
    """Examples of writing JSON files"""
    
    # Create a sample DataFrame
    df = pd.DataFrame({
        'product_id': [1, 2, 3, 4, 5],
        'product_name': ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones'],
        'price': [999.99, 29.99, 79.99, 299.99, 149.99],
        'category': ['Electronics', 'Accessories', 'Accessories', 'Electronics', 'Accessories'],
        'in_stock': [True, True, False, True, True],
        'last_updated': pd.date_range('2024-01-01', periods=5, freq='D')
    })
    
    print("Original DataFrame:")
    print(df)
    
    # 1. Basic JSON write
    print(f"\n1. Basic JSON write (records orient):")
    df.to_json('output_records.json', orient='records', indent=2)
    print("Saved as output_records.json")
    
    # Check the result
    with open('output_records.json', 'r') as f:
        content = f.read()
        print("File content preview:")
        print(content[:200] + "...")
    
    # 2. Other orient options
    print(f"\n2. Other orient options:")
    
    # index orient
    df.to_json('output_index.json', orient='index', indent=2)
    print("index orient saved")
    
    # values orient
    df.to_json('output_values.json', orient='values', indent=2)
    print("values orient saved")
    
    # columns orient
    df.to_json('output_columns.json', orient='columns', indent=2)
    print("columns orient saved")
    
    # 3. Date formats
    print(f"\n3. Date formats:")
    df.to_json('output_date_iso.json', orient='records', date_format='iso', indent=2)
    df.to_json('output_date_epoch.json', orient='records', date_format='epoch', indent=2)
    print("Both date formats saved")
    
    # 4. Custom JSON structure
    print(f"\n4. Build a custom JSON structure:")
    custom_json = {
        "metadata": {
            "total_products": len(df),
            "categories": df['category'].unique().tolist(),
            "last_updated": datetime.now().isoformat()
        },
        "products": df.to_dict('records')
    }
    
    with open('output_custom.json', 'w') as f:
        json.dump(custom_json, f, indent=2, default=str)
    print("Custom-structure JSON saved")
    
    # 5. Compact JSON
    print(f"\n5. Compact JSON (no indentation):")
    df.to_json('output_compressed.json', orient='records')
    print("Compact JSON saved")
    
    # Compare file sizes
    import os
    size_formatted = os.path.getsize('output_records.json')
    size_compressed = os.path.getsize('output_compressed.json')
    print(f"Pretty-printed file size: {size_formatted} bytes")
    print(f"Compact file size: {size_compressed} bytes")
    print(f"Compact/pretty size ratio: {size_compressed/size_formatted:.2%}")
    
    return df, custom_json

# Run the example
json_write_data = json_writing_examples()
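
For log-style or streaming data, the JSON Lines variant (one JSON object per line) is often more convenient than a single array; pandas supports it through lines=True, which requires orient='records'. A short sketch (events.jsonl is an arbitrary name):

import pandas as pd

df = pd.DataFrame({'event': ['login', 'click'], 'user': ['alice', 'bob']})

# One object per line; appending and line-by-line streaming become trivial
df.to_json('events.jsonl', orient='records', lines=True)

df_back = pd.read_json('events.jsonl', orient='records', lines=True)
print(df_back)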

3.5 Database Operations

3.5.1 Working with SQLite

import sqlite3

def sqlite_operations():
    """SQLite database examples"""
    
    # Create sample data
    employees_df = pd.DataFrame({
        'employee_id': [1, 2, 3, 4, 5],
        'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
        'department': ['IT', 'HR', 'Finance', 'Marketing', 'IT'],
        'salary': [50000, 55000, 60000, 52000, 58000],
        'hire_date': pd.date_range('2020-01-01', periods=5, freq='6M')
    })
    
    projects_df = pd.DataFrame({
        'project_id': [101, 102, 103, 104],
        'project_name': ['Website Redesign', 'Mobile App', 'Data Migration', 'Marketing Campaign'],
        'manager_id': [1, 3, 1, 4],
        'budget': [50000, 80000, 30000, 25000],
        'status': ['Active', 'Planning', 'Completed', 'Active']
    })
    
    print("Employee data:")
    print(employees_df)
    print("\nProject data:")
    print(projects_df)
    
    # 1. Open a database connection
    print(f"\n1. Create the SQLite database:")
    conn = sqlite3.connect('company.db')
    
    # 2. Write DataFrames to the database
    print(f"\n2. Write data to the database:")
    employees_df.to_sql('employees', conn, if_exists='replace', index=False)
    projects_df.to_sql('projects', conn, if_exists='replace', index=False)
    print("Data written to the database")
    
    # 3. Read data back
    print(f"\n3. Read data from the database:")
    df_employees = pd.read_sql('SELECT * FROM employees', conn)
    print("Employee data read from the database:")
    print(df_employees)
    
    # 4. Run SQL queries
    print(f"\n4. Run SQL queries:")
    
    # Employees in the IT department
    it_employees = pd.read_sql("""
        SELECT name, salary 
        FROM employees 
        WHERE department = 'IT'
        ORDER BY salary DESC
    """, conn)
    print("IT department employees:")
    print(it_employees)
    
    # Join query
    project_managers = pd.read_sql("""
        SELECT p.project_name, e.name as manager_name, p.budget
        FROM projects p
        JOIN employees e ON p.manager_id = e.employee_id
        WHERE p.status = 'Active'
    """, conn)
    print("\nActive projects and their managers:")
    print(project_managers)
    
    # Aggregate query
    dept_stats = pd.read_sql("""
        SELECT department, 
               COUNT(*) as employee_count,
               AVG(salary) as avg_salary,
               MAX(salary) as max_salary
        FROM employees
        GROUP BY department
    """, conn)
    print("\nPer-department statistics:")
    print(dept_stats)
    
    # 5. Parameterized queries (safer than string interpolation)
    print(f"\n5. Parameterized query:")
    min_salary = 55000
    high_earners = pd.read_sql("""
        SELECT name, department, salary
        FROM employees
        WHERE salary >= ?
        ORDER BY salary DESC
    """, conn, params=[min_salary])
    print(f"Employees earning at least {min_salary}:")
    print(high_earners)
    
    # 6. Update data
    print(f"\n6. Update data:")
    # Give the IT department a 5% raise
    cursor = conn.cursor()
    cursor.execute("""
        UPDATE employees 
        SET salary = salary * 1.05 
        WHERE department = 'IT'
    """)
    conn.commit()
    
    # Verify the update (note: single quotes for SQL string literals)
    updated_employees = pd.read_sql("SELECT * FROM employees WHERE department = 'IT'", conn)
    print("IT department after the raise:")
    print(updated_employees[['name', 'salary']])
    
    # Close the connection
    conn.close()
    
    return employees_df, projects_df

# Run the example
db_data = sqlite_operations()
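
Just as read_csv can read in chunks, read_sql accepts a chunksize and then returns an iterator of DataFrames, so large result sets never need to fit in memory at once. A sketch against the company.db file created above:

import sqlite3
import pandas as pd

conn = sqlite3.connect('company.db')

# Each iteration yields at most 2 rows as its own DataFrame
for chunk in pd.read_sql('SELECT * FROM employees', conn, chunksize=2):
    print(chunk.shape)

conn.close()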

3.5.2 Connecting to Other Databases

def database_connection_examples():
    """Connection examples for other databases"""
    
    print("Sample database connection strings:")
    
    # 1. MySQL
    mysql_example = """
    # MySQL example (requires pymysql or mysqlclient)
    import pymysql
    from sqlalchemy import create_engine
    
    # Connection URL
    mysql_url = 'mysql+pymysql://username:password@localhost:3306/database_name'
    engine = create_engine(mysql_url)
    
    # Read data
    df = pd.read_sql('SELECT * FROM table_name', engine)
    
    # Write data
    df.to_sql('new_table', engine, if_exists='replace', index=False)
    """
    
    # 2. PostgreSQL
    postgresql_example = """
    # PostgreSQL example (requires psycopg2)
    from sqlalchemy import create_engine
    
    # Connection URL
    postgres_url = 'postgresql://username:password@localhost:5432/database_name'
    engine = create_engine(postgres_url)
    
    # Read data
    df = pd.read_sql('SELECT * FROM table_name', engine)
    
    # Write data
    df.to_sql('new_table', engine, if_exists='replace', index=False)
    """
    
    # 3. SQL Server
    sqlserver_example = """
    # SQL Server example (requires pyodbc)
    from sqlalchemy import create_engine
    
    # Connection URL
    sqlserver_url = 'mssql+pyodbc://username:password@server/database?driver=ODBC+Driver+17+for+SQL+Server'
    engine = create_engine(sqlserver_url)
    
    # Read data
    df = pd.read_sql('SELECT * FROM table_name', engine)
    
    # Write data
    df.to_sql('new_table', engine, if_exists='replace', index=False)
    """
    
    print("1. MySQL:")
    print(mysql_example)
    
    print("\n2. PostgreSQL:")
    print(postgresql_example)
    
    print("\n3. SQL Server:")
    print(sqlserver_example)
    
    # 4. A generic database helper that works with any SQLAlchemy engine
    def database_operations(engine):
        """Generic round trip against any SQLAlchemy engine"""
        
        # Create sample data
        df = pd.DataFrame({
            'id': range(1, 6),
            'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
            'value': [100, 200, 300, 400, 500]
        })
        
        # Write
        df.to_sql('test_table', engine, if_exists='replace', index=False)
        
        # Read back with a filter
        result = pd.read_sql('SELECT * FROM test_table WHERE value > 200', engine)
        
        return result
    
    print("\n4. Generic database helper:")
    print("""
    def database_operations(engine):
        # Create sample data
        df = pd.DataFrame({
            'id': range(1, 6),
            'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
            'value': [100, 200, 300, 400, 500]
        })
        
        # Write
        df.to_sql('test_table', engine, if_exists='replace', index=False)
        
        # Read back with a filter
        result = pd.read_sql('SELECT * FROM test_table WHERE value > 200', engine)
        
        return result
    """)
    
    return mysql_example, postgresql_example, sqlserver_example

# Run the example
db_examples = database_connection_examples()
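
The generic helper above can be exercised without any database server by pointing SQLAlchemy at an in-memory SQLite database. A sketch, assuming the sqlalchemy package is installed:

import pandas as pd
from sqlalchemy import create_engine

# In-memory SQLite needs no server or credentials; handy for testing
# to_sql / read_sql code before swapping in a MySQL or PostgreSQL URL
engine = create_engine('sqlite:///:memory:')

df = pd.DataFrame({'id': [1, 2, 3], 'value': [100, 250, 400]})
df.to_sql('test_table', engine, if_exists='replace', index=False)

result = pd.read_sql('SELECT * FROM test_table WHERE value > 200', engine)
print(result)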

3.6 Other Formats

3.6.1 Parquet Files

def parquet_operations():
    """Parquet file examples (require pyarrow or fastparquet)"""
    
    # Create sample data
    large_df = pd.DataFrame({
        'id': range(10000),
        'category': np.random.choice(['A', 'B', 'C', 'D'], 10000),
        'value': np.random.randn(10000),
        'timestamp': pd.date_range('2024-01-01', periods=10000, freq='1min'),
        'flag': np.random.choice([True, False], 10000)
    })
    
    print(f"Sample data shape: {large_df.shape}")
    print(f"Data types:\n{large_df.dtypes}")
    print(f"First 5 rows:\n{large_df.head()}")
    
    # 1. Write a Parquet file
    print(f"\n1. Write a Parquet file:")
    large_df.to_parquet('data.parquet', index=False)
    print("Parquet file saved")
    
    # 2. Read it back
    print(f"\n2. Read the Parquet file:")
    df_parquet = pd.read_parquet('data.parquet')
    print(f"Shape of the data read back: {df_parquet.shape}")
    print(f"Data types:\n{df_parquet.dtypes}")
    
    # 3. Read selected columns only (cheap for a columnar format)
    print(f"\n3. Read selected columns:")
    df_selected = pd.read_parquet('data.parquet', columns=['id', 'category', 'value'])
    print(f"Shape after column selection: {df_selected.shape}")
    print(df_selected.head())
    
    # 4. Compression options
    print(f"\n4. Compression options:")
    import os
    # 'brotli' needs the brotli package in addition to the Parquet engine
    compression_options = ['snappy', 'gzip', 'brotli']
    
    for compression in compression_options:
        filename = f'data_{compression}.parquet'
        large_df.to_parquet(filename, compression=compression, index=False)
        
        # Check the file size
        size = os.path.getsize(filename)
        print(f"{compression}-compressed file size: {size:,} bytes")
    
    # 5. CSV vs. Parquet
    print(f"\n5. CSV vs. Parquet:")
    
    # Save as CSV
    large_df.to_csv('data.csv', index=False)
    
    # Compare file sizes
    csv_size = os.path.getsize('data.csv')
    parquet_size = os.path.getsize('data.parquet')
    
    print(f"CSV file size: {csv_size:,} bytes")
    print(f"Parquet file size: {parquet_size:,} bytes")
    print(f"Parquet size relative to CSV: {parquet_size/csv_size:.2%}")
    
    # Compare read speed
    import time
    
    # CSV read time
    start_time = time.time()
    df_csv = pd.read_csv('data.csv')
    csv_time = time.time() - start_time
    
    # Parquet read time
    start_time = time.time()
    df_parquet = pd.read_parquet('data.parquet')
    parquet_time = time.time() - start_time
    
    print(f"CSV read time: {csv_time:.3f} s")
    print(f"Parquet read time: {parquet_time:.3f} s")
    print(f"Parquet read speedup over CSV: {csv_time/parquet_time:.2f}x")
    
    return large_df

# Run the example
parquet_df = parquet_operations()
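
Parquet also supports partitioned datasets: partition_cols splits the data into one subdirectory per value, so later reads that filter on that column can skip whole directories. A sketch, assuming the pyarrow engine (sales_data is an arbitrary directory name):

import numpy as np
import pandas as pd

df = pd.DataFrame({
    'category': np.random.choice(['A', 'B'], 100),
    'value': np.random.randn(100)
})

# Writes sales_data/category=A/... and sales_data/category=B/...
df.to_parquet('sales_data', partition_cols=['category'], index=False)

# Reading the directory reassembles all partitions
df_back = pd.read_parquet('sales_data')
print(df_back['category'].value_counts())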

3.6.2 HDF5 Files

def hdf5_operations():
    """HDF5 file examples (require the tables/PyTables package)"""
    
    # Create sample data
    df1 = pd.DataFrame({
        'date': pd.date_range('2024-01-01', periods=100),
        'stock_A': np.random.randn(100).cumsum() + 100,
        'stock_B': np.random.randn(100).cumsum() + 50,
        'volume': np.random.randint(1000, 10000, 100)
    })
    
    df2 = pd.DataFrame({
        'employee_id': range(1, 51),
        'name': [f'Employee_{i}' for i in range(1, 51)],
        'department': np.random.choice(['IT', 'HR', 'Finance', 'Marketing'], 50),
        'salary': np.random.normal(60000, 15000, 50)
    })
    
    print("Stock data:")
    print(df1.head())
    print("\nEmployee data:")
    print(df2.head())
    
    # 1. Write to an HDF5 file
    print(f"\n1. Write the HDF5 file:")
    df1.to_hdf('data.h5', key='stocks', mode='w')
    df2.to_hdf('data.h5', key='employees', mode='a')
    print("HDF5 file saved")
    
    # 2. Read from the HDF5 file
    print(f"\n2. Read the HDF5 file:")
    stocks_df = pd.read_hdf('data.h5', key='stocks')
    employees_df = pd.read_hdf('data.h5', key='employees')
    
    print(f"Stock data shape: {stocks_df.shape}")
    print(f"Employee data shape: {employees_df.shape}")
    
    # 3. Inspect the HDF5 file
    print(f"\n3. HDF5 file info:")
    with pd.HDFStore('data.h5', mode='r') as store:
        print(f"Keys in the file: {list(store.keys())}")
        print(f"stocks info: {store.get_storer('stocks').shape}")
        print(f"employees info: {store.get_storer('employees').shape}")
    
    # 4. Conditional queries
    print(f"\n4. Conditional queries:")
    # Note: where-queries require format='table', and the queried columns
    # must be declared as data_columns
    df1.to_hdf('data_table.h5', key='stocks', mode='w', format='table',
               data_columns=['date'])
    
    # Query a specific date range
    query_result = pd.read_hdf('data_table.h5', key='stocks', 
                               where='date >= "2024-01-15" & date <= "2024-01-20"')
    print(f"Query result:")
    print(query_result)
    
    # 5. Append data
    print(f"\n5. Append data:")
    new_data = pd.DataFrame({
        'date': pd.date_range('2024-04-11', periods=10),
        'stock_A': np.random.randn(10).cumsum() + 100,
        'stock_B': np.random.randn(10).cumsum() + 50,
        'volume': np.random.randint(1000, 10000, 10)
    })
    
    new_data.to_hdf('data_table.h5', key='stocks', mode='a', format='table', append=True)
    
    # Verify the append
    updated_df = pd.read_hdf('data_table.h5', key='stocks')
    print(f"Shape after appending: {updated_df.shape}")
    print(f"Last rows:\n{updated_df.tail()}")
    
    return df1, df2

# Run the example
hdf5_data = hdf5_operations()
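
For table-format keys, HDFStore.select combines a where filter with column selection in a single call. A sketch against the data_table.h5 file written above (the date column was declared as a data column, which is what makes it queryable):

import pandas as pd

with pd.HDFStore('data_table.h5', mode='r') as store:
    subset = store.select('stocks',
                          where='date >= "2024-01-15"',
                          columns=['date', 'stock_A'])

print(subset.head())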

3.6.3 HTML and XML

def html_xml_operations():
    """HTML and XML examples"""
    
    # 1. HTML tables
    print("1. HTML tables:")
    
    # Create a sample DataFrame
    df = pd.DataFrame({
        'Product': ['Laptop', 'Mouse', 'Keyboard', 'Monitor'],
        'Price': [999.99, 29.99, 79.99, 299.99],
        'Stock': [15, 50, 30, 8],
        'Rating': [4.5, 4.2, 4.0, 4.8]
    })
    
    print("Original data:")
    print(df)
    
    # Render as HTML
    html_string = df.to_html(index=False, table_id='product_table', 
                             classes='table table-striped')
    
    # Build a complete HTML page
    full_html = f"""
    <!DOCTYPE html>
    <html>
    <head>
        <title>Product List</title>
        <style>
            .table {{ border-collapse: collapse; width: 100%; }}
            .table th, .table td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
            .table th {{ background-color: #f2f2f2; }}
            .table-striped tr:nth-child(even) {{ background-color: #f9f9f9; }}
        </style>
    </head>
    <body>
        <h1>Product Information</h1>
        {html_string}
    </body>
    </html>
    """
    
    with open('products.html', 'w', encoding='utf-8') as f:
        f.write(full_html)
    
    print("HTML file saved as products.html")
    
    # Read tables back from the HTML
    print(f"\nReading tables from HTML:")
    try:
        # Note: requires lxml or html5lib
        html_tables = pd.read_html('products.html')
        print(f"Found {len(html_tables)} table(s)")
        print("First table:")
        print(html_tables[0])
    except ImportError:
        print("Reading HTML tables requires lxml or html5lib")
        print("pip install lxml")
    
    # 2. XML
    print(f"\n2. XML example:")
    
    # Create XML data
    xml_data = """<?xml version="1.0" encoding="UTF-8"?>
    <products>
        <product id="1">
            <name>Laptop</name>
            <price>999.99</price>
            <stock>15</stock>
            <category>Electronics</category>
        </product>
        <product id="2">
            <name>Mouse</name>
            <price>29.99</price>
            <stock>50</stock>
            <category>Accessories</category>
        </product>
        <product id="3">
            <name>Keyboard</name>
            <price>79.99</price>
            <stock>30</stock>
            <category>Accessories</category>
        </product>
    </products>"""
    
    with open('products.xml', 'w', encoding='utf-8') as f:
        f.write(xml_data)
    
    print("XML file created")
    
    # Read the XML
    try:
        # read_xml is available in pandas 1.3.0+ (and needs lxml by default)
        xml_df = pd.read_xml('products.xml', xpath='.//product')
        print("Data read from XML:")
        print(xml_df)
    except (AttributeError, ImportError):
        print("read_xml requires pandas 1.3.0 or newer (plus lxml)")
        
        # Fall back to manual parsing with xml.etree.ElementTree
        import xml.etree.ElementTree as ET
        
        tree = ET.parse('products.xml')
        root = tree.getroot()
        
        products = []
        for product in root.findall('product'):
            products.append({
                'id': product.get('id'),
                'name': product.find('name').text,
                'price': float(product.find('price').text),
                'stock': int(product.find('stock').text),
                'category': product.find('category').text
            })
        
        xml_df = pd.DataFrame(products)
        print("Manually parsed XML data:")
        print(xml_df)
    
    # 3. Scraping tables from a web page
    print(f"\n3. Scraping tables from a web page:")
    
    # Create a sample page
    sample_webpage = """
    <!DOCTYPE html>
    <html>
    <head><title>Stock Data</title></head>
    <body>
        <h1>Stock Prices</h1>
        <table border="1">
            <tr><th>Ticker</th><th>Company</th><th>Price</th><th>Change</th></tr>
            <tr><td>AAPL</td><td>Apple Inc.</td><td>150.25</td><td>+2.5%</td></tr>
            <tr><td>GOOGL</td><td>Alphabet Inc.</td><td>2800.50</td><td>+1.2%</td></tr>
            <tr><td>MSFT</td><td>Microsoft Corp.</td><td>300.75</td><td>-0.8%</td></tr>
        </table>
        
        <h2>Financials</h2>
        <table border="1">
            <tr><th>Metric</th><th>Q1</th><th>Q2</th><th>Q3</th><th>Q4</th></tr>
            <tr><td>Revenue</td><td>1000</td><td>1200</td><td>1100</td><td>1300</td></tr>
            <tr><td>Profit</td><td>200</td><td>250</td><td>220</td><td>280</td></tr>
        </table>
    </body>
    </html>
    """
    
    with open('sample_webpage.html', 'w', encoding='utf-8') as f:
        f.write(sample_webpage)
    
    try:
        # Read every table on the page
        tables = pd.read_html('sample_webpage.html')
        
        print(f"Found {len(tables)} table(s)")
        
        for i, table in enumerate(tables):
            print(f"\nTable {i+1}:")
            print(table)
            
    except ImportError:
        print("Reading HTML tables requires the appropriate parser packages")
    
    return df, xml_df if 'xml_df' in locals() else None

# Run the example
html_xml_data = html_xml_operations()
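
The write-side counterpart of read_xml is to_xml (pandas 1.3.0+, requires lxml). A short sketch; root_name and row_name control the element tags, and catalog.xml is an arbitrary file name:

import pandas as pd

df = pd.DataFrame({
    'name': ['Laptop', 'Mouse'],
    'price': [999.99, 29.99]
})

# Produces <products><product><name>...</name>...</product></products>
df.to_xml('catalog.xml', index=False, root_name='products', row_name='product')

print(pd.read_xml('catalog.xml'))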

3.7 Performance Optimization and Best Practices

3.7.1 Optimizing Reads

def performance_optimization():
    """Techniques for faster reads"""
    
    # Create a large test dataset
    large_data = pd.DataFrame({
        'id': range(100000),
        'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], 100000),
        'value1': np.random.randn(100000),
        'value2': np.random.randn(100000),
        'date': pd.date_range('2020-01-01', periods=100000, freq='1min'),
        'text': [f'text_{i}' for i in range(100000)]
    })
    
    print(f"Test data shape: {large_data.shape}")
    
    # Save in both formats
    large_data.to_csv('large_data.csv', index=False)
    large_data.to_parquet('large_data.parquet', index=False)
    
    import os
    import time
    
    # 1. Dtype optimization
    print(f"\n1. Dtype optimization:")
    
    # Default read
    start_time = time.time()
    df_default = pd.read_csv('large_data.csv')
    default_time = time.time() - start_time
    default_memory = df_default.memory_usage(deep=True).sum()
    
    # Optimized dtypes
    dtypes = {
        'id': 'int32',
        'category': 'category',
        'value1': 'float32',
        'value2': 'float32',
        'text': 'category'
    }
    
    start_time = time.time()
    df_optimized = pd.read_csv('large_data.csv', dtype=dtypes, parse_dates=['date'])
    optimized_time = time.time() - start_time
    optimized_memory = df_optimized.memory_usage(deep=True).sum()
    
    print(f"Default read time: {default_time:.3f} s, memory: {default_memory/1024/1024:.1f} MB")
    print(f"Optimized read time: {optimized_time:.3f} s, memory: {optimized_memory/1024/1024:.1f} MB")
    print(f"Memory saved: {(1-optimized_memory/default_memory)*100:.1f}%")
    
    # 2. Chunked reading
    print(f"\n2. Chunked reading:")
    
    def process_chunk(chunk):
        """Example per-chunk processing"""
        return chunk[chunk['value1'] > 0].groupby('category')['value2'].mean()
    
    start_time = time.time()
    chunk_results = []
    for chunk in pd.read_csv('large_data.csv', chunksize=10000, dtype=dtypes):
        result = process_chunk(chunk)
        chunk_results.append(result)
    
    # Combine the per-chunk results (an unweighted mean of chunk means;
    # fine for a demo, not an exact global mean)
    final_result = pd.concat(chunk_results).groupby(level=0).mean()
    chunk_time = time.time() - start_time
    
    print(f"Chunked processing time: {chunk_time:.3f} s")
    print(f"Result:\n{final_result}")
    
    # 3. Column selection
    print(f"\n3. Column selection:")
    
    # Read only the columns you need
    start_time = time.time()
    df_selected = pd.read_csv('large_data.csv', usecols=['id', 'category', 'value1'])
    selected_time = time.time() - start_time
    
    print(f"Selected-column read time: {selected_time:.3f} s")
    print(f"Relative to a full read: {selected_time/default_time:.2%}")
    
    # 4. Format comparison
    print(f"\n4. Format performance comparison:")
    
    formats = {
        'CSV': 'large_data.csv',
        'Parquet': 'large_data.parquet'
    }
    
    for format_name, filename in formats.items():
        start_time = time.time()
        if format_name == 'CSV':
            df = pd.read_csv(filename)
        else:
            df = pd.read_parquet(filename)
        read_time = time.time() - start_time
        
        file_size = os.path.getsize(filename) / 1024 / 1024  # MB
        
        print(f"{format_name}: read time {read_time:.3f} s, file size {file_size:.1f} MB")
    
    return large_data

# Run the example
perf_data = performance_optimization()
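
One more knob worth knowing about: pandas 1.4+ can hand CSV parsing to pyarrow's multithreaded reader via engine='pyarrow'. Whether it wins depends on the file and the machine, so benchmark it against the default engine on your own data (this assumes the pyarrow package is installed):

import pandas as pd

# Multithreaded CSV parsing; note it supports fewer options than the
# default engine
df_arrow = pd.read_csv('large_data.csv', engine='pyarrow')
print(df_arrow.shape)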

3.7.2 Optimizing Writes

def writing_optimization():
    """Techniques for faster writes"""
    
    # Create test data
    test_data = pd.DataFrame({
        'id': range(50000),
        'category': np.random.choice(['A', 'B', 'C'], 50000),
        'value': np.random.randn(50000),
        'timestamp': pd.date_range('2024-01-01', periods=50000, freq='1min')
    })
    
    print(f"Test data shape: {test_data.shape}")
    
    import os
    import time
    
    # 1. CSV write optimization
    print(f"\n1. CSV write optimization:")
    
    # Default write
    start_time = time.time()
    test_data.to_csv('output_default.csv', index=False)
    default_time = time.time() - start_time
    
    # Write with an explicit date_format instead of the default
    # datetime rendering
    start_time = time.time()
    test_data.to_csv('output_optimized.csv', index=False, date_format='%Y-%m-%d %H:%M:%S')
    optimized_time = time.time() - start_time
    
    print(f"Default write time: {default_time:.3f} s")
    print(f"Explicit-date-format write time: {optimized_time:.3f} s")
    
    # 2. Format comparison
    print(f"\n2. Write performance by format:")
    
    formats = {
        'CSV': lambda df, filename: df.to_csv(filename, index=False),
        'Parquet': lambda df, filename: df.to_parquet(filename, index=False),
        'JSON': lambda df, filename: df.to_json(filename, orient='records'),
        'Pickle': lambda df, filename: df.to_pickle(filename)
    }
    
    for format_name, write_func in formats.items():
        filename = f'output_test.{format_name.lower()}'
        
        start_time = time.time()
        write_func(test_data, filename)
        write_time = time.time() - start_time
        
        file_size = os.path.getsize(filename) / 1024 / 1024  # MB
        
        print(f"{format_name}: write time {write_time:.3f} s, file size {file_size:.1f} MB")
    
    # 3. Batched writes
    print(f"\n3. Batched writes:")
    
    # Simulate data arriving in batches
    batch_size = 10000
    total_rows = 50000
    
    # Option 1: append each batch to the same file
    start_time = time.time()
    for i in range(0, total_rows, batch_size):
        batch_data = test_data.iloc[i:i+batch_size]
        mode = 'w' if i == 0 else 'a'
        header = i == 0
        batch_data.to_csv('batch_output.csv', mode=mode, header=header, index=False)
    batch_time = time.time() - start_time
    
    # Option 2: collect the batches, then write once
    start_time = time.time()
    all_batches = []
    for i in range(0, total_rows, batch_size):
        batch_data = test_data.iloc[i:i+batch_size]
        all_batches.append(batch_data)
    
    combined_data = pd.concat(all_batches, ignore_index=True)
    combined_data.to_csv('combined_output.csv', index=False)
    combined_time = time.time() - start_time
    
    print(f"Per-batch write time: {batch_time:.3f} s")
    print(f"Single combined write time: {combined_time:.3f} s")
    
    return test_data

# Run the example
write_data = writing_optimization()

3.8 Chapter Summary

3.8.1 Key Takeaways

  1. File format support

    • CSV: the most universal text format, easy to process and exchange
    • Excel: supports multiple sheets, common in business settings
    • JSON: the standard format for web data exchange
    • Parquet: high-performance columnar storage
    • HDF5: hierarchical format suited to large datasets
    • SQL: the standard interface to relational databases
  2. Read optimization

    • Specify dtypes to cut memory usage
    • Read only the columns you need
    • Process large files in chunks
    • Prefer efficient formats such as Parquet
  3. Write best practices

    • Choose a format that fits the data
    • Use compression options to save space
    • Batch operations for efficiency
    • Mind character encodings
  4. Database work

    • Use parameterized queries to prevent SQL injection
    • Use indexes sensibly to speed up queries
    • Batch operations to reduce connection overhead
3.8.2 Practical Pointers

  • Choose the storage format based on the data's characteristics
  • Keep memory limits in mind when reading large files
  • Use dtype optimization to reduce memory footprint
  • Back up important data regularly
  • Watch out for character encodings, especially with Chinese text

3.8.3 Performance Summary

In broad strokes, echoing the measurements earlier in this chapter: CSV offers the best compatibility, Parquet combines compact files with fast reads, and Pickle is extremely fast to read and write but only works between Python programs. Typical use cases by format:

Format     Typical use case
CSV        Data exchange, simple analysis
Excel      Business reports, multi-sheet data
JSON       Web applications, API data
Parquet    Big-data analytics and storage
HDF5       Scientific computing, time series
Pickle     Python-internal use

3.8.4 What's Next

In the next chapter we will cover:

  • Data cleaning techniques
  • Handling missing values
  • Dealing with duplicate data
  • Data type conversion
  • Outlier detection and treatment


Exercises

  1. Compare read/write performance across file formats
  2. Build a general-purpose data import/export tool
  3. Handle the encoding issues of a CSV file containing Chinese text
  4. Design a bulk database import scheme
  5. Optimize the reading and processing pipeline for a large file

Remember: choosing the right data format is key to efficient data analysis!