11.1 章节概述

本章将深入探讨Apache Kylin的日常运维、故障诊断、性能排查和数据一致性检查等关键运维技能。通过系统化的运维方法和实用工具,帮助运维人员建立完整的Kylin运维体系。

```mermaid
graph TD
    A[Kylin运维与故障排除] --> B[日常运维操作]
    A --> C[故障诊断方法]
    A --> D[性能问题排查]
    A --> E[数据一致性检查]
    A --> F[监控告警体系]
    A --> G[备份恢复策略]
    B --> B1[服务管理]
    B --> B2[资源监控]
    B --> B3[日志管理]
    C --> C1[系统诊断]
    C --> C2[网络排查]
    C --> C3[依赖检查]
    D --> D1[查询性能分析]
    D --> D2[构建性能优化]
    D --> D3[系统资源调优]
    E --> E1[数据校验]
    E --> E2[一致性检查]
    E --> E3[修复策略]
    F --> F1[指标收集]
    F --> F2[告警规则]
    F --> F3[通知机制]
    G --> G1[元数据备份]
    G --> G2[Cube数据备份]
    G --> G3[灾难恢复]
```

11.2 日常运维操作

11.2.1 服务管理脚本

```bash
#!/bin/bash

# kylin_service_manager.sh - Kylin服务管理脚本

# 配置变量

# --- Deployment configuration ------------------------------------------------
# Installation root, service account and well-known paths for this node.
KYLIN_HOME="/opt/kylin"
KYLIN_USER="kylin"
KYLIN_PID_FILE="$KYLIN_HOME/pid"
KYLIN_LOG_DIR="$KYLIN_HOME/logs"
KYLIN_CONFIG_DIR="$KYLIN_HOME/conf"
# REST endpoint probed by health_check; HTTP 200 or 401 both count as "alive".
HEALTH_CHECK_URL="http://localhost:7070/kylin/api/admin/config"
HEALTH_CHECK_TIMEOUT=30

# 颜色输出

# ANSI color codes used by the log_* helpers; NC resets the terminal color.
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# 日志函数

# Timestamped, color-coded log helpers. All write a single line to stdout:
#   [LEVEL] YYYY-mm-dd HH:MM:SS - message
log_info() {
  echo -e "${GREEN}[INFO]${NC} $(date '+%Y-%m-%d %H:%M:%S') - $1"
}

log_warn() {
  echo -e "${YELLOW}[WARN]${NC} $(date '+%Y-%m-%d %H:%M:%S') - $1"
}

log_error() {
  echo -e "${RED}[ERROR]${NC} $(date '+%Y-%m-%d %H:%M:%S') - $1"
}

log_debug() {
  echo -e "${BLUE}[DEBUG]${NC} $(date '+%Y-%m-%d %H:%M:%S') - $1"
}

# 检查Kylin是否运行

# Return 0 when the PID recorded in $KYLIN_PID_FILE belongs to a live process.
# A stale PID file (process gone) is removed and 1 is returned.
is_kylin_running() {
  if [ -f "$KYLIN_PID_FILE" ]; then
    local pid
    pid=$(cat "$KYLIN_PID_FILE")
    if ps -p "$pid" > /dev/null 2>&1; then
      return 0
    else
      log_warn "PID file exists but process not running, removing stale PID file"
      rm -f "$KYLIN_PID_FILE"
      return 1
    fi
  else
    return 1
  fi
}

# 获取Kylin进程ID

# Print the PID recorded in $KYLIN_PID_FILE, or "N/A" when no PID file exists.
# Note: does not verify the process is alive (see is_kylin_running for that).
get_kylin_pid() {
  if [ -f "$KYLIN_PID_FILE" ]; then
    cat "$KYLIN_PID_FILE"
  else
    echo "N/A"
  fi
}

# 启动Kylin服务

# Start Kylin as $KYLIN_USER, wait up to 2 minutes for the process to appear,
# then verify it with health_check. Returns non-zero on any failure.
start_kylin() {
  log_info "Starting Kylin service..."

  if is_kylin_running; then
    log_warn "Kylin is already running (PID: $(get_kylin_pid))"
    return 1
  fi

  # Validate Java/dirs/disk before attempting to start.
  if ! check_environment; then
    log_error "Environment check failed, cannot start Kylin"
    return 1
  fi

  # Re-exec this script as the service account if invoked as someone else.
  if [ "$(whoami)" != "$KYLIN_USER" ]; then
    log_info "Switching to user: $KYLIN_USER"
    sudo -u "$KYLIN_USER" "$0" start
    return $?
  fi

  cd "$KYLIN_HOME"
  nohup bin/kylin.sh start > "$KYLIN_LOG_DIR/startup.log" 2>&1 &
  local start_pid=$!

  # Poll every 5s until the PID file points at a live process, or time out.
  log_info "Waiting for Kylin to start..."
  local max_wait=120
  local wait_time=0
  while [ $wait_time -lt $max_wait ]; do
    if is_kylin_running; then
      log_info "Kylin started successfully (PID: $(get_kylin_pid))"
      if health_check; then
        log_info "Kylin health check passed"
        return 0
      else
        log_warn "Kylin started but health check failed"
        return 1
      fi
    fi
    sleep 5
    wait_time=$((wait_time + 5))
    echo -n "."
  done
  echo
  log_error "Kylin failed to start within $max_wait seconds"
  return 1
}

# 停止Kylin服务

# Stop Kylin gracefully via bin/kylin.sh; after a 60s grace period the
# process is killed with SIGKILL and the PID file is removed.
stop_kylin() {
  log_info "Stopping Kylin service..."

  if ! is_kylin_running; then
    log_warn "Kylin is not running"
    return 1
  fi

  local pid
  pid=$(get_kylin_pid)
  log_info "Stopping Kylin process (PID: $pid)"

  # Re-exec this script as the service account if invoked as someone else.
  if [ "$(whoami)" != "$KYLIN_USER" ]; then
    sudo -u "$KYLIN_USER" "$0" stop
    return $?
  fi

  # Graceful shutdown first.
  cd "$KYLIN_HOME"
  bin/kylin.sh stop

  # Poll every 2s for up to a minute waiting for the process to exit.
  local max_wait=60
  local wait_time=0
  while [ $wait_time -lt $max_wait ]; do
    if ! is_kylin_running; then
      log_info "Kylin stopped successfully"
      return 0
    fi
    sleep 2
    wait_time=$((wait_time + 2))
    echo -n "."
  done
  echo
  log_warn "Kylin did not stop gracefully, forcing termination"

  # Last resort: SIGKILL and clean up the PID file.
  if [ -f "$KYLIN_PID_FILE" ]; then
    pid=$(cat "$KYLIN_PID_FILE")
    kill -9 "$pid" 2>/dev/null
    rm -f "$KYLIN_PID_FILE"
    log_info "Kylin process terminated forcefully"
  fi
  return 0
}

# 重启Kylin服务

# Restart = stop, short settle pause, start. Propagates start_kylin's status.
restart_kylin() {
  log_info "Restarting Kylin service..."
  stop_kylin
  sleep 5
  start_kylin
  return $?
}

# 检查Kylin状态

# Print a human-readable status report: run state, process/port/memory info,
# and the tail of the main Kylin log.
status_kylin() {
  echo "=== Kylin Service Status ==="
  if is_kylin_running; then
    local pid
    pid=$(get_kylin_pid)
    echo -e "Status: ${GREEN}RUNNING${NC}"
    echo "PID: $pid"

    echo "Process Info:"
    ps -p "$pid" -o pid,ppid,user,start,time,command --no-headers 2>/dev/null || echo "  Process details unavailable"

    echo "Port Usage:"
    netstat -tlnp 2>/dev/null | grep ":7070" | head -5

    echo "Memory Usage:"
    ps -p "$pid" -o pid,vsz,rss,pmem --no-headers 2>/dev/null || echo "  Memory details unavailable"
  else
    echo -e "Status: ${RED}STOPPED${NC}"
  fi
  echo
  echo "=== Recent Log Entries ==="
  if [ -f "$KYLIN_LOG_DIR/kylin.log" ]; then
    tail -10 "$KYLIN_LOG_DIR/kylin.log"
  else
    echo "No log file found"
  fi
}

# 健康检查

# Three-stage health check: live process, listening port 7070, responsive API.
# Returns 0 when healthy (or when curl is missing and the API step is skipped).
health_check() {
  log_info "Performing health check..."

  if ! is_kylin_running; then
    log_error "Health check failed: Kylin process not running"
    return 1
  fi

  if ! netstat -tln | grep -q ":7070"; then
    log_error "Health check failed: Port 7070 not listening"
    return 1
  fi

  if command -v curl >/dev/null 2>&1; then
    local response
    response=$(curl -s -w "%{http_code}" -o /dev/null --connect-timeout $HEALTH_CHECK_TIMEOUT "$HEALTH_CHECK_URL" 2>/dev/null)
    # 401 still proves the API is up (endpoint requires auth but responds).
    if [ "$response" = "200" ] || [ "$response" = "401" ]; then
      log_info "Health check passed: API responding"
      return 0
    else
      log_error "Health check failed: API not responding (HTTP: $response)"
      return 1
    fi
  else
    log_warn "curl not available, skipping API health check"
    return 0
  fi
}

# 环境检查

# Pre-flight checks: Java, JAVA_HOME, Kylin directories/config, disk and
# memory headroom. Counts hard errors and returns non-zero if any occurred;
# missing log dir and low memory are warnings only.
check_environment() {
  log_info "Checking environment..."
  local errors=0

  # Java binary on PATH.
  if ! command -v java >/dev/null 2>&1; then
    log_error "Java not found in PATH"
    errors=$((errors + 1))
  else
    local java_version
    java_version=$(java -version 2>&1 | head -n1 | cut -d'"' -f2)
    log_info "Java version: $java_version"
  fi

  # JAVA_HOME must be set for bin/kylin.sh.
  if [ -z "$JAVA_HOME" ]; then
    log_error "JAVA_HOME not set"
    errors=$((errors + 1))
  else
    log_info "JAVA_HOME: $JAVA_HOME"
  fi

  # Installation directory.
  if [ ! -d "$KYLIN_HOME" ]; then
    log_error "Kylin home directory not found: $KYLIN_HOME"
    errors=$((errors + 1))
  else
    log_info "Kylin home: $KYLIN_HOME"
  fi

  # Main configuration file.
  if [ ! -f "$KYLIN_CONFIG_DIR/kylin.properties" ]; then
    log_error "Kylin configuration file not found: $KYLIN_CONFIG_DIR/kylin.properties"
    errors=$((errors + 1))
  fi

  # Log directory is created on demand (warning, not an error).
  if [ ! -d "$KYLIN_LOG_DIR" ]; then
    log_warn "Log directory not found, creating: $KYLIN_LOG_DIR"
    mkdir -p "$KYLIN_LOG_DIR"
  fi

  # Disk usage (%) of the filesystem holding KYLIN_HOME; hard-fail above 90%.
  local disk_usage
  disk_usage=$(df "$KYLIN_HOME" | tail -1 | awk '{print $5}' | sed 's/%//')
  if [ "$disk_usage" -gt 90 ]; then
    log_error "Disk usage too high: ${disk_usage}%"
    errors=$((errors + 1))
  else
    log_info "Disk usage: ${disk_usage}%"
  fi

  # Available memory in MB (column 7 of `free -m`, second line).
  local mem_available
  mem_available=$(free -m | awk 'NR==2{printf "%.0f", $7}')
  if [ "$mem_available" -lt 1024 ]; then
    log_warn "Low available memory: ${mem_available}MB"
  else
    log_info "Available memory: ${mem_available}MB"
  fi

  if [ $errors -eq 0 ]; then
    log_info "Environment check passed"
    return 0
  else
    log_error "Environment check failed with $errors errors"
    return 1
  fi
}

# 获取系统信息

# Dump a diagnostic snapshot of the host: OS, Java, Kylin paths, CPU/memory/
# disk usage and the Kylin/Hadoop-related listening ports.
get_system_info() {
  echo "=== System Information ==="
  echo "Hostname: $(hostname)"
  echo "OS: $(uname -s) $(uname -r)"
  echo "Architecture: $(uname -m)"
  echo "Uptime: $(uptime | cut -d',' -f1 | cut -d' ' -f4-)"
  echo

  echo "=== Java Information ==="
  if command -v java >/dev/null 2>&1; then
    java -version 2>&1 | head -3
  else
    echo "Java not found"
  fi
  echo

  echo "=== Kylin Information ==="
  echo "Kylin Home: $KYLIN_HOME"
  echo "Kylin User: $KYLIN_USER"
  echo "Config Dir: $KYLIN_CONFIG_DIR"
  echo "Log Dir: $KYLIN_LOG_DIR"
  echo

  echo "=== Resource Usage ==="
  echo "CPU Usage:"
  top -bn1 | grep "Cpu(s)" | head -1
  echo
  echo "Memory Usage:"
  free -h
  echo
  echo "Disk Usage:"
  df -h "$KYLIN_HOME"
  echo

  echo "=== Network Information ==="
  echo "Listening Ports:"
  # 7070 Kylin, 2181 ZooKeeper, 9000/50070 HDFS.
  netstat -tln | grep -E ":(7070|2181|9000|50070)" | head -10
}

# 主函数

# Dispatch on the first CLI argument; unknown/missing commands print usage
# and exit 1.
main() {
  case "$1" in
    start)
      start_kylin
      ;;
    stop)
      stop_kylin
      ;;
    restart)
      restart_kylin
      ;;
    status)
      status_kylin
      ;;
    health)
      health_check
      ;;
    env)
      check_environment
      ;;
    info)
      get_system_info
      ;;
    *)
      echo "Usage: $0 {start|stop|restart|status|health|env|info}"
      echo
      echo "Commands:"
      echo "  start   - Start Kylin service"
      echo "  stop    - Stop Kylin service"
      echo "  restart - Restart Kylin service"
      echo "  status  - Show Kylin service status"
      echo "  health  - Perform health check"
      echo "  env     - Check environment"
      echo "  info    - Show system information"
      exit 1
      ;;
  esac
}

# 执行主函数

main "$@"
```

