概述

本章将详细介绍TiDB的基础操作和SQL语法,包括数据库和表的管理、数据的增删改查、索引操作、事务处理等。TiDB高度兼容MySQL语法,同时提供了一些分布式数据库特有的功能。

学习目标

通过本章学习,您将了解: - TiDB的连接和基础操作 - 数据库和表的创建与管理 - 数据的增删改查操作 - 索引的创建和优化 - 事务处理和并发控制 - TiDB特有的SQL功能 - 性能监控和优化技巧

连接TiDB

1. 连接方式

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Any, Optional, Tuple, Union
import json
import time
import random
from datetime import datetime, timedelta

class ConnectionType(Enum):
    """连接类型"""
    MYSQL_CLIENT = "mysql_client"      # MySQL客户端
    PYTHON_PYMYSQL = "python_pymysql"  # Python PyMySQL
    JAVA_JDBC = "java_jdbc"            # Java JDBC
    GOLANG_DRIVER = "golang_driver"    # Go MySQL Driver
    NODEJS_MYSQL = "nodejs_mysql"      # Node.js MySQL

class SQLType(Enum):
    """SQL类型"""
    DDL = "ddl"    # 数据定义语言
    DML = "dml"    # 数据操作语言
    DQL = "dql"    # 数据查询语言
    DCL = "dcl"    # 数据控制语言
    TCL = "tcl"    # 事务控制语言

class IndexType(Enum):
    """索引类型"""
    PRIMARY = "primary"        # 主键索引
    UNIQUE = "unique"          # 唯一索引
    NORMAL = "normal"          # 普通索引
    FULLTEXT = "fulltext"      # 全文索引
    COMPOSITE = "composite"    # 复合索引

class TransactionLevel(Enum):
    """事务隔离级别"""
    READ_UNCOMMITTED = "READ UNCOMMITTED"
    READ_COMMITTED = "READ COMMITTED"
    REPEATABLE_READ = "REPEATABLE READ"
    SERIALIZABLE = "SERIALIZABLE"

@dataclass
class ConnectionConfig:
    """连接配置"""
    host: str
    port: int
    username: str
    password: str
    database: str
    charset: str
    connection_timeout: int
    read_timeout: int
    write_timeout: int
    max_connections: int
    ssl_config: Dict[str, Any]

@dataclass
class TableSchema:
    """表结构"""
    table_name: str
    columns: List[Dict[str, Any]]
    primary_key: List[str]
    indexes: List[Dict[str, Any]]
    foreign_keys: List[Dict[str, Any]]
    table_options: Dict[str, Any]
    comment: str

@dataclass
class QueryResult:
    """查询结果"""
    sql: str
    execution_time_ms: float
    rows_affected: int
    rows_returned: int
    columns: List[str]
    data: List[Dict[str, Any]]
    explain_plan: Optional[Dict[str, Any]]
    warnings: List[str]
    errors: List[str]

@dataclass
class TransactionInfo:
    """事务信息"""
    transaction_id: str
    start_time: datetime
    isolation_level: TransactionLevel
    read_only: bool
    statements: List[str]
    status: str
    duration_ms: float

