1. 高级过滤功能
1.1 IP地址过滤
# 配置文件中设置
ignore-ip 127.0.0.1
ignore-ip ::1
ignore-ip 192.168.1.0/24
ignore-ip 10.0.0.0/8
# 命令行参数
goaccess /var/log/nginx/access.log \
--ignore-ip=127.0.0.1 \
--ignore-ip=192.168.1.0/24 \
--log-format=COMBINED
1.2 状态码过滤
# 忽略特定状态码
ignore-status 200
ignore-status 304
ignore-status 404
# 命令行使用
goaccess /var/log/nginx/access.log \
--ignore-status=200 \
--ignore-status=304 \
--log-format=COMBINED
1.3 面板过滤
# 隐藏不需要的统计面板
ignore-panel VISITORS
ignore-panel REQUESTS_STATIC
ignore-panel OS
ignore-panel BROWSERS
# 可用面板列表
# VISITORS - 访问者统计
# REQUESTS - 请求统计
# REQUESTS_STATIC - 静态请求
# NOT_FOUND - 404错误
# HOSTS - 主机统计
# OS - 操作系统
# BROWSERS - 浏览器
# VISIT_TIMES - 访问时间
# VIRTUAL_HOSTS - 虚拟主机
# REFERRERS - 引用页面
# REFERRING_SITES - 引用站点
# KEYPHRASES - 关键词
# STATUS_CODES - 状态码
# REMOTE_USER - 远程用户
# CACHE_STATUS - 缓存状态
1.4 高级过滤脚本
#!/usr/bin/env python3
# advanced_filter.py - GoAccess高级过滤工具
import re
import sys
import argparse
from datetime import datetime, timedelta
from ipaddress import ip_network, ip_address
class LogFilter:
    """Configurable filter for Apache/Nginx combined-format access logs.

    Rules are accumulated via the ``add_*``/``set_*`` methods and applied
    line-by-line through :meth:`filter_log_file`. An entry is dropped as soon
    as any rule rejects it.
    """

    def __init__(self):
        # All filter state lives in one dict so predefined filters
        # (bots/attacks) can be merged by extending these lists directly.
        self.filters = {
            'ip_whitelist': [],
            'ip_blacklist': [],
            'status_codes': [],
            'user_agents': [],
            'time_range': None,
            'request_patterns': [],
            'min_response_size': None,
            'max_response_size': None
        }

    def add_ip_filter(self, ip_range, filter_type='blacklist'):
        """Add an IP address or CIDR rule to the blacklist (default) or whitelist."""
        try:
            network = ip_network(ip_range, strict=False)
        except ValueError as e:
            # Invalid address/CIDR: report and ignore the rule.
            print(f"无效的IP范围 {ip_range}: {e}")
            return
        key = 'ip_blacklist' if filter_type == 'blacklist' else 'ip_whitelist'
        self.filters[key].append(network)

    def add_status_filter(self, status_codes):
        """Add one status code (str) or a list of codes to drop."""
        if isinstance(status_codes, str):
            status_codes = [status_codes]
        self.filters['status_codes'].extend(status_codes)

    def add_time_filter(self, start_time, end_time):
        """Record a (start, end) time range.

        NOTE(review): the range is stored but not enforced anywhere in
        should_include_line — confirm whether time filtering was intended.
        """
        self.filters['time_range'] = (start_time, end_time)

    def add_user_agent_filter(self, patterns):
        """Add regex pattern(s) matched case-insensitively against the User-Agent."""
        if isinstance(patterns, str):
            patterns = [patterns]
        self.filters['user_agents'].extend(patterns)

    def add_request_pattern_filter(self, patterns):
        """Add regex pattern(s) matched case-insensitively against the request line."""
        if isinstance(patterns, str):
            patterns = [patterns]
        self.filters['request_patterns'].extend(patterns)

    def set_response_size_filter(self, min_size=None, max_size=None):
        """Keep only entries whose response size lies within [min_size, max_size]."""
        self.filters['min_response_size'] = min_size
        self.filters['max_response_size'] = max_size

    def parse_log_line(self, line):
        """Parse one combined-format log line into a dict, or None on mismatch.

        The referer and user-agent fields use ``[^"]*`` (not ``+``) so that
        lines with empty ``""`` fields still parse instead of being dropped.
        """
        pattern = r'^(\S+) \S+ \S+ \[([^\]]+)\] "([^"]+)" (\d+) (\S+) "([^"]*)" "([^"]*)"'
        match = re.match(pattern, line)
        if not match:
            return None
        return {
            'ip': match.group(1),
            'timestamp': match.group(2),
            'request': match.group(3),
            'status': match.group(4),
            'size': match.group(5),
            'referer': match.group(6),
            'user_agent': match.group(7)
        }

    def should_include_line(self, log_entry):
        """Return True if the parsed entry survives every configured filter."""
        if not log_entry:
            return False
        # IP filtering: blacklist always wins; if a whitelist exists the
        # client must fall inside at least one whitelisted network.
        try:
            client_ip = ip_address(log_entry['ip'])
            for network in self.filters['ip_blacklist']:
                if client_ip in network:
                    return False
            if self.filters['ip_whitelist']:
                if not any(client_ip in network
                           for network in self.filters['ip_whitelist']):
                    return False
        except ValueError:
            pass  # unparsable client IP: fall through to the remaining filters
        # Status-code filtering (codes are compared as strings).
        if self.filters['status_codes']:
            if log_entry['status'] in self.filters['status_codes']:
                return False
        # User-Agent pattern filtering.
        for pattern in self.filters['user_agents']:
            if re.search(pattern, log_entry['user_agent'], re.IGNORECASE):
                return False
        # Request-line pattern filtering.
        for pattern in self.filters['request_patterns']:
            if re.search(pattern, log_entry['request'], re.IGNORECASE):
                return False
        # Response-size bounds: use "is not None" so a legitimate bound of 0
        # is not silently ignored by truthiness.
        try:
            size = int(log_entry['size']) if log_entry['size'] != '-' else 0
            min_size = self.filters['min_response_size']
            max_size = self.filters['max_response_size']
            if min_size is not None and size < min_size:
                return False
            if max_size is not None and size > max_size:
                return False
        except ValueError:
            pass
        return True

    def filter_log_file(self, input_file, output_file=None):
        """Filter *input_file*; write survivors to *output_file* or stdout.

        Returns the list of kept lines (empty list on I/O error).
        """
        filtered_lines = []
        total_lines = 0
        filtered_count = 0
        try:
            with open(input_file, 'r', encoding='utf-8', errors='ignore') as f:
                for line in f:
                    total_lines += 1
                    line = line.strip()
                    log_entry = self.parse_log_line(line)
                    if self.should_include_line(log_entry):
                        filtered_lines.append(line)
                    else:
                        filtered_count += 1
            if output_file:
                with open(output_file, 'w', encoding='utf-8') as f:
                    for line in filtered_lines:
                        f.write(line + '\n')
                print(f"过滤后的日志已保存到: {output_file}")
            else:
                for line in filtered_lines:
                    print(line)
            print(f"\n过滤统计:")
            print(f"总行数: {total_lines}")
            print(f"过滤掉: {filtered_count}")
            print(f"保留: {len(filtered_lines)}")
            # Guard against an empty input file (total_lines == 0 would
            # raise ZeroDivisionError in the original).
            rate = (filtered_count / total_lines * 100) if total_lines else 0.0
            print(f"过滤率: {rate:.2f}%")
            return filtered_lines
        except Exception as e:
            print(f"处理日志文件时出错: {e}")
            return []
