5.1 维度建模理论基础

5.1.1 维度建模概述

维度建模是数据仓库设计的核心方法论,由Ralph Kimball提出。它将复杂的业务数据组织成易于理解和查询的结构。

核心概念:

  1. 事实表(Fact Table)

    • 存储业务过程的度量数据
    • 包含外键指向维度表
    • 通常包含数值型度量字段
    • 数据量大,新增(插入)频繁
  2. 维度表(Dimension Table)

    • 存储描述性信息
    • 提供查询的上下文
    • 包含层次结构信息
    • 数据量相对较小,更新较少
  3. 度量(Measure)

    • 可聚合的数值型数据
    • 如销售额、数量、利润等
    • 支持SUM、COUNT、AVG等聚合函数
  4. 维度(Dimension)

    • 查询的分析角度
    • 如时间、地区、产品、客户等
    • 提供数据的分组和过滤条件

5.1.2 星型模式(Star Schema)

星型模式是最简单和最常用的维度建模方式:

特点: - 中心有一个事实表 - 周围环绕多个维度表 - 维度表直接连接到事实表 - 结构简单,查询性能好

示例:销售数据星型模式

-- Fact table: sales facts.
-- One row per sale; the *_key columns are foreign keys into the surrounding
-- dimension tables, and the numeric columns are the additive measures.
-- Fix: declare sale_id as PRIMARY KEY for consistency with the dimension
-- tables, which all declare their surrogate keys.
CREATE TABLE sales_fact (
    sale_id BIGINT PRIMARY KEY,     -- surrogate key for the fact row
    date_key INT,                   -- FK -> date_dim
    product_key INT,                -- FK -> product_dim
    customer_key INT,               -- FK -> customer_dim
    store_key INT,                  -- FK -> store_dim
    sales_amount DECIMAL(10,2),     -- measure: gross sales amount
    quantity INT,                   -- measure: units sold
    discount_amount DECIMAL(10,2),  -- measure: discount applied
    profit DECIMAL(10,2)            -- measure: profit
);

-- Dimension table: date dimension.
-- One row per calendar day, keyed by a surrogate integer referenced by
-- sales_fact.date_key.
CREATE TABLE date_dim (
    date_key INT PRIMARY KEY,   -- surrogate key
    full_date DATE,             -- the actual calendar date
    year INT,
    quarter INT,
    month INT,
    week INT,
    day_of_week INT,
    day_name VARCHAR(10),
    month_name VARCHAR(10),
    is_weekend BOOLEAN,
    is_holiday BOOLEAN
);

-- Dimension table: product dimension (star schema).
-- The category/subcategory/brand hierarchy is denormalized into this table.
CREATE TABLE product_dim (
    product_key INT PRIMARY KEY,  -- surrogate key referenced by sales_fact
    product_id VARCHAR(50),       -- natural/business product identifier
    product_name VARCHAR(200),
    category VARCHAR(100),
    subcategory VARCHAR(100),
    brand VARCHAR(100),
    unit_price DECIMAL(10,2),
    product_status VARCHAR(20)
);

-- Dimension table: customer dimension.
-- Descriptive customer attributes, including geography and segment.
CREATE TABLE customer_dim (
    customer_key INT PRIMARY KEY,  -- surrogate key referenced by sales_fact
    customer_id VARCHAR(50),       -- natural/business customer identifier
    customer_name VARCHAR(200),
    gender VARCHAR(10),
    age_group VARCHAR(20),
    city VARCHAR(100),
    state VARCHAR(100),
    country VARCHAR(100),
    customer_segment VARCHAR(50)
);

-- Dimension table: store dimension.
-- Store attributes including its location hierarchy (city/state/region).
CREATE TABLE store_dim (
    store_key INT PRIMARY KEY,  -- surrogate key referenced by sales_fact
    store_id VARCHAR(50),       -- natural/business store identifier
    store_name VARCHAR(200),
    store_type VARCHAR(50),
    city VARCHAR(100),
    state VARCHAR(100),
    region VARCHAR(100),
    manager VARCHAR(100),
    open_date DATE
);

5.1.3 雪花模式(Snowflake Schema)

雪花模式是星型模式的规范化版本:

特点: - 维度表进一步规范化 - 减少数据冗余 - 增加了表的数量和复杂性 - 查询性能相对较低

示例:产品维度雪花化

-- Product dimension table (snowflaked).
-- The denormalized hierarchy columns of the star-schema version are replaced
-- by foreign keys into subcategory_dim and brand_dim.
CREATE TABLE product_dim (
    product_key INT PRIMARY KEY,  -- surrogate key referenced by sales_fact
    product_id VARCHAR(50),
    product_name VARCHAR(200),
    subcategory_key INT,          -- FK -> subcategory_dim
    brand_key INT,                -- FK -> brand_dim
    unit_price DECIMAL(10,2),
    product_status VARCHAR(20)
);

-- Subcategory level of the snowflaked product hierarchy.
CREATE TABLE subcategory_dim (
    subcategory_key INT PRIMARY KEY,
    subcategory_name VARCHAR(100),
    category_key INT   -- FK -> category_dim
);

-- Category level of the snowflaked product hierarchy.
CREATE TABLE category_dim (
    category_key INT PRIMARY KEY,
    category_name VARCHAR(100),
    department_key INT   -- FK -> department_dim
);

-- Department level: the top of the snowflaked product hierarchy.
CREATE TABLE department_dim (
    department_key INT PRIMARY KEY,
    department_name VARCHAR(100)
);

-- Brand dimension, split out of the product table in the snowflake design.
CREATE TABLE brand_dim (
    brand_key INT PRIMARY KEY,
    brand_name VARCHAR(100),
    brand_country VARCHAR(100)
);

5.1.4 维度建模最佳实践

1. 选择合适的粒度

#!/usr/bin/env python3
# granularity_analyzer.py - 粒度分析工具

import pandas as pd
from datetime import datetime, timedelta
import numpy as np

class GranularityAnalyzer:
    """Estimate how a fact table's grain (time x geography x product) affects
    row count, storage cost and query performance, and recommend a grain.

    Bug fix: recommendations previously sorted the qualitative performance
    labels alphabetically, which ranked 'fair' ahead of 'good' ('f' < 'g')
    and could recommend a worse grain; labels are now ranked explicitly.
    """

    # Ordering of the labels produced by estimate_query_performance,
    # best (lowest rank) first.
    _PERFORMANCE_RANK = {'excellent': 0, 'good': 1, 'fair': 2, 'poor': 3}

    def __init__(self):
        # Hierarchy levels per dimension, from coarsest to finest.
        self.granularity_levels = {
            'time': ['year', 'quarter', 'month', 'week', 'day', 'hour', 'minute'],
            'geography': ['country', 'region', 'state', 'city', 'store'],
            'product': ['department', 'category', 'subcategory', 'product', 'sku'],
            'customer': ['segment', 'group', 'individual']
        }

    def analyze_data_volume(self, fact_table_size, dimension_counts):
        """Estimate size/performance/cost for several candidate grains.

        Args:
            fact_table_size: current fact table size in MB (used for ratios).
            dimension_counts: {'geography': {level: count}, 'product': {...}}.

        Returns:
            dict with 'current_size', per-combination 'granularity_impact',
            and human-readable 'recommendations'.
        """
        analysis_result = {
            'current_size': fact_table_size,
            'granularity_impact': {},
            'recommendations': []
        }

        # Candidate (time, geography, product) grain combinations to compare.
        base_combinations = [
            ('day', 'store', 'product'),
            ('day', 'city', 'product'),
            ('week', 'store', 'product'),
            ('month', 'store', 'category'),
            ('quarter', 'region', 'category')
        ]

        for time_grain, geo_grain, prod_grain in base_combinations:
            estimated_size = self.estimate_fact_table_size(
                dimension_counts, time_grain, geo_grain, prod_grain
            )

            combination_key = f"{time_grain}_{geo_grain}_{prod_grain}"
            analysis_result['granularity_impact'][combination_key] = {
                'estimated_size': estimated_size,
                # Guard against division by zero for an empty current table.
                'size_ratio': estimated_size / fact_table_size if fact_table_size > 0 else 0,
                'query_performance': self.estimate_query_performance(estimated_size),
                'storage_cost': self.estimate_storage_cost(estimated_size)
            }

        analysis_result['recommendations'] = self.generate_granularity_recommendations(
            analysis_result['granularity_impact']
        )

        return analysis_result

    def estimate_fact_table_size(self, dimension_counts, time_grain, geo_grain, prod_grain):
        """Estimate the fact table size in MB at the given grain."""
        # Rows contributed per year by each time grain.
        time_factor = {
            'year': 1, 'quarter': 4, 'month': 12, 'week': 52,
            'day': 365, 'hour': 8760, 'minute': 525600
        }

        geo_factor = dimension_counts.get('geography', {}).get(geo_grain, 1)
        prod_factor = dimension_counts.get('product', {}).get(prod_grain, 1)
        time_records = time_factor.get(time_grain, 365)

        # Not every (time, geo, product) combination actually occurs;
        # assume ~70% of combinations carry data.
        sparsity_factor = 0.7
        estimated_records = time_records * geo_factor * prod_factor * sparsity_factor

        # Assume ~200 bytes per record; convert bytes to MB.
        estimated_size_mb = estimated_records * 200 / (1024 * 1024)

        return estimated_size_mb

    def estimate_query_performance(self, table_size_mb):
        """Map a table size (MB) to a qualitative performance label."""
        if table_size_mb < 100:
            return 'excellent'
        elif table_size_mb < 1000:
            return 'good'
        elif table_size_mb < 10000:
            return 'fair'
        else:
            return 'poor'

    def estimate_storage_cost(self, table_size_mb):
        """Estimate monthly storage cost (assumes $0.10 per GB per month)."""
        cost_per_gb_month = 0.1
        size_gb = table_size_mb / 1024
        monthly_cost = size_gb * cost_per_gb_month

        return {
            'size_gb': round(size_gb, 2),
            'monthly_cost_usd': round(monthly_cost, 2)
        }

    def generate_granularity_recommendations(self, granularity_impact):
        """Produce recommendations: best performance first, then smallest size."""
        recommendations = []
        if not granularity_impact:
            # Nothing analysed yet; avoid IndexError on sorted_options[0].
            return recommendations

        # Rank the labels explicitly (see _PERFORMANCE_RANK); a plain string
        # sort would incorrectly order 'fair' before 'good'.
        unknown_rank = len(self._PERFORMANCE_RANK)
        sorted_options = sorted(
            granularity_impact.items(),
            key=lambda x: (
                self._PERFORMANCE_RANK.get(x[1]['query_performance'], unknown_rank),
                x[1]['estimated_size']
            )
        )

        best_option = sorted_options[0]
        recommendations.append(
            f"推荐粒度组合: {best_option[0]} (查询性能: {best_option[1]['query_performance']}, "
            f"存储大小: {best_option[1]['storage_cost']['size_gb']}GB)"
        )

        # Warn about slow or expensive combinations.
        for option_name, metrics in granularity_impact.items():
            if metrics['query_performance'] == 'poor':
                recommendations.append(
                    f"警告: {option_name} 粒度组合可能导致查询性能问题,建议考虑聚合"
                )

            if metrics['storage_cost']['monthly_cost_usd'] > 100:
                recommendations.append(
                    f"注意: {option_name} 粒度组合存储成本较高 "
                    f"(${metrics['storage_cost']['monthly_cost_usd']}/月)"
                )

        return recommendations

    def print_analysis_report(self, analysis_result):
        """Pretty-print the result of analyze_data_volume()."""
        print("\n=== 粒度分析报告 ===")
        print(f"当前事实表大小: {analysis_result['current_size']:.2f} MB")

        print("\n📊 不同粒度组合分析:")
        for option_name, metrics in analysis_result['granularity_impact'].items():
            print(f"\n{option_name}:")
            print(f"  估算大小: {metrics['estimated_size']:.2f} MB")
            print(f"  大小比例: {metrics['size_ratio']:.2f}x")
            print(f"  查询性能: {metrics['query_performance']}")
            print(f"  存储成本: ${metrics['storage_cost']['monthly_cost_usd']}/月 "
                  f"({metrics['storage_cost']['size_gb']}GB)")

        print("\n💡 建议:")
        for i, recommendation in enumerate(analysis_result['recommendations'], 1):
            print(f"  {i}. {recommendation}")

def main():
    """Demo: run the granularity analysis on a sample set of dimension counts."""
    dim_counts = {
        'geography': {'country': 5, 'region': 20, 'state': 50,
                      'city': 500, 'store': 2000},
        'product': {'department': 10, 'category': 50, 'subcategory': 200,
                    'product': 5000, 'sku': 50000},
    }

    fact_size_mb = 1000  # current fact table size, in MB

    analyzer = GranularityAnalyzer()
    report = analyzer.analyze_data_volume(fact_size_mb, dim_counts)
    analyzer.print_analysis_report(report)

if __name__ == "__main__":
    main()

5.4 模型管理与优化

5.4.1 模型生命周期管理

1. 模型版本控制

#!/usr/bin/env python3
# model_lifecycle_manager.py - 模型生命周期管理工具

import json
import os
import shutil
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum

class ModelStatus(Enum):
    """Model lifecycle states; values are persisted in the version JSON files."""
    DRAFT = "draft"              # initial draft
    DEVELOPMENT = "development"  # under development
    TESTING = "testing"          # being tested
    PRODUCTION = "production"    # serving in production
    DEPRECATED = "deprecated"    # superseded; kept for history
    ARCHIVED = "archived"        # archived; terminal state

@dataclass
class ModelVersion:
    """Metadata describing one version of a model."""
    version: str                       # version label, e.g. "1.0"
    status: ModelStatus                # current lifecycle state
    created_by: str                    # author of this version
    created_at: datetime               # creation timestamp
    description: str                   # human-readable summary
    changes: List[str]                 # change-log entries
    config_file: str                   # path of the saved model config JSON
    backup_file: Optional[str] = None  # backup path, set on promotion to production
    
class ModelLifecycleManager:
    """Manage model versions as JSON files under a workspace directory.

    On-disk layout:
        models/   - model configuration snapshots ({name}_v{version}.json)
        versions/ - version metadata              ({name}_v{version}.json)
        backups/  - config backups taken when a version reaches production

    Fix: versions are now ordered with a natural numeric sort key so that
    "1.10" sorts after "1.9" (the previous plain string sort got this wrong).
    """

    def __init__(self, workspace_dir: str):
        self.workspace_dir = workspace_dir
        self.models_dir = os.path.join(workspace_dir, "models")
        self.versions_dir = os.path.join(workspace_dir, "versions")
        self.backups_dir = os.path.join(workspace_dir, "backups")

        # Make sure the workspace layout exists before any operation.
        for directory in [self.models_dir, self.versions_dir, self.backups_dir]:
            os.makedirs(directory, exist_ok=True)

    @staticmethod
    def _version_sort_key(version: str):
        """Natural sort key for dotted version strings.

        Numeric components compare as integers so "1.10" > "1.9"; any
        non-numeric component sorts after numeric ones, by string value.
        """
        key = []
        for part in version.split('.'):
            if part.isdigit():
                key.append((0, int(part), ''))
            else:
                key.append((1, 0, part))
        return key

    def create_model_version(self, model_name: str, config: Dict,
                           version: str, description: str,
                           created_by: str, changes: List[str]) -> ModelVersion:
        """Persist `config` and register a new DRAFT version of the model."""
        # Snapshot the configuration to its own file under models/.
        config_filename = f"{model_name}_v{version}.json"
        config_file = os.path.join(self.models_dir, config_filename)

        with open(config_file, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2, ensure_ascii=False, default=str)

        # New versions always start in DRAFT.
        model_version = ModelVersion(
            version=version,
            status=ModelStatus.DRAFT,
            created_by=created_by,
            created_at=datetime.now(),
            description=description,
            changes=changes,
            config_file=config_file
        )

        self._save_version_info(model_name, model_version)

        print(f"✅ 模型版本创建成功: {model_name} v{version}")
        return model_version

    def update_model_status(self, model_name: str, version: str,
                          new_status: ModelStatus, updated_by: str) -> bool:
        """Move a version to `new_status`; returns False on invalid input.

        Only transitions allowed by _validate_status_transition succeed.
        Promoting to PRODUCTION also snapshots the config into backups/.
        """
        version_info = self._load_version_info(model_name, version)
        if not version_info:
            print(f"❌ 模型版本不存在: {model_name} v{version}")
            return False

        if not self._validate_status_transition(version_info.status, new_status):
            print(f"❌ 无效的状态转换: {version_info.status.value} -> {new_status.value}")
            return False

        version_info.status = new_status

        # Production deployments get a config backup for rollback/recovery.
        if new_status == ModelStatus.PRODUCTION:
            backup_file = self._create_backup(model_name, version)
            version_info.backup_file = backup_file

        self._save_version_info(model_name, version_info)

        print(f"✅ 模型状态更新成功: {model_name} v{version} -> {new_status.value}")
        return True

    def list_model_versions(self, model_name: str) -> List[ModelVersion]:
        """Load every stored version of the model, newest version first."""
        versions = []
        version_pattern = f"{model_name}_v"

        for filename in os.listdir(self.versions_dir):
            if filename.startswith(version_pattern) and filename.endswith('.json'):
                version_file = os.path.join(self.versions_dir, filename)
                with open(version_file, 'r', encoding='utf-8') as f:
                    version_data = json.load(f)
                    versions.append(ModelVersion(
                        version=version_data['version'],
                        status=ModelStatus(version_data['status']),
                        created_by=version_data['created_by'],
                        created_at=datetime.fromisoformat(version_data['created_at']),
                        description=version_data['description'],
                        changes=version_data['changes'],
                        config_file=version_data['config_file'],
                        backup_file=version_data.get('backup_file')
                    ))

        # Natural (numeric-aware) descending sort: "1.10" sorts above "1.9".
        versions.sort(key=lambda x: self._version_sort_key(x.version), reverse=True)
        return versions

    def get_production_version(self, model_name: str) -> Optional[ModelVersion]:
        """Return the version currently in PRODUCTION, or None."""
        for version in self.list_model_versions(model_name):
            if version.status == ModelStatus.PRODUCTION:
                return version
        return None

    def rollback_model(self, model_name: str, target_version: str,
                      rollback_by: str) -> bool:
        """Demote the current production version and promote `target_version`.

        The target must currently be in PRODUCTION or TESTING state.
        """
        current_prod = self.get_production_version(model_name)
        if not current_prod:
            print(f"❌ 没有找到生产环境版本: {model_name}")
            return False

        target_version_info = self._load_version_info(model_name, target_version)
        if not target_version_info:
            print(f"❌ 目标版本不存在: {model_name} v{target_version}")
            return False

        if target_version_info.status not in [ModelStatus.PRODUCTION, ModelStatus.TESTING]:
            print(f"❌ 目标版本状态不允许回滚: {target_version_info.status.value}")
            return False

        try:
            # Retire the current production version first...
            self.update_model_status(model_name, current_prod.version,
                                   ModelStatus.DEPRECATED, rollback_by)

            # ...then promote the target version.
            self.update_model_status(model_name, target_version,
                                   ModelStatus.PRODUCTION, rollback_by)

            print(f"✅ 模型回滚成功: {model_name} {current_prod.version} -> {target_version}")
            return True

        except Exception as e:
            print(f"❌ 模型回滚失败: {str(e)}")
            return False

    def archive_old_versions(self, model_name: str, keep_count: int = 5) -> int:
        """Archive versions beyond the newest `keep_count`, skipping any in
        PRODUCTION or TESTING; returns the number archived."""
        versions = self.list_model_versions(model_name)

        # Versions are newest-first, so index >= keep_count means "old".
        archivable_versions = []
        for i, version in enumerate(versions):
            if (version.status not in [ModelStatus.PRODUCTION, ModelStatus.TESTING] and
                i >= keep_count):
                archivable_versions.append(version)

        archived_count = 0
        for version in archivable_versions:
            if self.update_model_status(model_name, version.version,
                                      ModelStatus.ARCHIVED, "system"):
                archived_count += 1

        print(f"✅ 已归档 {archived_count} 个旧版本")
        return archived_count

    def _save_version_info(self, model_name: str, version_info: ModelVersion):
        """Serialize a ModelVersion to versions/{name}_v{version}.json."""
        version_filename = f"{model_name}_v{version_info.version}.json"
        version_file = os.path.join(self.versions_dir, version_filename)

        # Enum and datetime are not JSON-serializable; store value/isoformat.
        version_data = asdict(version_info)
        version_data['status'] = version_info.status.value
        version_data['created_at'] = version_info.created_at.isoformat()

        with open(version_file, 'w', encoding='utf-8') as f:
            json.dump(version_data, f, indent=2, ensure_ascii=False)

    def _load_version_info(self, model_name: str, version: str) -> Optional[ModelVersion]:
        """Deserialize one version's metadata; None if its file is missing."""
        version_filename = f"{model_name}_v{version}.json"
        version_file = os.path.join(self.versions_dir, version_filename)

        if not os.path.exists(version_file):
            return None

        with open(version_file, 'r', encoding='utf-8') as f:
            version_data = json.load(f)
            return ModelVersion(
                version=version_data['version'],
                status=ModelStatus(version_data['status']),
                created_by=version_data['created_by'],
                created_at=datetime.fromisoformat(version_data['created_at']),
                description=version_data['description'],
                changes=version_data['changes'],
                config_file=version_data['config_file'],
                backup_file=version_data.get('backup_file')
            )

    def _validate_status_transition(self, current_status: ModelStatus,
                                  new_status: ModelStatus) -> bool:
        """Return True when `current_status` may legally move to `new_status`."""
        valid_transitions = {
            ModelStatus.DRAFT: [ModelStatus.DEVELOPMENT, ModelStatus.ARCHIVED],
            ModelStatus.DEVELOPMENT: [ModelStatus.TESTING, ModelStatus.DRAFT, ModelStatus.ARCHIVED],
            ModelStatus.TESTING: [ModelStatus.PRODUCTION, ModelStatus.DEVELOPMENT, ModelStatus.ARCHIVED],
            ModelStatus.PRODUCTION: [ModelStatus.DEPRECATED],
            ModelStatus.DEPRECATED: [ModelStatus.ARCHIVED],
            ModelStatus.ARCHIVED: []  # terminal state: no way out
        }

        return new_status in valid_transitions.get(current_status, [])

    def _create_backup(self, model_name: str, version: str) -> str:
        """Copy the version's config file into backups/; returns the backup path."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_filename = f"{model_name}_v{version}_{timestamp}.backup"
        backup_file = os.path.join(self.backups_dir, backup_filename)

        # Best-effort copy: missing config just yields an empty backup path target.
        version_info = self._load_version_info(model_name, version)
        if version_info and os.path.exists(version_info.config_file):
            shutil.copy2(version_info.config_file, backup_file)

        return backup_file

    def generate_lifecycle_report(self, model_name: str) -> Dict:
        """Build a summary dict: status counts, latest/production versions,
        and the full version history."""
        versions = self.list_model_versions(model_name)

        if not versions:
            return {'error': f'模型不存在: {model_name}'}

        # Count versions per lifecycle state (all states listed, even at 0).
        status_counts = {status.value: 0 for status in ModelStatus}
        for version in versions:
            status_counts[version.status.value] += 1

        latest_version = versions[0] if versions else None
        production_version = self.get_production_version(model_name)

        report = {
            'model_name': model_name,
            'total_versions': len(versions),
            'status_distribution': status_counts,
            'latest_version': {
                'version': latest_version.version,
                'status': latest_version.status.value,
                'created_at': latest_version.created_at.isoformat(),
                'created_by': latest_version.created_by
            } if latest_version else None,
            'production_version': {
                'version': production_version.version,
                'created_at': production_version.created_at.isoformat(),
                'created_by': production_version.created_by
            } if production_version else None,
            'version_history': [
                {
                    'version': v.version,
                    'status': v.status.value,
                    'created_at': v.created_at.isoformat(),
                    'created_by': v.created_by,
                    'description': v.description
                } for v in versions
            ]
        }

        return report

    def print_lifecycle_report(self, model_name: str):
        """Pretty-print generate_lifecycle_report() for humans."""
        report = self.generate_lifecycle_report(model_name)

        if 'error' in report:
            print(f"❌ {report['error']}")
            return

        print(f"\n=== {model_name} 生命周期报告 ===")
        print(f"总版本数: {report['total_versions']}")

        print("\n📊 状态分布:")
        for status, count in report['status_distribution'].items():
            if count > 0:
                print(f"  {status}: {count}")

        if report['latest_version']:
            latest = report['latest_version']
            print(f"\n🔄 最新版本: v{latest['version']} ({latest['status']})")
            print(f"  创建时间: {latest['created_at']}")
            print(f"  创建者: {latest['created_by']}")

        if report['production_version']:
            prod = report['production_version']
            print(f"\n🚀 生产版本: v{prod['version']}")
            print(f"  创建时间: {prod['created_at']}")
            print(f"  创建者: {prod['created_by']}")
        else:
            print("\n⚠️ 没有生产环境版本")

        print("\n📋 版本历史:")
        for version in report['version_history'][:10]:  # only the 10 newest
            status_icon = {
                'draft': '📝',
                'development': '🔧',
                'testing': '🧪',
                'production': '🚀',
                'deprecated': '⚠️',
                'archived': '📦'
            }.get(version['status'], '❓')

            print(f"  {status_icon} v{version['version']} ({version['status']}) - {version['description']}")
            print(f"    {version['created_at']} by {version['created_by']}")

def main():
    """Walk a sample model through its lifecycle: create, promote, report."""
    manager = ModelLifecycleManager("/tmp/kylin_models")

    base_config = {
        "name": "sales_model",
        "fact_table": "sales_fact",
        "dimensions": ["time", "product", "customer"],
        "measures": ["sales_amount", "quantity", "profit"]
    }

    # Version 1.0: the initial model, moved into development.
    manager.create_model_version(
        model_name="sales_model",
        config=base_config,
        version="1.0",
        description="初始版本",
        created_by="data_engineer",
        changes=["创建基础销售模型"]
    )
    manager.update_model_status("sales_model", "1.0", ModelStatus.DEVELOPMENT, "data_engineer")

    # Version 1.1: adds the discount measure, then is tested and promoted.
    base_config["measures"].append("discount_amount")
    manager.create_model_version(
        model_name="sales_model",
        config=base_config,
        version="1.1",
        description="添加折扣金额度量",
        created_by="data_engineer",
        changes=["添加discount_amount度量"]
    )
    for next_status in (ModelStatus.TESTING, ModelStatus.PRODUCTION):
        manager.update_model_status("sales_model", "1.1", next_status, "data_engineer")

    manager.print_lifecycle_report("sales_model")

if __name__ == "__main__":
    main()

2. 模型性能监控

#!/usr/bin/env python3
# model_performance_monitor.py - 模型性能监控工具

import time
import psutil
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
import json

@dataclass
class PerformanceMetric:
    """One sampled performance measurement."""
    timestamp: datetime                # when the sample was taken
    model_name: str                    # owning model ("system" for host metrics)
    metric_name: str                   # e.g. "cpu_usage", "query_response_time"
    metric_value: float                # measured value, expressed in `unit`
    unit: str                          # unit of metric_value ("percent", "GB", "seconds")
    threshold: Optional[float] = None  # alert threshold, if one applies
    
class ModelPerformanceMonitor:
    """模型性能监控器"""
    
    def __init__(self, kylin_host: str, username: str, password: str):
        """Set up an authenticated HTTP session against the Kylin REST API."""
        self.kylin_host = kylin_host
        self.username = username
        self.password = password
        self.session = requests.Session()
        self.session.auth = (username, password)
        self.metrics_history = []  # accumulates everything collected by monitor_model()
        
        # Alert thresholds keyed by metric name (consumed by check_thresholds).
        self.thresholds = {
            'query_response_time': 5.0,      # seconds
            'cube_build_time': 3600.0,       # seconds
            'storage_size': 10.0,            # GB
            'memory_usage': 80.0,            # percent
            'cpu_usage': 80.0,               # percent
            'query_success_rate': 95.0       # percent
        }
    
    def collect_system_metrics(self) -> List[PerformanceMetric]:
        """Sample host-level CPU, memory and disk metrics via psutil."""
        now = datetime.now()

        def _sample(name, value, unit, threshold=None):
            # Small factory so every sample in this round shares one timestamp.
            return PerformanceMetric(
                timestamp=now,
                model_name="system",
                metric_name=name,
                metric_value=value,
                unit=unit,
                threshold=threshold
            )

        # CPU utilisation (1-second sampling window).
        cpu_pct = psutil.cpu_percent(interval=1)
        # Virtual-memory utilisation.
        mem_pct = psutil.virtual_memory().percent
        # Used space on the root filesystem, in GB.
        disk = psutil.disk_usage('/')
        used_gb = (disk.total - disk.free) / (1024**3)

        return [
            _sample("cpu_usage", cpu_pct, "percent", self.thresholds['cpu_usage']),
            _sample("memory_usage", mem_pct, "percent", self.thresholds['memory_usage']),
            _sample("disk_usage", used_gb, "GB"),
        ]
    
    def collect_kylin_metrics(self, model_name: str) -> List[PerformanceMetric]:
        """Collect model-level metrics from the Kylin REST API.

        Produces cube storage-size and build-time metrics for each cube of
        the model, plus query latency/success metrics; returns an empty list
        when the model cannot be found.
        """
        metrics = []
        timestamp = datetime.now()
        
        try:
            # Model metadata; bail out early if the model does not exist.
            model_info = self.get_model_info(model_name)
            if not model_info:
                return metrics
            
            # All cubes derived from this model.
            cubes = self.get_model_cubes(model_name)
            
            for cube in cubes:
                cube_name = cube.get('name')
                
                # Storage footprint of the cube, in GB.
                storage_size = self.get_cube_storage_size(cube_name)
                if storage_size is not None:
                    metrics.append(PerformanceMetric(
                        timestamp=timestamp,
                        model_name=model_name,
                        metric_name="storage_size",
                        metric_value=storage_size,
                        unit="GB",
                        threshold=self.thresholds['storage_size']
                    ))
                
                # Duration of the most recent build job, in seconds.
                build_time = self.get_last_build_time(cube_name)
                if build_time is not None:
                    metrics.append(PerformanceMetric(
                        timestamp=timestamp,
                        model_name=model_name,
                        metric_name="cube_build_time",
                        metric_value=build_time,
                        unit="seconds",
                        threshold=self.thresholds['cube_build_time']
                    ))
            
            # Query latency / success-rate metrics from the test queries.
            query_metrics = self.get_query_performance_metrics(model_name)
            metrics.extend(query_metrics)
            
        except Exception as e:
            print(f"收集Kylin指标时出错: {e}")
        
        return metrics
    
    def get_model_info(self, model_name: str) -> Optional[Dict]:
        """Fetch model metadata from the Kylin REST API.

        Returns the decoded JSON body on HTTP 200, or None on any error.
        """
        try:
            response = self.session.get(
                f"http://{self.kylin_host}/kylin/api/models/{model_name}"
            )
            return response.json() if response.status_code == 200 else None
        except Exception:
            # Narrowed from a bare `except:`, which would also swallow
            # KeyboardInterrupt / SystemExit.
            return None
    
    def get_model_cubes(self, model_name: str) -> List[Dict]:
        """List the cubes belonging to `model_name`; [] on any error."""
        try:
            response = self.session.get(
                f"http://{self.kylin_host}/kylin/api/cubes",
                params={'modelName': model_name}
            )
            return response.json() if response.status_code == 200 else []
        except Exception:
            # Narrowed from a bare `except:` so interpreter-exit signals
            # (KeyboardInterrupt, SystemExit) still propagate.
            return []
    
    def get_cube_storage_size(self, cube_name: str) -> Optional[float]:
        """Return the cube's storage footprint in GB, or None if unavailable."""
        try:
            response = self.session.get(
                f"http://{self.kylin_host}/kylin/api/cubes/{cube_name}"
            )
            if response.status_code == 200:
                cube_info = response.json()
                size_kb = cube_info.get('size_kb', 0)
                return size_kb / (1024 * 1024)  # KB -> GB
        except Exception:
            # Narrowed from a bare `except:`; best-effort lookup falls
            # through to return None below.
            pass
        return None
    
    def get_last_build_time(self, cube_name: str) -> Optional[float]:
        """Return the duration of the most recent build job in seconds.

        Derived from the job's create_time / last_modified fields (epoch
        milliseconds); None when no job exists or the API call fails.
        """
        try:
            response = self.session.get(
                f"http://{self.kylin_host}/kylin/api/jobs",
                params={'cubeName': cube_name, 'limit': 1}
            )
            if response.status_code == 200:
                jobs = response.json()
                if jobs:
                    job = jobs[0]
                    start_time = job.get('create_time')
                    end_time = job.get('last_modified')
                    if start_time and end_time:
                        return (end_time - start_time) / 1000  # ms -> s
        except Exception:
            # Narrowed from a bare `except:`; fall through to None.
            pass
        return None
    
    def get_query_performance_metrics(self, model_name: str) -> List[PerformanceMetric]:
        """Run the model's test queries and derive latency / success metrics.

        Emits one query_response_time metric per query, plus summary
        avg_query_response_time and query_success_rate metrics.
        """
        metrics = []
        timestamp = datetime.now()
        
        try:
            # Canned smoke-test queries for this model.
            test_queries = self.get_test_queries(model_name)
            
            total_queries = len(test_queries)
            successful_queries = 0
            total_response_time = 0
            
            for query in test_queries:
                start_time = time.time()
                success = self.execute_test_query(query)
                end_time = time.time()
                
                response_time = end_time - start_time
                total_response_time += response_time
                
                if success:
                    successful_queries += 1
                
                # Per-query wall-clock response time.
                metrics.append(PerformanceMetric(
                    timestamp=timestamp,
                    model_name=model_name,
                    metric_name="query_response_time",
                    metric_value=response_time,
                    unit="seconds",
                    threshold=self.thresholds['query_response_time']
                ))
            
            # Average response time across all test queries.
            if total_queries > 0:
                avg_response_time = total_response_time / total_queries
                metrics.append(PerformanceMetric(
                    timestamp=timestamp,
                    model_name=model_name,
                    metric_name="avg_query_response_time",
                    metric_value=avg_response_time,
                    unit="seconds",
                    threshold=self.thresholds['query_response_time']
                ))
                
                # Share of queries that succeeded, as a percentage.
                success_rate = (successful_queries / total_queries) * 100
                metrics.append(PerformanceMetric(
                    timestamp=timestamp,
                    model_name=model_name,
                    metric_name="query_success_rate",
                    metric_value=success_rate,
                    unit="percent",
                    threshold=self.thresholds['query_success_rate']
                ))
        
        except Exception as e:
            print(f"获取查询性能指标时出错: {e}")
        
        return metrics
    
    def get_test_queries(self, model_name: str) -> List[str]:
        """Return a small set of smoke-test SQL statements for *model_name*.

        In a real deployment these would be derived from the model
        definition; for demonstration purposes a few representative
        queries are sufficient.
        """
        templates = (
            "SELECT COUNT(*) FROM {m}",
            "SELECT SUM(sales_amount) FROM {m} WHERE year = 2023",
            "SELECT category, SUM(sales_amount) FROM {m} GROUP BY category",
        )
        return [template.format(m=model_name) for template in templates]
    
    def execute_test_query(self, query: str, timeout: float = 30.0) -> bool:
        """Execute *query* against the Kylin REST API and report success.

        Args:
            query: SQL text to run in the default project.
            timeout: Request timeout in seconds (new, defaults to 30 so a
                hung server cannot block the monitor indefinitely).

        Returns:
            True when the server answered with HTTP 200, False on any
            HTTP error status or request failure.
        """
        try:
            response = self.session.post(
                f"http://{self.kylin_host}/kylin/api/query",
                json={
                    "sql": query,
                    "project": "default"
                },
                timeout=timeout
            )
            return response.status_code == 200
        except Exception:
            # Bug fix: the bare `except:` here also swallowed
            # KeyboardInterrupt / SystemExit; catch only ordinary
            # exceptions and report the query as failed.
            return False
    
    def monitor_model(self, model_name: str, duration_minutes: int = 60, 
                     interval_seconds: int = 300) -> List[PerformanceMetric]:
        """Collect system and Kylin metrics for *model_name* on a fixed
        cadence until the monitoring window closes.

        Args:
            model_name: Model whose metrics are collected.
            duration_minutes: Length of the monitoring window.
            interval_seconds: Pause between collection rounds.

        Returns:
            All metrics collected during this run; they are also appended
            to ``self.metrics_history``.
        """
        print(f"开始监控模型: {model_name},持续时间: {duration_minutes}分钟")
        
        collected = []
        deadline = datetime.now() + timedelta(minutes=duration_minutes)
        
        while datetime.now() < deadline:
            print(f"收集性能指标... {datetime.now().strftime('%H:%M:%S')}")
            
            # One collection round: system-level metrics first, then the
            # Kylin-specific metrics for this model.
            round_metrics = self.collect_system_metrics()
            round_metrics += self.collect_kylin_metrics(model_name)
            collected.extend(round_metrics)
            
            # Warn on anything that crossed its configured threshold.
            self.check_thresholds(round_metrics)
            
            # Sleep until the next round is due.
            time.sleep(interval_seconds)
        
        self.metrics_history.extend(collected)
        print(f"监控完成,共收集 {len(collected)} 个指标")
        
        return collected
    
    def check_thresholds(self, metrics: List[PerformanceMetric]):
        """Print a console warning for every metric exceeding its threshold.

        Metrics whose threshold is None are skipped entirely.
        """
        for metric in metrics:
            threshold = metric.threshold
            if threshold is None:
                continue
            if metric.metric_value > threshold:
                print(f"⚠️ 阈值告警: {metric.model_name}.{metric.metric_name} = "
                      f"{metric.metric_value:.2f} {metric.unit} (阈值: {metric.threshold})")
    
    def generate_performance_report(self, model_name: str, 
                                  hours: int = 24) -> Dict:
        """Summarize metrics recorded for *model_name* (plus system-wide
        metrics) over the trailing *hours*-hour window.

        Returns:
            Dict with per-metric count/min/max/avg statistics, units,
            thresholds and threshold-violation counts — or a dict with an
            ``error`` key when no data exists in the window.
        """
        # Keep only metrics inside the window that belong to this model
        # or to the host system.
        window_start = datetime.now() - timedelta(hours=hours)
        relevant = [
            m for m in self.metrics_history
            if m.timestamp >= window_start
            and m.model_name in (model_name, "system")
        ]
        
        if not relevant:
            return {'error': f'没有找到 {model_name} 在过去 {hours} 小时的性能数据'}
        
        # Group the surviving metrics by metric name.
        grouped = {}
        for m in relevant:
            grouped.setdefault(m.metric_name, []).append(m)
        
        report = {
            'model_name': model_name,
            'report_period': f'{hours} hours',
            'total_metrics': len(relevant),
            'metrics_summary': {}
        }
        
        # Per-metric statistics; unit/threshold are taken from the first
        # sample of each group.
        for name, group in grouped.items():
            values = [m.metric_value for m in group]
            threshold = group[0].threshold
            report['metrics_summary'][name] = {
                'count': len(values),
                'min': min(values),
                'max': max(values),
                'avg': sum(values) / len(values),
                'unit': group[0].unit,
                'threshold': threshold,
                'threshold_violations': sum(1 for v in values if threshold and v > threshold)
            }
        
        return report
    
    def print_performance_report(self, model_name: str, hours: int = 24):
        """Render the performance report for *model_name* to stdout."""
        report = self.generate_performance_report(model_name, hours)
        
        # No data in the window: report the error and stop.
        if 'error' in report:
            print(f"❌ {report['error']}")
            return
        
        print(f"\n=== {model_name} 性能报告 ({report['report_period']}) ===")
        print(f"总指标数: {report['total_metrics']}")
        print("\n📊 指标摘要:")
        
        for metric_name, summary in report['metrics_summary'].items():
            print(f"\n{metric_name}:")
            print(f"  数量: {summary['count']}")
            print(f"  最小值: {summary['min']:.2f} {summary['unit']}")
            print(f"  最大值: {summary['max']:.2f} {summary['unit']}")
            print(f"  平均值: {summary['avg']:.2f} {summary['unit']}")
            
            # Threshold section only when a threshold is configured.
            if not summary['threshold']:
                continue
            print(f"  阈值: {summary['threshold']} {summary['unit']}")
            if summary['threshold_violations'] > 0:
                print(f"  ⚠️ 阈值违规: {summary['threshold_violations']} 次")
            else:
                print(f"  ✅ 无阈值违规")