class TiDBOperationManager:
    """TiDB操作管理器"""
    
    def __init__(self):
        self.connection_configs = self._initialize_connection_configs()
        self.sample_schemas = self._initialize_sample_schemas()
        self.query_history = []
        self.transaction_history = []
    
    def _initialize_connection_configs(self) -> Dict[ConnectionType, Dict[str, Any]]:
        """初始化连接配置"""
        configs = {}
        
        # MySQL客户端连接
        configs[ConnectionType.MYSQL_CLIENT] = {
            "command": "mysql -h {host} -P {port} -u {username} -p{password} {database}",
            "example": "mysql -h 192.168.1.10 -P 4000 -u root -p test_db",
            "features": ["交互式操作", "脚本执行", "批量导入", "结果导出"],
            "use_cases": ["日常管理", "数据迁移", "脚本执行", "问题排查"]
        }
        
        # Python PyMySQL连接
        configs[ConnectionType.PYTHON_PYMYSQL] = {
            "code_example": """
import pymysql

connection = pymysql.connect(
    host='192.168.1.10',
    port=4000,
    user='root',
    password='password',
    database='test_db',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor
)

try:
    with connection.cursor() as cursor:
        cursor.execute("SELECT VERSION()")
        result = cursor.fetchone()
        print(result)
finally:
    connection.close()
""",
            "features": ["连接池支持", "事务管理", "参数绑定", "异常处理"],
            "use_cases": ["Web应用", "数据分析", "ETL处理", "自动化脚本"]
        }
        
        # Java JDBC连接
        configs[ConnectionType.JAVA_JDBC] = {
            "code_example": """
import java.sql.*;

public class TiDBConnection {
    public static void main(String[] args) {
        String url = "jdbc:mysql://192.168.1.10:4000/test_db";
        String username = "root";
        String password = "password";
        
        try (Connection conn = DriverManager.getConnection(url, username, password)) {
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery("SELECT VERSION()");
            
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
""",
            "features": ["连接池", "预编译语句", "批处理", "事务控制"],
            "use_cases": ["企业应用", "微服务", "大数据处理", "报表系统"]
        }
        
        # Go MySQL Driver连接
        configs[ConnectionType.GOLANG_DRIVER] = {
            "code_example": """
package main

import (
    "database/sql"
    "fmt"
    "log"
    
    _ "github.com/go-sql-driver/mysql"
)

func main() {
    dsn := "root:password@tcp(192.168.1.10:4000)/test_db?charset=utf8mb4&parseTime=True&loc=Local"
    
    db, err := sql.Open("mysql", dsn)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()
    
    var version string
    err = db.QueryRow("SELECT VERSION()").Scan(&version)
    if err != nil {
        log.Fatal(err)
    }
    
    fmt.Println("TiDB Version:", version)
}
""",
            "features": ["高性能", "并发安全", "连接池", "上下文支持"],
            "use_cases": ["高性能服务", "云原生应用", "API服务", "数据处理"]
        }
        
        return configs
    
    def _initialize_sample_schemas(self) -> Dict[str, TableSchema]:
        """初始化示例表结构"""
        schemas = {}
        
        # 用户表
        schemas["users"] = TableSchema(
            table_name="users",
            columns=[
                {"name": "id", "type": "BIGINT", "nullable": False, "auto_increment": True, "comment": "用户ID"},
                {"name": "username", "type": "VARCHAR(50)", "nullable": False, "unique": True, "comment": "用户名"},
                {"name": "email", "type": "VARCHAR(100)", "nullable": False, "unique": True, "comment": "邮箱"},
                {"name": "password_hash", "type": "VARCHAR(255)", "nullable": False, "comment": "密码哈希"},
                {"name": "full_name", "type": "VARCHAR(100)", "nullable": True, "comment": "全名"},
                {"name": "phone", "type": "VARCHAR(20)", "nullable": True, "comment": "电话"},
                {"name": "status", "type": "ENUM('active','inactive','suspended')", "nullable": False, "default": "'active'", "comment": "状态"},
                {"name": "created_at", "type": "TIMESTAMP", "nullable": False, "default": "CURRENT_TIMESTAMP", "comment": "创建时间"},
                {"name": "updated_at", "type": "TIMESTAMP", "nullable": False, "default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP", "comment": "更新时间"}
            ],
            primary_key=["id"],
            indexes=[
                {"name": "idx_username", "columns": ["username"], "type": "UNIQUE"},
                {"name": "idx_email", "columns": ["email"], "type": "UNIQUE"},
                {"name": "idx_status", "columns": ["status"], "type": "NORMAL"},
                {"name": "idx_created_at", "columns": ["created_at"], "type": "NORMAL"}
            ],
            foreign_keys=[],
            table_options={"engine": "InnoDB", "charset": "utf8mb4", "collate": "utf8mb4_unicode_ci"},
            comment="用户信息表"
        )
        
        # 订单表
        schemas["orders"] = TableSchema(
            table_name="orders",
            columns=[
                {"name": "id", "type": "BIGINT", "nullable": False, "auto_increment": True, "comment": "订单ID"},
                {"name": "user_id", "type": "BIGINT", "nullable": False, "comment": "用户ID"},
                {"name": "order_no", "type": "VARCHAR(32)", "nullable": False, "unique": True, "comment": "订单号"},
                {"name": "total_amount", "type": "DECIMAL(10,2)", "nullable": False, "comment": "总金额"},
                {"name": "status", "type": "ENUM('pending','paid','shipped','delivered','cancelled')", "nullable": False, "default": "'pending'", "comment": "订单状态"},
                {"name": "payment_method", "type": "VARCHAR(20)", "nullable": True, "comment": "支付方式"},
                {"name": "shipping_address", "type": "TEXT", "nullable": True, "comment": "收货地址"},
                {"name": "order_date", "type": "TIMESTAMP", "nullable": False, "default": "CURRENT_TIMESTAMP", "comment": "下单时间"},
                {"name": "updated_at", "type": "TIMESTAMP", "nullable": False, "default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP", "comment": "更新时间"}
            ],
            primary_key=["id"],
            indexes=[
                {"name": "idx_user_id", "columns": ["user_id"], "type": "NORMAL"},
                {"name": "idx_order_no", "columns": ["order_no"], "type": "UNIQUE"},
                {"name": "idx_status", "columns": ["status"], "type": "NORMAL"},
                {"name": "idx_order_date", "columns": ["order_date"], "type": "NORMAL"},
                {"name": "idx_user_status", "columns": ["user_id", "status"], "type": "COMPOSITE"}
            ],
            foreign_keys=[
                {"name": "fk_orders_user_id", "columns": ["user_id"], "ref_table": "users", "ref_columns": ["id"]}
            ],
            table_options={"engine": "InnoDB", "charset": "utf8mb4", "collate": "utf8mb4_unicode_ci"},
            comment="订单信息表"
        )
        
        # 产品表
        schemas["products"] = TableSchema(
            table_name="products",
            columns=[
                {"name": "id", "type": "BIGINT", "nullable": False, "auto_increment": True, "comment": "产品ID"},
                {"name": "name", "type": "VARCHAR(200)", "nullable": False, "comment": "产品名称"},
                {"name": "description", "type": "TEXT", "nullable": True, "comment": "产品描述"},
                {"name": "price", "type": "DECIMAL(10,2)", "nullable": False, "comment": "价格"},
                {"name": "stock_quantity", "type": "INT", "nullable": False, "default": "0", "comment": "库存数量"},
                {"name": "category_id", "type": "INT", "nullable": False, "comment": "分类ID"},
                {"name": "brand", "type": "VARCHAR(100)", "nullable": True, "comment": "品牌"},
                {"name": "sku", "type": "VARCHAR(50)", "nullable": False, "unique": True, "comment": "SKU"},
                {"name": "weight", "type": "DECIMAL(8,3)", "nullable": True, "comment": "重量(kg)"},
                {"name": "is_active", "type": "BOOLEAN", "nullable": False, "default": "TRUE", "comment": "是否激活"},
                {"name": "created_at", "type": "TIMESTAMP", "nullable": False, "default": "CURRENT_TIMESTAMP", "comment": "创建时间"},
                {"name": "updated_at", "type": "TIMESTAMP", "nullable": False, "default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP", "comment": "更新时间"}
            ],
            primary_key=["id"],
            indexes=[
                {"name": "idx_name", "columns": ["name"], "type": "NORMAL"},
                {"name": "idx_sku", "columns": ["sku"], "type": "UNIQUE"},
                {"name": "idx_category_id", "columns": ["category_id"], "type": "NORMAL"},
                {"name": "idx_brand", "columns": ["brand"], "type": "NORMAL"},
                {"name": "idx_price", "columns": ["price"], "type": "NORMAL"},
                {"name": "idx_stock", "columns": ["stock_quantity"], "type": "NORMAL"},
                {"name": "idx_category_brand", "columns": ["category_id", "brand"], "type": "COMPOSITE"}
            ],
            foreign_keys=[],
            table_options={"engine": "InnoDB", "charset": "utf8mb4", "collate": "utf8mb4_unicode_ci"},
            comment="产品信息表"
        )
        
        return schemas
    
    def generate_connection_examples(self, connection_type: ConnectionType) -> Dict[str, Any]:
        """生成连接示例"""
        config = self.connection_configs.get(connection_type)
        if not config:
            return {"error": f"不支持的连接类型: {connection_type}"}
        
        example = {
            "connection_type": connection_type.value,
            "description": f"使用{connection_type.value}连接TiDB",
            "features": config.get("features", []),
            "use_cases": config.get("use_cases", []),
            "example_code": config.get("code_example", config.get("command", "")),
            "best_practices": [
                "使用连接池管理连接",
                "设置合适的超时时间",
                "正确处理异常和错误",
                "及时关闭连接释放资源",
                "使用参数绑定防止SQL注入"
            ],
            "common_issues": [
                "连接超时",
                "字符编码问题",
                "SSL连接配置",
                "连接池配置不当",
                "事务隔离级别设置"
            ]
        }
        
        return example
    
    def generate_create_table_sql(self, schema: TableSchema) -> str:
        """生成创建表的SQL"""
        sql_parts = []
        sql_parts.append(f"CREATE TABLE `{schema.table_name}` (")
        
        # 添加列定义
        column_definitions = []
        for col in schema.columns:
            col_def = f"  `{col['name']}` {col['type']}"
            
            if not col.get('nullable', True):
                col_def += " NOT NULL"
            
            if col.get('auto_increment', False):
                col_def += " AUTO_INCREMENT"
            
            if 'default' in col:
                col_def += f" DEFAULT {col['default']}"
            
            if 'comment' in col:
                col_def += f" COMMENT '{col['comment']}'"
            
            column_definitions.append(col_def)
        
        sql_parts.append(",\n".join(column_definitions))
        
        # 添加主键
        if schema.primary_key:
            pk_cols = ", ".join([f"`{col}`" for col in schema.primary_key])
            sql_parts.append(f",\n  PRIMARY KEY ({pk_cols})")
        
        # 添加索引
        for idx in schema.indexes:
            idx_cols = ", ".join([f"`{col}`" for col in idx['columns']])
            if idx['type'] == 'UNIQUE':
                sql_parts.append(f",\n  UNIQUE KEY `{idx['name']}` ({idx_cols})")
            elif idx['type'] in ['NORMAL', 'COMPOSITE']:
                sql_parts.append(f",\n  KEY `{idx['name']}` ({idx_cols})")
        
        # 添加外键
        for fk in schema.foreign_keys:
            fk_cols = ", ".join([f"`{col}`" for col in fk['columns']])
            ref_cols = ", ".join([f"`{col}`" for col in fk['ref_columns']])
            sql_parts.append(f",\n  CONSTRAINT `{fk['name']}` FOREIGN KEY ({fk_cols}) REFERENCES `{fk['ref_table']}` ({ref_cols})")
        
        sql_parts.append("\n)")
        
        # 添加表选项
        options = []
        for key, value in schema.table_options.items():
            if key == 'engine':
                options.append(f"ENGINE={value}")
            elif key == 'charset':
                options.append(f"DEFAULT CHARSET={value}")
            elif key == 'collate':
                options.append(f"COLLATE={value}")
        
        if options:
            sql_parts.append(" " + " ".join(options))
        
        if schema.comment:
            sql_parts.append(f" COMMENT='{schema.comment}'")
        
        sql_parts.append(";")
        
        return "".join(sql_parts)
    
    def generate_sample_data_sql(self, table_name: str, count: int = 10) -> List[str]:
        """生成示例数据SQL"""
        if table_name not in self.sample_schemas:
            return [f"-- 未找到表 {table_name} 的结构定义"]
        
        schema = self.sample_schemas[table_name]
        insert_sqls = []
        
        if table_name == "users":
            for i in range(1, count + 1):
                sql = f"""
INSERT INTO users (username, email, password_hash, full_name, phone, status) VALUES
('user{i:03d}', 'user{i:03d}@example.com', 'hash_{i:03d}', 'User {i:03d}', '138{i:08d}', 'active');"""
                insert_sqls.append(sql.strip())
        
        elif table_name == "products":
            categories = [1, 2, 3, 4, 5]
            brands = ['Apple', 'Samsung', 'Huawei', 'Xiaomi', 'OPPO']
            
            for i in range(1, count + 1):
                category = random.choice(categories)
                brand = random.choice(brands)
                price = round(random.uniform(99.99, 9999.99), 2)
                stock = random.randint(0, 1000)
                
                sql = f"""
INSERT INTO products (name, description, price, stock_quantity, category_id, brand, sku, weight, is_active) VALUES
('Product {i:03d}', 'Description for product {i:03d}', {price}, {stock}, {category}, '{brand}', 'SKU{i:06d}', {random.uniform(0.1, 5.0):.3f}, TRUE);"""
                insert_sqls.append(sql.strip())
        
        elif table_name == "orders":
            statuses = ['pending', 'paid', 'shipped', 'delivered']
            payment_methods = ['credit_card', 'alipay', 'wechat_pay', 'bank_transfer']
            
            for i in range(1, count + 1):
                user_id = random.randint(1, min(count, 10))
                status = random.choice(statuses)
                payment_method = random.choice(payment_methods)
                total_amount = round(random.uniform(50.00, 5000.00), 2)
                
                sql = f"""
INSERT INTO orders (user_id, order_no, total_amount, status, payment_method, shipping_address) VALUES
({user_id}, 'ORD{i:010d}', {total_amount}, '{status}', '{payment_method}', 'Address {i:03d}, City, Province, Country');"""
                insert_sqls.append(sql.strip())
        
        return insert_sqls
    
    def generate_query_examples(self, table_name: str) -> List[Dict[str, Any]]:
        """生成查询示例"""
        examples = []
        
        if table_name == "users":
            examples = [
                {
                    "title": "查询所有活跃用户",
                    "sql": "SELECT id, username, email, full_name, created_at FROM users WHERE status = 'active' ORDER BY created_at DESC;",
                    "description": "获取所有状态为活跃的用户信息",
                    "use_case": "用户管理"
                },
                {
                    "title": "按用户名搜索",
                    "sql": "SELECT * FROM users WHERE username LIKE '%user%' LIMIT 10;",
                    "description": "模糊搜索用户名包含'user'的用户",
                    "use_case": "用户搜索"
                },
                {
                    "title": "统计用户状态分布",
                    "sql": "SELECT status, COUNT(*) as count FROM users GROUP BY status;",
                    "description": "统计不同状态的用户数量",
                    "use_case": "数据分析"
                },
                {
                    "title": "查询最近注册的用户",
                    "sql": "SELECT username, email, created_at FROM users WHERE created_at >= DATE_SUB(NOW(), INTERVAL 7 DAY) ORDER BY created_at DESC;",
                    "description": "查询最近7天注册的用户",
                    "use_case": "用户增长分析"
                }
            ]
        
        elif table_name == "products":
            examples = [
                {
                    "title": "查询热销产品",
                    "sql": "SELECT name, price, stock_quantity, brand FROM products WHERE is_active = TRUE AND stock_quantity > 0 ORDER BY stock_quantity DESC LIMIT 20;",
                    "description": "查询库存充足的活跃产品",
                    "use_case": "产品推荐"
                },
                {
                    "title": "按价格区间查询",
                    "sql": "SELECT name, price, brand FROM products WHERE price BETWEEN 100 AND 1000 AND is_active = TRUE ORDER BY price;",
                    "description": "查询价格在100-1000之间的产品",
                    "use_case": "价格筛选"
                },
                {
                    "title": "品牌销售统计",
                    "sql": "SELECT brand, COUNT(*) as product_count, AVG(price) as avg_price, SUM(stock_quantity) as total_stock FROM products WHERE is_active = TRUE GROUP BY brand ORDER BY product_count DESC;",
                    "description": "统计各品牌的产品数量、平均价格和总库存",
                    "use_case": "品牌分析"
                },
                {
                    "title": "库存预警查询",
                    "sql": "SELECT name, sku, stock_quantity, price FROM products WHERE stock_quantity < 10 AND is_active = TRUE ORDER BY stock_quantity;",
                    "description": "查询库存不足10的产品",
                    "use_case": "库存管理"
                }
            ]
        
        elif table_name == "orders":
            examples = [
                {
                    "title": "查询用户订单",
                    "sql": "SELECT o.order_no, o.total_amount, o.status, o.order_date, u.username FROM orders o JOIN users u ON o.user_id = u.id WHERE u.username = 'user001' ORDER BY o.order_date DESC;",
                    "description": "查询指定用户的所有订单",
                    "use_case": "用户订单管理"
                },
                {
                    "title": "订单状态统计",
                    "sql": "SELECT status, COUNT(*) as order_count, SUM(total_amount) as total_revenue FROM orders GROUP BY status;",
                    "description": "统计不同状态订单的数量和总金额",
                    "use_case": "订单分析"
                },
                {
                    "title": "日销售额统计",
                    "sql": "SELECT DATE(order_date) as order_day, COUNT(*) as order_count, SUM(total_amount) as daily_revenue FROM orders WHERE status IN ('paid', 'shipped', 'delivered') GROUP BY DATE(order_date) ORDER BY order_day DESC LIMIT 30;",
                    "description": "统计最近30天的日销售额",
                    "use_case": "销售分析"
                },
                {
                    "title": "大额订单查询",
                    "sql": "SELECT order_no, user_id, total_amount, status, order_date FROM orders WHERE total_amount > 1000 ORDER BY total_amount DESC;",
                    "description": "查询金额超过1000的大额订单",
                    "use_case": "风险控制"
                }
            ]
        
        return examples
    
    def simulate_query_execution(self, sql: str, table_name: str = None) -> QueryResult:
        """模拟查询执行"""
        start_time = time.time()
        
        # 模拟执行时间
        execution_time = random.uniform(0.5, 50.0)
        time.sleep(0.01)  # 模拟延迟
        
        # 分析SQL类型
        sql_upper = sql.upper().strip()
        if sql_upper.startswith('SELECT'):
            sql_type = SQLType.DQL
            rows_affected = 0
            rows_returned = random.randint(1, 100)
        elif sql_upper.startswith(('INSERT', 'UPDATE', 'DELETE')):
            sql_type = SQLType.DML
            rows_affected = random.randint(1, 10)
            rows_returned = 0
        elif sql_upper.startswith(('CREATE', 'ALTER', 'DROP')):
            sql_type = SQLType.DDL
            rows_affected = 0
            rows_returned = 0
        else:
            sql_type = SQLType.DCL
            rows_affected = 0
            rows_returned = 0
        
        # 生成模拟数据
        columns = []
        data = []
        
        if sql_type == SQLType.DQL and table_name:
            if table_name == "users":
                columns = ["id", "username", "email", "full_name", "status", "created_at"]
                for i in range(min(rows_returned, 10)):
                    data.append({
                        "id": i + 1,
                        "username": f"user{i+1:03d}",
                        "email": f"user{i+1:03d}@example.com",
                        "full_name": f"User {i+1:03d}",
                        "status": "active",
                        "created_at": datetime.now() - timedelta(days=random.randint(1, 365))
                    })
            elif table_name == "products":
                columns = ["id", "name", "price", "stock_quantity", "brand", "is_active"]
                brands = ['Apple', 'Samsung', 'Huawei', 'Xiaomi', 'OPPO']
                for i in range(min(rows_returned, 10)):
                    data.append({
                        "id": i + 1,
                        "name": f"Product {i+1:03d}",
                        "price": round(random.uniform(99.99, 9999.99), 2),
                        "stock_quantity": random.randint(0, 1000),
                        "brand": random.choice(brands),
                        "is_active": True
                    })
            elif table_name == "orders":
                columns = ["id", "order_no", "user_id", "total_amount", "status", "order_date"]
                statuses = ['pending', 'paid', 'shipped', 'delivered']
                for i in range(min(rows_returned, 10)):
                    data.append({
                        "id": i + 1,
                        "order_no": f"ORD{i+1:010d}",
                        "user_id": random.randint(1, 100),
                        "total_amount": round(random.uniform(50.00, 5000.00), 2),
                        "status": random.choice(statuses),
                        "order_date": datetime.now() - timedelta(days=random.randint(1, 30))
                    })
        
        # 生成执行计划
        explain_plan = None
        if sql_type == SQLType.DQL:
            explain_plan = {
                "id": 1,
                "select_type": "SIMPLE",
                "table": table_name or "unknown",
                "type": "ALL" if "WHERE" not in sql_upper else "range",
                "possible_keys": ["PRIMARY", "idx_status"] if table_name else [],
                "key": "idx_status" if "WHERE" in sql_upper else None,
                "rows": rows_returned,
                "extra": "Using where" if "WHERE" in sql_upper else ""
            }
        
        # 生成警告和错误
        warnings = []
        errors = []
        
        if execution_time > 30:
            warnings.append("查询执行时间较长,建议优化")
        
        if rows_returned > 1000:
            warnings.append("返回行数较多,建议添加LIMIT限制")
        
        result = QueryResult(
            sql=sql,
            execution_time_ms=execution_time,
            rows_affected=rows_affected,
            rows_returned=rows_returned,
            columns=columns,
            data=data,
            explain_plan=explain_plan,
            warnings=warnings,
            errors=errors
        )
        
        self.query_history.append(result)
        return result
    
    def generate_index_optimization_suggestions(self, table_name: str, 
                                              query_patterns: List[str]) -> List[Dict[str, Any]]:
        """生成索引优化建议"""
        suggestions = []
        
        if table_name not in self.sample_schemas:
            return [{"error": f"未找到表 {table_name} 的结构定义"}]
        
        schema = self.sample_schemas[table_name]
        existing_indexes = {idx['name']: idx['columns'] for idx in schema.indexes}
        
        # 分析查询模式
        for i, pattern in enumerate(query_patterns):
            pattern_upper = pattern.upper()
            suggestion = {
                "query_pattern": pattern,
                "analysis": [],
                "recommendations": [],
                "estimated_improvement": "未知"
            }
            
            # 分析WHERE条件
            if "WHERE" in pattern_upper:
                where_part = pattern_upper.split("WHERE")[1].split("ORDER")[0].split("GROUP")[0]
                
                # 检查常见的查询条件
                if "STATUS" in where_part:
                    if "idx_status" in existing_indexes:
                        suggestion["analysis"].append("已存在status字段索引")
                    else:
                        suggestion["recommendations"].append({
                            "type": "CREATE_INDEX",
                            "sql": f"CREATE INDEX idx_status ON {table_name} (status);",
                            "reason": "status字段经常用于WHERE条件"
                        })
                
                if "USER_ID" in where_part:
                    if "idx_user_id" in existing_indexes:
                        suggestion["analysis"].append("已存在user_id字段索引")
                    else:
                        suggestion["recommendations"].append({
                            "type": "CREATE_INDEX",
                            "sql": f"CREATE INDEX idx_user_id ON {table_name} (user_id);",
                            "reason": "user_id字段经常用于WHERE条件和JOIN"
                        })
                
                if "CREATED_AT" in where_part or "ORDER_DATE" in where_part:
                    date_field = "created_at" if "CREATED_AT" in where_part else "order_date"
                    idx_name = f"idx_{date_field}"
                    if idx_name in existing_indexes:
                        suggestion["analysis"].append(f"已存在{date_field}字段索引")
                    else:
                        suggestion["recommendations"].append({
                            "type": "CREATE_INDEX",
                            "sql": f"CREATE INDEX {idx_name} ON {table_name} ({date_field});",
                            "reason": f"{date_field}字段经常用于时间范围查询"
                        })
            
            # 分析ORDER BY
            if "ORDER BY" in pattern_upper:
                order_part = pattern_upper.split("ORDER BY")[1].split("LIMIT")[0].strip()
                order_fields = [field.strip().split()[0] for field in order_part.split(",")]
                
                for field in order_fields:
                    field_clean = field.replace("`", "").lower()
                    idx_name = f"idx_{field_clean}"
                    if idx_name not in existing_indexes:
                        suggestion["recommendations"].append({
                            "type": "CREATE_INDEX",
                            "sql": f"CREATE INDEX {idx_name} ON {table_name} ({field_clean});",
                            "reason": f"{field_clean}字段用于排序,索引可以避免filesort"
                        })
            
            # 分析JOIN条件
            if "JOIN" in pattern_upper:
                # 简化的JOIN分析
                if table_name == "orders" and "users" in pattern_upper:
                    if "idx_user_id" not in existing_indexes:
                        suggestion["recommendations"].append({
                            "type": "CREATE_INDEX",
                            "sql": f"CREATE INDEX idx_user_id ON {table_name} (user_id);",
                            "reason": "user_id字段用于JOIN操作"
                        })
            
            # 估算性能提升
            if suggestion["recommendations"]:
                improvement_score = len(suggestion["recommendations"]) * 20
                suggestion["estimated_improvement"] = f"查询性能预计提升{min(improvement_score, 80)}%"
            else:
                suggestion["estimated_improvement"] = "当前索引配置已较优"
            
            suggestions.append(suggestion)
        
        return suggestions
    
    def generate_transaction_example(self, scenario: str) -> Dict[str, Any]:
        """生成事务示例"""
        examples = {
            "transfer": {
                "title": "银行转账事务",
                "description": "模拟银行账户间转账操作,确保数据一致性",
                "sql_statements": [
                    "START TRANSACTION;",
                    "-- 检查转出账户余额",
                    "SELECT balance FROM accounts WHERE account_id = 'A001' FOR UPDATE;",
                    "-- 检查余额是否充足",
                    "-- 如果余额充足,执行转账",
                    "UPDATE accounts SET balance = balance - 1000 WHERE account_id = 'A001';",
                    "UPDATE accounts SET balance = balance + 1000 WHERE account_id = 'A002';",
                    "-- 记录转账日志",
                    "INSERT INTO transfer_log (from_account, to_account, amount, transfer_time) VALUES ('A001', 'A002', 1000, NOW());",
                    "COMMIT;"
                ],
                "isolation_level": TransactionLevel.READ_COMMITTED,
                "key_points": [
                    "使用FOR UPDATE锁定读取",
                    "检查业务逻辑约束",
                    "原子性操作保证",
                    "异常时自动回滚"
                ]
            },
            "order_processing": {
                "title": "订单处理事务",
                "description": "处理订单创建、库存扣减、支付记录等操作",
                "sql_statements": [
                    "START TRANSACTION;",
                    "-- 创建订单",
                    "INSERT INTO orders (user_id, order_no, total_amount, status) VALUES (1, 'ORD2023001', 299.99, 'pending');",
                    "SET @order_id = LAST_INSERT_ID();",
                    "-- 添加订单明细",
                    "INSERT INTO order_items (order_id, product_id, quantity, price) VALUES (@order_id, 101, 2, 149.99);",
                    "-- 扣减库存",
                    "UPDATE products SET stock_quantity = stock_quantity - 2 WHERE id = 101 AND stock_quantity >= 2;",
                    "-- 检查库存扣减是否成功",
                    "SELECT ROW_COUNT() as affected_rows;",
                    "-- 如果库存不足,回滚事务",
                    "-- 更新订单状态",
                    "UPDATE orders SET status = 'confirmed' WHERE id = @order_id;",
                    "COMMIT;"
                ],
                "isolation_level": TransactionLevel.REPEATABLE_READ,
                "key_points": [
                    "多表操作的一致性",
                    "库存并发控制",
                    "业务状态管理",
                    "错误处理机制"
                ]
            },
            "batch_update": {
                "title": "批量更新事务",
                "description": "批量更新用户积分和等级",
                "sql_statements": [
                    "START TRANSACTION;",
                    "-- 批量更新用户积分",
                    "UPDATE users SET points = points + 100 WHERE status = 'active' AND last_login_date >= DATE_SUB(NOW(), INTERVAL 30 DAY);",
                    "-- 根据积分更新用户等级",
                    "UPDATE users SET level = CASE ",
                    "  WHEN points >= 10000 THEN 'platinum'",
                    "  WHEN points >= 5000 THEN 'gold'",
                    "  WHEN points >= 1000 THEN 'silver'",
                    "  ELSE 'bronze'",
                    "END WHERE status = 'active';",
                    "-- 记录积分变更日志",
                    "INSERT INTO point_log (user_id, point_change, change_reason, change_time) ",
                    "SELECT id, 100, 'monthly_bonus', NOW() FROM users ",
                    "WHERE status = 'active' AND last_login_date >= DATE_SUB(NOW(), INTERVAL 30 DAY);",
                    "COMMIT;"
                ],
                "isolation_level": TransactionLevel.READ_COMMITTED,
                "key_points": [
                    "批量操作效率",
                    "条件更新逻辑",
                    "审计日志记录",
                    "事务大小控制"
                ]
            }
        }
        
        example = examples.get(scenario)
        if not example:
            return {"error": f"未找到场景 {scenario} 的事务示例"}
        
        # 添加通用的事务管理建议
        example["best_practices"] = [
            "保持事务尽可能短小",
            "避免在事务中执行长时间操作",
            "合理设置事务隔离级别",
            "及时处理死锁和超时",
            "使用适当的锁策略",
            "监控事务执行时间"
        ]
        
        example["common_issues"] = [
            "死锁检测和处理",
            "长事务导致的锁等待",
            "事务隔离级别选择",
            "并发更新冲突",
            "事务日志空间管理"
        ]
        
        return example

