1. ELK Stack集成

1.1 Logstash集成

# /etc/logstash/conf.d/goaccess.conf
# Logstash pipeline for GoAccess data.
# Each input is tagged so the filter section can branch on it:
#   "goaccess"          -> GoAccess JSON statistics, indexed as goaccess-stats-YYYY.MM.dd
#   "nginx", "access"   -> raw Nginx access-log lines, indexed as nginx-access-YYYY.MM.dd

input {
  # Read the GoAccess JSON output.
  # NOTE(review): the file input emits one event per line, so the json codec
  # only works if the report is written as single-line JSON; a pretty-printed
  # (multi-line) report would need a multiline codec — confirm.
  # start_position only matters for files Logstash has not tracked before.
  file {
    path => "/var/log/goaccess/goaccess.json"
    start_position => "beginning"
    codec => "json"
    tags => ["goaccess"]
  }
  
  # Read the raw Nginx access log.
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    tags => ["nginx", "access"]
  }
}

filter {
  if "goaccess" in [tags] {
    # GoAccess statistics: route to the goaccess-stats-* indices.
    mutate {
      add_field => { "[@metadata][index]" => "goaccess-stats" }
    }
    
    # Ensure a timestamp exists: fall back to the ingest time when the
    # GoAccess event carries no "timestamp" field.
    if ![timestamp] {
      ruby {
        code => "event.set('timestamp', Time.now.iso8601)"
      }
    }
    
    # Parse "timestamp" into @timestamp (the date filter's default target).
    date {
      match => [ "timestamp", "ISO8601" ]
    }
  }
  
  if "nginx" in [tags] {
    # Parse the access-log line with the combined log format pattern.
    # NOTE(review): the field names used below (clientip, agent, response,
    # bytes) are the legacy, non-ECS grok output; with ECS compatibility
    # enabled grok emits ECS field names instead — confirm the mode in use.
    grok {
      match => { 
        "message" => "%{COMBINEDAPACHELOG}" 
      }
    }
    
    # Parse the User-Agent string.
    useragent {
      source => "agent"
    }
    
    # GeoIP lookup on the client address; results are stored under [geoip].
    geoip {
      source => "clientip"
      target => "geoip"
    }
    
    # Convert to integers so the numeric comparisons below work.
    mutate {
      convert => { "response" => "integer" }
      convert => { "bytes" => "integer" }
    }
    
    # Tag every 4xx/5xx response as "error"...
    if [response] >= 400 {
      mutate {
        add_tag => [ "error" ]
      }
    }
    
    # ...and 5xx responses additionally as "server_error".
    if [response] >= 500 {
      mutate {
        add_tag => [ "server_error" ]
      }
    }
    
    # Tag bots: case-insensitive match on the User-Agent string.
    if [agent] =~ /bot|crawler|spider/i {
      mutate {
        add_tag => [ "bot" ]
      }
    }
    
    # Route to the nginx-access-* indices.
    mutate {
      add_field => { "[@metadata][index]" => "nginx-access" }
    }
  }
}

output {
  # Daily index named after the route chosen in the filter section,
  # e.g. goaccess-stats-2024.01.31 or nginx-access-2024.01.31.
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][index]}-%{+YYYY.MM.dd}"
  }
  
  # Debug output: only active when [@metadata][debug] is set, and nothing
  # in this pipeline sets it.
  if [@metadata][debug] {
    stdout {
      codec => rubydebug
    }
  }
}

1.2 Elasticsearch索引模板

{
  "index_patterns": ["goaccess-*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "index.refresh_interval": "5s",
      "index.codec": "best_compression"
    },
    "mappings": {
      "properties": {
        "timestamp": {
          "type": "date"
        },
        "general": {
          "properties": {
            "total_requests": {
              "type": "long"
            },
            "unique_visitors": {
              "type": "long"
            },
            "bandwidth": {
              "type": "long"
            },
            "log_size": {
              "type": "long"
            }
          }
        },
        "requests": {
          "properties": {
            "data": {
              "type": "keyword"
            },
            "hits": {
              "type": "long"
            },
            "visitors": {
              "type": "long"
            },
            "bandwidth": {
              "type": "long"
            }
          }
        },
        "status_codes": {
          "properties": {
            "data": {
              "type": "keyword"
            },
            "hits": {
              "type": "long"
            }
          }
        },
        "hosts": {
          "properties": {
            "data": {
              "type": "ip"
            },
            "hits": {
              "type": "long"
            }
          }
        },
        "os": {
          "properties": {
            "data": {
              "type": "keyword"
            },
            "hits": {
              "type": "long"
            }
          }
        },
        "browsers": {
          "properties": {
            "data": {
              "type": "keyword"
            },
            "hits": {
              "type": "long"
            }
          }
        },
        "visit_time": {
          "properties": {
            "data": {
              "type": "keyword"
            },
            "hits": {
              "type": "long"
            }
          }
        },
        "virtual_hosts": {
          "properties": {
            "data": {
              "type": "keyword"
            },
            "hits": {
              "type": "long"
            }
          }
        },
        "referrers": {
          "properties": {
            "data": {
              "type": "keyword"
            },
            "hits": {
              "type": "long"
            }
          }
        },
        "referring_sites": {
          "properties": {
            "data": {
              "type": "keyword"
            },
            "hits": {
              "type": "long"
            }
          }
        },
        "keyphrases": {
          "properties": {
            "data": {
              "type": "text",
              "analyzer": "standard"
            },
            "hits": {
              "type": "long"
            }
          }
        },
        "geo_location": {
          "properties": {
            "data": {
              "type": "keyword"
            },
            "hits": {
              "type": "long"
            }
          }
        }
      }
    }
  }
}

1.3 Kibana仪表板配置

#!/usr/bin/env python3
# kibana_dashboard_setup.py - 自动创建Kibana仪表板

import json
import requests
from datetime import datetime
import logging

# Send INFO-and-above records to the root handler (stderr by default); the
# Kibana setup methods report their progress through this module logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class KibanaDashboardSetup:
    """Create the GoAccess index patterns, visualizations and dashboard in Kibana.

    Every object is created through Kibana's saved-objects HTTP API with a
    fixed id, so re-running the setup reports already-existing objects
    (HTTP 409) instead of creating duplicates.
    """

    def __init__(self, kibana_url="http://localhost:5601", timeout=30):
        """
        Args:
            kibana_url: Base URL of the Kibana server.
            timeout: Seconds to wait for each HTTP request before giving up.
        """
        # Drop a trailing slash so the f"{kibana_url}/api/..." URLs built
        # below never contain "//".
        self.kibana_url = kibana_url.rstrip('/')
        self.timeout = timeout
        self.headers = {
            'Content-Type': 'application/json',
            # Kibana refuses write requests to its API without this header.
            'kbn-xsrf': 'true'
        }

    def _post_saved_object(self, object_type, object_id, attributes, label):
        """Create one saved object and report whether it now exists in Kibana.

        Args:
            object_type: Saved-object type used in the URL, e.g. "visualization".
            object_id: Fixed id of the object.
            attributes: The object's "attributes" payload.
            label: Human-readable name used in log messages.

        Returns:
            True when Kibana answered 200 (created) or 409 (already exists),
            False for any other status or a transport error.
        """
        url = f"{self.kibana_url}/api/saved_objects/{object_type}/{object_id}"
        try:
            response = requests.post(
                url,
                json={"attributes": attributes},
                headers=self.headers,
                # Without a timeout an unreachable Kibana hangs the script.
                timeout=self.timeout,
            )
        except requests.RequestException as e:
            logger.error(f"创建{label}时出错: {e}")
            return False

        if response.status_code == 200:
            logger.info(f"{label} 创建成功")
            return True
        if response.status_code == 409:
            # An object with this id already exists; it is left unchanged.
            logger.info(f"{label} 已存在,跳过")
            return True
        logger.error(f"创建{label}失败: {response.text}")
        return False

    def create_index_pattern(self):
        """Create index patterns for the goaccess-stats-* and nginx-access-* indices."""
        index_patterns = [
            {
                "id": "goaccess-stats",
                "title": "goaccess-stats-*",
                "timeFieldName": "timestamp"
            },
            {
                "id": "nginx-access",
                "title": "nginx-access-*",
                "timeFieldName": "@timestamp"
            }
        ]

        for pattern in index_patterns:
            self._post_saved_object(
                "index-pattern",
                pattern['id'],
                {
                    "title": pattern['title'],
                    "timeFieldName": pattern['timeFieldName']
                },
                f"索引模式 {pattern['id']}",
            )

    def create_visualizations(self):
        """Create every GoAccess visualization used by the dashboard."""
        visualizations = [
            {
                "id": "goaccess-requests-timeline",
                "title": "请求时间线",
                "type": "line",
                "config": {
                    "view": "timeseries",
                    "aggs": [
                        {
                            "id": "1",
                            "type": "sum",
                            "schema": "metric",
                            "params": {
                                "field": "general.total_requests"
                            }
                        },
                        {
                            "id": "2",
                            "type": "date_histogram",
                            "schema": "segment",
                            "params": {
                                "field": "timestamp",
                                "interval": "auto"
                            }
                        }
                    ]
                }
            },
            {
                "id": "goaccess-status-codes",
                "title": "状态码分布",
                "type": "pie",
                "config": {
                    "aggs": [
                        {
                            "id": "1",
                            "type": "sum",
                            "schema": "metric",
                            "params": {
                                "field": "status_codes.hits"
                            }
                        },
                        {
                            "id": "2",
                            "type": "terms",
                            "schema": "segment",
                            "params": {
                                "field": "status_codes.data",
                                "size": 10
                            }
                        }
                    ]
                }
            },
            {
                "id": "goaccess-top-pages",
                "title": "热门页面",
                "type": "horizontal_bar",
                "config": {
                    "aggs": [
                        {
                            "id": "1",
                            "type": "sum",
                            "schema": "metric",
                            "params": {
                                "field": "requests.hits"
                            }
                        },
                        {
                            "id": "2",
                            "type": "terms",
                            "schema": "segment",
                            "params": {
                                "field": "requests.data",
                                "size": 20
                            }
                        }
                    ]
                }
            },
            {
                # NOTE(review): "geoip.location" is produced by the nginx
                # pipeline's geoip filter, but every visualization searches
                # the goaccess-stats index pattern — confirm this map has data.
                "id": "goaccess-geo-map",
                "title": "访问者地理分布",
                "type": "tile_map",
                "config": {
                    "aggs": [
                        {
                            "id": "1",
                            "type": "sum",
                            "schema": "metric",
                            "params": {
                                "field": "geo_location.hits"
                            }
                        },
                        {
                            "id": "2",
                            "type": "geohash_grid",
                            "schema": "segment",
                            "params": {
                                "field": "geoip.location",
                                "precision": 3
                            }
                        }
                    ]
                }
            },
            {
                "id": "goaccess-browsers",
                "title": "浏览器分布",
                "type": "pie",
                "config": {
                    "aggs": [
                        {
                            "id": "1",
                            "type": "sum",
                            "schema": "metric",
                            "params": {
                                "field": "browsers.hits"
                            }
                        },
                        {
                            "id": "2",
                            "type": "terms",
                            "schema": "segment",
                            "params": {
                                "field": "browsers.data",
                                "size": 10
                            }
                        }
                    ]
                }
            },
            {
                "id": "goaccess-os-distribution",
                "title": "操作系统分布",
                "type": "pie",
                "config": {
                    "aggs": [
                        {
                            "id": "1",
                            "type": "sum",
                            "schema": "metric",
                            "params": {
                                "field": "os.hits"
                            }
                        },
                        {
                            "id": "2",
                            "type": "terms",
                            "schema": "segment",
                            "params": {
                                "field": "os.data",
                                "size": 10
                            }
                        }
                    ]
                }
            }
        ]

        for viz in visualizations:
            self.create_visualization(viz)

    @staticmethod
    def _visualization_attributes(viz_config):
        """Build the saved-object attributes for one visualization config.

        Args:
            viz_config: Dict with "title", "type" and "config"; "config" may
                contain an "aggs" list plus any other visualization params.

        Returns:
            The "attributes" dict expected by the saved-objects API.
        """
        config = viz_config['config']
        # Aggregations live in visState's top-level "aggs" list; everything
        # else in the config is a visualization parameter.
        params = {key: value for key, value in config.items() if key != 'aggs'}
        vis_state = {
            "title": viz_config['title'],
            "type": viz_config['type'],
            "params": params,
            "aggs": config.get('aggs', [])
        }
        return {
            "title": viz_config['title'],
            "visState": json.dumps(vis_state),
            "uiStateJSON": "{}",
            "description": "",
            "version": 1,
            "kibanaSavedObjectMeta": {
                "searchSourceJSON": json.dumps({
                    "index": "goaccess-stats",
                    "query": {
                        "match_all": {}
                    },
                    "filter": []
                })
            }
        }

    def create_visualization(self, viz_config):
        """Create a single visualization from one create_visualizations entry."""
        self._post_saved_object(
            "visualization",
            viz_config['id'],
            self._visualization_attributes(viz_config),
            f"可视化图表 {viz_config['title']}",
        )

    def create_dashboard(self):
        """Create the dashboard that lays out the six GoAccess visualizations."""
        dashboard_id = "goaccess-main-dashboard"

        # Two columns (w=24 each on Kibana's 48-column grid), three rows.
        # NOTE(review): newer Kibana versions reference panels through
        # panelIndex/panelRefName plus a "references" list; this id-based
        # layout is the older format — confirm against the target version.
        panels = [
            {
                "gridData": {"x": 0, "y": 0, "w": 24, "h": 15},
                "id": "goaccess-requests-timeline",
                "type": "visualization"
            },
            {
                "gridData": {"x": 24, "y": 0, "w": 24, "h": 15},
                "id": "goaccess-status-codes",
                "type": "visualization"
            },
            {
                "gridData": {"x": 0, "y": 15, "w": 24, "h": 15},
                "id": "goaccess-top-pages",
                "type": "visualization"
            },
            {
                "gridData": {"x": 24, "y": 15, "w": 24, "h": 15},
                "id": "goaccess-geo-map",
                "type": "visualization"
            },
            {
                "gridData": {"x": 0, "y": 30, "w": 24, "h": 15},
                "id": "goaccess-browsers",
                "type": "visualization"
            },
            {
                "gridData": {"x": 24, "y": 30, "w": 24, "h": 15},
                "id": "goaccess-os-distribution",
                "type": "visualization"
            }
        ]

        attributes = {
            "title": "GoAccess Web分析仪表板",
            "hits": 0,
            "description": "基于GoAccess数据的综合Web分析仪表板",
            "panelsJSON": json.dumps(panels),
            "optionsJSON": json.dumps({
                "useMargins": True,
                "syncColors": False,
                "hidePanelTitles": False
            }),
            "version": 1,
            "timeRestore": False,
            "kibanaSavedObjectMeta": {
                "searchSourceJSON": json.dumps({
                    "query": {
                        "match_all": {}
                    },
                    "filter": []
                })
            }
        }

        if self._post_saved_object("dashboard", dashboard_id, attributes, "GoAccess仪表板"):
            logger.info(f"访问地址: {self.kibana_url}/app/kibana#/dashboard/{dashboard_id}")

    def setup_all(self):
        """Create every Kibana object in dependency order.

        Index patterns come first (visualizations search "goaccess-stats"),
        then the visualizations (dashboard panels reference their ids), and
        the dashboard last.
        """
        logger.info("开始设置Kibana GoAccess集成...")

        self.create_index_pattern()
        self.create_visualizations()
        self.create_dashboard()

        logger.info("Kibana GoAccess集成设置完成")

