6.1 Cube构建基础

6.1.1 Cube构建原理

Apache Kylin通过预计算的方式构建Cube:在构建阶段按维度组合(Cuboid)预先聚合并存储结果,查询时直接命中预计算数据,从而提供亚秒级的查询响应。

构建流程概述

graph TD
    A[数据源] --> B[数据抽取]
    B --> C[数据清洗]
    C --> D[维度处理]
    D --> E[事实表处理]
    E --> F[Cuboid计算]
    F --> G[数据存储]
    G --> H[索引构建]
    H --> I[元数据更新]

构建阶段详解

  1. 资源准备阶段

    • 检查数据源连接
    • 验证模型配置
    • 分配计算资源
  2. 数据抽取阶段

    • 从数据源读取增量数据
    • 应用分区过滤条件
    • 数据格式转换
  3. 维度处理阶段

    • 维度表数据加载
    • 维度编码处理
    • 维度字典构建
  4. Cuboid计算阶段

    • 按聚合组计算Cuboid(组合规模的估算见本列表后的示意代码)
    • 应用聚合函数
    • 数据压缩优化
  5. 存储优化阶段

    • HBase表创建
    • 数据分片存储
    • 索引构建
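
Cuboid计算是构建中开销最大的环节:n个维度在不做任何剪枝时会产生2^n个Cuboid,聚合组、必选维度等手段正是为了压缩这一规模。下面用一段示意Python(非Kylin源码)演示该数量关系:

# cuboid_estimate.py - Cuboid组合规模估算(示意代码,非Kylin实现)

def full_cuboid_count(dim_count: int) -> int:
    """不剪枝时的Cuboid总数:每个维度取或不取,共2^n种组合"""
    return 2 ** dim_count

def mandatory_pruned_count(dim_count: int, mandatory_count: int) -> int:
    """m个必选(mandatory)维度出现在所有Cuboid中,剩余维度自由组合,规模降为2^(n-m)"""
    return 2 ** (dim_count - mandatory_count)

if __name__ == "__main__":
    for n in (5, 10, 15):
        print(f"{n}个维度: 全量Cuboid = {full_cuboid_count(n)}, "
              f"设2个必选维度后 = {mandatory_pruned_count(n, 2)}")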

6.1.2 构建策略配置

增量构建配置

{
  "build_strategy": {
    "type": "incremental",
    "partition_column": "order_date",
    "partition_format": "yyyy-MM-dd",
    "merge_threshold": 7,
    "retention_range": 365,
    "auto_merge": true
  },
  "build_settings": {
    "max_build_segments": 20,
    "segment_size_mb": 1024,
    "build_timeout_hours": 24,
    "retry_count": 3
  }
}
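
其中merge_threshold表示自动合并的跨度阈值(天):当相邻小段累计覆盖达到7天时触发一次合并。下面是一个最小的合并规划示意(仅演示阈值语义,非Kylin内部实现):

# merge_planner.py - 依据merge_threshold规划相邻段合并(示意代码)
from datetime import date

def plan_merges(segments, merge_threshold_days=7):
    """segments: 按时间排序且首尾相接的(start, end)日期对列表。
    每当连续小段累计跨度达到阈值,产出一个合并窗口。"""
    plans, window_start, span_days = [], None, 0
    for start, end in segments:
        if window_start is None:
            window_start = start
        span_days += (end - start).days
        if span_days >= merge_threshold_days:
            plans.append((window_start, end))
            window_start, span_days = None, 0
    return plans

# 示例:7个单日段被规划为一个7天的合并窗口
daily = [(date(2024, 1, d), date(2024, 1, d + 1)) for d in range(1, 8)]
print(plan_merges(daily))  # [(datetime.date(2024, 1, 1), datetime.date(2024, 1, 8))]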

全量构建配置

{
  "build_strategy": {
    "type": "full",
    "data_range": {
      "start_date": "2020-01-01",
      "end_date": "2024-12-31"
    },
    "parallel_segments": 4,
    "segment_granularity": "MONTH"
  },
  "optimization": {
    "enable_compression": true,
    "compression_codec": "snappy",
    "enable_dictionary": true,
    "dictionary_threshold": 1000000
  }
}
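
segment_granularity为MONTH时,全量时间范围会按自然月切成多个段,再按parallel_segments并行提交。下面是一个按月切分的示意(字段语义沿用上面的示例配置):

# segment_splitter.py - 按MONTH粒度切分构建时间范围(示意代码)
from datetime import date

def month_segments(start: date, end: date):
    """把[start, end)切成自然月区间,供并行提交构建任务"""
    segments, cur = [], date(start.year, start.month, 1)
    while cur < end:
        nxt = date(cur.year + (cur.month == 12), cur.month % 12 + 1, 1)
        segments.append((max(cur, start), min(nxt, end)))
        cur = nxt
    return segments

print(month_segments(date(2024, 1, 15), date(2024, 4, 1)))
# [(2024-01-15, 2024-02-01), (2024-02-01, 2024-03-01), (2024-03-01, 2024-04-01)]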

6.1.3 构建工具脚本

#!/usr/bin/env python3
# cube_builder.py - Cube构建管理工具

import requests
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import base64

class CubeBuilder:
    """Cube构建管理器"""
    
    def __init__(self, kylin_host: str, username: str, password: str):
        self.kylin_host = kylin_host.rstrip('/')
        self.username = username
        self.password = password
        self.session = requests.Session()
        self.logger = logging.getLogger(__name__)
        
        # 设置认证
        auth_string = f"{username}:{password}"
        auth_bytes = auth_string.encode('ascii')
        auth_b64 = base64.b64encode(auth_bytes).decode('ascii')
        self.session.headers.update({
            'Authorization': f'Basic {auth_b64}',
            'Content-Type': 'application/json'
        })
    
    def authenticate(self) -> bool:
        """认证登录"""
        try:
            url = f"{self.kylin_host}/kylin/api/user/authentication"
            response = self.session.post(url, json={
                'username': self.username,
                'password': self.password
            })
            
            if response.status_code == 200:
                self.logger.info("认证成功")
                return True
            else:
                self.logger.error(f"认证失败: {response.status_code}")
                return False
        except Exception as e:
            self.logger.error(f"认证异常: {e}")
            return False
    
    def get_cube_info(self, cube_name: str) -> Optional[Dict]:
        """获取Cube信息"""
        try:
            url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}"
            response = self.session.get(url)
            
            if response.status_code == 200:
                return response.json()
            else:
                self.logger.error(f"获取Cube信息失败: {response.status_code}")
                return None
        except Exception as e:
            self.logger.error(f"获取Cube信息异常: {e}")
            return None
    
    def build_cube(self, cube_name: str, start_time: str, end_time: str, 
                   build_type: str = "BUILD") -> Optional[str]:
        """构建Cube"""
        try:
            url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}/build"
            
            # 注意:strptime按本地时区解析日期,若Kylin分区时间按UTC处理需自行换算
            build_request = {
                "startTime": int(datetime.strptime(start_time, "%Y-%m-%d").timestamp() * 1000),
                "endTime": int(datetime.strptime(end_time, "%Y-%m-%d").timestamp() * 1000),
                "buildType": build_type
            }
            
            response = self.session.put(url, json=build_request)
            
            if response.status_code == 200:
                job_info = response.json()
                job_id = job_info.get('uuid')
                self.logger.info(f"构建任务已提交: {job_id}")
                return job_id
            else:
                self.logger.error(f"构建任务提交失败: {response.status_code}")
                return None
        except Exception as e:
            self.logger.error(f"构建任务提交异常: {e}")
            return None
    
    def get_job_status(self, job_id: str) -> Optional[Dict]:
        """获取任务状态"""
        try:
            url = f"{self.kylin_host}/kylin/api/jobs/{job_id}"
            response = self.session.get(url)
            
            if response.status_code == 200:
                return response.json()
            else:
                self.logger.error(f"获取任务状态失败: {response.status_code}")
                return None
        except Exception as e:
            self.logger.error(f"获取任务状态异常: {e}")
            return None
    
    def wait_for_job_completion(self, job_id: str, timeout_minutes: int = 60) -> bool:
        """等待任务完成"""
        start_time = time.time()
        timeout_seconds = timeout_minutes * 60
        
        while time.time() - start_time < timeout_seconds:
            job_status = self.get_job_status(job_id)
            
            if job_status:
                status = job_status.get('job_status')
                progress = job_status.get('progress', 0)
                
                self.logger.info(f"任务状态: {status}, 进度: {progress}%")
                
                if status == 'FINISHED':
                    self.logger.info("任务完成")
                    return True
                elif status in ['ERROR', 'DISCARDED']:
                    self.logger.error(f"任务失败: {status}")
                    return False
            
            time.sleep(30)  # 等待30秒后再次检查
        
        self.logger.error("任务超时")
        return False
    
    def purge_cube(self, cube_name: str) -> bool:
        """清空Cube数据"""
        try:
            url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}/purge"
            response = self.session.put(url)
            
            if response.status_code == 200:
                self.logger.info(f"Cube {cube_name} 已清空")
                return True
            else:
                self.logger.error(f"清空Cube失败: {response.status_code}")
                return False
        except Exception as e:
            self.logger.error(f"清空Cube异常: {e}")
            return False
    
    def delete_segment(self, cube_name: str, segment_name: str) -> bool:
        """删除Cube段"""
        try:
            url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}/segs/{segment_name}"
            response = self.session.delete(url)
            
            if response.status_code == 200:
                self.logger.info(f"段 {segment_name} 已删除")
                return True
            else:
                self.logger.error(f"删除段失败: {response.status_code}")
                return False
        except Exception as e:
            self.logger.error(f"删除段异常: {e}")
            return False
    
    def merge_segments(self, cube_name: str, start_time: str, end_time: str) -> Optional[str]:
        """合并Cube段"""
        try:
            url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}/merge"
            
            merge_request = {
                "startTime": int(datetime.strptime(start_time, "%Y-%m-%d").timestamp() * 1000),
                "endTime": int(datetime.strptime(end_time, "%Y-%m-%d").timestamp() * 1000)
            }
            
            response = self.session.put(url, json=merge_request)
            
            if response.status_code == 200:
                job_info = response.json()
                job_id = job_info.get('uuid')
                self.logger.info(f"合并任务已提交: {job_id}")
                return job_id
            else:
                self.logger.error(f"合并任务提交失败: {response.status_code}")
                return None
        except Exception as e:
            self.logger.error(f"合并任务提交异常: {e}")
            return None
    
    def refresh_segment(self, cube_name: str, start_time: str, end_time: str) -> Optional[str]:
        """刷新Cube段"""
        try:
            url = f"{self.kylin_host}/kylin/api/cubes/{cube_name}/refresh"
            
            refresh_request = {
                "startTime": int(datetime.strptime(start_time, "%Y-%m-%d").timestamp() * 1000),
                "endTime": int(datetime.strptime(end_time, "%Y-%m-%d").timestamp() * 1000)
            }
            
            response = self.session.put(url, json=refresh_request)
            
            if response.status_code == 200:
                job_info = response.json()
                job_id = job_info.get('uuid')
                self.logger.info(f"刷新任务已提交: {job_id}")
                return job_id
            else:
                self.logger.error(f"刷新任务提交失败: {response.status_code}")
                return None
        except Exception as e:
            self.logger.error(f"刷新任务提交异常: {e}")
            return None
    
    def get_cube_segments(self, cube_name: str) -> List[Dict]:
        """获取Cube段信息"""
        cube_info = self.get_cube_info(cube_name)
        if cube_info:
            return cube_info.get('segments', [])
        return []
    
    def find_expired_segments(self, cube_name: str, retention_days: int = 365) -> List[Dict]:
        """查找过期段"""
        segments = self.get_cube_segments(cube_name)
        expired_segments = []
        
        cutoff_time = datetime.now() - timedelta(days=retention_days)
        cutoff_timestamp = int(cutoff_time.timestamp() * 1000)
        
        for segment in segments:
            if segment.get('date_range_end', 0) < cutoff_timestamp:
                expired_segments.append(segment)
        
        return expired_segments
    
    def auto_build_incremental(self, cube_name: str, days_back: int = 1) -> Optional[str]:
        """自动增量构建"""
        end_date = datetime.now().date()
        start_date = end_date - timedelta(days=days_back)
        
        start_time = start_date.strftime("%Y-%m-%d")
        end_time = end_date.strftime("%Y-%m-%d")
        
        self.logger.info(f"开始增量构建: {start_time} 到 {end_time}")
        return self.build_cube(cube_name, start_time, end_time, "BUILD")
    
    def generate_build_report(self, cube_name: str) -> Dict:
        """生成构建报告"""
        cube_info = self.get_cube_info(cube_name)
        if not cube_info:
            return {"error": "无法获取Cube信息"}
        
        segments = cube_info.get('segments', [])
        
        # 统计信息
        total_segments = len(segments)
        total_size = sum(segment.get('size_kb', 0) for segment in segments)
        
        # 状态统计
        status_count = {}
        for segment in segments:
            status = segment.get('status', 'UNKNOWN')
            status_count[status] = status_count.get(status, 0) + 1
        
        # 最新段信息
        latest_segment = None
        if segments:
            latest_segment = max(segments, key=lambda s: s.get('date_range_end', 0))
        
        return {
            "cube_name": cube_name,
            "report_time": datetime.now().isoformat(),
            "summary": {
                "total_segments": total_segments,
                "total_size_mb": round(total_size / 1024, 2),
                "status_distribution": status_count
            },
            "latest_segment": {
                "name": latest_segment.get('name') if latest_segment else None,
                "status": latest_segment.get('status') if latest_segment else None,
                "size_mb": round(latest_segment.get('size_kb', 0) / 1024, 2) if latest_segment else 0,
                "build_time": latest_segment.get('last_build_time') if latest_segment else None
            },
            "segments": segments
        }
    
    def print_build_report(self, cube_name: str):
        """打印构建报告"""
        report = self.generate_build_report(cube_name)
        
        if "error" in report:
            print(f"❌ 错误: {report['error']}")
            return
        
        print(f"\n=== Cube构建报告 ===")
        print(f"Cube名称: {report['cube_name']}")
        print(f"报告时间: {report['report_time']}")
        
        summary = report['summary']
        print(f"\n📊 概览:")
        print(f"  总段数: {summary['total_segments']}")
        print(f"  总大小: {summary['total_size_mb']} MB")
        
        print(f"\n📈 状态分布:")
        for status, count in summary['status_distribution'].items():
            print(f"  {status}: {count}")
        
        latest = report['latest_segment']
        if latest['name']:
            print(f"\n🕒 最新段:")
            print(f"  名称: {latest['name']}")
            print(f"  状态: {latest['status']}")
            print(f"  大小: {latest['size_mb']} MB")
            print(f"  构建时间: {latest['build_time']}")

def main():
    # 示例使用
    builder = CubeBuilder(
        kylin_host="http://localhost:7070",
        username="ADMIN",
        password="KYLIN"
    )
    
    # 认证
    if not builder.authenticate():
        print("认证失败")
        return
    
    cube_name = "sales_cube"
    
    # 获取构建报告
    builder.print_build_report(cube_name)
    
    # 自动增量构建
    job_id = builder.auto_build_incremental(cube_name)
    if job_id:
        print(f"\n构建任务已提交: {job_id}")
        
        # 等待完成
        if builder.wait_for_job_completion(job_id, timeout_minutes=30):
            print("构建完成")
            builder.print_build_report(cube_name)
        else:
            print("构建失败或超时")

if __name__ == "__main__":
    main()
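
main()只演示了报告与增量构建;配合find_expired_segments与delete_segment还可以做定期的过期段清理,例如(假设builder已按上文方式创建并完成认证):

# 过期段清理示意(假设builder已认证)
expired = builder.find_expired_segments("sales_cube", retention_days=365)
for seg in expired:
    name = seg.get("name")
    if name and builder.delete_segment("sales_cube", name):
        print(f"已删除过期段: {name}")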

6.2 构建优化策略

6.2.1 并行构建优化

MapReduce参数调优

#!/bin/bash
# optimize_build_performance.sh - 构建性能优化脚本

# 设置Hadoop配置
echo "配置Hadoop参数..."

# MapReduce内存配置
export HADOOP_HEAPSIZE=4096
export YARN_HEAPSIZE=4096

# MapReduce任务配置
hadoop_conf_dir="/etc/hadoop/conf"

# 创建mapred-site.xml配置(注意:将覆盖原有文件,建议先备份)
cat > "$hadoop_conf_dir/mapred-site.xml" << EOF
<?xml version="1.0"?>
<configuration>
    <!-- Map任务配置 -->
    <property>
        <name>mapreduce.map.memory.mb</name>
        <value>4096</value>
    </property>
    <property>
        <name>mapreduce.map.java.opts</name>
        <value>-Xmx3276m</value>
    </property>
    <property>
        <name>mapreduce.map.cpu.vcores</name>
        <value>2</value>
    </property>
    
    <!-- Reduce任务配置 -->
    <property>
        <name>mapreduce.reduce.memory.mb</name>
        <value>8192</value>
    </property>
    <property>
        <name>mapreduce.reduce.java.opts</name>
        <value>-Xmx6553m</value>
    </property>
    <property>
        <name>mapreduce.reduce.cpu.vcores</name>
        <value>4</value>
    </property>
    
    <!-- 并行度配置 -->
    <property>
        <name>mapreduce.job.maps</name>
        <value>20</value>
    </property>
    <property>
        <name>mapreduce.job.reduces</name>
        <value>10</value>
    </property>
    
    <!-- 压缩配置 -->
    <property>
        <name>mapreduce.map.output.compress</name>
        <value>true</value>
    </property>
    <property>
        <name>mapreduce.map.output.compress.codec</name>
        <value>org.apache.hadoop.io.compress.SnappyCodec</value>
    </property>
    
    <!-- 输出压缩 -->
    <property>
        <name>mapreduce.output.fileoutputformat.compress</name>
        <value>true</value>
    </property>
    <property>
        <name>mapreduce.output.fileoutputformat.compress.codec</name>
        <value>org.apache.hadoop.io.compress.SnappyCodec</value>
    </property>
</configuration>
EOF

# 创建yarn-site.xml配置(同样会覆盖原文件,建议先备份)
cat > "$hadoop_conf_dir/yarn-site.xml" << EOF
<?xml version="1.0"?>
<configuration>
    <!-- 资源管理配置 -->
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>32768</value>
    </property>
    <property>
        <name>yarn.nodemanager.resource.cpu-vcores</name>
        <value>16</value>
    </property>
    
    <!-- 调度器配置 -->
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>16384</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-vcores</name>
        <value>8</value>
    </property>
    
    <!-- 容器配置 -->
    <property>
        <name>yarn.nodemanager.vmem-pmem-ratio</name>
        <value>4</value>
    </property>
</configuration>
EOF

echo "Hadoop配置完成"

# 设置Kylin构建参数
echo "配置Kylin构建参数..."

kylin_conf_dir="/opt/kylin/conf"

# 创建kylin.properties配置
cat >> "$kylin_conf_dir/kylin.properties" << EOF

# 构建引擎配置(2 = MapReduce引擎,4 = Spark引擎)
kylin.engine.default=2
kylin.engine.mr.config-override.mapreduce.map.memory.mb=4096
kylin.engine.mr.config-override.mapreduce.reduce.memory.mb=8192
kylin.engine.mr.config-override.mapreduce.map.java.opts=-Xmx3276m
kylin.engine.mr.config-override.mapreduce.reduce.java.opts=-Xmx6553m

# 并行度配置
kylin.engine.mr.config-override.mapreduce.job.split.metainfo.maxsize=268435456
kylin.engine.mr.config-override.mapreduce.input.fileinputformat.split.maxsize=268435456
kylin.engine.mr.config-override.mapreduce.input.fileinputformat.split.minsize=134217728

# 压缩配置
kylin.engine.mr.config-override.mapreduce.map.output.compress=true
kylin.engine.mr.config-override.mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
kylin.engine.mr.config-override.mapreduce.output.fileoutputformat.compress=true
kylin.engine.mr.config-override.mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec

# HBase配置
kylin.storage.hbase.compression-codec=snappy
kylin.storage.hbase.region-cut-gb=5
kylin.storage.hbase.hfile-size-gb=2

# 构建优化
kylin.cube.algorithm=auto
kylin.cube.algorithm.layer-or-inmem-threshold=7
kylin.cube.aggrgroup.max-combination=32768

# 内存优化
kylin.cube.algorithm.inmem-split-limit=500
kylin.job.mapreduce.default.reduce.input.mb=500
kylin.job.mapreduce.max.reducer.number=500

# 超时配置
kylin.job.timeout=86400
kylin.job.step.timeout=3600
kylin.job.retry=3
EOF

echo "Kylin配置完成"

# 重启服务
echo "重启Kylin服务..."
sudo systemctl restart kylin

echo "构建性能优化配置完成"

6.2.2 存储优化

HBase优化配置

#!/usr/bin/env python3
# hbase_optimizer.py - HBase存储优化工具

import subprocess
import json
import logging
from datetime import datetime
from typing import Dict, List

class HBaseOptimizer:
    """HBase存储优化器"""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
    
    def optimize_hbase_config(self) -> Dict:
        """优化HBase配置"""
        config = {
            "hbase-site.xml": {
                # 内存配置
                "hbase.regionserver.global.memstore.size": "0.4",
                "hbase.regionserver.global.memstore.size.lower.limit": "0.38",
                "hbase.hregion.memstore.flush.size": "134217728",  # 128MB
                
                # 压缩配置
                "hbase.hregion.majorcompaction": "604800000",  # 7天
                "hbase.hstore.compactionThreshold": "3",
                "hbase.hstore.compaction.max": "10",
                "hbase.hstore.compaction.min": "2",
                
                # 块缓存配置
                "hfile.block.cache.size": "0.4",
                "hbase.bucketcache.ioengine": "offheap",
                "hbase.bucketcache.size": "8192",  # 8GB
                
                # 写入优化
                "hbase.hregion.max.filesize": "10737418240",  # 10GB
                "hbase.hstore.blockingStoreFiles": "16",
                "hbase.hstore.blockingWaitTime": "90000",
                
                # 读取优化
                "hbase.client.scanner.caching": "1000",
                "hbase.client.scanner.timeout.period": "600000",
                "hbase.rpc.timeout": "600000",
                
                # 并发配置
                "hbase.regionserver.handler.count": "100",
                "hbase.regionserver.metahandler.count": "20",
                "hbase.htable.threads.max": "96",
                
                # WAL配置
                "hbase.regionserver.hlog.blocksize": "134217728",  # 128MB
                "hbase.regionserver.maxlogs": "32",
                "hbase.wal.provider": "multiwal",
                
                # 分裂配置
                "hbase.hregion.split.policy": "org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy",
                "hbase.increasing.policy.initial.size": "268435456",  # 256MB
            },
            
            "hbase-env.sh": {
                "HBASE_HEAPSIZE": "16384m",
                "HBASE_OFFHEAPSIZE": "8192m",
                "HBASE_OPTS": "-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+UnlockExperimentalVMOptions -XX:G1NewSizePercent=20 -XX:G1MaxNewSizePercent=40 -XX:+DisableExplicitGC"
            }
        }
        
        return config
    
    def create_optimized_table(self, table_name: str, column_families: List[str]) -> str:
        """创建优化的HBase表"""
        hbase_shell_commands = []
        
        # 禁用表(如果存在)
        hbase_shell_commands.append(f"disable '{table_name}'")
        hbase_shell_commands.append(f"drop '{table_name}'")
        
        # 创建表定义
        cf_definitions = []
        for cf in column_families:
            cf_def = f"'{cf}' => {{" \
                    f"COMPRESSION => 'SNAPPY', " \
                    f"BLOOMFILTER => 'ROW', " \
                    f"BLOCKSIZE => '65536', " \
                    f"BLOCKCACHE => 'true', " \
                    f"DATA_BLOCK_ENCODING => 'FAST_DIFF', " \
                    f"TTL => '31536000'}}"  # 1年TTL
            cf_definitions.append(cf_def)
        
        create_cmd = f"create '{table_name}', {', '.join(cf_definitions)}, " \
                    f"{{SPLITS => {self._generate_splits()}}"
        
        hbase_shell_commands.append(create_cmd)
        
        # 设置表属性
        hbase_shell_commands.extend([
            f"alter '{table_name}', {{MAX_FILESIZE => '10737418240'}}",  # 10GB
            f"alter '{table_name}', {{MEMSTORE_FLUSHSIZE => '134217728'}}",  # 128MB
            f"alter '{table_name}', {{COMPACTION_ENABLED => 'true'}}",
        ])
        
        return "\n".join(hbase_shell_commands)
    
    def _generate_splits(self) -> str:
        """生成预分裂键"""
        splits = []
        for i in range(1, 16):  # 15个分裂键,将表预分为16个Region
            split_key = f"'{i:02d}'"
            splits.append(split_key)
        return f"[{', '.join(splits)}]"
    
    def monitor_compaction(self, table_name: str) -> Dict:
        """监控压缩状态"""
        try:
            # 获取表状态
            cmd = f"echo \"status '{table_name}'\" | hbase shell -n"
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            
            # 解析输出
            output_lines = result.stdout.split('\n')
            
            compaction_info = {
                "table_name": table_name,
                "regions": [],
                "total_store_files": 0,
                "total_size_mb": 0,
                "compaction_needed": False
            }
            
            # 简化的解析逻辑
            for line in output_lines:
                if "storefiles" in line.lower():
                    # 提取store files数量
                    parts = line.split()
                    for i, part in enumerate(parts):
                        if "storefiles" in part.lower() and i > 0:
                            try:
                                store_files = int(parts[i-1])
                                compaction_info["total_store_files"] += store_files
                                if store_files > 10:  # 阈值
                                    compaction_info["compaction_needed"] = True
                            except ValueError:
                                pass
            
            return compaction_info
            
        except Exception as e:
            self.logger.error(f"监控压缩状态异常: {e}")
            return {"error": str(e)}
    
    def trigger_major_compaction(self, table_name: str) -> bool:
        """触发主压缩"""
        try:
            cmd = f"echo \"major_compact '{table_name}'\" | hbase shell -n"
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            
            if result.returncode == 0:
                self.logger.info(f"已触发表 {table_name} 的主压缩")
                return True
            else:
                self.logger.error(f"触发主压缩失败: {result.stderr}")
                return False
                
        except Exception as e:
            self.logger.error(f"触发主压缩异常: {e}")
            return False
    
    def optimize_region_splits(self, table_name: str) -> List[str]:
        """优化Region分裂"""
        recommendations = []
        
        try:
            # 获取表的Region信息
            cmd = f"echo \"list_regions '{table_name}'\" | hbase shell -n"
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            
            if result.returncode == 0:
                output_lines = result.stdout.split('\n')
                region_count = len([line for line in output_lines if table_name in line])
                
                if region_count < 10:
                    recommendations.append("考虑增加预分裂数量以提高并行度")
                elif region_count > 100:
                    recommendations.append("Region数量过多,考虑增大Region大小")
                
                recommendations.append(f"当前Region数量: {region_count}")
            
        except Exception as e:
            self.logger.error(f"分析Region分裂异常: {e}")
            recommendations.append(f"分析失败: {e}")
        
        return recommendations
    
    def generate_optimization_report(self, table_name: str) -> Dict:
        """生成优化报告"""
        report = {
            "table_name": table_name,
            "timestamp": "2024-01-15T10:00:00Z",
            "compaction_status": self.monitor_compaction(table_name),
            "split_recommendations": self.optimize_region_splits(table_name),
            "configuration_recommendations": []
        }
        
        # 配置建议
        config_recommendations = [
            "启用Snappy压缩以减少存储空间",
            "配置适当的块缓存大小",
            "使用ROW级别的布隆过滤器",
            "启用数据块编码",
            "设置合适的TTL值",
            "配置预分裂以提高写入性能"
        ]
        
        report["configuration_recommendations"] = config_recommendations
        
        return report
    
    def print_optimization_report(self, table_name: str):
        """打印优化报告"""
        report = self.generate_optimization_report(table_name)
        
        print(f"\n=== HBase存储优化报告 ===")
        print(f"表名: {report['table_name']}")
        print(f"时间: {report['timestamp']}")
        
        # 压缩状态
        compaction = report['compaction_status']
        if "error" not in compaction:
            print(f"\n📊 压缩状态:")
            print(f"  Store Files总数: {compaction['total_store_files']}")
            print(f"  需要压缩: {'是' if compaction['compaction_needed'] else '否'}")
        
        # 分裂建议
        if report['split_recommendations']:
            print(f"\n🔧 分裂建议:")
            for rec in report['split_recommendations']:
                print(f"  - {rec}")
        
        # 配置建议
        print(f"\n💡 配置建议:")
        for rec in report['configuration_recommendations']:
            print(f"  - {rec}")

def main():
    optimizer = HBaseOptimizer()
    
    # 生成优化配置
    config = optimizer.optimize_hbase_config()
    print("HBase优化配置:")
    print(json.dumps(config, indent=2, ensure_ascii=False))
    
    # 生成表创建脚本
    table_script = optimizer.create_optimized_table(
        "kylin_sales_cube", 
        ["F1", "F2", "F3"]
    )
    print(f"\n表创建脚本:\n{table_script}")
    
    # 生成优化报告
    optimizer.print_optimization_report("kylin_sales_cube")

if __name__ == "__main__":
    main()
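
create_optimized_table生成的只是HBase Shell命令文本;要真正执行,可以仿照monitor_compaction的做法,把脚本通过标准输入交给hbase shell。下面是一个示意(假设hbase命令已在PATH中):

# 将生成的HBase Shell脚本交给hbase shell执行(示意,假设hbase在PATH中)
import subprocess

def run_hbase_script(script_text: str) -> bool:
    result = subprocess.run(
        ["hbase", "shell", "-n"],
        input=script_text, capture_output=True, text=True
    )
    if result.returncode != 0:
        print(f"执行失败: {result.stderr}")
    return result.returncode == 0

# 用法: run_hbase_script(optimizer.create_optimized_table("kylin_sales_cube", ["F1"]))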

6.3 构建监控与管理

6.3.1 构建监控系统

#!/usr/bin/env python3
# build_monitor.py - Cube构建监控系统

import time
import json
import logging
import smtplib
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests
import threading

class BuildMonitor:
    """Cube构建监控器"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.monitoring = False
        self.alert_rules = config.get('alert_rules', {})
        self.notification_config = config.get('notifications', {})
        
        # 监控状态
        self.job_status_cache = {}
        self.alert_history = []
    
    def start_monitoring(self):
        """启动监控"""
        self.monitoring = True
        self.logger.info("构建监控已启动")
        
        # 启动监控线程
        monitor_thread = threading.Thread(target=self._monitor_loop)
        monitor_thread.daemon = True
        monitor_thread.start()
    
    def stop_monitoring(self):
        """停止监控"""
        self.monitoring = False
        self.logger.info("构建监控已停止")
    
    def _monitor_loop(self):
        """监控循环"""
        while self.monitoring:
            try:
                self._check_running_jobs()
                self._check_failed_jobs()
                self._check_long_running_jobs()
                self._check_resource_usage()
                
                time.sleep(self.config.get('check_interval', 60))  # 默认60秒检查一次
                
            except Exception as e:
                self.logger.error(f"监控循环异常: {e}")
                time.sleep(30)
    
    def _check_running_jobs(self):
        """检查运行中的任务"""
        try:
            # 获取运行中的任务
            running_jobs = self._get_running_jobs()
            
            for job in running_jobs:
                job_id = job.get('uuid')
                job_name = job.get('job_name', 'unknown')
                progress = job.get('progress', 0)
                
                # 更新缓存
                if job_id not in self.job_status_cache:
                    self.job_status_cache[job_id] = {
                        'start_time': datetime.now(),
                        'last_progress': 0,
                        'stuck_count': 0
                    }
                
                cache_entry = self.job_status_cache[job_id]
                
                # 检查进度是否停滞
                if progress == cache_entry['last_progress']:
                    cache_entry['stuck_count'] += 1
                    
                    # 进度停滞告警
                    if cache_entry['stuck_count'] >= self.alert_rules.get('stuck_threshold', 10):
                        self._send_alert(
                            'PROGRESS_STUCK',
                            f"任务 {job_name} 进度停滞",
                            f"任务ID: {job_id}\n当前进度: {progress}%\n停滞时间: {cache_entry['stuck_count']} 分钟"
                        )
                else:
                    cache_entry['last_progress'] = progress
                    cache_entry['stuck_count'] = 0
                
                self.logger.info(f"任务 {job_name} 进度: {progress}%")
                
        except Exception as e:
            self.logger.error(f"检查运行任务异常: {e}")
    
    def _check_failed_jobs(self):
        """检查失败的任务"""
        try:
            # 获取最近失败的任务
            failed_jobs = self._get_failed_jobs()
            
            for job in failed_jobs:
                job_id = job.get('uuid')
                job_name = job.get('job_name', 'unknown')
                error_msg = job.get('error_message', '未知错误')
                
                # 发送失败告警
                self._send_alert(
                    'JOB_FAILED',
                    f"任务 {job_name} 执行失败",
                    f"任务ID: {job_id}\n错误信息: {error_msg}\n失败时间: {datetime.now()}"
                )
                
        except Exception as e:
            self.logger.error(f"检查失败任务异常: {e}")
    
    def _check_long_running_jobs(self):
        """检查长时间运行的任务"""
        try:
            max_duration = self.alert_rules.get('max_job_duration_hours', 24)
            
            for job_id, cache_entry in self.job_status_cache.items():
                duration = datetime.now() - cache_entry['start_time']
                
                if duration.total_seconds() > max_duration * 3600:
                    self._send_alert(
                        'LONG_RUNNING',
                        f"任务运行时间过长",
                        f"任务ID: {job_id}\n运行时间: {duration}\n超时阈值: {max_duration}小时"
                    )
                    
        except Exception as e:
            self.logger.error(f"检查长时间运行任务异常: {e}")
    
    def _check_resource_usage(self):
        """检查资源使用情况"""
        try:
            # 检查YARN资源使用
            yarn_usage = self._get_yarn_usage()
            
            memory_usage = yarn_usage.get('memory_usage_percent', 0)
            cpu_usage = yarn_usage.get('cpu_usage_percent', 0)
            
            # 内存使用告警
            if memory_usage > self.alert_rules.get('memory_threshold', 80):
                self._send_alert(
                    'HIGH_MEMORY_USAGE',
                    f"YARN内存使用率过高",
                    f"当前内存使用率: {memory_usage}%\n阈值: {self.alert_rules.get('memory_threshold', 80)}%"
                )
            
            # CPU使用告警
            if cpu_usage > self.alert_rules.get('cpu_threshold', 80):
                self._send_alert(
                    'HIGH_CPU_USAGE',
                    f"YARN CPU使用率过高",
                    f"当前CPU使用率: {cpu_usage}%\n阈值: {self.alert_rules.get('cpu_threshold', 80)}%"
                )
                
        except Exception as e:
            self.logger.error(f"检查资源使用异常: {e}")
    
    def _get_running_jobs(self) -> List[Dict]:
        """获取运行中的任务"""
        # 模拟API调用;实际环境中通常调用Kylin的任务列表REST接口获取(见本节末尾的示意)
        return [
            {
                'uuid': 'job-001',
                'job_name': 'sales_cube_build',
                'progress': 45,
                'status': 'RUNNING'
            }
        ]
    
    def _get_failed_jobs(self) -> List[Dict]:
        """获取失败的任务"""
        # 模拟API调用;实际环境中可按失败状态过滤任务列表获取
        return []
    
    def _get_yarn_usage(self) -> Dict:
        """获取YARN资源使用情况"""
        # 模拟资源监控;实际环境中可调用YARN ResourceManager的REST接口获取集群指标
        return {
            'memory_usage_percent': 65,
            'cpu_usage_percent': 45,
            'available_memory_mb': 8192,
            'used_memory_mb': 5324
        }
    
    def _send_alert(self, alert_type: str, subject: str, message: str):
        """发送告警"""
        # 检查告警频率限制
        if self._should_suppress_alert(alert_type):
            return
        
        # 记录告警历史
        alert_record = {
            'type': alert_type,
            'subject': subject,
            'message': message,
            'timestamp': datetime.now()
        }
        self.alert_history.append(alert_record)
        
        # 发送邮件告警
        if self.notification_config.get('email', {}).get('enabled', False):
            self._send_email_alert(subject, message)
        
        # 发送钉钉告警
        if self.notification_config.get('dingtalk', {}).get('enabled', False):
            self._send_dingtalk_alert(subject, message)
        
        self.logger.warning(f"告警: {subject} - {message}")
    
    def _should_suppress_alert(self, alert_type: str) -> bool:
        """检查是否应该抑制告警"""
        suppress_minutes = self.alert_rules.get('alert_suppress_minutes', 30)
        cutoff_time = datetime.now() - timedelta(minutes=suppress_minutes)
        
        # 检查最近是否有相同类型的告警
        recent_alerts = [
            alert for alert in self.alert_history
            if alert['type'] == alert_type and alert['timestamp'] > cutoff_time
        ]
        
        return len(recent_alerts) > 0
    
    def _send_email_alert(self, subject: str, message: str):
        """发送邮件告警"""
        try:
            email_config = self.notification_config['email']
            
            msg = MIMEMultipart()
            msg['From'] = email_config['from']
            msg['To'] = ', '.join(email_config['to'])
            msg['Subject'] = f"[Kylin告警] {subject}"
            
            body = f"""
            Kylin构建监控告警
            
            时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
            类型: {subject}
            
            详细信息:
            {message}
            
            请及时处理。
            """
            
            msg.attach(MIMEText(body, 'plain', 'utf-8'))
            
            server = smtplib.SMTP(email_config['smtp_host'], email_config['smtp_port'])
            if email_config.get('use_tls', True):
                server.starttls()
            
            if email_config.get('username') and email_config.get('password'):
                server.login(email_config['username'], email_config['password'])
            
            server.send_message(msg)
            server.quit()
            
            self.logger.info("邮件告警发送成功")
            
        except Exception as e:
            self.logger.error(f"发送邮件告警失败: {e}")
    
    def _send_dingtalk_alert(self, subject: str, message: str):
        """发送钉钉告警"""
        try:
            dingtalk_config = self.notification_config['dingtalk']
            webhook_url = dingtalk_config['webhook_url']
            
            payload = {
                "msgtype": "text",
                "text": {
                    "content": f"Kylin构建告警\n\n{subject}\n\n{message}"
                },
                "at": {
                    "atMobiles": dingtalk_config.get('at_mobiles', []),
                    "isAtAll": dingtalk_config.get('at_all', False)
                }
            }
            
            response = requests.post(webhook_url, json=payload)
            
            if response.status_code == 200:
                self.logger.info("钉钉告警发送成功")
            else:
                self.logger.error(f"钉钉告警发送失败: {response.status_code}")
                
        except Exception as e:
            self.logger.error(f"发送钉钉告警失败: {e}")
    
    def get_monitoring_status(self) -> Dict:
        """获取监控状态"""
        return {
            "monitoring_active": self.monitoring,
            "active_jobs": len(self.job_status_cache),
            "recent_alerts": len([
                alert for alert in self.alert_history
                if alert['timestamp'] > datetime.now() - timedelta(hours=24)
            ]),
            "last_check": datetime.now().isoformat()
        }
    
    def generate_monitoring_report(self) -> Dict:
        """生成监控报告"""
        now = datetime.now()
        last_24h = now - timedelta(hours=24)
        
        # 统计最近24小时的告警
        recent_alerts = [
            alert for alert in self.alert_history
            if alert['timestamp'] > last_24h
        ]
        
        alert_by_type = {}
        for alert in recent_alerts:
            alert_type = alert['type']
            alert_by_type[alert_type] = alert_by_type.get(alert_type, 0) + 1
        
        return {
            "report_time": now.isoformat(),
            "monitoring_period": "24小时",
            "summary": {
                "total_alerts": len(recent_alerts),
                "active_jobs": len(self.job_status_cache),
                "monitoring_status": "运行中" if self.monitoring else "已停止"
            },
            "alert_distribution": alert_by_type,
            "recent_alerts": [
                {
                    "type": alert['type'],
                    "subject": alert['subject'],
                    "timestamp": alert['timestamp'].isoformat()
                }
                for alert in recent_alerts[-10:]  # 最近10条
            ]
        }
    
    def print_monitoring_report(self):
        """打印监控报告"""
        report = self.generate_monitoring_report()
        
        print(f"\n=== Cube构建监控报告 ===")
        print(f"报告时间: {report['report_time']}")
        print(f"监控周期: {report['monitoring_period']}")
        
        summary = report['summary']
        print(f"\n📊 概览:")
        print(f"  监控状态: {summary['monitoring_status']}")
        print(f"  活跃任务: {summary['active_jobs']}")
        print(f"  告警总数: {summary['total_alerts']}")
        
        if report['alert_distribution']:
            print(f"\n🚨 告警分布:")
            for alert_type, count in report['alert_distribution'].items():
                print(f"  {alert_type}: {count}")
        
        if report['recent_alerts']:
            print(f"\n🕒 最近告警:")
            for alert in report['recent_alerts']:
                print(f"  [{alert['timestamp']}] {alert['type']}: {alert['subject']}")

def main():
    # 监控配置
    config = {
        "check_interval": 60,  # 检查间隔(秒)
        "alert_rules": {
            "stuck_threshold": 10,  # 进度停滞阈值(分钟)
            "max_job_duration_hours": 24,  # 最大任务运行时间(小时)
            "memory_threshold": 80,  # 内存使用阈值(%)
            "cpu_threshold": 80,  # CPU使用阈值(%)
            "alert_suppress_minutes": 30  # 告警抑制时间(分钟)
        },
        "notifications": {
            "email": {
                "enabled": True,
                "smtp_host": "smtp.company.com",
                "smtp_port": 587,
                "use_tls": True,
                "from": "kylin-monitor@company.com",
                "to": ["admin@company.com"],
                "username": "kylin-monitor@company.com",
                "password": "password"
            },
            "dingtalk": {
                "enabled": True,
                "webhook_url": "https://oapi.dingtalk.com/robot/send?access_token=xxx",
                "at_mobiles": ["13800138000"],
                "at_all": False
            }
        }
    }
    
    # 创建监控器
    monitor = BuildMonitor(config)
    
    # 启动监控
    monitor.start_monitoring()
    
    try:
        # 运行一段时间后生成报告
        time.sleep(5)
        monitor.print_monitoring_report()
        
        # 保持运行
        while True:
            time.sleep(60)
            
    except KeyboardInterrupt:
        print("\n停止监控...")
        monitor.stop_monitoring()

if __name__ == "__main__":
    main()
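
_get_running_jobs等方法当前返回模拟数据;接入真实集群时,可改为调用Kylin的任务列表REST接口并按状态过滤。下面是一个示意(端点与字段名以所用Kylin版本的API文档为准):

# 按状态拉取Kylin任务列表的示意(端点与字段以实际Kylin版本的API文档为准)
import requests

def list_jobs(kylin_host: str, session: requests.Session, status: str = "RUNNING"):
    """从Kylin任务列表接口拉取任务,并按job_status字段在客户端过滤"""
    url = f"{kylin_host.rstrip('/')}/kylin/api/jobs"
    resp = session.get(url, params={"limit": 50, "offset": 0})
    resp.raise_for_status()
    return [job for job in resp.json() if job.get("job_status") == status]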

6.4 构建故障排除

6.4.1 常见构建问题诊断

#!/usr/bin/env python3
# build_troubleshooter.py - 构建故障排除工具

import os
import re
import json
import subprocess
import logging
from datetime import datetime
from typing import Dict, List, Optional, Tuple

class BuildTroubleshooter:
    """构建故障排除器"""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.diagnostic_results = []
    
    def diagnose_build_failure(self, job_id: str, log_path: str) -> Dict:
        """诊断构建失败"""
        diagnosis = {
            "job_id": job_id,
            "diagnosis_time": datetime.now().isoformat(),
            "issues_found": [],
            "recommendations": [],
            "log_analysis": {}
        }
        
        # 分析日志文件
        if os.path.exists(log_path):
            diagnosis["log_analysis"] = self._analyze_log_file(log_path)
        
        # 检查常见问题
        diagnosis["issues_found"].extend(self._check_memory_issues(log_path))
        diagnosis["issues_found"].extend(self._check_data_issues(log_path))
        diagnosis["issues_found"].extend(self._check_configuration_issues(log_path))
        diagnosis["issues_found"].extend(self._check_resource_issues(log_path))
        diagnosis["issues_found"].extend(self._check_network_issues(log_path))
        
        # 生成建议
        diagnosis["recommendations"] = self._generate_recommendations(diagnosis["issues_found"])
        
        return diagnosis
    
    def _analyze_log_file(self, log_path: str) -> Dict:
        """分析日志文件"""
        analysis = {
            "file_size_mb": 0,
            "total_lines": 0,
            "error_count": 0,
            "warning_count": 0,
            "key_errors": [],
            "performance_metrics": {}
        }
        
        try:
            # 获取文件大小
            file_size = os.path.getsize(log_path)
            analysis["file_size_mb"] = round(file_size / (1024 * 1024), 2)
            
            with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
                lines = f.readlines()
                analysis["total_lines"] = len(lines)
                
                # 分析每一行
                for line_num, line in enumerate(lines, 1):
                    line_lower = line.lower()
                    
                    # 统计错误和警告
                    if 'error' in line_lower:
                        analysis["error_count"] += 1
                        if len(analysis["key_errors"]) < 10:  # 只保留前10个关键错误
                            analysis["key_errors"].append({
                                "line_number": line_num,
                                "content": line.strip()
                            })
                    
                    if 'warning' in line_lower or 'warn' in line_lower:
                        analysis["warning_count"] += 1
                    
                    # 提取性能指标
                    self._extract_performance_metrics(line, analysis["performance_metrics"])
        
        except Exception as e:
            self.logger.error(f"分析日志文件异常: {e}")
            analysis["error"] = str(e)
        
        return analysis
    
    def _extract_performance_metrics(self, line: str, metrics: Dict):
        """提取性能指标"""
        # 提取内存使用
        memory_match = re.search(r'memory.*?(\d+)\s*mb', line.lower())
        if memory_match:
            memory_mb = int(memory_match.group(1))
            if 'max_memory_mb' not in metrics or memory_mb > metrics['max_memory_mb']:
                metrics['max_memory_mb'] = memory_mb
        
        # 提取处理时间
        time_match = re.search(r'took\s+(\d+)\s*ms', line.lower())
        if time_match:
            time_ms = int(time_match.group(1))
            if 'processing_times' not in metrics:
                metrics['processing_times'] = []
            metrics['processing_times'].append(time_ms)
        
        # 提取记录数
        records_match = re.search(r'processed\s+(\d+)\s*records', line.lower())
        if records_match:
            records = int(records_match.group(1))
            metrics['total_records'] = metrics.get('total_records', 0) + records
    
    def _check_memory_issues(self, log_path: str) -> List[Dict]:
        """检查内存问题"""
        issues = []
        
        try:
            with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read().lower()
                
                # OutOfMemoryError
                if 'outofmemoryerror' in content or 'java heap space' in content:
                    issues.append({
                        "type": "MEMORY_ERROR",
                        "severity": "HIGH",
                        "description": "检测到内存溢出错误",
                        "details": "Java堆内存不足,需要增加内存配置"
                    })
                
                # GC问题
                if 'gc overhead limit exceeded' in content:
                    issues.append({
                        "type": "GC_ERROR",
                        "severity": "HIGH",
                        "description": "GC开销过大",
                        "details": "垃圾回收时间过长,可能需要调整GC参数"
                    })
                
                # 内存泄漏
                if content.count('gc') > 100:  # 频繁GC
                    issues.append({
                        "type": "MEMORY_LEAK",
                        "severity": "MEDIUM",
                        "description": "可能存在内存泄漏",
                        "details": "检测到频繁的垃圾回收,建议检查内存使用"
                    })
        
        except Exception as e:
            self.logger.error(f"检查内存问题异常: {e}")
        
        return issues
    
    def _check_data_issues(self, log_path: str) -> List[Dict]:
        """检查数据问题"""
        issues = []
        
        try:
            with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read().lower()
                
                # 数据格式错误
                if 'numberformatexception' in content or 'parseexception' in content:
                    issues.append({
                        "type": "DATA_FORMAT_ERROR",
                        "severity": "HIGH",
                        "description": "数据格式错误",
                        "details": "源数据格式不符合预期,需要检查数据质量"
                    })
                
                # 空数据
                if 'no data found' in content or 'empty result' in content:
                    issues.append({
                        "type": "NO_DATA",
                        "severity": "MEDIUM",
                        "description": "未找到数据",
                        "details": "指定时间范围内没有数据,请检查数据源"
                    })
                
                # 数据倾斜
                if 'data skew' in content or 'uneven distribution' in content:
                    issues.append({
                        "type": "DATA_SKEW",
                        "severity": "MEDIUM",
                        "description": "数据倾斜",
                        "details": "数据分布不均匀,可能影响构建性能"
                    })
        
        except Exception as e:
            self.logger.error(f"检查数据问题异常: {e}")
        
        return issues
    
    def _check_configuration_issues(self, log_path: str) -> List[Dict]:
        """检查配置问题"""
        issues = []
        
        try:
            with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read().lower()
                
                # 配置文件错误
                if 'configuration error' in content or 'invalid configuration' in content:
                    issues.append({
                        "type": "CONFIG_ERROR",
                        "severity": "HIGH",
                        "description": "配置错误",
                        "details": "Kylin配置文件存在错误,需要检查配置"
                    })
                
                # 连接问题
                if 'connection refused' in content or 'connection timeout' in content:
                    issues.append({
                        "type": "CONNECTION_ERROR",
                        "severity": "HIGH",
                        "description": "连接错误",
                        "details": "无法连接到数据源或存储系统"
                    })
                
                # 权限问题
                if 'permission denied' in content or 'access denied' in content:
                    issues.append({
                        "type": "PERMISSION_ERROR",
                        "severity": "HIGH",
                        "description": "权限错误",
                        "details": "缺少必要的访问权限"
                    })
        
        except Exception as e:
            self.logger.error(f"检查配置问题异常: {e}")
        
        return issues
    
    def _check_resource_issues(self, log_path: str) -> List[Dict]:
        """检查资源问题"""
        issues = []
        
        try:
            with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read().lower()
                
                # 磁盘空间不足
                if 'no space left' in content or 'disk full' in content:
                    issues.append({
                        "type": "DISK_SPACE_ERROR",
                        "severity": "HIGH",
                        "description": "磁盘空间不足",
                        "details": "临时目录或输出目录空间不足"
                    })
                
                # 资源不足
                if 'insufficient resources' in content or 'resource exhausted' in content:
                    issues.append({
                        "type": "RESOURCE_ERROR",
                        "severity": "HIGH",
                        "description": "资源不足",
                        "details": "YARN或Hadoop集群资源不足"
                    })
                
                # 超时
                if 'timeout' in content or 'timed out' in content:
                    issues.append({
                        "type": "TIMEOUT_ERROR",
                        "severity": "MEDIUM",
                        "description": "操作超时",
                        "details": "任务执行时间超过配置的超时时间"
                    })
        
        except Exception as e:
            self.logger.error(f"检查资源问题异常: {e}")
        
        return issues
    
    def _check_network_issues(self, log_path: str) -> List[Dict]:
        """检查网络问题"""
        issues = []
        
        try:
            with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read().lower()
                
                # 网络连接问题
                if 'network unreachable' in content or 'host unreachable' in content:
                    issues.append({
                        "type": "NETWORK_ERROR",
                        "severity": "HIGH",
                        "description": "网络连接问题",
                        "details": "无法连接到远程主机或服务"
                    })
                
                # DNS问题
                if 'unknown host' in content or 'name resolution failed' in content:
                    issues.append({
                        "type": "DNS_ERROR",
                        "severity": "MEDIUM",
                        "description": "DNS解析问题",
                        "details": "主机名无法解析,检查DNS配置"
                    })
        
        except Exception as e:
            self.logger.error(f"检查网络问题异常: {e}")
        
        return issues
    
    def _generate_recommendations(self, issues: List[Dict]) -> List[str]:
        """生成修复建议"""
        recommendations = []
        
        issue_types = [issue['type'] for issue in issues]
        
        # 内存相关建议
        if any(t in issue_types for t in ['MEMORY_ERROR', 'GC_ERROR', 'MEMORY_LEAK']):
            recommendations.extend([
                "增加JVM堆内存大小:-Xmx8g -Xms4g",
                "优化GC参数:-XX:+UseG1GC -XX:MaxGCPauseMillis=200",
                "检查内存泄漏,使用内存分析工具",
                "减少并发任务数量以降低内存压力"
            ])
        
        # 数据相关建议
        if any(t in issue_types for t in ['DATA_FORMAT_ERROR', 'NO_DATA', 'DATA_SKEW']):
            recommendations.extend([
                "验证源数据格式和质量",
                "检查数据源连接和权限",
                "优化数据分区策略以减少数据倾斜",
                "使用数据预处理清理异常数据"
            ])
        
        # 配置相关建议
        if any(t in issue_types for t in ['CONFIG_ERROR', 'CONNECTION_ERROR', 'PERMISSION_ERROR']):
            recommendations.extend([
                "检查Kylin配置文件语法",
                "验证数据源连接配置",
                "确认用户权限和访问控制",
                "检查防火墙和网络配置"
            ])
        
        # 资源相关建议
        if any(t in issue_types for t in ['DISK_SPACE_ERROR', 'RESOURCE_ERROR', 'TIMEOUT_ERROR']):
            recommendations.extend([
                "清理临时文件和日志文件",
                "增加YARN集群资源配置",
                "调整任务超时时间配置",
                "优化任务并行度和资源分配"
            ])
        
        # 网络相关建议
        if any(t in issue_types for t in ['NETWORK_ERROR', 'DNS_ERROR']):
            recommendations.extend([
                "检查网络连通性",
                "验证DNS配置",
                "检查防火墙规则",
                "确认服务端口开放状态"
            ])
        
        # 通用建议
        if not recommendations:
            recommendations.extend([
                "检查Kylin和Hadoop集群状态",
                "查看详细的错误日志",
                "验证系统资源使用情况",
                "联系系统管理员获取支持"
            ])
        
        return recommendations
    
    def check_system_health(self) -> Dict:
        """检查系统健康状态"""
        health_status = {
            "timestamp": datetime.now().isoformat(),
            "overall_status": "HEALTHY",
            "checks": {}
        }
        
        # 检查磁盘空间
        disk_check = self._check_disk_space()
        health_status["checks"]["disk_space"] = disk_check
        
        # 检查内存使用
        memory_check = self._check_memory_usage()
        health_status["checks"]["memory_usage"] = memory_check
        
        # 检查服务状态
        service_check = self._check_service_status()
        health_status["checks"]["service_status"] = service_check
        
        # 检查网络连接
        network_check = self._check_network_connectivity()
        health_status["checks"]["network_connectivity"] = network_check
        
        # 确定整体状态
        failed_checks = [
            check for check in health_status["checks"].values()
            if check["status"] != "OK"
        ]
        
        if failed_checks:
            if any(check["status"] == "CRITICAL" for check in failed_checks):
                health_status["overall_status"] = "CRITICAL"
            else:
                health_status["overall_status"] = "WARNING"
        
        return health_status
    
    def _check_disk_space(self) -> Dict:
        """检查磁盘空间"""
        try:
            # 检查根目录
            result = subprocess.run(['df', '-h', '/'], capture_output=True, text=True)
            if result.returncode == 0:
                lines = result.stdout.strip().split('\n')
                if len(lines) > 1:
                    parts = lines[1].split()
                    if len(parts) >= 5:
                        usage_percent = int(parts[4].rstrip('%'))
                        
                        if usage_percent > 90:
                            return {
                                "status": "CRITICAL",
                                "message": f"磁盘使用率过高: {usage_percent}%",
                                "usage_percent": usage_percent
                            }
                        elif usage_percent > 80:
                            return {
                                "status": "WARNING",
                                "message": f"磁盘使用率较高: {usage_percent}%",
                                "usage_percent": usage_percent
                            }
                        else:
                            return {
                                "status": "OK",
                                "message": f"磁盘使用率正常: {usage_percent}%",
                                "usage_percent": usage_percent
                            }
            
            return {
                "status": "UNKNOWN",
                "message": "无法获取磁盘使用信息"
            }
            
        except Exception as e:
            return {
                "status": "ERROR",
                "message": f"检查磁盘空间异常: {e}"
            }
    
    def _check_memory_usage(self) -> Dict:
        """检查内存使用"""
        try:
            result = subprocess.run(['free', '-m'], capture_output=True, text=True)
            if result.returncode == 0:
                lines = result.stdout.strip().split('\n')
                if len(lines) > 1:
                    parts = lines[1].split()
                    if len(parts) >= 3:
                        total_mb = int(parts[1])
                        used_mb = int(parts[2])
                        usage_percent = int((used_mb / total_mb) * 100)
                        
                        if usage_percent > 90:
                            return {
                                "status": "CRITICAL",
                                "message": f"内存使用率过高: {usage_percent}%",
                                "usage_percent": usage_percent,
                                "total_mb": total_mb,
                                "used_mb": used_mb
                            }
                        elif usage_percent > 80:
                            return {
                                "status": "WARNING",
                                "message": f"内存使用率较高: {usage_percent}%",
                                "usage_percent": usage_percent,
                                "total_mb": total_mb,
                                "used_mb": used_mb
                            }
                        else:
                            return {
                                "status": "OK",
                                "message": f"内存使用率正常: {usage_percent}%",
                                "usage_percent": usage_percent,
                                "total_mb": total_mb,
                                "used_mb": used_mb
                            }
            
            return {
                "status": "UNKNOWN",
                "message": "无法获取内存使用信息"
            }
            
        except Exception as e:
            return {
                "status": "ERROR",
                "message": f"检查内存使用异常: {e}"
            }
    
    def _check_service_status(self) -> Dict:
        """检查服务状态"""
        services = ['kylin', 'hadoop-namenode', 'hadoop-datanode', 'yarn-resourcemanager']
        service_status = {}
        overall_status = "OK"
        
        for service in services:
            try:
                result = subprocess.run(
                    ['systemctl', 'is-active', service],
                    capture_output=True, text=True
                )
                
                if result.stdout.strip() == 'active':
                    service_status[service] = "RUNNING"
                else:
                    service_status[service] = "STOPPED"
                    overall_status = "CRITICAL"
                    
            except Exception:
                service_status[service] = "UNKNOWN"
                overall_status = "WARNING"
        
        return {
            "status": overall_status,
            "message": f"服务状态检查完成",
            "services": service_status
        }
    
    def _check_network_connectivity(self) -> Dict:
        """检查网络连接"""
        hosts = ['localhost', '127.0.0.1']
        connectivity_status = {}
        overall_status = "OK"
        
        for host in hosts:
            try:
                result = subprocess.run(
                    ['ping', '-c', '1', '-W', '3', host],
                    capture_output=True, text=True
                )
                
                if result.returncode == 0:
                    connectivity_status[host] = "REACHABLE"
                else:
                    connectivity_status[host] = "UNREACHABLE"
                    overall_status = "WARNING"
                    
            except Exception:
                connectivity_status[host] = "ERROR"
                overall_status = "WARNING"
        
        return {
            "status": overall_status,
            "message": "网络连接检查完成",
            "hosts": connectivity_status
        }
    
    def generate_troubleshooting_report(self, job_id: str, log_path: str) -> Dict:
        """生成故障排除报告"""
        report = {
            "job_id": job_id,
            "report_time": datetime.now().isoformat(),
            "diagnosis": self.diagnose_build_failure(job_id, log_path),
            "system_health": self.check_system_health(),
            "summary": {}
        }
        
        # Build the summary
        issues_count = len(report["diagnosis"]["issues_found"])
        critical_issues = len([
            issue for issue in report["diagnosis"]["issues_found"]
            if issue.get("severity") == "HIGH"
        ])
        
        report["summary"] = {
            "total_issues": issues_count,
            "critical_issues": critical_issues,
            "system_status": report["system_health"]["overall_status"],
            "recommendations_count": len(report["diagnosis"]["recommendations"])
        }
        
        return report
    
    def print_troubleshooting_report(self, job_id: str, log_path: str):
        """打印故障排除报告"""
        report = self.generate_troubleshooting_report(job_id, log_path)
        
        print(f"\n=== Cube构建故障排除报告 ===")
        print(f"任务ID: {report['job_id']}")
        print(f"报告时间: {report['report_time']}")
        
        # 摘要
        summary = report['summary']
        print(f"\n📊 摘要:")
        print(f"  发现问题: {summary['total_issues']}")
        print(f"  严重问题: {summary['critical_issues']}")
        print(f"  系统状态: {summary['system_status']}")
        print(f"  修复建议: {summary['recommendations_count']}")
        
        # 问题详情
        issues = report['diagnosis']['issues_found']
        if issues:
            print(f"\n🚨 发现的问题:")
            for i, issue in enumerate(issues, 1):
                severity_icon = "🔴" if issue['severity'] == 'HIGH' else "🟡"
                print(f"  {i}. {severity_icon} {issue['description']}")
                print(f"     类型: {issue['type']}")
                print(f"     详情: {issue['details']}")
        
        # 修复建议
        recommendations = report['diagnosis']['recommendations']
        if recommendations:
            print(f"\n💡 修复建议:")
            for i, rec in enumerate(recommendations, 1):
                print(f"  {i}. {rec}")
        
        # 系统健康状态
        health = report['system_health']
        print(f"\n🏥 系统健康状态:")
        for check_name, check_result in health['checks'].items():
            status_icon = "✅" if check_result['status'] == 'OK' else "❌"
            print(f"  {status_icon} {check_name}: {check_result['message']}")

def main():
    troubleshooter = BuildTroubleshooter()
    
    # Example: diagnose a failed build
    job_id = "job-12345"
    log_path = "/opt/kylin/logs/kylin.log"
    
    # Print the troubleshooting report
    troubleshooter.print_troubleshooting_report(job_id, log_path)
    
    # Check overall system health
    health_status = troubleshooter.check_system_health()
    print(f"\nOverall system status: {health_status['overall_status']}")

if __name__ == "__main__":
    main()
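
In addition to printing to the console, the structured report can be persisted for later review or attached to an incident ticket. Below is a minimal usage sketch (it reuses the BuildTroubleshooter class above; the output path and job ID are illustrative):

import json

# Generate the structured report and persist it as JSON
troubleshooter = BuildTroubleshooter()
report = troubleshooter.generate_troubleshooting_report(
    "job-12345", "/opt/kylin/logs/kylin.log"
)
with open(f"/tmp/troubleshooting_{report['job_id']}.json", "w") as f:
    json.dump(report, f, ensure_ascii=False, indent=2)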

6.4.2 Performance Tuning Guide

JVM Parameter Optimization

#!/bin/bash
# jvm_tuning.sh - JVM performance tuning script

echo "Starting JVM performance tuning..."

# Determine total system memory
TOTAL_MEMORY=$(free -m | awk 'NR==2{printf "%.0f", $2}')
echo "Total system memory: ${TOTAL_MEMORY}MB"

# Calculate JVM memory allocation
KYLIN_HEAP_SIZE=$((TOTAL_MEMORY * 60 / 100))  # 60% for Kylin
YARN_HEAP_SIZE=$((TOTAL_MEMORY * 30 / 100))   # 30% for YARN

echo "Recommended Kylin heap size: ${KYLIN_HEAP_SIZE}MB"
echo "Recommended YARN heap size: ${YARN_HEAP_SIZE}MB"

# Generate the Kylin JVM configuration
cat > /opt/kylin/conf/setenv.sh << EOF
#!/bin/bash

# JVM heap configuration
export KYLIN_JVM_SETTINGS="-Xms${KYLIN_HEAP_SIZE}m -Xmx${KYLIN_HEAP_SIZE}m"

# GC configuration
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+UseG1GC"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:MaxGCPauseMillis=200"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:G1HeapRegionSize=16m"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+UnlockExperimentalVMOptions"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:G1NewSizePercent=20"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:G1MaxNewSizePercent=40"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+DisableExplicitGC"

# Memory optimizations
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+UseStringDeduplication"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+OptimizeStringConcat"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+UseCompressedOops"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+UseCompressedClassPointers"

# GC logging for performance monitoring (JDK 8-style flags; JDK 9+ replaces these with -Xlog:gc*)
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+PrintGC"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+PrintGCDetails"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+PrintGCTimeStamps"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+PrintGCApplicationStoppedTime"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -Xloggc:/opt/kylin/logs/gc.log"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+UseGCLogFileRotation"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:NumberOfGCLogFiles=10"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:GCLogFileSize=100M"

# Debugging and diagnostics
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+HeapDumpOnOutOfMemoryError"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:HeapDumpPath=/opt/kylin/logs/"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -XX:+PrintStringDeduplicationStatistics"

# Network settings
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -Djava.net.preferIPv4Stack=true"
export KYLIN_JVM_SETTINGS="\$KYLIN_JVM_SETTINGS -Dsun.net.inetaddr.ttl=60"

EOF

echo "Kylin JVM配置已生成"

# 创建MapReduce JVM配置
cat > /tmp/mapred_jvm_config.xml << EOF
<!-- MapReduce JVM优化配置 -->
<configuration>
    <!-- Map任务JVM配置 -->
    <property>
        <name>mapreduce.map.java.opts</name>
        <value>-Xmx3276m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+UseCompressedOops</value>
    </property>
    
    <!-- Reduce task JVM options -->
    <property>
        <name>mapreduce.reduce.java.opts</name>
        <value>-Xmx6553m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+UseCompressedOops</value>
    </property>
    
    <!-- Application Master JVM options -->
    <property>
        <name>yarn.app.mapreduce.am.command-opts</name>
        <value>-Xmx1638m -XX:+UseG1GC -XX:MaxGCPauseMillis=200</value>
    </property>
</configuration>
EOF

echo "MapReduce JVM配置已生成"

# 创建性能监控脚本
cat > /opt/kylin/bin/monitor_performance.sh << 'EOF'
#!/bin/bash
# 性能监控脚本

LOG_DIR="/opt/kylin/logs"
DATE=$(date +"%Y%m%d_%H%M%S")

# 创建监控日志
echo "开始性能监控 - $DATE" > "$LOG_DIR/performance_$DATE.log"

# 监控JVM内存使用
echo "=== JVM内存使用 ===" >> "$LOG_DIR/performance_$DATE.log"
jps -v | grep kylin >> "$LOG_DIR/performance_$DATE.log"

# Monitor GC activity
echo "=== GC statistics ===" >> "$LOG_DIR/performance_$DATE.log"
if [ -f "$LOG_DIR/gc.log" ]; then
    tail -n 100 "$LOG_DIR/gc.log" >> "$LOG_DIR/performance_$DATE.log"
fi

# Monitor system resources
echo "=== System resources ===" >> "$LOG_DIR/performance_$DATE.log"
echo "CPU usage:" >> "$LOG_DIR/performance_$DATE.log"
top -bn1 | grep "Cpu(s)" >> "$LOG_DIR/performance_$DATE.log"

echo "内存使用:" >> "$LOG_DIR/performance_$DATE.log"
free -h >> "$LOG_DIR/performance_$DATE.log"

echo "磁盘使用:" >> "$LOG_DIR/performance_$DATE.log"
df -h >> "$LOG_DIR/performance_$DATE.log"

# Monitor network connections
echo "=== Network connections ===" >> "$LOG_DIR/performance_$DATE.log"
netstat -an | grep :7070 >> "$LOG_DIR/performance_$DATE.log"

echo "性能监控完成 - $DATE" >> "$LOG_DIR/performance_$DATE.log"
EOF

chmod +x /opt/kylin/bin/monitor_performance.sh

echo "性能监控脚本已创建"

# Schedule periodic monitoring
echo "Scheduling periodic performance monitoring..."
(crontab -l 2>/dev/null; echo "*/10 * * * * /opt/kylin/bin/monitor_performance.sh") | crontab -

echo "JVM性能调优完成!"
echo "请重启Kylin服务以应用新配置:sudo systemctl restart kylin"

6.5 Chapter Summary

This chapter covered Apache Kylin Cube building and management in detail. The main topics were:

6.5.1 Key Concepts

  1. Cube Building Fundamentals

    • Build principles and workflow
    • Incremental vs. full builds
    • Build strategy configuration
  2. Build Optimization Strategies

    • Parallel build optimization
    • MapReduce parameter tuning
    • Storage optimization settings
  3. Build Monitoring and Management

    • Real-time monitoring system
    • Alert mechanism design
    • Performance metric collection
  4. Troubleshooting

    • Diagnosing common problems
    • System health checks
    • Performance tuning guidelines

6.5.2 Practical Tools

This chapter provided several practical management tools:

  • cube_builder.py - Cube build management tool
  • hbase_optimizer.py - HBase storage optimization tool
  • build_monitor.py - Build monitoring system
  • build_troubleshooter.py - Troubleshooting tool

6.5.3 Best Practices

  1. Build Strategy

    • Choose a sensible time window for incremental builds
    • Set an appropriate segment merge policy (see the sketch after this list)
    • Tune the parallelism settings
  2. Performance Optimization

    • JVM parameter tuning
    • Resource allocation optimization
    • Storage compression settings
  3. Operations and Maintenance

    • Establish a comprehensive monitoring system
    • Define a failure-handling process
    • Review performance regularly
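
For the segment merge policy mentioned above, the relevant settings live in the cube descriptor. A minimal sketch follows (the field names auto_merge_time_ranges and retention_range follow the Kylin cube descriptor; the chosen windows are illustrative):

import json

DAY_MS = 24 * 60 * 60 * 1000  # one day in milliseconds

# Merge segments into weekly, then four-weekly blocks, and keep one year of data
merge_policy = {
    "auto_merge_time_ranges": [7 * DAY_MS, 28 * DAY_MS],
    "retention_range": 365 * DAY_MS,
}

print(json.dumps(merge_policy, indent=2))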

6.6 Next Chapter Preview

The next chapter introduces query optimization and performance tuning, including:

  • SQL query optimization techniques
  • Query performance analysis
  • Cache strategy configuration
  • Concurrent query management

6.7 Exercises

Theory Exercises

  1. Explain the five main stages of Cube building and the purpose of each
  2. Compare the advantages and disadvantages of incremental and full builds
  3. Analyze the main factors that affect build performance

Hands-on Exercises

  1. Build a test Cube using the tool scripts provided in this chapter
  2. Configure the build monitoring system and test its alerting
  3. Simulate a build failure and diagnose it with the troubleshooting tool

Discussion Questions

  1. How would you design an automated Cube build scheduling system?
  2. How can build performance be optimized for very large data volumes?
  3. How do you balance build frequency against system resource consumption?