1. 高级过滤功能

1.1 IP地址过滤

# 配置文件中设置
ignore-ip 127.0.0.1
ignore-ip ::1
ignore-ip 192.168.1.0/24
ignore-ip 10.0.0.0/8

# 命令行参数
goaccess /var/log/nginx/access.log \
  --ignore-ip=127.0.0.1 \
  --ignore-ip=192.168.1.0/24 \
  --log-format=COMBINED

1.2 状态码过滤

# 忽略特定状态码
ignore-status 200
ignore-status 304
ignore-status 404

# 命令行使用
goaccess /var/log/nginx/access.log \
  --ignore-status=200 \
  --ignore-status=304 \
  --log-format=COMBINED

1.3 面板过滤

# 隐藏不需要的统计面板
ignore-panel VISITORS
ignore-panel REQUESTS_STATIC
ignore-panel OS
ignore-panel BROWSERS

# 可用面板列表
# VISITORS - 访问者统计
# REQUESTS - 请求统计
# REQUESTS_STATIC - 静态请求
# NOT_FOUND - 404错误
# HOSTS - 主机统计
# OS - 操作系统
# BROWSERS - 浏览器
# VISIT_TIMES - 访问时间
# VIRTUAL_HOSTS - 虚拟主机
# REFERRERS - 引用页面
# REFERRING_SITES - 引用站点
# KEYPHRASES - 关键词
# STATUS_CODES - 状态码
# REMOTE_USER - 远程用户
# CACHE_STATUS - 缓存状态

1.4 高级过滤脚本

#!/usr/bin/env python3
# advanced_filter.py - GoAccess高级过滤工具

import re
import sys
import argparse
from datetime import datetime, timedelta
from ipaddress import ip_network, ip_address