def main():
    """Command-line entry point: create the GoAccess objects in Kibana.

    Options:
        --kibana-url  Base URL of the Kibana server (default http://localhost:5601).
    """
    import argparse

    parser = argparse.ArgumentParser(description='Kibana GoAccess仪表板设置')
    parser.add_argument(
        '--kibana-url',
        default='http://localhost:5601',
        help='Kibana服务器URL'
    )

    args = parser.parse_args()

    setup = KibanaDashboardSetup(kibana_url=args.kibana_url)
    setup.setup_all()


if __name__ == "__main__":
    main()

4.2 Redis集成

#!/usr/bin/env python3
# redis_integration.py - GoAccess Redis集成

import json
import redis
import subprocess
from datetime import datetime, timedelta
import logging
import time
import hashlib

# Send INFO-and-above records to the root handler (stderr by default); the
# Redis integration reports progress and failures through this module logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GoAccessRedisIntegration:
    """Run GoAccess on an access log and keep its statistics in Redis.

    Keys written (all prefixed with "goaccess:"):
      summary:<iso-ts>           hash of totals plus error_rate, 7-day TTL
      summary:index              zset of summary keys, scored by epoch seconds
      requests:<iso-ts>:<rank>   hash for one of the top requests, 3-day TTL
      requests:index:<iso-ts>    zset of that run's request keys, scored by hits
      requests:index             zset of the per-run index keys, scored by epoch
      realtime:<metric>          zset of "<epoch>:<value>" samples, scored by epoch
      current_status             hash with the latest totals
      geo:<iso-ts>               hash location -> hits, 7-day TTL
      geo:index                  zset of geo keys, scored by epoch seconds
    """

    # Retention periods (seconds) and per-run item limits.
    SUMMARY_TTL = 7 * 24 * 3600
    REQUESTS_TTL = 3 * 24 * 3600
    GEO_TTL = 7 * 24 * 3600
    REALTIME_WINDOW = 3600
    TOP_REQUESTS = 50
    TOP_LOCATIONS = 30
    # Master index of the per-run "goaccess:requests:index:<ts>" zsets:
    # store_requests_data writes it, get_top_requests reads it.
    REQUESTS_INDEX_KEY = 'goaccess:requests:index'

    def __init__(self, redis_config, log_file):
        """
        Args:
            redis_config: Keyword arguments passed to redis.Redis.
            log_file: Path of the access log handed to GoAccess.

        Raises:
            Whatever redis.Redis()/ping() raises when Redis is unreachable.
        """
        self.redis_config = redis_config
        self.log_file = log_file
        self.redis_client = None

        self.connect_redis()

    def connect_redis(self):
        """Create the Redis client and verify it with PING; log and re-raise on failure."""
        try:
            self.redis_client = redis.Redis(**self.redis_config)
            self.redis_client.ping()
            logger.info("Redis连接成功")
        except Exception as e:
            logger.error(f"连接Redis失败: {e}")
            raise

    @staticmethod
    def _count(value):
        """Return an integer count from a GoAccess metric value.

        Accepts a plain number or an object such as {"count": 12,
        "percent": "3.4%"}. NOTE(review): which form appears depends on the
        GoAccess version. Missing or unparsable values count as 0.
        """
        if isinstance(value, dict):
            value = value.get('count', 0)
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            return 0

    @staticmethod
    def _decode(value):
        """Return *value* as str when Redis returned bytes (decode_responses=False)."""
        return value.decode() if isinstance(value, bytes) else value

    @staticmethod
    def _format_sample(timestamp, value):
        """Encode a time-series sample as the sorted-set member "<epoch>:<value>"."""
        return f"{timestamp}:{value}"

    @staticmethod
    def _parse_sample(member):
        """Return the float value of a "<epoch>:<value>" member, or None if malformed."""
        text = member.decode() if isinstance(member, bytes) else str(member)
        _, separator, raw_value = text.partition(':')
        if not separator:
            return None
        try:
            return float(raw_value)
        except ValueError:
            return None

    def _goaccess_command(self):
        """Build the GoAccess command line that prints a JSON report to stdout."""
        return [
            'goaccess', self.log_file,
            '--log-format=COMBINED',
            # Request JSON explicitly: without an output format GoAccess
            # writes an HTML report when stdout is not a terminal, and
            # json.loads cannot parse that.
            '-o', 'json',
            '--json-pretty-print'
        ]

    def get_goaccess_data(self):
        """Run GoAccess on self.log_file and return the parsed JSON report.

        Returns:
            The report as a dict, or None if GoAccess failed, timed out
            (60 s) or produced output that is not valid JSON.
        """
        try:
            result = subprocess.run(
                self._goaccess_command(),
                capture_output=True,
                text=True,
                timeout=60
            )
            if result.returncode != 0:
                logger.error(f"GoAccess执行失败: {result.stderr}")
                return None
            return json.loads(result.stdout)

        except Exception as e:
            logger.error(f"获取GoAccess数据时出错: {e}")
            return None

    def store_summary_data(self, data):
        """Store overall totals and the 4xx/5xx error rate as a summary hash.

        Args:
            data: Parsed GoAccess report.
        """
        try:
            general = data.get('general', {})
            timestamp = datetime.now().isoformat()
            summary_key = f"goaccess:summary:{timestamp}"

            total_requests = self._count(general.get('total_requests', 0))
            summary_data = {
                'timestamp': timestamp,
                'total_requests': total_requests,
                'unique_visitors': self._count(general.get('unique_visitors', 0)),
                'bandwidth': self._count(general.get('bandwidth', 0)),
                'log_size': self._count(general.get('log_size', 0))
            }

            # Error rate: hits of status groups starting with "4" or "5",
            # as a percentage of all requests.
            status_codes = data.get('status_codes', {}).get('data', [])
            error_requests = sum(
                self._count(item.get('hits', 0))
                for item in status_codes
                if str(item.get('data', '')).startswith(('4', '5'))
            )
            error_rate = (error_requests / total_requests * 100) if total_requests > 0 else 0
            summary_data['error_rate'] = round(error_rate, 2)

            self.redis_client.hset(summary_key, mapping=summary_data)
            self.redis_client.expire(summary_key, self.SUMMARY_TTL)
            self.redis_client.zadd('goaccess:summary:index', {summary_key: time.time()})

            logger.info(f"汇总数据存储成功: {summary_key}")

        except Exception as e:
            logger.error(f"存储汇总数据失败: {e}")

    def store_requests_data(self, data):
        """Store the top requests of this run and register them for get_top_requests.

        Args:
            data: Parsed GoAccess report.
        """
        try:
            top_requests = data.get('requests', {}).get('data', [])[:self.TOP_REQUESTS]
            if not top_requests:
                # ZADD with an empty mapping is an error, so skip empty runs.
                logger.info("没有请求数据需要存储")
                return

            timestamp = datetime.now().isoformat()
            requests_index_key = f"goaccess:requests:index:{timestamp}"
            ranking = {}

            for rank, item in enumerate(top_requests):
                request_key = f"goaccess:requests:{timestamp}:{rank}"
                hits = self._count(item.get('hits', 0))
                request_data = {
                    'timestamp': timestamp,
                    'request_path': item.get('data') or '',
                    'hits': hits,
                    'visitors': self._count(item.get('visitors', 0)),
                    # NOTE(review): GoAccess reports may call this metric
                    # "bytes"; "bandwidth" is still checked first.
                    'bandwidth': self._count(item.get('bandwidth', item.get('bytes', 0)))
                }

                self.redis_client.hset(request_key, mapping=request_data)
                self.redis_client.expire(request_key, self.REQUESTS_TTL)
                ranking[request_key] = hits

            # This run's request keys ranked by hits...
            self.redis_client.zadd(requests_index_key, ranking)
            self.redis_client.expire(requests_index_key, self.REQUESTS_TTL)
            # ...registered in the master index that get_top_requests
            # searches by time.
            self.redis_client.zadd(self.REQUESTS_INDEX_KEY, {requests_index_key: time.time()})

            logger.info(f"请求数据存储成功,共{len(top_requests)}条")

        except Exception as e:
            logger.error(f"存储请求数据失败: {e}")

    def store_realtime_metrics(self, data):
        """Append rate samples to the realtime zsets and refresh the current-status hash.

        Args:
            data: Parsed GoAccess report.
        """
        try:
            general = data.get('general', {})
            current_time = int(time.time())

            total_requests = self._count(general.get('total_requests', 0))
            unique_visitors = self._count(general.get('unique_visitors', 0))
            bandwidth = self._count(general.get('bandwidth', 0))

            # NOTE(review): the per-second rates assume the report covers one
            # hour of traffic — confirm against how the log is rotated.
            metrics = {
                'requests_per_second': total_requests / 3600,
                'unique_visitors': unique_visitors,
                'bandwidth_per_second': bandwidth / 3600
            }

            cutoff = current_time - self.REALTIME_WINDOW
            for metric_name, value in metrics.items():
                metric_key = f"goaccess:realtime:{metric_name}"
                # Score = sample time, so range queries and trimming work on
                # time. The member embeds the timestamp so that equal values
                # taken at different times stay distinct set members.
                self.redis_client.zadd(
                    metric_key,
                    {self._format_sample(current_time, value): current_time}
                )
                # Keep only the samples from the last REALTIME_WINDOW seconds.
                self.redis_client.zremrangebyscore(metric_key, 0, cutoff)

            status_data = {
                'last_update': datetime.now().isoformat(),
                'total_requests': total_requests,
                'unique_visitors': unique_visitors,
                'bandwidth': bandwidth
            }
            self.redis_client.hset("goaccess:current_status", mapping=status_data)

            logger.info("实时指标存储成功")

        except Exception as e:
            logger.error(f"存储实时指标失败: {e}")

    def store_geo_data(self, data):
        """Store hits per location for the top locations of this run.

        Args:
            data: Parsed GoAccess report.
        """
        try:
            # NOTE(review): GoAccess names this panel "geolocation";
            # "geo_location" is still checked first.
            geo_panel = data.get('geo_location') or data.get('geolocation') or {}
            geo_data = geo_panel.get('data', [])
            timestamp = datetime.now().isoformat()
            geo_key = f"goaccess:geo:{timestamp}"

            geo_stats = {}
            for item in geo_data[:self.TOP_LOCATIONS]:
                location = item.get('data') or 'Unknown'
                geo_stats[location] = self._count(item.get('hits', 0))

            if geo_stats:
                self.redis_client.hset(geo_key, mapping=geo_stats)
                self.redis_client.expire(geo_key, self.GEO_TTL)
                self.redis_client.zadd('goaccess:geo:index', {geo_key: time.time()})

            logger.info(f"地理位置数据存储成功,共{len(geo_stats)}个国家/地区")

        except Exception as e:
            logger.error(f"存储地理位置数据失败: {e}")

    def get_realtime_metrics(self, minutes=60):
        """Return the realtime samples recorded in the last *minutes* minutes.

        Returns:
            {metric_name: [{'timestamp': epoch, 'value': float}, ...]}, or {}
            on error.
        """
        try:
            current_time = int(time.time())
            start_time = current_time - (minutes * 60)

            metrics = {}
            for metric_name in ('requests_per_second', 'unique_visitors', 'bandwidth_per_second'):
                samples = self.redis_client.zrangebyscore(
                    f"goaccess:realtime:{metric_name}",
                    start_time, current_time, withscores=True
                )

                points = []
                for member, score in samples:
                    value = self._parse_sample(member)
                    if value is None:
                        # Not a "<epoch>:<value>" sample; skip it.
                        continue
                    points.append({'timestamp': int(score), 'value': value})
                metrics[metric_name] = points

            return metrics

        except Exception as e:
            logger.error(f"获取实时指标失败: {e}")
            return {}

    def get_current_status(self):
        """Return the latest totals hash as a str -> str dict ({} on error)."""
        try:
            status_data = self.redis_client.hgetall("goaccess:current_status")
            return {self._decode(k): self._decode(v) for k, v in status_data.items()}

        except Exception as e:
            logger.error(f"获取当前状态失败: {e}")
            return {}

    def get_top_requests(self, hours=24, limit=20):
        """Aggregate hits per path over the runs stored in the last *hours* hours.

        Returns:
            Up to *limit* dicts {'path', 'hits'}, highest hits first
            ([] on error or when limit <= 0).
        """
        if limit <= 0:
            return []

        try:
            current_time = time.time()
            start_time = current_time - (hours * 3600)

            index_keys = self.redis_client.zrangebyscore(
                self.REQUESTS_INDEX_KEY, start_time, current_time
            )

            request_stats = {}
            for index_key in index_keys:
                # Highest-hit request keys of that run; the run's zset may
                # already have expired, in which case this is empty.
                request_keys = self.redis_client.zrevrange(self._decode(index_key), 0, limit - 1)

                for request_key in request_keys:
                    raw = self.redis_client.hgetall(self._decode(request_key))
                    if not raw:
                        continue
                    fields = {self._decode(k): self._decode(v) for k, v in raw.items()}
                    path = fields.get('request_path', '')
                    request_stats[path] = request_stats.get(path, 0) + int(fields.get('hits', 0))

            ranked = sorted(request_stats.items(), key=lambda pair: pair[1], reverse=True)[:limit]
            return [{'path': path, 'hits': hits} for path, hits in ranked]

        except Exception as e:
            logger.error(f"获取热门请求失败: {e}")
            return []

    def get_geo_statistics(self, hours=24):
        """Aggregate hits per location over the last *hours* hours (top 20, [] on error)."""
        try:
            current_time = time.time()
            start_time = current_time - (hours * 3600)

            geo_keys = self.redis_client.zrangebyscore(
                'goaccess:geo:index', start_time, current_time
            )

            country_stats = {}
            for geo_key in geo_keys:
                geo_data = self.redis_client.hgetall(self._decode(geo_key))
                for country, hits in geo_data.items():
                    country = self._decode(country)
                    country_stats[country] = country_stats.get(country, 0) + int(hits)

            ranked = sorted(country_stats.items(), key=lambda pair: pair[1], reverse=True)[:20]
            return [{'country': country, 'hits': hits} for country, hits in ranked]

        except Exception as e:
            logger.error(f"获取地理位置统计失败: {e}")
            return []

    def process_and_store_data(self):
        """Run GoAccess once and store every derived data set."""
        logger.info("开始处理GoAccess数据...")

        data = self.get_goaccess_data()
        if not data:
            logger.warning("未获取到GoAccess数据")
            return

        try:
            self.store_summary_data(data)
            self.store_requests_data(data)
            self.store_realtime_metrics(data)
            self.store_geo_data(data)

            logger.info("数据处理和存储完成")

        except Exception as e:
            logger.error(f"处理数据时出错: {e}")

    def cleanup_old_data(self, days=7):
        """Trim index entries older than *days* days.

        The data keys themselves expire through their TTLs; this only removes
        the stale references from the time-ordered indices.
        """
        try:
            cutoff_time = time.time() - (days * 24 * 3600)

            removed_summary = self.redis_client.zremrangebyscore(
                'goaccess:summary:index', 0, cutoff_time
            )
            removed_geo = self.redis_client.zremrangebyscore(
                'goaccess:geo:index', 0, cutoff_time
            )
            removed_requests = self.redis_client.zremrangebyscore(
                self.REQUESTS_INDEX_KEY, 0, cutoff_time
            )

            logger.info(
                f"清理完成:删除{removed_summary}个汇总记录,{removed_geo}个地理位置记录,"
                f"{removed_requests}个请求索引记录"
            )

        except Exception as e:
            logger.error(f"清理旧数据失败: {e}")

    def get_dashboard_data(self):
        """Collect everything a dashboard needs in one dict ({} on error)."""
        try:
            return {
                'current_status': self.get_current_status(),
                'realtime_metrics': self.get_realtime_metrics(60),
                'top_requests': self.get_top_requests(24, 10),
                'geo_statistics': self.get_geo_statistics(24)
            }

        except Exception as e:
            logger.error(f"获取仪表板数据失败: {e}")
            return {}

