## 概述

查询优化是ClickHouse性能调优的核心内容。本章将深入介绍ClickHouse的查询执行原理、性能分析方法、优化策略和最佳实践,帮助你构建高性能的分析查询。

## 查询执行原理

### 查询处理流程

ClickHouse的查询处理包含多个阶段,理解这些阶段有助于进行针对性优化。
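
在实际环境中,可以直接用 ClickHouse 的 EXPLAIN 语句观察解析、优化之后的执行计划和物理执行管线。下面是一个最简示意,假设本地有一个可连接的 ClickHouse 实例、已安装 clickhouse-driver 客户端库,且存在示例表 events(连接参数与表名均为假设):

```python
# 示意: 查看查询的逻辑执行计划与物理执行管线
# 假设: 本地 ClickHouse 可连接, events 为示例表, 需要安装 clickhouse-driver
from clickhouse_driver import Client

client = Client(host="localhost")

sql = "SELECT user_id, count() FROM events WHERE date >= '2024-01-01' GROUP BY user_id"

# EXPLAIN PLAN: 解析、分析、优化之后生成的算子树
for (line,) in client.execute(f"EXPLAIN PLAN {sql}"):
    print(line)

# EXPLAIN PIPELINE: 实际执行阶段使用的处理器管线
for (line,) in client.execute(f"EXPLAIN PIPELINE {sql}"):
    print(line)
```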


## 性能监控与调优

性能监控是持续优化ClickHouse性能的基础,通过系统监控和查询分析可以及时发现和解决性能问题。
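
查询分析最常用的数据来源是 system.query_log 系统表。下面的示意用 SQL 统计最近24小时平均耗时最长的查询;连接方式沿用 clickhouse-driver,列名以较新版本为准(如 normalized_query_hash),不同版本可能略有差异:

```python
# 示意: 从 system.query_log 中找出最近24小时最慢的查询
# 假设: 已开启 query_log, 连接参数与阈值均为示例
from clickhouse_driver import Client

client = Client(host="localhost")

slow_query_sql = """
SELECT
    normalized_query_hash,
    any(query)             AS sample_query,
    count()                AS executions,
    avg(query_duration_ms) AS avg_duration_ms,
    max(memory_usage)      AS peak_memory_bytes
FROM system.query_log
WHERE type = 'QueryFinish'
  AND event_time >= now() - INTERVAL 24 HOUR
GROUP BY normalized_query_hash
ORDER BY avg_duration_ms DESC
LIMIT 10
"""

for row in client.execute(slow_query_sql):
    print(row)
```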

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List
import random
import time

class MetricType(Enum):
    """监控指标类型"""
    QUERY_PERFORMANCE = "query_performance"
    SYSTEM_RESOURCE = "system_resource"
    TABLE_STATISTICS = "table_statistics"
    CLUSTER_HEALTH = "cluster_health"

class AlertLevel(Enum):
    """告警级别"""
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
    EMERGENCY = "emergency"

@dataclass
class PerformanceMetric:
    """性能指标"""
    metric_name: str
    metric_type: MetricType
    current_value: float
    threshold_warning: float
    threshold_critical: float
    unit: str
    description: str
    timestamp: float

@dataclass
class QueryAnalysisResult:
    """查询分析结果"""
    query_id: str
    sql_hash: str
    execution_time_ms: float
    rows_read: int
    bytes_read: int
    memory_usage_mb: float
    cpu_time_ms: float
    optimization_suggestions: List[str]
    performance_grade: str  # A, B, C, D, F

@dataclass
class SystemAlert:
    """系统告警"""
    alert_id: str
    level: AlertLevel
    metric_name: str
    current_value: float
    threshold: float
    message: str
    timestamp: float
    suggested_actions: List[str]

class ClickHousePerformanceMonitor:
    """ClickHouse性能监控器"""
    
    def __init__(self):
        self.metrics_config = self._initialize_metrics_config()
        self.performance_baselines = self._initialize_performance_baselines()
        self.alert_rules = self._initialize_alert_rules()
    
    def _initialize_metrics_config(self) -> Dict[str, PerformanceMetric]:
        """初始化监控指标配置"""
        return {
            "query_duration_p95": PerformanceMetric(
                metric_name="query_duration_p95",
                metric_type=MetricType.QUERY_PERFORMANCE,
                current_value=0.0,
                threshold_warning=1000.0,  # 1秒
                threshold_critical=5000.0,  # 5秒
                unit="ms",
                description="查询执行时间95分位数",
                timestamp=time.time()
            ),
            "memory_usage_percent": PerformanceMetric(
                metric_name="memory_usage_percent",
                metric_type=MetricType.SYSTEM_RESOURCE,
                current_value=0.0,
                threshold_warning=80.0,
                threshold_critical=95.0,
                unit="%",
                description="内存使用率",
                timestamp=time.time()
            ),
            "disk_usage_percent": PerformanceMetric(
                metric_name="disk_usage_percent",
                metric_type=MetricType.SYSTEM_RESOURCE,
                current_value=0.0,
                threshold_warning=85.0,
                threshold_critical=95.0,
                unit="%",
                description="磁盘使用率",
                timestamp=time.time()
            ),
            "concurrent_queries": PerformanceMetric(
                metric_name="concurrent_queries",
                metric_type=MetricType.QUERY_PERFORMANCE,
                current_value=0.0,
                threshold_warning=100.0,
                threshold_critical=200.0,
                unit="count",
                description="并发查询数量",
                timestamp=time.time()
            ),
            "merge_operations_per_second": PerformanceMetric(
                metric_name="merge_operations_per_second",
                metric_type=MetricType.TABLE_STATISTICS,
                current_value=0.0,
                threshold_warning=10.0,
                threshold_critical=20.0,
                unit="ops/sec",
                description="合并操作频率",
                timestamp=time.time()
            ),
            "replication_lag_seconds": PerformanceMetric(
                metric_name="replication_lag_seconds",
                metric_type=MetricType.CLUSTER_HEALTH,
                current_value=0.0,
                threshold_warning=60.0,
                threshold_critical=300.0,
                unit="seconds",
                description="复制延迟",
                timestamp=time.time()
            )
        }
    
    def _initialize_performance_baselines(self) -> Dict[str, Dict[str, float]]:
        """初始化性能基线"""
        return {
            "query_performance": {
                "simple_select_ms_per_million_rows": 100,
                "aggregation_ms_per_million_rows": 500,
                "join_ms_per_million_rows": 2000,
                "max_acceptable_memory_mb": 8192
            },
            "system_resources": {
                "optimal_cpu_usage_percent": 70,
                "optimal_memory_usage_percent": 75,
                "optimal_disk_io_mbps": 500,
                "max_concurrent_queries": 50
            },
            "table_operations": {
                "max_parts_per_partition": 100,
                "optimal_merge_frequency_per_hour": 10,
                "max_mutation_queue_size": 10
            }
        }
    
    def _initialize_alert_rules(self) -> Dict[str, Dict[str, Any]]:
        """初始化告警规则"""
        return {
            "slow_query": {
                "condition": "query_duration > 5000",
                "level": AlertLevel.WARNING,
                "message": "检测到慢查询",
                "actions": ["分析查询执行计划", "检查索引使用情况", "考虑查询优化"]
            },
            "high_memory_usage": {
                "condition": "memory_usage_percent > 90",
                "level": AlertLevel.CRITICAL,
                "message": "内存使用率过高",
                "actions": ["检查大查询", "增加内存限制", "优化查询"]
            },
            "disk_space_low": {
                "condition": "disk_usage_percent > 90",
                "level": AlertLevel.CRITICAL,
                "message": "磁盘空间不足",
                "actions": ["清理旧数据", "扩展存储", "检查数据压缩"]
            },
            "replication_lag": {
                "condition": "replication_lag_seconds > 300",
                "level": AlertLevel.WARNING,
                "message": "复制延迟过高",
                "actions": ["检查网络连接", "检查副本状态", "重启复制"]
            }
        }
    
    def collect_system_metrics(self) -> Dict[str, float]:
        """收集系统指标"""
        # 模拟系统指标收集
        metrics = {
            "cpu_usage_percent": random.uniform(20, 85),
            "memory_usage_percent": random.uniform(40, 90),
            "disk_usage_percent": random.uniform(30, 95),
            "disk_io_mbps": random.uniform(50, 800),
            "network_io_mbps": random.uniform(10, 200),
            "concurrent_queries": random.randint(5, 150),
            "queries_per_second": random.uniform(10, 500)
        }
        
        return metrics
    
    def analyze_query_performance(self, query_log: List[Dict[str, Any]]) -> List[QueryAnalysisResult]:
        """分析查询性能"""
        analysis_results = []
        
        for log_entry in query_log:
            # 计算性能评分
            performance_score = self._calculate_performance_score(log_entry)
            
            # 生成优化建议
            suggestions = self._generate_optimization_suggestions(log_entry)
            
            # 确定性能等级
            grade = self._determine_performance_grade(performance_score)
            
            result = QueryAnalysisResult(
                query_id=log_entry.get("query_id", f"query_{int(time.time())}"),
                sql_hash=log_entry.get("sql_hash", "unknown"),
                execution_time_ms=log_entry.get("execution_time_ms", 0),
                rows_read=log_entry.get("rows_read", 0),
                bytes_read=log_entry.get("bytes_read", 0),
                memory_usage_mb=log_entry.get("memory_usage_mb", 0),
                cpu_time_ms=log_entry.get("cpu_time_ms", 0),
                optimization_suggestions=suggestions,
                performance_grade=grade
            )
            
            analysis_results.append(result)
        
        return analysis_results
    
    def _calculate_performance_score(self, log_entry: Dict[str, Any]) -> float:
        """计算性能评分"""
        score = 100.0
        
        # 执行时间评分
        execution_time = log_entry.get("execution_time_ms", 0)
        if execution_time > 5000:
            score -= 30
        elif execution_time > 1000:
            score -= 15
        elif execution_time > 500:
            score -= 5
        
        # 内存使用评分
        memory_usage = log_entry.get("memory_usage_mb", 0)
        if memory_usage > 4096:
            score -= 25
        elif memory_usage > 1024:
            score -= 10
        elif memory_usage > 512:
            score -= 5
        
        # 数据读取效率评分
        rows_read = log_entry.get("rows_read", 0)
        bytes_read = log_entry.get("bytes_read", 0)
        
        if rows_read > 0 and bytes_read > 0:
            bytes_per_row = bytes_read / rows_read
            if bytes_per_row > 1000:  # 每行超过1KB
                score -= 15
            elif bytes_per_row > 500:
                score -= 8
        
        # CPU效率评分
        cpu_time = log_entry.get("cpu_time_ms", 0)
        if cpu_time > execution_time * 2:  # CPU时间超过执行时间2倍
            score -= 20
        elif cpu_time > execution_time * 1.5:
            score -= 10
        
        return max(0, score)
    
    def _generate_optimization_suggestions(self, log_entry: Dict[str, Any]) -> List[str]:
        """生成优化建议"""
        suggestions = []
        
        execution_time = log_entry.get("execution_time_ms", 0)
        memory_usage = log_entry.get("memory_usage_mb", 0)
        rows_read = log_entry.get("rows_read", 0)
        
        if execution_time > 5000:
            suggestions.append("查询执行时间过长,建议检查索引使用情况")
            suggestions.append("考虑添加WHERE条件减少数据扫描范围")
        
        if memory_usage > 4096:
            suggestions.append("内存使用过高,建议优化JOIN操作或使用LIMIT")
            suggestions.append("考虑分批处理大数据集")
        
        if rows_read > 10_000_000:
            suggestions.append("扫描行数过多,建议添加分区裁剪条件")
            suggestions.append("检查是否可以使用物化视图")
        
        if not suggestions:
            suggestions.append("查询性能良好,无需特殊优化")
        
        return suggestions
    
    def _determine_performance_grade(self, score: float) -> str:
        """确定性能等级"""
        if score >= 90:
            return "A"
        elif score >= 80:
            return "B"
        elif score >= 70:
            return "C"
        elif score >= 60:
            return "D"
        else:
            return "F"
    
    def check_alerts(self, current_metrics: Dict[str, float]) -> List[SystemAlert]:
        """检查告警"""
        alerts = []
        
        # 检查慢查询
        if current_metrics.get("max_query_duration_ms", 0) > 5000:
            alerts.append(SystemAlert(
                alert_id=f"slow_query_{int(time.time())}",
                level=AlertLevel.WARNING,
                metric_name="query_duration",
                current_value=current_metrics["max_query_duration_ms"],
                threshold=5000,
                message="检测到慢查询",
                timestamp=time.time(),
                suggested_actions=["分析查询执行计划", "检查索引使用情况"]
            ))
        
        # 检查内存使用
        memory_usage = current_metrics.get("memory_usage_percent", 0)
        if memory_usage > 90:
            alerts.append(SystemAlert(
                alert_id=f"high_memory_{int(time.time())}",
                level=AlertLevel.CRITICAL,
                metric_name="memory_usage_percent",
                current_value=memory_usage,
                threshold=90,
                message="内存使用率过高",
                timestamp=time.time(),
                suggested_actions=["检查大查询", "增加内存限制"]
            ))
        
        # 检查磁盘使用
        disk_usage = current_metrics.get("disk_usage_percent", 0)
        if disk_usage > 90:
            alerts.append(SystemAlert(
                alert_id=f"disk_full_{int(time.time())}",
                level=AlertLevel.CRITICAL,
                metric_name="disk_usage_percent",
                current_value=disk_usage,
                threshold=90,
                message="磁盘空间不足",
                timestamp=time.time(),
                suggested_actions=["清理旧数据", "扩展存储"]
            ))
        
        return alerts
    
    def generate_performance_report(self, time_range_hours: int = 24) -> Dict[str, Any]:
        """生成性能报告"""
        # 模拟性能数据收集
        report_data = {
            "report_period": f"最近{time_range_hours}小时",
            "generated_at": time.strftime("%Y-%m-%d %H:%M:%S"),
            "summary": {
                "total_queries": random.randint(10000, 50000),
                "avg_query_duration_ms": random.uniform(100, 800),
                "p95_query_duration_ms": random.uniform(500, 2000),
                "p99_query_duration_ms": random.uniform(1000, 5000),
                "slow_queries_count": random.randint(10, 100),
                "failed_queries_count": random.randint(0, 20)
            },
            "resource_usage": {
                "avg_cpu_percent": random.uniform(40, 80),
                "peak_cpu_percent": random.uniform(80, 95),
                "avg_memory_percent": random.uniform(50, 85),
                "peak_memory_percent": random.uniform(85, 95),
                "avg_disk_io_mbps": random.uniform(100, 400),
                "peak_disk_io_mbps": random.uniform(400, 800)
            },
            "top_slow_queries": [
                {
                    "sql_hash": "abc123",
                    "avg_duration_ms": 3500,
                    "execution_count": 25,
                    "total_cpu_time_ms": 87500
                },
                {
                    "sql_hash": "def456",
                    "avg_duration_ms": 2800,
                    "execution_count": 15,
                    "total_cpu_time_ms": 42000
                }
            ],
            "optimization_opportunities": [
                "为user_id列添加索引可优化30%的查询",
                "优化JOIN顺序可减少15%的内存使用",
                "使用分区裁剪可提升25%的查询速度"
            ],
            "alerts_summary": {
                "critical_alerts": random.randint(0, 5),
                "warning_alerts": random.randint(5, 20),
                "info_alerts": random.randint(10, 50)
            }
        }
        
        return report_data
    
    def recommend_tuning_actions(self, performance_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """推荐调优操作"""
        recommendations = []
        
        summary = performance_data.get("summary", {})
        resource_usage = performance_data.get("resource_usage", {})
        
        # 基于查询性能的推荐
        if summary.get("p95_query_duration_ms", 0) > 1000:
            recommendations.append({
                "category": "查询优化",
                "priority": "高",
                "action": "优化慢查询",
                "description": "95分位查询时间过长,需要优化索引和查询结构",
                "expected_improvement": "30-50%性能提升",
                "implementation_steps": [
                    "分析慢查询日志",
                    "检查索引使用情况",
                    "优化查询结构",
                    "添加必要索引"
                ]
            })
        
        # 基于资源使用的推荐
        if resource_usage.get("peak_memory_percent", 0) > 90:
            recommendations.append({
                "category": "内存优化",
                "priority": "高",
                "action": "优化内存使用",
                "description": "内存使用率过高,可能影响系统稳定性",
                "expected_improvement": "减少20-30%内存使用",
                "implementation_steps": [
                    "调整max_memory_usage设置",
                    "优化大查询",
                    "使用流式处理",
                    "增加物理内存"
                ]
            })
        
        if resource_usage.get("peak_cpu_percent", 0) > 85:
            recommendations.append({
                "category": "CPU优化",
                "priority": "中",
                "action": "优化CPU使用",
                "description": "CPU使用率过高,可能需要优化查询或扩展资源",
                "expected_improvement": "减少15-25%CPU使用",
                "implementation_steps": [
                    "优化复杂查询",
                    "调整并发设置",
                    "使用查询缓存",
                    "考虑水平扩展"
                ]
            })
        
        # 基于慢查询数量的推荐
        if summary.get("slow_queries_count", 0) > 50:
            recommendations.append({
                "category": "索引优化",
                "priority": "中",
                "action": "添加索引",
                "description": "慢查询数量较多,可能缺少必要的索引",
                "expected_improvement": "40-60%查询速度提升",
                "implementation_steps": [
                    "分析查询模式",
                    "识别高频过滤列",
                    "创建合适索引",
                    "监控索引效果"
                ]
            })
        
        return recommendations

# 性能监控示例
print("\n\n=== ClickHouse性能监控分析 ===")

monitor = ClickHousePerformanceMonitor()

print("\n1. 系统指标收集:")
system_metrics = monitor.collect_system_metrics()

for metric_name, value in list(system_metrics.items())[:5]:
    unit = "%" if "percent" in metric_name else ("mbps" if "mbps" in metric_name else "count")
    print(f"   {metric_name}: {value:.1f} {unit}")

print("\n2. 查询性能分析:")
# 模拟查询日志
sample_query_log = [
    {
        "query_id": "query_001",
        "sql_hash": "abc123",
        "execution_time_ms": 2500,
        "rows_read": 5_000_000,
        "bytes_read": 500_000_000,
        "memory_usage_mb": 1024,
        "cpu_time_ms": 3000
    },
    {
        "query_id": "query_002",
        "sql_hash": "def456",
        "execution_time_ms": 150,
        "rows_read": 100_000,
        "bytes_read": 10_000_000,
        "memory_usage_mb": 64,
        "cpu_time_ms": 120
    },
    {
        "query_id": "query_003",
        "sql_hash": "ghi789",
        "execution_time_ms": 8000,
        "rows_read": 20_000_000,
        "bytes_read": 2_000_000_000,
        "memory_usage_mb": 4096,
        "cpu_time_ms": 12000
    }
]

query_analysis = monitor.analyze_query_performance(sample_query_log)

for i, analysis in enumerate(query_analysis, 1):
    print(f"\n   查询 {i} (等级: {analysis.performance_grade}):")
    print(f"     执行时间: {analysis.execution_time_ms:.0f}ms")
    print(f"     内存使用: {analysis.memory_usage_mb:.0f}MB")
    print(f"     读取行数: {analysis.rows_read:,}")
    print(f"     优化建议: {analysis.optimization_suggestions[0]}")

print("\n3. 告警检查:")
# 模拟当前指标
current_metrics = {
    "max_query_duration_ms": 6000,
    "memory_usage_percent": 92,
    "disk_usage_percent": 88,
    "concurrent_queries": 45
}

alerts = monitor.check_alerts(current_metrics)

for i, alert in enumerate(alerts, 1):
    print(f"\n   告警 {i} ({alert.level.value.upper()}):")
    print(f"     指标: {alert.metric_name}")
    print(f"     当前值: {alert.current_value:.1f}")
    print(f"     阈值: {alert.threshold:.1f}")
    print(f"     消息: {alert.message}")
    print(f"     建议操作: {', '.join(alert.suggested_actions[:2])}")

print("\n4. 性能报告:")
performance_report = monitor.generate_performance_report(24)

print(f"\n   报告周期: {performance_report['report_period']}")
print(f"   生成时间: {performance_report['generated_at']}")
print(f"\n   查询统计:")
for key, value in performance_report['summary'].items():
    unit = "ms" if "duration" in key else "个"
    print(f"     {key}: {value:.0f if isinstance(value, float) else value} {unit}")

print(f"\n   资源使用:")
for key, value in list(performance_report['resource_usage'].items())[:3]:
    unit = "%" if "percent" in key else "mbps"
    print(f"     {key}: {value:.1f} {unit}")

print(f"\n   优化机会:")
for i, opportunity in enumerate(performance_report['optimization_opportunities'], 1):
    print(f"     {i}. {opportunity}")

print("\n5. 调优建议:")
tuning_recommendations = monitor.recommend_tuning_actions(performance_report)

for i, rec in enumerate(tuning_recommendations, 1):
    print(f"\n   建议 {i} ({rec['priority']}优先级):")
    print(f"     类别: {rec['category']}")
    print(f"     操作: {rec['action']}")
    print(f"     描述: {rec['description']}")
    print(f"     预期改进: {rec['expected_improvement']}")
    print(f"     实施步骤: {', '.join(rec['implementation_steps'][:2])}...")

```

## 最佳实践与总结

### 查询优化最佳实践

1. 索引策略
   - 为高频查询列创建合适的索引
   - 主键选择要考虑查询模式和数据分布
   - 避免过多索引影响写入性能
2. 查询编写规范(见下方示例)
   - 使用明确的列名而非SELECT *
   - 合理使用WHERE条件进行数据过滤
   - 优化JOIN顺序,小表在前
   - 使用LIMIT限制结果集大小
3. 分区设计
   - 基于查询模式设计分区键
   - 避免分区过多或过少
   - 定期清理历史分区
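
下面用一对改写前后的查询简单示意上述编写规范,表名、列名均为假设的示例:

```python
# 示意: 查询编写规范的前后对比, events 表结构为假设
bad_sql = """
SELECT *                                -- 读取全部列, 列式存储下代价高
FROM events
ORDER BY event_time DESC
"""

good_sql = """
SELECT user_id, event_type, event_time  -- 只选取需要的列
FROM events
WHERE date >= '2024-01-01'              -- 用WHERE条件命中分区裁剪和主键索引
  AND date <  '2024-02-01'
ORDER BY event_time DESC
LIMIT 100                               -- 限制结果集大小
"""

print(bad_sql)
print(good_sql)
```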

### 性能调优策略

1. 系统配置优化(见下方示例)
   - 合理配置内存限制
   - 调整并发查询数量
   - 优化磁盘IO设置
2. 表结构优化
   - 选择合适的表引擎
   - 优化列的数据类型
   - 合理设计表结构
3. 监控和维护
   - 建立完善的监控体系
   - 定期分析慢查询
   - 持续优化和调整
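
下面列出几个常见的查询级设置作为示意,数值只是示例,需要结合实际硬件与负载调整,并非通用推荐值:

```python
# 示意: 常见的查询级设置(数值为示例值)
query_level_settings = {
    "max_memory_usage": 8 * 1024 ** 3,                    # 单个查询的内存上限(字节)
    "max_threads": 8,                                     # 单个查询可使用的线程数
    "max_execution_time": 60,                             # 查询超时时间(秒)
    "max_bytes_before_external_group_by": 4 * 1024 ** 3,  # 超过后GROUP BY溢写到磁盘
}

# 可以拼成 SETTINGS 子句附加在查询末尾, 也可以配置在用户 profile 中
settings_clause = ", ".join(f"{name} = {value}" for name, value in query_level_settings.items())
print(f"SELECT ... FROM events SETTINGS {settings_clause}")
```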

## 总结

本章详细介绍了ClickHouse的查询优化与性能调优,包括:

- 查询执行原理: 理解ClickHouse的查询处理流程和优化机制
- 索引优化策略: 掌握各种索引类型的使用场景和优化方法
- 性能监控: 建立完善的性能监控和告警体系
- 调优实践: 学习系统级和查询级的性能优化方法

通过本章的学习,你应该能够:

- 分析和优化ClickHouse查询性能
- 设计合理的索引策略
- 建立有效的性能监控体系
- 实施系统性的性能调优

下一章将介绍ClickHouse的集群部署与管理,学习如何构建高可用的ClickHouse集群。

以下代码对应前文"查询执行原理"部分,给出查询执行计划分析与优化建议生成的示例实现。

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
import random
import time

class QueryStage(Enum):
    """查询执行阶段"""
    PARSING = "parsing"
    ANALYSIS = "analysis"
    OPTIMIZATION = "optimization"
    EXECUTION = "execution"
    RESULT_PROCESSING = "result_processing"

class OptimizationType(Enum):
    """优化类型"""
    PREDICATE_PUSHDOWN = "predicate_pushdown"
    PROJECTION_PUSHDOWN = "projection_pushdown"
    JOIN_REORDERING = "join_reordering"
    INDEX_USAGE = "index_usage"
    PARTITION_PRUNING = "partition_pruning"
    AGGREGATION_PUSHDOWN = "aggregation_pushdown"

@dataclass
class QueryExecutionPlan:
    """查询执行计划"""
    query_id: str
    sql: str
    stages: List[Dict[str, Any]]
    estimated_cost: float
    estimated_rows: int
    optimizations_applied: List[OptimizationType]

@dataclass
class QueryPerformanceMetrics:
    """查询性能指标"""
    execution_time_ms: float
    rows_read: int
    bytes_read: int
    rows_processed: int
    memory_usage_mb: float
    cpu_usage_percent: float
    io_wait_percent: float
    cache_hit_ratio: float

class ClickHouseQueryOptimizer:
    """ClickHouse查询优化器"""

    def __init__(self):
        self.optimization_rules = self._initialize_optimization_rules()
        self.performance_baselines = self._initialize_performance_baselines()

    def _initialize_optimization_rules(self) -> Dict[OptimizationType, Dict[str, Any]]:
        """初始化优化规则"""
        return {
            OptimizationType.PREDICATE_PUSHDOWN: {
                "description": "将WHERE条件下推到存储层",
                "benefits": ["减少读取数据量", "提高过滤效率", "降低网络传输"],
                "applicable_scenarios": ["有选择性过滤条件", "多表连接", "子查询"],
                "performance_gain": "50-90%",
                "implementation_complexity": "low"
            },
            OptimizationType.PROJECTION_PUSHDOWN: {
                "description": "将SELECT列下推到存储层",
                "benefits": ["减少列读取", "降低内存使用", "提高缓存效率"],
                "applicable_scenarios": ["宽表查询", "列式存储", "网络传输优化"],
                "performance_gain": "20-60%",
                "implementation_complexity": "low"
            },
            OptimizationType.JOIN_REORDERING: {
                "description": "重新排列JOIN顺序以优化执行",
                "benefits": ["减少中间结果", "优化内存使用", "提高JOIN效率"],
                "applicable_scenarios": ["多表JOIN", "不同表大小", "复杂查询"],
                "performance_gain": "30-80%",
                "implementation_complexity": "medium"
            },
            OptimizationType.INDEX_USAGE: {
                "description": "利用索引加速查询",
                "benefits": ["快速定位数据", "减少扫描范围", "提高查询速度"],
                "applicable_scenarios": ["等值查询", "范围查询", "排序查询"],
                "performance_gain": "70-95%",
                "implementation_complexity": "low"
            },
            OptimizationType.PARTITION_PRUNING: {
                "description": "根据分区键过滤不相关分区",
                "benefits": ["减少扫描分区", "提高查询速度", "降低IO开销"],
                "applicable_scenarios": ["时间范围查询", "分区表", "大数据集"],
                "performance_gain": "60-90%",
                "implementation_complexity": "low"
            },
            OptimizationType.AGGREGATION_PUSHDOWN: {
                "description": "将聚合操作下推到存储层",
                "benefits": ["减少数据传输", "提前聚合", "降低内存压力"],
                "applicable_scenarios": ["GROUP BY查询", "聚合函数", "大数据集"],
                "performance_gain": "40-70%",
                "implementation_complexity": "medium"
            }
        }

    def _initialize_performance_baselines(self) -> Dict[str, Dict[str, float]]:
        """初始化性能基线"""
        return {
            "simple_select": {
                "rows_per_second": 10_000_000,
                "bytes_per_second": 1_000_000_000,
                "memory_mb_per_million_rows": 100
            },
            "aggregation": {
                "rows_per_second": 5_000_000,
                "bytes_per_second": 500_000_000,
                "memory_mb_per_million_rows": 200
            },
            "join": {
                "rows_per_second": 1_000_000,
                "bytes_per_second": 200_000_000,
                "memory_mb_per_million_rows": 500
            },
            "window_function": {
                "rows_per_second": 2_000_000,
                "bytes_per_second": 300_000_000,
                "memory_mb_per_million_rows": 300
            }
        }

    def analyze_query_execution_plan(self, sql: str) -> QueryExecutionPlan:
        """分析查询执行计划"""
        query_id = f"query_{int(time.time() * 1000)}"

        # 模拟查询解析和分析
        stages = []

        # 解析阶段
        stages.append({
            "stage": QueryStage.PARSING,
            "description": "SQL语法解析和词法分析",
            "estimated_time_ms": 1,
            "operations": ["tokenization", "syntax_tree_building"]
        })

        # 分析阶段
        stages.append({
            "stage": QueryStage.ANALYSIS,
            "description": "语义分析和类型检查",
            "estimated_time_ms": 5,
            "operations": ["table_resolution", "column_validation", "type_checking"]
        })

        # 优化阶段
        optimizations_applied = self._identify_applicable_optimizations(sql)
        stages.append({
            "stage": QueryStage.OPTIMIZATION,
            "description": "查询优化和执行计划生成",
            "estimated_time_ms": 10,
            "operations": [opt.value for opt in optimizations_applied]
        })

        # 执行阶段
        estimated_rows = self._estimate_result_rows(sql)
        estimated_cost = self._estimate_query_cost(sql, estimated_rows)

        stages.append({
            "stage": QueryStage.EXECUTION,
            "description": "查询执行和数据处理",
            "estimated_time_ms": estimated_cost,
            "operations": ["data_reading", "filtering", "processing"]
        })

        # 结果处理阶段
        stages.append({
            "stage": QueryStage.RESULT_PROCESSING,
            "description": "结果格式化和返回",
            "estimated_time_ms": max(1, estimated_rows / 1000000),
            "operations": ["result_formatting", "network_transfer"]
        })

        return QueryExecutionPlan(
            query_id=query_id,
            sql=sql,
            stages=stages,
            estimated_cost=estimated_cost,
            estimated_rows=estimated_rows,
            optimizations_applied=optimizations_applied
        )

    def _identify_applicable_optimizations(self, sql: str) -> List[OptimizationType]:
        """识别适用的优化策略"""
        optimizations = []
        sql_lower = sql.lower()

        # 检查谓词下推
        if 'where' in sql_lower:
            optimizations.append(OptimizationType.PREDICATE_PUSHDOWN)

        # 检查投影下推
        if 'select' in sql_lower and '*' not in sql_lower:
            optimizations.append(OptimizationType.PROJECTION_PUSHDOWN)

        # 检查JOIN重排序
        if 'join' in sql_lower:
            optimizations.append(OptimizationType.JOIN_REORDERING)

        # 检查索引使用
        if any(keyword in sql_lower for keyword in ['where', 'order by', 'group by']):
            optimizations.append(OptimizationType.INDEX_USAGE)

        # 检查分区裁剪
        if any(keyword in sql_lower for keyword in ['date', 'timestamp', 'time']):
            optimizations.append(OptimizationType.PARTITION_PRUNING)

        # 检查聚合下推
        if any(keyword in sql_lower for keyword in ['group by', 'count', 'sum', 'avg']):
            optimizations.append(OptimizationType.AGGREGATION_PUSHDOWN)

        return optimizations

    def _estimate_result_rows(self, sql: str) -> int:
        """估算结果行数"""
        sql_lower = sql.lower()

        # 基础行数估算
        base_rows = 1_000_000

        # 根据查询类型调整
        if 'group by' in sql_lower:
            base_rows = int(base_rows * 0.1)  # 聚合查询通常减少行数
        elif 'join' in sql_lower:
            base_rows = int(base_rows * 1.5)  # JOIN可能增加行数
        elif 'where' in sql_lower:
            base_rows = int(base_rows * 0.3)  # WHERE条件减少行数

        return max(1, base_rows)

    def _estimate_query_cost(self, sql: str, estimated_rows: int) -> float:
        """估算查询成本(毫秒)"""
        sql_lower = sql.lower()

        # 基础成本计算
        base_cost_per_row = 0.001  # 每行0.001毫秒

        # 根据查询复杂度调整
        complexity_multiplier = 1.0

        if 'join' in sql_lower:
            complexity_multiplier *= 3.0
        if 'group by' in sql_lower:
            complexity_multiplier *= 2.0
        if 'order by' in sql_lower:
            complexity_multiplier *= 1.5
        if 'window' in sql_lower or 'over' in sql_lower:
            complexity_multiplier *= 4.0

        return estimated_rows * base_cost_per_row * complexity_multiplier

    def generate_optimization_recommendations(self, execution_plan: QueryExecutionPlan) -> List[Dict[str, Any]]:
        """生成优化建议"""
        recommendations = []

        for optimization in execution_plan.optimizations_applied:
            rule = self.optimization_rules[optimization]

            recommendations.append({
                "optimization_type": optimization,
                "description": rule["description"],
                "expected_performance_gain": rule["performance_gain"],
                "implementation_complexity": rule["implementation_complexity"],
                "benefits": rule["benefits"],
                "applicable_scenarios": rule["applicable_scenarios"]
            })

        # 添加通用优化建议
        if execution_plan.estimated_cost > 1000:  # 查询成本较高
            recommendations.append({
                "optimization_type": "general",
                "description": "考虑添加适当的索引或优化表结构",
                "expected_performance_gain": "30-70%",
                "implementation_complexity": "medium",
                "benefits": ["减少查询时间", "提高并发性能"],
                "applicable_scenarios": ["高频查询", "复杂查询"]
            })

        if execution_plan.estimated_rows > 10_000_000:  # 结果集较大
            recommendations.append({
                "optimization_type": "result_limiting",
                "description": "考虑使用LIMIT或分页查询",
                "expected_performance_gain": "50-90%",
                "implementation_complexity": "low",
                "benefits": ["减少内存使用", "提高响应速度"],
                "applicable_scenarios": ["大结果集", "交互式查询"]
            })

        return recommendations

    def simulate_query_performance(self, sql: str, table_size_rows: int = 1_000_000) -> QueryPerformanceMetrics:
        """模拟查询性能"""
        sql_lower = sql.lower()

        # 确定查询类型
        query_type = "simple_select"
        if 'join' in sql_lower:
            query_type = "join"
        elif 'group by' in sql_lower:
            query_type = "aggregation"
        elif 'window' in sql_lower or 'over' in sql_lower:
            query_type = "window_function"

        baseline = self.performance_baselines[query_type]

        # 计算性能指标
        rows_read = table_size_rows
        if 'where' in sql_lower:
            rows_read = int(rows_read * random.uniform(0.1, 0.5))  # WHERE条件减少读取行数

        bytes_read = rows_read * random.uniform(50, 200)  # 每行50-200字节

        execution_time_ms = (rows_read / baseline["rows_per_second"]) * 1000
        execution_time_ms *= random.uniform(0.8, 1.2)  # 添加随机变化

        memory_usage_mb = (rows_read / 1_000_000) * baseline["memory_mb_per_million_rows"]

        return QueryPerformanceMetrics(
            execution_time_ms=execution_time_ms,
            rows_read=rows_read,
            bytes_read=int(bytes_read),
            rows_processed=rows_read,
            memory_usage_mb=memory_usage_mb,
            cpu_usage_percent=random.uniform(20, 80),
            io_wait_percent=random.uniform(5, 30),
            cache_hit_ratio=random.uniform(0.6, 0.9)
        )

    def compare_query_performance(self, original_sql: str, optimized_sql: str,
                                  table_size_rows: int = 1_000_000) -> Dict[str, Any]:
        """比较查询性能"""
        original_metrics = self.simulate_query_performance(original_sql, table_size_rows)
        optimized_metrics = self.simulate_query_performance(optimized_sql, table_size_rows)

        # 计算改进百分比
        improvements = {
            "execution_time": ((original_metrics.execution_time_ms - optimized_metrics.execution_time_ms)
                               / original_metrics.execution_time_ms) * 100,
            "rows_read": ((original_metrics.rows_read - optimized_metrics.rows_read)
                          / original_metrics.rows_read) * 100,
            "memory_usage": ((original_metrics.memory_usage_mb - optimized_metrics.memory_usage_mb)
                             / original_metrics.memory_usage_mb) * 100
        }

        return {
            "original_metrics": original_metrics,
            "optimized_metrics": optimized_metrics,
            "improvements": improvements,
            "summary": {
                "faster_by_percent": improvements["execution_time"],
                "memory_saved_percent": improvements["memory_usage"],
                "io_reduced_percent": improvements["rows_read"]
            }
        }

# 查询优化示例
print("\n\n=== ClickHouse查询优化分析 ===")

optimizer = ClickHouseQueryOptimizer()

print("\n1. 查询执行计划分析:")
test_queries = [
    "SELECT user_id, count(*) FROM events WHERE date >= '2024-01-01' GROUP BY user_id",
    "SELECT e.*, u.name FROM events e JOIN users u ON e.user_id = u.id WHERE e.date >= today()",
    "SELECT date, sum(revenue) OVER (ORDER BY date ROWS 7 PRECEDING) FROM sales ORDER BY date"
]

for i, query in enumerate(test_queries, 1):
    execution_plan = optimizer.analyze_query_execution_plan(query)

    print(f"\n   查询 {i}:")
    print(f"     SQL: {query[:60]}...")
    print(f"     预估成本: {execution_plan.estimated_cost:.2f}ms")
    print(f"     预估行数: {execution_plan.estimated_rows:,}")
    print(f"     应用优化: {[opt.value for opt in execution_plan.optimizations_applied][:3]}")

    # 显示执行阶段
    total_time = sum(stage['estimated_time_ms'] for stage in execution_plan.stages)
    print(f"     执行阶段 (总计: {total_time:.2f}ms):")
    for stage in execution_plan.stages[:3]:  # 显示前3个阶段
        print(f"       - {stage['stage'].value}: {stage['estimated_time_ms']:.2f}ms")

print("\n2. 优化建议生成:")
for i, query in enumerate(test_queries[:2], 1):
    execution_plan = optimizer.analyze_query_execution_plan(query)
    recommendations = optimizer.generate_optimization_recommendations(execution_plan)

    print(f"\n   查询 {i} 优化建议:")
    for j, rec in enumerate(recommendations[:2], 1):
        print(f"     建议 {j}: {rec['description']}")
        print(f"       预期性能提升: {rec['expected_performance_gain']}")
        print(f"       实现复杂度: {rec['implementation_complexity']}")
        print(f"       主要收益: {', '.join(rec['benefits'][:2])}")

print("\n3. 查询性能模拟:")
performance_test_cases = [
    {"sql": "SELECT * FROM large_table WHERE id = 12345", "size": 10_000_000},
    {"sql": "SELECT category, count(*) FROM products GROUP BY category", "size": 1_000_000},
    {"sql": "SELECT p.*, c.name FROM products p JOIN categories c ON p.category_id = c.id", "size": 5_000_000}
]

for i, case in enumerate(performance_test_cases, 1):
    metrics = optimizer.simulate_query_performance(case["sql"], case["size"])

    print(f"\n   测试 {i} (表大小: {case['size']:,} 行):")
    print(f"     SQL: {case['sql'][:50]}...")
    print(f"     执行时间: {metrics.execution_time_ms:.2f}ms")
    print(f"     读取行数: {metrics.rows_read:,}")
    print(f"     内存使用: {metrics.memory_usage_mb:.2f}MB")
    print(f"     缓存命中率: {metrics.cache_hit_ratio:.2%}")

print("\n4. 查询性能对比:")
comparison_cases = [
    {
        "original": "SELECT * FROM events WHERE toYYYYMM(date) = 202401",
        "optimized": "SELECT * FROM events WHERE date >= '2024-01-01' AND date < '2024-02-01'"
    },
    {
        "original": "SELECT user_id, count(*) FROM events GROUP BY user_id ORDER BY count(*) DESC",
        "optimized": "SELECT user_id, count(*) FROM events GROUP BY user_id ORDER BY count(*) DESC LIMIT 100"
    }
]

for i, case in enumerate(comparison_cases, 1):
    comparison = optimizer.compare_query_performance(
        case["original"], case["optimized"], 5_000_000
    )

    print(f"\n   对比 {i}:")
    print(f"     原始查询: {case['original'][:40]}...")
    print(f"     优化查询: {case['optimized'][:40]}...")
    print(f"     性能提升: {comparison['summary']['faster_by_percent']:.1f}%")
    print(f"     内存节省: {comparison['summary']['memory_saved_percent']:.1f}%")
    print(f"     IO减少: {comparison['summary']['io_reduced_percent']:.1f}%")
```

## 索引优化策略

索引是ClickHouse查询优化的关键因素,合理的索引设计可以显著提升查询性能。
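
在阅读下面的分析器代码之前,可以先了解跳数索引(data skipping index)的基本 DDL 形式。下面的语句仅作参照,表名、列名和 GRANULARITY 参数都是假设的示例值:

```python
# 示意: ClickHouse 跳数索引的常见 DDL 形式(表名、列名、参数均为示例)
ddl_statements = [
    # minmax: 记录每个索引粒度块的最小/最大值, 适合范围过滤
    "ALTER TABLE events ADD INDEX idx_minmax_date date TYPE minmax GRANULARITY 4",
    # set(N): 记录块内最多N个不同取值, 适合低基数列的等值/IN过滤
    "ALTER TABLE events ADD INDEX idx_set_event_type event_type TYPE set(100) GRANULARITY 4",
    # bloom_filter: 适合高基数列的等值过滤
    "ALTER TABLE events ADD INDEX idx_bf_user_id user_id TYPE bloom_filter GRANULARITY 4",
    # 新增索引默认只作用于之后写入的数据, 历史数据需要显式物化
    "ALTER TABLE events MATERIALIZE INDEX idx_bf_user_id",
]

for ddl in ddl_statements:
    print(ddl)
```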

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List
import time

class IndexType(Enum):
    """索引类型"""
    PRIMARY_KEY = "primary_key"
    MINMAX = "minmax"
    SET = "set"
    BLOOM_FILTER = "bloom_filter"
    TOKENBF_V1 = "tokenbf_v1"
    NGRAMBF_V1 = "ngrambf_v1"

@dataclass
class IndexRecommendation:
    """索引推荐"""
    index_type: IndexType
    columns: List[str]
    description: str
    expected_improvement: str
    use_cases: List[str]
    creation_sql: str
    maintenance_cost: str

class ClickHouseIndexOptimizer:
    """ClickHouse索引优化器"""
    
    def __init__(self):
        self.index_characteristics = self._initialize_index_characteristics()
    
    def _initialize_index_characteristics(self) -> Dict[IndexType, Dict[str, Any]]:
        """初始化索引特性"""
        return {
            IndexType.PRIMARY_KEY: {
                "description": "主键索引,用于快速定位和排序",
                "best_for": ["等值查询", "范围查询", "排序查询"],
                "performance_gain": "70-95%",
                "memory_overhead": "low",
                "maintenance_cost": "low",
                "cardinality_requirement": "medium_to_high"
            },
            IndexType.MINMAX: {
                "description": "最小最大值索引,适用于范围查询",
                "best_for": ["范围查询", "数值比较", "日期范围"],
                "performance_gain": "50-80%",
                "memory_overhead": "very_low",
                "maintenance_cost": "very_low",
                "cardinality_requirement": "any"
            },
            IndexType.SET: {
                "description": "集合索引,适用于IN查询",
                "best_for": ["IN查询", "等值查询", "小集合过滤"],
                "performance_gain": "60-90%",
                "memory_overhead": "medium",
                "maintenance_cost": "medium",
                "cardinality_requirement": "low_to_medium"
            },
            IndexType.BLOOM_FILTER: {
                "description": "布隆过滤器索引,适用于等值查询",
                "best_for": ["等值查询", "存在性检查", "高基数列"],
                "performance_gain": "40-70%",
                "memory_overhead": "low",
                "maintenance_cost": "low",
                "cardinality_requirement": "high"
            },
            IndexType.TOKENBF_V1: {
                "description": "分词布隆过滤器,适用于文本搜索",
                "best_for": ["文本搜索", "关键词查询", "模糊匹配"],
                "performance_gain": "30-60%",
                "memory_overhead": "medium",
                "maintenance_cost": "medium",
                "cardinality_requirement": "high"
            },
            IndexType.NGRAMBF_V1: {
                "description": "N-gram布隆过滤器,适用于子串搜索",
                "best_for": ["子串搜索", "LIKE查询", "模式匹配"],
                "performance_gain": "25-50%",
                "memory_overhead": "high",
                "maintenance_cost": "high",
                "cardinality_requirement": "high"
            }
        }
    
    def analyze_query_patterns(self, queries: List[str]) -> Dict[str, Any]:
        """分析查询模式"""
        patterns = {
            "equality_filters": [],
            "range_filters": [],
            "in_filters": [],
            "like_filters": [],
            "order_by_columns": [],
            "group_by_columns": [],
            "join_columns": []
        }
        
        for query in queries:
            query_lower = query.lower()
            
            # 分析WHERE条件
            if 'where' in query_lower:
                # 等值过滤
                if ' = ' in query_lower:
                    patterns["equality_filters"].extend(self._extract_equality_columns(query))
                
                # 范围过滤
                if any(op in query_lower for op in ['>', '<', 'between', '>=', '<=']):
                    patterns["range_filters"].extend(self._extract_range_columns(query))
                
                # IN过滤
                if ' in ' in query_lower:
                    patterns["in_filters"].extend(self._extract_in_columns(query))
                
                # LIKE过滤
                if ' like ' in query_lower:
                    patterns["like_filters"].extend(self._extract_like_columns(query))
            
            # 分析ORDER BY
            if 'order by' in query_lower:
                patterns["order_by_columns"].extend(self._extract_order_by_columns(query))
            
            # 分析GROUP BY
            if 'group by' in query_lower:
                patterns["group_by_columns"].extend(self._extract_group_by_columns(query))
            
            # 分析JOIN
            if 'join' in query_lower:
                patterns["join_columns"].extend(self._extract_join_columns(query))
        
        # 统计频率
        for pattern_type in patterns:
            column_counts = {}
            for column in patterns[pattern_type]:
                column_counts[column] = column_counts.get(column, 0) + 1
            patterns[pattern_type] = sorted(column_counts.items(), key=lambda x: x[1], reverse=True)
        
        return patterns
    
    def _extract_equality_columns(self, query: str) -> List[str]:
        """提取等值查询列"""
        # 简化实现,实际应该使用SQL解析器
        import re
        pattern = r'(\w+)\s*=\s*'
        matches = re.findall(pattern, query, re.IGNORECASE)
        return [match for match in matches if match.lower() not in ['select', 'from', 'where', 'and', 'or']]
    
    def _extract_range_columns(self, query: str) -> List[str]:
        """提取范围查询列"""
        import re
        pattern = r'(\w+)\s*[<>=]+\s*'
        matches = re.findall(pattern, query, re.IGNORECASE)
        return [match for match in matches if match.lower() not in ['select', 'from', 'where', 'and', 'or']]
    
    def _extract_in_columns(self, query: str) -> List[str]:
        """提取IN查询列"""
        import re
        pattern = r'(\w+)\s+in\s*\('
        matches = re.findall(pattern, query, re.IGNORECASE)
        return matches
    
    def _extract_like_columns(self, query: str) -> List[str]:
        """提取LIKE查询列"""
        import re
        pattern = r'(\w+)\s+like\s+'
        matches = re.findall(pattern, query, re.IGNORECASE)
        return matches
    
    def _extract_order_by_columns(self, query: str) -> List[str]:
        """提取ORDER BY列"""
        import re
        pattern = r'order\s+by\s+([\w,\s]+)'
        match = re.search(pattern, query, re.IGNORECASE)
        if match:
            columns = match.group(1).split(',')
            return [col.strip().split()[0] for col in columns]
        return []
    
    def _extract_group_by_columns(self, query: str) -> List[str]:
        """提取GROUP BY列"""
        import re
        pattern = r'group\s+by\s+([\w,\s]+)'
        match = re.search(pattern, query, re.IGNORECASE)
        if match:
            columns = match.group(1).split(',')
            return [col.strip() for col in columns]
        return []
    
    def _extract_join_columns(self, query: str) -> List[str]:
        """提取JOIN列"""
        import re
        pattern = r'on\s+(\w+\.\w+)\s*=\s*(\w+\.\w+)'
        matches = re.findall(pattern, query, re.IGNORECASE)
        columns = []
        for match in matches:
            columns.extend([col.split('.')[1] for col in match])
        return columns
    
    def recommend_indexes(self, query_patterns: Dict[str, Any], 
                         table_info: Dict[str, Any]) -> List[IndexRecommendation]:
        """推荐索引"""
        recommendations = []
        
        # 主键索引推荐
        if query_patterns["equality_filters"] or query_patterns["range_filters"] or query_patterns["order_by_columns"]:
            # 选择最频繁使用的列作为主键候选
            all_columns = (query_patterns["equality_filters"] + 
                          query_patterns["range_filters"] + 
                          query_patterns["order_by_columns"])
            
            if all_columns:
                primary_key_candidate = all_columns[0][0]  # 最频繁的列
                recommendations.append(IndexRecommendation(
                    index_type=IndexType.PRIMARY_KEY,
                    columns=[primary_key_candidate],
                    description=f"将{primary_key_candidate}设置为主键以优化查询性能",
                    expected_improvement="70-95%",
                    use_cases=["等值查询", "范围查询", "排序查询"],
                    creation_sql=f"ORDER BY {primary_key_candidate}",
                    maintenance_cost="低"
                ))
        
        # MinMax索引推荐
        for column, frequency in query_patterns["range_filters"][:3]:  # 前3个最频繁的范围查询列
            recommendations.append(IndexRecommendation(
                index_type=IndexType.MINMAX,
                columns=[column],
                description=f"为{column}创建MinMax索引以优化范围查询",
                expected_improvement="50-80%",
                use_cases=["范围查询", "数值比较"],
                creation_sql=f"INDEX idx_minmax_{column} {column} TYPE minmax GRANULARITY 1",
                maintenance_cost="极低"
            ))
        
        # Set索引推荐
        for column, frequency in query_patterns["in_filters"][:2]:  # 前2个最频繁的IN查询列
            recommendations.append(IndexRecommendation(
                index_type=IndexType.SET,
                columns=[column],
                description=f"为{column}创建Set索引以优化IN查询",
                expected_improvement="60-90%",
                use_cases=["IN查询", "等值查询"],
                creation_sql=f"INDEX idx_set_{column} {column} TYPE set(100) GRANULARITY 1",
                maintenance_cost="中等"
            ))
        
        # Bloom Filter索引推荐
        for column, frequency in query_patterns["equality_filters"][:2]:  # 前2个最频繁的等值查询列
            if frequency >= 5:  # 只为高频查询列推荐
                recommendations.append(IndexRecommendation(
                    index_type=IndexType.BLOOM_FILTER,
                    columns=[column],
                    description=f"为{column}创建Bloom Filter索引以优化等值查询",
                    expected_improvement="40-70%",
                    use_cases=["等值查询", "存在性检查"],
                    creation_sql=f"INDEX idx_bloom_{column} {column} TYPE bloom_filter GRANULARITY 1",
                    maintenance_cost="低"
                ))
        
        # 文本搜索索引推荐
        for column, frequency in query_patterns["like_filters"]:
            if frequency >= 3:  # 为频繁的LIKE查询推荐
                recommendations.append(IndexRecommendation(
                    index_type=IndexType.TOKENBF_V1,
                    columns=[column],
                    description=f"为{column}创建Token Bloom Filter索引以优化文本搜索",
                    expected_improvement="30-60%",
                    use_cases=["文本搜索", "关键词查询"],
                    creation_sql=f"INDEX idx_token_{column} {column} TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1",
                    maintenance_cost="中等"
                ))
        
        return recommendations
    
    def generate_index_creation_script(self, table_name: str, 
                                     recommendations: List[IndexRecommendation]) -> str:
        """生成索引创建脚本"""
        script_parts = []
        script_parts.append(f"-- 索引优化脚本 for {table_name}")
        script_parts.append(f"-- 生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
        script_parts.append("")
        
        for i, rec in enumerate(recommendations, 1):
            script_parts.append(f"-- 索引 {i}: {rec.description}")
            script_parts.append(f"-- 预期性能提升: {rec.expected_improvement}")
            script_parts.append(f"-- 维护成本: {rec.maintenance_cost}")
            
            if rec.index_type == IndexType.PRIMARY_KEY:
                script_parts.append(f"-- 注意: 主键需要在CREATE TABLE时指定")
                script_parts.append(f"-- {rec.creation_sql}")
            else:
                script_parts.append(f"ALTER TABLE {table_name} ADD {rec.creation_sql};")
            
            script_parts.append("")
        
        return "\n".join(script_parts)
    
    def analyze_index_effectiveness(self, index_recommendations: List[IndexRecommendation],
                                  query_patterns: Dict[str, Any]) -> Dict[str, Any]:
        """分析索引有效性"""
        effectiveness_analysis = {
            "high_impact_indexes": [],
            "medium_impact_indexes": [],
            "low_impact_indexes": [],
            "total_expected_improvement": 0,
            "implementation_priority": []
        }
        
        for rec in index_recommendations:
            # 计算影响分数
            impact_score = 0
            
            # 基于索引类型的基础分数
            type_scores = {
                IndexType.PRIMARY_KEY: 90,
                IndexType.MINMAX: 70,
                IndexType.SET: 75,
                IndexType.BLOOM_FILTER: 60,
                IndexType.TOKENBF_V1: 50,
                IndexType.NGRAMBF_V1: 40
            }
            impact_score += type_scores.get(rec.index_type, 50)
            
            # 基于查询频率的调整
            column = rec.columns[0] if rec.columns else ""
            for pattern_type, columns in query_patterns.items():
                for col, frequency in columns:
                    if col == column:
                        impact_score += min(frequency * 5, 30)  # 最多加30分
                        break
            
            # 分类索引
            rec_with_score = {"recommendation": rec, "impact_score": impact_score}
            
            if impact_score >= 80:
                effectiveness_analysis["high_impact_indexes"].append(rec_with_score)
            elif impact_score >= 60:
                effectiveness_analysis["medium_impact_indexes"].append(rec_with_score)
            else:
                effectiveness_analysis["low_impact_indexes"].append(rec_with_score)
        
        # 计算总体预期改进
        total_improvement = 0
        for category in ["high_impact_indexes", "medium_impact_indexes", "low_impact_indexes"]:
            for item in effectiveness_analysis[category]:
                # 从预期改进字符串中提取数值
                improvement_str = item["recommendation"].expected_improvement
                if "-" in improvement_str:
                    avg_improvement = sum(map(int, improvement_str.replace("%", "").split("-"))) / 2
                else:
                    avg_improvement = int(improvement_str.replace("%", ""))
                total_improvement += avg_improvement * 0.1  # 权重调整
        
        effectiveness_analysis["total_expected_improvement"] = min(total_improvement, 90)  # 最大90%
        
        # 实现优先级
        all_indexes = (effectiveness_analysis["high_impact_indexes"] + 
                      effectiveness_analysis["medium_impact_indexes"] + 
                      effectiveness_analysis["low_impact_indexes"])
        
        effectiveness_analysis["implementation_priority"] = sorted(
            all_indexes, key=lambda x: x["impact_score"], reverse=True
        )
        
        return effectiveness_analysis

# 索引优化示例
print("\n\n=== ClickHouse索引优化分析 ===")

index_optimizer = ClickHouseIndexOptimizer()

print("\n1. 查询模式分析:")
sample_queries = [
    "SELECT * FROM events WHERE user_id = 12345",
    "SELECT * FROM events WHERE user_id = 67890",
    "SELECT * FROM events WHERE date >= '2024-01-01' AND date <= '2024-01-31'",
    "SELECT * FROM events WHERE event_type IN ('click', 'view', 'purchase')",
    "SELECT * FROM events WHERE message LIKE '%error%'",
    "SELECT user_id, count(*) FROM events GROUP BY user_id ORDER BY count(*) DESC",
    "SELECT e.*, u.name FROM events e JOIN users u ON e.user_id = u.id"
]

query_patterns = index_optimizer.analyze_query_patterns(sample_queries)

for pattern_type, columns in query_patterns.items():
    if columns:
        print(f"\n   {pattern_type.replace('_', ' ').title()}:")
        for column, frequency in columns[:3]:  # 显示前3个最频繁的
            print(f"     {column}: {frequency} 次")

print("\n2. 索引推荐:")
table_info = {"name": "events", "size_gb": 100, "daily_queries": 10000}
index_recommendations = index_optimizer.recommend_indexes(query_patterns, table_info)

for i, rec in enumerate(index_recommendations, 1):
    print(f"\n   推荐 {i}: {rec.index_type.value}")
    print(f"     列: {', '.join(rec.columns)}")
    print(f"     描述: {rec.description}")
    print(f"     预期改进: {rec.expected_improvement}")
    print(f"     维护成本: {rec.maintenance_cost}")
    print(f"     适用场景: {', '.join(rec.use_cases[:2])}")

print("\n3. 索引创建脚本:")
creation_script = index_optimizer.generate_index_creation_script("events", index_recommendations[:3])
print(creation_script[:500] + "..." if len(creation_script) > 500 else creation_script)

print("\n4. 索引有效性分析:")
effectiveness = index_optimizer.analyze_index_effectiveness(index_recommendations, query_patterns)

print(f"\n   总体预期性能提升: {effectiveness['total_expected_improvement']:.1f}%")
print(f"\n   高影响索引 ({len(effectiveness['high_impact_indexes'])} 个):")
for item in effectiveness['high_impact_indexes'][:2]:
    rec = item['recommendation']
    print(f"     - {rec.index_type.value} on {', '.join(rec.columns)} (影响分数: {item['impact_score']})")

print(f"\n   实现优先级 (前3个):")
for i, item in enumerate(effectiveness['implementation_priority'][:3], 1):
    rec = item['recommendation']
    print(f"     {i}. {rec.index_type.value} on {', '.join(rec.columns)} (分数: {item['impact_score']})")