1. ELK Stack集成
1.1 Logstash集成
# /etc/logstash/conf.d/goaccess.conf
# Logstash pipeline: ingest GoAccess JSON reports and raw Nginx access logs,
# routing each to its own Elasticsearch index via [@metadata][index].
input {
  # Tail the GoAccess JSON report output
  file {
    path => "/var/log/goaccess/goaccess.json"
    start_position => "beginning"
    codec => "json"
    tags => ["goaccess"]
  }
  # Tail the raw Nginx access log
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    tags => ["nginx", "access"]
  }
}
filter {
  if "goaccess" in [tags] {
    # GoAccess statistics events go to the goaccess-stats index
    mutate {
      add_field => { "[@metadata][index]" => "goaccess-stats" }
    }
    # Fall back to ingestion time when the event carries no timestamp
    if ![timestamp] {
      ruby {
        code => "event.set('timestamp', Time.now.iso8601)"
      }
    }
    date {
      match => [ "timestamp", "ISO8601" ]
    }
  }
  if "nginx" in [tags] {
    # Parse the combined-format access-log line
    grok {
      match => {
        "message" => "%{COMBINEDAPACHELOG}"
      }
    }
    # Break the user-agent string into structured fields
    useragent {
      source => "agent"
    }
    # GeoIP enrichment from the client address
    geoip {
      source => "clientip"
      target => "geoip"
    }
    # Numeric type conversions for aggregations
    mutate {
      convert => { "response" => "integer" }
      convert => { "bytes" => "integer" }
    }
    # Tag client errors (4xx and up)
    if [response] >= 400 {
      mutate {
        add_tag => [ "error" ]
      }
    }
    # Tag server errors (5xx) in addition to "error"
    if [response] >= 500 {
      mutate {
        add_tag => [ "server_error" ]
      }
    }
    # Tag bot traffic by user-agent substring
    if [agent] =~ /bot|crawler|spider/i {
      mutate {
        add_tag => [ "bot" ]
      }
    }
    mutate {
      add_field => { "[@metadata][index]" => "nginx-access" }
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][index]}-%{+YYYY.MM.dd}"
  }
  # Debug output (enable by setting [@metadata][debug] on an event)
  if [@metadata][debug] {
    stdout {
      codec => rubydebug
    }
  }
}
1.2 Elasticsearch索引模板
{
"index_patterns": ["goaccess-*"],
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"index.refresh_interval": "5s",
"index.codec": "best_compression"
},
"mappings": {
"properties": {
"timestamp": {
"type": "date"
},
"general": {
"properties": {
"total_requests": {
"type": "long"
},
"unique_visitors": {
"type": "long"
},
"bandwidth": {
"type": "long"
},
"log_size": {
"type": "long"
}
}
},
"requests": {
"properties": {
"data": {
"type": "keyword"
},
"hits": {
"type": "long"
},
"visitors": {
"type": "long"
},
"bandwidth": {
"type": "long"
}
}
},
"status_codes": {
"properties": {
"data": {
"type": "keyword"
},
"hits": {
"type": "long"
}
}
},
"hosts": {
"properties": {
"data": {
"type": "ip"
},
"hits": {
"type": "long"
}
}
},
"os": {
"properties": {
"data": {
"type": "keyword"
},
"hits": {
"type": "long"
}
}
},
"browsers": {
"properties": {
"data": {
"type": "keyword"
},
"hits": {
"type": "long"
}
}
},
"visit_time": {
"properties": {
"data": {
"type": "keyword"
},
"hits": {
"type": "long"
}
}
},
"virtual_hosts": {
"properties": {
"data": {
"type": "keyword"
},
"hits": {
"type": "long"
}
}
},
"referrers": {
"properties": {
"data": {
"type": "keyword"
},
"hits": {
"type": "long"
}
}
},
"referring_sites": {
"properties": {
"data": {
"type": "keyword"
},
"hits": {
"type": "long"
}
}
},
"keyphrases": {
"properties": {
"data": {
"type": "text",
"analyzer": "standard"
},
"hits": {
"type": "long"
}
}
},
"geo_location": {
"properties": {
"data": {
"type": "keyword"
},
"hits": {
"type": "long"
}
}
}
}
}
}
}
1.3 Kibana仪表板配置
#!/usr/bin/env python3
# kibana_dashboard_setup.py - 自动创建Kibana仪表板
import json
import requests
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class KibanaDashboardSetup:
    """Creates GoAccess index patterns, visualizations and a dashboard in Kibana.

    Talks to the Kibana saved-objects HTTP API. A 409 response is treated as
    success everywhere, since it means the object already exists.
    """

    def __init__(self, kibana_url="http://localhost:5601"):
        self.kibana_url = kibana_url
        # 'kbn-xsrf' is mandatory on every write request to the Kibana API.
        self.headers = {
            'Content-Type': 'application/json',
            'kbn-xsrf': 'true'
        }

    def create_index_pattern(self):
        """Create the two index patterns (GoAccess stats and raw Nginx access)."""
        index_patterns = [
            {
                "id": "goaccess-stats",
                "title": "goaccess-stats-*",
                "timeFieldName": "timestamp"
            },
            {
                "id": "nginx-access",
                "title": "nginx-access-*",
                "timeFieldName": "@timestamp"
            }
        ]
        for pattern in index_patterns:
            url = f"{self.kibana_url}/api/saved_objects/index-pattern/{pattern['id']}"
            data = {
                "attributes": {
                    "title": pattern['title'],
                    "timeFieldName": pattern['timeFieldName']
                }
            }
            try:
                response = requests.post(url, json=data, headers=self.headers)
                if response.status_code in [200, 409]:  # 200=created, 409=already exists
                    logger.info(f"索引模式 {pattern['id']} 创建成功")
                else:
                    logger.error(f"创建索引模式失败: {response.text}")
            except Exception as e:
                logger.error(f"创建索引模式时出错: {e}")

    def create_visualizations(self):
        """Create the standard set of GoAccess visualizations, one saved object each."""
        visualizations = [
            {
                "id": "goaccess-requests-timeline",
                "title": "请求时间线",
                "type": "line",
                "config": {
                    "view": "timeseries",
                    "aggs": [
                        {
                            "id": "1",
                            "type": "sum",
                            "schema": "metric",
                            "params": {
                                "field": "general.total_requests"
                            }
                        },
                        {
                            "id": "2",
                            "type": "date_histogram",
                            "schema": "segment",
                            "params": {
                                "field": "timestamp",
                                "interval": "auto"
                            }
                        }
                    ]
                }
            },
            {
                "id": "goaccess-status-codes",
                "title": "状态码分布",
                "type": "pie",
                "config": {
                    "aggs": [
                        {
                            "id": "1",
                            "type": "sum",
                            "schema": "metric",
                            "params": {
                                "field": "status_codes.hits"
                            }
                        },
                        {
                            "id": "2",
                            "type": "terms",
                            "schema": "segment",
                            "params": {
                                "field": "status_codes.data",
                                "size": 10
                            }
                        }
                    ]
                }
            },
            {
                "id": "goaccess-top-pages",
                "title": "热门页面",
                "type": "horizontal_bar",
                "config": {
                    "aggs": [
                        {
                            "id": "1",
                            "type": "sum",
                            "schema": "metric",
                            "params": {
                                "field": "requests.hits"
                            }
                        },
                        {
                            "id": "2",
                            "type": "terms",
                            "schema": "segment",
                            "params": {
                                "field": "requests.data",
                                "size": 20
                            }
                        }
                    ]
                }
            },
            {
                "id": "goaccess-geo-map",
                "title": "访问者地理分布",
                "type": "tile_map",
                "config": {
                    "aggs": [
                        {
                            "id": "1",
                            "type": "sum",
                            "schema": "metric",
                            "params": {
                                "field": "geo_location.hits"
                            }
                        },
                        {
                            "id": "2",
                            "type": "geohash_grid",
                            "schema": "segment",
                            "params": {
                                "field": "geoip.location",
                                "precision": 3
                            }
                        }
                    ]
                }
            },
            {
                "id": "goaccess-browsers",
                "title": "浏览器分布",
                "type": "pie",
                "config": {
                    "aggs": [
                        {
                            "id": "1",
                            "type": "sum",
                            "schema": "metric",
                            "params": {
                                "field": "browsers.hits"
                            }
                        },
                        {
                            "id": "2",
                            "type": "terms",
                            "schema": "segment",
                            "params": {
                                "field": "browsers.data",
                                "size": 10
                            }
                        }
                    ]
                }
            },
            {
                "id": "goaccess-os-distribution",
                "title": "操作系统分布",
                "type": "pie",
                "config": {
                    "aggs": [
                        {
                            "id": "1",
                            "type": "sum",
                            "schema": "metric",
                            "params": {
                                "field": "os.hits"
                            }
                        },
                        {
                            "id": "2",
                            "type": "terms",
                            "schema": "segment",
                            "params": {
                                "field": "os.data",
                                "size": 10
                            }
                        }
                    ]
                }
            }
        ]
        for viz in visualizations:
            self.create_visualization(viz)

    def create_visualization(self, viz_config):
        """Create one saved visualization object from a viz_config dict
        (keys: id, title, type, config)."""
        url = f"{self.kibana_url}/api/saved_objects/visualization/{viz_config['id']}"
        data = {
            "attributes": {
                "title": viz_config['title'],
                # Kibana expects visState/searchSource as embedded JSON strings.
                "visState": json.dumps({
                    "title": viz_config['title'],
                    "type": viz_config['type'],
                    "params": viz_config['config'],
                    "aggs": viz_config['config'].get('aggs', [])
                }),
                "uiStateJSON": "{}",
                "description": "",
                "version": 1,
                "kibanaSavedObjectMeta": {
                    "searchSourceJSON": json.dumps({
                        "index": "goaccess-stats",
                        "query": {
                            "match_all": {}
                        },
                        "filter": []
                    })
                }
            }
        }
        try:
            response = requests.post(url, json=data, headers=self.headers)
            if response.status_code in [200, 409]:
                logger.info(f"可视化图表 {viz_config['title']} 创建成功")
            else:
                logger.error(f"创建可视化图表失败: {response.text}")
        except Exception as e:
            logger.error(f"创建可视化图表时出错: {e}")

    def create_dashboard(self):
        """Create the main dashboard wiring the six visualizations into a 2-column grid."""
        dashboard_id = "goaccess-main-dashboard"
        url = f"{self.kibana_url}/api/saved_objects/dashboard/{dashboard_id}"
        # Panel layout: Kibana grid is 48 units wide, so w=24 gives two columns.
        panels = [
            {
                "gridData": {"x": 0, "y": 0, "w": 24, "h": 15},
                "id": "goaccess-requests-timeline",
                "type": "visualization"
            },
            {
                "gridData": {"x": 24, "y": 0, "w": 24, "h": 15},
                "id": "goaccess-status-codes",
                "type": "visualization"
            },
            {
                "gridData": {"x": 0, "y": 15, "w": 24, "h": 15},
                "id": "goaccess-top-pages",
                "type": "visualization"
            },
            {
                "gridData": {"x": 24, "y": 15, "w": 24, "h": 15},
                "id": "goaccess-geo-map",
                "type": "visualization"
            },
            {
                "gridData": {"x": 0, "y": 30, "w": 24, "h": 15},
                "id": "goaccess-browsers",
                "type": "visualization"
            },
            {
                "gridData": {"x": 24, "y": 30, "w": 24, "h": 15},
                "id": "goaccess-os-distribution",
                "type": "visualization"
            }
        ]
        data = {
            "attributes": {
                "title": "GoAccess Web分析仪表板",
                "hits": 0,
                "description": "基于GoAccess数据的综合Web分析仪表板",
                "panelsJSON": json.dumps(panels),
                "optionsJSON": json.dumps({
                    "useMargins": True,
                    "syncColors": False,
                    "hidePanelTitles": False
                }),
                "version": 1,
                "timeRestore": False,
                "kibanaSavedObjectMeta": {
                    "searchSourceJSON": json.dumps({
                        "query": {
                            "match_all": {}
                        },
                        "filter": []
                    })
                }
            }
        }
        try:
            response = requests.post(url, json=data, headers=self.headers)
            if response.status_code in [200, 409]:
                logger.info("GoAccess仪表板创建成功")
                logger.info(f"访问地址: {self.kibana_url}/app/kibana#/dashboard/{dashboard_id}")
            else:
                logger.error(f"创建仪表板失败: {response.text}")
        except Exception as e:
            logger.error(f"创建仪表板时出错: {e}")

    def setup_all(self):
        """Run the full setup: index patterns, then visualizations, then the dashboard.

        Bug fix: the original logged "Splunk" messages, called a nonexistent
        create_alerts(), and never created the index patterns or visualizations
        that the dashboard panels reference.
        """
        logger.info("开始设置Kibana GoAccess集成...")
        # Order matters: visualizations reference the index patterns, and the
        # dashboard references the visualizations.
        self.create_index_pattern()
        self.create_visualizations()
        self.create_dashboard()
        logger.info("Kibana GoAccess集成设置完成")