def main():
    """Run one collect -> store -> report -> cleanup cycle against local Redis."""
    # Keep raw bytes (decode_responses=False): the reader methods of the
    # integration decode the values they get back from Redis.
    redis_config = dict(
        host='localhost',
        port=6379,
        db=0,
        decode_responses=False,
    )

    integration = GoAccessRedisIntegration(redis_config, '/var/log/nginx/access.log')

    try:
        integration.process_and_store_data()

        report = integration.get_dashboard_data()
        if report:
            print("仪表板数据:")
            print(json.dumps(report, indent=2, default=str))

        integration.cleanup_old_data()
    except Exception as e:
        logger.error(f"执行失败: {e}")


if __name__ == "__main__":
    main()

5. 集成最佳实践

5.1 数据流设计

# data_flow.yml - data-flow configuration, shipped as a Kubernetes ConfigMap.
# The "flow.conf" key is a literal block scalar ("|"): every line inside it,
# including its "#" lines, is part of the stored string, not a YAML comment.
# NOTE(review): the program that consumes flow.conf (processors goaccess /
# geo_ip / exclude_bots, the output types, the alert actions) is not shown
# here; confirm its expected schema before relying on these keys.
# NOTE(review): the prometheus output listens on port 9090, which is also
# the Prometheus server's default port — confirm there is no clash.
# NOTE(review): the elasticsearch index "goaccess-{+YYYY.MM.dd}" is not the
# Logstash sprintf form "%{+YYYY.MM.dd}" used by the Logstash pipeline;
# confirm which date syntax the consumer expects.
apiVersion: v1
kind: ConfigMap
metadata:
  name: goaccess-integration-config
data:
  flow.conf: |
    # GoAccess数据流配置
    
    # 数据源配置
    sources:
      - name: nginx_access_log
        type: file
        path: /var/log/nginx/access.log
        format: COMBINED
        
      - name: apache_access_log
        type: file
        path: /var/log/apache2/access.log
        format: COMMON
    
    # 处理管道
    pipeline:
      - stage: parse
        processor: goaccess
        config:
          real_time: true
          json_output: true
          
      - stage: enrich
        processor: geo_ip
        config:
          database: /usr/share/GeoIP/GeoLite2-Country.mmdb
          
      - stage: filter
        processor: exclude_bots
        config:
          patterns:
            - "bot"
            - "crawler"
            - "spider"
    
    # 输出目标
    outputs:
      - name: elasticsearch
        type: elasticsearch
        config:
          hosts: ["localhost:9200"]
          index: "goaccess-{+YYYY.MM.dd}"
          
      - name: prometheus
        type: prometheus
        config:
          port: 9090
          metrics_path: /metrics
          
      - name: mysql
        type: mysql
        config:
          host: localhost
          database: goaccess_analytics
          table: access_logs
          
      - name: redis
        type: redis
        config:
          host: localhost
          port: 6379
          db: 0
    
    # 告警配置
    alerts:
      - name: high_error_rate
        condition: error_rate > 5
        actions:
          - email
          - slack
          
      - name: traffic_spike
        condition: requests_per_minute > 1000
        actions:
          - webhook

5.2 性能优化

#!/usr/bin/env python3
# performance_optimizer.py - 性能优化工具

import asyncio
import aiofiles
import json
from concurrent.futures import ThreadPoolExecutor
import logging
import time
from typing import List, Dict, Any

