1. 性能优化策略

1.1 硬件优化

1.1.1 CPU优化

#!/bin/bash
# cpu_optimization.sh - CPU优化脚本

echo "=== GoAccess CPU优化配置 ==="

# 检查CPU核心数
CPU_CORES=$(nproc)
echo "检测到CPU核心数: $CPU_CORES"

# 设置GoAccess进程亲和性
set_cpu_affinity() {
    local pid=$1
    local cores=$2
    
    if [ -n "$pid" ] && [ -n "$cores" ]; then
        taskset -cp "$cores" "$pid"
        echo "设置进程 $pid 的CPU亲和性为核心 $cores"
    fi
}

# 优化系统调度器
optimize_scheduler() {
    echo "优化系统调度器..."
    
    # 设置为性能模式
    echo performance | sudo tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor
    
    # 禁用CPU节能模式
    echo 1 | sudo tee /sys/devices/system/cpu/intel_pstate/no_turbo 2>/dev/null || true
    
    # 设置进程调度策略
    echo "设置进程调度策略完成"
}

# 监控CPU使用率
monitor_cpu_usage() {
    echo "监控GoAccess CPU使用率..."
    
    while true; do
        # 查找GoAccess进程
        GOACCESS_PID=$(pgrep -f goaccess | head -1)
        
        if [ -n "$GOACCESS_PID" ]; then
            CPU_USAGE=$(ps -p "$GOACCESS_PID" -o %cpu --no-headers)
            MEMORY_USAGE=$(ps -p "$GOACCESS_PID" -o %mem --no-headers)
            
            echo "$(date): PID=$GOACCESS_PID, CPU=${CPU_USAGE}%, Memory=${MEMORY_USAGE}%"
            
            # 如果CPU使用率过高,发出警告
            if (( $(echo "$CPU_USAGE > 80" | bc -l) )); then
                echo "警告: GoAccess CPU使用率过高 ($CPU_USAGE%)"
            fi
        fi
        
        sleep 5
    done
}

# 主函数
main() {
    case "$1" in
        "affinity")
            if [ -n "$2" ] && [ -n "$3" ]; then
                set_cpu_affinity "$2" "$3"
            else
                echo "用法: $0 affinity <PID> <CPU_CORES>"
            fi
            ;;
        "scheduler")
            optimize_scheduler
            ;;
        "monitor")
            monitor_cpu_usage
            ;;
        *)
            echo "用法: $0 {affinity|scheduler|monitor}"
            echo "  affinity <PID> <CORES> - 设置CPU亲和性"
            echo "  scheduler              - 优化系统调度器"
            echo "  monitor               - 监控CPU使用率"
            ;;
    esac
}

main "$@"

2.3 错误日志分析

2.3.1 日志错误检测工具

#!/usr/bin/env python3
# log_error_analyzer.py - 日志错误分析工具

