9.1 Overview

As an enterprise-grade OLAP engine, Apache Kylin needs to integrate with a wide range of surrounding systems, including BI tools, data lakes, and real-time computing platforms. This chapter walks through Kylin's system integration options and API development in practice.

9.1.1 Integration Architecture

flowchart TB
    subgraph "数据源层"
        A[关系型数据库]
        B[数据湖]
        C[实时数据流]
        D[文件系统]
    end
    
    subgraph "Kylin集群"
        E[Kylin Server]
        F[HBase]
        G[Spark]
        H[Kafka]
    end
    
    subgraph "API层"
        I[REST API]
        J[JDBC Driver]
        K[ODBC Driver]
        L[Python SDK]
    end
    
    subgraph "应用层"
        M[BI工具]
        N[自定义应用]
        O[数据可视化]
        P[报表系统]
    end
    
    A --> E
    B --> E
    C --> H
    D --> E
    
    E --> F
    E --> G
    H --> E
    
    E --> I
    E --> J
    E --> K
    E --> L
    
    I --> N
    J --> M
    K --> M
    L --> O
    
    M --> P
    N --> P
    O --> P
    
    style E fill:#ff9999
    style I fill:#99ccff
    style M fill:#99ff99

9.1.2 Integration Methods

Integration Method | Typical Scenario | Strengths | Weaknesses
REST API | Web applications, microservices | Standardized, easy to use | Relatively low performance
JDBC | Java applications, BI tools | Good performance, broad compatibility | Java only
ODBC | Multi-language applications | Cross-platform, standardized | Complex configuration
Python SDK | Data science, automation | Easy to use, feature-rich | Python only
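Of the four options, the REST API is the quickest to try: a query needs nothing beyond an HTTP client. The sketch below is a minimal example using Python's requests library; the endpoint path and default ADMIN/KYLIN credentials follow the Kylin documentation, while the host, port, and project name are placeholders to adapt.

import requests

# Minimal REST query against Kylin's query endpoint. Assumes a server at
# localhost:7070 with the default ADMIN/KYLIN credentials and a project
# named "learn_kylin" (both placeholders).
resp = requests.post(
    "http://localhost:7070/kylin/api/query",
    json={"sql": "SELECT 1", "project": "learn_kylin", "limit": 10},
    auth=("ADMIN", "KYLIN"),
    headers={"Content-Type": "application/json"},
    timeout=30,
)
resp.raise_for_status()
print(resp.json())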

9.2 REST API Integration

9.2.1 API Overview

Kylin provides a complete REST API covering all of its core functionality:

  • Authentication and authorization: user login, permission management
  • Project management: project creation, configuration, deletion
  • Data source management: data source connections, table synchronization
  • Model management: data model creation and modification
  • Cube management: cube building, querying, administration
  • Query service: SQL execution, result retrieval
  • System administration: monitoring, configuration, logs

9.2.2 API Authentication

Basic Authentication

#!/usr/bin/env python3
# kylin_api_client.py - Kylin REST API client

import requests
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
import logging

class KylinAPIClient:
    """Kylin REST API client"""
    
    def __init__(self, host: str = "localhost", port: int = 7070, 
                 username: str = "ADMIN", password: str = "KYLIN",
                 use_ssl: bool = False):
        """
        Initialize the Kylin API client
        
        Args:
            host: Kylin server host
            port: Kylin server port
            username: user name
            password: password
            use_ssl: whether to use HTTPS
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        
        protocol = "https" if use_ssl else "http"
        self.base_url = f"{protocol}://{host}:{port}/kylin/api"
        
        # Basic-auth credentials
        self.auth = (username, password)
        
        # HTTP session shared by all requests
        self.session = requests.Session()
        self.session.auth = self.auth
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })
        
        # Logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
        # Log in to establish the authenticated session
        self._login()
    
    def _login(self) -> bool:
        """Log in and validate the credentials"""
        try:
            response = self.session.post(f"{self.base_url}/user/authentication")
            if response.status_code == 200:
                self.logger.info("Login successful")
                return True
            else:
                self.logger.error(f"Login failed: {response.status_code}")
                return False
        except Exception as e:
            self.logger.error(f"Login error: {e}")
            return False
    
    def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """Send an HTTP request"""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        try:
            response = self.session.request(method, url, **kwargs)
            
            # Log the request
            self.logger.debug(f"{method} {url} - {response.status_code}")
            
            return response
            
        except Exception as e:
            self.logger.error(f"Request error: {e}")
            raise
    
    def get(self, endpoint: str, **kwargs) -> requests.Response:
        """GET request"""
        return self._request('GET', endpoint, **kwargs)
    
    def post(self, endpoint: str, **kwargs) -> requests.Response:
        """POST request"""
        return self._request('POST', endpoint, **kwargs)
    
    def put(self, endpoint: str, **kwargs) -> requests.Response:
        """PUT request"""
        return self._request('PUT', endpoint, **kwargs)
    
    def delete(self, endpoint: str, **kwargs) -> requests.Response:
        """DELETE request"""
        return self._request('DELETE', endpoint, **kwargs)
Token Authentication

class KylinTokenAuth:
    """Token-based authentication for Kylin"""
    
    def __init__(self, client: KylinAPIClient):
        self.client = client
        self.token = None
        self.token_expires = None
    
    def get_token(self) -> str:
        """Return a cached token, refreshing it if missing or expired"""
        if self.token and self.token_expires and datetime.now() < self.token_expires:
            return self.token
        
        # Refresh the token
        response = self.client.post('/user/authentication')
        if response.status_code == 200:
            data = response.json()
            self.token = data.get('token')
            # Assume the token stays valid for one hour
            self.token_expires = datetime.now() + timedelta(hours=1)
            return self.token
        else:
            raise Exception(f"Failed to obtain token: {response.status_code}")
    
    def refresh_token(self):
        """Discard the cached token and fetch a new one"""
        self.token = None
        self.token_expires = None
        return self.get_token()
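Once a token is obtained, it has to be attached to subsequent requests. A short usage sketch follows; note that the Authorization scheme (Bearer here) is an assumption that depends on your Kylin version and security configuration:

client = KylinAPIClient()
token_auth = KylinTokenAuth(client)
token = token_auth.get_token()

# Switch the shared session from basic auth to token auth. The Bearer
# scheme is an assumption -- adjust it to whatever your deployment expects.
client.session.auth = None
client.session.headers["Authorization"] = f"Bearer {token}"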

9.2.3 Project Management API

class ProjectManager:
    """Project manager"""
    
    def __init__(self, client: KylinAPIClient):
        self.client = client
    
    def list_projects(self) -> List[Dict]:
        """List all projects"""
        response = self.client.get('/projects')
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to list projects: {response.status_code}")
    
    def get_project(self, project_name: str) -> Dict:
        """Get project details"""
        response = self.client.get(f'/projects/{project_name}')
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to get project details: {response.status_code}")
    
    def create_project(self, project_name: str, description: str = "") -> Dict:
        """Create a project"""
        data = {
            "name": project_name,
            "description": description
        }
        
        response = self.client.post('/projects', json=data)
        if response.status_code == 201:
            return response.json()
        else:
            raise Exception(f"Failed to create project: {response.status_code}")
    
    def update_project(self, project_name: str, description: str) -> Dict:
        """Update a project"""
        data = {
            "name": project_name,
            "description": description
        }
        
        response = self.client.put(f'/projects/{project_name}', json=data)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to update project: {response.status_code}")
    
    def delete_project(self, project_name: str) -> bool:
        """Delete a project"""
        response = self.client.delete(f'/projects/{project_name}')
        return response.status_code == 204

9.2.4 Data Source Management API

class DataSourceManager:
    """Data source manager"""
    
    def __init__(self, client: KylinAPIClient):
        self.client = client
    
    def list_tables(self, project_name: str, ext: bool = True) -> List[Dict]:
        """List the tables in a project"""
        params = {'ext': ext, 'project': project_name}
        response = self.client.get('/tables', params=params)
        
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to list tables: {response.status_code}")
    
    def get_table(self, table_name: str, project_name: str) -> Dict:
        """Get table details"""
        params = {'project': project_name}
        response = self.client.get(f'/tables/{table_name}', params=params)
        
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to get table details: {response.status_code}")
    
    def load_tables(self, project_name: str, tables: List[str]) -> Dict:
        """Load tables into a project"""
        data = {
            "tables": tables,
            "project": project_name
        }
        
        response = self.client.post('/tables', json=data)
        if response.status_code == 201:
            return response.json()
        else:
            raise Exception(f"Failed to load tables: {response.status_code}")
    
    def unload_table(self, table_name: str, project_name: str) -> bool:
        """Unload a table from a project"""
        params = {'project': project_name}
        response = self.client.delete(f'/tables/{table_name}', params=params)
        return response.status_code == 204
    
    def sync_table(self, table_name: str, project_name: str) -> Dict:
        """Synchronize a table's schema"""
        data = {'project': project_name}
        response = self.client.put(f'/tables/{table_name}/sync', json=data)
        
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to sync table: {response.status_code}")

9.2.5 Model Management API

class ModelManager:
    """Model manager"""
    
    def __init__(self, client: KylinAPIClient):
        self.client = client
    
    def list_models(self, project_name: str) -> List[Dict]:
        """List models"""
        params = {'project': project_name}
        response = self.client.get('/models', params=params)
        
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to list models: {response.status_code}")
    
    def get_model(self, model_name: str, project_name: str) -> Dict:
        """Get model details"""
        params = {'project': project_name}
        response = self.client.get(f'/models/{model_name}', params=params)
        
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to get model details: {response.status_code}")
    
    def create_model(self, project_name: str, model_config: Dict) -> Dict:
        """Create a data model"""
        model_config['project'] = project_name
        
        response = self.client.post('/models', json=model_config)
        if response.status_code == 201:
            return response.json()
        else:
            raise Exception(f"Failed to create model: {response.status_code}")
    
    def update_model(self, model_name: str, project_name: str, model_config: Dict) -> Dict:
        """Update a data model"""
        model_config['project'] = project_name
        
        response = self.client.put(f'/models/{model_name}', json=model_config)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to update model: {response.status_code}")
    
    def delete_model(self, model_name: str, project_name: str) -> bool:
        """Delete a data model"""
        params = {'project': project_name}
        response = self.client.delete(f'/models/{model_name}', params=params)
        return response.status_code == 204
    
    def clone_model(self, model_name: str, new_model_name: str, project_name: str) -> Dict:
        """Clone a data model"""
        data = {
            'modelName': new_model_name,
            'project': project_name
        }
        
        response = self.client.post(f'/models/{model_name}/clone', json=data)
        if response.status_code == 201:
            return response.json()
        else:
            raise Exception(f"Failed to clone model: {response.status_code}")

9.2.6 Cube Management API

class CubeManager:
    """Cube manager"""
    
    def __init__(self, client: KylinAPIClient):
        self.client = client
    
    def list_cubes(self, project_name: Optional[str] = None) -> List[Dict]:
        """List cubes"""
        params = {}
        if project_name:
            params['project'] = project_name
        
        response = self.client.get('/cubes', params=params)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to list cubes: {response.status_code}")
    
    def get_cube(self, cube_name: str) -> Dict:
        """Get cube details"""
        response = self.client.get(f'/cubes/{cube_name}')
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to get cube details: {response.status_code}")
    
    def create_cube(self, cube_config: Dict) -> Dict:
        """Create a cube"""
        response = self.client.post('/cubes', json=cube_config)
        if response.status_code == 201:
            return response.json()
        else:
            raise Exception(f"Failed to create cube: {response.status_code}")
    
    def update_cube(self, cube_name: str, cube_config: Dict) -> Dict:
        """Update a cube"""
        response = self.client.put(f'/cubes/{cube_name}', json=cube_config)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to update cube: {response.status_code}")
    
    def delete_cube(self, cube_name: str) -> bool:
        """Delete a cube"""
        response = self.client.delete(f'/cubes/{cube_name}')
        return response.status_code == 204
    
    def build_cube(self, cube_name: str, start_time: int, end_time: int, 
                   build_type: str = "BUILD") -> Dict:
        """Build a cube segment.
        
        start_time and end_time are epoch timestamps in milliseconds, which
        is what the Kylin build API expects.
        """
        data = {
            "startTime": start_time,
            "endTime": end_time,
            "buildType": build_type
        }
        
        response = self.client.put(f'/cubes/{cube_name}/build', json=data)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to build cube: {response.status_code}")
    
    def enable_cube(self, cube_name: str) -> Dict:
        """Enable a cube"""
        response = self.client.put(f'/cubes/{cube_name}/enable')
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to enable cube: {response.status_code}")
    
    def disable_cube(self, cube_name: str) -> Dict:
        """Disable a cube"""
        response = self.client.put(f'/cubes/{cube_name}/disable')
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to disable cube: {response.status_code}")
    
    def purge_cube(self, cube_name: str) -> Dict:
        """Purge a cube's segments"""
        response = self.client.put(f'/cubes/{cube_name}/purge')
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to purge cube: {response.status_code}")

9.2.7 Query API

class QueryManager:
    """Query manager"""
    
    def __init__(self, client: KylinAPIClient):
        self.client = client
    
    def execute_query(self, sql: str, project: str, limit: int = 50000, 
                     offset: int = 0, accept_partial: bool = False) -> Dict:
        """Execute a SQL query"""
        data = {
            "sql": sql,
            "project": project,
            "limit": limit,
            "offset": offset,
            "acceptPartial": accept_partial
        }
        
        response = self.client.post('/query', json=data)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Query execution failed: {response.status_code} - {response.text}")
    
    def get_query_result(self, query_id: str) -> Dict:
        """Get a query's result"""
        response = self.client.get(f'/query/{query_id}')
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to get query result: {response.status_code}")
    
    def cancel_query(self, query_id: str) -> bool:
        """Cancel a running query"""
        response = self.client.delete(f'/query/{query_id}')
        return response.status_code == 204
    
    def list_queries(self, project: str = None, limit: int = 20, offset: int = 0) -> List[Dict]:
        """Get the query history"""
        params = {
            "limit": limit,
            "offset": offset
        }
        if project:
            params["project"] = project
        
        response = self.client.get('/query', params=params)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to get query history: {response.status_code}")
    
    def get_query_cache(self, sql: str, project: str) -> Optional[Dict]:
        """Look up a cached query result"""
        params = {
            "sql": sql,
            "project": project
        }
        
        response = self.client.get('/query/cache', params=params)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 404:
            return None
        else:
            raise Exception(f"Failed to get query cache: {response.status_code}")
    
    def clear_query_cache(self, project: str = None) -> bool:
        """Clear the query cache"""
        params = {}
        if project:
            params["project"] = project
        
        response = self.client.delete('/query/cache', params=params)
        return response.status_code == 204
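The query response carries column metadata and row data in separate arrays (columnMetas and results in the classic REST API; verify the field names against your version). A small sketch that zips them into row dictionaries:

from typing import Dict, List

def rows_as_dicts(query_response: Dict) -> List[Dict]:
    """Zip Kylin's columnMetas/results arrays into a list of row dicts."""
    columns = [meta.get("label") or meta.get("name")
               for meta in query_response.get("columnMetas", [])]
    return [dict(zip(columns, row))
            for row in query_response.get("results", [])]

queries = QueryManager(KylinAPIClient())
result = queries.execute_query(
    "SELECT PART_DT, COUNT(*) AS CNT FROM KYLIN_SALES GROUP BY PART_DT",
    project="demo_project", limit=100
)
for row in rows_as_dicts(result):
    print(row)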

9.2.8 Job Management API

class JobManager:
    """Job manager"""
    
    def __init__(self, client: KylinAPIClient):
        self.client = client
    
    def list_jobs(self, project: str = None, cube_name: str = None, 
                 status: str = None, limit: int = 15, offset: int = 0) -> List[Dict]:
        """List jobs"""
        params = {
            "limit": limit,
            "offset": offset
        }
        
        if project:
            params["project"] = project
        if cube_name:
            params["cubeName"] = cube_name
        if status:
            params["status"] = status
        
        response = self.client.get('/jobs', params=params)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to list jobs: {response.status_code}")
    
    def get_job(self, job_id: str) -> Dict:
        """Get job details"""
        response = self.client.get(f'/jobs/{job_id}')
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to get job details: {response.status_code}")
    
    def cancel_job(self, job_id: str) -> Dict:
        """Cancel a job"""
        response = self.client.put(f'/jobs/{job_id}/cancel')
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to cancel job: {response.status_code}")
    
    def resume_job(self, job_id: str) -> Dict:
        """Resume a job"""
        response = self.client.put(f'/jobs/{job_id}/resume')
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to resume job: {response.status_code}")
    
    def get_job_steps(self, job_id: str) -> List[Dict]:
        """Get a job's steps"""
        response = self.client.get(f'/jobs/{job_id}/steps')
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to get job steps: {response.status_code}")
    
    def get_job_output(self, job_id: str, step_id: str) -> str:
        """Get a step's output"""
        response = self.client.get(f'/jobs/{job_id}/steps/{step_id}/output')
        if response.status_code == 200:
            return response.text
        else:
            raise Exception(f"Failed to get job output: {response.status_code}")

9.2.9 A Complete API Client

class KylinClient:
    """Complete Kylin API client"""
    
    def __init__(self, host: str = "localhost", port: int = 7070,
                 username: str = "ADMIN", password: str = "KYLIN",
                 use_ssl: bool = False):
        """
        Initialize the Kylin client
        """
        # Base API client
        self.api_client = KylinAPIClient(host, port, username, password, use_ssl)
        
        # Domain-specific managers
        self.projects = ProjectManager(self.api_client)
        self.datasources = DataSourceManager(self.api_client)
        self.models = ModelManager(self.api_client)
        self.cubes = CubeManager(self.api_client)
        self.queries = QueryManager(self.api_client)
        self.jobs = JobManager(self.api_client)
    
    def health_check(self) -> Dict:
        """Health check"""
        response = self.api_client.get('/admin/public_config')
        if response.status_code == 200:
            return {
                'status': 'healthy',
                'config': response.json()
            }
        else:
            return {
                'status': 'unhealthy',
                'error': f"HTTP {response.status_code}"
            }
    
    def get_version(self) -> str:
        """Get the server version"""
        response = self.api_client.get('/admin/version')
        if response.status_code == 200:
            return response.json().get('version', 'unknown')
        else:
            return 'unknown'
    
    def close(self):
        """Close the connection"""
        if hasattr(self.api_client, 'session'):
            self.api_client.session.close()

# Usage example
def main():
    # Create the client
    client = KylinClient(host='localhost', port=7070)
    
    try:
        # Health check
        health = client.health_check()
        print(f"Health status: {health['status']}")
        
        # Server version
        version = client.get_version()
        print(f"Kylin version: {version}")
        
        # List projects
        projects = client.projects.list_projects()
        print(f"Project count: {len(projects)}")
        
        # Run a query
        if projects:
            project_name = projects[0]['name']
            result = client.queries.execute_query(
                sql="SELECT COUNT(*) FROM KYLIN_SALES",
                project=project_name
            )
            print(f"Query result: {result}")
        
    except Exception as e:
        print(f"Error: {e}")
    
    finally:
        client.close()

if __name__ == '__main__':
    main()

9.6.3 Apache Iceberg Integration

#!/usr/bin/env python3
# iceberg_integration.py - Apache Iceberg integration

from pyspark.sql import SparkSession
import logging
from typing import Dict, List, Optional

class IcebergKylinIntegration:
    """Integration between Apache Iceberg and Kylin"""
    
    def __init__(self, spark_config: dict = None, catalog_config: dict = None):
        """
        Initialize the Iceberg integration
        
        Args:
            spark_config: Spark configuration overrides
            catalog_config: catalog configuration
        """
        # Default Spark configuration
        default_config = {
            "spark.app.name": "IcebergKylinIntegration",
            "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
            "spark.sql.catalog.spark_catalog.type": "hive",
            "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
            "spark.sql.catalog.local.type": "hadoop",
            "spark.sql.catalog.local.warehouse": "hdfs://namenode:9000/warehouse/iceberg",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
        }
        
        if spark_config:
            default_config.update(spark_config)
        
        # Create the Spark session
        builder = SparkSession.builder
        for key, value in default_config.items():
            builder = builder.config(key, value)
        
        self.spark = builder.getOrCreate()
        self.logger = logging.getLogger(__name__)
        
        # Catalog configuration
        self.catalog_config = catalog_config or {
            "catalog_name": "local",
            "database": "kylin_db"
        }
    
    def create_iceberg_table_from_hive(self, hive_table: str, iceberg_table: str,
                                     partition_cols: list = None):
        """Create an Iceberg table from a Hive table"""
        try:
            # Fail fast if the source Hive table is not readable
            self.spark.table(hive_table)
            
            # Build the CREATE TABLE ... AS SELECT statement
            catalog = self.catalog_config['catalog_name']
            database = self.catalog_config['database']
            
            create_sql = f"CREATE TABLE {catalog}.{database}.{iceberg_table} USING iceberg"
            
            if partition_cols:
                partition_spec = ", ".join(partition_cols)
                create_sql += f" PARTITIONED BY ({partition_spec})"
            
            create_sql += f" AS SELECT * FROM {hive_table}"
            
            # Execute it
            self.spark.sql(create_sql)
            
            self.logger.info(f"Iceberg table created: {catalog}.{database}.{iceberg_table}")
            
        except Exception as e:
            self.logger.error(f"Failed to create Iceberg table: {e}")
            raise
    
    def merge_data_to_iceberg(self, target_table: str, source_df,
                            merge_condition: str, update_cols: list = None):
        """Merge a DataFrame into an Iceberg table"""
        try:
            # Register the source as a temporary view
            temp_view = "temp_merge_source"
            source_df.createOrReplaceTempView(temp_view)
            
            # Build the MERGE statement
            catalog = self.catalog_config['catalog_name']
            database = self.catalog_config['database']
            
            if update_cols:
                update_set = ", ".join([f"{col} = source.{col}" for col in update_cols])
            else:
                update_set = ", ".join([f"{col} = source.{col}" for col in source_df.columns])
            
            merge_sql = f"""
                MERGE INTO {catalog}.{database}.{target_table} AS target
                USING {temp_view} AS source
                ON {merge_condition}
                WHEN MATCHED THEN UPDATE SET {update_set}
                WHEN NOT MATCHED THEN INSERT *
            """
            
            # Execute the merge
            self.spark.sql(merge_sql)
            
            self.logger.info(f"Merge completed: {catalog}.{database}.{target_table}")
            
        except Exception as e:
            self.logger.error(f"Merge failed: {e}")
            raise
    
    def time_travel_query(self, table_name: str, timestamp: str = None,
                         snapshot_id: str = None, version: int = None):
        """Run a time-travel query"""
        try:
            catalog = self.catalog_config['catalog_name']
            database = self.catalog_config['database']
            
            if timestamp:
                query = f"SELECT * FROM {catalog}.{database}.{table_name} TIMESTAMP AS OF '{timestamp}'"
            elif snapshot_id:
                query = f"SELECT * FROM {catalog}.{database}.{table_name} VERSION AS OF {snapshot_id}"
            elif version:
                query = f"SELECT * FROM {catalog}.{database}.{table_name} VERSION AS OF {version}"
            else:
                query = f"SELECT * FROM {catalog}.{database}.{table_name}"
            
            return self.spark.sql(query)
            
        except Exception as e:
            self.logger.error(f"Time-travel query failed: {e}")
            raise
    
    def get_table_snapshots(self, table_name: str):
        """Get the table's snapshot metadata"""
        try:
            catalog = self.catalog_config['catalog_name']
            database = self.catalog_config['database']
            
            snapshots_df = self.spark.sql(f"""
                SELECT * FROM {catalog}.{database}.{table_name}.snapshots
                ORDER BY committed_at DESC
            """)
            
            return snapshots_df
            
        except Exception as e:
            self.logger.error(f"Failed to get table snapshots: {e}")
            raise
    
    def optimize_iceberg_table(self, table_name: str, target_file_size: str = "134217728"):
        """Optimize (compact) an Iceberg table; the default target file size is 128 MB"""
        try:
            catalog = self.catalog_config['catalog_name']
            database = self.catalog_config['database']
            
            # Rewrite (compact) the data files
            self.spark.sql(f"""
                CALL {catalog}.system.rewrite_data_files(
                    table => '{database}.{table_name}',
                    options => map('target-file-size-bytes', '{target_file_size}')
                )
            """)
            
            self.logger.info(f"Iceberg table optimized: {catalog}.{database}.{table_name}")
            
        except Exception as e:
            self.logger.error(f"Iceberg table optimization failed: {e}")
            raise
    
    def expire_snapshots(self, table_name: str, older_than: str):
        """Expire old snapshots"""
        try:
            catalog = self.catalog_config['catalog_name']
            database = self.catalog_config['database']
            
            self.spark.sql(f"""
                CALL {catalog}.system.expire_snapshots(
                    table => '{database}.{table_name}',
                    older_than => TIMESTAMP '{older_than}'
                )
            """)
            
            self.logger.info(f"Snapshot expiration completed: {catalog}.{database}.{table_name}")
            
        except Exception as e:
            self.logger.error(f"Snapshot expiration failed: {e}")
            raise
    
    def close(self):
        """Stop the Spark session"""
        if self.spark:
            self.spark.stop()

# Usage example
def main():
    # Create the integration instance
    integration = IcebergKylinIntegration()
    
    try:
        # Example 1: create an Iceberg table from Hive
        integration.create_iceberg_table_from_hive(
            hive_table="default.sales_data",
            iceberg_table="sales_iceberg",
            partition_cols=["year", "month"]
        )
        
        # Example 2: time-travel query
        historical_data = integration.time_travel_query(
            table_name="sales_iceberg",
            timestamp="2023-01-01 00:00:00"
        )
        historical_data.show()
        
        # Example 3: snapshot metadata
        snapshots = integration.get_table_snapshots("sales_iceberg")
        snapshots.show()
        
        # Example 4: table optimization
        integration.optimize_iceberg_table("sales_iceberg")
        
    except Exception as e:
        print(f"Error: {e}")
    
    finally:
        integration.close()

if __name__ == '__main__':
    main()
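Kylin itself sees the new table through its data source layer, so the last step of the integration is to register the Iceberg-backed table with a Kylin project, for example by reusing the REST client from section 9.2. A sketch, under the assumption that the table is exposed through the Hive metastore as KYLIN_DB.SALES_ICEBERG and that the section 9.2 classes live in a kylin_api_client module:

# Register the freshly created Iceberg-backed table with a Kylin project.
# Assumes the classes from section 9.2 are importable as a module and that
# the table is visible to Kylin's Hive metastore as KYLIN_DB.SALES_ICEBERG.
from kylin_api_client import KylinAPIClient, DataSourceManager

client = KylinAPIClient(host="localhost", port=7070)
datasources = DataSourceManager(client)
datasources.load_tables("demo_project", ["KYLIN_DB.SALES_ICEBERG"])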

9.7 Real-Time Data Integration

9.7.1 Kafka Integration

#!/usr/bin/env python3
# kafka_integration.py - Real-time data integration with Kafka

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
import logging
from typing import Dict, List, Callable, Any
from datetime import datetime
import threading
import time

class KafkaKylinIntegration:
    """Real-time data integration between Kafka and Kylin"""
    
    def __init__(self, bootstrap_servers: str = "localhost:9092",
                 kylin_config: dict = None):
        """
        Initialize the Kafka integration
        
        Args:
            bootstrap_servers: Kafka broker addresses
            kylin_config: Kylin connection settings
        """
        self.bootstrap_servers = bootstrap_servers
        self.kylin_config = kylin_config or {
            'host': 'localhost',
            'port': 7070,
            'username': 'ADMIN',
            'password': 'KYLIN'
        }
        
        self.logger = logging.getLogger(__name__)
        self.running = False
        
        # Initialize the Kylin SDK (optional dependency)
        try:
            from kylin_python_sdk import KylinPythonSDK
            self.kylin_sdk = KylinPythonSDK(**self.kylin_config)
        except ImportError:
            self.logger.warning("Kylin SDK not installed; some features are unavailable")
            self.kylin_sdk = None
    
    def create_producer(self, **config) -> KafkaProducer:
        """Create a Kafka producer"""
        default_config = {
            'bootstrap_servers': self.bootstrap_servers,
            'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
            'key_serializer': lambda k: str(k).encode('utf-8') if k else None,
            'acks': 'all',
            'retries': 3,
            'batch_size': 16384,
            'linger_ms': 10,
            'buffer_memory': 33554432
        }
        
        default_config.update(config)
        
        try:
            producer = KafkaProducer(**default_config)
            self.logger.info("Kafka producer created")
            return producer
        except Exception as e:
            self.logger.error(f"Failed to create Kafka producer: {e}")
            raise
    
    def create_consumer(self, topics: List[str], group_id: str = None,
                       **config) -> KafkaConsumer:
        """Create a Kafka consumer"""
        default_config = {
            'bootstrap_servers': self.bootstrap_servers,
            'value_deserializer': lambda m: json.loads(m.decode('utf-8')),
            'key_deserializer': lambda m: m.decode('utf-8') if m else None,
            'group_id': group_id or 'kylin-consumer-group',
            'auto_offset_reset': 'latest',
            'enable_auto_commit': True,
            'auto_commit_interval_ms': 1000,
            'session_timeout_ms': 30000,
            'max_poll_records': 500
        }
        
        default_config.update(config)
        
        try:
            consumer = KafkaConsumer(*topics, **default_config)
            self.logger.info(f"Kafka consumer created, subscribed to: {topics}")
            return consumer
        except Exception as e:
            self.logger.error(f"Failed to create Kafka consumer: {e}")
            raise
    
    def send_sales_data(self, producer: KafkaProducer, topic: str,
                       sales_data: Dict[str, Any]):
        """Send a sales record to Kafka"""
        try:
            # Attach an event timestamp
            sales_data['timestamp'] = datetime.now().isoformat()
            
            # Send the message
            future = producer.send(topic, value=sales_data, key=sales_data.get('order_id'))
            
            # Block until the send completes
            record_metadata = future.get(timeout=10)
            
            self.logger.debug(f"Message sent: topic={record_metadata.topic}, "
                            f"partition={record_metadata.partition}, "
                            f"offset={record_metadata.offset}")
            
        except KafkaError as e:
            self.logger.error(f"Failed to send message: {e}")
            raise
    
    def batch_send_data(self, producer: KafkaProducer, topic: str,
                       data_list: List[Dict[str, Any]], batch_size: int = 100):
        """Send records in batches"""
        try:
            for i in range(0, len(data_list), batch_size):
                batch = data_list[i:i + batch_size]
                
                for data in batch:
                    self.send_sales_data(producer, topic, data)
                
                # Flush the producer buffer
                producer.flush()
                
                self.logger.info(f"Batch {i//batch_size + 1} sent, {len(batch)} records")
                
        except Exception as e:
            self.logger.error(f"Batch send failed: {e}")
            raise
    
    def start_real_time_consumer(self, topics: List[str], 
                               message_handler: Callable[[Dict], None],
                               group_id: str = None):
        """Start a real-time consumer on a background thread"""
        consumer = self.create_consumer(topics, group_id)
        self.running = True
        
        def consume_messages():
            try:
                while self.running:
                    message_batch = consumer.poll(timeout_ms=1000)
                    
                    for topic_partition, messages in message_batch.items():
                        for message in messages:
                            try:
                                # Hand the message to the caller's handler
                                message_handler(message.value)
                                
                            except Exception as e:
                                self.logger.error(f"Failed to process message: {e}")
                                
            except Exception as e:
                self.logger.error(f"Consumer error: {e}")
            finally:
                consumer.close()
                self.logger.info("Kafka consumer closed")
        
        # Start the consumer thread
        consumer_thread = threading.Thread(target=consume_messages)
        consumer_thread.daemon = True
        consumer_thread.start()
        
        self.logger.info(f"Real-time consumer started, subscribed to: {topics}")
        return consumer_thread
    
    def create_streaming_aggregator(self, input_topic: str, output_topic: str,
                                  window_size: int = 60, group_id: str = None):
        """Create a simple sliding-window streaming aggregator"""
        from collections import defaultdict, deque
        
        # Per-product window buffers of (timestamp, price) pairs
        window_data = defaultdict(lambda: deque())
        
        def aggregate_handler(message: Dict[str, Any]):
            try:
                current_time = time.time()
                product_id = message.get('product_id', 'unknown')
                price = float(message.get('price', 0))
                
                # Append to the window
                window_data[product_id].append((current_time, price))
                
                # Evict entries older than the window
                cutoff_time = current_time - window_size
                while (window_data[product_id] and 
                       window_data[product_id][0][0] < cutoff_time):
                    window_data[product_id].popleft()
                
                # Compute the window aggregates
                if window_data[product_id]:
                    prices = [price for _, price in window_data[product_id]]
                    aggregated_data = {
                        'product_id': product_id,
                        'window_start': cutoff_time,
                        'window_end': current_time,
                        'total_sales': sum(prices),
                        'avg_price': sum(prices) / len(prices),
                        'min_price': min(prices),
                        'max_price': max(prices),
                        'order_count': len(prices),
                        'timestamp': datetime.now().isoformat()
                    }
                    
                    # Publish the aggregate. Creating a producer per message keeps
                    # the example simple; reuse a shared producer in production.
                    producer = self.create_producer()
                    try:
                        producer.send(output_topic, value=aggregated_data, key=product_id)
                        producer.flush()
                        
                        self.logger.debug(f"Aggregate published: {product_id}")
                        
                    finally:
                        producer.close()
                        
            except Exception as e:
                self.logger.error(f"Aggregation failed: {e}")
        
        # Start the aggregating consumer
        return self.start_real_time_consumer([input_topic], aggregate_handler, group_id)
    
    def sync_to_kylin_table(self, message: Dict[str, Any], table_name: str):
        """Sync a record toward a Kylin-backed table"""
        if not self.kylin_sdk:
            self.logger.warning("Kylin SDK unavailable; skipping sync")
            return
        
        try:
            # In a real deployment this would write the record to a Hive table
            # and then trigger an incremental build of the Kylin cube.
            
            # Example: log the record; a production application would write to Hive
            log_entry = {
                'timestamp': datetime.now().isoformat(),
                'table': table_name,
                'data': message
            }
            
            self.logger.info(f"Synced record to table {table_name}: {log_entry}")
            
        except Exception as e:
            self.logger.error(f"Failed to sync to Kylin table: {e}")
    
    def stop(self):
        """Stop all consumers"""
        self.running = False
        self.logger.info("Stopping Kafka integration...")
# Usage example
def main():
    # Create the integration instance
    integration = KafkaKylinIntegration()
    
    try:
        # Example 1: send data with a producer
        print("=== Producer example ===")
        producer = integration.create_producer()
        
        # Simulated sales record
        sample_data = {
            'order_id': 'ORD001',
            'product_id': 'PROD001',
            'customer_id': 'CUST001',
            'price': 99.99,
            'quantity': 2,
            'category': 'Electronics'
        }
        
        integration.send_sales_data(producer, 'sales-events', sample_data)
        producer.close()
        
        # Example 2: process data with a consumer
        print("\n=== Consumer example ===")
        
        def message_handler(message):
            print(f"Received message: {message}")
            # Sync to the Kylin-backed table
            integration.sync_to_kylin_table(message, 'kylin_sales_realtime')
        
        # Start the consumer
        consumer_thread = integration.start_real_time_consumer(
            ['sales-events'], message_handler, 'kylin-group'
        )
        
        # Example 3: streaming aggregation
        print("\n=== Streaming aggregation example ===")
        aggregator_thread = integration.create_streaming_aggregator(
            'sales-events', 'sales-aggregated', window_size=60
        )
        
        # Let it run for a while
        time.sleep(10)
        
    except Exception as e:
        print(f"Error: {e}")
    
    finally:
        integration.stop()
        time.sleep(2)  # give the threads time to exit

if __name__ == '__main__':
    main()
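sync_to_kylin_table only stages the incoming records; something still has to trigger an incremental cube build once enough events have landed in Hive. The sketch below combines the consumer with the CubeManager from section 9.2; the module layout, the event's epoch-millisecond ts field, and the 10000-event threshold are all assumptions:

# Trigger an incremental Kylin build once a batch of events has accumulated.
# Assumes the section 9.2 classes are importable as a module and that each
# buffered event carries an epoch-millisecond "ts" field (both assumptions).
from kylin_api_client import KylinAPIClient, CubeManager

def build_after_batch(cube_name: str, pending: list, threshold: int = 10000):
    """Submit an incremental build covering the buffered events' time range."""
    if len(pending) < threshold:
        return
    cubes = CubeManager(KylinAPIClient())
    start_ms = min(e["ts"] for e in pending)
    end_ms = max(e["ts"] for e in pending) + 1   # end boundary is exclusive
    cubes.build_cube(cube_name, start_time=start_ms, end_time=end_ms)
    pending.clear()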

9.7.2 Flink Integration

#!/usr/bin/env python3
# flink_integration.py - Stream processing integration with Flink

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
import json
import time
import logging

class FlinkKylinIntegration:
    """Stream processing integration between Flink and Kylin"""
    
    def __init__(self, checkpoint_dir: str = "file:///tmp/flink-checkpoints"):
        """
        Initialize the Flink integration
        
        Args:
            checkpoint_dir: checkpoint storage directory
        """
        # Create the stream execution environment
        self.env = StreamExecutionEnvironment.get_execution_environment()
        self.env.set_parallelism(2)
        
        # Enable checkpointing
        self.env.enable_checkpointing(60000)  # every 60 seconds
        self.env.get_checkpoint_config().set_checkpoint_storage_dir(checkpoint_dir)
        
        # Create the table environment
        settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
        self.table_env = StreamTableEnvironment.create(self.env, settings)
        
        self.logger = logging.getLogger(__name__)
    
    def setup_kafka_source(self, topic: str, bootstrap_servers: str = "localhost:9092",
                          group_id: str = "flink-consumer"):
        """Set up a Kafka source"""
        try:
            # Kafka consumer properties
            kafka_props = {
                'bootstrap.servers': bootstrap_servers,
                'group.id': group_id,
                'auto.offset.reset': 'latest'
            }
            
            # Create the Kafka consumer
            kafka_consumer = FlinkKafkaConsumer(
                topic,
                SimpleStringSchema(),
                kafka_props
            )
            
            # Attach it as a source stream
            kafka_stream = self.env.add_source(kafka_consumer)
            
            self.logger.info(f"Kafka source ready: {topic}")
            return kafka_stream
            
        except Exception as e:
            self.logger.error(f"Failed to set up Kafka source: {e}")
            raise
    
    def setup_kafka_sink(self, topic: str, bootstrap_servers: str = "localhost:9092"):
        """Set up a Kafka sink"""
        try:
            # Kafka producer properties
            kafka_props = {
                'bootstrap.servers': bootstrap_servers,
                'acks': 'all',
                'retries': '3'
            }
            
            # Create the Kafka producer
            kafka_producer = FlinkKafkaProducer(
                topic,
                SimpleStringSchema(),
                kafka_props
            )
            
            self.logger.info(f"Kafka sink ready: {topic}")
            return kafka_producer
            
        except Exception as e:
            self.logger.error(f"Failed to set up Kafka sink: {e}")
            raise
    
    def create_sales_processing_job(self, input_topic: str, output_topic: str):
        """Create the sales-data processing job"""
        try:
            # Kafka source
            kafka_source = self.setup_kafka_source(input_topic)
            
            # Parse the JSON payload
            def parse_json(value):
                try:
                    return json.loads(value)
                except (ValueError, TypeError):
                    return None
            
            parsed_stream = kafka_source.map(parse_json, output_type=Types.PICKLED_BYTE_ARRAY())
            
            # Drop records that failed to parse
            valid_stream = parsed_stream.filter(lambda x: x is not None)
            
            # Enrich each record
            def enrich_data(data):
                if data:
                    # Processing timestamp
                    data['process_time'] = int(time.time() * 1000)
                    
                    # Total amount
                    price = float(data.get('price', 0))
                    quantity = int(data.get('quantity', 1))
                    data['total_amount'] = price * quantity
                    
                    # Price-band label
                    if price > 100:
                        data['price_category'] = 'high'
                    elif price > 50:
                        data['price_category'] = 'medium'
                    else:
                        data['price_category'] = 'low'
                    
                    return json.dumps(data)
                return None
            
            enriched_stream = valid_stream.map(enrich_data, output_type=Types.STRING())
            
            # Drop empty results
            final_stream = enriched_stream.filter(lambda x: x is not None)
            
            # Kafka sink
            kafka_sink = self.setup_kafka_sink(output_topic)
            final_stream.add_sink(kafka_sink)
            
            self.logger.info("Sales-data processing job created")
            
        except Exception as e:
            self.logger.error(f"Failed to create processing job: {e}")
            raise
    
    def create_windowed_aggregation_job(self, input_topic: str, output_topic: str,
                                      window_size: int = 60):
        """Create a windowed aggregation job"""
        try:
            # Use the Table API for windowed aggregation
            
            # Kafka source table
            self.table_env.execute_sql(f"""
                CREATE TABLE sales_source (
                    order_id STRING,
                    product_id STRING,
                    customer_id STRING,
                    price DECIMAL(10,2),
                    quantity INT,
                    category STRING,
                    event_time TIMESTAMP(3),
                    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
                ) WITH (
                    'connector' = 'kafka',
                    'topic' = '{input_topic}',
                    'properties.bootstrap.servers' = 'localhost:9092',
                    'properties.group.id' = 'flink-aggregation',
                    'format' = 'json',
                    'json.timestamp-format.standard' = 'ISO-8601'
                )
            """)
            
            # Kafka sink table
            self.table_env.execute_sql(f"""
                CREATE TABLE sales_sink (
                    window_start TIMESTAMP(3),
                    window_end TIMESTAMP(3),
                    product_id STRING,
                    total_sales DECIMAL(10,2),
                    avg_price DECIMAL(10,2),
                    order_count BIGINT,
                    max_price DECIMAL(10,2),
                    min_price DECIMAL(10,2)
                ) WITH (
                    'connector' = 'kafka',
                    'topic' = '{output_topic}',
                    'properties.bootstrap.servers' = 'localhost:9092',
                    'format' = 'json'
                )
            """)
            
            # Run the tumbling-window aggregation
            self.table_env.execute_sql(f"""
                INSERT INTO sales_sink
                SELECT 
                    TUMBLE_START(event_time, INTERVAL '{window_size}' SECOND) as window_start,
                    TUMBLE_END(event_time, INTERVAL '{window_size}' SECOND) as window_end,
                    product_id,
                    SUM(price * quantity) as total_sales,
                    AVG(price) as avg_price,
                    COUNT(*) as order_count,
                    MAX(price) as max_price,
                    MIN(price) as min_price
                FROM sales_source
                GROUP BY 
                    TUMBLE(event_time, INTERVAL '{window_size}' SECOND),
                    product_id
            """)
            
            self.logger.info(f"Windowed aggregation job created, window size: {window_size}s")
            
        except Exception as e:
            self.logger.error(f"Failed to create windowed aggregation job: {e}")
            raise
    
    def create_cep_pattern_job(self, input_topic: str, output_topic: str):
        """Create a complex event processing (CEP) job.
        
        Note: Flink's CEP library is primarily a Java/Scala API and PyFlink's
        CEP support is limited, so treat this method as an illustrative sketch.
        """
        try:
            from pyflink.cep import CEP, Pattern
            
            # Source
            kafka_source = self.setup_kafka_source(input_topic)
            
            # Parse and timestamp each record
            def parse_with_timestamp(value):
                try:
                    data = json.loads(value)
                    data['timestamp'] = int(time.time() * 1000)
                    return data
                except (ValueError, TypeError):
                    return None
            
            parsed_stream = kafka_source.map(parse_with_timestamp)
            valid_stream = parsed_stream.filter(lambda x: x is not None)
            
            # CEP pattern: two high-value orders in quick succession
            pattern = Pattern.begin("high_price").where(lambda x: x.get('price', 0) > 1000) \
                           .next("another_high").where(lambda x: x.get('price', 0) > 1000) \
                           .within_time(300000)  # within 5 minutes
            
            # Apply the pattern
            pattern_stream = CEP.pattern(valid_stream, pattern)
            
            # Turn each match into an alert
            def process_pattern(pattern_map):
                high_price_events = pattern_map.get('high_price', [])
                another_high_events = pattern_map.get('another_high', [])
                
                alert = {
                    'alert_type': 'high_price_pattern',
                    'timestamp': int(time.time() * 1000),
                    'first_order': high_price_events[0] if high_price_events else None,
                    'second_order': another_high_events[0] if another_high_events else None,
                    'message': 'Detected a pattern of consecutive high-value orders'
                }
                
                return json.dumps(alert)
            
            alert_stream = pattern_stream.select(process_pattern)
            
            # Sink the alerts to Kafka
            kafka_sink = self.setup_kafka_sink(output_topic)
            alert_stream.add_sink(kafka_sink)
            
            self.logger.info("CEP pattern-detection job created")
            
        except Exception as e:
            self.logger.error(f"Failed to create CEP job: {e}")
            raise
    
    def execute_job(self, job_name: str = "Flink Kylin Integration"):
        """Execute the Flink job graph"""
        try:
            self.logger.info(f"Starting Flink job: {job_name}")
            self.env.execute(job_name)
            
        except Exception as e:
            self.logger.error(f"Failed to execute Flink job: {e}")
            raise

# Usage example
def main():
    # Create the Flink integration instance
    flink_integration = FlinkKylinIntegration()
    
    try:
        # Example 1: sales-data processing job
        print("=== Creating the sales-data processing job ===")
        flink_integration.create_sales_processing_job(
            input_topic="sales-raw",
            output_topic="sales-processed"
        )
        
        # Example 2: windowed aggregation job
        print("=== Creating the windowed aggregation job ===")
        flink_integration.create_windowed_aggregation_job(
            input_topic="sales-events",
            output_topic="sales-aggregated",
            window_size=60
        )
        
        # Example 3: CEP pattern-detection job
        print("=== Creating the CEP pattern-detection job ===")
        flink_integration.create_cep_pattern_job(
            input_topic="sales-events",
            output_topic="sales-alerts"
        )
        
        # Execute
        print("=== Executing the Flink job ===")
        flink_integration.execute_job("Kylin Real-time Processing")
        
    except Exception as e:
        print(f"Error: {e}")

if __name__ == '__main__':
    main()
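The sales_source DDL above only works if the JSON events on the topic match its schema, including the ISO-8601 event_time format declared via json.timestamp-format.standard. For testing, a few synthetic events can be pushed with kafka-python; the field names mirror the DDL, and the topic name is a placeholder:

import json
from datetime import datetime, timezone
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# One synthetic event shaped to match the sales_source table definition;
# event_time uses ISO-8601 with millisecond precision.
event = {
    "order_id": "ORD001",
    "product_id": "PROD001",
    "customer_id": "CUST001",
    "price": 99.99,
    "quantity": 2,
    "category": "Electronics",
    "event_time": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3],
}
producer.send("sales-events", event)
producer.flush()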

9.8 Microservice Integration

9.8.1 Spring Boot Microservices

// KylinMicroservice.java - Spring Boot microservice integration
package com.example.kylin.microservice;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.cache.annotation.EnableCaching;

@SpringBootApplication
@EnableFeignClients
@EnableAsync
@EnableCaching
public class KylinMicroserviceApplication {
    public static void main(String[] args) {
        SpringApplication.run(KylinMicroserviceApplication.class, args);
    }
}

// KylinService.java - Kylin service layer
package com.example.kylin.service;

import com.example.kylin.config.KylinConfig;
import com.example.kylin.dto.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import org.springframework.http.*;
import org.springframework.scheduling.annotation.Async;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class KylinService {
    
    @Autowired
    private KylinConfig kylinConfig;
    
    @Autowired
    private RestTemplate restTemplate;
    
    private String authToken;
    
    /**
     * Authenticate and obtain a token
     */
    public String authenticate() {
        try {
            String url = kylinConfig.getBaseUrl() + "/api/user/authentication";
            
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            headers.setBasicAuth(kylinConfig.getUsername(), kylinConfig.getPassword());
            
            HttpEntity<String> entity = new HttpEntity<>(headers);
            ResponseEntity<Map> response = restTemplate.postForEntity(url, entity, Map.class);
            
            if (response.getStatusCode() == HttpStatus.OK) {
                this.authToken = (String) response.getBody().get("token");
                log.info("Kylin authentication succeeded");
                return this.authToken;
            }
            
        } catch (Exception e) {
            log.error("Kylin authentication failed: {}", e.getMessage());
        }
        return null;
    }
    
    /**
     * List projects
     */
    @Cacheable(value = "projects", unless = "#result == null")
    public List<ProjectDto> getProjects() {
        try {
            String url = kylinConfig.getBaseUrl() + "/api/projects";
            HttpHeaders headers = createAuthHeaders();
            HttpEntity<String> entity = new HttpEntity<>(headers);
            
            ResponseEntity<ProjectDto[]> response = restTemplate.exchange(
                url, HttpMethod.GET, entity, ProjectDto[].class);
            
            if (response.getStatusCode() == HttpStatus.OK) {
                return Arrays.asList(response.getBody());
            }
            
        } catch (Exception e) {
            log.error("Failed to list projects: {}", e.getMessage());
        }
        return Collections.emptyList();
    }
    
    /**
     * Execute a query asynchronously
     */
    @Async
    public CompletableFuture<QueryResultDto> executeQueryAsync(QueryRequestDto request) {
        try {
            String url = kylinConfig.getBaseUrl() + "/api/query";
            HttpHeaders headers = createAuthHeaders();
            HttpEntity<QueryRequestDto> entity = new HttpEntity<>(request, headers);
            
            ResponseEntity<QueryResultDto> response = restTemplate.postForEntity(
                url, entity, QueryResultDto.class);
            
            if (response.getStatusCode() == HttpStatus.OK) {
                log.info("Query executed: {}", request.getSql());
                return CompletableFuture.completedFuture(response.getBody());
            }
            
        } catch (Exception e) {
            log.error("Query execution failed: {}", e.getMessage());
        }
        return CompletableFuture.completedFuture(null);
    }
    
    /**
     * List the cubes in a project
     */
    @Cacheable(value = "cubes", key = "#projectName", unless = "#result == null")
    public List<CubeDto> getCubes(String projectName) {
        try {
            String url = kylinConfig.getBaseUrl() + "/api/cubes?projectName=" + projectName;
            HttpHeaders headers = createAuthHeaders();
            HttpEntity<String> entity = new HttpEntity<>(headers);
            
            ResponseEntity<CubeDto[]> response = restTemplate.exchange(
                url, HttpMethod.GET, entity, CubeDto[].class);
            
            if (response.getStatusCode() == HttpStatus.OK) {
                return Arrays.asList(response.getBody());
            }
            
        } catch (Exception e) {
            log.error("Failed to list cubes: {}", e.getMessage());
        }
        return Collections.emptyList();
    }
    
    /**
     * Build a cube segment
     */
    public JobDto buildCube(String cubeName, long startTime, long endTime) {
        try {
            String url = kylinConfig.getBaseUrl() + "/api/cubes/" + cubeName + "/build";
            
            Map<String, Object> buildRequest = new HashMap<>();
            buildRequest.put("startTime", startTime);
            buildRequest.put("endTime", endTime);
            buildRequest.put("buildType", "BUILD");
            
            HttpHeaders headers = createAuthHeaders();
            HttpEntity<Map<String, Object>> entity = new HttpEntity<>(buildRequest, headers);
            
            ResponseEntity<JobDto> response = restTemplate.postForEntity(
                url, entity, JobDto.class);
            
            if (response.getStatusCode() == HttpStatus.OK) {
                log.info("Cube build job submitted: {}", cubeName);
                return response.getBody();
            }
            
        } catch (Exception e) {
            log.error("Cube build failed: {}", e.getMessage());
        }
        return null;
    }
    
    /**
     * Get a job's status
     */
    public JobDto getJobStatus(String jobId) {
        try {
            String url = kylinConfig.getBaseUrl() + "/api/jobs/" + jobId;
            HttpHeaders headers = createAuthHeaders();
            HttpEntity<String> entity = new HttpEntity<>(headers);
            
            ResponseEntity<JobDto> response = restTemplate.exchange(
                url, HttpMethod.GET, entity, JobDto.class);
            
            if (response.getStatusCode() == HttpStatus.OK) {
                return response.getBody();
            }
            
        } catch (Exception e) {
            log.error("Failed to get job status: {}", e.getMessage());
        }
        return null;
    }
    
    /**
     * Build the authenticated request headers
     */
    private HttpHeaders createAuthHeaders() {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        
        if (authToken != null) {
            headers.set("Authorization", authToken);
        } else {
            // Fall back to basic auth when no token is available
            headers.setBasicAuth(kylinConfig.getUsername(), kylinConfig.getPassword());
        }
        
        return headers;
    }
}

// KylinController.java - REST控制器
package com.example.kylin.controller;

import com.example.kylin.service.KylinService;
import com.example.kylin.dto.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.validation.annotation.Validated;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@RestController
@RequestMapping("/api/kylin")
@Tag(name = "Kylin API", description = "Kylin集成API")
@Validated
public class KylinController {
    
    @Autowired
    private KylinService kylinService;
    
    @PostMapping("/auth")
    @Operation(summary = "认证", description = "获取Kylin认证Token")
    public ResponseEntity<String> authenticate() {
        String token = kylinService.authenticate();
        if (token != null) {
            return ResponseEntity.ok(token);
        }
        return ResponseEntity.badRequest().body("认证失败");
    }
    
    @GetMapping("/projects")
    @Operation(summary = "获取项目列表", description = "获取所有Kylin项目")
    public ResponseEntity<List<ProjectDto>> getProjects() {
        List<ProjectDto> projects = kylinService.getProjects();
        return ResponseEntity.ok(projects);
    }
    
    @GetMapping("/cubes")
    @Operation(summary = "获取Cube列表", description = "获取指定项目的Cube列表")
    public ResponseEntity<List<CubeDto>> getCubes(@RequestParam String projectName) {
        List<CubeDto> cubes = kylinService.getCubes(projectName);
        return ResponseEntity.ok(cubes);
    }
    
    @PostMapping("/query")
    @Operation(summary = "执行查询", description = "执行SQL查询")
    public CompletableFuture<ResponseEntity<QueryResultDto>> executeQuery(
            @RequestBody @Validated QueryRequestDto request) {
        return kylinService.executeQueryAsync(request)
            .thenApply(result -> {
                if (result != null) {
                    return ResponseEntity.ok(result);
                }
                return ResponseEntity.badRequest().build();
            });
    }
    
    @PostMapping("/cubes/{cubeName}/build")
    @Operation(summary = "构建Cube", description = "提交Cube构建任务")
    public ResponseEntity<JobDto> buildCube(
            @PathVariable String cubeName,
            @RequestParam long startTime,
            @RequestParam long endTime) {
        JobDto job = kylinService.buildCube(cubeName, startTime, endTime);
        if (job != null) {
            return ResponseEntity.ok(job);
        }
        return ResponseEntity.badRequest().build();
    }
    
    @GetMapping("/jobs/{jobId}")
    @Operation(summary = "获取作业状态", description = "获取构建作业的状态")
    public ResponseEntity<JobDto> getJobStatus(@PathVariable String jobId) {
        JobDto job = kylinService.getJobStatus(jobId);
        if (job != null) {
            return ResponseEntity.ok(job);
        }
        return ResponseEntity.notFound().build();
    }
}
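
下面给出一个调用上述微服务接口的最小Python客户端示意,假设服务部署在 http://localhost:8080,端点路径与上面的KylinController一致;JobDto的uuid字段为假设,需以实际DTO定义为准:

#!/usr/bin/env python3
# microservice_client_demo.py - 调用Kylin微服务的用法草图(示意)

import requests

BASE_URL = "http://localhost:8080/api/kylin"  # 假设: 微服务本地地址

def demo():
    # 1. 认证,获取Kylin Token
    token = requests.post(f"{BASE_URL}/auth").text
    
    # 2. 获取项目与Cube列表
    projects = requests.get(f"{BASE_URL}/projects").json()
    cubes = requests.get(f"{BASE_URL}/cubes",
                         params={"projectName": "sales_project"}).json()
    
    # 3. 提交查询(请求体字段以QueryRequestDto的实际定义为准)
    query = {"sql": "SELECT COUNT(*) FROM sales_fact", "project": "sales_project"}
    result = requests.post(f"{BASE_URL}/query", json=query).json()
    
    # 4. 提交Cube构建并查询作业状态(假设JobDto包含uuid字段)
    job = requests.post(f"{BASE_URL}/cubes/sales_cube/build",
                        params={"startTime": 0, "endTime": 1700000000000}).json()
    status = requests.get(f"{BASE_URL}/jobs/{job['uuid']}").json()
    
    print(token, projects, cubes, result, status)

if __name__ == '__main__':
    demo()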

9.8.2 Docker容器化

# Dockerfile - Kylin微服务容器化
FROM openjdk:11-jre-slim

# 设置工作目录
WORKDIR /app

# 安装必要的工具
RUN apt-get update && apt-get install -y \
    curl \
    wget \
    && rm -rf /var/lib/apt/lists/*

# 复制应用JAR文件
COPY target/kylin-microservice-*.jar app.jar

# 复制配置文件
COPY src/main/resources/application.yml application.yml

# 设置环境变量
ENV JAVA_OPTS="-Xmx512m -Xms256m"
ENV SPRING_PROFILES_ACTIVE=docker

# 暴露端口
EXPOSE 8080

# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD curl -f http://localhost:8080/actuator/health || exit 1

# 启动应用
ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar app.jar"]

# docker-compose.yml - 容器编排
version: '3.8'

services:
  kylin-microservice:
    build: .
    ports:
      - "8080:8080"
    environment:
      - SPRING_PROFILES_ACTIVE=docker
      - KYLIN_HOST=kylin-server
      - KYLIN_PORT=7070
      - KYLIN_USERNAME=ADMIN
      - KYLIN_PASSWORD=KYLIN
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    depends_on:
      - redis
      - kylin-server
    networks:
      - kylin-network
    restart: unless-stopped
    
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    networks:
      - kylin-network
    restart: unless-stopped
    
  kylin-server:
    image: apachekylin/apache-kylin-standalone:4.0.1
    ports:
      - "7070:7070"
      - "8088:8088"
    environment:
      - KYLIN_HOME=/opt/kylin
    volumes:
      - kylin-data:/opt/kylin/logs
      - kylin-conf:/opt/kylin/conf
    networks:
      - kylin-network
    restart: unless-stopped
    
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - kylin-microservice
    networks:
      - kylin-network
    restart: unless-stopped

volumes:
  kylin-data:
  kylin-conf:

networks:
  kylin-network:
    driver: bridge
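
编排启动后,可以用一个简单的Python脚本轮询Spring Boot Actuator的健康检查端点,确认微服务就绪后再接入流量(URL与超时时间均为示例值):

#!/usr/bin/env python3
# wait_for_healthy.py - 等待微服务健康检查通过(示意)

import time
import requests

def wait_for_healthy(url: str = "http://localhost:8080/actuator/health",
                     timeout_seconds: int = 120) -> bool:
    """轮询健康检查端点,直到返回UP或超时"""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        try:
            resp = requests.get(url, timeout=5)
            if resp.status_code == 200 and resp.json().get("status") == "UP":
                return True
        except requests.RequestException:
            pass  # 服务尚未就绪,继续等待
        time.sleep(5)
    return False

if __name__ == '__main__':
    print("服务已就绪" if wait_for_healthy() else "等待超时")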

9.9 安全认证与授权

9.9.1 OAuth2集成

// OAuth2SecurityConfig.java - OAuth2安全配置
package com.example.kylin.security;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationConverter;
import org.springframework.security.web.SecurityFilterChain;
import org.springframework.security.oauth2.jwt.JwtDecoder;
import org.springframework.security.oauth2.jwt.NimbusJwtDecoder;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

@Configuration
@EnableWebSecurity
public class OAuth2SecurityConfig {
    
    @Bean
    public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
        http
            .authorizeHttpRequests(authz -> authz
                .requestMatchers("/api/public/**").permitAll()
                .requestMatchers("/actuator/health").permitAll()
                .requestMatchers("/swagger-ui/**", "/v3/api-docs/**").permitAll()
                .requestMatchers("/api/kylin/auth").permitAll()
                .requestMatchers("/api/kylin/**").hasRole("USER")
                .anyRequest().authenticated()
            )
            .oauth2ResourceServer(oauth2 -> oauth2
                .jwt(jwt -> jwt
                    .jwtAuthenticationConverter(jwtAuthenticationConverter())
                )
            )
            .csrf(csrf -> csrf.disable());
        
        return http.build();
    }
    
    @Bean
    public JwtAuthenticationConverter jwtAuthenticationConverter() {
        JwtAuthenticationConverter converter = new JwtAuthenticationConverter();
        converter.setJwtGrantedAuthoritiesConverter(jwt -> {
            // 从JWT中提取角色信息;roles声明缺失时返回空权限集合,避免NPE
            List<String> roles = jwt.getClaimAsStringList("roles");
            if (roles == null) {
                return Collections.emptyList();
            }
            return roles.stream()
                .map(role -> new SimpleGrantedAuthority("ROLE_" + role.toUpperCase()))
                .collect(Collectors.toList());
        });
        return converter;
    }
    
    @Bean
    public JwtDecoder jwtDecoder() {
        // 配置JWT解码器
        return NimbusJwtDecoder.withJwkSetUri("https://your-auth-server/.well-known/jwks.json")
            .build();
    }
}

// KylinSecurityService.java - Kylin安全服务
package com.example.kylin.security;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class KylinSecurityService {
    
    /**
     * 获取当前用户信息
     */
    public UserInfo getCurrentUser() {
        Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
        
        if (authentication != null && authentication.getPrincipal() instanceof Jwt) {
            Jwt jwt = (Jwt) authentication.getPrincipal();
            
            return UserInfo.builder()
                .username(jwt.getClaimAsString("sub"))
                .email(jwt.getClaimAsString("email"))
                .roles(jwt.getClaimAsStringList("roles"))
                .build();
        }
        
        return null;
    }
    
    /**
     * 检查用户是否有访问项目的权限
     */
    public boolean hasProjectAccess(String projectName) {
        UserInfo user = getCurrentUser();
        if (user == null) {
            return false;
        }
        
        // 检查用户角色和项目权限
        if (user.getRoles().contains("ADMIN")) {
            return true;
        }
        
        // 检查项目特定权限
        return user.getRoles().contains("PROJECT_" + projectName.toUpperCase());
    }
    
    /**
     * 检查用户是否有执行查询的权限
     */
    public boolean hasQueryAccess(String sql) {
        UserInfo user = getCurrentUser();
        if (user == null) {
            return false;
        }
        
        // 检查SQL注入和敏感操作
        String upperSql = sql.toUpperCase();
        if (upperSql.contains("DROP") || upperSql.contains("DELETE") || 
            upperSql.contains("UPDATE") || upperSql.contains("INSERT")) {
            return user.getRoles().contains("ADMIN");
        }
        
        return user.getRoles().contains("USER") || user.getRoles().contains("ANALYST");
    }
}
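
上面的资源服务器由Spring Security完成JWT校验;如果有Python服务需要对接同一认证中心,可以用PyJWT实现等价的校验逻辑。下面是基于JWKS端点的验签示意,JWKS地址沿用上文配置中的占位URL,audience校验按需开启:

#!/usr/bin/env python3
# jwt_validation_demo.py - 基于PyJWT的JWT校验示意

import jwt
from jwt import PyJWKClient

# 假设: 与OAuth2SecurityConfig中相同的JWKS端点
JWKS_URL = "https://your-auth-server/.well-known/jwks.json"

def validate_token(token: str) -> dict:
    """校验JWT签名与有效期,返回claims"""
    jwk_client = PyJWKClient(JWKS_URL)
    signing_key = jwk_client.get_signing_key_from_jwt(token)
    return jwt.decode(
        token,
        signing_key.key,
        algorithms=["RS256"],
        options={"verify_aud": False}  # 如需校验audience,改为audience="..."
    )

def extract_roles(claims: dict) -> list:
    """与Java侧jwtAuthenticationConverter对应: 从roles声明提取角色"""
    return [f"ROLE_{r.upper()}" for r in claims.get("roles", [])]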

9.9.2 API密钥认证

#!/usr/bin/env python3
# api_key_auth.py - API密钥认证

import hashlib
import hmac
import time
import secrets
from typing import Dict, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass

@dataclass
class ApiKey:
    """API密钥信息"""
    key_id: str
    secret: str
    user_id: str
    permissions: list
    created_at: datetime
    expires_at: Optional[datetime] = None
    is_active: bool = True

class ApiKeyManager:
    """API密钥管理器"""
    
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.api_keys: Dict[str, ApiKey] = {}
    
    def generate_api_key(self, user_id: str, permissions: list, 
                        expires_days: int = 365) -> ApiKey:
        """生成API密钥"""
        key_id = secrets.token_urlsafe(16)
        secret = secrets.token_urlsafe(32)
        
        expires_at = datetime.now() + timedelta(days=expires_days) if expires_days > 0 else None
        
        api_key = ApiKey(
            key_id=key_id,
            secret=secret,
            user_id=user_id,
            permissions=permissions,
            created_at=datetime.now(),
            expires_at=expires_at
        )
        
        self.api_keys[key_id] = api_key
        return api_key
    
    def validate_api_key(self, key_id: str, signature: str, 
                        timestamp: str, request_data: str) -> Optional[ApiKey]:
        """验证API密钥"""
        api_key = self.api_keys.get(key_id)
        
        if not api_key or not api_key.is_active:
            return None
        
        # 检查过期时间
        if api_key.expires_at and datetime.now() > api_key.expires_at:
            return None
        
        # 检查时间戳(防重放攻击)
        try:
            request_time = datetime.fromtimestamp(int(timestamp))
            if abs((datetime.now() - request_time).total_seconds()) > 300:  # 5分钟
                return None
        except (ValueError, TypeError):
            return None
        
        # 验证签名
        expected_signature = self._generate_signature(
            api_key.secret, timestamp, request_data
        )
        
        if not hmac.compare_digest(signature, expected_signature):
            return None
        
        return api_key
    
    def _generate_signature(self, secret: str, timestamp: str, data: str) -> str:
        """生成请求签名"""
        message = f"{timestamp}:{data}"
        return hmac.new(
            secret.encode('utf-8'),
            message.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
    
    def revoke_api_key(self, key_id: str) -> bool:
        """撤销API密钥"""
        if key_id in self.api_keys:
            self.api_keys[key_id].is_active = False
            return True
        return False
    
    def list_user_keys(self, user_id: str) -> list:
        """列出用户的API密钥"""
        return [
            {
                'key_id': key.key_id,
                'permissions': key.permissions,
                'created_at': key.created_at.isoformat(),
                'expires_at': key.expires_at.isoformat() if key.expires_at else None,
                'is_active': key.is_active
            }
            for key in self.api_keys.values()
            if key.user_id == user_id
        ]

class KylinApiKeyAuth:
    """Kylin API密钥认证"""
    
    def __init__(self, kylin_config: dict, api_key_manager: ApiKeyManager):
        self.kylin_config = kylin_config
        self.api_key_manager = api_key_manager
    
    def authenticate_request(self, headers: dict, request_data: str) -> Optional[dict]:
        """认证API请求"""
        # 提取认证头信息
        auth_header = headers.get('Authorization', '')
        if not auth_header.startswith('ApiKey '):
            return None
        
        try:
            # 解析认证信息
            auth_parts = auth_header[7:].split(':')
            if len(auth_parts) != 3:
                return None
            
            key_id, timestamp, signature = auth_parts
            
            # 验证API密钥
            api_key = self.api_key_manager.validate_api_key(
                key_id, signature, timestamp, request_data
            )
            
            if api_key:
                return {
                    'user_id': api_key.user_id,
                    'permissions': api_key.permissions,
                    'key_id': api_key.key_id
                }
            
        except Exception as e:
            print(f"认证错误: {e}")
        
        return None
    
    def check_permission(self, user_info: dict, required_permission: str) -> bool:
        """检查权限"""
        if not user_info:
            return False
        
        permissions = user_info.get('permissions', [])
        return 'admin' in permissions or required_permission in permissions

# 使用示例
def main():
    # 创建API密钥管理器
    manager = ApiKeyManager("your-secret-key")
    
    # 生成API密钥
    api_key = manager.generate_api_key(
        user_id="user123",
        permissions=["kylin:query", "kylin:project:read"],
        expires_days=30
    )
    
    print(f"API Key ID: {api_key.key_id}")
    print(f"Secret: {api_key.secret}")
    
    # 模拟客户端请求
    timestamp = str(int(time.time()))
    request_data = '{"sql": "SELECT * FROM sales"}'
    
    # 生成签名
    signature = hmac.new(
        api_key.secret.encode('utf-8'),
        f"{timestamp}:{request_data}".encode('utf-8'),
        hashlib.sha256
    ).hexdigest()
    
    # 验证请求
    headers = {
        'Authorization': f'ApiKey {api_key.key_id}:{timestamp}:{signature}'
    }
    
    auth = KylinApiKeyAuth({}, manager)
    user_info = auth.authenticate_request(headers, request_data)
    
    if user_info:
        print(f"认证成功: {user_info}")
        
        # 检查权限
        if auth.check_permission(user_info, "kylin:query"):
            print("有查询权限")
        else:
            print("无查询权限")
    else:
        print("认证失败")

if __name__ == '__main__':
    main()
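
在客户端一侧,可以把上述签名流程封装成一个小工具,自动为每个请求生成Authorization头。下面的示意与上文ApiKeyManager约定的 key_id:timestamp:signature 格式保持一致:

#!/usr/bin/env python3
# signed_client_demo.py - 自动签名的API客户端示意

import hashlib
import hmac
import time
import requests

class SignedApiClient:
    """按 ApiKey key_id:timestamp:signature 格式自动签名的HTTP客户端"""
    
    def __init__(self, base_url: str, key_id: str, secret: str):
        self.base_url = base_url
        self.key_id = key_id
        self.secret = secret
    
    def _sign(self, timestamp: str, body: str) -> str:
        # 与服务端_generate_signature保持一致: HMAC-SHA256("timestamp:body")
        message = f"{timestamp}:{body}"
        return hmac.new(self.secret.encode('utf-8'),
                        message.encode('utf-8'),
                        hashlib.sha256).hexdigest()
    
    def post(self, path: str, body: str) -> requests.Response:
        timestamp = str(int(time.time()))
        signature = self._sign(timestamp, body)
        headers = {
            'Authorization': f'ApiKey {self.key_id}:{timestamp}:{signature}',
            'Content-Type': 'application/json'
        }
        # 直接发送原始body字符串,保证客户端签名与服务端验签的内容一致
        return requests.post(f"{self.base_url}{path}", data=body, headers=headers)

# 用法示例(key_id/secret来自ApiKeyManager.generate_api_key)
# client = SignedApiClient("http://localhost:8080", api_key.key_id, api_key.secret)
# resp = client.post("/api/query", '{"sql": "SELECT 1", "project": "demo"}')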

9.10 性能优化

9.10.1 连接池优化

// OptimizedKylinConnectionPool.java - 优化的连接池
package com.example.kylin.pool;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
import java.util.concurrent.TimeUnit;

@Configuration
public class OptimizedKylinConnectionPool {
    
    @Value("${kylin.jdbc.url}")
    private String jdbcUrl;
    
    @Value("${kylin.jdbc.username}")
    private String username;
    
    @Value("${kylin.jdbc.password}")
    private String password;
    
    @Bean
    @Primary
    public DataSource kylinDataSource() {
        HikariConfig config = new HikariConfig();
        
        // 基本配置
        config.setJdbcUrl(jdbcUrl);
        config.setUsername(username);
        config.setPassword(password);
        config.setDriverClassName("org.apache.kylin.jdbc.Driver");
        
        // 连接池大小配置
        config.setMinimumIdle(5);                    // 最小空闲连接数
        config.setMaximumPoolSize(20);               // 最大连接数
        config.setIdleTimeout(TimeUnit.MINUTES.toMillis(10));  // 空闲超时
        config.setMaxLifetime(TimeUnit.HOURS.toMillis(2));     // 连接最大生命周期
        
        // 连接验证配置
        config.setConnectionTestQuery("SELECT 1");
        config.setValidationTimeout(TimeUnit.SECONDS.toMillis(5));
        
        // 连接超时配置
        config.setConnectionTimeout(TimeUnit.SECONDS.toMillis(30));
        config.setLeakDetectionThreshold(TimeUnit.MINUTES.toMillis(2));
        
        // 性能优化配置
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        config.addDataSourceProperty("useServerPrepStmts", "true");
        config.addDataSourceProperty("useLocalSessionState", "true");
        config.addDataSourceProperty("rewriteBatchedStatements", "true");
        config.addDataSourceProperty("cacheResultSetMetadata", "true");
        config.addDataSourceProperty("cacheServerConfiguration", "true");
        config.addDataSourceProperty("elideSetAutoCommits", "true");
        config.addDataSourceProperty("maintainTimeStats", "false");
        
        // 连接池名称
        config.setPoolName("KylinHikariPool");
        
        return new HikariDataSource(config);
    }
}
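
如果查询入口是Python服务,也可以用SQLAlchemy的连接池获得类似效果。下面的示意假设已通过 pip install kylinpy 安装了Kylin的SQLAlchemy方言,连接串格式以kylinpy实际文档为准:

#!/usr/bin/env python3
# sqlalchemy_pool_demo.py - Python侧Kylin连接池示意(假设使用kylinpy方言)

from sqlalchemy import create_engine, text

# 连接串为示例格式,以kylinpy文档为准
engine = create_engine(
    "kylin://ADMIN:KYLIN@localhost:7070/learn_kylin",
    pool_size=10,          # 常驻连接数,作用类似HikariCP的maximumPoolSize
    max_overflow=10,       # 峰值时允许的临时连接数
    pool_timeout=30,       # 获取连接的等待超时(秒)
    pool_recycle=3600,     # 定期回收连接,避免被服务端闲置断开
    pool_pre_ping=True     # 取用前探活,自动剔除失效连接
)

with engine.connect() as conn:
    rows = conn.execute(text("SELECT 1")).fetchall()
    print(rows)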

9.10.2 查询缓存优化

#!/usr/bin/env python3
# query_cache_optimizer.py - 查询缓存优化

import redis
import json
import hashlib
import time
import logging
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta

@dataclass
class CacheEntry:
    """缓存条目"""
    key: str
    data: Any
    created_at: datetime
    expires_at: datetime
    hit_count: int = 0
    size_bytes: int = 0

class QueryCacheOptimizer:
    """查询缓存优化器"""
    
    def __init__(self, redis_config: dict = None, cache_config: dict = None):
        """
        初始化缓存优化器
        
        Args:
            redis_config: Redis配置
            cache_config: 缓存配置
        """
        # Redis连接
        redis_config = redis_config or {
            'host': 'localhost',
            'port': 6379,
            'db': 0,
            'decode_responses': True
        }
        self.redis_client = redis.Redis(**redis_config)
        
        # 缓存配置
        self.cache_config = cache_config or {
            'default_ttl': 3600,        # 默认TTL(秒)
            'max_cache_size': 1000,     # 最大缓存条目数
            'max_memory_mb': 512,       # 最大内存使用(MB)
            'compression_threshold': 1024,  # 压缩阈值(字节)
            'cache_key_prefix': 'kylin:query:'
        }
        
        self.logger = logging.getLogger(__name__)
        
        # 统计信息
        self.stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
            'total_queries': 0
        }
    
    def generate_cache_key(self, sql: str, project: str = None, 
                          params: dict = None) -> str:
        """生成缓存键"""
        # 标准化SQL
        normalized_sql = self._normalize_sql(sql)
        
        # 构建缓存键数据
        key_data = {
            'sql': normalized_sql,
            'project': project or 'default',
            'params': params or {}
        }
        
        # 生成哈希
        key_str = json.dumps(key_data, sort_keys=True)
        key_hash = hashlib.md5(key_str.encode('utf-8')).hexdigest()
        
        return f"{self.cache_config['cache_key_prefix']}{key_hash}"
    
    def get_cached_result(self, cache_key: str) -> Optional[Dict[str, Any]]:
        """获取缓存结果"""
        try:
            cached_data = self.redis_client.get(cache_key)
            if cached_data:
                # 写入时可能被压缩过(见set_cached_result),先识别前缀并解压
                if cached_data.startswith('gzip:'):
                    cached_data = self._decompress_data(cached_data)
                result = json.loads(cached_data)
                
                # 更新命中统计
                self.stats['hits'] += 1
                self._update_hit_count(cache_key)
                
                self.logger.debug(f"缓存命中: {cache_key}")
                return result
            else:
                self.stats['misses'] += 1
                self.logger.debug(f"缓存未命中: {cache_key}")
                return None
                
        except Exception as e:
            self.logger.error(f"获取缓存失败: {e}")
            return None
    
    def set_cached_result(self, cache_key: str, result: Dict[str, Any], 
                         ttl: int = None) -> bool:
        """设置缓存结果"""
        try:
            ttl = ttl or self.cache_config['default_ttl']
            
            # 序列化数据
            cached_data = json.dumps(result)
            data_size = len(cached_data.encode('utf-8'))
            
            # 检查是否需要压缩
            if data_size > self.cache_config['compression_threshold']:
                cached_data = self._compress_data(cached_data)
            
            # 检查缓存容量
            if not self._check_cache_capacity(data_size):
                self._evict_cache_entries()
            
            # 设置缓存
            success = self.redis_client.setex(cache_key, ttl, cached_data)
            
            if success:
                # 记录缓存元数据
                self._record_cache_metadata(cache_key, data_size, ttl)
                self.logger.debug(f"缓存设置成功: {cache_key}, 大小: {data_size} bytes")
            
            return success
            
        except Exception as e:
            self.logger.error(f"设置缓存失败: {e}")
            return False
    
    def invalidate_cache(self, pattern: str = None) -> int:
        """失效缓存"""
        try:
            if pattern:
                # 按模式删除
                keys = self.redis_client.keys(f"{self.cache_config['cache_key_prefix']}{pattern}*")
            else:
                # 删除所有查询缓存
                keys = self.redis_client.keys(f"{self.cache_config['cache_key_prefix']}*")
            
            if keys:
                deleted = self.redis_client.delete(*keys)
                self.logger.info(f"失效缓存: {deleted} 个条目")
                return deleted
            
            return 0
            
        except Exception as e:
            self.logger.error(f"失效缓存失败: {e}")
            return 0
    
    def get_cache_stats(self) -> Dict[str, Any]:
        """获取缓存统计"""
        try:
            # Redis信息
            redis_info = self.redis_client.info('memory')
            
            # 缓存键数量
            cache_keys = len(self.redis_client.keys(f"{self.cache_config['cache_key_prefix']}*"))
            
            # 计算命中率
            total_requests = self.stats['hits'] + self.stats['misses']
            hit_rate = (self.stats['hits'] / total_requests * 100) if total_requests > 0 else 0
            
            return {
                'hit_rate': round(hit_rate, 2),
                'total_hits': self.stats['hits'],
                'total_misses': self.stats['misses'],
                'total_evictions': self.stats['evictions'],
                'cache_keys_count': cache_keys,
                'redis_memory_used': redis_info.get('used_memory_human', 'N/A'),
                'redis_memory_peak': redis_info.get('used_memory_peak_human', 'N/A')
            }
            
        except Exception as e:
            self.logger.error(f"获取缓存统计失败: {e}")
            return {}
    
    def optimize_cache(self) -> Dict[str, Any]:
        """优化缓存"""
        optimization_report = {
            'actions_taken': [],
            'before_stats': self.get_cache_stats(),
            'after_stats': None
        }
        
        try:
            # 1. 清理过期键
            expired_keys = self._cleanup_expired_keys()
            if expired_keys > 0:
                optimization_report['actions_taken'].append(
                    f"清理过期键: {expired_keys} 个"
                )
            
            # 2. 清理低命中率缓存
            low_hit_keys = self._cleanup_low_hit_cache()
            if low_hit_keys > 0:
                optimization_report['actions_taken'].append(
                    f"清理低命中率缓存: {low_hit_keys} 个"
                )
            
            # 3. 内存压缩
            if self._should_compress_memory():
                compressed = self._compress_cache_memory()
                optimization_report['actions_taken'].append(
                    f"内存压缩: 节省 {compressed} bytes"
                )
            
            # 4. 更新统计
            optimization_report['after_stats'] = self.get_cache_stats()
            
            self.logger.info(f"缓存优化完成: {optimization_report['actions_taken']}")
            
        except Exception as e:
            self.logger.error(f"缓存优化失败: {e}")
            optimization_report['error'] = str(e)
        
        return optimization_report
    
    def _normalize_sql(self, sql: str) -> str:
        """标准化SQL"""
        # 移除多余空格和换行
        normalized = ' '.join(sql.split())
        
        # 转换为大写(可选)
        # normalized = normalized.upper()
        
        return normalized.strip()
    
    def _update_hit_count(self, cache_key: str):
        """更新命中次数"""
        try:
            hit_key = f"{cache_key}:hits"
            self.redis_client.incr(hit_key)
            self.redis_client.expire(hit_key, 86400)  # 24小时过期
        except Exception as e:
            self.logger.debug(f"更新命中次数失败: {e}")
    
    def _check_cache_capacity(self, data_size: int) -> bool:
        """检查缓存容量"""
        try:
            # 检查内存使用
            redis_info = self.redis_client.info('memory')
            used_memory_mb = redis_info.get('used_memory', 0) / 1024 / 1024
            
            if used_memory_mb > self.cache_config['max_memory_mb']:
                return False
            
            # 检查键数量
            cache_keys = len(self.redis_client.keys(f"{self.cache_config['cache_key_prefix']}*"))
            if cache_keys >= self.cache_config['max_cache_size']:
                return False
            
            return True
            
        except Exception as e:
            self.logger.error(f"检查缓存容量失败: {e}")
            return True
    
    def _evict_cache_entries(self, count: int = 10):
        """驱逐缓存条目"""
        try:
            # 获取所有缓存键
            cache_keys = self.redis_client.keys(f"{self.cache_config['cache_key_prefix']}*")
            
            # 按命中次数排序,删除最少使用的
            key_hits = []
            for key in cache_keys[:100]:  # 限制检查数量
                hit_key = f"{key}:hits"
                hits = int(self.redis_client.get(hit_key) or 0)
                key_hits.append((key, hits))
            
            # 排序并删除最少使用的
            key_hits.sort(key=lambda x: x[1])
            keys_to_delete = [key for key, _ in key_hits[:count]]
            
            if keys_to_delete:
                deleted = self.redis_client.delete(*keys_to_delete)
                self.stats['evictions'] += deleted
                self.logger.info(f"驱逐缓存条目: {deleted} 个")
            
        except Exception as e:
            self.logger.error(f"驱逐缓存条目失败: {e}")
    
    def _cleanup_expired_keys(self) -> int:
        """清理过期键"""
        # Redis会自动清理过期键,这里主要是统计
        return 0
    
    def _cleanup_low_hit_cache(self, min_hits: int = 2) -> int:
        """清理低命中率缓存"""
        try:
            cache_keys = self.redis_client.keys(f"{self.cache_config['cache_key_prefix']}*")
            keys_to_delete = []
            
            for key in cache_keys:
                hit_key = f"{key}:hits"
                hits = int(self.redis_client.get(hit_key) or 0)
                
                if hits < min_hits:
                    keys_to_delete.append(key)
            
            if keys_to_delete:
                deleted = self.redis_client.delete(*keys_to_delete)
                return deleted
            
            return 0
            
        except Exception as e:
            self.logger.error(f"清理低命中率缓存失败: {e}")
            return 0
    
    def _should_compress_memory(self) -> bool:
        """是否应该压缩内存"""
        try:
            redis_info = self.redis_client.info('memory')
            used_memory_mb = redis_info.get('used_memory', 0) / 1024 / 1024
            return used_memory_mb > self.cache_config['max_memory_mb'] * 0.8
        except Exception:
            return False
    
    def _compress_cache_memory(self) -> int:
        """压缩缓存内存"""
        # 这里可以实现具体的压缩逻辑
        return 0
    
    def _compress_data(self, data: str) -> str:
        """压缩数据(加gzip:前缀,供读取端识别)"""
        import gzip
        import base64
        
        compressed = gzip.compress(data.encode('utf-8'))
        return 'gzip:' + base64.b64encode(compressed).decode('utf-8')
    
    def _decompress_data(self, data: str) -> str:
        """解压带gzip:前缀的数据"""
        import gzip
        import base64
        
        compressed = base64.b64decode(data[len('gzip:'):])
        return gzip.decompress(compressed).decode('utf-8')
    
    def _record_cache_metadata(self, cache_key: str, size: int, ttl: int):
        """记录缓存元数据"""
        try:
            metadata = {
                'size': size,
                'created_at': datetime.now().isoformat(),
                'ttl': ttl
            }
            
            metadata_key = f"{cache_key}:metadata"
            self.redis_client.setex(metadata_key, ttl, json.dumps(metadata))
            
        except Exception as e:
            self.logger.debug(f"记录缓存元数据失败: {e}")

# 使用示例
def main():
    # 创建缓存优化器
    cache_optimizer = QueryCacheOptimizer()
    
    # 示例查询
    sql = "SELECT product_id, SUM(sales_amount) FROM sales_fact GROUP BY product_id"
    cache_key = cache_optimizer.generate_cache_key(sql, "sales_project")
    
    print(f"缓存键: {cache_key}")
    
    # 检查缓存
    cached_result = cache_optimizer.get_cached_result(cache_key)
    if cached_result:
        print("使用缓存结果")
    else:
        print("执行查询并缓存结果")
        
        # 模拟查询结果
        query_result = {
            'columns': ['product_id', 'total_sales'],
            'data': [['PROD001', 10000], ['PROD002', 15000]],
            'execution_time': 1.5
        }
        
        # 缓存结果
        cache_optimizer.set_cached_result(cache_key, query_result)
    
    # 获取统计信息
    stats = cache_optimizer.get_cache_stats()
    print(f"缓存统计: {stats}")
    
    # 优化缓存
    optimization_report = cache_optimizer.optimize_cache()
    print(f"优化报告: {optimization_report}")

if __name__ == '__main__':
    main()

9.11 最佳实践

9.11.1 API设计最佳实践

#!/usr/bin/env python3
# api_best_practices.py - API设计最佳实践

from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime
import logging
import asyncio
import aiohttp
import json

@dataclass
class ApiResponse:
    """标准API响应格式"""
    success: bool
    data: Any = None
    message: str = ""
    error_code: str = None
    timestamp: datetime = None
    request_id: str = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now()

class KylinApiBestPractices:
    """Kylin API最佳实践"""
    
    def __init__(self, config: dict):
        self.config = config
        self.logger = logging.getLogger(__name__)
        
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
    
    def create_standard_response(self, success: bool, data: Any = None, 
                               message: str = "", error_code: str = None,
                               request_id: str = None) -> Dict[str, Any]:
        """创建标准响应格式"""
        response = ApiResponse(
            success=success,
            data=data,
            message=message,
            error_code=error_code,
            request_id=request_id
        )
        
        return {
            'success': response.success,
            'data': response.data,
            'message': response.message,
            'error_code': response.error_code,
            'timestamp': response.timestamp.isoformat(),
            'request_id': response.request_id
        }
    
    def validate_query_request(self, request: Dict[str, Any]) -> tuple[bool, str]:
        """验证查询请求"""
        # 必需字段检查
        required_fields = ['sql', 'project']
        for field in required_fields:
            if field not in request:
                return False, f"缺少必需字段: {field}"
        
        # SQL注入检查(按单词边界匹配,避免误伤UPDATED_AT之类的列名)
        import re
        sql = request['sql'].upper()
        dangerous_keywords = ['DROP', 'DELETE', 'UPDATE', 'INSERT', 'TRUNCATE', 'ALTER']
        for keyword in dangerous_keywords:
            if re.search(rf'\b{keyword}\b', sql):
                return False, f"SQL包含危险关键字: {keyword}"
        
        # SQL长度检查
        if len(request['sql']) > 10000:
            return False, "SQL语句过长"
        
        # 项目名称检查
        if not request['project'].replace('_', '').isalnum():
            return False, "项目名称包含非法字符"
        
        return True, "验证通过"
    
    async def execute_query_with_retry(self, request: Dict[str, Any], 
                                     max_retries: int = 3) -> Dict[str, Any]:
        """带重试的查询执行"""
        for attempt in range(max_retries):
            try:
                # 验证请求
                is_valid, message = self.validate_query_request(request)
                if not is_valid:
                    return self.create_standard_response(
                        success=False,
                        message=message,
                        error_code="VALIDATION_ERROR"
                    )
                
                # 执行查询
                result = await self._execute_kylin_query(request)
                
                return self.create_standard_response(
                    success=True,
                    data=result,
                    message="查询执行成功"
                )
                
            except asyncio.TimeoutError:  # aiohttp的总超时以asyncio.TimeoutError抛出
                if attempt == max_retries - 1:
                    return self.create_standard_response(
                        success=False,
                        message="查询超时",
                        error_code="TIMEOUT_ERROR"
                    )
                await asyncio.sleep(2 ** attempt)  # 指数退避
                
            except aiohttp.ClientError as e:
                if attempt == max_retries - 1:
                    return self.create_standard_response(
                        success=False,
                        message=f"网络错误: {str(e)}",
                        error_code="NETWORK_ERROR"
                    )
                await asyncio.sleep(2 ** attempt)
                
            except Exception as e:
                self.logger.error(f"查询执行失败: {e}")
                return self.create_standard_response(
                    success=False,
                    message=f"内部错误: {str(e)}",
                    error_code="INTERNAL_ERROR"
                )
    
    async def _execute_kylin_query(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """执行Kylin查询"""
        timeout = aiohttp.ClientTimeout(total=300)  # 5分钟超时
        
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # 认证
            auth_url = f"{self.config['base_url']}/api/user/authentication"
            auth_data = aiohttp.BasicAuth(
                self.config['username'], 
                self.config['password']
            )
            
            async with session.post(auth_url, auth=auth_data) as auth_response:
                if auth_response.status != 200:
                    raise Exception("认证失败")
                
                auth_result = await auth_response.json()
                token = auth_result.get('token')
            
            # 执行查询
            query_url = f"{self.config['base_url']}/api/query"
            headers = {
                'Authorization': token,
                'Content-Type': 'application/json'
            }
            
            query_data = {
                'sql': request['sql'],
                'project': request['project'],
                'format': 'json'
            }
            
            async with session.post(query_url, headers=headers, 
                                  json=query_data) as query_response:
                if query_response.status != 200:
                    error_text = await query_response.text()
                    raise Exception(f"查询失败: {error_text}")
                
                return await query_response.json()
    
    def implement_rate_limiting(self, user_id: str, endpoint: str) -> tuple[bool, str]:
        """实现速率限制"""
        # 这里可以集成Redis或其他存储来实现速率限制
        # 示例:每分钟最多100次请求
        
        rate_limit_key = f"rate_limit:{user_id}:{endpoint}"
        current_time = datetime.now()
        
        # 简化的内存实现(生产环境应使用Redis)
        if not hasattr(self, '_rate_limits'):
            self._rate_limits = {}
        
        if rate_limit_key not in self._rate_limits:
            self._rate_limits[rate_limit_key] = {
                'count': 1,
                'window_start': current_time
            }
            return True, "请求允许"
        
        rate_data = self._rate_limits[rate_limit_key]
        
        # 检查时间窗口
        if (current_time - rate_data['window_start']).total_seconds() >= 60:
            # 重置窗口
            rate_data['count'] = 1
            rate_data['window_start'] = current_time
            return True, "请求允许"
        
        # 检查请求数量
        if rate_data['count'] >= 100:
            return False, "请求过于频繁,请稍后再试"
        
        rate_data['count'] += 1
        return True, "请求允许"
    
    def log_api_metrics(self, endpoint: str, method: str, status_code: int, 
                       response_time: float, user_id: str = None):
        """记录API指标"""
        metrics = {
            'endpoint': endpoint,
            'method': method,
            'status_code': status_code,
            'response_time_ms': response_time * 1000,
            'user_id': user_id,
            'timestamp': datetime.now().isoformat()
        }
        
        # 记录到日志
        self.logger.info(f"API_METRICS: {json.dumps(metrics)}")
        
        # 这里可以发送到监控系统(如Prometheus、Grafana等)
        self._send_to_monitoring_system(metrics)
    
    def _send_to_monitoring_system(self, metrics: Dict[str, Any]):
        """发送指标到监控系统"""
        # 示例:发送到Prometheus
        try:
            # 这里可以使用prometheus_client库
            pass
        except Exception as e:
            self.logger.error(f"发送监控指标失败: {e}")

# 使用示例
async def main():
    config = {
        'base_url': 'http://localhost:7070/kylin',
        'username': 'ADMIN',
        'password': 'KYLIN'
    }
    
    api_bp = KylinApiBestPractices(config)
    
    # 示例查询请求
    query_request = {
        'sql': 'SELECT product_id, SUM(sales_amount) FROM sales_fact GROUP BY product_id LIMIT 10',
        'project': 'sales_project'
    }
    
    # 检查速率限制
    allowed, message = api_bp.implement_rate_limiting('user123', '/api/query')
    if not allowed:
        print(f"请求被限制: {message}")
        return
    
    # 执行查询
    start_time = datetime.now()
    result = await api_bp.execute_query_with_retry(query_request)
    end_time = datetime.now()
    
    response_time = (end_time - start_time).total_seconds()
    
    # 记录指标
    api_bp.log_api_metrics(
        endpoint='/api/query',
        method='POST',
        status_code=200 if result['success'] else 400,
        response_time=response_time,
        user_id='user123'
    )
    
    print(f"查询结果: {result}")

if __name__ == '__main__':
    asyncio.run(main())
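
上面implement_rate_limiting用进程内字典做了简化实现,多实例部署时计数无法共享;生产环境建议改用Redis。下面是基于INCR+EXPIRE的固定窗口限流示意:

#!/usr/bin/env python3
# redis_rate_limiter.py - 基于Redis的固定窗口限流示意

import redis

class RedisRateLimiter:
    """固定窗口限流: 每个(user, endpoint)在窗口内最多max_requests次请求"""
    
    def __init__(self, redis_client: redis.Redis,
                 max_requests: int = 100, window_seconds: int = 60):
        self.redis = redis_client
        self.max_requests = max_requests
        self.window_seconds = window_seconds
    
    def allow(self, user_id: str, endpoint: str) -> bool:
        key = f"rate_limit:{user_id}:{endpoint}"
        count = self.redis.incr(key)  # 原子自增,多实例共享计数
        if count == 1:
            # 窗口内第一次请求时设置过期时间
            self.redis.expire(key, self.window_seconds)
        return count <= self.max_requests

# 用法示例
if __name__ == '__main__':
    limiter = RedisRateLimiter(redis.Redis(host='localhost', port=6379, db=0))
    print("允许" if limiter.allow('user123', '/api/query') else "限流")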

9.11.2 错误处理最佳实践

#!/usr/bin/env python3
# error_handling_best_practices.py - 错误处理最佳实践

import traceback
import logging
from typing import Dict, Any, Optional
from enum import Enum
from dataclasses import dataclass
from datetime import datetime

class ErrorType(Enum):
    """错误类型枚举"""
    VALIDATION_ERROR = "VALIDATION_ERROR"
    AUTHENTICATION_ERROR = "AUTHENTICATION_ERROR"
    AUTHORIZATION_ERROR = "AUTHORIZATION_ERROR"
    NETWORK_ERROR = "NETWORK_ERROR"
    TIMEOUT_ERROR = "TIMEOUT_ERROR"
    KYLIN_ERROR = "KYLIN_ERROR"
    INTERNAL_ERROR = "INTERNAL_ERROR"
    RATE_LIMIT_ERROR = "RATE_LIMIT_ERROR"

@dataclass
class ErrorInfo:
    """错误信息"""
    error_type: ErrorType
    message: str
    details: Optional[str] = None
    error_code: Optional[str] = None
    timestamp: datetime = None
    trace_id: Optional[str] = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now()

class KylinErrorHandler:
    """Kylin错误处理器"""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        
        # 错误码映射
        self.error_code_mapping = {
            ErrorType.VALIDATION_ERROR: "E001",
            ErrorType.AUTHENTICATION_ERROR: "E002",
            ErrorType.AUTHORIZATION_ERROR: "E003",
            ErrorType.NETWORK_ERROR: "E004",
            ErrorType.TIMEOUT_ERROR: "E005",
            ErrorType.KYLIN_ERROR: "E006",
            ErrorType.INTERNAL_ERROR: "E007",
            ErrorType.RATE_LIMIT_ERROR: "E008"
        }
    
    def handle_error(self, error: Exception, context: Dict[str, Any] = None) -> ErrorInfo:
        """统一错误处理"""
        context = context or {}
        
        # 根据异常类型确定错误类型
        error_type = self._classify_error(error)
        
        # 生成错误信息
        error_info = ErrorInfo(
            error_type=error_type,
            message=self._get_user_friendly_message(error_type, str(error)),
            details=str(error),
            error_code=self.error_code_mapping.get(error_type),
            trace_id=context.get('trace_id')
        )
        
        # 记录错误日志
        self._log_error(error, error_info, context)
        
        # 发送告警(如果需要)
        if self._should_alert(error_type):
            self._send_alert(error_info, context)
        
        return error_info
    
    def _classify_error(self, error: Exception) -> ErrorType:
        """分类错误"""
        error_str = str(error).lower()
        
        if "authentication" in error_str or "unauthorized" in error_str:
            return ErrorType.AUTHENTICATION_ERROR
        elif "permission" in error_str or "forbidden" in error_str:
            return ErrorType.AUTHORIZATION_ERROR
        elif "timeout" in error_str:
            return ErrorType.TIMEOUT_ERROR
        elif "network" in error_str or "connection" in error_str:
            return ErrorType.NETWORK_ERROR
        elif "validation" in error_str or "invalid" in error_str:
            return ErrorType.VALIDATION_ERROR
        elif "rate limit" in error_str:
            return ErrorType.RATE_LIMIT_ERROR
        elif "kylin" in error_str:
            return ErrorType.KYLIN_ERROR
        else:
            return ErrorType.INTERNAL_ERROR
    
    def _get_user_friendly_message(self, error_type: ErrorType, original_message: str) -> str:
        """获取用户友好的错误消息"""
        friendly_messages = {
            ErrorType.VALIDATION_ERROR: "请求参数不正确,请检查输入数据",
            ErrorType.AUTHENTICATION_ERROR: "身份验证失败,请检查用户名和密码",
            ErrorType.AUTHORIZATION_ERROR: "权限不足,无法执行此操作",
            ErrorType.NETWORK_ERROR: "网络连接异常,请稍后重试",
            ErrorType.TIMEOUT_ERROR: "请求超时,请稍后重试",
            ErrorType.KYLIN_ERROR: "Kylin服务异常,请联系管理员",
            ErrorType.RATE_LIMIT_ERROR: "请求过于频繁,请稍后再试",
            ErrorType.INTERNAL_ERROR: "系统内部错误,请联系技术支持"
        }
        
        return friendly_messages.get(error_type, "未知错误")
    
    def _log_error(self, error: Exception, error_info: ErrorInfo, context: Dict[str, Any]):
        """记录错误日志"""
        log_data = {
            'error_type': error_info.error_type.value,
            'error_code': error_info.error_code,
            'message': error_info.message,
            'details': error_info.details,
            'timestamp': error_info.timestamp.isoformat(),
            'trace_id': error_info.trace_id,
            'context': context,
            'traceback': traceback.format_exc()
        }
        
        if error_info.error_type in [ErrorType.INTERNAL_ERROR, ErrorType.KYLIN_ERROR]:
            self.logger.error(f"严重错误: {log_data}")
        else:
            self.logger.warning(f"业务错误: {log_data}")
    
    def _should_alert(self, error_type: ErrorType) -> bool:
        """判断是否需要发送告警"""
        alert_types = {
            ErrorType.INTERNAL_ERROR,
            ErrorType.KYLIN_ERROR,
            ErrorType.NETWORK_ERROR
        }
        return error_type in alert_types
    
    def _send_alert(self, error_info: ErrorInfo, context: Dict[str, Any]):
        """发送告警"""
        try:
            # 这里可以集成告警系统(如钉钉、企业微信、邮件等)
            alert_message = f"""
            🚨 Kylin API 错误告警
            
            错误类型: {error_info.error_type.value}
            错误代码: {error_info.error_code}
            错误消息: {error_info.message}
            发生时间: {error_info.timestamp}
            追踪ID: {error_info.trace_id}
            
            上下文信息: {context}
            """
            
            # 发送告警(示例)
            self.logger.info(f"发送告警: {alert_message}")
            
        except Exception as e:
            self.logger.error(f"发送告警失败: {e}")
    
    def create_error_response(self, error_info: ErrorInfo) -> Dict[str, Any]:
        """创建错误响应"""
        return {
            'success': False,
            'error': {
                'type': error_info.error_type.value,
                'code': error_info.error_code,
                'message': error_info.message,
                'timestamp': error_info.timestamp.isoformat(),
                'trace_id': error_info.trace_id
            },
            'data': None
        }

# 装饰器:自动错误处理
def handle_kylin_errors(error_handler: KylinErrorHandler):
    """错误处理装饰器"""
    def decorator(func):
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                # 获取上下文信息
                context = {
                    'function': func.__name__,
                    'args': str(args),
                    'kwargs': str(kwargs)
                }
                
                # 处理错误
                error_info = error_handler.handle_error(e, context)
                
                # 返回错误响应
                return error_handler.create_error_response(error_info)
        
        return wrapper
    return decorator

# 使用示例
def main():
    error_handler = KylinErrorHandler()
    
    # 示例1:处理认证错误
    try:
        raise Exception("Authentication failed: Invalid credentials")
    except Exception as e:
        error_info = error_handler.handle_error(e, {'user_id': 'user123'})
        response = error_handler.create_error_response(error_info)
        print(f"认证错误响应: {response}")
    
    # 示例2:使用装饰器
    @handle_kylin_errors(error_handler)
    def risky_function():
        raise Exception("Kylin query timeout")
    
    result = risky_function()
    print(f"装饰器处理结果: {result}")

if __name__ == '__main__':
    main()
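
需要注意,handle_kylin_errors只能包装同步函数;若要包装前文的async查询方法,可以按同样思路提供一个异步版本(示意):

import functools

def handle_kylin_errors_async(error_handler: KylinErrorHandler):
    """handle_kylin_errors的异步版本: 包装async函数"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                context = {'function': func.__name__}
                error_info = error_handler.handle_error(e, context)
                return error_handler.create_error_response(error_info)
        return wrapper
    return decorator

# 用法示例
# @handle_kylin_errors_async(error_handler)
# async def async_query():
#     raise Exception("Kylin query timeout")
# result = asyncio.run(async_query())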

9.12 故障排除

9.12.1 常见问题诊断

#!/bin/bash
# kylin_troubleshooting.sh - Kylin集成故障排除脚本

set -e

# 颜色定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# 日志函数
log_info() {
    echo -e "${BLUE}[INFO]${NC} $1"
}

log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

log_success() {
    echo -e "${GREEN}[SUCCESS]${NC} $1"
}

# 配置
KYLIN_HOST=${KYLIN_HOST:-"localhost"}
KYLIN_PORT=${KYLIN_PORT:-"7070"}
KYLIN_USERNAME=${KYLIN_USERNAME:-"ADMIN"}
KYLIN_PASSWORD=${KYLIN_PASSWORD:-"KYLIN"}

# 检查网络连接
check_network_connectivity() {
    log_info "检查网络连接..."
    
    if ping -c 3 $KYLIN_HOST > /dev/null 2>&1; then
        log_success "网络连接正常"
    else
        log_error "无法连接到Kylin主机: $KYLIN_HOST"
        return 1
    fi
}

# 检查端口可达性
check_port_accessibility() {
    log_info "检查端口可达性..."
    
    if nc -z $KYLIN_HOST $KYLIN_PORT; then
        log_success "端口 $KYLIN_PORT 可达"
    else
        log_error "端口 $KYLIN_PORT 不可达"
        return 1
    fi
}

# 检查Kylin服务状态
check_kylin_service() {
    log_info "检查Kylin服务状态..."
    
    local health_url="http://$KYLIN_HOST:$KYLIN_PORT/kylin/api/admin/public_config"
    
    if curl -s --connect-timeout 10 "$health_url" > /dev/null; then
        log_success "Kylin服务运行正常"
    else
        log_error "Kylin服务不可用"
        return 1
    fi
}

# 测试认证
test_authentication() {
    log_info "测试认证..."
    
    local auth_url="http://$KYLIN_HOST:$KYLIN_PORT/kylin/api/user/authentication"
    local auth_header=$(echo -n "$KYLIN_USERNAME:$KYLIN_PASSWORD" | base64)
    
    local response=$(curl -s -w "%{http_code}" -H "Authorization: Basic $auth_header" -X POST "$auth_url")
    local http_code=${response: -3}
    
    if [ "$http_code" = "200" ]; then
        log_success "认证成功"
    else
        log_error "认证失败,HTTP状态码: $http_code"
        return 1
    fi
}

# 测试项目访问
test_project_access() {
    log_info "测试项目访问..."
    
    local auth_url="http://$KYLIN_HOST:$KYLIN_PORT/kylin/api/user/authentication"
    local projects_url="http://$KYLIN_HOST:$KYLIN_PORT/kylin/api/projects"
    local auth_header=$(echo -n "$KYLIN_USERNAME:$KYLIN_PASSWORD" | base64)
    
    # 获取认证token
    local token_response=$(curl -s -H "Authorization: Basic $auth_header" -X POST "$auth_url")
    local token=$(echo $token_response | grep -o '"[^"]*"' | head -1 | tr -d '"')
    
    if [ -z "$token" ]; then
        log_error "无法获取认证token"
        return 1
    fi
    
    # 测试项目访问
    local projects_response=$(curl -s -w "%{http_code}" -H "Authorization: $token" "$projects_url")
    local http_code=${projects_response: -3}
    
    if [ "$http_code" = "200" ]; then
        log_success "项目访问正常"
    else
        log_error "项目访问失败,HTTP状态码: $http_code"
        return 1
    fi
}

# 测试简单查询
test_simple_query() {
    log_info "测试简单查询..."
    
    local auth_url="http://$KYLIN_HOST:$KYLIN_PORT/kylin/api/user/authentication"
    local query_url="http://$KYLIN_HOST:$KYLIN_PORT/kylin/api/query"
    local auth_header=$(echo -n "$KYLIN_USERNAME:$KYLIN_PASSWORD" | base64)
    
    # 获取认证token
    local token_response=$(curl -s -H "Authorization: Basic $auth_header" -X POST "$auth_url")
    local token=$(echo $token_response | grep -o '"[^"]*"' | head -1 | tr -d '"')
    
    if [ -z "$token" ]; then
        log_error "无法获取认证token"
        return 1
    fi
    
    # 执行简单查询
    local query_data='{"sql":"SELECT 1 as test_column","project":"learn_kylin"}'
    local query_response=$(curl -s -w "%{http_code}" \
        -H "Authorization: $token" \
        -H "Content-Type: application/json" \
        -X POST \
        -d "$query_data" \
        "$query_url")
    
    local http_code=${query_response: -3}
    
    if [ "$http_code" = "200" ]; then
        log_success "查询执行成功"
    else
        log_error "查询执行失败,HTTP状态码: $http_code"
        log_error "响应内容: ${query_response%???}"
        return 1
    fi
}

# 检查JDBC连接
test_jdbc_connection() {
    log_info "测试JDBC连接..."
    
    # 创建临时Java测试文件
    cat > /tmp/KylinJDBCTest.java << 'EOF'
import java.sql.*;

public class KylinJDBCTest {
    public static void main(String[] args) {
        String url = "jdbc:kylin://" + args[0] + ":" + args[1] + "/" + args[2];
        String username = args[3];
        String password = args[4];
        
        try {
            Class.forName("org.apache.kylin.jdbc.Driver");
            Connection conn = DriverManager.getConnection(url, username, password);
            
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery("SELECT 1 as test_column");
            
            if (rs.next()) {
                System.out.println("JDBC连接测试成功: " + rs.getInt(1));
            }
            
            rs.close();
            stmt.close();
            conn.close();
            
        } catch (Exception e) {
            System.err.println("JDBC连接测试失败: " + e.getMessage());
            System.exit(1);
        }
    }
}
EOF
    
    # 编译和运行(需要Kylin JDBC驱动;glob在引号内不会展开,先解析出实际jar路径)
    local jdbc_jar=$(ls kylin-jdbc-*.jar 2>/dev/null | head -1)
    if [ -n "$jdbc_jar" ]; then
        javac -cp "$jdbc_jar" /tmp/KylinJDBCTest.java
        java -cp "$jdbc_jar:/tmp" KylinJDBCTest $KYLIN_HOST $KYLIN_PORT "learn_kylin" $KYLIN_USERNAME $KYLIN_PASSWORD
        
        if [ $? -eq 0 ]; then
            log_success "JDBC连接测试成功"
        else
            log_error "JDBC连接测试失败"
            return 1
        fi
    else
        log_warn "未找到Kylin JDBC驱动,跳过JDBC测试"
    fi
    
    # 清理临时文件
    rm -f /tmp/KylinJDBCTest.java /tmp/KylinJDBCTest.class
}

# 检查系统资源
check_system_resources() {
    log_info "检查系统资源..."
    
    # 检查内存使用
    local memory_usage=$(free | grep Mem | awk '{printf "%.1f", $3/$2 * 100.0}')
    log_info "内存使用率: ${memory_usage}%"
    
    if (( $(echo "$memory_usage > 90" | bc -l) )); then
        log_warn "内存使用率过高"
    fi
    
    # 检查磁盘使用
    local disk_usage=$(df / | tail -1 | awk '{print $5}' | sed 's/%//')
    log_info "磁盘使用率: ${disk_usage}%"
    
    if [ "$disk_usage" -gt 90 ]; then
        log_warn "磁盘使用率过高"
    fi
    
    # 检查CPU负载
    local cpu_load=$(uptime | awk -F'load average:' '{print $2}' | awk '{print $1}' | sed 's/,//')
    log_info "CPU负载: $cpu_load"
}

# 生成诊断报告
generate_diagnostic_report() {
    local report_file="kylin_diagnostic_$(date +%Y%m%d_%H%M%S).txt"
    
    log_info "生成诊断报告: $report_file"
    
    {
        echo "Kylin集成诊断报告"
        echo "生成时间: $(date)"
        echo "=============================="
        echo ""
        
        echo "配置信息:"
        echo "Kylin主机: $KYLIN_HOST"
        echo "Kylin端口: $KYLIN_PORT"
        echo "用户名: $KYLIN_USERNAME"
        echo ""
        
        echo "系统信息:"
        uname -a
        echo ""
        
        echo "网络信息:"
        netstat -an | grep $KYLIN_PORT || echo "端口 $KYLIN_PORT 未监听"
        echo ""
        
        echo "进程信息:"
        ps aux | grep kylin || echo "未找到Kylin进程"
        echo ""
        
        echo "日志信息(最近50行):"
        if [ -f "/opt/kylin/logs/kylin.log" ]; then
            tail -50 /opt/kylin/logs/kylin.log
        else
            echo "未找到Kylin日志文件"
        fi
        
    } > $report_file
    
    log_success "诊断报告已生成: $report_file"
}

# 主函数
main() {
    echo "Kylin集成故障排除工具"
    echo "====================="
    echo ""
    
    local failed_checks=0
    
    # 执行检查
    # 注意: set -e 下 ((var++)) 在自增前值为0时会以非零状态退出,故用赋值形式累加
    check_network_connectivity || failed_checks=$((failed_checks + 1))
    check_port_accessibility || failed_checks=$((failed_checks + 1))
    check_kylin_service || failed_checks=$((failed_checks + 1))
    test_authentication || failed_checks=$((failed_checks + 1))
    test_project_access || failed_checks=$((failed_checks + 1))
    test_simple_query || failed_checks=$((failed_checks + 1))
    test_jdbc_connection || failed_checks=$((failed_checks + 1))
    check_system_resources
    
    echo ""
    echo "检查完成"
    echo "========"
    
    if [ $failed_checks -eq 0 ]; then
        log_success "所有检查都通过了!"
    else
        log_error "有 $failed_checks 项检查失败"
        
        # 生成诊断报告
        generate_diagnostic_report
        
        echo ""
        echo "常见解决方案:"
        echo "1. 检查Kylin服务是否正常启动"
        echo "2. 验证网络连接和防火墙设置"
        echo "3. 确认用户名和密码正确"
        echo "4. 检查项目是否存在且有权限访问"
        echo "5. 查看Kylin日志文件获取详细错误信息"
    fi
}

# 运行主函数
main "$@"

9.12.2 性能问题诊断

#!/usr/bin/env python3
# performance_diagnostics.py - 性能问题诊断

import time
import psutil
import requests
import threading
import statistics
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import pandas as pd

@dataclass
class PerformanceMetric:
    """性能指标"""
    timestamp: datetime
    response_time: float
    cpu_usage: float
    memory_usage: float
    network_io: Dict[str, int]
    disk_io: Dict[str, int]
    concurrent_requests: int

class KylinPerformanceDiagnostics:
    """Kylin性能诊断工具"""
    
    def __init__(self, kylin_config: dict):
        self.kylin_config = kylin_config
        self.metrics: List[PerformanceMetric] = []
        self.is_monitoring = False
        
    def start_monitoring(self, duration_minutes: int = 10):
        """开始性能监控"""
        print(f"开始性能监控,持续时间: {duration_minutes} 分钟")
        
        self.is_monitoring = True
        end_time = datetime.now() + timedelta(minutes=duration_minutes)
        
        while datetime.now() < end_time and self.is_monitoring:
            metric = self._collect_metric()
            self.metrics.append(metric)
            
            print(f"[{metric.timestamp.strftime('%H:%M:%S')}] "
                  f"响应时间: {metric.response_time:.2f}ms, "
                  f"CPU: {metric.cpu_usage:.1f}%, "
                  f"内存: {metric.memory_usage:.1f}%")
            
            time.sleep(30)  # 每30秒收集一次
        
        self.is_monitoring = False
        print("性能监控完成")
    
    def _collect_metric(self) -> PerformanceMetric:
        """收集性能指标"""
        # 测试响应时间
        response_time = self._measure_response_time()
        
        # 系统资源使用
        cpu_usage = psutil.cpu_percent(interval=1)
        memory_usage = psutil.virtual_memory().percent
        
        # 网络IO
        network_io = psutil.net_io_counters()._asdict()
        
        # 磁盘IO
        disk_io = psutil.disk_io_counters()._asdict()
        
        # 并发请求数(模拟)
        concurrent_requests = self._estimate_concurrent_requests()
        
        return PerformanceMetric(
            timestamp=datetime.now(),
            response_time=response_time,
            cpu_usage=cpu_usage,
            memory_usage=memory_usage,
            network_io=network_io,
            disk_io=disk_io,
            concurrent_requests=concurrent_requests
        )
    
    def _measure_response_time(self) -> float:
        """测量响应时间"""
        try:
            start_time = time.time()
            
            # 执行简单查询
            auth_url = f"{self.kylin_config['base_url']}/api/user/authentication"
            auth = (self.kylin_config['username'], self.kylin_config['password'])
            
            response = requests.post(auth_url, auth=auth, timeout=30)
            
            if response.status_code == 200:
                end_time = time.time()
                return (end_time - start_time) * 1000  # 转换为毫秒
            else:
                return -1  # 错误标记
                
        except Exception as e:
            print(f"响应时间测量失败: {e}")
            return -1
    
    def _estimate_concurrent_requests(self) -> int:
        """估算并发请求数"""
        # 这里可以通过监控Kylin日志或连接数来估算
        # 简化实现:基于CPU使用率估算
        cpu_usage = psutil.cpu_percent()
        return max(1, int(cpu_usage / 10))  # 简单估算
    
    def run_load_test(self, concurrent_users: int = 10, duration_minutes: int = 5):
        """运行负载测试"""
        print(f"开始负载测试: {concurrent_users} 并发用户,持续 {duration_minutes} 分钟")
        
        results = []
        threads = []
        
        def worker():
            """工作线程"""
            end_time = datetime.now() + timedelta(minutes=duration_minutes)
            
            while datetime.now() < end_time:
                try:
                    # 执行查询
                    response_time = self._execute_test_query()
                    
                    results.append({
                        'timestamp': datetime.now(),
                        'response_time': response_time,
                        'success': response_time > 0
                    })
                    
                except Exception as e:
                    results.append({
                        'timestamp': datetime.now(),
                        'response_time': -1,
                        'success': False,
                        'error': str(e)
                    })
                
                # 控制请求频率
                time.sleep(1)
        
        # 启动工作线程
        for i in range(concurrent_users):
            thread = threading.Thread(target=worker)
            thread.start()
            threads.append(thread)
        
        # 等待所有线程完成
        for thread in threads:
            thread.join()
        
        # 分析结果
        self._analyze_load_test_results(results)
    
    def _execute_test_query(self) -> float:
        """执行测试查询"""
        try:
            start_time = time.time()
            
            # 认证
            auth_url = f"{self.kylin_config['base_url']}/api/user/authentication"
            auth = (self.kylin_config['username'], self.kylin_config['password'])
            
            auth_response = requests.post(auth_url, auth=auth, timeout=30)
            if auth_response.status_code != 200:
                return -1
            
            # 认证接口返回的是用户信息JSON而非token,
            # 后续查询请求直接复用HTTP Basic认证即可
            query_url = f"{self.kylin_config['base_url']}/api/query"
            headers = {'Content-Type': 'application/json'}
            
            query_data = {
                'sql': 'SELECT 1 as test_column',
                'project': 'learn_kylin'
            }
            
            query_response = requests.post(query_url, headers=headers, auth=auth,
                                           json=query_data, timeout=60)
            
            if query_response.status_code == 200:
                end_time = time.time()
                return (end_time - start_time) * 1000
            else:
                return -1
                
        except Exception:
            return -1
    
    def _analyze_load_test_results(self, results: List[Dict[str, Any]]):
        """分析负载测试结果"""
        if not results:
            print("没有测试结果")
            return
        
        # 过滤成功的请求
        successful_results = [r for r in results if r['success']]
        response_times = [r['response_time'] for r in successful_results]
        
        if not response_times:
            print("所有请求都失败了")
            return
        
        # 统计分析
        total_requests = len(results)
        successful_requests = len(successful_results)
        success_rate = (successful_requests / total_requests) * 100
        
        avg_response_time = statistics.mean(response_times)
        median_response_time = statistics.median(response_times)
        sorted_times = sorted(response_times)
        p95_response_time = sorted_times[int(len(sorted_times) * 0.95)]
        p99_response_time = sorted_times[int(len(sorted_times) * 0.99)]
        
        print("\n负载测试结果分析:")
        print("=================")
        print(f"总请求数: {total_requests}")
        print(f"成功请求数: {successful_requests}")
        print(f"成功率: {success_rate:.2f}%")
        print(f"平均响应时间: {avg_response_time:.2f}ms")
        print(f"中位数响应时间: {median_response_time:.2f}ms")
        print(f"95%响应时间: {p95_response_time:.2f}ms")
        print(f"99%响应时间: {p99_response_time:.2f}ms")
        
        # 性能评估
        if avg_response_time < 1000:
            print("✅ 性能良好")
        elif avg_response_time < 3000:
            print("⚠️ 性能一般")
        else:
            print("❌ 性能较差")
    
    def generate_performance_report(self):
        """生成性能报告"""
        if not self.metrics:
            print("没有性能数据")
            return
        
        # 创建DataFrame
        data = []
        for metric in self.metrics:
            data.append({
                'timestamp': metric.timestamp,
                'response_time': metric.response_time,
                'cpu_usage': metric.cpu_usage,
                'memory_usage': metric.memory_usage,
                'concurrent_requests': metric.concurrent_requests
            })
        
        df = pd.DataFrame(data)
        # 失败的探测以-1记录,统计与绘图前先过滤掉
        df = df[df['response_time'] >= 0]
        
        # 生成图表(先设置中文字体,避免图中中文乱码)
        plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']
        plt.rcParams['axes.unicode_minus'] = False
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle('Kylin性能监控报告', fontsize=16)
        
        # 响应时间趋势
        axes[0, 0].plot(df['timestamp'], df['response_time'])
        axes[0, 0].set_title('响应时间趋势')
        axes[0, 0].set_ylabel('响应时间 (ms)')
        axes[0, 0].tick_params(axis='x', rotation=45)
        
        # CPU使用率
        axes[0, 1].plot(df['timestamp'], df['cpu_usage'], color='red')
        axes[0, 1].set_title('CPU使用率')
        axes[0, 1].set_ylabel('CPU使用率 (%)')
        axes[0, 1].tick_params(axis='x', rotation=45)
        
        # 内存使用率
        axes[1, 0].plot(df['timestamp'], df['memory_usage'], color='green')
        axes[1, 0].set_title('内存使用率')
        axes[1, 0].set_ylabel('内存使用率 (%)')
        axes[1, 0].tick_params(axis='x', rotation=45)
        
        # 并发请求数
        axes[1, 1].plot(df['timestamp'], df['concurrent_requests'], color='orange')
        axes[1, 1].set_title('并发请求数')
        axes[1, 1].set_ylabel('并发请求数')
        axes[1, 1].tick_params(axis='x', rotation=45)
        
        plt.tight_layout()
        
        # 保存图表
        report_filename = f'kylin_performance_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png'
        plt.savefig(report_filename, dpi=300, bbox_inches='tight')
        print(f"性能报告已保存: {report_filename}")
        
        # 显示统计摘要
        print("\n性能统计摘要:")
        print("============")
        print(f"监控时长: {len(self.metrics)} 个数据点")
        print(f"平均响应时间: {df['response_time'].mean():.2f}ms")
        print(f"最大响应时间: {df['response_time'].max():.2f}ms")
        print(f"平均CPU使用率: {df['cpu_usage'].mean():.1f}%")
        print(f"平均内存使用率: {df['memory_usage'].mean():.1f}%")

# 使用示例
def main():
    config = {
        'base_url': 'http://localhost:7070/kylin',
        'username': 'ADMIN',
        'password': 'KYLIN'
    }
    
    diagnostics = KylinPerformanceDiagnostics(config)
    
    print("Kylin性能诊断工具")
    print("=================")
    
    # 选择操作
    while True:
        print("\n请选择操作:")
        print("1. 开始性能监控")
        print("2. 运行负载测试")
        print("3. 生成性能报告")
        print("4. 退出")
        
        choice = input("请输入选择 (1-4): ")
        
        if choice == '1':
            duration = int(input("监控时长(分钟,默认10): ") or 10)
            diagnostics.start_monitoring(duration)
        
        elif choice == '2':
            users = int(input("并发用户数(默认10): ") or 10)
            duration = int(input("测试时长(分钟,默认5): ") or 5)
            diagnostics.run_load_test(users, duration)
        
        elif choice == '3':
            diagnostics.generate_performance_report()
        
        elif choice == '4':
            break
        
        else:
            print("无效选择,请重试")

if __name__ == '__main__':
    main()

9.13 本章小结

本章详细介绍了Apache Kylin的系统集成与API开发,涵盖了以下核心内容:

9.13.1 核心知识点

  1. 集成架构设计

    • 多层架构模式
    • 数据流设计
    • 接口标准化
  2. REST API集成

    • API客户端开发
    • 认证与授权
    • 错误处理机制
  3. JDBC集成

    • 连接池配置
    • 查询优化
    • 事务管理
  4. 微服务集成

    • Spring Boot集成
    • 容器化部署
    • 服务发现
  5. 安全认证

    • OAuth2集成
    • API密钥管理
    • 权限控制
  6. 性能优化

    • 连接池优化
    • 查询缓存
    • 负载均衡

9.13.2 实用工具

本章提供了多个实用工具和脚本:

  • API客户端库:Python和Java版本
  • 缓存优化器:Redis缓存管理
  • 性能诊断工具:监控和分析
  • 故障排除脚本:自动化诊断
  • 最佳实践示例:标准化开发

9.13.3 最佳实践

  1. API设计

    • 统一响应格式(见下方示意代码)
    • 版本控制策略
    • 文档自动生成
  2. 错误处理

    • 分类错误管理
    • 友好错误消息
    • 告警机制
  3. 性能优化

    • 连接复用
    • 查询缓存
    • 异步处理
  4. 安全防护

    • 认证授权
    • 速率限制
    • 输入验证
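
上面"统一响应格式"可以用一小段代码直观说明。下面是一个最小示意(字段命名与错误码划分均为本书假设,并非Kylin官方规范):

#!/usr/bin/env python3
# unified_response.py - 统一响应格式示意

from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Any, Optional

@dataclass
class ApiResponse:
    """统一的API响应包装: code/message/data/timestamp"""
    code: int = 0                      # 0表示成功,非0为业务错误码(假设的约定)
    message: str = "success"
    data: Optional[Any] = None
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> dict:
        return asdict(self)

# 使用示例
if __name__ == '__main__':
    ok = ApiResponse(data={"rows": 10})
    err = ApiResponse(code=40001, message="SQL语法错误")
    print(ok.to_dict())
    print(err.to_dict())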

9.13.4 下一章预告

下一章将介绍“Kylin高级特性与扩展”,包括:

  • 自定义函数开发
  • 插件机制
  • 扩展存储引擎
  • 高级调优技巧

9.13.5 练习与思考

理论练习:

  1. 设计一个完整的Kylin集成架构
  2. 分析不同集成方式的优缺点
  3. 制定API安全策略

实践练习:

  1. 开发一个完整的Kylin客户端应用
  2. 实现查询缓存优化
  3. 搭建微服务集成环境

思考题:

  1. 如何设计高可用的Kylin集成架构?
  2. 在大并发场景下如何优化性能?
  3. 如何实现跨系统的数据一致性?

通过本章的学习,您应该能够:

  • 设计和实现完整的Kylin集成方案
  • 开发高质量的API客户端
  • 优化系统性能和安全性
  • 排查和解决常见问题

9.3 JDBC集成

9.3.1 JDBC驱动配置

Kylin提供了标准的JDBC驱动,支持各种Java应用和BI工具的集成。

Maven依赖

<!-- pom.xml -->
<dependency>
    <groupId>org.apache.kylin</groupId>
    <artifactId>kylin-jdbc</artifactId>
    <version>4.0.3</version>
</dependency>

连接配置

// KylinJDBCClient.java
package com.example.kylin;

import java.sql.*;
import java.util.Properties;
import java.util.List;
import java.util.ArrayList;

public class KylinJDBCClient {
    
    private static final String DRIVER_CLASS = "org.apache.kylin.jdbc.Driver";
    private Connection connection;
    
    /**
     * 连接配置类
     */
    public static class ConnectionConfig {
        private String host = "localhost";
        private int port = 7070;
        private String project;
        private String username = "ADMIN";
        private String password = "KYLIN";
        private boolean useSSL = false;
        
        // Getters and Setters
        public String getHost() { return host; }
        public void setHost(String host) { this.host = host; }
        
        public int getPort() { return port; }
        public void setPort(int port) { this.port = port; }
        
        public String getProject() { return project; }
        public void setProject(String project) { this.project = project; }
        
        public String getUsername() { return username; }
        public void setUsername(String username) { this.username = username; }
        
        public String getPassword() { return password; }
        public void setPassword(String password) { this.password = password; }
        
        public boolean isUseSSL() { return useSSL; }
        public void setUseSSL(boolean useSSL) { this.useSSL = useSSL; }
        
        public String getJdbcUrl() {
            // Kylin JDBC URL格式为 jdbc:kylin://<host>:<port>/<project>;
            // 是否启用SSL通过连接属性"ssl"传递(见connect方法),不体现在URL中
            return String.format("jdbc:kylin://%s:%d/%s", host, port, project);
        }
    }
    
    /**
     * 建立连接
     */
    public void connect(ConnectionConfig config) throws SQLException {
        try {
            // 加载驱动
            Class.forName(DRIVER_CLASS);
            
            // 设置连接属性
            Properties props = new Properties();
            props.setProperty("user", config.getUsername());
            props.setProperty("password", config.getPassword());
            props.setProperty("ssl", String.valueOf(config.isUseSSL()));
            
            // 建立连接
            String jdbcUrl = config.getJdbcUrl();
            connection = DriverManager.getConnection(jdbcUrl, props);
            
            System.out.println("连接成功: " + jdbcUrl);
            
        } catch (ClassNotFoundException e) {
            throw new SQLException("JDBC驱动未找到", e);
        }
    }
    
    /**
     * 执行查询
     */
    public QueryResult executeQuery(String sql) throws SQLException {
        if (connection == null) {
            throw new SQLException("连接未建立");
        }
        
        try (Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(sql)) {
            
            return new QueryResult(resultSet);
        }
    }
    
    /**
     * 执行预编译查询
     */
    public QueryResult executePreparedQuery(String sql, Object... parameters) throws SQLException {
        if (connection == null) {
            throw new SQLException("连接未建立");
        }
        
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            
            // 设置参数
            for (int i = 0; i < parameters.length; i++) {
                statement.setObject(i + 1, parameters[i]);
            }
            
            try (ResultSet resultSet = statement.executeQuery()) {
                return new QueryResult(resultSet);
            }
        }
    }
    
    /**
     * 获取表元数据
     */
    public List<TableInfo> getTables() throws SQLException {
        if (connection == null) {
            throw new SQLException("连接未建立");
        }
        
        List<TableInfo> tables = new ArrayList<>();
        
        try {
            DatabaseMetaData metaData = connection.getMetaData();
            
            // try-with-resources确保ResultSet被及时关闭
            try (ResultSet resultSet = metaData.getTables(null, null, "%", new String[]{"TABLE"})) {
                while (resultSet.next()) {
                    TableInfo table = new TableInfo();
                    table.setName(resultSet.getString("TABLE_NAME"));
                    table.setSchema(resultSet.getString("TABLE_SCHEM"));
                    table.setType(resultSet.getString("TABLE_TYPE"));
                    table.setRemarks(resultSet.getString("REMARKS"));
                    
                    tables.add(table);
                }
            }
            
        } catch (SQLException e) {
            throw new SQLException("获取表信息失败", e);
        }
        
        return tables;
    }
    
    /**
     * 获取列元数据
     */
    public List<ColumnInfo> getColumns(String tableName) throws SQLException {
        if (connection == null) {
            throw new SQLException("连接未建立");
        }
        
        List<ColumnInfo> columns = new ArrayList<>();
        
        try {
            DatabaseMetaData metaData = connection.getMetaData();
            
            try (ResultSet resultSet = metaData.getColumns(null, null, tableName, "%")) {
                while (resultSet.next()) {
                    ColumnInfo column = new ColumnInfo();
                    column.setName(resultSet.getString("COLUMN_NAME"));
                    column.setType(resultSet.getString("TYPE_NAME"));
                    column.setSize(resultSet.getInt("COLUMN_SIZE"));
                    // getColumns返回的NULLABLE是int列(0=NOT NULL, 1=NULLABLE)
                    column.setNullable(resultSet.getInt("NULLABLE") == DatabaseMetaData.columnNullable);
                    column.setRemarks(resultSet.getString("REMARKS"));
                    
                    columns.add(column);
                }
            }
            
        } catch (SQLException e) {
            throw new SQLException("获取列信息失败", e);
        }
        
        return columns;
    }
    
    /**
     * 关闭连接
     */
    public void close() {
        if (connection != null) {
            try {
                connection.close();
                System.out.println("连接已关闭");
            } catch (SQLException e) {
                System.err.println("关闭连接失败: " + e.getMessage());
            }
        }
    }
    
    /**
     * 查询结果类
     */
    public static class QueryResult {
        private List<String> columnNames;
        private List<List<Object>> rows;
        private int rowCount;
        
        public QueryResult(ResultSet resultSet) throws SQLException {
            columnNames = new ArrayList<>();
            rows = new ArrayList<>();
            
            // 获取列信息
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            
            for (int i = 1; i <= columnCount; i++) {
                columnNames.add(metaData.getColumnName(i));
            }
            
            // 获取数据行
            while (resultSet.next()) {
                List<Object> row = new ArrayList<>();
                for (int i = 1; i <= columnCount; i++) {
                    row.add(resultSet.getObject(i));
                }
                rows.add(row);
            }
            
            rowCount = rows.size();
        }
        
        // Getters
        public List<String> getColumnNames() { return columnNames; }
        public List<List<Object>> getRows() { return rows; }
        public int getRowCount() { return rowCount; }
        
        public void printResult() {
            // 打印列名
            System.out.println(String.join("\t", columnNames));
            System.out.println("-".repeat(columnNames.size() * 15));
            
            // 打印数据行
            for (List<Object> row : rows) {
                List<String> stringRow = new ArrayList<>();
                for (Object value : row) {
                    stringRow.add(value != null ? value.toString() : "NULL");
                }
                System.out.println(String.join("\t", stringRow));
            }
            
            System.out.println("\n总计: " + rowCount + " 行");
        }
    }
    
    /**
     * 表信息类
     */
    public static class TableInfo {
        private String name;
        private String schema;
        private String type;
        private String remarks;
        
        // Getters and Setters
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        
        public String getSchema() { return schema; }
        public void setSchema(String schema) { this.schema = schema; }
        
        public String getType() { return type; }
        public void setType(String type) { this.type = type; }
        
        public String getRemarks() { return remarks; }
        public void setRemarks(String remarks) { this.remarks = remarks; }
        
        @Override
        public String toString() {
            return String.format("Table{name='%s', schema='%s', type='%s'}", 
                               name, schema, type);
        }
    }
    
    /**
     * 列信息类
     */
    public static class ColumnInfo {
        private String name;
        private String type;
        private int size;
        private boolean nullable;
        private String remarks;
        
        // Getters and Setters
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        
        public String getType() { return type; }
        public void setType(String type) { this.type = type; }
        
        public int getSize() { return size; }
        public void setSize(int size) { this.size = size; }
        
        public boolean isNullable() { return nullable; }
        public void setNullable(boolean nullable) { this.nullable = nullable; }
        
        public String getRemarks() { return remarks; }
        public void setRemarks(String remarks) { this.remarks = remarks; }
        
        @Override
        public String toString() {
            return String.format("Column{name='%s', type='%s', size=%d, nullable=%s}", 
                               name, type, size, nullable);
        }
    }
}

9.3.2 连接池配置

// KylinConnectionPool.java
package com.example.kylin;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

public class KylinConnectionPool {
    
    private HikariDataSource dataSource;
    
    /**
     * 初始化连接池
     */
    public void initialize(KylinJDBCClient.ConnectionConfig config) {
        HikariConfig hikariConfig = new HikariConfig();
        
        // 基本配置
        hikariConfig.setJdbcUrl(config.getJdbcUrl());
        hikariConfig.setUsername(config.getUsername());
        hikariConfig.setPassword(config.getPassword());
        hikariConfig.setDriverClassName("org.apache.kylin.jdbc.Driver");
        
        // 连接池配置
        hikariConfig.setMaximumPoolSize(20);  // 最大连接数
        hikariConfig.setMinimumIdle(5);       // 最小空闲连接数
        hikariConfig.setConnectionTimeout(30000);  // 连接超时(30秒)
        hikariConfig.setIdleTimeout(600000);       // 空闲超时(10分钟)
        hikariConfig.setMaxLifetime(1800000);      // 最大生命周期(30分钟)
        
        // 连接测试
        hikariConfig.setConnectionTestQuery("SELECT 1");
        hikariConfig.setValidationTimeout(5000);
        
        // 创建数据源
        dataSource = new HikariDataSource(hikariConfig);
        
        System.out.println("连接池初始化完成");
    }
    
    /**
     * 获取连接
     */
    public Connection getConnection() throws SQLException {
        if (dataSource == null) {
            throw new SQLException("连接池未初始化");
        }
        return dataSource.getConnection();
    }
    
    /**
     * 获取数据源
     */
    public DataSource getDataSource() {
        return dataSource;
    }
    
    /**
     * 关闭连接池
     */
    public void close() {
        if (dataSource != null) {
            dataSource.close();
            System.out.println("连接池已关闭");
        }
    }
    
    /**
     * 获取连接池状态
     */
    public void printPoolStatus() {
        if (dataSource != null) {
            System.out.println("=== 连接池状态 ===");
            System.out.println("活跃连接数: " + dataSource.getHikariPoolMXBean().getActiveConnections());
            System.out.println("空闲连接数: " + dataSource.getHikariPoolMXBean().getIdleConnections());
            System.out.println("总连接数: " + dataSource.getHikariPoolMXBean().getTotalConnections());
            System.out.println("等待连接数: " + dataSource.getHikariPoolMXBean().getThreadsAwaitingConnection());
        }
    }
}
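
// KylinConnectionPoolExample.java - 连接池使用示意
// (补充示例,类名为本书假设;沿用9.3.1中的ConnectionConfig)
package com.example.kylin;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class KylinConnectionPoolExample {
    
    public static void main(String[] args) throws Exception {
        // 复用9.3.1中的连接配置
        KylinJDBCClient.ConnectionConfig config = new KylinJDBCClient.ConnectionConfig();
        config.setProject("learn_kylin");
        
        KylinConnectionPool pool = new KylinConnectionPool();
        pool.initialize(config);
        
        // try-with-resources确保连接用完即归还连接池
        try (Connection conn = pool.getConnection();
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM KYLIN_SALES")) {
            while (rs.next()) {
                System.out.println("总行数: " + rs.getLong(1));
            }
        }
        
        pool.printPoolStatus();
        pool.close();
    }
}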

9.3.3 JDBC使用示例

// KylinJDBCExample.java
package com.example.kylin;

import java.sql.SQLException;
import java.util.List;

public class KylinJDBCExample {
    
    public static void main(String[] args) {
        // 创建连接配置
        KylinJDBCClient.ConnectionConfig config = new KylinJDBCClient.ConnectionConfig();
        config.setHost("localhost");
        config.setPort(7070);
        config.setProject("learn_kylin");
        config.setUsername("ADMIN");
        config.setPassword("KYLIN");
        
        KylinJDBCClient client = new KylinJDBCClient();
        
        try {
            // 建立连接
            client.connect(config);
            
            // 示例1: 简单查询
            System.out.println("=== 示例1: 简单查询 ===");
            KylinJDBCClient.QueryResult result1 = client.executeQuery(
                "SELECT PART_DT, SUM(PRICE) as TOTAL_PRICE " +
                "FROM KYLIN_SALES " +
                "WHERE PART_DT >= '2012-01-01' " +
                "GROUP BY PART_DT " +
                "ORDER BY PART_DT " +
                "LIMIT 10"
            );
            result1.printResult();
            
            // 示例2: 预编译查询
            System.out.println("\n=== 示例2: 预编译查询 ===");
            KylinJDBCClient.QueryResult result2 = client.executePreparedQuery(
                "SELECT LSTG_FORMAT_NAME, COUNT(*) as ORDER_COUNT " +
                "FROM KYLIN_SALES " +
                "WHERE PART_DT >= ? AND PART_DT < ? " +
                "GROUP BY LSTG_FORMAT_NAME " +
                "ORDER BY ORDER_COUNT DESC",
                "2012-01-01", "2012-02-01"
            );
            result2.printResult();
            
            // 示例3: 获取表信息
            System.out.println("\n=== 示例3: 表信息 ===");
            List<KylinJDBCClient.TableInfo> tables = client.getTables();
            for (KylinJDBCClient.TableInfo table : tables) {
                System.out.println(table);
            }
            
            // 示例4: 获取列信息
            if (!tables.isEmpty()) {
                String tableName = tables.get(0).getName();
                System.out.println("\n=== 示例4: " + tableName + " 列信息 ===");
                List<KylinJDBCClient.ColumnInfo> columns = client.getColumns(tableName);
                for (KylinJDBCClient.ColumnInfo column : columns) {
                    System.out.println(column);
                }
            }
            
        } catch (SQLException e) {
            System.err.println("数据库操作失败: " + e.getMessage());
            e.printStackTrace();
        } finally {
            client.close();
        }
    }
}

9.3.4 Spring Boot集成

// KylinDataSourceConfig.java
package com.example.config;

import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;

@Configuration
public class KylinDataSourceConfig {
    
    @Bean
    @ConfigurationProperties(prefix = "kylin.datasource")
    public DataSource kylinDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setDriverClassName("org.apache.kylin.jdbc.Driver");
        return dataSource;
    }
    
    @Bean
    public JdbcTemplate kylinJdbcTemplate(DataSource kylinDataSource) {
        return new JdbcTemplate(kylinDataSource);
    }
}

# application.yml
kylin:
  datasource:
    jdbc-url: jdbc:kylin://localhost:7070/learn_kylin
    username: ADMIN
    password: KYLIN
    maximum-pool-size: 20
    minimum-idle: 5
    connection-timeout: 30000
    idle-timeout: 600000
    max-lifetime: 1800000

// KylinService.java
package com.example.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;

@Service
public class KylinService {
    
    @Autowired
    private JdbcTemplate kylinJdbcTemplate;
    
    /**
     * 执行查询
     */
    public List<Map<String, Object>> executeQuery(String sql, Object... params) {
        return kylinJdbcTemplate.queryForList(sql, params);
    }
    
    /**
     * 获取销售统计
     */
    public List<Map<String, Object>> getSalesStatistics(String startDate, String endDate) {
        String sql = "SELECT PART_DT, SUM(PRICE) as TOTAL_PRICE, COUNT(*) as ORDER_COUNT " +
                    "FROM KYLIN_SALES " +
                    "WHERE PART_DT >= ? AND PART_DT < ? " +
                    "GROUP BY PART_DT " +
                    "ORDER BY PART_DT";
        
        return kylinJdbcTemplate.queryForList(sql, startDate, endDate);
    }
    
    /**
     * 获取产品销售排行
     */
    public List<Map<String, Object>> getTopProducts(int limit) {
        String sql = "SELECT LSTG_FORMAT_NAME, SUM(PRICE) as TOTAL_SALES " +
                    "FROM KYLIN_SALES " +
                    "GROUP BY LSTG_FORMAT_NAME " +
                    "ORDER BY TOTAL_SALES DESC " +
                    "LIMIT ?";
        
        return kylinJdbcTemplate.queryForList(sql, limit);
    }
}
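
// KylinController.java - 补充示意:将上面的KylinService通过REST接口暴露出去
// (最小草图,URL路径与参数名均为本书假设)
package com.example.controller;

import com.example.service.KylinService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/api/sales")
public class KylinController {
    
    @Autowired
    private KylinService kylinService;
    
    /**
     * 销售统计,例如 GET /api/sales/statistics?startDate=2012-01-01&endDate=2012-02-01
     */
    @GetMapping("/statistics")
    public List<Map<String, Object>> getSalesStatistics(@RequestParam String startDate,
                                                        @RequestParam String endDate) {
        return kylinService.getSalesStatistics(startDate, endDate);
    }
    
    /**
     * 产品销售排行,例如 GET /api/sales/top-products?limit=10
     */
    @GetMapping("/top-products")
    public List<Map<String, Object>> getTopProducts(@RequestParam(defaultValue = "10") int limit) {
        return kylinService.getTopProducts(limit);
    }
}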

9.4 BI工具集成

9.4.1 Tableau集成

Tableau是业界领先的数据可视化工具,可以通过ODBC或JDBC连接Kylin。

ODBC连接配置

  1. 下载ODBC驱动

    • 从Apache Kylin官网下载ODBC驱动
    • 安装驱动到系统
  2. 配置DSN

# kylin_odbc.ini
[ODBC Data Sources]
Kylin = Apache Kylin ODBC Driver

[Kylin]
Driver = /usr/local/lib/libkylinodbc.so
Description = Apache Kylin ODBC Connection
Server = localhost
Port = 7070
Project = learn_kylin
UID = ADMIN
PWD = KYLIN
SSL = 0
  3. Tableau连接步骤

-- 在Tableau中创建自定义SQL连接
SELECT 
    PART_DT as "日期",
    LSTG_FORMAT_NAME as "产品类别",
    SUM(PRICE) as "销售额",
    COUNT(*) as "订单数量",
    AVG(PRICE) as "平均订单金额"
FROM KYLIN_SALES 
WHERE PART_DT >= '2012-01-01'
GROUP BY PART_DT, LSTG_FORMAT_NAME
ORDER BY PART_DT, LSTG_FORMAT_NAME
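
在接入Tableau之前,可以先用一小段Python脚本验证DSN是否配置正确。下面是一个最小示意(假设已安装pyodbc,并已按上文完成DSN配置):

#!/usr/bin/env python3
# test_kylin_dsn.py - 验证Kylin ODBC DSN连通性(示意)

import pyodbc

def test_dsn(dsn: str = "Kylin", uid: str = "ADMIN", pwd: str = "KYLIN"):
    """通过DSN建立连接并执行一条简单查询"""
    conn = pyodbc.connect(f"DSN={dsn};UID={uid};PWD={pwd}", timeout=10)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT 1")
        row = cursor.fetchone()
        print(f"DSN '{dsn}' 连接成功,测试查询返回: {row[0]}")
    finally:
        conn.close()

if __name__ == '__main__':
    test_dsn()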

9.4.2 Power BI集成

Microsoft Power BI可以通过ODBC连接器连接Kylin。

连接配置

{
  "version": "0.1",
  "connections": [
    {
      "details": {
        "protocol": "odbc",
        "address": {
          "server": "localhost:7070",
          "database": "learn_kylin"
        },
        "authentication": {
          "kind": "UsernamePassword",
          "username": "ADMIN",
          "password": "KYLIN"
        }
      }
    }
  ]
}

Power Query M语言示例

let
    Source = Odbc.DataSource("dsn=Kylin", [
        HierarchicalNavigation = true,
        SqlCompatibleWindowsAuth = true
    ]),
    Database = Source{[Name="learn_kylin"]}[Data],
    SalesData = Database{[Name="KYLIN_SALES"]}[Data],
    FilteredRows = Table.SelectRows(SalesData, each [PART_DT] >= #date(2012, 1, 1)),
    GroupedData = Table.Group(FilteredRows, {"PART_DT", "LSTG_FORMAT_NAME"}, {
        {"销售额", each List.Sum([PRICE]), type number},
        {"订单数", each Table.RowCount(_), type number}
    })
in
    GroupedData

9.4.3 Grafana集成

Grafana本身没有内置的Kylin数据源,Kylin也并不兼容MySQL协议。常见做法是安装社区提供的Kylin数据源插件,或在Kylin之前架设一个兼容MySQL协议的SQL网关,再复用Grafana的MySQL数据源。下面的配置与查询示例假设采用后一种方式,仅作示意。

数据源配置

{
  "name": "Kylin",
  "type": "mysql",
  "url": "localhost:7070",
  "database": "learn_kylin",
  "user": "ADMIN",
  "password": "KYLIN",
  "jsonData": {
    "maxOpenConns": 10,
    "maxIdleConns": 2,
    "connMaxLifetime": 14400
  }
}

仪表板查询示例

-- 销售趋势图
SELECT 
  UNIX_TIMESTAMP(STR_TO_DATE(PART_DT, '%Y-%m-%d')) * 1000 as time_msec,
  SUM(PRICE) as value,
  'sales' as metric
FROM KYLIN_SALES 
WHERE PART_DT >= '$__timeFrom()' AND PART_DT <= '$__timeTo()'
GROUP BY PART_DT
ORDER BY PART_DT

-- 产品销售排行
SELECT 
  LSTG_FORMAT_NAME as product,
  SUM(PRICE) as sales
FROM KYLIN_SALES 
WHERE PART_DT >= '$__timeFrom()' AND PART_DT <= '$__timeTo()'
GROUP BY LSTG_FORMAT_NAME
ORDER BY sales DESC
LIMIT 10

9.4.4 自定义BI应用

<!-- kylin_dashboard.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Kylin数据仪表板</title>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/axios/dist/axios.min.js"></script>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
            background-color: #f5f5f5;
        }
        .dashboard {
            display: grid;
            grid-template-columns: repeat(auto-fit, minmax(400px, 1fr));
            gap: 20px;
        }
        .chart-container {
            background: white;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .chart-title {
            font-size: 18px;
            font-weight: bold;
            margin-bottom: 15px;
            color: #333;
        }
        .loading {
            text-align: center;
            color: #666;
        }
        .error {
            color: #e74c3c;
            text-align: center;
        }
    </style>
</head>
<body>
    <h1>Kylin数据仪表板</h1>
    
    <div class="dashboard">
        <div class="chart-container">
            <div class="chart-title">销售趋势</div>
            <canvas id="salesTrendChart"></canvas>
        </div>
        
        <div class="chart-container">
            <div class="chart-title">产品销售排行</div>
            <canvas id="productRankingChart"></canvas>
        </div>
        
        <div class="chart-container">
            <div class="chart-title">地区销售分布</div>
            <canvas id="regionChart"></canvas>
        </div>
        
        <div class="chart-container">
            <div class="chart-title">实时指标</div>
            <div id="metricsContainer">
                <div class="metric">
                    <div class="metric-value" id="totalSales">-</div>
                    <div class="metric-label">总销售额</div>
                </div>
                <div class="metric">
                    <div class="metric-value" id="totalOrders">-</div>
                    <div class="metric-label">总订单数</div>
                </div>
                <div class="metric">
                    <div class="metric-value" id="avgOrderValue">-</div>
                    <div class="metric-label">平均订单金额</div>
                </div>
            </div>
        </div>
    </div>
    
    <script>
        // Kylin API配置
        const KYLIN_CONFIG = {
            baseUrl: 'http://localhost:7070/kylin/api',
            project: 'learn_kylin',
            username: 'ADMIN',
            password: 'KYLIN'
        };
        
        // API客户端类
        class KylinDashboardAPI {
            constructor(config) {
                this.config = config;
                this.authHeader = 'Basic ' + btoa(config.username + ':' + config.password);
            }
            
            async executeQuery(sql) {
                try {
                    const response = await axios.post(`${this.config.baseUrl}/query`, {
                        sql: sql,
                        project: this.config.project,
                        limit: 50000
                    }, {
                        headers: {
                            'Authorization': this.authHeader,
                            'Content-Type': 'application/json'
                        }
                    });
                    
                    return response.data;
                } catch (error) {
                    console.error('查询失败:', error);
                    throw error;
                }
            }
        }
        
        // 图表管理器
        class ChartManager {
            constructor(api) {
                this.api = api;
                this.charts = {};
            }
            
            // 创建销售趋势图
            async createSalesTrendChart() {
                const sql = `
                    SELECT PART_DT, SUM(PRICE) as TOTAL_SALES
                    FROM KYLIN_SALES 
                    WHERE PART_DT >= '2012-01-01' AND PART_DT < '2013-01-01'
                    GROUP BY PART_DT
                    ORDER BY PART_DT
                `;
                
                try {
                    const result = await this.api.executeQuery(sql);
                    const labels = result.results.map(row => row[0]);
                    const data = result.results.map(row => parseFloat(row[1]));
                    
                    const ctx = document.getElementById('salesTrendChart').getContext('2d');
                    this.charts.salesTrend = new Chart(ctx, {
                        type: 'line',
                        data: {
                            labels: labels,
                            datasets: [{
                                label: '销售额',
                                data: data,
                                borderColor: 'rgb(75, 192, 192)',
                                backgroundColor: 'rgba(75, 192, 192, 0.2)',
                                tension: 0.1
                            }]
                        },
                        options: {
                            responsive: true,
                            scales: {
                                y: {
                                    beginAtZero: true,
                                    ticks: {
                                        callback: function(value) {
                                            return '¥' + value.toLocaleString();
                                        }
                                    }
                                }
                            }
                        }
                    });
                } catch (error) {
                    this.showError('salesTrendChart', '加载销售趋势数据失败');
                }
            }
            
            // 创建产品排行图
            async createProductRankingChart() {
                const sql = `
                    SELECT LSTG_FORMAT_NAME, SUM(PRICE) as TOTAL_SALES
                    FROM KYLIN_SALES 
                    GROUP BY LSTG_FORMAT_NAME
                    ORDER BY TOTAL_SALES DESC
                    LIMIT 10
                `;
                
                try {
                    const result = await this.api.executeQuery(sql);
                    const labels = result.results.map(row => row[0]);
                    const data = result.results.map(row => parseFloat(row[1]));
                    
                    const ctx = document.getElementById('productRankingChart').getContext('2d');
                    this.charts.productRanking = new Chart(ctx, {
                        type: 'bar',
                        data: {
                            labels: labels,
                            datasets: [{
                                label: '销售额',
                                data: data,
                                backgroundColor: [
                                    'rgba(255, 99, 132, 0.8)',
                                    'rgba(54, 162, 235, 0.8)',
                                    'rgba(255, 205, 86, 0.8)',
                                    'rgba(75, 192, 192, 0.8)',
                                    'rgba(153, 102, 255, 0.8)',
                                    'rgba(255, 159, 64, 0.8)',
                                    'rgba(199, 199, 199, 0.8)',
                                    'rgba(83, 102, 255, 0.8)',
                                    'rgba(255, 99, 255, 0.8)',
                                    'rgba(99, 255, 132, 0.8)'
                                ]
                            }]
                        },
                        options: {
                            responsive: true,
                            scales: {
                                y: {
                                    beginAtZero: true,
                                    ticks: {
                                        callback: function(value) {
                                            return '¥' + value.toLocaleString();
                                        }
                                    }
                                }
                            }
                        }
                    });
                } catch (error) {
                    this.showError('productRankingChart', '加载产品排行数据失败');
                }
            }
            
            // 显示错误信息
            showError(chartId, message) {
                const canvas = document.getElementById(chartId);
                const container = canvas.parentElement;
                container.innerHTML = `<div class="error">${message}</div>`;
            }
        }
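
        // 补充示意:为"实时指标"卡片加载数据(原页面只有占位符)。
        // 指标口径为假设:对KYLIN_SALES全表做聚合。
        async function loadMetrics(api) {
            const sql = `
                SELECT SUM(PRICE), COUNT(*), AVG(PRICE)
                FROM KYLIN_SALES
            `;
            try {
                const result = await api.executeQuery(sql);
                const row = result.results[0];
                document.getElementById('totalSales').textContent =
                    '¥' + parseFloat(row[0]).toLocaleString();
                document.getElementById('totalOrders').textContent =
                    parseInt(row[1], 10).toLocaleString();
                document.getElementById('avgOrderValue').textContent =
                    '¥' + parseFloat(row[2]).toFixed(2);
            } catch (error) {
                console.error('加载实时指标失败:', error);
            }
        }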
        
        // 初始化仪表板
        async function initDashboard() {
            const api = new KylinDashboardAPI(KYLIN_CONFIG);
            const chartManager = new ChartManager(api);
            
            try {
                await Promise.all([
                    chartManager.createSalesTrendChart(),
                    chartManager.createProductRankingChart(),
                    loadMetrics(api)
                ]);
            } catch (error) {
                console.error('初始化仪表板失败:', error);
            }
        }
        
        // 页面加载完成后初始化
        document.addEventListener('DOMContentLoaded', initDashboard);
    </script>
    
    <style>
        .metric {
            text-align: center;
            padding: 15px;
            background: #f8f9fa;
            border-radius: 6px;
            margin: 10px;
        }
        
        .metric-value {
            font-size: 24px;
            font-weight: bold;
            color: #2c3e50;
            margin-bottom: 5px;
        }
        
        .metric-label {
            font-size: 14px;
            color: #7f8c8d;
        }
        
        #metricsContainer {
            display: grid;
            grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
            gap: 10px;
        }
    </style>
</body>
</html>

9.5 Python SDK集成

9.5.1 Python SDK安装

# 安装依赖(seaborn供后文的可视化工具类使用)
pip install requests pandas numpy matplotlib seaborn

9.5.2 Python SDK实现

#!/usr/bin/env python3
# kylin_python_sdk.py - Kylin Python SDK

import requests
import pandas as pd
import json
from typing import Dict, List, Optional, Any
from datetime import datetime
import logging
import time

class KylinPythonSDK:
    """Kylin Python SDK"""
    
    def __init__(self, host: str = "localhost", port: int = 7070,
                 username: str = "ADMIN", password: str = "KYLIN",
                 use_ssl: bool = False, timeout: int = 300):
        """
        初始化Kylin Python SDK
        
        Args:
            host: Kylin服务器地址
            port: Kylin服务器端口
            username: 用户名
            password: 密码
            use_ssl: 是否使用HTTPS
            timeout: 请求超时时间(秒)
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.timeout = timeout
        
        protocol = "https" if use_ssl else "http"
        self.base_url = f"{protocol}://{host}:{port}/kylin/api"
        
        # 设置认证
        self.auth = (username, password)
        
        # 设置会话
        self.session = requests.Session()
        self.session.auth = self.auth
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })
        
        # 设置日志
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
        # 测试连接
        self._test_connection()
    
    def _test_connection(self) -> bool:
        """测试连接"""
        try:
            response = self.session.get(f"{self.base_url}/admin/version", timeout=10)
            if response.status_code == 200:
                version = response.json().get('version', 'unknown')
                self.logger.info(f"连接成功,Kylin版本: {version}")
                return True
            else:
                self.logger.error(f"连接失败: HTTP {response.status_code}")
                return False
        except Exception as e:
            self.logger.error(f"连接异常: {e}")
            return False
    
    def query(self, sql: str, project: str, limit: int = 50000, 
             offset: int = 0, accept_partial: bool = False) -> pd.DataFrame:
        """执行SQL查询并返回DataFrame"""
        data = {
            "sql": sql,
            "project": project,
            "limit": limit,
            "offset": offset,
            "acceptPartial": accept_partial
        }
        
        try:
            response = self.session.post(f"{self.base_url}/query", 
                                       json=data, timeout=self.timeout)
            
            if response.status_code == 200:
                result = response.json()
                
                # 提取列名和数据
                columns = [col['name'] for col in result['columnMetas']]
                rows = result['results']
                
                # 创建DataFrame
                df = pd.DataFrame(rows, columns=columns)
                
                # 数据类型转换
                for i, col_meta in enumerate(result['columnMetas']):
                    col_name = col_meta['name']
                    col_type = col_meta['typeName']
                    
                    if col_type in ['INTEGER', 'BIGINT', 'SMALLINT']:
                        df[col_name] = pd.to_numeric(df[col_name], errors='coerce')
                    elif col_type in ['DECIMAL', 'DOUBLE', 'FLOAT']:
                        df[col_name] = pd.to_numeric(df[col_name], errors='coerce')
                    elif col_type in ['DATE', 'TIMESTAMP']:
                        df[col_name] = pd.to_datetime(df[col_name], errors='coerce')
                
                self.logger.info(f"查询成功,返回 {len(df)} 行数据")
                return df
                
            else:
                raise Exception(f"查询失败: HTTP {response.status_code} - {response.text}")
                
        except Exception as e:
            self.logger.error(f"查询异常: {e}")
            raise
    
    def query_async(self, sql: str, project: str, callback=None) -> str:
        """异步执行查询(注意:/query/async等异步端点并非所有Kylin版本都提供,请以所用版本的REST API文档为准)"""
        data = {
            "sql": sql,
            "project": project,
            "acceptPartial": False
        }
        
        try:
            response = self.session.post(f"{self.base_url}/query/async", 
                                       json=data, timeout=30)
            
            if response.status_code == 200:
                result = response.json()
                query_id = result.get('queryId')
                
                if callback:
                    # 启动后台线程监控查询状态
                    import threading
                    thread = threading.Thread(target=self._monitor_async_query, 
                                            args=(query_id, callback))
                    thread.daemon = True
                    thread.start()
                
                return query_id
            else:
                raise Exception(f"异步查询提交失败: HTTP {response.status_code}")
                
        except Exception as e:
            self.logger.error(f"异步查询异常: {e}")
            raise
    
    def _monitor_async_query(self, query_id: str, callback):
        """监控异步查询状态"""
        while True:
            try:
                status = self.get_query_status(query_id)
                
                if status['isRunning']:
                    time.sleep(2)  # 等待2秒后再次检查
                    continue
                else:
                    # 查询完成,获取结果
                    if status['isSuccessful']:
                        result = self.get_query_result(query_id)
                        callback(query_id, result, None)
                    else:
                        error = status.get('exceptionMessage', '查询失败')
                        callback(query_id, None, error)
                    break
                    
            except Exception as e:
                callback(query_id, None, str(e))
                break
    
    def get_query_status(self, query_id: str) -> Dict:
        """获取查询状态"""
        response = self.session.get(f"{self.base_url}/query/{query_id}")
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"获取查询状态失败: HTTP {response.status_code}")
    
    def get_query_result(self, query_id: str) -> pd.DataFrame:
        """获取查询结果"""
        response = self.session.get(f"{self.base_url}/query/{query_id}/result")
        if response.status_code == 200:
            result = response.json()
            
            # 转换为DataFrame
            columns = [col['name'] for col in result['columnMetas']]
            rows = result['results']
            
            return pd.DataFrame(rows, columns=columns)
        else:
            raise Exception(f"获取查询结果失败: HTTP {response.status_code}")
    
    def get_projects(self) -> List[Dict]:
        """获取项目列表"""
        response = self.session.get(f"{self.base_url}/projects")
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"获取项目列表失败: HTTP {response.status_code}")
    
    def get_tables(self, project: str) -> List[Dict]:
        """获取表列表"""
        params = {'project': project, 'ext': True}
        response = self.session.get(f"{self.base_url}/tables", params=params)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"获取表列表失败: HTTP {response.status_code}")
    
    def get_cubes(self, project: str = None) -> List[Dict]:
        """获取Cube列表"""
        params = {}
        if project:
            params['project'] = project
        
        response = self.session.get(f"{self.base_url}/cubes", params=params)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"获取Cube列表失败: HTTP {response.status_code}")
    
    def close(self):
        """关闭连接"""
        if hasattr(self, 'session'):
            self.session.close()
            self.logger.info("连接已关闭")
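
# 补充示意:基于query()的limit/offset做分页拉取,
# 适合结果集较大、单次请求吃不下的场景(页大小与行数上限为假设值,
# 建议SQL自带ORDER BY以保证分页结果稳定)
def query_all_pages(sdk: KylinPythonSDK, sql: str, project: str,
                    page_size: int = 10000, max_rows: int = 1000000) -> pd.DataFrame:
    """按limit/offset分页拉取查询结果并拼接为一个DataFrame"""
    frames = []
    offset = 0
    while offset < max_rows:
        df = sdk.query(sql, project, limit=page_size, offset=offset)
        if df.empty:
            break
        frames.append(df)
        if len(df) < page_size:
            break  # 已到最后一页
        offset += page_size
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()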

# 数据分析工具类
class KylinDataAnalyzer:
    """Kylin数据分析工具"""
    
    def __init__(self, sdk: KylinPythonSDK):
        self.sdk = sdk
    
    def sales_trend_analysis(self, project: str, start_date: str, end_date: str) -> pd.DataFrame:
        """销售趋势分析"""
        sql = f"""
        SELECT 
            PART_DT,
            SUM(PRICE) as TOTAL_SALES,
            COUNT(*) as ORDER_COUNT,
            AVG(PRICE) as AVG_ORDER_VALUE
        FROM KYLIN_SALES 
        WHERE PART_DT >= '{start_date}' AND PART_DT < '{end_date}'
        GROUP BY PART_DT
        ORDER BY PART_DT
        """
        
        return self.sdk.query(sql, project)
    
    def product_performance_analysis(self, project: str, limit: int = 20) -> pd.DataFrame:
        """产品性能分析"""
        sql = f"""
        SELECT 
            LSTG_FORMAT_NAME as PRODUCT,
            SUM(PRICE) as TOTAL_SALES,
            COUNT(*) as ORDER_COUNT,
            AVG(PRICE) as AVG_ORDER_VALUE,
            MIN(PRICE) as MIN_PRICE,
            MAX(PRICE) as MAX_PRICE
        FROM KYLIN_SALES 
        GROUP BY LSTG_FORMAT_NAME
        ORDER BY TOTAL_SALES DESC
        LIMIT {limit}
        """
        
        return self.sdk.query(sql, project)
    
    def seller_performance_analysis(self, project: str, limit: int = 20) -> pd.DataFrame:
        """卖家性能分析"""
        sql = f"""
        SELECT 
            SELLER_ID,
            SUM(PRICE) as TOTAL_SALES,
            COUNT(*) as ORDER_COUNT,
            AVG(PRICE) as AVG_ORDER_VALUE,
            COUNT(DISTINCT LSTG_FORMAT_NAME) as PRODUCT_VARIETY
        FROM KYLIN_SALES 
        GROUP BY SELLER_ID
        ORDER BY TOTAL_SALES DESC
        LIMIT {limit}
        """
        
        return self.sdk.query(sql, project)
    
    def time_series_analysis(self, project: str, granularity: str = 'month') -> pd.DataFrame:
        """时间序列分析"""
        if granularity == 'month':
            date_format = "DATE_FORMAT(PART_DT, '%Y-%m')"
        elif granularity == 'quarter':
            date_format = "CONCAT(YEAR(PART_DT), '-Q', QUARTER(PART_DT))"
        elif granularity == 'year':
            date_format = "YEAR(PART_DT)"
        else:
            date_format = "PART_DT"
        
        sql = f"""
        SELECT 
            {date_format} as TIME_PERIOD,
            SUM(PRICE) as TOTAL_SALES,
            COUNT(*) as ORDER_COUNT,
            AVG(PRICE) as AVG_ORDER_VALUE
        FROM KYLIN_SALES 
        GROUP BY {date_format}
        ORDER BY {date_format}
        """
        
        return self.sdk.query(sql, project)
    
    def cohort_analysis(self, project: str) -> pd.DataFrame:
        """队列分析"""
        sql = """
        SELECT 
            DATE_FORMAT(PART_DT, '%Y-%m') as COHORT_MONTH,
            SELLER_ID,
            SUM(PRICE) as TOTAL_SALES,
            COUNT(*) as ORDER_COUNT
        FROM KYLIN_SALES 
        GROUP BY DATE_FORMAT(PART_DT, '%Y-%m'), SELLER_ID
        ORDER BY COHORT_MONTH, SELLER_ID
        """
        
        return self.sdk.query(sql, project)

# 可视化工具类
class KylinVisualizer:
    """Kylin数据可视化工具"""
    
    def __init__(self, analyzer: KylinDataAnalyzer):
        self.analyzer = analyzer
        
        # 导入可视化库
        try:
            import matplotlib.pyplot as plt
            import seaborn as sns
            self.plt = plt
            self.sns = sns
            
            # 设置中文字体
            plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']
            plt.rcParams['axes.unicode_minus'] = False
            
        except ImportError:
            raise ImportError("请安装matplotlib和seaborn: pip install matplotlib seaborn")
    
    def plot_sales_trend(self, project: str, start_date: str, end_date: str, 
                        figsize: tuple = (12, 6)):
        """绘制销售趋势图"""
        df = self.analyzer.sales_trend_analysis(project, start_date, end_date)
        
        fig, (ax1, ax2) = self.plt.subplots(2, 1, figsize=figsize)
        
        # 销售额趋势
        ax1.plot(df['PART_DT'], df['TOTAL_SALES'], marker='o', linewidth=2)
        ax1.set_title('销售额趋势')
        ax1.set_ylabel('销售额')
        ax1.grid(True, alpha=0.3)
        
        # 订单数趋势
        ax2.plot(df['PART_DT'], df['ORDER_COUNT'], marker='s', color='orange', linewidth=2)
        ax2.set_title('订单数趋势')
        ax2.set_xlabel('日期')
        ax2.set_ylabel('订单数')
        ax2.grid(True, alpha=0.3)
        
        self.plt.tight_layout()
        self.plt.show()
    
    def plot_product_performance(self, project: str, limit: int = 10, 
                               figsize: tuple = (12, 8)):
        """绘制产品性能图"""
        df = self.analyzer.product_performance_analysis(project, limit)
        
        fig, ((ax1, ax2), (ax3, ax4)) = self.plt.subplots(2, 2, figsize=figsize)
        
        # 产品销售额排行
        ax1.barh(df['PRODUCT'], df['TOTAL_SALES'])
        ax1.set_title('产品销售额排行')
        ax1.set_xlabel('销售额')
        
        # 产品订单数排行
        ax2.barh(df['PRODUCT'], df['ORDER_COUNT'])
        ax2.set_title('产品订单数排行')
        ax2.set_xlabel('订单数')
        
        # 平均订单金额
        ax3.barh(df['PRODUCT'], df['AVG_ORDER_VALUE'])
        ax3.set_title('平均订单金额')
        ax3.set_xlabel('平均金额')
        
        # 价格分布
        ax4.scatter(df['MIN_PRICE'], df['MAX_PRICE'], s=df['ORDER_COUNT']/10, alpha=0.6)
        ax4.set_title('产品价格分布')
        ax4.set_xlabel('最低价格')
        ax4.set_ylabel('最高价格')
        
        self.plt.tight_layout()
        self.plt.show()
    
    def plot_time_series(self, project: str, granularity: str = 'month', 
                        figsize: tuple = (15, 6)):
        """绘制时间序列图"""
        df = self.analyzer.time_series_analysis(project, granularity)
        
        fig, ax = self.plt.subplots(figsize=figsize)
        
        # 双轴图
        ax2 = ax.twinx()
        
        # 销售额
        line1 = ax.plot(df['TIME_PERIOD'], df['TOTAL_SALES'], 
                       marker='o', color='blue', label='销售额')
        ax.set_ylabel('销售额', color='blue')
        ax.tick_params(axis='y', labelcolor='blue')
        
        # 订单数
        line2 = ax2.plot(df['TIME_PERIOD'], df['ORDER_COUNT'], 
                        marker='s', color='red', label='订单数')
        ax2.set_ylabel('订单数', color='red')
        ax2.tick_params(axis='y', labelcolor='red')
        
        # 设置标题和图例
        ax.set_title(f'{granularity.title()}级别销售趋势')
        ax.set_xlabel('时间')
        
        # 旋转x轴标签
        self.plt.setp(ax.get_xticklabels(), rotation=45)
        
        # 合并图例
        lines = line1 + line2
        labels = [l.get_label() for l in lines]
        ax.legend(lines, labels, loc='upper left')
        
        self.plt.tight_layout()
        self.plt.show()

# Usage example
def main():
    # Create an SDK instance
    sdk = KylinPythonSDK(host='localhost', port=7070)
    
    try:
        # Basic query
        print("=== Basic query example ===")
        df = sdk.query(
            sql="SELECT PART_DT, SUM(PRICE) as TOTAL_SALES FROM KYLIN_SALES GROUP BY PART_DT ORDER BY PART_DT LIMIT 10",
            project="learn_kylin"
        )
        print(df)
        
        # Data analysis
        print("\n=== Data analysis example ===")
        analyzer = KylinDataAnalyzer(sdk)
        
        # Sales trend analysis
        trend_df = analyzer.sales_trend_analysis(
            project="learn_kylin",
            start_date="2012-01-01",
            end_date="2012-03-01"
        )
        print("Sales trend analysis:")
        print(trend_df.head())
        
        # Product performance analysis
        product_df = analyzer.product_performance_analysis(
            project="learn_kylin",
            limit=5
        )
        print("\nProduct performance analysis:")
        print(product_df)
        
        # Visualization (requires matplotlib)
        try:
            print("\n=== Visualization example ===")
            visualizer = KylinVisualizer(analyzer)
            
            # Plot the sales trend
            visualizer.plot_sales_trend(
                project="learn_kylin",
                start_date="2012-01-01",
                end_date="2012-03-01"
            )
            
        except ImportError:
            print("Skipping visualization example (matplotlib and seaborn not installed)")
        
        # Async query example
        print("\n=== Async query example ===")
        
        def query_callback(query_id, result, error):
            if error:
                print(f"Async query failed: {error}")
            else:
                print(f"Async query succeeded, {len(result)} rows returned")
                print(result.head())
        
        query_id = sdk.query_async(
            sql="SELECT LSTG_FORMAT_NAME, SUM(PRICE) as TOTAL_SALES FROM KYLIN_SALES GROUP BY LSTG_FORMAT_NAME ORDER BY TOTAL_SALES DESC",
            project="learn_kylin",
            callback=query_callback
        )
        print(f"Async query submitted, query ID: {query_id}")
        
        # Give the async query time to finish
        import time
        time.sleep(5)
        
    except Exception as e:
        print(f"Error: {e}")
    
    finally:
        sdk.close()

if __name__ == '__main__':
    main()
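
One caveat about the visualizer: plt.show() assumes an interactive display. In a headless environment (a cron job or a report service), the usual approach is to select matplotlib's non-interactive Agg backend and write figures to files instead. A minimal sketch, assuming the same DataFrame shape that sales_trend_analysis returns (the helper name and output path are illustrative):

#!/usr/bin/env python3
# headless_plot.py - render charts to files when no display is available

import matplotlib
matplotlib.use('Agg')  # select a non-interactive backend before importing pyplot
import matplotlib.pyplot as plt
import pandas as pd

def save_trend_chart(df: pd.DataFrame, path: str = '/tmp/sales_trend.png') -> str:
    """Render the two-panel trend chart to a PNG file instead of showing it."""
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 6))
    ax1.plot(df['PART_DT'], df['TOTAL_SALES'], marker='o', linewidth=2)
    ax1.set_title('Sales Revenue Trend')
    ax2.plot(df['PART_DT'], df['ORDER_COUNT'], marker='s', color='orange', linewidth=2)
    ax2.set_title('Order Count Trend')
    fig.tight_layout()
    fig.savefig(path, dpi=150)  # write the chart to disk
    plt.close(fig)              # release the figure in long-running processes
    return path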

9.6 Data Lake Integration

9.6.1 Hadoop Ecosystem Integration

Kylin is deeply integrated with the Hadoop ecosystem and supports building Cubes from data sources such as HDFS and Hive.

HDFS Integration Configuration

<!-- core-site.xml -->
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://namenode:9000</value>
    </property>
    
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/tmp/hadoop</value>
    </property>
    
    <property>
        <name>fs.hdfs.impl</name>
        <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
    </property>
</configuration>

<!-- hdfs-site.xml -->
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/data/hadoop/namenode</value>
    </property>
    
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/data/hadoop/datanode</value>
    </property>
</configuration>
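
After distributing these files to the Kylin node, a quick sanity check confirms that HDFS is reachable and that Kylin's working directory exists. A sketch; it assumes the hdfs CLI is on PATH and that kylin.env.hdfs-working-dir keeps its default of /kylin:

#!/usr/bin/env python3
# check_hdfs.py - verify HDFS connectivity from the Kylin node

import subprocess

def check_hdfs(path: str = "/kylin"):
    """Run `hdfs dfs -ls` against Kylin's working directory; raises on failure."""
    result = subprocess.run(
        ["hdfs", "dfs", "-ls", path],
        capture_output=True, text=True, check=True
    )
    print(result.stdout)

if __name__ == '__main__':
    check_hdfs()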

Hive Integration Configuration

<!-- hive-site.xml -->
<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://localhost:3306/hive_metastore</value>
    </property>
    
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
    </property>
    
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hive</value>
    </property>
    
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>hive_password</value>
    </property>
    
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://localhost:9083</value>
    </property>
</configuration>
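
With the metastore reachable as configured above, Hive tables can be loaded into a Kylin project from the Web UI or over REST. A minimal sketch, following the shape of Kylin 3.x's documented table-loading endpoint (POST /tables/{tables}/{project}); verify the exact path and payload against your Kylin version:

#!/usr/bin/env python3
# load_hive_tables.py - sync Hive tables into a Kylin project over REST

import requests

KYLIN_API = "http://localhost:7070/kylin/api"
AUTH = ("ADMIN", "KYLIN")

def load_hive_tables(tables: list, project: str) -> dict:
    """Ask Kylin to load (sync) the given Hive tables into a project."""
    table_list = ",".join(tables)  # Kylin expects a comma-separated DB.TABLE list
    resp = requests.post(
        f"{KYLIN_API}/tables/{table_list}/{project}",
        auth=AUTH,
        json={"calculate": True},  # also trigger table stats calculation
        headers={"Content-Type": "application/json"},
    )
    resp.raise_for_status()
    return resp.json()

if __name__ == '__main__':
    print(load_hive_tables(["DEFAULT.KYLIN_SALES"], "learn_kylin"))

Once loaded, the tables appear as data sources in the project and can be referenced by models and Cubes.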

9.6.2 Delta Lake Integration

Kylin does not read Delta tables directly as a source; the pattern shown here keeps the data in Delta Lake and syncs snapshots into Hive, from which Kylin builds its Cubes. The following script wraps the common operations: creating Delta tables from Hive, syncing back to Hive, incremental MERGE updates, and table maintenance.

#!/usr/bin/env python3
# delta_lake_integration.py - Delta Lake integration

from typing import Optional

from pyspark.sql import SparkSession
from delta.tables import DeltaTable
import logging

class DeltaLakeKylinIntegration:
    """Delta Lake / Kylin integration helper."""
    
    def __init__(self, spark_config: Optional[dict] = None):
        """
        Initialize the Delta Lake integration.
        
        Args:
            spark_config: extra Spark configuration, merged over the defaults
        """
        # Default Spark configuration: Delta extensions plus adaptive execution.
        # The Delta jars must be on the classpath (e.g. pip install delta-spark,
        # or spark.jars.packages pointing at the matching Delta release).
        default_config = {
            "spark.app.name": "DeltaLakeKylinIntegration",
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            "spark.sql.adaptive.enabled": "true",
            "spark.sql.adaptive.coalescePartitions.enabled": "true"
        }
        
        if spark_config:
            default_config.update(spark_config)
        
        # Build the Spark session from the merged configuration
        builder = SparkSession.builder
        for key, value in default_config.items():
            builder = builder.config(key, value)
        
        self.spark = builder.getOrCreate()
        self.logger = logging.getLogger(__name__)
    
    def create_delta_table_from_hive(self, hive_table: str, delta_path: str,
                                   partition_cols: Optional[list] = None):
        """Create a Delta table from an existing Hive table."""
        try:
            # Read the Hive table
            df = self.spark.table(hive_table)
            
            # Write it out in Delta format
            writer = df.write.format("delta").mode("overwrite")
            
            if partition_cols:
                writer = writer.partitionBy(*partition_cols)
            
            writer.save(delta_path)
            
            self.logger.info(f"Delta table created: {delta_path}")
            
        except Exception as e:
            self.logger.error(f"Failed to create Delta table: {e}")
            raise
    
    def sync_delta_to_hive(self, delta_path: str, hive_table: str,
                          database: str = "default"):
        """Sync a Delta table into Hive so Kylin can use it as a source."""
        try:
            # Read the Delta table
            df = self.spark.read.format("delta").load(delta_path)
            
            # Register a temporary view for the CTAS statement
            temp_view = f"temp_{hive_table}"
            df.createOrReplaceTempView(temp_view)
            
            # Create the Hive table from the view.
            # Note: IF NOT EXISTS only creates the table on the first run;
            # refreshing an existing table would need INSERT OVERWRITE instead.
            self.spark.sql(f"""
                CREATE TABLE IF NOT EXISTS {database}.{hive_table}
                USING HIVE
                AS SELECT * FROM {temp_view}
            """)
            
            self.logger.info(f"Synced to Hive table: {database}.{hive_table}")
            
        except Exception as e:
            self.logger.error(f"Sync to Hive failed: {e}")
            raise
    
    def incremental_update(self, delta_path: str, source_df,
                          merge_condition: str, update_condition: str = None):
        """Incrementally update a Delta table via MERGE."""
        try:
            # Open the target Delta table
            delta_table = DeltaTable.forPath(self.spark, delta_path)
            
            # Build the MERGE operation
            merge_builder = delta_table.alias("target").merge(
                source_df.alias("source"),
                merge_condition
            )
            
            if update_condition:
                merge_builder = merge_builder.whenMatchedUpdate(
                    condition=update_condition,
                    set={
                        col: f"source.{col}" for col in source_df.columns
                    }
                )
            else:
                merge_builder = merge_builder.whenMatchedUpdateAll()
            
            merge_builder = merge_builder.whenNotMatchedInsertAll()
            
            # Execute the merge
            merge_builder.execute()
            
            self.logger.info(f"Incremental update finished: {delta_path}")
            
        except Exception as e:
            self.logger.error(f"Incremental update failed: {e}")
            raise
    
    def optimize_delta_table(self, delta_path: str, z_order_cols: Optional[list] = None):
        """Optimize a Delta table (small-file compaction and optional Z-Ordering)."""
        try:
            # Compact small files
            self.spark.sql(f"OPTIMIZE delta.`{delta_path}`")
            
            # Z-Order clustering on frequently filtered columns
            if z_order_cols:
                z_order_str = ", ".join(z_order_cols)
                self.spark.sql(f"OPTIMIZE delta.`{delta_path}` ZORDER BY ({z_order_str})")
            
            self.logger.info(f"Delta table optimized: {delta_path}")
            
        except Exception as e:
            self.logger.error(f"Delta table optimization failed: {e}")
            raise
    
    def vacuum_delta_table(self, delta_path: str, retention_hours: int = 168):
        """Remove old file versions (default retention 168 hours = 7 days)."""
        try:
            self.spark.sql(f"VACUUM delta.`{delta_path}` RETAIN {retention_hours} HOURS")
            self.logger.info(f"Delta table vacuumed: {delta_path}")
            
        except Exception as e:
            self.logger.error(f"Delta table vacuum failed: {e}")
            raise
    
    def get_table_history(self, delta_path: str):
        """Return the table's version history as a DataFrame."""
        try:
            history_df = self.spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`")
            return history_df
            
        except Exception as e:
            self.logger.error(f"Failed to fetch table history: {e}")
            raise
    
    def close(self):
        """Stop the Spark session."""
        if self.spark:
            self.spark.stop()

# Usage example
def main():
    # Create the integration helper
    integration = DeltaLakeKylinIntegration()
    
    try:
        # Example 1: create a Delta table from Hive
        integration.create_delta_table_from_hive(
            hive_table="default.sales_data",
            delta_path="/data/delta/sales",
            partition_cols=["year", "month"]
        )
        
        # Example 2: sync back to Hive (for Kylin to consume)
        integration.sync_delta_to_hive(
            delta_path="/data/delta/sales",
            hive_table="sales_delta",
            database="kylin_db"
        )
        
        # Example 3: optimize the table
        integration.optimize_delta_table(
            delta_path="/data/delta/sales",
            z_order_cols=["customer_id", "product_id"]
        )
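
        # Example 4: incremental MERGE update (sketch; assumes an updates table
        # "default.sales_updates" with the same schema as the target, keyed by
        # a hypothetical order_id column)
        updates_df = integration.spark.table("default.sales_updates")
        integration.incremental_update(
            delta_path="/data/delta/sales",
            source_df=updates_df,
            merge_condition="target.order_id = source.order_id"
        )
        # Optionally reclaim old file versions afterwards (default 7-day retention)
        integration.vacuum_delta_table("/data/delta/sales")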
        
        # Example 5: inspect the table history
        history = integration.get_table_history("/data/delta/sales")
        history.show()
        
    except Exception as e:
        print(f"Error: {e}")
    
    finally:
        integration.close()

if __name__ == '__main__':
    main()