# Module-wide logging: configure the root logger at INFO and use a named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GoAccessPerformanceOptimizer:
    """Run GoAccess over several logs concurrently and report resource usage.

    Blocking work (the GoAccess subprocess and user-supplied batch processors)
    is pushed onto a ThreadPoolExecutor so the asyncio event loop stays free.

    Config keys read: ``max_workers`` (default 4), ``batch_size`` (default
    1000) and ``buffer_size`` (default 10000; stored but not used here).
    """

    def __init__(self, config):
        self.config = config
        self.executor = ThreadPoolExecutor(max_workers=config.get('max_workers', 4))
        self.batch_size = config.get('batch_size', 1000)
        self.buffer_size = config.get('buffer_size', 10000)

    async def process_logs_async(self, log_files: List[str]):
        """Analyze every file in *log_files* concurrently.

        Returns:
            A list aligned with *log_files*. Each entry is either the parsed
            GoAccess JSON report or the exception raised for that file. One
            failing file does not abort the others.
        """
        tasks = [asyncio.create_task(self.process_single_log(path)) for path in log_files]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        successful = sum(1 for r in results if not isinstance(r, Exception))
        failed = len(results) - successful

        logger.info(f"处理完成:成功{successful}个,失败{failed}个")

        return results

    async def process_single_log(self, log_file: str):
        """Run GoAccess on one file in the thread pool and return its JSON report.

        GoAccess opens and streams the log itself, so the file is deliberately
        not read into Python memory here. Multi-GB logs would otherwise be
        loaded in full for nothing.

        Raises:
            Exception: whatever :meth:`run_goaccess` raises, after logging it.
        """
        try:
            start_time = time.perf_counter()

            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                self.executor,
                self.run_goaccess,
                log_file
            )

            processing_time = time.perf_counter() - start_time
            logger.info(f"处理{log_file}完成,耗时{processing_time:.2f}秒")

            return result

        except Exception as e:
            logger.error(f"处理{log_file}失败: {e}")
            raise

    def run_goaccess(self, log_file: str):
        """Run GoAccess on *log_file* and return the parsed JSON report.

        ``-o json`` is required. Without an explicit output format, GoAccess
        produces an HTML report when its stdout is not a terminal, and
        ``json.loads`` cannot parse that.

        Raises:
            subprocess.TimeoutExpired: if GoAccess runs longer than 300 s.
            RuntimeError: if GoAccess exits with a non-zero status.
        """
        import subprocess

        cmd = [
            'goaccess', log_file,
            '--log-format=COMBINED',
            '-o', 'json',
            '--json-pretty-print',
            '--no-progress'
        ]

        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=300  # 5-minute cap per file
        )

        if result.returncode != 0:
            # RuntimeError subclasses Exception, so existing handlers still match.
            raise RuntimeError(f"GoAccess执行失败: {result.stderr}")
        return json.loads(result.stdout)

    async def batch_process_data(self, data_list: List[Dict[Any, Any]], processor_func):
        """Split *data_list* into ``batch_size`` chunks and process them concurrently.

        Args:
            data_list: items to process.
            processor_func: blocking callable taking one batch (a list slice).

        Returns:
            One entry per batch, in order. Each entry is the return value of
            ``processor_func(batch)`` or the exception it raised.
        """
        step = self.batch_size
        tasks = [
            asyncio.create_task(self.process_batch(data_list[i:i + step], processor_func))
            for i in range(0, len(data_list), step)
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def process_batch(self, batch: List[Dict[Any, Any]], processor_func):
        """Run ``processor_func(batch)`` on the thread pool and return its result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            processor_func,
            batch
        )

    def optimize_goaccess_config(self, log_file_size: int) -> Dict[str, Any]:
        """Suggest GoAccess settings for a log of *log_file_size* bytes.

        Three tiers apply: < 100 MiB, < 1 GiB and >= 1 GiB. Larger logs switch
        to on-disk storage under ``/tmp/goaccess``. The returned dict is advisory
        only; :meth:`run_goaccess` does not consume it.
        """
        if log_file_size < 100 * 1024 * 1024:  # < 100MB
            return {
                'real_time_html': True,
                'keep_db_files': False,
                'load_from_disk': False
            }

        config = {
            'real_time_html': False,
            'keep_db_files': True,
            'load_from_disk': True,
            'db_path': '/tmp/goaccess'
        }
        if log_file_size >= 1024 * 1024 * 1024:  # >= 1GB
            config['process_and_exit'] = True
        return config

    async def monitor_performance(self, duration_seconds: int = 300):
        """Sample system CPU, memory, disk and network counters every 5 s.

        ``psutil.cpu_percent()`` without an interval reports usage since the
        previous call, and its first reading is meaningless. It is therefore
        primed once, and each sample is taken after a 5-second sleep, so every
        CPU value covers a real 5 s window.

        Returns:
            A list of dicts with keys ``timestamp``, ``cpu_percent``,
            ``memory_percent``, ``disk_io`` and ``network_io``.
        """
        import psutil

        metrics = []
        start_time = time.time()
        psutil.cpu_percent()  # prime the non-blocking CPU counter

        while time.time() - start_time < duration_seconds:
            await asyncio.sleep(5)  # one sample every 5 seconds

            # Query each counter once per sample; either may be None on
            # systems that do not expose it.
            disk = psutil.disk_io_counters()
            net = psutil.net_io_counters()
            metrics.append({
                'timestamp': time.time(),
                'cpu_percent': psutil.cpu_percent(),
                'memory_percent': psutil.virtual_memory().percent,
                'disk_io': disk._asdict() if disk else {},
                'network_io': net._asdict() if net else {}
            })

        return metrics

    def generate_performance_report(self, metrics: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Summarize *metrics* (as produced by :meth:`monitor_performance`).

        Returns:
            ``{}`` for no samples. Otherwise a dict with ``duration`` (seconds,
            assuming the 5 s sampling period), avg/max/min ``cpu`` and
            ``memory`` percentages, and textual ``recommendations``.
        """
        if not metrics:
            return {}

        def summarize(values):
            return {
                'avg': sum(values) / len(values),
                'max': max(values),
                'min': min(values)
            }

        return {
            'duration': len(metrics) * 5,  # seconds
            'cpu': summarize([m['cpu_percent'] for m in metrics]),
            'memory': summarize([m['memory_percent'] for m in metrics]),
            'recommendations': self.generate_recommendations(metrics)
        }

    def generate_recommendations(self, metrics: List[Dict[str, Any]]) -> List[str]:
        """Return tuning hints based on average CPU and memory utilisation."""
        recommendations = []

        if not metrics:
            return recommendations

        avg_cpu = sum(m['cpu_percent'] for m in metrics) / len(metrics)
        avg_memory = sum(m['memory_percent'] for m in metrics) / len(metrics)

        if avg_cpu > 80:
            recommendations.append("CPU使用率过高,建议增加处理线程数或优化GoAccess配置")

        if avg_memory > 80:
            recommendations.append("内存使用率过高,建议启用磁盘数据库模式或增加内存")

        if avg_cpu < 20 and avg_memory < 30:
            recommendations.append("资源利用率较低,可以增加并发处理数量")

        return recommendations

async def main():
    """Analyze a few access logs while sampling system load, then print a report.

    Resource monitoring starts before the GoAccess runs, so the report
    reflects the load they generate rather than the idle system afterwards.
    """
    config = {
        'max_workers': 4,
        'batch_size': 1000,
        'buffer_size': 10000
    }

    optimizer = GoAccessPerformanceOptimizer(config)

    # Example: process several log files
    log_files = [
        '/var/log/nginx/access.log',
        '/var/log/nginx/access.log.1',
        '/var/log/apache2/access.log'
    ]

    monitor_task = None
    try:
        # Sample resources for 1 minute, concurrently with the processing.
        monitor_task = asyncio.create_task(optimizer.monitor_performance(60))

        # Per-file failures are returned, not raised, by process_logs_async.
        await optimizer.process_logs_async(log_files)

        metrics = await monitor_task

        report = optimizer.generate_performance_report(metrics)

        print("性能报告:")
        # ensure_ascii=False keeps the Chinese recommendations readable.
        print(json.dumps(report, indent=2, ensure_ascii=False))

    except Exception as e:
        logger.error(f"执行失败: {e}")
    finally:
        if monitor_task is not None and not monitor_task.done():
            monitor_task.cancel()
        # Release the worker threads once all submitted work has finished.
        optimizer.executor.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

6. 总结

本章详细介绍了GoAccess与其他工具的集成方法,包括:

6.1 主要集成方案

  1. ELK Stack集成

    • Logstash配置用于处理GoAccess输出
    • Elasticsearch索引模板优化
    • Kibana仪表板自动化创建
  2. Grafana集成

    • Prometheus指标导出
    • 自定义仪表板配置
    • 实时监控展示
  3. Splunk集成

    • 数据输入配置
    • 字段提取和搜索
    • 仪表板和告警设置
  4. 数据库集成

    • MySQL关系型数据库存储
    • Redis缓存和实时数据
    • 数据清理和维护

6.2 集成优势

  • 数据统一管理:将GoAccess数据集成到现有监控体系
  • 实时分析能力:支持实时数据流处理和分析
  • 可视化增强:利用专业工具创建丰富的可视化界面
  • 告警机制:集成现有告警系统,及时发现异常
  • 数据持久化:长期存储和历史数据分析

6.3 最佳实践

  • 性能优化:异步处理、批量操作、资源监控
  • 数据流设计:合理的数据管道和处理流程
  • 错误处理:完善的异常处理和恢复机制
  • 安全考虑:数据传输加密和访问控制
  • 监控运维:系统健康检查和性能监控

下一章我们将学习GoAccess的性能优化和故障排除技巧。

4. 数据库集成

4.1 MySQL集成

#!/usr/bin/env python3
# mysql_integration.py - GoAccess MySQL集成

import json
import mysql.connector
import subprocess
from datetime import datetime
import logging
import schedule
import time

# Module-wide logging: configure the root logger at INFO and use a named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GoAccessMySQLIntegration:
    """Store periodic GoAccess JSON snapshots in MySQL.

    Each :meth:`process_and_store_data` run executes GoAccess once over
    ``log_file``. It writes one summary row plus per-panel rows (requests,
    status codes, browsers, countries), all sharing one ``timestamp`` and
    committed in a single transaction.
    """

    def __init__(self, mysql_config, log_file):
        """Connect to MySQL and create the tables if they are missing.

        Args:
            mysql_config: keyword arguments for ``mysql.connector.connect``.
            log_file: path of the access log GoAccess should analyze.

        Raises:
            Exception: re-raised from the connection or table-creation step.
        """
        self.mysql_config = mysql_config
        self.log_file = log_file
        self.connection = None

        # Connect to the database
        self.connect_database()

        # Create the table structure
        self.create_tables()

    @staticmethod
    def _count(value):
        """Return the number held by a GoAccess JSON counter field.

        GoAccess 1.x reports panel counters as ``{"count": N, "percent": P}``
        objects. Plain numbers (as in the ``general`` section) are accepted
        too. Anything else counts as 0, so a malformed item cannot break an
        insert or the error-rate sum.
        """
        if isinstance(value, dict):
            value = value.get('count', 0)
        return value if isinstance(value, (int, float)) else 0

    def connect_database(self):
        """Open ``self.connection`` from ``mysql_config``; log and re-raise on failure."""
        try:
            self.connection = mysql.connector.connect(**self.mysql_config)
            logger.info("MySQL数据库连接成功")
        except Exception as e:
            logger.error(f"连接MySQL数据库失败: {e}")
            raise

    def create_tables(self):
        """Create the five goaccess_* tables (idempotent via IF NOT EXISTS)."""
        cursor = self.connection.cursor()

        # Summary table: one row per collection run
        create_summary_table = """
        CREATE TABLE IF NOT EXISTS goaccess_summary (
            id INT AUTO_INCREMENT PRIMARY KEY,
            timestamp DATETIME NOT NULL,
            total_requests BIGINT DEFAULT 0,
            unique_visitors BIGINT DEFAULT 0,
            bandwidth BIGINT DEFAULT 0,
            log_size BIGINT DEFAULT 0,
            error_rate DECIMAL(5,2) DEFAULT 0.00,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_timestamp (timestamp)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """

        # Per-request-path table
        create_requests_table = """
        CREATE TABLE IF NOT EXISTS goaccess_requests (
            id INT AUTO_INCREMENT PRIMARY KEY,
            timestamp DATETIME NOT NULL,
            request_path VARCHAR(500) NOT NULL,
            hits BIGINT DEFAULT 0,
            visitors BIGINT DEFAULT 0,
            bandwidth BIGINT DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_timestamp (timestamp),
            INDEX idx_request_path (request_path(100))
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """

        # Status-code table
        create_status_codes_table = """
        CREATE TABLE IF NOT EXISTS goaccess_status_codes (
            id INT AUTO_INCREMENT PRIMARY KEY,
            timestamp DATETIME NOT NULL,
            status_code VARCHAR(10) NOT NULL,
            hits BIGINT DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_timestamp (timestamp),
            INDEX idx_status_code (status_code)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """

        # Browser table
        create_browsers_table = """
        CREATE TABLE IF NOT EXISTS goaccess_browsers (
            id INT AUTO_INCREMENT PRIMARY KEY,
            timestamp DATETIME NOT NULL,
            browser VARCHAR(100) NOT NULL,
            hits BIGINT DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_timestamp (timestamp),
            INDEX idx_browser (browser)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """

        # Geo-location table
        create_geo_table = """
        CREATE TABLE IF NOT EXISTS goaccess_geo_location (
            id INT AUTO_INCREMENT PRIMARY KEY,
            timestamp DATETIME NOT NULL,
            country VARCHAR(100) NOT NULL,
            hits BIGINT DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_timestamp (timestamp),
            INDEX idx_country (country)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """

        tables = [
            create_summary_table,
            create_requests_table,
            create_status_codes_table,
            create_browsers_table,
            create_geo_table
        ]

        try:
            for table_sql in tables:
                cursor.execute(table_sql)

            self.connection.commit()
            logger.info("数据表创建成功")

        except Exception as e:
            logger.error(f"创建数据表失败: {e}")
            raise
        finally:
            cursor.close()

    def get_goaccess_data(self):
        """Run GoAccess on ``log_file`` and return its JSON report, or None on failure.

        ``-o json`` makes GoAccess write JSON to stdout. Without an explicit
        output format it emits an HTML report when stdout is not a terminal,
        and ``json.loads`` cannot parse that.
        """
        try:
            cmd = [
                'goaccess', self.log_file,
                '--log-format=COMBINED',
                '-o', 'json',
                '--json-pretty-print'
            ]

            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=60
            )

            if result.returncode == 0:
                return json.loads(result.stdout)
            logger.error(f"GoAccess执行失败: {result.stderr}")
            return None

        except Exception as e:
            # Best effort: the caller treats None as "no data this round".
            logger.error(f"获取GoAccess数据时出错: {e}")
            return None

    def insert_summary_data(self, data, timestamp):
        """Insert one goaccess_summary row (no commit).

        Raises:
            Exception: logged and re-raised, so the caller's transaction rolls back.
        """
        cursor = self.connection.cursor()

        try:
            general = data.get('general', {})
            total_requests = self._count(general.get('total_requests', 0))

            # Error rate = share of hits in the 4xx/5xx status-code groups
            status_codes = data.get('status_codes', {}).get('data', [])
            error_requests = sum(
                self._count(item.get('hits', 0)) for item in status_codes
                if str(item.get('data', '')).startswith(('4', '5'))
            )
            error_rate = (error_requests / total_requests * 100) if total_requests > 0 else 0

            insert_sql = """
            INSERT INTO goaccess_summary 
            (timestamp, total_requests, unique_visitors, bandwidth, log_size, error_rate)
            VALUES (%s, %s, %s, %s, %s, %s)
            """

            values = (
                timestamp,
                total_requests,
                self._count(general.get('unique_visitors', 0)),
                self._count(general.get('bandwidth', 0)),
                self._count(general.get('log_size', 0)),
                round(error_rate, 2)
            )

            cursor.execute(insert_sql, values)
            logger.info("汇总数据插入成功")

        except Exception as e:
            logger.error(f"插入汇总数据失败: {e}")
            raise
        finally:
            cursor.close()

    def insert_requests_data(self, data, timestamp):
        """Insert the top 100 request paths (no commit).

        Raises:
            Exception: logged and re-raised, so the caller's transaction rolls back.
        """
        cursor = self.connection.cursor()

        try:
            requests_data = data.get('requests', {}).get('data', [])

            insert_sql = """
            INSERT INTO goaccess_requests 
            (timestamp, request_path, hits, visitors, bandwidth)
            VALUES (%s, %s, %s, %s, %s)
            """

            values_list = [
                (
                    timestamp,
                    str(item.get('data', ''))[:500],  # fits VARCHAR(500)
                    self._count(item.get('hits', 0)),
                    self._count(item.get('visitors', 0)),
                    # GoAccess panel items report transferred bytes under
                    # 'bytes'; 'bandwidth' is kept as a fallback.
                    self._count(item.get('bytes', item.get('bandwidth', 0)))
                )
                for item in requests_data[:100]  # top 100 only
            ]

            if values_list:
                cursor.executemany(insert_sql, values_list)
                logger.info(f"插入{len(values_list)}条请求数据")

        except Exception as e:
            logger.error(f"插入请求数据失败: {e}")
            raise
        finally:
            cursor.close()

    def insert_status_codes_data(self, data, timestamp):
        """Insert every status-code row (no commit).

        Raises:
            Exception: logged and re-raised, so the caller's transaction rolls back.
        """
        cursor = self.connection.cursor()

        try:
            status_codes = data.get('status_codes', {}).get('data', [])

            insert_sql = """
            INSERT INTO goaccess_status_codes 
            (timestamp, status_code, hits)
            VALUES (%s, %s, %s)
            """

            # The column is VARCHAR(10). GoAccess group labels may be longer,
            # and strict-mode MySQL rejects over-long values, so trim like the
            # other inserts do.
            values_list = [
                (timestamp, str(item.get('data', ''))[:10], self._count(item.get('hits', 0)))
                for item in status_codes
            ]

            if values_list:
                cursor.executemany(insert_sql, values_list)
                logger.info(f"插入{len(values_list)}条状态码数据")

        except Exception as e:
            logger.error(f"插入状态码数据失败: {e}")
            raise
        finally:
            cursor.close()

    def insert_browsers_data(self, data, timestamp):
        """Insert the top 20 browsers (no commit).

        Raises:
            Exception: logged and re-raised, so the caller's transaction rolls back.
        """
        cursor = self.connection.cursor()

        try:
            browsers = data.get('browsers', {}).get('data', [])

            insert_sql = """
            INSERT INTO goaccess_browsers 
            (timestamp, browser, hits)
            VALUES (%s, %s, %s)
            """

            values_list = [
                (timestamp, str(item.get('data', ''))[:100], self._count(item.get('hits', 0)))
                for item in browsers[:20]  # top 20 only; name fits VARCHAR(100)
            ]

            if values_list:
                cursor.executemany(insert_sql, values_list)
                logger.info(f"插入{len(values_list)}条浏览器数据")

        except Exception as e:
            logger.error(f"插入浏览器数据失败: {e}")
            raise
        finally:
            cursor.close()

    def insert_geo_data(self, data, timestamp):
        """Insert the top 50 countries (no commit).

        Raises:
            Exception: logged and re-raised, so the caller's transaction rolls back.
        """
        cursor = self.connection.cursor()

        try:
            geo_data = data.get('geo_location', {}).get('data', [])

            insert_sql = """
            INSERT INTO goaccess_geo_location 
            (timestamp, country, hits)
            VALUES (%s, %s, %s)
            """

            values_list = [
                (timestamp, str(item.get('data', ''))[:100], self._count(item.get('hits', 0)))
                for item in geo_data[:50]  # top 50 only; name fits VARCHAR(100)
            ]

            if values_list:
                cursor.executemany(insert_sql, values_list)
                logger.info(f"插入{len(values_list)}条地理位置数据")

        except Exception as e:
            logger.error(f"插入地理位置数据失败: {e}")
            raise
        finally:
            cursor.close()

    def process_and_store_data(self):
        """Fetch one GoAccess report and store it atomically.

        Any insert failure rolls back the whole snapshot, so no partial
        summary/panel set is ever committed.
        """
        logger.info("开始处理GoAccess数据...")

        data = self.get_goaccess_data()
        if not data:
            logger.warning("未获取到GoAccess数据")
            return

        timestamp = datetime.now()

        try:
            self.insert_summary_data(data, timestamp)
            self.insert_requests_data(data, timestamp)
            self.insert_status_codes_data(data, timestamp)
            self.insert_browsers_data(data, timestamp)
            self.insert_geo_data(data, timestamp)

            # Commit the transaction
            self.connection.commit()
            logger.info("数据处理和存储完成")

        except Exception as e:
            logger.error(f"处理数据时出错: {e}")
            self.connection.rollback()

    def cleanup_old_data(self, days=30):
        """Delete rows older than *days* days from every goaccess_* table."""
        cursor = self.connection.cursor()

        # Fixed whitelist; the f-string below never interpolates external input.
        tables = [
            'goaccess_summary',
            'goaccess_requests', 
            'goaccess_status_codes',
            'goaccess_browsers',
            'goaccess_geo_location'
        ]

        try:
            for table in tables:
                delete_sql = f"""
                DELETE FROM {table} 
                WHERE timestamp < DATE_SUB(NOW(), INTERVAL %s DAY)
                """

                cursor.execute(delete_sql, (days,))
                deleted_rows = cursor.rowcount
                logger.info(f"从{table}表删除了{deleted_rows}行旧数据")

            self.connection.commit()
            logger.info(f"清理{days}天前的旧数据完成")

        except Exception as e:
            logger.error(f"清理旧数据失败: {e}")
            self.connection.rollback()
        finally:
            cursor.close()

    def get_statistics(self, hours=24):
        """Return summary aggregates, top pages and status distribution for the last *hours* hours.

        Returns:
            A dict with keys ``summary``, ``top_pages`` and
            ``status_distribution``, or None if a query fails.
        """
        cursor = self.connection.cursor(dictionary=True)

        try:
            # Aggregates over the requested window
            stats_sql = """
            SELECT 
                AVG(total_requests) as avg_requests,
                MAX(total_requests) as max_requests,
                AVG(unique_visitors) as avg_visitors,
                MAX(unique_visitors) as max_visitors,
                AVG(error_rate) as avg_error_rate,
                MAX(error_rate) as max_error_rate,
                COUNT(*) as data_points
            FROM goaccess_summary 
            WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
            """

            cursor.execute(stats_sql, (hours,))
            stats = cursor.fetchone()

            # Top pages
            top_pages_sql = """
            SELECT 
                request_path,
                SUM(hits) as total_hits,
                SUM(visitors) as total_visitors
            FROM goaccess_requests 
            WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
            GROUP BY request_path
            ORDER BY total_hits DESC
            LIMIT 10
            """

            cursor.execute(top_pages_sql, (hours,))
            top_pages = cursor.fetchall()

            # Status-code distribution
            status_distribution_sql = """
            SELECT 
                status_code,
                SUM(hits) as total_hits
            FROM goaccess_status_codes 
            WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s HOUR)
            GROUP BY status_code
            ORDER BY total_hits DESC
            """

            cursor.execute(status_distribution_sql, (hours,))
            status_distribution = cursor.fetchall()

            return {
                'summary': stats,
                'top_pages': top_pages,
                'status_distribution': status_distribution
            }

        except Exception as e:
            logger.error(f"获取统计信息失败: {e}")
            return None
        finally:
            cursor.close()

    def start_scheduled_processing(self, interval_minutes=5):
        """Block forever, collecting every *interval_minutes* and cleaning up daily at 02:00."""
        logger.info(f"启动定时数据处理,间隔{interval_minutes}分钟")

        schedule.every(interval_minutes).minutes.do(self.process_and_store_data)
        schedule.every().day.at("02:00").do(self.cleanup_old_data)

        try:
            while True:
                schedule.run_pending()
                time.sleep(60)  # check once a minute
        except KeyboardInterrupt:
            logger.info("定时处理被用户中断")

    def close(self):
        """Close the database connection if one was opened."""
        if self.connection:
            self.connection.close()
            logger.info("数据库连接已关闭")