import re
import json
import logging
from typing import Dict, List, Tuple
from collections import defaultdict, Counter
from datetime import datetime, timedelta

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class LogErrorAnalyzer:
    def __init__(self):
        self.error_patterns = {
            'format_error': [
                r'invalid format',
                r'format not recognized',
                r'unable to parse',
                r'malformed log entry'
            ],
            'permission_error': [
                r'permission denied',
                r'access denied',
                r'cannot open file',
                r'no such file or directory'
            ],
            'memory_error': [
                r'out of memory',
                r'memory allocation failed',
                r'cannot allocate memory'
            ],
            'disk_error': [
                r'no space left on device',
                r'disk full',
                r'write error',
                r'i/o error'
            ],
            'network_error': [
                r'connection refused',
                r'network unreachable',
                r'timeout',
                r'socket error'
            ]
        }
        
        self.warning_patterns = {
            'performance_warning': [
                r'slow query',
                r'high cpu usage',
                r'memory usage high',
                r'processing time exceeded'
            ],
            'data_warning': [
                r'missing field',
                r'unexpected value',
                r'data truncated',
                r'invalid timestamp'
            ]
        }
    
    def analyze_goaccess_logs(self, log_paths: List[str]) -> Dict[str, any]:
        """分析GoAccess相关日志"""
        results = {
            'total_lines': 0,
            'errors': defaultdict(list),
            'warnings': defaultdict(list),
            'error_summary': defaultdict(int),
            'warning_summary': defaultdict(int),
            'timeline': [],
            'recommendations': []
        }
        
        for log_path in log_paths:
            try:
                logger.info(f"分析日志文件: {log_path}")
                file_results = self.analyze_single_log(log_path)
                
                # 合并结果
                results['total_lines'] += file_results['total_lines']
                
                for error_type, errors in file_results['errors'].items():
                    results['errors'][error_type].extend(errors)
                    results['error_summary'][error_type] += len(errors)
                
                for warning_type, warnings in file_results['warnings'].items():
                    results['warnings'][warning_type].extend(warnings)
                    results['warning_summary'][warning_type] += len(warnings)
                
                results['timeline'].extend(file_results['timeline'])
                
            except Exception as e:
                logger.error(f"分析日志文件 {log_path} 时出错: {e}")
                results['errors']['analysis_error'].append({
                    'file': log_path,
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                })
        
        # 生成建议
        results['recommendations'] = self.generate_error_recommendations(results)
        
        # 排序时间线
        results['timeline'].sort(key=lambda x: x['timestamp'])
        
        return results
    
    def analyze_single_log(self, log_path: str) -> Dict[str, any]:
        """分析单个日志文件"""
        results = {
            'total_lines': 0,
            'errors': defaultdict(list),
            'warnings': defaultdict(list),
            'timeline': []
        }
        
        try:
            with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
                for line_num, line in enumerate(f, 1):
                    results['total_lines'] += 1
                    line = line.strip()
                    
                    if not line:
                        continue
                    
                    # 提取时间戳
                    timestamp = self.extract_timestamp(line)
                    
                    # 检查错误模式
                    for error_type, patterns in self.error_patterns.items():
                        for pattern in patterns:
                            if re.search(pattern, line, re.IGNORECASE):
                                error_entry = {
                                    'line_number': line_num,
                                    'content': line,
                                    'pattern': pattern,
                                    'timestamp': timestamp,
                                    'file': log_path
                                }
                                results['errors'][error_type].append(error_entry)
                                results['timeline'].append({
                                    'timestamp': timestamp,
                                    'type': 'error',
                                    'category': error_type,
                                    'content': line[:100] + '...' if len(line) > 100 else line
                                })
                                break
                    
                    # 检查警告模式
                    for warning_type, patterns in self.warning_patterns.items():
                        for pattern in patterns:
                            if re.search(pattern, line, re.IGNORECASE):
                                warning_entry = {
                                    'line_number': line_num,
                                    'content': line,
                                    'pattern': pattern,
                                    'timestamp': timestamp,
                                    'file': log_path
                                }
                                results['warnings'][warning_type].append(warning_entry)
                                results['timeline'].append({
                                    'timestamp': timestamp,
                                    'type': 'warning',
                                    'category': warning_type,
                                    'content': line[:100] + '...' if len(line) > 100 else line
                                })
                                break
        
        except Exception as e:
            logger.error(f"读取日志文件 {log_path} 时出错: {e}")
            raise
        
        return results
    
    def extract_timestamp(self, line: str) -> str:
        """从日志行中提取时间戳"""
        # 常见时间戳格式
        timestamp_patterns = [
            r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})',  # YYYY-MM-DD HH:MM:SS
            r'(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})',  # DD/MMM/YYYY:HH:MM:SS
            r'(\w{3} \d{2} \d{2}:\d{2}:\d{2})',        # MMM DD HH:MM:SS
            r'(\d{10})',                                # Unix timestamp
        ]
        
        for pattern in timestamp_patterns:
            match = re.search(pattern, line)
            if match:
                return match.group(1)
        
        return datetime.now().isoformat()
    
    def generate_error_recommendations(self, results: Dict[str, any]) -> List[str]:
        """生成错误修复建议"""
        recommendations = []
        
        # 基于错误类型生成建议
        if results['error_summary']['format_error'] > 0:
            recommendations.append("检测到日志格式错误,请验证GoAccess配置中的log-format设置")
        
        if results['error_summary']['permission_error'] > 0:
            recommendations.append("检测到权限错误,请检查日志文件和输出目录的读写权限")
        
        if results['error_summary']['memory_error'] > 0:
            recommendations.append("检测到内存错误,建议增加系统内存或启用GoAccess磁盘数据库模式")
        
        if results['error_summary']['disk_error'] > 0:
            recommendations.append("检测到磁盘错误,请清理磁盘空间或检查磁盘健康状态")
        
        if results['error_summary']['network_error'] > 0:
            recommendations.append("检测到网络错误,请检查网络连接和防火墙设置")
        
        # 基于警告类型生成建议
        if results['warning_summary']['performance_warning'] > 0:
            recommendations.append("检测到性能警告,建议优化GoAccess配置或升级硬件")
        
        if results['warning_summary']['data_warning'] > 0:
            recommendations.append("检测到数据警告,请检查日志数据质量和完整性")
        
        # 基于错误频率生成建议
        total_errors = sum(results['error_summary'].values())
        if total_errors > 100:
            recommendations.append("错误数量较多,建议进行系统全面检查")
        elif total_errors > 10:
            recommendations.append("存在一些错误,建议定期监控和维护")
        
        if not recommendations:
            recommendations.append("未发现明显错误,系统运行正常")
        
        return recommendations
    
    def generate_error_report(self, results: Dict[str, any], output_format: str = 'text') -> str:
        """生成错误报告"""
        if output_format == 'json':
            return json.dumps(results, indent=2, ensure_ascii=False, default=str)
        
        # 文本格式报告
        report = []
        report.append("=== GoAccess日志错误分析报告 ===")
        report.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"分析行数: {results['total_lines']}")
        report.append("")
        
        # 错误摘要
        if results['error_summary']:
            report.append("=== 错误摘要 ===")
            for error_type, count in results['error_summary'].items():
                report.append(f"{error_type}: {count}")
            report.append("")
        
        # 警告摘要
        if results['warning_summary']:
            report.append("=== 警告摘要 ===")
            for warning_type, count in results['warning_summary'].items():
                report.append(f"{warning_type}: {count}")
            report.append("")
        
        # 详细错误信息
        if results['errors']:
            report.append("=== 详细错误信息 ===")
            for error_type, errors in results['errors'].items():
                if errors:
                    report.append(f"\n--- {error_type} ---")
                    for error in errors[:5]:  # 只显示前5个
                        report.append(f"文件: {error.get('file', 'unknown')}")
                        report.append(f"行号: {error.get('line_number', 'unknown')}")
                        report.append(f"内容: {error.get('content', '')[:200]}")
                        report.append("")
                    
                    if len(errors) > 5:
                        report.append(f"... 还有 {len(errors) - 5} 个类似错误")
                        report.append("")
        
        # 修复建议
        if results['recommendations']:
            report.append("=== 修复建议 ===")
            for i, rec in enumerate(results['recommendations'], 1):
                report.append(f"{i}. {rec}")
            report.append("")
        
        return "\n".join(report)
    
    def monitor_error_trends(self, results: Dict[str, any], hours: int = 24) -> Dict[str, any]:
        """监控错误趋势"""
        now = datetime.now()
        cutoff_time = now - timedelta(hours=hours)
        
        # 过滤最近的错误
        recent_errors = []
        for entry in results['timeline']:
            try:
                entry_time = datetime.fromisoformat(entry['timestamp'])
                if entry_time >= cutoff_time:
                    recent_errors.append(entry)
            except:
                continue
        
        # 按小时分组
        hourly_stats = defaultdict(lambda: {'errors': 0, 'warnings': 0})
        
        for entry in recent_errors:
            try:
                entry_time = datetime.fromisoformat(entry['timestamp'])
                hour_key = entry_time.strftime('%Y-%m-%d %H:00')
                
                if entry['type'] == 'error':
                    hourly_stats[hour_key]['errors'] += 1
                elif entry['type'] == 'warning':
                    hourly_stats[hour_key]['warnings'] += 1
            except:
                continue
        
        # 计算趋势
        trend_analysis = {
            'total_recent_errors': len([e for e in recent_errors if e['type'] == 'error']),
            'total_recent_warnings': len([e for e in recent_errors if e['type'] == 'warning']),
            'hourly_breakdown': dict(hourly_stats),
            'peak_error_hour': None,
            'trend_direction': 'stable'
        }
        
        # 找出错误最多的小时
        if hourly_stats:
            peak_hour = max(hourly_stats.items(), key=lambda x: x[1]['errors'])
            trend_analysis['peak_error_hour'] = {
                'hour': peak_hour[0],
                'error_count': peak_hour[1]['errors']
            }
        
        # 简单趋势分析
        if len(hourly_stats) >= 2:
            hours = sorted(hourly_stats.keys())
            first_half = hours[:len(hours)//2]
            second_half = hours[len(hours)//2:]
            
            first_half_errors = sum(hourly_stats[h]['errors'] for h in first_half)
            second_half_errors = sum(hourly_stats[h]['errors'] for h in second_half)
            
            if second_half_errors > first_half_errors * 1.2:
                trend_analysis['trend_direction'] = 'increasing'
            elif second_half_errors < first_half_errors * 0.8:
                trend_analysis['trend_direction'] = 'decreasing'
        
        return trend_analysis

def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='GoAccess日志错误分析工具')
    parser.add_argument('log_files', nargs='+', help='要分析的日志文件')
    parser.add_argument('--output', '-o', help='输出文件路径')
    parser.add_argument('--format', choices=['text', 'json'], default='text',
                       help='输出格式')
    parser.add_argument('--trend-hours', type=int, default=24,
                       help='趋势分析的小时数')
    
    args = parser.parse_args()
    
    # 创建分析器
    analyzer = LogErrorAnalyzer()
    
    # 分析日志
    logger.info("开始分析日志文件...")
    results = analyzer.analyze_goaccess_logs(args.log_files)
    
    # 趋势分析
    trend_analysis = analyzer.monitor_error_trends(results, args.trend_hours)
    results['trend_analysis'] = trend_analysis
    
    # 生成报告
    report = analyzer.generate_error_report(results, args.format)
    
    # 输出结果
    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        logger.info(f"报告已保存到: {args.output}")
    else:
        print(report)
    
    # 显示趋势分析
    if args.format == 'text':
        print("\n=== 错误趋势分析 ===")
        print(f"最近{args.trend_hours}小时内:")
        print(f"  错误总数: {trend_analysis['total_recent_errors']}")
        print(f"  警告总数: {trend_analysis['total_recent_warnings']}")
        print(f"  趋势方向: {trend_analysis['trend_direction']}")
        
        if trend_analysis['peak_error_hour']:
            peak = trend_analysis['peak_error_hour']
            print(f"  错误高峰: {peak['hour']} ({peak['error_count']}个错误)")

if __name__ == "__main__":
    main()

3. 最佳实践总结

3.1 性能优化检查清单

# goaccess_optimization_checklist.yml
# GoAccess性能优化检查清单

performance_optimization:
  hardware:
    - name: "CPU优化"
      items:
        - "检查CPU核心数和使用率"
        - "设置进程CPU亲和性"
        - "优化系统调度器设置"
        - "监控CPU温度和频率"
    
    - name: "内存优化"
      items:
        - "检查可用内存容量"
        - "优化交换分区使用"
        - "清理页面缓存"
        - "监控内存泄漏"
    
    - name: "磁盘I/O优化"
      items:
        - "使用SSD存储"
        - "优化I/O调度器"
        - "设置合适的文件系统"
        - "监控磁盘使用率"
  
  software:
    - name: "GoAccess配置"
      items:
        - "选择合适的日志格式"
        - "启用磁盘数据库模式"
        - "优化输出格式设置"
        - "配置合理的更新间隔"
    
    - name: "系统配置"
      items:
        - "增加文件描述符限制"
        - "优化内核参数"
        - "配置日志轮转"
        - "设置合理的权限"
  
  monitoring:
    - name: "性能监控"
      items:
        - "监控CPU和内存使用"
        - "监控磁盘I/O性能"
        - "监控网络连接状态"
        - "设置性能告警阈值"

troubleshooting:
  common_issues:
    - name: "安装问题"
      symptoms:
        - "命令未找到"
        - "版本不兼容"
        - "依赖库缺失"
      solutions:
        - "重新安装GoAccess"
        - "检查PATH环境变量"
        - "安装必要的依赖库"
    
    - name: "配置问题"
      symptoms:
        - "日志格式错误"
        - "输出文件无法生成"
        - "实时监控不工作"
      solutions:
        - "验证日志格式配置"
        - "检查输出目录权限"
        - "确认WebSocket端口开放"
    
    - name: "性能问题"
      symptoms:
        - "处理速度慢"
        - "内存使用过高"
        - "CPU使用率过高"
      solutions:
        - "启用磁盘数据库模式"
        - "增加系统资源"
        - "优化日志文件大小"
    
    - name: "数据问题"
      symptoms:
        - "统计数据不准确"
        - "缺少地理位置信息"
        - "时间显示错误"
      solutions:
        - "检查日志数据质量"
        - "安装GeoIP数据库"
        - "配置正确的时区"

monitoring_alerts:
  cpu_usage:
    warning_threshold: 70
    critical_threshold: 90
    check_interval: 60
  
  memory_usage:
    warning_threshold: 80
    critical_threshold: 95
    check_interval: 60
  
  disk_usage:
    warning_threshold: 80
    critical_threshold: 90
    check_interval: 300
  
  error_rate:
    warning_threshold: 10
    critical_threshold: 50
    check_interval: 300

maintenance_schedule:
  daily:
    - "检查日志文件大小"
    - "监控系统资源使用"
    - "清理临时文件"
  
  weekly:
    - "更新GeoIP数据库"
    - "检查配置文件"
    - "分析性能趋势"
  
  monthly:
    - "系统全面检查"
    - "更新GoAccess版本"
    - "优化配置参数"

3.2 故障排除流程图

flowchart TD
    A[GoAccess问题] --> B{问题类型}
    
    B -->|安装问题| C[检查安装]
    B -->|配置问题| D[检查配置]
    B -->|性能问题| E[性能分析]
    B -->|数据问题| F[数据验证]
    
    C --> C1[验证安装路径]
    C --> C2[检查版本兼容性]
    C --> C3[安装依赖库]
    C1 --> G[问题解决]
    C2 --> G
    C3 --> G
    
    D --> D1[验证配置语法]
    D --> D2[检查文件权限]
    D --> D3[测试日志格式]
    D1 --> G
    D2 --> G
    D3 --> G
    
    E --> E1[监控系统资源]
    E --> E2[分析性能瓶颈]
    E --> E3[优化配置参数]
    E1 --> G
    E2 --> G
    E3 --> G
    
    F --> F1[检查日志质量]
    F --> F2[验证数据完整性]
    F --> F3[更新数据库]
    F1 --> G
    F2 --> G
    F3 --> G
    
    G --> H{问题是否解决}
    H -->|是| I[监控和维护]
    H -->|否| J[深入诊断]
    
    J --> K[收集详细日志]
    J --> L[联系技术支持]
    J --> M[查阅文档]
    
    K --> N[重新分析]
    L --> N
    M --> N
    
    N --> H

4. 总结

本章详细介绍了GoAccess的性能优化和故障排除方法,包括:

  1. 性能优化策略

    • 硬件优化(CPU、内存、磁盘I/O)
    • GoAccess配置优化
    • 系统参数调优
  2. 故障排除指南

    • 常见问题诊断工具
    • 性能问题排查脚本
    • 错误日志分析工具
  3. 最佳实践

    • 性能优化检查清单
    • 故障排除流程
    • 监控和维护建议

通过本章的学习,您应该能够: - 识别和解决GoAccess的常见问题 - 优化GoAccess的性能表现 - 建立有效的监控和维护机制 - 快速诊断和修复系统故障

下一章我们将学习GoAccess的高级应用场景和企业级部署方案。


#### 1.1.2 内存优化

```python
#!/usr/bin/env python3
# memory_optimizer.py - 内存优化工具

import psutil
import subprocess
import json
import logging
import time
from typing import Dict, List, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GoAccessMemoryOptimizer:
    def __init__(self):
        self.memory_threshold = 80  # 内存使用率阈值
        self.swap_threshold = 50    # 交换分区使用率阈值
        
    def get_system_memory_info(self) -> Dict[str, float]:
        """获取系统内存信息"""
        memory = psutil.virtual_memory()
        swap = psutil.swap_memory()
        
        return {
            'total_memory_gb': memory.total / (1024**3),
            'available_memory_gb': memory.available / (1024**3),
            'memory_percent': memory.percent,
            'swap_total_gb': swap.total / (1024**3),
            'swap_used_gb': swap.used / (1024**3),
            'swap_percent': swap.percent
        }
    
    def get_goaccess_memory_usage(self) -> List[Dict[str, any]]:
        """获取GoAccess进程内存使用情况"""
        processes = []
        
        for proc in psutil.process_iter(['pid', 'name', 'memory_info', 'memory_percent']):
            try:
                if 'goaccess' in proc.info['name'].lower():
                    memory_info = proc.info['memory_info']
                    processes.append({
                        'pid': proc.info['pid'],
                        'name': proc.info['name'],
                        'memory_mb': memory_info.rss / (1024**2),
                        'memory_percent': proc.info['memory_percent'],
                        'vms_mb': memory_info.vms / (1024**2)
                    })
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        
        return processes
    
    def optimize_memory_settings(self) -> Dict[str, str]:
        """优化内存设置"""
        optimizations = {}
        
        try:
            # 清理页面缓存
            subprocess.run(['sudo', 'sync'], check=True)
            subprocess.run(['sudo', 'sh', '-c', 'echo 1 > /proc/sys/vm/drop_caches'], check=True)
            optimizations['page_cache'] = '已清理页面缓存'
            
            # 优化交换分区使用
            subprocess.run(['sudo', 'sysctl', 'vm.swappiness=10'], check=True)
            optimizations['swappiness'] = '设置交换分区使用率为10'
            
            # 优化内存回收
            subprocess.run(['sudo', 'sysctl', 'vm.vfs_cache_pressure=50'], check=True)
            optimizations['cache_pressure'] = '设置VFS缓存压力为50'
            
            # 优化脏页回写
            subprocess.run(['sudo', 'sysctl', 'vm.dirty_ratio=15'], check=True)
            subprocess.run(['sudo', 'sysctl', 'vm.dirty_background_ratio=5'], check=True)
            optimizations['dirty_pages'] = '优化脏页回写设置'
            
        except subprocess.CalledProcessError as e:
            logger.error(f"优化内存设置失败: {e}")
            optimizations['error'] = str(e)
        
        return optimizations
    
    def configure_goaccess_memory(self, log_file_size_mb: int) -> Dict[str, any]:
        """根据日志文件大小配置GoAccess内存使用"""
        config = {}
        
        # 根据日志文件大小推荐配置
        if log_file_size_mb < 100:
            config = {
                'keep_db_files': False,
                'load_from_disk': False,
                'real_time_html': True,
                'memory_usage': 'low'
            }
        elif log_file_size_mb < 500:
            config = {
                'keep_db_files': True,
                'load_from_disk': True,
                'real_time_html': False,
                'db_path': '/tmp/goaccess',
                'memory_usage': 'medium'
            }
        else:
            config = {
                'keep_db_files': True,
                'load_from_disk': True,
                'real_time_html': False,
                'db_path': '/var/lib/goaccess',
                'process_and_exit': True,
                'memory_usage': 'high'
            }
        
        return config
    
    def monitor_memory_usage(self, duration_minutes: int = 10):
        """监控内存使用情况"""
        logger.info(f"开始监控内存使用情况,持续{duration_minutes}分钟")
        
        start_time = time.time()
        end_time = start_time + (duration_minutes * 60)
        
        metrics = []
        
        while time.time() < end_time:
            # 获取系统内存信息
            system_memory = self.get_system_memory_info()
            
            # 获取GoAccess进程信息
            goaccess_processes = self.get_goaccess_memory_usage()
            
            metric = {
                'timestamp': time.time(),
                'system_memory': system_memory,
                'goaccess_processes': goaccess_processes
            }
            
            metrics.append(metric)
            
            # 检查内存使用率
            if system_memory['memory_percent'] > self.memory_threshold:
                logger.warning(f"系统内存使用率过高: {system_memory['memory_percent']:.1f}%")
            
            if system_memory['swap_percent'] > self.swap_threshold:
                logger.warning(f"交换分区使用率过高: {system_memory['swap_percent']:.1f}%")
            
            # 检查GoAccess进程内存使用
            for proc in goaccess_processes:
                if proc['memory_percent'] > 10:  # 单个进程使用超过10%内存
                    logger.warning(f"GoAccess进程 {proc['pid']} 内存使用率过高: {proc['memory_percent']:.1f}%")
            
            time.sleep(30)  # 每30秒检查一次
        
        return metrics
    
    def generate_memory_report(self, metrics: List[Dict[str, any]]) -> Dict[str, any]:
        """生成内存使用报告"""
        if not metrics:
            return {}
        
        # 计算平均值
        avg_memory_percent = sum(m['system_memory']['memory_percent'] for m in metrics) / len(metrics)
        max_memory_percent = max(m['system_memory']['memory_percent'] for m in metrics)
        
        avg_swap_percent = sum(m['system_memory']['swap_percent'] for m in metrics) / len(metrics)
        max_swap_percent = max(m['system_memory']['swap_percent'] for m in metrics)
        
        # GoAccess进程统计
        all_goaccess_processes = []
        for metric in metrics:
            all_goaccess_processes.extend(metric['goaccess_processes'])
        
        if all_goaccess_processes:
            avg_goaccess_memory = sum(p['memory_mb'] for p in all_goaccess_processes) / len(all_goaccess_processes)
            max_goaccess_memory = max(p['memory_mb'] for p in all_goaccess_processes)
        else:
            avg_goaccess_memory = 0
            max_goaccess_memory = 0
        
        report = {
            'monitoring_duration_minutes': len(metrics) * 0.5,  # 每30秒一次
            'system_memory': {
                'avg_usage_percent': round(avg_memory_percent, 2),
                'max_usage_percent': round(max_memory_percent, 2),
                'avg_swap_percent': round(avg_swap_percent, 2),
                'max_swap_percent': round(max_swap_percent, 2)
            },
            'goaccess_memory': {
                'avg_usage_mb': round(avg_goaccess_memory, 2),
                'max_usage_mb': round(max_goaccess_memory, 2),
                'process_count': len(set(p['pid'] for p in all_goaccess_processes))
            },
            'recommendations': self.generate_memory_recommendations(metrics)
        }
        
        return report
    
    def generate_memory_recommendations(self, metrics: List[Dict[str, any]]) -> List[str]:
        """生成内存优化建议"""
        recommendations = []
        
        if not metrics:
            return recommendations
        
        avg_memory_percent = sum(m['system_memory']['memory_percent'] for m in metrics) / len(metrics)
        avg_swap_percent = sum(m['system_memory']['swap_percent'] for m in metrics) / len(metrics)
        
        if avg_memory_percent > 80:
            recommendations.append("系统内存使用率过高,建议增加物理内存或启用GoAccess磁盘数据库模式")
        
        if avg_swap_percent > 30:
            recommendations.append("交换分区使用率过高,建议增加物理内存或优化GoAccess配置")
        
        if avg_memory_percent < 30:
            recommendations.append("内存使用率较低,可以考虑增加GoAccess并发处理或启用更多功能")
        
        # 检查GoAccess进程内存使用
        all_goaccess_processes = []
        for metric in metrics:
            all_goaccess_processes.extend(metric['goaccess_processes'])
        
        if all_goaccess_processes:
            max_goaccess_memory = max(p['memory_mb'] for p in all_goaccess_processes)
            if max_goaccess_memory > 1000:  # 超过1GB
                recommendations.append("GoAccess进程内存使用过高,建议启用磁盘数据库模式或分批处理")
        
        return recommendations
    
    def cleanup_memory(self):
        """清理内存"""
        try:
            logger.info("开始清理内存...")
            
            # 同步文件系统
            subprocess.run(['sync'], check=True)
            
            # 清理页面缓存
            subprocess.run(['sudo', 'sh', '-c', 'echo 1 > /proc/sys/vm/drop_caches'], check=True)
            
            # 清理目录项和inode缓存
            subprocess.run(['sudo', 'sh', '-c', 'echo 2 > /proc/sys/vm/drop_caches'], check=True)
            
            # 清理所有缓存
            subprocess.run(['sudo', 'sh', '-c', 'echo 3 > /proc/sys/vm/drop_caches'], check=True)
            
            logger.info("内存清理完成")
            
        except subprocess.CalledProcessError as e:
            logger.error(f"清理内存失败: {e}")

def main():
    optimizer = GoAccessMemoryOptimizer()
    
    # 获取系统内存信息
    memory_info = optimizer.get_system_memory_info()
    print("系统内存信息:")
    print(json.dumps(memory_info, indent=2))
    
    # 获取GoAccess进程信息
    goaccess_processes = optimizer.get_goaccess_memory_usage()
    if goaccess_processes:
        print("\nGoAccess进程信息:")
        print(json.dumps(goaccess_processes, indent=2))
    else:
        print("\n未发现运行中的GoAccess进程")
    
    # 生成配置建议
    config = optimizer.configure_goaccess_memory(500)  # 假设500MB日志文件
    print("\nGoAccess内存配置建议:")
    print(json.dumps(config, indent=2))
    
    # 优化内存设置
    optimizations = optimizer.optimize_memory_settings()
    print("\n内存优化结果:")
    print(json.dumps(optimizations, indent=2))

if __name__ == "__main__":
    main()

1.2 磁盘I/O优化

1.2.1 磁盘性能监控

#!/usr/bin/env python3
# disk_monitor.py - 磁盘性能监控工具

import psutil
import time
import json
import logging
from typing import Dict, List
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DiskPerformanceMonitor:
    def __init__(self):
        self.baseline_metrics = None
        
    def get_disk_usage(self) -> Dict[str, Dict[str, float]]:
        """获取磁盘使用情况"""
        disk_usage = {}
        
        # 获取所有挂载点
        partitions = psutil.disk_partitions()
        
        for partition in partitions:
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                disk_usage[partition.mountpoint] = {
                    'device': partition.device,
                    'fstype': partition.fstype,
                    'total_gb': usage.total / (1024**3),
                    'used_gb': usage.used / (1024**3),
                    'free_gb': usage.free / (1024**3),
                    'percent': (usage.used / usage.total) * 100
                }
            except PermissionError:
                continue
        
        return disk_usage
    
    def get_disk_io_stats(self) -> Dict[str, Dict[str, float]]:
        """获取磁盘I/O统计"""
        io_stats = {}
        
        # 获取磁盘I/O统计
        disk_io = psutil.disk_io_counters(perdisk=True)
        
        for device, stats in disk_io.items():
            io_stats[device] = {
                'read_count': stats.read_count,
                'write_count': stats.write_count,
                'read_bytes': stats.read_bytes,
                'write_bytes': stats.write_bytes,
                'read_time': stats.read_time,
                'write_time': stats.write_time,
                'read_mb': stats.read_bytes / (1024**2),
                'write_mb': stats.write_bytes / (1024**2)
            }
        
        return io_stats
    
    def calculate_io_rates(self, current_stats: Dict, previous_stats: Dict, time_diff: float) -> Dict:
        """计算I/O速率"""
        rates = {}
        
        for device in current_stats:
            if device in previous_stats:
                current = current_stats[device]
                previous = previous_stats[device]
                
                rates[device] = {
                    'read_rate_mb_s': (current['read_mb'] - previous['read_mb']) / time_diff,
                    'write_rate_mb_s': (current['write_mb'] - previous['write_mb']) / time_diff,
                    'read_iops': (current['read_count'] - previous['read_count']) / time_diff,
                    'write_iops': (current['write_count'] - previous['write_count']) / time_diff
                }
        
        return rates
    
    def monitor_disk_performance(self, duration_minutes: int = 10, interval_seconds: int = 5) -> List[Dict]:
        """监控磁盘性能"""
        logger.info(f"开始监控磁盘性能,持续{duration_minutes}分钟,间隔{interval_seconds}秒")
        
        metrics = []
        start_time = time.time()
        end_time = start_time + (duration_minutes * 60)
        
        previous_io_stats = None
        previous_time = None
        
        while time.time() < end_time:
            current_time = time.time()
            
            # 获取当前统计
            disk_usage = self.get_disk_usage()
            current_io_stats = self.get_disk_io_stats()
            
            metric = {
                'timestamp': current_time,
                'datetime': datetime.fromtimestamp(current_time).isoformat(),
                'disk_usage': disk_usage,
                'io_stats': current_io_stats
            }
            
            # 计算I/O速率
            if previous_io_stats and previous_time:
                time_diff = current_time - previous_time
                io_rates = self.calculate_io_rates(current_io_stats, previous_io_stats, time_diff)
                metric['io_rates'] = io_rates
                
                # 检查性能警告
                self.check_performance_warnings(io_rates, disk_usage)
            
            metrics.append(metric)
            
            previous_io_stats = current_io_stats
            previous_time = current_time
            
            time.sleep(interval_seconds)
        
        return metrics
    
    def check_performance_warnings(self, io_rates: Dict, disk_usage: Dict):
        """检查性能警告"""
        # 检查磁盘使用率
        for mountpoint, usage in disk_usage.items():
            if usage['percent'] > 90:
                logger.warning(f"磁盘使用率过高: {mountpoint} ({usage['percent']:.1f}%)")
            elif usage['percent'] > 80:
                logger.info(f"磁盘使用率较高: {mountpoint} ({usage['percent']:.1f}%)")
        
        # 检查I/O速率
        for device, rates in io_rates.items():
            # 读取速率警告(超过100MB/s)
            if rates['read_rate_mb_s'] > 100:
                logger.warning(f"设备 {device} 读取速率过高: {rates['read_rate_mb_s']:.1f} MB/s")
            
            # 写入速率警告(超过100MB/s)
            if rates['write_rate_mb_s'] > 100:
                logger.warning(f"设备 {device} 写入速率过高: {rates['write_rate_mb_s']:.1f} MB/s")
            
            # IOPS警告
            if rates['read_iops'] > 1000:
                logger.warning(f"设备 {device} 读取IOPS过高: {rates['read_iops']:.0f}")
            
            if rates['write_iops'] > 1000:
                logger.warning(f"设备 {device} 写入IOPS过高: {rates['write_iops']:.0f}")
    
    def generate_performance_report(self, metrics: List[Dict]) -> Dict:
        """生成性能报告"""
        if not metrics:
            return {}
        
        # 计算平均I/O速率
        all_io_rates = []
        for metric in metrics:
            if 'io_rates' in metric:
                all_io_rates.append(metric['io_rates'])
        
        if not all_io_rates:
            return {'error': '没有足够的I/O速率数据'}
        
        # 按设备统计
        device_stats = {}
        for io_rate in all_io_rates:
            for device, rates in io_rate.items():
                if device not in device_stats:
                    device_stats[device] = {
                        'read_rates': [],
                        'write_rates': [],
                        'read_iops': [],
                        'write_iops': []
                    }
                
                device_stats[device]['read_rates'].append(rates['read_rate_mb_s'])
                device_stats[device]['write_rates'].append(rates['write_rate_mb_s'])
                device_stats[device]['read_iops'].append(rates['read_iops'])
                device_stats[device]['write_iops'].append(rates['write_iops'])
        
        # 计算统计值
        report = {
            'monitoring_duration_minutes': len(metrics) * 5 / 60,  # 假设5秒间隔
            'devices': {}
        }
        
        for device, stats in device_stats.items():
            report['devices'][device] = {
                'avg_read_rate_mb_s': sum(stats['read_rates']) / len(stats['read_rates']),
                'max_read_rate_mb_s': max(stats['read_rates']),
                'avg_write_rate_mb_s': sum(stats['write_rates']) / len(stats['write_rates']),
                'max_write_rate_mb_s': max(stats['write_rates']),
                'avg_read_iops': sum(stats['read_iops']) / len(stats['read_iops']),
                'max_read_iops': max(stats['read_iops']),
                'avg_write_iops': sum(stats['write_iops']) / len(stats['write_iops']),
                'max_write_iops': max(stats['write_iops'])
            }
        
        # 添加磁盘使用情况(最后一次测量)
        if metrics:
            last_metric = metrics[-1]
            report['disk_usage'] = last_metric.get('disk_usage', {})
        
        # 生成优化建议
        report['recommendations'] = self.generate_disk_recommendations(report)
        
        return report
    
    def generate_disk_recommendations(self, report: Dict) -> List[str]:
        """生成磁盘优化建议"""
        recommendations = []
        
        # 检查磁盘使用率
        disk_usage = report.get('disk_usage', {})
        for mountpoint, usage in disk_usage.items():
            if usage['percent'] > 90:
                recommendations.append(f"磁盘 {mountpoint} 使用率过高({usage['percent']:.1f}%),建议清理文件或扩容")
            elif usage['percent'] > 80:
                recommendations.append(f"磁盘 {mountpoint} 使用率较高({usage['percent']:.1f}%),建议监控空间使用")
        
        # 检查I/O性能
        devices = report.get('devices', {})
        for device, stats in devices.items():
            if stats['max_read_rate_mb_s'] > 100:
                recommendations.append(f"设备 {device} 读取速率过高,建议优化读取操作或使用SSD")
            
            if stats['max_write_rate_mb_s'] > 100:
                recommendations.append(f"设备 {device} 写入速率过高,建议优化写入操作或使用SSD")
            
            if stats['max_read_iops'] > 1000 or stats['max_write_iops'] > 1000:
                recommendations.append(f"设备 {device} IOPS过高,建议使用更快的存储设备")
        
        if not recommendations:
            recommendations.append("磁盘性能正常,无需特别优化")
        
        return recommendations
    
    def optimize_disk_settings(self) -> Dict[str, str]:
        """优化磁盘设置"""
        optimizations = {}
        
        try:
            import subprocess
            
            # 优化I/O调度器
            result = subprocess.run(['cat', '/sys/block/sda/queue/scheduler'], 
                                  capture_output=True, text=True)
            if result.returncode == 0:
                current_scheduler = result.stdout.strip()
                logger.info(f"当前I/O调度器: {current_scheduler}")
                
                # 设置为deadline调度器(适合数据库和日志处理)
                try:
                    subprocess.run(['sudo', 'sh', '-c', 'echo deadline > /sys/block/sda/queue/scheduler'], 
                                 check=True)
                    optimizations['io_scheduler'] = '设置I/O调度器为deadline'
                except subprocess.CalledProcessError:
                    optimizations['io_scheduler'] = '设置I/O调度器失败'
            
            # 优化文件系统挂载选项
            optimizations['mount_options'] = '建议使用noatime,nodiratime挂载选项减少磁盘写入'
            
            # 优化内核参数
            subprocess.run(['sudo', 'sysctl', 'vm.dirty_ratio=5'], check=True)
            subprocess.run(['sudo', 'sysctl', 'vm.dirty_background_ratio=2'], check=True)
            optimizations['dirty_pages'] = '优化脏页回写参数'
            
        except Exception as e:
            logger.error(f"优化磁盘设置失败: {e}")
            optimizations['error'] = str(e)
        
        return optimizations

def main():
    monitor = DiskPerformanceMonitor()
    
    # 获取当前磁盘使用情况
    disk_usage = monitor.get_disk_usage()
    print("磁盘使用情况:")
    print(json.dumps(disk_usage, indent=2))
    
    # 获取当前I/O统计
    io_stats = monitor.get_disk_io_stats()
    print("\n磁盘I/O统计:")
    print(json.dumps(io_stats, indent=2))
    
    # 监控磁盘性能(示例:监控1分钟)
    print("\n开始监控磁盘性能...")
    metrics = monitor.monitor_disk_performance(duration_minutes=1, interval_seconds=5)
    
    # 生成性能报告
    report = monitor.generate_performance_report(metrics)
    print("\n磁盘性能报告:")
    print(json.dumps(report, indent=2))
    
    # 优化磁盘设置
    optimizations = monitor.optimize_disk_settings()
    print("\n磁盘优化结果:")
    print(json.dumps(optimizations, indent=2))

if __name__ == "__main__":
    main()

1.3 GoAccess配置优化

1.3.1 配置文件优化

#!/bin/bash
# goaccess_config_optimizer.sh - GoAccess配置优化脚本

CONFIG_FILE="/etc/goaccess/goaccess.conf"
BACKUP_FILE="/etc/goaccess/goaccess.conf.backup.$(date +%Y%m%d_%H%M%S)"

echo "=== GoAccess配置优化工具 ==="

# 备份原配置文件
backup_config() {
    if [ -f "$CONFIG_FILE" ]; then
        cp "$CONFIG_FILE" "$BACKUP_FILE"
        echo "配置文件已备份到: $BACKUP_FILE"
    fi
}

# 生成优化配置
generate_optimized_config() {
    local log_size=$1
    local memory_gb=$2
    
    cat > "$CONFIG_FILE" << EOF
# GoAccess优化配置文件
# 生成时间: $(date)
# 日志大小: ${log_size}MB
# 系统内存: ${memory_gb}GB

# 基础配置
time-format %H:%M:%S
date-format %d/%b/%Y
log-format COMBINED

# 性能优化配置
EOF

    # 根据日志大小和内存配置
    if [ "$log_size" -lt 100 ]; then
        # 小文件配置
        cat >> "$CONFIG_FILE" << EOF
# 小文件优化配置 (<100MB)
real-time-html true
keep-db-files false
load-from-disk false
html-prefs '{"theme":"bright","perPage":7,"layout":"horizontal","showTables":true}'
EOF
    elif [ "$log_size" -lt 1000 ]; then
        # 中等文件配置
        cat >> "$CONFIG_FILE" << EOF
# 中等文件优化配置 (100MB-1GB)
keep-db-files true
load-from-disk true
db-path /tmp/goaccess
html-prefs '{"theme":"dark","perPage":10,"layout":"vertical","showTables":true}'
EOF
    else
        # 大文件配置
        cat >> "$CONFIG_FILE" << EOF
# 大文件优化配置 (>1GB)
keep-db-files true
load-from-disk true
db-path /var/lib/goaccess
process-and-exit true
html-prefs '{"theme":"dark","perPage":15,"layout":"vertical","showTables":false}'
EOF
    fi

    # 通用优化配置
    cat >> "$CONFIG_FILE" << EOF

# 通用优化配置
no-progress true
no-color false
no-csv-summary false
no-json-summary false

# 排除配置
ignore-panel KEYPHRASES
ignore-panel GEOLOCATION
ignore-status 301
ignore-status 302
ignore-status 304

# 包含静态文件
static-file .css
static-file .js
static-file .jpg
static-file .jpeg
static-file .png
static-file .gif
static-file .ico
static-file .svg
static-file .woff
static-file .woff2
static-file .ttf
static-file .eot
static-file .pdf
static-file .zip
static-file .tar.gz

# 地理位置数据库
geoip-database /usr/share/GeoIP/GeoLite2-Country.mmdb

# 输出格式
output /var/www/html/goaccess.html
json-pretty-print true
html-custom-css /etc/goaccess/custom.css
html-custom-js /etc/goaccess/custom.js
EOF

    echo "优化配置已生成: $CONFIG_FILE"
}

# 创建自定义CSS
create_custom_css() {
    local css_file="/etc/goaccess/custom.css"
    
    cat > "$css_file" << EOF
/* GoAccess自定义样式 */
.container {
    max-width: 1200px;
    margin: 0 auto;
}

.panel {
    margin-bottom: 20px;
    border-radius: 5px;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}

.panel-heading {
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    color: white;
    padding: 15px;
    border-radius: 5px 5px 0 0;
}

.table {
    font-size: 12px;
}

.table th {
    background-color: #f8f9fa;
    font-weight: bold;
}

.table tr:hover {
    background-color: #f5f5f5;
}

/* 响应式设计 */
@media (max-width: 768px) {
    .container {
        padding: 10px;
    }
    
    .table {
        font-size: 10px;
    }
}
EOF

    echo "自定义CSS已创建: $css_file"
}

# 创建自定义JavaScript
create_custom_js() {
    local js_file="/etc/goaccess/custom.js"
    
    cat > "$js_file" << EOF
// GoAccess自定义JavaScript

// 添加刷新按钮
function addRefreshButton() {
    const header = document.querySelector('.container');
    if (header) {
        const refreshBtn = document.createElement('button');
        refreshBtn.innerHTML = '刷新数据';
        refreshBtn.className = 'btn btn-primary';
        refreshBtn.style.cssText = 'position: fixed; top: 10px; right: 10px; z-index: 1000;';
        refreshBtn.onclick = function() {
            location.reload();
        };
        header.appendChild(refreshBtn);
    }
}

// 添加搜索功能
function addSearchFunction() {
    const tables = document.querySelectorAll('.table');
    tables.forEach(table => {
        const searchInput = document.createElement('input');
        searchInput.type = 'text';
        searchInput.placeholder = '搜索...';
        searchInput.className = 'form-control';
        searchInput.style.cssText = 'margin-bottom: 10px; width: 200px;';
        
        searchInput.addEventListener('keyup', function() {
            const filter = this.value.toLowerCase();
            const rows = table.querySelectorAll('tbody tr');
            
            rows.forEach(row => {
                const text = row.textContent.toLowerCase();
                row.style.display = text.includes(filter) ? '' : 'none';
            });
        });
        
        table.parentNode.insertBefore(searchInput, table);
    });
}

// 页面加载完成后执行
document.addEventListener('DOMContentLoaded', function() {
    addRefreshButton();
    addSearchFunction();
    
    // 添加最后更新时间
    const updateTime = document.createElement('div');
    updateTime.innerHTML = '最后更新: ' + new Date().toLocaleString();
    updateTime.style.cssText = 'text-align: center; color: #666; margin: 20px 0;';
    document.body.appendChild(updateTime);
});
EOF

    echo "自定义JavaScript已创建: $js_file"
}

# 检测系统资源
detect_system_resources() {
    # 检测内存
    local memory_kb=$(grep MemTotal /proc/meminfo | awk '{print $2}')
    local memory_gb=$((memory_kb / 1024 / 1024))
    
    # 检测CPU核心数
    local cpu_cores=$(nproc)
    
    # 检测磁盘空间
    local disk_space=$(df -BG / | awk 'NR==2 {print $4}' | sed 's/G//')
    
    echo "系统资源检测结果:"
    echo "  内存: ${memory_gb}GB"
    echo "  CPU核心: ${cpu_cores}"
    echo "  可用磁盘空间: ${disk_space}GB"
    
    # 返回内存大小供其他函数使用
    echo "$memory_gb"
}

# 优化系统参数
optimize_system_params() {
    echo "优化系统参数..."
    
    # 创建优化脚本
    cat > /tmp/goaccess_system_optimize.sh << 'EOF'
#!/bin/bash
# 系统参数优化

# 优化文件描述符限制
echo "* soft nofile 65536" >> /etc/security/limits.conf
echo "* hard nofile 65536" >> /etc/security/limits.conf

# 优化内核参数
echo "vm.swappiness=10" >> /etc/sysctl.conf
echo "vm.dirty_ratio=5" >> /etc/sysctl.conf
echo "vm.dirty_background_ratio=2" >> /etc/sysctl.conf
echo "fs.file-max=2097152" >> /etc/sysctl.conf

# 应用参数
sysctl -p

echo "系统参数优化完成"
EOF

    chmod +x /tmp/goaccess_system_optimize.sh
    echo "系统优化脚本已创建: /tmp/goaccess_system_optimize.sh"
    echo "请以root权限运行该脚本以应用系统优化"
}

# 主函数
main() {
    case "$1" in
        "optimize")
            local log_size=${2:-500}  # 默认500MB
            
            echo "开始优化GoAccess配置..."
            
            # 备份配置
            backup_config
            
            # 检测系统资源
            local memory_gb=$(detect_system_resources)
            
            # 生成优化配置
            generate_optimized_config "$log_size" "$memory_gb"
            
            # 创建自定义文件
            create_custom_css
            create_custom_js
            
            # 优化系统参数
            optimize_system_params
            
            echo "GoAccess配置优化完成!"
            echo "配置文件: $CONFIG_FILE"
            echo "备份文件: $BACKUP_FILE"
            ;;
        "restore")
            if [ -n "$2" ] && [ -f "$2" ]; then
                cp "$2" "$CONFIG_FILE"
                echo "配置已恢复从: $2"
            else
                echo "请指定有效的备份文件路径"
            fi
            ;;
        "test")
            echo "测试当前配置..."
            goaccess --dcf
            ;;
        *)
            echo "用法: $0 {optimize|restore|test}"
            echo "  optimize [log_size_mb] - 优化配置(默认500MB日志)"
            echo "  restore <backup_file>  - 恢复配置"
            echo "  test                   - 测试当前配置"
            ;;
    esac
}

main "$@"

2. 故障排除指南

2.1 常见问题诊断

2.1.1 诊断工具

#!/usr/bin/env python3
# goaccess_diagnostics.py - GoAccess诊断工具

import subprocess
import json
import os
import sys
import re
import logging
from typing import Dict, List, Optional
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GoAccessDiagnostics:
    def __init__(self):
        self.issues = []
        self.warnings = []
        self.info = []
        
    def check_installation(self) -> Dict[str, any]:
        """检查GoAccess安装"""
        result = {
            'installed': False,
            'version': None,
            'path': None,
            'features': []
        }
        
        try:
            # 检查GoAccess是否安装
            which_result = subprocess.run(['which', 'goaccess'], 
                                        capture_output=True, text=True)
            if which_result.returncode == 0:
                result['installed'] = True
                result['path'] = which_result.stdout.strip()
                
                # 获取版本信息
                version_result = subprocess.run(['goaccess', '--version'], 
                                              capture_output=True, text=True)
                if version_result.returncode == 0:
                    version_output = version_result.stdout
                    version_match = re.search(r'GoAccess - (\d+\.\d+\.\d+)', version_output)
                    if version_match:
                        result['version'] = version_match.group(1)
                    
                    # 检查编译特性
                    if 'UTF-8' in version_output:
                        result['features'].append('UTF-8')
                    if 'GeoIP' in version_output:
                        result['features'].append('GeoIP')
                    if 'Tokyo Cabinet' in version_output:
                        result['features'].append('Tokyo Cabinet')
                    if 'SSL/TLS' in version_output:
                        result['features'].append('SSL/TLS')
                
                self.info.append(f"GoAccess已安装: {result['version']} at {result['path']}")
            else:
                self.issues.append("GoAccess未安装或不在PATH中")
                
        except Exception as e:
            self.issues.append(f"检查GoAccess安装时出错: {e}")
        
        return result
    
    def check_configuration(self) -> Dict[str, any]:
        """检查配置文件"""
        result = {
            'config_files': [],
            'valid_config': False,
            'config_errors': []
        }
        
        # 常见配置文件位置
        config_paths = [
            '/etc/goaccess/goaccess.conf',
            '/etc/goaccess.conf',
            '/usr/local/etc/goaccess.conf',
            os.path.expanduser('~/.goaccessrc')
        ]
        
        for config_path in config_paths:
            if os.path.exists(config_path):
                result['config_files'].append(config_path)
                self.info.append(f"找到配置文件: {config_path}")
        
        if not result['config_files']:
            self.warnings.append("未找到GoAccess配置文件")
        
        # 测试配置文件
        try:
            test_result = subprocess.run(['goaccess', '--dcf'], 
                                       capture_output=True, text=True)
            if test_result.returncode == 0:
                result['valid_config'] = True
                self.info.append("配置文件语法正确")
            else:
                result['config_errors'].append(test_result.stderr)
                self.issues.append(f"配置文件语法错误: {test_result.stderr}")
        except Exception as e:
            self.issues.append(f"测试配置文件时出错: {e}")
        
        return result
    
    def check_log_files(self, log_paths: List[str]) -> Dict[str, any]:
        """检查日志文件"""
        result = {
            'accessible_logs': [],
            'inaccessible_logs': [],
            'log_formats': {},
            'total_size_mb': 0
        }
        
        for log_path in log_paths:
            try:
                if os.path.exists(log_path):
                    if os.access(log_path, os.R_OK):
                        stat_info = os.stat(log_path)
                        size_mb = stat_info.st_size / (1024 * 1024)
                        result['accessible_logs'].append({
                            'path': log_path,
                            'size_mb': round(size_mb, 2),
                            'modified': datetime.fromtimestamp(stat_info.st_mtime).isoformat()
                        })
                        result['total_size_mb'] += size_mb
                        
                        # 检测日志格式
                        log_format = self.detect_log_format(log_path)
                        result['log_formats'][log_path] = log_format
                        
                        self.info.append(f"日志文件可访问: {log_path} ({size_mb:.1f}MB)")
                    else:
                        result['inaccessible_logs'].append(log_path)
                        self.issues.append(f"日志文件无读取权限: {log_path}")
                else:
                    result['inaccessible_logs'].append(log_path)
                    self.issues.append(f"日志文件不存在: {log_path}")
            except Exception as e:
                result['inaccessible_logs'].append(log_path)
                self.issues.append(f"检查日志文件 {log_path} 时出错: {e}")
        
        return result
    
    def detect_log_format(self, log_path: str) -> str:
        """检测日志格式"""
        try:
            with open(log_path, 'r') as f:
                # 读取前几行进行格式检测
                lines = [f.readline().strip() for _ in range(5) if f.readline()]
            
            if not lines:
                return 'empty'
            
            sample_line = lines[0]
            
            # 检测常见格式
            if '"' in sample_line and '[' in sample_line and ']' in sample_line:
                # 可能是Combined格式
                parts = sample_line.split('"')
                if len(parts) >= 3:
                    return 'COMBINED'
            
            if sample_line.count(' ') >= 6:
                # 可能是Common格式
                return 'COMMON'
            
            # 检查是否是JSON格式
            try:
                json.loads(sample_line)
                return 'JSON'
            except json.JSONDecodeError:
                pass
            
            return 'unknown'
            
        except Exception as e:
            logger.error(f"检测日志格式时出错: {e}")
            return 'error'
    
    def check_dependencies(self) -> Dict[str, any]:
        """检查依赖项"""
        result = {
            'geoip_database': False,
            'geoip_path': None,
            'system_libraries': {},
            'disk_space': {}
        }
        
        # 检查GeoIP数据库
        geoip_paths = [
            '/usr/share/GeoIP/GeoLite2-Country.mmdb',
            '/usr/local/share/GeoIP/GeoLite2-Country.mmdb',
            '/var/lib/GeoIP/GeoLite2-Country.mmdb'
        ]
        
        for geoip_path in geoip_paths:
            if os.path.exists(geoip_path):
                result['geoip_database'] = True
                result['geoip_path'] = geoip_path
                self.info.append(f"找到GeoIP数据库: {geoip_path}")
                break
        
        if not result['geoip_database']:
            self.warnings.append("未找到GeoIP数据库,地理位置功能将不可用")
        
        # 检查系统库
        libraries = ['libncurses', 'libgeoip', 'libssl']
        for lib in libraries:
            try:
                ldconfig_result = subprocess.run(['ldconfig', '-p'], 
                                                capture_output=True, text=True)
                if lib in ldconfig_result.stdout:
                    result['system_libraries'][lib] = True
                else:
                    result['system_libraries'][lib] = False
                    self.warnings.append(f"系统库 {lib} 可能未安装")
            except Exception:
                result['system_libraries'][lib] = 'unknown'
        
        # 检查磁盘空间
        try:
            import shutil
            
            paths_to_check = ['/tmp', '/var/log', '/var/www']
            for path in paths_to_check:
                if os.path.exists(path):
                    total, used, free = shutil.disk_usage(path)
                    result['disk_space'][path] = {
                        'total_gb': round(total / (1024**3), 2),
                        'free_gb': round(free / (1024**3), 2),
                        'used_percent': round((used / total) * 100, 1)
                    }
                    
                    if (free / total) < 0.1:  # 少于10%空闲空间
                        self.warnings.append(f"磁盘空间不足: {path} (仅剩{round(free / (1024**3), 1)}GB)")
        except Exception as e:
            self.warnings.append(f"检查磁盘空间时出错: {e}")
        
        return result
    
    def check_permissions(self) -> Dict[str, any]:
        """检查权限"""
        result = {
            'user': os.getenv('USER', 'unknown'),
            'uid': os.getuid(),
            'gid': os.getgid(),
            'writable_paths': [],
            'permission_issues': []
        }
        
        # 检查常见输出路径的写权限
        paths_to_check = [
            '/var/www/html',
            '/tmp',
            '/var/log',
            os.path.expanduser('~')
        ]
        
        for path in paths_to_check:
            if os.path.exists(path):
                if os.access(path, os.W_OK):
                    result['writable_paths'].append(path)
                    self.info.append(f"路径可写: {path}")
                else:
                    result['permission_issues'].append(path)
                    self.issues.append(f"路径无写权限: {path}")
        
        return result
    
    def test_basic_functionality(self, test_log_path: str = None) -> Dict[str, any]:
        """测试基本功能"""
        result = {
            'basic_parsing': False,
            'json_output': False,
            'html_output': False,
            'error_messages': []
        }
        
        if not test_log_path:
            # 创建测试日志
            test_log_path = '/tmp/goaccess_test.log'
            test_log_content = '''127.0.0.1 - - [25/Dec/2023:10:00:00 +0000] "GET / HTTP/1.1" 200 1234 "-" "Mozilla/5.0"
127.0.0.1 - - [25/Dec/2023:10:00:01 +0000] "GET /test HTTP/1.1" 404 567 "-" "Mozilla/5.0"
'''
            try:
                with open(test_log_path, 'w') as f:
                    f.write(test_log_content)
            except Exception as e:
                self.issues.append(f"创建测试日志失败: {e}")
                return result
        
        # 测试基本解析
        try:
            basic_result = subprocess.run([
                'goaccess', test_log_path,
                '--log-format=COMBINED',
                '--no-progress'
            ], capture_output=True, text=True, timeout=30)
            
            if basic_result.returncode == 0:
                result['basic_parsing'] = True
                self.info.append("基本解析功能正常")
            else:
                result['error_messages'].append(f"基本解析失败: {basic_result.stderr}")
                self.issues.append(f"基本解析失败: {basic_result.stderr}")
        except subprocess.TimeoutExpired:
            result['error_messages'].append("基本解析超时")
            self.issues.append("基本解析超时")
        except Exception as e:
            result['error_messages'].append(f"基本解析出错: {e}")
            self.issues.append(f"基本解析出错: {e}")
        
        # 测试JSON输出
        try:
            json_result = subprocess.run([
                'goaccess', test_log_path,
                '--log-format=COMBINED',
                '--json-pretty-print',
                '--no-progress'
            ], capture_output=True, text=True, timeout=30)
            
            if json_result.returncode == 0:
                # 验证JSON格式
                try:
                    json.loads(json_result.stdout)
                    result['json_output'] = True
                    self.info.append("JSON输出功能正常")
                except json.JSONDecodeError:
                    result['error_messages'].append("JSON输出格式无效")
                    self.issues.append("JSON输出格式无效")
            else:
                result['error_messages'].append(f"JSON输出失败: {json_result.stderr}")
                self.issues.append(f"JSON输出失败: {json_result.stderr}")
        except subprocess.TimeoutExpired:
            result['error_messages'].append("JSON输出超时")
            self.issues.append("JSON输出超时")
        except Exception as e:
            result['error_messages'].append(f"JSON输出出错: {e}")
            self.issues.append(f"JSON输出出错: {e}")
        
        # 测试HTML输出
        try:
            html_output_path = '/tmp/goaccess_test.html'
            html_result = subprocess.run([
                'goaccess', test_log_path,
                '--log-format=COMBINED',
                '--output', html_output_path,
                '--no-progress'
            ], capture_output=True, text=True, timeout=30)
            
            if html_result.returncode == 0 and os.path.exists(html_output_path):
                result['html_output'] = True
                self.info.append("HTML输出功能正常")
                # 清理测试文件
                try:
                    os.remove(html_output_path)
                except:
                    pass
            else:
                result['error_messages'].append(f"HTML输出失败: {html_result.stderr}")
                self.issues.append(f"HTML输出失败: {html_result.stderr}")
        except subprocess.TimeoutExpired:
            result['error_messages'].append("HTML输出超时")
            self.issues.append("HTML输出超时")
        except Exception as e:
            result['error_messages'].append(f"HTML输出出错: {e}")
            self.issues.append(f"HTML输出出错: {e}")
        
        # 清理测试日志
        if test_log_path == '/tmp/goaccess_test.log':
            try:
                os.remove(test_log_path)
            except:
                pass
        
        return result
    
    def generate_diagnostic_report(self) -> Dict[str, any]:
        """生成诊断报告"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'summary': {
                'total_issues': len(self.issues),
                'total_warnings': len(self.warnings),
                'total_info': len(self.info)
            },
            'issues': self.issues,
            'warnings': self.warnings,
            'info': self.info,
            'recommendations': self.generate_recommendations()
        }
        
        return report
    
    def generate_recommendations(self) -> List[str]:
        """生成修复建议"""
        recommendations = []
        
        if any('未安装' in issue for issue in self.issues):
            recommendations.append("安装GoAccess: sudo apt-get install goaccess 或从源码编译")
        
        if any('配置文件' in issue for issue in self.issues):
            recommendations.append("创建或修复GoAccess配置文件")
        
        if any('权限' in issue for issue in self.issues):
            recommendations.append("检查并修复文件权限问题")
        
        if any('日志文件' in issue for issue in self.issues):
            recommendations.append("确保日志文件存在且可读")
        
        if any('GeoIP' in warning for warning in self.warnings):
            recommendations.append("安装GeoIP数据库以启用地理位置功能")
        
        if any('磁盘空间' in warning for warning in self.warnings):
            recommendations.append("清理磁盘空间或扩容")
        
        if not recommendations:
            recommendations.append("系统状态良好,无需特别修复")
        
        return recommendations
    
    def run_full_diagnostics(self, log_paths: List[str] = None) -> Dict[str, any]:
        """运行完整诊断"""
        if log_paths is None:
            log_paths = ['/var/log/nginx/access.log', '/var/log/apache2/access.log']
        
        logger.info("开始GoAccess完整诊断...")
        
        diagnostics = {
            'installation': self.check_installation(),
            'configuration': self.check_configuration(),
            'log_files': self.check_log_files(log_paths),
            'dependencies': self.check_dependencies(),
            'permissions': self.check_permissions(),
            'functionality': self.test_basic_functionality()
        }
        
        # 生成诊断报告
        diagnostics['report'] = self.generate_diagnostic_report()
        
        return diagnostics

def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='GoAccess诊断工具')
    parser.add_argument('--log-files', nargs='+', 
                       help='要检查的日志文件路径')
    parser.add_argument('--output', '-o', 
                       help='诊断报告输出文件')
    parser.add_argument('--json', action='store_true',
                       help='以JSON格式输出')
    
    args = parser.parse_args()
    
    # 运行诊断
    diagnostics = GoAccessDiagnostics()
    results = diagnostics.run_full_diagnostics(args.log_files)
    
    # 输出结果
    if args.json:
        output = json.dumps(results, indent=2, ensure_ascii=False)
    else:
        # 格式化输出
        output = "=== GoAccess诊断报告 ===\n\n"
        
        # 摘要
        summary = results['report']['summary']
        output += f"问题数量: {summary['total_issues']}\n"
        output += f"警告数量: {summary['total_warnings']}\n"
        output += f"信息数量: {summary['total_info']}\n\n"
        
        # 问题列表
        if results['report']['issues']:
            output += "=== 发现的问题 ===\n"
            for i, issue in enumerate(results['report']['issues'], 1):
                output += f"{i}. {issue}\n"
            output += "\n"
        
        # 警告列表
        if results['report']['warnings']:
            output += "=== 警告信息 ===\n"
            for i, warning in enumerate(results['report']['warnings'], 1):
                output += f"{i}. {warning}\n"
            output += "\n"
        
        # 修复建议
        output += "=== 修复建议 ===\n"
        for i, rec in enumerate(results['report']['recommendations'], 1):
            output += f"{i}. {rec}\n"
    
    # 保存或打印结果
    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(output)
        print(f"诊断报告已保存到: {args.output}")
    else:
        print(output)

if __name__ == "__main__":
    main()

2.2 性能问题排查

2.2.1 性能监控脚本

”`bash #!/bin/bash

performance_troubleshoot.sh - 性能问题排查脚本

LOG_FILE=“/var/log/goaccess_performance.log” PID_FILE=“/var/run/goaccess_monitor.pid”

echo “=== GoAccess性能问题排查工具 ===”

日志记录函数

log_message() { echo “$(date ‘+%Y-%m-%d %H:%M:%S’) - $1” | tee -a “$LOG_FILE” }

检查GoAccess进程

check_goaccess_processes() { log_message “检查GoAccess进程…”

local processes=$(ps aux | grep goaccess | grep -v grep)

if [ -z "$processes" ]; then
    log_message "未发现运行中的GoAccess进程"
    return 1
fi

echo "$processes" | while read line; do
    local pid=$(echo "$line" | awk '{print $2}')
    local cpu=$(echo "$line" | awk '{print $3}')
    local mem=$(echo "$line" | awk '{print $4}')
    local cmd=$(echo "$line" | awk '{for(i=11;i<=NF;i++) printf "%s ", $i; print ""}')

    log_message "进程 $pid: CPU=${cpu}%, Memory=${mem}%, Command=$cmd"

    # 检查高CPU使用率
    if (( $(echo "$cpu > 80" | bc -l) )); then
        log_message "警告: 进程 $pid CPU使用率过高 ($cpu%)"

        # 获取进程详细信息
        if [ -d "/proc/$pid" ]; then
            local fd_count=$(ls /proc/$pid/fd 2>/dev/null | wc -l)
            local threads=$(cat /proc/$pid/status 2>/dev/null | grep Threads | awk '{print $2}')
            log_message "进程 $pid 详情: 文件描述符=$fd_count, 线程数=$threads"
        fi
    fi

    # 检查高内存使用率
    if (( $(echo "$mem > 10" | bc -l) )); then
        log_message "警告: 进程 $pid 内存使用率过高 ($mem%)"
    fi
done

}

检查系统资源

check_system_resources() { log_message “检查系统资源…”

# CPU使用率
local cpu_usage=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | sed 's/%us,//')
log_message "系统CPU使用率: $cpu_usage"

# 内存使用情况
local memory_info=$(free -h | grep Mem)
log_message "内存使用情况: $memory_info"

# 磁盘使用情况
log_message "磁盘使用情况:"
df -h | grep -E '^/dev/' | while read line; do
    log_message "  $line"
done

# 负载平均值
local load_avg=$(uptime | awk -F'load average:' '{print $2}')
log_message "系统负载: $load_avg"

# I/O等待
local io_wait=$(iostat -c 1 2 | tail -1 | awk '{print $4}')
log_message "I/O等待: ${io_wait}%"

}