def create_bot_filter():
    """Return a LogFilter preloaded with common crawler User-Agent patterns."""
    # Generic keywords plus the well-known crawler/social-preview agents.
    bot_patterns = [
        r'bot',
        r'crawler',
        r'spider',
        r'scraper',
        r'Googlebot',
        r'Bingbot',
        r'Slurp',
        r'DuckDuckBot',
        r'Baiduspider',
        r'YandexBot',
        r'facebookexternalhit',
        r'Twitterbot',
        r'LinkedInBot',
        r'WhatsApp',
        r'Telegram'
    ]
    bot_filter = LogFilter()
    bot_filter.add_user_agent_filter(bot_patterns)
    return bot_filter
def create_security_filter():
    """Return a LogFilter preloaded with common attack signatures."""
    # Request-line signatures: directory traversal, XSS, SQL/command
    # injection, obfuscated payloads, sensitive-file probes, CMS/db-admin scans.
    attack_patterns = [
        r'\.\./\.\./',
        r'<script',
        r'union.*select',
        r'eval\(',
        r'base64_decode',
        r'system\(',
        r'exec\(',
        r'/etc/passwd',
        r'wp-admin',
        r'phpmyadmin',
    ]
    security_filter = LogFilter()
    security_filter.add_request_pattern_filter(attack_patterns)
    # Status codes that commonly accompany scanning/attack traffic.
    security_filter.add_status_filter(['400', '401', '403', '404', '500'])
    return security_filter