def main():
    """CLI entry point: create the GoAccess index patterns, visualizations and dashboard.

    Bug fix: the original instantiated a nonexistent SplunkDashboardCreator with
    Splunk-only credentials (copy-paste from the Splunk section); this script
    configures Kibana via KibanaDashboardSetup.
    """
    import argparse
    parser = argparse.ArgumentParser(description='Kibana GoAccess集成设置')
    parser.add_argument('--kibana-url', default='http://localhost:5601',
                        help='Kibana服务器URL')
    args = parser.parse_args()
    setup = KibanaDashboardSetup(kibana_url=args.kibana_url)
    setup.setup_all()
4.2 Redis集成
#!/usr/bin/env python3
# redis_integration.py - GoAccess Redis集成
import json
import redis
import subprocess
from datetime import datetime, timedelta
import logging
import time
import hashlib
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GoAccessRedisIntegration:
    """Pushes GoAccess statistics into Redis and reads them back for dashboards.

    Key layout:
      goaccess:summary:<iso-ts>         hash  per-run summary, 7-day TTL
      goaccess:summary:index            zset  member=summary key, score=unix time
      goaccess:requests:<iso-ts>:<i>    hash  per-path stats, 3-day TTL
      goaccess:requests:index:<iso-ts>  zset  member=request key, score=hits
      goaccess:requests:index           zset  member=snapshot index key, score=unix time
      goaccess:realtime:<metric>        zset  member="<ts>:<value>", score=unix time
      goaccess:geo:<iso-ts>             hash  country -> hits, 7-day TTL
      goaccess:geo:index                zset  member=geo key, score=unix time
    """

    def __init__(self, redis_config, log_file):
        self.redis_config = redis_config
        self.log_file = log_file
        self.redis_client = None
        # Connect up front so a bad config fails fast.
        self.connect_redis()

    def connect_redis(self):
        """Open the Redis connection and verify it with a PING; raises on failure."""
        try:
            self.redis_client = redis.Redis(**self.redis_config)
            self.redis_client.ping()
            logger.info("Redis连接成功")
        except Exception as e:
            logger.error(f"连接Redis失败: {e}")
            raise

    def get_goaccess_data(self):
        """Run GoAccess over the log file and return its parsed JSON report, or None."""
        try:
            cmd = [
                'goaccess', self.log_file,
                '--log-format=COMBINED',
                '--json-pretty-print'
            ]
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=60
            )
            if result.returncode == 0:
                return json.loads(result.stdout)
            else:
                logger.error(f"GoAccess执行失败: {result.stderr}")
                return None
        except Exception as e:
            logger.error(f"获取GoAccess数据时出错: {e}")
            return None

    def store_summary_data(self, data):
        """Store one summary hash (totals plus derived error rate) with a 7-day TTL."""
        try:
            general = data.get('general', {})
            timestamp = datetime.now().isoformat()
            summary_key = f"goaccess:summary:{timestamp}"
            summary_data = {
                'timestamp': timestamp,
                'total_requests': general.get('total_requests', 0),
                'unique_visitors': general.get('unique_visitors', 0),
                'bandwidth': general.get('bandwidth', 0),
                'log_size': general.get('log_size', 0)
            }
            # Error rate = share of 4xx/5xx hits across all requests
            status_codes = data.get('status_codes', {}).get('data', [])
            total_requests = general.get('total_requests', 0)
            error_requests = sum(
                item.get('hits', 0) for item in status_codes
                if item.get('data', '').startswith(('4', '5'))
            )
            error_rate = (error_requests / total_requests * 100) if total_requests > 0 else 0
            summary_data['error_rate'] = round(error_rate, 2)
            self.redis_client.hset(summary_key, mapping=summary_data)
            # Expire after 7 days
            self.redis_client.expire(summary_key, 7 * 24 * 3600)
            # Register in the time-ordered index
            self.redis_client.zadd('goaccess:summary:index', {summary_key: time.time()})
            logger.info(f"汇总数据存储成功: {summary_key}")
        except Exception as e:
            logger.error(f"存储汇总数据失败: {e}")

    def store_requests_data(self, data):
        """Store the top 50 request paths for this snapshot plus a hits-ranked index."""
        try:
            requests_data = data.get('requests', {}).get('data', [])
            timestamp = datetime.now().isoformat()
            # One hash per path (top 50 only), 3-day TTL
            for i, item in enumerate(requests_data[:50]):
                request_key = f"goaccess:requests:{timestamp}:{i}"
                request_data = {
                    'timestamp': timestamp,
                    'request_path': item.get('data', ''),
                    'hits': item.get('hits', 0),
                    'visitors': item.get('visitors', 0),
                    'bandwidth': item.get('bandwidth', 0)
                }
                self.redis_client.hset(request_key, mapping=request_data)
                self.redis_client.expire(request_key, 3 * 24 * 3600)  # 3-day TTL
            # Per-snapshot index, scored by hits so zrevrange yields top paths
            requests_index_key = f"goaccess:requests:index:{timestamp}"
            self.redis_client.zadd(
                requests_index_key,
                {f"goaccess:requests:{timestamp}:{i}": item.get('hits', 0)
                 for i, item in enumerate(requests_data[:50])}
            )
            self.redis_client.expire(requests_index_key, 3 * 24 * 3600)
            # Bug fix: register this snapshot in the global time-ordered index.
            # The original never did this, so get_top_requests (which scans
            # 'goaccess:requests:index' by time range) always found nothing.
            self.redis_client.zadd('goaccess:requests:index',
                                   {requests_index_key: time.time()})
            logger.info(f"请求数据存储成功,共{min(50, len(requests_data))}条")
        except Exception as e:
            logger.error(f"存储请求数据失败: {e}")

    def store_realtime_metrics(self, data):
        """Store instantaneous metrics in per-metric sorted sets keyed by time.

        Bug fix: the original called zadd with {timestamp: value}, which makes
        the metric VALUE the score. The age-based trim and the time-range reads
        in get_realtime_metrics then operated on values rather than timestamps.
        We now score by timestamp and encode the value into the member.
        """
        try:
            general = data.get('general', {})
            current_time = int(time.time())
            # Rates assume the report covers roughly one hour of traffic —
            # TODO confirm against how often this job actually runs.
            metrics = {
                'requests_per_second': general.get('total_requests', 0) / 3600,
                'unique_visitors': general.get('unique_visitors', 0),
                'bandwidth_per_second': general.get('bandwidth', 0) / 3600
            }
            for metric_name, value in metrics.items():
                metric_key = f"goaccess:realtime:{metric_name}"
                # member carries "<timestamp>:<value>", score carries the sample time
                member = f"{current_time}:{value}"
                self.redis_client.zadd(metric_key, {member: current_time})
                # Drop samples older than one hour (by score, i.e. by time)
                one_hour_ago = current_time - 3600
                self.redis_client.zremrangebyscore(metric_key, 0, one_hour_ago)
            # Latest snapshot for cheap dashboard reads
            status_key = "goaccess:current_status"
            status_data = {
                'last_update': datetime.now().isoformat(),
                'total_requests': general.get('total_requests', 0),
                'unique_visitors': general.get('unique_visitors', 0),
                'bandwidth': general.get('bandwidth', 0)
            }
            self.redis_client.hset(status_key, mapping=status_data)
            logger.info("实时指标存储成功")
        except Exception as e:
            logger.error(f"存储实时指标失败: {e}")

    def store_geo_data(self, data):
        """Store a country->hits hash for this snapshot (top 30), 7-day TTL."""
        try:
            geo_data = data.get('geo_location', {}).get('data', [])
            timestamp = datetime.now().isoformat()
            geo_key = f"goaccess:geo:{timestamp}"
            geo_stats = {}
            for item in geo_data[:30]:  # top 30 countries/regions
                country = item.get('data', 'Unknown')
                hits = item.get('hits', 0)
                geo_stats[country] = hits
            if geo_stats:
                self.redis_client.hset(geo_key, mapping=geo_stats)
                self.redis_client.expire(geo_key, 7 * 24 * 3600)  # 7-day TTL
                # Register in the time-ordered geo index
                self.redis_client.zadd('goaccess:geo:index', {geo_key: time.time()})
            logger.info(f"地理位置数据存储成功,共{len(geo_stats)}个国家/地区")
        except Exception as e:
            logger.error(f"存储地理位置数据失败: {e}")

    def get_realtime_metrics(self, minutes=60):
        """Return {metric: [{'timestamp', 'value'}, ...]} for the last `minutes` minutes."""
        try:
            current_time = int(time.time())
            start_time = current_time - (minutes * 60)
            metrics = {}
            metric_names = ['requests_per_second', 'unique_visitors', 'bandwidth_per_second']
            for metric_name in metric_names:
                metric_key = f"goaccess:realtime:{metric_name}"
                # Scores are unix timestamps (see store_realtime_metrics), so a
                # score-range query selects the requested time window.
                data_points = self.redis_client.zrangebyscore(
                    metric_key, start_time, current_time, withscores=True
                )
                series = []
                for member, score in data_points:
                    if isinstance(member, bytes):
                        member = member.decode()
                    # Member format is "<timestamp>:<value>"; split on the
                    # first colon only, the value may contain a decimal point.
                    _, _, raw_value = member.partition(':')
                    series.append({'timestamp': int(score), 'value': float(raw_value)})
                metrics[metric_name] = series
            return metrics
        except Exception as e:
            logger.error(f"获取实时指标失败: {e}")
            return {}

    def get_current_status(self):
        """Return the latest status hash with keys/values decoded to str."""
        try:
            status_key = "goaccess:current_status"
            status_data = self.redis_client.hgetall(status_key)
            # decode_responses is False, so convert bytes to str here
            return {k.decode(): v.decode() for k, v in status_data.items()}
        except Exception as e:
            logger.error(f"获取当前状态失败: {e}")
            return {}

    def get_top_requests(self, hours=24, limit=20):
        """Aggregate per-snapshot request stats over `hours` and return the top paths."""
        try:
            current_time = time.time()
            start_time = current_time - (hours * 3600)
            # Snapshot index keys within the time window
            index_keys = self.redis_client.zrangebyscore(
                'goaccess:requests:index', start_time, current_time
            )
            request_stats = {}
            for index_key in index_keys:
                if isinstance(index_key, bytes):
                    index_key = index_key.decode()
                # Highest-hit request hashes for this snapshot
                request_keys = self.redis_client.zrevrange(index_key, 0, limit - 1)
                for request_key in request_keys:
                    if isinstance(request_key, bytes):
                        request_key = request_key.decode()
                    request_data = self.redis_client.hgetall(request_key)
                    if request_data:
                        path = request_data.get(b'request_path', b'').decode()
                        hits = int(request_data.get(b'hits', 0))
                        if path in request_stats:
                            request_stats[path] += hits
                        else:
                            request_stats[path] = hits
            # Sort by accumulated hits, keep the top `limit`
            sorted_requests = sorted(
                request_stats.items(),
                key=lambda x: x[1],
                reverse=True
            )[:limit]
            return [{'path': path, 'hits': hits} for path, hits in sorted_requests]
        except Exception as e:
            logger.error(f"获取热门请求失败: {e}")
            return []

    def get_geo_statistics(self, hours=24):
        """Aggregate country hit counts over `hours` and return the top 20."""
        try:
            current_time = time.time()
            start_time = current_time - (hours * 3600)
            geo_keys = self.redis_client.zrangebyscore(
                'goaccess:geo:index', start_time, current_time
            )
            country_stats = {}
            for geo_key in geo_keys:
                if isinstance(geo_key, bytes):
                    geo_key = geo_key.decode()
                geo_data = self.redis_client.hgetall(geo_key)
                for country, hits in geo_data.items():
                    if isinstance(country, bytes):
                        country = country.decode()
                    hits = int(hits)
                    if country in country_stats:
                        country_stats[country] += hits
                    else:
                        country_stats[country] = hits
            sorted_countries = sorted(
                country_stats.items(),
                key=lambda x: x[1],
                reverse=True
            )[:20]
            return [{'country': country, 'hits': hits} for country, hits in sorted_countries]
        except Exception as e:
            logger.error(f"获取地理位置统计失败: {e}")
            return []

    def process_and_store_data(self):
        """Run GoAccess once and persist every data category to Redis."""
        logger.info("开始处理GoAccess数据...")
        data = self.get_goaccess_data()
        if not data:
            logger.warning("未获取到GoAccess数据")
            return
        try:
            self.store_summary_data(data)
            self.store_requests_data(data)
            self.store_realtime_metrics(data)
            self.store_geo_data(data)
            logger.info("数据处理和存储完成")
        except Exception as e:
            logger.error(f"处理数据时出错: {e}")

    def cleanup_old_data(self, days=7):
        """Trim index zsets of entries older than `days` days.

        Only the index entries need explicit cleanup; the per-snapshot
        hashes they point at expire on their own TTLs.
        """
        try:
            current_time = time.time()
            cutoff_time = current_time - (days * 24 * 3600)
            removed_summary = self.redis_client.zremrangebyscore(
                'goaccess:summary:index', 0, cutoff_time
            )
            removed_geo = self.redis_client.zremrangebyscore(
                'goaccess:geo:index', 0, cutoff_time
            )
            logger.info(f"清理完成:删除{removed_summary}个汇总记录,{removed_geo}个地理位置记录")
        except Exception as e:
            logger.error(f"清理旧数据失败: {e}")

    def get_dashboard_data(self):
        """Bundle current status, realtime series, top requests and geo stats."""
        try:
            dashboard_data = {
                'current_status': self.get_current_status(),
                'realtime_metrics': self.get_realtime_metrics(60),
                'top_requests': self.get_top_requests(24, 10),
                'geo_statistics': self.get_geo_statistics(24)
            }
            return dashboard_data
        except Exception as e:
            logger.error(f"获取仪表板数据失败: {e}")
            return {}