检查日志文件状态

check_log_files() { log_message “检查日志文件状态…”

local log_paths=(
    "/var/log/nginx/access.log"
    "/var/log/apache2/access.log"
    "/var/log/httpd/access_log"
)

for log_path in "${log_paths[@]}"; do
    if [ -f "$log_path" ]; then
        local size=$(du -h "$log_path" | cut -f1)
        local lines=$(wc -l < "$log_path")
        local modified=$(stat -c %y "$log_path")

        log_message "日志文件: $log_path"
        log_message "  大小: $size"
        log_message "  行数: $lines"
        log_message "  修改时间: $modified"

        # 检查日志增长速度
        local current_size=$(stat -c%s "$log_path")
        local size_file="/tmp/goaccess_log_size_$(basename "$log_path")"

        if [ -f "$size_file" ]; then
            local previous_size=$(cat "$size_file")
            local growth=$((current_size - previous_size))
            local growth_mb=$((growth / 1024 / 1024))

            if [ $growth_mb -gt 0 ]; then
                log_message "  增长: ${growth_mb}MB (自上次检查)"
            fi
        fi

        echo "$current_size" > "$size_file"
    fi
done

}

检查网络连接

check_network_connections() { log_message “检查网络连接…”

# 检查监听端口
local listening_ports=$(netstat -tlnp 2>/dev/null | grep goaccess)
if [ -n "$listening_ports" ]; then
    log_message "GoAccess监听端口:"
    echo "$listening_ports" | while read line; do
        log_message "  $line"
    done
fi

# 检查活动连接数
local active_connections=$(netstat -an | grep :7890 | wc -l)
log_message "活动WebSocket连接数: $active_connections"

if [ $active_connections -gt 100 ]; then
    log_message "警告: WebSocket连接数过多 ($active_connections)"
fi

}