11.4 性能问题排查

11.4.1 查询性能分析

```python
#!/usr/bin/env python3

# query_performance_analyzer.py - 查询性能分析器

import time import json import logging import requests import threading import statistics from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple from dataclasses import dataclass, asdict from collections import defaultdict, deque import matplotlib.pyplot as plt import pandas as pd @dataclass class QueryMetrics: “”“查询指标”“” query_id: str sql: str start_time: datetime end_time: datetime duration_ms: int status: str # SUCCESS, FAILED, TIMEOUT rows_returned: int cube_used: Optional[str] project: str user: str error_message: Optional[str] = None @property def duration_seconds(self) -> float: return self.duration_ms / 1000.0 @dataclass class PerformanceAlert: “”“性能告警”“” alert_type: str severity: str # LOW, MEDIUM, HIGH, CRITICAL message: str timestamp: datetime metrics: Dict suggestions: List[str] class QueryPerformanceAnalyzer: “”“查询性能分析器”“” def init(self, kylin_host: str = “localhost”, kylin_port: int = 7070, username: str = “ADMIN”, password: str = “KYLIN”): self.kylin_host = kylin_host self.kylin_port = kylin_port self.username = username self.password = password self.base_url = f”http://{kylin_host}:{kylin_port}/kylin/api” # 配置日志 self.logger = logging.getLogger(name) self.logger.setLevel(logging.INFO) # 性能数据存储 self.query_history: deque = deque(maxlen=10000) # 最近10000条查询记录 self.performance_alerts: List[PerformanceAlert] = [] # 性能阈值配置 self.thresholds = { ‘slow_query_ms’: 30000, # 慢查询阈值:30秒 ‘very_slow_query_ms’: 60000, # 极慢查询阈值:60秒 ‘high_failure_rate’: 0.1, # 高失败率阈值:10% ‘high_avg_duration_ms’: 10000, # 高平均响应时间阈值:10秒 ‘concurrent_queries’: 50 # 并发查询数阈值 } # 监控状态 self.monitoring_active = False self.monitor_thread = None # 会话管理 self.session = requests.Session() self._authenticate() def _authenticate(self): “”“认证登录”“” try: auth_url = f”{self.base_url}/user/authentication” auth_data = { “username”: self.username, “password”: self.password } response = self.session.post(auth_url, json=auth_data, timeout=10) if response.status_code == 200: 
self.logger.info(“Authentication successful”) else: self.logger.error(f”Authentication failed: {response.status_code}“) except Exception as e: self.logger.error(f”Authentication error: {e}“) def execute_query_with_metrics(self, sql: str, project: str) -> QueryMetrics: “”“执行查询并收集性能指标”“” query_id = f”perftest{int(time.time() * 1000)}” start_time = datetime.now() try: # 执行查询 query_url = f”{self.base_url}/query” query_data = { “sql”: sql, “project”: project, “format”: “json” } response = self.session.post(query_url, json=query_data, timeout=120) end_time = datetime.now() duration_ms = int((end_time - start_time).total_seconds() * 1000) if response.status_code == 200: result = response.json() rows_returned = len(result.get(‘results’, [])) cube_used = result.get(‘cube’, None) metrics = QueryMetrics( query_id=query_id, sql=sql, start_time=start_time, end_time=end_time, duration_ms=duration_ms, status=“SUCCESS”, rows_returned=rows_returned, cube_used=cube_used, project=project, user=self.username ) else: metrics = QueryMetrics( query_id=query_id, sql=sql, start_time=start_time, end_time=end_time, duration_ms=duration_ms, status=“FAILED”, rows_returned=0, cube_used=None, project=project, user=self.username, error_message=f”HTTP {response.status_code}: {response.text}” ) except requests.exceptions.Timeout: end_time = datetime.now() duration_ms = int((end_time - start_time).total_seconds() * 1000) metrics = QueryMetrics( query_id=query_id, sql=sql, start_time=start_time, end_time=end_time, duration_ms=duration_ms, status=“TIMEOUT”, rows_returned=0, cube_used=None, project=project, user=self.username, error_message=“Query timeout” ) except Exception as e: end_time = datetime.now() duration_ms = int((end_time - start_time).total_seconds() * 1000) metrics = QueryMetrics( query_id=query_id, sql=sql, start_time=start_time, end_time=end_time, duration_ms=duration_ms, status=“FAILED”, rows_returned=0, cube_used=None, project=project, user=self.username, error_message=str(e) ) # 
存储查询记录 self.query_history.append(metrics) # 检查性能告警 self._check_performance_alerts(metrics) return metrics def _check_performance_alerts(self, metrics: QueryMetrics): “”“检查性能告警”“” alerts = [] # 检查慢查询 if metrics.duration_ms > self.thresholds[‘very_slow_query_ms’]: alerts.append(PerformanceAlert( alert_type=“VERY_SLOW_QUERY”, severity=“CRITICAL”, message=f”Very slow query detected: {metrics.duration_ms}ms”, timestamp=datetime.now(), metrics=asdict(metrics), suggestions=[ “Check if appropriate cube is being used”, “Review query complexity and optimize SQL”, “Consider adding more specific filters”, “Check system resource usage” ] )) elif metrics.duration_ms > self.thresholds[‘slow_query_ms’]: alerts.append(PerformanceAlert( alert_type=“SLOW_QUERY”, severity=“HIGH”, message=f”Slow query detected: {metrics.duration_ms}ms”, timestamp=datetime.now(), metrics=asdict(metrics), suggestions=[ “Review query execution plan”, “Check cube selection”, “Optimize WHERE clauses” ] )) # 检查查询失败 if metrics.status in [“FAILED”, “TIMEOUT”]: severity = “CRITICAL” if metrics.status == “TIMEOUT” else “HIGH” alerts.append(PerformanceAlert( alert_type=“QUERY_FAILURE”, severity=severity, message=f”Query {metrics.status.lower()}: {metrics.error_message}“, timestamp=datetime.now(), metrics=asdict(metrics), suggestions=[ “Check query syntax and validity”, “Verify data source availability”, “Review system logs for errors”, “Check network connectivity” ] )) # 添加告警到列表 self.performance_alerts.extend(alerts) # 限制告警历史数量 if len(self.performance_alerts) > 1000: self.performance_alerts = self.performance_alerts[-1000:] def analyze_query_patterns(self, hours: int = 24) -> Dict: “”“分析查询模式”“” cutoff_time = datetime.now() - timedelta(hours=hours) recent_queries = [q for q in self.query_history if q.start_time >= cutoff_time] if not recent_queries: return {“message”: “No recent queries found”} # 基本统计 total_queries = len(recent_queries) successful_queries = [q for q in recent_queries if q.status == “SUCCESS”] 
failed_queries = [q for q in recent_queries if q.status in [“FAILED”, “TIMEOUT”]] success_rate = len(successful_queries) / total_queries if total_queries > 0 else 0 # 响应时间统计 durations = [q.duration_ms for q in successful_queries] if durations: avg_duration = statistics.mean(durations) median_duration = statistics.median(durations) p95_duration = sorted(durations)[int(len(durations) * 0.95)] if len(durations) > 20 else max(durations) p99_duration = sorted(durations)[int(len(durations) * 0.99)] if len(durations) > 100 else max(durations) else: avg_duration = median_duration = p95_duration = p99_duration = 0 # 按项目分组 project_stats = defaultdict(list) for query in recent_queries: project_stats[query.project].append(query) project_summary = {} for project, queries in project_stats.items(): project_durations = [q.duration_ms for q in queries if q.status == “SUCCESS”] project_summary[project] = { “total_queries”: len(queries), “success_rate”: len([q for q in queries if q.status == “SUCCESS”]) / len(queries), “avg_duration_ms”: statistics.mean(project_durations) if project_durations else 0 } # 按Cube分组 cube_stats = defaultdict(list) for query in successful_queries: if query.cube_used: cube_stats[query.cube_used].append(query) cube_summary = {} for cube, queries in cube_stats.items(): cube_durations = [q.duration_ms for q in queries] cube_summary[cube] = { “query_count”: len(queries), “avg_duration_ms”: statistics.mean(cube_durations), “total_rows”: sum(q.rows_returned for q in queries) } # 慢查询分析 slow_queries = [q for q in recent_queries if q.duration_ms > self.thresholds[‘slow_query_ms’]] return { “analysis_period_hours”: hours, “total_queries”: total_queries, “success_rate”: success_rate, “failed_queries”: len(failed_queries), “performance_metrics”: { “avg_duration_ms”: avg_duration, “median_duration_ms”: median_duration, “p95_duration_ms”: p95_duration, “p99_duration_ms”: p99_duration }, “slow_queries”: { “count”: len(slow_queries), “percentage”: len(slow_queries) / 
total_queries if total_queries > 0 else 0 }, “project_summary”: project_summary, “cube_summary”: cube_summary, “top_slow_queries”: [ { “sql”: q.sql[:100] + “…” if len(q.sql) > 100 else q.sql, “duration_ms”: q.duration_ms, “project”: q.project, “cube”: q.cube_used } for q in sorted(slow_queries, key=lambda x: x.duration_ms, reverse=True)[:10] ] } def generate_performance_report(self, output_file: str = None) -> str: “”“生成性能报告”“” if outputfile is None: timestamp = datetime.now().strftime(‘%Y%m%d%H%M%S’) output_file = f”kylin_performancereport{timestamp}.json” # 分析最近24小时的数据 analysis = self.analyze_query_patterns(24) # 获取最近的告警 recent_alerts = [ asdict(alert) for alert in self.performance_alerts[-100:] ] # 系统状态检查 system_status = self._check_system_status() report = { “generated_at”: datetime.now().isoformat(), “report_period”: “Last 24 hours”, “query_analysis”: analysis, “recent_alerts”: recent_alerts, “system_status”: system_status, “recommendations”: self._generate_recommendations(analysis) } # 保存报告 with open(output_file, ‘w’, encoding=‘utf-8’) as f: json.dump(report, f, indent=2, ensure_ascii=False, default=str) self.logger.info(f”Performance report saved to: {output_file}“) return output_file def _check_system_status(self) -> Dict: “”“检查系统状态”“” try: # 检查系统信息 system_url = f”{self.base_url}/admin/config” response = self.session.get(system_url, timeout=10) if response.status_code == 200: config = response.json() return { “kylin_version”: config.get(“kylin.server.mode”, “Unknown”), “status”: “HEALTHY”, “response_time_ms”: response.elapsed.total_seconds() * 1000 } else: return { “status”: “UNHEALTHY”, “error”: f”HTTP {response.status_code}” } except Exception as e: return { “status”: “ERROR”, “error”: str(e) } def _generate_recommendations(self, analysis: Dict) -> List[str]: “”“生成性能优化建议”“” recommendations = [] # 基于成功率的建议 if analysis.get(“success_rate”, 1) < 0.95: recommendations.append( “Query success rate is below 95%. 
Check for data source issues and query syntax errors.” ) # 基于响应时间的建议 avg_duration = analysis.get(“performance_metrics”, {}).get(“avg_duration_ms”, 0) if avg_duration > self.thresholds[‘high_avg_duration_ms’]: recommendations.append( f”Average query duration ({avg_duration:.0f}ms) is high. Consider cube optimization and query tuning.” ) # 基于慢查询比例的建议 slow_query_percentage = analysis.get(“slow_queries”, {}).get(“percentage”, 0) if slow_query_percentage > 0.1: # 10% recommendations.append( f”High percentage of slow queries ({slow_query_percentage:.1%}). Review cube design and query patterns.” ) # 基于Cube使用情况的建议 cube_summary = analysis.get(“cube_summary”, {}) if cube_summary: # 找出最慢的Cube slowest_cube = max(cube_summary.items(), key=lambda x: x[1][‘avg_duration_ms’]) if slowest_cube[1][‘avg_duration_ms’] > self.thresholds[‘slow_query_ms’]: recommendations.append( f”Cube ‘{slowest_cube[0]}’ has high average query time. Consider optimization or rebuild.” ) # 通用建议 if not recommendations: recommendations.append(“System performance appears normal. 
Continue monitoring.”) return recommendations def start_real_time_monitoring(self, interval_seconds: int = 60): “”“启动实时监控”“” if self.monitoring_active: self.logger.warning(“Monitoring is already active”) return self.monitoring_active = True self.monitor_thread = threading.Thread( target=self._monitoring_loop, args=(interval_seconds,), daemon=True ) self.monitor_thread.start() self.logger.info(f”Real-time monitoring started (interval: {interval_seconds}s)“) def stop_real_time_monitoring(self): “”“停止实时监控”“” self.monitoring_active = False if self.monitor_thread: self.monitor_thread.join(timeout=5) self.logger.info(“Real-time monitoring stopped”) def _monitoring_loop(self, interval_seconds: int): “”“监控循环”“” while self.monitoring_active: try: # 检查系统状态 system_status = self._check_system_status() # 分析最近的查询 if len(self.query_history) > 0: recent_analysis = self.analyze_query_patterns(1) # 最近1小时 # 检查是否需要告警 avg_duration = recent_analysis.get(“performance_metrics”, {}).get(“avg_duration_ms”, 0) if avg_duration > self.thresholds[‘high_avg_duration_ms’]: alert = PerformanceAlert( alert_type=“HIGH_AVG_DURATION”, severity=“MEDIUM”, message=f”High average query duration: {avg_duration:.0f}ms”, timestamp=datetime.now(), metrics=recent_analysis, suggestions=[ “Monitor system resources”, “Check for concurrent query load”, “Review recent query patterns” ] ) self.performance_alerts.append(alert) time.sleep(interval_seconds) except Exception as e: self.logger.error(f”Error in monitoring loop: {e}“) time.sleep(interval_seconds) def create_performance_dashboard(self, output_file: str = “performance_dashboard.html”): “”“创建性能仪表板”“” if not self.query_history: self.logger.warning(“No query history available for dashboard”) return # 准备数据 df_data = [] for query in self.query_history: df_data.append({ ‘timestamp’: query.start_time, ‘duration_ms’: query.duration_ms, ‘status’: query.status, ‘project’: query.project, ‘cube’: query.cube_used or ‘Unknown’, ‘rows’: query.rows_returned }) df = 
pd.DataFrame(df_data) # 创建图表 fig, axes = plt.subplots(2, 2, figsize=(15, 10)) fig.suptitle(‘Kylin Query Performance Dashboard’, fontsize=16) # 1. 响应时间趋势 successful_queries = df[df[‘status’] == ‘SUCCESS’] if not successful_queries.empty: axes[0, 0].plot(successful_queries[‘timestamp’], successful_queries[‘duration_ms’]) axes[0, 0].set_title(‘Query Response Time Trend’) axes[0, 0].set_xlabel(‘Time’) axes[0, 0].set_ylabel(‘Duration (ms)’) axes[0, 0].tick_params(axis=‘x’, rotation=45) # 2. 状态分布 status_counts = df[‘status’].value_counts() axes[0, 1].pie(status_counts.values, labels=status_counts.index, autopct=‘%1.1f%%’) axes[0, 1].set_title(‘Query Status Distribution’) # 3. 项目查询分布 project_counts = df[‘project’].value_counts().head(10) axes[1, 0].bar(project_counts.index, project_counts.values) axes[1, 0].set_title(‘Queries by Project (Top 10)’) axes[1, 0].set_xlabel(‘Project’) axes[1, 0].set_ylabel(‘Query Count’) axes[1, 0].tick_params(axis=‘x’, rotation=45) # 4. 响应时间分布 if not successful_queries.empty: axes[1, 1].hist(successful_queries[‘duration_ms’], bins=20, alpha=0.7) axes[1, 1].set_title(‘Response Time Distribution’) axes[1, 1].set_xlabel(‘Duration (ms)’) axes[1, 1].set_ylabel(‘Frequency’) plt.tight_layout() # 保存图表 dashboard_image = output_file.replace(‘.html’, ‘.png’) plt.savefig(dashboard_image, dpi=300, bbox_inches=‘tight’) # 生成HTML报告 html_content = f”“” <!DOCTYPE html> Kylin Performance Dashboard

Kylin Performance Dashboard

Generated: {datetime.now().strftime(‘%Y-%m-%d %H:%M:%S’)}

Total Queries

{len(df)}

Success Rate

{len(successful_queries) / len(df) * 100:.1f}%

Avg Response Time

{successful_queries[‘duration_ms’].mean():.0f}ms

Active Projects

{df[‘project’].nunique()}

Performance Charts

Performance Charts

Recent Alerts

“”” # 添加最近的告警 recent_alerts = self.performance_alerts[-10:] if self.performance_alerts else [] for alert in recent_alerts: severity_class = alert.severity.lower() html_content += f”“”
{alert.alert_type} - {alert.severity}
{alert.message}
{alert.timestamp.strftime(‘%Y-%m-%d %H:%M:%S’)}
“”” if not recent_alerts: html_content += “

No recent alerts.

” html_content += “””
“”” # 保存HTML文件 with open(output_file, ‘w’, encoding=‘utf-8’) as f: f.write(html_content) self.logger.info(f”Performance dashboard saved to: {output_file}“) plt.close()

使用示例

if name == “main”: import argparse parser = argparse.ArgumentParser(description=‘Kylin Query Performance Analyzer’) parser.add_argument(‘–host’, default=‘localhost’, help=‘Kylin host’) parser.add_argument(‘–port’, type=int, default=7070, help=‘Kylin port’) parser.add_argument(‘–username’, default=‘ADMIN’, help=‘Username’) parser.add_argument(‘–password’, default=‘KYLIN’, help=‘Password’) parser.add_argument(‘–action’, choices=[‘test’, ‘monitor’, ‘report’, ‘dashboard’], default=‘test’, help=‘Action to perform’) parser.add_argument(‘–sql’, help=‘SQL query to test’) parser.add_argument(‘–project’, default=‘learn_kylin’, help=‘Project name’) parser.add_argument(‘–duration’, type=int, default=60, help=‘Monitoring duration in seconds’) args = parser.parse_args() # 创建分析器 analyzer = QueryPerformanceAnalyzer( kylin_host=args.host, kylin_port=args.port, username=args.username, password=args.password ) if args.action == ‘test’: # 测试查询性能 test_sql = args.sql or “SELECT COUNT(*) FROM KYLIN_SALES” print(f”Testing query: {test_sql}“) metrics = analyzer.execute_query_with_metrics(test_sql, args.project) print(f”Query completed in {metrics.duration_ms}ms”) print(f”Status: {metrics.status}“) print(f”Rows returned: {metrics.rows_returned}“) if metrics.cube_used: print(f”Cube used: {metrics.cube_used}“) if metrics.error_message: print(f”Error: {metrics.error_message}“) elif args.action == ‘monitor’: # 启动实时监控 print(f”Starting real-time monitoring for {args.duration} seconds…“) analyzer.start_real_time_monitoring() time.sleep(args.duration) analyzer.stop_real_time_monitoring() # 生成分析报告 analysis = analyzer.analyze_query_patterns() print(json.dumps(analysis, indent=2, default=str)) elif args.action == ‘report’: # 生成性能报告 report_file = analyzer.generate_performance_report() print(f”Performance report generated: {report_file}“) elif args.action == ‘dashboard’: # 创建性能仪表板 analyzer.create_performance_dashboard() print(“Performance dashboard created: performance_dashboard.html”) “`