def main():
    """Demo entry point: briefly monitor a sample model, then print a report."""
    monitor = ModelPerformanceMonitor(
        kylin_host="localhost:7070",
        username="ADMIN",
        password="KYLIN"
    )
    
    # Collect metrics for five minutes, sampling every 30 seconds.
    monitor.monitor_model(
        model_name="sales_model",
        duration_minutes=5,
        interval_seconds=30
    )
    
    # Summarize what was gathered over the last hour.
    monitor.print_performance_report("sales_model", hours=1)

if __name__ == "__main__":
    main()

5.4.2 模型优化策略

1. 存储优化

#!/usr/bin/env python3
# model_optimizer.py - 模型优化工具

import json
import math
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum

class OptimizationType(Enum):
    """Categories of advice produced by the model optimizer."""
    
    STORAGE = "storage"          # storage footprint
    PERFORMANCE = "performance"  # query/build speed
    COST = "cost"                # resource spend
    MAINTENANCE = "maintenance"  # operability

@dataclass
class OptimizationRecommendation:
    """One optimization suggestion produced by the model optimizer.

    Instances are plain records: the optimizer sorts them by *priority*
    and the report printer renders the text fields verbatim.
    """
    type: OptimizationType  # category: storage / performance / cost / maintenance
    priority: str  # sort bucket: 'high', 'medium' or 'low'
    title: str  # short headline shown in reports
    description: str  # what was detected and why it matters
    impact: str  # expected effect of applying the change
    implementation: str  # how to carry the change out
    estimated_savings: Optional[str] = None  # optional human-readable estimate

class ModelOptimizer:
    """Rule-based analyzer for Kylin model configurations.

    Each ``_analyze_*`` pass inspects one aspect of the configuration
    (storage, performance, cost, maintenance) and yields
    OptimizationRecommendation entries; ``analyze_model`` aggregates and
    sorts them, and ``generate_optimization_plan`` groups them into a
    phased implementation plan.
    """
    
    def __init__(self):
        # Static thresholds and lookup tables shared by the analysis passes.
        self.optimization_rules = self._load_optimization_rules()
    
    def analyze_model(self, model_config: Dict) -> List[OptimizationRecommendation]:
        """Run every analysis pass over *model_config*.

        Args:
            model_config: Raw model definition (dimensions, measures,
                partition / aggregation-group / build / cache settings...).

        Returns:
            Recommendations sorted by priority: high, then medium, then low.
        """
        recommendations = []
        
        # Storage checks: dimension/measure counts, partitions, agg groups.
        recommendations.extend(self._analyze_storage_optimization(model_config))
        
        # Performance checks: encodings, rowkey design, query cache.
        recommendations.extend(self._analyze_performance_optimization(model_config))
        
        # Cost checks: build frequency, data retention.
        recommendations.extend(self._analyze_cost_optimization(model_config))
        
        # Maintenance checks: monitoring/alerting, backups.
        recommendations.extend(self._analyze_maintenance_optimization(model_config))
        
        # Sort by priority; unknown priorities sink to the end.
        priority_order = {'high': 0, 'medium': 1, 'low': 2}
        recommendations.sort(key=lambda x: priority_order.get(x.priority, 3))
        
        return recommendations
    
    def _analyze_storage_optimization(self, model_config: Dict) -> List[OptimizationRecommendation]:
        """Check storage-related aspects of the configuration."""
        recommendations = []
        
        # Too many dimensions leads to cuboid explosion.
        dimensions = model_config.get('dimensions', [])
        if len(dimensions) > 15:
            recommendations.append(OptimizationRecommendation(
                type=OptimizationType.STORAGE,
                priority='high',
                title='减少维度数量',
                description=f'当前模型有{len(dimensions)}个维度,过多的维度会导致Cube爆炸',
                impact='可显著减少存储空间和构建时间',
                implementation='移除低频使用的维度,或将相关维度合并为层次维度',
                estimated_savings='存储空间可减少30-50%'
            ))
        
        # Too many measures inflates storage and build cost.
        measures = model_config.get('measures', [])
        if len(measures) > 20:
            recommendations.append(OptimizationRecommendation(
                type=OptimizationType.STORAGE,
                priority='medium',
                title='优化度量定义',
                description=f'当前模型有{len(measures)}个度量,建议合并相似度量',
                impact='减少存储开销和计算复杂度',
                implementation='合并相似度量,使用派生度量替代冗余度量',
                estimated_savings='存储空间可减少10-20%'
            ))
        
        # A missing partition strategy hurts incremental builds.
        partition_config = model_config.get('partition_config', {})
        if not partition_config:
            recommendations.append(OptimizationRecommendation(
                type=OptimizationType.STORAGE,
                priority='high',
                title='配置分区策略',
                description='未配置分区策略,会影响增量构建效率',
                impact='提高增量构建性能,减少全量构建频率',
                implementation='基于时间维度配置合适的分区策略',
                estimated_savings='构建时间可减少60-80%'
            ))
        
        # Without aggregation groups the cuboid count is uncontrolled.
        aggregation_groups = model_config.get('aggregation_groups', [])
        if not aggregation_groups:
            recommendations.append(OptimizationRecommendation(
                type=OptimizationType.STORAGE,
                priority='high',
                title='配置聚合组',
                description='未配置聚合组,无法有效控制Cuboid数量',
                impact='大幅减少存储空间和构建时间',
                implementation='根据查询模式配置合适的聚合组',
                estimated_savings='存储空间可减少70-90%'
            ))
        
        return recommendations
    
    def _analyze_performance_optimization(self, model_config: Dict) -> List[OptimizationRecommendation]:
        """Check query-performance-related aspects of the configuration."""
        recommendations = []
        
        # Every dict-shaped dimension should declare an encoding.
        dimensions = model_config.get('dimensions', [])
        for dim in dimensions:
            if isinstance(dim, dict):
                encoding = dim.get('encoding', {})
                if not encoding:
                    recommendations.append(OptimizationRecommendation(
                        type=OptimizationType.PERFORMANCE,
                        priority='medium',
                        title=f'优化维度{dim.get("name", "")}的编码',
                        description='未配置编码策略,影响查询性能',
                        impact='提高查询性能和压缩率',
                        implementation='根据维度基数选择合适的编码方式',
                        estimated_savings='查询性能可提升20-40%'
                    ))
        
        # Rowkey column order drives scan efficiency.
        if 'rowkey_columns' not in model_config:
            recommendations.append(OptimizationRecommendation(
                type=OptimizationType.PERFORMANCE,
                priority='high',
                title='优化Rowkey设计',
                description='Rowkey设计不合理,影响查询性能',
                impact='显著提高查询性能',
                implementation='根据查询模式优化Rowkey列顺序',
                estimated_savings='查询性能可提升50-80%'
            ))
        
        # The query cache dramatically speeds up repeated queries.
        cache_config = model_config.get('cache_config', {})
        if not cache_config.get('enabled', False):
            recommendations.append(OptimizationRecommendation(
                type=OptimizationType.PERFORMANCE,
                priority='medium',
                title='启用查询缓存',
                description='未启用查询缓存,重复查询性能差',
                impact='提高重复查询性能',
                implementation='启用并配置合适的缓存策略',
                estimated_savings='重复查询性能可提升80-95%'
            ))
        
        return recommendations
    
    def _analyze_cost_optimization(self, model_config: Dict) -> List[OptimizationRecommendation]:
        """Check cost-related aspects (compute and storage spend)."""
        recommendations = []
        
        # Hourly builds may burn compute without matching business need.
        build_config = model_config.get('build_config', {})
        build_frequency = build_config.get('frequency', 'daily')
        
        if build_frequency == 'hourly':
            recommendations.append(OptimizationRecommendation(
                type=OptimizationType.COST,
                priority='medium',
                title='优化构建频率',
                description='小时级构建频率可能过于频繁,增加计算成本',
                impact='减少计算资源消耗',
                implementation='根据业务需求调整为合适的构建频率',
                estimated_savings='计算成本可减少30-50%'
            ))
        
        # Without a retention policy, history accumulates indefinitely.
        retention_config = model_config.get('retention_config', {})
        if not retention_config:
            recommendations.append(OptimizationRecommendation(
                type=OptimizationType.COST,
                priority='low',
                title='配置数据保留策略',
                description='未配置数据保留策略,可能存储过多历史数据',
                impact='减少存储成本',
                implementation='根据业务需求配置合适的数据保留期',
                estimated_savings='存储成本可减少20-40%'
            ))
        
        return recommendations
    
    def _analyze_maintenance_optimization(self, model_config: Dict) -> List[OptimizationRecommendation]:
        """Check operability aspects (monitoring, backup)."""
        recommendations = []
        
        # Monitoring/alerting is needed to detect problems early.
        monitoring_config = model_config.get('monitoring_config', {})
        if not monitoring_config:
            recommendations.append(OptimizationRecommendation(
                type=OptimizationType.MAINTENANCE,
                priority='medium',
                title='配置监控告警',
                description='未配置监控告警,难以及时发现问题',
                impact='提高系统可靠性和运维效率',
                implementation='配置关键指标的监控和告警',
                estimated_savings='运维成本可减少40-60%'
            ))
        
        # Backups protect against data loss.
        backup_config = model_config.get('backup_config', {})
        if not backup_config:
            recommendations.append(OptimizationRecommendation(
                type=OptimizationType.MAINTENANCE,
                priority='high',
                title='配置备份策略',
                description='未配置备份策略,存在数据丢失风险',
                impact='提高数据安全性',
                implementation='配置定期备份和恢复策略',
                estimated_savings='避免数据丢失风险'
            ))
        
        return recommendations
    
    def _load_optimization_rules(self) -> Dict:
        """Return the static rule thresholds and lookup tables."""
        return {
            'max_dimensions': 15,
            'max_measures': 20,
            'recommended_encodings': {
                'low_cardinality': 'dict',
                'medium_cardinality': 'fixed_length',
                'high_cardinality': 'fixed_length_hex'
            },
            'partition_strategies': {
                'daily': 'YYYY-MM-DD',
                'monthly': 'YYYY-MM',
                'yearly': 'YYYY'
            }
        }
    
    def generate_optimization_plan(self, model_config: Dict) -> Dict:
        """Build a structured optimization plan from the analysis results.

        Returns:
            Dict with summary counts, recommendations grouped by type and
            a phased implementation schedule.
        """
        from datetime import date  # local import keeps module-level deps unchanged

        recommendations = self.analyze_model(model_config)
        
        # Group recommendations by their OptimizationType value.
        recommendations_by_type = {}
        for rec in recommendations:
            recommendations_by_type.setdefault(rec.type.value, []).append(rec)
        
        # Headline numbers for the report.
        total_recommendations = len(recommendations)
        high_priority_count = sum(1 for r in recommendations if r.priority == 'high')
        
        plan = {
            'model_name': model_config.get('name', 'unknown'),
            # Bug fix: the analysis date was hard-coded to '2024-01-01';
            # report the date the analysis actually ran.
            'analysis_date': date.today().isoformat(),
            'total_recommendations': total_recommendations,
            'high_priority_count': high_priority_count,
            'recommendations_by_type': recommendations_by_type,
            'implementation_phases': self._create_implementation_phases(recommendations)
        }
        
        return plan
    
    def _create_implementation_phases(self, recommendations: List[OptimizationRecommendation]) -> List[Dict]:
        """Bucket recommendations into up to three sequential phases.

        A phase is emitted only when at least one recommendation falls
        into its priority bucket.
        """
        # (priority bucket, phase number, name, description, duration)
        phase_specs = [
            ('high', 1, '紧急优化', '解决高优先级问题,立即实施', '1-2周'),
            ('medium', 2, '性能优化', '提升系统性能和效率', '2-4周'),
            ('low', 3, '长期优化', '长期改进和成本优化', '1-2个月'),
        ]
        
        phases = []
        for priority, number, name, description, duration in phase_specs:
            selected = [r for r in recommendations if r.priority == priority]
            if selected:
                phases.append({
                    'phase': number,
                    'name': name,
                    'description': description,
                    'duration': duration,
                    'recommendations': [{
                        'title': r.title,
                        'type': r.type.value,
                        'implementation': r.implementation
                    } for r in selected]
                })
        
        return phases
    
    def print_optimization_report(self, model_config: Dict):
        """Analyze *model_config* and print a human-readable report."""
        plan = self.generate_optimization_plan(model_config)
        
        print(f"\n=== {plan['model_name']} 优化报告 ===")
        print(f"分析日期: {plan['analysis_date']}")
        print(f"总建议数: {plan['total_recommendations']}")
        print(f"高优先级: {plan['high_priority_count']}")
        
        print("\n📊 按类型分布:")
        for opt_type, recommendations in plan['recommendations_by_type'].items():
            print(f"  {opt_type}: {len(recommendations)}")
        
        print("\n🔧 详细建议:")
        for opt_type, recommendations in plan['recommendations_by_type'].items():
            print(f"\n{opt_type.upper()} 优化:")
            for i, rec in enumerate(recommendations, 1):
                priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(rec.priority, '⚪')
                print(f"  {priority_icon} {i}. {rec.title}")
                print(f"     描述: {rec.description}")
                print(f"     影响: {rec.impact}")
                print(f"     实施: {rec.implementation}")
                if rec.estimated_savings:
                    print(f"     预期收益: {rec.estimated_savings}")
        
        print("\n📅 实施计划:")
        for phase in plan['implementation_phases']:
            print(f"\n阶段 {phase['phase']}: {phase['name']}")
            print(f"  描述: {phase['description']}")
            print(f"  周期: {phase['duration']}")
            print(f"  任务数: {len(phase['recommendations'])}")

def main():
    """Demo entry point: analyze a deliberately flawed sample model."""
    optimizer = ModelOptimizer()
    
    # Sample configuration with common problems baked in: dimensions
    # without encodings, an hourly build, and no partition or
    # aggregation-group settings (15 dimensions in total).
    named_dimensions = [
        {"name": "time", "encoding": {"type": "date"}},
        {"name": "product"},  # encoding deliberately missing
        {"name": "customer"},
        {"name": "region"},
        {"name": "channel"},
    ]
    generated_dimensions = [{"name": f"dim_{i}"} for i in range(10, 20)]
    model_config = {
        "name": "sales_model",
        "dimensions": named_dimensions + generated_dimensions,
        "measures": [
            "sales_amount", "quantity", "profit", "discount",
            "cost", "revenue", "margin", "tax", "shipping"
        ],
        "build_config": {
            "frequency": "hourly"  # likely too frequent
        }
        # partition / aggregation-group settings intentionally absent
    }
    
    # Print the optimization report for this configuration.
    optimizer.print_optimization_report(model_config)

if __name__ == "__main__":
    main()

2. 查询优化

#!/usr/bin/env python3
# query_optimizer.py - 查询优化工具

import re
import sqlparse
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum

class QueryIssueType(Enum):
    """Categories assigned to findings from the query optimizer."""
    
    PERFORMANCE = "performance"      # likely to slow the query down
    SYNTAX = "syntax"                # malformed or non-standard SQL
    OPTIMIZATION = "optimization"    # opportunity to use the model better
    BEST_PRACTICE = "best_practice"  # readability / maintainability

@dataclass
class QueryIssue:
    """One finding produced by the query optimizer for a SQL statement.

    Plain record: the report printer renders the text fields verbatim
    and picks icons from *type* and *severity*.
    """
    type: QueryIssueType  # category: performance / syntax / optimization / best_practice
    severity: str  # 'high', 'medium' or 'low'
    title: str  # short headline shown in reports
    description: str  # what was detected
    suggestion: str  # how to fix or improve it
    example: Optional[str] = None  # optional illustrative snippet

class QueryOptimizer:
    """Static SQL analyzer for Kylin-style analytical queries.

    Runs four lexical analysis passes (performance, syntax, optimization,
    best practice) over the query text and returns QueryIssue findings.
    No database connection is used; sqlparse is only used to reject
    unparseable SQL up front.
    """
    
    def __init__(self):
        # Static thresholds referenced by the analysis passes.
        self.optimization_rules = self._load_optimization_rules()
    
    def analyze_query(self, sql: str, model_config: Dict = None) -> List[QueryIssue]:
        """Analyze *sql* and return all detected issues.

        Args:
            sql: The query text.
            model_config: Optional model metadata; a 'partition_column'
                entry enables the partition-pruning check.

        Returns:
            List of QueryIssue (empty when nothing was flagged).
        """
        issues = []
        
        # Parse first; any parse failure (including empty input, which
        # raises IndexError on the [0] subscript) is reported as a single
        # syntax issue.
        try:
            parsed = sqlparse.parse(sql)[0]
        except Exception as e:
            issues.append(QueryIssue(
                type=QueryIssueType.SYNTAX,
                severity='high',
                title='SQL语法错误',
                description=f'SQL解析失败: {str(e)}',
                suggestion='检查SQL语法是否正确'
            ))
            return issues
        
        # Performance smells (SELECT *, missing WHERE, ...).
        issues.extend(self._analyze_performance(sql, parsed))
        
        # Syntax / formatting problems (date literals, aliases).
        issues.extend(self._analyze_syntax(sql, parsed))
        
        # Model-aware optimization opportunities.
        issues.extend(self._analyze_optimization(sql, parsed, model_config))
        
        # General best-practice checks (comments, formatting, casing).
        issues.extend(self._analyze_best_practices(sql, parsed))
        
        return issues
    
    def _analyze_performance(self, sql: str, parsed) -> List[QueryIssue]:
        """Flag constructs that commonly slow queries down."""
        issues = []
        sql_lower = sql.lower()
        
        # SELECT * fetches every column.
        if 'select *' in sql_lower:
            issues.append(QueryIssue(
                type=QueryIssueType.PERFORMANCE,
                severity='medium',
                title='避免使用SELECT *',
                description='SELECT * 会查询所有列,影响性能',
                suggestion='明确指定需要的列名',
                example='SELECT col1, col2 FROM table'
            ))
        
        # Aggregation without any WHERE filter tends to scan everything.
        if 'where' not in sql_lower and 'group by' in sql_lower:
            issues.append(QueryIssue(
                type=QueryIssueType.PERFORMANCE,
                severity='high',
                title='缺少WHERE过滤条件',
                description='聚合查询缺少WHERE条件可能导致全表扫描',
                suggestion='添加适当的WHERE条件过滤数据',
                example='SELECT ... FROM table WHERE date >= \'2024-01-01\''
            ))
        
        # DISTINCT is expensive on large result sets.
        if 'distinct' in sql_lower:
            issues.append(QueryIssue(
                type=QueryIssueType.PERFORMANCE,
                severity='medium',
                title='谨慎使用DISTINCT',
                description='DISTINCT操作可能影响查询性能',
                suggestion='确认是否真的需要去重,考虑使用GROUP BY替代',
                example='SELECT col1, COUNT(*) FROM table GROUP BY col1'
            ))
        
        # Subquery: an opening parenthesis directly followed by SELECT.
        if re.search(r'\(\s*select', sql_lower):
            issues.append(QueryIssue(
                type=QueryIssueType.PERFORMANCE,
                severity='medium',
                title='优化子查询',
                description='复杂子查询可能影响性能',
                suggestion='考虑使用JOIN或WITH子句重写子查询'
            ))
        
        # Sorting an unbounded result set.
        if 'order by' in sql_lower and 'limit' not in sql_lower:
            issues.append(QueryIssue(
                type=QueryIssueType.PERFORMANCE,
                severity='medium',
                title='ORDER BY建议配合LIMIT使用',
                description='大结果集的排序操作消耗资源',
                suggestion='如果不需要全部结果,添加LIMIT限制',
                example='SELECT ... ORDER BY col1 LIMIT 100'
            ))
        
        return issues
    
    def _analyze_syntax(self, sql: str, parsed) -> List[QueryIssue]:
        """Flag syntax/formatting problems such as malformed date literals."""
        issues = []
        
        # Quoted literals that begin like a date must be fully well-formed
        # (YYYY-MM-DD optionally followed by HH:MM:SS).
        date_patterns = re.findall(r"'(\d{4}-\d{2}-\d{2}[^']*)'|\"(\d{4}-\d{2}-\d{2}[^\"]*)\"", sql)
        for pattern_group in date_patterns:
            date_str = pattern_group[0] or pattern_group[1]
            if not re.match(r'^\d{4}-\d{2}-\d{2}(\s\d{2}:\d{2}:\d{2})?$', date_str):
                issues.append(QueryIssue(
                    type=QueryIssueType.SYNTAX,
                    severity='medium',
                    title='日期格式不规范',
                    description=f'日期格式 "{date_str}" 可能不被正确识别',
                    suggestion='使用标准日期格式 YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS',
                    example="WHERE date_col >= '2024-01-01'"
                ))
        
        # Rough heuristic for qualified names that would benefit from
        # table aliases; intentionally lenient.
        if re.search(r'[^a-zA-Z0-9_\s]\w+\.[^a-zA-Z0-9_]', sql):
            issues.append(QueryIssue(
                type=QueryIssueType.SYNTAX,
                severity='low',
                title='建议使用表别名',
                description='复杂查询建议使用表别名提高可读性',
                suggestion='为表定义简短的别名',
                example='SELECT t1.col1, t2.col2 FROM table1 t1 JOIN table2 t2 ON ...'
            ))
        
        return issues
    
    def _analyze_optimization(self, sql: str, parsed, model_config: Dict = None) -> List[QueryIssue]:
        """Flag model-aware optimization opportunities."""
        issues = []
        sql_lower = sql.lower()
        
        # Partition pruning requires the partition column in the query.
        if model_config and 'partition_column' in model_config:
            partition_col = model_config['partition_column'].lower()
            if partition_col not in sql_lower:
                issues.append(QueryIssue(
                    type=QueryIssueType.OPTIMIZATION,
                    severity='high',
                    title='添加分区列过滤',
                    description=f'查询未包含分区列 {partition_col},无法利用分区剪枝',
                    suggestion=f'在WHERE条件中添加 {partition_col} 过滤',
                    example=f"WHERE {partition_col} >= '2024-01-01'"
                ))
        
        # Many aggregate calls in one statement.
        agg_functions = re.findall(r'(count|sum|avg|min|max)\s*\([^)]+\)', sql_lower)
        if len(agg_functions) > 5:
            issues.append(QueryIssue(
                type=QueryIssueType.OPTIMIZATION,
                severity='medium',
                title='简化聚合函数',
                description=f'查询包含{len(agg_functions)}个聚合函数,可能影响性能',
                suggestion='考虑分解为多个简单查询或使用预聚合表'
            ))
        
        # Many JOINs in one statement.
        join_count = len(re.findall(r'\bjoin\b', sql_lower))
        if join_count > 3:
            issues.append(QueryIssue(
                type=QueryIssueType.OPTIMIZATION,
                severity='medium',
                title='优化多表JOIN',
                description=f'查询包含{join_count}个JOIN,可能影响性能',
                suggestion='检查JOIN顺序和条件,考虑使用宽表或预聚合'
            ))
        
        # Many CASE expressions in one statement.
        case_count = len(re.findall(r'\bcase\b', sql_lower))
        if case_count > 3:
            issues.append(QueryIssue(
                type=QueryIssueType.OPTIMIZATION,
                severity='low',
                title='简化CASE WHEN逻辑',
                description=f'查询包含{case_count}个CASE表达式',
                suggestion='考虑在数据模型中预处理复杂逻辑'
            ))
        
        return issues
    
    def _analyze_best_practices(self, sql: str, parsed) -> List[QueryIssue]:
        """Flag readability / maintainability issues."""
        issues = []
        sql_lower = sql.lower()
        
        # Comments help future readers.
        if '--' not in sql and '/*' not in sql:
            issues.append(QueryIssue(
                type=QueryIssueType.BEST_PRACTICE,
                severity='low',
                title='添加查询注释',
                description='复杂查询建议添加注释说明业务逻辑',
                suggestion='在查询开头添加注释说明查询目的',
                example='-- 查询2024年各产品类别的销售汇总\nSELECT ...'
            ))
        
        # Single-line or nearly space-free SQL is hard to read.
        # (Assumes sql is non-empty; analyze_query rejects empty input
        # before this pass runs.)
        if '\n' not in sql or sql.count(' ') / len(sql) < 0.1:
            issues.append(QueryIssue(
                type=QueryIssueType.BEST_PRACTICE,
                severity='low',
                title='改善SQL格式',
                description='SQL格式不够清晰,影响可读性',
                suggestion='使用适当的换行和缩进格式化SQL'
            ))
        
        # Keyword-casing consistency.
        # Bug fix: the previous check compared the matched keyword strings
        # themselves, so any query containing two *different* keywords
        # (e.g. SELECT and FROM) was always flagged even with perfectly
        # consistent casing. Classify each match by its case style and
        # flag only when the styles genuinely differ (or mixed case is used).
        keywords = ['select', 'from', 'where', 'group', 'order', 'having']
        keyword_styles = set()
        for keyword in keywords:
            if keyword in sql_lower:
                # Locate the keyword in its original casing.
                pattern = re.compile(re.escape(keyword), re.IGNORECASE)
                for match in pattern.findall(sql):
                    if match.isupper():
                        keyword_styles.add('upper')
                    elif match.islower():
                        keyword_styles.add('lower')
                    else:
                        keyword_styles.add('mixed')
        
        if len(keyword_styles) > 1 or 'mixed' in keyword_styles:
            issues.append(QueryIssue(
                type=QueryIssueType.BEST_PRACTICE,
                severity='low',
                title='保持关键字大小写一致',
                description='SQL关键字大小写不一致',
                suggestion='统一使用大写或小写关键字'
            ))
        
        return issues
    
    def _load_optimization_rules(self) -> Dict:
        """Return the static thresholds used by the analysis passes."""
        return {
            'max_aggregations': 5,
            'max_joins': 3,
            'max_case_expressions': 3,
            'recommended_date_format': 'YYYY-MM-DD',
            'performance_keywords': ['distinct', 'order by', 'group by']
        }
    
    def optimize_query(self, sql: str, model_config: Dict = None) -> Dict:
        """Analyze *sql* and return issues plus a lightly rewritten query."""
        issues = self.analyze_query(sql, model_config)
        
        # Produce an illustrative rewritten query.
        optimized_sql = self._generate_optimized_sql(sql, issues)
        
        return {
            'original_sql': sql,
            'optimized_sql': optimized_sql,
            'issues': issues,
            'optimization_summary': self._generate_optimization_summary(issues)
        }
    
    def _generate_optimized_sql(self, sql: str, issues: List[QueryIssue]) -> str:
        """Apply simple textual rewrites driven by the detected issues.

        Only the SELECT * rewrite is implemented; it is a placeholder
        demonstrating how issue-driven rewriting would work.
        """
        optimized = sql
        
        for issue in issues:
            if issue.title == '避免使用SELECT *' and 'SELECT *' in optimized:
                optimized = optimized.replace('SELECT *', 'SELECT col1, col2, col3  -- 请替换为实际需要的列')
        
        return optimized
    
    def _generate_optimization_summary(self, issues: List[QueryIssue]) -> Dict:
        """Aggregate issue counts by severity and by type."""
        summary = {
            'total_issues': len(issues),
            'by_severity': {'high': 0, 'medium': 0, 'low': 0},
            'by_type': {}
        }
        
        for issue in issues:
            summary['by_severity'][issue.severity] += 1
            type_name = issue.type.value
            if type_name not in summary['by_type']:
                summary['by_type'][type_name] = 0
            summary['by_type'][type_name] += 1
        
        return summary
    
    def print_optimization_report(self, sql: str, model_config: Dict = None):
        """Analyze *sql* and print a human-readable report to stdout."""
        result = self.optimize_query(sql, model_config)
        issues = result['issues']
        summary = result['optimization_summary']
        
        print("\n=== SQL查询优化报告 ===")
        print(f"总问题数: {summary['total_issues']}")
        
        if summary['total_issues'] == 0:
            print("✅ 查询已经很好,没有发现明显问题")
            return
        
        print("\n📊 问题分布:")
        print(f"  严重: {summary['by_severity']['high']}")
        print(f"  中等: {summary['by_severity']['medium']}")
        print(f"  轻微: {summary['by_severity']['low']}")
        
        print("\n🔍 详细问题:")
        for i, issue in enumerate(issues, 1):
            severity_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(issue.severity, '⚪')
            type_icon = {
                'performance': '⚡',
                'syntax': '📝',
                'optimization': '🔧',
                'best_practice': '💡'
            }.get(issue.type.value, '❓')
            
            print(f"\n{severity_icon} {type_icon} {i}. {issue.title}")
            print(f"   问题: {issue.description}")
            print(f"   建议: {issue.suggestion}")
            if issue.example:
                print(f"   示例: {issue.example}")
        
        if result['optimized_sql'] != result['original_sql']:
            print("\n🔧 优化后的SQL:")
            print(result['optimized_sql'])

