5.1 Theoretical Foundations of Dimensional Modeling
5.1.1 Overview of Dimensional Modeling
Dimensional modeling, introduced by Ralph Kimball, is the core methodology for data warehouse design. It organizes complex business data into structures that are easy to understand and query.
Core concepts (a query sketch follows this list):
Fact Table:
- Stores the measurements of a business process
- Contains foreign keys pointing to dimension tables
- Typically holds numeric measure columns
- Large in volume, with rows appended frequently
Dimension Table:
- Stores descriptive information
- Provides context for queries
- Contains hierarchy information
- Comparatively small in volume and updated infrequently
Measure:
- Aggregatable numeric data
- For example: sales amount, quantity, profit
- Supports aggregate functions such as SUM, COUNT, and AVG
Dimension:
- The analytical angle of a query
- For example: time, region, product, customer
- Provides grouping and filter conditions for the data
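To make these concepts concrete, here is a minimal illustrative query (not from the original text) that aggregates measures across one dimension; it assumes the star-schema tables defined in Section 5.1.2 below.
-- Illustrative: measures are aggregated, dimensions group and filter.
SELECT d.year, SUM(f.sales_amount) AS total_sales, COUNT(*) AS sale_count
FROM sales_fact f
JOIN date_dim d ON f.date_key = d.date_key
WHERE d.is_holiday = FALSE
GROUP BY d.year;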
5.1.2 Star Schema
The star schema is the simplest and most widely used dimensional modeling approach.
Characteristics:
- A single fact table at the center
- Surrounded by multiple dimension tables
- Dimension tables join directly to the fact table
- Simple structure with good query performance
Example: a star schema for sales data
-- Fact table: sales facts
CREATE TABLE sales_fact (
sale_id BIGINT,
date_key INT,
product_key INT,
customer_key INT,
store_key INT,
sales_amount DECIMAL(10,2),
quantity INT,
discount_amount DECIMAL(10,2),
profit DECIMAL(10,2)
);
-- Dimension table: date dimension
CREATE TABLE date_dim (
date_key INT PRIMARY KEY,
full_date DATE,
year INT,
quarter INT,
month INT,
week INT,
day_of_week INT,
day_name VARCHAR(10),
month_name VARCHAR(10),
is_weekend BOOLEAN,
is_holiday BOOLEAN
);
-- Dimension table: product dimension
CREATE TABLE product_dim (
product_key INT PRIMARY KEY,
product_id VARCHAR(50),
product_name VARCHAR(200),
category VARCHAR(100),
subcategory VARCHAR(100),
brand VARCHAR(100),
unit_price DECIMAL(10,2),
product_status VARCHAR(20)
);
-- Dimension table: customer dimension
CREATE TABLE customer_dim (
customer_key INT PRIMARY KEY,
customer_id VARCHAR(50),
customer_name VARCHAR(200),
gender VARCHAR(10),
age_group VARCHAR(20),
city VARCHAR(100),
state VARCHAR(100),
country VARCHAR(100),
customer_segment VARCHAR(50)
);
-- Dimension table: store dimension
CREATE TABLE store_dim (
store_key INT PRIMARY KEY,
store_id VARCHAR(50),
store_name VARCHAR(200),
store_type VARCHAR(50),
city VARCHAR(100),
state VARCHAR(100),
region VARCHAR(100),
manager VARCHAR(100),
open_date DATE
);
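A typical star query filters on dimension attributes and aggregates fact measures, reaching each dimension through a single join. The following query is an illustrative sketch against the tables above:
-- Illustrative star-join query: weekend sales by store region.
SELECT s.region, SUM(f.sales_amount) AS weekend_sales
FROM sales_fact f
JOIN date_dim d ON f.date_key = d.date_key
JOIN store_dim s ON f.store_key = s.store_key
WHERE d.is_weekend = TRUE
GROUP BY s.region;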
5.1.3 Snowflake Schema
The snowflake schema is a normalized variant of the star schema.
Characteristics:
- Dimension tables are further normalized
- Reduces data redundancy
- Increases the number of tables and the overall complexity
- Comparatively lower query performance
Example: snowflaking the product dimension
-- Product dimension table (snowflaked)
CREATE TABLE product_dim (
product_key INT PRIMARY KEY,
product_id VARCHAR(50),
product_name VARCHAR(200),
subcategory_key INT,
brand_key INT,
unit_price DECIMAL(10,2),
product_status VARCHAR(20)
);
-- Subcategory dimension table
CREATE TABLE subcategory_dim (
subcategory_key INT PRIMARY KEY,
subcategory_name VARCHAR(100),
category_key INT
);
-- Category dimension table
CREATE TABLE category_dim (
category_key INT PRIMARY KEY,
category_name VARCHAR(100),
department_key INT
);
-- Department dimension table
CREATE TABLE department_dim (
department_key INT PRIMARY KEY,
department_name VARCHAR(100)
);
-- Brand dimension table
CREATE TABLE brand_dim (
brand_key INT PRIMARY KEY,
brand_name VARCHAR(100),
brand_country VARCHAR(100)
);
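The price of this normalization shows up at query time: reconstructing the full product hierarchy now requires a chain of joins instead of reading a single row, as the illustrative query below shows.
-- Illustrative: resolving a product's hierarchy needs three extra joins.
SELECT p.product_name, s.subcategory_name, c.category_name, d.department_name
FROM product_dim p
JOIN subcategory_dim s ON p.subcategory_key = s.subcategory_key
JOIN category_dim c ON s.category_key = c.category_key
JOIN department_dim d ON c.department_key = d.department_key;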
5.1.4 Dimensional Modeling Best Practices
1. Choosing the Right Granularity
#!/usr/bin/env python3
# granularity_analyzer.py - 粒度分析工具
class GranularityAnalyzer:
def __init__(self):
self.granularity_levels = {
'time': ['year', 'quarter', 'month', 'week', 'day', 'hour', 'minute'],
'geography': ['country', 'region', 'state', 'city', 'store'],
'product': ['department', 'category', 'subcategory', 'product', 'sku'],
'customer': ['segment', 'group', 'individual']
}
def analyze_data_volume(self, fact_table_size, dimension_counts):
"""分析不同粒度下的数据量"""
analysis_result = {
'current_size': fact_table_size,
'granularity_impact': {},
'recommendations': []
}
# 计算不同粒度组合的数据量
base_combinations = [
('day', 'store', 'product'),
('day', 'city', 'product'),
('week', 'store', 'product'),
('month', 'store', 'category'),
('quarter', 'region', 'category')
]
for time_grain, geo_grain, prod_grain in base_combinations:
estimated_size = self.estimate_fact_table_size(
dimension_counts, time_grain, geo_grain, prod_grain
)
combination_key = f"{time_grain}_{geo_grain}_{prod_grain}"
analysis_result['granularity_impact'][combination_key] = {
'estimated_size': estimated_size,
'size_ratio': estimated_size / fact_table_size if fact_table_size > 0 else 0,
'query_performance': self.estimate_query_performance(estimated_size),
'storage_cost': self.estimate_storage_cost(estimated_size)
}
# 生成建议
analysis_result['recommendations'] = self.generate_granularity_recommendations(
analysis_result['granularity_impact']
)
return analysis_result
def estimate_fact_table_size(self, dimension_counts, time_grain, geo_grain, prod_grain):
"""估算事实表大小"""
time_factor = {
'year': 1, 'quarter': 4, 'month': 12, 'week': 52,
'day': 365, 'hour': 8760, 'minute': 525600
}
geo_factor = dimension_counts.get('geography', {}).get(geo_grain, 1)
prod_factor = dimension_counts.get('product', {}).get(prod_grain, 1)
time_records = time_factor.get(time_grain, 365)
# 估算记录数(考虑稀疏性)
sparsity_factor = 0.7 # 假设70%的组合有数据
estimated_records = time_records * geo_factor * prod_factor * sparsity_factor
# 估算存储大小(假设每条记录平均200字节)
estimated_size_mb = estimated_records * 200 / (1024 * 1024)
return estimated_size_mb
def estimate_query_performance(self, table_size_mb):
"""估算查询性能"""
if table_size_mb < 100:
return 'excellent'
elif table_size_mb < 1000:
return 'good'
elif table_size_mb < 10000:
return 'fair'
else:
return 'poor'
def estimate_storage_cost(self, table_size_mb):
"""估算存储成本"""
# 假设每GB存储成本0.1美元/月
cost_per_gb_month = 0.1
size_gb = table_size_mb / 1024
monthly_cost = size_gb * cost_per_gb_month
return {
'size_gb': round(size_gb, 2),
'monthly_cost_usd': round(monthly_cost, 2)
}
def generate_granularity_recommendations(self, granularity_impact):
"""生成粒度建议"""
recommendations = []
        # Sort by query performance, best first. An explicit rank map is needed:
        # a plain string sort would rank 'fair' ahead of 'good'.
        perf_rank = {'excellent': 0, 'good': 1, 'fair': 2, 'poor': 3}
        sorted_options = sorted(
            granularity_impact.items(),
            key=lambda x: (perf_rank.get(x[1]['query_performance'], 4), x[1]['estimated_size'])
        )
best_option = sorted_options[0]
recommendations.append(
f"推荐粒度组合: {best_option[0]} (查询性能: {best_option[1]['query_performance']}, "
f"存储大小: {best_option[1]['storage_cost']['size_gb']}GB)"
)
# 检查是否有性能问题
for option_name, metrics in granularity_impact.items():
if metrics['query_performance'] == 'poor':
recommendations.append(
f"警告: {option_name} 粒度组合可能导致查询性能问题,建议考虑聚合"
)
if metrics['storage_cost']['monthly_cost_usd'] > 100:
recommendations.append(
f"注意: {option_name} 粒度组合存储成本较高 "
f"(${metrics['storage_cost']['monthly_cost_usd']}/月)"
)
return recommendations
def print_analysis_report(self, analysis_result):
"""打印分析报告"""
print("\n=== 粒度分析报告 ===")
print(f"当前事实表大小: {analysis_result['current_size']:.2f} MB")
print("\n📊 不同粒度组合分析:")
for option_name, metrics in analysis_result['granularity_impact'].items():
print(f"\n{option_name}:")
print(f" 估算大小: {metrics['estimated_size']:.2f} MB")
print(f" 大小比例: {metrics['size_ratio']:.2f}x")
print(f" 查询性能: {metrics['query_performance']}")
print(f" 存储成本: ${metrics['storage_cost']['monthly_cost_usd']}/月 "
f"({metrics['storage_cost']['size_gb']}GB)")
print("\n💡 建议:")
for i, recommendation in enumerate(analysis_result['recommendations'], 1):
print(f" {i}. {recommendation}")
def main():
# 示例使用
analyzer = GranularityAnalyzer()
# 示例维度计数
dimension_counts = {
'geography': {
'country': 5,
'region': 20,
'state': 50,
'city': 500,
'store': 2000
},
'product': {
'department': 10,
'category': 50,
'subcategory': 200,
'product': 5000,
'sku': 50000
}
}
# 当前事实表大小(MB)
current_fact_size = 1000
# 执行分析
result = analyzer.analyze_data_volume(current_fact_size, dimension_counts)
analyzer.print_analysis_report(result)
if __name__ == "__main__":
main()
5.4 Model Management and Optimization
5.4.1 Model Lifecycle Management
1. Model Version Control
#!/usr/bin/env python3
# model_lifecycle_manager.py - 模型生命周期管理工具
import json
import os
import shutil
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum
class ModelStatus(Enum):
"""模型状态"""
DRAFT = "draft" # 草稿
DEVELOPMENT = "development" # 开发中
TESTING = "testing" # 测试中
PRODUCTION = "production" # 生产环境
DEPRECATED = "deprecated" # 已废弃
ARCHIVED = "archived" # 已归档
@dataclass
class ModelVersion:
"""模型版本信息"""
version: str
status: ModelStatus
created_by: str
created_at: datetime
description: str
changes: List[str]
config_file: str
backup_file: Optional[str] = None
class ModelLifecycleManager:
"""模型生命周期管理器"""
def __init__(self, workspace_dir: str):
self.workspace_dir = workspace_dir
self.models_dir = os.path.join(workspace_dir, "models")
self.versions_dir = os.path.join(workspace_dir, "versions")
self.backups_dir = os.path.join(workspace_dir, "backups")
# 创建目录结构
for directory in [self.models_dir, self.versions_dir, self.backups_dir]:
os.makedirs(directory, exist_ok=True)
def create_model_version(self, model_name: str, config: Dict,
version: str, description: str,
created_by: str, changes: List[str]) -> ModelVersion:
"""创建模型版本"""
# 生成配置文件路径
config_filename = f"{model_name}_v{version}.json"
config_file = os.path.join(self.models_dir, config_filename)
# 保存配置文件
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False, default=str)
# 创建版本信息
model_version = ModelVersion(
version=version,
status=ModelStatus.DRAFT,
created_by=created_by,
created_at=datetime.now(),
description=description,
changes=changes,
config_file=config_file
)
# 保存版本信息
self._save_version_info(model_name, model_version)
print(f"✅ 模型版本创建成功: {model_name} v{version}")
return model_version
def update_model_status(self, model_name: str, version: str,
new_status: ModelStatus, updated_by: str) -> bool:
"""更新模型状态"""
version_info = self._load_version_info(model_name, version)
if not version_info:
print(f"❌ 模型版本不存在: {model_name} v{version}")
return False
# 状态转换验证
if not self._validate_status_transition(version_info.status, new_status):
print(f"❌ 无效的状态转换: {version_info.status.value} -> {new_status.value}")
return False
# 更新状态
version_info.status = new_status
# 如果是生产环境,创建备份
if new_status == ModelStatus.PRODUCTION:
backup_file = self._create_backup(model_name, version)
version_info.backup_file = backup_file
# 保存更新后的版本信息
self._save_version_info(model_name, version_info)
print(f"✅ 模型状态更新成功: {model_name} v{version} -> {new_status.value}")
return True
def list_model_versions(self, model_name: str) -> List[ModelVersion]:
"""列出模型的所有版本"""
versions = []
version_pattern = f"{model_name}_v"
for filename in os.listdir(self.versions_dir):
if filename.startswith(version_pattern) and filename.endswith('.json'):
version_file = os.path.join(self.versions_dir, filename)
with open(version_file, 'r', encoding='utf-8') as f:
version_data = json.load(f)
version_info = ModelVersion(
version=version_data['version'],
status=ModelStatus(version_data['status']),
created_by=version_data['created_by'],
created_at=datetime.fromisoformat(version_data['created_at']),
description=version_data['description'],
changes=version_data['changes'],
config_file=version_data['config_file'],
backup_file=version_data.get('backup_file')
)
versions.append(version_info)
        # Sort by version number, newest first. Splitting into numeric parts keeps
        # '1.10' ahead of '1.9'; a plain string sort would get that wrong.
        def version_key(v: ModelVersion) -> List[int]:
            try:
                return [int(p) for p in v.version.split('.')]
            except ValueError:
                return [-1]
        versions.sort(key=version_key, reverse=True)
return versions
def get_production_version(self, model_name: str) -> Optional[ModelVersion]:
"""获取生产环境版本"""
versions = self.list_model_versions(model_name)
for version in versions:
if version.status == ModelStatus.PRODUCTION:
return version
return None
def rollback_model(self, model_name: str, target_version: str,
rollback_by: str) -> bool:
"""回滚模型到指定版本"""
# 获取当前生产版本
current_prod = self.get_production_version(model_name)
if not current_prod:
print(f"❌ 没有找到生产环境版本: {model_name}")
return False
# 获取目标版本
target_version_info = self._load_version_info(model_name, target_version)
if not target_version_info:
print(f"❌ 目标版本不存在: {model_name} v{target_version}")
return False
        # Validate the target version's status: a testing build or a previously
        # deprecated production build (the usual rollback target) may be promoted.
        if target_version_info.status not in [ModelStatus.TESTING, ModelStatus.DEPRECATED]:
            print(f"❌ 目标版本状态不允许回滚: {target_version_info.status.value}")
            return False
try:
# 将当前生产版本标记为已废弃
self.update_model_status(model_name, current_prod.version,
ModelStatus.DEPRECATED, rollback_by)
# 将目标版本设置为生产环境
self.update_model_status(model_name, target_version,
ModelStatus.PRODUCTION, rollback_by)
print(f"✅ 模型回滚成功: {model_name} {current_prod.version} -> {target_version}")
return True
except Exception as e:
print(f"❌ 模型回滚失败: {str(e)}")
return False
def archive_old_versions(self, model_name: str, keep_count: int = 5) -> int:
"""归档旧版本"""
versions = self.list_model_versions(model_name)
# 过滤出可以归档的版本(非生产环境且非最新的几个版本)
archivable_versions = []
for i, version in enumerate(versions):
if (version.status not in [ModelStatus.PRODUCTION, ModelStatus.TESTING] and
i >= keep_count):
archivable_versions.append(version)
archived_count = 0
for version in archivable_versions:
if self.update_model_status(model_name, version.version,
ModelStatus.ARCHIVED, "system"):
archived_count += 1
print(f"✅ 已归档 {archived_count} 个旧版本")
return archived_count
def _save_version_info(self, model_name: str, version_info: ModelVersion):
"""保存版本信息"""
version_filename = f"{model_name}_v{version_info.version}.json"
version_file = os.path.join(self.versions_dir, version_filename)
version_data = asdict(version_info)
version_data['status'] = version_info.status.value
version_data['created_at'] = version_info.created_at.isoformat()
with open(version_file, 'w', encoding='utf-8') as f:
json.dump(version_data, f, indent=2, ensure_ascii=False)
def _load_version_info(self, model_name: str, version: str) -> Optional[ModelVersion]:
"""加载版本信息"""
version_filename = f"{model_name}_v{version}.json"
version_file = os.path.join(self.versions_dir, version_filename)
if not os.path.exists(version_file):
return None
with open(version_file, 'r', encoding='utf-8') as f:
version_data = json.load(f)
return ModelVersion(
version=version_data['version'],
status=ModelStatus(version_data['status']),
created_by=version_data['created_by'],
created_at=datetime.fromisoformat(version_data['created_at']),
description=version_data['description'],
changes=version_data['changes'],
config_file=version_data['config_file'],
backup_file=version_data.get('backup_file')
)
def _validate_status_transition(self, current_status: ModelStatus,
new_status: ModelStatus) -> bool:
"""验证状态转换是否有效"""
valid_transitions = {
ModelStatus.DRAFT: [ModelStatus.DEVELOPMENT, ModelStatus.ARCHIVED],
ModelStatus.DEVELOPMENT: [ModelStatus.TESTING, ModelStatus.DRAFT, ModelStatus.ARCHIVED],
ModelStatus.TESTING: [ModelStatus.PRODUCTION, ModelStatus.DEVELOPMENT, ModelStatus.ARCHIVED],
ModelStatus.PRODUCTION: [ModelStatus.DEPRECATED],
            ModelStatus.DEPRECATED: [ModelStatus.PRODUCTION, ModelStatus.ARCHIVED],  # PRODUCTION allows rollback re-promotion
ModelStatus.ARCHIVED: [] # 归档状态不能转换到其他状态
}
return new_status in valid_transitions.get(current_status, [])
def _create_backup(self, model_name: str, version: str) -> str:
"""创建备份文件"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_filename = f"{model_name}_v{version}_{timestamp}.backup"
backup_file = os.path.join(self.backups_dir, backup_filename)
# 复制配置文件到备份目录
version_info = self._load_version_info(model_name, version)
if version_info and os.path.exists(version_info.config_file):
shutil.copy2(version_info.config_file, backup_file)
return backup_file
def generate_lifecycle_report(self, model_name: str) -> Dict:
"""生成生命周期报告"""
versions = self.list_model_versions(model_name)
if not versions:
return {'error': f'模型不存在: {model_name}'}
# 统计各状态的版本数量
status_counts = {}
for status in ModelStatus:
status_counts[status.value] = 0
for version in versions:
status_counts[version.status.value] += 1
# 获取关键版本信息
latest_version = versions[0] if versions else None
production_version = self.get_production_version(model_name)
report = {
'model_name': model_name,
'total_versions': len(versions),
'status_distribution': status_counts,
'latest_version': {
'version': latest_version.version,
'status': latest_version.status.value,
'created_at': latest_version.created_at.isoformat(),
'created_by': latest_version.created_by
} if latest_version else None,
'production_version': {
'version': production_version.version,
'created_at': production_version.created_at.isoformat(),
'created_by': production_version.created_by
} if production_version else None,
'version_history': [
{
'version': v.version,
'status': v.status.value,
'created_at': v.created_at.isoformat(),
'created_by': v.created_by,
'description': v.description
} for v in versions
]
}
return report
def print_lifecycle_report(self, model_name: str):
"""打印生命周期报告"""
report = self.generate_lifecycle_report(model_name)
if 'error' in report:
print(f"❌ {report['error']}")
return
print(f"\n=== {model_name} 生命周期报告 ===")
print(f"总版本数: {report['total_versions']}")
print("\n📊 状态分布:")
for status, count in report['status_distribution'].items():
if count > 0:
print(f" {status}: {count}")
if report['latest_version']:
latest = report['latest_version']
print(f"\n🔄 最新版本: v{latest['version']} ({latest['status']})")
print(f" 创建时间: {latest['created_at']}")
print(f" 创建者: {latest['created_by']}")
if report['production_version']:
prod = report['production_version']
print(f"\n🚀 生产版本: v{prod['version']}")
print(f" 创建时间: {prod['created_at']}")
print(f" 创建者: {prod['created_by']}")
else:
print("\n⚠️ 没有生产环境版本")
print("\n📋 版本历史:")
for version in report['version_history'][:10]: # 显示最近10个版本
status_icon = {
'draft': '📝',
'development': '🔧',
'testing': '🧪',
'production': '🚀',
'deprecated': '⚠️',
'archived': '📦'
}.get(version['status'], '❓')
print(f" {status_icon} v{version['version']} ({version['status']}) - {version['description']}")
print(f" {version['created_at']} by {version['created_by']}")
def main():
# 示例使用
manager = ModelLifecycleManager("/tmp/kylin_models")
# 创建模型版本
model_config = {
"name": "sales_model",
"fact_table": "sales_fact",
"dimensions": ["time", "product", "customer"],
"measures": ["sales_amount", "quantity", "profit"]
}
# 创建版本 1.0
manager.create_model_version(
model_name="sales_model",
config=model_config,
version="1.0",
description="初始版本",
created_by="data_engineer",
changes=["创建基础销售模型"]
)
# 更新状态到开发环境
manager.update_model_status("sales_model", "1.0", ModelStatus.DEVELOPMENT, "data_engineer")
# 创建版本 1.1
model_config["measures"].append("discount_amount")
manager.create_model_version(
model_name="sales_model",
config=model_config,
version="1.1",
description="添加折扣金额度量",
created_by="data_engineer",
changes=["添加discount_amount度量"]
)
# 更新到测试环境
manager.update_model_status("sales_model", "1.1", ModelStatus.TESTING, "data_engineer")
# 部署到生产环境
manager.update_model_status("sales_model", "1.1", ModelStatus.PRODUCTION, "data_engineer")
# 打印生命周期报告
manager.print_lifecycle_report("sales_model")
if __name__ == "__main__":
main()
2. Model Performance Monitoring
#!/usr/bin/env python3
# model_performance_monitor.py - 模型性能监控工具
import time
import psutil
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class PerformanceMetric:
"""性能指标"""
timestamp: datetime
model_name: str
metric_name: str
metric_value: float
unit: str
threshold: Optional[float] = None
class ModelPerformanceMonitor:
"""模型性能监控器"""
def __init__(self, kylin_host: str, username: str, password: str):
self.kylin_host = kylin_host
self.username = username
self.password = password
self.session = requests.Session()
self.session.auth = (username, password)
self.metrics_history = []
# 性能阈值配置
self.thresholds = {
'query_response_time': 5.0, # 秒
'cube_build_time': 3600.0, # 秒
'storage_size': 10.0, # GB
'memory_usage': 80.0, # 百分比
'cpu_usage': 80.0, # 百分比
'query_success_rate': 95.0 # 百分比
}
def collect_system_metrics(self) -> List[PerformanceMetric]:
"""收集系统性能指标"""
metrics = []
timestamp = datetime.now()
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
metrics.append(PerformanceMetric(
timestamp=timestamp,
model_name="system",
metric_name="cpu_usage",
metric_value=cpu_percent,
unit="percent",
threshold=self.thresholds['cpu_usage']
))
# 内存使用率
memory = psutil.virtual_memory()
metrics.append(PerformanceMetric(
timestamp=timestamp,
model_name="system",
metric_name="memory_usage",
metric_value=memory.percent,
unit="percent",
threshold=self.thresholds['memory_usage']
))
# 磁盘使用情况
disk = psutil.disk_usage('/')
disk_usage_gb = (disk.total - disk.free) / (1024**3)
metrics.append(PerformanceMetric(
timestamp=timestamp,
model_name="system",
metric_name="disk_usage",
metric_value=disk_usage_gb,
unit="GB"
))
return metrics
def collect_kylin_metrics(self, model_name: str) -> List[PerformanceMetric]:
"""收集Kylin模型性能指标"""
metrics = []
timestamp = datetime.now()
try:
# 获取模型信息
model_info = self.get_model_info(model_name)
if not model_info:
return metrics
# 获取Cube信息
cubes = self.get_model_cubes(model_name)
for cube in cubes:
cube_name = cube.get('name')
# Cube存储大小
storage_size = self.get_cube_storage_size(cube_name)
if storage_size is not None:
metrics.append(PerformanceMetric(
timestamp=timestamp,
model_name=model_name,
metric_name="storage_size",
metric_value=storage_size,
unit="GB",
threshold=self.thresholds['storage_size']
))
# 最近构建时间
build_time = self.get_last_build_time(cube_name)
if build_time is not None:
metrics.append(PerformanceMetric(
timestamp=timestamp,
model_name=model_name,
metric_name="cube_build_time",
metric_value=build_time,
unit="seconds",
threshold=self.thresholds['cube_build_time']
))
# 查询性能指标
query_metrics = self.get_query_performance_metrics(model_name)
metrics.extend(query_metrics)
except Exception as e:
print(f"收集Kylin指标时出错: {e}")
return metrics
def get_model_info(self, model_name: str) -> Optional[Dict]:
"""获取模型信息"""
try:
response = self.session.get(
f"http://{self.kylin_host}/kylin/api/models/{model_name}"
)
return response.json() if response.status_code == 200 else None
        except Exception:
return None
def get_model_cubes(self, model_name: str) -> List[Dict]:
"""获取模型的Cube列表"""
try:
response = self.session.get(
f"http://{self.kylin_host}/kylin/api/cubes",
params={'modelName': model_name}
)
return response.json() if response.status_code == 200 else []
        except Exception:
return []
def get_cube_storage_size(self, cube_name: str) -> Optional[float]:
"""获取Cube存储大小(GB)"""
try:
response = self.session.get(
f"http://{self.kylin_host}/kylin/api/cubes/{cube_name}"
)
if response.status_code == 200:
cube_info = response.json()
size_kb = cube_info.get('size_kb', 0)
return size_kb / (1024 * 1024) # 转换为GB
        except Exception:
pass
return None
def get_last_build_time(self, cube_name: str) -> Optional[float]:
"""获取最近构建时间(秒)"""
try:
response = self.session.get(
f"http://{self.kylin_host}/kylin/api/jobs",
params={'cubeName': cube_name, 'limit': 1}
)
if response.status_code == 200:
jobs = response.json()
if jobs:
job = jobs[0]
start_time = job.get('create_time')
end_time = job.get('last_modified')
if start_time and end_time:
duration = (end_time - start_time) / 1000 # 转换为秒
return duration
        except Exception:
pass
return None
def get_query_performance_metrics(self, model_name: str) -> List[PerformanceMetric]:
"""获取查询性能指标"""
metrics = []
timestamp = datetime.now()
try:
# 执行测试查询
test_queries = self.get_test_queries(model_name)
total_queries = len(test_queries)
successful_queries = 0
total_response_time = 0
for query in test_queries:
start_time = time.time()
success = self.execute_test_query(query)
end_time = time.time()
response_time = end_time - start_time
total_response_time += response_time
if success:
successful_queries += 1
# 记录单个查询的响应时间
metrics.append(PerformanceMetric(
timestamp=timestamp,
model_name=model_name,
metric_name="query_response_time",
metric_value=response_time,
unit="seconds",
threshold=self.thresholds['query_response_time']
))
# 计算平均响应时间
if total_queries > 0:
avg_response_time = total_response_time / total_queries
metrics.append(PerformanceMetric(
timestamp=timestamp,
model_name=model_name,
metric_name="avg_query_response_time",
metric_value=avg_response_time,
unit="seconds",
threshold=self.thresholds['query_response_time']
))
# 计算查询成功率
success_rate = (successful_queries / total_queries) * 100
metrics.append(PerformanceMetric(
timestamp=timestamp,
model_name=model_name,
metric_name="query_success_rate",
metric_value=success_rate,
unit="percent",
threshold=self.thresholds['query_success_rate']
))
except Exception as e:
print(f"获取查询性能指标时出错: {e}")
return metrics
def get_test_queries(self, model_name: str) -> List[str]:
"""获取测试查询列表"""
# 这里应该根据模型定义生成测试查询
# 为了示例,返回一些基本查询
return [
f"SELECT COUNT(*) FROM {model_name}",
f"SELECT SUM(sales_amount) FROM {model_name} WHERE year = 2023",
f"SELECT category, SUM(sales_amount) FROM {model_name} GROUP BY category"
]
def execute_test_query(self, query: str) -> bool:
"""执行测试查询"""
try:
response = self.session.post(
f"http://{self.kylin_host}/kylin/api/query",
json={
"sql": query,
"project": "default"
}
)
return response.status_code == 200
        except Exception:
return False
def monitor_model(self, model_name: str, duration_minutes: int = 60,
interval_seconds: int = 300) -> List[PerformanceMetric]:
"""监控模型性能"""
print(f"开始监控模型: {model_name},持续时间: {duration_minutes}分钟")
all_metrics = []
end_time = datetime.now() + timedelta(minutes=duration_minutes)
while datetime.now() < end_time:
print(f"收集性能指标... {datetime.now().strftime('%H:%M:%S')}")
# 收集系统指标
system_metrics = self.collect_system_metrics()
all_metrics.extend(system_metrics)
# 收集Kylin指标
kylin_metrics = self.collect_kylin_metrics(model_name)
all_metrics.extend(kylin_metrics)
# 检查阈值告警
self.check_thresholds(system_metrics + kylin_metrics)
# 等待下一次收集
time.sleep(interval_seconds)
self.metrics_history.extend(all_metrics)
print(f"监控完成,共收集 {len(all_metrics)} 个指标")
return all_metrics
def check_thresholds(self, metrics: List[PerformanceMetric]):
"""检查阈值告警"""
for metric in metrics:
if metric.threshold is not None and metric.metric_value > metric.threshold:
print(f"⚠️ 阈值告警: {metric.model_name}.{metric.metric_name} = "
f"{metric.metric_value:.2f} {metric.unit} (阈值: {metric.threshold})")
def generate_performance_report(self, model_name: str,
hours: int = 24) -> Dict:
"""生成性能报告"""
# 过滤指定时间范围内的指标
cutoff_time = datetime.now() - timedelta(hours=hours)
recent_metrics = [
m for m in self.metrics_history
if m.timestamp >= cutoff_time and
(m.model_name == model_name or m.model_name == "system")
]
if not recent_metrics:
return {'error': f'没有找到 {model_name} 在过去 {hours} 小时的性能数据'}
# 按指标名称分组
metrics_by_name = {}
for metric in recent_metrics:
if metric.metric_name not in metrics_by_name:
metrics_by_name[metric.metric_name] = []
metrics_by_name[metric.metric_name].append(metric)
# 计算统计信息
report = {
'model_name': model_name,
'report_period': f'{hours} hours',
'total_metrics': len(recent_metrics),
'metrics_summary': {}
}
for metric_name, metric_list in metrics_by_name.items():
values = [m.metric_value for m in metric_list]
unit = metric_list[0].unit
threshold = metric_list[0].threshold
summary = {
'count': len(values),
'min': min(values),
'max': max(values),
'avg': sum(values) / len(values),
'unit': unit,
'threshold': threshold,
'threshold_violations': sum(1 for v in values if threshold and v > threshold)
}
report['metrics_summary'][metric_name] = summary
return report
def print_performance_report(self, model_name: str, hours: int = 24):
"""打印性能报告"""
report = self.generate_performance_report(model_name, hours)
if 'error' in report:
print(f"❌ {report['error']}")
return
print(f"\n=== {model_name} 性能报告 ({report['report_period']}) ===")
print(f"总指标数: {report['total_metrics']}")
print("\n📊 指标摘要:")
for metric_name, summary in report['metrics_summary'].items():
print(f"\n{metric_name}:")
print(f" 数量: {summary['count']}")
print(f" 最小值: {summary['min']:.2f} {summary['unit']}")
print(f" 最大值: {summary['max']:.2f} {summary['unit']}")
print(f" 平均值: {summary['avg']:.2f} {summary['unit']}")
if summary['threshold']:
print(f" 阈值: {summary['threshold']} {summary['unit']}")
if summary['threshold_violations'] > 0:
print(f" ⚠️ 阈值违规: {summary['threshold_violations']} 次")
else:
print(f" ✅ 无阈值违规")
def main():
# 示例使用
monitor = ModelPerformanceMonitor(
kylin_host="localhost:7070",
username="ADMIN",
password="KYLIN"
)
# 监控模型性能(5分钟,每30秒收集一次)
metrics = monitor.monitor_model(
model_name="sales_model",
duration_minutes=5,
interval_seconds=30
)
# 生成性能报告
monitor.print_performance_report("sales_model", hours=1)
if __name__ == "__main__":
main()
5.4.2 Model Optimization Strategies
1. Storage Optimization
#!/usr/bin/env python3
# model_optimizer.py - 模型优化工具
from datetime import date
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
class OptimizationType(Enum):
"""优化类型"""
STORAGE = "storage" # 存储优化
PERFORMANCE = "performance" # 性能优化
COST = "cost" # 成本优化
MAINTENANCE = "maintenance" # 维护优化
@dataclass
class OptimizationRecommendation:
"""优化建议"""
type: OptimizationType
priority: str # high, medium, low
title: str
description: str
impact: str
implementation: str
estimated_savings: Optional[str] = None
class ModelOptimizer:
"""模型优化器"""
def __init__(self):
self.optimization_rules = self._load_optimization_rules()
def analyze_model(self, model_config: Dict) -> List[OptimizationRecommendation]:
"""分析模型并生成优化建议"""
recommendations = []
# 存储优化分析
storage_recommendations = self._analyze_storage_optimization(model_config)
recommendations.extend(storage_recommendations)
# 性能优化分析
performance_recommendations = self._analyze_performance_optimization(model_config)
recommendations.extend(performance_recommendations)
# 成本优化分析
cost_recommendations = self._analyze_cost_optimization(model_config)
recommendations.extend(cost_recommendations)
# 维护优化分析
maintenance_recommendations = self._analyze_maintenance_optimization(model_config)
recommendations.extend(maintenance_recommendations)
# 按优先级排序
priority_order = {'high': 0, 'medium': 1, 'low': 2}
recommendations.sort(key=lambda x: priority_order.get(x.priority, 3))
return recommendations
def _analyze_storage_optimization(self, model_config: Dict) -> List[OptimizationRecommendation]:
"""分析存储优化"""
recommendations = []
# 检查维度数量
dimensions = model_config.get('dimensions', [])
if len(dimensions) > 15:
recommendations.append(OptimizationRecommendation(
type=OptimizationType.STORAGE,
priority='high',
title='减少维度数量',
description=f'当前模型有{len(dimensions)}个维度,过多的维度会导致Cube爆炸',
impact='可显著减少存储空间和构建时间',
implementation='移除低频使用的维度,或将相关维度合并为层次维度',
estimated_savings='存储空间可减少30-50%'
))
# 检查度量数量
measures = model_config.get('measures', [])
if len(measures) > 20:
recommendations.append(OptimizationRecommendation(
type=OptimizationType.STORAGE,
priority='medium',
title='优化度量定义',
description=f'当前模型有{len(measures)}个度量,建议合并相似度量',
impact='减少存储开销和计算复杂度',
implementation='合并相似度量,使用派生度量替代冗余度量',
estimated_savings='存储空间可减少10-20%'
))
# 检查分区策略
partition_config = model_config.get('partition_config', {})
if not partition_config:
recommendations.append(OptimizationRecommendation(
type=OptimizationType.STORAGE,
priority='high',
title='配置分区策略',
description='未配置分区策略,会影响增量构建效率',
impact='提高增量构建性能,减少全量构建频率',
implementation='基于时间维度配置合适的分区策略',
estimated_savings='构建时间可减少60-80%'
))
# 检查聚合组配置
aggregation_groups = model_config.get('aggregation_groups', [])
if not aggregation_groups:
recommendations.append(OptimizationRecommendation(
type=OptimizationType.STORAGE,
priority='high',
title='配置聚合组',
description='未配置聚合组,无法有效控制Cuboid数量',
impact='大幅减少存储空间和构建时间',
implementation='根据查询模式配置合适的聚合组',
estimated_savings='存储空间可减少70-90%'
))
return recommendations
def _analyze_performance_optimization(self, model_config: Dict) -> List[OptimizationRecommendation]:
"""分析性能优化"""
recommendations = []
# 检查编码策略
dimensions = model_config.get('dimensions', [])
for dim in dimensions:
if isinstance(dim, dict):
encoding = dim.get('encoding', {})
if not encoding:
recommendations.append(OptimizationRecommendation(
type=OptimizationType.PERFORMANCE,
priority='medium',
title=f'优化维度{dim.get("name", "")}的编码',
description='未配置编码策略,影响查询性能',
impact='提高查询性能和压缩率',
implementation='根据维度基数选择合适的编码方式',
estimated_savings='查询性能可提升20-40%'
))
# 检查索引配置
if 'rowkey_columns' not in model_config:
recommendations.append(OptimizationRecommendation(
type=OptimizationType.PERFORMANCE,
priority='high',
title='优化Rowkey设计',
                description='未显式配置Rowkey列顺序,可能影响查询性能',
impact='显著提高查询性能',
implementation='根据查询模式优化Rowkey列顺序',
estimated_savings='查询性能可提升50-80%'
))
# 检查缓存配置
cache_config = model_config.get('cache_config', {})
if not cache_config.get('enabled', False):
recommendations.append(OptimizationRecommendation(
type=OptimizationType.PERFORMANCE,
priority='medium',
title='启用查询缓存',
description='未启用查询缓存,重复查询性能差',
impact='提高重复查询性能',
implementation='启用并配置合适的缓存策略',
estimated_savings='重复查询性能可提升80-95%'
))
return recommendations
def _analyze_cost_optimization(self, model_config: Dict) -> List[OptimizationRecommendation]:
"""分析成本优化"""
recommendations = []
# 检查构建频率
build_config = model_config.get('build_config', {})
build_frequency = build_config.get('frequency', 'daily')
if build_frequency == 'hourly':
recommendations.append(OptimizationRecommendation(
type=OptimizationType.COST,
priority='medium',
title='优化构建频率',
description='小时级构建频率可能过于频繁,增加计算成本',
impact='减少计算资源消耗',
implementation='根据业务需求调整为合适的构建频率',
estimated_savings='计算成本可减少30-50%'
))
# 检查保留策略
retention_config = model_config.get('retention_config', {})
if not retention_config:
recommendations.append(OptimizationRecommendation(
type=OptimizationType.COST,
priority='low',
title='配置数据保留策略',
description='未配置数据保留策略,可能存储过多历史数据',
impact='减少存储成本',
implementation='根据业务需求配置合适的数据保留期',
estimated_savings='存储成本可减少20-40%'
))
return recommendations
def _analyze_maintenance_optimization(self, model_config: Dict) -> List[OptimizationRecommendation]:
"""分析维护优化"""
recommendations = []
# 检查监控配置
monitoring_config = model_config.get('monitoring_config', {})
if not monitoring_config:
recommendations.append(OptimizationRecommendation(
type=OptimizationType.MAINTENANCE,
priority='medium',
title='配置监控告警',
description='未配置监控告警,难以及时发现问题',
impact='提高系统可靠性和运维效率',
implementation='配置关键指标的监控和告警',
estimated_savings='运维成本可减少40-60%'
))
# 检查备份策略
backup_config = model_config.get('backup_config', {})
if not backup_config:
recommendations.append(OptimizationRecommendation(
type=OptimizationType.MAINTENANCE,
priority='high',
title='配置备份策略',
description='未配置备份策略,存在数据丢失风险',
impact='提高数据安全性',
implementation='配置定期备份和恢复策略',
estimated_savings='避免数据丢失风险'
))
return recommendations
def _load_optimization_rules(self) -> Dict:
"""加载优化规则"""
return {
'max_dimensions': 15,
'max_measures': 20,
'recommended_encodings': {
'low_cardinality': 'dict',
'medium_cardinality': 'fixed_length',
'high_cardinality': 'fixed_length_hex'
},
'partition_strategies': {
'daily': 'YYYY-MM-DD',
'monthly': 'YYYY-MM',
'yearly': 'YYYY'
}
}
def generate_optimization_plan(self, model_config: Dict) -> Dict:
"""生成优化计划"""
recommendations = self.analyze_model(model_config)
# 按类型分组
recommendations_by_type = {}
for rec in recommendations:
type_name = rec.type.value
if type_name not in recommendations_by_type:
recommendations_by_type[type_name] = []
recommendations_by_type[type_name].append(rec)
# 计算优化影响
total_recommendations = len(recommendations)
high_priority_count = sum(1 for r in recommendations if r.priority == 'high')
plan = {
'model_name': model_config.get('name', 'unknown'),
            'analysis_date': date.today().isoformat(),
'total_recommendations': total_recommendations,
'high_priority_count': high_priority_count,
'recommendations_by_type': recommendations_by_type,
'implementation_phases': self._create_implementation_phases(recommendations)
}
return plan
def _create_implementation_phases(self, recommendations: List[OptimizationRecommendation]) -> List[Dict]:
"""创建实施阶段"""
phases = []
# 第一阶段:高优先级优化
high_priority = [r for r in recommendations if r.priority == 'high']
if high_priority:
phases.append({
'phase': 1,
'name': '紧急优化',
'description': '解决高优先级问题,立即实施',
'duration': '1-2周',
'recommendations': [{
'title': r.title,
'type': r.type.value,
'implementation': r.implementation
} for r in high_priority]
})
# 第二阶段:中优先级优化
medium_priority = [r for r in recommendations if r.priority == 'medium']
if medium_priority:
phases.append({
'phase': 2,
'name': '性能优化',
'description': '提升系统性能和效率',
'duration': '2-4周',
'recommendations': [{
'title': r.title,
'type': r.type.value,
'implementation': r.implementation
} for r in medium_priority]
})
# 第三阶段:低优先级优化
low_priority = [r for r in recommendations if r.priority == 'low']
if low_priority:
phases.append({
'phase': 3,
'name': '长期优化',
'description': '长期改进和成本优化',
'duration': '1-2个月',
'recommendations': [{
'title': r.title,
'type': r.type.value,
'implementation': r.implementation
} for r in low_priority]
})
return phases
def print_optimization_report(self, model_config: Dict):
"""打印优化报告"""
plan = self.generate_optimization_plan(model_config)
print(f"\n=== {plan['model_name']} 优化报告 ===")
print(f"分析日期: {plan['analysis_date']}")
print(f"总建议数: {plan['total_recommendations']}")
print(f"高优先级: {plan['high_priority_count']}")
print("\n📊 按类型分布:")
for opt_type, recommendations in plan['recommendations_by_type'].items():
print(f" {opt_type}: {len(recommendations)}")
print("\n🔧 详细建议:")
for opt_type, recommendations in plan['recommendations_by_type'].items():
print(f"\n{opt_type.upper()} 优化:")
for i, rec in enumerate(recommendations, 1):
priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(rec.priority, '⚪')
print(f" {priority_icon} {i}. {rec.title}")
print(f" 描述: {rec.description}")
print(f" 影响: {rec.impact}")
print(f" 实施: {rec.implementation}")
if rec.estimated_savings:
print(f" 预期收益: {rec.estimated_savings}")
print("\n📅 实施计划:")
for phase in plan['implementation_phases']:
print(f"\n阶段 {phase['phase']}: {phase['name']}")
print(f" 描述: {phase['description']}")
print(f" 周期: {phase['duration']}")
print(f" 任务数: {len(phase['recommendations'])}")
def main():
# 示例使用
optimizer = ModelOptimizer()
# 示例模型配置
model_config = {
"name": "sales_model",
"dimensions": [
{"name": "time", "encoding": {"type": "date"}},
{"name": "product"}, # 缺少编码配置
{"name": "customer"},
{"name": "region"},
{"name": "channel"},
# ... 更多维度(模拟过多维度)
        ] + [{"name": f"dim_{i}"} for i in range(10, 22)],  # 共17个维度,触发维度数量检查
"measures": [
"sales_amount", "quantity", "profit", "discount",
"cost", "revenue", "margin", "tax", "shipping"
],
"build_config": {
"frequency": "hourly" # 可能过于频繁
}
# 缺少分区配置、聚合组配置等
}
# 生成优化报告
optimizer.print_optimization_report(model_config)
if __name__ == "__main__":
main()
2. Query Optimization
#!/usr/bin/env python3
# query_optimizer.py - 查询优化工具
import re
import sqlparse
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
class QueryIssueType(Enum):
"""查询问题类型"""
PERFORMANCE = "performance" # 性能问题
SYNTAX = "syntax" # 语法问题
OPTIMIZATION = "optimization" # 优化建议
BEST_PRACTICE = "best_practice" # 最佳实践
@dataclass
class QueryIssue:
"""查询问题"""
type: QueryIssueType
severity: str # high, medium, low
title: str
description: str
suggestion: str
example: Optional[str] = None
class QueryOptimizer:
"""查询优化器"""
def __init__(self):
self.optimization_rules = self._load_optimization_rules()
def analyze_query(self, sql: str, model_config: Dict = None) -> List[QueryIssue]:
"""分析SQL查询并提供优化建议"""
issues = []
# 解析SQL
try:
parsed = sqlparse.parse(sql)[0]
except Exception as e:
issues.append(QueryIssue(
type=QueryIssueType.SYNTAX,
severity='high',
title='SQL语法错误',
description=f'SQL解析失败: {str(e)}',
suggestion='检查SQL语法是否正确'
))
return issues
# 性能分析
performance_issues = self._analyze_performance(sql, parsed)
issues.extend(performance_issues)
# 语法分析
syntax_issues = self._analyze_syntax(sql, parsed)
issues.extend(syntax_issues)
# 优化建议
optimization_issues = self._analyze_optimization(sql, parsed, model_config)
issues.extend(optimization_issues)
# 最佳实践检查
best_practice_issues = self._analyze_best_practices(sql, parsed)
issues.extend(best_practice_issues)
return issues
def _analyze_performance(self, sql: str, parsed) -> List[QueryIssue]:
"""分析性能问题"""
issues = []
sql_lower = sql.lower()
# 检查SELECT *
if 'select *' in sql_lower:
issues.append(QueryIssue(
type=QueryIssueType.PERFORMANCE,
severity='medium',
title='避免使用SELECT *',
description='SELECT * 会查询所有列,影响性能',
suggestion='明确指定需要的列名',
example='SELECT col1, col2 FROM table'
))
# 检查缺少WHERE条件
if 'where' not in sql_lower and 'group by' in sql_lower:
issues.append(QueryIssue(
type=QueryIssueType.PERFORMANCE,
severity='high',
title='缺少WHERE过滤条件',
description='聚合查询缺少WHERE条件可能导致全表扫描',
suggestion='添加适当的WHERE条件过滤数据',
example='SELECT ... FROM table WHERE date >= \'2024-01-01\''
))
# 检查DISTINCT使用
if 'distinct' in sql_lower:
issues.append(QueryIssue(
type=QueryIssueType.PERFORMANCE,
severity='medium',
title='谨慎使用DISTINCT',
description='DISTINCT操作可能影响查询性能',
suggestion='确认是否真的需要去重,考虑使用GROUP BY替代',
example='SELECT col1, COUNT(*) FROM table GROUP BY col1'
))
# 检查子查询
if re.search(r'\(\s*select', sql_lower):
issues.append(QueryIssue(
type=QueryIssueType.PERFORMANCE,
severity='medium',
title='优化子查询',
description='复杂子查询可能影响性能',
suggestion='考虑使用JOIN或WITH子句重写子查询'
))
# 检查ORDER BY without LIMIT
if 'order by' in sql_lower and 'limit' not in sql_lower:
issues.append(QueryIssue(
type=QueryIssueType.PERFORMANCE,
severity='medium',
title='ORDER BY建议配合LIMIT使用',
description='大结果集的排序操作消耗资源',
suggestion='如果不需要全部结果,添加LIMIT限制',
example='SELECT ... ORDER BY col1 LIMIT 100'
))
return issues
def _analyze_syntax(self, sql: str, parsed) -> List[QueryIssue]:
"""分析语法问题"""
issues = []
# 检查日期格式
date_patterns = re.findall(r"'(\d{4}-\d{2}-\d{2}[^']*)'|\"(\d{4}-\d{2}-\d{2}[^\"]*)\"", sql)
for pattern_group in date_patterns:
date_str = pattern_group[0] or pattern_group[1]
if not re.match(r'^\d{4}-\d{2}-\d{2}(\s\d{2}:\d{2}:\d{2})?$', date_str):
issues.append(QueryIssue(
type=QueryIssueType.SYNTAX,
severity='medium',
title='日期格式不规范',
description=f'日期格式 "{date_str}" 可能不被正确识别',
suggestion='使用标准日期格式 YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS',
example="WHERE date_col >= '2024-01-01'"
))
# 检查表名和列名引用
if re.search(r'[^a-zA-Z0-9_\s]\w+\.[^a-zA-Z0-9_]', sql):
issues.append(QueryIssue(
type=QueryIssueType.SYNTAX,
severity='low',
title='建议使用表别名',
description='复杂查询建议使用表别名提高可读性',
suggestion='为表定义简短的别名',
example='SELECT t1.col1, t2.col2 FROM table1 t1 JOIN table2 t2 ON ...'
))
return issues
def _analyze_optimization(self, sql: str, parsed, model_config: Dict = None) -> List[QueryIssue]:
"""分析优化机会"""
issues = []
sql_lower = sql.lower()
# 检查时间过滤优化
if model_config and 'partition_column' in model_config:
partition_col = model_config['partition_column'].lower()
if partition_col not in sql_lower:
issues.append(QueryIssue(
type=QueryIssueType.OPTIMIZATION,
severity='high',
title='添加分区列过滤',
description=f'查询未包含分区列 {partition_col},无法利用分区剪枝',
suggestion=f'在WHERE条件中添加 {partition_col} 过滤',
example=f"WHERE {partition_col} >= '2024-01-01'"
))
# 检查聚合函数优化
agg_functions = re.findall(r'(count|sum|avg|min|max)\s*\([^)]+\)', sql_lower)
if len(agg_functions) > 5:
issues.append(QueryIssue(
type=QueryIssueType.OPTIMIZATION,
severity='medium',
title='简化聚合函数',
description=f'查询包含{len(agg_functions)}个聚合函数,可能影响性能',
suggestion='考虑分解为多个简单查询或使用预聚合表'
))
# 检查JOIN优化
join_count = len(re.findall(r'\bjoin\b', sql_lower))
if join_count > 3:
issues.append(QueryIssue(
type=QueryIssueType.OPTIMIZATION,
severity='medium',
title='优化多表JOIN',
description=f'查询包含{join_count}个JOIN,可能影响性能',
suggestion='检查JOIN顺序和条件,考虑使用宽表或预聚合'
))
# 检查CASE WHEN优化
case_count = len(re.findall(r'\bcase\b', sql_lower))
if case_count > 3:
issues.append(QueryIssue(
type=QueryIssueType.OPTIMIZATION,
severity='low',
title='简化CASE WHEN逻辑',
description=f'查询包含{case_count}个CASE表达式',
suggestion='考虑在数据模型中预处理复杂逻辑'
))
return issues
def _analyze_best_practices(self, sql: str, parsed) -> List[QueryIssue]:
"""分析最佳实践"""
issues = []
sql_lower = sql.lower()
# 检查注释
if '--' not in sql and '/*' not in sql:
issues.append(QueryIssue(
type=QueryIssueType.BEST_PRACTICE,
severity='low',
title='添加查询注释',
description='复杂查询建议添加注释说明业务逻辑',
suggestion='在查询开头添加注释说明查询目的',
example='-- 查询2024年各产品类别的销售汇总\nSELECT ...'
))
# 检查格式化
if '\n' not in sql or sql.count(' ') / len(sql) < 0.1:
issues.append(QueryIssue(
type=QueryIssueType.BEST_PRACTICE,
severity='low',
title='改善SQL格式',
description='SQL格式不够清晰,影响可读性',
suggestion='使用适当的换行和缩进格式化SQL'
))
# 检查大小写一致性
        keywords = ['select', 'from', 'where', 'group', 'order', 'having']
        # Compare case *styles* (upper/lower/mixed) per occurrence; comparing the
        # raw matched strings would flag any query containing two or more
        # distinct keywords, even when the casing is perfectly consistent.
        keyword_styles = set()
        for keyword in keywords:
            pattern = re.compile(re.escape(keyword), re.IGNORECASE)
            for match in pattern.findall(sql):
                if match.isupper():
                    keyword_styles.add('upper')
                elif match.islower():
                    keyword_styles.add('lower')
                else:
                    keyword_styles.add('mixed')
        if len(keyword_styles) > 1:
issues.append(QueryIssue(
type=QueryIssueType.BEST_PRACTICE,
severity='low',
title='保持关键字大小写一致',
description='SQL关键字大小写不一致',
suggestion='统一使用大写或小写关键字'
))
return issues
def _load_optimization_rules(self) -> Dict:
"""加载优化规则"""
return {
'max_aggregations': 5,
'max_joins': 3,
'max_case_expressions': 3,
'recommended_date_format': 'YYYY-MM-DD',
'performance_keywords': ['distinct', 'order by', 'group by']
}
def optimize_query(self, sql: str, model_config: Dict = None) -> Dict:
"""优化查询并返回建议"""
issues = self.analyze_query(sql, model_config)
# 生成优化后的SQL(示例)
optimized_sql = self._generate_optimized_sql(sql, issues)
return {
'original_sql': sql,
'optimized_sql': optimized_sql,
'issues': issues,
'optimization_summary': self._generate_optimization_summary(issues)
}
def _generate_optimized_sql(self, sql: str, issues: List[QueryIssue]) -> str:
"""生成优化后的SQL(简化示例)"""
optimized = sql
# 简单的优化示例
for issue in issues:
if issue.title == '避免使用SELECT *' and 'SELECT *' in optimized:
optimized = optimized.replace('SELECT *', 'SELECT col1, col2, col3 -- 请替换为实际需要的列')
return optimized
def _generate_optimization_summary(self, issues: List[QueryIssue]) -> Dict:
"""生成优化摘要"""
summary = {
'total_issues': len(issues),
'by_severity': {'high': 0, 'medium': 0, 'low': 0},
'by_type': {}
}
for issue in issues:
summary['by_severity'][issue.severity] += 1
type_name = issue.type.value
if type_name not in summary['by_type']:
summary['by_type'][type_name] = 0
summary['by_type'][type_name] += 1
return summary
def print_optimization_report(self, sql: str, model_config: Dict = None):
"""打印优化报告"""
result = self.optimize_query(sql, model_config)
issues = result['issues']
summary = result['optimization_summary']
print("\n=== SQL查询优化报告 ===")
print(f"总问题数: {summary['total_issues']}")
if summary['total_issues'] == 0:
print("✅ 查询已经很好,没有发现明显问题")
return
print("\n📊 问题分布:")
print(f" 严重: {summary['by_severity']['high']}")
print(f" 中等: {summary['by_severity']['medium']}")
print(f" 轻微: {summary['by_severity']['low']}")
print("\n🔍 详细问题:")
for i, issue in enumerate(issues, 1):
severity_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(issue.severity, '⚪')
type_icon = {
'performance': '⚡',
'syntax': '📝',
'optimization': '🔧',
'best_practice': '💡'
}.get(issue.type.value, '❓')
print(f"\n{severity_icon} {type_icon} {i}. {issue.title}")
print(f" 问题: {issue.description}")
print(f" 建议: {issue.suggestion}")
if issue.example:
print(f" 示例: {issue.example}")
if result['optimized_sql'] != result['original_sql']:
print("\n🔧 优化后的SQL:")
print(result['optimized_sql'])
def main():
# 示例使用
optimizer = QueryOptimizer()
# 示例SQL查询
test_sql = """
SELECT * FROM sales_fact
WHERE product_category = 'Electronics'
GROUP BY customer_region
ORDER BY sum(sales_amount) DESC
"""
# 示例模型配置
model_config = {
'partition_column': 'order_date'
}
# 生成优化报告
optimizer.print_optimization_report(test_sql, model_config)
if __name__ == "__main__":
main()
5.4.3 Best Practices Summary
1. Model Design Principles
Dimension design (a cardinality-profiling sketch follows this list):
- Keep the number of dimensions under control (15 or fewer is recommended)
- Design dimension hierarchies deliberately
- Choose an appropriate encoding strategy
- Avoid high-cardinality dimensions
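Before committing a column as a dimension, it is worth measuring its cardinality. A minimal illustrative profiling query, run against the customer dimension from Section 5.1.2:
-- Illustrative: profile candidate dimension columns before modeling.
SELECT COUNT(DISTINCT customer_segment) AS segment_cardinality,  -- low, good dimension
       COUNT(DISTINCT customer_id)      AS id_cardinality        -- high, handle with care
FROM customer_dim;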
Measure design (an additivity example follows this list):
- Prefer pre-aggregated measures
- Use derived measures judiciously
- Avoid complex calculation logic
- Consider whether each measure is additive
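Additivity matters because only additive measures can be rolled up safely from pre-aggregates. A ratio such as average unit price is not additive, but it can be derived from two additive measures, as this illustrative query shows:
-- Illustrative: derive a non-additive measure from additive ones.
SELECT d.month,
       SUM(f.sales_amount) AS total_sales,                      -- additive
       SUM(f.quantity) AS total_quantity,                       -- additive
       SUM(f.sales_amount) / SUM(f.quantity) AS avg_unit_price  -- derived, not additive
FROM sales_fact f
JOIN date_dim d ON f.date_key = d.date_key
GROUP BY d.month;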
Partitioning strategy (a Hive-style sketch follows this list):
- Partition on the time dimension
- Choose an appropriate partition grain
- Configure incremental builds
- Purge historical partitions regularly
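In a typical Kylin deployment the source fact table lives in Hive, so time partitioning is declared there. A minimal sketch, assuming Hive syntax and a daily dt partition column (both are assumptions, not from the original text):
-- Illustrative Hive DDL: a daily-partitioned source fact table.
CREATE TABLE sales_fact_src (
    product_key INT,
    customer_key INT,
    sales_amount DECIMAL(15,2),
    quantity INT
)
PARTITIONED BY (dt STRING);  -- 'YYYY-MM-DD'; each day can be built incrementally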
2. Performance Optimization Strategies
Storage optimization:
- Configure aggregation groups to reduce the number of cuboids
- Optimize the rowkey design
- Choose a suitable storage engine
- Enable a compression algorithm
Query optimization (a partition-pruning example follows this list):
- Exploit partition pruning
- Avoid full-table scans
- Use caching appropriately
- Tune SQL statements
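Partition pruning only happens when the partition column appears in the WHERE clause. The illustrative pair below assumes date_key is the partition column of sales_fact:
-- Bad: no partition-column filter; the whole fact table may be scanned.
SELECT store_key, SUM(sales_amount) FROM sales_fact GROUP BY store_key;
-- Good: the date_key range lets the engine skip untouched partitions.
SELECT store_key, SUM(sales_amount)
FROM sales_fact
WHERE date_key >= 20240101 AND date_key < 20240401
GROUP BY store_key;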
Build optimization:
- Configure an appropriate build frequency
- Use incremental builds
- Tune resource allocation
- Monitor build performance
3. Operations Management
Monitoring and alerting:
- Monitor the key metrics
- Set sensible alert thresholds
- Establish an incident-response process
- Review performance regularly
Backup and recovery:
- Back up metadata regularly
- Back up important Cube data
- Test the recovery process
- Maintain a disaster recovery plan
Version management:
- Establish a version-control process
- Record change history
- Support fast rollback
- Deploy with environment isolation
5.5 Chapter Summary
This chapter covered data model design and management in Apache Kylin in detail, including:
- Dimensional modeling basics: core concepts such as fact tables, dimension tables, and the star schema
- Kylin data models: how to create and configure data models
- Dimension and measure design: design principles such as dimension hierarchies and measure types
- Model management and optimization: lifecycle management, performance monitoring, and related skills
After working through this chapter, you should be able to:
- Design a sound dimensional modeling scheme
- Create and configure Kylin data models
- Optimize model performance and storage
- Establish a complete model management process
Next Chapter Preview
The next chapter introduces "Cube Design and Build", covering:
- Cube concepts and architecture
- Cube design principles and strategies
- The build pipeline and its optimization
- Incremental builds and refreshes
- Troubleshooting and monitoring
Exercises and Questions
Theory exercises
- Explain the differences between the star schema and the snowflake schema, and when each applies
- Describe the purpose of dimension encoding in Kylin and how to choose an encoding
- Analyze how aggregation groups affect Cube storage
Hands-on exercises
- Design a data model for e-commerce sales analysis
- Choose appropriate dimensions and measures for a given business scenario
- Use the tools provided in this chapter to analyze and optimize an existing model
Questions for reflection
- How do you balance query performance against storage cost?
- Under what circumstances should a data model be redesigned?
- How do you evaluate the quality and effectiveness of a data model?
2. Dimension Hierarchy Design
-- Time dimension hierarchy design
CREATE TABLE dim_time (
time_key INT PRIMARY KEY,
date_value DATE,
year_value INT,
quarter_value INT,
month_value INT,
week_value INT,
day_value INT,
weekday_name VARCHAR(20),
month_name VARCHAR(20),
quarter_name VARCHAR(10),
is_weekend BOOLEAN,
is_holiday BOOLEAN,
fiscal_year INT,
fiscal_quarter INT
);
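A hierarchy such as year, quarter, month pays off at query time: every drill level is served by the same dimension table and the same join. An illustrative drill-down query, using the fact_sales table defined later in this section:
-- Illustrative: drill down the time hierarchy without changing the join.
SELECT t.year_value, t.quarter_value, SUM(f.sales_amount) AS sales
FROM fact_sales f
JOIN dim_time t ON f.time_key = t.time_key
GROUP BY t.year_value, t.quarter_value
ORDER BY t.year_value, t.quarter_value;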
-- Product dimension hierarchy design
CREATE TABLE dim_product (
product_key INT PRIMARY KEY,
product_id VARCHAR(50),
product_name VARCHAR(200),
product_category VARCHAR(100),
product_subcategory VARCHAR(100),
brand VARCHAR(100),
supplier VARCHAR(100),
unit_price DECIMAL(10,2),
product_status VARCHAR(20),
created_date DATE,
updated_date DATE
);
-- Geography dimension hierarchy design
CREATE TABLE dim_geography (
geography_key INT PRIMARY KEY,
country_code VARCHAR(10),
country_name VARCHAR(100),
region_code VARCHAR(10),
region_name VARCHAR(100),
state_code VARCHAR(10),
state_name VARCHAR(100),
city_code VARCHAR(10),
city_name VARCHAR(100),
postal_code VARCHAR(20),
latitude DECIMAL(10,6),
longitude DECIMAL(10,6)
);
-- Customer dimension hierarchy design
CREATE TABLE dim_customer (
customer_key INT PRIMARY KEY,
customer_id VARCHAR(50),
customer_name VARCHAR(200),
customer_type VARCHAR(50),
customer_segment VARCHAR(50),
industry VARCHAR(100),
company_size VARCHAR(50),
registration_date DATE,
last_activity_date DATE,
customer_status VARCHAR(20)
);
3. Fact Table Design
-- Sales fact table design
CREATE TABLE fact_sales (
sales_key BIGINT PRIMARY KEY,
time_key INT,
product_key INT,
customer_key INT,
geography_key INT,
channel_key INT,
promotion_key INT,
    -- Measure columns
quantity DECIMAL(10,2),
unit_price DECIMAL(10,2),
sales_amount DECIMAL(15,2),
cost_amount DECIMAL(15,2),
profit_amount DECIMAL(15,2),
discount_amount DECIMAL(15,2),
tax_amount DECIMAL(15,2),
shipping_amount DECIMAL(15,2),
    -- Degenerate dimensions
order_id VARCHAR(50),
invoice_id VARCHAR(50),
sales_rep_id VARCHAR(50),
    -- Audit columns
created_date TIMESTAMP,
updated_date TIMESTAMP,
    -- Foreign key constraints
FOREIGN KEY (time_key) REFERENCES dim_time(time_key),
FOREIGN KEY (product_key) REFERENCES dim_product(product_key),
FOREIGN KEY (customer_key) REFERENCES dim_customer(customer_key),
FOREIGN KEY (geography_key) REFERENCES dim_geography(geography_key)
);
-- Partitioning strategy (MySQL-style range partitioning, shown for illustration)
ALTER TABLE fact_sales
PARTITION BY RANGE (time_key) (
PARTITION p2023 VALUES LESS THAN (20240101),
PARTITION p2024q1 VALUES LESS THAN (20240401),
PARTITION p2024q2 VALUES LESS THAN (20240701),
PARTITION p2024q3 VALUES LESS THAN (20241001),
PARTITION p2024q4 VALUES LESS THAN (20250101)
);
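With the ranges above, a query constrained to a single quarter only touches one partition. For example (illustrative):
-- Illustrative: this range falls entirely inside partition p2024q1.
SELECT SUM(sales_amount) AS q1_sales
FROM fact_sales
WHERE time_key >= 20240101 AND time_key < 20240401;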
5.2.3 Data Model Validation Tool
#!/usr/bin/env python3
# advanced_model_validator.py - 高级数据模型验证工具
import json
import re
from typing import Dict, List, Tuple, Optional, Set
from dataclasses import dataclass
from enum import Enum
import logging
class ValidationLevel(Enum):
"""验证级别"""
ERROR = "error" # 错误,必须修复
WARNING = "warning" # 警告,建议修复
INFO = "info" # 信息,可选优化
@dataclass
class ValidationResult:
"""验证结果"""
level: ValidationLevel
category: str
title: str
description: str
suggestion: str
affected_objects: List[str]
severity_score: int # 1-10,10最严重
class AdvancedModelValidator:
"""高级数据模型验证器"""
def __init__(self):
self.validation_rules = self._load_validation_rules()
self.logger = logging.getLogger(__name__)
def validate_model(self, model_config: Dict) -> List[ValidationResult]:
"""全面验证数据模型"""
results = []
# 基础结构验证
structure_results = self._validate_structure(model_config)
results.extend(structure_results)
# 维度验证
dimension_results = self._validate_dimensions(model_config)
results.extend(dimension_results)
# 度量验证
measure_results = self._validate_measures(model_config)
results.extend(measure_results)
# 关系验证
relationship_results = self._validate_relationships(model_config)
results.extend(relationship_results)
# 性能验证
performance_results = self._validate_performance(model_config)
results.extend(performance_results)
# 业务逻辑验证
business_results = self._validate_business_logic(model_config)
results.extend(business_results)
# 安全性验证
security_results = self._validate_security(model_config)
results.extend(security_results)
        # Sort with errors first, then warnings, then info; within a level, by
        # severity score descending. An explicit rank map is needed because
        # sorting level.value alphabetically would put 'info' before 'warning'.
        level_rank = {ValidationLevel.ERROR: 0, ValidationLevel.WARNING: 1, ValidationLevel.INFO: 2}
        results.sort(key=lambda x: (level_rank.get(x.level, 3), -x.severity_score))
return results
def _validate_structure(self, model_config: Dict) -> List[ValidationResult]:
"""验证模型结构"""
results = []
# 检查必需字段
required_fields = ['name', 'fact_table', 'dimensions', 'measures']
for field in required_fields:
if field not in model_config:
results.append(ValidationResult(
level=ValidationLevel.ERROR,
category='structure',
title=f'缺少必需字段: {field}',
description=f'数据模型必须包含{field}字段',
suggestion=f'添加{field}字段到模型配置中',
affected_objects=[field],
severity_score=9
))
# 检查模型名称规范
if 'name' in model_config:
name = model_config['name']
if not re.match(r'^[a-zA-Z][a-zA-Z0-9_]*$', name):
results.append(ValidationResult(
level=ValidationLevel.WARNING,
category='structure',
title='模型名称不规范',
description=f'模型名称"{name}"不符合命名规范',
suggestion='使用字母开头,只包含字母、数字和下划线的名称',
affected_objects=[name],
severity_score=4
))
# 检查描述信息
if 'description' not in model_config or not model_config['description']:
results.append(ValidationResult(
level=ValidationLevel.INFO,
category='structure',
title='缺少模型描述',
description='建议为模型添加描述信息',
suggestion='添加description字段说明模型用途和业务场景',
affected_objects=['description'],
severity_score=2
))
return results
def _validate_dimensions(self, model_config: Dict) -> List[ValidationResult]:
"""验证维度配置"""
results = []
dimensions = model_config.get('dimensions', [])
if not dimensions:
results.append(ValidationResult(
level=ValidationLevel.ERROR,
category='dimensions',
title='缺少维度定义',
description='数据模型必须至少包含一个维度',
suggestion='添加维度定义到dimensions数组中',
affected_objects=['dimensions'],
severity_score=10
))
return results
# 检查维度数量
if len(dimensions) > 20:
results.append(ValidationResult(
level=ValidationLevel.WARNING,
category='dimensions',
title='维度数量过多',
description=f'当前有{len(dimensions)}个维度,可能导致Cube爆炸',
suggestion='考虑合并相关维度或移除低频使用的维度',
affected_objects=[d.get('name', f'dim_{i}') for i, d in enumerate(dimensions)],
severity_score=7
))
# 验证每个维度
dimension_names = set()
for i, dim in enumerate(dimensions):
if isinstance(dim, dict):
dim_name = dim.get('name', f'dimension_{i}')
# 检查重复名称
if dim_name in dimension_names:
results.append(ValidationResult(
level=ValidationLevel.ERROR,
category='dimensions',
title=f'维度名称重复: {dim_name}',
description='维度名称必须唯一',
suggestion=f'修改重复的维度名称{dim_name}',
affected_objects=[dim_name],
severity_score=8
))
else:
dimension_names.add(dim_name)
# 检查维度表配置
if 'table' not in dim:
results.append(ValidationResult(
level=ValidationLevel.ERROR,
category='dimensions',
title=f'维度{dim_name}缺少表配置',
description='每个维度必须指定对应的表',
suggestion=f'为维度{dim_name}添加table配置',
affected_objects=[dim_name],
severity_score=9
))
# 检查编码配置
if 'encoding' not in dim:
results.append(ValidationResult(
level=ValidationLevel.WARNING,
category='dimensions',
title=f'维度{dim_name}缺少编码配置',
description='建议为维度配置合适的编码策略',
suggestion=f'为维度{dim_name}添加encoding配置',
affected_objects=[dim_name],
severity_score=5
))
# 检查高基数维度
cardinality = dim.get('cardinality', 0)
if cardinality > 1000000:
results.append(ValidationResult(
level=ValidationLevel.WARNING,
category='dimensions',
title=f'维度{dim_name}基数过高',
description=f'维度基数{cardinality}可能影响性能',
suggestion='考虑对高基数维度进行分层或编码优化',
affected_objects=[dim_name],
severity_score=6
))
return results
def _validate_measures(self, model_config: Dict) -> List[ValidationResult]:
"""验证度量配置"""
results = []
measures = model_config.get('measures', [])
if not measures:
results.append(ValidationResult(
level=ValidationLevel.ERROR,
category='measures',
title='缺少度量定义',
description='数据模型必须至少包含一个度量',
suggestion='添加度量定义到measures数组中',
affected_objects=['measures'],
severity_score=10
))
return results
# 检查度量数量
if len(measures) > 50:
results.append(ValidationResult(
level=ValidationLevel.WARNING,
category='measures',
title='度量数量过多',
description=f'当前有{len(measures)}个度量,可能影响性能',
suggestion='考虑合并相似度量或使用派生度量',
affected_objects=[m.get('name', f'measure_{i}') for i, m in enumerate(measures)],
severity_score=6
))
# 验证每个度量
measure_names = set()
for i, measure in enumerate(measures):
if isinstance(measure, dict):
measure_name = measure.get('name', f'measure_{i}')
# 检查重复名称
if measure_name in measure_names:
results.append(ValidationResult(
level=ValidationLevel.ERROR,
category='measures',
title=f'度量名称重复: {measure_name}',
description='度量名称必须唯一',
suggestion=f'修改重复的度量名称{measure_name}',
affected_objects=[measure_name],
severity_score=8
))
else:
measure_names.add(measure_name)
# 检查度量类型
measure_type = measure.get('function', '')
valid_functions = ['SUM', 'COUNT', 'MAX', 'MIN', 'COUNT_DISTINCT', 'TOP_N']
if measure_type not in valid_functions:
results.append(ValidationResult(
level=ValidationLevel.ERROR,
category='measures',
title=f'度量{measure_name}函数类型无效',
description=f'度量函数"{measure_type}"不在支持列表中',
suggestion=f'使用有效的度量函数: {valid_functions}',
affected_objects=[measure_name],
severity_score=7
))
# 检查返回类型
return_type = measure.get('return_type', '')
if not return_type:
results.append(ValidationResult(
level=ValidationLevel.WARNING,
category='measures',
title=f'度量{measure_name}缺少返回类型',
description='建议明确指定度量的返回类型',
suggestion=f'为度量{measure_name}添加return_type配置',
affected_objects=[measure_name],
severity_score=4
))
return results
def _validate_relationships(self, model_config: Dict) -> List[ValidationResult]:
"""验证表关系"""
results = []
# 检查连接关系
joins = model_config.get('joins', [])
if not joins:
results.append(ValidationResult(
level=ValidationLevel.WARNING,
category='relationships',
title='缺少表连接定义',
description='多表模型建议明确定义表连接关系',
suggestion='在joins数组中定义表之间的连接关系',
affected_objects=['joins'],
severity_score=5
))
# 验证连接配置
for i, join in enumerate(joins):
if isinstance(join, dict):
# 检查连接类型
join_type = join.get('type', '')
valid_types = ['inner', 'left', 'right', 'full']
if join_type not in valid_types:
results.append(ValidationResult(
level=ValidationLevel.ERROR,
category='relationships',
title=f'连接{i}类型无效',
description=f'连接类型"{join_type}"不在支持列表中',
suggestion=f'使用有效的连接类型: {valid_types}',
affected_objects=[f'join_{i}'],
severity_score=7
))
# 检查连接条件
if 'condition' not in join or not join['condition']:
results.append(ValidationResult(
level=ValidationLevel.ERROR,
category='relationships',
title=f'连接{i}缺少连接条件',
description='每个连接必须指定连接条件',
suggestion=f'为连接{i}添加condition配置',
affected_objects=[f'join_{i}'],
severity_score=8
))
return results
def _validate_performance(self, model_config: Dict) -> List[ValidationResult]:
"""验证性能配置"""
results = []
# 检查分区配置
partition_config = model_config.get('partition_config', {})
if not partition_config:
results.append(ValidationResult(
level=ValidationLevel.WARNING,
category='performance',
title='缺少分区配置',
description='建议配置分区策略以提高查询性能',
suggestion='添加partition_config配置时间分区',
affected_objects=['partition_config'],
severity_score=6
))
# 检查聚合组配置
aggregation_groups = model_config.get('aggregation_groups', [])
if not aggregation_groups:
results.append(ValidationResult(
level=ValidationLevel.WARNING,
category='performance',
title='缺少聚合组配置',
description='建议配置聚合组以控制Cuboid数量',
suggestion='添加aggregation_groups配置优化存储',
affected_objects=['aggregation_groups'],
severity_score=7
))
# 检查Rowkey设计
rowkey_columns = model_config.get('rowkey_columns', [])
if not rowkey_columns:
results.append(ValidationResult(
level=ValidationLevel.WARNING,
category='performance',
title='缺少Rowkey配置',
description='建议配置Rowkey以优化查询性能',
suggestion='添加rowkey_columns配置查询优化',
affected_objects=['rowkey_columns'],
severity_score=6
))
return results
def _validate_business_logic(self, model_config: Dict) -> List[ValidationResult]:
"""验证业务逻辑"""
results = []
# 检查业务规则
business_rules = model_config.get('business_rules', [])
if business_rules:
for i, rule in enumerate(business_rules):
if isinstance(rule, dict):
# 检查规则完整性
if 'condition' not in rule or 'action' not in rule:
results.append(ValidationResult(
level=ValidationLevel.ERROR,
category='business_logic',
title=f'业务规则{i}不完整',
description='业务规则必须包含条件和动作',
suggestion=f'为业务规则{i}添加condition和action',
affected_objects=[f'business_rule_{i}'],
severity_score=6
))
# 检查数据质量规则
quality_rules = model_config.get('quality_rules', [])
if not quality_rules:
results.append(ValidationResult(
level=ValidationLevel.INFO,
category='business_logic',
title='建议添加数据质量规则',
description='数据质量规则有助于确保数据准确性',
suggestion='添加quality_rules配置数据验证',
affected_objects=['quality_rules'],
severity_score=3
))
return results
def _validate_security(self, model_config: Dict) -> List[ValidationResult]:
"""验证安全配置"""
results = []
# 检查访问控制
access_control = model_config.get('access_control', {})
if not access_control:
results.append(ValidationResult(
level=ValidationLevel.INFO,
category='security',
title='建议配置访问控制',
description='访问控制有助于保护敏感数据',
suggestion='添加access_control配置权限管理',
affected_objects=['access_control'],
severity_score=4
))
# 检查敏感数据处理
sensitive_columns = model_config.get('sensitive_columns', [])
if sensitive_columns:
for col in sensitive_columns:
if isinstance(col, dict):
if 'encryption' not in col and 'masking' not in col:
results.append(ValidationResult(
level=ValidationLevel.WARNING,
category='security',
title=f'敏感列{col.get("name", "")}缺少保护措施',
description='敏感数据应配置加密或脱敏处理',
suggestion='为敏感列添加encryption或masking配置',
affected_objects=[col.get('name', '')],
severity_score=7
))
return results
def _load_validation_rules(self) -> Dict:
"""加载验证规则"""
return {
'max_dimensions': 20,
'max_measures': 50,
'max_cardinality': 1000000,
'required_fields': ['name', 'fact_table', 'dimensions', 'measures'],
'valid_join_types': ['inner', 'left', 'right', 'full'],
'valid_measure_functions': ['SUM', 'COUNT', 'MAX', 'MIN', 'COUNT_DISTINCT', 'TOP_N']
}
def generate_validation_report(self, model_config: Dict) -> Dict:
"""生成验证报告"""
results = self.validate_model(model_config)
# 统计结果
stats = {
'total_issues': len(results),
'by_level': {'error': 0, 'warning': 0, 'info': 0},
'by_category': {},
'severity_distribution': {}
}
for result in results:
# 按级别统计
stats['by_level'][result.level.value] += 1
# 按类别统计
if result.category not in stats['by_category']:
stats['by_category'][result.category] = 0
stats['by_category'][result.category] += 1
# 按严重程度统计
severity_range = f"{(result.severity_score-1)//2*2+1}-{(result.severity_score-1)//2*2+2}"
if severity_range not in stats['severity_distribution']:
stats['severity_distribution'][severity_range] = 0
stats['severity_distribution'][severity_range] += 1
# 计算质量分数
quality_score = self._calculate_quality_score(results)
return {
'model_name': model_config.get('name', 'unknown'),
'validation_date': datetime.now().strftime('%Y-%m-%d'), # 使用当前日期(假设文件头部已 from datetime import datetime)
'quality_score': quality_score,
'statistics': stats,
'results': results,
'recommendations': self._generate_recommendations(results)
}
def _calculate_quality_score(self, results: List[ValidationResult]) -> int:
"""计算模型质量分数(0-100)"""
if not results:
return 100
# 基础分数
base_score = 100
# 根据问题严重程度扣分
for result in results:
if result.level == ValidationLevel.ERROR:
base_score -= result.severity_score * 2
elif result.level == ValidationLevel.WARNING:
base_score -= result.severity_score
else: # INFO
base_score -= result.severity_score * 0.5
return max(0, int(base_score))
def _generate_recommendations(self, results: List[ValidationResult]) -> List[Dict]:
"""生成改进建议"""
recommendations = []
# 按类别分组问题
issues_by_category = {}
for result in results:
if result.category not in issues_by_category:
issues_by_category[result.category] = []
issues_by_category[result.category].append(result)
# 为每个类别生成建议
for category, issues in issues_by_category.items():
error_count = sum(1 for issue in issues if issue.level == ValidationLevel.ERROR)
warning_count = sum(1 for issue in issues if issue.level == ValidationLevel.WARNING)
if error_count > 0:
priority = 'high'
description = f'{category}类别有{error_count}个错误需要立即修复'
elif warning_count > 0:
priority = 'medium'
description = f'{category}类别有{warning_count}个警告建议修复'
else:
priority = 'low'
description = f'{category}类别有一些优化建议'
recommendations.append({
'category': category,
'priority': priority,
'description': description,
'issue_count': len(issues),
'suggested_actions': [issue.suggestion for issue in issues[:3]] # 前3个建议
})
return recommendations
def print_validation_report(self, model_config: Dict):
"""打印验证报告"""
report = self.generate_validation_report(model_config)
print(f"\n=== {report['model_name']} 模型验证报告 ===")
print(f"验证日期: {report['validation_date']}")
print(f"质量分数: {report['quality_score']}/100")
stats = report['statistics']
print(f"\n📊 问题统计:")
print(f" 总问题数: {stats['total_issues']}")
print(f" 错误: {stats['by_level']['error']}")
print(f" 警告: {stats['by_level']['warning']}")
print(f" 信息: {stats['by_level']['info']}")
if stats['total_issues'] == 0:
print("\n✅ 模型验证通过,没有发现问题!")
return
print("\n📋 按类别分布:")
for category, count in stats['by_category'].items():
print(f" {category}: {count}")
print("\n🔍 详细问题:")
for i, result in enumerate(report['results'], 1):
level_icon = {'error': '🔴', 'warning': '🟡', 'info': '🔵'}.get(result.level.value, '⚪')
print(f"\n{level_icon} {i}. [{result.category}] {result.title}")
print(f" 描述: {result.description}")
print(f" 建议: {result.suggestion}")
print(f" 影响对象: {', '.join(result.affected_objects)}")
print(f" 严重程度: {result.severity_score}/10")
print("\n💡 改进建议:")
for rec in report['recommendations']:
priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(rec['priority'], '⚪')
print(f"\n{priority_icon} {rec['category'].upper()}")
print(f" {rec['description']}")
print(f" 建议行动:")
for action in rec['suggested_actions']:
print(f" - {action}")
def main():
# 示例使用
validator = AdvancedModelValidator()
# 示例模型配置(包含一些问题)
model_config = {
"name": "sales-model", # 名称不规范
"fact_table": "fact_sales",
"dimensions": [
{"name": "time", "table": "dim_time", "encoding": {"type": "date"}},
{"name": "product", "table": "dim_product"}, # 缺少编码
{"name": "customer", "table": "dim_customer", "cardinality": 2000000}, # 高基数
{"name": "time", "table": "dim_time2"} # 重复名称
],
"measures": [
{"name": "sales_amount", "function": "SUM", "return_type": "decimal"},
{"name": "quantity", "function": "INVALID_FUNC"}, # 无效函数
{"name": "sales_amount", "function": "COUNT"} # 重复名称
],
"joins": [
{"type": "invalid_join", "condition": ""} # 无效类型和空条件
]
# 缺少描述、分区配置等
}
# 生成验证报告
validator.print_validation_report(model_config)
if __name__ == "__main__":
main()
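上面的验证器很适合嵌入CI流程,在模型配置入库前自动卡点。下面是一个示意脚本(其中文件名model_validator.py、model_config.json与60分阈值均为本书假设,需按实际工程调整):
#!/usr/bin/env python3
# ci_model_gate.py - 在CI中按模型质量分数卡点(示意)
import json
import sys

from model_validator import AdvancedModelValidator  # 假设上文验证器保存为model_validator.py

validator = AdvancedModelValidator()
with open("model_config.json", encoding="utf-8") as f:
    model_config = json.load(f)

report = validator.generate_validation_report(model_config)
# results中是ValidationResult对象,用default=str兜底序列化成可读文本
with open("model_validation_report.json", "w", encoding="utf-8") as f:
    json.dump(report, f, ensure_ascii=False, indent=2, default=str)

# 质量分数低于60分时让流水线失败
sys.exit(0 if report["quality_score"] >= 60 else 1)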
5.3 Cube设计与管理
5.3.1 Cube基础概念
Cube是Kylin中预计算聚合数据的核心概念:它基于数据模型定义,由一组维度组合构成,每一种维度组合对应一个Cuboid,即该组合下的一份预聚合结果。
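为了直观理解Cuboid,下面用一个极简的Python片段枚举3个维度的全部组合(纯示意,与Kylin的实现无关):
# 用itertools枚举3个维度的全部Cuboid组合:共2^3 = 8种(含不带任何维度的全聚合)
from itertools import combinations

dims = ["time", "product", "region"]
cuboids = [combo for r in range(len(dims) + 1) for combo in combinations(dims, r)]
print(f"{len(cuboids)} 个Cuboid: {cuboids}")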
Cube设计原则:
维度选择原则
- 选择查询频率高的维度
- 避免高基数维度组合
- 考虑维度之间的相关性
聚合组设计
- 使用聚合组控制Cuboid数量
- 设置强制维度和层次维度
- 配置联合维度减少组合(三类规则对Cuboid数量的量化影响,见下文的估算脚本)
分区策略
- 按时间分区提高构建效率
- 合理设置分区粒度
- 考虑数据保留策略
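强制维度、层次维度和联合维度对Cuboid数量的影响可以用一个简单脚本量化。下面是一个示意实现,按本节描述的组合规则做近似计算,并非Kylin源码逻辑:
#!/usr/bin/env python3
# cuboid_estimator.py - 聚合组组合规则效果估算(示意实现)

def estimate_cuboids(total_dims, mandatory=0, hierarchies=(), joints=()):
    """按常用近似规则估算单个聚合组的Cuboid数量:
    - 每个强制维度必然出现,组合数减半
    - k级层次维度只允许k+1种前缀组合(含全不选)
    - k个维度的联合维度只有整体出现/不出现2种状态
    """
    free_dims = total_dims - mandatory
    count = 1
    for k in hierarchies:      # 每组层次维度贡献 k+1 种组合
        count *= (k + 1)
        free_dims -= k
    for k in joints:           # 每组联合维度贡献 2 种组合
        count *= 2
        free_dims -= k
    return count * 2 ** free_dims  # 其余维度自由组合

if __name__ == "__main__":
    # 假设10个维度:1个强制维度、1组4级层次、1组2维联合
    optimized = estimate_cuboids(10, mandatory=1, hierarchies=[4], joints=[2])
    print(f"配置规则后: {optimized} 个Cuboid,未配置时: {2 ** 10} 个")  # 80 vs 1024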
5.3.2 Cube配置示例
{
"uuid": "a24ca905-1fc6-4f67-985c-38fa5aeafd92",
"last_modified": 1578313162000,
"version": "3.0.0.20500",
"name": "sales_cube",
"description": "销售数据分析Cube",
"model_name": "sales_model",
"engine_type": 2,
"storage_type": 2,
"dimensions": [
{
"name": "TIME",
"table": "DIM_TIME",
"column": "TIME_KEY",
"derived": ["YEAR", "QUARTER", "MONTH", "WEEK", "DAY"]
},
{
"name": "PRODUCT",
"table": "DIM_PRODUCT",
"column": "PRODUCT_KEY",
"derived": ["CATEGORY", "SUBCATEGORY", "BRAND"]
},
{
"name": "CUSTOMER",
"table": "DIM_CUSTOMER",
"column": "CUSTOMER_KEY",
"derived": ["SEGMENT", "TYPE", "INDUSTRY"]
},
{
"name": "GEOGRAPHY",
"table": "DIM_GEOGRAPHY",
"column": "GEOGRAPHY_KEY",
"derived": ["COUNTRY", "REGION", "STATE", "CITY"]
}
],
"measures": [
{
"name": "SALES_AMOUNT",
"function": {
"expression": "SUM",
"parameter": {
"type": "column",
"value": "SALES_AMOUNT"
},
"returntype": "decimal(19,4)"
}
},
{
"name": "QUANTITY",
"function": {
"expression": "SUM",
"parameter": {
"type": "column",
"value": "QUANTITY"
},
"returntype": "decimal(19,4)"
}
},
{
"name": "ORDER_COUNT",
"function": {
"expression": "COUNT_DISTINCT",
"parameter": {
"type": "column",
"value": "ORDER_ID"
},
"returntype": "hllc(10)"
}
}
],
"aggregation_groups": [
{
"includes": ["TIME", "PRODUCT", "CUSTOMER", "GEOGRAPHY"],
"select_rule": {
"hierarchy_dims": [
["TIME.YEAR", "TIME.QUARTER", "TIME.MONTH", "TIME.WEEK", "TIME.DAY"],
["PRODUCT.CATEGORY", "PRODUCT.SUBCATEGORY", "PRODUCT.BRAND"],
["GEOGRAPHY.COUNTRY", "GEOGRAPHY.REGION", "GEOGRAPHY.STATE", "GEOGRAPHY.CITY"]
],
"mandatory_dims": ["TIME"],
"joint_dims": [
["CUSTOMER.SEGMENT", "CUSTOMER.TYPE"]
]
}
}
],
"partition_date_start": 0,
"partition_date_end": 3153600000000,
"partition_type": "APPEND",
"partition_date_column": "TIME.DATE_VALUE",
"partition_date_format": "yyyy-MM-dd",
"retention_range": 0,
"auto_merge_time_ranges": [604800000, 2419200000],
"volatile_range": 0,
"notify_list": [],
"status_need_notify": ["ERROR", "DISCARDED", "SUCCEED"],
"override_kylin_properties": {
"kylin.cube.algorithm": "auto",
"kylin.cube.algorithm.layer-or-inmem-threshold": "7",
"kylin.cube.aggrgroup.max-combination": "32768"
}
}
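拿到配置后,可以先做一次快速的本地检查再提交。下面是一个示意脚本(假设上述JSON保存为sales_cube.json):其中auto_merge_time_ranges以毫秒计(604800000即7天,2419200000即28天);COUNT_DISTINCT的hllc(10)返回类型表示HyperLogLog近似去重计数,精度参数越大结果越准,占用的存储也越多。
#!/usr/bin/env python3
# cube_config_check.py - 提交前对Cube配置做快速本地检查(示意)
import json

with open("sales_cube.json", encoding="utf-8") as f:
    cfg = json.load(f)

print(f"Cube: {cfg['name']},维度 {len(cfg['dimensions'])} 个,度量 {len(cfg['measures'])} 个")

# auto_merge_time_ranges 以毫秒为单位:604800000 = 7天,2419200000 = 28天
for ms in cfg.get("auto_merge_time_ranges", []):
    print(f"自动合并阈值: {ms / 86400000:.0f} 天")

# 逐个打印度量的聚合函数与返回类型
for m in cfg["measures"]:
    func = m["function"]
    print(f"度量 {m['name']}: {func['expression']} -> {func['returntype']}")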
5.3.3 Cube管理工具
#!/usr/bin/env python3
# cube_manager.py - Cube管理工具
import json
import requests
import time
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
@dataclass
class CubeInfo:
"""Cube信息"""
name: str
status: str
size: int
segments: int
last_build_time: str
model_name: str
project: str
@dataclass
class BuildJob:
"""构建任务信息"""
uuid: str
cube_name: str
job_type: str
status: str
progress: float
start_time: str
duration: int
error_message: Optional[str] = None
class CubeManager:
"""Cube管理器"""
def __init__(self, kylin_host: str, username: str, password: str, project: str = "default"):
self.kylin_host = kylin_host.rstrip('/')
self.username = username
self.password = password
self.project = project
self.session = requests.Session()
self.logger = logging.getLogger(__name__)
# 认证
self._authenticate()
def _authenticate(self):
"""认证登录(Kylin REST API使用HTTP Basic认证)"""
auth_url = f"{self.kylin_host}/kylin/api/user/authentication"
# Kylin的认证接口通过Authorization: Basic头携带凭据,而不是JSON请求体;
# 在会话上设置auth后,后续所有请求都会自动带上认证信息
self.session.auth = (self.username, self.password)
response = self.session.post(auth_url)
if response.status_code == 200:
self.logger.info("Kylin认证成功")
else:
raise Exception(f"Kylin认证失败: {response.text}")
def list_cubes(self, project: Optional[str] = None) -> List[CubeInfo]:
"""列出所有Cube"""
project = project or self.project
url = f"{self.kylin_host}/kylin/api/cubes"
params = {"projectName": project}
response = self.session.get(url, params=params)
if response.status_code != 200:
raise Exception(f"获取Cube列表失败: {response.text}")
cubes = []
for cube_data in response.json():
cube_info = CubeInfo(
name=cube_data["name"],
status=cube_data["status"],
size=cube_data.get("size_kb", 0),
segments=len(cube_data.get("segments", [])),
last_build_time=cube_data.get("last_build_time", ""),
model_name=cube_data.get("model_name", ""),
project=cube_data.get("project", "")
)
cubes.append(cube_info)
return cubes
def get_cube_info(self, cube_name: str) -> Dict:
"""获取Cube详细信息"""
url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}"
response = self.session.get(url)
if response.status_code != 200:
raise Exception(f"获取Cube信息失败: {response.text}")
return response.json()
def create_cube(self, cube_config: Dict) -> bool:
"""创建Cube"""
url = f"{self.kylin_host}/kylin/api/cubes"
response = self.session.post(url, json=cube_config)
if response.status_code == 200:
self.logger.info(f"Cube {cube_config['name']} 创建成功")
return True
else:
self.logger.error(f"Cube创建失败: {response.text}")
return False
def build_cube(self, cube_name: str, start_time: str, end_time: str,
build_type: str = "BUILD") -> str:
"""构建Cube"""
url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}/build"
build_data = {
"startTime": int(datetime.strptime(start_time, "%Y-%m-%d").timestamp() * 1000),
"endTime": int(datetime.strptime(end_time, "%Y-%m-%d").timestamp() * 1000),
"buildType": build_type
}
response = self.session.put(url, json=build_data)
if response.status_code == 200:
job_info = response.json()
job_uuid = job_info["uuid"]
self.logger.info(f"Cube构建任务已提交,任务ID: {job_uuid}")
return job_uuid
else:
raise Exception(f"Cube构建失败: {response.text}")
def get_job_status(self, job_uuid: str) -> BuildJob:
"""获取构建任务状态"""
url = f"{self.kylin_host}/kylin/api/jobs/{job_uuid}"
response = self.session.get(url)
if response.status_code != 200:
raise Exception(f"获取任务状态失败: {response.text}")
job_data = response.json()
return BuildJob(
uuid=job_data["uuid"],
cube_name=job_data.get("related_cube", ""),
job_type=job_data["job_type"],
status=job_data["job_status"],
progress=job_data.get("progress", 0.0),
start_time=job_data.get("create_time", ""),
duration=job_data.get("duration", 0),
error_message=job_data.get("error_message")
)
def wait_for_job_completion(self, job_uuid: str, timeout: int = 3600) -> bool:
"""等待任务完成"""
start_time = time.time()
while time.time() - start_time < timeout:
job = self.get_job_status(job_uuid)
if job.status == "FINISHED":
self.logger.info(f"任务 {job_uuid} 完成")
return True
elif job.status in ["ERROR", "DISCARDED"]:
self.logger.error(f"任务 {job_uuid} 失败: {job.error_message}")
return False
self.logger.info(f"任务 {job_uuid} 进度: {job.progress:.1f}%")
time.sleep(30) # 等待30秒后再次检查
self.logger.warning(f"任务 {job_uuid} 超时")
return False
def purge_cube(self, cube_name: str) -> bool:
"""清空Cube数据"""
url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}/purge"
response = self.session.put(url)
if response.status_code == 200:
self.logger.info(f"Cube {cube_name} 数据已清空")
return True
else:
self.logger.error(f"清空Cube失败: {response.text}")
return False
def delete_cube(self, cube_name: str) -> bool:
"""删除Cube"""
url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}"
response = self.session.delete(url)
if response.status_code == 200:
self.logger.info(f"Cube {cube_name} 已删除")
return True
else:
self.logger.error(f"删除Cube失败: {response.text}")
return False
def optimize_cube_segments(self, cube_name: str) -> List[str]:
"""优化Cube段"""
# 获取Cube信息
cube_info = self.get_cube_info(cube_name)
segments = cube_info.get("segments", [])
optimization_jobs = []
# 合并小段
small_segments = [s for s in segments if s.get("size_kb", 0) < 1024 * 1024] # 小于1GB
if len(small_segments) > 1:
# 提交合并任务
merge_job = self._merge_segments(cube_name, small_segments)
if merge_job:
optimization_jobs.append(merge_job)
# 刷新过期段
expired_segments = self._find_expired_segments(segments)
for segment in expired_segments:
refresh_job = self._refresh_segment(cube_name, segment)
if refresh_job:
optimization_jobs.append(refresh_job)
return optimization_jobs
def _merge_segments(self, cube_name: str, segments: List[Dict]) -> Optional[str]:
"""合并段"""
if len(segments) < 2:
return None
url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}/merge"
# 找到时间范围
start_time = min(s["date_range_start"] for s in segments)
end_time = max(s["date_range_end"] for s in segments)
merge_data = {
"startTime": start_time,
"endTime": end_time
}
response = self.session.put(url, json=merge_data)
if response.status_code == 200:
job_info = response.json()
return job_info["uuid"]
else:
self.logger.error(f"段合并失败: {response.text}")
return None
def _find_expired_segments(self, segments: List[Dict]) -> List[Dict]:
"""查找过期段"""
expired = []
current_time = int(time.time() * 1000)
for segment in segments:
# 如果段超过7天没有更新,认为可能需要刷新
last_build_time = segment.get("last_build_time", 0)
if current_time - last_build_time > 7 * 24 * 3600 * 1000:
expired.append(segment)
return expired
def _refresh_segment(self, cube_name: str, segment: Dict) -> Optional[str]:
"""刷新段"""
url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}/build"
refresh_data = {
"startTime": segment["date_range_start"],
"endTime": segment["date_range_end"],
"buildType": "REFRESH"
}
response = self.session.put(url, json=refresh_data)
if response.status_code == 200:
job_info = response.json()
return job_info["uuid"]
else:
self.logger.error(f"段刷新失败: {response.text}")
return None
def generate_cube_report(self, cube_name: str) -> Dict:
"""生成Cube报告"""
cube_info = self.get_cube_info(cube_name)
segments = cube_info.get("segments", [])
# 计算统计信息
total_size = sum(s.get("size_kb", 0) for s in segments)
total_rows = sum(s.get("input_records", 0) for s in segments)
# 分析段分布
segment_analysis = {
"total_segments": len(segments),
"total_size_gb": total_size / (1024 * 1024),
"total_rows": total_rows,
"avg_segment_size_mb": (total_size / len(segments) / 1024) if segments else 0,
"size_distribution": self._analyze_segment_sizes(segments),
"time_coverage": self._analyze_time_coverage(segments)
}
# 性能分析
performance_analysis = {
"compression_ratio": self._calculate_compression_ratio(cube_info),
"build_efficiency": self._analyze_build_efficiency(segments),
"query_performance": self._estimate_query_performance(cube_info)
}
# 优化建议
optimization_suggestions = self._generate_optimization_suggestions(cube_info, segment_analysis)
return {
"cube_name": cube_name,
"report_time": datetime.now().isoformat(),
"basic_info": {
"status": cube_info["status"],
"model_name": cube_info.get("model_name", ""),
"dimensions": len(cube_info.get("dimensions", [])),
"measures": len(cube_info.get("measures", []))
},
"segment_analysis": segment_analysis,
"performance_analysis": performance_analysis,
"optimization_suggestions": optimization_suggestions
}
def _analyze_segment_sizes(self, segments: List[Dict]) -> Dict:
"""分析段大小分布"""
if not segments:
return {}
sizes = [s.get("size_kb", 0) for s in segments]
sizes.sort()
return {
"min_size_mb": min(sizes) / 1024,
"max_size_mb": max(sizes) / 1024,
"median_size_mb": sizes[len(sizes)//2] / 1024,
"small_segments": len([s for s in sizes if s < 100 * 1024]), # <100MB
"large_segments": len([s for s in sizes if s > 10 * 1024 * 1024]) # >10GB
}
def _analyze_time_coverage(self, segments: List[Dict]) -> Dict:
"""分析时间覆盖范围"""
if not segments:
return {}
start_times = [s.get("date_range_start", 0) for s in segments]
end_times = [s.get("date_range_end", 0) for s in segments]
return {
"earliest_date": datetime.fromtimestamp(min(start_times)/1000).isoformat(),
"latest_date": datetime.fromtimestamp(max(end_times)/1000).isoformat(),
"total_days": (max(end_times) - min(start_times)) / (24 * 3600 * 1000),
"gaps": self._find_time_gaps(segments)
}
def _find_time_gaps(self, segments: List[Dict]) -> List[Dict]:
"""查找时间间隙"""
if len(segments) < 2:
return []
# 按时间排序
sorted_segments = sorted(segments, key=lambda x: x.get("date_range_start", 0))
gaps = []
for i in range(len(sorted_segments) - 1):
current_end = sorted_segments[i].get("date_range_end", 0)
next_start = sorted_segments[i + 1].get("date_range_start", 0)
if next_start > current_end:
gap_days = (next_start - current_end) / (24 * 3600 * 1000)
gaps.append({
"start": datetime.fromtimestamp(current_end/1000).isoformat(),
"end": datetime.fromtimestamp(next_start/1000).isoformat(),
"days": gap_days
})
return gaps
def _calculate_compression_ratio(self, cube_info: Dict) -> float:
"""计算压缩比"""
segments = cube_info.get("segments", [])
if not segments:
return 0.0
total_input_records = sum(s.get("input_records", 0) for s in segments)
total_size_kb = sum(s.get("size_kb", 0) for s in segments)
if total_input_records == 0:
return 0.0
# 估算原始数据大小(假设每行平均1KB)
estimated_raw_size_kb = total_input_records
return estimated_raw_size_kb / total_size_kb if total_size_kb > 0 else 0.0
def _analyze_build_efficiency(self, segments: List[Dict]) -> Dict:
"""分析构建效率"""
if not segments:
return {}
build_times = []
for segment in segments:
start_time = segment.get("create_time_utc", 0)
end_time = segment.get("last_build_time", 0)
if end_time > start_time:
build_times.append(end_time - start_time)
if not build_times:
return {}
return {
"avg_build_time_minutes": sum(build_times) / len(build_times) / (60 * 1000),
"max_build_time_minutes": max(build_times) / (60 * 1000),
"min_build_time_minutes": min(build_times) / (60 * 1000)
}
def _estimate_query_performance(self, cube_info: Dict) -> Dict:
"""估算查询性能"""
dimensions = cube_info.get("dimensions", [])
measures = cube_info.get("measures", [])
aggregation_groups = cube_info.get("aggregation_groups", [])
# 估算Cuboid数量:多个聚合组各自独立产生Cuboid,总量按求和近似(忽略组间重叠),而不是相乘
total_cuboids = 0
for agg_group in aggregation_groups:
includes = agg_group.get("includes", [])
total_cuboids += 2 ** len(includes)
total_cuboids = max(1, total_cuboids)
return {
"estimated_cuboids": total_cuboids,
"dimension_count": len(dimensions),
"measure_count": len(measures),
"complexity_score": min(10, total_cuboids / 1000) # 1-10分
}
def _generate_optimization_suggestions(self, cube_info: Dict, segment_analysis: Dict) -> List[Dict]:
"""生成优化建议"""
suggestions = []
# 段数量建议
if segment_analysis["total_segments"] > 100:
suggestions.append({
"category": "segments",
"priority": "high",
"title": "段数量过多",
"description": f"当前有{segment_analysis['total_segments']}个段,建议合并",
"action": "执行段合并操作,将小段合并为大段"
})
# 段大小建议
size_dist = segment_analysis.get("size_distribution", {})
if size_dist.get("small_segments", 0) > 10:
suggestions.append({
"category": "segments",
"priority": "medium",
"title": "存在过多小段",
"description": f"有{size_dist['small_segments']}个小于100MB的段",
"action": "合并小段以提高查询性能"
})
# 时间间隙建议
time_coverage = segment_analysis.get("time_coverage", {})
gaps = time_coverage.get("gaps", [])
if gaps:
suggestions.append({
"category": "data",
"priority": "medium",
"title": "存在时间间隙",
"description": f"发现{len(gaps)}个时间间隙",
"action": "检查数据源,补充缺失的时间段数据"
})
return suggestions
def print_cube_report(self, cube_name: str):
"""打印Cube报告"""
report = self.generate_cube_report(cube_name)
print(f"\n=== {cube_name} Cube分析报告 ===")
print(f"报告时间: {report['report_time']}")
# 基础信息
basic = report['basic_info']
print(f"\n📊 基础信息:")
print(f" 状态: {basic['status']}")
print(f" 数据模型: {basic['model_name']}")
print(f" 维度数量: {basic['dimensions']}")
print(f" 度量数量: {basic['measures']}")
# 段分析
segment = report['segment_analysis']
print(f"\n📈 段分析:")
print(f" 总段数: {segment['total_segments']}")
print(f" 总大小: {segment['total_size_gb']:.2f} GB")
print(f" 总行数: {segment['total_rows']:,}")
print(f" 平均段大小: {segment['avg_segment_size_mb']:.2f} MB")
# 性能分析
performance = report['performance_analysis']
print(f"\n⚡ 性能分析:")
print(f" 压缩比: {performance['compression_ratio']:.2f}")
print(f" 复杂度评分: {performance['query_performance']['complexity_score']:.1f}/10")
print(f" 预估Cuboid数: {performance['query_performance']['estimated_cuboids']:,}")
# 优化建议
suggestions = report['optimization_suggestions']
if suggestions:
print(f"\n💡 优化建议:")
for i, suggestion in enumerate(suggestions, 1):
priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(suggestion['priority'], '⚪')
print(f"\n{priority_icon} {i}. {suggestion['title']}")
print(f" {suggestion['description']}")
print(f" 建议: {suggestion['action']}")
else:
print(f"\n✅ 暂无优化建议,Cube状态良好")
def main():
# 示例使用
manager = CubeManager(
kylin_host="http://localhost:7070",
username="ADMIN",
password="KYLIN",
project="learn_kylin"
)
# 列出所有Cube
cubes = manager.list_cubes()
print(f"找到 {len(cubes)} 个Cube:")
for cube in cubes:
print(f" - {cube.name} ({cube.status})")
# 生成第一个Cube的报告
if cubes:
manager.print_cube_report(cubes[0].name)
if __name__ == "__main__":
main()
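基于上面的CubeManager,可以很容易串起一个每日增量构建流程。下面是一个最小示意(假设脚本与cube_manager.py同目录,Kylin地址与凭据沿用上例):
#!/usr/bin/env python3
# daily_build.py - 基于CubeManager的每日增量构建示意
from datetime import datetime, timedelta

from cube_manager import CubeManager

manager = CubeManager(
    kylin_host="http://localhost:7070",
    username="ADMIN",
    password="KYLIN",
    project="learn_kylin"
)

# 构建昨天一整天的数据:时间区间为[昨天, 今天)
today = datetime.now().strftime("%Y-%m-%d")
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

job_id = manager.build_cube("sales_cube", start_time=yesterday, end_time=today)
# 轮询任务状态,最多等待2小时
if manager.wait_for_job_completion(job_id, timeout=7200):
    print("增量构建成功")
else:
    print("构建失败或超时,请检查Kylin任务日志")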
5.3.4 Cube性能优化
1. 聚合组优化
#!/usr/bin/env python3
# aggregation_group_optimizer.py - 聚合组优化工具
import copy
from typing import Dict, List, Tuple
class AggregationGroupOptimizer:
"""聚合组优化器"""
def __init__(self):
self.max_combination_limit = 32768 # Kylin默认限制
def optimize_aggregation_groups(self, dimensions: List[str],
query_patterns: List[Dict]) -> List[Dict]:
"""优化聚合组配置"""
# 分析查询模式
dimension_usage = self._analyze_dimension_usage(query_patterns)
dimension_correlations = self._analyze_dimension_correlations(query_patterns)
# 识别强制维度
mandatory_dims = self._identify_mandatory_dimensions(dimension_usage)
# 识别层次维度
hierarchy_dims = self._identify_hierarchy_dimensions(dimensions, query_patterns)
# 识别联合维度
joint_dims = self._identify_joint_dimensions(dimension_correlations)
# 生成聚合组
aggregation_groups = self._generate_aggregation_groups(
dimensions, mandatory_dims, hierarchy_dims, joint_dims
)
# 验证和优化
optimized_groups = self._validate_and_optimize(aggregation_groups)
return optimized_groups
def _analyze_dimension_usage(self, query_patterns: List[Dict]) -> Dict[str, float]:
"""分析维度使用频率"""
usage_count = {}
# 计数按频率加权,因此分母也应取总频率而非模式条数,保证使用率落在0-1区间
total_frequency = sum(p.get('frequency', 1) for p in query_patterns)
for pattern in query_patterns:
dimensions = pattern.get('dimensions', [])
frequency = pattern.get('frequency', 1)
for dim in dimensions:
if dim not in usage_count:
usage_count[dim] = 0
usage_count[dim] += frequency
# 计算使用率
usage_rate = {}
for dim, count in usage_count.items():
usage_rate[dim] = count / total_frequency if total_frequency > 0 else 0
return usage_rate
def _analyze_dimension_correlations(self, query_patterns: List[Dict]) -> Dict[Tuple[str, str], float]:
"""分析维度相关性"""
correlations = {}
for pattern in query_patterns:
dimensions = pattern.get('dimensions', [])
frequency = pattern.get('frequency', 1)
# 计算维度对的共现频率
for i, dim1 in enumerate(dimensions):
for dim2 in dimensions[i+1:]:
pair = tuple(sorted([dim1, dim2]))
if pair not in correlations:
correlations[pair] = 0
correlations[pair] += frequency
# 归一化相关性分数
max_freq = max(correlations.values()) if correlations else 1
for pair in correlations:
correlations[pair] = correlations[pair] / max_freq
return correlations
def _identify_mandatory_dimensions(self, dimension_usage: Dict[str, float]) -> List[str]:
"""识别强制维度(使用率>80%)"""
mandatory = []
for dim, usage_rate in dimension_usage.items():
if usage_rate > 0.8:
mandatory.append(dim)
return mandatory
def _identify_hierarchy_dimensions(self, dimensions: List[str],
query_patterns: List[Dict]) -> List[List[str]]:
"""识别层次维度"""
hierarchies = []
# 基于命名模式识别层次
time_hierarchy = [d for d in dimensions if any(t in d.lower() for t in ['year', 'quarter', 'month', 'week', 'day'])]
if len(time_hierarchy) > 1:
# 按时间粒度排序
time_order = ['year', 'quarter', 'month', 'week', 'day']
time_hierarchy.sort(key=lambda x: min([time_order.index(t) for t in time_order if t in x.lower()] + [999]))
hierarchies.append(time_hierarchy)
geo_hierarchy = [d for d in dimensions if any(g in d.lower() for g in ['country', 'region', 'state', 'city'])]
if len(geo_hierarchy) > 1:
# 按地理层级排序
geo_order = ['country', 'region', 'state', 'city']
geo_hierarchy.sort(key=lambda x: min([geo_order.index(g) for g in geo_order if g in x.lower()] + [999]))
hierarchies.append(geo_hierarchy)
product_hierarchy = [d for d in dimensions if any(p in d.lower() for p in ['category', 'subcategory', 'brand', 'product'])]
if len(product_hierarchy) > 1:
# 按产品层级排序
product_order = ['category', 'subcategory', 'brand', 'product']
product_hierarchy.sort(key=lambda x: min([product_order.index(p) for p in product_order if p in x.lower()] + [999]))
hierarchies.append(product_hierarchy)
return hierarchies
def _identify_joint_dimensions(self, correlations: Dict[Tuple[str, str], float]) -> List[List[str]]:
"""识别联合维度(相关性>0.8)"""
joint_dims = []
high_correlation_pairs = [(pair, score) for pair, score in correlations.items() if score > 0.8]
# 将高相关性的维度对组合成联合维度组
processed_dims = set()
for pair, score in high_correlation_pairs:
dim1, dim2 = pair
if dim1 not in processed_dims and dim2 not in processed_dims:
joint_dims.append([dim1, dim2])
processed_dims.add(dim1)
processed_dims.add(dim2)
return joint_dims
def _generate_aggregation_groups(self, dimensions: List[str],
mandatory_dims: List[str],
hierarchy_dims: List[List[str]],
joint_dims: List[List[str]]) -> List[Dict]:
"""生成聚合组配置"""
aggregation_groups = []
# 主聚合组:包含所有维度
main_group = {
"includes": dimensions,
"select_rule": {
"mandatory_dims": mandatory_dims,
"hierarchy_dims": hierarchy_dims,
"joint_dims": joint_dims
}
}
aggregation_groups.append(main_group)
# 如果维度太多,创建额外的聚合组
if len(dimensions) > 15:
# 按功能分组
time_dims = [d for d in dimensions if any(t in d.lower() for t in ['time', 'date', 'year', 'month', 'day'])]
product_dims = [d for d in dimensions if any(p in d.lower() for p in ['product', 'category', 'brand'])]
customer_dims = [d for d in dimensions if any(c in d.lower() for c in ['customer', 'client', 'user'])]
geo_dims = [d for d in dimensions if any(g in d.lower() for g in ['geo', 'location', 'country', 'city'])]
# 时间+产品聚合组
if time_dims and product_dims:
time_product_group = {
"includes": time_dims + product_dims + mandatory_dims,
"select_rule": {
"mandatory_dims": [d for d in mandatory_dims if d in time_dims + product_dims],
"hierarchy_dims": [h for h in hierarchy_dims if any(d in time_dims + product_dims for d in h)]
}
}
aggregation_groups.append(time_product_group)
# 时间+客户聚合组
if time_dims and customer_dims:
time_customer_group = {
"includes": time_dims + customer_dims + mandatory_dims,
"select_rule": {
"mandatory_dims": [d for d in mandatory_dims if d in time_dims + customer_dims],
"hierarchy_dims": [h for h in hierarchy_dims if any(d in time_dims + customer_dims for d in h)]
}
}
aggregation_groups.append(time_customer_group)
return aggregation_groups
def _validate_and_optimize(self, aggregation_groups: List[Dict]) -> List[Dict]:
"""验证和优化聚合组"""
optimized_groups = []
for group in aggregation_groups:
# 计算Cuboid数量
cuboid_count = self._estimate_cuboid_count(group)
if cuboid_count > self.max_combination_limit:
# 如果超过限制,进行优化
optimized_group = self._reduce_cuboid_count(group)
optimized_groups.append(optimized_group)
else:
optimized_groups.append(group)
return optimized_groups
def _estimate_cuboid_count(self, aggregation_group: Dict) -> int:
"""估算Cuboid数量"""
includes = aggregation_group.get("includes", [])
select_rule = aggregation_group.get("select_rule", {})
# 基础组合数
base_combinations = 2 ** len(includes)
# 考虑层次维度的减少
hierarchy_dims = select_rule.get("hierarchy_dims", [])
for hierarchy in hierarchy_dims:
if len(hierarchy) > 1:
# 层次维度减少组合数
hierarchy_reduction = 2 ** len(hierarchy) - len(hierarchy) - 1
base_combinations -= hierarchy_reduction
# 考虑联合维度的减少
joint_dims = select_rule.get("joint_dims", [])
for joint in joint_dims:
if len(joint) > 1:
# 联合维度减少组合数
joint_reduction = 2 ** len(joint) - 2
base_combinations -= joint_reduction
return max(1, base_combinations)
def _reduce_cuboid_count(self, aggregation_group: Dict) -> Dict:
"""减少Cuboid数量"""
includes = aggregation_group.get("includes", [])
select_rule = aggregation_group.get("select_rule", {})
# 增加更多联合维度
existing_joints = select_rule.get("joint_dims", [])
# 将相关维度组合成联合维度
remaining_dims = [d for d in includes if not any(d in joint for joint in existing_joints)]
# 每3个维度组成一个联合维度组
new_joints = []
for i in range(0, len(remaining_dims), 3):
joint_group = remaining_dims[i:i+3]
if len(joint_group) > 1:
new_joints.append(joint_group)
# 更新聚合组:使用深拷贝,避免浅拷贝共享select_rule导致原配置被就地修改
optimized_group = copy.deepcopy(aggregation_group)
optimized_group["select_rule"]["joint_dims"] = existing_joints + new_joints
return optimized_group
def generate_optimization_report(self, dimensions: List[str],
query_patterns: List[Dict]) -> Dict:
"""生成优化报告"""
# 分析当前状态
dimension_usage = self._analyze_dimension_usage(query_patterns)
correlations = self._analyze_dimension_correlations(query_patterns)
# 生成优化建议
optimized_groups = self.optimize_aggregation_groups(dimensions, query_patterns)
# 计算优化效果
original_cuboids = 2 ** len(dimensions)
optimized_cuboids = sum(self._estimate_cuboid_count(group) for group in optimized_groups)
reduction_ratio = (original_cuboids - optimized_cuboids) / original_cuboids if original_cuboids > 0 else 0
return {
"analysis": {
"total_dimensions": len(dimensions),
"total_query_patterns": len(query_patterns),
"dimension_usage": dimension_usage,
"high_correlation_pairs": [(pair, score) for pair, score in correlations.items() if score > 0.7]
},
"optimization": {
"original_cuboids": original_cuboids,
"optimized_cuboids": optimized_cuboids,
"reduction_ratio": reduction_ratio,
"aggregation_groups": optimized_groups
},
"recommendations": self._generate_recommendations(dimension_usage, correlations, reduction_ratio)
}
def _generate_recommendations(self, dimension_usage: Dict[str, float],
correlations: Dict[Tuple[str, str], float],
reduction_ratio: float) -> List[Dict]:
"""生成优化建议"""
recommendations = []
# 低使用率维度建议
low_usage_dims = [dim for dim, usage in dimension_usage.items() if usage < 0.1]
if low_usage_dims:
recommendations.append({
"category": "dimensions",
"priority": "medium",
"title": "移除低使用率维度",
"description": f"以下维度使用率低于10%: {', '.join(low_usage_dims)}",
"action": "考虑从模型中移除这些维度以减少Cube复杂度"
})
# 高相关性维度建议
high_corr_pairs = [(pair, score) for pair, score in correlations.items() if score > 0.9]
if high_corr_pairs:
recommendations.append({
"category": "aggregation",
"priority": "high",
"title": "配置联合维度",
"description": f"发现{len(high_corr_pairs)}对高相关性维度",
"action": "将高相关性维度配置为联合维度以减少Cuboid数量"
})
# 优化效果建议
if reduction_ratio > 0.8:
recommendations.append({
"category": "performance",
"priority": "high",
"title": "优化效果显著",
"description": f"Cuboid数量减少了{reduction_ratio:.1%}",
"action": "应用建议的聚合组配置以提高性能"
})
elif reduction_ratio < 0.3:
recommendations.append({
"category": "performance",
"priority": "medium",
"title": "优化空间有限",
"description": f"Cuboid数量仅减少了{reduction_ratio:.1%}",
"action": "考虑重新设计数据模型或调整查询模式"
})
return recommendations
def main():
# 示例使用
optimizer = AggregationGroupOptimizer()
# 示例维度
dimensions = [
"TIME_YEAR", "TIME_QUARTER", "TIME_MONTH", "TIME_WEEK", "TIME_DAY",
"PRODUCT_CATEGORY", "PRODUCT_SUBCATEGORY", "PRODUCT_BRAND",
"CUSTOMER_SEGMENT", "CUSTOMER_TYPE", "CUSTOMER_INDUSTRY",
"GEO_COUNTRY", "GEO_REGION", "GEO_STATE", "GEO_CITY",
"CHANNEL", "PROMOTION"
]
# 示例查询模式
query_patterns = [
{"dimensions": ["TIME_YEAR", "TIME_QUARTER", "PRODUCT_CATEGORY"], "frequency": 50},
{"dimensions": ["TIME_MONTH", "CUSTOMER_SEGMENT", "GEO_COUNTRY"], "frequency": 30},
{"dimensions": ["TIME_DAY", "PRODUCT_BRAND", "CHANNEL"], "frequency": 20},
{"dimensions": ["CUSTOMER_TYPE", "CUSTOMER_INDUSTRY"], "frequency": 40},
{"dimensions": ["GEO_REGION", "GEO_STATE", "GEO_CITY"], "frequency": 15}
]
# 生成优化报告
report = optimizer.generate_optimization_report(dimensions, query_patterns)
print("=== 聚合组优化报告 ===")
print(f"维度数量: {report['analysis']['total_dimensions']}")
print(f"查询模式数量: {report['analysis']['total_query_patterns']}")
print(f"原始Cuboid数量: {report['optimization']['original_cuboids']:,}")
print(f"优化后Cuboid数量: {report['optimization']['optimized_cuboids']:,}")
print(f"减少比例: {report['optimization']['reduction_ratio']:.1%}")
print("\n优化建议:")
for i, rec in enumerate(report['recommendations'], 1):
print(f"{i}. {rec['title']} ({rec['priority']})")
print(f" {rec['description']}")
print(f" {rec['action']}")
if __name__ == "__main__":
main()
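优化结果最终要落回Cube配置才能生效。下面的示意脚本把报告中的聚合组写回5.3.2节格式的配置文件(文件名、维度与查询模式均为假设值);注意在Kylin中修改聚合组后,通常需要purge并重新构建Cube:
#!/usr/bin/env python3
# apply_agg_groups.py - 把优化后的聚合组写回Cube配置(示意)
import json

from aggregation_group_optimizer import AggregationGroupOptimizer

dimensions = ["TIME_YEAR", "TIME_MONTH", "PRODUCT_CATEGORY", "CUSTOMER_SEGMENT"]
query_patterns = [{"dimensions": ["TIME_YEAR", "PRODUCT_CATEGORY"], "frequency": 10}]

optimizer = AggregationGroupOptimizer()
report = optimizer.generate_optimization_report(dimensions, query_patterns)

# 假设Cube配置保存在sales_cube.json(结构同5.3.2节示例)
with open("sales_cube.json", encoding="utf-8") as f:
    cube_config = json.load(f)
cube_config["aggregation_groups"] = report["optimization"]["aggregation_groups"]
with open("sales_cube_optimized.json", "w", encoding="utf-8") as f:
    json.dump(cube_config, f, indent=2, ensure_ascii=False)
# 注意:修改聚合组后,Kylin通常需要purge并重新构建Cube才能生效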
2. 存储优化
#!/usr/bin/env python3
# storage_optimizer.py - 存储优化工具
import json
import os
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import math
@dataclass
class StorageMetrics:
"""存储指标"""
total_size_gb: float
segment_count: int
avg_segment_size_mb: float
compression_ratio: float
storage_efficiency: float
class StorageOptimizer:
"""存储优化器"""
def __init__(self):
self.optimal_segment_size_mb = 1024 # 1GB
self.max_segments_per_cube = 50
self.min_compression_ratio = 5.0
def analyze_storage_efficiency(self, cube_info: Dict) -> StorageMetrics:
"""分析存储效率"""
segments = cube_info.get("segments", [])
if not segments:
return StorageMetrics(0, 0, 0, 0, 0)
# 计算基础指标
total_size_kb = sum(s.get("size_kb", 0) for s in segments)
total_size_gb = total_size_kb / (1024 * 1024)
segment_count = len(segments)
avg_segment_size_mb = (total_size_kb / segment_count / 1024) if segment_count > 0 else 0
# 计算压缩比
total_input_records = sum(s.get("input_records", 0) for s in segments)
estimated_raw_size_kb = total_input_records # 假设每行1KB
compression_ratio = estimated_raw_size_kb / total_size_kb if total_size_kb > 0 else 0
# 计算存储效率(0-100分)
storage_efficiency = self._calculate_storage_efficiency(
segment_count, avg_segment_size_mb, compression_ratio
)
return StorageMetrics(
total_size_gb=total_size_gb,
segment_count=segment_count,
avg_segment_size_mb=avg_segment_size_mb,
compression_ratio=compression_ratio,
storage_efficiency=storage_efficiency
)
def _calculate_storage_efficiency(self, segment_count: int,
avg_segment_size_mb: float,
compression_ratio: float) -> float:
"""计算存储效率评分"""
score = 100.0
# 段数量评分(段数过多扣分)
if segment_count > self.max_segments_per_cube:
score -= min(30, (segment_count - self.max_segments_per_cube) * 2)
# 段大小评分(偏离最优大小扣分)
size_deviation = abs(avg_segment_size_mb - self.optimal_segment_size_mb) / self.optimal_segment_size_mb
score -= min(25, size_deviation * 50)
# 压缩比评分(压缩比低扣分)
if compression_ratio < self.min_compression_ratio:
score -= min(25, (self.min_compression_ratio - compression_ratio) * 5)
return max(0, score)
def generate_storage_optimization_plan(self, cube_info: Dict) -> Dict:
"""生成存储优化方案"""
metrics = self.analyze_storage_efficiency(cube_info)
segments = cube_info.get("segments", [])
optimization_plan = {
"current_metrics": metrics,
"optimizations": [],
"expected_improvements": {}
}
# 段合并优化
if metrics.segment_count > self.max_segments_per_cube:
merge_plan = self._plan_segment_merging(segments)
optimization_plan["optimizations"].append(merge_plan)
# 压缩优化
if metrics.compression_ratio < self.min_compression_ratio:
compression_plan = self._plan_compression_optimization(cube_info)
optimization_plan["optimizations"].append(compression_plan)
# 分区优化
partition_plan = self._plan_partition_optimization(cube_info)
if partition_plan:
optimization_plan["optimizations"].append(partition_plan)
# 计算预期改进
optimization_plan["expected_improvements"] = self._calculate_expected_improvements(
metrics, optimization_plan["optimizations"]
)
return optimization_plan
def _plan_segment_merging(self, segments: List[Dict]) -> Dict:
"""规划段合并"""
# 按大小排序,优先合并小段
small_segments = [s for s in segments if s.get("size_kb", 0) < 500 * 1024] # <500MB
merge_groups = []
current_group = []
current_size = 0
for segment in sorted(small_segments, key=lambda x: x.get("size_kb", 0)):
segment_size = segment.get("size_kb", 0)
if current_size + segment_size <= self.optimal_segment_size_mb * 1024:
current_group.append(segment)
current_size += segment_size
else:
if len(current_group) > 1:
merge_groups.append(current_group)
current_group = [segment]
current_size = segment_size
if len(current_group) > 1:
merge_groups.append(current_group)
return {
"type": "segment_merging",
"description": f"合并{len(merge_groups)}组小段",
"details": {
"merge_groups": len(merge_groups),
"segments_to_merge": sum(len(group) for group in merge_groups),
"estimated_size_reduction_gb": self._estimate_merge_size_reduction(merge_groups)
},
"priority": "high" if len(merge_groups) > 5 else "medium"
}
def _plan_compression_optimization(self, cube_info: Dict) -> Dict:
"""规划压缩优化"""
current_algorithm = cube_info.get("override_kylin_properties", {}).get("kylin.cube.algorithm", "auto")
recommendations = []
# 推荐更好的压缩算法
if current_algorithm != "inmem":
recommendations.append("使用内存算法提高压缩效率")
# 推荐调整编码方式
dimensions = cube_info.get("dimensions", [])
for dim in dimensions:
if dim.get("encoding") != "dict":
recommendations.append(f"为维度{dim['name']}使用字典编码")
return {
"type": "compression_optimization",
"description": "优化压缩配置",
"details": {
"current_algorithm": current_algorithm,
"recommendations": recommendations,
"estimated_compression_improvement": "20-30%"
},
"priority": "medium"
}
def _plan_partition_optimization(self, cube_info: Dict) -> Optional[Dict]:
"""规划分区优化"""
partition_column = cube_info.get("partition_date_column")
segments = cube_info.get("segments", [])
if not partition_column or not segments:
return None
# 分析分区分布
partition_analysis = self._analyze_partition_distribution(segments)
recommendations = []
# 检查分区大小不均
if partition_analysis["size_variance"] > 0.5:
recommendations.append("调整分区策略以平衡分区大小")
# 检查分区数量
if partition_analysis["partition_count"] > 100:
recommendations.append("考虑使用更粗粒度的分区")
if not recommendations:
return None
return {
"type": "partition_optimization",
"description": "优化分区策略",
"details": {
"current_partitions": partition_analysis["partition_count"],
"size_variance": partition_analysis["size_variance"],
"recommendations": recommendations
},
"priority": "low"
}
def _analyze_partition_distribution(self, segments: List[Dict]) -> Dict:
"""分析分区分布"""
if not segments:
return {"partition_count": 0, "size_variance": 0}
sizes = [s.get("size_kb", 0) for s in segments]
avg_size = sum(sizes) / len(sizes)
variance = sum((size - avg_size) ** 2 for size in sizes) / len(sizes)
size_variance = (variance ** 0.5) / avg_size if avg_size > 0 else 0
return {
"partition_count": len(segments),
"size_variance": size_variance
}
def _estimate_merge_size_reduction(self, merge_groups: List[List[Dict]]) -> float:
"""估算合并后的大小减少"""
total_reduction = 0
for group in merge_groups:
if len(group) > 1:
group_size = sum(s.get("size_kb", 0) for s in group)
# 假设合并后减少5%的存储空间
reduction = group_size * 0.05
total_reduction += reduction
return total_reduction / (1024 * 1024) # 转换为GB
def _calculate_expected_improvements(self, current_metrics: StorageMetrics,
optimizations: List[Dict]) -> Dict:
"""计算预期改进"""
improvements = {
"storage_size_reduction_gb": 0,
"segment_count_reduction": 0,
"efficiency_score_improvement": 0
}
for opt in optimizations:
if opt["type"] == "segment_merging":
improvements["storage_size_reduction_gb"] += opt["details"].get("estimated_size_reduction_gb", 0)
improvements["segment_count_reduction"] += opt["details"].get("segments_to_merge", 0) // 2
elif opt["type"] == "compression_optimization":
improvements["storage_size_reduction_gb"] += current_metrics.total_size_gb * 0.25
# 计算效率评分改进
new_segment_count = current_metrics.segment_count - improvements["segment_count_reduction"]
new_compression_ratio = current_metrics.compression_ratio * 1.25 # 假设压缩比提升25%
new_efficiency = self._calculate_storage_efficiency(
new_segment_count, current_metrics.avg_segment_size_mb, new_compression_ratio
)
improvements["efficiency_score_improvement"] = new_efficiency - current_metrics.storage_efficiency
return improvements
def print_storage_report(self, cube_name: str, cube_info: Dict):
"""打印存储报告"""
metrics = self.analyze_storage_efficiency(cube_info)
optimization_plan = self.generate_storage_optimization_plan(cube_info)
print(f"\n=== {cube_name} 存储分析报告 ===")
# 当前指标
print(f"\n📊 当前存储指标:")
print(f" 总大小: {metrics.total_size_gb:.2f} GB")
print(f" 段数量: {metrics.segment_count}")
print(f" 平均段大小: {metrics.avg_segment_size_mb:.2f} MB")
print(f" 压缩比: {metrics.compression_ratio:.2f}")
print(f" 存储效率: {metrics.storage_efficiency:.1f}/100")
# 优化建议
optimizations = optimization_plan["optimizations"]
if optimizations:
print(f"\n🔧 优化建议:")
for i, opt in enumerate(optimizations, 1):
priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(opt['priority'], '⚪')
print(f"\n{priority_icon} {i}. {opt['description']} ({opt['priority']})")
if opt["type"] == "segment_merging":
details = opt["details"]
print(f" 合并组数: {details['merge_groups']}")
print(f" 涉及段数: {details['segments_to_merge']}")
print(f" 预计减少: {details['estimated_size_reduction_gb']:.2f} GB")
elif opt["type"] == "compression_optimization":
details = opt["details"]
print(f" 当前算法: {details['current_algorithm']}")
print(f" 预计改进: {details['estimated_compression_improvement']}")
for rec in details["recommendations"]:
print(f" - {rec}")
# 预期改进
improvements = optimization_plan["expected_improvements"]
if any(improvements.values()):
print(f"\n📈 预期改进:")
if improvements["storage_size_reduction_gb"] > 0:
print(f" 存储减少: {improvements['storage_size_reduction_gb']:.2f} GB")
if improvements["segment_count_reduction"] > 0:
print(f" 段数减少: {improvements['segment_count_reduction']}")
if improvements["efficiency_score_improvement"] > 0:
print(f" 效率提升: +{improvements['efficiency_score_improvement']:.1f}分")
def main():
# 示例使用
optimizer = StorageOptimizer()
# 示例Cube信息
cube_info = {
"name": "sales_cube",
"segments": [
{"size_kb": 102400, "input_records": 100000, "date_range_start": 1609459200000}, # 100MB
{"size_kb": 204800, "input_records": 200000, "date_range_start": 1612137600000}, # 200MB
{"size_kb": 51200, "input_records": 50000, "date_range_start": 1614556800000}, # 50MB
{"size_kb": 1048576, "input_records": 1000000, "date_range_start": 1617235200000}, # 1GB
],
"dimensions": [
{"name": "TIME", "encoding": "date"},
{"name": "PRODUCT", "encoding": "dict"},
{"name": "CUSTOMER", "encoding": "int"}
],
"override_kylin_properties": {
"kylin.cube.algorithm": "auto"
},
"partition_date_column": "TIME.DATE_VALUE"
}
optimizer.print_storage_report("sales_cube", cube_info)
if __name__ == "__main__":
main()
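StorageOptimizer也可以直接接到真实集群上使用:用上文cube_manager.py中的CubeManager拉取Cube元数据,再生成存储报告。示意用法如下(Kylin地址与凭据为假设值):
#!/usr/bin/env python3
# storage_report.py - 对真实集群上的Cube生成存储报告(示意,复用上文两个脚本中的类)
from cube_manager import CubeManager
from storage_optimizer import StorageOptimizer

manager = CubeManager(
    kylin_host="http://localhost:7070",
    username="ADMIN",
    password="KYLIN",
    project="learn_kylin"
)

# 拉取Cube元数据(含segments列表),直接交给StorageOptimizer分析
cube_info = manager.get_cube_info("sales_cube")
StorageOptimizer().print_storage_report("sales_cube", cube_info)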
3. 构建优化
#!/usr/bin/env python3
# build_optimizer.py - 构建优化工具
import json
import time
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
@dataclass
class BuildMetrics:
"""构建指标"""
avg_build_time_minutes: float
build_success_rate: float
resource_utilization: float
parallel_efficiency: float
build_frequency_per_day: float
class BuildOptimizer:
"""构建优化器"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.optimal_build_time_minutes = 30
self.max_parallel_builds = 4
self.target_success_rate = 0.95
def analyze_build_performance(self, build_history: List[Dict]) -> BuildMetrics:
"""分析构建性能"""
if not build_history:
return BuildMetrics(0, 0, 0, 0, 0)
# 计算平均构建时间
build_times = []
successful_builds = 0
for build in build_history:
duration = build.get("duration", 0)
if duration > 0:
build_times.append(duration / (60 * 1000)) # 转换为分钟
if build.get("status") == "FINISHED":
successful_builds += 1
avg_build_time = sum(build_times) / len(build_times) if build_times else 0
success_rate = successful_builds / len(build_history) if build_history else 0
# 计算资源利用率(基于构建时间分布)
resource_utilization = self._calculate_resource_utilization(build_history)
# 计算并行效率
parallel_efficiency = self._calculate_parallel_efficiency(build_history)
# 计算构建频率
build_frequency = self._calculate_build_frequency(build_history)
return BuildMetrics(
avg_build_time_minutes=avg_build_time,
build_success_rate=success_rate,
resource_utilization=resource_utilization,
parallel_efficiency=parallel_efficiency,
build_frequency_per_day=build_frequency
)
def _calculate_resource_utilization(self, build_history: List[Dict]) -> float:
"""计算资源利用率"""
if not build_history:
return 0.0
# 分析构建时间分布,计算资源使用的均匀程度
build_hours = {}
for build in build_history:
start_time = build.get("create_time", 0)
if start_time > 0:
hour = datetime.fromtimestamp(start_time / 1000).hour
build_hours[hour] = build_hours.get(hour, 0) + 1
if not build_hours:
return 0.0
# 计算分布的均匀程度
avg_builds_per_hour = sum(build_hours.values()) / 24
variance = sum((build_hours.get(h, 0) - avg_builds_per_hour) ** 2 for h in range(24)) / 24 # 没有构建的小时按0计入方差
# 利用率 = 1 - 标准差/平均值(归一化到0-1)
if avg_builds_per_hour > 0:
cv = (variance ** 0.5) / avg_builds_per_hour
return max(0, 1 - min(1, cv))
return 0.0
def _calculate_parallel_efficiency(self, build_history: List[Dict]) -> float:
"""计算并行效率"""
if not build_history:
return 0.0
# 分析同时运行的构建任务数量
concurrent_builds = []
for i, build in enumerate(build_history):
start_time = build.get("create_time", 0)
end_time = start_time + build.get("duration", 0)
concurrent_count = 1 # 包括当前构建
for other_build in build_history:
if other_build is build: # 按对象身份跳过自身,避免误跳过字段恰好相同的其他任务
continue
other_start = other_build.get("create_time", 0)
other_end = other_start + other_build.get("duration", 0)
# 检查时间重叠
if (start_time < other_end and end_time > other_start):
concurrent_count += 1
concurrent_builds.append(concurrent_count)
if not concurrent_builds:
return 0.0
avg_concurrent = sum(concurrent_builds) / len(concurrent_builds)
return min(1.0, avg_concurrent / self.max_parallel_builds)
def _calculate_build_frequency(self, build_history: List[Dict]) -> float:
"""计算构建频率"""
if len(build_history) < 2:
return 0.0
# 计算时间跨度
start_times = [b.get("create_time", 0) for b in build_history if b.get("create_time", 0) > 0]
if len(start_times) < 2:
return 0.0
time_span_days = (max(start_times) - min(start_times)) / (24 * 3600 * 1000)
return len(build_history) / time_span_days if time_span_days > 0 else 0
def generate_build_optimization_plan(self, cube_info: Dict,
build_history: List[Dict]) -> Dict:
"""生成构建优化方案"""
metrics = self.analyze_build_performance(build_history)
optimization_plan = {
"current_metrics": metrics,
"optimizations": [],
"configuration_recommendations": {}
}
# 构建时间优化
if metrics.avg_build_time_minutes > self.optimal_build_time_minutes:
time_optimization = self._plan_build_time_optimization(cube_info, metrics)
optimization_plan["optimizations"].append(time_optimization)
# 成功率优化
if metrics.build_success_rate < self.target_success_rate:
reliability_optimization = self._plan_reliability_optimization(build_history)
optimization_plan["optimizations"].append(reliability_optimization)
# 资源利用率优化
if metrics.resource_utilization < 0.7:
resource_optimization = self._plan_resource_optimization(build_history)
optimization_plan["optimizations"].append(resource_optimization)
# 并行优化
if metrics.parallel_efficiency < 0.5:
parallel_optimization = self._plan_parallel_optimization(cube_info)
optimization_plan["optimizations"].append(parallel_optimization)
# 生成配置建议
optimization_plan["configuration_recommendations"] = self._generate_config_recommendations(
cube_info, metrics
)
return optimization_plan
def _plan_build_time_optimization(self, cube_info: Dict, metrics: BuildMetrics) -> Dict:
"""规划构建时间优化"""
recommendations = []
# 分析Cube复杂度
dimensions = cube_info.get("dimensions", [])
aggregation_groups = cube_info.get("aggregation_groups", [])
if len(dimensions) > 15:
recommendations.append("减少维度数量或优化聚合组配置")
# 检查算法配置
algorithm = cube_info.get("override_kylin_properties", {}).get("kylin.cube.algorithm", "auto")
if algorithm == "auto":
recommendations.append("指定具体的构建算法(layer或inmem)")
# 检查分区策略
partition_column = cube_info.get("partition_date_column")
if not partition_column:
recommendations.append("配置分区列以支持增量构建")
return {
"type": "build_time_optimization",
"description": f"优化构建时间(当前: {metrics.avg_build_time_minutes:.1f}分钟)",
"details": {
"current_time_minutes": metrics.avg_build_time_minutes,
"target_time_minutes": self.optimal_build_time_minutes,
"recommendations": recommendations
},
"priority": "high" if metrics.avg_build_time_minutes > 60 else "medium"
}
def _plan_reliability_optimization(self, build_history: List[Dict]) -> Dict:
"""规划可靠性优化"""
failed_builds = [b for b in build_history if b.get("status") in ["ERROR", "DISCARDED"]]
# 分析失败原因
failure_reasons = {}
for build in failed_builds:
error_msg = build.get("error_message", "Unknown error")
# 简化错误分类
if "memory" in error_msg.lower() or "heap" in error_msg.lower():
failure_reasons["memory"] = failure_reasons.get("memory", 0) + 1
elif "timeout" in error_msg.lower():
failure_reasons["timeout"] = failure_reasons.get("timeout", 0) + 1
elif "connection" in error_msg.lower():
failure_reasons["connection"] = failure_reasons.get("connection", 0) + 1
else:
failure_reasons["other"] = failure_reasons.get("other", 0) + 1
recommendations = []
if failure_reasons.get("memory", 0) > 0:
recommendations.append("增加构建节点内存配置")
if failure_reasons.get("timeout", 0) > 0:
recommendations.append("调整构建超时时间配置")
if failure_reasons.get("connection", 0) > 0:
recommendations.append("检查网络连接和数据源稳定性")
return {
"type": "reliability_optimization",
"description": "提高构建成功率",
"details": {
"failed_builds": len(failed_builds),
"failure_reasons": failure_reasons,
"recommendations": recommendations
},
"priority": "high"
}
def _plan_resource_optimization(self, build_history: List[Dict]) -> Dict:
"""规划资源优化"""
# 分析构建时间分布
build_hours = {}
for build in build_history:
start_time = build.get("create_time", 0)
if start_time > 0:
hour = datetime.fromtimestamp(start_time / 1000).hour
build_hours[hour] = build_hours.get(hour, 0) + 1
# 找出高峰和低谷时段
peak_hours = [h for h, count in build_hours.items() if count > 2]
low_hours = [h for h in range(24) if h not in build_hours or build_hours[h] == 0]
recommendations = []
if peak_hours:
recommendations.append(f"高峰时段({peak_hours})考虑限制并发构建数量")
if low_hours:
recommendations.append(f"低谷时段({low_hours})可以安排大型构建任务")
return {
"type": "resource_optimization",
"description": "优化资源利用率",
"details": {
"peak_hours": peak_hours,
"low_hours": low_hours,
"recommendations": recommendations
},
"priority": "medium"
}
def _plan_parallel_optimization(self, cube_info: Dict) -> Dict:
"""规划并行优化"""
recommendations = []
# 检查是否支持分区并行构建
partition_column = cube_info.get("partition_date_column")
if partition_column:
recommendations.append("启用分区并行构建")
# 检查聚合组配置
aggregation_groups = cube_info.get("aggregation_groups", [])
if len(aggregation_groups) > 1:
recommendations.append("配置聚合组并行构建")
return {
"type": "parallel_optimization",
"description": "提高并行构建效率",
"details": {
"supports_partition_parallel": bool(partition_column),
"aggregation_groups_count": len(aggregation_groups),
"recommendations": recommendations
},
"priority": "medium"
}
def _generate_config_recommendations(self, cube_info: Dict, metrics: BuildMetrics) -> Dict:
"""生成配置建议"""
config_recommendations = {
"kylin_properties": {},
"cube_properties": {},
"system_properties": {}
}
# Kylin配置建议
if metrics.avg_build_time_minutes > 60:
config_recommendations["kylin_properties"]["kylin.job.max-concurrent-jobs"] = "3"
config_recommendations["kylin_properties"]["kylin.cube.algorithm"] = "inmem"
if metrics.build_success_rate < 0.9:
config_recommendations["kylin_properties"]["kylin.job.retry"] = "3"
config_recommendations["kylin_properties"]["kylin.job.timeout.seconds"] = "7200"
# Cube配置建议
dimensions = cube_info.get("dimensions", [])
if len(dimensions) > 10:
config_recommendations["cube_properties"]["kylin.cube.aggrgroup.max-combination"] = "16384"
# 系统配置建议
if metrics.avg_build_time_minutes > 30:
config_recommendations["system_properties"]["spark.executor.memory"] = "4g"
config_recommendations["system_properties"]["spark.executor.cores"] = "2"
return config_recommendations
def print_build_report(self, cube_name: str, build_history: List[Dict], cube_info: Dict):
"""打印构建报告"""
metrics = self.analyze_build_performance(build_history)
optimization_plan = self.generate_build_optimization_plan(cube_info, build_history)
print(f"\n=== {cube_name} 构建性能报告 ===")
# 当前指标
print(f"\n📊 构建指标:")
print(f" 平均构建时间: {metrics.avg_build_time_minutes:.1f} 分钟")
print(f" 构建成功率: {metrics.build_success_rate:.1%}")
print(f" 资源利用率: {metrics.resource_utilization:.1%}")
print(f" 并行效率: {metrics.parallel_efficiency:.1%}")
print(f" 构建频率: {metrics.build_frequency_per_day:.1f} 次/天")
# 优化建议
optimizations = optimization_plan["optimizations"]
if optimizations:
print(f"\n🔧 优化建议:")
for i, opt in enumerate(optimizations, 1):
priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(opt['priority'], '⚪')
print(f"\n{priority_icon} {i}. {opt['description']} ({opt['priority']})")
for rec in opt["details"].get("recommendations", []):
print(f" - {rec}")
# 配置建议
config_recs = optimization_plan["configuration_recommendations"]
if any(config_recs.values()):
print(f"\n⚙️ 配置建议:")
for category, configs in config_recs.items():
if configs:
print(f"\n {category.replace('_', ' ').title()}:")
for key, value in configs.items():
print(f" {key} = {value}")
def main():
# 示例使用
optimizer = BuildOptimizer()
# 示例构建历史
build_history = [
{
"uuid": "job1",
"status": "FINISHED",
"create_time": int(time.time() * 1000) - 3600000, # 1小时前
"duration": 1800000, # 30分钟
"error_message": None
},
{
"uuid": "job2",
"status": "ERROR",
"create_time": int(time.time() * 1000) - 7200000, # 2小时前
"duration": 3600000, # 60分钟
"error_message": "OutOfMemoryError: Java heap space"
},
{
"uuid": "job3",
"status": "FINISHED",
"create_time": int(time.time() * 1000) - 10800000, # 3小时前
"duration": 2400000, # 40分钟
"error_message": None
}
]
# 示例Cube信息
cube_info = {
"name": "sales_cube",
"dimensions": [{"name": f"dim_{i}"} for i in range(12)],
"aggregation_groups": [{"includes": [f"dim_{i}" for i in range(6)]}],
"partition_date_column": "TIME.DATE_VALUE",
"override_kylin_properties": {
"kylin.cube.algorithm": "auto"
}
}
optimizer.print_build_report("sales_cube", build_history, cube_info)
if __name__ == "__main__":
main()
4. 查询优化
#!/usr/bin/env python3
# query_optimizer.py - 查询优化工具
import re
import logging
from typing import Dict, List
from dataclasses import dataclass
@dataclass
class QueryMetrics:
"""查询指标"""
avg_response_time_ms: float
query_success_rate: float
cache_hit_rate: float
cuboid_hit_rate: float
scan_count_avg: float
class QueryOptimizer:
"""查询优化器"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.target_response_time_ms = 5000 # 5秒
self.min_cache_hit_rate = 0.8
self.min_cuboid_hit_rate = 0.9
def analyze_query_performance(self, query_history: List[Dict]) -> QueryMetrics:
"""分析查询性能"""
if not query_history:
return QueryMetrics(0, 0, 0, 0, 0)
# 计算响应时间
response_times = []
successful_queries = 0
cache_hits = 0
cuboid_hits = 0
scan_counts = []
for query in query_history:
duration = query.get("duration", 0)
if duration > 0:
response_times.append(duration)
if query.get("exception") is None:
successful_queries += 1
if query.get("cache_hit", False):
cache_hits += 1
if query.get("exactly_match", False):
cuboid_hits += 1
scan_count = query.get("scan_count", 0)
if scan_count > 0:
scan_counts.append(scan_count)
avg_response_time = sum(response_times) / len(response_times) if response_times else 0
success_rate = successful_queries / len(query_history) if query_history else 0
cache_hit_rate = cache_hits / len(query_history) if query_history else 0
cuboid_hit_rate = cuboid_hits / len(query_history) if query_history else 0
avg_scan_count = sum(scan_counts) / len(scan_counts) if scan_counts else 0
return QueryMetrics(
avg_response_time_ms=avg_response_time,
query_success_rate=success_rate,
cache_hit_rate=cache_hit_rate,
cuboid_hit_rate=cuboid_hit_rate,
scan_count_avg=avg_scan_count
)
def analyze_query_patterns(self, query_history: List[Dict]) -> Dict:
"""分析查询模式"""
patterns = {
"frequent_dimensions": {},
"frequent_measures": {},
"time_patterns": {},
"filter_patterns": {},
"groupby_patterns": {}
}
for query in query_history:
sql = query.get("sql", "")
if not sql:
continue
# 分析维度使用频率
dimensions = self._extract_dimensions_from_sql(sql)
for dim in dimensions:
patterns["frequent_dimensions"][dim] = patterns["frequent_dimensions"].get(dim, 0) + 1
# 分析度量使用频率
measures = self._extract_measures_from_sql(sql)
for measure in measures:
patterns["frequent_measures"][measure] = patterns["frequent_measures"].get(measure, 0) + 1
# 分析时间模式
time_filters = self._extract_time_filters(sql)
for time_filter in time_filters:
patterns["time_patterns"][time_filter] = patterns["time_patterns"].get(time_filter, 0) + 1
# 分析过滤模式
filters = self._extract_filters(sql)
for filter_expr in filters:
patterns["filter_patterns"][filter_expr] = patterns["filter_patterns"].get(filter_expr, 0) + 1
# 分析GROUP BY模式
groupby_cols = self._extract_groupby_columns(sql)
groupby_key = ",".join(sorted(groupby_cols))
if groupby_key:
patterns["groupby_patterns"][groupby_key] = patterns["groupby_patterns"].get(groupby_key, 0) + 1
# 排序并取前10
for pattern_type in patterns:
patterns[pattern_type] = dict(sorted(
patterns[pattern_type].items(),
key=lambda x: x[1],
reverse=True
)[:10])
return patterns
def _extract_dimensions_from_sql(self, sql: str) -> List[str]:
"""从SQL中提取维度"""
# 简化的维度提取逻辑
dimensions = []
# 查找SELECT子句中的维度列
select_match = re.search(r'SELECT\s+(.+?)\s+FROM', sql, re.IGNORECASE | re.DOTALL)
if select_match:
select_clause = select_match.group(1)
# 排除聚合函数,提取普通列
cols = re.findall(r'(?<!\w)([a-zA-Z_][a-zA-Z0-9_]*\.[a-zA-Z_][a-zA-Z0-9_]*)(?!\s*\()', select_clause)
dimensions.extend(cols)
# 查找GROUP BY子句中的维度
groupby_match = re.search(r'GROUP\s+BY\s+(.+?)(?:\s+ORDER|\s+HAVING|\s+LIMIT|$)', sql, re.IGNORECASE)
if groupby_match:
groupby_clause = groupby_match.group(1)
cols = re.findall(r'([a-zA-Z_][a-zA-Z0-9_]*\.[a-zA-Z_][a-zA-Z0-9_]*)', groupby_clause)
dimensions.extend(cols)
return list(set(dimensions))
def _extract_measures_from_sql(self, sql: str) -> List[str]:
"""从SQL中提取度量"""
measures = []
# 查找聚合函数
agg_functions = re.findall(r'(SUM|COUNT|AVG|MAX|MIN)\s*\([^)]+\)', sql, re.IGNORECASE)
measures.extend(agg_functions)
return measures
def _extract_time_filters(self, sql: str) -> List[str]:
"""提取时间过滤条件"""
time_filters = []
# 查找日期相关的WHERE条件
date_patterns = [
r'\w+\.\w*date\w*\s*[><=]+\s*[\'"][^\'"]+[\'"]',
r'\w+\.\w*time\w*\s*[><=]+\s*[\'"][^\'"]+[\'"]',
r'\w+\.\w*year\w*\s*[><=]+\s*\d+',
r'\w+\.\w*month\w*\s*[><=]+\s*\d+'
]
for pattern in date_patterns:
matches = re.findall(pattern, sql, re.IGNORECASE)
time_filters.extend(matches)
return time_filters
def _extract_filters(self, sql: str) -> List[str]:
"""提取过滤条件"""
filters = []
# 查找WHERE子句
where_match = re.search(r'WHERE\s+(.+?)(?:\s+GROUP|\s+ORDER|\s+LIMIT|$)', sql, re.IGNORECASE | re.DOTALL)
if where_match:
where_clause = where_match.group(1)
# 简化的过滤条件提取
filter_conditions = re.findall(r'\w+\.\w+\s*[><=!]+\s*[\w\'"][^\s]+', where_clause)
filters.extend(filter_conditions)
return filters
def _extract_groupby_columns(self, sql: str) -> List[str]:
"""提取GROUP BY列"""
groupby_match = re.search(r'GROUP\s+BY\s+(.+?)(?:\s+ORDER|\s+HAVING|\s+LIMIT|$)', sql, re.IGNORECASE)
if groupby_match:
groupby_clause = groupby_match.group(1)
columns = [col.strip() for col in groupby_clause.split(',')]
return [col for col in columns if col]
return []
def generate_query_optimization_plan(self, cube_info: Dict,
query_history: List[Dict]) -> Dict:
"""生成查询优化方案"""
metrics = self.analyze_query_performance(query_history)
patterns = self.analyze_query_patterns(query_history)
optimization_plan = {
"current_metrics": metrics,
"query_patterns": patterns,
"optimizations": [],
"index_recommendations": [],
"cube_recommendations": []
}
# 响应时间优化
if metrics.avg_response_time_ms > self.target_response_time_ms:
response_time_opt = self._plan_response_time_optimization(metrics, patterns)
optimization_plan["optimizations"].append(response_time_opt)
# 缓存优化
if metrics.cache_hit_rate < self.min_cache_hit_rate:
cache_opt = self._plan_cache_optimization(patterns)
optimization_plan["optimizations"].append(cache_opt)
# Cuboid优化
if metrics.cuboid_hit_rate < self.min_cuboid_hit_rate:
cuboid_opt = self._plan_cuboid_optimization(cube_info, patterns)
optimization_plan["optimizations"].append(cuboid_opt)
# 生成索引建议
optimization_plan["index_recommendations"] = self._generate_index_recommendations(patterns)
# 生成Cube建议
optimization_plan["cube_recommendations"] = self._generate_cube_recommendations(
cube_info, patterns
)
return optimization_plan
def _plan_response_time_optimization(self, metrics: QueryMetrics, patterns: Dict) -> Dict:
"""规划响应时间优化"""
recommendations = []
# 基于扫描行数分析
if metrics.scan_count_avg > 1000000: # 超过100万行
recommendations.append("优化查询过滤条件,减少扫描行数")
# 基于查询模式分析
frequent_dims = patterns.get("frequent_dimensions", {})
if len(frequent_dims) > 10:
recommendations.append("考虑创建针对高频维度的专用Cube")
# 基于GROUP BY模式
groupby_patterns = patterns.get("groupby_patterns", {})
if len(groupby_patterns) > 20:
recommendations.append("优化聚合组配置以覆盖常用查询模式")
return {
"type": "response_time_optimization",
"description": f"优化查询响应时间(当前: {metrics.avg_response_time_ms:.0f}ms)",
"details": {
"current_response_time_ms": metrics.avg_response_time_ms,
"target_response_time_ms": self.target_response_time_ms,
"avg_scan_count": metrics.scan_count_avg,
"recommendations": recommendations
},
"priority": "high" if metrics.avg_response_time_ms > 10000 else "medium"
}
def _plan_cache_optimization(self, patterns: Dict) -> Dict:
"""规划缓存优化"""
recommendations = [
"增加查询结果缓存大小",
"优化缓存策略,提高热点查询缓存命中率",
"考虑预热常用查询的缓存"
]
# 基于查询模式的缓存建议
frequent_patterns = patterns.get("groupby_patterns", {})
if frequent_patterns:
top_pattern = list(frequent_patterns.keys())[0]
recommendations.append(f"为高频查询模式({top_pattern})配置专用缓存")
return {
"type": "cache_optimization",
"description": "优化查询缓存策略",
"details": {
"recommendations": recommendations
},
"priority": "medium"
}
def _plan_cuboid_optimization(self, cube_info: Dict, patterns: Dict) -> Dict:
"""规划Cuboid优化"""
recommendations = []
# 分析当前聚合组配置
aggregation_groups = cube_info.get("aggregation_groups", [])
frequent_dims = patterns.get("frequent_dimensions", {})
if frequent_dims:
top_dims = list(frequent_dims.keys())[:5]
recommendations.append(f"确保高频维度({', '.join(top_dims)})在聚合组中")
# 分析GROUP BY模式
groupby_patterns = patterns.get("groupby_patterns", {})
if groupby_patterns:
top_groupby = list(groupby_patterns.keys())[0]
recommendations.append(f"为常用GROUP BY模式({top_groupby})创建专用Cuboid")
return {
"type": "cuboid_optimization",
"description": "优化Cuboid配置",
"details": {
"current_aggregation_groups": len(aggregation_groups),
"recommendations": recommendations
},
"priority": "high"
}
def _generate_index_recommendations(self, patterns: Dict) -> List[Dict]:
"""生成索引建议"""
recommendations = []
# 基于过滤模式的索引建议
filter_patterns = patterns.get("filter_patterns", {})
for filter_expr, count in list(filter_patterns.items())[:5]:
if count > 10: # 高频过滤条件
recommendations.append({
"type": "filter_index",
"description": f"为高频过滤条件创建索引",
"filter_expression": filter_expr,
"frequency": count,
"priority": "high" if count > 50 else "medium"
})
# 基于时间模式的索引建议
time_patterns = patterns.get("time_patterns", {})
if time_patterns:
recommendations.append({
"type": "time_index",
"description": "优化时间维度索引",
"time_patterns": list(time_patterns.keys())[:3],
"priority": "medium"
})
return recommendations
def _generate_cube_recommendations(self, cube_info: Dict, patterns: Dict) -> List[Dict]:
"""生成Cube建议"""
recommendations = []
# 维度建议
frequent_dims = patterns.get("frequent_dimensions", {})
current_dims = [d.get("name", "") for d in cube_info.get("dimensions", [])]
for dim, count in frequent_dims.items():
if dim not in current_dims and count > 20:
recommendations.append({
"type": "add_dimension",
"description": f"添加高频维度: {dim}",
"dimension": dim,
"frequency": count,
"priority": "high"
})
# 度量建议
frequent_measures = patterns.get("frequent_measures", {})
for measure, count in frequent_measures.items():
if count > 15:
recommendations.append({
"type": "optimize_measure",
"description": f"优化高频度量: {measure}",
"measure": measure,
"frequency": count,
"priority": "medium"
})
# 聚合组建议
groupby_patterns = patterns.get("groupby_patterns", {})
for pattern, count in list(groupby_patterns.items())[:3]:
if count > 10:
recommendations.append({
"type": "aggregation_group",
"description": f"为常用查询模式创建聚合组",
"pattern": pattern,
"frequency": count,
"priority": "high" if count > 30 else "medium"
})
return recommendations
def print_query_report(self, cube_name: str, query_history: List[Dict], cube_info: Dict):
"""打印查询报告"""
metrics = self.analyze_query_performance(query_history)
optimization_plan = self.generate_query_optimization_plan(cube_info, query_history)
print(f"\n=== {cube_name} 查询性能报告 ===")
# 当前指标
print(f"\n📊 查询指标:")
print(f" 平均响应时间: {metrics.avg_response_time_ms:.0f} ms")
print(f" 查询成功率: {metrics.query_success_rate:.1%}")
print(f" 缓存命中率: {metrics.cache_hit_rate:.1%}")
print(f" Cuboid命中率: {metrics.cuboid_hit_rate:.1%}")
print(f" 平均扫描行数: {metrics.scan_count_avg:.0f}")
# 查询模式
patterns = optimization_plan["query_patterns"]
print(f"\n📈 查询模式分析:")
frequent_dims = patterns.get("frequent_dimensions", {})
if frequent_dims:
print(f"\n 高频维度:")
for dim, count in list(frequent_dims.items())[:5]:
print(f" {dim}: {count}次")
groupby_patterns = patterns.get("groupby_patterns", {})
if groupby_patterns:
print(f"\n 常用GROUP BY模式:")
for pattern, count in list(groupby_patterns.items())[:3]:
print(f" {pattern}: {count}次")
# 优化建议
optimizations = optimization_plan["optimizations"]
if optimizations:
print(f"\n🔧 优化建议:")
for i, opt in enumerate(optimizations, 1):
priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(opt['priority'], '⚪')
print(f"\n{priority_icon} {i}. {opt['description']} ({opt['priority']})")
for rec in opt["details"].get("recommendations", []):
print(f" - {rec}")
# Cube建议
cube_recs = optimization_plan["cube_recommendations"]
if cube_recs:
print(f"\n🎯 Cube优化建议:")
for i, rec in enumerate(cube_recs[:5], 1):
priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(rec['priority'], '⚪')
print(f"\n{priority_icon} {i}. {rec['description']} ({rec['priority']})")
if 'frequency' in rec:
print(f" 频率: {rec['frequency']}次")
def main():
# 示例使用
optimizer = QueryOptimizer()
# 示例查询历史
query_history = [
{
"sql": "SELECT time.year, product.category, SUM(sales.amount) FROM sales_fact sales JOIN time_dim time ON sales.time_key = time.date_key JOIN product_dim product ON sales.product_key = product.product_key WHERE time.year = 2023 GROUP BY time.year, product.category",
"duration": 3000,
"exception": None,
"cache_hit": False,
"exactly_match": True,
"scan_count": 500000
},
{
"sql": "SELECT customer.region, SUM(sales.amount), COUNT(*) FROM sales_fact sales JOIN customer_dim customer ON sales.customer_key = customer.customer_key WHERE customer.region = 'North' GROUP BY customer.region",
"duration": 8000,
"exception": None,
"cache_hit": True,
"exactly_match": False,
"scan_count": 1200000
}
]
# 示例Cube信息
cube_info = {
"name": "sales_cube",
"dimensions": [
{"name": "time.year"},
{"name": "product.category"},
{"name": "customer.region"}
],
"aggregation_groups": [
{"includes": ["time.year", "product.category"]}
]
}
optimizer.print_query_report("sales_cube", query_history, cube_info)
if __name__ == "__main__":
main()
5.6 最佳实践总结
5.6.1 模型设计原则
维度设计原则:
- 选择合适的粒度,平衡查询灵活性和性能
- 建立清晰的维度层次结构
- 使用一致的命名规范
- 考虑维度的变化频率和历史追踪需求
度量设计原则(可加性示例见下方代码):
- 区分基础度量和派生度量
- 选择合适的聚合函数
- 考虑度量的可加性
- 预计算常用的复合度量
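下面用一个极简的Python示例说明"可加性"原则:利润率这类比率型派生度量不能按行直接平均,应先对可加的基础度量(销售额、利润)求和,再在汇总层计算比率。示例数据为虚构:
# 可加性示例:比率型派生度量应"先求和、再相除"(数据为虚构示例)
rows = [
    {"sales": 100.0, "profit": 20.0},   # 该行利润率 20%
    {"sales": 300.0, "profit": 30.0},   # 该行利润率 10%
]

# 错误做法:对各行利润率取平均,得到 15%,忽略了销售额权重
avg_of_ratios = sum(r["profit"] / r["sales"] for r in rows) / len(rows)

# 正确做法:先汇总可加度量,再计算派生度量,得到 50/400 = 12.5%
total_sales = sum(r["sales"] for r in rows)
total_profit = sum(r["profit"] for r in rows)
ratio_of_sums = total_profit / total_sales

print(f"逐行平均利润率: {avg_of_ratios:.1%}")   # 15.0%
print(f"汇总后利润率: {ratio_of_sums:.1%}")     # 12.5%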
分区策略(配置示意见下):
- 基于时间维度进行分区
- 选择合适的分区粒度(日、周、月)
- 考虑数据增长速度和查询模式
- 定期清理历史分区
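分区配置的结构可以参考本章后文模型JSON中的 partition_desc 字段,下面是一个按天增量分区的最小示意:
# partition_desc 配置示意(字段名与本章模型JSON一致,取值需按实际表调整)
partition_desc = {
    "partition_date_column": "sales_fact.sale_date",  # 用于分区的日期列
    "partition_date_format": "yyyy-MM-dd",            # 日期格式
    "partition_type": "APPEND",                       # 增量追加构建
}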
5.6.2 性能优化策略
存储优化(段合并调用示意见下):
- 合理控制段数量和大小
- 选择合适的压缩算法
- 优化编码方式
- 定期进行段合并
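段合并通常通过Cube管理接口触发。下面是一个假设性的Python示意,接口路径与 buildType=MERGE 参数基于Kylin 3.x 的 rebuild 接口,不同版本可能有差异,使用前请以官方文档为准:
import requests

def merge_segments(host, auth, cube_name, start_ms, end_ms):
    """触发指定时间范围内段的合并(示意代码,参数含义见Kylin文档)"""
    url = f"http://{host}/kylin/api/cubes/{cube_name}/rebuild"
    payload = {
        "startTime": start_ms,   # 合并范围起始时间(毫秒时间戳)
        "endTime": end_ms,       # 合并范围结束时间(毫秒时间戳)
        "buildType": "MERGE",    # MERGE 表示合并已有段而非重新构建
    }
    resp = requests.put(url, json=payload, auth=auth, timeout=60)
    resp.raise_for_status()
    return resp.json()

# 用法示意:merge_segments("localhost:7070", ("ADMIN", "KYLIN"), "sales_cube", 0, 1700000000000)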
查询优化(聚合组配置示意见下):
- 设计合理的聚合组
- 优化Cuboid配置
- 提高缓存命中率
- 减少不必要的维度组合
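聚合组通过 select_rule 中的强制维度、层次维度和联合维度来裁剪Cuboid组合。下面的字典是一个示意配置,字段名与本章验证工具读取的结构一致,维度名为假设:
# 聚合组配置示意:通过 select_rule 裁剪不必要的维度组合
aggregation_group = {
    "includes": ["time.year", "time.month", "product.category",
                 "product.subcategory", "customer.region", "customer.segment"],
    "select_rule": {
        # 强制维度:每个Cuboid都必须包含,直接把组合数减半
        "mandatory_dims": ["time.year"],
        # 层次维度:只按 类别->子类别 的层次顺序组合
        "hierarchy_dims": [["product.category", "product.subcategory"]],
        # 联合维度:这几个维度总是整体出现或整体不出现
        "joint_dims": [["customer.region", "customer.segment"]],
    },
}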
构建优化(参数示意见下):
- 选择合适的构建算法
- 配置并行构建
- 优化资源分配
- 监控构建性能
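构建相关的常用参数在前文 build_optimizer 的配置建议中已经出现,这里汇总为一个示意字典,取值仅供参考,应结合集群资源与压测结果调整:
# 构建优化参数示意(键名来自本章构建优化工具的建议,取值需按集群调整)
build_tuning = {
    "kylin.cube.algorithm": "auto",          # auto/layer/inmem,由数据特征决定
    "kylin.job.max-concurrent-jobs": "3",    # 限制并发构建任务数
    "kylin.job.retry": "3",                  # 构建失败自动重试次数
    "spark.executor.memory": "4g",           # 构建引擎Executor内存
    "spark.executor.cores": "2",             # 每个Executor的核数
}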
5.6.3 运维管理
监控告警:
- 设置性能阈值告警
- 监控存储使用情况
- 跟踪查询性能趋势
- 建立故障处理流程
备份恢复:
- 定期备份模型配置
- 建立数据恢复机制
- 测试恢复流程
- 文档化操作步骤
版本管理:
- 使用版本控制管理模型变更
- 建立变更审批流程
- 保留历史版本
- 支持快速回滚
5.7 本章小结
本章深入介绍了Apache Kylin的数据模型设计与管理,涵盖了以下核心内容:
- 维度建模理论:介绍了事实表、维度表、星型模式等基础概念
- 数据模型创建:详细说明了通过Web UI和REST API创建模型的方法
- 维度和度量定义:提供了维度层次设计和度量类型分类的最佳实践
- 模型管理与优化:包括生命周期管理、性能监控和优化策略
- Cube设计与管理:深入讲解了Cube配置、聚合组优化等高级主题
- 性能优化:提供了存储、构建、查询等多方面的优化工具和方法
通过本章的学习,读者应该能够:
- 理解维度建模的基本原理和最佳实践
- 熟练创建和管理Kylin数据模型
- 设计高效的Cube配置
- 进行性能监控和优化
- 建立完善的运维管理体系
下一章将介绍"Cube构建与管理",深入探讨Cube的构建策略、增量构建、任务调度等高级主题。
5.8 练习与思考
理论练习
维度建模设计
- 为电商业务设计一个完整的星型模式
- 包括订单事实表和相关维度表
- 考虑维度的层次结构和属性
聚合组优化
- 分析给定的查询模式
- 设计最优的聚合组配置
- 计算Cuboid数量和存储开销
性能分析
- 根据查询日志分析性能瓶颈
- 提出具体的优化建议
- 评估优化效果
实践练习
模型创建实践
# 使用提供的脚本创建数据模型
python model_validator.py --config sales_model.json
python dimension_designer.py --create-time-dimension
python measure_designer.py --create-basic-measures
性能监控实践
# 运行性能监控工具
python model_performance_monitor.py --cube sales_cube
python query_optimizer.py --analyze-patterns
python storage_optimizer.py --generate-report
优化实践
# 执行优化建议
python aggregation_group_optimizer.py --optimize sales_cube
python build_optimizer.py --analyze-build-history
思考题
- 设计权衡:在维度数量和查询性能之间如何找到平衡点?
- 存储优化:什么情况下应该选择layer算法而不是inmem算法?
- 查询优化:如何设计聚合组来最大化Cuboid命中率?
- 运维策略:如何建立有效的模型变更管理流程?
- 扩展性考虑:随着数据量增长,模型设计需要考虑哪些因素?
-- 以下为练习参考:维度层次表与事实表的完整DDL示例
-- 时间维度层次示例
CREATE TABLE time_hierarchy (
date_key INT PRIMARY KEY,
full_date DATE,
-- 年层次
year_key INT,
year_name VARCHAR(10),
-- 季度层次
quarter_key INT,
quarter_name VARCHAR(10),
-- 月层次
month_key INT,
month_name VARCHAR(20),
month_short_name VARCHAR(10),
-- 周层次
week_key INT,
week_name VARCHAR(20),
-- 日层次
day_of_year INT,
day_of_month INT,
day_of_week INT,
day_name VARCHAR(20),
is_weekend BOOLEAN,
is_holiday BOOLEAN,
-- 业务日历
fiscal_year INT,
fiscal_quarter INT,
fiscal_month INT
);
-- 产品维度层次示例
CREATE TABLE product_hierarchy (
product_key INT PRIMARY KEY,
product_id VARCHAR(50),
product_name VARCHAR(200),
-- 品类层次
category_key INT,
category_name VARCHAR(100),
subcategory_key INT,
subcategory_name VARCHAR(100),
-- 品牌层次
brand_key INT,
brand_name VARCHAR(100),
-- 供应商层次
supplier_key INT,
supplier_name VARCHAR(100),
-- 产品属性
product_type VARCHAR(50),
product_status VARCHAR(20),
unit_price DECIMAL(10,2),
unit_cost DECIMAL(10,2),
weight DECIMAL(8,2),
color VARCHAR(50),
size VARCHAR(50),
-- 审计字段
created_date DATE,
modified_date DATE,
is_active BOOLEAN
);
-- 地理维度层次示例
CREATE TABLE geography_hierarchy (
geography_key INT PRIMARY KEY,
-- 国家层次
country_key INT,
country_name VARCHAR(100),
country_code VARCHAR(10),
-- 地区层次
region_key INT,
region_name VARCHAR(100),
-- 省/州层次
state_key INT,
state_name VARCHAR(100),
state_code VARCHAR(10),
-- 城市层次
city_key INT,
city_name VARCHAR(100),
-- 邮编层次
postal_code VARCHAR(20),
-- 地理坐标
latitude DECIMAL(10,8),
longitude DECIMAL(11,8),
-- 时区
timezone VARCHAR(50)
);
-- 客户维度层次示例
CREATE TABLE customer_hierarchy (
customer_key INT PRIMARY KEY,
customer_id VARCHAR(50),
customer_name VARCHAR(200),
-- 客户分类
customer_type VARCHAR(50), -- B2B, B2C
customer_segment VARCHAR(50), -- VIP, Premium, Standard
customer_status VARCHAR(20), -- Active, Inactive, Suspended
-- 人口统计
age_group VARCHAR(20),
gender VARCHAR(10),
education_level VARCHAR(50),
income_level VARCHAR(50),
-- 地理信息
geography_key INT,
-- 联系信息
email VARCHAR(200),
phone VARCHAR(50),
-- 注册信息
registration_date DATE,
first_purchase_date DATE,
last_purchase_date DATE,
-- 客户价值
lifetime_value DECIMAL(12,2),
credit_score INT,
-- 审计字段
created_date DATE,
modified_date DATE,
is_active BOOLEAN,
-- 外键约束
FOREIGN KEY (geography_key) REFERENCES geography_hierarchy(geography_key)
);
-- 销售事实表设计
CREATE TABLE sales_fact (
-- 代理键
sales_key BIGINT PRIMARY KEY,
-- 维度外键
time_key INT NOT NULL,
product_key INT NOT NULL,
customer_key INT NOT NULL,
geography_key INT NOT NULL,
sales_rep_key INT,
promotion_key INT,
-- 度量字段
quantity_sold INT,
unit_price DECIMAL(10,2),
unit_cost DECIMAL(10,2),
sales_amount DECIMAL(12,2),
cost_amount DECIMAL(12,2),
profit_amount DECIMAL(12,2),
discount_amount DECIMAL(10,2),
tax_amount DECIMAL(10,2),
shipping_amount DECIMAL(10,2),
-- 退化维度(直接存储在事实表中的维度属性)
order_id VARCHAR(50),
order_line_number INT,
invoice_number VARCHAR(50),
payment_method VARCHAR(50),
shipping_method VARCHAR(50),
-- 审计字段
transaction_timestamp TIMESTAMP,
created_date DATE,
modified_date DATE,
etl_batch_id BIGINT,
-- 外键约束
FOREIGN KEY (time_key) REFERENCES time_hierarchy(date_key),
FOREIGN KEY (product_key) REFERENCES product_hierarchy(product_key),
FOREIGN KEY (customer_key) REFERENCES customer_hierarchy(customer_key),
FOREIGN KEY (geography_key) REFERENCES geography_hierarchy(geography_key)
)
-- 分区策略
PARTITION BY RANGE (time_key) (
PARTITION p2023_q1 VALUES LESS THAN (20230401),
PARTITION p2023_q2 VALUES LESS THAN (20230701),
PARTITION p2023_q3 VALUES LESS THAN (20231001),
PARTITION p2023_q4 VALUES LESS THAN (20240101),
PARTITION p2024_q1 VALUES LESS THAN (20240401)
);
-- 创建索引
CREATE INDEX idx_sales_time ON sales_fact(time_key);
CREATE INDEX idx_sales_product ON sales_fact(product_key);
CREATE INDEX idx_sales_customer ON sales_fact(customer_key);
CREATE INDEX idx_sales_geography ON sales_fact(geography_key);
CREATE INDEX idx_sales_order ON sales_fact(order_id);
CREATE INDEX idx_sales_timestamp ON sales_fact(transaction_timestamp);
5.4 数据模型验证工具
为了确保数据模型的质量和性能,我们需要建立完善的验证机制:
#!/usr/bin/env python3
# advanced_model_validator.py - 高级数据模型验证工具
import json
import re
import logging
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ValidationResult:
"""验证结果"""
category: str
level: str # ERROR, WARNING, INFO
message: str
details: Dict
suggestions: List[str]
class AdvancedModelValidator:
"""高级数据模型验证器"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.validation_results = []
# 验证规则配置
self.rules = {
"max_dimensions": 50,
"max_measures": 100,
"max_cuboids": 10000,
"min_fact_table_rows": 1000,
"max_dimension_cardinality": 1000000,
"recommended_aggregation_groups": 5
}
def validate_model_structure(self, model_config: Dict) -> List[ValidationResult]:
"""验证模型结构"""
results = []
# 验证基本结构
required_fields = ['name', 'fact_table', 'dimensions', 'measures']
for field in required_fields:
if field not in model_config:
results.append(ValidationResult(
category="structure",
level="ERROR",
message=f"缺少必需字段: {field}",
details={"missing_field": field},
suggestions=[f"添加{field}字段到模型配置中"]
))
# 验证维度数量
dimensions = model_config.get('dimensions', [])
if len(dimensions) > self.rules['max_dimensions']:
results.append(ValidationResult(
category="structure",
level="WARNING",
message=f"维度数量过多: {len(dimensions)} > {self.rules['max_dimensions']}",
details={"dimension_count": len(dimensions), "max_allowed": self.rules['max_dimensions']},
suggestions=[
"考虑合并相似维度",
"移除不常用的维度",
"使用维度层次减少维度数量"
]
))
# 验证度量数量
measures = model_config.get('measures', [])
if len(measures) > self.rules['max_measures']:
results.append(ValidationResult(
category="structure",
level="WARNING",
message=f"度量数量过多: {len(measures)} > {self.rules['max_measures']}",
details={"measure_count": len(measures), "max_allowed": self.rules['max_measures']},
suggestions=[
"移除不常用的度量",
"使用派生度量替代基础度量",
"考虑在查询时计算某些度量"
]
))
return results
def validate_dimensions(self, model_config: Dict) -> List[ValidationResult]:
"""验证维度设计"""
results = []
dimensions = model_config.get('dimensions', [])
for dim in dimensions:
dim_name = dim.get('name', 'unknown')
# 验证维度名称
if not self._is_valid_identifier(dim_name):
results.append(ValidationResult(
category="dimension",
level="ERROR",
message=f"维度名称不符合规范: {dim_name}",
details={"dimension": dim_name},
suggestions=["使用字母、数字和下划线组成的名称"]
))
# 验证维度表
table = dim.get('table')
if not table:
results.append(ValidationResult(
category="dimension",
level="ERROR",
message=f"维度{dim_name}缺少表定义",
details={"dimension": dim_name},
suggestions=["为维度指定对应的维度表"]
))
# 验证维度列
column = dim.get('column')
if not column:
results.append(ValidationResult(
category="dimension",
level="ERROR",
message=f"维度{dim_name}缺少列定义",
details={"dimension": dim_name},
suggestions=["为维度指定对应的列名"]
))
# 验证维度基数
cardinality = dim.get('cardinality', 0)
if cardinality > self.rules['max_dimension_cardinality']:
results.append(ValidationResult(
category="dimension",
level="WARNING",
message=f"维度{dim_name}基数过高: {cardinality}",
details={"dimension": dim_name, "cardinality": cardinality},
suggestions=[
"考虑对高基数维度进行分组",
"使用维度层次减少基数",
"评估是否真的需要这个维度"
]
))
return results
def validate_measures(self, model_config: Dict) -> List[ValidationResult]:
"""验证度量设计"""
results = []
measures = model_config.get('measures', [])
measure_names = set()
for measure in measures:
measure_name = measure.get('name', 'unknown')
# 检查重复度量名称
if measure_name in measure_names:
results.append(ValidationResult(
category="measure",
level="ERROR",
message=f"重复的度量名称: {measure_name}",
details={"measure": measure_name},
suggestions=["使用唯一的度量名称"]
))
measure_names.add(measure_name)
# 验证度量名称
if not self._is_valid_identifier(measure_name):
results.append(ValidationResult(
category="measure",
level="ERROR",
message=f"度量名称不符合规范: {measure_name}",
details={"measure": measure_name},
suggestions=["使用字母、数字和下划线组成的名称"]
))
# 验证聚合函数
function = measure.get('function', '').upper()
valid_functions = ['SUM', 'COUNT', 'MAX', 'MIN', 'AVG', 'COUNT_DISTINCT']
if function not in valid_functions:
results.append(ValidationResult(
category="measure",
level="ERROR",
message=f"不支持的聚合函数: {function}",
details={"measure": measure_name, "function": function},
suggestions=[f"使用支持的聚合函数: {', '.join(valid_functions)}"]
))
# 验证度量表达式
expression = measure.get('expression')
if not expression:
results.append(ValidationResult(
category="measure",
level="ERROR",
message=f"度量{measure_name}缺少表达式",
details={"measure": measure_name},
suggestions=["为度量定义计算表达式"]
))
return results
def validate_relationships(self, model_config: Dict) -> List[ValidationResult]:
"""验证表关系"""
results = []
fact_table = model_config.get('fact_table')
dimensions = model_config.get('dimensions', [])
lookups = model_config.get('lookups', [])
# 验证事实表定义
if not fact_table:
results.append(ValidationResult(
category="relationship",
level="ERROR",
message="缺少事实表定义",
details={},
suggestions=["定义模型的事实表"]
))
return results
# 验证连接关系
for lookup in lookups:
# 验证连接类型
join_type = lookup.get('join', {}).get('type', '').upper()
valid_joins = ['INNER', 'LEFT']
if join_type not in valid_joins:
results.append(ValidationResult(
category="relationship",
level="WARNING",
message=f"建议使用INNER或LEFT连接,当前: {join_type}",
details={"join_type": join_type},
suggestions=["使用INNER JOIN提高性能,使用LEFT JOIN保证数据完整性"]
))
# 验证连接条件
primary_key = lookup.get('join', {}).get('primary_key', [])
foreign_key = lookup.get('join', {}).get('foreign_key', [])
if len(primary_key) != len(foreign_key):
results.append(ValidationResult(
category="relationship",
level="ERROR",
message="主键和外键数量不匹配",
details={"primary_key": primary_key, "foreign_key": foreign_key},
suggestions=["确保主键和外键字段一一对应"]
))
return results
def validate_performance(self, model_config: Dict) -> List[ValidationResult]:
"""验证性能配置"""
results = []
# 验证分区配置
partition_desc = model_config.get('partition_desc')
if not partition_desc:
results.append(ValidationResult(
category="performance",
level="WARNING",
message="未配置分区策略",
details={},
suggestions=[
"配置基于时间的分区策略",
"选择合适的分区粒度(日/周/月)",
"考虑数据增长速度和查询模式"
]
))
else:
# 验证分区列
partition_column = partition_desc.get('partition_date_column')
if not partition_column:
results.append(ValidationResult(
category="performance",
level="ERROR",
message="分区配置缺少分区列",
details={},
suggestions=["指定用于分区的日期列"]
))
# 验证聚合组配置
aggregation_groups = model_config.get('aggregation_groups', [])
if not aggregation_groups:
results.append(ValidationResult(
category="performance",
level="WARNING",
message="未配置聚合组",
details={},
suggestions=[
"配置聚合组以优化查询性能",
"基于查询模式设计聚合组",
"控制Cuboid数量"
]
))
elif len(aggregation_groups) > self.rules['recommended_aggregation_groups']:
results.append(ValidationResult(
category="performance",
level="WARNING",
message=f"聚合组数量过多: {len(aggregation_groups)}",
details={"count": len(aggregation_groups)},
suggestions=[
"合并相似的聚合组",
"移除不必要的聚合组",
"优化聚合组设计"
]
))
# 估算Cuboid数量
total_cuboids = self._estimate_cuboid_count(model_config)
if total_cuboids > self.rules['max_cuboids']:
results.append(ValidationResult(
category="performance",
level="ERROR",
message=f"预估Cuboid数量过多: {total_cuboids}",
details={"estimated_cuboids": total_cuboids},
suggestions=[
"减少维度数量",
"优化聚合组配置",
"使用强制维度和层次维度",
"考虑使用联合维度"
]
))
return results
def validate_business_logic(self, model_config: Dict) -> List[ValidationResult]:
"""验证业务逻辑"""
results = []
# 验证度量的业务含义
measures = model_config.get('measures', [])
for measure in measures:
measure_name = measure.get('name', '')
function = measure.get('function', '').upper()
# 检查可能的业务逻辑错误
if 'amount' in measure_name.lower() and function == 'COUNT':
results.append(ValidationResult(
category="business_logic",
level="WARNING",
message=f"度量{measure_name}使用COUNT函数可能不合适",
details={"measure": measure_name, "function": function},
suggestions=["金额类度量通常使用SUM函数"]
))
if 'count' in measure_name.lower() and function != 'COUNT':
results.append(ValidationResult(
category="business_logic",
level="WARNING",
message=f"度量{measure_name}可能应该使用COUNT函数",
details={"measure": measure_name, "function": function},
suggestions=["计数类度量通常使用COUNT函数"]
))
# 验证维度的业务含义
dimensions = model_config.get('dimensions', [])
time_dimensions = [d for d in dimensions if 'time' in d.get('name', '').lower() or 'date' in d.get('name', '').lower()]
if not time_dimensions:
results.append(ValidationResult(
category="business_logic",
level="WARNING",
message="模型中缺少时间维度",
details={},
suggestions=[
"添加时间维度以支持时间序列分析",
"考虑业务分析的时间需求"
]
))
return results
def validate_security(self, model_config: Dict) -> List[ValidationResult]:
"""验证安全配置"""
results = []
# 检查敏感数据
dimensions = model_config.get('dimensions', [])
measures = model_config.get('measures', [])
sensitive_keywords = ['password', 'ssn', 'credit_card', 'phone', 'email', 'address']
for dim in dimensions:
dim_name = dim.get('name', '').lower()
for keyword in sensitive_keywords:
if keyword in dim_name:
results.append(ValidationResult(
category="security",
level="WARNING",
message=f"维度{dim.get('name')}可能包含敏感信息",
details={"dimension": dim.get('name'), "keyword": keyword},
suggestions=[
"考虑对敏感数据进行脱敏处理",
"实施行级安全控制",
"限制对敏感维度的访问权限"
]
))
return results
def _is_valid_identifier(self, name: str) -> bool:
"""验证标识符是否有效"""
if not name:
return False
return re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name) is not None
def _estimate_cuboid_count(self, model_config: Dict) -> int:
"""估算Cuboid数量"""
dimensions = model_config.get('dimensions', [])
aggregation_groups = model_config.get('aggregation_groups', [])
if not aggregation_groups:
# 如果没有聚合组,估算为2^n
return 2 ** min(len(dimensions), 20) # 限制最大值
total_cuboids = 0
for group in aggregation_groups:
includes = group.get('includes', [])
mandatory_dims = group.get('select_rule', {}).get('mandatory_dims', [])
hierarchy_dims = group.get('select_rule', {}).get('hierarchy_dims', [])
joint_dims = group.get('select_rule', {}).get('joint_dims', [])
# 简化的Cuboid数量估算
base_dims = len(includes)
if base_dims > 0:
group_cuboids = 2 ** min(base_dims, 15) # 限制单个组的最大值
total_cuboids += group_cuboids
return total_cuboids
def generate_validation_report(self, model_config: Dict) -> Dict:
"""生成验证报告"""
all_results = []
# 执行所有验证
all_results.extend(self.validate_model_structure(model_config))
all_results.extend(self.validate_dimensions(model_config))
all_results.extend(self.validate_measures(model_config))
all_results.extend(self.validate_relationships(model_config))
all_results.extend(self.validate_performance(model_config))
all_results.extend(self.validate_business_logic(model_config))
all_results.extend(self.validate_security(model_config))
# 统计结果
error_count = len([r for r in all_results if r.level == 'ERROR'])
warning_count = len([r for r in all_results if r.level == 'WARNING'])
info_count = len([r for r in all_results if r.level == 'INFO'])
# 计算质量评分
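# 例如:2 个错误、3 个警告 => 100 - 2*20 - 3*5 = 45 分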
quality_score = max(0, 100 - error_count * 20 - warning_count * 5)
return {
"model_name": model_config.get('name', 'unknown'),
"validation_timestamp": datetime.now().isoformat(),
"quality_score": quality_score,
"summary": {
"total_issues": len(all_results),
"errors": error_count,
"warnings": warning_count,
"info": info_count
},
"results": [{
"category": r.category,
"level": r.level,
"message": r.message,
"details": r.details,
"suggestions": r.suggestions
} for r in all_results],
"recommendations": self._generate_recommendations(all_results)
}
def _generate_recommendations(self, results: List[ValidationResult]) -> List[str]:
"""生成改进建议"""
recommendations = []
error_count = len([r for r in results if r.level == 'ERROR'])
warning_count = len([r for r in results if r.level == 'WARNING'])
if error_count > 0:
recommendations.append(f"修复{error_count}个错误以确保模型可用性")
if warning_count > 5:
recommendations.append(f"关注{warning_count}个警告以提高模型质量")
# 基于问题类型的建议
categories = {}
for result in results:
if result.level in ['ERROR', 'WARNING']:
categories[result.category] = categories.get(result.category, 0) + 1
if categories.get('performance', 0) > 2:
recommendations.append("重点关注性能优化,考虑重新设计聚合组")
if categories.get('structure', 0) > 2:
recommendations.append("检查模型结构设计,确保符合最佳实践")
if categories.get('business_logic', 0) > 1:
recommendations.append("与业务专家确认度量和维度的业务含义")
return recommendations
def print_validation_report(self, model_config: Dict):
"""打印验证报告"""
report = self.generate_validation_report(model_config)
print(f"\n=== 数据模型验证报告 ===")
print(f"模型名称: {report['model_name']}")
print(f"验证时间: {report['validation_timestamp']}")
print(f"质量评分: {report['quality_score']}/100")
# 问题统计
summary = report['summary']
print(f"\n📊 问题统计:")
print(f" 总问题数: {summary['total_issues']}")
print(f" 错误: {summary['errors']}")
print(f" 警告: {summary['warnings']}")
print(f" 信息: {summary['info']}")
# 详细结果
if report['results']:
print(f"\n🔍 详细结果:")
for i, result in enumerate(report['results'], 1):
level_icon = {'ERROR': '❌', 'WARNING': '⚠️', 'INFO': 'ℹ️'}.get(result['level'], '•')
print(f"\n{level_icon} {i}. [{result['category']}] {result['message']}")
if result['suggestions']:
print(" 建议:")
for suggestion in result['suggestions']:
print(f" - {suggestion}")
# 改进建议
if report['recommendations']:
print(f"\n💡 改进建议:")
for i, rec in enumerate(report['recommendations'], 1):
print(f" {i}. {rec}")
def main():
# 示例使用
validator = AdvancedModelValidator()
# 示例模型配置
model_config = {
"name": "sales_model",
"fact_table": "sales_fact",
"dimensions": [
{
"name": "time_dim",
"table": "time_hierarchy",
"column": "date_key",
"cardinality": 3650
},
{
"name": "product_dim",
"table": "product_hierarchy",
"column": "product_key",
"cardinality": 50000
}
],
"measures": [
{
"name": "sales_amount",
"function": "SUM",
"expression": "sales_fact.sales_amount"
},
{
"name": "order_count",
"function": "COUNT",
"expression": "1"
}
],
"lookups": [
{
"table": "time_hierarchy",
"join": {
"type": "inner",
"primary_key": ["time_hierarchy.date_key"],
"foreign_key": ["sales_fact.time_key"]
}
}
],
"partition_desc": {
"partition_date_column": "time_key",
"partition_date_start": "2020-01-01"
},
"aggregation_groups": [
{
"includes": ["time_dim", "product_dim"],
"select_rule": {
"mandatory_dims": ["time_dim"]
}
}
]
}
validator.print_validation_report(model_config)
if __name__ == "__main__":
main()
-- 时间维度层次示例(含业务日历属性)
CREATE TABLE time_hierarchy (
date_key INT PRIMARY KEY,
full_date DATE,
-- 年层次
year_key INT,
year_name VARCHAR(10),
-- 季度层次
quarter_key INT,
quarter_name VARCHAR(10),
quarter_of_year INT,
-- 月层次
month_key INT,
month_name VARCHAR(20),
month_of_year INT,
month_of_quarter INT,
-- 周层次
week_key INT,
week_of_year INT,
week_of_month INT,
-- 日层次
day_of_year INT,
day_of_quarter INT,
day_of_month INT,
day_of_week INT,
day_name VARCHAR(10),
-- 业务属性
is_weekend BOOLEAN,
is_holiday BOOLEAN,
is_workday BOOLEAN,
fiscal_year INT,
fiscal_quarter INT,
fiscal_month INT
);
-- 地理维度层次示例
CREATE TABLE geography_hierarchy (
geography_key INT PRIMARY KEY,
-- 最细粒度
store_id VARCHAR(50),
store_name VARCHAR(200),
-- 城市层次
city_key INT,
city_name VARCHAR(100),
-- 州/省层次
state_key INT,
state_name VARCHAR(100),
state_code VARCHAR(10),
-- 区域层次
region_key INT,
region_name VARCHAR(100),
-- 国家层次
country_key INT,
country_name VARCHAR(100),
country_code VARCHAR(10),
-- 大洲层次
continent_key INT,
continent_name VARCHAR(50),
-- 地理坐标
latitude DECIMAL(10,8),
longitude DECIMAL(11,8),
timezone VARCHAR(50)
);
## 5.2 Kylin数据模型创建
### 5.2.1 数据模型基础
在Apache Kylin中,数据模型定义了事实表和维度表之间的关系,是构建Cube的基础。
**数据模型组成:**
1. **事实表**:包含度量数据的主表
2. **维度表**:提供描述信息的查找表
3. **连接关系**:定义表之间的关联方式
4. **分区信息**:用于数据分区和增量构建
### 5.2.2 创建数据模型
**通过Web UI创建数据模型:**
1. **登录Kylin Web UI**
- 访问 http://kylin-server:7070/kylin
- 使用管理员账户登录
2. **进入模型管理页面**
- 点击"Models"菜单
- 点击"+ New"按钮
3. **配置基本信息**
```json
{
"name": "sales_model",
"description": "销售数据模型",
"fact_table": "default.sales_fact",
"lookups": [
{
"table": "default.date_dim",
"kind": "LOOKUP",
"alias": "date_dim",
"join": {
"type": "INNER",
"primary_key": ["date_key"],
"foreign_key": ["date_key"]
}
},
{
"table": "default.product_dim",
"kind": "LOOKUP",
"alias": "product_dim",
"join": {
"type": "INNER",
"primary_key": ["product_key"],
"foreign_key": ["product_key"]
}
},
{
"table": "default.customer_dim",
"kind": "LOOKUP",
"alias": "customer_dim",
"join": {
"type": "LEFT",
"primary_key": ["customer_key"],
"foreign_key": ["customer_key"]
}
}
],
"partition_desc": {
"partition_date_column": "sales_fact.sale_date",
"partition_time_column": null,
"partition_date_start": 0,
"partition_date_format": "yyyy-MM-dd",
"partition_type": "APPEND",
"partition_condition_builder": "org.apache.kylin.metadata.model.DefaultPartitionConditionBuilder"
},
"filter_condition": "sales_fact.sales_amount > 0",
"capacity": "MEDIUM"
}
```

**通过REST API创建数据模型:**
#!/bin/bash
# create_data_model.sh - 创建数据模型脚本
KYLIN_HOST="localhost:7070"
USERNAME="ADMIN"
PASSWORD="KYLIN"
PROJECT="sales_project"
# 数据模型JSON配置
read -r -d '' MODEL_JSON << 'EOF'
{
"uuid": null,
"name": "sales_model",
"owner": "ADMIN",
"description": "销售数据分析模型",
"fact_table": "default.sales_fact",
"lookups": [
{
"table": "default.date_dim",
"kind": "LOOKUP",
"alias": "date_dim",
"join": {
"type": "INNER",
"primary_key": ["date_key"],
"foreign_key": ["date_key"]
}
},
{
"table": "default.product_dim",
"kind": "LOOKUP",
"alias": "product_dim",
"join": {
"type": "INNER",
"primary_key": ["product_key"],
"foreign_key": ["product_key"]
}
},
{
"table": "default.customer_dim",
"kind": "LOOKUP",
"alias": "customer_dim",
"join": {
"type": "LEFT",
"primary_key": ["customer_key"],
"foreign_key": ["customer_key"]
}
},
{
"table": "default.store_dim",
"kind": "LOOKUP",
"alias": "store_dim",
"join": {
"type": "INNER",
"primary_key": ["store_key"],
"foreign_key": ["store_key"]
}
}
],
"dimensions": [
{
"table": "sales_fact",
"columns": ["sale_id"]
},
{
"table": "date_dim",
"columns": ["year", "quarter", "month", "week", "day_of_week", "is_weekend"]
},
{
"table": "product_dim",
"columns": ["category", "subcategory", "brand", "product_name"]
},
{
"table": "customer_dim",
"columns": ["gender", "age_group", "city", "state", "customer_segment"]
},
{
"table": "store_dim",
"columns": ["store_type", "city", "state", "region"]
}
],
"metrics": [
{
"name": "sales_amount",
"function": {
"expression": "SUM",
"parameter": {
"type": "column",
"value": "sales_fact.sales_amount"
},
"returntype": "decimal(19,4)"
}
},
{
"name": "quantity",
"function": {
"expression": "SUM",
"parameter": {
"type": "column",
"value": "sales_fact.quantity"
},
"returntype": "bigint"
}
},
{
"name": "profit",
"function": {
"expression": "SUM",
"parameter": {
"type": "column",
"value": "sales_fact.profit"
},
"returntype": "decimal(19,4)"
}
},
{
"name": "transaction_count",
"function": {
"expression": "COUNT",
"parameter": {
"type": "constant",
"value": "1"
},
"returntype": "bigint"
}
}
],
"partition_desc": {
"partition_date_column": "sales_fact.sale_date",
"partition_date_start": 0,
"partition_date_format": "yyyy-MM-dd",
"partition_type": "APPEND"
},
"filter_condition": "sales_fact.sales_amount > 0 AND sales_fact.quantity > 0",
"capacity": "MEDIUM"
}
EOF
# 创建数据模型
echo "创建数据模型: sales_model"
response=$(curl -s -X POST \
"http://${KYLIN_HOST}/kylin/api/models" \
-H "Content-Type: application/json" \
-H "Authorization: Basic $(echo -n ${USERNAME}:${PASSWORD} | base64)" \
-d "${MODEL_JSON}")
if echo "$response" | grep -q '"uuid"'; then
echo "✅ 数据模型创建成功"
echo "$response" | python -m json.tool
else
echo "❌ 数据模型创建失败"
echo "错误信息: $response"
exit 1
fi
# 获取模型列表验证
echo "\n验证模型创建..."
models=$(curl -s -X GET \
"http://${KYLIN_HOST}/kylin/api/models" \
-H "Authorization: Basic $(echo -n ${USERNAME}:${PASSWORD} | base64)")
if echo "$models" | grep -q "sales_model"; then
echo "✅ 模型验证成功"
else
echo "❌ 模型验证失败"
fi
5.2.3 数据模型验证
模型验证脚本:
#!/usr/bin/env python3
# model_validator.py - 数据模型验证工具
import requests
import json
import sys
from datetime import datetime
class ModelValidator:
def __init__(self, kylin_host, username, password):
self.kylin_host = kylin_host
self.username = username
self.password = password
self.session = requests.Session()
self.session.auth = (username, password)
def validate_model(self, model_name):
"""验证数据模型"""
print(f"开始验证数据模型: {model_name}")
validation_result = {
'model_name': model_name,
'validation_time': datetime.now().isoformat(),
'checks': {},
'overall_status': 'unknown',
'recommendations': []
}
try:
# 获取模型信息
model_info = self.get_model_info(model_name)
if not model_info:
validation_result['overall_status'] = 'failed'
validation_result['checks']['model_exists'] = {
'status': 'failed',
'message': '模型不存在'
}
return validation_result
# 执行各项检查
validation_result['checks']['model_exists'] = {
'status': 'passed',
'message': '模型存在'
}
# 检查事实表
self.check_fact_table(model_info, validation_result)
# 检查维度表
self.check_dimension_tables(model_info, validation_result)
# 检查连接关系
self.check_join_relationships(model_info, validation_result)
# 检查分区配置
self.check_partition_config(model_info, validation_result)
# 检查维度和度量
self.check_dimensions_and_measures(model_info, validation_result)
# 检查数据质量
self.check_data_quality(model_info, validation_result)
# 计算总体状态
self.calculate_overall_status(validation_result)
# 生成建议
self.generate_recommendations(validation_result)
except Exception as e:
validation_result['overall_status'] = 'error'
validation_result['checks']['validation_error'] = {
'status': 'error',
'message': f'验证过程出错: {str(e)}'
}
return validation_result
def get_model_info(self, model_name):
"""获取模型信息"""
try:
response = self.session.get(
f"http://{self.kylin_host}/kylin/api/models/{model_name}"
)
if response.status_code == 200:
return response.json()
else:
return None
except Exception as e:
print(f"获取模型信息失败: {e}")
return None
def check_fact_table(self, model_info, validation_result):
"""检查事实表"""
fact_table = model_info.get('fact_table')
if not fact_table:
validation_result['checks']['fact_table'] = {
'status': 'failed',
'message': '未定义事实表'
}
return
# 检查事实表是否存在
table_exists = self.check_table_exists(fact_table)
if table_exists:
# 检查事实表记录数
record_count = self.get_table_record_count(fact_table)
validation_result['checks']['fact_table'] = {
'status': 'passed' if record_count > 0 else 'warning',
'message': f'事实表存在,记录数: {record_count}',
'details': {
'table_name': fact_table,
'record_count': record_count
}
}
if record_count == 0:
validation_result['recommendations'].append(
'事实表没有数据,请检查数据加载过程'
)
else:
validation_result['checks']['fact_table'] = {
'status': 'failed',
'message': f'事实表不存在: {fact_table}'
}
def check_dimension_tables(self, model_info, validation_result):
"""检查维度表"""
lookups = model_info.get('lookups', [])
if not lookups:
validation_result['checks']['dimension_tables'] = {
'status': 'warning',
'message': '没有定义维度表'
}
return
dimension_check_results = []
for lookup in lookups:
table_name = lookup.get('table')
alias = lookup.get('alias')
table_exists = self.check_table_exists(table_name)
record_count = self.get_table_record_count(table_name) if table_exists else 0
dimension_check_results.append({
'table': table_name,
'alias': alias,
'exists': table_exists,
'record_count': record_count,
'status': 'passed' if table_exists and record_count > 0 else 'failed'
})
failed_dimensions = [d for d in dimension_check_results if d['status'] == 'failed']
if failed_dimensions:
validation_result['checks']['dimension_tables'] = {
'status': 'failed',
'message': f'{len(failed_dimensions)} 个维度表存在问题',
'details': dimension_check_results
}
else:
validation_result['checks']['dimension_tables'] = {
'status': 'passed',
'message': f'所有 {len(dimension_check_results)} 个维度表正常',
'details': dimension_check_results
}
def check_join_relationships(self, model_info, validation_result):
"""检查连接关系"""
lookups = model_info.get('lookups', [])
join_check_results = []
for lookup in lookups:
join_info = lookup.get('join', {})
table_name = lookup.get('table')
primary_key = join_info.get('primary_key', [])
foreign_key = join_info.get('foreign_key', [])
join_type = join_info.get('type', 'INNER')
# 检查连接键是否匹配
keys_match = len(primary_key) == len(foreign_key) and len(primary_key) > 0
join_check_results.append({
'table': table_name,
'join_type': join_type,
'primary_key': primary_key,
'foreign_key': foreign_key,
'keys_match': keys_match,
'status': 'passed' if keys_match else 'failed'
})
failed_joins = [j for j in join_check_results if j['status'] == 'failed']
if failed_joins:
validation_result['checks']['join_relationships'] = {
'status': 'failed',
'message': f'{len(failed_joins)} 个连接关系配置错误',
'details': join_check_results
}
else:
validation_result['checks']['join_relationships'] = {
'status': 'passed',
'message': '所有连接关系配置正确',
'details': join_check_results
}
def check_partition_config(self, model_info, validation_result):
"""检查分区配置"""
partition_desc = model_info.get('partition_desc')
if not partition_desc:
validation_result['checks']['partition_config'] = {
'status': 'warning',
'message': '未配置分区信息'
}
return
partition_column = partition_desc.get('partition_date_column')
partition_format = partition_desc.get('partition_date_format')
if partition_column and partition_format:
validation_result['checks']['partition_config'] = {
'status': 'passed',
'message': '分区配置正确',
'details': {
'partition_column': partition_column,
'partition_format': partition_format
}
}
else:
validation_result['checks']['partition_config'] = {
'status': 'failed',
'message': '分区配置不完整'
}
def check_dimensions_and_measures(self, model_info, validation_result):
"""检查维度和度量"""
dimensions = model_info.get('dimensions', [])
metrics = model_info.get('metrics', [])
dimension_count = sum(len(dim.get('columns', [])) for dim in dimensions)
measure_count = len(metrics)
status = 'passed'
messages = []
if dimension_count == 0:
status = 'failed'
messages.append('没有定义维度')
if measure_count == 0:
status = 'failed'
messages.append('没有定义度量')
if dimension_count > 50:
status = 'warning'
messages.append(f'维度数量过多({dimension_count}),可能影响性能')
validation_result['checks']['dimensions_and_measures'] = {
'status': status,
'message': '; '.join(messages) if messages else f'维度: {dimension_count}, 度量: {measure_count}',
'details': {
'dimension_count': dimension_count,
'measure_count': measure_count
}
}
def check_data_quality(self, model_info, validation_result):
"""检查数据质量"""
fact_table = model_info.get('fact_table')
if not fact_table:
validation_result['checks']['data_quality'] = {
'status': 'skipped',
'message': '无法检查数据质量:事实表未定义'
}
return
try:
# 检查空值比例
null_check_query = f"""
SELECT
COUNT(*) as total_records,
COUNT(CASE WHEN sales_amount IS NULL THEN 1 END) as null_sales,
COUNT(CASE WHEN quantity IS NULL THEN 1 END) as null_quantity
FROM {fact_table}
LIMIT 1
"""
# 这里应该执行查询,但为了示例,我们模拟结果
total_records = 1000000
null_sales = 100
null_quantity = 50
null_ratio = (null_sales + null_quantity) / (total_records * 2)
if null_ratio < 0.01: # 小于1%
status = 'passed'
message = f'数据质量良好,空值比例: {null_ratio:.2%}'
elif null_ratio < 0.05: # 小于5%
status = 'warning'
message = f'数据质量一般,空值比例: {null_ratio:.2%}'
else:
status = 'failed'
message = f'数据质量较差,空值比例: {null_ratio:.2%}'
validation_result['checks']['data_quality'] = {
'status': status,
'message': message,
'details': {
'total_records': total_records,
'null_sales': null_sales,
'null_quantity': null_quantity,
'null_ratio': null_ratio
}
}
except Exception as e:
validation_result['checks']['data_quality'] = {
'status': 'error',
'message': f'数据质量检查失败: {str(e)}'
}
def check_table_exists(self, table_name):
"""检查表是否存在"""
try:
response = self.session.get(
f"http://{self.kylin_host}/kylin/api/tables/{table_name}"
)
return response.status_code == 200
except Exception:
return False
def get_table_record_count(self, table_name):
"""获取表记录数(模拟)"""
# 在实际实现中,这里应该执行SQL查询
# 为了示例,我们返回模拟数据
table_counts = {
'default.sales_fact': 1000000,
'default.date_dim': 3650,
'default.product_dim': 5000,
'default.customer_dim': 50000,
'default.store_dim': 2000
}
return table_counts.get(table_name, 0)
def calculate_overall_status(self, validation_result):
"""计算总体状态"""
checks = validation_result['checks']
failed_checks = [name for name, check in checks.items()
if check.get('status') == 'failed']
error_checks = [name for name, check in checks.items()
if check.get('status') == 'error']
warning_checks = [name for name, check in checks.items()
if check.get('status') == 'warning']
if error_checks or failed_checks:
validation_result['overall_status'] = 'failed'
elif warning_checks:
validation_result['overall_status'] = 'warning'
else:
validation_result['overall_status'] = 'passed'
def generate_recommendations(self, validation_result):
"""生成建议"""
checks = validation_result['checks']
recommendations = validation_result['recommendations']
# 基于检查结果生成建议
if checks.get('fact_table', {}).get('status') == 'failed':
recommendations.append('请确保事实表存在且包含数据')
if checks.get('dimension_tables', {}).get('status') == 'failed':
recommendations.append('请检查维度表的存在性和数据完整性')
if checks.get('join_relationships', {}).get('status') == 'failed':
recommendations.append('请检查表连接关系的主外键配置')
if checks.get('partition_config', {}).get('status') in ['failed', 'warning']:
recommendations.append('建议配置分区信息以提高查询性能')
dimension_details = checks.get('dimensions_and_measures', {}).get('details', {})
if dimension_details.get('dimension_count', 0) > 30:
recommendations.append('考虑减少维度数量或使用聚合组优化')
if checks.get('data_quality', {}).get('status') in ['failed', 'warning']:
recommendations.append('建议改善数据质量,减少空值和异常数据')
def print_validation_report(self, validation_result):
"""打印验证报告"""
print("\n=== 数据模型验证报告 ===")
print(f"模型名称: {validation_result['model_name']}")
print(f"验证时间: {validation_result['validation_time']}")
print(f"总体状态: {validation_result['overall_status'].upper()}")
# 打印检查结果
print("\n📋 检查结果:")
for check_name, check_result in validation_result['checks'].items():
status = check_result['status']
message = check_result['message']
status_icon = {
'passed': '✅',
'warning': '⚠️',
'failed': '❌',
'error': '🔴',
'skipped': '⏭️'
}.get(status, '❓')
print(f" {status_icon} {check_name}: {message}")
# 打印建议
recommendations = validation_result['recommendations']
if recommendations:
print("\n💡 优化建议:")
for i, rec in enumerate(recommendations, 1):
print(f" {i}. {rec}")
else:
print("\n✨ 模型配置良好,无需额外优化")
def main():
import argparse
parser = argparse.ArgumentParser(description='数据模型验证工具')
parser.add_argument('--host', default='localhost:7070', help='Kylin服务器地址')
parser.add_argument('--username', default='ADMIN', help='用户名')
parser.add_argument('--password', default='KYLIN', help='密码')
parser.add_argument('--model', required=True, help='要验证的模型名称')
parser.add_argument('--output', help='输出报告文件路径')
args = parser.parse_args()
validator = ModelValidator(args.host, args.username, args.password)
result = validator.validate_model(args.model)
# 打印报告
validator.print_validation_report(result)
# 保存报告
if args.output:
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(result, f, indent=2, ensure_ascii=False, default=str)
print(f"\n📄 详细报告已保存到: {args.output}")
# 返回适当的退出码
if result['overall_status'] == 'failed':
sys.exit(1)
elif result['overall_status'] == 'warning':
sys.exit(2)
else:
sys.exit(0)
if __name__ == "__main__":
main()
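除命令行方式外,也可以在其他Python脚本中直接复用上述验证器。下面是一个假设性的用法示意(假设本文件保存为 model_validator.py 并可被导入):
# 用法示意:在其他脚本中直接调用验证器(文件名与连接参数为假设)
from model_validator import ModelValidator

validator = ModelValidator("localhost:7070", "ADMIN", "KYLIN")
result = validator.validate_model("sales_model")
validator.print_validation_report(result)
if result["overall_status"] != "passed":
    print("模型存在问题,请根据报告中的建议修复")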
5.3 维度和度量定义
5.3.1 维度设计原则
1. 维度层次设计
维度应该包含自然的层次结构,支持上钻和下钻分析:
{
"time_dimension": {
"table": "date_dim",
"hierarchy": [
{
"level": "year",
"column": "year",
"name_column": "year_name"
},
{
"level": "quarter",
"column": "quarter",
"name_column": "quarter_name"
},
{
"level": "month",
"column": "month",
"name_column": "month_name"
},
{
"level": "day",
"column": "full_date",
"name_column": "full_date"
}
]
},
"product_dimension": {
"table": "product_dim",
"hierarchy": [
{
"level": "category",
"column": "category",
"name_column": "category"
},
{
"level": "subcategory",
"column": "subcategory",
"name_column": "subcategory"
},
{
"level": "product",
"column": "product_id",
"name_column": "product_name"
}
]
}
}
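上钻(roll-up)与下钻(drill-down)的效果可以用一个小的pandas示例直观感受:同一份明细数据,聚合到不同的层次级别,就得到不同粒度的结果(数据为虚构):
import pandas as pd

# 虚构的明细数据:月粒度销售额
df = pd.DataFrame({
    "year":    [2023, 2023, 2023, 2023],
    "quarter": [1, 1, 2, 2],
    "month":   [1, 2, 4, 5],
    "sales":   [100, 120, 90, 110],
})

# 下钻:按 年->季度->月 的完整层次查看明细
monthly = df.groupby(["year", "quarter", "month"])["sales"].sum()

# 上钻:去掉 month 级别,聚合到季度粒度
quarterly = df.groupby(["year", "quarter"])["sales"].sum()

print(quarterly)
# 输出(大致):
# year  quarter
# 2023  1          220
#       2          200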
2. 维度属性设计
#!/usr/bin/env python3
# dimension_designer.py - 维度设计工具
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
class DimensionType(Enum):
"""维度类型"""
NORMAL = "normal" # 普通维度
DERIVED = "derived" # 派生维度
HIERARCHY = "hierarchy" # 层次维度
SLOWLY_CHANGING = "scd" # 缓慢变化维度
class DataType(Enum):
"""数据类型"""
STRING = "string"
INTEGER = "integer"
DECIMAL = "decimal"
DATE = "date"
TIMESTAMP = "timestamp"
BOOLEAN = "boolean"
@dataclass
class DimensionAttribute:
"""维度属性"""
name: str
column: str
data_type: DataType
description: str
is_key: bool = False
is_name: bool = False
is_hierarchical: bool = False
parent_attribute: Optional[str] = None
format_pattern: Optional[str] = None
default_value: Optional[str] = None
@dataclass
class Dimension:
"""维度定义"""
name: str
table: str
dimension_type: DimensionType
description: str
attributes: List[DimensionAttribute]
primary_key: List[str]
business_key: Optional[str] = None
def get_hierarchy_levels(self) -> List[DimensionAttribute]:
"""获取层次级别"""
return [attr for attr in self.attributes if attr.is_hierarchical]
def get_key_attributes(self) -> List[DimensionAttribute]:
"""获取键属性"""
return [attr for attr in self.attributes if attr.is_key]
def get_name_attributes(self) -> List[DimensionAttribute]:
"""获取名称属性"""
return [attr for attr in self.attributes if attr.is_name]
class DimensionDesigner:
"""维度设计器"""
def __init__(self):
self.dimensions = {}
self.design_rules = {
'max_attributes_per_dimension': 20,
'max_hierarchy_levels': 6,
'require_business_key': True,
'require_name_attribute': True
}
def create_time_dimension(self) -> Dimension:
"""创建时间维度"""
time_attributes = [
DimensionAttribute(
name="date_key",
column="date_key",
data_type=DataType.INTEGER,
description="日期键",
is_key=True
),
DimensionAttribute(
name="full_date",
column="full_date",
data_type=DataType.DATE,
description="完整日期",
is_name=True,
format_pattern="yyyy-MM-dd"
),
DimensionAttribute(
name="year",
column="year",
data_type=DataType.INTEGER,
description="年份",
is_hierarchical=True
),
DimensionAttribute(
name="quarter",
column="quarter",
data_type=DataType.INTEGER,
description="季度",
is_hierarchical=True,
parent_attribute="year"
),
DimensionAttribute(
name="month",
column="month",
data_type=DataType.INTEGER,
description="月份",
is_hierarchical=True,
parent_attribute="quarter"
),
DimensionAttribute(
name="day_of_month",
column="day_of_month",
data_type=DataType.INTEGER,
description="月中的天",
is_hierarchical=True,
parent_attribute="month"
),
DimensionAttribute(
name="day_of_week",
column="day_of_week",
data_type=DataType.INTEGER,
description="星期几"
),
DimensionAttribute(
name="is_weekend",
column="is_weekend",
data_type=DataType.BOOLEAN,
description="是否周末"
),
DimensionAttribute(
name="is_holiday",
column="is_holiday",
data_type=DataType.BOOLEAN,
description="是否节假日"
)
]
return Dimension(
name="time_dimension",
table="date_dim",
dimension_type=DimensionType.HIERARCHY,
description="时间维度",
attributes=time_attributes,
primary_key=["date_key"],
business_key="full_date"
)
def create_product_dimension(self) -> Dimension:
"""创建产品维度"""
product_attributes = [
DimensionAttribute(
name="product_key",
column="product_key",
data_type=DataType.INTEGER,
description="产品键",
is_key=True
),
DimensionAttribute(
name="product_id",
column="product_id",
data_type=DataType.STRING,
description="产品ID"
),
DimensionAttribute(
name="product_name",
column="product_name",
data_type=DataType.STRING,
description="产品名称",
is_name=True
),
DimensionAttribute(
name="category",
column="category",
data_type=DataType.STRING,
description="产品类别",
is_hierarchical=True
),
DimensionAttribute(
name="subcategory",
column="subcategory",
data_type=DataType.STRING,
description="产品子类别",
is_hierarchical=True,
parent_attribute="category"
),
DimensionAttribute(
name="brand",
column="brand",
data_type=DataType.STRING,
description="品牌"
),
DimensionAttribute(
name="unit_price",
column="unit_price",
data_type=DataType.DECIMAL,
description="单价"
),
DimensionAttribute(
name="product_status",
column="product_status",
data_type=DataType.STRING,
description="产品状态",
default_value="Active"
)
]
return Dimension(
name="product_dimension",
table="product_dim",
dimension_type=DimensionType.HIERARCHY,
description="产品维度",
attributes=product_attributes,
primary_key=["product_key"],
business_key="product_id"
)
def create_customer_dimension(self) -> Dimension:
"""创建客户维度"""
customer_attributes = [
DimensionAttribute(
name="customer_key",
column="customer_key",
data_type=DataType.INTEGER,
description="客户键",
is_key=True
),
DimensionAttribute(
name="customer_id",
column="customer_id",
data_type=DataType.STRING,
description="客户ID"
),
DimensionAttribute(
name="customer_name",
column="customer_name",
data_type=DataType.STRING,
description="客户姓名",
is_name=True
),
DimensionAttribute(
name="gender",
column="gender",
data_type=DataType.STRING,
description="性别"
),
DimensionAttribute(
name="age_group",
column="age_group",
data_type=DataType.STRING,
description="年龄组"
),
DimensionAttribute(
name="country",
column="country",
data_type=DataType.STRING,
description="国家",
is_hierarchical=True
),
DimensionAttribute(
name="state",
column="state",
data_type=DataType.STRING,
description="州/省",
is_hierarchical=True,
parent_attribute="country"
),
DimensionAttribute(
name="city",
column="city",
data_type=DataType.STRING,
description="城市",
is_hierarchical=True,
parent_attribute="state"
),
DimensionAttribute(
name="customer_segment",
column="customer_segment",
data_type=DataType.STRING,
description="客户细分"
)
]
return Dimension(
name="customer_dimension",
table="customer_dim",
dimension_type=DimensionType.SLOWLY_CHANGING,
description="客户维度",
attributes=customer_attributes,
primary_key=["customer_key"],
business_key="customer_id"
)
def validate_dimension(self, dimension: Dimension) -> Dict:
"""验证维度设计"""
validation_result = {
'dimension_name': dimension.name,
'is_valid': True,
'warnings': [],
'errors': []
}
# 检查属性数量
if len(dimension.attributes) > self.design_rules['max_attributes_per_dimension']:
validation_result['warnings'].append(
f"属性数量({len(dimension.attributes)})超过建议值"
f"({self.design_rules['max_attributes_per_dimension']})"
)
# 检查层次级别
hierarchy_levels = dimension.get_hierarchy_levels()
if len(hierarchy_levels) > self.design_rules['max_hierarchy_levels']:
validation_result['warnings'].append(
f"层次级别({len(hierarchy_levels)})超过建议值"
f"({self.design_rules['max_hierarchy_levels']})"
)
# 检查是否有业务键
if self.design_rules['require_business_key'] and not dimension.business_key:
validation_result['errors'].append("缺少业务键")
validation_result['is_valid'] = False
# 检查是否有名称属性
name_attributes = dimension.get_name_attributes()
if self.design_rules['require_name_attribute'] and not name_attributes:
validation_result['errors'].append("缺少名称属性")
validation_result['is_valid'] = False
# 检查主键
if not dimension.primary_key:
validation_result['errors'].append("缺少主键定义")
validation_result['is_valid'] = False
return validation_result
def generate_dimension_ddl(self, dimension: Dimension) -> str:
"""生成维度表DDL"""
# 表头:注释行与CREATE TABLE行之间用换行连接
header_lines = [f"-- {dimension.description}", f"CREATE TABLE {dimension.table} ("]
# 生成列定义
column_definitions = []
for attr in dimension.attributes:
column_def = f" {attr.column} {self._get_sql_type(attr.data_type)}"
if attr.is_key:
column_def += " NOT NULL"
if attr.default_value:
column_def += f" DEFAULT '{attr.default_value}'"
column_def += f" COMMENT '{attr.description}'"
column_definitions.append(column_def)
# 添加主键约束
if dimension.primary_key:
pk_columns = ", ".join(dimension.primary_key)
column_definitions.append(f" PRIMARY KEY ({pk_columns})")
# 仅列定义与主键约束之间用逗号分隔;表头与右括号用换行连接
return "\n".join(header_lines) + "\n" + ",\n".join(column_definitions) + "\n);"
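# 以时间维度为例,上述方法生成的DDL大致形如(示意,列有省略):
# -- 时间维度
# CREATE TABLE date_dim (
#  ...,
#  is_weekend BOOLEAN COMMENT '是否周末',
#  is_holiday BOOLEAN COMMENT '是否节假日',
#  PRIMARY KEY (date_key)
# );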
def _get_sql_type(self, data_type: DataType) -> str:
"""获取SQL数据类型"""
type_mapping = {
DataType.STRING: "VARCHAR(255)",
DataType.INTEGER: "INT",
DataType.DECIMAL: "DECIMAL(19,4)",
DataType.DATE: "DATE",
DataType.TIMESTAMP: "TIMESTAMP",
DataType.BOOLEAN: "BOOLEAN"
}
return type_mapping.get(data_type, "VARCHAR(255)")
def export_kylin_model_config(self, dimensions: List[Dimension]) -> Dict:
"""导出Kylin模型配置"""
model_config = {
"dimensions": [],
"lookups": []
}
for dimension in dimensions:
# 添加维度配置
dim_config = {
"table": dimension.table.split('.')[-1], # 去掉schema前缀
"columns": [attr.column for attr in dimension.attributes if not attr.is_key]
}
model_config["dimensions"].append(dim_config)
# 添加查找表配置
lookup_config = {
"table": dimension.table,
"kind": "LOOKUP",
"alias": dimension.table.split('.')[-1],
"join": {
"type": "INNER",
"primary_key": dimension.primary_key,
"foreign_key": dimension.primary_key # 假设外键名称相同
}
}
model_config["lookups"].append(lookup_config)
return model_config
def main():
# 示例使用
designer = DimensionDesigner()
# 创建维度
time_dim = designer.create_time_dimension()
product_dim = designer.create_product_dimension()
customer_dim = designer.create_customer_dimension()
dimensions = [time_dim, product_dim, customer_dim]
# 验证维度
print("=== 维度验证结果 ===")
for dim in dimensions:
result = designer.validate_dimension(dim)
print(f"\n{dim.name}:")
print(f" 有效性: {'✅' if result['is_valid'] else '❌'}")
if result['warnings']:
print(" 警告:")
for warning in result['warnings']:
print(f" ⚠️ {warning}")
if result['errors']:
print(" 错误:")
for error in result['errors']:
print(f" ❌ {error}")
# 生成DDL
print("\n=== 生成的DDL ===")
for dim in dimensions:
print(f"\n{designer.generate_dimension_ddl(dim)}")
# 导出Kylin配置
kylin_config = designer.export_kylin_model_config(dimensions)
print("\n=== Kylin模型配置 ===")
import json
print(json.dumps(kylin_config, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
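上面的客户维度被声明为 DimensionType.SLOWLY_CHANGING,但其属性列表尚未包含缓慢变化维度所需的版本化列。下面给出一个 Type 2 策略的极简示意(effective_date、expiry_date、is_current 等列名为示例命名,并非上文工具的既有约定):
#!/usr/bin/env python3
# scd2_sketch.py - 缓慢变化维度Type 2策略的极简示意(示例命名,非上文工具的既有接口)
from dataclasses import dataclass
from datetime import date
from typing import List, Optional

@dataclass
class CustomerVersion:
    customer_id: str                      # 业务键,不随版本变化
    customer_name: str
    city: str
    effective_date: date                  # 本版本生效日期
    expiry_date: Optional[date] = None    # None 表示尚未失效
    is_current: bool = True

def apply_scd2_change(history: List[CustomerVersion],
                      incoming: CustomerVersion) -> List[CustomerVersion]:
    """Type 2合并:属性变化时关闭当前版本并追加新版本,保留完整历史"""
    for version in history:
        if version.customer_id == incoming.customer_id and version.is_current:
            if (version.customer_name, version.city) == (incoming.customer_name, incoming.city):
                return history  # 属性未变化,忽略本次变更
            version.is_current = False
            version.expiry_date = incoming.effective_date
    history.append(incoming)
    return history

if __name__ == "__main__":
    history = [CustomerVersion("C001", "张三", "北京", date(2023, 1, 1))]
    # 客户迁居上海:旧版本被关闭,新版本生效
    history = apply_scd2_change(history, CustomerVersion("C001", "张三", "上海", date(2024, 6, 1)))
    for v in history:
        print(v)
与之对应,维度表上需要把代理键(customer_key)与业务键(customer_id)分离:同一业务键可以对应多行版本,事实表外键始终指向变更发生时刻的代理键。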
5.3.2 度量设计原则
1. 度量类型分类
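在进入设计工具之前,先用一个极简算例直观说明可加与非可加的差别(数据为虚构,仅作演示):
#!/usr/bin/env python3
# additivity_demo.py - 可加性与非可加性度量的极简演示(虚构数据)

# 明细事实:(门店, 客户, 销售金额)
rows = [
    ("S1", "C1", 100.0),
    ("S1", "C2", 50.0),
    ("S2", "C1", 80.0),
    ("S2", "C3", 70.0),
]

# 可加性度量:按门店分组求和后再相加,与整体求和一致
store_totals = {}
for store, _, amount in rows:
    store_totals[store] = store_totals.get(store, 0.0) + amount
assert sum(store_totals.values()) == sum(r[2] for r in rows)  # 300.0 == 300.0

# 非可加性度量:去重客户数,分组结果之和(2 + 2 = 4)大于整体去重数(3)
store_customers = {}
for store, customer, _ in rows:
    store_customers.setdefault(store, set()).add(customer)
grouped_sum = sum(len(s) for s in store_customers.values())
overall = len({customer for _, customer, _ in rows})
print(f"分组去重数之和: {grouped_sum}, 整体去重数: {overall}")  # 4 vs 3
这正是下文把 unique_customers 归为非可加性度量的原因:去重计数不能对分组结果再求和,只能回到明细重算,或采用近似算法(如 HyperLogLog)预计算。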
#!/usr/bin/env python3
# measure_designer.py - 度量设计工具
from dataclasses import dataclass
from typing import List, Dict, Optional, Union
from enum import Enum
class MeasureType(Enum):
"""度量类型"""
ADDITIVE = "additive" # 可加性度量
SEMI_ADDITIVE = "semi_additive" # 半可加性度量
NON_ADDITIVE = "non_additive" # 非可加性度量
DERIVED = "derived" # 派生度量
class AggregationFunction(Enum):
"""聚合函数"""
SUM = "SUM"
COUNT = "COUNT"
COUNT_DISTINCT = "COUNT_DISTINCT"
AVG = "AVG"
MIN = "MIN"
MAX = "MAX"
PERCENTILE = "PERCENTILE"
TOP_N = "TOP_N"
@dataclass
class MeasureParameter:
"""度量参数"""
type: str # column, constant, expression
value: str
data_type: str = "decimal(19,4)"
@dataclass
class Measure:
"""度量定义"""
name: str
description: str
measure_type: MeasureType
aggregation_function: AggregationFunction
parameter: MeasureParameter
return_type: str
format_pattern: Optional[str] = None
is_visible: bool = True
depends_on: Optional[List[str]] = None # 依赖的其他度量
def to_kylin_config(self) -> Dict:
"""转换为Kylin配置格式"""
return {
"name": self.name,
"function": {
"expression": self.aggregation_function.value,
"parameter": {
"type": self.parameter.type,
"value": self.parameter.value
},
"returntype": self.return_type
}
}
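# 以SUM(sales_fact.sales_amount)为例,to_kylin_config 的返回值形如:
# {
#   "name": "sales_amount",
#   "function": {
#     "expression": "SUM",
#     "parameter": {"type": "column", "value": "sales_fact.sales_amount"},
#     "returntype": "decimal(19,4)"
#   }
# }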
class MeasureDesigner:
"""度量设计器"""
def __init__(self):
self.measures = {}
self.design_rules = {
'max_measures_per_cube': 50,
'prefer_additive_measures': True,
'require_count_measure': True
}
def create_basic_measures(self, fact_table: str) -> List[Measure]:
"""创建基础度量"""
basic_measures = []
# 销售金额 - 可加性度量
sales_amount = Measure(
name="sales_amount",
description="销售金额",
measure_type=MeasureType.ADDITIVE,
aggregation_function=AggregationFunction.SUM,
parameter=MeasureParameter(
type="column",
value=f"{fact_table}.sales_amount",
data_type="decimal(19,4)"
),
return_type="decimal(19,4)",
format_pattern="#,##0.00"
)
basic_measures.append(sales_amount)
# 销售数量 - 可加性度量
quantity = Measure(
name="quantity",
description="销售数量",
measure_type=MeasureType.ADDITIVE,
aggregation_function=AggregationFunction.SUM,
parameter=MeasureParameter(
type="column",
value=f"{fact_table}.quantity",
data_type="bigint"
),
return_type="bigint",
format_pattern="#,##0"
)
basic_measures.append(quantity)
# 利润 - 可加性度量
profit = Measure(
name="profit",
description="利润",
measure_type=MeasureType.ADDITIVE,
aggregation_function=AggregationFunction.SUM,
parameter=MeasureParameter(
type="column",
value=f"{fact_table}.profit",
data_type="decimal(19,4)"
),
return_type="decimal(19,4)",
format_pattern="#,##0.00"
)
basic_measures.append(profit)
# 交易次数 - 计数度量
transaction_count = Measure(
name="transaction_count",
description="交易次数",
measure_type=MeasureType.ADDITIVE,
aggregation_function=AggregationFunction.COUNT,
parameter=MeasureParameter(
type="constant",
value="1",
data_type="bigint"
),
return_type="bigint",
format_pattern="#,##0"
)
basic_measures.append(transaction_count)
# 唯一客户数 - 去重计数度量(跨任意维度都不能对分组结果再求和,属于非可加性度量)
unique_customers = Measure(
name="unique_customers",
description="唯一客户数",
measure_type=MeasureType.NON_ADDITIVE,
aggregation_function=AggregationFunction.COUNT_DISTINCT,
parameter=MeasureParameter(
type="column",
value=f"{fact_table}.customer_key",
data_type="int"
),
return_type="bigint",
format_pattern="#,##0"
)
basic_measures.append(unique_customers)
return basic_measures
def create_derived_measures(self, basic_measures: List[Measure]) -> List[Measure]:
"""创建派生度量"""
derived_measures = []
# 平均销售金额 - 派生度量
avg_sales_amount = Measure(
name="avg_sales_amount",
description="平均销售金额",
measure_type=MeasureType.DERIVED,
aggregation_function=AggregationFunction.AVG,
parameter=MeasureParameter(
type="expression",
value="sales_amount / transaction_count",
data_type="decimal(19,4)"
),
return_type="decimal(19,4)",
format_pattern="#,##0.00",
depends_on=["sales_amount", "transaction_count"]
)
derived_measures.append(avg_sales_amount)
# 利润率 - 派生度量
profit_margin = Measure(
name="profit_margin",
description="利润率",
measure_type=MeasureType.DERIVED,
aggregation_function=AggregationFunction.AVG,
parameter=MeasureParameter(
type="expression",
value="profit / sales_amount",
data_type="decimal(10,4)"
),
return_type="decimal(10,4)",
format_pattern="0.00%",
depends_on=["profit", "sales_amount"]
)
derived_measures.append(profit_margin)
# 平均客单价 - 派生度量
avg_order_value = Measure(
name="avg_order_value",
description="平均客单价",
measure_type=MeasureType.DERIVED,
aggregation_function=AggregationFunction.AVG,
parameter=MeasureParameter(
type="expression",
value="sales_amount / unique_customers",
data_type="decimal(19,4)"
),
return_type="decimal(19,4)",
format_pattern="#,##0.00",
depends_on=["sales_amount", "unique_customers"]
)
derived_measures.append(avg_order_value)
return derived_measures
def validate_measures(self, measures: List[Measure]) -> Dict:
"""验证度量设计"""
validation_result = {
'is_valid': True,
'warnings': [],
'errors': [],
'statistics': {
'total_measures': len(measures),
'additive_measures': 0,
'semi_additive_measures': 0,
'non_additive_measures': 0,
'derived_measures': 0
}
}
# 统计度量类型
for measure in measures:
if measure.measure_type == MeasureType.ADDITIVE:
validation_result['statistics']['additive_measures'] += 1
elif measure.measure_type == MeasureType.SEMI_ADDITIVE:
validation_result['statistics']['semi_additive_measures'] += 1
elif measure.measure_type == MeasureType.NON_ADDITIVE:
validation_result['statistics']['non_additive_measures'] += 1
elif measure.measure_type == MeasureType.DERIVED:
validation_result['statistics']['derived_measures'] += 1
# 检查度量数量
if len(measures) > self.design_rules['max_measures_per_cube']:
validation_result['warnings'].append(
f"度量数量({len(measures)})超过建议值"
f"({self.design_rules['max_measures_per_cube']})"
)
# 检查是否有计数度量
has_count_measure = any(
measure.aggregation_function == AggregationFunction.COUNT
for measure in measures
)
if self.design_rules['require_count_measure'] and not has_count_measure:
validation_result['errors'].append("缺少计数度量")
validation_result['is_valid'] = False
# 检查派生度量的依赖关系
measure_names = {measure.name for measure in measures}
for measure in measures:
if measure.depends_on:
missing_dependencies = set(measure.depends_on) - measure_names
if missing_dependencies:
validation_result['errors'].append(
f"度量 {measure.name} 依赖的度量不存在: {missing_dependencies}"
)
validation_result['is_valid'] = False
# 检查可加性度量比例(度量列表为空时跳过,避免除零)
if self.design_rules['prefer_additive_measures'] and measures:
additive_ratio = validation_result['statistics']['additive_measures'] / len(measures)
if additive_ratio < 0.5:
validation_result['warnings'].append(
f"可加性度量比例较低({additive_ratio:.1%}),建议增加可加性度量"
)
return validation_result
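# 对本文示例的8个度量(5个基础 + 3个派生),统计结果为:
# additive=4, non_additive=1, derived=3;可加比例 4/8 = 50%,
# 恰好不低于50%的阈值,因此不会触发比例过低的警告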
def generate_measure_documentation(self, measures: List[Measure]) -> str:
"""生成度量文档"""
doc_lines = []
doc_lines.append("# 度量定义文档\n")
# 按类型分组
measures_by_type = {}
for measure in measures:
measure_type = measure.measure_type.value
if measure_type not in measures_by_type:
measures_by_type[measure_type] = []
measures_by_type[measure_type].append(measure)
# 生成各类型的文档
type_names = {
'additive': '可加性度量',
'semi_additive': '半可加性度量',
'non_additive': '非可加性度量',
'derived': '派生度量'
}
for measure_type, type_measures in measures_by_type.items():
doc_lines.append(f"## {type_names.get(measure_type, measure_type)}\n")
for measure in type_measures:
doc_lines.append(f"### {measure.name}\n")
doc_lines.append(f"**描述**: {measure.description}\n")
doc_lines.append(f"**聚合函数**: {measure.aggregation_function.value}\n")
doc_lines.append(f"**数据源**: {measure.parameter.value}\n")
doc_lines.append(f"**返回类型**: {measure.return_type}\n")
if measure.format_pattern:
doc_lines.append(f"**格式**: {measure.format_pattern}\n")
if measure.depends_on:
doc_lines.append(f"**依赖度量**: {', '.join(measure.depends_on)}\n")
doc_lines.append("\n")
return "\n".join(doc_lines)
def export_kylin_measures_config(self, measures: List[Measure]) -> List[Dict]:
"""导出Kylin度量配置"""
# 派生度量依赖基础度量、由查询层计算,不写入Cube的预计算配置
return [
measure.to_kylin_config()
for measure in measures
if measure.measure_type != MeasureType.DERIVED
]
def main():
# 示例使用
designer = MeasureDesigner()
# 创建基础度量
basic_measures = designer.create_basic_measures("sales_fact")
# 创建派生度量
derived_measures = designer.create_derived_measures(basic_measures)
# 合并所有度量
all_measures = basic_measures + derived_measures
# 验证度量
validation_result = designer.validate_measures(all_measures)
print("=== 度量验证结果 ===")
print(f"有效性: {'✅' if validation_result['is_valid'] else '❌'}")
print(f"总度量数: {validation_result['statistics']['total_measures']}")
print(f"可加性度量: {validation_result['statistics']['additive_measures']}")
print(f"半可加性度量: {validation_result['statistics']['semi_additive_measures']}")
print(f"非可加性度量: {validation_result['statistics']['non_additive_measures']}")
print(f"派生度量: {validation_result['statistics']['derived_measures']}")
if validation_result['warnings']:
print("\n警告:")
for warning in validation_result['warnings']:
print(f" ⚠️ {warning}")
if validation_result['errors']:
print("\n错误:")
for error in validation_result['errors']:
print(f" ❌ {error}")
# 生成文档
documentation = designer.generate_measure_documentation(all_measures)
print("\n=== 度量文档 ===")
print(documentation)
# 导出Kylin配置
kylin_measures = designer.export_kylin_measures_config(all_measures)
print("=== Kylin度量配置 ===")
import json
print(json.dumps(kylin_measures, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
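关于上面的派生度量(avg_sales_amount、profit_margin 等)还有一个常见陷阱:比率类派生度量必须用"聚合后的和之比"计算,而不能对明细行的比率再求平均,否则得到的是"平均数的平均数"。下面用虚构数据演示两者的差异:
#!/usr/bin/env python3
# ratio_pitfall_demo.py - 派生比率度量的计算陷阱演示(虚构数据)

# 明细事实:(销售金额, 利润)
rows = [(100.0, 10.0), (1000.0, 300.0)]

# 正确:先聚合再求比值(和的比值)
total_sales = sum(s for s, _ in rows)
total_profit = sum(p for _, p in rows)
ratio_of_sums = total_profit / total_sales               # 310 / 1100 ≈ 0.2818

# 错误:对每行比率求平均(平均数的平均数)
avg_of_ratios = sum(p / s for s, p in rows) / len(rows)  # (0.10 + 0.30) / 2 = 0.20

print(f"和的比值: {ratio_of_sums:.4f}")    # 0.2818
print(f"比率的平均: {avg_of_ratios:.4f}")  # 0.2000
这也是 export_kylin_measures_config 只导出基础度量的原因:派生度量留给查询层基于已聚合的基础度量按"和的比值"计算。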