def main():
    """One-shot demo: ingest the access log into a local Redis and dump dashboard data."""
    # Keep responses as raw bytes; the integration decodes where it needs to.
    redis_config = dict(host='localhost', port=6379, db=0, decode_responses=False)
    worker = GoAccessRedisIntegration(redis_config, '/var/log/nginx/access.log')
    try:
        # Single ingest cycle
        worker.process_and_store_data()
        # Read everything back for display
        snapshot = worker.get_dashboard_data()
        if snapshot:
            print("仪表板数据:")
            print(json.dumps(snapshot, indent=2, default=str))
        # Trim expired index entries
        worker.cleanup_old_data()
    except Exception as e:
        logger.error(f"执行失败: {e}")
5. 集成最佳实践
5.1 数据流设计
# data_flow.yml - data-flow configuration (ConfigMap consumed by the GoAccess pipeline)
apiVersion: v1
kind: ConfigMap
metadata:
  name: goaccess-integration-config
data:
  flow.conf: |
    # GoAccess数据流配置
    # 数据源配置
    sources:
      - name: nginx_access_log
        type: file
        path: /var/log/nginx/access.log
        format: COMBINED
      - name: apache_access_log
        type: file
        path: /var/log/apache2/access.log
        format: COMMON
    # 处理管道
    pipeline:
      - stage: parse
        processor: goaccess
        config:
          real_time: true
          json_output: true
      - stage: enrich
        processor: geo_ip
        config:
          database: /usr/share/GeoIP/GeoLite2-Country.mmdb
      - stage: filter
        processor: exclude_bots
        config:
          patterns:
            - "bot"
            - "crawler"
            - "spider"
    # 输出目标
    outputs:
      - name: elasticsearch
        type: elasticsearch
        config:
          hosts: ["localhost:9200"]
          index: "goaccess-{+YYYY.MM.dd}"
      - name: prometheus
        type: prometheus
        config:
          port: 9090
          metrics_path: /metrics
      - name: mysql
        type: mysql
        config:
          host: localhost
          database: goaccess_analytics
          table: access_logs
      - name: redis
        type: redis
        config:
          host: localhost
          port: 6379
          db: 0
    # 告警配置
    alerts:
      - name: high_error_rate
        condition: error_rate > 5
        actions:
          - email
          - slack
      - name: traffic_spike
        condition: requests_per_minute > 1000
        actions:
          - webhook
