8.1 集群部署与管理

8.1.1 生产环境部署

集群规划

在生产环境中部署Apache Kylin需要仔细规划集群架构,确保高可用性、可扩展性和性能。

graph TB
    subgraph "负载均衡层"
        LB[负载均衡器]
    end
    
    subgraph "Kylin集群"
        K1[Kylin节点1<br/>Query + Job]
        K2[Kylin节点2<br/>Query + Job]
        K3[Kylin节点3<br/>Query Only]
    end
    
    subgraph "存储层"
        subgraph "HBase集群"
            HM[HBase Master]
            HR1[RegionServer1]
            HR2[RegionServer2]
            HR3[RegionServer3]
        end
        
        subgraph "HDFS集群"
            NN[NameNode]
            DN1[DataNode1]
            DN2[DataNode2]
            DN3[DataNode3]
        end
    end
    
    subgraph "计算层"
        subgraph "Spark集群"
            SM[Spark Master]
            SW1[Spark Worker1]
            SW2[Spark Worker2]
            SW3[Spark Worker3]
        end
    end
    
    subgraph "协调层"
        ZK1[ZooKeeper1]
        ZK2[ZooKeeper2]
        ZK3[ZooKeeper3]
    end
    
    LB --> K1
    LB --> K2
    LB --> K3
    
    K1 --> HM
    K2 --> HM
    K3 --> HM
    
    K1 --> SM
    K2 --> SM
    
    HM --> ZK1
    SM --> ZK1
    K1 --> ZK1

自动化部署脚本

#!/bin/bash
# deploy_kylin_cluster.sh - Kylin集群自动化部署脚本

set -e

# 配置参数
KYLIN_VERSION="4.0.3"
CLUSTER_NODES=("node1" "node2" "node3")
KYLIN_HOME="/opt/kylin"
KYLIN_USER="kylin"
SSH_KEY="~/.ssh/id_rsa"

# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

log_info() {
    echo -e "${GREEN}[INFO]${NC} $1"
}

log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