分析性能瓶颈

analyze_performance_bottlenecks() { log_message “分析性能瓶颈…”

# 检查I/O瓶颈
local io_stats=$(iostat -x 1 2 | tail -n +4)
log_message "磁盘I/O统计:"
echo "$io_stats" | while read line; do
    if [ -n "$line" ] && [[ ! "$line" =~ ^avg-cpu ]]; then
        log_message "  $line"
    fi
done

# 检查内存瓶颈
local swap_usage=$(free | grep Swap | awk '{print ($3/$2)*100}')
if (( $(echo "$swap_usage > 10" | bc -l) )); then
    log_message "警告: 交换分区使用率过高 (${swap_usage}%)"
fi

# 检查文件描述符使用
local fd_usage=$(lsof | wc -l)
local fd_limit=$(ulimit -n)
local fd_percent=$((fd_usage * 100 / fd_limit))

log_message "文件描述符使用: $fd_usage/$fd_limit ($fd_percent%)"

if [ $fd_percent -gt 80 ]; then
    log_message "警告: 文件描述符使用率过高"
fi

}

生成性能报告

generate_performance_report() { local report_file=“/tmp/goaccess_performancereport$(date +%Y%m%d_%H%M%S).txt”

log_message "生成性能报告: $report_file"

{
    echo "=== GoAccess性能报告 ==="
    echo "生成时间: $(date)"
    echo ""

    echo "=== 系统信息 ==="
    uname -a
    echo ""

    echo "=== CPU信息 ==="
    lscpu | grep -E '^CPU\(s\)|^Model name|^CPU MHz'
    echo ""

    echo "=== 内存信息 ==="
    free -h
    echo ""

    echo "=== 磁盘信息 ==="
    df -h
    echo ""

    echo "=== GoAccess进程信息 ==="
    ps aux | grep goaccess | grep -v grep
    echo ""

    echo "=== 最近的性能日志 ==="
    tail -50 "$LOG_FILE"

} > "$report_file"

log_message "性能报告已生成: $report_file"

}