def main():
    """Run one collection pass into MySQL and print the last 24 h of statistics.

    The database password is taken from the ``GOACCESS_MYSQL_PASSWORD``
    environment variable when it is set, so real credentials need not be
    edited into the source. The placeholder remains the fallback.
    """
    import os

    # MySQL configuration
    mysql_config = {
        'host': 'localhost',
        'port': 3306,
        'user': 'goaccess',
        'password': os.environ.get('GOACCESS_MYSQL_PASSWORD', 'your_password'),
        'database': 'goaccess_analytics',
        'charset': 'utf8mb4',
        'autocommit': False  # process_and_store_data commits/rolls back explicitly
    }

    log_file = '/var/log/nginx/access.log'

    integration = GoAccessMySQLIntegration(mysql_config, log_file)

    try:
        # Run one processing pass
        integration.process_and_store_data()

        stats = integration.get_statistics()
        if stats:
            print("统计信息:")
            # default=str serializes Decimal/datetime values from MySQL.
            print(json.dumps(stats, indent=2, default=str))

        # Optional: start the scheduled loop instead
        # integration.start_scheduled_processing()

    finally:
        integration.close()

if __name__ == "__main__":
    main()

# NOTE(review): everything below was displaced into this script during document
# conversion. It appears to be the tail of the Kibana dashboard setup script
# from the ELK Stack section: the body of ``KibanaDashboardSetup.setup_all``
# plus that script's ``main`` and entry-point guard. As live code it sat inside
# this file's ``__main__`` guard, where ``self`` is undefined, and the collapsed
# one-line ``def main(): ...`` was a SyntaxError, so mysql_integration.py could
# not even be parsed. It is preserved here, reformatted and commented out, so
# this script runs on its own. Move it back into the Kibana script.
#
#     def setup_all(self):
#         """Create the index pattern, visualizations and dashboard in order."""
#         logger.info("开始设置Kibana仪表板...")
#
#         # Create the index pattern
#         self.create_index_pattern()
#
#         # Give the index pattern a moment to take effect
#         import time
#         time.sleep(2)
#
#         # Create the visualizations
#         self.create_visualizations()
#
#         # Give the visualizations a moment to take effect
#         time.sleep(2)
#
#         # Create the dashboard
#         self.create_dashboard()
#
#         logger.info("Kibana仪表板设置完成")
#
#
# def main():
#     setup = KibanaDashboardSetup()
#     setup.setup_all()
#
#
# if __name__ == "__main__":
#     main()