# TiDB基础操作演示
print("\n\n=== TiDB基础操作与SQL语法 ===")

op_manager = TiDBOperationManager()

print("\n1. 连接方式示例:")
for conn_type in [ConnectionType.MYSQL_CLIENT, ConnectionType.PYTHON_PYMYSQL, ConnectionType.JAVA_JDBC]:
    example = op_manager.generate_connection_examples(conn_type)
    print(f"\n   {example['description']}:")
    print(f"     特性: {', '.join(example['features'][:3])}")
    print(f"     适用场景: {', '.join(example['use_cases'][:3])}")
    if 'command' in example['example_code']:
        print(f"     连接命令: {example['example_code']}")
    else:
        print(f"     代码示例: {len(example['example_code'].split())}行代码")

print("\n2. 表结构设计:")
for table_name, schema in list(op_manager.sample_schemas.items())[:3]:
    print(f"\n   {table_name}表:")
    print(f"     列数: {len(schema.columns)}")
    print(f"     索引数: {len(schema.indexes)}")
    print(f"     主键: {', '.join(schema.primary_key)}")
    print(f"     注释: {schema.comment}")
    
    print(f"\n     主要列:")
    for col in schema.columns[:5]:
        nullable = "NULL" if col.get('nullable', True) else "NOT NULL"
        print(f"       {col['name']}: {col['type']} {nullable} - {col.get('comment', '')}")