class LogFilter:
    """Configurable filter for Apache/Nginx combined-format access logs.

    Filter rules are accumulated through the ``add_*``/``set_*`` methods
    and applied by :meth:`should_include_line`; :meth:`filter_log_file`
    runs the full pipeline over a log file.
    """

    # Combined log format: ip ident user [time] "request" status size "referer" "ua".
    # Compiled once at class level instead of on every parse_log_line() call.
    # Referer/user-agent use [^"]* so lines with empty quoted fields still parse.
    _LOG_RE = re.compile(
        r'^(\S+) \S+ \S+ \[([^\]]+)\] "([^"]+)" (\d+) (\S+) "([^"]*)" "([^"]*)"'
    )

    def __init__(self):
        # All filter state lives in one dict so callers (see main()) can
        # merge filter sets by extending these lists directly.
        self.filters = {
            'ip_whitelist': [],      # list[ip_network]: if non-empty, only these pass
            'ip_blacklist': [],      # list[ip_network]: always rejected
            'status_codes': [],      # list[str]: status codes to drop
            'user_agents': [],       # list[str]: regex patterns to drop
            'time_range': None,      # (start, end); stored but not enforced (see below)
            'request_patterns': [],  # list[str]: regex patterns to drop
            'min_response_size': None,
            'max_response_size': None
        }

    def add_ip_filter(self, ip_range, filter_type='blacklist'):
        """Register a single IP or CIDR range on the black- or whitelist."""
        try:
            network = ip_network(ip_range, strict=False)
        except (ValueError, TypeError) as e:
            # A bad rule is reported but does not abort filter construction.
            print(f"无效的IP范围 {ip_range}: {e}")
            return
        key = 'ip_blacklist' if filter_type == 'blacklist' else 'ip_whitelist'
        self.filters[key].append(network)

    def add_status_filter(self, status_codes):
        """Add one status code (str) or a list of codes to drop."""
        if isinstance(status_codes, str):
            status_codes = [status_codes]
        self.filters['status_codes'].extend(status_codes)

    def add_time_filter(self, start_time, end_time):
        """Record a (start, end) time window.

        NOTE(review): the range is stored but never checked by
        should_include_line() — confirm before relying on it.
        """
        self.filters['time_range'] = (start_time, end_time)

    def add_user_agent_filter(self, patterns):
        """Add one regex pattern (str) or a list of patterns matched
        case-insensitively against the User-Agent; matches are dropped."""
        if isinstance(patterns, str):
            patterns = [patterns]
        self.filters['user_agents'].extend(patterns)

    def add_request_pattern_filter(self, patterns):
        """Add one regex pattern (str) or a list of patterns matched
        case-insensitively against the request line; matches are dropped."""
        if isinstance(patterns, str):
            patterns = [patterns]
        self.filters['request_patterns'].extend(patterns)

    def set_response_size_filter(self, min_size=None, max_size=None):
        """Keep only entries whose response size is within [min_size, max_size].

        Either bound may be None (unbounded); a bound of 0 is honoured.
        """
        self.filters['min_response_size'] = min_size
        self.filters['max_response_size'] = max_size

    def parse_log_line(self, line):
        """Parse one combined-format line into a dict, or None if it doesn't match."""
        match = self._LOG_RE.match(line)
        if not match:
            return None
        keys = ('ip', 'timestamp', 'request', 'status', 'size',
                'referer', 'user_agent')
        return dict(zip(keys, match.groups()))

    def should_include_line(self, log_entry):
        """Return True if the parsed entry passes every configured filter."""
        if not log_entry:
            return False

        # --- IP black-/whitelist ---------------------------------------
        try:
            client_ip = ip_address(log_entry['ip'])
        except ValueError:
            client_ip = None  # unparseable IP: skip IP checks (same as before)

        if client_ip is not None:
            if any(client_ip in net for net in self.filters['ip_blacklist']):
                return False
            whitelist = self.filters['ip_whitelist']
            if whitelist and not any(client_ip in net for net in whitelist):
                return False

        # --- status codes ----------------------------------------------
        if self.filters['status_codes'] and \
                log_entry['status'] in self.filters['status_codes']:
            return False

        # --- User-Agent / request regex patterns -----------------------
        for pattern in self.filters['user_agents']:
            if re.search(pattern, log_entry['user_agent'], re.IGNORECASE):
                return False

        for pattern in self.filters['request_patterns']:
            if re.search(pattern, log_entry['request'], re.IGNORECASE):
                return False

        # --- response size ---------------------------------------------
        # BUGFIX: compare against `is not None` so a bound of 0 is honoured
        # (the old truthiness test silently ignored min/max size 0).
        try:
            size = int(log_entry['size']) if log_entry['size'] != '-' else 0
        except ValueError:
            return True  # malformed size: keep the line (same as before)
        min_size = self.filters['min_response_size']
        max_size = self.filters['max_response_size']
        if min_size is not None and size < min_size:
            return False
        if max_size is not None and size > max_size:
            return False

        return True

    def filter_log_file(self, input_file, output_file=None):
        """Filter *input_file* line by line.

        Kept lines are written to *output_file* (or printed to stdout when
        it is None).  Returns the list of kept lines, or [] on error.
        """
        filtered_lines = []
        total_lines = 0
        filtered_count = 0

        try:
            with open(input_file, 'r', encoding='utf-8', errors='ignore') as f:
                for line in f:
                    total_lines += 1
                    line = line.strip()
                    if self.should_include_line(self.parse_log_line(line)):
                        filtered_lines.append(line)
                    else:
                        filtered_count += 1

            if output_file:
                with open(output_file, 'w', encoding='utf-8') as f:
                    for line in filtered_lines:
                        f.write(line + '\n')
                print(f"过滤后的日志已保存到: {output_file}")
            else:
                for line in filtered_lines:
                    print(line)

            # BUGFIX: guard the percentage against an empty input file
            # (the old code raised ZeroDivisionError when total_lines == 0).
            rate = (filtered_count / total_lines * 100) if total_lines else 0.0
            print(f"\n过滤统计:")
            print(f"总行数: {total_lines}")
            print(f"过滤掉: {filtered_count}")
            print(f"保留: {len(filtered_lines)}")
            print(f"过滤率: {rate:.2f}%")

            return filtered_lines

        except Exception as e:
            print(f"处理日志文件时出错: {e}")
            return []

def create_bot_filter():
    """Build a LogFilter preloaded with common crawler User-Agent patterns."""
    # Generic keywords first, then well-known crawler identifiers.
    bot_patterns = [
        r'bot', r'crawler', r'spider', r'scraper',
        r'Googlebot', r'Bingbot', r'Slurp', r'DuckDuckBot',
        r'Baiduspider', r'YandexBot', r'facebookexternalhit',
        r'Twitterbot', r'LinkedInBot', r'WhatsApp', r'Telegram',
    ]

    bot_filter = LogFilter()
    bot_filter.add_user_agent_filter(bot_patterns)
    return bot_filter

def create_security_filter():
    """Build a LogFilter that drops common attack requests and error statuses."""
    security = LogFilter()

    # Request patterns that indicate probing / exploitation attempts.
    security.add_request_pattern_filter([
        r'\.\./\.\./',    # directory traversal
        r'<script',       # XSS
        r'union.*select', # SQL injection
        r'eval\(',        # code injection
        r'base64_decode', # obfuscated payloads
        r'system\(',      # command injection
        r'exec\(',        # command execution
        r'/etc/passwd',   # sensitive file access
        r'wp-admin',      # WordPress probing
        r'phpmyadmin',    # DB admin tool probing
    ])

    # Also drop the status codes such requests typically produce.
    security.add_status_filter(['400', '401', '403', '404', '500'])

    return security