def main():
    """Demo entry point: run the query optimizer against a sample SQL query."""
    # Example usage
    optimizer = QueryOptimizer()
    
    # Sample SQL query to analyze (uses SELECT * and an ORDER BY on an aggregate)
    test_sql = """
    SELECT * FROM sales_fact 
    WHERE product_category = 'Electronics'
    GROUP BY customer_region
    ORDER BY sum(sales_amount) DESC
    """
    
    # Sample model configuration passed through to the optimizer
    model_config = {
        'partition_column': 'order_date'
    }
    
    # Print the optimization report for the sample query
    optimizer.print_optimization_report(test_sql, model_config)

if __name__ == "__main__":
    main()

5.4.3 最佳实践总结

1. 模型设计原则

  • 维度设计

    • 控制维度数量(建议不超过15个)
    • 合理设计维度层次
    • 选择合适的编码策略
    • 避免高基数维度
  • 度量设计

    • 优先使用预聚合度量
    • 合理使用派生度量
    • 避免复杂计算逻辑
    • 考虑度量的可加性
  • 分区策略

    • 基于时间维度分区
    • 选择合适的分区粒度
    • 配置增量构建
    • 定期清理历史分区

2. 性能优化策略

  • 存储优化

    • 配置聚合组减少Cuboid
    • 优化Rowkey设计
    • 选择合适的存储引擎
    • 启用压缩算法
  • 查询优化

    • 利用分区剪枝
    • 避免全表扫描
    • 合理使用缓存
    • 优化SQL语句
  • 构建优化

    • 配置合适的构建频率
    • 使用增量构建
    • 优化资源配置
    • 监控构建性能

3. 运维管理

  • 监控告警

    • 配置关键指标监控
    • 设置合理的告警阈值
    • 建立故障响应流程
    • 定期性能评估
  • 备份恢复

    • 定期备份元数据
    • 备份重要Cube数据
    • 测试恢复流程
    • 制定灾难恢复计划
  • 版本管理

    • 建立版本控制流程
    • 记录变更历史
    • 支持快速回滚
    • 环境隔离部署

5.5 本章小结

本章详细介绍了Apache Kylin的数据模型设计与管理,包括:

  1. 维度建模基础:掌握了事实表、维度表、星型模式等核心概念
  2. Kylin数据模型:学会了创建和配置数据模型的方法
  3. 维度和度量设计:了解了维度层次、度量类型等设计原则
  4. 模型管理优化:掌握了生命周期管理、性能监控等管理技能

通过本章的学习,你应该能够: - 设计合理的维度建模方案 - 创建和配置Kylin数据模型 - 优化模型性能和存储 - 建立完善的模型管理流程

下一章预告

下一章将介绍“Cube设计与构建”,包括: - Cube基本概念和架构 - Cube设计原则和策略 - 构建流程和优化 - 增量构建和刷新 - 故障排除和监控

练习与思考

理论练习

  1. 解释星型模式和雪花模式的区别和适用场景
  2. 描述Kylin中维度编码的作用和选择原则
  3. 分析聚合组对Cube存储的影响

实践练习

  1. 设计一个电商销售分析的数据模型
  2. 为给定的业务场景选择合适的维度和度量
  3. 使用提供的工具分析和优化现有模型

思考题

  1. 如何平衡查询性能和存储成本?
  2. 在什么情况下应该重新设计数据模型?
  3. 如何评估数据模型的质量和效果?

2. 维度层次设计

-- Time dimension: calendar hierarchy (year/quarter/month/week/day) plus
-- fiscal-year fields and weekend/holiday flags
CREATE TABLE dim_time (
    time_key INT PRIMARY KEY,  -- surrogate key
    date_value DATE,
    year_value INT,
    quarter_value INT,
    month_value INT,
    week_value INT,
    day_value INT,
    weekday_name VARCHAR(20),
    month_name VARCHAR(20),
    quarter_name VARCHAR(10),
    is_weekend BOOLEAN,
    is_holiday BOOLEAN,
    fiscal_year INT,
    fiscal_quarter INT
);

-- Product dimension: category -> subcategory -> brand hierarchy with
-- supplier, pricing, status and audit dates
CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,  -- surrogate key
    product_id VARCHAR(50),       -- natural/business key
    product_name VARCHAR(200),
    product_category VARCHAR(100),
    product_subcategory VARCHAR(100),
    brand VARCHAR(100),
    supplier VARCHAR(100),
    unit_price DECIMAL(10,2),
    product_status VARCHAR(20),
    created_date DATE,
    updated_date DATE
);

-- Geography dimension: country -> region -> state -> city hierarchy with
-- postal code and coordinates
CREATE TABLE dim_geography (
    geography_key INT PRIMARY KEY,  -- surrogate key
    country_code VARCHAR(10),
    country_name VARCHAR(100),
    region_code VARCHAR(10),
    region_name VARCHAR(100),
    state_code VARCHAR(10),
    state_name VARCHAR(100),
    city_code VARCHAR(10),
    city_name VARCHAR(100),
    postal_code VARCHAR(20),
    latitude DECIMAL(10,6),
    longitude DECIMAL(10,6)
);

-- Customer dimension: classification attributes (type/segment/industry/size)
-- plus registration, activity and status fields
CREATE TABLE dim_customer (
    customer_key INT PRIMARY KEY,  -- surrogate key
    customer_id VARCHAR(50),       -- natural/business key
    customer_name VARCHAR(200),
    customer_type VARCHAR(50),
    customer_segment VARCHAR(50),
    industry VARCHAR(100),
    company_size VARCHAR(50),
    registration_date DATE,
    last_activity_date DATE,
    customer_status VARCHAR(20)
);

3. 事实表设计

-- Sales fact table: one row per sale, keyed to the surrounding dimension
-- tables; amount columns use DECIMAL(15,2)
CREATE TABLE fact_sales (
    sales_key BIGINT PRIMARY KEY,
    time_key INT,
    product_key INT,
    customer_key INT,
    geography_key INT,
    channel_key INT,
    promotion_key INT,
    
    -- Measure columns
    quantity DECIMAL(10,2),
    unit_price DECIMAL(10,2),
    sales_amount DECIMAL(15,2),
    cost_amount DECIMAL(15,2),
    profit_amount DECIMAL(15,2),
    discount_amount DECIMAL(15,2),
    tax_amount DECIMAL(15,2),
    shipping_amount DECIMAL(15,2),
    
    -- Degenerate dimensions: transaction identifiers stored directly on the
    -- fact row with no separate dimension table
    order_id VARCHAR(50),
    invoice_id VARCHAR(50),
    sales_rep_id VARCHAR(50),
    
    -- Audit columns
    created_date TIMESTAMP,
    updated_date TIMESTAMP,
    
    -- Foreign keys to the dimension tables (NOTE(review): channel_key and
    -- promotion_key have no constraints here; presumably their dimension
    -- tables are defined elsewhere -- verify)
    FOREIGN KEY (time_key) REFERENCES dim_time(time_key),
    FOREIGN KEY (product_key) REFERENCES dim_product(product_key),
    FOREIGN KEY (customer_key) REFERENCES dim_customer(customer_key),
    FOREIGN KEY (geography_key) REFERENCES dim_geography(geography_key)
);

-- Partitioning strategy: range-partition the fact table on time_key
-- (one partition for 2023, quarterly partitions for 2024).
-- NOTE(review): boundary values assume time_key is a yyyymmdd-style integer;
-- dim_time.time_key must follow the same scheme -- confirm.
ALTER TABLE fact_sales 
PARTITION BY RANGE (time_key) (
    PARTITION p2023 VALUES LESS THAN (20240101),
    PARTITION p2024q1 VALUES LESS THAN (20240401),
    PARTITION p2024q2 VALUES LESS THAN (20240701),
    PARTITION p2024q3 VALUES LESS THAN (20241001),
    PARTITION p2024q4 VALUES LESS THAN (20250101)
);

5.2.3 数据模型验证工具

#!/usr/bin/env python3
# advanced_model_validator.py - 高级数据模型验证工具

import json
import re
from typing import Dict, List, Tuple, Optional, Set
from dataclasses import dataclass
from enum import Enum
import logging

class ValidationLevel(Enum):
    """Severity level of a validation finding."""
    ERROR = "error"      # Must be fixed before the model is usable
    WARNING = "warning"  # Should be fixed; the model still works
    INFO = "info"        # Optional improvement suggestion

@dataclass
class ValidationResult:
    """One finding produced by the model validator."""
    level: ValidationLevel        # Severity class of the finding
    category: str                 # Validation area, e.g. 'structure', 'dimensions'
    title: str                    # Short headline of the finding
    description: str              # What is wrong
    suggestion: str               # How to fix it
    affected_objects: List[str]   # Names of config objects the finding refers to
    severity_score: int  # 1-10, where 10 is most severe

class AdvancedModelValidator:
    """Advanced data-model validator.

    Runs a series of independent validation passes (structure, dimensions,
    measures, relationships, performance, business logic, security) against
    a Kylin model configuration dict and reports findings as
    ``ValidationResult`` records, ordered by severity.
    """
    
    def __init__(self):
        # Thresholds and whitelists shared by all validation passes.
        self.validation_rules = self._load_validation_rules()
        self.logger = logging.getLogger(__name__)
    
    def validate_model(self, model_config: Dict) -> List[ValidationResult]:
        """Run every validation pass over *model_config*.

        Args:
            model_config: Model configuration dict (name, fact_table,
                dimensions, measures, joins, ...).

        Returns:
            All findings, errors first, then warnings, then info; within
            each level, highest severity score first.
        """
        results = []
        
        # Each pass inspects one aspect of the config and appends findings.
        results.extend(self._validate_structure(model_config))
        results.extend(self._validate_dimensions(model_config))
        results.extend(self._validate_measures(model_config))
        results.extend(self._validate_relationships(model_config))
        results.extend(self._validate_performance(model_config))
        results.extend(self._validate_business_logic(model_config))
        results.extend(self._validate_security(model_config))
        
        # BUG FIX: the previous key sorted on level.value, i.e. the strings
        # 'error' < 'info' < 'warning' alphabetically, which ordered INFO
        # findings ahead of WARNING ones. Use an explicit level priority.
        level_priority = {
            ValidationLevel.ERROR: 0,
            ValidationLevel.WARNING: 1,
            ValidationLevel.INFO: 2,
        }
        results.sort(key=lambda r: (level_priority[r.level], -r.severity_score))
        
        return results
    
    def _validate_structure(self, model_config: Dict) -> List[ValidationResult]:
        """Check top-level structure: required fields, naming, description."""
        results = []
        
        # Required top-level fields (from the shared rule set).
        required_fields = self.validation_rules['required_fields']
        for field in required_fields:
            if field not in model_config:
                results.append(ValidationResult(
                    level=ValidationLevel.ERROR,
                    category='structure',
                    title=f'缺少必需字段: {field}',
                    description=f'数据模型必须包含{field}字段',
                    suggestion=f'添加{field}字段到模型配置中',
                    affected_objects=[field],
                    severity_score=9
                ))
        
        # Model name must start with a letter and use only [A-Za-z0-9_].
        if 'name' in model_config:
            name = model_config['name']
            if not re.match(r'^[a-zA-Z][a-zA-Z0-9_]*$', name):
                results.append(ValidationResult(
                    level=ValidationLevel.WARNING,
                    category='structure',
                    title='模型名称不规范',
                    description=f'模型名称"{name}"不符合命名规范',
                    suggestion='使用字母开头,只包含字母、数字和下划线的名称',
                    affected_objects=[name],
                    severity_score=4
                ))
        
        # A missing or empty description is only an informational finding.
        if 'description' not in model_config or not model_config['description']:
            results.append(ValidationResult(
                level=ValidationLevel.INFO,
                category='structure',
                title='缺少模型描述',
                description='建议为模型添加描述信息',
                suggestion='添加description字段说明模型用途和业务场景',
                affected_objects=['description'],
                severity_score=2
            ))
        
        return results
    
    def _validate_dimensions(self, model_config: Dict) -> List[ValidationResult]:
        """Check dimension count, name uniqueness, table/encoding config and cardinality."""
        results = []
        dimensions = model_config.get('dimensions', [])
        
        if not dimensions:
            results.append(ValidationResult(
                level=ValidationLevel.ERROR,
                category='dimensions',
                title='缺少维度定义',
                description='数据模型必须至少包含一个维度',
                suggestion='添加维度定义到dimensions数组中',
                affected_objects=['dimensions'],
                severity_score=10
            ))
            return results
        
        # Too many dimensions risks a combinatorial Cuboid explosion.
        if len(dimensions) > self.validation_rules['max_dimensions']:
            results.append(ValidationResult(
                level=ValidationLevel.WARNING,
                category='dimensions',
                title='维度数量过多',
                description=f'当前有{len(dimensions)}个维度,可能导致Cube爆炸',
                suggestion='考虑合并相关维度或移除低频使用的维度',
                affected_objects=[d.get('name', f'dim_{i}') for i, d in enumerate(dimensions)],
                severity_score=7
            ))
        
        # Per-dimension checks.
        dimension_names = set()
        for i, dim in enumerate(dimensions):
            if isinstance(dim, dict):
                dim_name = dim.get('name', f'dimension_{i}')
                
                # Dimension names must be unique.
                if dim_name in dimension_names:
                    results.append(ValidationResult(
                        level=ValidationLevel.ERROR,
                        category='dimensions',
                        title=f'维度名称重复: {dim_name}',
                        description='维度名称必须唯一',
                        suggestion=f'修改重复的维度名称{dim_name}',
                        affected_objects=[dim_name],
                        severity_score=8
                    ))
                else:
                    dimension_names.add(dim_name)
                
                # Every dimension needs a backing table.
                if 'table' not in dim:
                    results.append(ValidationResult(
                        level=ValidationLevel.ERROR,
                        category='dimensions',
                        title=f'维度{dim_name}缺少表配置',
                        description='每个维度必须指定对应的表',
                        suggestion=f'为维度{dim_name}添加table配置',
                        affected_objects=[dim_name],
                        severity_score=9
                    ))
                
                # An encoding strategy is recommended but not mandatory.
                if 'encoding' not in dim:
                    results.append(ValidationResult(
                        level=ValidationLevel.WARNING,
                        category='dimensions',
                        title=f'维度{dim_name}缺少编码配置',
                        description='建议为维度配置合适的编码策略',
                        suggestion=f'为维度{dim_name}添加encoding配置',
                        affected_objects=[dim_name],
                        severity_score=5
                    ))
                
                # Flag dimensions whose declared cardinality exceeds the limit.
                cardinality = dim.get('cardinality', 0)
                if cardinality > self.validation_rules['max_cardinality']:
                    results.append(ValidationResult(
                        level=ValidationLevel.WARNING,
                        category='dimensions',
                        title=f'维度{dim_name}基数过高',
                        description=f'维度基数{cardinality}可能影响性能',
                        suggestion='考虑对高基数维度进行分层或编码优化',
                        affected_objects=[dim_name],
                        severity_score=6
                    ))
        
        return results
    
    def _validate_measures(self, model_config: Dict) -> List[ValidationResult]:
        """Check measure count, name uniqueness, function validity and return types."""
        results = []
        measures = model_config.get('measures', [])
        
        if not measures:
            results.append(ValidationResult(
                level=ValidationLevel.ERROR,
                category='measures',
                title='缺少度量定义',
                description='数据模型必须至少包含一个度量',
                suggestion='添加度量定义到measures数组中',
                affected_objects=['measures'],
                severity_score=10
            ))
            return results
        
        # Too many measures may hurt build and query performance.
        if len(measures) > self.validation_rules['max_measures']:
            results.append(ValidationResult(
                level=ValidationLevel.WARNING,
                category='measures',
                title='度量数量过多',
                description=f'当前有{len(measures)}个度量,可能影响性能',
                suggestion='考虑合并相似度量或使用派生度量',
                affected_objects=[m.get('name', f'measure_{i}') for i, m in enumerate(measures)],
                severity_score=6
            ))
        
        # Per-measure checks.
        measure_names = set()
        for i, measure in enumerate(measures):
            if isinstance(measure, dict):
                measure_name = measure.get('name', f'measure_{i}')
                
                # Measure names must be unique.
                if measure_name in measure_names:
                    results.append(ValidationResult(
                        level=ValidationLevel.ERROR,
                        category='measures',
                        title=f'度量名称重复: {measure_name}',
                        description='度量名称必须唯一',
                        suggestion=f'修改重复的度量名称{measure_name}',
                        affected_objects=[measure_name],
                        severity_score=8
                    ))
                else:
                    measure_names.add(measure_name)
                
                # Aggregate function must come from the supported set.
                measure_type = measure.get('function', '')
                valid_functions = self.validation_rules['valid_measure_functions']
                if measure_type not in valid_functions:
                    results.append(ValidationResult(
                        level=ValidationLevel.ERROR,
                        category='measures',
                        title=f'度量{measure_name}函数类型无效',
                        description=f'度量函数"{measure_type}"不在支持列表中',
                        suggestion=f'使用有效的度量函数: {valid_functions}',
                        affected_objects=[measure_name],
                        severity_score=7
                    ))
                
                # An explicit return type is recommended.
                return_type = measure.get('return_type', '')
                if not return_type:
                    results.append(ValidationResult(
                        level=ValidationLevel.WARNING,
                        category='measures',
                        title=f'度量{measure_name}缺少返回类型',
                        description='建议明确指定度量的返回类型',
                        suggestion=f'为度量{measure_name}添加return_type配置',
                        affected_objects=[measure_name],
                        severity_score=4
                    ))
        
        return results
    
    def _validate_relationships(self, model_config: Dict) -> List[ValidationResult]:
        """Check join definitions: presence, join type and join condition."""
        results = []
        
        # Multi-table models should declare their join relationships.
        joins = model_config.get('joins', [])
        if not joins:
            results.append(ValidationResult(
                level=ValidationLevel.WARNING,
                category='relationships',
                title='缺少表连接定义',
                description='多表模型建议明确定义表连接关系',
                suggestion='在joins数组中定义表之间的连接关系',
                affected_objects=['joins'],
                severity_score=5
            ))
        
        # Per-join checks (no-op when joins is empty).
        for i, join in enumerate(joins):
            if isinstance(join, dict):
                # Join type must come from the supported set.
                join_type = join.get('type', '')
                valid_types = self.validation_rules['valid_join_types']
                if join_type not in valid_types:
                    results.append(ValidationResult(
                        level=ValidationLevel.ERROR,
                        category='relationships',
                        title=f'连接{i}类型无效',
                        description=f'连接类型"{join_type}"不在支持列表中',
                        suggestion=f'使用有效的连接类型: {valid_types}',
                        affected_objects=[f'join_{i}'],
                        severity_score=7
                    ))
                
                # A join must carry a non-empty condition.
                if 'condition' not in join or not join['condition']:
                    results.append(ValidationResult(
                        level=ValidationLevel.ERROR,
                        category='relationships',
                        title=f'连接{i}缺少连接条件',
                        description='每个连接必须指定连接条件',
                        suggestion=f'为连接{i}添加condition配置',
                        affected_objects=[f'join_{i}'],
                        severity_score=8
                    ))
        
        return results
    
    def _validate_performance(self, model_config: Dict) -> List[ValidationResult]:
        """Check performance-related settings: partitioning, aggregation groups, rowkey."""
        results = []
        
        # Partitioning is recommended for query pruning.
        partition_config = model_config.get('partition_config', {})
        if not partition_config:
            results.append(ValidationResult(
                level=ValidationLevel.WARNING,
                category='performance',
                title='缺少分区配置',
                description='建议配置分区策略以提高查询性能',
                suggestion='添加partition_config配置时间分区',
                affected_objects=['partition_config'],
                severity_score=6
            ))
        
        # Aggregation groups keep the Cuboid count under control.
        aggregation_groups = model_config.get('aggregation_groups', [])
        if not aggregation_groups:
            results.append(ValidationResult(
                level=ValidationLevel.WARNING,
                category='performance',
                title='缺少聚合组配置',
                description='建议配置聚合组以控制Cuboid数量',
                suggestion='添加aggregation_groups配置优化存储',
                affected_objects=['aggregation_groups'],
                severity_score=7
            ))
        
        # Rowkey design affects lookup performance.
        rowkey_columns = model_config.get('rowkey_columns', [])
        if not rowkey_columns:
            results.append(ValidationResult(
                level=ValidationLevel.WARNING,
                category='performance',
                title='缺少Rowkey配置',
                description='建议配置Rowkey以优化查询性能',
                suggestion='添加rowkey_columns配置查询优化',
                affected_objects=['rowkey_columns'],
                severity_score=6
            ))
        
        return results
    
    def _validate_business_logic(self, model_config: Dict) -> List[ValidationResult]:
        """Check business rules for completeness and suggest quality rules."""
        results = []
        
        # A business rule must carry both a condition and an action.
        business_rules = model_config.get('business_rules', [])
        if business_rules:
            for i, rule in enumerate(business_rules):
                if isinstance(rule, dict):
                    if 'condition' not in rule or 'action' not in rule:
                        results.append(ValidationResult(
                            level=ValidationLevel.ERROR,
                            category='business_logic',
                            title=f'业务规则{i}不完整',
                            description='业务规则必须包含条件和动作',
                            suggestion=f'为业务规则{i}添加condition和action',
                            affected_objects=[f'business_rule_{i}'],
                            severity_score=6
                        ))
        
        # Data-quality rules are optional but recommended.
        quality_rules = model_config.get('quality_rules', [])
        if not quality_rules:
            results.append(ValidationResult(
                level=ValidationLevel.INFO,
                category='business_logic',
                title='建议添加数据质量规则',
                description='数据质量规则有助于确保数据准确性',
                suggestion='添加quality_rules配置数据验证',
                affected_objects=['quality_rules'],
                severity_score=3
            ))
        
        return results
    
    def _validate_security(self, model_config: Dict) -> List[ValidationResult]:
        """Check security settings: access control and sensitive-column protection."""
        results = []
        
        # Access control is optional but recommended.
        access_control = model_config.get('access_control', {})
        if not access_control:
            results.append(ValidationResult(
                level=ValidationLevel.INFO,
                category='security',
                title='建议配置访问控制',
                description='访问控制有助于保护敏感数据',
                suggestion='添加access_control配置权限管理',
                affected_objects=['access_control'],
                severity_score=4
            ))
        
        # Every declared sensitive column should be encrypted or masked.
        sensitive_columns = model_config.get('sensitive_columns', [])
        if sensitive_columns:
            for col in sensitive_columns:
                if isinstance(col, dict):
                    if 'encryption' not in col and 'masking' not in col:
                        results.append(ValidationResult(
                            level=ValidationLevel.WARNING,
                            category='security',
                            title=f'敏感列{col.get("name", "")}缺少保护措施',
                            description='敏感数据应配置加密或脱敏处理',
                            suggestion='为敏感列添加encryption或masking配置',
                            affected_objects=[col.get('name', '')],
                            severity_score=7
                        ))
        
        return results
    
    def _load_validation_rules(self) -> Dict:
        """Return the thresholds and whitelists used by the validation passes."""
        return {
            'max_dimensions': 20,
            'max_measures': 50,
            'max_cardinality': 1000000,
            'required_fields': ['name', 'fact_table', 'dimensions', 'measures'],
            'valid_join_types': ['inner', 'left', 'right', 'full'],
            'valid_measure_functions': ['SUM', 'COUNT', 'MAX', 'MIN', 'COUNT_DISTINCT', 'TOP_N']
        }
    
    def generate_validation_report(self, model_config: Dict) -> Dict:
        """Validate the model and package findings, stats and a score into a report dict."""
        from datetime import date  # local import: the module header does not import datetime
        
        results = self.validate_model(model_config)
        
        # Aggregate the findings.
        stats = {
            'total_issues': len(results),
            'by_level': {'error': 0, 'warning': 0, 'info': 0},
            'by_category': {},
            'severity_distribution': {}
        }
        
        for result in results:
            # Count by level.
            stats['by_level'][result.level.value] += 1
            
            # Count by category.
            stats['by_category'][result.category] = stats['by_category'].get(result.category, 0) + 1
            
            # Bucket severity scores into bands 1-2, 3-4, ..., 9-10.
            severity_range = f"{(result.severity_score-1)//2*2+1}-{(result.severity_score-1)//2*2+2}"
            stats['severity_distribution'][severity_range] = stats['severity_distribution'].get(severity_range, 0) + 1
        
        quality_score = self._calculate_quality_score(results)
        
        return {
            'model_name': model_config.get('name', 'unknown'),
            # BUG FIX: was the hard-coded placeholder '2024-01-01'; use the
            # actual validation date, as the original comment intended.
            'validation_date': date.today().isoformat(),
            'quality_score': quality_score,
            'statistics': stats,
            'results': results,
            'recommendations': self._generate_recommendations(results)
        }
    
    def _calculate_quality_score(self, results: List[ValidationResult]) -> int:
        """Compute a 0-100 model quality score by deducting per-finding penalties."""
        if not results:
            return 100
        
        base_score = 100
        
        # Errors weigh double, warnings weigh full, info weighs half.
        for result in results:
            if result.level == ValidationLevel.ERROR:
                base_score -= result.severity_score * 2
            elif result.level == ValidationLevel.WARNING:
                base_score -= result.severity_score
            else:  # INFO
                base_score -= result.severity_score * 0.5
        
        # Clamp at zero and truncate any fraction from the 0.5 weights.
        return max(0, int(base_score))
    
    def _generate_recommendations(self, results: List[ValidationResult]) -> List[Dict]:
        """Summarize findings into one prioritized recommendation per category."""
        recommendations = []
        
        # Group findings by category.
        issues_by_category = {}
        for result in results:
            issues_by_category.setdefault(result.category, []).append(result)
        
        # Derive priority and wording from the worst level present.
        for category, issues in issues_by_category.items():
            error_count = sum(1 for issue in issues if issue.level == ValidationLevel.ERROR)
            warning_count = sum(1 for issue in issues if issue.level == ValidationLevel.WARNING)
            
            if error_count > 0:
                priority = 'high'
                description = f'{category}类别有{error_count}个错误需要立即修复'
            elif warning_count > 0:
                priority = 'medium'
                description = f'{category}类别有{warning_count}个警告建议修复'
            else:
                priority = 'low'
                description = f'{category}类别有一些优化建议'
            
            recommendations.append({
                'category': category,
                'priority': priority,
                'description': description,
                'issue_count': len(issues),
                'suggested_actions': [issue.suggestion for issue in issues[:3]]  # top 3 suggestions
            })
        
        return recommendations
    
    def print_validation_report(self, model_config: Dict):
        """Validate *model_config* and print a human-readable report to stdout."""
        report = self.generate_validation_report(model_config)
        
        print(f"\n=== {report['model_name']} 模型验证报告 ===")
        print(f"验证日期: {report['validation_date']}")
        print(f"质量分数: {report['quality_score']}/100")
        
        stats = report['statistics']
        print(f"\n📊 问题统计:")
        print(f"  总问题数: {stats['total_issues']}")
        print(f"  错误: {stats['by_level']['error']}")
        print(f"  警告: {stats['by_level']['warning']}")
        print(f"  信息: {stats['by_level']['info']}")
        
        # Nothing to report: short-circuit with a success message.
        if stats['total_issues'] == 0:
            print("\n✅ 模型验证通过,没有发现问题!")
            return
        
        print("\n📋 按类别分布:")
        for category, count in stats['by_category'].items():
            print(f"  {category}: {count}")
        
        print("\n🔍 详细问题:")
        for i, result in enumerate(report['results'], 1):
            level_icon = {'error': '🔴', 'warning': '🟡', 'info': '🔵'}.get(result.level.value, '⚪')
            print(f"\n{level_icon} {i}. [{result.category}] {result.title}")
            print(f"   描述: {result.description}")
            print(f"   建议: {result.suggestion}")
            print(f"   影响对象: {', '.join(result.affected_objects)}")
            print(f"   严重程度: {result.severity_score}/10")
        
        print("\n💡 改进建议:")
        for rec in report['recommendations']:
            priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(rec['priority'], '⚪')
            print(f"\n{priority_icon} {rec['category'].upper()}")
            print(f"   {rec['description']}")
            print(f"   建议行动:")
            for action in rec['suggested_actions']:
                print(f"     - {action}")

def main():
    """Demo entry point: validate a deliberately flawed model configuration."""
    # Example usage
    validator = AdvancedModelValidator()
    
    # Sample model configuration seeded with several problems
    model_config = {
        "name": "sales-model",  # non-conforming name (contains a hyphen)
        "fact_table": "fact_sales",
        "dimensions": [
            {"name": "time", "table": "dim_time", "encoding": {"type": "date"}},
            {"name": "product", "table": "dim_product"},  # missing encoding
            {"name": "customer", "table": "dim_customer", "cardinality": 2000000},  # high cardinality
            {"name": "time", "table": "dim_time2"}  # duplicate dimension name
        ],
        "measures": [
            {"name": "sales_amount", "function": "SUM", "return_type": "decimal"},
            {"name": "quantity", "function": "INVALID_FUNC"},  # invalid function
            {"name": "sales_amount", "function": "COUNT"}  # duplicate measure name
        ],
        "joins": [
            {"type": "invalid_join", "condition": ""}  # invalid type and empty condition
        ]
        # also missing description, partition config, etc.
    }
    
    # Print the validation report for the sample model
    validator.print_validation_report(model_config)

if __name__ == "__main__":
    main()

5.3 Cube设计与管理

5.3.1 Cube基础概念

Cube是Kylin中预计算聚合数据的核心概念,它基于数据模型定义,包含了所有可能的维度组合(Cuboid)。

Cube设计原则:

  1. 维度选择原则

    • 选择查询频率高的维度
    • 避免高基数维度组合
    • 考虑维度之间的相关性
  2. 聚合组设计

    • 使用聚合组控制Cuboid数量
    • 设置强制维度和层次维度
    • 配置联合维度减少组合
  3. 分区策略

    • 按时间分区提高构建效率
    • 合理设置分区粒度
    • 考虑数据保留策略

5.3.2 Cube配置示例