print("\n3. 创建表SQL:")
for table_name in ['users', 'products']:
    schema = op_manager.sample_schemas[table_name]
    create_sql = op_manager.generate_create_table_sql(schema)
    lines = create_sql.split('\n')
    print(f"\n   {table_name}表创建SQL:")
    print(f"     SQL长度: {len(create_sql)}字符")
    print(f"     SQL行数: {len(lines)}行")
    print("\n     SQL预览:")
    for line in lines[:8]:
        if line.strip():
            print(f"       {line}")
    if len(lines) > 8:
        print(f"       ... (还有{len(lines)-8}行)")

print("\n4. 示例数据生成:")
for table_name in ['users', 'products', 'orders']:
    sample_sqls = op_manager.generate_sample_data_sql(table_name, 5)
    print(f"\n   {table_name}表示例数据:")
    print(f"     生成SQL数量: {len(sample_sqls)}条")
    if sample_sqls:
        print(f"     示例SQL: {sample_sqls[0][:100]}...")

print("\n5. 查询示例:")
for table_name in ['users', 'products', 'orders']:
    examples = op_manager.generate_query_examples(table_name)
    print(f"\n   {table_name}表查询示例:")
    for i, example in enumerate(examples[:3], 1):
        print(f"     {i}. {example['title']}")
        print(f"        用途: {example['use_case']}")
        print(f"        SQL: {example['sql'][:80]}...")