自动修复常见问题

auto_fix_common_issues() { log_message “尝试自动修复常见问题…”

# 清理临时文件
local temp_files=$(find /tmp -name "goaccess*" -type f -mtime +1 2>/dev/null)
if [ -n "$temp_files" ]; then
    echo "$temp_files" | xargs rm -f
    log_message "清理了过期的临时文件"
fi

# 重启卡死的GoAccess进程
local stuck_processes=$(ps aux | grep goaccess | grep -v grep | awk '$3 > 90 {print $2}')
if [ -n "$stuck_processes" ]; then
    for pid in $stuck_processes; do
        log_message "终止卡死的进程: $pid"
        kill -TERM "$pid"
        sleep 5
        if kill -0 "$pid" 2>/dev/null; then
            kill -KILL "$pid"
            log_message "强制终止进程: $pid"
        fi
    done
fi

# 清理内存缓存
if [ -w /proc/sys/vm/drop_caches ]; then
    sync
    echo 1 > /proc/sys/vm/drop_caches
    log_message "清理了页面缓存"
fi

}

持续监控模式

continuous_monitoring() { local interval=${1:-60} # 默认60秒间隔

log_message "开始持续监控模式,间隔: ${interval}秒"

# 保存PID
echo $$ > "$PID_FILE"

# 信号处理
trap 'log_message "收到停止信号,退出监控"; rm -f "$PID_FILE"; exit 0' TERM INT

while true; do
    check_goaccess_processes
    check_system_resources
    check_log_files
    check_network_connections
    analyze_performance_bottlenecks

    # 检查是否需要自动修复
    local issues=$(grep -c "警告" "$LOG_FILE" | tail -1)
    if [ "$issues" -gt 5 ]; then
        auto_fix_common_issues
    fi

    sleep "$interval"
done

}

主函数

main() { case “$1” in “check”) check_goaccess_processes check_system_resources check_log_files check_network_connections analyze_performance_bottlenecks ;; “monitor”) continuous_monitoring “${2:-60}” ;; “report”) generate_performance_report ;; “fix”) auto_fix_common_issues ;; “stop”) if [ -f “$PID_FILE” ]; then local monitor_pid=$(cat “$PID_FILE”) kill “$monitor_pid” 2>/dev/null rm -f “$PID_FILE” log_message “监控已停止” else log_message “监控未运行” fi ;; *) echo “用法: $0 {check|monitor|report|fix|stop}” echo “ check - 执行一次性检查” echo “ monitor [间隔] - 持续监控模式” echo “ report - 生成性能报告” echo “ fix - 自动修复常见问题” echo “ stop - 停止监控” ;; esac }

main “$@”