def main():
    """CLI entry point: build a LogFilter from the arguments and run it."""
    parser = argparse.ArgumentParser(description='GoAccess高级日志过滤工具')
    parser.add_argument('input_file', help='输入日志文件')
    parser.add_argument('-o', '--output', help='输出文件路径')
    parser.add_argument('--filter-bots', action='store_true', help='过滤机器人')
    parser.add_argument('--filter-attacks', action='store_true', help='过滤攻击请求')
    parser.add_argument('--ignore-ip', action='append', help='忽略的IP地址或网段')
    parser.add_argument('--ignore-status', action='append', help='忽略的状态码')
    parser.add_argument('--min-size', type=int, help='最小响应大小')
    parser.add_argument('--max-size', type=int, help='最大响应大小')

    args = parser.parse_args()

    log_filter = LogFilter()

    # Merge the predefined filter sets by extending the underlying lists.
    if args.filter_bots:
        bot_filter = create_bot_filter()
        log_filter.filters['user_agents'].extend(bot_filter.filters['user_agents'])

    if args.filter_attacks:
        security_filter = create_security_filter()
        log_filter.filters['request_patterns'].extend(security_filter.filters['request_patterns'])
        log_filter.filters['status_codes'].extend(security_filter.filters['status_codes'])

    # Command-line rules.
    if args.ignore_ip:
        for ip in args.ignore_ip:
            log_filter.add_ip_filter(ip, 'blacklist')

    if args.ignore_status:
        log_filter.add_status_filter(args.ignore_status)

    # BUGFIX: test against None, not truthiness, so `--min-size 0` /
    # `--max-size 0` are not silently ignored.
    if args.min_size is not None or args.max_size is not None:
        log_filter.set_response_size_filter(args.min_size, args.max_size)

    log_filter.filter_log_file(args.input_file, args.output)

if __name__ == "__main__":
    main()

2. 地理位置分析

2.1 GeoIP配置

# 安装GeoIP数据库
sudo apt-get install geoip-database geoip-database-extra

# 或下载最新数据库(注意:MaxMind 已于 2019 年停止提供旧版 GeoLite 免费下载,
# 下面的 URL 仅作历史示例;新版 GeoLite2 数据库需注册免费许可证密钥后下载)
wget http://geolite.maxmind.com/download/geoip/database/GeoLiteCountry/GeoIP.dat.gz
gunzip GeoIP.dat.gz
sudo mv GeoIP.dat /usr/share/GeoIP/

# 配置GoAccess
geoip-database /usr/share/GeoIP/GeoIP.dat

2.2 GeoIP数据库更新脚本

#!/bin/bash
# update_geoip.sh - GeoIP database update script
#
# NOTE(review): the legacy geolite.maxmind.com download URLs were shut
# down in 2019; current GeoLite2 databases require a free MaxMind license
# key.  Update the URLs below before deploying this script.

GEOIP_DIR="/usr/share/GeoIP"
TEMP_DIR="/tmp/geoip_update"
LOG_FILE="/var/log/geoip_update.log"

# Abort early if the work directory cannot be created or entered.
mkdir -p "$TEMP_DIR" || exit 1
cd "$TEMP_DIR" || exit 1

echo "$(date): 开始更新GeoIP数据库" >> "$LOG_FILE"

# Download both databases.  BUGFIX: record each wget's result — the old
# `$?` test only reflected the exit status of the *second* download.
echo "下载GeoIP数据库..."
download_ok=1
wget -q http://geolite.maxmind.com/download/geoip/database/GeoLiteCountry/GeoIP.dat.gz || download_ok=0
wget -q http://geolite.maxmind.com/download/geoip/database/GeoLiteCity/GeoLiteCity.dat.gz || download_ok=0