print("\n6. 查询执行模拟:")
test_queries = [
    ("SELECT * FROM users WHERE status = 'active' LIMIT 10", "users"),
    ("SELECT name, price FROM products WHERE price > 100 ORDER BY price", "products"),
    ("SELECT COUNT(*) FROM orders WHERE status = 'paid'", "orders")
]

for sql, table in test_queries:
    result = op_manager.simulate_query_execution(sql, table)
    print(f"\n   查询: {sql[:50]}...")
    print(f"     执行时间: {result.execution_time_ms:.2f}ms")
    print(f"     返回行数: {result.rows_returned}")
    print(f"     影响行数: {result.rows_affected}")
    if result.warnings:
        print(f"     警告: {result.warnings[0]}")
    if result.explain_plan:
        print(f"     执行计划: {result.explain_plan['type']}扫描")

print("\n7. 索引优化建议:")
query_patterns = [
    "SELECT * FROM users WHERE status = 'active' ORDER BY created_at DESC",
    "SELECT * FROM orders WHERE user_id = 123 AND status IN ('paid', 'shipped')",
    "SELECT * FROM products WHERE category_id = 1 AND price BETWEEN 100 AND 1000"
]

for table_name in ['users', 'orders', 'products']:
    suggestions = op_manager.generate_index_optimization_suggestions(table_name, query_patterns)
    if suggestions and not suggestions[0].get('error'):
        print(f"\n   {table_name}表索引优化:")
        for i, suggestion in enumerate(suggestions[:2], 1):
            print(f"     查询{i}: {suggestion['query_pattern'][:60]}...")
            print(f"     建议数量: {len(suggestion['recommendations'])}个")
            print(f"     预期提升: {suggestion['estimated_improvement']}")
            if suggestion['recommendations']:
                print(f"     首个建议: {suggestion['recommendations'][0]['reason']}")