11.4.2 系统资源监控

```python
#!/usr/bin/env python3

# system_resource_monitor.py - 系统资源监控器

import psutil import time import json import logging import threading from datetime import datetime, timedelta from typing import Dict, List, Optional from dataclasses import dataclass, asdict from collections import deque import matplotlib.pyplot as plt import numpy as np @dataclass class ResourceMetrics: “”“资源指标”“” timestamp: datetime cpu_percent: float memory_percent: float memory_used_gb: float memory_total_gb: float disk_percent: float disk_used_gb: float disk_total_gb: float network_bytes_sent: int network_bytes_recv: int load_average: Optional[List[float]] = None process_count: int = 0 kylin_memory_mb: float = 0 kylin_cpu_percent: float = 0 @dataclass class ResourceAlert: “”“资源告警”“” alert_type: str severity: str message: str timestamp: datetime current_value: float threshold: float suggestions: List[str] class SystemResourceMonitor: “”“系统资源监控器”“” def init(self, kylin_process_name: str = “kylin”): self.kylin_process_name = kylin_process_name # 配置日志 self.logger = logging.getLogger(name) self.logger.setLevel(logging.INFO) # 资源数据存储 self.metrics_history: deque = deque(maxlen=1440) # 24小时数据(每分钟一个点) self.alerts: List[ResourceAlert] = [] # 告警阈值配置 self.thresholds = { ‘cpu_percent’: 80.0, ‘memory_percent’: 85.0, ‘disk_percent’: 90.0, ‘load_average_1m’: psutil.cpu_count() * 0.8, ‘kylin_memory_mb’: 8192, # 8GB ‘kylin_cpu_percent’: 50.0 } # 监控状态 self.monitoring_active = False self.monitor_thread = None # 网络基线(用于计算网络使用率) self.network_baseline = None self._establish_network_baseline() def _establish_network_baseline(self): “”“建立网络基线”“” try: net_io = psutil.net_io_counters() self.network_baseline = { ‘bytes_sent’: net_io.bytes_sent, ‘bytes_recv’: net_io.bytes_recv, ‘timestamp’: time.time() } except Exception as e: self.logger.warning(f”Could not establish network baseline: {e}“) def collect_metrics(self) -> ResourceMetrics: “”“收集系统资源指标”“” timestamp = datetime.now() # CPU指标 cpu_percent = psutil.cpu_percent(interval=1) # 内存指标 memory = psutil.virtual_memory() memory_percent = 
memory.percent memory_used_gb = memory.used / (10243) memory_total_gb = memory.total / (10243) # 磁盘指标 disk = psutil.disk_usage(‘/’) disk_percent = (disk.used / disk.total) * 100 disk_used_gb = disk.used / (10243) disk_total_gb = disk.total / (10243) # 网络指标 net_io = psutil.net_io_counters() network_bytes_sent = net_io.bytes_sent network_bytes_recv = net_io.bytes_recv # 负载平均值(仅Linux/Unix) load_average = None try: if hasattr(psutil, ‘getloadavg’): load_average = list(psutil.getloadavg()) except (AttributeError, OSError): pass # 进程数量 process_count = len(psutil.pids()) # Kylin进程指标 kylin_memory_mb = 0 kylin_cpu_percent = 0 try: kylin_processes = self._find_kylin_processes() if kylin_processes: total_memory = sum(p.memory_info().rss for p in kylin_processes) kylin_memory_mb = total_memory / (1024**2) total_cpu = sum(p.cpu_percent() for p in kylin_processes) kylin_cpu_percent = total_cpu except Exception as e: self.logger.debug(f”Could not collect Kylin process metrics: {e}“) metrics = ResourceMetrics( timestamp=timestamp, cpu_percent=cpu_percent, memory_percent=memory_percent, memory_used_gb=memory_used_gb, memory_total_gb=memory_total_gb, disk_percent=disk_percent, disk_used_gb=disk_used_gb, disk_total_gb=disk_total_gb, network_bytes_sent=network_bytes_sent, network_bytes_recv=network_bytes_recv, load_average=load_average, process_count=process_count, kylin_memory_mb=kylin_memory_mb, kylin_cpu_percent=kylin_cpu_percent ) # 存储指标 self.metrics_history.append(metrics) # 检查告警 self._check_resource_alerts(metrics) return metrics def _find_kylin_processes(self) -> List: “”“查找Kylin进程”“” kylin_processes = [] for proc in psutil.process_iter([‘pid’, ‘name’, ‘cmdline’]): try: proc_info = proc.info if proc_info[‘name’] and self.kylin_process_name.lower() in proc_info[‘name’].lower(): kylin_processes.append(proc) elif proc_info[‘cmdline’]: cmdline = ‘ ‘.join(proc_info[‘cmdline’]).lower() if ‘kylin’ in cmdline or ‘org.apache.kylin’ in cmdline: kylin_processes.append(proc) except 
(psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): continue return kylin_processes def _check_resource_alerts(self, metrics: ResourceMetrics): “”“检查资源告警”“” alerts = [] # CPU告警 if metrics.cpu_percent > self.thresholds[‘cpu_percent’]: severity = “CRITICAL” if metrics.cpu_percent > 95 else “HIGH” alerts.append(ResourceAlert( alert_type=“HIGH_CPU_USAGE”, severity=severity, message=f”High CPU usage: {metrics.cpu_percent:.1f}%“, timestamp=metrics.timestamp, current_value=metrics.cpu_percent, threshold=self.thresholds[‘cpu_percent’], suggestions=[ “Check for runaway processes”, “Review current query load”, “Consider scaling resources”, “Optimize query performance” ] )) # 内存告警 if metrics.memory_percent > self.thresholds[‘memory_percent’]: severity = “CRITICAL” if metrics.memory_percent > 95 else “HIGH” alerts.append(ResourceAlert( alert_type=“HIGH_MEMORY_USAGE”, severity=severity, message=f”High memory usage: {metrics.memory_percent:.1f}% ({metrics.memory_used_gb:.1f}GB)“, timestamp=metrics.timestamp, current_value=metrics.memory_percent, threshold=self.thresholds[‘memory_percent’], suggestions=[ “Check for memory leaks”, “Restart Kylin service if necessary”, “Increase available memory”, “Optimize cube build settings” ] )) # 磁盘告警 if metrics.disk_percent > self.thresholds[‘disk_percent’]: severity = “CRITICAL” if metrics.disk_percent > 98 else “HIGH” alerts.append(ResourceAlert( alert_type=“HIGH_DISK_USAGE”, severity=severity, message=f”High disk usage: {metrics.disk_percent:.1f}% ({metrics.disk_used_gb:.1f}GB)“, timestamp=metrics.timestamp, current_value=metrics.disk_percent, threshold=self.thresholds[‘disk_percent’], suggestions=[ “Clean up old log files”, “Remove unnecessary cube segments”, “Archive old data”, “Add more disk space” ] )) # 负载平均值告警 if metrics.load_average and len(metrics.load_average) > 0: load_1m = metrics.load_average[0] if load_1m > self.thresholds[‘load_average_1m’]: severity = “HIGH” if load_1m < self.thresholds[‘load_average_1m’] * 
1.5 else “CRITICAL” alerts.append(ResourceAlert( alert_type=“HIGH_LOAD_AVERAGE”, severity=severity, message=f”High load average: {load_1m:.2f}“, timestamp=metrics.timestamp, current_value=load_1m, threshold=self.thresholds[‘load_average_1m’], suggestions=[ “Check for I/O bottlenecks”, “Review concurrent processes”, “Monitor disk and network performance”, “Consider load balancing” ] )) # Kylin进程告警 if metrics.kylin_memory_mb > self.thresholds[‘kylin_memory_mb’]: alerts.append(ResourceAlert( alert_type=“HIGH_KYLIN_MEMORY”, severity=“MEDIUM”, message=f”High Kylin memory usage: {metrics.kylin_memory_mb:.0f}MB”, timestamp=metrics.timestamp, current_value=metrics.kylin_memory_mb, threshold=self.thresholds[‘kylin_memory_mb’], suggestions=[ “Check for memory leaks in Kylin”, “Review JVM heap settings”, “Monitor cube build processes”, “Consider restarting Kylin service” ] )) if metrics.kylin_cpu_percent > self.thresholds[‘kylin_cpu_percent’]: alerts.append(ResourceAlert( alert_type=“HIGH_KYLIN_CPU”, severity=“MEDIUM”, message=f”High Kylin CPU usage: {metrics.kylin_cpu_percent:.1f}%“, timestamp=metrics.timestamp, current_value=metrics.kylin_cpu_percent, threshold=self.thresholds[‘kylin_cpu_percent’], suggestions=[ “Check current query load”, “Review cube build status”, “Optimize query performance”, “Monitor concurrent operations” ] )) # 添加告警到列表 self.alerts.extend(alerts) # 限制告警历史数量 if len(self.alerts) > 1000: self.alerts = self.alerts[-1000:] def start_monitoring(self, interval_seconds: int = 60): “”“启动资源监控”“” if self.monitoring_active: self.logger.warning(“Monitoring is already active”) return self.monitoring_active = True self.monitor_thread = threading.Thread( target=self._monitoring_loop, args=(interval_seconds,), daemon=True ) self.monitor_thread.start() self.logger.info(f”Resource monitoring started (interval: {interval_seconds}s)“) def stop_monitoring(self): “”“停止资源监控”“” self.monitoring_active = False if self.monitor_thread: self.monitor_thread.join(timeout=5) 
self.logger.info(“Resource monitoring stopped”) def _monitoring_loop(self, interval_seconds: int): “”“监控循环”“” while self.monitoring_active: try: metrics = self.collect_metrics() self.logger.debug(f”Collected metrics: CPU={metrics.cpu_percent:.1f}%, “ f”Memory={metrics.memory_percent:.1f}%, “ f”Disk={metrics.disk_percent:.1f}%“) time.sleep(interval_seconds) except Exception as e: self.logger.error(f”Error in monitoring loop: {e}“) time.sleep(interval_seconds) def get_resource_summary(self, hours: int = 24) -> Dict: “”“获取资源使用摘要”“” cutoff_time = datetime.now() - timedelta(hours=hours) recent_metrics = [m for m in self.metrics_history if m.timestamp >= cutoff_time] if not recent_metrics: return {“message”: “No recent metrics available”} # 计算统计信息 cpu_values = [m.cpu_percent for m in recent_metrics] memory_values = [m.memory_percent for m in recent_metrics] disk_values = [m.disk_percent for m in recent_metrics] summary = { “period_hours”: hours, “data_points”: len(recent_metrics), “cpu”: { “current”: recent_metrics[-1].cpu_percent if recent_metrics else 0, “average”: sum(cpu_values) / len(cpu_values) if cpu_values else 0, “max”: max(cpu_values) if cpu_values else 0, “min”: min(cpu_values) if cpu_values else 0 }, “memory”: { “current_percent”: recent_metrics[-1].memory_percent if recent_metrics else 0, “current_gb”: recent_metrics[-1].memory_used_gb if recent_metrics else 0, “total_gb”: recent_metrics[-1].memory_total_gb if recent_metrics else 0, “average_percent”: sum(memory_values) / len(memory_values) if memory_values else 0, “max_percent”: max(memory_values) if memory_values else 0 }, “disk”: { “current_percent”: recent_metrics[-1].disk_percent if recent_metrics else 0, “current_used_gb”: recent_metrics[-1].disk_used_gb if recent_metrics else 0, “total_gb”: recent_metrics[-1].disk_total_gb if recent_metrics else 0, “average_percent”: sum(disk_values) / len(disk_values) if disk_values else 0, “max_percent”: max(disk_values) if disk_values else 0 }, “kylin_process”: { 
“current_memory_mb”: recent_metrics[-1].kylin_memory_mb if recent_metrics else 0, “current_cpu_percent”: recent_metrics[-1].kylin_cpu_percent if recent_metrics else 0, “average_memory_mb”: sum(m.kylin_memory_mb for m in recent_metrics) / len(recent_metrics) if recent_metrics else 0, “average_cpu_percent”: sum(m.kylin_cpu_percent for m in recent_metrics) / len(recent_metrics) if recent_metrics else 0 }, “alerts”: { “total”: len([a for a in self.alerts if a.timestamp >= cutoff_time]), “critical”: len([a for a in self.alerts if a.timestamp >= cutoff_time and a.severity == “CRITICAL”]), “high”: len([a for a in self.alerts if a.timestamp >= cutoff_time and a.severity == “HIGH”]), “medium”: len([a for a in self.alerts if a.timestamp >= cutoff_time and a.severity == “MEDIUM”]) } } return summary def generate_resource_report(self, output_file: str = None) -> str: “”“生成资源使用报告”“” if outputfile is None: timestamp = datetime.now().strftime(‘%Y%m%d%H%M%S’) output_file = f”resourcereport{timestamp}.json” # 获取摘要信息 summary = self.get_resource_summary(24) # 获取最近的告警 recent_alerts = [ asdict(alert) for alert in self.alerts[-50:] ] # 生成报告 report = { “generated_at”: datetime.now().isoformat(), “monitoring_period”: “Last 24 hours”, “resource_summary”: summary, “recent_alerts”: recent_alerts, “thresholds”: self.thresholds, “recommendations”: self._generate_resource_recommendations(summary) } # 保存报告 with open(output_file, ‘w’, encoding=‘utf-8’) as f: json.dump(report, f, indent=2, ensure_ascii=False, default=str) self.logger.info(f”Resource report saved to: {output_file}“) return output_file def _generate_resource_recommendations(self, summary: Dict) -> List[str]: “”“生成资源优化建议”“” recommendations = [] # CPU建议 cpu_avg = summary.get(“cpu”, {}).get(“average”, 0) if cpu_avg > 70: recommendations.append( f”Average CPU usage is high ({cpu_avg:.1f}%). 
Consider optimizing queries or scaling resources.” ) # 内存建议 memory_avg = summary.get(“memory”, {}).get(“average_percent”, 0) if memory_avg > 75: recommendations.append( f”Average memory usage is high ({memory_avg:.1f}%). Monitor for memory leaks and consider increasing RAM.” ) # 磁盘建议 disk_current = summary.get(“disk”, {}).get(“current_percent”, 0) if disk_current > 80: recommendations.append( f”Disk usage is high ({disk_current:.1f}%). Clean up old files and consider adding storage.” ) # Kylin进程建议 kylin_memory = summary.get(“kylin_process”, {}).get(“average_memory_mb”, 0) if kylin_memory > 6144: # 6GB recommendations.append( f”Kylin process memory usage is high ({kylin_memory:.0f}MB). Review JVM settings and cube configurations.” ) # 告警建议 critical_alerts = summary.get(“alerts”, {}).get(“critical”, 0) if critical_alerts > 0: recommendations.append( f”There are {critical_alerts} critical alerts. Immediate attention required.” ) if not recommendations: recommendations.append(“Resource usage appears normal. 
Continue monitoring.”) return recommendations def create_resource_charts(self, output_file: str = “resource_charts.png”, hours: int = 24): “”“创建资源使用图表”“” cutoff_time = datetime.now() - timedelta(hours=hours) recent_metrics = [m for m in self.metrics_history if m.timestamp >= cutoff_time] if not recent_metrics: self.logger.warning(“No recent metrics available for charts”) return # 准备数据 timestamps = [m.timestamp for m in recent_metrics] cpu_values = [m.cpu_percent for m in recent_metrics] memory_values = [m.memory_percent for m in recent_metrics] disk_values = [m.disk_percent for m in recent_metrics] kylin_memory_values = [m.kylin_memory_mb for m in recent_metrics] # 创建图表 fig, axes = plt.subplots(2, 2, figsize=(15, 10)) fig.suptitle(f’System Resource Usage - Last {hours} Hours’, fontsize=16) # CPU使用率 axes[0, 0].plot(timestamps, cpu_values, ‘b-’, linewidth=2) axes[0, 0].axhline(y=self.thresholds[‘cpu_percent’], color=‘r’, linestyle=‘–’, alpha=0.7) axes[0, 0].set_title(‘CPU Usage (%)’) axes[0, 0].set_ylabel(‘CPU %’) axes[0, 0].grid(True, alpha=0.3) axes[0, 0].tick_params(axis=‘x’, rotation=45) # 内存使用率 axes[0, 1].plot(timestamps, memory_values, ‘g-’, linewidth=2) axes[0, 1].axhline(y=self.thresholds[‘memory_percent’], color=‘r’, linestyle=‘–’, alpha=0.7) axes[0, 1].set_title(‘Memory Usage (%)’) axes[0, 1].set_ylabel(‘Memory %’) axes[0, 1].grid(True, alpha=0.3) axes[0, 1].tick_params(axis=‘x’, rotation=45) # 磁盘使用率 axes[1, 0].plot(timestamps, disk_values, ‘orange’, linewidth=2) axes[1, 0].axhline(y=self.thresholds[‘disk_percent’], color=‘r’, linestyle=‘–’, alpha=0.7) axes[1, 0].set_title(‘Disk Usage (%)’) axes[1, 0].set_ylabel(‘Disk %’) axes[1, 0].grid(True, alpha=0.3) axes[1, 0].tick_params(axis=‘x’, rotation=45) # Kylin内存使用 axes[1, 1].plot(timestamps, kylin_memory_values, ‘purple’, linewidth=2) axes[1, 1].axhline(y=self.thresholds[‘kylin_memory_mb’], color=‘r’, linestyle=‘–’, alpha=0.7) axes[1, 1].set_title(‘Kylin Memory Usage (MB)’) axes[1, 1].set_ylabel(‘Memory (MB)’) 
axes[1, 1].grid(True, alpha=0.3) axes[1, 1].tick_params(axis=‘x’, rotation=45) plt.tight_layout() plt.savefig(output_file, dpi=300, bbox_inches=‘tight’) plt.close() self.logger.info(f”Resource charts saved to: {output_file}“)

使用示例

if name == “main”: import argparse parser = argparse.ArgumentParser(description=‘System Resource Monitor for Kylin’) parser.add_argument(‘–action’, choices=[‘monitor’, ‘report’, ‘charts’, ‘summary’], default=‘monitor’, help=‘Action to perform’) parser.add_argument(‘–duration’, type=int, default=300, help=‘Monitoring duration in seconds’) parser.add_argument(‘–interval’, type=int, default=60, help=‘Monitoring interval in seconds’) parser.add_argument(‘–hours’, type=int, default=24, help=‘Hours of data to analyze’) args = parser.parse_args() # 创建监控器 monitor = SystemResourceMonitor() if args.action == ‘monitor’: # 启动监控 print(f”Starting resource monitoring for {args.duration} seconds…“) monitor.start_monitoring(args.interval) time.sleep(args.duration) monitor.stop_monitoring() # 显示摘要 summary = monitor.get_resource_summary(1) # 最近1小时 print(json.dumps(summary, indent=2, default=str)) elif args.action == ‘report’: # 生成报告 report_file = monitor.generate_resource_report() print(f”Resource report generated: {report_file}“) elif args.action == ‘charts’: # 创建图表 monitor.create_resource_charts(hours=args.hours) print(f”Resource charts created: resource_charts.png”) elif args.action == ‘summary’: # 显示摘要 summary = monitor.get_resource_summary(args.hours) print(json.dumps(summary, indent=2, default=str))

11.2.2 资源监控

```python