def main():
    """CLI entry point: build a LogFilter from the arguments and run it."""
    parser = argparse.ArgumentParser(description='GoAccess高级日志过滤工具')
    parser.add_argument('input_file', help='输入日志文件')
    parser.add_argument('-o', '--output', help='输出文件路径')
    parser.add_argument('--filter-bots', action='store_true', help='过滤机器人')
    parser.add_argument('--filter-attacks', action='store_true', help='过滤攻击请求')
    parser.add_argument('--ignore-ip', action='append', help='忽略的IP地址或网段')
    parser.add_argument('--ignore-status', action='append', help='忽略的状态码')
    parser.add_argument('--min-size', type=int, help='最小响应大小')
    parser.add_argument('--max-size', type=int, help='最大响应大小')
    args = parser.parse_args()

    log_filter = LogFilter()

    # Merge the rule lists from the predefined filters into this one.
    if args.filter_bots:
        bot_filter = create_bot_filter()
        log_filter.filters['user_agents'].extend(bot_filter.filters['user_agents'])
    if args.filter_attacks:
        security_filter = create_security_filter()
        log_filter.filters['request_patterns'].extend(security_filter.filters['request_patterns'])
        log_filter.filters['status_codes'].extend(security_filter.filters['status_codes'])

    # Apply ad-hoc command-line rules.
    if args.ignore_ip:
        for ip in args.ignore_ip:
            log_filter.add_ip_filter(ip, 'blacklist')
    if args.ignore_status:
        log_filter.add_status_filter(args.ignore_status)
    # "is not None" so an explicit bound of 0 is honored (the original
    # truthiness test silently dropped --min-size=0 / --max-size=0).
    if args.min_size is not None or args.max_size is not None:
        log_filter.set_response_size_filter(args.min_size, args.max_size)

    log_filter.filter_log_file(args.input_file, args.output)


if __name__ == "__main__":
    main()
2. 地理位置分析
2.1 GeoIP配置
# 安装GeoIP数据库
sudo apt-get install geoip-database geoip-database-extra
# 或下载最新数据库
# (注意:MaxMind 已于 2019 年停止提供旧版 GeoLite 数据库,以下链接已失效;
# 新版 GeoLite2 需要注册免费账号并使用许可密钥下载)
wget http://geolite.maxmind.com/download/geoip/database/GeoLiteCountry/GeoIP.dat.gz
gunzip GeoIP.dat.gz
sudo mv GeoIP.dat /usr/share/GeoIP/
# 配置GoAccess
geoip-database /usr/share/GeoIP/GeoIP.dat
2.2 GeoIP数据库更新脚本
#!/bin/bash
# update_geoip.sh - GeoIP database update script.
# Downloads the country/city databases, backs up the current files,
# installs the new ones and restarts GoAccess if it is running.
# NOTE(review): the legacy geolite.maxmind.com URLs were discontinued in
# 2019; GeoLite2 downloads now require a (free) MaxMind license key.

GEOIP_DIR="/usr/share/GeoIP"
TEMP_DIR="/tmp/geoip_update"
LOG_FILE="/var/log/geoip_update.log"

# Work inside a scratch directory; abort if it cannot be entered.
mkdir -p "$TEMP_DIR"
cd "$TEMP_DIR" || exit 1

echo "$(date): 开始更新GeoIP数据库" >> "$LOG_FILE"

# Download both databases and track each exit status separately
# (the original single `$?` check only reflected the second wget).
echo "下载GeoIP数据库..."
wget -q http://geolite.maxmind.com/download/geoip/database/GeoLiteCountry/GeoIP.dat.gz
status_country=$?
wget -q http://geolite.maxmind.com/download/geoip/database/GeoLiteCity/GeoLiteCity.dat.gz
status_city=$?

if [ "$status_country" -eq 0 ] && [ "$status_city" -eq 0 ]; then
    echo "$(date): 下载成功" >> "$LOG_FILE"
    gunzip -f *.gz
    # Back up the databases currently installed before overwriting them.
    if [ -f "$GEOIP_DIR/GeoIP.dat" ]; then
        cp "$GEOIP_DIR/GeoIP.dat" "$GEOIP_DIR/GeoIP.dat.bak"
    fi
    if [ -f "$GEOIP_DIR/GeoLiteCity.dat" ]; then
        cp "$GEOIP_DIR/GeoLiteCity.dat" "$GEOIP_DIR/GeoLiteCity.dat.bak"
    fi
    # Install the new files and make them world-readable.
    sudo mv GeoIP.dat "$GEOIP_DIR/"
    sudo mv GeoLiteCity.dat "$GEOIP_DIR/"
    sudo chmod 644 "$GEOIP_DIR/GeoIP.dat"
    sudo chmod 644 "$GEOIP_DIR/GeoLiteCity.dat"
    echo "$(date): GeoIP数据库更新完成" >> "$LOG_FILE"
    # Restart GoAccess so it picks up the new databases (only if running).
    if systemctl is-active --quiet goaccess; then
        sudo systemctl restart goaccess
        echo "$(date): GoAccess服务已重启" >> "$LOG_FILE"
    fi
