## 概述
查询优化是ClickHouse性能调优的核心内容。本章将深入介绍ClickHouse的查询执行原理、性能分析方法、优化策略和最佳实践,帮助你构建高性能的分析查询。
## 性能监控与调优
性能监控是持续优化ClickHouse性能的基础,通过系统监控和查询分析可以及时发现和解决性能问题。
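在构建完整的监控器之前,也可以直接查询 ClickHouse 内置的 `system.query_log` 系统表来快速定位慢查询。下面是一个最小化的巡检示意(假设使用 clickhouse-connect 驱动,主机地址、端口与 1000ms 阈值均为示例值):

```python
# 慢查询巡检示意: 从 system.query_log 中找出最近 1 小时内耗时超过 1 秒的查询
# 假设环境中已安装 clickhouse-connect(pip install clickhouse-connect)
import clickhouse_connect

client = clickhouse_connect.get_client(host="localhost", port=8123)

slow_query_sql = """
SELECT
    query_id,
    query_duration_ms,
    read_rows,
    read_bytes,
    memory_usage,
    substring(query, 1, 120) AS query_preview
FROM system.query_log
WHERE type = 'QueryFinish'
  AND event_time >= now() - INTERVAL 1 HOUR
  AND query_duration_ms > 1000
ORDER BY query_duration_ms DESC
LIMIT 20
"""

for row in client.query(slow_query_sql).result_rows:
    print(row)
```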
```python
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Any
import time
import random

class MetricType(Enum):
"""监控指标类型"""
QUERY_PERFORMANCE = "query_performance"
SYSTEM_RESOURCE = "system_resource"
TABLE_STATISTICS = "table_statistics"
CLUSTER_HEALTH = "cluster_health"
class AlertLevel(Enum):
"""告警级别"""
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
EMERGENCY = "emergency"
@dataclass
class PerformanceMetric:
"""性能指标"""
metric_name: str
metric_type: MetricType
current_value: float
threshold_warning: float
threshold_critical: float
unit: str
description: str
timestamp: float
@dataclass
class QueryAnalysisResult:
"""查询分析结果"""
query_id: str
sql_hash: str
execution_time_ms: float
rows_read: int
bytes_read: int
memory_usage_mb: float
cpu_time_ms: float
optimization_suggestions: List[str]
performance_grade: str # A, B, C, D, F
@dataclass
class SystemAlert:
"""系统告警"""
alert_id: str
level: AlertLevel
metric_name: str
current_value: float
threshold: float
message: str
timestamp: float
suggested_actions: List[str]
class ClickHousePerformanceMonitor:
"""ClickHouse性能监控器"""
def __init__(self):
self.metrics_config = self._initialize_metrics_config()
self.performance_baselines = self._initialize_performance_baselines()
self.alert_rules = self._initialize_alert_rules()
def _initialize_metrics_config(self) -> Dict[str, PerformanceMetric]:
"""初始化监控指标配置"""
return {
"query_duration_p95": PerformanceMetric(
metric_name="query_duration_p95",
metric_type=MetricType.QUERY_PERFORMANCE,
current_value=0.0,
threshold_warning=1000.0, # 1秒
threshold_critical=5000.0, # 5秒
unit="ms",
description="查询执行时间95分位数",
timestamp=time.time()
),
"memory_usage_percent": PerformanceMetric(
metric_name="memory_usage_percent",
metric_type=MetricType.SYSTEM_RESOURCE,
current_value=0.0,
threshold_warning=80.0,
threshold_critical=95.0,
unit="%",
description="内存使用率",
timestamp=time.time()
),
"disk_usage_percent": PerformanceMetric(
metric_name="disk_usage_percent",
metric_type=MetricType.SYSTEM_RESOURCE,
current_value=0.0,
threshold_warning=85.0,
threshold_critical=95.0,
unit="%",
description="磁盘使用率",
timestamp=time.time()
),
"concurrent_queries": PerformanceMetric(
metric_name="concurrent_queries",
metric_type=MetricType.QUERY_PERFORMANCE,
current_value=0.0,
threshold_warning=100.0,
threshold_critical=200.0,
unit="count",
description="并发查询数量",
timestamp=time.time()
),
"merge_operations_per_second": PerformanceMetric(
metric_name="merge_operations_per_second",
metric_type=MetricType.TABLE_STATISTICS,
current_value=0.0,
threshold_warning=10.0,
threshold_critical=20.0,
unit="ops/sec",
description="合并操作频率",
timestamp=time.time()
),
"replication_lag_seconds": PerformanceMetric(
metric_name="replication_lag_seconds",
metric_type=MetricType.CLUSTER_HEALTH,
current_value=0.0,
threshold_warning=60.0,
threshold_critical=300.0,
unit="seconds",
description="复制延迟",
timestamp=time.time()
)
}
def _initialize_performance_baselines(self) -> Dict[str, Dict[str, float]]:
"""初始化性能基线"""
return {
"query_performance": {
"simple_select_ms_per_million_rows": 100,
"aggregation_ms_per_million_rows": 500,
"join_ms_per_million_rows": 2000,
"max_acceptable_memory_mb": 8192
},
"system_resources": {
"optimal_cpu_usage_percent": 70,
"optimal_memory_usage_percent": 75,
"optimal_disk_io_mbps": 500,
"max_concurrent_queries": 50
},
"table_operations": {
"max_parts_per_partition": 100,
"optimal_merge_frequency_per_hour": 10,
"max_mutation_queue_size": 10
}
}
def _initialize_alert_rules(self) -> Dict[str, Dict[str, Any]]:
"""初始化告警规则"""
return {
"slow_query": {
"condition": "query_duration > 5000",
"level": AlertLevel.WARNING,
"message": "检测到慢查询",
"actions": ["分析查询执行计划", "检查索引使用情况", "考虑查询优化"]
},
"high_memory_usage": {
"condition": "memory_usage_percent > 90",
"level": AlertLevel.CRITICAL,
"message": "内存使用率过高",
"actions": ["检查大查询", "增加内存限制", "优化查询"]
},
"disk_space_low": {
"condition": "disk_usage_percent > 90",
"level": AlertLevel.CRITICAL,
"message": "磁盘空间不足",
"actions": ["清理旧数据", "扩展存储", "检查数据压缩"]
},
"replication_lag": {
"condition": "replication_lag_seconds > 300",
"level": AlertLevel.WARNING,
"message": "复制延迟过高",
"actions": ["检查网络连接", "检查副本状态", "重启复制"]
}
}
def collect_system_metrics(self) -> Dict[str, float]:
"""收集系统指标"""
# 模拟系统指标收集
metrics = {
"cpu_usage_percent": random.uniform(20, 85),
"memory_usage_percent": random.uniform(40, 90),
"disk_usage_percent": random.uniform(30, 95),
"disk_io_mbps": random.uniform(50, 800),
"network_io_mbps": random.uniform(10, 200),
"concurrent_queries": random.randint(5, 150),
"queries_per_second": random.uniform(10, 500)
}
return metrics
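    # 说明: 以上为随机模拟数据。生产环境中,这些指标通常来自 ClickHouse 内置系统表,例如:
    #   - CPU/内存等主机指标: system.asynchronous_metrics、system.metrics
    #   - 并发查询数:        system.processes
    #   - QPS 与查询耗时:    system.query_log(按 event_time 聚合)
    #   - 磁盘使用率:        system.disks(free_space / total_space)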
def analyze_query_performance(self, query_log: List[Dict[str, Any]]) -> List[QueryAnalysisResult]:
"""分析查询性能"""
analysis_results = []
for log_entry in query_log:
# 计算性能评分
performance_score = self._calculate_performance_score(log_entry)
# 生成优化建议
suggestions = self._generate_optimization_suggestions(log_entry)
# 确定性能等级
grade = self._determine_performance_grade(performance_score)
result = QueryAnalysisResult(
query_id=log_entry.get("query_id", f"query_{int(time.time())}"),
sql_hash=log_entry.get("sql_hash", "unknown"),
execution_time_ms=log_entry.get("execution_time_ms", 0),
rows_read=log_entry.get("rows_read", 0),
bytes_read=log_entry.get("bytes_read", 0),
memory_usage_mb=log_entry.get("memory_usage_mb", 0),
cpu_time_ms=log_entry.get("cpu_time_ms", 0),
optimization_suggestions=suggestions,
performance_grade=grade
)
analysis_results.append(result)
return analysis_results
def _calculate_performance_score(self, log_entry: Dict[str, Any]) -> float:
"""计算性能评分"""
score = 100.0
# 执行时间评分
execution_time = log_entry.get("execution_time_ms", 0)
if execution_time > 5000:
score -= 30
elif execution_time > 1000:
score -= 15
elif execution_time > 500:
score -= 5
# 内存使用评分
memory_usage = log_entry.get("memory_usage_mb", 0)
if memory_usage > 4096:
score -= 25
elif memory_usage > 1024:
score -= 10
elif memory_usage > 512:
score -= 5
# 数据读取效率评分
rows_read = log_entry.get("rows_read", 0)
bytes_read = log_entry.get("bytes_read", 0)
if rows_read > 0 and bytes_read > 0:
bytes_per_row = bytes_read / rows_read
if bytes_per_row > 1000: # 每行超过1KB
score -= 15
elif bytes_per_row > 500:
score -= 8
# CPU效率评分
cpu_time = log_entry.get("cpu_time_ms", 0)
if cpu_time > execution_time * 2: # CPU时间超过执行时间2倍
score -= 20
elif cpu_time > execution_time * 1.5:
score -= 10
return max(0, score)
def _generate_optimization_suggestions(self, log_entry: Dict[str, Any]) -> List[str]:
"""生成优化建议"""
suggestions = []
execution_time = log_entry.get("execution_time_ms", 0)
memory_usage = log_entry.get("memory_usage_mb", 0)
rows_read = log_entry.get("rows_read", 0)
if execution_time > 5000:
suggestions.append("查询执行时间过长,建议检查索引使用情况")
suggestions.append("考虑添加WHERE条件减少数据扫描范围")
if memory_usage > 4096:
suggestions.append("内存使用过高,建议优化JOIN操作或使用LIMIT")
suggestions.append("考虑分批处理大数据集")
if rows_read > 10_000_000:
suggestions.append("扫描行数过多,建议添加分区裁剪条件")
suggestions.append("检查是否可以使用物化视图")
if not suggestions:
suggestions.append("查询性能良好,无需特殊优化")
return suggestions
def _determine_performance_grade(self, score: float) -> str:
"""确定性能等级"""
if score >= 90:
return "A"
elif score >= 80:
return "B"
elif score >= 70:
return "C"
elif score >= 60:
return "D"
else:
return "F"
def check_alerts(self, current_metrics: Dict[str, float]) -> List[SystemAlert]:
"""检查告警"""
alerts = []
# 检查慢查询
if current_metrics.get("max_query_duration_ms", 0) > 5000:
alerts.append(SystemAlert(
alert_id=f"slow_query_{int(time.time())}",
level=AlertLevel.WARNING,
metric_name="query_duration",
current_value=current_metrics["max_query_duration_ms"],
threshold=5000,
message="检测到慢查询",
timestamp=time.time(),
suggested_actions=["分析查询执行计划", "检查索引使用情况"]
))
# 检查内存使用
memory_usage = current_metrics.get("memory_usage_percent", 0)
if memory_usage > 90:
alerts.append(SystemAlert(
alert_id=f"high_memory_{int(time.time())}",
level=AlertLevel.CRITICAL,
metric_name="memory_usage_percent",
current_value=memory_usage,
threshold=90,
message="内存使用率过高",
timestamp=time.time(),
suggested_actions=["检查大查询", "增加内存限制"]
))
# 检查磁盘使用
disk_usage = current_metrics.get("disk_usage_percent", 0)
if disk_usage > 90:
alerts.append(SystemAlert(
alert_id=f"disk_full_{int(time.time())}",
level=AlertLevel.CRITICAL,
metric_name="disk_usage_percent",
current_value=disk_usage,
threshold=90,
message="磁盘空间不足",
timestamp=time.time(),
suggested_actions=["清理旧数据", "扩展存储"]
))
return alerts
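    # 设计说明: 这里为演示硬编码了 5000ms / 90% 等阈值;
    # 实际实现中可改为读取 self.metrics_config 中各指标的
    # threshold_warning / threshold_critical,避免告警规则与指标配置不一致。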
def generate_performance_report(self, time_range_hours: int = 24) -> Dict[str, Any]:
"""生成性能报告"""
# 模拟性能数据收集
report_data = {
"report_period": f"最近{time_range_hours}小时",
"generated_at": time.strftime("%Y-%m-%d %H:%M:%S"),
"summary": {
"total_queries": random.randint(10000, 50000),
"avg_query_duration_ms": random.uniform(100, 800),
"p95_query_duration_ms": random.uniform(500, 2000),
"p99_query_duration_ms": random.uniform(1000, 5000),
"slow_queries_count": random.randint(10, 100),
"failed_queries_count": random.randint(0, 20)
},
"resource_usage": {
"avg_cpu_percent": random.uniform(40, 80),
"peak_cpu_percent": random.uniform(80, 95),
"avg_memory_percent": random.uniform(50, 85),
"peak_memory_percent": random.uniform(85, 95),
"avg_disk_io_mbps": random.uniform(100, 400),
"peak_disk_io_mbps": random.uniform(400, 800)
},
"top_slow_queries": [
{
"sql_hash": "abc123",
"avg_duration_ms": 3500,
"execution_count": 25,
"total_cpu_time_ms": 87500
},
{
"sql_hash": "def456",
"avg_duration_ms": 2800,
"execution_count": 15,
"total_cpu_time_ms": 42000
}
],
"optimization_opportunities": [
"为user_id列添加索引可优化30%的查询",
"优化JOIN顺序可减少15%的内存使用",
"使用分区裁剪可提升25%的查询速度"
],
"alerts_summary": {
"critical_alerts": random.randint(0, 5),
"warning_alerts": random.randint(5, 20),
"info_alerts": random.randint(10, 50)
}
}
return report_data
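    # 说明: 以上报告数据为随机模拟。基于真实数据时,P95/P99 可直接从
    # system.query_log 计算,例如(示例 SQL):
    #   SELECT quantile(0.95)(query_duration_ms) AS p95_ms,
    #          quantile(0.99)(query_duration_ms) AS p99_ms
    #   FROM system.query_log
    #   WHERE type = 'QueryFinish' AND event_time >= now() - INTERVAL 24 HOUR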
def recommend_tuning_actions(self, performance_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""推荐调优操作"""
recommendations = []
summary = performance_data.get("summary", {})
resource_usage = performance_data.get("resource_usage", {})
# 基于查询性能的推荐
if summary.get("p95_query_duration_ms", 0) > 1000:
recommendations.append({
"category": "查询优化",
"priority": "高",
"action": "优化慢查询",
"description": "95分位查询时间过长,需要优化索引和查询结构",
"expected_improvement": "30-50%性能提升",
"implementation_steps": [
"分析慢查询日志",
"检查索引使用情况",
"优化查询结构",
"添加必要索引"
]
})
# 基于资源使用的推荐
if resource_usage.get("peak_memory_percent", 0) > 90:
recommendations.append({
"category": "内存优化",
"priority": "高",
"action": "优化内存使用",
"description": "内存使用率过高,可能影响系统稳定性",
"expected_improvement": "减少20-30%内存使用",
"implementation_steps": [
"调整max_memory_usage设置",
"优化大查询",
"使用流式处理",
"增加物理内存"
]
})
if resource_usage.get("peak_cpu_percent", 0) > 85:
recommendations.append({
"category": "CPU优化",
"priority": "中",
"action": "优化CPU使用",
"description": "CPU使用率过高,可能需要优化查询或扩展资源",
"expected_improvement": "减少15-25%CPU使用",
"implementation_steps": [
"优化复杂查询",
"调整并发设置",
"使用查询缓存",
"考虑水平扩展"
]
})
# 基于慢查询数量的推荐
if summary.get("slow_queries_count", 0) > 50:
recommendations.append({
"category": "索引优化",
"priority": "中",
"action": "添加索引",
"description": "慢查询数量较多,可能缺少必要的索引",
"expected_improvement": "40-60%查询速度提升",
"implementation_steps": [
"分析查询模式",
"识别高频过滤列",
"创建合适索引",
"监控索引效果"
]
})
return recommendations
# 性能监控示例
print("\n\n=== ClickHouse性能监控分析 ===")
monitor = ClickHousePerformanceMonitor()
print("\n1. 系统指标收集:")
system_metrics = monitor.collect_system_metrics()
for metric_name, value in list(system_metrics.items())[:5]:
unit = "%" if "percent" in metric_name else ("mbps" if "mbps" in metric_name else "count")
print(f" {metric_name}: {value:.1f} {unit}")
print("\n2. 查询性能分析:")
# 模拟查询日志
sample_query_log = [
{
"query_id": "query_001",
"sql_hash": "abc123",
"execution_time_ms": 2500,
"rows_read": 5_000_000,
"bytes_read": 500_000_000,
"memory_usage_mb": 1024,
"cpu_time_ms": 3000
},
{
"query_id": "query_002",
"sql_hash": "def456",
"execution_time_ms": 150,
"rows_read": 100_000,
"bytes_read": 10_000_000,
"memory_usage_mb": 64,
"cpu_time_ms": 120
},
{
"query_id": "query_003",
"sql_hash": "ghi789",
"execution_time_ms": 8000,
"rows_read": 20_000_000,
"bytes_read": 2_000_000_000,
"memory_usage_mb": 4096,
"cpu_time_ms": 12000
}
]
query_analysis = monitor.analyze_query_performance(sample_query_log)
for i, analysis in enumerate(query_analysis, 1):
print(f"\n 查询 {i} (等级: {analysis.performance_grade}):")
print(f" 执行时间: {analysis.execution_time_ms:.0f}ms")
print(f" 内存使用: {analysis.memory_usage_mb:.0f}MB")
print(f" 读取行数: {analysis.rows_read:,}")
print(f" 优化建议: {analysis.optimization_suggestions[0]}")
print("\n3. 告警检查:")
# 模拟当前指标
current_metrics = {
"max_query_duration_ms": 6000,
"memory_usage_percent": 92,
"disk_usage_percent": 88,
"concurrent_queries": 45
}
alerts = monitor.check_alerts(current_metrics)
for i, alert in enumerate(alerts, 1):
print(f"\n 告警 {i} ({alert.level.value.upper()}):")
print(f" 指标: {alert.metric_name}")
print(f" 当前值: {alert.current_value:.1f}")
print(f" 阈值: {alert.threshold:.1f}")
print(f" 消息: {alert.message}")
print(f" 建议操作: {', '.join(alert.suggested_actions[:2])}")
print("\n4. 性能报告:")
performance_report = monitor.generate_performance_report(24)
print(f"\n 报告周期: {performance_report['report_period']}")
print(f" 生成时间: {performance_report['generated_at']}")
print(f"\n 查询统计:")
for key, value in performance_report['summary'].items():
unit = "ms" if "duration" in key else "个"
print(f" {key}: {value:.0f if isinstance(value, float) else value} {unit}")
print(f"\n 资源使用:")
for key, value in list(performance_report['resource_usage'].items())[:3]:
unit = "%" if "percent" in key else "mbps"
print(f" {key}: {value:.1f} {unit}")
print(f"\n 优化机会:")
for i, opportunity in enumerate(performance_report['optimization_opportunities'], 1):
print(f" {i}. {opportunity}")
print("\n5. 调优建议:")
tuning_recommendations = monitor.recommend_tuning_actions(performance_report)
for i, rec in enumerate(tuning_recommendations, 1):
print(f"\n 建议 {i} ({rec['priority']}优先级):")
print(f" 类别: {rec['category']}")
print(f" 操作: {rec['action']}")
print(f" 描述: {rec['description']}")
print(f" 预期改进: {rec['expected_improvement']}")
print(f" 实施步骤: {', '.join(rec['implementation_steps'][:2])}...")
```

## 查询执行原理

### 查询处理流程

ClickHouse的查询处理包含多个阶段,理解这些阶段有助于进行针对性优化。

```python
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Any, Optional
import time
import random

class QueryStage(Enum):
    """查询执行阶段"""
    PARSING = "parsing"
    ANALYSIS = "analysis"
    OPTIMIZATION = "optimization"
    EXECUTION = "execution"
    RESULT_PROCESSING = "result_processing"

class OptimizationType(Enum):
    """优化类型"""
    PREDICATE_PUSHDOWN = "predicate_pushdown"
    PROJECTION_PUSHDOWN = "projection_pushdown"
    JOIN_REORDERING = "join_reordering"
    INDEX_USAGE = "index_usage"
    PARTITION_PRUNING = "partition_pruning"
    AGGREGATION_PUSHDOWN = "aggregation_pushdown"

@dataclass
class QueryExecutionPlan:
    """查询执行计划"""
    query_id: str
    sql: str
    stages: List[Dict[str, Any]]
    estimated_cost: float
    estimated_rows: int
    optimizations_applied: List[OptimizationType]

@dataclass
class QueryPerformanceMetrics:
    """查询性能指标"""
    execution_time_ms: float
    rows_read: int
    bytes_read: int
    rows_processed: int
    memory_usage_mb: float
    cpu_usage_percent: float
    io_wait_percent: float
    cache_hit_ratio: float

class ClickHouseQueryOptimizer:
    """ClickHouse查询优化器"""
def __init__(self):
self.optimization_rules = self._initialize_optimization_rules()
self.performance_baselines = self._initialize_performance_baselines()
def _initialize_optimization_rules(self) -> Dict[OptimizationType, Dict[str, Any]]:
"""初始化优化规则"""
return {
OptimizationType.PREDICATE_PUSHDOWN: {
"description": "将WHERE条件下推到存储层",
"benefits": ["减少读取数据量", "提高过滤效率", "降低网络传输"],
"applicable_scenarios": ["有选择性过滤条件", "多表连接", "子查询"],
"performance_gain": "50-90%",
"implementation_complexity": "low"
},
OptimizationType.PROJECTION_PUSHDOWN: {
"description": "将SELECT列下推到存储层",
"benefits": ["减少列读取", "降低内存使用", "提高缓存效率"],
"applicable_scenarios": ["宽表查询", "列式存储", "网络传输优化"],
"performance_gain": "20-60%",
"implementation_complexity": "low"
},
OptimizationType.JOIN_REORDERING: {
"description": "重新排列JOIN顺序以优化执行",
"benefits": ["减少中间结果", "优化内存使用", "提高JOIN效率"],
"applicable_scenarios": ["多表JOIN", "不同表大小", "复杂查询"],
"performance_gain": "30-80%",
"implementation_complexity": "medium"
},
OptimizationType.INDEX_USAGE: {
"description": "利用索引加速查询",
"benefits": ["快速定位数据", "减少扫描范围", "提高查询速度"],
"applicable_scenarios": ["等值查询", "范围查询", "排序查询"],
"performance_gain": "70-95%",
"implementation_complexity": "low"
},
OptimizationType.PARTITION_PRUNING: {
"description": "根据分区键过滤不相关分区",
"benefits": ["减少扫描分区", "提高查询速度", "降低IO开销"],
"applicable_scenarios": ["时间范围查询", "分区表", "大数据集"],
"performance_gain": "60-90%",
"implementation_complexity": "low"
},
OptimizationType.AGGREGATION_PUSHDOWN: {
"description": "将聚合操作下推到存储层",
"benefits": ["减少数据传输", "提前聚合", "降低内存压力"],
"applicable_scenarios": ["GROUP BY查询", "聚合函数", "大数据集"],
"performance_gain": "40-70%",
"implementation_complexity": "medium"
}
}
def _initialize_performance_baselines(self) -> Dict[str, Dict[str, float]]:
"""初始化性能基线"""
return {
"simple_select": {
"rows_per_second": 10_000_000,
"bytes_per_second": 1_000_000_000,
"memory_mb_per_million_rows": 100
},
"aggregation": {
"rows_per_second": 5_000_000,
"bytes_per_second": 500_000_000,
"memory_mb_per_million_rows": 200
},
"join": {
"rows_per_second": 1_000_000,
"bytes_per_second": 200_000_000,
"memory_mb_per_million_rows": 500
},
"window_function": {
"rows_per_second": 2_000_000,
"bytes_per_second": 300_000_000,
"memory_mb_per_million_rows": 300
}
}
def analyze_query_execution_plan(self, sql: str) -> QueryExecutionPlan:
"""分析查询执行计划"""
query_id = f"query_{int(time.time() * 1000)}"
# 模拟查询解析和分析
stages = []
# 解析阶段
stages.append({
"stage": QueryStage.PARSING,
"description": "SQL语法解析和词法分析",
"estimated_time_ms": 1,
"operations": ["tokenization", "syntax_tree_building"]
})
# 分析阶段
stages.append({
"stage": QueryStage.ANALYSIS,
"description": "语义分析和类型检查",
"estimated_time_ms": 5,
"operations": ["table_resolution", "column_validation", "type_checking"]
})
# 优化阶段
optimizations_applied = self._identify_applicable_optimizations(sql)
stages.append({
"stage": QueryStage.OPTIMIZATION,
"description": "查询优化和执行计划生成",
"estimated_time_ms": 10,
"operations": [opt.value for opt in optimizations_applied]
})
# 执行阶段
estimated_rows = self._estimate_result_rows(sql)
estimated_cost = self._estimate_query_cost(sql, estimated_rows)
stages.append({
"stage": QueryStage.EXECUTION,
"description": "查询执行和数据处理",
"estimated_time_ms": estimated_cost,
"operations": ["data_reading", "filtering", "processing"]
})
# 结果处理阶段
stages.append({
"stage": QueryStage.RESULT_PROCESSING,
"description": "结果格式化和返回",
"estimated_time_ms": max(1, estimated_rows / 1000000),
"operations": ["result_formatting", "network_transfer"]
})
return QueryExecutionPlan(
query_id=query_id,
sql=sql,
stages=stages,
estimated_cost=estimated_cost,
estimated_rows=estimated_rows,
optimizations_applied=optimizations_applied
)
def _identify_applicable_optimizations(self, sql: str) -> List[OptimizationType]:
"""识别适用的优化策略"""
optimizations = []
sql_lower = sql.lower()
# 检查谓词下推
if 'where' in sql_lower:
optimizations.append(OptimizationType.PREDICATE_PUSHDOWN)
# 检查投影下推
if 'select' in sql_lower and '*' not in sql_lower:
optimizations.append(OptimizationType.PROJECTION_PUSHDOWN)
# 检查JOIN重排序
if 'join' in sql_lower:
optimizations.append(OptimizationType.JOIN_REORDERING)
# 检查索引使用
if any(keyword in sql_lower for keyword in ['where', 'order by', 'group by']):
optimizations.append(OptimizationType.INDEX_USAGE)
# 检查分区裁剪
if any(keyword in sql_lower for keyword in ['date', 'timestamp', 'time']):
optimizations.append(OptimizationType.PARTITION_PRUNING)
# 检查聚合下推
if any(keyword in sql_lower for keyword in ['group by', 'count', 'sum', 'avg']):
optimizations.append(OptimizationType.AGGREGATION_PUSHDOWN)
return optimizations
def _estimate_result_rows(self, sql: str) -> int:
"""估算结果行数"""
sql_lower = sql.lower()
# 基础行数估算
base_rows = 1_000_000
# 根据查询类型调整
if 'group by' in sql_lower:
base_rows = int(base_rows * 0.1) # 聚合查询通常减少行数
elif 'join' in sql_lower:
base_rows = int(base_rows * 1.5) # JOIN可能增加行数
elif 'where' in sql_lower:
base_rows = int(base_rows * 0.3) # WHERE条件减少行数
return max(1, base_rows)
def _estimate_query_cost(self, sql: str, estimated_rows: int) -> float:
"""估算查询成本(毫秒)"""
sql_lower = sql.lower()
# 基础成本计算
base_cost_per_row = 0.001 # 每行0.001毫秒
# 根据查询复杂度调整
complexity_multiplier = 1.0
if 'join' in sql_lower:
complexity_multiplier *= 3.0
if 'group by' in sql_lower:
complexity_multiplier *= 2.0
if 'order by' in sql_lower:
complexity_multiplier *= 1.5
if 'window' in sql_lower or 'over' in sql_lower:
complexity_multiplier *= 4.0
return estimated_rows * base_cost_per_row * complexity_multiplier
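    # 成本估算示例: 一个同时包含 JOIN 与 GROUP BY、预估 100 万行的查询,
    # 估算成本约为 1_000_000 * 0.001 * 3.0 * 2.0 = 6000ms。
    # 该线性模型仅用于演示相对量级,并非 ClickHouse 真实的代价模型。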
def generate_optimization_recommendations(self, execution_plan: QueryExecutionPlan) -> List[Dict[str, Any]]:
"""生成优化建议"""
recommendations = []
for optimization in execution_plan.optimizations_applied:
rule = self.optimization_rules[optimization]
recommendations.append({
"optimization_type": optimization,
"description": rule["description"],
"expected_performance_gain": rule["performance_gain"],
"implementation_complexity": rule["implementation_complexity"],
"benefits": rule["benefits"],
"applicable_scenarios": rule["applicable_scenarios"]
})
# 添加通用优化建议
if execution_plan.estimated_cost > 1000: # 查询成本较高
recommendations.append({
"optimization_type": "general",
"description": "考虑添加适当的索引或优化表结构",
"expected_performance_gain": "30-70%",
"implementation_complexity": "medium",
"benefits": ["减少查询时间", "提高并发性能"],
"applicable_scenarios": ["高频查询", "复杂查询"]
})
if execution_plan.estimated_rows > 10_000_000: # 结果集较大
recommendations.append({
"optimization_type": "result_limiting",
"description": "考虑使用LIMIT或分页查询",
"expected_performance_gain": "50-90%",
"implementation_complexity": "low",
"benefits": ["减少内存使用", "提高响应速度"],
"applicable_scenarios": ["大结果集", "交互式查询"]
})
return recommendations
def simulate_query_performance(self, sql: str, table_size_rows: int = 1_000_000) -> QueryPerformanceMetrics:
"""模拟查询性能"""
sql_lower = sql.lower()
# 确定查询类型
query_type = "simple_select"
if 'join' in sql_lower:
query_type = "join"
elif 'group by' in sql_lower:
query_type = "aggregation"
elif 'window' in sql_lower or 'over' in sql_lower:
query_type = "window_function"
baseline = self.performance_baselines[query_type]
# 计算性能指标
rows_read = table_size_rows
if 'where' in sql_lower:
rows_read = int(rows_read * random.uniform(0.1, 0.5)) # WHERE条件减少读取行数
bytes_read = rows_read * random.uniform(50, 200) # 每行50-200字节
execution_time_ms = (rows_read / baseline["rows_per_second"]) * 1000
execution_time_ms *= random.uniform(0.8, 1.2) # 添加随机变化
memory_usage_mb = (rows_read / 1_000_000) * baseline["memory_mb_per_million_rows"]
return QueryPerformanceMetrics(
execution_time_ms=execution_time_ms,
rows_read=rows_read,
bytes_read=int(bytes_read),
rows_processed=rows_read,
memory_usage_mb=memory_usage_mb,
cpu_usage_percent=random.uniform(20, 80),
io_wait_percent=random.uniform(5, 30),
cache_hit_ratio=random.uniform(0.6, 0.9)
)
def compare_query_performance(self, original_sql: str, optimized_sql: str,
table_size_rows: int = 1_000_000) -> Dict[str, Any]:
"""比较查询性能"""
original_metrics = self.simulate_query_performance(original_sql, table_size_rows)
optimized_metrics = self.simulate_query_performance(optimized_sql, table_size_rows)
# 计算改进百分比
improvements = {
"execution_time": ((original_metrics.execution_time_ms - optimized_metrics.execution_time_ms)
/ original_metrics.execution_time_ms) * 100,
"rows_read": ((original_metrics.rows_read - optimized_metrics.rows_read)
/ original_metrics.rows_read) * 100,
"memory_usage": ((original_metrics.memory_usage_mb - optimized_metrics.memory_usage_mb)
/ original_metrics.memory_usage_mb) * 100
}
return {
"original_metrics": original_metrics,
"optimized_metrics": optimized_metrics,
"improvements": improvements,
"summary": {
"faster_by_percent": improvements["execution_time"],
"memory_saved_percent": improvements["memory_usage"],
"io_reduced_percent": improvements["rows_read"]
}
}
# 查询优化示例
print("\n\n=== ClickHouse查询优化分析 ===")
optimizer = ClickHouseQueryOptimizer()

print("\n1. 查询执行计划分析:")
test_queries = [
    "SELECT user_id, count() FROM events WHERE date >= '2024-01-01' GROUP BY user_id",
    "SELECT e.*, u.name FROM events e JOIN users u ON e.user_id = u.id WHERE e.date >= today()",
    "SELECT date, sum(revenue) OVER (ORDER BY date ROWS 7 PRECEDING) FROM sales ORDER BY date"
]

for i, query in enumerate(test_queries, 1):
    execution_plan = optimizer.analyze_query_execution_plan(query)
print(f"\n 查询 {i}:")
print(f" SQL: {query[:60]}...")
print(f" 预估成本: {execution_plan.estimated_cost:.2f}ms")
print(f" 预估行数: {execution_plan.estimated_rows:,}")
print(f" 应用优化: {[opt.value for opt in execution_plan.optimizations_applied][:3]}")
# 显示执行阶段
total_time = sum(stage['estimated_time_ms'] for stage in execution_plan.stages)
print(f" 执行阶段 (总计: {total_time:.2f}ms):")
for stage in execution_plan.stages[:3]: # 显示前3个阶段
print(f" - {stage['stage'].value}: {stage['estimated_time_ms']:.2f}ms")
print("\n2. 优化建议生成:")
for i, query in enumerate(test_queries[:2], 1):
    execution_plan = optimizer.analyze_query_execution_plan(query)
    recommendations = optimizer.generate_optimization_recommendations(execution_plan)
print(f"\n 查询 {i} 优化建议:")
for j, rec in enumerate(recommendations[:2], 1):
print(f" 建议 {j}: {rec['description']}")
print(f" 预期性能提升: {rec['expected_performance_gain']}")
print(f" 实现复杂度: {rec['implementation_complexity']}")
print(f" 主要收益: {', '.join(rec['benefits'][:2])}")
print("\n3. 查询性能模拟:")
performance_test_cases = [
    {"sql": "SELECT * FROM large_table WHERE id = 12345", "size": 10_000_000},
    {"sql": "SELECT category, count() FROM products GROUP BY category", "size": 1_000_000},
    {"sql": "SELECT p.*, c.name FROM products p JOIN categories c ON p.category_id = c.id", "size": 5_000_000}
]

for i, case in enumerate(performance_test_cases, 1):
    metrics = optimizer.simulate_query_performance(case["sql"], case["size"])
print(f"\n 测试 {i} (表大小: {case['size']:,} 行):")
print(f" SQL: {case['sql'][:50]}...")
print(f" 执行时间: {metrics.execution_time_ms:.2f}ms")
print(f" 读取行数: {metrics.rows_read:,}")
print(f" 内存使用: {metrics.memory_usage_mb:.2f}MB")
print(f" 缓存命中率: {metrics.cache_hit_ratio:.2%}")
print("\n4. 查询性能对比:")
comparison_cases = [
    {
        "original": "SELECT * FROM events WHERE toYYYYMM(date) = 202401",
        "optimized": "SELECT * FROM events WHERE date >= '2024-01-01' AND date < '2024-02-01'"
    },
    {
        "original": "SELECT user_id, count() FROM events GROUP BY user_id ORDER BY count() DESC",
        "optimized": "SELECT user_id, count() FROM events GROUP BY user_id ORDER BY count() DESC LIMIT 100"
    }
]

for i, case in enumerate(comparison_cases, 1):
    comparison = optimizer.compare_query_performance(
        case["original"], case["optimized"], 5_000_000
    )
print(f"\n 对比 {i}:")
print(f" 原始查询: {case['original'][:40]}...")
print(f" 优化查询: {case['optimized'][:40]}...")
print(f" 性能提升: {comparison['summary']['faster_by_percent']:.1f}%")
print(f" 内存节省: {comparison['summary']['memory_saved_percent']:.1f}%")
print(f" IO减少: {comparison['summary']['io_reduced_percent']:.1f}%")
```

## 索引优化策略

索引是ClickHouse查询优化的关键因素,合理的索引设计可以显著提升查询性能。
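在进入下面的索引优化器示例之前,先给出真实的 ClickHouse 跳数索引(data skipping index)DDL 写法与验证方式,便于与后文生成的 creation_sql 对照(表名、列名均为示例):

```python
# 跳数索引的 DDL 与验证示例(events 表与列名为假设示例)
skip_index_ddl = [
    # 为 date 列添加 minmax 索引,并对存量数据物化
    "ALTER TABLE events ADD INDEX idx_minmax_date date TYPE minmax GRANULARITY 4",
    "ALTER TABLE events MATERIALIZE INDEX idx_minmax_date",
    # 为高基数的 user_id 列添加布隆过滤器索引(0.01 为误判率)
    "ALTER TABLE events ADD INDEX idx_bloom_user_id user_id TYPE bloom_filter(0.01) GRANULARITY 4",
]

# 通过 EXPLAIN 查看索引是否参与了数据裁剪
explain_sql = "EXPLAIN indexes = 1 SELECT count() FROM events WHERE user_id = 12345"

for stmt in skip_index_ddl + [explain_sql]:
    print(stmt)
```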
```python
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Any
import time

class IndexType(Enum):
"""索引类型"""
PRIMARY_KEY = "primary_key"
MINMAX = "minmax"
SET = "set"
BLOOM_FILTER = "bloom_filter"
TOKENBF_V1 = "tokenbf_v1"
NGRAMBF_V1 = "ngrambf_v1"
@dataclass
class IndexRecommendation:
"""索引推荐"""
index_type: IndexType
columns: List[str]
description: str
expected_improvement: str
use_cases: List[str]
creation_sql: str
maintenance_cost: str
class ClickHouseIndexOptimizer:
"""ClickHouse索引优化器"""
def __init__(self):
self.index_characteristics = self._initialize_index_characteristics()
def _initialize_index_characteristics(self) -> Dict[IndexType, Dict[str, Any]]:
"""初始化索引特性"""
return {
IndexType.PRIMARY_KEY: {
"description": "主键索引,用于快速定位和排序",
"best_for": ["等值查询", "范围查询", "排序查询"],
"performance_gain": "70-95%",
"memory_overhead": "low",
"maintenance_cost": "low",
"cardinality_requirement": "medium_to_high"
},
IndexType.MINMAX: {
"description": "最小最大值索引,适用于范围查询",
"best_for": ["范围查询", "数值比较", "日期范围"],
"performance_gain": "50-80%",
"memory_overhead": "very_low",
"maintenance_cost": "very_low",
"cardinality_requirement": "any"
},
IndexType.SET: {
"description": "集合索引,适用于IN查询",
"best_for": ["IN查询", "等值查询", "小集合过滤"],
"performance_gain": "60-90%",
"memory_overhead": "medium",
"maintenance_cost": "medium",
"cardinality_requirement": "low_to_medium"
},
IndexType.BLOOM_FILTER: {
"description": "布隆过滤器索引,适用于等值查询",
"best_for": ["等值查询", "存在性检查", "高基数列"],
"performance_gain": "40-70%",
"memory_overhead": "low",
"maintenance_cost": "low",
"cardinality_requirement": "high"
},
IndexType.TOKENBF_V1: {
"description": "分词布隆过滤器,适用于文本搜索",
"best_for": ["文本搜索", "关键词查询", "模糊匹配"],
"performance_gain": "30-60%",
"memory_overhead": "medium",
"maintenance_cost": "medium",
"cardinality_requirement": "high"
},
IndexType.NGRAMBF_V1: {
"description": "N-gram布隆过滤器,适用于子串搜索",
"best_for": ["子串搜索", "LIKE查询", "模式匹配"],
"performance_gain": "25-50%",
"memory_overhead": "high",
"maintenance_cost": "high",
"cardinality_requirement": "high"
}
}
def analyze_query_patterns(self, queries: List[str]) -> Dict[str, Any]:
"""分析查询模式"""
patterns = {
"equality_filters": [],
"range_filters": [],
"in_filters": [],
"like_filters": [],
"order_by_columns": [],
"group_by_columns": [],
"join_columns": []
}
for query in queries:
query_lower = query.lower()
# 分析WHERE条件
if 'where' in query_lower:
# 等值过滤
if ' = ' in query_lower:
patterns["equality_filters"].extend(self._extract_equality_columns(query))
# 范围过滤
if any(op in query_lower for op in ['>', '<', 'between', '>=', '<=']):
patterns["range_filters"].extend(self._extract_range_columns(query))
# IN过滤
if ' in ' in query_lower:
patterns["in_filters"].extend(self._extract_in_columns(query))
# LIKE过滤
if ' like ' in query_lower:
patterns["like_filters"].extend(self._extract_like_columns(query))
# 分析ORDER BY
if 'order by' in query_lower:
patterns["order_by_columns"].extend(self._extract_order_by_columns(query))
# 分析GROUP BY
if 'group by' in query_lower:
patterns["group_by_columns"].extend(self._extract_group_by_columns(query))
# 分析JOIN
if 'join' in query_lower:
patterns["join_columns"].extend(self._extract_join_columns(query))
# 统计频率
for pattern_type in patterns:
column_counts = {}
for column in patterns[pattern_type]:
column_counts[column] = column_counts.get(column, 0) + 1
patterns[pattern_type] = sorted(column_counts.items(), key=lambda x: x[1], reverse=True)
return patterns
def _extract_equality_columns(self, query: str) -> List[str]:
"""提取等值查询列"""
# 简化实现,实际应该使用SQL解析器
import re
pattern = r'(\w+)\s*=\s*'
matches = re.findall(pattern, query, re.IGNORECASE)
return [match for match in matches if match.lower() not in ['select', 'from', 'where', 'and', 'or']]
def _extract_range_columns(self, query: str) -> List[str]:
"""提取范围查询列"""
import re
pattern = r'(\w+)\s*[<>=]+\s*'
matches = re.findall(pattern, query, re.IGNORECASE)
return [match for match in matches if match.lower() not in ['select', 'from', 'where', 'and', 'or']]
def _extract_in_columns(self, query: str) -> List[str]:
"""提取IN查询列"""
import re
pattern = r'(\w+)\s+in\s*\('
matches = re.findall(pattern, query, re.IGNORECASE)
return matches
def _extract_like_columns(self, query: str) -> List[str]:
"""提取LIKE查询列"""
import re
pattern = r'(\w+)\s+like\s+'
matches = re.findall(pattern, query, re.IGNORECASE)
return matches
def _extract_order_by_columns(self, query: str) -> List[str]:
"""提取ORDER BY列"""
import re
pattern = r'order\s+by\s+([\w,\s]+)'
match = re.search(pattern, query, re.IGNORECASE)
if match:
columns = match.group(1).split(',')
return [col.strip().split()[0] for col in columns]
return []
def _extract_group_by_columns(self, query: str) -> List[str]:
"""提取GROUP BY列"""
import re
pattern = r'group\s+by\s+([\w,\s]+)'
match = re.search(pattern, query, re.IGNORECASE)
if match:
columns = match.group(1).split(',')
return [col.strip() for col in columns]
return []
def _extract_join_columns(self, query: str) -> List[str]:
"""提取JOIN列"""
import re
pattern = r'on\s+(\w+\.\w+)\s*=\s*(\w+\.\w+)'
matches = re.findall(pattern, query, re.IGNORECASE)
columns = []
for match in matches:
columns.extend([col.split('.')[1] for col in match])
return columns
def recommend_indexes(self, query_patterns: Dict[str, Any],
table_info: Dict[str, Any]) -> List[IndexRecommendation]:
"""推荐索引"""
recommendations = []
# 主键索引推荐
if query_patterns["equality_filters"] or query_patterns["range_filters"] or query_patterns["order_by_columns"]:
# 选择最频繁使用的列作为主键候选
all_columns = (query_patterns["equality_filters"] +
query_patterns["range_filters"] +
query_patterns["order_by_columns"])
if all_columns:
primary_key_candidate = all_columns[0][0] # 最频繁的列
recommendations.append(IndexRecommendation(
index_type=IndexType.PRIMARY_KEY,
columns=[primary_key_candidate],
description=f"将{primary_key_candidate}设置为主键以优化查询性能",
expected_improvement="70-95%",
use_cases=["等值查询", "范围查询", "排序查询"],
creation_sql=f"ORDER BY {primary_key_candidate}",
maintenance_cost="低"
))
# MinMax索引推荐
for column, frequency in query_patterns["range_filters"][:3]: # 前3个最频繁的范围查询列
recommendations.append(IndexRecommendation(
index_type=IndexType.MINMAX,
columns=[column],
description=f"为{column}创建MinMax索引以优化范围查询",
expected_improvement="50-80%",
use_cases=["范围查询", "数值比较"],
creation_sql=f"INDEX idx_minmax_{column} {column} TYPE minmax GRANULARITY 1",
maintenance_cost="极低"
))
# Set索引推荐
for column, frequency in query_patterns["in_filters"][:2]: # 前2个最频繁的IN查询列
recommendations.append(IndexRecommendation(
index_type=IndexType.SET,
columns=[column],
description=f"为{column}创建Set索引以优化IN查询",
expected_improvement="60-90%",
use_cases=["IN查询", "等值查询"],
creation_sql=f"INDEX idx_set_{column} {column} TYPE set(100) GRANULARITY 1",
maintenance_cost="中等"
))
# Bloom Filter索引推荐
for column, frequency in query_patterns["equality_filters"][:2]: # 前2个最频繁的等值查询列
if frequency >= 5: # 只为高频查询列推荐
recommendations.append(IndexRecommendation(
index_type=IndexType.BLOOM_FILTER,
columns=[column],
description=f"为{column}创建Bloom Filter索引以优化等值查询",
expected_improvement="40-70%",
use_cases=["等值查询", "存在性检查"],
creation_sql=f"INDEX idx_bloom_{column} {column} TYPE bloom_filter GRANULARITY 1",
maintenance_cost="低"
))
# 文本搜索索引推荐
for column, frequency in query_patterns["like_filters"]:
if frequency >= 3: # 为频繁的LIKE查询推荐
recommendations.append(IndexRecommendation(
index_type=IndexType.TOKENBF_V1,
columns=[column],
description=f"为{column}创建Token Bloom Filter索引以优化文本搜索",
expected_improvement="30-60%",
use_cases=["文本搜索", "关键词查询"],
creation_sql=f"INDEX idx_token_{column} {column} TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1",
maintenance_cost="中等"
))
return recommendations
def generate_index_creation_script(self, table_name: str,
recommendations: List[IndexRecommendation]) -> str:
"""生成索引创建脚本"""
script_parts = []
script_parts.append(f"-- 索引优化脚本 for {table_name}")
script_parts.append(f"-- 生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
script_parts.append("")
for i, rec in enumerate(recommendations, 1):
script_parts.append(f"-- 索引 {i}: {rec.description}")
script_parts.append(f"-- 预期性能提升: {rec.expected_improvement}")
script_parts.append(f"-- 维护成本: {rec.maintenance_cost}")
if rec.index_type == IndexType.PRIMARY_KEY:
script_parts.append(f"-- 注意: 主键需要在CREATE TABLE时指定")
script_parts.append(f"-- {rec.creation_sql}")
else:
script_parts.append(f"ALTER TABLE {table_name} ADD {rec.creation_sql};")
script_parts.append("")
return "\n".join(script_parts)
def analyze_index_effectiveness(self, index_recommendations: List[IndexRecommendation],
query_patterns: Dict[str, Any]) -> Dict[str, Any]:
"""分析索引有效性"""
effectiveness_analysis = {
"high_impact_indexes": [],
"medium_impact_indexes": [],
"low_impact_indexes": [],
"total_expected_improvement": 0,
"implementation_priority": []
}
for rec in index_recommendations:
# 计算影响分数
impact_score = 0
# 基于索引类型的基础分数
type_scores = {
IndexType.PRIMARY_KEY: 90,
IndexType.MINMAX: 70,
IndexType.SET: 75,
IndexType.BLOOM_FILTER: 60,
IndexType.TOKENBF_V1: 50,
IndexType.NGRAMBF_V1: 40
}
impact_score += type_scores.get(rec.index_type, 50)
# 基于查询频率的调整
column = rec.columns[0] if rec.columns else ""
for pattern_type, columns in query_patterns.items():
for col, frequency in columns:
if col == column:
impact_score += min(frequency * 5, 30) # 最多加30分
break
# 分类索引
rec_with_score = {"recommendation": rec, "impact_score": impact_score}
if impact_score >= 80:
effectiveness_analysis["high_impact_indexes"].append(rec_with_score)
elif impact_score >= 60:
effectiveness_analysis["medium_impact_indexes"].append(rec_with_score)
else:
effectiveness_analysis["low_impact_indexes"].append(rec_with_score)
# 计算总体预期改进
total_improvement = 0
for category in ["high_impact_indexes", "medium_impact_indexes", "low_impact_indexes"]:
for item in effectiveness_analysis[category]:
# 从预期改进字符串中提取数值
improvement_str = item["recommendation"].expected_improvement
if "-" in improvement_str:
avg_improvement = sum(map(int, improvement_str.replace("%", "").split("-"))) / 2
else:
avg_improvement = int(improvement_str.replace("%", ""))
total_improvement += avg_improvement * 0.1 # 权重调整
effectiveness_analysis["total_expected_improvement"] = min(total_improvement, 90) # 最大90%
# 实现优先级
all_indexes = (effectiveness_analysis["high_impact_indexes"] +
effectiveness_analysis["medium_impact_indexes"] +
effectiveness_analysis["low_impact_indexes"])
effectiveness_analysis["implementation_priority"] = sorted(
all_indexes, key=lambda x: x["impact_score"], reverse=True
)
return effectiveness_analysis
# 索引优化示例
print("\n\n=== ClickHouse索引优化分析 ===")
index_optimizer = ClickHouseIndexOptimizer()
print("\n1. 查询模式分析:")
sample_queries = [
"SELECT * FROM events WHERE user_id = 12345",
"SELECT * FROM events WHERE user_id = 67890",
"SELECT * FROM events WHERE date >= '2024-01-01' AND date <= '2024-01-31'",
"SELECT * FROM events WHERE event_type IN ('click', 'view', 'purchase')",
"SELECT * FROM events WHERE message LIKE '%error%'",
"SELECT user_id, count(*) FROM events GROUP BY user_id ORDER BY count(*) DESC",
"SELECT e.*, u.name FROM events e JOIN users u ON e.user_id = u.id"
]
query_patterns = index_optimizer.analyze_query_patterns(sample_queries)
for pattern_type, columns in query_patterns.items():
if columns:
print(f"\n {pattern_type.replace('_', ' ').title()}:")
for column, frequency in columns[:3]: # 显示前3个最频繁的
print(f" {column}: {frequency} 次")
print("\n2. 索引推荐:")
table_info = {"name": "events", "size_gb": 100, "daily_queries": 10000}
index_recommendations = index_optimizer.recommend_indexes(query_patterns, table_info)
for i, rec in enumerate(index_recommendations, 1):
print(f"\n 推荐 {i}: {rec.index_type.value}")
print(f" 列: {', '.join(rec.columns)}")
print(f" 描述: {rec.description}")
print(f" 预期改进: {rec.expected_improvement}")
print(f" 维护成本: {rec.maintenance_cost}")
print(f" 适用场景: {', '.join(rec.use_cases[:2])}")
print("\n3. 索引创建脚本:")
creation_script = index_optimizer.generate_index_creation_script("events", index_recommendations[:3])
print(creation_script[:500] + "..." if len(creation_script) > 500 else creation_script)
print("\n4. 索引有效性分析:")
effectiveness = index_optimizer.analyze_index_effectiveness(index_recommendations, query_patterns)
print(f"\n 总体预期性能提升: {effectiveness['total_expected_improvement']:.1f}%")
print(f"\n 高影响索引 ({len(effectiveness['high_impact_indexes'])} 个):")
for item in effectiveness['high_impact_indexes'][:2]:
rec = item['recommendation']
print(f" - {rec.index_type.value} on {', '.join(rec.columns)} (影响分数: {item['impact_score']})")
print(f"\n 实现优先级 (前3个):")
for i, item in enumerate(effectiveness['implementation_priority'][:3], 1):
rec = item['recommendation']
print(f" {i}. {rec.index_type.value} on {', '.join(rec.columns)} (分数: {item['impact_score']})")