# resource_monitor.py - Kylin资源监控脚本

import time
import json
import logging
import smtplib
import requests
import psutil
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from email.mime.text import MIMEText          # Python 3 names are MIMEText / MIMEMultipart
from email.mime.multipart import MIMEMultipart
from collections import deque, defaultdict
import sqlite3
import os


@dataclass
class SystemMetrics:
    """系统指标数据类 — one point-in-time sample of host-level resources."""
    timestamp: datetime
    cpu_percent: float
    memory_percent: float
    memory_available_mb: float
    disk_usage_percent: float
    disk_free_gb: float
    network_io_sent_mb: float
    network_io_recv_mb: float
    load_average_1m: float
    load_average_5m: float
    load_average_15m: float


@dataclass
class KylinMetrics:
    """Kylin服务指标数据类 — one point-in-time sample of the Kylin service."""
    timestamp: datetime
    service_status: str  # 'running', 'stopped', 'timeout' or 'error'
    response_time_ms: float
    active_queries: int
    completed_queries_1h: int
    failed_queries_1h: int
    build_jobs_running: int
    build_jobs_pending: int
    cube_count: int
    project_count: int
    error_message: Optional[str] = None


class ResourceMonitor:
    """Kylin资源监控器.

    Periodically samples system metrics (via psutil) and Kylin service
    metrics (via its REST health endpoint), persists them to SQLite,
    evaluates alert thresholds and dispatches email/webhook alerts.
    """

    def __init__(self, config_file: str = 'monitor_config.json'):
        self.config = self._load_config(config_file)
        self.setup_logging()

        # 监控状态
        self.monitoring = False
        self.monitor_thread = None

        # 数据存储: 24小时数据(每分钟一个点)
        self.metrics_history = deque(maxlen=1440)
        # Last time each alert type was sent; datetime.min means "never".
        self.alerts_sent = defaultdict(lambda: datetime.min)

        # 初始化数据库
        self.init_database()

        # 网络IO基线 — later samples are reported as deltas from this.
        self.network_baseline = self._get_network_baseline()

    def _load_config(self, config_file: str) -> Dict[str, Any]:
        """加载配置文件.

        Merges a user-supplied JSON file over the built-in defaults.
        Creates a default config file when none exists; on any error the
        defaults are used unchanged.
        """
        default_config = {
            'kylin': {
                'host': 'localhost',
                'port': 7070,
                'username': 'ADMIN',
                'password': 'KYLIN',
                'health_check_url': 'http://localhost:7070/kylin/api/admin/config'
            },
            'monitoring': {
                'interval_seconds': 60,
                'data_retention_days': 30,
                'alert_cooldown_minutes': 30
            },
            'thresholds': {
                'cpu_warning': 80,
                'cpu_critical': 90,
                'memory_warning': 85,
                'memory_critical': 95,
                'disk_warning': 80,
                'disk_critical': 90,
                'response_time_warning': 5000,
                'response_time_critical': 10000,
                'load_warning': 2.0,
                'load_critical': 4.0
            },
            'alerts': {
                'email': {
                    'enabled': False,
                    'smtp_server': 'smtp.example.com',
                    'smtp_port': 587,
                    'username': 'monitor@example.com',
                    'password': 'password',
                    'recipients': ['admin@example.com']
                },
                'webhook': {
                    'enabled': False,
                    'url': 'https://hooks.slack.com/services/xxx',
                    'timeout': 10
                }
            },
            'database': {
                'path': 'kylin_monitor.db'
            }
        }

        try:
            if os.path.exists(config_file):
                with open(config_file, 'r', encoding='utf-8') as f:
                    user_config = json.load(f)
                # 合并配置
                self._merge_config(default_config, user_config)
            else:
                # 创建默认配置文件
                with open(config_file, 'w', encoding='utf-8') as f:
                    json.dump(default_config, f, indent=2, ensure_ascii=False)
                print(f"Created default config file: {config_file}")
        except Exception as e:
            print(f"Error loading config: {e}, using default config")

        return default_config

    def _merge_config(self, base: Dict, update: Dict):
        """递归合并配置 — values from `update` override `base` in place."""
        for key, value in update.items():
            if key in base and isinstance(base[key], dict) and isinstance(value, dict):
                self._merge_config(base[key], value)
            else:
                base[key] = value

    def setup_logging(self):
        """设置日志 — log to both kylin_monitor.log and the console."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('kylin_monitor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def init_database(self):
        """初始化数据库 — create the three metric/alert tables if absent."""
        db_path = self.config['database']['path']
        try:
            conn = sqlite3.connect(db_path)
            cursor = conn.cursor()

            # 创建系统指标表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS system_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    cpu_percent REAL,
                    memory_percent REAL,
                    memory_available_mb REAL,
                    disk_usage_percent REAL,
                    disk_free_gb REAL,
                    network_io_sent_mb REAL,
                    network_io_recv_mb REAL,
                    load_average_1m REAL,
                    load_average_5m REAL,
                    load_average_15m REAL
                )
            ''')

            # 创建Kylin指标表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS kylin_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    service_status TEXT,
                    response_time_ms REAL,
                    active_queries INTEGER,
                    completed_queries_1h INTEGER,
                    failed_queries_1h INTEGER,
                    build_jobs_running INTEGER,
                    build_jobs_pending INTEGER,
                    cube_count INTEGER,
                    project_count INTEGER,
                    error_message TEXT
                )
            ''')

            # 创建告警记录表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS alert_history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    alert_type TEXT,
                    severity TEXT,
                    message TEXT,
                    metric_value REAL,
                    threshold_value REAL
                )
            ''')

            conn.commit()
            conn.close()
            self.logger.info(f"Database initialized: {db_path}")
        except Exception as e:
            self.logger.error(f"Failed to initialize database: {e}")

    def _get_network_baseline(self) -> Dict[str, float]:
        """获取网络IO基线 (cumulative MB since boot); zeros on failure."""
        try:
            net_io = psutil.net_io_counters()
            return {
                'bytes_sent': net_io.bytes_sent / (1024 * 1024),  # MB
                'bytes_recv': net_io.bytes_recv / (1024 * 1024)   # MB
            }
        except Exception:
            return {'bytes_sent': 0, 'bytes_recv': 0}

    def collect_system_metrics(self) -> Optional[SystemMetrics]:
        """收集系统指标. Returns None when sampling fails."""
        try:
            # CPU使用率 (1s blocking sample)
            cpu_percent = psutil.cpu_percent(interval=1)

            # 内存信息
            memory = psutil.virtual_memory()
            memory_percent = memory.percent
            memory_available_mb = memory.available / (1024 * 1024)

            # 磁盘信息 (root filesystem)
            disk = psutil.disk_usage('/')
            disk_usage_percent = (disk.used / disk.total) * 100
            disk_free_gb = disk.free / (1024 * 1024 * 1024)

            # 网络IO — delta against the baseline captured at startup
            net_io = psutil.net_io_counters()
            network_io_sent_mb = net_io.bytes_sent / (1024 * 1024) - self.network_baseline['bytes_sent']
            network_io_recv_mb = net_io.bytes_recv / (1024 * 1024) - self.network_baseline['bytes_recv']

            # 系统负载
            load_avg = psutil.getloadavg()

            return SystemMetrics(
                timestamp=datetime.now(),
                cpu_percent=cpu_percent,
                memory_percent=memory_percent,
                memory_available_mb=memory_available_mb,
                disk_usage_percent=disk_usage_percent,
                disk_free_gb=disk_free_gb,
                network_io_sent_mb=max(0, network_io_sent_mb),
                network_io_recv_mb=max(0, network_io_recv_mb),
                load_average_1m=load_avg[0],
                load_average_5m=load_avg[1],
                load_average_15m=load_avg[2]
            )
        except Exception as e:
            self.logger.error(f"Error collecting system metrics: {e}")
            return None

    def collect_kylin_metrics(self) -> KylinMetrics:
        """收集Kylin服务指标 via the health-check endpoint."""
        timestamp = datetime.now()

        try:
            # 检查服务状态和响应时间
            start_time = time.time()
            kylin_config = self.config['kylin']
            health_url = kylin_config['health_check_url']

            response = requests.get(
                health_url,
                auth=(kylin_config['username'], kylin_config['password']),
                timeout=10
            )
            response_time_ms = (time.time() - start_time) * 1000

            if response.status_code in [200, 401]:  # 401表示需要认证,但服务正常
                service_status = 'running'
                error_message = None
            else:
                service_status = 'error'
                error_message = f"HTTP {response.status_code}: {response.text[:100]}"

            # 获取其他指标(这里使用模拟数据,实际应调用Kylin API)
            active_queries = self._get_active_queries_count()
            completed_queries_1h = self._get_completed_queries_count()
            failed_queries_1h = self._get_failed_queries_count()
            build_jobs_running = self._get_running_build_jobs_count()
            build_jobs_pending = self._get_pending_build_jobs_count()
            cube_count = self._get_cube_count()
            project_count = self._get_project_count()

            return KylinMetrics(
                timestamp=timestamp,
                service_status=service_status,
                response_time_ms=response_time_ms,
                active_queries=active_queries,
                completed_queries_1h=completed_queries_1h,
                failed_queries_1h=failed_queries_1h,
                build_jobs_running=build_jobs_running,
                build_jobs_pending=build_jobs_pending,
                cube_count=cube_count,
                project_count=project_count,
                error_message=error_message
            )
        except requests.exceptions.Timeout:
            return KylinMetrics(
                timestamp=timestamp,
                service_status='timeout',
                response_time_ms=10000,  # 超时
                active_queries=0,
                completed_queries_1h=0,
                failed_queries_1h=0,
                build_jobs_running=0,
                build_jobs_pending=0,
                cube_count=0,
                project_count=0,
                error_message='Service timeout'
            )
        except Exception as e:
            return KylinMetrics(
                timestamp=timestamp,
                service_status='stopped',
                response_time_ms=0,
                active_queries=0,
                completed_queries_1h=0,
                failed_queries_1h=0,
                build_jobs_running=0,
                build_jobs_pending=0,
                cube_count=0,
                project_count=0,
                error_message=str(e)
            )

    # --- 以下占位方法返回模拟数据, 实际实现应调用Kylin API或查询日志 ---

    def _get_active_queries_count(self) -> int:
        """获取活跃查询数量(模拟)."""
        return 0

    def _get_completed_queries_count(self) -> int:
        """获取最近1小时完成的查询数量(模拟)."""
        return 0

    def _get_failed_queries_count(self) -> int:
        """获取最近1小时失败的查询数量(模拟)."""
        return 0

    def _get_running_build_jobs_count(self) -> int:
        """获取运行中的构建作业数量(模拟)."""
        return 0

    def _get_pending_build_jobs_count(self) -> int:
        """获取等待中的构建作业数量(模拟)."""
        return 0

    def _get_cube_count(self) -> int:
        """获取Cube数量(模拟)."""
        return 0

    def _get_project_count(self) -> int:
        """获取项目数量(模拟)."""
        return 0

    def check_alerts(self, system_metrics: Optional[SystemMetrics],
                     kylin_metrics: Optional[KylinMetrics]):
        """检查告警条件 and dispatch any triggered alerts."""
        alerts = []
        thresholds = self.config['thresholds']

        # 系统资源告警
        if system_metrics:
            # CPU告警
            if system_metrics.cpu_percent >= thresholds['cpu_critical']:
                alerts.append({
                    'type': 'CPU_CRITICAL', 'severity': 'CRITICAL',
                    'message': f'CPU usage critical: {system_metrics.cpu_percent:.1f}%',
                    'value': system_metrics.cpu_percent,
                    'threshold': thresholds['cpu_critical']
                })
            elif system_metrics.cpu_percent >= thresholds['cpu_warning']:
                alerts.append({
                    'type': 'CPU_WARNING', 'severity': 'WARNING',
                    'message': f'CPU usage high: {system_metrics.cpu_percent:.1f}%',
                    'value': system_metrics.cpu_percent,
                    'threshold': thresholds['cpu_warning']
                })

            # 内存告警
            if system_metrics.memory_percent >= thresholds['memory_critical']:
                alerts.append({
                    'type': 'MEMORY_CRITICAL', 'severity': 'CRITICAL',
                    'message': f'Memory usage critical: {system_metrics.memory_percent:.1f}%',
                    'value': system_metrics.memory_percent,
                    'threshold': thresholds['memory_critical']
                })
            elif system_metrics.memory_percent >= thresholds['memory_warning']:
                alerts.append({
                    'type': 'MEMORY_WARNING', 'severity': 'WARNING',
                    'message': f'Memory usage high: {system_metrics.memory_percent:.1f}%',
                    'value': system_metrics.memory_percent,
                    'threshold': thresholds['memory_warning']
                })

            # 磁盘告警
            if system_metrics.disk_usage_percent >= thresholds['disk_critical']:
                alerts.append({
                    'type': 'DISK_CRITICAL', 'severity': 'CRITICAL',
                    'message': f'Disk usage critical: {system_metrics.disk_usage_percent:.1f}%',
                    'value': system_metrics.disk_usage_percent,
                    'threshold': thresholds['disk_critical']
                })
            elif system_metrics.disk_usage_percent >= thresholds['disk_warning']:
                alerts.append({
                    'type': 'DISK_WARNING', 'severity': 'WARNING',
                    'message': f'Disk usage high: {system_metrics.disk_usage_percent:.1f}%',
                    'value': system_metrics.disk_usage_percent,
                    'threshold': thresholds['disk_warning']
                })

            # 系统负载告警 (1-minute load average)
            if system_metrics.load_average_1m >= thresholds['load_critical']:
                alerts.append({
                    'type': 'LOAD_CRITICAL', 'severity': 'CRITICAL',
                    'message': f'System load critical: {system_metrics.load_average_1m:.2f}',
                    'value': system_metrics.load_average_1m,
                    'threshold': thresholds['load_critical']
                })
            elif system_metrics.load_average_1m >= thresholds['load_warning']:
                alerts.append({
                    'type': 'LOAD_WARNING', 'severity': 'WARNING',
                    'message': f'System load high: {system_metrics.load_average_1m:.2f}',
                    'value': system_metrics.load_average_1m,
                    'threshold': thresholds['load_warning']
                })

        # Kylin服务告警
        if kylin_metrics:
            # 服务状态告警
            if kylin_metrics.service_status != 'running':
                alerts.append({
                    'type': 'SERVICE_DOWN', 'severity': 'CRITICAL',
                    'message': f'Kylin service is {kylin_metrics.service_status}: '
                               f'{kylin_metrics.error_message or "Unknown error"}',
                    'value': 0,
                    'threshold': 1
                })

            # 响应时间告警
            if kylin_metrics.response_time_ms >= thresholds['response_time_critical']:
                alerts.append({
                    'type': 'RESPONSE_TIME_CRITICAL', 'severity': 'CRITICAL',
                    'message': f'Kylin response time critical: {kylin_metrics.response_time_ms:.0f}ms',
                    'value': kylin_metrics.response_time_ms,
                    'threshold': thresholds['response_time_critical']
                })
            elif kylin_metrics.response_time_ms >= thresholds['response_time_warning']:
                alerts.append({
                    'type': 'RESPONSE_TIME_WARNING', 'severity': 'WARNING',
                    'message': f'Kylin response time high: {kylin_metrics.response_time_ms:.0f}ms',
                    'value': kylin_metrics.response_time_ms,
                    'threshold': thresholds['response_time_warning']
                })

        # 发送告警
        for alert in alerts:
            self.send_alert(alert)

    def send_alert(self, alert: Dict[str, Any]):
        """发送告警 — honours a per-alert-type cooldown window."""
        alert_type = alert['type']
        cooldown_minutes = self.config['monitoring']['alert_cooldown_minutes']

        # 检查告警冷却时间
        last_sent = self.alerts_sent[alert_type]
        if datetime.now() - last_sent < timedelta(minutes=cooldown_minutes):
            return

        # 记录告警
        self.logger.warning(f"ALERT: {alert['message']}")

        # 保存到数据库
        self.save_alert_to_db(alert)

        # 发送邮件告警
        if self.config['alerts']['email']['enabled']:
            self.send_email_alert(alert)

        # 发送Webhook告警
        if self.config['alerts']['webhook']['enabled']:
            self.send_webhook_alert(alert)

        # 更新告警发送时间
        self.alerts_sent[alert_type] = datetime.now()

    def save_alert_to_db(self, alert: Dict[str, Any]):
        """保存告警到数据库."""
        try:
            conn = sqlite3.connect(self.config['database']['path'])
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO alert_history
                (timestamp, alert_type, severity, message, metric_value, threshold_value)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                datetime.now().isoformat(),
                alert['type'],
                alert['severity'],
                alert['message'],
                alert['value'],
                alert['threshold']
            ))
            conn.commit()
            conn.close()
        except Exception as e:
            self.logger.error(f"Failed to save alert to database: {e}")

    def send_email_alert(self, alert: Dict[str, Any]):
        """发送邮件告警 via SMTP with STARTTLS."""
        try:
            email_config = self.config['alerts']['email']

            msg = MIMEMultipart()
            msg['From'] = email_config['username']
            msg['To'] = ', '.join(email_config['recipients'])
            msg['Subject'] = f"Kylin Monitor Alert - {alert['severity']}: {alert['type']}"

            body = f"""