if [ "$download_ok" -eq 1 ]; then
    echo "$(date): 下载成功" >> "$LOG_FILE"

    # Unpack the fresh databases.
    gunzip -f *.gz

    # Keep a backup of the current files before overwriting them.
    if [ -f "$GEOIP_DIR/GeoIP.dat" ]; then
        cp "$GEOIP_DIR/GeoIP.dat" "$GEOIP_DIR/GeoIP.dat.bak"
    fi

    if [ -f "$GEOIP_DIR/GeoLiteCity.dat" ]; then
        cp "$GEOIP_DIR/GeoLiteCity.dat" "$GEOIP_DIR/GeoLiteCity.dat.bak"
    fi

    # Install the new files and make them world-readable.
    sudo mv GeoIP.dat "$GEOIP_DIR/"
    sudo mv GeoLiteCity.dat "$GEOIP_DIR/"
    sudo chmod 644 "$GEOIP_DIR/GeoIP.dat"
    sudo chmod 644 "$GEOIP_DIR/GeoLiteCity.dat"

    echo "$(date): GeoIP数据库更新完成" >> "$LOG_FILE"

    # Restart GoAccess so it picks up the new databases (only if running).
    if systemctl is-active --quiet goaccess; then
        sudo systemctl restart goaccess
        echo "$(date): GoAccess服务已重启" >> "$LOG_FILE"
    fi
else
    echo "$(date): 下载失败" >> "$LOG_FILE"
fi

# Clean up the work directory.
rm -rf "$TEMP_DIR"

echo "$(date): GeoIP更新任务完成" >> "$LOG_FILE"

2.3 地理位置分析脚本

#!/usr/bin/env python3
# geo_analysis.py - 地理位置访问分析

import json
import sys
import subprocess
from collections import defaultdict, Counter
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime

class GeoAnalyzer:
    """Geographic traffic analysis driven by a GoAccess JSON report.

    Pipeline: run GoAccess over the access log, read its "hosts" panel,
    aggregate hits per country and city, then print rankings, render
    charts (matplotlib) and export the numbers to CSV (pandas).
    """

    def __init__(self, log_file):
        self.log_file = log_file          # path to the access log to analyze
        self.geo_data = defaultdict(int)  # kept for API compatibility; unused below
        self.country_stats = Counter()    # country name -> total hits
        self.city_stats = Counter()       # "city, country" -> total hits

    def generate_goaccess_report(self):
        """Run GoAccess and return the parsed JSON report, or None on failure.

        NOTE(review): depending on the installed GoAccess version, JSON
        output may also require an explicit output flag (e.g. ``-o -``) —
        confirm if stdout comes back empty.
        """
        cmd = [
            'goaccess', self.log_file,
            '--log-format=COMBINED',
            '--json-pretty-print'
        ]

        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                return json.loads(result.stdout)
            print(f"GoAccess执行失败: {result.stderr}")
            return None
        except Exception as e:
            print(f"执行GoAccess时出错: {e}")
            return None

    def analyze_geo_data(self, report_data):
        """Aggregate per-host hits from the report into country/city counters."""
        if not report_data or 'hosts' not in report_data:
            print("报告中没有主机数据")
            return

        for host in report_data['hosts']['data']:
            country = host.get('country', 'Unknown')
            city = host.get('city', 'Unknown')
            hits = host.get('hits', 0)

            self.country_stats[country] += hits
            # Only record cities GoAccess actually resolved.
            if city != 'Unknown':
                self.city_stats[f"{city}, {country}"] += hits

    def generate_country_report(self, top_n=20):
        """Print the top-N countries by hits with their share of traffic."""
        print(f"\n访问量前{top_n}的国家:")
        print("=" * 50)
        print(f"{'排名':<4} {'国家':<20} {'访问量':<10} {'百分比':<8}")
        print("-" * 50)

        total_hits = sum(self.country_stats.values())

        for i, (country, hits) in enumerate(self.country_stats.most_common(top_n), 1):
            percentage = (hits / total_hits) * 100 if total_hits > 0 else 0
            print(f"{i:<4} {country:<20} {hits:<10} {percentage:<7.2f}%")

    def generate_city_report(self, top_n=15):
        """Print the top-N cities by hits with their share of traffic."""
        print(f"\n访问量前{top_n}的城市:")
        print("=" * 60)
        print(f"{'排名':<4} {'城市':<30} {'访问量':<10} {'百分比':<8}")
        print("-" * 60)

        total_hits = sum(self.city_stats.values())

        for i, (city, hits) in enumerate(self.city_stats.most_common(top_n), 1):
            percentage = (hits / total_hits) * 100 if total_hits > 0 else 0
            print(f"{i:<4} {city:<30} {hits:<10} {percentage:<7.2f}%")

    def create_country_chart(self, output_file='country_stats.png', top_n=10):
        """Render a bar chart of the top-N countries to a PNG file.

        NOTE(review): matplotlib is imported at module top, so the
        ImportError handler below only fires if that import is made lazy.
        """
        try:
            top_countries = dict(self.country_stats.most_common(top_n))

            plt.figure(figsize=(12, 8))
            countries = list(top_countries.keys())
            hits = list(top_countries.values())

            bars = plt.bar(countries, hits, color='skyblue', edgecolor='navy', alpha=0.7)

            # Value labels slightly above each bar.
            for bar, hit in zip(bars, hits):
                plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(hits)*0.01,
                        str(hit), ha='center', va='bottom', fontweight='bold')

            plt.title(f'访问量前{top_n}的国家', fontsize=16, fontweight='bold')
            plt.xlabel('国家', fontsize=12)
            plt.ylabel('访问量', fontsize=12)
            plt.xticks(rotation=45, ha='right')
            plt.grid(axis='y', alpha=0.3)
            plt.tight_layout()

            plt.savefig(output_file, dpi=300, bbox_inches='tight')
            print(f"\n国家访问量图表已保存: {output_file}")

        except ImportError:
            print("需要安装matplotlib: pip install matplotlib")
        except Exception as e:
            print(f"创建图表时出错: {e}")

    def create_pie_chart(self, output_file='country_pie.png', top_n=8):
        """Render a pie chart of the country distribution to a PNG file.

        Countries beyond the top-N are folded into a single "其他" slice.
        """
        try:
            top_countries = dict(self.country_stats.most_common(top_n))

            # Everything outside the top-N becomes one aggregate slice.
            total_hits = sum(self.country_stats.values())
            other_hits = total_hits - sum(top_countries.values())
            if other_hits > 0:
                top_countries['其他'] = other_hits

            plt.figure(figsize=(10, 8))
            countries = list(top_countries.keys())
            hits = list(top_countries.values())

            colors = plt.cm.Set3(range(len(countries)))

            wedges, texts, autotexts = plt.pie(hits, labels=countries, autopct='%1.1f%%',
                                              colors=colors, startangle=90)

            # White bold percentage labels for readability.
            for autotext in autotexts:
                autotext.set_color('white')
                autotext.set_fontweight('bold')

            plt.title('访问量国家分布', fontsize=16, fontweight='bold')
            plt.axis('equal')

            plt.savefig(output_file, dpi=300, bbox_inches='tight')
            print(f"国家分布饼图已保存: {output_file}")

        except ImportError:
            print("需要安装matplotlib: pip install matplotlib")
        except Exception as e:
            print(f"创建饼图时出错: {e}")

    def export_to_csv(self, output_file='geo_stats.csv'):
        """Export country and city statistics to a CSV file via pandas."""
        try:
            rows = []

            # BUGFIX/perf: compute each total once instead of re-summing the
            # whole Counter for every row (was O(n^2) in the number of rows).
            country_total = sum(self.country_stats.values())
            for country, hits in self.country_stats.most_common():
                rows.append({
                    'type': 'country',
                    'location': country,
                    'hits': hits,
                    'percentage': (hits / country_total) * 100
                })

            city_total = sum(self.city_stats.values())
            for city, hits in self.city_stats.most_common():
                rows.append({
                    'type': 'city',
                    'location': city,
                    'hits': hits,
                    'percentage': (hits / city_total) * 100
                })

            df = pd.DataFrame(rows)
            df.to_csv(output_file, index=False, encoding='utf-8')
            print(f"\n地理位置统计已导出: {output_file}")

        except ImportError:
            print("需要安装pandas: pip install pandas")
        except Exception as e:
            print(f"导出CSV时出错: {e}")

    def run_analysis(self):
        """Run the full pipeline: report, aggregation, printing, charts, CSV."""
        print("开始地理位置分析...")
        print("=" * 50)

        report_data = self.generate_goaccess_report()
        if not report_data:
            return

        self.analyze_geo_data(report_data)

        if not self.country_stats:
            print("没有找到地理位置数据,请确保:")
            print("1. 已安装GeoIP数据库")
            print("2. GoAccess配置中启用了GeoIP")
            return

        self.generate_country_report()
        self.generate_city_report()

        self.create_country_chart()
        self.create_pie_chart()

        self.export_to_csv()

        print("\n地理位置分析完成!")

def main():
    """Command-line entry point: analyze the log file given as the sole argument."""
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print("用法: python3 geo_analysis.py <log_file>")
        sys.exit(1)

    GeoAnalyzer(cli_args[0]).run_analysis()

if __name__ == "__main__":
    main()

3. 自定义统计面板

3.1 创建自定义面板

# GoAccess 本身不支持完全自定义的统计面板,但可以通过 static-file 指令
# 把带特定扩展名的请求(例如 .api)归入静态请求(REQUESTS_STATIC)面板统计
static-file .api

# 自定义日志格式以包含更多信息
log-format %h %^[%d:%t %^] "%r" %s %b "%R" "%u" %T %D

3.2 API分析脚本

