4.1 数据源概述

4.1.1 支持的数据源类型

Apache Kylin支持多种数据源类型,为不同的数据场景提供灵活的接入方案:

主要数据源类型:

数据源类型 | 描述                 | 适用场景             | 版本支持
Hive       | 基于Hadoop的数据仓库 | 批处理、历史数据分析 | 所有版本
Kafka      | 分布式流处理平台     | 实时数据流、流式分析 | Kylin 3.0+
JDBC       | 关系型数据库连接     | 传统数据库、小数据量 | Kylin 4.0+
Parquet    | 列式存储格式         | 高性能分析、压缩存储 | Kylin 4.0+
ORC        | 优化行列式存储       | Hive优化存储         | Kylin 3.0+

4.1.2 数据源架构

数据源连接架构:

graph TD
    A[Kylin Server] --> B[Data Source Manager]
    B --> C[Hive Connector]
    B --> D[Kafka Connector]
    B --> E[JDBC Connector]
    B --> F[File Connector]
    
    C --> G[Hive Metastore]
    C --> H[HDFS]
    
    D --> I[Kafka Cluster]
    
    E --> J[MySQL]
    E --> K[PostgreSQL]
    E --> L[Oracle]
    
    F --> M[Parquet Files]
    F --> N[ORC Files]
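
上图中的 Data Source Manager 负责按数据源类型把连接请求分发给对应的连接器。下面是一个概念性的 Python 草图(并非 Kylin 源码,类名与接口均为示意),用来说明这种"注册 + 分发"的模式:

#!/usr/bin/env python3
# datasource_dispatch_demo.py - 数据源连接器注册与分发的概念示意

from abc import ABC, abstractmethod


class Connector(ABC):
    """所有连接器的抽象基类:只约定 connect 接口。"""

    @abstractmethod
    def connect(self, config: dict) -> str:
        ...


class HiveConnector(Connector):
    def connect(self, config: dict) -> str:
        # 真实实现会访问 Hive Metastore / HDFS,这里仅返回连接描述
        return f"hive://{config.get('metastore_uri', 'thrift://localhost:9083')}"


class KafkaConnector(Connector):
    def connect(self, config: dict) -> str:
        return f"kafka://{config.get('bootstrap_servers', 'localhost:9092')}"


class DataSourceManager:
    """按数据源类型注册连接器,统一入口分发连接请求。"""

    def __init__(self):
        self._connectors = {}

    def register(self, source_type: str, connector: Connector):
        self._connectors[source_type] = connector

    def get_connection(self, source_type: str, config: dict) -> str:
        if source_type not in self._connectors:
            raise ValueError(f"未注册的数据源类型: {source_type}")
        return self._connectors[source_type].connect(config)


if __name__ == "__main__":
    manager = DataSourceManager()
    manager.register("hive", HiveConnector())
    manager.register("kafka", KafkaConnector())
    print(manager.get_connection("hive", {}))
    print(manager.get_connection("kafka", {"bootstrap_servers": "kafka1:9092"}))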

连接池管理:

// 数据源连接池配置
public class DataSourceConnectionPool {
    private final Map<String, HikariDataSource> connectionPools;
    private final DataSourceConfig config;
    
    public DataSourceConnectionPool(DataSourceConfig config) {
        this.config = config;
        this.connectionPools = new ConcurrentHashMap<>();
        initializeConnectionPools();
    }
    
    private void initializeConnectionPools() {
        // Hive连接池
        HikariConfig hiveConfig = new HikariConfig();
        hiveConfig.setJdbcUrl(config.getHiveJdbcUrl());
        hiveConfig.setUsername(config.getHiveUsername());
        hiveConfig.setPassword(config.getHivePassword());
        hiveConfig.setMaximumPoolSize(config.getHiveMaxConnections());
        hiveConfig.setMinimumIdle(config.getHiveMinIdle());
        hiveConfig.setConnectionTimeout(config.getConnectionTimeout());
        hiveConfig.setIdleTimeout(config.getIdleTimeout());
        
        connectionPools.put("hive", new HikariDataSource(hiveConfig));
        
        // JDBC连接池
        for (JdbcDataSourceConfig jdbcConfig : config.getJdbcDataSources()) {
            HikariConfig jdbcHikariConfig = createJdbcConfig(jdbcConfig);
            connectionPools.put(jdbcConfig.getName(), new HikariDataSource(jdbcHikariConfig));
        }
    }
    
    public Connection getConnection(String dataSourceName) throws SQLException {
        HikariDataSource dataSource = connectionPools.get(dataSourceName);
        if (dataSource == null) {
            throw new IllegalArgumentException("Unknown data source: " + dataSourceName);
        }
        return dataSource.getConnection();
    }
}

4.2 Hive数据源配置

4.2.1 Hive连接配置

基础配置文件(kylin.properties):

# Hive数据源配置
kylin.source.hive.database-for-flat-table=default
kylin.source.hive.flat-table-storage-format=SEQUENCEFILE
kylin.source.hive.flat-table-field-delimiter=\u001F

# Hive JDBC配置
kylin.source.hive.connection-url=jdbc:hive2://hive-server:10000/default
kylin.source.hive.connection-user=kylin
kylin.source.hive.connection-password=kylin123
kylin.source.hive.connection-driver=org.apache.hive.jdbc.HiveDriver

# 连接池配置
kylin.source.hive.pool.max-total=50
kylin.source.hive.pool.max-idle=10
kylin.source.hive.pool.min-idle=5
kylin.source.hive.pool.max-wait-millis=30000

# Hive表扫描配置
kylin.source.hive.table-dir-create-first=true
kylin.source.hive.redistribute-flat-table=true
kylin.source.hive.flat-table-cluster-by-dict-column=true
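
配置完成后可以先在 Kylin 之外验证 HiveServer2 的连通性。下面是一个最小化的检查脚本草图,假设环境中安装了 PyHive(pip install "pyhive[hive]",并非 Kylin 的必需依赖);主机、端口和用户名与上面 connection-url 中的取值对应,认证方式(LDAP、Kerberos 等)需按实际 HiveServer2 配置补充参数:

#!/usr/bin/env python3
# check_hive_connection.py - 验证 HiveServer2 连通性(示意脚本,依赖 PyHive)

from pyhive import hive

HIVE_HOST = "hive-server"   # 对应 kylin.source.hive.connection-url 中的主机
HIVE_PORT = 10000
HIVE_USER = "kylin"


def check_hive():
    conn = hive.Connection(host=HIVE_HOST, port=HIVE_PORT,
                           username=HIVE_USER, database="default")
    try:
        cursor = conn.cursor()
        cursor.execute("SHOW DATABASES")
        print("HiveServer2 连接正常,数据库列表:")
        for (db,) in cursor.fetchall():
            print(f"  {db}")
    finally:
        conn.close()


if __name__ == "__main__":
    check_hive()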

高级Hive配置:

<!-- hive-site.xml 优化配置 -->
<configuration>
    <!-- 启用向量化执行 -->
    <property>
        <name>hive.vectorized.execution.enabled</name>
        <value>true</value>
    </property>
    
    <!-- 启用CBO优化器 -->
    <property>
        <name>hive.cbo.enable</name>
        <value>true</value>
    </property>
    
    <!-- 启用统计信息 -->
    <property>
        <name>hive.stats.autogather</name>
        <value>true</value>
    </property>
    
    <!-- 动态分区配置 -->
    <property>
        <name>hive.exec.dynamic.partition</name>
        <value>true</value>
    </property>
    
    <property>
        <name>hive.exec.dynamic.partition.mode</name>
        <value>nonstrict</value>
    </property>
    
    <!-- 压缩配置 -->
    <property>
        <name>hive.exec.compress.output</name>
        <value>true</value>
    </property>
    
    <property>
        <name>mapred.output.compression.codec</name>
        <value>org.apache.hadoop.io.compress.SnappyCodec</value>
    </property>
</configuration>
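
如果暂时无法修改集群级的 hive-site.xml,也可以在会话级别传入同样的参数做对比测试。下面的草图同样假设使用 PyHive,其 configuration 参数会在打开会话时以 set key=value 的形式生效,键值与上面的 XML 配置一致(sample_table 为示意表名):

#!/usr/bin/env python3
# hive_session_tuning.py - 会话级 Hive 优化参数示例(示意脚本,依赖 PyHive)

from pyhive import hive

# 与 hive-site.xml 中的优化项保持一致,仅作用于当前会话
SESSION_CONF = {
    "hive.vectorized.execution.enabled": "true",
    "hive.cbo.enable": "true",
    "hive.exec.dynamic.partition": "true",
    "hive.exec.dynamic.partition.mode": "nonstrict",
}


def run_with_tuning(sql: str):
    conn = hive.Connection(host="hive-server", port=10000,
                           username="kylin", configuration=SESSION_CONF)
    try:
        cursor = conn.cursor()
        cursor.execute(sql)
        return cursor.fetchall()
    finally:
        conn.close()


if __name__ == "__main__":
    # 对比开启/关闭优化参数时同一条 SQL 的执行耗时即可评估收益
    print(run_with_tuning("SELECT COUNT(*) FROM default.sample_table"))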

4.2.2 Hive表管理

表发现与同步脚本:

#!/bin/bash
# hive_table_sync.sh - Hive表同步脚本

KYLIN_HOME="/opt/kylin"
HIVE_DATABASE="$1"
TABLE_PATTERN="$2"

if [ -z "$HIVE_DATABASE" ]; then
    echo "用法: $0 <database> [table_pattern]"
    echo "示例: $0 sales_db sales_*"
    exit 1
fi

echo "开始同步Hive数据库: $HIVE_DATABASE"

# 获取表列表
if [ -n "$TABLE_PATTERN" ]; then
    TABLES=$(hive -e "SHOW TABLES IN $HIVE_DATABASE LIKE '$TABLE_PATTERN';" 2>/dev/null)
else
    TABLES=$(hive -e "SHOW TABLES IN $HIVE_DATABASE;" 2>/dev/null)
fi

if [ -z "$TABLES" ]; then
    echo "未找到匹配的表"
    exit 1
fi

echo "发现表: $TABLES"

# 同步每个表
for table in $TABLES; do
    echo "正在同步表: $HIVE_DATABASE.$table"
    
    # 获取表结构
    hive -e "DESCRIBE FORMATTED $HIVE_DATABASE.$table;" > "/tmp/${table}_schema.txt"
    
    # 检查表是否已存在于Kylin中
    table_exists=$(curl -s -X GET \
        -H "Authorization: Basic QURNSU46S1lMSU4=" \
        "http://localhost:7070/kylin/api/tables/$HIVE_DATABASE.$table" | \
        jq -r '.name // "null"')
    
    if [ "$table_exists" = "null" ]; then
        echo "添加新表到Kylin: $HIVE_DATABASE.$table"
        
        # 加载表到Kylin
        curl -X POST \
            -H "Authorization: Basic QURNSU46S1lMSU4=" \
            -H "Content-Type: application/json" \
            -d "{
                \"tables\": [\"$HIVE_DATABASE.$table\"],
                \"project\": \"default\"
            }" \
            "http://localhost:7070/kylin/api/tables/$HIVE_DATABASE.$table/hive"
    else
        echo "表已存在,检查是否需要更新: $HIVE_DATABASE.$table"
        
        # 重新加载表结构
        curl -X PUT \
            -H "Authorization: Basic QURNSU46S1lMSU4=" \
            "http://localhost:7070/kylin/api/tables/$HIVE_DATABASE.$table/hive"
    fi
    
    echo "表同步完成: $HIVE_DATABASE.$table"
    echo "---"
done

echo "Hive表同步完成"

表结构分析脚本:

#!/usr/bin/env python3
# hive_table_analyzer.py - Hive表结构分析

import sys
import json
import subprocess
from collections import defaultdict

class HiveTableAnalyzer:
    def __init__(self, database):
        self.database = database
        self.tables_info = {}
    
    def analyze_database(self):
        """分析整个数据库"""
        print(f"分析Hive数据库: {self.database}")
        
        # 获取所有表
        tables = self.get_tables()
        
        for table in tables:
            print(f"分析表: {table}")
            self.analyze_table(table)
        
        # 生成分析报告
        self.generate_report()
    
    def get_tables(self):
        """获取数据库中的所有表"""
        cmd = f"hive -e 'SHOW TABLES IN {self.database};'"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"错误: {result.stderr}")
            return []
        
        return [table.strip() for table in result.stdout.split('\n') if table.strip()]
    
    def analyze_table(self, table):
        """分析单个表"""
        table_info = {
            'name': table,
            'columns': [],
            'partitions': [],
            'storage_format': '',
            'location': '',
            'size': 0,
            'row_count': 0
        }
        
        # 获取表结构
        cmd = f"hive -e 'DESCRIBE FORMATTED {self.database}.{table};'"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"错误: 无法获取表 {table} 的结构")
            return
        
        # 解析表结构
        lines = result.stdout.split('\n')
        current_section = 'columns'
        
        for line in lines:
            line = line.strip()
            if not line:
                continue
            
            if line.startswith('# Partition Information'):
                current_section = 'partitions'
                continue
            elif line.startswith('# Detailed Table Information'):
                current_section = 'details'
                continue
            
            if current_section == 'columns' and '\t' in line:
                parts = line.split('\t')
                if len(parts) >= 2 and not line.startswith('#'):
                    column_info = {
                        'name': parts[0].strip(),
                        'type': parts[1].strip(),
                        'comment': parts[2].strip() if len(parts) > 2 else ''
                    }
                    table_info['columns'].append(column_info)
            
            elif current_section == 'partitions' and '\t' in line:
                parts = line.split('\t')
                if len(parts) >= 2 and not line.startswith('#'):
                    partition_info = {
                        'name': parts[0].strip(),
                        'type': parts[1].strip()
                    }
                    table_info['partitions'].append(partition_info)
            
            elif current_section == 'details':
                if 'InputFormat:' in line:
                    table_info['storage_format'] = line.split(':', 1)[1].strip()
                elif 'Location:' in line:
                    table_info['location'] = line.split(':', 1)[1].strip()
        
        # 获取表统计信息
        self.get_table_stats(table, table_info)
        
        self.tables_info[table] = table_info
    
    def get_table_stats(self, table, table_info):
        """获取表统计信息"""
        try:
            # 获取行数
            cmd = f"hive -e 'SELECT COUNT(*) FROM {self.database}.{table};'"
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=300)
            
            if result.returncode == 0:
                lines = result.stdout.strip().split('\n')
                for line in reversed(lines):
                    if line.strip().isdigit():
                        table_info['row_count'] = int(line.strip())
                        break
        except subprocess.TimeoutExpired:
            print(f"警告: 获取表 {table} 行数超时")
        except Exception as e:
            print(f"警告: 获取表 {table} 统计信息失败: {e}")
    
    def generate_report(self):
        """生成分析报告"""
        report = {
            'database': self.database,
            'total_tables': len(self.tables_info),
            'tables': self.tables_info,
            'summary': self.generate_summary()
        }
        
        # 保存报告
        report_file = f"{self.database}_analysis_report.json"
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        
        print(f"\n分析报告已保存到: {report_file}")
        
        # 打印摘要
        self.print_summary(report['summary'])
    
    def generate_summary(self):
        """生成摘要信息"""
        summary = {
            'total_tables': len(self.tables_info),
            'total_columns': 0,
            'partitioned_tables': 0,
            'storage_formats': defaultdict(int),
            'column_types': defaultdict(int),
            'large_tables': [],  # 行数 > 1000万
            'potential_fact_tables': [],  # 可能的事实表
            'potential_dim_tables': []   # 可能的维度表
        }
        
        for table_name, table_info in self.tables_info.items():
            # 统计列数
            summary['total_columns'] += len(table_info['columns'])
            
            # 统计分区表
            if table_info['partitions']:
                summary['partitioned_tables'] += 1
            
            # 统计存储格式
            storage_format = table_info['storage_format']
            if storage_format:
                summary['storage_formats'][storage_format] += 1
            
            # 统计列类型
            for column in table_info['columns']:
                col_type = column['type'].lower()
                if 'decimal' in col_type or 'double' in col_type or 'float' in col_type:
                    summary['column_types']['numeric'] += 1
                elif 'int' in col_type or 'bigint' in col_type:
                    summary['column_types']['integer'] += 1
                elif 'string' in col_type or 'varchar' in col_type:
                    summary['column_types']['string'] += 1
                elif 'date' in col_type or 'timestamp' in col_type:
                    summary['column_types']['datetime'] += 1
                else:
                    summary['column_types']['other'] += 1
            
            # 识别大表
            if table_info['row_count'] > 10000000:  # 1000万行
                summary['large_tables'].append({
                    'name': table_name,
                    'row_count': table_info['row_count']
                })
            
            # 简单的表类型识别
            numeric_columns = sum(1 for col in table_info['columns'] 
                                if any(t in col['type'].lower() 
                                     for t in ['decimal', 'double', 'float', 'int']))
            
            if numeric_columns > len(table_info['columns']) * 0.3:  # 数值列占比 > 30%
                summary['potential_fact_tables'].append(table_name)
            else:
                summary['potential_dim_tables'].append(table_name)
        
        return summary
    
    def print_summary(self, summary):
        """打印摘要信息"""
        print("\n=== 数据库分析摘要 ===")
        print(f"数据库: {self.database}")
        print(f"总表数: {summary['total_tables']}")
        print(f"总列数: {summary['total_columns']}")
        print(f"分区表数: {summary['partitioned_tables']}")
        
        print("\n存储格式分布:")
        for format_type, count in summary['storage_formats'].items():
            print(f"  {format_type}: {count}")
        
        print("\n列类型分布:")
        for col_type, count in summary['column_types'].items():
            print(f"  {col_type}: {count}")
        
        if summary['large_tables']:
            print("\n大表 (>1000万行):")
            for table in summary['large_tables']:
                print(f"  {table['name']}: {table['row_count']:,} 行")
        
        print(f"\n可能的事实表 ({len(summary['potential_fact_tables'])}):")
        for table in summary['potential_fact_tables'][:5]:  # 只显示前5个
            print(f"  {table}")
        
        print(f"\n可能的维度表 ({len(summary['potential_dim_tables'])}):")
        for table in summary['potential_dim_tables'][:5]:  # 只显示前5个
            print(f"  {table}")

def main():
    if len(sys.argv) != 2:
        print("用法: python3 hive_table_analyzer.py <database>")
        sys.exit(1)
    
    database = sys.argv[1]
    analyzer = HiveTableAnalyzer(database)
    analyzer.analyze_database()

if __name__ == "__main__":
    main()

4.6.2 数据源性能监控

数据源性能监控配置:

# 数据源性能监控
kylin.datasource.monitor.enabled=true
kylin.datasource.monitor.interval=60000
kylin.datasource.monitor.metrics.enabled=true
kylin.datasource.monitor.slow-query.threshold=5000

# 连接池监控
kylin.datasource.pool.monitor.enabled=true
kylin.datasource.pool.monitor.log-abandoned=true
kylin.datasource.pool.monitor.remove-abandoned=true
kylin.datasource.pool.monitor.remove-abandoned-timeout=300

性能监控脚本:

#!/usr/bin/env python3
# datasource_performance_monitor.py - 数据源性能监控

import time
import json
import logging
import threading
from datetime import datetime, timedelta
import psutil
import requests
from collections import defaultdict, deque

class DataSourcePerformanceMonitor:
    def __init__(self, config_file):
        self.config = self.load_config(config_file)
        self.setup_logging()
        self.metrics = defaultdict(lambda: deque(maxlen=1000))
        self.running = False
        self.monitor_thread = None
    
    def load_config(self, config_file):
        """加载配置文件"""
        with open(config_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    
    def setup_logging(self):
        """设置日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def collect_system_metrics(self):
        """收集系统指标"""
        timestamp = datetime.now()
        
        # CPU使用率
        cpu_percent = psutil.cpu_percent(interval=1)
        self.metrics['system_cpu'].append({
            'timestamp': timestamp.isoformat(),
            'value': cpu_percent
        })
        
        # 内存使用率
        memory = psutil.virtual_memory()
        self.metrics['system_memory'].append({
            'timestamp': timestamp.isoformat(),
            'value': memory.percent
        })
        
        # 磁盘I/O
        disk_io = psutil.disk_io_counters()
        if disk_io:
            self.metrics['disk_read_bytes'].append({
                'timestamp': timestamp.isoformat(),
                'value': disk_io.read_bytes
            })
            self.metrics['disk_write_bytes'].append({
                'timestamp': timestamp.isoformat(),
                'value': disk_io.write_bytes
            })
        
        # 网络I/O
        net_io = psutil.net_io_counters()
        if net_io:
            self.metrics['network_bytes_sent'].append({
                'timestamp': timestamp.isoformat(),
                'value': net_io.bytes_sent
            })
            self.metrics['network_bytes_recv'].append({
                'timestamp': timestamp.isoformat(),
                'value': net_io.bytes_recv
            })
    
    def collect_kylin_metrics(self):
        """收集Kylin指标"""
        try:
            kylin_config = self.config.get('kylin', {})
            base_url = kylin_config.get('base_url', 'http://localhost:7070')
            username = kylin_config.get('username', 'ADMIN')
            password = kylin_config.get('password', 'KYLIN')
            
            # 获取系统信息
            response = requests.get(
                f"{base_url}/kylin/api/admin/system",
                auth=(username, password),
                timeout=10
            )
            
            if response.status_code == 200:
                system_info = response.json()
                timestamp = datetime.now().isoformat()
                
                # JVM内存使用
                if 'jvm' in system_info:
                    jvm_info = system_info['jvm']
                    self.metrics['kylin_jvm_heap_used'].append({
                        'timestamp': timestamp,
                        'value': jvm_info.get('heapUsed', 0)
                    })
                    self.metrics['kylin_jvm_heap_max'].append({
                        'timestamp': timestamp,
                        'value': jvm_info.get('heapMax', 0)
                    })
                
                # 活跃查询数
                if 'activeQueries' in system_info:
                    self.metrics['kylin_active_queries'].append({
                        'timestamp': timestamp,
                        'value': system_info['activeQueries']
                    })
            
            # 获取查询统计
            response = requests.get(
                f"{base_url}/kylin/api/query/statistics",
                auth=(username, password),
                timeout=10
            )
            
            if response.status_code == 200:
                query_stats = response.json()
                timestamp = datetime.now().isoformat()
                
                self.metrics['kylin_query_count'].append({
                    'timestamp': timestamp,
                    'value': query_stats.get('totalQueries', 0)
                })
                self.metrics['kylin_avg_query_time'].append({
                    'timestamp': timestamp,
                    'value': query_stats.get('avgQueryTime', 0)
                })
                
        except Exception as e:
            self.logger.error(f"收集Kylin指标失败: {e}")
    
    def collect_datasource_metrics(self):
        """收集数据源指标"""
        # 收集Hive指标
        self.collect_hive_metrics()
        
        # 收集Kafka指标
        self.collect_kafka_metrics()
        
        # 收集JDBC指标
        self.collect_jdbc_metrics()
    
    def collect_hive_metrics(self):
        """收集Hive指标"""
        try:
            hive_config = self.config.get('hive', {})
            if not hive_config:
                return
            
            # 这里可以通过JMX或其他方式收集Hive指标
            # 示例:收集Hive Metastore连接数
            timestamp = datetime.now().isoformat()
            
            # 模拟指标收集
            self.metrics['hive_metastore_connections'].append({
                'timestamp': timestamp,
                'value': 10  # 实际应该从JMX获取
            })
            
        except Exception as e:
            self.logger.error(f"收集Hive指标失败: {e}")
    
    def collect_kafka_metrics(self):
        """收集Kafka指标"""
        try:
            kafka_config = self.config.get('kafka', {})
            if not kafka_config:
                return
            
            # 这里可以通过Kafka JMX收集指标
            timestamp = datetime.now().isoformat()
            
            # 模拟指标收集
            self.metrics['kafka_consumer_lag'].append({
                'timestamp': timestamp,
                'value': 100  # 实际应该从Kafka JMX获取
            })
            
        except Exception as e:
            self.logger.error(f"收集Kafka指标失败: {e}")
    
    def collect_jdbc_metrics(self):
        """收集JDBC指标"""
        try:
            jdbc_configs = self.config.get('jdbc_datasources', [])
            
            for jdbc_config in jdbc_configs:
                name = jdbc_config['name']
                timestamp = datetime.now().isoformat()
                
                # 模拟连接池指标
                self.metrics[f'jdbc_{name}_active_connections'].append({
                    'timestamp': timestamp,
                    'value': 5  # 实际应该从连接池获取
                })
                self.metrics[f'jdbc_{name}_idle_connections'].append({
                    'timestamp': timestamp,
                    'value': 15  # 实际应该从连接池获取
                })
                
        except Exception as e:
            self.logger.error(f"收集JDBC指标失败: {e}")
    
    def analyze_performance(self):
        """分析性能数据"""
        analysis_result = {
            'timestamp': datetime.now().isoformat(),
            'alerts': [],
            'recommendations': [],
            'summary': {}
        }
        
        # 分析CPU使用率
        if 'system_cpu' in self.metrics and self.metrics['system_cpu']:
            recent_cpu = [m['value'] for m in list(self.metrics['system_cpu'])[-10:]]
            avg_cpu = sum(recent_cpu) / len(recent_cpu)
            
            if avg_cpu > 80:
                analysis_result['alerts'].append({
                    'type': 'high_cpu',
                    'message': f'CPU使用率过高: {avg_cpu:.1f}%',
                    'severity': 'warning'
                })
                analysis_result['recommendations'].append(
                    '建议检查是否有大量并发查询或优化查询性能'
                )
        
        # 分析内存使用率
        if 'system_memory' in self.metrics and self.metrics['system_memory']:
            recent_memory = [m['value'] for m in list(self.metrics['system_memory'])[-10:]]
            avg_memory = sum(recent_memory) / len(recent_memory)
            
            if avg_memory > 85:
                analysis_result['alerts'].append({
                    'type': 'high_memory',
                    'message': f'内存使用率过高: {avg_memory:.1f}%',
                    'severity': 'critical'
                })
                analysis_result['recommendations'].append(
                    '建议增加内存或优化JVM堆大小配置'
                )
        
        # 分析查询性能
        if 'kylin_avg_query_time' in self.metrics and self.metrics['kylin_avg_query_time']:
            recent_query_time = [m['value'] for m in list(self.metrics['kylin_avg_query_time'])[-10:]]
            avg_query_time = sum(recent_query_time) / len(recent_query_time)
            
            if avg_query_time > 5000:  # 5秒
                analysis_result['alerts'].append({
                    'type': 'slow_query',
                    'message': f'平均查询时间过长: {avg_query_time:.0f}ms',
                    'severity': 'warning'
                })
                analysis_result['recommendations'].append(
                    '建议检查Cube设计和查询优化'
                )
        
        return analysis_result
    
    def generate_performance_report(self):
        """生成性能报告"""
        analysis = self.analyze_performance()
        
        report = {
            'timestamp': datetime.now().isoformat(),
            'period': '最近1小时',
            'analysis': analysis,
            'metrics_summary': self.get_metrics_summary()
        }
        
        # 保存报告
        report_file = f"performance_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        
        self.logger.info(f"性能报告已保存: {report_file}")
        
        # 打印摘要
        self.print_performance_summary(report)
        
        return report
    
    def get_metrics_summary(self):
        """获取指标摘要"""
        summary = {}
        
        for metric_name, metric_data in self.metrics.items():
            if metric_data:
                values = [m['value'] for m in metric_data]
                summary[metric_name] = {
                    'count': len(values),
                    'avg': sum(values) / len(values),
                    'min': min(values),
                    'max': max(values),
                    'latest': values[-1] if values else 0
                }
        
        return summary
    
    def print_performance_summary(self, report):
        """打印性能摘要"""
        print("\n=== 数据源性能监控报告 ===")
        print(f"报告时间: {report['timestamp']}")
        print(f"监控周期: {report['period']}")
        
        # 打印告警
        alerts = report['analysis']['alerts']
        if alerts:
            print("\n⚠️ 性能告警:")
            for alert in alerts:
                severity_icon = {
                    'critical': '🔴',
                    'warning': '🟡',
                    'info': '🔵'
                }.get(alert['severity'], '⚪')
                print(f"  {severity_icon} {alert['message']}")
        else:
            print("\n✅ 无性能告警")
        
        # 打印建议
        recommendations = report['analysis']['recommendations']
        if recommendations:
            print("\n💡 优化建议:")
            for i, rec in enumerate(recommendations, 1):
                print(f"  {i}. {rec}")
        
        # 打印关键指标
        metrics_summary = report['metrics_summary']
        print("\n📊 关键指标:")
        
        key_metrics = [
            ('system_cpu', 'CPU使用率', '%'),
            ('system_memory', '内存使用率', '%'),
            ('kylin_active_queries', '活跃查询数', '个'),
            ('kylin_avg_query_time', '平均查询时间', 'ms')
        ]
        
        for metric_key, metric_name, unit in key_metrics:
            if metric_key in metrics_summary:
                metric = metrics_summary[metric_key]
                print(f"  {metric_name}: {metric['latest']:.1f}{unit} (平均: {metric['avg']:.1f}{unit})")
    
    def start_monitoring(self):
        """开始监控"""
        self.running = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        self.logger.info("性能监控已启动")
    
    def stop_monitoring(self):
        """停止监控"""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join()
        self.logger.info("性能监控已停止")
    
    def _monitor_loop(self):
        """监控循环"""
        last_report_time = time.time()
        while self.running:
            try:
                # 收集指标
                self.collect_system_metrics()
                self.collect_kylin_metrics()
                self.collect_datasource_metrics()
                
                # 每10分钟生成一次报告(按距上次报告的时间判断,整除时间戳的写法几乎不会命中)
                if time.time() - last_report_time >= 600:
                    self.generate_performance_report()
                    last_report_time = time.time()
                
                time.sleep(60)  # 每分钟收集一次
                
            except Exception as e:
                self.logger.error(f"监控循环错误: {e}")
                time.sleep(60)

def main():
    import argparse
    import signal
    
    parser = argparse.ArgumentParser(description='数据源性能监控工具')
    parser.add_argument('--config', required=True, help='配置文件路径')
    parser.add_argument('--duration', type=int, default=3600, help='监控持续时间(秒)')
    
    args = parser.parse_args()
    
    monitor = DataSourcePerformanceMonitor(args.config)
    
    def signal_handler(signum, frame):
        print("\n正在停止监控...")
        monitor.stop_monitoring()
        monitor.generate_performance_report()
        exit(0)
    
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    monitor.start_monitoring()
    
    try:
        time.sleep(args.duration)
    except KeyboardInterrupt:
        pass
    finally:
        monitor.stop_monitoring()
        monitor.generate_performance_report()

if __name__ == "__main__":
    main()

4.7 故障排除

4.7.1 常见问题诊断

连接问题排查:

#!/bin/bash
# datasource_troubleshoot.sh - 数据源故障排查脚本

echo "=== 数据源连接故障排查 ==="

# 检查网络连通性
check_network() {
    local host=$1
    local port=$2
    
    echo "检查网络连通性: $host:$port"
    
    if command -v nc >/dev/null 2>&1; then
        if nc -z -w5 $host $port; then
            echo "✅ 网络连接正常"
        else
            echo "❌ 网络连接失败"
            return 1
        fi
    else
        if timeout 5 bash -c "</dev/tcp/$host/$port"; then
            echo "✅ 网络连接正常"
        else
            echo "❌ 网络连接失败"
            return 1
        fi
    fi
}

# 检查DNS解析
check_dns() {
    local hostname=$1
    
    echo "检查DNS解析: $hostname"
    
    if nslookup $hostname >/dev/null 2>&1; then
        echo "✅ DNS解析正常"
        nslookup $hostname | grep "Address:" | tail -n +2
    else
        echo "❌ DNS解析失败"
        return 1
    fi
}

# 检查Hive连接
check_hive() {
    echo "\n=== 检查Hive连接 ==="
    
    local hive_host=${HIVE_HOST:-"localhost"}
    local hive_port=${HIVE_PORT:-"10000"}
    local metastore_port=${METASTORE_PORT:-"9083"}
    
    # 检查HiveServer2
    echo "检查HiveServer2连接:"
    check_network $hive_host $hive_port
    
    # 检查Metastore
    echo "检查Metastore连接:"
    check_network $hive_host $metastore_port
    
    # 检查Hive服务状态
    echo "检查Hive服务状态:"
    if command -v beeline >/dev/null 2>&1; then
        if beeline -u "jdbc:hive2://$hive_host:$hive_port" -e "SHOW DATABASES;" >/dev/null 2>&1; then
            echo "✅ Hive服务正常"
        else
            echo "❌ Hive服务异常"
        fi
    else
        echo "⚠️ beeline命令不可用,无法测试Hive连接"
    fi
}

# 检查Kafka连接
check_kafka() {
    echo "\n=== 检查Kafka连接 ==="
    
    local kafka_host=${KAFKA_HOST:-"localhost"}
    local kafka_port=${KAFKA_PORT:-"9092"}
    
    # 检查Kafka Broker
    echo "检查Kafka Broker连接:"
    check_network $kafka_host $kafka_port
    
    # 检查Kafka服务状态
    if command -v kafka-topics.sh >/dev/null 2>&1; then
        echo "检查Kafka主题列表:"
        if kafka-topics.sh --bootstrap-server $kafka_host:$kafka_port --list >/dev/null 2>&1; then
            echo "✅ Kafka服务正常"
            kafka-topics.sh --bootstrap-server $kafka_host:$kafka_port --list | head -5
        else
            echo "❌ Kafka服务异常"
        fi
    else
        echo "⚠️ kafka-topics.sh命令不可用,无法测试Kafka连接"
    fi
}

# 检查JDBC连接
check_jdbc() {
    echo "\n=== 检查JDBC连接 ==="
    
    # MySQL连接检查
    if [ ! -z "$MYSQL_HOST" ]; then
        echo "检查MySQL连接:"
        check_network ${MYSQL_HOST} ${MYSQL_PORT:-3306}
        
        if command -v mysql >/dev/null 2>&1; then
            if mysql -h${MYSQL_HOST} -P${MYSQL_PORT:-3306} -u${MYSQL_USER} -p${MYSQL_PASSWORD} -e "SELECT 1;" >/dev/null 2>&1; then
                echo "✅ MySQL连接正常"
            else
                echo "❌ MySQL连接失败"
            fi
        fi
    fi
    
    # PostgreSQL连接检查
    if [ ! -z "$POSTGRES_HOST" ]; then
        echo "检查PostgreSQL连接:"
        check_network ${POSTGRES_HOST} ${POSTGRES_PORT:-5432}
        
        if command -v psql >/dev/null 2>&1; then
            if PGPASSWORD=${POSTGRES_PASSWORD} psql -h ${POSTGRES_HOST} -p ${POSTGRES_PORT:-5432} -U ${POSTGRES_USER} -d ${POSTGRES_DB} -c "SELECT 1;" >/dev/null 2>&1; then
                echo "✅ PostgreSQL连接正常"
            else
                echo "❌ PostgreSQL连接失败"
            fi
        fi
    fi
}

# 检查HDFS连接
check_hdfs() {
    echo "\n=== 检查HDFS连接 ==="
    
    local hdfs_host=${HDFS_HOST:-"localhost"}
    local hdfs_port=${HDFS_PORT:-"9000"}
    
    # 检查NameNode
    echo "检查HDFS NameNode连接:"
    check_network $hdfs_host $hdfs_port
    
    # 检查HDFS服务状态
    if command -v hdfs >/dev/null 2>&1; then
        echo "检查HDFS文件系统:"
        if hdfs dfs -ls / >/dev/null 2>&1; then
            echo "✅ HDFS服务正常"
        else
            echo "❌ HDFS服务异常"
        fi
    else
        echo "⚠️ hdfs命令不可用,无法测试HDFS连接"
    fi
}

# 检查系统资源
check_system_resources() {
    echo "\n=== 检查系统资源 ==="
    
    # 检查磁盘空间
    echo "磁盘使用情况:"
    df -h | grep -E "(Filesystem|/dev/)"
    
    # 检查内存使用
    echo "\n内存使用情况:"
    free -h
    
    # 检查CPU负载
    echo "\nCPU负载:"
    uptime
    
    # 检查网络连接
    echo "\n网络连接统计:"
    netstat -an | awk '/^tcp/ {print $6}' | sort | uniq -c | sort -nr
}

# 主函数
main() {
    echo "开始数据源故障排查..."
    echo "时间: $(date)"
    
    check_system_resources
    check_hive
    check_kafka
    check_jdbc
    check_hdfs
    
    echo "\n=== 故障排查完成 ==="
}

# 执行主函数
main

4.7.2 日志分析

日志分析脚本:

#!/usr/bin/env python3
# log_analyzer.py - 数据源日志分析工具

import re
import json
import argparse
from datetime import datetime
from collections import defaultdict, Counter

class LogAnalyzer:
    def __init__(self):
        self.error_patterns = {
            'connection_timeout': r'Connection.*timeout',
            'connection_refused': r'Connection refused',
            'out_of_memory': r'OutOfMemoryError|Java heap space',
            'sql_exception': r'SQLException|SQL.*Error',
            'hive_error': r'HiveException|Hive.*Error',
            'kafka_error': r'KafkaException|Kafka.*Error',
            'hdfs_error': r'HDFSException|HDFS.*Error'
        }
        
        self.warning_patterns = {
            'slow_query': r'Query.*took.*[0-9]+.*seconds',
            'high_memory': r'Memory usage.*[8-9][0-9]%|Memory usage.*100%',
            'connection_pool': r'Connection pool.*exhausted|Pool.*full'
        }
    
    def parse_log_line(self, line):
        """解析日志行"""
        # 通用日志格式: YYYY-MM-DD HH:MM:SS LEVEL [THREAD] CLASS - MESSAGE
        log_pattern = r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}[,.]\d{3})\s+(\w+)\s+\[([^\]]+)\]\s+([^\s]+)\s+-\s+(.*)'
        
        match = re.match(log_pattern, line)
        if match:
            timestamp_str, level, thread, class_name, message = match.groups()
            
            # 解析时间戳
            try:
                timestamp = datetime.strptime(timestamp_str.replace(',', '.'), '%Y-%m-%d %H:%M:%S.%f')
            except ValueError:
                timestamp = datetime.strptime(timestamp_str.split(',')[0], '%Y-%m-%d %H:%M:%S')
            
            return {
                'timestamp': timestamp,
                'level': level,
                'thread': thread,
                'class': class_name,
                'message': message,
                'raw_line': line
            }
        
        return None
    
    def analyze_errors(self, log_entries):
        """分析错误日志"""
        error_analysis = {
            'error_counts': defaultdict(int),
            'error_timeline': defaultdict(list),
            'error_details': defaultdict(list)
        }
        
        for entry in log_entries:
            if entry['level'] in ['ERROR', 'FATAL']:
                message = entry['message']
                
                # 匹配错误模式
                for error_type, pattern in self.error_patterns.items():
                    if re.search(pattern, message, re.IGNORECASE):
                        error_analysis['error_counts'][error_type] += 1
                        error_analysis['error_timeline'][error_type].append(entry['timestamp'])
                        error_analysis['error_details'][error_type].append({
                            'timestamp': entry['timestamp'],
                            'message': message,
                            'thread': entry['thread'],
                            'class': entry['class']
                        })
                        break
                else:
                    # 未分类的错误
                    error_analysis['error_counts']['other'] += 1
                    error_analysis['error_timeline']['other'].append(entry['timestamp'])
                    error_analysis['error_details']['other'].append({
                        'timestamp': entry['timestamp'],
                        'message': message,
                        'thread': entry['thread'],
                        'class': entry['class']
                    })
        
        return error_analysis
    
    def analyze_warnings(self, log_entries):
        """分析警告日志"""
        warning_analysis = {
            'warning_counts': defaultdict(int),
            'warning_timeline': defaultdict(list),
            'warning_details': defaultdict(list)
        }
        
        for entry in log_entries:
            if entry['level'] == 'WARN':
                message = entry['message']
                
                # 匹配警告模式
                for warning_type, pattern in self.warning_patterns.items():
                    if re.search(pattern, message, re.IGNORECASE):
                        warning_analysis['warning_counts'][warning_type] += 1
                        warning_analysis['warning_timeline'][warning_type].append(entry['timestamp'])
                        warning_analysis['warning_details'][warning_type].append({
                            'timestamp': entry['timestamp'],
                            'message': message,
                            'thread': entry['thread'],
                            'class': entry['class']
                        })
                        break
        
        return warning_analysis
    
    def analyze_performance(self, log_entries):
        """分析性能日志"""
        performance_analysis = {
            'query_times': [],
            'slow_queries': [],
            'memory_usage': [],
            'connection_stats': defaultdict(int)
        }
        
        for entry in log_entries:
            message = entry['message']
            
            # 提取查询时间
            query_time_match = re.search(r'Query.*took\s+(\d+(?:\.\d+)?)\s*(?:seconds?|ms)', message, re.IGNORECASE)
            if query_time_match:
                query_time = float(query_time_match.group(1))
                if 'ms' not in message.lower():
                    query_time *= 1000  # 转换为毫秒
                
                performance_analysis['query_times'].append({
                    'timestamp': entry['timestamp'],
                    'duration': query_time
                })
                
                if query_time > 5000:  # 超过5秒的慢查询
                    performance_analysis['slow_queries'].append({
                        'timestamp': entry['timestamp'],
                        'duration': query_time,
                        'message': message
                    })
            
            # 提取内存使用
            memory_match = re.search(r'Memory usage[:\s]+(\d+(?:\.\d+)?)%', message, re.IGNORECASE)
            if memory_match:
                memory_usage = float(memory_match.group(1))
                performance_analysis['memory_usage'].append({
                    'timestamp': entry['timestamp'],
                    'usage': memory_usage
                })
            
            # 统计连接相关信息
            if 'connection' in message.lower():
                if 'created' in message.lower():
                    performance_analysis['connection_stats']['created'] += 1
                elif 'closed' in message.lower():
                    performance_analysis['connection_stats']['closed'] += 1
                elif 'timeout' in message.lower():
                    performance_analysis['connection_stats']['timeout'] += 1
        
        return performance_analysis
    
    def generate_report(self, log_file, output_file=None):
        """生成分析报告"""
        print(f"分析日志文件: {log_file}")
        
        # 读取日志文件
        log_entries = []
        with open(log_file, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                entry = self.parse_log_line(line.strip())
                if entry:
                    log_entries.append(entry)
                elif line_num <= 10:  # 只显示前10行解析失败的信息
                    print(f"警告: 第{line_num}行解析失败: {line[:100]}...")
        
        print(f"成功解析 {len(log_entries)} 条日志记录")
        
        if not log_entries:
            print("没有找到有效的日志记录")
            return
        
        # 分析日志
        error_analysis = self.analyze_errors(log_entries)
        warning_analysis = self.analyze_warnings(log_entries)
        performance_analysis = self.analyze_performance(log_entries)
        
        # 生成报告
        report = {
            'analysis_time': datetime.now().isoformat(),
            'log_file': log_file,
            'total_entries': len(log_entries),
            'time_range': {
                'start': min(entry['timestamp'] for entry in log_entries).isoformat(),
                'end': max(entry['timestamp'] for entry in log_entries).isoformat()
            },
            'level_distribution': dict(Counter(entry['level'] for entry in log_entries)),
            'error_analysis': {
                'total_errors': sum(error_analysis['error_counts'].values()),
                'error_types': dict(error_analysis['error_counts']),
                'top_errors': self.get_top_errors(error_analysis)
            },
            'warning_analysis': {
                'total_warnings': sum(warning_analysis['warning_counts'].values()),
                'warning_types': dict(warning_analysis['warning_counts'])
            },
            'performance_analysis': {
                'total_queries': len(performance_analysis['query_times']),
                'slow_queries': len(performance_analysis['slow_queries']),
                'avg_query_time': self.calculate_avg_query_time(performance_analysis['query_times']),
                'connection_stats': dict(performance_analysis['connection_stats'])
            }
        }
        
        # 保存报告
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2, ensure_ascii=False, default=str)
            print(f"分析报告已保存到: {output_file}")
        
        # 打印摘要
        self.print_report_summary(report)
        
        return report
    
    def get_top_errors(self, error_analysis, top_n=5):
        """获取最频繁的错误"""
        top_errors = []
        
        for error_type, count in sorted(error_analysis['error_counts'].items(), 
                                      key=lambda x: x[1], reverse=True)[:top_n]:
            if error_type in error_analysis['error_details']:
                latest_error = max(error_analysis['error_details'][error_type], 
                                 key=lambda x: x['timestamp'])
                top_errors.append({
                    'type': error_type,
                    'count': count,
                    'latest_message': latest_error['message'][:200],
                    'latest_time': latest_error['timestamp'].isoformat()
                })
        
        return top_errors
    
    def calculate_avg_query_time(self, query_times):
        """计算平均查询时间"""
        if not query_times:
            return 0
        
        total_time = sum(qt['duration'] for qt in query_times)
        return total_time / len(query_times)
    
    def print_report_summary(self, report):
        """打印报告摘要"""
        print("\n=== 日志分析报告 ===")
        print(f"分析时间: {report['analysis_time']}")
        print(f"日志文件: {report['log_file']}")
        print(f"总记录数: {report['total_entries']}")
        print(f"时间范围: {report['time_range']['start']} ~ {report['time_range']['end']}")
        
        # 日志级别分布
        print("\n📊 日志级别分布:")
        for level, count in sorted(report['level_distribution'].items()):
            print(f"  {level}: {count}")
        
        # 错误分析
        error_analysis = report['error_analysis']
        print(f"\n❌ 错误分析 (总计: {error_analysis['total_errors']})")
        if error_analysis['error_types']:
            for error_type, count in sorted(error_analysis['error_types'].items(), 
                                          key=lambda x: x[1], reverse=True):
                print(f"  {error_type}: {count}")
        else:
            print("  无错误记录")
        
        # 性能分析
        perf_analysis = report['performance_analysis']
        print(f"\n⚡ 性能分析:")
        print(f"  总查询数: {perf_analysis['total_queries']}")
        print(f"  慢查询数: {perf_analysis['slow_queries']}")
        if perf_analysis['avg_query_time'] > 0:
            print(f"  平均查询时间: {perf_analysis['avg_query_time']:.2f}ms")
        
        # 连接统计
        conn_stats = perf_analysis['connection_stats']
        if conn_stats:
            print(f"  连接统计: 创建{conn_stats.get('created', 0)} 关闭{conn_stats.get('closed', 0)} 超时{conn_stats.get('timeout', 0)}")

def main():
    parser = argparse.ArgumentParser(description='数据源日志分析工具')
    parser.add_argument('log_file', help='日志文件路径')
    parser.add_argument('--output', '-o', help='输出报告文件路径')
    
    args = parser.parse_args()
    
    analyzer = LogAnalyzer()
    analyzer.generate_report(args.log_file, args.output)

if __name__ == "__main__":
    main()

4.8 本章小结

本章详细介绍了Apache Kylin的数据源管理与连接,主要内容包括:

核心知识点

  1. 数据源类型

    • Hive数据源:主要的批处理数据源
    • Kafka数据源:流式数据处理
    • JDBC数据源:关系型数据库集成
    • 文件数据源:Parquet、ORC格式支持
  2. 连接配置

    • 连接池管理和优化
    • 安全认证配置
    • 性能调优参数
  3. 数据同步

    • 增量数据同步策略
    • 批量数据处理
    • 实时流数据处理
  4. 监控管理

    • 健康检查机制
    • 性能监控指标
    • 故障诊断工具

最佳实践

  1. 连接池配置:合理设置连接池大小和超时参数
  2. 安全配置:启用SSL/TLS加密和身份认证
  3. 性能优化:根据数据特点选择合适的数据源类型
  4. 监控告警:建立完善的监控和告警机制

实用工具

本章提供了多个实用脚本:

  • Hive表同步和分析工具
  • Kafka主题管理和流处理工具
  • JDBC数据同步工具
  • 数据源健康检查工具
  • 性能监控工具
  • 故障排查和日志分析工具

下一章预告

下一章将介绍数据模型设计与管理,包括:

  • 维度建模理论
  • 星型模式和雪花模式设计
  • Kylin数据模型创建
  • 维度和度量定义
  • 模型优化策略

练习与思考

理论练习

  1. 比较Hive、Kafka、JDBC三种数据源的特点和适用场景
  2. 解释连接池的工作原理和配置要点
  3. 分析流式数据处理和批处理的区别

实践练习

  1. 配置一个Hive数据源,包括安全认证和性能优化
  2. 设置Kafka流式数据源,实现实时数据处理
  3. 配置JDBC数据源连接MySQL数据库
  4. 使用提供的脚本进行数据源健康检查
  5. 分析Kylin日志文件,识别性能瓶颈

思考题

  1. 如何设计一个支持多种数据源的统一数据接入架构?
  2. 在大数据量场景下,如何优化数据源连接性能?
  3. 如何实现数据源的故障自动切换和恢复?
  4. 流式数据和批处理数据如何在Kylin中协同工作?

4.2.3 Hive性能优化

查询优化配置:

# Hive查询优化
# 注意:kylin.properties 是标准 Java properties 文件,同名配置项后出现的会覆盖先出现的,
# 因此所有 --hiveconf 参数必须合并写在同一个 beeline-params 中(行尾反斜杠表示续行)
kylin.source.hive.beeline-shell=/opt/hive/bin/beeline
kylin.source.hive.beeline-params=-u jdbc:hive2://localhost:10000 \
  --hiveconf hive.security.authorization.sqlstd.confwhitelist.append=mapreduce.job.queuename|mapreduce.job.tags \
  --hiveconf hive.execution.engine=tez \
  --hiveconf hive.vectorized.execution.enabled=true \
  --hiveconf hive.cbo.enable=true \
  --hiveconf hive.optimize.ppd=true

# 各 --hiveconf 参数含义:
#   hive.execution.engine=tez                 启用Tez执行引擎
#   hive.vectorized.execution.enabled=true    启用向量化执行
#   hive.cbo.enable=true                      启用CBO优化器
#   hive.optimize.ppd=true                    启用谓词下推

Hive表优化脚本:

#!/bin/bash
# optimize_hive_tables.sh - Hive表优化脚本

DATABASE="$1"
TABLE="$2"

if [ -z "$DATABASE" ] || [ -z "$TABLE" ]; then
    echo "用法: $0 <database> <table>"
    exit 1
fi

echo "优化Hive表: $DATABASE.$TABLE"

# 1. 收集表统计信息
echo "收集表统计信息..."
hive -e "ANALYZE TABLE $DATABASE.$TABLE COMPUTE STATISTICS;"

# 2. 收集列统计信息
echo "收集列统计信息..."
# 只取空行(分区信息段)之前的列,避免分区列被重复加入统计
COLUMNS=$(hive -e "DESCRIBE $DATABASE.$TABLE;" | awk 'NF==0{exit} !/^#/{print $1}' | tr '\n' ',')
COLUMNS=${COLUMNS%,}  # 移除最后的逗号

hive -e "ANALYZE TABLE $DATABASE.$TABLE COMPUTE STATISTICS FOR COLUMNS $COLUMNS;"

# 3. 优化文件格式(转换为ORC)
echo "检查表格式..."
FORMAT=$(hive -e "DESCRIBE FORMATTED $DATABASE.$TABLE;" | grep 'InputFormat:' | awk '{print $2}')

if [[ "$FORMAT" != *"OrcInputFormat"* ]]; then
    echo "转换表格式为ORC..."
    
    # 创建ORC格式的新表
    hive -e "
        CREATE TABLE ${DATABASE}.${TABLE}_orc
        STORED AS ORC
        TBLPROPERTIES ('orc.compress'='SNAPPY')
        AS SELECT * FROM $DATABASE.$TABLE;
    "
    
    # 备份原表
    hive -e "ALTER TABLE $DATABASE.$TABLE RENAME TO ${DATABASE}.${TABLE}_backup;"
    
    # 重命名新表
    hive -e "ALTER TABLE ${DATABASE}.${TABLE}_orc RENAME TO $DATABASE.$TABLE;"
    
    echo "表格式转换完成"
else
    echo "表已经是ORC格式"
fi

# 4. 压缩小文件
echo "检查小文件..."
FILE_COUNT=$(hadoop fs -ls /warehouse/$DATABASE.db/$TABLE/ 2>/dev/null | wc -l)

if [ "$FILE_COUNT" -gt 100 ]; then
    echo "发现小文件过多,进行合并..."
    
    hive -e "
        SET hive.merge.mapfiles=true;
        SET hive.merge.mapredfiles=true;
        SET hive.merge.size.per.task=256000000;
        SET hive.merge.smallfiles.avgsize=128000000;
        
        INSERT OVERWRITE TABLE $DATABASE.$TABLE
        SELECT * FROM $DATABASE.$TABLE;
    "
    
    echo "小文件合并完成"
fi

echo "表优化完成: $DATABASE.$TABLE"

4.3 Kafka数据源配置

4.3.1 Kafka连接配置

Kafka数据源配置:

# Kafka数据源配置
kylin.source.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092
kylin.source.kafka.consumer.group.id=kylin-consumer-group
kylin.source.kafka.consumer.auto.offset.reset=earliest
kylin.source.kafka.consumer.enable.auto.commit=false
kylin.source.kafka.consumer.max.poll.records=1000
kylin.source.kafka.consumer.session.timeout.ms=30000
kylin.source.kafka.consumer.heartbeat.interval.ms=3000

# Kafka安全配置
kylin.source.kafka.security.protocol=SASL_SSL
kylin.source.kafka.sasl.mechanism=PLAIN
kylin.source.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kylin" password="kylin123";
kylin.source.kafka.ssl.truststore.location=/opt/kylin/conf/kafka.client.truststore.jks
kylin.source.kafka.ssl.truststore.password=truststore123

# 流处理配置
kylin.source.kafka.streaming.window.size=60000
kylin.source.kafka.streaming.watermark.delay=10000
kylin.source.kafka.streaming.checkpoint.interval=30000
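
在把这些参数填入 kylin.properties 之前,可以先用 kafka-python(本章后面的流处理脚本也使用该库)做一次消费端连通性验证。下面的草图与上述配置逐项对应;注意 kafka-python 无法直接读取 JKS 格式的 truststore,示例假设已将 CA 证书导出为 PEM 文件,路径与主题名均为示意:

#!/usr/bin/env python3
# check_kafka_consumer.py - 按上述配置验证 Kafka 消费连通性(示意脚本)

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "sales_events",                                    # 示意主题名
    bootstrap_servers=["kafka1:9092", "kafka2:9092", "kafka3:9092"],
    group_id="kylin-consumer-group",
    auto_offset_reset="earliest",
    enable_auto_commit=False,
    max_poll_records=1000,
    session_timeout_ms=30000,
    heartbeat_interval_ms=3000,
    # 安全配置:与 kylin.source.kafka.security.* 对应
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="kylin",
    sasl_plain_password="kylin123",
    ssl_cafile="/opt/kylin/conf/kafka-ca.pem",         # 假设已从 JKS truststore 导出为 PEM
    consumer_timeout_ms=10000,                         # 10 秒无新消息即退出,便于连通性测试
)

for record in consumer:
    print(record.topic, record.partition, record.offset, record.value[:200])

consumer.close()
print("Kafka 消费连通性验证完成")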

Kafka主题管理脚本:

#!/bin/bash
# kafka_topic_manager.sh - Kafka主题管理脚本

KAFKA_HOME="/opt/kafka"
BOOTSTRAP_SERVERS="kafka1:9092,kafka2:9092,kafka3:9092"
ACTION="$1"
TOPIC="$2"

case "$ACTION" in
    "create")
        if [ -z "$TOPIC" ]; then
            echo "用法: $0 create <topic_name> [partitions] [replication_factor]"
            exit 1
        fi
        
        PARTITIONS=${3:-3}
        REPLICATION_FACTOR=${4:-2}
        
        echo "创建Kafka主题: $TOPIC"
        $KAFKA_HOME/bin/kafka-topics.sh \
            --create \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --topic $TOPIC \
            --partitions $PARTITIONS \
            --replication-factor $REPLICATION_FACTOR
        ;;
    
    "list")
        echo "列出所有Kafka主题:"
        $KAFKA_HOME/bin/kafka-topics.sh \
            --list \
            --bootstrap-server $BOOTSTRAP_SERVERS
        ;;
    
    "describe")
        if [ -z "$TOPIC" ]; then
            echo "用法: $0 describe <topic_name>"
            exit 1
        fi
        
        echo "描述Kafka主题: $TOPIC"
        $KAFKA_HOME/bin/kafka-topics.sh \
            --describe \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --topic $TOPIC
        ;;
    
    "delete")
        if [ -z "$TOPIC" ]; then
            echo "用法: $0 delete <topic_name>"
            exit 1
        fi
        
        echo "删除Kafka主题: $TOPIC"
        $KAFKA_HOME/bin/kafka-topics.sh \
            --delete \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --topic $TOPIC
        ;;
    
    "produce")
        if [ -z "$TOPIC" ]; then
            echo "用法: $0 produce <topic_name>"
            exit 1
        fi
        
        echo "向主题 $TOPIC 发送消息 (Ctrl+C 退出):"
        $KAFKA_HOME/bin/kafka-console-producer.sh \
            --bootstrap-server $BOOTSTRAP_SERVERS \
            --topic $TOPIC
        ;;
    
    "consume")
        if [ -z "$TOPIC" ]; then
            echo "用法: $0 consume <topic_name> [from_beginning]"
            exit 1
        fi
        
        FROM_BEGINNING="$3"
        
        echo "消费主题 $TOPIC 的消息 (Ctrl+C 退出):"
        
        if [ "$FROM_BEGINNING" = "true" ]; then
            $KAFKA_HOME/bin/kafka-console-consumer.sh \
                --bootstrap-server $BOOTSTRAP_SERVERS \
                --topic $TOPIC \
                --from-beginning
        else
            $KAFKA_HOME/bin/kafka-console-consumer.sh \
                --bootstrap-server $BOOTSTRAP_SERVERS \
                --topic $TOPIC
        fi
        ;;
    
    "monitor")
        if [ -z "$TOPIC" ]; then
            echo "用法: $0 monitor <topic_name>"
            exit 1
        fi
        
        echo "监控主题 $TOPIC:"
        
        while true; do
            echo "=== $(date) ==="
            
            # 获取主题详情
            $KAFKA_HOME/bin/kafka-topics.sh \
                --describe \
                --bootstrap-server $BOOTSTRAP_SERVERS \
                --topic $TOPIC
            
            # 获取消费者组信息
            echo "\n消费者组信息:"
            $KAFKA_HOME/bin/kafka-consumer-groups.sh \
                --bootstrap-server $BOOTSTRAP_SERVERS \
                --describe \
                --all-groups | grep $TOPIC
            
            echo "\n---"
            sleep 10
        done
        ;;
    
    *)
        echo "用法: $0 {create|list|describe|delete|produce|consume|monitor} [options]"
        echo "示例:"
        echo "  $0 create sales_events 6 3"
        echo "  $0 list"
        echo "  $0 describe sales_events"
        echo "  $0 consume sales_events true"
        echo "  $0 monitor sales_events"
        exit 1
        ;;
esac

4.3.2 流式数据模型

Kafka流式数据模型配置:

{
  "name": "sales_streaming_model",
  "description": "销售流式数据模型",
  "fact_table": "kafka.sales_events",
  "streaming_config": {
    "kafka_config": {
      "bootstrap_servers": "kafka1:9092,kafka2:9092,kafka3:9092",
      "topic": "sales_events",
      "consumer_group": "kylin_sales_consumer",
      "auto_offset_reset": "latest"
    },
    "schema_config": {
      "format": "json",
      "timestamp_field": "event_time",
      "timestamp_format": "yyyy-MM-dd HH:mm:ss"
    },
    "window_config": {
      "window_size": "1 minute",
      "slide_interval": "30 seconds",
      "watermark_delay": "10 seconds"
    }
  },
  "columns": [
    {
      "name": "event_time",
      "type": "timestamp",
      "nullable": false
    },
    {
      "name": "order_id",
      "type": "string",
      "nullable": false
    },
    {
      "name": "customer_id",
      "type": "string",
      "nullable": false
    },
    {
      "name": "product_id",
      "type": "string",
      "nullable": false
    },
    {
      "name": "quantity",
      "type": "int",
      "nullable": false
    },
    {
      "name": "unit_price",
      "type": "decimal(10,2)",
      "nullable": false
    },
    {
      "name": "total_amount",
      "type": "decimal(12,2)",
      "nullable": false
    }
  ]
}
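
window_config 中的滑动窗口语义可以用一个小例子直观说明:窗口长度 1 分钟、滑动步长 30 秒时,每条事件会同时落入两个相互重叠的窗口,而 watermark_delay 表示允许事件最多迟到 10 秒再触发窗口计算。下面的脚本只演示窗口分配的计算方式,属于概念示意,不依赖 Kylin 运行时:

#!/usr/bin/env python3
# window_demo.py - 滑动窗口分配的概念演示(对应 window_config)

from datetime import datetime, timedelta

WINDOW_SIZE = timedelta(minutes=1)      # window_size: 1 minute
SLIDE_INTERVAL = timedelta(seconds=30)  # slide_interval: 30 seconds


def assign_windows(event_time: datetime):
    """返回事件所属的所有 [start, end) 窗口。"""
    epoch = datetime(1970, 1, 1)
    # 对齐到滑动步长边界,找到不晚于事件时间的最近一个窗口起点
    offset = (event_time - epoch) % SLIDE_INTERVAL
    last_start = event_time - offset
    # 从最早可能包含该事件的窗口起点开始枚举
    start = last_start - WINDOW_SIZE + SLIDE_INTERVAL
    windows = []
    while start <= event_time:
        windows.append((start, start + WINDOW_SIZE))
        start += SLIDE_INTERVAL
    return windows


if __name__ == "__main__":
    event = datetime(2024, 1, 1, 10, 0, 45)  # 示意事件时间
    for start, end in assign_windows(event):
        print(f"{start:%H:%M:%S} ~ {end:%H:%M:%S}")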

流式数据处理脚本:

#!/usr/bin/env python3
# kafka_stream_processor.py - Kafka流式数据处理

import json
import time
from datetime import datetime
from kafka import KafkaConsumer, KafkaProducer
from kafka.errors import KafkaError
import logging

class KafkaStreamProcessor:
    def __init__(self, config):
        self.config = config
        self.consumer = None
        self.producer = None
        self.setup_logging()
        self.setup_kafka()
    
    def setup_logging(self):
        """设置日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('kafka_stream_processor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def setup_kafka(self):
        """设置Kafka连接"""
        try:
            # 设置消费者
            self.consumer = KafkaConsumer(
                self.config['input_topic'],
                bootstrap_servers=self.config['bootstrap_servers'],
                group_id=self.config['consumer_group'],
                auto_offset_reset='latest',
                enable_auto_commit=False,
                value_deserializer=lambda x: json.loads(x.decode('utf-8'))
            )
            
            # 设置生产者
            self.producer = KafkaProducer(
                bootstrap_servers=self.config['bootstrap_servers'],
                value_serializer=lambda x: json.dumps(x).encode('utf-8'),
                acks='all',
                retries=3
            )
            
            self.logger.info("Kafka连接设置完成")
            
        except Exception as e:
            self.logger.error(f"Kafka连接设置失败: {e}")
            raise
    
    def process_message(self, message):
        """处理单条消息"""
        try:
            # 数据验证
            if not self.validate_message(message):
                return None
            
            # 数据转换
            processed_data = self.transform_message(message)
            
            # 数据增强
            enriched_data = self.enrich_message(processed_data)
            
            return enriched_data
            
        except Exception as e:
            self.logger.error(f"消息处理失败: {e}, 消息: {message}")
            return None
    
    def validate_message(self, message):
        """验证消息格式"""
        required_fields = ['event_time', 'order_id', 'customer_id', 'product_id', 'quantity', 'unit_price']
        
        for field in required_fields:
            if field not in message:
                self.logger.warning(f"缺少必需字段: {field}")
                return False
        
        # 验证数据类型
        try:
            datetime.strptime(message['event_time'], '%Y-%m-%d %H:%M:%S')
            float(message['quantity'])
            float(message['unit_price'])
        except (ValueError, TypeError) as e:
            self.logger.warning(f"数据类型验证失败: {e}")
            return False
        
        return True
    
    def transform_message(self, message):
        """转换消息格式"""
        # 计算总金额
        total_amount = float(message['quantity']) * float(message['unit_price'])
        
        # 标准化时间格式
        event_time = datetime.strptime(message['event_time'], '%Y-%m-%d %H:%M:%S')
        
        transformed = {
            'event_time': event_time.isoformat(),
            'order_id': str(message['order_id']),
            'customer_id': str(message['customer_id']),
            'product_id': str(message['product_id']),
            'quantity': int(message['quantity']),
            'unit_price': round(float(message['unit_price']), 2),
            'total_amount': round(total_amount, 2),
            'processed_time': datetime.now().isoformat()
        }
        
        return transformed
    
    def enrich_message(self, message):
        """数据增强"""
        # 添加时间维度
        event_time = datetime.fromisoformat(message['event_time'])
        
        message.update({
            'year': event_time.year,
            'month': event_time.month,
            'day': event_time.day,
            'hour': event_time.hour,
            'day_of_week': event_time.weekday() + 1,
            'is_weekend': event_time.weekday() >= 5
        })
        
        # 添加业务分类
        if message['total_amount'] > 1000:
            message['order_category'] = 'high_value'
        elif message['total_amount'] > 100:
            message['order_category'] = 'medium_value'
        else:
            message['order_category'] = 'low_value'
        
        return message
    
    def send_to_output(self, processed_data):
        """发送处理后的数据"""
        try:
            future = self.producer.send(
                self.config['output_topic'],
                value=processed_data
            )
            
            # 等待发送完成
            future.get(timeout=10)
            
        except KafkaError as e:
            self.logger.error(f"发送消息失败: {e}")
            raise
    
    def run(self):
        """运行流处理器"""
        self.logger.info("开始流式数据处理")
        
        try:
            for message in self.consumer:
                # 处理消息
                processed_data = self.process_message(message.value)
                
                if processed_data:
                    # 发送到输出主题
                    self.send_to_output(processed_data)
                    
                    # 提交偏移量
                    self.consumer.commit()
                    
                    self.logger.debug(f"处理消息: {message.offset}")
                
        except KeyboardInterrupt:
            self.logger.info("收到停止信号")
        except Exception as e:
            self.logger.error(f"流处理器运行错误: {e}")
            raise
        finally:
            self.cleanup()
    
    def cleanup(self):
        """清理资源"""
        if self.consumer:
            self.consumer.close()
        if self.producer:
            self.producer.close()
        self.logger.info("资源清理完成")

def main():
    config = {
        'bootstrap_servers': ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
        'input_topic': 'sales_events_raw',
        'output_topic': 'sales_events_processed',
        'consumer_group': 'kylin_stream_processor'
    }
    
    processor = KafkaStreamProcessor(config)
    processor.run()

if __name__ == "__main__":
    main()
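
在部署上面的流处理脚本之前,可以先向输入主题发送一条样例消息做端到端验证。下面是基于 kafka-python 的最小测试生产者示意(Kafka 地址与主题沿用上文配置,订单字段为示例值):

#!/usr/bin/env python3
# send_test_event.py - 向输入主题发送一条样例销售事件(测试用示意)
import json
from datetime import datetime
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

event = {
    'event_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'order_id': 'ORD-0001',
    'customer_id': 'CUST-1001',
    'product_id': 'PROD-2001',
    'quantity': 3,
    'unit_price': 59.90
}

# 发送到流处理脚本订阅的原始主题,并等待确认
producer.send('sales_events_raw', value=event).get(timeout=10)
producer.flush()
print("样例事件已发送:", event)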

4.4 JDBC数据源配置

4.4.1 JDBC连接配置

JDBC数据源配置文件:

# JDBC数据源配置
kylin.source.jdbc.connection-url=jdbc:mysql://mysql-server:3306/sales_db?useSSL=false&serverTimezone=UTC
kylin.source.jdbc.driver=com.mysql.cj.jdbc.Driver
kylin.source.jdbc.username=kylin_user
kylin.source.jdbc.password=kylin123

# 连接池配置
kylin.source.jdbc.pool.max-total=20
kylin.source.jdbc.pool.max-idle=10
kylin.source.jdbc.pool.min-idle=5
kylin.source.jdbc.pool.max-wait-millis=30000
kylin.source.jdbc.pool.validation-query=SELECT 1
kylin.source.jdbc.pool.test-on-borrow=true
kylin.source.jdbc.pool.test-while-idle=true

# 查询配置
kylin.source.jdbc.fetch-size=1000
kylin.source.jdbc.query-timeout=300
kylin.source.jdbc.batch-size=1000
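
配置完成后,建议先在 Kylin 之外验证这组 JDBC 参数是否可用。下面是一个基于 jaydebeapi 的最小连通性检查示意(假设已安装 jaydebeapi,驱动 jar 路径为示例值):

#!/usr/bin/env python3
# check_jdbc_config.py - 用上面的JDBC参数做最小连通性验证(示意)
import jaydebeapi

# 驱动类、URL、账号与 kylin.properties 保持一致;jar 路径为示例值
conn = jaydebeapi.connect(
    'com.mysql.cj.jdbc.Driver',
    'jdbc:mysql://mysql-server:3306/sales_db?useSSL=false&serverTimezone=UTC',
    ['kylin_user', 'kylin123'],
    '/opt/kylin/lib/mysql-connector-j.jar'
)
try:
    cursor = conn.cursor()
    cursor.execute('SELECT 1')  # 与连接池 validation-query 保持一致
    print('JDBC连接正常:', cursor.fetchone())
finally:
    conn.close()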

多数据源配置:

{
  "jdbc_data_sources": [
    {
      "name": "mysql_sales",
      "type": "mysql",
      "connection": {
        "url": "jdbc:mysql://mysql-server:3306/sales_db",
        "driver": "com.mysql.cj.jdbc.Driver",
        "username": "sales_user",
        "password": "sales123"
      },
      "pool_config": {
        "max_total": 20,
        "max_idle": 10,
        "min_idle": 5
      }
    },
    {
      "name": "postgresql_crm",
      "type": "postgresql",
      "connection": {
        "url": "jdbc:postgresql://pg-server:5432/crm_db",
        "driver": "org.postgresql.Driver",
        "username": "crm_user",
        "password": "crm123"
      },
      "pool_config": {
        "max_total": 15,
        "max_idle": 8,
        "min_idle": 3
      }
    },
    {
      "name": "oracle_finance",
      "type": "oracle",
      "connection": {
        "url": "jdbc:oracle:thin:@oracle-server:1521:XE",
        "driver": "oracle.jdbc.OracleDriver",
        "username": "finance_user",
        "password": "finance123"
      },
      "pool_config": {
        "max_total": 10,
        "max_idle": 5,
        "min_idle": 2
      }
    }
  ]
}
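
多数据源配置项较多,容易出现类型与 URL 不匹配、连接池参数颠倒等问题。下面是一个简单的校验脚本示意(假设上面的 JSON 保存为 jdbc_datasources.json):

#!/usr/bin/env python3
# validate_jdbc_datasources.py - 校验多数据源配置的基本一致性(示意)
import json

with open('jdbc_datasources.json', 'r', encoding='utf-8') as f:
    config = json.load(f)

URL_PREFIX = {
    'mysql': 'jdbc:mysql:',
    'postgresql': 'jdbc:postgresql:',
    'oracle': 'jdbc:oracle:'
}

for ds in config['jdbc_data_sources']:
    name, ds_type = ds['name'], ds['type']
    url = ds['connection']['url']
    pool = ds['pool_config']

    # URL前缀应与数据源类型一致
    if not url.startswith(URL_PREFIX.get(ds_type, '')):
        print(f"[{name}] URL与类型不匹配: {url}")
    # 连接池参数应满足 min_idle <= max_idle <= max_total
    elif not (pool['min_idle'] <= pool['max_idle'] <= pool['max_total']):
        print(f"[{name}] 连接池参数不合理: {pool}")
    else:
        print(f"[{name}] 配置检查通过")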

4.4.2 JDBC数据同步

JDBC数据同步脚本:

#!/usr/bin/env python3
# jdbc_data_sync.py - JDBC数据同步脚本

import json
import time
import logging
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
import pymysql
import psycopg2
import cx_Oracle
from hdfs import InsecureClient

class JDBCDataSync:
    def __init__(self, config_file):
        self.config = self.load_config(config_file)
        self.setup_logging()
        self.hdfs_client = InsecureClient(self.config['hdfs']['url'])
        
    def load_config(self, config_file):
        """加载配置文件"""
        with open(config_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    
    def setup_logging(self):
        """设置日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('jdbc_data_sync.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def get_connection(self, datasource_config):
        """获取数据库连接"""
        db_type = datasource_config['type']
        conn_config = datasource_config['connection']
        
        try:
            if db_type == 'mysql':
                return pymysql.connect(
                    host=conn_config['host'],
                    port=conn_config.get('port', 3306),
                    user=conn_config['username'],
                    password=conn_config['password'],
                    database=conn_config['database'],
                    charset='utf8mb4'
                )
            elif db_type == 'postgresql':
                return psycopg2.connect(
                    host=conn_config['host'],
                    port=conn_config.get('port', 5432),
                    user=conn_config['username'],
                    password=conn_config['password'],
                    database=conn_config['database']
                )
            elif db_type == 'oracle':
                dsn = cx_Oracle.makedsn(
                    conn_config['host'],
                    conn_config.get('port', 1521),
                    service_name=conn_config['service_name']
                )
                return cx_Oracle.connect(
                    conn_config['username'],
                    conn_config['password'],
                    dsn
                )
            else:
                raise ValueError(f"不支持的数据库类型: {db_type}")
                
        except Exception as e:
            self.logger.error(f"连接数据库失败: {e}")
            raise
    
    def extract_table_data(self, datasource_config, table_config):
        """提取表数据"""
        table_name = table_config['name']
        self.logger.info(f"开始提取表数据: {table_name}")
        
        connection = None
        try:
            connection = self.get_connection(datasource_config)
            cursor = connection.cursor()
            
            # 构建查询SQL
            sql = self.build_extract_sql(table_config)
            self.logger.info(f"执行SQL: {sql}")
            
            cursor.execute(sql)
            
            # 获取列名
            columns = [desc[0] for desc in cursor.description]
            
            # 分批读取数据
            batch_size = table_config.get('batch_size', 10000)
            batch_num = 0
            
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break
                
                batch_num += 1
                self.logger.info(f"处理批次 {batch_num}, 记录数: {len(rows)}")
                
                # 转换为JSON格式
                json_data = []
                for row in rows:
                    record = {}
                    for i, value in enumerate(row):
                        # 处理特殊数据类型
                        if isinstance(value, datetime):
                            record[columns[i]] = value.isoformat()
                        elif value is None:
                            record[columns[i]] = None
                        else:
                            record[columns[i]] = str(value)
                    json_data.append(record)
                
                # 写入HDFS
                self.write_to_hdfs(table_name, json_data, batch_num)
            
            self.logger.info(f"表数据提取完成: {table_name}, 总批次: {batch_num}")
            
        except Exception as e:
            self.logger.error(f"提取表数据失败: {table_name}, 错误: {e}")
            raise
        finally:
            if connection:
                connection.close()
    
    def build_extract_sql(self, table_config):
        """构建数据提取SQL"""
        table_name = table_config['name']
        columns = table_config.get('columns', ['*'])
        
        # 构建列列表
        if columns == ['*']:
            column_list = '*'
        else:
            column_list = ', '.join(columns)
        
        sql = f"SELECT {column_list} FROM {table_name}"
        
        # 添加WHERE条件
        if 'where_condition' in table_config:
            sql += f" WHERE {table_config['where_condition']}"
        
        # 添加增量同步条件
        if table_config.get('incremental', False):
            timestamp_column = table_config['timestamp_column']
            last_sync_time = self.get_last_sync_time(table_name)
            
            if last_sync_time:
                if 'where_condition' in table_config:
                    sql += f" AND {timestamp_column} > '{last_sync_time}'"
                else:
                    sql += f" WHERE {timestamp_column} > '{last_sync_time}'"
        
        # 添加ORDER BY
        if 'order_by' in table_config:
            sql += f" ORDER BY {table_config['order_by']}"
        
        return sql
    
    def write_to_hdfs(self, table_name, data, batch_num):
        """写入数据到HDFS"""
        try:
            # 构建HDFS路径
            date_str = datetime.now().strftime('%Y%m%d')
            hdfs_path = f"/kylin/data/{table_name}/{date_str}/batch_{batch_num:06d}.json"
            
            # 确保目录存在
            hdfs_dir = f"/kylin/data/{table_name}/{date_str}"
            if not self.hdfs_client.status(hdfs_dir, strict=False):
                self.hdfs_client.makedirs(hdfs_dir)
            
            # 写入数据
            json_content = '\n'.join(json.dumps(record, ensure_ascii=False) for record in data)
            
            with self.hdfs_client.write(hdfs_path, encoding='utf-8') as writer:
                writer.write(json_content)
            
            self.logger.info(f"数据已写入HDFS: {hdfs_path}")
            
        except Exception as e:
            self.logger.error(f"写入HDFS失败: {e}")
            raise
    
    def get_last_sync_time(self, table_name):
        """获取上次同步时间"""
        sync_file = f"/kylin/sync_status/{table_name}_last_sync.txt"
        
        try:
            if self.hdfs_client.status(sync_file, strict=False):
                with self.hdfs_client.read(sync_file, encoding='utf-8') as reader:
                    return reader.read().strip()
        except Exception:
            pass
        
        return None
    
    def update_sync_time(self, table_name, sync_time):
        """更新同步时间"""
        sync_file = f"/kylin/sync_status/{table_name}_last_sync.txt"
        
        try:
            # 确保目录存在
            sync_dir = "/kylin/sync_status"
            if not self.hdfs_client.status(sync_dir, strict=False):
                self.hdfs_client.makedirs(sync_dir)
            
            with self.hdfs_client.write(sync_file, encoding='utf-8', overwrite=True) as writer:
                writer.write(sync_time)
            
        except Exception as e:
            self.logger.error(f"更新同步时间失败: {e}")
    
    def sync_datasource(self, datasource_name):
        """同步单个数据源"""
        self.logger.info(f"开始同步数据源: {datasource_name}")
        
        datasource_config = None
        for ds in self.config['datasources']:
            if ds['name'] == datasource_name:
                datasource_config = ds
                break
        
        if not datasource_config:
            raise ValueError(f"未找到数据源配置: {datasource_name}")
        
        # 同步所有表
        for table_config in datasource_config['tables']:
            try:
                self.extract_table_data(datasource_config, table_config)
                
                # 更新同步时间
                if table_config.get('incremental', False):
                    current_time = datetime.now().isoformat()
                    self.update_sync_time(table_config['name'], current_time)
                
            except Exception as e:
                self.logger.error(f"同步表失败: {table_config['name']}, 错误: {e}")
                continue
        
        self.logger.info(f"数据源同步完成: {datasource_name}")
    
    def sync_all(self, parallel=True):
        """同步所有数据源"""
        self.logger.info("开始同步所有数据源")
        
        datasource_names = [ds['name'] for ds in self.config['datasources']]
        
        if parallel:
            # 并行同步
            with ThreadPoolExecutor(max_workers=self.config.get('max_workers', 4)) as executor:
                futures = {executor.submit(self.sync_datasource, name): name for name in datasource_names}
                
                for future in as_completed(futures):
                    datasource_name = futures[future]
                    try:
                        future.result()
                        self.logger.info(f"数据源同步成功: {datasource_name}")
                    except Exception as e:
                        self.logger.error(f"数据源同步失败: {datasource_name}, 错误: {e}")
        else:
            # 串行同步
            for datasource_name in datasource_names:
                try:
                    self.sync_datasource(datasource_name)
                except Exception as e:
                    self.logger.error(f"数据源同步失败: {datasource_name}, 错误: {e}")
                    continue
        
        self.logger.info("所有数据源同步完成")

def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='JDBC数据同步工具')
    parser.add_argument('--config', required=True, help='配置文件路径')
    parser.add_argument('--datasource', help='指定数据源名称')
    parser.add_argument('--parallel', action='store_true', help='并行同步')
    
    args = parser.parse_args()
    
    sync_tool = JDBCDataSync(args.config)
    
    if args.datasource:
        sync_tool.sync_datasource(args.datasource)
    else:
        sync_tool.sync_all(args.parallel)

if __name__ == "__main__":
    main()

JDBC同步配置文件示例:

{
  "hdfs": {
    "url": "http://namenode:9870"
  },
  "max_workers": 4,
  "datasources": [
    {
      "name": "mysql_sales",
      "type": "mysql",
      "connection": {
        "host": "mysql-server",
        "port": 3306,
        "username": "sales_user",
        "password": "sales123",
        "database": "sales_db"
      },
      "tables": [
        {
          "name": "orders",
          "columns": ["order_id", "customer_id", "order_date", "total_amount"],
          "incremental": true,
          "timestamp_column": "order_date",
          "batch_size": 10000
        },
        {
          "name": "customers",
          "columns": ["customer_id", "customer_name", "email", "phone"],
          "incremental": false,
          "batch_size": 5000
        }
      ]
    },
    {
      "name": "postgresql_crm",
      "type": "postgresql",
      "connection": {
        "host": "pg-server",
        "port": 5432,
        "username": "crm_user",
        "password": "crm123",
        "database": "crm_db"
      },
      "tables": [
        {
          "name": "customers",
          "columns": ["customer_id", "customer_name", "segment", "created_date"],
          "incremental": true,
          "timestamp_column": "created_date",
          "batch_size": 8000
        }
      ]
    }
  ]
}
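
配置就绪后,既可以按上文 main() 中定义的命令行方式运行(如 python3 jdbc_data_sync.py --config jdbc_sync_config.json --parallel),也可以以编程方式调用同步工具。下面是一个最小调用示意(假设配置保存为 jdbc_sync_config.json,脚本与其位于同一目录):

#!/usr/bin/env python3
# run_sync_example.py - 以编程方式调用JDBC同步工具(示意)
from jdbc_data_sync import JDBCDataSync

sync_tool = JDBCDataSync('jdbc_sync_config.json')

# 只同步单个数据源
sync_tool.sync_datasource('mysql_sales')

# 或并行同步全部数据源
sync_tool.sync_all(parallel=True)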

4.5 文件数据源配置

4.5.1 Parquet文件配置

Parquet数据源配置:

# Parquet文件配置
kylin.source.parquet.base-path=/data/parquet
kylin.source.parquet.file-pattern=*.parquet
kylin.source.parquet.schema-inference=true
kylin.source.parquet.merge-schema=true
kylin.source.parquet.compression=snappy

# 读取优化
kylin.source.parquet.batch-size=10000
kylin.source.parquet.parallel-read=true
kylin.source.parquet.max-parallel-files=10
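
在把 Parquet 目录接入 Kylin 之前,可以先用 pyarrow 快速确认文件的行数、schema 与压缩方式是否符合上面的配置。下面是一个检查示意(假设已安装 pyarrow,路径为示例值):

#!/usr/bin/env python3
# inspect_parquet.py - 检查Parquet文件的schema与压缩信息(示意)
import pyarrow.parquet as pq

path = '/data/parquet/sales.parquet'  # 示例路径
pf = pq.ParquetFile(path)

print('行数:', pf.metadata.num_rows)
print('行组数:', pf.metadata.num_row_groups)
print('Schema:')
print(pf.schema_arrow)

# 查看第一个行组中各列的压缩方式(应与配置中的snappy一致)
rg = pf.metadata.row_group(0)
for i in range(rg.num_columns):
    col = rg.column(i)
    print(col.path_in_schema, col.compression)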

Parquet文件管理脚本:

#!/bin/bash
# parquet_file_manager.sh - Parquet文件管理脚本

PARQUET_BASE_PATH="/data/parquet"
ACTION="$1"
TABLE_NAME="$2"

case "$ACTION" in
    "list")
        echo "列出Parquet文件:"
        find $PARQUET_BASE_PATH -name "*.parquet" -type f | sort
        ;;
    
    "info")
        if [ -z "$TABLE_NAME" ]; then
            echo "用法: $0 info <parquet_file>"
            exit 1
        fi
        
        echo "Parquet文件信息: $TABLE_NAME"
        parquet-tools meta $TABLE_NAME
        echo "\n文件内容预览:"
        parquet-tools head -n 10 $TABLE_NAME
        ;;
    
    "schema")
        if [ -z "$TABLE_NAME" ]; then
            echo "用法: $0 schema <parquet_file>"
            exit 1
        fi
        
        echo "Parquet文件Schema: $TABLE_NAME"
        parquet-tools schema $TABLE_NAME
        ;;
    
    "convert")
        INPUT_FILE="$TABLE_NAME"
        OUTPUT_FILE="$3"
        
        if [ -z "$INPUT_FILE" ] || [ -z "$OUTPUT_FILE" ]; then
            echo "用法: $0 convert <input_file> <output_parquet>"
            exit 1
        fi
        
        echo "转换文件为Parquet格式: $INPUT_FILE -> $OUTPUT_FILE"
        
        # 使用Spark进行转换(csv_to_parquet.py 为下文给出的辅助转换脚本,路径为示例)
        spark-submit --master local[*] \
            --conf spark.sql.adaptive.enabled=true \
            --conf spark.sql.adaptive.coalescePartitions.enabled=true \
            /opt/kylin/scripts/csv_to_parquet.py \
            "$INPUT_FILE" "$OUTPUT_FILE"
        ;;
    
    "optimize")
        if [ -z "$TABLE_NAME" ]; then
            echo "用法: $0 optimize <parquet_directory>"
            exit 1
        fi
        
        echo "优化Parquet文件: $TABLE_NAME"
        
        # 合并小文件
        spark-submit --class ParquetOptimizer \
            --master local[*] \
            --conf spark.sql.adaptive.enabled=true \
            --conf spark.sql.adaptive.coalescePartitions.enabled=true \
            --conf spark.sql.files.maxPartitionBytes=134217728 \
            /opt/kylin/lib/parquet-optimizer.jar \
            $TABLE_NAME
        ;;
    
    *)
        echo "用法: $0 {list|info|schema|convert|optimize} [options]"
        echo "示例:"
        echo "  $0 list"
        echo "  $0 info /data/parquet/sales.parquet"
        echo "  $0 schema /data/parquet/sales.parquet"
        echo "  $0 convert /data/csv/sales.csv /data/parquet/sales.parquet"
        echo "  $0 optimize /data/parquet/sales/"
        exit 1
        ;;
esac
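
上面 convert 分支调用的 csv_to_parquet.py 是一个辅助转换脚本,可以用 PySpark 实现。下面给出一个最小示意(表头、类型推断等选项为假设的默认做法):

#!/usr/bin/env python3
# csv_to_parquet.py - 将CSV文件转换为Parquet(与上面convert分支配合使用的示意脚本)
import sys
from pyspark.sql import SparkSession

def main():
    if len(sys.argv) != 3:
        print("用法: csv_to_parquet.py <input_csv> <output_parquet>")
        sys.exit(1)

    input_path, output_path = sys.argv[1], sys.argv[2]

    spark = (SparkSession.builder
             .appName("csv_to_parquet")
             .getOrCreate())

    # 读取CSV(假设首行为表头,并自动推断列类型)
    df = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv(input_path))

    # 以snappy压缩写出Parquet,与 kylin.source.parquet.compression 保持一致
    (df.write
       .mode("overwrite")
       .option("compression", "snappy")
       .parquet(output_path))

    spark.stop()

if __name__ == "__main__":
    main()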

4.5.2 ORC文件配置

ORC数据源配置:

# ORC文件配置
kylin.source.orc.base-path=/data/orc
kylin.source.orc.file-pattern=*.orc
kylin.source.orc.compression=zlib
kylin.source.orc.stripe-size=67108864
kylin.source.orc.row-index-stride=10000

# 读取优化
kylin.source.orc.batch-size=1024
kylin.source.orc.use-selected=true
kylin.source.orc.zero-copy=true
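
同样地,在接入 ORC 目录之前可以先确认文件可读、schema 符合预期。下面是一个基于 PySpark 的最小检查示意(路径为示例值):

#!/usr/bin/env python3
# inspect_orc.py - 检查ORC目录的schema与行数(示意)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("inspect_orc").getOrCreate()

# 读取配置中 base-path 下的ORC文件(示例路径)
df = spark.read.orc("/data/orc/sales/")

df.printSchema()
print("行数:", df.count())
df.show(5, truncate=False)

spark.stop()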

4.6 数据源监控与管理

4.6.1 数据源健康检查

数据源健康检查脚本:

#!/usr/bin/env python3
# datasource_health_check.py - 数据源健康检查

import json
import time
import logging
from datetime import datetime
import requests
import pymysql
import psycopg2
from kafka import KafkaConsumer
from hdfs import InsecureClient

class DataSourceHealthChecker:
    def __init__(self, config_file):
        self.config = self.load_config(config_file)
        self.setup_logging()
        self.health_status = {}

    def load_config(self, config_file):
        """加载配置文件"""
        with open(config_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    def setup_logging(self):
        """设置日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def check_hive_health(self, hive_config):
        """检查Hive健康状态"""
        try:
            # 检查Hive Metastore
            metastore_url = hive_config['metastore_url']
            response = requests.get(f"{metastore_url}/api/v1/databases", timeout=10)

            if response.status_code == 200:
                self.health_status['hive_metastore'] = 'healthy'
            else:
                self.health_status['hive_metastore'] = 'unhealthy'

            # 检查HiveServer2
            import jaydebeapi
            conn = jaydebeapi.connect(
                'org.apache.hive.jdbc.HiveDriver',
                hive_config['jdbc_url'],
                [hive_config['username'], hive_config['password']],
                hive_config['driver_path']
            )

            cursor = conn.cursor()
            cursor.execute("SHOW DATABASES")
            databases = cursor.fetchall()

            if databases:
                self.health_status['hive_server'] = 'healthy'
            else:
                self.health_status['hive_server'] = 'unhealthy'

            conn.close()

        except Exception as e:
            self.logger.error(f"Hive健康检查失败: {e}")
            self.health_status['hive_metastore'] = 'error'
            self.health_status['hive_server'] = 'error'

    def check_kafka_health(self, kafka_config):
        """检查Kafka健康状态"""
        try:
            consumer = KafkaConsumer(
                bootstrap_servers=kafka_config['bootstrap_servers'],
                consumer_timeout_ms=5000
            )

            # 获取主题列表
            topics = consumer.topics()

            if topics:
                self.health_status['kafka'] = 'healthy'
            else:
                self.health_status['kafka'] = 'unhealthy'

            consumer.close()

        except Exception as e:
            self.logger.error(f"Kafka健康检查失败: {e}")
            self.health_status['kafka'] = 'error'

    def check_jdbc_health(self, jdbc_configs):
        """检查JDBC数据源健康状态"""
        for jdbc_config in jdbc_configs:
            name = jdbc_config['name']
            try:
                if jdbc_config['type'] == 'mysql':
                    conn = pymysql.connect(
                        host=jdbc_config['connection']['host'],
                        port=jdbc_config['connection'].get('port', 3306),
                        user=jdbc_config['connection']['username'],
                        password=jdbc_config['connection']['password'],
                        database=jdbc_config['connection']['database']
                    )
                elif jdbc_config['type'] == 'postgresql':
                    conn = psycopg2.connect(
                        host=jdbc_config['connection']['host'],
                        port=jdbc_config['connection'].get('port', 5432),
                        user=jdbc_config['connection']['username'],
                        password=jdbc_config['connection']['password'],
                        database=jdbc_config['connection']['database']
                    )

                cursor = conn.cursor()
                cursor.execute("SELECT 1")
                result = cursor.fetchone()

                if result:
                    self.health_status[f'jdbc_{name}'] = 'healthy'
                else:
                    self.health_status[f'jdbc_{name}'] = 'unhealthy'

                conn.close()

            except Exception as e:
                self.logger.error(f"JDBC数据源 {name} 健康检查失败: {e}")
                self.health_status[f'jdbc_{name}'] = 'error'

    def check_hdfs_health(self, hdfs_config):
        """检查HDFS健康状态"""
        try:
            client = InsecureClient(hdfs_config['url'])

            # 检查根目录
            status = client.status('/')

            if status:
                self.health_status['hdfs'] = 'healthy'
            else:
                self.health_status['hdfs'] = 'unhealthy'

        except Exception as e:
            self.logger.error(f"HDFS健康检查失败: {e}")
            self.health_status['hdfs'] = 'error'

    def run_health_check(self):
        """运行健康检查"""
        self.logger.info("开始数据源健康检查")

        # 检查Hive
        if 'hive' in self.config:
            self.check_hive_health(self.config['hive'])

        # 检查Kafka
        if 'kafka' in self.config:
            self.check_kafka_health(self.config['kafka'])

        # 检查JDBC数据源
        if 'jdbc_datasources' in self.config:
            self.check_jdbc_health(self.config['jdbc_datasources'])

        # 检查HDFS
        if 'hdfs' in self.config:
            self.check_hdfs_health(self.config['hdfs'])

        # 生成报告
        self.generate_health_report()

    def generate_health_report(self):
        """生成健康检查报告"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'overall_status': self.calculate_overall_status(),
            'datasource_status': self.health_status,
            'summary': self.generate_summary()
        }

        # 保存报告
        report_file = f"datasource_health_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)

        self.logger.info(f"健康检查报告已保存: {report_file}")

        # 打印摘要
        self.print_health_summary(report)

    def calculate_overall_status(self):
        """计算总体健康状态"""
        if not self.health_status:
            return 'unknown'

        error_count = sum(1 for status in self.health_status.values() if status == 'error')
        unhealthy_count = sum(1 for status in self.health_status.values() if status == 'unhealthy')

        if error_count > 0:
            return 'critical'
        elif unhealthy_count > 0:
            return 'warning'
        else:
            return 'healthy'

    def generate_summary(self):
        """生成摘要信息"""
        total = len(self.health_status)
        healthy = sum(1 for status in self.health_status.values() if status == 'healthy')
        unhealthy = sum(1 for status in self.health_status.values() if status == 'unhealthy')
        error = sum(1 for status in self.health_status.values() if status == 'error')

        return {
            'total_datasources': total,
            'healthy_count': healthy,
            'unhealthy_count': unhealthy,
            'error_count': error,
            'health_percentage': round((healthy / total * 100) if total > 0 else 0, 2)
        }

    def print_health_summary(self, report):
        """打印健康检查摘要"""
        print("\n=== 数据源健康检查报告 ===")
        print(f"检查时间: {report['timestamp']}")
        print(f"总体状态: {report['overall_status']}")
        print(f"健康率: {report['summary']['health_percentage']}%")

        print("\n数据源状态:")
        for datasource, status in report['datasource_status'].items():
            status_icon = {
                'healthy': '✅',
                'unhealthy': '⚠️',
                'error': '❌'
            }.get(status, '❓')
            print(f"  {status_icon} {datasource}: {status}")

        print("\n统计信息:")
        summary = report['summary']
        print(f"  总数据源: {summary['total_datasources']}")
        print(f"  健康: {summary['healthy_count']}")
        print(f"  异常: {summary['unhealthy_count']}")
        print(f"  错误: {summary['error_count']}")

def main():
    import argparse

    parser = argparse.ArgumentParser(description='数据源健康检查工具')
    parser.add_argument('--config', required=True, help='配置文件路径')

    args = parser.parse_args()

    checker = DataSourceHealthChecker(args.config)
    checker.run_health_check()

if __name__ == "__main__":
    main()
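
健康检查脚本读取的配置文件包含 hive、kafka、jdbc_datasources、hdfs 四个可选部分,字段名与脚本中读取的键一致。下面给出一个生成示例配置并运行检查的最小示意(地址、凭据与 jar 路径均为示例值):

#!/usr/bin/env python3
# run_health_check_example.py - 生成示例配置并运行健康检查(示意)
import json
from datasource_health_check import DataSourceHealthChecker

config = {
    "hive": {
        "metastore_url": "http://hive-metastore:9083",                 # 示例地址
        "jdbc_url": "jdbc:hive2://hive-server:10000/default",
        "username": "kylin",
        "password": "kylin123",
        "driver_path": "/opt/kylin/lib/hive-jdbc-standalone.jar"       # 示例路径
    },
    "kafka": {
        "bootstrap_servers": ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
    },
    "jdbc_datasources": [
        {
            "name": "mysql_sales",
            "type": "mysql",
            "connection": {
                "host": "mysql-server",
                "port": 3306,
                "username": "sales_user",
                "password": "sales123",
                "database": "sales_db"
            }
        }
    ],
    "hdfs": {
        "url": "http://namenode:9870"
    }
}

# 写出配置文件并执行一次完整的健康检查
with open('health_check_config.json', 'w', encoding='utf-8') as f:
    json.dump(config, f, indent=2, ensure_ascii=False)

DataSourceHealthChecker('health_check_config.json').run_health_check()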