print("\n8. 事务处理示例:")
for scenario in ['transfer', 'order_processing', 'batch_update']:
    example = op_manager.generate_transaction_example(scenario)
    if not example.get('error'):
        print(f"\n   {example['title']}:")
        print(f"     描述: {example['description']}")
        print(f"     隔离级别: {example['isolation_level'].value}")
        print(f"     SQL语句数: {len(example['sql_statements'])}条")
        print(f"     关键点: {', '.join(example['key_points'][:3])}")
        print(f"     最佳实践: {len(example['best_practices'])}项")

数据库和表管理

1. 数据库操作

创建数据库:

-- 创建数据库
CREATE DATABASE ecommerce 
DEFAULT CHARACTER SET utf8mb4 
DEFAULT COLLATE utf8mb4_unicode_ci;

-- 使用数据库
USE ecommerce;

-- 查看所有数据库
SHOW DATABASES;

-- 查看数据库信息
SELECT SCHEMA_NAME, DEFAULT_CHARACTER_SET_NAME, DEFAULT_COLLATION_NAME 
FROM INFORMATION_SCHEMA.SCHEMATA 
WHERE SCHEMA_NAME = 'ecommerce';

删除数据库:

-- 删除数据库(谨慎操作)
DROP DATABASE IF EXISTS test_db;

2. 表操作

创建表:

-- 创建用户表
CREATE TABLE users (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(50) NOT NULL UNIQUE,
    email VARCHAR(100) NOT NULL UNIQUE,
    password_hash VARCHAR(255) NOT NULL,
    full_name VARCHAR(100),
    phone VARCHAR(20),
    status ENUM('active', 'inactive', 'suspended') DEFAULT 'active',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    
    INDEX idx_username (username),
    INDEX idx_email (email),
    INDEX idx_status (status),
    INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='用户信息表';

修改表结构:

-- 添加列
ALTER TABLE users ADD COLUMN avatar_url VARCHAR(255) AFTER full_name;

-- 修改列
ALTER TABLE users MODIFY COLUMN phone VARCHAR(30);

-- 删除列
ALTER TABLE users DROP COLUMN avatar_url;

-- 添加索引
ALTER TABLE users ADD INDEX idx_phone (phone);

-- 删除索引
ALTER TABLE users DROP INDEX idx_phone;

-- 重命名表
RENAME TABLE users TO user_accounts;

查看表信息:

-- 查看表结构
DESC users;
SHOW CREATE TABLE users;

-- 查看表状态
SHOW TABLE STATUS LIKE 'users';

-- 查看所有表
SHOW TABLES;

-- 查看表的索引
SHOW INDEX FROM users;

数据操作

1. 插入数据

单行插入:

-- 插入单条记录
INSERT INTO users (username, email, password_hash, full_name, phone) 
VALUES ('john_doe', 'john@example.com', 'hashed_password', 'John Doe', '13800138000');

-- 插入时指定所有列
INSERT INTO users 
SET username = 'jane_smith', 
    email = 'jane@example.com', 
    password_hash = 'hashed_password',
    full_name = 'Jane Smith',
    status = 'active';

批量插入:

-- 批量插入多条记录
INSERT INTO users (username, email, password_hash, full_name) VALUES
('user001', 'user001@example.com', 'hash001', 'User 001'),
('user002', 'user002@example.com', 'hash002', 'User 002'),
('user003', 'user003@example.com', 'hash003', 'User 003');

-- 从其他表插入
INSERT INTO user_backup (username, email, full_name, created_at)
SELECT username, email, full_name, created_at 
FROM users 
WHERE status = 'active';

插入或更新:

-- ON DUPLICATE KEY UPDATE
INSERT INTO users (username, email, password_hash, full_name) 
VALUES ('john_doe', 'john@example.com', 'new_hash', 'John Doe Updated')
ON DUPLICATE KEY UPDATE 
    password_hash = VALUES(password_hash),
    full_name = VALUES(full_name),
    updated_at = CURRENT_TIMESTAMP;

-- REPLACE语句
REPLACE INTO users (id, username, email, password_hash, full_name) 
VALUES (1, 'john_doe', 'john@example.com', 'new_hash', 'John Doe Updated');

2. 更新数据

基本更新:

-- 更新单条记录
UPDATE users 
SET full_name = 'John Smith', phone = '13900139000' 
WHERE username = 'john_doe';

-- 更新多个字段
UPDATE users 
SET status = 'inactive', updated_at = CURRENT_TIMESTAMP 
WHERE last_login_date < DATE_SUB(NOW(), INTERVAL 1 YEAR);

-- 条件更新
UPDATE products 
SET price = price * 0.9 
WHERE category_id = 1 AND stock_quantity > 100;

复杂更新:

-- 使用CASE语句更新
UPDATE users 
SET level = CASE 
    WHEN points >= 10000 THEN 'platinum'
    WHEN points >= 5000 THEN 'gold'
    WHEN points >= 1000 THEN 'silver'
    ELSE 'bronze'
END
WHERE status = 'active';

-- 关联表更新
UPDATE orders o
JOIN users u ON o.user_id = u.id
SET o.discount = 0.1
WHERE u.level = 'platinum' AND o.status = 'pending';

3. 删除数据

基本删除:

-- 删除指定记录
DELETE FROM users WHERE status = 'inactive' AND last_login_date < '2022-01-01';

-- 删除所有记录(保留表结构)
DELETE FROM temp_table;

-- 清空表(更快,重置AUTO_INCREMENT)
TRUNCATE TABLE temp_table;

关联删除:

-- 删除关联数据
DELETE o, oi 
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE o.status = 'cancelled' AND o.created_at < DATE_SUB(NOW(), INTERVAL 30 DAY);

查询操作

1. 基本查询

简单查询:

-- 查询所有列
SELECT * FROM users;

-- 查询指定列
SELECT username, email, created_at FROM users;

-- 条件查询
SELECT * FROM users WHERE status = 'active';

-- 排序查询
SELECT username, created_at FROM users ORDER BY created_at DESC;

-- 限制结果数量
SELECT * FROM users ORDER BY id LIMIT 10;

-- 分页查询
SELECT * FROM users ORDER BY id LIMIT 20 OFFSET 40;

条件查询:

-- 多条件查询
SELECT * FROM products 
WHERE price BETWEEN 100 AND 1000 
  AND category_id IN (1, 2, 3) 
  AND is_active = TRUE;

-- 模糊查询
SELECT * FROM users WHERE full_name LIKE '%John%';

-- 正则表达式查询
SELECT * FROM users WHERE email REGEXP '^[a-zA-Z0-9]+@gmail\.com$';

-- NULL值查询
SELECT * FROM users WHERE phone IS NOT NULL;

2. 聚合查询

基本聚合:

-- 计数
SELECT COUNT(*) as total_users FROM users;
SELECT COUNT(DISTINCT email) as unique_emails FROM users;

-- 求和、平均值、最大值、最小值
SELECT 
    SUM(total_amount) as total_revenue,
    AVG(total_amount) as avg_order_value,
    MAX(total_amount) as max_order,
    MIN(total_amount) as min_order
FROM orders WHERE status = 'paid';

分组查询:

-- 按状态分组统计
SELECT status, COUNT(*) as count 
FROM users 
GROUP BY status;

-- 多字段分组
SELECT category_id, brand, COUNT(*) as product_count, AVG(price) as avg_price
FROM products 
WHERE is_active = TRUE
GROUP BY category_id, brand
ORDER BY category_id, avg_price DESC;

-- HAVING子句
SELECT user_id, COUNT(*) as order_count, SUM(total_amount) as total_spent
FROM orders 
WHERE status IN ('paid', 'delivered')
GROUP BY user_id
HAVING order_count >= 5 AND total_spent > 1000;

3. 连接查询

内连接:

-- 查询用户及其订单
SELECT u.username, u.email, o.order_no, o.total_amount, o.status
FROM users u
INNER JOIN orders o ON u.id = o.user_id
WHERE u.status = 'active'
ORDER BY o.created_at DESC;

左连接:

-- 查询所有用户及其订单(包括没有订单的用户)
SELECT u.username, u.email, COUNT(o.id) as order_count, COALESCE(SUM(o.total_amount), 0) as total_spent
FROM users u
LEFT JOIN orders o ON u.id = o.user_id AND o.status IN ('paid', 'delivered')
WHERE u.status = 'active'
GROUP BY u.id, u.username, u.email
ORDER BY total_spent DESC;

多表连接:

-- 查询订单详情
SELECT 
    u.username,
    o.order_no,
    o.total_amount,
    oi.product_id,
    p.name as product_name,
    oi.quantity,
    oi.price
FROM users u
JOIN orders o ON u.id = o.user_id
JOIN order_items oi ON o.id = oi.order_id
JOIN products p ON oi.product_id = p.id
WHERE o.status = 'paid'
ORDER BY o.created_at DESC, oi.id;

4. 子查询

标量子查询:

-- 查询高于平均价格的产品
SELECT name, price 
FROM products 
WHERE price > (SELECT AVG(price) FROM products);

EXISTS子查询:

-- 查询有订单的用户
SELECT username, email 
FROM users u
WHERE EXISTS (
    SELECT 1 FROM orders o 
    WHERE o.user_id = u.id AND o.status = 'paid'
);

IN子查询:

-- 查询购买过特定产品的用户
SELECT username, email
FROM users
WHERE id IN (
    SELECT DISTINCT o.user_id
    FROM orders o
    JOIN order_items oi ON o.id = oi.order_id
    WHERE oi.product_id IN (1, 2, 3) AND o.status = 'paid'
);

索引管理

1. 创建索引

基本索引:

-- 创建普通索引
CREATE INDEX idx_user_email ON users (email);

-- 创建唯一索引
CREATE UNIQUE INDEX idx_user_username ON users (username);

-- 创建复合索引
CREATE INDEX idx_order_user_status ON orders (user_id, status);

-- 创建前缀索引
CREATE INDEX idx_user_name_prefix ON users (full_name(10));

表达式索引:

-- 创建函数索引
CREATE INDEX idx_user_email_domain ON users ((SUBSTRING_INDEX(email, '@', -1)));

-- 创建条件索引
CREATE INDEX idx_active_users ON users (created_at) WHERE status = 'active';

2. 查看和删除索引

-- 查看表的索引
SHOW INDEX FROM users;

-- 查看索引使用情况
SELECT 
    TABLE_NAME,
    INDEX_NAME,
    COLUMN_NAME,
    CARDINALITY
FROM INFORMATION_SCHEMA.STATISTICS 
WHERE TABLE_SCHEMA = 'ecommerce' AND TABLE_NAME = 'users';

-- 删除索引
DROP INDEX idx_user_email ON users;
ALTER TABLE users DROP INDEX idx_user_username;

事务处理

1. 事务基础

基本事务操作:

-- 开始事务
START TRANSACTION;
-- 或者使用
BEGIN;

-- 执行SQL语句
INSERT INTO users (username, email, password_hash) VALUES ('test_user', 'test@example.com', 'hash');
UPDATE users SET status = 'active' WHERE username = 'test_user';

-- 提交事务
COMMIT;

-- 或者回滚事务
ROLLBACK;

保存点:

START TRANSACTION;

INSERT INTO users (username, email, password_hash) VALUES ('user1', 'user1@example.com', 'hash1');

-- 创建保存点
SAVEPOINT sp1;

INSERT INTO users (username, email, password_hash) VALUES ('user2', 'user2@example.com', 'hash2');

-- 回滚到保存点
ROLLBACK TO SAVEPOINT sp1;

-- 释放保存点
RELEASE SAVEPOINT sp1;

COMMIT;

2. 事务隔离级别

-- 查看当前隔离级别
SELECT @@transaction_isolation;

-- 设置会话隔离级别
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- 设置全局隔离级别
SET GLOBAL TRANSACTION ISOLATION LEVEL REPEATABLE READ;

-- 为单个事务设置隔离级别
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
START TRANSACTION;
-- 执行事务操作
COMMIT;

3. 锁机制

行级锁:

-- 共享锁(读锁)
SELECT * FROM users WHERE id = 1 LOCK IN SHARE MODE;

-- 排他锁(写锁)
SELECT * FROM users WHERE id = 1 FOR UPDATE;

-- 跳过锁等待
SELECT * FROM users WHERE id = 1 FOR UPDATE NOWAIT;

-- 跳过被锁定的行
SELECT * FROM users WHERE status = 'active' FOR UPDATE SKIP LOCKED;

TiDB特有功能

1. 分布式事务

-- 查看事务信息
SELECT * FROM INFORMATION_SCHEMA.TIDB_TRX;

-- 查看锁等待信息
SELECT * FROM INFORMATION_SCHEMA.DATA_LOCK_WAITS;

-- 查看当前锁信息
SELECT * FROM INFORMATION_SCHEMA.TIDB_LOCKS;

2. 热点问题处理

-- 使用SHARD_ROW_ID_BITS分散热点
CREATE TABLE hot_table (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    data VARCHAR(255)
) SHARD_ROW_ID_BITS = 4;

-- 使用AUTO_RANDOM避免自增主键热点
CREATE TABLE random_id_table (
    id BIGINT AUTO_RANDOM PRIMARY KEY,
    name VARCHAR(100)
);

3. 执行计划分析

-- 查看执行计划
EXPLAIN SELECT * FROM users WHERE status = 'active';

-- 详细执行计划
EXPLAIN ANALYZE SELECT u.username, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.status = 'active'
GROUP BY u.id, u.username;

-- 查看SQL执行统计
SELECT * FROM INFORMATION_SCHEMA.STATEMENTS_SUMMARY 
WHERE DIGEST_TEXT LIKE '%users%' 
ORDER BY EXEC_COUNT DESC LIMIT 10;

性能优化

1. 查询优化

索引优化:

-- 分析表统计信息
ANALYZE TABLE users;

-- 查看索引使用情况
EXPLAIN FORMAT='verbose' SELECT * FROM users WHERE email = 'test@example.com';

-- 强制使用索引
SELECT * FROM users USE INDEX (idx_email) WHERE email LIKE 'test%';

-- 忽略索引
SELECT * FROM users IGNORE INDEX (idx_email) WHERE email = 'test@example.com';

查询重写:

-- 避免SELECT *
-- 不推荐
SELECT * FROM users WHERE status = 'active';

-- 推荐
SELECT id, username, email FROM users WHERE status = 'active';

-- 使用LIMIT限制结果
SELECT username, email FROM users ORDER BY created_at DESC LIMIT 100;

-- 避免在WHERE子句中使用函数
-- 不推荐
SELECT * FROM orders WHERE YEAR(created_at) = 2023;

-- 推荐
SELECT * FROM orders WHERE created_at >= '2023-01-01' AND created_at < '2024-01-01';

2. 批量操作优化

-- 批量插入
INSERT INTO users (username, email, password_hash) VALUES
('user1', 'user1@example.com', 'hash1'),
('user2', 'user2@example.com', 'hash2'),
('user3', 'user3@example.com', 'hash3');

-- 批量更新
UPDATE users 
SET status = CASE id
    WHEN 1 THEN 'active'
    WHEN 2 THEN 'inactive'
    WHEN 3 THEN 'suspended'
END
WHERE id IN (1, 2, 3);

监控和维护

1. 性能监控

-- 查看慢查询
SELECT * FROM INFORMATION_SCHEMA.SLOW_QUERY 
WHERE Time > '2023-01-01 00:00:00' 
ORDER BY Query_time DESC LIMIT 10;

-- 查看表大小
SELECT 
    TABLE_NAME,
    ROUND(((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024), 2) AS 'Size (MB)'
FROM INFORMATION_SCHEMA.TABLES 
WHERE TABLE_SCHEMA = 'ecommerce'
ORDER BY (DATA_LENGTH + INDEX_LENGTH) DESC;

-- 查看连接信息
SHOW PROCESSLIST;

-- 查看系统变量
SHOW VARIABLES LIKE 'innodb%';

2. 维护操作

-- 优化表
OPTIMIZE TABLE users;

-- 检查表
CHECK TABLE users;

-- 修复表
REPAIR TABLE users;

-- 更新表统计信息
ANALYZE TABLE users;

总结

关键要点

  1. SQL兼容性:TiDB高度兼容MySQL语法,大部分MySQL应用可以无缝迁移
  2. 分布式特性:支持水平扩展,自动分片和负载均衡
  3. 事务支持:提供ACID事务保证,支持分布式事务
  4. 性能优化:通过合理的索引设计和查询优化提升性能
  5. 监控运维:丰富的监控指标和维护工具

最佳实践

  1. 表设计

    • 选择合适的数据类型
    • 设计合理的主键和索引
    • 避免热点问题
    • 考虑分区策略
  2. 查询优化

    • 避免全表扫描
    • 合理使用索引
    • 限制返回结果数量
    • 优化JOIN操作
  3. 事务管理

    • 保持事务简短
    • 选择合适的隔离级别
    • 处理死锁和冲突
    • 监控长事务
  4. 性能监控

    • 定期分析慢查询
    • 监控系统资源使用
    • 更新表统计信息
    • 优化热点查询

下一步学习

  1. 高级查询技巧:窗口函数、CTE、递归查询
  2. 分区表设计:分区策略和分区管理
  3. 集群管理:扩容缩容、故障处理
  4. 数据迁移:从MySQL迁移到TiDB的最佳实践