#!/usr/bin/env python3
# api_analysis.py - API端点分析工具

import re
import json
import sys
from collections import defaultdict, Counter
from urllib.parse import urlparse, parse_qs

class APIAnalyzer:
    """Per-endpoint statistics for API traffic in a combined-format access log."""

    # Combined log format with an optional trailing response-time field.
    # Compiled once at class level instead of on every parse_log_line() call.
    _LOG_RE = re.compile(
        r'^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) ([^"]+) HTTP/[^"]+" (\d+) (\S+) '
        r'"([^"]+)" "([^"]+)"(?:\s+(\S+))?'
    )

    # Path patterns that mark a request as "API traffic"; precompiled once
    # instead of being rebuilt (and recompiled) on every is_api_request() call.
    _API_PATTERNS = [re.compile(p, re.IGNORECASE) for p in (
        r'^/api/',
        r'^/v\d+/',
        r'\.json$',
        r'\.xml$',
        r'/rest/',
        r'/graphql',
    )]

    def __init__(self, log_file):
        self.log_file = log_file
        # endpoint -> aggregate stats (hit count, per-method/per-status
        # counters, raw response-time and response-size samples)
        self.api_stats = defaultdict(lambda: {
            'count': 0,
            'methods': Counter(),
            'status_codes': Counter(),
            'response_times': [],
            'response_sizes': []
        })
        self.error_patterns = Counter()  # "METHOD path -> status" -> occurrences

    def parse_log_line(self, line):
        """Parse one log line into a dict, or None if it doesn't match."""
        match = self._LOG_RE.match(line)
        if not match:
            return None
        keys = ('ip', 'timestamp', 'method', 'path', 'status', 'size',
                'referer', 'user_agent', 'response_time')
        # The optional response-time group is None when absent.
        return dict(zip(keys, match.groups()))

    def is_api_request(self, path):
        """Return True if the path looks like an API endpoint."""
        return any(p.search(path) for p in self._API_PATTERNS)

    def normalize_api_path(self, path):
        """Collapse volatile path segments (ids, UUIDs, ObjectIds) to placeholders."""
        # Strip the query string first.
        path = urlparse(path).path

        # Numeric ids -> {id}
        path = re.sub(r'/\d+(?=/|$)', '/{id}', path)

        # UUIDs -> {uuid}
        path = re.sub(
            r'/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}(?=/|$)',
            '/{uuid}', path, flags=re.IGNORECASE)

        # 24-hex MongoDB ObjectIds -> {objectid}
        path = re.sub(r'/[a-f0-9]{24}(?=/|$)', '/{objectid}', path)

        return path

    def analyze_logs(self):
        """Scan the log file and accumulate per-endpoint statistics."""
        print("分析API访问日志...")

        total_lines = 0
        api_requests = 0

        try:
            with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
                for line in f:
                    total_lines += 1
                    log_entry = self.parse_log_line(line.strip())
                    if not log_entry:
                        continue

                    path = log_entry['path']

                    # Only API traffic is of interest here.
                    if not self.is_api_request(path):
                        continue

                    api_requests += 1
                    normalized_path = self.normalize_api_path(path)

                    stats = self.api_stats[normalized_path]
                    stats['count'] += 1
                    stats['methods'][log_entry['method']] += 1
                    stats['status_codes'][log_entry['status']] += 1

                    # Response size ('-' means none was logged).
                    try:
                        size = int(log_entry['size']) if log_entry['size'] != '-' else 0
                        stats['response_sizes'].append(size)
                    except ValueError:
                        pass

                    # Optional trailing response-time field.
                    if log_entry['response_time']:
                        try:
                            stats['response_times'].append(float(log_entry['response_time']))
                        except ValueError:
                            pass

                    # Track 4xx/5xx combinations for the error report.
                    if log_entry['status'].startswith(('4', '5')):
                        self.error_patterns[
                            f"{log_entry['method']} {normalized_path} -> {log_entry['status']}"
                        ] += 1

        except Exception as e:
            print(f"分析日志时出错: {e}")
            return

        print(f"总日志行数: {total_lines}")
        print(f"API请求数: {api_requests}")
        print(f"唯一API端点: {len(self.api_stats)}")

    def generate_api_report(self, top_n=20):
        """Print the busiest endpoints with their dominant method and error rate."""
        print(f"\n访问量前{top_n}的API端点:")
        print("=" * 80)
        print(f"{'排名':<4} {'端点':<40} {'请求数':<8} {'主要方法':<8} {'错误率':<8}")
        print("-" * 80)

        # Sort by request count, busiest first.
        sorted_apis = sorted(self.api_stats.items(), key=lambda x: x[1]['count'], reverse=True)

        for i, (endpoint, stats) in enumerate(sorted_apis[:top_n], 1):
            main_method = stats['methods'].most_common(1)[0][0] if stats['methods'] else 'N/A'

            total_requests = stats['count']
            error_requests = sum(count for status, count in stats['status_codes'].items()
                               if status.startswith(('4', '5')))
            error_rate = (error_requests / total_requests) * 100 if total_requests > 0 else 0

            print(f"{i:<4} {endpoint:<40} {total_requests:<8} {main_method:<8} {error_rate:<7.1f}%")

    def generate_method_report(self):
        """Print the distribution of HTTP methods across all API endpoints."""
        method_stats = Counter()

        for stats in self.api_stats.values():
            for method, count in stats['methods'].items():
                method_stats[method] += count

        print("\nHTTP方法统计:")
        print("=" * 30)
        print(f"{'方法':<8} {'请求数':<10} {'百分比':<8}")
        print("-" * 30)

        total_requests = sum(method_stats.values())

        for method, count in method_stats.most_common():
            percentage = (count / total_requests) * 100 if total_requests > 0 else 0
            print(f"{method:<8} {count:<10} {percentage:<7.1f}%")

    def generate_error_report(self, top_n=15):
        """Print the most frequent method/endpoint/status error combinations."""
        print(f"\n错误频率前{top_n}的API:")
        print("=" * 70)
        print(f"{'排名':<4} {'错误模式':<50} {'次数':<8}")
        print("-" * 70)

        for i, (pattern, count) in enumerate(self.error_patterns.most_common(top_n), 1):
            print(f"{i:<4} {pattern:<50} {count:<8}")

    def generate_performance_report(self):
        """Print average response time/size per endpoint, slowest first."""
        print("\nAPI性能统计:")
        print("=" * 60)
        print(f"{'端点':<40} {'平均响应时间':<12} {'平均大小':<10}")
        print("-" * 60)

        performance_data = []

        for endpoint, stats in self.api_stats.items():
            times = stats['response_times']
            sizes = stats['response_sizes']
            avg_time = sum(times) / len(times) if times else 0
            avg_size = sum(sizes) / len(sizes) if sizes else 0
            performance_data.append((endpoint, avg_time, avg_size, stats['count']))

        # Slowest endpoints first.
        performance_data.sort(key=lambda x: x[1], reverse=True)

        for endpoint, avg_time, avg_size, count in performance_data[:15]:
            if avg_time > 0:  # only endpoints with response-time data
                print(f"{endpoint:<40} {avg_time:<11.3f}s {avg_size:<9.0f}B")

    def export_to_json(self, output_file='api_analysis.json'):
        """Dump the aggregated statistics to a JSON file."""
        # BUGFIX: this script never imported `datetime` at module level, so
        # the old code raised NameError on the timestamp line; import locally.
        from datetime import datetime

        export_data = {
            'timestamp': str(datetime.now()),
            'summary': {
                'total_endpoints': len(self.api_stats),
                'total_requests': sum(stats['count'] for stats in self.api_stats.values())
            },
            'endpoints': {}
        }

        for endpoint, stats in self.api_stats.items():
            times = stats['response_times']
            sizes = stats['response_sizes']
            export_data['endpoints'][endpoint] = {
                'count': stats['count'],
                'methods': dict(stats['methods']),
                'status_codes': dict(stats['status_codes']),
                'avg_response_time': sum(times) / len(times) if times else 0,
                'avg_response_size': sum(sizes) / len(sizes) if sizes else 0
            }

        export_data['errors'] = dict(self.error_patterns)

        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(export_data, f, indent=2, ensure_ascii=False)
            print(f"\n分析结果已导出: {output_file}")
        except Exception as e:
            print(f"导出JSON时出错: {e}")

    def run_analysis(self):
        """Run the full pipeline: scan, print all reports, export JSON."""
        self.analyze_logs()

        if not self.api_stats:
            print("没有找到API请求")
            return

        self.generate_api_report()
        self.generate_method_report()
        self.generate_error_report()
        self.generate_performance_report()
        self.export_to_json()