{
  "uuid": "a24ca905-1fc6-4f67-985c-38fa5aeafd92",
  "last_modified": 1578313162000,
  "version": "3.0.0.20500",
  "name": "sales_cube",
  "description": "销售数据分析Cube",
  "model_name": "sales_model",
  "engine_type": 2,
  "storage_type": 2,
  "dimensions": [
    {
      "name": "TIME",
      "table": "DIM_TIME",
      "column": "TIME_KEY",
      "derived": ["YEAR", "QUARTER", "MONTH", "WEEK", "DAY"]
    },
    {
      "name": "PRODUCT",
      "table": "DIM_PRODUCT",
      "column": "PRODUCT_KEY",
      "derived": ["CATEGORY", "SUBCATEGORY", "BRAND"]
    },
    {
      "name": "CUSTOMER",
      "table": "DIM_CUSTOMER",
      "column": "CUSTOMER_KEY",
      "derived": ["SEGMENT", "TYPE", "INDUSTRY"]
    },
    {
      "name": "GEOGRAPHY",
      "table": "DIM_GEOGRAPHY",
      "column": "GEOGRAPHY_KEY",
      "derived": ["COUNTRY", "REGION", "STATE", "CITY"]
    }
  ],
  "measures": [
    {
      "name": "SALES_AMOUNT",
      "function": {
        "expression": "SUM",
        "parameter": {
          "type": "column",
          "value": "SALES_AMOUNT"
        },
        "returntype": "decimal(19,4)"
      }
    },
    {
      "name": "QUANTITY",
      "function": {
        "expression": "SUM",
        "parameter": {
          "type": "column",
          "value": "QUANTITY"
        },
        "returntype": "decimal(19,4)"
      }
    },
    {
      "name": "ORDER_COUNT",
      "function": {
        "expression": "COUNT_DISTINCT",
        "parameter": {
          "type": "column",
          "value": "ORDER_ID"
        },
        "returntype": "hllc(10)"
      }
    }
  ],
  "aggregation_groups": [
    {
      "includes": ["TIME", "PRODUCT", "CUSTOMER", "GEOGRAPHY"],
      "select_rule": {
        "hierarchy_dims": [
          ["TIME.YEAR", "TIME.QUARTER", "TIME.MONTH", "TIME.WEEK", "TIME.DAY"],
          ["PRODUCT.CATEGORY", "PRODUCT.SUBCATEGORY", "PRODUCT.BRAND"],
          ["GEOGRAPHY.COUNTRY", "GEOGRAPHY.REGION", "GEOGRAPHY.STATE", "GEOGRAPHY.CITY"]
        ],
        "mandatory_dims": ["TIME"],
        "joint_dims": [
          ["CUSTOMER.SEGMENT", "CUSTOMER.TYPE"]
        ]
      }
    }
  ],
  "partition_date_start": 0,
  "partition_date_end": 3153600000000,
  "partition_type": "APPEND",
  "partition_date_column": "TIME.DATE_VALUE",
  "partition_date_format": "yyyy-MM-dd",
  "retention_range": 0,
  "auto_merge_time_ranges": [604800000, 2419200000],
  "volatile_range": 0,
  "notify_list": [],
  "status_need_notify": ["ERROR", "DISCARDED", "SUCCEED"],
  "override_kylin_properties": {
    "kylin.cube.algorithm": "auto",
    "kylin.cube.algorithm.layer-or-inmem-threshold": "7",
    "kylin.cube.aggrgroup.max-combination": "32768"
  }
}

5.3.3 Cube管理工具

#!/usr/bin/env python3
# cube_manager.py - Cube管理工具

import json
import requests
import time
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging

@dataclass
class CubeInfo:
    """Summary of one Kylin cube as reported by the REST API."""
    name: str             # Cube name
    status: str           # Lifecycle status string returned by Kylin
    size: int             # Storage size (the API's size_kb field)
    segments: int         # Number of segments the cube currently has
    last_build_time: str  # Timestamp of the most recent build ("" if absent)
    model_name: str       # Name of the backing data model
    project: str          # Owning project

@dataclass
class BuildJob:
    """Information about one cube build job."""
    uuid: str                            # Job identifier
    cube_name: str                       # Cube the job builds
    job_type: str                        # Kind of build job
    status: str                          # Current job status
    progress: float                      # Completion progress — units (fraction vs percent) TODO confirm against the API
    start_time: str                      # Job start timestamp
    duration: int                        # Elapsed time — units TODO confirm
    error_message: Optional[str] = None  # Populated only when the job failed