2. Grafana集成

2.1 Prometheus指标导出

#!/usr/bin/env python3
# goaccess_prometheus_exporter.py - GoAccess Prometheus指标导出器

import json
import time
import subprocess
import logging
from prometheus_client import start_http_server, Gauge, Counter, Histogram
from prometheus_client.core import CollectorRegistry
import threading
from datetime import datetime

# Module-wide logging: configure the root logger at INFO and use a named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GoAccessPrometheusExporter:
    """Expose GoAccess report figures as Prometheus gauges over HTTP.

    A daemon thread re-runs GoAccess every ``interval`` seconds and rewrites
    the gauges, which live on a private ``CollectorRegistry`` served on ``port``.
    """

    def __init__(self, log_file, port=8000, interval=60):
        self.log_file = log_file
        self.port = port
        self.interval = interval

        # Private registry: only this exporter's metrics are served.
        self.registry = CollectorRegistry()

        self.setup_metrics()

        # Loop flag read by collect_loop
        self.running = False

    @staticmethod
    def _count(value):
        """Return the number held by a GoAccess JSON counter field.

        GoAccess 1.x reports panel counters as ``{"count": N, "percent": P}``
        objects. Plain numbers (as in the ``general`` section) are accepted
        too. Anything else counts as 0, so a gauge is never set to a dict.
        """
        if isinstance(value, dict):
            value = value.get('count', 0)
        return value if isinstance(value, (int, float)) else 0

    def setup_metrics(self):
        """Register every gauge/counter on ``self.registry``."""
        # Overall metrics
        self.total_requests = Gauge(
            'goaccess_total_requests',
            'Total number of requests',
            registry=self.registry
        )

        self.unique_visitors = Gauge(
            'goaccess_unique_visitors',
            'Number of unique visitors',
            registry=self.registry
        )

        self.bandwidth_bytes = Gauge(
            'goaccess_bandwidth_bytes',
            'Total bandwidth in bytes',
            registry=self.registry
        )

        self.log_size_bytes = Gauge(
            'goaccess_log_size_bytes',
            'Log file size in bytes',
            registry=self.registry
        )

        # Per-panel labelled gauges
        self.status_code_requests = Gauge(
            'goaccess_status_code_requests',
            'Number of requests by status code',
            ['status_code'],
            registry=self.registry
        )

        self.page_requests = Gauge(
            'goaccess_page_requests',
            'Number of requests by page',
            ['page'],
            registry=self.registry
        )

        self.browser_requests = Gauge(
            'goaccess_browser_requests',
            'Number of requests by browser',
            ['browser'],
            registry=self.registry
        )

        self.os_requests = Gauge(
            'goaccess_os_requests',
            'Number of requests by operating system',
            ['os'],
            registry=self.registry
        )

        self.geo_requests = Gauge(
            'goaccess_geo_requests',
            'Number of requests by country',
            ['country'],
            registry=self.registry
        )

        self.referrer_requests = Gauge(
            'goaccess_referrer_requests',
            'Number of requests by referrer',
            ['referrer'],
            registry=self.registry
        )

        self.visit_time_requests = Gauge(
            'goaccess_visit_time_requests',
            'Number of requests by visit time',
            ['hour'],
            registry=self.registry
        )

        self.vhost_requests = Gauge(
            'goaccess_vhost_requests',
            'Number of requests by virtual host',
            ['vhost'],
            registry=self.registry
        )

        self.error_rate = Gauge(
            'goaccess_error_rate',
            'Error rate percentage',
            registry=self.registry
        )

        # Exporter health metrics
        self.exporter_last_update = Gauge(
            'goaccess_exporter_last_update_timestamp',
            'Timestamp of last successful update',
            registry=self.registry
        )

        self.exporter_errors_total = Counter(
            'goaccess_exporter_errors_total',
            'Total number of exporter errors',
            registry=self.registry
        )

    def get_goaccess_data(self):
        """Run GoAccess on ``log_file`` and return its JSON report, or None on failure.

        ``-o json`` makes GoAccess write JSON to stdout. Without an explicit
        output format it emits an HTML report when stdout is not a terminal,
        and ``json.loads`` cannot parse that.
        """
        try:
            cmd = [
                'goaccess', self.log_file,
                '--log-format=COMBINED',
                '-o', 'json',
                '--json-pretty-print'
            ]

            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30
            )

            if result.returncode == 0:
                return json.loads(result.stdout)
            logger.error(f"GoAccess执行失败: {result.stderr}")
            return None

        except subprocess.TimeoutExpired:
            logger.error("GoAccess执行超时")
            return None
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析失败: {e}")
            return None
        except Exception as e:
            logger.error(f"获取GoAccess数据时出错: {e}")
            return None

    def _set_panel(self, gauge, label_name, items, limit=None, max_len=None):
        """Replace every sample of labelled *gauge* with the given panel items.

        The gauge is cleared first. Otherwise a label that has dropped out of
        the top-N (for example a page no longer among the top 20) would keep
        being exported with its last, stale value.

        Args:
            gauge: a labelled Gauge with the single label *label_name*.
            label_name: the gauge's label name.
            items: GoAccess panel ``data`` list (dicts with ``data``/``hits``).
            limit: keep only the first *limit* items (None keeps all).
            max_len: truncate label values to this length (None keeps them whole).
        """
        gauge.clear()
        selected = items if limit is None else items[:limit]
        for item in selected:
            label = str(item.get('data', 'unknown'))
            if max_len is not None:
                label = label[:max_len]
            gauge.labels(**{label_name: label}).set(self._count(item.get('hits', 0)))

    def update_metrics(self, data):
        """Overwrite all metrics from one GoAccess JSON report; no-op for empty *data*.

        Errors are logged and counted in ``goaccess_exporter_errors_total``
        instead of being raised.
        """
        if not data:
            return

        try:
            general = data.get('general', {})
            total_requests = self._count(general.get('total_requests', 0))
            self.total_requests.set(total_requests)
            self.unique_visitors.set(self._count(general.get('unique_visitors', 0)))
            self.bandwidth_bytes.set(self._count(general.get('bandwidth', 0)))
            self.log_size_bytes.set(self._count(general.get('log_size', 0)))

            def panel(key):
                return data.get(key, {}).get('data', [])

            status_codes = panel('status_codes')
            self._set_panel(self.status_code_requests, 'status_code', status_codes, limit=10)
            # Page and referrer labels are truncated to keep label values short.
            self._set_panel(self.page_requests, 'page', panel('requests'), limit=20, max_len=100)
            self._set_panel(self.browser_requests, 'browser', panel('browsers'), limit=10)
            self._set_panel(self.os_requests, 'os', panel('os'), limit=10)
            self._set_panel(self.geo_requests, 'country', panel('geo_location'), limit=15)
            self._set_panel(self.referrer_requests, 'referrer', panel('referring_sites'),
                            limit=10, max_len=50)
            self._set_panel(self.visit_time_requests, 'hour', panel('visit_time'))
            self._set_panel(self.vhost_requests, 'vhost', panel('virtual_hosts'), limit=10)

            # Error rate over all status-code groups (not just the top 10).
            # It is reset to 0 when there are no requests instead of keeping a stale value.
            if total_requests > 0:
                error_requests = sum(
                    self._count(item.get('hits', 0)) for item in status_codes
                    if str(item.get('data', '')).startswith(('4', '5'))
                )
                self.error_rate.set((error_requests / total_requests) * 100)
            else:
                self.error_rate.set(0)

            self.exporter_last_update.set(time.time())

            logger.info(f"指标更新成功,总请求数: {general.get('total_requests', 0)}")

        except Exception as e:
            logger.error(f"更新指标时出错: {e}")
            self.exporter_errors_total.inc()

    def collect_loop(self):
        """Poll GoAccess every ``interval`` seconds while ``self.running`` is true."""
        while self.running:
            try:
                logger.info("开始收集GoAccess数据...")
                data = self.get_goaccess_data()
                if data is None:
                    # get_goaccess_data already logged the cause. Count it so
                    # failed runs show up in goaccess_exporter_errors_total.
                    self.exporter_errors_total.inc()
                else:
                    self.update_metrics(data)

            except Exception as e:
                logger.error(f"收集数据时出错: {e}")
                self.exporter_errors_total.inc()

            time.sleep(self.interval)

    def start(self):
        """Serve /metrics, start the collector thread and block until Ctrl+C."""
        logger.info(f"启动GoAccess Prometheus导出器,端口: {self.port}")

        start_http_server(self.port, registry=self.registry)

        self.running = True
        collect_thread = threading.Thread(target=self.collect_loop)
        collect_thread.daemon = True  # do not keep the process alive on exit
        collect_thread.start()

        logger.info(f"导出器已启动,指标地址: http://localhost:{self.port}/metrics")

        try:
            # Keep the main thread alive
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("收到中断信号,正在停止...")
            self.stop()

    def stop(self):
        """Ask the collector loop to exit after its current iteration."""
        self.running = False
        logger.info("导出器已停止")