def main():
    """Command-line entry point: analyze the log file given as the sole argument."""
    if len(sys.argv) != 2:
        print("用法: python3 api_analysis.py <log_file>")
        sys.exit(1)

    APIAnalyzer(sys.argv[1]).run_analysis()

if __name__ == "__main__":
    main()

4. 性能优化技巧

4.1 大文件处理优化

# 使用内存映射
goaccess /var/log/nginx/access.log \
  --log-format=COMBINED \
  --enable-mmap \
  --no-progress

# 增加缓存大小
goaccess /var/log/nginx/access.log \
  --log-format=COMBINED \
  --cache-lcnum=2000000 \
  --cache-ncnum=131072

# 使用磁盘存储
goaccess /var/log/nginx/access.log \
  --log-format=COMBINED \
  --keep-db-files \
  --db-path=/tmp/goaccess

4.2 性能监控脚本

#!/usr/bin/env python3
# performance_monitor.py - GoAccess性能监控

import psutil
import time
import subprocess
import sys
from datetime import datetime

class GoAccessMonitor:
    """Spawn GoAccess in real-time HTML mode and sample its resource usage."""

    def __init__(self, log_file, output_file):
        self.log_file = log_file        # access log fed to GoAccess
        self.output_file = output_file  # HTML report GoAccess writes
        self.process = None             # subprocess.Popen handle once started
        self.stats = []                 # collected samples (dicts per interval)

    def start_goaccess(self):
        """Launch the GoAccess process; return True on success."""
        cmd = [
            'goaccess', self.log_file,
            '--log-format=COMBINED',
            '--real-time-html',
            '--port=7890',
            '--addr=0.0.0.0',
            '-o', self.output_file
        ]

        try:
            self.process = subprocess.Popen(cmd)
        except Exception as e:
            print(f"启动GoAccess失败: {e}")
            return False

        print(f"GoAccess进程已启动,PID: {self.process.pid}")
        return True

    def monitor_performance(self, duration=300, interval=5):
        """Sample CPU, memory and fd usage every *interval* seconds."""
        if not self.process:
            print("GoAccess进程未启动")
            return

        print(f"开始监控性能,持续{duration}秒,间隔{interval}秒")
        print("时间\t\tCPU%\t内存MB\t文件描述符")
        print("-" * 50)

        deadline = time.time() + duration

        try:
            proc = psutil.Process(self.process.pid)

            while time.time() < deadline:
                if not proc.is_running():
                    print("GoAccess进程已停止")
                    break

                # One sample per interval; num_fds is Unix-only.
                sample = {
                    'timestamp': datetime.now().strftime('%H:%M:%S'),
                    'cpu_percent': proc.cpu_percent(),
                    'memory_mb': proc.memory_info().rss / 1024 / 1024,
                    'num_fds': proc.num_fds() if hasattr(proc, 'num_fds') else 0,
                }

                print(f"{sample['timestamp']}\t{sample['cpu_percent']:.1f}\t"
                      f"{sample['memory_mb']:.1f}\t\t{sample['num_fds']}")

                self.stats.append(sample)
                time.sleep(interval)

        except psutil.NoSuchProcess:
            print("GoAccess进程不存在")
        except KeyboardInterrupt:
            print("\n监控被中断")

    def generate_performance_report(self):
        """Summarize the collected samples (averages and maxima)."""
        if not self.stats:
            print("没有性能数据")
            return

        cpu_values = [sample['cpu_percent'] for sample in self.stats]
        memory_values = [sample['memory_mb'] for sample in self.stats]

        print("\n性能统计报告:")
        print("=" * 30)
        print(f"平均CPU使用率: {sum(cpu_values) / len(cpu_values):.2f}%")
        print(f"最大CPU使用率: {max(cpu_values):.2f}%")
        print(f"平均内存使用: {sum(memory_values) / len(memory_values):.2f}MB")
        print(f"最大内存使用: {max(memory_values):.2f}MB")
        print(f"监控数据点: {len(self.stats)}个")

    def stop_goaccess(self):
        """Terminate the GoAccess process if it was started."""
        if not self.process:
            return
        self.process.terminate()
        self.process.wait()
        print("GoAccess进程已停止")

def main():
    """Command-line entry point: monitor GoAccess over the given log/output files."""
    if len(sys.argv) != 3:
        print("用法: python3 performance_monitor.py <log_file> <output_file>")
        sys.exit(1)

    monitor = GoAccessMonitor(sys.argv[1], sys.argv[2])

    try:
        started = monitor.start_goaccess()
        if started:
            time.sleep(2)  # give the process a moment to come up
            monitor.monitor_performance(duration=60, interval=2)
            monitor.generate_performance_report()
    finally:
        # Always tear the child process down, even on Ctrl-C.
        monitor.stop_goaccess()

if __name__ == "__main__":
    main()

5. 下一步

掌握了高级功能后,您可以:

  1. 学习实时监控和告警设置
  2. 了解与其他工具的集成方法
  3. 掌握自动化部署和管理
  4. 学习安全分析和威胁检测
  5. 探索自定义开发和扩展

下一章我们将介绍GoAccess的实时监控和告警功能。