class CubeManager:
    """Client for managing Apache Kylin cubes through the REST API.

    Wraps session authentication, cube listing / creation / deletion,
    build / merge / refresh job submission, job polling, segment
    optimization and cube health reporting for a single Kylin server.
    """
    
    def __init__(self, kylin_host: str, username: str, password: str, project: str = "default") -> None:
        """Store connection settings and authenticate immediately.

        Args:
            kylin_host: Base URL of the Kylin server; a trailing '/' is stripped.
            username: Kylin login name.
            password: Kylin password.
            project: Default project used when a call does not supply one.

        Raises:
            Exception: If authentication fails (propagated from _authenticate).
        """
        self.kylin_host = kylin_host.rstrip('/')
        self.username = username
        self.password = password
        self.project = project
        self.session = requests.Session()
        self.logger = logging.getLogger(__name__)
        
        # Authenticate once so all later calls reuse the session cookie.
        self._authenticate()
    
    def _authenticate(self) -> None:
        """Log in to Kylin, keeping the auth cookie on the shared session.

        Raises:
            Exception: If the server does not answer HTTP 200.
        """
        auth_url = f"{self.kylin_host}/kylin/api/user/authentication"
        # NOTE(review): Kylin's authentication endpoint typically expects HTTP
        # Basic auth; confirm that a JSON credential body works against the
        # target Kylin version.
        auth_data = {
            "username": self.username,
            "password": self.password
        }
        
        response = self.session.post(auth_url, json=auth_data)
        if response.status_code == 200:
            self.logger.info("Kylin认证成功")
        else:
            raise Exception(f"Kylin认证失败: {response.text}")
    
    def list_cubes(self, project: Optional[str] = None) -> List[CubeInfo]:
        """Return summaries of all cubes in *project* (default: self.project).

        Raises:
            Exception: If the cube list cannot be fetched.
        """
        project = project or self.project
        url = f"{self.kylin_host}/kylin/api/cubes"
        params = {"projectName": project}
        
        response = self.session.get(url, params=params)
        if response.status_code != 200:
            raise Exception(f"获取Cube列表失败: {response.text}")
        
        cubes = []
        for cube_data in response.json():
            # Optional fields default to empty/zero when absent from the payload.
            cube_info = CubeInfo(
                name=cube_data["name"],
                status=cube_data["status"],
                size=cube_data.get("size_kb", 0),
                segments=len(cube_data.get("segments", [])),
                last_build_time=cube_data.get("last_build_time", ""),
                model_name=cube_data.get("model_name", ""),
                project=cube_data.get("project", "")
            )
            cubes.append(cube_info)
        
        return cubes
    
    def get_cube_info(self, cube_name: str) -> Dict:
        """Return the raw cube descriptor JSON for *cube_name*.

        Raises:
            Exception: If the cube cannot be fetched.
        """
        url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}"
        
        response = self.session.get(url)
        if response.status_code != 200:
            raise Exception(f"获取Cube信息失败: {response.text}")
        
        return response.json()
    
    def create_cube(self, cube_config: Dict) -> bool:
        """Create a cube from *cube_config*; return True on success, else False."""
        url = f"{self.kylin_host}/kylin/api/cubes"
        
        response = self.session.post(url, json=cube_config)
        if response.status_code == 200:
            self.logger.info(f"Cube {cube_config['name']} 创建成功")
            return True
        else:
            self.logger.error(f"Cube创建失败: {response.text}")
            return False
    
    def build_cube(self, cube_name: str, start_time: str, end_time: str, 
                   build_type: str = "BUILD") -> str:
        """Submit a build job for the given date range and return its UUID.

        Args:
            cube_name: Target cube.
            start_time: Range start, "YYYY-MM-DD".
            end_time: Range end, "YYYY-MM-DD".
            build_type: Kylin build type, e.g. "BUILD" or "REFRESH".

        Raises:
            Exception: If job submission fails.
        """
        url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}/build"
        
        # NOTE(review): strptime().timestamp() interprets the dates in the
        # local timezone — confirm this matches the cube's partition timezone.
        build_data = {
            "startTime": int(datetime.strptime(start_time, "%Y-%m-%d").timestamp() * 1000),
            "endTime": int(datetime.strptime(end_time, "%Y-%m-%d").timestamp() * 1000),
            "buildType": build_type
        }
        
        response = self.session.put(url, json=build_data)
        if response.status_code == 200:
            job_info = response.json()
            job_uuid = job_info["uuid"]
            self.logger.info(f"Cube构建任务已提交,任务ID: {job_uuid}")
            return job_uuid
        else:
            raise Exception(f"Cube构建失败: {response.text}")
    
    def get_job_status(self, job_uuid: str) -> BuildJob:
        """Fetch the current state of job *job_uuid* as a BuildJob.

        Raises:
            Exception: If the job cannot be fetched.
        """
        url = f"{self.kylin_host}/kylin/api/jobs/{job_uuid}"
        
        response = self.session.get(url)
        if response.status_code != 200:
            raise Exception(f"获取任务状态失败: {response.text}")
        
        job_data = response.json()
        return BuildJob(
            uuid=job_data["uuid"],
            cube_name=job_data.get("related_cube", ""),
            job_type=job_data["job_type"],
            status=job_data["job_status"],
            progress=job_data.get("progress", 0.0),
            start_time=job_data.get("create_time", ""),
            duration=job_data.get("duration", 0),
            error_message=job_data.get("error_message")
        )
    
    def wait_for_job_completion(self, job_uuid: str, timeout: int = 3600) -> bool:
        """Poll a job until it finishes, fails, or *timeout* seconds elapse.

        Returns:
            True if the job reached FINISHED; False on failure or timeout.
        """
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            job = self.get_job_status(job_uuid)
            
            if job.status == "FINISHED":
                self.logger.info(f"任务 {job_uuid} 完成")
                return True
            elif job.status in ["ERROR", "DISCARDED"]:
                self.logger.error(f"任务 {job_uuid} 失败: {job.error_message}")
                return False
            
            self.logger.info(f"任务 {job_uuid} 进度: {job.progress:.1f}%")
            time.sleep(30)  # Wait 30 seconds before polling again.
        
        self.logger.warning(f"任务 {job_uuid} 超时")
        return False
    
    def purge_cube(self, cube_name: str) -> bool:
        """Purge all data from a cube; return True on success."""
        url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}/purge"
        
        response = self.session.put(url)
        if response.status_code == 200:
            self.logger.info(f"Cube {cube_name} 数据已清空")
            return True
        else:
            self.logger.error(f"清空Cube失败: {response.text}")
            return False
    
    def delete_cube(self, cube_name: str) -> bool:
        """Delete a cube; return True on success."""
        url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}"
        
        response = self.session.delete(url)
        if response.status_code == 200:
            self.logger.info(f"Cube {cube_name} 已删除")
            return True
        else:
            self.logger.error(f"删除Cube失败: {response.text}")
            return False
    
    def optimize_cube_segments(self, cube_name: str) -> List[str]:
        """Submit merge/refresh jobs to optimize a cube's segments.

        Returns:
            UUIDs of the optimization jobs that were successfully submitted.
        """
        # Fetch the cube descriptor to inspect its segments.
        cube_info = self.get_cube_info(cube_name)
        segments = cube_info.get("segments", [])
        
        optimization_jobs = []
        
        # Merge small segments (smaller than 1 GB).
        small_segments = [s for s in segments if s.get("size_kb", 0) < 1024 * 1024]  # size_kb < 1GB
        if len(small_segments) > 1:
            # Submit a merge job covering all small segments.
            merge_job = self._merge_segments(cube_name, small_segments)
            if merge_job:
                optimization_jobs.append(merge_job)
        
        # Refresh segments considered stale.
        expired_segments = self._find_expired_segments(segments)
        for segment in expired_segments:
            refresh_job = self._refresh_segment(cube_name, segment)
            if refresh_job:
                optimization_jobs.append(refresh_job)
        
        return optimization_jobs
    
    def _merge_segments(self, cube_name: str, segments: List[Dict]) -> Optional[str]:
        """Submit one merge job spanning *segments*; return its UUID or None."""
        if len(segments) < 2:
            return None
        
        url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}/merge"
        
        # Span the full time range covered by the segments to merge.
        start_time = min(s["date_range_start"] for s in segments)
        end_time = max(s["date_range_end"] for s in segments)
        
        merge_data = {
            "startTime": start_time,
            "endTime": end_time
        }
        
        response = self.session.put(url, json=merge_data)
        if response.status_code == 200:
            job_info = response.json()
            return job_info["uuid"]
        else:
            self.logger.error(f"段合并失败: {response.text}")
            return None
    
    def _find_expired_segments(self, segments: List[Dict]) -> List[Dict]:
        """Return segments not rebuilt within the last 7 days."""
        expired = []
        current_time = int(time.time() * 1000)
        
        for segment in segments:
            # Segments not rebuilt for over 7 days are candidates for a refresh.
            last_build_time = segment.get("last_build_time", 0)
            if current_time - last_build_time > 7 * 24 * 3600 * 1000:
                expired.append(segment)
        
        return expired
    
    def _refresh_segment(self, cube_name: str, segment: Dict) -> Optional[str]:
        """Submit a REFRESH job for one segment; return its UUID or None."""
        url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}/build"
        
        refresh_data = {
            "startTime": segment["date_range_start"],
            "endTime": segment["date_range_end"],
            "buildType": "REFRESH"
        }
        
        response = self.session.put(url, json=refresh_data)
        if response.status_code == 200:
            job_info = response.json()
            return job_info["uuid"]
        else:
            self.logger.error(f"段刷新失败: {response.text}")
            return None
    
    def generate_cube_report(self, cube_name: str) -> Dict:
        """Build a health/optimization report dict for *cube_name*.

        The report combines basic cube info, segment statistics, rough
        performance estimates, and optimization suggestions.
        """
        cube_info = self.get_cube_info(cube_name)
        segments = cube_info.get("segments", [])
        
        # Aggregate segment statistics (sizes are KB per the API's size_kb field).
        total_size = sum(s.get("size_kb", 0) for s in segments)
        total_rows = sum(s.get("input_records", 0) for s in segments)
        
        # Segment distribution analysis.
        segment_analysis = {
            "total_segments": len(segments),
            "total_size_gb": total_size / (1024 * 1024),
            "total_rows": total_rows,
            "avg_segment_size_mb": (total_size / len(segments) / 1024) if segments else 0,
            "size_distribution": self._analyze_segment_sizes(segments),
            "time_coverage": self._analyze_time_coverage(segments)
        }
        
        # Rough performance estimates.
        performance_analysis = {
            "compression_ratio": self._calculate_compression_ratio(cube_info),
            "build_efficiency": self._analyze_build_efficiency(segments),
            "query_performance": self._estimate_query_performance(cube_info)
        }
        
        # Optimization suggestions derived from the above.
        optimization_suggestions = self._generate_optimization_suggestions(cube_info, segment_analysis)
        
        return {
            "cube_name": cube_name,
            "report_time": datetime.now().isoformat(),
            "basic_info": {
                "status": cube_info["status"],
                "model_name": cube_info.get("model_name", ""),
                "dimensions": len(cube_info.get("dimensions", [])),
                "measures": len(cube_info.get("measures", []))
            },
            "segment_analysis": segment_analysis,
            "performance_analysis": performance_analysis,
            "optimization_suggestions": optimization_suggestions
        }
    
    def _analyze_segment_sizes(self, segments: List[Dict]) -> Dict:
        """Summarize the size distribution of segments (empty dict if none)."""
        if not segments:
            return {}
        
        sizes = [s.get("size_kb", 0) for s in segments]
        sizes.sort()
        
        return {
            "min_size_mb": min(sizes) / 1024,
            "max_size_mb": max(sizes) / 1024,
            "median_size_mb": sizes[len(sizes)//2] / 1024,
            "small_segments": len([s for s in sizes if s < 100 * 1024]),  # <100MB
            "large_segments": len([s for s in sizes if s > 10 * 1024 * 1024])  # >10GB
        }
    
    def _analyze_time_coverage(self, segments: List[Dict]) -> Dict:
        """Summarize the date range covered by segments, including gaps."""
        if not segments:
            return {}
        
        start_times = [s.get("date_range_start", 0) for s in segments]
        end_times = [s.get("date_range_end", 0) for s in segments]
        
        return {
            "earliest_date": datetime.fromtimestamp(min(start_times)/1000).isoformat(),
            "latest_date": datetime.fromtimestamp(max(end_times)/1000).isoformat(),
            "total_days": (max(end_times) - min(start_times)) / (24 * 3600 * 1000),
            "gaps": self._find_time_gaps(segments)
        }
    
    def _find_time_gaps(self, segments: List[Dict]) -> List[Dict]:
        """Return uncovered intervals between consecutive segments."""
        if len(segments) < 2:
            return []
        
        # Sort segments chronologically before scanning for holes.
        sorted_segments = sorted(segments, key=lambda x: x.get("date_range_start", 0))
        
        gaps = []
        for i in range(len(sorted_segments) - 1):
            current_end = sorted_segments[i].get("date_range_end", 0)
            next_start = sorted_segments[i + 1].get("date_range_start", 0)
            
            if next_start > current_end:
                gap_days = (next_start - current_end) / (24 * 3600 * 1000)
                gaps.append({
                    "start": datetime.fromtimestamp(current_end/1000).isoformat(),
                    "end": datetime.fromtimestamp(next_start/1000).isoformat(),
                    "days": gap_days
                })
        
        return gaps
    
    def _calculate_compression_ratio(self, cube_info: Dict) -> float:
        """Estimate raw-size / stored-size; 0.0 when no data is available."""
        segments = cube_info.get("segments", [])
        if not segments:
            return 0.0
        
        total_input_records = sum(s.get("input_records", 0) for s in segments)
        total_size_kb = sum(s.get("size_kb", 0) for s in segments)
        
        if total_input_records == 0:
            return 0.0
        
        # Estimate the raw data size assuming ~1 KB per input row.
        estimated_raw_size_kb = total_input_records
        
        return estimated_raw_size_kb / total_size_kb if total_size_kb > 0 else 0.0
    
    def _analyze_build_efficiency(self, segments: List[Dict]) -> Dict:
        """Summarize per-segment build durations in minutes (empty if unknown)."""
        if not segments:
            return {}
        
        build_times = []
        for segment in segments:
            # Duration approximated as last_build_time - create_time_utc.
            start_time = segment.get("create_time_utc", 0)
            end_time = segment.get("last_build_time", 0)
            if end_time > start_time:
                build_times.append(end_time - start_time)
        
        if not build_times:
            return {}
        
        return {
            "avg_build_time_minutes": sum(build_times) / len(build_times) / (60 * 1000),
            "max_build_time_minutes": max(build_times) / (60 * 1000),
            "min_build_time_minutes": min(build_times) / (60 * 1000)
        }
    
    def _estimate_query_performance(self, cube_info: Dict) -> Dict:
        """Roughly estimate query complexity from the cube design."""
        dimensions = cube_info.get("dimensions", [])
        measures = cube_info.get("measures", [])
        aggregation_groups = cube_info.get("aggregation_groups", [])
        
        # Estimate cuboid count. NOTE(review): multiplying per-group counts
        # overstates the total when groups overlap; treat as an upper bound.
        total_cuboids = 1
        for agg_group in aggregation_groups:
            includes = agg_group.get("includes", [])
            total_cuboids *= 2 ** len(includes)
        
        return {
            "estimated_cuboids": total_cuboids,
            "dimension_count": len(dimensions),
            "measure_count": len(measures),
            "complexity_score": min(10, total_cuboids / 1000)  # capped 1-10 score
        }
    
    def _generate_optimization_suggestions(self, cube_info: Dict, segment_analysis: Dict) -> List[Dict]:
        """Translate segment statistics into prioritized suggestion dicts."""
        suggestions = []
        
        # Too many segments overall.
        if segment_analysis["total_segments"] > 100:
            suggestions.append({
                "category": "segments",
                "priority": "high",
                "title": "段数量过多",
                "description": f"当前有{segment_analysis['total_segments']}个段,建议合并",
                "action": "执行段合并操作,将小段合并为大段"
            })
        
        # Too many small segments.
        size_dist = segment_analysis.get("size_distribution", {})
        if size_dist.get("small_segments", 0) > 10:
            suggestions.append({
                "category": "segments",
                "priority": "medium",
                "title": "存在过多小段",
                "description": f"有{size_dist['small_segments']}个小于100MB的段",
                "action": "合并小段以提高查询性能"
            })
        
        # Gaps in the covered time range.
        time_coverage = segment_analysis.get("time_coverage", {})
        gaps = time_coverage.get("gaps", [])
        if gaps:
            suggestions.append({
                "category": "data",
                "priority": "medium",
                "title": "存在时间间隙",
                "description": f"发现{len(gaps)}个时间间隙",
                "action": "检查数据源,补充缺失的时间段数据"
            })
        
        return suggestions
    
    def print_cube_report(self, cube_name: str) -> None:
        """Generate and pretty-print the report for *cube_name* to stdout."""
        report = self.generate_cube_report(cube_name)
        
        print(f"\n=== {cube_name} Cube分析报告 ===")
        print(f"报告时间: {report['report_time']}")
        
        # Basic info section.
        basic = report['basic_info']
        print(f"\n📊 基础信息:")
        print(f"  状态: {basic['status']}")
        print(f"  数据模型: {basic['model_name']}")
        print(f"  维度数量: {basic['dimensions']}")
        print(f"  度量数量: {basic['measures']}")
        
        # Segment analysis section.
        segment = report['segment_analysis']
        print(f"\n📈 段分析:")
        print(f"  总段数: {segment['total_segments']}")
        print(f"  总大小: {segment['total_size_gb']:.2f} GB")
        print(f"  总行数: {segment['total_rows']:,}")
        print(f"  平均段大小: {segment['avg_segment_size_mb']:.2f} MB")
        
        # Performance analysis section.
        performance = report['performance_analysis']
        print(f"\n⚡ 性能分析:")
        print(f"  压缩比: {performance['compression_ratio']:.2f}")
        print(f"  复杂度评分: {performance['query_performance']['complexity_score']:.1f}/10")
        print(f"  预估Cuboid数: {performance['query_performance']['estimated_cuboids']:,}")
        
        # Optimization suggestions section.
        suggestions = report['optimization_suggestions']
        if suggestions:
            print(f"\n💡 优化建议:")
            for i, suggestion in enumerate(suggestions, 1):
                priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(suggestion['priority'], '⚪')
                print(f"\n{priority_icon} {i}. {suggestion['title']}")
                print(f"   {suggestion['description']}")
                print(f"   建议: {suggestion['action']}")
        else:
            print(f"\n✅ 暂无优化建议,Cube状态良好")

def main():
    """Example: connect to Kylin, list the cubes, report on the first one."""
    manager = CubeManager(
        kylin_host="http://localhost:7070",
        username="ADMIN",
        password="KYLIN",
        project="learn_kylin"
    )

    # Enumerate every cube in the project.
    cube_list = manager.list_cubes()
    print(f"找到 {len(cube_list)} 个Cube:")
    for entry in cube_list:
        print(f"  - {entry.name} ({entry.status})")

    # Produce a detailed report for the first cube, if any exist.
    if cube_list:
        first_cube = cube_list[0]
        manager.print_cube_report(first_cube.name)

if __name__ == "__main__":
    main()

5.3.4 Cube性能优化

1. 聚合组优化

#!/usr/bin/env python3
# aggregation_group_optimizer.py - 聚合组优化工具

import itertools
from typing import List, Dict, Set, Tuple
import math

class AggregationGroupOptimizer:
    """Optimizer for Kylin cube aggregation groups.

    Derives mandatory, hierarchy and joint dimension settings from observed
    query patterns, then assembles aggregation groups whose estimated cuboid
    count stays within Kylin's combination limit.
    """
    
    def __init__(self):
        # Kylin's default kylin.cube.aggrgroup.max-combination limit.
        self.max_combination_limit = 32768
    
    def optimize_aggregation_groups(self, dimensions: List[str], 
                                  query_patterns: List[Dict]) -> List[Dict]:
        """Build optimized aggregation-group configurations.

        Args:
            dimensions: All dimension names of the cube.
            query_patterns: Dicts with a 'dimensions' list and an optional
                integer 'frequency' (defaults to 1).

        Returns:
            Aggregation-group dicts in Kylin's ``aggregation_groups`` format,
            each validated against ``max_combination_limit``.
        """
        # Analyze how queries use the dimensions.
        dimension_usage = self._analyze_dimension_usage(query_patterns)
        dimension_correlations = self._analyze_dimension_correlations(query_patterns)
        
        # Dimensions that appear in nearly every query become mandatory.
        mandatory_dims = self._identify_mandatory_dimensions(dimension_usage)
        
        # Name-based hierarchies (time / geography / product).
        hierarchy_dims = self._identify_hierarchy_dimensions(dimensions, query_patterns)
        
        # Strongly co-occurring dimension pairs become joint dimensions.
        joint_dims = self._identify_joint_dimensions(dimension_correlations)
        
        # Assemble the groups, then cap their cuboid counts.
        aggregation_groups = self._generate_aggregation_groups(
            dimensions, mandatory_dims, hierarchy_dims, joint_dims
        )
        
        return self._validate_and_optimize(aggregation_groups)
    
    def _analyze_dimension_usage(self, query_patterns: List[Dict]) -> Dict[str, float]:
        """Return each dimension's frequency-weighted usage rate in [0, 1].

        Fix: the original divided frequency-weighted counts by the number of
        patterns, so the "rate" could exceed 1.0 and made the 0.8 mandatory
        threshold meaningless; normalize by the total query frequency instead.
        """
        usage_count: Dict[str, float] = {}
        total_frequency = sum(p.get('frequency', 1) for p in query_patterns)
        
        for pattern in query_patterns:
            frequency = pattern.get('frequency', 1)
            for dim in pattern.get('dimensions', []):
                usage_count[dim] = usage_count.get(dim, 0) + frequency
        
        if total_frequency <= 0:
            return {dim: 0.0 for dim in usage_count}
        return {dim: count / total_frequency for dim, count in usage_count.items()}
    
    def _analyze_dimension_correlations(self, query_patterns: List[Dict]) -> Dict[Tuple[str, str], float]:
        """Return co-occurrence scores in [0, 1] for each unordered dimension pair."""
        correlations = {}
        
        for pattern in query_patterns:
            dimensions = pattern.get('dimensions', [])
            frequency = pattern.get('frequency', 1)
            
            # Accumulate frequency-weighted co-occurrence per unordered pair.
            for i, dim1 in enumerate(dimensions):
                for dim2 in dimensions[i+1:]:
                    pair = tuple(sorted([dim1, dim2]))
                    if pair not in correlations:
                        correlations[pair] = 0
                    correlations[pair] += frequency
        
        # Normalize scores against the most frequent pair.
        max_freq = max(correlations.values()) if correlations else 1
        for pair in correlations:
            correlations[pair] = correlations[pair] / max_freq
        
        return correlations
    
    def _identify_mandatory_dimensions(self, dimension_usage: Dict[str, float]) -> List[str]:
        """Return dimensions whose usage rate exceeds 80%."""
        mandatory = []
        for dim, usage_rate in dimension_usage.items():
            if usage_rate > 0.8:
                mandatory.append(dim)
        return mandatory
    
    def _identify_hierarchy_dimensions(self, dimensions: List[str], 
                                     query_patterns: List[Dict]) -> List[List[str]]:
        """Detect hierarchies (coarse → fine) by dimension-name keywords."""
        hierarchies = []
        
        # Time hierarchy, ordered coarse to fine.
        time_hierarchy = [d for d in dimensions if any(t in d.lower() for t in ['year', 'quarter', 'month', 'week', 'day'])]
        if len(time_hierarchy) > 1:
            time_order = ['year', 'quarter', 'month', 'week', 'day']
            time_hierarchy.sort(key=lambda x: min([time_order.index(t) for t in time_order if t in x.lower()] + [999]))
            hierarchies.append(time_hierarchy)
        
        # Geographic hierarchy, ordered coarse to fine.
        geo_hierarchy = [d for d in dimensions if any(g in d.lower() for g in ['country', 'region', 'state', 'city'])]
        if len(geo_hierarchy) > 1:
            geo_order = ['country', 'region', 'state', 'city']
            geo_hierarchy.sort(key=lambda x: min([geo_order.index(g) for g in geo_order if g in x.lower()] + [999]))
            hierarchies.append(geo_hierarchy)
        
        # Product hierarchy, ordered coarse to fine.
        product_hierarchy = [d for d in dimensions if any(p in d.lower() for p in ['category', 'subcategory', 'brand', 'product'])]
        if len(product_hierarchy) > 1:
            product_order = ['category', 'subcategory', 'brand', 'product']
            product_hierarchy.sort(key=lambda x: min([product_order.index(p) for p in product_order if p in x.lower()] + [999]))
            hierarchies.append(product_hierarchy)
        
        return hierarchies
    
    def _identify_joint_dimensions(self, correlations: Dict[Tuple[str, str], float]) -> List[List[str]]:
        """Group dimension pairs with correlation above 0.8 into joint groups."""
        joint_dims = []
        high_correlation_pairs = [(pair, score) for pair, score in correlations.items() if score > 0.8]
        
        # Each dimension joins at most one joint group (first match wins).
        processed_dims = set()
        for pair, score in high_correlation_pairs:
            dim1, dim2 = pair
            if dim1 not in processed_dims and dim2 not in processed_dims:
                joint_dims.append([dim1, dim2])
                processed_dims.add(dim1)
                processed_dims.add(dim2)
        
        return joint_dims
    
    def _generate_aggregation_groups(self, dimensions: List[str], 
                                   mandatory_dims: List[str],
                                   hierarchy_dims: List[List[str]],
                                   joint_dims: List[List[str]]) -> List[Dict]:
        """Assemble aggregation-group dicts from the identified dimension roles."""
        aggregation_groups = []
        
        # Main group: covers every dimension with all derived rules.
        main_group = {
            "includes": dimensions,
            "select_rule": {
                "mandatory_dims": mandatory_dims,
                "hierarchy_dims": hierarchy_dims,
                "joint_dims": joint_dims
            }
        }
        aggregation_groups.append(main_group)
        
        # With many dimensions, add focused secondary groups by business area.
        if len(dimensions) > 15:
            time_dims = [d for d in dimensions if any(t in d.lower() for t in ['time', 'date', 'year', 'month', 'day'])]
            product_dims = [d for d in dimensions if any(p in d.lower() for p in ['product', 'category', 'brand'])]
            customer_dims = [d for d in dimensions if any(c in d.lower() for c in ['customer', 'client', 'user'])]
            geo_dims = [d for d in dimensions if any(g in d.lower() for g in ['geo', 'location', 'country', 'city'])]
            
            # Time + product group.
            if time_dims and product_dims:
                time_product_group = {
                    "includes": time_dims + product_dims + mandatory_dims,
                    "select_rule": {
                        "mandatory_dims": [d for d in mandatory_dims if d in time_dims + product_dims],
                        "hierarchy_dims": [h for h in hierarchy_dims if any(d in time_dims + product_dims for d in h)]
                    }
                }
                aggregation_groups.append(time_product_group)
            
            # Time + customer group.
            if time_dims and customer_dims:
                time_customer_group = {
                    "includes": time_dims + customer_dims + mandatory_dims,
                    "select_rule": {
                        "mandatory_dims": [d for d in mandatory_dims if d in time_dims + customer_dims],
                        "hierarchy_dims": [h for h in hierarchy_dims if any(d in time_dims + customer_dims for d in h)]
                    }
                }
                aggregation_groups.append(time_customer_group)
        
        return aggregation_groups
    
    def _validate_and_optimize(self, aggregation_groups: List[Dict]) -> List[Dict]:
        """Cap each group's estimated cuboid count at max_combination_limit."""
        optimized_groups = []
        
        for group in aggregation_groups:
            cuboid_count = self._estimate_cuboid_count(group)
            
            if cuboid_count > self.max_combination_limit:
                # Over the limit: collapse remaining dimensions into joints.
                optimized_groups.append(self._reduce_cuboid_count(group))
            else:
                optimized_groups.append(group)
        
        return optimized_groups
    
    def _estimate_cuboid_count(self, aggregation_group: Dict) -> int:
        """Roughly estimate a group's cuboid count (always at least 1).

        NOTE(review): the hierarchy/joint reductions are additive
        approximations of Kylin's pruning, not an exact count.
        """
        includes = aggregation_group.get("includes", [])
        select_rule = aggregation_group.get("select_rule", {})
        
        # Unconstrained combination count.
        base_combinations = 2 ** len(includes)
        
        # A hierarchy of n dimensions allows only prefix combinations.
        hierarchy_dims = select_rule.get("hierarchy_dims", [])
        for hierarchy in hierarchy_dims:
            if len(hierarchy) > 1:
                hierarchy_reduction = 2 ** len(hierarchy) - len(hierarchy) - 1
                base_combinations -= hierarchy_reduction
        
        # A joint group of n dimensions collapses to all-or-nothing.
        joint_dims = select_rule.get("joint_dims", [])
        for joint in joint_dims:
            if len(joint) > 1:
                joint_reduction = 2 ** len(joint) - 2
                base_combinations -= joint_reduction
        
        return max(1, base_combinations)
    
    def _reduce_cuboid_count(self, aggregation_group: Dict) -> Dict:
        """Return a copy of the group with extra joint dimensions added.

        Fix: the original used ``dict.copy`` (shallow) and then mutated the
        nested ``select_rule``, corrupting the caller's input; copy the
        nested dict before updating it.
        """
        includes = aggregation_group.get("includes", [])
        select_rule = aggregation_group.get("select_rule", {})
        
        existing_joints = select_rule.get("joint_dims", [])
        
        # Dimensions not yet covered by any joint group.
        remaining_dims = [d for d in includes if not any(d in joint for joint in existing_joints)]
        
        # Bundle every 3 remaining dimensions into one joint group.
        new_joints = []
        for i in range(0, len(remaining_dims), 3):
            joint_group = remaining_dims[i:i+3]
            if len(joint_group) > 1:
                new_joints.append(joint_group)
        
        # Build the result without mutating the caller's dicts.
        optimized_group = dict(aggregation_group)
        optimized_rule = dict(select_rule)
        optimized_rule["joint_dims"] = existing_joints + new_joints
        optimized_group["select_rule"] = optimized_rule
        
        return optimized_group
    
    def generate_optimization_report(self, dimensions: List[str], 
                                   query_patterns: List[Dict]) -> Dict:
        """Produce a report with analysis, optimized groups and recommendations."""
        # Analyze the current usage and correlation picture.
        dimension_usage = self._analyze_dimension_usage(query_patterns)
        correlations = self._analyze_dimension_correlations(query_patterns)
        
        # Generate the optimized aggregation groups.
        optimized_groups = self.optimize_aggregation_groups(dimensions, query_patterns)
        
        # Quantify the reduction versus the unconstrained cuboid count.
        original_cuboids = 2 ** len(dimensions)
        optimized_cuboids = sum(self._estimate_cuboid_count(group) for group in optimized_groups)
        
        reduction_ratio = (original_cuboids - optimized_cuboids) / original_cuboids if original_cuboids > 0 else 0
        
        return {
            "analysis": {
                "total_dimensions": len(dimensions),
                "total_query_patterns": len(query_patterns),
                "dimension_usage": dimension_usage,
                "high_correlation_pairs": [(pair, score) for pair, score in correlations.items() if score > 0.7]
            },
            "optimization": {
                "original_cuboids": original_cuboids,
                "optimized_cuboids": optimized_cuboids,
                "reduction_ratio": reduction_ratio,
                "aggregation_groups": optimized_groups
            },
            "recommendations": self._generate_recommendations(dimension_usage, correlations, reduction_ratio)
        }
    
    def _generate_recommendations(self, dimension_usage: Dict[str, float], 
                                correlations: Dict[Tuple[str, str], float],
                                reduction_ratio: float) -> List[Dict]:
        """Turn the analysis results into prioritized recommendation dicts."""
        recommendations = []
        
        # Dimensions used by fewer than 10% of queries.
        low_usage_dims = [dim for dim, usage in dimension_usage.items() if usage < 0.1]
        if low_usage_dims:
            recommendations.append({
                "category": "dimensions",
                "priority": "medium",
                "title": "移除低使用率维度",
                "description": f"以下维度使用率低于10%: {', '.join(low_usage_dims)}",
                "action": "考虑从模型中移除这些维度以减少Cube复杂度"
            })
        
        # Dimension pairs with very high correlation.
        high_corr_pairs = [(pair, score) for pair, score in correlations.items() if score > 0.9]
        if high_corr_pairs:
            recommendations.append({
                "category": "aggregation",
                "priority": "high",
                "title": "配置联合维度",
                "description": f"发现{len(high_corr_pairs)}对高相关性维度",
                "action": "将高相关性维度配置为联合维度以减少Cuboid数量"
            })
        
        # Comment on how effective the optimization was.
        if reduction_ratio > 0.8:
            recommendations.append({
                "category": "performance",
                "priority": "high",
                "title": "优化效果显著",
                "description": f"Cuboid数量减少了{reduction_ratio:.1%}",
                "action": "应用建议的聚合组配置以提高性能"
            })
        elif reduction_ratio < 0.3:
            recommendations.append({
                "category": "performance",
                "priority": "medium",
                "title": "优化空间有限",
                "description": f"Cuboid数量仅减少了{reduction_ratio:.1%}",
                "action": "考虑重新设计数据模型或调整查询模式"
            })
        
        return recommendations

def main():
    """Demo entry point: build and print a sample optimization report."""
    optimizer = AggregationGroupOptimizer()

    # Sample dimensions: time, product, customer, geography and extras.
    dimensions = [
        "TIME_YEAR", "TIME_QUARTER", "TIME_MONTH", "TIME_WEEK", "TIME_DAY",
        "PRODUCT_CATEGORY", "PRODUCT_SUBCATEGORY", "PRODUCT_BRAND",
        "CUSTOMER_SEGMENT", "CUSTOMER_TYPE", "CUSTOMER_INDUSTRY",
        "GEO_COUNTRY", "GEO_REGION", "GEO_STATE", "GEO_CITY",
        "CHANNEL", "PROMOTION"
    ]

    # Sample query patterns with their observed frequencies.
    query_patterns = [
        {"dimensions": ["TIME_YEAR", "TIME_QUARTER", "PRODUCT_CATEGORY"], "frequency": 50},
        {"dimensions": ["TIME_MONTH", "CUSTOMER_SEGMENT", "GEO_COUNTRY"], "frequency": 30},
        {"dimensions": ["TIME_DAY", "PRODUCT_BRAND", "CHANNEL"], "frequency": 20},
        {"dimensions": ["CUSTOMER_TYPE", "CUSTOMER_INDUSTRY"], "frequency": 40},
        {"dimensions": ["GEO_REGION", "GEO_STATE", "GEO_CITY"], "frequency": 15}
    ]

    report = optimizer.generate_optimization_report(dimensions, query_patterns)

    analysis = report['analysis']
    optimization = report['optimization']

    print("=== 聚合组优化报告 ===")
    print(f"维度数量: {analysis['total_dimensions']}")
    print(f"查询模式数量: {analysis['total_query_patterns']}")
    print(f"原始Cuboid数量: {optimization['original_cuboids']:,}")
    print(f"优化后Cuboid数量: {optimization['optimized_cuboids']:,}")
    print(f"减少比例: {optimization['reduction_ratio']:.1%}")

    print("\n优化建议:")
    for index, rec in enumerate(report['recommendations'], start=1):
        print(f"{index}. {rec['title']} ({rec['priority']})")
        print(f"   {rec['description']}")
        print(f"   {rec['action']}")

# Run the demo only when executed as a script, not on import.
if __name__ == "__main__":
    main()

2. 存储优化

#!/usr/bin/env python3
# storage_optimizer.py - 存储优化工具

import json
import math
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

@dataclass
class StorageMetrics:
    """Storage metrics summarizing a cube's segment layout."""
    total_size_gb: float        # total stored size across all segments, GB
    segment_count: int          # number of segments in the cube
    avg_segment_size_mb: float  # mean segment size, MB
    compression_ratio: float    # estimated raw-size / stored-size ratio
    storage_efficiency: float   # composite score in [0, 100]

class StorageOptimizer:
    """Storage optimizer: scores a cube's segment layout and plans cleanups."""
    
    def __init__(self):
        # Target size for a single segment (MB); deviation is penalized.
        self.optimal_segment_size_mb = 1024  # 1GB
        # Segment counts above this threshold cost efficiency points.
        self.max_segments_per_cube = 50
        # Estimated compression ratios below this trigger tuning advice.
        self.min_compression_ratio = 5.0
    
    def analyze_storage_efficiency(self, cube_info: Dict) -> "StorageMetrics":
        """Compute storage metrics for a cube.

        Args:
            cube_info: Cube descriptor; reads "segments", where each segment
                may carry "size_kb" and "input_records".

        Returns:
            StorageMetrics with totals, averages, the estimated compression
            ratio and a 0-100 efficiency score (all zeros for an empty cube).
        """
        segments = cube_info.get("segments", [])
        
        if not segments:
            return StorageMetrics(0, 0, 0, 0, 0)
        
        # Basic size metrics.
        total_size_kb = sum(s.get("size_kb", 0) for s in segments)
        total_size_gb = total_size_kb / (1024 * 1024)
        segment_count = len(segments)
        avg_segment_size_mb = (total_size_kb / segment_count / 1024) if segment_count > 0 else 0
        
        # Estimated compression ratio, assuming ~1KB per raw input row.
        total_input_records = sum(s.get("input_records", 0) for s in segments)
        estimated_raw_size_kb = total_input_records  # ~1KB per row
        compression_ratio = estimated_raw_size_kb / total_size_kb if total_size_kb > 0 else 0
        
        # Composite efficiency score (0-100).
        storage_efficiency = self._calculate_storage_efficiency(
            segment_count, avg_segment_size_mb, compression_ratio
        )
        
        return StorageMetrics(
            total_size_gb=total_size_gb,
            segment_count=segment_count,
            avg_segment_size_mb=avg_segment_size_mb,
            compression_ratio=compression_ratio,
            storage_efficiency=storage_efficiency
        )
    
    def _calculate_storage_efficiency(self, segment_count: int, 
                                    avg_segment_size_mb: float,
                                    compression_ratio: float) -> float:
        """Score storage efficiency on a 0-100 scale.

        Starts at 100 and deducts up to 30 points for too many segments,
        up to 25 for deviating from the optimal segment size, and up to 25
        for a compression ratio below the minimum.
        """
        score = 100.0
        
        # Too many segments.
        if segment_count > self.max_segments_per_cube:
            score -= min(30, (segment_count - self.max_segments_per_cube) * 2)
        
        # Deviation from the optimal segment size.
        size_deviation = abs(avg_segment_size_mb - self.optimal_segment_size_mb) / self.optimal_segment_size_mb
        score -= min(25, size_deviation * 50)
        
        # Poor compression.
        if compression_ratio < self.min_compression_ratio:
            score -= min(25, (self.min_compression_ratio - compression_ratio) * 5)
        
        return max(0, score)
    
    def generate_storage_optimization_plan(self, cube_info: Dict) -> Dict:
        """Build a storage optimization plan for a cube.

        Returns:
            A dict with the current metrics, the applicable optimizations
            (segment merging, compression, partitioning) and the expected
            combined improvements.
        """
        metrics = self.analyze_storage_efficiency(cube_info)
        segments = cube_info.get("segments", [])
        
        optimization_plan = {
            "current_metrics": metrics,
            "optimizations": [],
            "expected_improvements": {}
        }
        
        # Merge small segments when there are too many of them.
        if metrics.segment_count > self.max_segments_per_cube:
            merge_plan = self._plan_segment_merging(segments)
            optimization_plan["optimizations"].append(merge_plan)
        
        # Tune compression when the estimated ratio is below the minimum.
        if metrics.compression_ratio < self.min_compression_ratio:
            compression_plan = self._plan_compression_optimization(cube_info)
            optimization_plan["optimizations"].append(compression_plan)
        
        # Partitioning advice is optional and may not apply.
        partition_plan = self._plan_partition_optimization(cube_info)
        if partition_plan:
            optimization_plan["optimizations"].append(partition_plan)
        
        # Estimate the combined effect of the selected optimizations.
        optimization_plan["expected_improvements"] = self._calculate_expected_improvements(
            metrics, optimization_plan["optimizations"]
        )
        
        return optimization_plan
    
    def _plan_segment_merging(self, segments: List[Dict]) -> Dict:
        """Plan merging of small segments into near-optimal-sized groups."""
        # Only segments under 500MB are merge candidates; smallest first.
        small_segments = [s for s in segments if s.get("size_kb", 0) < 500 * 1024]  # <500MB
        
        merge_groups = []
        current_group = []
        current_size = 0
        
        for segment in sorted(small_segments, key=lambda x: x.get("size_kb", 0)):
            segment_size = segment.get("size_kb", 0)
            
            if current_size + segment_size <= self.optimal_segment_size_mb * 1024:
                current_group.append(segment)
                current_size += segment_size
            else:
                # Close the current group; only groups of 2+ are worth merging.
                if len(current_group) > 1:
                    merge_groups.append(current_group)
                current_group = [segment]
                current_size = segment_size
        
        if len(current_group) > 1:
            merge_groups.append(current_group)
        
        return {
            "type": "segment_merging",
            "description": f"合并{len(merge_groups)}组小段",
            "details": {
                "merge_groups": len(merge_groups),
                "segments_to_merge": sum(len(group) for group in merge_groups),
                "estimated_size_reduction_gb": self._estimate_merge_size_reduction(merge_groups)
            },
            "priority": "high" if len(merge_groups) > 5 else "medium"
        }
    
    def _plan_compression_optimization(self, cube_info: Dict) -> Dict:
        """Plan compression-related configuration changes."""
        current_algorithm = cube_info.get("override_kylin_properties", {}).get("kylin.cube.algorithm", "auto")
        
        recommendations = []
        
        # Prefer the in-memory cubing algorithm for better compression.
        if current_algorithm != "inmem":
            recommendations.append("使用内存算法提高压缩效率")
        
        # Dictionary encoding usually compresses dimension values best.
        dimensions = cube_info.get("dimensions", [])
        for dim in dimensions:
            if dim.get("encoding") != "dict":
                # .get() guards against dimension entries without a "name".
                recommendations.append(f"为维度{dim.get('name', '?')}使用字典编码")
        
        return {
            "type": "compression_optimization",
            "description": "优化压缩配置",
            "details": {
                "current_algorithm": current_algorithm,
                "recommendations": recommendations,
                "estimated_compression_improvement": "20-30%"
            },
            "priority": "medium"
        }
    
    def _plan_partition_optimization(self, cube_info: Dict) -> Optional[Dict]:
        """Plan partitioning changes; returns None when none are needed."""
        partition_column = cube_info.get("partition_date_column")
        segments = cube_info.get("segments", [])
        
        if not partition_column or not segments:
            return None
        
        # Distribution of segment sizes across partitions.
        partition_analysis = self._analyze_partition_distribution(segments)
        
        recommendations = []
        
        # Highly uneven partition sizes.
        if partition_analysis["size_variance"] > 0.5:
            recommendations.append("调整分区策略以平衡分区大小")
        
        # Too many partitions.
        if partition_analysis["partition_count"] > 100:
            recommendations.append("考虑使用更粗粒度的分区")
        
        if not recommendations:
            return None
        
        return {
            "type": "partition_optimization",
            "description": "优化分区策略",
            "details": {
                "current_partitions": partition_analysis["partition_count"],
                "size_variance": partition_analysis["size_variance"],
                "recommendations": recommendations
            },
            "priority": "low"
        }
    
    def _analyze_partition_distribution(self, segments: List[Dict]) -> Dict:
        """Return partition count and the coefficient of variation of sizes."""
        if not segments:
            return {"partition_count": 0, "size_variance": 0}
        
        sizes = [s.get("size_kb", 0) for s in segments]
        avg_size = sum(sizes) / len(sizes)
        variance = sum((size - avg_size) ** 2 for size in sizes) / len(sizes)
        # Coefficient of variation: stddev normalized by the mean.
        size_variance = (variance ** 0.5) / avg_size if avg_size > 0 else 0
        
        return {
            "partition_count": len(segments),
            "size_variance": size_variance
        }
    
    def _estimate_merge_size_reduction(self, merge_groups: List[List[Dict]]) -> float:
        """Estimate the storage saved by the planned merges, in GB."""
        total_reduction = 0
        
        for group in merge_groups:
            if len(group) > 1:
                group_size = sum(s.get("size_kb", 0) for s in group)
                # Assume merging reclaims ~5% of the group's size.
                reduction = group_size * 0.05
                total_reduction += reduction
        
        return total_reduction / (1024 * 1024)  # KB -> GB
    
    def _calculate_expected_improvements(self, current_metrics: "StorageMetrics", 
                                       optimizations: List[Dict]) -> Dict:
        """Estimate the combined effect of the planned optimizations."""
        improvements = {
            "storage_size_reduction_gb": 0,
            "segment_count_reduction": 0,
            "efficiency_score_improvement": 0
        }
        
        for opt in optimizations:
            if opt["type"] == "segment_merging":
                improvements["storage_size_reduction_gb"] += opt["details"].get("estimated_size_reduction_gb", 0)
                # Merging roughly halves the number of segments involved.
                improvements["segment_count_reduction"] += opt["details"].get("segments_to_merge", 0) // 2
            elif opt["type"] == "compression_optimization":
                # Assume ~25% size reduction from better compression.
                improvements["storage_size_reduction_gb"] += current_metrics.total_size_gb * 0.25
        
        # Re-score efficiency with the projected segment count and ratio.
        new_segment_count = current_metrics.segment_count - improvements["segment_count_reduction"]
        new_compression_ratio = current_metrics.compression_ratio * 1.25  # assume +25% compression
        
        new_efficiency = self._calculate_storage_efficiency(
            new_segment_count, current_metrics.avg_segment_size_mb, new_compression_ratio
        )
        improvements["efficiency_score_improvement"] = new_efficiency - current_metrics.storage_efficiency
        
        return improvements
    
    def print_storage_report(self, cube_name: str, cube_info: Dict):
        """Print a human-readable storage analysis report for a cube."""
        metrics = self.analyze_storage_efficiency(cube_info)
        optimization_plan = self.generate_storage_optimization_plan(cube_info)
        
        print(f"\n=== {cube_name} 存储分析报告 ===")
        
        # Current metrics.
        print(f"\n📊 当前存储指标:")
        print(f"  总大小: {metrics.total_size_gb:.2f} GB")
        print(f"  段数量: {metrics.segment_count}")
        print(f"  平均段大小: {metrics.avg_segment_size_mb:.2f} MB")
        print(f"  压缩比: {metrics.compression_ratio:.2f}")
        print(f"  存储效率: {metrics.storage_efficiency:.1f}/100")
        
        # Suggested optimizations.
        optimizations = optimization_plan["optimizations"]
        if optimizations:
            print(f"\n🔧 优化建议:")
            for i, opt in enumerate(optimizations, 1):
                priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(opt['priority'], '⚪')
                print(f"\n{priority_icon} {i}. {opt['description']} ({opt['priority']})")
                
                if opt["type"] == "segment_merging":
                    details = opt["details"]
                    print(f"   合并组数: {details['merge_groups']}")
                    print(f"   涉及段数: {details['segments_to_merge']}")
                    print(f"   预计减少: {details['estimated_size_reduction_gb']:.2f} GB")
                elif opt["type"] == "compression_optimization":
                    details = opt["details"]
                    print(f"   当前算法: {details['current_algorithm']}")
                    print(f"   预计改进: {details['estimated_compression_improvement']}")
                    for rec in details["recommendations"]:
                        print(f"   - {rec}")
        
        # Expected improvements.
        improvements = optimization_plan["expected_improvements"]
        if any(improvements.values()):
            print(f"\n📈 预期改进:")
            if improvements["storage_size_reduction_gb"] > 0:
                print(f"  存储减少: {improvements['storage_size_reduction_gb']:.2f} GB")
            if improvements["segment_count_reduction"] > 0:
                print(f"  段数减少: {improvements['segment_count_reduction']}")
            if improvements["efficiency_score_improvement"] > 0:
                print(f"  效率提升: +{improvements['efficiency_score_improvement']:.1f}分")

def main():
    """Demo: run a storage analysis on a hand-crafted sample cube."""
    optimizer = StorageOptimizer()

    # Four sample segments of varying size (sizes in KB).
    sample_segments = [
        {"size_kb": 102400, "input_records": 100000, "date_range_start": 1609459200000},  # 100MB
        {"size_kb": 204800, "input_records": 200000, "date_range_start": 1612137600000},  # 200MB
        {"size_kb": 51200, "input_records": 50000, "date_range_start": 1614556800000},   # 50MB
        {"size_kb": 1048576, "input_records": 1000000, "date_range_start": 1617235200000}, # 1GB
    ]

    # Sample cube descriptor with dimensions and Kylin overrides.
    cube_info = {
        "name": "sales_cube",
        "segments": sample_segments,
        "dimensions": [
            {"name": "TIME", "encoding": "date"},
            {"name": "PRODUCT", "encoding": "dict"},
            {"name": "CUSTOMER", "encoding": "int"}
        ],
        "override_kylin_properties": {
            "kylin.cube.algorithm": "auto"
        },
        "partition_date_column": "TIME.DATE_VALUE"
    }

    optimizer.print_storage_report("sales_cube", cube_info)

# Run the demo only when executed as a script, not on import.
if __name__ == "__main__":
    main()

3. 构建优化

#!/usr/bin/env python3
# build_optimizer.py - 构建优化工具

import json
import time
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging

@dataclass
class BuildMetrics:
    """Build performance metrics derived from a cube's job history."""
    avg_build_time_minutes: float   # mean duration of builds, minutes
    build_success_rate: float       # fraction of jobs with status FINISHED
    resource_utilization: float     # evenness of builds across hours, [0, 1]
    parallel_efficiency: float      # avg concurrency / max allowed, [0, 1]
    build_frequency_per_day: float  # builds per day over the observed span

class BuildOptimizer:
    """Build optimizer: scores cube build history and proposes tuning steps."""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Builds longer than this (minutes) trigger build-time optimization.
        self.optimal_build_time_minutes = 30
        # Upper bound used when scoring parallel efficiency.
        self.max_parallel_builds = 4
        # Success rates below this trigger reliability optimization.
        self.target_success_rate = 0.95
    
    def analyze_build_performance(self, build_history: List[Dict]) -> "BuildMetrics":
        """Aggregate build job records into summary metrics.

        Args:
            build_history: Job records; each may carry "duration" (ms),
                "status" and "create_time" (epoch ms).

        Returns:
            BuildMetrics over the supplied history (all zeros when empty).
        """
        if not build_history:
            return BuildMetrics(0, 0, 0, 0, 0)
        
        # Average build time (ms -> minutes) and success count.
        build_times = []
        successful_builds = 0
        
        for build in build_history:
            duration = build.get("duration", 0)
            if duration > 0:
                build_times.append(duration / (60 * 1000))  # ms -> minutes
            
            if build.get("status") == "FINISHED":
                successful_builds += 1
        
        avg_build_time = sum(build_times) / len(build_times) if build_times else 0
        success_rate = successful_builds / len(build_history) if build_history else 0
        
        # How evenly builds are spread over the day.
        resource_utilization = self._calculate_resource_utilization(build_history)
        
        # How well the parallel-build budget is used.
        parallel_efficiency = self._calculate_parallel_efficiency(build_history)
        
        # Builds per day over the observed time span.
        build_frequency = self._calculate_build_frequency(build_history)
        
        return BuildMetrics(
            avg_build_time_minutes=avg_build_time,
            build_success_rate=success_rate,
            resource_utilization=resource_utilization,
            parallel_efficiency=parallel_efficiency,
            build_frequency_per_day=build_frequency
        )
    
    def _calculate_resource_utilization(self, build_history: List[Dict]) -> float:
        """Score how evenly builds are spread across the 24 hours of a day.

        Returns 1 - coefficient of variation, clamped to [0, 1]: 1.0 for a
        perfectly even spread, 0.0 for a highly skewed one.
        """
        if not build_history:
            return 0.0
        
        # Histogram of build start hours (local time).
        build_hours = {}
        for build in build_history:
            start_time = build.get("create_time", 0)
            if start_time > 0:
                hour = datetime.fromtimestamp(start_time / 1000).hour
                build_hours[hour] = build_hours.get(hour, 0) + 1
        
        if not build_hours:
            return 0.0
        
        # Variance over ALL 24 hours: hours without builds count as zero.
        # (Summing only over the populated hours understated the variance
        # and inflated the utilization score.)
        avg_builds_per_hour = sum(build_hours.values()) / 24
        variance = sum(
            (build_hours.get(hour, 0) - avg_builds_per_hour) ** 2 for hour in range(24)
        ) / 24
        
        # Utilization = 1 - stddev/mean, normalized to [0, 1].
        if avg_builds_per_hour > 0:
            cv = (variance ** 0.5) / avg_builds_per_hour
            return max(0, 1 - min(1, cv))
        
        return 0.0
    
    def _calculate_parallel_efficiency(self, build_history: List[Dict]) -> float:
        """Estimate how much of the parallel-build budget is used.

        For every build, counts how many other builds overlap it in time,
        then normalizes the average concurrency by max_parallel_builds.
        Quadratic in history length; acceptable for small job histories.
        """
        if not build_history:
            return 0.0
        
        concurrent_builds = []
        
        for build in build_history:
            start_time = build.get("create_time", 0)
            end_time = start_time + build.get("duration", 0)
            
            concurrent_count = 1  # the build itself
            for other_build in build_history:
                # Identity, not equality: two distinct jobs with identical
                # records must still count as overlapping each other.
                if other_build is build:
                    continue
                
                other_start = other_build.get("create_time", 0)
                other_end = other_start + other_build.get("duration", 0)
                
                # Open-interval time overlap test.
                if start_time < other_end and end_time > other_start:
                    concurrent_count += 1
            
            concurrent_builds.append(concurrent_count)
        
        avg_concurrent = sum(concurrent_builds) / len(concurrent_builds)
        return min(1.0, avg_concurrent / self.max_parallel_builds)
    
    def _calculate_build_frequency(self, build_history: List[Dict]) -> float:
        """Return builds per day over the observed time span (0 if unknown)."""
        if len(build_history) < 2:
            return 0.0
        
        # Span between the earliest and latest recorded start times.
        start_times = [b.get("create_time", 0) for b in build_history if b.get("create_time", 0) > 0]
        if len(start_times) < 2:
            return 0.0
        
        time_span_days = (max(start_times) - min(start_times)) / (24 * 3600 * 1000)
        return len(build_history) / time_span_days if time_span_days > 0 else 0
    
    def generate_build_optimization_plan(self, cube_info: Dict, 
                                       build_history: List[Dict]) -> Dict:
        """Build an optimization plan from the cube definition and history.

        Returns:
            A dict with the current metrics, the applicable optimizations
            and concrete configuration recommendations.
        """
        metrics = self.analyze_build_performance(build_history)
        
        optimization_plan = {
            "current_metrics": metrics,
            "optimizations": [],
            "configuration_recommendations": {}
        }
        
        # Build-time optimization.
        if metrics.avg_build_time_minutes > self.optimal_build_time_minutes:
            time_optimization = self._plan_build_time_optimization(cube_info, metrics)
            optimization_plan["optimizations"].append(time_optimization)
        
        # Success-rate optimization.
        if metrics.build_success_rate < self.target_success_rate:
            reliability_optimization = self._plan_reliability_optimization(build_history)
            optimization_plan["optimizations"].append(reliability_optimization)
        
        # Resource-utilization optimization.
        if metrics.resource_utilization < 0.7:
            resource_optimization = self._plan_resource_optimization(build_history)
            optimization_plan["optimizations"].append(resource_optimization)
        
        # Parallelism optimization.
        if metrics.parallel_efficiency < 0.5:
            parallel_optimization = self._plan_parallel_optimization(cube_info)
            optimization_plan["optimizations"].append(parallel_optimization)
        
        # Concrete configuration recommendations.
        optimization_plan["configuration_recommendations"] = self._generate_config_recommendations(
            cube_info, metrics
        )
        
        return optimization_plan
    
    def _plan_build_time_optimization(self, cube_info: Dict, metrics: "BuildMetrics") -> Dict:
        """Plan how to shorten build times."""
        recommendations = []
        
        # Cube complexity: many dimensions usually means many cuboids.
        dimensions = cube_info.get("dimensions", [])
        
        if len(dimensions) > 15:
            recommendations.append("减少维度数量或优化聚合组配置")
        
        # Pin the cubing algorithm instead of letting Kylin choose.
        algorithm = cube_info.get("override_kylin_properties", {}).get("kylin.cube.algorithm", "auto")
        if algorithm == "auto":
            recommendations.append("指定具体的构建算法(layer或inmem)")
        
        # Incremental builds require a partition column.
        partition_column = cube_info.get("partition_date_column")
        if not partition_column:
            recommendations.append("配置分区列以支持增量构建")
        
        return {
            "type": "build_time_optimization",
            "description": f"优化构建时间(当前: {metrics.avg_build_time_minutes:.1f}分钟)",
            "details": {
                "current_time_minutes": metrics.avg_build_time_minutes,
                "target_time_minutes": self.optimal_build_time_minutes,
                "recommendations": recommendations
            },
            "priority": "high" if metrics.avg_build_time_minutes > 60 else "medium"
        }
    
    def _plan_reliability_optimization(self, build_history: List[Dict]) -> Dict:
        """Plan how to raise the build success rate."""
        failed_builds = [b for b in build_history if b.get("status") in ["ERROR", "DISCARDED"]]
        
        # Bucket failures by a coarse classification of the error message.
        failure_reasons = {}
        for build in failed_builds:
            # "or" also covers a present-but-None error_message, which
            # would otherwise crash on .lower().
            error_msg = build.get("error_message") or "Unknown error"
            if "memory" in error_msg.lower() or "heap" in error_msg.lower():
                failure_reasons["memory"] = failure_reasons.get("memory", 0) + 1
            elif "timeout" in error_msg.lower():
                failure_reasons["timeout"] = failure_reasons.get("timeout", 0) + 1
            elif "connection" in error_msg.lower():
                failure_reasons["connection"] = failure_reasons.get("connection", 0) + 1
            else:
                failure_reasons["other"] = failure_reasons.get("other", 0) + 1
        
        recommendations = []
        if failure_reasons.get("memory", 0) > 0:
            recommendations.append("增加构建节点内存配置")
        if failure_reasons.get("timeout", 0) > 0:
            recommendations.append("调整构建超时时间配置")
        if failure_reasons.get("connection", 0) > 0:
            recommendations.append("检查网络连接和数据源稳定性")
        
        return {
            "type": "reliability_optimization",
            "description": "提高构建成功率",
            "details": {
                "failed_builds": len(failed_builds),
                "failure_reasons": failure_reasons,
                "recommendations": recommendations
            },
            "priority": "high"
        }
    
    def _plan_resource_optimization(self, build_history: List[Dict]) -> Dict:
        """Plan how to flatten the build load across the day."""
        # Histogram of build start hours (local time).
        build_hours = {}
        for build in build_history:
            start_time = build.get("create_time", 0)
            if start_time > 0:
                hour = datetime.fromtimestamp(start_time / 1000).hour
                build_hours[hour] = build_hours.get(hour, 0) + 1
        
        # Identify peak and idle hours.
        peak_hours = [h for h, count in build_hours.items() if count > 2]
        low_hours = [h for h in range(24) if h not in build_hours or build_hours[h] == 0]
        
        recommendations = []
        if peak_hours:
            recommendations.append(f"高峰时段({peak_hours})考虑限制并发构建数量")
        if low_hours:
            recommendations.append(f"低谷时段({low_hours})可以安排大型构建任务")
        
        return {
            "type": "resource_optimization",
            "description": "优化资源利用率",
            "details": {
                "peak_hours": peak_hours,
                "low_hours": low_hours,
                "recommendations": recommendations
            },
            "priority": "medium"
        }
    
    def _plan_parallel_optimization(self, cube_info: Dict) -> Dict:
        """Plan how to improve parallel build throughput."""
        recommendations = []
        
        # A partition column enables partition-parallel builds.
        partition_column = cube_info.get("partition_date_column")
        if partition_column:
            recommendations.append("启用分区并行构建")
        
        # Multiple aggregation groups can be built in parallel.
        aggregation_groups = cube_info.get("aggregation_groups", [])
        if len(aggregation_groups) > 1:
            recommendations.append("配置聚合组并行构建")
        
        return {
            "type": "parallel_optimization",
            "description": "提高并行构建效率",
            "details": {
                "supports_partition_parallel": bool(partition_column),
                "aggregation_groups_count": len(aggregation_groups),
                "recommendations": recommendations
            },
            "priority": "medium"
        }
    
    def _generate_config_recommendations(self, cube_info: Dict, metrics: "BuildMetrics") -> Dict:
        """Derive concrete Kylin/cube/system property recommendations."""
        config_recommendations = {
            "kylin_properties": {},
            "cube_properties": {},
            "system_properties": {}
        }
        
        # Kylin-level settings for slow builds.
        if metrics.avg_build_time_minutes > 60:
            config_recommendations["kylin_properties"]["kylin.job.max-concurrent-jobs"] = "3"
            config_recommendations["kylin_properties"]["kylin.cube.algorithm"] = "inmem"
        
        # Retry/timeout settings for unreliable builds.
        if metrics.build_success_rate < 0.9:
            config_recommendations["kylin_properties"]["kylin.job.retry"] = "3"
            config_recommendations["kylin_properties"]["kylin.job.timeout.seconds"] = "7200"
        
        # Cube-level settings for wide models.
        dimensions = cube_info.get("dimensions", [])
        if len(dimensions) > 10:
            config_recommendations["cube_properties"]["kylin.cube.aggrgroup.max-combination"] = "16384"
        
        # Engine-level settings for long builds.
        if metrics.avg_build_time_minutes > 30:
            config_recommendations["system_properties"]["spark.executor.memory"] = "4g"
            config_recommendations["system_properties"]["spark.executor.cores"] = "2"
        
        return config_recommendations
    
    def print_build_report(self, cube_name: str, build_history: List[Dict], cube_info: Dict):
        """Print a human-readable build performance report for a cube."""
        metrics = self.analyze_build_performance(build_history)
        optimization_plan = self.generate_build_optimization_plan(cube_info, build_history)
        
        print(f"\n=== {cube_name} 构建性能报告 ===")
        
        # Current metrics.
        print(f"\n📊 构建指标:")
        print(f"  平均构建时间: {metrics.avg_build_time_minutes:.1f} 分钟")
        print(f"  构建成功率: {metrics.build_success_rate:.1%}")
        print(f"  资源利用率: {metrics.resource_utilization:.1%}")
        print(f"  并行效率: {metrics.parallel_efficiency:.1%}")
        print(f"  构建频率: {metrics.build_frequency_per_day:.1f} 次/天")
        
        # Suggested optimizations.
        optimizations = optimization_plan["optimizations"]
        if optimizations:
            print(f"\n🔧 优化建议:")
            for i, opt in enumerate(optimizations, 1):
                priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(opt['priority'], '⚪')
                print(f"\n{priority_icon} {i}. {opt['description']} ({opt['priority']})")
                
                for rec in opt["details"].get("recommendations", []):
                    print(f"   - {rec}")
        
        # Configuration recommendations.
        config_recs = optimization_plan["configuration_recommendations"]
        if any(config_recs.values()):
            print(f"\n⚙️ 配置建议:")
            
            for category, configs in config_recs.items():
                if configs:
                    print(f"\n  {category.replace('_', ' ').title()}:")
                    for key, value in configs.items():
                        print(f"    {key} = {value}")

def main():
    """Demo: analyze a small synthetic build history for a sample cube."""
    optimizer = BuildOptimizer()

    now_ms = int(time.time() * 1000)

    # Synthetic build history: two successes and one OOM failure.
    build_history = [
        {
            "uuid": "job1",
            "status": "FINISHED",
            "create_time": now_ms - 3600000,  # 1 hour ago
            "duration": 1800000,  # 30 minutes
            "error_message": None
        },
        {
            "uuid": "job2",
            "status": "ERROR",
            "create_time": now_ms - 7200000,  # 2 hours ago
            "duration": 3600000,  # 60 minutes
            "error_message": "OutOfMemoryError: Java heap space"
        },
        {
            "uuid": "job3",
            "status": "FINISHED",
            "create_time": now_ms - 10800000,  # 3 hours ago
            "duration": 2400000,  # 40 minutes
            "error_message": None
        }
    ]

    # Sample cube: 12 dimensions, one aggregation group, date-partitioned.
    cube_info = {
        "name": "sales_cube",
        "dimensions": [{"name": f"dim_{i}"} for i in range(12)],
        "aggregation_groups": [{"includes": [f"dim_{i}" for i in range(6)]}],
        "partition_date_column": "TIME.DATE_VALUE",
        "override_kylin_properties": {
            "kylin.cube.algorithm": "auto"
        }
    }

    optimizer.print_build_report("sales_cube", build_history, cube_info)

# Run the demo only when executed as a script, not on import.
if __name__ == "__main__":
    main()

4. 查询优化

#!/usr/bin/env python3
# query_optimizer.py - 查询优化工具

import json
import re
from typing import Dict, List, Set, Tuple
from dataclasses import dataclass
import logging

@dataclass
class QueryMetrics:
    """Summary metrics aggregated from a query history log."""
    avg_response_time_ms: float  # mean duration over queries with duration > 0
    query_success_rate: float    # fraction of queries without an exception
    cache_hit_rate: float        # fraction of queries answered from cache
    cuboid_hit_rate: float       # fraction of queries exactly matching a cuboid
    scan_count_avg: float        # mean scan count over queries that scanned rows

class QueryOptimizer:
    """Analyzes Kylin query history and generates query-optimization plans.

    Operates on raw query-log records (dicts with keys such as ``sql``,
    ``duration``, ``exception``, ``cache_hit``, ``exactly_match`` and
    ``scan_count``) plus a cube-metadata dict. Produces aggregate
    performance metrics, usage-pattern statistics, and prioritized
    recommendations covering response time, caching, cuboid design,
    indexing and cube layout.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Thresholds that trigger the corresponding optimization plans.
        self.target_response_time_ms = 5000  # 5 seconds
        self.min_cache_hit_rate = 0.8
        self.min_cuboid_hit_rate = 0.9

    def analyze_query_performance(self, query_history: List[Dict]) -> "QueryMetrics":
        """Aggregate raw query-log records into a QueryMetrics summary.

        Args:
            query_history: query-log dicts; missing keys are treated as
                "no data" (zero duration, no cache hit, etc.).

        Returns:
            QueryMetrics with averages and hit/success rates; all zeros
            when the history is empty.
        """
        if not query_history:
            return QueryMetrics(0, 0, 0, 0, 0)

        response_times = []
        successful_queries = 0
        cache_hits = 0
        cuboid_hits = 0
        scan_counts = []

        for query in query_history:
            # Only positive durations / scan counts contribute to the
            # averages, so absent or zero values do not skew the mean.
            duration = query.get("duration", 0)
            if duration > 0:
                response_times.append(duration)

            if query.get("exception") is None:
                successful_queries += 1

            if query.get("cache_hit", False):
                cache_hits += 1

            # "exactly_match" in the log is counted as a cuboid hit.
            if query.get("exactly_match", False):
                cuboid_hits += 1

            scan_count = query.get("scan_count", 0)
            if scan_count > 0:
                scan_counts.append(scan_count)

        avg_response_time = sum(response_times) / len(response_times) if response_times else 0
        success_rate = successful_queries / len(query_history) if query_history else 0
        cache_hit_rate = cache_hits / len(query_history) if query_history else 0
        cuboid_hit_rate = cuboid_hits / len(query_history) if query_history else 0
        avg_scan_count = sum(scan_counts) / len(scan_counts) if scan_counts else 0

        return QueryMetrics(
            avg_response_time_ms=avg_response_time,
            query_success_rate=success_rate,
            cache_hit_rate=cache_hit_rate,
            cuboid_hit_rate=cuboid_hit_rate,
            scan_count_avg=avg_scan_count
        )

    def analyze_query_patterns(self, query_history: List[Dict]) -> Dict:
        """Count usage frequencies of dimensions, measures, time filters,
        generic filters and GROUP BY combinations across the history.

        Returns:
            Dict with keys ``frequent_dimensions``, ``frequent_measures``,
            ``time_patterns``, ``filter_patterns`` and ``groupby_patterns``;
            each maps a pattern string to its occurrence count, sorted by
            count descending and truncated to the top 10.
        """
        patterns = {
            "frequent_dimensions": {},
            "frequent_measures": {},
            "time_patterns": {},
            "filter_patterns": {},
            "groupby_patterns": {}
        }

        for query in query_history:
            sql = query.get("sql", "")
            if not sql:
                continue

            for dim in self._extract_dimensions_from_sql(sql):
                patterns["frequent_dimensions"][dim] = patterns["frequent_dimensions"].get(dim, 0) + 1

            for measure in self._extract_measures_from_sql(sql):
                patterns["frequent_measures"][measure] = patterns["frequent_measures"].get(measure, 0) + 1

            for time_filter in self._extract_time_filters(sql):
                patterns["time_patterns"][time_filter] = patterns["time_patterns"].get(time_filter, 0) + 1

            for filter_expr in self._extract_filters(sql):
                patterns["filter_patterns"][filter_expr] = patterns["filter_patterns"].get(filter_expr, 0) + 1

            # Sort columns so that equivalent GROUP BY lists (in any
            # order) map onto the same pattern key.
            groupby_key = ",".join(sorted(self._extract_groupby_columns(sql)))
            if groupby_key:
                patterns["groupby_patterns"][groupby_key] = patterns["groupby_patterns"].get(groupby_key, 0) + 1

        # Keep only the 10 most frequent entries per category, ordered by
        # count descending (the _plan_* helpers rely on this ordering when
        # they take the first key as the "top" pattern).
        for pattern_type in patterns:
            patterns[pattern_type] = dict(sorted(
                patterns[pattern_type].items(),
                key=lambda item: item[1],
                reverse=True
            )[:10])

        return patterns

    def _extract_dimensions_from_sql(self, sql: str) -> List[str]:
        """Heuristically pull ``table.column`` references used as dimensions.

        Scans the SELECT list (excluding names directly followed by "(",
        i.e. function calls) and the GROUP BY list. Columns nested inside
        aggregate calls may still be captured — this is a simplified
        extractor, not a SQL parser.
        """
        dimensions = []

        select_match = re.search(r'SELECT\s+(.+?)\s+FROM', sql, re.IGNORECASE | re.DOTALL)
        if select_match:
            select_clause = select_match.group(1)
            cols = re.findall(r'(?<!\w)([a-zA-Z_][a-zA-Z0-9_]*\.[a-zA-Z_][a-zA-Z0-9_]*)(?!\s*\()', select_clause)
            dimensions.extend(cols)

        groupby_match = re.search(r'GROUP\s+BY\s+(.+?)(?:\s+ORDER|\s+HAVING|\s+LIMIT|$)', sql, re.IGNORECASE)
        if groupby_match:
            cols = re.findall(r'([a-zA-Z_][a-zA-Z0-9_]*\.[a-zA-Z_][a-zA-Z0-9_]*)', groupby_match.group(1))
            dimensions.extend(cols)

        # De-duplicate; note this does not preserve first-seen order.
        return list(set(dimensions))

    def _extract_measures_from_sql(self, sql: str) -> List[str]:
        """Return the aggregate-function names found in the SQL, one entry
        per call site (case as written in the query)."""
        return re.findall(r'(SUM|COUNT|AVG|MAX|MIN)\s*\([^)]+\)', sql, re.IGNORECASE)

    def _extract_time_filters(self, sql: str) -> List[str]:
        """Return comparison expressions over date/time/year/month columns.

        Matches ``table.column`` names containing "date"/"time" compared
        against quoted literals, and "year"/"month" columns compared
        against bare integers.
        """
        time_filters = []

        # FIX: the original patterns used possessive quantifiers ("++"),
        # which Python's re module only supports from 3.11 (earlier
        # versions raise re.error). A greedy "+" is equivalent here and
        # portable.
        date_patterns = [
            r'\w+\.\w*date\w*\s*[><=]+\s*[\'"][^\'"]+[\'"]',
            r'\w+\.\w*time\w*\s*[><=]+\s*[\'"][^\'"]+[\'"]',
            r'\w+\.\w*year\w*\s*[><=]+\s*\d+',
            r'\w+\.\w*month\w*\s*[><=]+\s*\d+'
        ]

        for pattern in date_patterns:
            time_filters.extend(re.findall(pattern, sql, re.IGNORECASE))

        return time_filters

    def _extract_filters(self, sql: str) -> List[str]:
        """Return simple comparison expressions from the WHERE clause
        (best-effort; complex predicates may be split or missed)."""
        filters = []

        where_match = re.search(r'WHERE\s+(.+?)(?:\s+GROUP|\s+ORDER|\s+LIMIT|$)', sql, re.IGNORECASE | re.DOTALL)
        if where_match:
            filters.extend(re.findall(r'\w+\.\w+\s*[><=!]+\s*[\w\'"][^\s]+', where_match.group(1)))

        return filters

    def _extract_groupby_columns(self, sql: str) -> List[str]:
        """Return the comma-separated GROUP BY expressions, stripped of
        surrounding whitespace; empty list when there is no GROUP BY."""
        groupby_match = re.search(r'GROUP\s+BY\s+(.+?)(?:\s+ORDER|\s+HAVING|\s+LIMIT|$)', sql, re.IGNORECASE)
        if not groupby_match:
            return []
        columns = [col.strip() for col in groupby_match.group(1).split(',')]
        return [col for col in columns if col]

    def generate_query_optimization_plan(self, cube_info: Dict,
                                       query_history: List[Dict]) -> Dict:
        """Build a full optimization plan for a cube from its query history.

        Args:
            cube_info: cube metadata (dimensions, aggregation_groups, ...).
            query_history: raw query-log records.

        Returns:
            Dict containing the computed metrics, the detected query
            patterns, the list of triggered optimizations, and index/cube
            recommendations.
        """
        metrics = self.analyze_query_performance(query_history)
        patterns = self.analyze_query_patterns(query_history)

        optimization_plan = {
            "current_metrics": metrics,
            "query_patterns": patterns,
            "optimizations": [],
            "index_recommendations": [],
            "cube_recommendations": []
        }

        # Each optimization is added only when its metric misses the
        # corresponding threshold configured in __init__.
        if metrics.avg_response_time_ms > self.target_response_time_ms:
            optimization_plan["optimizations"].append(
                self._plan_response_time_optimization(metrics, patterns))

        if metrics.cache_hit_rate < self.min_cache_hit_rate:
            optimization_plan["optimizations"].append(
                self._plan_cache_optimization(patterns))

        if metrics.cuboid_hit_rate < self.min_cuboid_hit_rate:
            optimization_plan["optimizations"].append(
                self._plan_cuboid_optimization(cube_info, patterns))

        optimization_plan["index_recommendations"] = self._generate_index_recommendations(patterns)
        optimization_plan["cube_recommendations"] = self._generate_cube_recommendations(
            cube_info, patterns
        )

        return optimization_plan

    def _plan_response_time_optimization(self, metrics: "QueryMetrics", patterns: Dict) -> Dict:
        """Plan response-time improvements based on scan volume and the
        breadth of observed query patterns."""
        recommendations = []

        # Heavy scans (> 1M rows on average) usually mean missing filters.
        if metrics.scan_count_avg > 1000000:
            recommendations.append("优化查询过滤条件,减少扫描行数")

        # Many distinct hot dimensions suggest a dedicated cube may help.
        frequent_dims = patterns.get("frequent_dimensions", {})
        if len(frequent_dims) > 10:
            recommendations.append("考虑创建针对高频维度的专用Cube")

        # Many distinct GROUP BY shapes suggest aggregation-group tuning.
        groupby_patterns = patterns.get("groupby_patterns", {})
        if len(groupby_patterns) > 20:
            recommendations.append("优化聚合组配置以覆盖常用查询模式")

        return {
            "type": "response_time_optimization",
            "description": f"优化查询响应时间(当前: {metrics.avg_response_time_ms:.0f}ms)",
            "details": {
                "current_response_time_ms": metrics.avg_response_time_ms,
                "target_response_time_ms": self.target_response_time_ms,
                "avg_scan_count": metrics.scan_count_avg,
                "recommendations": recommendations
            },
            "priority": "high" if metrics.avg_response_time_ms > 10000 else "medium"
        }

    def _plan_cache_optimization(self, patterns: Dict) -> Dict:
        """Plan cache improvements; appends a pattern-specific hint for the
        most frequent GROUP BY shape (patterns are sorted by frequency)."""
        recommendations = [
            "增加查询结果缓存大小",
            "优化缓存策略,提高热点查询缓存命中率",
            "考虑预热常用查询的缓存"
        ]

        frequent_patterns = patterns.get("groupby_patterns", {})
        if frequent_patterns:
            # First key is the most frequent pattern (dict keeps the
            # sorted insertion order from analyze_query_patterns).
            top_pattern = list(frequent_patterns.keys())[0]
            recommendations.append(f"为高频查询模式({top_pattern})配置专用缓存")

        return {
            "type": "cache_optimization",
            "description": "优化查询缓存策略",
            "details": {
                "recommendations": recommendations
            },
            "priority": "medium"
        }

    def _plan_cuboid_optimization(self, cube_info: Dict, patterns: Dict) -> Dict:
        """Plan cuboid/aggregation-group improvements from the hot
        dimensions and the most common GROUP BY shape."""
        recommendations = []

        aggregation_groups = cube_info.get("aggregation_groups", [])
        frequent_dims = patterns.get("frequent_dimensions", {})

        if frequent_dims:
            top_dims = list(frequent_dims.keys())[:5]
            recommendations.append(f"确保高频维度({', '.join(top_dims)})在聚合组中")

        groupby_patterns = patterns.get("groupby_patterns", {})
        if groupby_patterns:
            top_groupby = list(groupby_patterns.keys())[0]
            recommendations.append(f"为常用GROUP BY模式({top_groupby})创建专用Cuboid")

        return {
            "type": "cuboid_optimization",
            "description": "优化Cuboid配置",
            "details": {
                "current_aggregation_groups": len(aggregation_groups),
                "recommendations": recommendations
            },
            "priority": "high"
        }

    def _generate_index_recommendations(self, patterns: Dict) -> List[Dict]:
        """Derive index suggestions from hot filter expressions (seen >10
        times) and from any observed time-filter patterns."""
        recommendations = []

        filter_patterns = patterns.get("filter_patterns", {})
        for filter_expr, count in list(filter_patterns.items())[:5]:
            if count > 10:  # only genuinely hot filters warrant an index
                recommendations.append({
                    "type": "filter_index",
                    "description": f"为高频过滤条件创建索引",
                    "filter_expression": filter_expr,
                    "frequency": count,
                    "priority": "high" if count > 50 else "medium"
                })

        time_patterns = patterns.get("time_patterns", {})
        if time_patterns:
            recommendations.append({
                "type": "time_index",
                "description": "优化时间维度索引",
                "time_patterns": list(time_patterns.keys())[:3],
                "priority": "medium"
            })

        return recommendations

    def _generate_cube_recommendations(self, cube_info: Dict, patterns: Dict) -> List[Dict]:
        """Derive cube-design suggestions: missing hot dimensions, hot
        measures worth optimizing, and aggregation groups for common
        GROUP BY shapes."""
        recommendations = []

        # Hot dimensions (seen >20 times) that the cube does not yet have.
        frequent_dims = patterns.get("frequent_dimensions", {})
        current_dims = [d.get("name", "") for d in cube_info.get("dimensions", [])]

        for dim, count in frequent_dims.items():
            if dim not in current_dims and count > 20:
                recommendations.append({
                    "type": "add_dimension",
                    "description": f"添加高频维度: {dim}",
                    "dimension": dim,
                    "frequency": count,
                    "priority": "high"
                })

        # Hot measures (seen >15 times).
        frequent_measures = patterns.get("frequent_measures", {})
        for measure, count in frequent_measures.items():
            if count > 15:
                recommendations.append({
                    "type": "optimize_measure",
                    "description": f"优化高频度量: {measure}",
                    "measure": measure,
                    "frequency": count,
                    "priority": "medium"
                })

        # Top-3 GROUP BY shapes (seen >10 times).
        groupby_patterns = patterns.get("groupby_patterns", {})
        for pattern, count in list(groupby_patterns.items())[:3]:
            if count > 10:
                recommendations.append({
                    "type": "aggregation_group",
                    "description": f"为常用查询模式创建聚合组",
                    "pattern": pattern,
                    "frequency": count,
                    "priority": "high" if count > 30 else "medium"
                })

        return recommendations

    def print_query_report(self, cube_name: str, query_history: List[Dict], cube_info: Dict):
        """Print a human-readable performance report for one cube:
        metrics, pattern analysis, triggered optimizations, and the top
        cube recommendations."""
        metrics = self.analyze_query_performance(query_history)
        optimization_plan = self.generate_query_optimization_plan(cube_info, query_history)

        print(f"\n=== {cube_name} 查询性能报告 ===")

        # Current metrics.
        print(f"\n📊 查询指标:")
        print(f"  平均响应时间: {metrics.avg_response_time_ms:.0f} ms")
        print(f"  查询成功率: {metrics.query_success_rate:.1%}")
        print(f"  缓存命中率: {metrics.cache_hit_rate:.1%}")
        print(f"  Cuboid命中率: {metrics.cuboid_hit_rate:.1%}")
        print(f"  平均扫描行数: {metrics.scan_count_avg:.0f}")

        # Query patterns.
        patterns = optimization_plan["query_patterns"]
        print(f"\n📈 查询模式分析:")

        frequent_dims = patterns.get("frequent_dimensions", {})
        if frequent_dims:
            print(f"\n  高频维度:")
            for dim, count in list(frequent_dims.items())[:5]:
                print(f"    {dim}: {count}次")

        groupby_patterns = patterns.get("groupby_patterns", {})
        if groupby_patterns:
            print(f"\n  常用GROUP BY模式:")
            for pattern, count in list(groupby_patterns.items())[:3]:
                print(f"    {pattern}: {count}次")

        # Triggered optimizations.
        optimizations = optimization_plan["optimizations"]
        if optimizations:
            print(f"\n🔧 优化建议:")
            for i, opt in enumerate(optimizations, 1):
                priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(opt['priority'], '⚪')
                print(f"\n{priority_icon} {i}. {opt['description']} ({opt['priority']})")

                for rec in opt["details"].get("recommendations", []):
                    print(f"   - {rec}")

        # Top-5 cube recommendations.
        cube_recs = optimization_plan["cube_recommendations"]
        if cube_recs:
            print(f"\n🎯 Cube优化建议:")
            for i, rec in enumerate(cube_recs[:5], 1):
                priority_icon = {'high': '🔴', 'medium': '🟡', 'low': '🟢'}.get(rec['priority'], '⚪')
                print(f"\n{priority_icon} {i}. {rec['description']} ({rec['priority']})")
                if 'frequency' in rec:
                    print(f"   频率: {rec['frequency']}次")

def main():
    """Demo entry point: run QueryOptimizer on a small synthetic query history."""
    optimizer = QueryOptimizer()

    # Two representative query-log records: a fast, exactly-matched query
    # and a slower cache-served one.
    sql_by_category = "SELECT time.year, product.category, SUM(sales.amount) FROM sales_fact sales JOIN time_dim time ON sales.time_key = time.date_key JOIN product_dim product ON sales.product_key = product.product_key WHERE time.year = 2023 GROUP BY time.year, product.category"
    sql_by_region = "SELECT customer.region, SUM(sales.amount), COUNT(*) FROM sales_fact sales JOIN customer_dim customer ON sales.customer_key = customer.customer_key WHERE customer.region = 'North' GROUP BY customer.region"

    query_history = [
        {
            "sql": sql_by_category,
            "duration": 3000,
            "exception": None,
            "cache_hit": False,
            "exactly_match": True,
            "scan_count": 500000
        },
        {
            "sql": sql_by_region,
            "duration": 8000,
            "exception": None,
            "cache_hit": True,
            "exactly_match": False,
            "scan_count": 1200000
        }
    ]

    # Minimal cube metadata consumed by the recommendation generators.
    cube_info = {
        "name": "sales_cube",
        "dimensions": [
            {"name": col}
            for col in ("time.year", "product.category", "customer.region")
        ],
        "aggregation_groups": [
            {"includes": ["time.year", "product.category"]}
        ]
    }

    optimizer.print_query_report("sales_cube", query_history, cube_info)

if __name__ == "__main__":
    main()

5.6 最佳实践总结

5.6.1 模型设计原则

维度设计原则: - 选择合适的粒度,平衡查询灵活性和性能 - 建立清晰的维度层次结构 - 使用一致的命名规范 - 考虑维度的变化频率和历史追踪需求

度量设计原则: - 区分基础度量和派生度量 - 选择合适的聚合函数 - 考虑度量的可加性 - 预计算常用的复合度量

分区策略: - 基于时间维度进行分区 - 选择合适的分区粒度(日、周、月) - 考虑数据增长速度和查询模式 - 定期清理历史分区

5.6.2 性能优化策略

存储优化: - 合理控制段数量和大小 - 选择合适的压缩算法 - 优化编码方式 - 定期进行段合并

查询优化: - 设计合理的聚合组 - 优化Cuboid配置 - 提高缓存命中率 - 减少不必要的维度组合

构建优化: - 选择合适的构建算法 - 配置并行构建 - 优化资源分配 - 监控构建性能

5.6.3 运维管理

监控告警: - 设置性能阈值告警 - 监控存储使用情况 - 跟踪查询性能趋势 - 建立故障处理流程

备份恢复: - 定期备份模型配置 - 建立数据恢复机制 - 测试恢复流程 - 文档化操作步骤

版本管理: - 使用版本控制管理模型变更 - 建立变更审批流程 - 保留历史版本 - 支持快速回滚

5.7 本章小结

本章深入介绍了Apache Kylin的数据模型设计与管理,涵盖了以下核心内容:

  1. 维度建模理论:介绍了事实表、维度表、星型模式等基础概念
  2. 数据模型创建:详细说明了通过Web UI和REST API创建模型的方法
  3. 维度和度量定义:提供了维度层次设计和度量类型分类的最佳实践
  4. 模型管理与优化:包括生命周期管理、性能监控和优化策略
  5. Cube设计与管理:深入讲解了Cube配置、聚合组优化等高级主题
  6. 性能优化:提供了存储、构建、查询等多方面的优化工具和方法

通过本章的学习,读者应该能够: - 理解维度建模的基本原理和最佳实践 - 熟练创建和管理Kylin数据模型 - 设计高效的Cube配置 - 进行性能监控和优化 - 建立完善的运维管理体系

下一章将介绍“Cube构建与管理”,深入探讨Cube的构建策略、增量构建、任务调度等高级主题。

5.8 练习与思考

理论练习

  1. 维度建模设计

    • 为电商业务设计一个完整的星型模式
    • 包括订单事实表和相关维度表
    • 考虑维度的层次结构和属性
  2. 聚合组优化

    • 分析给定的查询模式
    • 设计最优的聚合组配置
    • 计算Cuboid数量和存储开销
  3. 性能分析

    • 根据查询日志分析性能瓶颈
    • 提出具体的优化建议
    • 评估优化效果

实践练习

  1. 模型创建实践

    # 使用提供的脚本创建数据模型
    python model_validator.py --config sales_model.json
    python dimension_designer.py --create-time-dimension
    python measure_designer.py --create-basic-measures
    
  2. 性能监控实践

    # 运行性能监控工具
    python model_performance_monitor.py --cube sales_cube
    python query_optimizer.py --analyze-patterns
    python storage_optimizer.py --generate-report

  3. 优化实践

    # 执行优化建议
    python aggregation_group_optimizer.py --optimize sales_cube
    python build_optimizer.py --analyze-build-history


    思考题

    1. 设计权衡:在维度数量和查询性能之间如何找到平衡点?
    2. 存储优化:什么情况下应该选择layer算法而不是inmem算法?
    3. 查询优化:如何设计聚合组来最大化Cuboid命中率?
    4. 运维策略:如何建立有效的模型变更管理流程?
    5. 扩展性考虑:随着数据量增长,模型设计需要考虑哪些因素?

-- 时间维度层次示例
CREATE TABLE time_hierarchy (
    date_key INT PRIMARY KEY,
    full_date DATE,
    -- 年层次
    year_key INT,
    year_name VARCHAR(10),
    -- 季度层次
    quarter_key INT,
    quarter_name VARCHAR(10),
    -- 月层次
    month_key INT,
    month_name VARCHAR(20),
    month_short_name VARCHAR(10),
    -- 周层次
    week_key INT,
    week_name VARCHAR(20),
    -- 日层次
    day_of_year INT,
    day_of_month INT,
    day_of_week INT,
    day_name VARCHAR(20),
    is_weekend BOOLEAN,
    is_holiday BOOLEAN,
    -- 业务日历
    fiscal_year INT,
    fiscal_quarter INT,
    fiscal_month INT
);

-- 产品维度层次示例
CREATE TABLE product_hierarchy (
    product_key INT PRIMARY KEY,
    product_id VARCHAR(50),
    product_name VARCHAR(200),
    -- 品类层次
    category_key INT,
    category_name VARCHAR(100),
    subcategory_key INT,
    subcategory_name VARCHAR(100),
    -- 品牌层次
    brand_key INT,
    brand_name VARCHAR(100),
    -- 供应商层次
    supplier_key INT,
    supplier_name VARCHAR(100),
    -- 产品属性
    product_type VARCHAR(50),
    product_status VARCHAR(20),
    unit_price DECIMAL(10,2),
    unit_cost DECIMAL(10,2),
    weight DECIMAL(8,2),
    color VARCHAR(50),
    size VARCHAR(50),
    -- 审计字段
    created_date DATE,
    modified_date DATE,
    is_active BOOLEAN
);

-- 地理维度层次示例
CREATE TABLE geography_hierarchy (
    geography_key INT PRIMARY KEY,
    -- 国家层次
    country_key INT,
    country_name VARCHAR(100),
    country_code VARCHAR(10),
    -- 地区层次
    region_key INT,
    region_name VARCHAR(100),
    -- 省/州层次
    state_key INT,
    state_name VARCHAR(100),
    state_code VARCHAR(10),
    -- 城市层次
    city_key INT,
    city_name VARCHAR(100),
    -- 邮编层次
    postal_code VARCHAR(20),
    -- 地理坐标
    latitude DECIMAL(10,8),
    longitude DECIMAL(11,8),
    -- 时区
    timezone VARCHAR(50)
);

-- 客户维度层次示例
CREATE TABLE customer_hierarchy (
    customer_key INT PRIMARY KEY,
    customer_id VARCHAR(50),
    customer_name VARCHAR(200),
    -- 客户分类
    customer_type VARCHAR(50),      -- B2B, B2C
    customer_segment VARCHAR(50),   -- VIP, Premium, Standard
    customer_status VARCHAR(20),    -- Active, Inactive, Suspended
    -- 人口统计
    age_group VARCHAR(20),
    gender VARCHAR(10),
    education_level VARCHAR(50),
    income_level VARCHAR(50),
    -- 地理信息
    geography_key INT,
    -- 联系信息
    email VARCHAR(200),
    phone VARCHAR(50),
    -- 注册信息
    registration_date DATE,
    first_purchase_date DATE,
    last_purchase_date DATE,
    -- 客户价值
    lifetime_value DECIMAL(12,2),
    credit_score INT,
    -- 审计字段
    created_date DATE,
    modified_date DATE,
    is_active BOOLEAN,
    -- 外键约束
    FOREIGN KEY (geography_key) REFERENCES geography_hierarchy(geography_key)
);

-- 销售事实表设计
CREATE TABLE sales_fact (
    -- 代理键
    sales_key BIGINT PRIMARY KEY,
    -- 维度外键
    time_key INT NOT NULL,
    product_key INT NOT NULL,
    customer_key INT NOT NULL,
    geography_key INT NOT NULL,
    sales_rep_key INT,
    promotion_key INT,
    -- 度量字段
    quantity_sold INT,
    unit_price DECIMAL(10,2),
    unit_cost DECIMAL(10,2),
    sales_amount DECIMAL(12,2),
    cost_amount DECIMAL(12,2),
    profit_amount DECIMAL(12,2),
    discount_amount DECIMAL(10,2),
    tax_amount DECIMAL(10,2),
    shipping_amount DECIMAL(10,2),
    -- 退化维度(直接存储在事实表中的维度属性)
    order_id VARCHAR(50),
    order_line_number INT,
    invoice_number VARCHAR(50),
    payment_method VARCHAR(50),
    shipping_method VARCHAR(50),
    -- 审计字段
    transaction_timestamp TIMESTAMP,
    created_date DATE,
    modified_date DATE,
    etl_batch_id BIGINT,
    -- 外键约束
    FOREIGN KEY (time_key) REFERENCES time_hierarchy(date_key),
    FOREIGN KEY (product_key) REFERENCES product_hierarchy(product_key),
    FOREIGN KEY (customer_key) REFERENCES customer_hierarchy(customer_key),
    FOREIGN KEY (geography_key) REFERENCES geography_hierarchy(geography_key)
)
-- 分区策略
PARTITION BY RANGE (time_key) (
    PARTITION p2023_q1 VALUES LESS THAN (20230401),
    PARTITION p2023_q2 VALUES LESS THAN (20230701),
    PARTITION p2023_q3 VALUES LESS THAN (20231001),
    PARTITION p2023_q4 VALUES LESS THAN (20240101),
    PARTITION p2024_q1 VALUES LESS THAN (20240401)
);

-- 创建索引
CREATE INDEX idx_sales_time ON sales_fact(time_key);
CREATE INDEX idx_sales_product ON sales_fact(product_key);
CREATE INDEX idx_sales_customer ON sales_fact(customer_key);
CREATE INDEX idx_sales_geography ON sales_fact(geography_key);
CREATE INDEX idx_sales_order ON sales_fact(order_id);
CREATE INDEX idx_sales_timestamp ON sales_fact(transaction_timestamp);

5.4 数据模型验证工具

为了确保数据模型的质量和性能,我们需要建立完善的验证机制:

#!/usr/bin/env python3
# advanced_model_validator.py - 高级数据模型验证工具

import json
import re
import logging
from typing import Dict, List, Set, Tuple, Optional
from dataclasses import dataclass
from datetime import datetime
import sqlite3
import pandas as pd

@dataclass
class ValidationResult:
    """A single finding produced by one of the validate_* checks."""
    category: str           # check family, e.g. "structure", "dimension", "measure"
    level: str  # severity: ERROR, WARNING, INFO
    message: str            # human-readable description of the finding
    details: Dict           # machine-readable context (field names, counts, ...)
    suggestions: List[str]  # remediation hints for the modeler

class AdvancedModelValidator:
    """高级数据模型验证器"""
    
    def __init__(self):
        """Set up logging, an empty result list, and validation thresholds."""
        self.logger = logging.getLogger(__name__)
        self.validation_results = []
        
        # Validation rule configuration: limits and thresholds consulted
        # by the validate_* methods of this class.
        self.rules = {
            "max_dimensions": 50,
            "max_measures": 100,
            "max_cuboids": 10000,
            "min_fact_table_rows": 1000,
            "max_dimension_cardinality": 1000000,
            "recommended_aggregation_groups": 5
        }
    
    def validate_model_structure(self, model_config: Dict) -> List[ValidationResult]:
        """Check the top-level shape of a model config.

        Reports missing mandatory fields as errors, and an excessive
        number of dimensions or measures (per self.rules) as warnings.
        """
        findings = []

        # Mandatory top-level keys.
        for required in ('name', 'fact_table', 'dimensions', 'measures'):
            if required in model_config:
                continue
            findings.append(ValidationResult(
                category="structure",
                level="ERROR",
                message=f"缺少必需字段: {required}",
                details={"missing_field": required},
                suggestions=[f"添加{required}字段到模型配置中"]
            ))

        # Dimension-count ceiling.
        dim_limit = self.rules['max_dimensions']
        dim_count = len(model_config.get('dimensions', []))
        if dim_count > dim_limit:
            findings.append(ValidationResult(
                category="structure",
                level="WARNING",
                message=f"维度数量过多: {dim_count} > {dim_limit}",
                details={"dimension_count": dim_count, "max_allowed": dim_limit},
                suggestions=[
                    "考虑合并相似维度",
                    "移除不常用的维度",
                    "使用维度层次减少维度数量"
                ]
            ))

        # Measure-count ceiling.
        measure_limit = self.rules['max_measures']
        measure_count = len(model_config.get('measures', []))
        if measure_count > measure_limit:
            findings.append(ValidationResult(
                category="structure",
                level="WARNING",
                message=f"度量数量过多: {measure_count} > {measure_limit}",
                details={"measure_count": measure_count, "max_allowed": measure_limit},
                suggestions=[
                    "移除不常用的度量",
                    "使用派生度量替代基础度量",
                    "考虑在查询时计算某些度量"
                ]
            ))

        return findings
    
    def validate_dimensions(self, model_config: Dict) -> List[ValidationResult]:
        """Validate every dimension definition: conformant name, a table
        and column binding, and a cardinality within the configured limit."""
        findings = []

        def report(level, message, details, suggestions):
            # All dimension findings share the same category.
            findings.append(ValidationResult(
                category="dimension",
                level=level,
                message=message,
                details=details,
                suggestions=suggestions
            ))

        for dim in model_config.get('dimensions', []):
            dim_name = dim.get('name', 'unknown')

            if not self._is_valid_identifier(dim_name):
                report("ERROR",
                       f"维度名称不符合规范: {dim_name}",
                       {"dimension": dim_name},
                       ["使用字母、数字和下划线组成的名称"])

            if not dim.get('table'):
                report("ERROR",
                       f"维度{dim_name}缺少表定义",
                       {"dimension": dim_name},
                       ["为维度指定对应的维度表"])

            if not dim.get('column'):
                report("ERROR",
                       f"维度{dim_name}缺少列定义",
                       {"dimension": dim_name},
                       ["为维度指定对应的列名"])

            # High-cardinality dimensions blow up cuboid size.
            cardinality = dim.get('cardinality', 0)
            if cardinality > self.rules['max_dimension_cardinality']:
                report("WARNING",
                       f"维度{dim_name}基数过高: {cardinality}",
                       {"dimension": dim_name, "cardinality": cardinality},
                       [
                           "考虑对高基数维度进行分组",
                           "使用维度层次减少基数",
                           "评估是否真的需要这个维度"
                       ])

        return findings
    
    def validate_measures(self, model_config: Dict) -> List[ValidationResult]:
        """Validate measure definitions: unique conformant names, a
        supported aggregation function, and a present expression."""
        findings = []
        supported_functions = ['SUM', 'COUNT', 'MAX', 'MIN', 'AVG', 'COUNT_DISTINCT']
        seen_names = set()

        for measure in model_config.get('measures', []):
            measure_name = measure.get('name', 'unknown')

            # Duplicate names would make query results ambiguous.
            if measure_name in seen_names:
                findings.append(ValidationResult(
                    category="measure",
                    level="ERROR",
                    message=f"重复的度量名称: {measure_name}",
                    details={"measure": measure_name},
                    suggestions=["使用唯一的度量名称"]
                ))
            seen_names.add(measure_name)

            if not self._is_valid_identifier(measure_name):
                findings.append(ValidationResult(
                    category="measure",
                    level="ERROR",
                    message=f"度量名称不符合规范: {measure_name}",
                    details={"measure": measure_name},
                    suggestions=["使用字母、数字和下划线组成的名称"]
                ))

            # Aggregation function must be one Kylin supports.
            function = measure.get('function', '').upper()
            if function not in supported_functions:
                findings.append(ValidationResult(
                    category="measure",
                    level="ERROR",
                    message=f"不支持的聚合函数: {function}",
                    details={"measure": measure_name, "function": function},
                    suggestions=[f"使用支持的聚合函数: {', '.join(supported_functions)}"]
                ))

            if not measure.get('expression'):
                findings.append(ValidationResult(
                    category="measure",
                    level="ERROR",
                    message=f"度量{measure_name}缺少表达式",
                    details={"measure": measure_name},
                    suggestions=["为度量定义计算表达式"]
                ))

        return findings
    
    def validate_relationships(self, model_config: Dict) -> List[ValidationResult]:
        """Validate the fact table and its lookup-join definitions."""
        findings = []

        # Without a fact table, none of the join checks make sense.
        if not model_config.get('fact_table'):
            findings.append(ValidationResult(
                category="relationship",
                level="ERROR",
                message="缺少事实表定义",
                details={},
                suggestions=["定义模型的事实表"]
            ))
            return findings

        for lookup in model_config.get('lookups', []):
            join_spec = lookup.get('join', {})

            # Only INNER and LEFT joins are recommended.
            join_type = join_spec.get('type', '').upper()
            if join_type not in ('INNER', 'LEFT'):
                findings.append(ValidationResult(
                    category="relationship",
                    level="WARNING",
                    message=f"建议使用INNER或LEFT连接,当前: {join_type}",
                    details={"join_type": join_type},
                    suggestions=["使用INNER JOIN提高性能,使用LEFT JOIN保证数据完整性"]
                ))

            # Join key lists must pair up one-to-one.
            primary_key = join_spec.get('primary_key', [])
            foreign_key = join_spec.get('foreign_key', [])
            if len(primary_key) != len(foreign_key):
                findings.append(ValidationResult(
                    category="relationship",
                    level="ERROR",
                    message="主键和外键数量不匹配",
                    details={"primary_key": primary_key, "foreign_key": foreign_key},
                    suggestions=["确保主键和外键字段一一对应"]
                ))

        return findings
    
    def validate_performance(self, model_config: Dict) -> List[ValidationResult]:
        """Validate partitioning and aggregation-group configuration.

        Flags a missing partition strategy, a partition config without a
        date column, a missing or oversized aggregation-group setup, and a
        cuboid count estimate exceeding the configured maximum.
        """
        findings = []

        # --- Partition strategy ---
        partition_desc = model_config.get('partition_desc')
        if partition_desc:
            if not partition_desc.get('partition_date_column'):
                findings.append(ValidationResult(
                    category="performance",
                    level="ERROR",
                    message="分区配置缺少分区列",
                    details={},
                    suggestions=["指定用于分区的日期列"]
                ))
        else:
            findings.append(ValidationResult(
                category="performance",
                level="WARNING",
                message="未配置分区策略",
                details={},
                suggestions=[
                    "配置基于时间的分区策略",
                    "选择合适的分区粒度(日/周/月)",
                    "考虑数据增长速度和查询模式"
                ]
            ))

        # --- Aggregation groups ---
        agg_groups = model_config.get('aggregation_groups', [])
        group_count = len(agg_groups)
        if group_count == 0:
            findings.append(ValidationResult(
                category="performance",
                level="WARNING",
                message="未配置聚合组",
                details={},
                suggestions=[
                    "配置聚合组以优化查询性能",
                    "基于查询模式设计聚合组",
                    "控制Cuboid数量"
                ]
            ))
        elif group_count > self.rules['recommended_aggregation_groups']:
            findings.append(ValidationResult(
                category="performance",
                level="WARNING",
                message=f"聚合组数量过多: {group_count}",
                details={"count": group_count},
                suggestions=[
                    "合并相似的聚合组",
                    "移除不必要的聚合组",
                    "优化聚合组设计"
                ]
            ))

        # --- Cuboid explosion check ---
        estimated = self._estimate_cuboid_count(model_config)
        if estimated > self.rules['max_cuboids']:
            findings.append(ValidationResult(
                category="performance",
                level="ERROR",
                message=f"预估Cuboid数量过多: {estimated}",
                details={"estimated_cuboids": estimated},
                suggestions=[
                    "减少维度数量",
                    "优化聚合组配置",
                    "使用强制维度和层次维度",
                    "考虑使用联合维度"
                ]
            ))

        return findings
    
    def validate_business_logic(self, model_config: Dict) -> List[ValidationResult]:
        """Validate that measures and dimensions make business sense.

        Flags amount-like measures aggregated with COUNT, count-like
        measures aggregated with anything other than COUNT, and models
        that lack a time/date dimension.
        """
        issues = []

        for measure in model_config.get('measures', []):
            name = measure.get('name', '')
            func = measure.get('function', '').upper()
            lowered = name.lower()

            # Monetary measures are normally summed, not counted.
            if 'amount' in lowered and func == 'COUNT':
                issues.append(ValidationResult(
                    category="business_logic",
                    level="WARNING",
                    message=f"度量{name}使用COUNT函数可能不合适",
                    details={"measure": name, "function": func},
                    suggestions=["金额类度量通常使用SUM函数"]
                ))

            # Count-style measures normally use COUNT.
            if 'count' in lowered and func != 'COUNT':
                issues.append(ValidationResult(
                    category="business_logic",
                    level="WARNING",
                    message=f"度量{name}可能应该使用COUNT函数",
                    details={"measure": name, "function": func},
                    suggestions=["计数类度量通常使用COUNT函数"]
                ))

        # An OLAP model almost always needs a time dimension for analysis.
        has_time_dim = any(
            'time' in dim.get('name', '').lower() or 'date' in dim.get('name', '').lower()
            for dim in model_config.get('dimensions', [])
        )
        if not has_time_dim:
            issues.append(ValidationResult(
                category="business_logic",
                level="WARNING",
                message="模型中缺少时间维度",
                details={},
                suggestions=[
                    "添加时间维度以支持时间序列分析",
                    "考虑业务分析的时间需求"
                ]
            ))

        return issues
    
    def validate_security(self, model_config: Dict) -> List[ValidationResult]:
        """Scan dimension names for potentially sensitive data fields.

        Emits one WARNING per (dimension, keyword) match, so a name
        containing several sensitive keywords yields several findings.

        Returns:
            List of ValidationResult warnings; empty when nothing matches.
        """
        results = []

        # Keywords that commonly indicate PII / sensitive columns.
        sensitive_keywords = ['password', 'ssn', 'credit_card', 'phone', 'email', 'address']

        for dim in model_config.get('dimensions', []):
            dim_name = dim.get('name', '').lower()
            for keyword in sensitive_keywords:
                if keyword in dim_name:
                    results.append(ValidationResult(
                        category="security",
                        level="WARNING",
                        message=f"维度{dim.get('name')}可能包含敏感信息",
                        details={"dimension": dim.get('name'), "keyword": keyword},
                        suggestions=[
                            "考虑对敏感数据进行脱敏处理",
                            "实施行级安全控制",
                            "限制对敏感维度的访问权限"
                        ]
                    ))

        return results
    
    def _is_valid_identifier(self, name: str) -> bool:
        """验证标识符是否有效"""
        if not name:
            return False
        return re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name) is not None
    
    def _estimate_cuboid_count(self, model_config: Dict) -> int:
        """估算Cuboid数量"""
        dimensions = model_config.get('dimensions', [])
        aggregation_groups = model_config.get('aggregation_groups', [])
        
        if not aggregation_groups:
            # 如果没有聚合组,估算为2^n
            return 2 ** min(len(dimensions), 20)  # 限制最大值
        
        total_cuboids = 0
        for group in aggregation_groups:
            includes = group.get('includes', [])
            mandatory_dims = group.get('select_rule', {}).get('mandatory_dims', [])
            hierarchy_dims = group.get('select_rule', {}).get('hierarchy_dims', [])
            joint_dims = group.get('select_rule', {}).get('joint_dims', [])
            
            # 简化的Cuboid数量估算
            base_dims = len(includes)
            if base_dims > 0:
                group_cuboids = 2 ** min(base_dims, 15)  # 限制单个组的最大值
                total_cuboids += group_cuboids
        
        return total_cuboids
    
    def generate_validation_report(self, model_config: Dict) -> Dict:
        """Run every validation pass and assemble a summary report dict.

        Returns a dict with the model name, timestamp, a 0-100 quality
        score, per-severity counts, all findings, and recommendations.
        """
        checks = (
            self.validate_model_structure,
            self.validate_dimensions,
            self.validate_measures,
            self.validate_relationships,
            self.validate_performance,
            self.validate_business_logic,
            self.validate_security,
        )
        findings = [issue for check in checks for issue in check(model_config)]

        # Tally findings by severity.
        errors = sum(1 for f in findings if f.level == 'ERROR')
        warnings = sum(1 for f in findings if f.level == 'WARNING')
        infos = sum(1 for f in findings if f.level == 'INFO')

        # Simple scoring: each error costs 20 points, each warning 5.
        score = max(0, 100 - errors * 20 - warnings * 5)

        return {
            "model_name": model_config.get('name', 'unknown'),
            "validation_timestamp": datetime.now().isoformat(),
            "quality_score": score,
            "summary": {
                "total_issues": len(findings),
                "errors": errors,
                "warnings": warnings,
                "info": infos
            },
            "results": [
                {
                    "category": f.category,
                    "level": f.level,
                    "message": f.message,
                    "details": f.details,
                    "suggestions": f.suggestions
                }
                for f in findings
            ],
            "recommendations": self._generate_recommendations(findings)
        }
    
    def _generate_recommendations(self, results: List[ValidationResult]) -> List[str]:
        """生成改进建议"""
        recommendations = []
        
        error_count = len([r for r in results if r.level == 'ERROR'])
        warning_count = len([r for r in results if r.level == 'WARNING'])
        
        if error_count > 0:
            recommendations.append(f"修复{error_count}个错误以确保模型可用性")
        
        if warning_count > 5:
            recommendations.append(f"关注{warning_count}个警告以提高模型质量")
        
        # 基于问题类型的建议
        categories = {}
        for result in results:
            if result.level in ['ERROR', 'WARNING']:
                categories[result.category] = categories.get(result.category, 0) + 1
        
        if categories.get('performance', 0) > 2:
            recommendations.append("重点关注性能优化,考虑重新设计聚合组")
        
        if categories.get('structure', 0) > 2:
            recommendations.append("检查模型结构设计,确保符合最佳实践")
        
        if categories.get('business_logic', 0) > 1:
            recommendations.append("与业务专家确认度量和维度的业务含义")
        
        return recommendations
    
    def print_validation_report(self, model_config: Dict):
        """Generate the validation report and pretty-print it to stdout."""
        report = self.generate_validation_report(model_config)

        print(f"\n=== 数据模型验证报告 ===")
        print(f"模型名称: {report['model_name']}")
        print(f"验证时间: {report['validation_timestamp']}")
        print(f"质量评分: {report['quality_score']}/100")

        # Severity counts.
        counts = report['summary']
        print(f"\n📊 问题统计:")
        print(f"  总问题数: {counts['total_issues']}")
        print(f"  错误: {counts['errors']}")
        print(f"  警告: {counts['warnings']}")
        print(f"  信息: {counts['info']}")

        # One entry per finding, tagged with a severity icon.
        icons = {'ERROR': '❌', 'WARNING': '⚠️', 'INFO': 'ℹ️'}
        if report['results']:
            print(f"\n🔍 详细结果:")
            for idx, item in enumerate(report['results'], 1):
                icon = icons.get(item['level'], '•')
                print(f"\n{icon} {idx}. [{item['category']}] {item['message']}")
                if item['suggestions']:
                    print("   建议:")
                    for tip in item['suggestions']:
                        print(f"   - {tip}")

        # Aggregated improvement advice, if any.
        if report['recommendations']:
            print(f"\n💡 改进建议:")
            for idx, rec in enumerate(report['recommendations'], 1):
                print(f"  {idx}. {rec}")

def main():
    """Demo entry point: validate a sample sales model and print the report."""
    # Hand-written sample configuration covering dimensions, measures,
    # lookups, partitioning, and one aggregation group.
    sample_config = {
        "name": "sales_model",
        "fact_table": "sales_fact",
        "dimensions": [
            {
                "name": "time_dim",
                "table": "time_hierarchy",
                "column": "date_key",
                "cardinality": 3650
            },
            {
                "name": "product_dim",
                "table": "product_hierarchy",
                "column": "product_key",
                "cardinality": 50000
            }
        ],
        "measures": [
            {
                "name": "sales_amount",
                "function": "SUM",
                "expression": "sales_fact.sales_amount"
            },
            {
                "name": "order_count",
                "function": "COUNT",
                "expression": "1"
            }
        ],
        "lookups": [
            {
                "table": "time_hierarchy",
                "join": {
                    "type": "inner",
                    "primary_key": ["time_hierarchy.date_key"],
                    "foreign_key": ["sales_fact.time_key"]
                }
            }
        ],
        "partition_desc": {
            "partition_date_column": "time_key",
            "partition_date_start": "2020-01-01"
        },
        "aggregation_groups": [
            {
                "includes": ["time_dim", "product_dim"],
                "select_rule": {
                    "mandatory_dims": ["time_dim"]
                }
            }
        ]
    }

    checker = AdvancedModelValidator()
    checker.print_validation_report(sample_config)

# Run the demo only when executed as a script, not on import.
if __name__ == "__main__":
    main()
quarter_key INT,
quarter_name VARCHAR(10),
quarter_of_year INT,
-- 月层次
month_key INT,
month_name VARCHAR(20),
month_of_year INT,
month_of_quarter INT,
-- 周层次
week_key INT,
week_of_year INT,
week_of_month INT,
-- 日层次
day_of_year INT,
day_of_quarter INT,
day_of_month INT,
day_of_week INT,
day_name VARCHAR(10),
-- 业务属性
is_weekend BOOLEAN,
is_holiday BOOLEAN,
is_workday BOOLEAN,
fiscal_year INT,
fiscal_quarter INT,
fiscal_month INT

);

-- 地理维度层次示例
CREATE TABLE geography_hierarchy (
    geography_key INT PRIMARY KEY,
    -- 最细粒度
    store_id VARCHAR(50),
    store_name VARCHAR(200),
    -- 城市层次
    city_key INT,
    city_name VARCHAR(100),
    -- 州/省层次
    state_key INT,
    state_name VARCHAR(100),
    state_code VARCHAR(10),
    -- 区域层次
    region_key INT,
    region_name VARCHAR(100),
    -- 国家层次
    country_key INT,
    country_name VARCHAR(100),
    country_code VARCHAR(10),
    -- 大洲层次
    continent_key INT,
    continent_name VARCHAR(50),
    -- 地理坐标
    latitude DECIMAL(10,8),
    longitude DECIMAL(11,8),
    timezone VARCHAR(50)
);


## 5.2 Kylin数据模型创建

### 5.2.1 数据模型基础

在Apache Kylin中,数据模型定义了事实表和维度表之间的关系,是构建Cube的基础。

**数据模型组成:**
1. **事实表**:包含度量数据的主表
2. **维度表**:提供描述信息的查找表
3. **连接关系**:定义表之间的关联方式
4. **分区信息**:用于数据分区和增量构建

### 5.2.2 创建数据模型

**通过Web UI创建数据模型:**

1. **登录Kylin Web UI**
   - 访问 http://kylin-server:7070/kylin
   - 使用管理员账户登录

2. **进入模型管理页面**
   - 点击"Models"菜单
   - 点击"+ New"按钮

3. **配置基本信息**
```json
{
  "name": "sales_model",
  "description": "销售数据模型",
  "fact_table": "default.sales_fact",
  "lookups": [
    {
      "table": "default.date_dim",
      "kind": "LOOKUP",
      "alias": "date_dim",
      "join": {
        "type": "INNER",
        "primary_key": ["date_key"],
        "foreign_key": ["date_key"]
      }
    },
    {
      "table": "default.product_dim",
      "kind": "LOOKUP",
      "alias": "product_dim",
      "join": {
        "type": "INNER",
        "primary_key": ["product_key"],
        "foreign_key": ["product_key"]
      }
    },
    {
      "table": "default.customer_dim",
      "kind": "LOOKUP",
      "alias": "customer_dim",
      "join": {
        "type": "LEFT",
        "primary_key": ["customer_key"],
        "foreign_key": ["customer_key"]
      }
    }
  ],
  "partition_desc": {
    "partition_date_column": "sales_fact.sale_date",
    "partition_time_column": null,
    "partition_date_start": 0,
    "partition_date_format": "yyyy-MM-dd",
    "partition_type": "APPEND",
    "partition_condition_builder": "org.apache.kylin.metadata.model.DefaultPartitionConditionBuilder"
  },
  "filter_condition": "sales_fact.sales_amount > 0",
  "capacity": "MEDIUM"
}

通过REST API创建数据模型:

#!/bin/bash
# create_data_model.sh - Create a Kylin data model via the REST API.
#
# Posts the JSON model definition below to /kylin/api/models, then lists
# the models to confirm that "sales_model" was actually created.

KYLIN_HOST="localhost:7070"
USERNAME="ADMIN"
PASSWORD="KYLIN"
PROJECT="sales_project"

# Model definition. The quoted 'EOF' delimiter disables shell expansion
# inside the heredoc, so the JSON is taken literally.
read -r -d '' MODEL_JSON << 'EOF'
{
  "uuid": null,
  "name": "sales_model",
  "owner": "ADMIN",
  "description": "销售数据分析模型",
  "fact_table": "default.sales_fact",
  "lookups": [
    {
      "table": "default.date_dim",
      "kind": "LOOKUP",
      "alias": "date_dim",
      "join": {
        "type": "INNER",
        "primary_key": ["date_key"],
        "foreign_key": ["date_key"]
      }
    },
    {
      "table": "default.product_dim",
      "kind": "LOOKUP",
      "alias": "product_dim",
      "join": {
        "type": "INNER",
        "primary_key": ["product_key"],
        "foreign_key": ["product_key"]
      }
    },
    {
      "table": "default.customer_dim",
      "kind": "LOOKUP",
      "alias": "customer_dim",
      "join": {
        "type": "LEFT",
        "primary_key": ["customer_key"],
        "foreign_key": ["customer_key"]
      }
    },
    {
      "table": "default.store_dim",
      "kind": "LOOKUP",
      "alias": "store_dim",
      "join": {
        "type": "INNER",
        "primary_key": ["store_key"],
        "foreign_key": ["store_key"]
      }
    }
  ],
  "dimensions": [
    {
      "table": "sales_fact",
      "columns": ["sale_id"]
    },
    {
      "table": "date_dim",
      "columns": ["year", "quarter", "month", "week", "day_of_week", "is_weekend"]
    },
    {
      "table": "product_dim",
      "columns": ["category", "subcategory", "brand", "product_name"]
    },
    {
      "table": "customer_dim",
      "columns": ["gender", "age_group", "city", "state", "customer_segment"]
    },
    {
      "table": "store_dim",
      "columns": ["store_type", "city", "state", "region"]
    }
  ],
  "metrics": [
    {
      "name": "sales_amount",
      "function": {
        "expression": "SUM",
        "parameter": {
          "type": "column",
          "value": "sales_fact.sales_amount"
        },
        "returntype": "decimal(19,4)"
      }
    },
    {
      "name": "quantity",
      "function": {
        "expression": "SUM",
        "parameter": {
          "type": "column",
          "value": "sales_fact.quantity"
        },
        "returntype": "bigint"
      }
    },
    {
      "name": "profit",
      "function": {
        "expression": "SUM",
        "parameter": {
          "type": "column",
          "value": "sales_fact.profit"
        },
        "returntype": "decimal(19,4)"
      }
    },
    {
      "name": "transaction_count",
      "function": {
        "expression": "COUNT",
        "parameter": {
          "type": "constant",
          "value": "1"
        },
        "returntype": "bigint"
      }
    }
  ],
  "partition_desc": {
    "partition_date_column": "sales_fact.sale_date",
    "partition_date_start": 0,
    "partition_date_format": "yyyy-MM-dd",
    "partition_type": "APPEND"
  },
  "filter_condition": "sales_fact.sales_amount > 0 AND sales_fact.quantity > 0",
  "capacity": "MEDIUM"
}
EOF

# Create the data model.
echo "创建数据模型: sales_model"
response=$(curl -s -X POST \
  "http://${KYLIN_HOST}/kylin/api/models" \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic $(echo -n "${USERNAME}:${PASSWORD}" | base64)" \
  -d "${MODEL_JSON}")

# A successful creation response contains the new model's uuid.
if echo "$response" | grep -q '"uuid"'; then
    echo "✅ 数据模型创建成功"
    echo "$response" | python -m json.tool
else
    echo "❌ 数据模型创建失败"
    echo "错误信息: $response"
    exit 1
fi

# Verify by listing models. Note: plain `echo "\n..."` would print a
# literal backslash-n in bash; -e makes the escape an actual newline.
echo -e "\n验证模型创建..."
models=$(curl -s -X GET \
  "http://${KYLIN_HOST}/kylin/api/models" \
  -H "Authorization: Basic $(echo -n "${USERNAME}:${PASSWORD}" | base64)")

if echo "$models" | grep -q "sales_model"; then
    echo "✅ 模型验证成功"
else
    echo "❌ 模型验证失败"
fi

5.2.3 数据模型验证

模型验证脚本:

#!/usr/bin/env python3
# model_validator.py - 数据模型验证工具

import requests
import json
import sys
from datetime import datetime

class ModelValidator:
    """Validates a deployed Kylin data model via the Kylin REST API.

    Runs a series of checks (model exists, tables exist, joins line up,
    partitioning configured, dimension/measure counts sane, data quality)
    and aggregates them into a single report dict with an overall status
    and a list of recommendations.
    """

    def __init__(self, kylin_host, username, password):
        """Store connection settings and prepare an authenticated session.

        Args:
            kylin_host: host:port of the Kylin server, e.g. "localhost:7070".
            username: Kylin account name.
            password: Kylin account password.
        """
        self.kylin_host = kylin_host
        self.username = username
        self.password = password
        self.session = requests.Session()
        self.session.auth = (username, password)

    def validate_model(self, model_name):
        """Run all checks against *model_name* and return a report dict.

        The report contains 'checks' (per-check status/message entries),
        'overall_status' ('passed'/'warning'/'failed'/'error'), and
        'recommendations'.
        """
        print(f"开始验证数据模型: {model_name}")

        validation_result = {
            'model_name': model_name,
            'validation_time': datetime.now().isoformat(),
            'checks': {},
            'overall_status': 'unknown',
            'recommendations': []
        }

        try:
            # Fetch model metadata first; nothing else can run without it.
            model_info = self.get_model_info(model_name)
            if not model_info:
                validation_result['overall_status'] = 'failed'
                validation_result['checks']['model_exists'] = {
                    'status': 'failed',
                    'message': '模型不存在'
                }
                return validation_result

            validation_result['checks']['model_exists'] = {
                'status': 'passed',
                'message': '模型存在'
            }

            # Each check records its outcome into validation_result.
            self.check_fact_table(model_info, validation_result)
            self.check_dimension_tables(model_info, validation_result)
            self.check_join_relationships(model_info, validation_result)
            self.check_partition_config(model_info, validation_result)
            self.check_dimensions_and_measures(model_info, validation_result)
            self.check_data_quality(model_info, validation_result)

            # Roll the individual statuses up into one verdict, then advise.
            self.calculate_overall_status(validation_result)
            self.generate_recommendations(validation_result)

        except Exception as e:
            # Top-level boundary: record the failure instead of crashing.
            validation_result['overall_status'] = 'error'
            validation_result['checks']['validation_error'] = {
                'status': 'error',
                'message': f'验证过程出错: {str(e)}'
            }

        return validation_result

    def get_model_info(self, model_name):
        """Fetch the model's JSON metadata, or None if unavailable."""
        try:
            response = self.session.get(
                f"http://{self.kylin_host}/kylin/api/models/{model_name}"
            )

            if response.status_code == 200:
                return response.json()
            else:
                return None

        except Exception as e:
            print(f"获取模型信息失败: {e}")
            return None

    def check_fact_table(self, model_info, validation_result):
        """Check that the fact table is defined, exists, and has rows."""
        fact_table = model_info.get('fact_table')

        if not fact_table:
            validation_result['checks']['fact_table'] = {
                'status': 'failed',
                'message': '未定义事实表'
            }
            return

        table_exists = self.check_table_exists(fact_table)

        if table_exists:
            # An existing but empty fact table is only a warning: the model
            # is valid, but building a cube from it would be pointless.
            record_count = self.get_table_record_count(fact_table)

            validation_result['checks']['fact_table'] = {
                'status': 'passed' if record_count > 0 else 'warning',
                'message': f'事实表存在,记录数: {record_count}',
                'details': {
                    'table_name': fact_table,
                    'record_count': record_count
                }
            }

            if record_count == 0:
                validation_result['recommendations'].append(
                    '事实表没有数据,请检查数据加载过程'
                )
        else:
            validation_result['checks']['fact_table'] = {
                'status': 'failed',
                'message': f'事实表不存在: {fact_table}'
            }

    def check_dimension_tables(self, model_info, validation_result):
        """Check every lookup (dimension) table exists and contains rows."""
        lookups = model_info.get('lookups', [])

        if not lookups:
            validation_result['checks']['dimension_tables'] = {
                'status': 'warning',
                'message': '没有定义维度表'
            }
            return

        dimension_check_results = []

        for lookup in lookups:
            table_name = lookup.get('table')
            alias = lookup.get('alias')

            table_exists = self.check_table_exists(table_name)
            record_count = self.get_table_record_count(table_name) if table_exists else 0

            dimension_check_results.append({
                'table': table_name,
                'alias': alias,
                'exists': table_exists,
                'record_count': record_count,
                'status': 'passed' if table_exists and record_count > 0 else 'failed'
            })

        failed_dimensions = [d for d in dimension_check_results if d['status'] == 'failed']

        if failed_dimensions:
            validation_result['checks']['dimension_tables'] = {
                'status': 'failed',
                'message': f'{len(failed_dimensions)} 个维度表存在问题',
                'details': dimension_check_results
            }
        else:
            validation_result['checks']['dimension_tables'] = {
                'status': 'passed',
                'message': f'所有 {len(dimension_check_results)} 个维度表正常',
                'details': dimension_check_results
            }

    def check_join_relationships(self, model_info, validation_result):
        """Check that each lookup's join keys pair up one-to-one."""
        lookups = model_info.get('lookups', [])
        join_check_results = []

        for lookup in lookups:
            join_info = lookup.get('join', {})
            table_name = lookup.get('table')

            primary_key = join_info.get('primary_key', [])
            foreign_key = join_info.get('foreign_key', [])
            join_type = join_info.get('type', 'INNER')

            # Keys must be non-empty and correspond pairwise.
            keys_match = len(primary_key) == len(foreign_key) and len(primary_key) > 0

            join_check_results.append({
                'table': table_name,
                'join_type': join_type,
                'primary_key': primary_key,
                'foreign_key': foreign_key,
                'keys_match': keys_match,
                'status': 'passed' if keys_match else 'failed'
            })

        failed_joins = [j for j in join_check_results if j['status'] == 'failed']

        if failed_joins:
            validation_result['checks']['join_relationships'] = {
                'status': 'failed',
                'message': f'{len(failed_joins)} 个连接关系配置错误',
                'details': join_check_results
            }
        else:
            validation_result['checks']['join_relationships'] = {
                'status': 'passed',
                'message': '所有连接关系配置正确',
                'details': join_check_results
            }

    def check_partition_config(self, model_info, validation_result):
        """Check the partition description is present and complete."""
        partition_desc = model_info.get('partition_desc')

        if not partition_desc:
            # Missing partitioning is allowed, just not recommended.
            validation_result['checks']['partition_config'] = {
                'status': 'warning',
                'message': '未配置分区信息'
            }
            return

        partition_column = partition_desc.get('partition_date_column')
        partition_format = partition_desc.get('partition_date_format')

        if partition_column and partition_format:
            validation_result['checks']['partition_config'] = {
                'status': 'passed',
                'message': '分区配置正确',
                'details': {
                    'partition_column': partition_column,
                    'partition_format': partition_format
                }
            }
        else:
            validation_result['checks']['partition_config'] = {
                'status': 'failed',
                'message': '分区配置不完整'
            }

    def check_dimensions_and_measures(self, model_info, validation_result):
        """Check dimension/measure counts are non-zero and reasonable."""
        dimensions = model_info.get('dimensions', [])
        metrics = model_info.get('metrics', [])

        dimension_count = sum(len(dim.get('columns', [])) for dim in dimensions)
        measure_count = len(metrics)

        status = 'passed'
        messages = []

        if dimension_count == 0:
            status = 'failed'
            messages.append('没有定义维度')

        if measure_count == 0:
            status = 'failed'
            messages.append('没有定义度量')

        if dimension_count > 50:
            # Only downgrade to 'warning' from 'passed' — a 'failed' status
            # must not be masked by the too-many-dimensions warning.
            if status == 'passed':
                status = 'warning'
            messages.append(f'维度数量过多({dimension_count}),可能影响性能')

        validation_result['checks']['dimensions_and_measures'] = {
            'status': status,
            'message': '; '.join(messages) if messages else f'维度: {dimension_count}, 度量: {measure_count}',
            'details': {
                'dimension_count': dimension_count,
                'measure_count': measure_count
            }
        }

    def check_data_quality(self, model_info, validation_result):
        """Check the fact table's null ratio (simulated in this example)."""
        fact_table = model_info.get('fact_table')

        if not fact_table:
            validation_result['checks']['data_quality'] = {
                'status': 'skipped',
                'message': '无法检查数据质量:事实表未定义'
            }
            return

        try:
            # The query a real implementation would execute; kept here for
            # illustration — the numbers below are simulated instead.
            null_check_query = f"""
            SELECT 
                COUNT(*) as total_records,
                COUNT(CASE WHEN sales_amount IS NULL THEN 1 END) as null_sales,
                COUNT(CASE WHEN quantity IS NULL THEN 1 END) as null_quantity
            FROM {fact_table}
            LIMIT 1
            """

            # Simulated query result for this example.
            total_records = 1000000
            null_sales = 100
            null_quantity = 50

            null_ratio = (null_sales + null_quantity) / (total_records * 2)

            # Thresholds: <1% good, <5% acceptable, otherwise poor.
            if null_ratio < 0.01:
                status = 'passed'
                message = f'数据质量良好,空值比例: {null_ratio:.2%}'
            elif null_ratio < 0.05:
                status = 'warning'
                message = f'数据质量一般,空值比例: {null_ratio:.2%}'
            else:
                status = 'failed'
                message = f'数据质量较差,空值比例: {null_ratio:.2%}'

            validation_result['checks']['data_quality'] = {
                'status': status,
                'message': message,
                'details': {
                    'total_records': total_records,
                    'null_sales': null_sales,
                    'null_quantity': null_quantity,
                    'null_ratio': null_ratio
                }
            }

        except Exception as e:
            validation_result['checks']['data_quality'] = {
                'status': 'error',
                'message': f'数据质量检查失败: {str(e)}'
            }

    def check_table_exists(self, table_name):
        """Return True if the Kylin API reports the table as present."""
        try:
            response = self.session.get(
                f"http://{self.kylin_host}/kylin/api/tables/{table_name}"
            )
            return response.status_code == 200
        except Exception:
            # Network/HTTP errors count as "not found" rather than aborting
            # the whole validation run. (Was a bare `except:` before, which
            # also swallowed KeyboardInterrupt/SystemExit.)
            return False

    def get_table_record_count(self, table_name):
        """Return the table's record count (simulated for this example)."""
        # A real implementation would run a COUNT(*) query; this lookup
        # table stands in for it so the example is self-contained.
        table_counts = {
            'default.sales_fact': 1000000,
            'default.date_dim': 3650,
            'default.product_dim': 5000,
            'default.customer_dim': 50000,
            'default.store_dim': 2000
        }
        return table_counts.get(table_name, 0)

    def calculate_overall_status(self, validation_result):
        """Roll per-check statuses up: failed/error > warning > passed."""
        checks = validation_result['checks']

        failed_checks = [name for name, check in checks.items()
                         if check.get('status') == 'failed']
        error_checks = [name for name, check in checks.items()
                        if check.get('status') == 'error']
        warning_checks = [name for name, check in checks.items()
                          if check.get('status') == 'warning']

        if error_checks or failed_checks:
            validation_result['overall_status'] = 'failed'
        elif warning_checks:
            validation_result['overall_status'] = 'warning'
        else:
            validation_result['overall_status'] = 'passed'

    def generate_recommendations(self, validation_result):
        """Append improvement advice derived from the check results."""
        checks = validation_result['checks']
        recommendations = validation_result['recommendations']

        if checks.get('fact_table', {}).get('status') == 'failed':
            recommendations.append('请确保事实表存在且包含数据')

        if checks.get('dimension_tables', {}).get('status') == 'failed':
            recommendations.append('请检查维度表的存在性和数据完整性')

        if checks.get('join_relationships', {}).get('status') == 'failed':
            recommendations.append('请检查表连接关系的主外键配置')

        if checks.get('partition_config', {}).get('status') in ['failed', 'warning']:
            recommendations.append('建议配置分区信息以提高查询性能')

        dimension_details = checks.get('dimensions_and_measures', {}).get('details', {})
        if dimension_details.get('dimension_count', 0) > 30:
            recommendations.append('考虑减少维度数量或使用聚合组优化')

        if checks.get('data_quality', {}).get('status') in ['failed', 'warning']:
            recommendations.append('建议改善数据质量,减少空值和异常数据')

    def print_validation_report(self, validation_result):
        """Pretty-print a validation report dict to stdout."""
        print("\n=== 数据模型验证报告 ===")
        print(f"模型名称: {validation_result['model_name']}")
        print(f"验证时间: {validation_result['validation_time']}")
        print(f"总体状态: {validation_result['overall_status'].upper()}")

        # Per-check lines, tagged with a status icon.
        print("\n📋 检查结果:")
        for check_name, check_result in validation_result['checks'].items():
            status = check_result['status']
            message = check_result['message']

            status_icon = {
                'passed': '✅',
                'warning': '⚠️',
                'failed': '❌',
                'error': '🔴',
                'skipped': '⏭️'
            }.get(status, '❓')

            print(f"  {status_icon} {check_name}: {message}")

        # Advice, if any was generated.
        recommendations = validation_result['recommendations']
        if recommendations:
            print("\n💡 优化建议:")
            for i, rec in enumerate(recommendations, 1):
                print(f"  {i}. {rec}")
        else:
            print("\n✨ 模型配置良好,无需额外优化")

def main():
    """CLI entry point: validate one Kylin model and report the result.

    Exit code: 0 on success, 1 when validation failed, 2 on warnings.
    """
    import argparse

    parser = argparse.ArgumentParser(description='数据模型验证工具')
    parser.add_argument('--host', default='localhost:7070', help='Kylin服务器地址')
    parser.add_argument('--username', default='ADMIN', help='用户名')
    parser.add_argument('--password', default='KYLIN', help='密码')
    parser.add_argument('--model', required=True, help='要验证的模型名称')
    parser.add_argument('--output', help='输出报告文件路径')
    args = parser.parse_args()

    validator = ModelValidator(args.host, args.username, args.password)
    report = validator.validate_model(args.model)

    # Human-readable summary on stdout.
    validator.print_validation_report(report)

    # Optionally persist the full report as JSON (default=str covers
    # datetimes and other non-JSON-native values).
    if args.output:
        with open(args.output, 'w', encoding='utf-8') as fp:
            json.dump(report, fp, indent=2, ensure_ascii=False, default=str)
        print(f"\n📄 详细报告已保存到: {args.output}")

    # Map the overall status onto a process exit code.
    status_to_code = {'failed': 1, 'warning': 2}
    sys.exit(status_to_code.get(report['overall_status'], 0))

# Script entry point (model validation CLI).
if __name__ == "__main__":
    main()

5.3 维度和度量定义

5.3.1 维度设计原则

1. 维度层次设计

维度应该包含自然的层次结构,支持上钻和下钻分析:

{
  "time_dimension": {
    "table": "date_dim",
    "hierarchy": [
      {
        "level": "year",
        "column": "year",
        "name_column": "year_name"
      },
      {
        "level": "quarter",
        "column": "quarter",
        "name_column": "quarter_name"
      },
      {
        "level": "month",
        "column": "month",
        "name_column": "month_name"
      },
      {
        "level": "day",
        "column": "full_date",
        "name_column": "full_date"
      }
    ]
  },
  "product_dimension": {
    "table": "product_dim",
    "hierarchy": [
      {
        "level": "category",
        "column": "category",
        "name_column": "category"
      },
      {
        "level": "subcategory",
        "column": "subcategory",
        "name_column": "subcategory"
      },
      {
        "level": "product",
        "column": "product_id",
        "name_column": "product_name"
      }
    ]
  }
}

2. 维度属性设计

#!/usr/bin/env python3
# dimension_designer.py - 维度设计工具

from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum

class DimensionType(Enum):
    """Kinds of dimensions supported by the designer."""
    NORMAL = "normal"          # regular dimension
    DERIVED = "derived"        # derived dimension
    HIERARCHY = "hierarchy"    # hierarchical dimension
    SLOWLY_CHANGING = "scd"    # slowly changing dimension (SCD)

class DataType(Enum):
    """Logical column data types; mapped to SQL types by the designer."""
    STRING = "string"
    INTEGER = "integer"
    DECIMAL = "decimal"
    DATE = "date"
    TIMESTAMP = "timestamp"
    BOOLEAN = "boolean"

@dataclass
class DimensionAttribute:
    """A single attribute (column) of a dimension.

    The is_* flags mark the attribute's role; parent_attribute links a
    hierarchy level to its parent level by attribute name.
    """
    name: str  # logical attribute name
    column: str  # physical column name
    data_type: DataType
    description: str
    is_key: bool = False  # key column of the dimension
    is_name: bool = False  # human-readable display column
    is_hierarchical: bool = False  # participates in a drill hierarchy
    parent_attribute: Optional[str] = None  # parent level in the hierarchy
    format_pattern: Optional[str] = None  # display format, e.g. "yyyy-MM-dd"
    default_value: Optional[str] = None  # SQL DEFAULT value, if any

@dataclass
class Dimension:
    """A dimension definition: a physical table plus its attributes."""
    name: str
    table: str
    dimension_type: DimensionType
    description: str
    attributes: List[DimensionAttribute]
    primary_key: List[str]
    business_key: Optional[str] = None

    def get_hierarchy_levels(self) -> List[DimensionAttribute]:
        """Return the attributes that form the drill-down hierarchy."""
        levels = []
        for candidate in self.attributes:
            if candidate.is_hierarchical:
                levels.append(candidate)
        return levels

    def get_key_attributes(self) -> List[DimensionAttribute]:
        """Return the key attributes of this dimension."""
        return list(filter(lambda attr: attr.is_key, self.attributes))

    def get_name_attributes(self) -> List[DimensionAttribute]:
        """Return the display-name attributes of this dimension."""
        return [attr for attr in self.attributes if attr.is_name]

class DimensionDesigner:
    """Builds, validates and exports dimension definitions.

    Factory methods produce the standard time / product / customer
    dimensions; validate_dimension() checks a dimension against soft
    design rules; generate_dimension_ddl() and
    export_kylin_model_config() turn dimensions into SQL DDL and Kylin
    model-config snippets.
    """

    def __init__(self):
        self.dimensions = {}
        # Soft limits and requirements applied by validate_dimension().
        self.design_rules = {
            'max_attributes_per_dimension': 20,
            'max_hierarchy_levels': 6,
            'require_business_key': True,
            'require_name_attribute': True
        }

    def create_time_dimension(self) -> Dimension:
        """Create the standard calendar dimension (year > quarter > month > day)."""
        time_attributes = [
            DimensionAttribute(
                name="date_key",
                column="date_key",
                data_type=DataType.INTEGER,
                description="日期键",
                is_key=True
            ),
            DimensionAttribute(
                name="full_date",
                column="full_date",
                data_type=DataType.DATE,
                description="完整日期",
                is_name=True,
                format_pattern="yyyy-MM-dd"
            ),
            DimensionAttribute(
                name="year",
                column="year",
                data_type=DataType.INTEGER,
                description="年份",
                is_hierarchical=True
            ),
            DimensionAttribute(
                name="quarter",
                column="quarter",
                data_type=DataType.INTEGER,
                description="季度",
                is_hierarchical=True,
                parent_attribute="year"
            ),
            DimensionAttribute(
                name="month",
                column="month",
                data_type=DataType.INTEGER,
                description="月份",
                is_hierarchical=True,
                parent_attribute="quarter"
            ),
            DimensionAttribute(
                name="day_of_month",
                column="day_of_month",
                data_type=DataType.INTEGER,
                description="月中的天",
                is_hierarchical=True,
                parent_attribute="month"
            ),
            DimensionAttribute(
                name="day_of_week",
                column="day_of_week",
                data_type=DataType.INTEGER,
                description="星期几"
            ),
            DimensionAttribute(
                name="is_weekend",
                column="is_weekend",
                data_type=DataType.BOOLEAN,
                description="是否周末"
            ),
            DimensionAttribute(
                name="is_holiday",
                column="is_holiday",
                data_type=DataType.BOOLEAN,
                description="是否节假日"
            )
        ]

        return Dimension(
            name="time_dimension",
            table="date_dim",
            dimension_type=DimensionType.HIERARCHY,
            description="时间维度",
            attributes=time_attributes,
            primary_key=["date_key"],
            business_key="full_date"
        )

    def create_product_dimension(self) -> Dimension:
        """Create the product dimension (category > subcategory > product)."""
        product_attributes = [
            DimensionAttribute(
                name="product_key",
                column="product_key",
                data_type=DataType.INTEGER,
                description="产品键",
                is_key=True
            ),
            DimensionAttribute(
                name="product_id",
                column="product_id",
                data_type=DataType.STRING,
                description="产品ID"
            ),
            DimensionAttribute(
                name="product_name",
                column="product_name",
                data_type=DataType.STRING,
                description="产品名称",
                is_name=True
            ),
            DimensionAttribute(
                name="category",
                column="category",
                data_type=DataType.STRING,
                description="产品类别",
                is_hierarchical=True
            ),
            DimensionAttribute(
                name="subcategory",
                column="subcategory",
                data_type=DataType.STRING,
                description="产品子类别",
                is_hierarchical=True,
                parent_attribute="category"
            ),
            DimensionAttribute(
                name="brand",
                column="brand",
                data_type=DataType.STRING,
                description="品牌"
            ),
            DimensionAttribute(
                name="unit_price",
                column="unit_price",
                data_type=DataType.DECIMAL,
                description="单价"
            ),
            DimensionAttribute(
                name="product_status",
                column="product_status",
                data_type=DataType.STRING,
                description="产品状态",
                default_value="Active"
            )
        ]

        return Dimension(
            name="product_dimension",
            table="product_dim",
            dimension_type=DimensionType.HIERARCHY,
            description="产品维度",
            attributes=product_attributes,
            primary_key=["product_key"],
            business_key="product_id"
        )

    def create_customer_dimension(self) -> Dimension:
        """Create the customer dimension (country > state > city), modeled as SCD."""
        customer_attributes = [
            DimensionAttribute(
                name="customer_key",
                column="customer_key",
                data_type=DataType.INTEGER,
                description="客户键",
                is_key=True
            ),
            DimensionAttribute(
                name="customer_id",
                column="customer_id",
                data_type=DataType.STRING,
                description="客户ID"
            ),
            DimensionAttribute(
                name="customer_name",
                column="customer_name",
                data_type=DataType.STRING,
                description="客户姓名",
                is_name=True
            ),
            DimensionAttribute(
                name="gender",
                column="gender",
                data_type=DataType.STRING,
                description="性别"
            ),
            DimensionAttribute(
                name="age_group",
                column="age_group",
                data_type=DataType.STRING,
                description="年龄组"
            ),
            DimensionAttribute(
                name="country",
                column="country",
                data_type=DataType.STRING,
                description="国家",
                is_hierarchical=True
            ),
            DimensionAttribute(
                name="state",
                column="state",
                data_type=DataType.STRING,
                description="州/省",
                is_hierarchical=True,
                parent_attribute="country"
            ),
            DimensionAttribute(
                name="city",
                column="city",
                data_type=DataType.STRING,
                description="城市",
                is_hierarchical=True,
                parent_attribute="state"
            ),
            DimensionAttribute(
                name="customer_segment",
                column="customer_segment",
                data_type=DataType.STRING,
                description="客户细分"
            )
        ]

        return Dimension(
            name="customer_dimension",
            table="customer_dim",
            dimension_type=DimensionType.SLOWLY_CHANGING,
            description="客户维度",
            attributes=customer_attributes,
            primary_key=["customer_key"],
            business_key="customer_id"
        )

    def validate_dimension(self, dimension: Dimension) -> Dict:
        """Validate a dimension against the design rules.

        Returns a dict with is_valid plus lists of warnings (soft-rule
        violations) and errors (hard-rule violations).
        """
        validation_result = {
            'dimension_name': dimension.name,
            'is_valid': True,
            'warnings': [],
            'errors': []
        }

        # Soft rule: attribute count should stay manageable.
        if len(dimension.attributes) > self.design_rules['max_attributes_per_dimension']:
            validation_result['warnings'].append(
                f"属性数量({len(dimension.attributes)})超过建议值"
                f"({self.design_rules['max_attributes_per_dimension']})"
            )

        # Soft rule: hierarchy depth should stay manageable.
        hierarchy_levels = dimension.get_hierarchy_levels()
        if len(hierarchy_levels) > self.design_rules['max_hierarchy_levels']:
            validation_result['warnings'].append(
                f"层次级别({len(hierarchy_levels)})超过建议值"
                f"({self.design_rules['max_hierarchy_levels']})"
            )

        # Hard rule: a business key is required when configured.
        if self.design_rules['require_business_key'] and not dimension.business_key:
            validation_result['errors'].append("缺少业务键")
            validation_result['is_valid'] = False

        # Hard rule: a display-name attribute is required when configured.
        name_attributes = dimension.get_name_attributes()
        if self.design_rules['require_name_attribute'] and not name_attributes:
            validation_result['errors'].append("缺少名称属性")
            validation_result['is_valid'] = False

        # Hard rule: a primary key must be defined.
        if not dimension.primary_key:
            validation_result['errors'].append("缺少主键定义")
            validation_result['is_valid'] = False

        return validation_result

    def generate_dimension_ddl(self, dimension: Dimension) -> str:
        """Generate a CREATE TABLE statement for the dimension table.

        Only the column definitions and the PRIMARY KEY clause are
        comma-separated; the leading comment and the CREATE TABLE line
        are not.  (The previous implementation comma-joined every line,
        which produced invalid SQL such as "CREATE TABLE date_dim (,".)
        """
        body_lines = []
        for attr in dimension.attributes:
            column_def = f"    {attr.column} {self._get_sql_type(attr.data_type)}"

            if attr.is_key:
                column_def += " NOT NULL"

            if attr.default_value:
                column_def += f" DEFAULT '{attr.default_value}'"

            column_def += f" COMMENT '{attr.description}'"
            body_lines.append(column_def)

        if dimension.primary_key:
            pk_columns = ", ".join(dimension.primary_key)
            body_lines.append(f"    PRIMARY KEY ({pk_columns})")

        return (
            f"-- {dimension.description}\n"
            f"CREATE TABLE {dimension.table} (\n"
            + ",\n".join(body_lines)
            + "\n);"
        )

    def _get_sql_type(self, data_type: DataType) -> str:
        """Map a logical DataType to its SQL column type (default VARCHAR(255))."""
        type_mapping = {
            DataType.STRING: "VARCHAR(255)",
            DataType.INTEGER: "INT",
            DataType.DECIMAL: "DECIMAL(19,4)",
            DataType.DATE: "DATE",
            DataType.TIMESTAMP: "TIMESTAMP",
            DataType.BOOLEAN: "BOOLEAN"
        }
        return type_mapping.get(data_type, "VARCHAR(255)")

    def export_kylin_model_config(self, dimensions: List[Dimension]) -> Dict:
        """Export dimension and lookup-table sections of a Kylin model config."""
        model_config = {
            "dimensions": [],
            "lookups": []
        }

        for dimension in dimensions:
            # Dimension section; strip an optional schema prefix from the table name.
            dim_config = {
                "table": dimension.table.split('.')[-1],
                "columns": [attr.column for attr in dimension.attributes if not attr.is_key]
            }
            model_config["dimensions"].append(dim_config)

            # Lookup-table join section.
            lookup_config = {
                "table": dimension.table,
                "kind": "LOOKUP",
                "alias": dimension.table.split('.')[-1],
                "join": {
                    "type": "INNER",
                    "primary_key": dimension.primary_key,
                    # Assumes the fact-table foreign key has the same name
                    # as the dimension primary key — TODO confirm per model.
                    "foreign_key": dimension.primary_key
                }
            }
            model_config["lookups"].append(lookup_config)

        return model_config

def main():
    """Demo: build the sample dimensions, validate them, print DDL and config."""
    designer = DimensionDesigner()

    # Build the three example dimensions.
    dimensions = [
        designer.create_time_dimension(),
        designer.create_product_dimension(),
        designer.create_customer_dimension(),
    ]

    # Validate each dimension and report warnings/errors.
    print("=== 维度验证结果 ===")
    for dimension in dimensions:
        outcome = designer.validate_dimension(dimension)
        print(f"\n{dimension.name}:")
        print(f"  有效性: {'✅' if outcome['is_valid'] else '❌'}")

        if outcome['warnings']:
            print("  警告:")
            for note in outcome['warnings']:
                print(f"    ⚠️ {note}")

        if outcome['errors']:
            print("  错误:")
            for problem in outcome['errors']:
                print(f"    ❌ {problem}")

    # Emit CREATE TABLE statements.
    print("\n=== 生成的DDL ===")
    for dimension in dimensions:
        print(f"\n{designer.generate_dimension_ddl(dimension)}")

    # Emit the Kylin model configuration as JSON.
    kylin_payload = designer.export_kylin_model_config(dimensions)
    print("\n=== Kylin模型配置 ===")
    import json
    print(json.dumps(kylin_payload, indent=2, ensure_ascii=False))

# Script entry point (dimension designer demo).
if __name__ == "__main__":
    main()

5.3.2 度量设计原则

1. 度量类型分类

#!/usr/bin/env python3
# measure_designer.py - 度量设计工具

from dataclasses import dataclass
from typing import List, Dict, Optional, Union
from enum import Enum

class MeasureType(Enum):
    """Additivity classification of a measure."""
    ADDITIVE = "additive"              # additive measure
    SEMI_ADDITIVE = "semi_additive"    # semi-additive measure
    NON_ADDITIVE = "non_additive"      # non-additive measure
    DERIVED = "derived"                # derived measure (computed from others)

class AggregationFunction(Enum):
    """Aggregation functions a measure may use."""
    SUM = "SUM"
    COUNT = "COUNT"
    COUNT_DISTINCT = "COUNT_DISTINCT"
    AVG = "AVG"
    MIN = "MIN"
    MAX = "MAX"
    PERCENTILE = "PERCENTILE"
    TOP_N = "TOP_N"

@dataclass
class MeasureParameter:
    """Input of a measure's aggregation function.

    type selects how value is interpreted: a column name, a constant
    literal, or an expression over other measures.
    """
    type: str  # one of: column, constant, expression
    value: str  # column name, literal, or expression text
    data_type: str = "decimal(19,4)"  # SQL type of the parameter value

@dataclass
class Measure:
    """A measure definition plus its Kylin serialization."""
    name: str
    description: str
    measure_type: MeasureType
    aggregation_function: AggregationFunction
    parameter: MeasureParameter
    return_type: str
    format_pattern: Optional[str] = None
    is_visible: bool = True
    depends_on: Optional[List[str]] = None  # names of measures this one is computed from

    def to_kylin_config(self) -> Dict:
        """Serialize this measure into Kylin's measure-config dict format."""
        parameter_cfg = {
            "type": self.parameter.type,
            "value": self.parameter.value,
        }
        function_cfg = {
            "expression": self.aggregation_function.value,
            "parameter": parameter_cfg,
            "returntype": self.return_type,
        }
        return {"name": self.name, "function": function_cfg}

class MeasureDesigner:
    """Builds, validates, documents and exports cube measure definitions."""

    def __init__(self):
        self.measures = {}
        # Soft limits and requirements applied by validate_measures().
        self.design_rules = {
            'max_measures_per_cube': 50,
            'prefer_additive_measures': True,
            'require_count_measure': True
        }

    def create_basic_measures(self, fact_table: str) -> List[Measure]:
        """Create the standard base measures for the given fact table.

        Args:
            fact_table: fact-table name used to qualify column references.
        """
        basic_measures = []

        # Sales amount — additive measure.
        sales_amount = Measure(
            name="sales_amount",
            description="销售金额",
            measure_type=MeasureType.ADDITIVE,
            aggregation_function=AggregationFunction.SUM,
            parameter=MeasureParameter(
                type="column",
                value=f"{fact_table}.sales_amount",
                data_type="decimal(19,4)"
            ),
            return_type="decimal(19,4)",
            format_pattern="#,##0.00"
        )
        basic_measures.append(sales_amount)

        # Sales quantity — additive measure.
        quantity = Measure(
            name="quantity",
            description="销售数量",
            measure_type=MeasureType.ADDITIVE,
            aggregation_function=AggregationFunction.SUM,
            parameter=MeasureParameter(
                type="column",
                value=f"{fact_table}.quantity",
                data_type="bigint"
            ),
            return_type="bigint",
            format_pattern="#,##0"
        )
        basic_measures.append(quantity)

        # Profit — additive measure.
        profit = Measure(
            name="profit",
            description="利润",
            measure_type=MeasureType.ADDITIVE,
            aggregation_function=AggregationFunction.SUM,
            parameter=MeasureParameter(
                type="column",
                value=f"{fact_table}.profit",
                data_type="decimal(19,4)"
            ),
            return_type="decimal(19,4)",
            format_pattern="#,##0.00"
        )
        basic_measures.append(profit)

        # Transaction count — row-count measure.
        transaction_count = Measure(
            name="transaction_count",
            description="交易次数",
            measure_type=MeasureType.ADDITIVE,
            aggregation_function=AggregationFunction.COUNT,
            parameter=MeasureParameter(
                type="constant",
                value="1",
                data_type="bigint"
            ),
            return_type="bigint",
            format_pattern="#,##0"
        )
        basic_measures.append(transaction_count)

        # Unique customers — distinct-count measure (not fully additive).
        unique_customers = Measure(
            name="unique_customers",
            description="唯一客户数",
            measure_type=MeasureType.SEMI_ADDITIVE,
            aggregation_function=AggregationFunction.COUNT_DISTINCT,
            parameter=MeasureParameter(
                type="column",
                value=f"{fact_table}.customer_key",
                data_type="int"
            ),
            return_type="bigint",
            format_pattern="#,##0"
        )
        basic_measures.append(unique_customers)

        return basic_measures

    def create_derived_measures(self, basic_measures: List[Measure]) -> List[Measure]:
        """Create ratio-style measures derived from the base measures."""
        derived_measures = []

        # Average sales amount per transaction — derived measure.
        avg_sales_amount = Measure(
            name="avg_sales_amount",
            description="平均销售金额",
            measure_type=MeasureType.DERIVED,
            aggregation_function=AggregationFunction.AVG,
            parameter=MeasureParameter(
                type="expression",
                value="sales_amount / transaction_count",
                data_type="decimal(19,4)"
            ),
            return_type="decimal(19,4)",
            format_pattern="#,##0.00",
            depends_on=["sales_amount", "transaction_count"]
        )
        derived_measures.append(avg_sales_amount)

        # Profit margin — derived measure.
        profit_margin = Measure(
            name="profit_margin",
            description="利润率",
            measure_type=MeasureType.DERIVED,
            aggregation_function=AggregationFunction.AVG,
            parameter=MeasureParameter(
                type="expression",
                value="profit / sales_amount",
                data_type="decimal(10,4)"
            ),
            return_type="decimal(10,4)",
            format_pattern="0.00%",
            depends_on=["profit", "sales_amount"]
        )
        derived_measures.append(profit_margin)

        # Average order value per customer — derived measure.
        avg_order_value = Measure(
            name="avg_order_value",
            description="平均客单价",
            measure_type=MeasureType.DERIVED,
            aggregation_function=AggregationFunction.AVG,
            parameter=MeasureParameter(
                type="expression",
                value="sales_amount / unique_customers",
                data_type="decimal(19,4)"
            ),
            return_type="decimal(19,4)",
            format_pattern="#,##0.00",
            depends_on=["sales_amount", "unique_customers"]
        )
        derived_measures.append(avg_order_value)

        return derived_measures

    def validate_measures(self, measures: List[Measure]) -> Dict:
        """Validate a measure set against the design rules.

        Returns a dict with is_valid, warnings, errors and per-type
        statistics.  Safe for an empty measure list (the previous
        implementation raised ZeroDivisionError computing the additive
        ratio when measures was empty).
        """
        validation_result = {
            'is_valid': True,
            'warnings': [],
            'errors': [],
            'statistics': {
                'total_measures': len(measures),
                'additive_measures': 0,
                'semi_additive_measures': 0,
                'non_additive_measures': 0,
                'derived_measures': 0
            }
        }

        # Count measures per additivity type.
        type_to_stat = {
            MeasureType.ADDITIVE: 'additive_measures',
            MeasureType.SEMI_ADDITIVE: 'semi_additive_measures',
            MeasureType.NON_ADDITIVE: 'non_additive_measures',
            MeasureType.DERIVED: 'derived_measures',
        }
        for measure in measures:
            stat_key = type_to_stat.get(measure.measure_type)
            if stat_key:
                validation_result['statistics'][stat_key] += 1

        # Soft rule: keep the measure count manageable.
        if len(measures) > self.design_rules['max_measures_per_cube']:
            validation_result['warnings'].append(
                f"度量数量({len(measures)})超过建议值"
                f"({self.design_rules['max_measures_per_cube']})"
            )

        # Hard rule: a COUNT measure is required when configured.
        has_count_measure = any(
            measure.aggregation_function == AggregationFunction.COUNT
            for measure in measures
        )

        if self.design_rules['require_count_measure'] and not has_count_measure:
            validation_result['errors'].append("缺少计数度量")
            validation_result['is_valid'] = False

        # Hard rule: every dependency of a derived measure must exist.
        measure_names = {measure.name for measure in measures}
        for measure in measures:
            if measure.depends_on:
                missing_dependencies = set(measure.depends_on) - measure_names
                if missing_dependencies:
                    validation_result['errors'].append(
                        f"度量 {measure.name} 依赖的度量不存在: {missing_dependencies}"
                    )
                    validation_result['is_valid'] = False

        # Soft rule: at least half the measures should be additive.
        # Guard against division by zero on an empty measure list.
        if self.design_rules['prefer_additive_measures'] and measures:
            additive_ratio = validation_result['statistics']['additive_measures'] / len(measures)
            if additive_ratio < 0.5:
                validation_result['warnings'].append(
                    f"可加性度量比例较低({additive_ratio:.1%}),建议增加可加性度量"
                )

        return validation_result

    def generate_measure_documentation(self, measures: List[Measure]) -> str:
        """Generate Markdown documentation for a measure set, grouped by type."""
        doc_lines = []
        doc_lines.append("# 度量定义文档\n")

        # Group measures by additivity type, preserving encounter order.
        measures_by_type = {}
        for measure in measures:
            measure_type = measure.measure_type.value
            if measure_type not in measures_by_type:
                measures_by_type[measure_type] = []
            measures_by_type[measure_type].append(measure)

        # Display names for each type heading.
        type_names = {
            'additive': '可加性度量',
            'semi_additive': '半可加性度量',
            'non_additive': '非可加性度量',
            'derived': '派生度量'
        }

        for measure_type, type_measures in measures_by_type.items():
            doc_lines.append(f"## {type_names.get(measure_type, measure_type)}\n")

            for measure in type_measures:
                doc_lines.append(f"### {measure.name}\n")
                doc_lines.append(f"**描述**: {measure.description}\n")
                doc_lines.append(f"**聚合函数**: {measure.aggregation_function.value}\n")
                doc_lines.append(f"**数据源**: {measure.parameter.value}\n")
                doc_lines.append(f"**返回类型**: {measure.return_type}\n")

                if measure.format_pattern:
                    doc_lines.append(f"**格式**: {measure.format_pattern}\n")

                if measure.depends_on:
                    doc_lines.append(f"**依赖度量**: {', '.join(measure.depends_on)}\n")

                doc_lines.append("\n")

        return "\n".join(doc_lines)

    def export_kylin_measures_config(self, measures: List[Measure]) -> List[Dict]:
        """Export Kylin configs for all non-derived measures.

        Derived measures are excluded: they are computed from other
        measures at query time rather than pre-aggregated by Kylin.
        """
        return [
            measure.to_kylin_config()
            for measure in measures
            if measure.measure_type != MeasureType.DERIVED
        ]

def main():
    """Demo: build sample measures, validate them, print docs and config."""
    designer = MeasureDesigner()

    # Base measures plus the ratios derived from them.
    base = designer.create_basic_measures("sales_fact")
    all_measures = base + designer.create_derived_measures(base)

    report = designer.validate_measures(all_measures)
    stats = report['statistics']

    print("=== 度量验证结果 ===")
    print(f"有效性: {'✅' if report['is_valid'] else '❌'}")
    print(f"总度量数: {stats['total_measures']}")
    print(f"可加性度量: {stats['additive_measures']}")
    print(f"半可加性度量: {stats['semi_additive_measures']}")
    print(f"派生度量: {stats['derived_measures']}")

    if report['warnings']:
        print("\n警告:")
        for note in report['warnings']:
            print(f"  ⚠️ {note}")

    if report['errors']:
        print("\n错误:")
        for problem in report['errors']:
            print(f"  ❌ {problem}")

    # Markdown documentation for the measure set.
    documentation = designer.generate_measure_documentation(all_measures)
    print("\n=== 度量文档 ===")
    print(documentation)

    # Kylin measure configuration as JSON.
    exported = designer.export_kylin_measures_config(all_measures)
    print("=== Kylin度量配置 ===")
    import json
    print(json.dumps(exported, indent=2, ensure_ascii=False))

# Script entry point (measure designer demo).
if __name__ == "__main__":
    main()