Overview

This chapter takes a deep dive into TiDB's advanced features and performance tuning techniques, including partitioned tables, window functions, CTEs, HTAP capabilities, and distributed execution plan optimization. These features help you get the most out of TiDB as a distributed database.

Learning Objectives

After completing this chapter, you will understand:

- Designing and managing partitioned tables in TiDB
- Advanced SQL features: window functions, CTEs, and recursive queries
- Handling mixed HTAP workloads
- Optimizing distributed execution plans
- Identifying and resolving hotspot issues
- Performance tuning best practices
- TiDB-specific system variables and configuration

Partitioned Tables

1. Partitioned Tables Overview

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Any
import random
from datetime import datetime, timedelta

class PartitionType(Enum):
    """Partition types."""
    RANGE = "range"                  # Range partitioning
    HASH = "hash"                    # Hash partitioning
    LIST = "list"                    # List partitioning
    RANGE_COLUMNS = "range_columns"  # Multi-column range partitioning
    LIST_COLUMNS = "list_columns"    # Multi-column list partitioning

class PartitionStrategy(Enum):
    """Partitioning strategies."""
    BY_DATE = "by_date"          # Partition by date
    BY_HASH = "by_hash"          # Partition by hash
    BY_REGION = "by_region"      # Partition by region
    BY_USER_ID = "by_user_id"    # Partition by user ID
    BY_STATUS = "by_status"      # Partition by status

class OptimizationType(Enum):
    """Optimization types."""
    PARTITION_PRUNING = "partition_pruning"      # Partition pruning
    INDEX_OPTIMIZATION = "index_optimization"    # Index optimization
    QUERY_REWRITE = "query_rewrite"              # Query rewriting
    PARALLEL_EXECUTION = "parallel_execution"    # Parallel execution
    MEMORY_OPTIMIZATION = "memory_optimization"  # Memory optimization

@dataclass
class PartitionInfo:
    """分区信息"""
    partition_name: str
    partition_type: PartitionType
    partition_expression: str
    partition_description: str
    row_count: int
    data_size_mb: float
    created_at: datetime
    last_accessed: datetime

@dataclass
class PartitionPlan:
    """分区方案"""
    table_name: str
    partition_strategy: PartitionStrategy
    partition_type: PartitionType
    partition_key: str
    partition_count: int
    partitions: List[PartitionInfo]
    estimated_performance_gain: float
    maintenance_complexity: str
    storage_efficiency: float

@dataclass
class QueryOptimization:
    """查询优化"""
    original_sql: str
    optimized_sql: str
    optimization_type: OptimizationType
    performance_improvement: float
    explanation: str
    applicable_scenarios: List[str]

@dataclass
class HTAPWorkload:
    """HTAP工作负载"""
    workload_name: str
    oltp_queries: List[str]
    olap_queries: List[str]
    mixed_queries: List[str]
    resource_isolation: Dict[str, Any]
    performance_metrics: Dict[str, float]
    optimization_suggestions: List[str]

class TiDBAdvancedManager:
    """TiDB高级特性管理器"""
    
    def __init__(self):
        self.partition_templates = self._initialize_partition_templates()
        self.optimization_patterns = self._initialize_optimization_patterns()
        self.htap_scenarios = self._initialize_htap_scenarios()
        self.performance_baselines = self._initialize_performance_baselines()
    
    def _initialize_partition_templates(self) -> Dict[PartitionStrategy, Dict[str, Any]]:
        """初始化分区模板"""
        templates = {}
        
        # Date-based partitioning template
        templates[PartitionStrategy.BY_DATE] = {
            "description": "按日期进行范围分区,适用于时间序列数据",
            "partition_type": PartitionType.RANGE,
            "examples": {
                "orders_by_month": {
                    "table_name": "orders",
                    "partition_key": "created_at",
                    "sql_template": """
CREATE TABLE orders (
    id BIGINT AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    order_no VARCHAR(32) NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    status ENUM('pending','paid','shipped','delivered','cancelled') DEFAULT 'pending',
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id, created_at),
    UNIQUE KEY uk_order_no (order_no),
    KEY idx_user_id (user_id),
    KEY idx_status (status)
) PARTITION BY RANGE (YEAR(created_at)*100 + MONTH(created_at)) (
    PARTITION p202301 VALUES LESS THAN (202302),
    PARTITION p202302 VALUES LESS THAN (202303),
    PARTITION p202303 VALUES LESS THAN (202304),
    PARTITION p202304 VALUES LESS THAN (202305),
    PARTITION p202305 VALUES LESS THAN (202306),
    PARTITION p202306 VALUES LESS THAN (202307),
    PARTITION p202307 VALUES LESS THAN (202308),
    PARTITION p202308 VALUES LESS THAN (202309),
    PARTITION p202309 VALUES LESS THAN (202310),
    PARTITION p202310 VALUES LESS THAN (202311),
    PARTITION p202311 VALUES LESS THAN (202312),
    PARTITION p202312 VALUES LESS THAN (202401),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);""",
                    "benefits": [
                        "查询特定时间范围时只扫描相关分区",
                        "历史数据可以单独管理和归档",
                        "删除历史数据时可以直接删除分区",
                        "提高并发查询性能"
                    ],
                    "use_cases": ["订单系统", "日志分析", "监控数据", "交易记录"]
                },
                "logs_by_day": {
                    "table_name": "access_logs",
                    "partition_key": "log_time",
                    "sql_template": """
CREATE TABLE access_logs (
    id BIGINT AUTO_INCREMENT,
    user_id BIGINT,
    ip_address VARCHAR(45),
    user_agent TEXT,
    request_url VARCHAR(500),
    response_code INT,
    response_time_ms INT,
    log_time TIMESTAMP NOT NULL,
    PRIMARY KEY (id, log_time),
    KEY idx_user_id (user_id),
    KEY idx_ip (ip_address),
    KEY idx_response_code (response_code)
) PARTITION BY RANGE (TO_DAYS(log_time)) (
    PARTITION p20230101 VALUES LESS THAN (TO_DAYS('2023-01-02')),
    PARTITION p20230102 VALUES LESS THAN (TO_DAYS('2023-01-03')),
    PARTITION p20230103 VALUES LESS THAN (TO_DAYS('2023-01-04')),
    PARTITION p20230104 VALUES LESS THAN (TO_DAYS('2023-01-05')),
    PARTITION p20230105 VALUES LESS THAN (TO_DAYS('2023-01-06')),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);""",
                    "benefits": [
                        "日志数据按天分区便于管理",
                        "可以快速删除过期日志",
                        "提高日志查询和分析性能",
                        "支持并行处理多天数据"
                    ],
                    "use_cases": ["访问日志", "应用日志", "审计日志", "性能监控"]
                }
            }
        }
        
        # Hash-based partitioning template
        templates[PartitionStrategy.BY_HASH] = {
            "description": "按哈希值分区,适用于均匀分布数据",
            "partition_type": PartitionType.HASH,
            "examples": {
                "users_by_hash": {
                    "table_name": "users",
                    "partition_key": "id",
                    "sql_template": """
CREATE TABLE users (
    id BIGINT AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    full_name VARCHAR(100),
    phone VARCHAR(20),
    status ENUM('active','inactive','suspended') DEFAULT 'active',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    UNIQUE KEY uk_username (username),
    UNIQUE KEY uk_email (email),
    KEY idx_status (status)
) PARTITION BY HASH(id) PARTITIONS 8;""",
                    "benefits": [
                        "数据均匀分布到各个分区",
                        "避免热点问题",
                        "支持并行查询和更新",
                        "简化分区管理"
                    ],
                    "use_cases": ["用户表", "产品表", "配置表", "字典表"]
                }
            }
        }
        
        # Region-based (list) partitioning template
        templates[PartitionStrategy.BY_REGION] = {
            "description": "按地区进行列表分区,适用于地理位置相关数据",
            "partition_type": PartitionType.LIST,
            "examples": {
                "orders_by_region": {
                    "table_name": "regional_orders",
                    "partition_key": "region_code",
                    "sql_template": """
CREATE TABLE regional_orders (
    id BIGINT AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    order_no VARCHAR(32) NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    region_code VARCHAR(10) NOT NULL,
    city VARCHAR(50),
    status ENUM('pending','paid','shipped','delivered','cancelled') DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, region_code),
    UNIQUE KEY uk_order_no (order_no),
    KEY idx_user_id (user_id),
    KEY idx_city (city)
) PARTITION BY LIST (region_code) (
    PARTITION p_north VALUES IN ('BJ','TJ','HE','SX','NM'),
    PARTITION p_northeast VALUES IN ('LN','JL','HL'),
    PARTITION p_east VALUES IN ('SH','JS','ZJ','AH','FJ','JX','SD'),
    PARTITION p_south VALUES IN ('HN','HB','GD','GX','HI'),
    PARTITION p_southwest VALUES IN ('CQ','SC','GZ','YN','XZ'),
    PARTITION p_northwest VALUES IN ('SN','GS','QH','NX','XJ'),
    PARTITION p_other VALUES IN ('TW','HK','MO')
);""",
                    "benefits": [
                        "按地区查询时只扫描相关分区",
                        "支持地区级别的数据管理",
                        "便于地区性能分析",
                        "支持地区级别的数据备份"
                    ],
                    "use_cases": ["电商订单", "物流配送", "区域销售", "用户分析"]
                }
            }
        }
        
        return templates
    
    def _initialize_optimization_patterns(self) -> Dict[OptimizationType, List[Dict[str, Any]]]:
        """初始化优化模式"""
        patterns = {}
        
        # Partition pruning optimizations
        patterns[OptimizationType.PARTITION_PRUNING] = [
            {
                "name": "时间范围查询优化",
                "original_sql": "SELECT * FROM orders WHERE created_at >= '2023-06-01' AND created_at < '2023-07-01'",
                "optimized_sql": "SELECT * FROM orders WHERE created_at >= '2023-06-01' AND created_at < '2023-07-01'",
                "optimization": "确保WHERE条件包含分区键,启用分区裁剪",
                "performance_gain": 80.0,
                "explanation": "通过分区裁剪,只扫描2023年6月的分区,避免全表扫描"
            },
            {
                "name": "多条件分区裁剪",
                "original_sql": "SELECT * FROM orders WHERE user_id = 12345 AND status = 'paid'",
                "optimized_sql": "SELECT * FROM orders WHERE user_id = 12345 AND status = 'paid' AND created_at >= '2023-01-01'",
                "optimization": "添加分区键条件,即使是宽泛的条件也能启用分区裁剪",
                "performance_gain": 60.0,
                "explanation": "添加时间条件后,可以限制扫描的分区数量"
            }
        ]
        
        # Index optimizations
        patterns[OptimizationType.INDEX_OPTIMIZATION] = [
            {
                "name": "复合索引优化",
                "original_sql": "SELECT * FROM orders WHERE user_id = 12345 AND status = 'paid' ORDER BY created_at DESC",
                "optimized_sql": "SELECT * FROM orders WHERE user_id = 12345 AND status = 'paid' ORDER BY created_at DESC",
                "optimization": "CREATE INDEX idx_user_status_time ON orders (user_id, status, created_at)",
                "performance_gain": 90.0,
                "explanation": "复合索引覆盖WHERE条件和ORDER BY,避免额外排序"
            },
            {
                "name": "覆盖索引优化",
                "original_sql": "SELECT user_id, total_amount, created_at FROM orders WHERE status = 'paid'",
                "optimized_sql": "SELECT user_id, total_amount, created_at FROM orders WHERE status = 'paid'",
                "optimization": "CREATE INDEX idx_status_covering ON orders (status, user_id, total_amount, created_at)",
                "performance_gain": 70.0,
                "explanation": "覆盖索引包含所有需要的列,避免回表查询"
            }
        ]
        
        # Query rewrite optimizations
        patterns[OptimizationType.QUERY_REWRITE] = [
            {
                "name": "子查询转换为JOIN",
                "original_sql": "SELECT * FROM users WHERE id IN (SELECT user_id FROM orders WHERE status = 'paid')",
                "optimized_sql": "SELECT DISTINCT u.* FROM users u INNER JOIN orders o ON u.id = o.user_id WHERE o.status = 'paid'",
                "optimization": "将IN子查询转换为JOIN,提高执行效率",
                "performance_gain": 50.0,
                "explanation": "JOIN通常比IN子查询有更好的执行计划"
            },
            {
                "name": "EXISTS优化",
                "original_sql": "SELECT * FROM users WHERE id IN (SELECT user_id FROM orders)",
                "optimized_sql": "SELECT * FROM users u WHERE EXISTS (SELECT 1 FROM orders o WHERE o.user_id = u.id)",
                "optimization": "使用EXISTS替代IN,当子查询结果集较大时更高效",
                "performance_gain": 40.0,
                "explanation": "EXISTS在找到第一个匹配后即停止,比IN更高效"
            }
        ]
        
        return patterns
    
    def _initialize_htap_scenarios(self) -> Dict[str, HTAPWorkload]:
        """初始化HTAP场景"""
        scenarios = {}
        
        # E-commerce scenario
        scenarios["ecommerce"] = HTAPWorkload(
            workload_name="电商HTAP场景",
            oltp_queries=[
                "INSERT INTO orders (user_id, order_no, total_amount) VALUES (?, ?, ?)",
                "UPDATE orders SET status = 'paid' WHERE order_no = ?",
                "SELECT * FROM users WHERE id = ?",
                "SELECT * FROM products WHERE id = ? AND is_active = TRUE",
                "UPDATE products SET stock_quantity = stock_quantity - ? WHERE id = ?"
            ],
            olap_queries=[
                "SELECT DATE(created_at) as order_date, COUNT(*) as order_count, SUM(total_amount) as revenue FROM orders WHERE created_at >= DATE_SUB(NOW(), INTERVAL 30 DAY) GROUP BY DATE(created_at)",
                "SELECT u.region, COUNT(DISTINCT u.id) as user_count, SUM(o.total_amount) as total_revenue FROM users u JOIN orders o ON u.id = o.user_id WHERE o.status = 'paid' GROUP BY u.region",
                "SELECT p.category_id, p.brand, COUNT(oi.product_id) as sales_count, SUM(oi.quantity * oi.price) as revenue FROM products p JOIN order_items oi ON p.id = oi.product_id JOIN orders o ON oi.order_id = o.id WHERE o.status = 'paid' GROUP BY p.category_id, p.brand"
            ],
            mixed_queries=[
                "SELECT u.username, u.email, COUNT(o.id) as order_count, SUM(o.total_amount) as total_spent FROM users u LEFT JOIN orders o ON u.id = o.user_id WHERE u.created_at >= ? GROUP BY u.id",
                "SELECT p.name, p.price, p.stock_quantity, COALESCE(sales.total_sold, 0) as total_sold FROM products p LEFT JOIN (SELECT oi.product_id, SUM(oi.quantity) as total_sold FROM order_items oi JOIN orders o ON oi.order_id = o.id WHERE o.status = 'paid' GROUP BY oi.product_id) sales ON p.id = sales.product_id WHERE p.is_active = TRUE"
            ],
            resource_isolation={
                "tiflash_replicas": 2,
                "tidb_isolation_read_engines": "tiflash,tikv",
                "oltp_resource_group": "rg_oltp",
                "olap_resource_group": "rg_olap"
            },
            performance_metrics={
                "oltp_avg_latency_ms": 5.0,
                "oltp_p99_latency_ms": 20.0,
                "olap_avg_query_time_s": 2.5,
                "olap_throughput_qps": 100.0,
                "mixed_workload_efficiency": 85.0
            },
            optimization_suggestions=[
                "为OLAP查询创建TiFlash副本",
                "使用资源组隔离OLTP和OLAP工作负载",
                "为时间序列数据创建分区表",
                "优化JOIN查询的索引设计",
                "使用列式存储加速聚合查询"
            ]
        )
        
        # Finance scenario
        scenarios["finance"] = HTAPWorkload(
            workload_name="金融HTAP场景",
            oltp_queries=[
                "INSERT INTO transactions (account_id, amount, transaction_type, description) VALUES (?, ?, ?, ?)",
                "UPDATE accounts SET balance = balance + ? WHERE account_id = ?",
                "SELECT balance FROM accounts WHERE account_id = ? FOR UPDATE",
                "INSERT INTO audit_log (user_id, action, details, timestamp) VALUES (?, ?, ?, NOW())"
            ],
            olap_queries=[
                "SELECT account_type, SUM(balance) as total_balance, COUNT(*) as account_count FROM accounts GROUP BY account_type",
                "SELECT DATE(transaction_time) as trans_date, transaction_type, SUM(amount) as total_amount FROM transactions WHERE transaction_time >= DATE_SUB(NOW(), INTERVAL 90 DAY) GROUP BY DATE(transaction_time), transaction_type",
                "SELECT customer_segment, AVG(balance) as avg_balance, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY balance) as median_balance FROM accounts a JOIN customers c ON a.customer_id = c.id GROUP BY customer_segment"
            ],
            mixed_queries=[
                "SELECT a.account_id, a.balance, COUNT(t.id) as transaction_count, SUM(CASE WHEN t.amount > 0 THEN t.amount ELSE 0 END) as total_deposits FROM accounts a LEFT JOIN transactions t ON a.account_id = t.account_id WHERE a.status = 'active' GROUP BY a.account_id",
                "SELECT c.customer_id, c.name, SUM(a.balance) as total_balance, COUNT(DISTINCT a.account_id) as account_count FROM customers c JOIN accounts a ON c.id = a.customer_id WHERE c.status = 'active' GROUP BY c.customer_id"
            ],
            resource_isolation={
                "tiflash_replicas": 3,
                "tidb_isolation_read_engines": "tiflash,tikv",
                "oltp_resource_group": "rg_trading",
                "olap_resource_group": "rg_reporting"
            },
            performance_metrics={
                "oltp_avg_latency_ms": 2.0,
                "oltp_p99_latency_ms": 10.0,
                "olap_avg_query_time_s": 5.0,
                "olap_throughput_qps": 50.0,
                "mixed_workload_efficiency": 90.0
            },
            optimization_suggestions=[
                "实施严格的资源隔离策略",
                "为实时报表查询优化TiFlash配置",
                "使用分区表管理历史交易数据",
                "实施多级缓存策略",
                "优化高频查询的执行计划"
            ]
        )
        
        return scenarios
    
    def _initialize_performance_baselines(self) -> Dict[str, Dict[str, float]]:
        """初始化性能基准"""
        return {
            "query_performance": {
                "simple_select_ms": 1.0,
                "complex_join_ms": 50.0,
                "aggregation_query_ms": 100.0,
                "full_table_scan_ms": 1000.0
            },
            "throughput": {
                "insert_tps": 10000.0,
                "update_tps": 8000.0,
                "select_qps": 50000.0,
                "mixed_workload_tps": 15000.0
            },
            "resource_usage": {
                "cpu_utilization_percent": 70.0,
                "memory_usage_percent": 80.0,
                "disk_io_utilization_percent": 60.0,
                "network_bandwidth_mbps": 1000.0
            }
        }
    
    def design_partition_strategy(self, table_name: str, data_characteristics: Dict[str, Any]) -> PartitionPlan:
        """设计分区策略"""
        # 分析数据特征
        data_size_gb = data_characteristics.get('data_size_gb', 100)
        growth_rate_monthly = data_characteristics.get('growth_rate_monthly', 0.1)
        query_patterns = data_characteristics.get('query_patterns', [])
        time_column = data_characteristics.get('time_column')
        distribution_column = data_characteristics.get('distribution_column')
        
        # Choose a partitioning strategy
        if time_column and any('time_range' in pattern for pattern in query_patterns):
            strategy = PartitionStrategy.BY_DATE
            partition_type = PartitionType.RANGE
            partition_key = time_column
        elif distribution_column and data_characteristics.get('uniform_distribution', False):
            strategy = PartitionStrategy.BY_HASH
            partition_type = PartitionType.HASH
            partition_key = distribution_column
        elif data_characteristics.get('region_column'):
            strategy = PartitionStrategy.BY_REGION
            partition_type = PartitionType.LIST
            partition_key = data_characteristics['region_column']
        else:
            strategy = PartitionStrategy.BY_HASH
            partition_type = PartitionType.HASH
            partition_key = 'id'
        
        # Decide the partition count
        if strategy == PartitionStrategy.BY_DATE:
            # Monthly partitions covering two years of data
            partition_count = 24
        elif strategy == PartitionStrategy.BY_HASH:
            # Scale the partition count with data size
            partition_count = min(max(int(data_size_gb / 10), 4), 64)
        else:
            partition_count = 8  # Default partition count
        
        # Generate partition metadata
        partitions = []
        for i in range(partition_count):
            partition = PartitionInfo(
                partition_name=f"p{i:02d}",
                partition_type=partition_type,
                partition_expression=f"{partition_key} partition {i}",
                partition_description=f"Partition {i} for {table_name}",
                row_count=random.randint(100000, 1000000),
                data_size_mb=random.uniform(100, 1000),
                created_at=datetime.now() - timedelta(days=random.randint(1, 365)),
                last_accessed=datetime.now() - timedelta(hours=random.randint(1, 24))
            )
            partitions.append(partition)
        
        # Estimate the performance gain
        if strategy == PartitionStrategy.BY_DATE and time_column:
            performance_gain = 70.0  # Gain for time-range queries
        elif strategy == PartitionStrategy.BY_HASH:
            performance_gain = 40.0  # Gain from parallel processing
        else:
            performance_gain = 50.0  # General gain
        
        # Assess maintenance complexity
        if strategy == PartitionStrategy.BY_DATE:
            maintenance_complexity = "中等 - 需要定期添加新分区和清理历史分区"
        elif strategy == PartitionStrategy.BY_HASH:
            maintenance_complexity = "低 - 分区自动管理,维护简单"
        else:
            maintenance_complexity = "中等 - 需要根据业务变化调整分区"
        
        return PartitionPlan(
            table_name=table_name,
            partition_strategy=strategy,
            partition_type=partition_type,
            partition_key=partition_key,
            partition_count=partition_count,
            partitions=partitions,
            estimated_performance_gain=performance_gain,
            maintenance_complexity=maintenance_complexity,
            storage_efficiency=85.0
        )
    
    def generate_partition_ddl(self, partition_plan: PartitionPlan) -> str:
        """生成分区表DDL"""
        template = self.partition_templates.get(partition_plan.partition_strategy)
        if not template:
            return f"-- 未找到分区策略 {partition_plan.partition_strategy} 的模板"
        
        # Fetch the example templates for this strategy
        examples = template.get('examples', {})
        if not examples:
            return f"-- 分区策略 {partition_plan.partition_strategy} 暂无示例"
        
        # Use the first example as the base
        example_key = list(examples.keys())[0]
        example = examples[example_key]
        
        # Substitute the table name into the template
        ddl = example['sql_template']
        ddl = ddl.replace(example['table_name'], partition_plan.table_name)
        
        return ddl
    
    def analyze_query_optimization(self, sql: str, table_info: Dict[str, Any]) -> List[QueryOptimization]:
        """分析查询优化机会"""
        optimizations = []
        sql_upper = sql.upper()
        
        # Check each optimization pattern
        for opt_type, patterns in self.optimization_patterns.items():
            for pattern in patterns:
                # Simplified pattern matching
                if self._matches_pattern(sql, pattern['original_sql']):
                    optimization = QueryOptimization(
                        original_sql=sql,
                        optimized_sql=pattern['optimized_sql'],
                        optimization_type=opt_type,
                        performance_improvement=pattern['performance_gain'],
                        explanation=pattern['explanation'],
                        applicable_scenarios=["通用场景", "高并发场景", "大数据量场景"]
                    )
                    optimizations.append(optimization)
        
        # Add generic optimization suggestions
        if "SELECT *" in sql_upper:
            optimizations.append(QueryOptimization(
                original_sql=sql,
                optimized_sql=sql.replace("SELECT *", "SELECT specific_columns"),
                optimization_type=OptimizationType.QUERY_REWRITE,
                performance_improvement=20.0,
                explanation="避免SELECT *,只查询需要的列",
                applicable_scenarios=["网络带宽敏感", "内存使用优化", "缓存效率提升"]
            ))
        
        if "ORDER BY" in sql_upper and "LIMIT" not in sql_upper:
            optimizations.append(QueryOptimization(
                original_sql=sql,
                optimized_sql=sql + " LIMIT 1000",
                optimization_type=OptimizationType.QUERY_REWRITE,
                performance_improvement=30.0,
                explanation="为ORDER BY查询添加LIMIT限制",
                applicable_scenarios=["分页查询", "Top-N查询", "结果集限制"]
            ))
        
        return optimizations
    
    def _matches_pattern(self, sql: str, pattern: str) -> bool:
        """简化的SQL模式匹配"""
        sql_keywords = set(sql.upper().split())
        pattern_keywords = set(pattern.upper().split())
        
        # Check keyword overlap
        common_keywords = sql_keywords.intersection(pattern_keywords)
        return len(common_keywords) >= 3
    
    def generate_htap_configuration(self, scenario_name: str) -> Dict[str, Any]:
        """生成HTAP配置"""
        scenario = self.htap_scenarios.get(scenario_name)
        if not scenario:
            return {"error": f"未找到场景 {scenario_name}"}
        
        config = {
            "scenario": scenario.workload_name,
            "tiflash_configuration": {
                "enable_tiflash": True,
                "tiflash_replicas": scenario.resource_isolation.get('tiflash_replicas', 2),
                "tiflash_learner_config": {
                    "log-level": "info",
                    "server.engine-store.enable-fast-scan": True,
                    "profiles.default.max_memory_usage": "10000000000"
                }
            },
            "resource_groups": {
                "oltp_group": {
                    "name": scenario.resource_isolation.get('oltp_resource_group', 'rg_oltp'),
                    "ru_per_sec": 2000,
                    "priority": "HIGH",
                    "burstable": True
                },
                "olap_group": {
                    "name": scenario.resource_isolation.get('olap_resource_group', 'rg_olap'),
                    "ru_per_sec": 1000,
                    "priority": "MEDIUM",
                    "burstable": False
                }
            },
            "isolation_settings": {
                "tidb_isolation_read_engines": scenario.resource_isolation.get('tidb_isolation_read_engines', 'tiflash,tikv'),
                "tidb_allow_mpp": True,
                "tidb_enforce_mpp": False,
                "tidb_opt_broadcast_cartesian_join": 2
            },
            "performance_targets": scenario.performance_metrics,
            "optimization_recommendations": scenario.optimization_suggestions,
            "monitoring_queries": {
                "tiflash_status": "SELECT * FROM INFORMATION_SCHEMA.TIFLASH_REPLICA;",
                "resource_group_usage": "SELECT * FROM INFORMATION_SCHEMA.RESOURCE_GROUPS;",
                "mpp_query_stats": "SELECT * FROM INFORMATION_SCHEMA.TIDB_TRX WHERE state = 'Running';"
            }
        }
        
        return config
    
    def generate_window_function_examples(self) -> List[Dict[str, Any]]:
        """生成窗口函数示例"""
        examples = [
            {
                "title": "排名函数 - ROW_NUMBER, RANK, DENSE_RANK",
                "description": "为每行分配唯一排名、相同值相同排名、密集排名",
                "sql": """
-- 用户订单排名分析
SELECT 
    user_id,
    order_no,
    total_amount,
    created_at,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY total_amount DESC) as row_num,
    RANK() OVER (PARTITION BY user_id ORDER BY total_amount DESC) as rank_num,
    DENSE_RANK() OVER (PARTITION BY user_id ORDER BY total_amount DESC) as dense_rank_num
FROM orders 
WHERE status = 'paid'
ORDER BY user_id, total_amount DESC;""",
                "use_cases": ["用户消费排名", "产品销量排名", "员工绩效排名", "学生成绩排名"]
            },
            {
                "title": "聚合窗口函数 - SUM, AVG, COUNT",
                "description": "计算累计值、移动平均、滚动计数",
                "sql": """
-- 销售趋势分析
SELECT 
    DATE(created_at) as order_date,
    COUNT(*) as daily_orders,
    SUM(total_amount) as daily_revenue,
    SUM(COUNT(*)) OVER (ORDER BY DATE(created_at)) as cumulative_orders,
    SUM(SUM(total_amount)) OVER (ORDER BY DATE(created_at)) as cumulative_revenue,
    AVG(SUM(total_amount)) OVER (ORDER BY DATE(created_at) ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as weekly_avg_revenue
FROM orders 
WHERE status = 'paid' AND created_at >= DATE_SUB(NOW(), INTERVAL 30 DAY)
GROUP BY DATE(created_at)
ORDER BY order_date;""",
                "use_cases": ["销售趋势分析", "累计指标计算", "移动平均计算", "同比环比分析"]
            },
            {
                "title": "偏移函数 - LAG, LEAD",
                "description": "访问前一行或后一行的数据",
                "sql": """
-- 用户行为分析
SELECT 
    user_id,
    login_time,
    LAG(login_time) OVER (PARTITION BY user_id ORDER BY login_time) as prev_login,
    LEAD(login_time) OVER (PARTITION BY user_id ORDER BY login_time) as next_login,
    TIMESTAMPDIFF(MINUTE, LAG(login_time) OVER (PARTITION BY user_id ORDER BY login_time), login_time) as minutes_since_last_login
FROM user_login_log 
WHERE login_time >= DATE_SUB(NOW(), INTERVAL 7 DAY)
ORDER BY user_id, login_time;""",
                "use_cases": ["用户行为分析", "时间间隔计算", "状态变化追踪", "序列数据分析"]
            },
            {
                "title": "分布函数 - NTILE, PERCENT_RANK",
                "description": "数据分组和百分位排名",
                "sql": """
-- 用户价值分层
SELECT 
    user_id,
    total_spent,
    NTILE(4) OVER (ORDER BY total_spent DESC) as value_quartile,
    PERCENT_RANK() OVER (ORDER BY total_spent) as percentile_rank,
    CASE 
        WHEN NTILE(4) OVER (ORDER BY total_spent DESC) = 1 THEN 'VIP'
        WHEN NTILE(4) OVER (ORDER BY total_spent DESC) = 2 THEN 'Gold'
        WHEN NTILE(4) OVER (ORDER BY total_spent DESC) = 3 THEN 'Silver'
        ELSE 'Bronze'
    END as user_tier
FROM (
    SELECT 
        user_id, 
        SUM(total_amount) as total_spent
    FROM orders 
    WHERE status = 'paid'
    GROUP BY user_id
) user_spending
ORDER BY total_spent DESC;""",
                "use_cases": ["用户分层", "数据分组", "百分位分析", "等级划分"]
            }
        ]
        
        return examples
    
    def generate_cte_examples(self) -> List[Dict[str, Any]]:
        """生成CTE示例"""
        examples = [
            {
                "title": "基础CTE - 简化复杂查询",
                "description": "使用CTE提高查询可读性和维护性",
                "sql": """
-- 用户订单统计分析
WITH user_stats AS (
    SELECT 
        user_id,
        COUNT(*) as order_count,
        SUM(total_amount) as total_spent,
        AVG(total_amount) as avg_order_value,
        MAX(created_at) as last_order_date
    FROM orders 
    WHERE status = 'paid'
    GROUP BY user_id
),
user_categories AS (
    SELECT 
        user_id,
        order_count,
        total_spent,
        avg_order_value,
        last_order_date,
        CASE 
            WHEN total_spent >= 10000 THEN 'VIP'
            WHEN total_spent >= 5000 THEN 'Premium'
            WHEN total_spent >= 1000 THEN 'Regular'
            ELSE 'New'
        END as user_category
    FROM user_stats
)
SELECT 
    uc.user_category,
    COUNT(*) as user_count,
    AVG(uc.total_spent) as avg_total_spent,
    AVG(uc.order_count) as avg_order_count,
    AVG(uc.avg_order_value) as avg_order_value
FROM user_categories uc
GROUP BY uc.user_category
ORDER BY avg_total_spent DESC;""",
                "use_cases": ["用户分析", "数据分层", "复杂统计", "报表生成"]
            },
            {
                "title": "递归CTE - 层级数据处理",
                "description": "处理树形结构和层级关系数据",
                "sql": """
-- 组织架构层级查询
WITH RECURSIVE org_hierarchy AS (
    -- 锚点:顶级部门
    SELECT 
        dept_id,
        dept_name,
        parent_dept_id,
        manager_id,
        1 as level,
        CAST(dept_name AS CHAR(1000)) as path
    FROM departments 
    WHERE parent_dept_id IS NULL
    
    UNION ALL
    
    -- 递归:子部门
    SELECT 
        d.dept_id,
        d.dept_name,
        d.parent_dept_id,
        d.manager_id,
        oh.level + 1,
        CONCAT(oh.path, ' -> ', d.dept_name)
    FROM departments d
    INNER JOIN org_hierarchy oh ON d.parent_dept_id = oh.dept_id
    WHERE oh.level < 10  -- 防止无限递归
)
SELECT 
    dept_id,
    CONCAT(REPEAT('  ', level - 1), dept_name) as dept_hierarchy,
    level,
    path,
    manager_id
FROM org_hierarchy
ORDER BY path;""",
                "use_cases": ["组织架构", "分类层级", "菜单结构", "地区层级"]
            },
            {
                "title": "多CTE组合 - 复杂业务分析",
                "description": "组合多个CTE进行复杂的业务分析",
                "sql": """
-- 产品销售综合分析
WITH monthly_sales AS (
    SELECT 
        p.product_id,
        p.name as product_name,
        p.category_id,
        DATE_FORMAT(o.created_at, '%Y-%m') as sales_month,
        SUM(oi.quantity) as units_sold,
        SUM(oi.quantity * oi.price) as revenue
    FROM products p
    JOIN order_items oi ON p.id = oi.product_id
    JOIN orders o ON oi.order_id = o.id
    WHERE o.status = 'paid' 
      AND o.created_at >= DATE_SUB(NOW(), INTERVAL 12 MONTH)
    GROUP BY p.product_id, p.name, p.category_id, DATE_FORMAT(o.created_at, '%Y-%m')
),
product_trends AS (
    SELECT 
        product_id,
        product_name,
        category_id,
        sales_month,
        units_sold,
        revenue,
        LAG(revenue) OVER (PARTITION BY product_id ORDER BY sales_month) as prev_month_revenue,
        AVG(revenue) OVER (PARTITION BY product_id ORDER BY sales_month ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as rolling_avg_revenue
    FROM monthly_sales
),
category_performance AS (
    SELECT 
        category_id,
        sales_month,
        SUM(revenue) as category_revenue,
        COUNT(DISTINCT product_id) as active_products
    FROM monthly_sales
    GROUP BY category_id, sales_month
)
SELECT 
    pt.product_name,
    pt.sales_month,
    pt.revenue,
    pt.prev_month_revenue,
    CASE 
        WHEN pt.prev_month_revenue > 0 THEN 
            ROUND((pt.revenue - pt.prev_month_revenue) / pt.prev_month_revenue * 100, 2)
        ELSE NULL
    END as revenue_growth_percent,
    pt.rolling_avg_revenue,
    cp.category_revenue,
    ROUND(pt.revenue / cp.category_revenue * 100, 2) as category_share_percent
FROM product_trends pt
JOIN category_performance cp ON pt.category_id = cp.category_id AND pt.sales_month = cp.sales_month
WHERE pt.sales_month >= DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 6 MONTH), '%Y-%m')
ORDER BY pt.sales_month DESC, pt.revenue DESC;""",
                "use_cases": ["销售分析", "趋势分析", "市场份额", "业绩评估"]
            }
        ]
        
        return examples

# Demo: TiDB advanced features
print("\n\n=== TiDB高级特性与优化 ===")

advanced_manager = TiDBAdvancedManager()

print("\n1. 分区策略设计:")
test_scenarios = [
    {
        "table_name": "orders",
        "data_characteristics": {
            "data_size_gb": 500,
            "growth_rate_monthly": 0.15,
            "query_patterns": ["time_range", "user_filter", "status_filter"],
            "time_column": "created_at",
            "uniform_distribution": False
        }
    },
    {
        "table_name": "user_events",
        "data_characteristics": {
            "data_size_gb": 1000,
            "growth_rate_monthly": 0.25,
            "query_patterns": ["user_filter", "event_type"],
            "distribution_column": "user_id",
            "uniform_distribution": True
        }
    },
    {
        "table_name": "regional_sales",
        "data_characteristics": {
            "data_size_gb": 200,
            "growth_rate_monthly": 0.1,
            "query_patterns": ["region_filter", "time_range"],
            "region_column": "region_code"
        }
    }
]

for scenario in test_scenarios:
    plan = advanced_manager.design_partition_strategy(
        scenario["table_name"], 
        scenario["data_characteristics"]
    )
    print(f"\n   {plan.table_name}表分区方案:")
    print(f"     策略: {plan.partition_strategy.value}")
    print(f"     类型: {plan.partition_type.value}")
    print(f"     分区键: {plan.partition_key}")
    print(f"     分区数: {plan.partition_count}")
    print(f"     预期性能提升: {plan.estimated_performance_gain}%")
    print(f"     维护复杂度: {plan.maintenance_complexity}")
    print(f"     存储效率: {plan.storage_efficiency}%")

print("\n2. 分区表DDL生成:")
for scenario in test_scenarios[:2]:
    plan = advanced_manager.design_partition_strategy(
        scenario["table_name"], 
        scenario["data_characteristics"]
    )
    ddl = advanced_manager.generate_partition_ddl(plan)
    lines = ddl.split('\n')
    print(f"\n   {plan.table_name}表DDL:")
    print(f"     DDL长度: {len(ddl)}字符")
    print(f"     DDL行数: {len(lines)}行")
    print("\n     DDL预览:")
    for line in lines[:10]:
        if line.strip():
            print(f"       {line}")
    if len(lines) > 10:
        print(f"       ... (还有{len(lines)-10}行)")

print("\n3. 查询优化分析:")
test_queries = [
    "SELECT * FROM orders WHERE created_at >= '2023-06-01' AND user_id = 12345",
    "SELECT user_id, COUNT(*) FROM orders WHERE status = 'paid' GROUP BY user_id ORDER BY COUNT(*) DESC",
    "SELECT * FROM users WHERE id IN (SELECT user_id FROM orders WHERE total_amount > 1000)"
]

for sql in test_queries:
    optimizations = advanced_manager.analyze_query_optimization(sql, {})
    print(f"\n   查询: {sql[:60]}...")
    print(f"     优化建议数量: {len(optimizations)}个")
    for i, opt in enumerate(optimizations[:2], 1):
        print(f"     建议{i}: {opt.optimization_type.value}")
        print(f"       性能提升: {opt.performance_improvement}%")
        print(f"       说明: {opt.explanation}")

print("\n4. HTAP配置生成:")
for scenario_name in ['ecommerce', 'finance']:
    config = advanced_manager.generate_htap_configuration(scenario_name)
    if not config.get('error'):
        print(f"\n   {config['scenario']}:")
        print(f"     TiFlash副本数: {config['tiflash_configuration']['tiflash_replicas']}")
        print(f"     资源组数量: {len(config['resource_groups'])}个")
        print(f"     OLTP平均延迟: {config['performance_targets']['oltp_avg_latency_ms']}ms")
        print(f"     OLAP平均查询时间: {config['performance_targets']['olap_avg_query_time_s']}s")
        print(f"     混合负载效率: {config['performance_targets']['mixed_workload_efficiency']}%")
        print(f"     优化建议: {len(config['optimization_recommendations'])}项")

print("\n5. 窗口函数示例:")
window_examples = advanced_manager.generate_window_function_examples()
for i, example in enumerate(window_examples, 1):
    print(f"\n   示例{i}: {example['title']}")
    print(f"     描述: {example['description']}")
    print(f"     适用场景: {', '.join(example['use_cases'][:3])}")
    sql_lines = example['sql'].strip().split('\n')
    print(f"     SQL行数: {len(sql_lines)}行")
    print("\n     SQL预览:")
    for line in sql_lines[:8]:
        if line.strip():
            print(f"       {line}")
    if len(sql_lines) > 8:
        print(f"       ... (还有{len(sql_lines)-8}行)")

print("\n6. CTE示例:")
cte_examples = advanced_manager.generate_cte_examples()
for i, example in enumerate(cte_examples, 1):
    print(f"\n   示例{i}: {example['title']}")
    print(f"     描述: {example['description']}")
    print(f"     适用场景: {', '.join(example['use_cases'][:3])}")
    sql_lines = example['sql'].strip().split('\n')
    print(f"     SQL行数: {len(sql_lines)}行")
    print("\n     SQL预览:")
    for line in sql_lines[:10]:
        if line.strip():
            print(f"       {line}")
    if len(sql_lines) > 10:
        print(f"       ... (还有{len(sql_lines)-10}行)")

Window Functions in Depth

1. Ranking Functions

ROW_NUMBER():

-- Assign a unique sequence number to each row
SELECT 
    product_id,
    product_name,
    price,
    ROW_NUMBER() OVER (ORDER BY price DESC) as price_rank
FROM products
WHERE is_active = TRUE;

RANK() and DENSE_RANK():

-- Handling ties in rankings
SELECT 
    student_id,
    student_name,
    score,
    RANK() OVER (ORDER BY score DESC) as rank_with_gaps,
    DENSE_RANK() OVER (ORDER BY score DESC) as dense_rank
FROM student_scores
ORDER BY score DESC;

2. Aggregate Window Functions

Cumulative calculations:

-- Cumulative revenue
SELECT 
    order_date,
    daily_revenue,
    SUM(daily_revenue) OVER (ORDER BY order_date) as cumulative_revenue,
    AVG(daily_revenue) OVER (ORDER BY order_date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as weekly_avg
FROM (
    SELECT 
        DATE(created_at) as order_date,
        SUM(total_amount) as daily_revenue
    FROM orders
    WHERE status = 'paid'
    GROUP BY DATE(created_at)
) daily_sales
ORDER BY order_date;

Moving-window calculations:

-- Moving average and moving sum
SELECT 
    user_id,
    login_date,
    daily_logins,
    AVG(daily_logins) OVER (
        PARTITION BY user_id 
        ORDER BY login_date 
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as weekly_avg_logins,
    SUM(daily_logins) OVER (
        PARTITION BY user_id 
        ORDER BY login_date 
        ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
    ) as monthly_total_logins
FROM user_daily_stats
ORDER BY user_id, login_date;

3. Offset Functions

LAG() and LEAD():

-- Month-over-month and year-over-year comparisons
SELECT 
    sales_month,
    monthly_revenue,
    LAG(monthly_revenue, 1) OVER (ORDER BY sales_month) as prev_month_revenue,
    LAG(monthly_revenue, 12) OVER (ORDER BY sales_month) as same_month_last_year,
    ROUND(
        (monthly_revenue - LAG(monthly_revenue, 1) OVER (ORDER BY sales_month)) / 
        LAG(monthly_revenue, 1) OVER (ORDER BY sales_month) * 100, 2
    ) as month_over_month_growth,
    ROUND(
        (monthly_revenue - LAG(monthly_revenue, 12) OVER (ORDER BY sales_month)) / 
        LAG(monthly_revenue, 12) OVER (ORDER BY sales_month) * 100, 2
    ) as year_over_year_growth
FROM monthly_sales_summary
ORDER BY sales_month;

4. Distribution Functions

NTILE():

-- Bucket customers into quartiles
SELECT 
    customer_id,
    total_spent,
    NTILE(4) OVER (ORDER BY total_spent DESC) as spending_quartile,
    CASE NTILE(4) OVER (ORDER BY total_spent DESC)
        WHEN 1 THEN 'Top 25%'
        WHEN 2 THEN 'Second 25%'
        WHEN 3 THEN 'Third 25%'
        WHEN 4 THEN 'Bottom 25%'
    END as customer_segment
FROM customer_spending_summary
ORDER BY total_spent DESC;
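
The four identical OVER clauses in the NTILE example can be factored out with a named window. This follows MySQL 8.0 window syntax, which TiDB largely implements; verify the WINDOW clause on your TiDB version:

-- Define the window once and reference it by name
SELECT 
    customer_id,
    total_spent,
    NTILE(4) OVER w as spending_quartile
FROM customer_spending_summary
WINDOW w AS (ORDER BY total_spent DESC)
ORDER BY total_spent DESC;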

PERCENT_RANK():

-- Percentile ranking
SELECT 
    employee_id,
    salary,
    PERCENT_RANK() OVER (ORDER BY salary) as salary_percentile,
    ROUND(PERCENT_RANK() OVER (ORDER BY salary) * 100, 2) as salary_percentile_pct
FROM employees
ORDER BY salary DESC;

Common Table Expressions (CTEs)

1. Basic CTEs

Simplifying complex queries:

-- User spending analysis
WITH user_spending AS (
    SELECT 
        user_id,
        COUNT(*) as order_count,
        SUM(total_amount) as total_spent,
        AVG(total_amount) as avg_order_value
    FROM orders 
    WHERE status = 'paid'
    GROUP BY user_id
),
user_segments AS (
    SELECT 
        user_id,
        order_count,
        total_spent,
        avg_order_value,
        CASE 
            WHEN total_spent >= 10000 THEN 'VIP'
            WHEN total_spent >= 5000 THEN 'Premium'
            WHEN total_spent >= 1000 THEN 'Regular'
            ELSE 'New'
        END as segment
    FROM user_spending
)
SELECT 
    segment,
    COUNT(*) as user_count,
    AVG(total_spent) as avg_spending,
    AVG(order_count) as avg_orders
FROM user_segments
GROUP BY segment
ORDER BY avg_spending DESC;

2. Recursive CTEs

Organization hierarchy query:

-- Department hierarchy
WITH RECURSIVE dept_hierarchy AS (
    -- Anchor member: top-level departments
    SELECT 
        dept_id,
        dept_name,
        parent_dept_id,
        1 as level,
        CAST(dept_name AS CHAR(500)) as path
    FROM departments 
    WHERE parent_dept_id IS NULL
    
    UNION ALL
    
    -- Recursive member: child departments
    SELECT 
        d.dept_id,
        d.dept_name,
        d.parent_dept_id,
        dh.level + 1,
        CONCAT(dh.path, ' -> ', d.dept_name)
    FROM departments d
    INNER JOIN dept_hierarchy dh ON d.parent_dept_id = dh.dept_id
    WHERE dh.level < 10
)
SELECT 
    dept_id,
    CONCAT(REPEAT('  ', level - 1), dept_name) as hierarchy_display,
    level,
    path
FROM dept_hierarchy
ORDER BY path;

Category tree traversal:

-- Product category hierarchy
WITH RECURSIVE category_tree AS (
    SELECT 
        category_id,
        category_name,
        parent_category_id,
        1 as depth,
        category_name as full_path
    FROM product_categories 
    WHERE parent_category_id IS NULL
    
    UNION ALL
    
    SELECT 
        pc.category_id,
        pc.category_name,
        pc.parent_category_id,
        ct.depth + 1,
        CONCAT(ct.full_path, ' / ', pc.category_name)
    FROM product_categories pc
    INNER JOIN category_tree ct ON pc.parent_category_id = ct.category_id
    WHERE ct.depth < 5
)
SELECT 
    category_id,
    category_name,
    depth,
    full_path,
    (SELECT COUNT(*) FROM products p WHERE p.category_id = ct.category_id) as product_count
FROM category_tree ct
ORDER BY full_path;
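
Besides the explicit depth guard in the WHERE clause, recursive CTEs are also bounded by the MySQL-compatible cte_max_recursion_depth system variable (default 1000). If a legitimate hierarchy is deeper than the limit, raise it for the session:

-- Allow deeper recursion for this session only
SET SESSION cte_max_recursion_depth = 5000;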

HTAP Mixed Workloads

1. TiFlash Configuration

Creating TiFlash replicas:

-- Create TiFlash replicas for the tables
ALTER TABLE orders SET TIFLASH REPLICA 2;
ALTER TABLE order_items SET TIFLASH REPLICA 2;
ALTER TABLE users SET TIFLASH REPLICA 1;

-- Check TiFlash replica status
SELECT 
    TABLE_SCHEMA,
    TABLE_NAME,
    REPLICA_COUNT,
    LOCATION_LABELS,
    AVAILABLE,
    PROGRESS
FROM INFORMATION_SCHEMA.TIFLASH_REPLICA;

Forcing reads from TiFlash:

-- Read from TiFlash for the current session
SET SESSION tidb_isolation_read_engines = 'tiflash';

-- Route an analytical query to TiFlash
SELECT /*+ READ_FROM_STORAGE(TIFLASH[orders, order_items]) */
    DATE(o.created_at) as order_date,
    COUNT(*) as order_count,
    SUM(o.total_amount) as daily_revenue,
    COUNT(DISTINCT o.user_id) as unique_customers
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE o.created_at >= DATE_SUB(NOW(), INTERVAL 30 DAY)
GROUP BY DATE(o.created_at)
ORDER BY order_date;

2. Resource Group Management

Creating resource groups:

-- Create an OLTP resource group
CREATE RESOURCE GROUP rg_oltp
    RU_PER_SEC = 2000
    PRIORITY = HIGH
    BURSTABLE;

-- Create an OLAP resource group
CREATE RESOURCE GROUP rg_olap
    RU_PER_SEC = 1000
    PRIORITY = MEDIUM;

-- Inspect resource groups
SELECT * FROM INFORMATION_SCHEMA.RESOURCE_GROUPS;

Binding resource groups:

-- Bind users to resource groups
ALTER USER 'oltp_user'@'%' RESOURCE GROUP rg_oltp;
ALTER USER 'olap_user'@'%' RESOURCE GROUP rg_olap;

-- Bind a resource group at the session or statement level
SET RESOURCE GROUP rg_olap;
SELECT /*+ RESOURCE_GROUP(rg_olap) */
    category,
    SUM(revenue) as total_revenue
FROM sales_summary
GROUP BY category;
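
RU_PER_SEC budgets are easier to set after estimating the cluster's total request-unit capacity. Newer TiDB releases (v7.1+) provide CALIBRATE RESOURCE for this, and groups can be resized online; treat the statements below as a sketch to verify against your version:

-- Estimate the cluster's total request-unit capacity
CALIBRATE RESOURCE;

-- Resize a group online based on the estimate
ALTER RESOURCE GROUP rg_olap RU_PER_SEC = 1500;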

3. MPP Query Optimization

Enabling MPP:

-- Enable MPP execution
SET SESSION tidb_allow_mpp = ON;
SET SESSION tidb_enforce_mpp = ON;

-- MPP aggregation query
SELECT /*+ MPP_1PHASE_AGG() */
    region,
    product_category,
    SUM(sales_amount) as total_sales,
    COUNT(DISTINCT customer_id) as unique_customers
FROM sales_fact sf
JOIN product_dim pd ON sf.product_id = pd.product_id
JOIN customer_dim cd ON sf.customer_id = cd.customer_id
WHERE sf.sale_date >= '2023-01-01'
GROUP BY region, product_category
ORDER BY total_sales DESC;
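
To confirm a query actually runs through MPP, inspect its plan: MPP fragments show ExchangeSender/ExchangeReceiver operators, and the task column reads mpp[tiflash]. A quick check on a simplified version of the query above:

-- The task column should show mpp[tiflash] for the join and aggregation
EXPLAIN SELECT region, SUM(sales_amount) AS total_sales
FROM sales_fact sf
JOIN customer_dim cd ON sf.customer_id = cd.customer_id
GROUP BY region;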

Distributed Execution Plan Optimization

1. Execution Plan Analysis

Viewing execution plans:

-- View the execution plan together with runtime statistics
EXPLAIN ANALYZE SELECT 
    u.username,
    COUNT(o.id) as order_count,
    SUM(o.total_amount) as total_spent
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at >= '2023-01-01'
GROUP BY u.id, u.username
HAVING COUNT(o.id) > 5
ORDER BY total_spent DESC
LIMIT 100;

Analyzing execution statistics:

-- View verbose plan details
EXPLAIN FORMAT='verbose' SELECT 
    product_id,
    SUM(quantity) as total_sold
FROM order_items
WHERE created_at >= DATE_SUB(NOW(), INTERVAL 30 DAY)
GROUP BY product_id
ORDER BY total_sold DESC
LIMIT 10;
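
When reading EXPLAIN ANALYZE output, compare the estRows (optimizer estimate) and actRows (actual) columns: a large gap between them usually means stale statistics, which the statistics-management section below addresses.

-- A large estRows/actRows gap is typically fixed by refreshing statistics
EXPLAIN ANALYZE SELECT COUNT(*) FROM orders WHERE status = 'paid';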

2. Index Optimization

Composite index design:

-- Composite index for multi-condition queries
CREATE INDEX idx_orders_user_status_time 
ON orders (user_id, status, created_at);

-- Index supporting sorted queries
CREATE INDEX idx_orders_time_amount 
ON orders (created_at DESC, total_amount DESC);

-- Covering index
CREATE INDEX idx_orders_covering 
ON orders (status, user_id, total_amount, created_at);

Expression indexes:

-- Expression indexes accept only functions on TiDB's allowlist by default
-- (see the tidb_allow_function_for_expression_index variable)
CREATE INDEX idx_email_lower 
ON users ((LOWER(email)));

-- For JSON fields, a generated column plus an ordinary index is well supported
ALTER TABLE users ADD COLUMN age INT AS (CAST(profile->>'$.age' AS UNSIGNED)) VIRTUAL;
CREATE INDEX idx_user_profile_age ON users (age);

3. Statistics Management

Updating statistics:

-- Manually refresh table statistics
ANALYZE TABLE orders;
ANALYZE TABLE users;

-- Collect statistics for specific columns
ANALYZE TABLE orders COLUMNS user_id, status;

-- Inspect statistics metadata and histograms
SHOW STATS_META WHERE table_name = 'orders';
SHOW STATS_HISTOGRAMS WHERE table_name = 'orders';
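
TiDB also refreshes statistics automatically in the background. The trigger threshold and the permitted time window are controlled by system variables; the values below are common starting points:

-- Re-analyze a table after roughly half of its rows have changed
SET GLOBAL tidb_auto_analyze_ratio = 0.5;

-- Restrict automatic analyze to off-peak hours
SET GLOBAL tidb_auto_analyze_start_time = '01:00 +0000';
SET GLOBAL tidb_auto_analyze_end_time = '05:00 +0000';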

Hotspot Handling

1. Hotspot Identification

Monitoring hot Regions:

-- View hot Regions as reported by PD (type is 'read' or 'write')
SELECT 
    db_name,
    table_name,
    index_name,
    type,
    max_hot_degree,
    region_count,
    flow_bytes
FROM INFORMATION_SCHEMA.TIDB_HOT_REGIONS
ORDER BY flow_bytes DESC
LIMIT 20;

Analyzing slow queries:

-- Inspect the slow query log
SELECT 
    query_time,
    query,
    parse_time,
    compile_time,
    process_keys,
    total_keys
FROM INFORMATION_SCHEMA.SLOW_QUERY
WHERE time >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
ORDER BY query_time DESC
LIMIT 10;

2. Hotspot Solutions

Auto-increment ID optimization:

-- Use SHARD_ROW_ID_BITS to scatter the write hotspot; this requires a
-- nonclustered primary key so rows are keyed by the hidden _tidb_rowid
CREATE TABLE orders_optimized (
    id BIGINT AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    order_no VARCHAR(32) NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id) NONCLUSTERED
) SHARD_ROW_ID_BITS = 4 PRE_SPLIT_REGIONS = 2;

Randomized primary keys:

-- Use AUTO_RANDOM to scatter inserts while keeping a clustered BIGINT key
-- (TiDB's built-in alternative to client-generated UUID keys)
CREATE TABLE events_optimized (
    id BIGINT AUTO_RANDOM,
    user_id BIGINT NOT NULL,
    event_type VARCHAR(50) NOT NULL,
    event_data JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    KEY idx_user_time (user_id, created_at)
);

Scattering writes with partitioned tables:

-- Spread writes across partitions with hash partitioning
CREATE TABLE user_actions (
    id BIGINT AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    action_type VARCHAR(50) NOT NULL,
    action_data JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, user_id),
    KEY idx_user_time (user_id, created_at)
) PARTITION BY HASH(user_id) PARTITIONS 16;
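
For write-heavy tables, a newly created table's single initial Region can itself be a hotspot. Regions can be pre-split at creation time (as PRE_SPLIT_REGIONS does in the SHARD_ROW_ID_BITS example above) or on demand with SPLIT TABLE; the key range below is illustrative:

-- Pre-split an existing table into 16 Regions across its row-id range
SPLIT TABLE user_actions BETWEEN (0) AND (9223372036854775807) REGIONS 16;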

Performance Tuning Best Practices

1. System Configuration

TiDB configuration (tidb.toml; note that the overall memory cap is governed by the tidb_server_memory_limit system variable rather than this file):

# tidb.toml
[performance]
max-procs = 0                    # 0 = use all CPU cores
stmt-count-limit = 5000          # Statements allowed per transaction
txn-total-size-limit = 104857600 # Transaction size limit (100MB)

[prepared-plan-cache]
enabled = true                   # Enable the prepared-plan cache
capacity = 1000                  # Cache capacity
memory-guard-ratio = 0.1         # Memory guard ratio

[tikv-client]
grpc-connection-count = 4        # Number of gRPC connections
grpc-keepalive-time = 10         # Keepalive interval (seconds)
grpc-keepalive-timeout = 3       # Keepalive timeout (seconds)

TiKV configuration (tikv.toml):

# tikv.toml
[server]
grpc-concurrency = 8             # gRPC worker threads
grpc-raft-conn-num = 10          # Raft connections between stores

[storage.block-cache]
capacity = "45%"                 # Shared block cache size

[raftstore]
raft-log-gc-count-limit = 50000  # Raft log GC count threshold
raft-log-gc-size-limit = "72MB"  # Raft log GC size threshold
region-split-check-diff = "32MB" # Size delta that triggers a split check

[rocksdb.defaultcf]
block-size = "64KB"              # RocksDB block size
write-buffer-size = "128MB"      # Memtable size
max-write-buffer-number = 5      # Maximum number of memtables
compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]

2. Query Optimization Techniques

Index usage:

-- 1. Use a prefix index to reduce storage
CREATE INDEX idx_email_prefix ON users (email(20));

-- 2. Index a computed value via a generated column
ALTER TABLE orders ADD COLUMN order_month INT AS (MONTH(created_at)) VIRTUAL;
CREATE INDEX idx_order_month ON orders (order_month);

-- 3. Use a multi-column index for compound conditions
CREATE INDEX idx_user_status_time ON orders (user_id, status, created_at);

-- 4. Use a covering index to avoid table lookups
CREATE INDEX idx_order_summary ON orders (user_id, status, total_amount, created_at);

Query rewriting:

-- Before: IN subquery
SELECT * FROM users 
WHERE id IN (
    SELECT user_id FROM orders 
    WHERE total_amount > 1000
);

-- After: EXISTS
SELECT u.* FROM users u
WHERE EXISTS (
    SELECT 1 FROM orders o 
    WHERE o.user_id = u.id AND o.total_amount > 1000
);

-- Before: OR conditions
SELECT * FROM orders 
WHERE status = 'paid' OR status = 'shipped';

-- After: an IN list
SELECT * FROM orders 
WHERE status IN ('paid', 'shipped');

Pagination:

-- Before: OFFSET pagination (slow for deep pages)
SELECT * FROM orders 
ORDER BY created_at DESC 
LIMIT 20 OFFSET 10000;

-- After: keyset (cursor) pagination
SELECT * FROM orders 
WHERE created_at < '2023-06-01 10:30:00'
ORDER BY created_at DESC 
LIMIT 20;

-- Or page by primary-key ranges
SELECT * FROM orders 
WHERE id > 100000
ORDER BY id 
LIMIT 20;
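
Keyset pagination on created_at alone can skip or repeat rows when timestamps collide. A composite cursor over (created_at, id) makes the ordering total; the row-value comparison used here follows MySQL semantics:

-- Composite cursor: strictly before the last row of the previous page
SELECT * FROM orders 
WHERE (created_at, id) < ('2023-06-01 10:30:00', 100000)
ORDER BY created_at DESC, id DESC 
LIMIT 20;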

3. Batch Operations

Batch inserts:

-- Multi-row INSERT
INSERT INTO orders (user_id, order_no, total_amount) VALUES
(1, 'ORD001', 100.00),
(2, 'ORD002', 200.00),
(3, 'ORD003', 300.00);

-- INSERT ... SELECT
INSERT INTO order_archive 
SELECT * FROM orders 
WHERE created_at < DATE_SUB(NOW(), INTERVAL 1 YEAR);

-- LOAD DATA for bulk imports
LOAD DATA LOCAL INFILE '/path/to/data.csv'
INTO TABLE orders
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
IGNORE 1 ROWS;

Batch updates:

-- Batch update with CASE WHEN
UPDATE products 
SET price = CASE 
    WHEN id = 1 THEN 99.99
    WHEN id = 2 THEN 199.99
    WHEN id = 3 THEN 299.99
    ELSE price
END
WHERE id IN (1, 2, 3);

-- Update through a JOIN
UPDATE orders o
JOIN (
    SELECT order_id, SUM(quantity * price) as total
    FROM order_items 
    GROUP BY order_id
) oi ON o.id = oi.order_id
SET o.total_amount = oi.total;

4. Monitoring and Diagnostics

Performance monitoring queries:

-- Current connections
SHOW PROCESSLIST;

-- Recent slow queries
SELECT 
    query_time,
    query,
    process_keys,
    total_keys
FROM INFORMATION_SCHEMA.SLOW_QUERY
WHERE time >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
ORDER BY query_time DESC
LIMIT 10;

-- Table status
SHOW TABLE STATUS LIKE 'orders';

-- Index metadata
SELECT 
    table_name,
    index_name,
    cardinality,
    sub_part,
    packed,
    nullable,
    index_type
FROM INFORMATION_SCHEMA.STATISTICS
WHERE table_schema = 'your_database'
ORDER BY table_name, seq_in_index;

System status monitoring:

-- View cluster topology and component versions
SELECT * FROM INFORMATION_SCHEMA.CLUSTER_INFO;

-- Region distribution per table
SELECT 
    table_name,
    COUNT(*) as region_count,
    SUM(approximate_size) as total_size_mb
FROM INFORMATION_SCHEMA.TIKV_REGION_STATUS
GROUP BY table_name
ORDER BY total_size_mb DESC;

-- Regions with the highest read/write flow
SELECT 
    db_name,
    table_name,
    region_id,
    written_bytes,
    read_bytes
FROM INFORMATION_SCHEMA.TIKV_REGION_STATUS
ORDER BY written_bytes + read_bytes DESC
LIMIT 20;

TiDB-Specific Features

1. Pessimistic and Optimistic Locking

Pessimistic transactions:

-- Switch the session to pessimistic mode
SET SESSION tidb_txn_mode = 'pessimistic';

BEGIN;
-- Pessimistic locking acquires the lock immediately
SELECT balance FROM accounts WHERE id = 1 FOR UPDATE;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;

Optimistic transactions:

-- Switch the session to optimistic mode
SET SESSION tidb_txn_mode = 'optimistic';

BEGIN;
-- Optimistic transactions check for conflicts at commit time
SELECT balance FROM accounts WHERE id = 1;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;
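
With optimistic transactions, a conflicting concurrent write surfaces only at COMMIT (typically as a write-conflict error), so the application must be prepared to retry the whole transaction. The active mode can be checked before relying on either behavior:

-- Inspect the transaction mode for the current session
SELECT @@SESSION.tidb_txn_mode;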

2. Large Transactions

Large transaction configuration:

-- The hard limits on transaction size are tidb.toml configuration items,
-- not session variables (a restart is required to change them):
--   [performance]
--   txn-total-size-limit = 1073741824  # 1GB
--   stmt-count-limit = 10000

-- In practice, split bulk DML into bounded chunks instead of one huge transaction:
DELETE FROM order_archive WHERE id BETWEEN 1 AND 100000;
DELETE FROM order_archive WHERE id BETWEEN 100001 AND 200000;
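
Since v6.1, TiDB can also do this chunking for you with non-transactional DML, which splits one statement into many small transactions (the statement as a whole is not atomic, so reserve it for idempotent maintenance jobs):

-- Delete in batches of 1000 rows keyed on id; each batch commits separately
BATCH ON id LIMIT 1000
DELETE FROM order_archive WHERE created_at < DATE_SUB(NOW(), INTERVAL 1 YEAR);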

3. Distributed Transactions

Observing cross-Region transactions:

-- Find long-running statements
SELECT 
    id,
    user,
    db,
    command,
    time,
    state,
    info
FROM INFORMATION_SCHEMA.PROCESSLIST
WHERE command = 'Query' AND time > 10;

-- View current lock waits
SELECT 
    `key`,
    trx_id,
    current_holding_trx_id,
    sql_digest
FROM INFORMATION_SCHEMA.DATA_LOCK_WAITS;

-- View recent deadlocks
SELECT * FROM INFORMATION_SCHEMA.DEADLOCKS;

Summary

Key Takeaways

  1. Partitioned table design

    • Choose a partitioning strategy that matches your query patterns
    • Use range partitioning for time-series data
    • Use hash partitioning for uniformly distributed data
    • Use list partitioning for geography-based data
  2. Advanced SQL features

    • Window functions provide powerful analytical capabilities
    • CTEs simplify complex query logic
    • Recursive CTEs handle hierarchical data
  3. HTAP capabilities

    • TiFlash provides columnar storage and MPP computation
    • Resource groups isolate workloads from each other
    • Real-time analytics runs without degrading OLTP performance
  4. Performance optimization

    • Design the indexing strategy deliberately
    • Optimize query execution plans
    • Resolve hotspot issues
    • Monitor system performance

Best Practices

  1. Architecture design

    • Plan the partitioning strategy up front
    • Design efficient indexes
    • Avoid hot Regions
    • Enforce resource isolation
  2. Development practices

    • Write efficient SQL queries
    • Use batch operations
    • Use transactions appropriately
    • Avoid long-running transactions
  3. Operations

    • Refresh statistics regularly
    • Monitor slow queries
    • Analyze execution plans
    • Tune system configuration
  4. Troubleshooting

    • Identify performance bottlenecks
    • Resolve hotspot issues
    • Handle lock conflicts
    • Optimize resource usage

Next Steps

  1. Cluster management - deploying, scaling, and backing up / restoring TiDB clusters
  2. Data migration - moving from MySQL and other databases to TiDB
  3. Monitoring and operations - TiDB's monitoring stack and operational tooling
  4. Ecosystem tools - TiCDC, TiSpark, TiDB Binlog, and related tools

With the material in this chapter, you now have the advanced features and tuning techniques needed to get the most out of TiDB in real projects and to build high-performance, highly available database systems.