def main(argv=None):
    """Command-line entry point for the GoAccess Prometheus exporter.

    Args:
        argv: Argument list to parse. ``None`` (the default) means
            ``sys.argv[1:]``, as with ``ArgumentParser.parse_args``.

    Exits with status 2 (via ``parser.error``) when ``--port`` is outside
    1-65535 or ``--interval`` is not a positive number of seconds.
    Otherwise it builds the exporter and blocks in ``exporter.start()``.
    """
    import argparse

    parser = argparse.ArgumentParser(description='GoAccess Prometheus导出器')
    parser.add_argument('--log-file', required=True, help='日志文件路径')
    parser.add_argument('--port', type=int, default=8000, help='HTTP服务端口')
    parser.add_argument('--interval', type=int, default=60, help='数据收集间隔(秒)')

    args = parser.parse_args(argv)

    # Validate before starting anything. A negative interval makes
    # time.sleep() raise outside collect_loop's try block, which kills the
    # collector thread while /metrics keeps serving stale values. Zero would
    # call get_goaccess_data back-to-back with no pause.
    if not 1 <= args.port <= 65535:
        parser.error(f"--port 必须在 1-65535 之间: {args.port}")
    if args.interval <= 0:
        parser.error(f"--interval 必须为正整数: {args.interval}")

    exporter = GoAccessPrometheusExporter(
        log_file=args.log_file,
        port=args.port,
        interval=args.interval,
    )

    exporter.start()


if __name__ == "__main__":
    main()

2.2 Grafana仪表板配置

{
  "dashboard": {
    "id": null,
    "title": "GoAccess Web Analytics",
    "tags": ["goaccess", "web", "analytics"],
    "style": "dark",
    "timezone": "browser",
    "refresh": "30s",
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "panels": [
      {
        "id": 1,
        "title": "总请求数",
        "type": "stat",
        "gridPos": {"h": 8, "w": 6, "x": 0, "y": 0},
        "targets": [
          {
            "expr": "goaccess_total_requests",
            "legendFormat": "总请求数"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "color": {"mode": "palette-classic"},
            "unit": "short",
            "thresholds": {
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 1000},
                {"color": "red", "value": 10000}
              ]
            }
          }
        }
      },
      {
        "id": 2,
        "title": "独立访客",
        "type": "stat",
        "gridPos": {"h": 8, "w": 6, "x": 6, "y": 0},
        "targets": [
          {
            "expr": "goaccess_unique_visitors",
            "legendFormat": "独立访客"
          }
        ]
      },
      {
        "id": 3,
        "title": "带宽使用",
        "type": "stat",
        "gridPos": {"h": 8, "w": 6, "x": 12, "y": 0},
        "targets": [
          {
            "expr": "goaccess_bandwidth_bytes",
            "legendFormat": "带宽"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "bytes"
          }
        }
      },
      {
        "id": 4,
        "title": "错误率",
        "type": "stat",
        "gridPos": {"h": 8, "w": 6, "x": 18, "y": 0},
        "targets": [
          {
            "expr": "goaccess_error_rate",
            "legendFormat": "错误率"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "percent",
            "thresholds": {
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 5},
                {"color": "red", "value": 10}
              ]
            }
          }
        }
      },
      {
        "id": 5,
        "title": "请求趋势",
        "type": "timeseries",
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 8},
        "targets": [
          {
            "expr": "rate(goaccess_total_requests[5m]) * 60",
            "legendFormat": "每分钟请求数"
          }
        ]
      },
      {
        "id": 6,
        "title": "状态码分布",
        "type": "piechart",
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 8},
        "targets": [
          {
            "expr": "goaccess_status_code_requests",
            "legendFormat": "{{status_code}}"
          }
        ]
      },
      {
        "id": 7,
        "title": "热门页面",
        "type": "barchart",
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 16},
        "targets": [
          {
            "expr": "topk(10, goaccess_page_requests)",
            "legendFormat": "{{page}}"
          }
        ]
      },
      {
        "id": 8,
        "title": "浏览器分布",
        "type": "piechart",
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 16},
        "targets": [
          {
            "expr": "goaccess_browser_requests",
            "legendFormat": "{{browser}}"
          }
        ]
      },
      {
        "id": 9,
        "title": "地理分布",
        "type": "geomap",
        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 24},
        "targets": [
          {
            "expr": "goaccess_geo_requests",
            "legendFormat": "{{country}}"
          }
        ]
      },
      {
        "id": 10,
        "title": "访问时间分布",
        "type": "barchart",
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 24},
        "targets": [
          {
            "expr": "goaccess_visit_time_requests",
            "legendFormat": "{{hour}}时"
          }
        ]
      }
    ]
  }
}

3. Splunk集成

3.1 Splunk数据输入配置

# /opt/splunk/etc/apps/goaccess/local/inputs.conf
[monitor:///var/log/goaccess/*.json]
disabled = false
index = goaccess
sourcetype = goaccess:json
host_segment = 4

[monitor:///var/log/nginx/access.log]
disabled = false
index = web_logs
sourcetype = nginx:access
host_segment = 4

# 实时监控
[monitor:///var/log/goaccess/realtime.json]
disabled = false
index = goaccess_realtime
sourcetype = goaccess:realtime
host_segment = 4
followTail = 1

3.2 Splunk字段提取配置

# /opt/splunk/etc/apps/goaccess/local/props.conf
[goaccess:json]
KV_MODE = json
TIME_PREFIX = "timestamp":"
TIME_FORMAT = %Y-%m-%dT%H:%M:%S
MAX_TIMESTAMP_LOOKAHEAD = 25
SHOULD_LINEMERGE = false
TRUNCATE = 10000

# 字段提取
EXTRACT-general_stats = "general":\s*{[^}]*"total_requests":\s*(?<total_requests>\d+)[^}]*"unique_visitors":\s*(?<unique_visitors>\d+)[^}]*"bandwidth":\s*(?<bandwidth>\d+)
EXTRACT-status_codes = "status_codes":\s*{[^}]*"data":\s*\[(?<status_code_data>[^\]]+)\]
EXTRACT-requests = "requests":\s*{[^}]*"data":\s*\[(?<request_data>[^\]]+)\]

[goaccess:realtime]
KV_MODE = json
TIME_PREFIX = "timestamp":\s*"
TIME_FORMAT = %Y-%m-%dT%H:%M:%S
SHOULD_LINEMERGE = false

[nginx:access]
REPORT-nginx_access = nginx_access_extract
TIME_FORMAT = %d/%b/%Y:%H:%M:%S %z
MAX_TIMESTAMP_LOOKAHEAD = 30

3.3 Splunk搜索和仪表板

#!/usr/bin/env python3

# splunk_dashboard_creator.py - 创建Splunk仪表板

import json
import logging
from datetime import datetime

import requests

# Module-wide logging. basicConfig sets the root level to INFO (and adds a
# stderr handler if the root has none). The module logger is named after
# this module; `name` alone is undefined here and raised NameError.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SplunkDashboardCreator:
    """Helper that creates GoAccess saved searches, alerts and dashboards in Splunk.

    Talks to Splunk's REST API at ``splunk_url`` using a session key that is
    obtained when the object is constructed.
    """

    def __init__(self, splunk_url, username, password):
        """Store connection settings and log in immediately.

        Args:
            splunk_url: Base URL of the Splunk REST API. A trailing '/' is
                stripped so endpoint paths can be appended directly.
            username: Splunk user name.
            password: Splunk password.

        Side effect: calls ``self.authenticate()``, which is expected to fill
        in ``self.session_key`` (initialised to None first).
        """
        self.splunk_url = splunk_url.rstrip('/')
        self.username = username
        self.password = password
        self.session_key = None

        # 获取会话密钥: every later request sends this key.
        self.authenticate()

def authenticate(self, timeout=30):
    """Log in to Splunk's REST API and store the session key.

    POSTs the configured username/password to ``/services/auth/login`` and
    saves the text of the ``sessionKey`` element of the XML reply in
    ``self.session_key``. Later requests send it as
    ``Authorization: Splunk <key>``.

    Args:
        timeout: Seconds to wait for Splunk (passed to ``requests.post``).
            Without a timeout, a hung server would block forever.

    Raises:
        requests.RequestException: connection failure, timeout or an HTTP
            error status.
        xml.etree.ElementTree.ParseError: the reply is not valid XML.
        ValueError: the reply has no non-empty ``sessionKey``.
        Every error is logged before it is re-raised.
    """
    import xml.etree.ElementTree as ET

    auth_url = f"{self.splunk_url}/services/auth/login"

    data = {
        'username': self.username,
        'password': self.password
    }

    try:
        # NOTE(review): verify=False disables TLS certificate verification,
        # so credentials go to an unverified peer. Only acceptable for
        # self-signed test instances.
        response = requests.post(auth_url, data=data, verify=False, timeout=timeout)
        response.raise_for_status()

        # 解析XML响应获取会话密钥
        root = ET.fromstring(response.text)
        key_node = root.find('.//sessionKey')
        # find() returns None when the element is absent. Fail with a clear
        # message instead of an AttributeError on `.text`.
        if key_node is None or not key_node.text:
            raise ValueError("Splunk登录响应中缺少sessionKey")
        self.session_key = key_node.text

        logger.info("Splunk认证成功")

    except Exception as e:
        logger.error(f"Splunk认证失败: {e}")
        raise

