4.1 数据源概述
4.1.1 支持的数据源类型
Apache Kylin支持多种数据源类型,为不同的数据场景提供灵活的接入方案:
主要数据源类型:
| 数据源类型 | 描述 | 适用场景 | 版本支持 |
| --- | --- | --- | --- |
| Hive | 基于Hadoop的数据仓库 | 批处理、历史数据分析 | 所有版本 |
| Kafka | 分布式流处理平台 | 实时数据流、流式分析 | Kylin 3.0+ |
| JDBC | 关系型数据库连接 | 传统数据库、小数据量 | Kylin 4.0+ |
| Parquet | 列式存储格式 | 高性能分析、压缩存储 | Kylin 4.0+ |
| ORC | 优化行列式存储 | Hive优化存储 | Kylin 3.0+ |
4.1.2 数据源架构
数据源连接架构:
graph TD
A[Kylin Server] --> B[Data Source Manager]
B --> C[Hive Connector]
B --> D[Kafka Connector]
B --> E[JDBC Connector]
B --> F[File Connector]
C --> G[Hive Metastore]
C --> H[HDFS]
D --> I[Kafka Cluster]
E --> J[MySQL]
E --> K[PostgreSQL]
E --> L[Oracle]
F --> M[Parquet Files]
F --> N[ORC Files]
连接池管理:
// 数据源连接池配置
public class DataSourceConnectionPool {
private final Map<String, HikariDataSource> connectionPools;
private final DataSourceConfig config;
public DataSourceConnectionPool(DataSourceConfig config) {
this.config = config;
this.connectionPools = new ConcurrentHashMap<>();
initializeConnectionPools();
}
private void initializeConnectionPools() {
// Hive连接池
HikariConfig hiveConfig = new HikariConfig();
hiveConfig.setJdbcUrl(config.getHiveJdbcUrl());
hiveConfig.setUsername(config.getHiveUsername());
hiveConfig.setPassword(config.getHivePassword());
hiveConfig.setMaximumPoolSize(config.getHiveMaxConnections());
hiveConfig.setMinimumIdle(config.getHiveMinIdle());
hiveConfig.setConnectionTimeout(config.getConnectionTimeout());
hiveConfig.setIdleTimeout(config.getIdleTimeout());
connectionPools.put("hive", new HikariDataSource(hiveConfig));
// JDBC连接池
for (JdbcDataSourceConfig jdbcConfig : config.getJdbcDataSources()) {
HikariConfig jdbcHikariConfig = createJdbcConfig(jdbcConfig);
connectionPools.put(jdbcConfig.getName(), new HikariDataSource(jdbcHikariConfig));
}
}
public Connection getConnection(String dataSourceName) throws SQLException {
HikariDataSource dataSource = connectionPools.get(dataSourceName);
if (dataSource == null) {
throw new IllegalArgumentException("Unknown data source: " + dataSourceName);
}
return dataSource.getConnection();
}
}
4.2 Hive数据源配置
4.2.1 Hive连接配置
基础配置文件(kylin.properties):
# Hive数据源配置
kylin.source.hive.database-for-flat-table=default
kylin.source.hive.flat-table-storage-format=SEQUENCEFILE
kylin.source.hive.flat-table-field-delimiter=\u001F
# Hive JDBC配置
kylin.source.hive.connection-url=jdbc:hive2://hive-server:10000/default
kylin.source.hive.connection-user=kylin
kylin.source.hive.connection-password=kylin123
kylin.source.hive.connection-driver=org.apache.hive.jdbc.HiveDriver
# 连接池配置
kylin.source.hive.pool.max-total=50
kylin.source.hive.pool.max-idle=10
kylin.source.hive.pool.min-idle=5
kylin.source.hive.pool.max-wait-millis=30000
# Hive表扫描配置
kylin.source.hive.table-dir-create-first=true
kylin.source.hive.redistribute-flat-table=true
kylin.source.hive.flat-table-cluster-by-dict-column=true
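完成上述配置后,建议先验证Kylin节点到HiveServer2的网络连通性,再进行表加载。下面是一个最小化的连通性检查示意(假设示例:脚本名仅为演示,主机与端口应与connection-url保持一致):
#!/usr/bin/env python3
# check_hive_connection.py - 验证HiveServer2连通性的最小示意脚本(假设示例)
import socket
import sys

HOST, PORT = "hive-server", 10000  # 与kylin.source.hive.connection-url中的主机和端口保持一致

def port_open(host, port, timeout=5):
    """仅做TCP层连通性检查,不涉及认证和SQL执行"""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    if port_open(HOST, PORT):
        print(f"HiveServer2端口可达: {HOST}:{PORT}")
        sys.exit(0)
    print(f"无法连接HiveServer2: {HOST}:{PORT},请检查网络与服务状态")
    sys.exit(1)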
高级Hive配置:
<!-- hive-site.xml 优化配置 -->
<configuration>
<!-- 启用向量化执行 -->
<property>
<name>hive.vectorized.execution.enabled</name>
<value>true</value>
</property>
<!-- 启用CBO优化器 -->
<property>
<name>hive.cbo.enable</name>
<value>true</value>
</property>
<!-- 启用统计信息 -->
<property>
<name>hive.stats.autogather</name>
<value>true</value>
</property>
<!-- 动态分区配置 -->
<property>
<name>hive.exec.dynamic.partition</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<!-- 压缩配置 -->
<property>
<name>hive.exec.compress.output</name>
<value>true</value>
</property>
<property>
<name>mapred.output.compression.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
</configuration>
4.2.2 Hive表管理
表发现与同步脚本:
#!/bin/bash
# hive_table_sync.sh - Hive表同步脚本
KYLIN_HOME="/opt/kylin"
HIVE_DATABASE="$1"
TABLE_PATTERN="$2"
if [ -z "$HIVE_DATABASE" ]; then
echo "用法: $0 <database> [table_pattern]"
echo "示例: $0 sales_db sales_*"
exit 1
fi
echo "开始同步Hive数据库: $HIVE_DATABASE"
# 获取表列表
if [ -n "$TABLE_PATTERN" ]; then
TABLES=$(hive -e "SHOW TABLES IN $HIVE_DATABASE LIKE '$TABLE_PATTERN';" 2>/dev/null)
else
TABLES=$(hive -e "SHOW TABLES IN $HIVE_DATABASE;" 2>/dev/null)
fi
if [ -z "$TABLES" ]; then
echo "未找到匹配的表"
exit 1
fi
echo "发现表: $TABLES"
# 同步每个表
for table in $TABLES; do
echo "正在同步表: $HIVE_DATABASE.$table"
# 获取表结构
hive -e "DESCRIBE FORMATTED $HIVE_DATABASE.$table;" > "/tmp/${table}_schema.txt"
# 检查表是否已存在于Kylin中
table_exists=$(curl -s -X GET \
-H "Authorization: Basic QURNSU46S1lMSU4=" \
"http://localhost:7070/kylin/api/tables/$HIVE_DATABASE.$table" | \
jq -r '.name // "null"')
if [ "$table_exists" = "null" ]; then
echo "添加新表到Kylin: $HIVE_DATABASE.$table"
# 加载表到Kylin
curl -X POST \
-H "Authorization: Basic QURNSU46S1lMSU4=" \
-H "Content-Type: application/json" \
-d "{
\"tables\": [\"$HIVE_DATABASE.$table\"],
\"project\": \"default\"
}" \
"http://localhost:7070/kylin/api/tables/$HIVE_DATABASE.$table/hive"
else
echo "表已存在,检查是否需要更新: $HIVE_DATABASE.$table"
# 重新加载表结构
curl -X PUT \
-H "Authorization: Basic QURNSU46S1lMSU4=" \
"http://localhost:7070/kylin/api/tables/$HIVE_DATABASE.$table/hive"
fi
echo "表同步完成: $HIVE_DATABASE.$table"
echo "---"
done
echo "Hive表同步完成"
表结构分析脚本:
#!/usr/bin/env python3
# hive_table_analyzer.py - Hive表结构分析
import sys
import json
import subprocess
from collections import defaultdict
class HiveTableAnalyzer:
def __init__(self, database):
self.database = database
self.tables_info = {}
def analyze_database(self):
"""分析整个数据库"""
print(f"分析Hive数据库: {self.database}")
# 获取所有表
tables = self.get_tables()
for table in tables:
print(f"分析表: {table}")
self.analyze_table(table)
# 生成分析报告
self.generate_report()
def get_tables(self):
"""获取数据库中的所有表"""
cmd = f"hive -e 'SHOW TABLES IN {self.database};'"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
print(f"错误: {result.stderr}")
return []
return [table.strip() for table in result.stdout.split('\n') if table.strip()]
def analyze_table(self, table):
"""分析单个表"""
table_info = {
'name': table,
'columns': [],
'partitions': [],
'storage_format': '',
'location': '',
'size': 0,
'row_count': 0
}
# 获取表结构
cmd = f"hive -e 'DESCRIBE FORMATTED {self.database}.{table};'"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
print(f"错误: 无法获取表 {table} 的结构")
return
# 解析表结构
lines = result.stdout.split('\n')
current_section = 'columns'
for line in lines:
line = line.strip()
if not line:
continue
if line.startswith('# Partition Information'):
current_section = 'partitions'
continue
elif line.startswith('# Detailed Table Information'):
current_section = 'details'
continue
if current_section == 'columns' and '\t' in line:
parts = line.split('\t')
if len(parts) >= 2 and not line.startswith('#'):
column_info = {
'name': parts[0].strip(),
'type': parts[1].strip(),
'comment': parts[2].strip() if len(parts) > 2 else ''
}
table_info['columns'].append(column_info)
elif current_section == 'partitions' and '\t' in line:
parts = line.split('\t')
if len(parts) >= 2 and not line.startswith('#'):
partition_info = {
'name': parts[0].strip(),
'type': parts[1].strip()
}
table_info['partitions'].append(partition_info)
elif current_section == 'details':
if 'InputFormat:' in line:
table_info['storage_format'] = line.split(':', 1)[1].strip()
elif 'Location:' in line:
table_info['location'] = line.split(':', 1)[1].strip()
# 获取表统计信息
self.get_table_stats(table, table_info)
self.tables_info[table] = table_info
def get_table_stats(self, table, table_info):
"""获取表统计信息"""
try:
# 获取行数
cmd = f"hive -e 'SELECT COUNT(*) FROM {self.database}.{table};'"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=300)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
for line in reversed(lines):
if line.strip().isdigit():
table_info['row_count'] = int(line.strip())
break
except subprocess.TimeoutExpired:
print(f"警告: 获取表 {table} 行数超时")
except Exception as e:
print(f"警告: 获取表 {table} 统计信息失败: {e}")
def generate_report(self):
"""生成分析报告"""
report = {
'database': self.database,
'total_tables': len(self.tables_info),
'tables': self.tables_info,
'summary': self.generate_summary()
}
# 保存报告
report_file = f"{self.database}_analysis_report.json"
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"\n分析报告已保存到: {report_file}")
# 打印摘要
self.print_summary(report['summary'])
def generate_summary(self):
"""生成摘要信息"""
summary = {
'total_tables': len(self.tables_info),
'total_columns': 0,
'partitioned_tables': 0,
'storage_formats': defaultdict(int),
'column_types': defaultdict(int),
'large_tables': [], # 行数 > 1000万
'potential_fact_tables': [], # 可能的事实表
'potential_dim_tables': [] # 可能的维度表
}
for table_name, table_info in self.tables_info.items():
# 统计列数
summary['total_columns'] += len(table_info['columns'])
# 统计分区表
if table_info['partitions']:
summary['partitioned_tables'] += 1
# 统计存储格式
storage_format = table_info['storage_format']
if storage_format:
summary['storage_formats'][storage_format] += 1
# 统计列类型
for column in table_info['columns']:
col_type = column['type'].lower()
if 'decimal' in col_type or 'double' in col_type or 'float' in col_type:
summary['column_types']['numeric'] += 1
elif 'int' in col_type or 'bigint' in col_type:
summary['column_types']['integer'] += 1
elif 'string' in col_type or 'varchar' in col_type:
summary['column_types']['string'] += 1
elif 'date' in col_type or 'timestamp' in col_type:
summary['column_types']['datetime'] += 1
else:
summary['column_types']['other'] += 1
# 识别大表
if table_info['row_count'] > 10000000: # 1000万行
summary['large_tables'].append({
'name': table_name,
'row_count': table_info['row_count']
})
# 简单的表类型识别
numeric_columns = sum(1 for col in table_info['columns']
if any(t in col['type'].lower()
for t in ['decimal', 'double', 'float', 'int']))
if numeric_columns > len(table_info['columns']) * 0.3: # 数值列占比 > 30%
summary['potential_fact_tables'].append(table_name)
else:
summary['potential_dim_tables'].append(table_name)
return summary
def print_summary(self, summary):
"""打印摘要信息"""
print("\n=== 数据库分析摘要 ===")
print(f"数据库: {self.database}")
print(f"总表数: {summary['total_tables']}")
print(f"总列数: {summary['total_columns']}")
print(f"分区表数: {summary['partitioned_tables']}")
print("\n存储格式分布:")
for format_type, count in summary['storage_formats'].items():
print(f" {format_type}: {count}")
print("\n列类型分布:")
for col_type, count in summary['column_types'].items():
print(f" {col_type}: {count}")
if summary['large_tables']:
print("\n大表 (>1000万行):")
for table in summary['large_tables']:
print(f" {table['name']}: {table['row_count']:,} 行")
print(f"\n可能的事实表 ({len(summary['potential_fact_tables'])}):")
for table in summary['potential_fact_tables'][:5]: # 只显示前5个
print(f" {table}")
print(f"\n可能的维度表 ({len(summary['potential_dim_tables'])}):")
for table in summary['potential_dim_tables'][:5]: # 只显示前5个
print(f" {table}")
def main():
if len(sys.argv) != 2:
print("用法: python3 hive_table_analyzer.py <database>")
sys.exit(1)
database = sys.argv[1]
analyzer = HiveTableAnalyzer(database)
analyzer.analyze_database()
if __name__ == "__main__":
main()
4.6.2 数据源性能监控
数据源性能监控配置:
# 数据源性能监控
kylin.datasource.monitor.enabled=true
kylin.datasource.monitor.interval=60000
kylin.datasource.monitor.metrics.enabled=true
kylin.datasource.monitor.slow-query.threshold=5000
# 连接池监控
kylin.datasource.pool.monitor.enabled=true
kylin.datasource.pool.monitor.log-abandoned=true
kylin.datasource.pool.monitor.remove-abandoned=true
kylin.datasource.pool.monitor.remove-abandoned-timeout=300
性能监控脚本:
#!/usr/bin/env python3
# datasource_performance_monitor.py - 数据源性能监控
import time
import json
import logging
import threading
from datetime import datetime, timedelta
import psutil
import requests
from collections import defaultdict, deque
class DataSourcePerformanceMonitor:
def __init__(self, config_file):
self.config = self.load_config(config_file)
self.setup_logging()
self.metrics = defaultdict(lambda: deque(maxlen=1000))
self.running = False
self.monitor_thread = None
def load_config(self, config_file):
"""加载配置文件"""
with open(config_file, 'r', encoding='utf-8') as f:
return json.load(f)
def setup_logging(self):
"""设置日志"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
def collect_system_metrics(self):
"""收集系统指标"""
timestamp = datetime.now()
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
self.metrics['system_cpu'].append({
'timestamp': timestamp.isoformat(),
'value': cpu_percent
})
# 内存使用率
memory = psutil.virtual_memory()
self.metrics['system_memory'].append({
'timestamp': timestamp.isoformat(),
'value': memory.percent
})
# 磁盘I/O
disk_io = psutil.disk_io_counters()
if disk_io:
self.metrics['disk_read_bytes'].append({
'timestamp': timestamp.isoformat(),
'value': disk_io.read_bytes
})
self.metrics['disk_write_bytes'].append({
'timestamp': timestamp.isoformat(),
'value': disk_io.write_bytes
})
# 网络I/O
net_io = psutil.net_io_counters()
if net_io:
self.metrics['network_bytes_sent'].append({
'timestamp': timestamp.isoformat(),
'value': net_io.bytes_sent
})
self.metrics['network_bytes_recv'].append({
'timestamp': timestamp.isoformat(),
'value': net_io.bytes_recv
})
def collect_kylin_metrics(self):
"""收集Kylin指标"""
try:
kylin_config = self.config.get('kylin', {})
base_url = kylin_config.get('base_url', 'http://localhost:7070')
username = kylin_config.get('username', 'ADMIN')
password = kylin_config.get('password', 'KYLIN')
# 获取系统信息
response = requests.get(
f"{base_url}/kylin/api/admin/system",
auth=(username, password),
timeout=10
)
if response.status_code == 200:
system_info = response.json()
timestamp = datetime.now().isoformat()
# JVM内存使用
if 'jvm' in system_info:
jvm_info = system_info['jvm']
self.metrics['kylin_jvm_heap_used'].append({
'timestamp': timestamp,
'value': jvm_info.get('heapUsed', 0)
})
self.metrics['kylin_jvm_heap_max'].append({
'timestamp': timestamp,
'value': jvm_info.get('heapMax', 0)
})
# 活跃查询数
if 'activeQueries' in system_info:
self.metrics['kylin_active_queries'].append({
'timestamp': timestamp,
'value': system_info['activeQueries']
})
# 获取查询统计
response = requests.get(
f"{base_url}/kylin/api/query/statistics",
auth=(username, password),
timeout=10
)
if response.status_code == 200:
query_stats = response.json()
timestamp = datetime.now().isoformat()
self.metrics['kylin_query_count'].append({
'timestamp': timestamp,
'value': query_stats.get('totalQueries', 0)
})
self.metrics['kylin_avg_query_time'].append({
'timestamp': timestamp,
'value': query_stats.get('avgQueryTime', 0)
})
except Exception as e:
self.logger.error(f"收集Kylin指标失败: {e}")
def collect_datasource_metrics(self):
"""收集数据源指标"""
# 收集Hive指标
self.collect_hive_metrics()
# 收集Kafka指标
self.collect_kafka_metrics()
# 收集JDBC指标
self.collect_jdbc_metrics()
def collect_hive_metrics(self):
"""收集Hive指标"""
try:
hive_config = self.config.get('hive', {})
if not hive_config:
return
# 这里可以通过JMX或其他方式收集Hive指标
# 示例:收集Hive Metastore连接数
timestamp = datetime.now().isoformat()
# 模拟指标收集
self.metrics['hive_metastore_connections'].append({
'timestamp': timestamp,
'value': 10 # 实际应该从JMX获取
})
except Exception as e:
self.logger.error(f"收集Hive指标失败: {e}")
def collect_kafka_metrics(self):
"""收集Kafka指标"""
try:
kafka_config = self.config.get('kafka', {})
if not kafka_config:
return
# 这里可以通过Kafka JMX收集指标
timestamp = datetime.now().isoformat()
# 模拟指标收集
self.metrics['kafka_consumer_lag'].append({
'timestamp': timestamp,
'value': 100 # 实际应该从Kafka JMX获取
})
except Exception as e:
self.logger.error(f"收集Kafka指标失败: {e}")
def collect_jdbc_metrics(self):
"""收集JDBC指标"""
try:
jdbc_configs = self.config.get('jdbc_datasources', [])
for jdbc_config in jdbc_configs:
name = jdbc_config['name']
timestamp = datetime.now().isoformat()
# 模拟连接池指标
self.metrics[f'jdbc_{name}_active_connections'].append({
'timestamp': timestamp,
'value': 5 # 实际应该从连接池获取
})
self.metrics[f'jdbc_{name}_idle_connections'].append({
'timestamp': timestamp,
'value': 15 # 实际应该从连接池获取
})
except Exception as e:
self.logger.error(f"收集JDBC指标失败: {e}")
def analyze_performance(self):
"""分析性能数据"""
analysis_result = {
'timestamp': datetime.now().isoformat(),
'alerts': [],
'recommendations': [],
'summary': {}
}
# 分析CPU使用率
if 'system_cpu' in self.metrics and self.metrics['system_cpu']:
recent_cpu = [m['value'] for m in list(self.metrics['system_cpu'])[-10:]]
avg_cpu = sum(recent_cpu) / len(recent_cpu)
if avg_cpu > 80:
analysis_result['alerts'].append({
'type': 'high_cpu',
'message': f'CPU使用率过高: {avg_cpu:.1f}%',
'severity': 'warning'
})
analysis_result['recommendations'].append(
'建议检查是否有大量并发查询或优化查询性能'
)
# 分析内存使用率
if 'system_memory' in self.metrics and self.metrics['system_memory']:
recent_memory = [m['value'] for m in list(self.metrics['system_memory'])[-10:]]
avg_memory = sum(recent_memory) / len(recent_memory)
if avg_memory > 85:
analysis_result['alerts'].append({
'type': 'high_memory',
'message': f'内存使用率过高: {avg_memory:.1f}%',
'severity': 'critical'
})
analysis_result['recommendations'].append(
'建议增加内存或优化JVM堆大小配置'
)
# 分析查询性能
if 'kylin_avg_query_time' in self.metrics and self.metrics['kylin_avg_query_time']:
recent_query_time = [m['value'] for m in list(self.metrics['kylin_avg_query_time'])[-10:]]
avg_query_time = sum(recent_query_time) / len(recent_query_time)
if avg_query_time > 5000: # 5秒
analysis_result['alerts'].append({
'type': 'slow_query',
'message': f'平均查询时间过长: {avg_query_time:.0f}ms',
'severity': 'warning'
})
analysis_result['recommendations'].append(
'建议检查Cube设计和查询优化'
)
return analysis_result
def generate_performance_report(self):
"""生成性能报告"""
analysis = self.analyze_performance()
report = {
'timestamp': datetime.now().isoformat(),
'period': '最近1小时',
'analysis': analysis,
'metrics_summary': self.get_metrics_summary()
}
# 保存报告
report_file = f"performance_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
self.logger.info(f"性能报告已保存: {report_file}")
# 打印摘要
self.print_performance_summary(report)
return report
def get_metrics_summary(self):
"""获取指标摘要"""
summary = {}
for metric_name, metric_data in self.metrics.items():
if metric_data:
values = [m['value'] for m in metric_data]
summary[metric_name] = {
'count': len(values),
'avg': sum(values) / len(values),
'min': min(values),
'max': max(values),
'latest': values[-1] if values else 0
}
return summary
def print_performance_summary(self, report):
"""打印性能摘要"""
print("\n=== 数据源性能监控报告 ===")
print(f"报告时间: {report['timestamp']}")
print(f"监控周期: {report['period']}")
# 打印告警
alerts = report['analysis']['alerts']
if alerts:
print("\n⚠️ 性能告警:")
for alert in alerts:
severity_icon = {
'critical': '🔴',
'warning': '🟡',
'info': '🔵'
}.get(alert['severity'], '⚪')
print(f" {severity_icon} {alert['message']}")
else:
print("\n✅ 无性能告警")
# 打印建议
recommendations = report['analysis']['recommendations']
if recommendations:
print("\n💡 优化建议:")
for i, rec in enumerate(recommendations, 1):
print(f" {i}. {rec}")
# 打印关键指标
metrics_summary = report['metrics_summary']
print("\n📊 关键指标:")
key_metrics = [
('system_cpu', 'CPU使用率', '%'),
('system_memory', '内存使用率', '%'),
('kylin_active_queries', '活跃查询数', '个'),
('kylin_avg_query_time', '平均查询时间', 'ms')
]
for metric_key, metric_name, unit in key_metrics:
if metric_key in metrics_summary:
metric = metrics_summary[metric_key]
print(f" {metric_name}: {metric['latest']:.1f}{unit} (平均: {metric['avg']:.1f}{unit})")
def start_monitoring(self):
"""开始监控"""
self.running = True
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
self.logger.info("性能监控已启动")
def stop_monitoring(self):
"""停止监控"""
self.running = False
if self.monitor_thread:
self.monitor_thread.join()
self.logger.info("性能监控已停止")
def _monitor_loop(self):
"""监控循环"""
cycle = 0
while self.running:
try:
# 收集指标
self.collect_system_metrics()
self.collect_kylin_metrics()
self.collect_datasource_metrics()
# 每收集10次(约10分钟)生成一次报告
cycle += 1
if cycle % 10 == 0:
self.generate_performance_report()
time.sleep(60) # 每分钟收集一次
except Exception as e:
self.logger.error(f"监控循环错误: {e}")
time.sleep(60)
def main():
import argparse
import signal
parser = argparse.ArgumentParser(description='数据源性能监控工具')
parser.add_argument('--config', required=True, help='配置文件路径')
parser.add_argument('--duration', type=int, default=3600, help='监控持续时间(秒)')
args = parser.parse_args()
monitor = DataSourcePerformanceMonitor(args.config)
def signal_handler(signum, frame):
print("\n正在停止监控...")
monitor.stop_monitoring()
monitor.generate_performance_report()
exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
monitor.start_monitoring()
try:
time.sleep(args.duration)
except KeyboardInterrupt:
pass
finally:
monitor.stop_monitoring()
monitor.generate_performance_report()
if __name__ == "__main__":
main()
4.7 故障排除
4.7.1 常见问题诊断
连接问题排查:
#!/bin/bash
# datasource_troubleshoot.sh - 数据源故障排查脚本
echo "=== 数据源连接故障排查 ==="
# 检查网络连通性
check_network() {
local host=$1
local port=$2
echo "检查网络连通性: $host:$port"
if command -v nc >/dev/null 2>&1; then
if nc -z -w5 $host $port; then
echo "✅ 网络连接正常"
else
echo "❌ 网络连接失败"
return 1
fi
else
if timeout 5 bash -c "</dev/tcp/$host/$port"; then
echo "✅ 网络连接正常"
else
echo "❌ 网络连接失败"
return 1
fi
fi
}
# 检查DNS解析
check_dns() {
local hostname=$1
echo "检查DNS解析: $hostname"
if nslookup $hostname >/dev/null 2>&1; then
echo "✅ DNS解析正常"
nslookup $hostname | grep "Address:" | tail -n +2
else
echo "❌ DNS解析失败"
return 1
fi
}
# 检查Hive连接
check_hive() {
echo "\n=== 检查Hive连接 ==="
local hive_host=${HIVE_HOST:-"localhost"}
local hive_port=${HIVE_PORT:-"10000"}
local metastore_port=${METASTORE_PORT:-"9083"}
# 检查HiveServer2
echo "检查HiveServer2连接:"
check_network $hive_host $hive_port
# 检查Metastore
echo "检查Metastore连接:"
check_network $hive_host $metastore_port
# 检查Hive服务状态
echo "检查Hive服务状态:"
if command -v beeline >/dev/null 2>&1; then
if beeline -u "jdbc:hive2://$hive_host:$hive_port" -e "SHOW DATABASES;" >/dev/null 2>&1; then
echo "✅ Hive服务正常"
else
echo "❌ Hive服务异常"
fi
else
echo "⚠️ beeline命令不可用,无法测试Hive连接"
fi
}
# 检查Kafka连接
check_kafka() {
echo "\n=== 检查Kafka连接 ==="
local kafka_host=${KAFKA_HOST:-"localhost"}
local kafka_port=${KAFKA_PORT:-"9092"}
# 检查Kafka Broker
echo "检查Kafka Broker连接:"
check_network $kafka_host $kafka_port
# 检查Kafka服务状态
if command -v kafka-topics.sh >/dev/null 2>&1; then
echo "检查Kafka主题列表:"
if kafka-topics.sh --bootstrap-server $kafka_host:$kafka_port --list >/dev/null 2>&1; then
echo "✅ Kafka服务正常"
kafka-topics.sh --bootstrap-server $kafka_host:$kafka_port --list | head -5
else
echo "❌ Kafka服务异常"
fi
else
echo "⚠️ kafka-topics.sh命令不可用,无法测试Kafka连接"
fi
}
# 检查JDBC连接
check_jdbc() {
echo "\n=== 检查JDBC连接 ==="
# MySQL连接检查
if [ ! -z "$MYSQL_HOST" ]; then
echo "检查MySQL连接:"
check_network ${MYSQL_HOST} ${MYSQL_PORT:-3306}
if command -v mysql >/dev/null 2>&1; then
if mysql -h${MYSQL_HOST} -P${MYSQL_PORT:-3306} -u${MYSQL_USER} -p${MYSQL_PASSWORD} -e "SELECT 1;" >/dev/null 2>&1; then
echo "✅ MySQL连接正常"
else
echo "❌ MySQL连接失败"
fi
fi
fi
# PostgreSQL连接检查
if [ ! -z "$POSTGRES_HOST" ]; then
echo "检查PostgreSQL连接:"
check_network ${POSTGRES_HOST} ${POSTGRES_PORT:-5432}
if command -v psql >/dev/null 2>&1; then
if PGPASSWORD=${POSTGRES_PASSWORD} psql -h ${POSTGRES_HOST} -p ${POSTGRES_PORT:-5432} -U ${POSTGRES_USER} -d ${POSTGRES_DB} -c "SELECT 1;" >/dev/null 2>&1; then
echo "✅ PostgreSQL连接正常"
else
echo "❌ PostgreSQL连接失败"
fi
fi
fi
}
# 检查HDFS连接
check_hdfs() {
echo "\n=== 检查HDFS连接 ==="
local hdfs_host=${HDFS_HOST:-"localhost"}
local hdfs_port=${HDFS_PORT:-"9000"}
# 检查NameNode
echo "检查HDFS NameNode连接:"
check_network $hdfs_host $hdfs_port
# 检查HDFS服务状态
if command -v hdfs >/dev/null 2>&1; then
echo "检查HDFS文件系统:"
if hdfs dfs -ls / >/dev/null 2>&1; then
echo "✅ HDFS服务正常"
else
echo "❌ HDFS服务异常"
fi
else
echo "⚠️ hdfs命令不可用,无法测试HDFS连接"
fi
}
# 检查系统资源
check_system_resources() {
echo "\n=== 检查系统资源 ==="
# 检查磁盘空间
echo "磁盘使用情况:"
df -h | grep -E "(Filesystem|/dev/)"
# 检查内存使用
echo "\n内存使用情况:"
free -h
# 检查CPU负载
echo "\nCPU负载:"
uptime
# 检查网络连接
echo "\n网络连接统计:"
netstat -an | awk '/^tcp/ {print $6}' | sort | uniq -c | sort -nr
}
# 主函数
main() {
echo "开始数据源故障排查..."
echo "时间: $(date)"
check_system_resources
check_hive
check_kafka
check_jdbc
check_hdfs
echo "\n=== 故障排查完成 ==="
}
# 执行主函数
main
4.7.2 日志分析
日志分析脚本:
#!/usr/bin/env python3
# log_analyzer.py - 数据源日志分析工具
import re
import json
import argparse
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import matplotlib.pyplot as plt
import pandas as pd
class LogAnalyzer:
def __init__(self):
self.error_patterns = {
'connection_timeout': r'Connection.*timeout',
'connection_refused': r'Connection refused',
'out_of_memory': r'OutOfMemoryError|Java heap space',
'sql_exception': r'SQLException|SQL.*Error',
'hive_error': r'HiveException|Hive.*Error',
'kafka_error': r'KafkaException|Kafka.*Error',
'hdfs_error': r'HDFSException|HDFS.*Error'
}
self.warning_patterns = {
'slow_query': r'Query.*took.*[0-9]+.*seconds',
'high_memory': r'Memory usage.*[8-9][0-9]%|Memory usage.*100%',
'connection_pool': r'Connection pool.*exhausted|Pool.*full'
}
def parse_log_line(self, line):
"""解析日志行"""
# 通用日志格式: YYYY-MM-DD HH:MM:SS LEVEL [THREAD] CLASS - MESSAGE
log_pattern = r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}[,.]\d{3})\s+(\w+)\s+\[([^\]]+)\]\s+([^\s]+)\s+-\s+(.*)'
match = re.match(log_pattern, line)
if match:
timestamp_str, level, thread, class_name, message = match.groups()
# 解析时间戳
try:
timestamp = datetime.strptime(timestamp_str.replace(',', '.'), '%Y-%m-%d %H:%M:%S.%f')
except ValueError:
timestamp = datetime.strptime(timestamp_str.split(',')[0], '%Y-%m-%d %H:%M:%S')
return {
'timestamp': timestamp,
'level': level,
'thread': thread,
'class': class_name,
'message': message,
'raw_line': line
}
return None
def analyze_errors(self, log_entries):
"""分析错误日志"""
error_analysis = {
'error_counts': defaultdict(int),
'error_timeline': defaultdict(list),
'error_details': defaultdict(list)
}
for entry in log_entries:
if entry['level'] in ['ERROR', 'FATAL']:
message = entry['message']
# 匹配错误模式
for error_type, pattern in self.error_patterns.items():
if re.search(pattern, message, re.IGNORECASE):
error_analysis['error_counts'][error_type] += 1
error_analysis['error_timeline'][error_type].append(entry['timestamp'])
error_analysis['error_details'][error_type].append({
'timestamp': entry['timestamp'],
'message': message,
'thread': entry['thread'],
'class': entry['class']
})
break
else:
# 未分类的错误
error_analysis['error_counts']['other'] += 1
error_analysis['error_timeline']['other'].append(entry['timestamp'])
error_analysis['error_details']['other'].append({
'timestamp': entry['timestamp'],
'message': message,
'thread': entry['thread'],
'class': entry['class']
})
return error_analysis
def analyze_warnings(self, log_entries):
"""分析警告日志"""
warning_analysis = {
'warning_counts': defaultdict(int),
'warning_timeline': defaultdict(list),
'warning_details': defaultdict(list)
}
for entry in log_entries:
if entry['level'] == 'WARN':
message = entry['message']
# 匹配警告模式
for warning_type, pattern in self.warning_patterns.items():
if re.search(pattern, message, re.IGNORECASE):
warning_analysis['warning_counts'][warning_type] += 1
warning_analysis['warning_timeline'][warning_type].append(entry['timestamp'])
warning_analysis['warning_details'][warning_type].append({
'timestamp': entry['timestamp'],
'message': message,
'thread': entry['thread'],
'class': entry['class']
})
break
return warning_analysis
def analyze_performance(self, log_entries):
"""分析性能日志"""
performance_analysis = {
'query_times': [],
'slow_queries': [],
'memory_usage': [],
'connection_stats': defaultdict(int)
}
for entry in log_entries:
message = entry['message']
# 提取查询时间
query_time_match = re.search(r'Query.*took\s+(\d+(?:\.\d+)?)\s*(?:seconds?|ms)', message, re.IGNORECASE)
if query_time_match:
query_time = float(query_time_match.group(1))
if 'ms' not in message.lower():
query_time *= 1000 # 转换为毫秒
performance_analysis['query_times'].append({
'timestamp': entry['timestamp'],
'duration': query_time
})
if query_time > 5000: # 超过5秒的慢查询
performance_analysis['slow_queries'].append({
'timestamp': entry['timestamp'],
'duration': query_time,
'message': message
})
# 提取内存使用
memory_match = re.search(r'Memory usage[:\s]+(\d+(?:\.\d+)?)%', message, re.IGNORECASE)
if memory_match:
memory_usage = float(memory_match.group(1))
performance_analysis['memory_usage'].append({
'timestamp': entry['timestamp'],
'usage': memory_usage
})
# 统计连接相关信息
if 'connection' in message.lower():
if 'created' in message.lower():
performance_analysis['connection_stats']['created'] += 1
elif 'closed' in message.lower():
performance_analysis['connection_stats']['closed'] += 1
elif 'timeout' in message.lower():
performance_analysis['connection_stats']['timeout'] += 1
return performance_analysis
def generate_report(self, log_file, output_file=None):
"""生成分析报告"""
print(f"分析日志文件: {log_file}")
# 读取日志文件
log_entries = []
with open(log_file, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
entry = self.parse_log_line(line.strip())
if entry:
log_entries.append(entry)
elif line_num <= 10: # 只显示前10行解析失败的信息
print(f"警告: 第{line_num}行解析失败: {line[:100]}...")
print(f"成功解析 {len(log_entries)} 条日志记录")
if not log_entries:
print("没有找到有效的日志记录")
return
# 分析日志
error_analysis = self.analyze_errors(log_entries)
warning_analysis = self.analyze_warnings(log_entries)
performance_analysis = self.analyze_performance(log_entries)
# 生成报告
report = {
'analysis_time': datetime.now().isoformat(),
'log_file': log_file,
'total_entries': len(log_entries),
'time_range': {
'start': min(entry['timestamp'] for entry in log_entries).isoformat(),
'end': max(entry['timestamp'] for entry in log_entries).isoformat()
},
'level_distribution': dict(Counter(entry['level'] for entry in log_entries)),
'error_analysis': {
'total_errors': sum(error_analysis['error_counts'].values()),
'error_types': dict(error_analysis['error_counts']),
'top_errors': self.get_top_errors(error_analysis)
},
'warning_analysis': {
'total_warnings': sum(warning_analysis['warning_counts'].values()),
'warning_types': dict(warning_analysis['warning_counts'])
},
'performance_analysis': {
'total_queries': len(performance_analysis['query_times']),
'slow_queries': len(performance_analysis['slow_queries']),
'avg_query_time': self.calculate_avg_query_time(performance_analysis['query_times']),
'connection_stats': dict(performance_analysis['connection_stats'])
}
}
# 保存报告
if output_file:
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False, default=str)
print(f"分析报告已保存到: {output_file}")
# 打印摘要
self.print_report_summary(report)
return report
def get_top_errors(self, error_analysis, top_n=5):
"""获取最频繁的错误"""
top_errors = []
for error_type, count in sorted(error_analysis['error_counts'].items(),
key=lambda x: x[1], reverse=True)[:top_n]:
if error_type in error_analysis['error_details']:
latest_error = max(error_analysis['error_details'][error_type],
key=lambda x: x['timestamp'])
top_errors.append({
'type': error_type,
'count': count,
'latest_message': latest_error['message'][:200],
'latest_time': latest_error['timestamp'].isoformat()
})
return top_errors
def calculate_avg_query_time(self, query_times):
"""计算平均查询时间"""
if not query_times:
return 0
total_time = sum(qt['duration'] for qt in query_times)
return total_time / len(query_times)
def print_report_summary(self, report):
"""打印报告摘要"""
print("\n=== 日志分析报告 ===")
print(f"分析时间: {report['analysis_time']}")
print(f"日志文件: {report['log_file']}")
print(f"总记录数: {report['total_entries']}")
print(f"时间范围: {report['time_range']['start']} ~ {report['time_range']['end']}")
# 日志级别分布
print("\n📊 日志级别分布:")
for level, count in sorted(report['level_distribution'].items()):
print(f" {level}: {count}")
# 错误分析
error_analysis = report['error_analysis']
print(f"\n❌ 错误分析 (总计: {error_analysis['total_errors']})")
if error_analysis['error_types']:
for error_type, count in sorted(error_analysis['error_types'].items(),
key=lambda x: x[1], reverse=True):
print(f" {error_type}: {count}")
else:
print(" 无错误记录")
# 性能分析
perf_analysis = report['performance_analysis']
print(f"\n⚡ 性能分析:")
print(f" 总查询数: {perf_analysis['total_queries']}")
print(f" 慢查询数: {perf_analysis['slow_queries']}")
if perf_analysis['avg_query_time'] > 0:
print(f" 平均查询时间: {perf_analysis['avg_query_time']:.2f}ms")
# 连接统计
conn_stats = perf_analysis['connection_stats']
if conn_stats:
print(f" 连接统计: 创建{conn_stats.get('created', 0)} 关闭{conn_stats.get('closed', 0)} 超时{conn_stats.get('timeout', 0)}")
def main():
parser = argparse.ArgumentParser(description='数据源日志分析工具')
parser.add_argument('log_file', help='日志文件路径')
parser.add_argument('--output', '-o', help='输出报告文件路径')
args = parser.parse_args()
analyzer = LogAnalyzer()
analyzer.generate_report(args.log_file, args.output)
if __name__ == "__main__":
main()
4.8 本章小结
本章详细介绍了Apache Kylin的数据源管理与连接,主要内容包括:
核心知识点
数据源类型:
- Hive数据源:主要的批处理数据源
- Kafka数据源:流式数据处理
- JDBC数据源:关系型数据库集成
- 文件数据源:Parquet、ORC格式支持
连接配置:
- 连接池管理和优化
- 安全认证配置
- 性能调优参数
数据同步:
- 增量数据同步策略
- 批量数据处理
- 实时流数据处理
监控管理:
- 健康检查机制
- 性能监控指标
- 故障诊断工具
最佳实践
- 连接池配置:合理设置连接池大小和超时参数
- 安全配置:启用SSL/TLS加密和身份认证
- 性能优化:根据数据特点选择合适的数据源类型
- 监控告警:建立完善的监控和告警机制
实用工具
本章提供了多个实用脚本:
- Hive表同步和分析工具
- Kafka主题管理和流处理工具
- JDBC数据同步工具
- 数据源健康检查工具
- 性能监控工具
- 故障排查和日志分析工具
下一章预告
下一章将介绍数据模型设计与管理,包括:
- 维度建模理论
- 星型模式和雪花模式设计
- Kylin数据模型创建
- 维度和度量定义
- 模型优化策略
练习与思考
理论练习
- 比较Hive、Kafka、JDBC三种数据源的特点和适用场景
- 解释连接池的工作原理和配置要点
- 分析流式数据处理和批处理的区别
实践练习
- 配置一个Hive数据源,包括安全认证和性能优化
- 设置Kafka流式数据源,实现实时数据处理
- 配置JDBC数据源连接MySQL数据库
- 使用提供的脚本进行数据源健康检查
- 分析Kylin日志文件,识别性能瓶颈
思考题
- 如何设计一个支持多种数据源的统一数据接入架构?
- 在大数据量场景下,如何优化数据源连接性能?
- 如何实现数据源的故障自动切换和恢复?
- 流式数据和批处理数据如何在Kylin中协同工作?
4.2.3 Hive性能优化
查询优化配置:
# Hive查询优化
kylin.source.hive.beeline-shell=/opt/hive/bin/beeline
# 注意:properties文件中同名配置项只保留最后一次赋值,
# 因此Tez执行引擎、向量化、CBO、谓词下推等--hiveconf参数需要合并写入同一个beeline-params
kylin.source.hive.beeline-params=-u jdbc:hive2://localhost:10000 \
  --hiveconf hive.security.authorization.sqlstd.confwhitelist.append=mapreduce.job.queuename|mapreduce.job.tags \
  --hiveconf hive.execution.engine=tez \
  --hiveconf hive.vectorized.execution.enabled=true \
  --hiveconf hive.cbo.enable=true \
  --hiveconf hive.optimize.ppd=true
Hive表优化脚本:
#!/bin/bash
# optimize_hive_tables.sh - Hive表优化脚本
DATABASE="$1"
TABLE="$2"
if [ -z "$DATABASE" ] || [ -z "$TABLE" ]; then
echo "用法: $0 <database> <table>"
exit 1
fi
echo "优化Hive表: $DATABASE.$TABLE"
# 1. 收集表统计信息
echo "收集表统计信息..."
hive -e "ANALYZE TABLE $DATABASE.$TABLE COMPUTE STATISTICS;"
# 2. 收集列统计信息
echo "收集列统计信息..."
COLUMNS=$(hive -e "DESCRIBE $DATABASE.$TABLE;" | awk '{print $1}' | grep -v '^#' | grep -v '^$' | tr '\n' ',')
COLUMNS=${COLUMNS%,} # 移除最后的逗号
hive -e "ANALYZE TABLE $DATABASE.$TABLE COMPUTE STATISTICS FOR COLUMNS $COLUMNS;"
# 3. 优化文件格式(转换为ORC)
echo "检查表格式..."
FORMAT=$(hive -e "DESCRIBE FORMATTED $DATABASE.$TABLE;" | grep 'InputFormat:' | awk '{print $2}')
if [[ "$FORMAT" != *"OrcInputFormat"* ]]; then
echo "转换表格式为ORC..."
# 创建ORC格式的新表
hive -e "
CREATE TABLE ${DATABASE}.${TABLE}_orc
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY')
AS SELECT * FROM $DATABASE.$TABLE;
"
# 备份原表
hive -e "ALTER TABLE $DATABASE.$TABLE RENAME TO ${DATABASE}.${TABLE}_backup;"
# 重命名新表
hive -e "ALTER TABLE ${DATABASE}.${TABLE}_orc RENAME TO $DATABASE.$TABLE;"
echo "表格式转换完成"
else
echo "表已经是ORC格式"
fi
# 4. 压缩小文件
echo "检查小文件..."
FILE_COUNT=$(hadoop fs -ls /warehouse/$DATABASE.db/$TABLE/ 2>/dev/null | wc -l)
if [ "$FILE_COUNT" -gt 100 ]; then
echo "发现小文件过多,进行合并..."
hive -e "
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET hive.merge.size.per.task=256000000;
SET hive.merge.smallfiles.avgsize=128000000;
INSERT OVERWRITE TABLE $DATABASE.$TABLE
SELECT * FROM $DATABASE.$TABLE;
"
echo "小文件合并完成"
fi
echo "表优化完成: $DATABASE.$TABLE"
4.3 Kafka数据源配置
4.3.1 Kafka连接配置
Kafka数据源配置:
# Kafka数据源配置
kylin.source.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092
kylin.source.kafka.consumer.group.id=kylin-consumer-group
kylin.source.kafka.consumer.auto.offset.reset=earliest
kylin.source.kafka.consumer.enable.auto.commit=false
kylin.source.kafka.consumer.max.poll.records=1000
kylin.source.kafka.consumer.session.timeout.ms=30000
kylin.source.kafka.consumer.heartbeat.interval.ms=3000
# Kafka安全配置
kylin.source.kafka.security.protocol=SASL_SSL
kylin.source.kafka.sasl.mechanism=PLAIN
kylin.source.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kylin" password="kylin123";
kylin.source.kafka.ssl.truststore.location=/opt/kylin/conf/kafka.client.truststore.jks
kylin.source.kafka.ssl.truststore.password=truststore123
# 流处理配置
kylin.source.kafka.streaming.window.size=60000
kylin.source.kafka.streaming.watermark.delay=10000
kylin.source.kafka.streaming.checkpoint.interval=30000
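上述安全参数可以先用一个独立的消费者脚本验证,确认SASL认证与SSL通道均能建立后再接入Kylin。下面是基于kafka-python的连接检查示意(假设示例:脚本名与PEM证书路径仅为演示;Python客户端不直接读取JKS truststore,需先将其中的CA证书导出为PEM文件):
#!/usr/bin/env python3
# kafka_sasl_check.py - 验证SASL_SSL连接的示意脚本(假设示例)
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers="kafka1:9092,kafka2:9092,kafka3:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="kylin",
    sasl_plain_password="kylin123",
    ssl_cafile="/opt/kylin/conf/kafka-ca.pem",  # 假设:由truststore导出的CA证书
    consumer_timeout_ms=5000,
)

# 能列出主题即说明认证与加密通道建立成功
print("可见主题:", sorted(consumer.topics()))
consumer.close()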
Kafka主题管理脚本:
#!/bin/bash
# kafka_topic_manager.sh - Kafka主题管理脚本
KAFKA_HOME="/opt/kafka"
BOOTSTRAP_SERVERS="kafka1:9092,kafka2:9092,kafka3:9092"
ACTION="$1"
TOPIC="$2"
case "$ACTION" in
"create")
if [ -z "$TOPIC" ]; then
echo "用法: $0 create <topic_name> [partitions] [replication_factor]"
exit 1
fi
PARTITIONS=${3:-3}
REPLICATION_FACTOR=${4:-2}
echo "创建Kafka主题: $TOPIC"
$KAFKA_HOME/bin/kafka-topics.sh \
--create \
--bootstrap-server $BOOTSTRAP_SERVERS \
--topic $TOPIC \
--partitions $PARTITIONS \
--replication-factor $REPLICATION_FACTOR
;;
"list")
echo "列出所有Kafka主题:"
$KAFKA_HOME/bin/kafka-topics.sh \
--list \
--bootstrap-server $BOOTSTRAP_SERVERS
;;
"describe")
if [ -z "$TOPIC" ]; then
echo "用法: $0 describe <topic_name>"
exit 1
fi
echo "描述Kafka主题: $TOPIC"
$KAFKA_HOME/bin/kafka-topics.sh \
--describe \
--bootstrap-server $BOOTSTRAP_SERVERS \
--topic $TOPIC
;;
"delete")
if [ -z "$TOPIC" ]; then
echo "用法: $0 delete <topic_name>"
exit 1
fi
echo "删除Kafka主题: $TOPIC"
$KAFKA_HOME/bin/kafka-topics.sh \
--delete \
--bootstrap-server $BOOTSTRAP_SERVERS \
--topic $TOPIC
;;
"produce")
if [ -z "$TOPIC" ]; then
echo "用法: $0 produce <topic_name>"
exit 1
fi
echo "向主题 $TOPIC 发送消息 (Ctrl+C 退出):"
$KAFKA_HOME/bin/kafka-console-producer.sh \
--bootstrap-server $BOOTSTRAP_SERVERS \
--topic $TOPIC
;;
"consume")
if [ -z "$TOPIC" ]; then
echo "用法: $0 consume <topic_name> [from_beginning]"
exit 1
fi
FROM_BEGINNING="$3"
echo "消费主题 $TOPIC 的消息 (Ctrl+C 退出):"
if [ "$FROM_BEGINNING" = "true" ]; then
$KAFKA_HOME/bin/kafka-console-consumer.sh \
--bootstrap-server $BOOTSTRAP_SERVERS \
--topic $TOPIC \
--from-beginning
else
$KAFKA_HOME/bin/kafka-console-consumer.sh \
--bootstrap-server $BOOTSTRAP_SERVERS \
--topic $TOPIC
fi
;;
"monitor")
if [ -z "$TOPIC" ]; then
echo "用法: $0 monitor <topic_name>"
exit 1
fi
echo "监控主题 $TOPIC:"
while true; do
echo "=== $(date) ==="
# 获取主题详情
$KAFKA_HOME/bin/kafka-topics.sh \
--describe \
--bootstrap-server $BOOTSTRAP_SERVERS \
--topic $TOPIC
# 获取消费者组信息
echo "\n消费者组信息:"
$KAFKA_HOME/bin/kafka-consumer-groups.sh \
--bootstrap-server $BOOTSTRAP_SERVERS \
--describe \
--all-groups | grep $TOPIC
echo "\n---"
sleep 10
done
;;
*)
echo "用法: $0 {create|list|describe|delete|produce|consume|monitor} [options]"
echo "示例:"
echo " $0 create sales_events 6 3"
echo " $0 list"
echo " $0 describe sales_events"
echo " $0 consume sales_events true"
echo " $0 monitor sales_events"
exit 1
;;
esac
4.3.2 流式数据模型
Kafka流式数据模型配置:
{
"name": "sales_streaming_model",
"description": "销售流式数据模型",
"fact_table": "kafka.sales_events",
"streaming_config": {
"kafka_config": {
"bootstrap_servers": "kafka1:9092,kafka2:9092,kafka3:9092",
"topic": "sales_events",
"consumer_group": "kylin_sales_consumer",
"auto_offset_reset": "latest"
},
"schema_config": {
"format": "json",
"timestamp_field": "event_time",
"timestamp_format": "yyyy-MM-dd HH:mm:ss"
},
"window_config": {
"window_size": "1 minute",
"slide_interval": "30 seconds",
"watermark_delay": "10 seconds"
}
},
"columns": [
{
"name": "event_time",
"type": "timestamp",
"nullable": false
},
{
"name": "order_id",
"type": "string",
"nullable": false
},
{
"name": "customer_id",
"type": "string",
"nullable": false
},
{
"name": "product_id",
"type": "string",
"nullable": false
},
{
"name": "quantity",
"type": "int",
"nullable": false
},
{
"name": "unit_price",
"type": "decimal(10,2)",
"nullable": false
},
{
"name": "total_amount",
"type": "decimal(12,2)",
"nullable": false
}
]
}
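为便于调试上述流式模型,可以先向主题发送一条与schema一致的示例消息,确认Kylin侧能够正确解析。下面是基于kafka-python的发送示意(假设示例:消息内容为虚构的演示数据):
#!/usr/bin/env python3
# send_sample_event.py - 向sales_events主题发送一条示例消息(假设示例)
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka1:9092,kafka2:9092,kafka3:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# 字段与流式模型中声明的列一一对应,取值仅为演示
event = {
    "event_time": "2024-01-15 10:30:00",
    "order_id": "ORD-20240115-0001",
    "customer_id": "C-1001",
    "product_id": "P-2002",
    "quantity": 3,
    "unit_price": 59.90,
    "total_amount": 179.70,
}

producer.send("sales_events", value=event)
producer.flush()
producer.close()
print("示例消息已发送")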
流式数据处理脚本:
#!/usr/bin/env python3
# kafka_stream_processor.py - Kafka流式数据处理
import json
import time
from datetime import datetime
from kafka import KafkaConsumer, KafkaProducer
from kafka.errors import KafkaError
import logging
class KafkaStreamProcessor:
def __init__(self, config):
self.config = config
self.consumer = None
self.producer = None
self.setup_logging()
self.setup_kafka()
def setup_logging(self):
"""设置日志"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('kafka_stream_processor.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def setup_kafka(self):
"""设置Kafka连接"""
try:
# 设置消费者
self.consumer = KafkaConsumer(
self.config['input_topic'],
bootstrap_servers=self.config['bootstrap_servers'],
group_id=self.config['consumer_group'],
auto_offset_reset='latest',
enable_auto_commit=False,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
# 设置生产者
self.producer = KafkaProducer(
bootstrap_servers=self.config['bootstrap_servers'],
value_serializer=lambda x: json.dumps(x).encode('utf-8'),
acks='all',
retries=3
)
self.logger.info("Kafka连接设置完成")
except Exception as e:
self.logger.error(f"Kafka连接设置失败: {e}")
raise
def process_message(self, message):
"""处理单条消息"""
try:
# 数据验证
if not self.validate_message(message):
return None
# 数据转换
processed_data = self.transform_message(message)
# 数据增强
enriched_data = self.enrich_message(processed_data)
return enriched_data
except Exception as e:
self.logger.error(f"消息处理失败: {e}, 消息: {message}")
return None
def validate_message(self, message):
"""验证消息格式"""
required_fields = ['event_time', 'order_id', 'customer_id', 'product_id', 'quantity', 'unit_price']
for field in required_fields:
if field not in message:
self.logger.warning(f"缺少必需字段: {field}")
return False
# 验证数据类型
try:
datetime.strptime(message['event_time'], '%Y-%m-%d %H:%M:%S')
float(message['quantity'])
float(message['unit_price'])
except (ValueError, TypeError) as e:
self.logger.warning(f"数据类型验证失败: {e}")
return False
return True
def transform_message(self, message):
"""转换消息格式"""
# 计算总金额
total_amount = float(message['quantity']) * float(message['unit_price'])
# 标准化时间格式
event_time = datetime.strptime(message['event_time'], '%Y-%m-%d %H:%M:%S')
transformed = {
'event_time': event_time.isoformat(),
'order_id': str(message['order_id']),
'customer_id': str(message['customer_id']),
'product_id': str(message['product_id']),
'quantity': int(message['quantity']),
'unit_price': round(float(message['unit_price']), 2),
'total_amount': round(total_amount, 2),
'processed_time': datetime.now().isoformat()
}
return transformed
def enrich_message(self, message):
"""数据增强"""
# 添加时间维度
event_time = datetime.fromisoformat(message['event_time'])
message.update({
'year': event_time.year,
'month': event_time.month,
'day': event_time.day,
'hour': event_time.hour,
'day_of_week': event_time.weekday() + 1,
'is_weekend': event_time.weekday() >= 5
})
# 添加业务分类
if message['total_amount'] > 1000:
message['order_category'] = 'high_value'
elif message['total_amount'] > 100:
message['order_category'] = 'medium_value'
else:
message['order_category'] = 'low_value'
return message
def send_to_output(self, processed_data):
"""发送处理后的数据"""
try:
future = self.producer.send(
self.config['output_topic'],
value=processed_data
)
# 等待发送完成
future.get(timeout=10)
except KafkaError as e:
self.logger.error(f"发送消息失败: {e}")
raise
def run(self):
"""运行流处理器"""
self.logger.info("开始流式数据处理")
try:
for message in self.consumer:
# 处理消息
processed_data = self.process_message(message.value)
if processed_data:
# 发送到输出主题
self.send_to_output(processed_data)
# 提交偏移量
self.consumer.commit()
self.logger.debug(f"处理消息: {message.offset}")
except KeyboardInterrupt:
self.logger.info("收到停止信号")
except Exception as e:
self.logger.error(f"流处理器运行错误: {e}")
raise
finally:
self.cleanup()
def cleanup(self):
"""清理资源"""
if self.consumer:
self.consumer.close()
if self.producer:
self.producer.close()
self.logger.info("资源清理完成")
def main():
config = {
'bootstrap_servers': ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
'input_topic': 'sales_events_raw',
'output_topic': 'sales_events_processed',
'consumer_group': 'kylin_stream_processor'
}
processor = KafkaStreamProcessor(config)
processor.run()
if __name__ == "__main__":
main()
4.4 JDBC数据源配置
4.4.1 JDBC连接配置
JDBC数据源配置文件:
# JDBC数据源配置
kylin.source.jdbc.connection-url=jdbc:mysql://mysql-server:3306/sales_db?useSSL=false&serverTimezone=UTC
kylin.source.jdbc.driver=com.mysql.cj.jdbc.Driver
kylin.source.jdbc.username=kylin_user
kylin.source.jdbc.password=kylin123
# 连接池配置
kylin.source.jdbc.pool.max-total=20
kylin.source.jdbc.pool.max-idle=10
kylin.source.jdbc.pool.min-idle=5
kylin.source.jdbc.pool.max-wait-millis=30000
kylin.source.jdbc.pool.validation-query=SELECT 1
kylin.source.jdbc.pool.test-on-borrow=true
kylin.source.jdbc.pool.test-while-idle=true
# 查询配置
kylin.source.jdbc.fetch-size=1000
kylin.source.jdbc.query-timeout=300
kylin.source.jdbc.batch-size=1000
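其中fetch-size与batch-size决定了Kylin从JDBC源分批拉取数据的粒度,避免一次性把整表读入内存。其效果可以用下面的pymysql示意脚本直观理解(假设示例:表名orders仅为演示,连接参数与上述配置保持一致):
#!/usr/bin/env python3
# jdbc_fetch_demo.py - 演示按fetch-size分批读取的示意脚本(假设示例)
import pymysql

FETCH_SIZE = 1000  # 对应kylin.source.jdbc.fetch-size

conn = pymysql.connect(
    host="mysql-server",
    port=3306,
    user="kylin_user",
    password="kylin123",
    database="sales_db",
    charset="utf8mb4",
)
try:
    with conn.cursor() as cursor:
        cursor.execute("SELECT order_id, total_amount FROM orders")  # 假设的示例表
        while True:
            rows = cursor.fetchmany(FETCH_SIZE)  # 每次只取一批
            if not rows:
                break
            print(f"读取到 {len(rows)} 行")  # 实际场景中在此处写入中间存储
finally:
    conn.close()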
多数据源配置:
{
"jdbc_data_sources": [
{
"name": "mysql_sales",
"type": "mysql",
"connection": {
"url": "jdbc:mysql://mysql-server:3306/sales_db",
"driver": "com.mysql.cj.jdbc.Driver",
"username": "sales_user",
"password": "sales123"
},
"pool_config": {
"max_total": 20,
"max_idle": 10,
"min_idle": 5
}
},
{
"name": "postgresql_crm",
"type": "postgresql",
"connection": {
"url": "jdbc:postgresql://pg-server:5432/crm_db",
"driver": "org.postgresql.Driver",
"username": "crm_user",
"password": "crm123"
},
"pool_config": {
"max_total": 15,
"max_idle": 8,
"min_idle": 3
}
},
{
"name": "oracle_finance",
"type": "oracle",
"connection": {
"url": "jdbc:oracle:thin:@oracle-server:1521:XE",
"driver": "oracle.jdbc.OracleDriver",
"username": "finance_user",
"password": "finance123"
},
"pool_config": {
"max_total": 10,
"max_idle": 5,
"min_idle": 2
}
}
]
}
4.4.2 JDBC数据同步
JDBC数据同步脚本:
#!/usr/bin/env python3
# jdbc_data_sync.py - JDBC数据同步脚本
import json
import time
import logging
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
import pymysql
import psycopg2
import cx_Oracle
from hdfs import InsecureClient
class JDBCDataSync:
def __init__(self, config_file):
self.config = self.load_config(config_file)
self.setup_logging()
self.hdfs_client = InsecureClient(self.config['hdfs']['url'])
def load_config(self, config_file):
"""加载配置文件"""
with open(config_file, 'r', encoding='utf-8') as f:
return json.load(f)
def setup_logging(self):
"""设置日志"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('jdbc_data_sync.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def get_connection(self, datasource_config):
"""获取数据库连接"""
db_type = datasource_config['type']
conn_config = datasource_config['connection']
try:
if db_type == 'mysql':
return pymysql.connect(
host=conn_config['host'],
port=conn_config.get('port', 3306),
user=conn_config['username'],
password=conn_config['password'],
database=conn_config['database'],
charset='utf8mb4'
)
elif db_type == 'postgresql':
return psycopg2.connect(
host=conn_config['host'],
port=conn_config.get('port', 5432),
user=conn_config['username'],
password=conn_config['password'],
database=conn_config['database']
)
elif db_type == 'oracle':
dsn = cx_Oracle.makedsn(
conn_config['host'],
conn_config.get('port', 1521),
service_name=conn_config['service_name']
)
return cx_Oracle.connect(
conn_config['username'],
conn_config['password'],
dsn
)
else:
raise ValueError(f"不支持的数据库类型: {db_type}")
except Exception as e:
self.logger.error(f"连接数据库失败: {e}")
raise
def extract_table_data(self, datasource_config, table_config):
"""提取表数据"""
table_name = table_config['name']
self.logger.info(f"开始提取表数据: {table_name}")
connection = None
try:
connection = self.get_connection(datasource_config)
cursor = connection.cursor()
# 构建查询SQL
sql = self.build_extract_sql(table_config)
self.logger.info(f"执行SQL: {sql}")
cursor.execute(sql)
# 获取列名
columns = [desc[0] for desc in cursor.description]
# 分批读取数据
batch_size = table_config.get('batch_size', 10000)
batch_num = 0
while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
batch_num += 1
self.logger.info(f"处理批次 {batch_num}, 记录数: {len(rows)}")
# 转换为JSON格式
json_data = []
for row in rows:
record = {}
for i, value in enumerate(row):
# 处理特殊数据类型
if isinstance(value, datetime):
record[columns[i]] = value.isoformat()
elif value is None:
record[columns[i]] = None
else:
record[columns[i]] = str(value)
json_data.append(record)
# 写入HDFS
self.write_to_hdfs(table_name, json_data, batch_num)
self.logger.info(f"表数据提取完成: {table_name}, 总批次: {batch_num}")
except Exception as e:
self.logger.error(f"提取表数据失败: {table_name}, 错误: {e}")
raise
finally:
if connection:
connection.close()
def build_extract_sql(self, table_config):
"""构建数据提取SQL"""
table_name = table_config['name']
columns = table_config.get('columns', ['*'])
# 构建列列表
if columns == ['*']:
column_list = '*'
else:
column_list = ', '.join(columns)
sql = f"SELECT {column_list} FROM {table_name}"
# 添加WHERE条件
if 'where_condition' in table_config:
sql += f" WHERE {table_config['where_condition']}"
# 添加增量同步条件
if table_config.get('incremental', False):
timestamp_column = table_config['timestamp_column']
last_sync_time = self.get_last_sync_time(table_name)
if last_sync_time:
if 'where_condition' in table_config:
sql += f" AND {timestamp_column} > '{last_sync_time}'"
else:
sql += f" WHERE {timestamp_column} > '{last_sync_time}'"
# 添加ORDER BY
if 'order_by' in table_config:
sql += f" ORDER BY {table_config['order_by']}"
return sql
def write_to_hdfs(self, table_name, data, batch_num):
"""写入数据到HDFS"""
try:
# 构建HDFS路径
date_str = datetime.now().strftime('%Y%m%d')
hdfs_path = f"/kylin/data/{table_name}/{date_str}/batch_{batch_num:06d}.json"
# 确保目录存在
hdfs_dir = f"/kylin/data/{table_name}/{date_str}"
if not self.hdfs_client.status(hdfs_dir, strict=False):
self.hdfs_client.makedirs(hdfs_dir)
# 写入数据
json_content = '\n'.join(json.dumps(record, ensure_ascii=False) for record in data)
with self.hdfs_client.write(hdfs_path, encoding='utf-8') as writer:
writer.write(json_content)
self.logger.info(f"数据已写入HDFS: {hdfs_path}")
except Exception as e:
self.logger.error(f"写入HDFS失败: {e}")
raise
def get_last_sync_time(self, table_name):
"""获取上次同步时间"""
sync_file = f"/kylin/sync_status/{table_name}_last_sync.txt"
try:
if self.hdfs_client.status(sync_file, strict=False):
with self.hdfs_client.read(sync_file, encoding='utf-8') as reader:
return reader.read().strip()
except Exception:
pass
return None
def update_sync_time(self, table_name, sync_time):
"""更新同步时间"""
sync_file = f"/kylin/sync_status/{table_name}_last_sync.txt"
try:
# 确保目录存在
sync_dir = "/kylin/sync_status"
if not self.hdfs_client.status(sync_dir, strict=False):
self.hdfs_client.makedirs(sync_dir)
with self.hdfs_client.write(sync_file, encoding='utf-8', overwrite=True) as writer:
writer.write(sync_time)
except Exception as e:
self.logger.error(f"更新同步时间失败: {e}")
def sync_datasource(self, datasource_name):
"""同步单个数据源"""
self.logger.info(f"开始同步数据源: {datasource_name}")
datasource_config = None
for ds in self.config['datasources']:
if ds['name'] == datasource_name:
datasource_config = ds
break
if not datasource_config:
raise ValueError(f"未找到数据源配置: {datasource_name}")
# 同步所有表
for table_config in datasource_config['tables']:
try:
self.extract_table_data(datasource_config, table_config)
# 更新同步时间
if table_config.get('incremental', False):
current_time = datetime.now().isoformat()
self.update_sync_time(table_config['name'], current_time)
except Exception as e:
self.logger.error(f"同步表失败: {table_config['name']}, 错误: {e}")
continue
self.logger.info(f"数据源同步完成: {datasource_name}")
def sync_all(self, parallel=True):
"""同步所有数据源"""
self.logger.info("开始同步所有数据源")
datasource_names = [ds['name'] for ds in self.config['datasources']]
if parallel:
# 并行同步
with ThreadPoolExecutor(max_workers=self.config.get('max_workers', 4)) as executor:
futures = {executor.submit(self.sync_datasource, name): name for name in datasource_names}
for future in as_completed(futures):
datasource_name = futures[future]
try:
future.result()
self.logger.info(f"数据源同步成功: {datasource_name}")
except Exception as e:
self.logger.error(f"数据源同步失败: {datasource_name}, 错误: {e}")
else:
# 串行同步
for datasource_name in datasource_names:
try:
self.sync_datasource(datasource_name)
except Exception as e:
self.logger.error(f"数据源同步失败: {datasource_name}, 错误: {e}")
continue
self.logger.info("所有数据源同步完成")
def main():
import argparse
parser = argparse.ArgumentParser(description='JDBC数据同步工具')
parser.add_argument('--config', required=True, help='配置文件路径')
parser.add_argument('--datasource', help='指定数据源名称')
parser.add_argument('--parallel', action='store_true', help='并行同步')
args = parser.parse_args()
sync_tool = JDBCDataSync(args.config)
if args.datasource:
sync_tool.sync_datasource(args.datasource)
else:
sync_tool.sync_all(args.parallel)
if __name__ == "__main__":
main()
JDBC同步配置文件示例:
{
"hdfs": {
"url": "http://namenode:9870"
},
"max_workers": 4,
"datasources": [
{
"name": "mysql_sales",
"type": "mysql",
"connection": {
"host": "mysql-server",
"port": 3306,
"username": "sales_user",
"password": "sales123",
"database": "sales_db"
},
"tables": [
{
"name": "orders",
"columns": ["order_id", "customer_id", "order_date", "total_amount"],
"incremental": true,
"timestamp_column": "order_date",
"batch_size": 10000
},
{
"name": "customers",
"columns": ["customer_id", "customer_name", "email", "phone"],
"incremental": false,
"batch_size": 5000
}
]
},
{
"name": "postgresql_crm",
"type": "postgresql",
"connection": {
"host": "pg-server",
"port": 5432,
"username": "crm_user",
"password": "crm123",
"database": "crm_db"
},
"tables": [
{
"name": "customers",
"columns": ["customer_id", "customer_name", "segment", "created_date"],
"incremental": true,
"timestamp_column": "created_date",
"batch_size": 8000
}
]
}
]
}
4.5 文件数据源配置
4.5.1 Parquet文件配置
Parquet数据源配置:
# Parquet文件配置
kylin.source.parquet.base-path=/data/parquet
kylin.source.parquet.file-pattern=*.parquet
kylin.source.parquet.schema-inference=true
kylin.source.parquet.merge-schema=true
kylin.source.parquet.compression=snappy
# 读取优化
kylin.source.parquet.batch-size=10000
kylin.source.parquet.parallel-read=true
kylin.source.parquet.max-parallel-files=10
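在把Parquet目录注册为数据源之前,可以先确认文件的schema与行组划分是否合理,行组过小通常意味着小文件问题。下面是基于pyarrow的检查示意(假设示例:文件路径仅为演示):
#!/usr/bin/env python3
# parquet_inspect.py - 查看Parquet文件结构的示意脚本(假设示例)
import pyarrow.parquet as pq

PATH = "/data/parquet/sales.parquet"  # 假设:base-path下的一个示例文件

# 只读元数据即可获得行数、行组数等信息,无需加载全部数据
parquet_file = pq.ParquetFile(PATH)
print("Schema:")
print(parquet_file.schema_arrow)
print("行数:", parquet_file.metadata.num_rows)
print("行组数:", parquet_file.metadata.num_row_groups)

# 抽样查看第一个行组的前几行
table = parquet_file.read_row_group(0)
print(table.slice(0, 5))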
Parquet文件管理脚本:
#!/bin/bash
# parquet_file_manager.sh - Parquet文件管理脚本
PARQUET_BASE_PATH="/data/parquet"
ACTION="$1"
TABLE_NAME="$2"
case "$ACTION" in
"list")
echo "列出Parquet文件:"
find $PARQUET_BASE_PATH -name "*.parquet" -type f | sort
;;
"info")
if [ -z "$TABLE_NAME" ]; then
echo "用法: $0 info <parquet_file>"
exit 1
fi
echo "Parquet文件信息: $TABLE_NAME"
parquet-tools meta $TABLE_NAME
echo "\n文件内容预览:"
parquet-tools head -n 10 $TABLE_NAME
;;
"schema")
if [ -z "$TABLE_NAME" ]; then
echo "用法: $0 schema <parquet_file>"
exit 1
fi
echo "Parquet文件Schema: $TABLE_NAME"
parquet-tools schema $TABLE_NAME
;;
"convert")
INPUT_FILE="$TABLE_NAME"
OUTPUT_FILE="$3"
if [ -z "$INPUT_FILE" ] || [ -z "$OUTPUT_FILE" ]; then
echo "用法: $0 convert <input_file> <output_parquet>"
exit 1
fi
echo "转换文件为Parquet格式: $INPUT_FILE -> $OUTPUT_FILE"
# 使用Spark进行转换:通过临时PySpark脚本将CSV转换为Parquet(示意,假设输入为带表头的CSV)
cat > /tmp/csv_to_parquet.py <<'EOF'
import sys
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CsvToParquet").getOrCreate()
df = spark.read.option("header", "true").option("inferSchema", "true").csv(sys.argv[1])
df.write.mode("overwrite").parquet(sys.argv[2])
spark.stop()
EOF
spark-submit --master local[*] \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.coalescePartitions.enabled=true \
/tmp/csv_to_parquet.py "$INPUT_FILE" "$OUTPUT_FILE"
;;
"optimize")
if [ -z "$TABLE_NAME" ]; then
echo "用法: $0 optimize <parquet_directory>"
exit 1
fi
echo "优化Parquet文件: $TABLE_NAME"
# 合并小文件
spark-submit --class ParquetOptimizer \
--master local[*] \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.coalescePartitions.enabled=true \
--conf spark.sql.files.maxPartitionBytes=134217728 \
/opt/kylin/lib/parquet-optimizer.jar \
$TABLE_NAME
;;
*)
echo "用法: $0 {list|info|schema|convert|optimize} [options]"
echo "示例:"
echo " $0 list"
echo " $0 info /data/parquet/sales.parquet"
echo " $0 schema /data/parquet/sales.parquet"
echo " $0 convert /data/csv/sales.csv /data/parquet/sales.parquet"
echo " $0 optimize /data/parquet/sales/"
exit 1
;;
esac
4.5.2 ORC文件配置
ORC数据源配置:
# ORC文件配置
kylin.source.orc.base-path=/data/orc
kylin.source.orc.file-pattern=*.orc
kylin.source.orc.compression=zlib
kylin.source.orc.stripe-size=67108864
kylin.source.orc.row-index-stride=10000
# 读取优化
kylin.source.orc.batch-size=1024
kylin.source.orc.use-selected=true
kylin.source.orc.zero-copy=true
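与Parquet类似,ORC文件在接入前也可以先检查stripe划分与schema,stripe数量过多通常说明文件偏碎、需要合并。下面是基于pyarrow.orc的检查示意(假设示例:文件路径仅为演示,且要求所安装的pyarrow包含ORC支持):
#!/usr/bin/env python3
# orc_inspect.py - 查看ORC文件结构的示意脚本(假设示例)
from pyarrow import orc

PATH = "/data/orc/sales.orc"  # 假设:base-path下的一个示例文件

orc_file = orc.ORCFile(PATH)
print("Schema:", orc_file.schema)
print("行数:", orc_file.nrows)
print("Stripe数:", orc_file.nstripes)  # 与stripe-size配置相关

# 抽样读取第一个stripe的前几行
table = orc_file.read_stripe(0)
print(table.slice(0, 5))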
4.6 数据源监控与管理
4.6.1 数据源健康检查
数据源健康检查脚本:
#!/usr/bin/env python3
# datasource_health_check.py - 数据源健康检查
import json
import time
import logging
from datetime import datetime
import requests
import pymysql
import psycopg2
from kafka import KafkaConsumer
from hdfs import InsecureClient
class DataSourceHealthChecker:
def __init__(self, config_file):
self.config = self.load_config(config_file)
self.setup_logging()
self.health_status = {}
def load_config(self, config_file):
"""加载配置文件"""
with open(config_file, 'r', encoding='utf-8') as f:
return json.load(f)
def setup_logging(self):
"""设置日志"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
def check_hive_health(self, hive_config):
"""检查Hive健康状态"""
try:
# 检查Hive Metastore
metastore_url = hive_config['metastore_url']
response = requests.get(f"{metastore_url}/api/v1/databases", timeout=10)
if response.status_code == 200:
self.health_status['hive_metastore'] = 'healthy'
else:
self.health_status['hive_metastore'] = 'unhealthy'
# 检查HiveServer2
import jaydebeapi
conn = jaydebeapi.connect(
'org.apache.hive.jdbc.HiveDriver',
hive_config['jdbc_url'],
[hive_config['username'], hive_config['password']],
hive_config['driver_path']
)
cursor = conn.cursor()
cursor.execute("SHOW DATABASES")
databases = cursor.fetchall()
if databases:
self.health_status['hive_server'] = 'healthy'
else:
self.health_status['hive_server'] = 'unhealthy'
conn.close()
except Exception as e:
self.logger.error(f"Hive健康检查失败: {e}")
self.health_status['hive_metastore'] = 'error'
self.health_status['hive_server'] = 'error'
def check_kafka_health(self, kafka_config):
"""检查Kafka健康状态"""
try:
consumer = KafkaConsumer(
bootstrap_servers=kafka_config['bootstrap_servers'],
consumer_timeout_ms=5000
)
# 获取主题列表
topics = consumer.topics()
if topics:
self.health_status['kafka'] = 'healthy'
else:
self.health_status['kafka'] = 'unhealthy'
consumer.close()
except Exception as e:
self.logger.error(f"Kafka健康检查失败: {e}")
self.health_status['kafka'] = 'error'
def check_jdbc_health(self, jdbc_configs):
"""检查JDBC数据源健康状态"""
for jdbc_config in jdbc_configs:
name = jdbc_config['name']
try:
if jdbc_config['type'] == 'mysql':
conn = pymysql.connect(
host=jdbc_config['connection']['host'],
port=jdbc_config['connection'].get('port', 3306),
user=jdbc_config['connection']['username'],
password=jdbc_config['connection']['password'],
database=jdbc_config['connection']['database']
)
elif jdbc_config['type'] == 'postgresql':
conn = psycopg2.connect(
host=jdbc_config['connection']['host'],
port=jdbc_config['connection'].get('port', 5432),
user=jdbc_config['connection']['username'],
password=jdbc_config['connection']['password'],
database=jdbc_config['connection']['database']
)
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
if result:
self.health_status[f'jdbc_{name}'] = 'healthy'
else:
self.health_status[f'jdbc_{name}'] = 'unhealthy'
conn.close()
except Exception as e:
self.logger.error(f"JDBC数据源 {name} 健康检查失败: {e}")
self.health_status[f'jdbc_{name}'] = 'error'
def check_hdfs_health(self, hdfs_config):
"""检查HDFS健康状态"""
try:
client = InsecureClient(hdfs_config['url'])
# 检查根目录
status = client.status('/')
if status:
self.health_status['hdfs'] = 'healthy'
else:
self.health_status['hdfs'] = 'unhealthy'
except Exception as e:
self.logger.error(f"HDFS健康检查失败: {e}")
self.health_status['hdfs'] = 'error'
def run_health_check(self):
"""运行健康检查"""
self.logger.info("开始数据源健康检查")
# 检查Hive
if 'hive' in self.config:
self.check_hive_health(self.config['hive'])
# 检查Kafka
if 'kafka' in self.config:
self.check_kafka_health(self.config['kafka'])
# 检查JDBC数据源
if 'jdbc_datasources' in self.config:
self.check_jdbc_health(self.config['jdbc_datasources'])
# 检查HDFS
if 'hdfs' in self.config:
self.check_hdfs_health(self.config['hdfs'])
# 生成报告
self.generate_health_report()
def generate_health_report(self):
"""生成健康检查报告"""
report = {
'timestamp': datetime.now().isoformat(),
'overall_status': self.calculate_overall_status(),
'datasource_status': self.health_status,
'summary': self.generate_summary()
}
# 保存报告
report_file = f"datasource_health_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
self.logger.info(f"健康检查报告已保存: {report_file}")
# 打印摘要
self.print_health_summary(report)
def calculate_overall_status(self):
"""计算总体健康状态"""
if not self.health_status:
return 'unknown'
error_count = sum(1 for status in self.health_status.values() if status == 'error')
unhealthy_count = sum(1 for status in self.health_status.values() if status == 'unhealthy')
if error_count > 0:
return 'critical'
elif unhealthy_count > 0:
return 'warning'
else:
return 'healthy'
def generate_summary(self):
"""生成摘要信息"""
total = len(self.health_status)
healthy = sum(1 for status in self.health_status.values() if status == 'healthy')
unhealthy = sum(1 for status in self.health_status.values() if status == 'unhealthy')
error = sum(1 for status in self.health_status.values() if status == 'error')
return {
'total_datasources': total,
'healthy_count': healthy,
'unhealthy_count': unhealthy,
'error_count': error,
'health_percentage': round((healthy / total * 100) if total > 0 else 0, 2)
}
def print_health_summary(self, report):
"""打印健康检查摘要"""
print("\n=== 数据源健康检查报告 ===")
print(f"检查时间: {report['timestamp']}")
print(f"总体状态: {report['overall_status']}")
print(f"健康率: {report['summary']['health_percentage']}%")
print("\n数据源状态:")
for datasource, status in report['datasource_status'].items():
status_icon = {
'healthy': '✅',
'unhealthy': '⚠️',
'error': '❌'
}.get(status, '❓')
print(f" {status_icon} {datasource}: {status}")
print(f"\n统计信息:")
summary = report['summary']
print(f" 总数据源: {summary['total_datasources']}")
print(f" 健康: {summary['healthy_count']}")
print(f" 异常: {summary['unhealthy_count']}")
print(f" 错误: {summary['error_count']}")
def main():
import argparse
parser = argparse.ArgumentParser(description='数据源健康检查工具')
parser.add_argument('--config', required=True, help='配置文件路径')
args = parser.parse_args()
checker = DataSourceHealthChecker(args.config)
checker.run_health_check()
if __name__ == "__main__":
main()