Alert Details:

Type: {alert['type']}
Severity: {alert['severity']}
Message: {alert['message']}
Current Value: {alert['value']}
Threshold: {alert['threshold']}
Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Please check the Kylin system immediately.
"""
            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port'])
            server.starttls()
            server.login(email_config['username'], email_config['password'])
            text = msg.as_string()
            server.sendmail(email_config['username'], email_config['recipients'], text)
            server.quit()

            self.logger.info(f"Email alert sent for {alert['type']}")
        except Exception as e:
            self.logger.error(f"Failed to send email alert: {e}")

    def send_webhook_alert(self, alert: Dict[str, Any]):
        """发送Webhook告警 (Slack-style attachment payload)."""
        try:
            webhook_config = self.config['alerts']['webhook']

            payload = {
                'text': f"Kylin Monitor Alert - {alert['severity']}",
                'attachments': [{
                    'color': 'danger' if alert['severity'] == 'CRITICAL' else 'warning',
                    'fields': [
                        {'title': 'Type', 'value': alert['type'], 'short': True},
                        {'title': 'Severity', 'value': alert['severity'], 'short': True},
                        {'title': 'Message', 'value': alert['message'], 'short': False},
                        {'title': 'Value', 'value': str(alert['value']), 'short': True},
                        {'title': 'Threshold', 'value': str(alert['threshold']), 'short': True}
                    ],
                    'timestamp': datetime.now().isoformat()
                }]
            }

            response = requests.post(
                webhook_config['url'],
                json=payload,
                timeout=webhook_config['timeout']
            )

            if response.status_code == 200:
                self.logger.info(f"Webhook alert sent for {alert['type']}")
            else:
                self.logger.error(f"Webhook alert failed: {response.status_code}")
        except Exception as e:
            self.logger.error(f"Failed to send webhook alert: {e}")

    def save_metrics_to_db(self, system_metrics: Optional[SystemMetrics],
                           kylin_metrics: Optional[KylinMetrics]):
        """保存指标到数据库 (either argument may be None)."""
        try:
            conn = sqlite3.connect(self.config['database']['path'])
            cursor = conn.cursor()

            # 保存系统指标
            if system_metrics:
                cursor.execute('''
                    INSERT INTO system_metrics
                    (timestamp, cpu_percent, memory_percent, memory_available_mb,
                     disk_usage_percent, disk_free_gb, network_io_sent_mb,
                     network_io_recv_mb, load_average_1m, load_average_5m, load_average_15m)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    system_metrics.timestamp.isoformat(),
                    system_metrics.cpu_percent,
                    system_metrics.memory_percent,
                    system_metrics.memory_available_mb,
                    system_metrics.disk_usage_percent,
                    system_metrics.disk_free_gb,
                    system_metrics.network_io_sent_mb,
                    system_metrics.network_io_recv_mb,
                    system_metrics.load_average_1m,
                    system_metrics.load_average_5m,
                    system_metrics.load_average_15m
                ))

            # 保存Kylin指标
            if kylin_metrics:
                cursor.execute('''
                    INSERT INTO kylin_metrics
                    (timestamp, service_status, response_time_ms, active_queries,
                     completed_queries_1h, failed_queries_1h, build_jobs_running,
                     build_jobs_pending, cube_count, project_count, error_message)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    kylin_metrics.timestamp.isoformat(),
                    kylin_metrics.service_status,
                    kylin_metrics.response_time_ms,
                    kylin_metrics.active_queries,
                    kylin_metrics.completed_queries_1h,
                    kylin_metrics.failed_queries_1h,
                    kylin_metrics.build_jobs_running,
                    kylin_metrics.build_jobs_pending,
                    kylin_metrics.cube_count,
                    kylin_metrics.project_count,
                    kylin_metrics.error_message
                ))

            conn.commit()
            conn.close()
        except Exception as e:
            self.logger.error(f"Failed to save metrics to database: {e}")

    def cleanup_old_data(self):
        """清理过期数据 older than the configured retention window."""
        try:
            retention_days = self.config['monitoring']['data_retention_days']
            cutoff_date = datetime.now() - timedelta(days=retention_days)

            conn = sqlite3.connect(self.config['database']['path'])
            cursor = conn.cursor()

            # 清理系统指标
            cursor.execute('DELETE FROM system_metrics WHERE timestamp < ?',
                           (cutoff_date.isoformat(),))
            system_deleted = cursor.rowcount

            # 清理Kylin指标
            cursor.execute('DELETE FROM kylin_metrics WHERE timestamp < ?',
                           (cutoff_date.isoformat(),))
            kylin_deleted = cursor.rowcount

            # 清理告警历史
            cursor.execute('DELETE FROM alert_history WHERE timestamp < ?',
                           (cutoff_date.isoformat(),))
            alert_deleted = cursor.rowcount

            conn.commit()
            conn.close()

            if system_deleted > 0 or kylin_deleted > 0 or alert_deleted > 0:
                self.logger.info(
                    f"Cleaned up old data: {system_deleted} system metrics, "
                    f"{kylin_deleted} kylin metrics, {alert_deleted} alerts")
        except Exception as e:
            self.logger.error(f"Failed to cleanup old data: {e}")

    def generate_summary_report(self, hours: int = 24) -> Dict[str, Any]:
        """生成摘要报告 covering the last `hours` hours; {} on failure."""
        try:
            cutoff_time = datetime.now() - timedelta(hours=hours)

            conn = sqlite3.connect(self.config['database']['path'])
            cursor = conn.cursor()

            # 系统指标摘要
            cursor.execute('''
                SELECT AVG(cpu_percent) as avg_cpu, MAX(cpu_percent) as max_cpu,
                       AVG(memory_percent) as avg_memory, MAX(memory_percent) as max_memory,
                       AVG(disk_usage_percent) as avg_disk, MAX(disk_usage_percent) as max_disk,
                       AVG(load_average_1m) as avg_load
                FROM system_metrics WHERE timestamp > ?
            ''', (cutoff_time.isoformat(),))
            system_summary = cursor.fetchone()

            # Kylin指标摘要
            cursor.execute('''
                SELECT AVG(response_time_ms) as avg_response_time,
                       MAX(response_time_ms) as max_response_time,
                       SUM(CASE WHEN service_status = 'running' THEN 1 ELSE 0 END) as uptime_count,
                       COUNT(*) as total_count
                FROM kylin_metrics WHERE timestamp > ?
            ''', (cutoff_time.isoformat(),))
            kylin_summary = cursor.fetchone()

            # 告警摘要
            cursor.execute('''
                SELECT alert_type, severity, COUNT(*) as count
                FROM alert_history WHERE timestamp > ?
                GROUP BY alert_type, severity ORDER BY count DESC
            ''', (cutoff_time.isoformat(),))
            alert_summary = cursor.fetchall()

            conn.close()

            # 计算可用性
            uptime_percentage = 0
            if kylin_summary and kylin_summary[3] > 0:
                uptime_percentage = (kylin_summary[2] / kylin_summary[3]) * 100

            return {
                'period_hours': hours,
                'generated_at': datetime.now().isoformat(),
                'system_metrics': {
                    'avg_cpu_percent': round(system_summary[0] or 0, 2),
                    'max_cpu_percent': round(system_summary[1] or 0, 2),
                    'avg_memory_percent': round(system_summary[2] or 0, 2),
                    'max_memory_percent': round(system_summary[3] or 0, 2),
                    'avg_disk_percent': round(system_summary[4] or 0, 2),
                    'max_disk_percent': round(system_summary[5] or 0, 2),
                    'avg_load': round(system_summary[6] or 0, 2)
                },
                'kylin_metrics': {
                    'avg_response_time_ms': round(kylin_summary[0] or 0, 2),
                    'max_response_time_ms': round(kylin_summary[1] or 0, 2),
                    'uptime_percentage': round(uptime_percentage, 2)
                },
                'alerts': [
                    {'type': alert[0], 'severity': alert[1], 'count': alert[2]}
                    for alert in alert_summary
                ]
            }
        except Exception as e:
            self.logger.error(f"Failed to generate summary report: {e}")
            return {}

    def start_monitoring(self):
        """启动监控 in a daemon thread; no-op if already running."""
        if self.monitoring:
            self.logger.warning("Monitoring is already running")
            return

        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        self.logger.info("Resource monitoring started")

    def stop_monitoring(self):
        """停止监控 and join the worker thread (10s timeout)."""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=10)
        self.logger.info("Resource monitoring stopped")

    def _monitor_loop(self):
        """监控循环: sample, alert, persist, and periodically purge."""
        interval = self.config['monitoring']['interval_seconds']
        cleanup_counter = 0

        while self.monitoring:
            try:
                # 收集指标
                system_metrics = self.collect_system_metrics()
                kylin_metrics = self.collect_kylin_metrics()

                # 检查告警
                self.check_alerts(system_metrics, kylin_metrics)

                # 保存到数据库
                self.save_metrics_to_db(system_metrics, kylin_metrics)

                # 保存到内存(用于实时查询)
                self.metrics_history.append({
                    'timestamp': datetime.now(),
                    'system': asdict(system_metrics) if system_metrics else None,
                    'kylin': asdict(kylin_metrics) if kylin_metrics else None
                })

                # 定期清理过期数据(每小时一次)
                cleanup_counter += 1
                if cleanup_counter >= (3600 // interval):
                    self.cleanup_old_data()
                    cleanup_counter = 0

                time.sleep(interval)
            except Exception as e:
                self.logger.error(f"Error in monitoring loop: {e}")
                time.sleep(interval)

# 使用示例

if name == “main”: # 创建监控器 monitor = ResourceMonitor() try: # 启动监控 monitor.start_monitoring() # 运行一段时间 print(“Monitoring started. Press Ctrl+C to stop…”) while True: time.sleep(60) # 生成摘要报告 report = monitor.generate_summary_report(1) # 最近1小时 print(f”\nSummary Report (Last 1 hour):“) print(json.dumps(report, indent=2)) except KeyboardInterrupt: print(”\nStopping monitor…“) monitor.stop_monitoring() print(“Monitor stopped.”) “`

11.3 故障诊断方法

11.3.1 系统诊断脚本

```python

# system_diagnostics.py - 系统诊断脚本

import os import sys import subprocess import json import time import socket import requests from datetime import datetime from typing import Dict, List, Any, Optional, Tuple from dataclasses import dataclass, asdict import logging @dataclass class DiagnosticResult: “”“诊断结果数据类”“” category: str test_name: str status: str # ‘PASS’, ‘FAIL’, ‘WARNING’, ‘SKIP’ message: str details: Optional[Dict[str, Any]] = None timestamp: Optional[datetime] = None def post_init(self): if self.timestamp is None: self.timestamp = datetime.now() class SystemDiagnostics: “”“系统诊断工具”“” def init(self, kylin_home: str = ‘/opt/kylin’, kylin_host: str = ‘localhost’, kylin_port: int = 7070): self.kylin_home = kylin_home self.kylin_host = kylin_host self.kylin_port = kylin_port self.results: List[DiagnosticResult] = [] # 设置日志 logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(name) def run_command(self, command: str, timeout: int = 30) -> Tuple[int, str, str]: “”“执行系统命令”“” try: process = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) stdout, stderr = process.communicate(timeout=timeout) return process.returncode, stdout.strip(), stderr.strip() except subprocess.TimeoutExpired: process.kill() return -1, “”, f”Command timeout after {timeout} seconds” except Exception as e: return -1, “”, str(e) def add_result(self, category: str, test_name: str, status: str, message: str, details: Optional[Dict] = None): “”“添加诊断结果”“” result = DiagnosticResult( category=category, test_name=test_name, status=status, message=message, details=details ) self.results.append(result) # 记录日志 log_level = { ‘PASS’: logging.INFO, ‘WARNING’: logging.WARNING, ‘FAIL’: logging.ERROR, ‘SKIP’: logging.INFO }.get(status, logging.INFO) self.logger.log(log_level, f”[{category}] {test_name}: {status} - {message}“) def check_java_environment(self): “”“检查Java环境”“” category = “Java Environment” # 检查Java是否安装 returncode, stdout, stderr = self.run_command(“java 
-version”) if returncode == 0: java_version = stderr.split(‘\n’)[0] if stderr else stdout.split(‘\n’)[0] self.add_result(category, “Java Installation”, “PASS”, f”Java is installed: {java_version}“) # 检查Java版本 if “1.8” in java_version or “11” in java_version or “17” in java_version: self.add_result(category, “Java Version”, “PASS”, f”Java version is compatible: {java_version}“) else: self.add_result(category, “Java Version”, “WARNING”, f”Java version may not be optimal: {java_version}“) else: self.add_result(category, “Java Installation”, “FAIL”, “Java is not installed or not in PATH”) # 检查JAVA_HOME java_home = os.environ.get(‘JAVA_HOME’) if java_home: if os.path.exists(java_home): self.add_result(category, “JAVA_HOME”, “PASS”, f”JAVA_HOME is set and valid: {java_home}“) else: self.add_result(category, “JAVA_HOME”, “FAIL”, f”JAVA_HOME is set but directory does not exist: {java_home}“) else: self.add_result(category, “JAVA_HOME”, “WARNING”, “JAVA_HOME environment variable is not set”) def check_hadoop_environment(self): “”“检查Hadoop环境”“” category = “Hadoop Environment” # 检查Hadoop命令 returncode, stdout, stderr = self.run_command(“hadoop version”) if returncode == 0: hadoop_version = stdout.split(‘\n’)[0] if stdout else “Unknown” self.add_result(category, “Hadoop Installation”, “PASS”, f”Hadoop is installed: {hadoop_version}“) else: self.add_result(category, “Hadoop Installation”, “FAIL”, “Hadoop is not installed or not in PATH”) # 检查HADOOP_HOME hadoop_home = os.environ.get(‘HADOOP_HOME’) if hadoop_home: if os.path.exists(hadoop_home): self.add_result(category, “HADOOP_HOME”, “PASS”, f”HADOOP_HOME is set and valid: {hadoop_home}“) else: self.add_result(category, “HADOOP_HOME”, “FAIL”, f”HADOOP_HOME is set but directory does not exist: {hadoop_home}“) else: self.add_result(category, “HADOOP_HOME”, “WARNING”, “HADOOP_HOME environment variable is not set”) # 检查HDFS连接 returncode, stdout, stderr = self.run_command(“hdfs dfs -ls /”, timeout=10) if returncode == 0: 
self.add_result(category, “HDFS Connectivity”, “PASS”, “HDFS is accessible”) else: self.add_result(category, “HDFS Connectivity”, “FAIL”, f”Cannot access HDFS: {stderr}“) def check_hbase_environment(self): “”“检查HBase环境”“” category = “HBase Environment” # 检查HBase命令 returncode, stdout, stderr = self.run_command(“hbase version”) if returncode == 0: hbase_version = stdout.split(‘\n’)[0] if stdout else “Unknown” self.add_result(category, “HBase Installation”, “PASS”, f”HBase is installed: {hbase_version}“) else: self.add_result(category, “HBase Installation”, “FAIL”, “HBase is not installed or not in PATH”) # 检查HBASE_HOME hbase_home = os.environ.get(‘HBASE_HOME’) if hbase_home: if os.path.exists(hbase_home): self.add_result(category, “HBASE_HOME”, “PASS”, f”HBASE_HOME is set and valid: {hbase_home}“) else: self.add_result(category, “HBASE_HOME”, “FAIL”, f”HBASE_HOME is set but directory does not exist: {hbase_home}“) else: self.add_result(category, “HBASE_HOME”, “WARNING”, “HBASE_HOME environment variable is not set”) # 检查HBase连接 returncode, stdout, stderr = self.run_command(“echo ‘list’ | hbase shell -n”, timeout=15) if returncode == 0: self.add_result(category, “HBase Connectivity”, “PASS”, “HBase is accessible”) else: self.add_result(category, “HBase Connectivity”, “FAIL”, f”Cannot access HBase: {stderr}“) def check_kylin_installation(self): “”“检查Kylin安装”“” category = “Kylin Installation” # 检查Kylin目录 if os.path.exists(self.kylin_home): self.add_result(category, “Kylin Directory”, “PASS”, f”Kylin home directory exists: {self.kylin_home}“) # 检查关键文件 key_files = [ ‘bin/kylin.sh’, ‘conf/kylin.properties’, ‘lib’, ‘tomcat’ ] for file_path in key_files: full_path = os.path.join(self.kylin_home, file_path) if os.path.exists(full_path): self.add_result(category, f”Kylin {file_path}“, “PASS”, f”Required file/directory exists: {file_path}“) else: self.add_result(category, f”Kylin {file_path}“, “FAIL”, f”Required file/directory missing: {file_path}“) else: 
self.add_result(category, “Kylin Directory”, “FAIL”, f”Kylin home directory does not exist: {self.kylin_home}“) # 检查Kylin配置 config_file = os.path.join(self.kylin_home, ‘conf’, ‘kylin.properties’) if os.path.exists(config_file): try: with open(config_file, ‘r’) as f: config_content = f.read() # 检查关键配置项 key_configs = [ ‘kylin.metadata.url’, ‘kylin.storage.url’, ‘kylin.job.scheduler.provider.100’ ] for config_key in key_configs: if config_key in config_content: self.add_result(category, f”Config {config_key}“, “PASS”, f”Configuration key found: {config_key}“) else: self.add_result(category, f”Config {config_key}“, “WARNING”, f”Configuration key not found: {config_key}“) except Exception as e: self.add_result(category, “Kylin Configuration”, “FAIL”, f”Cannot read configuration file: {e}“) def check_system_resources(self): “”“检查系统资源”“” category = “System Resources” try: import psutil # 检查CPU cpu_count = psutil.cpu_count() cpu_percent = psutil.cpu_percent(interval=1) self.add_result(category, “CPU Count”, “PASS”, f”CPU cores: {cpu_count}“) if cpu_percent < 80: self.add_result(category, “CPU Usage”, “PASS”, f”CPU usage: {cpu_percent:.1f}%“) elif cpu_percent < 90: self.add_result(category, “CPU Usage”, “WARNING”, f”High CPU usage: {cpu_percent:.1f}%“) else: self.add_result(category, “CPU Usage”, “FAIL”, f”Critical CPU usage: {cpu_percent:.1f}%“) # 检查内存 memory = psutil.virtual_memory() memory_gb = memory.total / (10243) memory_percent = memory.percent if memory_gb >= 8: self.add_result(category, “Memory Size”, “PASS”, f”Total memory: {memory_gb:.1f}GB”) elif memory_gb >= 4: self.add_result(category, “Memory Size”, “WARNING”, f”Low memory: {memory_gb:.1f}GB (recommend 8GB+)“) else: self.add_result(category, “Memory Size”, “FAIL”, f”Insufficient memory: {memory_gb:.1f}GB (minimum 4GB)“) if memory_percent < 80: self.add_result(category, “Memory Usage”, “PASS”, f”Memory usage: {memory_percent:.1f}%“) elif memory_percent < 90: self.add_result(category, “Memory Usage”, “WARNING”, 
f”High memory usage: {memory_percent:.1f}%“) else: self.add_result(category, “Memory Usage”, “FAIL”, f”Critical memory usage: {memory_percent:.1f}%“) # 检查磁盘空间 disk = psutil.disk_usage(self.kylin_home if os.path.exists(self.kylin_home) else ‘/’) disk_free_gb = disk.free / (10243) disk_percent = (disk.used / disk.total) * 100 if disk_free_gb >= 10: self.add_result(category, “Disk Space”, “PASS”, f”Free disk space: {disk_free_gb:.1f}GB”) elif disk_free_gb >= 5: self.add_result(category, “Disk Space”, “WARNING”, f”Low disk space: {disk_free_gb:.1f}GB”) else: self.add_result(category, “Disk Space”, “FAIL”, f”Critical disk space: {disk_free_gb:.1f}GB”) except ImportError: self.add_result(category, “Resource Check”, “SKIP”, “psutil not available, skipping resource checks”) except Exception as e: self.add_result(category, “Resource Check”, “FAIL”, f”Error checking system resources: {e}“) def check_network_connectivity(self): “”“检查网络连通性”“” category = “Network Connectivity” # 检查本地端口 ports_to_check = [ (self.kylin_host, self.kylin_port, “Kylin Web”), (self.kylin_host, 2181, “Zookeeper”), (self.kylin_host, 9000, “Hadoop NameNode”), (self.kylin_host, 16010, “HBase Master”) ] for host, port, service in ports_to_check: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) result = sock.connect_ex((host, port)) sock.close() if result == 0: self.add_result(category, f”{service} Port”, “PASS”, f”{service} port {port} is accessible”) else: self.add_result(category, f”{service} Port”, “FAIL”, f”{service} port {port} is not accessible”) except Exception as e: self.add_result(category, f”{service} Port”, “FAIL”, f”Error checking {service} port {port}: {e}“) def check_kylin_service_status(self): “”“检查Kylin服务状态”“” category = “Kylin Service” # 检查进程 returncode, stdout, stderr = self.run_command(“ps aux | grep kylin | grep -v grep”) if returncode == 0 and stdout: self.add_result(category, “Kylin Process”, “PASS”, “Kylin process is running”) else: 
self.add_result(category, “Kylin Process”, “FAIL”, “Kylin process is not running”) # 检查PID文件 pid_file = os.path.join(self.kylin_home, ‘pid’) if os.path.exists(pid_file): try: with open(pid_file, ‘r’) as f: pid = f.read().strip() # 检查进程是否存在 returncode, stdout, stderr = self.run_command(f”ps -p {pid}“) if returncode == 0: self.add_result(category, “PID File”, “PASS”, f”PID file exists and process is running: {pid}“) else: self.add_result(category, “PID File”, “WARNING”, f”PID file exists but process not running: {pid}“) except Exception as e: self.add_result(category, “PID File”, “FAIL”, f”Error reading PID file: {e}“) else: self.add_result(category, “PID File”, “WARNING”, “PID file does not exist”) # 检查Web服务 try: url = f”http://{self.kylin_host}:{self.kylin_port}/kylin/api/admin/config” response = requests.get(url, timeout=10) if response.status_code in [200, 401]: # 401表示需要认证,但服务正常 self.add_result(category, “Web Service”, “PASS”, f”Kylin web service is responding (HTTP {response.status_code})“) else: self.add_result(category, “Web Service”, “FAIL”, f”Kylin web service error (HTTP {response.status_code})“) except requests.exceptions.Timeout: self.add_result(category, “Web Service”, “FAIL”, “Kylin web service timeout”) except requests.exceptions.ConnectionError: self.add_result(category, “Web Service”, “FAIL”, “Cannot connect to Kylin web service”) except Exception as e: self.add_result(category, “Web Service”, “FAIL”, f”Error checking Kylin web service: {e}“) def run_all_diagnostics(self) -> List[DiagnosticResult]: “”“运行所有诊断检查”“” self.logger.info(“Starting system diagnostics…”) # 清空之前的结果 self.results = [] # 运行所有检查 self.check_java_environment() self.check_hadoop_environment() self.check_hbase_environment() self.check_kylin_installation() self.check_system_resources() self.check_network_connectivity() self.check_kylin_service_status() self.logger.info(f”Diagnostics completed. 
Total checks: {len(self.results)}“) return self.results def generate_report(self, output_file: str = None) -> str: “”“生成诊断报告”“” if outputfile is None: timestamp = datetime.now().strftime(‘%Y%m%d%H%M%S’) output_file = f”kylindiagnostics{timestamp}.json” # 统计结果 stats = { ‘total’: len(self.results), ‘pass’: len([r for r in self.results if r.status == ‘PASS’]), ‘fail’: len([r for r in self.results if r.status == ‘FAIL’]), ‘warning’: len([r for r in self.results if r.status == ‘WARNING’]), ‘skip’: len([r for r in self.results if r.status == ‘SKIP’]) } # 生成报告 report = { ‘generated_at’: datetime.now().isoformat(), ‘kylin_home’: self.kylin_home, ‘kylin_host’: self.kylin_host, ‘kylin_port’: self.kylin_port, ‘statistics’: stats, ‘results’: [asdict(result) for result in self.results], ‘summary’: { ‘overall_status’: ‘HEALTHY’ if stats[‘fail’] == 0 else ‘ISSUES_FOUND’, ‘critical_issues’: [r for r in self.results if r.status == ‘FAIL’], ‘warnings’: [r for r in self.results if r.status == ‘WARNING’] } } # 保存报告 with open(output_file, ‘w’, encoding=‘utf-8’) as f: json.dump(report, f, indent=2, ensure_ascii=False, default=str) self.logger.info(f”Diagnostic report saved to: {output_file}“) return output_file def print_summary(self): “”“打印诊断摘要”“” print(“\n” + “=”*60) print(“KYLIN SYSTEM DIAGNOSTICS SUMMARY”) print(“=”*60) # 按类别分组 categories = {} for result in self.results: if result.category not in categories: categories[result.category] = [] categories[result.category].append(result) # 打印每个类别的结果 for category, results in categories.items(): print(f”\n[{category}]“) for result in results: status_symbol = { ‘PASS’: ‘✓’, ‘FAIL’: ‘✗’, ‘WARNING’: ‘⚠’, ‘SKIP’: ‘-’ }.get(result.status, ‘?’) print(f” {status_symbol} {result.test_name}: {result.message}“) # 打印统计信息 stats = { ‘PASS’: len([r for r in self.results if r.status == ‘PASS’]), ‘FAIL’: len([r for r in self.results if r.status == ‘FAIL’]), ‘WARNING’: len([r for r in self.results if r.status == ‘WARNING’]), ‘SKIP’: len([r for r in 
self.results if r.status == ‘SKIP’]) } print(”\n” + “-”*60) print(“STATISTICS:”) print(f” Total checks: {len(self.results)}“) print(f” Passed: {stats[‘PASS’]}“) print(f” Failed: {stats[‘FAIL’]}“) print(f” Warnings: {stats[‘WARNING’]}“) print(f” Skipped: {stats[‘SKIP’]}“) if stats[‘FAIL’] == 0: print(”\n✓ Overall Status: HEALTHY”) else: print(“\n✗ Overall Status: ISSUES FOUND”) print(“\nCritical Issues:”) for result in self.results: if result.status == ‘FAIL’: print(f” - {result.category}: {result.test_name} - {result.message}“) print(”=“*60)

使用示例

if name == “main”: import argparse parser = argparse.ArgumentParser(description=‘Kylin System Diagnostics’) parser.add_argument(‘–kylin-home’, default=‘/opt/kylin’, help=‘Kylin home directory’) parser.add_argument(‘–kylin-host’, default=‘localhost’, help=‘Kylin host’) parser.add_argument(‘–kylin-port’, type=int, default=7070, help=‘Kylin port’) parser.add_argument(‘–output’, help=‘Output file for detailed report’) parser.add_argument(‘–quiet’, action=‘store_true’, help=‘Suppress console output’) args = parser.parse_args() # 创建诊断器 diagnostics = SystemDiagnostics( kylin_home=args.kylin_home, kylin_host=args.kylin_host, kylin_port=args.kylin_port ) # 运行诊断 results = diagnostics.run_all_diagnostics() # 生成报告 report_file = diagnostics.generate_report(args.output) # 打印摘要 if not args.quiet: diagnostics.print_summary() print(f”\nDetailed report saved to: {report_file}“) # 设置退出码 failed_checks = len([r for r in results if r.status == ‘FAIL’]) sys.exit(1 if failed_checks > 0 else 0) “`

11.3.2 网络故障排查

```bash
#!/bin/bash

network_troubleshoot.sh - 网络故障排查脚本

配置变量

KYLIN_HOST=“localhost” KYLIN_PORT=7070 ZOOKEEPER_PORT=2181 HADOOP_NAMENODE_PORT=9000 HBASE_MASTER_PORT=16010 HBASE_REGIONSERVER_PORT=16020

颜色输出

RED=‘\033[0;31m’ GREEN=‘\033[0;32m’ YELLOW=‘\033[1;33m’ BLUE=‘\033[0;34m’ NC=‘\033[0m’

日志函数

log_info() { echo -e “${GREEN}[INFO]${NC} $(date ‘+%Y-%m-%d %H:%M:%S’) - $1” } log_warn() { echo -e “${YELLOW}[WARN]${NC} $(date ‘+%Y-%m-%d %H:%M:%S’) - $1” } log_error() { echo -e “${RED}[ERROR]${NC} $(date ‘+%Y-%m-%d %H:%M:%S’) - $1” } log_debug() { echo -e “${BLUE}[DEBUG]${NC} $(date ‘+%Y-%m-%d %H:%M:%S’) - $1” }

检查命令是否存在

check_command() { if ! command -v “$1” >/dev/null 2>&1; then log_error “Command ‘$1’ not found” return 1 fi return 0 }

检查端口连通性

check_port_connectivity() { local host=“$1” local port=“$2” local service=“$3” local timeout=“${4:-5}” log_info “Checking $service connectivity ($host:$port)…” if check_command “nc”; then if nc -z -w”$timeout” “$host” “$port” 2>/dev/null; then log_info “✓ $service port $port is accessible” return 0 else log_error “✗ $service port $port is not accessible” return 1 fi elif check_command “telnet”; then # 使用telnet检查(需要expect或timeout命令) if timeout “$timeout” telnet “$host” “$port” /dev/null 2>&1; then log_info “✓ $service port $port is accessible” return 0 else log_error “✗ $service port $port is not accessible” return 1 fi else # 使用bash内置的/dev/tcp if timeout “$timeout” bash -c “/dev/null; then log_info “✓ $service port $port is accessible” return 0 else log_error “✗ $service port $port is not accessible” return 1 fi fi }

检查DNS解析

check_dns_resolution() { local hostname=“$1” log_info “Checking DNS resolution for $hostname…” if check_command “nslookup”; then local result=$(nslookup “$hostname” 2>/dev/null | grep -A1 “Name:” | tail -1 | awk ‘{print $2}’) if [ -n “$result” ]; then log_info “✓ DNS resolution successful: $hostname -> $result” return 0 else log_error “✗ DNS resolution failed for $hostname” return 1 fi elif check_command “dig”; then local result=$(dig +short “$hostname” 2>/dev/null | head -1) if [ -n “$result” ]; then log_info “✓ DNS resolution successful: $hostname -> $result” return 0 else log_error “✗ DNS resolution failed for $hostname” return 1 fi else log_warn “No DNS lookup tools available (nslookup, dig)” return 1 fi }

检查网络接口

check_network_interfaces() { log_info “Checking network interfaces…” if check_command “ip”; then echo “Network interfaces (ip addr):” ip addr show | grep -E “^[0-9]+:|inet ” | while read line; do echo “ $line” done elif check_command “ifconfig”; then echo “Network interfaces (ifconfig):” ifconfig | grep -E “^[a-zA-Z0-9]+:|inet ” | while read line; do echo “ $line” done else log_warn “No network interface tools available (ip, ifconfig)” fi echo }

检查路由表

check_routing_table() { log_info “Checking routing table…” if check_command “ip”; then echo “Routing table (ip route):” ip route show | while read line; do echo “ $line” done elif check_command “route”; then echo “Routing table (route):” route -n | while read line; do echo “ $line” done else log_warn “No routing tools available (ip, route)” fi echo }

检查防火墙状态

check_firewall_status() { log_info “Checking firewall status…” # 检查iptables if check_command “iptables”; then local iptables_rules=$(iptables -L -n 2>/dev/null | wc -l) if [ “$iptables_rules” -gt 10 ]; then log_warn “iptables has $iptables_rules rules configured” echo “Active iptables rules:” iptables -L -n | head -20 else log_info “iptables appears to be minimal/disabled” fi fi # 检查firewalld if check_command “firewall-cmd”; then if systemctl is-active firewalld >/dev/null 2>&1; then log_warn “firewalld is active” echo “Firewalld zones:” firewall-cmd –list-all-zones 2>/dev/null | head -20 else log_info “firewalld is not active” fi fi # 检查ufw if check_command “ufw”; then local ufw_status=$(ufw status 2>/dev/null | head -1) if echo “$ufw_status” | grep -q “active”; then log_warn “ufw firewall is active” ufw status numbered else log_info “ufw firewall is inactive” fi fi echo }

检查网络连接

check_network_connections() { log_info “Checking active network connections…” if check_command “netstat”; then echo “Listening ports:” netstat -tlnp 2>/dev/null | grep LISTEN | while read line; do echo “ $line” done echo echo “Established connections:” netstat -tnp 2>/dev/null | grep ESTABLISHED | head -10 | while read line; do echo “ $line” done elif check_command “ss”; then echo “Listening ports:” ss -tlnp | grep LISTEN | while read line; do echo “ $line” done echo echo “Established connections:” ss -tnp | grep ESTAB | head -10 | while read line; do echo “ $line” done else log_warn “No network connection tools available (netstat, ss)” fi echo }

检查网络延迟

check_network_latency() { local target=“$1” local count=“${2:-5}” log_info “Checking network latency to $target…” if check_command “ping”; then local ping_result=$(ping -c “$count” “$target” 2>/dev/null) if [ $? -eq 0 ]; then local avg_time=$(echo “$ping_result” | grep “avg” | awk -F’/’ ‘{print $5}’) if [ -n “$avg_time” ]; then log_info “✓ Average latency to $target: ${avg_time}ms” # 检查延迟是否过高 local avg_int=$(echo “$avg_time” | cut -d’.’ -f1) if [ “$avg_int” -gt 100 ]; then log_warn “High latency detected (>100ms)” elif [ “$avg_int” -gt 50 ]; then log_warn “Moderate latency detected (>50ms)” fi else log_info “✓ Ping to $target successful” fi else log_error “✗ Ping to $target failed” return 1 fi else log_warn “ping command not available” return 1 fi return 0 }

检查带宽

check_bandwidth() { local target=“$1” local port=“$2” log_info “Checking bandwidth to $target:$port…” if check_command “iperf3”; then log_info “Running iperf3 bandwidth test…” iperf3 -c “$target” -p “$port” -t 10 2>/dev/null elif check_command “wget”; then log_info “Testing download speed with wget…” local test_url=“http://$target:$port/” wget –spider “$test_url” 2>/dev/null if [ $? -eq 0 ]; then wget -O /dev/null “$test_url” 2>&1 | grep -E “[0-9]+[KM]B/s” else log_warn “Cannot test bandwidth - no suitable endpoint” fi else log_warn “No bandwidth testing tools available (iperf3, wget)” fi }

检查HTTP连接

check_http_connectivity() { local url=“$1” local timeout=“${2:-10}” log_info “Checking HTTP connectivity to $url…” if check_command “curl”; then local start_time=$(date +%s%N) local http_code=$(curl -s -w “%{http_code}” -o /dev/null –connect-timeout “$timeout” “$url” 2>/dev/null) local end_time=$(date +%s%N) local response_time=$(( (end_time - start_time) / 1000000 )) if [ “$http_code” = “200” ] || [ “$http_code” = “401” ]; then log_info “✓ HTTP connection successful (HTTP $http_code, ${response_time}ms)” return 0 else log_error “✗ HTTP connection failed (HTTP $http_code)” return 1 fi elif check_command “wget”; then if wget –spider –timeout=“$timeout” “$url” 2>/dev/null; then log_info “✓ HTTP connection successful” return 0 else log_error “✗ HTTP connection failed” return 1 fi else log_warn “No HTTP testing tools available (curl, wget)” return 1 fi }

生成网络诊断报告

generate_network_report() { local output_file=“${1:-networkdiagnostics$(date +%Y%m%d_%H%M%S).txt}” log_info “Generating network diagnostic report: $output_file” { echo “KYLIN NETWORK DIAGNOSTICS REPORT” echo “Generated: $(date)” echo “Host: $(hostname)” echo “User: $(whoami)” echo “=”*60 echo echo “1. NETWORK INTERFACES” echo “-”*30 check_network_interfaces echo “2. ROUTING TABLE” echo “-”*30 check_routing_table echo “3. DNS RESOLUTION” echo “-”*30 check_dns_resolution “$KYLIN_HOST” check_dns_resolution “google.com” echo echo “4. FIREWALL STATUS” echo “-”*30 check_firewall_status echo “5. NETWORK CONNECTIONS” echo “-”*30 check_network_connections echo “6. PORT CONNECTIVITY” echo “-”*30 check_port_connectivity “$KYLIN_HOST” “$KYLIN_PORT” “Kylin Web” check_port_connectivity “$KYLIN_HOST” “$ZOOKEEPER_PORT” “Zookeeper” check_port_connectivity “$KYLIN_HOST” “$HADOOP_NAMENODE_PORT” “Hadoop NameNode” check_port_connectivity “$KYLIN_HOST” “$HBASE_MASTER_PORT” “HBase Master” echo echo “7. NETWORK LATENCY” echo “-”*30 check_network_latency “$KYLIN_HOST” check_network_latency “8.8.8.8” echo echo “8. HTTP CONNECTIVITY” echo “-”*30 check_http_connectivity “http://$KYLIN_HOST:$KYLIN_PORT/kylin/” echo echo “9. SYSTEM INFORMATION” echo “-”*30 echo “OS: $(uname -a)” echo “Uptime: $(uptime)” if check_command “free”; then echo “Memory: $(free -h | grep Mem)” fi if check_command “df”; then echo “Disk: $(df -h / | tail -1)” fi echo echo “=”*60 echo “Report completed: $(date)” } > “$output_file” log_info “Network diagnostic report saved to: $output_file” return 0 }

主函数

main() { case “$1” in port) if [ -z “$2” ] || [ -z “$3” ]; then echo “Usage: $0 port [service_name]” exit 1 fi check_port_connectivity “$2” “$3” “${4:-Service}” ;; dns) if [ -z “$2” ]; then echo “Usage: $0 dns ” exit 1 fi check_dns_resolution “$2” ;; latency) if [ -z “$2” ]; then echo “Usage: $0 latency [count]” exit 1 fi check_network_latency “$2” “${3:-5}” ;; http) if [ -z “$2” ]; then echo “Usage: $0 http [timeout]” exit 1 fi check_http_connectivity “$2” “${3:-10}” ;; interfaces) check_network_interfaces ;; routes) check_routing_table ;; firewall) check_firewall_status ;; connections) check_network_connections ;; bandwidth) if [ -z “$2” ] || [ -z “$3” ]; then echo “Usage: $0 bandwidth ” exit 1 fi check_bandwidth “$2” “$3” ;; report) generate_network_report “$2” ;; all) log_info “Running comprehensive network diagnostics…” echo check_network_interfaces check_routing_table check_dns_resolution “$KYLIN_HOST” check_firewall_status check_network_connections echo “Port Connectivity Checks:” check_port_connectivity “$KYLIN_HOST” “$KYLIN_PORT” “Kylin Web” check_port_connectivity “$KYLIN_HOST” “$ZOOKEEPER_PORT” “Zookeeper” check_port_connectivity “$KYLIN_HOST” “$HADOOP_NAMENODE_PORT” “Hadoop NameNode” check_port_connectivity “$KYLIN_HOST” “$HBASE_MASTER_PORT” “HBase Master” echo echo “Network Latency Checks:” check_network_latency “$KYLIN_HOST” check_network_latency “8.8.8.8” echo echo “HTTP Connectivity Checks:” check_http_connectivity “http://$KYLIN_HOST:$KYLIN_PORT/kylin/” echo log_info “Comprehensive network diagnostics completed” ;; *) echo “Kylin Network Troubleshooting Tool” echo echo “Usage: $0 [options]” echo echo “Commands:” echo “ port [service] - Check port connectivity” echo “ dns - Check DNS resolution” echo “ latency [count] - Check network latency” echo “ http [timeout] - Check HTTP connectivity” echo “ interfaces - Show network interfaces” echo “ routes - Show routing table” echo “ firewall - Check firewall status” echo “ connections - Show network 
connections” echo “ bandwidth - Test bandwidth” echo “ report [output_file] - Generate diagnostic report” echo “ all - Run all diagnostics” echo echo “Examples:” echo “ $0 port localhost 7070 Kylin” echo “ $0 dns kylin.example.com” echo “ $0 latency 192.168.1.100” echo “ $0 http http://localhost:7070/kylin/” echo “ $0 report network_report.txt” echo “ $0 all” exit 1 ;; esac }

执行主函数

main “$@”