5.2 性能优化
#!/usr/bin/env python3
# performance_optimizer.py - 性能优化工具
import asyncio
import aiofiles
import json
from concurrent.futures import ThreadPoolExecutor
import logging
import time
from typing import List, Dict, Any
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GoAccessPerformanceOptimizer:
    """Helpers for running GoAccess over many logs concurrently, sizing its
    configuration by log size, and profiling the host while it runs."""

    def __init__(self, config):
        self.config = config
        # Thread pool keeps the blocking GoAccess subprocesses off the event loop.
        self.executor = ThreadPoolExecutor(max_workers=config.get('max_workers', 4))
        self.batch_size = config.get('batch_size', 1000)
        self.buffer_size = config.get('buffer_size', 10000)

    async def process_logs_async(self, log_files: List[str]):
        """Analyze every file in log_files concurrently.

        Returns a list with one entry per file: the parsed report, or the
        Exception that processing raised.
        """
        tasks = []
        for log_file in log_files:
            task = asyncio.create_task(self.process_single_log(log_file))
            tasks.append(task)
        # return_exceptions=True so one bad file does not cancel the rest
        results = await asyncio.gather(*tasks, return_exceptions=True)
        successful = sum(1 for r in results if not isinstance(r, Exception))
        failed = len(results) - successful
        logger.info(f"处理完成:成功{successful}个,失败{failed}个")
        return results

    async def process_single_log(self, log_file: str):
        """Run GoAccess on one file via the thread pool and return its JSON report.

        Bug fix: the original also read the entire log into memory with
        aiofiles and discarded the contents — pure overhead on large logs,
        since GoAccess opens and reads the file itself. The read was removed.
        """
        try:
            start_time = time.time()
            loop = asyncio.get_event_loop()
            result = await loop.run_in_executor(
                self.executor,
                self.run_goaccess,
                log_file
            )
            processing_time = time.time() - start_time
            logger.info(f"处理{log_file}完成,耗时{processing_time:.2f}秒")
            return result
        except Exception as e:
            logger.error(f"处理{log_file}失败: {e}")
            raise

    def run_goaccess(self, log_file: str):
        """Blocking GoAccess invocation; returns the parsed JSON report or raises."""
        import subprocess
        cmd = [
            'goaccess', log_file,
            '--log-format=COMBINED',
            '--json-pretty-print',
            '--no-progress'
        ]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=300  # 5-minute hard limit
        )
        if result.returncode == 0:
            return json.loads(result.stdout)
        else:
            raise Exception(f"GoAccess执行失败: {result.stderr}")

    async def batch_process_data(self, data_list: List[Dict[Any, Any]], processor_func):
        """Split data_list into self.batch_size chunks and process them concurrently."""
        batches = []
        for i in range(0, len(data_list), self.batch_size):
            batch = data_list[i:i + self.batch_size]
            batches.append(batch)
        tasks = []
        for batch in batches:
            task = asyncio.create_task(self.process_batch(batch, processor_func))
            tasks.append(task)
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

    async def process_batch(self, batch: List[Dict[Any, Any]], processor_func):
        """Run processor_func(batch) on the thread pool."""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor,
            processor_func,
            batch
        )

    def optimize_goaccess_config(self, log_file_size: int) -> Dict[str, Any]:
        """Suggest GoAccess settings based on log size in bytes: in-memory below
        100MB, on-disk DB below 1GB, batch (process-and-exit) above that."""
        config = {}
        if log_file_size < 100 * 1024 * 1024:  # < 100MB
            config.update({
                'real_time_html': True,
                'keep_db_files': False,
                'load_from_disk': False
            })
        elif log_file_size < 1024 * 1024 * 1024:  # < 1GB
            config.update({
                'real_time_html': False,
                'keep_db_files': True,
                'load_from_disk': True,
                'db_path': '/tmp/goaccess'
            })
        else:  # >= 1GB
            config.update({
                'real_time_html': False,
                'keep_db_files': True,
                'load_from_disk': True,
                'db_path': '/tmp/goaccess',
                'process_and_exit': True
            })
        return config

    async def monitor_performance(self, duration_seconds: int = 300):
        """Sample CPU/memory/disk/network every 5 seconds for duration_seconds."""
        import psutil
        metrics = []
        start_time = time.time()
        while time.time() - start_time < duration_seconds:
            metric = {
                'timestamp': time.time(),
                'cpu_percent': psutil.cpu_percent(),
                'memory_percent': psutil.virtual_memory().percent,
                'disk_io': psutil.disk_io_counters()._asdict() if psutil.disk_io_counters() else {},
                'network_io': psutil.net_io_counters()._asdict() if psutil.net_io_counters() else {}
            }
            metrics.append(metric)
            await asyncio.sleep(5)  # one sample every 5 seconds
        return metrics

    def generate_performance_report(self, metrics: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate sampled metrics into avg/max/min plus tuning recommendations."""
        if not metrics:
            return {}
        cpu_values = [m['cpu_percent'] for m in metrics]
        memory_values = [m['memory_percent'] for m in metrics]
        report = {
            'duration': len(metrics) * 5,  # seconds (one sample per 5s)
            'cpu': {
                'avg': sum(cpu_values) / len(cpu_values),
                'max': max(cpu_values),
                'min': min(cpu_values)
            },
            'memory': {
                'avg': sum(memory_values) / len(memory_values),
                'max': max(memory_values),
                'min': min(memory_values)
            },
            'recommendations': self.generate_recommendations(metrics)
        }
        return report

    def generate_recommendations(self, metrics: List[Dict[str, Any]]) -> List[str]:
        """Turn average CPU/memory load into human-readable tuning advice."""
        recommendations = []
        if not metrics:
            return recommendations
        avg_cpu = sum(m['cpu_percent'] for m in metrics) / len(metrics)
        avg_memory = sum(m['memory_percent'] for m in metrics) / len(metrics)
        if avg_cpu > 80:
            recommendations.append("CPU使用率过高,建议增加处理线程数或优化GoAccess配置")
        if avg_memory > 80:
            recommendations.append("内存使用率过高,建议启用磁盘数据库模式或增加内存")
        if avg_cpu < 20 and avg_memory < 30:
            recommendations.append("资源利用率较低,可以增加并发处理数量")
        return recommendations
async def main():
    """Demonstrate async log processing followed by a short monitoring run."""
    optimizer = GoAccessPerformanceOptimizer({
        'max_workers': 4,
        'batch_size': 1000,
        'buffer_size': 10000,
    })
    log_files = [
        '/var/log/nginx/access.log',
        '/var/log/nginx/access.log.1',
        '/var/log/apache2/access.log',
    ]
    try:
        # Analyze all logs concurrently (results unused in this demo)
        results = await optimizer.process_logs_async(log_files)
        # Monitor the host for one minute
        samples = await optimizer.monitor_performance(60)
        report = optimizer.generate_performance_report(samples)
        print("性能报告:")
        print(json.dumps(report, indent=2))
    except Exception as e:
        logger.error(f"执行失败: {e}")
6. 总结
本章详细介绍了GoAccess与其他工具的集成方法,包括:
6.1 主要集成方案
ELK Stack集成
- Logstash配置用于处理GoAccess输出
- Elasticsearch索引模板优化
- Kibana仪表板自动化创建
Grafana集成
- Prometheus指标导出
- 自定义仪表板配置
- 实时监控展示
Splunk集成
- 数据输入配置
- 字段提取和搜索
- 仪表板和告警设置
数据库集成
- MySQL关系型数据库存储
- Redis缓存和实时数据
- 数据清理和维护
6.2 集成优势
- 数据统一管理:将GoAccess数据集成到现有监控体系
- 实时分析能力:支持实时数据流处理和分析
- 可视化增强:利用专业工具创建丰富的可视化界面
- 告警机制:集成现有告警系统,及时发现异常
- 数据持久化:长期存储和历史数据分析
6.3 最佳实践
- 性能优化:异步处理、批量操作、资源监控
- 数据流设计:合理的数据管道和处理流程
- 错误处理:完善的异常处理和恢复机制
- 安全考虑:数据传输加密和访问控制
- 监控运维:系统健康检查和性能监控
下一章我们将学习GoAccess的性能优化和故障排除技巧。
4. 数据库集成
4.1 MySQL集成
#!/usr/bin/env python3
# mysql_integration.py - GoAccess MySQL集成
import json
import mysql.connector
import subprocess
from datetime import datetime
import logging
import schedule
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GoAccessMySQLIntegration:
    def __init__(self, mysql_config, log_file):
        """Connect to MySQL and ensure the schema exists.

        mysql_config: kwargs passed straight to mysql.connector.connect().
        log_file: access log that GoAccess will analyze.
        Raises if the connection or table creation fails.
        """
        self.mysql_config = mysql_config
        self.log_file = log_file
        self.connection = None
        # Connect first; create_tables needs a live connection.
        self.connect_database()
        # Ensure all goaccess_* tables exist before any insert runs.
        self.create_tables()
def connect_database(self):
"""连接MySQL数据库"""
try:
self.connection = mysql.connector.connect(**self.mysql_config)
logger.info("MySQL数据库连接成功")
except Exception as e:
logger.error(f"连接MySQL数据库失败: {e}")
raise
    def create_tables(self):
        """Create the goaccess_* tables if they do not already exist.

        All tables are InnoDB/utf8mb4 and indexed on timestamp plus their
        natural lookup column. Raises on any DDL failure.
        """
        cursor = self.connection.cursor()
        # Summary table: one row per ingest run with aggregate counters
        create_summary_table = """
        CREATE TABLE IF NOT EXISTS goaccess_summary (
            id INT AUTO_INCREMENT PRIMARY KEY,
            timestamp DATETIME NOT NULL,
            total_requests BIGINT DEFAULT 0,
            unique_visitors BIGINT DEFAULT 0,
            bandwidth BIGINT DEFAULT 0,
            log_size BIGINT DEFAULT 0,
            error_rate DECIMAL(5,2) DEFAULT 0.00,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_timestamp (timestamp)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """
        # Per-request-path detail table; prefix index because the path
        # column is too wide to index in full
        create_requests_table = """
        CREATE TABLE IF NOT EXISTS goaccess_requests (
            id INT AUTO_INCREMENT PRIMARY KEY,
            timestamp DATETIME NOT NULL,
            request_path VARCHAR(500) NOT NULL,
            hits BIGINT DEFAULT 0,
            visitors BIGINT DEFAULT 0,
            bandwidth BIGINT DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_timestamp (timestamp),
            INDEX idx_request_path (request_path(100))
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """
        # HTTP status-code counts per run
        create_status_codes_table = """
        CREATE TABLE IF NOT EXISTS goaccess_status_codes (
            id INT AUTO_INCREMENT PRIMARY KEY,
            timestamp DATETIME NOT NULL,
            status_code VARCHAR(10) NOT NULL,
            hits BIGINT DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_timestamp (timestamp),
            INDEX idx_status_code (status_code)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """
        # Browser distribution per run
        create_browsers_table = """
        CREATE TABLE IF NOT EXISTS goaccess_browsers (
            id INT AUTO_INCREMENT PRIMARY KEY,
            timestamp DATETIME NOT NULL,
            browser VARCHAR(100) NOT NULL,
            hits BIGINT DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_timestamp (timestamp),
            INDEX idx_browser (browser)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """
        # Geographic (country-level) distribution per run
        create_geo_table = """
        CREATE TABLE IF NOT EXISTS goaccess_geo_location (
            id INT AUTO_INCREMENT PRIMARY KEY,
            timestamp DATETIME NOT NULL,
            country VARCHAR(100) NOT NULL,
            hits BIGINT DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_timestamp (timestamp),
            INDEX idx_country (country)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """
        tables = [
            create_summary_table,
            create_requests_table,
            create_status_codes_table,
            create_browsers_table,
            create_geo_table
        ]
        try:
            for table_sql in tables:
                cursor.execute(table_sql)
            self.connection.commit()
            logger.info("数据表创建成功")
        except Exception as e:
            logger.error(f"创建数据表失败: {e}")
            raise
        finally:
            cursor.close()
def get_goaccess_data(self):
"""获取GoAccess数据"""
try:
cmd = [
'goaccess', self.log_file,
'--log-format=COMBINED',
'--json-pretty-print'
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60
)
if result.returncode == 0:
return json.loads(result.stdout)
else:
logger.error(f"GoAccess执行失败: {result.stderr}")
return None
except Exception as e:
logger.error(f"获取GoAccess数据时出错: {e}")
return None
    def insert_summary_data(self, data, timestamp):
        """Insert one summary row (totals plus derived 4xx/5xx error rate).

        Does not commit; process_and_store_data commits the whole batch.
        Errors are logged and swallowed so sibling inserts still run.
        """
        cursor = self.connection.cursor()
        try:
            general = data.get('general', {})
            # Error rate = 4xx/5xx hits as a percentage of all requests
            status_codes = data.get('status_codes', {}).get('data', [])
            total_requests = general.get('total_requests', 0)
            error_requests = sum(
                item.get('hits', 0) for item in status_codes
                if item.get('data', '').startswith(('4', '5'))
            )
            error_rate = (error_requests / total_requests * 100) if total_requests > 0 else 0
            insert_sql = """
            INSERT INTO goaccess_summary
            (timestamp, total_requests, unique_visitors, bandwidth, log_size, error_rate)
            VALUES (%s, %s, %s, %s, %s, %s)
            """
            values = (
                timestamp,
                general.get('total_requests', 0),
                general.get('unique_visitors', 0),
                general.get('bandwidth', 0),
                general.get('log_size', 0),
                round(error_rate, 2)
            )
            cursor.execute(insert_sql, values)
            logger.info("汇总数据插入成功")
        except Exception as e:
            logger.error(f"插入汇总数据失败: {e}")
        finally:
            cursor.close()
def insert_requests_data(self, data, timestamp):
"""插入请求数据"""
cursor = self.connection.cursor()
try:
requests_data = data.get('requests', {}).get('data', [])
insert_sql = """
INSERT INTO goaccess_requests
(timestamp, request_path, hits, visitors, bandwidth)
VALUES (%s, %s, %s, %s, %s)
"""
values_list = []
for item in requests_data[:100]: # 限制前100个
values_list.append((
timestamp,
item.get('data', '')[:500], # 限制长度
item.get('hits', 0),
item.get('visitors', 0),
item.get('bandwidth', 0)
))
if values_list:
cursor.executemany(insert_sql, values_list)
logger.info(f"插入{len(values_list)}条请求数据")
except Exception as e:
logger.error(f"插入请求数据失败: {e}")
finally:
cursor.close()
def insert_status_codes_data(self, data, timestamp):
"""插入状态码数据"""
cursor = self.connection.cursor()
try:
status_codes = data.get('status_codes', {}).get('data', [])
insert_sql = """
INSERT INTO goaccess_status_codes
(timestamp, status_code, hits)
VALUES (%s, %s, %s)
"""
values_list = []
for item in status_codes:
values_list.append((
timestamp,
item.get('data', ''),
item.get('hits', 0)
))
if values_list:
cursor.executemany(insert_sql, values_list)
logger.info(f"插入{len(values_list)}条状态码数据")
except Exception as e:
logger.error(f"插入状态码数据失败: {e}")
finally:
cursor.close()
def insert_browsers_data(self, data, timestamp):
"""插入浏览器数据"""
cursor = self.connection.cursor()
try:
browsers = data.get('browsers', {}).get('data', [])
insert_sql = """
INSERT INTO goaccess_browsers
(timestamp, browser, hits)
VALUES (%s, %s, %s)
"""
values_list = []
for item in browsers[:20]: # 限制前20个
values_list.append((
timestamp,
item.get('data', '')[:100], # 限制长度
item.get('hits', 0)
))
if values_list:
cursor.executemany(insert_sql, values_list)
logger.info(f"插入{len(values_list)}条浏览器数据")
except Exception as e:
logger.error(f"插入浏览器数据失败: {e}")
finally:
cursor.close()
def insert_geo_data(self, data, timestamp):
"""插入地理位置数据"""
cursor = self.connection.cursor()
try:
geo_data = data.get('geo_location', {}).get('data', [])
insert_sql = """
INSERT INTO goaccess_geo_location
(timestamp, country, hits)
VALUES (%s, %s, %s)
"""
values_list = []
for item in geo_data[:50]: # 限制前50个
values_list.append((
timestamp,
item.get('data', '')[:100], # 限制长度
item.get('hits', 0)
))
if values_list:
cursor.executemany(insert_sql, values_list)
logger.info(f"插入{len(values_list)}条地理位置数据")
except Exception as e:
logger.error(f"插入地理位置数据失败: {e}")
finally:
cursor.close()
def process_and_store_data(self):
    """Fetch the current GoAccess report and store every section in MySQL.

    Runs each insert_* helper with a shared timestamp, then commits the
    whole batch as one transaction; rolls back on error.

    NOTE(review): the insert_* helpers catch and log their own
    exceptions instead of re-raising, so a failed section does not stop
    the commit here — confirm this best-effort behavior is intended.
    """
    logger.info("开始处理GoAccess数据...")
    # Pull the parsed JSON report; bail out when GoAccess produced
    # nothing (empty log, tool failure, ...).
    data = self.get_goaccess_data()
    if not data:
        logger.warning("未获取到GoAccess数据")
        return
    # One timestamp per run so rows across tables can be correlated.
    timestamp = datetime.now()
    try:
        # Insert each report section.
        self.insert_summary_data(data, timestamp)
        self.insert_requests_data(data, timestamp)
        self.insert_status_codes_data(data, timestamp)
        self.insert_browsers_data(data, timestamp)
        self.insert_geo_data(data, timestamp)
        # Commit all sections as a single transaction.
        self.connection.commit()
        logger.info("数据处理和存储完成")
    except Exception as e:
        logger.error(f"处理数据时出错: {e}")
        self.connection.rollback()
def cleanup_old_data(self, days=30):
    """Delete rows older than ``days`` days from every analytics table.

    Args:
        days: Retention window in days (default 30).
    """
    cursor = self.connection.cursor()
    # Fixed internal table list — the f-string interpolation below is
    # therefore not exposed to user input.
    tables = (
        'goaccess_summary',
        'goaccess_requests',
        'goaccess_status_codes',
        'goaccess_browsers',
        'goaccess_geo_location',
    )
    try:
        for table in tables:
            cursor.execute(
                f"""
                DELETE FROM {table}
                WHERE timestamp < DATE_SUB(NOW(), INTERVAL %s DAY)
                """,
                (days,),
            )
            logger.info(f"从{table}表删除了{cursor.rowcount}行旧数据")
        # One commit covering all tables.
        self.connection.commit()
        logger.info(f"清理{days}天前的旧数据完成")
    except Exception as e:
        logger.error(f"清理旧数据失败: {e}")
        self.connection.rollback()
    finally:
        cursor.close()
def get_statistics(self, hours=24):
    """Aggregate stored metrics over the last ``hours`` hours.

    Args:
        hours: Look-back window in hours (default 24).

    Returns:
        dict with keys 'summary' (avg/max request, visitor and error
        rates from goaccess_summary), 'top_pages' (10 busiest paths)
        and 'status_distribution' (hits per status code), or None when
        any query fails.
    """
    # dictionary=True makes fetch* return dicts keyed by column name
    # (mysql.connector-specific cursor option).
    cursor = self.connection.cursor(dictionary=True)
    try:
        # Averages/maxima over summary snapshots in the window.
        stats_sql = """
            SELECT
                AVG(total_requests) as avg_requests,
                MAX(total_requests) as max_requests,
                AVG(unique_visitors) as avg_visitors,
                MAX(unique_visitors) as max_visitors,
                AVG(error_rate) as avg_error_rate,
                MAX(error_rate) as max_error_rate,
                COUNT(*) as data_points
            FROM goaccess_summary
            WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
        """
        cursor.execute(stats_sql, (hours,))
        stats = cursor.fetchone()
        # Ten most-requested paths in the window.
        top_pages_sql = """
            SELECT
                request_path,
                SUM(hits) as total_hits,
                SUM(visitors) as total_visitors
            FROM goaccess_requests
            WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
            GROUP BY request_path
            ORDER BY total_hits DESC
            LIMIT 10
        """
        cursor.execute(top_pages_sql, (hours,))
        top_pages = cursor.fetchall()
        # Hit totals per HTTP status code.
        status_distribution_sql = """
            SELECT
                status_code,
                SUM(hits) as total_hits
            FROM goaccess_status_codes
            WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
            GROUP BY status_code
            ORDER BY total_hits DESC
        """
        cursor.execute(status_distribution_sql, (hours,))
        status_distribution = cursor.fetchall()
        return {
            'summary': stats,
            'top_pages': top_pages,
            'status_distribution': status_distribution
        }
    except Exception as e:
        logger.error(f"获取统计信息失败: {e}")
        return None
    finally:
        cursor.close()
def start_scheduled_processing(self, interval_minutes=5):
    """Run collection on a timer; blocks until KeyboardInterrupt.

    Args:
        interval_minutes: Minutes between collection passes (default 5).
    """
    logger.info(f"启动定时数据处理,间隔{interval_minutes}分钟")
    # Periodic collect-and-store pass.
    schedule.every(interval_minutes).minutes.do(self.process_and_store_data)
    # Daily cleanup at 02:00 (uses cleanup_old_data's default retention).
    schedule.every().day.at("02:00").do(self.cleanup_old_data)
    try:
        while True:
            schedule.run_pending()
            # Poll pending jobs once a minute; jobs may fire up to
            # 60s after their scheduled time.
            time.sleep(60)
    except KeyboardInterrupt:
        logger.info("定时处理被用户中断")
def close(self):
    """Close the MySQL connection; safe to call more than once.

    Fix: the original left ``self.connection`` pointing at the closed
    connection, so a second call would invoke close() on an
    already-closed handle. We now reset the attribute to None, making
    repeated calls a no-op.
    """
    if self.connection:
        self.connection.close()
        # Drop the reference so subsequent close() calls are no-ops.
        self.connection = None
        logger.info("数据库连接已关闭")
def main():
    """Run one GoAccess→MySQL collection pass and print the statistics."""
    # MySQL connection settings.
    # NOTE(review): credentials are hard-coded — load them from the
    # environment or a config file before deploying.
    mysql_config = {
        'host': 'localhost',
        'port': 3306,
        'user': 'goaccess',
        'password': 'your_password',
        'database': 'goaccess_analytics',
        'charset': 'utf8mb4',
        # Commit/rollback is managed explicitly by the integration class.
        'autocommit': False
    }
    log_file = '/var/log/nginx/access.log'
    # Build the integration facade.
    integration = GoAccessMySQLIntegration(mysql_config, log_file)
    try:
        # One collect-and-store pass.
        integration.process_and_store_data()
        # Aggregate stats for the default 24h window.
        stats = integration.get_statistics()
        if stats:
            print("统计信息:")
            # default=str renders datetime/Decimal values from MySQL.
            print(json.dumps(stats, indent=2, default=str))
        # Continuous mode (optional):
        # integration.start_scheduled_processing()
    finally:
        integration.close()


if __name__ == "__main__":
    main()
logger.info("开始设置Kibana仪表板...")
# 创建索引模式
self.create_index_pattern()
# 等待一下让索引模式生效
import time
time.sleep(2)
# 创建可视化图表
self.create_visualizations()
# 等待一下让可视化图表生效
time.sleep(2)
# 创建仪表板
self.create_dashboard()
logger.info("Kibana仪表板设置完成")
def main():
    setup = KibanaDashboardSetup()
    setup.setup_all()

if __name__ == "__main__":
    main()
## 2. Grafana集成
### 2.1 Prometheus指标导出
```python
#!/usr/bin/env python3
# goaccess_prometheus_exporter.py - GoAccess Prometheus指标导出器
import json
import time
import subprocess
import logging
from prometheus_client import start_http_server, Gauge, Counter, Histogram
from prometheus_client.core import CollectorRegistry
import threading
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GoAccessPrometheusExporter:
    """Periodically runs GoAccess over an access log and publishes the
    parsed statistics as Prometheus metrics on an HTTP endpoint.
    """

    def __init__(self, log_file, port=8000, interval=60):
        """
        Args:
            log_file: Path to the access log GoAccess should parse.
            port: TCP port for the Prometheus /metrics endpoint.
            interval: Seconds between collection passes.
        """
        self.log_file = log_file
        self.port = port
        self.interval = interval
        # Private registry so the endpoint serves only this exporter's
        # metrics (excludes the default process/platform collectors).
        self.registry = CollectorRegistry()
        # Define all gauges/counters up front.
        self.setup_metrics()
        # Collection-loop flag: set True by start(), False by stop().
        self.running = False

    def setup_metrics(self):
        """Create every Gauge/Counter on self.registry."""
        # Overall traffic gauges.
        self.total_requests = Gauge(
            'goaccess_total_requests',
            'Total number of requests',
            registry=self.registry
        )
        self.unique_visitors = Gauge(
            'goaccess_unique_visitors',
            'Number of unique visitors',
            registry=self.registry
        )
        self.bandwidth_bytes = Gauge(
            'goaccess_bandwidth_bytes',
            'Total bandwidth in bytes',
            registry=self.registry
        )
        self.log_size_bytes = Gauge(
            'goaccess_log_size_bytes',
            'Log file size in bytes',
            registry=self.registry
        )
        # Per-status-code hits.
        self.status_code_requests = Gauge(
            'goaccess_status_code_requests',
            'Number of requests by status code',
            ['status_code'],
            registry=self.registry
        )
        # Top-page hits.
        self.page_requests = Gauge(
            'goaccess_page_requests',
            'Number of requests by page',
            ['page'],
            registry=self.registry
        )
        # Browser breakdown.
        self.browser_requests = Gauge(
            'goaccess_browser_requests',
            'Number of requests by browser',
            ['browser'],
            registry=self.registry
        )
        # Operating-system breakdown.
        self.os_requests = Gauge(
            'goaccess_os_requests',
            'Number of requests by operating system',
            ['os'],
            registry=self.registry
        )
        # Geographic breakdown.
        self.geo_requests = Gauge(
            'goaccess_geo_requests',
            'Number of requests by country',
            ['country'],
            registry=self.registry
        )
        # Referrer breakdown.
        self.referrer_requests = Gauge(
            'goaccess_referrer_requests',
            'Number of requests by referrer',
            ['referrer'],
            registry=self.registry
        )
        # Hour-of-day breakdown.
        self.visit_time_requests = Gauge(
            'goaccess_visit_time_requests',
            'Number of requests by visit time',
            ['hour'],
            registry=self.registry
        )
        # Virtual-host breakdown.
        self.vhost_requests = Gauge(
            'goaccess_vhost_requests',
            'Number of requests by virtual host',
            ['vhost'],
            registry=self.registry
        )
        # Share of 4xx/5xx responses, in percent.
        self.error_rate = Gauge(
            'goaccess_error_rate',
            'Error rate percentage',
            registry=self.registry
        )
        # Exporter health metrics.
        self.exporter_last_update = Gauge(
            'goaccess_exporter_last_update_timestamp',
            'Timestamp of last successful update',
            registry=self.registry
        )
        self.exporter_errors_total = Counter(
            'goaccess_exporter_errors_total',
            'Total number of exporter errors',
            registry=self.registry
        )

    def get_goaccess_data(self):
        """Run GoAccess on the log file and return its parsed JSON report.

        Returns:
            dict on success, None on any failure (non-zero exit,
            timeout, or unparsable output).
        """
        try:
            # NOTE(review): no explicit output/format flag is passed;
            # confirm this GoAccess version emits JSON to stdout with
            # just --json-pretty-print.
            cmd = [
                'goaccess', self.log_file,
                '--log-format=COMBINED',
                '--json-pretty-print'
            ]
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30
            )
            if result.returncode == 0:
                return json.loads(result.stdout)
            else:
                logger.error(f"GoAccess执行失败: {result.stderr}")
                return None
        except subprocess.TimeoutExpired:
            logger.error("GoAccess执行超时")
            return None
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析失败: {e}")
            return None
        except Exception as e:
            logger.error(f"获取GoAccess数据时出错: {e}")
            return None

    def update_metrics(self, data):
        """Push one GoAccess report into the Prometheus gauges.

        Args:
            data: Parsed GoAccess JSON report (None is a no-op).

        NOTE(review): label series from earlier reports are never
        removed, so entries that drop out of a top-N list keep their
        last value — confirm stale series are acceptable.
        """
        if not data:
            return
        try:
            # Overall totals.
            general = data.get('general', {})
            self.total_requests.set(general.get('total_requests', 0))
            self.unique_visitors.set(general.get('unique_visitors', 0))
            self.bandwidth_bytes.set(general.get('bandwidth', 0))
            self.log_size_bytes.set(general.get('log_size', 0))
            # Per-status-code hits (top 10).
            status_codes = data.get('status_codes', {}).get('data', [])
            for item in status_codes[:10]:
                status_code = item.get('data', 'unknown')
                hits = item.get('hits', 0)
                self.status_code_requests.labels(status_code=status_code).set(hits)
            # Per-page hits (top 20).
            requests_data = data.get('requests', {}).get('data', [])
            for item in requests_data[:20]:
                page = item.get('data', 'unknown')
                hits = item.get('hits', 0)
                # Trim page names so label values stay bounded.
                page = page[:100] if len(page) > 100 else page
                self.page_requests.labels(page=page).set(hits)
            # Browser breakdown (top 10).
            browsers = data.get('browsers', {}).get('data', [])
            for item in browsers[:10]:
                browser = item.get('data', 'unknown')
                hits = item.get('hits', 0)
                self.browser_requests.labels(browser=browser).set(hits)
            # OS breakdown (top 10).
            os_data = data.get('os', {}).get('data', [])
            for item in os_data[:10]:
                os_name = item.get('data', 'unknown')
                hits = item.get('hits', 0)
                self.os_requests.labels(os=os_name).set(hits)
            # Geographic breakdown (top 15).
            geo_data = data.get('geo_location', {}).get('data', [])
            for item in geo_data[:15]:
                country = item.get('data', 'unknown')
                hits = item.get('hits', 0)
                self.geo_requests.labels(country=country).set(hits)
            # Referrer breakdown (top 10).
            referrers = data.get('referring_sites', {}).get('data', [])
            for item in referrers[:10]:
                referrer = item.get('data', 'unknown')
                hits = item.get('hits', 0)
                # Trim referrer names so label values stay bounded.
                referrer = referrer[:50] if len(referrer) > 50 else referrer
                self.referrer_requests.labels(referrer=referrer).set(hits)
            # Hour-of-day breakdown (all entries).
            visit_time = data.get('visit_time', {}).get('data', [])
            for item in visit_time:
                hour = item.get('data', 'unknown')
                hits = item.get('hits', 0)
                self.visit_time_requests.labels(hour=hour).set(hits)
            # Virtual-host breakdown (top 10).
            vhosts = data.get('virtual_hosts', {}).get('data', [])
            for item in vhosts[:10]:
                vhost = item.get('data', 'unknown')
                hits = item.get('hits', 0)
                self.vhost_requests.labels(vhost=vhost).set(hits)
            # Error rate: share of 4xx/5xx hits over all requests.
            total_requests = general.get('total_requests', 0)
            if total_requests > 0:
                error_requests = sum(
                    item.get('hits', 0) for item in status_codes
                    if item.get('data', '').startswith(('4', '5'))
                )
                error_rate = (error_requests / total_requests) * 100
                self.error_rate.set(error_rate)
            # Record the time of this successful update.
            self.exporter_last_update.set(time.time())
            logger.info(f"指标更新成功,总请求数: {general.get('total_requests', 0)}")
        except Exception as e:
            logger.error(f"更新指标时出错: {e}")
            self.exporter_errors_total.inc()

    def collect_loop(self):
        """Background loop: collect + update metrics every interval seconds."""
        while self.running:
            try:
                logger.info("开始收集GoAccess数据...")
                data = self.get_goaccess_data()
                self.update_metrics(data)
            except Exception as e:
                logger.error(f"收集数据时出错: {e}")
                self.exporter_errors_total.inc()
            # Sleep until the next collection pass.
            time.sleep(self.interval)

    def start(self):
        """Start the HTTP endpoint and the collection thread; blocks
        the caller until KeyboardInterrupt."""
        logger.info(f"启动GoAccess Prometheus导出器,端口: {self.port}")
        # Serve /metrics from the private registry.
        start_http_server(self.port, registry=self.registry)
        # Daemon thread so the process can exit without joining it.
        self.running = True
        collect_thread = threading.Thread(target=self.collect_loop)
        collect_thread.daemon = True
        collect_thread.start()
        logger.info(f"导出器已启动,指标地址: http://localhost:{self.port}/metrics")
        try:
            # Keep the main thread alive while the daemon collects.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("收到中断信号,正在停止...")
            self.stop()

    def stop(self):
        """Signal the collection loop to exit after its current pass."""
        self.running = False
        logger.info("导出器已停止")
def main():
    """Command-line entry point: parse options and run the exporter."""
    import argparse
    arg_parser = argparse.ArgumentParser(description='GoAccess Prometheus导出器')
    arg_parser.add_argument('--log-file', required=True, help='日志文件路径')
    arg_parser.add_argument('--port', type=int, default=8000, help='HTTP服务端口')
    arg_parser.add_argument('--interval', type=int, default=60, help='数据收集间隔(秒)')
    opts = arg_parser.parse_args()
    # Build and run the exporter; start() blocks until interrupted.
    exporter = GoAccessPrometheusExporter(
        log_file=opts.log_file,
        port=opts.port,
        interval=opts.interval,
    )
    exporter.start()


if __name__ == "__main__":
    main()
2.2 Grafana仪表板配置
{
"dashboard": {
"id": null,
"title": "GoAccess Web Analytics",
"tags": ["goaccess", "web", "analytics"],
"style": "dark",
"timezone": "browser",
"refresh": "30s",
"time": {
"from": "now-1h",
"to": "now"
},
"panels": [
{
"id": 1,
"title": "总请求数",
"type": "stat",
"gridPos": {"h": 8, "w": 6, "x": 0, "y": 0},
"targets": [
{
"expr": "goaccess_total_requests",
"legendFormat": "总请求数"
}
],
"fieldConfig": {
"defaults": {
"color": {"mode": "palette-classic"},
"unit": "short",
"thresholds": {
"steps": [
{"color": "green", "value": null},
{"color": "yellow", "value": 1000},
{"color": "red", "value": 10000}
]
}
}
}
},
{
"id": 2,
"title": "独立访客",
"type": "stat",
"gridPos": {"h": 8, "w": 6, "x": 6, "y": 0},
"targets": [
{
"expr": "goaccess_unique_visitors",
"legendFormat": "独立访客"
}
]
},
{
"id": 3,
"title": "带宽使用",
"type": "stat",
"gridPos": {"h": 8, "w": 6, "x": 12, "y": 0},
"targets": [
{
"expr": "goaccess_bandwidth_bytes",
"legendFormat": "带宽"
}
],
"fieldConfig": {
"defaults": {
"unit": "bytes"
}
}
},
{
"id": 4,
"title": "错误率",
"type": "stat",
"gridPos": {"h": 8, "w": 6, "x": 18, "y": 0},
"targets": [
{
"expr": "goaccess_error_rate",
"legendFormat": "错误率"
}
],
"fieldConfig": {
"defaults": {
"unit": "percent",
"thresholds": {
"steps": [
{"color": "green", "value": null},
{"color": "yellow", "value": 5},
{"color": "red", "value": 10}
]
}
}
}
},
{
"id": 5,
"title": "请求趋势",
"type": "timeseries",
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 8},
"targets": [
{
"expr": "rate(goaccess_total_requests[5m]) * 60",
"legendFormat": "每分钟请求数"
}
]
},
{
"id": 6,
"title": "状态码分布",
"type": "piechart",
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 8},
"targets": [
{
"expr": "goaccess_status_code_requests",
"legendFormat": "{{status_code}}"
}
]
},
{
"id": 7,
"title": "热门页面",
"type": "barchart",
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 16},
"targets": [
{
"expr": "topk(10, goaccess_page_requests)",
"legendFormat": "{{page}}"
}
]
},
{
"id": 8,
"title": "浏览器分布",
"type": "piechart",
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 16},
"targets": [
{
"expr": "goaccess_browser_requests",
"legendFormat": "{{browser}}"
}
]
},
{
"id": 9,
"title": "地理分布",
"type": "geomap",
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 24},
"targets": [
{
"expr": "goaccess_geo_requests",
"legendFormat": "{{country}}"
}
]
},
{
"id": 10,
"title": "访问时间分布",
"type": "barchart",
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 24},
"targets": [
{
"expr": "goaccess_visit_time_requests",
"legendFormat": "{{hour}}时"
}
]
}
]
}
}
3. Splunk集成
3.1 Splunk数据输入配置
# /opt/splunk/etc/apps/goaccess/local/inputs.conf
[monitor:///var/log/goaccess/*.json]
disabled = false
index = goaccess
sourcetype = goaccess:json
host_segment = 4
[monitor:///var/log/nginx/access.log]
disabled = false
index = web_logs
sourcetype = nginx:access
host_segment = 4
# 实时监控
[monitor:///var/log/goaccess/realtime.json]
disabled = false
index = goaccess_realtime
sourcetype = goaccess:realtime
host_segment = 4
followTail = 1
3.2 Splunk字段提取配置
# /opt/splunk/etc/apps/goaccess/local/props.conf
[goaccess:json]
KV_MODE = json
TIME_PREFIX = "timestamp":"
TIME_FORMAT = %Y-%m-%dT%H:%M:%S
MAX_TIMESTAMP_LOOKAHEAD = 25
SHOULD_LINEMERGE = false
TRUNCATE = 10000
# 字段提取
EXTRACT-general_stats = "general":\s*{[^}]*"total_requests":\s*(?<total_requests>\d+)[^}]*"unique_visitors":\s*(?<unique_visitors>\d+)[^}]*"bandwidth":\s*(?<bandwidth>\d+)
EXTRACT-status_codes = "status_codes":\s*{[^}]*"data":\s*\[(?<status_code_data>[^\]]+)\]
EXTRACT-requests = "requests":\s*{[^}]*"data":\s*\[(?<request_data>[^\]]+)\]
[goaccess:realtime]
KV_MODE = json
TIME_PREFIX = timestamp
TIME_FORMAT = %Y-%m-%dT%H:%M:%S
SHOULD_LINEMERGE = false
[nginx:access]
REPORT-nginx_access = nginx_access_extract
TIME_FORMAT = %d/%b/%Y:%H:%M:%S %z
MAX_TIMESTAMP_LOOKAHEAD = 30
3.3 Splunk搜索和仪表板
```python
#!/usr/bin/env python3
# splunk_dashboard_creator.py - 创建Splunk仪表板
import json
import requests
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SplunkDashboardCreator:
    def __init__(self, splunk_url, username, password):
        self.splunk_url = splunk_url.rstrip('/')
        self.username = username
        self.password = password
        self.session_key = None
        # 获取会话密钥
        self.authenticate()
def authenticate(self):
    """Log in to Splunk and cache the session key for later API calls.

    Fix: the original did ``root.find('.//sessionKey').text`` — when the
    response carries no <sessionKey> element, find() returns None and
    the AttributeError masks the real failure. We now raise a clear
    RuntimeError instead.

    Raises:
        RuntimeError: if the login response contains no session key.
        Exception: re-raised after logging on any request/parse failure.
    """
    auth_url = f"{self.splunk_url}/services/auth/login"
    data = {
        'username': self.username,
        'password': self.password
    }
    try:
        # SECURITY: verify=False disables TLS certificate validation.
        # Acceptable only for self-signed lab instances; pass a CA
        # bundle or enable verification in production.
        response = requests.post(auth_url, data=data, verify=False)
        response.raise_for_status()
        # The login endpoint answers with XML containing <sessionKey>.
        import xml.etree.ElementTree as ET
        root = ET.fromstring(response.text)
        key_node = root.find('.//sessionKey')
        if key_node is None or not key_node.text:
            raise RuntimeError("Splunk登录响应中未找到sessionKey")
        self.session_key = key_node.text
        logger.info("Splunk认证成功")
    except Exception as e:
        logger.error(f"Splunk认证失败: {e}")
        raise
def create_saved_search(self, name, search_query, description=""):
    """Create a Splunk saved search (an existing one is accepted).

    Args:
        name: Saved-search name.
        search_query: SPL query text.
        description: Optional free-text description.
    """
    endpoint = f"{self.splunk_url}/services/saved/searches"
    request_headers = {
        'Authorization': f'Splunk {self.session_key}',
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    payload = {
        'name': name,
        'search': search_query,
        'description': description,
        'is_visible': '1',
        'is_scheduled': '0',
    }
    try:
        response = requests.post(endpoint, data=payload, headers=request_headers, verify=False)
        # 409 means the saved search already exists — treated as success.
        if response.status_code in (200, 201, 409):
            logger.info(f"保存的搜索 '{name}' 创建成功")
        else:
            logger.error(f"创建保存的搜索失败: {response.text}")
    except Exception as e:
        logger.error(f"创建保存的搜索时出错: {e}")
def create_dashboard(self):
    """Create the GoAccess analytics dashboard view in Splunk.

    Fix: the original closed the XML string with typographic quotes
    ("“’"), a syntax error from transcription; the ``'''`` delimiter is
    restored. XML content is otherwise unchanged.

    NOTE(review): only <panel> fragments are present; Splunk SimpleXML
    normally requires a <dashboard>/<form> root with <row> wrappers —
    confirm the surrounding markup was not lost in transcription.
    """
    dashboard_xml = '''
<panel>
<title>错误率趋势</title>
<chart>
<search>
<query>
index=goaccess sourcetype=goaccess:json
| eval status_codes=spath(_raw, "status_codes.data{}")
| mvexpand status_codes
| eval status_code=spath(status_codes, "data")
| eval hits=spath(status_codes, "hits")
| eval is_error=if(match(status_code, "^[45]"), hits, 0)
| stats sum(hits) as total_hits, sum(is_error) as error_hits by _time
| eval error_rate=round((error_hits/total_hits)*100, 2)
| timechart span=5m avg(error_rate) as "错误率(%)"
</query>
<earliest>-4h@h</earliest>
<latest>now</latest>
<refresh>1m</refresh>
</search>
<option name="charting.chart">line</option>
<option name="charting.axisTitleY.text">错误率 (%)</option>
<option name="charting.legend.placement">bottom</option>
</chart>
</panel>
<panel>
<title>热门页面</title>
<table>
<search>
<query>
index=goaccess sourcetype=goaccess:json
| eval requests=spath(_raw, "requests.data{}")
| mvexpand requests
| eval page=spath(requests, "data")
| eval hits=spath(requests, "hits")
| eval visitors=spath(requests, "visitors")
| eval bandwidth=spath(requests, "bandwidth")
| stats sum(hits) as "访问次数", sum(visitors) as "独立访客", sum(bandwidth) as "带宽" by page
| sort -"访问次数"
| head 20
| rename page as "页面"
</query>
<earliest>-1h@h</earliest>
<latest>now</latest>
<refresh>30s</refresh>
</search>
<option name="drilldown">cell</option>
<option name="refresh.display">progressbar</option>
</table>
</panel>
<panel>
<title>地理分布</title>
<map>
<search>
<query>
index=goaccess sourcetype=goaccess:json
| eval geo_location=spath(_raw, "geo_location.data{}")
| mvexpand geo_location
| eval country=spath(geo_location, "data")
| eval hits=spath(geo_location, "hits")
| stats sum(hits) as count by country
| sort -count
| head 20
| geom geo_countries featureIdField=country
</query>
<earliest>-1h@h</earliest>
<latest>now</latest>
<refresh>30s</refresh>
</search>
<option name="mapping.type">choropleth</option>
<option name="mapping.choroplethLayer.colorMode">auto</option>
<option name="mapping.choroplethLayer.maximumColor">0xDB5800</option>
<option name="mapping.choroplethLayer.minimumColor">0x2F25BA</option>
</map>
</panel>
<panel>
<title>引荐来源</title>
<table>
<search>
<query>
index=goaccess sourcetype=goaccess:json
| eval referring_sites=spath(_raw, "referring_sites.data{}")
| mvexpand referring_sites
| eval referrer=spath(referring_sites, "data")
| eval hits=spath(referring_sites, "hits")
| stats sum(hits) as "访问次数" by referrer
| sort -"访问次数"
| head 15
| rename referrer as "引荐网站"
</query>
<earliest>-1h@h</earliest>
<latest>now</latest>
<refresh>30s</refresh>
</search>
<option name="drilldown">cell</option>
</table>
</panel>
'''
    url = f"{self.splunk_url}/services/data/ui/views"
    headers = {
        'Authorization': f'Splunk {self.session_key}',
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    data = {
        'name': 'goaccess_analytics',
        'eai:data': dashboard_xml
    }
    try:
        response = requests.post(url, data=data, headers=headers, verify=False)
        # 409 means the view already exists — treated as success.
        if response.status_code in [200, 201, 409]:
            logger.info("GoAccess仪表板创建成功")
            logger.info(f"访问地址: {self.splunk_url}/app/search/goaccess_analytics")
        else:
            logger.error(f"创建仪表板失败: {response.text}")
    except Exception as e:
        logger.error(f"创建仪表板时出错: {e}")
def create_alerts(self):
    """Create the GoAccess alert saved searches and schedule them."""
    # Alert definitions: SPL query plus scheduling metadata.
    alerts = [
        {
            'name': 'goaccess_high_error_rate',
            'search': '''
                index=goaccess sourcetype=goaccess:json
                | eval status_codes=spath(_raw, "status_codes.data{}")
                | mvexpand status_codes
                | eval status_code=spath(status_codes, "data")
                | eval hits=spath(status_codes, "hits")
                | eval is_error=if(match(status_code, "^[45]"), hits, 0)
                | stats sum(hits) as total_hits, sum(is_error) as error_hits
                | eval error_rate=(error_hits/total_hits)*100
                | where error_rate > 5
            ''',
            'description': '错误率超过5%时触发告警',
            'cron_schedule': '*/5 * * * *',
            'actions': 'email'
        },
        {
            'name': 'goaccess_high_traffic',
            'search': '''
                index=goaccess sourcetype=goaccess:json
                | eval total_requests=spath(_raw, "general.total_requests")
                | where total_requests > 10000
            ''',
            'description': '总请求数超过10000时触发告警',
            'cron_schedule': '*/10 * * * *',
            'actions': 'email'
        }
    ]
    for alert in alerts:
        # Create the saved search first (idempotent on 409) ...
        self.create_saved_search(
            name=alert['name'],
            search_query=alert['search'],
            description=alert['description']
        )
        # ... then turn it into a scheduled alert.
        self.schedule_alert(alert)
def schedule_alert(self, alert_config):
    """Enable scheduling + email action on an existing saved search.

    Args:
        alert_config: dict with 'name', 'cron_schedule' and 'actions'
            keys (as built by create_alerts).
    """
    url = f"{self.splunk_url}/services/saved/searches/{alert_config['name']}"
    headers = {
        'Authorization': f'Splunk {self.session_key}',
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    # NOTE(review): the email body references $result.error_rate$ for
    # every alert, including ones whose search has no error_rate field —
    # confirm the message template should vary per alert.
    data = {
        'is_scheduled': '1',
        'cron_schedule': alert_config['cron_schedule'],
        'actions': alert_config['actions'],
        'action.email.to': 'admin@example.com',
        'action.email.subject': f"Splunk Alert: {alert_config['name']}",
        'action.email.message.alert': '$result.error_rate$% error rate detected'
    }
    try:
        # verify=False: TLS validation disabled (self-signed Splunk certs).
        response = requests.post(url, data=data, headers=headers, verify=False)
        if response.status_code == 200:
            logger.info(f"告警 '{alert_config['name']}' 调度配置成功")
        else:
            logger.error(f"配置告警调度失败: {response.text}")
    except Exception as e:
        logger.error(f"配置告警调度时出错: {e}")
def setup_all(self):
"""执行完整设置"""