7.1 查询优化基础
7.1.1 查询执行原理
查询优化是提升Apache Kylin系统性能的关键环节。只有理解一条SQL在Kylin内部的完整执行过程,才能有的放矢地进行性能调优。
查询执行流程
graph TD
A[SQL查询] --> B[SQL解析]
B --> C[查询重写]
C --> D[Cube匹配]
D --> E[执行计划生成]
E --> F[查询执行]
F --> G[结果返回]
D --> H[HBase查询]
D --> I[Spark SQL查询]
H --> J[Cuboid选择]
I --> K[实时计算]
J --> F
K --> F
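为了直观地观察上述执行流程,可以通过Kylin的REST查询接口提交SQL,并查看返回结果中的耗时、扫描行数等执行信息。下面是一个最小示意脚本:其中的主机地址、项目名均为假设值,ADMIN/KYLIN为Kylin的默认账号,返回字段名请以实际使用的Kylin版本为准。
#!/usr/bin/env python3
# query_via_rest.py - 通过REST接口提交查询并观察执行信息(示意)
import requests

resp = requests.post(
    "http://localhost:7070/kylin/api/query",  # 假设的Kylin地址
    json={"sql": "SELECT COUNT(*) FROM sales_fact", "project": "learn_kylin"},
    auth=("ADMIN", "KYLIN"),  # 默认账号,生产环境请替换
    timeout=60,
)
resp.raise_for_status()
result = resp.json()
# duration、totalScanCount等字段可用于对照上图中的执行路径与Cuboid选择效果
print("耗时(ms):", result.get("duration"))
print("扫描行数:", result.get("totalScanCount"))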
查询分析器
#!/usr/bin/env python3
# query_analyzer.py - 查询分析器
import re
import json
import time
import logging
from typing import Dict, List, Tuple, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class QueryMetrics:
"""查询指标"""
query_id: str
sql: str
execution_time_ms: int
scan_count: int
cache_hit: bool
cuboid_id: str
result_rows: int
timestamp: datetime
class QueryAnalyzer:
"""查询分析器"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.query_history = []
self.performance_cache = {}
def analyze_sql(self, sql: str) -> Dict:
"""分析SQL查询"""
analysis = {
"sql": sql,
"analysis_time": datetime.now().isoformat(),
"complexity": self._calculate_complexity(sql),
"tables": self._extract_tables(sql),
"columns": self._extract_columns(sql),
"filters": self._extract_filters(sql),
"aggregations": self._extract_aggregations(sql),
"joins": self._extract_joins(sql),
"optimization_suggestions": []
}
# 生成优化建议
analysis["optimization_suggestions"] = self._generate_optimization_suggestions(analysis)
return analysis
def _calculate_complexity(self, sql: str) -> Dict:
"""计算查询复杂度"""
sql_lower = sql.lower()
complexity = {
"score": 0,
"level": "SIMPLE",
"factors": []
}
# 表数量
table_count = len(re.findall(r'\bfrom\s+\w+|\bjoin\s+\w+', sql_lower))
if table_count > 1:
complexity["score"] += table_count * 10
complexity["factors"].append(f"多表查询({table_count}个表)")
# 子查询
subquery_count = sql_lower.count('select') - 1
if subquery_count > 0:
complexity["score"] += subquery_count * 20
complexity["factors"].append(f"子查询({subquery_count}个)")
# 聚合函数
agg_functions = ['sum', 'count', 'avg', 'max', 'min', 'group by']
agg_count = sum(sql_lower.count(func) for func in agg_functions)
if agg_count > 0:
complexity["score"] += agg_count * 5
complexity["factors"].append(f"聚合操作({agg_count}个)")
# 排序
if 'order by' in sql_lower:
complexity["score"] += 10
complexity["factors"].append("排序操作")
# 窗口函数
if re.search(r'\bover\s*\(', sql_lower):  # 兼容OVER与括号之间有空格的写法
complexity["score"] += 30
complexity["factors"].append("窗口函数")
# 确定复杂度级别
if complexity["score"] < 20:
complexity["level"] = "SIMPLE"
elif complexity["score"] < 50:
complexity["level"] = "MEDIUM"
elif complexity["score"] < 100:
complexity["level"] = "COMPLEX"
else:
complexity["level"] = "VERY_COMPLEX"
return complexity
def _extract_tables(self, sql: str) -> List[str]:
"""提取表名"""
# 简化的表名提取
pattern = r'\b(?:from|join)\s+([\w\.]+)'
matches = re.findall(pattern, sql.lower())
return list(set(matches))
def _extract_columns(self, sql: str) -> List[str]:
"""提取列名"""
# 简化的列名提取
select_part = re.search(r'select\s+(.*?)\s+from', sql.lower(), re.DOTALL)
if select_part:
columns_str = select_part.group(1)
# 移除函数和别名
columns = re.findall(r'\b([\w\.]+)\b', columns_str)
return [col for col in columns if col not in ['sum', 'count', 'avg', 'max', 'min', 'as']]
return []
def _extract_filters(self, sql: str) -> List[Dict]:
"""提取过滤条件"""
filters = []
# WHERE条件
where_match = re.search(r'where\s+(.*?)(?:\s+group\s+by|\s+order\s+by|\s+limit|$)', sql.lower(), re.DOTALL)
if where_match:
where_clause = where_match.group(1)
# 时间过滤
time_patterns = [
r'(\w+)\s*>=\s*[\'"]([\d\-\s:]+)[\'"]',
r'(\w+)\s*<=\s*[\'"]([\d\-\s:]+)[\'"]',
r'(\w+)\s*between\s*[\'"]([\d\-\s:]+)[\'"]\s*and\s*[\'"]([\d\-\s:]+)[\'"]'
]
for pattern in time_patterns:
matches = re.findall(pattern, where_clause)
for match in matches:
filters.append({
"type": "TIME_FILTER",
"column": match[0],
"condition": match
})
# 等值过滤
eq_pattern = r'(\w+)\s*=\s*[\'"]?([^\s\'"]+)[\'"]?'
eq_matches = re.findall(eq_pattern, where_clause)
for match in eq_matches:
filters.append({
"type": "EQUALITY_FILTER",
"column": match[0],
"value": match[1]
})
return filters
def _extract_aggregations(self, sql: str) -> List[Dict]:
"""提取聚合操作"""
aggregations = []
# 聚合函数
agg_pattern = r'(sum|count|avg|max|min)\s*\(\s*([^)]+)\s*\)'
matches = re.findall(agg_pattern, sql.lower())
for func, column in matches:
aggregations.append({
"function": func.upper(),
"column": column.strip()
})
# GROUP BY
group_by_match = re.search(r'group\s+by\s+([^\s]+(?:\s*,\s*[^\s]+)*)', sql.lower())
if group_by_match:
group_columns = [col.strip() for col in group_by_match.group(1).split(',')]
aggregations.append({
"function": "GROUP_BY",
"columns": group_columns
})
return aggregations
def _extract_joins(self, sql: str) -> List[Dict]:
"""提取JOIN操作"""
joins = []
join_pattern = r'(\w+\s+)?join\s+(\w+)\s+on\s+([^\s]+\s*=\s*[^\s]+)'
matches = re.findall(join_pattern, sql.lower())
for join_type, table, condition in matches:
joins.append({
"type": join_type.strip() if join_type else "INNER",
"table": table,
"condition": condition
})
return joins
def _generate_optimization_suggestions(self, analysis: Dict) -> List[str]:
"""生成优化建议"""
suggestions = []
# 复杂度建议
if analysis["complexity"]["level"] == "VERY_COMPLEX":
suggestions.append("查询过于复杂,建议拆分为多个简单查询")
# 过滤条件建议
time_filters = [f for f in analysis["filters"] if f["type"] == "TIME_FILTER"]
if not time_filters:
suggestions.append("建议添加时间过滤条件以提高查询性能")
# 聚合建议
if len(analysis["aggregations"]) > 5:
suggestions.append("聚合操作过多,考虑预计算或使用物化视图")
# JOIN建议
if len(analysis["joins"]) > 3:
suggestions.append("多表JOIN可能影响性能,考虑数据预处理")
# 列选择建议
if '*' in analysis["sql"]:
suggestions.append("避免使用SELECT *,明确指定需要的列")
return suggestions
def analyze_query_performance(self, query_metrics: QueryMetrics) -> Dict:
"""分析查询性能"""
performance_analysis = {
"query_id": query_metrics.query_id,
"execution_time_ms": query_metrics.execution_time_ms,
"performance_level": self._classify_performance(query_metrics.execution_time_ms),
"cache_efficiency": "HIT" if query_metrics.cache_hit else "MISS",
"scan_efficiency": self._analyze_scan_efficiency(query_metrics.scan_count, query_metrics.result_rows),
"recommendations": []
}
# 性能建议
if query_metrics.execution_time_ms > 10000: # 10秒
performance_analysis["recommendations"].append("查询时间过长,检查索引和Cube设计")
if not query_metrics.cache_hit:
performance_analysis["recommendations"].append("缓存未命中,考虑预热缓存")
if query_metrics.scan_count > query_metrics.result_rows * 10:
performance_analysis["recommendations"].append("扫描行数过多,优化过滤条件")
return performance_analysis
def _classify_performance(self, execution_time_ms: int) -> str:
"""分类性能等级"""
if execution_time_ms < 1000: # 1秒
return "EXCELLENT"
elif execution_time_ms < 5000: # 5秒
return "GOOD"
elif execution_time_ms < 10000: # 10秒
return "ACCEPTABLE"
else:
return "POOR"
def _analyze_scan_efficiency(self, scan_count: int, result_rows: int) -> Dict:
"""分析扫描效率"""
if scan_count == 0:  # 扫描行数为0时避免除零错误
efficiency_ratio = 0
else:
efficiency_ratio = result_rows / scan_count
return {
"scan_count": scan_count,
"result_rows": result_rows,
"efficiency_ratio": round(efficiency_ratio, 4),
"level": "HIGH" if efficiency_ratio > 0.1 else "MEDIUM" if efficiency_ratio > 0.01 else "LOW"
}
def generate_query_report(self, queries: List[QueryMetrics]) -> Dict:
"""生成查询报告"""
if not queries:
return {"error": "没有查询数据"}
# 统计信息
total_queries = len(queries)
total_time = sum(q.execution_time_ms for q in queries)
avg_time = total_time / total_queries
cache_hits = sum(1 for q in queries if q.cache_hit)
cache_hit_rate = cache_hits / total_queries
# 性能分布
performance_dist = defaultdict(int)
for query in queries:
level = self._classify_performance(query.execution_time_ms)
performance_dist[level] += 1
# 慢查询
slow_queries = [q for q in queries if q.execution_time_ms > 10000]
slow_queries.sort(key=lambda x: x.execution_time_ms, reverse=True)
report = {
"report_time": datetime.now().isoformat(),
"summary": {
"total_queries": total_queries,
"total_time_ms": total_time,
"avg_time_ms": round(avg_time, 2),
"cache_hit_rate": round(cache_hit_rate, 4),
"slow_queries_count": len(slow_queries)
},
"performance_distribution": dict(performance_dist),
"slow_queries": [
{
"query_id": q.query_id,
"execution_time_ms": q.execution_time_ms,
"sql": q.sql[:100] + "..." if len(q.sql) > 100 else q.sql
}
for q in slow_queries[:10] # 只显示前10个
],
"recommendations": self._generate_report_recommendations(queries)
}
return report
def _generate_report_recommendations(self, queries: List[QueryMetrics]) -> List[str]:
"""生成报告建议"""
recommendations = []
# 缓存命中率建议
cache_hit_rate = sum(1 for q in queries if q.cache_hit) / len(queries)
if cache_hit_rate < 0.5:
recommendations.append(f"缓存命中率较低({cache_hit_rate:.2%}),建议优化缓存策略")
# 慢查询建议
slow_queries = [q for q in queries if q.execution_time_ms > 10000]
if len(slow_queries) > len(queries) * 0.1:
recommendations.append(f"慢查询比例过高({len(slow_queries)/len(queries):.2%}),需要优化查询性能")
# 扫描效率建议
low_efficiency_queries = [
q for q in queries
if q.result_rows > 0 and q.scan_count / q.result_rows > 100
]
if len(low_efficiency_queries) > len(queries) * 0.2:
recommendations.append("扫描效率较低的查询较多,建议优化过滤条件")
return recommendations
def print_query_analysis(self, sql: str):
"""打印查询分析结果"""
analysis = self.analyze_sql(sql)
print(f"\n=== SQL查询分析 ===")
print(f"分析时间: {analysis['analysis_time']}")
print(f"\n📊 复杂度分析:")
complexity = analysis['complexity']
print(f" 复杂度等级: {complexity['level']}")
print(f" 复杂度评分: {complexity['score']}")
if complexity['factors']:
print(f" 复杂度因素: {', '.join(complexity['factors'])}")
print(f"\n🗂️ 查询组件:")
print(f" 涉及表: {', '.join(analysis['tables']) if analysis['tables'] else '无'}")
print(f" 查询列: {len(analysis['columns'])}个")
print(f" 过滤条件: {len(analysis['filters'])}个")
print(f" 聚合操作: {len(analysis['aggregations'])}个")
print(f" JOIN操作: {len(analysis['joins'])}个")
if analysis['optimization_suggestions']:
print(f"\n💡 优化建议:")
for i, suggestion in enumerate(analysis['optimization_suggestions'], 1):
print(f" {i}. {suggestion}")
def print_performance_report(self, queries: List[QueryMetrics]):
"""打印性能报告"""
report = self.generate_query_report(queries)
if "error" in report:
print(f"错误: {report['error']}")
return
print(f"\n=== 查询性能报告 ===")
print(f"报告时间: {report['report_time']}")
summary = report['summary']
print(f"\n📈 总体统计:")
print(f" 查询总数: {summary['total_queries']}")
print(f" 总执行时间: {summary['total_time_ms']}ms")
print(f" 平均执行时间: {summary['avg_time_ms']}ms")
print(f" 缓存命中率: {summary['cache_hit_rate']:.2%}")
print(f" 慢查询数量: {summary['slow_queries_count']}")
print(f"\n🎯 性能分布:")
for level, count in report['performance_distribution'].items():
percentage = count / summary['total_queries'] * 100
print(f" {level}: {count}个 ({percentage:.1f}%)")
if report['slow_queries']:
print(f"\n🐌 慢查询TOP10:")
for i, query in enumerate(report['slow_queries'], 1):
print(f" {i}. {query['query_id']}: {query['execution_time_ms']}ms")
print(f" SQL: {query['sql']}")
if report['recommendations']:
print(f"\n💡 优化建议:")
for i, rec in enumerate(report['recommendations'], 1):
print(f" {i}. {rec}")
def main():
analyzer = QueryAnalyzer()
# 示例SQL分析
sample_sql = """
SELECT
d.year, d.month, p.category,
SUM(f.sales_amount) as total_sales,
COUNT(*) as order_count,
AVG(f.sales_amount) as avg_order_value
FROM sales_fact f
JOIN date_dim d ON f.date_key = d.date_key
JOIN product_dim p ON f.product_key = p.product_key
WHERE d.date_value >= '2023-01-01'
AND d.date_value <= '2023-12-31'
AND p.category IN ('Electronics', 'Clothing')
GROUP BY d.year, d.month, p.category
ORDER BY d.year, d.month, total_sales DESC
"""
analyzer.print_query_analysis(sample_sql)
# 示例性能数据
sample_queries = [
QueryMetrics(
query_id="q1", sql=sample_sql, execution_time_ms=2500,
scan_count=10000, cache_hit=True, cuboid_id="cube1",
result_rows=100, timestamp=datetime.now()
),
QueryMetrics(
query_id="q2", sql="SELECT COUNT(*) FROM sales_fact", execution_time_ms=15000,
scan_count=1000000, cache_hit=False, cuboid_id="cube2",
result_rows=1, timestamp=datetime.now()
)
]
analyzer.print_performance_report(sample_queries)
if __name__ == "__main__":
main()
7.3 系统级优化
7.3.1 HBase优化配置
HBase参数调优
#!/bin/bash
# hbase_tuning.sh - HBase性能调优脚本
echo "=== HBase性能调优配置 ==="
# HBase配置文件路径
HBASE_CONF_DIR="/opt/hbase/conf"
HBASE_SITE_XML="$HBASE_CONF_DIR/hbase-site.xml"
# 备份原配置
cp "$HBASE_SITE_XML" "$HBASE_SITE_XML.backup.$(date +%Y%m%d_%H%M%S)"
echo "正在优化HBase配置..."
# 创建优化配置
cat > "$HBASE_SITE_XML" << 'EOF'
<?xml version="1.0"?>
<configuration>
<!-- 内存优化 -->
<property>
<name>hbase.regionserver.global.memstore.size</name>
<value>0.4</value>
<description>RegionServer内存中MemStore占用的比例</description>
</property>
<property>
<name>hbase.regionserver.global.memstore.size.lower.limit</name>
<value>0.38</value>
<description>MemStore强制flush的下限</description>
</property>
<property>
<name>hbase.hregion.memstore.flush.size</name>
<value>134217728</value>
<description>MemStore flush大小(128MB)</description>
</property>
<property>
<name>hbase.hregion.memstore.block.multiplier</name>
<value>4</value>
<description>MemStore阻塞倍数</description>
</property>
<!-- 压缩优化 -->
<property>
<name>hbase.hregion.max.filesize</name>
<value>10737418240</value>
<description>HFile最大大小(10GB)</description>
</property>
<property>
<name>hbase.hstore.compaction.min</name>
<value>3</value>
<description>最小压缩文件数</description>
</property>
<property>
<name>hbase.hstore.compaction.max</name>
<value>10</value>
<description>最大压缩文件数</description>
</property>
<property>
<name>hbase.hstore.compaction.ratio</name>
<value>1.2</value>
<description>压缩比例</description>
</property>
<!-- 并发优化 -->
<property>
<name>hbase.regionserver.handler.count</name>
<value>100</value>
<description>RegionServer处理线程数</description>
</property>
<property>
<name>hbase.regionserver.metahandler.count</name>
<value>20</value>
<description>Meta表处理线程数</description>
</property>
<property>
<name>hbase.htable.threads.max</name>
<value>96</value>
<description>客户端最大线程数</description>
</property>
<!-- 缓存优化 -->
<property>
<name>hfile.block.cache.size</name>
<value>0.4</value>
<description>BlockCache大小比例</description>
</property>
<property>
<name>hbase.bucketcache.size</name>
<value>2048</value>
<description>BucketCache大小(MB)</description>
</property>
<property>
<name>hbase.bucketcache.ioengine</name>
<value>offheap</value>
<description>BucketCache存储引擎</description>
</property>
<!-- 网络优化 -->
<property>
<name>hbase.ipc.server.callqueue.handler.factor</name>
<value>0.1</value>
<description>IPC调用队列因子</description>
</property>
<property>
<name>hbase.ipc.server.callqueue.read.ratio</name>
<value>0.7</value>
<description>读操作队列比例</description>
</property>
<property>
<name>hbase.ipc.server.callqueue.scan.ratio</name>
<value>0.5</value>
<description>扫描操作队列比例</description>
</property>
<!-- WAL优化 -->
<property>
<name>hbase.regionserver.hlog.blocksize</name>
<value>134217728</value>
<description>WAL块大小(128MB)</description>
</property>
<property>
<name>hbase.regionserver.maxlogs</name>
<value>32</value>
<description>最大WAL文件数</description>
</property>
<property>
<name>hbase.wal.provider</name>
<value>multiwal</value>
<description>WAL提供者</description>
</property>
<!-- 超时设置 -->
<property>
<name>hbase.client.scanner.timeout.period</name>
<value>600000</value>
<description>扫描器超时时间(10分钟)</description>
</property>
<property>
<name>hbase.rpc.timeout</name>
<value>300000</value>
<description>RPC超时时间(5分钟)</description>
</property>
<!-- 其他优化 -->
<property>
<name>hbase.master.balancer.stochastic.maxMovePercent</name>
<value>0.25</value>
<description>负载均衡最大移动比例</description>
</property>
<property>
<name>hbase.balancer.period</name>
<value>300000</value>
<description>负载均衡周期(5分钟)</description>
</property>
</configuration>
EOF
echo "HBase配置优化完成!"
echo "请重启HBase服务以应用新配置"
# 显示关键配置
echo ""
echo "=== 关键配置摘要 ==="
echo "MemStore大小比例: 40%"
echo "BlockCache大小比例: 40%"
echo "BucketCache大小: 2GB"
echo "RegionServer处理线程: 100"
echo "最大HFile大小: 10GB"
echo "WAL块大小: 128MB"
# 生成JVM参数建议
echo ""
echo "=== 推荐JVM参数 ==="
echo "export HBASE_REGIONSERVER_OPTS=\""
echo " -Xms8g -Xmx8g"
echo " -XX:+UseG1GC"
echo " -XX:MaxGCPauseMillis=50"
echo " -XX:G1HeapRegionSize=16m"
echo " -XX:+UnlockExperimentalVMOptions"
echo " -XX:+UseCGroupMemoryLimitForHeap"
echo " -XX:+UseStringDeduplication"
echo " -XX:+OptimizeStringConcat"
echo " -Dhbase.log.dir=/var/log/hbase"
echo " -Dhbase.log.file=hbase-regionserver.log\""
echo ""
echo "export HBASE_MASTER_OPTS=\""
echo " -Xms2g -Xmx2g"
echo " -XX:+UseG1GC"
echo " -XX:MaxGCPauseMillis=50"
echo " -Dhbase.log.dir=/var/log/hbase"
echo " -Dhbase.log.file=hbase-master.log\""
7.3.2 Spark SQL优化
Spark配置优化
#!/usr/bin/env python3
# spark_optimizer.py - Spark SQL优化器
import os
import json
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class SparkConfig:
"""Spark配置"""
executor_memory: str
executor_cores: int
num_executors: int
driver_memory: str
driver_cores: int
max_result_size: str
sql_adaptive_enabled: bool
sql_adaptive_coalesce_partitions_enabled: bool
sql_adaptive_skew_join_enabled: bool
serializer: str
kryo_buffer_max: str
class SparkOptimizer:
"""Spark SQL优化器"""
def __init__(self):
self.logger = logging.getLogger(__name__)
def analyze_cluster_resources(self) -> Dict:
"""分析集群资源"""
# 这里简化实现,实际应该从YARN或K8s获取
return {
"total_memory_gb": 128,
"total_cores": 32,
"num_nodes": 4,
"memory_per_node_gb": 32,
"cores_per_node": 8
}
def calculate_optimal_config(self, workload_type: str = "olap") -> SparkConfig:
"""计算最优配置"""
resources = self.analyze_cluster_resources()
if workload_type == "olap":
# OLAP工作负载优化
executor_memory_gb = min(8, resources["memory_per_node_gb"] // 2)
executor_cores = min(4, resources["cores_per_node"] // 2)
num_executors = (resources["num_nodes"] * resources["cores_per_node"]) // executor_cores - 1
return SparkConfig(
executor_memory=f"{executor_memory_gb}g",
executor_cores=executor_cores,
num_executors=num_executors,
driver_memory="4g",
driver_cores=2,
max_result_size="2g",
sql_adaptive_enabled=True,
sql_adaptive_coalesce_partitions_enabled=True,
sql_adaptive_skew_join_enabled=True,
serializer="org.apache.spark.serializer.KryoSerializer",
kryo_buffer_max="256m"
)
else:
# 默认配置
return SparkConfig(
executor_memory="2g",
executor_cores=2,
num_executors=4,
driver_memory="2g",
driver_cores=1,
max_result_size="1g",
sql_adaptive_enabled=True,
sql_adaptive_coalesce_partitions_enabled=True,
sql_adaptive_skew_join_enabled=True,
serializer="org.apache.spark.serializer.KryoSerializer",
kryo_buffer_max="128m"
)
def generate_spark_conf(self, config: SparkConfig) -> Dict[str, str]:
"""生成Spark配置"""
return {
# 基础配置
"spark.executor.memory": config.executor_memory,
"spark.executor.cores": str(config.executor_cores),
"spark.executor.instances": str(config.num_executors),
"spark.driver.memory": config.driver_memory,
"spark.driver.cores": str(config.driver_cores),
"spark.driver.maxResultSize": config.max_result_size,
# SQL优化
"spark.sql.adaptive.enabled": str(config.sql_adaptive_enabled).lower(),
"spark.sql.adaptive.coalescePartitions.enabled": str(config.sql_adaptive_coalesce_partitions_enabled).lower(),
"spark.sql.adaptive.skewJoin.enabled": str(config.sql_adaptive_skew_join_enabled).lower(),
"spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB",
"spark.sql.adaptive.coalescePartitions.minPartitionNum": "1",
"spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes": "256MB",
# 序列化优化
"spark.serializer": config.serializer,
"spark.kryoserializer.buffer.max": config.kryo_buffer_max,
"spark.sql.execution.arrow.pyspark.enabled": "true",
# 内存优化
"spark.executor.memoryFraction": "0.8",
"spark.storage.memoryFraction": "0.5",
"spark.shuffle.memoryFraction": "0.3",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1PrintRegionRememberedSetInfo",
# 网络优化
"spark.network.timeout": "600s",
"spark.executor.heartbeatInterval": "20s",
"spark.sql.broadcastTimeout": "600",
# Shuffle优化
"spark.sql.shuffle.partitions": "200",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.io.compression.codec": "snappy",
# 动态分配
"spark.dynamicAllocation.enabled": "true",
"spark.dynamicAllocation.minExecutors": "1",
"spark.dynamicAllocation.maxExecutors": str(config.num_executors * 2),
"spark.dynamicAllocation.initialExecutors": str(config.num_executors),
# Kylin特定配置
"spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
"spark.sql.execution.arrow.fallback.enabled": "true",
"spark.sql.parquet.enableVectorizedReader": "true",
"spark.sql.parquet.columnarReaderBatchSize": "4096"
}
def optimize_for_kylin(self, base_config: Dict[str, str]) -> Dict[str, str]:
"""针对Kylin优化Spark配置"""
kylin_optimizations = {
# Kylin特定优化
"spark.sql.execution.arrow.pyspark.enabled": "true",
"spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.sql.adaptive.localShuffleReader.enabled": "true",
"spark.sql.adaptive.skewJoin.enabled": "true",
# 列式存储优化
"spark.sql.parquet.enableVectorizedReader": "true",
"spark.sql.parquet.columnarReaderBatchSize": "4096",
"spark.sql.orc.enableVectorizedReader": "true",
# 内存管理
"spark.sql.execution.arrow.fallback.enabled": "true",
"spark.sql.execution.pandas.convertToArrowArraySafely": "true",
# 查询优化
"spark.sql.optimizer.excludedRules": "org.apache.spark.sql.catalyst.optimizer.ColumnPruning",
"spark.sql.cbo.enabled": "true",
"spark.sql.cbo.joinReorder.enabled": "true",
"spark.sql.statistics.histogram.enabled": "true",
# 缓存优化
"spark.sql.inMemoryColumnarStorage.compressed": "true",
"spark.sql.inMemoryColumnarStorage.batchSize": "10000",
# 并发控制
"spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB"
}
# 合并配置
optimized_config = base_config.copy()
optimized_config.update(kylin_optimizations)
return optimized_config
def generate_spark_defaults_conf(self, config: Dict[str, str]) -> str:
"""生成spark-defaults.conf文件内容"""
lines = ["# Spark配置文件 - 由SparkOptimizer自动生成"]
lines.append(f"# 生成时间: {os.popen('date').read().strip()}")
lines.append("")
# 按类别组织配置
categories = {
"基础配置": ["spark.executor.memory", "spark.executor.cores", "spark.executor.instances",
"spark.driver.memory", "spark.driver.cores", "spark.driver.maxResultSize"],
"SQL优化": [k for k in config.keys() if "sql" in k],
"内存优化": [k for k in config.keys() if "memory" in k and "sql" not in k],
"网络优化": [k for k in config.keys() if "network" in k or "timeout" in k],
"序列化优化": [k for k in config.keys() if "serializer" in k or "kryo" in k]
}
# "其他优化"需在categories构造完成后再计算,否则在字典字面量内部引用categories会触发NameError
categories["其他优化"] = [k for k in config.keys() if k not in sum(categories.values(), [])]
for category, keys in categories.items():
if any(k in config for k in keys):
lines.append(f"# {category}")
for key in keys:
if key in config:
lines.append(f"{key} {config[key]}")
lines.append("")
return "\n".join(lines)
def analyze_query_performance(self, query_stats: Dict) -> Dict:
"""分析查询性能"""
analysis = {
"performance_score": 0,
"bottlenecks": [],
"recommendations": []
}
# 分析执行时间
if query_stats.get("avg_execution_time_ms", 0) > 30000: # 30秒
analysis["bottlenecks"].append("查询执行时间过长")
analysis["recommendations"].append("考虑增加executor数量或优化SQL")
# 分析内存使用
if query_stats.get("max_memory_usage_mb", 0) > 4096: # 4GB
analysis["bottlenecks"].append("内存使用过高")
analysis["recommendations"].append("增加executor内存或优化数据分区")
# 分析数据倾斜
if query_stats.get("task_time_variance", 0) > 0.5:
analysis["bottlenecks"].append("存在数据倾斜")
analysis["recommendations"].append("启用skewJoin优化或重新分区数据")
# 分析Shuffle
if query_stats.get("shuffle_read_mb", 0) > 1024: # 1GB
analysis["bottlenecks"].append("Shuffle数据量过大")
analysis["recommendations"].append("优化JOIN策略或增加分区数")
# 计算性能分数
score = 100
score -= len(analysis["bottlenecks"]) * 20
analysis["performance_score"] = max(0, score)
return analysis
def print_optimization_report(self, workload_type: str = "olap"):
"""打印优化报告"""
print(f"\n=== Spark SQL优化报告 ===")
print(f"工作负载类型: {workload_type.upper()}")
# 分析资源
resources = self.analyze_cluster_resources()
print(f"\n📊 集群资源:")
print(f" 总内存: {resources['total_memory_gb']}GB")
print(f" 总核心数: {resources['total_cores']}")
print(f" 节点数: {resources['num_nodes']}")
print(f" 每节点内存: {resources['memory_per_node_gb']}GB")
print(f" 每节点核心数: {resources['cores_per_node']}")
# 计算最优配置
config = self.calculate_optimal_config(workload_type)
print(f"\n⚙️ 推荐配置:")
print(f" Executor内存: {config.executor_memory}")
print(f" Executor核心数: {config.executor_cores}")
print(f" Executor数量: {config.num_executors}")
print(f" Driver内存: {config.driver_memory}")
print(f" Driver核心数: {config.driver_cores}")
# 生成配置
spark_conf = self.generate_spark_conf(config)
optimized_conf = self.optimize_for_kylin(spark_conf)
print(f"\n🔧 关键优化配置:")
key_configs = [
"spark.sql.adaptive.enabled",
"spark.sql.adaptive.coalescePartitions.enabled",
"spark.sql.adaptive.skewJoin.enabled",
"spark.serializer",
"spark.sql.shuffle.partitions"
]
for key in key_configs:
if key in optimized_conf:
print(f" {key}: {optimized_conf[key]}")
print(f"\n📝 配置文件生成:")
conf_content = self.generate_spark_defaults_conf(optimized_conf)
print(f" 配置项数量: {len(optimized_conf)}")
print(f" 配置文件大小: {len(conf_content)}字符")
# 性能预期
print(f"\n📈 性能预期:")
print(f" 内存利用率提升: 20-30%")
print(f" 查询响应时间改善: 15-25%")
print(f" 并发处理能力提升: 30-40%")
print(f" 资源利用效率提升: 25-35%")
def main():
optimizer = SparkOptimizer()
# 生成OLAP工作负载优化报告
optimizer.print_optimization_report("olap")
# 生成配置文件
config = optimizer.calculate_optimal_config("olap")
spark_conf = optimizer.generate_spark_conf(config)
optimized_conf = optimizer.optimize_for_kylin(spark_conf)
# 保存配置文件
conf_content = optimizer.generate_spark_defaults_conf(optimized_conf)
with open("spark-defaults.conf", "w") as f:
f.write(conf_content)
print(f"\n✅ 配置文件已保存到: spark-defaults.conf")
# 示例查询性能分析
sample_stats = {
"avg_execution_time_ms": 45000,
"max_memory_usage_mb": 6144,
"task_time_variance": 0.7,
"shuffle_read_mb": 2048
}
performance_analysis = optimizer.analyze_query_performance(sample_stats)
print(f"\n🔍 查询性能分析示例:")
print(f" 性能分数: {performance_analysis['performance_score']}/100")
if performance_analysis["bottlenecks"]:
print(f" 性能瓶颈:")
for i, bottleneck in enumerate(performance_analysis["bottlenecks"], 1):
print(f" {i}. {bottleneck}")
if performance_analysis["recommendations"]:
print(f" 优化建议:")
for i, rec in enumerate(performance_analysis["recommendations"], 1):
print(f" {i}. {rec}")
if __name__ == "__main__":
main()
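生成的配置除了写入spark-defaults.conf,也可以交给Kylin内置的Spark引擎使用:Kylin支持在kylin.properties中以kylin.engine.spark-conf.为前缀覆盖Spark参数(此处以Kylin 4.x为例,前缀与生效范围请以所用版本的官方文档为准)。下面的小脚本沿用上文的SparkOptimizer,将优化后的配置导出为kylin.properties片段,仅作示意。
#!/usr/bin/env python3
# export_kylin_spark_conf.py - 将优化后的Spark配置导出为kylin.properties片段(示意)
from spark_optimizer import SparkOptimizer  # 即上文的spark_optimizer.py

optimizer = SparkOptimizer()
config = optimizer.calculate_optimal_config("olap")
optimized_conf = optimizer.optimize_for_kylin(optimizer.generate_spark_conf(config))

with open("kylin-spark-conf.properties", "w") as f:
    for key in sorted(optimized_conf):
        # 带kylin.engine.spark-conf.前缀的参数会被Kylin透传给其Spark引擎
        f.write(f"kylin.engine.spark-conf.{key}={optimized_conf[key]}\n")
print(f"共导出{len(optimized_conf)}项配置,可按需合并到kylin.properties")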
7.4 最佳实践与总结
7.4.1 查询优化最佳实践
1. SQL编写规范
-- ✅ 好的实践
-- 1. 明确指定需要的列,避免SELECT *
SELECT
customer_id,
SUM(sales_amount) as total_sales,
COUNT(*) as order_count
FROM sales_fact
WHERE sale_date >= '2023-01-01'
AND sale_date < '2024-01-01'
GROUP BY customer_id
HAVING SUM(sales_amount) > 1000
ORDER BY total_sales DESC
LIMIT 100;
-- ❌ 避免的写法
SELECT *
FROM sales_fact
WHERE YEAR(sale_date) = 2023;
2. 维度表优化
-- ✅ 使用维度键进行JOIN
SELECT
d.year,
d.quarter,
p.category,
SUM(f.sales_amount) as total_sales
FROM sales_fact f
JOIN date_dim d ON f.date_key = d.date_key
JOIN product_dim p ON f.product_key = p.product_key
WHERE d.year = 2023
GROUP BY d.year, d.quarter, p.category;
-- ❌ 避免在事实表上使用函数
SELECT
YEAR(f.sale_date) as year,
QUARTER(f.sale_date) as quarter,
p.category,
SUM(f.sales_amount) as total_sales
FROM sales_fact f
JOIN product_dim p ON f.product_id = p.product_id
WHERE YEAR(f.sale_date) = 2023
GROUP BY YEAR(f.sale_date), QUARTER(f.sale_date), p.category;
3. 分区策略
-- ✅ 利用分区剪枝
SELECT
region,
SUM(sales_amount) as total_sales
FROM sales_fact
WHERE year = 2023 -- 分区字段
AND month IN (10, 11, 12) -- 分区字段
AND product_category = 'Electronics'
GROUP BY region;
-- ✅ 多级分区查询
SELECT
product_category,
AVG(sales_amount) as avg_sales
FROM sales_fact
WHERE year = 2023
AND month = 12
AND day BETWEEN 1 AND 15
GROUP BY product_category;
7.4.2 性能监控指标
关键性能指标(KPI)
#!/usr/bin/env python3
# performance_kpi.py - 性能KPI监控
from dataclasses import dataclass
from typing import Dict, List
from datetime import datetime, timedelta
import json
@dataclass
class PerformanceKPI:
"""性能KPI"""
timestamp: datetime
query_response_time_p95: float # 95分位响应时间(ms)
query_response_time_avg: float # 平均响应时间(ms)
query_success_rate: float # 查询成功率(%)
concurrent_queries: int # 并发查询数
cache_hit_rate: float # 缓存命中率(%)
cuboid_hit_rate: float # Cuboid命中率(%)
system_cpu_usage: float # 系统CPU使用率(%)
system_memory_usage: float # 系统内存使用率(%)
hbase_read_qps: float # HBase读QPS
spark_job_duration_avg: float # Spark作业平均时长(s)
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.kpi_history = []
self.alert_thresholds = {
"query_response_time_p95": 30000, # 30秒
"query_success_rate": 95.0, # 95%
"cache_hit_rate": 70.0, # 70%
"system_cpu_usage": 80.0, # 80%
"system_memory_usage": 85.0 # 85%
}
def collect_kpi(self) -> PerformanceKPI:
"""收集性能KPI(模拟实现)"""
# 实际实现中应该从监控系统获取真实数据
import random
return PerformanceKPI(
timestamp=datetime.now(),
query_response_time_p95=random.uniform(5000, 25000),
query_response_time_avg=random.uniform(2000, 15000),
query_success_rate=random.uniform(92, 99.5),
concurrent_queries=random.randint(10, 50),
cache_hit_rate=random.uniform(60, 90),
cuboid_hit_rate=random.uniform(70, 95),
system_cpu_usage=random.uniform(30, 85),
system_memory_usage=random.uniform(40, 80),
hbase_read_qps=random.uniform(100, 1000),
spark_job_duration_avg=random.uniform(30, 300)
)
def check_alerts(self, kpi: PerformanceKPI) -> List[Dict]:
"""检查告警"""
alerts = []
for metric, threshold in self.alert_thresholds.items():
value = getattr(kpi, metric)
if metric in ["query_response_time_p95", "system_cpu_usage", "system_memory_usage"]:
# 这些指标超过阈值时告警
if value > threshold:
alerts.append({
"metric": metric,
"value": value,
"threshold": threshold,
"severity": "HIGH" if value > threshold * 1.2 else "MEDIUM",
"message": f"{metric}超过阈值: {value:.2f} > {threshold}"
})
else:
# 这些指标低于阈值时告警
if value < threshold:
alerts.append({
"metric": metric,
"value": value,
"threshold": threshold,
"severity": "HIGH" if value < threshold * 0.8 else "MEDIUM",
"message": f"{metric}低于阈值: {value:.2f} < {threshold}"
})
return alerts
def analyze_trend(self, hours: int = 24) -> Dict:
"""分析性能趋势"""
if len(self.kpi_history) < 2:
return {"trend": "INSUFFICIENT_DATA"}
# 获取指定时间范围内的数据
cutoff_time = datetime.now() - timedelta(hours=hours)
recent_kpis = [kpi for kpi in self.kpi_history if kpi.timestamp > cutoff_time]
if len(recent_kpis) < 2:
return {"trend": "INSUFFICIENT_DATA"}
# 计算趋势
first_half = recent_kpis[:len(recent_kpis)//2]
second_half = recent_kpis[len(recent_kpis)//2:]
def avg_metric(kpis, metric):
return sum(getattr(kpi, metric) for kpi in kpis) / len(kpis)
trends = {}
metrics = ["query_response_time_avg", "query_success_rate", "cache_hit_rate",
"system_cpu_usage", "system_memory_usage"]
for metric in metrics:
first_avg = avg_metric(first_half, metric)
second_avg = avg_metric(second_half, metric)
change_pct = ((second_avg - first_avg) / first_avg) * 100
if abs(change_pct) < 5:
trend = "STABLE"
elif change_pct > 0:
trend = "INCREASING" if metric not in ["query_response_time_avg", "system_cpu_usage", "system_memory_usage"] else "DEGRADING"
else:
trend = "DECREASING" if metric not in ["query_response_time_avg", "system_cpu_usage", "system_memory_usage"] else "IMPROVING"
trends[metric] = {
"trend": trend,
"change_percent": change_pct,
"first_period_avg": first_avg,
"second_period_avg": second_avg
}
return trends
def generate_performance_report(self) -> Dict:
"""生成性能报告"""
if not self.kpi_history:
return {"error": "没有性能数据"}
latest_kpi = self.kpi_history[-1]
alerts = self.check_alerts(latest_kpi)
trends = self.analyze_trend()
# 计算性能评分
score = 100
# 响应时间评分
if latest_kpi.query_response_time_p95 > 30000:
score -= 30
elif latest_kpi.query_response_time_p95 > 15000:
score -= 15
# 成功率评分
if latest_kpi.query_success_rate < 95:
score -= 25
elif latest_kpi.query_success_rate < 98:
score -= 10
# 缓存命中率评分
if latest_kpi.cache_hit_rate < 70:
score -= 20
elif latest_kpi.cache_hit_rate < 80:
score -= 10
# 系统资源评分
if latest_kpi.system_cpu_usage > 80 or latest_kpi.system_memory_usage > 85:
score -= 15
performance_level = "EXCELLENT" if score >= 90 else "GOOD" if score >= 70 else "FAIR" if score >= 50 else "POOR"
return {
"timestamp": latest_kpi.timestamp.isoformat(),
"performance_score": max(0, score),
"performance_level": performance_level,
"current_kpi": {
"query_response_time_p95": f"{latest_kpi.query_response_time_p95:.0f}ms",
"query_response_time_avg": f"{latest_kpi.query_response_time_avg:.0f}ms",
"query_success_rate": f"{latest_kpi.query_success_rate:.1f}%",
"cache_hit_rate": f"{latest_kpi.cache_hit_rate:.1f}%",
"cuboid_hit_rate": f"{latest_kpi.cuboid_hit_rate:.1f}%",
"concurrent_queries": latest_kpi.concurrent_queries,
"system_cpu_usage": f"{latest_kpi.system_cpu_usage:.1f}%",
"system_memory_usage": f"{latest_kpi.system_memory_usage:.1f}%"
},
"alerts": alerts,
"trends": trends
}
def print_performance_dashboard(self):
"""打印性能仪表板"""
report = self.generate_performance_report()
if "error" in report:
print(f"❌ {report['error']}")
return
print(f"\n=== 性能监控仪表板 ===")
print(f"报告时间: {report['timestamp']}")
print(f"性能评分: {report['performance_score']}/100 ({report['performance_level']})")
print(f"\n📊 当前性能指标:")
kpi = report['current_kpi']
print(f" 查询响应时间(P95): {kpi['query_response_time_p95']}")
print(f" 查询响应时间(平均): {kpi['query_response_time_avg']}")
print(f" 查询成功率: {kpi['query_success_rate']}")
print(f" 缓存命中率: {kpi['cache_hit_rate']}")
print(f" Cuboid命中率: {kpi['cuboid_hit_rate']}")
print(f" 并发查询数: {kpi['concurrent_queries']}")
print(f" CPU使用率: {kpi['system_cpu_usage']}")
print(f" 内存使用率: {kpi['system_memory_usage']}")
# 告警信息
if report['alerts']:
print(f"\n🚨 性能告警:")
for i, alert in enumerate(report['alerts'], 1):
severity_icon = "🔴" if alert['severity'] == 'HIGH' else "🟡"
print(f" {i}. {severity_icon} {alert['message']}")
else:
print(f"\n✅ 无性能告警")
# 趋势分析
if report['trends'] and report['trends'] != {"trend": "INSUFFICIENT_DATA"}:
print(f"\n📈 性能趋势(24小时):")
for metric, trend_data in report['trends'].items():
trend_icon = "📈" if "IMPROVING" in trend_data['trend'] or "INCREASING" in trend_data['trend'] else "📉" if "DEGRADING" in trend_data['trend'] or "DECREASING" in trend_data['trend'] else "➡️"
print(f" {trend_icon} {metric}: {trend_data['trend']} ({trend_data['change_percent']:+.1f}%)")
def main():
monitor = PerformanceMonitor()
# 模拟收集一些历史数据
print("正在收集性能数据...")
for i in range(10):
kpi = monitor.collect_kpi()
monitor.kpi_history.append(kpi)
# 显示性能仪表板
monitor.print_performance_dashboard()
if __name__ == "__main__":
main()
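上面的collect_kpi()只是随机模拟数据。实际落地时,P95响应时间通常由原始查询延迟样本计算得到。下面给出一个最小的计算示意(最近秩法),其中latencies_ms为虚构样本,实际应替换为从查询日志或监控系统采集的延迟序列。
#!/usr/bin/env python3
# percentile_demo.py - 由延迟样本计算P95的最小示意
import math

def percentile(values, p):
    """最近秩法分位数,p取0~100"""
    ordered = sorted(values)
    k = max(0, math.ceil(p / 100 * len(ordered)) - 1)
    return ordered[k]

latencies_ms = [850, 1200, 2300, 900, 15000, 3100, 780, 4200, 600, 9800]  # 虚构样本
print("P95:", percentile(latencies_ms, 95), "ms")
print("平均:", round(sum(latencies_ms) / len(latencies_ms), 1), "ms")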
7.4.3 故障排除指南
常见问题诊断
| 问题症状 | 可能原因 | 解决方案 |
| --- | --- | --- |
| 查询响应慢 | Cube未命中、数据倾斜、资源不足 | 检查Cube设计、优化分区、增加资源 |
| 内存溢出 | Executor内存不足、数据量过大 | 增加内存、优化查询、分批处理 |
| 查询失败 | SQL语法错误、权限问题、服务异常 | 检查SQL、验证权限、重启服务 |
| 缓存命中率低 | TTL设置不当、查询模式变化 | 调整TTL、分析查询模式 |
| 系统负载高 | 并发过高、资源竞争 | 限制并发、优化调度 |
本章小结
本章深入介绍了Apache Kylin的查询优化与性能调优,主要内容包括:
核心知识点
查询优化基础
- 查询执行原理和流程
- 查询分析器的设计与实现
- SQL复杂度分析方法
Cube匹配优化
- Cube匹配策略和算法
- Cuboid选择和推荐
- 查询模式分析
查询性能优化
- SQL优化技巧和重写规则
- 缓存优化策略
- 索引建议系统
系统级优化
- HBase参数调优
- Spark SQL配置优化
- 资源管理和分配
性能监控
- 关键性能指标(KPI)
- 性能趋势分析
- 告警和故障排除
实用工具
- 查询分析器: 分析SQL复杂度和性能
- Cube匹配器: 智能选择最优Cuboid
- SQL优化器: 自动优化SQL查询
- 缓存优化器: 管理和优化查询缓存
- Spark优化器: 生成最优Spark配置
- 性能监控器: 实时监控系统性能
最佳实践
- 查询设计: 遵循SQL编写规范,合理使用维度和度量
- 缓存策略: 根据查询模式优化缓存配置
- 资源配置: 基于工作负载特点调整系统参数
- 监控告警: 建立完善的性能监控体系
- 故障排除: 掌握常见问题的诊断和解决方法
通过本章的学习,你应该能够:
- 理解Kylin查询优化的原理和方法
- 掌握各种性能调优技术
- 使用工具进行查询分析和优化
- 建立有效的性能监控体系
- 快速诊断和解决性能问题
下一章我们将学习“运维管理与监控”,了解如何在生产环境中管理和维护Kylin集群。
练习题
理论题
- 解释Kylin查询执行的完整流程
- 分析影响查询性能的主要因素
- 比较不同缓存策略的优缺点
实践题
- 使用查询分析器分析复杂SQL的性能
- 配置Spark SQL参数优化查询性能
- 设计性能监控指标和告警规则
思考题
- 如何在查询性能和资源消耗之间找到平衡?
- 在什么情况下应该考虑重新设计Cube?
- 如何评估查询优化的效果?
7.1.2 Cube匹配优化
Cube匹配策略
Kylin通过智能的Cube匹配算法来选择最优的Cuboid进行查询。理解这个过程有助于优化Cube设计。
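在Kylin内部,每个Cuboid都可以用一个维度位图(bitmask)标识:当查询所需维度的位图是某个已预计算Cuboid位图的子集时,该Cuboid即可回答这条查询。下面用几行代码演示这一核心判断(维度顺序与编号为假设值);随后实现的CubeMatcher则在此基础上进一步考虑度量覆盖、存储大小等因素进行打分。
# cuboid_bitmask_demo.py - 用位图表示Cuboid并做子集匹配(示意)
DIMS = ["year", "month", "day", "product_category", "region"]  # 假设的维度顺序

def to_mask(dims):
    """把维度集合编码为位图"""
    return sum(1 << DIMS.index(d) for d in dims)

cuboid_mask = to_mask({"year", "month", "product_category"})  # 某个已预计算的Cuboid
query_mask = to_mask({"year", "product_category"})            # 查询所需的维度

# 子集判断:查询位图的所有位都落在Cuboid位图内,即可命中
print("可命中该Cuboid:", query_mask & cuboid_mask == query_mask)  # True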
#!/usr/bin/env python3
# cube_matcher.py - Cube匹配优化器
import json
import logging
from typing import Dict, List, Set, Optional
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class CuboidInfo:
"""Cuboid信息"""
cuboid_id: str
dimensions: Set[str]
measures: Set[str]
row_count: int
size_mb: float
build_time: str
@dataclass
class QueryPattern:
"""查询模式"""
dimensions: Set[str]
measures: Set[str]
filters: Dict[str, any]
group_by: Set[str]
order_by: List[str]
class CubeMatcher:
"""Cube匹配优化器"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.cuboids = []
self.query_history = []
def add_cuboid(self, cuboid: CuboidInfo):
"""添加Cuboid信息"""
self.cuboids.append(cuboid)
self.logger.info(f"添加Cuboid: {cuboid.cuboid_id}")
def find_matching_cuboids(self, query_pattern: QueryPattern) -> List[Dict]:
"""查找匹配的Cuboids"""
matching_cuboids = []
for cuboid in self.cuboids:
match_score = self._calculate_match_score(cuboid, query_pattern)
if match_score > 0:
matching_cuboids.append({
"cuboid": cuboid,
"match_score": match_score,
"coverage": self._calculate_coverage(cuboid, query_pattern)
})
# 按匹配分数排序
matching_cuboids.sort(key=lambda x: x["match_score"], reverse=True)
return matching_cuboids
def _calculate_match_score(self, cuboid: CuboidInfo, query_pattern: QueryPattern) -> float:
"""计算匹配分数"""
score = 0.0
# 维度匹配度 (权重: 40%)
required_dims = query_pattern.dimensions | query_pattern.group_by
if required_dims.issubset(cuboid.dimensions):
dim_coverage = len(required_dims) / len(cuboid.dimensions) if cuboid.dimensions else 0
score += 40 * dim_coverage
else:
# 如果不能完全覆盖,返回0分
return 0.0
# 度量匹配度 (权重: 30%)
if query_pattern.measures.issubset(cuboid.measures):
measure_coverage = len(query_pattern.measures) / len(cuboid.measures) if cuboid.measures else 0
score += 30 * measure_coverage
else:
return 0.0
# 存储效率 (权重: 20%)
# 优先选择较小的Cuboid
if cuboid.size_mb > 0:
size_score = max(0, 1 - (cuboid.size_mb / 10000)) # 假设10GB为基准
score += 20 * size_score
# 行数效率 (权重: 10%)
# 优先选择行数较少的Cuboid
if cuboid.row_count > 0:
row_score = max(0, 1 - (cuboid.row_count / 10000000)) # 假设1000万行为基准
score += 10 * row_score
return score
def _calculate_coverage(self, cuboid: CuboidInfo, query_pattern: QueryPattern) -> Dict:
"""计算覆盖度"""
required_dims = query_pattern.dimensions | query_pattern.group_by
return {
"dimension_coverage": len(required_dims & cuboid.dimensions) / len(required_dims) if required_dims else 1.0,
"measure_coverage": len(query_pattern.measures & cuboid.measures) / len(query_pattern.measures) if query_pattern.measures else 1.0,
"exact_match": required_dims == cuboid.dimensions and query_pattern.measures == cuboid.measures
}
def recommend_optimal_cuboid(self, query_pattern: QueryPattern) -> Optional[Dict]:
"""推荐最优Cuboid"""
matching_cuboids = self.find_matching_cuboids(query_pattern)
if not matching_cuboids:
return None
# 选择最高分的Cuboid
best_match = matching_cuboids[0]
recommendation = {
"recommended_cuboid": best_match["cuboid"],
"match_score": best_match["match_score"],
"coverage": best_match["coverage"],
"alternatives": matching_cuboids[1:5], # 显示前5个备选
"optimization_suggestions": self._generate_cuboid_suggestions(query_pattern, matching_cuboids)
}
return recommendation
def _generate_cuboid_suggestions(self, query_pattern: QueryPattern, matching_cuboids: List[Dict]) -> List[str]:
"""生成Cuboid优化建议"""
suggestions = []
if not matching_cuboids:
suggestions.append("没有找到匹配的Cuboid,建议创建新的Cube")
required_dims = query_pattern.dimensions | query_pattern.group_by
suggestions.append(f"建议Cube包含维度: {', '.join(required_dims)}")
suggestions.append(f"建议Cube包含度量: {', '.join(query_pattern.measures)}")
return suggestions
best_match = matching_cuboids[0]
# 匹配度建议
if best_match["match_score"] < 80:
suggestions.append(f"最佳匹配分数较低({best_match['match_score']:.1f}),考虑优化Cube设计")
# 覆盖度建议
coverage = best_match["coverage"]
if coverage["dimension_coverage"] < 1.0:
suggestions.append("维度覆盖不完整,可能需要额外计算")
if coverage["measure_coverage"] < 1.0:
suggestions.append("度量覆盖不完整,可能需要额外计算")
# 性能建议
cuboid = best_match["cuboid"]
if cuboid.size_mb > 5000: # 5GB
suggestions.append("选中的Cuboid较大,查询可能较慢")
if cuboid.row_count > 5000000: # 500万行
suggestions.append("选中的Cuboid行数较多,考虑增加过滤条件")
return suggestions
def analyze_query_patterns(self, queries: List[QueryPattern]) -> Dict:
"""分析查询模式"""
analysis = {
"total_queries": len(queries),
"unique_dimensions": set(),
"unique_measures": set(),
"dimension_frequency": defaultdict(int),
"measure_frequency": defaultdict(int),
"common_patterns": [],
"optimization_opportunities": []
}
# 统计维度和度量使用频率
for query in queries:
all_dims = query.dimensions | query.group_by
analysis["unique_dimensions"].update(all_dims)
analysis["unique_measures"].update(query.measures)
for dim in all_dims:
analysis["dimension_frequency"][dim] += 1
for measure in query.measures:
analysis["measure_frequency"][measure] += 1
# 转换为列表以便JSON序列化
analysis["unique_dimensions"] = list(analysis["unique_dimensions"])
analysis["unique_measures"] = list(analysis["unique_measures"])
analysis["dimension_frequency"] = dict(analysis["dimension_frequency"])
analysis["measure_frequency"] = dict(analysis["measure_frequency"])
# 识别常见模式
analysis["common_patterns"] = self._identify_common_patterns(queries)
# 生成优化建议
analysis["optimization_opportunities"] = self._identify_optimization_opportunities(analysis)
return analysis
def _identify_common_patterns(self, queries: List[QueryPattern]) -> List[Dict]:
"""识别常见查询模式"""
pattern_counts = defaultdict(int)
for query in queries:
# 创建模式签名
dims_signature = tuple(sorted(query.dimensions | query.group_by))
measures_signature = tuple(sorted(query.measures))
pattern_signature = (dims_signature, measures_signature)
pattern_counts[pattern_signature] += 1
# 按频率排序
common_patterns = []
for (dims, measures), count in sorted(pattern_counts.items(), key=lambda x: x[1], reverse=True):
if count > 1: # 只显示出现多次的模式
common_patterns.append({
"dimensions": list(dims),
"measures": list(measures),
"frequency": count,
"percentage": count / len(queries) * 100
})
return common_patterns[:10] # 返回前10个常见模式
def _identify_optimization_opportunities(self, analysis: Dict) -> List[str]:
"""识别优化机会"""
opportunities = []
# 高频维度组合
high_freq_dims = [
dim for dim, freq in analysis["dimension_frequency"].items()
if freq > analysis["total_queries"] * 0.5
]
if len(high_freq_dims) > 1:
opportunities.append(f"高频维度组合建议创建专用Cuboid: {', '.join(high_freq_dims)}")
# 高频度量组合
high_freq_measures = [
measure for measure, freq in analysis["measure_frequency"].items()
if freq > analysis["total_queries"] * 0.5
]
if len(high_freq_measures) > 1:
opportunities.append(f"高频度量组合建议优化: {', '.join(high_freq_measures)}")
# 常见模式优化
if analysis["common_patterns"]:
top_pattern = analysis["common_patterns"][0]
if top_pattern["percentage"] > 30:
opportunities.append(f"最常见查询模式占比{top_pattern['percentage']:.1f}%,建议创建专门的Cuboid")
return opportunities
def generate_cuboid_recommendations(self, analysis: Dict) -> List[Dict]:
"""生成Cuboid创建建议"""
recommendations = []
# 基于常见模式的建议
for pattern in analysis["common_patterns"][:3]: # 前3个最常见的模式
if pattern["frequency"] >= 2:
recommendations.append({
"type": "PATTERN_BASED",
"priority": "HIGH" if pattern["percentage"] > 20 else "MEDIUM",
"dimensions": pattern["dimensions"],
"measures": pattern["measures"],
"reason": f"该模式出现{pattern['frequency']}次,占比{pattern['percentage']:.1f}%",
"estimated_benefit": f"可优化{pattern['percentage']:.1f}%的查询"
})
# 基于高频维度的建议
high_freq_dims = [
dim for dim, freq in analysis["dimension_frequency"].items()
if freq > analysis["total_queries"] * 0.3
]
if len(high_freq_dims) >= 2:
recommendations.append({
"type": "HIGH_FREQUENCY_DIMS",
"priority": "MEDIUM",
"dimensions": high_freq_dims,
"measures": list(analysis["unique_measures"]),
"reason": "基于高频使用的维度组合",
"estimated_benefit": "提升常用维度组合的查询性能"
})
return recommendations
def print_matching_analysis(self, query_pattern: QueryPattern):
"""打印匹配分析结果"""
recommendation = self.recommend_optimal_cuboid(query_pattern)
print(f"\n=== Cube匹配分析 ===")
print(f"查询维度: {', '.join(query_pattern.dimensions)}")
print(f"分组维度: {', '.join(query_pattern.group_by)}")
print(f"查询度量: {', '.join(query_pattern.measures)}")
if recommendation:
cuboid = recommendation["recommended_cuboid"]
print(f"\n🎯 推荐Cuboid:")
print(f" Cuboid ID: {cuboid.cuboid_id}")
print(f" 匹配分数: {recommendation['match_score']:.1f}")
print(f" 维度覆盖: {recommendation['coverage']['dimension_coverage']:.2%}")
print(f" 度量覆盖: {recommendation['coverage']['measure_coverage']:.2%}")
print(f" 存储大小: {cuboid.size_mb:.1f}MB")
print(f" 行数: {cuboid.row_count:,}")
if recommendation["alternatives"]:
print(f"\n📋 备选Cuboids:")
for i, alt in enumerate(recommendation["alternatives"], 1):
alt_cuboid = alt["cuboid"]
print(f" {i}. {alt_cuboid.cuboid_id} (分数: {alt['match_score']:.1f})")
if recommendation["optimization_suggestions"]:
print(f"\n💡 优化建议:")
for i, suggestion in enumerate(recommendation["optimization_suggestions"], 1):
print(f" {i}. {suggestion}")
else:
print(f"\n❌ 未找到匹配的Cuboid")
print(f"建议创建包含以下维度和度量的新Cube:")
all_dims = query_pattern.dimensions | query_pattern.group_by
print(f" 维度: {', '.join(all_dims)}")
print(f" 度量: {', '.join(query_pattern.measures)}")
def print_pattern_analysis(self, queries: List[QueryPattern]):
"""打印模式分析结果"""
analysis = self.analyze_query_patterns(queries)
print(f"\n=== 查询模式分析 ===")
print(f"查询总数: {analysis['total_queries']}")
print(f"唯一维度数: {len(analysis['unique_dimensions'])}")
print(f"唯一度量数: {len(analysis['unique_measures'])}")
print(f"\n📊 维度使用频率:")
sorted_dims = sorted(analysis["dimension_frequency"].items(), key=lambda x: x[1], reverse=True)
for dim, freq in sorted_dims[:10]:
percentage = freq / analysis['total_queries'] * 100
print(f" {dim}: {freq}次 ({percentage:.1f}%)")
print(f"\n📈 度量使用频率:")
sorted_measures = sorted(analysis["measure_frequency"].items(), key=lambda x: x[1], reverse=True)
for measure, freq in sorted_measures[:10]:
percentage = freq / analysis['total_queries'] * 100
print(f" {measure}: {freq}次 ({percentage:.1f}%)")
if analysis["common_patterns"]:
print(f"\n🔄 常见查询模式:")
for i, pattern in enumerate(analysis["common_patterns"], 1):
print(f" {i}. 频率: {pattern['frequency']}次 ({pattern['percentage']:.1f}%)")
print(f" 维度: {', '.join(pattern['dimensions'])}")
print(f" 度量: {', '.join(pattern['measures'])}")
if analysis["optimization_opportunities"]:
print(f"\n🚀 优化机会:")
for i, opportunity in enumerate(analysis["optimization_opportunities"], 1):
print(f" {i}. {opportunity}")
# 生成Cuboid建议
recommendations = self.generate_cuboid_recommendations(analysis)
if recommendations:
print(f"\n💎 Cuboid创建建议:")
for i, rec in enumerate(recommendations, 1):
print(f" {i}. 优先级: {rec['priority']}")
print(f" 类型: {rec['type']}")
print(f" 维度: {', '.join(rec['dimensions'])}")
print(f" 度量: {', '.join(rec['measures'])}")
print(f" 原因: {rec['reason']}")
print(f" 预期收益: {rec['estimated_benefit']}")
def main():
matcher = CubeMatcher()
# 添加示例Cuboids
cuboids = [
CuboidInfo(
cuboid_id="sales_yearly",
dimensions={"year", "product_category", "region"},
measures={"sales_amount", "order_count"},
row_count=1000,
size_mb=50.0,
build_time="2023-01-01"
),
CuboidInfo(
cuboid_id="sales_monthly",
dimensions={"year", "month", "product_category"},
measures={"sales_amount", "order_count", "avg_order_value"},
row_count=12000,
size_mb=200.0,
build_time="2023-01-01"
),
CuboidInfo(
cuboid_id="sales_daily",
dimensions={"year", "month", "day", "product_category", "region"},
measures={"sales_amount", "order_count"},
row_count=365000,
size_mb=1500.0,
build_time="2023-01-01"
)
]
for cuboid in cuboids:
matcher.add_cuboid(cuboid)
# 示例查询模式
query_pattern = QueryPattern(
dimensions={"product_category"},
measures={"sales_amount", "order_count"},
filters={"year": 2023},
group_by={"year", "product_category"},
order_by=["sales_amount"]
)
matcher.print_matching_analysis(query_pattern)
# 示例查询模式分析
sample_queries = [
QueryPattern(
dimensions={"year", "product_category"},
measures={"sales_amount"},
filters={},
group_by={"year", "product_category"},
order_by=[]
),
QueryPattern(
dimensions={"year", "month"},
measures={"sales_amount", "order_count"},
filters={},
group_by={"year", "month"},
order_by=[]
),
QueryPattern(
dimensions={"product_category"},
measures={"sales_amount"},
filters={},
group_by={"product_category"},
order_by=[]
)
]
matcher.print_pattern_analysis(sample_queries)
if __name__ == "__main__":
main()
7.2 查询性能优化
7.2.1 SQL优化技巧
查询重写优化
#!/usr/bin/env python3
# sql_optimizer.py - SQL查询优化器
import re
import sqlparse
from sqlparse import sql, tokens
from typing import List, Dict, Tuple
import logging
class SQLOptimizer:
"""SQL查询优化器"""
def __init__(self):
self.logger = logging.getLogger(__name__)
def optimize_query(self, sql_text: str) -> Dict:
"""优化SQL查询"""
optimization_result = {
"original_sql": sql_text,
"optimized_sql": sql_text,
"optimizations_applied": [],
"performance_impact": "UNKNOWN",
"recommendations": []
}
try:
# 解析SQL
parsed = sqlparse.parse(sql_text)[0]
# 应用各种优化
optimized_sql = sql_text
# 1. 优化SELECT子句
optimized_sql, select_opts = self._optimize_select_clause(optimized_sql)
optimization_result["optimizations_applied"].extend(select_opts)
# 2. 优化WHERE子句
optimized_sql, where_opts = self._optimize_where_clause(optimized_sql)
optimization_result["optimizations_applied"].extend(where_opts)
# 3. 优化JOIN操作
optimized_sql, join_opts = self._optimize_joins(optimized_sql)
optimization_result["optimizations_applied"].extend(join_opts)
# 4. 优化GROUP BY和ORDER BY
optimized_sql, group_opts = self._optimize_group_order(optimized_sql)
optimization_result["optimizations_applied"].extend(group_opts)
# 5. 优化子查询
optimized_sql, subquery_opts = self._optimize_subqueries(optimized_sql)
optimization_result["optimizations_applied"].extend(subquery_opts)
optimization_result["optimized_sql"] = optimized_sql
# 评估性能影响
optimization_result["performance_impact"] = self._estimate_performance_impact(
optimization_result["optimizations_applied"]
)
# 生成额外建议
optimization_result["recommendations"] = self._generate_recommendations(sql_text)
except Exception as e:
self.logger.error(f"SQL优化失败: {e}")
optimization_result["error"] = str(e)
return optimization_result
def _optimize_select_clause(self, sql: str) -> Tuple[str, List[str]]:
"""优化SELECT子句"""
optimizations = []
optimized_sql = sql
# 检查是否使用SELECT *
if re.search(r'\bselect\s+\*\s+from', sql, re.IGNORECASE):
optimizations.append("建议避免使用SELECT *,明确指定需要的列")
# 检查重复的列
select_match = re.search(r'select\s+(.*?)\s+from', sql, re.IGNORECASE | re.DOTALL)
if select_match:
select_clause = select_match.group(1)
columns = [col.strip() for col in select_clause.split(',')]
unique_columns = list(set(columns))
if len(columns) != len(unique_columns):
optimized_sql = sql.replace(select_clause, ', '.join(unique_columns))
optimizations.append("移除重复的SELECT列")
return optimized_sql, optimizations
def _optimize_where_clause(self, sql: str) -> Tuple[str, List[str]]:
"""优化WHERE子句"""
optimizations = []
optimized_sql = sql
# 优化日期过滤条件
# 将 YEAR(date_col) = 2023 优化为 date_col >= '2023-01-01' AND date_col < '2024-01-01'
year_pattern = r'YEAR\s*\(\s*(\w+)\s*\)\s*=\s*(\d{4})'
year_matches = re.finditer(year_pattern, sql, re.IGNORECASE)
for match in year_matches:
column = match.group(1)
year = match.group(2)
original = match.group(0)
optimized_condition = f"{column} >= '{year}-01-01' AND {column} < '{int(year)+1}-01-01'"
optimized_sql = optimized_sql.replace(original, optimized_condition)
optimizations.append(f"优化年份过滤条件: {original} -> {optimized_condition}")
# 优化IN条件(如果值过多,建议使用临时表)
in_pattern = r'\w+\s+IN\s*\([^)]+\)'
in_matches = re.finditer(in_pattern, sql, re.IGNORECASE)
for match in in_matches:
in_clause = match.group(0)
values_count = in_clause.count(',') + 1
if values_count > 100:
optimizations.append(f"IN条件包含{values_count}个值,建议使用临时表或EXISTS子查询")
# 检查是否缺少时间过滤条件
if not re.search(r'\b(date|time|timestamp)\w*\s*[><=]', sql, re.IGNORECASE):
optimizations.append("建议添加时间范围过滤条件以提高查询性能")
return optimized_sql, optimizations
def _optimize_joins(self, sql: str) -> Tuple[str, List[str]]:
"""优化JOIN操作"""
optimizations = []
optimized_sql = sql
# 检查JOIN顺序(小表在前)
join_pattern = r'\b(\w+)\s+(?:INNER\s+|LEFT\s+|RIGHT\s+)?JOIN\s+(\w+)\s+ON'
joins = re.findall(join_pattern, sql, re.IGNORECASE)
if len(joins) > 2:
optimizations.append("多表JOIN检测到,建议检查JOIN顺序,将小表放在前面")
# 检查JOIN条件
join_conditions = re.findall(r'ON\s+([^\s]+\s*=\s*[^\s]+)', sql, re.IGNORECASE)
for condition in join_conditions:
if '.' not in condition:
optimizations.append(f"JOIN条件可能缺少表别名: {condition}")
# 建议使用表别名
if len(joins) > 1 and not re.search(r'\b\w+\s+(?:AS\s+)?\w+\b', sql, re.IGNORECASE):
optimizations.append("建议为表使用别名以提高可读性和性能")
return optimized_sql, optimizations
def _optimize_group_order(self, sql: str) -> Tuple[str, List[str]]:
"""优化GROUP BY和ORDER BY"""
optimizations = []
optimized_sql = sql
# 检查GROUP BY和ORDER BY的一致性
group_by_match = re.search(r'GROUP\s+BY\s+([^\s]+(?:\s*,\s*[^\s]+)*)', sql, re.IGNORECASE)
order_by_match = re.search(r'ORDER\s+BY\s+([^\s]+(?:\s*,\s*[^\s]+)*)', sql, re.IGNORECASE)
if group_by_match and order_by_match:
group_cols = [col.strip() for col in group_by_match.group(1).split(',')]
order_cols = [col.strip().split()[0] for col in order_by_match.group(1).split(',')] # 移除ASC/DESC
# 检查ORDER BY是否包含GROUP BY的列
group_set = set(group_cols)
order_set = set(order_cols)
if not order_set.issubset(group_set) and not group_set.issubset(order_set):
optimizations.append("ORDER BY列与GROUP BY列不一致,可能影响性能")
# 检查是否对聚合结果排序
if order_by_match and re.search(r'\b(SUM|COUNT|AVG|MAX|MIN)\s*\(', sql, re.IGNORECASE):
order_clause = order_by_match.group(1)
if any(func in order_clause.upper() for func in ['SUM', 'COUNT', 'AVG', 'MAX', 'MIN']):
optimizations.append("对聚合结果排序,确保这是必要的操作")
return optimized_sql, optimizations
def _optimize_subqueries(self, sql: str) -> Tuple[str, List[str]]:
"""优化子查询"""
optimizations = []
optimized_sql = sql
# 检查相关子查询
if re.search(r'\bEXISTS\s*\(', sql, re.IGNORECASE):
optimizations.append("检测到EXISTS子查询,确认是否可以用JOIN替代")
# 检查IN子查询
if re.search(r'\bIN\s*\(\s*SELECT', sql, re.IGNORECASE):
optimizations.append("检测到IN子查询,考虑使用EXISTS或JOIN替代")
# 检查标量子查询
scalar_subquery_pattern = r'\(\s*SELECT\s+[^)]+\)\s*(?:AS\s+\w+)?'
if re.search(scalar_subquery_pattern, sql, re.IGNORECASE):
optimizations.append("检测到标量子查询,考虑使用LEFT JOIN替代")
return optimized_sql, optimizations
def _estimate_performance_impact(self, optimizations: List[str]) -> str:
"""估算性能影响"""
if not optimizations:
return "NONE"
high_impact_keywords = ['JOIN', '子查询', '时间过滤', 'SELECT *']
medium_impact_keywords = ['ORDER BY', 'GROUP BY', 'IN条件']
high_impact_count = sum(1 for opt in optimizations
if any(keyword in opt for keyword in high_impact_keywords))
medium_impact_count = sum(1 for opt in optimizations
if any(keyword in opt for keyword in medium_impact_keywords))
if high_impact_count > 0:
return "HIGH"
elif medium_impact_count > 0:
return "MEDIUM"
else:
return "LOW"
def _generate_recommendations(self, sql: str) -> List[str]:
"""生成额外的优化建议"""
recommendations = []
# 检查LIMIT使用
if not re.search(r'\bLIMIT\s+\d+', sql, re.IGNORECASE):
if re.search(r'\bORDER\s+BY\b', sql, re.IGNORECASE):
recommendations.append("使用ORDER BY时建议添加LIMIT以避免排序大量数据")
# 检查DISTINCT使用
if re.search(r'\bDISTINCT\b', sql, re.IGNORECASE):
recommendations.append("使用DISTINCT可能影响性能,确认是否真的需要去重")
# 检查UNION vs UNION ALL
if re.search(r'\bUNION\s+(?!ALL)', sql, re.IGNORECASE):
recommendations.append("考虑使用UNION ALL替代UNION以提高性能(如果不需要去重)")
# 检查LIKE模糊查询
if re.search(r"LIKE\s+['\"]%", sql, re.IGNORECASE):
recommendations.append("前缀模糊查询(LIKE '%xxx')无法使用索引,考虑全文搜索")
# 检查函数在WHERE条件中的使用
if re.search(r'WHERE\s+[^\s]*\([^)]*\)\s*[=<>]', sql, re.IGNORECASE):
recommendations.append("WHERE条件中使用函数可能无法使用索引,考虑重写查询")
return recommendations
def analyze_query_complexity(self, sql: str) -> Dict:
"""分析查询复杂度"""
complexity_analysis = {
"tables_count": 0,
"joins_count": 0,
"subqueries_count": 0,
"aggregations_count": 0,
"conditions_count": 0,
"complexity_score": 0,
"complexity_level": "SIMPLE"
}
# 统计表数量
tables = re.findall(r'\bFROM\s+(\w+)|\bJOIN\s+(\w+)', sql, re.IGNORECASE)
complexity_analysis["tables_count"] = len([t for sublist in tables for t in sublist if t])
# 统计JOIN数量
joins = re.findall(r'\bJOIN\b', sql, re.IGNORECASE)
complexity_analysis["joins_count"] = len(joins)
# 统计子查询数量
subqueries = re.findall(r'\(\s*SELECT\b', sql, re.IGNORECASE)
complexity_analysis["subqueries_count"] = len(subqueries)
# 统计聚合函数数量
aggregations = re.findall(r'\b(SUM|COUNT|AVG|MAX|MIN|GROUP\s+BY)\b', sql, re.IGNORECASE)
complexity_analysis["aggregations_count"] = len(aggregations)
# 统计WHERE条件数量
where_match = re.search(r'WHERE\s+(.*?)(?:\s+GROUP\s+BY|\s+ORDER\s+BY|\s+LIMIT|$)', sql, re.IGNORECASE | re.DOTALL)
if where_match:
where_clause = where_match.group(1)
conditions = re.findall(r'\b\w+\s*[=<>!]+|\bLIKE\b|\bIN\b|\bBETWEEN\b', where_clause, re.IGNORECASE)
complexity_analysis["conditions_count"] = len(conditions)
# 计算复杂度分数
score = 0
score += complexity_analysis["tables_count"] * 5
score += complexity_analysis["joins_count"] * 10
score += complexity_analysis["subqueries_count"] * 15
score += complexity_analysis["aggregations_count"] * 3
score += complexity_analysis["conditions_count"] * 2
complexity_analysis["complexity_score"] = score
# 确定复杂度级别
if score < 20:
complexity_analysis["complexity_level"] = "SIMPLE"
elif score < 50:
complexity_analysis["complexity_level"] = "MEDIUM"
elif score < 100:
complexity_analysis["complexity_level"] = "COMPLEX"
else:
complexity_analysis["complexity_level"] = "VERY_COMPLEX"
return complexity_analysis
def suggest_index_optimization(self, sql: str) -> List[Dict]:
"""建议索引优化"""
index_suggestions = []
# 分析WHERE条件中的列
where_match = re.search(r'WHERE\s+(.*?)(?:\s+GROUP\s+BY|\s+ORDER\s+BY|\s+LIMIT|$)', sql, re.IGNORECASE | re.DOTALL)
if where_match:
where_clause = where_match.group(1)
# 等值条件的列
eq_columns = re.findall(r'(\w+)\s*=', where_clause, re.IGNORECASE)
for col in eq_columns:
index_suggestions.append({
"type": "EQUALITY_INDEX",
"column": col,
"priority": "HIGH",
"reason": "等值查询条件,建议创建单列索引"
})
# 范围条件的列
range_columns = re.findall(r'(\w+)\s*[<>]=?', where_clause, re.IGNORECASE)
for col in range_columns:
index_suggestions.append({
"type": "RANGE_INDEX",
"column": col,
"priority": "MEDIUM",
"reason": "范围查询条件,建议创建单列索引"
})
# 分析JOIN条件中的列
join_conditions = re.findall(r'\bON\s+([\w\.]+)\s*=\s*([\w\.]+)', sql, re.IGNORECASE)
for left_col, right_col in join_conditions:
index_suggestions.extend([
{
"type": "JOIN_INDEX",
"column": left_col,
"priority": "HIGH",
"reason": "JOIN条件,建议创建索引"
},
{
"type": "JOIN_INDEX",
"column": right_col,
"priority": "HIGH",
"reason": "JOIN条件,建议创建索引"
}
])
# 分析ORDER BY中的列
order_match = re.search(r'ORDER\s+BY\s+(.*?)(?:\s+LIMIT\b|$)', sql, re.IGNORECASE | re.DOTALL)
if order_match:
order_columns = [col.strip().split()[0] for col in order_match.group(1).split(',') if col.strip()]
for col in order_columns:
index_suggestions.append({
"type": "ORDER_INDEX",
"column": col,
"priority": "MEDIUM",
"reason": "ORDER BY条件,建议创建索引"
})
# 去重并合并相同列的建议
column_suggestions = {}
for suggestion in index_suggestions:
col = suggestion["column"]
if col not in column_suggestions:
column_suggestions[col] = suggestion
else:
# 合并原因
existing = column_suggestions[col]
existing["reason"] += f"; {suggestion['reason']}"
# 提升优先级
if suggestion["priority"] == "HIGH":
existing["priority"] = "HIGH"
return list(column_suggestions.values())
def print_optimization_result(self, sql: str):
"""打印优化结果"""
result = self.optimize_query(sql)
print(f"\n=== SQL优化分析 ===")
if "error" in result:
print(f"❌ 优化失败: {result['error']}")
return
print(f"\n📊 复杂度分析:")
complexity = self.analyze_query_complexity(sql)
print(f" 复杂度等级: {complexity['complexity_level']}")
print(f" 复杂度分数: {complexity['complexity_score']}")
print(f" 表数量: {complexity['tables_count']}")
print(f" JOIN数量: {complexity['joins_count']}")
print(f" 子查询数量: {complexity['subqueries_count']}")
print(f" 聚合操作数量: {complexity['aggregations_count']}")
print(f" WHERE条件数量: {complexity['conditions_count']}")
print(f"\n🔧 应用的优化:")
if result["optimizations_applied"]:
for i, opt in enumerate(result["optimizations_applied"], 1):
print(f" {i}. {opt}")
else:
print(f" 无需优化")
print(f"\n📈 性能影响: {result['performance_impact']}")
if result["recommendations"]:
print(f"\n💡 额外建议:")
for i, rec in enumerate(result["recommendations"], 1):
print(f" {i}. {rec}")
# 索引建议
index_suggestions = self.suggest_index_optimization(sql)
if index_suggestions:
print(f"\n🗂️ 索引建议:")
for i, suggestion in enumerate(index_suggestions, 1):
print(f" {i}. 列: {suggestion['column']}")
print(f" 类型: {suggestion['type']}")
print(f" 优先级: {suggestion['priority']}")
print(f" 原因: {suggestion['reason']}")
# 显示优化后的SQL(如果有变化)
if result["original_sql"] != result["optimized_sql"]:
print(f"\n📝 优化后的SQL:")
print(result["optimized_sql"])
def main():
optimizer = SQLOptimizer()
# 示例SQL
sample_sql = """
SELECT *
FROM sales_fact f, date_dim d, product_dim p
WHERE YEAR(f.sale_date) = 2023
AND f.date_key = d.date_key
AND f.product_key = p.product_key
AND p.category IN ('Electronics', 'Clothing', 'Books', 'Sports')
ORDER BY f.sales_amount DESC
"""
optimizer.print_optimization_result(sample_sql)
if __name__ == "__main__":
main()
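除了直接打印报告,也可以在自动化流程中以编程方式调用优化器,并根据返回字典中的字段做后续处理。下面是一个简要的调用示意(假设上文的SQLOptimizer位于同一文件或可直接导入;所用键名与print_optimization_result中读取的字段一致,SQL文本仅为演示):
# 编程方式使用SQLOptimizer的示意(非正式实现)
sql = (
    "SELECT product_key, SUM(sales_amount) AS total_amount "
    "FROM sales_fact "
    "WHERE sale_date >= '2023-01-01' AND sale_date < '2024-01-01' "
    "GROUP BY product_key "
    "ORDER BY total_amount DESC LIMIT 100"
)
optimizer = SQLOptimizer()
result = optimizer.optimize_query(sql)                 # 返回包含 optimizations_applied / performance_impact 等字段的字典
complexity = optimizer.analyze_query_complexity(sql)   # 复杂度统计
index_hints = optimizer.suggest_index_optimization(sql)
if "error" not in result and result["performance_impact"] in ("HIGH", "MEDIUM"):
    for opt in result["optimizations_applied"]:
        print(f"优化项: {opt}")
print(f"复杂度等级: {complexity['complexity_level']}, 索引建议数: {len(index_hints)}")
这种方式便于把分析结果接入CI检查或慢查询巡检脚本,而不依赖控制台输出格式。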
7.2.2 缓存优化策略
查询结果缓存
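查询结果缓存的基本思路是:对标准化后的SQL及参数生成缓存键,命中则直接返回结果,未命中则执行查询并写入缓存;缓存条目按TTL过期、按LRU策略驱逐,并维护命中率、驱逐次数等统计信息。下面的CacheOptimizer即按这一思路实现,完整脚本如下。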
#!/usr/bin/env python3
# cache_optimizer.py - 缓存优化器
import hashlib
import json
import time
import logging
from typing import Any, Dict, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
import threading
@dataclass
class CacheEntry:
"""缓存条目"""
key: str
sql: str
result: Any
created_time: datetime
last_accessed: datetime
access_count: int
size_bytes: int
ttl_seconds: int
def is_expired(self) -> bool:
"""检查是否过期"""
if self.ttl_seconds <= 0:
return False
return datetime.now() > self.created_time + timedelta(seconds=self.ttl_seconds)
def is_stale(self, max_age_seconds: int) -> bool:
"""检查是否陈旧"""
return datetime.now() > self.last_accessed + timedelta(seconds=max_age_seconds)
class CacheOptimizer:
"""缓存优化器"""
def __init__(self, max_size_mb: int = 1024, default_ttl: int = 3600):
self.max_size_bytes = max_size_mb * 1024 * 1024
self.default_ttl = default_ttl
self.cache = {}
self.cache_stats = {
"hits": 0,
"misses": 0,
"evictions": 0,
"size_bytes": 0
}
self.lock = threading.RLock()
self.logger = logging.getLogger(__name__)
def generate_cache_key(self, sql: str, parameters: Optional[Dict] = None) -> str:
"""生成缓存键"""
# 标准化SQL
normalized_sql = self._normalize_sql(sql)
# 包含参数
cache_data = {
"sql": normalized_sql,
"parameters": parameters or {}
}
# 生成哈希
cache_str = json.dumps(cache_data, sort_keys=True)
return hashlib.md5(cache_str.encode()).hexdigest()
def _normalize_sql(self, sql: str) -> str:
"""标准化SQL"""
# 移除多余空格和换行
normalized = ' '.join(sql.split())
# 转换为小写(保留字符串常量)
# 这里简化处理,实际应该更复杂
return normalized.lower().strip()
def get(self, sql: str, parameters: Optional[Dict] = None) -> Optional[Any]:
"""获取缓存结果"""
cache_key = self.generate_cache_key(sql, parameters)
with self.lock:
if cache_key in self.cache:
entry = self.cache[cache_key]
# 检查是否过期
if entry.is_expired():
self._remove_entry(cache_key)
self.cache_stats["misses"] += 1
return None
# 更新访问信息
entry.last_accessed = datetime.now()
entry.access_count += 1
self.cache_stats["hits"] += 1
self.logger.debug(f"缓存命中: {cache_key}")
return entry.result
else:
self.cache_stats["misses"] += 1
return None
def put(self, sql: str, result: Any, parameters: Optional[Dict] = None, ttl: Optional[int] = None) -> bool:
"""存储缓存结果"""
cache_key = self.generate_cache_key(sql, parameters)
# 计算结果大小
result_size = self._calculate_size(result)
# 检查是否超过最大缓存大小
if result_size > self.max_size_bytes:
self.logger.warning(f"结果太大,无法缓存: {result_size} bytes")
return False
with self.lock:
# 确保有足够空间
self._ensure_space(result_size)
# 创建缓存条目
entry = CacheEntry(
key=cache_key,
sql=sql,
result=result,
created_time=datetime.now(),
last_accessed=datetime.now(),
access_count=1,
size_bytes=result_size,
ttl_seconds=ttl or self.default_ttl
)
# 如果已存在,先移除旧的
if cache_key in self.cache:
self._remove_entry(cache_key)
# 添加新条目
self.cache[cache_key] = entry
self.cache_stats["size_bytes"] += result_size
self.logger.debug(f"缓存存储: {cache_key}, 大小: {result_size} bytes")
return True
def _calculate_size(self, obj: Any) -> int:
"""计算对象大小(简化实现)"""
try:
return len(json.dumps(obj, default=str).encode())
except (TypeError, ValueError):
# 如果无法序列化,估算大小
return 1024 # 默认1KB
def _ensure_space(self, required_bytes: int):
"""确保有足够的缓存空间"""
while (self.cache_stats["size_bytes"] + required_bytes) > self.max_size_bytes:
if not self._evict_one():
break
def _evict_one(self) -> bool:
"""驱逐一个缓存条目"""
if not self.cache:
return False
# LRU策略:移除最久未访问的条目
oldest_key = min(self.cache.keys(),
key=lambda k: self.cache[k].last_accessed)
self._remove_entry(oldest_key)
self.cache_stats["evictions"] += 1
return True
def _remove_entry(self, cache_key: str):
"""移除缓存条目"""
if cache_key in self.cache:
entry = self.cache[cache_key]
self.cache_stats["size_bytes"] -= entry.size_bytes
del self.cache[cache_key]
def clear_expired(self) -> int:
"""清理过期缓存"""
expired_keys = []
with self.lock:
for key, entry in self.cache.items():
if entry.is_expired():
expired_keys.append(key)
for key in expired_keys:
self._remove_entry(key)
self.logger.info(f"清理过期缓存: {len(expired_keys)}个条目")
return len(expired_keys)
def clear_stale(self, max_age_seconds: int = 7200) -> int:
"""清理陈旧缓存"""
stale_keys = []
with self.lock:
for key, entry in self.cache.items():
if entry.is_stale(max_age_seconds):
stale_keys.append(key)
for key in stale_keys:
self._remove_entry(key)
self.logger.info(f"清理陈旧缓存: {len(stale_keys)}个条目")
return len(stale_keys)
def get_cache_stats(self) -> Dict:
"""获取缓存统计"""
with self.lock:
total_requests = self.cache_stats["hits"] + self.cache_stats["misses"]
hit_rate = self.cache_stats["hits"] / total_requests if total_requests > 0 else 0
return {
"total_entries": len(self.cache),
"total_size_mb": self.cache_stats["size_bytes"] / (1024 * 1024),
"max_size_mb": self.max_size_bytes / (1024 * 1024),
"usage_percentage": (self.cache_stats["size_bytes"] / self.max_size_bytes) * 100,
"hits": self.cache_stats["hits"],
"misses": self.cache_stats["misses"],
"hit_rate": hit_rate,
"evictions": self.cache_stats["evictions"]
}
def get_top_queries(self, limit: int = 10) -> List[Dict]:
"""获取热门查询"""
with self.lock:
sorted_entries = sorted(
self.cache.values(),
key=lambda e: e.access_count,
reverse=True
)
return [
{
"sql": entry.sql[:100] + "..." if len(entry.sql) > 100 else entry.sql,
"access_count": entry.access_count,
"size_mb": entry.size_bytes / (1024 * 1024),
"created_time": entry.created_time.isoformat(),
"last_accessed": entry.last_accessed.isoformat()
}
for entry in sorted_entries[:limit]
]
def analyze_cache_efficiency(self) -> Dict:
"""分析缓存效率"""
stats = self.get_cache_stats()
analysis = {
"efficiency_score": 0,
"efficiency_level": "POOR",
"recommendations": []
}
# 计算效率分数
hit_rate_score = stats["hit_rate"] * 40 # 命中率权重40%
usage_score = min(stats["usage_percentage"] / 80, 1) * 30 # 使用率权重30%(80%为最优)
eviction_score = max(0, 1 - (stats["evictions"] / max(stats["hits"] + stats["misses"], 1))) * 30 # 驱逐率权重30%
analysis["efficiency_score"] = hit_rate_score + usage_score + eviction_score
# 确定效率级别
if analysis["efficiency_score"] >= 80:
analysis["efficiency_level"] = "EXCELLENT"
elif analysis["efficiency_score"] >= 60:
analysis["efficiency_level"] = "GOOD"
elif analysis["efficiency_score"] >= 40:
analysis["efficiency_level"] = "FAIR"
else:
analysis["efficiency_level"] = "POOR"
# 生成建议
if stats["hit_rate"] < 0.5:
analysis["recommendations"].append("缓存命中率较低,检查查询模式和TTL设置")
if stats["usage_percentage"] < 50:
analysis["recommendations"].append("缓存使用率较低,可以增加缓存大小或调整TTL")
if stats["usage_percentage"] > 90:
analysis["recommendations"].append("缓存使用率过高,建议增加缓存大小")
if stats["evictions"] > (stats["hits"] + stats["misses"]) * 0.1:
analysis["recommendations"].append("缓存驱逐频繁,建议增加缓存大小或优化查询")
return analysis
def optimize_cache_settings(self) -> Dict:
"""优化缓存设置"""
stats = self.get_cache_stats()
recommendations = {
"current_settings": {
"max_size_mb": self.max_size_bytes / (1024 * 1024),
"default_ttl": self.default_ttl
},
"recommended_settings": {},
"reasoning": []
}
# 推荐缓存大小
if stats["usage_percentage"] > 90 and stats["evictions"] > 0:
recommended_size = self.max_size_bytes * 1.5 / (1024 * 1024)
recommendations["recommended_settings"]["max_size_mb"] = recommended_size
recommendations["reasoning"].append(f"缓存使用率过高({stats['usage_percentage']:.1f}%),建议增加到{recommended_size:.0f}MB")
elif stats["usage_percentage"] < 30:
recommended_size = self.max_size_bytes * 0.7 / (1024 * 1024)
recommendations["recommended_settings"]["max_size_mb"] = recommended_size
recommendations["reasoning"].append(f"缓存使用率较低({stats['usage_percentage']:.1f}%),可以减少到{recommended_size:.0f}MB")
# 推荐TTL设置
if stats["hit_rate"] < 0.3:
recommended_ttl = self.default_ttl * 2
recommendations["recommended_settings"]["default_ttl"] = recommended_ttl
recommendations["reasoning"].append(f"命中率较低({stats['hit_rate']:.2%}),建议增加TTL到{recommended_ttl}秒")
elif stats["hit_rate"] > 0.8 and stats["usage_percentage"] > 80:
recommended_ttl = self.default_ttl // 2
recommendations["recommended_settings"]["default_ttl"] = recommended_ttl
recommendations["reasoning"].append(f"命中率很高({stats['hit_rate']:.2%})但空间紧张,可以减少TTL到{recommended_ttl}秒")
return recommendations
def print_cache_report(self):
"""打印缓存报告"""
stats = self.get_cache_stats()
efficiency = self.analyze_cache_efficiency()
optimization = self.optimize_cache_settings()
print(f"\n=== 缓存性能报告 ===")
print(f"报告时间: {datetime.now().isoformat()}")
print(f"\n📊 基础统计:")
print(f" 缓存条目数: {stats['total_entries']}")
print(f" 总大小: {stats['total_size_mb']:.2f}MB / {stats['max_size_mb']:.0f}MB")
print(f" 使用率: {stats['usage_percentage']:.1f}%")
print(f" 命中次数: {stats['hits']}")
print(f" 未命中次数: {stats['misses']}")
print(f" 命中率: {stats['hit_rate']:.2%}")
print(f" 驱逐次数: {stats['evictions']}")
print(f"\n🎯 效率分析:")
print(f" 效率分数: {efficiency['efficiency_score']:.1f}/100")
print(f" 效率级别: {efficiency['efficiency_level']}")
if efficiency["recommendations"]:
print(f"\n💡 效率建议:")
for i, rec in enumerate(efficiency["recommendations"], 1):
print(f" {i}. {rec}")
print(f"\n⚙️ 当前设置:")
current = optimization["current_settings"]
print(f" 最大缓存大小: {current['max_size_mb']:.0f}MB")
print(f" 默认TTL: {current['default_ttl']}秒")
if optimization["recommended_settings"]:
print(f"\n🔧 推荐设置:")
recommended = optimization["recommended_settings"]
if "max_size_mb" in recommended:
print(f" 推荐缓存大小: {recommended['max_size_mb']:.0f}MB")
if "default_ttl" in recommended:
print(f" 推荐TTL: {recommended['default_ttl']}秒")
print(f"\n📝 调整原因:")
for i, reason in enumerate(optimization["reasoning"], 1):
print(f" {i}. {reason}")
# 热门查询
top_queries = self.get_top_queries(5)
if top_queries:
print(f"\n🔥 热门查询TOP5:")
for i, query in enumerate(top_queries, 1):
print(f" {i}. 访问{query['access_count']}次, {query['size_mb']:.2f}MB")
print(f" SQL: {query['sql']}")
def main():
# 创建缓存优化器
cache_optimizer = CacheOptimizer(max_size_mb=512, default_ttl=1800)
# 模拟一些查询和缓存操作
sample_queries = [
"SELECT SUM(sales_amount) FROM sales_fact WHERE year = 2023",
"SELECT COUNT(*) FROM customer_dim WHERE region = 'North'",
"SELECT AVG(order_value) FROM sales_fact WHERE month = 12",
"SELECT SUM(sales_amount) FROM sales_fact WHERE year = 2023", # 重复查询
"SELECT COUNT(*) FROM customer_dim WHERE region = 'North'" # 重复查询
]
# 模拟查询结果
sample_results = [
{"total_sales": 1000000},
{"customer_count": 5000},
{"avg_order": 150.50},
{"total_sales": 1000000},
{"customer_count": 5000}
]
print("模拟缓存操作...")
for i, (sql, result) in enumerate(zip(sample_queries, sample_results)):
print(f"\n查询 {i+1}: {sql[:50]}...")
# 尝试从缓存获取
cached_result = cache_optimizer.get(sql)
if cached_result is not None:
print(f" ✅ 缓存命中")
else:
print(f" ❌ 缓存未命中,执行查询并缓存结果")
cache_optimizer.put(sql, result)
# 短暂延迟
time.sleep(0.1)
# 打印缓存报告
cache_optimizer.print_cache_report()
if __name__ == "__main__":
main()
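在实际接入查询服务时,通常会把上述CacheOptimizer包装成"先查缓存、未命中再执行并回填"的cache-aside模式,并由后台线程定期清理过期与陈旧条目。下面是一个简化示意:其中CachedQueryService、execute_kylin_query均为假设的名称,execute_kylin_query只是占位函数,需替换为实际的查询执行入口;CacheOptimizer假设可从上文模块导入。
#!/usr/bin/env python3
# cached_query_service.py - CacheOptimizer 接入示意(cache-aside 模式,仅为草图)
import threading
import time
from typing import Any, Dict, Optional

def execute_kylin_query(sql: str, parameters: Optional[Dict] = None) -> Any:
    """占位函数:真实环境中应替换为实际的查询执行逻辑"""
    raise NotImplementedError("请替换为实际的查询执行实现")

class CachedQueryService:
    """在查询入口处包装 CacheOptimizer:先查缓存,未命中再执行并回填"""
    def __init__(self, cache: "CacheOptimizer", maintenance_interval: int = 600):
        self.cache = cache
        # 后台守护线程定期清理过期/陈旧条目,不阻塞进程退出
        self._maintainer = threading.Thread(
            target=self._maintenance_loop, args=(maintenance_interval,), daemon=True)
        self._maintainer.start()

    def query(self, sql: str, parameters: Optional[Dict] = None, ttl: Optional[int] = None) -> Any:
        cached = self.cache.get(sql, parameters)
        if cached is not None:
            return cached                                  # 缓存命中,直接返回
        result = execute_kylin_query(sql, parameters)      # 未命中,执行查询
        self.cache.put(sql, result, parameters, ttl)       # 回填缓存
        return result

    def _maintenance_loop(self, interval: int):
        while True:
            time.sleep(interval)
            self.cache.clear_expired()      # 清理按TTL过期的条目
            self.cache.clear_stale(7200)    # 清理长时间未访问的条目
维护线程设为守护线程是为了让它随主进程一起退出;清理频率和clear_stale的阈值应结合实际查询负载与缓存命中率统计(get_cache_stats)进行调整。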