# 检查前置条件
check_prerequisites() {
    log_info "检查部署前置条件..."
    
    # 检查SSH连接
    for node in "${CLUSTER_NODES[@]}"; do
        if ! ssh -i "$SSH_KEY" -o ConnectTimeout=5 "$KYLIN_USER@$node" "echo 'SSH连接正常'" >/dev/null 2>&1; then
            log_error "无法SSH连接到节点: $node"
            exit 1
        fi
    done
    
    # 检查Java环境
    for node in "${CLUSTER_NODES[@]}"; do
        java_version=$(ssh -i "$SSH_KEY" "$KYLIN_USER@$node" "java -version 2>&1 | head -n1 | cut -d'"' -f2")
        if [[ ! $java_version =~ ^1\.[8-9]|^[0-9][0-9] ]]; then
            log_error "节点 $node Java版本不符合要求: $java_version"
            exit 1
        fi
        log_info "节点 $node Java版本: $java_version"
    done
    
    # 检查Hadoop环境
    for node in "${CLUSTER_NODES[@]}"; do
        if ! ssh -i "$SSH_KEY" "$KYLIN_USER@$node" "which hadoop" >/dev/null 2>&1; then
            log_error "节点 $node 未安装Hadoop"
            exit 1
        fi
    done
    
    # 检查HBase环境
    for node in "${CLUSTER_NODES[@]}"; do
        if ! ssh -i "$SSH_KEY" "$KYLIN_USER@$node" "which hbase" >/dev/null 2>&1; then
            log_error "节点 $node 未安装HBase"
            exit 1
        fi
    done
    
    log_info "前置条件检查通过"
}

# 下载Kylin
download_kylin() {
    log_info "下载Apache Kylin $KYLIN_VERSION..."
    
    local kylin_package="apache-kylin-$KYLIN_VERSION-bin.tar.gz"
    local download_url="https://archive.apache.org/dist/kylin/$KYLIN_VERSION/$kylin_package"
    
    if [ ! -f "$kylin_package" ]; then
        wget "$download_url" || {
            log_error "下载Kylin失败"
            exit 1
        }
    fi
    
    log_info "Kylin下载完成"
}

# 分发Kylin到各节点
distribute_kylin() {
    log_info "分发Kylin到集群节点..."
    
    local kylin_package="apache-kylin-$KYLIN_VERSION-bin.tar.gz"
    
    for node in "${CLUSTER_NODES[@]}"; do
        log_info "分发到节点: $node"
        
        # 创建目录
        ssh -i "$SSH_KEY" "$KYLIN_USER@$node" "sudo mkdir -p $KYLIN_HOME && sudo chown $KYLIN_USER:$KYLIN_USER $KYLIN_HOME"
        
        # 上传文件
        scp -i "$SSH_KEY" "$kylin_package" "$KYLIN_USER@$node:/tmp/"
        
        # 解压安装
        ssh -i "$SSH_KEY" "$KYLIN_USER@$node" "
            cd /tmp && 
            tar -xzf $kylin_package && 
            mv apache-kylin-$KYLIN_VERSION-bin/* $KYLIN_HOME/ && 
            rm -rf apache-kylin-$KYLIN_VERSION-bin $kylin_package
        "
        
        log_info "节点 $node 安装完成"
    done
}

# 配置Kylin
configure_kylin() {
    log_info "配置Kylin集群..."
    
    # 生成配置文件
    cat > kylin.properties << 'EOF'
# Kylin集群配置
kylin.metadata.url=kylin_metadata@hbase
kylin.storage.url=hbase
kylin.engine.default=2
kylin.engine.spark-conf-mergedict=true
kylin.engine.spark.rdd-partition-cut-mb=10
kylin.engine.spark.min-partition=1
kylin.engine.spark.max-partition=5000

# 查询配置
kylin.query.cache-enabled=true
kylin.query.large-query-threshold=1000000
kylin.query.timeout-seconds=300
kylin.query.max-return-rows=5000000

# 安全配置
kylin.security.profile=ldap
kylin.security.ldap.connection-server=ldap://ldap.company.com:389
kylin.security.ldap.connection-username=cn=admin,dc=company,dc=com
kylin.security.ldap.connection-password=admin_password
kylin.security.ldap.user-search-base=ou=people,dc=company,dc=com
kylin.security.ldap.user-search-pattern=(&(cn={0}))
kylin.security.ldap.user-group-search-base=ou=groups,dc=company,dc=com
kylin.security.ldap.user-group-search-filter=(member={0})

# 监控配置
kylin.metrics.enabled=true
kylin.metrics.reporter=jmx
kylin.metrics.prefix=KYLIN

# 集群配置
kylin.server.cluster-servers=node1:7070,node2:7070,node3:7070
kylin.server.cluster-name=kylin_cluster
EOF

    # 分发配置文件
    for node in "${CLUSTER_NODES[@]}"; do
        scp -i "$SSH_KEY" kylin.properties "$KYLIN_USER@$node:$KYLIN_HOME/conf/"
        
        # 设置环境变量
        ssh -i "$SSH_KEY" "$KYLIN_USER@$node" "
            echo 'export KYLIN_HOME=$KYLIN_HOME' >> ~/.bashrc
            echo 'export PATH=\$KYLIN_HOME/bin:\$PATH' >> ~/.bashrc
            source ~/.bashrc
        "
    done
    
    log_info "Kylin配置完成"
}

# 初始化元数据
init_metadata() {
    log_info "初始化Kylin元数据..."
    
    local master_node="${CLUSTER_NODES[0]}"
    
    ssh -i "$SSH_KEY" "$KYLIN_USER@$master_node" "
        cd $KYLIN_HOME && 
        bin/metastore.sh reset
    " || {
        log_error "元数据初始化失败"
        exit 1
    }
    
    log_info "元数据初始化完成"
}

# 启动集群
start_cluster() {
    log_info "启动Kylin集群..."
    
    # 启动第一个节点(作为主节点)
    local master_node="${CLUSTER_NODES[0]}"
    log_info "启动主节点: $master_node"
    
    ssh -i "$SSH_KEY" "$KYLIN_USER@$master_node" "
        cd $KYLIN_HOME && 
        bin/kylin.sh start
    " &
    
    sleep 30  # 等待主节点启动
    
    # 启动其他节点
    for i in $(seq 1 $((${#CLUSTER_NODES[@]} - 1))); do
        local node="${CLUSTER_NODES[$i]}"
        log_info "启动节点: $node"
        
        ssh -i "$SSH_KEY" "$KYLIN_USER@$node" "
            cd $KYLIN_HOME && 
            bin/kylin.sh start
        " &
        
        sleep 10  # 错开启动时间
    done
    
    wait  # 等待所有启动完成
    
    log_info "集群启动完成"
}

# 验证部署
verify_deployment() {
    log_info "验证集群部署..."
    
    sleep 60  # 等待服务完全启动
    
    for node in "${CLUSTER_NODES[@]}"; do
        log_info "检查节点: $node"
        
        # 检查进程
        if ssh -i "$SSH_KEY" "$KYLIN_USER@$node" "jps | grep -q KylinLauncher"; then
            log_info "✓ 节点 $node Kylin进程正常"
        else
            log_error "✗ 节点 $node Kylin进程异常"
        fi
        
        # 检查端口
        if ssh -i "$SSH_KEY" "$KYLIN_USER@$node" "netstat -tlnp | grep -q :7070"; then
            log_info "✓ 节点 $node 端口7070正常监听"
        else
            log_warn "✗ 节点 $node 端口7070未监听"
        fi
        
        # 检查Web界面
        if curl -s "http://$node:7070/kylin/api/admin/version" >/dev/null; then
            log_info "✓ 节点 $node Web界面可访问"
        else
            log_warn "✗ 节点 $node Web界面不可访问"
        fi
    done
    
    log_info "部署验证完成"
}

# 生成部署报告
generate_report() {
    log_info "生成部署报告..."
    
    cat > deployment_report.txt << EOF
=== Kylin集群部署报告 ===
部署时间: $(date)
Kylin版本: $KYLIN_VERSION
集群节点: ${CLUSTER_NODES[*]}
安装路径: $KYLIN_HOME

=== 访问信息 ===
Web界面: http://${CLUSTER_NODES[0]}:7070/kylin
默认用户: ADMIN
默认密码: KYLIN

=== 配置文件位置 ===
主配置: $KYLIN_HOME/conf/kylin.properties
日志配置: $KYLIN_HOME/conf/kylin-tools-log4j.properties
环境配置: $KYLIN_HOME/conf/setenv.sh

=== 常用命令 ===
启动服务: $KYLIN_HOME/bin/kylin.sh start
停止服务: $KYLIN_HOME/bin/kylin.sh stop
查看状态: $KYLIN_HOME/bin/kylin.sh status
查看日志: tail -f $KYLIN_HOME/logs/kylin.log

=== 下一步操作 ===
1. 访问Web界面验证部署
2. 创建项目和数据模型
3. 配置监控和告警
4. 设置备份策略
EOF

    log_info "部署报告已生成: deployment_report.txt"
}

# 主函数
main() {
    log_info "开始Kylin集群自动化部署..."
    
    check_prerequisites
    download_kylin
    distribute_kylin
    configure_kylin
    init_metadata
    start_cluster
    verify_deployment
    generate_report
    
    log_info "Kylin集群部署完成!"
    log_info "请查看 deployment_report.txt 获取详细信息"
}

# 脚本入口
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
    main "$@"
fi

8.4 容灾与高可用

8.4.1 高可用架构设计

架构设计原则

graph TB
    subgraph "负载均衡层"
        LB1["Nginx 主"]
        LB2["Nginx 备"]
    end
    
    subgraph "Kylin集群"
        K1["Kylin节点1"]
        K2["Kylin节点2"]
        K3["Kylin节点3"]
    end
    
    subgraph "存储层"
        subgraph "HBase集群"
            H1["HBase Master1"]
            H2["HBase Master2"]
            H3["RegionServer1"]
            H4["RegionServer2"]
            H5["RegionServer3"]
        end
        
        subgraph "HDFS集群"
            N1["NameNode1"]
            N2["NameNode2"]
            D1["DataNode1"]
            D2["DataNode2"]
            D3["DataNode3"]
        end
    end
    
    subgraph "协调层"
        Z1["ZooKeeper1"]
        Z2["ZooKeeper2"]
        Z3["ZooKeeper3"]
    end
    
    LB1 --> K1
    LB1 --> K2
    LB1 --> K3
    LB2 --> K1
    LB2 --> K2
    LB2 --> K3
    
    K1 --> H1
    K2 --> H2
    K3 --> H3
    
    H1 --> Z1
    H2 --> Z2
    H3 --> Z3
    
    N1 --> Z1
    N2 --> Z2

高可用配置脚本

#!/bin/bash
# ha_setup.sh - Kylin高可用配置脚本

set -e

# 配置参数
KYLIN_NODES=("node1" "node2" "node3")
VIP="192.168.1.100"
INTERFACE="eth0"
KYLIN_HOME="/opt/kylin"
KEEPALIVED_CONF="/etc/keepalived/keepalived.conf"

# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'

log_info() {
    echo -e "${GREEN}[INFO]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

log_step() {
    echo -e "${BLUE}[STEP]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

# 安装keepalived
install_keepalived() {
    log_step "安装keepalived..."
    
    if command -v yum &> /dev/null; then
        yum install -y keepalived
    elif command -v apt-get &> /dev/null; then
        apt-get update
        apt-get install -y keepalived
    else
        log_error "不支持的包管理器"
        return 1
    fi
    
    log_info "keepalived安装完成"
}

# 配置keepalived
configure_keepalived() {
    local node_priority="$1"
    local state="$2"
    
    log_step "配置keepalived (优先级: $node_priority, 状态: $state)..."
    
    cat > "$KEEPALIVED_CONF" << EOF
! Configuration File for keepalived

global_defs {
    notification_email {
        admin@company.com
    }
    notification_email_from kylin@company.com
    smtp_server localhost
    smtp_connect_timeout 30
    router_id KYLIN_HA
}

vrrp_script chk_kylin {
    script "/opt/scripts/check_kylin.sh"
    interval 2
    weight -2
    fall 3
    rise 2
}

vrrp_instance VI_1 {
    state $state
    interface $INTERFACE
    virtual_router_id 51
    priority $node_priority
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass kylin123
    }
    virtual_ipaddress {
        $VIP
    }
    track_script {
        chk_kylin
    }
    notify_master "/opt/scripts/kylin_master.sh"
    notify_backup "/opt/scripts/kylin_backup.sh"
    notify_fault "/opt/scripts/kylin_fault.sh"
}
EOF
    
    log_info "keepalived配置完成"
}

# 创建健康检查脚本
create_health_check() {
    log_step "创建健康检查脚本..."
    
    mkdir -p /opt/scripts
    
    # Kylin健康检查脚本
    cat > /opt/scripts/check_kylin.sh << 'EOF'
#!/bin/bash
# 检查Kylin服务状态

KYLIN_PORT=7070
KYLIN_URL="http://localhost:$KYLIN_PORT/kylin/api/user/authentication"

# 检查端口是否监听
if ! netstat -tuln | grep ":$KYLIN_PORT " > /dev/null; then
    exit 1
fi

# 检查HTTP响应
if ! curl -s --connect-timeout 5 "$KYLIN_URL" > /dev/null; then
    exit 1
fi

exit 0
EOF
    
    # Master切换脚本
    cat > /opt/scripts/kylin_master.sh << 'EOF'
#!/bin/bash
# Kylin Master切换处理

echo "$(date): Kylin节点切换为Master" >> /var/log/kylin_ha.log

# 确保Kylin服务运行
if ! pgrep -f "kylin" > /dev/null; then
    /opt/kylin/bin/kylin.sh start
fi

# 发送通知
echo "Kylin Master切换: $(hostname)" | mail -s "Kylin HA Event" admin@company.com
EOF
    
    # Backup切换脚本
    cat > /opt/scripts/kylin_backup.sh << 'EOF'
#!/bin/bash
# Kylin Backup切换处理

echo "$(date): Kylin节点切换为Backup" >> /var/log/kylin_ha.log
EOF
    
    # 故障处理脚本
    cat > /opt/scripts/kylin_fault.sh << 'EOF'
#!/bin/bash
# Kylin故障处理

echo "$(date): Kylin节点故障" >> /var/log/kylin_ha.log

# 发送告警
echo "Kylin节点故障: $(hostname)" | mail -s "Kylin HA Alert" admin@company.com
EOF
    
    # 设置执行权限
    chmod +x /opt/scripts/*.sh
    
    log_info "健康检查脚本创建完成"
}

# 配置Nginx负载均衡
configure_nginx_lb() {
    log_step "配置Nginx负载均衡..."
    
    # 安装Nginx
    if ! command -v nginx &> /dev/null; then
        if command -v yum &> /dev/null; then
            yum install -y nginx
        elif command -v apt-get &> /dev/null; then
            apt-get install -y nginx
        fi
    fi
    
    # 配置负载均衡
    cat > /etc/nginx/conf.d/kylin_lb.conf << EOF
upstream kylin_backend {
    least_conn;
    server ${KYLIN_NODES[0]}:7070 max_fails=3 fail_timeout=30s;
    server ${KYLIN_NODES[1]}:7070 max_fails=3 fail_timeout=30s;
    server ${KYLIN_NODES[2]}:7070 max_fails=3 fail_timeout=30s;
}

server {
    listen 80;
    server_name kylin.company.com;
    
    # 健康检查
    location /health {
        access_log off;
        return 200 "healthy\n";
        add_header Content-Type text/plain;
    }
    
    # Kylin代理
    location / {
        proxy_pass http://kylin_backend;
        proxy_set_header Host \$host;
        proxy_set_header X-Real-IP \$remote_addr;
        proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto \$scheme;
        
        # 超时设置
        proxy_connect_timeout 30s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
        
        # 会话保持
        proxy_cookie_path / "/; HttpOnly; Secure";
    }
    
    # WebSocket支持
    location /kylin/ws {
        proxy_pass http://kylin_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade \$http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host \$host;
    }
}
EOF
    
    # 测试配置
    nginx -t
    
    # 重启Nginx
    systemctl restart nginx
    systemctl enable nginx
    
    log_info "Nginx负载均衡配置完成"
}

# 主函数
main() {
    case "${1:-setup}" in
        "setup")
            if [ -z "$2" ]; then
                echo "用法: $0 setup <node_index>"
                echo "  node_index: 节点索引 (0, 1, 2)"
                exit 1
            fi
            setup_cluster_node "$2"
            ;;
        "nginx")
            configure_nginx_lb
            ;;
        "verify")
            verify_ha_setup
            ;;
        "test")
            test_failover
            ;;
        *)
            echo "用法: $0 [setup|nginx|verify|test]"
            echo "  setup:   配置集群节点"
            echo "  nginx:   配置Nginx负载均衡"
            echo "  verify:  验证高可用配置"
            echo "  test:    故障转移测试"
            exit 1
            ;;
    esac
}

# 脚本入口
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
    main "$@"
fi

8.4.2 故障转移机制

自动故障转移脚本

#!/usr/bin/env python3
# failover_manager.py - Kylin故障转移管理器

import time
import requests
import subprocess
import logging
import json
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

@dataclass
class KylinNode:
    """Kylin节点信息"""
    hostname: str
    ip: str
    port: int = 7070
    priority: int = 100
    status: str = "unknown"  # healthy, unhealthy, unknown
    last_check: datetime = None
    
    @property
    def url(self) -> str:
        return f"http://{self.ip}:{self.port}"
    
    @property
    def health_url(self) -> str:
        return f"{self.url}/kylin/api/user/authentication"

class FailoverManager:
    """故障转移管理器"""
    
    def __init__(self, config_file: str = "/etc/kylin/failover.json"):
        self.config = self._load_config(config_file)
        self.nodes = [KylinNode(**node) for node in self.config['nodes']]
        self.vip = self.config['vip']
        self.interface = self.config['interface']
        self.check_interval = self.config.get('check_interval', 30)
        self.failure_threshold = self.config.get('failure_threshold', 3)
        self.recovery_threshold = self.config.get('recovery_threshold', 2)
        
        # 设置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/kylin_failover.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        
        # 故障计数器
        self.failure_counts = {node.hostname: 0 for node in self.nodes}
        self.recovery_counts = {node.hostname: 0 for node in self.nodes}
        
    def _load_config(self, config_file: str) -> Dict:
        """加载配置文件"""
        try:
            with open(config_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            # 默认配置
            return {
                "nodes": [
                    {"hostname": "kylin1", "ip": "192.168.1.101", "priority": 110},
                    {"hostname": "kylin2", "ip": "192.168.1.102", "priority": 100},
                    {"hostname": "kylin3", "ip": "192.168.1.103", "priority": 90}
                ],
                "vip": "192.168.1.100",
                "interface": "eth0",
                "check_interval": 30,
                "failure_threshold": 3,
                "recovery_threshold": 2,
                "email": {
                    "smtp_server": "localhost",
                    "smtp_port": 25,
                    "from_addr": "kylin@company.com",
                    "to_addrs": ["admin@company.com"]
                }
            }
    
    def check_node_health(self, node: KylinNode) -> bool:
        """检查节点健康状态"""
        try:
            response = requests.get(
                node.health_url,
                timeout=10,
                auth=('ADMIN', 'KYLIN')
            )
            
            if response.status_code == 200:
                node.status = "healthy"
                node.last_check = datetime.now()
                return True
            else:
                node.status = "unhealthy"
                return False
                
        except Exception as e:
            self.logger.warning(f"节点 {node.hostname} 健康检查失败: {e}")
            node.status = "unhealthy"
            return False
    
    def get_current_master(self) -> KylinNode:
        """获取当前Master节点"""
        try:
            # 检查VIP在哪个节点上
            result = subprocess.run(
                ['ip', 'addr', 'show', self.interface],
                capture_output=True,
                text=True
            )
            
            if self.vip in result.stdout:
                # VIP在本地,查找本地节点
                hostname = subprocess.run(
                    ['hostname'],
                    capture_output=True,
                    text=True
                ).stdout.strip()
                
                for node in self.nodes:
                    if node.hostname == hostname:
                        return node
            
            # 通过SSH检查其他节点
            for node in self.nodes:
                try:
                    result = subprocess.run(
                        ['ssh', node.ip, 'ip', 'addr', 'show', self.interface],
                        capture_output=True,
                        text=True,
                        timeout=5
                    )
                    
                    if self.vip in result.stdout:
                        return node
                        
                except subprocess.TimeoutExpired:
                    continue
                    
        except Exception as e:
            self.logger.error(f"获取当前Master失败: {e}")
            
        return None
    
    def select_new_master(self, exclude_nodes: List[str] = None) -> KylinNode:
        """选择新的Master节点"""
        exclude_nodes = exclude_nodes or []
        
        # 过滤掉排除的节点和不健康的节点
        candidates = [
            node for node in self.nodes
            if node.hostname not in exclude_nodes and node.status == "healthy"
        ]
        
        if not candidates:
            self.logger.error("没有可用的候选Master节点")
            return None
        
        # 按优先级排序,选择优先级最高的节点
        candidates.sort(key=lambda x: x.priority, reverse=True)
        return candidates[0]
    
    def promote_to_master(self, node: KylinNode) -> bool:
        """提升节点为Master"""
        try:
            self.logger.info(f"提升节点 {node.hostname} 为Master")
            
            # 在目标节点上配置VIP
            cmd = [
                'ssh', node.ip,
                'sudo', 'ip', 'addr', 'add', f'{self.vip}/24', 'dev', self.interface
            ]
            
            result = subprocess.run(cmd, capture_output=True, text=True)
            
            if result.returncode == 0:
                self.logger.info(f"VIP {self.vip} 已配置到节点 {node.hostname}")
                
                # 发送gratuitous ARP
                subprocess.run([
                    'ssh', node.ip,
                    'sudo', 'arping', '-c', '3', '-A', '-I', self.interface, self.vip
                ])
                
                # 更新负载均衡配置
                self._update_load_balancer(node)
                
                # 发送通知
                self._send_notification(
                    f"Kylin Master切换",
                    f"节点 {node.hostname} ({node.ip}) 已提升为Master"
                )
                
                return True
            else:
                self.logger.error(f"配置VIP失败: {result.stderr}")
                return False
                
        except Exception as e:
            self.logger.error(f"提升Master失败: {e}")
            return False
    
    def demote_from_master(self, node: KylinNode) -> bool:
        """降级Master节点"""
        try:
            self.logger.info(f"降级节点 {node.hostname} 的Master状态")
            
            # 移除VIP
            cmd = [
                'ssh', node.ip,
                'sudo', 'ip', 'addr', 'del', f'{self.vip}/24', 'dev', self.interface
            ]
            
            result = subprocess.run(cmd, capture_output=True, text=True)
            
            if result.returncode == 0 or "Cannot assign" in result.stderr:
                self.logger.info(f"VIP {self.vip} 已从节点 {node.hostname} 移除")
                return True
            else:
                self.logger.warning(f"移除VIP警告: {result.stderr}")
                return True  # 可能VIP已经不在该节点上
                
        except Exception as e:
            self.logger.error(f"降级Master失败: {e}")
            return False
    
    def _update_load_balancer(self, master_node: KylinNode):
        """更新负载均衡配置"""
        try:
            # 这里可以调用负载均衡器的API来更新配置
            # 例如更新Nginx upstream配置
            self.logger.info(f"更新负载均衡配置,Master: {master_node.hostname}")
            
        except Exception as e:
            self.logger.error(f"更新负载均衡配置失败: {e}")
    
    def _send_notification(self, subject: str, message: str):
        """发送邮件通知"""
        try:
            email_config = self.config.get('email', {})
            if not email_config.get('to_addrs'):
                return
            
            msg = MIMEMultipart()
            msg['From'] = email_config.get('from_addr', 'kylin@company.com')
            msg['To'] = ', '.join(email_config['to_addrs'])
            msg['Subject'] = f"[Kylin HA] {subject}"
            
            body = f"""
时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
事件: {subject}
详情: {message}

当前集群状态:
"""
            
            for node in self.nodes:
                body += f"  {node.hostname} ({node.ip}): {node.status}\n"
            
            msg.attach(MIMEText(body, 'plain'))
            
            server = smtplib.SMTP(
                email_config.get('smtp_server', 'localhost'),
                email_config.get('smtp_port', 25)
            )
            
            server.send_message(msg)
            server.quit()
            
            self.logger.info(f"通知邮件已发送: {subject}")
            
        except Exception as e:
            self.logger.error(f"发送邮件通知失败: {e}")
    
    def perform_failover(self) -> bool:
        """执行故障转移"""
        current_master = self.get_current_master()
        
        if not current_master:
            self.logger.error("无法确定当前Master节点")
            return False
        
        self.logger.info(f"当前Master: {current_master.hostname}")
        
        # 检查当前Master是否健康
        if self.check_node_health(current_master):
            self.logger.info("当前Master节点健康,无需故障转移")
            return True
        
        self.logger.warning(f"当前Master {current_master.hostname} 不健康,开始故障转移")
        
        # 选择新的Master
        new_master = self.select_new_master([current_master.hostname])
        
        if not new_master:
            self.logger.error("没有可用的新Master节点")
            return False
        
        self.logger.info(f"选择新Master: {new_master.hostname}")
        
        # 执行故障转移
        success = True
        
        # 降级当前Master
        if not self.demote_from_master(current_master):
            self.logger.error("降级当前Master失败")
            success = False
        
        # 提升新Master
        if not self.promote_to_master(new_master):
            self.logger.error("提升新Master失败")
            success = False
        
        if success:
            self.logger.info(f"故障转移成功: {current_master.hostname} -> {new_master.hostname}")
        else:
            self.logger.error("故障转移失败")
        
        return success
    
    def monitor_loop(self):
        """监控循环"""
        self.logger.info("开始Kylin故障转移监控")
        
        while True:
            try:
                # 检查所有节点健康状态
                for node in self.nodes:
                    is_healthy = self.check_node_health(node)
                    
                    if is_healthy:
                        # 重置故障计数,增加恢复计数
                        self.failure_counts[node.hostname] = 0
                        self.recovery_counts[node.hostname] += 1
                        
                        if (self.recovery_counts[node.hostname] >= self.recovery_threshold and 
                            node.status != "healthy"):
                            self.logger.info(f"节点 {node.hostname} 已恢复")
                            node.status = "healthy"
                            self.recovery_counts[node.hostname] = 0
                    else:
                        # 增加故障计数,重置恢复计数
                        self.failure_counts[node.hostname] += 1
                        self.recovery_counts[node.hostname] = 0
                        
                        if self.failure_counts[node.hostname] >= self.failure_threshold:
                            self.logger.warning(
                                f"节点 {node.hostname} 连续 {self.failure_counts[node.hostname]} 次检查失败"
                            )
                            node.status = "unhealthy"
                
                # 检查是否需要故障转移
                current_master = self.get_current_master()
                if current_master and current_master.status == "unhealthy":
                    self.perform_failover()
                
                time.sleep(self.check_interval)
                
            except KeyboardInterrupt:
                self.logger.info("监控已停止")
                break
            except Exception as e:
                self.logger.error(f"监控循环异常: {e}")
                time.sleep(self.check_interval)
    
    def status_report(self) -> Dict:
        """生成状态报告"""
        # 检查所有节点状态
        for node in self.nodes:
            self.check_node_health(node)
        
        current_master = self.get_current_master()
        
        report = {
            "timestamp": datetime.now().isoformat(),
            "current_master": current_master.hostname if current_master else None,
            "vip": self.vip,
            "nodes": [
                {
                    "hostname": node.hostname,
                    "ip": node.ip,
                    "status": node.status,
                    "priority": node.priority,
                    "last_check": node.last_check.isoformat() if node.last_check else None,
                    "failure_count": self.failure_counts[node.hostname],
                    "recovery_count": self.recovery_counts[node.hostname]
                }
                for node in self.nodes
            ]
        }
        
        return report

def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='Kylin故障转移管理器')
    parser.add_argument('--config', default='/etc/kylin/failover.json', help='配置文件路径')
    parser.add_argument('--action', choices=['monitor', 'status', 'failover'], 
                       default='monitor', help='执行动作')
    
    args = parser.parse_args()
    
    manager = FailoverManager(args.config)
    
    if args.action == 'monitor':
        manager.monitor_loop()
    elif args.action == 'status':
        report = manager.status_report()
        print(json.dumps(report, indent=2, ensure_ascii=False))
    elif args.action == 'failover':
        success = manager.perform_failover()
        exit(0 if success else 1)

if __name__ == '__main__':
    main()

8.5 性能调优

8.5.1 系统级性能调优

性能调优脚本

#!/bin/bash
# performance_tuning.sh - Kylin性能调优脚本

set -e

# 配置参数
KYLIN_HOME="/opt/kylin"
HBASE_HOME="/opt/hbase"
HADOOP_HOME="/opt/hadoop"
SPARK_HOME="/opt/spark"

# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'

log_info() {
    echo -e "${GREEN}[INFO]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

log_step() {
    echo -e "${BLUE}[STEP]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

# 获取系统信息
get_system_info() {
    log_step "收集系统信息..."
    
    echo "=== 系统信息 ==="
    echo "CPU核心数: $(nproc)"
    echo "内存大小: $(free -h | grep '^Mem:' | awk '{print $2}')"
    echo "磁盘信息:"
    df -h | grep -E '^/dev/'
    echo
    
    # 保存到变量
    export CPU_CORES=$(nproc)
    export TOTAL_MEM_GB=$(free -g | grep '^Mem:' | awk '{print $2}')
    
    log_info "系统信息收集完成"
}

# 优化内核参数
optimize_kernel_params() {
    log_step "优化内核参数..."
    
    # 备份原始配置
    cp /etc/sysctl.conf /etc/sysctl.conf.backup.$(date +%Y%m%d_%H%M%S)
    
    # 添加优化参数
    cat >> /etc/sysctl.conf << 'EOF'

# Kylin性能优化参数
# 网络优化
net.core.rmem_max = 134217728
net.core.wmem_max = 134217728
net.ipv4.tcp_rmem = 4096 65536 134217728
net.ipv4.tcp_wmem = 4096 65536 134217728
net.core.netdev_max_backlog = 5000
net.ipv4.tcp_congestion_control = bbr

# 内存优化
vm.swappiness = 1
vm.dirty_ratio = 15
vm.dirty_background_ratio = 5
vm.overcommit_memory = 1

# 文件系统优化
fs.file-max = 2097152
fs.nr_open = 2097152

# 进程优化
kernel.pid_max = 4194304
kernel.threads-max = 4194304
EOF
    
    # 应用参数
    sysctl -p
    
    log_info "内核参数优化完成"
}

# 优化文件描述符限制
optimize_file_limits() {
    log_step "优化文件描述符限制..."
    
    # 备份原始配置
    cp /etc/security/limits.conf /etc/security/limits.conf.backup.$(date +%Y%m%d_%H%M%S)
    
    # 添加限制配置
    cat >> /etc/security/limits.conf << 'EOF'

# Kylin性能优化
* soft nofile 1048576
* hard nofile 1048576
* soft nproc 1048576
* hard nproc 1048576
* soft memlock unlimited
* hard memlock unlimited
EOF
    
    # 更新PAM配置
    if ! grep -q "pam_limits.so" /etc/pam.d/login; then
        echo "session required pam_limits.so" >> /etc/pam.d/login
    fi
    
    log_info "文件描述符限制优化完成"
}

# 优化JVM参数
optimize_jvm_params() {
    log_step "优化JVM参数..."
    
    # 计算JVM内存配置
    local heap_size_gb=$((TOTAL_MEM_GB * 70 / 100))  # 70%的内存给JVM
    local young_gen_size_gb=$((heap_size_gb * 30 / 100))  # 30%给年轻代
    
    # Kylin JVM优化
    local kylin_jvm_opts="
-Xms${heap_size_gb}g
-Xmx${heap_size_gb}g
-Xmn${young_gen_size_gb}g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m
-XX:+UseStringDeduplication
-XX:+OptimizeStringConcat
-XX:+UseCompressedOops
-XX:+UseCompressedClassPointers
-XX:+UnlockExperimentalVMOptions
-XX:+UseCGroupMemoryLimitForHeap
-XX:+PrintGC
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-XX:+PrintGCApplicationStoppedTime
-Xloggc:$KYLIN_HOME/logs/gc.log
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=10
-XX:GCLogFileSize=100M
-Djava.awt.headless=true
-Dfile.encoding=UTF-8
-Duser.timezone=Asia/Shanghai
"
    
    # 更新Kylin配置
    if [ -f "$KYLIN_HOME/conf/setenv.sh" ]; then
        cp "$KYLIN_HOME/conf/setenv.sh" "$KYLIN_HOME/conf/setenv.sh.backup.$(date +%Y%m%d_%H%M%S)"
    fi
    
    cat > "$KYLIN_HOME/conf/setenv.sh" << EOF
#!/bin/bash

# Kylin JVM优化配置
export KYLIN_JVM_SETTINGS="$kylin_jvm_opts"

# 设置JAVA_HOME
export JAVA_HOME=\${JAVA_HOME:-/usr/lib/jvm/java-8-openjdk}

# 设置Hadoop配置
export HADOOP_CONF_DIR=\${HADOOP_CONF_DIR:-$HADOOP_HOME/etc/hadoop}

# 设置HBase配置
export HBASE_CONF_DIR=\${HBASE_CONF_DIR:-$HBASE_HOME/conf}

# 设置Spark配置
export SPARK_HOME=\${SPARK_HOME:-$SPARK_HOME}
EOF
    
    chmod +x "$KYLIN_HOME/conf/setenv.sh"
    
    log_info "JVM参数优化完成 (堆内存: ${heap_size_gb}GB)"
}

# 优化HBase配置
optimize_hbase_config() {
    log_step "优化HBase配置..."
    
    local hbase_site="$HBASE_HOME/conf/hbase-site.xml"
    
    if [ -f "$hbase_site" ]; then
        cp "$hbase_site" "$hbase_site.backup.$(date +%Y%m%d_%H%M%S)"
    fi
    
    # 计算HBase内存配置
    local regionserver_heap_gb=$((TOTAL_MEM_GB * 50 / 100))
    local master_heap_gb=$((TOTAL_MEM_GB * 10 / 100))
    
    # 生成优化的HBase配置
    cat > "$hbase_site" << EOF
<?xml version="1.0"?>
<configuration>
    <!-- 内存配置 -->
    <property>
        <name>hbase.regionserver.global.memstore.size</name>
        <value>0.4</value>
    </property>
    
    <property>
        <name>hbase.regionserver.global.memstore.size.lower.limit</name>
        <value>0.38</value>
    </property>
    
    <!-- 压缩配置 -->
    <property>
        <name>hbase.hregion.majorcompaction</name>
        <value>604800000</value> <!-- 7天 -->
    </property>
    
    <property>
        <name>hbase.hstore.compaction.min</name>
        <value>3</value>
    </property>
    
    <property>
        <name>hbase.hstore.compaction.max</name>
        <value>10</value>
    </property>
    
    <!-- 并发配置 -->
    <property>
        <name>hbase.regionserver.handler.count</name>
        <value>$((CPU_CORES * 10))</value>
    </property>
    
    <property>
        <name>hbase.regionserver.metahandler.count</name>
        <value>$((CPU_CORES * 2))</value>
    </property>
    
    <!-- 缓存配置 -->
    <property>
        <name>hfile.block.cache.size</name>
        <value>0.3</value>
    </property>
    
    <property>
        <name>hbase.bucketcache.size</name>
        <value>$((regionserver_heap_gb * 1024 * 20 / 100))</value> <!-- 20%堆外缓存 -->
    </property>
    
    <!-- 网络配置 -->
    <property>
        <name>hbase.ipc.server.callqueue.handler.factor</name>
        <value>0.1</value>
    </property>
    
    <property>
        <name>hbase.ipc.server.callqueue.read.ratio</name>
        <value>0.7</value>
    </property>
    
    <!-- WAL配置 -->
    <property>
        <name>hbase.regionserver.hlog.blocksize</name>
        <value>134217728</value> <!-- 128MB -->
    </property>
    
    <property>
        <name>hbase.regionserver.maxlogs</name>
        <value>32</value>
    </property>
</configuration>
EOF
    
    # 优化HBase环境变量
    local hbase_env="$HBASE_HOME/conf/hbase-env.sh"
    
    if [ -f "$hbase_env" ]; then
        cp "$hbase_env" "$hbase_env.backup.$(date +%Y%m%d_%H%M%S)"
    fi
    
    cat >> "$hbase_env" << EOF

# HBase性能优化配置
export HBASE_HEAPSIZE=${regionserver_heap_gb}G
export HBASE_MASTER_OPTS="\$HBASE_MASTER_OPTS -Xms${master_heap_gb}g -Xmx${master_heap_gb}g"
export HBASE_REGIONSERVER_OPTS="\$HBASE_REGIONSERVER_OPTS -Xms${regionserver_heap_gb}g -Xmx${regionserver_heap_gb}g -XX:+UseG1GC -XX:MaxGCPauseMillis=200"

# GC日志配置
export HBASE_OPTS="\$HBASE_OPTS -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:\$HBASE_HOME/logs/gc.log"
EOF
    
    log_info "HBase配置优化完成 (RegionServer堆内存: ${regionserver_heap_gb}GB)"
}

# 优化Spark配置
optimize_spark_config() {
    log_step "优化Spark配置..."
    
    local spark_defaults="$SPARK_HOME/conf/spark-defaults.conf"
    
    if [ -f "$spark_defaults" ]; then
        cp "$spark_defaults" "$spark_defaults.backup.$(date +%Y%m%d_%H%M%S)"
    fi
    
    # 计算Spark内存配置
    local executor_memory_gb=$((TOTAL_MEM_GB * 60 / 100 / CPU_CORES))
    local driver_memory_gb=$((TOTAL_MEM_GB * 20 / 100))
    
    cat > "$spark_defaults" << EOF
# Spark性能优化配置

# 内存配置
spark.executor.memory              ${executor_memory_gb}g
spark.driver.memory                ${driver_memory_gb}g
spark.executor.memoryFraction      0.8
spark.storage.memoryFraction       0.3

# 并行度配置
spark.executor.instances           $CPU_CORES
spark.executor.cores               1
spark.default.parallelism          $((CPU_CORES * 2))
spark.sql.adaptive.enabled         true
spark.sql.adaptive.coalescePartitions.enabled  true

# 序列化配置
spark.serializer                   org.apache.spark.serializer.KryoSerializer
spark.kryo.unsafe                  true
spark.kryo.referenceTracking       false

# 网络配置
spark.network.timeout              300s
spark.rpc.askTimeout               300s
spark.sql.broadcastTimeout         300s

# 缓存配置
spark.sql.adaptive.shuffle.targetPostShuffleInputSize  64MB
spark.sql.adaptive.skewJoin.enabled  true

# 动态分配
spark.dynamicAllocation.enabled    true
spark.dynamicAllocation.minExecutors  1
spark.dynamicAllocation.maxExecutors  $((CPU_CORES * 2))
spark.dynamicAllocation.initialExecutors  $CPU_CORES

# GC配置
spark.executor.extraJavaOptions    -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+PrintGC -XX:+PrintGCDetails
spark.driver.extraJavaOptions      -XX:+UseG1GC -XX:MaxGCPauseMillis=200
EOF
    
    log_info "Spark配置优化完成 (Executor内存: ${executor_memory_gb}GB)"
}

# 优化磁盘IO
optimize_disk_io() {
    log_step "优化磁盘IO..."
    
    # 设置磁盘调度器
    for disk in $(lsblk -d -o NAME | grep -E '^[sv]d[a-z]$'); do
        if [ -f "/sys/block/$disk/queue/scheduler" ]; then
            echo "deadline" > "/sys/block/$disk/queue/scheduler"
            log_info "磁盘 $disk 调度器设置为 deadline"
        fi
    done
    
    # 优化文件系统挂载参数
    log_info "建议在 /etc/fstab 中添加以下挂载参数:"
    echo "  noatime,nodiratime,nobarrier,data=writeback"
    
    log_info "磁盘IO优化完成"
}

# 生成性能报告
generate_performance_report() {
    log_step "生成性能调优报告..."
    
    local report_file="/tmp/kylin_performance_report_$(date +%Y%m%d_%H%M%S).txt"
    
    cat > "$report_file" << EOF
Kylin性能调优报告
生成时间: $(date)

=== 系统信息 ===
CPU核心数: $CPU_CORES
总内存: ${TOTAL_MEM_GB}GB
操作系统: $(cat /etc/os-release | grep PRETTY_NAME | cut -d'"' -f2)
内核版本: $(uname -r)

=== 优化配置 ===
1. 内核参数优化: 已完成
   - 网络参数优化
   - 内存管理优化
   - 文件系统优化

2. 文件描述符限制: 已优化到 1048576

3. JVM配置优化:
   - Kylin堆内存: $((TOTAL_MEM_GB * 70 / 100))GB
   - 使用G1GC垃圾收集器
   - GC日志已启用

4. HBase配置优化:
   - RegionServer堆内存: $((TOTAL_MEM_GB * 50 / 100))GB
   - MemStore大小: 40%
   - BlockCache大小: 30%
   - 处理器线程数: $((CPU_CORES * 10))

5. Spark配置优化:
   - Executor内存: $((TOTAL_MEM_GB * 60 / 100 / CPU_CORES))GB
   - Driver内存: $((TOTAL_MEM_GB * 20 / 100))GB
   - 并行度: $((CPU_CORES * 2))
   - 动态分配已启用

6. 磁盘IO优化:
   - 磁盘调度器: deadline
   - 建议文件系统挂载参数已提供

=== 建议的后续操作 ===
1. 重启系统以应用内核参数
2. 重启Kylin、HBase、Spark服务
3. 监控系统性能指标
4. 根据实际负载调整配置

=== 监控建议 ===
1. 监控JVM GC日志
2. 监控HBase RegionServer性能
3. 监控Spark作业执行时间
4. 监控系统资源使用率
EOF
    
    echo "性能调优报告已生成: $report_file"
    cat "$report_file"
}

# 验证配置
verify_configuration() {
    log_step "验证配置..."
    
    echo "=== 配置验证 ==="
    
    # 检查内核参数
    echo "1. 内核参数:"
    echo "   vm.swappiness = $(sysctl -n vm.swappiness)"
    echo "   net.core.rmem_max = $(sysctl -n net.core.rmem_max)"
    
    # 检查文件限制
    echo "2. 文件描述符限制:"
    echo "   soft nofile = $(ulimit -Sn)"
    echo "   hard nofile = $(ulimit -Hn)"
    
    # 检查Java版本
    echo "3. Java版本:"
    java -version 2>&1 | head -1
    
    # 检查服务状态
    echo "4. 服务状态:"
    for service in kylin hbase-master hbase-regionserver; do
        if systemctl is-active "$service" >/dev/null 2>&1; then
            echo "   $service: 运行中"
        else
            echo "   $service: 未运行"
        fi
    done
    
    log_info "配置验证完成"
}

# 主函数
main() {
    case "${1:-all}" in
        "all")
            get_system_info
            optimize_kernel_params
            optimize_file_limits
            optimize_jvm_params
            optimize_hbase_config
            optimize_spark_config
            optimize_disk_io
            generate_performance_report
            ;;
        "kernel")
            optimize_kernel_params
            ;;
        "limits")
            optimize_file_limits
            ;;
        "jvm")
            get_system_info
            optimize_jvm_params
            ;;
        "hbase")
            get_system_info
            optimize_hbase_config
            ;;
        "spark")
            get_system_info
            optimize_spark_config
            ;;
        "disk")
            optimize_disk_io
            ;;
        "verify")
            verify_configuration
            ;;
        "report")
            get_system_info
            generate_performance_report
            ;;
        *)
            echo "用法: $0 [all|kernel|limits|jvm|hbase|spark|disk|verify|report]"
            echo "  all:     执行所有优化"
            echo "  kernel:  优化内核参数"
            echo "  limits:  优化文件限制"
            echo "  jvm:     优化JVM参数"
            echo "  hbase:   优化HBase配置"
            echo "  spark:   优化Spark配置"
            echo "  disk:    优化磁盘IO"
            echo "  verify:  验证配置"
            echo "  report:  生成性能报告"
            exit 1
            ;;
    esac
}

# 脚本入口
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
    main "$@"
fi

8.5.2 查询性能调优

查询优化监控脚本

#!/usr/bin/env python3
# query_optimizer.py - Kylin查询性能优化器

import time
import json
import requests
import logging
import sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
import matplotlib.pyplot as plt
import pandas as pd

@dataclass
class QueryMetrics:
    """查询指标"""
    query_id: str
    sql: str
    duration_ms: int
    scan_count: int
    scan_bytes: int
    result_count: int
    cache_hit: bool
    cube_name: str
    timestamp: datetime
    
class QueryOptimizer:
    """查询性能优化器"""
    
    def __init__(self, kylin_host: str = "localhost", kylin_port: int = 7070):
        self.kylin_url = f"http://{kylin_host}:{kylin_port}"
        self.auth = ('ADMIN', 'KYLIN')
        
        # 设置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        
        # 初始化数据库
        self._init_database()
        
    def _init_database(self):
        """初始化SQLite数据库"""
        self.conn = sqlite3.connect('/var/log/kylin_query_metrics.db')
        
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS query_metrics (
                query_id TEXT PRIMARY KEY,
                sql TEXT,
                duration_ms INTEGER,
                scan_count INTEGER,
                scan_bytes INTEGER,
                result_count INTEGER,
                cache_hit BOOLEAN,
                cube_name TEXT,
                timestamp DATETIME
            )
        ''')
        
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS optimization_suggestions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                query_pattern TEXT,
                suggestion TEXT,
                priority INTEGER,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        self.conn.commit()
    
    def collect_query_metrics(self) -> List[QueryMetrics]:
        """收集查询指标"""
        try:
            # 获取查询历史
            response = requests.get(
                f"{self.kylin_url}/kylin/api/query",
                auth=self.auth,
                params={'limit': 100, 'offset': 0}
            )
            
            if response.status_code != 200:
                self.logger.error(f"获取查询历史失败: {response.status_code}")
                return []
            
            queries = response.json()
            metrics = []
            
            for query in queries:
                try:
                    metric = QueryMetrics(
                        query_id=query.get('queryId', ''),
                        sql=query.get('sql', ''),
                        duration_ms=query.get('duration', 0),
                        scan_count=query.get('scanCount', 0),
                        scan_bytes=query.get('scanBytes', 0),
                        result_count=query.get('resultRowCount', 0),
                        cache_hit=query.get('cacheHit', False),
                        cube_name=query.get('cubeName', ''),
                        timestamp=datetime.fromtimestamp(query.get('lastModified', 0) / 1000)
                    )
                    metrics.append(metric)
                    
                except Exception as e:
                    self.logger.warning(f"解析查询指标失败: {e}")
                    continue
            
            return metrics
            
        except Exception as e:
            self.logger.error(f"收集查询指标失败: {e}")
            return []
    
    def store_metrics(self, metrics: List[QueryMetrics]):
        """存储指标到数据库"""
        for metric in metrics:
            try:
                self.conn.execute('''
                    INSERT OR REPLACE INTO query_metrics 
                    (query_id, sql, duration_ms, scan_count, scan_bytes, 
                     result_count, cache_hit, cube_name, timestamp)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    metric.query_id, metric.sql, metric.duration_ms,
                    metric.scan_count, metric.scan_bytes, metric.result_count,
                    metric.cache_hit, metric.cube_name, metric.timestamp
                ))
                
            except Exception as e:
                self.logger.error(f"存储指标失败: {e}")
        
        self.conn.commit()
    
    def analyze_slow_queries(self, threshold_ms: int = 10000) -> List[Dict]:
        """分析慢查询"""
        cursor = self.conn.execute('''
            SELECT sql, AVG(duration_ms) as avg_duration, COUNT(*) as count,
                   cube_name, MAX(timestamp) as last_execution
            FROM query_metrics 
            WHERE duration_ms > ?
            GROUP BY sql, cube_name
            ORDER BY avg_duration DESC
            LIMIT 20
        ''', (threshold_ms,))
        
        slow_queries = []
        for row in cursor.fetchall():
            slow_queries.append({
                'sql': row[0],
                'avg_duration_ms': row[1],
                'execution_count': row[2],
                'cube_name': row[3],
                'last_execution': row[4]
            })
        
        return slow_queries
    
    def analyze_cache_efficiency(self) -> Dict:
        """分析缓存效率"""
        cursor = self.conn.execute('''
            SELECT 
                COUNT(*) as total_queries,
                SUM(CASE WHEN cache_hit THEN 1 ELSE 0 END) as cache_hits,
                AVG(CASE WHEN cache_hit THEN duration_ms ELSE NULL END) as avg_cache_duration,
                AVG(CASE WHEN NOT cache_hit THEN duration_ms ELSE NULL END) as avg_no_cache_duration
            FROM query_metrics
            WHERE timestamp > datetime('now', '-7 days')
        ''')
        
        row = cursor.fetchone()
        
        if row[0] > 0:
            cache_hit_rate = (row[1] / row[0]) * 100
        else:
            cache_hit_rate = 0
        
        return {
            'total_queries': row[0],
            'cache_hits': row[1],
            'cache_hit_rate': cache_hit_rate,
            'avg_cache_duration_ms': row[2] or 0,
            'avg_no_cache_duration_ms': row[3] or 0
        }
    
    def analyze_cube_usage(self) -> List[Dict]:
        """分析Cube使用情况"""
        cursor = self.conn.execute('''
            SELECT 
                cube_name,
                COUNT(*) as query_count,
                AVG(duration_ms) as avg_duration,
                SUM(scan_bytes) as total_scan_bytes,
                MAX(timestamp) as last_used
            FROM query_metrics
            WHERE timestamp > datetime('now', '-30 days')
            GROUP BY cube_name
            ORDER BY query_count DESC
        ''')
        
        cube_usage = []
        for row in cursor.fetchall():
            cube_usage.append({
                'cube_name': row[0],
                'query_count': row[1],
                'avg_duration_ms': row[2],
                'total_scan_bytes': row[3],
                'last_used': row[4]
            })
        
        return cube_usage
    
    def generate_optimization_suggestions(self) -> List[Dict]:
        """生成优化建议"""
        suggestions = []
        
        # 分析慢查询
        slow_queries = self.analyze_slow_queries()
        for query in slow_queries[:5]:  # 只处理前5个最慢的查询
            sql = query['sql'].lower()
            
            # 检查是否缺少WHERE条件
            if 'where' not in sql:
                suggestions.append({
                    'type': 'missing_filter',
                    'priority': 'high',
                    'description': '查询缺少WHERE条件,可能导致全表扫描',
                    'sql_pattern': query['sql'][:100] + '...',
                    'suggestion': '添加适当的WHERE条件来过滤数据'
                })
            
            # 检查是否使用了SELECT *
            if 'select *' in sql:
                suggestions.append({
                    'type': 'select_all',
                    'priority': 'medium',
                    'description': '使用SELECT *可能返回不必要的列',
                    'sql_pattern': query['sql'][:100] + '...',
                    'suggestion': '只选择需要的列,避免使用SELECT *'
                })
            
            # 检查是否有复杂的JOIN
            if sql.count('join') > 2:
                suggestions.append({
                    'type': 'complex_join',
                    'priority': 'medium',
                    'description': '查询包含多个JOIN,可能影响性能',
                    'sql_pattern': query['sql'][:100] + '...',
                    'suggestion': '考虑优化JOIN顺序或使用预聚合的Cube'
                })
        
        # 分析缓存效率
        cache_stats = self.analyze_cache_efficiency()
        if cache_stats['cache_hit_rate'] < 50:
            suggestions.append({
                'type': 'low_cache_hit',
                'priority': 'high',
                'description': f'缓存命中率较低 ({cache_stats["cache_hit_rate"]:.1f}%)',
                'suggestion': '检查查询模式,考虑调整缓存策略或优化Cube设计'
            })
        
        # 分析Cube使用
        cube_usage = self.analyze_cube_usage()
        unused_cubes = [cube for cube in cube_usage if cube['query_count'] == 0]
        if unused_cubes:
            suggestions.append({
                'type': 'unused_cubes',
                'priority': 'low',
                'description': f'发现 {len(unused_cubes)} 个未使用的Cube',
                'suggestion': '考虑删除或重新设计未使用的Cube以节省存储空间'
            })
        
        return suggestions
    
    def generate_performance_report(self) -> str:
        """生成性能报告"""
        report = []
        report.append("# Kylin查询性能分析报告")
        report.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("")
        
        # 缓存效率分析
        cache_stats = self.analyze_cache_efficiency()
        report.append("## 缓存效率分析")
        report.append(f"- 总查询数: {cache_stats['total_queries']}")
        report.append(f"- 缓存命中数: {cache_stats['cache_hits']}")
        report.append(f"- 缓存命中率: {cache_stats['cache_hit_rate']:.1f}%")
        report.append(f"- 缓存命中平均耗时: {cache_stats['avg_cache_duration_ms']:.0f}ms")
        report.append(f"- 缓存未命中平均耗时: {cache_stats['avg_no_cache_duration_ms']:.0f}ms")
        report.append("")
        
        # 慢查询分析
        slow_queries = self.analyze_slow_queries()
        report.append("## 慢查询分析 (>10秒)")
        if slow_queries:
            for i, query in enumerate(slow_queries[:10], 1):
                report.append(f"{i}. Cube: {query['cube_name']}")
                report.append(f"   平均耗时: {query['avg_duration_ms']:.0f}ms")
                report.append(f"   执行次数: {query['execution_count']}")
                report.append(f"   SQL: {query['sql'][:100]}...")
                report.append("")
        else:
            report.append("未发现慢查询")
            report.append("")
        
        # Cube使用分析
        cube_usage = self.analyze_cube_usage()
        report.append("## Cube使用情况")
        for cube in cube_usage[:10]:
            report.append(f"- {cube['cube_name']}:")
            report.append(f"  查询次数: {cube['query_count']}")
            report.append(f"  平均耗时: {cube['avg_duration_ms']:.0f}ms")
            report.append(f"  扫描数据量: {cube['total_scan_bytes'] / 1024 / 1024:.1f}MB")
        report.append("")
        
        # 优化建议
        suggestions = self.generate_optimization_suggestions()
        report.append("## 优化建议")
        if suggestions:
            for i, suggestion in enumerate(suggestions, 1):
                report.append(f"{i}. **{suggestion['type']}** (优先级: {suggestion['priority']})")
                report.append(f"   问题: {suggestion['description']}")
                report.append(f"   建议: {suggestion['suggestion']}")
                if 'sql_pattern' in suggestion:
                    report.append(f"   SQL模式: {suggestion['sql_pattern']}")
                report.append("")
        else:
            report.append("暂无优化建议")
        
        return "\n".join(report)
    
    def create_performance_charts(self, output_dir: str = "/tmp"):
        """创建性能图表"""
        try:
            # 查询耗时趋势图
            cursor = self.conn.execute('''
                SELECT DATE(timestamp) as date, AVG(duration_ms) as avg_duration
                FROM query_metrics
                WHERE timestamp > datetime('now', '-30 days')
                GROUP BY DATE(timestamp)
                ORDER BY date
            ''')
            
            dates = []
            durations = []
            for row in cursor.fetchall():
                dates.append(row[0])
                durations.append(row[1])
            
            if dates:
                plt.figure(figsize=(12, 6))
                plt.plot(dates, durations, marker='o')
                plt.title('查询平均耗时趋势')
                plt.xlabel('日期')
                plt.ylabel('平均耗时 (ms)')
                plt.xticks(rotation=45)
                plt.tight_layout()
                plt.savefig(f"{output_dir}/query_duration_trend.png")
                plt.close()
            
            # 缓存命中率图表
            cursor = self.conn.execute('''
                SELECT DATE(timestamp) as date, 
                       AVG(CASE WHEN cache_hit THEN 1.0 ELSE 0.0 END) * 100 as hit_rate
                FROM query_metrics
                WHERE timestamp > datetime('now', '-30 days')
                GROUP BY DATE(timestamp)
                ORDER BY date
            ''')
            
            dates = []
            hit_rates = []
            for row in cursor.fetchall():
                dates.append(row[0])
                hit_rates.append(row[1])
            
            if dates:
                plt.figure(figsize=(12, 6))
                plt.plot(dates, hit_rates, marker='s', color='green')
                plt.title('缓存命中率趋势')
                plt.xlabel('日期')
                plt.ylabel('缓存命中率 (%)')
                plt.ylim(0, 100)
                plt.xticks(rotation=45)
                plt.tight_layout()
                plt.savefig(f"{output_dir}/cache_hit_rate_trend.png")
                plt.close()
            
            self.logger.info(f"性能图表已生成到 {output_dir}")
            
        except Exception as e:
            self.logger.error(f"生成性能图表失败: {e}")
    
    def run_optimization_analysis(self):
        """运行优化分析"""
        self.logger.info("开始查询性能优化分析...")
        
        # 收集指标
        metrics = self.collect_query_metrics()
        if metrics:
            self.store_metrics(metrics)
            self.logger.info(f"收集了 {len(metrics)} 条查询指标")
        
        # 生成报告
        report = self.generate_performance_report()
        
        # 保存报告
        report_file = f"/tmp/kylin_query_performance_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(report)
        
        self.logger.info(f"性能报告已生成: {report_file}")
        
        # 生成图表
        self.create_performance_charts()
        
        return report_file

def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='Kylin查询性能优化器')
    parser.add_argument('--host', default='localhost', help='Kylin主机地址')
    parser.add_argument('--port', type=int, default=7070, help='Kylin端口')
    parser.add_argument('--action', choices=['analyze', 'report', 'monitor'], 
                       default='analyze', help='执行动作')
    
    args = parser.parse_args()
    
    optimizer = QueryOptimizer(args.host, args.port)
    
    if args.action == 'analyze':
        report_file = optimizer.run_optimization_analysis()
        print(f"分析完成,报告文件: {report_file}")
    elif args.action == 'report':
        report = optimizer.generate_performance_report()
        print(report)
    elif args.action == 'monitor':
        # 持续监控模式
        while True:
            try:
                optimizer.run_optimization_analysis()
                time.sleep(3600)  # 每小时分析一次
            except KeyboardInterrupt:
                print("监控已停止")
                break

if __name__ == '__main__':
    main()

8.6 最佳实践与总结

8.6.1 运维最佳实践

1. 部署最佳实践

  • 环境隔离: 严格区分开发、测试、生产环境
  • 版本管理: 使用Git管理配置文件和脚本
  • 自动化部署: 使用脚本实现一键部署和回滚
  • 配置管理: 集中管理配置文件,避免手动修改

2. 监控最佳实践

  • 全方位监控: 系统、应用、业务三个层面的监控
  • 告警策略: 设置合理的告警阈值,避免告警风暴
  • 日志管理: 统一日志格式,集中日志收集和分析
  • 性能基线: 建立性能基线,及时发现性能退化

3. 备份最佳实践

  • 定期备份: 建立定期备份策略,包括全量和增量备份
  • 异地备份: 重要数据进行异地备份
  • 备份验证: 定期验证备份文件的完整性
  • 恢复演练: 定期进行恢复演练,确保备份可用

4. 安全最佳实践

  • 访问控制: 实施最小权限原则
  • 网络安全: 配置防火墙和网络隔离
  • 数据加密: 敏感数据传输和存储加密
  • 审计日志: 记录所有操作日志,便于审计

8.6.2 性能优化最佳实践

1. 系统级优化

# 系统优化检查清单
echo "=== Kylin系统优化检查清单 ==="
echo "1. 内核参数优化"
echo "   - vm.swappiness = 1"
echo "   - net.core.rmem_max = 134217728"
echo "   - fs.file-max = 2097152"
echo
echo "2. JVM优化"
echo "   - 使用G1GC垃圾收集器"
echo "   - 设置合适的堆内存大小"
echo "   - 启用GC日志"
echo
echo "3. HBase优化"
echo "   - MemStore大小: 40%"
echo "   - BlockCache大小: 30%"
echo "   - 合理设置压缩策略"
echo
echo "4. Spark优化"
echo "   - 动态资源分配"
echo "   - 使用Kryo序列化"
echo "   - 合理设置并行度"

2. 应用级优化

  • Cube设计: 合理设计维度和度量
  • 分区策略: 使用时间分区提高查询效率
  • 索引优化: 为常用查询字段建立索引
  • 缓存策略: 合理配置查询缓存

8.6.3 故障处理最佳实践

故障处理流程

flowchart TD
    A[故障发现] --> B[故障确认]
    B --> C[影响评估]
    C --> D[应急处理]
    D --> E[根因分析]
    E --> F[永久修复]
    F --> G[总结改进]
    
    B --> H[误报处理]
    D --> I[升级处理]
    
    style A fill:#ff9999
    style D fill:#99ccff
    style F fill:#99ff99

常见故障处理

故障类型 症状 处理方法
服务无响应 连接超时 重启服务,检查日志
内存不足 OOM错误 调整JVM参数,增加内存
磁盘空间不足 写入失败 清理日志,扩展磁盘
网络问题 连接中断 检查网络配置,重启网络
数据不一致 查询结果异常 重建Cube,检查数据源

8.6.4 容量规划

容量规划公式

# 容量规划计算器
def calculate_capacity(daily_data_gb, retention_days, growth_rate, compression_ratio=0.3):
    """
    计算Kylin集群容量需求
    
    Args:
        daily_data_gb: 每日数据量(GB)
        retention_days: 数据保留天数
        growth_rate: 年增长率
        compression_ratio: 压缩比
    """
    # 基础存储需求
    base_storage = daily_data_gb * retention_days * compression_ratio
    
    # 考虑增长的存储需求
    yearly_growth = base_storage * growth_rate
    total_storage = base_storage + yearly_growth
    
    # 添加安全边际(30%)
    safe_storage = total_storage * 1.3
    
    # 计算内存需求(存储的10-20%)
    memory_requirement = safe_storage * 0.15
    
    # 计算CPU需求(每TB存储需要2-4核)
    cpu_requirement = (safe_storage / 1024) * 3
    
    return {
        'storage_gb': safe_storage,
        'memory_gb': memory_requirement,
        'cpu_cores': cpu_requirement,
        'nodes_recommended': max(3, int(cpu_requirement / 16))  # 假设每节点16核
    }

# 示例计算
capacity = calculate_capacity(
    daily_data_gb=100,
    retention_days=365,
    growth_rate=0.5,
    compression_ratio=0.3
)

print(f"存储需求: {capacity['storage_gb']:.0f} GB")
print(f"内存需求: {capacity['memory_gb']:.0f} GB")
print(f"CPU需求: {capacity['cpu_cores']:.0f} 核")
print(f"推荐节点数: {capacity['nodes_recommended']} 个")

8.6.5 运维工具集

运维脚本集合

#!/bin/bash
# kylin_ops_toolkit.sh - Kylin运维工具集

KYLIN_HOME="/opt/kylin"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

# 显示菜单
show_menu() {
    echo "=== Kylin运维工具集 ==="
    echo "1. 健康检查"
    echo "2. 性能监控"
    echo "3. 日志分析"
    echo "4. 备份数据"
    echo "5. 恢复数据"
    echo "6. 性能调优"
    echo "7. 故障诊断"
    echo "8. 容量分析"
    echo "9. 集群状态"
    echo "0. 退出"
    echo "==================="
}

# 健康检查
health_check() {
    echo "执行健康检查..."
    python3 "$SCRIPT_DIR/kylin_monitor.py" --action health
}

# 性能监控
performance_monitor() {
    echo "启动性能监控..."
    python3 "$SCRIPT_DIR/query_optimizer.py" --action analyze
}

# 日志分析
log_analysis() {
    echo "分析系统日志..."
    python3 "$SCRIPT_DIR/log_analyzer.py" --days 7
}

# 备份数据
backup_data() {
    echo "开始数据备份..."
    bash "$SCRIPT_DIR/kylin_backup.sh"
}

# 恢复数据
restore_data() {
    echo "数据恢复..."
    bash "$SCRIPT_DIR/kylin_restore.sh" --interactive
}

# 性能调优
performance_tuning() {
    echo "执行性能调优..."
    bash "$SCRIPT_DIR/performance_tuning.sh" all
}

# 故障诊断
fault_diagnosis() {
    echo "故障诊断..."
    echo "检查服务状态..."
    systemctl status kylin hbase-master hbase-regionserver
    echo
    echo "检查端口监听..."
    netstat -tlnp | grep -E ':(7070|16010|16030)'
    echo
    echo "检查磁盘空间..."
    df -h
    echo
    echo "检查内存使用..."
    free -h
}

# 容量分析
capacity_analysis() {
    echo "容量分析..."
    echo "HBase表大小:"
    echo "list" | hbase shell | grep -E '^[a-zA-Z]'
    echo
    echo "磁盘使用情况:"
    du -sh "$KYLIN_HOME"/* 2>/dev/null
}

# 集群状态
cluster_status() {
    echo "集群状态检查..."
    echo "Kylin状态:"
    curl -s "http://localhost:7070/kylin/api/admin/public_config" | jq .
    echo
    echo "HBase状态:"
    echo "status" | hbase shell
}

# 主循环
main() {
    while true; do
        show_menu
        read -p "请选择操作 (0-9): " choice
        
        case $choice in
            1) health_check ;;
            2) performance_monitor ;;
            3) log_analysis ;;
            4) backup_data ;;
            5) restore_data ;;
            6) performance_tuning ;;
            7) fault_diagnosis ;;
            8) capacity_analysis ;;
            9) cluster_status ;;
            0) echo "退出"; exit 0 ;;
            *) echo "无效选择,请重新输入" ;;
        esac
        
        echo
        read -p "按回车键继续..."
        clear
    done
}

# 脚本入口
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
    main "$@"
fi

8.7 本章小结

本章详细介绍了Apache Kylin的运维管理与监控,主要内容包括:

核心知识点

  1. 集群部署与管理

    • 生产环境部署架构设计
    • 自动化部署脚本实现
    • 高可用配置和故障转移
  2. 监控与告警

    • 系统监控指标收集
    • JVM性能监控
    • 日志分析和告警机制
  3. 备份与恢复

    • 自动化备份策略
    • 数据恢复流程
    • 容灾和高可用设计
  4. 性能调优

    • 系统级性能优化
    • 查询性能分析和优化
    • 容量规划和资源配置
  5. 运维最佳实践

    • 故障处理流程
    • 安全管理策略
    • 运维工具集成

实用工具

  • 部署工具: deploy_kylin_cluster.sh - 自动化集群部署
  • 监控工具: kylin_monitor.py - 综合监控系统
  • 备份工具: kylin_backup.sh - 自动化备份
  • 恢复工具: kylin_restore.sh - 数据恢复
  • 调优工具: performance_tuning.sh - 性能优化
  • 故障转移: failover_manager.py - 自动故障转移
  • 查询优化: query_optimizer.py - 查询性能分析

最佳实践

  1. 监控策略: 建立全方位监控体系
  2. 备份策略: 定期备份和恢复演练
  3. 性能优化: 系统和应用双重优化
  4. 故障处理: 标准化故障处理流程
  5. 容量规划: 基于业务增长的容量规划

下一章预告

下一章将介绍Kylin与其他系统的集成,包括: - 与BI工具的集成 - 与数据湖的集成 - 与实时计算的集成 - API开发和SDK使用

练习与思考

  1. 实践练习:

    • 部署一个3节点的Kylin集群
    • 配置监控和告警系统
    • 实施备份和恢复策略
  2. 思考题:

    • 如何设计适合你业务场景的监控指标?
    • 在资源有限的情况下如何进行性能优化?
    • 如何建立有效的故障处理机制?

8.3.2 数据恢复策略

恢复脚本

#!/bin/bash
# kylin_restore.sh - Kylin数据恢复脚本

set -e

# 配置参数
BACKUP_DIR="/backup/kylin"
KYLIN_HOME="/opt/kylin"
HBASE_HOME="/opt/hbase"
HDFS_BACKUP_DIR="/backup/kylin"
RESTORE_DIR="/tmp/kylin_restore"

# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'

log_info() {
    echo -e "${GREEN}[INFO]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

log_step() {
    echo -e "${BLUE}[STEP]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

# 检查前置条件
check_prerequisites() {
    log_step "检查恢复前置条件..."
    
    # 检查必要的工具
    local required_tools=("tar" "hadoop" "hbase")
    for tool in "${required_tools[@]}"; do
        if ! command -v "$tool" &> /dev/null; then
            log_error "缺少必要工具: $tool"
            return 1
        fi
    done
    
    # 检查HBase是否运行
    if ! "$HBASE_HOME/bin/hbase" shell <<< "status" &> /dev/null; then
        log_error "HBase未运行,请先启动HBase"
        return 1
    fi
    
    # 检查HDFS是否可用
    if ! hadoop fs -ls / &> /dev/null; then
        log_error "HDFS不可用,请检查Hadoop集群状态"
        return 1
    fi
    
    log_info "前置条件检查通过"
}

# 列出可用备份
list_backups() {
    log_step "列出可用备份..."
    
    echo "本地备份:"
    if [ -d "$BACKUP_DIR" ]; then
        find "$BACKUP_DIR" -name "kylin_backup_*.tar.gz" -printf "%f\t%TY-%Tm-%Td %TH:%TM\t%s bytes\n" | sort -r
    else
        echo "  无本地备份"
    fi
    
    echo "\nHDFS备份:"
    if hadoop fs -test -d "$HDFS_BACKUP_DIR" 2>/dev/null; then
        hadoop fs -ls "$HDFS_BACKUP_DIR" | grep "^d" | awk '{print $8"\t"$6" "$7}'
    else
        echo "  无HDFS备份"
    fi
}

# 验证备份文件
validate_backup() {
    local backup_file="$1"
    
    log_step "验证备份文件: $backup_file"
    
    # 检查文件是否存在
    if [ ! -f "$backup_file" ]; then
        log_error "备份文件不存在: $backup_file"
        return 1
    fi
    
    # 检查文件完整性
    if ! tar -tzf "$backup_file" >/dev/null 2>&1; then
        log_error "备份文件损坏: $backup_file"
        return 1
    fi
    
    # 检查备份内容
    local contents
    contents=$(tar -tzf "$backup_file" | head -20)
    log_info "备份内容预览:"
    echo "$contents" | sed 's/^/  /'
    
    log_info "备份文件验证通过"
}

# 解压备份文件
extract_backup() {
    local backup_file="$1"
    local extract_dir="$2"
    
    log_step "解压备份文件到: $extract_dir"
    
    # 创建解压目录
    mkdir -p "$extract_dir"
    
    # 解压备份文件
    tar -xzf "$backup_file" -C "$extract_dir" --strip-components=1
    
    if [ $? -eq 0 ]; then
        log_info "备份文件解压完成"
        
        # 显示解压内容
        log_info "解压内容:"
        find "$extract_dir" -maxdepth 2 -type d | sed 's/^/  /'
    else
        log_error "备份文件解压失败"
        return 1
    fi
}

# 停止Kylin服务
stop_kylin() {
    log_step "停止Kylin服务..."
    
    if [ -f "$KYLIN_HOME/bin/kylin.sh" ]; then
        "$KYLIN_HOME/bin/kylin.sh" stop || {
            log_warn "正常停止失败,尝试强制停止"
            pkill -f "kylin" || true
        }
        
        # 等待服务完全停止
        sleep 10
        
        if pgrep -f "kylin" > /dev/null; then
            log_error "Kylin服务仍在运行,请手动停止"
            return 1
        fi
        
        log_info "Kylin服务已停止"
    else
        log_warn "未找到Kylin启动脚本,跳过服务停止"
    fi
}

# 备份当前数据
backup_current_data() {
    local current_backup_dir="$RESTORE_DIR/current_backup_$(date +%Y%m%d_%H%M%S)"
    
    log_step "备份当前数据到: $current_backup_dir"
    
    mkdir -p "$current_backup_dir"
    
    # 备份当前元数据
    if [ -d "$KYLIN_HOME/meta" ]; then
        cp -r "$KYLIN_HOME/meta" "$current_backup_dir/"
        log_info "当前元数据已备份"
    fi
    
    # 备份当前配置
    if [ -d "$KYLIN_HOME/conf" ]; then
        cp -r "$KYLIN_HOME/conf" "$current_backup_dir/"
        log_info "当前配置已备份"
    fi
    
    log_info "当前数据备份完成: $current_backup_dir"
    echo "$current_backup_dir"
}

# 恢复元数据
restore_metadata() {
    local backup_extract_dir="$1"
    local metadata_dir="$backup_extract_dir/metadata"
    
    log_step "恢复元数据..."
    
    if [ ! -d "$metadata_dir" ]; then
        log_error "备份中未找到元数据目录"
        return 1
    fi
    
    # 恢复本地元数据
    if [ -d "$metadata_dir/local_meta" ]; then
        log_info "恢复本地元数据文件..."
        
        # 清理现有元数据
        if [ -d "$KYLIN_HOME/meta" ]; then
            rm -rf "$KYLIN_HOME/meta"
        fi
        
        # 恢复元数据
        cp -r "$metadata_dir/local_meta" "$KYLIN_HOME/meta"
        chown -R kylin:kylin "$KYLIN_HOME/meta" 2>/dev/null || true
        
        log_info "本地元数据恢复完成"
    fi
    
    # 恢复HBase元数据表
    local metadata_tables=("kylin_metadata" "kylin_dict" "kylin_statistics")
    
    for table in "${metadata_tables[@]}"; do
        local hdfs_backup_path="$HDFS_BACKUP_DIR/metadata/$(basename "$backup_extract_dir")/$table"
        
        if hadoop fs -test -d "$hdfs_backup_path" 2>/dev/null; then
            log_info "恢复元数据表: $table"
            
            # 删除现有表
            "$HBASE_HOME/bin/hbase" shell <<< "disable '$table'; drop '$table'" 2>/dev/null || true
            
            # 导入备份数据
            "$HBASE_HOME/bin/hbase" org.apache.hadoop.hbase.mapreduce.Import \
                "$table" "$hdfs_backup_path" || {
                log_error "恢复元数据表 $table 失败"
                return 1
            }
            
            log_info "元数据表 $table 恢复完成"
        else
            log_warn "未找到元数据表 $table 的备份数据"
        fi
    done
    
    log_info "元数据恢复完成"
}

# 恢复Cube数据
restore_cube_data() {
    local backup_extract_dir="$1"
    local cube_backup_path="$HDFS_BACKUP_DIR/cubes/$(basename "$backup_extract_dir")"
    
    log_step "恢复Cube数据..."
    
    if ! hadoop fs -test -d "$cube_backup_path" 2>/dev/null; then
        log_warn "未找到Cube数据备份,跳过Cube数据恢复"
        return 0
    fi
    
    # 获取备份中的Cube表
    local cube_tables
    cube_tables=$(hadoop fs -ls "$cube_backup_path" 2>/dev/null | grep "^d" | awk '{print $8}' | xargs -I {} basename {})
    
    if [ -n "$cube_tables" ]; then
        echo "$cube_tables" | while read -r table; do
            if [ -n "$table" ]; then
                log_info "恢复Cube表: $table"
                
                # 删除现有表
                "$HBASE_HOME/bin/hbase" shell <<< "disable '$table'; drop '$table'" 2>/dev/null || true
                
                # 导入备份数据
                "$HBASE_HOME/bin/hbase" org.apache.hadoop.hbase.mapreduce.Import \
                    "$table" "$cube_backup_path/$table" || {
                    log_warn "恢复Cube表 $table 失败,继续恢复其他表"
                }
            fi
        done
        
        log_info "Cube数据恢复完成"
    else
        log_info "未找到Cube表备份,跳过Cube数据恢复"
    fi
}

# 恢复配置文件
restore_config() {
    local backup_extract_dir="$1"
    local config_dir="$backup_extract_dir/config"
    
    log_step "恢复配置文件..."
    
    if [ ! -d "$config_dir" ]; then
        log_warn "备份中未找到配置目录,跳过配置恢复"
        return 0
    fi
    
    # 恢复Kylin配置
    if [ -d "$config_dir/kylin_conf" ]; then
        log_info "恢复Kylin配置文件..."
        
        # 备份当前配置
        if [ -d "$KYLIN_HOME/conf" ]; then
            mv "$KYLIN_HOME/conf" "$KYLIN_HOME/conf.backup.$(date +%Y%m%d_%H%M%S)"
        fi
        
        # 恢复配置
        cp -r "$config_dir/kylin_conf" "$KYLIN_HOME/conf"
        chown -R kylin:kylin "$KYLIN_HOME/conf" 2>/dev/null || true
        
        log_info "Kylin配置文件恢复完成"
    fi
    
    # 恢复HBase配置(可选)
    if [ -d "$config_dir/hbase_conf" ]; then
        log_info "发现HBase配置备份,请手动检查是否需要恢复"
        log_info "HBase配置备份位置: $config_dir/hbase_conf"
    fi
    
    log_info "配置文件恢复完成"
}

# 启动Kylin服务
start_kylin() {
    log_step "启动Kylin服务..."
    
    if [ -f "$KYLIN_HOME/bin/kylin.sh" ]; then
        "$KYLIN_HOME/bin/kylin.sh" start
        
        # 等待服务启动
        local max_wait=60
        local wait_time=0
        
        while [ $wait_time -lt $max_wait ]; do
            if curl -s "http://localhost:7070/kylin/api/user/authentication" >/dev/null 2>&1; then
                log_info "Kylin服务启动成功"
                return 0
            fi
            
            sleep 5
            wait_time=$((wait_time + 5))
            echo -n "."
        done
        
        echo
        log_error "Kylin服务启动超时,请检查日志"
        return 1
    else
        log_error "未找到Kylin启动脚本"
        return 1
    fi
}

# 验证恢复结果
verify_restore() {
    log_step "验证恢复结果..."
    
    # 检查Kylin服务状态
    if curl -s "http://localhost:7070/kylin/api/user/authentication" >/dev/null 2>&1; then
        log_info "✓ Kylin服务运行正常"
    else
        log_error "✗ Kylin服务异常"
        return 1
    fi
    
    # 检查元数据表
    local metadata_tables=("kylin_metadata" "kylin_dict" "kylin_statistics")
    for table in "${metadata_tables[@]}"; do
        if "$HBASE_HOME/bin/hbase" shell <<< "exists '$table'" 2>/dev/null | grep -q "Table $table does exist"; then
            log_info "✓ 元数据表 $table 存在"
        else
            log_warn "✗ 元数据表 $table 不存在"
        fi
    done
    
    # 检查项目和模型
    local api_response
    api_response=$(curl -s -u ADMIN:KYLIN "http://localhost:7070/kylin/api/projects" 2>/dev/null || echo "{}")
    
    if echo "$api_response" | grep -q '"name"'; then
        local project_count
        project_count=$(echo "$api_response" | grep -o '"name"' | wc -l)
        log_info "✓ 发现 $project_count 个项目"
    else
        log_warn "✗ 未发现项目数据"
    fi
    
    log_info "恢复验证完成"
}

# 清理临时文件
cleanup_temp_files() {
    log_step "清理临时文件..."
    
    if [ -d "$RESTORE_DIR" ]; then
        # 保留当前数据备份,删除其他临时文件
        find "$RESTORE_DIR" -maxdepth 1 -type d -name "kylin_backup_*" -exec rm -rf {} \; 2>/dev/null || true
        
        log_info "临时文件清理完成"
    fi
}

# 交互式恢复
interactive_restore() {
    echo "=== Kylin数据恢复向导 ==="
    echo
    
    # 列出可用备份
    list_backups
    echo
    
    # 选择备份文件
    read -p "请输入要恢复的备份文件路径: " backup_file
    
    if [ -z "$backup_file" ]; then
        log_error "未指定备份文件"
        return 1
    fi
    
    # 确认恢复操作
    echo
    echo "⚠️  警告: 恢复操作将覆盖当前数据!"
    echo "备份文件: $backup_file"
    echo
    read -p "确认继续恢复?(yes/no): " confirm
    
    if [ "$confirm" != "yes" ]; then
        log_info "恢复操作已取消"
        return 0
    fi
    
    # 执行恢复
    perform_restore "$backup_file"
}

# 执行恢复
perform_restore() {
    local backup_file="$1"
    
    if [ -z "$backup_file" ]; then
        log_error "未指定备份文件"
        return 1
    fi
    
    log_info "开始Kylin数据恢复..."
    log_info "备份文件: $backup_file"
    
    local start_time=$(date +%s)
    local extract_dir="$RESTORE_DIR/$(basename "$backup_file" .tar.gz)"
    
    # 检查前置条件
    check_prerequisites || return 1
    
    # 验证备份文件
    validate_backup "$backup_file" || return 1
    
    # 解压备份文件
    extract_backup "$backup_file" "$extract_dir" || return 1
    
    # 停止Kylin服务
    stop_kylin || return 1
    
    # 备份当前数据
    local current_backup
    current_backup=$(backup_current_data)
    
    # 执行恢复
    restore_metadata "$extract_dir" || {
        log_error "元数据恢复失败"
        return 1
    }
    
    restore_cube_data "$extract_dir" || {
        log_warn "Cube数据恢复失败,继续其他恢复"
    }
    
    restore_config "$extract_dir" || {
        log_warn "配置文件恢复失败,继续其他恢复"
    }
    
    # 启动Kylin服务
    start_kylin || {
        log_error "Kylin服务启动失败"
        return 1
    }
    
    # 验证恢复结果
    verify_restore || {
        log_error "恢复验证失败"
        return 1
    }
    
    # 清理临时文件
    cleanup_temp_files
    
    local end_time=$(date +%s)
    local duration=$((end_time - start_time))
    
    log_info "恢复完成!"
    log_info "耗时: ${duration}秒"
    log_info "当前数据备份: $current_backup"
    
    echo
    echo "=== 恢复完成 ==="
    echo "如果发现问题,可以使用以下备份回滚:"
    echo "  $current_backup"
}

# 主函数
main() {
    case "${1:-interactive}" in
        "interactive")
            interactive_restore
            ;;
        "restore")
            if [ -z "$2" ]; then
                echo "用法: $0 restore <backup_file>"
                exit 1
            fi
            perform_restore "$2"
            ;;
        "list")
            list_backups
            ;;
        "verify")
            if [ -z "$2" ]; then
                echo "用法: $0 verify <backup_file>"
                exit 1
            fi
            validate_backup "$2"
            ;;
        *)
            echo "用法: $0 [interactive|restore|list|verify]"
            echo "  interactive: 交互式恢复向导"
            echo "  restore:     恢复指定备份文件"
            echo "  list:        列出可用备份"
            echo "  verify:      验证备份文件"
            exit 1
            ;;
    esac
}

# 脚本入口
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
    main "$@"
fi

8.2.3 日志监控

日志分析工具

#!/usr/bin/env python3
# log_analyzer.py - Kylin日志分析工具

import re
import os
import gzip
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
from collections import defaultdict, Counter
import json
import argparse

class LogAnalyzer:
    """Kylin日志分析器"""
    
    def __init__(self, log_dir: str = "/opt/kylin/logs"):
        self.log_dir = log_dir
        self.patterns = {
            'error': re.compile(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).*ERROR.*?([^\n]+)', re.MULTILINE),
            'warn': re.compile(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).*WARN.*?([^\n]+)', re.MULTILINE),
            'query': re.compile(r'Query (\w+) takes (\d+) ms'),
            'build': re.compile(r'Build job (\w+) (RUNNING|FINISHED|ERROR|DISCARDED)'),
            'gc': re.compile(r'GC\s+\((\w+)\)\s+(\d+)ms'),
            'memory': re.compile(r'Memory usage: (\d+)MB / (\d+)MB')
        }
        
    def analyze_log_file(self, file_path: str, start_time: datetime = None, end_time: datetime = None) -> Dict:
        """分析单个日志文件"""
        results = {
            'errors': [],
            'warnings': [],
            'queries': [],
            'builds': [],
            'gc_events': [],
            'memory_events': [],
            'stats': {
                'total_lines': 0,
                'error_count': 0,
                'warn_count': 0,
                'query_count': 0,
                'avg_query_time': 0,
                'gc_count': 0,
                'total_gc_time': 0
            }
        }
        
        try:
            # 处理压缩文件
            if file_path.endswith('.gz'):
                with gzip.open(file_path, 'rt', encoding='utf-8') as f:
                    content = f.read()
            else:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
            
            lines = content.split('\n')
            results['stats']['total_lines'] = len(lines)
            
            # 分析错误
            for match in self.patterns['error'].finditer(content):
                timestamp_str, message = match.groups()
                timestamp = self._parse_timestamp(timestamp_str)
                
                if self._is_in_time_range(timestamp, start_time, end_time):
                    results['errors'].append({
                        'timestamp': timestamp,
                        'message': message.strip()
                    })
                    results['stats']['error_count'] += 1
            
            # 分析警告
            for match in self.patterns['warn'].finditer(content):
                timestamp_str, message = match.groups()
                timestamp = self._parse_timestamp(timestamp_str)
                
                if self._is_in_time_range(timestamp, start_time, end_time):
                    results['warnings'].append({
                        'timestamp': timestamp,
                        'message': message.strip()
                    })
                    results['stats']['warn_count'] += 1
            
            # 分析查询
            query_times = []
            for match in self.patterns['query'].finditer(content):
                query_id, duration = match.groups()
                duration = int(duration)
                query_times.append(duration)
                
                results['queries'].append({
                    'query_id': query_id,
                    'duration': duration
                })
                results['stats']['query_count'] += 1
            
            if query_times:
                results['stats']['avg_query_time'] = sum(query_times) / len(query_times)
            
            # 分析构建任务
            for match in self.patterns['build'].finditer(content):
                job_id, status = match.groups()
                results['builds'].append({
                    'job_id': job_id,
                    'status': status
                })
            
            # 分析GC事件
            gc_times = []
            for match in self.patterns['gc'].finditer(content):
                gc_type, duration = match.groups()
                duration = int(duration)
                gc_times.append(duration)
                
                results['gc_events'].append({
                    'type': gc_type,
                    'duration': duration
                })
                results['stats']['gc_count'] += 1
            
            if gc_times:
                results['stats']['total_gc_time'] = sum(gc_times)
            
            # 分析内存事件
            for match in self.patterns['memory'].finditer(content):
                used, total = match.groups()
                results['memory_events'].append({
                    'used': int(used),
                    'total': int(total),
                    'usage_percent': (int(used) / int(total)) * 100
                })
            
        except Exception as e:
            print(f"分析文件 {file_path} 时出错: {e}")
        
        return results
    
    def _parse_timestamp(self, timestamp_str: str) -> datetime:
        """解析时间戳"""
        try:
            return datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S,%f')
        except ValueError:
            return datetime.now()
    
    def _is_in_time_range(self, timestamp: datetime, start_time: datetime, end_time: datetime) -> bool:
        """检查时间戳是否在指定范围内"""
        if start_time and timestamp < start_time:
            return False
        if end_time and timestamp > end_time:
            return False
        return True
    
    def analyze_directory(self, start_time: datetime = None, end_time: datetime = None) -> Dict:
        """分析整个日志目录"""
        combined_results = {
            'errors': [],
            'warnings': [],
            'queries': [],
            'builds': [],
            'gc_events': [],
            'memory_events': [],
            'stats': {
                'total_lines': 0,
                'error_count': 0,
                'warn_count': 0,
                'query_count': 0,
                'avg_query_time': 0,
                'gc_count': 0,
                'total_gc_time': 0,
                'files_analyzed': 0
            }
        }
        
        log_files = []
        for file_name in os.listdir(self.log_dir):
            if file_name.endswith('.log') or file_name.endswith('.log.gz'):
                log_files.append(os.path.join(self.log_dir, file_name))
        
        log_files.sort()
        
        total_query_time = 0
        total_queries = 0
        
        for file_path in log_files:
            print(f"分析文件: {file_path}")
            file_results = self.analyze_log_file(file_path, start_time, end_time)
            
            # 合并结果
            combined_results['errors'].extend(file_results['errors'])
            combined_results['warnings'].extend(file_results['warnings'])
            combined_results['queries'].extend(file_results['queries'])
            combined_results['builds'].extend(file_results['builds'])
            combined_results['gc_events'].extend(file_results['gc_events'])
            combined_results['memory_events'].extend(file_results['memory_events'])
            
            # 合并统计
            combined_results['stats']['total_lines'] += file_results['stats']['total_lines']
            combined_results['stats']['error_count'] += file_results['stats']['error_count']
            combined_results['stats']['warn_count'] += file_results['stats']['warn_count']
            combined_results['stats']['query_count'] += file_results['stats']['query_count']
            combined_results['stats']['gc_count'] += file_results['stats']['gc_count']
            combined_results['stats']['total_gc_time'] += file_results['stats']['total_gc_time']
            combined_results['stats']['files_analyzed'] += 1
            
            if file_results['stats']['query_count'] > 0:
                total_query_time += file_results['stats']['avg_query_time'] * file_results['stats']['query_count']
                total_queries += file_results['stats']['query_count']
        
        if total_queries > 0:
            combined_results['stats']['avg_query_time'] = total_query_time / total_queries
        
        return combined_results
    
    def generate_error_summary(self, results: Dict) -> Dict:
        """生成错误摘要"""
        error_patterns = Counter()
        error_timeline = defaultdict(int)
        
        for error in results['errors']:
            # 提取错误模式
            message = error['message']
            # 简化错误消息以识别模式
            pattern = re.sub(r'\d+', 'N', message)  # 替换数字
            pattern = re.sub(r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}', 'UUID', pattern)  # 替换UUID
            error_patterns[pattern] += 1
            
            # 按小时统计错误
            hour_key = error['timestamp'].strftime('%Y-%m-%d %H:00')
            error_timeline[hour_key] += 1
        
        return {
            'top_error_patterns': error_patterns.most_common(10),
            'error_timeline': dict(error_timeline),
            'total_unique_errors': len(error_patterns)
        }
    
    def generate_performance_summary(self, results: Dict) -> Dict:
        """生成性能摘要"""
        query_times = [q['duration'] for q in results['queries']]
        gc_times = [gc['duration'] for gc in results['gc_events']]
        
        performance_summary = {
            'query_performance': {},
            'gc_performance': {},
            'memory_usage': {}
        }
        
        if query_times:
            query_times.sort()
            performance_summary['query_performance'] = {
                'total_queries': len(query_times),
                'avg_time': sum(query_times) / len(query_times),
                'median_time': query_times[len(query_times) // 2],
                'p95_time': query_times[int(len(query_times) * 0.95)],
                'p99_time': query_times[int(len(query_times) * 0.99)],
                'max_time': max(query_times),
                'min_time': min(query_times)
            }
        
        if gc_times:
            performance_summary['gc_performance'] = {
                'total_gc_events': len(gc_times),
                'total_gc_time': sum(gc_times),
                'avg_gc_time': sum(gc_times) / len(gc_times),
                'max_gc_time': max(gc_times)
            }
        
        if results['memory_events']:
            memory_usages = [m['usage_percent'] for m in results['memory_events']]
            performance_summary['memory_usage'] = {
                'avg_usage': sum(memory_usages) / len(memory_usages),
                'max_usage': max(memory_usages),
                'min_usage': min(memory_usages)
            }
        
        return performance_summary
    
    def generate_report(self, results: Dict, output_file: str = None) -> str:
        """生成分析报告"""
        error_summary = self.generate_error_summary(results)
        performance_summary = self.generate_performance_summary(results)
        
        report = f"""
=== Kylin日志分析报告 ===
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
分析文件数: {results['stats']['files_analyzed']}
总日志行数: {results['stats']['total_lines']:,}

📊 基本统计:
  错误数量: {results['stats']['error_count']:,}
  警告数量: {results['stats']['warn_count']:,}
  查询数量: {results['stats']['query_count']:,}
  GC事件数: {results['stats']['gc_count']:,}

🔍 查询性能:"""
        
        if performance_summary['query_performance']:
            qp = performance_summary['query_performance']
            report += f"""
  总查询数: {qp['total_queries']:,}
  平均响应时间: {qp['avg_time']:.2f}ms
  中位数响应时间: {qp['median_time']:.2f}ms
  95%响应时间: {qp['p95_time']:.2f}ms
  99%响应时间: {qp['p99_time']:.2f}ms
  最大响应时间: {qp['max_time']:.2f}ms
  最小响应时间: {qp['min_time']:.2f}ms"""
        
        report += "\n\n🗑️ GC性能:"
        if performance_summary['gc_performance']:
            gp = performance_summary['gc_performance']
            report += f"""
  GC事件总数: {gp['total_gc_events']:,}
  GC总时间: {gp['total_gc_time']:,}ms
  平均GC时间: {gp['avg_gc_time']:.2f}ms
  最大GC时间: {gp['max_gc_time']:.2f}ms"""
        
        report += "\n\n💾 内存使用:"
        if performance_summary['memory_usage']:
            mu = performance_summary['memory_usage']
            report += f"""
  平均内存使用率: {mu['avg_usage']:.1f}%
  最大内存使用率: {mu['max_usage']:.1f}%
  最小内存使用率: {mu['min_usage']:.1f}%"""
        
        report += "\n\n⚠️ 错误分析:"
        if error_summary['top_error_patterns']:
            report += f"""
  错误模式总数: {error_summary['total_unique_errors']}
  
  Top 10 错误模式:"""
            for i, (pattern, count) in enumerate(error_summary['top_error_patterns'], 1):
                report += f"\n  {i}. ({count}次) {pattern[:100]}..."
        
        report += "\n\n📈 错误时间分布:"
        if error_summary['error_timeline']:
            sorted_timeline = sorted(error_summary['error_timeline'].items())
            for time_slot, count in sorted_timeline[-10:]:  # 显示最近10个时间段
                report += f"\n  {time_slot}: {count}个错误"
        
        # 构建任务分析
        if results['builds']:
            build_status = Counter(b['status'] for b in results['builds'])
            report += "\n\n🔨 构建任务状态:"
            for status, count in build_status.items():
                report += f"\n  {status}: {count}"
        
        report += "\n\n=== 报告结束 ==="
        
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report)
            print(f"报告已保存到: {output_file}")
        
        return report

def main():
    parser = argparse.ArgumentParser(description='Kylin日志分析工具')
    parser.add_argument('--log-dir', default='/opt/kylin/logs', help='日志目录路径')
    parser.add_argument('--start-time', help='开始时间 (YYYY-MM-DD HH:MM:SS)')
    parser.add_argument('--end-time', help='结束时间 (YYYY-MM-DD HH:MM:SS)')
    parser.add_argument('--output', help='输出报告文件路径')
    parser.add_argument('--json', action='store_true', help='输出JSON格式结果')
    
    args = parser.parse_args()
    
    analyzer = LogAnalyzer(args.log_dir)
    
    start_time = None
    end_time = None
    
    if args.start_time:
        start_time = datetime.strptime(args.start_time, '%Y-%m-%d %H:%M:%S')
    
    if args.end_time:
        end_time = datetime.strptime(args.end_time, '%Y-%m-%d %H:%M:%S')
    
    print("开始分析日志...")
    results = analyzer.analyze_directory(start_time, end_time)
    
    if args.json:
        # 转换datetime对象为字符串以便JSON序列化
        for error in results['errors']:
            error['timestamp'] = error['timestamp'].isoformat()
        for warning in results['warnings']:
            warning['timestamp'] = warning['timestamp'].isoformat()
        
        output = json.dumps(results, indent=2, ensure_ascii=False)
        if args.output:
            with open(args.output, 'w', encoding='utf-8') as f:
                f.write(output)
        else:
            print(output)
    else:
        report = analyzer.generate_report(results, args.output)
        if not args.output:
            print(report)

if __name__ == "__main__":
    main()

8.3 备份与恢复

8.3.1 数据备份策略

自动化备份脚本

#!/bin/bash
# kylin_backup.sh - Kylin数据备份脚本

set -e

# 配置参数
BACKUP_DIR="/backup/kylin"
KYLIN_HOME="/opt/kylin"
HBASE_HOME="/opt/hbase"
HDFS_BACKUP_DIR="/backup/kylin"
RETENTION_DAYS=30
DATE=$(date +"%Y%m%d_%H%M%S")
BACKUP_NAME="kylin_backup_$DATE"

# 邮件配置
EMAIL_ENABLED=true
EMAIL_TO="admin@company.com"
SMTP_SERVER="smtp.company.com"

# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'

log_info() {
    echo -e "${GREEN}[INFO]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $(date '+%Y-%m-%d %H:%M:%S') $1"
}

# 发送邮件通知
send_notification() {
    local subject="$1"
    local message="$2"
    
    if [ "$EMAIL_ENABLED" = "true" ]; then
        echo "$message" | mail -s "$subject" "$EMAIL_TO"
    fi
}

# 创建备份目录
create_backup_dir() {
    local backup_path="$BACKUP_DIR/$BACKUP_NAME"
    
    log_info "创建备份目录: $backup_path"
    mkdir -p "$backup_path"
    
    echo "$backup_path"
}

# 备份Kylin元数据
backup_metadata() {
    local backup_path="$1"
    local metadata_backup="$backup_path/metadata"
    
    log_info "备份Kylin元数据..."
    
    mkdir -p "$metadata_backup"
    
    # 备份HBase中的元数据表
    local metadata_tables=("kylin_metadata" "kylin_dict" "kylin_statistics")
    
    for table in "${metadata_tables[@]}"; do
        log_info "备份元数据表: $table"
        
        # 使用HBase export工具
        "$HBASE_HOME/bin/hbase" org.apache.hadoop.hbase.mapreduce.Export \
            "$table" "$HDFS_BACKUP_DIR/metadata/$BACKUP_NAME/$table" || {
            log_error "备份元数据表 $table 失败"
            return 1
        }
    done
    
    # 备份本地元数据文件
    if [ -d "$KYLIN_HOME/meta" ]; then
        log_info "备份本地元数据文件..."
        cp -r "$KYLIN_HOME/meta" "$metadata_backup/local_meta"
    fi
    
    log_info "元数据备份完成"
}

# 备份Cube数据
backup_cube_data() {
    local backup_path="$1"
    local cube_backup="$backup_path/cubes"
    
    log_info "备份Cube数据..."
    
    mkdir -p "$cube_backup"
    
    # 获取所有Cube表
    local cube_tables
    cube_tables=$("$HBASE_HOME/bin/hbase" shell <<< "list 'kylin_*'" 2>/dev/null | grep "kylin_" | grep -v "kylin_metadata" | grep -v "kylin_dict" | grep -v "kylin_statistics")
    
    if [ -n "$cube_tables" ]; then
        echo "$cube_tables" | while read -r table; do
            if [ -n "$table" ]; then
                log_info "备份Cube表: $table"
                
                # 使用HBase export工具
                "$HBASE_HOME/bin/hbase" org.apache.hadoop.hbase.mapreduce.Export \
                    "$table" "$HDFS_BACKUP_DIR/cubes/$BACKUP_NAME/$table" || {
                    log_warn "备份Cube表 $table 失败,继续备份其他表"
                }
            fi
        done
    else
        log_info "未找到Cube表,跳过Cube数据备份"
    fi
    
    log_info "Cube数据备份完成"
}

# 备份配置文件
backup_config() {
    local backup_path="$1"
    local config_backup="$backup_path/config"
    
    log_info "备份配置文件..."
    
    mkdir -p "$config_backup"
    
    # 备份Kylin配置
    if [ -d "$KYLIN_HOME/conf" ]; then
        cp -r "$KYLIN_HOME/conf" "$config_backup/kylin_conf"
    fi
    
    # 备份HBase配置
    if [ -d "$HBASE_HOME/conf" ]; then
        cp -r "$HBASE_HOME/conf" "$config_backup/hbase_conf"
    fi
    
    # 备份环境变量
    env | grep -E "(KYLIN|HBASE|HADOOP|SPARK)" > "$config_backup/environment.txt"
    
    log_info "配置文件备份完成"
}

# 备份日志文件
backup_logs() {
    local backup_path="$1"
    local logs_backup="$backup_path/logs"
    
    log_info "备份日志文件..."
    
    mkdir -p "$logs_backup"
    
    # 备份最近7天的日志
    if [ -d "$KYLIN_HOME/logs" ]; then
        find "$KYLIN_HOME/logs" -name "*.log" -mtime -7 -exec cp {} "$logs_backup/" \;
        find "$KYLIN_HOME/logs" -name "*.log.gz" -mtime -7 -exec cp {} "$logs_backup/" \;
    fi
    
    log_info "日志文件备份完成"
}

# 创建备份清单
create_backup_manifest() {
    local backup_path="$1"
    local manifest_file="$backup_path/backup_manifest.txt"
    
    log_info "创建备份清单..."
    
    cat > "$manifest_file" << EOF
=== Kylin备份清单 ===
备份名称: $BACKUP_NAME
备份时间: $(date)
备份路径: $backup_path

=== 备份内容 ===
EOF
    
    # 统计各部分大小
    if [ -d "$backup_path/metadata" ]; then
        metadata_size=$(du -sh "$backup_path/metadata" | cut -f1)
        echo "元数据: $metadata_size" >> "$manifest_file"
    fi
    
    if [ -d "$backup_path/cubes" ]; then
        cubes_size=$(du -sh "$backup_path/cubes" | cut -f1)
        echo "Cube数据: $cubes_size" >> "$manifest_file"
    fi
    
    if [ -d "$backup_path/config" ]; then
        config_size=$(du -sh "$backup_path/config" | cut -f1)
        echo "配置文件: $config_size" >> "$manifest_file"
    fi
    
    if [ -d "$backup_path/logs" ]; then
        logs_size=$(du -sh "$backup_path/logs" | cut -f1)
        echo "日志文件: $logs_size" >> "$manifest_file"
    fi
    
    total_size=$(du -sh "$backup_path" | cut -f1)
    echo "\n总大小: $total_size" >> "$manifest_file"
    
    # 添加文件列表
    echo "\n=== 文件列表 ===" >> "$manifest_file"
    find "$backup_path" -type f | sort >> "$manifest_file"
    
    log_info "备份清单创建完成"
}

# 压缩备份
compress_backup() {
    local backup_path="$1"
    local compressed_file="$backup_path.tar.gz"
    
    log_info "压缩备份文件..."
    
    cd "$(dirname "$backup_path")"
    tar -czf "$compressed_file" "$(basename "$backup_path")"
    
    if [ $? -eq 0 ]; then
        log_info "备份压缩完成: $compressed_file"
        
        # 删除原始备份目录
        rm -rf "$backup_path"
        
        echo "$compressed_file"
    else
        log_error "备份压缩失败"
        return 1
    fi
}

# 清理旧备份
cleanup_old_backups() {
    log_info "清理 $RETENTION_DAYS 天前的备份..."
    
    # 清理本地备份
    find "$BACKUP_DIR" -name "kylin_backup_*.tar.gz" -mtime +$RETENTION_DAYS -delete
    
    # 清理HDFS备份
    hadoop fs -ls "$HDFS_BACKUP_DIR" 2>/dev/null | awk '{print $8}' | while read -r path; do
        if [ -n "$path" ]; then
            # 检查目录修改时间
            mod_time=$(hadoop fs -stat "%Y" "$path" 2>/dev/null)
            if [ -n "$mod_time" ]; then
                current_time=$(date +%s)
                age_days=$(( (current_time - mod_time) / 86400 ))
                
                if [ $age_days -gt $RETENTION_DAYS ]; then
                    log_info "删除旧备份: $path"
                    hadoop fs -rm -r "$path"
                fi
            fi
        fi
    done
    
    log_info "旧备份清理完成"
}

# 验证备份
verify_backup() {
    local backup_file="$1"
    
    log_info "验证备份文件..."
    
    # 检查文件是否存在
    if [ ! -f "$backup_file" ]; then
        log_error "备份文件不存在: $backup_file"
        return 1
    fi
    
    # 检查文件大小
    file_size=$(stat -c%s "$backup_file")
    if [ $file_size -lt 1024 ]; then
        log_error "备份文件过小,可能损坏: $backup_file"
        return 1
    fi
    
    # 测试压缩文件完整性
    if ! tar -tzf "$backup_file" >/dev/null 2>&1; then
        log_error "备份文件损坏: $backup_file"
        return 1
    fi
    
    log_info "备份验证通过"
    return 0
}

# 主备份流程
perform_backup() {
    log_info "开始Kylin数据备份..."
    
    local start_time=$(date +%s)
    
    # 创建备份目录
    local backup_path
    backup_path=$(create_backup_dir)
    
    # 执行各项备份
    backup_metadata "$backup_path" || {
        log_error "元数据备份失败"
        send_notification "Kylin备份失败" "元数据备份失败,请检查系统状态"
        return 1
    }
    
    backup_cube_data "$backup_path" || {
        log_warn "Cube数据备份失败,继续其他备份"
    }
    
    backup_config "$backup_path"
    backup_logs "$backup_path"
    
    # 创建备份清单
    create_backup_manifest "$backup_path"
    
    # 压缩备份
    local compressed_file
    compressed_file=$(compress_backup "$backup_path")
    
    if [ $? -eq 0 ]; then
        # 验证备份
        if verify_backup "$compressed_file"; then
            local end_time=$(date +%s)
            local duration=$((end_time - start_time))
            local file_size=$(du -sh "$compressed_file" | cut -f1)
            
            log_info "备份完成!"
            log_info "备份文件: $compressed_file"
            log_info "文件大小: $file_size"
            log_info "耗时: ${duration}秒"
            
            # 发送成功通知
            send_notification "Kylin备份成功" "备份已完成\n文件: $compressed_file\n大小: $file_size\n耗时: ${duration}秒"
            
            # 清理旧备份
            cleanup_old_backups
        else
            log_error "备份验证失败"
            send_notification "Kylin备份失败" "备份验证失败,请检查备份文件"
            return 1
        fi
    else
        log_error "备份压缩失败"
        send_notification "Kylin备份失败" "备份压缩失败,请检查磁盘空间"
        return 1
    fi
}

# 主函数
main() {
    case "${1:-backup}" in
        "backup")
            perform_backup
            ;;
        "cleanup")
            cleanup_old_backups
            ;;
        "verify")
            if [ -z "$2" ]; then
                echo "用法: $0 verify <backup_file>"
                exit 1
            fi
            verify_backup "$2"
            ;;
        *)
            echo "用法: $0 [backup|cleanup|verify]"
            echo "  backup:  执行完整备份"
            echo "  cleanup: 清理旧备份"
            echo "  verify:  验证备份文件"
            exit 1
            ;;
    esac
}

# 脚本入口
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
    main "$@"
fi

8.1.2 高可用配置

负载均衡配置

# nginx.conf - Kylin负载均衡配置

upstream kylin_cluster {
    # 权重轮询
    server node1:7070 weight=3 max_fails=3 fail_timeout=30s;
    server node2:7070 weight=3 max_fails=3 fail_timeout=30s;
    server node3:7070 weight=2 max_fails=3 fail_timeout=30s;
    
    # 会话保持
    ip_hash;
}

server {
    listen 80;
    server_name kylin.company.com;
    
    # 重定向到HTTPS
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name kylin.company.com;
    
    # SSL配置
    ssl_certificate /etc/ssl/certs/kylin.crt;
    ssl_certificate_key /etc/ssl/private/kylin.key;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-RSA-AES256-GCM-SHA512:DHE-RSA-AES256-GCM-SHA512:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384;
    ssl_prefer_server_ciphers off;
    ssl_session_cache shared:SSL:10m;
    ssl_session_timeout 10m;
    
    # 安全头
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
    add_header X-Frame-Options DENY always;
    add_header X-Content-Type-Options nosniff always;
    add_header X-XSS-Protection "1; mode=block" always;
    
    # 日志配置
    access_log /var/log/nginx/kylin_access.log;
    error_log /var/log/nginx/kylin_error.log;
    
    # 代理配置
    location / {
        proxy_pass http://kylin_cluster;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # 超时配置
        proxy_connect_timeout 30s;
        proxy_send_timeout 300s;
        proxy_read_timeout 300s;
        
        # 缓冲配置
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;
        proxy_busy_buffers_size 8k;
        
        # WebSocket支持
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
    
    # API路径特殊处理
    location /kylin/api/ {
        proxy_pass http://kylin_cluster;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # API超时配置
        proxy_connect_timeout 10s;
        proxy_send_timeout 600s;
        proxy_read_timeout 600s;
    }
    
    # 健康检查
    location /health {
        access_log off;
        return 200 "healthy\n";
        add_header Content-Type text/plain;
    }
    
    # 静态资源缓存
    location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg)$ {
        proxy_pass http://kylin_cluster;
        expires 1y;
        add_header Cache-Control "public, immutable";
    }
}

故障转移脚本

#!/usr/bin/env python3
# failover_manager.py - Kylin故障转移管理器

import time
import requests
import logging
import smtplib
from email.mime.text import MimeText
from email.mime.multipart import MimeMultipart
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime
import subprocess
import json

@dataclass
class KylinNode:
    """Kylin节点信息"""
    hostname: str
    port: int
    role: str  # master, slave, query_only
    status: str  # active, inactive, failed
    last_check: datetime
    fail_count: int

class FailoverManager:
    """故障转移管理器"""
    
    def __init__(self, config_file: str = "failover_config.json"):
        self.logger = logging.getLogger(__name__)
        self.config = self.load_config(config_file)
        self.nodes = self.init_nodes()
        self.current_master = None
        
    def load_config(self, config_file: str) -> Dict:
        """加载配置文件"""
        try:
            with open(config_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            # 默认配置
            return {
                "nodes": [
                    {"hostname": "node1", "port": 7070, "role": "master"},
                    {"hostname": "node2", "port": 7070, "role": "slave"},
                    {"hostname": "node3", "port": 7070, "role": "query_only"}
                ],
                "check_interval": 30,
                "fail_threshold": 3,
                "timeout": 10,
                "email": {
                    "smtp_server": "smtp.company.com",
                    "smtp_port": 587,
                    "username": "alert@company.com",
                    "password": "password",
                    "recipients": ["admin@company.com"]
                },
                "nginx_config": "/etc/nginx/conf.d/kylin.conf"
            }
    
    def init_nodes(self) -> List[KylinNode]:
        """初始化节点列表"""
        nodes = []
        for node_config in self.config["nodes"]:
            node = KylinNode(
                hostname=node_config["hostname"],
                port=node_config["port"],
                role=node_config["role"],
                status="unknown",
                last_check=datetime.now(),
                fail_count=0
            )
            nodes.append(node)
            
            if node.role == "master":
                self.current_master = node
        
        return nodes
    
    def check_node_health(self, node: KylinNode) -> bool:
        """检查节点健康状态"""
        try:
            url = f"http://{node.hostname}:{node.port}/kylin/api/admin/version"
            response = requests.get(url, timeout=self.config["timeout"])
            
            if response.status_code == 200:
                node.status = "active"
                node.fail_count = 0
                return True
            else:
                node.fail_count += 1
                return False
                
        except Exception as e:
            self.logger.warning(f"节点 {node.hostname} 健康检查失败: {e}")
            node.fail_count += 1
            return False
        finally:
            node.last_check = datetime.now()
    
    def check_all_nodes(self):
        """检查所有节点健康状态"""
        self.logger.info("开始健康检查...")
        
        for node in self.nodes:
            is_healthy = self.check_node_health(node)
            
            if not is_healthy:
                if node.fail_count >= self.config["fail_threshold"]:
                    if node.status != "failed":
                        self.logger.error(f"节点 {node.hostname} 故障")
                        node.status = "failed"
                        self.handle_node_failure(node)
                else:
                    node.status = "inactive"
            
            self.logger.info(f"节点 {node.hostname}: {node.status} (失败次数: {node.fail_count})")
    
    def handle_node_failure(self, failed_node: KylinNode):
        """处理节点故障"""
        self.logger.error(f"处理节点故障: {failed_node.hostname}")
        
        # 发送告警
        self.send_alert(f"Kylin节点故障", f"节点 {failed_node.hostname} 已故障")
        
        # 如果是主节点故障,进行故障转移
        if failed_node.role == "master" or failed_node == self.current_master:
            self.perform_failover()
        
        # 从负载均衡中移除故障节点
        self.update_load_balancer()
    
    def perform_failover(self):
        """执行故障转移"""
        self.logger.info("开始执行故障转移...")
        
        # 寻找可用的备用节点
        candidate_nodes = [
            node for node in self.nodes 
            if node.status == "active" and node.role in ["slave", "master"]
        ]
        
        if not candidate_nodes:
            self.logger.error("没有可用的备用节点进行故障转移")
            self.send_alert("故障转移失败", "没有可用的备用节点")
            return
        
        # 选择新的主节点(优先选择slave角色)
        new_master = None
        for node in candidate_nodes:
            if node.role == "slave":
                new_master = node
                break
        
        if not new_master:
            new_master = candidate_nodes[0]
        
        # 更新角色
        if self.current_master:
            self.current_master.role = "failed"
        
        new_master.role = "master"
        self.current_master = new_master
        
        self.logger.info(f"故障转移完成,新主节点: {new_master.hostname}")
        self.send_alert("故障转移完成", f"新主节点: {new_master.hostname}")
        
        # 更新负载均衡配置
        self.update_load_balancer()
    
    def update_load_balancer(self):
        """更新负载均衡配置"""
        try:
            # 生成新的upstream配置
            upstream_config = "upstream kylin_cluster {\n"
            
            for node in self.nodes:
                if node.status == "active":
                    weight = 3 if node.role == "master" else 2 if node.role == "slave" else 1
                    upstream_config += f"    server {node.hostname}:{node.port} weight={weight} max_fails=3 fail_timeout=30s;\n"
            
            upstream_config += "    ip_hash;\n}"
            
            # 这里应该更新nginx配置并重新加载
            # 实际实现中需要根据具体的负载均衡器进行配置
            self.logger.info("负载均衡配置已更新")
            
            # 重新加载nginx配置
            subprocess.run(["nginx", "-s", "reload"], check=True)
            
        except Exception as e:
            self.logger.error(f"更新负载均衡配置失败: {e}")
    
    def send_alert(self, subject: str, message: str):
        """发送告警邮件"""
        try:
            email_config = self.config["email"]
            
            msg = MimeMultipart()
            msg['From'] = email_config["username"]
            msg['To'] = ", ".join(email_config["recipients"])
            msg['Subject'] = f"[Kylin Alert] {subject}"
            
            body = f"""
时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
主题: {subject}
详情: {message}

当前集群状态:
"""
            
            for node in self.nodes:
                body += f"- {node.hostname}: {node.status} ({node.role})\n"
            
            msg.attach(MimeText(body, 'plain'))
            
            server = smtplib.SMTP(email_config["smtp_server"], email_config["smtp_port"])
            server.starttls()
            server.login(email_config["username"], email_config["password"])
            
            text = msg.as_string()
            server.sendmail(email_config["username"], email_config["recipients"], text)
            server.quit()
            
            self.logger.info(f"告警邮件已发送: {subject}")
            
        except Exception as e:
            self.logger.error(f"发送告警邮件失败: {e}")
    
    def recover_node(self, node_hostname: str):
        """恢复节点"""
        node = next((n for n in self.nodes if n.hostname == node_hostname), None)
        if not node:
            self.logger.error(f"未找到节点: {node_hostname}")
            return
        
        # 检查节点是否已恢复
        if self.check_node_health(node):
            self.logger.info(f"节点 {node_hostname} 已恢复")
            
            # 如果原来是主节点,可以选择是否切换回来
            if node.role == "failed" and not self.current_master:
                node.role = "master"
                self.current_master = node
                self.logger.info(f"主节点已恢复: {node_hostname}")
            
            # 更新负载均衡
            self.update_load_balancer()
            
            # 发送恢复通知
            self.send_alert("节点恢复", f"节点 {node_hostname} 已恢复正常")
    
    def get_cluster_status(self) -> Dict:
        """获取集群状态"""
        active_nodes = [n for n in self.nodes if n.status == "active"]
        failed_nodes = [n for n in self.nodes if n.status == "failed"]
        
        return {
            "total_nodes": len(self.nodes),
            "active_nodes": len(active_nodes),
            "failed_nodes": len(failed_nodes),
            "current_master": self.current_master.hostname if self.current_master else None,
            "cluster_health": "healthy" if len(failed_nodes) == 0 else "degraded" if len(active_nodes) > 0 else "critical",
            "nodes": [
                {
                    "hostname": n.hostname,
                    "status": n.status,
                    "role": n.role,
                    "fail_count": n.fail_count,
                    "last_check": n.last_check.isoformat()
                }
                for n in self.nodes
            ]
        }
    
    def run_monitoring(self):
        """运行监控循环"""
        self.logger.info("启动故障转移监控...")
        
        while True:
            try:
                self.check_all_nodes()
                
                # 打印集群状态
                status = self.get_cluster_status()
                self.logger.info(f"集群状态: {status['cluster_health']} ({status['active_nodes']}/{status['total_nodes']} 节点正常)")
                
                time.sleep(self.config["check_interval"])
                
            except KeyboardInterrupt:
                self.logger.info("监控已停止")
                break
            except Exception as e:
                self.logger.error(f"监控循环异常: {e}")
                time.sleep(self.config["check_interval"])

def main():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    manager = FailoverManager()
    
    try:
        manager.run_monitoring()
    except Exception as e:
        logging.error(f"故障转移管理器异常: {e}")

if __name__ == "__main__":
    main()

8.2 监控与告警

8.2.1 系统监控

综合监控系统

#!/usr/bin/env python3
# kylin_monitor.py - Kylin综合监控系统

import time
import psutil
import requests
import logging
import json
import subprocess
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from threading import Thread
import sqlite3
import smtplib
from email.mime.text import MimeText
from email.mime.multipart import MimeMultipart

@dataclass
class SystemMetrics:
    """系统指标"""
    timestamp: datetime
    cpu_usage: float
    memory_usage: float
    disk_usage: float
    network_io: Dict[str, int]
    load_average: List[float]
    
@dataclass
class KylinMetrics:
    """Kylin指标"""
    timestamp: datetime
    active_queries: int
    query_response_time_avg: float
    query_success_rate: float
    cache_hit_rate: float
    cube_build_jobs: int
    failed_jobs: int
    
@dataclass
class HBaseMetrics:
    """HBase指标"""
    timestamp: datetime
    region_count: int
    read_requests: int
    write_requests: int
    compaction_queue_length: int
    memstore_size: int
    
class MetricsCollector:
    """指标收集器"""
    
    def __init__(self, kylin_host: str = "localhost", kylin_port: int = 7070):
        self.kylin_host = kylin_host
        self.kylin_port = kylin_port
        self.logger = logging.getLogger(__name__)
        
    def collect_system_metrics(self) -> SystemMetrics:
        """收集系统指标"""
        try:
            # CPU使用率
            cpu_usage = psutil.cpu_percent(interval=1)
            
            # 内存使用率
            memory = psutil.virtual_memory()
            memory_usage = memory.percent
            
            # 磁盘使用率
            disk = psutil.disk_usage('/')
            disk_usage = (disk.used / disk.total) * 100
            
            # 网络IO
            network = psutil.net_io_counters()
            network_io = {
                "bytes_sent": network.bytes_sent,
                "bytes_recv": network.bytes_recv,
                "packets_sent": network.packets_sent,
                "packets_recv": network.packets_recv
            }
            
            # 系统负载
            load_average = list(psutil.getloadavg())
            
            return SystemMetrics(
                timestamp=datetime.now(),
                cpu_usage=cpu_usage,
                memory_usage=memory_usage,
                disk_usage=disk_usage,
                network_io=network_io,
                load_average=load_average
            )
            
        except Exception as e:
            self.logger.error(f"收集系统指标失败: {e}")
            return None
    
    def collect_kylin_metrics(self) -> KylinMetrics:
        """收集Kylin指标"""
        try:
            base_url = f"http://{self.kylin_host}:{self.kylin_port}/kylin/api"
            
            # 获取查询统计
            query_stats = self._get_query_stats(base_url)
            
            # 获取构建任务统计
            job_stats = self._get_job_stats(base_url)
            
            # 获取缓存统计
            cache_stats = self._get_cache_stats(base_url)
            
            return KylinMetrics(
                timestamp=datetime.now(),
                active_queries=query_stats.get("active_queries", 0),
                query_response_time_avg=query_stats.get("avg_response_time", 0),
                query_success_rate=query_stats.get("success_rate", 0),
                cache_hit_rate=cache_stats.get("hit_rate", 0),
                cube_build_jobs=job_stats.get("running_jobs", 0),
                failed_jobs=job_stats.get("failed_jobs", 0)
            )
            
        except Exception as e:
            self.logger.error(f"收集Kylin指标失败: {e}")
            return None
    
    def _get_query_stats(self, base_url: str) -> Dict:
        """获取查询统计"""
        try:
            # 这里应该调用Kylin的监控API
            # 由于API可能不存在,这里使用模拟数据
            import random
            return {
                "active_queries": random.randint(0, 20),
                "avg_response_time": random.uniform(1000, 5000),
                "success_rate": random.uniform(95, 99.9)
            }
        except:
            return {}
    
    def _get_job_stats(self, base_url: str) -> Dict:
        """获取任务统计"""
        try:
            response = requests.get(f"{base_url}/jobs", timeout=10)
            if response.status_code == 200:
                jobs = response.json()
                running_jobs = len([j for j in jobs if j.get("job_status") == "RUNNING"])
                failed_jobs = len([j for j in jobs if j.get("job_status") == "ERROR"])
                return {
                    "running_jobs": running_jobs,
                    "failed_jobs": failed_jobs
                }
        except:
            pass
        
        # 模拟数据
        import random
        return {
            "running_jobs": random.randint(0, 5),
            "failed_jobs": random.randint(0, 2)
        }
    
    def _get_cache_stats(self, base_url: str) -> Dict:
        """获取缓存统计"""
        try:
            # 模拟缓存统计
            import random
            return {
                "hit_rate": random.uniform(70, 95)
            }
        except:
            return {}
    
    def collect_hbase_metrics(self) -> HBaseMetrics:
        """收集HBase指标"""
        try:
            # 通过HBase shell或JMX获取指标
            # 这里使用模拟数据
            import random
            
            return HBaseMetrics(
                timestamp=datetime.now(),
                region_count=random.randint(100, 500),
                read_requests=random.randint(1000, 10000),
                write_requests=random.randint(100, 1000),
                compaction_queue_length=random.randint(0, 10),
                memstore_size=random.randint(100, 1000)
            )
            
        except Exception as e:
            self.logger.error(f"收集HBase指标失败: {e}")
            return None

class MetricsStorage:
    """指标存储"""
    
    def __init__(self, db_path: str = "kylin_metrics.db"):
        self.db_path = db_path
        self.init_database()
    
    def init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 系统指标表
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS system_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                cpu_usage REAL,
                memory_usage REAL,
                disk_usage REAL,
                network_io TEXT,
                load_average TEXT
            )
        """)
        
        # Kylin指标表
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS kylin_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                active_queries INTEGER,
                query_response_time_avg REAL,
                query_success_rate REAL,
                cache_hit_rate REAL,
                cube_build_jobs INTEGER,
                failed_jobs INTEGER
            )
        """)
        
        # HBase指标表
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS hbase_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                region_count INTEGER,
                read_requests INTEGER,
                write_requests INTEGER,
                compaction_queue_length INTEGER,
                memstore_size INTEGER
            )
        """)
        
        conn.commit()
        conn.close()
    
    def store_system_metrics(self, metrics: SystemMetrics):
        """存储系统指标"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            INSERT INTO system_metrics 
            (timestamp, cpu_usage, memory_usage, disk_usage, network_io, load_average)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            metrics.timestamp.isoformat(),
            metrics.cpu_usage,
            metrics.memory_usage,
            metrics.disk_usage,
            json.dumps(metrics.network_io),
            json.dumps(metrics.load_average)
        ))
        
        conn.commit()
        conn.close()
    
    def store_kylin_metrics(self, metrics: KylinMetrics):
        """存储Kylin指标"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            INSERT INTO kylin_metrics 
            (timestamp, active_queries, query_response_time_avg, query_success_rate, 
             cache_hit_rate, cube_build_jobs, failed_jobs)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            metrics.timestamp.isoformat(),
            metrics.active_queries,
            metrics.query_response_time_avg,
            metrics.query_success_rate,
            metrics.cache_hit_rate,
            metrics.cube_build_jobs,
            metrics.failed_jobs
        ))
        
        conn.commit()
        conn.close()
    
    def store_hbase_metrics(self, metrics: HBaseMetrics):
        """存储HBase指标"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            INSERT INTO hbase_metrics 
            (timestamp, region_count, read_requests, write_requests, 
             compaction_queue_length, memstore_size)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            metrics.timestamp.isoformat(),
            metrics.region_count,
            metrics.read_requests,
            metrics.write_requests,
            metrics.compaction_queue_length,
            metrics.memstore_size
        ))
        
        conn.commit()
        conn.close()
    
    def cleanup_old_data(self, days: int = 30):
        """清理旧数据"""
        cutoff_date = datetime.now() - timedelta(days=days)
        cutoff_str = cutoff_date.isoformat()
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        tables = ["system_metrics", "kylin_metrics", "hbase_metrics"]
        for table in tables:
            cursor.execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff_str,))
        
        conn.commit()
        conn.close()

class AlertManager:
    """告警管理器"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.alert_history = []
    
    def check_alerts(self, system_metrics: SystemMetrics, 
                    kylin_metrics: KylinMetrics, 
                    hbase_metrics: HBaseMetrics):
        """检查告警条件"""
        alerts = []
        
        # 系统告警
        if system_metrics:
            if system_metrics.cpu_usage > self.config.get("cpu_threshold", 80):
                alerts.append({
                    "type": "system",
                    "level": "warning",
                    "message": f"CPU使用率过高: {system_metrics.cpu_usage:.1f}%"
                })
            
            if system_metrics.memory_usage > self.config.get("memory_threshold", 85):
                alerts.append({
                    "type": "system",
                    "level": "warning",
                    "message": f"内存使用率过高: {system_metrics.memory_usage:.1f}%"
                })
            
            if system_metrics.disk_usage > self.config.get("disk_threshold", 90):
                alerts.append({
                    "type": "system",
                    "level": "critical",
                    "message": f"磁盘使用率过高: {system_metrics.disk_usage:.1f}%"
                })
        
        # Kylin告警
        if kylin_metrics:
            if kylin_metrics.query_success_rate < self.config.get("success_rate_threshold", 95):
                alerts.append({
                    "type": "kylin",
                    "level": "critical",
                    "message": f"查询成功率过低: {kylin_metrics.query_success_rate:.1f}%"
                })
            
            if kylin_metrics.query_response_time_avg > self.config.get("response_time_threshold", 30000):
                alerts.append({
                    "type": "kylin",
                    "level": "warning",
                    "message": f"查询响应时间过长: {kylin_metrics.query_response_time_avg:.0f}ms"
                })
            
            if kylin_metrics.failed_jobs > self.config.get("failed_jobs_threshold", 3):
                alerts.append({
                    "type": "kylin",
                    "level": "critical",
                    "message": f"失败任务过多: {kylin_metrics.failed_jobs}"
                })
        
        # HBase告警
        if hbase_metrics:
            if hbase_metrics.compaction_queue_length > self.config.get("compaction_threshold", 20):
                alerts.append({
                    "type": "hbase",
                    "level": "warning",
                    "message": f"HBase压缩队列过长: {hbase_metrics.compaction_queue_length}"
                })
        
        # 发送告警
        for alert in alerts:
            self.send_alert(alert)
    
    def send_alert(self, alert: Dict):
        """发送告警"""
        # 检查是否重复告警
        if self._is_duplicate_alert(alert):
            return
        
        self.logger.warning(f"告警: {alert['message']}")
        
        # 记录告警历史
        alert["timestamp"] = datetime.now().isoformat()
        self.alert_history.append(alert)
        
        # 发送邮件告警
        if self.config.get("email_enabled", False):
            self._send_email_alert(alert)
        
        # 发送钉钉告警
        if self.config.get("dingtalk_enabled", False):
            self._send_dingtalk_alert(alert)
    
    def _is_duplicate_alert(self, alert: Dict) -> bool:
        """检查是否重复告警"""
        # 检查最近5分钟内是否有相同告警
        cutoff_time = datetime.now() - timedelta(minutes=5)
        
        for hist_alert in self.alert_history:
            if (hist_alert["message"] == alert["message"] and 
                datetime.fromisoformat(hist_alert["timestamp"]) > cutoff_time):
                return True
        
        return False
    
    def _send_email_alert(self, alert: Dict):
        """发送邮件告警"""
        try:
            email_config = self.config["email"]
            
            msg = MimeMultipart()
            msg['From'] = email_config["from"]
            msg['To'] = ", ".join(email_config["to"])
            msg['Subject'] = f"[Kylin Alert] {alert['level'].upper()}: {alert['type']}"
            
            body = f"""
告警时间: {alert['timestamp']}
告警级别: {alert['level'].upper()}
告警类型: {alert['type']}
告警内容: {alert['message']}

请及时处理!
"""
            
            msg.attach(MimeText(body, 'plain'))
            
            server = smtplib.SMTP(email_config["smtp_server"], email_config["smtp_port"])
            server.starttls()
            server.login(email_config["username"], email_config["password"])
            
            text = msg.as_string()
            server.sendmail(email_config["from"], email_config["to"], text)
            server.quit()
            
        except Exception as e:
            self.logger.error(f"发送邮件告警失败: {e}")
    
    def _send_dingtalk_alert(self, alert: Dict):
        """发送钉钉告警"""
        try:
            webhook_url = self.config["dingtalk"]["webhook_url"]
            
            message = {
                "msgtype": "text",
                "text": {
                    "content": f"Kylin告警\n级别: {alert['level'].upper()}\n类型: {alert['type']}\n内容: {alert['message']}\n时间: {alert['timestamp']}"
                }
            }
            
            response = requests.post(webhook_url, json=message, timeout=10)
            if response.status_code == 200:
                self.logger.info("钉钉告警发送成功")
            else:
                self.logger.error(f"钉钉告警发送失败: {response.text}")
                
        except Exception as e:
            self.logger.error(f"发送钉钉告警失败: {e}")

class KylinMonitor:
    """Kylin监控主类"""
    
    def __init__(self, config_file: str = "monitor_config.json"):
        self.config = self.load_config(config_file)
        self.collector = MetricsCollector(
            self.config.get("kylin_host", "localhost"),
            self.config.get("kylin_port", 7070)
        )
        self.storage = MetricsStorage(self.config.get("db_path", "kylin_metrics.db"))
        self.alert_manager = AlertManager(self.config.get("alerts", {}))
        self.logger = logging.getLogger(__name__)
        self.running = False
    
    def load_config(self, config_file: str) -> Dict:
        """加载配置文件"""
        try:
            with open(config_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            # 默认配置
            return {
                "kylin_host": "localhost",
                "kylin_port": 7070,
                "collect_interval": 60,
                "db_path": "kylin_metrics.db",
                "alerts": {
                    "cpu_threshold": 80,
                    "memory_threshold": 85,
                    "disk_threshold": 90,
                    "success_rate_threshold": 95,
                    "response_time_threshold": 30000,
                    "failed_jobs_threshold": 3,
                    "compaction_threshold": 20,
                    "email_enabled": False,
                    "dingtalk_enabled": False
                }
            }
    
    def collect_and_store_metrics(self):
        """收集并存储指标"""
        try:
            # 收集指标
            system_metrics = self.collector.collect_system_metrics()
            kylin_metrics = self.collector.collect_kylin_metrics()
            hbase_metrics = self.collector.collect_hbase_metrics()
            
            # 存储指标
            if system_metrics:
                self.storage.store_system_metrics(system_metrics)
            if kylin_metrics:
                self.storage.store_kylin_metrics(kylin_metrics)
            if hbase_metrics:
                self.storage.store_hbase_metrics(hbase_metrics)
            
            # 检查告警
            self.alert_manager.check_alerts(system_metrics, kylin_metrics, hbase_metrics)
            
            # 打印当前状态
            self.print_current_status(system_metrics, kylin_metrics, hbase_metrics)
            
        except Exception as e:
            self.logger.error(f"收集指标失败: {e}")
    
    def print_current_status(self, system_metrics: SystemMetrics, 
                           kylin_metrics: KylinMetrics, 
                           hbase_metrics: HBaseMetrics):
        """打印当前状态"""
        print(f"\n=== Kylin监控状态 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===")
        
        if system_metrics:
            print(f"📊 系统指标:")
            print(f"  CPU使用率: {system_metrics.cpu_usage:.1f}%")
            print(f"  内存使用率: {system_metrics.memory_usage:.1f}%")
            print(f"  磁盘使用率: {system_metrics.disk_usage:.1f}%")
            print(f"  系统负载: {system_metrics.load_average}")
        
        if kylin_metrics:
            print(f"🔍 Kylin指标:")
            print(f"  活跃查询数: {kylin_metrics.active_queries}")
            print(f"  平均响应时间: {kylin_metrics.query_response_time_avg:.0f}ms")
            print(f"  查询成功率: {kylin_metrics.query_success_rate:.1f}%")
            print(f"  缓存命中率: {kylin_metrics.cache_hit_rate:.1f}%")
            print(f"  构建任务数: {kylin_metrics.cube_build_jobs}")
            print(f"  失败任务数: {kylin_metrics.failed_jobs}")
        
        if hbase_metrics:
            print(f"🗄️ HBase指标:")
            print(f"  Region数量: {hbase_metrics.region_count}")
            print(f"  读请求数: {hbase_metrics.read_requests}")
            print(f"  写请求数: {hbase_metrics.write_requests}")
            print(f"  压缩队列长度: {hbase_metrics.compaction_queue_length}")
            print(f"  MemStore大小: {hbase_metrics.memstore_size}MB")
    
    def start_monitoring(self):
        """启动监控"""
        self.logger.info("启动Kylin监控系统...")
        self.running = True
        
        while self.running:
            try:
                self.collect_and_store_metrics()
                
                # 清理旧数据
                if datetime.now().hour == 2:  # 每天凌晨2点清理
                    self.storage.cleanup_old_data()
                
                time.sleep(self.config.get("collect_interval", 60))
                
            except KeyboardInterrupt:
                self.logger.info("监控已停止")
                self.running = False
            except Exception as e:
                self.logger.error(f"监控循环异常: {e}")
                time.sleep(self.config.get("collect_interval", 60))
    
    def stop_monitoring(self):
        """停止监控"""
        self.running = False
        self.logger.info("监控系统已停止")
    
    def generate_report(self, hours: int = 24) -> str:
        """生成监控报告"""
        end_time = datetime.now()
        start_time = end_time - timedelta(hours=hours)
        
        conn = sqlite3.connect(self.storage.db_path)
        cursor = conn.cursor()
        
        report = f"\n=== Kylin监控报告 ({start_time.strftime('%Y-%m-%d %H:%M')} - {end_time.strftime('%Y-%m-%d %H:%M')}) ===\n\n"
        
        # 系统指标统计
        cursor.execute("""
            SELECT AVG(cpu_usage), AVG(memory_usage), AVG(disk_usage), MAX(cpu_usage), MAX(memory_usage)
            FROM system_metrics 
            WHERE timestamp >= ?
        """, (start_time.isoformat(),))
        
        sys_stats = cursor.fetchone()
        if sys_stats and sys_stats[0]:
            report += f"📊 系统指标统计:\n"
            report += f"  平均CPU使用率: {sys_stats[0]:.1f}% (峰值: {sys_stats[3]:.1f}%)\n"
            report += f"  平均内存使用率: {sys_stats[1]:.1f}% (峰值: {sys_stats[4]:.1f}%)\n"
            report += f"  平均磁盘使用率: {sys_stats[2]:.1f}%\n\n"
        
        # Kylin指标统计
        cursor.execute("""
            SELECT AVG(query_response_time_avg), AVG(query_success_rate), AVG(cache_hit_rate), 
                   SUM(failed_jobs), AVG(active_queries)
            FROM kylin_metrics 
            WHERE timestamp >= ?
        """, (start_time.isoformat(),))
        
        kylin_stats = cursor.fetchone()
        if kylin_stats and kylin_stats[0]:
            report += f"🔍 Kylin指标统计:\n"
            report += f"  平均响应时间: {kylin_stats[0]:.0f}ms\n"
            report += f"  平均成功率: {kylin_stats[1]:.1f}%\n"
            report += f"  平均缓存命中率: {kylin_stats[2]:.1f}%\n"
            report += f"  总失败任务数: {kylin_stats[3] or 0}\n"
            report += f"  平均活跃查询数: {kylin_stats[4]:.1f}\n\n"
        
        # 告警统计
        alert_count = len([a for a in self.alert_manager.alert_history 
                          if datetime.fromisoformat(a["timestamp"]) >= start_time])
        report += f"⚠️ 告警统计:\n"
        report += f"  告警总数: {alert_count}\n"
        
        conn.close()
        return report

def main():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('kylin_monitor.log'),
            logging.StreamHandler()
        ]
    )
    
    monitor = KylinMonitor()
    
    try:
        monitor.start_monitoring()
    except Exception as e:
        logging.error(f"监控系统异常: {e}")
    finally:
        monitor.stop_monitoring()

if __name__ == "__main__":
    main()

8.2.2 性能监控

JVM监控脚本

#!/bin/bash
# jvm_monitor.sh - JVM性能监控脚本

set -e

# 配置参数
KYLIN_PID=$(pgrep -f "KylinLauncher" | head -1)
MONITOR_INTERVAL=30
LOG_DIR="/var/log/kylin/monitoring"
DATE=$(date +"%Y%m%d")
TIMESTAMP=$(date +"%Y-%m-%d %H:%M:%S")

# 创建日志目录
mkdir -p "$LOG_DIR"

# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'

log_info() {
    echo -e "${GREEN}[INFO]${NC} $1"
}

log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

# 检查Kylin进程
check_kylin_process() {
    if [ -z "$KYLIN_PID" ]; then
        log_error "未找到Kylin进程"
        exit 1
    fi
    
    log_info "找到Kylin进程: PID=$KYLIN_PID"
}

# 收集JVM内存信息
collect_jvm_memory() {
    local output_file="$LOG_DIR/jvm_memory_$DATE.log"
    
    echo "[$TIMESTAMP] JVM内存信息:" >> "$output_file"
    
    # 堆内存使用情况
    jstat -gc "$KYLIN_PID" | tail -1 | awk '{
        s0c=$1; s1c=$2; s0u=$3; s1u=$4; ec=$5; eu=$6; oc=$7; ou=$8; mc=$9; mu=$10; ccsc=$11; ccsu=$12; ygc=$13; ygct=$14; fgc=$15; fgct=$16; gct=$17
        total_heap = (s0c + s1c + ec + oc) / 1024
        used_heap = (s0u + s1u + eu + ou) / 1024
        heap_usage = (used_heap / total_heap) * 100
        
        printf "  堆内存总量: %.2f MB\n", total_heap
        printf "  堆内存使用: %.2f MB (%.1f%%)\n", used_heap, heap_usage
        printf "  年轻代使用: %.2f MB / %.2f MB\n", (s0u + s1u + eu)/1024, (s0c + s1c + ec)/1024
        printf "  老年代使用: %.2f MB / %.2f MB\n", ou/1024, oc/1024
        printf "  元空间使用: %.2f MB / %.2f MB\n", mu/1024, mc/1024
        printf "  YGC次数: %d (%.3fs)\n", ygc, ygct
        printf "  FGC次数: %d (%.3fs)\n", fgc, fgct
        printf "  总GC时间: %.3fs\n", gct
    }' >> "$output_file"
    
    echo "" >> "$output_file"
}

# 收集JVM线程信息
collect_jvm_threads() {
    local output_file="$LOG_DIR/jvm_threads_$DATE.log"
    
    echo "[$TIMESTAMP] JVM线程信息:" >> "$output_file"
    
    # 线程统计
    jstack "$KYLIN_PID" | grep -E "^\".*\" #" | wc -l | awk '{
        printf "  总线程数: %d\n", $1
    }' >> "$output_file"
    
    # 线程状态统计
    jstack "$KYLIN_PID" | grep "java.lang.Thread.State:" | sort | uniq -c | awk '{
        printf "  %s: %d\n", $2, $1
    }' >> "$output_file"
    
    echo "" >> "$output_file"
}

# 收集系统资源信息
collect_system_resources() {
    local output_file="$LOG_DIR/system_resources_$DATE.log"
    
    echo "[$TIMESTAMP] 系统资源信息:" >> "$output_file"
    
    # CPU使用率
    cpu_usage=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1)
    echo "  CPU使用率: ${cpu_usage}%" >> "$output_file"
    
    # 内存使用情况
    free -m | awk 'NR==2{printf "  内存使用: %d MB / %d MB (%.1f%%)\n", $3, $2, $3*100/$2}' >> "$output_file"
    
    # 磁盘使用情况
    df -h | grep -E "^/dev/" | awk '{printf "  磁盘 %s: %s / %s (%s)\n", $1, $3, $2, $5}' >> "$output_file"
    
    # 网络连接数
    netstat_count=$(netstat -an | grep ESTABLISHED | wc -l)
    echo "  网络连接数: $netstat_count" >> "$output_file"
    
    # 进程特定资源使用
    ps -p "$KYLIN_PID" -o pid,ppid,pcpu,pmem,vsz,rss,tty,stat,start,time,cmd | tail -1 | awk '{
        printf "  Kylin进程 CPU: %.1f%%, 内存: %.1f%%, VSZ: %d KB, RSS: %d KB\n", $3, $4, $5, $6
    }' >> "$output_file"
    
    echo "" >> "$output_file"
}

# 收集Kylin特定指标
collect_kylin_metrics() {
    local output_file="$LOG_DIR/kylin_metrics_$DATE.log"
    
    echo "[$TIMESTAMP] Kylin指标:" >> "$output_file"
    
    # 通过JMX获取指标(需要启用JMX)
    if command -v jmxterm >/dev/null 2>&1; then
        # 查询缓存命中率
        echo "get -b org.apache.kylin:type=QueryCache hitRate" | jmxterm -l localhost:9999 -n 2>/dev/null | grep "hitRate" | awk -F'=' '{printf "  缓存命中率: %.2f%%\n", $2*100}' >> "$output_file" 2>/dev/null || echo "  缓存命中率: 无法获取" >> "$output_file"
        
        # 查询活跃查询数
        echo "get -b org.apache.kylin:type=QueryMetrics activeQueryCount" | jmxterm -l localhost:9999 -n 2>/dev/null | grep "activeQueryCount" | awk -F'=' '{printf "  活跃查询数: %d\n", $2}' >> "$output_file" 2>/dev/null || echo "  活跃查询数: 无法获取" >> "$output_file"
    else
        echo "  JMX工具未安装,无法获取详细指标" >> "$output_file"
    fi
    
    # 检查日志中的错误
    error_count=$(tail -1000 /opt/kylin/logs/kylin.log | grep -c "ERROR" || echo 0)
    warn_count=$(tail -1000 /opt/kylin/logs/kylin.log | grep -c "WARN" || echo 0)
    
    echo "  最近1000行日志中错误数: $error_count" >> "$output_file"
    echo "  最近1000行日志中警告数: $warn_count" >> "$output_file"
    
    echo "" >> "$output_file"
}

# 生成性能报告
generate_performance_report() {
    local report_file="$LOG_DIR/performance_report_$DATE.html"
    
    cat > "$report_file" << 'EOF'
<!DOCTYPE html>
<html>
<head>
    <title>Kylin性能监控报告</title>
    <meta charset="UTF-8">
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .header { background-color: #f0f0f0; padding: 10px; border-radius: 5px; }
        .section { margin: 20px 0; padding: 15px; border: 1px solid #ddd; border-radius: 5px; }
        .metric { margin: 5px 0; }
        .warning { color: #ff6600; }
        .error { color: #ff0000; }
        .good { color: #00aa00; }
        table { border-collapse: collapse; width: 100%; }
        th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
        th { background-color: #f2f2f2; }
    </style>
</head>
<body>
    <div class="header">
        <h1>Kylin性能监控报告</h1>
        <p>生成时间: $(date)</p>
        <p>监控周期: 最近24小时</p>
    </div>
EOF

    # 添加JVM内存趋势
    echo "    <div class='section'>" >> "$report_file"
    echo "        <h2>JVM内存使用趋势</h2>" >> "$report_file"
    echo "        <table>" >> "$report_file"
    echo "            <tr><th>时间</th><th>堆内存使用率</th><th>GC次数</th><th>GC时间</th></tr>" >> "$report_file"
    
    # 从日志文件中提取最近的数据
    tail -20 "$LOG_DIR/jvm_memory_$DATE.log" | grep -A 7 "JVM内存信息" | while read line; do
        if [[ $line =~ \[([^\]]+)\] ]]; then
            timestamp="${BASH_REMATCH[1]}"
        elif [[ $line =~ 堆内存使用:.*\(([0-9.]+)%\) ]]; then
            heap_usage="${BASH_REMATCH[1]}"
        elif [[ $line =~ YGC次数:.*([0-9]+) ]]; then
            ygc_count="${BASH_REMATCH[1]}"
        elif [[ $line =~ 总GC时间:.*([0-9.]+)s ]]; then
            gc_time="${BASH_REMATCH[1]}"
            echo "            <tr><td>$timestamp</td><td>$heap_usage%</td><td>$ygc_count</td><td>${gc_time}s</td></tr>" >> "$report_file"
        fi
    done
    
    echo "        </table>" >> "$report_file"
    echo "    </div>" >> "$report_file"
    
    # 添加系统资源使用情况
    echo "    <div class='section'>" >> "$report_file"
    echo "        <h2>系统资源使用情况</h2>" >> "$report_file"
    
    # 获取最新的系统资源信息
    if [ -f "$LOG_DIR/system_resources_$DATE.log" ]; then
        echo "        <pre>" >> "$report_file"
        tail -10 "$LOG_DIR/system_resources_$DATE.log" >> "$report_file"
        echo "        </pre>" >> "$report_file"
    fi
    
    echo "    </div>" >> "$report_file"
    
    # 结束HTML
    echo "</body></html>" >> "$report_file"
    
    log_info "性能报告已生成: $report_file"
}

# 检查告警条件
check_alerts() {
    local latest_memory_log="$LOG_DIR/jvm_memory_$DATE.log"
    local latest_system_log="$LOG_DIR/system_resources_$DATE.log"
    
    if [ -f "$latest_memory_log" ]; then
        # 检查堆内存使用率
        heap_usage=$(tail -10 "$latest_memory_log" | grep "堆内存使用" | tail -1 | grep -o "([0-9.]*%)" | tr -d "()%")
        if [ -n "$heap_usage" ] && (( $(echo "$heap_usage > 85" | bc -l) )); then
            log_warn "堆内存使用率过高: ${heap_usage}%"
        fi
        
        # 检查GC频率
        fgc_count=$(tail -10 "$latest_memory_log" | grep "FGC次数" | tail -1 | grep -o "[0-9]*" | head -1)
        if [ -n "$fgc_count" ] && [ "$fgc_count" -gt 10 ]; then
            log_warn "Full GC次数过多: $fgc_count"
        fi
    fi
    
    if [ -f "$latest_system_log" ]; then
        # 检查CPU使用率
        cpu_usage=$(tail -10 "$latest_system_log" | grep "CPU使用率" | tail -1 | grep -o "[0-9.]*" | head -1)
        if [ -n "$cpu_usage" ] && (( $(echo "$cpu_usage > 80" | bc -l) )); then
            log_warn "CPU使用率过高: ${cpu_usage}%"
        fi
    fi
}

# 主监控循环
monitor_loop() {
    log_info "开始JVM性能监控..."
    
    while true; do
        TIMESTAMP=$(date +"%Y-%m-%d %H:%M:%S")
        
        log_info "收集监控数据: $TIMESTAMP"
        
        collect_jvm_memory
        collect_jvm_threads
        collect_system_resources
        collect_kylin_metrics
        
        check_alerts
        
        # 每小时生成一次报告
        if [ "$(date +%M)" = "00" ]; then
            generate_performance_report
        fi
        
        sleep "$MONITOR_INTERVAL"
    done
}

# 清理旧日志
cleanup_old_logs() {
    log_info "清理7天前的监控日志..."
    find "$LOG_DIR" -name "*.log" -mtime +7 -delete
    find "$LOG_DIR" -name "*.html" -mtime +7 -delete
}

# 主函数
main() {
    case "${1:-monitor}" in
        "monitor")
            check_kylin_process
            cleanup_old_logs
            monitor_loop
            ;;
        "report")
            generate_performance_report
            ;;
        "check")
            check_kylin_process
            collect_jvm_memory
            collect_system_resources
            check_alerts
            ;;
        *)
            echo "用法: $0 [monitor|report|check]"
            echo "  monitor: 启动持续监控"
            echo "  report:  生成性能报告"
            echo "  check:   执行一次检查"
            exit 1
            ;;
    esac
}

# 脚本入口
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
    main "$@"
fi