else
    echo "$(date): 下载失败" >> "$LOG_FILE"
fi

# Remove the scratch directory.
rm -rf "$TEMP_DIR"
echo "$(date): GeoIP更新任务完成" >> "$LOG_FILE"
2.3 地理位置分析脚本
#!/usr/bin/env python3
# geo_analysis.py - 地理位置访问分析
import json
import sys
import subprocess
from collections import defaultdict, Counter
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime
class GeoAnalyzer:
    """Geographic analysis of an access log via a GoAccess JSON report.

    matplotlib and pandas are imported lazily inside the chart/export
    methods so the text reports work without them installed; with the
    module-level imports the ``except ImportError`` handlers below could
    never fire because the script would already have died at import time.
    """

    def __init__(self, log_file):
        self.log_file = log_file          # path of the access log to analyze
        self.geo_data = defaultdict(int)  # kept for interface compatibility (unused here)
        self.country_stats = Counter()    # country -> total hits
        self.city_stats = Counter()       # "city, country" -> total hits

    def generate_goaccess_report(self):
        """Run GoAccess on the log and return its parsed JSON report (or None)."""
        cmd = [
            'goaccess', self.log_file,
            '--log-format=COMBINED',
            '--json-pretty-print'
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                return json.loads(result.stdout)
            else:
                print(f"GoAccess执行失败: {result.stderr}")
                return None
        except Exception as e:
            print(f"执行GoAccess时出错: {e}")
            return None

    def analyze_geo_data(self, report_data):
        """Accumulate country/city hit counters from the report's hosts panel.

        NOTE(review): assumes each host entry carries flat 'country'/'city'/
        'hits' keys; newer GoAccess versions may nest these — verify against
        the actual report schema.
        """
        if not report_data or 'hosts' not in report_data:
            print("报告中没有主机数据")
            return
        # .get guards against a hosts panel with no 'data' key.
        hosts_data = report_data['hosts'].get('data', [])
        for host in hosts_data:
            country = host.get('country', 'Unknown')
            city = host.get('city', 'Unknown')
            hits = host.get('hits', 0)
            self.country_stats[country] += hits
            if city != 'Unknown':
                self.city_stats[f"{city}, {country}"] += hits

    def generate_country_report(self, top_n=20):
        """Print a ranked table of the top-N countries by hits."""
        print(f"\n访问量前{top_n}的国家:")
        print("=" * 50)
        print(f"{'排名':<4} {'国家':<20} {'访问量':<10} {'百分比':<8}")
        print("-" * 50)
        total_hits = sum(self.country_stats.values())
        for i, (country, hits) in enumerate(self.country_stats.most_common(top_n), 1):
            percentage = (hits / total_hits) * 100 if total_hits > 0 else 0
            print(f"{i:<4} {country:<20} {hits:<10} {percentage:<7.2f}%")

    def generate_city_report(self, top_n=15):
        """Print a ranked table of the top-N cities by hits."""
        print(f"\n访问量前{top_n}的城市:")
        print("=" * 60)
        print(f"{'排名':<4} {'城市':<30} {'访问量':<10} {'百分比':<8}")
        print("-" * 60)
        total_hits = sum(self.city_stats.values())
        for i, (city, hits) in enumerate(self.city_stats.most_common(top_n), 1):
            percentage = (hits / total_hits) * 100 if total_hits > 0 else 0
            print(f"{i:<4} {city:<30} {hits:<10} {percentage:<7.2f}%")

    def create_country_chart(self, output_file='country_stats.png', top_n=10):
        """Save a bar chart of the top-N countries to *output_file*."""
        try:
            import matplotlib.pyplot as plt  # lazy: charting is optional
            top_countries = dict(self.country_stats.most_common(top_n))
            plt.figure(figsize=(12, 8))
            countries = list(top_countries.keys())
            hits = list(top_countries.values())
            bars = plt.bar(countries, hits, color='skyblue', edgecolor='navy', alpha=0.7)
            # Annotate each bar with its hit count, slightly above the bar.
            for bar, hit in zip(bars, hits):
                plt.text(bar.get_x() + bar.get_width() / 2,
                         bar.get_height() + max(hits) * 0.01,
                         str(hit), ha='center', va='bottom', fontweight='bold')
            plt.title(f'访问量前{top_n}的国家', fontsize=16, fontweight='bold')
            plt.xlabel('国家', fontsize=12)
            plt.ylabel('访问量', fontsize=12)
            plt.xticks(rotation=45, ha='right')
            plt.grid(axis='y', alpha=0.3)
            plt.tight_layout()
            plt.savefig(output_file, dpi=300, bbox_inches='tight')
            print(f"\n国家访问量图表已保存: {output_file}")
        except ImportError:
            print("需要安装matplotlib: pip install matplotlib")
        except Exception as e:
            print(f"创建图表时出错: {e}")

    def create_pie_chart(self, output_file='country_pie.png', top_n=8):
        """Save a pie chart of the country distribution to *output_file*."""
        try:
            import matplotlib.pyplot as plt  # lazy: charting is optional
            top_countries = dict(self.country_stats.most_common(top_n))
            # Everything outside the top N is aggregated into one slice.
            total_hits = sum(self.country_stats.values())
            other_hits = total_hits - sum(top_countries.values())
            if other_hits > 0:
                top_countries['其他'] = other_hits
            plt.figure(figsize=(10, 8))
            countries = list(top_countries.keys())
            hits = list(top_countries.values())
            colors = plt.cm.Set3(range(len(countries)))
            wedges, texts, autotexts = plt.pie(hits, labels=countries, autopct='%1.1f%%',
                                               colors=colors, startangle=90)
            # White bold percentage labels for readability on colored slices.
            for autotext in autotexts:
                autotext.set_color('white')
                autotext.set_fontweight('bold')
            plt.title('访问量国家分布', fontsize=16, fontweight='bold')
            plt.axis('equal')
            plt.savefig(output_file, dpi=300, bbox_inches='tight')
            print(f"国家分布饼图已保存: {output_file}")
        except ImportError:
            print("需要安装matplotlib: pip install matplotlib")
        except Exception as e:
            print(f"创建饼图时出错: {e}")

    def export_to_csv(self, output_file='geo_stats.csv'):
        """Export country and city statistics to a CSV file."""
        try:
            import pandas as pd  # lazy: CSV export via pandas is optional
            data = []
            # Hoist the totals: the original recomputed sum() on every row.
            country_total = sum(self.country_stats.values())
            for country, hits in self.country_stats.most_common():
                data.append({
                    'type': 'country',
                    'location': country,
                    'hits': hits,
                    'percentage': (hits / country_total) * 100
                })
            city_total = sum(self.city_stats.values())
            for city, hits in self.city_stats.most_common():
                data.append({
                    'type': 'city',
                    'location': city,
                    'hits': hits,
                    'percentage': (hits / city_total) * 100
                })
            df = pd.DataFrame(data)
            df.to_csv(output_file, index=False, encoding='utf-8')
            print(f"\n地理位置统计已导出: {output_file}")
        except ImportError:
            print("需要安装pandas: pip install pandas")
        except Exception as e:
            print(f"导出CSV时出错: {e}")

    def run_analysis(self):
        """Run the full pipeline: report, analysis, tables, charts, CSV."""
        print("开始地理位置分析...")
        print("=" * 50)
        report_data = self.generate_goaccess_report()
        if not report_data:
            return
        self.analyze_geo_data(report_data)
        if not self.country_stats:
            print("没有找到地理位置数据,请确保:")
            print("1. 已安装GeoIP数据库")
            print("2. GoAccess配置中启用了GeoIP")
            return
        self.generate_country_report()
        self.generate_city_report()
        self.create_country_chart()
        self.create_pie_chart()
        self.export_to_csv()
        print("\n地理位置分析完成!")
def main():
    """CLI entry point: run the geo analysis on the log file given as argv[1]."""
    if len(sys.argv) != 2:
        print("用法: python3 geo_analysis.py <log_file>")
        sys.exit(1)
    GeoAnalyzer(sys.argv[1]).run_analysis()


if __name__ == "__main__":
    main()
3. 自定义统计面板
3.1 创建自定义面板
# 注意:GoAccess 并不支持真正意义上的"自定义面板"。
# static-file 指令只是把指定后缀的请求归入"静态请求"面板,
# 借此可以把 API 等特定请求单独归类统计,例如:
static-file .api
# 自定义日志格式以包含更多信息
log-format %h %^[%d:%t %^] "%r" %s %b "%R" "%u" %T %D
3.2 API分析脚本
#!/usr/bin/env python3
# api_analysis.py - API端点分析工具
import re
import json
import sys
from collections import defaultdict, Counter
from urllib.parse import urlparse, parse_qs
class APIAnalyzer:
    """Per-endpoint analysis of API traffic in a combined-format access log.

    Paths are normalized (numeric IDs, UUIDs and ObjectIds collapsed to
    placeholders) so that all requests against the same endpoint aggregate
    into one bucket.
    """

    def __init__(self, log_file):
        self.log_file = log_file
        # One stats record per normalized endpoint path.
        self.api_stats = defaultdict(lambda: {
            'count': 0,
            'methods': Counter(),
            'status_codes': Counter(),
            'response_times': [],
            'response_sizes': []
        })
        # "METHOD path -> status" strings for 4xx/5xx responses.
        self.error_patterns = Counter()

    def parse_log_line(self, line):
        """Parse one log line into a dict, or None on mismatch.

        Handles an optional trailing response-time field (e.g. Nginx
        ``$request_time`` appended after the user agent). Referer and
        user-agent use ``[^"]*`` so empty ``""`` fields still parse.
        """
        pattern = r'^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) ([^"]+) HTTP/[^"]+" (\d+) (\S+) "([^"]*)" "([^"]*)"(?:\s+(\S+))?'
        match = re.match(pattern, line)
        if not match:
            return None
        return {
            'ip': match.group(1),
            'timestamp': match.group(2),
            'method': match.group(3),
            'path': match.group(4),
            'status': match.group(5),
            'size': match.group(6),
            'referer': match.group(7),
            'user_agent': match.group(8),
            'response_time': match.group(9) if match.group(9) else None
        }

    def is_api_request(self, path):
        """Return True if *path* looks like an API endpoint."""
        api_patterns = [
            r'^/api/',
            r'^/v\d+/',
            r'\.json$',
            r'\.xml$',
            r'/rest/',
            r'/graphql'
        ]
        return any(re.search(pattern, path, re.IGNORECASE)
                   for pattern in api_patterns)

    def normalize_api_path(self, path):
        """Collapse variable path segments so equal endpoints aggregate."""
        # Drop the query string.
        parsed = urlparse(path)
        path = parsed.path
        # Whole-segment numeric IDs -> {id}.
        path = re.sub(r'/\d+(?=/|$)', '/{id}', path)
        # UUID segments -> {uuid}.
        path = re.sub(r'/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}(?=/|$)',
                      '/{uuid}', path, flags=re.IGNORECASE)
        # 24-hex-char MongoDB ObjectId segments -> {objectid}.
        path = re.sub(r'/[a-f0-9]{24}(?=/|$)', '/{objectid}', path)
        return path

    def analyze_logs(self):
        """Scan the log file and populate per-endpoint statistics."""
        print("分析API访问日志...")
        total_lines = 0
        api_requests = 0
        try:
            with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
                for line in f:
                    total_lines += 1
                    line = line.strip()
                    log_entry = self.parse_log_line(line)
                    if not log_entry:
                        continue
                    path = log_entry['path']
                    # Only API-looking paths are counted.
                    if not self.is_api_request(path):
                        continue
                    api_requests += 1
                    normalized_path = self.normalize_api_path(path)
                    stats = self.api_stats[normalized_path]
                    stats['count'] += 1
                    stats['methods'][log_entry['method']] += 1
                    stats['status_codes'][log_entry['status']] += 1
                    # Response size: "-" means no body was sent, count as 0.
                    try:
                        size = int(log_entry['size']) if log_entry['size'] != '-' else 0
                        stats['response_sizes'].append(size)
                    except ValueError:
                        pass
                    # Response time is optional (depends on the log format).
                    if log_entry['response_time']:
                        try:
                            stats['response_times'].append(float(log_entry['response_time']))
                        except ValueError:
                            pass
                    # Track 4xx/5xx combinations for the error report.
                    if log_entry['status'].startswith(('4', '5')):
                        self.error_patterns[
                            f"{log_entry['method']} {normalized_path} -> {log_entry['status']}"
                        ] += 1
        except Exception as e:
            print(f"分析日志时出错: {e}")
            return
        print(f"总日志行数: {total_lines}")
        print(f"API请求数: {api_requests}")
        print(f"唯一API端点: {len(self.api_stats)}")

    def generate_api_report(self, top_n=20):
        """Print the top-N endpoints with request count, main method, error rate."""
        print(f"\n访问量前{top_n}的API端点:")
        print("=" * 80)
        print(f"{'排名':<4} {'端点':<40} {'请求数':<8} {'主要方法':<8} {'错误率':<8}")
        print("-" * 80)
        sorted_apis = sorted(self.api_stats.items(), key=lambda x: x[1]['count'], reverse=True)
        for i, (endpoint, stats) in enumerate(sorted_apis[:top_n], 1):
            main_method = stats['methods'].most_common(1)[0][0] if stats['methods'] else 'N/A'
            total_requests = stats['count']
            error_requests = sum(count for status, count in stats['status_codes'].items()
                                 if status.startswith(('4', '5')))
            error_rate = (error_requests / total_requests) * 100 if total_requests > 0 else 0
            print(f"{i:<4} {endpoint:<40} {total_requests:<8} {main_method:<8} {error_rate:<7.1f}%")

    def generate_method_report(self):
        """Print aggregate HTTP-method statistics across all endpoints."""
        method_stats = Counter()
        for stats in self.api_stats.values():
            for method, count in stats['methods'].items():
                method_stats[method] += count
        print("\nHTTP方法统计:")
        print("=" * 30)
        print(f"{'方法':<8} {'请求数':<10} {'百分比':<8}")
        print("-" * 30)
        total_requests = sum(method_stats.values())
        for method, count in method_stats.most_common():
            percentage = (count / total_requests) * 100 if total_requests > 0 else 0
            print(f"{method:<8} {count:<10} {percentage:<7.1f}%")

    def generate_error_report(self, top_n=15):
        """Print the most frequent error patterns (method + endpoint + status)."""
        print(f"\n错误频率前{top_n}的API:")
        print("=" * 70)
        print(f"{'排名':<4} {'错误模式':<50} {'次数':<8}")
        print("-" * 70)
        for i, (pattern, count) in enumerate(self.error_patterns.most_common(top_n), 1):
            print(f"{i:<4} {pattern:<50} {count:<8}")

    def generate_performance_report(self):
        """Print the slowest endpoints by mean response time."""
        print("\nAPI性能统计:")
        print("=" * 60)
        print(f"{'端点':<40} {'平均响应时间':<12} {'平均大小':<10}")
        print("-" * 60)
        performance_data = []
        for endpoint, stats in self.api_stats.items():
            times = stats['response_times']
            sizes = stats['response_sizes']
            avg_time = sum(times) / len(times) if times else 0
            avg_size = sum(sizes) / len(sizes) if sizes else 0
            performance_data.append((endpoint, avg_time, avg_size, stats['count']))
        # Slowest first.
        performance_data.sort(key=lambda x: x[1], reverse=True)
        for endpoint, avg_time, avg_size, count in performance_data[:15]:
            if avg_time > 0:  # only endpoints with timing data
                print(f"{endpoint:<40} {avg_time:<11.3f}s {avg_size:<9.0f}B")

    def export_to_json(self, output_file='api_analysis.json'):
        """Export the full analysis to a JSON file."""
        # Local import: the original referenced datetime.now() without any
        # import of datetime in this script, raising NameError at runtime.
        from datetime import datetime
        export_data = {
            'timestamp': str(datetime.now()),
            'summary': {
                'total_endpoints': len(self.api_stats),
                'total_requests': sum(stats['count'] for stats in self.api_stats.values())
            },
            'endpoints': {}
        }
        for endpoint, stats in self.api_stats.items():
            times = stats['response_times']
            sizes = stats['response_sizes']
            export_data['endpoints'][endpoint] = {
                'count': stats['count'],
                'methods': dict(stats['methods']),
                'status_codes': dict(stats['status_codes']),
                'avg_response_time': sum(times) / len(times) if times else 0,
                'avg_response_size': sum(sizes) / len(sizes) if sizes else 0
            }
        export_data['errors'] = dict(self.error_patterns)
        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(export_data, f, indent=2, ensure_ascii=False)
            print(f"\n分析结果已导出: {output_file}")
        except Exception as e:
            print(f"导出JSON时出错: {e}")

    def run_analysis(self):
        """Run the full pipeline: scan, then all reports and the JSON export."""
        self.analyze_logs()
        if not self.api_stats:
            print("没有找到API请求")
            return
        self.generate_api_report()
        self.generate_method_report()
        self.generate_error_report()
        self.generate_performance_report()
        self.export_to_json()
def main():
    """CLI entry point: analyze API traffic in the log file given as argv[1]."""
    if len(sys.argv) != 2:
        print("用法: python3 api_analysis.py <log_file>")
        sys.exit(1)
    APIAnalyzer(sys.argv[1]).run_analysis()


if __name__ == "__main__":
    main()
4. 性能优化技巧
4.1 大文件处理优化
# 使用内存映射
goaccess /var/log/nginx/access.log \
--log-format=COMBINED \
--enable-mmap \
--no-progress
# 增加缓存大小
goaccess /var/log/nginx/access.log \
--log-format=COMBINED \
--cache-lcnum=2000000 \
--cache-ncnum=131072
# 使用磁盘存储
goaccess /var/log/nginx/access.log \
--log-format=COMBINED \
--keep-db-files \
--db-path=/tmp/goaccess
4.2 性能监控脚本
#!/usr/bin/env python3
# performance_monitor.py - GoAccess性能监控
import psutil
import time
import subprocess
import sys
from datetime import datetime
class GoAccessMonitor:
    """Launch GoAccess in real-time HTML mode and sample its resource usage."""

    def __init__(self, log_file, output_file):
        self.log_file = log_file        # access log handed to goaccess
        self.output_file = output_file  # HTML report path passed via -o
        self.process = None             # subprocess.Popen handle once started
        self.stats = []                 # one sample dict per polling interval

    def start_goaccess(self):
        """Start the GoAccess process; return True on success."""
        cmd = [
            'goaccess', self.log_file,
            '--log-format=COMBINED',
            '--real-time-html',
            '--port=7890',
            '--addr=0.0.0.0',
            '-o', self.output_file
        ]
        try:
            self.process = subprocess.Popen(cmd)
            print(f"GoAccess进程已启动,PID: {self.process.pid}")
            return True
        except Exception as e:
            print(f"启动GoAccess失败: {e}")
            return False

    def monitor_performance(self, duration=300, interval=5):
        """Sample CPU%, RSS and fd count every *interval* s for *duration* s."""
        if not self.process:
            print("GoAccess进程未启动")
            return
        print(f"开始监控性能,持续{duration}秒,间隔{interval}秒")
        print("时间\t\tCPU%\t内存MB\t文件描述符")
        print("-" * 50)
        start_time = time.time()
        try:
            process = psutil.Process(self.process.pid)
            # Prime cpu_percent(): per psutil docs the first call always
            # reports 0.0, so take a baseline before the sampling loop.
            process.cpu_percent()
            while time.time() - start_time < duration:
                if not process.is_running():
                    print("GoAccess进程已停止")
                    break
                cpu_percent = process.cpu_percent()
                memory_mb = process.memory_info().rss / 1024 / 1024
                # num_fds() is POSIX-only; report 0 on platforms without it.
                num_fds = process.num_fds() if hasattr(process, 'num_fds') else 0
                timestamp = datetime.now().strftime('%H:%M:%S')
                print(f"{timestamp}\t{cpu_percent:.1f}\t{memory_mb:.1f}\t\t{num_fds}")
                self.stats.append({
                    'timestamp': timestamp,
                    'cpu_percent': cpu_percent,
                    'memory_mb': memory_mb,
                    'num_fds': num_fds
                })
                time.sleep(interval)
        except psutil.NoSuchProcess:
            print("GoAccess进程不存在")
        except KeyboardInterrupt:
            print("\n监控被中断")

    def generate_performance_report(self):
        """Print average/peak CPU and memory over the collected samples."""
        if not self.stats:
            print("没有性能数据")
            return
        cpu_values = [stat['cpu_percent'] for stat in self.stats]
        memory_values = [stat['memory_mb'] for stat in self.stats]
        avg_cpu = sum(cpu_values) / len(cpu_values)
        max_cpu = max(cpu_values)
        avg_memory = sum(memory_values) / len(memory_values)
        max_memory = max(memory_values)
        print("\n性能统计报告:")
        print("=" * 30)
        print(f"平均CPU使用率: {avg_cpu:.2f}%")
        print(f"最大CPU使用率: {max_cpu:.2f}%")
        print(f"平均内存使用: {avg_memory:.2f}MB")
        print(f"最大内存使用: {max_memory:.2f}MB")
        print(f"监控数据点: {len(self.stats)}个")

    def stop_goaccess(self):
        """Terminate GoAccess; escalate to SIGKILL if it ignores SIGTERM."""
        if self.process:
            self.process.terminate()
            try:
                # The bare wait() in the original could block forever if the
                # process ignored SIGTERM.
                self.process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                self.process.kill()
                self.process.wait()
            print("GoAccess进程已停止")
def main():
    """CLI entry point: run GoAccess and sample its resource usage for 60 s."""
    if len(sys.argv) != 3:
        print("用法: python3 performance_monitor.py <log_file> <output_file>")
        sys.exit(1)
    monitor = GoAccessMonitor(sys.argv[1], sys.argv[2])
    try:
        if monitor.start_goaccess():
            time.sleep(2)  # give the process a moment to come up
            monitor.monitor_performance(duration=60, interval=2)
            monitor.generate_performance_report()
    finally:
        # Always shut the child process down, even on Ctrl-C or errors.
        monitor.stop_goaccess()


if __name__ == "__main__":
    main()
5. 下一步
掌握了高级功能后,您可以:
- 学习实时监控和告警设置
- 了解与其他工具的集成方法
- 掌握自动化部署和管理
- 学习安全分析和威胁检测
- 探索自定义开发和扩展
下一章我们将介绍GoAccess的实时监控和告警功能。