def create_saved_search(self, name, search_query, description="", timeout=30):
    """Create an unscheduled saved search via Splunk's REST API (best effort).

    POSTs to ``/services/saved/searches``. If a search with this name
    already exists (HTTP 409), it is left untouched. Failures are logged,
    not raised.

    Args:
        name: Saved-search name.
        search_query: SPL query text.
        description: Free-text description stored with the search.
        timeout: Seconds to wait for Splunk (passed to ``requests.post``).

    Returns:
        True if the search was created or already existed, False otherwise.
    """
    url = f"{self.splunk_url}/services/saved/searches"

    headers = {
        'Authorization': f'Splunk {self.session_key}',
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    data = {
        'name': name,
        'search': search_query,
        'description': description,
        'is_visible': '1',
        'is_scheduled': '0'
    }

    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        response = requests.post(url, data=data, headers=headers,
                                 verify=False, timeout=timeout)
    except requests.RequestException as e:
        logger.error(f"创建保存的搜索时出错: {e}")
        return False

    if response.status_code in (200, 201):
        logger.info(f"保存的搜索 '{name}' 创建成功")
        return True
    if response.status_code == 409:
        # 409表示已存在: the existing search is NOT updated with this query.
        logger.info(f"保存的搜索 '{name}' 已存在,未更新")
        return True
    logger.error(f"创建保存的搜索失败: {response.text}")
    return False

def create_dashboard(self, timeout=30):
    """Create the ``goaccess_analytics`` Splunk dashboard (best effort).

    Builds a Simple XML dashboard whose panels all search
    ``index=goaccess sourcetype=goaccess:json`` and POSTs it to
    ``/services/data/ui/views`` under the name ``goaccess_analytics``.
    HTTP 200/201 are logged as created. 409 is logged as "already exists"
    and the existing view is not replaced. Any other status, or a request
    error, is logged and swallowed.

    Args:
        timeout: Seconds to wait for Splunk (passed to ``requests.post``).
    """
    # NOTE(review): this XML was recovered from a garbled copy. The <label>
    # text and the visualization types of the 总体统计 (table), 状态码分布
    # (pie), 浏览器分布 (pie) and 访问时间分布 (column) panels are
    # reconstructions. The titles, queries and time ranges are original.
    # Confirm against the intended design.
    dashboard_xml = '''<dashboard>
  <label>GoAccess Web Analytics</label>
  <description>基于GoAccess数据的Web分析仪表板</description>
  <row>
    <panel>
      <title>总体统计</title>
      <table>
        <search>
          <query>
            index=goaccess sourcetype=goaccess:json
            | eval total_requests=spath(_raw, "general.total_requests")
            | eval unique_visitors=spath(_raw, "general.unique_visitors")
            | eval bandwidth=spath(_raw, "general.bandwidth")
            | stats latest(total_requests) as "总请求数", latest(unique_visitors) as "独立访客", latest(bandwidth) as "带宽(字节)"
          </query>
          <earliest>-1h@h</earliest>
          <latest>now</latest>
          <refresh>30s</refresh>
        </search>
      </table>
    </panel>
    <panel>
      <title>错误率趋势</title>
      <chart>
        <search>
          <query>
            index=goaccess sourcetype=goaccess:json
            | eval status_codes=spath(_raw, "status_codes.data{}")
            | mvexpand status_codes
            | eval status_code=spath(status_codes, "data")
            | eval hits=spath(status_codes, "hits")
            | eval is_error=if(match(status_code, "^[45]"), hits, 0)
            | stats sum(hits) as total_hits, sum(is_error) as error_hits by _time
            | eval error_rate=round((error_hits/total_hits)*100, 2)
            | timechart span=5m avg(error_rate) as "错误率(%)"
          </query>
          <earliest>-4h@h</earliest>
          <latest>now</latest>
          <refresh>1m</refresh>
        </search>
        <option name="charting.chart">line</option>
        <option name="charting.axisTitleY.text">错误率 (%)</option>
        <option name="charting.legend.placement">bottom</option>
      </chart>
    </panel>
  </row>
  <row>
    <panel>
      <title>状态码分布</title>
      <chart>
        <search>
          <query>
            index=goaccess sourcetype=goaccess:json
            | eval status_codes=spath(_raw, "status_codes.data{}")
            | mvexpand status_codes
            | eval status_code=spath(status_codes, "data")
            | eval hits=spath(status_codes, "hits")
            | stats sum(hits) as count by status_code
            | sort -count
          </query>
          <earliest>-1h@h</earliest>
          <latest>now</latest>
          <refresh>30s</refresh>
        </search>
        <option name="charting.chart">pie</option>
      </chart>
    </panel>
    <panel>
      <title>热门页面</title>
      <table>
        <search>
          <query>
            index=goaccess sourcetype=goaccess:json
            | eval requests=spath(_raw, "requests.data{}")
            | mvexpand requests
            | eval page=spath(requests, "data")
            | eval hits=spath(requests, "hits")
            | eval visitors=spath(requests, "visitors")
            | eval bandwidth=spath(requests, "bandwidth")
            | stats sum(hits) as "访问次数", sum(visitors) as "独立访客", sum(bandwidth) as "带宽" by page
            | sort -"访问次数"
            | head 20
            | rename page as "页面"
          </query>
          <earliest>-1h@h</earliest>
          <latest>now</latest>
          <refresh>30s</refresh>
        </search>
        <option name="drilldown">cell</option>
        <option name="refresh.display">progressbar</option>
      </table>
    </panel>
  </row>
  <row>
    <panel>
      <title>浏览器分布</title>
      <chart>
        <search>
          <query>
            index=goaccess sourcetype=goaccess:json
            | eval browsers=spath(_raw, "browsers.data{}")
            | mvexpand browsers
            | eval browser=spath(browsers, "data")
            | eval hits=spath(browsers, "hits")
            | stats sum(hits) as count by browser
            | sort -count
            | head 10
          </query>
          <earliest>-1h@h</earliest>
          <latest>now</latest>
          <refresh>30s</refresh>
        </search>
        <option name="charting.chart">pie</option>
      </chart>
    </panel>
    <panel>
      <title>地理分布</title>
      <map>
        <search>
          <query>
            index=goaccess sourcetype=goaccess:json
            | eval geo_location=spath(_raw, "geo_location.data{}")
            | mvexpand geo_location
            | eval country=spath(geo_location, "data")
            | eval hits=spath(geo_location, "hits")
            | stats sum(hits) as count by country
            | sort -count
            | head 20
            | geom geo_countries featureIdField=country
          </query>
          <earliest>-1h@h</earliest>
          <latest>now</latest>
          <refresh>30s</refresh>
        </search>
        <option name="mapping.type">choropleth</option>
        <option name="mapping.choroplethLayer.colorMode">auto</option>
        <option name="mapping.choroplethLayer.maximumColor">0xDB5800</option>
        <option name="mapping.choroplethLayer.minimumColor">0x2F25BA</option>
      </map>
    </panel>
  </row>
  <row>
    <panel>
      <title>访问时间分布</title>
      <chart>
        <search>
          <query>
            index=goaccess sourcetype=goaccess:json
            | eval visit_time=spath(_raw, "visit_time.data{}")
            | mvexpand visit_time
            | eval hour=spath(visit_time, "data")
            | eval hits=spath(visit_time, "hits")
            | stats sum(hits) as count by hour
            | sort hour
          </query>
          <earliest>-1h@h</earliest>
          <latest>now</latest>
          <refresh>30s</refresh>
        </search>
        <option name="charting.chart">column</option>
      </chart>
    </panel>
    <panel>
      <title>引荐来源</title>
      <table>
        <search>
          <query>
            index=goaccess sourcetype=goaccess:json
            | eval referring_sites=spath(_raw, "referring_sites.data{}")
            | mvexpand referring_sites
            | eval referrer=spath(referring_sites, "data")
            | eval hits=spath(referring_sites, "hits")
            | stats sum(hits) as "访问次数" by referrer
            | sort -"访问次数"
            | head 15
            | rename referrer as "引荐网站"
          </query>
          <earliest>-1h@h</earliest>
          <latest>now</latest>
          <refresh>30s</refresh>
        </search>
        <option name="drilldown">cell</option>
      </table>
    </panel>
  </row>
</dashboard>
'''

    url = f"{self.splunk_url}/services/data/ui/views"

    headers = {
        'Authorization': f'Splunk {self.session_key}',
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    data = {
        'name': 'goaccess_analytics',
        'eai:data': dashboard_xml
    }

    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        response = requests.post(url, data=data, headers=headers,
                                 verify=False, timeout=timeout)
    except requests.RequestException as e:
        logger.error(f"创建仪表板时出错: {e}")
        return

    if response.status_code in (200, 201):
        logger.info("GoAccess仪表板创建成功")
    elif response.status_code == 409:
        # A view with this name already exists; it is not overwritten.
        logger.info("GoAccess仪表板已存在,未覆盖")
    else:
        logger.error(f"创建仪表板失败: {response.text}")
        return
    logger.info(f"访问地址: {self.splunk_url}/app/search/goaccess_analytics")

def create_alerts(self, error_rate_threshold=5, traffic_threshold=10000):
    """Define the GoAccess alert searches, save them, and schedule them.

    Two alerts are registered:

    * ``goaccess_high_error_rate`` (every 5 minutes): returns a row when the
      share of 4xx/5xx hits in ``status_codes`` exceeds
      ``error_rate_threshold`` percent.
    * ``goaccess_high_traffic`` (every 10 minutes): returns events whose
      ``general.total_requests`` exceeds ``traffic_threshold``.

    Each alert goes to ``create_saved_search`` and then, as the whole dict,
    to ``schedule_alert``.

    Args:
        error_rate_threshold: Error-rate percentage limit (default 5).
        traffic_threshold: Total-request limit (default 10000).
    """
    alerts = [
        {
            'name': 'goaccess_high_error_rate',
            # f-string: `{{}}` renders as the literal `{}` spath array suffix.
            'search': f'''
                index=goaccess sourcetype=goaccess:json
                | eval status_codes=spath(_raw, "status_codes.data{{}}")
                | mvexpand status_codes
                | eval status_code=spath(status_codes, "data")
                | eval hits=spath(status_codes, "hits")
                | eval is_error=if(match(status_code, "^[45]"), hits, 0)
                | stats sum(hits) as total_hits, sum(is_error) as error_hits
                | eval error_rate=(error_hits/total_hits)*100
                | where error_rate > {error_rate_threshold}
            ''',
            'description': f'错误率超过{error_rate_threshold}%时触发告警',
            'cron_schedule': '*/5 * * * *',
            'actions': 'email'
        },
        {
            'name': 'goaccess_high_traffic',
            # spath() may hand back the JSON value as text; tonumber() makes
            # the threshold comparison explicitly numeric.
            'search': f'''
                index=goaccess sourcetype=goaccess:json
                | eval total_requests=tonumber(spath(_raw, "general.total_requests"))
                | where total_requests > {traffic_threshold}
            ''',
            'description': f'总请求数超过{traffic_threshold}时触发告警',
            'cron_schedule': '*/10 * * * *',
            'actions': 'email'
        }
    ]

    for alert in alerts:
        self.create_saved_search(
            name=alert['name'],
            search_query=alert['search'],
            description=alert['description']
        )

        # 配置告警调度
        self.schedule_alert(alert)

def schedule_alert(self, alert_config, timeout=30):
    """Enable scheduling and the email action on an existing saved search.

    POSTs to ``/services/saved/searches/<name>``. Only HTTP 200 counts as
    success. Errors are logged, not raised.

    Args:
        alert_config: Dict with required keys ``name``, ``cron_schedule``
            and ``actions``. Optional keys: ``email_to`` (default
            ``'admin@example.com'``) and ``email_message`` (default
            ``'$result.error_rate$% error rate detected'``).
        timeout: Seconds to wait for Splunk (passed to ``requests.post``).
    """
    from urllib.parse import quote

    name = alert_config['name']
    # Percent-encode the name so spaces or '/' cannot corrupt the URL path.
    url = f"{self.splunk_url}/services/saved/searches/{quote(name, safe='')}"

    headers = {
        'Authorization': f'Splunk {self.session_key}',
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    data = {
        'is_scheduled': '1',
        'cron_schedule': alert_config['cron_schedule'],
        'actions': alert_config['actions'],
        # Trigger only when the search returns at least one result. Splunk's
        # default trigger condition ("always") would run the email action on
        # every scheduled execution, even when `where` filtered out all rows.
        'alert_type': 'number of events',
        'alert_comparator': 'greater than',
        'alert_threshold': '0',
        'action.email.to': alert_config.get('email_to', 'admin@example.com'),
        'action.email.subject': f"Splunk Alert: {name}",
        'action.email.message.alert': alert_config.get(
            'email_message', '$result.error_rate$% error rate detected'),
    }

    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        response = requests.post(url, data=data, headers=headers,
                                 verify=False, timeout=timeout)
    except requests.RequestException as e:
        logger.error(f"配置告警调度时出错: {e}")
        return

    if response.status_code == 200:
        logger.info(f"告警 '{name}' 调度配置成功")
    else:
        logger.error(f"配置告警调度失败: {response.text}")

def setup_all(self):
    """执行完整设置"""